Platform Source Plugin for Drasi
This plugin consumes data change events from Redis Streams, which is the primary integration point for the Drasi platform. It supports CloudEvent-wrapped messages containing node and relation changes, as well as control events for query subscriptions.
§Architecture
The platform source connects to Redis as a consumer group member, enabling:
- At-least-once delivery: Messages are acknowledged after processing
- Horizontal scaling: Multiple consumers can share the workload
- Fault tolerance: Unacknowledged messages are redelivered
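The acknowledge-after-processing behaviour listed above can be illustrated with a toy, in-memory model. This is only a sketch of the delivery semantics, not the plugin's implementation: `ToyGroup` and its methods are hypothetical names, and no Redis client is involved. The comments point to the Redis commands each step loosely corresponds to.

```rust
use std::collections::{BTreeMap, VecDeque};

/// Toy in-memory stand-in for one stream plus one consumer group.
/// Hypothetical type: not part of the plugin or of any Redis client.
struct ToyGroup {
    /// Messages that have not been handed to any consumer yet.
    undelivered: VecDeque<(u64, String)>,
    /// Delivered but not yet acknowledged (similar in spirit to the
    /// pending entries list Redis keeps per consumer group).
    pending: BTreeMap<u64, String>,
    next_id: u64,
}

impl ToyGroup {
    fn new() -> Self {
        ToyGroup {
            undelivered: VecDeque::new(),
            pending: BTreeMap::new(),
            next_id: 1,
        }
    }

    /// Append a message to the stream (loosely analogous to XADD).
    fn add(&mut self, body: &str) -> u64 {
        let id = self.next_id;
        self.next_id += 1;
        self.undelivered.push_back((id, body.to_string()));
        id
    }

    /// Deliver up to `count` new messages and record them as pending
    /// (loosely analogous to XREADGROUP with a COUNT).
    fn read(&mut self, count: usize) -> Vec<(u64, String)> {
        let mut batch = Vec::new();
        while batch.len() < count {
            match self.undelivered.pop_front() {
                Some((id, body)) => {
                    self.pending.insert(id, body.clone());
                    batch.push((id, body));
                }
                None => break,
            }
        }
        batch
    }

    /// Acknowledge a processed message (loosely analogous to XACK).
    /// Returns false if the id was not pending.
    fn ack(&mut self, id: u64) -> bool {
        self.pending.remove(&id).is_some()
    }

    /// Messages that were delivered but never acknowledged, and are
    /// therefore candidates for redelivery.
    fn redeliver(&self) -> Vec<(u64, String)> {
        self.pending
            .iter()
            .map(|(id, body)| (*id, body.clone()))
            .collect()
    }
}

fn main() {
    let mut group = ToyGroup::new();
    let first = group.add("insert user-123");
    let second = group.add("update user-123");

    let batch = group.read(100);
    assert_eq!(batch.len(), 2);

    // Only the first message finishes processing before a simulated crash.
    group.ack(first);

    // After a restart, the unacknowledged message is still pending.
    let retry = group.redeliver();
    assert_eq!(retry, vec![(second, "update user-123".to_string())]);
}
```

Because a message leaves the pending set only on acknowledgement, a crash between processing and acknowledging results in the same message being seen again, so downstream handling should tolerate duplicates.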
§Configuration
| Field | Type | Default | Description |
|---|---|---|---|
| redis_url | string | required | Redis connection URL (e.g., redis://localhost:6379) |
| stream_key | string | required | Redis stream key to consume from |
| consumer_group | string | "drasi-core" | Consumer group name |
| consumer_name | string | auto-generated | Unique consumer name within the group |
| batch_size | usize | 100 | Number of events to read per XREADGROUP call |
| block_ms | u64 | 5000 | Milliseconds to block waiting for events |
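To make the defaults in the table concrete, here is a minimal sketch of a struct that carries them. `SketchConfig` is a hypothetical stand-in: the real type is `PlatformSourceConfig`, whose field layout is not shown in this page. The process-id-based consumer name is also an assumption, since the plugin's auto-generation scheme is not documented here.

```rust
/// Hypothetical mirror of the configuration table above.
/// The plugin's real type is `PlatformSourceConfig`.
#[derive(Debug, Clone, PartialEq)]
struct SketchConfig {
    redis_url: String,
    stream_key: String,
    consumer_group: String,
    consumer_name: String,
    batch_size: usize,
    block_ms: u64,
}

impl SketchConfig {
    /// The two required fields are constructor arguments; everything
    /// else starts at the default listed in the table.
    fn new(redis_url: &str, stream_key: &str) -> Self {
        SketchConfig {
            redis_url: redis_url.to_string(),
            stream_key: stream_key.to_string(),
            consumer_group: "drasi-core".to_string(),
            // Assumption: the real naming scheme is unknown, so this
            // sketch derives a name from the process id.
            consumer_name: format!("consumer-{}", std::process::id()),
            batch_size: 100,
            block_ms: 5000,
        }
    }
}

fn main() {
    // Override one default using struct update syntax.
    let cfg = SketchConfig {
        batch_size: 50,
        ..SketchConfig::new("redis://localhost:6379", "my-app-changes")
    };
    println!("{:?}", cfg);
}
```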
§Data Format
The platform source expects CloudEvent-wrapped messages with a data array
containing change events. Each event includes an operation type and payload.
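As a rough illustration of how the `op` and `source.table` fields might be interpreted, the sketch below maps them onto Rust enums. The mapping is inferred only from the samples in this section ("i", "u", "d" and "node", "rel"); the type and function names are hypothetical, and the plugin may accept other values or carry additional fields.

```rust
/// Operation type, inferred from the "op" values in the samples.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Op {
    Insert,
    Update,
    Delete,
}

/// Element kind, inferred from the "source.table" values in the samples.
#[derive(Debug, Clone, Copy, PartialEq)]
enum ElementKind {
    Node,
    Relation,
}

/// Map the "op" string to an operation; unknown codes yield None.
fn parse_op(op: &str) -> Option<Op> {
    match op {
        "i" => Some(Op::Insert),
        "u" => Some(Op::Update),
        "d" => Some(Op::Delete),
        _ => None,
    }
}

/// Map the "source.table" string to an element kind.
fn parse_kind(table: &str) -> Option<ElementKind> {
    match table {
        "node" => Some(ElementKind::Node),
        "rel" => Some(ElementKind::Relation),
        _ => None,
    }
}

/// Name of the payload field that holds the element in the samples:
/// inserts and updates use "after", deletes use "before".
fn element_field(op: Op) -> &'static str {
    match op {
        Op::Insert | Op::Update => "after",
        Op::Delete => "before",
    }
}

fn main() {
    let op = parse_op("d").expect("known op code");
    let kind = parse_kind("node").expect("known table");
    println!("{:?} {:?}, element in payload.{}", op, kind, element_field(op));
}
```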
§Node Insert
{
  "data": [{
    "op": "i",
    "payload": {
      "after": {
        "id": "user-123",
        "labels": ["User"],
        "properties": {
          "name": "Alice",
          "email": "alice@example.com"
        }
      },
      "source": {
        "db": "mydb",
        "table": "node",
        "ts_ns": 1699900000000000000
      }
    }
  }]
}
§Node Update
{
  "data": [{
    "op": "u",
    "payload": {
      "after": {
        "id": "user-123",
        "labels": ["User"],
        "properties": { "name": "Alice Updated" }
      },
      "source": { "table": "node", "ts_ns": 1699900001000000000 }
    }
  }]
}
§Node Delete
{
  "data": [{
    "op": "d",
    "payload": {
      "before": {
        "id": "user-123",
        "labels": ["User"],
        "properties": {}
      },
      "source": { "table": "node", "ts_ns": 1699900002000000000 }
    }
  }]
}
§Relation Insert
{
  "data": [{
    "op": "i",
    "payload": {
      "after": {
        "id": "follows-1",
        "labels": ["FOLLOWS"],
        "startId": "user-123",
        "endId": "user-456",
        "properties": { "since": "2024-01-01" }
      },
      "source": { "table": "rel", "ts_ns": 1699900003000000000 }
    }
  }]
}
§Control Events
Control events are identified by payload.source.db = "Drasi" (case-insensitive).
Currently supported control types:
- SourceSubscription: Query subscription management
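The detection rule above can be sketched as a single comparison. This is an illustrative helper, not the plugin's code: `is_control_event` is a hypothetical name, and treating "case-insensitive" as an ASCII-only comparison is an assumption. The argument is optional because some of the sample events omit `source.db`.

```rust
/// Returns true when `payload.source.db` is present and equals "Drasi",
/// ignoring ASCII case. A missing `db` field is treated as a data event.
fn is_control_event(source_db: Option<&str>) -> bool {
    source_db.map_or(false, |db| db.eq_ignore_ascii_case("Drasi"))
}

fn main() {
    for db in [Some("Drasi"), Some("drasi"), Some("mydb"), None] {
        println!("{:?} -> control event: {}", db, is_control_event(db));
    }
}
```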
§Example Configuration (YAML)
source_type: platform
properties:
  redis_url: "redis://localhost:6379"
  stream_key: "my-app-changes"
  consumer_group: "drasi-consumers"
  batch_size: 50
  block_ms: 10000
§Usage Example
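use drasi_source_platform::{PlatformSource, PlatformSourceBuilder};
use std::sync::Arc;

// Build the source configuration; unset fields keep their defaults.
let config = PlatformSourceBuilder::new()
    .with_redis_url("redis://localhost:6379")
    .with_stream_key("my-changes")
    .with_consumer_group("my-consumers")
    .build();

// `drasi` is assumed to be an existing Drasi instance, and this snippet is
// assumed to run inside an async function that returns a `Result`.
let source = Arc::new(PlatformSource::new("platform-source", config)?);
drasi.add_source(source).await?;
Re-exports§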
pub use config::PlatformSourceConfig;
Modules§
- config
- Configuration types for the platform source plugin.
Structs§
- PlatformSource
- Platform source that reads events from Redis Streams.
- PlatformSourceBuilder
- Builder for creating PlatformSource instances.