1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
use zmq::{self, Context as ZmqContext};
use crate::{poll::ZmqPoller, socket::AsZmqSocket, FromZmqSocket, Receiver, SocketBuilder};
/// Creates a [`SocketBuilder`] for a ZeroMQ `SUB` socket in the given context.
///
/// The builder is parameterised over [`SubscribeWithoutTopic`], i.e. a socket
/// that has not been subscribed to any topic yet. Call
/// [`SubscribeWithoutTopic::subscribe`] on it to obtain a usable
/// [`Subscribe`] socket.
pub fn subscribe(context: &ZmqContext) -> SocketBuilder<SubscribeWithoutTopic> {
    let socket_type = zmq::SocketType::SUB;
    SocketBuilder::new(context, socket_type)
}
/// A `SUB` socket that has not been subscribed to any topic yet.
///
/// It owns the raw [`zmq::Socket`] until [`SubscribeWithoutTopic::subscribe`]
/// consumes it and turns it into a [`Subscribe`].
pub struct SubscribeWithoutTopic {
    /// The raw ZeroMQ socket, held until a first topic is chosen.
    socket: zmq::Socket,
}
impl FromZmqSocket<SubscribeWithoutTopic> for SubscribeWithoutTopic {
fn from_zmq_socket(socket: zmq::Socket) -> crate::Result<Self> {
Ok(Self { socket })
}
}
impl SubscribeWithoutTopic {
    /// Subscribes the socket to `topic` and converts it into a [`Subscribe`].
    ///
    /// The socket is consumed: the subscription is registered on the raw
    /// socket first, then the socket is handed to a [`ZmqPoller`] which is
    /// wrapped in a [`Receiver`].
    ///
    /// # Errors
    ///
    /// Returns an error if setting the subscription on the socket fails or if
    /// the poller cannot be created from the socket.
    pub fn subscribe(self, topic: &[u8]) -> crate::Result<Subscribe> {
        let Self { socket } = self;

        // Register the filter before the socket is moved into the poller.
        socket.set_subscribe(topic)?;

        let poller = ZmqPoller::from_zmq_socket(socket)?;
        let inner = Receiver::new(poller);
        Ok(Subscribe { inner })
    }
}
/// A `SUB` socket that has been subscribed to at least one topic.
///
/// Obtained from [`SubscribeWithoutTopic::subscribe`]; further topics can be
/// added or removed with [`Subscribe::subscribe`] and
/// [`Subscribe::unsubscribe`].
pub struct Subscribe {
    /// The receiving half that wraps the underlying socket.
    inner: Receiver,
}
// Both macros are defined elsewhere in the crate, so their expansion is not
// visible here.
// NOTE(review): `impl_wrapper!` presumably generates the delegation from
// `Subscribe` to its `inner: Receiver` field (including the `get_socket`
// accessor used by the methods below) — confirm against the macro definition.
impl_wrapper!(Subscribe, Receiver, inner);
// NOTE(review): `impl_wrapper_stream!` presumably forwards a stream
// implementation to `inner` — confirm against the macro definition.
impl_wrapper_stream!(Subscribe, inner);
impl Subscribe {
    /// Registers `topic` as an additional subscription on the underlying
    /// socket.
    ///
    /// # Errors
    ///
    /// Returns an error if the socket rejects the subscription option.
    pub fn subscribe(&mut self, topic: &[u8]) -> crate::Result<()> {
        let socket = self.get_socket();
        socket.set_subscribe(topic)?;
        Ok(())
    }

    /// Removes the subscription for `topic` from the underlying socket.
    ///
    /// # Errors
    ///
    /// Returns an error if the socket rejects the unsubscribe option.
    pub fn unsubscribe(&mut self, topic: &[u8]) -> crate::Result<()> {
        let socket = self.get_socket();
        socket.set_unsubscribe(topic)?;
        Ok(())
    }
}