socketioxide_emitter/lib.rs
1#![doc(
2 html_logo_url = "https://raw.githubusercontent.com/Totodore/socketioxide/refs/heads/main/.github/logo_dark.svg"
3)]
4#![doc(
5 html_favicon_url = "https://raw.githubusercontent.com/Totodore/socketioxide/refs/heads/main/.github/logo_dark.ico"
6)]
7#![cfg_attr(docsrs, feature(doc_auto_cfg))]
8#![warn(
9 clippy::all,
10 clippy::todo,
11 clippy::empty_enum,
12 clippy::mem_forget,
13 clippy::unused_self,
14 clippy::filter_map_next,
15 clippy::needless_continue,
16 clippy::needless_borrow,
17 clippy::match_wildcard_for_single_variants,
18 clippy::if_let_mutex,
19 clippy::await_holding_lock,
20 clippy::match_on_vec_items,
21 clippy::imprecise_flops,
22 clippy::suboptimal_flops,
23 clippy::lossy_float_literal,
24 clippy::rest_pat_in_fully_bound_structs,
25 clippy::fn_params_excessive_bools,
26 clippy::exit,
27 clippy::inefficient_to_string,
28 clippy::linkedlist,
29 clippy::macro_use_imports,
30 clippy::option_option,
31 clippy::verbose_file_reads,
32 clippy::unnested_or_patterns,
33 rust_2018_idioms,
34 future_incompatible,
35 nonstandard_style,
36 missing_docs
37)]
38//! The Socketioxide Emitter crate allows you to easily communicate with a group of Socket.IO servers
39//! from another rust process (server-side). It must be used in conjunction with [socketioxide-redis](https://docs.rs/socketioxide-redis).
40//!
41//! <div class="warning">
42//! Socketioxide Emitter is not compatible with <code>@socketio/redis-adapter</code>
43//! and <code>@socketio/redis-emitter</code>. They use completely different protocols and
44//! cannot be used together. Do not mix socket.io JS servers with socketioxide rust servers.
45//! If you are looking for a way to emit events to a cluster of node socket.io servers in rust,
46//! you should use the <a href="https://github.com/epli2/socketio-rust-emitter">socketio-rust-emitter</a> package.
47//! </div>
48//!
49//! # Diagram taken from the socket.io documentation:
50//! <img src="https://raw.githubusercontent.com/socketio/socket.io-redis-emitter/refs/heads/main/assets/emitter.png" width="600" />
51//!
52//! # Features and parsers
//! The emitter supports two parsers: Common and MessagePack. You can enable/disable them with the `common-parser`
//! and `msgpack-parser` feature flags. If you disable both features, you won't be able to emit events.
//! It will only be possible to manipulate sockets (join/leave rooms, disconnect).
56//!
57//! # Emit cheat sheet (example with redis)
58//! ```no_run
59//! use redis::{AsyncCommands, aio::MultiplexedConnection};
60//! use socketioxide_emitter::{Driver, IoEmitter};
61//!
62//! struct RedisConnection(MultiplexedConnection);
63//! impl Driver for RedisConnection {
64//! type Error = redis::RedisError;
65//!
66//! async fn emit(&self, channel: String, data: Vec<u8>) -> Result<(), Self::Error> {
67//! self.0
68//! .clone()
69//! .publish::<_, _, redis::Value>(channel, data)
70//! .await?;
71//! Ok(())
72//! }
73//! }
74//!
75//! #[tokio::main]
76//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
77//! let client = redis::Client::open("redis://127.0.0.1").unwrap();
78//! let conn = client.get_multiplexed_tokio_connection().await?;
79//! let conn = RedisConnection(conn);
80//! // sending to all clients
81//! IoEmitter::new().emit("event", "hello!", &conn).await?;
82//!
83//! // sending to all clients in 'room1' room
84//! IoEmitter::new().to("room1").emit("event", "message", &conn).await?;
85//!
86//! // sending to all clients in 'room1' except those in 'room2'
87//! IoEmitter::new().to("room1").except("room2").emit("event", "message", &conn).await?;
88//!
89//! // sending to individual socketid (private message).
90//! // (You will have to make the socket join a room corresponding to its id when it connects.)
91//! IoEmitter::new().to("tK3lxSproMuTbioPAAAB").emit("event", "message", &conn).await?;
92//!
93//! let nsp = IoEmitter::new().of("/admin");
94//!
95//! // sending to all clients in 'admin' namespace
96//! nsp.clone().emit("event", "message", &conn).await?;
97//!
98//! // sending to all clients in 'admin' namespace and in 'notifications' room
99//! nsp.to("notifications").emit("event", "message", &conn).await?;
100//!
101//! let msgpack = IoEmitter::new_msgpack();
102//!
103//! // sending to all clients and encode message with the msgpack format.
104//! msgpack.clone().emit("event", "message", &conn).await?;
105//!
106//! // sending to all clients in 'notifications' room and encode message with the msgpack format.
107//! msgpack.to("notifications").emit("event", "message", &conn).await?;
108//!
109//! Ok(())
//! }
//! ```
111use requests::{Request, RequestType};
112use socketioxide_core::{
113 Str,
114 adapter::{BroadcastFlags, BroadcastOptions, RoomParam},
115};
116
117mod requests;
118
119#[cfg(any(feature = "msgpack-parser", feature = "common-parser"))]
120mod emit;
121#[cfg(any(feature = "msgpack-parser", feature = "common-parser"))]
122pub use emit::EmitError;
123
/// Transport abstraction used by the emitter to reach the socket.io cluster.
///
/// The emitter only builds channel names and binary payloads; actually publishing them
/// is delegated to an implementation of this trait. Implement it for the client that matches your
/// [`Adapter`](https://docs.rs/socketioxide/latest/socketioxide/#adapters) driver.
///
/// With redis, for instance, the implementation publishes the payload to a pubsub channel.
/// Being able to send bytes to a named channel is the only capability required from a driver.
///
/// # Example with the [redis](https://docs.rs/redis) crate
/// ```
/// use redis::{AsyncCommands, aio::MultiplexedConnection};
/// use socketioxide_emitter::{Driver, IoEmitter};
///
/// struct RedisConnection(MultiplexedConnection);
/// impl Driver for RedisConnection {
///     type Error = redis::RedisError;
///
///     async fn emit(&self, channel: String, data: Vec<u8>) -> Result<(), Self::Error> {
///         self.0
///             .clone()
///             .publish::<_, _, redis::Value>(channel, data)
///             .await?;
///         Ok(())
///     }
/// }
/// ```
///
/// # Example with the [fred](https://docs.rs/fred) crate
/// ```
/// use fred::{
///     clients::SubscriberClient,
///     prelude::{ClientLike, PubsubInterface},
/// };
/// use socketioxide_emitter::{Driver, IoEmitter};
///
/// struct FredConnection(SubscriberClient);
/// impl Driver for FredConnection {
///     type Error = fred::error::Error;
///
///     async fn emit(&self, channel: String, data: Vec<u8>) -> Result<(), Self::Error> {
///         self.0.publish::<u16, _, _>(channel, data).await?;
///         Ok(())
///     }
/// }
/// ```
pub trait Driver {
    /// Error reported by the driver when publishing fails.
    type Error: std::error::Error;

