use bytes::Bytes;
use compio::net::TcpStream;
use monocoque_core::monitor::{create_monitor, SocketEvent, SocketEventSender, SocketMonitor};
use monocoque_core::options::SocketOptions;
use monocoque_zmtp::subscriber::SubSocket as InternalSub;
use monocoque_zmtp::SocketType;
use std::io;
/// SUB socket that wraps the `monocoque_zmtp` subscriber implementation and
/// optionally reports socket events through a monitor channel.
///
/// The stream type `S` defaults to `TcpStream`.
pub struct SubSocket<S = TcpStream>
where
S: compio::io::AsyncRead + compio::io::AsyncWrite + Unpin,
{
/// Underlying subscriber socket; the public methods delegate to it.
inner: InternalSub<S>,
/// Sender half of the monitor channel; `None` until `monitor()` is called.
monitor: Option<SocketEventSender>,
}
impl SubSocket {
    /// Connects a SUB socket to `endpoint` over TCP.
    ///
    /// `endpoint` is first handed to `Endpoint::parse`; when that yields a TCP
    /// endpoint, its address is used. In every other case the string is parsed
    /// directly as a `std::net::SocketAddr`.
    ///
    /// # Errors
    ///
    /// Returns an `io::ErrorKind::InvalidInput` error when the fallback address
    /// parse fails, and propagates any error from the inner socket's `connect`.
    pub async fn connect(endpoint: &str) -> io::Result<Self> {
        let addr = match monocoque_core::endpoint::Endpoint::parse(endpoint) {
            Ok(monocoque_core::endpoint::Endpoint::Tcp(tcp_addr)) => tcp_addr,
            _ => endpoint
                .parse::<std::net::SocketAddr>()
                .map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))?,
        };
        let inner = InternalSub::connect(addr).await?;
        let socket = Self {
            inner,
            monitor: None,
        };
        // NOTE(review): `monitor` is `None` on a freshly built socket, so this
        // event is currently not delivered to any monitor.
        socket.emit_event(SocketEvent::Connected(
            monocoque_core::endpoint::Endpoint::Tcp(addr),
        ));
        Ok(socket)
    }

    /// Reports whether the inner socket considers itself connected.
    #[inline]
    pub fn is_connected(&self) -> bool {
        self.inner.is_connected()
    }

    /// Forwards a reconnect attempt to the inner socket.
    ///
    /// # Errors
    ///
    /// Returns whatever error the inner socket's `try_reconnect` produces.
    pub async fn try_reconnect(&mut self) -> io::Result<()> {
        self.inner.try_reconnect().await
    }

    /// Forwards to the inner socket's `recv_with_reconnect` and returns its
    /// result unchanged.
    ///
    /// # Errors
    ///
    /// Returns whatever error the inner socket's `recv_with_reconnect` produces.
    pub async fn recv_with_reconnect(&mut self) -> io::Result<Option<Vec<bytes::Bytes>>> {
        self.inner.recv_with_reconnect().await
    }

    /// Connects a SUB socket to a Unix domain socket at `path`.
    ///
    /// A leading `ipc://` prefix on `path` is stripped if present; the rest is
    /// used as the filesystem path.
    ///
    /// # Errors
    ///
    /// Propagates errors from `monocoque_core::ipc::connect` and from
    /// `from_unix_stream`.
    #[cfg(unix)]
    pub async fn connect_ipc(path: &str) -> io::Result<SubSocket<compio::net::UnixStream>> {
        let ipc_path = std::path::PathBuf::from(path.strip_prefix("ipc://").unwrap_or(path));
        let unix_stream = monocoque_core::ipc::connect(&ipc_path).await?;
        let socket = SubSocket::from_unix_stream(unix_stream).await?;
        // NOTE(review): `from_unix_stream` builds the socket with no monitor, so
        // this event is currently not delivered to any monitor.
        socket.emit_event(SocketEvent::Connected(
            monocoque_core::endpoint::Endpoint::Ipc(ipc_path),
        ));
        Ok(socket)
    }

    /// Builds a SUB socket on top of an existing TCP stream.
    ///
    /// # Errors
    ///
    /// Propagates any error from the inner socket's `from_tcp` constructor.
    pub async fn from_tcp(stream: TcpStream) -> io::Result<Self> {
        let inner = InternalSub::from_tcp(stream).await?;
        Ok(Self {
            inner,
            monitor: None,
        })
    }

    /// Builds a SUB socket on top of an existing TCP stream, passing `options`
    /// to the inner socket.
    ///
    /// # Errors
    ///
    /// Propagates any error from the inner socket's `from_tcp_with_options`
    /// constructor.
    pub async fn from_tcp_with_options(
        stream: TcpStream,
        options: monocoque_core::options::SocketOptions,
    ) -> io::Result<Self> {
        let inner = InternalSub::from_tcp_with_options(stream, options).await?;
        Ok(Self {
            inner,
            monitor: None,
        })
    }

    /// Builds a SUB socket over any stream type that satisfies the async
    /// read/write bounds, passing `options` to the inner socket.
    ///
    /// # Errors
    ///
    /// Propagates any error from the inner socket's `with_options` constructor.
    pub async fn with_options<Stream>(
        stream: Stream,
        options: monocoque_core::options::SocketOptions,
    ) -> io::Result<SubSocket<Stream>>
    where
        Stream: compio::io::AsyncRead + compio::io::AsyncWrite + Unpin,
    {
        let inner = InternalSub::with_options(stream, options).await?;
        Ok(SubSocket {
            inner,
            monitor: None,
        })
    }
}
impl<S> SubSocket<S>
where
    S: compio::io::AsyncRead + compio::io::AsyncWrite + Unpin,
{
    /// Attaches a new monitor channel to this socket and returns its receiver.
    ///
    /// Any sender stored by an earlier call is replaced.
    pub fn monitor(&mut self) -> SocketMonitor {
        let (event_tx, event_rx) = create_monitor();
        self.monitor = Some(event_tx);
        event_rx
    }

    /// Sends `event` to the attached monitor, if there is one.
    ///
    /// The result of the send is deliberately ignored.
    fn emit_event(&self, event: SocketEvent) {
        if let Some(sender) = self.monitor.as_ref() {
            let _ = sender.send(event);
        }
    }

    /// Subscribes to `topic`; the bytes are copied before being handed to the
    /// inner socket.
    ///
    /// # Errors
    ///
    /// Returns whatever error the inner socket's `subscribe` produces.
    pub async fn subscribe(&mut self, topic: &[u8]) -> io::Result<()> {
        let topic_bytes = Bytes::copy_from_slice(topic);
        self.inner.subscribe(topic_bytes).await
    }

    /// Unsubscribes from `topic`; the bytes are copied into a temporary buffer
    /// that is passed to the inner socket by reference.
    ///
    /// # Errors
    ///
    /// Returns whatever error the inner socket's `unsubscribe` produces.
    pub async fn unsubscribe(&mut self, topic: &[u8]) -> io::Result<()> {
        let topic_bytes = Bytes::copy_from_slice(topic);
        self.inner.unsubscribe(&topic_bytes).await
    }

    /// Forwards to the inner socket's `recv` and returns its result unchanged.
    ///
    /// # Errors
    ///
    /// Returns whatever error the inner socket's `recv` produces.
    pub async fn recv(&mut self) -> io::Result<Option<Vec<Bytes>>> {
        self.inner.recv().await
    }

    /// Returns the socket type of this wrapper, which is always
    /// `SocketType::Sub`.
    #[inline]
    pub const fn socket_type() -> SocketType {
        SocketType::Sub
    }

    /// Returns the last endpoint as reported by the inner socket, if any.
    #[inline]
    pub fn last_endpoint(&self) -> Option<&monocoque_core::endpoint::Endpoint> {
        self.inner.last_endpoint()
    }

    /// Returns the inner socket's `has_more` flag.
    #[inline]
    pub fn has_more(&self) -> bool {
        self.inner.has_more()
    }

    /// Returns the value of the inner socket's `events()` call.
    #[inline]
    pub fn events(&self) -> u32 {
        self.inner.events()
    }

    /// Gives mutable access to the inner socket's options.
    #[inline]
    pub fn options_mut(&mut self) -> &mut SocketOptions {
        self.inner.options_mut()
    }
}
#[cfg(unix)]
impl SubSocket<compio::net::UnixStream> {
    /// Builds a SUB socket on top of an existing Unix domain stream.
    ///
    /// The returned socket has no monitor attached.
    ///
    /// # Errors
    ///
    /// Propagates any error from the inner socket's `new` constructor.
    pub async fn from_unix_stream(stream: compio::net::UnixStream) -> io::Result<Self> {
        let inner = InternalSub::new(stream).await?;
        Ok(Self {
            inner,
            monitor: None,
        })
    }

    /// Builds a SUB socket on top of an existing Unix domain stream, passing
    /// `options` to the inner socket.
    ///
    /// The returned socket has no monitor attached.
    ///
    /// # Errors
    ///
    /// Propagates any error from the inner socket's `with_options` constructor.
    pub async fn from_unix_stream_with_options(
        stream: compio::net::UnixStream,
        options: monocoque_core::options::SocketOptions,
    ) -> io::Result<Self> {
        let inner = InternalSub::with_options(stream, options).await?;
        Ok(Self {
            inner,
            monitor: None,
        })
    }
}