socketioxide/io.rs
1use std::{borrow::Cow, fmt, sync::Arc, time::Duration};
2
3use engineioxide::{
4 TransportType,
5 config::{EngineIoConfig, EngineIoConfigBuilder},
6 service::NotFoundService,
7};
8use serde::Serialize;
9use socketioxide_core::{
10 Sid, Uid,
11 adapter::{DefinedAdapter, Room, RoomParam},
12};
13use socketioxide_parser_common::CommonParser;
14#[cfg(feature = "msgpack")]
15use socketioxide_parser_msgpack::MsgPackParser;
16
17use crate::{
18 BroadcastError, EmitWithAckError,
19 ack::AckStream,
20 adapter::{Adapter, LocalAdapter},
21 client::Client,
22 extract::SocketRef,
23 handler::ConnectHandler,
24 layer::SocketIoLayer,
25 operators::BroadcastOperators,
26 parser::Parser,
27 service::SocketIoService,
28 socket::RemoteSocket,
29};
30
/// The parser to use to encode and decode socket.io packets
///
/// Be sure that the selected parser matches the client parser.
///
/// This is an opaque wrapper around the crate-internal `Parser`: create one with
/// [`ParserConfig::common`] (or `ParserConfig::msgpack` when the `msgpack` feature is enabled)
/// and pass it to [`SocketIoBuilder::with_parser`].
#[derive(Debug, Clone)]
pub struct ParserConfig(Parser);
36
37impl ParserConfig {
38 /// Use a [`CommonParser`] to parse incoming and outgoing socket.io packets
39 pub fn common() -> Self {
40 ParserConfig(Parser::Common(CommonParser))
41 }
42
43 /// Use a [`MsgPackParser`] to parse incoming and outgoing socket.io packets
44 #[cfg_attr(docsrs, doc(cfg(feature = "msgpack")))]
45 #[cfg(feature = "msgpack")]
46 pub fn msgpack() -> Self {
47 ParserConfig(Parser::MsgPack(MsgPackParser))
48 }
49}
50
/// Configuration for Socket.IO & Engine.IO
///
/// Usually assembled through a [`SocketIoBuilder`]. See the [`Default`] implementation
/// for the default values.
#[derive(Debug, Clone)]
pub struct SocketIoConfig {
    /// The inner Engine.IO config
    ///
    /// The default value uses `"/socket.io"` as request path.
    /// Note that every `build_*` method of [`SocketIoBuilder`] overwrites this field with the
    /// output of the builder's own Engine.IO config builder.
    pub engine_config: EngineIoConfig,

    /// The amount of time the server will wait for an acknowledgement from the client before closing the connection.
    ///
    /// Defaults to 5 seconds.
    pub ack_timeout: Duration,

    /// The amount of time before disconnecting a client that has not successfully joined a namespace.
    ///
    /// Defaults to 45 seconds.
    pub connect_timeout: Duration,

    /// The parser to use to encode and decode socket.io packets
    ///
    /// Crate-private: it is set from a [`ParserConfig`] through [`SocketIoBuilder::with_parser`].
    pub(crate) parser: Parser,

