Struct Socket

Source
pub struct Socket { /* private fields */ }
Expand description

An NNG socket.

All communication between the application and remote Scalability Protocol peers is done through sockets. A given socket can have multiple dialers, listeners, and pipes, and may be connected to multiple transports at the same time. However, a given socket will have exactly one protocol associated with it, and that protocol is responsible for any state machines or other application-specific logic.

See the NNG documentation for more information.

Implementations§

Source§

impl Socket

Source

pub fn new(t: Protocol) -> Result<Socket>

Creates a new socket which uses the specified protocol.

§Errors
Examples found in repository?
examples/pair.rs (line 31)
30fn node0(url: &str) -> Result<(), Error> {
31    let s = Socket::new(Protocol::Pair0)?;
32    s.listen(url)?;
33
34    send_recv(&s, "NODE0")
35}
36
37/// The dialing node.
38fn node1(url: &str) -> Result<(), Error> {
39    let s = Socket::new(Protocol::Pair0)?;
40    s.dial(url)?;
41
42    send_recv(&s, "NODE1")
43}
More examples
Hide additional examples
examples/pipeline.rs (line 29)
28fn pull(url: &str) -> Result<(), Error> {
29    let s = Socket::new(Protocol::Pull0)?;
30    s.listen(url)?;
31
32    loop {
33        let msg = s.recv()?;
34        let arg = str::from_utf8(&msg).expect("message has invalid UTF-8");
35
36        println!("PULL: RECEIVED \"{}\"", arg);
37    }
38}
39
40/// Push socket.
41fn push(url: &str, arg: &str) -> Result<(), Error> {
42    let s = Socket::new(Protocol::Push0)?;
43    s.dial(url)?;
44
45    println!("PUSH: SENDING \"{}\"", arg);
46    s.send(arg.as_bytes())?;
47
48    // Wait for messages to flush before shutting down.
49    thread::sleep(Duration::from_secs(1));
50    Ok(())
51}
examples/reqrep.rs (line 35)
34fn request(url: &str) -> Result<(), Error> {
35    let s = Socket::new(Protocol::Req0)?;
36    s.dial(url)?;
37
38    println!("REQUEST: SENDING DATE REQUEST");
39    s.send(DATE_REQUEST.to_le_bytes())?;
40
41    println!("REQUEST: WAITING FOR RESPONSE");
42    let msg = s.recv()?;
43    let epoch = u64::from_le_bytes(msg[..].try_into().unwrap());
44
45    println!("REQUEST: UNIX EPOCH WAS {} SECONDS AGO", epoch);
46
47    Ok(())
48}
49
50/// Run the reply portion of the program.
51fn reply(url: &str) -> Result<(), Error> {
52    let s = Socket::new(Protocol::Rep0)?;
53    s.listen(url)?;
54
55    loop {
56        println!("REPLY: WAITING FOR COMMAND");
57        let mut msg = s.recv()?;
58
59        let cmd = u64::from_le_bytes(msg[..].try_into().unwrap());
60        if cmd != DATE_REQUEST {
61            println!("REPLY: UNKNOWN COMMAND");
62            continue;
63        }
64
65        println!("REPLY: RECEIVED DATE REQUEST");
66        let rep = SystemTime::now()
67            .duration_since(SystemTime::UNIX_EPOCH)
68            .expect("current system time is before Unix epoch")
69            .as_secs();
70
71        msg.clear();
72        msg.push_back(&rep.to_le_bytes());
73
74        println!("REPLY: SENDING {}", rep);
75        s.send(msg)?;
76    }
77}
examples/async.rs (line 44)
43fn client(url: &str, ms: u64) -> Result<(), nng::Error> {
44    let s = Socket::new(Protocol::Req0)?;
45    s.dial(url)?;
46
47    let start = Instant::now();
48    s.send(ms.to_le_bytes())?;
49    s.recv()?;
50
51    let dur = Instant::now().duration_since(start);
52    let subsecs: u64 = dur.subsec_millis().into();
53    println!(
54        "Request took {} milliseconds",
55        dur.as_secs() * 1000 + subsecs
56    );
57
58    Ok(())
59}
60
61/// Run the server portion of the program.
62fn server(url: &str) -> Result<(), nng::Error> {
63    // Create the socket
64    let s = Socket::new(Protocol::Rep0)?;
65
66    // Create all of the worker contexts
67    let workers: Vec<_> = (0..PARALLEL)
68        .map(|_| {
69            let ctx = Context::new(&s)?;
70            let ctx_clone = ctx.clone();
71            let aio = Aio::new(move |aio, res| worker_callback(aio, &ctx_clone, res))?;
72            Ok((aio, ctx))
73        })
74        .collect::<Result<_, nng::Error>>()?;
75
76    // Only after we have the workers do we start listening.
77    s.listen(url)?;
78
79    // Now start all of the workers listening.
80    for (a, c) in &workers {
81        c.recv(a)?;
82    }
83
84    thread::sleep(Duration::from_secs(60 * 60 * 24 * 365));
85
86    Ok(())
87}
examples/bus.rs (line 26)
25fn node(name: &str, listen: &str, dial: &[String]) -> Result<(), Error> {
26    let s = Socket::new(Protocol::Bus0)?;
27    s.listen(listen)?;
28
29    // Give time for peers to bind.
30    thread::sleep(Duration::from_secs(1));
31    for peer in dial {
32        s.dial(peer)?;
33    }
34
35    // SEND
36    println!("{0}: SENDING \"{0}\" ONTO BUS", name);
37    s.send(name.as_bytes())?;
38
39    // RECV
40    loop {
41        let msg = s.recv()?;
42        let peer = str::from_utf8(&msg).expect("invalid UTF-8");
43
44        println!("{}: RECEIVED \"{}\" FROM BUS", name, peer);
45    }
46}
examples/survey.rs (line 31)
30fn surveyor(url: &str) -> Result<(), Error> {
31    let s = Socket::new(Protocol::Surveyor0)?;
32    s.listen(url)?;
33
34    loop {
35        println!("SURVEYOR: SENDING DATE SURVEY REQUEST");
36        s.send(DATE.as_bytes())?;
37
38        loop {
39            let msg = match s.recv() {
40                Ok(m) => m,
41                Err(Error::TimedOut) => break,
42                Err(e) => return Err(e),
43            };
44
45            let date = u64::from_le_bytes(msg[..].try_into().unwrap());
46            println!("SURVEYOR: RECEIVED \"{}\" SURVEY RESPONSE", date);
47        }
48
49        println!("SURVEYOR SURVEY COMPLETE");
50    }
51}
52
53/// Respondent socket.
54fn respondent(url: &str, name: &str) -> Result<(), Error> {
55    let s = Socket::new(Protocol::Respondent0)?;
56    s.dial(url)?;
57
58    loop {
59        let mut msg = s.recv()?;
60
61        let survey = str::from_utf8(&msg).expect("invalid UTF-8");
62        println!(
63            "RESPONDENT ({}): RECEIVED \"{}\" SURVEY REQUEST",
64            name, survey
65        );
66
67        // Reuse the message to avoid allocation.
68        msg.clear();
69        let date = SystemTime::now()
70            .duration_since(SystemTime::UNIX_EPOCH)
71            .expect("system time is before Unix epoch")
72            .as_secs();
73
74        msg.push_back(&date.to_le_bytes());
75
76        println!("RESPONDENT ({}): SENDING \"{}\"", name, date);
77        s.send(msg)?;
78    }
79}
Source

