1use std::sync::Arc;
2use std::sync::atomic::AtomicBool;
3use std::sync::atomic::AtomicU64;
4use std::sync::atomic::Ordering;
5use std::time::SystemTime;
6
7use arc_swap::ArcSwap;
8use bytes::Bytes;
9use d_engine_core::ApplyEntry;
10use d_engine_core::ApplyResult;
11use d_engine_core::Command;
12use d_engine_core::Error;
13use d_engine_core::Lease;
14use d_engine_core::ScanResult;
15use d_engine_core::StateMachine;
16use d_engine_core::StorageError;
17use d_engine_proto::common::LogId;
18use d_engine_proto::server::storage::SnapshotMetadata;
19use parking_lot::RwLock;
20use std::path::Path;
21
22use async_trait::async_trait;
23use rocksdb::Cache;
24use rocksdb::ColumnFamilyDescriptor;
25use rocksdb::DB;
26use rocksdb::Direction;
27use rocksdb::ExportImportFilesMetaData;
28use rocksdb::ImportColumnFamilyOptions;
29use rocksdb::IteratorMode;
30use rocksdb::LiveFile;
31use rocksdb::Options;
32use rocksdb::ReadOptions;
33use rocksdb::WriteBatch;
34use rocksdb::WriteBatchWithIndex;
35use serde::Deserialize;
36use serde::Serialize;
37use tracing::debug;
38use tracing::error;
39use tracing::info;
40use tracing::instrument;
41use tracing::warn;
42
43use crate::storage::DefaultLease;
44
45use super::STATE_MACHINE_CF;
46use super::STATE_MACHINE_META_CF;
47
// Keys stored in the `STATE_MACHINE_META_CF` column family. These byte strings are
// part of the on-disk format: renaming any of them orphans previously persisted data.

/// Big-endian `u64` index of the last entry applied to the state machine.
const LAST_APPLIED_INDEX_KEY: &[u8] = b"last_applied_index";
/// Big-endian `u64` term of the last entry applied to the state machine.
const LAST_APPLIED_TERM_KEY: &[u8] = b"last_applied_term";
/// Bincode-encoded `SnapshotMetadata` of the most recent snapshot.
const SNAPSHOT_METADATA_KEY: &[u8] = b"snapshot_metadata";
/// Opaque lease/TTL state produced by `DefaultLease::to_snapshot`.
const TTL_STATE_KEY: &[u8] = b"ttl_state";
52
/// Serializable mirror of RocksDB's `ExportImportFilesMetaData` for one exported
/// column family. It is written into the snapshot directory as `*_metadata.bin`,
/// next to the directory holding the exported SST files.
///
/// Field order is part of the bincode on-disk format; do not reorder.
#[derive(Serialize, Deserialize)]
struct CfExportMeta {
    /// Comparator name reported by the exporting DB; restored verbatim on import.
    db_comparator_name: String,
    /// One entry per exported SST file.
    files: Vec<CfExportFile>,
}
61
/// Serializable copy of the `rocksdb::LiveFile` fields needed to re-import one SST file.
///
/// The file's `directory` is not stored; `load_cf_export_metadata` fills it in
/// from the snapshot location at load time. Field order is part of the bincode
/// on-disk format; do not reorder.
#[derive(Serialize, Deserialize)]
struct CfExportFile {
    column_family_name: String,
    name: String,
    size: usize,
    level: i32,
    start_key: Option<Vec<u8>>,
    end_key: Option<Vec<u8>>,
    smallest_seqno: u64,
    largest_seqno: u64,
    num_entries: u64,
    num_deletions: u64,
}
75
/// Raft state machine backed by two RocksDB column families.
///
/// `STATE_MACHINE_CF` holds user data. `STATE_MACHINE_META_CF` holds the applied
/// position, the snapshot metadata and the lease state.
#[derive(Debug)]
pub struct RocksDBStateMachine {
    /// Current database handle. It is wrapped in `ArcSwap` so it can be replaced
    /// atomically (tests do so via `swap_db_for_test`).
    db: Arc<ArcSwap<DB>>,
    /// `false` after `stop` and while a snapshot is being restored; `get` rejects reads then.
    is_serving: AtomicBool,
    /// In-memory applied index; written to disk by `persist_state_machine_metadata`.
    last_applied_index: AtomicU64,
    /// In-memory applied term; written to disk together with the index.
    last_applied_term: AtomicU64,
    /// Metadata of the most recently generated or installed snapshot, if any.
    last_snapshot_metadata: RwLock<Option<SnapshotMetadata>>,

    /// TTL/lease tracker; `None` unless `set_lease` was called.
    lease: Option<Arc<DefaultLease>>,

