//! RocksDB-backed state machine for d-engine
//! (`d_engine_server/storage/adaptors/rocksdb/rocksdb_state_machine.rs`).

1use std::sync::Arc;
2use std::sync::atomic::AtomicBool;
3use std::sync::atomic::AtomicU64;
4use std::sync::atomic::Ordering;
5use std::time::SystemTime;
6
7use arc_swap::ArcSwap;
8use bytes::Bytes;
9use d_engine_core::ApplyEntry;
10use d_engine_core::ApplyResult;
11use d_engine_core::Command;
12use d_engine_core::Error;
13use d_engine_core::Lease;
14use d_engine_core::ScanResult;
15use d_engine_core::StateMachine;
16use d_engine_core::StorageError;
17use d_engine_proto::common::LogId;
18use d_engine_proto::server::storage::SnapshotMetadata;
19use parking_lot::RwLock;
20use std::path::Path;
21
22use async_trait::async_trait;
23use rocksdb::Cache;
24use rocksdb::ColumnFamilyDescriptor;
25use rocksdb::DB;
26use rocksdb::Direction;
27use rocksdb::ExportImportFilesMetaData;
28use rocksdb::ImportColumnFamilyOptions;
29use rocksdb::IteratorMode;
30use rocksdb::LiveFile;
31use rocksdb::Options;
32use rocksdb::ReadOptions;
33use rocksdb::WriteBatch;
34use rocksdb::WriteBatchWithIndex;
35use serde::Deserialize;
36use serde::Serialize;
37use tracing::debug;
38use tracing::error;
39use tracing::info;
40use tracing::instrument;
41use tracing::warn;
42
43use crate::storage::DefaultLease;
44
45use super::STATE_MACHINE_CF;
46use super::STATE_MACHINE_META_CF;
47
/// Meta-CF key holding the last applied log index, encoded as 8 big-endian bytes.
const LAST_APPLIED_INDEX_KEY: &[u8] = "last_applied_index".as_bytes();
/// Meta-CF key holding the last applied log term, encoded as 8 big-endian bytes.
const LAST_APPLIED_TERM_KEY: &[u8] = "last_applied_term".as_bytes();
/// Meta-CF key holding the bincode-encoded latest `SnapshotMetadata`.
const SNAPSHOT_METADATA_KEY: &[u8] = "snapshot_metadata".as_bytes();
/// Meta-CF key holding the serialized lease (TTL) snapshot produced by `to_snapshot()`.
const TTL_STATE_KEY: &[u8] = "ttl_state".as_bytes();
52
/// Persisted representation of `ExportImportFilesMetaData` for cross-node snapshot transfer.
///
/// `directory` is excluded — it is reconstructed from the local snapshot path on restore.
/// Encoded with bincode (`save_cf_export_metadata` / `load_cf_export_metadata`). Bincode
/// writes struct fields in declaration order, so the field order below is part of the
/// snapshot file format and must not be rearranged.
#[derive(Serialize, Deserialize)]
struct CfExportMeta {
    /// Comparator name reported by the exporting DB (`get_db_comparator_name`).
    db_comparator_name: String,
    /// One entry per exported SST file.
    files: Vec<CfExportFile>,
}

/// Serializable mirror of a `rocksdb::LiveFile`, without its `directory`.
///
/// Fields are copied one-to-one from/to `LiveFile` by `save_cf_export_metadata` and
/// `load_cf_export_metadata`. As with `CfExportMeta`, the field order is part of the
/// bincode encoding.
#[derive(Serialize, Deserialize)]
struct CfExportFile {
    column_family_name: String,
    /// SST file name; on restore it is paired with the local snapshot sub-directory.
    name: String,
    size: usize,
    level: i32,
    start_key: Option<Vec<u8>>,
    end_key: Option<Vec<u8>>,
    smallest_seqno: u64,
    largest_seqno: u64,
    num_entries: u64,
    num_deletions: u64,
}
75
/// RocksDB-based state machine implementation with lease support.
///
/// User data lives in `STATE_MACHINE_CF`. Bookkeeping (last applied index/term, snapshot
/// metadata, TTL state) lives in `STATE_MACHINE_META_CF`.
#[derive(Debug)]
pub struct RocksDBStateMachine {
    /// Current DB instance. Wrapped in `ArcSwap` so the instance can be replaced at runtime
    /// (see `swap_db_for_test`); every access goes through `load()`.
    db: Arc<ArcSwap<DB>>,
    /// Serving flag. Set by `start` and at the end of a snapshot restore; cleared by `stop`.
    /// `get` rejects reads with `NotServing` while it is false.
    is_serving: AtomicBool,
    /// In-memory last applied log index. Written to disk by `persist_state_machine_metadata`.
    last_applied_index: AtomicU64,
    /// In-memory last applied log term. Persisted alongside the index.
    last_applied_term: AtomicU64,
    /// Latest snapshot metadata. Loaded from the meta CF at construction.
    last_snapshot_metadata: RwLock<Option<SnapshotMetadata>>,

    // Lease management for automatic key expiration
    // DefaultLease is thread-safe internally (uses DashMap + Mutex)
    // Injected by NodeBuilder after construction
    lease: Option<Arc<DefaultLease>>,

