use super::common::{channel_to_io_error, parse_tcp_endpoint};
use bytes::Bytes;
use compio::net::TcpStream;
use monocoque_core::monitor::{create_monitor, SocketEvent, SocketEventSender, SocketMonitor};
use monocoque_zmtp::dealer::DealerSocket as InternalDealer;
use std::io;
/// DEALER socket handle that wraps the internal ZMTP dealer and optionally
/// carries the sending half of a socket-event monitor.
///
/// The stream type `S` defaults to [`TcpStream`].
pub struct DealerSocket<S: compio::io::AsyncRead + compio::io::AsyncWrite + Unpin = TcpStream> {
    /// Underlying dealer implementation; the public methods forward to it.
    inner: InternalDealer<S>,
    /// Event sender installed by `monitor()`; every constructor in this file
    /// initialises it to `None`.
    monitor: Option<SocketEventSender>,
}
impl DealerSocket {
    /// Connects to a TCP endpoint using `SocketOptions::default()`.
    ///
    /// This is a convenience wrapper around [`Self::connect_with_options`].
    ///
    /// # Errors
    ///
    /// Returns an error if `endpoint` is rejected by `parse_tcp_endpoint` or
    /// if the internal dealer's `connect_with_options` fails.
    pub async fn connect(endpoint: &str) -> io::Result<Self> {
        Self::connect_with_options(endpoint, monocoque_core::options::SocketOptions::default())
            .await
    }

    /// Connects to a TCP endpoint with caller-supplied socket options.
    ///
    /// # Errors
    ///
    /// Returns an error if `endpoint` is rejected by `parse_tcp_endpoint` or
    /// if the internal dealer's `connect_with_options` fails.
    pub async fn connect_with_options(
        endpoint: &str,
        options: monocoque_core::options::SocketOptions,
    ) -> io::Result<Self> {
        let addr = parse_tcp_endpoint(endpoint)?;
        let socket = Self {
            inner: InternalDealer::connect_with_options(addr, options).await?,
            monitor: None,
        };
        // NOTE(review): `monitor` is `None` on a freshly built socket and
        // `emit_event` only sends when a monitor is installed, so this event
        // currently reaches no listener.
        let connected = SocketEvent::Connected(monocoque_core::endpoint::Endpoint::Tcp(addr));
        socket.emit_event(connected);
        Ok(socket)
    }

    /// Connects to an IPC endpoint and returns a socket over a Unix stream.
    ///
    /// A leading `ipc://` in `path` is stripped if present; the remainder is
    /// used as the filesystem path.
    ///
    /// # Errors
    ///
    /// Returns an error if `monocoque_core::ipc::connect` or
    /// `from_unix_stream` fails.
    #[cfg(unix)]
    pub async fn connect_ipc(path: &str) -> io::Result<DealerSocket<compio::net::UnixStream>> {
        let ipc_path = std::path::PathBuf::from(path.strip_prefix("ipc://").unwrap_or(path));
        let stream = monocoque_core::ipc::connect(&ipc_path).await?;
        let socket = DealerSocket::from_unix_stream(stream).await?;
        // NOTE(review): `from_unix_stream` builds the socket without a
        // monitor, so this event is currently dropped by `emit_event`.
        let connected = SocketEvent::Connected(monocoque_core::endpoint::Endpoint::Ipc(ipc_path));
        socket.emit_event(connected);
        Ok(socket)
    }

    /// Binds a TCP listener to `addr`, accepts a single incoming connection
    /// and wraps it in a socket.
    ///
    /// The listener is returned alongside the socket so the caller keeps
    /// ownership of it.
    ///
    /// # Errors
    ///
    /// Returns an error if binding, accepting, or `from_tcp` fails.
    pub async fn bind(
        addr: impl compio::net::ToSocketAddrsAsync,
    ) -> io::Result<(compio::net::TcpListener, Self)> {
        let listener = compio::net::TcpListener::bind(addr).await?;
        // The peer address reported by `accept` is not used.
        let (stream, _peer) = listener.accept().await?;
        Self::from_tcp(stream)
            .await
            .map(|socket| (listener, socket))
    }

