use std::collections::VecDeque;
use crate::features::storage::api as storage_api;
/// A single mutation to be applied to storage by the ingest pipeline.
///
/// `#[non_exhaustive]` allows new event kinds to be added without breaking
/// downstream `match` expressions.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum IngestEvent {
    /// Insert a full node record with its adjacency list and bitmap-index
    /// terms, optionally marking its embedding as pending.
    InsertNode {
        node_id: u64,
        /// Record version; must be > 0 (enforced by validation).
        version: u64,
        adjacency: Vec<u64>,
        /// `(index_name, value_key)` pairs; both must be non-blank
        /// (enforced by validation).
        bitmap_terms: Vec<(String, String)>,
        embedding_pending: bool,
    },
    /// Append an opaque edge delta for a node.
    AddEdgeDelta {
        node_id: u64,
        /// Delta version; must be > 0 (enforced by validation).
        version: u64,
        /// Opaque delta bytes; must be non-empty (enforced by validation).
        payload: Vec<u8>,
    },
    /// Replace/update a node's vector data with an opaque delta.
    UpdateVectorDelta {
        node_id: u64,
        /// Delta version; must be > 0 (enforced by validation).
        version: u64,
        /// Opaque delta bytes; must be non-empty (enforced by validation).
        payload: Vec<u8>,
    },
}
/// Tuning limits for an [`IngestPipeline`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IngestConfig {
    /// Maximum number of events that may sit in the in-memory queue;
    /// must be > 0.
    pub max_queue_depth: usize,
    /// Number of events written to storage per flush; must be > 0.
    pub max_batch_size: usize,
}

impl Default for IngestConfig {
    /// Defaults: queue up to 16 384 events, flushed 1024 at a time.
    fn default() -> Self {
        IngestConfig {
            max_queue_depth: 16_384,
            max_batch_size: 1024,
        }
    }
}
/// Per-call summary returned by the ingest entry points.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IngestAck {
    /// Events validated and taken in by this call.
    pub accepted: usize,
    /// Events dropped because the queue was at `max_queue_depth`.
    pub rejected: usize,
    /// Events written to storage during this call.
    pub flushed: usize,
    /// Queue length at the moment the call returned.
    pub queue_depth: usize,
}
/// Failure modes of the ingest pipeline.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum IngestError {
    /// An event or configuration value failed validation.
    InvalidInput(String),
    /// The queue was full and no event could be accepted.
    Backpressure {
        queue_depth: usize,
        max_queue_depth: usize,
    },
    /// A storage-layer call failed; carries the storage error's
    /// debug representation.
    Storage(String),
}
/// Module-local result alias over [`IngestError`].
pub type Result<T> = std::result::Result<T, IngestError>;
/// Buffers validated ingest events in a FIFO queue and flushes them to
/// storage in batches of at most `config.max_batch_size` events.
pub struct IngestPipeline {
    config: IngestConfig,
    // FIFO buffer; every push site checks against config.max_queue_depth.
    queue: VecDeque<IngestEvent>,
}
impl IngestPipeline {
    /// Creates a pipeline with the supplied limits.
    ///
    /// # Errors
    /// Returns [`IngestError::InvalidInput`] if `max_queue_depth` or
    /// `max_batch_size` is zero.
    pub fn new(config: IngestConfig) -> Result<Self> {
        if config.max_queue_depth == 0 {
            return Err(IngestError::InvalidInput(
                "max_queue_depth must be > 0".to_string(),
            ));
        }
        if config.max_batch_size == 0 {
            return Err(IngestError::InvalidInput(
                "max_batch_size must be > 0".to_string(),
            ));
        }
        Ok(Self {
            config,
            queue: VecDeque::new(),
        })
    }

    /// Number of events currently buffered and not yet flushed.
    pub fn queue_depth(&self) -> usize {
        self.queue.len()
    }

    /// Validates and enqueues one event, then flushes a batch if the queue
    /// has reached `max_batch_size`.
    ///
    /// # Errors
    /// - [`IngestError::InvalidInput`] when the event fails validation.
    /// - [`IngestError::Backpressure`] when the queue is already at
    ///   `max_queue_depth` (the event is not enqueued).
    /// - [`IngestError::Storage`] when flushing to storage fails.
    pub fn ingest_event(
        &mut self,
        handle: &mut storage_api::StorageHandle,
        event: IngestEvent,
    ) -> Result<IngestAck> {
        self.validate_event(&event)?;
        self.push_event(event)?;
        let flushed = self.flush_if_needed(handle)?;
        Ok(IngestAck {
            accepted: 1,
            rejected: 0,
            flushed,
            queue_depth: self.queue_depth(),
        })
    }

