socketioxide_emitter/
lib.rs

1#![doc(
2    html_logo_url = "https://raw.githubusercontent.com/Totodore/socketioxide/refs/heads/main/.github/logo_dark.svg"
3)]
4#![doc(
5    html_favicon_url = "https://raw.githubusercontent.com/Totodore/socketioxide/refs/heads/main/.github/logo_dark.ico"
6)]
7#![cfg_attr(docsrs, feature(doc_auto_cfg))]
8#![warn(
9    clippy::all,
10    clippy::todo,
11    clippy::empty_enum,
12    clippy::mem_forget,
13    clippy::unused_self,
14    clippy::filter_map_next,
15    clippy::needless_continue,
16    clippy::needless_borrow,
17    clippy::match_wildcard_for_single_variants,
18    clippy::if_let_mutex,
19    clippy::await_holding_lock,
20    clippy::match_on_vec_items,
21    clippy::imprecise_flops,
22    clippy::suboptimal_flops,
23    clippy::lossy_float_literal,
24    clippy::rest_pat_in_fully_bound_structs,
25    clippy::fn_params_excessive_bools,
26    clippy::exit,
27    clippy::inefficient_to_string,
28    clippy::linkedlist,
29    clippy::macro_use_imports,
30    clippy::option_option,
31    clippy::verbose_file_reads,
32    clippy::unnested_or_patterns,
33    rust_2018_idioms,
34    future_incompatible,
35    nonstandard_style,
36    missing_docs
37)]
38//! The Socketioxide Emitter crate allows you to easily communicate with a group of Socket.IO servers
39//! from another rust process (server-side). It must be used in conjunction with [socketioxide-redis](https://docs.rs/socketioxide-redis).
40//!
41//! <div class="warning">
42//!     Socketioxide Emitter is not compatible with <code>@socketio/redis-adapter</code>
43//!     and <code>@socketio/redis-emitter</code>. They use completely different protocols and
44//!     cannot be used together. Do not mix socket.io JS servers with socketioxide rust servers.
45//!     If you are looking for a way to emit events to a cluster of node socket.io servers in rust,
46//!     you should use the <a href="https://github.com/epli2/socketio-rust-emitter">socketio-rust-emitter</a> package.
47//! </div>
48//!
49//! # Diagram taken from the socket.io documentation:
50//! <img src="https://raw.githubusercontent.com/socketio/socket.io-redis-emitter/refs/heads/main/assets/emitter.png" width="600" />
51//!
52//! # Features and parsers
//! The emitter supports two parsers: Common and MessagePack. You can enable/disable them with the `common-parser`
//! and `msgpack-parser` feature flags. If you disable both parser features, you won't be able to emit events;
//! it will only be possible to manipulate sockets (join/leave rooms, disconnect).
56//!
57//! # Emit cheat sheet (example with redis)
58//! ```no_run
59//! use redis::{AsyncCommands, aio::MultiplexedConnection};
60//! use socketioxide_emitter::{Driver, IoEmitter};
61//!
62//! struct RedisConnection(MultiplexedConnection);
63//! impl Driver for RedisConnection {
64//!     type Error = redis::RedisError;
65//!
66//!     async fn emit(&self, channel: String, data: Vec<u8>) -> Result<(), Self::Error> {
67//!         self.0
68//!             .clone()
69//!             .publish::<_, _, redis::Value>(channel, data)
70//!             .await?;
71//!         Ok(())
72//!     }
73//! }
74//!
75//! #[tokio::main]
76//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
77//!     let client = redis::Client::open("redis://127.0.0.1").unwrap();
78//!     let conn = client.get_multiplexed_tokio_connection().await?;
79//!     let conn = RedisConnection(conn);
80//!     // sending to all clients
81//!     IoEmitter::new().emit("event", "hello!", &conn).await?;
82//!
83//!     // sending to all clients in 'room1' room
84//!     IoEmitter::new().to("room1").emit("event", "message", &conn).await?;
85//!
86//!     // sending to all clients in 'room1' except those in 'room2'
87//!     IoEmitter::new().to("room1").except("room2").emit("event", "message", &conn).await?;
88//!
89//!     // sending to individual socketid (private message).
90//!     // (You will have to make the socket join a room corresponding to its id when it connects.)
91//!     IoEmitter::new().to("tK3lxSproMuTbioPAAAB").emit("event", "message", &conn).await?;
92//!
93//!     let nsp = IoEmitter::new().of("/admin");
94//!
95//!     // sending to all clients in 'admin' namespace
96//!     nsp.clone().emit("event", "message", &conn).await?;
97//!
98//!     // sending to all clients in 'admin' namespace and in 'notifications' room
99//!     nsp.to("notifications").emit("event", "message", &conn).await?;
100//!
101//!     let msgpack = IoEmitter::new_msgpack();
102//!
103//!     // sending to all clients and encode message with the msgpack format.
104//!     msgpack.clone().emit("event", "message", &conn).await?;
105//!
106//!     // sending to all clients in 'notifications' room and encode message with the msgpack format.
107//!     msgpack.to("notifications").emit("event", "message", &conn).await?;
108//!
109//!     Ok(())
//! }
//! ```
111use requests::{Request, RequestType};
112use socketioxide_core::{
113    Str,
114    adapter::{BroadcastFlags, BroadcastOptions, RoomParam},
115};
116
117mod requests;
118
119#[cfg(any(feature = "msgpack-parser", feature = "common-parser"))]
120mod emit;
121#[cfg(any(feature = "msgpack-parser", feature = "common-parser"))]
122pub use emit::EmitError;
123
/// Bridge between the socket.io emitter and the transport that carries its messages.
/// Implement it for the client of whichever
/// [`Adapter`](https://docs.rs/socketioxide/latest/socketioxide/#adapters) backend your servers use.
///
/// For a redis emitter, the implementation publishes the payload on a pubsub channel.
/// The sole requirement is being able to send raw bytes to a named channel.
///
/// # Example with the [redis](https://docs.rs/redis) crate
/// ```
/// use redis::{AsyncCommands, aio::MultiplexedConnection};
/// use socketioxide_emitter::{Driver, IoEmitter};
///
/// struct RedisConnection(MultiplexedConnection);
/// impl Driver for RedisConnection {
///     type Error = redis::RedisError;
///
///     async fn emit(&self, channel: String, data: Vec<u8>) -> Result<(), Self::Error> {
///         self.0
///             .clone()
///             .publish::<_, _, redis::Value>(channel, data)
///             .await?;
///         Ok(())
///     }
/// }
/// ```
///
/// # Example with the [fred](https://docs.rs/fred) crate
/// ```
/// use fred::{
///     clients::SubscriberClient,
///     prelude::{ClientLike, PubsubInterface},
/// };
/// use socketioxide_emitter::{Driver, IoEmitter};
///
/// struct FredConnection(SubscriberClient);
/// impl Driver for FredConnection {
///     type Error = fred::error::Error;
///
///     async fn emit(&self, channel: String, data: Vec<u8>) -> Result<(), Self::Error> {
///         self.0.publish::<u16, _, _>(channel, data).await?;
///         Ok(())
///     }
/// }
/// ```
pub trait Driver {
    /// Error reported by the driver when a message could not be sent.
    type Error: std::error::Error;

