1use std::path::Path;
2use std::path::PathBuf;
3use std::sync::Arc;
4use std::sync::atomic::AtomicBool;
5use std::sync::atomic::AtomicU64;
6use std::sync::atomic::Ordering;
7use std::time::SystemTime;
8
9use arc_swap::ArcSwap;
10use bytes::Bytes;
11use d_engine_core::Error;
12use d_engine_core::Lease;
13use d_engine_core::StateMachine;
14use d_engine_core::StorageError;
15use d_engine_proto::client::WriteCommand;
16use d_engine_proto::client::write_command::Delete;
17use d_engine_proto::client::write_command::Insert;
18use d_engine_proto::client::write_command::Operation;
19use d_engine_proto::common::Entry;
20use d_engine_proto::common::LogId;
21use d_engine_proto::common::entry_payload::Payload;
22use d_engine_proto::server::storage::SnapshotMetadata;
23use parking_lot::RwLock;
24use prost::Message;
25use rocksdb::Cache;
26use rocksdb::DB;
27use rocksdb::IteratorMode;
28use rocksdb::Options;
29use rocksdb::WriteBatch;
30use tonic::async_trait;
31use tracing::debug;
32use tracing::error;
33use tracing::info;
34use tracing::instrument;
35use tracing::warn;
36
37use crate::storage::DefaultLease;
38
/// Column family that stores the replicated key/value data.
const STATE_MACHINE_CF: &str = "state_machine";
/// Column family that stores bookkeeping: applied position, snapshot metadata and TTL state.
const STATE_MACHINE_META_CF: &str = "state_machine_meta";
/// Meta key of the last applied log index (written as 8 big-endian bytes).
const LAST_APPLIED_INDEX_KEY: &[u8] = b"last_applied_index";
/// Meta key of the last applied log term (written as 8 big-endian bytes).
const LAST_APPLIED_TERM_KEY: &[u8] = b"last_applied_term";
/// Meta key of the bincode-serialized `SnapshotMetadata`.
const SNAPSHOT_METADATA_KEY: &[u8] = b"snapshot_metadata";
/// Meta key of the lease state produced by the lease's `to_snapshot`.
const TTL_STATE_KEY: &[u8] = b"ttl_state";
45
/// RocksDB-backed implementation of the `StateMachine` trait.
#[derive(Debug)]
pub struct RocksDBStateMachine {
    /// Active database handle. Held in an `ArcSwap` because snapshot restoration replaces the
    /// whole `DB` instance via `store` while other callers use the handle they already loaded.
    db: Arc<ArcSwap<DB>>,
    /// Directory of the database on disk.
    db_path: PathBuf,
    /// Serving flag: set by `start`, cleared by `stop` and while a snapshot is being applied.
    /// `get` rejects reads while it is `false`.
    is_serving: AtomicBool,
    /// Index of the last applied log entry (in-memory copy; persisted separately).
    last_applied_index: AtomicU64,
    /// Term of the last applied log entry (in-memory copy; persisted separately).
    last_applied_term: AtomicU64,
    /// Metadata of the most recent snapshot, if any.
    last_snapshot_metadata: RwLock<Option<SnapshotMetadata>>,

    /// Lease (TTL) manager; `None` until `set_lease` is called.
    lease: Option<Arc<DefaultLease>>,