    /// Wraps an already-established TCP stream, without a monitor.
    ///
    /// # Errors
    ///
    /// Returns an error if the internal dealer's `from_tcp` fails.
    pub async fn from_tcp(stream: TcpStream) -> io::Result<Self> {
        let inner = InternalDealer::from_tcp(stream).await?;
        Ok(Self {
            inner,
            monitor: None,
        })
    }

    /// Wraps an already-established TCP stream using the given options,
    /// without a monitor.
    ///
    /// # Errors
    ///
    /// Returns an error if the internal dealer's `from_tcp_with_options`
    /// fails.
    pub async fn from_tcp_with_options(
        stream: TcpStream,
        options: monocoque_core::options::SocketOptions,
    ) -> io::Result<Self> {
        let inner = InternalDealer::from_tcp_with_options(stream, options).await?;
        Ok(Self {
            inner,
            monitor: None,
        })
    }

    /// Wraps an arbitrary async read/write stream using the given options,
    /// without a monitor.
    ///
    /// # Errors
    ///
    /// Returns an error if the internal dealer's `with_options` fails.
    pub async fn with_options<Stream>(
        stream: Stream,
        options: monocoque_core::options::SocketOptions,
    ) -> io::Result<DealerSocket<Stream>>
    where
        Stream: compio::io::AsyncRead + compio::io::AsyncWrite + Unpin,
    {
        let inner = InternalDealer::with_options(stream, options).await?;
        Ok(DealerSocket {
            inner,
            monitor: None,
        })
    }
}
impl<S> DealerSocket<S>
where
    S: compio::io::AsyncRead + compio::io::AsyncWrite + Unpin,
{
    /// Creates a new monitor channel, installs its sender on this socket and
    /// returns the receiving side.
    ///
    /// Any sender installed by an earlier call is replaced.
    pub fn monitor(&mut self) -> SocketMonitor {
        let (event_tx, event_rx) = create_monitor();
        self.monitor = Some(event_tx);
        event_rx
    }

    /// Forwards `event` to the installed monitor, if there is one.
    ///
    /// Delivery is best-effort: the result of the send is discarded.
    fn emit_event(&self, event: SocketEvent) {
        if let Some(sender) = self.monitor.as_ref() {
            let _ = sender.send(event);
        }
    }

    /// Sends a multipart message through the inner dealer.
    ///
    /// # Errors
    ///
    /// Returns the inner result as mapped by `channel_to_io_error`.
    pub async fn send(&mut self, msg: Vec<Bytes>) -> io::Result<()> {
        let outcome = self.inner.send(msg).await;
        channel_to_io_error(outcome)
    }

    /// Forwards `msg` to the inner dealer's `send_buffered`.
    ///
    /// # Errors
    ///
    /// Returns the inner result as mapped by `channel_to_io_error`.
    pub fn send_buffered(&mut self, msg: Vec<Bytes>) -> io::Result<()> {
        let outcome = self.inner.send_buffered(msg);
        channel_to_io_error(outcome)
    }

    /// Forwards to the inner dealer's `flush`.
    ///
    /// # Errors
    ///
    /// Returns the inner result as mapped by `channel_to_io_error`.
    pub async fn flush(&mut self) -> io::Result<()> {
        let outcome = self.inner.flush().await;
        channel_to_io_error(outcome)
    }

    /// Forwards a slice of multipart messages to the inner dealer's
    /// `send_batch`.
    ///
    /// # Errors
    ///
    /// Returns the inner result as mapped by `channel_to_io_error`.
    pub async fn send_batch(&mut self, messages: &[Vec<Bytes>]) -> io::Result<()> {
        let outcome = self.inner.send_batch(messages).await;
        channel_to_io_error(outcome)
    }

    /// Returns the value reported by the inner dealer's `buffered_bytes`.
    #[inline]
    pub fn buffered_bytes(&self) -> usize {
        let dealer = &self.inner;
        dealer.buffered_bytes()
    }

    /// Returns the value reported by the inner dealer's `events`.
    // NOTE(review): presumably a readiness bitmask — confirm against the
    // internal dealer.
    pub fn events(&self) -> u32 {
        let dealer = &self.inner;
        dealer.events()
    }

    /// Receives the next multipart message from the inner dealer.
    ///
    /// The inner result is returned unchanged.
    // NOTE(review): `Ok(None)` presumably means the connection ended —
    // confirm against the internal dealer.
    pub async fn recv(&mut self) -> io::Result<Option<Vec<Bytes>>> {
        let dealer = &mut self.inner;
        dealer.recv().await
    }
}
impl<S> DealerSocket<S>
where
    S: compio::io::AsyncRead + compio::io::AsyncWrite + Unpin,
{
    /// Returns the socket type reported by the inner dealer.
    #[inline]
    pub const fn socket_type(&self) -> monocoque_zmtp::session::SocketType {
        self.inner.socket_type()
    }

    /// Returns the inner dealer's last endpoint string, if it has one.
    #[inline]
    pub fn last_endpoint(&self) -> Option<&str> {
        let dealer = &self.inner;
        dealer.last_endpoint_string()
    }

    /// Returns the inner dealer's `has_more` flag.
    // NOTE(review): presumably "more frames follow in the current
    // message" — confirm against the internal dealer.
    #[inline]
    pub fn has_more(&self) -> bool {
        let dealer = &self.inner;
        dealer.has_more()
    }

    /// Returns a mutable reference to the inner dealer's socket options.
    #[inline]
    pub fn options_mut(&mut self) -> &mut monocoque_core::options::SocketOptions {
        let dealer = &mut self.inner;
        dealer.options_mut()
    }

    /// Returns a shared reference to the inner dealer's socket options.
    #[inline]
    pub const fn options(&self) -> &monocoque_core::options::SocketOptions {
        self.inner.options()
    }
}
#[cfg(unix)]
impl DealerSocket<compio::net::UnixStream> {
    /// Wraps an already-connected Unix stream, without a monitor.
    ///
    /// # Errors
    ///
    /// Returns an error if the internal dealer's `new` fails.
    pub async fn from_unix_stream(stream: compio::net::UnixStream) -> io::Result<Self> {
        let inner = InternalDealer::new(stream).await?;
        Ok(Self {
            inner,
            monitor: None,
        })
    }

