Skip to main content

d_engine_server/storage/adaptors/file/
file_state_machine.rs

1//! File-based state machine implementation with crash recovery
2//!
3//! This module provides a durable state machine implementation using file-based storage
4//! with Write-Ahead Logging (WAL) for crash consistency.
5//!
6//! # Architecture
7//!
8//! ## Storage Components
9//!
10//! - **`state.data`**: Serialized key-value store (persisted after each apply_chunk)
11//! - **`wal.log`**: Write-Ahead Log for crash recovery (cleared after successful persistence)
12//! - **`ttl_state.bin`**: TTL manager state (persisted alongside state.data)
13//! - **`metadata.bin`**: Raft metadata (last_applied_index, last_applied_term)
14//!
15//! ## Write-Ahead Log (WAL) Design
16//!
17//! The WAL ensures crash consistency by recording operations before they are applied to
18//! in-memory state. Each WAL entry contains:
19//!
20//! ```text
21//! ┌─────────────────────────────────────────────────────────────────┐
22//! │ Entry Index (8 bytes) │ Entry Term (8 bytes) │ OpCode (1 byte) │
23//! ├─────────────────────────────────────────────────────────────────┤
24//! │ Key Length (8 bytes)  │ Key Data (N bytes)                      │
25//! ├─────────────────────────────────────────────────────────────────┤
26//! │ Value Length (8 bytes)│ Value Data (M bytes, if present)        │
27//! ├─────────────────────────────────────────────────────────────────┤
28//! │ Expire At (8 bytes, 0 = no TTL, >0 = UNIX timestamp in seconds) │
29//! └─────────────────────────────────────────────────────────────────┘
30//! ```
31//!
32//! ### TTL Semantics
33//!
34//! d-engine uses **absolute expiration time**:
35//!
36//! - When a key is created with TTL, the system calculates: `expire_at = now() + ttl_secs`
37//! - WAL stores the **absolute expiration timestamp** (UNIX seconds since epoch)
38//! - After crash recovery, expired keys are **not restored** (checked during replay)
39//! - TTL does **not reset** on restart (crash-safe)
40//!
41//! **Example:**
42//! ```text
43//! T0:  PUT key="foo", ttl=10s → expire_at = T0 + 10 = T10 (stored in WAL)
44//! T5:  CRASH
45//! T12: RESTART
46//!      → Replay WAL: expire_at = T10 < T12 (already expired)
47//!      → Key is NOT restored (correctly expired)
48//! ```
49//!
50//! **Why absolute time in WAL:**
51//! 1. Ensures expired keys stay expired after crash (durable expiration semantics)
52//! 2. Passive expiration (in get()) is crash-safe without WAL writes
53//! 3. No TTL reset on recovery (deterministic expiration)
54//!
55//! ### WAL Lifecycle
56//!
57//! WAL is the **primary** crash-safety path. Checkpoints are taken periodically
58//! (every 1000 entries or 10s) to bound replay time on recovery — not on every apply.
59//!
60//! ```text
61//! apply_chunk() → append_to_wal() → update memory → [if should_checkpoint()] → checkpoint()
62//!                                                                                  ├─ persist_data_async()
63//!                                                                                  ├─ persist_metadata_async()
64//!                                                                                  └─ clear_wal_async()
65//! ```
66//!
67//! On shutdown (`flush()`), a forced checkpoint ensures no data loss.
68//!
69//! ## Crash Recovery Flow
70//!
71//! On node startup (`new()`):
72//! 1. `load_metadata()` - Restore Raft state
73//! 2. `load_data()` - Load persisted key-value data
74//! 3. `load_ttl_data()` - Load persisted TTL state
75//! 4. `replay_wal()` - **Critical**: Replay uncommitted operations from WAL
76//!    - Restores keys AND their TTL metadata
77//!    - Ensures crash consistency (operations are idempotent)
78//!
79//! ## TTL Cleanup Strategy
80//!
81//! - **Background Cleanup**: Periodic async worker scans and deletes expired keys
82//! - **Zero Overhead**: When TTL feature is disabled, no lease components are initialized
83
84use std::collections::HashMap;
85use std::io::Write;
86use std::path::PathBuf;
87use std::sync::Arc;
88use std::sync::Mutex;
89use std::sync::atomic::AtomicBool;
90use std::sync::atomic::AtomicU64;
91use std::sync::atomic::Ordering;
92use std::time::SystemTime;
93
94use bytes::Bytes;
95use d_engine_core::ApplyResult;
96use d_engine_core::Error;
97use d_engine_core::Lease;
98use d_engine_core::StateMachine;
99use d_engine_core::StorageError;
100use d_engine_proto::client::WriteCommand;
101use d_engine_proto::client::write_command::CompareAndSwap;
102use d_engine_proto::client::write_command::Delete;
103use d_engine_proto::client::write_command::Insert;
104use d_engine_proto::client::write_command::Operation;
105use d_engine_proto::common::Entry;
106use d_engine_proto::common::LogId;
107use d_engine_proto::common::entry_payload::Payload;
108use d_engine_proto::server::storage::SnapshotMetadata;
109use parking_lot::RwLock;
110use prost::Message;
111use tokio::fs;
112use tokio::fs::File;
113use tokio::fs::OpenOptions;
114use tokio::io::AsyncReadExt;
115use tokio::io::AsyncWriteExt;
116use tokio::time::Instant;
117use tonic::async_trait;
118use tracing::debug;
119use tracing::error;
120use tracing::info;
121use tracing::warn;
122
123use crate::storage::DefaultLease;
124
/// In-memory KV store: key → (value, term of the log entry that wrote it).
/// Wrapped in a `parking_lot::RwLock` for concurrent read access.
type FileStateMachineDataType = RwLock<HashMap<Bytes, (Bytes, u64)>>;
126
/// Single-byte operation codes used in the fixed-size WAL entry encoding.
///
/// The discriminant values are part of the on-disk format and must never
/// change: 0=NOOP, 1=INSERT, 2=DELETE, 3=CONFIG, 4=CAS.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum WalOpCode {
    Noop = 0,
    Insert = 1,
    Delete = 2,
    Config = 3,
    CompareAndSwap = 4,
}

impl WalOpCode {
    /// Maps a textual operation name onto its opcode.
    /// Any unrecognized name degrades to `Noop` rather than failing.
    fn from_str(s: &str) -> Self {
        match s {
            "CAS" => Self::CompareAndSwap,
            "CONFIG" => Self::Config,
            "DELETE" => Self::Delete,
            "INSERT" => Self::Insert,
            _ => Self::Noop,
        }
    }

    /// Decodes the opcode byte read from disk.
    /// Any unrecognized byte degrades to `Noop` rather than failing.
    fn from_u8(byte: u8) -> Self {
        match byte {
            4 => Self::CompareAndSwap,
            3 => Self::Config,
            2 => Self::Delete,
            1 => Self::Insert,
            _ => Self::Noop,
        }
    }
}
159
/// File-based state machine implementation with persistence
///
/// Design principles:
/// - All data is persisted to disk for durability
/// - In-memory cache for fast read operations
/// - Write-ahead logging for crash consistency
/// - Efficient snapshot handling with file-based storage
/// - Thread-safe with minimal lock contention
/// - TTL support for automatic key expiration
#[derive(Debug)]
pub struct FileStateMachine {
    // Key-value storage with disk persistence.
    // Maps key -> (value, term of the entry that wrote it).
    data: FileStateMachineDataType, // (value, term)