pub fn dial(&self, url: &str) -> Result<()>

Initiates a remote connection to a listener.

When the connection is closed, the underlying Dialer will attempt to re-establish the connection.

The first attempt to connect to the address indicated by the provided url is done synchronously, including any necessary name resolution. As a result, a failure, such as if the connection is refused, will be returned immediately and no further action will be taken.

If a synchronously dialed connection is later closed, the dialer will still attempt to redial asynchronously.

Because the dialer is started immediately, it is generally not possible to apply extra configuration. If that is needed, or if one wishes to close the dialer before the socket, applications should consider using the Dialer type directly.

See the NNG documentation for more information.

§Errors
Examples found in repository?
examples/pair.rs (line 40)
38fn node1(url: &str) -> Result<(), Error> {
39    let s = Socket::new(Protocol::Pair0)?;
40    s.dial(url)?;
41
42    send_recv(&s, "NODE1")
43}
More examples
Hide additional examples
examples/pipeline.rs (line 43)
41fn push(url: &str, arg: &str) -> Result<(), Error> {
42    let s = Socket::new(Protocol::Push0)?;
43    s.dial(url)?;
44
45    println!("PUSH: SENDING \"{}\"", arg);
46    s.send(arg.as_bytes())?;
47
48    // Wait for messages to flush before shutting down.
49    thread::sleep(Duration::from_secs(1));
50    Ok(())
51}
examples/reqrep.rs (line 36)
34fn request(url: &str) -> Result<(), Error> {
35    let s = Socket::new(Protocol::Req0)?;
36    s.dial(url)?;
37
38    println!("REQUEST: SENDING DATE REQUEST");
39    s.send(DATE_REQUEST.to_le_bytes())?;
40
41    println!("REQUEST: WAITING FOR RESPONSE");
42    let msg = s.recv()?;
43    let epoch = u64::from_le_bytes(msg[..].try_into().unwrap());
44
45    println!("REQUEST: UNIX EPOCH WAS {} SECONDS AGO", epoch);
46
47    Ok(())
48}
examples/async.rs (line 45)
43fn client(url: &str, ms: u64) -> Result<(), nng::Error> {
44    let s = Socket::new(Protocol::Req0)?;
45    s.dial(url)?;
46
47    let start = Instant::now();
48    s.send(ms.to_le_bytes())?;
49    s.recv()?;
50
51    let dur = Instant::now().duration_since(start);
52    let subsecs: u64 = dur.subsec_millis().into();
53    println!(
54        "Request took {} milliseconds",
55        dur.as_secs() * 1000 + subsecs
56    );
57
58    Ok(())
59}
examples/pubsub.rs (line 66)
64fn subscriber(url: &str) -> Result<(), nng::Error> {
65    let s = Socket::new(Protocol::Sub0)?;
66    s.dial(url)?;
67
68    println!("SUBSCRIBER: SUBSCRIBING TO ALL TOPICS");
69    let all_topics = vec![];
70    s.set_opt::<Subscribe>(all_topics)?;
71
72    loop {
73        let msg = s.recv()?;
74        let subs = usize::from_le_bytes(msg[..].try_into().unwrap());
75        println!("SUBSCRIBER: THERE ARE {} SUBSCRIBERS", subs);
76    }
77}
examples/bus.rs (line 32)
25fn node(name: &str, listen: &str, dial: &[String]) -> Result<(), Error> {
26    let s = Socket::new(Protocol::Bus0)?;
27    s.listen(listen)?;
28
29    // Give time for peers to bind.
30    thread::sleep(Duration::from_secs(1));
31    for peer in dial {
32        s.dial(peer)?;
33    }
34
35    // SEND
36    println!("{0}: SENDING \"{0}\" ONTO BUS", name);
37    s.send(name.as_bytes())?;
38
39    // RECV
40    loop {
41        let msg = s.recv()?;
42        let peer = str::from_utf8(&msg).expect("invalid UTF-8");
43
44        println!("{}: RECEIVED \"{}\" FROM BUS", name, peer);
45    }
46}
Source

