use std::sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc, Barrier,
};
use std::thread;
use std::time::Duration;
use rollblock::block_journal::SyncPolicy;
use rollblock::error::StoreError;
use rollblock::orchestrator::{BlockOrchestrator, DefaultBlockOrchestrator};
use rollblock::state_engine::ShardedStateEngine;
use rollblock::state_shard::{RawTableShard, StateShard};
use rollblock::types::{ShardOp, ShardStats, ShardUndo, StoreKey as Key, Value, ValueBuf};
use rollblock::FileBlockJournal;
use rollblock::MetadataStore;
use super::support::{
operation, synchronous_settings, tempdir, MemoryMetadataStore, NoopSnapshotter,
};
/// Test shard that wraps a `RawTableShard` and deliberately parks inside
/// its *second* `apply` call using a pair of barriers, so a test can hold
/// a block apply open mid-flight and observe orchestrator behavior.
struct BlockingShard {
    // Delegate shard that performs all real state work.
    inner: RawTableShard,
    // Barrier the shard waits on first, signalling the test that the
    // gated apply has started.
    apply_ready: Arc<Barrier>,
    // Barrier the shard then waits on until the test releases the apply.
    apply_release: Arc<Barrier>,
    // Number of `apply` calls seen so far; the call with index 1 blocks.
    apply_calls: AtomicUsize,
}
impl BlockingShard {
    /// Builds a `BlockingShard` delegating to a fresh `RawTableShard`
    /// with the given index/capacity, wired to the supplied coordination
    /// barriers and with its apply counter starting at zero.
    fn new(
        shard_index: usize,
        capacity: usize,
        apply_ready: Arc<Barrier>,
        apply_release: Arc<Barrier>,
    ) -> Self {
        let inner = RawTableShard::new(shard_index, capacity);
        Self {
            apply_calls: AtomicUsize::new(0),
            apply_release,
            apply_ready,
            inner,
        }
    }
}
impl StateShard for BlockingShard {
    // Counts apply calls and parks the second call (call index 1) on the
    // coordination barriers; all real work is delegated to `inner`.
    fn apply(&self, ops: &[ShardOp]) -> ShardUndo {
        let call_index = self.apply_calls.fetch_add(1, Ordering::SeqCst);
        if call_index == 1 {
            // Announce that the gated apply has started...
            self.apply_ready.wait();
            // ...then block until the test releases it.
            self.apply_release.wait();
        }
        self.inner.apply(ops)
    }
    // The remaining trait methods are plain pass-throughs to the delegate.
    fn revert(&self, undo: &ShardUndo) {
        self.inner.revert(undo);
    }
    fn get(&self, key: &Key) -> Option<ValueBuf> {
        self.inner.get(key)
    }
    fn stats(&self) -> ShardStats {
        self.inner.stats()
    }
    fn export_data(&self) -> Vec<(Key, ValueBuf)> {
        self.inner.export_data()
    }
    fn visit_entries(&self, visitor: &mut dyn FnMut(Key, ValueBuf)) {
        self.inner.visit_entries(visitor);
    }
    fn import_data(&self, data: Vec<(Key, ValueBuf)>) {
        self.inner.import_data(data);
    }
}
#[test]
fn apply_and_revert_flow() {
    // Stand up a four-shard orchestrator over an in-memory metadata store
    // and a file-backed journal living in a temporary directory.
    let dir = tempdir();
    let metadata = Arc::new(MemoryMetadataStore::new());
    let journal = Arc::new(FileBlockJournal::new(&dir.path().join("journal")).unwrap());
    let snapshotter = Arc::new(NoopSnapshotter);
    let mut shards: Vec<Arc<dyn StateShard>> = Vec::with_capacity(4);
    for index in 0..4 {
        shards.push(Arc::new(RawTableShard::new(index, 32)));
    }
    let engine = Arc::new(ShardedStateEngine::new(shards, Arc::clone(&metadata)));
    let orchestrator = DefaultBlockOrchestrator::new(
        engine,
        journal,
        snapshotter,
        Arc::clone(&metadata),
        synchronous_settings(),
    )
    .unwrap();

    let key_a: Key = [1u8; Key::BYTES].into();
    let key_b: Key = [2u8; Key::BYTES].into();

    // Block 1 writes a single key; both state and metadata must reflect it.
    orchestrator
        .apply_operations(1, vec![operation(key_a, 10)])
        .unwrap();
    assert_eq!(orchestrator.fetch(key_a).unwrap(), 10);
    assert_eq!(metadata.current_block().unwrap(), 1);

    // Block 2 overwrites key_a and introduces key_b.
    orchestrator
        .apply_operations(2, vec![operation(key_a, 42), operation(key_b, 7)])
        .unwrap();
    assert_eq!(orchestrator.fetch(key_a).unwrap(), 42);
    assert_eq!(orchestrator.fetch(key_b).unwrap(), 7);
    assert_eq!(metadata.current_block().unwrap(), 2);

    // Reverting to genesis undoes both blocks and resets metadata.
    orchestrator.revert_to(0).unwrap();
    assert_eq!(orchestrator.fetch(key_a).unwrap(), 0);
    assert_eq!(orchestrator.fetch(key_b).unwrap(), 0);
    assert_eq!(metadata.current_block().unwrap(), 0);
}
#[test]
fn rollback_discards_unsynced_relaxed_blocks() {
    // With a relaxed sync policy (sync every 10 blocks), only the first
    // block becomes durable immediately; blocks 2 and 3 stay pending in
    // memory and must be discarded by a rollback.
    let tmp = tempdir();
    let journal_path = tmp.path().join("journal");
    let metadata = Arc::new(MemoryMetadataStore::new());
    let journal = Arc::new(FileBlockJournal::new(&journal_path).unwrap());
    let snapshotter = Arc::new(NoopSnapshotter);
    let shards: Vec<Arc<dyn StateShard>> = (0..2)
        .map(|index| Arc::new(RawTableShard::new(index, 32)) as Arc<dyn StateShard>)
        .collect();
    let engine = Arc::new(ShardedStateEngine::new(shards, Arc::clone(&metadata)));
    let orchestrator = DefaultBlockOrchestrator::new(
        engine,
        journal,
        snapshotter,
        Arc::clone(&metadata),
        synchronous_settings(),
    )
    .unwrap();
    orchestrator.set_sync_policy(SyncPolicy::every_n_blocks(10));
    orchestrator.set_metadata_sync_interval(10).unwrap();

    let key: Key = [0x11u8; Key::BYTES].into();
    orchestrator
        .apply_operations(1, vec![operation(key, 10)])
        .unwrap();
    orchestrator
        .apply_operations(2, vec![operation(key, 20)])
        .unwrap();
    orchestrator
        .apply_operations(3, vec![operation(key, 30)])
        .unwrap();
    assert_eq!(
        metadata.current_block().unwrap(),
        1,
        "metadata should only advance through the initial durable block while batching is enabled"
    );
    // In-memory state still reflects the newest (unsynced) block.
    assert_eq!(orchestrator.fetch(key).unwrap(), 30);

    // Rolling back to block 1 must restore its value and drop all pending
    // metadata for the truncated blocks 2 and 3.
    orchestrator.revert_to(1).unwrap();
    assert_eq!(orchestrator.fetch(key).unwrap(), 10);
    // Fixed message: a rollback never *advances* metadata — height 1 was
    // already the durable height and must simply be retained.
    assert_eq!(
        metadata.current_block().unwrap(),
        1,
        "rollback should leave metadata at the requested durable height"
    );
    assert!(
        !metadata.has_offset(2) && !metadata.has_offset(3),
        "pending metadata for truncated blocks must be discarded"
    );
}
#[test]
fn reader_gate_blocks_fetch_during_pending_apply() {
    // Verifies the reader gate: while a block apply is in flight, `fetch`
    // must not complete (and must later see the fully applied block).
    let tmp = tempdir();
    let journal_path = tmp.path().join("journal");
    let metadata = Arc::new(MemoryMetadataStore::new());
    let journal = Arc::new(FileBlockJournal::new(&journal_path).unwrap());
    let snapshotter = Arc::new(NoopSnapshotter);
    // Two-party barriers shared between this test thread and the shard's
    // gated second `apply` call (see `BlockingShard`).
    let apply_ready = Arc::new(Barrier::new(2));
    let apply_release = Arc::new(Barrier::new(2));
    let shard: Arc<dyn StateShard> = Arc::new(BlockingShard::new(
        0,
        32,
        Arc::clone(&apply_ready),
        Arc::clone(&apply_release),
    ));
    let engine = Arc::new(ShardedStateEngine::new(vec![shard], Arc::clone(&metadata)));
    let orchestrator = Arc::new(
        DefaultBlockOrchestrator::new(
            engine,
            journal,
            snapshotter,
            Arc::clone(&metadata),
            synchronous_settings(),
        )
        .unwrap(),
    );
    let key: Key = [1u8; Key::BYTES].into();
    // First apply (call index 0) passes straight through the shard.
    orchestrator
        .apply_operations(1, vec![operation(key, 11)])
        .unwrap();
    // Second apply (call index 1) parks inside the shard until released.
    let writer = {
        let orchestrator = Arc::clone(&orchestrator);
        thread::spawn(move || {
            orchestrator
                .apply_operations(2, vec![operation(key, 42)])
                .unwrap();
        })
    };
    // Wait until the writer thread is inside `apply` and holding the gate.
    apply_ready.wait();
    let fetch_start = Arc::new(Barrier::new(2));
    let fetch_finished = Arc::new(AtomicBool::new(false));
    let fetch_handle = {
        let orchestrator = Arc::clone(&orchestrator);
        let fetch_start = Arc::clone(&fetch_start);
        let fetch_finished = Arc::clone(&fetch_finished);
        thread::spawn(move || {
            fetch_start.wait();
            // Expected to block on the reader gate until the apply finishes.
            let value = orchestrator.fetch(key).unwrap();
            fetch_finished.store(true, Ordering::Release);
            value
        })
    };
    fetch_start.wait();
    // Give the reader a moment to run; it must still be blocked.
    // NOTE(review): a sleep-based check can only catch a fetch that
    // completes too early — it cannot detect a missed wakeup.
    thread::sleep(Duration::from_millis(50));
    assert!(
        !fetch_finished.load(Ordering::Acquire),
        "fetch should still be blocked while apply is pending"
    );
    // Release the parked apply; the writer completes and the reader then
    // observes the value written by block 2.
    apply_release.wait();
    writer.join().unwrap();
    let fetched = fetch_handle.join().unwrap();
    assert_eq!(fetched, 42);
}
#[test]
fn parallel_apply_with_rayon() {
    // Drives the sharded engine through a rayon pool: 16 shards, 4 worker
    // threads, 1000 distinct keys in one block, then partial updates and
    // reverts.
    let tmp = tempdir();
    let journal_path = tmp.path().join("journal");
    let metadata = Arc::new(MemoryMetadataStore::new());
    let journal = Arc::new(FileBlockJournal::new(&journal_path).unwrap());
    let snapshotter = Arc::new(NoopSnapshotter);
    let thread_pool = Arc::new(
        rayon::ThreadPoolBuilder::new()
            .num_threads(4)
            .build()
            .unwrap(),
    );
    let shards: Vec<Arc<dyn StateShard>> = (0..16)
        .map(|index| Arc::new(RawTableShard::new(index, 32)) as Arc<dyn StateShard>)
        .collect();
    // `thread_pool` is not used again, so hand over the Arc instead of
    // cloning it (previously a redundant clone).
    let engine = Arc::new(ShardedStateEngine::with_thread_pool(
        shards,
        Arc::clone(&metadata),
        Some(thread_pool),
    ));
    let orchestrator = DefaultBlockOrchestrator::new(
        engine,
        journal,
        snapshotter,
        Arc::clone(&metadata),
        synchronous_settings(),
    )
    .unwrap();

    let make_key = |i: u32| Key::from_prefix(i.to_le_bytes());

    // Block 1: insert 1000 keys. The ops vector was previously cloned even
    // though it is never used again; pass it by value.
    let ops: Vec<_> = (0..1000)
        .map(|i| operation(make_key(i), i as u64))
        .collect();
    orchestrator.apply_operations(1, ops).unwrap();
    for i in [0, 100, 500, 999] {
        assert_eq!(orchestrator.fetch(make_key(i)).unwrap(), i as u64);
    }

    // Block 2: bump the first 500 keys by 1000.
    let update_ops: Vec<_> = (0..500)
        .map(|i| operation(make_key(i), i as u64 + 1000))
        .collect();
    orchestrator.apply_operations(2, update_ops).unwrap();
    let key_0: Key = [0u8; Key::BYTES].into();
    assert_eq!(orchestrator.fetch(key_0).unwrap(), 1000);

    // Reverting to block 1 restores key 0's original value (0); reverting
    // to genesis leaves it absent (also fetched as 0).
    orchestrator.revert_to(1).unwrap();
    assert_eq!(orchestrator.fetch(key_0).unwrap(), 0);
    orchestrator.revert_to(0).unwrap();
    assert_eq!(orchestrator.fetch(key_0).unwrap(), 0);
}
#[test]
fn orchestrator_supports_thread_pools() {
    // Smoke-test that an engine constructed with an explicit rayon pool
    // works end to end through the orchestrator.
    let dir = tempdir();
    let metadata = Arc::new(MemoryMetadataStore::new());
    let journal = Arc::new(FileBlockJournal::new(&dir.path().join("journal")).unwrap());
    let snapshotter = Arc::new(NoopSnapshotter);
    let pool = Arc::new(
        rayon::ThreadPoolBuilder::new()
            .num_threads(4)
            .build()
            .unwrap(),
    );
    let mut shards: Vec<Arc<dyn StateShard>> = Vec::with_capacity(8);
    for index in 0..8 {
        shards.push(Arc::new(RawTableShard::new(index, 32)));
    }
    let engine = Arc::new(ShardedStateEngine::with_thread_pool(
        shards,
        Arc::clone(&metadata),
        Some(pool),
    ));
    let orchestrator = DefaultBlockOrchestrator::new(
        engine,
        journal,
        snapshotter,
        Arc::clone(&metadata),
        synchronous_settings(),
    )
    .unwrap();

    // A single write/read round trip is enough for the smoke test.
    let key_a: Key = [1u8; Key::BYTES].into();
    orchestrator
        .apply_operations(1, vec![operation(key_a, 42)])
        .unwrap();
    assert_eq!(orchestrator.fetch(key_a).unwrap(), 42);
}
#[test]
fn empty_blocks_and_sparse_block_heights() {
    // Block heights need not be contiguous, and a revert may target a
    // height that falls between two recorded blocks.
    let dir = tempdir();
    let metadata = Arc::new(MemoryMetadataStore::new());
    let journal = Arc::new(FileBlockJournal::new(&dir.path().join("journal")).unwrap());
    let snapshotter = Arc::new(NoopSnapshotter);
    let mut shards: Vec<Arc<dyn StateShard>> = Vec::with_capacity(4);
    for index in 0..4 {
        shards.push(Arc::new(RawTableShard::new(index, 32)));
    }
    let engine = Arc::new(ShardedStateEngine::new(shards, Arc::clone(&metadata)));
    let orchestrator = DefaultBlockOrchestrator::new(
        engine,
        journal,
        snapshotter,
        Arc::clone(&metadata),
        synchronous_settings(),
    )
    .unwrap();

    let key_a: Key = [1u8; Key::BYTES].into();
    let key_b: Key = [2u8; Key::BYTES].into();

    // Sparse heights: 100, 105, 110.
    orchestrator
        .apply_operations(100, vec![operation(key_a, 10)])
        .unwrap();
    assert_eq!(orchestrator.fetch(key_a).unwrap(), 10);
    assert_eq!(metadata.current_block().unwrap(), 100);
    orchestrator
        .apply_operations(105, vec![operation(key_b, 20)])
        .unwrap();
    assert_eq!(orchestrator.fetch(key_b).unwrap(), 20);
    assert_eq!(metadata.current_block().unwrap(), 105);
    orchestrator
        .apply_operations(110, vec![operation(key_a, 30)])
        .unwrap();
    assert_eq!(orchestrator.fetch(key_a).unwrap(), 30);
    assert_eq!(metadata.current_block().unwrap(), 110);

    // Reverting to 107 lands between blocks 105 and 110: block 110 is
    // undone while blocks 100 and 105 remain.
    orchestrator.revert_to(107).unwrap();
    assert_eq!(orchestrator.fetch(key_a).unwrap(), 10);
    assert_eq!(orchestrator.fetch(key_b).unwrap(), 20);
    assert_eq!(metadata.current_block().unwrap(), 107);

    // Reverting to 102 additionally undoes block 105.
    orchestrator.revert_to(102).unwrap();
    assert_eq!(orchestrator.fetch(key_a).unwrap(), 10);
    assert_eq!(orchestrator.fetch(key_b).unwrap(), 0);
    assert_eq!(metadata.current_block().unwrap(), 102);

    // Reverting below the first block clears everything.
    orchestrator.revert_to(50).unwrap();
    assert_eq!(orchestrator.fetch(key_a).unwrap(), 0);
    assert_eq!(orchestrator.fetch(key_b).unwrap(), 0);
    assert_eq!(metadata.current_block().unwrap(), 50);
}
#[test]
fn empty_block_only() {
    // Blocks carrying no operations still advance the recorded height and
    // can be rolled back like any other block.
    let dir = tempdir();
    let metadata = Arc::new(MemoryMetadataStore::new());
    let journal = Arc::new(FileBlockJournal::new(&dir.path().join("journal")).unwrap());
    let snapshotter = Arc::new(NoopSnapshotter);
    let mut shards: Vec<Arc<dyn StateShard>> = Vec::with_capacity(4);
    for index in 0..4 {
        shards.push(Arc::new(RawTableShard::new(index, 32)));
    }
    let engine = Arc::new(ShardedStateEngine::new(shards, Arc::clone(&metadata)));
    let orchestrator = DefaultBlockOrchestrator::new(
        engine,
        journal,
        snapshotter,
        Arc::clone(&metadata),
        synchronous_settings(),
    )
    .unwrap();

    orchestrator.apply_operations(42, Vec::new()).unwrap();
    assert_eq!(metadata.current_block().unwrap(), 42);
    orchestrator.apply_operations(100, Vec::new()).unwrap();
    assert_eq!(metadata.current_block().unwrap(), 100);
    orchestrator.revert_to(42).unwrap();
    assert_eq!(metadata.current_block().unwrap(), 42);
}
#[test]
fn block_height_must_be_increasing() {
    // Heights must be strictly increasing: resubmitting the current height
    // or an older one is rejected with `BlockIdNotIncreasing`.
    let dir = tempdir();
    let metadata = Arc::new(MemoryMetadataStore::new());
    let journal = Arc::new(FileBlockJournal::new(&dir.path().join("journal")).unwrap());
    let snapshotter = Arc::new(NoopSnapshotter);
    let mut shards: Vec<Arc<dyn StateShard>> = Vec::with_capacity(4);
    for index in 0..4 {
        shards.push(Arc::new(RawTableShard::new(index, 32)));
    }
    let engine = Arc::new(ShardedStateEngine::new(shards, Arc::clone(&metadata)));
    let orchestrator = DefaultBlockOrchestrator::new(
        engine,
        journal,
        snapshotter,
        Arc::clone(&metadata),
        synchronous_settings(),
    )
    .unwrap();

    let key_a: Key = [1u8; Key::BYTES].into();
    orchestrator
        .apply_operations(100, vec![operation(key_a, 10)])
        .unwrap();

    // Same height, then an older height — both must be rejected.
    for stale_height in [100, 50] {
        let outcome = orchestrator.apply_operations(stale_height, vec![operation(key_a, 20)]);
        assert!(matches!(
            outcome,
            Err(StoreError::BlockIdNotIncreasing { .. })
        ));
    }

    // The next strictly larger height succeeds.
    orchestrator
        .apply_operations(101, vec![operation(key_a, 20)])
        .unwrap();
    assert_eq!(orchestrator.fetch(key_a).unwrap(), 20);
}
#[test]
fn allows_genesis_block_zero_once() {
    // Height 0 is a valid genesis block, but only the first time.
    let dir = tempdir();
    let metadata = Arc::new(MemoryMetadataStore::new());
    let journal = Arc::new(FileBlockJournal::new(&dir.path().join("journal")).unwrap());
    let snapshotter = Arc::new(NoopSnapshotter);
    let mut shards: Vec<Arc<dyn StateShard>> = Vec::with_capacity(2);
    for index in 0..2 {
        shards.push(Arc::new(RawTableShard::new(index, 32)));
    }
    let engine = Arc::new(ShardedStateEngine::new(shards, Arc::clone(&metadata)));
    let orchestrator = DefaultBlockOrchestrator::new(
        engine,
        journal,
        snapshotter,
        Arc::clone(&metadata),
        synchronous_settings(),
    )
    .unwrap();

    let key: Key = [7u8; Key::BYTES].into();
    orchestrator
        .apply_operations(0, vec![operation(key, 99)])
        .expect("genesis block 0 should be accepted");
    assert_eq!(orchestrator.fetch(key).unwrap(), 99);
    assert_eq!(
        metadata.current_block().unwrap(),
        0,
        "metadata should record the genesis block"
    );

    // A second block at height 0 violates strict monotonicity.
    let second_zero = orchestrator.apply_operations(0, vec![operation(key, 42)]);
    assert!(
        matches!(second_zero, Err(StoreError::BlockIdNotIncreasing { .. })),
        "block 0 should only be accepted once"
    );

    // Normal progression resumes at height 1.
    orchestrator
        .apply_operations(1, vec![operation(key, 100)])
        .unwrap();
    assert_eq!(metadata.current_block().unwrap(), 1);
}
#[test]
fn empty_value_sets_delete_keys() {
    // Writing an empty value acts as a delete: subsequent fetches of the
    // key return 0.
    let dir = tempdir();
    let metadata = Arc::new(MemoryMetadataStore::new());
    let journal = Arc::new(FileBlockJournal::new(&dir.path().join("journal")).unwrap());
    let snapshotter = Arc::new(NoopSnapshotter);
    let mut shards: Vec<Arc<dyn StateShard>> = Vec::with_capacity(4);
    for index in 0..4 {
        shards.push(Arc::new(RawTableShard::new(index, 32)));
    }
    let engine = Arc::new(ShardedStateEngine::new(shards, Arc::clone(&metadata)));
    let orchestrator = DefaultBlockOrchestrator::new(
        engine,
        journal,
        snapshotter,
        Arc::clone(&metadata),
        synchronous_settings(),
    )
    .unwrap();

    let key_a: Key = [1u8; Key::BYTES].into();
    orchestrator
        .apply_operations(1, vec![operation(key_a, 10)])
        .unwrap();
    assert_eq!(orchestrator.fetch(key_a).unwrap(), 10);

    // Block 2 writes an empty value, removing the key.
    orchestrator
        .apply_operations(2, vec![operation(key_a, Value::empty())])
        .unwrap();
    assert_eq!(orchestrator.fetch(key_a).unwrap(), 0);
    assert_eq!(metadata.current_block().unwrap(), 2);
}