Application Source
Overview
The Application Source is a programmatic data injection plugin for Drasi that enables direct, in-process delivery of graph data changes from Rust applications. Unlike network-based sources (HTTP, gRPC) or database-connected sources (PostgreSQL), the Application Source provides a native Rust API through a clonable handle pattern, allowing any part of your application to send graph events directly into Drasi's continuous query processing pipeline.
Key Capabilities
- Handle-based API: Clone and share handles across threads for concurrent event injection
- Type-safe Event Construction: Builder pattern for creating graph nodes and relationships with compile-time type safety
- Pluggable Bootstrap Support: Configure any bootstrap provider for initial data delivery
- Zero Network Overhead: In-process communication via bounded async channels
- Flexible Property Types: Support for strings, integers, floats, booleans, and null values
- Batch Operations: Send multiple changes efficiently in a single operation
Use Cases
Ideal for:
- Embedded Drasi: Integrate Drasi directly within your Rust application without external infrastructure
- Testing: Write precise unit and integration tests with full control over event sequences and timing
- Synthetic Data Generation: Create simulation environments, demos, or development data pipelines
- Hybrid Architectures: Combine programmatic events with data from external sources
- Low-latency Scenarios: Eliminate network serialization overhead for performance-critical applications
Not suitable for:
- External system integration (use PostgreSQL, HTTP, or gRPC sources instead)
- Cross-language scenarios (use HTTP or gRPC sources for language-agnostic access)
- Persistent data sources (use PostgreSQL or other database sources)
Configuration
Builder Pattern (Recommended)
The Application Source is typically created programmatically: build an ApplicationSourceConfig, then call ApplicationSource::new, which returns the source together with its first handle:
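// Import ApplicationSource and ApplicationSourceConfig from the application source crate.
use std::collections::HashMap;
// Create minimal configuration
let config = ApplicationSourceConfig {
    properties: HashMap::new(),
};
// Create source and handle ("my-source" is an illustrative ID)
let (source, handle) = ApplicationSource::new("my-source", config)?;
// Configure bootstrap provider (optional); `provider` is any BootstrapProvider implementation
source.set_bootstrap_provider(Box::new(provider)).await;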
Configuration Struct
Configuration Options
| Name | Description | Data Type | Valid Values | Default |
|---|---|---|---|---|
| properties | Custom application-specific properties passed through to Source::properties() | HashMap<String, serde_json::Value> | Any JSON-serializable key-value pairs | {} (empty map) |
| auto_start | Whether to start automatically when added to DrasiLib | bool | true, false | true |
Note: The Application Source has minimal configuration requirements since it operates entirely in-process. The properties field is primarily for metadata and custom application logic.
Auto-Start Behavior: When auto_start=true (default), the source starts immediately if added to a running DrasiLib instance. If added before drasi.start() is called, it starts when the DrasiLib starts. When auto_start=false, the source must be started manually via drasi.start_source("source-id").
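The auto-start rules above amount to a small decision table. The following is a minimal sketch of that table, not Drasi's implementation: `StartAction` and `on_source_added` are hypothetical names introduced only for illustration.

```rust
/// What happens to a newly added source (hypothetical names, for illustration only).
#[derive(Debug, PartialEq, Eq)]
pub enum StartAction {
    /// The DrasiLib instance is already running, so the source starts right away.
    StartImmediately,
    /// The DrasiLib instance has not started yet; the source starts when it does.
    StartWithDrasiLib,
    /// auto_start is false; the source waits for an explicit start call.
    AwaitManualStart,
}

/// Encodes the documented rules: `auto_start` comes from the source
/// configuration, `lib_running` says whether DrasiLib has been started.
pub fn on_source_added(auto_start: bool, lib_running: bool) -> StartAction {
    match (auto_start, lib_running) {
        (true, true) => StartAction::StartImmediately,
        (true, false) => StartAction::StartWithDrasiLib,
        (false, _) => StartAction::AwaitManualStart,
    }
}
```

With auto_start disabled, the running state of DrasiLib makes no difference: the source stays idle until it is started manually.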
Input Schema
The Application Source accepts graph data changes through the ApplicationSourceHandle API. Events follow Drasi's graph data model:
Graph Elements
Node Structure:
Node
Relationship Structure:
Relation
Property Types
Properties are built using PropertyMapBuilder and support the following types:
| Type | Rust Type | Builder Method | Example |
|---|---|---|---|
| String | Arc<str> | .with_string(key, value) | .with_string("name", "Alice") |
| Integer | i64 | .with_integer(key, value) | .with_integer("age", 30) |
| Float | f64 | .with_float(key, value) | .with_float("score", 95.5) |
| Boolean | bool | .with_bool(key, value) | .with_bool("active", true) |
| Null | - | .with_null(key) | .with_null("optional_field") |
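To make the builder pattern concrete, here is a std-only sketch that models the five property types as an enum and collects them in a map. `PropValue` and `SketchPropertyBuilder` are hypothetical stand-ins that only mirror the method names in the table; the real PropertyMapBuilder produces Drasi's own property map type and may differ in its signatures.

```rust
use std::collections::BTreeMap;
use std::sync::Arc;

/// Hypothetical stand-in for the five supported property types.
#[derive(Debug, Clone, PartialEq)]
pub enum PropValue {
    String(Arc<str>),
    Integer(i64),
    Float(f64),
    Bool(bool),
    Null,
}

/// Hypothetical builder; each `with_*` call consumes and returns the builder
/// so that calls can be chained.
#[derive(Default)]
pub struct SketchPropertyBuilder {
    props: BTreeMap<String, PropValue>,
}

impl SketchPropertyBuilder {
    pub fn new() -> Self {
        Self::default()
    }

    fn with(mut self, key: &str, value: PropValue) -> Self {
        // A later value for the same key replaces the earlier one.
        self.props.insert(key.to_string(), value);
        self
    }

    pub fn with_string(self, key: &str, value: &str) -> Self {
        self.with(key, PropValue::String(Arc::from(value)))
    }

    pub fn with_integer(self, key: &str, value: i64) -> Self {
        self.with(key, PropValue::Integer(value))
    }

    pub fn with_float(self, key: &str, value: f64) -> Self {
        self.with(key, PropValue::Float(value))
    }

    pub fn with_bool(self, key: &str, value: bool) -> Self {
        self.with(key, PropValue::Bool(value))
    }

    pub fn with_null(self, key: &str) -> Self {
        self.with(key, PropValue::Null)
    }

    /// Finishes the chain and hands back the collected properties.
    pub fn build(self) -> BTreeMap<String, PropValue> {
        self.props
    }
}
```

Because each value is wrapped in a typed enum variant, passing a string where an integer is expected is rejected at compile time, which is the kind of type safety the builder is meant to provide.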
Event Types
The Application Source processes three types of graph changes:
- Insert: Add a new node or relationship
- Update: Modify an existing node or relationship (full replacement)
- Delete: Remove a node or relationship
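The practical consequence of "full replacement" is that an update must resend every property that should survive. The sketch below shows this with a toy in-memory store. `Change`, `apply`, and `props` are hypothetical helpers, and the store is not how Drasi's query engine holds elements.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified change type: one string property map per element ID.
pub enum Change {
    Insert { id: String, props: HashMap<String, String> },
    Update { id: String, props: HashMap<String, String> },
    Delete { id: String },
}

/// Applies one change to a toy store keyed by element ID.
pub fn apply(store: &mut HashMap<String, HashMap<String, String>>, change: Change) {
    match change {
        // Insert and Update both store the supplied map as-is, so an Update
        // drops any property that is not sent again.
        Change::Insert { id, props } | Change::Update { id, props } => {
            store.insert(id, props);
        }
        Change::Delete { id } => {
            store.remove(&id);
        }
    }
}

/// Convenience helper for building a property map from string pairs.
pub fn props(pairs: &[(&str, &str)]) -> HashMap<String, String> {
    pairs
        .iter()
        .map(|(k, v)| (k.to_string(), v.to_string()))
        .collect()
}
```

In this model, inserting a person with `name` and `email` and then updating it with only `name` leaves the element without an `email` property.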
Usage Examples
Basic Setup
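// Import the application source types you need (for example ApplicationSource and ApplicationSourceConfig) from the application source crate.
use std::collections::HashMap;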
async
Inserting Nodes
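// Insert a person node (IDs, labels, and property values are illustrative)
let person_props = PropertyMapBuilder::new()
    .with_string("name", "Alice")
    .with_integer("age", 30)
    .with_string("email", "alice@example.com")
    .with_bool("active", true)
    .build();
handle.send_node_insert("person-1", vec!["Person"], person_props).await?;
// Insert a company node
let company_props = PropertyMapBuilder::new()
    .with_string("name", "Acme Corp")
    .with_string("industry", "Technology")
    .with_integer("founded", 2010)
    .build();
handle.send_node_insert("company-1", vec!["Company"], company_props).await?;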
Inserting Relationships
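// Create an employment relationship (arguments: id, labels, props, start node, end node; values are illustrative)
let employment_props = PropertyMapBuilder::new()
    .with_string("role", "Engineer")
    .with_string("department", "Engineering")
    .with_integer("start_year", 2020)
    .build();
handle.send_relation_insert("rel-1", vec!["WORKS_AT"], employment_props, "person-1", "company-1").await?;
// Create a social relationship
let knows_props = PropertyMapBuilder::new()
    .with_string("since", "2015")
    .with_integer("strength", 8)
    .build();
handle.send_relation_insert("rel-2", vec!["KNOWS"], knows_props, "person-1", "person-2").await?;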
Updating Nodes
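// Update existing node (full replacement: resend every property you want to keep)
let updated_props = PropertyMapBuilder::new()
    .with_string("name", "Alice Smith") // Changed name
    .with_integer("age", 31) // Birthday!
    .with_string("email", "alice@example.com")
    .with_bool("active", true)
    .with_string("title", "Senior Engineer") // New property
    .build();
handle.send_node_update("person-1", vec!["Person"], updated_props).await?;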
Deleting Elements
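// Delete a node (arguments: id, labels; values are illustrative)
handle.send_delete("person-1", vec!["Person"]).await?;
// Delete a relationship
handle.send_delete("rel-1", vec!["WORKS_AT"]).await?;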
Batch Operations
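// Import SourceChange (and any element types you need) from the Drasi core models.
use std::sync::Arc;
let mut changes = Vec::new();
// Build multiple changes
for i in 1..=5 {
    // Construct a SourceChange for element `i` and push it onto `changes`
}
// Send all changes in sequence
handle.send_batch(changes).await?;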
Multi-threaded Usage
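use tokio::task;
// Clone handle for concurrent access
let handle1 = handle.clone();
let handle2 = handle.clone();
// Each task owns its clone and returns the send result (values are illustrative)
let task1 = task::spawn(async move {
    let props = PropertyMapBuilder::new().with_string("name", "Alice").build();
    handle1.send_node_insert("person-1", vec!["Person"], props).await
});
let task2 = task::spawn(async move {
    let props = PropertyMapBuilder::new().with_string("name", "Bob").build();
    handle2.send_node_insert("person-2", vec!["Person"], props).await
});
// Wait for both tasks (first `?` is the join error, second is the send error)
task1.await??;
task2.await??;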
Integration with Drasi Server
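// Import DrasiLib from drasi-lib and the application source types used below.
use std::sync::Arc;
// Create Drasi instance (arguments elided; supply your DrasiLib configuration)
let drasi = DrasiLib::new(/* configuration */).await?;
// Create application source
let config = ApplicationSourceConfig { properties: HashMap::new() };
let (source, handle) = ApplicationSource::new("app-source", config)?;
// Add source to Drasi
drasi.add_source(source).await?;
// Define a query that uses the source (query ID and Cypher text are illustrative)
let query = drasi.create_query("people")
    .cypher("MATCH (p:Person) RETURN p.name, p.age")
    .from_source("app-source")
    .build()
    .await?;
drasi.add_query(query).await?;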
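// Now send events via handle (values are illustrative)
let props = PropertyMapBuilder::new()
    .with_string("name", "Alice")
    .with_integer("age", 30)
    .build();
handle.send_node_insert("person-1", vec!["Person"], props).await?;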
Bootstrap Support
The Application Source supports pluggable bootstrap providers via the BootstrapProvider trait. Any bootstrap provider implementation can be used with this source.
Configuring Bootstrap
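// Import ApplicationSource and ApplicationSourceConfig from the application source crate.
// Bootstrap providers are separate crates - add the one you need to your Cargo.toml
// use drasi_bootstrap_application::ApplicationBootstrapProvider;
// use drasi_bootstrap_scriptfile::ScriptFileBootstrapProvider;
use std::collections::HashMap;
// Create source ("my-source" is an illustrative ID)
let config = ApplicationSourceConfig { properties: HashMap::new() };
let (source, handle) = ApplicationSource::new("my-source", config)?;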
// Configure bootstrap provider (example with ApplicationBootstrapProvider)
// let bootstrap_provider = ApplicationBootstrapProvider::new();
// source.set_bootstrap_provider(Box::new(bootstrap_provider)).await;
Common Bootstrap Provider Options
Bootstrap providers are independent crates that you can add as dependencies:
- ApplicationBootstrapProvider (drasi-bootstrap-application) - Replays stored insert events from shared state
- ScriptFileBootstrapProvider (drasi-bootstrap-scriptfile) - Loads initial data from JSONL files
- NoopBootstrapProvider (drasi-bootstrap-noop) - Skips bootstrap entirely
- Custom implementations of the BootstrapProvider trait from drasi-lib
Bootstrap Behavior
When a query subscribes with enable_bootstrap: true:
- The source delegates to the configured bootstrap provider
- The provider sends initial data events to the query
- After bootstrap completes, the query receives streaming events
If no bootstrap provider is configured, the query is informed that bootstrap is not available.
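The delegation described above can be pictured as an optional trait object held by the source. This is a synchronous, std-only sketch under that assumption. `SketchBootstrapProvider`, `StaticProvider`, and `SketchSource` are hypothetical, and the real BootstrapProvider trait is async and has a different signature.

```rust
/// Hypothetical, synchronous stand-in for a bootstrap provider.
pub trait SketchBootstrapProvider {
    /// Returns the initial data events to deliver to a subscribing query.
    fn initial_events(&self) -> Vec<String>;
}

/// A provider that replays a fixed list of events.
pub struct StaticProvider(pub Vec<String>);

impl SketchBootstrapProvider for StaticProvider {
    fn initial_events(&self) -> Vec<String> {
        self.0.clone()
    }
}

/// Hypothetical source that owns at most one provider.
pub struct SketchSource {
    provider: Option<Box<dyn SketchBootstrapProvider>>,
}

impl SketchSource {
    pub fn new() -> Self {
        Self { provider: None }
    }

    pub fn set_bootstrap_provider(&mut self, provider: Box<dyn SketchBootstrapProvider>) {
        self.provider = Some(provider);
    }

    /// Returns `Some(events)` when bootstrap was requested and a provider is
    /// configured; `None` means bootstrap is unavailable or was not requested.
    pub fn subscribe(&self, enable_bootstrap: bool) -> Option<Vec<String>> {
        if !enable_bootstrap {
            return None;
        }
        self.provider.as_ref().map(|p| p.initial_events())
    }
}
```

Keeping the provider behind a trait object is what lets the source stay agnostic about where the initial data comes from.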
Thread Safety and Concurrency
Channel Architecture
The Application Source uses Tokio's bounded mpsc channel for event passing:
// Internal channel (default capacity: 1000 events)
let (tx, rx) = tokio::sync::mpsc::channel(1000);
Backpressure: If 1000 events are queued without processing, subsequent .send() calls will await until capacity is available.
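Bounded-channel backpressure can be observed without an async runtime. The sketch below swaps in the standard library's `sync_channel` for Tokio's mpsc channel: a full buffer rejects a non-blocking `try_send`, where the async version would suspend the sender instead. The function names are hypothetical.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Fills a bounded channel of the given capacity and reports whether one
/// extra non-blocking send is rejected because the buffer is full.
pub fn is_full_after(capacity: usize) -> bool {
    // `_rx` is kept alive so the channel stays open while nothing is consumed.
    let (tx, _rx) = sync_channel::<u32>(capacity);
    for i in 0..capacity as u32 {
        tx.try_send(i).expect("buffer still has room");
    }
    matches!(tx.try_send(0), Err(TrySendError::Full(_)))
}

/// Fills the channel, lets the receiver take one event, and reports whether
/// a further send then succeeds. `capacity` must be at least 1.
pub fn send_succeeds_after_drain(capacity: usize) -> bool {
    let (tx, rx) = sync_channel::<u32>(capacity);
    for i in 0..capacity as u32 {
        tx.try_send(i).expect("buffer still has room");
    }
    // Consuming one event frees exactly one slot.
    let _ = rx.recv();
    tx.try_send(99).is_ok()
}
```

The same reasoning applies to the source's channel: senders make progress only as fast as the processing task drains events.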
Handle Cloning
ApplicationSourceHandle implements Clone and is safe to share across threads:
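let handle2 = handle.clone();
let handle3 = handle.clone();
// All handles send to the same source
tokio::spawn(async move { /* send events via handle2 */ });
tokio::spawn(async move { /* send events via handle3 */ });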
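The clonable-handle pattern is essentially a cheap wrapper around a channel sender. As a rough illustration using OS threads and the standard library's mpsc channel instead of Tokio tasks, the following sketch shows several clones feeding one receiver. `SketchHandle` and `collect_from_clones` are hypothetical names.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical handle: cloning it clones the underlying channel sender.
#[derive(Clone)]
pub struct SketchHandle {
    tx: mpsc::Sender<String>,
}

impl SketchHandle {
    pub fn send(&self, event: &str) -> Result<(), mpsc::SendError<String>> {
        self.tx.send(event.to_string())
    }
}

/// Spawns `workers` threads, each sending one event through its own clone,
/// and returns everything the single receiver saw (sorted for stable output).
pub fn collect_from_clones(workers: usize) -> Vec<String> {
    let (tx, rx) = mpsc::channel();
    let handle = SketchHandle { tx };

    let joins: Vec<_> = (0..workers)
        .map(|i| {
            let h = handle.clone();
            thread::spawn(move || h.send(&format!("event-{i}")).unwrap())
        })
        .collect();
    for j in joins {
        j.join().unwrap();
    }

    // Dropping the last sender closes the channel so the iterator below ends.
    drop(handle);
    let mut events: Vec<String> = rx.iter().collect();
    events.sort();
    events
}
```

Arrival order across clones is not deterministic, which is why the sketch sorts before returning.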
Async Operation
All send methods are async and must be awaited:
// Correct
handle.send_node_insert(id, labels, props).await?;
// Incorrect (won't compile: `?` is applied to the un-awaited future)
handle.send_node_insert(id, labels, props)?;
Shutdown Behavior
When ApplicationSource::stop() is called:
- Internal processing task is aborted
- Channel receiver is dropped
- Subsequent handle sends return error:
"Failed to send event: channel closed"
Best Practice: Handle channel closure errors gracefully in long-running applications.
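One way to see why sends fail after shutdown: once the receiving end of a channel is dropped, every sender gets an error. The sketch below reproduces that with the standard library's `sync_channel` rather than Tokio's channel. `ClosableHandle` is hypothetical, and the error text is copied from the message quoted above only for illustration.

```rust
use std::sync::mpsc::{sync_channel, SyncSender};

/// Hypothetical handle that maps a closed channel to a readable error.
pub struct ClosableHandle {
    tx: SyncSender<u32>,
}

impl ClosableHandle {
    pub fn send(&self, event: u32) -> Result<(), String> {
        self.tx
            .send(event)
            .map_err(|_| "Failed to send event: channel closed".to_string())
    }
}

/// Sends once while the receiver is alive and once after it is dropped,
/// returning both results.
pub fn send_before_and_after_stop() -> (Result<(), String>, Result<(), String>) {
    let (tx, rx) = sync_channel(8);
    let handle = ClosableHandle { tx };

    let before = handle.send(1);
    // Stands in for stop() dropping the channel receiver.
    drop(rx);
    let after = handle.send(2);

    (before, after)
}
```

A long-running producer would typically treat the second result as a signal to stop sending rather than retry.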
Error Handling
Common Errors
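// Channel closed (source stopped); ID, labels, and props are illustrative
let result = handle.send_node_insert("person-1", vec!["Person"], props).await;
match result {
    Ok(_) => println!("Event sent"),
    Err(e) => eprintln!("Failed to send event: {e}"),
}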
Timestamp Errors
The source automatically generates timestamps using chrono::Utc::now(). In rare cases where timestamp generation fails, it falls back to millisecond precision:
// Automatic fallback (handled internally)
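let effective_from = get_current_timestamp_nanos()
    // Fallback: millisecond timestamp scaled to nanoseconds
    .unwrap_or_else(|_| (chrono::Utc::now().timestamp_millis() as u64) * 1_000_000);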
Testing Patterns
Basic Test Setup
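// Import the application source types you need (for example ApplicationSource and ApplicationSourceConfig) from the application source crate.
use std::collections::HashMap;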
async
Testing Event Sequences
async
Testing Error Conditions
async
Performance Considerations
Channel Capacity
Default capacity: 1000 events. For high-throughput scenarios:
// Currently requires source code modification
// Future enhancement: configurable channel size
let (tx, rx) = tokio::sync::mpsc::channel(10_000); // Larger buffer (illustrative size)
Batch vs. Individual Sends
Individual sends (typical):
for event in events {
    handle.send(event).await?;
}
Batch sends (better for large volumes):
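// Assumes `events` already holds SourceChange values
let changes: Vec<SourceChange> = events.iter().map(|event| event.clone()).collect();
handle.send_batch(changes).await?;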
Memory Usage
Memory usage depends on the configured bootstrap provider:
- ApplicationBootstrapProvider stores events in memory
- ScriptFileBootstrapProvider reads from disk on demand
- Consider bootstrap provider memory implications for high-volume scenarios
Comparison with Other Sources
| Feature | Application | HTTP | gRPC | PostgreSQL |
|---|---|---|---|---|
| Access | Rust API | REST | Protocol Buffers | Database CDC |
| Performance | Highest (in-process) | Moderate | Moderate | Database-dependent |
| Language Support | Rust only | Any | Any | Any (via database) |
| Network | No | Yes | Yes | Yes |
| Bootstrap | Pluggable provider | Pluggable provider | Pluggable provider | Pluggable provider |
| Use Case | Embedded, testing | Cross-language | Microservices | DB integration |
| Setup Complexity | Minimal | Moderate | Moderate | High |
Known Limitations
- No Deduplication: Source does not prevent duplicate element IDs (query engine handles this)
- Fixed Channel Size: Default 1000-event capacity requires code changes to increase
- Rust-Only: No cross-language support (by design)
- No Manual Timestamps: Timestamps are auto-generated and cannot be set manually
- No Reconnection: A stopped source cannot be reused; its handles become permanently unusable
- Bootstrap Requires Provider: No built-in bootstrap; requires configuring a bootstrap provider
Advanced Topics
Profiling Metadata
The source automatically adds profiling metadata to events for performance analysis:
// `now_ns` stands for the send time in nanoseconds
let mut profiling = ProfilingMetadata::new();
profiling.source_send_ns = Some(now_ns);
This metadata flows through the query pipeline and enables end-to-end latency tracking.
Component Lifecycle Events
The source emits lifecycle events via the component event channel:
- Starting: Source initialization began
- Running: Event processor started successfully
- Stopping: Shutdown initiated
- Stopped: Shutdown completed
Monitor these events for observability in production systems.
Retrieving Additional Handles
After source creation, get additional handles:
let (source, handle) = ApplicationSource::new("my-source", config)?;
let handle2 = source.get_handle();
let handle3 = source.get_handle();
All handles (including the original) connect to the same source instance.
API Reference Summary
ApplicationSource
| Method | Description |
|---|---|
| new(id, config) | Create source and handle |
| get_handle() | Get an additional handle |
| set_bootstrap_provider(provider) | Configure bootstrap provider |
| start() | Start event processing |
| stop() | Stop event processing |
ApplicationSourceHandle
| Method | Description |
|---|---|
| send(change) | Send raw SourceChange event |
| send_node_insert(id, labels, props) | Insert a node |
| send_node_update(id, labels, props) | Update a node |
| send_delete(id, labels) | Delete a node or relation |
| send_relation_insert(id, labels, props, start, end) | Insert a relationship |
| send_batch(changes) | Send multiple changes |
| source_id() | Get source identifier |
PropertyMapBuilder
| Method | Description |
|---|---|
| new() | Create new builder |
| with_string(key, value) | Add string property |
| with_integer(key, value) | Add integer property (i64) |
| with_float(key, value) | Add float property (f64) |
| with_bool(key, value) | Add boolean property |
| with_null(key) | Add null property |
| build() | Build final property map |
Further Reading
- Source Implementation: drasi-core/components/sources/application/src/lib.rs
- Test Examples: drasi-core/components/sources/application/src/tests.rs
- Drasi Core Models: See drasi-core/models/ for graph data type definitions
- Bootstrap Providers: See drasi-lib/bootstrap/ for bootstrap provider architecture