# ingest
`src/features/ingest/api.rs`
The real-time ingestion pipeline. Buffers events in a bounded queue, validates them, and flushes batches to the storage engine. Provides backpressure when the queue is full and an optimized fast path for edge-delta-only batches.
---
## Setup
```rust
let config = IngestConfig::default(); // max_queue_depth: 16384, max_batch_size: 1024
let mut pipeline = IngestPipeline::new(config)?;
```
---
## IngestConfig
```rust
pub struct IngestConfig {
    pub max_queue_depth: usize, // default: 16384
    pub max_batch_size: usize,  // default: 1024
}
```
`max_batch_size` is the maximum number of events flushed to storage in a single batch. `max_queue_depth` is the maximum number of events that may be buffered before backpressure is applied.
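To make the relationship between the two limits concrete, the sketch below computes how many flushes a queue of a given depth would need. `SketchConfig` and `batches_to_drain` are hypothetical stand-ins written for illustration, not part of the ingest API, and the sketch assumes each flush writes up to `max_batch_size` events.

```rust
/// Hypothetical mirror of `IngestConfig`, declared locally for this sketch.
#[derive(Debug, Clone, Copy)]
pub struct SketchConfig {
    pub max_queue_depth: usize,
    pub max_batch_size: usize,
}

impl Default for SketchConfig {
    fn default() -> Self {
        // Defaults as documented for `IngestConfig`.
        Self {
            max_queue_depth: 16384,
            max_batch_size: 1024,
        }
    }
}

/// Number of flushes needed to drain `queued` events, assuming each flush
/// writes at most `max_batch_size` events (ceiling division).
pub fn batches_to_drain(config: &SketchConfig, queued: usize) -> usize {
    assert!(config.max_batch_size > 0, "max_batch_size must be non-zero");
    let full_batches = queued / config.max_batch_size;
    let has_partial_batch = queued % config.max_batch_size != 0;
    full_batches + usize::from(has_partial_batch)
}
```

Under these assumptions, a queue filled to the default `max_queue_depth` drains in 16 batches of 1024 events.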
---
## IngestPipeline
```rust
pub struct IngestPipeline { /* ... */ }
impl IngestPipeline {
    pub fn new(config: IngestConfig) -> Result<Self>
    pub fn queue_depth(&self) -> usize
    pub fn ingest_event(&mut self, handle: &mut StorageHandle, event: IngestEvent) -> Result<IngestAck>
    pub fn ingest_batch(&mut self, handle: &mut StorageHandle, events: &[IngestEvent]) -> Result<IngestAck>
    pub fn ingest_edge_delta_range(&mut self, handle: &mut StorageHandle, start_node_id: u64, count: u64, version: u64, payload_prefix: &str) -> Result<IngestAck>
    pub fn flush_all(&mut self, handle: &mut StorageHandle) -> Result<usize>
}
```
---
## Free Functions
These are thin wrappers over the `IngestPipeline` methods of the same name, provided for call sites where a free function is more convenient than a method call.
```rust
pub fn ingest_event(
    pipeline: &mut IngestPipeline,
    handle: &mut StorageHandle,
    event: IngestEvent,
) -> Result<IngestAck>
```
```rust
pub fn ingest_batch(
    pipeline: &mut IngestPipeline,
    handle: &mut StorageHandle,
    events: &[IngestEvent],
) -> Result<IngestAck>
```
```rust
pub fn ingest_edge_delta_range(
    pipeline: &mut IngestPipeline,
    handle: &mut StorageHandle,
    start_node_id: u64,
    count: u64,
    version: u64,
    payload_prefix: &str,
) -> Result<IngestAck>
```
Generates and writes `count` edge deltas for node IDs `[start_node_id, start_node_id + count)`, each with payload `"{payload_prefix}-{i}"`. Bypasses the queue and writes directly to storage in `max_batch_size` chunks. Useful for bulk loading.
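The range generation and chunking described above can be sketched as follows. This is an illustration under stated assumptions, not the pipeline's implementation: `generate_edge_deltas` and `chunk_lengths` are hypothetical helpers, and `{i}` is assumed to be the zero-based offset within the range rather than the node ID.

```rust
/// Hypothetical helper (not part of the ingest API): builds the
/// `(node_id, payload)` pairs for the half-open range
/// `[start_node_id, start_node_id + count)`.
/// Assumption: `{i}` is the zero-based offset within the range.
pub fn generate_edge_deltas(
    start_node_id: u64,
    count: u64,
    payload_prefix: &str,
) -> Vec<(u64, Vec<u8>)> {
    (0..count)
        .map(|i| {
            let payload = format!("{}-{}", payload_prefix, i).into_bytes();
            (start_node_id + i, payload)
        })
        .collect()
}

/// Returns the size of each chunk that would be written when the deltas are
/// split into groups of at most `max_batch_size`.
pub fn chunk_lengths(deltas: &[(u64, Vec<u8>)], max_batch_size: usize) -> Vec<usize> {
    // `chunks` panics on a chunk size of zero, so reject it up front.
    assert!(max_batch_size > 0, "max_batch_size must be non-zero");
    deltas
        .chunks(max_batch_size)
        .map(|chunk| chunk.len())
        .collect()
}
```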
---
## Event Types
```rust
pub enum IngestEvent {
    InsertNode {
        node_id: u64,
        version: u64, // must be > 0
        adjacency: Vec<u64>,
        bitmap_terms: Vec<(String, String)>, // (index_name, value_key)
    },
    AddEdgeDelta {
        node_id: u64,
        version: u64, // must be > 0
        payload: Vec<u8>, // must be non-empty
    },
    UpdateVectorDelta {
        node_id: u64,
        version: u64, // must be > 0
        payload: Vec<u8>, // canonical: structured vector payload
    },
}
```
**Validation rules:**
- All events: `version` must be > 0.
- `AddEdgeDelta` / `UpdateVectorDelta`: `payload` must be non-empty.
- `InsertNode`: each `bitmap_terms` entry must have a non-empty `index_name` and `value_key`.

For `UpdateVectorDelta`, new writes should use the structured vector payload contract produced by `storage::api::encode_vector_payload_f32(...)` or `storage::api::encode_vector_payload_quantized_i8(...)`. Legacy raw packed-`f32` payloads remain readable only while manifest compatibility is enabled, but should not be emitted by new ingest surfaces.
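The validation rules listed above could be expressed roughly as below. This is a minimal sketch: `SketchEvent` is a hypothetical local mirror of `IngestEvent` so the block compiles on its own, and `validate` with its `String` error messages is an assumption, not the pipeline's actual validator. It does not check the vector payload encoding.

```rust
/// Hypothetical mirror of `IngestEvent`, declared locally for this sketch.
#[derive(Debug, Clone)]
pub enum SketchEvent {
    InsertNode {
        node_id: u64,
        version: u64,
        adjacency: Vec<u64>,
        bitmap_terms: Vec<(String, String)>,
    },
    AddEdgeDelta { node_id: u64, version: u64, payload: Vec<u8> },
    UpdateVectorDelta { node_id: u64, version: u64, payload: Vec<u8> },
}

/// Applies the documented rules; the error strings are illustrative only.
pub fn validate(event: &SketchEvent) -> Result<(), String> {
    match event {
        SketchEvent::InsertNode { version, bitmap_terms, .. } => {
            if *version == 0 {
                return Err("version must be > 0".to_string());
            }
            // Every (index_name, value_key) pair needs both parts non-empty.
            for (index_name, value_key) in bitmap_terms {
                if index_name.is_empty() || value_key.is_empty() {
                    return Err("bitmap term has an empty index_name or value_key".to_string());
                }
            }
            Ok(())
        }
        SketchEvent::AddEdgeDelta { version, payload, .. }
        | SketchEvent::UpdateVectorDelta { version, payload, .. } => {
            if *version == 0 {
                return Err("version must be > 0".to_string());
            }
            if payload.is_empty() {
                return Err("payload must be non-empty".to_string());
            }
            Ok(())
        }
    }
}
```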
---
## Acknowledgement
```rust
pub struct IngestAck {
    pub accepted: usize,
    pub rejected: usize,    // events dropped due to queue full
    pub flushed: usize,     // events written to storage in this call
    pub queue_depth: usize, // current queue depth after the call
}
```
---
## Fast Path
When all events in a batch are `AddEdgeDelta` and the queue is empty, `ingest_batch` takes an optimized fast path that encodes and writes deltas directly without going through the event queue, reducing allocations and improving throughput.
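The eligibility condition described above can be written as a single predicate. This is a minimal sketch, not the pipeline's actual check: `EventKind` and `takes_fast_path` are hypothetical names, and treating an empty batch as ineligible is an assumption that the text does not address.

```rust
/// Hypothetical, field-less stand-in for the `IngestEvent` variants.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EventKind {
    InsertNode,
    AddEdgeDelta,
    UpdateVectorDelta,
}

/// Returns true when a batch would qualify for the fast path as documented:
/// the queue is empty and every event is an `AddEdgeDelta`.
pub fn takes_fast_path(kinds: &[EventKind], queue_depth: usize) -> bool {
    // Assumption: an empty batch does not take the fast path.
    !kinds.is_empty()
        && queue_depth == 0
        && kinds.iter().all(|kind| *kind == EventKind::AddEdgeDelta)
}
```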
---
## Errors
```rust
pub enum IngestError {
    InvalidInput(String),
    Backpressure { queue_depth: usize, max_queue_depth: usize },
    Storage(String),
}
pub type Result<T> = std::result::Result<T, IngestError>;
```
`Backpressure` is returned only when the entire batch is rejected, that is, when none of its events could be accepted. Partial acceptance, where some events are accepted and others rejected, does not return an error; the number of rejected events is reported in `IngestAck::rejected`.
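One way a caller might distinguish full acceptance, partial rejection, and `Backpressure` is sketched below. `SketchAck` and `SketchError` are hypothetical local mirrors of `IngestAck` and `IngestError`, and the `NextStep` policy (retry rejected events, back off on backpressure) is an assumption for illustration, not behavior prescribed by the pipeline.

```rust
/// Hypothetical mirror of `IngestAck`, declared locally for this sketch.
#[derive(Debug, PartialEq)]
pub struct SketchAck {
    pub accepted: usize,
    pub rejected: usize,
    pub flushed: usize,
    pub queue_depth: usize,
}

/// Hypothetical mirror of `IngestError`.
#[derive(Debug, PartialEq)]
pub enum SketchError {
    InvalidInput(String),
    Backpressure { queue_depth: usize, max_queue_depth: usize },
    Storage(String),
}

/// Illustrative caller-side policy; not part of the ingest API.
#[derive(Debug, PartialEq, Eq)]
pub enum NextStep {
    Done,
    RetryRejected(usize),
    BackOff,
    Fail(String),
}

pub fn next_step(outcome: &Result<SketchAck, SketchError>) -> NextStep {
    match outcome {
        // Every event was accepted.
        Ok(ack) if ack.rejected == 0 => NextStep::Done,
        // Partial acceptance: no error, but some events were dropped.
        Ok(ack) => NextStep::RetryRejected(ack.rejected),
        // The whole batch was rejected because the queue was full.
        Err(SketchError::Backpressure { .. }) => NextStep::BackOff,
        // Invalid input and storage failures are not retried in this sketch.
        Err(SketchError::InvalidInput(msg)) | Err(SketchError::Storage(msg)) => {
            NextStep::Fail(msg.clone())
        }
    }
}
```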