    // Lease management for automatic key expiration.
    // DefaultLease is thread-safe internally (uses DashMap + Mutex).
    // Injected by NodeBuilder after construction via `set_lease()`.
    lease: Option<Arc<DefaultLease>>,

    /// Whether lease manager is enabled (immutable after init)
    /// Set to true when lease is injected, never changes after that
    ///
    /// Invariant: lease_enabled == true ⟹ lease.is_some()
    /// Performance: Allows safe unwrap_unchecked in hot paths
    lease_enabled: bool,

    // Raft state with disk persistence (stored in metadata.bin as two
    // big-endian u64s; see persist_metadata* / load_metadata).
    last_applied_index: AtomicU64,
    last_applied_term: AtomicU64,
    last_snapshot_metadata: RwLock<Option<SnapshotMetadata>>,

    // Operational state flag (initialized to true in `new()`).
    running: AtomicBool,

    // Checkpoint state: WAL is the primary persistence path; checkpoint snapshots
    // data periodically to bound crash-recovery replay time.
    wal_entries_since_checkpoint: AtomicU64,
    last_checkpoint: Mutex<Instant>,

    // Root directory for all persistence files
    // (state.data, metadata.bin, wal.log, ttl_state.bin).
    data_dir: PathBuf,
    // Retained from an earlier design that cached open file handles instead
    // of reopening files per operation:
    // data_file: RwLock<File>,
    // metadata_file: RwLock<File>,
    // wal_file: RwLock<File>, // Write-ahead log for crash recovery
}
205
206impl FileStateMachine {
207    /// Creates a new file-based state machine with persistence
208    ///
209    /// Lease will be injected by NodeBuilder after construction.
210    ///
211    /// # Arguments
212    /// * `data_dir` - Directory where data files will be stored
213    ///
214    /// # Returns
215    /// Result containing the initialized FileStateMachine
216    pub async fn new(data_dir: PathBuf) -> Result<Self, Error> {
217        // Ensure data directory exists
218        fs::create_dir_all(&data_dir).await?;
219
220        let machine = Self {
221            data: RwLock::new(HashMap::new()),
222            lease: None,          // Will be injected by NodeBuilder
223            lease_enabled: false, // Default: no lease until set
224            last_applied_index: AtomicU64::new(0),
225            last_applied_term: AtomicU64::new(0),
226            last_snapshot_metadata: RwLock::new(None),
227            running: AtomicBool::new(true),
228            wal_entries_since_checkpoint: AtomicU64::new(0),
229            last_checkpoint: Mutex::new(Instant::now()),
230            data_dir: data_dir.clone(),
231        };
232
233        // Load existing data from disk
234        machine.load_from_disk().await?;
235
236        Ok(machine)
237    }
238
239    /// Sets the lease manager for this state machine.
240    ///
241    /// This is an internal method called by NodeBuilder during initialization.
242    /// The lease will also be restored from snapshot during `apply_snapshot_from_file()`.
243    /// Also available for testing and benchmarks.
244    pub fn set_lease(
245        &mut self,
246        lease: Arc<DefaultLease>,
247    ) {
248        // Mark lease as enabled (immutable after this point)
249        self.lease_enabled = true;
250        self.lease = Some(lease);
251    }
252
    /// Loads all persisted state machine data from disk during startup.
    ///
    /// Restore order matters: Raft metadata first, then the key-value data,
    /// then TTL state, and finally the WAL replay, which re-applies any
    /// operations that were logged but not yet checkpointed before a crash.
    async fn load_from_disk(&self) -> Result<(), Error> {
        // Load last applied index and term from metadata file
        self.load_metadata().await?;

        // Load key-value data from data file
        self.load_data().await?;

        // Load TTL data from disk
        self.load_ttl_data().await?;

        // Replay write-ahead log for crash recovery
        self.replay_wal().await?;

        info!("Loaded state machine data from disk");
        Ok(())
    }
273
274    /// Loads TTL data from disk (if lease is configured)
275    async fn load_ttl_data(&self) -> Result<(), Error> {
276        // Lease will be injected by NodeBuilder later
277        // The lease data will be loaded after injection
278        // For now, just skip this step during construction
279        Ok(())
280    }
281
282    /// Loads TTL data into the configured lease
283    ///
284    /// Called after NodeBuilder injects the lease.
285    /// Also available for testing and benchmarks.
286    pub async fn load_lease_data(&self) -> Result<(), Error> {
287        let Some(ref lease) = self.lease else {
288            return Ok(()); // No lease configured
289        };
290
291        let ttl_path = self.data_dir.join("ttl_state.bin");
292        if !ttl_path.exists() {
293            debug!("No TTL state file found");
294            return Ok(());
295        }
296
297        let ttl_data = tokio::fs::read(&ttl_path).await?;
298        lease.reload(&ttl_data)?;
299
300        info!("Loaded TTL state from disk: {} active TTLs", lease.len());
301        Ok(())
302    }
303
304    /// Loads metadata from disk
305    async fn load_metadata(&self) -> Result<(), Error> {
306        let metadata_path = self.data_dir.join("metadata.bin");
307        if !metadata_path.exists() {
308            return Ok(());
309        }
310
311        let mut file = File::open(metadata_path).await?;
312        let mut buffer = [0u8; 16];
313
314        if file.read_exact(&mut buffer).await.is_ok() {
315            let index = u64::from_be_bytes([
316                buffer[0], buffer[1], buffer[2], buffer[3], buffer[4], buffer[5], buffer[6],
317                buffer[7],
318            ]);
319
320            let term = u64::from_be_bytes([
321                buffer[8], buffer[9], buffer[10], buffer[11], buffer[12], buffer[13], buffer[14],
322                buffer[15],
323            ]);
324
325            self.last_applied_index.store(index, Ordering::SeqCst);
326            self.last_applied_term.store(term, Ordering::SeqCst);
327        }
328
329        Ok(())
330    }
331
332    /// Loads key-value data from disk
333    async fn load_data(&self) -> Result<(), Error> {
334        let data_path = self.data_dir.join("state.data");
335        if !data_path.exists() {
336            return Ok(());
337        }
338
339        let mut file = File::open(data_path).await?;
340        let mut buffer = Vec::new();
341        file.read_to_end(&mut buffer).await?;
342
343        let mut pos = 0;
344        let mut data = self.data.write();
345
346        while pos < buffer.len() {
347            // Read key length
348            if pos + 8 > buffer.len() {
349                break;
350            }
351
352            let key_len_bytes = &buffer[pos..pos + 8];
353            let key_len = u64::from_be_bytes([
354                key_len_bytes[0],
355                key_len_bytes[1],
356                key_len_bytes[2],
357                key_len_bytes[3],
358                key_len_bytes[4],
359                key_len_bytes[5],
360                key_len_bytes[6],
361                key_len_bytes[7],
362            ]) as usize;
363
364            pos += 8;
365
366            // Read key
367            if pos + key_len > buffer.len() {
368                break;
369            }
370
371            let key = Bytes::from(buffer[pos..pos + key_len].to_vec());
372            pos += key_len;
373
374            // Read value length
375            if pos + 8 > buffer.len() {
376                break;
377            }
378
379            let value_len_bytes = &buffer[pos..pos + 8];
380            let value_len = u64::from_be_bytes([
381                value_len_bytes[0],
382                value_len_bytes[1],
383                value_len_bytes[2],
384                value_len_bytes[3],
385                value_len_bytes[4],
386                value_len_bytes[5],
387                value_len_bytes[6],
388                value_len_bytes[7],
389            ]) as usize;
390
391            pos += 8;
392
393            // Read value
394            if pos + value_len > buffer.len() {
395                break;
396            }
397
398            let value = Bytes::from(buffer[pos..pos + value_len].to_vec());
399            pos += value_len;
400
401            // Read term
402            if pos + 8 > buffer.len() {
403                break;
404            }
405
406            let term_bytes = &buffer[pos..pos + 8];
407            let term = u64::from_be_bytes([
408                term_bytes[0],
409                term_bytes[1],
410                term_bytes[2],
411                term_bytes[3],
412                term_bytes[4],
413                term_bytes[5],
414                term_bytes[6],
415                term_bytes[7],
416            ]);
417
418            pos += 8;
419
420            // Store in memory
421            data.insert(key, (value, term));
422        }
423
424        Ok(())
425    }
426
427    /// Replays write-ahead log for crash recovery
428    async fn replay_wal(&self) -> Result<(), Error> {
429        let wal_path = self.data_dir.join("wal.log");
430        if !wal_path.exists() {
431            debug!("No WAL file found, skipping replay");
432            return Ok(());
433        }
434
435        let mut file = File::open(wal_path).await?;
436        let mut buffer = Vec::new();
437        file.read_to_end(&mut buffer).await?;
438
439        if buffer.is_empty() {
440            debug!("WAL file is empty, skipping replay");
441            return Ok(());
442        }
443
444        let mut pos = 0;
445        let mut operations = Vec::new();
446        let mut replayed_count = 0;
447
448        while pos + 17 < buffer.len() {
449            // Read entry index (8 bytes)
450            let _index = u64::from_be_bytes(buffer[pos..pos + 8].try_into().unwrap());
451            pos += 8;
452
453            // Read entry term (8 bytes)
454            let term = u64::from_be_bytes(buffer[pos..pos + 8].try_into().unwrap());
455            pos += 8;
456
457            // Read operation code (1 byte)
458            let op_code = WalOpCode::from_u8(buffer[pos]);
459            pos += 1;
460
461            // Check if we have enough bytes for key length
462            if pos + 8 > buffer.len() {
463                warn!("Incomplete key length at position {}, stopping replay", pos);
464                break;
465            }
466
467            // Read key length (8 bytes)
468            let key_len = u64::from_be_bytes(buffer[pos..pos + 8].try_into().unwrap()) as usize;
469            pos += 8;
470
471            // Check if we have enough data for the key
472            if pos + key_len > buffer.len() {
473                warn!(
474                    "Incomplete key data at position {} (need {} bytes, have {})",
475                    pos,
476                    key_len,
477                    buffer.len() - pos
478                );
479                break;
480            }
481
482            // Read key
483            let key = Bytes::from(buffer[pos..pos + key_len].to_vec());
484            pos += key_len;
485
486            // Check if we have enough bytes for value length
487            if pos + 8 > buffer.len() {
488                warn!(
489                    "Incomplete value length at position {}, stopping replay",
490                    pos
491                );
492                break;
493            }
494
495            // Read value length (8 bytes)
496            let value_len = u64::from_be_bytes(buffer[pos..pos + 8].try_into().unwrap()) as usize;
497            pos += 8;
498
499            // Read value if present
500            let value = if value_len > 0 {
501                if pos + value_len > buffer.len() {
502                    warn!("Incomplete value data at position {}, stopping replay", pos);
503                    break;
504                }
505                let value_data = Bytes::from(buffer[pos..pos + value_len].to_vec());
506                pos += value_len;
507                Some(value_data)
508            } else {
509                None
510            };
511
512            // Read absolute expiration time (8 bytes) - 0 means no TTL
513            //
514            // WAL Format Migration Path:
515            // - Old format (pre-v0.2.0): ttl_secs (u32, 4 bytes, relative time)
516            // - New format (v0.2.0+): expire_at_secs (u64, 8 bytes, absolute time)
517            //
518            // Backward Compatibility Strategy:
519            // Since this is a breaking change and d-engine has not been deployed to production,
520            // we do NOT support reading old WAL format. All WAL entries must use the new format.
521            // If upgrading from pre-v0.2.0, users must:
522            // 1. Gracefully stop the old version (persists state.data + ttl_state.bin)
523            // 2. Upgrade to v0.2.0+
524            // 3. Start the new version (loads from persisted state, not WAL)
525            let expire_at_secs = if pos + 8 <= buffer.len() {
526                let secs = u64::from_be_bytes(buffer[pos..pos + 8].try_into().unwrap());
527                pos += 8;
528                if secs > 0 { Some(secs) } else { None }
529            } else {
530                // Incomplete WAL entry - log and skip
531                // This indicates corrupted WAL or incomplete write before crash
532                debug!(
533                    "No expiration time field at position {}, assuming no TTL (incomplete WAL entry)",
534                    pos
535                );
536                None
537            };
538
539            operations.push((op_code, key, value, term, expire_at_secs));
540            replayed_count += 1;
541        }
542
543        info!(
544            "Parsed {} WAL operations, applying to memory",
545            operations.len()
546        );
547
548        // Apply all collected operations with a single lock acquisition
549        let mut applied_count = 0;
550        let mut skipped_expired = 0;
551        let now = std::time::SystemTime::now();
552        {
553            let mut data = self.data.write();
554
555            for (op_code, key, value, term, expire_at_secs) in operations {
556                match op_code {
557                    WalOpCode::Insert => {
558                        if let Some(value_data) = value {
559                            // Check if key is already expired (crash-safe TTL semantics)
560                            let is_expired = if let Some(secs) = expire_at_secs {
561                                let expire_at =
562                                    std::time::UNIX_EPOCH + std::time::Duration::from_secs(secs);
563                                now >= expire_at
564                            } else {
565                                false
566                            };
567
568                            if is_expired {
569                                // Skip restoring expired keys (durable expiration semantics)
570                                debug!("Skipped expired key during WAL replay: key={:?}", key);
571                                skipped_expired += 1;
572                                continue;
573                            }
574
575                            data.insert(key.clone(), (value_data, term));
576
577                            // Restore TTL from WAL (if lease configured and has expiration)
578                            if let Some(secs) = expire_at_secs {
579                                if let Some(ref lease) = self.lease {
580                                    let expire_at = std::time::UNIX_EPOCH
581                                        + std::time::Duration::from_secs(secs);
582                                    let remaining = expire_at
583                                        .duration_since(now)
584                                        .map(|d| d.as_secs())
585                                        .unwrap_or(0);
586
587                                    if remaining > 0 {
588                                        lease.register(key.clone(), remaining);
589                                        debug!(
590                                            "Replayed INSERT with TTL: key={:?}, remaining={}s",
591                                            key, remaining
592                                        );
593                                    }
594                                }
595                            } else {
596                                debug!("Replayed INSERT: key={:?}", key);
597                            }
598
599                            applied_count += 1;
600                        } else {
601                            warn!("INSERT operation without value");
602                        }
603                    }
604                    WalOpCode::Delete => {
605                        data.remove(&key);
606                        if let Some(ref lease) = self.lease {
607                            lease.unregister(&key);
608                        }
609                        applied_count += 1;
610                        debug!("Replayed DELETE: key={:?}", key);
611                    }
612                    WalOpCode::CompareAndSwap => {
613                        // CAS during replay: Just apply the new_value if present
614                        // The comparison was already done before crash, and succeeded
615                        // (otherwise this entry wouldn't be in WAL)
616                        if let Some(new_value) = value {
617                            data.insert(key.clone(), (new_value, term));
618                            applied_count += 1;
619                            debug!("Replayed CAS: key={:?}", key);
620                        } else {
621                            warn!("CAS operation without new_value in WAL");
622                        }
623                    }
624                    WalOpCode::Noop | WalOpCode::Config => {
625                        // No data modification needed
626                        applied_count += 1;
627                        debug!("Replayed {:?} operation", op_code);
628                    }
629                }
630            }
631        }
632
633        info!(
634            "WAL replay complete: {} operations replayed, {} applied, {} expired keys skipped",
635            replayed_count, applied_count, skipped_expired
636        );
637
638        // Clear WAL only if replay was successful
639        if applied_count > 0 {
640            self.clear_wal_async().await?;
641            debug!(
642                "Cleared WAL after successful replay of {} operations",
643                applied_count
644            );
645        }
646
647        Ok(())
648    }
649
650    /// Persists key-value data to disk
651    fn persist_data(&self) -> Result<(), Error> {
652        // Collect data first to minimize lock time
653        let data_copy: HashMap<Bytes, (Bytes, u64)> = {
654            let data = self.data.read();
655            data.iter().map(|(k, (v, t))| (k.clone(), (v.clone(), *t))).collect()
656        };
657
658        // Batch serialize into a single buffer — mirrors persist_data_async().
659        let data_path = self.data_dir.join("state.data");
660
661        let estimated: usize =
662            data_copy.iter().map(|(k, (v, _))| 8 + k.len() + 8 + v.len() + 8).sum();
663        let mut buf = Vec::with_capacity(estimated);
664
665        for (key, (value, term)) in &data_copy {
666            buf.extend_from_slice(&(key.len() as u64).to_be_bytes());
667            buf.extend_from_slice(key);
668            buf.extend_from_slice(&(value.len() as u64).to_be_bytes());
669            buf.extend_from_slice(value);
670            buf.extend_from_slice(&term.to_be_bytes());
671        }
672
673        std::fs::write(data_path, buf)?;
674        Ok(())
675    }
676
677    /// Persists key-value data to disk
678    async fn persist_data_async(&self) -> Result<(), Error> {
679        // Collect data under minimal lock scope
680        let data_copy: HashMap<Bytes, (Bytes, u64)> = {
681            let data = self.data.read();
682            data.iter().map(|(k, (v, t))| (k.clone(), (v.clone(), *t))).collect()
683        };
684
685        let data_path = self.data_dir.join("state.data");
686        let mut file = OpenOptions::new()
687            .write(true)
688            .create(true)
689            .truncate(true)
690            .open(data_path)
691            .await?;
692
693        // Batch serialize into a single buffer — eliminates per-entry async yield overhead.
694        // Mirrors append_to_wal's approach for consistent I/O pattern.
695        let estimated: usize =
696            data_copy.iter().map(|(k, (v, _))| 8 + k.len() + 8 + v.len() + 8).sum();
697        let mut buf = Vec::with_capacity(estimated);
698
699        for (key, (value, term)) in &data_copy {
700            buf.extend_from_slice(&(key.len() as u64).to_be_bytes());
701            buf.extend_from_slice(key);
702            buf.extend_from_slice(&(value.len() as u64).to_be_bytes());
703            buf.extend_from_slice(value);
704            buf.extend_from_slice(&term.to_be_bytes());
705        }
706
707        file.write_all(&buf).await?;
708        file.flush().await?;
709
710        Ok(())
711    }
712
713    /// Persists metadata to disk
714    fn persist_metadata(&self) -> Result<(), Error> {
715        let metadata_path = self.data_dir.join("metadata.bin");
716        let mut file = std::fs::OpenOptions::new()
717            .write(true)
718            .create(true)
719            .truncate(true)
720            .open(metadata_path)?;
721
722        let index = self.last_applied_index.load(Ordering::SeqCst);
723        let term = self.last_applied_term.load(Ordering::SeqCst);
724
725        file.write_all(&index.to_be_bytes())?;
726        file.write_all(&term.to_be_bytes())?;
727
728        file.flush()?;
729        Ok(())
730    }
731
732    async fn persist_metadata_async(&self) -> Result<(), Error> {
733        let metadata_path = self.data_dir.join("metadata.bin");
734        let mut file = OpenOptions::new()
735            .write(true)
736            .create(true)
737            .truncate(true)
738            .open(metadata_path)
739            .await?;
740
741        let index = self.last_applied_index.load(Ordering::SeqCst);
742        let term = self.last_applied_term.load(Ordering::SeqCst);
743
744        file.write_all(&index.to_be_bytes()).await?;
745        file.write_all(&term.to_be_bytes()).await?;
746
747        file.flush().await?;
748        Ok(())
749    }
750
751    /// Clears the write-ahead log (called after successful persistence)
752    #[allow(unused)]
753    fn clear_wal(&self) -> Result<(), Error> {
754        let wal_path = self.data_dir.join("wal.log");
755        let mut file = std::fs::OpenOptions::new()
756            .write(true)
757            .create(true)
758            .truncate(true)
759            .open(wal_path)?;
760
761        file.set_len(0)?;
762        file.flush()?;
763        Ok(())
764    }
765
766    /// Clears the write-ahead log (called after successful persistence)
767    async fn clear_wal_async(&self) -> Result<(), Error> {
768        let wal_path = self.data_dir.join("wal.log");
769        let mut file = OpenOptions::new()
770            .write(true)
771            .create(true)
772            .truncate(true)
773            .open(wal_path)
774            .await?;
775
776        file.set_len(0).await?;
777        file.flush().await?;
778        Ok(())
779    }
780
781    /// Returns true if a checkpoint should be taken now.
782    ///
783    /// Triggers on any of:
784    /// - WAL has accumulated ≥ 1000 entries since last checkpoint
785    /// - Last checkpoint was > 10s ago
786    fn should_checkpoint(&self) -> bool {
787        const WAL_ENTRY_THRESHOLD: u64 = 1000;
788        const TIME_THRESHOLD_SECS: u64 = 10;
789
790        if self.wal_entries_since_checkpoint.load(Ordering::Relaxed) >= WAL_ENTRY_THRESHOLD {
791            return true;
792        }
793        if let Ok(last) = self.last_checkpoint.lock() {
794            return last.elapsed().as_secs() >= TIME_THRESHOLD_SECS;
795        }
796        false
797    }
798
    /// Checkpoint: snapshot full data + metadata to disk, then clear WAL.
    ///
    /// WAL is the primary crash-safety path; checkpoint bounds recovery replay time.
    ///
    /// Ordering is critical: the WAL may only be truncated AFTER both the
    /// data and metadata snapshots have been persisted — a crash between the
    /// persists and the clear is safe because the WAL is simply replayed.
    pub(crate) async fn checkpoint(&self) -> Result<(), Error> {
        self.persist_data_async().await?;
        self.persist_metadata_async().await?;
        self.clear_wal_async().await?;

        // Reset checkpoint bookkeeping (entry counter + timestamp) so
        // should_checkpoint() starts a fresh window.
        self.wal_entries_since_checkpoint.store(0, Ordering::Relaxed);
        if let Ok(mut last) = self.last_checkpoint.lock() {
            *last = Instant::now();
        }
        debug!("Checkpoint complete");
        Ok(())
    }
814
815    /// Resets the state machine to its initial empty state
816    ///
817    /// This method:
818    /// 1. Clears all in-memory data
819    /// 2. Resets Raft state to initial values
820    /// 3. Clears all persisted files
821    /// 4. Maintains operational state (running status, node ID)
822    pub async fn reset(&self) -> Result<(), Error> {
823        info!("Resetting state machine");
824
825        // Clear in-memory data
826        {
827            let mut data = self.data.write();
828            data.clear();
829        }
830
831        // Reset Raft state
832        self.last_applied_index.store(0, Ordering::SeqCst);
833        self.last_applied_term.store(0, Ordering::SeqCst);
834
835        {
836            let mut snapshot_metadata = self.last_snapshot_metadata.write();
837            *snapshot_metadata = None;
838        }
839
840        // Clear all persisted files
841        self.clear_data_file().await?;
842        self.clear_metadata_file().await?;
843        self.clear_wal_async().await?;
844
845        info!("State machine reset completed");
846        Ok(())
847    }
848
849    /// Clears the data file
850    async fn clear_data_file(&self) -> Result<(), Error> {
851        let data_path = self.data_dir.join("state.data");
852        let mut file = OpenOptions::new()
853            .write(true)
854            .create(true)
855            .truncate(true)
856            .open(data_path)
857            .await?;
858
859        file.set_len(0).await?;
860        file.flush().await?;
861        Ok(())
862    }
863
864    /// Clears the metadata file
865    async fn clear_metadata_file(&self) -> Result<(), Error> {
866        let metadata_path = self.data_dir.join("metadata.bin");
867        let mut file = OpenOptions::new()
868            .write(true)
869            .create(true)
870            .truncate(true)
871            .open(metadata_path)
872            .await?;
873
874        // Write default values (0 for both index and term)
875        file.write_all(&0u64.to_be_bytes()).await?;
876        file.write_all(&0u64.to_be_bytes()).await?;
877
878        file.flush().await?;
879        Ok(())
880    }
881
882    /// Batch WAL writes with proper durability guarantees
883    ///
884    /// Format per entry:
885    /// - 8 bytes: entry index (big-endian u64)
886    /// - 8 bytes: entry term (big-endian u64)
887    /// - 1 byte: operation code (0=NOOP, 1=INSERT, 2=DELETE, 3=CONFIG)
888    /// - 8 bytes: key length (big-endian u64)
889    /// - N bytes: key data
890    /// - 8 bytes: value length (big-endian u64, 0 if no value)
891    /// - M bytes: value data (only if length > 0)
892    pub(crate) async fn append_to_wal(
893        &self,
894        entries: Vec<(Entry, String, Bytes, Option<Bytes>, u64)>,
895    ) -> Result<(), Error> {
896        if entries.is_empty() {
897            return Ok(());
898        }
899
900        let wal_path = self.data_dir.join("wal.log");
901
902        let mut file =
903            OpenOptions::new().write(true).create(true).append(true).open(&wal_path).await?;
904
905        // Pre-allocate buffer with estimated size
906        let estimated_size: usize = entries
907            .iter()
908            .map(|(_, _, key, value, _)| {
909                8 + 8 + 1 + 8 + key.len() + 8 + value.as_ref().map_or(0, |v| v.len()) + 8
910            })
911            .sum();
912
913        // Single batched write instead of multiple small writes
914        let mut batch_buffer = Vec::with_capacity(estimated_size);
915
916        for (entry, operation, key, value, ttl_secs) in entries {
917            // Write entry index and term (16 bytes total)
918            batch_buffer.extend_from_slice(&entry.index.to_be_bytes());
919            batch_buffer.extend_from_slice(&entry.term.to_be_bytes());
920
921            // Write operation code (1 byte)
922            let op_code = WalOpCode::from_str(&operation);
923            batch_buffer.push(op_code as u8);
924
925            // Write key length and data (8 + N bytes)
926            batch_buffer.extend_from_slice(&(key.len() as u64).to_be_bytes());
927            batch_buffer.extend_from_slice(&key);
928
929            // Write value length and data (8 + M bytes)
930            // Always write length field for consistent format
931            if let Some(value_data) = value {
932                batch_buffer.extend_from_slice(&(value_data.len() as u64).to_be_bytes());
933                batch_buffer.extend_from_slice(&value_data);
934            } else {
935                // Write 0 length for operations without value
936                batch_buffer.extend_from_slice(&0u64.to_be_bytes());
937            }
938
939            // Write absolute expiration time (8 bytes) - 0 means no TTL
940            // Store UNIX timestamp (seconds since epoch) for crash-safe expiration
941            let expire_at_secs = if ttl_secs > 0 {
942                let expire_at =
943                    std::time::SystemTime::now() + std::time::Duration::from_secs(ttl_secs);
944                expire_at
945                    .duration_since(std::time::UNIX_EPOCH)
946                    .map(|d| d.as_secs())
947                    .unwrap_or(0)
948            } else {
949                0
950            };
951            batch_buffer.extend_from_slice(&expire_at_secs.to_be_bytes());
952        }
953
954        file.write_all(&batch_buffer).await?;
955        file.flush().await?;
956
957        Ok(())
958    }
959}
960
961impl Drop for FileStateMachine {
962    fn drop(&mut self) {
963        let timer = Instant::now();
964
965        // Save state into local database including flush operation
966        match self.save_hard_state() {
967            Ok(_) => debug!("StateMachine saved in {:?}", timer.elapsed()),
968            Err(e) => error!("Failed to save StateMachine: {}", e),
969        }
970    }
971}
972
#[async_trait]
impl StateMachine for FileStateMachine {
    /// Marks the state machine as running and restores persisted lease (TTL)
    /// state when a lease manager is configured.
    async fn start(&self) -> Result<(), Error> {
        self.running.store(true, Ordering::SeqCst);

        // Load persisted lease data if configured
        if self.lease.is_some() {
            self.load_lease_data().await?;
            debug!("Lease data loaded during state machine initialization");
        }

        info!("File state machine started");
        Ok(())
    }

