Expand description
HTTP Source Plugin for Drasi
This plugin exposes HTTP endpoints for receiving data change events. It supports two mutually exclusive modes:
- Standard Mode: Fixed-format
HttpSourceChangeendpoints with adaptive batching - Webhook Mode: Configurable routes with template-based payload transformation
§Standard Mode
When no webhooks configuration is present, the source operates in standard mode
with the following endpoints:
POST /sources/{source_id}/events- Submit a single eventPOST /sources/{source_id}/events/batch- Submit multiple eventsGET /health- Health check endpoint
§Data Format
Events are submitted as JSON using the HttpSourceChange format:
§Insert Operation
{
"operation": "insert",
"element": {
"type": "node",
"id": "user-123",
"labels": ["User"],
"properties": {
"name": "Alice",
"email": "alice@example.com"
}
},
"timestamp": 1699900000000000000
}§Update Operation
{
"operation": "update",
"element": {
"type": "node",
"id": "user-123",
"labels": ["User"],
"properties": {
"name": "Alice Updated"
}
}
}§Delete Operation
{
"operation": "delete",
"id": "user-123",
"labels": ["User"]
}§Webhook Mode
When a webhooks section is present in the configuration, the source operates
in webhook mode. This enables:
- Custom routes with path parameters (e.g.,
/github/events,/users/:id/hooks) - Multiple HTTP methods per route (POST, PUT, PATCH, DELETE, GET)
- Handlebars template-based payload transformation
- HMAC signature verification (GitHub, Shopify style)
- Bearer token authentication
- Support for JSON, XML, YAML, and plain text payloads
§Webhook Configuration Example
webhooks:
error_behavior: accept_and_log
routes:
- path: "/github/events"
methods: ["POST"]
auth:
signature:
type: hmac-sha256
secret_env: GITHUB_WEBHOOK_SECRET
header: X-Hub-Signature-256
prefix: "sha256="
error_behavior: reject
mappings:
- when:
header: X-GitHub-Event
equals: push
operation: insert
element_type: node
effective_from: "{{payload.head_commit.timestamp}}"
template:
id: "commit-{{payload.head_commit.id}}"
labels: ["Commit"]
properties:
message: "{{payload.head_commit.message}}"
author: "{{payload.head_commit.author.name}}"§Relation Element
{
"operation": "insert",
"element": {
"type": "relation",
"id": "follows-1",
"labels": ["FOLLOWS"],
"from": "user-123",
"to": "user-456",
"properties": {}
}
}§Batch Submission
{
"events": [
{ "operation": "insert", ... },
{ "operation": "update", ... }
]
}§Adaptive Batching
The HTTP source includes adaptive batching to optimize throughput. Events are buffered and dispatched in batches, with batch size and timing adjusted based on throughput patterns.
| Parameter | Default | Description |
|---|---|---|
adaptive_enabled | true | Enable/disable adaptive batching |
adaptive_max_batch_size | 1000 | Maximum events per batch |
adaptive_min_batch_size | 1 | Minimum events per batch |
adaptive_max_wait_ms | 100 | Maximum wait time before dispatching |
adaptive_min_wait_ms | 10 | Minimum wait time between batches |
§Configuration
| Field | Type | Default | Description |
|---|---|---|---|
host | string | required | Host address to bind to |
port | u16 | 8080 | Port to listen on |
endpoint | string | None | Optional custom path prefix |
timeout_ms | u64 | 10000 | Request timeout in milliseconds |
§Example Configuration (YAML)
source_type: http
properties:
host: "0.0.0.0"
port: 8080
adaptive_enabled: true
adaptive_max_batch_size: 500§Usage Examples
§Rust
ⓘ
use drasi_source_http::{HttpSource, HttpSourceBuilder};
let config = HttpSourceBuilder::new()
.with_host("0.0.0.0")
.with_port(8080)
.with_adaptive_enabled(true)
.build();
let source = Arc::new(HttpSource::new("http-source", config)?);
drasi.add_source(source).await?;§curl (Single Event)
curl -X POST http://localhost:8080/sources/my-source/events \
-H "Content-Type: application/json" \
-d '{"operation":"insert","element":{"type":"node","id":"1","labels":["Test"],"properties":{}}}'§curl (Batch)
curl -X POST http://localhost:8080/sources/my-source/events/batch \
-H "Content-Type: application/json" \
-d '{"events":[...]}'Re-exports§
pub use config::HttpSourceConfig;
Modules§
- auth
- Authentication module for webhook requests.
- config
- Configuration for the HTTP source.
- content_
parser - Content parsing for webhook payloads.
- route_
matcher - Route matching and condition evaluation for webhooks.
- template_
engine - Template engine for webhook payload transformation.
Structs§
- Batch
Event Request - Batch event request that can accept multiple events
- Event
Response - Response for event submission
- Http
Source - HTTP source with configurable adaptive batching.
- Http
Source Builder - Builder for HttpSource instances.
Enums§
- Http
Element - Element that can be either a Node or Relation
- Http
Source Change - Data schema for HTTP source events
Functions§
- convert_
http_ to_ source_ change - Convert HttpSourceChange to drasi_core::models::SourceChange