    /// Wraps an already-connected Unix stream using the given options,
    /// without a monitor.
    ///
    /// # Errors
    ///
    /// Returns an error if the internal dealer's `with_options` fails.
    pub async fn from_unix_stream_with_options(
        stream: compio::net::UnixStream,
        options: monocoque_core::options::SocketOptions,
    ) -> io::Result<Self> {
        let inner = InternalDealer::with_options(stream, options).await?;
        Ok(Self {
            inner,
            monitor: None,
        })
    }
}
/// Proxy adapter for TCP dealer sockets.
///
/// The methods return boxed, pinned futures bounded by `'async_trait`.
// NOTE(review): the lifetime names suggest these signatures mirror an
// `async_trait`-style trait declaration — confirm against `ProxySocket`.
impl monocoque_zmtp::proxy::ProxySocket for DealerSocket<TcpStream> {
    /// Receives one multipart message by boxing the future of `recv`.
    fn recv_multipart<'life0, 'async_trait>(
        &'life0 mut self,
    ) -> ::core::pin::Pin<
        Box<dyn ::core::future::Future<Output = io::Result<Option<Vec<Bytes>>>> + 'async_trait>,
    >
    where
        'life0: 'async_trait,
        Self: 'async_trait,
    {
        // The future is lazy: nothing runs until the caller polls it.
        Box::pin(self.recv())
    }

    /// Sends one multipart message by boxing the future of `send`.
    fn send_multipart<'life0, 'async_trait>(
        &'life0 mut self,
        msg: Vec<Bytes>,
    ) -> ::core::pin::Pin<Box<dyn ::core::future::Future<Output = io::Result<()>> + 'async_trait>>
    where
        'life0: 'async_trait,
        Self: 'async_trait,
    {
        // `msg` is moved into the future; sending starts on first poll.
        Box::pin(self.send(msg))
    }

    /// Static description of this socket kind.
    fn socket_desc(&self) -> &'static str {
        "DEALER"
    }
}