use crate::error::{Error, Result};
use crate::packet::{Packet, PacketId};
use bytes::Bytes;
use rust_engineio::{Client as EngineClient, Packet as EnginePacket, PacketId as EnginePacketId};
use std::convert::TryFrom;
use std::sync::{atomic::AtomicBool, Arc};
use std::{fmt::Debug, sync::atomic::Ordering};
use super::{event::Event, payload::Payload};
/// Socket.io transport layer sitting on top of an engine.io client.
///
/// Both fields are reference counted, so cloning a `Socket` yields a handle
/// that shares the same engine.io client and the same connection flag.
#[derive(Debug, Clone)]
pub(crate) struct Socket {
    /// The underlying engine.io client used to emit and poll raw packets.
    engine_client: Arc<EngineClient>,
    /// Socket.io level connection flag. Set by `connect` and by incoming
    /// `Connect` packets; cleared by `disconnect` and by incoming
    /// `ConnectError` / `Disconnect` packets.
    connected: Arc<AtomicBool>,
}
impl Socket {
pub(super) fn new(engine_client: EngineClient) -> Result<Self> {
Ok(Socket {
engine_client: Arc::new(engine_client),
connected: Arc::new(AtomicBool::default()),
})
}
/// Connects the underlying engine.io client and marks this socket as
/// connected.
///
/// # Errors
///
/// Returns the (converted) engine.io error if the connect call fails; the
/// connection flag is left untouched in that case.
pub fn connect(&self) -> Result<()> {
    self.engine_client.connect()?;

    // The flag is set right after the engine.io connect succeeds. Packets
    // processed later by `poll` (`ConnectError`, `Disconnect`) reset it.
    self.connected.store(true, Ordering::Release);
    Ok(())
}
/// Disconnects the underlying engine.io client (if it is connected) and
/// clears the socket.io connection flag.
///
/// # Errors
///
/// Returns an error if querying the engine.io connection state or the
/// engine.io disconnect itself fails. In both cases the connection flag
/// is not modified.
pub fn disconnect(&self) -> Result<()> {
    let engine_up = self.is_engineio_connected()?;
    if engine_up {
        self.engine_client.disconnect()?;
    }

    let was_connected = self.connected.load(Ordering::Acquire);
    if was_connected {
        self.connected.store(false, Ordering::Release);
    }

    Ok(())
}
/// Sends a socket.io packet over the engine.io connection.
///
/// The encoded packet is emitted as an engine.io `Message`. Every
/// attachment carried by the packet is emitted afterwards, in order, as a
/// separate `MessageBinary` packet.
///
/// # Errors
///
/// Returns `Error::IllegalActionBeforeOpen` when either the engine.io
/// client or this socket is not connected, and propagates any error from
/// the engine.io client.
pub fn send(&self, packet: Packet) -> Result<()> {
    let engine_up = self.is_engineio_connected()?;
    if !(engine_up && self.connected.load(Ordering::Acquire)) {
        return Err(Error::IllegalActionBeforeOpen());
    }

    // The head packet must be encoded before the attachments are moved out.
    let head = EnginePacket::new(EnginePacketId::Message, Bytes::from(&packet));
    self.engine_client.emit(head)?;

    for attachment in packet.attachments.into_iter().flatten() {
        let frame = EnginePacket::new(EnginePacketId::MessageBinary, attachment);
        self.engine_client.emit(frame)?;
    }

    Ok(())
}
/// Builds a packet for `event` carrying `data` in namespace `nsp` and
/// sends it. No packet id is attached.
///
/// # Errors
///
/// Propagates errors from building the packet and from `send`.
pub fn emit(&self, nsp: &str, event: Event, data: Payload) -> Result<()> {
    self.build_packet_for_payload(data, event, nsp, None)
        .and_then(|socket_packet| self.send(socket_packet))
}
/// Builds the socket.io packet that carries `payload` for `event` in the
/// namespace `nsp`.
///
/// * `Payload::Binary` produces a packet with one attachment. Its type is
///   `BinaryAck` when `id` is `Some`, otherwise `BinaryEvent`, and its data
///   is the JSON-encoded event name.
/// * `Payload::String` produces an `Event` packet whose data is the JSON
///   array `[event, data]`. If the string is itself valid JSON it is
///   embedded verbatim; otherwise it is embedded as a JSON string.
///
/// The event name and any non-JSON string payload are serialized through
/// `serde_json`, so quotes, backslashes and control characters are escaped
/// and the resulting packet data is always well-formed JSON.
///
/// `id` is passed through to the packet unchanged.
#[inline]
pub(crate) fn build_packet_for_payload<'a>(
    &'a self,
    payload: Payload,
    event: Event,
    nsp: &'a str,
    id: Option<i32>,
) -> Result<Packet> {
    // JSON string literal (including the surrounding quotes) of the event name.
    let event_json = serde_json::Value::String(String::from(event)).to_string();

    match payload {
        Payload::Binary(bin_data) => {
            let packet_type = if id.is_some() {
                PacketId::BinaryAck
            } else {
                PacketId::BinaryEvent
            };
            Ok(Packet::new(
                packet_type,
                nsp.to_owned(),
                Some(event_json),
                id,
                1,
                Some(vec![bin_data]),
            ))
        }
        Payload::String(str_data) => {
            // Valid JSON is forwarded untouched so its exact formatting is
            // preserved; anything else is escaped into a JSON string rather
            // than being spliced between raw quotes.
            let data_json = if serde_json::from_str::<serde_json::Value>(&str_data).is_ok() {
                str_data
            } else {
                serde_json::Value::String(str_data).to_string()
            };
            Ok(Packet::new(
                PacketId::Event,
                nsp.to_owned(),
                Some(format!("[{},{}]", event_json, data_json)),
                id,
                0,
                None,
            ))
        }
    }
}
/// Polls the engine.io client until a socket.io packet is available.
///
/// Engine.io packets that are neither `Message` nor `MessageBinary` are
/// skipped. The first matching packet is decoded (including its
/// attachments), applied to the connection flag and returned.
///
/// Returns `Ok(None)` once the engine.io client yields no further packet.
///
/// # Errors
///
/// Propagates engine.io polling errors and packet decoding errors.
pub(crate) fn poll(&self) -> Result<Option<Packet>> {
    while let Some(engine_packet) = self.engine_client.poll()? {
        let carries_message = matches!(
            engine_packet.packet_id,
            EnginePacketId::Message | EnginePacketId::MessageBinary
        );
        if !carries_message {
            continue;
        }

        let socket_packet = self.handle_engineio_packet(engine_packet)?;
        self.handle_socketio_packet(&socket_packet);
        return Ok(Some(socket_packet));
    }

    Ok(None)
}
/// Updates the connection flag according to the type of a received packet.
///
/// `Connect` sets the flag; `ConnectError` and `Disconnect` clear it. Any
/// other packet type leaves the flag unchanged.
#[inline]
fn handle_socketio_packet(&self, socket_packet: &Packet) {
    let now_connected = match socket_packet.packet_type {
        PacketId::Connect => true,
        PacketId::ConnectError | PacketId::Disconnect => false,
        _ => return,
    };
    self.connected.store(now_connected, Ordering::Release);
}
fn handle_engineio_packet(&self, packet: EnginePacket) -> Result<Packet> {
let mut socket_packet = Packet::try_from(&packet.data)?;
if socket_packet.attachment_count > 0 {
let mut attachments_left = socket_packet.attachment_count;
let mut attachments = Vec::new();
while attachments_left > 0 {
let next = self.engine_client.poll();
match next {
Err(err) => return Err(err.into()),
Ok(Some(packet)) => match packet.packet_id {
EnginePacketId::MessageBinary | EnginePacketId::Message => {
attachments.push(packet.data);
attachments_left -= 1;
}
_ => {
return Err(Error::InvalidAttachmentPacketType(
packet.packet_id.into(),
));
}
},
Ok(None) => {
return Err(Error::IncompletePacket());
}
}
}
socket_packet.attachments = Some(attachments);
}
Ok(socket_packet)
}
fn is_engineio_connected(&self) -> Result<bool> {
Ok(self.engine_client.is_connected()?)
}
}