use super::*;
#[test]
fn local_blob_backend_put_get_has_delete_round_trip() {
let base = temp_dir("storage_blob_local_roundtrip");
let mut handle = open_store_with_blob_backend(
StorageConfig {
buffer_pool_pages: 8,
wal_dir: base.join("wal"),
wal_segment_max_bytes: 1 << 20,
manifest_path: base.join("ir.manifest"),
sstable_dir: base.join("sst"),
},
BlobBackend::Local,
)
.expect("open store");
assert_eq!(handle.blob_backend, BlobBackend::Local);
assert!(!has_blob(&handle, "chunk-1").expect("has before put"));
put_blob(&mut handle, "chunk-1", b"hello").expect("put blob");
assert!(has_blob(&handle, "chunk-1").expect("has after put"));
let got = get_blob(&handle, "chunk-1").expect("get blob");
assert_eq!(got, Some(b"hello".to_vec()));
delete_blob(&mut handle, "chunk-1").expect("delete blob");
assert!(!has_blob(&handle, "chunk-1").expect("has after delete"));
assert_eq!(
get_blob(&handle, "chunk-1").expect("get after delete"),
None
);
}
#[test]
#[cfg(not(feature = "rhodium-backend"))]
fn rhodium_blob_backend_requires_feature_flag() {
let base = temp_dir("storage_blob_rhodium_stub");
let mut handle = open_store_with_blob_backend(
StorageConfig {
buffer_pool_pages: 8,
wal_dir: base.join("wal"),
wal_segment_max_bytes: 1 << 20,
manifest_path: base.join("ir.manifest"),
sstable_dir: base.join("sst"),
},
BlobBackend::Rhodium,
)
.expect("open store");
assert_eq!(handle.blob_backend, BlobBackend::Rhodium);
let err = put_blob(&mut handle, "chunk-2", b"hello").expect_err("rhodium put should fail");
match err {
StorageError::InvalidInput(message) => {
assert!(message.contains("not wired yet"));
}
other => panic!("expected rhodium stub error, got {:?}", other),
}
}
#[test]
#[cfg(feature = "rhodium-backend")]
fn rhodium_blob_backend_put_get_has_delete_round_trip() {
let base = temp_dir("storage_blob_rhodium_roundtrip");
let mut handle = open_store_with_blob_backend(
StorageConfig {
buffer_pool_pages: 8,
wal_dir: base.join("wal"),
wal_segment_max_bytes: 1 << 20,
manifest_path: base.join("ir.manifest"),
sstable_dir: base.join("sst"),
},
BlobBackend::Rhodium,
)
.expect("open store");
assert_eq!(handle.blob_backend, BlobBackend::Rhodium);
assert!(!has_blob(&handle, "chunk-rh-1").expect("has before put"));
assert_eq!(
get_blob(&handle, "chunk-rh-1").expect("get before put"),
None
);
put_blob(&mut handle, "chunk-rh-1", b"hello-rh").expect("put blob");
assert!(has_blob(&handle, "chunk-rh-1").expect("has after put"));
assert_eq!(
get_blob(&handle, "chunk-rh-1").expect("get after put"),
Some(b"hello-rh".to_vec())
);
delete_blob(&mut handle, "chunk-rh-1").expect("delete blob");
assert!(!has_blob(&handle, "chunk-rh-1").expect("has after delete"));
assert_eq!(
get_blob(&handle, "chunk-rh-1").expect("get after delete"),
None
);
}
#[test]
fn local_blob_backend_rejects_invalid_blob_ids() {
let base = temp_dir("storage_blob_invalid_id");
let mut handle = open_store_with_blob_backend(
StorageConfig {
buffer_pool_pages: 8,
wal_dir: base.join("wal"),
wal_segment_max_bytes: 1 << 20,
manifest_path: base.join("ir.manifest"),
sstable_dir: base.join("sst"),
},
BlobBackend::Local,
)
.expect("open store");
let err = put_blob(&mut handle, "a/b", b"x").expect_err("invalid blob id should fail");
match err {
StorageError::InvalidInput(message) => {
assert!(message.contains("path separators"));
}
other => panic!("expected invalid input for blob id, got {:?}", other),
}
}
#[test]
fn local_blob_backend_put_options_and_batch_prefix_ops() {
let base = temp_dir("storage_blob_local_options_batch_prefix");
let mut handle = open_store_with_blob_backend(
StorageConfig {
buffer_pool_pages: 8,
wal_dir: base.join("wal"),
wal_segment_max_bytes: 1 << 20,
manifest_path: base.join("ir.manifest"),
sstable_dir: base.join("sst"),
},
BlobBackend::Local,
)
.expect("open store");
let first = put_blob_with_options(
&mut handle,
"ns-a1",
b"v1",
BlobPutOptions {
idempotent: true,
..BlobPutOptions::default()
},
)
.expect("first put");
assert!(first.inserted);
assert!(!first.idempotent_noop);
let second = put_blob_with_options(
&mut handle,
"ns-a1",
b"v1",
BlobPutOptions {
idempotent: true,
..BlobPutOptions::default()
},
)
.expect("second put idempotent noop");
assert!(second.idempotent_noop);
let deny = put_blob_with_options(
&mut handle,
"ns-a1",
b"new",
BlobPutOptions {
deny_if_exists: true,
..BlobPutOptions::default()
},
)
.expect_err("deny-if-exists should reject overwrite");
match deny {
StorageError::InvalidInput(message) => assert!(message.contains("already exists")),
other => panic!("unexpected error for deny-if-exists: {:?}", other),
}
put_blob(&mut handle, "ns-a2", b"v2").expect("put a2");
put_blob(&mut handle, "ns-b1", b"v3").expect("put b1");
let has = has_blobs(
&handle,
&[
"ns-a1".to_string(),
"ns-a2".to_string(),
"missing".to_string(),
],
)
.expect("has_blobs");
assert_eq!(has, vec![true, true, false]);
let listed = list_blob_prefix(&handle, "ns-", "a", 10).expect("list prefix");
assert_eq!(listed, vec!["ns-a1".to_string(), "ns-a2".to_string()]);
let deleted_prefix = delete_blob_prefix(&mut handle, "ns-", "a", 1).expect("delete prefix");
assert_eq!(deleted_prefix.deleted, 1);
assert!(deleted_prefix.truncated);
let deleted_batch = delete_blobs(&mut handle, &["ns-b1".to_string(), "missing".to_string()])
.expect("delete blobs");
assert_eq!(deleted_batch, 1);
}
#[test]
#[cfg(feature = "rhodium-backend")]
fn rhodium_blob_backend_optioned_and_batch_prefix_ops() {
let base = temp_dir("storage_blob_rhodium_options_batch_prefix");
let mut handle = open_store_with_blob_backend(
StorageConfig {
buffer_pool_pages: 8,
wal_dir: base.join("wal"),
wal_segment_max_bytes: 1 << 20,
manifest_path: base.join("ir.manifest"),
sstable_dir: base.join("sst"),
},
BlobBackend::Rhodium,
)
.expect("open store");
let id = "manifest/a/1";
let put = put_blob_with_options(
&mut handle,
id,
b"rh-v1",
BlobPutOptions {
idempotent: true,
ack_mode: BlobAckMode::Flush,
durability_target: BlobDurabilityTarget::Disk,
timeout_ms: 5_000,
..BlobPutOptions::default()
},
)
.expect("rhodium put");
assert!(put.inserted);
let got = get_blob_with_options(
&handle,
id,
BlobReadOptions {
tier_policy: BlobReadTierPolicy::LocalFirst,
rehydrate_local: true,
},
)
.expect("get with options")
.expect("blob exists");
assert_eq!(got.data, b"rh-v1".to_vec());
put_blob(&mut handle, "manifest/a/2", b"rh-v2").expect("put a2");
put_blob(&mut handle, "manifest/b/1", b"rh-v3").expect("put b1");
let has = has_blobs(
&handle,
&[
"manifest/a/1".to_string(),
"manifest/a/2".to_string(),
"manifest/missing".to_string(),
],
)
.expect("has_blobs");
assert_eq!(has, vec![true, true, false]);
let listed = list_blob_prefix(&handle, "manifest", "/a", 10).expect("list prefix");
assert_eq!(listed.len(), 2);
assert!(listed.contains(&"manifest/a/1".to_string()));
assert!(listed.contains(&"manifest/a/2".to_string()));
let del_prefix = delete_blob_prefix(&mut handle, "manifest", "/a", 1).expect("delete prefix");
assert_eq!(del_prefix.deleted, 1);
assert!(del_prefix.truncated);
let del_batch = delete_blobs(
&mut handle,
&["manifest/b/1".to_string(), "manifest/missing".to_string()],
)
.expect("delete batch");
assert_eq!(del_batch, 1);
}