1use std::path::Path;
2use std::path::PathBuf;
3use std::sync::Arc;
4use std::sync::atomic::AtomicBool;
5use std::sync::atomic::AtomicU64;
6use std::sync::atomic::Ordering;
7use std::time::SystemTime;
8
9use arc_swap::ArcSwap;
10use bytes::Bytes;
11use d_engine_core::ApplyResult;
12use d_engine_core::Error;
13use d_engine_core::Lease;
14use d_engine_core::StateMachine;
15use d_engine_core::StorageError;
16use d_engine_proto::client::WriteCommand;
17use d_engine_proto::client::write_command::CompareAndSwap;
18use d_engine_proto::client::write_command::Delete;
19use d_engine_proto::client::write_command::Insert;
20use d_engine_proto::client::write_command::Operation;
21use d_engine_proto::common::Entry;
22use d_engine_proto::common::LogId;
23use d_engine_proto::common::entry_payload::Payload;
24use d_engine_proto::server::storage::SnapshotMetadata;
25use parking_lot::RwLock;
26use prost::Message;
27use rocksdb::Cache;
28use rocksdb::DB;
29use rocksdb::IteratorMode;
30use rocksdb::Options;
31use rocksdb::WriteBatch;
32use tonic::async_trait;
33use tracing::debug;
34use tracing::error;
35use tracing::info;
36use tracing::instrument;
37use tracing::warn;
38
39use crate::storage::DefaultLease;
40
/// Column family holding the replicated key/value data.
const STATE_MACHINE_CF: &str = "state_machine";
/// Column family holding state-machine bookkeeping (applied position, snapshot
/// metadata, TTL state).
const STATE_MACHINE_META_CF: &str = "state_machine_meta";
/// Meta key: big-endian u64 index of the last applied log entry.
const LAST_APPLIED_INDEX_KEY: &[u8] = b"last_applied_index";
/// Meta key: big-endian u64 term of the last applied log entry.
const LAST_APPLIED_TERM_KEY: &[u8] = b"last_applied_term";
/// Meta key: bincode-serialized `SnapshotMetadata`.
const SNAPSHOT_METADATA_KEY: &[u8] = b"snapshot_metadata";
/// Meta key: serialized lease/TTL snapshot (produced by `DefaultLease::to_snapshot`).
const TTL_STATE_KEY: &[u8] = b"ttl_state";
47
/// RocksDB-backed Raft state machine.
///
/// Replicated key/value pairs live in the [`STATE_MACHINE_CF`] column family;
/// bookkeeping (last-applied position, snapshot metadata, TTL state) lives in
/// [`STATE_MACHINE_META_CF`].
#[derive(Debug)]
pub struct RocksDBStateMachine {
    /// Active DB handle; swapped atomically during snapshot restoration.
    db: Arc<ArcSwap<DB>>,
    /// Filesystem location of the database (also the snapshot restore target).
    db_path: PathBuf,
    /// False while restoring from a snapshot; `get` rejects reads then.
    is_serving: AtomicBool,
    /// Index of the last applied log entry.
    last_applied_index: AtomicU64,
    /// Term of the last applied log entry.
    last_applied_term: AtomicU64,
    /// Metadata of the most recent snapshot, if any.
    last_snapshot_metadata: RwLock<Option<SnapshotMetadata>>,

    /// TTL/lease tracker; `None` until `set_lease` is called.
    lease: Option<Arc<DefaultLease>>,