    /// Stops the state machine and persists lease (TTL) state synchronously.
    ///
    /// Uses blocking `std::fs::write` because this method is sync; the async
    /// shutdown path (`flush_async`) performs a full checkpoint instead.
    fn stop(&self) -> Result<(), Error> {
        // Ensure all data is flushed to disk before stopping
        self.running.store(false, Ordering::SeqCst);

        // Graceful shutdown: persist TTL state to disk
        // This ensures lease data survives across restarts
        if let Some(ref lease) = self.lease {
            let ttl_snapshot = lease.to_snapshot();
            let ttl_path = self.data_dir.join("ttl_state.bin");
            // Use blocking write since stop() is sync
            std::fs::write(&ttl_path, ttl_snapshot)
                .map_err(d_engine_core::StorageError::IoError)?;
            debug!("Persisted TTL state on shutdown");
        }

        info!("File state machine stopped");
        Ok(())
    }

    /// Returns the running flag set by `start()` / cleared by `stop()`.
    fn is_running(&self) -> bool {
        self.running.load(Ordering::SeqCst)
    }

    /// Returns the current value for `key_buffer`, or `None` if absent.
    ///
    /// Deliberately performs no expiry check on the read path; expired keys
    /// are removed by the background cleanup task (see
    /// `lease_background_cleanup`).
    fn get(
        &self,
        key_buffer: &[u8],
    ) -> Result<Option<Bytes>, Error> {
        // Lazy cleanup: only check expiration in Lazy strategy
        // Background strategy handles cleanup in dedicated async task
        // Background cleanup strategy: expired keys are cleaned by background worker
        // No on-read checks needed (simplifies get() hot path)
        let data = self.data.read();
        Ok(data.get(key_buffer).map(|(value, _)| value.clone()))
    }

