pub struct EntryId(/* private fields */);
Implementations§
Source§impl EntryId
impl EntryId
pub fn new(ms: u64, id: u64) -> EntryId
Source§pub fn to_string(&self) -> String
pub fn to_string(&self) -> String
Examples found in repository?
examples/stream_consumer.rs (line 163)
156fn ack_stream_entry(manager: RedisStream, stream: String, group: String, id_to_ack: EntryId)
157 -> impl Future<Item=RedisStream, Error=RedisError> {
158 let options = AckOptions::new(stream.clone(), group.clone(), id_to_ack.clone());
159
160 manager.ack_entry(options)
161 .map(move |(manager, response)| {
162 match response {
163 AckResponse::Ok => println!("{:?} is acknowledged", id_to_ack.to_string()),
164 AckResponse::NotExists =>
165 eprintln!("Couldn't acknowledge {:?}", id_to_ack.to_string())
166 };
167 manager
168 })
169}
170
171fn process_stream_entries(acknowledger: UnboundedSender<EntryId>, entries: Vec<StreamEntry>)
172 -> RedisResult<()> {
173 entries.into_iter()
174 .for_each(move |entry| {
175 let message =
176 Message::from_redis_stream_entry(&entry.values);
177 match message {
178 Ok(message) =>
179 println!("Received message(ID={:?}): {:?}", entry.id.to_string(), message),
180 Err(err) => {
181 eprintln!("{}", err);
182 // do not acknowledge the message
183 return;
184 }
185 }
186
187 // Notifies the manager about the received and processed entry.
188 let future = acknowledger.clone()
189 .send(entry.id)
190 .map(|_| ())
191 .map_err(|_| ());
192 tokio::spawn(future);
193 });
194 Ok(())
195}
More examples
examples/stream_producer.rs (line 111)
71fn start_producer(rx: UnboundedReceiver<Message>,
72 stream_name: String,
73 group_name: String,
74 redis_address: SocketAddr) {
75 let touch_options = TouchGroupOptions::new(stream_name.clone(), group_name.clone());
76
77 // Try to create a group.
78 // If the group already exists, the future will not resolve to an error.
79 // The create_group variable is the Future<Item=(), Error=()>.
80 let create_group = RedisStream::connect(&redis_address)
81 .and_then(move |con|
82 // Create the group if it does not exist yet.
83 con.touch_group(touch_options))
84 .then(|_| -> RedisResult<()> { Ok(()) });
85
86 // Creates and holds a connection to the Redis Server, waits for new messages from
87 // the channel receiver (rx) and sends them to a Redis stream.
88 //
89 // Note nothing will happen if the previous future has failed.
90 // The resulting producer variable is a Future<Item=(), Error=()>.
91 // The Item and Error are required by tokio::run().
92 let producer = create_group
93 .and_then(move |_| {
94 RedisStream::connect(&redis_address)
95 })
96 .and_then(move |producer| {
97 rx
98 .map_err(|_|
99 RedisError::new(RedisErrorKind::InternalError,
100 "Something went wrong with UnboundedChannel".to_string()))
101 // Use fold() to redirect messages from the channel receiver (rx) to the Redis stream.
102 .fold(producer, move |producer, message| {
103 let options = SendEntryOptions::new(stream_name.clone());
104
105 // Serialize the message to pairs of key-value.
106 let data = message.into_redis_stream_entry();
107
108 producer
109 .send_entry(options, data)
110 .map(|(producer, added_entry_id)| {
111 println!("{:?} has sent", added_entry_id.to_string());
112 println!("Please enter a message");
113 producer
114 })
115 })
116 })
117 .map(|_| ())
118 .map_err(|err| println!("{}", err));
119
120 tokio::run(producer);
121}
Trait Implementations§
Source§impl PartialOrd for EntryId
impl PartialOrd for EntryId
impl StructuralPartialEq for EntryId
Auto Trait Implementations§
impl Freeze for EntryId
impl RefUnwindSafe for EntryId
impl Send for EntryId
impl Sync for EntryId
impl Unpin for EntryId
impl UnwindSafe for EntryId
Blanket Implementations§
Source§impl<T> BorrowMut<T> for T where
T: ?Sized,
impl<T> BorrowMut<T> for T where
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more