    /// Set together with `lease` by `set_lease`; gates the TTL feature.
    lease_enabled: bool,
}
70
71impl RocksDBStateMachine {
72 pub fn new<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
76 let db_path = path.as_ref().to_path_buf();
77
78 let opts = Self::configure_db_options();
80 let cfs = vec![STATE_MACHINE_CF, STATE_MACHINE_META_CF];
81
82 let db =
83 DB::open_cf(&opts, &db_path, cfs).map_err(|e| StorageError::DbError(e.to_string()))?;
84 let db_arc = Arc::new(db);
85
86 let (last_applied_index, last_applied_term) = Self::load_state_machine_metadata(&db_arc)?;
88 let last_snapshot_metadata = Self::load_snapshot_metadata(&db_arc)?;
89
90 Ok(Self {
91 db: Arc::new(ArcSwap::new(db_arc)),
92 db_path,
93 is_serving: AtomicBool::new(true),
94 last_applied_index: AtomicU64::new(last_applied_index),
95 last_applied_term: AtomicU64::new(last_applied_term),
96 last_snapshot_metadata: RwLock::new(last_snapshot_metadata),
97 lease: None, lease_enabled: false, })
100 }
101
102 pub fn set_lease(
108 &mut self,
109 lease: Arc<DefaultLease>,
110 ) {
111 self.lease_enabled = true;
113 self.lease = Some(lease);
114 }
115
116 fn configure_db_options() -> Options {
136 let mut opts = Options::default();
137 opts.create_if_missing(true);
138 opts.create_missing_column_families(true);
139
140 opts.set_max_write_buffer_number(4);
142 opts.set_min_write_buffer_number_to_merge(2);
143 opts.set_write_buffer_size(128 * 1024 * 1024); opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
147 opts.set_bottommost_compression_type(rocksdb::DBCompressionType::Zstd);
148 opts.set_compression_options(-14, 0, 0, 0); opts.set_wal_bytes_per_sync(1024 * 1024); opts.set_manual_wal_flush(true);
153 opts.set_use_fsync(false);
154
155 opts.set_max_background_jobs(4);
157 opts.set_max_open_files(5000);
158 opts.set_use_direct_io_for_flush_and_compaction(true);
159 opts.set_use_direct_reads(true);
160
161 opts.set_level_compaction_dynamic_level_bytes(true);
163 opts.set_target_file_size_base(64 * 1024 * 1024); opts.set_max_bytes_for_level_base(256 * 1024 * 1024); let cache = Cache::new_lru_cache(128 * 1024 * 1024); opts.set_row_cache(&cache);
169
170 opts
171 }
172
173 fn open_db<P: AsRef<Path>>(path: P) -> Result<DB, Error> {
174 let opts = Self::configure_db_options();
175 let cfs = vec![STATE_MACHINE_CF, STATE_MACHINE_META_CF];
176 DB::open_cf(&opts, path, cfs).map_err(|e| StorageError::DbError(e.to_string()).into())
177 }
178
179 fn load_state_machine_metadata(db: &Arc<DB>) -> Result<(u64, u64), Error> {
180 let cf = db
181 .cf_handle(STATE_MACHINE_META_CF)
182 .ok_or_else(|| StorageError::DbError("State machine meta CF not found".to_string()))?;
183
184 let index = match db
185 .get_cf(&cf, LAST_APPLIED_INDEX_KEY)
186 .map_err(|e| StorageError::DbError(e.to_string()))?
187 {
188 Some(bytes) if bytes.len() == 8 => u64::from_be_bytes([
189 bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
190 ]),
191 _ => 0,
192 };
193
194 let term = match db
195 .get_cf(&cf, LAST_APPLIED_TERM_KEY)
196 .map_err(|e| StorageError::DbError(e.to_string()))?
197 {
198 Some(bytes) if bytes.len() == 8 => u64::from_be_bytes([
199 bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
200 ]),
201 _ => 0,
202 };
203
204 Ok((index, term))
205 }
206
207 fn load_snapshot_metadata(db: &Arc<DB>) -> Result<Option<SnapshotMetadata>, Error> {
208 let cf = db
209 .cf_handle(STATE_MACHINE_META_CF)
210 .ok_or_else(|| StorageError::DbError("State machine meta CF not found".to_string()))?;
211
212 match db
213 .get_cf(&cf, SNAPSHOT_METADATA_KEY)
214 .map_err(|e| StorageError::DbError(e.to_string()))?
215 {
216 Some(bytes) => {
217 let metadata = bincode::deserialize(&bytes).map_err(StorageError::BincodeError)?;
218 Ok(Some(metadata))
219 }
220 None => Ok(None),
221 }
222 }
223
224 fn persist_state_machine_metadata(&self) -> Result<(), Error> {
225 let db = self.db.load();
226 let cf = db
227 .cf_handle(STATE_MACHINE_META_CF)
228 .ok_or_else(|| StorageError::DbError("State machine meta CF not found".to_string()))?;
229
230 let index = self.last_applied_index.load(Ordering::SeqCst);
231 let term = self.last_applied_term.load(Ordering::SeqCst);
232
233 db.put_cf(&cf, LAST_APPLIED_INDEX_KEY, index.to_be_bytes())
234 .map_err(|e| StorageError::DbError(e.to_string()))?;
235 db.put_cf(&cf, LAST_APPLIED_TERM_KEY, term.to_be_bytes())
236 .map_err(|e| StorageError::DbError(e.to_string()))?;
237
238 Ok(())
239 }
240
241 fn persist_snapshot_metadata(&self) -> Result<(), Error> {
242 let db = self.db.load();
243 let cf = db
244 .cf_handle(STATE_MACHINE_META_CF)
245 .ok_or_else(|| StorageError::DbError("State machine meta CF not found".to_string()))?;
246
247 if let Some(metadata) = self.last_snapshot_metadata.read().clone() {
248 let bytes = bincode::serialize(&metadata).map_err(StorageError::BincodeError)?;
249 db.put_cf(&cf, SNAPSHOT_METADATA_KEY, bytes)
250 .map_err(|e| StorageError::DbError(e.to_string()))?;
251 }
252 Ok(())
253 }
254
255 fn persist_ttl_metadata(&self) -> Result<(), Error> {
256 if let Some(ref lease) = self.lease {
257 let db = self.db.load();
258 let cf = db.cf_handle(STATE_MACHINE_META_CF).ok_or_else(|| {
259 StorageError::DbError("State machine meta CF not found".to_string())
260 })?;
261
262 let ttl_snapshot = lease.to_snapshot();
263
264 db.put_cf(&cf, TTL_STATE_KEY, ttl_snapshot)
265 .map_err(|e| StorageError::DbError(e.to_string()))?;
266
267 debug!("Persisted TTL state to RocksDB");
268 }
269 Ok(())
270 }
271
272 pub async fn load_lease_data(&self) -> Result<(), Error> {
277 let Some(ref lease) = self.lease else {
278 return Ok(()); };
280
281 let db = self.db.load();
282 let cf = db
283 .cf_handle(STATE_MACHINE_META_CF)
284 .ok_or_else(|| StorageError::DbError("State machine meta CF not found".to_string()))?;
285
286 match db
287 .get_cf(&cf, TTL_STATE_KEY)
288 .map_err(|e| StorageError::DbError(e.to_string()))?
289 {
290 Some(ttl_data) => {
291 lease.reload(&ttl_data)?;
292 debug!("Loaded TTL state from RocksDB: {} active TTLs", lease.len());
293 }
294 None => {
295 debug!("No TTL state found in RocksDB");
296 }
297 }
298
299 Ok(())
300 }
301
302 #[allow(dead_code)]
318 fn maybe_cleanup_expired(
319 &self,
320 max_duration_ms: u64,
321 ) -> usize {
322 let start = std::time::Instant::now();
323 let now = SystemTime::now();
324 let mut deleted_count = 0;
325
326 if let Some(ref lease) = self.lease {
328 if !lease.has_lease_keys() {
329 return 0; }
331
332 if !lease.may_have_expired_keys(now) {
334 return 0; }
336 } else {
337 return 0; }
339
340 let db = self.db.load();
342 let cf = match db.cf_handle(STATE_MACHINE_CF) {
343 Some(cf) => cf,
344 None => {
345 error!("State machine CF not found during TTL cleanup");
346 return 0;
347 }
348 };
349
350 let max_duration = std::time::Duration::from_millis(max_duration_ms);
352
353 loop {
354 if start.elapsed() >= max_duration {
356 debug!(
357 "Piggyback cleanup time budget exceeded: deleted {} keys in {:?}",
358 deleted_count,
359 start.elapsed()
360 );
361 break;
362 }
363
364 let expired_keys = if let Some(ref lease) = self.lease {
366 lease.get_expired_keys(now)
367 } else {
368 vec![]
369 };
370
371 if expired_keys.is_empty() {
372 break; }
374
375 let mut batch = WriteBatch::default();
377 for key in expired_keys {
378 batch.delete_cf(&cf, &key);
379 deleted_count += 1;
380 }
381
382 if let Err(e) = db.write(batch) {
384 error!("Failed to delete expired keys: {}", e);
385 break;
386 }
387 }
388
389 if deleted_count > 0 {
390 debug!(
391 "Piggyback cleanup: deleted {} expired keys in {:?}",
392 deleted_count,
393 start.elapsed()
394 );
395 }
396
397 deleted_count
398 }
399
400 fn apply_batch(
401 &self,
402 batch: WriteBatch,
403 ) -> Result<(), Error> {
404 self.db.load().write(batch).map_err(|e| StorageError::DbError(e.to_string()))?;
405 Ok(())
406 }
407}
408
409#[async_trait]
410impl StateMachine for RocksDBStateMachine {
411 async fn start(&self) -> Result<(), Error> {
412 self.is_serving.store(true, Ordering::SeqCst);
413
414 if let Some(ref _lease) = self.lease {
416 self.load_lease_data().await?;
417 debug!("Lease data loaded during state machine initialization");
418 }
419
420 info!("RocksDB state machine started");
421 Ok(())
422 }
423
424 fn stop(&self) -> Result<(), Error> {
425 self.is_serving.store(false, Ordering::SeqCst);
426
427 if let Err(e) = self.persist_ttl_metadata() {
430 error!("Failed to persist TTL metadata on shutdown: {:?}", e);
431 return Err(e);
432 }
433
434 info!("RocksDB state machine stopped");
435 Ok(())
436 }
437
438 fn is_running(&self) -> bool {
439 self.is_serving.load(Ordering::SeqCst)
440 }
441
442 fn get(
443 &self,
444 key_buffer: &[u8],
445 ) -> Result<Option<Bytes>, Error> {
446 if !self.is_serving.load(Ordering::SeqCst) {
450 return Err(StorageError::NotServing(
451 "State machine is restoring from snapshot".to_string(),
452 )
453 .into());
454 }
455
456 let db = self.db.load();
457 let cf = db
458 .cf_handle(STATE_MACHINE_CF)
459 .ok_or_else(|| StorageError::DbError("State machine CF not found".to_string()))?;
460
461 match db.get_cf(&cf, key_buffer).map_err(|e| StorageError::DbError(e.to_string()))? {
462 Some(value) => Ok(Some(Bytes::copy_from_slice(&value))),
463 None => Ok(None),
464 }
465 }
466
467 fn entry_term(
468 &self,
469 _entry_id: u64,
470 ) -> Option<u64> {
471 None
474 }
475
476 #[instrument(skip(self, chunk))]
478 async fn apply_chunk(
479 &self,
480 chunk: Vec<Entry>,
481 ) -> Result<Vec<ApplyResult>, Error> {
482 let db = self.db.load();
483 let cf = db
484 .cf_handle(STATE_MACHINE_CF)
485 .ok_or_else(|| StorageError::DbError("State machine CF not found".to_string()))?;
486
487 let mut batch = WriteBatch::default();
488 let mut highest_index_entry: Option<LogId> = None;
489 let mut results = Vec::with_capacity(chunk.len());
490
491 for entry in chunk {
492 assert!(entry.payload.is_some(), "Entry payload should not be None!");
493
494 if let Some(prev) = highest_index_entry {
495 assert!(
496 entry.index > prev.index,
497 "apply_chunk: received unordered entry at index {} (prev={})",
498 entry.index,
499 prev.index
500 );
501 }
502 highest_index_entry = Some(LogId {
503 index: entry.index,
504 term: entry.term,
505 });
506
507 match entry.payload.unwrap().payload {
508 Some(Payload::Noop(_)) => {
509 debug!("Handling NOOP command at index {}", entry.index);
510 results.push(ApplyResult::success(entry.index));
512 }
513 Some(Payload::Command(data)) => match WriteCommand::decode(&data[..]) {
514 Ok(write_cmd) => match write_cmd.operation {
515 Some(Operation::Insert(Insert {
516 key,
517 value,
518 ttl_secs,
519 })) => {
520 batch.put_cf(&cf, &key, &value);
521
522 if ttl_secs > 0 {
524 if !self.lease_enabled {
526 return Err(StorageError::FeatureNotEnabled(
527 "TTL feature is not enabled on this server. \
528 Enable it in config: [raft.state_machine.lease] enabled = true".into()
529 ).into());
530 }
531
532 let lease = unsafe { self.lease.as_ref().unwrap_unchecked() };
534 lease.register(key.clone(), ttl_secs);
535 }
536
537 results.push(ApplyResult::success(entry.index));
539 }
540 Some(Operation::Delete(Delete { key })) => {
541 batch.delete_cf(&cf, &key);
542
543 if let Some(ref lease) = self.lease {
545 lease.unregister(&key);
546 }
547
548 results.push(ApplyResult::success(entry.index));
550 }
551 Some(Operation::CompareAndSwap(CompareAndSwap {
552 key,
553 expected_value,
554 new_value,
555 })) => {
556 let current_value = db.get_cf(&cf, &key).map_err(|e| {
559 StorageError::DbError(format!("CAS read failed: {e}"))
560 })?;
561
562 let cas_success = match (current_value, &expected_value) {
563 (Some(current), Some(expected)) => current == expected.as_ref(),
564 (None, None) => true,
565 _ => false,
566 };
567
568 if cas_success {
569 batch.put_cf(&cf, &key, &new_value);
570 }
571
572 results.push(if cas_success {
574 ApplyResult::success(entry.index)
575 } else {
576 ApplyResult::failure(entry.index)
577 });
578
579 debug!(
580 "CAS at index {}: key={:?}, success={}",
581 entry.index,
582 String::from_utf8_lossy(&key),
583 cas_success
584 );
585 }
586 None => {
587 warn!("WriteCommand without operation at index {}", entry.index);
588 }
589 },
590 Err(e) => {
591 error!(
592 "Failed to decode WriteCommand at index {}: {:?}",
593 entry.index, e
594 );
595 return Err(StorageError::SerializationError(e.to_string()).into());
596 }
597 },
598 Some(Payload::Config(_config_change)) => {
599 debug!("Ignoring config change at index {}", entry.index);
600 }
601 None => panic!("Entry payload variant should not be None!"),
602 }
603 }
604
605 self.apply_batch(batch)?;
606
607 if let Some(highest) = highest_index_entry {
614 self.update_last_applied(highest);
615 }
616
617 Ok(results)
618 }
619
620 fn len(&self) -> usize {
621 let db = self.db.load();
622 let cf = match db.cf_handle(STATE_MACHINE_CF) {
623 Some(cf) => cf,
624 None => return 0,
625 };
626
627 let iter = db.iterator_cf(&cf, IteratorMode::Start);
629 iter.count()
630 }
631
632 fn update_last_applied(
633 &self,
634 last_applied: LogId,
635 ) {
636 self.last_applied_index.store(last_applied.index, Ordering::SeqCst);
637 self.last_applied_term.store(last_applied.term, Ordering::SeqCst);
638 }
639
640 fn last_applied(&self) -> LogId {
641 LogId {
642 index: self.last_applied_index.load(Ordering::SeqCst),
643 term: self.last_applied_term.load(Ordering::SeqCst),
644 }
645 }
646
647 fn persist_last_applied(
648 &self,
649 last_applied: LogId,
650 ) -> Result<(), Error> {
651 self.update_last_applied(last_applied);
652 self.persist_state_machine_metadata()
653 }
654
655 fn update_last_snapshot_metadata(
656 &self,
657 snapshot_metadata: &SnapshotMetadata,
658 ) -> Result<(), Error> {
659 *self.last_snapshot_metadata.write() = Some(snapshot_metadata.clone());
660 Ok(())
661 }
662
663 fn snapshot_metadata(&self) -> Option<SnapshotMetadata> {
664 self.last_snapshot_metadata.read().clone()
665 }
666
667 fn persist_last_snapshot_metadata(
668 &self,
669 snapshot_metadata: &SnapshotMetadata,
670 ) -> Result<(), Error> {
671 self.update_last_snapshot_metadata(snapshot_metadata)?;
672 self.persist_snapshot_metadata()
673 }
674
675 #[instrument(skip(self))]
676 async fn apply_snapshot_from_file(
677 &self,
678 metadata: &SnapshotMetadata,
679 snapshot_dir: std::path::PathBuf,
680 ) -> Result<(), Error> {
681 info!("Applying snapshot from checkpoint: {:?}", snapshot_dir);
682
683 self.is_serving.store(false, Ordering::SeqCst);
685 info!("Stopped serving requests for snapshot restoration");
686
687 {
689 let old_db = self.db.load();
690 old_db.flush().map_err(|e| StorageError::DbError(e.to_string()))?;
691 old_db.cancel_all_background_work(true);
692 info!("Flushed and stopped background work on old DB");
693 }
694
695 let temp_dir = tempfile::TempDir::new()?;
699 let temp_db_path = temp_dir.path().join("temp_db");
700 let temp_db = Self::open_db(&temp_db_path).map_err(|e| {
701 error!("Failed to create temporary DB: {:?}", e);
702 e
703 })?;
704
705 self.db.store(Arc::new(temp_db));
708 info!("Swapped to temporary DB, old DB lock released");
709
710 let backup_dir = self.db_path.with_extension("backup");
712
713 if backup_dir.exists() {
715 tokio::fs::remove_dir_all(&backup_dir).await?;
716 }
717
718 tokio::fs::rename(&self.db_path, &backup_dir).await?;
720 info!("Backed up current DB to: {:?}", backup_dir);
721
722 tokio::fs::rename(&snapshot_dir, &self.db_path).await.inspect_err(|_e| {
724 let _ = std::fs::rename(&backup_dir, &self.db_path);
726 })?;
727 info!("Moved checkpoint to DB path: {:?}", self.db_path);
728
729 let new_db = Self::open_db(&self.db_path).map_err(|e| {
731 let _ = std::fs::rename(&backup_dir, &self.db_path);
733 error!("Failed to open new DB, rolled back to backup: {:?}", e);
734 e
735 })?;
736
737 self.db.store(Arc::new(new_db));
739 info!("Atomically swapped to new DB instance");
740
741 if let Some(ref lease) = self.lease {
743 let ttl_path = self.db_path.join("ttl_state.bin");
744 if ttl_path.exists() {
745 let ttl_data = tokio::fs::read(&ttl_path).await?;
746 lease.reload(&ttl_data)?;
747
748 self.persist_ttl_metadata()?;
752
753 info!("Lease state restored from snapshot and persisted to metadata CF");
754 } else {
755 warn!("No lease state found in snapshot");
756 }
757 }
758
759 *self.last_snapshot_metadata.write() = Some(metadata.clone());
761 if let Some(last_included) = &metadata.last_included {
762 self.update_last_applied(*last_included);
763 }
764
765 self.is_serving.store(true, Ordering::SeqCst);
767 info!("Resumed serving requests");
768
769 if let Err(e) = tokio::fs::remove_dir_all(&backup_dir).await {
771 warn!("Failed to remove backup directory: {}", e);
772 } else {
773 info!("Cleaned up backup directory");
774 }
775
776 info!("Snapshot applied successfully - full DB restoration complete");
777 Ok(())
778 }
779
780 #[instrument(skip(self))]
781 async fn generate_snapshot_data(
782 &self,
783 new_snapshot_dir: std::path::PathBuf,
784 last_included: LogId,
785 ) -> Result<Bytes, Error> {
786 {
789 let db = self.db.load();
790 let checkpoint = rocksdb::checkpoint::Checkpoint::new(db.as_ref())
791 .map_err(|e| StorageError::DbError(e.to_string()))?;
792 checkpoint
793 .create_checkpoint(&new_snapshot_dir)
794 .map_err(|e| StorageError::DbError(e.to_string()))?;
795 } if let Some(ref lease) = self.lease {
799 let ttl_snapshot = lease.to_snapshot();
800 let ttl_path = new_snapshot_dir.join("ttl_state.bin");
801 tokio::fs::write(&ttl_path, ttl_snapshot).await?;
802 }
803
804 let checksum = [0; 32]; let snapshot_metadata = SnapshotMetadata {
807 last_included: Some(last_included),
808 checksum: Bytes::copy_from_slice(&checksum),
809 };
810 self.persist_last_snapshot_metadata(&snapshot_metadata)?;
811
812 info!("Snapshot generated at {:?} with TTL data", new_snapshot_dir);
813 Ok(Bytes::copy_from_slice(&checksum))
814 }
815
816 fn save_hard_state(&self) -> Result<(), Error> {
817 self.persist_state_machine_metadata()?;
818 self.persist_snapshot_metadata()?;
819 Ok(())
820 }
821
822 fn flush(&self) -> Result<(), Error> {
823 let db = self.db.load();
824
825 db.flush_wal(true).map_err(|e| StorageError::DbError(e.to_string()))?;
828 db.flush().map_err(|e| StorageError::DbError(e.to_string()))?;
830
831 self.persist_state_machine_metadata()?;
833
834 Ok(())
835 }
836
837 async fn flush_async(&self) -> Result<(), Error> {
838 self.flush()
839 }
840
841 #[instrument(skip(self))]
842 async fn reset(&self) -> Result<(), Error> {
843 let db = self.db.load();
844 let cf = db
845 .cf_handle(STATE_MACHINE_CF)
846 .ok_or_else(|| StorageError::DbError("State machine CF not found".to_string()))?;
847
848 let mut batch = WriteBatch::default();
850 let iter = db.iterator_cf(&cf, IteratorMode::Start);
851
852 for item in iter {
853 let (key, _) = item.map_err(|e| StorageError::DbError(e.to_string()))?;
854 batch.delete_cf(&cf, &key);
855 }
856
857 db.write(batch).map_err(|e| StorageError::DbError(e.to_string()))?;
858
859 self.last_applied_index.store(0, Ordering::SeqCst);
861 self.last_applied_term.store(0, Ordering::SeqCst);
862 *self.last_snapshot_metadata.write() = None;
863
864 self.persist_state_machine_metadata()?;
867 self.persist_snapshot_metadata()?;
868
869 info!("RocksDB state machine reset completed");
870 Ok(())
871 }
872
873 async fn lease_background_cleanup(&self) -> Result<Vec<Bytes>, Error> {
874 let Some(ref lease) = self.lease else {
876 return Ok(vec![]);
877 };
878
879 let now = SystemTime::now();
881 let expired_keys = lease.get_expired_keys(now);
882
883 if expired_keys.is_empty() {
884 return Ok(vec![]);
885 }
886
887 debug!(
888 "Lease background cleanup: found {} expired keys",
889 expired_keys.len()
890 );
891
892 let db = self.db.load();
894 let cf = db
895 .cf_handle(STATE_MACHINE_CF)
896 .ok_or_else(|| StorageError::DbError("State machine CF not found".to_string()))?;
897
898 let mut batch = WriteBatch::default();
899 for key in &expired_keys {
900 batch.delete_cf(&cf, key);
901 }
902
903 self.apply_batch(batch)?;
904
905 info!(
906 "Lease background cleanup: deleted {} expired keys",
907 expired_keys.len()
908 );
909
910 Ok(expired_keys)
911 }
912}
913impl Drop for RocksDBStateMachine {
914 fn drop(&mut self) {
915 if let Err(e) = self.save_hard_state() {
918 error!("Failed to save hard state on drop: {}", e);
919 }
920
921 if let Err(e) = self.flush() {
923 error!("Failed to flush on drop: {}", e);
924 } else {
925 debug!("RocksDBStateMachine flushed successfully on drop");
926 }
927
928 self.db.load().cancel_all_background_work(true); debug!("RocksDB background work cancelled on drop");
931 }
932}