pub fn listen(&self, url: &str) -> Result<()>

Initiates and starts a listener on the specified address.

Listeners are used to accept connections initiated by remote dialers. Unlike a dialer, listeners generally can have many connections open concurrently.

The act of “binding” to the address indicated by url is done synchronously, including any necessary name resolution. As a result, a failure, such as if the address is already in use, will be returned immediately.

Because the listener is started immediately, it is generally not possible to apply extra configuration. If that is needed, or if one wishes to close the listener before the socket, applications should consider using the Listener type directly.

See the NNG documentation for more information.

§Errors
Examples found in repository?
examples/pair.rs (line 32)
30fn node0(url: &str) -> Result<(), Error> {
31    let s = Socket::new(Protocol::Pair0)?;
32    s.listen(url)?;
33
34    send_recv(&s, "NODE0")
35}
More examples
Hide additional examples
examples/pipeline.rs (line 30)
28fn pull(url: &str) -> Result<(), Error> {
29    let s = Socket::new(Protocol::Pull0)?;
30    s.listen(url)?;
31
32    loop {
33        let msg = s.recv()?;
34        let arg = str::from_utf8(&msg).expect("message has invalid UTF-8");
35
36        println!("PULL: RECEIVED \"{}\"", arg);
37    }
38}
examples/bus.rs (line 27)
25fn node(name: &str, listen: &str, dial: &[String]) -> Result<(), Error> {
26    let s = Socket::new(Protocol::Bus0)?;
27    s.listen(listen)?;
28
29    // Give time for peers to bind.
30    thread::sleep(Duration::from_secs(1));
31    for peer in dial {
32        s.dial(peer)?;
33    }
34
35    // SEND
36    println!("{0}: SENDING \"{0}\" ONTO BUS", name);
37    s.send(name.as_bytes())?;
38
39    // RECV
40    loop {
41        let msg = s.recv()?;
42        let peer = str::from_utf8(&msg).expect("invalid UTF-8");
43
44        println!("{}: RECEIVED \"{}\" FROM BUS", name, peer);
45    }
46}
examples/survey.rs (line 32)
30fn surveyor(url: &str) -> Result<(), Error> {
31    let s = Socket::new(Protocol::Surveyor0)?;
32    s.listen(url)?;
33
34    loop {
35        println!("SURVEYOR: SENDING DATE SURVEY REQUEST");
36        s.send(DATE.as_bytes())?;
37
38        loop {
39            let msg = match s.recv() {
40                Ok(m) => m,
41                Err(Error::TimedOut) => break,
42                Err(e) => return Err(e),
43            };
44
45            let date = u64::from_le_bytes(msg[..].try_into().unwrap());
46            println!("SURVEYOR: RECEIVED \"{}\" SURVEY RESPONSE", date);
47        }
48
49        println!("SURVEYOR SURVEY COMPLETE");
50    }
51}
examples/async.rs (line 77)
62fn server(url: &str) -> Result<(), nng::Error> {
63    // Create the socket
64    let s = Socket::new(Protocol::Rep0)?;
65
66    // Create all of the worker contexts
67    let workers: Vec<_> = (0..PARALLEL)
68        .map(|_| {
69            let ctx = Context::new(&s)?;
70            let ctx_clone = ctx.clone();
71            let aio = Aio::new(move |aio, res| worker_callback(aio, &ctx_clone, res))?;
72            Ok((aio, ctx))
73        })
74        .collect::<Result<_, nng::Error>>()?;
75
76    // Only after we have the workers do we start listening.
77    s.listen(url)?;
78
79    // Now start all of the workers listening.
80    for (a, c) in &workers {
81        c.recv(a)?;
82    }
83
84    thread::sleep(Duration::from_secs(60 * 60 * 24 * 365));
85
86    Ok(())
87}
examples/reqrep.rs (line 53)
51fn reply(url: &str) -> Result<(), Error> {
52    let s = Socket::new(Protocol::Rep0)?;
53    s.listen(url)?;
54
55    loop {
56        println!("REPLY: WAITING FOR COMMAND");
57        let mut msg = s.recv()?;
58
59        let cmd = u64::from_le_bytes(msg[..].try_into().unwrap());
60        if cmd != DATE_REQUEST {
61            println!("REPLY: UNKNOWN COMMAND");
62            continue;
63        }
64
65        println!("REPLY: RECEIVED DATE REQUEST");
66        let rep = SystemTime::now()
67            .duration_since(SystemTime::UNIX_EPOCH)
68            .expect("current system time is before Unix epoch")
69            .as_secs();
70
71        msg.clear();
72        msg.push_back(&rep.to_le_bytes());
73
74        println!("REPLY: SENDING {}", rep);
75        s.send(msg)?;
76    }
77}
Source

