streamweave-redis 0.4.0

Redis producer and consumer for StreamWeave
Documentation

streamweave-redis

Crates.io Documentation License: CC BY-SA 4.0

Redis Streams integration for StreamWeave
Produce to and consume from Redis Streams with streaming processing.

The streamweave-redis package provides Redis Streams producers and consumers for StreamWeave. It enables reading from Redis Streams and writing to Redis Streams with consumer groups, message acknowledgment, and stream length management.

✨ Key Features

  • RedisProducer: Consume messages from Redis Streams and stream them
  • RedisConsumer: Produce messages to Redis Streams from streams
  • Consumer Groups: Support for Redis Streams consumer groups
  • Message Acknowledgment: Automatic and manual message acknowledgment
  • Stream Length Management: Configurable stream length limits
  • XADD/XREAD Operations: Direct Redis Streams command support
  • Error Handling: Comprehensive error handling with retry strategies

📦 Installation

Add this to your Cargo.toml:

[dependencies]
streamweave-redis = { version = "0.3.0", features = ["redis"] }

🚀 Quick Start

Consume from Redis Streams

use streamweave_redis::producers::{RedisProducer, RedisConsumerConfig};
use streamweave_pipeline::PipelineBuilder;

let config = RedisConsumerConfig::default()
    .with_connection_url("redis://localhost:6379")
    .with_stream("mystream")
    .with_group("my-group")
    .with_consumer("consumer-1")
    .with_start_id("0");

let pipeline = PipelineBuilder::new()
    .producer(RedisProducer::new(config))
    .consumer(/* process messages */);

pipeline.run().await?;

Produce to Redis Streams

use streamweave_redis::consumers::{RedisConsumer, RedisProducerConfig};
use streamweave_pipeline::PipelineBuilder;
use serde::Serialize;

#[derive(Serialize)]
struct Event {
    id: u32,
    message: String,
}

let config = RedisProducerConfig::default()
    .with_connection_url("redis://localhost:6379")
    .with_stream("mystream")
    .with_maxlen(10000);

let pipeline = PipelineBuilder::new()
    .producer(/* produce events */)
    .consumer(RedisConsumer::<Event>::new(config));

pipeline.run().await?;

📖 API Overview

RedisProducer

Consumes messages from Redis Streams and streams them:

pub struct RedisProducer {
    pub config: ProducerConfig<RedisMessage>,
    pub redis_config: RedisConsumerConfig,
}

Key Methods:

  • new(config) - Create producer with Redis configuration
  • with_error_strategy(strategy) - Set error handling strategy
  • with_name(name) - Set component name
  • produce() - Generate stream from Redis Streams messages

RedisConsumer

Produces messages to Redis Streams from streams:

pub struct RedisConsumer<T> {
    pub config: ConsumerConfig<T>,
    pub redis_config: RedisProducerConfig,
}

Key Methods:

  • new(config) - Create consumer with Redis configuration
  • with_error_strategy(strategy) - Set error handling strategy
  • with_name(name) - Set component name
  • consume(stream) - Send stream items to Redis Stream

RedisMessage

Represents a message received from Redis Streams:

pub struct RedisMessage {
    pub stream: String,
    pub id: String,
    pub fields: HashMap<String, String>,
}

📚 Usage Examples

Consumer Group Setup

Configure consumer groups for distributed processing:

use streamweave_redis::producers::{RedisProducer, RedisConsumerConfig};

let config = RedisConsumerConfig::default()
    .with_connection_url("redis://localhost:6379")
    .with_stream("events")
    .with_group("my-consumer-group")
    .with_consumer("consumer-1")
    .with_start_id(">")  // Read new messages only
    .with_block_ms(5000)
    .with_count(100)
    .with_auto_ack(true);

let producer = RedisProducer::new(config);

Reading from Beginning

Read all messages from the beginning of a stream:

use streamweave_redis::producers::{RedisProducer, RedisConsumerConfig};

let config = RedisConsumerConfig::default()
    .with_connection_url("redis://localhost:6379")
    .with_stream("events")
    .with_start_id("0")  // Start from beginning
    .with_block_ms(1000);

let producer = RedisProducer::new(config);

Reading New Messages Only

Read only new messages (after current position):

use streamweave_redis::producers::{RedisProducer, RedisConsumerConfig};

let config = RedisConsumerConfig::default()
    .with_connection_url("redis://localhost:6379")
    .with_stream("events")
    .with_start_id("$")  // Read only new messages
    .with_block_ms(5000);

