use crate::ack::AckType;
use crate::config::ChannelConfig;
use crate::error::ManagerError;
use crate::error::{ClientBuilderError, ClientError, PayloadError, SocketError};
use crate::manager::{DirectiveSender, Manager, ManagerAction, message_sink};
use crate::marker::{AckId, AckMarker, BinaryMarker};
use crate::packet::{Directive, DynEvent, Signal};
use bytestring::ByteString;
use eioc::engine::Engine;
use eioc::transport::TransportStrategy;
use eioc::websocket::WebSocketConnector;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use url::Url;
/// An outgoing event that can be turned into a [`Directive`].
///
/// `A` and `B` are marker types (bounded by [`AckMarker`] and
/// [`BinaryMarker`]) that select which implementation applies.
pub trait Emit<A: AckMarker, B: BinaryMarker> {
    /// Value produced alongside the directive; [`SocketSender::emit`] hands it
    /// back to the caller once the directive has been sent.
    type Output;

    /// Consumes the event and yields the directive to send plus the
    /// associated [`Emit::Output`].
    ///
    /// # Errors
    ///
    /// Implementations report failures as a [`PayloadError`].
    fn prepare(self) -> Result<(Directive, Self::Output), PayloadError>;
}
/// An acknowledgement payload that can be turned into a [`Directive`].
///
/// `A` (bounded by [`AckType`]) and `B` (bounded by [`BinaryMarker`]) are
/// marker types that select which implementation applies.
pub trait Acknowledge<A: AckType, B: BinaryMarker> {
    /// Consumes the payload and builds the directive for acknowledgement `id`.
    ///
    /// # Errors
    ///
    /// Implementations report failures as a [`PayloadError`].
    fn into_directive(self, id: u64) -> Result<Directive, PayloadError>;
}
/// Collects the settings needed to open a [`Client`].
///
/// `C` is the WebSocket connector type. It starts out as `()` (see
/// [`ClientBuilder::new`]) and is replaced through
/// [`ClientBuilder::websocket_connector`].
pub struct ClientBuilder<C = ()> {
    /// Base URL; `path` is joined onto it when the client is opened.
    url: Url,
    /// Path joined onto `url`; initialised to `"socket.io/"`.
    path: String,
    /// HTTP client to use; when `None`, a default `reqwest::Client` is created on open.
    http_client: Option<reqwest::Client>,
    /// Connector handed to the engine on open.
    websocket_connector: C,
    /// Transport strategy handed to the engine on open.
    transport_strategy: TransportStrategy,
    /// Sizes for the manager, engine, transport and per-socket channels.
    channels: ChannelConfig,
}
impl ClientBuilder<()> {
pub fn new(url: impl Into<Url>) -> Self {
Self {
url: url.into(),
path: "socket.io/".to_string(),
http_client: None,
websocket_connector: (),
transport_strategy: TransportStrategy::default(),
channels: ChannelConfig::default(),
}
}
}
impl<C> ClientBuilder<C>
where
    C: WebSocketConnector,
{
    /// Replaces the path that is joined onto the base URL when opening.
    pub fn path(self, path: impl Into<String>) -> Self {
        Self {
            path: path.into(),
            ..self
        }
    }

    /// Uses `client` for HTTP instead of a default-constructed `reqwest::Client`.
    pub fn http_client(self, client: reqwest::Client) -> Self {
        Self {
            http_client: Some(client),
            ..self
        }
    }

    /// Swaps the WebSocket connector, changing the builder's type parameter.
    ///
    /// All other settings are carried over unchanged.
    pub fn websocket_connector<C2>(self, connector: C2) -> ClientBuilder<C2>
    where
        C2: WebSocketConnector,
    {
        // The previous connector stays behind in `self` and is dropped with it.
        let Self {
            url,
            path,
            http_client,
            transport_strategy,
            channels,
            ..
        } = self;
        ClientBuilder {
            url,
            path,
            http_client,
            websocket_connector: connector,
            transport_strategy,
            channels,
        }
    }

    /// Replaces the transport strategy handed to the engine.
    pub fn transport(self, strategy: TransportStrategy) -> Self {
        Self {
            transport_strategy: strategy,
            ..self
        }
    }

    /// Replaces the channel configuration.
    pub fn channels(self, config: impl Into<ChannelConfig>) -> Self {
        Self {
            channels: config.into(),
            ..self
        }
    }

    /// Connects the engine, spawns the manager task and returns the [`Client`]
    /// that talks to it.
    ///
    /// # Errors
    ///
    /// Fails when the configured path cannot be joined onto the base URL.
    ///
    /// # Panics
    ///
    /// This is a synchronous function that calls `tokio::spawn`, which panics
    /// when invoked outside a Tokio runtime. `tokio::sync::mpsc::channel` also
    /// panics if the configured manager capacity is `0`.
    #[must_use = "dropping the Client stops the background tasks"]
    pub fn open(self) -> Result<Client, ClientBuilderError> {
        let Self {
            url: base,
            path,
            http_client,
            websocket_connector,
            transport_strategy,
            channels,
        } = self;

        // Fall back to a default HTTP client before resolving the endpoint,
        // mirroring the order in which the pieces are needed.
        let http_client = http_client.unwrap_or_default();
        let endpoint = base.join(&path)?;

        // One sender feeds the engine's message sink, the other is kept by the client.
        let (action_tx, action_rx) = mpsc::channel::<ManagerAction>(channels.manager);
        let sink = message_sink(action_tx.clone());
        let engine = Engine::connect(
            endpoint,
            http_client,
            websocket_connector,
            transport_strategy,
            sink,
            channels.engine,
            channels.transport,
        );

        let manager_task = tokio::spawn(Manager::new(action_rx).socket_io(engine));

        Ok(Client {
            tx: DirectiveSender::new(action_tx),
            handle: manager_task,
            socket_capacity: channels.socket,
        })
    }
}
/// Handle to a spawned manager task, created by [`ClientBuilder::open`].
#[derive(Debug)]
pub struct Client {
    /// Sends directives to the manager; cloned into every [`SocketSender`].
    tx: DirectiveSender,
    /// Join handle of the spawned manager task.
    handle: JoinHandle<Result<(), ManagerError>>,
    /// Capacity of the signal channel created for each connected socket.
    socket_capacity: usize,
}
impl Client {
pub fn builder(url: impl Into<Url>) -> ClientBuilder {
ClientBuilder::new(url)
}
pub async fn connect<S>(&self, ns: S) -> Result<(SocketSender, SocketReceiver), SocketError>
where
S: Into<ByteString>,
{
self.connect_with(ns, ByteString::new()).await
}
pub async fn connect_with<S, B>(
&self,
ns: S,
payload: B,
) -> Result<(SocketSender, SocketReceiver), SocketError>
where
S: Into<ByteString>,
B: Into<ByteString>,
{
let (tx, rx) = mpsc::channel(self.socket_capacity);
let socket_tx = SocketSender::new(ns.into(), self.tx.clone());
let socket_rx = SocketReceiver { rx };
let directive = Directive::Connect {
tx,
payload: payload.into(),
};
socket_tx.0.send(directive).await?;
Ok((socket_tx, socket_rx))
}
pub async fn join(self) -> Result<(), ClientError> {
drop(self.tx);
self.handle.await??;
Ok(())
}
}
/// State shared by all clones of a [`SocketSender`].
#[derive(Debug)]
struct SocketSenderInner {
    /// Namespace attached to every directive sent through this socket.
    ns: ByteString,
    /// Channel to the manager.
    tx: DirectiveSender,
    /// Set once a disconnect has been issued, so it is only issued once.
    disconnected: AtomicBool,
}
impl SocketSenderInner {
    /// Sends `directive` for this socket's namespace.
    ///
    /// # Errors
    ///
    /// A failure of the underlying sender is wrapped in [`SocketError::Send`].
    async fn send(&self, directive: Directive) -> Result<(), SocketError> {
        let namespace = self.ns.clone();
        match self.tx.send(namespace, directive).await {
            Ok(()) => Ok(()),
            Err(cause) => Err(SocketError::Send(cause)),
        }
    }
}
impl Drop for SocketSenderInner {
    /// Tells the manager the socket was dropped, unless a disconnect was
    /// already issued.
    fn drop(&mut self) {
        // `&mut self` guarantees exclusive access, so the flag can be read and
        // set without an atomic operation.
        let already_disconnected = std::mem::replace(self.disconnected.get_mut(), true);
        if already_disconnected {
            return;
        }
        // Best effort: `drop` can neither await nor report an error.
        let _ = self.tx.try_send(self.ns.clone(), Directive::Dropped);
    }
}
/// Cloneable handle for sending directives on one namespace.
///
/// Clones share a single inner state through an `Arc`, so the inner drop
/// logic runs only when the last clone goes away.
#[derive(Clone, Debug)]
pub struct SocketSender(Arc<SocketSenderInner>);
impl SocketSender {
    /// Creates a sender for namespace `ns` that is not yet marked as disconnected.
    fn new(ns: ByteString, tx: DirectiveSender) -> Self {
        Self(Arc::new(SocketSenderInner {
            ns,
            tx,
            disconnected: AtomicBool::new(false),
        }))
    }

