Application Reaction
Overview
The Application Reaction component provides programmatic access to continuous query results directly within your Rust application. Unlike other reaction types (HTTP, gRPC, SSE, Log) that send results to external systems, the Application Reaction delivers results through an in-process channel, enabling direct consumption of query results with zero network overhead.
Key Capabilities
- In-Process Result Delivery: Receive query results via async channels without network calls
- Multiple Consumption Patterns: Callbacks, async streams, or flexible subscriptions
- Filtering & Buffering: Configure query filtering, buffer sizes, timeouts, and batch processing
- Type-Safe API: Strongly-typed Rust interfaces with comprehensive error handling
- Zero-Copy Architecture: Results delivered through efficient async channels
- Single Consumer Model: Each reaction creates one handle for result consumption
Use Cases
- Embedded Applications: Applications that embed Drasi as a library and need direct access to query results
- Real-Time Processing: Low-latency reaction to data changes without network overhead
- Integration Testing: Test harness for validating query behavior programmatically
- Custom Business Logic: Complex application logic that responds to continuous query results
- Data Pipelines: Stream query results to custom processing pipelines
- Analytics & Monitoring: Real-time dashboards and monitoring systems built in Rust
Architecture
┌─────────────────┐
│  Drasi Queries  │
└────────┬────────┘
         │ Query Results
         ▼
┌─────────────────────────┐
│   ApplicationReaction   │
│    (Priority Queue)     │
└────────┬────────────────┘
         │ mpsc::channel
         ▼
┌─────────────────────────┐
│ ApplicationReactionHandle│
└────────┬────────────────┘
         │
    ┌────┴─────┬──────────┬────────────┐
    ▼          ▼          ▼            ▼
 Callback    Stream  Subscription  Raw Receiver
Configuration
Builder Pattern (Recommended)
The builder pattern provides a fluent API for creating application reactions:
use ApplicationReaction;
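let (reaction, handle) = ApplicationReaction::builder("my-reaction") // placeholder ID (assumed argument)
    .with_query("query1".to_string()) // placeholder query IDs
    .with_query("query2".to_string())
    .with_priority_queue_capacity(1000)
    .with_auto_start(true)
    .build();
// Add to DrasiLib
drasi_lib.add_reaction(reaction).await?;
// Use handle to receive results
let mut subscription = handle.subscribe_with_options(Default::default()).await?;
while let Some(result) = subscription.recv().await {
    // Process `result` here
}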
Constructor Pattern
For simple cases, use the direct constructor:
use ApplicationReaction;
let (reaction, handle) = ApplicationReaction::new(/* constructor arguments not shown */);
drasi_lib.add_reaction(reaction).await?;
Configuration Struct
The ApplicationReactionConfig struct is used for serialization/deserialization:
use ApplicationReactionConfig;
use std::collections::HashMap;
let config = ApplicationReactionConfig ;
Configuration Options
Builder Methods
| Method | Description | Data Type | Default |
|---|---|---|---|
| with_query(query_id) | Add a single query ID to subscribe to | String | Empty list |
| with_queries(query_ids) | Set multiple query IDs to subscribe to | Vec<String> | Empty list |
| with_priority_queue_capacity(capacity) | Set the priority queue buffer size | usize | 1000 |
| with_auto_start(auto_start) | Set whether reaction auto-starts | bool | true |
Subscription Options
Configure how results are received using SubscriptionOptions:
| Option | Description | Data Type | Default |
|---|---|---|---|
| buffer_size | Maximum number of results to buffer | usize | 1000 |
| query_filter | Filter results by query IDs (empty = all) | Vec<String> | Empty |
| timeout | Maximum time to wait for results | Option<Duration> | None (wait forever) |
| batch_size | Maximum results per batch | Option<usize> | None (10 for batches) |
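To make the defaults in the table concrete, the following is a minimal, self-contained sketch of an options type with the same four fields. It is a hypothetical mirror named SubscriptionOptionsSketch, not the crate's actual SubscriptionOptions definition. The with_* method names follow the usage examples in this document, but their exact signatures, and the helper methods accepts and effective_batch_size, are assumptions made for illustration.

```rust
use std::time::Duration;

/// Hypothetical mirror of the options table; NOT the real `SubscriptionOptions`.
#[derive(Debug, Clone, PartialEq)]
pub struct SubscriptionOptionsSketch {
    pub buffer_size: usize,
    pub query_filter: Vec<String>,
    pub timeout: Option<Duration>,
    pub batch_size: Option<usize>,
}

impl Default for SubscriptionOptionsSketch {
    /// Defaults taken from the table: 1000, empty filter, no timeout, no batch size.
    fn default() -> Self {
        Self {
            buffer_size: 1000,
            query_filter: Vec::new(),
            timeout: None,
            batch_size: None,
        }
    }
}

impl SubscriptionOptionsSketch {
    pub fn with_buffer_size(mut self, size: usize) -> Self {
        self.buffer_size = size;
        self
    }

    pub fn with_query_filter(mut self, ids: Vec<String>) -> Self {
        self.query_filter = ids;
        self
    }

    pub fn with_timeout(mut self, timeout: Duration) -> Self {
        self.timeout = Some(timeout);
        self
    }

    pub fn with_batch_size(mut self, size: usize) -> Self {
        self.batch_size = Some(size);
        self
    }

    /// An empty filter accepts every query ID, as the table states.
    pub fn accepts(&self, query_id: &str) -> bool {
        self.query_filter.is_empty() || self.query_filter.iter().any(|q| q == query_id)
    }

    /// Explicit batch size if set, otherwise the fallback of 10 listed in the table.
    pub fn effective_batch_size(&self) -> usize {
        self.batch_size.unwrap_or(10)
    }
}
```

Consuming `self` in each with_* method is what allows the chained style used in the usage examples, such as default options followed by a filter and a batch size.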
Output Schema
QueryResult Structure
Results are delivered as QueryResult objects with the following schema:
Result Format
Each result in the results array is a JSON object representing a row:
Usage Examples
Example 1: Basic Subscription (Recommended)
The most flexible and recommended approach using subscriptions:
use ;
// Create reaction and handle
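let (reaction, handle) = ApplicationReaction::builder("my-reaction") // placeholder ID (assumed argument)
    .with_query("query1".to_string()) // placeholder query ID
    .build();
// Add to DrasiLib
drasi_lib.add_reaction(reaction).await?;
// Create subscription with default options
let mut subscription = handle.subscribe_with_options(SubscriptionOptions::default()).await?;
// Receive results one at a time
while let Some(result) = subscription.recv().await {
    // Process `result` here
}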
Example 2: Callback Pattern
Process results with a callback function (spawns background task):
use ApplicationReaction;
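let (reaction, handle) = ApplicationReaction::builder("my-reaction") // placeholder ID (assumed argument)
    .with_queries(vec!["query1".to_string(), "query2".to_string()]) // placeholder query IDs
    .build();
drasi_lib.add_reaction(reaction).await?;
// Subscribe with callback - runs in background
handle.subscribe(|result| {
    // Process `result` here (callback signature is assumed)
}).await?;
// Keep main task alive while callback processes results
sleep(Duration::from_secs(60)).await; // placeholder duration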
Example 3: Async Stream Pattern
Use async iteration for processing results:
use ApplicationReaction;
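let (reaction, handle) = ApplicationReaction::builder("my-reaction") // placeholder ID (assumed argument)
    .with_query("query1".to_string()) // placeholder query ID
    .build();
drasi_lib.add_reaction(reaction).await?;
// Convert to stream for async iteration
if let Some(mut stream) = handle.as_stream().await {
    // `next()` is assumed; the stream's iteration method is not shown in this document
    while let Some(result) = stream.next().await {
        // Process `result` here
    }
}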
Example 4: Filtered Subscription
Only receive results from specific queries:
use ;
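let (reaction, handle) = ApplicationReaction::builder("my-reaction") // placeholder ID (assumed argument)
    .with_queries(vec!["users".to_string(), "orders".to_string()]) // "orders" is a placeholder
    .build();
drasi_lib.add_reaction(reaction).await?;
// Only receive "users" query results
let options = SubscriptionOptions::default()
    .with_query_filter(vec!["users".to_string()]);
let mut subscription = handle.subscribe_with_options(options).await?;
while let Some(result) = subscription.recv().await {
    // Only results from the "users" query arrive here
}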
Example 5: Batch Processing
Receive multiple results at once for high-throughput scenarios:
use ;
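let (reaction, handle) = ApplicationReaction::builder("my-reaction") // placeholder ID (assumed argument)
    .with_query("query1".to_string()) // placeholder query ID
    .with_priority_queue_capacity(10_000) // Large buffer (placeholder value)
    .build();
drasi_lib.add_reaction(reaction).await?;
// Configure for batch processing
let options = SubscriptionOptions::default()
    .with_buffer_size(10_000) // placeholder value
    .with_batch_size(100); // Receive up to 100 results at a time
let mut subscription = handle.subscribe_with_options(options).await?;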
loop
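The batching idea can be illustrated independently of the Drasi API. The sketch below uses std::sync::mpsc as a synchronous stand-in for the async channel and a hypothetical helper, recv_batch_sketch, that waits for one item and then drains whatever is already queued, up to a maximum. It is not the implementation of recv_batch(); the real method's signature and return type are not shown in this document.

```rust
use std::sync::mpsc::{Receiver, TryRecvError};

/// Hypothetical helper: block for the first item, then take items that are
/// already queued until `max` is reached. Returns an empty vector once the
/// channel is closed and fully drained.
pub fn recv_batch_sketch<T>(rx: &Receiver<T>, max: usize) -> Vec<T> {
    let mut batch = Vec::new();
    if max == 0 {
        return batch;
    }
    // Wait for at least one item so callers do not spin on an empty channel.
    match rx.recv() {
        Ok(first) => batch.push(first),
        Err(_) => return batch, // every sender has been dropped
    }
    // Top up the batch without blocking.
    while batch.len() < max {
        match rx.try_recv() {
            Ok(item) => batch.push(item),
            Err(TryRecvError::Empty) | Err(TryRecvError::Disconnected) => break,
        }
    }
    batch
}
```

A consumer would typically call such a helper in a loop and stop when it returns an empty batch. Blocking only for the first item keeps latency low when traffic is light while still amortizing per-result overhead during bursts.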
Example 6: Timeout Configuration
Use timeouts to prevent indefinite blocking:
use ;
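use std::time::Duration;
let (reaction, handle) = ApplicationReaction::builder("my-reaction") // placeholder ID (assumed argument)
    .with_query("query1".to_string()) // placeholder query ID
    .build();
drasi_lib.add_reaction(reaction).await?;
// Configure timeout
let options = SubscriptionOptions::default()
    .with_timeout(Duration::from_secs(5)); // placeholder timeout
let mut subscription = handle.subscribe_with_options(options).await?;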
loop
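When a timeout is configured, a consumer usually needs to tell three outcomes apart: a result arrived, the wait timed out, or the channel closed. The sketch below shows that distinction with std::sync::mpsc as a synchronous stand-in for the async channel. RecvOutcome and recv_with_timeout are hypothetical names; how the real Subscription reports a timeout is not specified in this document.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// Hypothetical outcome type separating "nothing yet" from "nothing ever again".
#[derive(Debug, PartialEq)]
pub enum RecvOutcome<T> {
    Item(T),
    TimedOut,
    Closed,
}

/// `None` waits forever, mirroring the documented default for `timeout`.
pub fn recv_with_timeout<T>(rx: &Receiver<T>, timeout: Option<Duration>) -> RecvOutcome<T> {
    match timeout {
        None => match rx.recv() {
            Ok(item) => RecvOutcome::Item(item),
            Err(_) => RecvOutcome::Closed,
        },
        Some(limit) => match rx.recv_timeout(limit) {
            Ok(item) => RecvOutcome::Item(item),
            Err(RecvTimeoutError::Timeout) => RecvOutcome::TimedOut,
            Err(RecvTimeoutError::Disconnected) => RecvOutcome::Closed,
        },
    }
}
```

In a receive loop, TimedOut is a natural point to run periodic work such as health checks, while Closed is the signal to exit the loop.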
Example 7: Non-Blocking Polling
Check for results without blocking:
use ApplicationReaction;
use ;
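let (reaction, handle) = ApplicationReaction::builder("my-reaction") // placeholder ID (assumed argument)
    .with_query("query1".to_string()) // placeholder query ID
    .build();
drasi_lib.add_reaction(reaction).await?;
let mut subscription = handle.subscribe_with_options(Default::default()).await?;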
loop
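Non-blocking polling amounts to taking whatever is ready and returning immediately. The sketch below shows the pattern with std::sync::mpsc as a synchronous stand-in for the async channel. drain_ready is a hypothetical helper, and the name of the non-blocking method on the real Subscription is not shown in this document.

```rust
use std::sync::mpsc::{Receiver, TryRecvError};

/// Hypothetical helper: collect everything that is ready right now.
/// The returned flag is true once the channel has been closed.
pub fn drain_ready<T>(rx: &Receiver<T>) -> (Vec<T>, bool) {
    let mut ready = Vec::new();
    loop {
        match rx.try_recv() {
            Ok(item) => ready.push(item),
            // Nothing queued: return so the caller can do other work.
            Err(TryRecvError::Empty) => return (ready, false),
            // All senders are gone: no further results will arrive.
            Err(TryRecvError::Disconnected) => return (ready, true),
        }
    }
}
```

A polling loop built on this should sleep or yield between calls when nothing is ready, otherwise it will occupy a CPU core while idle.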
Example 8: Multiple Consumers (Separate Reactions)
If you need multiple consumers, create separate application reactions:
use ApplicationReaction;
// Create separate reactions for different consumers
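let (reaction1, handle1) = ApplicationReaction::builder("consumer-1") // placeholder IDs (assumed argument)
    .with_query("query1".to_string()) // placeholder query ID
    .build();
let (reaction2, handle2) = ApplicationReaction::builder("consumer-2")
    .with_query("query1".to_string())
    .build();
// Add both reactions
drasi_lib.add_reaction(reaction1).await?;
drasi_lib.add_reaction(reaction2).await?;
// Each consumer gets its own copy of results
tokio::spawn(async move {
    // Consume results through handle1 here
});
tokio::spawn(async move {
    // Consume results through handle2 here
});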
Important Considerations
Single Consumer Model
Each ApplicationReactionHandle can only be consumed once. The underlying receiver is taken on first use:
- ✅ Valid: Call one consumption method (subscribe, as_stream, subscribe_with_options, take_receiver)
- ❌ Invalid: Call multiple consumption methods on the same handle
- ✅ Solution: Create multiple application reactions for multiple consumers
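let (reaction, handle) = ApplicationReaction::builder("my-reaction").build(); // placeholder ID (assumed argument)
// This works - first call succeeds
let mut subscription = handle.subscribe_with_options(Default::default()).await?;
// This fails - receiver already taken
let result = handle.as_stream().await;
assert!(result.is_none()); // Returns None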
Cloning Handles
ApplicationReactionHandle is Clone, but all clones share the same receiver:
let (reaction, handle1) = ApplicationReaction::builder("my-reaction").build(); // placeholder ID (assumed argument)
let handle2 = handle1.clone();
// Only ONE of these will succeed (whichever is called first)
let result1 = handle1.take_receiver().await; // Gets the receiver
let result2 = handle2.take_receiver().await; // Returns None
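One common way to get this "cloneable handle, single receiver" behavior is to keep the receiver in a shared Option and take it out on first use. The sketch below shows that mechanism using only the standard library. HandleSketch is a hypothetical type and this is an assumption about the general technique, not a description of how ApplicationReactionHandle is actually implemented.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for a handle: every clone points at the same slot.
pub struct HandleSketch<T> {
    slot: Arc<Mutex<Option<Receiver<T>>>>,
}

// Manual impl so that cloning the handle does not require `T: Clone`.
impl<T> Clone for HandleSketch<T> {
    fn clone(&self) -> Self {
        Self { slot: Arc::clone(&self.slot) }
    }
}

impl<T> HandleSketch<T> {
    /// Creates the sending side and a handle that owns the receiver.
    pub fn new() -> (Sender<T>, Self) {
        let (tx, rx) = channel();
        (tx, Self { slot: Arc::new(Mutex::new(Some(rx))) })
    }

    /// Hands out the receiver exactly once; later calls on any clone return None.
    pub fn take_receiver(&self) -> Option<Receiver<T>> {
        self.slot.lock().unwrap().take()
    }
}
```

Because `Option::take` leaves `None` behind, the second caller observes an empty slot regardless of which clone it uses, which matches the behavior described above.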
Thread Safety
- ApplicationReactionHandle is thread-safe and can be shared across threads
- Subscription and ResultStream are NOT Send - use within a single task
- Callbacks must be Send + 'static as they run in background tasks
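The Send + 'static requirement on callbacks follows from how background execution works in Rust: a closure that moves to another thread or task must own its captured data and be safe to transfer. The sketch below shows the same bounds with std::thread and std::sync::mpsc in place of async tasks. subscribe_sketch is a hypothetical function, not the signature of the real subscribe method.

```rust
use std::sync::mpsc::Receiver;
use std::thread::{self, JoinHandle};

/// Hypothetical callback subscription: runs `callback` for every item on a
/// background thread until all senders are dropped.
pub fn subscribe_sketch<T, F>(rx: Receiver<T>, mut callback: F) -> JoinHandle<()>
where
    T: Send + 'static,
    // `Send` lets the closure cross threads; `'static` forbids borrowed captures
    // that might not live as long as the background thread.
    F: FnMut(T) + Send + 'static,
{
    thread::spawn(move || {
        for item in rx {
            callback(item);
        }
    })
}
```

In practice this means shared state captured by a callback is usually wrapped in Arc (plus Mutex or an atomic when it is mutated) and moved into the closure.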
Priority Queue Behavior
Results are delivered in timestamp order using a priority queue:
- Default capacity: 1000 results
- Configurable via with_priority_queue_capacity()
- Larger capacity = more memory, better handling of out-of-order results
- Results are automatically sorted by timestamp before delivery
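The ordering behavior can be pictured as a bounded min-heap keyed by timestamp. The sketch below builds one from std::collections::BinaryHeap. TimestampQueue and Entry are hypothetical types; the u64 timestamp, the insertion-order tie-break, and the choice to reject pushes when full are all assumptions, since this document does not state what the real queue does at capacity.

```rust
use std::cmp::Ordering;
use std::collections::BinaryHeap;

struct Entry<T> {
    timestamp: u64,
    seq: u64, // insertion counter, used to break timestamp ties
    payload: T,
}

impl<T> PartialEq for Entry<T> {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == Ordering::Equal
    }
}
impl<T> Eq for Entry<T> {}
impl<T> PartialOrd for Entry<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
impl<T> Ord for Entry<T> {
    // Reversed comparison: BinaryHeap is a max-heap, so the oldest timestamp
    // must compare as the greatest entry to be popped first.
    fn cmp(&self, other: &Self) -> Ordering {
        (other.timestamp, other.seq).cmp(&(self.timestamp, self.seq))
    }
}

/// Hypothetical bounded queue that releases items in timestamp order.
pub struct TimestampQueue<T> {
    heap: BinaryHeap<Entry<T>>,
    capacity: usize,
    next_seq: u64,
}

impl<T> TimestampQueue<T> {
    pub fn new(capacity: usize) -> Self {
        Self { heap: BinaryHeap::new(), capacity, next_seq: 0 }
    }

    /// Returns the payload to the caller when the queue is full (assumed policy).
    pub fn push(&mut self, timestamp: u64, payload: T) -> Result<(), T> {
        if self.heap.len() >= self.capacity {
            return Err(payload);
        }
        self.heap.push(Entry { timestamp, seq: self.next_seq, payload });
        self.next_seq += 1;
        Ok(())
    }

    /// Pops the entry with the smallest timestamp.
    pub fn pop(&mut self) -> Option<(u64, T)> {
        self.heap.pop().map(|e| (e.timestamp, e.payload))
    }
}
```

The capacity trade-off mentioned above is visible here: a larger heap can hold more early arrivals while waiting for an older result, at the cost of one stored entry per buffered result.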
Error Handling
Methods return anyhow::Result for error handling:
use anyhow::Result;
async
Performance Considerations
- Buffer Sizing: Larger buffers (both priority queue and subscription) handle bursts better
- Batch Processing: Use recv_batch() for high-throughput scenarios
- Query Filtering: Filter early with query_filter to reduce processing overhead
- Memory Usage: Each buffered result consumes memory - size buffers appropriately
- Zero-Copy: Results are cloned from the priority queue but use Arc internally
API Reference
ApplicationReaction
ApplicationReactionBuilder
ApplicationReactionHandle
SubscriptionOptions
Subscription
ResultStream / SubscriptionStream
Testing
Run the component tests:
Run with logging:
RUST_LOG=debug
License
Copyright 2025 The Drasi Authors.
Licensed under the Apache License, Version 2.0.