    /// Linear scan over stored values for a matching second tuple field.
    ///
    /// NOTE(review): the closure names the tuple's second field `index`, but
    /// `apply_chunk` stores `entry.term` in that slot — so this matches on
    /// *term*, not log index, and effectively returns `entry_id` whenever any
    /// stored value carries that term. Confirm whether this is the intended
    /// semantics or a latent bug.
    fn entry_term(
        &self,
        entry_id: u64,
    ) -> Option<u64> {
        let data = self.data.read();
        data.values().find(|(_, index)| *index == entry_id).map(|(_, term)| *term)
    }

    /// Thread-safe: called serially by single-task CommitHandler
    ///
    /// Applies a batch of committed log entries in four phases:
    /// 1. decode all operations (no locks held),
    /// 2. append every entry to the WAL in one batched, durable write,
    /// 3. apply to the in-memory map under a single short write lock,
    /// 4. advance `last_applied` and checkpoint when thresholds are hit.
    ///
    /// NOTE(review): non-CAS results are pushed in PHASE 1 while CAS results
    /// are pushed in PHASE 3, so `results` is not in entry order when CAS is
    /// interleaved with other ops; CONFIG and operation-less commands never
    /// push a result at all, so `results.len()` can be < `chunk.len()`.
    /// Confirm callers match results by their embedded index, not position.
    async fn apply_chunk(
        &self,
        chunk: Vec<Entry>,
    ) -> Result<Vec<ApplyResult>, Error> {
        let chunk_len = chunk.len();
        let mut highest_index_entry: Option<LogId> = None;
        let mut batch_operations = Vec::new();
        let mut results = Vec::with_capacity(chunk_len);

        // PHASE 1: Decode all operations and prepare WAL entries
        for entry in chunk {
            let entry_index = entry.index;

            assert!(entry.payload.is_some(), "Entry payload should not be None!");

            // Ensure entries are processed in order
            if let Some(prev) = &highest_index_entry {
                assert!(
                    entry.index > prev.index,
                    "apply_chunk: received unordered entry at index {} (prev={})",
                    entry.index,
                    prev.index
                );
            }
            highest_index_entry = Some(LogId {
                index: entry.index,
                term: entry.term,
            });

            // Decode operations without holding locks
            match entry.payload.as_ref().unwrap().payload.as_ref() {
                Some(Payload::Noop(_)) => {
                    let entry_index = entry.index;
                    debug!("Handling NOOP command at index {}", entry_index);
                    batch_operations.push((entry, "NOOP", Bytes::new(), None, 0));
                    results.push(ApplyResult::success(entry_index));
                }
                Some(Payload::Command(bytes)) => match WriteCommand::decode(&bytes[..]) {
                    Ok(write_cmd) => {
                        // Extract operation data for batch processing
                        match write_cmd.operation {
                            Some(Operation::Insert(Insert {
                                key,
                                value,
                                ttl_secs,
                            })) => {
                                let entry_index = entry.index;
                                batch_operations.push((
                                    entry,
                                    "INSERT",
                                    key,
                                    Some(value),
                                    ttl_secs,
                                ));
                                results.push(ApplyResult::success(entry_index));
                            }
                            Some(Operation::Delete(Delete { key })) => {
                                let entry_index = entry.index;
                                batch_operations.push((entry, "DELETE", key, None, 0));
                                results.push(ApplyResult::success(entry_index));
                            }
                            Some(Operation::CompareAndSwap(CompareAndSwap {
                                key,
                                expected_value: _,
                                new_value,
                            })) => {
                                batch_operations.push((entry, "CAS", key, Some(new_value), 0));
                                // Note: CAS result will be pushed in PHASE 3 after comparison
                            }
                            None => {
                                // NOTE(review): no ApplyResult is pushed for this
                                // entry — see method-level note on results length.
                                warn!("WriteCommand without operation at index {}", entry.index);
                                batch_operations.push((entry, "NOOP", Bytes::new(), None, 0));
                            }
                        }
                    }
                    Err(e) => {
                        // Undecodable command: abort the whole chunk before any
                        // WAL or in-memory mutation has happened.
                        error!(
                            "Failed to decode WriteCommand at index {}: {:?}",
                            entry.index, e
                        );
                        return Err(StorageError::SerializationError(e.to_string()).into());
                    }
                },
                Some(Payload::Config(_config_change)) => {
                    // NOTE(review): no ApplyResult is pushed for CONFIG entries
                    // — see method-level note on results length.
                    debug!("Ignoring config change at index {}", entry.index);
                    batch_operations.push((entry, "CONFIG", Bytes::new(), None, 0));
                }
                None => panic!("Entry payload variant should not be None!"),
            }

            info!("COMMITTED_LOG_METRIC: {}", entry_index);
        }

        // PHASE 2: Batch WAL writes (minimize I/O latency)
        let mut wal_entries = Vec::new();
        for (entry, operation, key, value, ttl_secs) in &batch_operations {
            // Prepare WAL data without immediate I/O - include TTL for crash recovery
            wal_entries.push((
                entry.clone(),
                operation.to_string(),
                key.clone(),
                value.clone(),
                *ttl_secs, // ttl_secs is now u64 (0 = no TTL) from protobuf
            ));
        }

        // Single batch WAL write (reduces I/O overhead)
        self.append_to_wal(wal_entries).await?;

        // PHASE 3: Fast in-memory updates with minimal lock time (ZERO-COPY)
        {
            let mut data = self.data.write();

            // Process all operations without any awaits inside the lock
            for (entry, operation, key, value, ttl_secs) in batch_operations {
                match operation {
                    "NOOP" => {
                        // NOOP result already pushed in PHASE 1
                    }
                    "INSERT" => {
                        if let Some(value) = value {
                            // ZERO-COPY: Use existing Bytes without cloning if possible
                            data.insert(key.clone(), (value, entry.term));

                            // Register lease if TTL specified
                            if ttl_secs > 0 {
                                // Validate lease is enabled before accepting TTL requests
                                if !self.lease_enabled {
                                    // NOTE(review): returning here aborts mid-chunk
                                    // after the WAL write and earlier in-memory
                                    // inserts — partial application; confirm this
                                    // is acceptable to the commit handler.
                                    return Err(StorageError::FeatureNotEnabled(
                                        "TTL feature is not enabled on this server. \
                                         Enable it in config: [raft.state_machine.lease] enabled = true".into()
                                    ).into());
                                }

                                // Safety: lease_enabled invariant ensures lease.is_some()
                                let lease = unsafe { self.lease.as_ref().unwrap_unchecked() };
                                lease.register(key, ttl_secs);
                            }
                        }
                        // Result already pushed in PHASE 1
                    }
                    "DELETE" => {
                        data.remove(&key);
                        if let Some(ref lease) = self.lease {
                            lease.unregister(&key);
                        }
                        // Result already pushed in PHASE 1
                    }
                    "CAS" => {
                        // Re-decode the original command to recover expected_value,
                        // which PHASE 1 intentionally did not carry forward.
                        if let Some(Payload::Command(bytes)) =
                            entry.payload.as_ref().unwrap().payload.as_ref()
                        {
                            if let Ok(write_cmd) = WriteCommand::decode(&bytes[..]) {
                                if let Some(Operation::CompareAndSwap(CompareAndSwap {
                                    expected_value,
                                    ..
                                })) = write_cmd.operation
                                {
                                    // Read-compare-write is safe due to sequential apply
                                    let current_value = data.get(&key);

                                    // Both-absent counts as a successful compare.
                                    let cas_success = match (current_value, &expected_value) {
                                        (Some((current, _)), Some(expected)) => {
                                            current.as_ref() == expected.as_ref()
                                        }
                                        (None, None) => true,
                                        _ => false,
                                    };

                                    // Store CAS result for client response
                                    results.push(if cas_success {
                                        ApplyResult::success(entry.index)
                                    } else {
                                        ApplyResult::failure(entry.index)
                                    });

                                    debug!(
                                        "CAS at index {}: key={:?}, success={}",
                                        entry.index,
                                        String::from_utf8_lossy(&key),
                                        cas_success
                                    );

                                    if cas_success {
                                        if let Some(new_value) = value {
                                            data.insert(key, (new_value, entry.term));
                                        }
                                    }
                                }
                            }
                        }
                    }
                    "CONFIG" => {
                        // No data modification needed
                    }
                    _ => warn!("Unknown operation: {}", operation),
                }
            }
        } // Lock released immediately - no awaits inside!

        // PHASE 4: Update last applied index and conditionally checkpoint.
        // WAL (written in PHASE 2) is the primary crash-safety path.
        // Checkpoint snapshots full data periodically to bound WAL replay time on recovery.
        if let Some(log_id) = highest_index_entry {
            debug!("State machine - updated last_applied: {:?}", log_id);
            self.update_last_applied(log_id);
        }

        self.wal_entries_since_checkpoint.fetch_add(chunk_len as u64, Ordering::Relaxed);

        if self.should_checkpoint() {
            self.checkpoint().await?;
        }

        Ok(results)
    }