    /// Set to `true` only by `set_lease`, together with `lease`.
    lease_enabled: bool,
}
97
/// Returns the smallest byte string strictly greater than every key that starts
/// with `prefix`, for use as an exclusive iterator upper bound.
///
/// Trailing `0xFF` bytes cannot be incremented, so they are cut off and the last
/// remaining byte is bumped instead. Returns `None` when `prefix` is empty or
/// consists only of `0xFF` bytes, because no finite upper bound exists then.
fn prefix_successor(prefix: &[u8]) -> Option<Vec<u8>> {
    // Position of the last byte that can be incremented without overflow.
    let last_incrementable = prefix.iter().rposition(|&byte| byte != 0xFF)?;
    let mut successor = prefix[..=last_incrementable].to_vec();
    successor[last_incrementable] += 1;
    Some(successor)
}
119
120impl RocksDBStateMachine {
121 pub fn new<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
125 let db_opts = super::base_db_options();
126
127 let cache = Cache::new_lru_cache(128 * 1024 * 1024);
128 let sm_cf = ColumnFamilyDescriptor::new(STATE_MACHINE_CF, super::sm_cf_options(&cache));
129 let sm_meta_cf =
130 ColumnFamilyDescriptor::new(STATE_MACHINE_META_CF, super::meta_cf_options(&cache));
131
132 let db = DB::open_cf_descriptors(&db_opts, path, vec![sm_cf, sm_meta_cf])
133 .map_err(|e| StorageError::DbError(e.to_string()))?;
134 let db_arc = Arc::new(db);
135
136 let (last_applied_index, last_applied_term) = Self::load_state_machine_metadata(&db_arc)?;
137 let last_snapshot_metadata = Self::load_snapshot_metadata(&db_arc)?;
138
139 Ok(Self {
140 db: Arc::new(ArcSwap::new(db_arc)),
141 is_serving: AtomicBool::new(true),
142 last_applied_index: AtomicU64::new(last_applied_index),
143 last_applied_term: AtomicU64::new(last_applied_term),
144 last_snapshot_metadata: RwLock::new(last_snapshot_metadata),
145 lease: None,
146 lease_enabled: false,
147 })
148 }
149
150 pub(super) fn from_shared_db(db: Arc<DB>) -> Result<Self, Error> {
155 let (last_applied_index, last_applied_term) = Self::load_state_machine_metadata(&db)?;
156 let last_snapshot_metadata = Self::load_snapshot_metadata(&db)?;
157
158 Ok(Self {
159 db: Arc::new(ArcSwap::new(db)),
160 is_serving: AtomicBool::new(true),
161 last_applied_index: AtomicU64::new(last_applied_index),
162 last_applied_term: AtomicU64::new(last_applied_term),
163 last_snapshot_metadata: RwLock::new(last_snapshot_metadata),
164 lease: None,
165 lease_enabled: false,
166 })
167 }
168
169 pub fn set_lease(
175 &mut self,
176 lease: Arc<DefaultLease>,
177 ) {
178 self.lease_enabled = true;
180 self.lease = Some(lease);
181 }
182
183 #[cfg(test)]
186 pub(super) fn swap_db_for_test(
187 &self,
188 new_db: DB,
189 ) {
190 self.db.store(Arc::new(new_db));
191 }
192
193 fn load_state_machine_metadata(db: &Arc<DB>) -> Result<(u64, u64), Error> {
200 let cf = db
201 .cf_handle(STATE_MACHINE_META_CF)
202 .ok_or_else(|| StorageError::DbError("State machine meta CF not found".to_string()))?;
203
204 let index = match db
205 .get_cf(&cf, LAST_APPLIED_INDEX_KEY)
206 .map_err(|e| StorageError::DbError(e.to_string()))?
207 {
208 Some(bytes) if bytes.len() == 8 => u64::from_be_bytes([
209 bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
210 ]),
211 _ => 0,
212 };
213
214 let term = match db
215 .get_cf(&cf, LAST_APPLIED_TERM_KEY)
216 .map_err(|e| StorageError::DbError(e.to_string()))?
217 {
218 Some(bytes) if bytes.len() == 8 => u64::from_be_bytes([
219 bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
220 ]),
221 _ => 0,
222 };
223
224 Ok((index, term))
225 }
226
227 fn load_snapshot_metadata(db: &Arc<DB>) -> Result<Option<SnapshotMetadata>, Error> {
228 let cf = db
229 .cf_handle(STATE_MACHINE_META_CF)
230 .ok_or_else(|| StorageError::DbError("State machine meta CF not found".to_string()))?;
231
232 match db
233 .get_cf(&cf, SNAPSHOT_METADATA_KEY)
234 .map_err(|e| StorageError::DbError(e.to_string()))?
235 {
236 Some(bytes) => {
237 let metadata = bincode::deserialize(&bytes).map_err(StorageError::BincodeError)?;
238 Ok(Some(metadata))
239 }
240 None => Ok(None),
241 }
242 }
243
244 fn persist_state_machine_metadata(&self) -> Result<(), Error> {
245 let db = self.db.load();
246 let cf = db
247 .cf_handle(STATE_MACHINE_META_CF)
248 .ok_or_else(|| StorageError::DbError("State machine meta CF not found".to_string()))?;
249
250 let index = self.last_applied_index.load(Ordering::SeqCst);
251 let term = self.last_applied_term.load(Ordering::SeqCst);
252
253 db.put_cf(&cf, LAST_APPLIED_INDEX_KEY, index.to_be_bytes())
254 .map_err(|e| StorageError::DbError(e.to_string()))?;
255 db.put_cf(&cf, LAST_APPLIED_TERM_KEY, term.to_be_bytes())
256 .map_err(|e| StorageError::DbError(e.to_string()))?;
257
258 Ok(())
259 }
260
261 fn persist_snapshot_metadata(&self) -> Result<(), Error> {
262 let db = self.db.load();
263 let cf = db
264 .cf_handle(STATE_MACHINE_META_CF)
265 .ok_or_else(|| StorageError::DbError("State machine meta CF not found".to_string()))?;
266
267 if let Some(metadata) = self.last_snapshot_metadata.read().clone() {
268 let bytes = bincode::serialize(&metadata).map_err(StorageError::BincodeError)?;
269 db.put_cf(&cf, SNAPSHOT_METADATA_KEY, bytes)
270 .map_err(|e| StorageError::DbError(e.to_string()))?;
271 }
272 Ok(())
273 }
274
275 fn persist_ttl_metadata(&self) -> Result<(), Error> {
276 if let Some(ref lease) = self.lease {
277 let db = self.db.load();
278 let cf = db.cf_handle(STATE_MACHINE_META_CF).ok_or_else(|| {
279 StorageError::DbError("State machine meta CF not found".to_string())
280 })?;
281
282 let ttl_snapshot = lease.to_snapshot();
283
284 db.put_cf(&cf, TTL_STATE_KEY, ttl_snapshot)
285 .map_err(|e| StorageError::DbError(e.to_string()))?;
286
287 debug!("Persisted TTL state to RocksDB");
288 }
289 Ok(())
290 }
291
292 pub async fn load_lease_data(&self) -> Result<(), Error> {
297 let Some(ref lease) = self.lease else {
298 return Ok(()); };
300
301 let db = self.db.load();
302 let cf = db
303 .cf_handle(STATE_MACHINE_META_CF)
304 .ok_or_else(|| StorageError::DbError("State machine meta CF not found".to_string()))?;
305
306 match db
307 .get_cf(&cf, TTL_STATE_KEY)
308 .map_err(|e| StorageError::DbError(e.to_string()))?
309 {
310 Some(ttl_data) => {
311 lease.reload(&ttl_data)?;
312 debug!("Loaded TTL state from RocksDB: {} active TTLs", lease.len());
313 }
314 None => {
315 debug!("No TTL state found in RocksDB");
316 }
317 }
318
319 Ok(())
320 }
321
322 #[allow(dead_code)]
338 fn maybe_cleanup_expired(
339 &self,
340 max_duration_ms: u64,
341 ) -> usize {
342 let start = std::time::Instant::now();
343 let now = SystemTime::now();
344 let mut deleted_count = 0;
345
346 if let Some(ref lease) = self.lease {
348 if !lease.has_lease_keys() {
349 return 0; }
351
352 if !lease.may_have_expired_keys(now) {
354 return 0; }
356 } else {
357 return 0; }
359
360 let db = self.db.load();
362 let cf = match db.cf_handle(STATE_MACHINE_CF) {
363 Some(cf) => cf,
364 None => {
365 error!("State machine CF not found during TTL cleanup");
366 return 0;
367 }
368 };
369
370 let max_duration = std::time::Duration::from_millis(max_duration_ms);
372
373 loop {
374 if start.elapsed() >= max_duration {
376 debug!(
377 "Piggyback cleanup time budget exceeded: deleted {} keys in {:?}",
378 deleted_count,
379 start.elapsed()
380 );
381 break;
382 }
383
384 let expired_keys = if let Some(ref lease) = self.lease {
386 lease.get_expired_keys(now)
387 } else {
388 vec![]
389 };
390
391 if expired_keys.is_empty() {
392 break; }
394
395 let mut batch = WriteBatch::default();
397 for key in expired_keys {
398 batch.delete_cf(&cf, &key);
399 deleted_count += 1;
400 }
401
402 if let Err(e) = db.write(&batch) {
404 error!("Failed to delete expired keys: {}", e);
405 break;
406 }
407 }
408
409 if deleted_count > 0 {
410 debug!(
411 "Piggyback cleanup: deleted {} expired keys in {:?}",
412 deleted_count,
413 start.elapsed()
414 );
415 }
416
417 deleted_count
418 }
419
420 fn apply_batch(
421 &self,
422 batch: WriteBatch,
423 ) -> Result<(), Error> {
424 self.db.load().write(&batch).map_err(|e| StorageError::DbError(e.to_string()))?;
425 Ok(())
426 }
427
428 async fn restore_from_snapshot(
434 &self,
435 metadata: &SnapshotMetadata,
436 snapshot_dir: &std::path::Path,
437 ) -> Result<(), Error> {
438 let db = self.db.load();
439 Self::restore_from_cf_export(&db, snapshot_dir)?;
440
441 info!("Snapshot restore complete");
442
443 if let Some(ref lease) = self.lease {
445 let ttl_path = snapshot_dir.join("ttl_state.bin");
446 if ttl_path.exists() {
447 let ttl_data = tokio::fs::read(&ttl_path).await?;
448 lease.reload(&ttl_data)?;
449 self.persist_ttl_metadata()?;
450 } else {
451 warn!("No lease state found in snapshot");
452 }
453 }
454
455 *self.last_snapshot_metadata.write() = Some(metadata.clone());
456 if let Some(last_included) = &metadata.last_included {
457 self.update_last_applied(*last_included);
458 }
459
460 self.is_serving.store(true, Ordering::SeqCst);
461 info!("Snapshot applied successfully");
462 Ok(())
463 }
464
465 fn restore_from_cf_export(
470 db: &DB,
471 snapshot_dir: &std::path::Path,
472 ) -> Result<(), Error> {
473 let sm_meta = Self::load_cf_export_metadata(
474 &snapshot_dir.join("sm_metadata.bin"),
475 &snapshot_dir.join("sm"),
476 )?;
477 let sm_meta_meta = Self::load_cf_export_metadata(
478 &snapshot_dir.join("sm_meta_metadata.bin"),
479 &snapshot_dir.join("sm_meta"),
480 )?;
481
482 let import_opts = ImportColumnFamilyOptions::default();
483 let cf_opts = Options::default();
484
485 db.drop_cf(STATE_MACHINE_CF).map_err(|e| StorageError::DbError(e.to_string()))?;
491 db.drop_cf(STATE_MACHINE_META_CF)
492 .map_err(|e| StorageError::DbError(e.to_string()))?;
493
494 if sm_meta.get_files().is_empty() {
497 db.create_cf(STATE_MACHINE_CF, &cf_opts)
498 .map_err(|e| StorageError::DbError(e.to_string()))?;
499 } else {
500 db.create_column_family_with_import(&cf_opts, STATE_MACHINE_CF, &import_opts, &sm_meta)
501 .map_err(|e| StorageError::DbError(e.to_string()))?;
502 }
503
504 if sm_meta_meta.get_files().is_empty() {
505 db.create_cf(STATE_MACHINE_META_CF, &cf_opts)
506 .map_err(|e| StorageError::DbError(e.to_string()))?;
507 } else {
508 db.create_column_family_with_import(
509 &cf_opts,
510 STATE_MACHINE_META_CF,
511 &import_opts,
512 &sm_meta_meta,
513 )
514 .map_err(|e| StorageError::DbError(e.to_string()))?;
515 }
516
517 Ok(())
518 }
519
520 fn save_cf_export_metadata(
524 metadata: &ExportImportFilesMetaData,
525 path: &std::path::Path,
526 ) -> Result<(), Error> {
527 let files = metadata
528 .get_files()
529 .into_iter()
530 .map(|f| CfExportFile {
531 column_family_name: f.column_family_name,
532 name: f.name,
533 size: f.size,
534 level: f.level,
535 start_key: f.start_key,
536 end_key: f.end_key,
537 smallest_seqno: f.smallest_seqno,
538 largest_seqno: f.largest_seqno,
539 num_entries: f.num_entries,
540 num_deletions: f.num_deletions,
541 })
542 .collect();
543
544 let meta = CfExportMeta {
545 db_comparator_name: metadata.get_db_comparator_name(),
546 files,
547 };
548
549 let bytes = bincode::serialize(&meta).map_err(StorageError::BincodeError)?;
550 std::fs::write(path, bytes).map_err(|e| StorageError::DbError(e.to_string()))?;
551 Ok(())
552 }
553
554 fn load_cf_export_metadata(
557 path: &std::path::Path,
558 actual_dir: &std::path::Path,
559 ) -> Result<ExportImportFilesMetaData, Error> {
560 let bytes = std::fs::read(path).map_err(|e| StorageError::DbError(e.to_string()))?;
561 let meta: CfExportMeta =
562 bincode::deserialize(&bytes).map_err(StorageError::BincodeError)?;
563
564 let dir_str = actual_dir
565 .to_str()
566 .ok_or_else(|| StorageError::DbError("Snapshot path is not valid UTF-8".to_string()))?
567 .to_string();
568
569 let live_files: Vec<LiveFile> = meta
570 .files
571 .into_iter()
572 .map(|f| LiveFile {
573 column_family_name: f.column_family_name,
574 name: f.name,
575 directory: dir_str.clone(),
576 size: f.size,
577 level: f.level,
578 start_key: f.start_key,
579 end_key: f.end_key,
580 smallest_seqno: f.smallest_seqno,
581 largest_seqno: f.largest_seqno,
582 num_entries: f.num_entries,
583 num_deletions: f.num_deletions,
584 })
585 .collect();
586
587 let mut export_metadata = ExportImportFilesMetaData::default();
588 export_metadata.set_db_comparator_name(&meta.db_comparator_name);
589 export_metadata
590 .set_files(&live_files)
591 .map_err(|e| StorageError::DbError(e.to_string()))?;
592
593 Ok(export_metadata)
594 }
595
596 pub(crate) fn map_snapshot_join_error(e: tokio::task::JoinError) -> StorageError {
597 let msg = if e.is_panic() {
598 format!("snapshot blocking task panicked: {e}")
599 } else {
600 format!("snapshot blocking task was cancelled: {e}")
601 };
602 StorageError::DbError(msg)
603 }
604
605 pub fn scan_prefix(
611 &self,
612 prefix: &[u8],
613 ) -> Result<ScanResult, Error> {
614 if prefix.is_empty() {
615 let revision = self.last_applied_index.load(Ordering::SeqCst);
616 return Ok(ScanResult {
617 entries: vec![],
618 revision,
619 });
620 }
621
622 let mut opts = ReadOptions::default();
623 if let Some(upper) = prefix_successor(prefix) {
626 opts.set_iterate_upper_bound(upper);
627 }
628
629 let db = self.db.load();
630 let cf = db
631 .cf_handle(STATE_MACHINE_CF)
632 .ok_or_else(|| StorageError::DbError("STATE_MACHINE_CF not found".into()))?;
633 let iter = db.iterator_cf_opt(&cf, opts, IteratorMode::From(prefix, Direction::Forward));
634
635 let mut entries = Vec::new();
636 for item in iter {
637 let (k, v) = item.map_err(|e| StorageError::DbError(e.to_string()))?;
638 if !k.starts_with(prefix) {
639 break;
640 }
641 entries.push((Bytes::copy_from_slice(&k), Bytes::copy_from_slice(&v)));
642 }
643
644 let revision = self.last_applied_index.load(Ordering::SeqCst);
645 Ok(ScanResult { entries, revision })
646 }
647}
648
649#[async_trait]
650impl StateMachine for RocksDBStateMachine {
651 async fn start(&self) -> Result<(), Error> {
652 self.is_serving.store(true, Ordering::SeqCst);
653
654 if let Some(ref _lease) = self.lease {
656 self.load_lease_data().await?;
657 debug!("Lease data loaded during state machine initialization");
658 }
659
660 info!("RocksDB state machine started");
661 Ok(())
662 }
663
664 fn stop(&self) -> Result<(), Error> {
665 self.is_serving.store(false, Ordering::SeqCst);
666
667 if let Err(e) = self.persist_ttl_metadata() {
670 error!("Failed to persist TTL metadata on shutdown: {:?}", e);
671 return Err(e);
672 }
673
674 info!("RocksDB state machine stopped");
675 Ok(())
676 }
677
678 fn is_running(&self) -> bool {
679 self.is_serving.load(Ordering::SeqCst)
680 }
681
682 fn get(
683 &self,
684 key_buffer: &[u8],
685 ) -> Result<Option<Bytes>, Error> {
686 if !self.is_serving.load(Ordering::SeqCst) {
690 return Err(StorageError::NotServing(
691 "State machine is restoring from snapshot".to_string(),
692 )
693 .into());
694 }
695
696 let db = self.db.load();
697 let cf = db
698 .cf_handle(STATE_MACHINE_CF)
699 .ok_or_else(|| StorageError::DbError("State machine CF not found".to_string()))?;
700
701 match db.get_cf(&cf, key_buffer).map_err(|e| StorageError::DbError(e.to_string()))? {
702 Some(value) => Ok(Some(Bytes::copy_from_slice(&value))),
703 None => Ok(None),
704 }
705 }
706
707 fn entry_term(
708 &self,
709 _entry_id: u64,
710 ) -> Option<u64> {
711 None
714 }
715
716 #[instrument(skip(self, chunk))]
718 async fn apply_chunk(
719 &self,
720 chunk: &[ApplyEntry],
721 ) -> Result<Vec<ApplyResult>, Error> {
722 let db = self.db.load();
723 let cf = db
724 .cf_handle(STATE_MACHINE_CF)
725 .ok_or_else(|| StorageError::DbError("State machine CF not found".to_string()))?;
726
727 let mut batch = WriteBatchWithIndex::new(0, true);
728 let mut highest_index_entry: Option<LogId> = None;
729 let mut results = Vec::with_capacity(chunk.len());
730
731 for entry in chunk {
732 if let Some(prev) = highest_index_entry {
733 assert!(
734 entry.index > prev.index,
735 "apply_chunk: received unordered entry at index {} (prev={})",
736 entry.index,
737 prev.index
738 );
739 }
740 highest_index_entry = Some(LogId {
741 index: entry.index,
742 term: entry.term,
743 });
744
745 match &entry.command {
746 Command::Noop => {
747 debug!("Handling NOOP command at index {}", entry.index);
748 results.push(ApplyResult::success(entry.index));
749 }
750 Command::Insert {
751 key,
752 value,
753 ttl_secs,
754 } => {
755 batch.put_cf(&cf, key, value);
756
757 if let Some(ttl) = ttl_secs {
758 if !self.lease_enabled {
759 return Err(StorageError::FeatureNotEnabled(
760 "TTL feature is not enabled on this server. \
761 Enable it in config: [raft.state_machine.lease] enabled = true"
762 .into(),
763 )
764 .into());
765 }
766 let lease = unsafe { self.lease.as_ref().unwrap_unchecked() };
768 lease.register(key.clone(), *ttl);
769 }
770
771 results.push(ApplyResult::success(entry.index));
772 }
773 Command::Delete { key } => {
774 batch.delete_cf(&cf, key);
775 if let Some(ref lease) = self.lease {
776 lease.unregister(key);
777 }
778 results.push(ApplyResult::success(entry.index));
779 }
780 Command::CompareAndSwap {
781 key,
782 expected,
783 value: new_value,
784 } => {
785 let current_value = batch
789 .get_from_batch_and_db_cf(&*db, &cf, key, &ReadOptions::default())
790 .map_err(|e| StorageError::DbError(format!("CAS read failed: {e}")))?;
791
792 let cas_success = match (current_value, expected) {
793 (Some(current), Some(exp)) => current == exp.as_ref(),
794 (None, None) => true,
795 _ => false,
796 };
797
798 if cas_success {
799 batch.put_cf(&cf, key, new_value);
800 }
801
802 results.push(if cas_success {
803 ApplyResult::success(entry.index)
804 } else {
805 ApplyResult::failure(entry.index)
806 });
807
808 debug!(
809 "CAS at index {}: key={:?}, success={}",
810 entry.index,
811 String::from_utf8_lossy(key),
812 cas_success
813 );
814 }
815 }
816 }
817
818 self.db
819 .load()
820 .write_wbwi(&batch)
821 .map_err(|e| StorageError::DbError(e.to_string()))?;
822
823 if let Some(highest) = highest_index_entry {
824 self.update_last_applied(highest);
825 }
826
827 Ok(results)
828 }
829
830 fn len(&self) -> usize {
831 let db = self.db.load();
832 let cf = match db.cf_handle(STATE_MACHINE_CF) {
833 Some(cf) => cf,
834 None => return 0,
835 };
836
837 let iter = db.iterator_cf(&cf, IteratorMode::Start);
839 iter.count()
840 }
841
842 fn update_last_applied(
843 &self,
844 last_applied: LogId,
845 ) {
846 self.last_applied_index.store(last_applied.index, Ordering::SeqCst);
847 self.last_applied_term.store(last_applied.term, Ordering::SeqCst);
848 }
849
850 fn last_applied(&self) -> LogId {
851 LogId {
852 index: self.last_applied_index.load(Ordering::SeqCst),
853 term: self.last_applied_term.load(Ordering::SeqCst),
854 }
855 }
856
857 fn persist_last_applied(
858 &self,
859 last_applied: LogId,
860 ) -> Result<(), Error> {
861 self.update_last_applied(last_applied);
862 self.persist_state_machine_metadata()
863 }
864
865 fn update_last_snapshot_metadata(
866 &self,
867 snapshot_metadata: &SnapshotMetadata,
868 ) -> Result<(), Error> {
869 *self.last_snapshot_metadata.write() = Some(snapshot_metadata.clone());
870 Ok(())
871 }
872
873 fn snapshot_metadata(&self) -> Option<SnapshotMetadata> {
874 self.last_snapshot_metadata.read().clone()
875 }
876
877 fn persist_last_snapshot_metadata(
878 &self,
879 snapshot_metadata: &SnapshotMetadata,
880 ) -> Result<(), Error> {
881 self.update_last_snapshot_metadata(snapshot_metadata)?;
882 self.persist_snapshot_metadata()
883 }
884
885 #[instrument(skip(self))]
886 async fn apply_snapshot_from_file(
887 &self,
888 metadata: &SnapshotMetadata,
889 snapshot_dir: std::path::PathBuf,
890 ) -> Result<(), Error> {
891 info!("Applying snapshot from: {:?}", snapshot_dir);
892
893 self.is_serving.store(false, Ordering::SeqCst);
895
896 let result = self.restore_from_snapshot(metadata, &snapshot_dir).await;
897
898 if let Err(ref e) = result {
899 error!(
900 "Snapshot restore failed, resuming serving with pre-restore state: {:?}",
901 e
902 );
903 self.is_serving.store(true, Ordering::SeqCst);
907 }
908
909 result
910 }
911
    /// Writes a snapshot of both state machine column families to `new_snapshot_dir`.
    ///
    /// Layout produced:
    /// - `sm/` + `sm_metadata.bin` for `STATE_MACHINE_CF`
    /// - `sm_meta/` + `sm_meta_metadata.bin` for `STATE_MACHINE_META_CF`
    /// - `ttl_state.bin`, when a lease tracker is attached
    ///
    /// The RocksDB work (flush + column-family export) runs inside
    /// `spawn_blocking`. The resulting `SnapshotMetadata`, tagged with
    /// `last_included`, is persisted and its checksum is returned.
    ///
    /// NOTE(review): the checksum is currently a fixed 32-byte zero placeholder.
    /// The exported data reflects the DB at export time, which may already be
    /// past `last_included`. Confirm callers tolerate both.
    #[instrument(skip(self))]
    async fn generate_snapshot_data(
        &self,
        new_snapshot_dir: std::path::PathBuf,
        last_included: LogId,
    ) -> Result<Bytes, Error> {
        // Owned `Arc<DB>` so it can move into the blocking task.
        let db = self.db.load_full();
        let dir = new_snapshot_dir.clone();
        tokio::task::spawn_blocking(move || -> Result<(), Error> {
            std::fs::create_dir_all(&dir)?;
            // Inner scope: the checkpoint and CF handles borrow `db` and are dropped here.
            {
                let checkpoint = rocksdb::checkpoint::Checkpoint::new(db.as_ref())
                    .map_err(|e| StorageError::DbError(e.to_string()))?;
                let cf_sm = db
                    .cf_handle(STATE_MACHINE_CF)
                    .ok_or_else(|| StorageError::DbError("SM CF not found".to_string()))?;
                let cf_sm_meta = db
                    .cf_handle(STATE_MACHINE_META_CF)
                    .ok_or_else(|| StorageError::DbError("SM meta CF not found".to_string()))?;

                // Flush memtables of both column families to SST files before exporting.
                let flush_opts = rocksdb::FlushOptions::default();
                db.flush_cf_opt(&cf_sm, &flush_opts)
                    .map_err(|e| StorageError::DbError(e.to_string()))?;
                db.flush_cf_opt(&cf_sm_meta, &flush_opts)
                    .map_err(|e| StorageError::DbError(e.to_string()))?;

                let sm_export = checkpoint
                    .export_column_family(&cf_sm, dir.join("sm"))
                    .map_err(|e| StorageError::DbError(e.to_string()))?;
                let sm_meta_export = checkpoint
                    .export_column_family(&cf_sm_meta, dir.join("sm_meta"))
                    .map_err(|e| StorageError::DbError(e.to_string()))?;

                // Persist the file lists; `restore_from_cf_export` reads them back.
                Self::save_cf_export_metadata(&sm_export, &dir.join("sm_metadata.bin"))?;
                Self::save_cf_export_metadata(&sm_meta_export, &dir.join("sm_meta_metadata.bin"))?;
            }
            Ok(())
        })
        .await
        // First `?` handles the join error, second `?` the task's own Result.
        .map_err(Self::map_snapshot_join_error)??;

        // Lease state is captured after the export finishes.
        if let Some(ref lease) = self.lease {
            let ttl_snapshot = lease.to_snapshot();
            let ttl_path = new_snapshot_dir.join("ttl_state.bin");
            tokio::fs::write(&ttl_path, ttl_snapshot).await?;
        }

        // Placeholder checksum: all zeros (no hash is computed).
        let checksum = [0; 32];
        let snapshot_metadata = SnapshotMetadata {
            last_included: Some(last_included),
            checksum: Bytes::copy_from_slice(&checksum),
        };
        self.persist_last_snapshot_metadata(&snapshot_metadata)?;

        info!("Snapshot generated at {:?}", new_snapshot_dir);
        Ok(Bytes::copy_from_slice(&checksum))
    }
984
985 fn save_hard_state(&self) -> Result<(), Error> {
986 self.persist_state_machine_metadata()?;
987 self.persist_snapshot_metadata()?;
988 Ok(())
989 }
990
991 fn flush(&self) -> Result<(), Error> {
992 let db = self.db.load();
993
994 db.flush_wal(true).map_err(|e| StorageError::DbError(e.to_string()))?;
997 db.flush().map_err(|e| StorageError::DbError(e.to_string()))?;
999
1000 self.persist_state_machine_metadata()?;
1002
1003 Ok(())
1004 }
1005
1006 async fn flush_async(&self) -> Result<(), Error> {
1007 self.flush()
1008 }
1009
1010 #[instrument(skip(self))]
1011 async fn reset(&self) -> Result<(), Error> {
1012 let db = self.db.load();
1013 let cf = db
1014 .cf_handle(STATE_MACHINE_CF)
1015 .ok_or_else(|| StorageError::DbError("State machine CF not found".to_string()))?;
1016
1017 let mut batch = WriteBatch::default();
1019 let iter = db.iterator_cf(&cf, IteratorMode::Start);
1020
1021 for item in iter {
1022 let (key, _) = item.map_err(|e| StorageError::DbError(e.to_string()))?;
1023 batch.delete_cf(&cf, &key);
1024 }
1025
1026 db.write(&batch).map_err(|e| StorageError::DbError(e.to_string()))?;
1027
1028 self.last_applied_index.store(0, Ordering::SeqCst);
1030 self.last_applied_term.store(0, Ordering::SeqCst);
1031 *self.last_snapshot_metadata.write() = None;
1032
1033 self.persist_state_machine_metadata()?;
1036 self.persist_snapshot_metadata()?;
1037
1038 info!("RocksDB state machine reset completed");
1039 Ok(())
1040 }
1041
1042 async fn lease_background_cleanup(&self) -> Result<Vec<Bytes>, Error> {
1043 let Some(ref lease) = self.lease else {
1045 return Ok(vec![]);
1046 };
1047
1048 let now = SystemTime::now();
1050 let expired_keys = lease.get_expired_keys(now);
1051
1052 if expired_keys.is_empty() {
1053 return Ok(vec![]);
1054 }
1055
1056 debug!(
1057 "Lease background cleanup: found {} expired keys",
1058 expired_keys.len()
1059 );
1060
1061 let db = self.db.load();
1063 let cf = db
1064 .cf_handle(STATE_MACHINE_CF)
1065 .ok_or_else(|| StorageError::DbError("State machine CF not found".to_string()))?;
1066
1067 let mut batch = WriteBatch::default();
1068 for key in &expired_keys {
1069 batch.delete_cf(&cf, key);
1070 }
1071
1072 self.apply_batch(batch)?;
1073
1074 info!(
1075 "Lease background cleanup: deleted {} expired keys",
1076 expired_keys.len()
1077 );
1078
1079 Ok(expired_keys)
1080 }
1081
1082 fn scan_prefix(
1083 &self,
1084 prefix: &[u8],
1085 ) -> Result<ScanResult, Error> {
1086 self.scan_prefix(prefix)
1087 }
1088}
1089impl Drop for RocksDBStateMachine {
1090 fn drop(&mut self) {
1091 if let Err(e) = self.save_hard_state() {
1094 error!("Failed to save hard state on drop: {}", e);
1095 }
1096
1097 if let Err(e) = self.flush() {
1099 error!("Failed to flush on drop: {}", e);
1100 } else {
1101 debug!("RocksDBStateMachine flushed successfully on drop");
1102 }
1103
1104 self.db.load().cancel_all_background_work(true); debug!("RocksDB background work cancelled on drop");
1107 }
1108}