    /// Sends the `data` payload on the channel named `channel`.
    fn emit(
        &self,
        channel: String,
        data: Vec<u8>,
    ) -> impl Future<Output = Result<(), Self::Error>>;
}
175
176/// The [`IoEmitter`] is the main structure for emitting events to a socket.io cluster.
177/// It provides a convenient way to broadcast events to all connected nodes and clients.
178/// It acts as a simple builder for creating socket.io messages to send through the driver.
179#[derive(Clone, Debug)]
180pub struct IoEmitter {
181    opts: BroadcastOptions,
182    ns: Str,
183    prefix: Option<String>,
184    #[cfg(any(feature = "common-parser", feature = "msgpack-parser"))]
185    parser: emit::Parser,
186}
187
188impl Default for IoEmitter {
189    fn default() -> Self {
190        let mut io = Self {
191            opts: Default::default(),
192            ns: Str::from("/"),
193            prefix: None,
194            #[cfg(any(feature = "common-parser", feature = "msgpack-parser"))]
195            parser: emit::Parser::default(),
196        };
197        io.opts.add_flag(BroadcastFlags::Broadcast);
198        io
199    }
200}
201
202impl IoEmitter {
203    /// Creates a new [`IoEmitter`] with the default settings.
204    pub fn new() -> Self {
205        Self::default()
206    }
207    /// Creates a new [`IoEmitter`] with the msgpack parser.
208    #[cfg(feature = "msgpack-parser")]
209    pub fn new_msgpack() -> Self {
210        Self {
211            parser: emit::Parser::MsgPack,
212            ..Default::default()
213        }
214    }
215    /// Sets the namespace for this [`IoEmitter`]. By default, the namespace is set to `/`.
216    pub fn of(mut self, ns: impl Into<Str>) -> IoEmitter {
217        self.ns = ns.into();
218        self
219    }
220    /// Sets the rooms for this [`IoEmitter`]. By default, events are sent to all rooms.
221    pub fn to(mut self, rooms: impl RoomParam) -> IoEmitter {
222        self.opts.rooms.extend(rooms.into_room_iter());
223        self
224    }
225    /// Alias for [`IoEmitter::to`].
226    pub fn within(self, rooms: impl RoomParam) -> IoEmitter {
227        self.to(rooms)
228    }
229    /// Excludes the specified rooms.
230    pub fn except(mut self, rooms: impl RoomParam) -> IoEmitter {
231        self.opts.except.extend(rooms.into_room_iter());
232        self
233    }
234    /// You may have set a custom prefix on your adapter config,
235    /// which will be used as a prefix for the channel name.
236    /// By default, the prefix is `socket.io`.
237    pub fn prefix(mut self, prefix: impl Into<String>) -> IoEmitter {
238        self.prefix = Some(prefix.into());
239        self
240    }
241}
242
243impl IoEmitter {
244    /// Makes the selected sockets join the specified rooms.
245    ///
246    /// # Example
247    ///
248    /// ```ignore
249    /// // Makes the sockets in the root namespace and in the room1 and room2, join the room4.
250    /// IoEmitter::new()
251    ///     .to(["room1", "room2"])
252    ///     .join("room4", &driver)
253    ///     .await?;
254    /// ```
255    pub async fn join<D: Driver>(self, rooms: impl RoomParam, driver: &D) -> Result<(), D::Error> {
256        let rooms = rooms.into_room_iter().collect();
257        let chan = self.get_channel();
258        let data = serialize(self.opts, RequestType::AddSockets(rooms));
259        driver.emit(chan, data).await
260    }
261    /// Makes the selected sockets leave the specified rooms.
262    ///
263    /// # Example
264    ///
265    /// ```ignore
266    /// // Makes the sockets in the root namespace and in the room1 and room2, leave the room4.
267    /// IoEmitter::new()
268    ///     .to(["room1", "room2"])
269    ///     .leave("room4", &driver)
270    ///     .await?;
271    /// ```
272    pub async fn leave<D: Driver>(self, rooms: impl RoomParam, driver: &D) -> Result<(), D::Error> {
273        let rooms = rooms.into_room_iter().collect();
274        let chan = self.get_channel();
275        let data = serialize(self.opts, RequestType::DelSockets(rooms));
276        driver.emit(chan, data).await
277    }
278    /// Disconnects the selected sockets from their namespace.
279    ///
280    /// ```ignore
281    /// // Makes the sockets in the root namespace and in the room1 and room2, disconnect.
282    /// IoEmitter::new()
283    ///     .to(["room1", "room2"])
284    ///     .disconnect(&driver)
285    ///     .await?;
286    /// ```
287    pub async fn disconnect<D: Driver>(self, driver: &D) -> Result<(), D::Error> {
288        let chan = self.get_channel();
289        let data = serialize(self.opts, RequestType::DisconnectSockets);
290        driver.emit(chan, data).await
291    }
292
293    /// Emits a socket.io event to the selected sockets.
294    ///
295    /// ```ignore
296    /// // Emits the event "message" with the message "Hello, world!" to the root namespace sockets
297    /// // that are in the room1 and room2
298    /// IoEmitter::new()
299    ///     .to(["room1", "room2"])
300    ///     .emit("message", "Hello, world!", &driver)
301    ///     .await?;
302    /// ```
303    #[cfg(any(feature = "msgpack-parser", feature = "common-parser"))]
304    pub async fn emit<D: Driver, T: serde::Serialize + ?Sized>(
305        self,
306        event: &str,
307        msg: &T,
308        driver: &D,
309    ) -> Result<(), emit::EmitError<D>> {
310        use emit::{EmitError, Parser};
311        use socketioxide_core::{
312            packet::{Packet, PacketData},
313            parser::Parse,
314        };
315
316        let value = match self.parser {
317            #[cfg(feature = "common-parser")]
318            Parser::Common => {
319                socketioxide_parser_common::CommonParser.encode_value(msg, Some(event))
320            }
321            #[cfg(feature = "msgpack-parser")]
322            Parser::MsgPack => {
323                socketioxide_parser_msgpack::MsgPackParser.encode_value(msg, Some(event))
324            }
325        }
326        .map_err(EmitError::Parser)?;
327
328        let chan = self.get_channel();
329        let packet = Packet {
330            inner: PacketData::Event(value, None),
331            ns: self.ns,
332        };
333
334        let data = serialize(self.opts, RequestType::Broadcast(packet));
335        driver.emit(chan, data).await.map_err(EmitError::Driver)?;
336        Ok(())
337    }
338}
339
340impl IoEmitter {
341    /// The request channel used to broadcast requests to all the servers.
342    /// Format: `{prefix}-request#{path}#`.
343    fn get_channel(&self) -> String {
344        let prefix = self.prefix.as_deref().unwrap_or("socket.io");
345        format!("{}-request#{}#", prefix, &self.ns)
346    }
347}
348fn serialize(opts: BroadcastOptions, req_type: RequestType) -> Vec<u8> {
349    let req = Request::new(req_type, opts);
350    rmp_serde::to_vec(&req).unwrap()
351}