iridium-db 0.3.0

A high-performance vector-graph hybrid storage and indexing engine
use super::*;
use std::path::Path;
use std::path::PathBuf;

/// Creates a fresh, uniquely named directory under the system temp dir and returns its path.
///
/// The directory name is `{prefix}_{pid}_{unix_nanos}_{seq}`. The per-process sequence number
/// `seq` keeps names distinct even when two calls observe the same clock reading
/// (`SystemTime` resolution is platform-dependent). Without it, `create_dir_all` would silently
/// hand back an already-existing directory, and two callers would share on-disk state.
///
/// # Panics
///
/// Panics if the system clock reads earlier than the UNIX epoch, or if the directory cannot be
/// created.
fn temp_dir(prefix: &str) -> PathBuf {
    static NEXT_SEQ: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);

    // Only uniqueness of the returned value matters; no other memory is synchronized through
    // this counter, so `Relaxed` is sufficient.
    let seq = NEXT_SEQ.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    let nanos = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .expect("system clock is before the UNIX epoch")
        .as_nanos();
    let dir = std::env::temp_dir().join(format!(
        "{}_{}_{}_{}",
        prefix,
        std::process::id(),
        nanos,
        seq
    ));
    std::fs::create_dir_all(&dir).expect("create temp dir");
    dir
}

/// Opens a storage handle whose WAL directory, manifest, and SSTable directory all live
/// under `base`.
///
/// Configured with an 8-page buffer pool and a 1 MiB (`1 << 20` bytes) WAL segment cap.
/// Panics if the store cannot be opened.
fn open_handle(base: &Path) -> storage_api::StorageHandle {
    let config = storage_api::StorageConfig {
        manifest_path: base.join("ir.manifest"),
        sstable_dir: base.join("sst"),
        wal_dir: base.join("wal"),
        wal_segment_max_bytes: 1 << 20,
        buffer_pool_pages: 8,
    };
    storage_api::open_store(config).unwrap()
}

/// With `max_batch_size: 1`, a single ingested edge delta is flushed right away, leaving
/// the queue empty and one delta on the logical node.
#[test]
fn ingest_single_event_flushes_at_batch_size() {
    let base = temp_dir("ingest_single_flush");
    let mut handle = open_handle(&base);
    let config = IngestConfig {
        max_batch_size: 1,
        max_queue_depth: 8,
    };
    let mut pipeline = IngestPipeline::new(config).unwrap();

    let edge = IngestEvent::AddEdgeDelta {
        node_id: 10,
        version: 1,
        payload: b"edge".to_vec(),
    };
    let ack = ingest_event(&mut pipeline, &mut handle, edge).unwrap();
    assert_eq!(ack.accepted, 1);
    assert_eq!(ack.flushed, 1);
    assert_eq!(pipeline.queue_depth(), 0);

    let node = storage_api::get_logical_node(&mut handle, 10).unwrap();
    assert_eq!(node.deltas.len(), 1);
}

/// A queue depth of 2 admits two of three events and rejects one. With a batch size of 10,
/// nothing is flushed during the batch.
#[test]
fn ingest_batch_applies_backpressure() {
    let base = temp_dir("ingest_backpressure");
    let mut handle = open_handle(&base);
    let mut pipeline = IngestPipeline::new(IngestConfig {
        max_batch_size: 10,
        max_queue_depth: 2,
    })
    .unwrap();

    let events: Vec<IngestEvent> = [(1_u64, b"a"), (2, b"b"), (3, b"c")]
        .iter()
        .map(|&(node_id, payload)| IngestEvent::AddEdgeDelta {
            node_id,
            version: 1,
            payload: payload.to_vec(),
        })
        .collect();

    let ack = ingest_batch(&mut pipeline, &mut handle, &events).unwrap();
    assert_eq!(ack.accepted, 2);
    assert_eq!(ack.rejected, 1);
    assert_eq!(ack.flushed, 0);
}

