iridium-db 0.4.0

A high-performance vector-graph hybrid storage and indexing engine
use super::*;

/// Round-trips one blob through a store opened with `BlobBackend::Local`.
///
/// The blob `chunk-1` must be reported absent before it is written, present
/// and readable with the written bytes afterwards, and absent again (both via
/// `has_blob` and `get_blob`) once it has been deleted.
#[test]
fn local_blob_backend_put_get_has_delete_round_trip() {
    let root = temp_dir("storage_blob_local_roundtrip");
    let config = StorageConfig {
        buffer_pool_pages: 8,
        wal_dir: root.join("wal"),
        wal_segment_max_bytes: 1 << 20,
        manifest_path: root.join("ir.manifest"),
        sstable_dir: root.join("sst"),
    };
    let mut store = open_store_with_blob_backend(config, BlobBackend::Local)
        .expect("open store");
    assert_eq!(store.blob_backend, BlobBackend::Local);

    let blob_id = "chunk-1";
    let payload = b"hello";

    // Nothing has been written under this id yet.
    let present_before_put = has_blob(&store, blob_id).expect("has before put");
    assert!(!present_before_put);

    // After the put, the blob is visible and its bytes come back unchanged.
    put_blob(&mut store, blob_id, payload).expect("put blob");
    let present_after_put = has_blob(&store, blob_id).expect("has after put");
    assert!(present_after_put);
    let fetched = get_blob(&store, blob_id).expect("get blob");
    assert_eq!(fetched, Some(payload.to_vec()));

    // After the delete, both lookups report the blob as gone.
    delete_blob(&mut store, blob_id).expect("delete blob");
    let present_after_delete = has_blob(&store, blob_id).expect("has after delete");
    assert!(!present_after_delete);
    let fetched_after_delete = get_blob(&store, blob_id).expect("get after delete");
    assert!(fetched_after_delete.is_none());
}

/// Checks the Rhodium backend's behaviour when the `rhodium-backend` feature
/// is disabled (this test is compiled only in that configuration).
///
/// Opening the store with `BlobBackend::Rhodium` must succeed, but a
/// `put_blob` must fail with `StorageError::InvalidInput` whose message
/// contains `"not wired yet"`.
#[test]
#[cfg(not(feature = "rhodium-backend"))]
fn rhodium_blob_backend_requires_feature_flag() {
    let root = temp_dir("storage_blob_rhodium_stub");
    let config = StorageConfig {
        buffer_pool_pages: 8,
        wal_dir: root.join("wal"),
        wal_segment_max_bytes: 1 << 20,
        manifest_path: root.join("ir.manifest"),
        sstable_dir: root.join("sst"),
    };
    let mut store = open_store_with_blob_backend(config, BlobBackend::Rhodium)
        .expect("open store");
    assert_eq!(store.blob_backend, BlobBackend::Rhodium);

    // The write is expected to be rejected; any other error variant is a test failure.
    let failure = put_blob(&mut store, "chunk-2", b"hello").expect_err("rhodium put should fail");
    match failure {
        StorageError::InvalidInput(reason) => assert!(reason.contains("not wired yet")),
        unexpected => panic!("expected rhodium stub error, got {:?}", unexpected),
    }
}

/// Round-trips one blob through a store opened with `BlobBackend::Rhodium`.
///
/// Compiled only when the `rhodium-backend` feature is enabled. The blob
/// `chunk-rh-1` must be absent (per both `has_blob` and `get_blob`) before the
/// put, present with the written bytes afterwards, and absent again once
/// deleted.
#[test]
#[cfg(feature = "rhodium-backend")]
fn rhodium_blob_backend_put_get_has_delete_round_trip() {
    let root = temp_dir("storage_blob_rhodium_roundtrip");
    let config = StorageConfig {
        buffer_pool_pages: 8,
        wal_dir: root.join("wal"),
        wal_segment_max_bytes: 1 << 20,
        manifest_path: root.join("ir.manifest"),
        sstable_dir: root.join("sst"),
    };
    let mut store = open_store_with_blob_backend(config, BlobBackend::Rhodium)
        .expect("open store");
    assert_eq!(store.blob_backend, BlobBackend::Rhodium);

    let blob_id = "chunk-rh-1";
    let payload = b"hello-rh";

    // Before the put: neither lookup finds the blob.
    let present_before_put = has_blob(&store, blob_id).expect("has before put");
    assert!(!present_before_put);
    let fetched_before_put = get_blob(&store, blob_id).expect("get before put");
    assert!(fetched_before_put.is_none());

    // After the put: the blob is visible and its bytes come back unchanged.
    put_blob(&mut store, blob_id, payload).expect("put blob");
    let present_after_put = has_blob(&store, blob_id).expect("has after put");
    assert!(present_after_put);
    let fetched_after_put = get_blob(&store, blob_id).expect("get after put");
    assert_eq!(fetched_after_put, Some(payload.to_vec()));

    // After the delete: both lookups report the blob as gone.
    delete_blob(&mut store, blob_id).expect("delete blob");
    let present_after_delete = has_blob(&store, blob_id).expect("has after delete");
    assert!(!present_after_delete);
    let fetched_after_delete = get_blob(&store, blob_id).expect("get after delete");
    assert!(fetched_after_delete.is_none());
}

