Platform Source
A Redis Streams-based source plugin for Drasi that consumes CloudEvent-wrapped change events from the Drasi platform infrastructure.
Overview
The Platform Source provides integration between external Drasi platform sources and drasi-lib's continuous query engine. It consumes events from Redis Streams using consumer groups, transforming platform SDK event formats into drasi-core's SourceChange format for processing by continuous queries.
Key Capabilities
- Redis Streams Integration: Consumes events from Redis Streams using consumer groups for reliable delivery
- CloudEvent Support: Parses CloudEvent-wrapped messages containing node and relation changes
- At-Least-Once Delivery: Leverages Redis consumer groups with acknowledgments to ensure reliable event processing
- Horizontal Scaling: Multiple consumers can share the workload within the same consumer group
- Automatic Reconnection: Handles Redis connection failures with exponential backoff and retry logic
- Control Events: Supports control messages for query subscription coordination
- Profiling Support: Captures and propagates timing metadata for end-to-end performance analysis
Use Cases
- Platform Integration: Connect external Drasi platform sources (PostgreSQL, MongoDB, etc.) to drasi-lib queries
- Distributed Processing: Scale event processing across multiple consumer instances
- Event Replay: Replay historical events from Redis Streams for query initialization
- Cross-Service Communication: Enable event-driven communication between Drasi components
Architecture
External Drasi Platform Source
↓
Redis Stream (CloudEvents)
↓
Platform Source (Consumer Group)
↓
Event Transformation
↓
drasi-lib Queries
The platform source acts as a bridge between external Drasi platform sources and drasi-lib queries:
- Reads CloudEvent-wrapped messages from a Redis Stream
- Extracts and transforms events to `SourceChange` format
- Publishes changes to subscribed queries via internal channels
- Acknowledges processed events for reliable delivery
Consumer Groups
Consumer groups enable coordinated consumption across multiple instances:
- Load Balancing: Messages distributed among consumers in the group
- Single Delivery Per Group: Each message is delivered to one consumer in the group (redelivered only if unacknowledged)
- Failure Recovery: Unacknowledged messages can be claimed by other consumers
- Position Tracking: Group maintains last processed position in the stream
Configuration
Builder Pattern (Preferred)
The builder pattern provides a type-safe, fluent API for constructing platform sources:
```rust
// Import paths and argument values are illustrative; see the
// configuration table below for option semantics and defaults.
use drasi_source_platform::{DispatchMode, PlatformSource};

let source = PlatformSource::builder("platform_source")
    .with_redis_url("redis://localhost:6379")
    .with_stream_key("sensor-changes")
    .with_consumer_group("drasi-consumers")
    .with_consumer_name("consumer-1")
    .with_batch_size(100)
    .with_block_ms(5000)
    .with_dispatch_mode(DispatchMode::Channel)
    .with_dispatch_buffer_capacity(1000)
    .with_auto_start(true)
    .build()?;

drasi_lib.add_source(source).await?;
```
Config Struct Approach
For programmatic configuration or deserialization from files:
```rust
use drasi_source_platform::{PlatformSource, PlatformSourceConfig};

// Field names follow the configuration table; exact struct shape may differ.
let config = PlatformSourceConfig {
    redis_url: "redis://localhost:6379".to_string(),
    stream_key: "sensor-changes".to_string(),
    consumer_group: "drasi-consumers".to_string(),
    ..Default::default()
};

let source = PlatformSource::new(config)?;
drasi_lib.add_source(source).await?;
```
YAML Configuration
For declarative configuration in YAML files:
```yaml
sources:
  - id: platform_source
    source_type: platform
    auto_start: true
    dispatch_mode: channel  # or "broadcast"
    dispatch_buffer_capacity: 1500
    properties:
      redis_url: "redis://localhost:6379"
      stream_key: "sensor-changes"
      consumer_group: "drasi-consumers"
      consumer_name: "consumer-1"
      batch_size: 50
      block_ms: 10000
```
Configuration Options
| Name | Type | Description | Valid Values | Default |
|---|---|---|---|---|
| `redis_url` | String | Redis connection URL (standard `redis://` format) | Valid Redis URL | Required |
| `stream_key` | String | Redis stream key to consume events from | Any valid stream key | Required |
| `consumer_group` | String | Consumer group name for coordinated consumption | Any identifier | `"drasi-core"` |
| `consumer_name` | `Option<String>` | Unique consumer name within the group | Any unique ID | Auto-generated from source ID |
| `batch_size` | usize | Number of events to read per XREADGROUP call | 1-10000 (recommended) | 100 |
| `block_ms` | u64 | Milliseconds to block waiting for new events | 100-60000 (recommended) | 5000 |
| `dispatch_mode` | DispatchMode | Event dispatch strategy | `Channel`, `Broadcast` | `Channel` |
| `dispatch_buffer_capacity` | usize | Buffer size for dispatch channels | Any positive integer | 1000 |
| `auto_start` | bool | Whether to start automatically when added to DrasiLib | `true`, `false` | `true` |
Configuration Details
Redis URL Formats
Standard Redis connection string formats are supported:
- `redis://localhost:6379` - Local Redis without authentication
- `redis://:password@host:6379` - Redis with password authentication
- `redis://user:password@host:6379` - Redis with username and password
- `rediss://host:6379` - Redis with TLS encryption
Consumer Name
The consumer name should be unique within a consumer group:
- Kubernetes: Use `${HOSTNAME}` or the pod name for automatic uniqueness
- Docker: Use the container ID or hostname
- Local Development: Can be omitted (auto-generated from source ID)
Batch Size
Controls throughput vs. latency tradeoff:
- Higher values (100-500): Better throughput, higher memory usage
- Lower values (10-50): Lower latency, more frequent Redis calls
- Recommended: Start with 100, tune based on event rate
Block Timeout
Controls responsiveness vs. CPU usage:
- Higher values (10000-30000ms): Lower CPU, higher shutdown latency
- Lower values (1000-5000ms): More responsive, higher CPU usage
- Recommended: 5000ms for balanced performance
Dispatch Mode
- Channel (default): Each subscriber gets isolated channel with backpressure and zero message loss
- Broadcast: Shared channel with no backpressure, possible message loss under heavy load
Input Schema
CloudEvent Wrapper
All events are wrapped in CloudEvent format with the following structure:
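The sketch below illustrates this envelope; the `type`, `source`, and `id` attribute values are placeholders set by the publishing platform source, and the `data` array carries the change events described in the next section.

```json
{
  "specversion": "1.0",
  "type": "drasi.change-event",
  "source": "my-platform-source",
  "id": "event-001",
  "datacontenttype": "application/json",
  "data": [
    {
      "op": "i",
      "payload": {
        "after": {
          "id": "sensor-1",
          "labels": ["Sensor"],
          "properties": { "temperature": 22.5 }
        },
        "source": { "db": "sensors", "table": "node", "ts_ns": 1699900000000000000 }
      }
    }
  ]
}
```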
Data Change Events
The data array contains one or more change events. Each event has:
- `op`: Operation type (`"i"` = insert, `"u"` = update, `"d"` = delete)
- `payload`: Event payload with element data and metadata
- reactivatorStart_ns (optional): Upstream processing start timestamp
- reactivatorEnd_ns (optional): Upstream processing end timestamp
Node Insert
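A minimal node-insert event consistent with the field descriptions below (database, labels, and property values are illustrative):

```json
{
  "op": "i",
  "payload": {
    "after": {
      "id": "sensor-1",
      "labels": ["Sensor"],
      "properties": { "temperature": 22.5, "location": "room-1" }
    },
    "source": { "db": "sensors", "table": "node", "ts_ns": 1699900000000000000 }
  }
}
```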
Node Update
Node Delete
Relation Insert
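A minimal relation-insert event consistent with the field descriptions below; `startId` and `endId` reference the outgoing and incoming nodes (values are illustrative):

```json
{
  "op": "i",
  "payload": {
    "after": {
      "id": "rel-1",
      "labels": ["LOCATED_IN"],
      "properties": {},
      "startId": "sensor-1",
      "endId": "room-1"
    },
    "source": { "db": "sensors", "table": "rel", "ts_ns": 1699900000000000000 }
  }
}
```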
Relation Update
Relation Delete
Field Descriptions
| Field | Type | Required | Description |
|---|---|---|---|
| `op` | String | Yes | Operation: `"i"` (insert), `"u"` (update), `"d"` (delete) |
| `payload.after` | Object | Yes (for i/u) | Element state after change |
| `payload.before` | Object | Yes (for d) | Element state before deletion |
| `payload.source.db` | String | Yes | Database name (use `"Drasi"` for control events) |
| `payload.source.table` | String | Yes | Element type: `"node"`, `"rel"`, or `"relation"` |
| `payload.source.ts_ns` | u64 | Yes | Timestamp in nanoseconds (used as `effective_from`) |
| `id` | String | Yes | Unique element identifier |
| `labels` | `Array<String>` | Yes | Element labels (at least one required) |
| `properties` | Object | Yes | Element properties (can be empty) |
| `startId` | String | Yes (relations) | Outgoing node ID for relations |
| `endId` | String | Yes (relations) | Incoming node ID for relations |
Control Events
Control events coordinate query subscriptions and are identified by payload.source.db = "Drasi" (case-insensitive).
SourceSubscription Control Event
| Field | Type | Required | Description |
|---|---|---|---|
| `queryId` | String | Yes | Unique query identifier |
| `queryNodeId` | String | Yes | Query node identifier |
| `nodeLabels` | `Array<String>` | No | Node labels the query is interested in (defaults to empty) |
| `relLabels` | `Array<String>` | No | Relation labels the query is interested in (defaults to empty) |
Operations:
"i": Insert subscription (query subscribes to source)"u": Update subscription (query changes label filters)"d": Delete subscription (query unsubscribes from source)
Behavior:
- Unknown control types are skipped with an info-level log
- Missing required fields cause event to be skipped with warning
- Missing optional fields default to empty arrays
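As a sketch, a subscription-insert control event might look like the following. Only `payload.source.db = "Drasi"` is specified above; the `table` value and the placement of the subscription fields under `payload.after` are assumptions:

```json
{
  "op": "i",
  "payload": {
    "after": {
      "queryId": "sensor-query",
      "queryNodeId": "query-node-1",
      "nodeLabels": ["Sensor"],
      "relLabels": []
    },
    "source": { "db": "Drasi", "table": "SourceSubscription", "ts_ns": 1699900000000000000 }
  }
}
```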
Event Transformation Mapping
| Platform Event | drasi-core SourceChange |
|---|---|
| `op: "i"` | `SourceChange::Insert` |
| `op: "u"` | `SourceChange::Update` |
| `op: "d"` | `SourceChange::Delete` |
| `payload.source.table: "node"` | `Element::Node` |
| `payload.source.table: "rel"` or `"relation"` | `Element::Relation` |
| `startId` | `out_node` (ElementReference) |
| `endId` | `in_node` (ElementReference) |
| `payload.source.ts_ns` | `effective_from` (nanoseconds) |
Property Types
All JSON property types are supported and converted to `ElementValue`: nulls, booleans, integers, floats, strings, arrays, and nested objects.
Usage Examples
Basic Usage with Builder
```rust
// Import paths and argument values are illustrative.
use drasi_source_platform::PlatformSource;
use std::sync::Arc;

// Create platform source
let source = PlatformSource::builder("platform_source")
    .with_redis_url("redis://localhost:6379")
    .with_stream_key("sensor-changes")
    .with_consumer_group("drasi-consumers")
    .with_batch_size(100)
    .build()?;

// Add to drasi-lib
drasi_lib.add_source(Arc::new(source)).await?;
```
Kubernetes Deployment
```rust
// The Redis service hostname and fallback name are illustrative.
use drasi_source_platform::PlatformSource;
use std::env;

// Use hostname for unique consumer name
let consumer_name = env::var("HOSTNAME")
    .unwrap_or_else(|_| "consumer-local".to_string());

let source = PlatformSource::builder("platform_source")
    .with_redis_url("redis://redis-service:6379")
    .with_stream_key("sensor-changes")
    .with_consumer_group("drasi-consumers")
    .with_consumer_name(consumer_name)
    .with_batch_size(100)
    .build()?;

drasi_lib.add_source(source).await?;
```
With Bootstrap Provider
```rust
// Import paths are illustrative.
use drasi_source_platform::PlatformSource;
use drasi_lib::InMemoryBootstrapProvider;

let bootstrap_provider = InMemoryBootstrapProvider::new();

let source = PlatformSource::builder("platform_source")
    .with_redis_url("redis://localhost:6379")
    .with_stream_key("sensor-changes")
    .with_bootstrap_provider(bootstrap_provider)
    .build()?;

drasi_lib.add_source(source).await?;
```
Custom Dispatch Settings
```rust
// Import paths and argument values are illustrative.
use drasi_source_platform::{DispatchMode, PlatformSource};

let source = PlatformSource::builder("platform_source")
    .with_redis_url("redis://localhost:6379")
    .with_stream_key("sensor-changes")
    .with_dispatch_mode(DispatchMode::Broadcast)
    .with_dispatch_buffer_capacity(2000)
    .build()?;

drasi_lib.add_source(source).await?;
```
Publishing Events to Redis
From external sources, publish events using XADD:
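A hedged example using redis-cli; the stream field name (`data`) and the event shape are assumptions, so match whatever envelope your consumer configuration expects:

```shell
# Append one CloudEvent-wrapped change to the stream; '*' auto-generates the stream ID
redis-cli XADD sensor-changes '*' data '{"data":[{"op":"i","payload":{"after":{"id":"sensor-1","labels":["Sensor"],"properties":{"temperature":22.5}},"source":{"db":"sensors","table":"node","ts_ns":1699900000000000000}}}]}'
```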
Batch Processing Multiple Events
The data array can contain multiple events for batch processing:
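For instance, a single CloudEvent payload whose `data` array carries two changes (values are illustrative); both events are transformed and dispatched before the message is acknowledged:

```json
{
  "data": [
    {
      "op": "i",
      "payload": {
        "after": { "id": "sensor-1", "labels": ["Sensor"], "properties": { "temperature": 22.5 } },
        "source": { "db": "sensors", "table": "node", "ts_ns": 1699900000000000000 }
      }
    },
    {
      "op": "u",
      "payload": {
        "after": { "id": "sensor-2", "labels": ["Sensor"], "properties": { "temperature": 23.1 } },
        "source": { "db": "sensors", "table": "node", "ts_ns": 1699900000000000001 }
      }
    }
  ]
}
```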
Testing with Test Subscription
```rust
// Sketch only: the exact test-subscription API depends on drasi-lib's
// testing utilities.
use drasi_source_platform::PlatformSource;

#[tokio::test]
async fn platform_source_builds_and_starts() {
    let source = PlatformSource::builder("test_source")
        .with_redis_url("redis://localhost:6379")
        .with_stream_key("test-stream")
        .with_consumer_group("test-group")
        .build()
        .expect("valid configuration");

    // Start the source, publish a test event with XADD, and assert that the
    // subscribed query receives the corresponding SourceChange.
}
```
Lifecycle Management
Start
When start() is called:
- Connect to Redis with exponential backoff retry (default: 5 attempts)
- Create consumer group if it doesn't exist (using XGROUP CREATE with MKSTREAM)
- Spawn async task to consume stream
- Update status to `Running`
- Begin reading events using XREADGROUP
Running
During normal operation:
- Read events in batches using XREADGROUP with `>` (new messages)
- Extract and parse the CloudEvent wrapper
- Detect message type (data vs. control)
- Transform events to `SourceChange` format
- Dispatch to subscribed queries via channels
- Batch acknowledge all processed messages with XACK
- Handle errors and connection failures
- Continue loop until stopped
Stop
When stop() is called:
- Cancel consumer task (abort tokio task)
- Close Redis connection
- Update status to `Stopped`
Note: Consumer group position is preserved in Redis. Restarting will resume from last acknowledged position.
Error Handling
Connection Errors
- Initial connection failure: Retry with exponential backoff (1s, 2s, 4s, 8s, 16s)
- Connection lost during operation: Auto-reconnect with same retry logic
- Redis unavailable: Emit ComponentEvent, keep retrying in background
- Network issues: Automatic recovery when connection restored
Event Processing Errors
- Malformed JSON: Log warning, acknowledge event to skip, continue processing
- Invalid event format: Log error with details, acknowledge to avoid reprocessing
- Missing required fields: Log error, acknowledge event, continue
- Transformation errors: Send ComponentEvent, acknowledge event, continue
Stream Errors
- Consumer group already exists: Ignore BUSYGROUP error, continue normally
- Stream doesn't exist: Created automatically via MKSTREAM flag
- Read timeout: Normal operation when no events available, continue loop
- Acknowledgment failure: Fallback to individual acknowledgments
Error Recovery Strategy
- Transient errors: Retry with exponential backoff
- Event errors: Skip event with logging to prevent blocking
- Connection errors: Reconnect and resume from last position
- Fatal errors: Stop source and emit failure status
Performance Considerations
Throughput
- Target: >10,000 events/second per consumer
- Batch size: Higher values (100-500) improve throughput
- Horizontal scaling: Add more consumers in same group
- Network latency: Co-locate with Redis for best performance
Latency
- Target: p99 < 10ms for transformation
- Block timeout: Lower values reduce latency but increase CPU
- Batch processing: All events in batch processed before acknowledgment
- Channel dispatch: Async channels provide non-blocking delivery
Memory Usage
- Event batching: Controlled by the `batch_size` parameter
- Dispatch buffers: Controlled by `dispatch_buffer_capacity`
- Stable under load: Minimal allocations in hot path
- Acknowledgment batching: All stream IDs acknowledged in single XACK
Tuning Recommendations
The values below are starting points derived from the guidance above; tune them for your workload.

High Throughput:
```rust
.with_batch_size(500)
.with_block_ms(10000)
.with_dispatch_buffer_capacity(5000)
```
Low Latency:
```rust
.with_batch_size(10)
.with_block_ms(1000)
.with_dispatch_buffer_capacity(500)
```
Balanced:
```rust
.with_batch_size(100)
.with_block_ms(5000)
.with_dispatch_buffer_capacity(1000)
```
Troubleshooting
Connection Issues
Symptom: "Failed to connect to Redis" errors
Solutions:
- Verify `redis_url` is correct and accessible
- Check network connectivity and firewall rules
- Ensure Redis is running: `redis-cli ping`
- Check Redis logs for connection errors
Consumer Group Conflicts
Symptom: BUSYGROUP error or duplicate processing
Solutions:
- Ensure `consumer_name` is unique per instance
- In Kubernetes: use `${HOSTNAME}` or the pod name
- Check existing consumers: `redis-cli XINFO CONSUMERS stream_key group_name`
- Remove zombie consumers: `redis-cli XGROUP DELCONSUMER stream_key group_name consumer_name`
Missing Events
Symptom: Events not appearing in queries
Solutions:
- Verify `stream_key` matches the external source's stream
- Check the consumer group position: `redis-cli XINFO GROUPS stream_key`
- Ensure events are being written: `redis-cli XLEN stream_key`
- Review logs for transformation errors
- Verify query subscriptions are active
Event Replay
Symptom: Need to reprocess all events from stream beginning
Solutions:
- Delete the consumer group: `redis-cli XGROUP DESTROY stream_key group_name`
- Recreate with a start position: `redis-cli XGROUP CREATE stream_key group_name 0 MKSTREAM`
- Or use configuration (advanced): set `always_create_consumer_group: true` and `start_id: "0"` in the internal config
Warning: Deleting consumer group affects all consumers in the group.
Performance Issues
Symptom: High latency or low throughput
Solutions:
- Increase `batch_size` for better throughput (100-500)
- Reduce `block_ms` for lower latency (1000-3000)
- Add more consumers in the same group for horizontal scaling
- Check Redis performance: `redis-cli INFO stats`
- Monitor consumer lag: `redis-cli XINFO GROUPS stream_key`
- Check network latency between source and Redis
Event Format Errors
Symptom: "Transformation error" or "Failed to parse JSON" in logs
Solutions:
- Verify external source produces correct CloudEvent format
- Check required fields: `op`, `payload`, `source`, `table`, `ts_ns`
- For nodes: Ensure `id`, `labels`, `properties` are present
- For relations: Ensure `startId` and `endId` are present
- Review an event in Redis: `redis-cli XRANGE stream_key - + COUNT 1`
- Validate the JSON format with an external tool
Memory Issues
Symptom: High memory usage or OOM errors
Solutions:
- Reduce `batch_size` to process fewer events at once
- Reduce `dispatch_buffer_capacity` to limit buffering
- Check for query backpressure (slow query processing)
- Monitor memory usage: `ps aux | grep drasi`
- Review Redis memory usage: `redis-cli INFO memory`
Monitoring
Key Metrics
Consumer Group Metrics:
```shell
redis-cli XINFO GROUPS stream_key
# Check 'lag' field - number of unprocessed messages
# Check 'pending' field - messages delivered but not acknowledged
```
Consumer Metrics:
```shell
redis-cli XINFO CONSUMERS stream_key group_name
# Check 'pending' field per consumer
# Check 'idle' field - time since last activity
```
Stream Metrics:
```shell
redis-cli XLEN stream_key
redis-cli XINFO STREAM stream_key
```
Health Checks
- Source Status: Check that `status()` returns `ComponentStatus::Running`
- Redis Connectivity: Verify no connection errors in recent logs
- Event Flow: Confirm events flowing to queries via ComponentEvents
- Consumer Lag: Monitor consumer group lag stays within acceptable bounds
- Error Rate: Track transformation errors and failed acknowledgments
Logging
The platform source uses structured logging:
- info: Normal operations (start, stop, connection, consumer group creation)
- warn: Non-fatal issues (transformation errors, unknown control types)
- error: Fatal issues (connection failures, acknowledgment failures)
- debug: Detailed event processing (individual events, acknowledgments)
Enable debug logging for troubleshooting:
```shell
RUST_LOG=drasi_source_platform=debug
```
Redis Streams Reference
Consumer Group Commands
Create consumer group:
```shell
# 0 = start from beginning
# $ = start from end
redis-cli XGROUP CREATE stream_key group_name 0 MKSTREAM
```
Read events:
```shell
# > = only new, undelivered messages (what the source uses internally)
redis-cli XREADGROUP GROUP group_name consumer_name COUNT 100 BLOCK 5000 STREAMS stream_key >
```
Acknowledge events:
```shell
redis-cli XACK stream_key group_name 1699900000000-0
```
View pending messages:
```shell
redis-cli XPENDING stream_key group_name
```
Delete consumer:
```shell
redis-cli XGROUP DELCONSUMER stream_key group_name consumer_name
```
Delete consumer group:
```shell
redis-cli XGROUP DESTROY stream_key group_name
```
Stream IDs
Redis stream IDs have the format `{timestamp_ms}-{sequence}`:
- `0`: Start of stream (all historical events)
- `$`: Current latest position (skip existing events)
- `>`: Only new undelivered messages (default for XREADGROUP)
- Specific ID: Resume from a specific position (e.g., `1699900000000-0`)
Integration Examples
With DrasiLib
```rust
// Import paths and builder names are illustrative.
use drasi_lib::{DrasiLib, Query};
use drasi_source_platform::PlatformSource;
use std::sync::Arc;

// Create drasi-lib instance
let mut drasi = DrasiLib::new();

// Add platform source
let source = PlatformSource::builder("platform_source")
    .with_redis_url("redis://localhost:6379")
    .with_stream_key("sensor-changes")
    .build()?;
drasi.add_source(Arc::new(source)).await?;

// Add query
let query = Query::cypher("sensor_query")
    .query("MATCH (s:Sensor) RETURN s.id, s.temperature")
    .from_source("platform_source")
    .build();
drasi.add_query(query).await?;

// Start all components
drasi.start().await?;
```
With DrasiServer
```rust
// Import paths, builder calls, and argument values are illustrative.
use drasi_server::DrasiServerBuilder;
use drasi_source_platform::PlatformSource;
use std::sync::Arc;

let source = PlatformSource::builder("platform_source")
    .with_redis_url("redis://localhost:6379")
    .with_stream_key("sensor-changes")
    .build()?;

let server = DrasiServerBuilder::new()
    .with_id("drasi-server-1")
    .with_host_port("0.0.0.0", 8080)
    .with_source(Arc::new(source))
    .build()
    .await?;

server.run().await?;
```
Advanced Features
Profiling Support
The platform source captures and propagates timing metadata:
- `source_ns`: Original event timestamp from `payload.source.ts_ns`
- `reactivator_start_ns`: Upstream reactivator start time (from event)
- reactivator_end_ns: Upstream reactivator end time (from event)
- source_send_ns: Platform source dispatch timestamp
This enables end-to-end latency analysis across the entire pipeline.
Multiple Events Per CloudEvent
The platform source efficiently processes batches of events: all events in the `data` array of a single CloudEvent are transformed and dispatched sequentially.
Consumer Group Persistence
Consumer group position is persisted in Redis:
- Position maintained across restarts
- Each message is delivered to a single consumer in the group (at-least-once processing overall)
- Supports consumer failover and recovery
- No data loss on graceful shutdown
Dispatch Modes
Channel Mode (default):
- Each subscriber gets isolated channel
- Backpressure when subscriber is slow
- Zero message loss
- Higher memory usage
Broadcast Mode:
- Shared channel for all subscribers
- No backpressure (slow subscribers drop messages)
- Lower memory usage
- Possible message loss under load
See Also
- drasi-lib Documentation - Core library documentation
- Drasi Platform SDK - External platform sources
- Redis Streams Documentation - Redis Streams reference
- CloudEvents Specification - CloudEvent format details