/// Queues an insert, an edge delta, and a vector delta for node 5 without reaching the
/// batch threshold. Then checks that `flush_all` drains all three and that the node has a
/// full record plus at least two deltas.
#[test]
fn flush_all_applies_mixed_events() {
    let base = temp_dir("ingest_flush_all");
    let mut handle = open_handle(&base);
    let mut pipeline = IngestPipeline::new(IngestConfig {
        max_batch_size: 16,
        max_queue_depth: 16,
    })
    .unwrap();

    let mut events = Vec::with_capacity(3);
    events.push(IngestEvent::InsertNode {
        node_id: 5,
        version: 1,
        adjacency: vec![6, 7],
        bitmap_terms: Vec::new(),
        embedding_pending: false,
    });
    events.push(IngestEvent::AddEdgeDelta {
        node_id: 5,
        version: 2,
        payload: b"delta".to_vec(),
    });
    events.push(IngestEvent::UpdateVectorDelta {
        node_id: 5,
        version: 3,
        payload: storage_api::encode_vector_payload_f32(
            1,
            storage_api::VectorMetric::Cosine,
            &[1.0, 0.0],
            false,
        ),
    });

    let queued = ingest_batch(&mut pipeline, &mut handle, &events).unwrap();
    assert_eq!(queued.accepted, 3);
    assert_eq!(queued.flushed, 0);

    assert_eq!(pipeline.flush_all(&mut handle).unwrap(), 3);

    let node = storage_api::get_logical_node(&mut handle, 5).unwrap();
    assert!(node.full.is_some());
    assert!(node.deltas.len() >= 2);
}

/// Bitmap terms attached to `InsertNode` events appear in the corresponding bitmap
/// indexes' postings after the pipeline and the store are flushed.
#[test]
fn flush_all_applies_bitmap_terms_for_insert_nodes() {
    let base = temp_dir("ingest_bitmap_terms");
    let mut handle = open_handle(&base);
    for &(index_name, field) in &[("idx_country", "n.country"), ("idx_risk", "n.risk")] {
        storage_api::create_bitmap_index(&mut handle, index_name, field).unwrap();
    }

    let mut pipeline = IngestPipeline::new(IngestConfig {
        max_batch_size: 16,
        max_queue_depth: 16,
    })
    .unwrap();

    // Builds an owned (index name, term) pair, the element type of `bitmap_terms`.
    let term = |index: &str, value: &str| (index.to_string(), value.to_string());
    let events = vec![
        IngestEvent::InsertNode {
            node_id: 8,
            version: 1,
            adjacency: vec![9],
            bitmap_terms: vec![term("idx_country", "US")],
            embedding_pending: false,
        },
        IngestEvent::InsertNode {
            node_id: 9,
            version: 1,
            adjacency: vec![8],
            bitmap_terms: vec![term("idx_country", "US"), term("idx_risk", "high")],
            embedding_pending: false,
        },
    ];
    let ack = ingest_batch(&mut pipeline, &mut handle, &events).unwrap();
    assert_eq!(ack.accepted, 2);

    assert_eq!(pipeline.flush_all(&mut handle).unwrap(), 2);
    storage_api::flush(&mut handle).unwrap();

    let us_postings = storage_api::bitmap_postings(&handle, "idx_country", "US");
    assert_eq!(us_postings, vec![8, 9]);
    let high_risk_postings = storage_api::bitmap_postings(&handle, "idx_risk", "high");
    assert_eq!(high_risk_postings, vec![9]);
}

/// An `InsertNode` whose bitmap term has an empty index name is rejected with
/// `IngestError::InvalidInput` carrying a "bitmap terms require" message.
#[test]
fn ingest_insert_node_rejects_empty_bitmap_terms() {
    let base = temp_dir("ingest_bitmap_invalid");
    let mut handle = open_handle(&base);
    let mut pipeline = IngestPipeline::new(IngestConfig {
        max_batch_size: 1,
        max_queue_depth: 16,
    })
    .unwrap();

    let bad_event = IngestEvent::InsertNode {
        node_id: 1,
        version: 1,
        adjacency: vec![2],
        bitmap_terms: vec![("".to_string(), "US".to_string())],
        embedding_pending: false,
    };
    let err = ingest_event(&mut pipeline, &mut handle, bad_event).unwrap_err();
    if let IngestError::InvalidInput(msg) = &err {
        assert!(msg.contains("bitmap terms require"));
    } else {
        panic!("unexpected error: {:?}", err);
    }
}