/// Verifies that a local-backend `put_blob` rejects the blob id `"a/b"`.
///
/// The call must fail with `StorageError::InvalidInput` and the message must
/// mention `"path separators"`.
#[test]
fn local_blob_backend_rejects_invalid_blob_ids() {
    let root = temp_dir("storage_blob_invalid_id");
    let config = StorageConfig {
        buffer_pool_pages: 8,
        wal_dir: root.join("wal"),
        wal_segment_max_bytes: 1 << 20,
        manifest_path: root.join("ir.manifest"),
        sstable_dir: root.join("sst"),
    };
    let mut store = open_store_with_blob_backend(config, BlobBackend::Local)
        .expect("open store");

    // Any error variant other than InvalidInput is a test failure.
    let rejection = put_blob(&mut store, "a/b", b"x").expect_err("invalid blob id should fail");
    match rejection {
        StorageError::InvalidInput(reason) => assert!(reason.contains("path separators")),
        unexpected => panic!("expected invalid input for blob id, got {:?}", unexpected),
    }
}

/// Exercises optioned puts plus the batch and prefix helpers on a store opened
/// with `BlobBackend::Local`.
///
/// Covered, in order:
/// - an idempotent put of a new id reports `inserted` and not `idempotent_noop`;
/// - repeating the same idempotent put reports `idempotent_noop`;
/// - a `deny_if_exists` put over the existing id fails with
///   `StorageError::InvalidInput` mentioning `"already exists"`;
/// - `has_blobs` returns one flag per queried id, in query order;
/// - `list_blob_prefix(.., "ns-", "a", 10)` returns exactly `ns-a1` and `ns-a2`;
/// - `delete_blob_prefix(.., "ns-", "a", 1)` reports one deletion and `truncated`;
/// - `delete_blobs` over one existing and one missing id reports `1`.
#[test]
fn local_blob_backend_put_options_and_batch_prefix_ops() {
    let root = temp_dir("storage_blob_local_options_batch_prefix");
    let config = StorageConfig {
        buffer_pool_pages: 8,
        wal_dir: root.join("wal"),
        wal_segment_max_bytes: 1 << 20,
        manifest_path: root.join("ir.manifest"),
        sstable_dir: root.join("sst"),
    };
    let mut store = open_store_with_blob_backend(config, BlobBackend::Local)
        .expect("open store");

    // The options are passed by value, so build a fresh value for each put.
    let idempotent_opts = || BlobPutOptions {
        idempotent: true,
        ..BlobPutOptions::default()
    };

    let initial_put = put_blob_with_options(&mut store, "ns-a1", b"v1", idempotent_opts())
        .expect("first put");
    assert!(initial_put.inserted);
    assert!(!initial_put.idempotent_noop);

    let repeated_put = put_blob_with_options(&mut store, "ns-a1", b"v1", idempotent_opts())
        .expect("second put idempotent noop");
    assert!(repeated_put.idempotent_noop);

    // Writing different bytes to the existing id with deny_if_exists must be rejected.
    let deny_opts = BlobPutOptions {
        deny_if_exists: true,
        ..BlobPutOptions::default()
    };
    let rejection = put_blob_with_options(&mut store, "ns-a1", b"new", deny_opts)
        .expect_err("deny-if-exists should reject overwrite");
    match rejection {
        StorageError::InvalidInput(reason) => assert!(reason.contains("already exists")),
        unexpected => panic!("unexpected error for deny-if-exists: {:?}", unexpected),
    }

    put_blob(&mut store, "ns-a2", b"v2").expect("put a2");
    put_blob(&mut store, "ns-b1", b"v3").expect("put b1");

    // Two stored ids and one that was never written.
    let probe_ids = ["ns-a1", "ns-a2", "missing"].map(String::from);
    let presence = has_blobs(&store, &probe_ids).expect("has_blobs");
    assert_eq!(presence, vec![true, true, false]);

    let listed_ids = list_blob_prefix(&store, "ns-", "a", 10).expect("list prefix");
    let expected_ids = vec![String::from("ns-a1"), String::from("ns-a2")];
    assert_eq!(listed_ids, expected_ids);

    // Two blobs match here, but the call is made with a final argument of 1.
    let prefix_outcome = delete_blob_prefix(&mut store, "ns-", "a", 1).expect("delete prefix");
    assert_eq!(prefix_outcome.deleted, 1);
    assert!(prefix_outcome.truncated);

    // Only `ns-b1` exists, so only one deletion is expected to be counted.
    let batch_ids = ["ns-b1", "missing"].map(String::from);
    let batch_deleted = delete_blobs(&mut store, &batch_ids).expect("delete blobs");
    assert_eq!(batch_deleted, 1);
}