pub fn dial_async(&self, url: &str) -> Result<()>

Asynchronously initiates a remote connection to a listener.

When the connection is closed, the underlying Dialer will attempt to re-establish the connection. It will also periodically retry a connection automatically if an attempt to connect asynchronously fails.

Because the dialer is started immediately, it is generally not possible to apply extra configuration. If that is needed, or if one wishes to close the dialer before the socket, applications should consider using the Dialer type directly.

See the NNG documentation for more information.

§Errors
Source

pub fn recv(&self) -> Result<Message>

Receives a message from the socket.

The semantics of what receiving a message means vary from protocol to protocol, so examination of the protocol documentation is encouraged. For example, with a req socket a message may only be received after a request has been sent. Furthermore, some protocols may not support receiving data at all, such as pub.

§Errors
Examples found in repository?
examples/pipeline.rs (line 33)
28fn pull(url: &str) -> Result<(), Error> {
29    let s = Socket::new(Protocol::Pull0)?;
30    s.listen(url)?;
31
32    loop {
33        let msg = s.recv()?;
34        let arg = str::from_utf8(&msg).expect("message has invalid UTF-8");
35
36        println!("PULL: RECEIVED \"{}\"", arg);
37    }
38}
More examples
Hide additional examples
examples/reqrep.rs (line 42)
34fn request(url: &str) -> Result<(), Error> {
35    let s = Socket::new(Protocol::Req0)?;
36    s.dial(url)?;
37
38    println!("REQUEST: SENDING DATE REQUEST");
39    s.send(DATE_REQUEST.to_le_bytes())?;
40
41    println!("REQUEST: WAITING FOR RESPONSE");
42    let msg = s.recv()?;
43    let epoch = u64::from_le_bytes(msg[..].try_into().unwrap());
44
45    println!("REQUEST: UNIX EPOCH WAS {} SECONDS AGO", epoch);
46
47    Ok(())
48}
49
50/// Run the reply portion of the program.
51fn reply(url: &str) -> Result<(), Error> {
52    let s = Socket::new(Protocol::Rep0)?;
53    s.listen(url)?;
54
55    loop {
56        println!("REPLY: WAITING FOR COMMAND");
57        let mut msg = s.recv()?;
58
59        let cmd = u64::from_le_bytes(msg[..].try_into().unwrap());
60        if cmd != DATE_REQUEST {
61            println!("REPLY: UNKNOWN COMMAND");
62            continue;
63        }
64
65        println!("REPLY: RECEIVED DATE REQUEST");
66        let rep = SystemTime::now()
67            .duration_since(SystemTime::UNIX_EPOCH)
68            .expect("current system time is before Unix epoch")
69            .as_secs();
70
71        msg.clear();
72        msg.push_back(&rep.to_le_bytes());
73
74        println!("REPLY: SENDING {}", rep);
75        s.send(msg)?;
76    }
77}
examples/async.rs (line 49)
43fn client(url: &str, ms: u64) -> Result<(), nng::Error> {
44    let s = Socket::new(Protocol::Req0)?;
45    s.dial(url)?;
46
47    let start = Instant::now();
48    s.send(ms.to_le_bytes())?;
49    s.recv()?;
50
51    let dur = Instant::now().duration_since(start);
52    let subsecs: u64 = dur.subsec_millis().into();
53    println!(
54        "Request took {} milliseconds",
55        dur.as_secs() * 1000 + subsecs
56    );
57
58    Ok(())
59}
examples/pubsub.rs (line 73)
64fn subscriber(url: &str) -> Result<(), nng::Error> {
65    let s = Socket::new(Protocol::Sub0)?;
66    s.dial(url)?;
67
68    println!("SUBSCRIBER: SUBSCRIBING TO ALL TOPICS");
69    let all_topics = vec![];
70    s.set_opt::<Subscribe>(all_topics)?;
71
72    loop {
73        let msg = s.recv()?;
74        let subs = usize::from_le_bytes(msg[..].try_into().unwrap());
75        println!("SUBSCRIBER: THERE ARE {} SUBSCRIBERS", subs);
76    }
77}
examples/bus.rs (line 41)
25fn node(name: &str, listen: &str, dial: &[String]) -> Result<(), Error> {
26    let s = Socket::new(Protocol::Bus0)?;
27    s.listen(listen)?;
28
29    // Give time for peers to bind.
30    thread::sleep(Duration::from_secs(1));
31    for peer in dial {
32        s.dial(peer)?;
33    }
34
35    // SEND
36    println!("{0}: SENDING \"{0}\" ONTO BUS", name);
37    s.send(name.as_bytes())?;
38
39    // RECV
40    loop {
41        let msg = s.recv()?;
42        let peer = str::from_utf8(&msg).expect("invalid UTF-8");
43
44        println!("{}: RECEIVED \"{}\" FROM BUS", name, peer);
45    }
46}
examples/survey.rs (line 39)
30fn surveyor(url: &str) -> Result<(), Error> {
31    let s = Socket::new(Protocol::Surveyor0)?;
32    s.listen(url)?;
33
34    loop {
35        println!("SURVEYOR: SENDING DATE SURVEY REQUEST");
36        s.send(DATE.as_bytes())?;
37
38        loop {
39            let msg = match s.recv() {
40                Ok(m) => m,
41                Err(Error::TimedOut) => break,
42                Err(e) => return Err(e),
43            };
44
45            let date = u64::from_le_bytes(msg[..].try_into().unwrap());
46            println!("SURVEYOR: RECEIVED \"{}\" SURVEY RESPONSE", date);
47        }
48
49        println!("SURVEYOR SURVEY COMPLETE");
50    }
51}
52
53/// Respondent socket.
54fn respondent(url: &str, name: &str) -> Result<(), Error> {
55    let s = Socket::new(Protocol::Respondent0)?;
56    s.dial(url)?;
57
58    loop {
59        let mut msg = s.recv()?;
60
61        let survey = str::from_utf8(&msg).expect("invalid UTF-8");
62        println!(
63            "RESPONDENT ({}): RECEIVED \"{}\" SURVEY REQUEST",
64            name, survey
65        );
66
67        // Reuse the message to avoid allocation.
68        msg.clear();
69        let date = SystemTime::now()
70            .duration_since(SystemTime::UNIX_EPOCH)
71            .expect("system time is before Unix epoch")
72            .as_secs();
73
74        msg.push_back(&date.to_le_bytes());
75
76        println!("RESPONDENT ({}): SENDING \"{}\"", name, date);
77        s.send(msg)?;
78    }
79}
Source

