//! RocksDB-backed state machine adaptor.
//!
//! Path: d_engine_server/storage/adaptors/rocksdb/rocksdb_state_machine.rs

1use std::path::Path;
2use std::path::PathBuf;
3use std::sync::Arc;
4use std::sync::atomic::AtomicBool;
5use std::sync::atomic::AtomicU64;
6use std::sync::atomic::Ordering;
7use std::time::SystemTime;
8
9use arc_swap::ArcSwap;
10use bytes::Bytes;
11use d_engine_core::ApplyResult;
12use d_engine_core::Error;
13use d_engine_core::Lease;
14use d_engine_core::StateMachine;
15use d_engine_core::StorageError;
16use d_engine_proto::client::WriteCommand;
17use d_engine_proto::client::write_command::CompareAndSwap;
18use d_engine_proto::client::write_command::Delete;
19use d_engine_proto::client::write_command::Insert;
20use d_engine_proto::client::write_command::Operation;
21use d_engine_proto::common::Entry;
22use d_engine_proto::common::LogId;
23use d_engine_proto::common::entry_payload::Payload;
24use d_engine_proto::server::storage::SnapshotMetadata;
25use parking_lot::RwLock;
26use prost::Message;
27use rocksdb::Cache;
28use rocksdb::DB;
29use rocksdb::IteratorMode;
30use rocksdb::Options;
31use rocksdb::WriteBatch;
32use tonic::async_trait;
33use tracing::debug;
34use tracing::error;
35use tracing::info;
36use tracing::instrument;
37use tracing::warn;
38
39use crate::storage::DefaultLease;
40
// Column family holding user key/value data.
const STATE_MACHINE_CF: &str = "state_machine";
// Column family holding state-machine bookkeeping (apply position,
// snapshot metadata, TTL/lease state).
const STATE_MACHINE_META_CF: &str = "state_machine_meta";
// Meta-CF keys: the apply counters are stored as big-endian u64s, the
// snapshot metadata as bincode, and the TTL state as a lease snapshot blob.
const LAST_APPLIED_INDEX_KEY: &[u8] = b"last_applied_index";
const LAST_APPLIED_TERM_KEY: &[u8] = b"last_applied_term";
const SNAPSHOT_METADATA_KEY: &[u8] = b"snapshot_metadata";
const TTL_STATE_KEY: &[u8] = b"ttl_state";
47
/// RocksDB-based state machine implementation with lease support
#[derive(Debug)]
pub struct RocksDBStateMachine {
    // Hot-swappable handle to the live DB; swapped atomically during
    // snapshot restoration (see `apply_snapshot_from_file`).
    db: Arc<ArcSwap<DB>>,
    // On-disk location of the DB; also used for backup/rename during restore.
    db_path: PathBuf,
    // False while stopped or while a snapshot is being restored; gates `get()`.
    is_serving: AtomicBool,
    // Cached last-applied log position; persisted to the meta CF on demand.
    last_applied_index: AtomicU64,
    last_applied_term: AtomicU64,
    // Cached copy of the most recent snapshot metadata (also persisted).
    last_snapshot_metadata: RwLock<Option<SnapshotMetadata>>,

    // Lease management for automatic key expiration
    // DefaultLease is thread-safe internally (uses DashMap + Mutex)
    // Injected by NodeBuilder after construction
    lease: Option<Arc<DefaultLease>>,

    /// Whether lease manager is enabled (immutable after init)
    /// Set to true when lease is injected, never changes after that
    ///
    /// Invariant: lease_enabled == true ⟹ lease.is_some()
    /// Performance: Allows safe unwrap_unchecked in hot paths
    lease_enabled: bool,
}
70
71impl RocksDBStateMachine {
72    /// Creates a new RocksDB-based state machine
73    ///
74    /// Lease will be injected by NodeBuilder after construction.
75    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
76        let db_path = path.as_ref().to_path_buf();
77
78        // Configure RocksDB options using shared configuration
79        let opts = Self::configure_db_options();
80        let cfs = vec![STATE_MACHINE_CF, STATE_MACHINE_META_CF];
81
82        let db =
83            DB::open_cf(&opts, &db_path, cfs).map_err(|e| StorageError::DbError(e.to_string()))?;
84        let db_arc = Arc::new(db);
85
86        // Load metadata
87        let (last_applied_index, last_applied_term) = Self::load_state_machine_metadata(&db_arc)?;
88        let last_snapshot_metadata = Self::load_snapshot_metadata(&db_arc)?;
89
90        Ok(Self {
91            db: Arc::new(ArcSwap::new(db_arc)),
92            db_path,
93            is_serving: AtomicBool::new(true),
94            last_applied_index: AtomicU64::new(last_applied_index),
95            last_applied_term: AtomicU64::new(last_applied_term),
96            last_snapshot_metadata: RwLock::new(last_snapshot_metadata),
97            lease: None,          // Will be injected by NodeBuilder
98            lease_enabled: false, // Default: no lease until set
99        })
100    }
101
102    /// Sets the lease manager for this state machine.
103    ///
104    /// This is an internal method called by NodeBuilder during initialization.
105    /// The lease will also be restored from snapshot during `apply_snapshot_from_file()`.
106    /// Also available for testing and benchmarks.
107    pub fn set_lease(
108        &mut self,
109        lease: Arc<DefaultLease>,
110    ) {
111        // Mark lease as enabled (immutable after this point)
112        self.lease_enabled = true;
113        self.lease = Some(lease);
114    }
115
    // ========== Private helper methods ==========
    //
    // Lease configuration itself is injected by NodeBuilder::build() via
    // `set_lease()` above. The helpers below open RocksDB with the standard
    // configuration and persist/load state-machine metadata.
