use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::time::SystemTime;
use arc_swap::ArcSwap;
use bytes::Bytes;
use d_engine_core::ApplyEntry;
use d_engine_core::ApplyResult;
use d_engine_core::Command;
use d_engine_core::Error;
use d_engine_core::Lease;
use d_engine_core::ScanResult;
use d_engine_core::StateMachine;
use d_engine_core::StorageError;
use d_engine_proto::common::LogId;
use d_engine_proto::server::storage::SnapshotMetadata;
use parking_lot::RwLock;
use std::path::Path;
use async_trait::async_trait;
use rocksdb::Cache;
use rocksdb::ColumnFamilyDescriptor;
use rocksdb::DB;
use rocksdb::Direction;
use rocksdb::ExportImportFilesMetaData;
use rocksdb::ImportColumnFamilyOptions;
use rocksdb::IteratorMode;
use rocksdb::LiveFile;
use rocksdb::Options;
use rocksdb::ReadOptions;
use rocksdb::WriteBatch;
use rocksdb::WriteBatchWithIndex;
use serde::Deserialize;
use serde::Serialize;
use tracing::debug;
use tracing::error;
use tracing::info;
use tracing::instrument;
use tracing::warn;
use crate::storage::DefaultLease;
use super::STATE_MACHINE_CF;
use super::STATE_MACHINE_META_CF;
// Keys used inside the state machine metadata column family.

