1use std::{fmt, future::Future, sync::Arc, time::Duration};
10
11use serde::Serialize;
12use socketioxide_core::Sid;
13
14use crate::{
15 BroadcastError, EmitWithAckError, SendError,
16 ack::{AckInnerStream, AckStream},
17 adapter::{Adapter, LocalAdapter},
18 extract::SocketRef,
19 ns::Namespace,
20 parser::Parser,
21 socket::{RemoteSocket, Socket},
22};
23
24use socketioxide_core::{
25 adapter::{BroadcastFlags, BroadcastOptions, Room, RoomParam},
26 packet::Packet,
27 parser::{Parse, ParserError},
28};
29
30pub struct ConfOperators<'a, A: Adapter = LocalAdapter> {
32 timeout: Option<Duration>,
33 socket: &'a Socket<A>,
34}
35pub struct BroadcastOperators<A: Adapter = LocalAdapter> {
37 timeout: Option<Duration>,
38 ns: Arc<Namespace<A>>,
39 parser: Parser,
40 opts: BroadcastOptions,
41}
42
43impl<A: Adapter> From<ConfOperators<'_, A>> for BroadcastOperators<A> {
44 fn from(conf: ConfOperators<'_, A>) -> Self {
45 let opts = BroadcastOptions::new(conf.socket.id);
46 Self {
47 timeout: conf.timeout,
48 ns: conf.socket.ns.clone(),
49 parser: conf.socket.parser,
50 opts,
51 }
52 }
53}
54
55impl<'a, A: Adapter> ConfOperators<'a, A> {
57 pub(crate) fn new(sender: &'a Socket<A>) -> Self {
58 Self {
59 timeout: None,
60 socket: sender,
61 }
62 }
63
64 #[doc = include_str!("../docs/operators/to.md")]
65 pub fn to(self, rooms: impl RoomParam) -> BroadcastOperators<A> {
66 BroadcastOperators::from(self).to(rooms)
67 }
68
69 #[doc = include_str!("../docs/operators/within.md")]
70 pub fn within(self, rooms: impl RoomParam) -> BroadcastOperators<A> {
71 BroadcastOperators::from(self).within(rooms)
72 }
73
74 #[doc = include_str!("../docs/operators/except.md")]
75 pub fn except(self, rooms: impl RoomParam) -> BroadcastOperators<A> {
76 BroadcastOperators::from(self).except(rooms)
77 }
78
79 #[doc = include_str!("../docs/operators/local.md")]
80 pub fn local(self) -> BroadcastOperators<A> {
81 BroadcastOperators::from(self).local()
82 }
83
84 #[doc = include_str!("../docs/operators/broadcast.md")]
85 pub fn broadcast(self) -> BroadcastOperators<A> {
86 BroadcastOperators::from(self).broadcast()
87 }
88
89 #[doc = include_str!("../docs/operators/timeout.md")]
90 pub fn timeout(mut self, timeout: Duration) -> Self {
91 self.timeout = Some(timeout);
92 self
93 }
94}
95
96impl<A: Adapter> ConfOperators<'_, A> {
98 #[doc = include_str!("../docs/operators/emit.md")]
99 pub fn emit<T: ?Sized + Serialize>(
100 mut self,
101 event: impl AsRef<str>,
102 data: &T,
103 ) -> Result<(), SendError> {
104 use crate::SocketError;
105 use crate::socket::PermitExt;
106 if !self.socket.connected() {
107 return Err(SendError::Socket(SocketError::Closed));
108 }
109 let permit = match self.socket.reserve() {
110 Ok(permit) => permit,
111 Err(e) => {
112 #[cfg(feature = "tracing")]
113 tracing::debug!("sending error during emit message: {e:?}");
114 return Err(SendError::Socket(e));
115 }
116 };
117 let packet = self.get_packet(event, data)?;
118 permit.send(packet, self.socket.parser);
119
120 Ok(())
121 }
122
123 #[doc = include_str!("../docs/operators/emit_with_ack.md")]
124 pub fn emit_with_ack<T: ?Sized + Serialize, V>(
125 mut self,
126 event: impl AsRef<str>,
127 data: &T,
128 ) -> Result<AckStream<V>, SendError> {
129 use crate::SocketError;
130 if !self.socket.connected() {
131 return Err(SendError::Socket(SocketError::Closed));
132 }
133 let permit = match self.socket.reserve() {
134 Ok(permit) => permit,
135 Err(e) => {
136 #[cfg(feature = "tracing")]
137 tracing::debug!("sending error during emit message: {e:?}");
138 return Err(SendError::Socket(e));
139 }
140 };
141 let timeout = self
142 .timeout
143 .unwrap_or_else(|| self.socket.get_io().config().ack_timeout);
144 let packet = self.get_packet(event, data)?;
145 let rx = self.socket.send_with_ack_permit(packet, permit);
146 let stream = AckInnerStream::send(rx, timeout, self.socket.id);
147 Ok(AckStream::<V>::new(stream, self.socket.parser))
148 }
149
150 #[doc = include_str!("../docs/operators/join.md")]
151 pub fn join(self, rooms: impl RoomParam) {
152 self.socket.join(rooms)
153 }
154
155 #[doc = include_str!("../docs/operators/leave.md")]
156 pub async fn leave(self, rooms: impl RoomParam) {
157 self.socket.leave(rooms)
158 }
159
160 fn get_packet<T: ?Sized + Serialize>(
162 &mut self,
163 event: impl AsRef<str>,
164 data: &T,
165 ) -> Result<Packet, ParserError> {
166 let ns = self.socket.ns.path.clone();
167 let event = event.as_ref();
168 let data = self.socket.parser.encode_value(&data, Some(event))?;
169 Ok(Packet::event(ns, data))
170 }
171}
172
173impl<A: Adapter> BroadcastOperators<A> {
174 pub(crate) fn new(ns: Arc<Namespace<A>>, parser: Parser) -> Self {
175 Self {
176 timeout: None,
177 ns,
178 parser,
179 opts: BroadcastOptions::default(),
180 }
181 }
182 pub(crate) fn from_sock(ns: Arc<Namespace<A>>, sid: Sid, parser: Parser) -> Self {
183 Self {
184 timeout: None,
185 ns,
186 parser,
187 opts: BroadcastOptions::new(sid),
188 }
189 }
190
191 pub fn ns_path(&self) -> &str {
193 &self.ns.path
194 }
195
196 #[doc = include_str!("../docs/operators/to.md")]
197 pub fn to(mut self, rooms: impl RoomParam) -> Self {
198 self.opts.rooms.extend(rooms.into_room_iter());
199 self.broadcast()
200 }
201
202 #[doc = include_str!("../docs/operators/within.md")]
203 pub fn within(mut self, rooms: impl RoomParam) -> Self {
204 self.opts.rooms.extend(rooms.into_room_iter());
205 self
206 }
207
208 #[doc = include_str!("../docs/operators/except.md")]
209 pub fn except(mut self, rooms: impl RoomParam) -> Self {
210 self.opts.except.extend(rooms.into_room_iter());
211 self.broadcast()
212 }
213
214 #[doc = include_str!("../docs/operators/local.md")]
215 pub fn local(mut self) -> Self {
216 self.opts.add_flag(BroadcastFlags::Local);
217 self
218 }
219
220 #[doc = include_str!("../docs/operators/broadcast.md")]
221 pub fn broadcast(mut self) -> Self {
222 self.opts.add_flag(BroadcastFlags::Broadcast);
223 self
224 }
225
226 #[doc = include_str!("../docs/operators/timeout.md")]
227 pub fn timeout(mut self, timeout: Duration) -> Self {
228 self.timeout = Some(timeout);
229 self
230 }
231}
232
233impl<A: Adapter> BroadcastOperators<A> {
235 #[doc = include_str!("../docs/operators/emit.md")]
236 pub fn emit<T: ?Sized + Serialize>(
237 mut self,
238 event: impl AsRef<str>,
239 data: &T,
240 ) -> impl Future<Output = Result<(), BroadcastError>> + Send {
241 let packet = self.get_packet(event, data);
242 async move {
243 self.ns
244 .adapter
245 .broadcast(packet?, self.opts)
246 .await
247 .map_err(|e| {
248 #[cfg(feature = "tracing")]
249 tracing::debug!("broadcast error: {e}");
250 e
251 })?;
252 Ok(())
253 }
254 }
255
256 #[doc = include_str!("../docs/operators/emit_with_ack.md")]
257 pub fn emit_with_ack<T: ?Sized + Serialize, V>(
258 mut self,
259 event: impl AsRef<str>,
260 data: &T,
261 ) -> impl Future<Output = Result<AckStream<V, A>, EmitWithAckError>> + Send {
262 let packet = self.get_packet(event, data);
263 async move {
264 let stream = self
265 .ns
266 .adapter
267 .broadcast_with_ack(packet?, self.opts, self.timeout)
268 .await
269 .map_err(|e| EmitWithAckError::Adapter(Box::new(e)))?;
270 Ok(AckStream::new(stream, self.parser))
271 }
272 }
273
274 #[doc = include_str!("../docs/operators/sockets.md")]
275 pub fn sockets(self) -> Vec<SocketRef<A>> {
276 let ids = self.ns.adapter.get_local().sockets(self.opts);
277
278 ids.into_iter()
279 .filter_map(|id| self.ns.get_socket(id).ok())
280 .map(SocketRef::from)
281 .collect()
282 }
283
284 #[doc = include_str!("../docs/operators/fetch_sockets.md")]
285 pub async fn fetch_sockets(self) -> Result<Vec<RemoteSocket<A>>, A::Error> {
286 let sockets = self
287 .ns
288 .adapter
289 .fetch_sockets(self.opts)
290 .await?
291 .into_iter()
292 .map(|data| RemoteSocket::new(data, &self.ns.adapter, self.parser))
293 .collect();
294 Ok(sockets)
295 }
296
297 #[doc = include_str!("../docs/operators/disconnect.md")]
298 pub async fn disconnect(self) -> Result<(), BroadcastError> {
299 self.ns.adapter.disconnect_socket(self.opts).await
300 }
301
302 #[doc = include_str!("../docs/operators/join.md")]
303 #[allow(clippy::manual_async_fn)] pub fn join(self, rooms: impl RoomParam) -> impl Future<Output = Result<(), A::Error>> + Send {
305 async move { self.ns.adapter.add_sockets(self.opts, rooms).await }
306 }
307
308 #[doc = include_str!("../docs/operators/leave.md")]
309 #[allow(clippy::manual_async_fn)] pub fn leave(self, rooms: impl RoomParam) -> impl Future<Output = Result<(), A::Error>> + Send {
311 async move { self.ns.adapter.del_sockets(self.opts, rooms).await }
312 }
313
314 #[doc = include_str!("../docs/operators/rooms.md")]
315 pub async fn rooms(self) -> Result<Vec<Room>, A::Error> {
316 self.ns.adapter.rooms(self.opts).await
317 }
318
319 #[doc = include_str!("../docs/operators/get_socket.md")]
320 pub fn get_socket(&self, sid: Sid) -> Option<SocketRef<A>> {
321 self.ns.get_socket(sid).map(SocketRef::from).ok()
322 }
323
324 fn get_packet<T: ?Sized + Serialize>(
326 &mut self,
327 event: impl AsRef<str>,
328 data: &T,
329 ) -> Result<Packet, ParserError> {
330 let ns = self.ns.path.clone();
331 let data = self.parser.encode_value(data, Some(event.as_ref()))?;
332 Ok(Packet::event(ns, data))
333 }
334}
335
336impl<'a, A: Adapter> Clone for ConfOperators<'a, A> {
337 fn clone(&self) -> Self {
338 Self {
339 timeout: self.timeout,
340 socket: self.socket,
341 }
342 }
343}
344impl<A: Adapter> Clone for BroadcastOperators<A> {
345 fn clone(&self) -> Self {
346 Self {
347 ns: self.ns.clone(),
348 opts: self.opts.clone(),
349 timeout: self.timeout,
350 parser: self.parser,
351 }
352 }
353}
354impl<A: Adapter> fmt::Debug for BroadcastOperators<A> {
355 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
356 f.debug_struct("BroadcastOperators")
357 .field("ns", &self.ns)
358 .field("opts", &self.opts)
359 .field("timeout", &self.timeout)
360 .finish()
361 }
362}
363
364impl<A: Adapter> fmt::Debug for ConfOperators<'_, A> {
365 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
366 f.debug_struct("ConfOperators")
367 .field("timeout", &self.timeout)
368 .finish()
369 }
370}