/// Exercises optioned put/get plus the batch and prefix helpers on a store
/// opened with `BlobBackend::Rhodium`.
///
/// Compiled only when the `rhodium-backend` feature is enabled. Unlike the
/// local-backend tests, the ids used here contain `/`. Covered, in order:
/// - an optioned put (idempotent, `Flush` ack, `Disk` durability, 5000 ms
///   timeout) reports `inserted`;
/// - an optioned get (`LocalFirst`, `rehydrate_local`) returns the written bytes;
/// - `has_blobs` returns one flag per queried id, in query order;
/// - `list_blob_prefix(.., "manifest", "/a", 10)` returns the two `manifest/a/*`
///   ids (order is not asserted);
/// - `delete_blob_prefix(.., "manifest", "/a", 1)` reports one deletion and
///   `truncated`;
/// - `delete_blobs` over one existing and one missing id reports `1`.
#[test]
#[cfg(feature = "rhodium-backend")]
fn rhodium_blob_backend_optioned_and_batch_prefix_ops() {
    let root = temp_dir("storage_blob_rhodium_options_batch_prefix");
    let config = StorageConfig {
        buffer_pool_pages: 8,
        wal_dir: root.join("wal"),
        wal_segment_max_bytes: 1 << 20,
        manifest_path: root.join("ir.manifest"),
        sstable_dir: root.join("sst"),
    };
    let mut store = open_store_with_blob_backend(config, BlobBackend::Rhodium)
        .expect("open store");

    let first_id = "manifest/a/1";

    let write_opts = BlobPutOptions {
        idempotent: true,
        ack_mode: BlobAckMode::Flush,
        durability_target: BlobDurabilityTarget::Disk,
        timeout_ms: 5_000,
        ..BlobPutOptions::default()
    };
    let put_outcome = put_blob_with_options(&mut store, first_id, b"rh-v1", write_opts)
        .expect("rhodium put");
    assert!(put_outcome.inserted);

    let read_opts = BlobReadOptions {
        tier_policy: BlobReadTierPolicy::LocalFirst,
        rehydrate_local: true,
    };
    // The outer expect unwraps the call result, the inner one the optional blob.
    let fetched = get_blob_with_options(&store, first_id, read_opts)
        .expect("get with options")
        .expect("blob exists");
    assert_eq!(fetched.data, b"rh-v1".to_vec());

    put_blob(&mut store, "manifest/a/2", b"rh-v2").expect("put a2");
    put_blob(&mut store, "manifest/b/1", b"rh-v3").expect("put b1");

    // Two stored ids and one that was never written.
    let probe_ids = ["manifest/a/1", "manifest/a/2", "manifest/missing"].map(String::from);
    let presence = has_blobs(&store, &probe_ids).expect("has_blobs");
    assert_eq!(presence, vec![true, true, false]);

    // Only membership and count are checked; listing order is not asserted.
    let listed_ids = list_blob_prefix(&store, "manifest", "/a", 10).expect("list prefix");
    assert_eq!(listed_ids.len(), 2);
    for expected_id in ["manifest/a/1", "manifest/a/2"] {
        assert!(listed_ids.contains(&String::from(expected_id)));
    }

    // Two blobs match here, but the call is made with a final argument of 1.
    let prefix_outcome =
        delete_blob_prefix(&mut store, "manifest", "/a", 1).expect("delete prefix");
    assert_eq!(prefix_outcome.deleted, 1);
    assert!(prefix_outcome.truncated);

    // Only `manifest/b/1` exists, so only one deletion is expected to be counted.
    let batch_ids = ["manifest/b/1", "manifest/missing"].map(String::from);
    let batch_deleted = delete_blobs(&mut store, &batch_ids).expect("delete batch");
    assert_eq!(batch_deleted, 1);
}