Crate drasi_source_platform

Platform Source Plugin for Drasi

This plugin consumes data change events from Redis Streams, the primary integration point for the Drasi platform. It supports CloudEvent-wrapped messages that carry node and relation changes, as well as control events for query subscriptions.

§Architecture

The platform source connects to Redis as a consumer group member, enabling:

  • At-least-once delivery: Messages are acknowledged after processing
  • Horizontal scaling: Multiple consumers can share the workload
  • Fault tolerance: Unacknowledged messages are redelivered
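
The acknowledge-after-processing cycle behind these guarantees can be illustrated with a toy, in-memory model. This is only a sketch: `ToyGroup` and its methods are hypothetical names, it does not talk to Redis, and it is not how the plugin is implemented. It shows why an entry that was read but never acknowledged stays available for redelivery.

```rust
use std::collections::{BTreeMap, VecDeque};

/// Toy stand-in for a stream with a single consumer group (hypothetical,
/// not Redis and not part of drasi_source_platform).
struct ToyGroup {
    next_id: u64,
    /// Entries that no consumer has read yet.
    undelivered: VecDeque<(u64, String)>,
    /// Entries handed to a consumer but not yet acknowledged.
    pending: BTreeMap<u64, String>,
}

impl ToyGroup {
    fn new() -> Self {
        ToyGroup {
            next_id: 1,
            undelivered: VecDeque::new(),
            pending: BTreeMap::new(),
        }
    }

    /// Append an entry and return its id.
    fn add(&mut self, payload: &str) -> u64 {
        let id = self.next_id;
        self.next_id += 1;
        self.undelivered.push_back((id, payload.to_string()));
        id
    }

    /// Hand out up to `count` new entries and record them as pending.
    fn read(&mut self, count: usize) -> Vec<(u64, String)> {
        let mut batch = Vec::new();
        while batch.len() < count {
            match self.undelivered.pop_front() {
                Some((id, payload)) => {
                    self.pending.insert(id, payload.clone());
                    batch.push((id, payload));
                }
                None => break,
            }
        }
        batch
    }

    /// Acknowledge an entry; returns true if it was pending.
    fn ack(&mut self, id: u64) -> bool {
        self.pending.remove(&id).is_some()
    }

    /// Entries that were delivered but never acknowledged, in id order.
    fn redeliver(&self) -> Vec<(u64, String)> {
        self.pending.iter().map(|(id, p)| (*id, p.clone())).collect()
    }
}
```

If a consumer reads two entries and crashes after acknowledging only the first, `redeliver()` still returns the second one, which is the at-least-once behaviour described above.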

§Configuration

Field           Type    Default         Description
redis_url       string  required        Redis connection URL (e.g., redis://localhost:6379)
stream_key      string  required        Redis stream key to consume from
consumer_group  string  "drasi-core"    Consumer group name
consumer_name   string  auto-generated  Unique consumer name within the group
batch_size      usize   100             Number of events to read per XREADGROUP call
block_ms        u64     5000            Milliseconds to block waiting for events
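
To make the required fields and defaults concrete, here is a minimal sketch of a struct that mirrors the table. `SketchConfig` is a hypothetical type written for illustration; the crate's actual `PlatformSourceConfig` may be shaped differently, and the checks in `validate` are assumptions rather than documented behaviour.

```rust
/// Hypothetical mirror of the configuration table; NOT the crate's
/// `PlatformSourceConfig`.
#[derive(Debug, Clone, PartialEq)]
struct SketchConfig {
    redis_url: String,
    stream_key: String,
    consumer_group: String,
    /// `None` stands for "auto-generated" in the table.
    consumer_name: Option<String>,
    batch_size: usize,
    block_ms: u64,
}

impl SketchConfig {
    /// Required fields are arguments; everything else takes the documented default.
    fn new(redis_url: &str, stream_key: &str) -> Self {
        SketchConfig {
            redis_url: redis_url.to_string(),
            stream_key: stream_key.to_string(),
            consumer_group: "drasi-core".to_string(),
            consumer_name: None,
            batch_size: 100,
            block_ms: 5000,
        }
    }

    /// Sanity checks chosen for this sketch (assumed, not taken from the crate).
    fn validate(&self) -> Result<(), String> {
        if self.redis_url.is_empty() {
            return Err("redis_url is required".to_string());
        }
        if self.stream_key.is_empty() {
            return Err("stream_key is required".to_string());
        }
        if self.batch_size == 0 {
            return Err("batch_size must be at least 1".to_string());
        }
        Ok(())
    }
}
```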

§Data Format

The platform source expects CloudEvent-wrapped messages with a data array containing change events. Each event includes an operation type and payload.

§Node Insert

{
    "data": [{
        "op": "i",
        "payload": {
            "after": {
                "id": "user-123",
                "labels": ["User"],
                "properties": {
                    "name": "Alice",
                    "email": "alice@example.com"
                }
            },
            "source": {
                "db": "mydb",
                "table": "node",
                "ts_ns": 1699900000000000000
            }
        }
    }]
}

§Node Update

{
    "data": [{
        "op": "u",
        "payload": {
            "after": {
                "id": "user-123",
                "labels": ["User"],
                "properties": { "name": "Alice Updated" }
            },
            "source": { "table": "node", "ts_ns": 1699900001000000000 }
        }
    }]
}

§Node Delete

{
    "data": [{
        "op": "d",
        "payload": {
            "before": {
                "id": "user-123",
                "labels": ["User"],
                "properties": {}
            },
            "source": { "table": "node", "ts_ns": 1699900002000000000 }
        }
    }]
}

§Relation Insert

{
    "data": [{
        "op": "i",
        "payload": {
            "after": {
                "id": "follows-1",
                "labels": ["FOLLOWS"],
                "startId": "user-123",
                "endId": "user-456",
                "properties": { "since": "2024-01-01" }
            },
            "source": { "table": "rel", "ts_ns": 1699900003000000000 }
        }
    }]
}
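
The four examples above follow a pattern: `op` is one of `i`, `u`, or `d`; inserts and updates carry the element under `after` while deletes carry it under `before`; and `source.table` is `node` or `rel`. A minimal sketch of that mapping follows. The `Op` and `ElementKind` types and the `element_kind` function are hypothetical names, and whether the plugin accepts other codes or table values is not stated here.

```rust
/// Operation codes as they appear in the examples (hypothetical type).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Op {
    Insert,
    Update,
    Delete,
}

impl Op {
    /// Map the `op` string to an operation; unknown codes yield `None`.
    fn parse(code: &str) -> Option<Op> {
        match code {
            "i" => Some(Op::Insert),
            "u" => Some(Op::Update),
            "d" => Some(Op::Delete),
            _ => None,
        }
    }

    /// Payload key that holds the element state in the examples.
    fn state_key(self) -> &'static str {
        match self {
            Op::Insert | Op::Update => "after",
            Op::Delete => "before",
        }
    }
}

/// Element kind inferred from `payload.source.table` (hypothetical type).
#[derive(Debug, PartialEq, Eq)]
enum ElementKind {
    Node,
    Relation,
}

/// Classify using the two table values seen in the examples.
fn element_kind(table: &str) -> Option<ElementKind> {
    match table {
        "node" => Some(ElementKind::Node),
        "rel" => Some(ElementKind::Relation),
        _ => None,
    }
}
```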

§Control Events

Control events are identified by payload.source.db = "Drasi" (case-insensitive). Currently supported control types:

  • SourceSubscription: Query subscription management
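
The case-insensitive match on `payload.source.db` can be sketched as a small predicate. `is_control_event` is a hypothetical helper written for illustration; it assumes an ASCII comparison is sufficient and treats a missing `db` field as a regular data event.

```rust
/// Returns true when `payload.source.db` equals "Drasi", ignoring ASCII case.
/// A missing `db` field (as in the update and delete examples) is not a
/// control event in this sketch.
fn is_control_event(source_db: Option<&str>) -> bool {
    matches!(source_db, Some(db) if db.eq_ignore_ascii_case("Drasi"))
}
```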

§Example Configuration (YAML)

source_type: platform
properties:
  redis_url: "redis://localhost:6379"
  stream_key: "my-app-changes"
  consumer_group: "drasi-consumers"
  batch_size: 50
  block_ms: 10000

§Usage Example

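use drasi_source_platform::{PlatformSource, PlatformSourceBuilder};
use std::sync::Arc;

// This snippet is meant to run inside an async function that returns a Result.
// `drasi` is assumed to be an already-constructed instance that is not shown here.

// Build the source configuration; fields that are not set keep their defaults.
let config = PlatformSourceBuilder::new()
    .with_redis_url("redis://localhost:6379")
    .with_stream_key("my-changes")
    .with_consumer_group("my-consumers")
    .build();

// Create the source under the id "platform-source" and register it.
let source = Arc::new(PlatformSource::new("platform-source", config)?);
drasi.add_source(source).await?;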

Re-exports§

pub use config::PlatformSourceConfig;

Modules§

config
Configuration types for the platform source plugin.

Structs§

PlatformSource
Platform source that reads events from Redis Streams.
PlatformSourceBuilder
Builder for creating PlatformSource instances.