1use super::callback::Callback;
2use crate::packet::{Packet, PacketId};
3use crate::Error;
4pub(crate) use crate::{event::CloseReason, event::Event, payload::Payload};
5use rand::{thread_rng, Rng};
6use serde_json::Value;
7
8use crate::client::callback::{SocketAnyCallback, SocketCallback};
9use crate::error::Result;
10use std::collections::HashMap;
11use std::ops::DerefMut;
12use std::sync::{Arc, Mutex};
13use std::time::Duration;
14use std::time::Instant;
15
16use crate::socket::Socket as InnerSocket;
17
18#[derive(Debug)]
23pub struct Ack {
24 pub id: i32,
25 timeout: Duration,
26 time_started: Instant,
27 callback: Callback<SocketCallback>,
28}
29
30#[derive(Clone)]
34pub struct RawClient {
35 socket: InnerSocket,
37 on: Arc<Mutex<HashMap<Event, Callback<SocketCallback>>>>,
38 on_any: Arc<Mutex<Option<Callback<SocketAnyCallback>>>>,
39 outstanding_acks: Arc<Mutex<Vec<Ack>>>,
40 nsp: String,
42 auth: Option<Value>,
44}
45
46impl RawClient {
47 pub(crate) fn new<T: Into<String>>(
52 socket: InnerSocket,
53 namespace: T,
54 on: Arc<Mutex<HashMap<Event, Callback<SocketCallback>>>>,
55 on_any: Arc<Mutex<Option<Callback<SocketAnyCallback>>>>,
56 auth: Option<Value>,
57 ) -> Result<Self> {
58 Ok(RawClient {
59 socket,
60 nsp: namespace.into(),
61 on,
62 on_any,
63 outstanding_acks: Arc::new(Mutex::new(Vec::new())),
64 auth,
65 })
66 }
67
68 pub(crate) fn connect(&self) -> Result<()> {
72 self.socket.connect()?;
74
75 let auth = self.auth.as_ref().map(|data| data.to_string());
76
77 let open_packet = Packet::new(PacketId::Connect, self.nsp.clone(), auth, None, 0, None);
79
80 self.socket.send(open_packet)?;
81
82 Ok(())
83 }
84
85 #[inline]
111 pub fn emit<E, D>(&self, event: E, data: D) -> Result<()>
112 where
113 E: Into<Event>,
114 D: Into<Payload>,
115 {
116 self.socket.emit(&self.nsp, event.into(), data.into())
117 }
118
119 #[inline]
163 pub fn ack<D>(&self, data: D) -> Result<()>
164 where
165 D: Into<Payload>,
166 {
167 self.socket.ack(&self.nsp, data.into(), None)
170 }
171
172 #[inline]
174 pub fn ack_with_id<D>(&self, ack_id: i32, data: D) -> Result<()>
175 where
176 D: Into<Payload>,
177 {
178 self.socket.ack(&self.nsp, data.into(), Some(ack_id))
179 }
180
181 pub fn disconnect(&self) -> Result<()> {
207 let disconnect_packet =
208 Packet::new(PacketId::Disconnect, self.nsp.clone(), None, None, 0, None);
209
210 let _ = self.socket.send(disconnect_packet);
212 self.socket.disconnect()?;
213
214 let _ = self.callback(
215 &Event::Close,
216 CloseReason::IOClientDisconnect.as_str(),
217 None,
218 ); Ok(())
220 }
221
222 #[inline]
261 pub fn emit_with_ack<F, E, D>(
262 &self,
263 event: E,
264 data: D,
265 timeout: Duration,
266 callback: F,
267 ) -> Result<()>
268 where
269 F: FnMut(Payload, RawClient) + 'static + Send,
270 E: Into<Event>,
271 D: Into<Payload>,
272 {
273 let id = thread_rng().gen_range(0..999);
274 let socket_packet =
275 Packet::new_from_payload(data.into(), event.into(), &self.nsp, Some(id))?;
276
277 let ack = Ack {
278 id,
279 time_started: Instant::now(),
280 timeout,
281 callback: Callback::<SocketCallback>::new(callback),
282 };
283
284 self.outstanding_acks.lock()?.push(ack);
286
287 self.socket.send(socket_packet)?;
288 Ok(())
289 }
290
291 pub(crate) fn poll(&self) -> Result<Option<Packet>> {
292 loop {
293 match self.socket.poll() {
294 Err(err) => {
295 self.callback(&Event::Error, err.to_string(), None)?;
296 return Err(err);
297 }
298 Ok(Some(packet)) => {
299 if packet.nsp == self.nsp {
300 self.handle_socketio_packet(&packet)?;
301 return Ok(Some(packet));
302 } else {
303 }
305 }
306 Ok(None) => return Ok(None),
307 }
308 }
309 }
310
311 #[cfg(test)]
312 pub(crate) fn iter(&self) -> Iter<'_> {
313 Iter { socket: self }
314 }
315
316 fn callback<P: Into<Payload>>(
317 &self,
318 event: &Event,
319 payload: P,
320 ack_id: Option<i32>,
321 ) -> Result<()> {
322 let mut on = self.on.lock()?;
323 let mut on_any = self.on_any.lock()?;
324 let lock = on.deref_mut();
325 let on_any_lock = on_any.deref_mut();
326
327 let mut payload = payload.into();
328 payload.set_ack_id(ack_id);
329
330 if let Some(callback) = lock.get_mut(event) {
331 callback(payload.clone(), self.clone());
332 }
333 match event {
334 Event::Message | Event::Custom(_) => {
335 if let Some(callback) = on_any_lock {
336 callback(event.clone(), payload, self.clone())
337 }
338 }
339 _ => {}
340 }
341 drop(on);
342 drop(on_any);
343 Ok(())
344 }
345
346 #[inline]
348 fn handle_ack(&self, socket_packet: &Packet) -> Result<()> {
349 let Some(id) = socket_packet.id else {
350 return Ok(());
351 };
352
353 let outstanding_ack = {
354 let mut outstanding_acks = self.outstanding_acks.lock()?;
355 outstanding_acks
356 .iter()
357 .position(|ack| ack.id == id)
358 .map(|pos| outstanding_acks.remove(pos))
359 };
360
361 if let Some(mut ack) = outstanding_ack {
365 if ack.time_started.elapsed() < ack.timeout {
366 if let Some(ref payload) = socket_packet.data {
367 let mut payload = Payload::from(payload.to_owned());
368 payload.set_ack_id(socket_packet.id);
369 ack.callback.deref_mut()(payload, self.clone());
370 }
371
372 if let Some(ref attachments) = socket_packet.attachments {
373 if let Some(payload) = attachments.first() {
374 let payload = Payload::Binary(payload.to_owned(), socket_packet.id);
375 ack.callback.deref_mut()(payload, self.clone());
376 }
377 }
378 }
379 }
380
381 Ok(())
382 }
383
384 #[inline]
386 fn handle_binary_event(&self, packet: &Packet) -> Result<()> {
387 let event = if let Some(string_data) = &packet.data {
388 string_data.replace('\"', "").into()
389 } else {
390 Event::Message
391 };
392
393 if let Some(attachments) = &packet.attachments {
394 if let Some(binary_payload) = attachments.first() {
395 self.callback(
396 &event,
397 Payload::Binary(binary_payload.to_owned(), packet.id),
398 packet.id,
399 )?;
400 }
401 }
402 Ok(())
403 }
404
405 fn handle_event(&self, packet: &Packet) -> Result<()> {
408 let Some(ref data) = packet.data else {
409 return Ok(());
410 };
411
412 if let Ok(Value::Array(contents)) = serde_json::from_str::<Value>(data) {
418 let (event, payloads) = match contents.len() {
419 0 => return Err(Error::IncompletePacket()),
420 1 => (Event::Message, contents.as_slice()),
421 _ => match contents.first() {
423 Some(Value::String(ev)) => (Event::from(ev.as_str()), &contents[1..]),
424 _ => (Event::Message, contents.as_slice()),
426 },
428 };
429
430 self.callback(&event, payloads.to_vec(), packet.id)?;
432 }
433
434 Ok(())
435 }
436
437 #[inline]
441 fn handle_socketio_packet(&self, packet: &Packet) -> Result<()> {
442 if packet.nsp == self.nsp {
443 match packet.packet_type {
444 PacketId::Ack | PacketId::BinaryAck => {
445 if let Err(err) = self.handle_ack(packet) {
446 self.callback(&Event::Error, err.to_string(), None)?;
447 return Err(err);
448 }
449 }
450 PacketId::BinaryEvent => {
451 if let Err(err) = self.handle_binary_event(packet) {
452 self.callback(&Event::Error, err.to_string(), None)?;
453 }
454 }
455 PacketId::Connect => {
456 self.callback(&Event::Connect, "", None)?;
457 }
458 PacketId::Disconnect => {
459 self.callback(
460 &Event::Close,
461 CloseReason::IOServerDisconnect.as_str(),
462 None,
463 )?;
464 }
465 PacketId::ConnectError => {
466 self.callback(
467 &Event::Error,
468 String::from("Received an ConnectError frame: ")
469 + &packet
470 .clone()
471 .data
472 .unwrap_or_else(|| String::from("\"No error message provided\"")),
473 None,
474 )?;
475 }
476 PacketId::Event => {
477 if let Err(err) = self.handle_event(packet) {
478 self.callback(&Event::Error, err.to_string(), None)?;
479 }
480 }
481 }
482 }
483 Ok(())
484 }
485}
486
487#[cfg(test)]
488pub struct Iter<'a> {
489 socket: &'a RawClient,
490}
491
492#[cfg(test)]
493impl<'a> Iterator for Iter<'a> {
494 type Item = Result<Packet>;
495 fn next(&mut self) -> std::option::Option<<Self as std::iter::Iterator>::Item> {
496 match self.socket.poll() {
497 Err(err) => Some(Err(err)),
498 Ok(Some(packet)) => Some(Ok(packet)),
499 Ok(None) => None,
500 }
501 }
502}
503
504#[cfg(test)]
505mod test {
506 use std::sync::mpsc;
507 use std::thread::sleep;
508
509 use super::*;
510 use crate::{client::TransportType, payload::Payload, ClientBuilder};
511 use bytes::Bytes;
512 use native_tls::TlsConnector;
513 use serde_json::json;
514 use std::time::Duration;
515
516 #[test]
517 fn socket_io_integration() -> Result<()> {
518 let url = crate::test::socket_io_server();
519
520 let socket = ClientBuilder::new(url)
521 .on("test", |msg, _| match msg {
522 #[allow(deprecated)]
523 Payload::String(str, _) => println!("Received string: {}", str),
524 Payload::Text(text, _) => println!("Received json: {:#?}", text),
525 Payload::Binary(bin, _) => println!("Received binary data: {:#?}", bin),
526 })
527 .connect()?;
528
529 let payload = json!({"token": 123});
530 #[allow(deprecated)]
531 let result = socket.emit("test", Payload::String(payload.to_string(), None));
532
533 assert!(result.is_ok());
534
535 let ack_callback = move |message: Payload, socket: RawClient| {
536 let result = socket.emit("test", Payload::Text(vec![json!({"got ack": true})], None));
537 assert!(result.is_ok());
538
539 println!("Yehaa! My ack got acked?");
540 if let Payload::Text(values, _) = message {
541 println!("Received json Ack");
542 println!("Ack data: {:#?}", values);
543 }
544 };
545
546 let ack = socket.emit_with_ack(
547 "test",
548 Payload::Text(vec![payload], None),
549 Duration::from_secs(1),
550 ack_callback,
551 );
552 assert!(ack.is_ok());
553
554 sleep(Duration::from_secs(2));
555
556 assert!(socket.disconnect().is_ok());
557
558 Ok(())
559 }
560
561 #[test]
562 fn socket_io_builder_integration() -> Result<()> {
563 let url = crate::test::socket_io_server();
564
565 let socket_builder = ClientBuilder::new(url);
567
568 let tls_connector = TlsConnector::builder()
569 .use_sni(true)
570 .build()
571 .expect("Found illegal configuration");
572
573 let socket = socket_builder
574 .namespace("/admin")
575 .tls_config(tls_connector)
576 .opening_header("accept-encoding", "application/json")
577 .on("test", |str, _| println!("Received: {:#?}", str))
578 .on("message", |payload, _| println!("{:#?}", payload))
579 .connect()?;
580
581 assert!(socket.emit("message", json!("Hello World")).is_ok());
582
583 assert!(socket.emit("binary", Bytes::from_static(&[46, 88])).is_ok());
584
585 assert!(socket
586 .emit_with_ack(
587 "binary",
588 json!("pls ack"),
589 Duration::from_secs(1),
590 |payload, _| {
591 println!("Yehaa the ack got acked");
592 println!("With data: {:#?}", payload);
593 }
594 )
595 .is_ok());
596
597 sleep(Duration::from_secs(2));
598
599 Ok(())
600 }
601
602 #[test]
603 fn socket_io_builder_integration_iterator() -> Result<()> {
604 let url = crate::test::socket_io_server();
605
606 let socket_builder = ClientBuilder::new(url);
608
609 let tls_connector = TlsConnector::builder()
610 .use_sni(true)
611 .build()
612 .expect("Found illegal configuration");
613
614 let socket = socket_builder
615 .namespace("/admin")
616 .tls_config(tls_connector)
617 .opening_header("accept-encoding", "application/json")
618 .on("test", |str, _| println!("Received: {:#?}", str))
619 .on("message", |payload, _| println!("{:#?}", payload))
620 .connect_raw()?;
621
622 assert!(socket.emit("message", json!("Hello World")).is_ok());
623
624 assert!(socket.emit("binary", Bytes::from_static(&[46, 88])).is_ok());
625
626 assert!(socket
627 .emit_with_ack(
628 "binary",
629 json!("pls ack"),
630 Duration::from_secs(1),
631 |payload, _| {
632 println!("Yehaa the ack got acked");
633 println!("With data: {:#?}", payload);
634 }
635 )
636 .is_ok());
637
638 test_socketio_socket(socket, "/admin".to_owned())
639 }
640
641 #[test]
642 fn socket_io_on_any_integration() -> Result<()> {
643 let url = crate::test::socket_io_server();
644
645 let (tx, rx) = mpsc::sync_channel(1);
646
647 let _socket = ClientBuilder::new(url)
648 .namespace("/")
649 .auth(json!({ "password": "123" }))
650 .on("auth", |payload, _client| {
651 if let Payload::Text(payload, _) = payload {
652 println!("{:#?}", payload);
653 }
654 })
655 .on_any(move |event, payload, _client| {
656 if let Payload::Text(payload, _) = payload {
657 println!("{event} {payload:#?}");
658 }
659 tx.send(String::from(event)).unwrap();
660 })
661 .connect()?;
662
663 sleep(Duration::from_secs(2));
665
666 let event = rx.recv().unwrap();
667 assert_eq!(event, "message");
668 let event = rx.recv().unwrap();
669 assert_eq!(event, "test");
670
671 Ok(())
672 }
673
674 #[test]
675 fn socket_io_auth_builder_integration() -> Result<()> {
676 let url = crate::test::socket_io_auth_server();
677 let nsp = String::from("/admin");
678 let socket = ClientBuilder::new(url)
679 .namespace(nsp.clone())
680 .auth(json!({ "password": "123" }))
681 .connect_raw()?;
682
683 let mut iter = socket
684 .iter()
685 .map(|packet| packet.unwrap())
686 .filter(|packet| packet.packet_type != PacketId::Connect);
687
688 let packet: Option<Packet> = iter.next();
689 assert!(packet.is_some());
690
691 let packet = packet.unwrap();
692
693 assert_eq!(
694 packet,
695 Packet::new(
696 PacketId::Event,
697 nsp,
698 Some("[\"auth\",\"success\"]".to_owned()),
699 None,
700 0,
701 None
702 )
703 );
704
705 Ok(())
706 }
707
708 #[test]
709 fn socketio_polling_integration() -> Result<()> {
710 let url = crate::test::socket_io_server();
711 let socket = ClientBuilder::new(url)
712 .transport_type(TransportType::Polling)
713 .connect_raw()?;
714 test_socketio_socket(socket, "/".to_owned())
715 }
716
717 #[test]
718 fn socket_io_websocket_integration() -> Result<()> {
719 let url = crate::test::socket_io_server();
720 let socket = ClientBuilder::new(url)
721 .transport_type(TransportType::Websocket)
722 .connect_raw()?;
723 test_socketio_socket(socket, "/".to_owned())
724 }
725
726 #[test]
727 fn socket_io_websocket_upgrade_integration() -> Result<()> {
728 let url = crate::test::socket_io_server();
729 let socket = ClientBuilder::new(url)
730 .transport_type(TransportType::WebsocketUpgrade)
731 .connect_raw()?;
732 test_socketio_socket(socket, "/".to_owned())
733 }
734
735 #[test]
736 fn socket_io_any_integration() -> Result<()> {
737 let url = crate::test::socket_io_server();
738 let socket = ClientBuilder::new(url)
739 .transport_type(TransportType::Any)
740 .connect_raw()?;
741 test_socketio_socket(socket, "/".to_owned())
742 }
743
744 fn test_socketio_socket(socket: RawClient, nsp: String) -> Result<()> {
745 let mut iter = socket
746 .iter()
747 .map(|packet| packet.unwrap())
748 .filter(|packet| packet.packet_type != PacketId::Connect);
749
750 let packet: Option<Packet> = iter.next();
751 assert!(packet.is_some());
752
753 let packet = packet.unwrap();
754
755 assert_eq!(
756 packet,
757 Packet::new(
758 PacketId::Event,
759 nsp.clone(),
760 Some("[\"Hello from the message event!\"]".to_owned()),
761 None,
762 0,
763 None,
764 )
765 );
766
767 let packet: Option<Packet> = iter.next();
768 assert!(packet.is_some());
769
770 let packet = packet.unwrap();
771
772 assert_eq!(
773 packet,
774 Packet::new(
775 PacketId::Event,
776 nsp.clone(),
777 Some("[\"test\",\"Hello from the test event!\"]".to_owned()),
778 None,
779 0,
780 None
781 )
782 );
783
784 let packet: Option<Packet> = iter.next();
785 assert!(packet.is_some());
786
787 let packet = packet.unwrap();
788 assert_eq!(
789 packet,
790 Packet::new(
791 PacketId::BinaryEvent,
792 nsp.clone(),
793 None,
794 None,
795 1,
796 Some(vec![Bytes::from_static(&[4, 5, 6])]),
797 )
798 );
799
800 let packet: Option<Packet> = iter.next();
801 assert!(packet.is_some());
802
803 let packet = packet.unwrap();
804 assert_eq!(
805 packet,
806 Packet::new(
807 PacketId::BinaryEvent,
808 nsp.clone(),
809 Some("\"test\"".to_owned()),
810 None,
811 1,
812 Some(vec![Bytes::from_static(&[1, 2, 3])]),
813 )
814 );
815
816 let packet: Option<Packet> = iter.next();
817
818 assert!(packet.is_some());
819
820 let packet = packet.unwrap();
821 assert_eq!(
822 packet,
823 Packet::new(
824 PacketId::Event,
825 nsp.clone(),
826 Some(
827 serde_json::Value::Array(vec![
828 serde_json::Value::from("This is the first argument"),
829 serde_json::Value::from("This is the second argument"),
830 serde_json::json!({"argCount":3})
831 ])
832 .to_string()
833 ),
834 None,
835 0,
836 None,
837 )
838 );
839
840 let packet: Option<Packet> = iter.next();
841
842 assert!(packet.is_some());
843
844 let packet = packet.unwrap();
845 assert_eq!(
846 packet,
847 Packet::new(
848 PacketId::Event,
849 nsp.clone(),
850 Some(
851 serde_json::json!([
852 "on_abc_event",
853 "",
854 {
855 "abc": 0,
856 "some_other": "value",
857 }
858 ])
859 .to_string()
860 ),
861 None,
862 0,
863 None,
864 )
865 );
866
867 assert!(socket
868 .emit_with_ack(
869 "test",
870 Payload::from("123"),
871 Duration::from_secs(10),
872 |message: Payload, _| {
873 println!("Yehaa! My ack got acked?");
874 if let Payload::Text(values, _) = message {
875 println!("Received ack");
876 println!("Ack data: {values:#?}");
877 }
878 }
879 )
880 .is_ok());
881
882 Ok(())
883 }
884
885 }