    /// Validates and enqueues a slice of events, flushing whenever the queue
    /// reaches `max_batch_size`.
    ///
    /// Events arriving while the queue is at `max_queue_depth` are counted
    /// as `rejected` rather than erroring — unless *nothing* was accepted,
    /// in which case [`IngestError::Backpressure`] is returned.
    ///
    /// NOTE: a validation or storage error aborts mid-slice, so earlier
    /// events from the same slice may already have been queued or flushed.
    ///
    /// An all-`AddEdgeDelta` slice arriving on an empty queue takes a fast
    /// path that batches deltas straight to storage.
    pub fn ingest_batch(
        &mut self,
        handle: &mut storage_api::StorageHandle,
        events: &[IngestEvent],
    ) -> Result<IngestAck> {
        if self.can_use_edge_batch_fast_path(events) {
            return self.ingest_batch_edge_fast_path(handle, events);
        }
        let mut accepted = 0usize;
        let mut rejected = 0usize;
        let mut flushed = 0usize;
        for event in events {
            self.validate_event(event)?;
            if self.queue.len() >= self.config.max_queue_depth {
                rejected += 1;
                continue;
            }
            self.queue.push_back(event.clone());
            accepted += 1;
            while self.queue.len() >= self.config.max_batch_size {
                flushed += self.flush_once(handle)?;
            }
        }
        if accepted == 0 && rejected > 0 {
            return Err(IngestError::Backpressure {
                queue_depth: self.queue_depth(),
                max_queue_depth: self.config.max_queue_depth,
            });
        }
        Ok(IngestAck {
            accepted,
            rejected,
            flushed,
            queue_depth: self.queue_depth(),
        })
    }

    /// Synthesizes and writes `count` edge deltas for consecutive node ids
    /// starting at `start_node_id`, bypassing the queue. Any already-queued
    /// events are flushed first so storage write ordering is preserved.
    ///
    /// Payload bytes are `"{payload_prefix}-{i}"` for `i` in `0..count`.
    ///
    /// # Errors
    /// - [`IngestError::InvalidInput`] if `version` is zero, or if the node
    ///   id range would overflow `u64` (checked up front, before any write).
    /// - [`IngestError::Storage`] if a storage write fails.
    pub fn ingest_edge_delta_range(
        &mut self,
        handle: &mut storage_api::StorageHandle,
        start_node_id: u64,
        count: u64,
        version: u64,
        payload_prefix: &str,
    ) -> Result<IngestAck> {
        if version == 0 {
            return Err(IngestError::InvalidInput(
                "delta version must be > 0".to_string(),
            ));
        }
        // Fail fast on id overflow so we never leave a partially written range.
        // (Previously `start_node_id + i` could panic in debug builds or wrap
        // silently in release builds.)
        if count > 0 && start_node_id.checked_add(count - 1).is_none() {
            return Err(IngestError::InvalidInput(
                "node id range overflows u64".to_string(),
            ));
        }
        let mut accepted = 0usize;
        let rejected = 0usize;
        let mut flushed = 0usize;
        // Drain previously queued events first to keep write ordering.
        if !self.queue.is_empty() {
            flushed += self.flush_all(handle)?;
        }
        let mut deltas: Vec<Vec<u8>> = Vec::with_capacity(self.config.max_batch_size);
        for i in 0..count {
            let payload = format!("{}-{}", payload_prefix, i).into_bytes();
            let node_id = start_node_id + i;
            deltas.push(storage_api::encode_delta(node_id, version, &payload));
            accepted += 1;
            if deltas.len() >= self.config.max_batch_size {
                storage_api::put_edge_deltas_batch(handle, &deltas).map_err(storage_err)?;
                flushed += deltas.len();
                deltas.clear();
            }
        }
        // Write the undersized tail, if any.
        if !deltas.is_empty() {
            storage_api::put_edge_deltas_batch(handle, &deltas).map_err(storage_err)?;
            flushed += deltas.len();
        }
        Ok(IngestAck {
            accepted,
            rejected,
            flushed,
            queue_depth: self.queue_depth(),
        })
    }