    /// Number of live keys currently stored.
    fn len(&self) -> usize {
        self.data.read().len()
    }

    /// Updates the in-memory last-applied index/term (no persistence).
    fn update_last_applied(
        &self,
        last_applied: LogId,
    ) {
        self.last_applied_index.store(last_applied.index, Ordering::SeqCst);
        self.last_applied_term.store(last_applied.term, Ordering::SeqCst);
    }

    /// Returns the current last-applied log position.
    ///
    /// NOTE(review): index and term are read as two separate atomic loads,
    /// so a concurrent `update_last_applied` could in principle be observed
    /// half-applied; presumably benign given the serial apply path — confirm.
    fn last_applied(&self) -> LogId {
        LogId {
            index: self.last_applied_index.load(Ordering::SeqCst),
            term: self.last_applied_term.load(Ordering::SeqCst),
        }
    }

    /// Updates last-applied in memory, then persists metadata synchronously.
    fn persist_last_applied(
        &self,
        last_applied: LogId,
    ) -> Result<(), Error> {
        self.update_last_applied(last_applied);
        self.persist_metadata()
    }

    /// Replaces the cached snapshot metadata (in memory only).
    fn update_last_snapshot_metadata(
        &self,
        snapshot_metadata: &SnapshotMetadata,
    ) -> Result<(), Error> {
        *self.last_snapshot_metadata.write() = Some(snapshot_metadata.clone());
        Ok(())
    }

