async_zmq/xsubscribe.rs
1//! XSUB socket module of Pub-Sub pattern in ZMQ
2//!
3//! Use the [`xsubscribe`] function to instantiate an xsubscribe socket and use
4//! methods from the [`Stream`]/[`StreamExt`] trait.
5//!
6//! An xsubscribe socket must be paired with a [`publish`] or [`xpublish`] socket.
7//!
8//! # Example
9//!
10//! ```no_run
11//! use async_zmq::{Result, StreamExt};
12//!
13//! #[async_std::main]
14//! async fn main() -> Result<()> {
15//! let mut zmq = async_zmq::xsubscribe("tcp://127.0.0.1:5555")?.bind()?;
16//!
//! // Subscribe to the topic you want to listen to.
//! // Users can subscribe to multiple topics and unsubscribe from them later.
19//! zmq.set_subscribe("topic")?;
20//!
21//! while let Some(msg) = zmq.next().await {
//! // Each received item is a `Result<Multipart, RecvError>`.
23//! let msg = msg?;
24//!
25//! println!("{:?}", msg.iter());
26//! }
27//! Ok(())
28//! }
29//! ```
30//!
31//! [`xpublish`]: ../xpublish/index.html
32//! [`publish`]: ../publish/index.html
33//! [`xsubscribe`]: fn.xsubscribe.html
34//! [`Stream`]: ../trait.Stream.html
35//! [`StreamExt`]: ../trait.StreamExt.html
36
37use std::pin::Pin;
38use std::task::{Context, Poll};
39
40use zmq::SocketType;
41
42use crate::{
43 reactor::{AsRawSocket, ZmqSocket},
44 socket::{Multipart, Receiver, SocketBuilder},
45 RecvError, SocketError, Stream, SubscribeError,
46};
47
48/// Create a ZMQ socket with XSUB type
49pub fn xsubscribe(endpoint: &str) -> Result<SocketBuilder<'_, XSubscribe>, SocketError> {
50 Ok(SocketBuilder::new(SocketType::XSUB, endpoint))
51}
52
53/// The async wrapper of ZMQ socket with XSUB type
54pub struct XSubscribe(Receiver);
55
56impl From<zmq::Socket> for XSubscribe {
57 fn from(socket: zmq::Socket) -> Self {
58 Self(Receiver {
59 socket: ZmqSocket::from(socket),
60 })
61 }
62}
63
64impl Stream for XSubscribe {
65 type Item = Result<Multipart, RecvError>;
66
67 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
68 Pin::new(&mut self.get_mut().0)
69 .poll_next(cx)
70 .map(|poll| poll.map(|result| result.map_err(Into::into)))
71 }
72}
73
74impl XSubscribe {
75 /// Subscribe a topic to the socket
76 pub fn set_subscribe(&self, topic: &str) -> Result<(), SubscribeError> {
77 Ok(self.as_raw_socket().set_subscribe(topic.as_bytes())?)
78 }
79
80 /// Remove a topic from the socket
81 pub fn set_unsubscribe(&self, topic: &str) -> Result<(), SubscribeError> {
82 Ok(self.as_raw_socket().set_unsubscribe(topic.as_bytes())?)
83 }
84
85 /// Represent as `Socket` from zmq crate in case you want to call its methods.
86 pub fn as_raw_socket(&self) -> &zmq::Socket {
87 self.0.socket.as_socket()
88 }
89}