    /// The fast path applies only when the queue is empty (no reordering
    /// hazard), batch size fits under the depth limit, and every event is
    /// an `AddEdgeDelta`.
    fn can_use_edge_batch_fast_path(&self, events: &[IngestEvent]) -> bool {
        if !self.queue.is_empty() {
            return false;
        }
        if self.config.max_batch_size > self.config.max_queue_depth {
            return false;
        }
        events
            .iter()
            .all(|event| matches!(event, IngestEvent::AddEdgeDelta { .. }))
    }

    /// Batches edge deltas directly to storage; a tail shorter than
    /// `max_batch_size` is parked on the queue for a later flush instead of
    /// issuing an undersized write.
    fn ingest_batch_edge_fast_path(
        &mut self,
        handle: &mut storage_api::StorageHandle,
        events: &[IngestEvent],
    ) -> Result<IngestAck> {
        let mut accepted = 0usize;
        let mut rejected = 0usize;
        let mut flushed = 0usize;
        let mut edge_batch: Vec<(u64, u64, Vec<u8>)> =
            Vec::with_capacity(self.config.max_batch_size);
        for event in events {
            self.validate_event(event)?;
            // Mirrors the slow path's depth check, counting the pending batch.
            if self.queue.len() + edge_batch.len() >= self.config.max_queue_depth {
                rejected += 1;
                continue;
            }
            if let IngestEvent::AddEdgeDelta {
                node_id,
                version,
                payload,
            } = event
            {
                edge_batch.push((*node_id, *version, payload.clone()));
                accepted += 1;
                if edge_batch.len() >= self.config.max_batch_size {
                    // Drain (rather than clone-and-clear) so payloads are
                    // moved into the encoder without an extra copy.
                    let deltas: Vec<Vec<u8>> = edge_batch
                        .drain(..)
                        .map(|(node_id, version, payload)| {
                            storage_api::encode_delta(node_id, version, &payload)
                        })
                        .collect();
                    storage_api::put_edge_deltas_batch(handle, &deltas).map_err(storage_err)?;
                    flushed += deltas.len();
                }
            }
        }
        // Park the undersized tail on the queue.
        for (node_id, version, payload) in edge_batch {
            self.queue.push_back(IngestEvent::AddEdgeDelta {
                node_id,
                version,
                payload,
            });
        }
        if accepted == 0 && rejected > 0 {
            return Err(IngestError::Backpressure {
                queue_depth: self.queue_depth(),
                max_queue_depth: self.config.max_queue_depth,
            });
        }
        Ok(IngestAck {
            accepted,
            rejected,
            flushed,
            queue_depth: self.queue_depth(),
        })
    }

    /// Flushes everything currently queued, in batch-sized chunks.
    ///
    /// # Errors
    /// Propagates [`IngestError::Storage`] from any failed flush.
    pub fn flush_all(&mut self, handle: &mut storage_api::StorageHandle) -> Result<usize> {
        let mut total_flushed = 0usize;
        while !self.queue.is_empty() {
            total_flushed += self.flush_once(handle)?;
        }
        Ok(total_flushed)
    }

    /// Enforces the invariants required of every inbound event: versions are
    /// non-zero, bitmap terms are non-blank, and delta payloads are non-empty.
    fn validate_event(&self, event: &IngestEvent) -> Result<()> {
        match event {
            IngestEvent::InsertNode {
                node_id: _,
                version,
                adjacency: _,
                bitmap_terms,
                embedding_pending: _,
            } => {
                if *version == 0 {
                    return Err(IngestError::InvalidInput(
                        "InsertNode version must be > 0".to_string(),
                    ));
                }
                for (index_name, value_key) in bitmap_terms {
                    if index_name.trim().is_empty() || value_key.trim().is_empty() {
                        return Err(IngestError::InvalidInput(
                            "bitmap terms require non-empty index name and value key".to_string(),
                        ));
                    }
                }
            }
            IngestEvent::AddEdgeDelta {
                node_id: _,
                version,
                payload,
            }
            | IngestEvent::UpdateVectorDelta {
                node_id: _,
                version,
                payload,
            } => {
                if *version == 0 {
                    return Err(IngestError::InvalidInput(
                        "delta version must be > 0".to_string(),
                    ));
                }
                if payload.is_empty() {
                    return Err(IngestError::InvalidInput(
                        "delta payload must not be empty".to_string(),
                    ));
                }
            }
        }
        Ok(())
    }