    /// Prepares `event` and sends the resulting directive on this socket.
    ///
    /// Returns the event's [`Emit::Output`] once the directive has been sent.
    ///
    /// # Errors
    ///
    /// Returns a [`SocketError`] when preparing the event fails or when the
    /// directive cannot be sent.
    pub async fn emit<E, A, B>(&self, event: E) -> Result<E::Output, SocketError>
    where
        E: Emit<A, B>,
        A: AckMarker,
        B: BinaryMarker,
    {
        let (directive, output) = event.prepare()?;
        self.0.send(directive).await?;
        Ok(output)
    }

    /// Sends `payload` as the acknowledgement for `id`.
    ///
    /// # Errors
    ///
    /// Returns a [`SocketError`] when the payload cannot be converted into a
    /// directive or when the directive cannot be sent.
    pub async fn acknowledge<T, A, B>(&self, id: AckId<A>, payload: T) -> Result<(), SocketError>
    where
        T: Acknowledge<A, B>,
        A: AckType,
        B: BinaryMarker,
    {
        let directive = payload.into_directive(id.get())?;
        self.0.send(directive).await
    }

    /// Sends [`Directive::Disconnect`] unless a disconnect was already issued
    /// through this sender or one of its clones.
    ///
    /// The socket is only left marked as disconnected when the directive was
    /// actually sent. If the send fails, or this future is dropped before it
    /// completes, the mark is rolled back so that a later call can retry and
    /// the drop logic still notifies the manager.
    ///
    /// # Errors
    ///
    /// Returns a [`SocketError`] when the directive cannot be sent.
    pub async fn disconnect(&self) -> Result<(), SocketError> {
        /// Clears the `disconnected` flag on drop unless it has been disarmed.
        struct Rollback<'a>(Option<&'a AtomicBool>);

