Skip to main content

tf_rust_socketio/client/
raw_client.rs

1use super::callback::Callback;
2use crate::packet::{Packet, PacketId};
3use crate::Error;
4pub(crate) use crate::{event::CloseReason, event::Event, payload::Payload};
5use rand::{thread_rng, Rng};
6use serde_json::Value;
7
8use crate::client::callback::{SocketAnyCallback, SocketCallback};
9use crate::error::Result;
10use std::collections::HashMap;
11use std::ops::DerefMut;
12use std::sync::{Arc, Mutex};
13use std::time::Duration;
14use std::time::Instant;
15
16use crate::socket::Socket as InnerSocket;
17
18/// Represents an `Ack` as given back to the caller. Holds the internal `id` as
19/// well as the current ack'ed state. Holds data which will be accessible as
20/// soon as the ack'ed state is set to true. An `Ack` that didn't get ack'ed
21/// won't contain data.
22#[derive(Debug)]
23pub struct Ack {
24    pub id: i32,
25    timeout: Duration,
26    time_started: Instant,
27    callback: Callback<SocketCallback>,
28}
29
30/// A socket which handles communication with the server. It's initialized with
31/// a specific address as well as an optional namespace to connect to. If `None`
32/// is given the server will connect to the default namespace `"/"`.
33#[derive(Clone)]
34pub struct RawClient {
35    /// The inner socket client to delegate the methods to.
36    socket: InnerSocket,
37    on: Arc<Mutex<HashMap<Event, Callback<SocketCallback>>>>,
38    on_any: Arc<Mutex<Option<Callback<SocketAnyCallback>>>>,
39    outstanding_acks: Arc<Mutex<Vec<Ack>>>,
40    // namespace, for multiplexing messages
41    nsp: String,
42    // Data send in the opening packet (commonly used as for auth)
43    auth: Option<Value>,
44}
45
46impl RawClient {
47    /// Creates a socket with a certain address to connect to as well as a
48    /// namespace. If `None` is passed in as namespace, the default namespace
49    /// `"/"` is taken.
50    /// ```
51    pub(crate) fn new<T: Into<String>>(
52        socket: InnerSocket,
53        namespace: T,
54        on: Arc<Mutex<HashMap<Event, Callback<SocketCallback>>>>,
55        on_any: Arc<Mutex<Option<Callback<SocketAnyCallback>>>>,
56        auth: Option<Value>,
57    ) -> Result<Self> {
58        Ok(RawClient {
59            socket,
60            nsp: namespace.into(),
61            on,
62            on_any,
63            outstanding_acks: Arc::new(Mutex::new(Vec::new())),
64            auth,
65        })
66    }
67
68    /// Connects the client to a server. Afterwards the `emit_*` methods can be
69    /// called to interact with the server. Attention: it's not allowed to add a
70    /// callback after a call to this method.
71    pub(crate) fn connect(&self) -> Result<()> {
72        // Connect the underlying socket
73        self.socket.connect()?;
74
75        let auth = self.auth.as_ref().map(|data| data.to_string());
76
77        // construct the opening packet
78        let open_packet = Packet::new(PacketId::Connect, self.nsp.clone(), auth, None, 0, None);
79
80        self.socket.send(open_packet)?;
81
82        Ok(())
83    }
84
85    /// Sends a message to the server using the underlying `engine.io` protocol.
86    /// This message takes an event, which could either be one of the common
87    /// events like "message" or "error" or a custom event like "foo". But be
88    /// careful, the data string needs to be valid JSON. It's recommended to use
89    /// a library like `serde_json` to serialize the data properly.
90    ///
91    /// # Example
92    /// ```
93    /// use tf_rust_socketio::{ClientBuilder, RawClient, Payload};
94    /// use serde_json::json;
95    ///
96    /// let mut socket = ClientBuilder::new("http://localhost:4200/")
97    ///     .on("test", |payload: Payload, socket: RawClient| {
98    ///         println!("Received: {:#?}", payload);
99    ///         socket.emit("test", json!({"hello": true})).expect("Server unreachable");
100    ///      })
101    ///     .connect()
102    ///     .expect("connection failed");
103    ///
104    /// let json_payload = json!({"token": 123});
105    ///
106    /// let result = socket.emit("foo", json_payload);
107    ///
108    /// assert!(result.is_ok());
109    /// ```
110    #[inline]
111    pub fn emit<E, D>(&self, event: E, data: D) -> Result<()>
112    where
113        E: Into<Event>,
114        D: Into<Payload>,
115    {
116        self.socket.emit(&self.nsp, event.into(), data.into())
117    }
118
    /// Acknowledges an event that the server emitted with `emitWithAck`, sending `data`
    /// back as the response. The example shows how to answer such events from a handler.
120    ///
121    /// # Example
122    /// ```
123    /// use tf_rust_socketio::{ClientBuilder, Payload, RawClient};
124    /// use std::time::Duration;
125    /// use std::thread::sleep;
126    ///
127    ///
128    /// let ack_callback = |message: Payload, socket: RawClient| {
129    ///     match message {
130    ///         Payload::Text(values, ack_id) => {
131    ///             println!("{:#?}", values);
132    ///             // Respond with the specific ack_id
133    ///             if let Some(id) = ack_id {
134    ///                 socket.ack_with_id(id, "response").unwrap();
135    ///             }
136    ///         },
137    ///         Payload::Binary(bytes, ack_id) => {
138    ///             println!("Received bytes: {:#?}", bytes);
139    ///             if let Some(id) = ack_id {
140    ///                 socket.ack_with_id(id, vec![1, 2, 3]).unwrap();
141    ///             }
142    ///         },
143    ///         // This is deprecated, use Payload::Text instead
144    ///         Payload::String(str, ack_id) => {
145    ///             println!("{}", str);
146    ///             if let Some(id) = ack_id {
147    ///                 socket.ack_with_id(id, "response").unwrap();
148    ///             }
149    ///         },
150    ///    }
151    /// };
152    ///
153    /// let mut socket = ClientBuilder::new("http://localhost:4200/")
154    ///     .on("foo", ack_callback)
155    ///     .connect()
156    ///     .expect("connection failed");
157    ///
158    ///
159    ///
160    /// sleep(Duration::from_secs(2));
161    /// ```
162    #[inline]
163    pub fn ack<D>(&self, data: D) -> Result<()>
164    where
165        D: Into<Payload>,
166    {
167        // For backward compatibility, this method doesn't specify an ack_id
168        // It should only be used when there's only one pending ack
169        self.socket.ack(&self.nsp, data.into(), None)
170    }
171
172    /// Acknowledge a message with a specific ack_id
173    #[inline]
174    pub fn ack_with_id<D>(&self, ack_id: i32, data: D) -> Result<()>
175    where
176        D: Into<Payload>,
177    {
178        self.socket.ack(&self.nsp, data.into(), Some(ack_id))
179    }
180
181    /// Disconnects this client from the server by sending a `socket.io` closing
182    /// packet.
183    /// # Example
184    /// ```rust
185    /// use tf_rust_socketio::{ClientBuilder, Payload, RawClient};
186    /// use serde_json::json;
187    ///
188    /// fn handle_test(payload: Payload, socket: RawClient) {
189    ///     println!("Received: {:#?}", payload);
190    ///     socket.emit("test", json!({"hello": true})).expect("Server unreachable");
191    /// }
192    ///
193    /// let mut socket = ClientBuilder::new("http://localhost:4200/")
194    ///     .on("test", handle_test)
195    ///     .connect()
196    ///     .expect("connection failed");
197    ///
198    /// let json_payload = json!({"token": 123});
199    ///
200    /// socket.emit("foo", json_payload);
201    ///
202    /// // disconnect from the server
203    /// socket.disconnect();
204    ///
205    /// ```
206    pub fn disconnect(&self) -> Result<()> {
207        let disconnect_packet =
208            Packet::new(PacketId::Disconnect, self.nsp.clone(), None, None, 0, None);
209
210        // TODO: logging
211        let _ = self.socket.send(disconnect_packet);
212        self.socket.disconnect()?;
213
214        let _ = self.callback(
215            &Event::Close,
216            CloseReason::IOClientDisconnect.as_str(),
217            None,
218        ); // trigger on_close
219        Ok(())
220    }
221
222    /// Sends a message to the server but `alloc`s an `ack` to check whether the
223    /// server responded in a given time span. This message takes an event, which
224    /// could either be one of the common events like "message" or "error" or a
225    /// custom event like "foo", as well as a data parameter. But be careful,
226    /// in case you send a [`Payload::String`], the string needs to be valid JSON.
227    /// It's even recommended to use a library like serde_json to serialize the data properly.
    /// It also requires a timeout `Duration` in which the server needs to answer.
    /// If the ack is acked in the correct time span, the specified callback is
    /// called. The callback consumes a [`Payload`] which represents the data sent
    /// by the server.
232    ///
233    /// # Example
234    /// ```
235    /// use tf_rust_socketio::{ClientBuilder, Payload, RawClient};
236    /// use serde_json::json;
237    /// use std::time::Duration;
238    /// use std::thread::sleep;
239    ///
240    /// let mut socket = ClientBuilder::new("http://localhost:4200/")
241    ///     .on("foo", |payload: Payload, _| println!("Received: {:#?}", payload))
242    ///     .connect()
243    ///     .expect("connection failed");
244    ///
245    /// let ack_callback = |message: Payload, socket: RawClient| {
246    ///     match message {
247    ///         Payload::Text(values, _) => println!("{:#?}", values),
248    ///         Payload::Binary(bytes, _) => println!("Received bytes: {:#?}", bytes),
249    ///         // This is deprecated, use Payload::Text instead
250    ///         #[allow(deprecated)]
251    ///         Payload::String(str, _) => println!("{}", str),
252    ///    }
253    /// };
254    ///
255    /// let payload = json!({"token": 123});
256    /// socket.emit_with_ack("foo", payload, Duration::from_secs(2), ack_callback).unwrap();
257    ///
258    /// sleep(Duration::from_secs(2));
259    /// ```
260    #[inline]
261    pub fn emit_with_ack<F, E, D>(
262        &self,
263        event: E,
264        data: D,
265        timeout: Duration,
266        callback: F,
267    ) -> Result<()>
268    where
269        F: FnMut(Payload, RawClient) + 'static + Send,
270        E: Into<Event>,
271        D: Into<Payload>,
272    {
273        let id = thread_rng().gen_range(0..999);
274        let socket_packet =
275            Packet::new_from_payload(data.into(), event.into(), &self.nsp, Some(id))?;
276
277        let ack = Ack {
278            id,
279            time_started: Instant::now(),
280            timeout,
281            callback: Callback::<SocketCallback>::new(callback),
282        };
283
284        // add the ack to the tuple of outstanding acks
285        self.outstanding_acks.lock()?.push(ack);
286
287        self.socket.send(socket_packet)?;
288        Ok(())
289    }
290
291    pub(crate) fn poll(&self) -> Result<Option<Packet>> {
292        loop {
293            match self.socket.poll() {
294                Err(err) => {
295                    self.callback(&Event::Error, err.to_string(), None)?;
296                    return Err(err);
297                }
298                Ok(Some(packet)) => {
299                    if packet.nsp == self.nsp {
300                        self.handle_socketio_packet(&packet)?;
301                        return Ok(Some(packet));
302                    } else {
303                        // Not our namespace continue polling
304                    }
305                }
306                Ok(None) => return Ok(None),
307            }
308        }
309    }
310
311    #[cfg(test)]
312    pub(crate) fn iter(&self) -> Iter<'_> {
313        Iter { socket: self }
314    }
315
316    fn callback<P: Into<Payload>>(
317        &self,
318        event: &Event,
319        payload: P,
320        ack_id: Option<i32>,
321    ) -> Result<()> {
322        let mut on = self.on.lock()?;
323        let mut on_any = self.on_any.lock()?;
324        let lock = on.deref_mut();
325        let on_any_lock = on_any.deref_mut();
326
327        let mut payload = payload.into();
328        payload.set_ack_id(ack_id);
329
330        if let Some(callback) = lock.get_mut(event) {
331            callback(payload.clone(), self.clone());
332        }
333        match event {
334            Event::Message | Event::Custom(_) => {
335                if let Some(callback) = on_any_lock {
336                    callback(event.clone(), payload, self.clone())
337                }
338            }
339            _ => {}
340        }
341        drop(on);
342        drop(on_any);
343        Ok(())
344    }
345
346    /// Handles the incoming acks and classifies what callbacks to call and how.
347    #[inline]
348    fn handle_ack(&self, socket_packet: &Packet) -> Result<()> {
349        let Some(id) = socket_packet.id else {
350            return Ok(());
351        };
352
353        let outstanding_ack = {
354            let mut outstanding_acks = self.outstanding_acks.lock()?;
355            outstanding_acks
356                .iter()
357                .position(|ack| ack.id == id)
358                .map(|pos| outstanding_acks.remove(pos))
359        };
360
361        // If we found a matching ack, call its callback otherwise ignore it.
362        // The official implementation just removes the ack id on timeout:
363        // https://github.com/socketio/socket.io-client/blob/main/lib/socket.ts#L467-L495
364        if let Some(mut ack) = outstanding_ack {
365            if ack.time_started.elapsed() < ack.timeout {
366                if let Some(ref payload) = socket_packet.data {
367                    let mut payload = Payload::from(payload.to_owned());
368                    payload.set_ack_id(socket_packet.id);
369                    ack.callback.deref_mut()(payload, self.clone());
370                }
371
372                if let Some(ref attachments) = socket_packet.attachments {
373                    if let Some(payload) = attachments.first() {
374                        let payload = Payload::Binary(payload.to_owned(), socket_packet.id);
375                        ack.callback.deref_mut()(payload, self.clone());
376                    }
377                }
378            }
379        }
380
381        Ok(())
382    }
383
384    /// Handles a binary event.
385    #[inline]
386    fn handle_binary_event(&self, packet: &Packet) -> Result<()> {
387        let event = if let Some(string_data) = &packet.data {
388            string_data.replace('\"', "").into()
389        } else {
390            Event::Message
391        };
392
393        if let Some(attachments) = &packet.attachments {
394            if let Some(binary_payload) = attachments.first() {
395                self.callback(
396                    &event,
397                    Payload::Binary(binary_payload.to_owned(), packet.id),
398                    packet.id,
399                )?;
400            }
401        }
402        Ok(())
403    }
404
405    /// A method that parses a packet and eventually calls the corresponding
406    /// callback with the supplied data.
407    fn handle_event(&self, packet: &Packet) -> Result<()> {
408        let Some(ref data) = packet.data else {
409            return Ok(());
410        };
411
412        // a socketio message always comes in one of the following two flavors (both JSON):
413        // 1: `["event", "msg", ...]`
414        // 2: `["msg"]`
415        // in case 2, the message is ment for the default message event, in case 1 the event
416        // is specified
417        if let Ok(Value::Array(contents)) = serde_json::from_str::<Value>(data) {
418            let (event, payloads) = match contents.len() {
419                0 => return Err(Error::IncompletePacket()),
420                1 => (Event::Message, contents.as_slice()),
421                // get rest of data if first is a event
422                _ => match contents.first() {
423                    Some(Value::String(ev)) => (Event::from(ev.as_str()), &contents[1..]),
424                    // get rest(1..) of them as data, not just take the 2nd element
425                    _ => (Event::Message, contents.as_slice()),
426                    // take them all as data
427                },
428            };
429
430            // call the correct callback
431            self.callback(&event, payloads.to_vec(), packet.id)?;
432        }
433
434        Ok(())
435    }
436
437    /// Handles the incoming messages and classifies what callbacks to call and how.
438    /// This method is later registered as the callback for the `on_data` event of the
439    /// engineio client.
440    #[inline]
441    fn handle_socketio_packet(&self, packet: &Packet) -> Result<()> {
442        if packet.nsp == self.nsp {
443            match packet.packet_type {
444                PacketId::Ack | PacketId::BinaryAck => {
445                    if let Err(err) = self.handle_ack(packet) {
446                        self.callback(&Event::Error, err.to_string(), None)?;
447                        return Err(err);
448                    }
449                }
450                PacketId::BinaryEvent => {
451                    if let Err(err) = self.handle_binary_event(packet) {
452                        self.callback(&Event::Error, err.to_string(), None)?;
453                    }
454                }
455                PacketId::Connect => {
456                    self.callback(&Event::Connect, "", None)?;
457                }
458                PacketId::Disconnect => {
459                    self.callback(
460                        &Event::Close,
461                        CloseReason::IOServerDisconnect.as_str(),
462                        None,
463                    )?;
464                }
465                PacketId::ConnectError => {
466                    self.callback(
467                        &Event::Error,
468                        String::from("Received an ConnectError frame: ")
469                            + &packet
470                                .clone()
471                                .data
472                                .unwrap_or_else(|| String::from("\"No error message provided\"")),
473                        None,
474                    )?;
475                }
476                PacketId::Event => {
477                    if let Err(err) = self.handle_event(packet) {
478                        self.callback(&Event::Error, err.to_string(), None)?;
479                    }
480                }
481            }
482        }
483        Ok(())
484    }
485}
486
/// Test-only iterator over the packets a [`RawClient`] receives.
#[cfg(test)]
pub struct Iter<'a> {
    /// Client whose `poll` method produces the items.
    socket: &'a RawClient,
}
491
#[cfg(test)]
impl<'a> Iterator for Iter<'a> {
    type Item = Result<Packet>;

