Redis Streams support for event sourcing and messaging
A Redis Stream is an append-only log data structure suited to time-series data, event sourcing, and message queuing. This module provides a high-level API for working with Redis Streams.
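Each stream entry is identified by an ID of the form `<milliseconds>-<sequence>`, and Redis orders entries by comparing the two parts numerically. The sketch below illustrates that ordering in isolation. `EntryId` and `parse_entry_id` are hypothetical names chosen for illustration and are not part of this module's API.

```rust
/// Hypothetical helper type (not part of redis_oxide): a parsed stream entry ID.
/// Field order matters: the derived ordering compares `ms` first, then `seq`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct EntryId {
    pub ms: u64,
    pub seq: u64,
}

/// Parse a complete, explicit ID such as "1526919030474-55".
/// Special IDs ("*", "-", "+", "$", ">") are not handled and yield `None`.
pub fn parse_entry_id(s: &str) -> Option<EntryId> {
    let (ms, seq) = s.split_once('-')?;
    Some(EntryId {
        ms: ms.parse().ok()?,
        seq: seq.parse().ok()?,
    })
}
```

Comparing parsed IDs rather than raw strings avoids the lexicographic trap where `"9-10"` would sort after `"10-0"`.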
§Examples
§Basic Stream Operations
use redis_oxide::{Client, ConnectionConfig};
use std::collections::HashMap;
let config = ConnectionConfig::new("redis://localhost:6379");
let client = Client::connect(config).await?;
// Add an entry to a stream; "*" asks Redis to generate the entry ID
let mut fields = HashMap::new();
fields.insert("user_id".to_string(), "123".to_string());
fields.insert("action".to_string(), "login".to_string());
let entry_id = client.xadd("events", "*", fields).await?;
println!("Added entry: {}", entry_id);
// Read up to 10 entries across the full ID range ("-" to "+")
let entries = client.xrange("events", "-", "+", Some(10)).await?;
for entry in entries {
    println!("Entry {}: {:?}", entry.id, entry.fields);
}
§Consumer Groups
use redis_oxide::{Client, ConnectionConfig};
let config = ConnectionConfig::new("redis://localhost:6379");
let client = Client::connect(config).await?;
// Create a consumer group
client.xgroup_create("events", "processors", "$", true).await?;
// Read from the group
let messages = client.xreadgroup(
    "processors",
    "worker-1",
    vec![("events".to_string(), ">".to_string())],
    Some(1),
    Some(std::time::Duration::from_secs(1)),
).await?;
for (stream, entries) in messages {
    for entry in entries {
        println!("Processing {}: {:?}", entry.id, entry.fields);
        // Process the message...
        // Acknowledge the message
        client.xack("events", "processors", vec![entry.id]).await?;
    }
}
Structs§
- ConsumerGroupInfo - Information about a consumer group
- ConsumerInfo - Information about a consumer in a group
- PendingMessage - Pending message information
- ReadOptions - Options for XREAD and XREADGROUP commands
- StreamEntry - Represents a single entry in a Redis Stream
- StreamInfo - Information about a Redis Stream
- StreamRange - Stream range options for XRANGE and XREVRANGE
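The group, consumer, and pending-message types listed here describe state that Redis tracks per consumer group: an entry delivered to a consumer stays pending until it is acknowledged with XACK. The following is a minimal in-memory sketch of that bookkeeping, offered only to illustrate the concept. `PendingList` and its methods are hypothetical and say nothing about how this crate or Redis implements it.

```rust
use std::collections::BTreeMap;

/// Hypothetical in-memory model of a group's pending entries.
/// Maps entry ID -> name of the consumer the entry was delivered to.
#[derive(Debug, Default)]
pub struct PendingList {
    pending: BTreeMap<String, String>,
}

impl PendingList {
    /// Record that `id` was delivered to `consumer`.
    pub fn deliver(&mut self, id: &str, consumer: &str) {
        self.pending.insert(id.to_string(), consumer.to_string());
    }

    /// Acknowledge IDs; returns how many were actually pending,
    /// mirroring the integer reply of XACK.
    pub fn ack(&mut self, ids: &[&str]) -> usize {
        ids.iter()
            .filter(|id| self.pending.remove(**id).is_some())
            .count()
    }

    /// Number of entries still pending for `consumer`.
    pub fn pending_for(&self, consumer: &str) -> usize {
        self.pending.values().filter(|c| c.as_str() == consumer).count()
    }
}
```

In this model, acknowledging an ID that was never delivered is a no-op and does not count toward the return value, which is why a worker can safely retry an acknowledgement.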
Functions§
- parse_stream_entries - Parse stream entries from Redis response
- parse_stream_info - Parse stream info from XINFO STREAM response
- parse_xread_response - Parse XREAD/XREADGROUP response
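In the RESP2 protocol, Redis returns each stream entry as an ID followed by a flat list that alternates field names and values, so a parser has to pair them up. The sketch below shows that pairing step on its own. `pair_fields` is a hypothetical helper, not the signature of `parse_stream_entries`, which this page does not show.

```rust
use std::collections::HashMap;

/// Hypothetical helper: turn ["f1", "v1", "f2", "v2"] into a field map.
/// Returns `None` when the list has an odd length (a field without a value).
pub fn pair_fields(flat: &[String]) -> Option<HashMap<String, String>> {
    if flat.len() % 2 != 0 {
        return None;
    }
    Some(
        flat.chunks_exact(2)
            .map(|kv| (kv[0].clone(), kv[1].clone()))
            .collect(),
    )
}
```

Rejecting odd-length input up front keeps a malformed reply from silently dropping a trailing field.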