stream_producer/
stream_producer.rs

1extern crate tokio;
2extern crate futures;
3
4use std::env;
5use std::io;
6use std::io::BufReader;
7use std::thread;
8use std::net::SocketAddr;
9use std::collections::HashMap;
10use futures::{Future, Stream, Sink};
11use futures::sync::mpsc::{UnboundedReceiver, unbounded};
12
13use redis_asio::{RedisResult, RedisError, RedisErrorKind, IntoRedisArgument, RedisArgument};
14use redis_asio::stream::{RedisStream, TouchGroupOptions, SendEntryOptions};
15
16struct Message(String);
17
18/// Implements the trait to allow use the structure as a RedisArgument within RedisCommand::arg().
19impl IntoRedisArgument for Message {
20    fn into_redis_argument(self) -> RedisArgument {
21        RedisArgument::String(self.0)
22    }
23}
24
25impl Message {
26    fn into_redis_stream_entry(self) -> HashMap<String, RedisArgument> {
27        let mut result = HashMap::new();
28        result.insert("type".to_string(), "Message".into_redis_argument());
29        result.insert("data".to_string(), self.into_redis_argument());
30        result
31    }
32}
33
34fn main() {
35    println!("Producer example has started");
36    println!("Please enter a STREAM to write to it");
37    let stream_name = read_stdin();
38    println!("Please enter a GROUP (is used only to create if that does not exist)");
39    let group_name = read_stdin();
40
41    let redis_address = env::var("REDIS_URL")
42        .unwrap_or("127.0.0.1:6379".to_string())
43        .parse::<SocketAddr>().expect("Couldn't parse Redis URl");
44
45    // Create an unbounded channel to allow the main thread notifies a child-network thread
46    // of the need to send a Message to a Redis stream.
47    let (tx, rx) = unbounded::<Message>();
48    let child = thread::spawn(move ||
49        // Spawn a child-network thread and run the producer
50        start_producer(rx, stream_name, group_name, redis_address));
51
52    println!("Please enter a message");
53    let stdin = tokio::io::lines(BufReader::new(tokio::io::stdin()));
54    let stdin = stdin
55        .map_err(|err| eprintln!("Cannot read from stdin: {}", err))
56        .for_each(move |line| {
57            // Redirect the stdin stream to the channel sender (tx).
58            tx.clone()
59                .send(Message(line)).map(|_| ())
60                .map_err(|err| eprintln!("Cannot read from stdin: {}", err))
61        });
62
63    tokio::run(stdin);
64    // Wait the child thread to complete (it will happen because if we are here the tx is closed,
65    // therefore rx listening will finish with an error).
66    child.join().expect("Expect joined thread");
67}
68
69/// Creates and holds a connection to the Redis Server, waits new messages from
70/// the channel receiver (rx) and send them to a Redis stream.
71fn start_producer(rx: UnboundedReceiver<Message>,
72                  stream_name: String,
73                  group_name: String,
74                  redis_address: SocketAddr) {
75    let touch_options = TouchGroupOptions::new(stream_name.clone(), group_name.clone());
76
77    // Try to create a group.
78    // If the group exists already, the future will not be set into an error.
79    // The create_group variable is the Future<Item=(), Error=()>.
80    let create_group = RedisStream::connect(&redis_address)
81        .and_then(move |con|
82            // Create group if the one does not exists yet.
83            con.touch_group(touch_options))
84        .then(|_| -> RedisResult<()> { Ok(()) });
85
86    // Creates and holds a connection to the Redis Server, waits new messages from
87    // the channel receiver (rx) and send them to a Redis stream.
88    //
89    // Note nothing will happen if the previous future has failed.
90    // The producer variable in a result is the Future<Item=(), Error=()>.
91    // The Item and Error are required by tokio::run().
92    let producer = create_group
93        .and_then(move |_| {
94            RedisStream::connect(&redis_address)
95        })
96        .and_then(move |producer| {
97            rx
98                .map_err(|_|
99                    RedisError::new(RedisErrorKind::InternalError,
100                                    "Something went wrong with UnboundedChannel".to_string()))
101                // Use fold() to redirect messages from the channel receiver (rx) to the Redis stream.
102                .fold(producer, move |producer, message| {
103                    let options = SendEntryOptions::new(stream_name.clone());
104
105                    // Serialize the message to pairs of key-value.
106                    let data = message.into_redis_stream_entry();
107
108                    producer
109                        .send_entry(options, data)
110                        .map(|(producer, added_entry_id)| {
111                            println!("{:?} has sent", added_entry_id.to_string());
112                            println!("Please enter a message");
113                            producer
114                        })
115                })
116        })
117        .map(|_| ())
118        .map_err(|err| println!("{}", err));
119
120    tokio::run(producer);
121}
122
123fn read_stdin() -> String {
124    let mut value = String::new();
125    io::stdin().read_line(&mut value).expect("Expect a valid string");
126    if value.ends_with("\n") {
127        value.pop().expect("Expect no empty string");
128    }
129
130    assert!(!value.is_empty(), "Expect no empty string");
131    value
132}