use std::{fmt, future::Future, sync::Arc, time::Duration};
use serde::Serialize;
use socketioxide_core::Sid;
use crate::{
BroadcastError, EmitWithAckError, SendError,
ack::{AckInnerStream, AckStream},
adapter::{Adapter, LocalAdapter},
extract::SocketRef,
ns::Namespace,
parser::Parser,
socket::{RemoteSocket, Socket},
};
use socketioxide_core::{
adapter::{BroadcastFlags, BroadcastOptions, Room, RoomParam},
packet::Packet,
parser::{Parse, ParserError},
};
pub struct ConfOperators<'a, A: Adapter = LocalAdapter> {
timeout: Option<Duration>,
socket: &'a Socket<A>,
}
pub struct BroadcastOperators<A: Adapter = LocalAdapter> {
timeout: Option<Duration>,
ns: Arc<Namespace<A>>,
parser: Parser,
opts: BroadcastOptions,
}
impl<A: Adapter> From<ConfOperators<'_, A>> for BroadcastOperators<A> {
fn from(conf: ConfOperators<'_, A>) -> Self {
let opts = BroadcastOptions::new(conf.socket.id);
Self {
timeout: conf.timeout,
ns: conf.socket.ns.clone(),
parser: conf.socket.parser,
opts,
}
}
}
impl<'a, A: Adapter> ConfOperators<'a, A> {
pub(crate) fn new(sender: &'a Socket<A>) -> Self {
Self {
timeout: None,
socket: sender,
}
}
#[doc = include_str!("../docs/operators/to.md")]
pub fn to(self, rooms: impl RoomParam) -> BroadcastOperators<A> {
BroadcastOperators::from(self).to(rooms)
}
#[doc = include_str!("../docs/operators/within.md")]
pub fn within(self, rooms: impl RoomParam) -> BroadcastOperators<A> {
BroadcastOperators::from(self).within(rooms)
}
#[doc = include_str!("../docs/operators/except.md")]
pub fn except(self, rooms: impl RoomParam) -> BroadcastOperators<A> {
BroadcastOperators::from(self).except(rooms)
}
#[doc = include_str!("../docs/operators/local.md")]
pub fn local(self) -> BroadcastOperators<A> {
BroadcastOperators::from(self).local()
}
#[doc = include_str!("../docs/operators/broadcast.md")]
pub fn broadcast(self) -> BroadcastOperators<A> {
BroadcastOperators::from(self).broadcast()
}
#[doc = include_str!("../docs/operators/timeout.md")]
pub fn timeout(mut self, timeout: Duration) -> Self {
self.timeout = Some(timeout);
self
}
}
impl<A: Adapter> ConfOperators<'_, A> {
#[doc = include_str!("../docs/operators/emit.md")]
pub fn emit<T: ?Sized + Serialize>(
mut self,
event: impl AsRef<str>,
data: &T,
) -> Result<(), SendError> {
use crate::SocketError;
use crate::socket::PermitExt;
if !self.socket.connected() {
return Err(SendError::Socket(SocketError::Closed));
}
let permit = match self.socket.reserve() {
Ok(permit) => permit,
Err(e) => {
#[cfg(feature = "tracing")]
tracing::debug!("sending error during emit message: {e:?}");
return Err(SendError::Socket(e));
}
};
let packet = self.get_packet(event, data)?;
permit.send(packet, self.socket.parser);
Ok(())
}
#[doc = include_str!("../docs/operators/emit_with_ack.md")]
pub fn emit_with_ack<T: ?Sized + Serialize, V>(
mut self,
event: impl AsRef<str>,
data: &T,
) -> Result<AckStream<V>, SendError> {
use crate::SocketError;
if !self.socket.connected() {
return Err(SendError::Socket(SocketError::Closed));
}
let permit = match self.socket.reserve() {
Ok(permit) => permit,
Err(e) => {
#[cfg(feature = "tracing")]
tracing::debug!("sending error during emit message: {e:?}");
return Err(SendError::Socket(e));
}
};
let timeout = self
.timeout
.unwrap_or_else(|| self.socket.get_io().config().ack_timeout);
let packet = self.get_packet(event, data)?;
let rx = self.socket.send_with_ack_permit(packet, permit);
let stream = AckInnerStream::send(rx, timeout, self.socket.id);
Ok(AckStream::<V>::new(stream, self.socket.parser))
}
#[doc = include_str!("../docs/operators/join.md")]
pub fn join(self, rooms: impl RoomParam) {
self.socket.join(rooms)
}
#[doc = include_str!("../docs/operators/leave.md")]
pub async fn leave(self, rooms: impl RoomParam) {
self.socket.leave(rooms)
}
fn get_packet<T: ?Sized + Serialize>(
&mut self,
event: impl AsRef<str>,
data: &T,
) -> Result<Packet, ParserError> {
let ns = self.socket.ns.path.clone();
let event = event.as_ref();
let data = self.socket.parser.encode_value(&data, Some(event))?;
Ok(Packet::event(ns, data))
}
}
impl<A: Adapter> BroadcastOperators<A> {
pub(crate) fn new(ns: Arc<Namespace<A>>, parser: Parser) -> Self {
Self {
timeout: None,
ns,
parser,
opts: BroadcastOptions::default(),
}
}
pub(crate) fn from_sock(ns: Arc<Namespace<A>>, sid: Sid, parser: Parser) -> Self {
Self {
timeout: None,
ns,
parser,
opts: BroadcastOptions::new(sid),
}
}
pub fn ns_path(&self) -> &str {
&self.ns.path
}
#[doc = include_str!("../docs/operators/to.md")]
pub fn to(mut self, rooms: impl RoomParam) -> Self {
self.opts.rooms.extend(rooms.into_room_iter());
self.broadcast()
}
#[doc = include_str!("../docs/operators/within.md")]
pub fn within(mut self, rooms: impl RoomParam) -> Self {
self.opts.rooms.extend(rooms.into_room_iter());
self
}
#[doc = include_str!("../docs/operators/except.md")]
pub fn except(mut self, rooms: impl RoomParam) -> Self {
self.opts.except.extend(rooms.into_room_iter());
self.broadcast()
}
#[doc = include_str!("../docs/operators/local.md")]
pub fn local(mut self) -> Self {
self.opts.add_flag(BroadcastFlags::Local);
self
}
#[doc = include_str!("../docs/operators/broadcast.md")]
pub fn broadcast(mut self) -> Self {
self.opts.add_flag(BroadcastFlags::Broadcast);
self
}
#[doc = include_str!("../docs/operators/timeout.md")]
pub fn timeout(mut self, timeout: Duration) -> Self {
self.timeout = Some(timeout);
self
}
}
impl<A: Adapter> BroadcastOperators<A> {
#[doc = include_str!("../docs/operators/emit.md")]
pub fn emit<T: ?Sized + Serialize>(
mut self,
event: impl AsRef<str>,
data: &T,
) -> impl Future<Output = Result<(), BroadcastError>> + Send {
let packet = self.get_packet(event, data);
async move {
self.ns
.adapter
.broadcast(packet?, self.opts)
.await
.inspect_err(|_e| {
#[cfg(feature = "tracing")]
tracing::debug!("broadcast error: {_e}");
})?;
Ok(())
}
}
#[doc = include_str!("../docs/operators/emit_with_ack.md")]
pub fn emit_with_ack<T: ?Sized + Serialize, V>(
mut self,
event: impl AsRef<str>,
data: &T,
) -> impl Future<Output = Result<AckStream<V, A>, EmitWithAckError>> + Send {
let packet = self.get_packet(event, data);
async move {
let stream = self
.ns
.adapter
.broadcast_with_ack(packet?, self.opts, self.timeout)
.await
.map_err(|e| EmitWithAckError::Adapter(Box::new(e)))?;
Ok(AckStream::new(stream, self.parser))
}
}
#[doc = include_str!("../docs/operators/sockets.md")]
pub fn sockets(self) -> Vec<SocketRef<A>> {
let ids = self.ns.adapter.get_local().sockets(self.opts);
ids.into_iter()
.filter_map(|id| self.ns.get_socket(id).ok())
.map(SocketRef::from)
.collect()
}
#[doc = include_str!("../docs/operators/fetch_sockets.md")]
pub async fn fetch_sockets(self) -> Result<Vec<RemoteSocket<A>>, A::Error> {
let sockets = self
.ns
.adapter
.fetch_sockets(self.opts)
.await?
.into_iter()
.map(|data| RemoteSocket::new(data, &self.ns.adapter, self.parser))
.collect();
Ok(sockets)
}
#[doc = include_str!("../docs/operators/disconnect.md")]
pub async fn disconnect(self) -> Result<(), BroadcastError> {
self.ns.adapter.disconnect_socket(self.opts).await
}
#[doc = include_str!("../docs/operators/join.md")]
#[allow(clippy::manual_async_fn)] pub fn join(self, rooms: impl RoomParam) -> impl Future<Output = Result<(), A::Error>> + Send {
async move { self.ns.adapter.add_sockets(self.opts, rooms).await }
}
#[doc = include_str!("../docs/operators/leave.md")]
#[allow(clippy::manual_async_fn)] pub fn leave(self, rooms: impl RoomParam) -> impl Future<Output = Result<(), A::Error>> + Send {
async move { self.ns.adapter.del_sockets(self.opts, rooms).await }
}
#[doc = include_str!("../docs/operators/rooms.md")]
pub async fn rooms(self) -> Result<Vec<Room>, A::Error> {
self.ns.adapter.rooms(self.opts).await
}
#[doc = include_str!("../docs/operators/get_socket.md")]
pub fn get_socket(&self, sid: Sid) -> Option<SocketRef<A>> {
self.ns.get_socket(sid).map(SocketRef::from).ok()
}
fn get_packet<T: ?Sized + Serialize>(
&mut self,
event: impl AsRef<str>,
data: &T,
) -> Result<Packet, ParserError> {
let ns = self.ns.path.clone();
let data = self.parser.encode_value(data, Some(event.as_ref()))?;
Ok(Packet::event(ns, data))
}
}
impl<'a, A: Adapter> Clone for ConfOperators<'a, A> {
fn clone(&self) -> Self {
Self {
timeout: self.timeout,
socket: self.socket,
}
}
}
impl<A: Adapter> Clone for BroadcastOperators<A> {
fn clone(&self) -> Self {
Self {
ns: self.ns.clone(),
opts: self.opts.clone(),
timeout: self.timeout,
parser: self.parser,
}
}
}
impl<A: Adapter> fmt::Debug for BroadcastOperators<A> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("BroadcastOperators")
.field("ns", &self.ns)
.field("opts", &self.opts)
.field("timeout", &self.timeout)
.finish()
}
}
impl<A: Adapter> fmt::Debug for ConfOperators<'_, A> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ConfOperators")
.field("timeout", &self.timeout)
.finish()
}
}