streamweave-redis
Redis Streams integration for StreamWeave
Produce to and consume from Redis Streams with streaming processing.
The streamweave-redis package provides Redis Streams producers and consumers for StreamWeave. It enables reading from Redis Streams and writing to Redis Streams with consumer groups, message acknowledgment, and stream length management.
✨ Key Features
- RedisProducer: Consume messages from Redis Streams and stream them
- RedisConsumer: Produce messages to Redis Streams from streams
- Consumer Groups: Support for Redis Streams consumer groups
- Message Acknowledgment: Automatic and manual message acknowledgment
- Stream Length Management: Configurable stream length limits
- XADD/XREAD Operations: Direct Redis Streams command support
- Error Handling: Comprehensive error handling with retry strategies
📦 Installation
Add this to your Cargo.toml:
[dependencies]
streamweave-redis = { version = "0.3.0", features = ["redis"] }
🚀 Quick Start
Consume from Redis Streams
use ;
use PipelineBuilder;
let config = default
.with_connection_url
.with_stream
.with_group
.with_consumer
.with_start_id;
let pipeline = new
.producer
.consumer;
pipeline.run.await?;
Produce to Redis Streams
use ;
use PipelineBuilder;
use Serialize;
let config = default
.with_connection_url
.with_stream
.with_maxlen;
let pipeline = new
.producer
.consumer;
pipeline.run.await?;
📖 API Overview
RedisProducer
Consumes messages from Redis Streams and streams them:
Key Methods:
- new(config): Create producer with Redis configuration
- with_error_strategy(strategy): Set error handling strategy
- with_name(name): Set component name
- produce(): Generate stream from Redis Streams messages
RedisConsumer
Produces messages to Redis Streams from streams:
Key Methods:
- new(config): Create consumer with Redis configuration
- with_error_strategy(strategy): Set error handling strategy
- with_name(name): Set component name
- consume(stream): Send stream items to Redis Stream
RedisMessage
Represents a message received from Redis Streams.
📚 Usage Examples
Consumer Group Setup
Configure consumer groups for distributed processing:
use ;
let config = default
.with_connection_url
.with_stream
.with_group
.with_consumer
.with_start_id // Read new messages only
.with_block_ms
.with_count
.with_auto_ack;
let producer = new;
Reading from Beginning
Read all messages from the beginning of a stream:
use ;
let config = default
.with_connection_url
.with_stream
.with_start_id // Start from beginning
.with_block_ms;
let producer = new;
Reading New Messages Only
Read only new messages (after current position):
use ;
let config = default
.with_connection_url
.with_stream
.with_start_id // Read only new messages
.with_block_ms;
let producer = new;
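The start positions used in the two examples above, together with the ">" ID reserved for consumer groups, can be summarized in a small sketch. `StartPosition` and `as_id` are hypothetical names invented for illustration and are not part of streamweave-redis; only the ID strings themselves ("0", "$", ">") come from Redis.

```rust
/// Where a reader begins in a Redis stream.
/// Hypothetical helper type for illustration; not part of streamweave-redis.
#[derive(Debug, Clone, PartialEq)]
pub enum StartPosition {
    /// Replay every entry already stored in the stream.
    Beginning,
    /// Only entries added after the read is issued.
    NewOnly,
    /// Consumer-group reads: entries not yet delivered to any consumer in the group.
    GroupUndelivered,
    /// Entries with an ID greater than the given one.
    After(String),
}

impl StartPosition {
    /// Returns the ID string that Redis expects in XREAD / XREADGROUP.
    pub fn as_id(&self) -> &str {
        match self {
            StartPosition::Beginning => "0",
            StartPosition::NewOnly => "$",
            StartPosition::GroupUndelivered => ">",
            StartPosition::After(id) => id.as_str(),
        }
    }
}
```

A value such as `StartPosition::Beginning.as_id()` would then be what gets passed as the start ID in the configuration.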
Stream Length Management
Limit stream length to prevent unbounded growth:
use ;
let config = default
.with_connection_url
.with_stream
.with_maxlen // Keep only last 10000 messages
.with_approximate_maxlen; // More efficient for large streams
let consumer = new;
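To make the effect of the two length settings concrete, here is a rough sketch of the XADD arguments they plausibly translate to. The function `xadd_args` and its parameters are assumptions made for illustration; the crate's actual internals are not shown in this README. The `MAXLEN` and `~` tokens are standard Redis XADD syntax, where `~` lets Redis trim lazily instead of enforcing the exact limit on every write.

```rust
/// Builds the argument list for an XADD call with optional MAXLEN trimming.
/// Hypothetical helper for illustration; not part of streamweave-redis.
pub fn xadd_args(
    stream: &str,
    maxlen: Option<usize>,
    approximate: bool,
    fields: &[(&str, &str)],
) -> Vec<String> {
    let mut args: Vec<String> = vec!["XADD".to_string(), stream.to_string()];
    if let Some(limit) = maxlen {
        args.push("MAXLEN".to_string());
        if approximate {
            // "~" allows Redis to keep slightly more than `limit` entries.
            args.push("~".to_string());
        }
        args.push(limit.to_string());
    }
    // "*" asks Redis to generate the entry ID.
    args.push("*".to_string());
    for (field, value) in fields {
        args.push(field.to_string());
        args.push(value.to_string());
    }
    args
}
```

With `maxlen` set to `None`, no trimming clause is emitted and the stream grows without bound.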
Message Acknowledgment
Configure automatic message acknowledgment:
use ;
let config = default
.with_connection_url
.with_stream
.with_group
.with_consumer
.with_auto_ack; // Automatically acknowledge messages
let producer = new;
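The difference between automatic and manual acknowledgment is easiest to see with a toy model of a consumer group's pending entries list. This is an in-memory sketch only: `ToyGroup` and its methods are invented for illustration, and the assumption that automatic acknowledgment means "acknowledge right after delivery" is mine, not something this README states about the crate.

```rust
use std::collections::{BTreeSet, VecDeque};

/// Toy in-memory model of one consumer group on one stream.
/// Hypothetical type for illustration; not part of streamweave-redis.
pub struct ToyGroup {
    undelivered: VecDeque<String>,
    pending: BTreeSet<String>,
    auto_ack: bool,
}

impl ToyGroup {
    pub fn new(ids: &[&str], auto_ack: bool) -> Self {
        ToyGroup {
            undelivered: ids.iter().map(|id| id.to_string()).collect(),
            pending: BTreeSet::new(),
            auto_ack,
        }
    }

    /// Delivers the next entry, similar to XREADGROUP with the ">" ID.
    /// The entry becomes pending until it is acknowledged.
    pub fn deliver(&mut self) -> Option<String> {
        let id = self.undelivered.pop_front()?;
        self.pending.insert(id.clone());
        if self.auto_ack {
            // Assumed behavior: acknowledge immediately after delivery.
            self.ack(&id);
        }
        Some(id)
    }

    /// Acknowledges an entry, similar to XACK. Returns true if it was pending.
    pub fn ack(&mut self, id: &str) -> bool {
        self.pending.remove(id)
    }

    /// Number of delivered entries that have not been acknowledged yet.
    pub fn pending_count(&self) -> usize {
        self.pending.len()
    }
}
```

With manual acknowledgment, an entry that is delivered but never acknowledged stays pending, which is what allows it to be inspected or claimed again after a consumer fails.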
Error Handling
Configure error handling strategies:
use ;
use ErrorStrategy;
let config = default
.with_connection_url
.with_stream;
let producer = new
.with_error_strategy; // Retry up to 5 times
🏗️ Architecture
Redis Streams integration flow:
Redis Streams ───> RedisProducer ───> Stream<RedisMessage> ───> Transformer ───> Stream<T> ───> RedisConsumer ───> Redis Streams
Redis Streams Flow:
- RedisProducer uses XREAD/XREADGROUP to consume messages
- RedisMessage items flow through transformers
- RedisConsumer serializes and uses XADD to send items to stream
- Consumer groups manage message distribution
- Message acknowledgment tracks processing status
🔧 Configuration
Consumer Configuration (RedisConsumerConfig)
- connection_url: Redis connection URL (e.g., "redis://localhost:6379")
- stream: Stream name to consume from
- group: Consumer group name (optional, enables consumer groups)
- consumer: Consumer name (required if using consumer groups)
- start_id: Starting ID ("0" for beginning, "$" for new messages, ">" for consumer groups)
- block_ms: Block time in milliseconds (0 for non-blocking)
- count: Number of messages to read per call
- auto_ack: Whether to automatically acknowledge messages
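As a rough illustration of how these fields relate to the underlying Redis commands, the sketch below builds the argument list for XREAD or XREADGROUP from a settings struct. `ReadSettings` and `read_args` are hypothetical stand-ins, not the crate's `RedisConsumerConfig` or its internals. Note one deliberate difference: in Redis itself `BLOCK 0` waits indefinitely, so this sketch uses `None` to mean a non-blocking read rather than assuming how the crate interprets 0.

```rust
/// Hypothetical stand-in mirroring the documented consumer settings.
/// Not the crate's RedisConsumerConfig.
pub struct ReadSettings {
    pub stream: String,
    pub group: Option<String>,
    pub consumer: Option<String>,
    pub start_id: String,
    pub block_ms: Option<u64>,
    pub count: Option<usize>,
}

/// Builds the XREAD or XREADGROUP argument list for the given settings.
pub fn read_args(s: &ReadSettings) -> Result<Vec<String>, String> {
    let mut args: Vec<String> = Vec::new();
    match (&s.group, &s.consumer) {
        (Some(group), Some(consumer)) => {
            args.push("XREADGROUP".to_string());
            args.push("GROUP".to_string());
            args.push(group.clone());
            args.push(consumer.clone());
        }
        (Some(_), None) => {
            return Err("a consumer name is required when a group is set".to_string());
        }
        // No group configured: a plain XREAD is enough.
        _ => args.push("XREAD".to_string()),
    }
    if let Some(n) = s.count {
        args.push("COUNT".to_string());
        args.push(n.to_string());
    }
    if let Some(ms) = s.block_ms {
        args.push("BLOCK".to_string());
        args.push(ms.to_string());
    }
    args.push("STREAMS".to_string());
    args.push(s.stream.clone());
    args.push(s.start_id.clone());
    Ok(args)
}
```

The match on `group` and `consumer` reflects the rule stated above that a consumer name is required once a group is configured.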
Producer Configuration (RedisProducerConfig)
- connection_url: Redis connection URL
- stream: Stream name to produce to
- maxlen: Maximum length of stream (None for no limit)
- approximate_maxlen: Use approximate maxlen (more efficient)
🔍 Error Handling
Redis errors are handled through the error system:
use ErrorStrategy;
let producer = new
.with_error_strategy; // Skip errors and continue
let consumer = new
.with_error_strategy; // Retry up to 3 times
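The two behaviors named in the comments above, skipping and retrying, can be sketched independently of the crate. The `Strategy` enum and `apply` function below are local, hypothetical stand-ins and should not be read as the definition of `ErrorStrategy`. The sketch also assumes that "retry up to N times" means N additional attempts after the first failure.

```rust
/// Hypothetical local enum modelling the two behaviors mentioned above.
/// Not the crate's ErrorStrategy type.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Strategy {
    /// Drop the failing item and continue with the next one.
    Skip,
    /// Re-run the operation up to this many extra times before giving up.
    Retry(u32),
}

/// Runs `op` for one item under the given strategy.
/// Returns Ok(Some(v)) on success, Ok(None) if the item was skipped,
/// and Err(e) with the last error once retries are exhausted.
pub fn apply<T, E>(
    strategy: Strategy,
    mut op: impl FnMut() -> Result<T, E>,
) -> Result<Option<T>, E> {
    let extra_attempts = match strategy {
        Strategy::Retry(n) => n,
        Strategy::Skip => 0,
    };
    let mut attempt = 0;
    loop {
        match op() {
            Ok(value) => return Ok(Some(value)),
            // Still have retries left: count this failure and try again.
            Err(_) if attempt < extra_attempts => attempt += 1,
            Err(e) => {
                return match strategy {
                    Strategy::Skip => Ok(None),
                    Strategy::Retry(_) => Err(e),
                };
            }
        }
    }
}
```

In a real pipeline a retry would usually wait between attempts; the delay is left out here to keep the sketch free of runtime dependencies.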
⚡ Performance Considerations
- Stream Length: Use maxlen to prevent unbounded growth
- Approximate Maxlen: Use approximate_maxlen for better performance
- Blocking Reads: Use block_ms for efficient polling
- Batch Reads: Use count to read multiple messages at once
- Consumer Groups: Use consumer groups for parallel processing
🔗 Dependencies
streamweave-redis depends on:
- streamweave: Core traits
- streamweave-error: Error handling
- streamweave-message (optional): Message envelope support
- redis: Redis client library
- tokio: Async runtime
- futures: Stream utilities
- serde: Serialization support
- async-stream: Stream utilities
🎯 Use Cases
Redis Streams integration is used for:
- Event Streaming: Stream events from Redis Streams
- Message Queues: Use Redis Streams as message queues
- Real-Time Processing: Process Redis Streams messages in real-time
- Consumer Groups: Distribute processing across consumers
- Event Sourcing: Implement event sourcing patterns
🔗 See Also
- streamweave - Core traits
- streamweave-error - Error handling
- streamweave-message - Message envelopes
- streamweave-offset - Offset management
🤝 Contributing
Contributions are welcome! Please see the Contributing Guide for details.
📄 License
This project is licensed under the CC BY-SA 4.0 license.