    /// Returns a clone of the cached snapshot metadata, if any.
    fn snapshot_metadata(&self) -> Option<SnapshotMetadata> {
        self.last_snapshot_metadata.read().clone()
    }

    /// NOTE(review): despite the name, this only updates the in-memory cache
    /// (it delegates to `update_last_snapshot_metadata`); nothing is written
    /// to disk here — confirm whether durable persistence was intended.
    fn persist_last_snapshot_metadata(
        &self,
        snapshot_metadata: &SnapshotMetadata,
    ) -> Result<(), Error> {
        self.update_last_snapshot_metadata(snapshot_metadata)
    }

    /// Installs a snapshot from `snapshot_dir/snapshot.bin`, atomically
    /// replacing the in-memory map, then persists state/metadata and clears
    /// the WAL.
    ///
    /// On-disk entry layout mirrors `generate_snapshot_data`: key-len/key,
    /// value-len/value, term (all lengths and the term are big-endian u64),
    /// followed by an optional lease-snapshot section (len + bytes).
    ///
    /// NOTE(review): a truncated buffer makes the parse loop `break` and the
    /// partial result is installed silently; the 32-byte checksum in the
    /// metadata is not verified here — confirm this leniency is intended.
    async fn apply_snapshot_from_file(
        &self,
        metadata: &SnapshotMetadata,
        snapshot_dir: std::path::PathBuf,
    ) -> Result<(), Error> {
        info!("Applying snapshot from file: {:?}", snapshot_dir);

        // Read from the snapshot.bin file inside the directory
        let snapshot_data_path = snapshot_dir.join("snapshot.bin");
        let mut file = File::open(snapshot_data_path).await?;
        let mut buffer = Vec::new();
        file.read_to_end(&mut buffer).await?;

        // Parse snapshot data
        let mut pos = 0;
        let mut new_data = HashMap::new();

        while pos < buffer.len() {
            // Read key length
            if pos + 8 > buffer.len() {
                break;
            }

            let key_len_bytes = &buffer[pos..pos + 8];
            let key_len = u64::from_be_bytes([
                key_len_bytes[0],
                key_len_bytes[1],
                key_len_bytes[2],
                key_len_bytes[3],
                key_len_bytes[4],
                key_len_bytes[5],
                key_len_bytes[6],
                key_len_bytes[7],
            ]) as usize;

            pos += 8;

            // Read key
            if pos + key_len > buffer.len() {
                break;
            }

            let key = Bytes::from(buffer[pos..pos + key_len].to_vec());
            pos += key_len;

            // Read value length
            if pos + 8 > buffer.len() {
                break;
            }

            let value_len_bytes = &buffer[pos..pos + 8];
            let value_len = u64::from_be_bytes([
                value_len_bytes[0],
                value_len_bytes[1],
                value_len_bytes[2],
                value_len_bytes[3],
                value_len_bytes[4],
                value_len_bytes[5],
                value_len_bytes[6],
                value_len_bytes[7],
            ]) as usize;

            pos += 8;

            // Read value
            if pos + value_len > buffer.len() {
                break;
            }

            let value = Bytes::from(buffer[pos..pos + value_len].to_vec());
            pos += value_len;

            // Read term
            if pos + 8 > buffer.len() {
                break;
            }

            let term_bytes = &buffer[pos..pos + 8];
            let term = u64::from_be_bytes([
                term_bytes[0],
                term_bytes[1],
                term_bytes[2],
                term_bytes[3],
                term_bytes[4],
                term_bytes[5],
                term_bytes[6],
                term_bytes[7],
            ]);

            pos += 8;

            // Add to new data
            new_data.insert(key, (value, term));
        }

        // Read and reload lease data if present
        // NOTE(review): this runs only if the parse loop above consumed the
        // whole key-value section cleanly, since `pos` must land exactly on
        // the lease length field.
        if pos + 8 <= buffer.len() {
            let ttl_len_bytes = &buffer[pos..pos + 8];
            let ttl_len = u64::from_be_bytes([
                ttl_len_bytes[0],
                ttl_len_bytes[1],
                ttl_len_bytes[2],
                ttl_len_bytes[3],
                ttl_len_bytes[4],
                ttl_len_bytes[5],
                ttl_len_bytes[6],
                ttl_len_bytes[7],
            ]) as usize;
            pos += 8;

            if pos + ttl_len <= buffer.len() {
                let ttl_data = &buffer[pos..pos + ttl_len];
                if let Some(ref lease) = self.lease {
                    lease.reload(ttl_data)?;
                }
            }
        }

        // Atomically replace the data
        {
            let mut data = self.data.write();
            *data = new_data;
        }

        // Update metadata
        *self.last_snapshot_metadata.write() = Some(metadata.clone());

        if let Some(last_included) = &metadata.last_included {
            self.update_last_applied(*last_included);
        }

        // Persist to disk
        self.persist_data_async().await?;
        self.persist_metadata_async().await?;
        self.clear_wal_async().await?;

        info!("Snapshot applied successfully");
        Ok(())
    }

