socketioxide/
operators.rs

1//! Operators are used to select sockets to send a packet to,
2//! or to configure the packet that will be emitted.
3//!
4//! They use the builder pattern to chain operators.
5//!
6//! There are two types of operators:
7//! * [`ConfOperators`]: Chainable operators to configure the message to be sent.
8//! * [`BroadcastOperators`]: Chainable operators to select sockets to send a message to and to configure the message to be sent.
9use std::{fmt, future::Future, sync::Arc, time::Duration};
10
11use serde::Serialize;
12use socketioxide_core::Sid;
13
14use crate::{
15    BroadcastError, EmitWithAckError, SendError,
16    ack::{AckInnerStream, AckStream},
17    adapter::{Adapter, LocalAdapter},
18    extract::SocketRef,
19    ns::Namespace,
20    parser::Parser,
21    socket::{RemoteSocket, Socket},
22};
23
24use socketioxide_core::{
25    adapter::{BroadcastFlags, BroadcastOptions, Room, RoomParam},
26    packet::Packet,
27    parser::{Parse, ParserError},
28};
29
30/// Chainable operators to configure the message to be sent.
31pub struct ConfOperators<'a, A: Adapter = LocalAdapter> {
32    timeout: Option<Duration>,
33    socket: &'a Socket<A>,
34}
35/// Chainable operators to select sockets to send a message to and to configure the message to be sent.
36pub struct BroadcastOperators<A: Adapter = LocalAdapter> {
37    timeout: Option<Duration>,
38    ns: Arc<Namespace<A>>,
39    parser: Parser,
40    opts: BroadcastOptions,
41}
42
43impl<A: Adapter> From<ConfOperators<'_, A>> for BroadcastOperators<A> {
44    fn from(conf: ConfOperators<'_, A>) -> Self {
45        let opts = BroadcastOptions::new(conf.socket.id);
46        Self {
47            timeout: conf.timeout,
48            ns: conf.socket.ns.clone(),
49            parser: conf.socket.parser,
50            opts,
51        }
52    }
53}
54
55// ==== impl ConfOperators operations ====
56impl<'a, A: Adapter> ConfOperators<'a, A> {
57    pub(crate) fn new(sender: &'a Socket<A>) -> Self {
58        Self {
59            timeout: None,
60            socket: sender,
61        }
62    }
63
64    #[doc = include_str!("../docs/operators/to.md")]
65    pub fn to(self, rooms: impl RoomParam) -> BroadcastOperators<A> {
66        BroadcastOperators::from(self).to(rooms)
67    }
68
69    #[doc = include_str!("../docs/operators/within.md")]
70    pub fn within(self, rooms: impl RoomParam) -> BroadcastOperators<A> {
71        BroadcastOperators::from(self).within(rooms)
72    }
73
74    #[doc = include_str!("../docs/operators/except.md")]
75    pub fn except(self, rooms: impl RoomParam) -> BroadcastOperators<A> {
76        BroadcastOperators::from(self).except(rooms)
77    }
78
79    #[doc = include_str!("../docs/operators/local.md")]
80    pub fn local(self) -> BroadcastOperators<A> {
81        BroadcastOperators::from(self).local()
82    }
83
84    #[doc = include_str!("../docs/operators/broadcast.md")]
85    pub fn broadcast(self) -> BroadcastOperators<A> {
86        BroadcastOperators::from(self).broadcast()
87    }
88
89    #[doc = include_str!("../docs/operators/timeout.md")]
90    pub fn timeout(mut self, timeout: Duration) -> Self {
91        self.timeout = Some(timeout);
92        self
93    }
94}
95
96// ==== impl ConfOperators consume fns ====
97impl<A: Adapter> ConfOperators<'_, A> {
98    #[doc = include_str!("../docs/operators/emit.md")]
99    pub fn emit<T: ?Sized + Serialize>(
100        mut self,
101        event: impl AsRef<str>,
102        data: &T,
103    ) -> Result<(), SendError> {
104        use crate::SocketError;
105        use crate::socket::PermitExt;
106        if !self.socket.connected() {
107            return Err(SendError::Socket(SocketError::Closed));
108        }
109        let permit = match self.socket.reserve() {
110            Ok(permit) => permit,
111            Err(e) => {
112                #[cfg(feature = "tracing")]
113                tracing::debug!("sending error during emit message: {e:?}");
114                return Err(SendError::Socket(e));
115            }
116        };
117        let packet = self.get_packet(event, data)?;
118        permit.send(packet, self.socket.parser);
119
120        Ok(())
121    }
122
123    #[doc = include_str!("../docs/operators/emit_with_ack.md")]
124    pub fn emit_with_ack<T: ?Sized + Serialize, V>(
125        mut self,
126        event: impl AsRef<str>,
127        data: &T,
128    ) -> Result<AckStream<V>, SendError> {
129        use crate::SocketError;
130        if !self.socket.connected() {
131            return Err(SendError::Socket(SocketError::Closed));
132        }
133        let permit = match self.socket.reserve() {
134            Ok(permit) => permit,
135            Err(e) => {
136                #[cfg(feature = "tracing")]
137                tracing::debug!("sending error during emit message: {e:?}");
138                return Err(SendError::Socket(e));
139            }
140        };
141        let timeout = self
142            .timeout
143            .unwrap_or_else(|| self.socket.get_io().config().ack_timeout);
144        let packet = self.get_packet(event, data)?;
145        let rx = self.socket.send_with_ack_permit(packet, permit);
146        let stream = AckInnerStream::send(rx, timeout, self.socket.id);
147        Ok(AckStream::<V>::new(stream, self.socket.parser))
148    }
149
150    #[doc = include_str!("../docs/operators/join.md")]
151    pub fn join(self, rooms: impl RoomParam) {
152        self.socket.join(rooms)
153    }
154
155    #[doc = include_str!("../docs/operators/leave.md")]
156    pub async fn leave(self, rooms: impl RoomParam) {
157        self.socket.leave(rooms)
158    }
159
160    /// Creates a packet with the given event and data.
161    fn get_packet<T: ?Sized + Serialize>(
162        &mut self,
163        event: impl AsRef<str>,
164        data: &T,
165    ) -> Result<Packet, ParserError> {
166        let ns = self.socket.ns.path.clone();
167        let event = event.as_ref();
168        let data = self.socket.parser.encode_value(&data, Some(event))?;
169        Ok(Packet::event(ns, data))
170    }
171}
172
173impl<A: Adapter> BroadcastOperators<A> {
174    pub(crate) fn new(ns: Arc<Namespace<A>>, parser: Parser) -> Self {
175        Self {
176            timeout: None,
177            ns,
178            parser,
179            opts: BroadcastOptions::default(),
180        }
181    }
182    pub(crate) fn from_sock(ns: Arc<Namespace<A>>, sid: Sid, parser: Parser) -> Self {
183        Self {
184            timeout: None,
185            ns,
186            parser,
187            opts: BroadcastOptions::new(sid),
188        }
189    }
190
191    /// Returns the namespace path of the broadcast operators.
192    pub fn ns_path(&self) -> &str {
193        &self.ns.path
194    }
195
196    #[doc = include_str!("../docs/operators/to.md")]
197    pub fn to(mut self, rooms: impl RoomParam) -> Self {
198        self.opts.rooms.extend(rooms.into_room_iter());
199        self.broadcast()
200    }
201
202    #[doc = include_str!("../docs/operators/within.md")]
203    pub fn within(mut self, rooms: impl RoomParam) -> Self {
204        self.opts.rooms.extend(rooms.into_room_iter());
205        self
206    }
207
208    #[doc = include_str!("../docs/operators/except.md")]
209    pub fn except(mut self, rooms: impl RoomParam) -> Self {
210        self.opts.except.extend(rooms.into_room_iter());
211        self.broadcast()
212    }
213
214    #[doc = include_str!("../docs/operators/local.md")]
215    pub fn local(mut self) -> Self {
216        self.opts.add_flag(BroadcastFlags::Local);
217        self
218    }
219
220    #[doc = include_str!("../docs/operators/broadcast.md")]
221    pub fn broadcast(mut self) -> Self {
222        self.opts.add_flag(BroadcastFlags::Broadcast);
223        self
224    }
225
226    #[doc = include_str!("../docs/operators/timeout.md")]
227    pub fn timeout(mut self, timeout: Duration) -> Self {
228        self.timeout = Some(timeout);
229        self
230    }
231}
232
233// ==== impl BroadcastOperators consume fns ====
234impl<A: Adapter> BroadcastOperators<A> {
235    #[doc = include_str!("../docs/operators/emit.md")]
236    pub fn emit<T: ?Sized + Serialize>(
237        mut self,
238        event: impl AsRef<str>,
239        data: &T,
240    ) -> impl Future<Output = Result<(), BroadcastError>> + Send {
241        let packet = self.get_packet(event, data);
242        async move {
243            self.ns
244                .adapter
245                .broadcast(packet?, self.opts)
246                .await
247                .map_err(|e| {
248                    #[cfg(feature = "tracing")]
249                    tracing::debug!("broadcast error: {e}");
250                    e
251                })?;
252            Ok(())
253        }
254    }
255
256    #[doc = include_str!("../docs/operators/emit_with_ack.md")]
257    pub fn emit_with_ack<T: ?Sized + Serialize, V>(
258        mut self,
259        event: impl AsRef<str>,
260        data: &T,
261    ) -> impl Future<Output = Result<AckStream<V, A>, EmitWithAckError>> + Send {
262        let packet = self.get_packet(event, data);
263        async move {
264            let stream = self
265                .ns
266                .adapter
267                .broadcast_with_ack(packet?, self.opts, self.timeout)
268                .await
269                .map_err(|e| EmitWithAckError::Adapter(Box::new(e)))?;
270            Ok(AckStream::new(stream, self.parser))
271        }
272    }
273
274    #[doc = include_str!("../docs/operators/sockets.md")]
275    pub fn sockets(self) -> Vec<SocketRef<A>> {
276        let ids = self.ns.adapter.get_local().sockets(self.opts);
277
278        ids.into_iter()
279            .filter_map(|id| self.ns.get_socket(id).ok())
280            .map(SocketRef::from)
281            .collect()
282    }
283
284    #[doc = include_str!("../docs/operators/fetch_sockets.md")]
285    pub async fn fetch_sockets(self) -> Result<Vec<RemoteSocket<A>>, A::Error> {
286        let sockets = self
287            .ns
288            .adapter
289            .fetch_sockets(self.opts)
290            .await?
291            .into_iter()
292            .map(|data| RemoteSocket::new(data, &self.ns.adapter, self.parser))
293            .collect();
294        Ok(sockets)
295    }
296
297    #[doc = include_str!("../docs/operators/disconnect.md")]
298    pub async fn disconnect(self) -> Result<(), BroadcastError> {
299        self.ns.adapter.disconnect_socket(self.opts).await
300    }
301
302    #[doc = include_str!("../docs/operators/join.md")]
303    #[allow(clippy::manual_async_fn)] // related to issue: https://github.com/rust-lang/rust-clippy/issues/12664
304    pub fn join(self, rooms: impl RoomParam) -> impl Future<Output = Result<(), A::Error>> + Send {
305        async move { self.ns.adapter.add_sockets(self.opts, rooms).await }
306    }
307
308    #[doc = include_str!("../docs/operators/leave.md")]
309    #[allow(clippy::manual_async_fn)] // related to issue: https://github.com/rust-lang/rust-clippy/issues/12664
310    pub fn leave(self, rooms: impl RoomParam) -> impl Future<Output = Result<(), A::Error>> + Send {
311        async move { self.ns.adapter.del_sockets(self.opts, rooms).await }
312    }
313
314    #[doc = include_str!("../docs/operators/rooms.md")]
315    pub async fn rooms(self) -> Result<Vec<Room>, A::Error> {
316        self.ns.adapter.rooms(self.opts).await
317    }
318
319    #[doc = include_str!("../docs/operators/get_socket.md")]
320    pub fn get_socket(&self, sid: Sid) -> Option<SocketRef<A>> {
321        self.ns.get_socket(sid).map(SocketRef::from).ok()
322    }
323
324    /// Creates a packet with the given event and data.
325    fn get_packet<T: ?Sized + Serialize>(
326        &mut self,
327        event: impl AsRef<str>,
328        data: &T,
329    ) -> Result<Packet, ParserError> {
330        let ns = self.ns.path.clone();
331        let data = self.parser.encode_value(data, Some(event.as_ref()))?;
332        Ok(Packet::event(ns, data))
333    }
334}
335
336impl<'a, A: Adapter> Clone for ConfOperators<'a, A> {
337    fn clone(&self) -> Self {
338        Self {
339            timeout: self.timeout,
340            socket: self.socket,
341        }
342    }
343}
344impl<A: Adapter> Clone for BroadcastOperators<A> {
345    fn clone(&self) -> Self {
346        Self {
347            ns: self.ns.clone(),
348            opts: self.opts.clone(),
349            timeout: self.timeout,
350            parser: self.parser,
351        }
352    }
353}
354impl<A: Adapter> fmt::Debug for BroadcastOperators<A> {
355    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
356        f.debug_struct("BroadcastOperators")
357            .field("ns", &self.ns)
358            .field("opts", &self.opts)
359            .field("timeout", &self.timeout)
360            .finish()
361    }
362}
363
364impl<A: Adapter> fmt::Debug for ConfOperators<'_, A> {
365    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
366        f.debug_struct("ConfOperators")
367            .field("timeout", &self.timeout)
368            .finish()
369    }
370}