iridium-db 0.2.0

A high-performance vector-graph hybrid storage and indexing engine
use super::*;
/// Checks that `execute_fanout` combines the rows produced by two shards into
/// a single result whose node ids are exactly 1 through 12, in ascending order.
#[test]
fn execute_fanout_merges_rows_across_shards() {
    let root = temp_dir("runtime_fanout_merge");
    // Both shard stores are opened with identical settings and identical paths.
    let shard_config = || storage_api::StorageConfig {
        buffer_pool_pages: 8,
        wal_dir: root.join("wal"),
        wal_segment_max_bytes: 1 << 20,
        manifest_path: root.join("ir.manifest"),
        sstable_dir: root.join("sst"),
    };
    let shard_request = |core_id| storage_api::ThreadCoreRequest {
        core_id,
        shard_count: 2,
    };

    let lanes = storage_api::ThreadCoreLaneConfig::default();
    let first_request = shard_request(0);
    let second_request = shard_request(1);
    let mut first_store =
        storage_api::open_store_for_request(shard_config(), &first_request, &lanes).unwrap();
    let mut second_store =
        storage_api::open_store_for_request(shard_config(), &second_request, &lanes).unwrap();

    // Nodes owned by the first request go to its store; all others go to the second.
    for node_id in 1..=12 {
        let owner = if storage_api::request_owns_node(&first_request, node_id) {
            &mut first_store
        } else {
            &mut second_store
        };
        storage_api::put_full_node(owner, node_id, 1, &[node_id + 1]).unwrap();
    }
    storage_api::flush(&mut first_store).unwrap();
    storage_api::flush(&mut second_store).unwrap();

    let ast = parse("MATCH (n) RETURN n LIMIT 20").unwrap();
    let typed = validate(&ast, &Catalog).unwrap();
    let plan = explain(&typed).unwrap();
    // The scan range [1, 13) covers every node id written above.
    let params = ExecuteParams {
        scan_start: 1,
        scan_end_exclusive: 13,
        morsel_size: 8,
        parallel_workers: 1,
    };
    let mut shards = vec![
        FanoutShardExecution {
            request: first_request.clone(),
            handle: &mut first_store,
        },
        FanoutShardExecution {
            request: second_request.clone(),
            handle: &mut second_store,
        },
    ];
    let merged = execute_fanout(&plan, &params, &mut shards).unwrap();

    let expected: Vec<u64> = (1..=12).collect();
    let actual: Vec<u64> = merged.rows.iter().map(|row| row.node_id).collect();
    assert_eq!(actual, expected);
}

/// Checks that `execute_fanout` returns the same sequence of node ids for a
/// vector-filtered query no matter in which order the two shards are supplied.
#[test]
fn execute_fanout_merge_is_deterministic_across_shard_order() {
    let root = temp_dir("runtime_fanout_order");
    // Both shard stores are opened with identical settings and identical paths.
    let shard_config = || storage_api::StorageConfig {
        buffer_pool_pages: 8,
        wal_dir: root.join("wal"),
        wal_segment_max_bytes: 1 << 20,
        manifest_path: root.join("ir.manifest"),
        sstable_dir: root.join("sst"),
    };
    let shard_request = |core_id| storage_api::ThreadCoreRequest {
        core_id,
        shard_count: 2,
    };

    let lanes = storage_api::ThreadCoreLaneConfig::default();
    let first_request = shard_request(0);
    let second_request = shard_request(1);
    let mut first_store =
        storage_api::open_store_for_request(shard_config(), &first_request, &lanes).unwrap();
    let mut second_store =
        storage_api::open_store_for_request(shard_config(), &second_request, &lanes).unwrap();

    // Each node gets a full record plus a vector delta, written to the store
    // of the first request when it owns the node and to the second otherwise.
    for node_id in 1..=20 {
        let owner = if storage_api::request_owns_node(&first_request, node_id) {
            &mut first_store
        } else {
            &mut second_store
        };
        storage_api::put_full_node(&mut *owner, node_id, 1, &[node_id + 1]).unwrap();
        let payload = encode_vector_payload(&[(node_id as f32) / 20.0, 1.0]);
        let delta = storage_api::encode_delta(node_id, 2, &payload);
        storage_api::put_vector_delta(&mut *owner, &delta).unwrap();
    }
    storage_api::flush(&mut first_store).unwrap();
    storage_api::flush(&mut second_store).unwrap();

    let ast = parse("MATCH (n) WHERE vector.cosine(n.embedding, $q:1:1) > 0.4 RETURN n LIMIT 8")
        .unwrap();
    let typed = validate(&ast, &Catalog).unwrap();
    let plan = explain(&typed).unwrap();
    // The scan range [1, 21) covers every node id written above.
    let params = ExecuteParams {
        scan_start: 1,
        scan_end_exclusive: 21,
        morsel_size: 8,
        parallel_workers: 1,
    };

    // First run: shards listed as (first, second).
    let forward_ids: Vec<u64> = {
        let mut shards = vec![
            FanoutShardExecution {
                request: first_request.clone(),
                handle: &mut first_store,
            },
            FanoutShardExecution {
                request: second_request.clone(),
                handle: &mut second_store,
            },
        ];
        let merged = execute_fanout(&plan, &params, &mut shards).unwrap();
        merged.rows.into_iter().map(|row| row.node_id).collect()
    };

    // Second run: same shards, listed as (second, first).
    let reversed_ids: Vec<u64> = {
        let mut shards = vec![
            FanoutShardExecution {
                request: second_request,
                handle: &mut second_store,
            },
            FanoutShardExecution {
                request: first_request,
                handle: &mut first_store,
            },
        ];
        let merged = execute_fanout(&plan, &params, &mut shards).unwrap();
        merged.rows.into_iter().map(|row| row.node_id).collect()
    };

    assert_eq!(forward_ids, reversed_ids);
}