use compio::net::{TcpListener, TcpStream};
use monocoque_core::monitor::{create_monitor, SocketEventSender, SocketMonitor};
use monocoque_core::options::SocketOptions;
use monocoque_zmtp::PushSocket as InternalPush;
use std::io;
/// Public PUSH socket handle that wraps the protocol-level [`InternalPush`]
/// implementation and optionally carries a monitor event sender.
///
/// The stream type `S` defaults to [`TcpStream`]; any stream that implements
/// compio's `AsyncRead` and `AsyncWrite` (and is `Unpin`) can be used.
pub struct PushSocket<S = TcpStream>
where
S: compio::io::AsyncRead + compio::io::AsyncWrite + Unpin,
{
/// Underlying socket implementation; every operation on this type is
/// forwarded to it.
inner: InternalPush<S>,
/// Sender half of the monitor channel. Constructors initialise it to `None`;
/// it is populated when `monitor()` is called.
monitor: Option<SocketEventSender>,
}
impl PushSocket<TcpStream> {
    /// Binds a TCP listener on `addr`, waits for the first incoming
    /// connection and wraps the accepted stream via [`Self::from_tcp`].
    ///
    /// The listener is handed back together with the socket so the caller
    /// decides how long it stays alive.
    ///
    /// # Errors
    ///
    /// Propagates any I/O error from binding, accepting, or from
    /// [`Self::from_tcp`].
    pub async fn bind(
        addr: impl compio::net::ToSocketAddrsAsync,
    ) -> io::Result<(TcpListener, Self)> {
        let listener = TcpListener::bind(addr).await?;
        // Only the accepted stream is needed; the peer address is ignored.
        let (stream, _peer) = listener.accept().await?;
        Self::from_tcp(stream)
            .await
            .map(|socket| (listener, socket))
    }

    /// Connects to `addr` through `InternalPush::connect` and wraps the
    /// result. No monitor is attached initially.
    ///
    /// # Errors
    ///
    /// Returns whatever I/O error the underlying connect reports.
    pub async fn connect(addr: impl compio::net::ToSocketAddrsAsync) -> io::Result<Self> {
        let inner = InternalPush::connect(addr).await?;
        Ok(Self {
            inner,
            monitor: None,
        })
    }

    /// Same as [`Self::connect`], but forwards the supplied `options` to
    /// `InternalPush::connect_with_options`.
    ///
    /// # Errors
    ///
    /// Returns whatever I/O error the underlying connect reports.
    pub async fn connect_with_options(
        addr: impl compio::net::ToSocketAddrsAsync,
        options: SocketOptions,
    ) -> io::Result<Self> {
        let inner = InternalPush::connect_with_options(addr, options).await?;
        Ok(Self {
            inner,
            monitor: None,
        })
    }

    /// Reports the connection state as seen by the underlying socket.
    #[inline]
    pub fn is_connected(&self) -> bool {
        let inner = &self.inner;
        inner.is_connected()
    }

    /// Forwards to the underlying socket's `try_reconnect`.
    ///
    /// # Errors
    ///
    /// Returns the I/O error produced by the underlying reconnect attempt.
    pub async fn try_reconnect(&mut self) -> io::Result<()> {
        let inner = &mut self.inner;
        inner.try_reconnect().await
    }

    /// Forwards `msg` (a list of frames) to the underlying socket's
    /// `send_with_reconnect`.
    ///
    /// NOTE(review): the reconnect policy itself lives in `monocoque_zmtp`
    /// and is not visible from this wrapper.
    ///
    /// # Errors
    ///
    /// Returns the I/O error produced by the underlying call.
    pub async fn send_with_reconnect(&mut self, msg: Vec<bytes::Bytes>) -> io::Result<()> {
        let inner = &mut self.inner;
        inner.send_with_reconnect(msg).await
    }

    /// Wraps an already-established TCP stream using
    /// `InternalPush::from_tcp`. No monitor is attached initially.
    ///
    /// # Errors
    ///
    /// Returns the I/O error produced while setting up the underlying socket.
    pub async fn from_tcp(stream: TcpStream) -> io::Result<Self> {
        InternalPush::from_tcp(stream).await.map(|inner| Self {
            inner,
            monitor: None,
        })
    }

    /// Same as [`Self::from_tcp`], but forwards the supplied `options` to
    /// `InternalPush::from_tcp_with_options`.
    ///
    /// # Errors
    ///
    /// Returns the I/O error produced while setting up the underlying socket.
    pub async fn from_tcp_with_options(
        stream: TcpStream,
        options: SocketOptions,
    ) -> io::Result<Self> {
        InternalPush::from_tcp_with_options(stream, options)
            .await
            .map(|inner| Self {
                inner,
                monitor: None,
            })
    }
}
impl<S> PushSocket<S>
where
    S: compio::io::AsyncRead + compio::io::AsyncWrite + Unpin,
{
    /// Wraps an arbitrary stream using `InternalPush::new`. No monitor is
    /// attached initially.
    ///
    /// # Errors
    ///
    /// Returns the I/O error produced while setting up the underlying socket.
    pub async fn new(stream: S) -> io::Result<Self> {
        InternalPush::new(stream).await.map(|inner| Self {
            inner,
            monitor: None,
        })
    }

    /// Same as [`Self::new`], but forwards the supplied `options` to
    /// `InternalPush::with_options`.
    ///
    /// # Errors
    ///
    /// Returns the I/O error produced while setting up the underlying socket.
    pub async fn with_options(stream: S, options: SocketOptions) -> io::Result<Self> {
        InternalPush::with_options(stream, options)
            .await
            .map(|inner| Self {
                inner,
                monitor: None,
            })
    }

    /// Sends `msg` (a list of frames) through the underlying socket.
    ///
    /// # Errors
    ///
    /// Returns the I/O error reported by the underlying send.
    pub async fn send(&mut self, msg: Vec<bytes::Bytes>) -> io::Result<()> {
        let inner = &mut self.inner;
        inner.send(msg).await
    }

    /// Creates a fresh monitor channel, stores its sender half on this socket
    /// (replacing any sender stored by an earlier call) and returns the
    /// receiving [`SocketMonitor`].
    pub fn monitor(&mut self) -> SocketMonitor {
        let (event_tx, event_rx) = create_monitor();
        self.monitor = Some(event_tx);
        event_rx
    }

    /// Gives mutable access to the underlying socket's [`SocketOptions`].
    #[inline]
    pub fn options_mut(&mut self) -> &mut SocketOptions {
        let inner = &mut self.inner;
        inner.options_mut()
    }
}
#[cfg(unix)]
impl PushSocket<compio::net::UnixStream> {
    /// Wraps an already-connected Unix domain stream using
    /// `InternalPush::new`. No monitor is attached initially.
    ///
    /// # Errors
    ///
    /// Returns the I/O error produced while setting up the underlying socket.
    pub async fn from_unix_stream(stream: compio::net::UnixStream) -> io::Result<Self> {
        let inner = InternalPush::new(stream).await?;
        Ok(Self {
            inner,
            monitor: None,
        })
    }

    /// Same as [`Self::from_unix_stream`], but forwards the supplied
    /// `options` to `InternalPush::with_options`.
    ///
    /// # Errors
    ///
    /// Returns the I/O error produced while setting up the underlying socket.
    pub async fn from_unix_stream_with_options(
        stream: compio::net::UnixStream,
        options: SocketOptions,
    ) -> io::Result<Self> {
        let inner = InternalPush::with_options(stream, options).await?;
        Ok(Self {
            inner,
            monitor: None,
        })
    }
}