use super::RocksDBStorageEngine;
use super::RocksDBUnifiedEngine;
use crate::{Error, StateMachine};
use async_trait::async_trait;
use bytes::Bytes;
use d_engine_core::state_machine_test::{StateMachineBuilder, StateMachineTestSuite};
use d_engine_core::{ApplyEntry, Command};
use std::sync::Arc;
use std::sync::Mutex;
use tempfile::TempDir;
/// Test fixture that hands out RocksDB-backed state machines rooted in a
/// private temporary directory.
///
/// Field order is deliberate: Rust drops struct fields in declaration order,
/// so `storage` is declared first to release the storage handle before
/// `temp_dir` is dropped and removes the directory.
struct RocksDBStateMachineBuilder {
    /// Storage handle returned by the most recent `build()`; `None` before the
    /// first build and after `cleanup()`.
    /// NOTE(review): presumably keeps files under `temp_dir` open — confirm.
    storage: Mutex<Option<RocksDBStorageEngine>>,
    /// Temporary directory that hosts the database files.
    temp_dir: TempDir,
}
impl RocksDBStateMachineBuilder {
    /// Creates a builder with a fresh temporary directory and no storage
    /// handle held yet.
    ///
    /// # Panics
    /// Panics if the temporary directory cannot be created.
    fn new() -> Self {
        let temp_dir = TempDir::new().expect("Failed to create temp dir");
        let storage = Mutex::new(None);
        Self { temp_dir, storage }
    }
}
#[async_trait]
impl StateMachineBuilder for RocksDBStateMachineBuilder {
    /// Opens the unified engine at `<temp_dir>/rocksdb_sm` and returns its
    /// state machine, keeping the matching storage handle inside the builder.
    ///
    /// Any storage handle left over from an earlier build is dropped first,
    /// followed by a 50 ms pause before the path is opened again.
    ///
    /// # Errors
    /// Propagates the error from `RocksDBUnifiedEngine::open`.
    async fn build(&self) -> Result<Arc<dyn StateMachine>, Error> {
        use std::time::Duration;

        {
            // Scope the guard so it is released before the await below.
            let mut slot = self.storage.lock().unwrap();
            *slot = None;
        }
        tokio::time::sleep(Duration::from_millis(50)).await;

        let db_path = self.temp_dir.path().join("rocksdb_sm");
        let (storage_engine, state_machine) = RocksDBUnifiedEngine::open(&db_path)?;
        {
            let mut slot = self.storage.lock().unwrap();
            *slot = Some(storage_engine);
        }
        Ok(Arc::new(state_machine))
    }

