use super::common::channel_to_io_error;
use bytes::Bytes;
use compio::net::TcpStream;
use monocoque_core::monitor::{create_monitor, SocketEvent, SocketEventSender, SocketMonitor};
use monocoque_zmtp::req::ReqSocket as InternalReq;
use monocoque_zmtp::SocketType;
use std::io;
/// Public REQ socket: a thin wrapper around the protocol-level
/// `monocoque_zmtp::req::ReqSocket` (imported here as `InternalReq`) that
/// additionally carries an optional sender for socket monitor events.
///
/// `S` is the underlying byte stream and defaults to `TcpStream`.
pub struct ReqSocket<S = TcpStream>
where
S: compio::io::AsyncRead + compio::io::AsyncWrite + Unpin,
{
/// The wrapped protocol socket; the methods of this type delegate to it.
inner: InternalReq<S>,
/// Sending half of the monitor channel. `None` until `monitor()` is called.
monitor: Option<SocketEventSender>,
}
impl ReqSocket {
pub async fn connect(endpoint: &str) -> io::Result<Self> {
let addr = if let Ok(monocoque_core::endpoint::Endpoint::Tcp(a)) =
monocoque_core::endpoint::Endpoint::parse(endpoint)
{
a
} else {
endpoint
.parse::<std::net::SocketAddr>()
.map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?
};
let sock = Self {
inner: InternalReq::connect(addr).await?,
monitor: None,
};
sock.emit_event(SocketEvent::Connected(
monocoque_core::endpoint::Endpoint::Tcp(addr),
));
Ok(sock)
}
#[inline]
pub fn is_connected(&self) -> bool {
self.inner.is_connected()
}
pub async fn try_reconnect(&mut self) -> io::Result<()> {
self.inner.try_reconnect().await
}
pub async fn send_with_reconnect(&mut self, msg: Vec<bytes::Bytes>) -> io::Result<()> {
self.inner.send_with_reconnect(msg).await
}
pub async fn recv_with_reconnect(&mut self) -> io::Result<Option<Vec<bytes::Bytes>>> {
self.inner.recv_with_reconnect().await
}
pub async fn connect_with_options(
endpoint: &str,
options: monocoque_core::options::SocketOptions,
) -> io::Result<Self> {
let addr = if let Ok(monocoque_core::endpoint::Endpoint::Tcp(a)) =
monocoque_core::endpoint::Endpoint::parse(endpoint)
{
a
} else {
endpoint
.parse::<std::net::SocketAddr>()
.map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?
};
let stream = TcpStream::connect(addr).await?;
let sock = Self::from_tcp_with_options(stream, options).await?;
sock.emit_event(SocketEvent::Connected(
monocoque_core::endpoint::Endpoint::Tcp(addr),
));
Ok(sock)
}
#[cfg(unix)]
pub async fn connect_ipc(path: &str) -> io::Result<ReqSocket<compio::net::UnixStream>> {
use std::path::PathBuf;
let clean_path = path.strip_prefix("ipc://").unwrap_or(path);
let ipc_path = PathBuf::from(clean_path);
let stream = monocoque_core::ipc::connect(&ipc_path).await?;
let sock = ReqSocket::from_unix_stream(stream).await?;
sock.emit_event(SocketEvent::Connected(
monocoque_core::endpoint::Endpoint::Ipc(ipc_path),
));
Ok(sock)
}
pub async fn from_tcp(stream: TcpStream) -> io::Result<Self> {
Ok(Self {
inner: InternalReq::from_tcp(stream).await?,
monitor: None,
})
}
pub async fn from_tcp_with_options(
stream: TcpStream,
options: monocoque_core::options::SocketOptions,
) -> io::Result<Self> {
Ok(Self {
inner: InternalReq::from_tcp_with_options(stream, options).await?,
monitor: None,
})
}
pub async fn with_options<Stream>(
stream: Stream,
options: monocoque_core::options::SocketOptions,
) -> io::Result<ReqSocket<Stream>>
where
Stream: compio::io::AsyncRead + compio::io::AsyncWrite + Unpin,
{
Ok(ReqSocket {
inner: InternalReq::with_options(stream, options).await?,
monitor: None,
})
}
}
impl<S> ReqSocket<S>
where
    S: compio::io::AsyncRead + compio::io::AsyncWrite + Unpin,
{
    /// Creates a new monitor channel, stores its sending half on this socket
    /// and returns the receiving half.
    ///
    /// A sender stored by a previous call is replaced.
    pub fn monitor(&mut self) -> SocketMonitor {
        let (event_tx, event_rx) = create_monitor();
        self.monitor = Some(event_tx);
        event_rx
    }

    /// Forwards `event` to the monitor, if one has been attached.
    ///
    /// Without a monitor this does nothing.
    fn emit_event(&self, event: SocketEvent) {
        if let Some(event_tx) = self.monitor.as_ref() {
            // Best effort: the outcome of `send` is deliberately discarded.
            let _ = event_tx.send(event);
        }
    }

    /// Sends a multipart message through the inner socket.
    ///
    /// The inner result is converted to `io::Result` by
    /// `channel_to_io_error`.
    pub async fn send(&mut self, msg: Vec<Bytes>) -> io::Result<()> {
        let outcome = self.inner.send(msg).await;
        channel_to_io_error(outcome)
    }

    /// The socket type of this wrapper: always `SocketType::Req`.
    #[inline]
    pub const fn socket_type() -> SocketType {
        SocketType::Req
    }

    /// Returns the inner socket's last endpoint, if it has one.
    #[inline]
    pub fn last_endpoint(&self) -> Option<&monocoque_core::endpoint::Endpoint> {
        self.inner.last_endpoint()
    }

    /// Returns the inner socket's `has_more()` flag (presumably whether
    /// further frames of the current multipart message are pending;
    /// confirm against `monocoque_zmtp`).
    #[inline]
    pub fn has_more(&self) -> bool {
        self.inner.has_more()
    }

    /// Returns the inner socket's `events()` value (presumably a bitmask of
    /// readiness flags; confirm against `monocoque_zmtp`).
    #[inline]
    pub fn events(&self) -> u32 {
        self.inner.events()
    }

    /// Receives a multipart message from the inner socket.
    ///
    /// The result, including a possible `Ok(None)`, is passed through
    /// unchanged.
    pub async fn recv(&mut self) -> io::Result<Option<Vec<Bytes>>> {
        self.inner.recv().await
    }

    /// Shared access to the inner socket's options.
    pub const fn options(&self) -> &monocoque_core::options::SocketOptions {
        self.inner.options()
    }

    /// Mutable access to the inner socket's options.
    pub fn options_mut(&mut self) -> &mut monocoque_core::options::SocketOptions {
        self.inner.options_mut()
    }

    /// Hands `options` to the inner socket's `set_options`.
    pub fn set_options(&mut self, options: monocoque_core::options::SocketOptions) {
        self.inner.set_options(options);
    }
}
#[cfg(unix)]
impl ReqSocket<compio::net::UnixStream> {
    /// Wraps an existing Unix-domain stream via `InternalReq::new`.
    ///
    /// The returned socket has no monitor attached.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by `InternalReq::new`.
    pub async fn from_unix_stream(stream: compio::net::UnixStream) -> io::Result<Self> {
        let inner = InternalReq::new(stream).await?;
        Ok(Self {
            inner,
            monitor: None,
        })
    }

    /// Wraps an existing Unix-domain stream with the given socket `options`
    /// via `InternalReq::with_options`.
    ///
    /// The returned socket has no monitor attached.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by `InternalReq::with_options`.
    pub async fn from_unix_stream_with_options(
        stream: compio::net::UnixStream,
        options: monocoque_core::options::SocketOptions,
    ) -> io::Result<Self> {
        let inner = InternalReq::with_options(stream, options).await?;
        Ok(Self {
            inner,
            monitor: None,
        })
    }
}