1use std::{future::Future, sync::Arc, time::Duration};
10
11use serde::Serialize;
12use socketioxide_core::Sid;
13
14use crate::{
15 BroadcastError, EmitWithAckError, SendError,
16 ack::{AckInnerStream, AckStream},
17 adapter::{Adapter, LocalAdapter},
18 extract::SocketRef,
19 ns::Namespace,
20 parser::Parser,
21 socket::{RemoteSocket, Socket},
22};
23
24use socketioxide_core::{
25 adapter::{BroadcastFlags, BroadcastOptions, Room, RoomParam},
26 packet::Packet,
27 parser::{Parse, ParserError},
28};
29
/// Chainable configuration for a message emitted through a single socket.
///
/// Any selection operator (`to`, `within`, `except`, `local`, `broadcast`) converts
/// this value into a `BroadcastOperators`.
pub struct ConfOperators<'a, A: Adapter = LocalAdapter> {
    /// Ack timeout override set by `timeout`. When `None`, `emit_with_ack` falls back
    /// to the `ack_timeout` of the socket's io configuration.
    timeout: Option<Duration>,
    /// The socket these operators were created from; `emit` sends through it.
    socket: &'a Socket<A>,
}
/// Chainable operators that select a set of sockets in a namespace and configure
/// the message or operation applied to them.
pub struct BroadcastOperators<A: Adapter = LocalAdapter> {
    /// Optional ack timeout, forwarded as-is to the adapter by `emit_with_ack`.
    timeout: Option<Duration>,
    /// Namespace whose adapter carries out every operation.
    ns: Arc<Namespace<A>>,
    /// Parser used to encode outgoing event data; also handed to the ack streams
    /// and remote sockets produced by these operators.
    parser: Parser,
    /// Accumulated selection: target rooms, excluded rooms and broadcast flags.
    opts: BroadcastOptions,
}
42
43impl<A: Adapter> From<ConfOperators<'_, A>> for BroadcastOperators<A> {
44 fn from(conf: ConfOperators<'_, A>) -> Self {
45 let opts = BroadcastOptions::new(conf.socket.id);
46 Self {
47 timeout: conf.timeout,
48 ns: conf.socket.ns.clone(),
49 parser: conf.socket.parser,
50 opts,
51 }
52 }
53}
54
55impl<'a, A: Adapter> ConfOperators<'a, A> {
57 pub(crate) fn new(sender: &'a Socket<A>) -> Self {
58 Self {
59 timeout: None,
60 socket: sender,
61 }
62 }
63
64 #[doc = include_str!("../docs/operators/to.md")]
65 pub fn to(self, rooms: impl RoomParam) -> BroadcastOperators<A> {
66 BroadcastOperators::from(self).to(rooms)
67 }
68
69 #[doc = include_str!("../docs/operators/within.md")]
70 pub fn within(self, rooms: impl RoomParam) -> BroadcastOperators<A> {
71 BroadcastOperators::from(self).within(rooms)
72 }
73
74 #[doc = include_str!("../docs/operators/except.md")]
75 pub fn except(self, rooms: impl RoomParam) -> BroadcastOperators<A> {
76 BroadcastOperators::from(self).except(rooms)
77 }
78
79 #[doc = include_str!("../docs/operators/local.md")]
80 pub fn local(self) -> BroadcastOperators<A> {
81 BroadcastOperators::from(self).local()
82 }
83
84 #[doc = include_str!("../docs/operators/broadcast.md")]
85 pub fn broadcast(self) -> BroadcastOperators<A> {
86 BroadcastOperators::from(self).broadcast()
87 }
88
89 #[doc = include_str!("../docs/operators/timeout.md")]
90 pub fn timeout(mut self, timeout: Duration) -> Self {
91 self.timeout = Some(timeout);
92 self
93 }
94}
95
96impl<A: Adapter> ConfOperators<'_, A> {
98 #[doc = include_str!("../docs/operators/emit.md")]
99 pub fn emit<T: ?Sized + Serialize>(
100 mut self,
101 event: impl AsRef<str>,
102 data: &T,
103 ) -> Result<(), SendError> {
104 use crate::SocketError;
105 use crate::socket::PermitExt;
106 if !self.socket.connected() {
107 return Err(SendError::Socket(SocketError::Closed));
108 }
109 let permit = match self.socket.reserve() {
110 Ok(permit) => permit,
111 Err(e) => {
112 #[cfg(feature = "tracing")]
113 tracing::debug!("sending error during emit message: {e:?}");
114 return Err(SendError::Socket(e));
115 }
116 };
117 let packet = self.get_packet(event, data)?;
118 permit.send(packet, self.socket.parser);
119
120 Ok(())
121 }
122
123 #[doc = include_str!("../docs/operators/emit_with_ack.md")]
124 pub fn emit_with_ack<T: ?Sized + Serialize, V>(
125 mut self,
126 event: impl AsRef<str>,
127 data: &T,
128 ) -> Result<AckStream<V>, SendError> {
129 use crate::SocketError;
130 if !self.socket.connected() {
131 return Err(SendError::Socket(SocketError::Closed));
132 }
133 let permit = match self.socket.reserve() {
134 Ok(permit) => permit,
135 Err(e) => {
136 #[cfg(feature = "tracing")]
137 tracing::debug!("sending error during emit message: {e:?}");
138 return Err(SendError::Socket(e));
139 }
140 };
141 let timeout = self
142 .timeout
143 .unwrap_or_else(|| self.socket.get_io().config().ack_timeout);
144 let packet = self.get_packet(event, data)?;
145 let rx = self.socket.send_with_ack_permit(packet, permit);
146 let stream = AckInnerStream::send(rx, timeout, self.socket.id);
147 Ok(AckStream::<V>::new(stream, self.socket.parser))
148 }
149
150 #[doc = include_str!("../docs/operators/join.md")]
151 pub fn join(self, rooms: impl RoomParam) {
152 self.socket.join(rooms)
153 }
154
155 #[doc = include_str!("../docs/operators/leave.md")]
156 pub async fn leave(self, rooms: impl RoomParam) {
157 self.socket.leave(rooms)
158 }
159
160 fn get_packet<T: ?Sized + Serialize>(
162 &mut self,
163 event: impl AsRef<str>,
164 data: &T,
165 ) -> Result<Packet, ParserError> {
166 let ns = self.socket.ns.path.clone();
167 let event = event.as_ref();
168 let data = self.socket.parser.encode_value(&data, Some(event))?;
169 Ok(Packet::event(ns, data))
170 }
171}
172
173impl<A: Adapter> BroadcastOperators<A> {
174 pub(crate) fn new(ns: Arc<Namespace<A>>, parser: Parser) -> Self {
175 Self {
176 timeout: None,
177 ns,
178 parser,
179 opts: BroadcastOptions::default(),
180 }
181 }
182 pub(crate) fn from_sock(ns: Arc<Namespace<A>>, sid: Sid, parser: Parser) -> Self {
183 Self {
184 timeout: None,
185 ns,
186 parser,
187 opts: BroadcastOptions::new(sid),
188 }
189 }
190
191 #[doc = include_str!("../docs/operators/to.md")]
192 pub fn to(mut self, rooms: impl RoomParam) -> Self {
193 self.opts.rooms.extend(rooms.into_room_iter());
194 self.broadcast()
195 }
196
197 #[doc = include_str!("../docs/operators/within.md")]
198 pub fn within(mut self, rooms: impl RoomParam) -> Self {
199 self.opts.rooms.extend(rooms.into_room_iter());
200 self
201 }
202
203 #[doc = include_str!("../docs/operators/except.md")]
204 pub fn except(mut self, rooms: impl RoomParam) -> Self {
205 self.opts.except.extend(rooms.into_room_iter());
206 self.broadcast()
207 }
208
209 #[doc = include_str!("../docs/operators/local.md")]
210 pub fn local(mut self) -> Self {
211 self.opts.add_flag(BroadcastFlags::Local);
212 self
213 }
214
215 #[doc = include_str!("../docs/operators/broadcast.md")]
216 pub fn broadcast(mut self) -> Self {
217 self.opts.add_flag(BroadcastFlags::Broadcast);
218 self
219 }
220
221 #[doc = include_str!("../docs/operators/timeout.md")]
222 pub fn timeout(mut self, timeout: Duration) -> Self {
223 self.timeout = Some(timeout);
224 self
225 }
226}
227
228impl<A: Adapter> BroadcastOperators<A> {
230 #[doc = include_str!("../docs/operators/emit.md")]
231 pub fn emit<T: ?Sized + Serialize>(
232 mut self,
233 event: impl AsRef<str>,
234 data: &T,
235 ) -> impl Future<Output = Result<(), BroadcastError>> + Send {
236 let packet = self.get_packet(event, data);
237 async move {
238 self.ns
239 .adapter
240 .broadcast(packet?, self.opts)
241 .await
242 .map_err(|e| {
243 #[cfg(feature = "tracing")]
244 tracing::debug!("broadcast error: {e}");
245 e
246 })?;
247 Ok(())
248 }
249 }
250
251 #[doc = include_str!("../docs/operators/emit_with_ack.md")]
252 pub fn emit_with_ack<T: ?Sized + Serialize, V>(
253 mut self,
254 event: impl AsRef<str>,
255 data: &T,
256 ) -> impl Future<Output = Result<AckStream<V, A>, EmitWithAckError>> + Send {
257 let packet = self.get_packet(event, data);
258 async move {
259 let stream = self
260 .ns
261 .adapter
262 .broadcast_with_ack(packet?, self.opts, self.timeout)
263 .await
264 .map_err(|e| EmitWithAckError::Adapter(Box::new(e)))?;
265 Ok(AckStream::new(stream, self.parser))
266 }
267 }
268
269 #[doc = include_str!("../docs/operators/sockets.md")]
270 pub fn sockets(self) -> Vec<SocketRef<A>> {
271 let ids = self.ns.adapter.get_local().sockets(self.opts);
272
273 ids.into_iter()
274 .filter_map(|id| self.ns.get_socket(id).ok())
275 .map(SocketRef::from)
276 .collect()
277 }
278
279 #[doc = include_str!("../docs/operators/fetch_sockets.md")]
280 pub async fn fetch_sockets(self) -> Result<Vec<RemoteSocket<A>>, A::Error> {
281 let sockets = self
282 .ns
283 .adapter
284 .fetch_sockets(self.opts)
285 .await?
286 .into_iter()
287 .map(|data| RemoteSocket::new(data, &self.ns.adapter, self.parser))
288 .collect();
289 Ok(sockets)
290 }
291
292 #[doc = include_str!("../docs/operators/disconnect.md")]
293 pub async fn disconnect(self) -> Result<(), BroadcastError> {
294 self.ns.adapter.disconnect_socket(self.opts).await
295 }
296
297 #[doc = include_str!("../docs/operators/join.md")]
298 #[allow(clippy::manual_async_fn)] pub fn join(self, rooms: impl RoomParam) -> impl Future<Output = Result<(), A::Error>> + Send {
300 async move { self.ns.adapter.add_sockets(self.opts, rooms).await }
301 }
302
303 #[doc = include_str!("../docs/operators/leave.md")]
304 #[allow(clippy::manual_async_fn)] pub fn leave(self, rooms: impl RoomParam) -> impl Future<Output = Result<(), A::Error>> + Send {
306 async move { self.ns.adapter.del_sockets(self.opts, rooms).await }
307 }
308
309 #[doc = include_str!("../docs/operators/rooms.md")]
310 pub async fn rooms(self) -> Result<Vec<Room>, A::Error> {
311 self.ns.adapter.rooms(self.opts).await
312 }
313
314 #[doc = include_str!("../docs/operators/get_socket.md")]
315 pub fn get_socket(&self, sid: Sid) -> Option<SocketRef<A>> {
316 self.ns.get_socket(sid).map(SocketRef::from).ok()
317 }
318
319 fn get_packet<T: ?Sized + Serialize>(
321 &mut self,
322 event: impl AsRef<str>,
323 data: &T,
324 ) -> Result<Packet, ParserError> {
325 let ns = self.ns.path.clone();
326 let data = self.parser.encode_value(data, Some(event.as_ref()))?;
327 Ok(Packet::event(ns, data))
328 }
329}