use super::*;
use std::path::Path;
use std::path::PathBuf;
/// Creates a fresh scratch directory under the system temp dir and returns its path.
///
/// The directory name is `<prefix>_<pid>_<nanos since UNIX epoch>_<sequence>`.
/// The trailing sequence number comes from a process-wide counter, so two calls
/// with the same prefix never resolve to the same directory even when the system
/// clock is too coarse to tell them apart. Without it, `create_dir_all` would
/// succeed silently on the already-existing directory and the callers would
/// share on-disk state.
///
/// # Panics
///
/// Panics if the system clock reports a time before the UNIX epoch or if the
/// directory cannot be created.
fn temp_dir(prefix: &str) -> PathBuf {
    // Process-wide counter; only uniqueness matters, so relaxed ordering is enough.
    static NEXT_SEQ: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0);

    let nanos = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .expect("system clock is before the UNIX epoch")
        .as_nanos();
    let seq = NEXT_SEQ.fetch_add(1, std::sync::atomic::Ordering::Relaxed);

    let dir = std::env::temp_dir().join(format!(
        "{}_{}_{}_{}",
        prefix,
        std::process::id(),
        nanos,
        seq
    ));
    std::fs::create_dir_all(&dir).expect("create temp dir");
    dir
}
/// Opens a storage handle whose WAL directory, manifest file and SSTable
/// directory all live under `base`.
///
/// The store is configured with 8 buffer-pool pages and a WAL segment limit of
/// `1 << 20` bytes. Panics if `storage_api::open_store` returns an error.
fn open_handle(base: &Path) -> storage_api::StorageHandle {
    let config = storage_api::StorageConfig {
        buffer_pool_pages: 8,
        wal_dir: base.join("wal"),
        wal_segment_max_bytes: 1 << 20,
        manifest_path: base.join("ir.manifest"),
        sstable_dir: base.join("sst"),
    };
    storage_api::open_store(config).unwrap()
}
/// With `max_batch_size` set to 1, a single ingested edge delta is expected to
/// be accepted and flushed by the same call, leaving the queue empty and exactly
/// one delta visible on the logical node.
#[test]
fn ingest_single_event_flushes_at_batch_size() {
    let root = temp_dir("ingest_single_flush");
    let mut store = open_handle(&root);
    let config = IngestConfig {
        max_queue_depth: 8,
        max_batch_size: 1,
    };
    let mut pipeline = IngestPipeline::new(config).unwrap();

    let event = IngestEvent::AddEdgeDelta {
        node_id: 10,
        version: 1,
        payload: b"edge".to_vec(),
    };
    let ack = ingest_event(&mut pipeline, &mut store, event).unwrap();

    assert_eq!(ack.accepted, 1);
    assert_eq!(ack.flushed, 1);
    assert_eq!(pipeline.queue_depth(), 0);

    // The flushed delta must be readable back through the storage API.
    let node = storage_api::get_logical_node(&mut store, 10).unwrap();
    assert_eq!(node.deltas.len(), 1);
}
/// Submits three edge deltas to a pipeline whose queue holds at most two and
/// whose batch size (10) is never reached: two events are expected to be
/// accepted, one rejected, and nothing flushed.
#[test]
fn ingest_batch_applies_backpressure() {
    let root = temp_dir("ingest_backpressure");
    let mut store = open_handle(&root);
    let config = IngestConfig {
        max_queue_depth: 2,
        max_batch_size: 10,
    };
    let mut pipeline = IngestPipeline::new(config).unwrap();

    // One edge delta per (node id, payload) pair, all at version 1.
    let events: Vec<_> = [(1, "a"), (2, "b"), (3, "c")]
        .iter()
        .map(|&(node_id, tag)| IngestEvent::AddEdgeDelta {
            node_id,
            version: 1,
            payload: tag.as_bytes().to_vec(),
        })
        .collect();

    let ack = ingest_batch(&mut pipeline, &mut store, &events).unwrap();

    assert_eq!(ack.accepted, 2);
    assert_eq!(ack.rejected, 1);
    assert_eq!(ack.flushed, 0);
}
/// Queues an insert, an edge delta and a vector delta for the same node below
/// the batch threshold, then checks that `flush_all` writes all three: the node
/// must have a full record and at least two deltas afterwards.
#[test]
fn flush_all_applies_mixed_events() {
    let root = temp_dir("ingest_flush_all");
    let mut store = open_handle(&root);
    let config = IngestConfig {
        max_queue_depth: 16,
        max_batch_size: 16,
    };
    let mut pipeline = IngestPipeline::new(config).unwrap();

    let insert = IngestEvent::InsertNode {
        node_id: 5,
        version: 1,
        adjacency: vec![6, 7],
        bitmap_terms: Vec::new(),
        embedding_pending: false,
    };
    let edge_delta = IngestEvent::AddEdgeDelta {
        node_id: 5,
        version: 2,
        payload: b"delta".to_vec(),
    };
    let vector_delta = IngestEvent::UpdateVectorDelta {
        node_id: 5,
        version: 3,
        payload: storage_api::encode_vector_payload_f32(
            1,
            storage_api::VectorMetric::Cosine,
            &[1.0, 0.0],
            false,
        ),
    };
    let events = vec![insert, edge_delta, vector_delta];

    // Batch size is 16, so ingesting three events must not trigger a flush.
    let ack = ingest_batch(&mut pipeline, &mut store, &events).unwrap();
    assert_eq!(ack.accepted, 3);
    assert_eq!(ack.flushed, 0);

    let flushed = pipeline.flush_all(&mut store).unwrap();
    assert_eq!(flushed, 3);

    let node = storage_api::get_logical_node(&mut store, 5).unwrap();
    assert!(node.full.is_some());
    assert!(node.deltas.len() >= 2);
}
/// Inserts two nodes carrying bitmap terms for two pre-created indexes and
/// checks, after flushing the pipeline and the store, that the postings for each
/// (index, value) pair list the expected node ids.
#[test]
fn flush_all_applies_bitmap_terms_for_insert_nodes() {
    let root = temp_dir("ingest_bitmap_terms");
    let mut store = open_handle(&root);
    storage_api::create_bitmap_index(&mut store, "idx_country", "n.country").unwrap();
    storage_api::create_bitmap_index(&mut store, "idx_risk", "n.risk").unwrap();

    let config = IngestConfig {
        max_queue_depth: 16,
        max_batch_size: 16,
    };
    let mut pipeline = IngestPipeline::new(config).unwrap();

    // Builds an owned (index name, value) pair.
    let term = |index: &str, value: &str| (index.to_string(), value.to_string());

    let first = IngestEvent::InsertNode {
        node_id: 8,
        version: 1,
        adjacency: vec![9],
        bitmap_terms: vec![term("idx_country", "US")],
        embedding_pending: false,
    };
    let second = IngestEvent::InsertNode {
        node_id: 9,
        version: 1,
        adjacency: vec![8],
        bitmap_terms: vec![term("idx_country", "US"), term("idx_risk", "high")],
        embedding_pending: false,
    };
    let events = vec![first, second];

    let ack = ingest_batch(&mut pipeline, &mut store, &events).unwrap();
    assert_eq!(ack.accepted, 2);

    let flushed = pipeline.flush_all(&mut store).unwrap();
    assert_eq!(flushed, 2);
    storage_api::flush(&mut store).unwrap();

    let country_postings = storage_api::bitmap_postings(&store, "idx_country", "US");
    assert_eq!(country_postings, vec![8, 9]);

    let risk_postings = storage_api::bitmap_postings(&store, "idx_risk", "high");
    assert_eq!(risk_postings, vec![9]);
}
/// Ingesting an `InsertNode` whose bitmap term has an empty index name must fail
/// with `IngestError::InvalidInput` and a message containing
/// "bitmap terms require".
#[test]
fn ingest_insert_node_rejects_empty_bitmap_terms() {
    let root = temp_dir("ingest_bitmap_invalid");
    let mut store = open_handle(&root);
    let config = IngestConfig {
        max_queue_depth: 16,
        max_batch_size: 1,
    };
    let mut pipeline = IngestPipeline::new(config).unwrap();

    let bad_event = IngestEvent::InsertNode {
        node_id: 1,
        version: 1,
        adjacency: vec![2],
        // First tuple element (the index name) is deliberately empty.
        bitmap_terms: vec![("".to_string(), "US".to_string())],
        embedding_pending: false,
    };
    let err = ingest_event(&mut pipeline, &mut store, bad_event).unwrap_err();

    if let IngestError::InvalidInput(msg) = &err {
        assert!(msg.contains("bitmap terms require"));
    } else {
        panic!("unexpected error: {:?}", err);
    }
}
/// Ingests ten edge deltas with a batch size of four: the test expects two full
/// batches (eight events) to be flushed during ingest, two events to stay
/// queued, and `flush_all` to write the remaining two.
#[test]
fn ingest_batch_flushes_multiple_threshold_batches() {
    let root = temp_dir("ingest_multi_threshold_flush");
    let mut store = open_handle(&root);
    let config = IngestConfig {
        max_queue_depth: 32,
        max_batch_size: 4,
    };
    let mut pipeline = IngestPipeline::new(config).unwrap();

    // One edge delta per node id 1..=10, payload "e<node_id>".
    let events: Vec<_> = (1..=10_u64)
        .map(|node_id| IngestEvent::AddEdgeDelta {
            node_id,
            version: 1,
            payload: format!("e{}", node_id).into_bytes(),
        })
        .collect();

    let ack = ingest_batch(&mut pipeline, &mut store, &events).unwrap();
    assert_eq!(ack.accepted, 10);
    assert_eq!(ack.rejected, 0);
    assert_eq!(ack.flushed, 8);
    assert_eq!(pipeline.queue_depth(), 2);

    let remainder = pipeline.flush_all(&mut store).unwrap();
    assert_eq!(remainder, 2);
}
/// Calls `ingest_edge_delta_range` and expects ten events to be accepted and
/// flushed with an empty queue afterwards, and every node id in `100..110` to
/// end up with exactly one delta.
#[test]
fn ingest_edge_delta_range_writes_all_edges() {
    let root = temp_dir("ingest_edge_range");
    let mut store = open_handle(&root);
    let config = IngestConfig {
        max_queue_depth: 16,
        max_batch_size: 4,
    };
    let mut pipeline = IngestPipeline::new(config).unwrap();

    // NOTE(review): arguments look like (start id, count, version, payload prefix) — confirm
    // against the definition of `ingest_edge_delta_range`.
    let ack = ingest_edge_delta_range(&mut pipeline, &mut store, 100, 10, 1, "p").unwrap();

    assert_eq!(ack.accepted, 10);
    assert_eq!(ack.rejected, 0);
    assert_eq!(ack.flushed, 10);
    assert_eq!(ack.queue_depth, 0);

    for id in 100..110 {
        let node = storage_api::get_logical_node(&mut store, id).unwrap();
        assert_eq!(node.deltas.len(), 1);
    }
}
/// Inserting a node with `embedding_pending: true` must store a full record for
/// it and make `storage_api::node_is_embedding_pending` report `true`.
#[test]
fn ingest_insert_node_with_embedding_pending_sets_flag() {
    let root = temp_dir("ingest_embedding_pending");
    let mut store = open_handle(&root);
    let config = IngestConfig {
        max_queue_depth: 8,
        max_batch_size: 1,
    };
    let mut pipeline = IngestPipeline::new(config).unwrap();

    let insert = IngestEvent::InsertNode {
        node_id: 42,
        version: 1,
        adjacency: vec![43],
        bitmap_terms: Vec::new(),
        embedding_pending: true,
    };
    let ack = ingest_event(&mut pipeline, &mut store, insert).unwrap();

    // Batch size is 1, so the insert is expected to be flushed by the same call.
    assert_eq!(ack.accepted, 1);
    assert_eq!(ack.flushed, 1);

    let node = storage_api::get_logical_node(&mut store, 42).unwrap();
    assert!(node.full.is_some());
    assert!(storage_api::node_is_embedding_pending(&store, 42));
}
/// A node inserted with `embedding_pending: true` must report as pending until
/// an `UpdateVectorDelta` for the same node is ingested, after which the flag
/// must be cleared.
#[test]
fn ingest_update_vector_delta_clears_embedding_pending() {
    let root = temp_dir("ingest_pending_cleared");
    let mut store = open_handle(&root);
    let config = IngestConfig {
        max_queue_depth: 16,
        max_batch_size: 1,
    };
    let mut pipeline = IngestPipeline::new(config).unwrap();

    // Step 1: insert the node with its embedding still outstanding.
    let insert = IngestEvent::InsertNode {
        node_id: 77,
        version: 1,
        adjacency: vec![],
        bitmap_terms: Vec::new(),
        embedding_pending: true,
    };
    ingest_event(&mut pipeline, &mut store, insert).unwrap();
    assert!(storage_api::node_is_embedding_pending(&store, 77));

    // Step 2: deliver the vector; the pending flag must be gone afterwards.
    let vector_delta = IngestEvent::UpdateVectorDelta {
        node_id: 77,
        version: 2,
        payload: storage_api::encode_vector_payload_f32(
            1,
            storage_api::VectorMetric::Cosine,
            &[1.0, 0.0],
            false,
        ),
    };
    ingest_event(&mut pipeline, &mut store, vector_delta).unwrap();
    assert!(!storage_api::node_is_embedding_pending(&store, 77));
}