use super::common::channel_to_io_error;
use bytes::Bytes;
use compio::net::{TcpListener, TcpStream};
use monocoque_core::monitor::{create_monitor, SocketEventSender, SocketMonitor};
use monocoque_core::options::SocketOptions;
use monocoque_zmtp::router::RouterSocket as InternalRouter;
use monocoque_zmtp::SocketType;
use std::io;
/// ROUTER socket: a thin public wrapper around the internal ZMTP router.
///
/// The stream parameter `S` defaults to [`TcpStream`]; any stream that is
/// `AsyncRead + AsyncWrite + Unpin` can be used instead.
pub struct RouterSocket<
    S: compio::io::AsyncRead + compio::io::AsyncWrite + Unpin = TcpStream,
> {
    /// Internal router implementation that the wrapper delegates to.
    inner: InternalRouter<S>,
    /// Sender half of the monitoring channel, if one has been installed.
    monitor: Option<SocketEventSender>,
}
/// Constructors available on the default (TCP-backed) `RouterSocket`.
impl RouterSocket {
    /// Binds a TCP listener on `addr`, waits for the first incoming
    /// connection and wraps that connection via [`Self::from_tcp`].
    ///
    /// The listener is returned alongside the socket so the caller keeps
    /// ownership of it.
    ///
    /// # Errors
    ///
    /// Propagates any I/O error raised while binding, accepting, or
    /// constructing the socket.
    pub async fn bind(
        addr: impl compio::net::ToSocketAddrsAsync,
    ) -> io::Result<(TcpListener, Self)> {
        let listener = TcpListener::bind(addr).await?;
        // Only the stream is needed; the peer address is discarded.
        let (first_peer, _peer_addr) = listener.accept().await?;
        Self::from_tcp(first_peer)
            .await
            .map(|socket| (listener, socket))
    }

    /// Wraps an existing TCP stream using the inner router's plain `new`
    /// constructor.
    ///
    /// Deprecated: per the deprecation note, [`Self::from_tcp`] should be
    /// used instead so that TCP_NODELAY is enabled.
    ///
    /// # Errors
    ///
    /// Propagates any I/O error returned by the inner router constructor.
    #[deprecated(
        since = "0.1.0",
        note = "Use `from_tcp()` instead to enable TCP_NODELAY"
    )]
    pub async fn from_stream(stream: TcpStream) -> io::Result<Self> {
        let inner = InternalRouter::new(stream).await?;
        Ok(Self {
            inner,
            monitor: None,
        })
    }

    /// Wraps an existing TCP stream via the inner router's `from_tcp`
    /// constructor. No monitor is installed.
    ///
    /// # Errors
    ///
    /// Propagates any I/O error returned by the inner router constructor.
    pub async fn from_tcp(stream: TcpStream) -> io::Result<Self> {
        let inner = InternalRouter::from_tcp(stream).await?;
        Ok(Self {
            inner,
            monitor: None,
        })
    }

    /// Same as [`Self::from_tcp`], but passes the supplied `options` to the
    /// inner router.
    ///
    /// # Errors
    ///
    /// Propagates any I/O error returned by the inner router constructor.
    pub async fn from_tcp_with_options(
        stream: TcpStream,
        options: monocoque_core::options::SocketOptions,
    ) -> io::Result<Self> {
        let inner = InternalRouter::from_tcp_with_options(stream, options).await?;
        Ok(Self {
            inner,
            monitor: None,
        })
    }

    /// Builds a socket over an arbitrary stream type with the supplied
    /// `options`.
    ///
    /// The result is a `RouterSocket<Stream>`, not necessarily the
    /// TCP-backed `Self`.
    ///
    /// # Errors
    ///
    /// Propagates any I/O error returned by the inner router constructor.
    pub async fn with_options<Stream>(
        stream: Stream,
        options: monocoque_core::options::SocketOptions,
    ) -> io::Result<RouterSocket<Stream>>
    where
        Stream: compio::io::AsyncRead + compio::io::AsyncWrite + Unpin,
    {
        let inner = InternalRouter::with_options(stream, options).await?;
        Ok(RouterSocket {
            inner,
            monitor: None,
        })
    }
}
impl<S> RouterSocket<S>
where
S: compio::io::AsyncRead + compio::io::AsyncWrite + Unpin,
{
pub fn monitor(&mut self) -> SocketMonitor {
let (sender, receiver) = create_monitor();
self.monitor = Some(sender);
receiver
}
#[inline]
pub fn options_mut(&mut self) -> &mut SocketOptions {
self.inner.options_mut()
}
pub async fn send(&mut self, msg: Vec<Bytes>) -> io::Result<()> {
channel_to_io_error(self.inner.send(msg).await)
}
pub fn send_buffered(&mut self, msg: Vec<Bytes>) -> io::Result<()> {
channel_to_io_error(self.inner.send_buffered(msg))
}
pub async fn flush(&mut self) -> io::Result<()> {
channel_to_io_error(self.inner.flush().await)
}
pub async fn send_batch(&mut self, messages: &[Vec<Bytes>]) -> io::Result<()> {
channel_to_io_error(self.inner.send_batch(messages).await)
}
#[inline]
pub fn buffered_bytes(&self) -> usize {
self.inner.buffered_bytes()
}
#[inline]
pub const fn socket_type() -> SocketType {
SocketType::Router
}
#[inline]
pub fn last_endpoint(&self) -> Option<&monocoque_core::endpoint::Endpoint> {
self.inner.last_endpoint()
}
#[inline]
pub fn has_more(&self) -> bool {
self.inner.has_more()
}
#[inline]
pub fn events(&self) -> u32 {
self.inner.events()
}
pub fn set_connect_routing_id(&mut self, id: Vec<u8>) -> io::Result<()> {
monocoque_core::options::SocketOptions::validate_router_identity(&id)?;
self.inner.options_mut().connect_routing_id = Some(Bytes::from(id));
Ok(())
}
pub fn set_router_mandatory(&mut self, enabled: bool) {
self.inner.options_mut().router_mandatory = enabled;
}
pub fn set_router_handover(&mut self, enabled: bool) {
self.inner.options_mut().router_handover = enabled;
}
pub const fn peer_identity(&self) -> &Bytes {
self.inner.peer_identity()
}
pub async fn recv(&mut self) -> io::Result<Option<Vec<Bytes>>> {
self.inner.recv().await
}
}
/// Constructors for sockets running over a Unix domain stream (unix only).
#[cfg(unix)]
impl RouterSocket<compio::net::UnixStream> {
    /// Wraps a Unix stream using the inner router's `new` constructor.
    /// No monitor is installed.
    ///
    /// # Errors
    ///
    /// Propagates any I/O error returned by the inner router constructor.
    pub async fn from_unix_stream(stream: compio::net::UnixStream) -> io::Result<Self> {
        let inner = InternalRouter::new(stream).await?;
        Ok(Self {
            inner,
            monitor: None,
        })
    }

