use std::sync::Arc;
use std::sync::Mutex;
use async_trait::async_trait;
use d_engine_core::Error;
use d_engine_core::Lease;
use d_engine_core::LogStore;
use d_engine_core::StorageEngine;
use d_engine_core::config::LeaseConfig;
use d_engine_core::state_machine_test::{StateMachineBuilder, StateMachineTestSuite};
use d_engine_core::storage_engine_test::{StorageEngineBuilder, StorageEngineTestSuite};
use tempfile::TempDir;
use super::RocksDBStateMachine;
use super::RocksDBStorageEngine;
use super::RocksDBUnifiedEngine;
use crate::StateMachine;
use crate::storage::DefaultLease;
struct UnifiedStorageBuilder {
temp_dir: TempDir,
sm: Mutex<Option<RocksDBStateMachine>>,
}
impl UnifiedStorageBuilder {
fn new() -> Self {
Self {
temp_dir: TempDir::new().expect("temp dir"),
sm: Mutex::new(None),
}
}
}
#[async_trait]
impl StorageEngineBuilder for UnifiedStorageBuilder {
type Engine = RocksDBStorageEngine;
async fn build(&self) -> Result<Arc<Self::Engine>, Error> {
{
*self.sm.lock().unwrap() = None;
}
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let (storage, sm) = RocksDBUnifiedEngine::open(self.temp_dir.path())?;
*self.sm.lock().unwrap() = Some(sm);
storage.log_store().flush_async().await?;
Ok(Arc::new(storage))
}
async fn cleanup(&self) -> Result<(), Error> {
*self.sm.lock().unwrap() = None;
let delay = if std::env::var("CI").is_ok() {
std::time::Duration::from_millis(500)
} else {
std::time::Duration::from_millis(100)
};
tokio::time::sleep(delay).await;
Ok(())
}
}
#[tokio::test]
async fn test_unified_storage_engine_suite() {
let builder = UnifiedStorageBuilder::new();
StorageEngineTestSuite::run_all_tests(builder)
.await
.expect("unified storage engine should pass all tests");
}
struct UnifiedStateMachineBuilder {
temp_dir: TempDir,
storage: Mutex<Option<RocksDBStorageEngine>>,
}
impl UnifiedStateMachineBuilder {
fn new() -> Self {
Self {
temp_dir: TempDir::new().expect("temp dir"),
storage: Mutex::new(None),
}
}
}
#[async_trait]
impl StateMachineBuilder for UnifiedStateMachineBuilder {
async fn build(&self) -> Result<Arc<dyn StateMachine>, Error> {
{
*self.storage.lock().unwrap() = None;
}
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let (storage, sm) = RocksDBUnifiedEngine::open(self.temp_dir.path())?;
*self.storage.lock().unwrap() = Some(storage);
Ok(Arc::new(sm))
}
async fn cleanup(&self) -> Result<(), Error> {
*self.storage.lock().unwrap() = None;
let delay = if std::env::var("CI").is_ok() {
std::time::Duration::from_millis(500)
} else {
std::time::Duration::from_millis(100)
};
tokio::time::sleep(delay).await;
Ok(())
}
}
#[tokio::test]
async fn test_unified_state_machine_suite() {
let builder = UnifiedStateMachineBuilder::new();
StateMachineTestSuite::run_all_tests(builder)
.await
.expect("unified state machine should pass all tests");
}
#[test]
fn test_open_creates_db_at_fresh_path() {
let dir = TempDir::new().expect("temp dir");
let result = RocksDBUnifiedEngine::open(dir.path());
assert!(result.is_ok(), "open() must succeed on a fresh path");
}
#[test]
fn test_reopen_after_drop_succeeds() {
let dir = TempDir::new().expect("temp dir");
{
let _handles = RocksDBUnifiedEngine::open(dir.path()).expect("first open");
}
let result = RocksDBUnifiedEngine::open(dir.path());
assert!(
result.is_ok(),
"reopen after all handles dropped must succeed"
);
}
#[test]
fn test_concurrent_open_same_path_fails() {
let dir = TempDir::new().expect("temp dir");
let _first = RocksDBUnifiedEngine::open(dir.path()).expect("first open");
let second = RocksDBUnifiedEngine::open(dir.path());
assert!(
second.is_err(),
"second open on a live DB must fail with lock error"
);
}
fn make_lease() -> Arc<DefaultLease> {
Arc::new(DefaultLease::new(LeaseConfig {
enabled: true,
cleanup_interval_ms: 1000,
max_cleanup_duration_ms: 10,
}))
}
#[tokio::test]
async fn test_load_lease_data_without_lease_is_noop() {
let dir = TempDir::new().expect("temp dir");
let (_storage, sm) = RocksDBUnifiedEngine::open(dir.path()).expect("open");
let result = sm.load_lease_data().await;
assert!(
result.is_ok(),
"load_lease_data with no lease must not fail"
);
}
#[tokio::test]
async fn test_persist_and_reload_lease_round_trip() {
let dir = TempDir::new().expect("temp dir");
let lease = make_lease();
{
let (_storage, mut sm) = RocksDBUnifiedEngine::open(dir.path()).expect("open");
sm.set_lease(Arc::clone(&lease));
lease.register(bytes::Bytes::from("persistent_key"), 3600);
sm.stop().expect("stop");
}
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let new_lease = make_lease();
let (_storage2, mut sm2) = RocksDBUnifiedEngine::open(dir.path()).expect("reopen");
sm2.set_lease(Arc::clone(&new_lease));
sm2.load_lease_data().await.expect("load_lease_data");
assert!(
new_lease.has_lease_keys(),
"reloaded lease must contain the persisted key"
);
assert_eq!(new_lease.len(), 1, "exactly one key should be restored");
}
#[tokio::test]
async fn test_load_lease_data_with_empty_db_succeeds() {
let dir = TempDir::new().expect("temp dir");
let (_storage, mut sm) = RocksDBUnifiedEngine::open(dir.path()).expect("open");
sm.set_lease(make_lease());
let result = sm.load_lease_data().await;
assert!(result.is_ok(), "load_lease_data on empty DB must succeed");
}