pub fn send<M: Into<Message>>(&self, msg: M) -> Result<(), (Message, Error)>

Sends a message on the socket.

The semantics of what sending a message means vary from protocol to protocol, so examination of the protocol documentation is encouraged. For example, with a pub socket the data is broadcast so that any peers who have a suitable subscription will be able to receive it. Furthermore, some protocols may not support sending data (such as sub) or may require other conditions. For example, rep sockets cannot normally send data, which are responses to requests, until they have first received a request.

If the message cannot be sent, then it is returned to the caller as a part of the Error.

§Errors
Examples found in repository?
examples/pipeline.rs (line 46)
41fn push(url: &str, arg: &str) -> Result<(), Error> {
42    let s = Socket::new(Protocol::Push0)?;
43    s.dial(url)?;
44
45    println!("PUSH: SENDING \"{}\"", arg);
46    s.send(arg.as_bytes())?;
47
48    // Wait for messages to flush before shutting down.
49    thread::sleep(Duration::from_secs(1));
50    Ok(())
51}
More examples
Hide additional examples
examples/reqrep.rs (line 39)
34fn request(url: &str) -> Result<(), Error> {
35    let s = Socket::new(Protocol::Req0)?;
36    s.dial(url)?;
37
38    println!("REQUEST: SENDING DATE REQUEST");
39    s.send(DATE_REQUEST.to_le_bytes())?;
40
41    println!("REQUEST: WAITING FOR RESPONSE");
42    let msg = s.recv()?;
43    let epoch = u64::from_le_bytes(msg[..].try_into().unwrap());
44
45    println!("REQUEST: UNIX EPOCH WAS {} SECONDS AGO", epoch);
46
47    Ok(())
48}
49
50/// Run the reply portion of the program.
51fn reply(url: &str) -> Result<(), Error> {
52    let s = Socket::new(Protocol::Rep0)?;
53    s.listen(url)?;
54
55    loop {
56        println!("REPLY: WAITING FOR COMMAND");
57        let mut msg = s.recv()?;
58
59        let cmd = u64::from_le_bytes(msg[..].try_into().unwrap());
60        if cmd != DATE_REQUEST {
61            println!("REPLY: UNKNOWN COMMAND");
62            continue;
63        }
64
65        println!("REPLY: RECEIVED DATE REQUEST");
66        let rep = SystemTime::now()
67            .duration_since(SystemTime::UNIX_EPOCH)
68            .expect("current system time is before Unix epoch")
69            .as_secs();
70
71        msg.clear();
72        msg.push_back(&rep.to_le_bytes());
73
74        println!("REPLY: SENDING {}", rep);
75        s.send(msg)?;
76    }
77}
examples/async.rs (line 48)
43fn client(url: &str, ms: u64) -> Result<(), nng::Error> {
44    let s = Socket::new(Protocol::Req0)?;
45    s.dial(url)?;
46
47    let start = Instant::now();
48    s.send(ms.to_le_bytes())?;
49    s.recv()?;
50
51    let dur = Instant::now().duration_since(start);
52    let subsecs: u64 = dur.subsec_millis().into();
53    println!(
54        "Request took {} milliseconds",
55        dur.as_secs() * 1000 + subsecs
56    );
57
58    Ok(())
59}
examples/bus.rs (line 37)
25fn node(name: &str, listen: &str, dial: &[String]) -> Result<(), Error> {
26    let s = Socket::new(Protocol::Bus0)?;
27    s.listen(listen)?;
28
29    // Give time for peers to bind.
30    thread::sleep(Duration::from_secs(1));
31    for peer in dial {
32        s.dial(peer)?;
33    }
34
35    // SEND
36    println!("{0}: SENDING \"{0}\" ONTO BUS", name);
37    s.send(name.as_bytes())?;
38
39    // RECV
40    loop {
41        let msg = s.recv()?;
42        let peer = str::from_utf8(&msg).expect("invalid UTF-8");
43
44        println!("{}: RECEIVED \"{}\" FROM BUS", name, peer);
45    }
46}
examples/survey.rs (line 36)
30fn surveyor(url: &str) -> Result<(), Error> {
31    let s = Socket::new(Protocol::Surveyor0)?;
32    s.listen(url)?;
33
34    loop {
35        println!("SURVEYOR: SENDING DATE SURVEY REQUEST");
36        s.send(DATE.as_bytes())?;
37
38        loop {
39            let msg = match s.recv() {
40                Ok(m) => m,
41                Err(Error::TimedOut) => break,
42                Err(e) => return Err(e),
43            };
44
45            let date = u64::from_le_bytes(msg[..].try_into().unwrap());
46            println!("SURVEYOR: RECEIVED \"{}\" SURVEY RESPONSE", date);
47        }
48
49        println!("SURVEYOR SURVEY COMPLETE");
50    }
51}
52
53/// Respondent socket.
54fn respondent(url: &str, name: &str) -> Result<(), Error> {
55    let s = Socket::new(Protocol::Respondent0)?;
56    s.dial(url)?;
57
58    loop {
59        let mut msg = s.recv()?;
60
61        let survey = str::from_utf8(&msg).expect("invalid UTF-8");
62        println!(
63            "RESPONDENT ({}): RECEIVED \"{}\" SURVEY REQUEST",
64            name, survey
65        );
66
67        // Reuse the message to avoid allocation.
68        msg.clear();
69        let date = SystemTime::now()
70            .duration_since(SystemTime::UNIX_EPOCH)
71            .expect("system time is before Unix epoch")
72            .as_secs();
73
74        msg.push_back(&date.to_le_bytes());
75
76        println!("RESPONDENT ({}): SENDING \"{}\"", name, date);
77        s.send(msg)?;
78    }
79}
examples/pair.rs (line 69)
46fn send_recv(s: &Socket, name: &str) -> Result<(), Error> {
47    s.set_opt::<RecvTimeout>(Some(Duration::from_millis(100)))?;
48    loop {
49        // Attempt to reuse the message if we can.
50        let mut msg = match s.recv() {
51            Ok(m) => {
52                let partner = str::from_utf8(&m).expect("invalid UTF-8 message");
53                println!("{}: RECEIVED \"{}\"", name, partner);
54
55                m
56            }
57
58            Err(Error::TimedOut) => Message::new(),
59
60            Err(e) => return Err(e),
61        };
62
63        thread::sleep(Duration::from_secs(1));
64
65        msg.clear();
66        write!(msg, "{}", name).expect("failed to write to message");
67
68        println!("{0}: SENDING \"{0}\"", name);
69        s.send(msg)?;
70    }
71}
Source

