1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
use std::ffi::CString;
use std::ptr;
use nng_sys::protocol::*;
use crate::error::{ErrorKind, Result, SendResult};
use crate::message::Message;
/// A nanomsg-next-generation socket.
///
/// All communication between application and remote Scalability Protocol peers
/// is done through sockets. A given socket can have multiple dialers,
/// listeners, and pipes, and may be connected to multiple transports at the
/// same time. However, a given socket will have exactly one protocol
/// associated with it and is responsible for any state machines or other
/// application-specific logic.
///
/// See the [nng documenatation][1] for more information.
///
/// [1]: https://nanomsg.github.io/nng/man/v1.0.0/nng_socket.5.html
#[derive(Debug)]
pub struct Socket
{
/// Handle to the underlying nng socket.
handle: nng_sys::nng_socket,
/// Whether or not this socket should block on sending and receiving
nonblocking: bool,
}
impl Socket
{
/// Creates a new socket which uses the specified protocol.
pub fn new(t: Protocol) -> Result<Socket>
{
// Create the uninitialized nng_socket
let mut socket = nng_sys::NNG_SOCKET_INITIALIZER;
// Try to open a socket of the specified type
let rv = unsafe {
match t {
Protocol::Bus0 => bus0::nng_bus0_open(&mut socket as *mut _),
Protocol::Pair0 => pair0::nng_pair0_open(&mut socket as *mut _),
Protocol::Pair1 => pair1::nng_pair1_open(&mut socket as *mut _),
Protocol::Pub0 => pubsub0::nng_pub0_open(&mut socket as *mut _),
Protocol::Pull0 => pipeline0::nng_pull0_open(&mut socket as *mut _),
Protocol::Push0 => pipeline0::nng_pull0_open(&mut socket as *mut _),
Protocol::Rep0 => reqrep0::nng_rep0_open(&mut socket as *mut _),
Protocol::Req0 => reqrep0::nng_req0_open(&mut socket as *mut _),
Protocol::Respondent0 => survey0::nng_respondent0_open(&mut socket as *mut _),
Protocol::Sub0 => pubsub0::nng_sub0_open(&mut socket as *mut _),
Protocol::Surveyor0 => survey0::nng_surveyor0_open(&mut socket as *mut _),
}
};
rv2res!(rv, Socket { handle: socket, nonblocking: false })
}
/// Initiates a remote connection to a listener.
///
/// When the connection is closed, the underlying `Dialer` will attempt to
/// re-establish the connection. It will also periodically retry a
/// connection automatically if an attempt to connect asynchronously fails.
///
/// Normally, the first attempt to connect to the address indicated by the
/// provided _url_ is done synchronously, including any necessary name
/// resolution. As a result, a failure, such as if the connection is
/// refused, will be returned immediately and no further action will be
/// taken.
///
/// However, if the socket is set to `nonblocking`, then the connection
/// attempt is made asynchronously.
///
/// Furthermore, if the connection was closed for a synchronously dialed
/// connection, the dialer will still attempt to redial asynchronously.
///
/// Because the dialer is started immediately, it is generally not possible
/// to apply extra configuration. If that is needed, or if one wishes to
/// close the dialer before the socket, applications should consider using
/// the `Dialer` type directly.
///
/// See the [nng documentation][1] for more information.
///
/// [1]: https://nanomsg.github.io/nng/man/v1.0.0/nng_dial.3.html
pub fn dial(&mut self, url: &str) -> Result<()>
{
let addr = CString::new(url).map_err(|_| ErrorKind::AddressInvalid)?;
let flags = if self.nonblocking { nng_sys::NNG_FLAG_NONBLOCK } else { 0 };
let rv = unsafe {
nng_sys::nng_dial(self.handle, addr.as_ptr(), ptr::null_mut(), flags)
};
rv2res!(rv)
}
/// Initiates and starts a listener on the specified address.
///
/// Listeners are used to accept connections initiated by remote dialers.
/// Unlike a dialer, listeners generally can have many connections open
/// concurrently.
///
/// Normally, the act of "binding" to the address indicated by _url_ is
/// done synchronously, including any necessary name resolution. As a
/// result, a failure, such as if the address is already in use, will be
/// returned immediately. However, if the socket is set to `nonblocking`
/// then this is done asynchronously; furthermore any failure to bind will
/// be periodically reattempted in the background.
///
/// Because the listener is started immediately, it is generally not
/// possible to apply extra configuration. If that is needed, or if one
/// wishes to close the dialer before the socket, applications should
/// consider using the `Listener` type directly.
///
/// See the [nng documentation][1] for more information.
///
/// [1]: https://nanomsg.github.io/nng/man/v1.0.0/nng_listen.3.html
pub fn listen(&mut self, url: &str) -> Result<()>
{
let addr = CString::new(url).map_err(|_| ErrorKind::AddressInvalid)?;
let flags = if self.nonblocking { nng_sys::NNG_FLAG_NONBLOCK } else { 0 };
let rv = unsafe {
nng_sys::nng_listen(self.handle, addr.as_ptr(), ptr::null_mut(), flags)
};
rv2res!(rv)
}
/// Sets whether or not this socket should use nonblocking operations.
///
/// If the socket is set to nonblocking mode, then the send and receive
/// functions return immediately even if there are no messages available or
/// the message cannot be sent. Otherwise, the functions will wailt until
/// the operation can complete or any configured timer expires.
///
/// The default is blocking operations.
pub fn set_nonblocking(&mut self, nonblocking: bool)
{
self.nonblocking = nonblocking;
}
/// Receives a message from the socket.
///
/// The semantics of what receiving a message means vary from protocol to
/// protocol, so examination of the protocol documentation is encouraged.
/// For example, with a _req_ socket a message may only be received after a
/// request has been sent. Furthermore, some protocols may not support
/// receiving data at all, such as _pub_.
pub fn recv(&mut self) -> Result<Message>
{
let mut msgp: *mut nng_sys::nng_msg = ptr::null_mut();
let flags = if self.nonblocking { nng_sys::NNG_FLAG_NONBLOCK } else { 0 };
let rv = unsafe {
nng_sys::nng_recvmsg(self.handle, &mut msgp as _, flags)
};
validate_ptr!(rv, msgp);
Ok(unsafe { Message::from_ptr(msgp) })
}
/// Sends a message on the socket.
///
/// The semantics of what sending a message means vary from protocol to
/// protocol, so examination of the protocol documentation is encouraged.
/// For example, with a _pub_ socket the data is broadcast so that any
/// peers who have a suitable subscription will be able to receive it.
/// Furthermore, some protocols may not support sending data (such as
/// _sub_) or may require other conditions. For example, _rep_sockets
/// cannot normally send data, which are responses to requests, until they
/// have first received a request.
///
/// If the message cannot be sent, then it is returned to the caller as a
/// part of the `Error`.
pub fn send(&mut self, data: Message) -> SendResult<()>
{
let flags = if self.nonblocking { nng_sys::NNG_FLAG_NONBLOCK } else { 0 };
unsafe {
let msgp = data.into_ptr();
let rv = nng_sys::nng_sendmsg(self.handle, msgp, flags);
if rv != 0 {
Err((Message::from_ptr(msgp), ErrorKind::from_code(rv).into()))
} else {
Ok(())
}
}
}
/// Get the positive identifier for the socket.
pub fn id(&self) -> i32
{
let id = unsafe { nng_sys::nng_socket_id(self.handle) };
assert!(id > 0, "Invalid socket ID returned from valid socket");
id
}
/// Returns the underlying `nng_socket`.
pub(crate) fn handle(&self) -> nng_sys::nng_socket
{
self.handle
}
}
expose_options!{
Socket :: handle -> nng_sys::nng_socket;
GETOPT_BOOL = nng_sys::nng_getopt_bool;
GETOPT_INT = nng_sys::nng_getopt_int;
GETOPT_MS = nng_sys::nng_getopt_ms;
GETOPT_SIZE = nng_sys::nng_getopt_size;
GETOPT_SOCKADDR = crate::fake_opt;
GETOPT_STRING = nng_sys::nng_getopt_string;
SETOPT = nng_sys::nng_setopt;
SETOPT_BOOL = nng_sys::nng_setopt_bool;
SETOPT_INT = nng_sys::nng_setopt_int;
SETOPT_MS = nng_sys::nng_setopt_ms;
SETOPT_SIZE = nng_sys::nng_setopt_size;
SETOPT_STRING = nng_sys::nng_setopt_string;
Gets -> [Raw, MaxTtl, RecvBufferSize,
RecvTimeout, SendBufferSize,
SendTimeout, SocketName,
protocol::reqrep::ResendTime,
protocol::survey::SurveyTime];
Sets -> [ReconnectMinTime, ReconnectMaxTime,
RecvBufferSize, RecvMaxSize,
RecvTimeout, SendBufferSize,
SendTimeout, SocketName, MaxTtl,
protocol::reqrep::ResendTime,
protocol::pubsub::Subscribe,
protocol::pubsub::Unsubscribe,
protocol::survey::SurveyTime,
transport::tcp::NoDelay,
transport::tcp::KeepAlive,
transport::tls::CaFile,
transport::tls::CertKeyFile,
transport::websocket::RequestHeaders,
transport::websocket::ResponseHeaders];
}
impl Drop for Socket
{
fn drop(&mut self)
{
// Closing a socket should only ever return success or ECLOSED and both
// of those mean we have nothing to drop. However, just to be sane
// about it all, we'll warn the user if we see something odd. If that
// ever happens, hopefully it will make its way to a bug report.
let rv = unsafe { nng_sys::nng_close(self.handle) };
assert!(
rv == 0 || rv == nng_sys::NNG_ECLOSED,
"Unexpected error code while closing socket ({})", rv
);
}
}
/// Protocols available for use by sockets.
#[derive(Copy, Clone, Debug)]
pub enum Protocol
{
/// Version 0 of the bus protocol.
///
/// The _bus_ protocol provides for building mesh networks where every peer
/// is connected to every other peer. In this protocol, each message sent
/// by a node is sent to every one of its directly connected peers. See
/// the [bus documentation][1] for more information.
///
/// [1]: https://nanomsg.github.io/nng/man/v1.0.0/nng_bus.7.html
Bus0,
/// Version 0 of the pair protocol.
///
/// The _pair_ protocol implements a peer-to-peer pattern, where
/// relationships between peers are one-to-one. See the
/// [pair documentation][1] for more information.
///
/// [1]: https://nanomsg.github.io/nng/man/v1.0.0/nng_pair.7.html
Pair0,
/// Version 1 of the pair protocol.
///
/// The _pair_ protocol implements a peer-to-peer pattern, where
/// relationships between peers are one-to-one. Version 1 of this protocol
/// supports and optional _polyamorous_ mode. See the [pair documentation][1]
/// for more information.
///
/// [1]: https://nanomsg.github.io/nng/man/v1.0.0/nng_pair.7.html
Pair1,
/// Version 0 of the publisher protocol.
///
/// The _pub_ protocol is one half of a publisher/subscriber pattern. In
/// this pattern, a publisher sends data, which is broadcast to all
/// subscribers. The subscribing applications only see the data to which
/// they have subscribed. See the [publisher/subscriber documentation][1]
/// for more information.
///
/// [1]: https://nanomsg.github.io/nng/man/v1.0.0/nng_pub.7.html
Pub0,
/// Version 0 of the pull protocol.
///
/// The _pull_ protocol is one half of a pipeline pattern. The other half
/// is the _push_ protocol. In the pipeline pattern, pushers distribute
/// messages to pullers. Each message sent by a pusher will be sent to one
/// of its peer pullers, chosen in a round-robin fashion from the set of
/// connected peers available for receiving. This property makes this
/// pattern useful in load-balancing scenarios.
///
/// See the [pipeline documentation][1] for more information.
///
/// [1]: https://nanomsg.github.io/nng/man/v1.0.0/nng_pull.7.html
Pull0,
/// Version 0 of the push protocol.
///
/// The _push_ protocol is one half of a pipeline pattern. The other side
/// is the _pull_ protocol. In the pipeline pattern, pushers distribute
/// messages to pullers. Each message sent by a pusher will be sent to one
/// of its peer pullers, chosen in a round-robin fashion from the set of
/// connected peers available for receiving. This property makes this
/// pattern useful in load-balancing scenarios.
///
/// See the [pipeline documentation][1] for more information.
///
/// [1]: https://nanomsg.github.io/nng/man/v1.0.0/nng_push.7.html
Push0,
/// Version 0 of the reply protocol.
///
/// The _rep_ protocol is one half of a request/reply pattern. In this
/// pattern, a requester sends a message to one replier, who is expected to
/// reply. The request is resent if no reply arrives, until a reply is
/// received or the request times out.
///
/// See the [request/reply documentation][1] for more information.
///
/// [1]: https://nanomsg.github.io/nng/man/v1.0.0/nng_rep.7.html
Rep0,
/// Version 0 of the request protocol.
///
/// The _req_ protocol is one half of a request/reply pattern. In this
/// pattern, a requester sends a message to one replier, who is expected to
/// reply. The request is resent if no reply arrives, until a reply is
/// received or the request times out.
///
/// See the [request/reply documentation][1] for more information.
///
/// [1]: https://nanomsg.github.io/nng/man/v1.0.0/nng_req.7.html
Req0,
/// Version 0 of the respondent protocol.
///
/// The _respondent_ protocol is one half of a survey pattern. In this
/// pattern, a surveyor sends a survey, which is broadcast to all peer
/// respondents. The respondents then have a chance to reply (but are not
/// obliged to reply). The survey itself is a timed event, so that
/// responses received after the survey has finished are discarded.
///
/// See the [survery documentation][1] for more information.
///
/// [1]: https://nanomsg.github.io/nng/man/v1.0.0/nng_respondent.7.html
Respondent0,
/// Version 0 of the subscriber protocol.
///
/// The _sub_ protocol is one half of a publisher/subscriber pattern. In
/// this pattern, a publisher sends data, which is broadcast to all
/// subscribers. The subscribing applications only see the data to which
/// they have subscribed.
///
/// See the [publisher/subscriber documentation][1] for more information.
///
/// [1]: https://nanomsg.github.io/nng/man/v1.0.0/nng_sub.7.html
Sub0,
/// Version 0 of the surveyor protocol.
///
/// The _surveyor_ protocol is one half of a survey pattern. In this
/// pattern, a surveyor sends a survey, which is broadcast to all peer
/// respondents. The respondents then have a chance to reply (but are not
/// obliged to reply). The survey itself is a timed event, so that
/// responses received after the survey has finished are discarded.
///
/// See the [survey documentation][1] for more information.
///
/// [1]: https://nanomsg.github.io/nng/man/v1.0.0/nng_surveyor.7.html
Surveyor0,
}