pub struct SubscribeOptions { /* private fields */ }Expand description
Set of options that are required by RedisStream::subscribe()
Implementations§
Source§impl SubscribeOptions
impl SubscribeOptions
pub fn new(stream: Vec<String>) -> SubscribeOptions
Sourcepub fn with_group(stream: Vec<String>, group: RedisGroup) -> SubscribeOptions
pub fn with_group(stream: Vec<String>, group: RedisGroup) -> SubscribeOptions
Examples found in repository?
examples/stream_consumer.rs (line 128)
67fn main() {
68 println!("Consumer example has started");
69 println!("Please enter a STREAM to listen on");
70 let stream_name = read_stdin();
71 println!("Please enter a GROUP");
72 let group_name = read_stdin();
73 println!("Please enter a CONSUMER");
74 let consumer_name = read_stdin();
75
76 let redis_address = env::var("REDIS_URL")
77 .unwrap_or("127.0.0.1:6379".to_string())
78 .parse::<SocketAddr>().expect("Couldn't parse Redis URl");
79
80 let touch_options = TouchGroupOptions::new(stream_name.clone(), group_name.clone());
81
82 // Try to create a group.
83 // If the group exists already, the future will not be set into an error.
84 // The create_group variable is the Future<Item=(), Error=()>.
85 let create_group = RedisStream::connect(&redis_address)
86 .and_then(move |con|
87 // Create group if the one does not exists yet
88 con.touch_group(touch_options))
89 .then(|_| -> RedisResult<()> { Ok(()) });
90
91 // Start the consuming after the group has been checked.
92 //
93 // Note nothing will happen if the previous future has failed.
94 // The consumer variable in a result is the Future<Item=(), Error=()>.
95 // The Item and Error are required by tokio::run().
96 let consumer = create_group
97 .and_then(move |_| {
98 // Create two connections to the Redis Server:
99 // first will be used for managing (send Acknowledge request),
100 // second will be used for receive entries from Redis Server.
101 let manager = RedisStream::connect(&redis_address);
102 let consumer = RedisStream::connect(&redis_address);
103 consumer.join(manager)
104 })
105 .and_then(move |(connection, manager)| {
106 // Create an unbounded channel to allow the consumer notifies the manager
107 // about received and unacknowledged yet entries.
108 let (tx, rx) = unbounded::<EntryId>();
109
110 // Copy of stream_name and group_name to move it into ack_entry future.
111 let stream = stream_name.clone();
112 let group = group_name.clone();
113
114 let ack_entry = rx
115 .map_err(|_|
116 RedisError::new(RedisErrorKind::InternalError,
117 "Something went wrong with UnboundedChannel".to_string()))
118 // Use fold() to redirect notification from the channel receiver (rx) to the manager.
119 .fold(manager, move |manager, id_to_ack|
120 ack_stream_entry(manager, stream.clone(), group.clone(), id_to_ack))
121 .map(|_| ())
122 .map_err(|_| ());
123
124 // Spawn the ack_entry future to be handled separately from the process_entry future.
125 tokio::spawn(ack_entry);
126
127 let group = RedisGroup::new(group_name, consumer_name);
128 let options = SubscribeOptions::with_group(vec![stream_name], group);
129
130 // Subscribe to a Redis stream, processes any incoming entries and sends
131 // entry ids of success processed entries to the manager via the channel sender (tx).
132 let process_entry =
133 connection.subscribe(options)
134 .and_then(move |subscribe|
135 subscribe.for_each(move |entries|
136 process_stream_entries(tx.clone(), entries)));
137 // Return and run later the process_entry future.
138 process_entry
139 })
140 .map_err(|err| eprintln!("Something went wrong: {:?}", err));
141
142 tokio::run(consumer);
143}Trait Implementations§
Source§impl Clone for SubscribeOptions
impl Clone for SubscribeOptions
Source§fn clone(&self) -> SubscribeOptions
fn clone(&self) -> SubscribeOptions
Returns a duplicate of the value. Read more
1.0.0 · Source§fn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from
source. Read moreAuto Trait Implementations§
impl Freeze for SubscribeOptions
impl RefUnwindSafe for SubscribeOptions
impl Send for SubscribeOptions
impl Sync for SubscribeOptions
impl Unpin for SubscribeOptions
impl UnwindSafe for SubscribeOptions
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more