    /// Polls the client once. A packet or an error becomes the next item;
    /// `Ok(None)` from `poll` ends the iteration.
    fn next(&mut self) -> Option<Self::Item> {
        // `Result<Option<Packet>>` -> `Option<Result<Packet>>`
        self.socket.poll().transpose()
    }
}
503
504#[cfg(test)]
505mod test {
506    use std::sync::mpsc;
507    use std::thread::sleep;
508
509    use super::*;
510    use crate::{client::TransportType, payload::Payload, ClientBuilder};
511    use bytes::Bytes;
512    use native_tls::TlsConnector;
513    use serde_json::json;
514    use std::time::Duration;
515
516    #[test]
517    fn socket_io_integration() -> Result<()> {
518        let url = crate::test::socket_io_server();
519
520        let socket = ClientBuilder::new(url)
521            .on("test", |msg, _| match msg {
522                #[allow(deprecated)]
523                Payload::String(str, _) => println!("Received string: {}", str),
524                Payload::Text(text, _) => println!("Received json: {:#?}", text),
525                Payload::Binary(bin, _) => println!("Received binary data: {:#?}", bin),
526            })
527            .connect()?;
528
529        let payload = json!({"token": 123});
530        #[allow(deprecated)]
531        let result = socket.emit("test", Payload::String(payload.to_string(), None));
532
533        assert!(result.is_ok());
534
535        let ack_callback = move |message: Payload, socket: RawClient| {
536            let result = socket.emit("test", Payload::Text(vec![json!({"got ack": true})], None));
537            assert!(result.is_ok());
538
539            println!("Yehaa! My ack got acked?");
540            if let Payload::Text(values, _) = message {
541                println!("Received json Ack");
542                println!("Ack data: {:#?}", values);
543            }
544        };
545
546        let ack = socket.emit_with_ack(
547            "test",
548            Payload::Text(vec![payload], None),
549            Duration::from_secs(1),
550            ack_callback,
551        );
552        assert!(ack.is_ok());
553
554        sleep(Duration::from_secs(2));
555
556        assert!(socket.disconnect().is_ok());
557
558        Ok(())
559    }
560
561    #[test]
562    fn socket_io_builder_integration() -> Result<()> {
563        let url = crate::test::socket_io_server();
564
565        // test socket build logic
566        let socket_builder = ClientBuilder::new(url);
567
568        let tls_connector = TlsConnector::builder()
569            .use_sni(true)
570            .build()
571            .expect("Found illegal configuration");
572
573        let socket = socket_builder
574            .namespace("/admin")
575            .tls_config(tls_connector)
576            .opening_header("accept-encoding", "application/json")
577            .on("test", |str, _| println!("Received: {:#?}", str))
578            .on("message", |payload, _| println!("{:#?}", payload))
579            .connect()?;
580
581        assert!(socket.emit("message", json!("Hello World")).is_ok());
582
583        assert!(socket.emit("binary", Bytes::from_static(&[46, 88])).is_ok());
584
585        assert!(socket
586            .emit_with_ack(
587                "binary",
588                json!("pls ack"),
589                Duration::from_secs(1),
590                |payload, _| {
591                    println!("Yehaa the ack got acked");
592                    println!("With data: {:#?}", payload);
593                }
594            )
595            .is_ok());
596
597        sleep(Duration::from_secs(2));
598
599        Ok(())
600    }
601
602    #[test]
603    fn socket_io_builder_integration_iterator() -> Result<()> {
604        let url = crate::test::socket_io_server();
605
606        // test socket build logic
607        let socket_builder = ClientBuilder::new(url);
608
609        let tls_connector = TlsConnector::builder()
610            .use_sni(true)
611            .build()
612            .expect("Found illegal configuration");
613
614        let socket = socket_builder
615            .namespace("/admin")
616            .tls_config(tls_connector)
617            .opening_header("accept-encoding", "application/json")
618            .on("test", |str, _| println!("Received: {:#?}", str))
619            .on("message", |payload, _| println!("{:#?}", payload))
620            .connect_raw()?;
621
622        assert!(socket.emit("message", json!("Hello World")).is_ok());
623
624        assert!(socket.emit("binary", Bytes::from_static(&[46, 88])).is_ok());
625
626        assert!(socket
627            .emit_with_ack(
628                "binary",
629                json!("pls ack"),
630                Duration::from_secs(1),
631                |payload, _| {
632                    println!("Yehaa the ack got acked");
633                    println!("With data: {:#?}", payload);
634                }
635            )
636            .is_ok());
637
638        test_socketio_socket(socket, "/admin".to_owned())
639    }
640
641    #[test]
642    fn socket_io_on_any_integration() -> Result<()> {
643        let url = crate::test::socket_io_server();
644
645        let (tx, rx) = mpsc::sync_channel(1);
646
647        let _socket = ClientBuilder::new(url)
648            .namespace("/")
649            .auth(json!({ "password": "123" }))
650            .on("auth", |payload, _client| {
651                if let Payload::Text(payload, _) = payload {
652                    println!("{:#?}", payload);
653                }
654            })
655            .on_any(move |event, payload, _client| {
656                if let Payload::Text(payload, _) = payload {
657                    println!("{event} {payload:#?}");
658                }
659                tx.send(String::from(event)).unwrap();
660            })
661            .connect()?;
662
663        // Sleep to give server enough time to send 2 events
664        sleep(Duration::from_secs(2));
665
666        let event = rx.recv().unwrap();
667        assert_eq!(event, "message");
668        let event = rx.recv().unwrap();
669        assert_eq!(event, "test");
670
671        Ok(())
672    }
673
674    #[test]
675    fn socket_io_auth_builder_integration() -> Result<()> {
676        let url = crate::test::socket_io_auth_server();
677        let nsp = String::from("/admin");
678        let socket = ClientBuilder::new(url)
679            .namespace(nsp.clone())
680            .auth(json!({ "password": "123" }))
681            .connect_raw()?;
682
683        let mut iter = socket
684            .iter()
685            .map(|packet| packet.unwrap())
686            .filter(|packet| packet.packet_type != PacketId::Connect);
687
688        let packet: Option<Packet> = iter.next();
689        assert!(packet.is_some());
690
691        let packet = packet.unwrap();
692
693        assert_eq!(
694            packet,
695            Packet::new(
696                PacketId::Event,
697                nsp,
698                Some("[\"auth\",\"success\"]".to_owned()),
699                None,
700                0,
701                None
702            )
703        );
704
705        Ok(())
706    }
707
708    #[test]
709    fn socketio_polling_integration() -> Result<()> {
710        let url = crate::test::socket_io_server();
711        let socket = ClientBuilder::new(url)
712            .transport_type(TransportType::Polling)
713            .connect_raw()?;
714        test_socketio_socket(socket, "/".to_owned())
715    }
716
717    #[test]
718    fn socket_io_websocket_integration() -> Result<()> {
719        let url = crate::test::socket_io_server();
720        let socket = ClientBuilder::new(url)
721            .transport_type(TransportType::Websocket)
722            .connect_raw()?;
723        test_socketio_socket(socket, "/".to_owned())
724    }
725
726    #[test]
727    fn socket_io_websocket_upgrade_integration() -> Result<()> {
728        let url = crate::test::socket_io_server();
729        let socket = ClientBuilder::new(url)
730            .transport_type(TransportType::WebsocketUpgrade)
731            .connect_raw()?;
732        test_socketio_socket(socket, "/".to_owned())
733    }
734
735    #[test]
736    fn socket_io_any_integration() -> Result<()> {
737        let url = crate::test::socket_io_server();
738        let socket = ClientBuilder::new(url)
739            .transport_type(TransportType::Any)
740            .connect_raw()?;
741        test_socketio_socket(socket, "/".to_owned())
742    }
743
744    fn test_socketio_socket(socket: RawClient, nsp: String) -> Result<()> {
745        let mut iter = socket
746            .iter()
747            .map(|packet| packet.unwrap())
748            .filter(|packet| packet.packet_type != PacketId::Connect);
749
750        let packet: Option<Packet> = iter.next();
751        assert!(packet.is_some());
752
753        let packet = packet.unwrap();
754
755        assert_eq!(
756            packet,
757            Packet::new(
758                PacketId::Event,
759                nsp.clone(),
760                Some("[\"Hello from the message event!\"]".to_owned()),
761                None,
762                0,
763                None,
764            )
765        );
766
767        let packet: Option<Packet> = iter.next();
768        assert!(packet.is_some());
769
770        let packet = packet.unwrap();
771
772        assert_eq!(
773            packet,
774            Packet::new(
775                PacketId::Event,
776                nsp.clone(),
777                Some("[\"test\",\"Hello from the test event!\"]".to_owned()),
778                None,
779                0,
780                None
781            )
782        );
783
784        let packet: Option<Packet> = iter.next();
785        assert!(packet.is_some());
786
787        let packet = packet.unwrap();
788        assert_eq!(
789            packet,
790            Packet::new(
791                PacketId::BinaryEvent,
792                nsp.clone(),
793                None,
794                None,
795                1,
796                Some(vec![Bytes::from_static(&[4, 5, 6])]),
797            )
798        );
799
800        let packet: Option<Packet> = iter.next();
801        assert!(packet.is_some());
802
803        let packet = packet.unwrap();
804        assert_eq!(
805            packet,
806            Packet::new(
807                PacketId::BinaryEvent,
808                nsp.clone(),
809                Some("\"test\"".to_owned()),
810                None,
811                1,
812                Some(vec![Bytes::from_static(&[1, 2, 3])]),
813            )
814        );
815
816        let packet: Option<Packet> = iter.next();
817
818        assert!(packet.is_some());
819
820        let packet = packet.unwrap();
821        assert_eq!(
822            packet,
823            Packet::new(
824                PacketId::Event,
825                nsp.clone(),
826                Some(
827                    serde_json::Value::Array(vec![
828                        serde_json::Value::from("This is the first argument"),
829                        serde_json::Value::from("This is the second argument"),
830                        serde_json::json!({"argCount":3})
831                    ])
832                    .to_string()
833                ),
834                None,
835                0,
836                None,
837            )
838        );
839
840        let packet: Option<Packet> = iter.next();
841
842        assert!(packet.is_some());
843
844        let packet = packet.unwrap();
845        assert_eq!(
846            packet,
847            Packet::new(
848                PacketId::Event,
849                nsp.clone(),
850                Some(
851                    serde_json::json!([
852                        "on_abc_event",
853                        "",
854                        {
855                        "abc": 0,
856                        "some_other": "value",
857                        }
858                    ])
859                    .to_string()
860                ),
861                None,
862                0,
863                None,
864            )
865        );
866
867        assert!(socket
868            .emit_with_ack(
869                "test",
870                Payload::from("123"),
871                Duration::from_secs(10),
872                |message: Payload, _| {
873                    println!("Yehaa! My ack got acked?");
874                    if let Payload::Text(values, _) = message {
875                        println!("Received ack");
876                        println!("Ack data: {values:#?}");
877                    }
878                }
879            )
880            .is_ok());
881
882        Ok(())
883    }
884
885    // TODO: add secure socketio server
886}