    /// Enqueues an already-validated event, or reports backpressure when the
    /// queue is at `max_queue_depth`.
    fn push_event(&mut self, event: IngestEvent) -> Result<()> {
        if self.queue.len() >= self.config.max_queue_depth {
            return Err(IngestError::Backpressure {
                queue_depth: self.queue_depth(),
                max_queue_depth: self.config.max_queue_depth,
            });
        }
        self.queue.push_back(event);
        Ok(())
    }

    /// Flushes a single batch, but only once the queue has reached
    /// `max_batch_size`; returns the number of events flushed (0 if below
    /// the threshold).
    fn flush_if_needed(&mut self, handle: &mut storage_api::StorageHandle) -> Result<usize> {
        if self.queue.len() < self.config.max_batch_size {
            return Ok(0);
        }
        self.flush_once(handle)
    }

    /// Pops up to `max_batch_size` events from the queue and applies them to
    /// storage. Edge deltas within the batch are coalesced into one batched
    /// write; node inserts and vector deltas are written individually.
    ///
    /// NOTE(review): a storage error mid-batch drops the already-popped
    /// events — callers treat any `Storage` error as fatal for the batch.
    fn flush_once(&mut self, handle: &mut storage_api::StorageHandle) -> Result<usize> {
        let take_count = self.config.max_batch_size.min(self.queue.len());
        if take_count == 0 {
            return Ok(0);
        }
        // `take_count <= queue.len()` is guaranteed above, so drain is safe.
        let batch: Vec<IngestEvent> = self.queue.drain(..take_count).collect();
        let mut edge_deltas = Vec::new();
        for event in batch {
            match event {
                IngestEvent::InsertNode {
                    node_id,
                    version,
                    adjacency,
                    bitmap_terms,
                    embedding_pending,
                } => {
                    storage_api::put_full_node(handle, node_id, version, &adjacency)
                        .map_err(storage_err)?;
                    for (index_name, value_key) in bitmap_terms {
                        storage_api::bitmap_add_posting(handle, &index_name, &value_key, node_id)
                            .map_err(storage_err)?;
                    }
                    if embedding_pending {
                        storage_api::put_embedding_pending(handle, node_id)
                            .map_err(storage_err)?;
                    }
                }
                IngestEvent::AddEdgeDelta {
                    node_id,
                    version,
                    payload,
                } => edge_deltas.push(storage_api::encode_delta(node_id, version, &payload)),
                IngestEvent::UpdateVectorDelta {
                    node_id,
                    version,
                    payload,
                } => {
                    let delta = storage_api::encode_delta(node_id, version, &payload);
                    storage_api::put_vector_delta(handle, &delta).map_err(storage_err)?;
                }
            }
        }
        if !edge_deltas.is_empty() {
            storage_api::put_edge_deltas_batch(handle, &edge_deltas).map_err(storage_err)?;
        }
        Ok(take_count)
    }
}
pub fn ingest_event(
pipeline: &mut IngestPipeline,
handle: &mut storage_api::StorageHandle,
event: IngestEvent,
) -> Result<IngestAck> {
pipeline.ingest_event(handle, event)
}
pub fn ingest_batch(
pipeline: &mut IngestPipeline,
handle: &mut storage_api::StorageHandle,
events: &[IngestEvent],
) -> Result<IngestAck> {
pipeline.ingest_batch(handle, events)
}
pub fn ingest_edge_delta_range(
pipeline: &mut IngestPipeline,
handle: &mut storage_api::StorageHandle,
start_node_id: u64,
count: u64,
version: u64,
payload_prefix: &str,
) -> Result<IngestAck> {
pipeline.ingest_edge_delta_range(handle, start_node_id, count, version, payload_prefix)
}
/// Maps a storage-layer error into [`IngestError::Storage`], preserving the
/// error's debug representation as the message.
fn storage_err(err: storage_api::StorageError) -> IngestError {
    let detail = format!("{:?}", err);
    IngestError::Storage(detail)
}
// Unit tests live in the sibling `tests` module file, compiled only for
// test builds.
#[cfg(test)]
mod tests;