/// Ten events against a batch size of 4: eight are flushed during the batch, two remain
/// queued, and `flush_all` drains those two.
#[test]
fn ingest_batch_flushes_multiple_threshold_batches() {
    let base = temp_dir("ingest_multi_threshold_flush");
    let mut handle = open_handle(&base);
    let mut pipeline = IngestPipeline::new(IngestConfig {
        max_batch_size: 4,
        max_queue_depth: 32,
    })
    .unwrap();

    let events: Vec<IngestEvent> = (1..=10_u64)
        .map(|node_id| IngestEvent::AddEdgeDelta {
            node_id,
            version: 1,
            payload: format!("e{}", node_id).into_bytes(),
        })
        .collect();

    let ack = ingest_batch(&mut pipeline, &mut handle, &events).unwrap();
    assert_eq!(ack.accepted, 10);
    assert_eq!(ack.rejected, 0);
    assert_eq!(ack.flushed, 8);
    assert_eq!(pipeline.queue_depth(), 2);

    assert_eq!(pipeline.flush_all(&mut handle).unwrap(), 2);
}

/// `ingest_edge_delta_range` accepts and flushes ten edge deltas, leaving the queue empty.
/// Each of nodes 100..110 then holds exactly one delta.
#[test]
fn ingest_edge_delta_range_writes_all_edges() {
    let base = temp_dir("ingest_edge_range");
    let mut handle = open_handle(&base);
    let mut pipeline = IngestPipeline::new(IngestConfig {
        max_batch_size: 4,
        max_queue_depth: 16,
    })
    .unwrap();

    // NOTE(review): arguments are presumably (first node id, count, version, payload prefix);
    // confirm against `ingest_edge_delta_range`'s signature.
    let ack = ingest_edge_delta_range(&mut pipeline, &mut handle, 100, 10, 1, "p").unwrap();
    assert_eq!(ack.accepted, 10);
    assert_eq!(ack.rejected, 0);
    assert_eq!(ack.flushed, 10);
    assert_eq!(ack.queue_depth, 0);

    for id in 100..110 {
        let node = storage_api::get_logical_node(&mut handle, id).unwrap();
        assert_eq!(node.deltas.len(), 1);
    }
}

/// Inserting a node with `embedding_pending: true` makes it readable via
/// `get_logical_node` and sets its embedding-pending flag.
#[test]
fn ingest_insert_node_with_embedding_pending_sets_flag() {
    let base = temp_dir("ingest_embedding_pending");
    let mut handle = open_handle(&base);
    let mut pipeline = IngestPipeline::new(IngestConfig {
        max_batch_size: 1,
        max_queue_depth: 8,
    })
    .unwrap();

    let pending_insert = IngestEvent::InsertNode {
        node_id: 42,
        version: 1,
        adjacency: vec![43],
        bitmap_terms: Vec::new(),
        embedding_pending: true,
    };
    let ack = ingest_event(&mut pipeline, &mut handle, pending_insert).unwrap();
    assert_eq!(ack.accepted, 1);
    assert_eq!(ack.flushed, 1);

    // The node is already graph-queryable...
    let node = storage_api::get_logical_node(&mut handle, 42).unwrap();
    assert!(node.full.is_some());
    // ...while still flagged as awaiting its embedding.
    assert!(storage_api::node_is_embedding_pending(&handle, 42));
}

/// A node inserted with `embedding_pending: true` is flagged as pending. A subsequent
/// vector delta for that node clears the flag.
#[test]
fn ingest_update_vector_delta_clears_embedding_pending() {
    let base = temp_dir("ingest_pending_cleared");
    let mut handle = open_handle(&base);
    let mut pipeline = IngestPipeline::new(IngestConfig {
        max_batch_size: 1,
        max_queue_depth: 16,
    })
    .unwrap();

    let insert = IngestEvent::InsertNode {
        node_id: 77,
        version: 1,
        adjacency: vec![],
        bitmap_terms: Vec::new(),
        embedding_pending: true,
    };
    ingest_event(&mut pipeline, &mut handle, insert).unwrap();
    assert!(storage_api::node_is_embedding_pending(&handle, 77));

    let vector_update = IngestEvent::UpdateVectorDelta {
        node_id: 77,
        version: 2,
        payload: storage_api::encode_vector_payload_f32(
            1,
            storage_api::VectorMetric::Cosine,
            &[1.0, 0.0],
            false,
        ),
    };
    ingest_event(&mut pipeline, &mut handle, vector_update).unwrap();

    assert!(!storage_api::node_is_embedding_pending(&handle, 77));
}