/// Key holding the last applied log index, stored as a big-endian `u64`.
const LAST_APPLIED_INDEX_KEY: &[u8] = b"last_applied_index";
/// Key holding the term of the last applied log entry, stored as a big-endian `u64`.
const LAST_APPLIED_TERM_KEY: &[u8] = b"last_applied_term";
/// Key holding the bincode-serialized `SnapshotMetadata` of the latest snapshot.
const SNAPSHOT_METADATA_KEY: &[u8] = b"snapshot_metadata";
/// Key holding the serialized lease (TTL) state produced by `lease.to_snapshot()`.
const TTL_STATE_KEY: &[u8] = b"ttl_state";
/// Serializable mirror of `ExportImportFilesMetaData`.
///
/// Written next to an exported column family by `save_cf_export_metadata` and read
/// back by `load_cf_export_metadata` when a snapshot is restored. It is encoded with
/// bincode, so the field order is part of the on-disk format.
#[derive(Serialize, Deserialize)]
struct CfExportMeta {
// Comparator name reported by the exporting database.
db_comparator_name: String,
// One record per exported file.
files: Vec<CfExportFile>,
}
/// Serializable mirror of a single `LiveFile` entry of a column family export.
///
/// The `directory` field of `LiveFile` is intentionally absent: on load it is filled
/// in from the directory the snapshot actually resides in. The struct is encoded with
/// bincode, so the field order is part of the on-disk format.
#[derive(Serialize, Deserialize)]
struct CfExportFile {
column_family_name: String,
name: String,
size: usize,
level: i32,
start_key: Option<Vec<u8>>,
end_key: Option<Vec<u8>>,
smallest_seqno: u64,
largest_seqno: u64,
num_entries: u64,
num_deletions: u64,
}
/// `StateMachine` implementation that keeps its data in RocksDB.
///
/// Key/value data lives in `STATE_MACHINE_CF`; bookkeeping (last applied log id,
/// snapshot metadata, TTL state) lives in `STATE_MACHINE_META_CF`.
#[derive(Debug)]
pub struct RocksDBStateMachine {
// Database handle; held in an `ArcSwap` so the handle can be replaced
// (see `swap_db_for_test`).
db: Arc<ArcSwap<DB>>,
// `false` after `stop()` and while a snapshot is being applied; `get` rejects
// reads while it is `false`.
is_serving: AtomicBool,
// In-memory copy of the last applied log index; written to disk by
// `persist_state_machine_metadata`.
last_applied_index: AtomicU64,
// In-memory copy of the last applied term; persisted together with the index.
last_applied_term: AtomicU64,
// Metadata of the most recent snapshot, if any.
last_snapshot_metadata: RwLock<Option<SnapshotMetadata>>,
// Lease (TTL) manager; `None` until `set_lease` is called.
lease: Option<Arc<DefaultLease>>,
// Set to `true` by `set_lease`; `apply_chunk` checks it before accepting
// inserts that carry a TTL.
lease_enabled: bool,
}
/// Computes an exclusive upper bound for all keys that start with `prefix`.
///
/// Trailing `0xFF` bytes are discarded and the last remaining byte is incremented.
/// Returns `None` when `prefix` is empty or consists solely of `0xFF` bytes, in which
/// case no finite upper bound exists.
fn prefix_successor(prefix: &[u8]) -> Option<Vec<u8>> {
    // Index of the right-most byte that can be incremented without overflowing.
    let pivot = prefix.iter().rposition(|&byte| byte != 0xFF)?;
    let mut bound = prefix[..=pivot].to_vec();
    bound[pivot] += 1;
    Some(bound)
}
impl RocksDBStateMachine {
/// Opens the RocksDB database at `path` with the state machine and metadata column
/// families and builds a state machine on top of it.
///
/// The last applied log id and the snapshot metadata are loaded from the metadata
/// column family. The lease feature starts disabled; see [`Self::set_lease`].
///
/// # Errors
/// Returns a `StorageError::DbError` if the database cannot be opened, or any error
/// raised while loading the persisted metadata.
pub fn new<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
    let db_opts = super::base_db_options();
    // Both column family option sets are built from the same 128 MiB LRU cache.
    let cache = Cache::new_lru_cache(128 * 1024 * 1024);
    let descriptors = vec![
        ColumnFamilyDescriptor::new(STATE_MACHINE_CF, super::sm_cf_options(&cache)),
        ColumnFamilyDescriptor::new(STATE_MACHINE_META_CF, super::meta_cf_options(&cache)),
    ];
    let db = DB::open_cf_descriptors(&db_opts, path, descriptors)
        .map_err(|e| StorageError::DbError(e.to_string()))?;
    // Loading the metadata and assembling the struct is exactly what
    // `from_shared_db` does for an already-open handle.
    Self::from_shared_db(Arc::new(db))
}
/// Builds a state machine on top of an already-open database handle.
///
/// The last applied log id and the snapshot metadata are read from the metadata
/// column family of `db`. The lease feature starts disabled.
///
/// # Errors
/// Propagates any error raised while loading the persisted metadata.
pub(super) fn from_shared_db(db: Arc<DB>) -> Result<Self, Error> {
    let (applied_index, applied_term) = Self::load_state_machine_metadata(&db)?;
    let snapshot_meta = Self::load_snapshot_metadata(&db)?;
    let state_machine = Self {
        db: Arc::new(ArcSwap::new(db)),
        is_serving: AtomicBool::new(true),
        last_applied_index: AtomicU64::new(applied_index),
        last_applied_term: AtomicU64::new(applied_term),
        last_snapshot_metadata: RwLock::new(snapshot_meta),
        lease: None,
        lease_enabled: false,
    };
    Ok(state_machine)
}
/// Installs the lease (TTL) manager and marks the lease feature as enabled.
pub fn set_lease(&mut self, lease: Arc<DefaultLease>) {
    self.lease = Some(lease);
    self.lease_enabled = true;
}
/// Test-only hook that replaces the underlying database handle with `new_db`.
#[cfg(test)]
pub(super) fn swap_db_for_test(&self, new_db: DB) {
    let replacement = Arc::new(new_db);
    self.db.store(replacement);
}
fn load_state_machine_metadata(db: &Arc<DB>) -> Result<(u64, u64), Error> {
let cf = db
.cf_handle(STATE_MACHINE_META_CF)
.ok_or_else(|| StorageError::DbError("State machine meta CF not found".to_string()))?;
let index = match db
.get_cf(&cf, LAST_APPLIED_INDEX_KEY)
.map_err(|e| StorageError::DbError(e.to_string()))?
{
Some(bytes) if bytes.len() == 8 => u64::from_be_bytes([
bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
]),
_ => 0,
};
let term = match db
.get_cf(&cf, LAST_APPLIED_TERM_KEY)
.map_err(|e| StorageError::DbError(e.to_string()))?
{
Some(bytes) if bytes.len() == 8 => u64::from_be_bytes([
bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
]),
_ => 0,
};
Ok((index, term))
}
/// Reads the persisted snapshot metadata, if any.
///
/// Returns `Ok(None)` when no metadata has been stored.
///
/// # Errors
/// Returns a `StorageError::DbError` if the metadata column family is missing or the
/// read fails, and a `StorageError::BincodeError` if the stored bytes cannot be decoded.
fn load_snapshot_metadata(db: &Arc<DB>) -> Result<Option<SnapshotMetadata>, Error> {
    let cf = db
        .cf_handle(STATE_MACHINE_META_CF)
        .ok_or_else(|| StorageError::DbError("State machine meta CF not found".to_string()))?;
    let stored = db
        .get_cf(&cf, SNAPSHOT_METADATA_KEY)
        .map_err(|e| StorageError::DbError(e.to_string()))?;
    let Some(raw) = stored else {
        return Ok(None);
    };
    let metadata = bincode::deserialize(&raw).map_err(StorageError::BincodeError)?;
    Ok(Some(metadata))
}
fn persist_state_machine_metadata(&self) -> Result<(), Error> {
let db = self.db.load();
let cf = db
.cf_handle(STATE_MACHINE_META_CF)
.ok_or_else(|| StorageError::DbError("State machine meta CF not found".to_string()))?;
let index = self.last_applied_index.load(Ordering::SeqCst);
let term = self.last_applied_term.load(Ordering::SeqCst);
db.put_cf(&cf, LAST_APPLIED_INDEX_KEY, index.to_be_bytes())
.map_err(|e| StorageError::DbError(e.to_string()))?;
db.put_cf(&cf, LAST_APPLIED_TERM_KEY, term.to_be_bytes())
.map_err(|e| StorageError::DbError(e.to_string()))?;
Ok(())
}
/// Makes the persisted snapshot metadata match the in-memory value.
///
/// When metadata is present it is bincode-serialized and stored. When it is `None`
/// (for example after `reset`), the stored record is deleted; otherwise a stale
/// snapshot record would be loaded again the next time the database is opened.
///
/// # Errors
/// Returns a `StorageError::DbError` if the metadata column family is missing or the
/// write/delete fails, and a `StorageError::BincodeError` if serialization fails.
fn persist_snapshot_metadata(&self) -> Result<(), Error> {
    let db = self.db.load();
    let cf = db
        .cf_handle(STATE_MACHINE_META_CF)
        .ok_or_else(|| StorageError::DbError("State machine meta CF not found".to_string()))?;
    // Clone out of the lock so it is not held during the RocksDB call.
    let current = self.last_snapshot_metadata.read().clone();
    match current {
        Some(metadata) => {
            let bytes = bincode::serialize(&metadata).map_err(StorageError::BincodeError)?;
            db.put_cf(&cf, SNAPSHOT_METADATA_KEY, bytes)
                .map_err(|e| StorageError::DbError(e.to_string()))?;
        }
        None => {
            db.delete_cf(&cf, SNAPSHOT_METADATA_KEY)
                .map_err(|e| StorageError::DbError(e.to_string()))?;
        }
    }
    Ok(())
}
fn persist_ttl_metadata(&self) -> Result<(), Error> {
if let Some(ref lease) = self.lease {
let db = self.db.load();
let cf = db.cf_handle(STATE_MACHINE_META_CF).ok_or_else(|| {
StorageError::DbError("State machine meta CF not found".to_string())
})?;
let ttl_snapshot = lease.to_snapshot();
db.put_cf(&cf, TTL_STATE_KEY, ttl_snapshot)
.map_err(|e| StorageError::DbError(e.to_string()))?;
debug!("Persisted TTL state to RocksDB");
}
Ok(())
}
pub async fn load_lease_data(&self) -> Result<(), Error> {
let Some(ref lease) = self.lease else {
return Ok(()); };
let db = self.db.load();
let cf = db
.cf_handle(STATE_MACHINE_META_CF)
.ok_or_else(|| StorageError::DbError("State machine meta CF not found".to_string()))?;
match db
.get_cf(&cf, TTL_STATE_KEY)
.map_err(|e| StorageError::DbError(e.to_string()))?
{
Some(ttl_data) => {
lease.reload(&ttl_data)?;
debug!("Loaded TTL state from RocksDB: {} active TTLs", lease.len());
}
None => {
debug!("No TTL state found in RocksDB");
}
}
Ok(())
}
#[allow(dead_code)]
fn maybe_cleanup_expired(
&self,
max_duration_ms: u64,
) -> usize {
let start = std::time::Instant::now();
let now = SystemTime::now();
let mut deleted_count = 0;
if let Some(ref lease) = self.lease {
if !lease.has_lease_keys() {
return 0; }
if !lease.may_have_expired_keys(now) {
return 0; }
} else {
return 0; }
let db = self.db.load();
let cf = match db.cf_handle(STATE_MACHINE_CF) {
Some(cf) => cf,
None => {
error!("State machine CF not found during TTL cleanup");
return 0;
}
};
let max_duration = std::time::Duration::from_millis(max_duration_ms);
loop {
if start.elapsed() >= max_duration {
debug!(
"Piggyback cleanup time budget exceeded: deleted {} keys in {:?}",
deleted_count,
start.elapsed()
);
break;
}
let expired_keys = if let Some(ref lease) = self.lease {
lease.get_expired_keys(now)
} else {
vec![]
};
if expired_keys.is_empty() {
break; }
let mut batch = WriteBatch::default();
for key in expired_keys {
batch.delete_cf(&cf, &key);
deleted_count += 1;
}
if let Err(e) = db.write(&batch) {
error!("Failed to delete expired keys: {}", e);
break;
}
}
if deleted_count > 0 {
debug!(
"Piggyback cleanup: deleted {} expired keys in {:?}",
deleted_count,
start.elapsed()
);
}
deleted_count
}
fn apply_batch(
&self,
batch: WriteBatch,
) -> Result<(), Error> {
self.db.load().write(&batch).map_err(|e| StorageError::DbError(e.to_string()))?;
Ok(())
}
/// Replaces the state machine contents with the snapshot stored in `snapshot_dir`.
///
/// Steps, in order:
/// 1. re-create both column families from the exported files in `snapshot_dir`;
/// 2. if a lease manager is configured, reload it from `ttl_state.bin` (when present)
///    and persist the reloaded TTL state;
/// 3. record `metadata` as the latest snapshot and move the in-memory last applied
///    log id to `metadata.last_included`;
/// 4. mark the state machine as serving again.
///
/// # Errors
/// Propagates errors from the column family import, from reading `ttl_state.bin`,
/// from `lease.reload`, and from persisting the TTL state. On error `is_serving` is
/// left untouched by this function.
async fn restore_from_snapshot(
&self,
metadata: &SnapshotMetadata,
snapshot_dir: &std::path::Path,
) -> Result<(), Error> {
let db = self.db.load();
// Synchronous RocksDB work: drop and re-import both column families.
Self::restore_from_cf_export(&db, snapshot_dir)?;
info!("Snapshot restore complete");
if let Some(ref lease) = self.lease {
let ttl_path = snapshot_dir.join("ttl_state.bin");
if ttl_path.exists() {
let ttl_data = tokio::fs::read(&ttl_path).await?;
lease.reload(&ttl_data)?;
// Write the reloaded lease state into the freshly imported metadata CF.
self.persist_ttl_metadata()?;
} else {
warn!("No lease state found in snapshot");
}
}
*self.last_snapshot_metadata.write() = Some(metadata.clone());
if let Some(last_included) = &metadata.last_included {
// Only the in-memory counters are updated here.
self.update_last_applied(*last_included);
}
self.is_serving.store(true, Ordering::SeqCst);
info!("Snapshot applied successfully");
Ok(())
}
/// Re-creates the state machine and metadata column families from a snapshot export.
///
/// The export descriptors `sm_metadata.bin` and `sm_meta_metadata.bin` are loaded
/// from `snapshot_dir` first; the exported files are expected under `sm/` and
/// `sm_meta/`. Both column families are then dropped and re-created, either by
/// importing the exported files or, when an export lists no files, as empty
/// column families.
///
/// # Errors
/// Returns an error if a descriptor cannot be loaded or any drop/create/import call
/// fails.
fn restore_from_cf_export(
db: &DB,
snapshot_dir: &std::path::Path,
) -> Result<(), Error> {
// Load both descriptors before touching the database, so a missing or corrupt
// descriptor fails without modifying any column family.
let sm_meta = Self::load_cf_export_metadata(
&snapshot_dir.join("sm_metadata.bin"),
&snapshot_dir.join("sm"),
)?;
let sm_meta_meta = Self::load_cf_export_metadata(
&snapshot_dir.join("sm_meta_metadata.bin"),
&snapshot_dir.join("sm_meta"),
)?;
let import_opts = ImportColumnFamilyOptions::default();
// NOTE(review): the column families are re-created with default `Options`, not
// the `sm_cf_options` / `meta_cf_options` used in `new` — confirm this is intended.
let cf_opts = Options::default();
// NOTE(review): both column families are dropped before the replacements are
// created; if a create/import call below fails, the dropped column families are
// not restored by this function.
db.drop_cf(STATE_MACHINE_CF).map_err(|e| StorageError::DbError(e.to_string()))?;
db.drop_cf(STATE_MACHINE_META_CF)
.map_err(|e| StorageError::DbError(e.to_string()))?;
// An export without files is handled by creating an empty column family instead
// of calling the import API.
if sm_meta.get_files().is_empty() {
db.create_cf(STATE_MACHINE_CF, &cf_opts)
.map_err(|e| StorageError::DbError(e.to_string()))?;
} else {
db.create_column_family_with_import(&cf_opts, STATE_MACHINE_CF, &import_opts, &sm_meta)
.map_err(|e| StorageError::DbError(e.to_string()))?;
}
if sm_meta_meta.get_files().is_empty() {
db.create_cf(STATE_MACHINE_META_CF, &cf_opts)
.map_err(|e| StorageError::DbError(e.to_string()))?;
} else {
db.create_column_family_with_import(
&cf_opts,
STATE_MACHINE_META_CF,
&import_opts,
&sm_meta_meta,
)
.map_err(|e| StorageError::DbError(e.to_string()))?;
}
Ok(())
}
/// Serializes a column family export descriptor to `path`.
///
/// Every file entry except its `directory` is stored; the directory is supplied
/// again when the descriptor is loaded.
///
/// # Errors
/// Returns a `StorageError::BincodeError` if serialization fails and a
/// `StorageError::DbError` if the file cannot be written.
fn save_cf_export_metadata(
    metadata: &ExportImportFilesMetaData,
    path: &std::path::Path,
) -> Result<(), Error> {
    let mut files = Vec::new();
    for live in metadata.get_files() {
        files.push(CfExportFile {
            column_family_name: live.column_family_name,
            name: live.name,
            size: live.size,
            level: live.level,
            start_key: live.start_key,
            end_key: live.end_key,
            smallest_seqno: live.smallest_seqno,
            largest_seqno: live.largest_seqno,
            num_entries: live.num_entries,
            num_deletions: live.num_deletions,
        });
    }
    let db_comparator_name = metadata.get_db_comparator_name();
    let descriptor = CfExportMeta {
        db_comparator_name,
        files,
    };
    let encoded = bincode::serialize(&descriptor).map_err(StorageError::BincodeError)?;
    std::fs::write(path, encoded).map_err(|e| StorageError::DbError(e.to_string()))?;
    Ok(())
}
fn load_cf_export_metadata(
path: &std::path::Path,
actual_dir: &std::path::Path,
) -> Result<ExportImportFilesMetaData, Error> {
let bytes = std::fs::read(path).map_err(|e| StorageError::DbError(e.to_string()))?;
let meta: CfExportMeta =
bincode::deserialize(&bytes).map_err(StorageError::BincodeError)?;
let dir_str = actual_dir
.to_str()
.ok_or_else(|| StorageError::DbError("Snapshot path is not valid UTF-8".to_string()))?
.to_string();
let live_files: Vec<LiveFile> = meta
.files
.into_iter()
.map(|f| LiveFile {
column_family_name: f.column_family_name,
name: f.name,
directory: dir_str.clone(),
size: f.size,
level: f.level,
start_key: f.start_key,
end_key: f.end_key,
smallest_seqno: f.smallest_seqno,
largest_seqno: f.largest_seqno,
num_entries: f.num_entries,
num_deletions: f.num_deletions,
})
.collect();
let mut export_metadata = ExportImportFilesMetaData::default();
export_metadata.set_db_comparator_name(&meta.db_comparator_name);
export_metadata
.set_files(&live_files)
.map_err(|e| StorageError::DbError(e.to_string()))?;
Ok(export_metadata)
}
/// Converts a failed snapshot `spawn_blocking` join into a `StorageError::DbError`,
/// distinguishing a panicked task from a cancelled one in the message.
pub(crate) fn map_snapshot_join_error(e: tokio::task::JoinError) -> StorageError {
    if e.is_panic() {
        return StorageError::DbError(format!("snapshot blocking task panicked: {e}"));
    }
    StorageError::DbError(format!("snapshot blocking task was cancelled: {e}"))
}
/// Returns every key/value pair whose key starts with `prefix`, together with the
/// last applied index as the revision.
///
/// An empty `prefix` yields an empty result rather than a full scan. The revision
/// is read after the iteration has finished.
///
/// # Errors
/// Returns a `StorageError::DbError` if the state machine column family is missing
/// or the iterator reports an error.
pub fn scan_prefix(
    &self,
    prefix: &[u8],
) -> Result<ScanResult, Error> {
    if prefix.is_empty() {
        let revision = self.last_applied_index.load(Ordering::SeqCst);
        return Ok(ScanResult {
            entries: vec![],
            revision,
        });
    }
    let mut read_opts = ReadOptions::default();
    // No finite successor exists for an all-0xFF prefix; the `starts_with` check in
    // the loop still terminates the scan in that case.
    if let Some(upper_bound) = prefix_successor(prefix) {
        read_opts.set_iterate_upper_bound(upper_bound);
    }
    let db = self.db.load();
    let cf = db
        .cf_handle(STATE_MACHINE_CF)
        .ok_or_else(|| StorageError::DbError("STATE_MACHINE_CF not found".into()))?;
    let mode = IteratorMode::From(prefix, Direction::Forward);
    let mut entries = Vec::new();
    for item in db.iterator_cf_opt(&cf, read_opts, mode) {
        let (key, value) = item.map_err(|e| StorageError::DbError(e.to_string()))?;
        if !key.starts_with(prefix) {
            break;
        }
        entries.push((Bytes::copy_from_slice(&key), Bytes::copy_from_slice(&value)));
    }
    let revision = self.last_applied_index.load(Ordering::SeqCst);
    Ok(ScanResult { entries, revision })
}
}
#[async_trait]
impl StateMachine for RocksDBStateMachine {
/// Marks the state machine as serving and, when a lease manager is configured,
/// reloads the persisted lease state.
///
/// # Errors
/// Propagates any error from `load_lease_data`.
async fn start(&self) -> Result<(), Error> {
    self.is_serving.store(true, Ordering::SeqCst);
    if self.lease.is_some() {
        self.load_lease_data().await?;
        debug!("Lease data loaded during state machine initialization");
    }
    info!("RocksDB state machine started");
    Ok(())
}
/// Stops serving reads and persists the lease (TTL) state.
///
/// # Errors
/// Returns the error from `persist_ttl_metadata` after logging it; the state machine
/// stays marked as not serving in that case.
fn stop(&self) -> Result<(), Error> {
    self.is_serving.store(false, Ordering::SeqCst);
    match self.persist_ttl_metadata() {
        Ok(()) => {
            info!("RocksDB state machine stopped");
            Ok(())
        }
        Err(e) => {
            error!("Failed to persist TTL metadata on shutdown: {:?}", e);
            Err(e)
        }
    }
}
/// Reports whether the state machine is currently serving.
fn is_running(&self) -> bool {
    let serving = &self.is_serving;
    serving.load(Ordering::SeqCst)
}
/// Looks up `key_buffer` in the state machine column family.
///
/// Returns `Ok(None)` when the key does not exist.
///
/// # Errors
/// Returns `StorageError::NotServing` while the state machine is not serving, and a
/// `StorageError::DbError` if the column family is missing or the read fails.
fn get(
    &self,
    key_buffer: &[u8],
) -> Result<Option<Bytes>, Error> {
    let serving = self.is_serving.load(Ordering::SeqCst);
    if !serving {
        let not_serving =
            StorageError::NotServing("State machine is restoring from snapshot".to_string());
        return Err(not_serving.into());
    }
    let db = self.db.load();
    let cf = db
        .cf_handle(STATE_MACHINE_CF)
        .ok_or_else(|| StorageError::DbError("State machine CF not found".to_string()))?;
    let stored = db
        .get_cf(&cf, key_buffer)
        .map_err(|e| StorageError::DbError(e.to_string()))?;
    Ok(stored.map(|value| Bytes::copy_from_slice(&value)))
}
/// Always returns `None`; this implementation keeps no per-entry term information.
fn entry_term(&self, _entry_id: u64) -> Option<u64> {
    None
}
/// Applies a chunk of committed log entries as one atomic RocksDB write.
///
/// All mutations are collected in an indexed write batch, so a `CompareAndSwap`
/// observes writes made earlier in the same chunk. After the batch is committed, the
/// in-memory last applied log id moves to the last entry of the chunk. One
/// `ApplyResult` is returned per entry, in chunk order.
///
/// NOTE(review): lease registrations/unregistrations are performed while the batch
/// is being built, i.e. before the batch is committed; an early error return leaves
/// those lease changes in place.
///
/// # Errors
/// - `StorageError::FeatureNotEnabled` if an insert carries a TTL but no lease is
///   configured/enabled.
/// - `StorageError::DbError` if the column family is missing, a CAS read fails, or
///   the batch write fails.
///
/// # Panics
/// Panics if the entries in `chunk` are not in strictly increasing index order.
#[instrument(skip(self, chunk))]
async fn apply_chunk(
    &self,
    chunk: &[ApplyEntry],
) -> Result<Vec<ApplyResult>, Error> {
    let db = self.db.load();
    let cf = db
        .cf_handle(STATE_MACHINE_CF)
        .ok_or_else(|| StorageError::DbError("State machine CF not found".to_string()))?;
    let mut batch = WriteBatchWithIndex::new(0, true);
    let mut highest_index_entry: Option<LogId> = None;
    let mut results = Vec::with_capacity(chunk.len());
    for entry in chunk {
        if let Some(prev) = highest_index_entry {
            assert!(
                entry.index > prev.index,
                "apply_chunk: received unordered entry at index {} (prev={})",
                entry.index,
                prev.index
            );
        }
        highest_index_entry = Some(LogId {
            index: entry.index,
            term: entry.term,
        });
        match &entry.command {
            Command::Noop => {
                debug!("Handling NOOP command at index {}", entry.index);
                results.push(ApplyResult::success(entry.index));
            }
            Command::Insert {
                key,
                value,
                ttl_secs,
            } => {
                batch.put_cf(&cf, key, value);
                if let Some(ttl) = ttl_secs {
                    // Checked access instead of `unwrap_unchecked`: a broken
                    // `lease_enabled` invariant becomes an error, not UB.
                    let lease = match self.lease.as_ref() {
                        Some(lease) if self.lease_enabled => lease,
                        _ => {
                            return Err(StorageError::FeatureNotEnabled(
                                "TTL feature is not enabled on this server. \
                                 Enable it in config: [raft.state_machine.lease] enabled = true"
                                    .into(),
                            )
                            .into());
                        }
                    };
                    lease.register(key.clone(), *ttl);
                }
                results.push(ApplyResult::success(entry.index));
            }
            Command::Delete { key } => {
                batch.delete_cf(&cf, key);
                if let Some(ref lease) = self.lease {
                    lease.unregister(key);
                }
                results.push(ApplyResult::success(entry.index));
            }
            Command::CompareAndSwap {
                key,
                expected,
                value: new_value,
            } => {
                // Read through the batch so earlier writes in this chunk are visible.
                let current_value = batch
                    .get_from_batch_and_db_cf(&*db, &cf, key, &ReadOptions::default())
                    .map_err(|e| StorageError::DbError(format!("CAS read failed: {e}")))?;
                let cas_success = match (current_value, expected) {
                    (Some(current), Some(exp)) => current == exp.as_ref(),
                    (None, None) => true,
                    _ => false,
                };
                if cas_success {
                    batch.put_cf(&cf, key, new_value);
                }
                results.push(if cas_success {
                    ApplyResult::success(entry.index)
                } else {
                    ApplyResult::failure(entry.index)
                });
                debug!(
                    "CAS at index {}: key={:?}, success={}",
                    entry.index,
                    String::from_utf8_lossy(key),
                    cas_success
                );
            }
        }
    }
    // Commit via the handle that produced `cf` and served the CAS reads, rather
    // than reloading `self.db`, which could be a different instance after a swap.
    db.write_wbwi(&batch)
        .map_err(|e| StorageError::DbError(e.to_string()))?;
    if let Some(highest) = highest_index_entry {
        self.update_last_applied(highest);
    }
    Ok(results)
}
/// Counts the entries in the state machine column family by iterating over it.
///
/// Returns `0` when the column family does not exist.
fn len(&self) -> usize {
    let db = self.db.load();
    let Some(cf) = db.cf_handle(STATE_MACHINE_CF) else {
        return 0;
    };
    let total = db.iterator_cf(&cf, IteratorMode::Start).count();
    total
}
fn update_last_applied(
&self,
last_applied: LogId,
) {
self.last_applied_index.store(last_applied.index, Ordering::SeqCst);
self.last_applied_term.store(last_applied.term, Ordering::SeqCst);
}
/// Returns the in-memory last applied log id.
fn last_applied(&self) -> LogId {
    let index = self.last_applied_index.load(Ordering::SeqCst);
    let term = self.last_applied_term.load(Ordering::SeqCst);
    LogId { index, term }
}
fn persist_last_applied(
&self,
last_applied: LogId,
) -> Result<(), Error> {
self.update_last_applied(last_applied);
self.persist_state_machine_metadata()
}
/// Replaces the in-memory snapshot metadata with a copy of `snapshot_metadata`;
/// nothing is written to disk. Always returns `Ok(())`.
fn update_last_snapshot_metadata(
    &self,
    snapshot_metadata: &SnapshotMetadata,
) -> Result<(), Error> {
    let mut slot = self.last_snapshot_metadata.write();
    *slot = Some(snapshot_metadata.clone());
    Ok(())
}
/// Returns a copy of the in-memory snapshot metadata, if any.
fn snapshot_metadata(&self) -> Option<SnapshotMetadata> {
    let slot = self.last_snapshot_metadata.read();
    slot.clone()
}
/// Records `snapshot_metadata` in memory and then writes it to the metadata column
/// family.
///
/// # Errors
/// Propagates any error from the in-memory update or from
/// `persist_snapshot_metadata`.
fn persist_last_snapshot_metadata(
    &self,
    snapshot_metadata: &SnapshotMetadata,
) -> Result<(), Error> {
    self.update_last_snapshot_metadata(snapshot_metadata)
        .and_then(|()| self.persist_snapshot_metadata())
}
/// Applies the snapshot stored in `snapshot_dir`.
///
/// Reads are rejected while the restore runs. If the restore fails, the error is
/// logged, serving is re-enabled, and the error is returned.
///
/// # Errors
/// Propagates any error from `restore_from_snapshot`.
#[instrument(skip(self))]
async fn apply_snapshot_from_file(
    &self,
    metadata: &SnapshotMetadata,
    snapshot_dir: std::path::PathBuf,
) -> Result<(), Error> {
    info!("Applying snapshot from: {:?}", snapshot_dir);
    self.is_serving.store(false, Ordering::SeqCst);
    let outcome = self.restore_from_snapshot(metadata, &snapshot_dir).await;
    if let Err(e) = &outcome {
        error!(
            "Snapshot restore failed, resuming serving with pre-restore state: {:?}",
            e
        );
        self.is_serving.store(true, Ordering::SeqCst);
    }
    outcome
}
/// Writes a snapshot of the state machine into `new_snapshot_dir`.
///
/// Both column families are flushed and exported (to `sm/` and `sm_meta/`) together
/// with their export descriptors (`sm_metadata.bin`, `sm_meta_metadata.bin`). When a
/// lease manager is configured its state is written to `ttl_state.bin`. Finally the
/// snapshot metadata for `last_included` is recorded and persisted.
///
/// Returns the snapshot checksum, which is currently a fixed 32-byte all-zero value.
///
/// # Errors
/// Propagates filesystem errors, RocksDB errors (as `StorageError::DbError`), a
/// panicked/cancelled blocking task, and errors from persisting the snapshot metadata.
#[instrument(skip(self))]
async fn generate_snapshot_data(
&self,
new_snapshot_dir: std::path::PathBuf,
last_included: LogId,
) -> Result<Bytes, Error> {
// Owned copies for the blocking task, which must be `'static`.
let db = self.db.load_full(); let dir = new_snapshot_dir.clone();
// The RocksDB export and the synchronous file writes run on the blocking pool.
tokio::task::spawn_blocking(move || -> Result<(), Error> {
std::fs::create_dir_all(&dir)?;
{
let checkpoint = rocksdb::checkpoint::Checkpoint::new(db.as_ref())
.map_err(|e| StorageError::DbError(e.to_string()))?;
let cf_sm = db
.cf_handle(STATE_MACHINE_CF)
.ok_or_else(|| StorageError::DbError("SM CF not found".to_string()))?;
let cf_sm_meta = db
.cf_handle(STATE_MACHINE_META_CF)
.ok_or_else(|| StorageError::DbError("SM meta CF not found".to_string()))?;
// Flush both column families before exporting them.
let flush_opts = rocksdb::FlushOptions::default();
db.flush_cf_opt(&cf_sm, &flush_opts)
.map_err(|e| StorageError::DbError(e.to_string()))?;
db.flush_cf_opt(&cf_sm_meta, &flush_opts)
.map_err(|e| StorageError::DbError(e.to_string()))?;
let sm_export = checkpoint
.export_column_family(&cf_sm, dir.join("sm"))
.map_err(|e| StorageError::DbError(e.to_string()))?;
let sm_meta_export = checkpoint
.export_column_family(&cf_sm_meta, dir.join("sm_meta"))
.map_err(|e| StorageError::DbError(e.to_string()))?;
// Store the export descriptors next to the exported files so the restore
// side can rebuild them.
Self::save_cf_export_metadata(&sm_export, &dir.join("sm_metadata.bin"))?;
Self::save_cf_export_metadata(&sm_meta_export, &dir.join("sm_meta_metadata.bin"))?;
} Ok(())
})
.await
// First `?` handles the join error, second the task's own result.
.map_err(Self::map_snapshot_join_error)??;
if let Some(ref lease) = self.lease {
let ttl_snapshot = lease.to_snapshot();
let ttl_path = new_snapshot_dir.join("ttl_state.bin");
tokio::fs::write(&ttl_path, ttl_snapshot).await?;
}
// No digest is computed here; the checksum is 32 zero bytes.
let checksum = [0; 32];
let snapshot_metadata = SnapshotMetadata {
last_included: Some(last_included),
checksum: Bytes::copy_from_slice(&checksum),
};
self.persist_last_snapshot_metadata(&snapshot_metadata)?;
info!("Snapshot generated at {:?}", new_snapshot_dir);
Ok(Bytes::copy_from_slice(&checksum))
}
/// Persists the last applied log id and then the snapshot metadata.
///
/// # Errors
/// Returns the first error raised by either persistence step.
fn save_hard_state(&self) -> Result<(), Error> {
    self.persist_state_machine_metadata()
        .and_then(|()| self.persist_snapshot_metadata())
}
/// Persists the last applied log id and makes all pending writes durable.
///
/// The metadata is written first, so the WAL sync that follows also covers it.
/// Previously the metadata was written after the sync and flush, leaving the most
/// recent metadata write outside the durability guarantee of this call.
///
/// NOTE(review): `DB::flush` appears to flush only the default column family —
/// confirm whether the state machine column families should be flushed explicitly.
///
/// # Errors
/// Returns a `StorageError::DbError` if persisting the metadata, syncing the WAL, or
/// flushing fails.
fn flush(&self) -> Result<(), Error> {
    self.persist_state_machine_metadata()?;
    let db = self.db.load();
    db.flush_wal(true).map_err(|e| StorageError::DbError(e.to_string()))?;
    db.flush().map_err(|e| StorageError::DbError(e.to_string()))?;
    Ok(())
}
/// Async entry point for [`Self::flush`]; the flush itself runs synchronously on the
/// calling task.
///
/// # Errors
/// Propagates any error from `flush`.
async fn flush_async(&self) -> Result<(), Error> {
    let outcome = self.flush();
    outcome
}
/// Removes every key from the state machine column family and resets the last
/// applied log id and the in-memory snapshot metadata.
///
/// All deletions are collected into a single write batch and committed at once.
///
/// # Errors
/// Returns a `StorageError::DbError` if the column family is missing, iteration
/// fails, or the write fails; propagates errors from the persistence helpers.
#[instrument(skip(self))]
async fn reset(&self) -> Result<(), Error> {
    let db = self.db.load();
    let cf = db
        .cf_handle(STATE_MACHINE_CF)
        .ok_or_else(|| StorageError::DbError("State machine CF not found".to_string()))?;
    let mut wipe = WriteBatch::default();
    for entry in db.iterator_cf(&cf, IteratorMode::Start) {
        let (key, _value) = entry.map_err(|e| StorageError::DbError(e.to_string()))?;
        wipe.delete_cf(&cf, &key);
    }
    db.write(&wipe).map_err(|e| StorageError::DbError(e.to_string()))?;
    // Reset the in-memory bookkeeping, then persist it.
    self.last_applied_index.store(0, Ordering::SeqCst);
    self.last_applied_term.store(0, Ordering::SeqCst);
    *self.last_snapshot_metadata.write() = None;
    self.persist_state_machine_metadata()?;
    self.persist_snapshot_metadata()?;
    info!("RocksDB state machine reset completed");
    Ok(())
}
async fn lease_background_cleanup(&self) -> Result<Vec<Bytes>, Error> {
let Some(ref lease) = self.lease else {
return Ok(vec![]);
};
let now = SystemTime::now();
let expired_keys = lease.get_expired_keys(now);
if expired_keys.is_empty() {
return Ok(vec![]);
}
debug!(
"Lease background cleanup: found {} expired keys",
expired_keys.len()
);
let db = self.db.load();
let cf = db
.cf_handle(STATE_MACHINE_CF)
.ok_or_else(|| StorageError::DbError("State machine CF not found".to_string()))?;
let mut batch = WriteBatch::default();
for key in &expired_keys {
batch.delete_cf(&cf, key);
}
self.apply_batch(batch)?;
info!(
"Lease background cleanup: deleted {} expired keys",
expired_keys.len()
);
Ok(expired_keys)
}
fn scan_prefix(
&self,
prefix: &[u8],
) -> Result<ScanResult, Error> {
self.scan_prefix(prefix)
}
}
impl Drop for RocksDBStateMachine {
fn drop(&mut self) {
if let Err(e) = self.save_hard_state() {
error!("Failed to save hard state on drop: {}", e);
}
if let Err(e) = self.flush() {
error!("Failed to flush on drop: {}", e);
} else {
debug!("RocksDBStateMachine flushed successfully on drop");
}
self.db.load().cancel_all_background_work(true); debug!("RocksDB background work cancelled on drop");
}
}