    /// Publishes `data` on the channel named `channel`.
    ///
    /// The returned future resolves to `Ok(())` once the driver has accepted the payload,
    /// or to the driver's own error otherwise.
    fn emit(
        &self,
        channel: String,
        data: Vec<u8>,
    ) -> impl Future<Output = Result<(), Self::Error>>;
}
175
/// The [`IoEmitter`] is the main structure for emitting events to a socket.io cluster.
/// It provides a convenient way to broadcast events to all connected nodes and clients.
/// It acts as a simple builder for creating socket.io messages to send through the driver.
///
/// Every builder method takes the emitter by value and returns it, so a partially
/// configured emitter can be cloned and used as a base for several requests.
#[derive(Clone, Debug)]
pub struct IoEmitter {
    /// Target selection sent along with every request: rooms to include, rooms to
    /// exclude and the broadcast flags.
    opts: BroadcastOptions,
    /// Namespace the request applies to (`/` by default).
    ns: Str,
    /// Custom channel prefix; when `None`, `socket.io` is used to build the channel name.
    prefix: Option<String>,
    /// Parser used to encode event payloads; the field only exists when at least one
    /// parser feature is enabled.
    #[cfg(any(feature = "common-parser", feature = "msgpack-parser"))]
    parser: emit::Parser,
}
187
188impl Default for IoEmitter {
189 fn default() -> Self {
190 let mut io = Self {
191 opts: Default::default(),
192 ns: Str::from("/"),
193 prefix: None,
194 #[cfg(any(feature = "common-parser", feature = "msgpack-parser"))]
195 parser: emit::Parser::default(),
196 };
197 io.opts.add_flag(BroadcastFlags::Broadcast);
198 io
199 }
200}
201
202impl IoEmitter {
203 /// Creates a new [`IoEmitter`] with the default settings.
204 pub fn new() -> Self {
205 Self::default()
206 }
207 /// Creates a new [`IoEmitter`] with the msgpack parser.
208 #[cfg(feature = "msgpack-parser")]
209 pub fn new_msgpack() -> Self {
210 Self {
211 parser: emit::Parser::MsgPack,
212 ..Default::default()
213 }
214 }
215 /// Sets the namespace for this [`IoEmitter`]. By default, the namespace is set to `/`.
216 pub fn of(mut self, ns: impl Into<Str>) -> IoEmitter {
217 self.ns = ns.into();
218 self
219 }
220 /// Sets the rooms for this [`IoEmitter`]. By default, events are sent to all rooms.
221 pub fn to(mut self, rooms: impl RoomParam) -> IoEmitter {
222 self.opts.rooms.extend(rooms.into_room_iter());
223 self
224 }
225 /// Alias for [`IoEmitter::to`].
226 pub fn within(self, rooms: impl RoomParam) -> IoEmitter {
227 self.to(rooms)
228 }
229 /// Excludes the specified rooms.
230 pub fn except(mut self, rooms: impl RoomParam) -> IoEmitter {
231 self.opts.except.extend(rooms.into_room_iter());
232 self
233 }
234 /// You may have set a custom prefix on your adapter config,
235 /// which will be used as a prefix for the channel name.
236 /// By default, the prefix is `socket.io`.
237 pub fn prefix(mut self, prefix: impl Into<String>) -> IoEmitter {
238 self.prefix = Some(prefix.into());
239 self
240 }
241}
242
243impl IoEmitter {
244 /// Makes the selected sockets join the specified rooms.
245 ///
246 /// # Example
247 ///
248 /// ```ignore
249 /// // Makes the sockets in the root namespace and in the room1 and room2, join the room4.
250 /// IoEmitter::new()
251 /// .to(["room1", "room2"])
252 /// .join("room4", &driver)
253 /// .await?;
254 /// ```
255 pub async fn join<D: Driver>(self, rooms: impl RoomParam, driver: &D) -> Result<(), D::Error> {
256 let rooms = rooms.into_room_iter().collect();
257 let chan = self.get_channel();
258 let data = serialize(self.opts, RequestType::AddSockets(rooms));
259 driver.emit(chan, data).await
260 }
261 /// Makes the selected sockets leave the specified rooms.
262 ///
263 /// # Example
264 ///
265 /// ```ignore
266 /// // Makes the sockets in the root namespace and in the room1 and room2, leave the room4.
267 /// IoEmitter::new()
268 /// .to(["room1", "room2"])
269 /// .leave("room4", &driver)
270 /// .await?;
271 /// ```
272 pub async fn leave<D: Driver>(self, rooms: impl RoomParam, driver: &D) -> Result<(), D::Error> {
273 let rooms = rooms.into_room_iter().collect();
274 let chan = self.get_channel();
275 let data = serialize(self.opts, RequestType::DelSockets(rooms));
276 driver.emit(chan, data).await
277 }
278 /// Disconnects the selected sockets from their namespace.
279 ///
280 /// ```ignore
281 /// // Makes the sockets in the root namespace and in the room1 and room2, disconnect.
282 /// IoEmitter::new()
283 /// .to(["room1", "room2"])
284 /// .disconnect(&driver)
285 /// .await?;
286 /// ```
287 pub async fn disconnect<D: Driver>(self, driver: &D) -> Result<(), D::Error> {
288 let chan = self.get_channel();
289 let data = serialize(self.opts, RequestType::DisconnectSockets);
290 driver.emit(chan, data).await
291 }
292
293 /// Emits a socket.io event to the selected sockets.
294 ///
295 /// ```ignore
296 /// // Emits the event "message" with the message "Hello, world!" to the root namespace sockets
297 /// // that are in the room1 and room2
298 /// IoEmitter::new()
299 /// .to(["room1", "room2"])
300 /// .emit("message", "Hello, world!", &driver)
301 /// .await?;
302 /// ```
303 #[cfg(any(feature = "msgpack-parser", feature = "common-parser"))]
304 pub async fn emit<D: Driver, T: serde::Serialize + ?Sized>(
305 self,
306 event: &str,
307 msg: &T,
308 driver: &D,
309 ) -> Result<(), emit::EmitError<D>> {
310 use emit::{EmitError, Parser};
311 use socketioxide_core::{
312 packet::{Packet, PacketData},
313 parser::Parse,
314 };
315
316 let value = match self.parser {
317 #[cfg(feature = "common-parser")]
318 Parser::Common => {
319 socketioxide_parser_common::CommonParser.encode_value(msg, Some(event))
320 }
321 #[cfg(feature = "msgpack-parser")]
322 Parser::MsgPack => {
323 socketioxide_parser_msgpack::MsgPackParser.encode_value(msg, Some(event))
324 }
325 }
326 .map_err(EmitError::Parser)?;
327
328 let chan = self.get_channel();
329 let packet = Packet {
330 inner: PacketData::Event(value, None),
331 ns: self.ns,
332 };
333
334 let data = serialize(self.opts, RequestType::Broadcast(packet));
335 driver.emit(chan, data).await.map_err(EmitError::Driver)?;
336 Ok(())
337 }
338}
339
340impl IoEmitter {
341 /// The request channel used to broadcast requests to all the servers.
342 /// Format: `{prefix}-request#{path}#`.
343 fn get_channel(&self) -> String {
344 let prefix = self.prefix.as_deref().unwrap_or("socket.io");
345 format!("{}-request#{}#", prefix, &self.ns)
346 }
347}
348fn serialize(opts: BroadcastOptions, req_type: RequestType) -> Vec<u8> {
349 let req = Request::new(req_type, opts);
350 rmp_serde::to_vec(&req).unwrap()
351}