DrasiLib
DrasiLib is a Rust library that brings Drasi change processing into your application as an embedded library. It monitors data sources using continuous queries and delivers precise change notifications to reactions — all in-process, with no external infrastructure required.
DrasiLib is part of the Drasi project, a CNCF Sandbox Data Change Processing platform.
How It Works
Sources  -->  Continuous Queries  -->  Reactions
   |                  |                    |
Data In        Change Detection       Actions Out
- Sources connect to databases, APIs, or streams and model incoming data as a property graph of nodes and relationships.
- Continuous Queries run Cypher or GQL (ISO/IEC 39075:2024) queries perpetually against that graph. When source data changes, each query reports which results were added, updated (with before and after values), or deleted.
- Reactions receive those result changes and take action — send webhooks, write to databases, log alerts, or anything else.
You declare what changes matter with a query. DrasiLib handles the rest.
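To make the added / updated / deleted classification concrete, here is a minimal sketch in plain Rust that compares two snapshots of a query's result set. It is an illustration only: `ResultChange` and `diff_results` are hypothetical names, not part of the DrasiLib API, and the sketch diffs whole snapshots for simplicity rather than evaluating changes as they arrive.

```rust
use std::collections::BTreeMap;

/// Hypothetical change record for one result row, keyed by row id.
#[derive(Debug, PartialEq)]
enum ResultChange {
    Added { id: String, after: i64 },
    Updated { id: String, before: i64, after: i64 },
    Deleted { id: String, before: i64 },
}

/// Compare two snapshots (row id -> value) and classify every difference.
fn diff_results(
    old: &BTreeMap<String, i64>,
    new: &BTreeMap<String, i64>,
) -> Vec<ResultChange> {
    let mut changes = Vec::new();
    // Rows present now are either new or possibly changed.
    for (id, after) in new {
        match old.get(id) {
            None => changes.push(ResultChange::Added { id: id.clone(), after: *after }),
            Some(before) if before != after => changes.push(ResultChange::Updated {
                id: id.clone(),
                before: *before,
                after: *after,
            }),
            Some(_) => {} // unchanged rows produce no notification
        }
    }
    // Rows that were present before but no longer match the query.
    for (id, before) in old {
        if !new.contains_key(id) {
            changes.push(ResultChange::Deleted { id: id.clone(), before: *before });
        }
    }
    changes
}
```

A reaction would receive one such record per affected row, which is why an update carries both the before and after values.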
Quick Start
Add to your Cargo.toml:
[dependencies]
drasi-lib = "0.4"
tokio = { version = "1", features = ["full"] }
Minimal example:
use ;
async
What happens when you call start():
- All sources begin ingesting data and populating the graph.
- Each query bootstraps (loads initial data), then continuously evaluates against live changes.
- Reactions subscribe to query results and process every add/update/delete.
Table of Contents
- Builder API
- Query Builder
- Multi-Source Queries and Joins
- Query Examples (Cypher)
- Runtime Management
- Component Lifecycle Events
- Component Dependency Graph
- Dispatch Modes
- Storage Backends
- State Store Providers
- Logging
- Middleware
- Plugin Architecture
- YAML Configuration
- Error Handling
- Feature Flags
Builder API
Create a DrasiLib instance with DrasiLib::builder():
// Argument values below are placeholders.
let core = DrasiLib::builder()
    .with_id("my-app")                      // Instance name (default: UUID)
    .with_source(orders_source)             // Add a source plugin
    .with_source(customers_source)          // Add another source
    .with_reaction(webhook_reaction)        // Add a reaction plugin
    .with_query(query_config)               // Add a query (see Query Builder)
    .with_priority_queue_capacity(50_000)   // Event queue depth (default: 10,000)
    .with_dispatch_buffer_capacity(5_000)   // Channel buffer size (default: 1,000)
    .add_storage_backend(backend_config)    // Named storage backend (RocksDB, Redis)
    .with_index_provider(index_provider)    // Plugin for persistent indexes
    .with_state_store_provider(state_store) // Plugin state persistence
    .build()
    .await?;
DrasiLib takes ownership of every source and reaction passed to with_source() / with_reaction(). The value is moved into the builder, so you cannot use it afterwards.
Builder Method Reference
| Method | Description | Default |
|---|---|---|
| with_id(impl Into<String>) | Instance name for logging | Auto-generated UUID |
| with_source(impl Source + 'static) | Source plugin (chainable) | - |
| with_reaction(impl Reaction + 'static) | Reaction plugin (chainable) | - |
| with_query(QueryConfig) | Query config from Query builder | - |
| with_priority_queue_capacity(usize) | Default event queue capacity | 10,000 |
| with_dispatch_buffer_capacity(usize) | Default channel buffer size | 1,000 |
| add_storage_backend(StorageBackendConfig) | Named storage backend definition | - |
| with_index_provider(Arc<dyn IndexBackendPlugin>) | Persistent index plugin | In-memory |
| with_state_store_provider(Arc<dyn StateStoreProvider>) | Plugin state persistence | In-memory |
| build() -> Result<DrasiLib> | Validate and construct | - |
Query Builder
Use the Query builder to create query configurations:
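use drasi_lib::Query;
let config = Query::cypher("high-temp-alerts")
    .query("MATCH (s:Sensor) WHERE s.temperature > 80 RETURN s.id, s.temperature, s.location")
    .from_source("sensors")
    .build();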
For GQL (the ISO/IEC 39075:2024 graph query language, not GraphQL):
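let config = Query::gql("high-temp-alerts")
    .query("MATCH (s:Sensor) WHERE s.temperature > 80 RETURN s.id, s.temperature, s.location")
    .from_source("sensors")
    .build();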
Query Builder Methods
| Method | Description | Default |
|---|---|---|
| query(impl Into<String>) | Cypher or GQL query string | Required |
| from_source(impl Into<String>) | Subscribe to a source by ID | Required (at least one) |
| from_source_with_pipeline(id, Vec<String>) | Subscribe with named middleware pipeline | - |
| auto_start(bool) | Start with core.start() | true |
| enable_bootstrap(bool) | Load initial data from sources | true |
| with_bootstrap_buffer_size(usize) | Buffer size during bootstrap | 10,000 |
| with_joins(Vec<QueryJoinConfig>) | Synthetic joins for multi-source queries | None |
| with_priority_queue_capacity(usize) | Override instance-level queue capacity | Inherited |
| with_dispatch_buffer_capacity(usize) | Override instance-level buffer size | Inherited |
| with_dispatch_mode(DispatchMode) | Channel (backpressure) or Broadcast (fanout) | Channel |
| with_storage_backend(StorageBackendRef) | Persistent storage for this query | In-memory |
| with_middleware(SourceMiddlewareConfig) | Add middleware transformation | [] |
| build() -> QueryConfig | Build the configuration | - |
Multi-Source Queries and Joins
A single query can span data from multiple sources. Define synthetic joins to tell DrasiLib how to create relationships between elements from different sources:
use ;
let config = cypher
.query
.from_source
.from_source
.with_joins
.build;
DrasiLib creates PLACED_BY relationships whenever Order.customer_id == Customer.id, even though the orders and customers come from different databases.
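The matching rule behind a synthetic join can be sketched in plain Rust as follows. This is only a conceptual illustration under simplifying assumptions: the `Order` and `Customer` structs and the `placed_by_edges` function are hypothetical stand-ins, not DrasiLib types, and the sketch works on in-memory slices rather than on live source changes.

```rust
use std::collections::HashSet;

/// Hypothetical, simplified element arriving from the orders source.
#[derive(Debug, Clone, PartialEq)]
struct Order {
    id: u32,
    customer_id: u32,
}

/// Hypothetical, simplified element arriving from the customers source.
#[derive(Debug, Clone, PartialEq)]
struct Customer {
    id: u32,
    email: String,
}

/// Return one (order id, customer id) pair per synthetic PLACED_BY
/// relationship, i.e. wherever Order.customer_id == Customer.id.
fn placed_by_edges(orders: &[Order], customers: &[Customer]) -> Vec<(u32, u32)> {
    // Index the join key on the customer side once.
    let known: HashSet<u32> = customers.iter().map(|c| c.id).collect();
    orders
        .iter()
        .filter(|o| known.contains(&o.customer_id))
        .map(|o| (o.id, o.customer_id))
        .collect()
}
```

An order whose customer_id has no matching customer simply produces no relationship, so a pattern such as (o:Order)-[:PLACED_BY]->(c:Customer) would not match it.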
Query Examples (Cypher)
DrasiLib supports a subset of openCypher optimized for continuous evaluation:
Simple filter:
MATCH (s:Sensor)
WHERE s.temperature > 80
RETURN s.id, s.temperature, s.location
Relationship traversal:
MATCH (e:Employee)-[:WORKS_IN]->(d:Department)
WHERE d.name = 'Engineering'
RETURN e.name, e.title, d.name
Aggregation (results update as underlying data changes):
MATCH (o:Order)
WHERE o.status = 'completed'
RETURN o.region, count(o) AS order_count, sum(o.total) AS revenue
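One way to picture how an aggregate like this stays current is to keep a running total per group and adjust it on each change instead of rescanning all orders. The sketch below shows that idea in plain Rust; `RevenueByRegion` and its methods are hypothetical and are not how DrasiLib is implemented internally.

```rust
use std::collections::BTreeMap;

/// Running (order_count, revenue) per region.
struct RevenueByRegion {
    totals: BTreeMap<String, (u64, i64)>,
}

impl RevenueByRegion {
    fn new() -> Self {
        RevenueByRegion { totals: BTreeMap::new() }
    }

    /// A completed order entered the result set: bump count and sum.
    fn order_completed(&mut self, region: &str, total: i64) {
        let entry = self.totals.entry(region.to_string()).or_insert((0, 0));
        entry.0 += 1;
        entry.1 += total;
    }

    /// A completed order left the result set: undo its contribution and
    /// drop the group once it has no orders left.
    fn order_removed(&mut self, region: &str, total: i64) {
        let now_empty = match self.totals.get_mut(region) {
            Some(entry) => {
                entry.0 -= 1;
                entry.1 -= total;
                entry.0 == 0
            }
            None => false,
        };
        if now_empty {
            self.totals.remove(region);
        }
    }

    /// Current (order_count, revenue) for a region, if the group exists.
    fn get(&self, region: &str) -> Option<(u64, i64)> {
        self.totals.get(region).copied()
    }
}
```

Each call touches a single group, so only the affected region's row would be reported as updated, added, or deleted.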
Multi-hop traversal:
MATCH (c:Customer)-[:PLACED]->(o:Order)-[:CONTAINS]->(p:Product)
WHERE p.category = 'electronics' AND o.total > 500
RETURN c.name, o.id, collect(p.name) AS products
Temporal (NULL-based state detection):
MATCH (t:Task)
WHERE t.completed_at IS NULL AND t.created_at < datetime() - duration('P7D')
RETURN t.id, t.title, t.assignee
Limitation: ORDER BY, LIMIT, and TOP are not supported in continuous queries.
Runtime Management
Lifecycle
core.start().await?;                    // Start sources -> queries -> reactions
core.stop().await?;                     // Stop reactions -> queries -> sources
let running = core.is_running().await;  // Check if running
Adding, Removing, and Updating Components at Runtime
// Add (auto-starts if server is running and component has auto_start=true)
core.add_source.await?;
core.add_query.await?;
core.add_reaction.await?;
// Remove (cleanup=true calls deprovision() for resource cleanup)
core.remove_source.await?;
core.remove_query.await?;
core.remove_reaction.await?;
// Hot-swap (preserves graph edges, event history, and relationships)
core.update_source.await?;
core.update_query.await?;
core.update_reaction.await?;
// Start / stop individual components
core.start_source.await?;
core.stop_source.await?;
core.start_query.await?;
core.stop_query.await?;
core.start_reaction.await?;
core.stop_reaction.await?;
Inspecting Components
// List all components with their current status
let sources: = core.list_sources.await?;
let queries = core.list_queries.await?;
let reactions = core.list_reactions.await?;
// Get status of a specific component
let status: ComponentStatus = core.get_source_status.await?;
// Get detailed info (type, status, configuration metadata)
let info = core.get_source_info.await?; // -> SourceRuntime
let info = core.get_query_info.await?; // -> QueryRuntime
let info = core.get_reaction_info.await?; // -> ReactionRuntime
// Get current query result set as a JSON snapshot
let results: = core.get_query_results.await?;
// Get query configuration
let config: QueryConfig = core.get_query_config.await?;
// Export full DrasiLib configuration
let config: DrasiLibConfig = core.get_current_config.await?;
ComponentStatus Values
| Status | Meaning |
|---|---|
| Stopped | Not running (initial state) |
| Starting | Initialization in progress |
| Running | Actively processing |
| Stopping | Graceful shutdown in progress |
| Error | Failed (check events for details) |
| Reconfiguring | Being updated via update_*() |
Component Lifecycle Events
Every status change is recorded, and you can subscribe to changes in real time:
// Subscribe to events for a specific component (returns history + live stream)
let = core.subscribe_source_events.await?;
let = core.subscribe_query_events.await?;
let = core.subscribe_reaction_events.await?;
// Process historical events
for event in &history
// Stream live events
while let Ok = rx.recv.await
// Subscribe to ALL component events (global broadcast)
let mut rx = core.subscribe_all_component_events;
while let Ok = rx.recv.await
ComponentEvent Fields
Component Dependency Graph
DrasiLib maintains a directed graph of all components and their relationships, backed by petgraph. The graph is the single source of truth for component metadata, runtime instances, and lifecycle events.
Instance ("my-app")
|-- Owns --> Source: "orders-db"
| '-- Feeds --> Query: "active-orders"
|-- Owns --> Query: "active-orders"
| '-- Feeds --> Reaction: "webhook"
'-- Owns --> Reaction: "webhook"
Querying the Graph
// Full graph snapshot (serializable to JSON via serde)
let snapshot: GraphSnapshot = core.get_graph.await;
let json = to_string_pretty?;
// Find what depends on a component
let dependents: = core.get_dependents.await;
// Find what a component depends on
let deps: = core.get_dependencies.await;
// Check if safe to remove (errors if other components depend on it)
core.can_remove_component.await?;
Relationship Types
| From | Relationship | To |
|---|---|---|
| Source | Feeds | Query |
| Query | Feeds | Reaction |
| BootstrapProvider | Bootstraps | Source |
| IdentityProvider | Authenticates | Component |
All relationships are bidirectional (e.g., Feeds / SubscribesTo). Ownership edges (Owns / OwnedBy) are created automatically between the instance root and each component.
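The dependency checks described in this section can be sketched with a small adjacency map. This is a simplified model for illustration, assuming only Feeds edges; `ComponentGraph` and its methods are hypothetical names and do not reproduce the petgraph-backed structure DrasiLib actually uses.

```rust
use std::collections::BTreeMap;

/// Illustrative graph: component id -> ids of the components it feeds.
struct ComponentGraph {
    feeds: BTreeMap<String, Vec<String>>,
}

impl ComponentGraph {
    fn new() -> Self {
        ComponentGraph { feeds: BTreeMap::new() }
    }

    /// Record a Feeds edge, e.g. source -> query or query -> reaction.
    fn add_feed(&mut self, from: &str, to: &str) {
        self.feeds
            .entry(from.to_string())
            .or_insert_with(Vec::new)
            .push(to.to_string());
    }

    /// Components that depend on `id` (everything `id` feeds).
    fn dependents(&self, id: &str) -> Vec<String> {
        self.feeds.get(id).cloned().unwrap_or_default()
    }

    /// Components that `id` depends on (everything feeding `id`).
    fn dependencies(&self, id: &str) -> Vec<String> {
        self.feeds
            .iter()
            .filter(|(_, targets)| targets.iter().any(|t| t.as_str() == id))
            .map(|(from, _)| from.clone())
            .collect()
    }

    /// Removal is safe only when nothing depends on the component.
    fn can_remove(&self, id: &str) -> Result<(), String> {
        let dependents = self.dependents(id);
        if dependents.is_empty() {
            Ok(())
        } else {
            Err(format!("{} still feeds: {}", id, dependents.join(", ")))
        }
    }
}
```

With the example graph above, removing "orders-db" would be rejected while "active-orders" still subscribes to it, whereas "webhook" has no dependents and can be removed.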
Dispatch Modes
Configure how query results are routed to reaction subscribers:
| Mode | Backpressure | Message Loss | Best For |
|---|---|---|---|
| Channel (default) | Yes (slow consumers block producers) | None | Reliable delivery, different consumer speeds |
| Broadcast | No (fast fire-and-forget) | Possible if receivers lag | High fanout (many subscribers), uniform speeds |
Query::cypher("my-query")
    .with_dispatch_mode(DispatchMode::Channel)    // Default: dedicated channel per subscriber
    .build();
Query::cypher("my-query")
    .with_dispatch_mode(DispatchMode::Broadcast)  // Shared broadcast channel
    .build();
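The trade-off between the two modes can be demonstrated with standard-library primitives. The sketch below is an analogy, not DrasiLib's dispatcher: a bounded `std::sync::mpsc::sync_channel` stands in for Channel mode, and a fixed-size ring buffer stands in for what a lagging Broadcast receiver would still be able to read. The function names are hypothetical.

```rust
use std::collections::VecDeque;
use std::sync::mpsc::{sync_channel, TrySendError};

/// Channel-style dispatch: count how many items a bounded queue accepts
/// before it is full. A blocking `send` would wait at that point, which
/// is the backpressure a slow consumer applies to the producer.
fn channel_accepts(capacity: usize, items: &[u32]) -> usize {
    // Keep the receiver alive but never read from it (a stalled consumer).
    let (tx, _rx) = sync_channel::<u32>(capacity);
    let mut accepted = 0;
    for item in items {
        match tx.try_send(*item) {
            Ok(()) => accepted += 1,
            Err(TrySendError::Full(_)) => break,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    accepted
}

/// Broadcast-style dispatch seen by one lagging receiver: the producer
/// never waits, so the oldest buffered item is overwritten when the
/// buffer is full. Returns (items still readable, items missed).
fn broadcast_lagging(capacity: usize, items: &[u32]) -> (Vec<u32>, usize) {
    assert!(capacity > 0, "buffer capacity must be positive");
    let mut buffer: VecDeque<u32> = VecDeque::with_capacity(capacity);
    let mut missed = 0;
    for item in items {
        if buffer.len() == capacity {
            buffer.pop_front();
            missed += 1;
        }
        buffer.push_back(*item);
    }
    (buffer.into_iter().collect(), missed)
}
```

With a capacity of 2 and three items, the channel accepts two and then pushes back on the producer, while the broadcast buffer accepts all three but the lagging receiver misses the first.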
Storage Backends
By default, query indexes are held in memory. For persistent state that survives restarts, configure a storage backend:
use ;
let core = builder
.with_id
// 1. Define a named backend
.add_storage_backend
// 2. Provide the plugin that implements the backend
.with_index_provider
.with_source
.with_query
.build
.await?;
StorageBackendSpec Variants
| Variant | Fields | Notes |
|---|---|---|
| Memory | enable_archive: bool | Default. Volatile: data is lost on restart. |
| RocksDb | path: String, enable_archive: bool, direct_io: bool | Path must be absolute. |
| Redis | connection_string: String, cache_size: Option<usize> | URL must start with redis:// or rediss://. |
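The two constraints in the Notes column are easy to check before building an instance. The following sketch mirrors the variants and fields from the table in a local enum and applies those rules; the enum is a stand-in defined here for illustration, and the `validate` function and its error strings are hypothetical rather than the library's own validation.

```rust
use std::path::Path;

/// Local mirror of the variants in the table above (illustration only).
#[derive(Debug)]
enum StorageBackendSpec {
    Memory { enable_archive: bool },
    RocksDb { path: String, enable_archive: bool, direct_io: bool },
    Redis { connection_string: String, cache_size: Option<usize> },
}

/// Apply the documented constraints: absolute RocksDB path, and a Redis
/// URL that starts with redis:// or rediss://.
fn validate(spec: &StorageBackendSpec) -> Result<(), String> {
    match spec {
        StorageBackendSpec::Memory { .. } => Ok(()),
        StorageBackendSpec::RocksDb { path, .. } => {
            if Path::new(path).is_absolute() {
                Ok(())
            } else {
                Err(format!("RocksDb path must be absolute: {}", path))
            }
        }
        StorageBackendSpec::Redis { connection_string, .. } => {
            let ok = connection_string.starts_with("redis://")
                || connection_string.starts_with("rediss://");
            if ok {
                Ok(())
            } else {
                Err(format!("Redis URL has an unsupported scheme: {}", connection_string))
            }
        }
    }
}
```

Note that `Path::is_absolute` follows the conventions of the platform the code runs on, so a path such as /var/lib/drasi counts as absolute on Unix-like systems only.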
State Store Providers
State stores let plugins (sources, reactions) persist key-value data across restarts. This is independent of query index storage.
// Default: in-memory (lost on restart)
let core = builder.with_id.build.await?;
// Persistent: redb (ACID-compliant embedded database)
use RedbStateStoreProvider;
let core = builder
.with_id
.with_state_store_provider
.build
.await?;
Plugins access the state store through their runtime context:
// Inside a Source or Reaction implementation:
async
async
Logging
DrasiLib provides component-aware logging built on tracing. Logging is initialized automatically when you call build() — no manual setup required.
Control verbosity with RUST_LOG:
RUST_LOG=info
RUST_LOG=debug
RUST_LOG=drasi_lib=debug
Subscribing to Component Logs
// Returns (recent_history, live_broadcast_receiver)
let = core.subscribe_source_logs.await?;
let = core.subscribe_query_logs.await?;
let = core.subscribe_reaction_logs.await?;
for msg in &history
while let Ok = rx.recv.await
LogMessage Fields
Standard log::info!() and tracing::info!() macros both work inside plugin code — logs are automatically routed to the component that spawned the task.
Middleware
Middleware transforms data between sources and queries. Each middleware is a Cargo feature that must be enabled explicitly.
[dependencies]
drasi-lib = { version = "0.4", features = ["middleware-promote", "middleware-decoder"] }
Available Middleware
| Feature | Kind | Description |
|---|---|---|
| middleware-jq | Transform | Apply jq expressions to incoming data |
| middleware-bundled-jq | Transform | Same as above, but bundles jq (no system dep) |
| middleware-map | Transform | Map properties using JSONPath selectors |
| middleware-promote | Transform | Copy nested values to top-level properties |
| middleware-relabel | Transform | Rename element labels |
| middleware-decoder | Transform | Decode base64, hex, URL-encoded, or JSON-escaped strings |
| middleware-parse-json | Transform | Parse JSON strings into structured objects |
| middleware-unwind | Transform | Expand arrays into separate graph elements |
| middleware-all | Convenience | Enable all middleware |
Note: middleware-jq compiles jq from source and requires build tools.
- macOS: brew install autoconf automake libtool
- Ubuntu: sudo apt-get install autoconf automake libtool flex bison
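As a rough picture of what a transform such as promote does, the sketch below copies a nested value to a top-level property. It is a conceptual model only: the `Value` enum and the `promote` function are hypothetical, and the real middleware is configured declaratively rather than called like this.

```rust
use std::collections::BTreeMap;

/// Minimal stand-in for a property value (illustration only).
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Text(String),
    Number(f64),
    Object(BTreeMap<String, Value>),
}

/// Copy the value found by following `path` through nested objects into
/// a top-level property named `target`. Returns false, leaving `props`
/// untouched, when the path does not resolve.
fn promote(props: &mut BTreeMap<String, Value>, path: &[&str], target: &str) -> bool {
    let found = {
        let mut current: Option<&Value> = None;
        let mut scope: &BTreeMap<String, Value> = props;
        for (i, key) in path.iter().enumerate() {
            match scope.get(*key) {
                // Intermediate segment: descend into the nested object.
                Some(Value::Object(inner)) if i + 1 < path.len() => scope = inner,
                // Last segment: this is the value to promote.
                Some(value) if i + 1 == path.len() => current = Some(value),
                _ => {
                    current = None;
                    break;
                }
            }
        }
        current.cloned()
    };
    match found {
        Some(value) => {
            props.insert(target.to_string(), value);
            true
        }
        None => false,
    }
}
```

The nested original is left in place, so the transform copies rather than moves the value, matching the description in the table.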
Configuring Middleware on a Query
use SourceMiddlewareConfig;
use json;
let config = cypher
.query
.from_source
.with_middleware
.build;
Plugin Architecture
DrasiLib uses a trait-based plugin system. Sources, reactions, bootstrap providers, and index backends are all implemented as plugins.
Dynamic Plugin Loading
When using cdylib plugins (shared libraries), the plugin loader discovers and loads them from a configured directory:
- Plugins are matched by glob patterns (e.g., libdrasi_source_*, libdrasi_reaction_*)
- Only cdylib shared libraries are loaded: .dylib (macOS), .so (Linux), .dll (Windows)
- Non-cdylib Cargo artifacts (.rlib, .rmeta, .d) that may exist alongside the cdylib are silently ignored
- Each plugin must have exactly one cdylib file; if multiple cdylib extensions exist for the same base name, the loader reports an ambiguity error
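The selection rules above can be expressed as a small filter over a directory listing. This sketch is an approximation for illustration: `select_plugins` is a hypothetical function, the two prefixes are taken from the example patterns above, and the real loader's matching and error type may differ.

```rust
use std::collections::BTreeMap;

const CDYLIB_EXTENSIONS: [&str; 3] = ["so", "dylib", "dll"];
const PLUGIN_PREFIXES: [&str; 2] = ["libdrasi_source_", "libdrasi_reaction_"];

/// Pick loadable plugin files from a list of file names.
/// Returns base name -> file name, or an error on ambiguity.
fn select_plugins(file_names: &[&str]) -> Result<BTreeMap<String, String>, String> {
    let mut selected: BTreeMap<String, String> = BTreeMap::new();
    for &name in file_names {
        // Split "libdrasi_source_mock.so" into ("libdrasi_source_mock", "so").
        let (base, ext) = match name.rfind('.') {
            Some(dot) => (&name[..dot], &name[dot + 1..]),
            None => continue,
        };
        let is_plugin = PLUGIN_PREFIXES.iter().any(|p| base.starts_with(*p));
        let is_cdylib = CDYLIB_EXTENSIONS.contains(&ext);
        if !is_plugin || !is_cdylib {
            continue; // .rlib, .rmeta, .d and unrelated files are skipped
        }
        // A second cdylib for the same base name is ambiguous.
        if let Some(previous) = selected.insert(base.to_string(), name.to_string()) {
            return Err(format!(
                "ambiguous plugin {}: both {} and {} exist",
                base, previous, name
            ));
        }
    }
    Ok(selected)
}
```

Keying the result by base name is what makes the ambiguity rule straightforward to enforce: the second insert for the same key reveals the conflict.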
Source Plugins
A source implements the Source trait:
use ;
use SourceRuntimeContext;
use SubscriptionResponse;
use async_trait;
Available source plugins: drasi-source-postgres, drasi-source-http, drasi-source-grpc, drasi-source-mock, drasi-source-mssql, drasi-source-platform, drasi-source-application.
Reaction Plugins
A reaction implements the Reaction trait:
use ;
use ReactionRuntimeContext;
use async_trait;
Available reaction plugins: drasi-reaction-http, drasi-reaction-grpc, drasi-reaction-grpc-adaptive, drasi-reaction-sse, drasi-reaction-log, drasi-reaction-platform, drasi-reaction-profiler, drasi-reaction-storedproc-postgres, drasi-reaction-storedproc-mysql, drasi-reaction-storedproc-mssql, drasi-reaction-application.
Result Format
Reactions receive QueryResult values containing ResultDiff items:
YAML Configuration
Queries can be defined in YAML and loaded at startup. Sources and reactions are always created programmatically (they are runtime plugin instances, not config).
id: my-app
priority_queue_capacity: 50000
dispatch_buffer_capacity: 5000
queries:
  - id: high-temp-alerts
    query: |
      MATCH (s:Sensor)
      WHERE s.temperature > 75
      RETURN s.id, s.temperature, s.location
    queryLanguage: Cypher
    sources:
      - source_id: sensors
    auto_start: true
    enableBootstrap: true
    bootstrapBufferSize: 10000
  - id: cross-source
    query: |
      MATCH (o:Order)-[:PLACED_BY]->(c:Customer)
      WHERE o.status = 'pending'
      RETURN o.id, c.email, o.total
    sources:
      - source_id: orders
      - source_id: customers
    joins:
      - id: PLACED_BY
        keys:
          - label: Order
            property: customer_id
          - label: Customer
            property: id
Loading YAML
use DrasiLibConfig;
let yaml = read_to_string?;
let config: DrasiLibConfig = from_str?;
config.validate?;
let mut builder = builder.with_id;
for q in &config.queries
let core = builder
.with_source
.with_reaction
.build
.await?;
DrasiLibConfig Fields
| Field | Type | Default |
|---|---|---|
| id | String | UUID |
| priority_queue_capacity | Option<usize> | 10,000 |
| dispatch_buffer_capacity | Option<usize> | 1,000 |
| storage_backends | Vec<StorageBackendConfig> | [] |
| queries | Vec<QueryConfig> | [] |
QueryConfig Fields
| Field | YAML Key | Type | Default |
|---|---|---|---|
| id | id | String | Required |
| query | query | String | Required |
| query_language | queryLanguage | Cypher or GQL | Cypher |
| sources | sources | Vec<SourceSubscriptionConfig> | [] |
| middleware | middleware | Vec<SourceMiddlewareConfig> | [] |
| auto_start | auto_start | bool | true |
| enable_bootstrap | enableBootstrap | bool | true |
| bootstrap_buffer_size | bootstrapBufferSize | usize | 10,000 |
| joins | joins | Option<Vec<QueryJoinConfig>> | None |
| dispatch_mode | dispatch_mode | Option<DispatchMode> | Channel |
| storage_backend | storage_backend | Option<StorageBackendRef> | In-memory |
Error Handling
All public methods return drasi_lib::Result<T>, which wraps DrasiError:
use ;
match core.get_source_status.await
DrasiError Variants
| Variant | When |
|---|---|
| ComponentNotFound { component_type, component_id } | Component does not exist |
| AlreadyExists { component_type, component_id } | Duplicate component ID |
| InvalidConfig { message } | Configuration validation failed |
| InvalidState { message } | Operation not valid in current state |
| Validation { message } | Input validation failed |
| OperationFailed { component_type, component_id, operation, reason } | Runtime operation failed |
| Internal(anyhow::Error) | Unexpected internal error |
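Matching on the variant lets a caller react differently to each failure. The sketch below shows the pattern with a local stand-in enum so that it compiles on its own: `DrasiErrorSketch` copies a few variant shapes from the table above, uses a `String` where the real `Internal` variant wraps `anyhow::Error`, and `user_message` is a hypothetical helper.

```rust
/// Local stand-in mirroring some variants from the table above.
#[derive(Debug)]
enum DrasiErrorSketch {
    ComponentNotFound { component_type: String, component_id: String },
    AlreadyExists { component_type: String, component_id: String },
    InvalidState { message: String },
    Internal(String), // assumption: simplified from anyhow::Error
}

/// Turn an error into a short message suitable for logs or an API reply.
fn user_message(err: &DrasiErrorSketch) -> String {
    match err {
        DrasiErrorSketch::ComponentNotFound { component_type, component_id } => {
            format!("{} '{}' not found", component_type, component_id)
        }
        DrasiErrorSketch::AlreadyExists { component_type, component_id } => {
            format!("{} '{}' already exists", component_type, component_id)
        }
        DrasiErrorSketch::InvalidState { message } => {
            format!("invalid state: {}", message)
        }
        DrasiErrorSketch::Internal(detail) => format!("internal error: {}", detail),
    }
}
```

Because the match has no wildcard arm, the compiler flags any variant that is added to the enum later and not yet handled.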
Feature Flags
| Feature | Description |
|---|---|
| middleware-jq | JQ transformations (requires system jq build tools) |
| middleware-bundled-jq | JQ transformations (bundles jq, no system dependency) |
| middleware-decoder | Base64, hex, URL, JSON-escape decoding |
| middleware-map | JSONPath property mapping |
| middleware-parse-json | Parse JSON strings into objects |
| middleware-promote | Promote nested properties to top level |
| middleware-relabel | Rename element labels |
| middleware-unwind | Expand arrays into elements |
| middleware-all | Enable all middleware |
| azure-identity | Azure Managed Identity / Workload Identity credential provider |
| aws-identity | AWS IAM / RDS credential provider |
| all-identity | Enable all identity providers |
Related Projects
- Drasi documentation
- Drasi Platform — Kubernetes deployment
- Drasi Server — Single-process / Docker deployment
- Drasi Core — Continuous query engine (this repo)
License
Apache License 2.0