use bytes::Bytes;
use compio::net::TcpListener;
use monocoque_core::monitor::{create_monitor, SocketEventSender, SocketMonitor};
use monocoque_core::options::SocketOptions;
use monocoque_zmtp::publisher::PubSocket as InternalPub;
use monocoque_zmtp::SocketType;
use std::io;
pub struct PubSocket {
inner: InternalPub,
listener: TcpListener,
monitor: Option<SocketEventSender>,
}
impl PubSocket {
pub async fn bind(addr: impl compio::net::ToSocketAddrsAsync) -> io::Result<Self> {
let listener = TcpListener::bind(addr).await?;
Ok(Self {
inner: InternalPub::new(),
listener,
monitor: None,
})
}
pub async fn bind_with_workers(
addr: impl compio::net::ToSocketAddrsAsync,
worker_count: usize,
) -> io::Result<Self> {
let listener = TcpListener::bind(addr).await?;
Ok(Self {
inner: InternalPub::with_workers(worker_count),
listener,
monitor: None,
})
}
pub async fn accept_subscriber(&mut self) -> io::Result<u64> {
self.inner.accept_subscriber(&self.listener).await
}
pub async fn send(&mut self, msg: Vec<Bytes>) -> io::Result<()> {
self.inner.send(msg).await
}
pub const fn subscriber_count(&self) -> usize {
self.inner.subscriber_count()
}
pub fn local_addr(&self) -> io::Result<std::net::SocketAddr> {
self.listener.local_addr()
}
#[inline]
pub const fn socket_type() -> SocketType {
SocketType::Pub
}
pub fn monitor(&mut self) -> SocketMonitor {
let (sender, receiver) = create_monitor();
self.monitor = Some(sender);
receiver
}
#[inline]
pub fn options_mut(&mut self) -> &mut SocketOptions {
self.inner.options_mut()
}
#[inline]
pub fn drop_count(&self) -> u64 {
self.inner.drop_count()
}
}