rust_socketio/
socket.rs

1use crate::error::{Error, Result};
2use crate::packet::{Packet, PacketId};
3use bytes::Bytes;
4use rust_engineio::{Client as EngineClient, Packet as EnginePacket, PacketId as EnginePacketId};
5use std::convert::TryFrom;
6use std::sync::{atomic::AtomicBool, Arc};
7use std::{fmt::Debug, sync::atomic::Ordering};
8
9use super::{event::Event, payload::Payload};
10
11/// Handles communication in the `socket.io` protocol.
12#[derive(Clone, Debug)]
13pub(crate) struct Socket {
14    //TODO: 0.4.0 refactor this
15    engine_client: Arc<EngineClient>,
16    connected: Arc<AtomicBool>,
17}
18
19impl Socket {
20    /// Creates an instance of `Socket`.
21
22    pub(super) fn new(engine_client: EngineClient) -> Result<Self> {
23        Ok(Socket {
24            engine_client: Arc::new(engine_client),
25            connected: Arc::new(AtomicBool::default()),
26        })
27    }
28
29    /// Connects to the server. This includes a connection of the underlying
30    /// engine.io client and afterwards an opening socket.io request.
31    pub fn connect(&self) -> Result<()> {
32        self.engine_client.connect()?;
33
34        // store the connected value as true, if the connection process fails
35        // later, the value will be updated
36        self.connected.store(true, Ordering::Release);
37
38        Ok(())
39    }
40
41    /// Disconnects from the server by sending a socket.io `Disconnect` packet. This results
42    /// in the underlying engine.io transport to get closed as well.
43    pub fn disconnect(&self) -> Result<()> {
44        if self.is_engineio_connected()? {
45            self.engine_client.disconnect()?;
46        }
47        if self.connected.load(Ordering::Acquire) {
48            self.connected.store(false, Ordering::Release);
49        }
50        Ok(())
51    }
52
53    /// Sends a `socket.io` packet to the server using the `engine.io` client.
54    pub fn send(&self, packet: Packet) -> Result<()> {
55        if !self.is_engineio_connected()? || !self.connected.load(Ordering::Acquire) {
56            return Err(Error::IllegalActionBeforeOpen());
57        }
58
59        // the packet, encoded as an engine.io message packet
60        let engine_packet = EnginePacket::new(EnginePacketId::Message, Bytes::from(&packet));
61        self.engine_client.emit(engine_packet)?;
62
63        if let Some(attachments) = packet.attachments {
64            for attachment in attachments {
65                let engine_packet = EnginePacket::new(EnginePacketId::MessageBinary, attachment);
66                self.engine_client.emit(engine_packet)?;
67            }
68        }
69
70        Ok(())
71    }
72
73    /// Emits to certain event with given data. The data needs to be JSON,
74    /// otherwise this returns an `InvalidJson` error.
75    pub fn emit(&self, nsp: &str, event: Event, data: Payload) -> Result<()> {
76        let socket_packet = Packet::new_from_payload(data, event, nsp, None)?;
77
78        self.send(socket_packet)
79    }
80
81    pub(crate) fn poll(&self) -> Result<Option<Packet>> {
82        loop {
83            match self.engine_client.poll() {
84                Ok(Some(packet)) => {
85                    if packet.packet_id == EnginePacketId::Message
86                        || packet.packet_id == EnginePacketId::MessageBinary
87                    {
88                        let packet = self.handle_engineio_packet(packet)?;
89                        self.handle_socketio_packet(&packet);
90                        return Ok(Some(packet));
91                    } else {
92                        continue;
93                    }
94                }
95                Ok(None) => {
96                    return Ok(None);
97                }
98                Err(err) => return Err(err.into()),
99            }
100        }
101    }
102
103    /// Handles the connection/disconnection.
104    #[inline]
105    fn handle_socketio_packet(&self, socket_packet: &Packet) {
106        match socket_packet.packet_type {
107            PacketId::Connect => {
108                self.connected.store(true, Ordering::Release);
109            }
110            PacketId::ConnectError => {
111                self.connected.store(false, Ordering::Release);
112            }
113            PacketId::Disconnect => {
114                self.connected.store(false, Ordering::Release);
115            }
116            _ => (),
117        }
118    }
119
120    /// Handles new incoming engineio packets
121    fn handle_engineio_packet(&self, packet: EnginePacket) -> Result<Packet> {
122        let mut socket_packet = Packet::try_from(&packet.data)?;
123
124        // Only handle attachments if there are any
125        if socket_packet.attachment_count > 0 {
126            let mut attachments_left = socket_packet.attachment_count;
127            let mut attachments = Vec::new();
128            while attachments_left > 0 {
129                let next = self.engine_client.poll();
130                match next {
131                    Err(err) => return Err(err.into()),
132                    Ok(Some(packet)) => match packet.packet_id {
133                        EnginePacketId::MessageBinary | EnginePacketId::Message => {
134                            attachments.push(packet.data);
135                            attachments_left -= 1;
136                        }
137                        _ => {
138                            return Err(Error::InvalidAttachmentPacketType(
139                                packet.packet_id.into(),
140                            ));
141                        }
142                    },
143                    Ok(None) => {
144                        // Engineio closed before attachments completed.
145                        return Err(Error::IncompletePacket());
146                    }
147                }
148            }
149            socket_packet.attachments = Some(attachments);
150        }
151
152        Ok(socket_packet)
153    }
154
155    fn is_engineio_connected(&self) -> Result<bool> {
156        Ok(self.engine_client.is_connected()?)
157    }
158}