pub fn try_recv(&self) -> Result<Message>

Attempts to receive a message from the socket.

The semantics of what receiving a message means vary from protocol to protocol, so examination of the protocol documentation is encouraged. For example, with a req socket a message may only be received after a request has been sent. Furthermore, some protocols may not support receiving data at all, such as pub.

If no message is available, this function will immediately return.

§Errors
Source

pub fn try_send<M: Into<Message>>(&self, msg: M) -> Result<(), (Message, Error)>

Attempts to send a message on the socket.

The semantics of what sending a message means vary from protocol to protocol, so examination of the protocol documentation is encouraged. For example, with a pub socket the data is broadcast so that any peers who have a suitable subscription will be able to receive it. Furthermore, some protocols may not support sending data (such as sub) or may require other conditions. For example, rep sockets cannot normally send data, which are responses to requests, until they have first received a request.

If the message cannot be sent (e.g., there are no peers or there is backpressure from the peers) then this function will return immediately. If the message cannot be sent, then it is returned to the caller as a part of the Error.

§Errors
Source

pub fn recv_async(&self, aio: &Aio) -> Result<()>

Start a receive operation using the given Aio and return immediately.

§Errors
Source

pub fn send_async<M: Into<Message>>( &self, aio: &Aio, msg: M, ) -> Result<(), (Message, Error)>

