stream_producer/
stream_producer.rs1extern crate tokio;
2extern crate futures;
3
4use std::env;
5use std::io;
6use std::io::BufReader;
7use std::thread;
8use std::net::SocketAddr;
9use std::collections::HashMap;
10use futures::{Future, Stream, Sink};
11use futures::sync::mpsc::{UnboundedReceiver, unbounded};
12
13use redis_asio::{RedisResult, RedisError, RedisErrorKind, IntoRedisArgument, RedisArgument};
14use redis_asio::stream::{RedisStream, TouchGroupOptions, SendEntryOptions};
15
16struct Message(String);
17
18impl IntoRedisArgument for Message {
20 fn into_redis_argument(self) -> RedisArgument {
21 RedisArgument::String(self.0)
22 }
23}
24
25impl Message {
26 fn into_redis_stream_entry(self) -> HashMap<String, RedisArgument> {
27 let mut result = HashMap::new();
28 result.insert("type".to_string(), "Message".into_redis_argument());
29 result.insert("data".to_string(), self.into_redis_argument());
30 result
31 }
32}
33
34fn main() {
35 println!("Producer example has started");
36 println!("Please enter a STREAM to write to it");
37 let stream_name = read_stdin();
38 println!("Please enter a GROUP (is used only to create if that does not exist)");
39 let group_name = read_stdin();
40
41 let redis_address = env::var("REDIS_URL")
42 .unwrap_or("127.0.0.1:6379".to_string())
43 .parse::<SocketAddr>().expect("Couldn't parse Redis URl");
44
45 let (tx, rx) = unbounded::<Message>();
48 let child = thread::spawn(move ||
49 start_producer(rx, stream_name, group_name, redis_address));
51
52 println!("Please enter a message");
53 let stdin = tokio::io::lines(BufReader::new(tokio::io::stdin()));
54 let stdin = stdin
55 .map_err(|err| eprintln!("Cannot read from stdin: {}", err))
56 .for_each(move |line| {
57 tx.clone()
59 .send(Message(line)).map(|_| ())
60 .map_err(|err| eprintln!("Cannot read from stdin: {}", err))
61 });
62
63 tokio::run(stdin);
64 child.join().expect("Expect joined thread");
67}
68
69fn start_producer(rx: UnboundedReceiver<Message>,
72 stream_name: String,
73 group_name: String,
74 redis_address: SocketAddr) {
75 let touch_options = TouchGroupOptions::new(stream_name.clone(), group_name.clone());
76
77 let create_group = RedisStream::connect(&redis_address)
81 .and_then(move |con|
82 con.touch_group(touch_options))
84 .then(|_| -> RedisResult<()> { Ok(()) });
85
86 let producer = create_group
93 .and_then(move |_| {
94 RedisStream::connect(&redis_address)
95 })
96 .and_then(move |producer| {
97 rx
98 .map_err(|_|
99 RedisError::new(RedisErrorKind::InternalError,
100 "Something went wrong with UnboundedChannel".to_string()))
101 .fold(producer, move |producer, message| {
103 let options = SendEntryOptions::new(stream_name.clone());
104
105 let data = message.into_redis_stream_entry();
107
108 producer
109 .send_entry(options, data)
110 .map(|(producer, added_entry_id)| {
111 println!("{:?} has sent", added_entry_id.to_string());
112 println!("Please enter a message");
113 producer
114 })
115 })
116 })
117 .map(|_| ())
118 .map_err(|err| println!("{}", err));
119
120 tokio::run(producer);
121}
122
123fn read_stdin() -> String {
124 let mut value = String::new();
125 io::stdin().read_line(&mut value).expect("Expect a valid string");
126 if value.ends_with("\n") {
127 value.pop().expect("Expect no empty string");
128 }
129
130 assert!(!value.is_empty(), "Expect no empty string");
131 value
132}