Expand description
Β§streamweave-redis
Redis Streams integration for StreamWeave
Produce to and consume from Redis Streams with streaming processing.
The streamweave-redis package provides Redis Streams producers and consumers for StreamWeave. It enables reading from Redis Streams and writing to Redis Streams with consumer groups, message acknowledgment, and stream length management.
Β§β¨ Key Features
- RedisProducer: Consume messages from Redis Streams and stream them
- RedisConsumer: Produce messages to Redis Streams from streams
- Consumer Groups: Support for Redis Streams consumer groups
- Message Acknowledgment: Automatic and manual message acknowledgment
- Stream Length Management: Configurable stream length limits
- XADD/XREAD Operations: Direct Redis Streams command support
- Error Handling: Comprehensive error handling with retry strategies
Β§π¦ Installation
Add this to your Cargo.toml:
[dependencies]
streamweave-redis = { version = "0.3.0", features = ["redis"] }Β§π Quick Start
Β§Consume from Redis Streams
use streamweave_redis::producers::{RedisProducer, RedisConsumerConfig};
use streamweave_pipeline::PipelineBuilder;
let config = RedisConsumerConfig::default()
.with_connection_url("redis://localhost:6379")
.with_stream("mystream")
.with_group("my-group")
.with_consumer("consumer-1")
.with_start_id("0");
let pipeline = PipelineBuilder::new()
.producer(RedisProducer::new(config))
.consumer(/* process messages */);
pipeline.run().await?;Β§Produce to Redis Streams
use streamweave_redis::consumers::{RedisConsumer, RedisProducerConfig};
use streamweave_pipeline::PipelineBuilder;
use serde::Serialize;
#[derive(Serialize)]
struct Event {
id: u32,
message: String,
}
let config = RedisProducerConfig::default()
.with_connection_url("redis://localhost:6379")
.with_stream("mystream")
.with_maxlen(10000);
let pipeline = PipelineBuilder::new()
.producer(/* produce events */)
.consumer(RedisConsumer::<Event>::new(config));
pipeline.run().await?;Β§π API Overview
Β§RedisProducer
Consumes messages from Redis Streams and streams them:
pub struct RedisProducer {
pub config: ProducerConfig<RedisMessage>,
pub redis_config: RedisConsumerConfig,
}Key Methods:
new(config)- Create producer with Redis configurationwith_error_strategy(strategy)- Set error handling strategywith_name(name)- Set component nameproduce()- Generate stream from Redis Streams messages
Β§RedisConsumer
Produces messages to Redis Streams from streams:
pub struct RedisConsumer<T> {
pub config: ConsumerConfig<T>,
pub redis_config: RedisProducerConfig,
}Key Methods:
new(config)- Create consumer with Redis configurationwith_error_strategy(strategy)- Set error handling strategywith_name(name)- Set component nameconsume(stream)- Send stream items to Redis Stream
Β§RedisMessage
Represents a message received from Redis Streams:
pub struct RedisMessage {
pub stream: String,
pub id: String,
pub fields: HashMap<String, String>,
}Β§π Usage Examples
Β§Consumer Group Setup
Configure consumer groups for distributed processing:
use streamweave_redis::producers::{RedisProducer, RedisConsumerConfig};
let config = RedisConsumerConfig::default()
.with_connection_url("redis://localhost:6379")
.with_stream("events")
.with_group("my-consumer-group")
.with_consumer("consumer-1")
.with_start_id(">") // Read new messages only
.with_block_ms(5000)
.with_count(100)
.with_auto_ack(true);
let producer = RedisProducer::new(config);Β§Reading from Beginning
Read all messages from the beginning of a stream:
use streamweave_redis::producers::{RedisProducer, RedisConsumerConfig};
let config = RedisConsumerConfig::default()
.with_connection_url("redis://localhost:6379")
.with_stream("events")
.with_start_id("0") // Start from beginning
.with_block_ms(1000);
let producer = RedisProducer::new(config);Β§Reading New Messages Only
Read only new messages (after current position):
use streamweave_redis::producers::{RedisProducer, RedisConsumerConfig};
let config = RedisConsumerConfig::default()
.with_connection_url("redis://localhost:6379")
.with_stream("events")
.with_start_id("$") // Read only new messages
.with_block_ms(5000);
let producer = RedisProducer::new(config);Β§Stream Length Management
Limit stream length to prevent unbounded growth:
use streamweave_redis::consumers::{RedisConsumer, RedisProducerConfig};
let config = RedisProducerConfig::default()
.with_connection_url("redis://localhost:6379")
.with_stream("events")
.with_maxlen(10000) // Keep only last 10000 messages
.with_approximate_maxlen(true); // More efficient for large streams
let consumer = RedisConsumer::<Event>::new(config);Β§Message Acknowledgment
Configure automatic message acknowledgment:
use streamweave_redis::producers::{RedisProducer, RedisConsumerConfig};
let config = RedisConsumerConfig::default()
.with_connection_url("redis://localhost:6379")
.with_stream("events")
.with_group("my-group")
.with_consumer("consumer-1")
.with_auto_ack(true); // Automatically acknowledge messages
let producer = RedisProducer::new(config);Β§Error Handling
Configure error handling strategies:
use streamweave_redis::producers::{RedisProducer, RedisConsumerConfig};
use streamweave_error::ErrorStrategy;
let config = RedisConsumerConfig::default()
.with_connection_url("redis://localhost:6379")
.with_stream("events");
let producer = RedisProducer::new(config)
.with_error_strategy(ErrorStrategy::Retry(5)); // Retry up to 5 timesΒ§ποΈ Architecture
Redis Streams integration flow:
ββββββββββββ
β Redis ββββ> RedisProducer βββ> Stream<RedisMessage> βββ> Transformer βββ> Stream<T> βββ> RedisConsumer βββ> Redis
β Streams β β Streams β
ββββββββββββ ββββββββββββRedis Streams Flow:
- RedisProducer uses XREAD/XREADGROUP to consume messages
- RedisMessage items flow through transformers
- RedisConsumer serializes and uses XADD to send items to stream
- Consumer groups manage message distribution
- Message acknowledgment tracks processing status
Β§π§ Configuration
Β§Consumer Configuration (RedisConsumerConfig)
- connection_url: Redis connection URL (e.g., βredis://localhost:6379β)
- stream: Stream name to consume from
- group: Consumer group name (optional, enables consumer groups)
- consumer: Consumer name (required if using consumer groups)
- start_id: Starting ID (β0β for beginning, β$β for new messages, β>β for consumer groups)
- block_ms: Block time in milliseconds (0 for non-blocking)
- count: Number of messages to read per call
- auto_ack: Whether to automatically acknowledge messages
Β§Producer Configuration (RedisProducerConfig)
- connection_url: Redis connection URL
- stream: Stream name to produce to
- maxlen: Maximum length of stream (None for no limit)
- approximate_maxlen: Use approximate maxlen (more efficient)
Β§π Error Handling
Redis errors are handled through the error system:
use streamweave_error::ErrorStrategy;
let producer = RedisProducer::new(config)
.with_error_strategy(ErrorStrategy::Skip); // Skip errors and continue
let consumer = RedisConsumer::<Event>::new(consumer_config)
.with_error_strategy(ErrorStrategy::Retry(3)); // Retry up to 3 timesΒ§β‘ Performance Considerations
- Stream Length: Use maxlen to prevent unbounded growth
- Approximate Maxlen: Use approximate_maxlen for better performance
- Blocking Reads: Use block_ms for efficient polling
- Batch Reads: Use count to read multiple messages at once
- Consumer Groups: Use consumer groups for parallel processing
Β§π Examples
For more examples, see:
Β§π Dependencies
streamweave-redis depends on:
streamweave- Core traitsstreamweave-error- Error handlingstreamweave-message(optional) - Message envelope supportredis- Redis client librarytokio- Async runtimefutures- Stream utilitiesserde- Serialization supportasync-stream- Stream utilities
Β§π― Use Cases
Redis Streams integration is used for:
- Event Streaming: Stream events from Redis Streams
- Message Queues: Use Redis Streams as message queues
- Real-Time Processing: Process Redis Streams messages in real-time
- Consumer Groups: Distribute processing across consumers
- Event Sourcing: Implement event sourcing patterns
Β§π Documentation
Β§π See Also
- streamweave - Core traits
- streamweave-error - Error handling
- streamweave-message - Message envelopes
- streamweave-offset - Offset management
Β§π€ Contributing
Contributions are welcome! Please see the Contributing Guide for details.
Β§π License
This project is licensed under the CC BY-SA 4.0 license.
Re-exportsΒ§
pub use consumers::RedisProducerConfig;pub use producers::RedisConsumerConfig;