Please check the build logs for more information.
See Builds for ideas on how to fix a failed build, or Metadata for how to configure docs.rs builds.
If you believe this is docs.rs' fault, open an issue.
Mock Source
Overview
The Mock Source is a synthetic data generator plugin designed for testing, development, and demonstration purposes within the Drasi data processing platform. It generates continuous streams of graph node data without requiring any external systems or databases, making it ideal for rapid prototyping, testing query logic, and demonstrating Drasi capabilities.
Key Capabilities
- Three Built-in Data Generation Modes: Counter, Sensor, and Generic data types
- Configurable Generation Intervals: Control data emission rate from milliseconds to seconds
- Zero External Dependencies: No databases, APIs, or external services required
- Async Task-Based Generation: Runs as a Tokio task with precise interval timing
- Builder Pattern Support: Fluent API for easy source construction
- Label-Based Bootstrap Filtering: Supports bootstrap requests with label-based filtering
- Test Utilities: Built-in methods for injecting events and subscribing to data in tests
Use Cases
Testing and Development
- Test continuous query logic without setting up external databases
- Validate query filtering, aggregation, and transformation logic
- Develop and test reactions in isolation
- Prototype new features quickly with predictable data
Demonstrations and Presentations
- Show Drasi capabilities without infrastructure setup
- Create predictable, repeatable demo scenarios
- Demonstrate real-time data processing and continuous queries
Load Testing
- Generate continuous data streams for performance testing
- Test system behavior under sustained load with configurable rates
- Validate scaling characteristics and backpressure handling
Integration Testing
- Test multi-source pipelines with predictable data
- Validate data routing between components
- Test bootstrap and streaming data integration
Configuration
Builder Pattern (Recommended)
The builder pattern provides a fluent API for constructing MockSource instances with compile-time validation:
use MockSource;
use DispatchMode;
// Basic construction with defaults
let source = builder
.build?;
// Full configuration with all options
let source = builder
.with_data_type
.with_interval_ms
.with_dispatch_mode
.with_dispatch_buffer_capacity
.with_bootstrap_provider
.with_auto_start
.build?;
// Counter source for testing
let counter = builder
.with_data_type
.with_interval_ms
.build?;
Config Struct Approach
For programmatic or configuration-file-driven scenarios:
use ;
// Using MockSourceConfig
let config = MockSourceConfig ;
let source = new?;
// With custom dispatch settings
let source = with_dispatch?;
Configuration Options
| Name | Description | Data Type | Valid Values | Default |
|---|---|---|---|---|
id |
Unique identifier for the source instance | String |
Any non-empty string | (Required) |
data_type |
Type of synthetic data to generate | String |
"counter", "sensor", "generic" |
"generic" |
interval_ms |
Interval between data generation events in milliseconds | u64 |
Any positive integer (minimum 1) | 5000 |
dispatch_mode |
Event dispatch mode for subscribers | DispatchMode |
Channel (isolated with backpressure), Broadcast (shared, no backpressure) |
Channel |
dispatch_buffer_capacity |
Buffer size for dispatch channels | usize |
Any positive integer | 1000 |
bootstrap_provider |
Bootstrap provider for initial data delivery | Box<dyn BootstrapProvider> |
Any bootstrap provider implementation | None |
auto_start |
Whether to start automatically when added to DrasiLib | bool |
true, false |
true |
Configuration Validation:
The MockSourceConfig::validate() method checks:
data_typeis one of the valid types:"counter","sensor", or"generic"interval_msis greater than 0 (non-zero interval required)
Input Schema
The MockSource generates data internally and does not consume external input. However, it produces graph nodes with the following schemas based on the configured data_type:
Counter Mode (data_type: "counter")
Generated Node Schema:
Label: Counter
Element ID Format: counter_{sequence}
Properties:
- value: Integer (sequential counter starting from 1)
- timestamp: String (RFC3339 formatted UTC timestamp)
Example Node:
Characteristics:
- Sequential integer values starting from 1
- Increments by 1 on each generation
- Predictable, deterministic sequence
- Useful for testing ordering and sequential processing
Sensor Mode (data_type: "sensor")
Generated Node Schema:
Label: SensorReading
Element ID Format: reading_{sensor_id}_{sequence}
Properties:
- sensor_id: String (randomly selected: "sensor_0" to "sensor_4")
- temperature: Float (random value between 20.0 and 30.0)
- humidity: Float (random value between 40.0 and 60.0)
- timestamp: String (RFC3339 formatted UTC timestamp)
Example Node:
Characteristics:
- Simulates 5 different IoT sensors (sensor_0 through sensor_4)
- Randomized sensor selection on each generation
- Temperature range: 20.0°C to 30.0°C
- Humidity range: 40% to 60%
- Useful for testing IoT scenarios, aggregations, and filtering
Generic Mode (data_type: "generic")
Generated Node Schema:
Label: Generic
Element ID Format: generic_{sequence}
Properties:
- value: Integer (random 32-bit signed integer)
- message: String (fixed: "Generic mock data")
- timestamp: String (RFC3339 formatted UTC timestamp)
Example Node:
Characteristics:
- Random integer values (full i32 range: -2,147,483,648 to 2,147,483,647)
- Fixed message string for consistency
- Default mode when data_type is not specified
- General-purpose testing and development
Usage Examples
Basic Usage
use MockSource;
use Source;
use Arc;
async
Integration with DrasiLib
use MockSource;
use DrasiLib;
use Arc;
async
Testing with Mock Source
use ;
use SourceEvent;
use Source;
async
Manual Event Injection (Testing)
use MockSource;
use ;
use Arc;
async
With Bootstrap Provider
use MockSource;
use ;
use async_trait;
// Custom bootstrap provider
;
async
Different Dispatch Modes
use MockSource;
use DispatchMode;
// Channel mode - isolated channels per subscriber with backpressure
let channel_source = builder
.with_data_type
.with_dispatch_mode
.with_dispatch_buffer_capacity
.build?;
// Broadcast mode - shared channel, no backpressure
// Events may be dropped if subscribers can't keep up
let broadcast_source = builder
.with_data_type
.with_dispatch_mode
.with_dispatch_buffer_capacity
.build?;
Implementation Details
Data Generation Mechanism
The MockSource runs an internal Tokio task that:
- Interval Timer: Uses
tokio::time::intervalfor precise timing - Status Checking: Monitors component status and stops when not Running
- Sequence Counter: Maintains a sequence number starting from 1
- Data Generation: Creates nodes based on
data_typeconfiguration - Event Dispatch: Publishes via SourceBase dispatcher system
- Profiling Metadata: Includes timestamps for performance tracking
Generation Loop:
Start → Set Interval → Tick → Check Status → Generate Data →
Dispatch Event → Increment Sequence → Tick → ...
Element ID Generation
All generated nodes have predictable element IDs:
- Counter:
counter_{sequence}(e.g.,counter_1,counter_2, ...) - Sensor:
reading_{sensor_id}_{sequence}(e.g.,reading_2_42) - Generic:
generic_{sequence}(e.g.,generic_1,generic_2, ...)
The sequence number increments with each generation cycle, providing traceability and ordering.
Timestamp Generation
All modes include an RFC3339 formatted timestamp:
2025-12-05T10:30:45.123456789Z
Timestamp Strategy:
- Primary:
chrono::Utc::now().to_rfc3339()for timestamp property - Metadata: Nanosecond precision using
SystemTime::duration_since(UNIX_EPOCH) - Fallback: If nanosecond timestamp fails, uses millisecond precision × 1,000,000
Randomization Details
Sensor Mode:
- Temperature:
20.0 + rand::random::<f64>() * 10.0→ [20.0, 30.0) - Humidity:
40.0 + rand::random::<f64>() * 20.0→ [40.0, 60.0) - Sensor ID:
rand::random::<u32>() % 5→ {0, 1, 2, 3, 4}
Generic Mode:
- Value:
rand::random::<i32>()→ Full i32 range
Note: Uses the rand crate's default RNG, not cryptographically secure.
Lifecycle Management
Status Transitions:
Stopped → Starting → Running → Stopping → Stopped
Start Process:
- Set status to Starting
- Send component event
- Spawn generation task
- Set status to Running
- Send success event
Stop Process:
- Set status to Stopping
- Send component event
- Abort generation task
- Wait for task completion
- Set status to Stopped
- Send success event
Label-Based Bootstrap
When queries subscribe with bootstrap enabled:
- Query provides required node/relation labels
- Bootstrap provider (if configured) receives label filters
- Provider returns initial data matching labels
- Initial data sent before continuous streaming begins
Default Behavior: If no bootstrap provider is configured, bootstrap completes immediately with no initial data.
Testing Utilities
test_subscribe()
Creates a test subscription that receives all generated events:
let source = builder.build?;
let mut rx = source.test_subscribe;
source.start.await?;
// Receive events
while let Ok = rx.recv.await
Returns: Box<dyn ChangeReceiver<SourceEventWrapper>>
inject_event()
Manually injects a custom event for testing:
let source = builder.build?;
let mut rx = source.test_subscribe;
let change = Insert ;
source.inject_event.await?;
// Event will be received by subscribers
let event = rx.recv.await?;
Use Cases:
- Testing specific edge cases
- Simulating error conditions
- Testing event processing logic
- Creating deterministic test scenarios
Performance Characteristics
Generation Rate
| Interval Setting | Events/Second | Typical Use Case |
|---|---|---|
| 10-50ms | 20-100 | High-volume load testing |
| 100-500ms | 2-10 | Rapid testing, development |
| 1000-3000ms | 0.33-1 | Demos, presentations |
| 5000-10000ms | 0.1-0.2 | Slow background generation |
Default: 5000ms (0.2 events/second)
Memory Overhead
- Per Event: Minimal (~200-500 bytes per node)
- Task Overhead: Single Tokio task per source
- No Persistence: Events are not stored, only generated and dispatched
- Channel Buffers: Configurable (default 1000 events)
Throughput Recommendations
Testing: 100-1000ms intervals
- Fast enough to catch issues quickly
- Not overwhelming for debugging
Demonstrations: 1000-3000ms intervals
- Visible data flow without overwhelming viewers
- Comfortable pace for explanations
Load Testing: 10-100ms intervals
- High-volume data generation
- Tests backpressure and buffer handling
- May impact system performance with complex queries
Warning: Very short intervals (<10ms) can saturate CPU and memory depending on query complexity and reaction processing time.
Dispatch Mode Performance
Channel Mode (Default):
- Isolated channel per subscriber
- Backpressure applied when subscriber is slow
- Zero message loss guarantee
- Higher memory usage with many subscribers
Broadcast Mode:
- Single shared channel for all subscribers
- No backpressure (fast path)
- Slow subscribers may drop messages
- Lower memory overhead
Known Limitations
- Insert-Only Operations: Only generates insert events, never updates or deletes
- Node-Only Data: Only generates nodes, not graph relationships/edges
- Fixed Schemas: Each mode has a predefined schema that cannot be customized
- No Custom Properties: Cannot add additional properties beyond the schema
- Predictable Randomness: Not cryptographically secure (uses standard
randcrate) - Single Label Per Mode: Each mode generates nodes with exactly one label
- No State Persistence: Sequence counter resets on restart
- No Relationship Generation: Cannot generate edges between nodes
Troubleshooting
No Data Being Generated
Symptoms:
- Source status shows Running but no events received
- Queries return no results
Checks:
- Verify source has been started:
source.start().await? - Check status:
source.status().awaitshould beComponentStatus::Running - Verify interval is reasonable (not hours/days)
- Check logs for "Mock source started successfully" message
- Ensure subscribers are properly registered
Debug Commands:
// Check status
let status = source.status.await;
println!;
// Check properties
let props = source.properties;
println!;
println!;
Wrong Data Type Generated
Symptoms:
- Unexpected node labels in query results
- Missing expected properties
Checks:
- Verify
data_typespelling is exact (case-sensitive) - Valid values:
"counter","sensor","generic" - Invalid/missing values default to
"generic" - Check configuration was applied correctly
Verification:
let props = source.properties;
assert_eq!;
Query Returns No Results
Symptoms:
- Source is generating data but queries are empty
Common Causes:
Wrong Label:
-- Wrong: Looking for wrong label
MATCH (n:Sensor) RETURN n // Should be SensorReading
-- Correct:
MATCH (n:SensorReading) RETURN n
Wrong Property Names:
-- Wrong: Property name typo
MATCH (s:SensorReading) WHERE s.temp > 25 RETURN s // Should be temperature
-- Correct:
MATCH (s:SensorReading) WHERE s.temperature > 25 RETURN s
Impossible Filter:
-- Wrong: Temperature range is 20-30, this filters everything
MATCH (s:SensorReading) WHERE s.temperature > 100 RETURN s
-- Correct: Use realistic range
MATCH (s:SensorReading) WHERE s.temperature > 25 RETURN s
Configuration Validation Errors
Invalid Data Type:
Error: Validation error: data_type 'sensors' is not valid.
Valid options are: counter, sensor, generic
Solution: Use exact spelling: "counter", "sensor", or "generic"
Zero Interval:
Error: Validation error: interval_ms cannot be 0.
Please specify a positive interval in milliseconds (minimum 1)
Solution: Use positive interval (minimum 1ms)
High CPU Usage
Symptoms:
- MockSource consuming significant CPU
- System slowdown
Common Causes:
- Very short interval (<10ms) generating too many events
- Complex queries processing each event
- Too many concurrent mock sources
Solutions:
// Increase interval for lower CPU usage
let source = builder
.with_interval_ms // Was: 10ms
.build?;
// Use broadcast mode to reduce dispatch overhead
let source = builder
.with_dispatch_mode
.build?;
Related Components
Dependencies
- drasi-lib: Core Drasi library providing Source trait and channel infrastructure
- drasi-core: Core models (Element, SourceChange, ElementValue, etc.)
- tokio: Async runtime for task execution and interval timing
- chrono: Timestamp generation and formatting
- rand: Random value generation for sensor and generic modes
- serde/serde_json: Configuration serialization
Related Documentation
- Source Plugin Guide:
/Users/allenjones/dev/agentofreality/drasi/drasi-server/drasi-core/lib/src/sources/README.md - Bootstrap Providers:
/Users/allenjones/dev/agentofreality/drasi/drasi-server/drasi-core/lib/src/bootstrap/README.md - Channel System:
/Users/allenjones/dev/agentofreality/drasi/drasi-server/drasi-core/lib/src/channels/README.md - SourceBase:
/Users/allenjones/dev/agentofreality/drasi/drasi-server/drasi-core/lib/src/sources/base.rs
Example Projects
See the test suite in src/tests.rs for comprehensive usage examples including:
- Construction patterns
- Lifecycle management
- Event generation verification
- Builder API usage
- Configuration serialization
Changelog
Version 0.2.0
- Initial release as separate plugin crate
- Three data generation modes: counter, sensor, generic
- Builder pattern support
- Bootstrap provider integration
- Test utilities (inject_event, test_subscribe)
- Configurable dispatch modes and buffer capacity