use zmq::Context as ZmqContext;
use crate::{poll::ZmqPoller, socket::AsZmqSocket, FromZmqSocket, Receiver, SocketBuilder};
/// Starts building a ZeroMQ `SUB` socket on the given `context`.
///
/// The returned [`SocketBuilder`] is typed to produce a
/// [`SubscribeWithoutTopic`]; a topic has to be registered through
/// [`SubscribeWithoutTopic::subscribe`] to obtain a [`Subscribe`].
pub fn subscribe(context: &ZmqContext) -> SocketBuilder<SubscribeWithoutTopic> {
    let socket_type = zmq::SocketType::SUB;
    SocketBuilder::new(context, socket_type)
}
/// A `SUB` socket on which no topic has been registered through this wrapper yet.
///
/// Calling [`SubscribeWithoutTopic::subscribe`] consumes this value, sets the
/// first topic filter, and yields a [`Subscribe`].
pub struct SubscribeWithoutTopic {
    /// The underlying raw ZeroMQ socket.
    socket: zmq::Socket,
}
impl FromZmqSocket<SubscribeWithoutTopic> for SubscribeWithoutTopic {
fn from_zmq_socket(socket: zmq::Socket) -> crate::Result<Self> {
Ok(Self { socket })
}
}
impl SubscribeWithoutTopic {
    /// Registers `topic` as a subscription filter and turns the socket into a
    /// [`Subscribe`].
    ///
    /// Consumes `self`: the raw socket is first given the subscription, then
    /// handed to a [`ZmqPoller`], which in turn backs the [`Receiver`] stored
    /// inside the returned [`Subscribe`].
    ///
    /// # Errors
    ///
    /// Propagates the error if setting the subscription on the socket fails,
    /// or if the poller cannot be created from the socket.
    pub fn subscribe(self, topic: &[u8]) -> crate::Result<Subscribe> {
        let Self { socket } = self;

        // The filter is set before the socket is moved into the poller.
        socket.set_subscribe(topic)?;

        let poller = ZmqPoller::from_zmq_socket(socket)?;
        let inner = Receiver::new(poller);
        Ok(Subscribe { inner })
    }
}
/// A `SUB` socket with at least one topic registered, wrapping a [`Receiver`].
///
/// Values of this type are produced by [`SubscribeWithoutTopic::subscribe`];
/// further topics can be added or removed with [`Subscribe::subscribe`] and
/// [`Subscribe::unsubscribe`].
pub struct Subscribe {
    /// The receiving half that owns the polled socket.
    inner: Receiver,
}
// NOTE(review): both macros are defined elsewhere in the crate. `impl_wrapper!`
// presumably generates the delegating impls for the `inner` field (including
// the `get_socket` accessor used by the methods below) — confirm against the
// macro definition.
impl_wrapper!(Subscribe, Receiver, inner);
// NOTE(review): presumably forwards stream polling to `inner` so `Subscribe`
// can be consumed as a stream of incoming messages — confirm.
impl_wrapper_stream!(Subscribe, inner);
impl Subscribe {
    /// Adds `topic` as a further subscription filter on the underlying socket.
    ///
    /// # Errors
    ///
    /// Propagates the error if the socket option cannot be set.
    pub fn subscribe(&mut self, topic: &[u8]) -> crate::Result<()> {
        let socket = self.get_socket();
        socket.set_subscribe(topic)?;
        Ok(())
    }

    /// Removes a subscription filter for `topic` from the underlying socket.
    ///
    /// # Errors
    ///
    /// Propagates the error if the socket option cannot be set.
    pub fn unsubscribe(&mut self, topic: &[u8]) -> crate::Result<()> {
        let socket = self.get_socket();
        socket.set_unsubscribe(topic)?;
        Ok(())
    }
}