d_engine_server/storage/adaptors/rocksdb/rocksdb_state_machine.rs

use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::time::SystemTime;

use arc_swap::ArcSwap;
use bytes::Bytes;
use d_engine_core::Error;
use d_engine_core::Lease;
use d_engine_core::StateMachine;
use d_engine_core::StorageError;
use d_engine_proto::client::WriteCommand;
use d_engine_proto::client::write_command::Delete;
use d_engine_proto::client::write_command::Insert;
use d_engine_proto::client::write_command::Operation;
use d_engine_proto::common::Entry;
use d_engine_proto::common::LogId;
use d_engine_proto::common::entry_payload::Payload;
use d_engine_proto::server::storage::SnapshotMetadata;
use parking_lot::RwLock;
use prost::Message;
use rocksdb::Cache;
use rocksdb::DB;
use rocksdb::IteratorMode;
use rocksdb::Options;
use rocksdb::WriteBatch;
use tonic::async_trait;
use tracing::debug;
use tracing::error;
use tracing::info;
use tracing::instrument;
use tracing::warn;

use crate::storage::DefaultLease;

const STATE_MACHINE_CF: &str = "state_machine";
const STATE_MACHINE_META_CF: &str = "state_machine_meta";
const LAST_APPLIED_INDEX_KEY: &[u8] = b"last_applied_index";
const LAST_APPLIED_TERM_KEY: &[u8] = b"last_applied_term";
const SNAPSHOT_METADATA_KEY: &[u8] = b"snapshot_metadata";
const TTL_STATE_KEY: &[u8] = b"ttl_state";

/// RocksDB-based state machine implementation with lease support
#[derive(Debug)]
pub struct RocksDBStateMachine {
    db: Arc<ArcSwap<DB>>,
    db_path: PathBuf,
    is_serving: AtomicBool,
    last_applied_index: AtomicU64,
    last_applied_term: AtomicU64,
    last_snapshot_metadata: RwLock<Option<SnapshotMetadata>>,

    // Lease management for automatic key expiration
    // DefaultLease is thread-safe internally (uses DashMap + Mutex)
    // Injected by NodeBuilder after construction
    lease: Option<Arc<DefaultLease>>,

    /// Whether lease manager is enabled (immutable after init)
    /// Set to true when lease is injected, never changes after that
    ///
    /// Invariant: lease_enabled == true ⟹ lease.is_some()
    /// Performance: Allows safe unwrap_unchecked in hot paths
    lease_enabled: bool,
}

impl RocksDBStateMachine {
    /// Creates a new RocksDB-based state machine.
    ///
    /// Lease will be injected by NodeBuilder after construction.
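    ///
    /// # Example
    ///
    /// ```ignore
    /// // Construction sketch; the path below is illustrative only and error
    /// // handling is elided.
    /// let sm = RocksDBStateMachine::new("/var/lib/d_engine/rocksdb_sm")?;
    /// ```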
    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
        let db_path = path.as_ref().to_path_buf();

        // Configure RocksDB options using shared configuration
        let opts = Self::configure_db_options();
        let cfs = vec![STATE_MACHINE_CF, STATE_MACHINE_META_CF];

        let db =
            DB::open_cf(&opts, &db_path, cfs).map_err(|e| StorageError::DbError(e.to_string()))?;
        let db_arc = Arc::new(db);

        // Load metadata
        let (last_applied_index, last_applied_term) = Self::load_state_machine_metadata(&db_arc)?;
        let last_snapshot_metadata = Self::load_snapshot_metadata(&db_arc)?;

        Ok(Self {
            db: Arc::new(ArcSwap::new(db_arc)),
            db_path,
            is_serving: AtomicBool::new(true),
            last_applied_index: AtomicU64::new(last_applied_index),
            last_applied_term: AtomicU64::new(last_applied_term),
            last_snapshot_metadata: RwLock::new(last_snapshot_metadata),
            lease: None,          // Will be injected by NodeBuilder
            lease_enabled: false, // Default: no lease until set
        })
    }

    /// Sets the lease manager for this state machine.
    ///
    /// This is an internal method called by NodeBuilder during initialization.
    /// The lease will also be restored from snapshot during `apply_snapshot_from_file()`.
    /// Also available for testing and benchmarks.
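    ///
    /// # Example (test/benchmark wiring sketch)
    ///
    /// ```ignore
    /// // Hypothetical wiring; in production NodeBuilder performs these steps.
    /// // `lease` is an Arc<DefaultLease> built elsewhere (its constructor is not shown here).
    /// let mut sm = RocksDBStateMachine::new(path)?;
    /// sm.set_lease(lease);
    /// sm.load_lease_data().await?; // restore any persisted TTL state
    /// ```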
    pub fn set_lease(
        &mut self,
        lease: Arc<DefaultLease>,
    ) {
        // Mark lease as enabled (immutable after this point)
        self.lease_enabled = true;
        self.lease = Some(lease);
    }

    // ========== Private helper methods ==========

    /// Configure high-performance RocksDB options.
    ///
    /// This shared configuration is used by both `new()` and `open_db()` to ensure
    /// consistency between initial DB creation and snapshot restoration.
    ///
    /// # Configuration Details
    ///
    /// - **Memory**: 128MB write buffer, 4 max buffers, merge at 2
    /// - **Compression**: LZ4 (fast), Zstd for bottommost (space-efficient)
    /// - **WAL**: 1MB sync interval, manual flush, no fsync
    /// - **Performance**: 4 background jobs, 5000 max open files, direct I/O
    /// - **Compaction**: Dynamic level bytes, 64MB target file size, 256MB base level
    /// - **Cache**: 128MB LRU row cache
    fn configure_db_options() -> Options {
        let mut opts = Options::default();
        opts.create_if_missing(true);
        opts.create_missing_column_families(true);

        // Memory and write optimization
        opts.set_max_write_buffer_number(4);
        opts.set_min_write_buffer_number_to_merge(2);
        opts.set_write_buffer_size(128 * 1024 * 1024); // 128MB

        // Compression optimization
        opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
        opts.set_bottommost_compression_type(rocksdb::DBCompressionType::Zstd);
        opts.set_compression_options(-14, 0, 0, 0); // LZ4 fast compression

        // WAL-related optimizations
        opts.set_wal_bytes_per_sync(1024 * 1024); // 1MB sync
        opts.set_manual_wal_flush(true);
        opts.set_use_fsync(false);

        // Performance tuning
        opts.set_max_background_jobs(4);
        opts.set_max_open_files(5000);
        opts.set_use_direct_io_for_flush_and_compaction(true);
        opts.set_use_direct_reads(true);

        // Leveled compaction configuration
        opts.set_level_compaction_dynamic_level_bytes(true);
        opts.set_target_file_size_base(64 * 1024 * 1024); // 64MB
        opts.set_max_bytes_for_level_base(256 * 1024 * 1024); // 256MB

        // Row cache configuration (applied via set_row_cache)
        let cache = Cache::new_lru_cache(128 * 1024 * 1024); // 128MB
        opts.set_row_cache(&cache);

        opts
    }

    /// Opens RocksDB with the standard configuration (see `configure_db_options`).
    fn open_db<P: AsRef<Path>>(path: P) -> Result<DB, Error> {
        let opts = Self::configure_db_options();
        let cfs = vec![STATE_MACHINE_CF, STATE_MACHINE_META_CF];
        DB::open_cf(&opts, path, cfs).map_err(|e| StorageError::DbError(e.to_string()).into())
    }

    fn load_state_machine_metadata(db: &Arc<DB>) -> Result<(u64, u64), Error> {
        let cf = db
            .cf_handle(STATE_MACHINE_META_CF)
            .ok_or_else(|| StorageError::DbError("State machine meta CF not found".to_string()))?;

        let index = match db
            .get_cf(&cf, LAST_APPLIED_INDEX_KEY)
            .map_err(|e| StorageError::DbError(e.to_string()))?
        {
            Some(bytes) if bytes.len() == 8 => u64::from_be_bytes([
                bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
            ]),
            _ => 0,
        };

        let term = match db
            .get_cf(&cf, LAST_APPLIED_TERM_KEY)
            .map_err(|e| StorageError::DbError(e.to_string()))?
        {
            Some(bytes) if bytes.len() == 8 => u64::from_be_bytes([
                bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
            ]),
            _ => 0,
        };

        Ok((index, term))
    }

    fn load_snapshot_metadata(db: &Arc<DB>) -> Result<Option<SnapshotMetadata>, Error> {
        let cf = db
            .cf_handle(STATE_MACHINE_META_CF)
            .ok_or_else(|| StorageError::DbError("State machine meta CF not found".to_string()))?;

        match db
            .get_cf(&cf, SNAPSHOT_METADATA_KEY)
            .map_err(|e| StorageError::DbError(e.to_string()))?
        {
            Some(bytes) => {
                let metadata = bincode::deserialize(&bytes).map_err(StorageError::BincodeError)?;
                Ok(Some(metadata))
            }
            None => Ok(None),
        }
    }

    fn persist_state_machine_metadata(&self) -> Result<(), Error> {
        let db = self.db.load();
        let cf = db
            .cf_handle(STATE_MACHINE_META_CF)
            .ok_or_else(|| StorageError::DbError("State machine meta CF not found".to_string()))?;

        let index = self.last_applied_index.load(Ordering::SeqCst);
        let term = self.last_applied_term.load(Ordering::SeqCst);

        db.put_cf(&cf, LAST_APPLIED_INDEX_KEY, index.to_be_bytes())
            .map_err(|e| StorageError::DbError(e.to_string()))?;
        db.put_cf(&cf, LAST_APPLIED_TERM_KEY, term.to_be_bytes())
            .map_err(|e| StorageError::DbError(e.to_string()))?;

        Ok(())
    }

    fn persist_snapshot_metadata(&self) -> Result<(), Error> {
        let db = self.db.load();
        let cf = db
            .cf_handle(STATE_MACHINE_META_CF)
            .ok_or_else(|| StorageError::DbError("State machine meta CF not found".to_string()))?;

        if let Some(metadata) = self.last_snapshot_metadata.read().clone() {
            let bytes = bincode::serialize(&metadata).map_err(StorageError::BincodeError)?;
            db.put_cf(&cf, SNAPSHOT_METADATA_KEY, bytes)
                .map_err(|e| StorageError::DbError(e.to_string()))?;
        }
        Ok(())
    }

    fn persist_ttl_metadata(&self) -> Result<(), Error> {
        if let Some(ref lease) = self.lease {
            let db = self.db.load();
            let cf = db.cf_handle(STATE_MACHINE_META_CF).ok_or_else(|| {
                StorageError::DbError("State machine meta CF not found".to_string())
            })?;

            let ttl_snapshot = lease.to_snapshot();

            db.put_cf(&cf, TTL_STATE_KEY, ttl_snapshot)
                .map_err(|e| StorageError::DbError(e.to_string()))?;

            debug!("Persisted TTL state to RocksDB");
        }
        Ok(())
    }

    /// Loads TTL state from RocksDB metadata after lease injection.
    ///
    /// Called after NodeBuilder injects the lease.
    /// Also available for testing and benchmarks.
    pub async fn load_lease_data(&self) -> Result<(), Error> {
        let Some(ref lease) = self.lease else {
            return Ok(()); // No lease configured
        };

        let db = self.db.load();
        let cf = db
            .cf_handle(STATE_MACHINE_META_CF)
            .ok_or_else(|| StorageError::DbError("State machine meta CF not found".to_string()))?;

        match db
            .get_cf(&cf, TTL_STATE_KEY)
            .map_err(|e| StorageError::DbError(e.to_string()))?
        {
            Some(ttl_data) => {
                lease.reload(&ttl_data)?;
                debug!("Loaded TTL state from RocksDB: {} active TTLs", lease.len());
            }
            None => {
                debug!("No TTL state found in RocksDB");
            }
        }

        Ok(())
    }

    /// Piggyback cleanup: remove expired keys within a time budget.
    ///
    /// Intended to be called from `apply_chunk` to clean up expired keys
    /// opportunistically (piggybacking on existing Raft events). Currently unused:
    /// cleanup is handled by the lazy and background strategies (see the note in
    /// `apply_chunk`).
    ///
    /// # Arguments
    /// * `max_duration_ms` - Maximum time budget for cleanup (milliseconds)
    ///
    /// # Returns
    /// Number of keys deleted
    ///
    /// # Performance
    /// - Fast path: ~10ns if no TTL keys exist (lazy activation check)
    /// - Cleanup: O(log N + K) where K = number of expired keys
    /// - Time-bounded: stops after `max_duration_ms` to avoid blocking Raft
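    ///
    /// # Example
    ///
    /// ```ignore
    /// // Sketch only: bound cleanup to a 2ms budget while applying entries.
    /// let deleted = self.maybe_cleanup_expired(2);
    /// debug!("piggyback cleanup removed {} expired keys", deleted);
    /// ```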
    #[allow(dead_code)]
    fn maybe_cleanup_expired(
        &self,
        max_duration_ms: u64,
    ) -> usize {
        let start = std::time::Instant::now();
        let now = SystemTime::now();
        let mut deleted_count = 0;

        // Fast path: skip if TTL never used (lazy activation)
        if let Some(ref lease) = self.lease {
            if !lease.has_lease_keys() {
                return 0; // No TTL keys, skip cleanup (~10ns overhead)
            }

            // Quick check: any expired keys?
            if !lease.may_have_expired_keys(now) {
                return 0; // No expired keys, skip cleanup (~30ns overhead)
            }
        } else {
            return 0; // No lease configured
        }

        // Get database handle
        let db = self.db.load();
        let cf = match db.cf_handle(STATE_MACHINE_CF) {
            Some(cf) => cf,
            None => {
                error!("State machine CF not found during TTL cleanup");
                return 0;
            }
        };

        // Cleanup expired keys with time budget
        let max_duration = std::time::Duration::from_millis(max_duration_ms);

        loop {
            // Check time budget
            if start.elapsed() >= max_duration {
                debug!(
                    "Piggyback cleanup time budget exceeded: deleted {} keys in {:?}",
                    deleted_count,
                    start.elapsed()
                );
                break;
            }

            // Get next batch of expired keys
            let expired_keys = if let Some(ref lease) = self.lease {
                lease.get_expired_keys(now)
            } else {
                vec![]
            };

            if expired_keys.is_empty() {
                break; // No more expired keys
            }

            // Delete expired keys from RocksDB using batch for efficiency
            let mut batch = WriteBatch::default();
            for key in expired_keys {
                batch.delete_cf(&cf, &key);
                deleted_count += 1;
            }

            // Apply batch delete
            if let Err(e) = db.write(batch) {
                error!("Failed to delete expired keys: {}", e);
                break;
            }
        }

        if deleted_count > 0 {
            debug!(
                "Piggyback cleanup: deleted {} expired keys in {:?}",
                deleted_count,
                start.elapsed()
            );
        }

        deleted_count
    }

    fn apply_batch(
        &self,
        batch: WriteBatch,
    ) -> Result<(), Error> {
        self.db.load().write(batch).map_err(|e| StorageError::DbError(e.to_string()))?;
        Ok(())
    }
}

#[async_trait]
impl StateMachine for RocksDBStateMachine {
    async fn start(&self) -> Result<(), Error> {
        self.is_serving.store(true, Ordering::SeqCst);

        // Load persisted lease data if configured
        if let Some(ref _lease) = self.lease {
            self.load_lease_data().await?;
            debug!("Lease data loaded during state machine initialization");
        }

        info!("RocksDB state machine started");
        Ok(())
    }

    fn stop(&self) -> Result<(), Error> {
        self.is_serving.store(false, Ordering::SeqCst);

        // Graceful shutdown: persist TTL state to disk
        // This ensures lease data survives across restarts
        if let Err(e) = self.persist_ttl_metadata() {
            error!("Failed to persist TTL metadata on shutdown: {:?}", e);
            return Err(e);
        }

        info!("RocksDB state machine stopped");
        Ok(())
    }

    fn is_running(&self) -> bool {
        self.is_serving.load(Ordering::SeqCst)
    }

    fn get(
        &self,
        key_buffer: &[u8],
    ) -> Result<Option<Bytes>, Error> {
        // Guard against reads during snapshot restoration.
        // During apply_snapshot_from_file(), is_serving is set to false while the database
        // is being replaced. This prevents reads from accessing temporary or inconsistent state.
        if !self.is_serving.load(Ordering::SeqCst) {
            return Err(StorageError::NotServing(
                "State machine is restoring from snapshot".to_string(),
            )
            .into());
        }

        let db = self.db.load();
        let cf = db
            .cf_handle(STATE_MACHINE_CF)
            .ok_or_else(|| StorageError::DbError("State machine CF not found".to_string()))?;

        match db.get_cf(&cf, key_buffer).map_err(|e| StorageError::DbError(e.to_string()))? {
            Some(value) => Ok(Some(Bytes::copy_from_slice(&value))),
            None => Ok(None),
        }
    }

    fn entry_term(
        &self,
        _entry_id: u64,
    ) -> Option<u64> {
        // The RocksDB state machine does not store a term per key, so this method is not
        // typically used. Supporting it would require storing the term alongside each value.
        None
    }

    /// Thread-safe: called serially by the single-task CommitHandler.
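    ///
    /// Decodes each entry's payload and applies it to the `state_machine` column family
    /// in a single `WriteBatch`: `Insert` puts the key/value (and registers a lease when
    /// `ttl_secs > 0`), `Delete` removes the key (and unregisters its lease), while
    /// `Noop` and `Config` payloads are skipped. Entries must arrive in strictly
    /// increasing index order; the highest applied `LogId` is recorded afterwards.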
    #[instrument(skip(self, chunk))]
    async fn apply_chunk(
        &self,
        chunk: Vec<Entry>,
    ) -> Result<(), Error> {
        let db = self.db.load();
        let cf = db
            .cf_handle(STATE_MACHINE_CF)
            .ok_or_else(|| StorageError::DbError("State machine CF not found".to_string()))?;

        let mut batch = WriteBatch::default();
        let mut highest_index_entry: Option<LogId> = None;

        for entry in chunk {
            assert!(entry.payload.is_some(), "Entry payload should not be None!");

            if let Some(prev) = highest_index_entry {
                assert!(
                    entry.index > prev.index,
                    "apply_chunk: received unordered entry at index {} (prev={})",
                    entry.index,
                    prev.index
                );
            }
            highest_index_entry = Some(LogId {
                index: entry.index,
                term: entry.term,
            });

            match entry.payload.unwrap().payload {
                Some(Payload::Noop(_)) => {
                    debug!("Handling NOOP command at index {}", entry.index);
                }
                Some(Payload::Command(data)) => match WriteCommand::decode(&data[..]) {
                    Ok(write_cmd) => match write_cmd.operation {
                        Some(Operation::Insert(Insert {
                            key,
                            value,
                            ttl_secs,
                        })) => {
                            batch.put_cf(&cf, &key, &value);

                            // Register lease if TTL specified
                            if ttl_secs > 0 {
                                // Validate lease is enabled before accepting TTL requests
                                if !self.lease_enabled {
                                    return Err(StorageError::FeatureNotEnabled(
                                        "TTL feature is not enabled on this server. \
                                         Enable it in config: [raft.state_machine.lease] enabled = true".into()
                                    ).into());
                                }

                                // Safety: lease_enabled invariant ensures lease.is_some()
                                let lease = unsafe { self.lease.as_ref().unwrap_unchecked() };
                                lease.register(key.clone(), ttl_secs);
                            }
                        }
                        Some(Operation::Delete(Delete { key })) => {
                            batch.delete_cf(&cf, &key);

                            // Unregister TTL for deleted key
                            if let Some(ref lease) = self.lease {
                                lease.unregister(&key);
                            }
                        }
                        None => {
                            warn!("WriteCommand without operation at index {}", entry.index);
                        }
                    },
                    Err(e) => {
                        error!(
                            "Failed to decode WriteCommand at index {}: {:?}",
                            entry.index, e
                        );
                        return Err(StorageError::SerializationError(e.to_string()).into());
                    }
                },
                Some(Payload::Config(_config_change)) => {
                    debug!("Ignoring config change at index {}", entry.index);
                }
                None => panic!("Entry payload variant should not be None!"),
            }
        }

        self.apply_batch(batch)?;

        // Note: Lease cleanup is now handled by:
        // - Lazy strategy: cleanup in get() method
        // - Background strategy: dedicated async task
        // This avoids blocking the Raft apply hot path

        if let Some(log_id) = highest_index_entry {
            self.update_last_applied(log_id);
        }

        Ok(())
    }

    fn len(&self) -> usize {
        let db = self.db.load();
        let cf = match db.cf_handle(STATE_MACHINE_CF) {
            Some(cf) => cf,
            None => return 0,
        };

        // Note: This is an expensive operation because it iterates over all keys.
        let iter = db.iterator_cf(&cf, IteratorMode::Start);
        iter.count()
    }

    fn update_last_applied(
        &self,
        last_applied: LogId,
    ) {
        self.last_applied_index.store(last_applied.index, Ordering::SeqCst);
        self.last_applied_term.store(last_applied.term, Ordering::SeqCst);
    }

    fn last_applied(&self) -> LogId {
        LogId {
            index: self.last_applied_index.load(Ordering::SeqCst),
            term: self.last_applied_term.load(Ordering::SeqCst),
        }
    }

    fn persist_last_applied(
        &self,
        last_applied: LogId,
    ) -> Result<(), Error> {
        self.update_last_applied(last_applied);
        self.persist_state_machine_metadata()
    }

    fn update_last_snapshot_metadata(
        &self,
        snapshot_metadata: &SnapshotMetadata,
    ) -> Result<(), Error> {
        *self.last_snapshot_metadata.write() = Some(snapshot_metadata.clone());
        Ok(())
    }

    fn snapshot_metadata(&self) -> Option<SnapshotMetadata> {
        self.last_snapshot_metadata.read().clone()
    }

    fn persist_last_snapshot_metadata(
        &self,
        snapshot_metadata: &SnapshotMetadata,
    ) -> Result<(), Error> {
        self.update_last_snapshot_metadata(snapshot_metadata)?;
        self.persist_snapshot_metadata()
    }

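    /// Restores the full database from an on-disk snapshot checkpoint.
    ///
    /// High-level flow (see the numbered PHASE comments in the body): stop serving,
    /// flush and release the current DB, swap in a temporary DB so the RocksDB lock is
    /// dropped, atomically move the checkpoint directory into place (keeping a backup
    /// for rollback), reopen the DB, restore TTL state if a lease is configured, update
    /// metadata, resume serving, and finally remove the backup (best effort).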
    #[instrument(skip(self))]
    async fn apply_snapshot_from_file(
        &self,
        metadata: &SnapshotMetadata,
        snapshot_dir: std::path::PathBuf,
    ) -> Result<(), Error> {
        info!("Applying snapshot from checkpoint: {:?}", snapshot_dir);

        // PHASE 1: Stop serving requests
        self.is_serving.store(false, Ordering::SeqCst);
        info!("Stopped serving requests for snapshot restoration");

        // PHASE 2: Flush and prepare old DB for replacement
        {
            let old_db = self.db.load();
            old_db.flush().map_err(|e| StorageError::DbError(e.to_string()))?;
            old_db.cancel_all_background_work(true);
            info!("Flushed and stopped background work on old DB");
        }

        // PHASE 2.5: Create temporary DB and swap to release old DB's lock.
        // This is critical: we must release the old DB instance completely before moving directories.
        // Otherwise, the old DB still holds a lock on rocksdb_sm/LOCK even after the directory rename.
        let temp_dir = tempfile::TempDir::new()?;
        let temp_db_path = temp_dir.path().join("temp_db");
        let temp_db = Self::open_db(&temp_db_path).map_err(|e| {
            error!("Failed to create temporary DB: {:?}", e);
            e
        })?;

        // Swap the temp DB into self.db, which releases the old DB's Arc.
        // The old DB's Arc refcount drops to 0, the DB instance is dropped, and the lock is released.
        self.db.store(Arc::new(temp_db));
        info!("Swapped to temporary DB, old DB lock released");

        // PHASE 3: Atomic directory replacement
        let backup_dir = self.db_path.with_extension("backup");

        // Remove old backup if it exists
        if backup_dir.exists() {
            tokio::fs::remove_dir_all(&backup_dir).await?;
        }

        // Move current DB to backup
        tokio::fs::rename(&self.db_path, &backup_dir).await?;
        info!("Backed up current DB to: {:?}", backup_dir);

        // Move checkpoint to DB path
        tokio::fs::rename(&snapshot_dir, &self.db_path).await.inspect_err(|_e| {
            // Rollback: restore from backup
            let _ = std::fs::rename(&backup_dir, &self.db_path);
        })?;
        info!("Moved checkpoint to DB path: {:?}", self.db_path);

        // PHASE 4: Open new DB from checkpoint
        let new_db = Self::open_db(&self.db_path).map_err(|e| {
            // Rollback: restore from backup
            let _ = std::fs::rename(&backup_dir, &self.db_path);
            error!("Failed to open new DB, rolled back to backup: {:?}", e);
            e
        })?;

        // Atomically swap DB reference (replacing temp DB with new DB)
        self.db.store(Arc::new(new_db));
        info!("Atomically swapped to new DB instance");

        // PHASE 5: Restore TTL state (if lease is configured)
        if let Some(ref lease) = self.lease {
            let ttl_path = self.db_path.join("ttl_state.bin");
            if ttl_path.exists() {
                let ttl_data = tokio::fs::read(&ttl_path).await?;
                lease.reload(&ttl_data)?;

                // Persist TTL state to the metadata CF to ensure consistency after restart.
                // Without this, a subsequent (non-snapshot) restart would lose TTL state,
                // because load_lease_data() reads from the metadata CF, not ttl_state.bin.
                self.persist_ttl_metadata()?;

                info!("Lease state restored from snapshot and persisted to metadata CF");
            } else {
                warn!("No lease state found in snapshot");
            }
        }

        // PHASE 6: Update metadata
        *self.last_snapshot_metadata.write() = Some(metadata.clone());
        if let Some(last_included) = &metadata.last_included {
            self.update_last_applied(*last_included);
        }

        // PHASE 7: Resume serving
        self.is_serving.store(true, Ordering::SeqCst);
        info!("Resumed serving requests");

        // PHASE 8: Clean up backup (best effort)
        if let Err(e) = tokio::fs::remove_dir_all(&backup_dir).await {
            warn!("Failed to remove backup directory: {}", e);
        } else {
            info!("Cleaned up backup directory");
        }

        info!("Snapshot applied successfully - full DB restoration complete");
        Ok(())
    }

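    /// Generates snapshot data by creating a RocksDB checkpoint in `new_snapshot_dir`.
    ///
    /// If a lease is configured, the TTL state is written alongside the checkpoint as
    /// `ttl_state.bin`. Snapshot metadata is persisted afterwards; the returned checksum
    /// is currently a placeholder (all zeros).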
    #[instrument(skip(self))]
    async fn generate_snapshot_data(
        &self,
        new_snapshot_dir: std::path::PathBuf,
        last_included: LogId,
    ) -> Result<Bytes, Error> {
        // Create a checkpoint in new_snapshot_dir.
        // Use a scope to ensure the checkpoint is dropped before any await point.
        {
            let db = self.db.load();
            let checkpoint = rocksdb::checkpoint::Checkpoint::new(db.as_ref())
                .map_err(|e| StorageError::DbError(e.to_string()))?;
            checkpoint
                .create_checkpoint(&new_snapshot_dir)
                .map_err(|e| StorageError::DbError(e.to_string()))?;
        } // checkpoint dropped here, before any await

        // Persist lease state alongside the checkpoint (if configured)
        if let Some(ref lease) = self.lease {
            let ttl_snapshot = lease.to_snapshot();
            let ttl_path = new_snapshot_dir.join("ttl_state.bin");
            tokio::fs::write(&ttl_path, ttl_snapshot).await?;
        }

        // Update metadata
        let checksum = [0; 32]; // For now, we return a dummy checksum.
        let snapshot_metadata = SnapshotMetadata {
            last_included: Some(last_included),
            checksum: Bytes::copy_from_slice(&checksum),
        };
        self.persist_last_snapshot_metadata(&snapshot_metadata)?;

        info!("Snapshot generated at {:?} with TTL data", new_snapshot_dir);
        Ok(Bytes::copy_from_slice(&checksum))
    }

    fn save_hard_state(&self) -> Result<(), Error> {
        self.persist_state_machine_metadata()?;
        self.persist_snapshot_metadata()?;
        Ok(())
    }

    fn flush(&self) -> Result<(), Error> {
        let db = self.db.load();

        // Step 1: Sync WAL to disk (critical!)
        // true = sync to disk
        db.flush_wal(true).map_err(|e| StorageError::DbError(e.to_string()))?;
        // Step 2: Flush memtables to SST files
        db.flush().map_err(|e| StorageError::DbError(e.to_string()))?;

        // Persist state machine metadata (last_applied_index, last_applied_term, snapshot_metadata)
        self.persist_state_machine_metadata()?;

        Ok(())
    }

    async fn flush_async(&self) -> Result<(), Error> {
        self.flush()
    }

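    /// Clears all keys from the state machine column family and resets the applied
    /// index/term and snapshot metadata, persisting the cleared metadata afterwards.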
    #[instrument(skip(self))]
    async fn reset(&self) -> Result<(), Error> {
        let db = self.db.load();
        let cf = db
            .cf_handle(STATE_MACHINE_CF)
            .ok_or_else(|| StorageError::DbError("State machine CF not found".to_string()))?;

        // Delete all keys in the state machine
        let mut batch = WriteBatch::default();
        let iter = db.iterator_cf(&cf, IteratorMode::Start);

        for item in iter {
            let (key, _) = item.map_err(|e| StorageError::DbError(e.to_string()))?;
            batch.delete_cf(&cf, &key);
        }

        db.write(batch).map_err(|e| StorageError::DbError(e.to_string()))?;

        // Reset metadata
        self.last_applied_index.store(0, Ordering::SeqCst);
        self.last_applied_term.store(0, Ordering::SeqCst);
        *self.last_snapshot_metadata.write() = None;

        // Note: Lease is managed by NodeBuilder and doesn't need reset

        self.persist_state_machine_metadata()?;
        self.persist_snapshot_metadata()?;

        info!("RocksDB state machine reset completed");
        Ok(())
    }

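    /// Background lease cleanup: deletes all currently expired keys from the state
    /// machine column family in a single WriteBatch and returns the deleted keys.
    ///
    /// This is the "background strategy" mentioned in `apply_chunk`; it is intended to
    /// be driven by a dedicated async task rather than the Raft apply path.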
    async fn lease_background_cleanup(&self) -> Result<Vec<Bytes>, Error> {
        // Fast path: no lease configured
        let Some(ref lease) = self.lease else {
            return Ok(vec![]);
        };

        // Get all expired keys
        let now = SystemTime::now();
        let expired_keys = lease.get_expired_keys(now);

        if expired_keys.is_empty() {
            return Ok(vec![]);
        }

        debug!(
            "Lease background cleanup: found {} expired keys",
            expired_keys.len()
        );

        // Delete expired keys from RocksDB
        let db = self.db.load();
        let cf = db
            .cf_handle(STATE_MACHINE_CF)
            .ok_or_else(|| StorageError::DbError("State machine CF not found".to_string()))?;

        let mut batch = WriteBatch::default();
        for key in &expired_keys {
            batch.delete_cf(&cf, key);
        }

        self.apply_batch(batch)?;

        info!(
            "Lease background cleanup: deleted {} expired keys",
            expired_keys.len()
        );

        Ok(expired_keys)
    }
}

impl Drop for RocksDBStateMachine {
    fn drop(&mut self) {
        // save_hard_state() persists last_applied metadata before flush.
        // This is critical to prevent replay of already-applied entries on restart.
        if let Err(e) = self.save_hard_state() {
            error!("Failed to save hard state on drop: {}", e);
        }

        // Then flush data to disk
        if let Err(e) = self.flush() {
            error!("Failed to flush on drop: {}", e);
        } else {
            debug!("RocksDBStateMachine flushed successfully on drop");
        }

        // This ensures flush operations are truly finished
        self.db.load().cancel_all_background_work(true); // true = wait for completion
        debug!("RocksDB background work cancelled on drop");
    }
}