use super::super::types::ChecksumAlgorithm;
use super::*;
use std::sync::Barrier;
use std::thread;
const STRESS_READERS: usize = 8;
const STRESS_READS_PER_THREAD: usize = 5_000;
#[test]
fn load_returns_initial_config() {
let handle = RuntimeConfigHandle::new(RuntimeConfig::default());
let snap = handle.load();
assert_eq!(snap.block_checksum_algo, ChecksumAlgorithm::Xxh3_64);
}
#[test]
#[cfg(feature = "page_ecc")]
fn try_update_accepts_ecc_on_with_secded_and_shard_schemes() {
use super::super::types::EccScheme;
let handle = RuntimeConfigHandle::new(RuntimeConfig::default());
let secded = handle.try_update(|c| c.page_ecc = true);
assert!(
secded.is_ok(),
"ECC on + Secded (Block) must be accepted, got {secded:?}"
);
assert!(handle.load().page_ecc, "accepted update must enable ECC");
assert_eq!(handle.load().ecc_scheme, EccScheme::Secded);
let xor = handle.try_update(|c| {
c.page_ecc = true;
c.ecc_scheme = EccScheme::Xor { data_shards: 10 };
});
assert!(
xor.is_ok(),
"ECC on + explicit Xor must be accepted, got {xor:?}"
);
assert!(handle.load().page_ecc);
for bad_scheme in [
EccScheme::Xor { data_shards: 0 },
EccScheme::ReedSolomon {
data_shards: 0,
parity_shards: 2,
},
EccScheme::ReedSolomon {
data_shards: 8,
parity_shards: 0,
},
] {
let h = RuntimeConfigHandle::new(RuntimeConfig::default());
let bad = h.try_update(|c| {
c.page_ecc = true;
c.ecc_scheme = bad_scheme;
});
assert!(
matches!(bad, Err(crate::Error::FeatureUnsupported(_))),
"zero-shard scheme {bad_scheme:?} must be rejected, got {bad:?}"
);
assert!(!h.load().page_ecc, "rejected update must not enable ECC");
}
}
#[test]
#[cfg(feature = "page_ecc")]
fn try_update_rejects_unwritable_ecc_layouts_when_enabled() {
use super::super::types::{EccGranularity, EccScheme};
let page = RuntimeConfigHandle::new(RuntimeConfig::default());
let r = page.try_update(|c| {
c.page_ecc = true;
c.ecc_scheme = EccScheme::ReedSolomon {
data_shards: 8,
parity_shards: 2,
};
c.ecc_granularity = EccGranularity::Page;
});
assert!(
matches!(r, Err(crate::Error::FeatureUnsupported(_))),
"{r:?}"
);
assert!(!page.load().page_ecc);
let rs1 = RuntimeConfigHandle::new(RuntimeConfig::default());
let r = rs1.try_update(|c| {
c.page_ecc = true;
c.ecc_scheme = EccScheme::ReedSolomon {
data_shards: 8,
parity_shards: 1,
};
});
assert!(
matches!(r, Err(crate::Error::FeatureUnsupported(_))),
"{r:?}"
);
assert!(!rs1.load().page_ecc);
let ovr = RuntimeConfigHandle::new(RuntimeConfig::default());
let r = ovr.try_update(|c| {
c.data_block_ecc_override = Some(true);
c.ecc_scheme = EccScheme::Xor { data_shards: 8 };
});
assert!(
r.is_ok(),
"override-enabled valid scheme must be accepted: {r:?}"
);
let ovr_secded = RuntimeConfigHandle::new(RuntimeConfig::default());
let r = ovr_secded.try_update(|c| {
c.data_block_ecc_override = Some(true);
});
assert!(
r.is_ok(),
"override-enabled Secded (Block) must be accepted: {r:?}"
);
}
#[test]
fn try_update_rejects_at_insert_with_8_byte_algorithm() {
use super::super::types::KvChecksumComputePoint;
let handle = RuntimeConfigHandle::new(RuntimeConfig::default());
assert_eq!(
handle.load().kv_checksum_algo,
ChecksumAlgorithm::Xxh3_64,
"precondition: default algo is the 8-byte Xxh3_64"
);
let result = handle.try_update(|c| {
c.kv_checksum_compute_point = KvChecksumComputePoint::AtInsert;
});
assert!(
matches!(result, Err(crate::Error::FeatureUnsupported(_))),
"AtInsert + 8-byte algo must be rejected, got {result:?}"
);
assert_eq!(
handle.load().kv_checksum_compute_point,
KvChecksumComputePoint::AtBlockCompile,
"rejected update must not mutate the live snapshot"
);
}
#[test]
fn try_update_accepts_at_insert_with_4_byte_algorithm() {
use super::super::types::KvChecksumComputePoint;
let assert_accepted = |algo: ChecksumAlgorithm| {
let handle = RuntimeConfigHandle::new(RuntimeConfig::default());
let result = handle.try_update(|c| {
c.kv_checksum_algo = algo;
c.kv_checksum_compute_point = KvChecksumComputePoint::AtInsert;
});
assert!(
result.is_ok(),
"AtInsert + compiled 4-byte {algo:?} must be accepted, got {result:?}"
);
assert_eq!(
handle.load().kv_checksum_compute_point,
KvChecksumComputePoint::AtInsert,
"accepted AtInsert must be visible on next load for {algo:?}"
);
};
assert_accepted(ChecksumAlgorithm::Xxh3Low32);
#[cfg(feature = "crc32c")]
assert_accepted(ChecksumAlgorithm::Crc32c);
}
#[cfg(not(feature = "crc32c"))]
#[test]
fn try_update_rejects_at_insert_with_uncompiled_algorithm() {
use super::super::types::KvChecksumComputePoint;
let handle = RuntimeConfigHandle::new(RuntimeConfig::default());
let result = handle.try_update(|c| {
c.kv_checksum_algo = ChecksumAlgorithm::Crc32c;
c.kv_checksum_compute_point = KvChecksumComputePoint::AtInsert;
});
assert!(
matches!(result, Err(crate::Error::FeatureUnsupported(_))),
"AtInsert + uncompiled Crc32c must be rejected, got {result:?}"
);
}
#[test]
fn update_applies_mutation_visible_on_next_load() {
let handle = RuntimeConfigHandle::new(RuntimeConfig::default());
handle
.try_update(|cfg| {
cfg.block_checksum_algo = ChecksumAlgorithm::Crc32c;
})
.unwrap();
let snap = handle.load();
assert_eq!(snap.block_checksum_algo, ChecksumAlgorithm::Crc32c);
}
#[test]
fn snapshot_held_during_update_is_unchanged() {
let handle = RuntimeConfigHandle::new(RuntimeConfig::default());
let snap_before = handle.load_full();
handle
.try_update(|cfg| {
cfg.block_checksum_algo = ChecksumAlgorithm::Crc32c;
})
.unwrap();
assert_eq!(snap_before.block_checksum_algo, ChecksumAlgorithm::Xxh3_64);
assert_eq!(handle.load().block_checksum_algo, ChecksumAlgorithm::Crc32c,);
}
#[test]
fn concurrent_reads_during_single_update_observe_consistent_snapshot() {
let handle = Arc::new(RuntimeConfigHandle::new(RuntimeConfig {
block_checksum_algo: ChecksumAlgorithm::Xxh3_64,
kv_checksum_algo: ChecksumAlgorithm::Xxh3_64,
..RuntimeConfig::default()
}));
let barrier = Arc::new(Barrier::new(STRESS_READERS + 1));
let reader_handles: Vec<_> = (0..STRESS_READERS)
.map(|_| {
let handle = Arc::clone(&handle);
let barrier = Arc::clone(&barrier);
thread::spawn(move || {
barrier.wait();
for _ in 0..STRESS_READS_PER_THREAD {
let snap = handle.load();
let (a, b) = (snap.block_checksum_algo, snap.kv_checksum_algo);
let initial =
a == ChecksumAlgorithm::Xxh3_64 && b == ChecksumAlgorithm::Xxh3_64;
let updated = a == ChecksumAlgorithm::Crc32c && b == ChecksumAlgorithm::Crc32c;
assert!(initial || updated, "torn read: block={a:?}, kv={b:?}",);
}
})
})
.collect();
barrier.wait();
handle
.try_update(|cfg| {
cfg.block_checksum_algo = ChecksumAlgorithm::Crc32c;
cfg.kv_checksum_algo = ChecksumAlgorithm::Crc32c;
})
.unwrap();
for h in reader_handles {
assert!(h.join().is_ok(), "reader thread panicked");
}
}
#[test]
fn multiple_back_to_back_updates_final_state_observable() {
let handle = RuntimeConfigHandle::new(RuntimeConfig::default());
handle
.try_update(|cfg| cfg.block_checksum_algo = ChecksumAlgorithm::Crc32c)
.unwrap();
handle
.try_update(|cfg| cfg.kv_checksum_algo = ChecksumAlgorithm::Xxh3Low32)
.unwrap();
handle
.try_update(|cfg| cfg.block_checksum_algo = ChecksumAlgorithm::Xxh3Low32)
.unwrap();
let snap = handle.load();
assert_eq!(snap.block_checksum_algo, ChecksumAlgorithm::Xxh3Low32);
assert_eq!(snap.kv_checksum_algo, ChecksumAlgorithm::Xxh3Low32);
}