    /// Whether lease manager is enabled (immutable after init)
    /// Set to true when lease is injected, never changes after that
    ///
    /// Invariant: lease_enabled == true ⟹ lease.is_some()
    /// (both are assigned together in `set_lease`).
    /// Checked on the apply path to reject TTL writes when no lease is configured.
    lease_enabled: bool,
}
97
/// Computes the smallest key that sorts after every key starting with `prefix`,
/// suitable as an exclusive iterator upper bound.
///
/// Trailing `0xFF` bytes cannot be incremented without wrapping, so they are dropped
/// and the right-most remaining byte is bumped by one. If every byte is `0xFF` (or the
/// prefix is empty) there is no finite successor and `None` is returned; callers must
/// then rely on a `starts_with` check to terminate their scan.
///
/// Examples:
///   [0x61, 0xFF] → Some([0x62])   — carry propagates past the 0xFF
///   [0xFF, 0xFF] → None           — all-0xFF prefix, scan to end of keyspace
///   [0x2F]       → Some([0x30])   — '/' (0x2F) → '0' (0x30), normal slash-prefix
fn prefix_successor(prefix: &[u8]) -> Option<Vec<u8>> {
    // Position of the last byte that can be incremented without overflow.
    let bump_at = prefix.iter().rposition(|&byte| byte != 0xFF)?;
    let mut successor = prefix[..=bump_at].to_vec();
    successor[bump_at] += 1;
    Some(successor)
}
119
120impl RocksDBStateMachine {
121    /// Opens (or creates) a dedicated RocksDB instance for state machine storage.
122    ///
123    /// Used when `unified_db = false` (default): each engine owns its own DB instance.
124    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
125        let db_opts = super::base_db_options();
126
127        let cache = Cache::new_lru_cache(128 * 1024 * 1024);
128        let sm_cf = ColumnFamilyDescriptor::new(STATE_MACHINE_CF, super::sm_cf_options(&cache));
129        let sm_meta_cf =
130            ColumnFamilyDescriptor::new(STATE_MACHINE_META_CF, super::meta_cf_options(&cache));
131
132        let db = DB::open_cf_descriptors(&db_opts, path, vec![sm_cf, sm_meta_cf])
133            .map_err(|e| StorageError::DbError(e.to_string()))?;
134        let db_arc = Arc::new(db);
135
136        let (last_applied_index, last_applied_term) = Self::load_state_machine_metadata(&db_arc)?;
137        let last_snapshot_metadata = Self::load_snapshot_metadata(&db_arc)?;
138
139        Ok(Self {
140            db: Arc::new(ArcSwap::new(db_arc)),
141            is_serving: AtomicBool::new(true),
142            last_applied_index: AtomicU64::new(last_applied_index),
143            last_applied_term: AtomicU64::new(last_applied_term),
144            last_snapshot_metadata: RwLock::new(last_snapshot_metadata),
145            lease: None,
146            lease_enabled: false,
147        })
148    }
149
150    /// Creates a state machine sharing an existing `Arc<DB>` (unified mode).
151    ///
152    /// Called by `RocksDBUnifiedEngine::open()`. The DB must already have
153    /// `STATE_MACHINE_CF` and `STATE_MACHINE_META_CF` column families open.
154    pub(super) fn from_shared_db(db: Arc<DB>) -> Result<Self, Error> {
155        let (last_applied_index, last_applied_term) = Self::load_state_machine_metadata(&db)?;
156        let last_snapshot_metadata = Self::load_snapshot_metadata(&db)?;
157
158        Ok(Self {
159            db: Arc::new(ArcSwap::new(db)),
160            is_serving: AtomicBool::new(true),
161            last_applied_index: AtomicU64::new(last_applied_index),
162            last_applied_term: AtomicU64::new(last_applied_term),
163            last_snapshot_metadata: RwLock::new(last_snapshot_metadata),
164            lease: None,
165            lease_enabled: false,
166        })
167    }
168
169    /// Sets the lease manager for this state machine.
170    ///
171    /// This is an internal method called by NodeBuilder during initialization.
172    /// The lease will also be restored from snapshot during `apply_snapshot_from_file()`.
173    /// Also available for testing and benchmarks.
174    pub fn set_lease(
175        &mut self,
176        lease: Arc<DefaultLease>,
177    ) {
178        // Mark lease as enabled (immutable after this point)
179        self.lease_enabled = true;
180        self.lease = Some(lease);
181    }
182
183    /// Replaces the underlying DB with `new_db`. Only available in `#[cfg(test)]`.
184    /// Used to inject a read-only or otherwise broken DB to exercise error paths.
185    #[cfg(test)]
186    pub(super) fn swap_db_for_test(
187        &self,
188        new_db: DB,
189    ) {
190        self.db.store(Arc::new(new_db));
191    }
192
    // ========== Private helper methods ==========
    //
    // (Lease injection is handled by `set_lease`, which NodeBuilder calls during
    // initialization; opening RocksDB is handled by `new` / `from_shared_db`.)