    /// `true` once a lease has been installed via `set_lease`; checked when applying inserts
    /// that carry a TTL.
    lease_enabled: bool,
}
68
69impl RocksDBStateMachine {
70 pub fn new<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
74 let db_path = path.as_ref().to_path_buf();
75
76 let opts = Self::configure_db_options();
78 let cfs = vec![STATE_MACHINE_CF, STATE_MACHINE_META_CF];
79
80 let db =
81 DB::open_cf(&opts, &db_path, cfs).map_err(|e| StorageError::DbError(e.to_string()))?;
82 let db_arc = Arc::new(db);
83
84 let (last_applied_index, last_applied_term) = Self::load_state_machine_metadata(&db_arc)?;
86 let last_snapshot_metadata = Self::load_snapshot_metadata(&db_arc)?;
87
88 Ok(Self {
89 db: Arc::new(ArcSwap::new(db_arc)),
90 db_path,
91 is_serving: AtomicBool::new(true),
92 last_applied_index: AtomicU64::new(last_applied_index),
93 last_applied_term: AtomicU64::new(last_applied_term),
94 last_snapshot_metadata: RwLock::new(last_snapshot_metadata),
95 lease: None, lease_enabled: false, })
98 }
99
100 pub fn set_lease(
106 &mut self,
107 lease: Arc<DefaultLease>,
108 ) {
109 self.lease_enabled = true;
111 self.lease = Some(lease);
112 }
113
114 fn configure_db_options() -> Options {
134 let mut opts = Options::default();
135 opts.create_if_missing(true);
136 opts.create_missing_column_families(true);
137
138 opts.set_max_write_buffer_number(4);
140 opts.set_min_write_buffer_number_to_merge(2);
141 opts.set_write_buffer_size(128 * 1024 * 1024); opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
145 opts.set_bottommost_compression_type(rocksdb::DBCompressionType::Zstd);
146 opts.set_compression_options(-14, 0, 0, 0); opts.set_wal_bytes_per_sync(1024 * 1024); opts.set_manual_wal_flush(true);
151 opts.set_use_fsync(false);
152
153 opts.set_max_background_jobs(4);
155 opts.set_max_open_files(5000);
156 opts.set_use_direct_io_for_flush_and_compaction(true);
157 opts.set_use_direct_reads(true);
158
159 opts.set_level_compaction_dynamic_level_bytes(true);
161 opts.set_target_file_size_base(64 * 1024 * 1024); opts.set_max_bytes_for_level_base(256 * 1024 * 1024); let cache = Cache::new_lru_cache(128 * 1024 * 1024); opts.set_row_cache(&cache);
167
168 opts
169 }
170
171 fn open_db<P: AsRef<Path>>(path: P) -> Result<DB, Error> {
172 let opts = Self::configure_db_options();
173 let cfs = vec![STATE_MACHINE_CF, STATE_MACHINE_META_CF];
174 DB::open_cf(&opts, path, cfs).map_err(|e| StorageError::DbError(e.to_string()).into())
175 }
176
177 fn load_state_machine_metadata(db: &Arc<DB>) -> Result<(u64, u64), Error> {
178 let cf = db
179 .cf_handle(STATE_MACHINE_META_CF)
180 .ok_or_else(|| StorageError::DbError("State machine meta CF not found".to_string()))?;
181
182 let index = match db
183 .get_cf(&cf, LAST_APPLIED_INDEX_KEY)
184 .map_err(|e| StorageError::DbError(e.to_string()))?
185 {
186 Some(bytes) if bytes.len() == 8 => u64::from_be_bytes([
187 bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
188 ]),
189 _ => 0,
190 };
191
192 let term = match db
193 .get_cf(&cf, LAST_APPLIED_TERM_KEY)
194 .map_err(|e| StorageError::DbError(e.to_string()))?
195 {
196 Some(bytes) if bytes.len() == 8 => u64::from_be_bytes([
197 bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
198 ]),
199 _ => 0,
200 };
201
202 Ok((index, term))
203 }
204
205 fn load_snapshot_metadata(db: &Arc<DB>) -> Result<Option<SnapshotMetadata>, Error> {
206 let cf = db
207 .cf_handle(STATE_MACHINE_META_CF)
208 .ok_or_else(|| StorageError::DbError("State machine meta CF not found".to_string()))?;
209
210 match db
211 .get_cf(&cf, SNAPSHOT_METADATA_KEY)
212 .map_err(|e| StorageError::DbError(e.to_string()))?
213 {
214 Some(bytes) => {
215 let metadata = bincode::deserialize(&bytes).map_err(StorageError::BincodeError)?;
216 Ok(Some(metadata))
217 }
218 None => Ok(None),
219 }
220 }
221
222 fn persist_state_machine_metadata(&self) -> Result<(), Error> {
223 let db = self.db.load();
224 let cf = db
225 .cf_handle(STATE_MACHINE_META_CF)
226 .ok_or_else(|| StorageError::DbError("State machine meta CF not found".to_string()))?;
227
228 let index = self.last_applied_index.load(Ordering::SeqCst);
229 let term = self.last_applied_term.load(Ordering::SeqCst);
230
231 db.put_cf(&cf, LAST_APPLIED_INDEX_KEY, index.to_be_bytes())
232 .map_err(|e| StorageError::DbError(e.to_string()))?;
233 db.put_cf(&cf, LAST_APPLIED_TERM_KEY, term.to_be_bytes())
234 .map_err(|e| StorageError::DbError(e.to_string()))?;
235
236 Ok(())
237 }
238
239 fn persist_snapshot_metadata(&self) -> Result<(), Error> {
240 let db = self.db.load();
241 let cf = db
242 .cf_handle(STATE_MACHINE_META_CF)
243 .ok_or_else(|| StorageError::DbError("State machine meta CF not found".to_string()))?;
244
245 if let Some(metadata) = self.last_snapshot_metadata.read().clone() {
246 let bytes = bincode::serialize(&metadata).map_err(StorageError::BincodeError)?;
247 db.put_cf(&cf, SNAPSHOT_METADATA_KEY, bytes)
248 .map_err(|e| StorageError::DbError(e.to_string()))?;
249 }
250 Ok(())
251 }
252
253 fn persist_ttl_metadata(&self) -> Result<(), Error> {
254 if let Some(ref lease) = self.lease {
255 let db = self.db.load();
256 let cf = db.cf_handle(STATE_MACHINE_META_CF).ok_or_else(|| {
257 StorageError::DbError("State machine meta CF not found".to_string())
258 })?;
259
260 let ttl_snapshot = lease.to_snapshot();
261
262 db.put_cf(&cf, TTL_STATE_KEY, ttl_snapshot)
263 .map_err(|e| StorageError::DbError(e.to_string()))?;
264
265 debug!("Persisted TTL state to RocksDB");
266 }
267 Ok(())
268 }
269
270 pub async fn load_lease_data(&self) -> Result<(), Error> {
275 let Some(ref lease) = self.lease else {
276 return Ok(()); };
278
279 let db = self.db.load();
280 let cf = db
281 .cf_handle(STATE_MACHINE_META_CF)
282 .ok_or_else(|| StorageError::DbError("State machine meta CF not found".to_string()))?;
283
284 match db
285 .get_cf(&cf, TTL_STATE_KEY)
286 .map_err(|e| StorageError::DbError(e.to_string()))?
287 {
288 Some(ttl_data) => {
289 lease.reload(&ttl_data)?;
290 debug!("Loaded TTL state from RocksDB: {} active TTLs", lease.len());
291 }
292 None => {
293 debug!("No TTL state found in RocksDB");
294 }
295 }
296
297 Ok(())
298 }
299
300 #[allow(dead_code)]
316 fn maybe_cleanup_expired(
317 &self,
318 max_duration_ms: u64,
319 ) -> usize {
320 let start = std::time::Instant::now();
321 let now = SystemTime::now();
322 let mut deleted_count = 0;
323
324 if let Some(ref lease) = self.lease {
326 if !lease.has_lease_keys() {
327 return 0; }
329
330 if !lease.may_have_expired_keys(now) {
332 return 0; }
334 } else {
335 return 0; }
337
338 let db = self.db.load();
340 let cf = match db.cf_handle(STATE_MACHINE_CF) {
341 Some(cf) => cf,
342 None => {
343 error!("State machine CF not found during TTL cleanup");
344 return 0;
345 }
346 };
347
348 let max_duration = std::time::Duration::from_millis(max_duration_ms);
350
351 loop {
352 if start.elapsed() >= max_duration {
354 debug!(
355 "Piggyback cleanup time budget exceeded: deleted {} keys in {:?}",
356 deleted_count,
357 start.elapsed()
358 );
359 break;
360 }
361
362 let expired_keys = if let Some(ref lease) = self.lease {
364 lease.get_expired_keys(now)
365 } else {
366 vec![]
367 };
368
369 if expired_keys.is_empty() {
370 break; }
372
373 let mut batch = WriteBatch::default();
375 for key in expired_keys {
376 batch.delete_cf(&cf, &key);
377 deleted_count += 1;
378 }
379
380 if let Err(e) = db.write(batch) {
382 error!("Failed to delete expired keys: {}", e);
383 break;
384 }
385 }
386
387 if deleted_count > 0 {
388 debug!(
389 "Piggyback cleanup: deleted {} expired keys in {:?}",
390 deleted_count,
391 start.elapsed()
392 );
393 }
394
395 deleted_count
396 }
397
398 fn apply_batch(
399 &self,
400 batch: WriteBatch,
401 ) -> Result<(), Error> {
402 self.db.load().write(batch).map_err(|e| StorageError::DbError(e.to_string()))?;
403 Ok(())
404 }
405}
406
407#[async_trait]
408impl StateMachine for RocksDBStateMachine {
409 async fn start(&self) -> Result<(), Error> {
410 self.is_serving.store(true, Ordering::SeqCst);
411
412 if let Some(ref _lease) = self.lease {
414 self.load_lease_data().await?;
415 debug!("Lease data loaded during state machine initialization");
416 }
417
418 info!("RocksDB state machine started");
419 Ok(())
420 }
421
422 fn stop(&self) -> Result<(), Error> {
423 self.is_serving.store(false, Ordering::SeqCst);
424
425 if let Err(e) = self.persist_ttl_metadata() {
428 error!("Failed to persist TTL metadata on shutdown: {:?}", e);
429 return Err(e);
430 }
431
432 info!("RocksDB state machine stopped");
433 Ok(())
434 }
435
/// Returns the current value of the serving flag.
fn is_running(&self) -> bool {
    self.is_serving.load(Ordering::SeqCst)
}
439
440 fn get(
441 &self,
442 key_buffer: &[u8],
443 ) -> Result<Option<Bytes>, Error> {
444 if !self.is_serving.load(Ordering::SeqCst) {
448 return Err(StorageError::NotServing(
449 "State machine is restoring from snapshot".to_string(),
450 )
451 .into());
452 }
453
454 let db = self.db.load();
455 let cf = db
456 .cf_handle(STATE_MACHINE_CF)
457 .ok_or_else(|| StorageError::DbError("State machine CF not found".to_string()))?;
458
459 match db.get_cf(&cf, key_buffer).map_err(|e| StorageError::DbError(e.to_string()))? {
460 Some(value) => Ok(Some(Bytes::copy_from_slice(&value))),
461 None => Ok(None),
462 }
463 }
464
/// Always returns `None`: this implementation does not look up terms for individual entries.
fn entry_term(
    &self,
    _entry_id: u64,
) -> Option<u64> {
    None
}
473
474 #[instrument(skip(self, chunk))]
476 async fn apply_chunk(
477 &self,
478 chunk: Vec<Entry>,
479 ) -> Result<(), Error> {
480 let db = self.db.load();
481 let cf = db
482 .cf_handle(STATE_MACHINE_CF)
483 .ok_or_else(|| StorageError::DbError("State machine CF not found".to_string()))?;
484
485 let mut batch = WriteBatch::default();
486 let mut highest_index_entry: Option<LogId> = None;
487
488 for entry in chunk {
489 assert!(entry.payload.is_some(), "Entry payload should not be None!");
490
491 if let Some(prev) = highest_index_entry {
492 assert!(
493 entry.index > prev.index,
494 "apply_chunk: received unordered entry at index {} (prev={})",
495 entry.index,
496 prev.index
497 );
498 }
499 highest_index_entry = Some(LogId {
500 index: entry.index,
501 term: entry.term,
502 });
503
504 match entry.payload.unwrap().payload {
505 Some(Payload::Noop(_)) => {
506 debug!("Handling NOOP command at index {}", entry.index);
507 }
508 Some(Payload::Command(data)) => match WriteCommand::decode(&data[..]) {
509 Ok(write_cmd) => match write_cmd.operation {
510 Some(Operation::Insert(Insert {
511 key,
512 value,
513 ttl_secs,
514 })) => {
515 batch.put_cf(&cf, &key, &value);
516
517 if ttl_secs > 0 {
519 if !self.lease_enabled {
521 return Err(StorageError::FeatureNotEnabled(
522 "TTL feature is not enabled on this server. \
523 Enable it in config: [raft.state_machine.lease] enabled = true".into()
524 ).into());
525 }
526
527 let lease = unsafe { self.lease.as_ref().unwrap_unchecked() };
529 lease.register(key.clone(), ttl_secs);
530 }
531 }
532 Some(Operation::Delete(Delete { key })) => {
533 batch.delete_cf(&cf, &key);
534
535 if let Some(ref lease) = self.lease {
537 lease.unregister(&key);
538 }
539 }
540 None => {
541 warn!("WriteCommand without operation at index {}", entry.index);
542 }
543 },
544 Err(e) => {
545 error!(
546 "Failed to decode WriteCommand at index {}: {:?}",
547 entry.index, e
548 );
549 return Err(StorageError::SerializationError(e.to_string()).into());
550 }
551 },
552 Some(Payload::Config(_config_change)) => {
553 debug!("Ignoring config change at index {}", entry.index);
554 }
555 None => panic!("Entry payload variant should not be None!"),
556 }
557 }
558
559 self.apply_batch(batch)?;
560
561 if let Some(log_id) = highest_index_entry {
567 self.update_last_applied(log_id);
568 }
569
570 Ok(())
571 }
572
573 fn len(&self) -> usize {
574 let db = self.db.load();
575 let cf = match db.cf_handle(STATE_MACHINE_CF) {
576 Some(cf) => cf,
577 None => return 0,
578 };
579
580 let iter = db.iterator_cf(&cf, IteratorMode::Start);
582 iter.count()
583 }
584
585 fn update_last_applied(
586 &self,
587 last_applied: LogId,
588 ) {
589 self.last_applied_index.store(last_applied.index, Ordering::SeqCst);
590 self.last_applied_term.store(last_applied.term, Ordering::SeqCst);
591 }
592
593 fn last_applied(&self) -> LogId {
594 LogId {
595 index: self.last_applied_index.load(Ordering::SeqCst),
596 term: self.last_applied_term.load(Ordering::SeqCst),
597 }
598 }
599
600 fn persist_last_applied(
601 &self,
602 last_applied: LogId,
603 ) -> Result<(), Error> {
604 self.update_last_applied(last_applied);
605 self.persist_state_machine_metadata()
606 }
607
608 fn update_last_snapshot_metadata(
609 &self,
610 snapshot_metadata: &SnapshotMetadata,
611 ) -> Result<(), Error> {
612 *self.last_snapshot_metadata.write() = Some(snapshot_metadata.clone());
613 Ok(())
614 }
615
/// Returns a copy of the in-memory snapshot metadata, if any.
fn snapshot_metadata(&self) -> Option<SnapshotMetadata> {
    self.last_snapshot_metadata.read().clone()
}
619
620 fn persist_last_snapshot_metadata(
621 &self,
622 snapshot_metadata: &SnapshotMetadata,
623 ) -> Result<(), Error> {
624 self.update_last_snapshot_metadata(snapshot_metadata)?;
625 self.persist_snapshot_metadata()
626 }
627
628 #[instrument(skip(self))]
629 async fn apply_snapshot_from_file(
630 &self,
631 metadata: &SnapshotMetadata,
632 snapshot_dir: std::path::PathBuf,
633 ) -> Result<(), Error> {
634 info!("Applying snapshot from checkpoint: {:?}", snapshot_dir);
635
636 self.is_serving.store(false, Ordering::SeqCst);
638 info!("Stopped serving requests for snapshot restoration");
639
640 {
642 let old_db = self.db.load();
643 old_db.flush().map_err(|e| StorageError::DbError(e.to_string()))?;
644 old_db.cancel_all_background_work(true);
645 info!("Flushed and stopped background work on old DB");
646 }
647
648 let temp_dir = tempfile::TempDir::new()?;
652 let temp_db_path = temp_dir.path().join("temp_db");
653 let temp_db = Self::open_db(&temp_db_path).map_err(|e| {
654 error!("Failed to create temporary DB: {:?}", e);
655 e
656 })?;
657
658 self.db.store(Arc::new(temp_db));
661 info!("Swapped to temporary DB, old DB lock released");
662
663 let backup_dir = self.db_path.with_extension("backup");
665
666 if backup_dir.exists() {
668 tokio::fs::remove_dir_all(&backup_dir).await?;
669 }
670
671 tokio::fs::rename(&self.db_path, &backup_dir).await?;
673 info!("Backed up current DB to: {:?}", backup_dir);
674
675 tokio::fs::rename(&snapshot_dir, &self.db_path).await.inspect_err(|_e| {
677 let _ = std::fs::rename(&backup_dir, &self.db_path);
679 })?;
680 info!("Moved checkpoint to DB path: {:?}", self.db_path);
681
682 let new_db = Self::open_db(&self.db_path).map_err(|e| {
684 let _ = std::fs::rename(&backup_dir, &self.db_path);
686 error!("Failed to open new DB, rolled back to backup: {:?}", e);
687 e
688 })?;
689
690 self.db.store(Arc::new(new_db));
692 info!("Atomically swapped to new DB instance");
693
694 if let Some(ref lease) = self.lease {
696 let ttl_path = self.db_path.join("ttl_state.bin");
697 if ttl_path.exists() {
698 let ttl_data = tokio::fs::read(&ttl_path).await?;
699 lease.reload(&ttl_data)?;
700
701 self.persist_ttl_metadata()?;
705
706 info!("Lease state restored from snapshot and persisted to metadata CF");
707 } else {
708 warn!("No lease state found in snapshot");
709 }
710 }
711
712 *self.last_snapshot_metadata.write() = Some(metadata.clone());
714 if let Some(last_included) = &metadata.last_included {
715 self.update_last_applied(*last_included);
716 }
717
718 self.is_serving.store(true, Ordering::SeqCst);
720 info!("Resumed serving requests");
721
722 if let Err(e) = tokio::fs::remove_dir_all(&backup_dir).await {
724 warn!("Failed to remove backup directory: {}", e);
725 } else {
726 info!("Cleaned up backup directory");
727 }
728
729 info!("Snapshot applied successfully - full DB restoration complete");
730 Ok(())
731 }
732
733 #[instrument(skip(self))]
734 async fn generate_snapshot_data(
735 &self,
736 new_snapshot_dir: std::path::PathBuf,
737 last_included: LogId,
738 ) -> Result<Bytes, Error> {
739 {
742 let db = self.db.load();
743 let checkpoint = rocksdb::checkpoint::Checkpoint::new(db.as_ref())
744 .map_err(|e| StorageError::DbError(e.to_string()))?;
745 checkpoint
746 .create_checkpoint(&new_snapshot_dir)
747 .map_err(|e| StorageError::DbError(e.to_string()))?;
748 } if let Some(ref lease) = self.lease {
752 let ttl_snapshot = lease.to_snapshot();
753 let ttl_path = new_snapshot_dir.join("ttl_state.bin");
754 tokio::fs::write(&ttl_path, ttl_snapshot).await?;
755 }
756
757 let checksum = [0; 32]; let snapshot_metadata = SnapshotMetadata {
760 last_included: Some(last_included),
761 checksum: Bytes::copy_from_slice(&checksum),
762 };
763 self.persist_last_snapshot_metadata(&snapshot_metadata)?;
764
765 info!("Snapshot generated at {:?} with TTL data", new_snapshot_dir);
766 Ok(Bytes::copy_from_slice(&checksum))
767 }
768
769 fn save_hard_state(&self) -> Result<(), Error> {
770 self.persist_state_machine_metadata()?;
771 self.persist_snapshot_metadata()?;
772 Ok(())
773 }
774
775 fn flush(&self) -> Result<(), Error> {
776 let db = self.db.load();
777
778 db.flush_wal(true).map_err(|e| StorageError::DbError(e.to_string()))?;
781 db.flush().map_err(|e| StorageError::DbError(e.to_string()))?;
783
784 self.persist_state_machine_metadata()?;
786
787 Ok(())
788 }
789
/// Async entry point that runs the synchronous `flush` inline and returns its result.
async fn flush_async(&self) -> Result<(), Error> {
    self.flush()
}
793
794 #[instrument(skip(self))]
795 async fn reset(&self) -> Result<(), Error> {
796 let db = self.db.load();
797 let cf = db
798 .cf_handle(STATE_MACHINE_CF)
799 .ok_or_else(|| StorageError::DbError("State machine CF not found".to_string()))?;
800
801 let mut batch = WriteBatch::default();
803 let iter = db.iterator_cf(&cf, IteratorMode::Start);
804
805 for item in iter {
806 let (key, _) = item.map_err(|e| StorageError::DbError(e.to_string()))?;
807 batch.delete_cf(&cf, &key);
808 }
809
810 db.write(batch).map_err(|e| StorageError::DbError(e.to_string()))?;
811
812 self.last_applied_index.store(0, Ordering::SeqCst);
814 self.last_applied_term.store(0, Ordering::SeqCst);
815 *self.last_snapshot_metadata.write() = None;
816
817 self.persist_state_machine_metadata()?;
820 self.persist_snapshot_metadata()?;
821
822 info!("RocksDB state machine reset completed");
823 Ok(())
824 }
825
826 async fn lease_background_cleanup(&self) -> Result<Vec<Bytes>, Error> {
827 let Some(ref lease) = self.lease else {
829 return Ok(vec![]);
830 };
831
832 let now = SystemTime::now();
834 let expired_keys = lease.get_expired_keys(now);
835
836 if expired_keys.is_empty() {
837 return Ok(vec![]);
838 }
839
840 debug!(
841 "Lease background cleanup: found {} expired keys",
842 expired_keys.len()
843 );
844
845 let db = self.db.load();
847 let cf = db
848 .cf_handle(STATE_MACHINE_CF)
849 .ok_or_else(|| StorageError::DbError("State machine CF not found".to_string()))?;
850
851 let mut batch = WriteBatch::default();
852 for key in &expired_keys {
853 batch.delete_cf(&cf, key);
854 }
855
856 self.apply_batch(batch)?;
857
858 info!(
859 "Lease background cleanup: deleted {} expired keys",
860 expired_keys.len()
861 );
862
863 Ok(expired_keys)
864 }
865}
866impl Drop for RocksDBStateMachine {
867 fn drop(&mut self) {
868 if let Err(e) = self.save_hard_state() {
871 error!("Failed to save hard state on drop: {}", e);
872 }
873
874 if let Err(e) = self.flush() {
876 error!("Failed to flush on drop: {}", e);
877 } else {
878 debug!("RocksDBStateMachine flushed successfully on drop");
879 }
880
881 self.db.load().cancel_all_background_work(true); debug!("RocksDB background work cancelled on drop");
884 }
885}