Skip to main content

d_engine_server/storage/adaptors/file/
file_state_machine.rs

1//! File-based state machine implementation with crash recovery
2//!
3//! This module provides a durable state machine implementation using file-based storage
4//! with Write-Ahead Logging (WAL) for crash consistency.
5//!
6//! # Architecture
7//!
8//! ## Storage Components
9//!
//! - **`state.data`**: Serialized key-value store (persisted at each checkpoint, not on every apply_chunk)
11//! - **`wal.log`**: Write-Ahead Log for crash recovery (cleared after successful persistence)
12//! - **`ttl_state.bin`**: TTL manager state (persisted alongside state.data)
13//! - **`metadata.bin`**: Raft metadata (last_applied_index, last_applied_term)
14//!
15//! ## Write-Ahead Log (WAL) Design
16//!
17//! The WAL ensures crash consistency by recording operations before they are applied to
18//! in-memory state. Each WAL entry contains:
19//!
20//! ```text
21//! ┌─────────────────────────────────────────────────────────────────┐
22//! │ Entry Index (8 bytes) │ Entry Term (8 bytes) │ OpCode (1 byte) │
23//! ├─────────────────────────────────────────────────────────────────┤
24//! │ Key Length (8 bytes)  │ Key Data (N bytes)                      │
25//! ├─────────────────────────────────────────────────────────────────┤
26//! │ Value Length (8 bytes)│ Value Data (M bytes, if present)        │
27//! ├─────────────────────────────────────────────────────────────────┤
28//! │ Expire At (8 bytes, 0 = no TTL, >0 = UNIX timestamp in seconds) │
29//! └─────────────────────────────────────────────────────────────────┘
30//! ```
31//!
32//! ### TTL Semantics
33//!
34//! d-engine uses **absolute expiration time**:
35//!
36//! - When a key is created with TTL, the system calculates: `expire_at = now() + ttl_secs`
37//! - WAL stores the **absolute expiration timestamp** (UNIX seconds since epoch)
38//! - After crash recovery, expired keys are **not restored** (checked during replay)
39//! - TTL does **not reset** on restart (crash-safe)
40//!
41//! **Example:**
42//! ```text
43//! T0:  PUT key="foo", ttl=10s → expire_at = T0 + 10 = T10 (stored in WAL)
44//! T5:  CRASH
45//! T12: RESTART
46//!      → Replay WAL: expire_at = T10 < T12 (already expired)
47//!      → Key is NOT restored (correctly expired)
48//! ```
49//!
50//! **Why absolute time in WAL:**
51//! 1. Ensures expired keys stay expired after crash (durable expiration semantics)
52//! 2. Passive expiration (in get()) is crash-safe without WAL writes
53//! 3. No TTL reset on recovery (deterministic expiration)
54//!
55//! ### WAL Lifecycle
56//!
57//! WAL is the **primary** crash-safety path. Checkpoints are taken periodically
58//! (every 1000 entries or 10s) to bound replay time on recovery — not on every apply.
59//!
60//! ```text
61//! apply_chunk() → append_to_wal() → update memory → [if should_checkpoint()] → checkpoint()
62//!                                                                                  ├─ persist_data_async()
63//!                                                                                  ├─ persist_metadata_async()
64//!                                                                                  └─ clear_wal_async()
65//! ```
66//!
67//! On shutdown (`flush()`), a forced checkpoint ensures no data loss.
68//!
69//! ## Crash Recovery Flow
70//!
71//! On node startup (`new()`):
72//! 1. `load_metadata()` - Restore Raft state
73//! 2. `load_data()` - Load persisted key-value data
74//! 3. `load_ttl_data()` - Load persisted TTL state
//! 4. `replay_wal()` - **Critical**: Replay operations recorded in the WAL since the last checkpoint
76//!    - Restores keys AND their TTL metadata
77//!    - Ensures crash consistency (operations are idempotent)
78//!
79//! ## TTL Cleanup Strategy
80//!
81//! - **Background Cleanup**: Periodic async worker scans and deletes expired keys
82//! - **Zero Overhead**: When TTL feature is disabled, no lease components are initialized
83
84use std::collections::{HashMap, HashSet};
85use std::io::Write;
86use std::path::PathBuf;
87use std::sync::Arc;
88use std::sync::Mutex;
89use std::sync::atomic::AtomicBool;
90use std::sync::atomic::AtomicU64;
91use std::sync::atomic::Ordering;
92use std::time::SystemTime;
93
94use async_trait::async_trait;
95use bytes::Bytes;
96use d_engine_core::ApplyEntry;
97use d_engine_core::ApplyResult;
98use d_engine_core::Command;
99use d_engine_core::Error;
100use d_engine_core::Lease;
101use d_engine_core::ScanResult;
102use d_engine_core::StateMachine;
103use d_engine_core::StorageError;
104use d_engine_proto::common::LogId;
105use d_engine_proto::server::storage::SnapshotMetadata;
106use parking_lot::RwLock;
107use tokio::fs;
108use tokio::fs::File;
109use tokio::fs::OpenOptions;
110use tokio::io::AsyncReadExt;
111use tokio::io::AsyncWriteExt;
112use tokio::time::Instant;
113use tracing::debug;
114use tracing::error;
115use tracing::info;
116use tracing::warn;
117
118use crate::storage::DefaultLease;
119
/// In-memory key-value map behind a read-write lock.
///
/// Each key maps to a `(value, term)` pair, where `term` is the term of the
/// log entry that last wrote the key.
type FileStateMachineDataType = RwLock<HashMap<Bytes, (Bytes, u64)>>;
121
/// One-byte operation tag stored in every WAL record.
///
/// The discriminants are part of the on-disk format and must not be renumbered.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum WalOpCode {
    Noop = 0,
    Insert = 1,
    Delete = 2,
    Config = 3,
    CompareAndSwap = 4,
    CasFailed = 5,
}

impl WalOpCode {
    /// Maps an upper-case operation name (e.g. `"INSERT"`, `"CAS"`) to its opcode.
    ///
    /// Returns `None` for any name that is not an exact match.
    #[allow(dead_code)]
    fn from_str(s: &str) -> Option<Self> {
        const BY_NAME: [(&str, WalOpCode); 6] = [
            ("NOOP", WalOpCode::Noop),
            ("INSERT", WalOpCode::Insert),
            ("DELETE", WalOpCode::Delete),
            ("CONFIG", WalOpCode::Config),
            ("CAS", WalOpCode::CompareAndSwap),
            ("CAS_FAILED", WalOpCode::CasFailed),
        ];

        BY_NAME.iter().find(|(name, _)| *name == s).map(|&(_, code)| code)
    }

