pub struct Socket { /* private fields */ }
Expand description
An NNG socket.
All communication between application and remote Scalability Protocol peers is done through sockets. A given socket can have multiple dialers, listeners, and pipes, and may be connected to multiple transports at the same time. However, a given socket will have exactly one protocol associated with it and is responsible for any state machines or other application-specific logic.
See the NNG documentation for more information.
Implementations§
Source§impl Socket
impl Socket
Sourcepub fn new(t: Protocol) -> Result<Socket>
pub fn new(t: Protocol) -> Result<Socket>
Creates a new socket which uses the specified protocol.
§Errors
NotSupported
: Protocol is not enabled.OutOfMemory
: Insufficient memory available.
Examples found in repository?
30fn node0(url: &str) -> Result<(), Error> {
31 let s = Socket::new(Protocol::Pair0)?;
32 s.listen(url)?;
33
34 send_recv(&s, "NODE0")
35}
36
37/// The dialing node.
38fn node1(url: &str) -> Result<(), Error> {
39 let s = Socket::new(Protocol::Pair0)?;
40 s.dial(url)?;
41
42 send_recv(&s, "NODE1")
43}
More examples
28fn pull(url: &str) -> Result<(), Error> {
29 let s = Socket::new(Protocol::Pull0)?;
30 s.listen(url)?;
31
32 loop {
33 let msg = s.recv()?;
34 let arg = str::from_utf8(&msg).expect("message has invalid UTF-8");
35
36 println!("PULL: RECEIVED \"{}\"", arg);
37 }
38}
39
40/// Push socket.
41fn push(url: &str, arg: &str) -> Result<(), Error> {
42 let s = Socket::new(Protocol::Push0)?;
43 s.dial(url)?;
44
45 println!("PUSH: SENDING \"{}\"", arg);
46 s.send(arg.as_bytes())?;
47
48 // Wait for messages to flush before shutting down.
49 thread::sleep(Duration::from_secs(1));
50 Ok(())
51}
34fn request(url: &str) -> Result<(), Error> {
35 let s = Socket::new(Protocol::Req0)?;
36 s.dial(url)?;
37
38 println!("REQUEST: SENDING DATE REQUEST");
39 s.send(DATE_REQUEST.to_le_bytes())?;
40
41 println!("REQUEST: WAITING FOR RESPONSE");
42 let msg = s.recv()?;
43 let epoch = u64::from_le_bytes(msg[..].try_into().unwrap());
44
45 println!("REQUEST: UNIX EPOCH WAS {} SECONDS AGO", epoch);
46
47 Ok(())
48}
49
50/// Run the reply portion of the program.
51fn reply(url: &str) -> Result<(), Error> {
52 let s = Socket::new(Protocol::Rep0)?;
53 s.listen(url)?;
54
55 loop {
56 println!("REPLY: WAITING FOR COMMAND");
57 let mut msg = s.recv()?;
58
59 let cmd = u64::from_le_bytes(msg[..].try_into().unwrap());
60 if cmd != DATE_REQUEST {
61 println!("REPLY: UNKNOWN COMMAND");
62 continue;
63 }
64
65 println!("REPLY: RECEIVED DATE REQUEST");
66 let rep = SystemTime::now()
67 .duration_since(SystemTime::UNIX_EPOCH)
68 .expect("current system time is before Unix epoch")
69 .as_secs();
70
71 msg.clear();
72 msg.push_back(&rep.to_le_bytes());
73
74 println!("REPLY: SENDING {}", rep);
75 s.send(msg)?;
76 }
77}
43fn client(url: &str, ms: u64) -> Result<(), nng::Error> {
44 let s = Socket::new(Protocol::Req0)?;
45 s.dial(url)?;
46
47 let start = Instant::now();
48 s.send(ms.to_le_bytes())?;
49 s.recv()?;
50
51 let dur = Instant::now().duration_since(start);
52 let subsecs: u64 = dur.subsec_millis().into();
53 println!(
54 "Request took {} milliseconds",
55 dur.as_secs() * 1000 + subsecs
56 );
57
58 Ok(())
59}
60
61/// Run the server portion of the program.
62fn server(url: &str) -> Result<(), nng::Error> {
63 // Create the socket
64 let s = Socket::new(Protocol::Rep0)?;
65
66 // Create all of the worker contexts
67 let workers: Vec<_> = (0..PARALLEL)
68 .map(|_| {
69 let ctx = Context::new(&s)?;
70 let ctx_clone = ctx.clone();
71 let aio = Aio::new(move |aio, res| worker_callback(aio, &ctx_clone, res))?;
72 Ok((aio, ctx))
73 })
74 .collect::<Result<_, nng::Error>>()?;
75
76 // Only after we have the workers do we start listening.
77 s.listen(url)?;
78
79 // Now start all of the workers listening.
80 for (a, c) in &workers {
81 c.recv(a)?;
82 }
83
84 thread::sleep(Duration::from_secs(60 * 60 * 24 * 365));
85
86 Ok(())
87}
25fn node(name: &str, listen: &str, dial: &[String]) -> Result<(), Error> {
26 let s = Socket::new(Protocol::Bus0)?;
27 s.listen(listen)?;
28
29 // Give time for peers to bind.
30 thread::sleep(Duration::from_secs(1));
31 for peer in dial {
32 s.dial(peer)?;
33 }
34
35 // SEND
36 println!("{0}: SENDING \"{0}\" ONTO BUS", name);
37 s.send(name.as_bytes())?;
38
39 // RECV
40 loop {
41 let msg = s.recv()?;
42 let peer = str::from_utf8(&msg).expect("invalid UTF-8");
43
44 println!("{}: RECEIVED \"{}\" FROM BUS", name, peer);
45 }
46}
30fn surveyor(url: &str) -> Result<(), Error> {
31 let s = Socket::new(Protocol::Surveyor0)?;
32 s.listen(url)?;
33
34 loop {
35 println!("SURVEYOR: SENDING DATE SURVEY REQUEST");
36 s.send(DATE.as_bytes())?;
37
38 loop {
39 let msg = match s.recv() {
40 Ok(m) => m,
41 Err(Error::TimedOut) => break,
42 Err(e) => return Err(e),
43 };
44
45 let date = u64::from_le_bytes(msg[..].try_into().unwrap());
46 println!("SURVEYOR: RECEIVED \"{}\" SURVEY RESPONSE", date);
47 }
48
49 println!("SURVEYOR SURVEY COMPLETE");
50 }
51}
52
53/// Respondent socket.
54fn respondent(url: &str, name: &str) -> Result<(), Error> {
55 let s = Socket::new(Protocol::Respondent0)?;
56 s.dial(url)?;
57
58 loop {
59 let mut msg = s.recv()?;
60
61 let survey = str::from_utf8(&msg).expect("invalid UTF-8");
62 println!(
63 "RESPONDENT ({}): RECEIVED \"{}\" SURVEY REQUEST",
64 name, survey
65 );
66
67 // Reuse the message to avoid allocation.
68 msg.clear();
69 let date = SystemTime::now()
70 .duration_since(SystemTime::UNIX_EPOCH)
71 .expect("system time is before Unix epoch")
72 .as_secs();
73
74 msg.push_back(&date.to_le_bytes());
75
76 println!("RESPONDENT ({}): SENDING \"{}\"", name, date);
77 s.send(msg)?;
78 }
79}
Sourcepub fn dial(&self, url: &str) -> Result<()>
pub fn dial(&self, url: &str) -> Result<()>
Initiates a remote connection to a listener.
When the connection is closed, the underlying Dialer
will attempt to
re-establish the connection.
The first attempt to connect to the address indicated by the provided url is done synchronously, including any necessary name resolution. As a result, a failure, such as if the connection is refused, will be returned immediately and no further action will be taken.
If the connection was closed for a synchronously dialed connection, the dialer will still attempt to redial asynchronously.
Because the dialer is started immediately, it is generally not possible
to apply extra configuration. If that is needed, or if one wishes to
close the dialer before the socket, applications should consider using
the Dialer
type directly.
See the NNG documentation for more information.
§Errors
AddressInvalid
: An invalid url was specified.Closed
: The socket is not open.ConnectionRefused
: The remote peer refused the connection.ConnectionReset
: The remote peer reset the connection.DestUnreachable
: The remote address is not reachable.OutOfMemory
: Insufficient memory is available.PeerAuth
: Authentication or authorization failure.Protocol
: A protocol error occurred.
Examples found in repository?
More examples
34fn request(url: &str) -> Result<(), Error> {
35 let s = Socket::new(Protocol::Req0)?;
36 s.dial(url)?;
37
38 println!("REQUEST: SENDING DATE REQUEST");
39 s.send(DATE_REQUEST.to_le_bytes())?;
40
41 println!("REQUEST: WAITING FOR RESPONSE");
42 let msg = s.recv()?;
43 let epoch = u64::from_le_bytes(msg[..].try_into().unwrap());
44
45 println!("REQUEST: UNIX EPOCH WAS {} SECONDS AGO", epoch);
46
47 Ok(())
48}
43fn client(url: &str, ms: u64) -> Result<(), nng::Error> {
44 let s = Socket::new(Protocol::Req0)?;
45 s.dial(url)?;
46
47 let start = Instant::now();
48 s.send(ms.to_le_bytes())?;
49 s.recv()?;
50
51 let dur = Instant::now().duration_since(start);
52 let subsecs: u64 = dur.subsec_millis().into();
53 println!(
54 "Request took {} milliseconds",
55 dur.as_secs() * 1000 + subsecs
56 );
57
58 Ok(())
59}
64fn subscriber(url: &str) -> Result<(), nng::Error> {
65 let s = Socket::new(Protocol::Sub0)?;
66 s.dial(url)?;
67
68 println!("SUBSCRIBER: SUBSCRIBING TO ALL TOPICS");
69 let all_topics = vec![];
70 s.set_opt::<Subscribe>(all_topics)?;
71
72 loop {
73 let msg = s.recv()?;
74 let subs = usize::from_le_bytes(msg[..].try_into().unwrap());
75 println!("SUBSCRIBER: THERE ARE {} SUBSCRIBERS", subs);
76 }
77}
25fn node(name: &str, listen: &str, dial: &[String]) -> Result<(), Error> {
26 let s = Socket::new(Protocol::Bus0)?;
27 s.listen(listen)?;
28
29 // Give time for peers to bind.
30 thread::sleep(Duration::from_secs(1));
31 for peer in dial {
32 s.dial(peer)?;
33 }
34
35 // SEND
36 println!("{0}: SENDING \"{0}\" ONTO BUS", name);
37 s.send(name.as_bytes())?;
38
39 // RECV
40 loop {
41 let msg = s.recv()?;
42 let peer = str::from_utf8(&msg).expect("invalid UTF-8");
43
44 println!("{}: RECEIVED \"{}\" FROM BUS", name, peer);
45 }
46}
Sourcepub fn listen(&self, url: &str) -> Result<()>
pub fn listen(&self, url: &str) -> Result<()>
Initiates and starts a listener on the specified address.
Listeners are used to accept connections initiated by remote dialers. Unlike a dialer, listeners generally can have many connections open concurrently.
The act of “binding” to the address indicated by url is done synchronously, including any necessary name resolution. As a result, a failure, such as if the address is already in use, will be returned immediately.
Because the listener is started immediately, it is generally not
possible to apply extra configuration. If that is needed, or if one
wishes to close the dialer before the socket, applications should
consider using the Listener
type directly.
See the NNG documentation for more information.
§Errors
AddressInUse
: The address specified by url is already in use.AddressInvalid
: An invalid url was specified.Closed
: The socket is not open.OutOfMemory
: Insufficient memory is available.
Examples found in repository?
More examples
25fn node(name: &str, listen: &str, dial: &[String]) -> Result<(), Error> {
26 let s = Socket::new(Protocol::Bus0)?;
27 s.listen(listen)?;
28
29 // Give time for peers to bind.
30 thread::sleep(Duration::from_secs(1));
31 for peer in dial {
32 s.dial(peer)?;
33 }
34
35 // SEND
36 println!("{0}: SENDING \"{0}\" ONTO BUS", name);
37 s.send(name.as_bytes())?;
38
39 // RECV
40 loop {
41 let msg = s.recv()?;
42 let peer = str::from_utf8(&msg).expect("invalid UTF-8");
43
44 println!("{}: RECEIVED \"{}\" FROM BUS", name, peer);
45 }
46}
30fn surveyor(url: &str) -> Result<(), Error> {
31 let s = Socket::new(Protocol::Surveyor0)?;
32 s.listen(url)?;
33
34 loop {
35 println!("SURVEYOR: SENDING DATE SURVEY REQUEST");
36 s.send(DATE.as_bytes())?;
37
38 loop {
39 let msg = match s.recv() {
40 Ok(m) => m,
41 Err(Error::TimedOut) => break,
42 Err(e) => return Err(e),
43 };
44
45 let date = u64::from_le_bytes(msg[..].try_into().unwrap());
46 println!("SURVEYOR: RECEIVED \"{}\" SURVEY RESPONSE", date);
47 }
48
49 println!("SURVEYOR SURVEY COMPLETE");
50 }
51}
62fn server(url: &str) -> Result<(), nng::Error> {
63 // Create the socket
64 let s = Socket::new(Protocol::Rep0)?;
65
66 // Create all of the worker contexts
67 let workers: Vec<_> = (0..PARALLEL)
68 .map(|_| {
69 let ctx = Context::new(&s)?;
70 let ctx_clone = ctx.clone();
71 let aio = Aio::new(move |aio, res| worker_callback(aio, &ctx_clone, res))?;
72 Ok((aio, ctx))
73 })
74 .collect::<Result<_, nng::Error>>()?;
75
76 // Only after we have the workers do we start listening.
77 s.listen(url)?;
78
79 // Now start all of the workers listening.
80 for (a, c) in &workers {
81 c.recv(a)?;
82 }
83
84 thread::sleep(Duration::from_secs(60 * 60 * 24 * 365));
85
86 Ok(())
87}
51fn reply(url: &str) -> Result<(), Error> {
52 let s = Socket::new(Protocol::Rep0)?;
53 s.listen(url)?;
54
55 loop {
56 println!("REPLY: WAITING FOR COMMAND");
57 let mut msg = s.recv()?;
58
59 let cmd = u64::from_le_bytes(msg[..].try_into().unwrap());
60 if cmd != DATE_REQUEST {
61 println!("REPLY: UNKNOWN COMMAND");
62 continue;
63 }
64
65 println!("REPLY: RECEIVED DATE REQUEST");
66 let rep = SystemTime::now()
67 .duration_since(SystemTime::UNIX_EPOCH)
68 .expect("current system time is before Unix epoch")
69 .as_secs();
70
71 msg.clear();
72 msg.push_back(&rep.to_le_bytes());
73
74 println!("REPLY: SENDING {}", rep);
75 s.send(msg)?;
76 }
77}
Sourcepub fn dial_async(&self, url: &str) -> Result<()>
pub fn dial_async(&self, url: &str) -> Result<()>
Asynchronously initiates a remote connection to a listener.
When the connection is closed, the underlying Dialer
will attempt to
re-establish the connection. It will also periodically retry a
connection automatically if an attempt to connect asynchronously fails.
Because the dialer is started immediately, it is generally not possible
to apply extra configuration. If that is needed, or if one wishes to
close the dialer before the socket, applications should consider using
the Dialer
type directly.
See the NNG documentation for more information.
§Errors
AddressInvalid
: An invalid url was specified.Closed
: The socket is not open.ConnectionRefused
: The remote peer refused the connection.ConnectionReset
: The remote peer reset the connection.DestUnreachable
: The remote address is not reachable.OutOfMemory
: Insufficient memory is available.PeerAuth
: Authentication or authorization failure.Protocol
: A protocol error occurred.
Sourcepub fn recv(&self) -> Result<Message>
pub fn recv(&self) -> Result<Message>
Receives a message from the socket.
The semantics of what receiving a message means vary from protocol to protocol, so examination of the protocol documentation is encouraged. For example, with a req socket a message may only be received after a request has been sent. Furthermore, some protocols may not support receiving data at all, such as pub.
§Errors
Closed
: The socket is not open.IncorrectState
: The socket cannot receive data in this state.NotSupported
: The protocol does not support receiving.OutOfMemory
: Insufficient memory is available.TimedOut
: The operation timed out.
Examples found in repository?
More examples
34fn request(url: &str) -> Result<(), Error> {
35 let s = Socket::new(Protocol::Req0)?;
36 s.dial(url)?;
37
38 println!("REQUEST: SENDING DATE REQUEST");
39 s.send(DATE_REQUEST.to_le_bytes())?;
40
41 println!("REQUEST: WAITING FOR RESPONSE");
42 let msg = s.recv()?;
43 let epoch = u64::from_le_bytes(msg[..].try_into().unwrap());
44
45 println!("REQUEST: UNIX EPOCH WAS {} SECONDS AGO", epoch);
46
47 Ok(())
48}
49
50/// Run the reply portion of the program.
51fn reply(url: &str) -> Result<(), Error> {
52 let s = Socket::new(Protocol::Rep0)?;
53 s.listen(url)?;
54
55 loop {
56 println!("REPLY: WAITING FOR COMMAND");
57 let mut msg = s.recv()?;
58
59 let cmd = u64::from_le_bytes(msg[..].try_into().unwrap());
60 if cmd != DATE_REQUEST {
61 println!("REPLY: UNKNOWN COMMAND");
62 continue;
63 }
64
65 println!("REPLY: RECEIVED DATE REQUEST");
66 let rep = SystemTime::now()
67 .duration_since(SystemTime::UNIX_EPOCH)
68 .expect("current system time is before Unix epoch")
69 .as_secs();
70
71 msg.clear();
72 msg.push_back(&rep.to_le_bytes());
73
74 println!("REPLY: SENDING {}", rep);
75 s.send(msg)?;
76 }
77}
43fn client(url: &str, ms: u64) -> Result<(), nng::Error> {
44 let s = Socket::new(Protocol::Req0)?;
45 s.dial(url)?;
46
47 let start = Instant::now();
48 s.send(ms.to_le_bytes())?;
49 s.recv()?;
50
51 let dur = Instant::now().duration_since(start);
52 let subsecs: u64 = dur.subsec_millis().into();
53 println!(
54 "Request took {} milliseconds",
55 dur.as_secs() * 1000 + subsecs
56 );
57
58 Ok(())
59}
64fn subscriber(url: &str) -> Result<(), nng::Error> {
65 let s = Socket::new(Protocol::Sub0)?;
66 s.dial(url)?;
67
68 println!("SUBSCRIBER: SUBSCRIBING TO ALL TOPICS");
69 let all_topics = vec![];
70 s.set_opt::<Subscribe>(all_topics)?;
71
72 loop {
73 let msg = s.recv()?;
74 let subs = usize::from_le_bytes(msg[..].try_into().unwrap());
75 println!("SUBSCRIBER: THERE ARE {} SUBSCRIBERS", subs);
76 }
77}
25fn node(name: &str, listen: &str, dial: &[String]) -> Result<(), Error> {
26 let s = Socket::new(Protocol::Bus0)?;
27 s.listen(listen)?;
28
29 // Give time for peers to bind.
30 thread::sleep(Duration::from_secs(1));
31 for peer in dial {
32 s.dial(peer)?;
33 }
34
35 // SEND
36 println!("{0}: SENDING \"{0}\" ONTO BUS", name);
37 s.send(name.as_bytes())?;
38
39 // RECV
40 loop {
41 let msg = s.recv()?;
42 let peer = str::from_utf8(&msg).expect("invalid UTF-8");
43
44 println!("{}: RECEIVED \"{}\" FROM BUS", name, peer);
45 }
46}
30fn surveyor(url: &str) -> Result<(), Error> {
31 let s = Socket::new(Protocol::Surveyor0)?;
32 s.listen(url)?;
33
34 loop {
35 println!("SURVEYOR: SENDING DATE SURVEY REQUEST");
36 s.send(DATE.as_bytes())?;
37
38 loop {
39 let msg = match s.recv() {
40 Ok(m) => m,
41 Err(Error::TimedOut) => break,
42 Err(e) => return Err(e),
43 };
44
45 let date = u64::from_le_bytes(msg[..].try_into().unwrap());
46 println!("SURVEYOR: RECEIVED \"{}\" SURVEY RESPONSE", date);
47 }
48
49 println!("SURVEYOR SURVEY COMPLETE");
50 }
51}
52
53/// Respondent socket.
54fn respondent(url: &str, name: &str) -> Result<(), Error> {
55 let s = Socket::new(Protocol::Respondent0)?;
56 s.dial(url)?;
57
58 loop {
59 let mut msg = s.recv()?;
60
61 let survey = str::from_utf8(&msg).expect("invalid UTF-8");
62 println!(
63 "RESPONDENT ({}): RECEIVED \"{}\" SURVEY REQUEST",
64 name, survey
65 );
66
67 // Reuse the message to avoid allocation.
68 msg.clear();
69 let date = SystemTime::now()
70 .duration_since(SystemTime::UNIX_EPOCH)
71 .expect("system time is before Unix epoch")
72 .as_secs();
73
74 msg.push_back(&date.to_le_bytes());
75
76 println!("RESPONDENT ({}): SENDING \"{}\"", name, date);
77 s.send(msg)?;
78 }
79}
Sourcepub fn send<M: Into<Message>>(&self, msg: M) -> Result<(), (Message, Error)>
pub fn send<M: Into<Message>>(&self, msg: M) -> Result<(), (Message, Error)>
Sends a message on the socket.
The semantics of what sending a message means vary from protocol to protocol, so examination of the protocol documentation is encouraged. For example, with a pub socket the data is broadcast so that any peers who have a suitable subscription will be able to receive it. Furthermore, some protocols may not support sending data (such as sub) or may require other conditions. For example, rep sockets cannot normally send data, which are responses to requests, until they have first received a request.
If the message cannot be sent, then it is returned to the caller as a
part of the Error
.
§Errors
Closed
: The socket is not open.IncorrectState
: The socket cannot send messages in this state.MessageTooLarge
: The message is too large.NotSupported
: The protocol does not support sending messages.OutOfMemory
: Insufficient memory available.TimedOut
: The operation timed out.
Examples found in repository?
More examples
34fn request(url: &str) -> Result<(), Error> {
35 let s = Socket::new(Protocol::Req0)?;
36 s.dial(url)?;
37
38 println!("REQUEST: SENDING DATE REQUEST");
39 s.send(DATE_REQUEST.to_le_bytes())?;
40
41 println!("REQUEST: WAITING FOR RESPONSE");
42 let msg = s.recv()?;
43 let epoch = u64::from_le_bytes(msg[..].try_into().unwrap());
44
45 println!("REQUEST: UNIX EPOCH WAS {} SECONDS AGO", epoch);
46
47 Ok(())
48}
49
50/// Run the reply portion of the program.
51fn reply(url: &str) -> Result<(), Error> {
52 let s = Socket::new(Protocol::Rep0)?;
53 s.listen(url)?;
54
55 loop {
56 println!("REPLY: WAITING FOR COMMAND");
57 let mut msg = s.recv()?;
58
59 let cmd = u64::from_le_bytes(msg[..].try_into().unwrap());
60 if cmd != DATE_REQUEST {
61 println!("REPLY: UNKNOWN COMMAND");
62 continue;
63 }
64
65 println!("REPLY: RECEIVED DATE REQUEST");
66 let rep = SystemTime::now()
67 .duration_since(SystemTime::UNIX_EPOCH)
68 .expect("current system time is before Unix epoch")
69 .as_secs();
70
71 msg.clear();
72 msg.push_back(&rep.to_le_bytes());
73
74 println!("REPLY: SENDING {}", rep);
75 s.send(msg)?;
76 }
77}
43fn client(url: &str, ms: u64) -> Result<(), nng::Error> {
44 let s = Socket::new(Protocol::Req0)?;
45 s.dial(url)?;
46
47 let start = Instant::now();
48 s.send(ms.to_le_bytes())?;
49 s.recv()?;
50
51 let dur = Instant::now().duration_since(start);
52 let subsecs: u64 = dur.subsec_millis().into();
53 println!(
54 "Request took {} milliseconds",
55 dur.as_secs() * 1000 + subsecs
56 );
57
58 Ok(())
59}
25fn node(name: &str, listen: &str, dial: &[String]) -> Result<(), Error> {
26 let s = Socket::new(Protocol::Bus0)?;
27 s.listen(listen)?;
28
29 // Give time for peers to bind.
30 thread::sleep(Duration::from_secs(1));
31 for peer in dial {
32 s.dial(peer)?;
33 }
34
35 // SEND
36 println!("{0}: SENDING \"{0}\" ONTO BUS", name);
37 s.send(name.as_bytes())?;
38
39 // RECV
40 loop {
41 let msg = s.recv()?;
42 let peer = str::from_utf8(&msg).expect("invalid UTF-8");
43
44 println!("{}: RECEIVED \"{}\" FROM BUS", name, peer);
45 }
46}
30fn surveyor(url: &str) -> Result<(), Error> {
31 let s = Socket::new(Protocol::Surveyor0)?;
32 s.listen(url)?;
33
34 loop {
35 println!("SURVEYOR: SENDING DATE SURVEY REQUEST");
36 s.send(DATE.as_bytes())?;
37
38 loop {
39 let msg = match s.recv() {
40 Ok(m) => m,
41 Err(Error::TimedOut) => break,
42 Err(e) => return Err(e),
43 };
44
45 let date = u64::from_le_bytes(msg[..].try_into().unwrap());
46 println!("SURVEYOR: RECEIVED \"{}\" SURVEY RESPONSE", date);
47 }
48
49 println!("SURVEYOR SURVEY COMPLETE");
50 }
51}
52
53/// Respondent socket.
54fn respondent(url: &str, name: &str) -> Result<(), Error> {
55 let s = Socket::new(Protocol::Respondent0)?;
56 s.dial(url)?;
57
58 loop {
59 let mut msg = s.recv()?;
60
61 let survey = str::from_utf8(&msg).expect("invalid UTF-8");
62 println!(
63 "RESPONDENT ({}): RECEIVED \"{}\" SURVEY REQUEST",
64 name, survey
65 );
66
67 // Reuse the message to avoid allocation.
68 msg.clear();
69 let date = SystemTime::now()
70 .duration_since(SystemTime::UNIX_EPOCH)
71 .expect("system time is before Unix epoch")
72 .as_secs();
73
74 msg.push_back(&date.to_le_bytes());
75
76 println!("RESPONDENT ({}): SENDING \"{}\"", name, date);
77 s.send(msg)?;
78 }
79}
46fn send_recv(s: &Socket, name: &str) -> Result<(), Error> {
47 s.set_opt::<RecvTimeout>(Some(Duration::from_millis(100)))?;
48 loop {
49 // Attempt to reuse the message if we can.
50 let mut msg = match s.recv() {
51 Ok(m) => {
52 let partner = str::from_utf8(&m).expect("invalid UTF-8 message");
53 println!("{}: RECEIVED \"{}\"", name, partner);
54
55 m
56 }
57
58 Err(Error::TimedOut) => Message::new(),
59
60 Err(e) => return Err(e),
61 };
62
63 thread::sleep(Duration::from_secs(1));
64
65 msg.clear();
66 write!(msg, "{}", name).expect("failed to write to message");
67
68 println!("{0}: SENDING \"{0}\"", name);
69 s.send(msg)?;
70 }
71}
Sourcepub fn try_recv(&self) -> Result<Message>
pub fn try_recv(&self) -> Result<Message>
Attempts to receives a message from the socket.
The semantics of what receiving a message means vary from protocol to protocol, so examination of the protocol documentation is encouraged. For example, with a req socket a message may only be received after a request has been sent. Furthermore, some protocols may not support receiving data at all, such as pub.
If no message is available, this function will immediately return.
§Errors
Closed
: The socket is not open.IncorrectState
: The socket cannot receive data in this state.NotSupported
: The protocol does not support receiving.OutOfMemory
: Insufficient memory is available.TryAgain
: The operation would block.
Sourcepub fn try_send<M: Into<Message>>(&self, msg: M) -> Result<(), (Message, Error)>
pub fn try_send<M: Into<Message>>(&self, msg: M) -> Result<(), (Message, Error)>
Attempts to sends a message on the socket.
The semantics of what sending a message means vary from protocol to protocol, so examination of the protocol documentation is encouraged. For example, with a pub socket the data is broadcast so that any peers who have a suitable subscription will be able to receive it. Furthermore, some protocols may not support sending data (such as sub) or may require other conditions. For example, rep sockets cannot normally send data, which are responses to requests, until they have first received a request.
If the message cannot be sent (e.g., there are no peers or there is
backpressure from the peers) then this function will return immediately.
If the message cannot be sent, then it is returned to the caller as a
part of the Error
.
§Errors
Closed
: The socket is not open.IncorrectState
: The socket cannot send messages in this state.MessageTooLarge
: The message is too large.NotSupported
: The protocol does not support sending messages.OutOfMemory
: Insufficient memory available.TryAgain
: The operation would block.
Sourcepub fn recv_async(&self, aio: &Aio) -> Result<()>
pub fn recv_async(&self, aio: &Aio) -> Result<()>
Start a receive operation using the given Aio
and return immediately.
§Errors
IncorrectState
: TheAio
already has a running operation.
Sourcepub fn send_async<M: Into<Message>>(
&self,
aio: &Aio,
msg: M,
) -> Result<(), (Message, Error)>
pub fn send_async<M: Into<Message>>( &self, aio: &Aio, msg: M, ) -> Result<(), (Message, Error)>
Start a send operation on the given Aio
and return immediately.
§Errors
IncorrectState
: TheAio
already has a running operation.
Sourcepub fn pipe_notify<F>(&self, callback: F) -> Result<()>
pub fn pipe_notify<F>(&self, callback: F) -> Result<()>
Register a callback function to be called whenever a pipe event occurs on the socket.
Only a single callback function can be supplied at a time. Registering a new callback implicitly unregisters any previously registered.
§Errors
None specified.
§Panics
If the callback function panics, the program will log the panic if possible and then abort. Future Rustc versions will likely do the same for uncaught panics at FFI boundaries, so this library will produce the abort in order to keep things consistent. As such, the user is responsible for either having a callback that never panics or catching and handling the panic within the callback.
Examples found in repository?
37fn publisher(url: &str) -> Result<(), nng::Error> {
38 let s = Socket::new(Protocol::Pub0)?;
39 let count = Arc::new(AtomicUsize::new(0));
40 let count_clone = count.clone();
41
42 s.pipe_notify(move |_, ev| {
43 match ev {
44 PipeEvent::AddPost => count_clone.fetch_add(1, Ordering::Relaxed),
45 PipeEvent::RemovePost => count_clone.fetch_sub(1, Ordering::Relaxed),
46 _ => 0,
47 };
48 })?;
49
50 s.listen(url)?;
51
52 loop {
53 // Sleep for a little bit before sending the next message.
54 thread::sleep(Duration::from_secs(3));
55
56 // Load the number of subscribers and send the value across
57 let data = count.load(Ordering::Relaxed) as u64;
58 println!("PUBLISHER: SENDING {}", data);
59 s.send(data.to_le_bytes())?;
60 }
61}
Sourcepub fn close(&self)
pub fn close(&self)
Close the underlying socket.
Messages that have been submitted for sending may be flushed or delivered depending on the transport and the linger option. Further attempts to use the socket (via this handle or any other) after this call returns will result in an error. Threads waiting for operations on the socket when this call is executed may also return with an error.
Closing the socket while data is in transmission will likely lead to loss of that data. There is no automatic linger or flush to ensure that the socket send buffers have completely transmitted. It is recommended to wait a brief period after sending data before calling this function.
This function will be called automatically when all handles have been dropped.