drasi-source-http 0.1.1

HTTP source plugin for Drasi
# HTTP Source

A Drasi source plugin that exposes HTTP endpoints for receiving data change events. It provides both single-event and batch submission modes with adaptive batching for optimized throughput.

## Overview

The HTTP Source is a plugin for the Drasi continuous query system that allows applications to submit graph data changes (nodes and relations) via REST API endpoints. It features:

- **REST API Endpoints**: Simple HTTP POST interface for submitting events
- **Adaptive Batching**: Automatically adjusts batch size and timing based on throughput patterns
- **Dual Submission Modes**: Single-event and batch endpoints for different use cases
- **Universal Bootstrap**: Supports any bootstrap provider (PostgreSQL, ScriptFile, Platform, etc.)
- **Graph Data Model**: Native support for nodes and relations with labels and properties
- **Flexible Configuration**: Builder pattern or configuration struct approaches

### Key Capabilities

- Submit graph data changes via HTTP POST requests
- Automatic batch optimization based on traffic patterns
- Label-based query subscriptions and filtering
- Bootstrap initial data from external sources while streaming continues
- Health check endpoint for monitoring
- Configurable timeouts and dispatch modes

### Use Cases

- **Real-time Event Streaming**: External systems push change events to Drasi
- **Hybrid Data Loading**: Bootstrap from database, then stream changes via HTTP
- **Webhook Integration**: Receive webhook notifications as graph events
- **Manual Testing**: Submit test data via curl during development
- **API Integration**: Connect third-party services to Drasi continuous queries

## Configuration

### Builder Pattern (Recommended)

The builder pattern provides a fluent, type-safe API for constructing HTTP sources:

```rust
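use drasi_source_http::HttpSource;
// Note: `DispatchMode` and `postgres_provider`, used further down, are assumed
// to be in scope; their imports and setup are omitted here.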

// Basic HTTP source
let source = HttpSource::builder("my-source")
    .with_host("0.0.0.0")
    .with_port(8080)
    .with_auto_start(true)
    .build()?;

// With adaptive batching tuning
let source = HttpSource::builder("high-throughput-source")
    .with_host("0.0.0.0")
    .with_port(9000)
    .with_adaptive_max_batch_size(2000)
    .with_adaptive_min_batch_size(50)
    .with_adaptive_max_wait_ms(200)
    .with_adaptive_enabled(true)
    .build()?;

// With custom dispatch settings
let source = HttpSource::builder("custom-source")
    .with_host("localhost")
    .with_port(8080)
    .with_dispatch_mode(DispatchMode::Channel)
    .with_dispatch_buffer_capacity(5000)
    .build()?;

// With bootstrap provider
let source = HttpSource::builder("bootstrapped-source")
    .with_host("0.0.0.0")
    .with_port(8080)
    .with_bootstrap_provider(postgres_provider)
    .build()?;
```

### Configuration Struct Approach

Alternatively, use `HttpSourceConfig` directly:

```rust
use drasi_source_http::{HttpSource, HttpSourceConfig};

let config = HttpSourceConfig {
    host: "0.0.0.0".to_string(),
    port: 8080,
    endpoint: None,
    timeout_ms: 10000,
    adaptive_max_batch_size: Some(1000),
    adaptive_min_batch_size: Some(10),
    adaptive_max_wait_ms: Some(100),
    adaptive_min_wait_ms: Some(1),
    adaptive_window_secs: Some(5),
    adaptive_enabled: Some(true),
};

let source = HttpSource::new("my-source", config)?;
```

### YAML Configuration (DrasiServer)

When using DrasiServer, configure HTTP sources via YAML:

```yaml
sources:
  - id: "my-http-source"
    source_type: "http"
    auto_start: true
    host: "0.0.0.0"
    port: 8080
    endpoint: "/events"
    timeout_ms: 30000
    adaptive_enabled: true
    adaptive_max_batch_size: 1000
    adaptive_min_batch_size: 10
    adaptive_max_wait_ms: 100
    adaptive_min_wait_ms: 1
    adaptive_window_secs: 5
```

## Configuration Options

### Core Settings

| Name | Description | Data Type | Valid Values | Default |
|------|-------------|-----------|--------------|---------|
| `host` | HTTP server host address to bind to | String | Any valid hostname or IP | **Required** |
| `port` | HTTP server port number | u16 | 1-65535 | 8080 |
| `endpoint` | Optional custom endpoint path | `Option<String>` | Any valid path | None |
| `timeout_ms` | Request timeout in milliseconds | u64 | Any positive integer | 10000 |
| `auto_start` | Whether to start automatically when added to DrasiLib | bool | `true`, `false` | `true` |

### Adaptive Batching Settings

| Name | Description | Data Type | Valid Values | Default |
|------|-------------|-----------|--------------|---------|
| `adaptive_enabled` | Enable/disable adaptive batching | `Option<bool>` | `true`, `false` | `true` |
| `adaptive_max_batch_size` | Maximum events per batch | `Option<usize>` | Any positive integer | 1000 |
| `adaptive_min_batch_size` | Minimum events per batch | `Option<usize>` | Any positive integer | 10 |
| `adaptive_max_wait_ms` | Maximum wait time before dispatching (ms) | `Option<u64>` | Any positive integer | 100 |
| `adaptive_min_wait_ms` | Minimum wait time between batches (ms) | `Option<u64>` | Any positive integer | 1 |
| `adaptive_window_secs` | Throughput measurement window (seconds) | `Option<u64>` | Any positive integer | 5 |
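
Because every adaptive setting is optional, an unset field falls back to the default listed in the table. The sketch below shows one way that resolution could look. `AdaptiveOptions`, `ResolvedAdaptive`, and `resolve` are hypothetical names used only for illustration; they are not the crate's own types, and the crate may apply its defaults differently.

```rust
/// Hypothetical mirror of the optional adaptive batching settings.
#[derive(Debug, Default, Clone, PartialEq)]
pub struct AdaptiveOptions {
    pub enabled: Option<bool>,
    pub max_batch_size: Option<usize>,
    pub min_batch_size: Option<usize>,
    pub max_wait_ms: Option<u64>,
    pub min_wait_ms: Option<u64>,
    pub window_secs: Option<u64>,
}

/// The same settings once every missing value has been filled in.
#[derive(Debug, Clone, PartialEq)]
pub struct ResolvedAdaptive {
    pub enabled: bool,
    pub max_batch_size: usize,
    pub min_batch_size: usize,
    pub max_wait_ms: u64,
    pub min_wait_ms: u64,
    pub window_secs: u64,
}

impl AdaptiveOptions {
    /// Applies the defaults documented in the table above.
    pub fn resolve(&self) -> ResolvedAdaptive {
        ResolvedAdaptive {
            enabled: self.enabled.unwrap_or(true),
            max_batch_size: self.max_batch_size.unwrap_or(1000),
            min_batch_size: self.min_batch_size.unwrap_or(10),
            max_wait_ms: self.max_wait_ms.unwrap_or(100),
            min_wait_ms: self.min_wait_ms.unwrap_or(1),
            window_secs: self.window_secs.unwrap_or(5),
        }
    }
}
```

Overriding a single field, for example `AdaptiveOptions { max_wait_ms: Some(200), ..Default::default() }`, leaves the remaining fields at their defaults.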

### Dispatch Settings (Builder Only)

| Name | Description | Data Type | Valid Values | Default |
|------|-------------|-----------|--------------|---------|
| `dispatch_mode` | Event routing mode | DispatchMode | Channel, Broadcast | Channel |
| `dispatch_buffer_capacity` | Buffer size for dispatch channel | usize | Any positive integer | 1000 |

### Bootstrap Settings (Builder Only)

| Name | Description | Data Type | Valid Values | Default |
|------|-------------|-----------|--------------|---------|
| `bootstrap_provider` | Bootstrap provider for initial data | `Box<dyn BootstrapProvider>` | Any provider implementation | None |

## Input Schema

The HTTP source accepts JSON data in the `HttpSourceChange` format. All events use a tagged union structure with an `operation` field.

### Insert Operation (Node)

```json
{
  "operation": "insert",
  "element": {
    "type": "node",
    "id": "user-123",
    "labels": ["User", "Customer"],
    "properties": {
      "name": "Alice",
      "email": "alice@example.com",
      "age": 30,
      "active": true
    }
  },
  "timestamp": 1699900000000000000
}
```

### Insert Operation (Relation)

```json
{
  "operation": "insert",
  "element": {
    "type": "relation",
    "id": "follows-1",
    "labels": ["FOLLOWS"],
    "from": "user-123",
    "to": "user-456",
    "properties": {
      "since": "2024-01-01",
      "weight": 1.0
    }
  }
}
```

### Update Operation

```json
{
  "operation": "update",
  "element": {
    "type": "node",
    "id": "user-123",
    "labels": ["User", "Premium"],
    "properties": {
      "name": "Alice Updated",
      "membership": "premium"
    }
  },
  "timestamp": 1699900001000000000
}
```

### Delete Operation

```json
{
  "operation": "delete",
  "id": "user-123",
  "labels": ["User"],
  "timestamp": 1699900002000000000
}
```

### Batch Submission

```json
{
  "events": [
    {
      "operation": "insert",
      "element": {
        "type": "node",
        "id": "1",
        "labels": ["Test"],
        "properties": {}
      }
    },
    {
      "operation": "insert",
      "element": {
        "type": "node",
        "id": "2",
        "labels": ["Test"],
        "properties": {}
      }
    }
  ]
}
```

### Field Descriptions

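- **operation**: Must be "insert", "update", or "delete". Insert and update carry an `element`; delete carries a top-level `id` and `labels` instead (see the Delete Operation example)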
- **element**: The graph element (node or relation)
  - **type**: Must be "node" or "relation"
  - **id**: Unique identifier for the element
  - **labels**: Array of label strings (e.g., ["User"], ["FOLLOWS"])
  - **properties**: JSON object with arbitrary key-value pairs
  - **from**: (Relations only) Source node ID
  - **to**: (Relations only) Target node ID
- **timestamp**: Optional nanoseconds since Unix epoch (auto-generated if omitted)
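
The optional `timestamp` field can be illustrated with a small helper. This is a minimal sketch using only the standard library; `timestamp_or_now` is a hypothetical name, and the crate's own timestamp handling may differ in detail.

```rust
use std::time::{SystemTime, UNIX_EPOCH};

/// Returns the supplied timestamp, or the current time expressed as
/// nanoseconds since the Unix epoch when the event did not carry one.
pub fn timestamp_or_now(timestamp: Option<u64>) -> u64 {
    timestamp.unwrap_or_else(|| {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            // `as_nanos` returns a u128; the value fits in a u64 for
            // several more centuries, so the cast is safe in practice.
            .map(|elapsed| elapsed.as_nanos() as u64)
            // A clock set before the epoch falls back to 0 in this sketch.
            .unwrap_or(0)
    })
}
```

Clients that need deterministic ordering across retries may prefer to set `timestamp` explicitly rather than rely on the server-side default.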

## Usage Examples

### Starting an HTTP Source

```rust
use drasi_source_http::HttpSource;
use drasi_lib::Source;
// Note: `ComponentStatus` is assumed to be in scope; its import is omitted here.

// Create and start the source
let source = HttpSource::builder("my-source")
    .with_host("0.0.0.0")
    .with_port(8080)
    .build()?;

source.start().await?;

// Check status
assert_eq!(source.status().await, ComponentStatus::Running);
```

### Submitting Events via curl

```bash
# Submit single node
curl -X POST http://localhost:8080/sources/my-source/events \
  -H "Content-Type: application/json" \
  -d '{
    "operation": "insert",
    "element": {
      "type": "node",
      "id": "user-1",
      "labels": ["User"],
      "properties": {"name": "Alice"}
    }
  }'

# Submit batch of events
curl -X POST http://localhost:8080/sources/my-source/events/batch \
  -H "Content-Type: application/json" \
  -d '{
    "events": [
      {
        "operation": "insert",
        "element": {
          "type": "node",
          "id": "user-1",
          "labels": ["User"],
          "properties": {"name": "Alice"}
        }
      },
      {
        "operation": "insert",
        "element": {
          "type": "node",
          "id": "user-2",
          "labels": ["User"],
          "properties": {"name": "Bob"}
        }
      }
    ]
  }'

# Health check
curl http://localhost:8080/health
```

### Python Example

```python
import requests

# Submit a node
event = {
    "operation": "insert",
    "element": {
        "type": "node",
        "id": "sensor-42",
        "labels": ["Sensor", "IoT"],
        "properties": {
            "temperature": 72.5,
            "location": "Building A",
            "active": True
        }
    }
}

response = requests.post(
    "http://localhost:8080/sources/my-source/events",
    json=event
)

print(response.json())
# {"success": true, "message": "All 1 events processed successfully"}
```

### JavaScript/Node.js Example

```javascript
const axios = require('axios');

async function submitEvent() {
  const event = {
    operation: 'insert',
    element: {
      type: 'node',
      id: 'product-123',
      labels: ['Product'],
      properties: {
        name: 'Widget',
        price: 29.99,
        inStock: true
      }
    }
  };

  const response = await axios.post(
    'http://localhost:8080/sources/my-source/events',
    event
  );

  console.log(response.data);
}

// Run the example and report any request failure
submitEvent().catch(console.error);
```

### Complete Integration Example

```rust
use drasi_source_http::HttpSource;
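use drasi_lib::{DrasiLib, Query};
// Note: `Result` is assumed to be an alias such as `anyhow::Result`;
// its import is omitted here.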

#[tokio::main]
async fn main() -> Result<()> {
    // Create HTTP source
    let http_source = HttpSource::builder("http-source")
        .with_host("0.0.0.0")
        .with_port(8080)
        .with_adaptive_enabled(true)
        .build()?;

    // Create continuous query
    let query = Query::cypher("active-users")
        .query("MATCH (u:User) WHERE u.active = true RETURN u.name")
        .from_source("http-source")
        .build();

    // Initialize Drasi
    let drasi = DrasiLib::new()
        .with_source(http_source)
        .with_query(query)
        .build()
        .await?;

    // Start processing
    drasi.start().await?;

    // Now submit events via HTTP POST to localhost:8080
    // Query results will update automatically as events arrive

    Ok(())
}
```

## Endpoints

The HTTP source exposes the following endpoints:

| Method | Path | Description |
|--------|------|-------------|
| POST | `/sources/{source_id}/events` | Submit a single event |
| POST | `/sources/{source_id}/events/batch` | Submit multiple events |
| GET | `/health` | Health check (returns service status and features) |
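
Client code often needs to build these paths from a source ID, and the source ID embedded in the path has to match the configured source. The helpers below are a standard-library sketch of both directions. The function names are hypothetical and are not part of the crate's API.

```rust
/// Path of the single-event endpoint for `source_id`.
pub fn event_path(source_id: &str) -> String {
    format!("/sources/{source_id}/events")
}

/// Path of the batch endpoint for `source_id`.
pub fn batch_path(source_id: &str) -> String {
    format!("/sources/{source_id}/events/batch")
}

/// Extracts the source ID from an event or batch path.
/// Returns `None` for any other path or for an empty ID.
pub fn source_id_from_path(path: &str) -> Option<&str> {
    let rest = path.strip_prefix("/sources/")?;
    let (id, tail) = rest.split_once('/')?;
    match tail {
        "events" | "events/batch" if !id.is_empty() => Some(id),
        _ => None,
    }
}
```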

### Response Format

Success:
```json
{
  "success": true,
  "message": "All 2 events processed successfully",
  "error": null
}
```

Partial success (batch):
```json
{
  "success": true,
  "message": "Processed 8 events successfully, 2 failed",
  "error": "Invalid element type"
}
```

Error:
```json
{
  "success": false,
  "message": "All 1 events failed",
  "error": "Source name mismatch"
}
```
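
The three response shapes above suggest a simple rule: `success` is `false` only when every event failed. The sketch below reproduces that rule and the message wording from the samples. It is inferred from the examples alone, so `EventResponse` and `summarize` are hypothetical, and which error a partial failure reports (first, last, or something else) is an assumption.

```rust
/// Hypothetical mirror of the JSON response body.
#[derive(Debug, PartialEq)]
pub struct EventResponse {
    pub success: bool,
    pub message: String,
    pub error: Option<String>,
}

/// Builds a response from success and failure counts.
/// `reported_error` is whichever error the handler chooses to surface.
pub fn summarize(
    succeeded: usize,
    failed: usize,
    reported_error: Option<String>,
) -> EventResponse {
    let total = succeeded + failed;
    if failed == 0 {
        EventResponse {
            success: true,
            message: format!("All {total} events processed successfully"),
            error: None,
        }
    } else if succeeded == 0 {
        EventResponse {
            success: false,
            message: format!("All {total} events failed"),
            error: reported_error,
        }
    } else {
        // Partial success still reports `success: true`, as in the sample.
        EventResponse {
            success: true,
            message: format!("Processed {succeeded} events successfully, {failed} failed"),
            error: reported_error,
        }
    }
}
```

Because a partial success still sets `success` to `true`, batch clients should also inspect `error` and `message` rather than rely on the flag alone.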

## Adaptive Batching

The HTTP source includes adaptive batching that adjusts batch size and wait time to the measured throughput:

### Throughput Levels

| Level | Messages/Second | Batch Size | Wait Time |
|-------|----------------|------------|-----------|
| Idle | < 1 | Minimum | Minimum (1ms) |
| Low | 1-100 | Small (2x min) | 1ms |
| Medium | 100-1,000 | Moderate (25% of max) | 10ms |
| High | 1,000-10,000 | Large (50% of max) | 25ms |
| Burst | > 10,000 | Maximum | 50ms |
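
The table can be read as a two-step mapping: classify the measured rate, then derive a batch size and wait time. The sketch below follows the table under a few assumptions that the table does not settle: a rate exactly on a shared boundary (100 or 1,000) goes to the higher level, 10,000 stays in High, and derived sizes are clamped to the configured minimum and maximum. The names are hypothetical and this is not the crate's `AdaptiveBatcher`.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ThroughputLevel {
    Idle,
    Low,
    Medium,
    High,
    Burst,
}

/// Maps a measured rate (messages per second) to a throughput level.
pub fn classify(msgs_per_sec: f64) -> ThroughputLevel {
    if msgs_per_sec < 1.0 {
        ThroughputLevel::Idle
    } else if msgs_per_sec < 100.0 {
        ThroughputLevel::Low
    } else if msgs_per_sec < 1_000.0 {
        ThroughputLevel::Medium
    } else if msgs_per_sec <= 10_000.0 {
        ThroughputLevel::High
    } else {
        ThroughputLevel::Burst
    }
}

/// Returns `(batch_size, wait_ms)` for a level, following the table.
pub fn batch_params(
    level: ThroughputLevel,
    min_batch: usize,
    max_batch: usize,
    min_wait_ms: u64,
) -> (usize, u64) {
    match level {
        ThroughputLevel::Idle => (min_batch, min_wait_ms),
        // Clamping to the configured bounds is an assumption of this sketch.
        ThroughputLevel::Low => ((min_batch * 2).min(max_batch), 1),
        ThroughputLevel::Medium => ((max_batch / 4).max(min_batch), 10),
        ThroughputLevel::High => ((max_batch / 2).max(min_batch), 25),
        ThroughputLevel::Burst => (max_batch, 50),
    }
}
```

With the default bounds (minimum 10, maximum 1000), a Medium rate yields batches of 250 events with a 10 ms wait.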

### Tuning Guidelines

**For low latency** (real-time dashboards):
```rust
.with_adaptive_max_wait_ms(10)
.with_adaptive_min_batch_size(1)
```

**For high throughput** (bulk data ingestion):
```rust
.with_adaptive_max_batch_size(5000)
.with_adaptive_max_wait_ms(500)
```

**To disable adaptive batching**:
```rust
.with_adaptive_enabled(false)
```

## Bootstrap Providers

The HTTP source supports universal bootstrap: any bootstrap provider can be used to load initial data, and the load runs alongside the live HTTP stream rather than blocking it.

### Common Patterns

**Bootstrap from PostgreSQL, stream via HTTP**:
```rust
let postgres_provider = PostgresBootstrapProvider::new(config)?;

let source = HttpSource::builder("http-source")
    .with_host("0.0.0.0")
    .with_port(8080)
    .with_bootstrap_provider(postgres_provider)
    .build()?;
```

**Bootstrap from files**:
```rust
let file_provider = ScriptFileBootstrapProvider::new(file_paths)?;

let source = HttpSource::builder("http-source")
    .with_host("0.0.0.0")
    .with_port(8080)
    .with_bootstrap_provider(file_provider)
    .build()?;
```

### Bootstrap Behavior

- Bootstrap runs asynchronously in a separate task
- Streaming events are processed immediately
- Queries receive both bootstrap and streaming data
- Bootstrap provider properties are passed via the source's generic properties map
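
The behavior described above, where bootstrap runs in its own task while streamed events keep flowing, can be sketched with standard-library threads and a channel. This is an analogy only: the source is described as async on Tokio, and `Origin` and `run_bootstrap_and_stream` are hypothetical names.

```rust
use std::sync::mpsc;
use std::thread;

/// Where a delivered item came from.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum Origin {
    Bootstrap,
    Stream,
}

/// Sends bootstrap items from a separate thread while streaming items are
/// sent from the calling thread. Returns everything the receiving side saw.
pub fn run_bootstrap_and_stream(
    bootstrap: Vec<String>,
    stream: Vec<String>,
) -> Vec<(Origin, String)> {
    let (tx, rx) = mpsc::channel();

    // The bootstrap load gets its own sender and its own thread.
    let bootstrap_tx = tx.clone();
    let loader = thread::spawn(move || {
        for item in bootstrap {
            bootstrap_tx
                .send((Origin::Bootstrap, item))
                .expect("receiver is still alive");
        }
    });

    // Streaming is not blocked while the bootstrap thread is running.
    for item in stream {
        tx.send((Origin::Stream, item))
            .expect("receiver is still alive");
    }

    loader.join().expect("bootstrap thread panicked");
    drop(tx); // Close the channel so the collection below terminates.
    rx.into_iter().collect()
}
```

The interleaving of bootstrap and streamed items is not deterministic in this sketch, so consumers should not assume that bootstrap data arrives first.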

## Error Handling

### Common Errors

**Port already in use**:
```
Failed to bind HTTP server to 0.0.0.0:8080: Address already in use
```
Solution: Change the port or stop the conflicting service.

**Invalid JSON**:
```json
{"success": false, "message": "Failed to parse JSON", "error": "..."}
```
Solution: Validate the JSON payload against the input schema.

**Source name mismatch**:
```json
{"success": false, "message": "Source name mismatch", "error": "Expected 'my-source', got 'wrong-source'"}
```
Solution: Ensure the source ID in the URL path matches the configured source ID.

**Validation errors**:
- Port cannot be 0
- Timeout cannot be 0
- Min batch size cannot exceed max batch size
- Min wait time cannot exceed max wait time
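
The four validation rules above are easy to check before constructing a source. The sketch below applies them to a hypothetical `Settings` struct. The struct, the `validate` function, and the error strings are illustrative assumptions rather than the crate's own validation code.

```rust
/// Hypothetical subset of the settings that the validation rules cover.
#[derive(Debug, Clone)]
pub struct Settings {
    pub port: u16,
    pub timeout_ms: u64,
    pub min_batch_size: usize,
    pub max_batch_size: usize,
    pub min_wait_ms: u64,
    pub max_wait_ms: u64,
}

/// Returns the first rule violation found, if any.
pub fn validate(settings: &Settings) -> Result<(), String> {
    if settings.port == 0 {
        return Err("port cannot be 0".to_string());
    }
    if settings.timeout_ms == 0 {
        return Err("timeout cannot be 0".to_string());
    }
    if settings.min_batch_size > settings.max_batch_size {
        return Err("min batch size cannot exceed max batch size".to_string());
    }
    if settings.min_wait_ms > settings.max_wait_ms {
        return Err("min wait time cannot exceed max wait time".to_string());
    }
    Ok(())
}
```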

## Testing

### Unit Tests

```bash
# Run all HTTP source tests
cargo test -p drasi-source-http

# Run specific test module
cargo test -p drasi-source-http construction

# Run with logging
RUST_LOG=debug cargo test -p drasi-source-http -- --nocapture
```

### Integration Testing

```bash
# Start test server
cargo run --example http_source_example

# In another terminal, submit test events
curl -X POST http://localhost:8080/sources/test-source/events \
  -H "Content-Type: application/json" \
  -d '{"operation":"insert","element":{"type":"node","id":"1","labels":["Test"],"properties":{}}}'
```

## Performance Considerations

### Channel Capacity

The internal batch channel capacity is automatically calculated as `max_batch_size × 5`:

| Max Batch Size | Channel Capacity | Memory (1KB/event) |
|----------------|------------------|--------------------|
| 100 | 500 | ~500 KB |
| 1,000 | 5,000 | ~5 MB |
| 5,000 | 25,000 | ~25 MB |
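
The table rows follow directly from the `max_batch_size × 5` rule. The helpers below reproduce the arithmetic so that other batch sizes or event sizes can be estimated. The function names are hypothetical, and the average event size is an input you supply, not something the source measures.

```rust
/// Channel capacity derived from the maximum batch size (`max_batch_size × 5`).
pub fn channel_capacity(max_batch_size: usize) -> usize {
    max_batch_size.saturating_mul(5)
}

/// Rough upper bound on buffered bytes when the channel is full.
pub fn estimated_memory_bytes(max_batch_size: usize, avg_event_bytes: usize) -> usize {
    channel_capacity(max_batch_size).saturating_mul(avg_event_bytes)
}
```

For example, `estimated_memory_bytes(1000, 1024)` gives 5,120,000 bytes, which matches the "~5 MB" row for 1 KB events.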

### Dispatch Modes

- **Channel** (default): Isolated channels per subscriber with backpressure, zero message loss
- **Broadcast**: Shared channel, no backpressure, possible message loss under high load
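
The trade-off between the two modes comes down to what happens when a buffer is full: wait (backpressure) or drop (possible loss). The sketch below demonstrates both behaviors with a bounded standard-library channel. It is an analogy under stated assumptions, not the crate's dispatcher: it models only the full-buffer policy, not per-subscriber channels or a shared broadcast channel, and the function names are hypothetical.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};
use std::thread;

/// Lossy delivery: a full buffer drops the message instead of waiting.
/// Nothing is consumed while sending. Returns `(delivered, dropped)`.
pub fn send_lossy(capacity: usize, messages: usize) -> (usize, usize) {
    let (tx, rx) = sync_channel::<usize>(capacity);
    let mut dropped = 0;
    for i in 0..messages {
        match tx.try_send(i) {
            Ok(()) => {}
            Err(TrySendError::Full(_)) => dropped += 1,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    drop(tx); // Close the channel so counting terminates.
    (rx.iter().count(), dropped)
}

/// Backpressure: `send` blocks while the buffer is full, so a slow consumer
/// slows the producer down but no message is lost. Returns the delivered count.
pub fn send_with_backpressure(capacity: usize, messages: usize) -> usize {
    let (tx, rx) = sync_channel::<usize>(capacity);
    let consumer = thread::spawn(move || rx.iter().count());
    for i in 0..messages {
        tx.send(i).expect("consumer is still alive");
    }
    drop(tx);
    consumer.join().expect("consumer thread panicked")
}
```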

### Best Practices

1. **Use batch endpoint** for bulk operations (reduces HTTP overhead)
2. **Enable adaptive batching** for variable traffic patterns
3. **Tune batch sizes** based on your throughput requirements
4. **Monitor health endpoint** for production deployments
5. **Use Channel dispatch mode** when message reliability is critical

## Architecture Notes

### Internal Structure

- **HttpSource**: Main plugin implementation (lib.rs)
- **HttpSourceConfig**: Configuration struct (config.rs)
- **HttpElement/HttpSourceChange**: Event models (models.rs)
- **AdaptiveBatcher**: Batching logic (adaptive_batcher.rs)
- **Time utilities**: Timestamp handling (time.rs)

### Data Flow

1. HTTP POST request arrives at Axum endpoint
2. JSON deserialized to `HttpSourceChange`
3. Converted to `drasi_core::models::SourceChange`
4. Sent to adaptive batcher via mpsc channel
5. Batcher accumulates events based on throughput
6. Batch forwarded to dispatchers
7. Dispatchers route to subscribed queries
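
Steps 4 to 6 can be sketched as a loop that pulls events from a channel and emits a batch once it is full or a wait limit expires. The code below uses the standard library's blocking channel instead of the async mpsc channel mentioned above, and fixed limits instead of adaptive ones. `next_batch` and `batch_all` are hypothetical names for illustration.

```rust
use std::sync::mpsc::{self, Receiver, RecvTimeoutError};
use std::time::{Duration, Instant};

/// Collects up to `max_batch` events, waiting at most `max_wait` after the
/// first event arrives. Returns `None` once the channel is closed and empty.
pub fn next_batch<T>(
    rx: &Receiver<T>,
    max_batch: usize,
    max_wait: Duration,
) -> Option<Vec<T>> {
    // Block until the first event of the batch is available.
    let first = rx.recv().ok()?;
    let mut batch = vec![first];
    let deadline = Instant::now() + max_wait;
    while batch.len() < max_batch {
        let remaining = deadline.saturating_duration_since(Instant::now());
        match rx.recv_timeout(remaining) {
            Ok(event) => batch.push(event),
            // Dispatch what we have on timeout or when all senders are gone.
            Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => break,
        }
    }
    Some(batch)
}

/// Queues all events, closes the channel, and drains it batch by batch.
pub fn batch_all<T>(events: Vec<T>, max_batch: usize, max_wait: Duration) -> Vec<Vec<T>> {
    let (tx, rx) = mpsc::channel();
    for event in events {
        tx.send(event).expect("receiver is still alive");
    }
    drop(tx); // Closing the channel lets the loop below terminate.
    let mut batches = Vec::new();
    while let Some(batch) = next_batch(&rx, max_batch, max_wait) {
        batches.push(batch);
    }
    batches
}
```

An adaptive batcher would recompute `max_batch` and `max_wait` between calls based on the measured throughput; this sketch keeps them constant to show only the accumulation step.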

### Thread Safety

- All operations are async using Tokio
- Shared state protected by `Arc<RwLock<_>>`
- Channel-based communication for event flow
- Component status tracked with atomic updates
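
The shared-state pattern named above can be illustrated with a status value behind `Arc<RwLock<_>>`. The sketch uses the standard library's blocking `RwLock` and a thread, whereas the source is described as async on Tokio. `Status` and `SharedStatus` are hypothetical stand-ins, not the crate's `ComponentStatus`.

```rust
use std::sync::{Arc, RwLock};
use std::thread;

/// Hypothetical lifecycle states for this sketch.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Status {
    Stopped,
    Starting,
    Running,
}

/// A cheaply cloneable handle to one shared status value.
#[derive(Clone)]
pub struct SharedStatus(Arc<RwLock<Status>>);

impl SharedStatus {
    pub fn new() -> Self {
        Self(Arc::new(RwLock::new(Status::Stopped)))
    }

    /// Takes the write lock briefly to replace the status.
    pub fn set(&self, status: Status) {
        *self.0.write().expect("status lock poisoned") = status;
    }

    /// Takes the read lock briefly to copy the status out.
    pub fn get(&self) -> Status {
        *self.0.read().expect("status lock poisoned")
    }
}

/// Updates the status from a worker thread and reads it back afterwards.
pub fn start_in_background(status: &SharedStatus) -> Status {
    let worker_status = status.clone();
    thread::spawn(move || {
        worker_status.set(Status::Starting);
        worker_status.set(Status::Running);
    })
    .join()
    .expect("worker thread panicked");
    status.get()
}
```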

## See Also

- [Drasi Core Documentation](../../README.md)
- [Source Plugin API](../../../lib/src/sources/traits.rs)
- [Bootstrap Providers](../../../lib/src/bootstrap/)
- [Continuous Queries](../../../query-cypher/)