    /// A global server identifier
    ///
    /// The default configuration initializes it with `Uid::new()`.
    pub server_id: Uid,
}
73
74impl Default for SocketIoConfig {
75 fn default() -> Self {
76 Self {
77 engine_config: EngineIoConfig {
78 req_path: "/socket.io".into(),
79 ..Default::default()
80 },
81 ack_timeout: Duration::from_secs(5),
82 connect_timeout: Duration::from_secs(45),
83 parser: Parser::default(),
84 server_id: Uid::new(),
85 }
86 }
87}
88
/// A builder to create a [`SocketIo`] instance.
/// It contains everything to configure the socket.io server with a [`SocketIoConfig`].
/// It can be used to build either a Tower [`Layer`](tower_layer::Layer) or a [`Service`](tower_service::Service).
pub struct SocketIoBuilder<A: Adapter = LocalAdapter> {
    /// Socket.IO level settings (timeouts, parser, server id, ...).
    config: SocketIoConfig,
    /// Accumulates the Engine.IO settings. Its `build()` output replaces
    /// `config.engine_config` in every `build_*` method.
    engine_config_builder: EngineIoConfigBuilder,
    /// Adapter state handed over to the layer/service constructor at build time.
    adapter_state: A::State,
    /// Global state map filled by `with_state`. Only present with the `state` feature.
    #[cfg(feature = "state")]
    state: state::TypeMap![Send + Sync],
}
99
100impl SocketIoBuilder<LocalAdapter> {
101 /// Creates a new [`SocketIoBuilder`] with default config
102 pub fn new() -> Self {
103 Self {
104 engine_config_builder: EngineIoConfigBuilder::new().req_path("/socket.io".to_string()),
105 config: SocketIoConfig::default(),
106 adapter_state: (),
107 #[cfg(feature = "state")]
108 state: std::default::Default::default(),
109 }
110 }
111}
112impl<A: Adapter> SocketIoBuilder<A> {
113 /// The path to listen for socket.io requests on.
114 ///
115 /// Defaults to "/socket.io".
116 #[inline]
117 pub fn req_path(mut self, req_path: impl Into<Cow<'static, str>>) -> Self {
118 self.engine_config_builder = self.engine_config_builder.req_path(req_path);
119 self
120 }
121
122 /// The interval at which the server will send a ping packet to the client.
123 ///
124 /// Defaults to 25 seconds.
125 #[inline]
126 pub fn ping_interval(mut self, ping_interval: Duration) -> Self {
127 self.engine_config_builder = self.engine_config_builder.ping_interval(ping_interval);
128 self
129 }
130
131 /// The amount of time the server will wait for a ping response from the client before closing the connection.
132 ///
133 /// Defaults to 20 seconds.
134 #[inline]
135 pub fn ping_timeout(mut self, ping_timeout: Duration) -> Self {
136 self.engine_config_builder = self.engine_config_builder.ping_timeout(ping_timeout);
137 self
138 }
139
140 /// The maximum number of packets that can be buffered per connection before being emitted to the client.
/// If the buffer is full the `emit()` method will return an error
142 ///
143 /// Defaults to 128 packets.
144 #[inline]
145 pub fn max_buffer_size(mut self, max_buffer_size: usize) -> Self {
146 self.engine_config_builder = self.engine_config_builder.max_buffer_size(max_buffer_size);
147 self
148 }
149
150 /// The maximum size of a payload in bytes.
151 /// If a payload is bigger than this value the `emit()` method will return an error.
152 ///
153 /// Defaults to 100 kb.
154 #[inline]
155 pub fn max_payload(mut self, max_payload: u64) -> Self {
156 self.engine_config_builder = self.engine_config_builder.max_payload(max_payload);
157 self
158 }
159
160 /// The size of the read buffer for the websocket transport.
161 /// You can tweak this value depending on your use case. Defaults to 4KiB.
162 ///
163 /// Setting it to a higher value will improve performance on heavy read scenarios
164 /// but will consume more memory.
165 #[inline]
166 pub fn ws_read_buffer_size(mut self, ws_read_buffer_size: usize) -> Self {
167 self.engine_config_builder = self
168 .engine_config_builder
169 .ws_read_buffer_size(ws_read_buffer_size);
170 self
171 }
172
173 /// Allowed transports on this server
174 ///
175 /// The `transports` array should have a size of 1 or 2
176 ///
177 /// Defaults to :
178 /// `[TransportType::Polling, TransportType::Websocket]`
179 #[inline]
180 pub fn transports<const N: usize>(mut self, transports: [TransportType; N]) -> Self {
181 self.engine_config_builder = self.engine_config_builder.transports(transports);
182 self
183 }
184
185 /// The amount of time the server will wait for an acknowledgement from the client before closing the connection.
186 ///
187 /// Defaults to 5 seconds.
188 #[inline]
189 pub fn ack_timeout(mut self, ack_timeout: Duration) -> Self {
190 self.config.ack_timeout = ack_timeout;
191 self
192 }
193
194 /// The amount of time before disconnecting a client that has not successfully joined a namespace.
195 ///
196 /// Defaults to 45 seconds.
197 #[inline]
198 pub fn connect_timeout(mut self, connect_timeout: Duration) -> Self {
199 self.config.connect_timeout = connect_timeout;
200 self
201 }
202
203 /// Sets a custom [`SocketIoConfig`] created previously for this [`SocketIoBuilder`]
204 #[inline]
205 pub fn with_config(mut self, config: SocketIoConfig) -> Self {
206 self.config = config;
207 self
208 }
209
210 /// Set a custom [`ParserConfig`] for this [`SocketIoBuilder`]
211 /// ```
212 /// # use socketioxide::{SocketIo, ParserConfig};
/// let (layer, io) = SocketIo::builder()
214 /// .with_parser(ParserConfig::msgpack())
215 /// .build_layer();
216 /// ```
217 #[inline]
218 pub fn with_parser(mut self, parser: ParserConfig) -> Self {
219 self.config.parser = parser.0;
220 self
221 }
222
223 /// Set a custom [`Adapter`] for this [`SocketIoBuilder`]
224 pub fn with_adapter<B: Adapter>(self, adapter_state: B::State) -> SocketIoBuilder<B> {
225 SocketIoBuilder {
226 config: self.config,
227 engine_config_builder: self.engine_config_builder,
228 adapter_state,
229 #[cfg(feature = "state")]
230 state: self.state,
231 }
232 }
233
234 /// Add a custom global state for the [`SocketIo`] instance.
235 /// This state will be accessible from every handler with the [`State`](crate::extract::State) extractor.
236 /// You can set any number of states as long as they have different types.
237 /// The state must be cloneable, therefore it is recommended to wrap it in an `Arc` if you want shared state.
238 #[inline]
239 #[cfg_attr(docsrs, doc(cfg(feature = "state")))]
240 #[cfg(feature = "state")]
241 pub fn with_state<S: Clone + Send + Sync + 'static>(self, state: S) -> Self {
242 self.state.set(state);
243 self
244 }
245}
246
247impl<A: Adapter> SocketIoBuilder<A> {
248 /// Build a [`SocketIoLayer`] and a [`SocketIo`] instance that can be used as a [`tower_layer::Layer`].
249 pub fn build_layer(mut self) -> (SocketIoLayer<A>, SocketIo<A>) {
250 self.config.engine_config = self.engine_config_builder.build();
251
252 let (layer, client) = SocketIoLayer::from_config(
253 self.config,
254 self.adapter_state,
255 #[cfg(feature = "state")]
256 self.state,
257 );
258 (layer, SocketIo(client))
259 }
260
261 /// Build a [`SocketIoService`] and a [`SocketIo`] instance that
262 /// can be used as a [`hyper::service::Service`] or a [`tower_service::Service`].
263 ///
/// This service will be a _standalone_ service that returns a 404 error for every non-socket.io request
265 pub fn build_svc(mut self) -> (SocketIoService<NotFoundService, A>, SocketIo<A>) {
266 self.config.engine_config = self.engine_config_builder.build();
267 let (svc, client) = SocketIoService::with_config_inner(
268 NotFoundService,
269 self.config,
270 self.adapter_state,
271 #[cfg(feature = "state")]
272 self.state,
273 );
274 (svc, SocketIo(client))
275 }
276
277 /// Build a [`SocketIoService`] and a [`SocketIo`] instance with an inner service that
278 /// can be used as a [`hyper::service::Service`] or a [`tower_service::Service`].
279 pub fn build_with_inner_svc<S: Clone>(
280 mut self,
281 svc: S,
282 ) -> (SocketIoService<S, A>, SocketIo<A>) {
283 self.config.engine_config = self.engine_config_builder.build();
284
285 let (svc, client) = SocketIoService::with_config_inner(
286 svc,
287 self.config,
288 self.adapter_state,
289 #[cfg(feature = "state")]
290 self.state,
291 );
292 (svc, SocketIo(client))
293 }
294}
295
296impl Default for SocketIoBuilder {
297 fn default() -> Self {
298 Self::new()
299 }
300}
301
/// The [`SocketIo`] instance can be cheaply cloned and moved around everywhere in your program.
/// It can be used as the main handle to access the whole socket.io context.
///
/// Internally it only wraps an [`Arc`] of the shared client, so cloning it clones that [`Arc`].
///
/// You can also use it as an extractor for all your [`handlers`](crate::handler).
///
/// It is generic over the [`Adapter`] type. If you plan to use it with another adapter than the default,
/// make sure to have a handler that is [generic over the adapter type](crate#adapters).
pub struct SocketIo<A: Adapter = LocalAdapter>(Arc<Client<A>>);
310
311impl SocketIo<LocalAdapter> {
312 /// Create a new [`SocketIoBuilder`] with a default config
313 #[inline(always)]
314 pub fn builder() -> SocketIoBuilder {
315 SocketIoBuilder::new()
316 }
317
318 /// Create a new [`SocketIoService`] and a [`SocketIo`] instance with a default config.
319 /// This service will be a _standalone_ service that return a 404 error for every non-socket.io request.
320 /// It can be used as a [`Service`](tower_service::Service) (see hyper example)
321 #[inline(always)]
322 pub fn new_svc() -> (SocketIoService<NotFoundService>, SocketIo) {
323 Self::builder().build_svc()
324 }
325
326 /// Create a new [`SocketIoService`] and a [`SocketIo`] instance with a default config.
327 /// It can be used as a [`Service`](tower_service::Service) with an inner service
328 #[inline(always)]
329 pub fn new_inner_svc<S: Clone>(svc: S) -> (SocketIoService<S>, SocketIo) {
330 Self::builder().build_with_inner_svc(svc)
331 }
332
333 /// Build a [`SocketIoLayer`] and a [`SocketIo`] instance with a default config.
334 /// It can be used as a tower [`Layer`](tower_layer::Layer) (see axum example)
335 #[inline(always)]
336 pub fn new_layer() -> (SocketIoLayer, SocketIo) {
337 Self::builder().build_layer()
338 }
339}
340
341impl<A: Adapter> SocketIo<A> {
342 /// Return a reference to the [`SocketIoConfig`] used by this [`SocketIo`] instance
343 #[inline]
344 pub fn config(&self) -> &SocketIoConfig {
345 &self.0.config
346 }
347
348 /// # Register a [`ConnectHandler`] for the given dynamic namespace.
349 ///
350 /// You can specify dynamic parts in the path by using the `{name}` syntax.
351 /// Note that any static namespace will take precedence over a dynamic one.
352 ///
353 /// For more info about namespace routing, see the [matchit] router documentation.
354 ///
355 /// The dynamic namespace will create a child namespace for any path that matches the given pattern
356 /// with the given handler.
357 ///
358 /// * See the [`connect`](crate::handler::connect) module doc for more details on connect handler.
359 /// * See the [`extract`](crate::extract) module doc for more details on available extractors.
360 ///
361 /// ## Errors
362 /// If the pattern is invalid, a [`NsInsertError`](crate::NsInsertError) will be returned.
363 ///
364 /// ## Example
365 /// ```
366 /// # use socketioxide::{SocketIo, extract::SocketRef};
367 /// let (_, io) = SocketIo::new_svc();
368 /// io.dyn_ns("/client/{client_id}", async |socket: SocketRef| {
369 /// println!("Socket connected on dynamic namespace with namespace path: {}", socket.ns());
370 /// }).unwrap();
371 ///
372 /// ```
373 /// ```
374 /// # use socketioxide::{SocketIo, extract::SocketRef};
375 /// let (_, io) = SocketIo::new_svc();
376 /// io.dyn_ns("/client/{*remaining_path}", async |socket: SocketRef| {
377 /// println!("Socket connected on dynamic namespace with namespace path: {}", socket.ns());
378 /// }).unwrap();
379 ///
380 /// ```
381 #[inline]
382 pub fn dyn_ns<C, T>(
383 &self,
384 path: impl Into<String>,
385 callback: C,
386 ) -> Result<(), crate::NsInsertError>
387 where
388 C: ConnectHandler<A, T>,
389 T: Send + Sync + 'static,
390 {
391 self.0.add_dyn_ns(path.into(), callback)
392 }
393
394 /// # Delete the namespace with the given path.
395 ///
396 /// This will disconnect all sockets connected to this
397 /// namespace in a deferred way.
398 ///
399 /// # Panics
400 /// If the v4 protocol (legacy) is enabled and the namespace to delete is the default namespace "/".
401 /// For v4, the default namespace cannot be deleted.
/// See [official doc](https://socket.io/docs/v3/namespaces/#main-namespace) for more information.
403 #[inline]
404 pub fn delete_ns(&self, path: impl AsRef<str>) {
405 self.0.delete_ns(path.as_ref());
406 }
407
408 /// # Gracefully close all the connections and drop every sockets
409 ///
410 /// Any `on_disconnect` handler will called with
411 /// [`DisconnectReason::ClosingServer`](crate::socket::DisconnectReason::ClosingServer)
412 #[inline]
413 pub async fn close(&self) {
414 self.0.close().await;
415 }
416
417 /// # Get all the namespaces to perform operations on.
418 ///
419 /// This will return a vector of [`BroadcastOperators`] for each namespace.
420 ///
421 /// # Example
422 /// ```
423 /// # use socketioxide::{SocketIo, extract::SocketRef};
424 /// let (_, io) = SocketIo::new_svc();
425 /// io.ns("/custom_ns", async |socket: SocketRef| {
426 /// println!("Socket connected on /custom_ns namespace with id: {}", socket.id);
427 /// });
428 ///
429 /// // Later in your code you can get all the namespaces
430 /// for ns in io.nsps() {
431 /// assert_eq!(ns.ns_path(), "/custom_ns");
432 /// }
433 /// ```
434 #[inline]
435 pub fn nsps(&self) -> Vec<BroadcastOperators<A>> {
436 let parser = self.0.parser();
437 self.0
438 .nsps
439 .read()
440 .unwrap()
441 .values()
442 .map(|ns| BroadcastOperators::new(ns.clone(), parser).broadcast())
443 .collect()
444 }
445
446 // Chaining operators fns
447
448 /// # Select a specific namespace to perform operations on.
449 ///
450 /// Currently you cannot select a dynamic namespace with this method.
451 ///
452 /// # Example
453 /// ```
454 /// # use socketioxide::{SocketIo, extract::SocketRef};
455 /// let (_, io) = SocketIo::new_svc();
456 /// io.ns("custom_ns", async |socket: SocketRef| {
457 /// println!("Socket connected on /custom_ns namespace with id: {}", socket.id);
458 /// });
459 ///
460 /// // Later in your code you can select the custom_ns namespace
461 /// // and show all sockets connected to it
462 /// async fn test(io: SocketIo) {
463 /// let sockets = io.of("custom_ns").unwrap().sockets();
464 /// for socket in sockets {
465 /// println!("found socket on /custom_ns namespace with id: {}", socket.id);
466 /// }
467 /// }
468 /// ```
469 #[inline]
470 pub fn of(&self, path: impl AsRef<str>) -> Option<BroadcastOperators<A>> {
471 self.get_op(path.as_ref())
472 }
473
474 /// _Alias for `io.of("/").unwrap().to()`_. If the **default namespace "/" is not found** this fn will panic!
475 #[doc = include_str!("../docs/operators/to.md")]
476 #[inline]
477 pub fn to(&self, rooms: impl RoomParam) -> BroadcastOperators<A> {
478 self.get_default_op().to(rooms)
479 }
480
481 /// _Alias for `io.of("/").unwrap().within()`_. If the **default namespace "/" is not found** this fn will panic!
482 #[doc = include_str!("../docs/operators/within.md")]
483 #[inline]
484 pub fn within(&self, rooms: impl RoomParam) -> BroadcastOperators<A> {
485 self.get_default_op().within(rooms)
486 }
487
488 /// _Alias for `io.of("/").unwrap().except()`_. If the **default namespace "/" is not found** this fn will panic!
489 #[doc = include_str!("../docs/operators/except.md")]
490 #[inline]
491 pub fn except(&self, rooms: impl RoomParam) -> BroadcastOperators<A> {
492 self.get_default_op().except(rooms)
493 }
494
495 /// _Alias for `io.of("/").unwrap().local()`_. If the **default namespace "/" is not found** this fn will panic!
496 #[doc = include_str!("../docs/operators/local.md")]
497 #[inline]
498 pub fn local(&self) -> BroadcastOperators<A> {
499 self.get_default_op().local()
500 }
501
502 /// _Alias for `io.of("/").unwrap().timeout()`_. If the **default namespace "/" is not found** this fn will panic!
503 #[doc = include_str!("../docs/operators/timeout.md")]
504 #[inline]
505 pub fn timeout(&self, timeout: Duration) -> BroadcastOperators<A> {
506 self.get_default_op().timeout(timeout)
507 }
508
509 /// _Alias for `io.of("/").unwrap().emit()`_. If the **default namespace "/" is not found** this fn will panic!
510 #[doc = include_str!("../docs/operators/emit.md")]
511 #[inline]
512 pub async fn emit<T: ?Sized + Serialize>(
513 &self,
514 event: impl AsRef<str>,
515 data: &T,
516 ) -> Result<(), BroadcastError> {
517 self.get_default_op().emit(event, data).await
518 }
519
520 /// _Alias for `io.of("/").unwrap().emit_with_ack()`_. If the **default namespace "/" is not found** this fn will panic!
521 #[doc = include_str!("../docs/operators/emit_with_ack.md")]
522 #[inline]
523 pub async fn emit_with_ack<T: ?Sized + Serialize, V>(
524 &self,
525 event: impl AsRef<str>,
526 data: &T,
527 ) -> Result<AckStream<V, A>, EmitWithAckError> {
528 self.get_default_op().emit_with_ack(event, data).await
529 }
530
531 /// _Alias for `io.of("/").unwrap().sockets()`_. If the **default namespace "/" is not found** this fn will panic!
532 #[doc = include_str!("../docs/operators/sockets.md")]
533 #[inline]
534 pub fn sockets(&self) -> Vec<SocketRef<A>> {
535 self.get_default_op().sockets()
536 }
537
538 /// _Alias for `io.of("/").unwrap().fetch_sockets()`_. If the **default namespace "/" is not found** this fn will panic!
539 #[doc = include_str!("../docs/operators/fetch_sockets.md")]
540 #[inline]
541 pub async fn fetch_sockets(&self) -> Result<Vec<RemoteSocket<A>>, A::Error> {
542 self.get_default_op().fetch_sockets().await
543 }
544
545 /// _Alias for `io.of("/").unwrap().disconnect()`_. If the **default namespace "/" is not found** this fn will panic!
546 #[doc = include_str!("../docs/operators/disconnect.md")]
547 #[inline]
548 pub async fn disconnect(&self) -> Result<(), BroadcastError> {
549 self.get_default_op().disconnect().await
550 }
551
552 /// _Alias for `io.of("/").unwrap().join()`_. If the **default namespace "/" is not found** this fn will panic!
553 #[doc = include_str!("../docs/operators/join.md")]
554 #[inline]
555 pub async fn join(self, rooms: impl RoomParam) -> Result<(), A::Error> {
556 self.get_default_op().join(rooms).await
557 }
558
559 /// _Alias for `io.of("/").unwrap().rooms()`_. If the **default namespace "/" is not found** this fn will panic!
560 #[doc = include_str!("../docs/operators/rooms.md")]
561 pub async fn rooms(&self) -> Result<Vec<Room>, A::Error> {
562 self.get_default_op().rooms().await
563 }
564
/// _Alias for `io.of("/").unwrap().leave()`_. If the **default namespace "/" is not found** this fn will panic!
566 #[doc = include_str!("../docs/operators/leave.md")]
567 #[inline]
568 pub async fn leave(self, rooms: impl RoomParam) -> Result<(), A::Error> {
569 self.get_default_op().leave(rooms).await
570 }
571
572 /// _Alias for `io.of("/").unwrap().get_socket()`_. If the **default namespace "/" is not found** this fn will panic!
573 #[doc = include_str!("../docs/operators/get_socket.md")]
574 #[inline]
575 pub fn get_socket(&self, sid: Sid) -> Option<SocketRef<A>> {
576 self.get_default_op().get_socket(sid)
577 }
578
579 /// _Alias for `io.of("/").unwrap().broadcast()`_. If the **default namespace "/" is not found** this fn will panic!
580 #[doc = include_str!("../docs/operators/broadcast.md")]
581 #[inline]
582 pub fn broadcast(&self) -> BroadcastOperators<A> {
583 self.get_default_op()
584 }
585
586 #[cfg(feature = "state")]
587 pub(crate) fn get_state<T: Clone + 'static>(&self) -> Option<T> {
588 self.0.state.try_get::<T>().cloned()
589 }
590
591 /// Returns a new operator on the given namespace
592 #[inline(always)]
593 fn get_op(&self, path: &str) -> Option<BroadcastOperators<A>> {
594 let parser = self.config().parser;
595 self.0
596 .get_ns(path)
597 .map(|ns| BroadcastOperators::new(ns, parser).broadcast())
598 }
599
600 /// Returns a new operator on the default namespace "/" (root namespace)
601 ///
602 /// # Panics
603 ///
604 /// If the **default namespace "/" is not found** this fn will panic!
605 #[inline(always)]
606 fn get_default_op(&self) -> BroadcastOperators<A> {
607 self.get_op("/").expect("default namespace not found")
608 }
609}
610
611// This private impl is used to ensure that the following methods
612// are only available on a *defined* adapter.
613#[allow(private_bounds)]
614impl<A: DefinedAdapter + Adapter> SocketIo<A> {
615 /// # Register a [`ConnectHandler`] for the given namespace
616 ///
617 /// * See the [`connect`](crate::handler::connect) module doc for more details on connect handler.
618 /// * See the [`extract`](crate::extract) module doc for more details on available extractors.
619 ///
620 /// # Simple example with an async closure:
621 /// ```
622 /// # use socketioxide::{SocketIo, extract::*};
623 /// # use serde::{Serialize, Deserialize};
624 /// #[derive(Debug, Serialize, Deserialize)]
625 /// struct MyData {
626 /// name: String,
627 /// age: u8,
628 /// }
629 ///
630 /// let (_, io) = SocketIo::new_svc();
631 /// io.ns("/", async |socket: SocketRef| {
632 /// // Register a handler for the "test" event and extract the data as a `MyData` struct
633 /// // With the Data extractor, the handler is called only if the data can be deserialized as a `MyData` struct
634 /// // If you want to manage errors yourself you can use the TryData extractor
635 /// socket.on("test", async |socket: SocketRef, Data::<MyData>(data)| {
636 /// println!("Received a test message {:?}", data);
637 /// socket.emit("test-test", &MyData { name: "Test".to_string(), age: 8 }).ok(); // Emit a message to the client
638 /// });
639 /// });
640 ///
641 /// ```
642 ///
643 /// # Example with a closure and an acknowledgement + binary data:
644 /// ```
645 /// # use socketioxide::{SocketIo, extract::*};
646 /// # use serde_json::Value;
647 /// # use serde::{Serialize, Deserialize};
648 /// #[derive(Debug, Serialize, Deserialize)]
649 /// struct MyData {
650 /// name: String,
651 /// age: u8,
652 /// }
653 ///
654 /// let (_, io) = SocketIo::new_svc();
655 /// io.ns("/", async |socket: SocketRef| {
656 /// // Register an async handler for the "test" event and extract the data as a `MyData` struct
657 /// // Extract the binary payload as a `Vec<Bytes>` with the Bin extractor.
658 /// // It should be the last extractor because it consumes the request
659 /// socket.on("test", async |socket: SocketRef, Data::<MyData>(data), ack: AckSender| {
660 /// println!("Received a test message {:?}", data);
661 /// tokio::time::sleep(std::time::Duration::from_secs(1)).await;
662 /// ack.send(&data).ok(); // The data received is sent back to the client through the ack
663 /// socket.emit("test-test", &MyData { name: "Test".to_string(), age: 8 }).ok(); // Emit a message to the client
664 /// });
665 /// });
666 /// ```
667 /// # Example with a closure and an authentication process:
668 /// ```
669 /// # use socketioxide::{SocketIo, extract::{SocketRef, Data}};
670 /// # use serde::{Serialize, Deserialize};
671 /// #[derive(Debug, Deserialize)]
672 /// struct MyAuthData {
673 /// token: String,
674 /// }
675 /// #[derive(Debug, Serialize, Deserialize)]
676 /// struct MyData {
677 /// name: String,
678 /// age: u8,
679 /// }
680 ///
681 /// let (_, io) = SocketIo::new_svc();
682 /// io.ns("/", async |socket: SocketRef, Data(auth): Data<MyAuthData>| {
683 /// if auth.token.is_empty() {
684 /// println!("Invalid token, disconnecting");
685 /// socket.disconnect().ok();
686 /// return;
687 /// }
688 /// socket.on("test", async |socket: SocketRef, Data::<MyData>(data)| {
689 /// println!("Received a test message {:?}", data);
690 /// socket.emit("test-test", &MyData { name: "Test".to_string(), age: 8 }).ok(); // Emit a message to the client
691 /// });
692 /// });
693 ///
694 /// ```
695 ///
696 /// # With remote adapters, this method is only available on a defined adapter:
697 /// ```compile_fail
698 /// # use socketioxide::{SocketIo};
699 /// // The SocketIo instance is generic over the adapter type.
700 /// async fn test<A: Adapter>(io: SocketIo<A>) {
701 /// io.ns("/", async || ());
702 /// }
703 /// ```
704 /// ```
705 /// # use socketioxide::{SocketIo, adapter::LocalAdapter};
706 /// // The SocketIo instance is not generic over the adapter type.
707 /// async fn test(io: SocketIo<LocalAdapter>) {
708 /// io.ns("/", async || ());
709 /// }
710 /// async fn test_default_adapter(io: SocketIo) {
711 /// io.ns("/", async || ());
712 /// }
713 /// ```
714 pub fn ns<C, T>(&self, path: impl Into<Cow<'static, str>>, callback: C) -> A::InitRes
715 where
716 C: ConnectHandler<A, T>,
717 T: Send + Sync + 'static,
718 {
719 self.0.clone().add_ns(path.into(), callback)
720 }
721}
722
723impl<A: Adapter> fmt::Debug for SocketIo<A> {
724 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
725 f.debug_struct("SocketIo").field("client", &self.0).finish()
726 }
727}
728impl<A: Adapter> Clone for SocketIo<A> {
729 fn clone(&self) -> Self {
730 Self(self.0.clone())
731 }
732}
733impl<A: Adapter> From<Arc<Client<A>>> for SocketIo<A> {
734 fn from(client: Arc<Client<A>>) -> Self {
735 SocketIo(client)
736 }
737}
738
#[doc(hidden)]
#[cfg(feature = "__test_harness")]
impl<A: Adapter> SocketIo<A> {
    /// Test helper: creates a dummy socket on the given namespace and returns the
    /// sender/receiver pair of engine.io packets provided by the client.
    pub async fn new_dummy_sock(
        &self,
        ns: &'static str,
        auth: impl serde::Serialize,
    ) -> (
        tokio::sync::mpsc::Sender<engineioxide::Packet>,
        tokio::sync::mpsc::Receiver<engineioxide::Packet>,
    ) {
        // The client method is called on an owned handle.
        let client = Arc::clone(&self.0);
        client.new_dummy_sock(ns, auth).await
    }
}
755
#[cfg(test)]
mod tests {
    use super::*;
    use crate::client::SocketData;