let producer = RedisProducer::new(config);

Stream Length Management

Limit stream length to prevent unbounded growth:

use streamweave_redis::consumers::{RedisConsumer, RedisProducerConfig};

let config = RedisProducerConfig::default()
    .with_connection_url("redis://localhost:6379")
    .with_stream("events")
    .with_maxlen(10000)  // Keep only last 10000 messages
    .with_approximate_maxlen(true);  // More efficient for large streams

let consumer = RedisConsumer::<Event>::new(config);

Message Acknowledgment

Configure automatic message acknowledgment:

use streamweave_redis::producers::{RedisProducer, RedisConsumerConfig};

let config = RedisConsumerConfig::default()
    .with_connection_url("redis://localhost:6379")
    .with_stream("events")
    .with_group("my-group")
    .with_consumer("consumer-1")
    .with_auto_ack(true);  // Automatically acknowledge messages

let producer = RedisProducer::new(config);

Error Handling

Configure error handling strategies:

use streamweave_redis::producers::{RedisProducer, RedisConsumerConfig};
use streamweave_error::ErrorStrategy;

let config = RedisConsumerConfig::default()
    .with_connection_url("redis://localhost:6379")
    .with_stream("events");

let producer = RedisProducer::new(config)
    .with_error_strategy(ErrorStrategy::Retry(5));  // Retry up to 5 times

🏗️ Architecture

Redis Streams integration flow:

┌──────────┐
│  Redis   │───> RedisProducer ───> Stream<RedisMessage> ───> Transformer ───> Stream<T> ───> RedisConsumer ───> Redis
│ Streams  │                                                                                                                      │ Streams  │
└──────────┘                                                                                                                      └──────────┘

Redis Streams Flow:

  1. RedisProducer uses XREAD/XREADGROUP to consume messages
  2. RedisMessage items flow through transformers
  3. RedisConsumer serializes and uses XADD to send items to stream
  4. Consumer groups manage message distribution
  5. Message acknowledgment tracks processing status

🔧 Configuration

Consumer Configuration (RedisConsumerConfig)

  • connection_url: Redis connection URL (e.g., "redis://localhost:6379")
  • stream: Stream name to consume from
  • group: Consumer group name (optional, enables consumer groups)
  • consumer: Consumer name (required if using consumer groups)
  • start_id: Starting ID ("0" for beginning, "$" for new messages, ">" for consumer groups)
  • block_ms: Block time in milliseconds (0 for non-blocking)
  • count: Number of messages to read per call
  • auto_ack: Whether to automatically acknowledge messages

Producer Configuration (RedisProducerConfig)

  • connection_url: Redis connection URL
  • stream: Stream name to produce to
  • maxlen: Maximum length of stream (None for no limit)
  • approximate_maxlen: Use approximate maxlen (more efficient)

🔍 Error Handling

Redis errors are handled through the error system:

use streamweave_error::ErrorStrategy;

let producer = RedisProducer::new(config)
    .with_error_strategy(ErrorStrategy::Skip);  // Skip errors and continue

let consumer = RedisConsumer::<Event>::new(consumer_config)
    .with_error_strategy(ErrorStrategy::Retry(3));  // Retry up to 3 times

⚡ Performance Considerations

  • Stream Length: Use maxlen to prevent unbounded growth
  • Approximate Maxlen: Use approximate_maxlen for better performance
  • Blocking Reads: Use block_ms for efficient polling
  • Batch Reads: Use count to read multiple messages at once
  • Consumer Groups: Use consumer groups for parallel processing

📝 Examples

For more examples, see:

🔗 Dependencies

streamweave-redis depends on:

  • streamweave - Core traits
  • streamweave-error - Error handling
  • streamweave-message (optional) - Message envelope support
  • redis - Redis client library
  • tokio - Async runtime
  • futures - Stream utilities
  • serde - Serialization support
  • async-stream - Stream utilities

🎯 Use Cases

Redis Streams integration is used for:

  1. Event Streaming: Stream events from Redis Streams
  2. Message Queues: Use Redis Streams as message queues
  3. Real-Time Processing: Process Redis Streams messages in real-time
  4. Consumer Groups: Distribute processing across consumers
  5. Event Sourcing: Implement event sourcing patterns

📖 Documentation

🔗 See Also

🤝 Contributing

Contributions are welcome! Please see the Contributing Guide for details.

📄 License

This project is licensed under the CC BY-SA 4.0 license.