iridium-db 0.4.0

A high-performance vector-graph hybrid storage and indexing engine
use super::*;

/// Opens a store whose WAL directory, manifest file, and SSTable directory all
/// live directly under `base`.
///
/// Uses an 8-page buffer pool and a 1 MiB (`1 << 20` bytes) WAL segment limit.
/// Panics if `open_store` returns an error.
fn open(base: &std::path::Path) -> StorageHandle {
    let config = StorageConfig {
        buffer_pool_pages: 8,
        wal_dir: base.join("wal"),
        wal_segment_max_bytes: 1 << 20,
        manifest_path: base.join("ir.manifest"),
        sstable_dir: base.join("sst"),
    };
    open_store(config).unwrap()
}

// ── 3a: put_vector_delta_batch ─────────────────────────────────────────────

/// After one batch write of N vector deltas, an HNSW search asking for N
/// neighbours in that space returns N results.
#[test]
fn batch_write_all_vectors_searchable() {
    let base = temp_dir("batch_all_searchable");
    let mut handle = open(&base);

    let space_id = 1_u32;
    let n = 10_usize;

    // Create each node and encode its vector delta in the same pass; nothing is
    // written to the vector index until the single batch call below.
    let deltas: Vec<_> = (0..n)
        .map(|i| {
            let node_id = i as u64;
            put_full_node(&mut handle, node_id, 1, &[]).unwrap();
            let coords = [i as f32, 0.0, 0.0];
            let payload =
                encode_vector_payload_f32(space_id, VectorMetric::Cosine, &coords, false);
            encode_delta(node_id, 2, &payload)
        })
        .collect();

    put_vector_delta_batch(&mut handle, &deltas).unwrap();

    // Ask for as many neighbours as were written; every one must come back.
    let hits = hnsw_search_in_space(&handle, space_id, &[9.0, 0.0, 0.0], n);
    assert_eq!(
        hits.len(),
        n,
        "all {n} vectors should be searchable after batch write"
    );
}

/// Vectors written through the batch path are searchable again after a fresh
/// handle on the same directory replays the WAL.
#[test]
fn batch_write_wal_replay_restores_all_vectors() {
    let base = temp_dir("batch_wal_replay");
    let space_id = 2_u32;

    // First session: write five vectors in one batch, then let the handle go
    // out of scope before reopening.
    {
        let mut handle = open(&base);
        let n = 5_usize;
        let deltas: Vec<_> = (0..n)
            .map(|i| {
                let node_id = i as u64;
                put_full_node(&mut handle, node_id, 1, &[]).unwrap();
                let coords = [i as f32, 0.0];
                let payload =
                    encode_vector_payload_f32(space_id, VectorMetric::Cosine, &coords, false);
                encode_delta(node_id, 2, &payload)
            })
            .collect();
        put_vector_delta_batch(&mut handle, &deltas).unwrap();
    }

    // Second session: reopen the same directory and replay the WAL.
    let mut handle = open(&base);
    recover_from_wal(&mut handle).unwrap();

    let hits = hnsw_search_in_space(&handle, space_id, &[4.0, 0.0], 10);
    assert_eq!(
        hits.len(),
        5,
        "all 5 vectors must survive WAL replay after batch write"
    );
}

/// Nodes flagged as embedding-pending are no longer flagged once their vectors
/// have been written via the batch path.
#[test]
fn batch_write_clears_embedding_pending_flags() {
    let base = temp_dir("batch_clears_pending");
    let mut handle = open(&base);

    let space_id = 3_u32;
    let node_ids = 0..3_u64;

    // Precondition: each node exists and is reported as pending.
    for i in node_ids.clone() {
        put_full_node(&mut handle, i, 1, &[]).unwrap();
        put_embedding_pending(&mut handle, i).unwrap();
        assert!(node_is_embedding_pending(&handle, i));
    }

    let deltas: Vec<_> = node_ids
        .clone()
        .map(|node_id| {
            let coords = [node_id as f32, 0.0];
            let payload =
                encode_vector_payload_f32(space_id, VectorMetric::Cosine, &coords, false);
            encode_delta(node_id, 2, &payload)
        })
        .collect();
    put_vector_delta_batch(&mut handle, &deltas).unwrap();

    // Postcondition: the pending flag is gone for every node in the batch.
    for i in node_ids {
        let still_pending = node_is_embedding_pending(&handle, i);
        assert!(
            !still_pending,
            "embedding_pending flag for node {i} must be cleared after batch write"
        );
    }
}

/// Submitting a batch with zero deltas completes without returning an error.
#[test]
fn batch_write_empty_is_noop() {
    let base = temp_dir("batch_empty");
    let mut handle = open(&base);

    let outcome = put_vector_delta_batch(&mut handle, &[]);
    outcome.unwrap();
}

// ── 3b: maintenance suspension ─────────────────────────────────────────────

