socketioxide/
io.rs

1use std::{borrow::Cow, fmt, sync::Arc, time::Duration};
2
3use engineioxide::{
4    TransportType,
5    config::{EngineIoConfig, EngineIoConfigBuilder},
6    service::NotFoundService,
7};
8use serde::Serialize;
9use socketioxide_core::{
10    Sid, Uid,
11    adapter::{DefinedAdapter, Room, RoomParam},
12};
13use socketioxide_parser_common::CommonParser;
14#[cfg(feature = "msgpack")]
15use socketioxide_parser_msgpack::MsgPackParser;
16
17use crate::{
18    BroadcastError, EmitWithAckError,
19    ack::AckStream,
20    adapter::{Adapter, LocalAdapter},
21    client::Client,
22    extract::SocketRef,
23    handler::ConnectHandler,
24    layer::SocketIoLayer,
25    operators::BroadcastOperators,
26    parser::Parser,
27    service::SocketIoService,
28    socket::RemoteSocket,
29};
30
31/// The parser to use to encode and decode socket.io packets
32///
33/// Be sure that the selected parser matches the client parser.
34#[derive(Debug, Clone)]
35pub struct ParserConfig(Parser);
36
37impl ParserConfig {
38    /// Use a [`CommonParser`] to parse incoming and outgoing socket.io packets
39    pub fn common() -> Self {
40        ParserConfig(Parser::Common(CommonParser))
41    }
42
43    /// Use a [`MsgPackParser`] to parse incoming and outgoing socket.io packets
44    #[cfg_attr(docsrs, doc(cfg(feature = "msgpack")))]
45    #[cfg(feature = "msgpack")]
46    pub fn msgpack() -> Self {
47        ParserConfig(Parser::MsgPack(MsgPackParser))
48    }
49}
50
51/// Configuration for Socket.IO & Engine.IO
52#[derive(Debug, Clone)]
53pub struct SocketIoConfig {
54    /// The inner Engine.IO config
55    pub engine_config: EngineIoConfig,
56
57    /// The amount of time the server will wait for an acknowledgement from the client before closing the connection.
58    ///
59    /// Defaults to 5 seconds.
60    pub ack_timeout: Duration,
61
62    /// The amount of time before disconnecting a client that has not successfully joined a namespace.
63    ///
64    /// Defaults to 45 seconds.
65    pub connect_timeout: Duration,
66
67    /// The parser to use to encode and decode socket.io packets
68    pub(crate) parser: Parser,
69
70    /// A global server identifier
71    pub server_id: Uid,
72}
73
74impl Default for SocketIoConfig {
75    fn default() -> Self {
76        Self {
77            engine_config: EngineIoConfig {
78                req_path: "/socket.io".into(),
79                ..Default::default()
80            },
81            ack_timeout: Duration::from_secs(5),
82            connect_timeout: Duration::from_secs(45),
83            parser: Parser::default(),
84            server_id: Uid::new(),
85        }
86    }
87}
88
89/// A builder to create a [`SocketIo`] instance.
90/// It contains everything to configure the socket.io server with a [`SocketIoConfig`].
91/// It can be used to build either a Tower [`Layer`](tower_layer::Layer) or a [`Service`](tower_service::Service).
92pub struct SocketIoBuilder<A: Adapter = LocalAdapter> {
93    config: SocketIoConfig,
94    engine_config_builder: EngineIoConfigBuilder,
95    adapter_state: A::State,
96    #[cfg(feature = "state")]
97    state: state::TypeMap![Send + Sync],
98}
99
100impl SocketIoBuilder<LocalAdapter> {
101    /// Creates a new [`SocketIoBuilder`] with default config
102    pub fn new() -> Self {
103        Self {
104            engine_config_builder: EngineIoConfigBuilder::new().req_path("/socket.io".to_string()),
105            config: SocketIoConfig::default(),
106            adapter_state: (),
107            #[cfg(feature = "state")]
108            state: std::default::Default::default(),
109        }
110    }
111}
112impl<A: Adapter> SocketIoBuilder<A> {
113    /// The path to listen for socket.io requests on.
114    ///
115    /// Defaults to "/socket.io".
116    #[inline]
117    pub fn req_path(mut self, req_path: impl Into<Cow<'static, str>>) -> Self {
118        self.engine_config_builder = self.engine_config_builder.req_path(req_path);
119        self
120    }
121
122    /// The interval at which the server will send a ping packet to the client.
123    ///
124    /// Defaults to 25 seconds.
125    #[inline]
126    pub fn ping_interval(mut self, ping_interval: Duration) -> Self {
127        self.engine_config_builder = self.engine_config_builder.ping_interval(ping_interval);
128        self
129    }
130
131    /// The amount of time the server will wait for a ping response from the client before closing the connection.
132    ///
133    /// Defaults to 20 seconds.
134    #[inline]
135    pub fn ping_timeout(mut self, ping_timeout: Duration) -> Self {
136        self.engine_config_builder = self.engine_config_builder.ping_timeout(ping_timeout);
137        self
138    }
139
140    /// The maximum number of packets that can be buffered per connection before being emitted to the client.
141    /// If the buffer if full the `emit()` method will return an error
142    ///
143    /// Defaults to 128 packets.
144    #[inline]
145    pub fn max_buffer_size(mut self, max_buffer_size: usize) -> Self {
146        self.engine_config_builder = self.engine_config_builder.max_buffer_size(max_buffer_size);
147        self
148    }
149
150    /// The maximum size of a payload in bytes.
151    /// If a payload is bigger than this value the `emit()` method will return an error.
152    ///
153    /// Defaults to 100 kb.
154    #[inline]
155    pub fn max_payload(mut self, max_payload: u64) -> Self {
156        self.engine_config_builder = self.engine_config_builder.max_payload(max_payload);
157        self
158    }
159
160    /// The size of the read buffer for the websocket transport.
161    /// You can tweak this value depending on your use case. Defaults to 4KiB.
162    ///
163    /// Setting it to a higher value will improve performance on heavy read scenarios
164    /// but will consume more memory.
165    #[inline]
166    pub fn ws_read_buffer_size(mut self, ws_read_buffer_size: usize) -> Self {
167        self.engine_config_builder = self
168            .engine_config_builder
169            .ws_read_buffer_size(ws_read_buffer_size);
170        self
171    }
172
173    /// Allowed transports on this server
174    ///
175    /// The `transports` array should have a size of 1 or 2
176    ///
177    /// Defaults to :
178    /// `[TransportType::Polling, TransportType::Websocket]`
179    #[inline]
180    pub fn transports<const N: usize>(mut self, transports: [TransportType; N]) -> Self {
181        self.engine_config_builder = self.engine_config_builder.transports(transports);
182        self
183    }
184
185    /// The amount of time the server will wait for an acknowledgement from the client before closing the connection.
186    ///
187    /// Defaults to 5 seconds.
188    #[inline]
189    pub fn ack_timeout(mut self, ack_timeout: Duration) -> Self {
190        self.config.ack_timeout = ack_timeout;
191        self
192    }
193
194    /// The amount of time before disconnecting a client that has not successfully joined a namespace.
195    ///
196    /// Defaults to 45 seconds.
197    #[inline]
198    pub fn connect_timeout(mut self, connect_timeout: Duration) -> Self {
199        self.config.connect_timeout = connect_timeout;
200        self
201    }
202
203    /// Sets a custom [`SocketIoConfig`] created previously for this [`SocketIoBuilder`]
204    #[inline]
205    pub fn with_config(mut self, config: SocketIoConfig) -> Self {
206        self.config = config;
207        self
208    }
209
210    /// Set a custom [`ParserConfig`] for this [`SocketIoBuilder`]
211    /// ```
212    /// # use socketioxide::{SocketIo, ParserConfig};
213    /// let (io, layer) = SocketIo::builder()
214    ///     .with_parser(ParserConfig::msgpack())
215    ///     .build_layer();
216    /// ```
217    #[inline]
218    pub fn with_parser(mut self, parser: ParserConfig) -> Self {
219        self.config.parser = parser.0;
220        self
221    }
222
223    /// Set a custom [`Adapter`] for this [`SocketIoBuilder`]
224    pub fn with_adapter<B: Adapter>(self, adapter_state: B::State) -> SocketIoBuilder<B> {
225        SocketIoBuilder {
226            config: self.config,
227            engine_config_builder: self.engine_config_builder,
228            adapter_state,
229            #[cfg(feature = "state")]
230            state: self.state,
231        }
232    }
233
234    /// Add a custom global state for the [`SocketIo`] instance.
235    /// This state will be accessible from every handler with the [`State`](crate::extract::State) extractor.
236    /// You can set any number of states as long as they have different types.
237    /// The state must be cloneable, therefore it is recommended to wrap it in an `Arc` if you want shared state.
238    #[inline]
239    #[cfg_attr(docsrs, doc(cfg(feature = "state")))]
240    #[cfg(feature = "state")]
241    pub fn with_state<S: Clone + Send + Sync + 'static>(self, state: S) -> Self {
242        self.state.set(state);
243        self
244    }
245}
246
247impl<A: Adapter> SocketIoBuilder<A> {
248    /// Build a [`SocketIoLayer`] and a [`SocketIo`] instance that can be used as a [`tower_layer::Layer`].
249    pub fn build_layer(mut self) -> (SocketIoLayer<A>, SocketIo<A>) {
250        self.config.engine_config = self.engine_config_builder.build();
251
252        let (layer, client) = SocketIoLayer::from_config(
253            self.config,
254            self.adapter_state,
255            #[cfg(feature = "state")]
256            self.state,
257        );
258        (layer, SocketIo(client))
259    }
260
261    /// Build a [`SocketIoService`] and a [`SocketIo`] instance that
262    /// can be used as a [`hyper::service::Service`] or a [`tower_service::Service`].
263    ///
264    /// This service will be a _standalone_ service that return a 404 error for every non-socket.io request
265    pub fn build_svc(mut self) -> (SocketIoService<NotFoundService, A>, SocketIo<A>) {
266        self.config.engine_config = self.engine_config_builder.build();
267        let (svc, client) = SocketIoService::with_config_inner(
268            NotFoundService,
269            self.config,
270            self.adapter_state,
271            #[cfg(feature = "state")]
272            self.state,
273        );
274        (svc, SocketIo(client))
275    }
276
277    /// Build a [`SocketIoService`] and a [`SocketIo`] instance with an inner service that
278    /// can be used as a [`hyper::service::Service`] or a [`tower_service::Service`].
279    pub fn build_with_inner_svc<S: Clone>(
280        mut self,
281        svc: S,
282    ) -> (SocketIoService<S, A>, SocketIo<A>) {
283        self.config.engine_config = self.engine_config_builder.build();
284
285        let (svc, client) = SocketIoService::with_config_inner(
286            svc,
287            self.config,
288            self.adapter_state,
289            #[cfg(feature = "state")]
290            self.state,
291        );
292        (svc, SocketIo(client))
293    }
294}
295
296impl Default for SocketIoBuilder {
297    fn default() -> Self {
298        Self::new()
299    }
300}
301
302/// The [`SocketIo`] instance can be cheaply cloned and moved around everywhere in your program.
303/// It can be used as the main handle to access the whole socket.io context.
304///
305/// You can also use it as an extractor for all your [`handlers`](crate::handler).
306///
307/// It is generic over the [`Adapter`] type. If you plan to use it with another adapter than the default,
308/// make sure to have a handler that is [generic over the adapter type](crate#adapters).
309pub struct SocketIo<A: Adapter = LocalAdapter>(Arc<Client<A>>);
310
311impl SocketIo<LocalAdapter> {
312    /// Create a new [`SocketIoBuilder`] with a default config
313    #[inline(always)]
314    pub fn builder() -> SocketIoBuilder {
315        SocketIoBuilder::new()
316    }
317
318    /// Create a new [`SocketIoService`] and a [`SocketIo`] instance with a default config.
319    /// This service will be a _standalone_ service that return a 404 error for every non-socket.io request.
320    /// It can be used as a [`Service`](tower_service::Service) (see hyper example)
321    #[inline(always)]
322    pub fn new_svc() -> (SocketIoService<NotFoundService>, SocketIo) {
323        Self::builder().build_svc()
324    }
325
326    /// Create a new [`SocketIoService`] and a [`SocketIo`] instance with a default config.
327    /// It can be used as a [`Service`](tower_service::Service) with an inner service
328    #[inline(always)]
329    pub fn new_inner_svc<S: Clone>(svc: S) -> (SocketIoService<S>, SocketIo) {
330        Self::builder().build_with_inner_svc(svc)
331    }
332
333    /// Build a [`SocketIoLayer`] and a [`SocketIo`] instance with a default config.
334    /// It can be used as a tower [`Layer`](tower_layer::Layer) (see axum example)
335    #[inline(always)]
336    pub fn new_layer() -> (SocketIoLayer, SocketIo) {
337        Self::builder().build_layer()
338    }
339}
340
341impl<A: Adapter> SocketIo<A> {
342    /// Return a reference to the [`SocketIoConfig`] used by this [`SocketIo`] instance
343    #[inline]
344    pub fn config(&self) -> &SocketIoConfig {
345        &self.0.config
346    }
347
348    /// # Register a [`ConnectHandler`] for the given dynamic namespace.
349    ///
350    /// You can specify dynamic parts in the path by using the `{name}` syntax.
351    /// Note that any static namespace will take precedence over a dynamic one.
352    ///
353    /// For more info about namespace routing, see the [matchit] router documentation.
354    ///
355    /// The dynamic namespace will create a child namespace for any path that matches the given pattern
356    /// with the given handler.
357    ///
358    /// * See the [`connect`](crate::handler::connect) module doc for more details on connect handler.
359    /// * See the [`extract`](crate::extract) module doc for more details on available extractors.
360    ///
361    /// ## Errors
362    /// If the pattern is invalid, a [`NsInsertError`](crate::NsInsertError) will be returned.
363    ///
364    /// ## Example
365    /// ```
366    /// # use socketioxide::{SocketIo, extract::SocketRef};
367    /// let (_, io) = SocketIo::new_svc();
368    /// io.dyn_ns("/client/{client_id}", async |socket: SocketRef| {
369    ///     println!("Socket connected on dynamic namespace with namespace path: {}", socket.ns());
370    /// }).unwrap();
371    ///
372    /// ```
373    /// ```
374    /// # use socketioxide::{SocketIo, extract::SocketRef};
375    /// let (_, io) = SocketIo::new_svc();
376    /// io.dyn_ns("/client/{*remaining_path}", async |socket: SocketRef| {
377    ///     println!("Socket connected on dynamic namespace with namespace path: {}", socket.ns());
378    /// }).unwrap();
379    ///
380    /// ```
381    #[inline]
382    pub fn dyn_ns<C, T>(
383        &self,
384        path: impl Into<String>,
385        callback: C,
386    ) -> Result<(), crate::NsInsertError>
387    where
388        C: ConnectHandler<A, T>,
389        T: Send + Sync + 'static,
390    {
391        self.0.add_dyn_ns(path.into(), callback)
392    }
393
394    /// # Delete the namespace with the given path.
395    ///
396    /// This will disconnect all sockets connected to this
397    /// namespace in a deferred way.
398    ///
399    /// # Panics
400    /// If the v4 protocol (legacy) is enabled and the namespace to delete is the default namespace "/".
401    /// For v4, the default namespace cannot be deleted.
402    /// See [official doc](https://socket.io/docs/v3/namespaces/#main-namespace) for more informations.
403    #[inline]
404    pub fn delete_ns(&self, path: impl AsRef<str>) {
405        self.0.delete_ns(path.as_ref());
406    }
407
408    /// # Gracefully close all the connections and drop every sockets
409    ///
410    /// Any `on_disconnect` handler will called with
411    /// [`DisconnectReason::ClosingServer`](crate::socket::DisconnectReason::ClosingServer)
412    #[inline]
413    pub async fn close(&self) {
414        self.0.close().await;
415    }
416
417    /// # Get all the namespaces to perform operations on.
418    ///
419    /// This will return a vector of [`BroadcastOperators`] for each namespace.
420    ///
421    /// # Example
422    /// ```
423    /// # use socketioxide::{SocketIo, extract::SocketRef};
424    /// let (_, io) = SocketIo::new_svc();
425    /// io.ns("/custom_ns", async |socket: SocketRef| {
426    ///     println!("Socket connected on /custom_ns namespace with id: {}", socket.id);
427    /// });
428    ///
429    /// // Later in your code you can get all the namespaces
430    /// for ns in io.nsps() {
431    ///     assert_eq!(ns.ns_path(), "/custom_ns");
432    /// }
433    /// ```
434    #[inline]
435    pub fn nsps(&self) -> Vec<BroadcastOperators<A>> {
436        let parser = self.0.parser();
437        self.0
438            .nsps
439            .read()
440            .unwrap()
441            .values()
442            .map(|ns| BroadcastOperators::new(ns.clone(), parser).broadcast())
443            .collect()
444    }
445
446    // Chaining operators fns
447
448    /// # Select a specific namespace to perform operations on.
449    ///
450    /// Currently you cannot select a dynamic namespace with this method.
451    ///
452    /// # Example
453    /// ```
454    /// # use socketioxide::{SocketIo, extract::SocketRef};
455    /// let (_, io) = SocketIo::new_svc();
456    /// io.ns("custom_ns", async |socket: SocketRef| {
457    ///     println!("Socket connected on /custom_ns namespace with id: {}", socket.id);
458    /// });
459    ///
460    /// // Later in your code you can select the custom_ns namespace
461    /// // and show all sockets connected to it
462    /// async fn test(io: SocketIo) {
463    ///     let sockets = io.of("custom_ns").unwrap().sockets();
464    ///     for socket in sockets {
465    ///        println!("found socket on /custom_ns namespace with id: {}", socket.id);
466    ///     }
467    /// }
468    /// ```
469    #[inline]
470    pub fn of(&self, path: impl AsRef<str>) -> Option<BroadcastOperators<A>> {
471        self.get_op(path.as_ref())
472    }
473
474    /// _Alias for `io.of("/").unwrap().to()`_. If the **default namespace "/" is not found** this fn will panic!
475    #[doc = include_str!("../docs/operators/to.md")]
476    #[inline]
477    pub fn to(&self, rooms: impl RoomParam) -> BroadcastOperators<A> {
478        self.get_default_op().to(rooms)
479    }
480
481    /// _Alias for `io.of("/").unwrap().within()`_. If the **default namespace "/" is not found** this fn will panic!
482    #[doc = include_str!("../docs/operators/within.md")]
483    #[inline]
484    pub fn within(&self, rooms: impl RoomParam) -> BroadcastOperators<A> {
485        self.get_default_op().within(rooms)
486    }
487
488    /// _Alias for `io.of("/").unwrap().except()`_. If the **default namespace "/" is not found** this fn will panic!
489    #[doc = include_str!("../docs/operators/except.md")]
490    #[inline]
491    pub fn except(&self, rooms: impl RoomParam) -> BroadcastOperators<A> {
492        self.get_default_op().except(rooms)
493    }
494
495    /// _Alias for `io.of("/").unwrap().local()`_. If the **default namespace "/" is not found** this fn will panic!
496    #[doc = include_str!("../docs/operators/local.md")]
497    #[inline]
498    pub fn local(&self) -> BroadcastOperators<A> {
499        self.get_default_op().local()
500    }
501
502    /// _Alias for `io.of("/").unwrap().timeout()`_. If the **default namespace "/" is not found** this fn will panic!
503    #[doc = include_str!("../docs/operators/timeout.md")]
504    #[inline]
505    pub fn timeout(&self, timeout: Duration) -> BroadcastOperators<A> {
506        self.get_default_op().timeout(timeout)
507    }
508
509    /// _Alias for `io.of("/").unwrap().emit()`_. If the **default namespace "/" is not found** this fn will panic!
510    #[doc = include_str!("../docs/operators/emit.md")]
511    #[inline]
512    pub async fn emit<T: ?Sized + Serialize>(
513        &self,
514        event: impl AsRef<str>,
515        data: &T,
516    ) -> Result<(), BroadcastError> {
517        self.get_default_op().emit(event, data).await
518    }
519
520    /// _Alias for `io.of("/").unwrap().emit_with_ack()`_. If the **default namespace "/" is not found** this fn will panic!
521    #[doc = include_str!("../docs/operators/emit_with_ack.md")]
522    #[inline]
523    pub async fn emit_with_ack<T: ?Sized + Serialize, V>(
524        &self,
525        event: impl AsRef<str>,
526        data: &T,
527    ) -> Result<AckStream<V, A>, EmitWithAckError> {
528        self.get_default_op().emit_with_ack(event, data).await
529    }
530
531    /// _Alias for `io.of("/").unwrap().sockets()`_. If the **default namespace "/" is not found** this fn will panic!
532    #[doc = include_str!("../docs/operators/sockets.md")]
533    #[inline]
534    pub fn sockets(&self) -> Vec<SocketRef<A>> {
535        self.get_default_op().sockets()
536    }
537
538    /// _Alias for `io.of("/").unwrap().fetch_sockets()`_. If the **default namespace "/" is not found** this fn will panic!
539    #[doc = include_str!("../docs/operators/fetch_sockets.md")]
540    #[inline]
541    pub async fn fetch_sockets(&self) -> Result<Vec<RemoteSocket<A>>, A::Error> {
542        self.get_default_op().fetch_sockets().await
543    }
544
545    /// _Alias for `io.of("/").unwrap().disconnect()`_. If the **default namespace "/" is not found** this fn will panic!
546    #[doc = include_str!("../docs/operators/disconnect.md")]
547    #[inline]
548    pub async fn disconnect(&self) -> Result<(), BroadcastError> {
549        self.get_default_op().disconnect().await
550    }
551
552    /// _Alias for `io.of("/").unwrap().join()`_. If the **default namespace "/" is not found** this fn will panic!
553    #[doc = include_str!("../docs/operators/join.md")]
554    #[inline]
555    pub async fn join(self, rooms: impl RoomParam) -> Result<(), A::Error> {
556        self.get_default_op().join(rooms).await
557    }
558
559    /// _Alias for `io.of("/").unwrap().rooms()`_. If the **default namespace "/" is not found** this fn will panic!
560    #[doc = include_str!("../docs/operators/rooms.md")]
561    pub async fn rooms(&self) -> Result<Vec<Room>, A::Error> {
562        self.get_default_op().rooms().await
563    }
564
565    /// _Alias for `io.of("/").unwrap().rooms()`_. If the **default namespace "/" is not found** this fn will panic!
566    #[doc = include_str!("../docs/operators/leave.md")]
567    #[inline]
568    pub async fn leave(self, rooms: impl RoomParam) -> Result<(), A::Error> {
569        self.get_default_op().leave(rooms).await
570    }
571
572    /// _Alias for `io.of("/").unwrap().get_socket()`_. If the **default namespace "/" is not found** this fn will panic!
573    #[doc = include_str!("../docs/operators/get_socket.md")]
574    #[inline]
575    pub fn get_socket(&self, sid: Sid) -> Option<SocketRef<A>> {
576        self.get_default_op().get_socket(sid)
577    }
578
579    /// _Alias for `io.of("/").unwrap().broadcast()`_. If the **default namespace "/" is not found** this fn will panic!
580    #[doc = include_str!("../docs/operators/broadcast.md")]
581    #[inline]
582    pub fn broadcast(&self) -> BroadcastOperators<A> {
583        self.get_default_op()
584    }
585
586    #[cfg(feature = "state")]
587    pub(crate) fn get_state<T: Clone + 'static>(&self) -> Option<T> {
588        self.0.state.try_get::<T>().cloned()
589    }
590
591    /// Returns a new operator on the given namespace
592    #[inline(always)]
593    fn get_op(&self, path: &str) -> Option<BroadcastOperators<A>> {
594        let parser = self.config().parser;
595        self.0
596            .get_ns(path)
597            .map(|ns| BroadcastOperators::new(ns, parser).broadcast())
598    }
599
600    /// Returns a new operator on the default namespace "/" (root namespace)
601    ///
602    /// # Panics
603    ///
604    /// If the **default namespace "/" is not found** this fn will panic!
605    #[inline(always)]
606    fn get_default_op(&self) -> BroadcastOperators<A> {
607        self.get_op("/").expect("default namespace not found")
608    }
609}
610
611// This private impl is used to ensure that the following methods
612// are only available on a *defined* adapter.
613#[allow(private_bounds)]
614impl<A: DefinedAdapter + Adapter> SocketIo<A> {
615    /// # Register a [`ConnectHandler`] for the given namespace
616    ///
617    /// * See the [`connect`](crate::handler::connect) module doc for more details on connect handler.
618    /// * See the [`extract`](crate::extract) module doc for more details on available extractors.
619    ///
620    /// # Simple example with an async closure:
621    /// ```
622    /// # use socketioxide::{SocketIo, extract::*};
623    /// # use serde::{Serialize, Deserialize};
624    /// #[derive(Debug, Serialize, Deserialize)]
625    /// struct MyData {
626    ///     name: String,
627    ///     age: u8,
628    /// }
629    ///
630    /// let (_, io) = SocketIo::new_svc();
631    /// io.ns("/", async |socket: SocketRef| {
632    ///     // Register a handler for the "test" event and extract the data as a `MyData` struct
633    ///     // With the Data extractor, the handler is called only if the data can be deserialized as a `MyData` struct
634    ///     // If you want to manage errors yourself you can use the TryData extractor
635    ///     socket.on("test", async |socket: SocketRef, Data::<MyData>(data)| {
636    ///         println!("Received a test message {:?}", data);
637    ///         socket.emit("test-test", &MyData { name: "Test".to_string(), age: 8 }).ok(); // Emit a message to the client
638    ///     });
639    /// });
640    ///
641    /// ```
642    ///
643    /// # Example with a closure and an acknowledgement + binary data:
644    /// ```
645    /// # use socketioxide::{SocketIo, extract::*};
646    /// # use serde_json::Value;
647    /// # use serde::{Serialize, Deserialize};
648    /// #[derive(Debug, Serialize, Deserialize)]
649    /// struct MyData {
650    ///     name: String,
651    ///     age: u8,
652    /// }
653    ///
654    /// let (_, io) = SocketIo::new_svc();
655    /// io.ns("/", async |socket: SocketRef| {
656    ///     // Register an async handler for the "test" event and extract the data as a `MyData` struct
657    ///     // Extract the binary payload as a `Vec<Bytes>` with the Bin extractor.
658    ///     // It should be the last extractor because it consumes the request
659    ///     socket.on("test", async |socket: SocketRef, Data::<MyData>(data), ack: AckSender| {
660    ///         println!("Received a test message {:?}", data);
661    ///         tokio::time::sleep(std::time::Duration::from_secs(1)).await;
662    ///         ack.send(&data).ok(); // The data received is sent back to the client through the ack
663    ///         socket.emit("test-test", &MyData { name: "Test".to_string(), age: 8 }).ok(); // Emit a message to the client
664    ///     });
665    /// });
666    /// ```
667    /// # Example with a closure and an authentication process:
668    /// ```
669    /// # use socketioxide::{SocketIo, extract::{SocketRef, Data}};
670    /// # use serde::{Serialize, Deserialize};
671    /// #[derive(Debug, Deserialize)]
672    /// struct MyAuthData {
673    ///     token: String,
674    /// }
675    /// #[derive(Debug, Serialize, Deserialize)]
676    /// struct MyData {
677    ///     name: String,
678    ///     age: u8,
679    /// }
680    ///
681    /// let (_, io) = SocketIo::new_svc();
682    /// io.ns("/", async |socket: SocketRef, Data(auth): Data<MyAuthData>| {
683    ///     if auth.token.is_empty() {
684    ///         println!("Invalid token, disconnecting");
685    ///         socket.disconnect().ok();
686    ///         return;
687    ///     }
688    ///     socket.on("test", async |socket: SocketRef, Data::<MyData>(data)| {
689    ///         println!("Received a test message {:?}", data);
690    ///         socket.emit("test-test", &MyData { name: "Test".to_string(), age: 8 }).ok(); // Emit a message to the client
691    ///     });
692    /// });
693    ///
694    /// ```
695    ///
696    /// # With remote adapters, this method is only available on a defined adapter:
697    /// ```compile_fail
698    /// # use socketioxide::{SocketIo};
699    /// // The SocketIo instance is generic over the adapter type.
700    /// async fn test<A: Adapter>(io: SocketIo<A>) {
701    ///     io.ns("/", async || ());
702    /// }
703    /// ```
704    /// ```
705    /// # use socketioxide::{SocketIo, adapter::LocalAdapter};
706    /// // The SocketIo instance is not generic over the adapter type.
707    /// async fn test(io: SocketIo<LocalAdapter>) {
708    ///     io.ns("/", async || ());
709    /// }
710    /// async fn test_default_adapter(io: SocketIo) {
711    ///     io.ns("/", async || ());
712    /// }
713    /// ```
714    pub fn ns<C, T>(&self, path: impl Into<Cow<'static, str>>, callback: C) -> A::InitRes
715    where
716        C: ConnectHandler<A, T>,
717        T: Send + Sync + 'static,
718    {
719        self.0.clone().add_ns(path.into(), callback)
720    }
721}
722
723impl<A: Adapter> fmt::Debug for SocketIo<A> {
724    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
725        f.debug_struct("SocketIo").field("client", &self.0).finish()
726    }
727}
728impl<A: Adapter> Clone for SocketIo<A> {
729    fn clone(&self) -> Self {
730        Self(self.0.clone())
731    }
732}
733impl<A: Adapter> From<Arc<Client<A>>> for SocketIo<A> {
734    fn from(client: Arc<Client<A>>) -> Self {
735        SocketIo(client)
736    }
737}
738
739#[doc(hidden)]
740#[cfg(feature = "__test_harness")]
741impl<A: Adapter> SocketIo<A> {
742    /// Create a dummy socket for testing purpose with a
743    /// receiver to get the packets sent to the client
744    pub async fn new_dummy_sock(
745        &self,
746        ns: &'static str,
747        auth: impl serde::Serialize,
748    ) -> (
749        tokio::sync::mpsc::Sender<engineioxide::Packet>,
750        tokio::sync::mpsc::Receiver<engineioxide::Packet>,
751    ) {
752        self.0.clone().new_dummy_sock(ns, auth).await
753    }
754}
755
756#[cfg(test)]
757mod tests {
758
759    use crate::client::SocketData;
760
761    use super::*;
762
763    #[test]
764    fn get_default_op() {
765        let (_, io) = SocketIo::new_svc();
766        io.ns("/", async || {});
767        let _ = io.get_default_op();
768    }
769
770    #[test]
771    #[should_panic(expected = "default namespace not found")]
772    fn get_default_op_panic() {
773        let (_, io) = SocketIo::new_svc();
774        let _ = io.get_default_op();
775    }
776
777    #[test]
778    fn get_op() {
779        let (_, io) = SocketIo::new_svc();
780        io.ns("test", async || {});
781        assert!(io.get_op("test").is_some());
782        assert!(io.get_op("test2").is_none());
783    }
784
785    #[tokio::test]
786    async fn get_socket_by_sid() {
787        use engineioxide::Socket;
788        let sid = Sid::new();
789        let (_, io) = SocketIo::new_svc();
790        io.ns("/", async || {});
791        let socket = Socket::<SocketData<LocalAdapter>>::new_dummy(sid, Box::new(|_, _| {}));
792        socket.data.io.set(io.clone()).unwrap();
793        io.0.get_ns("/")
794            .unwrap()
795            .connect(sid, socket, None)
796            .await
797            .ok();
798
799        assert!(io.get_socket(sid).is_some());
800        assert!(io.get_socket(Sid::new()).is_none());
801    }
802}