    /// Serializes the full key-value state (plus lease snapshot) into
    /// `new_snapshot_dir/snapshot.bin` and caches the new snapshot metadata.
    ///
    /// Returns the checksum bytes. NOTE(review): the checksum is a 32-byte
    /// zero placeholder ("for demo" per the inline comment) — it is not a
    /// real digest; confirm before relying on it for integrity checks.
    async fn generate_snapshot_data(
        &self,
        new_snapshot_dir: std::path::PathBuf,
        last_included: LogId,
    ) -> Result<Bytes, Error> {
        info!("Generating snapshot data up to {:?}", last_included);

        // Create snapshot directory
        fs::create_dir_all(&new_snapshot_dir).await?;

        // Create snapshot file
        let snapshot_path = new_snapshot_dir.join("snapshot.bin");
        let mut file = File::create(&snapshot_path).await?;

        // Copy the map under the read lock so serialization happens lock-free.
        let data_copy: HashMap<Bytes, (Bytes, u64)> = {
            let data = self.data.read();
            data.iter().map(|(k, (v, t))| (k.clone(), (v.clone(), *t))).collect()
        };

        // Batch serialize into a single buffer — eliminates per-entry async yield overhead.
        let lease_snapshot = if let Some(ref lease) = self.lease {
            lease.to_snapshot()
        } else {
            Vec::new()
        };
        let estimated: usize =
            data_copy.iter().map(|(k, (v, _))| 8 + k.len() + 8 + v.len() + 8).sum::<usize>()
                + 8
                + lease_snapshot.len();
        let mut buf = Vec::with_capacity(estimated);

        // Per-entry layout: key-len / key / value-len / value / term,
        // all fixed-width fields big-endian u64.
        for (key, (value, term)) in &data_copy {
            buf.extend_from_slice(&(key.len() as u64).to_be_bytes());
            buf.extend_from_slice(key);
            buf.extend_from_slice(&(value.len() as u64).to_be_bytes());
            buf.extend_from_slice(value);
            buf.extend_from_slice(&term.to_be_bytes());
        }

        // Trailing lease section: length prefix then raw snapshot bytes.
        buf.extend_from_slice(&(lease_snapshot.len() as u64).to_be_bytes());
        buf.extend_from_slice(&lease_snapshot);

        file.write_all(&buf).await?;

        file.flush().await?;

        // Update metadata
        let metadata = SnapshotMetadata {
            last_included: Some(last_included),
            checksum: Bytes::from(vec![0; 32]), // Simple checksum for demo
        };

        self.update_last_snapshot_metadata(&metadata)?;

        info!("Snapshot generated at {:?}", snapshot_path);

        // Return dummy checksum
        Ok(Bytes::from_static(&[0u8; 32]))
    }

