1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
use crate::{
ZmqResult, sealed,
socket::{MultipartReceiver, MultipartSender, Socket, SocketOption, SocketType},
};
/// # A Reply socket `ZMQ_REP`
///
/// A socket of type [`Reply`] is used by a service to receive requests from, and send replies to,
/// a client. This socket type only allows a strictly alternating sequence of [`recv_msg()`]
/// followed by [`send_msg()`] calls. Each request received is fair-queued from among all clients,
/// and each reply sent is routed to the client that issued the last request. If the original
/// requester no longer exists, the reply is silently discarded.
///
/// [`Reply`]: ReplySocket
/// [`send_msg()`]: #impl-Sender-for-Socket<T>
/// [`recv_msg()`]: #impl-Receiver-for-Socket<T>
pub type ReplySocket = Socket<Reply>;
/// Marker type that selects the `ZMQ_REP` flavour of [`Socket`].
///
/// The struct has no fields; it is only used as the type parameter in [`ReplySocket`].
pub struct Reply {}
// Flag impls from the crate-private `sealed` module. Presumably these opt `Reply` into the
// send-side and receive-side socket APIs respectively — confirm against `sealed`.
impl sealed::SenderFlag for Reply {}
impl sealed::ReceiverFlag for Reply {}
impl sealed::SocketType for Reply {
/// Reports [`SocketType::Reply`] as the raw socket type backing this marker.
fn raw_socket_type() -> SocketType {
SocketType::Reply
}
}
// SAFETY: NOTE(review) — nothing visible in this file establishes why sharing a `Socket<Reply>`
// between threads is sound. libzmq documents conventional sockets such as `ZMQ_REP` as not
// thread-safe, so this presumably relies on synchronization inside `Socket<T>` — TODO confirm.
unsafe impl Sync for Socket<Reply> {}
// SAFETY: NOTE(review) — presumably justified by libzmq allowing a socket to be migrated from one
// thread to another; confirm against the `Socket<T>` implementation.
unsafe impl Send for Socket<Reply> {}
// Empty impls: no trait items are overridden here, so any multipart behaviour comes from the
// traits themselves.
impl MultipartSender for Socket<Reply> {}
impl MultipartReceiver for Socket<Reply> {}
impl Socket<Reply> {
    /// # Set socket routing id `ZMQ_ROUTING_ID`
    ///
    /// [`set_routing_id()`] assigns the routing id this socket presents when it connects to a
    /// [`Router`] socket.
    ///
    /// A routing id is between 1 and 255 bytes long. Ids that start with a zero byte are
    /// reserved for use by the 0MQ infrastructure.
    ///
    /// When two clients connect to the same [`Router`] with an identical routing id, the outcome
    /// is governed by [`set_router_handover()`]. With that option unset (or left at its default
    /// of zero) the [`Router`] socket rejects the client that tries to reuse an id already in
    /// use. With the option set to `true`, the [`Router`] socket hands the connection over to
    /// the new client and disconnects the existing one.
    ///
    /// # Errors
    ///
    /// Propagates whatever error the underlying string socket-option setter returns.
    ///
    /// [`set_routing_id()`]: #method.set_routing_id
    /// [`Router`]: super::RouterSocket
    /// [`set_router_handover()`]: super::RouterSocket::set_router_handover
    pub fn set_routing_id<V: AsRef<str>>(&self, value: V) -> ZmqResult<()> {
        let option = SocketOption::RoutingId;
        self.set_sockopt_string(option, value)
    }

    /// # Retrieve socket routing id `ZMQ_ROUTING_ID`
    ///
    /// [`routing_id()`] reads back the routing id of this socket. Routing ids are only used by
    /// the request/reply pattern: together with a [`Router`] socket they allow messages to be
    /// routed to the peer that carries a specific routing id.
    ///
    /// A routing id is between 1 and 255 bytes long. Ids that start with a zero byte are
    /// reserved for use by the 0MQ infrastructure.
    ///
    /// # Errors
    ///
    /// Propagates whatever error the underlying string socket-option getter returns.
    ///
    /// [`routing_id()`]: #method.routing_id
    /// [`Router`]: super::RouterSocket
    pub fn routing_id(&self) -> ZmqResult<String> {
        let option = SocketOption::RoutingId;
        self.get_sockopt_string(option)
    }
}
#[cfg(test)]
mod reply_tests {
    use super::*;
    use crate::socket::{Context, ZmqResult};

