Crate drasi_source_http

HTTP Source Plugin for Drasi

This plugin exposes HTTP endpoints for receiving data change events. It supports two mutually exclusive modes:

  • Standard Mode: Fixed-format HttpSourceChange endpoints with adaptive batching
  • Webhook Mode: Configurable routes with template-based payload transformation

§Standard Mode

When no webhooks configuration is present, the source operates in standard mode with the following endpoints:

  • POST /sources/{source_id}/events - Submit a single event
  • POST /sources/{source_id}/events/batch - Submit multiple events
  • GET /health - Health check endpoint
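
As a small illustration of the route layout above, the following sketch builds the two submission paths for a given source ID. The helper names `single_event_path` and `batch_event_path` are hypothetical and not part of this crate.

```rust
/// Builds the standard-mode path for submitting a single event.
/// Hypothetical helper, not provided by drasi_source_http.
fn single_event_path(source_id: &str) -> String {
    format!("/sources/{}/events", source_id)
}

/// Builds the standard-mode path for submitting a batch of events.
/// The batch route is the single-event route with `/batch` appended.
fn batch_event_path(source_id: &str) -> String {
    format!("{}/batch", single_event_path(source_id))
}
```

A client would append these paths to the host and port the source is bound to.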

§Data Format

Events are submitted as JSON using the HttpSourceChange format:

§Insert Operation

{
    "operation": "insert",
    "element": {
        "type": "node",
        "id": "user-123",
        "labels": ["User"],
        "properties": {
            "name": "Alice",
            "email": "alice@example.com"
        }
    },
    "timestamp": 1699900000000000000
}

§Update Operation

{
    "operation": "update",
    "element": {
        "type": "node",
        "id": "user-123",
        "labels": ["User"],
        "properties": {
            "name": "Alice Updated"
        }
    }
}

§Delete Operation

{
    "operation": "delete",
    "id": "user-123",
    "labels": ["User"]
}
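
The three JSON shapes above differ in one notable way: insert and update carry a full `element` object, while delete carries only `id` and `labels` at the top level. The sketch below mirrors that shape in plain Rust. The types `NodeElement` and `Change` are hypothetical stand-ins, not the crate's actual `HttpElement` and `HttpSourceChange` definitions, and treating `timestamp` as optional is an assumption based on the update example omitting it.

```rust
/// Hypothetical mirror of the "element" object for a node.
#[derive(Debug, Clone, PartialEq)]
struct NodeElement {
    id: String,
    labels: Vec<String>,
    /// String-valued properties only, to keep the sketch short.
    properties: Vec<(String, String)>,
}

/// Hypothetical mirror of the three documented operations.
#[derive(Debug, Clone, PartialEq)]
enum Change {
    Insert { element: NodeElement, timestamp: Option<u64> },
    Update { element: NodeElement, timestamp: Option<u64> },
    /// Delete has no element body, only the ID and labels.
    Delete { id: String, labels: Vec<String> },
}

impl Change {
    /// Value of the JSON "operation" field for this change.
    fn operation(&self) -> &'static str {
        match self {
            Change::Insert { .. } => "insert",
            Change::Update { .. } => "update",
            Change::Delete { .. } => "delete",
        }
    }

    /// ID of the affected element, wherever the variant stores it.
    fn element_id(&self) -> &str {
        match self {
            Change::Insert { element, .. } | Change::Update { element, .. } => {
                element.id.as_str()
            }
            Change::Delete { id, .. } => id.as_str(),
        }
    }
}
```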

§Webhook Mode

When a webhooks section is present in the configuration, the source operates in webhook mode. This enables:

  • Custom routes with path parameters (e.g., /github/events, /users/:id/hooks)
  • Multiple HTTP methods per route (POST, PUT, PATCH, DELETE, GET)
  • Handlebars template-based payload transformation
  • HMAC signature verification (GitHub, Shopify style)
  • Bearer token authentication
  • Support for JSON, XML, YAML, and plain text payloads
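
To make the path-parameter syntax concrete, here is a minimal sketch of matching a request path against a pattern such as `/users/:id/hooks`. The function `match_route` is hypothetical. The crate's `route_matcher` module may use different rules, for example around trailing slashes or wildcards.

```rust
use std::collections::HashMap;

/// Matches `path` against `pattern`. A pattern segment starting with `:`
/// captures the corresponding path segment under that name.
/// Returns the captured parameters on a match, or `None` otherwise.
/// Hypothetical sketch, not the crate's route matcher.
fn match_route(pattern: &str, path: &str) -> Option<HashMap<String, String>> {
    let pattern_segments: Vec<&str> = pattern.trim_matches('/').split('/').collect();
    let path_segments: Vec<&str> = path.trim_matches('/').split('/').collect();

    // Every pattern segment must line up with exactly one path segment.
    if pattern_segments.len() != path_segments.len() {
        return None;
    }

    let mut params = HashMap::new();
    for (pat, seg) in pattern_segments.iter().zip(&path_segments) {
        if let Some(name) = pat.strip_prefix(':') {
            // A parameter must capture a non-empty segment.
            if seg.is_empty() {
                return None;
            }
            params.insert(name.to_string(), seg.to_string());
        } else if pat != seg {
            // Literal segments must match exactly.
            return None;
        }
    }
    Some(params)
}
```

With this sketch, `/users/42/hooks` matches `/users/:id/hooks` and yields `id = "42"`, while a literal route such as `/github/events` matches only itself.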

§Webhook Configuration Example

webhooks:
  error_behavior: accept_and_log
  routes:
    - path: "/github/events"
      methods: ["POST"]
      auth:
        signature:
          type: hmac-sha256
          secret_env: GITHUB_WEBHOOK_SECRET
          header: X-Hub-Signature-256
          prefix: "sha256="
      error_behavior: reject
      mappings:
        - when:
            header: X-GitHub-Event
            equals: push
          operation: insert
          element_type: node
          effective_from: "{{payload.head_commit.timestamp}}"
          template:
            id: "commit-{{payload.head_commit.id}}"
            labels: ["Commit"]
            properties:
              message: "{{payload.head_commit.message}}"
              author: "{{payload.head_commit.author.name}}"
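
The `template` fields above use `{{...}}` placeholders that are filled from the incoming request. The following sketch shows the basic substitution idea only. It is not Handlebars and not the crate's `template_engine`: it looks up each placeholder as a flat dotted key in a caller-supplied list, and rendering a missing key as an empty string is a choice made for this example. The function name `render` is hypothetical.

```rust
/// Replaces each `{{key}}` in `template` with the value stored under `key`.
/// Keys are flat dotted strings such as "payload.head_commit.id".
/// Unknown keys render as an empty string; an unterminated `{{` is kept as-is.
/// Simplified stand-in for a real template engine.
fn render(template: &str, values: &[(&str, &str)]) -> String {
    let mut out = String::new();
    let mut rest = template;

    while let Some(start) = rest.find("{{") {
        // Copy the literal text before the placeholder.
        out.push_str(&rest[..start]);
        let after_open = &rest[start + 2..];

        match after_open.find("}}") {
            Some(end) => {
                let key = after_open[..end].trim();
                let value = values
                    .iter()
                    .find(|(k, _)| *k == key)
                    .map(|(_, v)| *v)
                    .unwrap_or("");
                out.push_str(value);
                rest = &after_open[end + 2..];
            }
            None => {
                // No closing braces: emit the remainder unchanged.
                out.push_str(&rest[start..]);
                rest = "";
            }
        }
    }

    out.push_str(rest);
    out
}
```

Applied to the `id` template in the configuration above, a payload whose `head_commit.id` is `abc123` would produce the element ID `commit-abc123`.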

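§Relation Element

In standard mode, a relation is submitted in the same HttpSourceChange format as a node. The from and to fields hold the IDs of the elements it connects: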

{
    "operation": "insert",
    "element": {
        "type": "relation",
        "id": "follows-1",
        "labels": ["FOLLOWS"],
        "from": "user-123",
        "to": "user-456",
        "properties": {}
    }
}

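§Batch Submission

The standard-mode batch endpoint accepts a JSON object whose events array holds HttpSourceChange entries: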

{
    "events": [
        { "operation": "insert", ... },
        { "operation": "update", ... }
    ]
}
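
As a sketch of the wrapper shape above, the helper below joins already-serialized event objects into a batch body. The name `batch_body` is hypothetical, and the function assumes each input string is valid JSON for a single event. It does no validation of its own.

```rust
/// Wraps pre-serialized event JSON objects in the `{"events":[...]}` envelope.
/// Hypothetical helper; inputs are assumed to be valid JSON objects.
fn batch_body(events: &[&str]) -> String {
    format!("{{\"events\":[{}]}}", events.join(","))
}
```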

§Adaptive Batching

The HTTP source includes adaptive batching to optimize throughput. Events are buffered and dispatched in batches, with batch size and timing adjusted based on throughput patterns.

| Parameter | Default | Description |
|-----------|---------|-------------|
| adaptive_enabled | true | Enable/disable adaptive batching |
| adaptive_max_batch_size | 1000 | Maximum events per batch |
| adaptive_min_batch_size | 1 | Minimum events per batch |
| adaptive_max_wait_ms | 100 | Maximum wait time before dispatching |
| adaptive_min_wait_ms | 10 | Minimum wait time between batches |
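
The documentation does not spell out the adjustment algorithm, so the following is only an illustrative policy showing how the four bounds could constrain a batching decision. The type `AdaptiveParams`, the function `plan_next_batch`, and the rule itself (size follows the backlog, wait drops to the minimum once a full batch is pending) are assumptions made for this sketch, not the crate's behaviour.

```rust
/// Bounds for adaptive batching, using the documented defaults.
/// Hypothetical type; the crate's own configuration struct may differ.
#[derive(Debug, Clone, Copy)]
struct AdaptiveParams {
    min_batch_size: usize,
    max_batch_size: usize,
    min_wait_ms: u64,
    max_wait_ms: u64,
}

impl Default for AdaptiveParams {
    fn default() -> Self {
        Self {
            min_batch_size: 1,
            max_batch_size: 1000,
            min_wait_ms: 10,
            max_wait_ms: 100,
        }
    }
}

/// Picks a batch size and wait time from the number of pending events.
/// Illustrative policy only:
/// - the batch size tracks the backlog, clamped to the configured bounds;
/// - once a full batch is already pending, wait only the minimum time,
///   otherwise allow the maximum wait so more events can accumulate.
fn plan_next_batch(params: &AdaptiveParams, pending: usize) -> (usize, u64) {
    let batch_size = pending.clamp(params.min_batch_size, params.max_batch_size);
    let wait_ms = if pending >= params.max_batch_size {
        params.min_wait_ms
    } else {
        params.max_wait_ms
    };
    (batch_size, wait_ms)
}
```

Under the default bounds, an idle queue plans a batch of 1 with a 100 ms wait, and a backlog of 5000 events plans a batch of 1000 with a 10 ms wait.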

§Configuration

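| Field | Type | Default | Description |
|-------|------|---------|-------------|
| host | string | required | Host address to bind to |
| port | u16 | 8080 | Port to listen on |
| endpoint | string | None | Optional custom path prefix |
| timeout_ms | u64 | 10000 | Request timeout in milliseconds |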
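
The table above can be read as a struct with one required field and three defaulted ones. The sketch below encodes that reading. `HttpConfigSketch` is a hypothetical type for illustration and is not the crate's `HttpSourceConfig`, whose field names and types may differ.

```rust
/// Hypothetical mirror of the documented configuration fields.
#[derive(Debug, Clone, PartialEq)]
struct HttpConfigSketch {
    /// Host address to bind to (required, no default).
    host: String,
    /// Port to listen on.
    port: u16,
    /// Optional custom path prefix.
    endpoint: Option<String>,
    /// Request timeout in milliseconds.
    timeout_ms: u64,
}

impl HttpConfigSketch {
    /// Creates a config with the documented defaults for every optional field.
    fn new(host: impl Into<String>) -> Self {
        Self {
            host: host.into(),
            port: 8080,
            endpoint: None,
            timeout_ms: 10_000,
        }
    }

    /// Formats the address the server would bind to, as `host:port`.
    fn bind_address(&self) -> String {
        format!("{}:{}", self.host, self.port)
    }
}
```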

§Example Configuration (YAML)

source_type: http
properties:
  host: "0.0.0.0"
  port: 8080
  adaptive_enabled: true
  adaptive_max_batch_size: 500

§Usage Examples

§Rust

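use std::sync::Arc;

use drasi_source_http::{HttpSource, HttpSourceBuilder};

// Build the source configuration.
let config = HttpSourceBuilder::new()
    .with_host("0.0.0.0")
    .with_port(8080)
    .with_adaptive_enabled(true)
    .build();

// `drasi` is an existing Drasi instance (not shown here).
// The `?` and `.await` below require a fallible async context.
let source = Arc::new(HttpSource::new("http-source", config)?);
drasi.add_source(source).await?;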

§curl (Single Event)

curl -X POST http://localhost:8080/sources/my-source/events \
  -H "Content-Type: application/json" \
  -d '{"operation":"insert","element":{"type":"node","id":"1","labels":["Test"],"properties":{}}}'

§curl (Batch)

curl -X POST http://localhost:8080/sources/my-source/events/batch \
  -H "Content-Type: application/json" \
  -d '{"events":[...]}'

Re-exports§

pub use config::HttpSourceConfig;

Modules§

auth
Authentication module for webhook requests.
config
Configuration for the HTTP source.
content_parser
Content parsing for webhook payloads.
route_matcher
Route matching and condition evaluation for webhooks.
template_engine
Template engine for webhook payload transformation.

Structs§

BatchEventRequest
Batch event request that can accept multiple events
EventResponse
Response for event submission
HttpSource
HTTP source with configurable adaptive batching.
HttpSourceBuilder
Builder for HttpSource instances.

Enums§

HttpElement
Element that can be either a Node or Relation
HttpSourceChange
Data schema for HTTP source events

Functions§

convert_http_to_source_change
Convert HttpSourceChange to drasi_core::models::SourceChange