    /// Decodes the on-disk tag byte; `None` for bytes outside `0..=5`.
    fn from_u8(byte: u8) -> Option<Self> {
        // Indexed by discriminant, so the position of each variant must equal its tag.
        const BY_TAG: [WalOpCode; 6] = [
            WalOpCode::Noop,
            WalOpCode::Insert,
            WalOpCode::Delete,
            WalOpCode::Config,
            WalOpCode::CompareAndSwap,
            WalOpCode::CasFailed,
        ];

        BY_TAG.get(usize::from(byte)).copied()
    }
}
160
161/// Encodes a single WAL entry into `buf`.
162///
163/// For `CompareAndSwap`, `cas_success` determines whether to write
164/// `Insert` (success, replay applies unconditionally) or
165/// `CasFailed` (failure, replay skips).
166fn encode_wal_entry(
167    buf: &mut Vec<u8>,
168    entry: &ApplyEntry,
169    cas_success: bool,
170) {
171    buf.extend_from_slice(&entry.index.to_be_bytes());
172    buf.extend_from_slice(&entry.term.to_be_bytes());
173    match &entry.command {
174        Command::Noop => {
175            buf.push(WalOpCode::Noop as u8);
176            buf.extend_from_slice(&0u64.to_be_bytes());
177            buf.extend_from_slice(&0u64.to_be_bytes());
178            buf.extend_from_slice(&0u64.to_be_bytes());
179        }
180        Command::Insert {
181            key,
182            value,
183            ttl_secs,
184        } => {
185            buf.push(WalOpCode::Insert as u8);
186            buf.extend_from_slice(&(key.len() as u64).to_be_bytes());
187            buf.extend_from_slice(key);
188            buf.extend_from_slice(&(value.len() as u64).to_be_bytes());
189            buf.extend_from_slice(value);
190            // expire_at is computed at WAL-encode time, not at apply time. Any delay
191            // between Raft commit and WAL write (e.g. slow I/O) causes a small under-count
192            // (key expires slightly earlier than intended). This is a known, accepted
193            // trade-off: TTL precision is best-effort, not a hard guarantee.
194            let expire_at_secs = if let Some(ttl) = ttl_secs {
195                let expire_at = std::time::SystemTime::now() + std::time::Duration::from_secs(*ttl);
196                expire_at
197                    .duration_since(std::time::UNIX_EPOCH)
198                    .map(|d| d.as_secs())
199                    .unwrap_or(0)
200            } else {
201                0
202            };
203            buf.extend_from_slice(&expire_at_secs.to_be_bytes());
204        }
205        Command::Delete { key } => {
206            buf.push(WalOpCode::Delete as u8);
207            buf.extend_from_slice(&(key.len() as u64).to_be_bytes());
208            buf.extend_from_slice(key);
209            buf.extend_from_slice(&0u64.to_be_bytes()); // val_len = 0
210            buf.extend_from_slice(&0u64.to_be_bytes()); // expire_at = 0
211        }
212        Command::CompareAndSwap { key, value, .. } => {
213            if cas_success {
214                buf.push(WalOpCode::Insert as u8);
215                buf.extend_from_slice(&(key.len() as u64).to_be_bytes());
216                buf.extend_from_slice(key);
217                buf.extend_from_slice(&(value.len() as u64).to_be_bytes());
218                buf.extend_from_slice(value);
219                buf.extend_from_slice(&0u64.to_be_bytes()); // no TTL
220            } else {
221                buf.push(WalOpCode::CasFailed as u8);
222                buf.extend_from_slice(&0u64.to_be_bytes()); // key_len = 0
223                buf.extend_from_slice(&0u64.to_be_bytes()); // val_len = 0
224                buf.extend_from_slice(&0u64.to_be_bytes()); // expire_at = 0
225            }
226        }
227    }
228}
229
/// File-based state machine implementation with persistence
///
/// Design principles:
/// - All data is persisted to disk for durability
/// - In-memory cache for fast read operations
/// - Write-ahead logging for crash consistency
/// - Efficient snapshot handling with file-based storage
/// - Thread-safe with minimal lock contention
/// - TTL support for automatic key expiration
#[derive(Debug)]
pub struct FileStateMachine {
    // In-memory key-value map; each key maps to (value, term).
    // Its on-disk checkpoint is `state.data` inside `data_dir`.
    data: FileStateMachineDataType,

    // Lease management for automatic key expiration
    // DefaultLease is thread-safe internally (uses DashMap + Mutex)
    // Injected by NodeBuilder after construction
    lease: Option<Arc<DefaultLease>>,

    /// Whether lease manager is enabled (immutable after init)
    /// Set to true when lease is injected, never changes after that
    ///
    /// Invariant: lease_enabled == true ⟹ lease.is_some()
    /// Performance: Allows safe unwrap_unchecked in hot paths
    lease_enabled: bool,

    // Raft state with disk persistence (`metadata.bin` holds index and term)
    last_applied_index: AtomicU64,
    last_applied_term: AtomicU64,
    last_snapshot_metadata: RwLock<Option<SnapshotMetadata>>,

    // Operational state
    running: AtomicBool,

    // Checkpoint state: WAL is the primary persistence path; checkpoint snapshots
    // data periodically to bound crash-recovery replay time.
    wal_entries_since_checkpoint: AtomicU64,
    last_checkpoint: Mutex<Instant>,

