pub struct RedisStreams { /* private fields */ }
Redis Streams client for event-driven architectures
Wraps Redis ConnectionManager to provide high-level streaming operations.
Implementations§
impl RedisStreams
pub async fn stream_add(
&self,
stream_key: &str,
fields: Vec<(String, String)>,
maxlen: Option<usize>,
) -> Result<String>
Publishes an entry to a Redis Stream using XADD.
§Arguments
stream_key - Name of the Redis Stream (e.g., “events_stream”)
fields - Vec of field-value pairs to add to the stream
maxlen - Optional maximum length for stream trimming (uses MAXLEN ~ for approximate trimming)
§Returns
The stream entry ID generated by Redis (e.g., “1234567890123-0”)
§Example
let event_id = streams.stream_add(
"user_events",
vec![
("user_id".to_string(), "42".to_string()),
("action".to_string(), "purchase".to_string()),
("amount".to_string(), "99.99".to_string()),
],
Some(10000) // Keep max 10k events
).await?;
println!("Event ID: {}", event_id);
§Errors
Returns an error if the XADD command fails.
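To make the effect of maxlen concrete, the following is a minimal sketch of how the arguments above could map onto an XADD command line. The helper `xadd_args` is hypothetical and not part of this crate; it only assumes the standard Redis syntax `XADD key [MAXLEN ~ n] * field value ...` and does not claim to mirror the actual implementation.

```rust
/// Build the argument list for an XADD call.
/// Hypothetical helper for illustration; not part of the crate's API.
fn xadd_args(stream_key: &str, fields: &[(String, String)], maxlen: Option<usize>) -> Vec<String> {
    let mut args = vec!["XADD".to_string(), stream_key.to_string()];
    if let Some(n) = maxlen {
        // "~" requests approximate trimming, so Redis may keep slightly
        // more than `n` entries in exchange for cheaper trimming.
        args.push("MAXLEN".to_string());
        args.push("~".to_string());
        args.push(n.to_string());
    }
    // "*" asks Redis to generate the entry ID.
    args.push("*".to_string());
    for (name, value) in fields {
        args.push(name.clone());
        args.push(value.clone());
    }
    args
}

fn main() {
    let fields = vec![("action".to_string(), "purchase".to_string())];
    let args = xadd_args("user_events", &fields, Some(10000));
    // Prints: XADD user_events MAXLEN ~ 10000 * action purchase
    println!("{}", args.join(" "));
}
```

With maxlen set to None, the MAXLEN clause is simply omitted and the stream grows without trimming.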
pub async fn stream_read_latest(
&self,
stream_key: &str,
count: usize,
) -> Result<Vec<StreamEntry>>
Reads the latest entries from a Redis Stream using XREVRANGE.
§Arguments
stream_key - Name of the Redis Stream
count - Number of latest entries to retrieve
§Returns
Vector of (entry_id, fields) tuples, ordered from newest to oldest
§Example
let latest_10 = streams.stream_read_latest("events", 10).await?;
for (id, fields) in latest_10 {
println!("Event {}: {:?}", id, fields);
}
§Errors
Returns an error if the XREVRANGE command fails.
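Because the result is ordered newest first, callers that want to process events in the order they happened need to reverse it. The sketch below shows that, plus a field lookup on a single entry. The `StreamEntry` alias is an assumption inferred from the documented (entry_id, fields) tuples, and both helper functions are hypothetical.

```rust
// Assumed shape of `StreamEntry`, inferred from the documented tuples.
type StreamEntry = (String, Vec<(String, String)>);

/// Return the value stored under `name` in an entry, if present.
/// Hypothetical helper; not part of the crate's API.
fn field<'a>(entry: &'a StreamEntry, name: &str) -> Option<&'a str> {
    entry
        .1
        .iter()
        .find(|(key, _)| key.as_str() == name)
        .map(|(_, value)| value.as_str())
}

/// Turn a newest-first batch into chronological (oldest-first) order.
fn chronological(mut entries: Vec<StreamEntry>) -> Vec<StreamEntry> {
    entries.reverse();
    entries
}

fn main() {
    // Stand-in for the result of `stream_read_latest("events", 2)`.
    let latest: Vec<StreamEntry> = vec![
        ("2-0".to_string(), vec![("action".to_string(), "purchase".to_string())]),
        ("1-0".to_string(), vec![("action".to_string(), "login".to_string())]),
    ];
    for entry in chronological(latest) {
        println!("{} -> {:?}", entry.0, field(&entry, "action"));
    }
}
```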
pub async fn stream_read(
&self,
stream_key: &str,
last_id: &str,
count: usize,
block_ms: Option<usize>,
) -> Result<Vec<StreamEntry>>
Reads entries from a Redis Stream using XREAD (blocking or non-blocking).
§Arguments
stream_key - Name of the Redis Stream
last_id - Last entry ID seen (use “0” to read from beginning, “$” for only new entries)
count - Maximum number of entries to retrieve
block_ms - Optional blocking timeout in milliseconds (None for non-blocking)
§Returns
Vector of (entry_id, fields) tuples
§Example
// Non-blocking: read new entries since last seen ID
let entries = streams.stream_read("events", "1234567890-0", 100, None).await?;
// Blocking: wait up to 5 seconds for new entries
let new_entries = streams.stream_read("events", "$", 100, Some(5000)).await?;
§Errors
Returns an error if the XREAD command fails.
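A consumer that polls repeatedly usually passes “$” (or “0”) only on its first call and then feeds back the ID of the last entry it received. Passing “$” on every call can miss entries that arrive between calls. The sketch below shows that cursor update in isolation. The `StreamEntry` alias and the `next_last_id` helper are assumptions for illustration, and the code relies on XREAD returning entries in ascending ID order.

```rust
// Assumed shape of `StreamEntry`, inferred from the documented tuples.
type StreamEntry = (String, Vec<(String, String)>);

/// Pick the ID to pass as `last_id` on the next `stream_read` call.
/// Hypothetical helper; not part of the crate's API.
fn next_last_id(current: &str, batch: &[StreamEntry]) -> String {
    match batch.last() {
        // XREAD returns entries oldest first, so the last one is the newest seen.
        Some((id, _)) => id.clone(),
        // An empty batch (for example, a blocking read that timed out) keeps the cursor.
        None => current.to_string(),
    }
}

fn main() {
    let mut last_id = "0".to_string();
    // Stand-ins for three successive `stream_read` results.
    let batches: Vec<Vec<StreamEntry>> = vec![
        vec![("1-0".to_string(), vec![]), ("2-0".to_string(), vec![])],
        vec![],
        vec![("3-0".to_string(), vec![])],
    ];
    for batch in batches {
        last_id = next_last_id(&last_id, &batch);
        println!("next call would pass last_id = {}", last_id);
    }
}
```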
Trait Implementations§
impl Clone for RedisStreams
fn clone(&self) -> RedisStreams
Returns a duplicate of the value.
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from source.
impl StreamingBackend for RedisStreams
Implement StreamingBackend trait for RedisStreams
This enables RedisStreams to be used as a pluggable streaming backend.
fn stream_add<'life0, 'life1, 'async_trait>(
&'life0 self,
stream_key: &'life1 str,
fields: Vec<(String, String)>,
maxlen: Option<usize>,
) -> Pin<Box<dyn Future<Output = Result<String>> + Send + 'async_trait>>
where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Add an entry to a stream.
fn stream_read_latest<'life0, 'life1, 'async_trait>(
&'life0 self,
stream_key: &'life1 str,
count: usize,
) -> Pin<Box<dyn Future<Output = Result<Vec<(String, Vec<(String, String)>)>>> + Send + 'async_trait>>
where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Read the latest N entries from a stream (newest first).
fn stream_read<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
stream_key: &'life1 str,
last_id: &'life2 str,
count: usize,
block_ms: Option<usize>,
) -> Pin<Box<dyn Future<Output = Result<Vec<(String, Vec<(String, String)>)>>> + Send + 'async_trait>>
where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
Read entries from a stream with optional blocking.
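To illustrate what a pluggable backend buys you, here is a rough sketch of an in-memory stand-in that could replace Redis in unit tests. It deliberately uses a simplified, synchronous trait named `SimpleBackend`, which is hypothetical. The real StreamingBackend trait is async and returns Result, so treat this as an outline of the pattern and not as the crate's API.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

// Assumed entry shape, matching the tuples in the trait signatures.
type StreamEntry = (String, Vec<(String, String)>);

/// Simplified, synchronous stand-in for the async trait (hypothetical).
trait SimpleBackend {
    fn stream_add(&self, stream_key: &str, fields: Vec<(String, String)>, maxlen: Option<usize>) -> String;
    fn stream_read_latest(&self, stream_key: &str, count: usize) -> Vec<StreamEntry>;
}

/// In-memory backend, e.g. for tests that should not need a Redis server.
#[derive(Default)]
struct MemoryBackend {
    streams: Mutex<HashMap<String, Vec<StreamEntry>>>,
    next_seq: Mutex<u64>,
}

impl SimpleBackend for MemoryBackend {
    fn stream_add(&self, stream_key: &str, fields: Vec<(String, String)>, maxlen: Option<usize>) -> String {
        let mut seq = self.next_seq.lock().unwrap();
        *seq += 1;
        // Fake IDs of the form "<counter>-0"; Redis uses "<ms-timestamp>-<seq>".
        let id = format!("{}-0", *seq);
        let mut streams = self.streams.lock().unwrap();
        let entries = streams.entry(stream_key.to_string()).or_default();
        entries.push((id.clone(), fields));
        if let Some(max) = maxlen {
            // Exact trimming here; Redis with `MAXLEN ~` may keep slightly more.
            if entries.len() > max {
                let excess = entries.len() - max;
                entries.drain(..excess);
            }
        }
        id
    }

    fn stream_read_latest(&self, stream_key: &str, count: usize) -> Vec<StreamEntry> {
        let streams = self.streams.lock().unwrap();
        streams
            .get(stream_key)
            .map(|entries| entries.iter().rev().take(count).cloned().collect())
            .unwrap_or_default()
    }
}

/// Code written against the trait does not care which backend it receives.
fn publish_login(backend: &dyn SimpleBackend, user: &str) -> String {
    let fields = vec![
        ("user_id".to_string(), user.to_string()),
        ("action".to_string(), "login".to_string()),
    ];
    backend.stream_add("user_events", fields, None)
}

fn main() {
    let backend = MemoryBackend::default();
    publish_login(&backend, "42");
    let id = publish_login(&backend, "43");
    let latest = backend.stream_read_latest("user_events", 1);
    println!("latest id = {}, last published id = {}", latest[0].0, id);
}
```

The same `publish_login` function would work unchanged with any other type that implements the trait, which is the property the StreamingBackend abstraction is meant to provide.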
Auto Trait Implementations§
impl Freeze for RedisStreams
impl !RefUnwindSafe for RedisStreams
impl Send for RedisStreams
impl Sync for RedisStreams
impl Unpin for RedisStreams
impl !UnwindSafe for RedisStreams
Blanket Implementations§
impl<T> BorrowMut<T> for T
where
T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value.
impl<T> CloneToUninit for T
where
T: Clone,
unsafe fn clone_to_uninit(&self, dest: *mut u8)
🔬This is a nightly-only experimental API. (clone_to_uninit)