pipeline/
pipeline.rs

1//! Pipeline (one-way pipe) example.
2//!
3//! This pattern is useful for solving producer/consumer problems, including load-balancing.
4//! Messages flow from the push side to the pull side. If multiple peers are connected, the pattern
5//! attempts to distribute them fairly.
6//!
7//! This example was derived from [this NNG example][1].
8//!
9//! [1]: https://nanomsg.org/gettingstarted/nng/pipeline.html
10use nng::{Error, Protocol, Socket};
11use std::{env, process, str, thread, time::Duration};
12
13/// Entry point of the application.
14pub fn main() -> Result<(), Error> {
15    let args: Vec<_> = env::args().take(4).collect();
16
17    match &args[..] {
18        [_, t, url] if t == "pull" => pull(url),
19        [_, t, url, arg] if t == "push" => push(url, arg),
20        _ => {
21            println!("Usage: pipeline pull|push <URL> <ARG> ...");
22            process::exit(1);
23        }
24    }
25}
26
27/// Pull socket.
28fn pull(url: &str) -> Result<(), Error> {
29    let s = Socket::new(Protocol::Pull0)?;
30    s.listen(url)?;
31
32    loop {
33        let msg = s.recv()?;
34        let arg = str::from_utf8(&msg).expect("message has invalid UTF-8");
35
36        println!("PULL: RECEIVED \"{}\"", arg);
37    }
38}
39
40/// Push socket.
41fn push(url: &str, arg: &str) -> Result<(), Error> {
42    let s = Socket::new(Protocol::Push0)?;
43    s.dial(url)?;
44
45    println!("PUSH: SENDING \"{}\"", arg);
46    s.send(arg.as_bytes())?;
47
48    // Wait for messages to flush before shutting down.
49    thread::sleep(Duration::from_secs(1));
50    Ok(())
51}