    /// A routing id written with `set_routing_id` must be read back unchanged by `routing_id`.
    #[test]
    fn set_routing_id_sets_routing_id() -> ZmqResult<()> {
        let ctx = Context::new()?;
        let reply = ReplySocket::from_context(&ctx)?;

        let expected = "asdf";
        reply.set_routing_id(expected)?;

        let actual = reply.routing_id()?;
        assert_eq!(actual, expected);

        Ok(())
    }
}
#[cfg(feature = "builder")]
pub(crate) mod builder {
use core::default::Default;
use derive_builder::Builder;
use serde::{Deserialize, Serialize};
use super::ReplySocket;
use crate::{ZmqResult, context::Context, socket::SocketBuilder};
// Configuration template from which `derive_builder` generates the public `ReplyBuilder`.
//
// NOTE(review): `build_fn(skip, ...)` suppresses the generated `build()` method; sockets are
// created through the hand-written `ReplyBuilder::build_from_context` instead. The `ZmqError`
// named in `error = "ZmqError"` is not imported in this module — presumably it is never
// referenced because of `skip`; confirm.
#[derive(Default, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, Builder)]
#[builder(
pattern = "owned",
name = "ReplyBuilder",
public,
build_fn(skip, error = "ZmqError"),
derive(PartialEq, Eq, Hash, Clone, serde::Serialize, serde::Deserialize)
)]
#[builder_struct_attr(doc = "Builder for [`ReplySocket`].\n\n")]
#[allow(dead_code)]
struct ReplyConfig {
/// Generic socket options; when set, they are applied before the routing id.
socket_builder: SocketBuilder,
/// Routing id (`ZMQ_ROUTING_ID`) to set on the socket; left untouched when not set.
#[builder(setter(into), default = "Default::default()")]
routing_id: String,
}
impl ReplyBuilder {
    /// Applies every option configured on this builder to an existing `socket`.
    ///
    /// The nested socket builder, when present, is applied first; the routing id, when present,
    /// is set afterwards. Options that were never configured are skipped.
    ///
    /// # Errors
    ///
    /// Returns the first error reported while applying an option; later options are not applied.
    pub fn apply(self, socket: &ReplySocket) -> ZmqResult<()> {
        if let Some(common_options) = self.socket_builder {
            common_options.apply(socket)?;
        }

        match self.routing_id.as_deref() {
            Some(id) => socket.set_routing_id(id),
            None => Ok(()),
        }
    }

    /// Creates a new [`ReplySocket`] in `context` and configures it via [`Self::apply`].
    ///
    /// # Errors
    ///
    /// Fails if the socket cannot be created or if applying any configured option fails.
    pub fn build_from_context(self, context: &Context) -> ZmqResult<ReplySocket> {
        ReplySocket::from_context(context).and_then(|reply| {
            self.apply(&reply)?;
            Ok(reply)
        })
    }
}
#[cfg(test)]
mod reply_builder_tests {
    use super::ReplyBuilder;
    use crate::socket::{Context, SocketBuilder, ZmqResult};

    /// A builder with nothing configured yields a socket whose routing id is empty.
    #[test]
    fn default_reply_builder() -> ZmqResult<()> {
        let ctx = Context::new()?;

        let builder = ReplyBuilder::default();
        let reply = builder.build_from_context(&ctx)?;

        let routing_id = reply.routing_id()?;
        assert_eq!(routing_id, "");

        Ok(())
    }

    /// Values configured on the builder are visible on the socket it builds.
    #[test]
    fn reply_builder_with_custom_values() -> ZmqResult<()> {
        let ctx = Context::new()?;

        let builder = ReplyBuilder::default()
            .socket_builder(SocketBuilder::default())
            .routing_id("asdf");
        let reply = builder.build_from_context(&ctx)?;

        let routing_id = reply.routing_id()?;
        assert_eq!(routing_id, "asdf");

        Ok(())
    }
}
}