1use crate::error::{Error, Result};
2use crate::packet::{Packet, PacketId};
3use bytes::Bytes;
4use rust_engineio::{Client as EngineClient, Packet as EnginePacket, PacketId as EnginePacketId};
5use std::convert::TryFrom;
6use std::sync::{atomic::AtomicBool, Arc};
7use std::{fmt::Debug, sync::atomic::Ordering};
8
9use super::{event::Event, payload::Payload};
10
11#[derive(Clone, Debug)]
13pub(crate) struct Socket {
14 engine_client: Arc<EngineClient>,
16 connected: Arc<AtomicBool>,
17}
18
19impl Socket {
20 pub(super) fn new(engine_client: EngineClient) -> Result<Self> {
23 Ok(Socket {
24 engine_client: Arc::new(engine_client),
25 connected: Arc::new(AtomicBool::default()),
26 })
27 }
28
29 pub fn connect(&self) -> Result<()> {
32 self.engine_client.connect()?;
33
34 self.connected.store(true, Ordering::Release);
37
38 Ok(())
39 }
40
41 pub fn disconnect(&self) -> Result<()> {
44 if self.is_engineio_connected()? {
45 self.engine_client.disconnect()?;
46 }
47 if self.connected.load(Ordering::Acquire) {
48 self.connected.store(false, Ordering::Release);
49 }
50 Ok(())
51 }
52
53 pub fn send(&self, packet: Packet) -> Result<()> {
55 if !self.is_engineio_connected()? || !self.connected.load(Ordering::Acquire) {
56 return Err(Error::IllegalActionBeforeOpen());
57 }
58
59 let engine_packet = EnginePacket::new(EnginePacketId::Message, Bytes::from(&packet));
61 self.engine_client.emit(engine_packet)?;
62
63 if let Some(attachments) = packet.attachments {
64 for attachment in attachments {
65 let engine_packet = EnginePacket::new(EnginePacketId::MessageBinary, attachment);
66 self.engine_client.emit(engine_packet)?;
67 }
68 }
69
70 Ok(())
71 }
72
73 pub fn emit(&self, nsp: &str, event: Event, data: Payload) -> Result<()> {
76 let socket_packet = Packet::new_from_payload(data, event, nsp, None)?;
77
78 self.send(socket_packet)
79 }
80
81 pub(crate) fn poll(&self) -> Result<Option<Packet>> {
82 loop {
83 match self.engine_client.poll() {
84 Ok(Some(packet)) => {
85 if packet.packet_id == EnginePacketId::Message
86 || packet.packet_id == EnginePacketId::MessageBinary
87 {
88 let packet = self.handle_engineio_packet(packet)?;
89 self.handle_socketio_packet(&packet);
90 return Ok(Some(packet));
91 } else {
92 continue;
93 }
94 }
95 Ok(None) => {
96 return Ok(None);
97 }
98 Err(err) => return Err(err.into()),
99 }
100 }
101 }
102
103 #[inline]
105 fn handle_socketio_packet(&self, socket_packet: &Packet) {
106 match socket_packet.packet_type {
107 PacketId::Connect => {
108 self.connected.store(true, Ordering::Release);
109 }
110 PacketId::ConnectError => {
111 self.connected.store(false, Ordering::Release);
112 }
113 PacketId::Disconnect => {
114 self.connected.store(false, Ordering::Release);
115 }
116 _ => (),
117 }
118 }
119
120 fn handle_engineio_packet(&self, packet: EnginePacket) -> Result<Packet> {
122 let mut socket_packet = Packet::try_from(&packet.data)?;
123
124 if socket_packet.attachment_count > 0 {
126 let mut attachments_left = socket_packet.attachment_count;
127 let mut attachments = Vec::new();
128 while attachments_left > 0 {
129 let next = self.engine_client.poll();
130 match next {
131 Err(err) => return Err(err.into()),
132 Ok(Some(packet)) => match packet.packet_id {
133 EnginePacketId::MessageBinary | EnginePacketId::Message => {
134 attachments.push(packet.data);
135 attachments_left -= 1;
136 }
137 _ => {
138 return Err(Error::InvalidAttachmentPacketType(
139 packet.packet_id.into(),
140 ));
141 }
142 },
143 Ok(None) => {
144 return Err(Error::IncompletePacket());
146 }
147 }
148 }
149 socket_packet.attachments = Some(attachments);
150 }
151
152 Ok(socket_packet)
153 }
154
155 fn is_engineio_connected(&self) -> Result<bool> {
156 Ok(self.engine_client.is_connected()?)
157 }
158}