socketioxide/
operators.rs

1//! Operators are used to select sockets to send a packet to,
2//! or to configure the packet that will be emitted.
3//!
4//! They use the builder pattern to chain operators.
5//!
6//! There are two types of operators:
7//! * [`ConfOperators`]: Chainable operators to configure the message to be sent.
8//! * [`BroadcastOperators`]: Chainable operators to select sockets to send a message to and to configure the message to be sent.
9use std::{future::Future, sync::Arc, time::Duration};
10
11use serde::Serialize;
12use socketioxide_core::Sid;
13
14use crate::{
15    BroadcastError, EmitWithAckError, SendError,
16    ack::{AckInnerStream, AckStream},
17    adapter::{Adapter, LocalAdapter},
18    extract::SocketRef,
19    ns::Namespace,
20    parser::Parser,
21    socket::{RemoteSocket, Socket},
22};
23
24use socketioxide_core::{
25    adapter::{BroadcastFlags, BroadcastOptions, Room, RoomParam},
26    packet::Packet,
27    parser::{Parse, ParserError},
28};
29
30/// Chainable operators to configure the message to be sent.
31pub struct ConfOperators<'a, A: Adapter = LocalAdapter> {
32    timeout: Option<Duration>,
33    socket: &'a Socket<A>,
34}
35/// Chainable operators to select sockets to send a message to and to configure the message to be sent.
36pub struct BroadcastOperators<A: Adapter = LocalAdapter> {
37    timeout: Option<Duration>,
38    ns: Arc<Namespace<A>>,
39    parser: Parser,
40    opts: BroadcastOptions,
41}
42
43impl<A: Adapter> From<ConfOperators<'_, A>> for BroadcastOperators<A> {
44    fn from(conf: ConfOperators<'_, A>) -> Self {
45        let opts = BroadcastOptions::new(conf.socket.id);
46        Self {
47            timeout: conf.timeout,
48            ns: conf.socket.ns.clone(),
49            parser: conf.socket.parser,
50            opts,
51        }
52    }
53}
54
55// ==== impl ConfOperators operations ====
56impl<'a, A: Adapter> ConfOperators<'a, A> {
57    pub(crate) fn new(sender: &'a Socket<A>) -> Self {
58        Self {
59            timeout: None,
60            socket: sender,
61        }
62    }
63
64    #[doc = include_str!("../docs/operators/to.md")]
65    pub fn to(self, rooms: impl RoomParam) -> BroadcastOperators<A> {
66        BroadcastOperators::from(self).to(rooms)
67    }
68
69    #[doc = include_str!("../docs/operators/within.md")]
70    pub fn within(self, rooms: impl RoomParam) -> BroadcastOperators<A> {
71        BroadcastOperators::from(self).within(rooms)
72    }
73
74    #[doc = include_str!("../docs/operators/except.md")]
75    pub fn except(self, rooms: impl RoomParam) -> BroadcastOperators<A> {
76        BroadcastOperators::from(self).except(rooms)
77    }
78
79    #[doc = include_str!("../docs/operators/local.md")]
80    pub fn local(self) -> BroadcastOperators<A> {
81        BroadcastOperators::from(self).local()
82    }
83
84    #[doc = include_str!("../docs/operators/broadcast.md")]
85    pub fn broadcast(self) -> BroadcastOperators<A> {
86        BroadcastOperators::from(self).broadcast()
87    }
88
89    #[doc = include_str!("../docs/operators/timeout.md")]
90    pub fn timeout(mut self, timeout: Duration) -> Self {
91        self.timeout = Some(timeout);
92        self
93    }
94}
95
96// ==== impl ConfOperators consume fns ====
97impl<A: Adapter> ConfOperators<'_, A> {
98    #[doc = include_str!("../docs/operators/emit.md")]
99    pub fn emit<T: ?Sized + Serialize>(
100        mut self,
101        event: impl AsRef<str>,
102        data: &T,
103    ) -> Result<(), SendError> {
104        use crate::SocketError;
105        use crate::socket::PermitExt;
106        if !self.socket.connected() {
107            return Err(SendError::Socket(SocketError::Closed));
108        }
109        let permit = match self.socket.reserve() {
110            Ok(permit) => permit,
111            Err(e) => {
112                #[cfg(feature = "tracing")]
113                tracing::debug!("sending error during emit message: {e:?}");
114                return Err(SendError::Socket(e));
115            }
116        };
117        let packet = self.get_packet(event, data)?;
118        permit.send(packet, self.socket.parser);
119
120        Ok(())
121    }
122
123    #[doc = include_str!("../docs/operators/emit_with_ack.md")]
124    pub fn emit_with_ack<T: ?Sized + Serialize, V>(
125        mut self,
126        event: impl AsRef<str>,
127        data: &T,
128    ) -> Result<AckStream<V>, SendError> {
129        use crate::SocketError;
130        if !self.socket.connected() {
131            return Err(SendError::Socket(SocketError::Closed));
132        }
133        let permit = match self.socket.reserve() {
134            Ok(permit) => permit,
135            Err(e) => {
136                #[cfg(feature = "tracing")]
137                tracing::debug!("sending error during emit message: {e:?}");
138                return Err(SendError::Socket(e));
139            }
140        };
141        let timeout = self
142            .timeout
143            .unwrap_or_else(|| self.socket.get_io().config().ack_timeout);
144        let packet = self.get_packet(event, data)?;
145        let rx = self.socket.send_with_ack_permit(packet, permit);
146        let stream = AckInnerStream::send(rx, timeout, self.socket.id);
147        Ok(AckStream::<V>::new(stream, self.socket.parser))
148    }
149
150    #[doc = include_str!("../docs/operators/join.md")]
151    pub fn join(self, rooms: impl RoomParam) {
152        self.socket.join(rooms)
153    }
154
155    #[doc = include_str!("../docs/operators/leave.md")]
156    pub async fn leave(self, rooms: impl RoomParam) {
157        self.socket.leave(rooms)
158    }
159
160    /// Creates a packet with the given event and data.
161    fn get_packet<T: ?Sized + Serialize>(
162        &mut self,
163        event: impl AsRef<str>,
164        data: &T,
165    ) -> Result<Packet, ParserError> {
166        let ns = self.socket.ns.path.clone();
167        let event = event.as_ref();
168        let data = self.socket.parser.encode_value(&data, Some(event))?;
169        Ok(Packet::event(ns, data))
170    }
171}
172
173impl<A: Adapter> BroadcastOperators<A> {
174    pub(crate) fn new(ns: Arc<Namespace<A>>, parser: Parser) -> Self {
175        Self {
176            timeout: None,
177            ns,
178            parser,
179            opts: BroadcastOptions::default(),
180        }
181    }
182    pub(crate) fn from_sock(ns: Arc<Namespace<A>>, sid: Sid, parser: Parser) -> Self {
183        Self {
184            timeout: None,
185            ns,
186            parser,
187            opts: BroadcastOptions::new(sid),
188        }
189    }
190
191    #[doc = include_str!("../docs/operators/to.md")]
192    pub fn to(mut self, rooms: impl RoomParam) -> Self {
193        self.opts.rooms.extend(rooms.into_room_iter());
194        self.broadcast()
195    }
196
197    #[doc = include_str!("../docs/operators/within.md")]
198    pub fn within(mut self, rooms: impl RoomParam) -> Self {
199        self.opts.rooms.extend(rooms.into_room_iter());
200        self
201    }
202
203    #[doc = include_str!("../docs/operators/except.md")]
204    pub fn except(mut self, rooms: impl RoomParam) -> Self {
205        self.opts.except.extend(rooms.into_room_iter());
206        self.broadcast()
207    }
208
209    #[doc = include_str!("../docs/operators/local.md")]
210    pub fn local(mut self) -> Self {
211        self.opts.add_flag(BroadcastFlags::Local);
212        self
213    }
214
215    #[doc = include_str!("../docs/operators/broadcast.md")]
216    pub fn broadcast(mut self) -> Self {
217        self.opts.add_flag(BroadcastFlags::Broadcast);
218        self
219    }
220
221    #[doc = include_str!("../docs/operators/timeout.md")]
222    pub fn timeout(mut self, timeout: Duration) -> Self {
223        self.timeout = Some(timeout);
224        self
225    }
226}
227
228// ==== impl BroadcastOperators consume fns ====
229impl<A: Adapter> BroadcastOperators<A> {
230    #[doc = include_str!("../docs/operators/emit.md")]
231    pub fn emit<T: ?Sized + Serialize>(
232        mut self,
233        event: impl AsRef<str>,
234        data: &T,
235    ) -> impl Future<Output = Result<(), BroadcastError>> + Send {
236        let packet = self.get_packet(event, data);
237        async move {
238            self.ns
239                .adapter
240                .broadcast(packet?, self.opts)
241                .await
242                .map_err(|e| {
243                    #[cfg(feature = "tracing")]
244                    tracing::debug!("broadcast error: {e}");
245                    e
246                })?;
247            Ok(())
248        }
249    }
250
251    #[doc = include_str!("../docs/operators/emit_with_ack.md")]
252    pub fn emit_with_ack<T: ?Sized + Serialize, V>(
253        mut self,
254        event: impl AsRef<str>,
255        data: &T,
256    ) -> impl Future<Output = Result<AckStream<V, A>, EmitWithAckError>> + Send {
257        let packet = self.get_packet(event, data);
258        async move {
259            let stream = self
260                .ns
261                .adapter
262                .broadcast_with_ack(packet?, self.opts, self.timeout)
263                .await
264                .map_err(|e| EmitWithAckError::Adapter(Box::new(e)))?;
265            Ok(AckStream::new(stream, self.parser))
266        }
267    }
268
269    #[doc = include_str!("../docs/operators/sockets.md")]
270    pub fn sockets(self) -> Vec<SocketRef<A>> {
271        let ids = self.ns.adapter.get_local().sockets(self.opts);
272
273        ids.into_iter()
274            .filter_map(|id| self.ns.get_socket(id).ok())
275            .map(SocketRef::from)
276            .collect()
277    }
278
279    #[doc = include_str!("../docs/operators/fetch_sockets.md")]
280    pub async fn fetch_sockets(self) -> Result<Vec<RemoteSocket<A>>, A::Error> {
281        let sockets = self
282            .ns
283            .adapter
284            .fetch_sockets(self.opts)
285            .await?
286            .into_iter()
287            .map(|data| RemoteSocket::new(data, &self.ns.adapter, self.parser))
288            .collect();
289        Ok(sockets)
290    }
291
292    #[doc = include_str!("../docs/operators/disconnect.md")]
293    pub async fn disconnect(self) -> Result<(), BroadcastError> {
294        self.ns.adapter.disconnect_socket(self.opts).await
295    }
296
297    #[doc = include_str!("../docs/operators/join.md")]
298    #[allow(clippy::manual_async_fn)] // related to issue: https://github.com/rust-lang/rust-clippy/issues/12664
299    pub fn join(self, rooms: impl RoomParam) -> impl Future<Output = Result<(), A::Error>> + Send {
300        async move { self.ns.adapter.add_sockets(self.opts, rooms).await }
301    }
302
303    #[doc = include_str!("../docs/operators/leave.md")]
304    #[allow(clippy::manual_async_fn)] // related to issue: https://github.com/rust-lang/rust-clippy/issues/12664
305    pub fn leave(self, rooms: impl RoomParam) -> impl Future<Output = Result<(), A::Error>> + Send {
306        async move { self.ns.adapter.del_sockets(self.opts, rooms).await }
307    }
308
309    #[doc = include_str!("../docs/operators/rooms.md")]
310    pub async fn rooms(self) -> Result<Vec<Room>, A::Error> {
311        self.ns.adapter.rooms(self.opts).await
312    }
313
314    #[doc = include_str!("../docs/operators/get_socket.md")]
315    pub fn get_socket(&self, sid: Sid) -> Option<SocketRef<A>> {
316        self.ns.get_socket(sid).map(SocketRef::from).ok()
317    }
318
319    /// Creates a packet with the given event and data.
320    fn get_packet<T: ?Sized + Serialize>(
321        &mut self,
322        event: impl AsRef<str>,
323        data: &T,
324    ) -> Result<Packet, ParserError> {
325        let ns = self.ns.path.clone();
326        let data = self.parser.encode_value(data, Some(event.as_ref()))?;
327        Ok(Packet::event(ns, data))
328    }
329}