DrasiLib
DrasiLib is a Rust library for building change-driven solutions that detect and react to data changes with precision.
Overview
DrasiLib enables real-time change detection using continuous queries that maintain live result sets. Unlike traditional event-driven systems, you declare what changes matter using Cypher queries, and DrasiLib handles the complexity of tracking state and detecting meaningful transitions.
Sources  →  Continuous Queries  →  Reactions
   ↓                ↓                  ↓
Data In     Change Detection      Actions Out
Key Benefits:
- Declarative: Define change detection with Cypher queries, not custom code
- Precise: Get before/after states for every change, not just raw events
- Pluggable: Use existing source/reaction plugins or build your own
- Scalable: Built-in backpressure, priority queues, and dispatch modes
Getting Started
Installation
[dependencies]
drasi-lib = { path = "path/to/drasi-lib" }
tokio = { version = "1", features = ["full"] }
Initialization Methods
DrasiLib can be initialized in two ways:
- Builder Pattern (Recommended) - Fluent API for programmatic configuration
- Config Struct - Direct configuration for YAML/JSON loading scenarios
Method 1: Builder Pattern (Recommended)
The builder provides a fluent interface for configuring sources, queries, and reactions.
Basic Example
use ;
use ;
use ;
async
DrasiLibBuilder API Reference
Create a builder with DrasiLib::builder() and configure using the fluent API.
with_id(id: impl Into<String>)
Set a unique identifier for this DrasiLib instance. Used for logging and debugging.
DrasiLib::builder()
    .with_id("my-app") // placeholder id
with_source(source: impl Source + 'static)
Add a source plugin instance. Ownership is transferred to DrasiLib.
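// MySource is a placeholder for any type implementing Source
let source = MySource::new(/* plugin-specific config */)?;
DrasiLib::builder()
    .with_source(source) // source is moved here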
You can add multiple sources by chaining:
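// source_a, source_b, and source_c are placeholder source instances
DrasiLib::builder()
    .with_source(source_a)
    .with_source(source_b)
    .with_source(source_c)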
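To make the ownership transfer concrete, here is a minimal, self-contained sketch of a builder that accepts `impl Source + 'static` by value. The `Source` trait, `NamedSource`, and `Builder` below are hypothetical stand-ins written for this illustration; they are not the drasi-lib types, and the real builder may store sources differently.

```rust
// Hypothetical stand-ins: these are NOT the drasi-lib `Source` trait or builder.
trait Source {
    fn id(&self) -> &str;
}

struct NamedSource {
    id: String,
}

impl Source for NamedSource {
    fn id(&self) -> &str {
        &self.id
    }
}

#[derive(Default)]
struct Builder {
    sources: Vec<Box<dyn Source>>,
}

impl Builder {
    // Taking `impl Source + 'static` by value moves the source into the builder.
    fn with_source(mut self, source: impl Source + 'static) -> Self {
        self.sources.push(Box::new(source));
        self
    }

    fn source_ids(&self) -> Vec<String> {
        self.sources.iter().map(|s| s.id().to_string()).collect()
    }
}

fn build_example() -> Vec<String> {
    let orders = NamedSource { id: "orders".into() };
    let customers = NamedSource { id: "customers".into() };
    let builder = Builder::default()
        .with_source(orders) // `orders` is moved; using it afterwards would not compile
        .with_source(customers);
    builder.source_ids()
}
```

Because each call consumes and returns the builder, chained calls read top to bottom and the compiler rejects any later use of a source that was already handed over.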
with_reaction(reaction: impl Reaction + 'static)
Add a reaction plugin instance. Ownership is transferred to DrasiLib.
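// MyReaction is a placeholder for any type implementing Reaction
let reaction = MyReaction::new(/* plugin-specific config */);
DrasiLib::builder()
    .with_reaction(reaction) // reaction is moved here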
with_query(config: QueryConfig)
Add a query configuration. Use the Query builder to create configurations:
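// The query id, query text, and source id are placeholders
DrasiLib::builder()
    .with_query(
        Query::cypher("high-temp-alerts")
            .query("MATCH (s:Sensor) WHERE s.temperature > 75 RETURN s.id")
            .from_source("sensors")
            .build(),
    )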
with_priority_queue_capacity(capacity: usize)
Set the default priority queue capacity for all components. Controls how many events can be buffered before backpressure is applied.
DrasiLib::builder()
    .with_priority_queue_capacity(50000) // Default: 10000
with_dispatch_buffer_capacity(capacity: usize)
Set the default dispatch buffer capacity for event routing channels.
DrasiLib::builder()
    .with_dispatch_buffer_capacity(5000) // Default: 1000
add_storage_backend(config: StorageBackendConfig)
Add a storage backend for persistent query state (RocksDB, Redis/Garnet).
use StorageBackendConfig;
builder
.add_storage_backend
build() -> Result<DrasiLib>
Build the DrasiLib instance. This validates configuration, creates all components, and initializes the system.
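// `source` and `query` are a plugin instance and a QueryConfig created earlier
let core = DrasiLib::builder()
    .with_id("my-app")
    .with_source(source)
    .with_query(query)
    .build()
    .await?;
// Now start processing
core.start().await?;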
Method 2: Config Struct Initialization
Use this approach when you need to load configuration from YAML/JSON files or construct configuration programmatically without the builder.
DrasiLibConfig Structure
use ;
use Arc;
// Create configuration directly
let config = DrasiLibConfig ;
// Validate the configuration
config.validate?;
YAML Configuration Format
# DrasiLibConfig structure
id: my-server
priority_queue_capacity: 50000    # Optional, default: 10000
dispatch_buffer_capacity: 5000    # Optional, default: 1000
storage_backends:                 # Optional
  - id: rocksdb-backend
    spec:
      type: rocksdb
      path: ./data
queries:
  - id: high-temp-alerts
    query: |
      MATCH (s:Sensor)
      WHERE s.temperature > 75
      RETURN s.id, s.temperature, s.location
    queryLanguage: Cypher         # Optional, default: Cypher
    sources:
      - source_id: sensors
        pipeline:                 # Optional middleware pipeline
    auto_start: true              # Optional, default: true
    enableBootstrap: true         # Optional, default: true
    bootstrapBufferSize: 10000    # Optional, default: 10000
  - id: complex-join-query
    query: |
      MATCH (o:Order)-[:PLACED_BY]->(c:Customer)
      WHERE o.status = 'pending'
      RETURN o.id, c.email, o.total
    sources:
      - source_id: orders
      - source_id: customers
    joins:                        # Synthetic joins for cross-source queries
      - id: PLACED_BY
        keys:
          - label: Order
            property: customer_id
          - label: Customer
            property: id
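The synthetic join in the YAML above links `Order` nodes to `Customer` nodes wherever `Order.customer_id` equals `Customer.id`. As a rough mental model only, the following sketch computes the same pairing over two in-memory slices. The `Order` and `Customer` structs and the `join_pending` function are hypothetical and simplified; DrasiLib evaluates joins incrementally inside the query engine rather than by rescanning data like this.

```rust
use std::collections::HashMap;

// Hypothetical, simplified node shapes for illustration only.
struct Order {
    id: u32,
    customer_id: u32,
    status: &'static str,
    total: f64,
}

struct Customer {
    id: u32,
    email: &'static str,
}

// Pair each pending order with the customer whose `id` equals the order's
// `customer_id`, mirroring the PLACED_BY key pair (Order.customer_id, Customer.id).
fn join_pending(orders: &[Order], customers: &[Customer]) -> Vec<(u32, String, f64)> {
    let by_id: HashMap<u32, &Customer> = customers.iter().map(|c| (c.id, c)).collect();
    orders
        .iter()
        .filter(|o| o.status == "pending")
        .filter_map(|o| {
            by_id
                .get(&o.customer_id)
                .map(|c| (o.id, c.email.to_string(), o.total))
        })
        .collect()
}
```

Orders whose `customer_id` has no matching customer produce no row, which is what the `MATCH (o:Order)-[:PLACED_BY]->(c:Customer)` pattern implies.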
Loading from YAML and Initializing
use ;
use Arc;
// Load YAML configuration
let yaml_str = read_to_string?;
let config: DrasiLibConfig = from_str?;
// Validate configuration
config.validate?;
// Convert to RuntimeConfig (applies defaults to queries)
let runtime_config = new;
// Note: DrasiLib::new() is internal. For config-based initialization,
// use the builder pattern and add sources/reactions programmatically:
let mut core = builder
.with_id;
// Add queries from config
for query_config in &runtime_config.queries
// Add sources and reactions as plugin instances
// (Sources/reactions must be created from plugin types - they cannot be in YAML)
let source = create_source_from_external_config?;
let reaction = create_reaction_from_external_config?;
let core = core
.with_source
.with_reaction
.build
.await?;
core.start.await?;
Configuration Fields Reference
| Field | Type | Default | Description |
|---|---|---|---|
| id | String | UUID | Unique identifier for this instance |
| priority_queue_capacity | Option<usize> | 10000 | Default event queue capacity |
| dispatch_buffer_capacity | Option<usize> | 1000 | Default channel buffer capacity |
| storage_backends | Vec<StorageBackendConfig> | [] | Storage backend definitions |
| queries | Vec<QueryConfig> | [] | Query configurations |
QueryConfig Fields Reference
| Field | Type | Default | Description |
|---|---|---|---|
| id | String | Required | Unique query identifier |
| query | String | Required | Cypher or GQL query string |
| query_language | QueryLanguage | Cypher | Query language (Cypher or GQL) |
| sources | Vec<SourceSubscriptionConfig> | [] | Source subscriptions |
| middleware | Vec<SourceMiddlewareConfig> | [] | Middleware configurations |
| auto_start | bool | true | Start automatically with DrasiLib |
| joins | Option<Vec<QueryJoinConfig>> | None | Synthetic join definitions |
| enable_bootstrap | bool | true | Load initial data from sources |
| bootstrap_buffer_size | usize | 10000 | Bootstrap buffer size |
| priority_queue_capacity | Option<usize> | Inherited | Override queue capacity |
| dispatch_buffer_capacity | Option<usize> | Inherited | Override buffer capacity |
| dispatch_mode | Option<DispatchMode> | Channel | Event dispatch mode |
| storage_backend | Option<StorageBackendRef> | Default | Storage backend reference |
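The "Inherited" defaults in the table suggest a fallback chain for the two capacity fields. The sketch below shows one plausible reading: a per-query value wins, then the instance-level value, then the built-in default of 10000. The struct names and the `effective_capacity` helper are hypothetical, and the exact precedence inside DrasiLib is an assumption here.

```rust
// Hypothetical mirrors of the Option<usize> capacity fields; not the drasi-lib types.
struct InstanceDefaults {
    priority_queue_capacity: Option<usize>,
}

struct QuerySettings {
    priority_queue_capacity: Option<usize>,
}

const BUILT_IN_PRIORITY_QUEUE_CAPACITY: usize = 10_000;

// Assumed precedence: per-query override, then instance default, then built-in default.
fn effective_capacity(instance: &InstanceDefaults, query: &QuerySettings) -> usize {
    query
        .priority_queue_capacity
        .or(instance.priority_queue_capacity)
        .unwrap_or(BUILT_IN_PRIORITY_QUEUE_CAPACITY)
}
```

Modelling the override as `Option<usize>` keeps "not set" distinct from any numeric value, so a query can inherit without repeating the instance setting.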
Query Builder
The Query builder creates query configurations with a fluent API.
Query::cypher(id: impl Into<String>)
Create a new Cypher query builder:
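// The query id, query text, and source id are placeholders
Query::cypher("high-temp-alerts")
    .query("MATCH (s:Sensor) WHERE s.temperature > 75 RETURN s.id, s.temperature")
    .from_source("sensors")
    .build()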
Query::gql(id: impl Into<String>)
Create a new GQL (Graph Query Language) query builder:
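// The query id, query text, and source id are placeholders
Query::gql("high-temp-alerts")
    .query("MATCH (s:Sensor) WHERE s.temperature > 75 RETURN s.id, s.temperature")
    .from_source("sensors")
    .build()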
Query Builder Methods
| Method | Description | Default |
|---|---|---|
| .query(str) | Set the query string | Required |
| .from_source(id) | Subscribe to a source | Required (at least one) |
| .from_source_with_pipeline(id, pipeline) | Subscribe with middleware pipeline | - |
| .with_middleware(config) | Add middleware transformation | [] |
| .auto_start(bool) | Start automatically with DrasiLib | true |
| .enable_bootstrap(bool) | Load initial data from sources | true |
| .with_bootstrap_buffer_size(size) | Bootstrap buffer size | 10000 |
| .with_joins(joins) | Configure synthetic joins | None |
| .with_priority_queue_capacity(cap) | Override queue capacity | Inherited |
| .with_dispatch_buffer_capacity(cap) | Override buffer capacity | Inherited |
| .with_dispatch_mode(mode) | Set dispatch mode | Channel |
| .with_storage_backend(ref) | Use specific storage backend | Default |
Complete Query Example
use ;
cypher
.query
.from_source
.from_source
.auto_start
.enable_bootstrap
.with_priority_queue_capacity
.with_dispatch_mode
.build
Runtime Management
After building, use these methods to manage the DrasiLib instance:
// Note: string arguments below are placeholder component ids, and
// `source`, `reaction`, and `query_config` are values created elsewhere.

// Lifecycle
core.start().await?; // Start all auto-start components
core.stop().await?; // Stop all components
// Component control
core.start_source("my-source").await?;
core.stop_source("my-source").await?;
core.start_query("my-query").await?;
core.stop_query("my-query").await?;
core.start_reaction("my-reaction").await?;
core.stop_reaction("my-reaction").await?;
// Add components at runtime
core.add_source(source).await?;
core.add_reaction(reaction).await?;
core.add_query(query_config).await?;
// Remove components
core.remove_source("my-source").await?;
core.remove_query("my-query").await?;
core.remove_reaction("my-reaction").await?;
// Inspection
let sources = core.list_sources().await?;
let queries = core.list_queries().await?;
let reactions = core.list_reactions().await?;
let status = core.get_source_status("my-source").await?;
let results = core.get_query_results("my-query").await?;
// Check running state
let is_running = core.is_running().await;
// Get current configuration
let config = core.get_current_config().await?;
Core Concepts
Sources
Sources ingest data from external systems and emit graph elements (nodes and relationships). DrasiLib provides a trait-based plugin architecture.
Important: Sources are plugins that must be instantiated externally and passed to DrasiLib. DrasiLib has no awareness of which source plugins exist.
Available Source Plugins: See components/sources/ for PostgreSQL, HTTP, gRPC, Mock, Platform, and Application sources.
Queries
Queries define what changes matter using Cypher. They track three types of results:
- Adding - New rows matching the query
- Updating - Existing rows with changed values
- Removing - Rows that no longer match
Limitation: ORDER BY, TOP, and LIMIT are not supported in continuous queries.
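The three result types can be pictured as a diff between two snapshots of a query's result set. The sketch below applies that idea to the `s.temperature > 75` sensor query used earlier: it re-evaluates the filter and compares the old and new result sets. The `Change` enum, `evaluate`, and `diff` are hypothetical names for this illustration; DrasiLib maintains results incrementally rather than re-running the query, so treat this as a conceptual model only.

```rust
use std::collections::BTreeMap;

#[derive(Debug, PartialEq)]
enum Change {
    Adding { id: String, after: f64 },
    Updating { id: String, before: f64, after: f64 },
    Removing { id: String, before: f64 },
}

// Result set of `WHERE s.temperature > 75`, keyed by sensor id.
type ResultSet = BTreeMap<String, f64>;

fn evaluate(readings: &BTreeMap<String, f64>) -> ResultSet {
    readings
        .iter()
        .filter(|(_, t)| **t > 75.0)
        .map(|(id, t)| (id.clone(), *t))
        .collect()
}

// Compare two result sets and report what was added, updated, or removed.
fn diff(before: &ResultSet, after: &ResultSet) -> Vec<Change> {
    let mut changes = Vec::new();
    for (id, new) in after {
        match before.get(id) {
            None => changes.push(Change::Adding { id: id.clone(), after: *new }),
            Some(old) if old != new => changes.push(Change::Updating {
                id: id.clone(),
                before: *old,
                after: *new,
            }),
            Some(_) => {} // unchanged rows produce no output
        }
    }
    for (id, old) in before {
        if !after.contains_key(id) {
            changes.push(Change::Removing { id: id.clone(), before: *old });
        }
    }
    changes
}
```

A sensor that moves from 70 to 76 yields an `Adding` change, one that moves from 80 to 90 yields `Updating` with both values, and one that drops to 60 yields `Removing`; readings that never cross the threshold produce nothing.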
Reactions
Reactions respond to query results by triggering actions (webhooks, logging, etc.).
Important: Like sources, reactions are plugins that must be instantiated externally.
Available Reaction Plugins: See components/reactions/ for HTTP, gRPC, SSE, Log, Platform, and Profiler reactions.
Bootstrap Providers
Bootstrap providers deliver initial data to queries before streaming begins. Any source can use any bootstrap provider, enabling flexible scenarios like "bootstrap from database, stream changes via HTTP."
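As a rough illustration of "bootstrap first, then stream", the sketch below delivers a provider's snapshot before any streamed changes and skips the snapshot when bootstrap is disabled. The `BootstrapProvider` trait shown here, `SnapshotProvider`, and `feed_query` are hypothetical and far simpler than the real interfaces described in the Bootstrap Provider Guide.

```rust
// Hypothetical trait for illustration; not the drasi-lib bootstrap interface.
trait BootstrapProvider {
    fn initial_elements(&self) -> Vec<String>;
}

struct SnapshotProvider {
    rows: Vec<String>,
}

impl BootstrapProvider for SnapshotProvider {
    fn initial_elements(&self) -> Vec<String> {
        self.rows.clone()
    }
}

// Deliver the bootstrap snapshot first (if enabled), then streamed changes in order.
fn feed_query(
    provider: &dyn BootstrapProvider,
    streamed: Vec<String>,
    enable_bootstrap: bool,
) -> Vec<String> {
    let mut delivered = Vec::new();
    if enable_bootstrap {
        delivered.extend(provider.initial_elements());
    }
    delivered.extend(streamed);
    delivered
}
```

Keeping the provider behind a trait object is what lets the snapshot come from one system and the change stream from another.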
Dispatch Modes
DrasiLib supports two dispatch modes for event routing:
| Mode | Backpressure | Message Loss | Best For |
|---|---|---|---|
| Channel (default) | Yes | None | Different subscriber speeds, critical data |
| Broadcast | No | Possible | High fanout (10+ subscribers), uniform speeds |
Configure per-query:
use DispatchMode;
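// "my-query" is a placeholder query id
Query::cypher("my-query")
    .with_dispatch_mode(DispatchMode::Channel) // Default
    // or
    .with_dispatch_mode(DispatchMode::Broadcast)
    .build()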
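The trade-off in the table can be reproduced with standard-library types. In this sketch a bounded `sync_channel` stands in for Channel mode: once the buffer is full, further sends are refused, which is the signal that slows a sender down. A fixed-size ring buffer stands in for Broadcast mode: the sender never waits, but the oldest events are overwritten. Both functions are illustrative assumptions and do not reflect how DrasiLib implements its dispatchers.

```rust
use std::collections::VecDeque;
use std::sync::mpsc::{sync_channel, TrySendError};

// Channel-style dispatch: a bounded queue refuses new events when full,
// which is the signal a sender uses to slow down (backpressure).
// Returns how many of `events` sends were refused while nothing is consumed.
fn channel_rejected(capacity: usize, events: usize) -> usize {
    let (tx, _rx) = sync_channel::<usize>(capacity);
    (0..events)
        .filter(|e| matches!(tx.try_send(*e), Err(TrySendError::Full(_))))
        .count()
}

// Broadcast-style dispatch: a fixed-size ring overwrites the oldest event,
// so a slow receiver misses events instead of slowing the sender.
// Returns the events still available to a receiver that has not read yet.
fn broadcast_retained(capacity: usize, events: usize) -> Vec<usize> {
    let mut ring = VecDeque::with_capacity(capacity);
    for e in 0..events {
        if ring.len() == capacity {
            ring.pop_front(); // oldest event is lost
        }
        ring.push_back(e);
    }
    ring.into_iter().collect()
}
```

With a capacity of 2 and 5 events, the channel variant refuses 3 sends and loses nothing that was accepted, while the broadcast variant accepts everything but only the last 2 events remain readable.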
Developer Guides
For detailed information on building plugins:
- Source Developer Guide - Creating custom data sources
- Reaction Developer Guide - Creating custom reactions
- Bootstrap Provider Guide - Creating bootstrap providers
- State Store Provider Guide - Creating state store providers
State Store Providers
State store providers allow plugins (Sources, BootstrapProviders, and Reactions) to persist runtime state that survives restarts of DrasiLib.
Default (Memory) Provider
By default, DrasiLib uses an in-memory state store that does not persist across restarts:
let core = DrasiLib::builder()
    .with_id("my-app")
    .build() // Uses MemoryStateStoreProvider by default
    .await?;
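To show why an in-memory store does not survive restarts, here is a small sketch of a key-value state store scoped by a store id. The `StateStore` trait, `InMemoryStore`, and the `"my-source"` and `"last_offset"` names are hypothetical; the real provider trait is documented in the State Store Provider Guide and may differ in shape.

```rust
use std::collections::HashMap;

// Hypothetical interface for illustration; not the drasi-lib provider trait.
trait StateStore {
    fn set(&mut self, store_id: &str, key: &str, value: Vec<u8>);
    fn get(&self, store_id: &str, key: &str) -> Option<Vec<u8>>;
}

#[derive(Default)]
struct InMemoryStore {
    entries: HashMap<(String, String), Vec<u8>>,
}

impl StateStore for InMemoryStore {
    fn set(&mut self, store_id: &str, key: &str, value: Vec<u8>) {
        self.entries
            .insert((store_id.to_string(), key.to_string()), value);
    }

    fn get(&self, store_id: &str, key: &str) -> Option<Vec<u8>> {
        self.entries
            .get(&(store_id.to_string(), key.to_string()))
            .cloned()
    }
}

// Returns whether a checkpoint written before a simulated restart is still readable.
fn checkpoint_survives_restart() -> bool {
    let mut store = InMemoryStore::default();
    store.set("my-source", "last_offset", b"42".to_vec());
    // Simulate a restart: a fresh in-memory store starts out empty.
    let restarted = InMemoryStore::default();
    restarted.get("my-source", "last_offset").is_some()
}
```

The function returns `false` because all state lives in the process's heap; a persistent provider would back the same interface with a file or database instead.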
Redb Provider
For ACID-compliant persistent storage, use the redb state store provider:
use RedbStateStoreProvider;
use Arc;
let state_store = new?;
let core = builder
.with_id
.with_state_store_provider
.build
.await?;
Using State Store in Plugins
When a Source or Reaction is added to DrasiLib, a runtime context is provided via the initialize() method. The context contains all DrasiLib-provided services, including the state store (if configured):
use ;
For reactions, use ReactionRuntimeContext which also provides access to QueryProvider:
use ;
See the State Store Provider Guide for full documentation.
Troubleshooting
"Source/Reaction not found"
Ensure you've added the instance to DrasiLib before referencing it in queries:
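// MySource is a placeholder for any type implementing Source;
// `query` is a QueryConfig that subscribes to this source's id.
let source = MySource::new(/* plugin-specific config */)?;
let core = DrasiLib::builder()
    .with_source(source) // Must add source before queries reference it
    .with_query(query)
    .build()
    .await?;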
Events not being received
- Broadcast mode: May lose events if receivers are slow
- Channel mode: Provides backpressure but may block senders
- Check dispatch_buffer_capacity and priority_queue_capacity settings
Debug Logging
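std::env::set_var("RUST_LOG", "debug"); // assumed filter value; narrow it as needed
env_logger::init(); // assumes the env_logger crate is the logging backend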
License
Apache License 2.0
Related Projects
- Drasi Platform - Complete deployment platform
- Drasi Core - Continuous query engine