1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
use zmq::{self, Context as ZmqContext};

use crate::{poll::ZmqPoller, socket::AsZmqSocket, FromZmqSocket, Receiver, SocketBuilder};

/// Start building a SUB socket on the given ZeroMQ context.
///
/// The returned [`SocketBuilder`] yields a [`SubscribeWithoutTopic`]; call
/// [`SubscribeWithoutTopic::subscribe`] on it with a first topic to obtain the
/// usable [`Subscribe`] socket.
///
/// ## Usage Example
///
/// ```rust,no_run
/// use futures::StreamExt;
///
/// use tmq::{subscribe, Context, Result};
///
/// #[tokio::main]
/// async fn main() -> Result<()> {
///
///     // Build the socket, connect it, then register the first topic.
///     let mut socket = subscribe(&Context::new())
///         .connect("tcp://127.0.0.1:7899")?
///         .subscribe(b"topic")?;
///
///     // Print every received message, one string per message part.
///     while let Some(msg) = socket.next().await {
///         println!(
///             "Subscribe: {:?}",
///             msg?.iter()
///                 .map(|item| item.as_str().unwrap_or("invalid text"))
///                 .collect::<Vec<&str>>()
///         );
///     }
///     Ok(())
/// }
/// ```
pub fn subscribe(context: &ZmqContext) -> SocketBuilder<SubscribeWithoutTopic> {
    let socket_type = zmq::SocketType::SUB;
    SocketBuilder::new(context, socket_type)
}

/// SUB socket which is already bound or connected, but isn't yet subscribed to a topic.
///
/// This is an intermediate state: call [`SubscribeWithoutTopic::subscribe`] with a
/// first topic to turn it into a [`Subscribe`] socket.
pub struct SubscribeWithoutTopic {
    // Raw ZeroMQ socket; it is moved into the `Subscribe` socket by `subscribe`.
    socket: zmq::Socket,
}

impl FromZmqSocket<SubscribeWithoutTopic> for SubscribeWithoutTopic {
    fn from_zmq_socket(socket: zmq::Socket) -> crate::Result<Self> {
        Ok(Self { socket })
    }
}

impl SubscribeWithoutTopic {
    /// Subscribes to `topic` and completes construction of the SUB socket.
    ///
    /// Consumes `self` and returns the ready-to-use [`Subscribe`] socket.
    ///
    /// # Errors
    ///
    /// Returns an error if setting the subscription on the underlying socket
    /// fails, or if the socket cannot be wrapped in a [`ZmqPoller`].
    pub fn subscribe(self, topic: &[u8]) -> crate::Result<Subscribe> {
        let Self { socket } = self;

        // Register the topic on the raw socket before handing it to the poller.
        socket.set_subscribe(topic)?;

        let poller = ZmqPoller::from_zmq_socket(socket)?;
        let inner = Receiver::new(poller);
        Ok(Subscribe { inner })
    }
}

/// Asynchronous SUB socket.
///
/// Obtained from [`SubscribeWithoutTopic::subscribe`]. Further topics can be
/// added or removed afterwards with [`Subscribe::subscribe`] and
/// [`Subscribe::unsubscribe`].
pub struct Subscribe {
    // Receiving half that wraps the polled ZeroMQ socket.
    inner: Receiver,
}
// NOTE(review): both macros are defined elsewhere in the crate. Presumably
// `impl_wrapper!` forwards socket access (e.g. `get_socket`, used below) to `inner`
// and `impl_wrapper_stream!` forwards the stream implementation — confirm against
// the macro definitions.
impl_wrapper!(Subscribe, Receiver, inner);
impl_wrapper_stream!(Subscribe, inner);

impl Subscribe {
    /// Subscribes this socket to an additional topic.
    ///
    /// Topics that were added earlier stay subscribed.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying socket rejects the subscription.
    pub fn subscribe(&mut self, topic: &[u8]) -> crate::Result<()> {
        let socket = self.get_socket();
        socket.set_subscribe(topic).map_err(Into::into)
    }

    /// Unsubscribes this socket from the given topic.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying socket rejects the request.
    pub fn unsubscribe(&mut self, topic: &[u8]) -> crate::Result<()> {
        let socket = self.get_socket();
        socket.set_unsubscribe(topic).map_err(Into::into)
    }
}