    /// The default operators are available once the "/" namespace is registered.
    #[test]
    fn get_default_op() {
        let (_, io) = SocketIo::new_svc();
        io.ns("/", async || {});
        drop(io.get_default_op());
    }

    /// Without a "/" namespace, asking for the default operators panics.
    #[test]
    #[should_panic(expected = "default namespace not found")]
    fn get_default_op_panic() {
        let (_, io) = SocketIo::new_svc();
        drop(io.get_default_op());
    }

    /// Only registered namespaces yield operators.
    #[test]
    fn get_op() {
        let (_, io) = SocketIo::new_svc();
        io.ns("test", async || {});

        let registered = io.get_op("test");
        assert!(registered.is_some());

        let unknown = io.get_op("test2");
        assert!(unknown.is_none());
    }

    /// A socket connected to the default namespace can be found by its sid.
    #[tokio::test]
    async fn get_socket_by_sid() {
        use engineioxide::Socket;

        let sid = Sid::new();
        let (_, io) = SocketIo::new_svc();
        io.ns("/", async || {});

        let socket = Socket::<SocketData<LocalAdapter>>::new_dummy(sid, Box::new(|_, _| {}));
        socket.data.io.set(io.clone()).unwrap();

        let root_ns = io.0.get_ns("/").unwrap();
        root_ns.connect(sid, socket, None).await.ok();

        let known = io.get_socket(sid);
        assert!(known.is_some());
        let unknown = io.get_socket(Sid::new());
        assert!(unknown.is_none());
    }
}
802}