pub struct TouchGroupOptions { /* private fields */ }Expand description
Set of options that are required by RedisStream::touch_group()
Implementations§
Source§impl TouchGroupOptions
impl TouchGroupOptions
Sourcepub fn new(stream: String, group: String) -> Self
pub fn new(stream: String, group: String) -> Self
Examples found in repository?
examples/stream_producer.rs (line 75)
71fn start_producer(rx: UnboundedReceiver<Message>,
72 stream_name: String,
73 group_name: String,
74 redis_address: SocketAddr) {
75 let touch_options = TouchGroupOptions::new(stream_name.clone(), group_name.clone());
76
77 // Try to create a group.
78 // If the group exists already, the future will not be set into an error.
79 // The create_group variable is the Future<Item=(), Error=()>.
80 let create_group = RedisStream::connect(&redis_address)
81 .and_then(move |con|
82 // Create group if the one does not exists yet.
83 con.touch_group(touch_options))
84 .then(|_| -> RedisResult<()> { Ok(()) });
85
86 // Creates and holds a connection to the Redis Server, waits new messages from
87 // the channel receiver (rx) and send them to a Redis stream.
88 //
89 // Note nothing will happen if the previous future has failed.
90 // The producer variable in a result is the Future<Item=(), Error=()>.
91 // The Item and Error are required by tokio::run().
92 let producer = create_group
93 .and_then(move |_| {
94 RedisStream::connect(&redis_address)
95 })
96 .and_then(move |producer| {
97 rx
98 .map_err(|_|
99 RedisError::new(RedisErrorKind::InternalError,
100 "Something went wrong with UnboundedChannel".to_string()))
101 // Use fold() to redirect messages from the channel receiver (rx) to the Redis stream.
102 .fold(producer, move |producer, message| {
103 let options = SendEntryOptions::new(stream_name.clone());
104
105 // Serialize the message to pairs of key-value.
106 let data = message.into_redis_stream_entry();
107
108 producer
109 .send_entry(options, data)
110 .map(|(producer, added_entry_id)| {
111 println!("{:?} has sent", added_entry_id.to_string());
112 println!("Please enter a message");
113 producer
114 })
115 })
116 })
117 .map(|_| ())
118 .map_err(|err| println!("{}", err));
119
120 tokio::run(producer);
121}More examples
examples/stream_consumer.rs (line 80)
67fn main() {
68 println!("Consumer example has started");
69 println!("Please enter a STREAM to listen on");
70 let stream_name = read_stdin();
71 println!("Please enter a GROUP");
72 let group_name = read_stdin();
73 println!("Please enter a CONSUMER");
74 let consumer_name = read_stdin();
75
76 let redis_address = env::var("REDIS_URL")
77 .unwrap_or("127.0.0.1:6379".to_string())
78 .parse::<SocketAddr>().expect("Couldn't parse Redis URl");
79
80 let touch_options = TouchGroupOptions::new(stream_name.clone(), group_name.clone());
81
82 // Try to create a group.
83 // If the group exists already, the future will not be set into an error.
84 // The create_group variable is the Future<Item=(), Error=()>.
85 let create_group = RedisStream::connect(&redis_address)
86 .and_then(move |con|
87 // Create group if the one does not exists yet
88 con.touch_group(touch_options))
89 .then(|_| -> RedisResult<()> { Ok(()) });
90
91 // Start the consuming after the group has been checked.
92 //
93 // Note nothing will happen if the previous future has failed.
94 // The consumer variable in a result is the Future<Item=(), Error=()>.
95 // The Item and Error are required by tokio::run().
96 let consumer = create_group
97 .and_then(move |_| {
98 // Create two connections to the Redis Server:
99 // first will be used for managing (send Acknowledge request),
100 // second will be used for receive entries from Redis Server.
101 let manager = RedisStream::connect(&redis_address);
102 let consumer = RedisStream::connect(&redis_address);
103 consumer.join(manager)
104 })
105 .and_then(move |(connection, manager)| {
106 // Create an unbounded channel to allow the consumer notifies the manager
107 // about received and unacknowledged yet entries.
108 let (tx, rx) = unbounded::<EntryId>();
109
110 // Copy of stream_name and group_name to move it into ack_entry future.
111 let stream = stream_name.clone();
112 let group = group_name.clone();
113
114 let ack_entry = rx
115 .map_err(|_|
116 RedisError::new(RedisErrorKind::InternalError,
117 "Something went wrong with UnboundedChannel".to_string()))
118 // Use fold() to redirect notification from the channel receiver (rx) to the manager.
119 .fold(manager, move |manager, id_to_ack|
120 ack_stream_entry(manager, stream.clone(), group.clone(), id_to_ack))
121 .map(|_| ())
122 .map_err(|_| ());
123
124 // Spawn the ack_entry future to be handled separately from the process_entry future.
125 tokio::spawn(ack_entry);
126
127 let group = RedisGroup::new(group_name, consumer_name);
128 let options = SubscribeOptions::with_group(vec![stream_name], group);
129
130 // Subscribe to a Redis stream, processes any incoming entries and sends
131 // entry ids of success processed entries to the manager via the channel sender (tx).
132 let process_entry =
133 connection.subscribe(options)
134 .and_then(move |subscribe|
135 subscribe.for_each(move |entries|
136 process_stream_entries(tx.clone(), entries)));
137 // Return and run later the process_entry future.
138 process_entry
139 })
140 .map_err(|err| eprintln!("Something went wrong: {:?}", err));
141
142 tokio::run(consumer);
143}Trait Implementations§
Source§impl Clone for TouchGroupOptions
impl Clone for TouchGroupOptions
Source§fn clone(&self) -> TouchGroupOptions
fn clone(&self) -> TouchGroupOptions
Returns a duplicate of the value. Read more
1.0.0 · Source§fn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from
source. Read moreAuto Trait Implementations§
impl Freeze for TouchGroupOptions
impl RefUnwindSafe for TouchGroupOptions
impl Send for TouchGroupOptions
impl Sync for TouchGroupOptions
impl Unpin for TouchGroupOptions
impl UnwindSafe for TouchGroupOptions
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more