Struct EntryId

Source
pub struct EntryId(/* private fields */);

Implementations§

Source§

impl EntryId

Source

pub fn new(ms: u64, id: u64) -> EntryId

Source

pub fn to_string(&self) -> String

Examples found in repository?
examples/stream_consumer.rs (line 163)
156fn ack_stream_entry(manager: RedisStream, stream: String, group: String, id_to_ack: EntryId)
157                    -> impl Future<Item=RedisStream, Error=RedisError> {
158    let options = AckOptions::new(stream.clone(), group.clone(), id_to_ack.clone());
159
160    manager.ack_entry(options)
161        .map(move |(manager, response)| {
162            match response {
163                AckResponse::Ok => println!("{:?} is acknowledged", id_to_ack.to_string()),
164                AckResponse::NotExists =>
165                    eprintln!("Couldn't acknowledge {:?}", id_to_ack.to_string())
166            };
167            manager
168        })
169}
170
171fn process_stream_entries(acknowledger: UnboundedSender<EntryId>, entries: Vec<StreamEntry>)
172                          -> RedisResult<()> {
173    entries.into_iter()
174        .for_each(move |entry| {
175            let message =
176                Message::from_redis_stream_entry(&entry.values);
177            match message {
178                Ok(message) =>
179                    println!("Received message(ID={:?}): {:?}", entry.id.to_string(), message),
180                Err(err) => {
181                    eprintln!("{}", err);
182                    // do not acknowledge the message
183                    return;
184                }
185            }
186
187            // Notifies the manager about the received and processed entry.
188            let future = acknowledger.clone()
189                .send(entry.id)
190                .map(|_| ())
191                .map_err(|_| ());
192            tokio::spawn(future);
193        });
194    Ok(())
195}
More examples
Hide additional examples
examples/stream_producer.rs (line 111)
71fn start_producer(rx: UnboundedReceiver<Message>,
72                  stream_name: String,
73                  group_name: String,
74                  redis_address: SocketAddr) {
75    let touch_options = TouchGroupOptions::new(stream_name.clone(), group_name.clone());
76
77    // Try to create a group.
78    // If the group already exists, the future will not resolve to an error.
79    // The create_group variable is the Future<Item=(), Error=()>.
80    let create_group = RedisStream::connect(&redis_address)
81        .and_then(move |con|
82            // Create the group if it does not exist yet.
83            con.touch_group(touch_options))
84        .then(|_| -> RedisResult<()> { Ok(()) });
85
86    // Creates and holds a connection to the Redis server, waits for new messages from
87    // the channel receiver (rx) and sends them to a Redis stream.
88    //
89    // Note nothing will happen if the previous future has failed.
90    // The resulting producer variable is a Future<Item=(), Error=()>.
91    // The Item and Error are required by tokio::run().
92    let producer = create_group
93        .and_then(move |_| {
94            RedisStream::connect(&redis_address)
95        })
96        .and_then(move |producer| {
97            rx
98                .map_err(|_|
99                    RedisError::new(RedisErrorKind::InternalError,
100                                    "Something went wrong with UnboundedChannel".to_string()))
101                // Use fold() to redirect messages from the channel receiver (rx) to the Redis stream.
102                .fold(producer, move |producer, message| {
103                    let options = SendEntryOptions::new(stream_name.clone());
104
105                    // Serialize the message to pairs of key-value.
106                    let data = message.into_redis_stream_entry();
107
108                    producer
109                        .send_entry(options, data)
110                        .map(|(producer, added_entry_id)| {
111                            println!("{:?} has sent", added_entry_id.to_string());
112                            println!("Please enter a message");
113                            producer
114                        })
115                })
116        })
117        .map(|_| ())
118        .map_err(|err| println!("{}", err));
119
120    tokio::run(producer);
121}

Trait Implementations§

Source§

impl Clone for EntryId

Source§

fn clone(&self) -> EntryId

Returns a duplicate of the value. Read more
1.0.0 · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more
Source§

impl Debug for EntryId

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more
Source§

impl PartialEq for EntryId

Source§

fn eq(&self, other: &EntryId) -> bool

Tests for self and other values to be equal, and is used by ==.
1.0.0 · Source§

fn ne(&self, other: &Rhs) -> bool

Tests for !=. The default implementation is almost always sufficient, and should not be overridden without very good reason.
Source§

impl PartialOrd for EntryId

Source§

fn partial_cmp(&self, other: &EntryId) -> Option<Ordering>

This method returns an ordering between self and other values if one exists. Read more
1.0.0 · Source§

fn lt(&self, other: &Rhs) -> bool

Tests less than (for self and other) and is used by the < operator. Read more
1.0.0 · Source§

fn le(&self, other: &Rhs) -> bool

Tests less than or equal to (for self and other) and is used by the <= operator. Read more
1.0.0 · Source§

fn gt(&self, other: &Rhs) -> bool

Tests greater than (for self and other) and is used by the > operator. Read more
1.0.0 · Source§

fn ge(&self, other: &Rhs) -> bool

Tests greater than or equal to (for self and other) and is used by the >= operator. Read more
Source§

impl StructuralPartialEq for EntryId

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.