    // Directory holding the persistence files (`state.data`, `metadata.bin`,
    // `wal.log`, `ttl_state.bin`). Files are opened on demand; no handles are kept.
    data_dir: PathBuf,
}
275
276impl FileStateMachine {
277    /// Creates a new file-based state machine with persistence
278    ///
279    /// Lease will be injected by NodeBuilder after construction.
280    ///
281    /// # Arguments
282    /// * `data_dir` - Directory where data files will be stored
283    ///
284    /// # Returns
285    /// Result containing the initialized FileStateMachine
286    pub async fn new(data_dir: PathBuf) -> Result<Self, Error> {
287        // Ensure data directory exists
288        fs::create_dir_all(&data_dir).await?;
289
290        let machine = Self {
291            data: RwLock::new(HashMap::new()),
292            lease: None,          // Will be injected by NodeBuilder
293            lease_enabled: false, // Default: no lease until set
294            last_applied_index: AtomicU64::new(0),
295            last_applied_term: AtomicU64::new(0),
296            last_snapshot_metadata: RwLock::new(None),
297            running: AtomicBool::new(true),
298            wal_entries_since_checkpoint: AtomicU64::new(0),
299            last_checkpoint: Mutex::new(Instant::now()),
300            data_dir: data_dir.clone(),
301        };
302
303        // Load existing data from disk
304        machine.load_from_disk().await?;
305
306        Ok(machine)
307    }
308
309    /// Sets the lease manager for this state machine.
310    ///
311    /// This is an internal method called by NodeBuilder during initialization.
312    /// The lease will also be restored from snapshot during `apply_snapshot_from_file()`.
313    /// Also available for testing and benchmarks.
314    pub fn set_lease(
315        &mut self,
316        lease: Arc<DefaultLease>,
317    ) {
318        // Mark lease as enabled (immutable after this point)
319        self.lease_enabled = true;
320        self.lease = Some(lease);
321    }
322
    /// Restores the state machine from the files in `data_dir`.
    ///
    /// Runs the loaders in a fixed order: metadata, key-value data, TTL data,
    /// and finally WAL replay. Replay comes last so that it is applied on top
    /// of the data loaded by the earlier steps. The first failing step aborts
    /// the load and its error is returned.
    async fn load_from_disk(&self) -> Result<(), Error> {
        // Load last applied index and term from metadata file
        self.load_metadata().await?;

        // Load key-value data from data file
        self.load_data().await?;

        // Load TTL data from disk
        self.load_ttl_data().await?;

        // Replay write-ahead log for crash recovery
        self.replay_wal().await?;

        info!("Loaded state machine data from disk");
        Ok(())
    }
343
    /// Placeholder for TTL loading during construction; currently a no-op.
    ///
    /// No lease is set while `new()` runs, so nothing is loaded here and the
    /// function always returns `Ok(())`. The TTL state file is read by
    /// `load_lease_data()` instead.
    async fn load_ttl_data(&self) -> Result<(), Error> {
        // Lease will be injected by NodeBuilder later
        // The lease data will be loaded after injection
        // For now, just skip this step during construction
        Ok(())
    }
351
352    /// Loads TTL data into the configured lease
353    ///
354    /// Called after NodeBuilder injects the lease.
355    /// Also available for testing and benchmarks.
356    pub async fn load_lease_data(&self) -> Result<(), Error> {
357        let Some(ref lease) = self.lease else {
358            return Ok(()); // No lease configured
359        };
360
361        let ttl_path = self.data_dir.join("ttl_state.bin");
362        if !ttl_path.exists() {
363            debug!("No TTL state file found");
364            return Ok(());
365        }
366
367        let ttl_data = tokio::fs::read(&ttl_path).await?;
368        lease.reload(&ttl_data)?;
369
370        info!("Loaded TTL state from disk: {} active TTLs", lease.len());
371        Ok(())
372    }
373
374    /// Loads metadata from disk
375    async fn load_metadata(&self) -> Result<(), Error> {
376        let metadata_path = self.data_dir.join("metadata.bin");
377        if !metadata_path.exists() {
378            return Ok(());
379        }
380
381        let mut file = File::open(metadata_path).await?;
382        let mut buffer = [0u8; 16];
383
384        if file.read_exact(&mut buffer).await.is_ok() {
385            let index = u64::from_be_bytes([
386                buffer[0], buffer[1], buffer[2], buffer[3], buffer[4], buffer[5], buffer[6],
387                buffer[7],
388            ]);
389
390            let term = u64::from_be_bytes([
391                buffer[8], buffer[9], buffer[10], buffer[11], buffer[12], buffer[13], buffer[14],
392                buffer[15],
393            ]);
394
395            self.last_applied_index.store(index, Ordering::SeqCst);
396            self.last_applied_term.store(term, Ordering::SeqCst);
397        }
398
399        Ok(())
400    }
401
402    /// Loads key-value data from disk
403    async fn load_data(&self) -> Result<(), Error> {
404        let data_path = self.data_dir.join("state.data");
405        if !data_path.exists() {
406            return Ok(());
407        }
408
409        let mut file = File::open(data_path).await?;
410        let mut buffer = Vec::new();
411        file.read_to_end(&mut buffer).await?;
412
413        let mut pos = 0;
414        let mut data = self.data.write();
415
416        while pos < buffer.len() {
417            // Read key length
418            if pos + 8 > buffer.len() {
419                break;
420            }
421
422            let key_len_bytes = &buffer[pos..pos + 8];
423            let key_len = u64::from_be_bytes([
424                key_len_bytes[0],
425                key_len_bytes[1],
426                key_len_bytes[2],
427                key_len_bytes[3],
428                key_len_bytes[4],
429                key_len_bytes[5],
430                key_len_bytes[6],
431                key_len_bytes[7],
432            ]) as usize;
433
434            pos += 8;
435
436            // Read key
437            if pos + key_len > buffer.len() {
438                break;
439            }
440
441            let key = Bytes::from(buffer[pos..pos + key_len].to_vec());
442            pos += key_len;
443
444            // Read value length
445            if pos + 8 > buffer.len() {
446                break;
447            }
448
449            let value_len_bytes = &buffer[pos..pos + 8];
450            let value_len = u64::from_be_bytes([
451                value_len_bytes[0],
452                value_len_bytes[1],
453                value_len_bytes[2],
454                value_len_bytes[3],
455                value_len_bytes[4],
456                value_len_bytes[5],
457                value_len_bytes[6],
458                value_len_bytes[7],
459            ]) as usize;
460
461            pos += 8;
462
463            // Read value
464            if pos + value_len > buffer.len() {
465                break;
466            }
467
468            let value = Bytes::from(buffer[pos..pos + value_len].to_vec());
469            pos += value_len;
470
471            // Read term
472            if pos + 8 > buffer.len() {
473                break;
474            }
475
476            let term_bytes = &buffer[pos..pos + 8];
477            let term = u64::from_be_bytes([
478                term_bytes[0],
479                term_bytes[1],
480                term_bytes[2],
481                term_bytes[3],
482                term_bytes[4],
483                term_bytes[5],
484                term_bytes[6],
485                term_bytes[7],
486            ]);
487
488            pos += 8;
489
490            // Store in memory
491            data.insert(key, (value, term));
492        }
493
494        Ok(())
495    }
496
497    /// Replays write-ahead log for crash recovery
498    async fn replay_wal(&self) -> Result<(), Error> {
499        let wal_path = self.data_dir.join("wal.log");
500        if !wal_path.exists() {
501            debug!("No WAL file found, skipping replay");
502            return Ok(());
503        }
504
505        let mut file = File::open(wal_path).await?;
506        let mut buffer = Vec::new();
507        file.read_to_end(&mut buffer).await?;
508
509        if buffer.is_empty() {
510            debug!("WAL file is empty, skipping replay");
511            return Ok(());
512        }
513
514        let mut pos = 0;
515        let mut operations = Vec::new();
516        let mut replayed_count = 0;
517
518        while pos + 17 < buffer.len() {
519            // Read entry index (8 bytes)
520            let _index = u64::from_be_bytes(buffer[pos..pos + 8].try_into().unwrap());
521            pos += 8;
522
523            // Read entry term (8 bytes)
524            let term = u64::from_be_bytes(buffer[pos..pos + 8].try_into().unwrap());
525            pos += 8;
526
527            // Read operation code (1 byte)
528            let op_code =
529                WalOpCode::from_u8(buffer[pos]).ok_or_else(|| StorageError::DataCorruption {
530                    location: format!("WAL opcode {} at byte offset {}", buffer[pos], pos),
531                })?;
532            pos += 1;
533
534            // Check if we have enough bytes for key length
535            if pos + 8 > buffer.len() {
536                warn!("Incomplete key length at position {}, stopping replay", pos);
537                break;
538            }
539
540            // Read key length (8 bytes)
541            let key_len = u64::from_be_bytes(buffer[pos..pos + 8].try_into().unwrap()) as usize;
542            pos += 8;
543
544            // Check if we have enough data for the key
545            if pos + key_len > buffer.len() {
546                warn!(
547                    "Incomplete key data at position {} (need {} bytes, have {})",
548                    pos,
549                    key_len,
550                    buffer.len() - pos
551                );
552                break;
553            }
554
555            // Read key
556            let key = Bytes::from(buffer[pos..pos + key_len].to_vec());
557            pos += key_len;
558
559            // Check if we have enough bytes for value length
560            if pos + 8 > buffer.len() {
561                warn!(
562                    "Incomplete value length at position {}, stopping replay",
563                    pos
564                );
565                break;
566            }
567
568            // Read value length (8 bytes)
569            let value_len = u64::from_be_bytes(buffer[pos..pos + 8].try_into().unwrap()) as usize;
570            pos += 8;
571
572            // Read value if present
573            let value = if value_len > 0 {
574                if pos + value_len > buffer.len() {
575                    warn!("Incomplete value data at position {}, stopping replay", pos);
576                    break;
577                }
578                let value_data = Bytes::from(buffer[pos..pos + value_len].to_vec());
579                pos += value_len;
580                Some(value_data)
581            } else {
582                None
583            };
584
585            // Read absolute expiration time (8 bytes) - 0 means no TTL
586            //
587            // WAL Format Migration Path:
588            // - Old format (pre-v0.2.0): ttl_secs (u32, 4 bytes, relative time)
589            // - New format (v0.2.0+): expire_at_secs (u64, 8 bytes, absolute time)
590            //
591            // Backward Compatibility Strategy:
592            // Since this is a breaking change and d-engine has not been deployed to production,
593            // we do NOT support reading old WAL format. All WAL entries must use the new format.
594            // If upgrading from pre-v0.2.0, users must:
595            // 1. Gracefully stop the old version (persists state.data + ttl_state.bin)
596            // 2. Upgrade to v0.2.0+
597            // 3. Start the new version (loads from persisted state, not WAL)
598            // Truncated tail: stop here rather than continuing with garbage bytes.
599            // A partial write at the WAL tail is expected on a crash; entries after
600            // the truncation point are discarded and will be re-applied via Raft.
601            if pos + 8 > buffer.len() {
602                warn!(
603                    "Incomplete WAL entry at byte offset {} (expected expire_at field), truncating replay here",
604                    pos
605                );
606                break;
607            }
608            let secs = u64::from_be_bytes(buffer[pos..pos + 8].try_into().unwrap());
609            pos += 8;
610            let expire_at_secs = if secs > 0 { Some(secs) } else { None };
611
612            operations.push((op_code, key, value, term, expire_at_secs));
613            replayed_count += 1;
614        }
615
616        info!(
617            "Parsed {} WAL operations, applying to memory",
618            operations.len()
619        );
620
621        // Apply all collected operations with a single lock acquisition
622        let mut applied_count = 0;
623        let mut skipped_expired = 0;
624        let now = std::time::SystemTime::now();
625        {
626            let mut data = self.data.write();
627
628            for (op_code, key, value, term, expire_at_secs) in operations {
629                match op_code {
630                    WalOpCode::Insert => {
631                        if let Some(value_data) = value {
632                            // Check if key is already expired (crash-safe TTL semantics)
633                            let is_expired = if let Some(secs) = expire_at_secs {
634                                let expire_at =
635                                    std::time::UNIX_EPOCH + std::time::Duration::from_secs(secs);
636                                now >= expire_at
637                            } else {
638                                false
639                            };
640
641                            if is_expired {
642                                // Skip restoring expired keys (durable expiration semantics)
643                                debug!("Skipped expired key during WAL replay: key={:?}", key);
644                                skipped_expired += 1;
645                                continue;
646                            }
647
648                            data.insert(key.clone(), (value_data, term));
649
650                            // Restore TTL from WAL (if lease configured and has expiration)
651                            if let Some(secs) = expire_at_secs {
652                                if let Some(ref lease) = self.lease {
653                                    let expire_at = std::time::UNIX_EPOCH
654                                        + std::time::Duration::from_secs(secs);
655                                    let remaining = expire_at
656                                        .duration_since(now)
657                                        .map(|d| d.as_secs())
658                                        .unwrap_or(0);
659
660                                    if remaining > 0 {
661                                        lease.register(key.clone(), remaining);
662                                        debug!(
663                                            "Replayed INSERT with TTL: key={:?}, remaining={}s",
664                                            key, remaining
665                                        );
666                                    }
667                                }
668                            } else {
669                                debug!("Replayed INSERT: key={:?}", key);
670                            }
671
672                            applied_count += 1;
673                        } else {
674                            warn!("INSERT operation without value");
675                        }
676                    }
677                    WalOpCode::Delete => {
678                        data.remove(&key);
679                        if let Some(ref lease) = self.lease {
680                            lease.unregister(&key);
681                        }
682                        applied_count += 1;
683                        debug!("Replayed DELETE: key={:?}", key);
684                    }
685                    WalOpCode::CompareAndSwap => {
686                        // Legacy WAL format from before the CAS outcome fix.
687                        // Current code writes CAS results as Insert (success) or Noop (failure),
688                        // so this branch is only reached when replaying WAL files created before
689                        // the fix. Apply new_value unconditionally as best-effort.
690                        if let Some(new_value) = value {
691                            data.insert(key.clone(), (new_value, term));
692                            applied_count += 1;
693                            debug!("Replayed legacy CAS: key={:?}", key);
694                        } else {
695                            warn!("Legacy CAS WAL entry missing new_value");
696                        }
697                    }
698                    WalOpCode::Noop | WalOpCode::Config | WalOpCode::CasFailed => {
699                        // No data modification needed
700                        applied_count += 1;
701                        debug!("Replayed {:?} operation", op_code);
702                    }
703                }
704            }
705        }
706
707        info!(
708            "WAL replay complete: {} operations replayed, {} applied, {} expired keys skipped",
709            replayed_count, applied_count, skipped_expired
710        );
711
712        // Unconditionally clear WAL after replay. load_data() already restored the last
713        // checkpoint; WAL is only the post-checkpoint delta. Even if 0 entries were applied
714        // (e.g. truncated tail only), the WAL is stale. Keeping it would cause infinite
715        // replay-of-the-same-truncated-entry on every subsequent startup.
716        self.clear_wal_async().await?;
717        debug!(
718            "Cleared WAL after replay ({} operations applied)",
719            applied_count
720        );
721
722        Ok(())
723    }
724
725    /// Persists key-value data to disk
726    fn persist_data(&self) -> Result<(), Error> {
727        // Collect data first to minimize lock time
728        let data_copy: HashMap<Bytes, (Bytes, u64)> = {
729            let data = self.data.read();
730            data.iter().map(|(k, (v, t))| (k.clone(), (v.clone(), *t))).collect()
731        };
732
733        // Batch serialize into a single buffer — mirrors persist_data_async().
734        let data_path = self.data_dir.join("state.data");
735
736        let estimated: usize =
737            data_copy.iter().map(|(k, (v, _))| 8 + k.len() + 8 + v.len() + 8).sum();
738        let mut buf = Vec::with_capacity(estimated);
739
740        for (key, (value, term)) in &data_copy {
741            buf.extend_from_slice(&(key.len() as u64).to_be_bytes());
742            buf.extend_from_slice(key);
743            buf.extend_from_slice(&(value.len() as u64).to_be_bytes());
744            buf.extend_from_slice(value);
745            buf.extend_from_slice(&term.to_be_bytes());
746        }
747
748        std::fs::write(data_path, buf)?;
749        Ok(())
750    }
751
752    /// Persists key-value data to disk
753    async fn persist_data_async(&self) -> Result<(), Error> {
754        // Collect data under minimal lock scope
755        let data_copy: HashMap<Bytes, (Bytes, u64)> = {
756            let data = self.data.read();
757            data.iter().map(|(k, (v, t))| (k.clone(), (v.clone(), *t))).collect()
758        };
759
760        let data_path = self.data_dir.join("state.data");
761        let mut file = OpenOptions::new()
762            .write(true)
763            .create(true)
764            .truncate(true)
765            .open(data_path)
766            .await?;
767
768        // Batch serialize into a single buffer — eliminates per-entry async yield overhead.
769        // Mirrors append_to_wal's approach for consistent I/O pattern.
770        let estimated: usize =
771            data_copy.iter().map(|(k, (v, _))| 8 + k.len() + 8 + v.len() + 8).sum();
772        let mut buf = Vec::with_capacity(estimated);
773
774        for (key, (value, term)) in &data_copy {
775            buf.extend_from_slice(&(key.len() as u64).to_be_bytes());
776            buf.extend_from_slice(key);
777            buf.extend_from_slice(&(value.len() as u64).to_be_bytes());
778            buf.extend_from_slice(value);
779            buf.extend_from_slice(&term.to_be_bytes());
780        }
781
782        file.write_all(&buf).await?;
783        file.flush().await?;
784
785        Ok(())
786    }
787
788    /// Persists metadata to disk
789    fn persist_metadata(&self) -> Result<(), Error> {
790        let metadata_path = self.data_dir.join("metadata.bin");
791        let mut file = std::fs::OpenOptions::new()
792            .write(true)
793            .create(true)
794            .truncate(true)
795            .open(metadata_path)?;
796
797        let index = self.last_applied_index.load(Ordering::SeqCst);
798        let term = self.last_applied_term.load(Ordering::SeqCst);
799
800        file.write_all(&index.to_be_bytes())?;
801        file.write_all(&term.to_be_bytes())?;
802
803        file.flush()?;
804        Ok(())
805    }
806
807    async fn persist_metadata_async(&self) -> Result<(), Error> {
808        let metadata_path = self.data_dir.join("metadata.bin");
809        let mut file = OpenOptions::new()
810            .write(true)
811            .create(true)
812            .truncate(true)
813            .open(metadata_path)
814            .await?;
815
816        let index = self.last_applied_index.load(Ordering::SeqCst);
817        let term = self.last_applied_term.load(Ordering::SeqCst);
818
819        file.write_all(&index.to_be_bytes()).await?;
820        file.write_all(&term.to_be_bytes()).await?;
821
822        file.flush().await?;
823        Ok(())
824    }
825
826    /// Clears the write-ahead log (called after successful persistence)
827    #[allow(unused)]
828    fn clear_wal(&self) -> Result<(), Error> {
829        let wal_path = self.data_dir.join("wal.log");
830        let mut file = std::fs::OpenOptions::new()
831            .write(true)
832            .create(true)
833            .truncate(true)
834            .open(wal_path)?;
835
836        file.set_len(0)?;
837        file.flush()?;
838        Ok(())
839    }
840
841    /// Clears the write-ahead log (called after successful persistence)
842    async fn clear_wal_async(&self) -> Result<(), Error> {
843        let wal_path = self.data_dir.join("wal.log");
844        let mut file = OpenOptions::new()
845            .write(true)
846            .create(true)
847            .truncate(true)
848            .open(wal_path)
849            .await?;
850
851        file.set_len(0).await?;
852        file.flush().await?;
853        Ok(())
854    }
855
856    /// Returns true if a checkpoint should be taken now.
857    ///
858    /// Triggers on any of:
859    /// - WAL has accumulated ≥ 1000 entries since last checkpoint
860    /// - Last checkpoint was > 10s ago
861    fn should_checkpoint(&self) -> bool {
862        const WAL_ENTRY_THRESHOLD: u64 = 1000;
863        const TIME_THRESHOLD_SECS: u64 = 10;
864
865        if self.wal_entries_since_checkpoint.load(Ordering::Relaxed) >= WAL_ENTRY_THRESHOLD {
866            return true;
867        }
868        if let Ok(last) = self.last_checkpoint.lock() {
869            return last.elapsed().as_secs() >= TIME_THRESHOLD_SECS;
870        }
871        false
872    }
873
    /// Checkpoint: snapshot full data + metadata to disk, then clear WAL.
    ///
    /// WAL is the primary crash-safety path; checkpoint bounds recovery replay time.
    ///
    /// The three steps run strictly in order and each failure aborts the rest, so
    /// the WAL is only cleared after both the data and the metadata writes returned
    /// successfully. On success the WAL entry counter is reset to zero and the
    /// checkpoint timestamp is set to now.
    ///
    /// # Errors
    ///
    /// Returns the first error reported by the data, metadata, or WAL step.
    pub(crate) async fn checkpoint(&self) -> Result<(), Error> {
        self.persist_data_async().await?;
        self.persist_metadata_async().await?;
        self.clear_wal_async().await?;

        // Restart both triggers consulted by `should_checkpoint`.
        self.wal_entries_since_checkpoint.store(0, Ordering::Relaxed);
        // A poisoned mutex is ignored; the previous timestamp is kept in that case.
        if let Ok(mut last) = self.last_checkpoint.lock() {
            *last = Instant::now();
        }
        debug!("Checkpoint complete");
        Ok(())
    }
889
890    /// Resets the state machine to its initial empty state
891    ///
892    /// This method:
893    /// 1. Clears all in-memory data
894    /// 2. Resets Raft state to initial values
895    /// 3. Clears all persisted files
896    /// 4. Maintains operational state (running status, node ID)
897    pub async fn reset(&self) -> Result<(), Error> {
898        info!("Resetting state machine");
899
900        // Clear in-memory data
901        {
902            let mut data = self.data.write();
903            data.clear();
904        }
905
906        // Reset Raft state
907        self.last_applied_index.store(0, Ordering::SeqCst);
908        self.last_applied_term.store(0, Ordering::SeqCst);
909
910        {
911            let mut snapshot_metadata = self.last_snapshot_metadata.write();
912            *snapshot_metadata = None;
913        }
914
915        // Clear all persisted files
916        self.clear_data_file().await?;
917        self.clear_metadata_file().await?;
918        self.clear_wal_async().await?;
919
920        info!("State machine reset completed");
921        Ok(())
922    }
923
924    /// Clears the data file
925    async fn clear_data_file(&self) -> Result<(), Error> {
926        let data_path = self.data_dir.join("state.data");
927        let mut file = OpenOptions::new()
928            .write(true)
929            .create(true)
930            .truncate(true)
931            .open(data_path)
932            .await?;
933
934        file.set_len(0).await?;
935        file.flush().await?;
936        Ok(())
937    }
938
939    /// Clears the metadata file
940    async fn clear_metadata_file(&self) -> Result<(), Error> {
941        let metadata_path = self.data_dir.join("metadata.bin");
942        let mut file = OpenOptions::new()
943            .write(true)
944            .create(true)
945            .truncate(true)
946            .open(metadata_path)
947            .await?;
948
949        // Write default values (0 for both index and term)
950        file.write_all(&0u64.to_be_bytes()).await?;
951        file.write_all(&0u64.to_be_bytes()).await?;
952
953        file.flush().await?;
954        Ok(())
955    }
956
957    /// Batch WAL writes with proper durability guarantees.
958    ///
959    /// `cas_outcomes[i]` is consulted only when `entries[i].command` is
960    /// `CompareAndSwap`. Successful CAS is recorded as `Insert`; failed CAS as
961    /// `Noop`, so replay never applies a CAS that failed at runtime.
962    ///
963    /// Format per entry:
964    ///
965    /// - 8 bytes: entry index (big-endian u64)
966    /// - 8 bytes: entry term (big-endian u64)
967    /// - 1 byte: operation code (0=NOOP, 1=INSERT, 2=DELETE, 3=CONFIG)
968    /// - 8 bytes: key length (big-endian u64)
969    /// - N bytes: key data
970    /// - 8 bytes: value length (big-endian u64, 0 if no value)
971    /// - M bytes: value data (only if length > 0)
972    #[cfg(test)]
973    pub(crate) async fn append_to_wal(
974        &self,
975        entries: &[ApplyEntry],
976        cas_outcomes: &[bool],
977    ) -> Result<(), Error> {
978        assert_eq!(
979            entries.len(),
980            cas_outcomes.len(),
981            "cas_outcomes must have the same length as entries"
982        );
983        if entries.is_empty() {
984            return Ok(());
985        }
986
987        let wal_path = self.data_dir.join("wal.log");
988        let mut file =
989            OpenOptions::new().write(true).create(true).append(true).open(&wal_path).await?;
990
991        let mut batch_buffer = Vec::with_capacity(entries.len() * 64);
992        for (i, entry) in entries.iter().enumerate() {
993            encode_wal_entry(&mut batch_buffer, entry, cas_outcomes[i]);
994        }
995
996        file.write_all(&batch_buffer).await?;
997        file.flush().await?;
998
999        Ok(())
1000    }
1001}
1002
1003impl Drop for FileStateMachine {
1004    fn drop(&mut self) {
1005        let timer = Instant::now();
1006
1007        // Save state into local database including flush operation
1008        match self.save_hard_state() {
1009            Ok(_) => debug!("StateMachine saved in {:?}", timer.elapsed()),
1010            Err(e) => error!("Failed to save StateMachine: {}", e),
1011        }
1012    }
1013}
1014
1015#[async_trait]
1016impl StateMachine for FileStateMachine {
1017    async fn start(&self) -> Result<(), Error> {
1018        self.running.store(true, Ordering::SeqCst);
1019
1020        // Load persisted lease data if configured
1021        if self.lease.is_some() {
1022            self.load_lease_data().await?;
1023            debug!("Lease data loaded during state machine initialization");
1024        }
1025
1026        info!("File state machine started");
1027        Ok(())
1028    }
1029
1030    fn stop(&self) -> Result<(), Error> {
1031        // Ensure all data is flushed to disk before stopping
1032        self.running.store(false, Ordering::SeqCst);
1033
1034        // Graceful shutdown: persist TTL state to disk
1035        // This ensures lease data survives across restarts
1036        if let Some(ref lease) = self.lease {
1037            let ttl_snapshot = lease.to_snapshot();
1038            let ttl_path = self.data_dir.join("ttl_state.bin");
1039            // Use blocking write since stop() is sync
1040            std::fs::write(&ttl_path, ttl_snapshot)
1041                .map_err(d_engine_core::StorageError::IoError)?;
1042            debug!("Persisted TTL state on shutdown");
1043        }
1044
1045        info!("File state machine stopped");
1046        Ok(())
1047    }
1048
    /// Reports whether the state machine is currently marked as running.
    ///
    /// Reads the `running` flag, which `start` sets to `true` and `stop` sets to `false`.
    fn is_running(&self) -> bool {
        self.running.load(Ordering::SeqCst)
    }
1052
1053    fn get(
1054        &self,
1055        key_buffer: &[u8],
1056    ) -> Result<Option<Bytes>, Error> {
1057        // Lazy cleanup: only check expiration in Lazy strategy
1058        // Background strategy handles cleanup in dedicated async task
1059        // Background cleanup strategy: expired keys are cleaned by background worker
1060        // No on-read checks needed (simplifies get() hot path)
1061        let data = self.data.read();
1062        Ok(data.get(key_buffer).map(|(value, _)| value.clone()))
1063    }
1064
1065    fn entry_term(
1066        &self,
1067        entry_id: u64,
1068    ) -> Option<u64> {
1069        let data = self.data.read();
1070        data.values().find(|(_, index)| *index == entry_id).map(|(_, term)| *term)
1071    }
1072
1073    /// Thread-safe: called serially by single-task CommitHandler
1074    async fn apply_chunk(
1075        &self,
1076        chunk: &[ApplyEntry],
1077    ) -> Result<Vec<ApplyResult>, Error> {
1078        let chunk_len = chunk.len();
1079        let mut results = Vec::with_capacity(chunk_len);
1080        let mut highest_log_id: Option<LogId> = None;
1081
1082        // Validate ordering — Command is already decoded, no proto work here
1083        for entry in chunk {
1084            if let Some(prev) = &highest_log_id {
1085                assert!(
1086                    entry.index > prev.index,
1087                    "apply_chunk: received unordered entry at index {} (prev={})",
1088                    entry.index,
1089                    prev.index
1090                );
1091            }
1092            highest_log_id = Some(LogId {
1093                index: entry.index,
1094                term: entry.term,
1095            });
1096            info!("COMMITTED_LOG_METRIC: {}", entry.index);
1097        }
1098
1099        // PRE-PHASE + PHASE 2 combined: evaluate outcomes and encode WAL in one pass.
1100        //
1101        // WAL records outcomes, not intents. We simulate in chunk order so earlier
1102        // entries are visible to later CAS — matching Raft's serial apply semantics.
1103        //
1104        // Key-scoped snapshot: only CAS entries need to read the current value.
1105        // We fetch only CAS-referenced keys from self.data (read lock held briefly)
1106        // and track chunk-level mutations in a small `delta` map, avoiding a full
1107        // HashMap clone regardless of data set size.
1108        //
1109        // Encoding happens in the same loop as evaluation, eliminating the second
1110        // pass over chunk that a separate append_to_wal call would require.
1111        let (cas_outcomes, wal_buf) = {
1112            // Collect keys that any CAS in this chunk will compare against.
1113            let cas_keys: HashSet<&Bytes> = chunk
1114                .iter()
1115                .filter_map(|e| match &e.command {
1116                    Command::CompareAndSwap { key, .. } => Some(key),
1117                    _ => None,
1118                })
1119                .collect();
1120
1121            // Fetch only those keys; read lock released at end of this block.
1122            let base: HashMap<Bytes, (Bytes, u64)> = {
1123                let guard = self.data.read();
1124                cas_keys
1125                    .iter()
1126                    .filter_map(|k| guard.get(k.as_ref()).map(|v| ((*k).clone(), v.clone())))
1127                    .collect()
1128            };
1129
1130            // delta tracks mutations made by earlier entries in this chunk.
1131            // None = deleted by an earlier Delete in the same chunk.
1132            let mut delta: HashMap<Bytes, Option<(Bytes, u64)>> = HashMap::new();
1133            let mut wal_buf: Vec<u8> = Vec::with_capacity(chunk.len() * 64);
1134
1135            let outcomes: Vec<bool> = chunk
1136                .iter()
1137                .map(|entry| {
1138                    let success = match &entry.command {
1139                        Command::Insert { key, value, .. } => {
1140                            delta.insert(key.clone(), Some((value.clone(), entry.term)));
1141                            false
1142                        }
1143                        Command::Delete { key } => {
1144                            delta.insert(key.clone(), None);
1145                            false
1146                        }
1147                        Command::CompareAndSwap {
1148                            key,
1149                            expected,
1150                            value: new_value,
1151                        } => {
1152                            // Look up current value: delta (this chunk) takes precedence over base.
1153                            let current =
1154                                delta.get(key).map(|v| v.as_ref()).unwrap_or_else(|| base.get(key));
1155                            let success = match (current, expected) {
1156                                (Some((c, _)), Some(e)) => c.as_ref() == e.as_ref(),
1157                                (None, None) => true,
1158                                _ => false,
1159                            };
1160                            if success {
1161                                delta.insert(key.clone(), Some((new_value.clone(), entry.term)));
1162                            }
1163                            success
1164                        }
1165                        Command::Noop => false,
1166                    };
1167                    encode_wal_entry(&mut wal_buf, entry, success);
1168                    success
1169                })
1170                .collect();
1171
1172            (outcomes, wal_buf)
1173        };
1174
1175        // PHASE 2: single fsync with buffer built during evaluation above.
1176        // Successful CAS → WalOpCode::Insert; failed CAS → WalOpCode::CasFailed.
1177        // Note: wal_buf is never empty here in practice — every entry encodes at least
1178        // 41 bytes — but the guard is kept as a cheap defensive check.
1179        if !wal_buf.is_empty() {
1180            let wal_path = self.data_dir.join("wal.log");
1181            let mut file =
1182                OpenOptions::new().write(true).create(true).append(true).open(&wal_path).await?;
1183            file.write_all(&wal_buf).await?;
1184            file.flush().await?;
1185        }
1186
1187        // PHASE 3: Fast in-memory updates with minimal lock time
1188        // (CAS uses pre-computed outcomes — no re-evaluation under write lock)
1189        {
1190            let mut data = self.data.write();
1191
1192            for (entry, &cas_success) in chunk.iter().zip(cas_outcomes.iter()) {
1193                match &entry.command {
1194                    Command::Noop => {
1195                        results.push(ApplyResult::success(entry.index));
1196                    }
1197                    Command::Insert {
1198                        key,
1199                        value,
1200                        ttl_secs,
1201                    } => {
1202                        data.insert(key.clone(), (value.clone(), entry.term));
1203                        if let Some(ttl) = ttl_secs {
1204                            if !self.lease_enabled {
1205                                return Err(StorageError::FeatureNotEnabled(
1206                                    "TTL feature is not enabled on this server. \
1207                                     Enable it in config: [raft.state_machine.lease] enabled = true"
1208                                        .into(),
1209                                )
1210                                .into());
1211                            }
1212                            // Safety: lease_enabled invariant ensures lease.is_some()
1213                            let lease = unsafe { self.lease.as_ref().unwrap_unchecked() };
1214                            lease.register(key.clone(), *ttl);
1215                        }
1216                        results.push(ApplyResult::success(entry.index));
1217                    }
1218                    Command::Delete { key } => {
1219                        data.remove(key.as_ref());
1220                        if let Some(ref lease) = self.lease {
1221                            lease.unregister(key);
1222                        }
1223                        results.push(ApplyResult::success(entry.index));
1224                    }
1225                    Command::CompareAndSwap {
1226                        key,
1227                        value: new_value,
1228                        ..
1229                    } => {
1230                        debug!(
1231                            "CAS at index {}: key={:?}, success={}",
1232                            entry.index,
1233                            String::from_utf8_lossy(key),
1234                            cas_success
1235                        );
1236                        results.push(if cas_success {
1237                            ApplyResult::success(entry.index)
1238                        } else {
1239                            ApplyResult::failure(entry.index)
1240                        });
1241                        if cas_success {
1242                            data.insert(key.clone(), (new_value.clone(), entry.term));
1243                        }
1244                    }
1245                }
1246            }
1247        } // Lock released immediately - no awaits inside!
1248
1249        // PHASE 4: Update last applied index and conditionally checkpoint.
1250        // WAL (written in PHASE 2) is the primary crash-safety path.
1251        // Checkpoint snapshots full data periodically to bound WAL replay time on recovery.
1252        if let Some(log_id) = highest_log_id {
1253            debug!("State machine - updated last_applied: {:?}", log_id);
1254            self.update_last_applied(log_id);
1255        }
1256
1257        self.wal_entries_since_checkpoint.fetch_add(chunk_len as u64, Ordering::Relaxed);
1258
1259        if self.should_checkpoint() {
1260            self.checkpoint().await?;
1261        }
1262
1263        Ok(results)
1264    }
1265
    /// Returns the number of keys currently held in the in-memory map.
    fn len(&self) -> usize {
        self.data.read().len()
    }
1269
1270    fn update_last_applied(
1271        &self,
1272        last_applied: LogId,
1273    ) {
1274        self.last_applied_index.store(last_applied.index, Ordering::SeqCst);
1275        self.last_applied_term.store(last_applied.term, Ordering::SeqCst);
1276    }
1277
1278    fn last_applied(&self) -> LogId {
1279        LogId {
1280            index: self.last_applied_index.load(Ordering::SeqCst),
1281            term: self.last_applied_term.load(Ordering::SeqCst),
1282        }
1283    }
1284
    /// Records `last_applied` in memory and then writes it to `metadata.bin`.
    ///
    /// # Errors
    ///
    /// Returns an error if writing the metadata file fails; the in-memory value
    /// has already been updated at that point.
    fn persist_last_applied(
        &self,
        last_applied: LogId,
    ) -> Result<(), Error> {
        self.update_last_applied(last_applied);
        self.persist_metadata()
    }
1292
1293    fn update_last_snapshot_metadata(
1294        &self,
1295        snapshot_metadata: &SnapshotMetadata,
1296    ) -> Result<(), Error> {
1297        *self.last_snapshot_metadata.write() = Some(snapshot_metadata.clone());
1298        Ok(())
1299    }
1300
    /// Returns a copy of the in-memory snapshot metadata, or `None` if none has been recorded.
    fn snapshot_metadata(&self) -> Option<SnapshotMetadata> {
        self.last_snapshot_metadata.read().clone()
    }
1304
    /// Stores `snapshot_metadata` as the last snapshot metadata.
    ///
    /// NOTE(review): despite the name, this only delegates to
    /// `update_last_snapshot_metadata`, which updates the in-memory copy; nothing
    /// is written to disk here — confirm whether that is intended.
    fn persist_last_snapshot_metadata(
        &self,
        snapshot_metadata: &SnapshotMetadata,
    ) -> Result<(), Error> {
        self.update_last_snapshot_metadata(snapshot_metadata)
    }
1311
1312    async fn apply_snapshot_from_file(
1313        &self,
1314        metadata: &SnapshotMetadata,
1315        snapshot_dir: std::path::PathBuf,
1316    ) -> Result<(), Error> {
1317        info!("Applying snapshot from file: {:?}", snapshot_dir);
1318
1319        // Read from the snapshot.bin file inside the directory
1320        let snapshot_data_path = snapshot_dir.join("snapshot.bin");
1321        let mut file = File::open(snapshot_data_path).await?;
1322        let mut buffer = Vec::new();
1323        file.read_to_end(&mut buffer).await?;
1324
1325        // Parse snapshot data
1326        let mut pos = 0;
1327        let mut new_data = HashMap::new();
1328
1329        while pos < buffer.len() {
1330            // Read key length
1331            if pos + 8 > buffer.len() {
1332                break;
1333            }
1334
1335            let key_len_bytes = &buffer[pos..pos + 8];
1336            let key_len = u64::from_be_bytes([
1337                key_len_bytes[0],
1338                key_len_bytes[1],
1339                key_len_bytes[2],
1340                key_len_bytes[3],
1341                key_len_bytes[4],
1342                key_len_bytes[5],
1343                key_len_bytes[6],
1344                key_len_bytes[7],
1345            ]) as usize;
1346
1347            pos += 8;
1348
1349            // Read key
1350            if pos + key_len > buffer.len() {
1351                break;
1352            }
1353
1354            let key = Bytes::from(buffer[pos..pos + key_len].to_vec());
1355            pos += key_len;
1356
1357            // Read value length
1358            if pos + 8 > buffer.len() {
1359                break;
1360            }
1361
1362            let value_len_bytes = &buffer[pos..pos + 8];
1363            let value_len = u64::from_be_bytes([
1364                value_len_bytes[0],
1365                value_len_bytes[1],
1366                value_len_bytes[2],
1367                value_len_bytes[3],
1368                value_len_bytes[4],
1369                value_len_bytes[5],
1370                value_len_bytes[6],
1371                value_len_bytes[7],
1372            ]) as usize;
1373
1374            pos += 8;
1375
1376            // Read value
1377            if pos + value_len > buffer.len() {
1378                break;
1379            }
1380
1381            let value = Bytes::from(buffer[pos..pos + value_len].to_vec());
1382            pos += value_len;
1383
1384            // Read term
1385            if pos + 8 > buffer.len() {
1386                break;
1387            }
1388
1389            let term_bytes = &buffer[pos..pos + 8];
1390            let term = u64::from_be_bytes([
1391                term_bytes[0],
1392                term_bytes[1],
1393                term_bytes[2],
1394                term_bytes[3],
1395                term_bytes[4],
1396                term_bytes[5],
1397                term_bytes[6],
1398                term_bytes[7],
1399            ]);
1400
1401            pos += 8;
1402
1403            // Add to new data
1404            new_data.insert(key, (value, term));
1405        }
1406
1407        // Read and reload lease data if present
1408        if pos + 8 <= buffer.len() {
1409            let ttl_len_bytes = &buffer[pos..pos + 8];
1410            let ttl_len = u64::from_be_bytes([
1411                ttl_len_bytes[0],
1412                ttl_len_bytes[1],
1413                ttl_len_bytes[2],
1414                ttl_len_bytes[3],
1415                ttl_len_bytes[4],
1416                ttl_len_bytes[5],
1417                ttl_len_bytes[6],
1418                ttl_len_bytes[7],
1419            ]) as usize;
1420            pos += 8;
1421
1422            if pos + ttl_len <= buffer.len() {
1423                let ttl_data = &buffer[pos..pos + ttl_len];
1424                if let Some(ref lease) = self.lease {
1425                    lease.reload(ttl_data)?;
1426                }
1427            }
1428        }
1429
1430        // Atomically replace the data
1431        {
1432            let mut data = self.data.write();
1433            *data = new_data;
1434        }
1435
1436        // Update metadata
1437        *self.last_snapshot_metadata.write() = Some(metadata.clone());
1438
1439        if let Some(last_included) = &metadata.last_included {
1440            self.update_last_applied(*last_included);
1441        }
1442
1443        // Persist to disk
1444        self.persist_data_async().await?;
1445        self.persist_metadata_async().await?;
1446        self.clear_wal_async().await?;
1447
1448        info!("Snapshot applied successfully");
1449        Ok(())
1450    }
1451
1452    async fn generate_snapshot_data(
1453        &self,
1454        new_snapshot_dir: std::path::PathBuf,
1455        last_included: LogId,
1456    ) -> Result<Bytes, Error> {
1457        info!("Generating snapshot data up to {:?}", last_included);
1458
1459        // Create snapshot directory
1460        fs::create_dir_all(&new_snapshot_dir).await?;
1461
1462        // Create snapshot file
1463        let snapshot_path = new_snapshot_dir.join("snapshot.bin");
1464        let mut file = File::create(&snapshot_path).await?;
1465
1466        let data_copy: HashMap<Bytes, (Bytes, u64)> = {
1467            let data = self.data.read();
1468            data.iter().map(|(k, (v, t))| (k.clone(), (v.clone(), *t))).collect()
1469        };
1470
1471        // Batch serialize into a single buffer — eliminates per-entry async yield overhead.
1472        let lease_snapshot = if let Some(ref lease) = self.lease {
1473            lease.to_snapshot()
1474        } else {
1475            Vec::new()
1476        };
1477        let estimated: usize =
1478            data_copy.iter().map(|(k, (v, _))| 8 + k.len() + 8 + v.len() + 8).sum::<usize>()
1479                + 8
1480                + lease_snapshot.len();
1481        let mut buf = Vec::with_capacity(estimated);
1482
1483        for (key, (value, term)) in &data_copy {
1484            buf.extend_from_slice(&(key.len() as u64).to_be_bytes());
1485            buf.extend_from_slice(key);
1486            buf.extend_from_slice(&(value.len() as u64).to_be_bytes());
1487            buf.extend_from_slice(value);
1488            buf.extend_from_slice(&term.to_be_bytes());
1489        }
1490
1491        buf.extend_from_slice(&(lease_snapshot.len() as u64).to_be_bytes());
1492        buf.extend_from_slice(&lease_snapshot);
1493
1494        file.write_all(&buf).await?;
1495
1496        file.flush().await?;
1497
1498        // Update metadata
1499        let metadata = SnapshotMetadata {
1500            last_included: Some(last_included),
1501            checksum: Bytes::from(vec![0; 32]), // Simple checksum for demo
1502        };
1503
1504        self.update_last_snapshot_metadata(&metadata)?;
1505
1506        info!("Snapshot generated at {:?}", snapshot_path);
1507
1508        // Return dummy checksum
1509        Ok(Bytes::from_static(&[0u8; 32]))
1510    }
1511
1512    fn save_hard_state(&self) -> Result<(), Error> {
1513        let last_applied = self.last_applied();
1514        self.persist_last_applied(last_applied)?;
1515
1516        if let Some(last_snapshot_metadata) = self.snapshot_metadata() {
1517            self.persist_last_snapshot_metadata(&last_snapshot_metadata)?;
1518        }
1519
1520        self.flush()?;
1521        Ok(())
1522    }
1523
1524    fn flush(&self) -> Result<(), Error> {
1525        self.persist_data()?;
1526        self.persist_metadata()?;
1527        // self.clear_wal()?;
1528        Ok(())
1529    }
1530
    /// Flushes the state machine to disk by running a full checkpoint.
    ///
    /// # Errors
    ///
    /// Returns whatever error `checkpoint` reports.
    async fn flush_async(&self) -> Result<(), Error> {
        // Force checkpoint on shutdown to ensure WAL entries are not lost.
        self.checkpoint().await
    }
1535
1536    async fn reset(&self) -> Result<(), Error> {
1537        self.reset().await
1538    }
1539
1540    async fn lease_background_cleanup(&self) -> Result<Vec<Bytes>, Error> {
1541        // Fast path: no lease configured
1542        let Some(ref lease) = self.lease else {
1543            return Ok(vec![]);
1544        };
1545
1546        // Get all expired keys
1547        let now = SystemTime::now();
1548        let expired_keys = lease.get_expired_keys(now);
1549
1550        if expired_keys.is_empty() {
1551            return Ok(vec![]);
1552        }
1553
1554        debug!(
1555            "Lease background cleanup: found {} expired keys",
1556            expired_keys.len()
1557        );
1558
1559        // Delete expired keys from storage
1560        {
1561            let mut data = self.data.write();
1562            for key in &expired_keys {
1563                data.remove(key);
1564            }
1565        }
1566
1567        // Persist to disk after batch deletion; propagate error so caller can retry
1568        self.persist_data_async().await?;
1569
1570        info!(
1571            "Lease background cleanup: deleted {} expired keys",
1572            expired_keys.len()
1573        );
1574
1575        Ok(expired_keys)
1576    }
1577
1578    fn scan_prefix(
1579        &self,
1580        prefix: &[u8],
1581    ) -> Result<ScanResult, Error> {
1582        let data = self.data.read();
1583        let entries: Vec<(Bytes, Bytes)> = data
1584            .iter()
1585            .filter(|(k, _)| k.starts_with(prefix))
1586            .map(|(k, (v, _))| (k.clone(), v.clone()))
1587            .collect();
1588        let revision = self.last_applied_index.load(Ordering::SeqCst);
1589        Ok(ScanResult { entries, revision })
1590    }
1591}