    /// Drops the held storage handle and then pauses: 500 ms when the `CI`
    /// environment variable is readable, 100 ms otherwise.
    async fn cleanup(&self) -> Result<(), Error> {
        use std::time::Duration;

        {
            let mut slot = self.storage.lock().unwrap();
            *slot = None;
        }

        let pause_ms: u64 = match std::env::var("CI") {
            Ok(_) => 500,
            Err(_) => 100,
        };
        tokio::time::sleep(Duration::from_millis(pause_ms)).await;
        Ok(())
    }
}
/// Runs the shared state-machine test suite against the RocksDB builder.
#[tokio::test]
async fn test_rocksdb_state_machine_suite() {
    let outcome = StateMachineTestSuite::run_all_tests(RocksDBStateMachineBuilder::new()).await;
    outcome.expect("RocksDBStateMachine should pass all tests");
}
/// Runs the shared performance tests against the RocksDB builder.
/// Marked `#[ignore]`, so it only runs when requested explicitly.
#[tokio::test]
#[ignore]
async fn test_rocksdb_state_machine_performance() {
    let outcome =
        StateMachineTestSuite::run_performance_tests(RocksDBStateMachineBuilder::new()).await;
    outcome.expect("RocksDBStateMachine should pass performance tests");
}
#[tokio::test]
async fn test_get_rejected_when_not_serving() {
let temp_dir = TempDir::new().expect("Failed to create temp dir");
let db_path = temp_dir.path().join("test_not_serving");
let (_storage, state_machine) =
RocksDBUnifiedEngine::open(&db_path).expect("Failed to open unified DB");
state_machine.start().await.expect("Failed to start");
let test_key = b"test_key";
let test_value = b"test_value";
let entry = ApplyEntry {
index: 1,
term: 1,
command: Command::Insert {
key: Bytes::from(test_key.to_vec()),
value: Bytes::from(test_value.to_vec()),
ttl_secs: None,
},
};
state_machine.apply_chunk(&[entry]).await.expect("Failed to apply entry");
let value = state_machine.get(test_key).expect("Failed to get");
assert_eq!(value, Some(Bytes::from(test_value.to_vec())));
state_machine.stop().expect("Failed to stop");
let result = state_machine.get(test_key);
assert!(result.is_err(), "Read should fail when not serving");
let err = result.unwrap_err();
let err_debug = format!("{err:?}"); assert!(
err_debug.contains("NotServing")
|| err_debug.contains("not serving")
|| err_debug.contains("restoring from snapshot"),
"Error should indicate not serving state, got: {err_debug}"
);
state_machine.start().await.expect("Failed to restart");
let value = state_machine.get(test_key).expect("Failed to get after resuming");
assert_eq!(
value,
Some(Bytes::from(test_value.to_vec())),
"Data should be accessible after resuming service"
);
}
#[tokio::test]
async fn test_scan_prefix_returns_only_matching_keys() {
let dir = tempfile::TempDir::new().unwrap();
let (_storage, sm) = RocksDBUnifiedEngine::open(dir.path()).unwrap();
sm.apply_chunk(&[
insert_at(b"/services/node1", b"10.0.0.1", 1),
insert_at(b"/services/node2", b"10.0.0.2", 2),
insert_at(b"/other/key", b"must_not_appear", 3),
])
.await
.unwrap();
let result = sm.scan_prefix(b"/services/").unwrap();
assert_eq!(
result.entries.len(),
2,
"only /services/ keys should be returned"
);
let keys: Vec<&Bytes> = result.entries.iter().map(|(k, _)| k).collect();
assert!(keys.contains(&&Bytes::from_static(b"/services/node1")));
assert!(keys.contains(&&Bytes::from_static(b"/services/node2")));
}
/// A scan of `/services/` must not return `/t/trap`, a key stored under a
/// different prefix.
#[tokio::test]
async fn test_scan_prefix_upper_bound_excludes_adjacent_namespace() {
    let tmp = tempfile::TempDir::new().unwrap();
    let (_storage, state_machine) = RocksDBUnifiedEngine::open(tmp.path()).unwrap();

    let batch = [
        insert_at(b"/services/last", b"v", 1),
        insert_at(b"/t/trap", b"must_not_appear", 2),
    ];
    state_machine.apply_chunk(&batch).await.unwrap();

    let scanned = state_machine.scan_prefix(b"/services/").unwrap();
    assert_eq!(
        scanned.entries.len(),
        1,
        "/t/trap must not appear in /services/ scan"
    );

    let (only_key, _) = &scanned.entries[0];
    assert_eq!(*only_key, Bytes::from_static(b"/services/last"));
}
/// Scanning a prefix that no key starts with must succeed with zero entries.
#[tokio::test]
async fn test_scan_prefix_empty_namespace_returns_empty_vec() {
    let tmp = tempfile::TempDir::new().unwrap();
    let (_storage, state_machine) = RocksDBUnifiedEngine::open(tmp.path()).unwrap();

    let batch = [insert_at(b"/other/key", b"v", 1)];
    state_machine.apply_chunk(&batch).await.unwrap();

    let scanned = state_machine.scan_prefix(b"/missing/").unwrap();
    assert!(
        scanned.entries.is_empty(),
        "missing prefix must return empty entries, not an error"
    );
}
/// After applying entries with indices 1..=3, the scan result must report
/// three entries and `revision == 3`.
#[tokio::test]
async fn test_scan_prefix_revision_reflects_applied_index() {
    let tmp = tempfile::TempDir::new().unwrap();
    let (_storage, state_machine) = RocksDBUnifiedEngine::open(tmp.path()).unwrap();

    let batch = [
        insert_at(b"/s/a", b"1", 1),
        insert_at(b"/s/b", b"2", 2),
        insert_at(b"/s/c", b"3", 3),
    ];
    state_machine.apply_chunk(&batch).await.unwrap();

    let scanned = state_machine.scan_prefix(b"/s/").unwrap();
    assert_eq!(scanned.entries.len(), 3);
    assert_eq!(
        scanned.revision, 3,
        "revision must equal the applied index after the writes"
    );
}
/// Scans a prefix whose last byte is 0xFF (`0x61 0xFF`): the key under that
/// prefix must be returned while a key starting with `0x62` must not.
#[tokio::test]
async fn test_scan_prefix_0xff_suffix_carry_propagation() {
    let tmp = tempfile::TempDir::new().unwrap();
    let (_storage, state_machine) = RocksDBUnifiedEngine::open(tmp.path()).unwrap();

    let scan_prefix: &[u8] = b"\x61\xFF";
    let matching_key: &[u8] = b"\x61\xFFnode";
    let outside_key: &[u8] = b"\x62decoy";

    let batch = [
        insert_at(matching_key, b"value", 1),
        insert_at(outside_key, b"must_not_appear", 2),
    ];
    state_machine.apply_chunk(&batch).await.unwrap();

    let scanned = state_machine.scan_prefix(scan_prefix).unwrap();
    assert_eq!(
        scanned.entries.len(),
        1,
        "only key under 0xFF prefix should appear"
    );

    let (only_key, _) = &scanned.entries[0];
    assert_eq!(*only_key, Bytes::copy_from_slice(matching_key));
}
/// An empty prefix must yield no entries rather than every stored key.
#[tokio::test]
async fn test_scan_prefix_empty_prefix_returns_empty() {
    let tmp = tempfile::TempDir::new().unwrap();
    let (_storage, state_machine) = RocksDBUnifiedEngine::open(tmp.path()).unwrap();

    let batch = [insert_at(b"/services/node1", b"v", 1)];
    state_machine.apply_chunk(&batch).await.unwrap();

    let scanned = state_machine.scan_prefix(b"").unwrap();
    assert!(
        scanned.entries.is_empty(),
        "empty prefix should return empty entries (not all keys)"
    );
}
/// Scans a prefix made only of 0xFF bytes: the key under it must be returned
/// and a key that sorts below the prefix (`0xFE 0xAA`) must not.
#[tokio::test]
async fn test_scan_prefix_all_0xff_no_upper_bound() {
    let tmp = tempfile::TempDir::new().unwrap();
    let (_storage, state_machine) = RocksDBUnifiedEngine::open(tmp.path()).unwrap();

    let scan_prefix: &[u8] = b"\xFF\xFF";
    let matching_key: &[u8] = b"\xFF\xFF\x01";
    let outside_key: &[u8] = b"\xFE\xAA";

    let batch = [
        insert_at(matching_key, b"top_value", 1),
        insert_at(outside_key, b"must_not_appear", 2),
    ];
    state_machine.apply_chunk(&batch).await.unwrap();

    let scanned = state_machine.scan_prefix(scan_prefix).unwrap();
    assert_eq!(
        scanned.entries.len(),
        1,
        "only key under all-0xFF prefix should appear"
    );

    let (only_key, _) = &scanned.entries[0];
    assert_eq!(*only_key, Bytes::copy_from_slice(matching_key));
}
/// Builds an `Insert` entry (term 1, no TTL) for `key`/`value` at log `index`.
fn insert_at(key: &[u8], value: &[u8], index: u64) -> ApplyEntry {
    let command = Command::Insert {
        key: Bytes::copy_from_slice(key),
        value: Bytes::copy_from_slice(value),
        ttl_secs: None,
    };
    ApplyEntry {
        index,
        term: 1,
        command,
    }
}
/// Builds an `Insert` entry (term 1) at log `index` with an optional TTL.
///
/// A `ttl_secs` of `0` is encoded as "no TTL" (`None`); any other value is
/// passed through as `Some(ttl_secs)`.
fn insert_with_ttl(key: &[u8], value: &[u8], ttl_secs: u64, index: u64) -> ApplyEntry {
    let ttl = match ttl_secs {
        0 => None,
        secs => Some(secs),
    };
    let command = Command::Insert {
        key: Bytes::copy_from_slice(key),
        value: Bytes::copy_from_slice(value),
        ttl_secs: ttl,
    };
    ApplyEntry {
        index,
        term: 1,
        command,
    }
}
/// Builds a `Delete` entry (term 1) for `key` at log `index`.
fn delete_at(key: &[u8], index: u64) -> ApplyEntry {
    let command = Command::Delete {
        key: Bytes::copy_from_slice(key),
    };
    ApplyEntry {
        index,
        term: 1,
        command,
    }
}
/// Builds a `CompareAndSwap` entry (term 1) at log `index`.
///
/// `expected` is copied into the command as-is: `None` stays `None`, and
/// `Some(bytes)` becomes an owned copy of those bytes.
fn cas_at(key: &[u8], expected: Option<&[u8]>, new_value: &[u8], index: u64) -> ApplyEntry {
    let expected_owned = expected.map(Bytes::copy_from_slice);
    let command = Command::CompareAndSwap {
        key: Bytes::copy_from_slice(key),
        expected: expected_owned,
        value: Bytes::copy_from_slice(new_value),
    };
    ApplyEntry {
        index,
        term: 1,
        command,
    }
}
/// CAS whose expected value equals the stored value must report success and
/// leave the new value readable.
#[tokio::test]
async fn test_apply_chunk_cas_match_succeeds_and_writes_new_value() {
    let tmp = tempfile::TempDir::new().unwrap();
    let (_storage, state_machine) = RocksDBUnifiedEngine::open(tmp.path()).unwrap();

    let seed = [insert_with_ttl(b"k", b"old", 0, 1)];
    state_machine.apply_chunk(&seed).await.unwrap();

    let swap = [cas_at(b"k", Some(b"old"), b"new", 2)];
    let outcomes = state_machine.apply_chunk(&swap).await.unwrap();
    assert!(
        outcomes[0].succeeded,
        "CAS should succeed when current == expected"
    );

    let stored = state_machine.get(b"k").unwrap();
    assert_eq!(stored, Some(Bytes::from("new")));
}
/// CAS whose expected value differs from the stored value must report failure
/// and leave the stored value untouched.
#[tokio::test]
async fn test_apply_chunk_cas_mismatch_returns_failure_and_preserves_value() {
    let tmp = tempfile::TempDir::new().unwrap();
    let (_storage, state_machine) = RocksDBUnifiedEngine::open(tmp.path()).unwrap();

    let seed = [insert_with_ttl(b"k", b"current", 0, 1)];
    state_machine.apply_chunk(&seed).await.unwrap();

    let swap = [cas_at(b"k", Some(b"wrong_expected"), b"new", 2)];
    let outcomes = state_machine.apply_chunk(&swap).await.unwrap();
    assert!(
        !outcomes[0].succeeded,
        "CAS should fail when current != expected"
    );

    let stored = state_machine.get(b"k").unwrap();
    assert_eq!(stored, Some(Bytes::from("current")));
}
/// CAS with `expected = None` on a key that was never written must succeed
/// and create the key.
#[tokio::test]
async fn test_apply_chunk_cas_none_expected_on_absent_key_succeeds() {
    let tmp = tempfile::TempDir::new().unwrap();
    let (_storage, state_machine) = RocksDBUnifiedEngine::open(tmp.path()).unwrap();

    let swap = [cas_at(b"new_key", None, b"value", 1)];
    let outcomes = state_machine.apply_chunk(&swap).await.unwrap();
    assert!(
        outcomes[0].succeeded,
        "CAS(None→value) on absent key should succeed"
    );

    let stored = state_machine.get(b"new_key").unwrap();
    assert_eq!(stored, Some(Bytes::from("value")));
}
/// CAS with `expected = None` on a key that already holds a value must fail
/// and keep the existing value.
#[tokio::test]
async fn test_apply_chunk_cas_none_expected_on_existing_key_fails() {
    let tmp = tempfile::TempDir::new().unwrap();
    let (_storage, state_machine) = RocksDBUnifiedEngine::open(tmp.path()).unwrap();

    let seed = [insert_with_ttl(b"k", b"exists", 0, 1)];
    state_machine.apply_chunk(&seed).await.unwrap();

    let swap = [cas_at(b"k", None, b"new", 2)];
    let outcomes = state_machine.apply_chunk(&swap).await.unwrap();
    assert!(
        !outcomes[0].succeeded,
        "CAS(None) should fail when key exists"
    );

    let stored = state_machine.get(b"k").unwrap();
    assert_eq!(stored, Some(Bytes::from("exists")));
}
/// Two CAS operations on the same key inside one chunk: the second must
/// observe the first one's write, so only the first succeeds and the final
/// value is `v1`.
#[tokio::test]
async fn test_apply_chunk_two_cas_same_key_second_must_see_first_write() {
    let tmp = tempfile::TempDir::new().unwrap();
    let (_storage, state_machine) = RocksDBUnifiedEngine::open(tmp.path()).unwrap();

    let seed = [insert_with_ttl(b"k", b"v0", 0, 1)];
    state_machine.apply_chunk(&seed).await.unwrap();

    // Both swaps expect "v0"; they are submitted together in a single chunk.
    let swaps = [
        cas_at(b"k", Some(b"v0"), b"v1", 2),
        cas_at(b"k", Some(b"v0"), b"v2", 3),
    ];
    let outcomes = state_machine.apply_chunk(&swaps).await.unwrap();

    assert!(outcomes[0].succeeded, "CAS1 (v0→v1) must succeed");
    assert!(
        !outcomes[1].succeeded,
        "CAS2 (v0→v2) must fail: CAS1 already wrote v1, so expected=v0 no longer matches"
    );

    let stored = state_machine.get(b"k").unwrap();
    assert_eq!(
        stored,
        Some(Bytes::from("v1")),
        "final value must be v1; CAS2 must not overwrite CAS1's committed write"
    );
}
/// Applying a `Delete` for a previously inserted key must make `get` return
/// `None` for it.
#[tokio::test]
async fn test_apply_chunk_delete_removes_key() {
    let tmp = tempfile::TempDir::new().unwrap();
    let (_storage, state_machine) = RocksDBUnifiedEngine::open(tmp.path()).unwrap();

    let seed = [insert_with_ttl(b"to_delete", b"v", 0, 1)];
    state_machine.apply_chunk(&seed).await.unwrap();

    let removal = [delete_at(b"to_delete", 2)];
    state_machine.apply_chunk(&removal).await.unwrap();

    let stored = state_machine.get(b"to_delete").unwrap();
    assert_eq!(stored, None);
}
/// Swaps the state machine's database for a read-only handle on the same path
/// and checks that a subsequent `apply_chunk` returns `Err`.
#[tokio::test]
async fn test_apply_chunk_returns_error_on_read_only_db() {
    use rocksdb::{DB, Options};

    let tmp = tempfile::TempDir::new().unwrap();
    let db_path = tmp.path().join("rocksdb");
    let (_storage, state_machine) = RocksDBUnifiedEngine::open(&db_path).unwrap();

    // One successful write while the DB is still writable.
    let seed = [insert_with_ttl(b"k", b"v", 0, 1)];
    state_machine.apply_chunk(&seed).await.unwrap();

    // Re-open the same path read-only with the state-machine column families.
    let opts = Options::default();
    let column_families = [super::STATE_MACHINE_CF, super::STATE_MACHINE_META_CF];
    let read_only_db =
        DB::open_cf_for_read_only(&opts, &db_path, column_families, false).unwrap();
    state_machine.swap_db_for_test(read_only_db);

    let follow_up = [insert_with_ttl(b"k2", b"v2", 0, 2)];
    let outcome = state_machine.apply_chunk(&follow_up).await;
    assert!(
        outcome.is_err(),
        "apply_chunk should return Err when DB is read-only"
    );
}
#[tokio::test]
async fn test_snapshot_join_error_reports_panic() {
use super::RocksDBStateMachine;
let handle = tokio::task::spawn_blocking(|| -> Result<(), crate::Error> {
panic!("intentional test panic");
});
let join_err = handle.await.unwrap_err();
let storage_err = RocksDBStateMachine::map_snapshot_join_error(join_err);
assert!(
storage_err.to_string().contains("panicked"),
"expected 'panicked' in error, got: {storage_err}"
);
}
#[tokio::test]
async fn test_snapshot_join_error_reports_cancellation() {
use super::RocksDBStateMachine;
let handle = tokio::task::spawn_blocking(|| -> Result<(), crate::Error> {
std::thread::sleep(std::time::Duration::from_secs(60));
Ok(())
});
handle.abort();
let join_err = handle.await.unwrap_err();
let storage_err = RocksDBStateMachine::map_snapshot_join_error(join_err);
assert!(
storage_err.to_string().contains("cancelled"),
"expected 'cancelled' in error, got: {storage_err}"
);
}