Struct SubscribeOptions

Source
pub struct SubscribeOptions { /* private fields */ }
Expand description

Set of options that are required by RedisStream::subscribe()

Implementations§

Source§

impl SubscribeOptions

Source

pub fn new(stream: Vec<String>) -> SubscribeOptions

Source

pub fn with_group(stream: Vec<String>, group: RedisGroup) -> SubscribeOptions

Examples found in repository?
examples/stream_consumer.rs (line 128)
67fn main() {
68    println!("Consumer example has started");
69    println!("Please enter a STREAM to listen on");
70    let stream_name = read_stdin();
71    println!("Please enter a GROUP");
72    let group_name = read_stdin();
73    println!("Please enter a CONSUMER");
74    let consumer_name = read_stdin();
75
76    let redis_address = env::var("REDIS_URL")
77        .unwrap_or("127.0.0.1:6379".to_string())
78        .parse::<SocketAddr>().expect("Couldn't parse Redis URl");
79
80    let touch_options = TouchGroupOptions::new(stream_name.clone(), group_name.clone());
81
82    // Try to create a group.
83    // If the group exists already, the future will not be set into an error.
84    // The create_group variable is the Future<Item=(), Error=()>.
85    let create_group = RedisStream::connect(&redis_address)
86        .and_then(move |con|
87            // Create group if the one does not exists yet
88            con.touch_group(touch_options))
89        .then(|_| -> RedisResult<()> { Ok(()) });
90
91    // Start the consuming after the group has been checked.
92    //
93    // Note nothing will happen if the previous future has failed.
94    // The consumer variable in a result is the Future<Item=(), Error=()>.
95    // The Item and Error are required by tokio::run().
96    let consumer = create_group
97        .and_then(move |_| {
98            // Create two connections to the Redis Server:
99            // first will be used for managing (send Acknowledge request),
100            // second will be used for receive entries from Redis Server.
101            let manager = RedisStream::connect(&redis_address);
102            let consumer = RedisStream::connect(&redis_address);
103            consumer.join(manager)
104        })
105        .and_then(move |(connection, manager)| {
106            // Create an unbounded channel to allow the consumer notifies the manager
107            // about received and unacknowledged yet entries.
108            let (tx, rx) = unbounded::<EntryId>();
109
110            // Copy of stream_name and group_name to move it into ack_entry future.
111            let stream = stream_name.clone();
112            let group = group_name.clone();
113
114            let ack_entry = rx
115                .map_err(|_|
116                    RedisError::new(RedisErrorKind::InternalError,
117                                    "Something went wrong with UnboundedChannel".to_string()))
118                // Use fold() to redirect notification from the channel receiver (rx) to the manager.
119                .fold(manager, move |manager, id_to_ack|
120                    ack_stream_entry(manager, stream.clone(), group.clone(), id_to_ack))
121                .map(|_| ())
122                .map_err(|_| ());
123
124            // Spawn the ack_entry future to be handled separately from the process_entry future.
125            tokio::spawn(ack_entry);
126
127            let group = RedisGroup::new(group_name, consumer_name);
128            let options = SubscribeOptions::with_group(vec![stream_name], group);
129
130            // Subscribe to a Redis stream, processes any incoming entries and sends
131            // entry ids of success processed entries to the manager via the channel sender (tx).
132            let process_entry =
133                connection.subscribe(options)
134                    .and_then(move |subscribe|
135                        subscribe.for_each(move |entries|
136                            process_stream_entries(tx.clone(), entries)));
137            // Return and run later the process_entry future.
138            process_entry
139        })
140        .map_err(|err| eprintln!("Something went wrong: {:?}", err));
141
142    tokio::run(consumer);
143}

Trait Implementations§

Source§

impl Clone for SubscribeOptions

Source§

fn clone(&self) -> SubscribeOptions

Returns a duplicate of the value. Read more
1.0.0 · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.