#![allow(clippy::field_reassign_with_default)]
#![allow(clippy::uninlined_format_args)]
mod lease_tests {
use std::thread::sleep;
use std::time::Duration;
use bytes::Bytes;
use d_engine_core::Lease;
use crate::storage::DefaultLease;
#[test]
fn test_register_and_get_expired() {
let config = d_engine_core::config::LeaseConfig::default();
let manager = DefaultLease::new(config);
manager.register(Bytes::from("key1"), 1);
manager.register(Bytes::from("key2"), 1);
assert_eq!(manager.len(), 2);
let expired = manager.get_expired_keys(std::time::SystemTime::now());
assert_eq!(expired.len(), 0);
sleep(Duration::from_secs(2));
let expired = manager.get_expired_keys(std::time::SystemTime::now());
assert_eq!(expired.len(), 2);
assert_eq!(manager.len(), 0);
}
#[test]
fn test_unregister() {
let config = d_engine_core::config::LeaseConfig::default();
let manager = DefaultLease::new(config);
manager.register(Bytes::from("key1"), 10);
assert_eq!(manager.len(), 1);
manager.unregister(b"key1");
assert_eq!(manager.len(), 0);
}
#[test]
fn test_update_ttl() {
let config = d_engine_core::config::LeaseConfig::default();
let manager = DefaultLease::new(config);
manager.register(Bytes::from("key1"), 10);
manager.register(Bytes::from("key1"), 20);
assert_eq!(manager.len(), 1);
}
#[test]
fn test_snapshot_roundtrip() {
let config = d_engine_core::config::LeaseConfig::default();
let manager = DefaultLease::new(config.clone());
manager.register(Bytes::from("key1"), 3600);
manager.register(Bytes::from("key2"), 7200);
let snapshot = manager.to_snapshot();
let restored = DefaultLease::from_snapshot(&snapshot, config);
assert_eq!(restored.len(), 2);
}
#[test]
fn test_snapshot_filters_expired() {
let config = d_engine_core::config::LeaseConfig::default();
let manager = DefaultLease::new(config.clone());
manager.register(Bytes::from("key1"), 1);
manager.register(Bytes::from("key2"), 3600);
sleep(Duration::from_secs(2));
let snapshot = manager.to_snapshot();
let restored = DefaultLease::from_snapshot(&snapshot, config);
assert_eq!(restored.len(), 1);
}
}
mod file_state_machine_tests {
use std::time::Duration;
use bytes::Bytes;
use d_engine_core::StateMachine;
use d_engine_proto::client::WriteCommand;
use d_engine_proto::client::write_command::Insert;
use d_engine_proto::client::write_command::Operation;
use d_engine_proto::common::Entry;
use d_engine_proto::common::EntryPayload;
use d_engine_proto::common::entry_payload::Payload;
use prost::Message;
use tempfile::TempDir;
use tokio::time::sleep;
use crate::storage::DefaultLease;
use crate::storage::FileStateMachine;
async fn create_file_state_machine_with_lease(
path: std::path::PathBuf,
lease_config: d_engine_core::config::LeaseConfig,
) -> FileStateMachine {
let mut sm = FileStateMachine::new(path).await.unwrap();
let lease = std::sync::Arc::new(DefaultLease::new(lease_config));
sm.set_lease(lease);
sm.load_lease_data().await.unwrap();
sm
}
fn create_insert_entry(
index: u64,
term: u64,
key: &[u8],
value: &[u8],
ttl_secs: u64,
) -> Entry {
let insert = Insert {
key: Bytes::from(key.to_vec()),
value: Bytes::from(value.to_vec()),
ttl_secs,
};
let write_cmd = WriteCommand {
operation: Some(Operation::Insert(insert)),
};
let payload = Payload::Command(write_cmd.encode_to_vec().into());
Entry {
index,
term,
payload: Some(EntryPayload {
payload: Some(payload),
}),
}
}
#[tokio::test]
async fn test_ttl_expiration_after_apply() {
let temp_dir = TempDir::new().unwrap();
let lease_config = d_engine_core::config::LeaseConfig {
enabled: true,
interval_ms: 1000,
max_cleanup_duration_ms: 1,
};
let sm =
create_file_state_machine_with_lease(temp_dir.path().to_path_buf(), lease_config).await;
let entry = create_insert_entry(1, 1, b"ttl_key", b"ttl_value", 2);
sm.apply_chunk(vec![entry]).await.unwrap();
let value = sm.get(b"ttl_key").unwrap();
assert_eq!(value, Some(Bytes::from("ttl_value")));
sleep(Duration::from_secs(3)).await;
sm.lease_background_cleanup().await.unwrap();
let value = sm.get(b"ttl_key").unwrap();
assert_eq!(value, None);
}
#[tokio::test]
async fn test_ttl_snapshot_persistence() {
let temp_dir = TempDir::new().unwrap();
let lease_config = d_engine_core::config::LeaseConfig {
enabled: true,
interval_ms: 1000,
max_cleanup_duration_ms: 1,
};
let sm = create_file_state_machine_with_lease(
temp_dir.path().to_path_buf(),
lease_config.clone(),
)
.await;
let entry = create_insert_entry(1, 1, b"persistent_key", b"persistent_value", 3600);
sm.apply_chunk(vec![entry]).await.unwrap();
let snapshot_dir = temp_dir.path().join("snapshot");
sm.generate_snapshot_data(
snapshot_dir.clone(),
d_engine_proto::common::LogId { index: 1, term: 1 },
)
.await
.unwrap();
let temp_dir2 = TempDir::new().unwrap();
let sm2 =
create_file_state_machine_with_lease(temp_dir2.path().to_path_buf(), lease_config)
.await;
sm2.apply_snapshot_from_file(
&d_engine_proto::server::storage::SnapshotMetadata {
last_included: Some(d_engine_proto::common::LogId { index: 1, term: 1 }),
checksum: Bytes::new(),
},
snapshot_dir,
)
.await
.unwrap();
let value = sm2.get(b"persistent_key").unwrap();
assert_eq!(value, Some(Bytes::from("persistent_value")));
}
#[tokio::test]
async fn test_file_state_machine_ttl_persistence_across_restart() {
let temp_dir = TempDir::new().unwrap();
let state_machine_path = temp_dir.path().to_path_buf();
let lease_config = d_engine_core::config::LeaseConfig {
enabled: true,
interval_ms: 1000,
max_cleanup_duration_ms: 1,
};
{
let sm = create_file_state_machine_with_lease(
state_machine_path.clone(),
lease_config.clone(),
)
.await;
let entry1 = create_insert_entry(1, 1, b"short_ttl_key", b"value1", 2);
let entry2 = create_insert_entry(2, 1, b"long_ttl_key", b"value2", 3600);
let entry3 = create_insert_entry(3, 1, b"no_ttl_key", b"value3", 0);
sm.apply_chunk(vec![entry1, entry2, entry3]).await.unwrap();
assert_eq!(
sm.get(b"short_ttl_key").unwrap(),
Some(Bytes::from("value1"))
);
assert_eq!(
sm.get(b"long_ttl_key").unwrap(),
Some(Bytes::from("value2"))
);
assert_eq!(sm.get(b"no_ttl_key").unwrap(), Some(Bytes::from("value3")));
sm.stop().unwrap();
}
{
let sm = create_file_state_machine_with_lease(state_machine_path.clone(), lease_config)
.await;
assert_eq!(
sm.get(b"short_ttl_key").unwrap(),
Some(Bytes::from("value1"))
);
assert_eq!(
sm.get(b"long_ttl_key").unwrap(),
Some(Bytes::from("value2"))
);
assert_eq!(sm.get(b"no_ttl_key").unwrap(), Some(Bytes::from("value3")));
sleep(Duration::from_secs(3)).await;
sm.lease_background_cleanup().await.unwrap();
assert_eq!(sm.get(b"short_ttl_key").unwrap(), None);
assert_eq!(
sm.get(b"long_ttl_key").unwrap(),
Some(Bytes::from("value2"))
);
assert_eq!(sm.get(b"no_ttl_key").unwrap(), Some(Bytes::from("value3")));
}
}
#[tokio::test]
async fn test_ttl_update_cancels_previous() {
let temp_dir = TempDir::new().unwrap();
let lease_config = d_engine_core::config::LeaseConfig {
enabled: true,
interval_ms: 1000,
max_cleanup_duration_ms: 1,
};
let sm =
create_file_state_machine_with_lease(temp_dir.path().to_path_buf(), lease_config).await;
let entry1 = create_insert_entry(1, 1, b"update_key", b"value1", 2);
sm.apply_chunk(vec![entry1]).await.unwrap();
let entry2 = create_insert_entry(2, 1, b"update_key", b"value2", 10);
sm.apply_chunk(vec![entry2]).await.unwrap();
sleep(Duration::from_secs(3)).await;
sm.lease_background_cleanup().await.unwrap();
let value = sm.get(b"update_key").unwrap();
assert_eq!(value, Some(Bytes::from("value2")));
}
#[tokio::test]
async fn test_crash_safe_ttl_wal_replay_skips_expired() {
let temp_dir = TempDir::new().unwrap();
let state_machine_path = temp_dir.path().to_path_buf();
let lease_config = d_engine_core::config::LeaseConfig {
enabled: true,
interval_ms: 1000,
max_cleanup_duration_ms: 1,
};
{
let sm = create_file_state_machine_with_lease(
state_machine_path.clone(),
lease_config.clone(),
)
.await;
let entry1 = create_insert_entry(1, 1, b"expired_key", b"value1", 1);
let entry2 = create_insert_entry(2, 1, b"valid_key", b"value2", 3600);
let entry3 = create_insert_entry(3, 1, b"permanent_key", b"value3", 0);
sm.apply_chunk(vec![entry1, entry2, entry3]).await.unwrap();
assert_eq!(sm.get(b"expired_key").unwrap(), Some(Bytes::from("value1")));
assert_eq!(sm.get(b"valid_key").unwrap(), Some(Bytes::from("value2")));
assert_eq!(
sm.get(b"permanent_key").unwrap(),
Some(Bytes::from("value3"))
);
sleep(Duration::from_secs(2)).await;
sm.lease_background_cleanup().await.unwrap();
assert_eq!(
sm.get(b"expired_key").unwrap(),
None,
"Key should be expired after background cleanup"
);
let data_path = state_machine_path.join("state.data");
if data_path.exists() {
std::fs::remove_file(&data_path).unwrap();
}
let ttl_path = state_machine_path.join("ttl_state.bin");
if ttl_path.exists() {
std::fs::remove_file(&ttl_path).unwrap();
}
drop(sm);
}
{
let sm = create_file_state_machine_with_lease(state_machine_path.clone(), lease_config)
.await;
assert_eq!(
sm.get(b"expired_key").unwrap(),
None,
"Expired key should NOT be restored from WAL"
);
assert_eq!(
sm.get(b"valid_key").unwrap(),
Some(Bytes::from("value2")),
"Valid key should be restored from WAL"
);
assert_eq!(
sm.get(b"permanent_key").unwrap(),
Some(Bytes::from("value3")),
"Permanent key should be restored from WAL"
);
sm.stop().unwrap();
}
}
#[tokio::test]
async fn test_wal_replay_handles_incomplete_entries() {
let temp_dir = TempDir::new().unwrap();
let state_machine_path = temp_dir.path().to_path_buf();
let wal_path = state_machine_path.join("wal.log");
let mut wal_data = Vec::new();
wal_data.extend_from_slice(&1u64.to_be_bytes()); wal_data.extend_from_slice(&1u64.to_be_bytes()); wal_data.push(1u8); wal_data.extend_from_slice(&8u64.to_be_bytes()); wal_data.extend_from_slice(b"complete"); wal_data.extend_from_slice(&6u64.to_be_bytes()); wal_data.extend_from_slice(b"value1"); wal_data.extend_from_slice(&0u64.to_be_bytes());
wal_data.extend_from_slice(&2u64.to_be_bytes()); wal_data.extend_from_slice(&1u64.to_be_bytes()); wal_data.push(1u8); wal_data.extend_from_slice(&10u64.to_be_bytes()); wal_data.extend_from_slice(b"incomplete"); wal_data.extend_from_slice(&6u64.to_be_bytes()); wal_data.extend_from_slice(b"value2");
std::fs::write(&wal_path, wal_data).unwrap();
let sm = FileStateMachine::new(state_machine_path.clone()).await.unwrap();
let result = sm.get(b"complete").unwrap();
assert_eq!(result, Some(Bytes::from("value1")));
let result = sm.get(b"incomplete").unwrap();
assert_eq!(result, Some(Bytes::from("value2")));
}
#[tokio::test]
async fn test_wal_replay_all_expired_empty_state() {
let temp_dir = TempDir::new().unwrap();
let state_machine_path = temp_dir.path().to_path_buf();
let now = std::time::SystemTime::now();
let expired_time = now - std::time::Duration::from_secs(100);
let expire_at_secs = expired_time.duration_since(std::time::UNIX_EPOCH).unwrap().as_secs();
let wal_path = state_machine_path.join("wal.log");
let mut wal_data = Vec::new();
wal_data.extend_from_slice(&1u64.to_be_bytes()); wal_data.extend_from_slice(&1u64.to_be_bytes()); wal_data.push(1u8); wal_data.extend_from_slice(&4u64.to_be_bytes()); wal_data.extend_from_slice(b"key1"); wal_data.extend_from_slice(&6u64.to_be_bytes()); wal_data.extend_from_slice(b"value1"); wal_data.extend_from_slice(&expire_at_secs.to_be_bytes());
wal_data.extend_from_slice(&2u64.to_be_bytes()); wal_data.extend_from_slice(&1u64.to_be_bytes()); wal_data.push(1u8); wal_data.extend_from_slice(&4u64.to_be_bytes()); wal_data.extend_from_slice(b"key2"); wal_data.extend_from_slice(&6u64.to_be_bytes()); wal_data.extend_from_slice(b"value2"); wal_data.extend_from_slice(&expire_at_secs.to_be_bytes());
std::fs::write(&wal_path, wal_data).unwrap();
let sm = FileStateMachine::new(state_machine_path.clone()).await.unwrap();
assert_eq!(sm.get(b"key1").unwrap(), None);
assert_eq!(sm.get(b"key2").unwrap(), None);
}
#[tokio::test]
async fn test_wal_replay_mixed_expired_and_valid() {
let temp_dir = TempDir::new().unwrap();
let state_machine_path = temp_dir.path().to_path_buf();
let now = std::time::SystemTime::now();
let expired_time = now - std::time::Duration::from_secs(100);
let future_time = now + std::time::Duration::from_secs(3600);
let expire_at_expired =
expired_time.duration_since(std::time::UNIX_EPOCH).unwrap().as_secs();
let expire_at_future = future_time.duration_since(std::time::UNIX_EPOCH).unwrap().as_secs();
let wal_path = state_machine_path.join("wal.log");
let mut wal_data = Vec::new();
wal_data.extend_from_slice(&1u64.to_be_bytes()); wal_data.extend_from_slice(&1u64.to_be_bytes()); wal_data.push(1u8); wal_data.extend_from_slice(&11u64.to_be_bytes()); wal_data.extend_from_slice(b"expired_key"); wal_data.extend_from_slice(&6u64.to_be_bytes()); wal_data.extend_from_slice(b"value1"); wal_data.extend_from_slice(&expire_at_expired.to_be_bytes());
wal_data.extend_from_slice(&2u64.to_be_bytes()); wal_data.extend_from_slice(&1u64.to_be_bytes()); wal_data.push(1u8); wal_data.extend_from_slice(&9u64.to_be_bytes()); wal_data.extend_from_slice(b"valid_key"); wal_data.extend_from_slice(&6u64.to_be_bytes()); wal_data.extend_from_slice(b"value2"); wal_data.extend_from_slice(&expire_at_future.to_be_bytes());
wal_data.extend_from_slice(&3u64.to_be_bytes()); wal_data.extend_from_slice(&1u64.to_be_bytes()); wal_data.push(1u8); wal_data.extend_from_slice(&13u64.to_be_bytes()); wal_data.extend_from_slice(b"permanent_key"); wal_data.extend_from_slice(&6u64.to_be_bytes()); wal_data.extend_from_slice(b"value3"); wal_data.extend_from_slice(&0u64.to_be_bytes());
std::fs::write(&wal_path, wal_data).unwrap();
let sm = FileStateMachine::new(state_machine_path.clone()).await.unwrap();
assert_eq!(sm.get(b"expired_key").unwrap(), None);
assert_eq!(sm.get(b"valid_key").unwrap(), Some(Bytes::from("value2")));
assert_eq!(
sm.get(b"permanent_key").unwrap(),
Some(Bytes::from("value3"))
);
}
#[tokio::test]
async fn test_background_strategy_manual_cleanup() {
let temp_dir = TempDir::new().unwrap();
let lease_config = d_engine_core::config::LeaseConfig {
enabled: true,
interval_ms: 1000,
max_cleanup_duration_ms: 50,
};
let sm =
create_file_state_machine_with_lease(temp_dir.path().to_path_buf(), lease_config).await;
for i in 0..20 {
let key = format!("bg_key_{}", i);
let entry = create_insert_entry(i + 1, 1, key.as_bytes(), b"value", 1);
sm.apply_chunk(vec![entry]).await.unwrap();
}
for i in 0..20 {
let key = format!("bg_key_{}", i);
assert_eq!(sm.get(key.as_bytes()).unwrap(), Some(Bytes::from("value")));
}
tokio::time::sleep(Duration::from_secs(2)).await;
for i in 0..20 {
let key = format!("bg_key_{}", i);
assert_eq!(sm.get(key.as_bytes()).unwrap(), Some(Bytes::from("value")));
}
let cleaned = sm.lease_background_cleanup().await.unwrap();
assert_eq!(
cleaned.len(),
20,
"Background cleanup should remove all 20 expired keys"
);
for i in 0..20 {
let key = format!("bg_key_{}", i);
assert_eq!(sm.get(key.as_bytes()).unwrap(), None);
}
}
#[tokio::test]
async fn test_background_cleanup_respects_max_duration() {
let temp_dir = TempDir::new().unwrap();
let lease_config = d_engine_core::config::LeaseConfig {
enabled: true,
interval_ms: 1000,
max_cleanup_duration_ms: 1, };
let sm =
create_file_state_machine_with_lease(temp_dir.path().to_path_buf(), lease_config).await;
for i in 0..100 {
let key = format!("duration_key_{}", i);
let entry = create_insert_entry(i + 1, 1, key.as_bytes(), b"value", 1);
sm.apply_chunk(vec![entry]).await.unwrap();
}
tokio::time::sleep(Duration::from_secs(2)).await;
let start = std::time::Instant::now();
let cleaned = sm.lease_background_cleanup().await.unwrap();
let duration = start.elapsed();
assert!(
!cleaned.is_empty(),
"Background cleanup should clean at least some expired keys"
);
assert!(
duration < Duration::from_millis(100),
"Cleanup should respect max duration (took {:?})",
duration
);
}
}
#[cfg(all(test, feature = "rocksdb"))]
mod rocksdb_state_machine_tests {
use std::time::Duration;
use bytes::Bytes;
use d_engine_core::StateMachine;
use d_engine_proto::client::WriteCommand;
use d_engine_proto::client::write_command::Delete;
use d_engine_proto::client::write_command::Insert;
use d_engine_proto::client::write_command::Operation;
use d_engine_proto::common::Entry;
use d_engine_proto::common::EntryPayload;
use d_engine_proto::common::entry_payload::Payload;
use prost::Message;
use tempfile::TempDir;
use tokio::time::sleep;
use crate::storage::RocksDBStateMachine;
async fn create_rocksdb_state_machine_with_lease(
path: std::path::PathBuf,
lease_config: d_engine_core::config::LeaseConfig,
) -> RocksDBStateMachine {
let mut sm = RocksDBStateMachine::new(path).unwrap();
let lease = std::sync::Arc::new(crate::storage::DefaultLease::new(lease_config));
sm.set_lease(lease);
sm.load_lease_data().await.unwrap();
sm
}
fn create_insert_entry(
index: u64,
term: u64,
key: &[u8],
value: &[u8],
ttl_secs: u64,
) -> Entry {
let insert = Insert {
key: Bytes::from(key.to_vec()),
value: Bytes::from(value.to_vec()),
ttl_secs,
};
let write_cmd = WriteCommand {
operation: Some(Operation::Insert(insert)),
};
let payload = Payload::Command(write_cmd.encode_to_vec().into());
Entry {
index,
term,
payload: Some(EntryPayload {
payload: Some(payload),
}),
}
}
fn create_delete_entry(
index: u64,
term: u64,
key: &[u8],
) -> Entry {
let delete = Delete {
key: Bytes::from(key.to_vec()),
};
let write_cmd = WriteCommand {
operation: Some(Operation::Delete(delete)),
};
let payload = Payload::Command(write_cmd.encode_to_vec().into());
Entry {
index,
term,
payload: Some(EntryPayload {
payload: Some(payload),
}),
}
}
#[tokio::test]
async fn test_rocksdb_ttl_expiration_after_apply() {
let temp_dir = TempDir::new().unwrap();
let ttl_config = d_engine_core::config::LeaseConfig {
enabled: true,
interval_ms: 1000,
max_cleanup_duration_ms: 1,
};
let sm =
create_rocksdb_state_machine_with_lease(temp_dir.path().join("rocksdb"), ttl_config)
.await;
let entry = create_insert_entry(1, 1, b"ttl_key", b"ttl_value", 2);
sm.apply_chunk(vec![entry]).await.unwrap();
let value = sm.get(b"ttl_key").unwrap();
assert_eq!(value, Some(Bytes::from("ttl_value")));
sleep(Duration::from_secs(3)).await;
sm.lease_background_cleanup().await.unwrap();
let value = sm.get(b"ttl_key").unwrap();
assert_eq!(value, None);
}
#[tokio::test]
#[ignore] async fn test_rocksdb_ttl_snapshot_persistence() {
let temp_dir = TempDir::new().unwrap();
let ttl_config = d_engine_core::config::LeaseConfig {
enabled: true,
interval_ms: 1000,
max_cleanup_duration_ms: 1,
};
let sm = create_rocksdb_state_machine_with_lease(
temp_dir.path().join("rocksdb"),
ttl_config.clone(),
)
.await;
let entry = create_insert_entry(1, 1, b"persistent_key", b"persistent_value", 3600);
sm.apply_chunk(vec![entry]).await.unwrap();
let snapshot_dir = temp_dir.path().join("snapshot");
sm.generate_snapshot_data(
snapshot_dir.clone(),
d_engine_proto::common::LogId { index: 1, term: 1 },
)
.await
.unwrap();
let ttl_file = snapshot_dir.join("ttl_state.bin");
assert!(
ttl_file.exists(),
"TTL state file should be created in snapshot"
);
let temp_dir2 = TempDir::new().unwrap();
let sm2 =
create_rocksdb_state_machine_with_lease(temp_dir2.path().join("rocksdb"), ttl_config)
.await;
sm2.apply_snapshot_from_file(
&d_engine_proto::server::storage::SnapshotMetadata {
last_included: Some(d_engine_proto::common::LogId { index: 1, term: 1 }),
checksum: Bytes::new(),
},
snapshot_dir,
)
.await
.unwrap();
}
#[tokio::test]
async fn test_rocksdb_ttl_update_cancels_previous() {
let temp_dir = TempDir::new().unwrap();
let ttl_config = d_engine_core::config::LeaseConfig {
enabled: true,
interval_ms: 1000,
max_cleanup_duration_ms: 1,
};
let sm =
create_rocksdb_state_machine_with_lease(temp_dir.path().join("rocksdb"), ttl_config)
.await;
let entry1 = create_insert_entry(1, 1, b"update_key", b"value1", 2);
sm.apply_chunk(vec![entry1]).await.unwrap();
let entry2 = create_insert_entry(2, 1, b"update_key", b"value2", 10);
sm.apply_chunk(vec![entry2]).await.unwrap();
sleep(Duration::from_secs(3)).await;
let entry3 = create_insert_entry(3, 1, b"trigger", b"trigger", 0);
sm.apply_chunk(vec![entry3]).await.unwrap();
let value = sm.get(b"update_key").unwrap();
assert_eq!(value, Some(Bytes::from("value2")));
}
#[tokio::test]
async fn test_rocksdb_ttl_delete_unregisters() {
let temp_dir = TempDir::new().unwrap();
let ttl_config = d_engine_core::config::LeaseConfig {
enabled: true,
interval_ms: 1000,
max_cleanup_duration_ms: 1,
};
let sm =
create_rocksdb_state_machine_with_lease(temp_dir.path().join("rocksdb"), ttl_config)
.await;
let entry1 = create_insert_entry(1, 1, b"delete_key", b"delete_value", 3600);
sm.apply_chunk(vec![entry1]).await.unwrap();
let entry2 = create_delete_entry(2, 1, b"delete_key");
sm.apply_chunk(vec![entry2]).await.unwrap();
let value = sm.get(b"delete_key").unwrap();
assert_eq!(value, None);
sleep(Duration::from_secs(2)).await;
let entry3 = create_insert_entry(3, 1, b"trigger", b"trigger", 0);
sm.apply_chunk(vec![entry3]).await.unwrap();
}
#[tokio::test]
async fn test_rocksdb_multiple_keys_with_different_ttls() {
let temp_dir = TempDir::new().unwrap();
let ttl_config = d_engine_core::config::LeaseConfig {
enabled: true,
interval_ms: 1000,
max_cleanup_duration_ms: 1,
};
let sm =
create_rocksdb_state_machine_with_lease(temp_dir.path().join("rocksdb"), ttl_config)
.await;
let entry1 = create_insert_entry(1, 1, b"key_1sec", b"value1", 1);
let entry2 = create_insert_entry(2, 1, b"key_5sec", b"value2", 5);
let entry3 = create_insert_entry(3, 1, b"key_no_ttl", b"value3", 0);
sm.apply_chunk(vec![entry1, entry2, entry3]).await.unwrap();
assert_eq!(sm.get(b"key_1sec").unwrap(), Some(Bytes::from("value1")));
assert_eq!(sm.get(b"key_5sec").unwrap(), Some(Bytes::from("value2")));
assert_eq!(sm.get(b"key_no_ttl").unwrap(), Some(Bytes::from("value3")));
sleep(Duration::from_secs(2)).await;
sm.lease_background_cleanup().await.unwrap();
assert_eq!(sm.get(b"key_1sec").unwrap(), None);
assert_eq!(sm.get(b"key_5sec").unwrap(), Some(Bytes::from("value2")));
assert_eq!(sm.get(b"key_no_ttl").unwrap(), Some(Bytes::from("value3")));
sleep(Duration::from_secs(4)).await;
sm.lease_background_cleanup().await.unwrap();
assert_eq!(sm.get(b"key_5sec").unwrap(), None);
assert_eq!(sm.get(b"key_no_ttl").unwrap(), Some(Bytes::from("value3")));
}
#[tokio::test]
async fn test_rocksdb_ttl_persistence_across_restart() {
let temp_dir = TempDir::new().unwrap();
let state_machine_path = temp_dir.path().join("rocksdb");
let ttl_config = d_engine_core::config::LeaseConfig {
enabled: true,
interval_ms: 1000,
max_cleanup_duration_ms: 1,
};
{
let sm = create_rocksdb_state_machine_with_lease(
state_machine_path.clone(),
ttl_config.clone(),
)
.await;
let entry1 = create_insert_entry(1, 1, b"short_ttl_key", b"value1", 2);
let entry2 = create_insert_entry(2, 1, b"long_ttl_key", b"value2", 3600);
let entry3 = create_insert_entry(3, 1, b"no_ttl_key", b"value3", 0);
sm.apply_chunk(vec![entry1, entry2, entry3]).await.unwrap();
assert_eq!(
sm.get(b"short_ttl_key").unwrap(),
Some(Bytes::from("value1"))
);
assert_eq!(
sm.get(b"long_ttl_key").unwrap(),
Some(Bytes::from("value2"))
);
assert_eq!(sm.get(b"no_ttl_key").unwrap(), Some(Bytes::from("value3")));
sm.stop().unwrap();
}
{
let sm =
create_rocksdb_state_machine_with_lease(state_machine_path.clone(), ttl_config)
.await;
assert_eq!(
sm.get(b"short_ttl_key").unwrap(),
Some(Bytes::from("value1"))
);
assert_eq!(
sm.get(b"long_ttl_key").unwrap(),
Some(Bytes::from("value2"))
);
assert_eq!(sm.get(b"no_ttl_key").unwrap(), Some(Bytes::from("value3")));
sleep(Duration::from_secs(3)).await;
sm.lease_background_cleanup().await.unwrap();
assert_eq!(sm.get(b"short_ttl_key").unwrap(), None);
assert_eq!(
sm.get(b"long_ttl_key").unwrap(),
Some(Bytes::from("value2"))
);
assert_eq!(sm.get(b"no_ttl_key").unwrap(), Some(Bytes::from("value3")));
}
}
#[tokio::test]
async fn test_rocksdb_reset_clears_ttl_manager() {
let temp_dir = TempDir::new().unwrap();
let ttl_config = d_engine_core::config::LeaseConfig {
enabled: true,
interval_ms: 1000,
max_cleanup_duration_ms: 1,
};
let sm =
create_rocksdb_state_machine_with_lease(temp_dir.path().join("rocksdb"), ttl_config)
.await;
let entry1 = create_insert_entry(1, 1, b"key1", b"value1", 3600);
let entry2 = create_insert_entry(2, 1, b"key2", b"value2", 7200);
sm.apply_chunk(vec![entry1, entry2]).await.unwrap();
sm.reset().await.unwrap();
assert_eq!(sm.get(b"key1").unwrap(), None);
assert_eq!(sm.get(b"key2").unwrap(), None);
assert_eq!(sm.len(), 0);
}
}