        impl Drop for Rollback<'_> {
            fn drop(&mut self) {
                if let Some(flag) = self.0 {
                    flag.store(false, Ordering::Relaxed);
                }
            }
        }

        // Claim the right to send; whoever flips the flag first does the work.
        if self.0.disconnected.swap(true, Ordering::Relaxed) {
            return Ok(());
        }

        // Covers both the error return below and cancellation at the await.
        let mut rollback = Rollback(Some(&self.0.disconnected));
        self.0.send(Directive::Disconnect).await?;
        rollback.0 = None;
        Ok(())
    }
}
/// Receiving half of a connected socket.
///
/// Wraps the channel on which [`Signal`]s for the socket arrive.
#[derive(Debug)]
pub struct SocketReceiver {
    /// Channel carrying the socket's signals.
    rx: mpsc::Receiver<Signal>,
}
impl SocketReceiver {
    /// Waits for the next [`Signal::Event`] and converts it into `E`.
    ///
    /// Signals of any other kind are received and discarded. Returns
    /// `Ok(None)` once the channel has been closed.
    ///
    /// # Errors
    ///
    /// Returns `E::Error` when the received event cannot be converted; the
    /// event has been consumed from the channel at that point.
    pub async fn listen<E>(&mut self) -> Result<Option<E>, E::Error>
    where
        E: TryFrom<DynEvent>,
    {
        while let Some(signal) = self.rx.recv().await {
            if let Signal::Event(event) = signal {
                return E::try_from(event).map(Some);
            }
        }
        Ok(None)
    }
}
/// Gives shared access to the wrapped receiver so its own API can be used directly.
impl std::ops::Deref for SocketReceiver {
    type Target = mpsc::Receiver<Signal>;

    fn deref(&self) -> &mpsc::Receiver<Signal> {
        &self.rx
    }
}
impl std::ops::DerefMut for SocketReceiver {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.rx
}
}