Module streams

Module streams 

Source
Expand description

Redis Streams support for event sourcing and messaging

Redis Streams provide a powerful data structure for handling time-series data, event sourcing, and message queuing. This module provides a high-level API for working with Redis Streams.

§Examples

§Basic Stream Operations

use redis_oxide::{Client, ConnectionConfig};
use std::collections::HashMap;

let config = ConnectionConfig::new("redis://localhost:6379");
let client = Client::connect(config).await?;

// Add entries to a stream
let mut fields = HashMap::new();
fields.insert("user_id".to_string(), "123".to_string());
fields.insert("action".to_string(), "login".to_string());

let entry_id = client.xadd("events", "*", fields).await?;
println!("Added entry: {}", entry_id);

// Read from stream
let entries = client.xrange("events", "-", "+", Some(10)).await?;
for entry in entries {
    println!("Entry {}: {:?}", entry.id, entry.fields);
}

§Consumer Groups

use redis_oxide::{Client, ConnectionConfig};

let config = ConnectionConfig::new("redis://localhost:6379");
let client = Client::connect(config).await?;

// Create a consumer group
client.xgroup_create("events", "processors", "$", true).await?;

// Read from the group
let messages = client.xreadgroup(
    "processors",
    "worker-1",
    vec![("events".to_string(), ">".to_string())],
    Some(1),
    Some(std::time::Duration::from_secs(1))
).await?;

for (stream, entries) in messages {
    for entry in entries {
        println!("Processing {}: {:?}", entry.id, entry.fields);
        // Process the message...
         
        // Acknowledge the message
        client.xack("events", "processors", vec![entry.id]).await?;
    }
}

Structs§

ConsumerGroupInfo
Information about a consumer group
ConsumerInfo
Information about a consumer in a group
PendingMessage
Pending message information
ReadOptions
Options for XREAD and XREADGROUP commands
StreamEntry
Represents a single entry in a Redis Stream
StreamInfo
Information about a Redis Stream
StreamRange
Stream range options for XRANGE and XREVRANGE

Functions§

parse_stream_entries
Parse stream entries from Redis response
parse_stream_info
Parse stream info from XINFO STREAM response
parse_xread_response
Parse XREAD/XREADGROUP response