Start a send operation on the given Aio and return immediately.

§Errors
Source

pub fn pipe_notify<F>(&self, callback: F) -> Result<()>
where F: Fn(Pipe, PipeEvent) + Send + Sync + 'static,

Register a callback function to be called whenever a pipe event occurs on the socket.

Only a single callback function can be supplied at a time. Registering a new callback implicitly unregisters any previously registered callback.

§Errors

None specified.

§Panics

If the callback function panics, the program will log the panic if possible and then abort. Future Rustc versions will likely do the same for uncaught panics at FFI boundaries, so this library will produce the abort in order to keep things consistent. As such, the user is responsible for either having a callback that never panics or catching and handling the panic within the callback.

Examples found in repository?
examples/pubsub.rs (lines 42-48)
37fn publisher(url: &str) -> Result<(), nng::Error> {
38    let s = Socket::new(Protocol::Pub0)?;
39    let count = Arc::new(AtomicUsize::new(0));
40    let count_clone = count.clone();
41
42    s.pipe_notify(move |_, ev| {
43        match ev {
44            PipeEvent::AddPost => count_clone.fetch_add(1, Ordering::Relaxed),
45            PipeEvent::RemovePost => count_clone.fetch_sub(1, Ordering::Relaxed),
46            _ => 0,
47        };
48    })?;
49
50    s.listen(url)?;
51
52    loop {
53        // Sleep for a little bit before sending the next message.
54        thread::sleep(Duration::from_secs(3));
55
56        // Load the number of subscribers and send the value across
57        let data = count.load(Ordering::Relaxed) as u64;
58        println!("PUBLISHER: SENDING {}", data);
59        s.send(data.to_le_bytes())?;
60    }
61}
Source

pub fn close(&self)

Close the underlying socket.

Messages that have been submitted for sending may be flushed or delivered depending on the transport and the linger option. Further attempts to use the socket (via this handle or any other) after this call returns will result in an error. Threads waiting for operations on the socket when this call is executed may also return with an error.

Closing the socket while data is in transmission will likely lead to loss of that data. There is no automatic linger or flush to ensure that the socket send buffers have completely transmitted. It is recommended to wait a brief period after sending data before calling this function.

This function will be called automatically when all handles have been dropped.

Trait Implementations§

Source§

impl Clone for Socket

Source§

fn clone(&self) -> Socket

Returns a duplicate of the value. Read more
1.0.0 · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more
Source§