121
122    /// Configure high-performance RocksDB options.
123    ///
124    /// This shared configuration is used by both `new()` and `open_db()` to ensure
125    /// consistency between initial DB creation and snapshot restoration.
126    ///
127    /// # Configuration Details
128    ///
129    /// - **Memory**: 128MB write buffer, 4 max buffers, merge at 2
130    /// - **Compression**: LZ4 (fast), Zstd for bottommost (space-efficient)
131    /// - **WAL**: 1MB sync interval, manual flush, no fsync
132    /// - **Performance**: 4 background jobs, 5000 max open files, direct I/O
133    /// - **Compaction**: Dynamic level bytes, 64MB target file size, 256MB base level
134    /// - **Cache**: 128MB LRU block cache
135    fn configure_db_options() -> Options {
136        let mut opts = Options::default();
137        opts.create_if_missing(true);
138        opts.create_missing_column_families(true);
139
140        // Memory and write optimization
141        opts.set_max_write_buffer_number(4);
142        opts.set_min_write_buffer_number_to_merge(2);
143        opts.set_write_buffer_size(128 * 1024 * 1024); // 128MB
144
145        // Compression optimization
146        opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
147        opts.set_bottommost_compression_type(rocksdb::DBCompressionType::Zstd);
148        opts.set_compression_options(-14, 0, 0, 0); // LZ4 fast compression
149
150        // WAL-related optimizations
151        opts.set_wal_bytes_per_sync(1024 * 1024); // 1MB sync
152        opts.set_manual_wal_flush(true);
153        opts.set_use_fsync(false);
154
155        // Performance Tuning
156        opts.set_max_background_jobs(4);
157        opts.set_max_open_files(5000);
158        opts.set_use_direct_io_for_flush_and_compaction(true);
159        opts.set_use_direct_reads(true);
160
161        // Leveled Compaction Configuration
162        opts.set_level_compaction_dynamic_level_bytes(true);
163        opts.set_target_file_size_base(64 * 1024 * 1024); // 64MB
164        opts.set_max_bytes_for_level_base(256 * 1024 * 1024); // 256MB
165
166        // Block cache configuration
167        let cache = Cache::new_lru_cache(128 * 1024 * 1024); // 128MB
168        opts.set_row_cache(&cache);
169
170        opts
171    }
172
173    fn open_db<P: AsRef<Path>>(path: P) -> Result<DB, Error> {
174        let opts = Self::configure_db_options();
175        let cfs = vec![STATE_MACHINE_CF, STATE_MACHINE_META_CF];
176        DB::open_cf(&opts, path, cfs).map_err(|e| StorageError::DbError(e.to_string()).into())
177    }
178
179    fn load_state_machine_metadata(db: &Arc<DB>) -> Result<(u64, u64), Error> {
180        let cf = db
181            .cf_handle(STATE_MACHINE_META_CF)
182            .ok_or_else(|| StorageError::DbError("State machine meta CF not found".to_string()))?;
183
184        let index = match db
185            .get_cf(&cf, LAST_APPLIED_INDEX_KEY)
186            .map_err(|e| StorageError::DbError(e.to_string()))?
187        {
188            Some(bytes) if bytes.len() == 8 => u64::from_be_bytes([
189                bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
190            ]),
191            _ => 0,
192        };
193
194        let term = match db
195            .get_cf(&cf, LAST_APPLIED_TERM_KEY)
196            .map_err(|e| StorageError::DbError(e.to_string()))?
197        {
198            Some(bytes) if bytes.len() == 8 => u64::from_be_bytes([
199                bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
200            ]),
201            _ => 0,
202        };
203
204        Ok((index, term))
205    }
206
207    fn load_snapshot_metadata(db: &Arc<DB>) -> Result<Option<SnapshotMetadata>, Error> {
208        let cf = db
209            .cf_handle(STATE_MACHINE_META_CF)
210            .ok_or_else(|| StorageError::DbError("State machine meta CF not found".to_string()))?;
211
212        match db
213            .get_cf(&cf, SNAPSHOT_METADATA_KEY)
214            .map_err(|e| StorageError::DbError(e.to_string()))?
215        {
216            Some(bytes) => {
217                let metadata = bincode::deserialize(&bytes).map_err(StorageError::BincodeError)?;
218                Ok(Some(metadata))
219            }
220            None => Ok(None),
221        }
222    }
223
224    fn persist_state_machine_metadata(&self) -> Result<(), Error> {
225        let db = self.db.load();
226        let cf = db
227            .cf_handle(STATE_MACHINE_META_CF)
228            .ok_or_else(|| StorageError::DbError("State machine meta CF not found".to_string()))?;
229
230        let index = self.last_applied_index.load(Ordering::SeqCst);
231        let term = self.last_applied_term.load(Ordering::SeqCst);
232
233        db.put_cf(&cf, LAST_APPLIED_INDEX_KEY, index.to_be_bytes())
234            .map_err(|e| StorageError::DbError(e.to_string()))?;
235        db.put_cf(&cf, LAST_APPLIED_TERM_KEY, term.to_be_bytes())
236            .map_err(|e| StorageError::DbError(e.to_string()))?;
237
238        Ok(())
239    }
240
241    fn persist_snapshot_metadata(&self) -> Result<(), Error> {
242        let db = self.db.load();
243        let cf = db
244            .cf_handle(STATE_MACHINE_META_CF)
245            .ok_or_else(|| StorageError::DbError("State machine meta CF not found".to_string()))?;
246
247        if let Some(metadata) = self.last_snapshot_metadata.read().clone() {
248            let bytes = bincode::serialize(&metadata).map_err(StorageError::BincodeError)?;
249            db.put_cf(&cf, SNAPSHOT_METADATA_KEY, bytes)
250                .map_err(|e| StorageError::DbError(e.to_string()))?;
251        }
252        Ok(())
253    }
254
255    fn persist_ttl_metadata(&self) -> Result<(), Error> {
256        if let Some(ref lease) = self.lease {
257            let db = self.db.load();
258            let cf = db.cf_handle(STATE_MACHINE_META_CF).ok_or_else(|| {
259                StorageError::DbError("State machine meta CF not found".to_string())
260            })?;
261
262            let ttl_snapshot = lease.to_snapshot();
263
264            db.put_cf(&cf, TTL_STATE_KEY, ttl_snapshot)
265                .map_err(|e| StorageError::DbError(e.to_string()))?;
266
267            debug!("Persisted TTL state to RocksDB");
268        }
269        Ok(())
270    }
271
272    /// Loads TTL state from RocksDB metadata after lease injection.
273    ///
274    /// Called after NodeBuilder injects the lease.
275    /// Also available for testing and benchmarks.
276    pub async fn load_lease_data(&self) -> Result<(), Error> {
277        let Some(ref lease) = self.lease else {
278            return Ok(()); // No lease configured
279        };
280
281        let db = self.db.load();
282        let cf = db
283            .cf_handle(STATE_MACHINE_META_CF)
284            .ok_or_else(|| StorageError::DbError("State machine meta CF not found".to_string()))?;
285
286        match db
287            .get_cf(&cf, TTL_STATE_KEY)
288            .map_err(|e| StorageError::DbError(e.to_string()))?
289        {
290            Some(ttl_data) => {
291                lease.reload(&ttl_data)?;
292                debug!("Loaded TTL state from RocksDB: {} active TTLs", lease.len());
293            }
294            None => {
295                debug!("No TTL state found in RocksDB");
296            }
297        }
298
299        Ok(())
300    }
301
302    /// Piggyback cleanup: Remove expired keys with time budget
303    ///
304    /// This method is called during apply_chunk to cleanup expired keys
305    /// opportunistically (piggyback on existing Raft events).
306    ///
307    /// # Arguments
308    /// * `max_duration_ms` - Maximum time budget for cleanup (milliseconds)
309    ///
310    /// # Returns
311    /// Number of keys deleted
312    ///
313    /// # Performance
314    /// - Fast-path: ~10ns if no TTL keys exist (lazy activation check)
315    /// - Cleanup: O(log N + K) where K = expired keys
316    /// - Time-bounded: stops after max_duration_ms to avoid blocking Raft
317    #[allow(dead_code)]
318    fn maybe_cleanup_expired(
319        &self,
320        max_duration_ms: u64,
321    ) -> usize {
322        let start = std::time::Instant::now();
323        let now = SystemTime::now();
324        let mut deleted_count = 0;
325
326        // Fast path: skip if TTL never used (lazy activation)
327        if let Some(ref lease) = self.lease {
328            if !lease.has_lease_keys() {
329                return 0; // No TTL keys, skip cleanup (~10ns overhead)
330            }
331
332            // Quick check: any expired keys?
333            if !lease.may_have_expired_keys(now) {
334                return 0; // No expired keys, skip cleanup (~30ns overhead)
335            }
336        } else {
337            return 0; // No lease configured
338        }
339
340        // Get database handle
341        let db = self.db.load();
342        let cf = match db.cf_handle(STATE_MACHINE_CF) {
343            Some(cf) => cf,
344            None => {
345                error!("State machine CF not found during TTL cleanup");
346                return 0;
347            }
348        };
349
350        // Cleanup expired keys with time budget
351        let max_duration = std::time::Duration::from_millis(max_duration_ms);
352
353        loop {
354            // Check time budget
355            if start.elapsed() >= max_duration {
356                debug!(
357                    "Piggyback cleanup time budget exceeded: deleted {} keys in {:?}",
358                    deleted_count,
359                    start.elapsed()
360                );
361                break;
362            }
363
364            // Get next batch of expired keys
365            let expired_keys = if let Some(ref lease) = self.lease {
366                lease.get_expired_keys(now)
367            } else {
368                vec![]
369            };
370
371            if expired_keys.is_empty() {
372                break; // No more expired keys
373            }
374
375            // Delete expired keys from RocksDB using batch for efficiency
376            let mut batch = WriteBatch::default();
377            for key in expired_keys {
378                batch.delete_cf(&cf, &key);
379                deleted_count += 1;
380            }
381
382            // Apply batch delete
383            if let Err(e) = db.write(batch) {
384                error!("Failed to delete expired keys: {}", e);
385                break;
386            }
387        }
388
389        if deleted_count > 0 {
390            debug!(
391                "Piggyback cleanup: deleted {} expired keys in {:?}",
392                deleted_count,
393                start.elapsed()
394            );
395        }
396
397        deleted_count
398    }
399
400    fn apply_batch(
401        &self,
402        batch: WriteBatch,
403    ) -> Result<(), Error> {
404        self.db.load().write(batch).map_err(|e| StorageError::DbError(e.to_string()))?;
405        Ok(())
406    }
407}
408
409#[async_trait]
410impl StateMachine for RocksDBStateMachine {
411    async fn start(&self) -> Result<(), Error> {
412        self.is_serving.store(true, Ordering::SeqCst);
413
414        // Load persisted lease data if configured
415        if let Some(ref _lease) = self.lease {
416            self.load_lease_data().await?;
417            debug!("Lease data loaded during state machine initialization");
418        }
419
420        info!("RocksDB state machine started");
421        Ok(())
422    }
423
424    fn stop(&self) -> Result<(), Error> {
425        self.is_serving.store(false, Ordering::SeqCst);
426
427        // Graceful shutdown: persist TTL state to disk
428        // This ensures lease data survives across restarts
429        if let Err(e) = self.persist_ttl_metadata() {
430            error!("Failed to persist TTL metadata on shutdown: {:?}", e);
431            return Err(e);
432        }
433
434        info!("RocksDB state machine stopped");
435        Ok(())
436    }
437
    /// Returns whether the state machine is currently serving requests
    /// (false after `stop()` or while a snapshot is being restored).
    fn is_running(&self) -> bool {
        self.is_serving.load(Ordering::SeqCst)
    }
441
442    fn get(
443        &self,
444        key_buffer: &[u8],
445    ) -> Result<Option<Bytes>, Error> {
446        // Guard against reads during snapshot restoration
447        // During apply_snapshot_from_file(), is_serving is set to false while the database
448        // is being replaced. This prevents reads from accessing temporary or inconsistent state.
449        if !self.is_serving.load(Ordering::SeqCst) {
450            return Err(StorageError::NotServing(
451                "State machine is restoring from snapshot".to_string(),
452            )
453            .into());
454        }
455
456        let db = self.db.load();
457        let cf = db
458            .cf_handle(STATE_MACHINE_CF)
459            .ok_or_else(|| StorageError::DbError("State machine CF not found".to_string()))?;
460
461        match db.get_cf(&cf, key_buffer).map_err(|e| StorageError::DbError(e.to_string()))? {
462            Some(value) => Ok(Some(Bytes::copy_from_slice(&value))),
463            None => Ok(None),
464        }
465    }
466
    /// Always returns `None`: this state machine does not record the Raft
    /// term per key, so an individual entry's term cannot be recovered.
    fn entry_term(
        &self,
        _entry_id: u64,
    ) -> Option<u64> {
        // In RocksDB state machine, we don't store term per key. This method is not typically used.
        // If needed, we might need to change the design to store term along with value.
        None
    }
475
    /// Thread-safe: called serially by single-task CommitHandler
    ///
    /// Applies an ordered chunk of Raft log entries: all key/value mutations
    /// are staged into a single `WriteBatch` and committed atomically after
    /// the whole chunk is processed; `last_applied` is advanced only after
    /// the batch write succeeds.
    #[instrument(skip(self, chunk))]
    async fn apply_chunk(
        &self,
        chunk: Vec<Entry>,
    ) -> Result<Vec<ApplyResult>, Error> {
        let db = self.db.load();
        let cf = db
            .cf_handle(STATE_MACHINE_CF)
            .ok_or_else(|| StorageError::DbError("State machine CF not found".to_string()))?;

        let mut batch = WriteBatch::default();
        let mut highest_index_entry: Option<LogId> = None;
        let mut results = Vec::with_capacity(chunk.len());

        for entry in chunk {
            assert!(entry.payload.is_some(), "Entry payload should not be None!");

            // Entries must arrive in strictly increasing index order.
            if let Some(prev) = highest_index_entry {
                assert!(
                    entry.index > prev.index,
                    "apply_chunk: received unordered entry at index {} (prev={})",
                    entry.index,
                    prev.index
                );
            }
            highest_index_entry = Some(LogId {
                index: entry.index,
                term: entry.term,
            });

            match entry.payload.unwrap().payload {
                Some(Payload::Noop(_)) => {
                    debug!("Handling NOOP command at index {}", entry.index);
                    // NOOP always succeeds
                    results.push(ApplyResult::success(entry.index));
                }
                Some(Payload::Command(data)) => match WriteCommand::decode(&data[..]) {
                    Ok(write_cmd) => match write_cmd.operation {
                        Some(Operation::Insert(Insert {
                            key,
                            value,
                            ttl_secs,
                        })) => {
                            batch.put_cf(&cf, &key, &value);

                            // Register lease if TTL specified
                            if ttl_secs > 0 {
                                // Validate lease is enabled before accepting TTL requests
                                if !self.lease_enabled {
                                    return Err(StorageError::FeatureNotEnabled(
                                        "TTL feature is not enabled on this server. \
                                         Enable it in config: [raft.state_machine.lease] enabled = true".into()
                                    ).into());
                                }

                                // Safety: lease_enabled invariant ensures lease.is_some()
                                let lease = unsafe { self.lease.as_ref().unwrap_unchecked() };
                                // NOTE(review): the lease is registered before the batch
                                // is committed; if a later entry in this chunk errors out,
                                // the batch is never written but this TTL registration
                                // survives — confirm this is reconciled by Raft retry
                                // semantics or crash recovery.
                                lease.register(key.clone(), ttl_secs);
                            }

                            // PUT always succeeds (errors returned as Err)
                            results.push(ApplyResult::success(entry.index));
                        }
                        Some(Operation::Delete(Delete { key })) => {
                            batch.delete_cf(&cf, &key);

                            // Unregister TTL for deleted key
                            if let Some(ref lease) = self.lease {
                                lease.unregister(&key);
                            }

                            // DELETE always succeeds (errors returned as Err)
                            results.push(ApplyResult::success(entry.index));
                        }
                        Some(Operation::CompareAndSwap(CompareAndSwap {
                            key,
                            expected_value,
                            new_value,
                        })) => {
                            // RocksDB doesn't have native CAS, implement via read-compare-write
                            // This is safe because apply_chunk is called sequentially per Raft log order
                            // NOTE(review): this read sees only *committed* data — writes
                            // staged earlier in this same chunk's WriteBatch are not yet
                            // visible, so a CAS following an Insert/Delete of the same key
                            // within one chunk compares against stale state. Confirm chunks
                            // never mix operations on the same key, or consult the pending
                            // batch before reading from the DB.
                            let current_value = db.get_cf(&cf, &key).map_err(|e| {
                                StorageError::DbError(format!("CAS read failed: {e}"))
                            })?;

                            let cas_success = match (current_value, &expected_value) {
                                (Some(current), Some(expected)) => current == expected.as_ref(),
                                (None, None) => true,
                                _ => false,
                            };

                            if cas_success {
                                batch.put_cf(&cf, &key, &new_value);
                            }

                            // Store CAS result for client response
                            results.push(if cas_success {
                                ApplyResult::success(entry.index)
                            } else {
                                ApplyResult::failure(entry.index)
                            });

                            debug!(
                                "CAS at index {}: key={:?}, success={}",
                                entry.index,
                                String::from_utf8_lossy(&key),
                                cas_success
                            );
                        }
                        None => {
                            // NOTE(review): no ApplyResult is pushed for this arm (nor for
                            // Config entries below), so `results` can be shorter than the
                            // chunk — confirm callers tolerate the mismatch.
                            warn!("WriteCommand without operation at index {}", entry.index);
                        }
                    },
                    Err(e) => {
                        error!(
                            "Failed to decode WriteCommand at index {}: {:?}",
                            entry.index, e
                        );
                        return Err(StorageError::SerializationError(e.to_string()).into());
                    }
                },
                Some(Payload::Config(_config_change)) => {
                    debug!("Ignoring config change at index {}", entry.index);
                }
                None => panic!("Entry payload variant should not be None!"),
            }
        }

        self.apply_batch(batch)?;

        // Note: Lease cleanup is now handled by:
        // - Lazy strategy: cleanup in get() method
        // - Background strategy: dedicated async task
        // This avoids blocking the Raft apply hot path

        // Update last_applied after successful batch write
        if let Some(highest) = highest_index_entry {
            self.update_last_applied(highest);
        }

        Ok(results)
    }
619
620    fn len(&self) -> usize {
621        let db = self.db.load();
622        let cf = match db.cf_handle(STATE_MACHINE_CF) {
623            Some(cf) => cf,
624            None => return 0,
625        };
626
627        // Note: This is an expensive operation because it iterates over all keys.
628        let iter = db.iterator_cf(&cf, IteratorMode::Start);
629        iter.count()
630    }
631
    /// Caches the latest applied log id in the in-memory atomics.
    /// In-memory only — durability comes from `persist_last_applied`.
    fn update_last_applied(
        &self,
        last_applied: LogId,
    ) {
        self.last_applied_index.store(last_applied.index, Ordering::SeqCst);
        self.last_applied_term.store(last_applied.term, Ordering::SeqCst);
    }
639
    /// Returns the last applied log id from the in-memory atomics.
    fn last_applied(&self) -> LogId {
        LogId {
            index: self.last_applied_index.load(Ordering::SeqCst),
            term: self.last_applied_term.load(Ordering::SeqCst),
        }
    }
646
    /// Updates the in-memory last-applied log id and writes it to the
    /// meta CF in one call.
    fn persist_last_applied(
        &self,
        last_applied: LogId,
    ) -> Result<(), Error> {
        self.update_last_applied(last_applied);
        self.persist_state_machine_metadata()
    }
654
655    fn update_last_snapshot_metadata(
656        &self,
657        snapshot_metadata: &SnapshotMetadata,
658    ) -> Result<(), Error> {
659        *self.last_snapshot_metadata.write() = Some(snapshot_metadata.clone());
660        Ok(())
661    }
662
    /// Returns a clone of the cached snapshot metadata, if any.
    fn snapshot_metadata(&self) -> Option<SnapshotMetadata> {
        self.last_snapshot_metadata.read().clone()
    }
666
    /// Caches the snapshot metadata in memory and then writes it to the
    /// meta CF.
    fn persist_last_snapshot_metadata(
        &self,
        snapshot_metadata: &SnapshotMetadata,
    ) -> Result<(), Error> {
        self.update_last_snapshot_metadata(snapshot_metadata)?;
        self.persist_snapshot_metadata()
    }
674
    /// Restores the full state machine from an on-disk RocksDB checkpoint.
    ///
    /// Phases: stop serving → flush old DB → swap in a temporary DB (to
    /// release the RocksDB directory lock) → rename directories (with
    /// backup) → reopen from the checkpoint → restore TTL state → update
    /// metadata → resume serving → remove the backup.
    ///
    /// NOTE(review): on error paths after PHASE 2.5 this returns with
    /// `is_serving == false` and `self.db` still pointing at the temporary
    /// DB — confirm callers treat such an error as fatal for the node.
    #[instrument(skip(self))]
    async fn apply_snapshot_from_file(
        &self,
        metadata: &SnapshotMetadata,
        snapshot_dir: std::path::PathBuf,
    ) -> Result<(), Error> {
        info!("Applying snapshot from checkpoint: {:?}", snapshot_dir);

        // PHASE 1: Stop serving requests
        self.is_serving.store(false, Ordering::SeqCst);
        info!("Stopped serving requests for snapshot restoration");

        // PHASE 2: Flush and prepare old DB for replacement
        {
            let old_db = self.db.load();
            old_db.flush().map_err(|e| StorageError::DbError(e.to_string()))?;
            old_db.cancel_all_background_work(true);
            info!("Flushed and stopped background work on old DB");
        }

        // PHASE 2.5: Create temporary DB and swap to release old DB's lock
        // This is critical: we must release the old DB instance completely before moving directories
        // Otherwise, the old DB still holds a lock on rocksdb_sm/LOCK even after directory rename
        let temp_dir = tempfile::TempDir::new()?;
        let temp_db_path = temp_dir.path().join("temp_db");
        let temp_db = Self::open_db(&temp_db_path).map_err(|e| {
            error!("Failed to create temporary DB: {:?}", e);
            e
        })?;

        // Swap temp DB into self.db, which releases the old DB's Arc
        // Now old DB's Arc refcount drops to 0, DB instance is dropped, lock is released
        self.db.store(Arc::new(temp_db));
        info!("Swapped to temporary DB, old DB lock released");

        // PHASE 3: Atomic directory replacement
        let backup_dir = self.db_path.with_extension("backup");

        // Remove old backup if exists
        if backup_dir.exists() {
            tokio::fs::remove_dir_all(&backup_dir).await?;
        }

        // Move current DB to backup
        tokio::fs::rename(&self.db_path, &backup_dir).await?;
        info!("Backed up current DB to: {:?}", backup_dir);

        // Move checkpoint to DB path
        // NOTE(review): rollback rename errors are intentionally swallowed
        // (`let _ =`) — a failed rollback leaves no DB at db_path; confirm
        // this best-effort behavior is acceptable.
        tokio::fs::rename(&snapshot_dir, &self.db_path).await.inspect_err(|_e| {
            // Rollback: restore from backup
            let _ = std::fs::rename(&backup_dir, &self.db_path);
        })?;
        info!("Moved checkpoint to DB path: {:?}", self.db_path);

        // PHASE 4: Open new DB from checkpoint
        let new_db = Self::open_db(&self.db_path).map_err(|e| {
            // Rollback: restore from backup
            let _ = std::fs::rename(&backup_dir, &self.db_path);
            error!("Failed to open new DB, rolled back to backup: {:?}", e);
            e
        })?;

        // Atomically swap DB reference (replacing temp DB with new DB)
        self.db.store(Arc::new(new_db));
        info!("Atomically swapped to new DB instance");

        // PHASE 5: Restore TTL state (if lease is configured)
        // The ttl_state.bin written by generate_snapshot_data() now lives
        // inside the renamed checkpoint directory (self.db_path).
        if let Some(ref lease) = self.lease {
            let ttl_path = self.db_path.join("ttl_state.bin");
            if ttl_path.exists() {
                let ttl_data = tokio::fs::read(&ttl_path).await?;
                lease.reload(&ttl_data)?;

                // Persist TTL state to metadata CF to ensure consistency after restart
                // Without this, a subsequent restart (non-snapshot) would lose TTL state
                // because load_lease_data() reads from metadata CF, not ttl_state.bin
                self.persist_ttl_metadata()?;

                info!("Lease state restored from snapshot and persisted to metadata CF");
            } else {
                warn!("No lease state found in snapshot");
            }
        }

        // PHASE 6: Update metadata
        *self.last_snapshot_metadata.write() = Some(metadata.clone());
        if let Some(last_included) = &metadata.last_included {
            self.update_last_applied(*last_included);
        }

        // PHASE 7: Resume serving
        self.is_serving.store(true, Ordering::SeqCst);
        info!("Resumed serving requests");

        // PHASE 8: Clean up backup (best effort)
        if let Err(e) = tokio::fs::remove_dir_all(&backup_dir).await {
            warn!("Failed to remove backup directory: {}", e);
        } else {
            info!("Cleaned up backup directory");
        }

        info!("Snapshot applied successfully - full DB restoration complete");
        Ok(())
    }
779
    /// Creates a RocksDB checkpoint of the current DB in `new_snapshot_dir`,
    /// writes the lease state alongside it, and persists the snapshot
    /// metadata. Returns the snapshot checksum.
    #[instrument(skip(self))]
    async fn generate_snapshot_data(
        &self,
        new_snapshot_dir: std::path::PathBuf,
        last_included: LogId,
    ) -> Result<Bytes, Error> {
        // Create a checkpoint in the new_snapshot_dir
        // Use scope to ensure checkpoint is dropped before await
        {
            let db = self.db.load();
            let checkpoint = rocksdb::checkpoint::Checkpoint::new(db.as_ref())
                .map_err(|e| StorageError::DbError(e.to_string()))?;
            checkpoint
                .create_checkpoint(&new_snapshot_dir)
                .map_err(|e| StorageError::DbError(e.to_string()))?;
        } // checkpoint dropped here, before any await

        // Persist lease state alongside the checkpoint (if configured)
        // This file is picked up by apply_snapshot_from_file() on the receiver.
        if let Some(ref lease) = self.lease {
            let ttl_snapshot = lease.to_snapshot();
            let ttl_path = new_snapshot_dir.join("ttl_state.bin");
            tokio::fs::write(&ttl_path, ttl_snapshot).await?;
        }

        // Update metadata
        // NOTE(review): the checksum is a placeholder (all zeros); receivers
        // cannot use it to verify snapshot integrity — confirm whether a real
        // digest is planned before anything relies on it.
        let checksum = [0; 32]; // For now, we return a dummy checksum.
        let snapshot_metadata = SnapshotMetadata {
            last_included: Some(last_included),
            checksum: Bytes::copy_from_slice(&checksum),
        };
        self.persist_last_snapshot_metadata(&snapshot_metadata)?;

        info!("Snapshot generated at {:?} with TTL data", new_snapshot_dir);
        Ok(Bytes::copy_from_slice(&checksum))
    }
815
816    fn save_hard_state(&self) -> Result<(), Error> {
817        self.persist_state_machine_metadata()?;
818        self.persist_snapshot_metadata()?;
819        Ok(())
820    }
821
822    fn flush(&self) -> Result<(), Error> {
823        let db = self.db.load();
824
825        // Step 1: Sync WAL to disk (critical!)
826        // true = sync to disk
827        db.flush_wal(true).map_err(|e| StorageError::DbError(e.to_string()))?;
828        // Step 2: Flush memtables to SST files
829        db.flush().map_err(|e| StorageError::DbError(e.to_string()))?;
830
831        // Persist state machine metadata (last_applied_index, last_applied_term, snapshot_metadata)
832        self.persist_state_machine_metadata()?;
833
834        Ok(())
835    }
836
    async fn flush_async(&self) -> Result<(), Error> {
        // No async-specific path: delegate to the synchronous flush.
        // NOTE(review): this blocks the executor thread during the RocksDB
        // flush — acceptable only if callers expect that; confirm.
        self.flush()
    }
840
841    #[instrument(skip(self))]
842    async fn reset(&self) -> Result<(), Error> {
843        let db = self.db.load();
844        let cf = db
845            .cf_handle(STATE_MACHINE_CF)
846            .ok_or_else(|| StorageError::DbError("State machine CF not found".to_string()))?;
847
848        // Delete all keys in the state machine
849        let mut batch = WriteBatch::default();
850        let iter = db.iterator_cf(&cf, IteratorMode::Start);
851
852        for item in iter {
853            let (key, _) = item.map_err(|e| StorageError::DbError(e.to_string()))?;
854            batch.delete_cf(&cf, &key);
855        }
856
857        db.write(batch).map_err(|e| StorageError::DbError(e.to_string()))?;
858
859        // Reset metadata
860        self.last_applied_index.store(0, Ordering::SeqCst);
861        self.last_applied_term.store(0, Ordering::SeqCst);
862        *self.last_snapshot_metadata.write() = None;
863
864        // Note: Lease is managed by NodeBuilder and doesn't need reset
865
866        self.persist_state_machine_metadata()?;
867        self.persist_snapshot_metadata()?;
868
869        info!("RocksDB state machine reset completed");
870        Ok(())
871    }
872
873    async fn lease_background_cleanup(&self) -> Result<Vec<Bytes>, Error> {
874        // Fast path: no lease configured
875        let Some(ref lease) = self.lease else {
876            return Ok(vec![]);
877        };
878
879        // Get all expired keys
880        let now = SystemTime::now();
881        let expired_keys = lease.get_expired_keys(now);
882
883        if expired_keys.is_empty() {
884            return Ok(vec![]);
885        }
886
887        debug!(
888            "Lease background cleanup: found {} expired keys",
889            expired_keys.len()
890        );
891
892        // Delete expired keys from RocksDB
893        let db = self.db.load();
894        let cf = db
895            .cf_handle(STATE_MACHINE_CF)
896            .ok_or_else(|| StorageError::DbError("State machine CF not found".to_string()))?;
897
898        let mut batch = WriteBatch::default();
899        for key in &expired_keys {
900            batch.delete_cf(&cf, key);
901        }
902
903        self.apply_batch(batch)?;
904
905        info!(
906            "Lease background cleanup: deleted {} expired keys",
907            expired_keys.len()
908        );
909
910        Ok(expired_keys)
911    }
912}
913impl Drop for RocksDBStateMachine {
914    fn drop(&mut self) {
915        // save_hard_state() persists last_applied metadata before flush
916        // This is critical to prevent replay of already-applied entries on restart
917        if let Err(e) = self.save_hard_state() {
918            error!("Failed to save hard state on drop: {}", e);
919        }
920
921        // Then flush data to disk
922        if let Err(e) = self.flush() {
923            error!("Failed to flush on drop: {}", e);
924        } else {
925            debug!("RocksDBStateMachine flushed successfully on drop");
926        }
927
928        // This ensures flush operations are truly finished
929        self.db.load().cancel_all_background_work(true); // true = wait for completion
930        debug!("RocksDB background work cancelled on drop");
931    }
932}