pub struct RedisStream { /* private fields */ }Expand description
The structure represents a Redis connection that provides interface for working with Redis Stream “https://redis.io/topics/streams-intro”.
The structure wraps an actual RedisCoreConnection,
converts RedisValue into and from considered structures that are easier
to use in Redis Stream context.
See more examples in examples directory.
Implementations§
Source§impl RedisStream
impl RedisStream
Sourcepub fn connect(
addr: &SocketAddr,
) -> impl Future<Item = RedisStream, Error = RedisError> + Send + 'static
pub fn connect( addr: &SocketAddr, ) -> impl Future<Item = RedisStream, Error = RedisError> + Send + 'static
Open a connection to Redis server and wrap it into RedisStream,
that will be available in the future.
Examples found in repository?
71fn start_producer(rx: UnboundedReceiver<Message>,
72 stream_name: String,
73 group_name: String,
74 redis_address: SocketAddr) {
75 let touch_options = TouchGroupOptions::new(stream_name.clone(), group_name.clone());
76
77 // Try to create a group.
78 // If the group exists already, the future will not be set into an error.
79 // The create_group variable is the Future<Item=(), Error=()>.
80 let create_group = RedisStream::connect(&redis_address)
81 .and_then(move |con|
82 // Create group if the one does not exists yet.
83 con.touch_group(touch_options))
84 .then(|_| -> RedisResult<()> { Ok(()) });
85
86 // Creates and holds a connection to the Redis Server, waits new messages from
87 // the channel receiver (rx) and send them to a Redis stream.
88 //
89 // Note nothing will happen if the previous future has failed.
90 // The producer variable in a result is the Future<Item=(), Error=()>.
91 // The Item and Error are required by tokio::run().
92 let producer = create_group
93 .and_then(move |_| {
94 RedisStream::connect(&redis_address)
95 })
96 .and_then(move |producer| {
97 rx
98 .map_err(|_|
99 RedisError::new(RedisErrorKind::InternalError,
100 "Something went wrong with UnboundedChannel".to_string()))
101 // Use fold() to redirect messages from the channel receiver (rx) to the Redis stream.
102 .fold(producer, move |producer, message| {
103 let options = SendEntryOptions::new(stream_name.clone());
104
105 // Serialize the message to pairs of key-value.
106 let data = message.into_redis_stream_entry();
107
108 producer
109 .send_entry(options, data)
110 .map(|(producer, added_entry_id)| {
111 println!("{:?} has sent", added_entry_id.to_string());
112 println!("Please enter a message");
113 producer
114 })
115 })
116 })
117 .map(|_| ())
118 .map_err(|err| println!("{}", err));
119
120 tokio::run(producer);
121}More examples
67fn main() {
68 println!("Consumer example has started");
69 println!("Please enter a STREAM to listen on");
70 let stream_name = read_stdin();
71 println!("Please enter a GROUP");
72 let group_name = read_stdin();
73 println!("Please enter a CONSUMER");
74 let consumer_name = read_stdin();
75
76 let redis_address = env::var("REDIS_URL")
77 .unwrap_or("127.0.0.1:6379".to_string())
78 .parse::<SocketAddr>().expect("Couldn't parse Redis URl");
79
80 let touch_options = TouchGroupOptions::new(stream_name.clone(), group_name.clone());
81
82 // Try to create a group.
83 // If the group exists already, the future will not be set into an error.
84 // The create_group variable is the Future<Item=(), Error=()>.
85 let create_group = RedisStream::connect(&redis_address)
86 .and_then(move |con|
87 // Create group if the one does not exists yet
88 con.touch_group(touch_options))
89 .then(|_| -> RedisResult<()> { Ok(()) });
90
91 // Start the consuming after the group has been checked.
92 //
93 // Note nothing will happen if the previous future has failed.
94 // The consumer variable in a result is the Future<Item=(), Error=()>.
95 // The Item and Error are required by tokio::run().
96 let consumer = create_group
97 .and_then(move |_| {
98 // Create two connections to the Redis Server:
99 // first will be used for managing (send Acknowledge request),
100 // second will be used for receive entries from Redis Server.
101 let manager = RedisStream::connect(&redis_address);
102 let consumer = RedisStream::connect(&redis_address);
103 consumer.join(manager)
104 })
105 .and_then(move |(connection, manager)| {
106 // Create an unbounded channel to allow the consumer notifies the manager
107 // about received and unacknowledged yet entries.
108 let (tx, rx) = unbounded::<EntryId>();
109
110 // Copy of stream_name and group_name to move it into ack_entry future.
111 let stream = stream_name.clone();
112 let group = group_name.clone();
113
114 let ack_entry = rx
115 .map_err(|_|
116 RedisError::new(RedisErrorKind::InternalError,
117 "Something went wrong with UnboundedChannel".to_string()))
118 // Use fold() to redirect notification from the channel receiver (rx) to the manager.
119 .fold(manager, move |manager, id_to_ack|
120 ack_stream_entry(manager, stream.clone(), group.clone(), id_to_ack))
121 .map(|_| ())
122 .map_err(|_| ());
123
124 // Spawn the ack_entry future to be handled separately from the process_entry future.
125 tokio::spawn(ack_entry);
126
127 let group = RedisGroup::new(group_name, consumer_name);
128 let options = SubscribeOptions::with_group(vec![stream_name], group);
129
130 // Subscribe to a Redis stream, processes any incoming entries and sends
131 // entry ids of success processed entries to the manager via the channel sender (tx).
132 let process_entry =
133 connection.subscribe(options)
134 .and_then(move |subscribe|
135 subscribe.for_each(move |entries|
136 process_stream_entries(tx.clone(), entries)));
137 // Return and run later the process_entry future.
138 process_entry
139 })
140 .map_err(|err| eprintln!("Something went wrong: {:?}", err));
141
142 tokio::run(consumer);
143}Sourcepub fn send_entry<T>(
self,
options: SendEntryOptions,
key_values: HashMap<String, T>,
) -> impl Future<Item = (RedisStream, EntryId), Error = RedisError> + Send + 'staticwhere
T: IntoRedisArgument,
pub fn send_entry<T>(
self,
options: SendEntryOptions,
key_values: HashMap<String, T>,
) -> impl Future<Item = (RedisStream, EntryId), Error = RedisError> + Send + 'staticwhere
T: IntoRedisArgument,
Send an entry that will be constructed by options and pairs of key-values.
§Example
use std::net::SocketAddr;
use std::collections::HashMap;
use futures::Future;
use redis_asio::{RedisArgument, IntoRedisArgument};
use redis_asio::stream::{RedisStream, SendEntryOptions, EntryId};
let address = &"127.0.0.1:6379".parse::<SocketAddr>().unwrap();
let send_options = SendEntryOptions::new("mystream".to_string());
let mut request: HashMap<String, RedisArgument> = HashMap::new();
request.insert("type".to_string(), 3i32.into_redis_argument());
request.insert("data".to_string(), "Hello, world!".into_redis_argument());
let future = RedisStream::connect(address)
.and_then(move |stream: RedisStream| {
// HashMap<String, RedisArgument> satisfies the
// HashMap<String, ToRedisArgument>
stream.send_entry(send_options, request)
})
.map(|(_, inserted_entry_id): (RedisStream, EntryId)| {
println!("{:?} has sent", inserted_entry_id.to_string());
})
.map_err(|err| eprintln!("something went wrong: {}", err));
tokio::run(future);Examples found in repository?
71fn start_producer(rx: UnboundedReceiver<Message>,
72 stream_name: String,
73 group_name: String,
74 redis_address: SocketAddr) {
75 let touch_options = TouchGroupOptions::new(stream_name.clone(), group_name.clone());
76
77 // Try to create a group.
78 // If the group exists already, the future will not be set into an error.
79 // The create_group variable is the Future<Item=(), Error=()>.
80 let create_group = RedisStream::connect(&redis_address)
81 .and_then(move |con|
82 // Create group if the one does not exists yet.
83 con.touch_group(touch_options))
84 .then(|_| -> RedisResult<()> { Ok(()) });
85
86 // Creates and holds a connection to the Redis Server, waits new messages from
87 // the channel receiver (rx) and send them to a Redis stream.
88 //
89 // Note nothing will happen if the previous future has failed.
90 // The producer variable in a result is the Future<Item=(), Error=()>.
91 // The Item and Error are required by tokio::run().
92 let producer = create_group
93 .and_then(move |_| {
94 RedisStream::connect(&redis_address)
95 })
96 .and_then(move |producer| {
97 rx
98 .map_err(|_|
99 RedisError::new(RedisErrorKind::InternalError,
100 "Something went wrong with UnboundedChannel".to_string()))
101 // Use fold() to redirect messages from the channel receiver (rx) to the Redis stream.
102 .fold(producer, move |producer, message| {
103 let options = SendEntryOptions::new(stream_name.clone());
104
105 // Serialize the message to pairs of key-value.
106 let data = message.into_redis_stream_entry();
107
108 producer
109 .send_entry(options, data)
110 .map(|(producer, added_entry_id)| {
111 println!("{:?} has sent", added_entry_id.to_string());
112 println!("Please enter a message");
113 producer
114 })
115 })
116 })
117 .map(|_| ())
118 .map_err(|err| println!("{}", err));
119
120 tokio::run(producer);
121}Sourcepub fn read_explicit(
self,
options: ReadExplicitOptions,
) -> impl Future<Item = (RedisStream, Vec<StreamEntry>), Error = RedisError> + Send + 'static
pub fn read_explicit( self, options: ReadExplicitOptions, ) -> impl Future<Item = (RedisStream, Vec<StreamEntry>), Error = RedisError> + Send + 'static
Read entries with IDs greater than specified start_id.
§Example
use std::net::SocketAddr;
use std::collections::HashMap;
use futures::Future;
use redis_asio::stream::{RedisStream, ReadExplicitOptions, EntryId,
StreamEntry};
let address = &"127.0.0.1:6379".parse::<SocketAddr>().unwrap();
// start_id = "0-0" means get any entries
let mut read_options =
ReadExplicitOptions::new("stream1".to_string(), EntryId::new(0, 0), 10);
read_options.add_stream("stream2".to_string(), EntryId::new(0, 0));
let future = RedisStream::connect(address)
.and_then(move |stream: RedisStream| {
stream.read_explicit(read_options)
})
.map(|(_, entries): (RedisStream, Vec<StreamEntry>)| {
for entry in entries.into_iter() {
println!("Received: {:?}", entry);
}
})
.map_err(|err| eprintln!("something went wrong: {}", err));
tokio::run(future);Sourcepub fn range(
self,
options: RangeOptions,
) -> impl Future<Item = (RedisStream, Vec<RangeEntry>), Error = RedisError> + Send + 'static
pub fn range( self, options: RangeOptions, ) -> impl Future<Item = (RedisStream, Vec<RangeEntry>), Error = RedisError> + Send + 'static
Get entries in specified range.
§Example
use std::net::SocketAddr;
use futures::Future;
use redis_asio::stream::{RedisStream, RangeOptions, RangeType, RangeEntry};
let address = &"127.0.0.1:6379".parse::<SocketAddr>().unwrap();
let range_options =
RangeOptions::new("stream1".to_string(), 10, RangeType::Any).unwrap();
let future = RedisStream::connect(address)
.and_then(move |stream: RedisStream| {
stream.range(range_options)
})
.map(|(_, entries): (RedisStream, Vec<RangeEntry>)| {
for entry in entries.into_iter() {
println!("Received: {:?}", entry);
}
})
.map_err(|err| eprintln!("something went wrong: {}", err));
tokio::run(future);Sourcepub fn subscribe(
self,
options: SubscribeOptions,
) -> impl Future<Item = Subscribe, Error = RedisError> + Send + 'static
pub fn subscribe( self, options: SubscribeOptions, ) -> impl Future<Item = Subscribe, Error = RedisError> + Send + 'static
Subscribe to a Redis stream and process all incoming entries. Redis Streams requires to send XREAD/XREADGROUP requests every time the client receives a response on the previous, in other words Redis Streams does not provide an interface to subscribe to a Redis stream.
In the Crate the subscription is possible by hidden requests sending within the Crate engine.
Request that will be sent to get new entries in the following example: “XREADGROUP GROUP mygroup Bob BLOCK 0 STREAMS mystream <”
§Example
use std::net::SocketAddr;
use futures::{Future, Stream};
use redis_asio::stream::{RedisStream, SubscribeOptions, StreamEntry,
RedisGroup};
let address = &"127.0.0.1:6379".parse::<SocketAddr>().unwrap();
let group_info = RedisGroup::new("mygroup".to_string(), "Bob".to_string());
let subscribe_options =
SubscribeOptions::with_group(vec!["stream1".to_string()], group_info);
let future = RedisStream::connect(address)
.and_then(move |stream: RedisStream| {
stream.subscribe(subscribe_options)
})
.and_then(|subscribe| /*:Subscribe*/ {
subscribe.for_each(|entries: Vec<StreamEntry>| {
for entry in entries.into_iter() {
println!("Received: {:?}", entry);
}
Ok(())
})
})
.map_err(|err| eprintln!("something went wrong: {}", err));
tokio::run(future);Examples found in repository?
67fn main() {
68 println!("Consumer example has started");
69 println!("Please enter a STREAM to listen on");
70 let stream_name = read_stdin();
71 println!("Please enter a GROUP");
72 let group_name = read_stdin();
73 println!("Please enter a CONSUMER");
74 let consumer_name = read_stdin();
75
76 let redis_address = env::var("REDIS_URL")
77 .unwrap_or("127.0.0.1:6379".to_string())
78 .parse::<SocketAddr>().expect("Couldn't parse Redis URl");
79
80 let touch_options = TouchGroupOptions::new(stream_name.clone(), group_name.clone());
81
82 // Try to create a group.
83 // If the group exists already, the future will not be set into an error.
84 // The create_group variable is the Future<Item=(), Error=()>.
85 let create_group = RedisStream::connect(&redis_address)
86 .and_then(move |con|
87 // Create group if the one does not exists yet
88 con.touch_group(touch_options))
89 .then(|_| -> RedisResult<()> { Ok(()) });
90
91 // Start the consuming after the group has been checked.
92 //
93 // Note nothing will happen if the previous future has failed.
94 // The consumer variable in a result is the Future<Item=(), Error=()>.
95 // The Item and Error are required by tokio::run().
96 let consumer = create_group
97 .and_then(move |_| {
98 // Create two connections to the Redis Server:
99 // first will be used for managing (send Acknowledge request),
100 // second will be used for receive entries from Redis Server.
101 let manager = RedisStream::connect(&redis_address);
102 let consumer = RedisStream::connect(&redis_address);
103 consumer.join(manager)
104 })
105 .and_then(move |(connection, manager)| {
106 // Create an unbounded channel to allow the consumer notifies the manager
107 // about received and unacknowledged yet entries.
108 let (tx, rx) = unbounded::<EntryId>();
109
110 // Copy of stream_name and group_name to move it into ack_entry future.
111 let stream = stream_name.clone();
112 let group = group_name.clone();
113
114 let ack_entry = rx
115 .map_err(|_|
116 RedisError::new(RedisErrorKind::InternalError,
117 "Something went wrong with UnboundedChannel".to_string()))
118 // Use fold() to redirect notification from the channel receiver (rx) to the manager.
119 .fold(manager, move |manager, id_to_ack|
120 ack_stream_entry(manager, stream.clone(), group.clone(), id_to_ack))
121 .map(|_| ())
122 .map_err(|_| ());
123
124 // Spawn the ack_entry future to be handled separately from the process_entry future.
125 tokio::spawn(ack_entry);
126
127 let group = RedisGroup::new(group_name, consumer_name);
128 let options = SubscribeOptions::with_group(vec![stream_name], group);
129
130 // Subscribe to a Redis stream, processes any incoming entries and sends
131 // entry ids of success processed entries to the manager via the channel sender (tx).
132 let process_entry =
133 connection.subscribe(options)
134 .and_then(move |subscribe|
135 subscribe.for_each(move |entries|
136 process_stream_entries(tx.clone(), entries)));
137 // Return and run later the process_entry future.
138 process_entry
139 })
140 .map_err(|err| eprintln!("Something went wrong: {:?}", err));
141
142 tokio::run(consumer);
143}Sourcepub fn ack_entry(
self,
options: AckOptions,
) -> impl Future<Item = (Self, AckResponse), Error = RedisError> + Send + 'static
pub fn ack_entry( self, options: AckOptions, ) -> impl Future<Item = (Self, AckResponse), Error = RedisError> + Send + 'static
Acknowledge an entry by its ID.
§Example
use std::net::SocketAddr;
use futures::{Future, Stream};
use redis_asio::stream::{RedisStream, AckOptions, AckResponse, EntryId};
let address = &"127.0.0.1:6379".parse::<SocketAddr>().unwrap();
let ack_options =
AckOptions::new(
"mystream".to_string(),
"mygroup".to_string(),
EntryId::new(0, 0));
let future = RedisStream::connect(address)
.and_then(move |stream: RedisStream| {
stream.ack_entry(ack_options)
})
.map(|(_, response): (RedisStream, AckResponse)| {
assert_eq!(AckResponse::Ok, response);
})
.map_err(|err| eprintln!("something went wrong: {}", err));
tokio::run(future);Examples found in repository?
156fn ack_stream_entry(manager: RedisStream, stream: String, group: String, id_to_ack: EntryId)
157 -> impl Future<Item=RedisStream, Error=RedisError> {
158 let options = AckOptions::new(stream.clone(), group.clone(), id_to_ack.clone());
159
160 manager.ack_entry(options)
161 .map(move |(manager, response)| {
162 match response {
163 AckResponse::Ok => println!("{:?} is acknowledged", id_to_ack.to_string()),
164 AckResponse::NotExists =>
165 eprintln!("Couldn't acknowledge {:?}", id_to_ack.to_string())
166 };
167 manager
168 })
169}Sourcepub fn pending_entries(
self,
options: PendingOptions,
) -> impl Future<Item = (Self, Vec<StreamEntry>), Error = RedisError> + Send + 'static
pub fn pending_entries( self, options: PendingOptions, ) -> impl Future<Item = (Self, Vec<StreamEntry>), Error = RedisError> + Send + 'static
Get entries that was not acknowledged but was sent to specified consumer.
§Example
use std::net::SocketAddr;
use futures::Future;
use redis_asio::stream::{RedisStream, PendingOptions, StreamEntry, EntryId};
let address = &"127.0.0.1:6379".parse::<SocketAddr>().unwrap();
let pending_options =
PendingOptions::new(
"mystream".to_string(),
"mygroup".to_string(),
"Bob".to_string(),
EntryId::new(0, 0)).unwrap();
let future = RedisStream::connect(address)
.and_then(move |stream: RedisStream| {
stream.pending_entries(pending_options)
})
.map(|(_, entries): (RedisStream, Vec<StreamEntry>)| {
for entry in entries.into_iter() {
println!("Received: {:?}", entry);
}
})
.map_err(|err| eprintln!("something went wrong: {}", err));
tokio::run(future);Sourcepub fn touch_group(
self,
options: TouchGroupOptions,
) -> impl Future<Item = (), Error = RedisError> + Send + 'static
pub fn touch_group( self, options: TouchGroupOptions, ) -> impl Future<Item = (), Error = RedisError> + Send + 'static
Try to create a group. If the group exists already, do not return an error.
§Example
use std::net::SocketAddr;
use futures::Future;
use redis_asio::stream::{RedisStream, TouchGroupOptions, StreamEntry,
EntryId};
let address = &"127.0.0.1:6379".parse::<SocketAddr>().unwrap();
let touch_options =
TouchGroupOptions::new("mystream".to_string(), "mygroup".to_string());
let future = RedisStream::connect(&address)
.and_then(move |con|
// create group if the one does not exists yet
con.touch_group(touch_options))
// ignore an error if the group exists already
.then(|_| -> Result<(), ()> { Ok(()) });
tokio::run(future);Examples found in repository?
71fn start_producer(rx: UnboundedReceiver<Message>,
72 stream_name: String,
73 group_name: String,
74 redis_address: SocketAddr) {
75 let touch_options = TouchGroupOptions::new(stream_name.clone(), group_name.clone());
76
77 // Try to create a group.
78 // If the group exists already, the future will not be set into an error.
79 // The create_group variable is the Future<Item=(), Error=()>.
80 let create_group = RedisStream::connect(&redis_address)
81 .and_then(move |con|
82 // Create group if the one does not exists yet.
83 con.touch_group(touch_options))
84 .then(|_| -> RedisResult<()> { Ok(()) });
85
86 // Creates and holds a connection to the Redis Server, waits new messages from
87 // the channel receiver (rx) and send them to a Redis stream.
88 //
89 // Note nothing will happen if the previous future has failed.
90 // The producer variable in a result is the Future<Item=(), Error=()>.
91 // The Item and Error are required by tokio::run().
92 let producer = create_group
93 .and_then(move |_| {
94 RedisStream::connect(&redis_address)
95 })
96 .and_then(move |producer| {
97 rx
98 .map_err(|_|
99 RedisError::new(RedisErrorKind::InternalError,
100 "Something went wrong with UnboundedChannel".to_string()))
101 // Use fold() to redirect messages from the channel receiver (rx) to the Redis stream.
102 .fold(producer, move |producer, message| {
103 let options = SendEntryOptions::new(stream_name.clone());
104
105 // Serialize the message to pairs of key-value.
106 let data = message.into_redis_stream_entry();
107
108 producer
109 .send_entry(options, data)
110 .map(|(producer, added_entry_id)| {
111 println!("{:?} has sent", added_entry_id.to_string());
112 println!("Please enter a message");
113 producer
114 })
115 })
116 })
117 .map(|_| ())
118 .map_err(|err| println!("{}", err));
119
120 tokio::run(producer);
121}More examples
67fn main() {
68 println!("Consumer example has started");
69 println!("Please enter a STREAM to listen on");
70 let stream_name = read_stdin();
71 println!("Please enter a GROUP");
72 let group_name = read_stdin();
73 println!("Please enter a CONSUMER");
74 let consumer_name = read_stdin();
75
76 let redis_address = env::var("REDIS_URL")
77 .unwrap_or("127.0.0.1:6379".to_string())
78 .parse::<SocketAddr>().expect("Couldn't parse Redis URl");
79
80 let touch_options = TouchGroupOptions::new(stream_name.clone(), group_name.clone());
81
82 // Try to create a group.
83 // If the group exists already, the future will not be set into an error.
84 // The create_group variable is the Future<Item=(), Error=()>.
85 let create_group = RedisStream::connect(&redis_address)
86 .and_then(move |con|
87 // Create group if the one does not exists yet
88 con.touch_group(touch_options))
89 .then(|_| -> RedisResult<()> { Ok(()) });
90
91 // Start the consuming after the group has been checked.
92 //
93 // Note nothing will happen if the previous future has failed.
94 // The consumer variable in a result is the Future<Item=(), Error=()>.
95 // The Item and Error are required by tokio::run().
96 let consumer = create_group
97 .and_then(move |_| {
98 // Create two connections to the Redis Server:
99 // first will be used for managing (send Acknowledge request),
100 // second will be used for receive entries from Redis Server.
101 let manager = RedisStream::connect(&redis_address);
102 let consumer = RedisStream::connect(&redis_address);
103 consumer.join(manager)
104 })
105 .and_then(move |(connection, manager)| {
106 // Create an unbounded channel to allow the consumer notifies the manager
107 // about received and unacknowledged yet entries.
108 let (tx, rx) = unbounded::<EntryId>();
109
110 // Copy of stream_name and group_name to move it into ack_entry future.
111 let stream = stream_name.clone();
112 let group = group_name.clone();
113
114 let ack_entry = rx
115 .map_err(|_|
116 RedisError::new(RedisErrorKind::InternalError,
117 "Something went wrong with UnboundedChannel".to_string()))
118 // Use fold() to redirect notification from the channel receiver (rx) to the manager.
119 .fold(manager, move |manager, id_to_ack|
120 ack_stream_entry(manager, stream.clone(), group.clone(), id_to_ack))
121 .map(|_| ())
122 .map_err(|_| ());
123
124 // Spawn the ack_entry future to be handled separately from the process_entry future.
125 tokio::spawn(ack_entry);
126
127 let group = RedisGroup::new(group_name, consumer_name);
128 let options = SubscribeOptions::with_group(vec![stream_name], group);
129
130 // Subscribe to a Redis stream, processes any incoming entries and sends
131 // entry ids of success processed entries to the manager via the channel sender (tx).
132 let process_entry =
133 connection.subscribe(options)
134 .and_then(move |subscribe|
135 subscribe.for_each(move |entries|
136 process_stream_entries(tx.clone(), entries)));
137 // Return and run later the process_entry future.
138 process_entry
139 })
140 .map_err(|err| eprintln!("Something went wrong: {:?}", err));
141
142 tokio::run(consumer);
143}