/// A batch write into a space whose HNSW maintenance is suspended leaves
/// `last_hnsw_rebuild_reason` unset.
#[test]
fn suspended_space_suppresses_scheduler() {
    let base = temp_dir("suspend_suppresses_scheduler");
    let mut handle = open(&base);
    let space_id = 10_u32;

    // The rebuild threshold is 5% of total vectors; 200 nodes is enough to exceed it.
    let n = 200_usize;
    for node_id in 0..n as u64 {
        put_full_node(&mut handle, node_id, 1, &[]).unwrap();
    }

    // Suspend before any vector is written so the whole batch lands while suspended.
    suspend_hnsw_maintenance(&mut handle, space_id);

    let deltas: Vec<_> = (0..n)
        .map(|i| {
            let coords = [i as f32, 0.0];
            let payload =
                encode_vector_payload_f32(space_id, VectorMetric::Cosine, &coords, false);
            encode_delta(i as u64, 2, &payload)
        })
        .collect();
    put_vector_delta_batch(&mut handle, &deltas).unwrap();

    let rebuild_recorded = handle.last_hnsw_rebuild_reason.is_some();
    assert!(
        !rebuild_recorded,
        "scheduler must not fire for a suspended space"
    );
}

/// After `resume_hnsw_maintenance`, a single-vector write that pushes the
/// update ratio over the threshold records a rebuild reason again.
#[test]
fn resume_reenables_scheduler() {
    let base = temp_dir("resume_reenables");
    let mut handle = open(&base);
    let space_id = 11_u32;

    let n = 200_usize;
    for node_id in 0..n as u64 {
        put_full_node(&mut handle, node_id, 1, &[]).unwrap();
    }

    // Phase 1: while suspended, a full batch must not record a rebuild reason.
    suspend_hnsw_maintenance(&mut handle, space_id);
    let deltas: Vec<Vec<u8>> = (0..n)
        .map(|i| {
            let coords = [i as f32, 0.0];
            let payload =
                encode_vector_payload_f32(space_id, VectorMetric::Cosine, &coords, false);
            encode_delta(i as u64, 2, &payload)
        })
        .collect();
    put_vector_delta_batch(&mut handle, &deltas).unwrap();
    assert!(handle.last_hnsw_rebuild_reason.is_none());

    // Phase 2: resume, then write one more vector through the single-insert path.
    resume_hnsw_maintenance(&mut handle, space_id);
    let extra_coords = [999.0, 0.0];
    let extra_payload =
        encode_vector_payload_f32(space_id, VectorMetric::Cosine, &extra_coords, false);
    let extra_delta = encode_delta(9999, 2, &extra_payload);
    put_full_node(&mut handle, 9999, 1, &[]).unwrap();
    put_vector_delta(&mut handle, &extra_delta).unwrap();

    // One update against n existing vectors (1/201 < 5%) may not trip the
    // scheduler by itself. To show the scheduler is unblocked, force the
    // updated-vector counter to 10% of the total and write one more vector,
    // which takes the ratio past the threshold.
    let total_vectors = handle.hnsw_total_vectors as f32;
    handle.hnsw_updated_vectors = (total_vectors * 0.1) as u64;
    let trip_coords = [1000.0, 0.0];
    let trip_payload =
        encode_vector_payload_f32(space_id, VectorMetric::Cosine, &trip_coords, false);
    let trip_delta = encode_delta(10000, 2, &trip_payload);
    put_full_node(&mut handle, 10000, 1, &[]).unwrap();
    put_vector_delta(&mut handle, &trip_delta).unwrap();

    let rebuild_recorded = handle.last_hnsw_rebuild_reason.is_some();
    assert!(
        rebuild_recorded,
        "scheduler must fire after resume when threshold is exceeded"
    );
}

/// Rebuilding a vector space keeps live nodes searchable and drops a node that
/// was tombstoned before the rebuild.
#[test]
fn rebuild_vector_space_produces_correct_graph() {
    let base = temp_dir("rebuild_correct");
    let mut handle = open(&base);
    let space_id = 20_u32;

    // Nodes 1, 2 and 3 each get a vector through the single-insert path.
    for node_id in 1..=3_u64 {
        put_full_node(&mut handle, node_id, 1, &[]).unwrap();
        let coords = [node_id as f32, 0.0];
        let payload = encode_vector_payload_f32(space_id, VectorMetric::Cosine, &coords, false);
        let delta = encode_delta(node_id, 2, &payload);
        put_vector_delta(&mut handle, &delta).unwrap();
    }

    // Node 2 is tombstoned, then the space is rebuilt.
    put_tombstone(&mut handle, 2, 3).unwrap();
    rebuild_vector_space(&mut handle, space_id).unwrap();

    // Collect only the node ids from the search hits.
    let ids: Vec<u64> = hnsw_search_in_space(&handle, space_id, &[1.0, 0.0], 10)
        .into_iter()
        .map(|hit| hit.0)
        .collect();

    assert!(ids.contains(&1), "node 1 must be in rebuilt graph");
    assert!(ids.contains(&3), "node 3 must be in rebuilt graph");
    let tombstoned_present = ids.contains(&2);
    assert!(
        !tombstoned_present,
        "tombstoned node 2 must not appear after rebuild"
    );
}