HTTP Source
A Drasi source plugin that exposes HTTP endpoints for receiving data change events. It provides both single-event and batch submission modes with adaptive batching for optimized throughput.
Overview
The HTTP Source is a plugin for the Drasi continuous query system that allows applications to submit graph data changes (nodes and relations) via REST API endpoints. It features:
- REST API Endpoints: Simple HTTP POST interface for submitting events
- Adaptive Batching: Automatically adjusts batch size and timing based on throughput patterns
- Dual Submission Modes: Single-event and batch endpoints for different use cases
- Universal Bootstrap: Supports any bootstrap provider (PostgreSQL, ScriptFile, Platform, etc.)
- Graph Data Model: Native support for nodes and relations with labels and properties
- Flexible Configuration: Builder pattern or configuration struct approaches
Key Capabilities
- Submit graph data changes via HTTP POST requests
- Automatic batch optimization based on traffic patterns
- Label-based query subscriptions and filtering
- Bootstrap initial data from external sources while streaming continues
- Health check endpoint for monitoring
- Configurable timeouts and dispatch modes
Use Cases
- Real-time Event Streaming: External systems push change events to Drasi
- Hybrid Data Loading: Bootstrap from database, then stream changes via HTTP
- Webhook Integration: Receive webhook notifications as graph events
- Manual Testing: Submit test data via curl during development
- API Integration: Connect third-party services to Drasi continuous queries
Configuration
Builder Pattern (Recommended)
The builder pattern provides a fluent, type-safe API for constructing HTTP sources. Argument values in the examples below are illustrative:

```rust
use HttpSource;

// Basic HTTP source
let source = HttpSource::builder()
    .with_host("0.0.0.0")
    .with_port(8080)
    .with_auto_start(true)
    .build()?;

// With adaptive batching tuning
let source = HttpSource::builder()
    .with_host("0.0.0.0")
    .with_port(8080)
    .with_adaptive_max_batch_size(1000)
    .with_adaptive_min_batch_size(10)
    .with_adaptive_max_wait_ms(100)
    .with_adaptive_enabled(true)
    .build()?;

// With custom dispatch settings
let source = HttpSource::builder()
    .with_host("0.0.0.0")
    .with_port(8080)
    .with_dispatch_mode(DispatchMode::Channel)
    .with_dispatch_buffer_capacity(1000)
    .build()?;

// With bootstrap provider
let source = HttpSource::builder()
    .with_host("0.0.0.0")
    .with_port(8080)
    .with_bootstrap_provider(bootstrap_provider)
    .build()?;
```
Configuration Struct Approach
Alternatively, use `HttpSourceConfig` directly. Field names follow the configuration tables below; values are illustrative:

```rust
use HttpSourceConfig;

let config = HttpSourceConfig {
    host: "0.0.0.0".to_string(),
    port: 8080,
    endpoint: None,
    timeout_ms: 10_000,
    auto_start: true,
    ..Default::default()
};
let source = HttpSource::new(config)?;
```
YAML Configuration (DrasiServer)
When using DrasiServer, configure HTTP sources via YAML:
```yaml
sources:
  - id: "my-http-source"
    source_type: "http"
    auto_start: true
    host: "0.0.0.0"
    port: 8080
    endpoint: "/events"
    timeout_ms: 30000
    adaptive_enabled: true
    adaptive_max_batch_size: 1000
    adaptive_min_batch_size: 10
    adaptive_max_wait_ms: 100
    adaptive_min_wait_ms: 1
    adaptive_window_secs: 5
```
Configuration Options
Core Settings
| Name | Description | Data Type | Valid Values | Default |
|---|---|---|---|---|
| `host` | HTTP server host address to bind to | String | Any valid hostname or IP | Required |
| `port` | HTTP server port number | u16 | 1-65535 | 8080 |
| `endpoint` | Optional custom endpoint path | `Option<String>` | Any valid path | None |
| `timeout_ms` | Request timeout in milliseconds | u64 | Any positive integer | 10000 |
| `auto_start` | Whether to start automatically when added to DrasiLib | bool | true, false | true |
Adaptive Batching Settings
| Name | Description | Data Type | Valid Values | Default |
|---|---|---|---|---|
| `adaptive_enabled` | Enable/disable adaptive batching | `Option<bool>` | true, false | true |
| `adaptive_max_batch_size` | Maximum events per batch | `Option<usize>` | Any positive integer | 1000 |
| `adaptive_min_batch_size` | Minimum events per batch | `Option<usize>` | Any positive integer | 10 |
| `adaptive_max_wait_ms` | Maximum wait time before dispatching (ms) | `Option<u64>` | Any positive integer | 100 |
| `adaptive_min_wait_ms` | Minimum wait time between batches (ms) | `Option<u64>` | Any positive integer | 1 |
| `adaptive_window_secs` | Throughput measurement window (seconds) | `Option<u64>` | Any positive integer | 5 |
Dispatch Settings (Builder Only)
| Name | Description | Data Type | Valid Values | Default |
|---|---|---|---|---|
| `dispatch_mode` | Event routing mode | DispatchMode | Channel, Broadcast | Channel |
| `dispatch_buffer_capacity` | Buffer size for dispatch channel | usize | Any positive integer | 1000 |
Bootstrap Settings (Builder Only)
| Name | Description | Data Type | Valid Values | Default |
|---|---|---|---|---|
| `bootstrap_provider` | Bootstrap provider for initial data | Boxed provider trait object | Any provider implementation | None |
Input Schema
The HTTP source accepts JSON data in the `HttpSourceChange` format. All events use a tagged union structure with an `operation` field.
Insert Operation (Node)
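A plausible payload, inferred from the field descriptions below (exact layout may differ):

```json
{
  "operation": "insert",
  "element": {
    "type": "node",
    "id": "user-1",
    "labels": ["User"],
    "properties": { "name": "Alice", "age": 30 }
  },
  "timestamp": 1700000000000000000
}
```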
Insert Operation (Relation)
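A plausible relation payload, inferred from the field descriptions below (`from` and `to` reference node IDs):

```json
{
  "operation": "insert",
  "element": {
    "type": "relation",
    "id": "follows-1",
    "labels": ["FOLLOWS"],
    "from": "user-1",
    "to": "user-2",
    "properties": { "since": "2024-01-01" }
  }
}
```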
Update Operation
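An update uses the same element structure with `"operation": "update"` (shape inferred from the field descriptions below):

```json
{
  "operation": "update",
  "element": {
    "type": "node",
    "id": "user-1",
    "labels": ["User"],
    "properties": { "name": "Alice", "age": 31 }
  }
}
```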
Delete Operation
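A delete identifies the element to remove (shape inferred from the field descriptions below; whether `properties` may be omitted is an assumption):

```json
{
  "operation": "delete",
  "element": {
    "type": "node",
    "id": "user-1",
    "labels": ["User"],
    "properties": {}
  }
}
```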
Batch Submission
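The batch endpoint accepts multiple events in one request; the `events` wrapper key shown here is an assumption:

```json
{
  "events": [
    { "operation": "insert", "element": { "type": "node", "id": "user-1", "labels": ["User"], "properties": {} } },
    { "operation": "insert", "element": { "type": "node", "id": "user-2", "labels": ["User"], "properties": {} } }
  ]
}
```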
Field Descriptions
- `operation`: Must be `"insert"`, `"update"`, or `"delete"`
- `element`: The graph element (node or relation)
- `type`: Must be `"node"` or `"relation"`
- `id`: Unique identifier for the element
- `labels`: Array of label strings (e.g., `["User"]`, `["FOLLOWS"]`)
- `properties`: JSON object with arbitrary key-value pairs
- `from`: (Relations only) Source node ID
- `to`: (Relations only) Target node ID
- `timestamp`: Optional nanoseconds since Unix epoch (auto-generated if omitted)
Usage Examples
Starting an HTTP Source
Argument values and the exact status type below are illustrative:

```rust
use HttpSource;
use Source; // trait providing start/stop/status

// Create and start the source
let source = HttpSource::builder()
    .with_host("0.0.0.0")
    .with_port(8080)
    .build()?;

source.start().await?;

// Check status (status type assumed)
assert_eq!(source.status().await, ComponentStatus::Running);
```
Submitting Events via curl
Source ID, payloads, and the batch wrapper key below are illustrative:

```bash
# Submit single node
curl -X POST http://localhost:8080/sources/my-http-source/events \
  -H "Content-Type: application/json" \
  -d '{"operation":"insert","element":{"type":"node","id":"user-1","labels":["User"],"properties":{"name":"Alice"}}}'

# Submit batch of events
curl -X POST http://localhost:8080/sources/my-http-source/events/batch \
  -H "Content-Type: application/json" \
  -d '{"events":[{"operation":"insert","element":{"type":"node","id":"user-2","labels":["User"],"properties":{"name":"Bob"}}}]}'

# Health check
curl http://localhost:8080/health
```
Python Example
The URL and payload shape below are illustrative:

```python
import requests

# Submit a node
event = {
    "operation": "insert",
    "element": {
        "type": "node",
        "id": "user-1",
        "labels": ["User"],
        "properties": {"name": "Alice"},
    },
}
response = requests.post(
    "http://localhost:8080/sources/my-http-source/events",
    json=event,
)
print(response.json())
# {"success": true, "message": "All 1 events processed successfully"}
```
JavaScript/Node.js Example
The URL and payload shape below are illustrative:

```javascript
const axios = require('axios');

// Submit a node
const event = {
  operation: 'insert',
  element: { type: 'node', id: 'user-1', labels: ['User'], properties: { name: 'Alice' } },
};

axios
  .post('http://localhost:8080/sources/my-http-source/events', event)
  .then((res) => console.log(res.data))
  .catch(console.error);
```
Complete Integration Example
A minimal outline (runtime wiring omitted; shutdown handling is illustrative):

```rust
use HttpSource;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Build and start the source
    let source = HttpSource::builder()
        .with_host("0.0.0.0")
        .with_port(8080)
        .build()?;

    source.start().await?;

    // ... register the source with the Drasi runtime and subscribe queries ...

    // Run until interrupted, then shut down cleanly
    tokio::signal::ctrl_c().await?;
    source.stop().await?;
    Ok(())
}
```
Endpoints
The HTTP source exposes the following endpoints:
| Method | Path | Description |
|---|---|---|
| POST | `/sources/{source_id}/events` | Submit a single event |
| POST | `/sources/{source_id}/events/batch` | Submit multiple events |
| GET | `/health` | Health check (returns service status and features) |
Response Format
Success:
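As seen in the Python example's output:

```json
{ "success": true, "message": "All 1 events processed successfully" }
```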
Partial success (batch):
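A plausible shape (field names and error strings are illustrative):

```json
{
  "success": false,
  "message": "Processed 8 of 10 events",
  "errors": [
    "event 3: invalid JSON",
    "event 7: unknown operation"
  ]
}
```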
Error:
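A plausible shape (field names are illustrative):

```json
{ "success": false, "message": "Invalid JSON payload" }
```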
Adaptive Batching
The HTTP source includes intelligent batching that automatically adjusts based on throughput:
Throughput Levels
| Level | Messages/Second | Batch Size | Wait Time |
|---|---|---|---|
| Idle | < 1 | Minimum | Minimum (1ms) |
| Low | 1-100 | Small (2x min) | 1ms |
| Medium | 100-1,000 | Moderate (25% of max) | 10ms |
| High | 1,000-10,000 | Large (50% of max) | 25ms |
| Burst | > 10,000 | Maximum | 50ms |
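The level selection can be sketched as a threshold classifier over the measured rate. This mirrors the bands in the table above, not the actual `AdaptiveBatcher` internals:

```rust
// Throughput bands from the table above (sketch, not library code).
#[derive(Debug, PartialEq)]
enum ThroughputLevel {
    Idle,
    Low,
    Medium,
    High,
    Burst,
}

fn classify(msgs_per_sec: f64) -> ThroughputLevel {
    if msgs_per_sec < 1.0 {
        ThroughputLevel::Idle
    } else if msgs_per_sec <= 100.0 {
        ThroughputLevel::Low
    } else if msgs_per_sec <= 1_000.0 {
        ThroughputLevel::Medium
    } else if msgs_per_sec <= 10_000.0 {
        ThroughputLevel::High
    } else {
        ThroughputLevel::Burst
    }
}

fn main() {
    // 500 msg/s falls in the Medium band
    println!("{:?}", classify(500.0)); // Medium
}
```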
Tuning Guidelines
For low latency (real-time dashboards), keep batches small and waits short (values illustrative):

```rust
.with_adaptive_max_wait_ms(10)
.with_adaptive_min_batch_size(1)
```

For high throughput (bulk data ingestion), allow larger batches and longer waits:

```rust
.with_adaptive_max_batch_size(5000)
.with_adaptive_max_wait_ms(200)
```

To disable adaptive batching:

```rust
.with_adaptive_enabled(false)
```
Bootstrap Providers
The HTTP source supports universal bootstrap - any bootstrap provider can be used to load initial data before streaming begins.
Common Patterns
Bootstrap from PostgreSQL, stream via HTTP (provider type names and configs below are illustrative):

```rust
// Provider construction details depend on the bootstrap crate in use.
let postgres_provider = PostgresBootstrapProvider::new(postgres_config)?;

let source = HttpSource::builder()
    .with_host("0.0.0.0")
    .with_port(8080)
    .with_bootstrap_provider(Box::new(postgres_provider))
    .build()?;
```

Bootstrap from files:

```rust
let file_provider = ScriptFileBootstrapProvider::new(file_config)?;

let source = HttpSource::builder()
    .with_host("0.0.0.0")
    .with_port(8080)
    .with_bootstrap_provider(Box::new(file_provider))
    .build()?;
```
Bootstrap Behavior
- Bootstrap runs asynchronously in a separate task
- Streaming events are processed immediately
- Queries receive both bootstrap and streaming data
- Bootstrap provider properties are passed via the source's generic properties map
Error Handling
Common Errors
Port already in use:
Failed to bind HTTP server to 0.0.0.0:8080: Address already in use
Solution: Change port or stop conflicting service
Invalid JSON:
Solution: Validate JSON structure against schema
Source name mismatch:
Solution: Ensure URL path matches source ID
Validation errors:
- Port cannot be 0
- Timeout cannot be 0
- Min batch size cannot exceed max batch size
- Min wait time cannot exceed max wait time
Testing
Unit Tests
Package and module names below are illustrative:

```bash
# Run all HTTP source tests
cargo test -p drasi-source-http

# Run specific test module
cargo test -p drasi-source-http adaptive_batcher

# Run with logging
RUST_LOG=debug cargo test -p drasi-source-http -- --nocapture
```
Integration Testing
Example name, source ID, and payload below are illustrative:

```bash
# Start test server
cargo run --example http_source

# In another terminal, submit test events
curl -X POST http://localhost:8080/sources/my-http-source/events \
  -H "Content-Type: application/json" \
  -d '{"operation":"insert","element":{"type":"node","id":"n1","labels":["Test"],"properties":{}}}'
```
Performance Considerations
Channel Capacity
The internal batch channel capacity is automatically calculated as max_batch_size × 5:
| Max Batch Size | Channel Capacity | Memory (1KB/event) |
|---|---|---|
| 100 | 500 | ~500 KB |
| 1,000 | 5,000 | ~5 MB |
| 5,000 | 25,000 | ~25 MB |
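The sizing rule behind the table is simple enough to state in code (a sketch of the documented formula, not the library's implementation):

```rust
// Documented sizing rule: internal channel capacity = max_batch_size * 5.
fn channel_capacity(max_batch_size: usize) -> usize {
    max_batch_size * 5
}

fn main() {
    for max in [100, 1_000, 5_000] {
        println!("max_batch_size {max} -> capacity {}", channel_capacity(max));
    }
}
```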
Dispatch Modes
- Channel (default): Isolated channels per subscriber with backpressure, zero message loss
- Broadcast: Shared channel, no backpressure, possible message loss under high load
Best Practices
- Use batch endpoint for bulk operations (reduces HTTP overhead)
- Enable adaptive batching for variable traffic patterns
- Tune batch sizes based on your throughput requirements
- Monitor health endpoint for production deployments
- Use Channel dispatch mode when message reliability is critical
Architecture Notes
Internal Structure
- `HttpSource`: Main plugin implementation (`lib.rs`)
- `HttpSourceConfig`: Configuration struct (`config.rs`)
- `HttpElement`/`HttpSourceChange`: Event models (`models.rs`)
- `AdaptiveBatcher`: Batching logic (`adaptive_batcher.rs`)
- Time utilities: Timestamp handling (`time.rs`)
Data Flow
- HTTP POST request arrives at the Axum endpoint
- JSON is deserialized to `HttpSourceChange`
- Converted to `drasi_core::models::SourceChange`
- Sent to the adaptive batcher via an mpsc channel
- Batcher accumulates events based on throughput
- Batch forwarded to dispatchers
- Dispatchers route to subscribed queries
Thread Safety
- All operations are async using Tokio
- Shared state protected by Arc<RwLock<_>>
- Channel-based communication for event flow
- Component status tracked with atomic updates