pub struct RedisGroup { /* private fields */ }Expand description
Pair of group name and consumer name
Implementations§
Source§impl RedisGroup
impl RedisGroup
Sourcepub fn new(group: String, consumer: String) -> RedisGroup
pub fn new(group: String, consumer: String) -> RedisGroup
Examples found in repository?
examples/stream_consumer.rs (line 127)
67fn main() {
68 println!("Consumer example has started");
69 println!("Please enter a STREAM to listen on");
70 let stream_name = read_stdin();
71 println!("Please enter a GROUP");
72 let group_name = read_stdin();
73 println!("Please enter a CONSUMER");
74 let consumer_name = read_stdin();
75
76 let redis_address = env::var("REDIS_URL")
77 .unwrap_or("127.0.0.1:6379".to_string())
78 .parse::<SocketAddr>().expect("Couldn't parse Redis URl");
79
80 let touch_options = TouchGroupOptions::new(stream_name.clone(), group_name.clone());
81
82 // Try to create a group.
83 // If the group exists already, the future will not be set into an error.
84 // The create_group variable is the Future<Item=(), Error=()>.
85 let create_group = RedisStream::connect(&redis_address)
86 .and_then(move |con|
87 // Create group if the one does not exists yet
88 con.touch_group(touch_options))
89 .then(|_| -> RedisResult<()> { Ok(()) });
90
91 // Start the consuming after the group has been checked.
92 //
93 // Note nothing will happen if the previous future has failed.
94 // The consumer variable in a result is the Future<Item=(), Error=()>.
95 // The Item and Error are required by tokio::run().
96 let consumer = create_group
97 .and_then(move |_| {
98 // Create two connections to the Redis Server:
99 // first will be used for managing (send Acknowledge request),
100 // second will be used for receive entries from Redis Server.
101 let manager = RedisStream::connect(&redis_address);
102 let consumer = RedisStream::connect(&redis_address);
103 consumer.join(manager)
104 })
105 .and_then(move |(connection, manager)| {
106 // Create an unbounded channel to allow the consumer notifies the manager
107 // about received and unacknowledged yet entries.
108 let (tx, rx) = unbounded::<EntryId>();
109
110 // Copy of stream_name and group_name to move it into ack_entry future.
111 let stream = stream_name.clone();
112 let group = group_name.clone();
113
114 let ack_entry = rx
115 .map_err(|_|
116 RedisError::new(RedisErrorKind::InternalError,
117 "Something went wrong with UnboundedChannel".to_string()))
118 // Use fold() to redirect notification from the channel receiver (rx) to the manager.
119 .fold(manager, move |manager, id_to_ack|
120 ack_stream_entry(manager, stream.clone(), group.clone(), id_to_ack))
121 .map(|_| ())
122 .map_err(|_| ());
123
124 // Spawn the ack_entry future to be handled separately from the process_entry future.
125 tokio::spawn(ack_entry);
126
127 let group = RedisGroup::new(group_name, consumer_name);
128 let options = SubscribeOptions::with_group(vec![stream_name], group);
129
130 // Subscribe to a Redis stream, processes any incoming entries and sends
131 // entry ids of success processed entries to the manager via the channel sender (tx).
132 let process_entry =
133 connection.subscribe(options)
134 .and_then(move |subscribe|
135 subscribe.for_each(move |entries|
136 process_stream_entries(tx.clone(), entries)));
137 // Return and run later the process_entry future.
138 process_entry
139 })
140 .map_err(|err| eprintln!("Something went wrong: {:?}", err));
141
142 tokio::run(consumer);
143}Trait Implementations§
Source§impl Clone for RedisGroup
impl Clone for RedisGroup
Source§fn clone(&self) -> RedisGroup
fn clone(&self) -> RedisGroup
Returns a duplicate of the value. Read more
1.0.0 · Source§fn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from
source. Read moreAuto Trait Implementations§
impl Freeze for RedisGroup
impl RefUnwindSafe for RedisGroup
impl Send for RedisGroup
impl Sync for RedisGroup
impl Unpin for RedisGroup
impl UnwindSafe for RedisGroup
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more