    /// Wraps a Unix stream, passing the supplied `options` to the inner
    /// router's `with_options` constructor. No monitor is installed.
    ///
    /// # Errors
    ///
    /// Propagates any I/O error returned by the inner router constructor.
    pub async fn from_unix_stream_with_options(
        stream: compio::net::UnixStream,
        options: monocoque_core::options::SocketOptions,
    ) -> io::Result<Self> {
        let inner = InternalRouter::with_options(stream, options).await?;
        Ok(Self {
            inner,
            monitor: None,
        })
    }
}
/// Proxy integration for the TCP-backed socket.
///
/// The two async operations are written out as functions returning a pinned,
/// boxed future. The signatures must match the trait exactly, so only the
/// bodies carry any logic.
impl monocoque_zmtp::proxy::ProxySocket for RouterSocket<TcpStream> {
    /// Receives one multipart message by delegating to `RouterSocket::recv`.
    fn recv_multipart<'life0, 'async_trait>(
        &'life0 mut self,
    ) -> ::core::pin::Pin<
        Box<dyn ::core::future::Future<Output = io::Result<Option<Vec<Bytes>>>> + 'async_trait>,
    >
    where
        'life0: 'async_trait,
        Self: 'async_trait,
    {
        // The future borrows `self` mutably until it completes.
        let receive = async move { self.recv().await };
        Box::pin(receive)
    }

    /// Sends one multipart message by delegating to `RouterSocket::send`.
    fn send_multipart<'life0, 'async_trait>(
        &'life0 mut self,
        msg: Vec<Bytes>,
    ) -> ::core::pin::Pin<Box<dyn ::core::future::Future<Output = io::Result<()>> + 'async_trait>>
    where
        'life0: 'async_trait,
        Self: 'async_trait,
    {
        // `msg` is moved into the future together with the borrow of `self`.
        let deliver = async move { self.send(msg).await };
        Box::pin(deliver)
    }

    /// Static label for this socket kind.
    fn socket_desc(&self) -> &'static str {
        "ROUTER"
    }
}