impl Debug for Socket

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more
Source§

impl Hash for Socket

Source§

fn hash<H: Hasher>(&self, state: &mut H)

Feeds this value into the given Hasher. Read more
1.3.0 · Source§

fn hash_slice<H>(data: &[Self], state: &mut H)
where H: Hasher, Self: Sized,

Feeds a slice of this type into the given Hasher. Read more
Source§

impl Ord for Socket

Source§

fn cmp(&self, other: &Socket) -> Ordering

This method returns an Ordering between self and other. Read more
1.21.0 · Source§

fn max(self, other: Self) -> Self
where Self: Sized,

Compares and returns the maximum of two values. Read more
1.21.0 · Source§

fn min(self, other: Self) -> Self
where Self: Sized,

Compares and returns the minimum of two values. Read more
1.50.0 · Source§

fn clamp(self, min: Self, max: Self) -> Self
where Self: Sized,

Restrict a value to a certain interval. Read more
Source§

impl PartialEq for Socket

Source§

fn eq(&self, other: &Socket) -> bool

Tests for self and other values to be equal, and is used by ==.
1.0.0 · Source§

fn ne(&self, other: &Rhs) -> bool

Tests for !=. The default implementation is almost always sufficient, and should not be overridden without very good reason.
Source§

impl PartialOrd for Socket

Source§

fn partial_cmp(&self, other: &Socket) -> Option<Ordering>

This method returns an ordering between self and other values if one exists. Read more
1.0.0 · Source§

fn lt(&self, other: &Rhs) -> bool

Tests less than (for self and other) and is used by the < operator. Read more
1.0.0 · Source§

fn le(&self, other: &Rhs) -> bool

Tests less than or equal to (for self and other) and is used by the <= operator. Read more
1.0.0 · Source§

fn gt(&self, other: &Rhs) -> bool

Tests greater than (for self and other) and is used by the > operator. Read more
1.0.0 · Source§

fn ge(&self, other: &Rhs) -> bool

Tests greater than or equal to (for self and other) and is used by the >= operator. Read more
Source§

impl TryFrom<Socket> for RawSocket

Source§

type Error = CookedSocketError

The type returned in the event of a conversion error.
Source§

fn try_from(socket: Socket) -> Result<Self, Self::Error>

Performs the conversion.
Source§

impl Eq for Socket

Source§

impl GetOpt<MaxTtl> for Socket

Source§

impl GetOpt<Raw> for Socket

Source§

impl GetOpt<RecvBufferSize> for Socket

Source§

impl GetOpt<RecvFd> for Socket

Source§

impl GetOpt<RecvTimeout> for Socket

Source§

impl GetOpt<ResendTime> for Socket

Source§

impl GetOpt<SendBufferSize> for Socket

Source§

impl GetOpt<SendFd> for Socket

Source§

impl GetOpt<SendTimeout> for Socket

Source§

impl GetOpt<SocketName> for Socket

Source§

impl GetOpt<SurveyTime> for Socket

Source§

impl SetOpt<CaFile> for Socket

Source§

impl SetOpt<CertKeyFile> for Socket

Source§

impl SetOpt<KeepAlive> for Socket

Source§

impl SetOpt<MaxTtl> for Socket

Source§

impl SetOpt<NoDelay> for Socket

Source§

impl SetOpt<ReconnectMaxTime> for Socket

Source§

impl SetOpt<ReconnectMinTime> for Socket

Source§

impl SetOpt<RecvBufferSize> for Socket

Source§

impl SetOpt<RecvMaxSize> for Socket

Source§

impl SetOpt<RecvTimeout> for Socket

Source§

impl SetOpt<RequestHeaders> for Socket

Source§

impl SetOpt<ResendTime> for Socket

Source§

impl SetOpt<ResponseHeaders> for Socket

Source§

impl SetOpt<SendBufferSize> for Socket

Source§

impl SetOpt<SendTimeout> for Socket

Source§

impl SetOpt<SocketName> for Socket

Source§

impl SetOpt<Subscribe> for Socket

Source§

impl SetOpt<SurveyTime> for Socket

Source§

impl SetOpt<Unsubscribe> for Socket

Auto Trait Implementations§

§

impl Freeze for Socket

§

impl RefUnwindSafe for Socket

§

impl Send for Socket

§

impl Sync for Socket

§

impl Unpin for Socket

§

impl UnwindSafe for Socket

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬 This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> Options for T
where T: HasOpts,

Source§

fn get_opt<T: OptOps>(&self) -> Result<T::OptType>
where Self: GetOpt<T>,

Reads the specified option from the object.
Source§

fn set_opt<T: OptOps>(&self, val: T::OptType) -> Result<()>
where Self: SetOpt<T>,

Writes the specified option to the object.
Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.