sub/
sub.rs

1extern crate futures;
2extern crate tokio_core;
3extern crate nsqueue;
4
5use futures::{Stream, Future};
6use tokio_core::reactor::Core;
7use nsqueue::config::*;
8use nsqueue::consumer::*;
9
10fn main() {
11     let mut core = Core::new().unwrap();
12     let handle = core.handle();
13     let addr = "127.0.0.1:4150".parse().unwrap();
14
15     core.run(
16         Consumer::connect(&addr, &handle, Config::default())
17         .and_then(|conn| {
18            conn.subscribe("some_topic".into(), "some_channel".into())
19            .and_then(move |response| {
20                let ret = response.for_each(move |message| {
21                    if message.message_id == "_heartbeat_" {
22                        conn.nop();
23                    } else {
24                        println!("Response {:?} {:?}", message.message_id, message.message_body);
25                        conn.fin(message.message_id); // Inform NSQ (Message consumed)
26                    }
27                    Ok(())
28                });
29                ret
30            })
31         })
32     ).unwrap();
33}