198
199    fn load_state_machine_metadata(db: &Arc<DB>) -> Result<(u64, u64), Error> {
200        let cf = db
201            .cf_handle(STATE_MACHINE_META_CF)
202            .ok_or_else(|| StorageError::DbError("State machine meta CF not found".to_string()))?;
203
204        let index = match db
205            .get_cf(&cf, LAST_APPLIED_INDEX_KEY)
206            .map_err(|e| StorageError::DbError(e.to_string()))?
207        {
208            Some(bytes) if bytes.len() == 8 => u64::from_be_bytes([
209                bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
210            ]),
211            _ => 0,
212        };
213
214        let term = match db
215            .get_cf(&cf, LAST_APPLIED_TERM_KEY)
216            .map_err(|e| StorageError::DbError(e.to_string()))?
217        {
218            Some(bytes) if bytes.len() == 8 => u64::from_be_bytes([
219                bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
220            ]),
221            _ => 0,
222        };
223
224        Ok((index, term))
225    }
226
227    fn load_snapshot_metadata(db: &Arc<DB>) -> Result<Option<SnapshotMetadata>, Error> {
228        let cf = db
229            .cf_handle(STATE_MACHINE_META_CF)
230            .ok_or_else(|| StorageError::DbError("State machine meta CF not found".to_string()))?;
231
232        match db
233            .get_cf(&cf, SNAPSHOT_METADATA_KEY)
234            .map_err(|e| StorageError::DbError(e.to_string()))?
235        {
236            Some(bytes) => {
237                let metadata = bincode::deserialize(&bytes).map_err(StorageError::BincodeError)?;
238                Ok(Some(metadata))
239            }
240            None => Ok(None),
241        }
242    }
243
244    fn persist_state_machine_metadata(&self) -> Result<(), Error> {
245        let db = self.db.load();
246        let cf = db
247            .cf_handle(STATE_MACHINE_META_CF)
248            .ok_or_else(|| StorageError::DbError("State machine meta CF not found".to_string()))?;
249
250        let index = self.last_applied_index.load(Ordering::SeqCst);
251        let term = self.last_applied_term.load(Ordering::SeqCst);
252
253        db.put_cf(&cf, LAST_APPLIED_INDEX_KEY, index.to_be_bytes())
254            .map_err(|e| StorageError::DbError(e.to_string()))?;
255        db.put_cf(&cf, LAST_APPLIED_TERM_KEY, term.to_be_bytes())
256            .map_err(|e| StorageError::DbError(e.to_string()))?;
257
258        Ok(())
259    }
260
261    fn persist_snapshot_metadata(&self) -> Result<(), Error> {
262        let db = self.db.load();
263        let cf = db
264            .cf_handle(STATE_MACHINE_META_CF)
265            .ok_or_else(|| StorageError::DbError("State machine meta CF not found".to_string()))?;
266
267        if let Some(metadata) = self.last_snapshot_metadata.read().clone() {
268            let bytes = bincode::serialize(&metadata).map_err(StorageError::BincodeError)?;
269            db.put_cf(&cf, SNAPSHOT_METADATA_KEY, bytes)
270                .map_err(|e| StorageError::DbError(e.to_string()))?;
271        }
272        Ok(())
273    }
274
275    fn persist_ttl_metadata(&self) -> Result<(), Error> {
276        if let Some(ref lease) = self.lease {
277            let db = self.db.load();
278            let cf = db.cf_handle(STATE_MACHINE_META_CF).ok_or_else(|| {
279                StorageError::DbError("State machine meta CF not found".to_string())
280            })?;
281
282            let ttl_snapshot = lease.to_snapshot();
283
284            db.put_cf(&cf, TTL_STATE_KEY, ttl_snapshot)
285                .map_err(|e| StorageError::DbError(e.to_string()))?;
286
287            debug!("Persisted TTL state to RocksDB");
288        }
289        Ok(())
290    }
291
292    /// Loads TTL state from RocksDB metadata after lease injection.
293    ///
294    /// Called after NodeBuilder injects the lease.
295    /// Also available for testing and benchmarks.
296    pub async fn load_lease_data(&self) -> Result<(), Error> {
297        let Some(ref lease) = self.lease else {
298            return Ok(()); // No lease configured
299        };
300
301        let db = self.db.load();
302        let cf = db
303            .cf_handle(STATE_MACHINE_META_CF)
304            .ok_or_else(|| StorageError::DbError("State machine meta CF not found".to_string()))?;
305
306        match db
307            .get_cf(&cf, TTL_STATE_KEY)
308            .map_err(|e| StorageError::DbError(e.to_string()))?
309        {
310            Some(ttl_data) => {
311                lease.reload(&ttl_data)?;
312                debug!("Loaded TTL state from RocksDB: {} active TTLs", lease.len());
313            }
314            None => {
315                debug!("No TTL state found in RocksDB");
316            }
317        }
318
319        Ok(())
320    }
321
322    /// Piggyback cleanup: Remove expired keys with time budget
323    ///
324    /// This method is called during apply_chunk to cleanup expired keys
325    /// opportunistically (piggyback on existing Raft events).
326    ///
327    /// # Arguments
328    /// * `max_duration_ms` - Maximum time budget for cleanup (milliseconds)
329    ///
330    /// # Returns
331    /// Number of keys deleted
332    ///
333    /// # Performance
334    /// - Fast-path: ~10ns if no TTL keys exist (lazy activation check)
335    /// - Cleanup: O(log N + K) where K = expired keys
336    /// - Time-bounded: stops after max_duration_ms to avoid blocking Raft
337    #[allow(dead_code)]
338    fn maybe_cleanup_expired(
339        &self,
340        max_duration_ms: u64,
341    ) -> usize {
342        let start = std::time::Instant::now();
343        let now = SystemTime::now();
344        let mut deleted_count = 0;
345
346        // Fast path: skip if TTL never used (lazy activation)
347        if let Some(ref lease) = self.lease {
348            if !lease.has_lease_keys() {
349                return 0; // No TTL keys, skip cleanup (~10ns overhead)
350            }
351
352            // Quick check: any expired keys?
353            if !lease.may_have_expired_keys(now) {
354                return 0; // No expired keys, skip cleanup (~30ns overhead)
355            }
356        } else {
357            return 0; // No lease configured
358        }
359
360        // Get database handle
361        let db = self.db.load();
362        let cf = match db.cf_handle(STATE_MACHINE_CF) {
363            Some(cf) => cf,
364            None => {
365                error!("State machine CF not found during TTL cleanup");
366                return 0;
367            }
368        };
369
370        // Cleanup expired keys with time budget
371        let max_duration = std::time::Duration::from_millis(max_duration_ms);
372
373        loop {
374            // Check time budget
375            if start.elapsed() >= max_duration {
376                debug!(
377                    "Piggyback cleanup time budget exceeded: deleted {} keys in {:?}",
378                    deleted_count,
379                    start.elapsed()
380                );
381                break;
382            }
383
384            // Get next batch of expired keys
385            let expired_keys = if let Some(ref lease) = self.lease {
386                lease.get_expired_keys(now)
387            } else {
388                vec![]
389            };
390
391            if expired_keys.is_empty() {
392                break; // No more expired keys
393            }
394
395            // Delete expired keys from RocksDB using batch for efficiency
396            let mut batch = WriteBatch::default();
397            for key in expired_keys {
398                batch.delete_cf(&cf, &key);
399                deleted_count += 1;
400            }
401
402            // Apply batch delete
403            if let Err(e) = db.write(&batch) {
404                error!("Failed to delete expired keys: {}", e);
405                break;
406            }
407        }
408
409        if deleted_count > 0 {
410            debug!(
411                "Piggyback cleanup: deleted {} expired keys in {:?}",
412                deleted_count,
413                start.elapsed()
414            );
415        }
416
417        deleted_count
418    }
419
420    fn apply_batch(
421        &self,
422        batch: WriteBatch,
423    ) -> Result<(), Error> {
424        self.db.load().write(&batch).map_err(|e| StorageError::DbError(e.to_string()))?;
425        Ok(())
426    }
427
428    // ===== Snapshot restore helpers =====
429
430    /// Restore SM state from a snapshot (CF export format).
431    ///
432    /// Uses CF export: drop+import, O(1), no tombstones.
433    async fn restore_from_snapshot(
434        &self,
435        metadata: &SnapshotMetadata,
436        snapshot_dir: &std::path::Path,
437    ) -> Result<(), Error> {
438        let db = self.db.load();
439        Self::restore_from_cf_export(&db, snapshot_dir)?;
440
441        info!("Snapshot restore complete");
442
443        // Restore lease if configured
444        if let Some(ref lease) = self.lease {
445            let ttl_path = snapshot_dir.join("ttl_state.bin");
446            if ttl_path.exists() {
447                let ttl_data = tokio::fs::read(&ttl_path).await?;
448                lease.reload(&ttl_data)?;
449                self.persist_ttl_metadata()?;
450            } else {
451                warn!("No lease state found in snapshot");
452            }
453        }
454
455        *self.last_snapshot_metadata.write() = Some(metadata.clone());
456        if let Some(last_included) = &metadata.last_included {
457            self.update_last_applied(*last_included);
458        }
459
460        self.is_serving.store(true, Ordering::SeqCst);
461        info!("Snapshot applied successfully");
462        Ok(())
463    }
464
465    /// Restore SM CFs from a CF export snapshot (new format).
466    ///
467    /// drop_cf + create_column_family_with_import: atomic per CF, no tombstones,
468    /// O(1) via hard-links when source and DB are on the same filesystem.
469    fn restore_from_cf_export(
470        db: &DB,
471        snapshot_dir: &std::path::Path,
472    ) -> Result<(), Error> {
473        let sm_meta = Self::load_cf_export_metadata(
474            &snapshot_dir.join("sm_metadata.bin"),
475            &snapshot_dir.join("sm"),
476        )?;
477        let sm_meta_meta = Self::load_cf_export_metadata(
478            &snapshot_dir.join("sm_meta_metadata.bin"),
479            &snapshot_dir.join("sm_meta"),
480        )?;
481
482        let import_opts = ImportColumnFamilyOptions::default();
483        let cf_opts = Options::default();
484
485        // SAFETY NOTE: drop_cf + create_column_family_with_import is NOT atomic.
486        // If the process crashes between the two drop_cf calls and the imports, the CFs
487        // will be absent on restart. RocksDB does not support CF rename, so true atomicity
488        // is not achievable here. Recovery: the node will restart with missing CFs, return
489        // errors on reads, and wait for the leader to re-send the snapshot (issue #308).
490        db.drop_cf(STATE_MACHINE_CF).map_err(|e| StorageError::DbError(e.to_string()))?;
491        db.drop_cf(STATE_MACHINE_META_CF)
492            .map_err(|e| StorageError::DbError(e.to_string()))?;
493
494        // create_column_family_with_import requires at least one SST file.
495        // A CF with no files (e.g. sm_meta before its first flush) must use create_cf instead.
496        if sm_meta.get_files().is_empty() {
497            db.create_cf(STATE_MACHINE_CF, &cf_opts)
498                .map_err(|e| StorageError::DbError(e.to_string()))?;
499        } else {
500            db.create_column_family_with_import(&cf_opts, STATE_MACHINE_CF, &import_opts, &sm_meta)
501                .map_err(|e| StorageError::DbError(e.to_string()))?;
502        }
503
504        if sm_meta_meta.get_files().is_empty() {
505            db.create_cf(STATE_MACHINE_META_CF, &cf_opts)
506                .map_err(|e| StorageError::DbError(e.to_string()))?;
507        } else {
508            db.create_column_family_with_import(
509                &cf_opts,
510                STATE_MACHINE_META_CF,
511                &import_opts,
512                &sm_meta_meta,
513            )
514            .map_err(|e| StorageError::DbError(e.to_string()))?;
515        }
516
517        Ok(())
518    }
519
520    /// Serialize `ExportImportFilesMetaData` to a bincode file at `path`.
521    ///
522    /// `directory` is excluded; on restore it is reconstructed from the snapshot path.
523    fn save_cf_export_metadata(
524        metadata: &ExportImportFilesMetaData,
525        path: &std::path::Path,
526    ) -> Result<(), Error> {
527        let files = metadata
528            .get_files()
529            .into_iter()
530            .map(|f| CfExportFile {
531                column_family_name: f.column_family_name,
532                name: f.name,
533                size: f.size,
534                level: f.level,
535                start_key: f.start_key,
536                end_key: f.end_key,
537                smallest_seqno: f.smallest_seqno,
538                largest_seqno: f.largest_seqno,
539                num_entries: f.num_entries,
540                num_deletions: f.num_deletions,
541            })
542            .collect();
543
544        let meta = CfExportMeta {
545            db_comparator_name: metadata.get_db_comparator_name(),
546            files,
547        };
548
549        let bytes = bincode::serialize(&meta).map_err(StorageError::BincodeError)?;
550        std::fs::write(path, bytes).map_err(|e| StorageError::DbError(e.to_string()))?;
551        Ok(())
552    }
553
554    /// Deserialize `ExportImportFilesMetaData` from a bincode file, setting `directory` to
555    /// `actual_dir` (the local path of the exported SST files after snapshot transfer).
556    fn load_cf_export_metadata(
557        path: &std::path::Path,
558        actual_dir: &std::path::Path,
559    ) -> Result<ExportImportFilesMetaData, Error> {
560        let bytes = std::fs::read(path).map_err(|e| StorageError::DbError(e.to_string()))?;
561        let meta: CfExportMeta =
562            bincode::deserialize(&bytes).map_err(StorageError::BincodeError)?;
563
564        let dir_str = actual_dir
565            .to_str()
566            .ok_or_else(|| StorageError::DbError("Snapshot path is not valid UTF-8".to_string()))?
567            .to_string();
568
569        let live_files: Vec<LiveFile> = meta
570            .files
571            .into_iter()
572            .map(|f| LiveFile {
573                column_family_name: f.column_family_name,
574                name: f.name,
575                directory: dir_str.clone(),
576                size: f.size,
577                level: f.level,
578                start_key: f.start_key,
579                end_key: f.end_key,
580                smallest_seqno: f.smallest_seqno,
581                largest_seqno: f.largest_seqno,
582                num_entries: f.num_entries,
583                num_deletions: f.num_deletions,
584            })
585            .collect();
586
587        let mut export_metadata = ExportImportFilesMetaData::default();
588        export_metadata.set_db_comparator_name(&meta.db_comparator_name);
589        export_metadata
590            .set_files(&live_files)
591            .map_err(|e| StorageError::DbError(e.to_string()))?;
592
593        Ok(export_metadata)
594    }
595
596    pub(crate) fn map_snapshot_join_error(e: tokio::task::JoinError) -> StorageError {
597        let msg = if e.is_panic() {
598            format!("snapshot blocking task panicked: {e}")
599        } else {
600            format!("snapshot blocking task was cancelled: {e}")
601        };
602        StorageError::DbError(msg)
603    }
604
605    /// Scans all entries whose key starts with `prefix`.
606    ///
607    /// Uses `set_iterate_upper_bound(prefix_successor)` so RocksDB stops at the
608    /// block level — O(results), not O(total keys). No `prefix_extractor` needed.
609    /// `revision` in the result equals `last_applied_index` — the watch-filter anchor.
610    pub fn scan_prefix(
611        &self,
612        prefix: &[u8],
613    ) -> Result<ScanResult, Error> {
614        if prefix.is_empty() {
615            let revision = self.last_applied_index.load(Ordering::SeqCst);
616            return Ok(ScanResult {
617                entries: vec![],
618                revision,
619            });
620        }
621
622        let mut opts = ReadOptions::default();
623        // prefix_successor handles 0xFF carry: [0x61,0xFF] → [0x62], all-0xFF → None.
624        // When None, no upper bound is set and a starts_with guard terminates the scan.
625        if let Some(upper) = prefix_successor(prefix) {
626            opts.set_iterate_upper_bound(upper);
627        }
628
629        let db = self.db.load();
630        let cf = db
631            .cf_handle(STATE_MACHINE_CF)
632            .ok_or_else(|| StorageError::DbError("STATE_MACHINE_CF not found".into()))?;
633        let iter = db.iterator_cf_opt(&cf, opts, IteratorMode::From(prefix, Direction::Forward));
634
635        let mut entries = Vec::new();
636        for item in iter {
637            let (k, v) = item.map_err(|e| StorageError::DbError(e.to_string()))?;
638            if !k.starts_with(prefix) {
639                break;
640            }
641            entries.push((Bytes::copy_from_slice(&k), Bytes::copy_from_slice(&v)));
642        }
643
644        let revision = self.last_applied_index.load(Ordering::SeqCst);
645        Ok(ScanResult { entries, revision })
646    }
647}
648
649#[async_trait]
650impl StateMachine for RocksDBStateMachine {
651    async fn start(&self) -> Result<(), Error> {
652        self.is_serving.store(true, Ordering::SeqCst);
653
654        // Load persisted lease data if configured
655        if let Some(ref _lease) = self.lease {
656            self.load_lease_data().await?;
657            debug!("Lease data loaded during state machine initialization");
658        }
659
660        info!("RocksDB state machine started");
661        Ok(())
662    }
663
664    fn stop(&self) -> Result<(), Error> {
665        self.is_serving.store(false, Ordering::SeqCst);
666
667        // Graceful shutdown: persist TTL state to disk
668        // This ensures lease data survives across restarts
669        if let Err(e) = self.persist_ttl_metadata() {
670            error!("Failed to persist TTL metadata on shutdown: {:?}", e);
671            return Err(e);
672        }
673
674        info!("RocksDB state machine stopped");
675        Ok(())
676    }
677
678    fn is_running(&self) -> bool {
679        self.is_serving.load(Ordering::SeqCst)
680    }
681
682    fn get(
683        &self,
684        key_buffer: &[u8],
685    ) -> Result<Option<Bytes>, Error> {
686        // Guard against reads during snapshot restoration
687        // During apply_snapshot_from_file(), is_serving is set to false while the database
688        // is being replaced. This prevents reads from accessing temporary or inconsistent state.
689        if !self.is_serving.load(Ordering::SeqCst) {
690            return Err(StorageError::NotServing(
691                "State machine is restoring from snapshot".to_string(),
692            )
693            .into());
694        }
695
696        let db = self.db.load();
697        let cf = db
698            .cf_handle(STATE_MACHINE_CF)
699            .ok_or_else(|| StorageError::DbError("State machine CF not found".to_string()))?;
700
701        match db.get_cf(&cf, key_buffer).map_err(|e| StorageError::DbError(e.to_string()))? {
702            Some(value) => Ok(Some(Bytes::copy_from_slice(&value))),
703            None => Ok(None),
704        }
705    }
706
707    fn entry_term(
708        &self,
709        _entry_id: u64,
710    ) -> Option<u64> {
711        // In RocksDB state machine, we don't store term per key. This method is not typically used.
712        // If needed, we might need to change the design to store term along with value.
713        None
714    }
715
716    /// Thread-safe: called serially by single-task CommitHandler
717    #[instrument(skip(self, chunk))]
718    async fn apply_chunk(
719        &self,
720        chunk: &[ApplyEntry],
721    ) -> Result<Vec<ApplyResult>, Error> {
722        let db = self.db.load();
723        let cf = db
724            .cf_handle(STATE_MACHINE_CF)
725            .ok_or_else(|| StorageError::DbError("State machine CF not found".to_string()))?;
726
727        let mut batch = WriteBatchWithIndex::new(0, true);
728        let mut highest_index_entry: Option<LogId> = None;
729        let mut results = Vec::with_capacity(chunk.len());
730
731        for entry in chunk {
732            if let Some(prev) = highest_index_entry {
733                assert!(
734                    entry.index > prev.index,
735                    "apply_chunk: received unordered entry at index {} (prev={})",
736                    entry.index,
737                    prev.index
738                );
739            }
740            highest_index_entry = Some(LogId {
741                index: entry.index,
742                term: entry.term,
743            });
744
745            match &entry.command {
746                Command::Noop => {
747                    debug!("Handling NOOP command at index {}", entry.index);
748                    results.push(ApplyResult::success(entry.index));
749                }
750                Command::Insert {
751                    key,
752                    value,
753                    ttl_secs,
754                } => {
755                    batch.put_cf(&cf, key, value);
756
757                    if let Some(ttl) = ttl_secs {
758                        if !self.lease_enabled {
759                            return Err(StorageError::FeatureNotEnabled(
760                                "TTL feature is not enabled on this server. \
761                                 Enable it in config: [raft.state_machine.lease] enabled = true"
762                                    .into(),
763                            )
764                            .into());
765                        }
766                        // Safety: lease_enabled invariant ensures lease.is_some()
767                        let lease = unsafe { self.lease.as_ref().unwrap_unchecked() };
768                        lease.register(key.clone(), *ttl);
769                    }
770
771                    results.push(ApplyResult::success(entry.index));
772                }
773                Command::Delete { key } => {
774                    batch.delete_cf(&cf, key);
775                    if let Some(ref lease) = self.lease {
776                        lease.unregister(key);
777                    }
778                    results.push(ApplyResult::success(entry.index));
779                }
780                Command::CompareAndSwap {
781                    key,
782                    expected,
783                    value: new_value,
784                } => {
785                    // RocksDB doesn't have native CAS; implement via read-compare-write.
786                    // Read through WriteBatchWithIndex so earlier writes in this batch
787                    // are visible, preventing stale-read linearizability violations.
788                    let current_value = batch
789                        .get_from_batch_and_db_cf(&*db, &cf, key, &ReadOptions::default())
790                        .map_err(|e| StorageError::DbError(format!("CAS read failed: {e}")))?;
791
792                    let cas_success = match (current_value, expected) {
793                        (Some(current), Some(exp)) => current == exp.as_ref(),
794                        (None, None) => true,
795                        _ => false,
796                    };
797
798                    if cas_success {
799                        batch.put_cf(&cf, key, new_value);
800                    }
801
802                    results.push(if cas_success {
803                        ApplyResult::success(entry.index)
804                    } else {
805                        ApplyResult::failure(entry.index)
806                    });
807
808                    debug!(
809                        "CAS at index {}: key={:?}, success={}",
810                        entry.index,
811                        String::from_utf8_lossy(key),
812                        cas_success
813                    );
814                }
815            }
816        }
817
818        self.db
819            .load()
820            .write_wbwi(&batch)
821            .map_err(|e| StorageError::DbError(e.to_string()))?;
822
823        if let Some(highest) = highest_index_entry {
824            self.update_last_applied(highest);
825        }
826
827        Ok(results)
828    }
829
830    fn len(&self) -> usize {
831        let db = self.db.load();
832        let cf = match db.cf_handle(STATE_MACHINE_CF) {
833            Some(cf) => cf,
834            None => return 0,
835        };
836
837        // Note: This is an expensive operation because it iterates over all keys.
838        let iter = db.iterator_cf(&cf, IteratorMode::Start);
839        iter.count()
840    }
841
842    fn update_last_applied(
843        &self,
844        last_applied: LogId,
845    ) {
846        self.last_applied_index.store(last_applied.index, Ordering::SeqCst);
847        self.last_applied_term.store(last_applied.term, Ordering::SeqCst);
848    }
849
850    fn last_applied(&self) -> LogId {
851        LogId {
852            index: self.last_applied_index.load(Ordering::SeqCst),
853            term: self.last_applied_term.load(Ordering::SeqCst),
854        }
855    }
856
    /// Updates the in-memory last-applied `LogId`, then writes the state-machine metadata
    /// to storage.
    ///
    /// # Errors
    /// Returns any error from `persist_state_machine_metadata`. The in-memory value has
    /// already been updated at that point and is not rolled back.
    fn persist_last_applied(
        &self,
        last_applied: LogId,
    ) -> Result<(), Error> {
        // Update the atomics first. NOTE(review): presumably persist_state_machine_metadata
        // reads the values set here; confirm.
        self.update_last_applied(last_applied);
        self.persist_state_machine_metadata()
    }
864
865    fn update_last_snapshot_metadata(
866        &self,
867        snapshot_metadata: &SnapshotMetadata,
868    ) -> Result<(), Error> {
869        *self.last_snapshot_metadata.write() = Some(snapshot_metadata.clone());
870        Ok(())
871    }
872
873    fn snapshot_metadata(&self) -> Option<SnapshotMetadata> {
874        self.last_snapshot_metadata.read().clone()
875    }
876
877    fn persist_last_snapshot_metadata(
878        &self,
879        snapshot_metadata: &SnapshotMetadata,
880    ) -> Result<(), Error> {
881        self.update_last_snapshot_metadata(snapshot_metadata)?;
882        self.persist_snapshot_metadata()
883    }
884
885    #[instrument(skip(self))]
886    async fn apply_snapshot_from_file(
887        &self,
888        metadata: &SnapshotMetadata,
889        snapshot_dir: std::path::PathBuf,
890    ) -> Result<(), Error> {
891        info!("Applying snapshot from: {:?}", snapshot_dir);
892
893        // Stop serving — prevents SM reads/writes during restore
894        self.is_serving.store(false, Ordering::SeqCst);
895
896        let result = self.restore_from_snapshot(metadata, &snapshot_dir).await;
897
898        if let Err(ref e) = result {
899            error!(
900                "Snapshot restore failed, resuming serving with pre-restore state: {:?}",
901                e
902            );
903            // Restore is_serving so the node stays alive and can accept future snapshots.
904            // last_applied_index was NOT updated on failure, so the leader will detect
905            // this follower is still behind and re-send the snapshot (see: issue #308).
906            self.is_serving.store(true, Ordering::SeqCst);
907        }
908
909        result
910    }
911
    /// Exports the state-machine column families into `new_snapshot_dir` and records
    /// `last_included` as the latest snapshot metadata.
    ///
    /// Files written under `new_snapshot_dir`:
    /// - `sm/` and `sm_metadata.bin`: export of `STATE_MACHINE_CF` plus its serialized
    ///   export metadata;
    /// - `sm_meta/` and `sm_meta_metadata.bin`: the same for `STATE_MACHINE_META_CF`;
    /// - `ttl_state.bin`: the lease snapshot, written only when a lease is configured.
    ///
    /// The returned bytes are the snapshot checksum, which is currently an all-zero 32-byte
    /// value. The same value is stored in the persisted `SnapshotMetadata`.
    ///
    /// NOTE(review): the two CFs are exported by separate `export_column_family` calls.
    /// Confirm nothing can write to them in between, or the two exports may not describe the
    /// same applied state. `last_included` is taken from the caller as-is and is not checked
    /// against the exported data here.
    ///
    /// # Errors
    /// Fails if any of the following fail:
    /// - creating the directory, or locating either CF;
    /// - flushing, exporting, or writing export metadata;
    /// - joining the blocking task;
    /// - writing the lease file, or persisting the snapshot metadata.
    ///
    /// Files written before a failure are not removed by this method.
    #[instrument(skip(self))]
    async fn generate_snapshot_data(
        &self,
        new_snapshot_dir: std::path::PathBuf,
        last_included: LogId,
    ) -> Result<Bytes, Error> {
        // Export only SM CFs: compact SST-only export, no Raft log data.
        // export_column_family() creates only the final subdirectory (e.g. "sm"), so the parent
        // (new_snapshot_dir) must already exist before calling it.
        //
        // All blocking RocksDB operations (create_dir_all, flush_cf_opt, export_column_family)
        // run on tokio's blocking thread pool via spawn_blocking, not on an async worker thread.
        // This prevents snapshot disk I/O from starving the Raft event loop under concurrent
        // writes (#315).
        let db = self.db.load_full(); // Arc<DB>: Send, safe to move into spawn_blocking
        let dir = new_snapshot_dir.clone();
        tokio::task::spawn_blocking(move || -> Result<(), Error> {
            std::fs::create_dir_all(&dir)?;
            {
                let checkpoint = rocksdb::checkpoint::Checkpoint::new(db.as_ref())
                    .map_err(|e| StorageError::DbError(e.to_string()))?;
                let cf_sm = db
                    .cf_handle(STATE_MACHINE_CF)
                    .ok_or_else(|| StorageError::DbError("SM CF not found".to_string()))?;
                let cf_sm_meta = db
                    .cf_handle(STATE_MACHINE_META_CF)
                    .ok_or_else(|| StorageError::DbError("SM meta CF not found".to_string()))?;

                // Flush both CFs before export: export_column_family only captures SST files,
                // not MemTable data. Without this, small CFs (e.g. sm_meta with only a few
                // metadata keys) may never have been flushed and would export 0 files.
                // Synchronous flush (FlushOptions::wait = true by default) is intentional:
                // we must wait for flush to complete before export or in-flight writes are lost.
                let flush_opts = rocksdb::FlushOptions::default();
                db.flush_cf_opt(&cf_sm, &flush_opts)
                    .map_err(|e| StorageError::DbError(e.to_string()))?;
                db.flush_cf_opt(&cf_sm_meta, &flush_opts)
                    .map_err(|e| StorageError::DbError(e.to_string()))?;

                // Each export writes its SST files into its own subdirectory of `dir`.
                let sm_export = checkpoint
                    .export_column_family(&cf_sm, dir.join("sm"))
                    .map_err(|e| StorageError::DbError(e.to_string()))?;
                let sm_meta_export = checkpoint
                    .export_column_family(&cf_sm_meta, dir.join("sm_meta"))
                    .map_err(|e| StorageError::DbError(e.to_string()))?;

                // Persist the export descriptors so the receiving side can import the files.
                Self::save_cf_export_metadata(&sm_export, &dir.join("sm_metadata.bin"))?;
                Self::save_cf_export_metadata(&sm_meta_export, &dir.join("sm_meta_metadata.bin"))?;
            } // checkpoint and CF handles dropped here
            Ok(())
        })
        .await
        // Outer `?`: the task could not be joined; inner `?`: the closure's own error.
        .map_err(Self::map_snapshot_join_error)??;

        // Persist lease state alongside the export (if configured)
        if let Some(ref lease) = self.lease {
            let ttl_snapshot = lease.to_snapshot();
            let ttl_path = new_snapshot_dir.join("ttl_state.bin");
            tokio::fs::write(&ttl_path, ttl_snapshot).await?;
        }

        // Update metadata
        // All-zero checksum: no digest of the exported files is computed here.
        let checksum = [0; 32];
        let snapshot_metadata = SnapshotMetadata {
            last_included: Some(last_included),
            checksum: Bytes::copy_from_slice(&checksum),
        };
        // Recorded only after the export and lease file succeeded, so a failed export leaves
        // the previous snapshot metadata in place.
        self.persist_last_snapshot_metadata(&snapshot_metadata)?;

        info!("Snapshot generated at {:?}", new_snapshot_dir);
        Ok(Bytes::copy_from_slice(&checksum))
    }
984
985    fn save_hard_state(&self) -> Result<(), Error> {
986        self.persist_state_machine_metadata()?;
987        self.persist_snapshot_metadata()?;
988        Ok(())
989    }
990
991    fn flush(&self) -> Result<(), Error> {
992        let db = self.db.load();
993
994        // Step 1: Sync WAL to disk (critical!)
995        // true = sync to disk
996        db.flush_wal(true).map_err(|e| StorageError::DbError(e.to_string()))?;
997        // Step 2: Flush memtables to SST files
998        db.flush().map_err(|e| StorageError::DbError(e.to_string()))?;
999
1000        // Persist state machine metadata (last_applied_index, last_applied_term, snapshot_metadata)
1001        self.persist_state_machine_metadata()?;
1002
1003        Ok(())
1004    }
1005
    /// Async entry point for `flush`; returns exactly what `flush` returns.
    ///
    /// NOTE(review): this calls the synchronous `flush` directly, so the WAL sync and memtable
    /// flush run on the calling async worker thread rather than via `spawn_blocking` (unlike
    /// `generate_snapshot_data`). Confirm this is acceptable for the callers.
    async fn flush_async(&self) -> Result<(), Error> {
        self.flush()
    }
1009
1010    #[instrument(skip(self))]
1011    async fn reset(&self) -> Result<(), Error> {
1012        let db = self.db.load();
1013        let cf = db
1014            .cf_handle(STATE_MACHINE_CF)
1015            .ok_or_else(|| StorageError::DbError("State machine CF not found".to_string()))?;
1016
1017        // Delete all keys in the state machine
1018        let mut batch = WriteBatch::default();
1019        let iter = db.iterator_cf(&cf, IteratorMode::Start);
1020
1021        for item in iter {
1022            let (key, _) = item.map_err(|e| StorageError::DbError(e.to_string()))?;
1023            batch.delete_cf(&cf, &key);
1024        }
1025
1026        db.write(&batch).map_err(|e| StorageError::DbError(e.to_string()))?;
1027
1028        // Reset metadata
1029        self.last_applied_index.store(0, Ordering::SeqCst);
1030        self.last_applied_term.store(0, Ordering::SeqCst);
1031        *self.last_snapshot_metadata.write() = None;
1032
1033        // Note: Lease is managed by NodeBuilder and doesn't need reset
1034
1035        self.persist_state_machine_metadata()?;
1036        self.persist_snapshot_metadata()?;
1037
1038        info!("RocksDB state machine reset completed");
1039        Ok(())
1040    }
1041
1042    async fn lease_background_cleanup(&self) -> Result<Vec<Bytes>, Error> {
1043        // Fast path: no lease configured
1044        let Some(ref lease) = self.lease else {
1045            return Ok(vec![]);
1046        };
1047
1048        // Get all expired keys
1049        let now = SystemTime::now();
1050        let expired_keys = lease.get_expired_keys(now);
1051
1052        if expired_keys.is_empty() {
1053            return Ok(vec![]);
1054        }
1055
1056        debug!(
1057            "Lease background cleanup: found {} expired keys",
1058            expired_keys.len()
1059        );
1060
1061        // Delete expired keys from RocksDB
1062        let db = self.db.load();
1063        let cf = db
1064            .cf_handle(STATE_MACHINE_CF)
1065            .ok_or_else(|| StorageError::DbError("State machine CF not found".to_string()))?;
1066
1067        let mut batch = WriteBatch::default();
1068        for key in &expired_keys {
1069            batch.delete_cf(&cf, key);
1070        }
1071
1072        self.apply_batch(batch)?;
1073
1074        info!(
1075            "Lease background cleanup: deleted {} expired keys",
1076            expired_keys.len()
1077        );
1078
1079        Ok(expired_keys)
1080    }
1081
    /// Trait entry point for prefix scans; delegates to `self.scan_prefix(prefix)`.
    ///
    /// NOTE(review): this call only avoids recursing into itself because method resolution
    /// prefers an inherent `scan_prefix` on `RocksDBStateMachine` over this trait method. If
    /// that inherent method were removed or renamed, this would become unconditional
    /// recursion. Confirm the inherent method exists.
    fn scan_prefix(
        &self,
        prefix: &[u8],
    ) -> Result<ScanResult, Error> {
        self.scan_prefix(prefix)
    }
1088}
1089impl Drop for RocksDBStateMachine {
1090    fn drop(&mut self) {
1091        // save_hard_state() persists last_applied metadata before flush
1092        // This is critical to prevent replay of already-applied entries on restart
1093        if let Err(e) = self.save_hard_state() {
1094            error!("Failed to save hard state on drop: {}", e);
1095        }
1096
1097        // Then flush data to disk
1098        if let Err(e) = self.flush() {
1099            error!("Failed to flush on drop: {}", e);
1100        } else {
1101            debug!("RocksDBStateMachine flushed successfully on drop");
1102        }
1103
1104        // This ensures flush operations are truly finished
1105        self.db.load().cancel_all_background_work(true); // true = wait for completion
1106        debug!("RocksDB background work cancelled on drop");
1107    }
1108}