HTTP Source
A Drasi source plugin that exposes HTTP endpoints for receiving data change events. It supports two mutually exclusive modes: Standard Mode for structured events and Webhook Mode for receiving arbitrary payloads from external services like GitHub, Shopify, or custom applications.
Overview
The HTTP Source is a plugin for the Drasi continuous query system that allows applications to submit graph data changes (nodes and relations) via REST API endpoints. It features:
- REST API Endpoints: Simple HTTP POST interface for submitting events
- Adaptive Batching: Automatically adjusts batch size and timing based on throughput patterns
- Dual Submission Modes: Single-event and batch endpoints for different use cases
- Webhook Mode: Configurable routes with template-based payload transformation for external services
- Authentication: HMAC signature verification and Bearer token support
- Multi-Format Payloads: JSON, XML, YAML, and plain text content types
- Universal Bootstrap: Supports any bootstrap provider (PostgreSQL, ScriptFile, Platform, etc.)
- Graph Data Model: Native support for nodes and relations with labels and properties
- Flexible Configuration: Builder pattern or configuration struct approaches
Modes
The HTTP source operates in one of two mutually exclusive modes:
| Mode | When Active | Endpoints Available |
|---|---|---|
| Standard Mode | No `webhooks` config present | `/sources/{id}/events`, `/sources/{id}/events/batch`, `/health` |
| Webhook Mode | `webhooks` config present | Custom routes defined in config + `/health` |
Note: When webhook mode is active, standard endpoints return 404. The modes cannot be combined.
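The mode rule above can be pictured with a small sketch. `ModeSketchConfig`, `select_mode`, and `served_paths` are hypothetical names used only for illustration and are not part of the plugin's API; the sketch assumes the mode depends solely on whether a `webhooks` section is present.

```rust
/// Operating mode, chosen by whether a `webhooks` section is configured.
#[derive(Debug, PartialEq)]
enum Mode {
    Standard,
    Webhook,
}

/// Hypothetical stand-in for the source configuration.
struct ModeSketchConfig {
    /// `Some(route paths)` when a `webhooks` section is present, `None` otherwise.
    webhook_routes: Option<Vec<String>>,
}

/// Picks the mode from the presence of webhook configuration.
fn select_mode(config: &ModeSketchConfig) -> Mode {
    if config.webhook_routes.is_some() {
        Mode::Webhook
    } else {
        Mode::Standard
    }
}

/// Lists the paths served for a source ID in the selected mode.
/// `/health` is available in both modes.
fn served_paths(config: &ModeSketchConfig, source_id: &str) -> Vec<String> {
    let mut paths = match &config.webhook_routes {
        Some(routes) => routes.clone(),
        None => vec![
            format!("/sources/{}/events", source_id),
            format!("/sources/{}/events/batch", source_id),
        ],
    };
    paths.push("/health".to_string());
    paths
}
```

In this sketch a webhook configuration replaces the standard paths entirely, which mirrors the note that standard endpoints return 404 in webhook mode.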
Key Capabilities
- Submit graph data changes via HTTP POST requests
- Automatic batch optimization based on traffic patterns
- Label-based query subscriptions and filtering
- Bootstrap initial data from external sources while streaming continues
- Health check endpoint for monitoring
- Configurable timeouts and dispatch modes
Use Cases
- Real-time Event Streaming: External systems push change events to Drasi
- Hybrid Data Loading: Bootstrap from database, then stream changes via HTTP
- Webhook Integration: Receive webhooks from GitHub, Shopify, Stripe, or any external service
- Manual Testing: Submit test data via curl during development
- API Integration: Connect third-party services to Drasi continuous queries
Configuration
Builder Pattern (Recommended)
The builder pattern provides a fluent, type-safe API for constructing HTTP sources:
use HttpSource;
// Basic HTTP source
let source = builder
.with_host
.with_port
.with_auto_start
.build?;
// With adaptive batching tuning
let source = builder
.with_host
.with_port
.with_adaptive_max_batch_size
.with_adaptive_min_batch_size
.with_adaptive_max_wait_ms
.with_adaptive_enabled
.build?;
// With custom dispatch settings
let source = builder
.with_host
.with_port
.with_dispatch_mode
.with_dispatch_buffer_capacity
.build?;
// With bootstrap provider
let source = builder
.with_host
.with_port
.with_bootstrap_provider
.build?;
Configuration Struct Approach
Alternatively, use HttpSourceConfig directly:
use ;
let config = HttpSourceConfig ;
let source = new?;
YAML Configuration (DrasiServer)
When using DrasiServer, configure HTTP sources via YAML:
sources:
  - id: "my-http-source"
    source_type: "http"
    auto_start: true
    host: "0.0.0.0"
    port: 8080
    endpoint: "/events"
    timeout_ms: 30000
    adaptive_enabled: true
    adaptive_max_batch_size: 1000
    adaptive_min_batch_size: 10
    adaptive_max_wait_ms: 100
    adaptive_min_wait_ms: 1
    adaptive_window_secs: 5
Configuration Options
Core Settings
| Name | Description | Data Type | Valid Values | Default |
|---|---|---|---|---|
| `host` | HTTP server host address to bind to | String | Any valid hostname or IP | Required |
| `port` | HTTP server port number | u16 | 1-65535 | 8080 |
| `endpoint` | Optional custom endpoint path | Option | Any valid path | None |
| `timeout_ms` | Request timeout in milliseconds | u64 | Any positive integer | 10000 |
| `auto_start` | Whether to start automatically when added to DrasiLib | bool | true, false | true |
Adaptive Batching Settings
| Name | Description | Data Type | Valid Values | Default |
|---|---|---|---|---|
| `adaptive_enabled` | Enable/disable adaptive batching | Option | true, false | true |
| `adaptive_max_batch_size` | Maximum events per batch | Option | Any positive integer | 1000 |
| `adaptive_min_batch_size` | Minimum events per batch | Option | Any positive integer | 10 |
| `adaptive_max_wait_ms` | Maximum wait time before dispatching (ms) | Option | Any positive integer | 100 |
| `adaptive_min_wait_ms` | Minimum wait time between batches (ms) | Option | Any positive integer | 1 |
| `adaptive_window_secs` | Throughput measurement window (seconds) | Option | Any positive integer | 5 |
Dispatch Settings (Builder Only)
| Name | Description | Data Type | Valid Values | Default |
|---|---|---|---|---|
| `dispatch_mode` | Event routing mode | DispatchMode | Channel, Broadcast | Channel |
| `dispatch_buffer_capacity` | Buffer size for dispatch channel | usize | Any positive integer | 1000 |
Bootstrap Settings (Builder Only)
| Name | Description | Data Type | Valid Values | Default |
|---|---|---|---|---|
| `bootstrap_provider` | Bootstrap provider for initial data | Box | Any provider implementation | None |
Webhook Mode
Webhook mode enables the HTTP source to receive arbitrary payloads from external services and transform them into graph events using Handlebars templates.
Webhook Configuration
sources:
  - id: "webhook-source"
    source_type: "http"
    host: "0.0.0.0"
    port: 8080
    webhooks:
      error_behavior: accept_and_log  # Global default
      routes:
        - path: "/github/events"
          methods:
          auth:
            signature:
              type: hmac-sha256
              secret_env: GITHUB_WEBHOOK_SECRET
              header: X-Hub-Signature-256
              prefix: "sha256="
          error_behavior: reject  # Override for this route
          mappings:
            - when:
                header: X-GitHub-Event
                equals: push
              operation: insert
              element_type: node
              effective_from: "{{payload.head_commit.timestamp}}"
              template:
                id: "commit-{{payload.head_commit.id}}"
                labels:
                properties:
                  message: "{{payload.head_commit.message}}"
                  author: "{{payload.head_commit.author.name}}"
                  branch: "{{payload.ref}}"
Webhook Configuration Options
Top-Level Webhook Settings
| Name | Description | Values | Default |
|---|---|---|---|
| `error_behavior` | How to handle mapping errors | reject, accept_and_log, accept_silent | reject |
| `cors` | CORS configuration | Object (see below) | None (CORS disabled) |
| `routes` | Array of route configurations | Array | Required |
CORS Configuration
Enable CORS (Cross-Origin Resource Sharing) for browser-based clients:
webhooks:
  cors:
    enabled: true
    allow_origins:  # or specific origins
    allow_methods:
    allow_headers:
    expose_headers:
    allow_credentials: false
    max_age: 3600
  routes:
    # ...
| Name | Description | Default |
|---|---|---|
| `enabled` | Enable/disable CORS | true |
| `allow_origins` | Allowed origins (`["*"]` for any) | `["*"]` |
| `allow_methods` | Allowed HTTP methods | `["GET", "POST", "PUT", "PATCH", "DELETE", "OPTIONS"]` |
| `allow_headers` | Allowed request headers | `["Content-Type", "Authorization", "X-Requested-With"]` |
| `expose_headers` | Headers to expose to browser | `[]` |
| `allow_credentials` | Allow cookies/auth headers | false |
| `max_age` | Preflight cache duration (seconds) | 3600 |
Route Settings
| Name | Description | Required |
|---|---|---|
| `path` | URL path pattern (supports `:param` syntax) | Yes |
| `methods` | HTTP methods to accept | No (defaults to all) |
| `auth` | Authentication configuration | No |
| `error_behavior` | Override global error behavior | No |
| `mappings` | Array of payload-to-event mappings | Yes |
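To illustrate the `:param` path syntax, here is a minimal sketch of segment-by-segment matching. `match_route` is a hypothetical helper, not the plugin's `route_matcher.rs`; it assumes parameters always span exactly one path segment and that leading and trailing slashes are insignificant.

```rust
use std::collections::HashMap;

/// Matches `path` against a pattern such as `/api/:resource/:id`.
/// Returns the captured parameters on a match, or `None` otherwise.
fn match_route(pattern: &str, path: &str) -> Option<HashMap<String, String>> {
    // Assumption: leading and trailing slashes do not affect matching.
    let pattern_parts: Vec<&str> = pattern.trim_matches('/').split('/').collect();
    let path_parts: Vec<&str> = path.trim_matches('/').split('/').collect();
    if pattern_parts.len() != path_parts.len() {
        return None;
    }

    let mut params = HashMap::new();
    for (pat, seg) in pattern_parts.iter().zip(path_parts.iter()) {
        if let Some(name) = pat.strip_prefix(':') {
            // A parameter segment captures any non-empty path segment.
            if seg.is_empty() {
                return None;
            }
            params.insert(name.to_string(), seg.to_string());
        } else if pat != seg {
            // Literal segments must match exactly.
            return None;
        }
    }
    Some(params)
}
```

The captured map corresponds to what templates read through `{{route.resource}}` or `{{route.id}}`.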
Authentication Options
HMAC Signature Verification (GitHub, Shopify):
auth:
  signature:
    type: hmac-sha256           # or hmac-sha1
    secret_env: SECRET_VAR      # Environment variable name
    header: X-Hub-Signature-256
    prefix: "sha256="           # Optional prefix to strip
    encoding: hex               # hex (default) or base64
Bearer Token:
auth:
  bearer:
    token_env: API_TOKEN  # Environment variable name
Note: When both signature and bearer auth are configured, both must pass.
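The checks around signature verification can be sketched with the standard library alone. Computing the HMAC digest itself needs a cryptography crate and is out of scope here, so `expected_hex` stands for a digest that has already been computed over the raw request body. All function names are hypothetical, and rejecting a header that lacks the configured prefix is an assumption.

```rust
/// Strips the optional prefix (for example "sha256=") from a signature header.
/// Assumption: a header that lacks the configured prefix is rejected.
fn strip_signature_prefix<'a>(header_value: &'a str, prefix: Option<&str>) -> Option<&'a str> {
    match prefix {
        Some(p) => header_value.strip_prefix(p),
        None => Some(header_value),
    }
}

/// Compares two byte strings without stopping at the first mismatch,
/// so timing does not reveal how many leading bytes were correct.
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    if a.len() != b.len() {
        return false;
    }
    let mut diff = 0u8;
    for (x, y) in a.iter().zip(b.iter()) {
        diff |= x ^ y;
    }
    diff == 0
}

/// Checks a hex-encoded signature header against a precomputed digest.
/// Hex digits are compared case-insensitively.
fn signature_ok(header_value: &str, prefix: Option<&str>, expected_hex: &str) -> bool {
    match strip_signature_prefix(header_value, prefix) {
        Some(sig) => constant_time_eq(
            sig.to_ascii_lowercase().as_bytes(),
            expected_hex.to_ascii_lowercase().as_bytes(),
        ),
        None => false,
    }
}

/// Combines the configured checks: each argument is `None` when that method
/// is not configured, and every configured method must pass.
fn request_authorized(signature: Option<bool>, bearer: Option<bool>) -> bool {
    signature.unwrap_or(true) && bearer.unwrap_or(true)
}
```

With both methods configured, `request_authorized(Some(true), Some(false))` is `false`, matching the rule that both must pass.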
Mapping Settings
| Name | Description | Required |
|---|---|---|
| `when` | Condition for this mapping to apply | No |
| `operation` | Graph operation: insert, update, delete | No (can derive from payload) |
| `operation_from` | Path to operation value in payload | No |
| `element_type` | node or relation | Yes |
| `effective_from` | Handlebars template for timestamp | No |
| `template` | Element template (see below) | Yes |
Condition Syntax (when)
# Match header value
when:
  header: X-GitHub-Event
  equals: push
# Match payload field (dot notation for nested fields)
when:
  field: event.type
  equals: order.created
# Match with contains
when:
  header: Content-Type
  contains: json
# Match with regex
when:
  field: action
  regex: "^(created|updated)$"
Note: Conditions currently support `header` (HTTP headers) and `field` (payload fields). Path parameters and query parameters are available in templates via `{{route.param}}` and `{{query.param}}`.
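As a rough illustration of how a `field` condition with dot notation might be evaluated, the sketch below walks a nested payload and applies `equals` or `contains`. `Value`, `lookup`, `Matcher`, and `matches` are hypothetical names; the payload type is reduced to strings and objects, and `regex` is left out because the standard library has no regex engine.

```rust
use std::collections::HashMap;

/// Minimal stand-in for a parsed payload: strings and nested objects only.
enum Value {
    Text(String),
    Object(HashMap<String, Value>),
}

/// Resolves a dot-notation path such as `event.type` inside the payload.
/// Returns `None` if a key is missing or the path does not end at a string.
fn lookup<'a>(payload: &'a Value, path: &str) -> Option<&'a str> {
    let mut current = payload;
    for key in path.split('.') {
        match current {
            Value::Object(map) => current = map.get(key)?,
            Value::Text(_) => return None,
        }
    }
    match current {
        Value::Text(s) => Some(s.as_str()),
        Value::Object(_) => None,
    }
}

/// The two comparison kinds covered by this sketch.
enum Matcher<'a> {
    Equals(&'a str),
    Contains(&'a str),
}

/// A missing header or field never matches.
fn matches(actual: Option<&str>, matcher: &Matcher) -> bool {
    match (actual, matcher) {
        (Some(v), Matcher::Equals(expected)) => v == *expected,
        (Some(v), Matcher::Contains(needle)) => v.contains(*needle),
        (None, _) => false,
    }
}
```

Header conditions would use the same `matches` function, with the header value passed in place of the `lookup` result.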
Template Structure
Node Template:
template:
  id: "{{payload.id}}"
  labels:
  properties:
    customer: "{{payload.customer.name}}"
    total: "{{payload.total}}"
    items: "{{payload.line_items}}"  # Preserves arrays/objects
Relation Template:
template:
  id: "rel-{{payload.id}}"
  labels:
  from: "customer-{{payload.customer_id}}"
  to: "product-{{payload.product_id}}"
  properties:
    quantity: "{{payload.quantity}}"
Spread Entire Payload as Properties:
To use all fields from the incoming payload as node/relation properties, use a template string instead of an object:
template:
  id: "event-{{payload.id}}"
  labels:
  properties: "{{payload}}"  # Spreads all payload fields as properties
With this configuration, an incoming payload that contains the fields id, name, status, and metadata creates a node with the properties id, name, status, and metadata (as a nested object).
Spread Nested Object:
template:
  id: "{{payload.id}}"
  labels:
  properties: "{{payload.data}}"  # Spread only the 'data' sub-object
Template Context Variables
Templates have access to these variables:
| Variable | Description | Example |
|---|---|---|
| `payload` | Parsed request body | `{{payload.user.name}}` |
| `headers` | HTTP headers (lowercase keys) | `{{headers.x-request-id}}` |
| `route` | Path parameters from route | `{{route.id}}` |
| `query` | Query string parameters | `{{query.filter}}` |
| `method` | HTTP method | `{{method}}` |
| `path` | Request path | `{{path}}` |
Note: Access Content-Type via headers: `{{headers.content-type}}`
Template Helpers
| Helper | Description | Example |
|---|---|---|
| `lowercase` | Convert to lowercase | `{{lowercase payload.name}}` |
| `uppercase` | Convert to uppercase | `{{uppercase payload.status}}` |
| `now` | Current timestamp (ms) | `{{now}}` |
| `concat` | Concatenate strings | `{{concat "prefix-" payload.id}}` |
| `default` | Fallback value | `{{default payload.name "Unknown"}}` |
| `json` | Serialize to JSON string | `{{json payload.metadata}}` |
Property Value Types
Template values preserve their original types from the payload:
| Payload Type | Result Type |
|---|---|
| String | ElementValue::String |
| Number | ElementValue::Integer or Float |
| Boolean | ElementValue::Bool |
| Array | ElementValue::List |
| Object | ElementValue::Object |
| Null | ElementValue::Null |
To force a value to a JSON string, use the json helper:
metadata: "{{json payload.complex_object}}" # Becomes a string
Timestamp Handling (effective_from)
The effective_from field sets the element's effective timestamp, stored as milliseconds since the Unix epoch. The input format is detected automatically:
| Input | Detection | Conversion |
|---|---|---|
| `1699900000` | Unix seconds | × 1,000 (to milliseconds) |
| `1699900000000` | Unix milliseconds | Used directly |
| `1699900000000000000` | Unix nanoseconds | ÷ 1,000,000 (to milliseconds) |
| `2024-01-15T10:30:00Z` | ISO 8601 | Parsed to milliseconds |
| `2024-01-15T10:30:00.123Z` | ISO 8601 with ms | Parsed to milliseconds |
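The numeric rows of this table can be reproduced by a magnitude check. The sketch below is one possible heuristic, not the plugin's actual detection logic: the two thresholds are assumptions chosen to separate the example inputs, `to_millis` is a hypothetical name, and ISO 8601 strings are not handled.

```rust
/// Converts a numeric timestamp to milliseconds since the Unix epoch.
/// The thresholds are assumptions; the real cut-offs may differ.
fn to_millis(raw: i64) -> i64 {
    const MAX_SECONDS: u64 = 100_000_000_000; // below 1e11: treat as seconds
    const MAX_MILLIS: u64 = 100_000_000_000_000; // below 1e14: treat as milliseconds

    let magnitude = raw.unsigned_abs();
    if magnitude < MAX_SECONDS {
        raw * 1_000 // seconds to milliseconds
    } else if magnitude < MAX_MILLIS {
        raw // already milliseconds
    } else {
        raw / 1_000_000 // nanoseconds to milliseconds
    }
}
```

All three numeric examples from the table map to the same instant under this heuristic: `to_millis(1_699_900_000)`, `to_millis(1_699_900_000_000)`, and `to_millis(1_699_900_000_000_000_000)` each yield `1_699_900_000_000`.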
Error Behavior
| Value | Behavior |
|---|---|
| `reject` | Return HTTP 400/500 error |
| `accept_and_log` | Return HTTP 200, log error |
| `accept_silent` | Return HTTP 200, ignore silently |
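A route-level `error_behavior` overrides the global one, and the resulting value decides the HTTP status. The sketch below shows that resolution. The names are hypothetical, and the split between 400 and 500 (request-caused versus internal failures) is an assumption, since the table only says "400/500".

```rust
/// The three configurable behaviors for mapping errors.
#[derive(Clone, Copy, Debug, PartialEq)]
enum ErrorBehavior {
    Reject,
    AcceptAndLog,
    AcceptSilent,
}

/// A route-level setting, when present, overrides the global default.
fn effective_behavior(global: ErrorBehavior, route_override: Option<ErrorBehavior>) -> ErrorBehavior {
    route_override.unwrap_or(global)
}

/// Returns the HTTP status for a failed mapping.
/// Assumption: `caused_by_request` selects 400, anything else 500.
fn failure_status(behavior: ErrorBehavior, caused_by_request: bool) -> u16 {
    match behavior {
        ErrorBehavior::Reject => {
            if caused_by_request {
                400
            } else {
                500
            }
        }
        ErrorBehavior::AcceptAndLog => {
            // The request is accepted, but the failure is recorded.
            eprintln!("mapping error (request accepted)");
            200
        }
        ErrorBehavior::AcceptSilent => 200,
    }
}
```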
Example: GitHub Webhooks
webhooks:
  routes:
    - path: "/github/events"
      methods:
      auth:
        signature:
          type: hmac-sha256
          secret_env: GITHUB_WEBHOOK_SECRET
          header: X-Hub-Signature-256
          prefix: "sha256="
      mappings:
        # Push events
        - when:
            header: X-GitHub-Event
            equals: push
          operation: insert
          element_type: node
          effective_from: "{{payload.head_commit.timestamp}}"
          template:
            id: "commit-{{payload.head_commit.id}}"
            labels:
            properties:
              message: "{{payload.head_commit.message}}"
              author: "{{payload.head_commit.author.name}}"
              repo: "{{payload.repository.full_name}}"
              branch: "{{payload.ref}}"
        # Issue events
        - when:
            header: X-GitHub-Event
            equals: issues
          operation_from: payload.action  # opened, closed, etc.
          element_type: node
          template:
            id: "issue-{{payload.issue.id}}"
            labels:
            properties:
              title: "{{payload.issue.title}}"
              state: "{{payload.issue.state}}"
              author: "{{payload.issue.user.login}}"
Example: Shopify Webhooks
webhooks:
  routes:
    - path: "/shopify/orders"
      methods:
      auth:
        signature:
          type: hmac-sha256
          secret_env: SHOPIFY_WEBHOOK_SECRET
          header: X-Shopify-Hmac-SHA256
          encoding: base64
      mappings:
        - when:
            header: X-Shopify-Topic
            equals: orders/create
          operation: insert
          element_type: node
          template:
            id: "order-{{payload.id}}"
            labels:
            properties:
              order_number: "{{payload.order_number}}"
              total: "{{payload.total_price}}"
              currency: "{{payload.currency}}"
              customer_email: "{{payload.email}}"
              line_items: "{{payload.line_items}}"
Example: Generic REST API
webhooks:
  routes:
    - path: "/api/:resource/:id"
      methods:
      auth:
        bearer:
          token_env: API_SECRET_TOKEN
      mappings:
        # Match users resource via payload field (since conditions support header/field)
        - when:
            field: resource_type
            equals: users
          operation: insert
          element_type: node
          template:
            id: "user-{{route.id}}"
            labels:
            properties:
              name: "{{payload.name}}"
              email: "{{payload.email}}"
Input Schema (Standard Mode)
The HTTP source accepts JSON data in the HttpSourceChange format. All events use a tagged union structure with an operation field.
Insert Operation (Node)
Insert Operation (Relation)
Update Operation
Delete Operation
Batch Submission
Field Descriptions
- operation: Must be "insert", "update", or "delete"
- element: The graph element (node or relation)
- type: Must be "node" or "relation"
- id: Unique identifier for the element
- labels: Array of label strings (e.g., ["User"], ["FOLLOWS"])
- properties: JSON object with arbitrary key-value pairs
- from: (Relations only) Source node ID
- to: (Relations only) Target node ID
- timestamp: Optional nanoseconds since Unix epoch (auto-generated if omitted)
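To make the field list concrete, the sketch below assembles an insert event for a node as a JSON string. The nesting (an `element` object beside `operation`) is inferred from the field descriptions above rather than copied from the schema, so it should be treated as an assumption. `escape` and `insert_node_json` are hypothetical helpers, and property values are limited to strings to keep the example dependency-free.

```rust
/// Escapes quotes, backslashes, and newlines for use inside a JSON string.
/// Other control characters are not handled in this sketch.
fn escape(s: &str) -> String {
    let mut out = String::new();
    for c in s.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\n' => out.push_str("\\n"),
            _ => out.push(c),
        }
    }
    out
}

/// Builds an insert event for a node with string-valued properties.
/// The `timestamp` field is omitted, so it would be auto-generated.
fn insert_node_json(id: &str, labels: &[&str], properties: &[(&str, &str)]) -> String {
    let labels: Vec<String> = labels
        .iter()
        .map(|l| format!("\"{}\"", escape(l)))
        .collect();
    let props: Vec<String> = properties
        .iter()
        .map(|(k, v)| format!("\"{}\":\"{}\"", escape(k), escape(v)))
        .collect();
    format!(
        "{{\"operation\":\"insert\",\"element\":{{\"type\":\"node\",\"id\":\"{}\",\"labels\":[{}],\"properties\":{{{}}}}}}}",
        escape(id),
        labels.join(","),
        props.join(",")
    )
}
```

In real code a JSON library would be the safer choice; the manual formatting here only serves to show which fields go where.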
Usage Examples
Starting an HTTP Source
use HttpSource;
use Source;
// Create and start the source
let source = builder
.with_host
.with_port
.build?;
source.start.await?;
// Check status
assert_eq!;
Submitting Events via curl
# Submit single node
# Submit batch of events
# Health check
Python Example
# Submit a node
=
=
# {"success": true, "message": "All 1 events processed successfully"}
JavaScript/Node.js Example
const axios = require;
Complete Integration Example
use HttpSource;
use ;
async
Endpoints
The HTTP source exposes the following endpoints:
| Method | Path | Description |
|---|---|---|
| POST | `/sources/{source_id}/events` | Submit a single event |
| POST | `/sources/{source_id}/events/batch` | Submit multiple events |
| GET | `/health` | Health check (returns service status and features) |
Response Format
Success:
Partial success (batch):
Error:
Adaptive Batching
The HTTP source batches events adaptively, adjusting batch size and wait time to the throughput measured over the configured window:
Throughput Levels
| Level | Messages/Second | Batch Size | Wait Time |
|---|---|---|---|
| Idle | < 1 | Minimum | Minimum (1ms) |
| Low | 1-100 | Small (2x min) | 1ms |
| Medium | 100-1,000 | Moderate (25% of max) | 10ms |
| High | 1,000-10,000 | Large (50% of max) | 25ms |
| Burst | > 10,000 | Maximum | 50ms |
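The table can be read as a two-step decision: classify the measured rate, then derive batch size and wait time. The sketch below follows the table row by row. `Level`, `BatchLimits`, `classify`, and `plan` are hypothetical names, not the `AdaptiveBatcher` API, and the handling of the exact boundary values (100, 1,000, 10,000) is an assumption because the table's ranges overlap there.

```rust
/// Throughput levels from the table above.
#[derive(Debug, PartialEq)]
enum Level {
    Idle,
    Low,
    Medium,
    High,
    Burst,
}

/// The configured limits that the batch plan depends on.
struct BatchLimits {
    min_batch: usize,
    max_batch: usize,
    min_wait_ms: u64,
}

/// Maps messages per second to a level.
/// Assumption: 100 and 1,000 fall into the higher level, 10,000 into High.
fn classify(msgs_per_sec: f64) -> Level {
    if msgs_per_sec < 1.0 {
        Level::Idle
    } else if msgs_per_sec < 100.0 {
        Level::Low
    } else if msgs_per_sec < 1_000.0 {
        Level::Medium
    } else if msgs_per_sec <= 10_000.0 {
        Level::High
    } else {
        Level::Burst
    }
}

/// Returns (batch size, wait time in ms) for a level.
fn plan(level: &Level, limits: &BatchLimits) -> (usize, u64) {
    match level {
        Level::Idle => (limits.min_batch, limits.min_wait_ms),
        Level::Low => (limits.min_batch * 2, 1),
        Level::Medium => (limits.max_batch / 4, 10),
        Level::High => (limits.max_batch / 2, 25),
        Level::Burst => (limits.max_batch, 50),
    }
}
```

With the default limits (minimum 10, maximum 1000), a rate of 500 messages per second gives a batch size of 250 and a 10 ms wait.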
Tuning Guidelines
For low latency (real-time dashboards):
.with_adaptive_max_wait_ms
.with_adaptive_min_batch_size
For high throughput (bulk data ingestion):
.with_adaptive_max_batch_size
.with_adaptive_max_wait_ms
To disable adaptive batching:
.with_adaptive_enabled
Bootstrap Providers
The HTTP source supports universal bootstrap: any bootstrap provider can be used to load initial data, and loading runs alongside the live event stream rather than blocking it.
Common Patterns
Bootstrap from PostgreSQL, stream via HTTP:
let postgres_provider = new?;
let source = builder
.with_host
.with_port
.with_bootstrap_provider
.build?;
Bootstrap from files:
let file_provider = new?;
let source = builder
.with_host
.with_port
.with_bootstrap_provider
.build?;
Bootstrap Behavior
- Bootstrap runs asynchronously in a separate task
- Streaming events are processed immediately
- Queries receive both bootstrap and streaming data
- Bootstrap provider properties are passed via the source's generic properties map
Error Handling
Common Errors
Port already in use:
Failed to bind HTTP server to 0.0.0.0:8080: Address already in use
Solution: Change port or stop conflicting service
Invalid JSON:
Solution: Validate JSON structure against schema
Source name mismatch:
Solution: Ensure URL path matches source ID
Validation errors:
- Port cannot be 0
- Timeout cannot be 0
- Min batch size cannot exceed max batch size
- Min wait time cannot exceed max wait time
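The four validation rules translate directly into checks. The sketch below uses a hypothetical `SketchSettings` struct and plain `String` errors; the plugin's actual error type and messages are not shown in this document.

```rust
/// Hypothetical subset of the configuration that the rules refer to.
struct SketchSettings {
    port: u16,
    timeout_ms: u64,
    min_batch_size: usize,
    max_batch_size: usize,
    min_wait_ms: u64,
    max_wait_ms: u64,
}

/// Applies the validation rules in the order listed above and
/// returns the first violation found.
fn validate(s: &SketchSettings) -> Result<(), String> {
    if s.port == 0 {
        return Err("port cannot be 0".to_string());
    }
    if s.timeout_ms == 0 {
        return Err("timeout cannot be 0".to_string());
    }
    if s.min_batch_size > s.max_batch_size {
        return Err("min batch size cannot exceed max batch size".to_string());
    }
    if s.min_wait_ms > s.max_wait_ms {
        return Err("min wait time cannot exceed max wait time".to_string());
    }
    Ok(())
}
```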
Testing
Unit Tests
# Run all HTTP source tests
# Run specific test module
# Run with logging
RUST_LOG=debug
Integration Testing
# Start test server
# In another terminal, submit test events
Performance Considerations
Channel Capacity
The internal batch channel capacity is automatically calculated as max_batch_size × 5:
| Max Batch Size | Channel Capacity | Memory (1KB/event) |
|---|---|---|
| 100 | 500 | ~500 KB |
| 1,000 | 5,000 | ~5 MB |
| 5,000 | 25,000 | ~25 MB |
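The table rows follow from two multiplications, shown here as a sketch. The function names are hypothetical, and the memory figure is a worst-case estimate for a full channel, ignoring per-event overhead.

```rust
/// Multiplier stated above: capacity is max_batch_size × 5.
const CAPACITY_MULTIPLIER: usize = 5;

/// Number of events the internal batch channel can hold.
fn channel_capacity(max_batch_size: usize) -> usize {
    max_batch_size * CAPACITY_MULTIPLIER
}

/// Approximate bytes held when the channel is completely full.
fn worst_case_bytes(max_batch_size: usize, bytes_per_event: usize) -> usize {
    channel_capacity(max_batch_size) * bytes_per_event
}
```

For the default maximum batch size of 1,000 and events of roughly 1,000 bytes, this gives a capacity of 5,000 events and about 5 MB.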
Dispatch Modes
- Channel (default): Isolated channels per subscriber with backpressure, zero message loss
- Broadcast: Shared channel, no backpressure, possible message loss under high load
Best Practices
- Use batch endpoint for bulk operations (reduces HTTP overhead)
- Enable adaptive batching for variable traffic patterns
- Tune batch sizes based on your throughput requirements
- Monitor health endpoint for production deployments
- Use Channel dispatch mode when message reliability is critical
Architecture Notes
Internal Structure
- HttpSource: Main plugin implementation (lib.rs)
- HttpSourceConfig: Configuration struct (config.rs)
- HttpElement/HttpSourceChange: Event models (models.rs)
- AdaptiveBatcher: Batching logic (adaptive_batcher.rs)
- Time utilities: Timestamp handling (time.rs)
Webhook Modules
- auth.rs: HMAC signature and Bearer token verification
- content_parser.rs: JSON, XML, YAML, and text payload parsing
- template_engine.rs: Handlebars template processing with custom helpers
- route_matcher.rs: Route pattern matching and condition evaluation
Data Flow
Standard Mode:
- HTTP POST request arrives at Axum endpoint
- JSON deserialized to `HttpSourceChange`
- Converted to `drasi_core::models::SourceChange`
- Sent to adaptive batcher via mpsc channel
- Batcher accumulates events based on throughput
- Batch forwarded to dispatchers
- Dispatchers route to subscribed queries
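The channel-to-batcher step of the standard-mode flow can be approximated with the standard library. This sketch swaps in `std::sync::mpsc` for the async channel and groups events by size only, without the time-based waiting of the real batcher. `collect_batches` is a hypothetical name.

```rust
use std::sync::mpsc;

/// Drains the channel and groups events into batches of at most
/// `max_batch` items. Returns once every sender has been dropped.
fn collect_batches(rx: mpsc::Receiver<String>, max_batch: usize) -> Vec<Vec<String>> {
    let mut batches = Vec::new();
    let mut current = Vec::new();

    // Iterating a Receiver blocks for the next event and ends when
    // the channel is closed.
    for event in rx {
        current.push(event);
        if current.len() >= max_batch {
            // Hand off the full batch and start a new, empty one.
            batches.push(std::mem::take(&mut current));
        }
    }

    // Flush whatever is left as a final, smaller batch.
    if !current.is_empty() {
        batches.push(current);
    }
    batches
}
```

Sending five events with `max_batch` set to 2 yields three batches of sizes 2, 2, and 1. In the real flow each batch would go to the dispatchers as it fills instead of being collected into a vector.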
Webhook Mode:
- HTTP request arrives at configured route
- Authentication verified (signature/bearer token)
- Payload parsed based on Content-Type
- Route matched and conditions evaluated
- Template engine renders element from payload
- Converted to `SourceChange` and dispatched
- Dispatchers route to subscribed queries
Thread Safety
- All operations are async using Tokio
- Shared state protected by Arc<RwLock<_>>
- Channel-based communication for event flow
- Component status tracked with atomic updates