    /// Persists last-applied position, snapshot metadata, and flushes data
    /// to disk. Called from `Drop`, so everything here is synchronous.
    fn save_hard_state(&self) -> Result<(), Error> {
        let last_applied = self.last_applied();
        self.persist_last_applied(last_applied)?;

        if let Some(last_snapshot_metadata) = self.snapshot_metadata() {
            self.persist_last_snapshot_metadata(&last_snapshot_metadata)?;
        }

        self.flush()?;
        Ok(())
    }

    /// Synchronously persists data and metadata.
    ///
    /// The WAL is deliberately not cleared here; only `checkpoint()` (which
    /// persists first, then truncates) may do that safely.
    fn flush(&self) -> Result<(), Error> {
        self.persist_data()?;
        self.persist_metadata()?;
        // self.clear_wal()?;
        Ok(())
    }

    async fn flush_async(&self) -> Result<(), Error> {
        // Force checkpoint on shutdown to ensure WAL entries are not lost.
        self.checkpoint().await
    }

    /// Trait-level reset.
    ///
    /// Delegates to the inherent `reset` method: Rust method resolution
    /// prefers inherent methods over trait methods, so `self.reset()` here
    /// calls the inherent implementation rather than recursing.
    async fn reset(&self) -> Result<(), Error> {
        self.reset().await
    }

    /// Removes all expired keys (per the lease manager) from memory, then
    /// persists the data file. Returns the list of deleted keys.
    ///
    /// NOTE(review): these deletions bypass the WAL/Raft log and are applied
    /// locally only — presumably each node converges independently because
    /// expiration timestamps are absolute; confirm against the replication
    /// design.
    async fn lease_background_cleanup(&self) -> Result<Vec<Bytes>, Error> {
        // Fast path: no lease configured
        let Some(ref lease) = self.lease else {
            return Ok(vec![]);
        };

        // Get all expired keys
        let now = SystemTime::now();
        let expired_keys = lease.get_expired_keys(now);

        if expired_keys.is_empty() {
            return Ok(vec![]);
        }

        debug!(
            "Lease background cleanup: found {} expired keys",
            expired_keys.len()
        );

        // Delete expired keys from storage
        {
            let mut data = self.data.write();
            for key in &expired_keys {
                data.remove(key);
            }
        }

        // Persist to disk after batch deletion; propagate error so caller can retry
        self.persist_data_async().await?;

        info!(
            "Lease background cleanup: deleted {} expired keys",
            expired_keys.len()
        );

        Ok(expired_keys)
    }
}