d_engine_server/storage/adaptors/file/
file_state_machine.rs

1//! File-based state machine implementation with crash recovery
2//!
3//! This module provides a durable state machine implementation using file-based storage
4//! with Write-Ahead Logging (WAL) for crash consistency.
5//!
6//! # Architecture
7//!
8//! ## Storage Components
9//!
10//! - **`state.data`**: Serialized key-value store (persisted after each apply_chunk)
11//! - **`wal.log`**: Write-Ahead Log for crash recovery (cleared after successful persistence)
12//! - **`ttl_state.bin`**: TTL manager state (persisted alongside state.data)
13//! - **`metadata.bin`**: Raft metadata (last_applied_index, last_applied_term)
14//!
15//! ## Write-Ahead Log (WAL) Design
16//!
17//! The WAL ensures crash consistency by recording operations before they are applied to
18//! in-memory state. Each WAL entry contains:
19//!
20//! ```text
21//! ┌─────────────────────────────────────────────────────────────────┐
22//! │ Entry Index (8 bytes) │ Entry Term (8 bytes) │ OpCode (1 byte) │
23//! ├─────────────────────────────────────────────────────────────────┤
24//! │ Key Length (8 bytes)  │ Key Data (N bytes)                      │
25//! ├─────────────────────────────────────────────────────────────────┤
26//! │ Value Length (8 bytes)│ Value Data (M bytes, if present)        │
27//! ├─────────────────────────────────────────────────────────────────┤
28//! │ Expire At (8 bytes, 0 = no TTL, >0 = UNIX timestamp in seconds) │
29//! └─────────────────────────────────────────────────────────────────┘
30//! ```
31//!
32//! ### TTL Semantics
33//!
34//! d-engine uses **absolute expiration time**:
35//!
36//! - When a key is created with TTL, the system calculates: `expire_at = now() + ttl_secs`
37//! - WAL stores the **absolute expiration timestamp** (UNIX seconds since epoch)
38//! - After crash recovery, expired keys are **not restored** (checked during replay)
39//! - TTL does **not reset** on restart (crash-safe)
40//!
41//! **Example:**
42//! ```text
43//! T0:  PUT key="foo", ttl=10s → expire_at = T0 + 10 = T10 (stored in WAL)
44//! T5:  CRASH
45//! T12: RESTART
46//!      → Replay WAL: expire_at = T10 < T12 (already expired)
47//!      → Key is NOT restored (correctly expired)
48//! ```
49//!
50//! **Why absolute time in WAL:**
51//! 1. Ensures expired keys stay expired after crash (durable expiration semantics)
52//! 2. Passive expiration (in get()) is crash-safe without WAL writes
53//! 3. No TTL reset on recovery (deterministic expiration)
54//!
55//! ### WAL Lifecycle
56//!
57//! ```text
58//! apply_chunk() → append_to_wal() → [crash safe] → persist_data_async()
59//!                                                 → clear_wal_async()
60//! ```
61//!
62//! After successful persistence, WAL is cleared since state is now in `state.data`.
63//!
64//! ## Crash Recovery Flow
65//!
66//! On node startup (`new()`):
67//! 1. `load_metadata()` - Restore Raft state
68//! 2. `load_data()` - Load persisted key-value data
69//! 3. `load_ttl_data()` - Load persisted TTL state
70//! 4. `replay_wal()` - **Critical**: Replay uncommitted operations from WAL
71//!    - Restores keys AND their TTL metadata
72//!    - Ensures crash consistency (operations are idempotent)
73//!
74//! ## TTL Cleanup Strategy
75//!
76//! - **Background Cleanup**: Periodic async worker scans and deletes expired keys
77//! - **Zero Overhead**: When TTL feature is disabled, no lease components are initialized
78
79use std::collections::HashMap;
80use std::io::Write;
81use std::path::PathBuf;
82use std::sync::Arc;
83use std::sync::atomic::AtomicBool;
84use std::sync::atomic::AtomicU64;
85use std::sync::atomic::Ordering;
86use std::time::SystemTime;
87
88use bytes::Bytes;
89use d_engine_core::Error;
90use d_engine_core::Lease;
91use d_engine_core::StateMachine;
92use d_engine_core::StorageError;
93use d_engine_proto::client::WriteCommand;
94use d_engine_proto::client::write_command::Delete;
95use d_engine_proto::client::write_command::Insert;
96use d_engine_proto::client::write_command::Operation;
97use d_engine_proto::common::Entry;
98use d_engine_proto::common::LogId;
99use d_engine_proto::common::entry_payload::Payload;
100use d_engine_proto::server::storage::SnapshotMetadata;
101use parking_lot::RwLock;
102use prost::Message;
103use tokio::fs;
104use tokio::fs::File;
105use tokio::fs::OpenOptions;
106use tokio::io::AsyncReadExt;
107use tokio::io::AsyncWriteExt;
108use tokio::time::Instant;
109use tonic::async_trait;
110use tracing::debug;
111use tracing::error;
112use tracing::info;
113use tracing::warn;
114
115use crate::storage::DefaultLease;
116
117type FileStateMachineDataType = RwLock<HashMap<Bytes, (Bytes, u64)>>;
118
/// WAL operation codes for fixed-size encoding
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum WalOpCode {
    Noop = 0,
    Insert = 1,
    Delete = 2,
    Config = 3,
}

impl WalOpCode {
    /// Maps an operation name (as passed to `append_to_wal`) to its opcode.
    /// Unrecognized names fall back to `Noop`.
    fn from_str(s: &str) -> Self {
        if s == "INSERT" {
            Self::Insert
        } else if s == "DELETE" {
            Self::Delete
        } else if s == "CONFIG" {
            Self::Config
        } else {
            Self::Noop
        }
    }

    /// Decodes a single WAL byte back into an opcode.
    /// Unknown bytes decode as `Noop`, so replay treats them as no-ops.
    fn from_u8(byte: u8) -> Self {
        // Discriminants are contiguous (0..=3), so a table lookup works;
        // anything out of range falls back to Noop.
        [Self::Noop, Self::Insert, Self::Delete, Self::Config]
            .get(byte as usize)
            .copied()
            .unwrap_or(Self::Noop)
    }
}
148
/// File-based state machine implementation with persistence
///
/// Design principles:
/// - All data is persisted to disk for durability
/// - In-memory cache for fast read operations
/// - Write-ahead logging for crash consistency
/// - Efficient snapshot handling with file-based storage
/// - Thread-safe with minimal lock contention
/// - TTL support for automatic key expiration
#[derive(Debug)]
pub struct FileStateMachine {
    // Key-value storage with disk persistence.
    // Maps key -> (value, term of the log entry that wrote it).
    data: FileStateMachineDataType, // (value, term)

    // Lease management for automatic key expiration
    // DefaultLease is thread-safe internally (uses DashMap + Mutex)
    // Injected by NodeBuilder after construction (see `set_lease()`)
    lease: Option<Arc<DefaultLease>>,

    /// Whether lease manager is enabled (immutable after init)
    /// Set to true when lease is injected, never changes after that
    ///
    /// Invariant: lease_enabled == true ⟹ lease.is_some()
    /// Performance: Allows safe unwrap_unchecked in hot paths
    lease_enabled: bool,

    // Raft state with disk persistence (stored in metadata.bin)
    last_applied_index: AtomicU64,
    last_applied_term: AtomicU64,
    last_snapshot_metadata: RwLock<Option<SnapshotMetadata>>,

    // Operational state (initialized to true in `new()`)
    running: AtomicBool,

    // Directory holding state.data, metadata.bin, wal.log and ttl_state.bin
    data_dir: PathBuf,
    // data_file: RwLock<File>,
    // metadata_file: RwLock<File>,
    // wal_file: RwLock<File>, // Write-ahead log for crash recovery
}
189
190impl FileStateMachine {
191    /// Creates a new file-based state machine with persistence
192    ///
193    /// Lease will be injected by NodeBuilder after construction.
194    ///
195    /// # Arguments
196    /// * `data_dir` - Directory where data files will be stored
197    ///
198    /// # Returns
199    /// Result containing the initialized FileStateMachine
200    pub async fn new(data_dir: PathBuf) -> Result<Self, Error> {
201        // Ensure data directory exists
202        fs::create_dir_all(&data_dir).await?;
203
204        let machine = Self {
205            data: RwLock::new(HashMap::new()),
206            lease: None,          // Will be injected by NodeBuilder
207            lease_enabled: false, // Default: no lease until set
208            last_applied_index: AtomicU64::new(0),
209            last_applied_term: AtomicU64::new(0),
210            last_snapshot_metadata: RwLock::new(None),
211            running: AtomicBool::new(true),
212            data_dir: data_dir.clone(),
213        };
214
215        // Load existing data from disk
216        machine.load_from_disk().await?;
217
218        Ok(machine)
219    }
220
221    /// Sets the lease manager for this state machine.
222    ///
223    /// This is an internal method called by NodeBuilder during initialization.
224    /// The lease will also be restored from snapshot during `apply_snapshot_from_file()`.
225    /// Also available for testing and benchmarks.
226    pub fn set_lease(
227        &mut self,
228        lease: Arc<DefaultLease>,
229    ) {
230        // Mark lease as enabled (immutable after this point)
231        self.lease_enabled = true;
232        self.lease = Some(lease);
233    }
234
    /// Loads state machine data from disk files
    ///
    /// Recovery order matters: Raft metadata first, then the persisted
    /// key-value data, then TTL state, and finally a WAL replay of any
    /// operations that were logged but not yet folded into `state.data`
    /// before a crash.
    async fn load_from_disk(&self) -> Result<(), Error> {
        // Load last applied index and term from metadata file
        self.load_metadata().await?;

        // Load key-value data from data file
        self.load_data().await?;

        // Load TTL data from disk
        self.load_ttl_data().await?;

        // Replay write-ahead log for crash recovery
        self.replay_wal().await?;

        info!("Loaded state machine data from disk");
        Ok(())
    }
255
    /// Loads TTL data from disk (if lease is configured)
    ///
    /// Intentionally a no-op during construction: the lease is injected by
    /// NodeBuilder after `new()` completes, and `load_lease_data()` performs
    /// the actual load once a lease exists.
    async fn load_ttl_data(&self) -> Result<(), Error> {
        // Lease will be injected by NodeBuilder later
        // The lease data will be loaded after injection
        // For now, just skip this step during construction
        Ok(())
    }
263
264    /// Loads TTL data into the configured lease
265    ///
266    /// Called after NodeBuilder injects the lease.
267    /// Also available for testing and benchmarks.
268    pub async fn load_lease_data(&self) -> Result<(), Error> {
269        let Some(ref lease) = self.lease else {
270            return Ok(()); // No lease configured
271        };
272
273        let ttl_path = self.data_dir.join("ttl_state.bin");
274        if !ttl_path.exists() {
275            debug!("No TTL state file found");
276            return Ok(());
277        }
278
279        let ttl_data = tokio::fs::read(&ttl_path).await?;
280        lease.reload(&ttl_data)?;
281
282        info!("Loaded TTL state from disk: {} active TTLs", lease.len());
283        Ok(())
284    }
285
286    /// Loads metadata from disk
287    async fn load_metadata(&self) -> Result<(), Error> {
288        let metadata_path = self.data_dir.join("metadata.bin");
289        if !metadata_path.exists() {
290            return Ok(());
291        }
292
293        let mut file = File::open(metadata_path).await?;
294        let mut buffer = [0u8; 16];
295
296        if file.read_exact(&mut buffer).await.is_ok() {
297            let index = u64::from_be_bytes([
298                buffer[0], buffer[1], buffer[2], buffer[3], buffer[4], buffer[5], buffer[6],
299                buffer[7],
300            ]);
301
302            let term = u64::from_be_bytes([
303                buffer[8], buffer[9], buffer[10], buffer[11], buffer[12], buffer[13], buffer[14],
304                buffer[15],
305            ]);
306
307            self.last_applied_index.store(index, Ordering::SeqCst);
308            self.last_applied_term.store(term, Ordering::SeqCst);
309        }
310
311        Ok(())
312    }
313
314    /// Loads key-value data from disk
315    async fn load_data(&self) -> Result<(), Error> {
316        let data_path = self.data_dir.join("state.data");
317        if !data_path.exists() {
318            return Ok(());
319        }
320
321        let mut file = File::open(data_path).await?;
322        let mut buffer = Vec::new();
323        file.read_to_end(&mut buffer).await?;
324
325        let mut pos = 0;
326        let mut data = self.data.write();
327
328        while pos < buffer.len() {
329            // Read key length
330            if pos + 8 > buffer.len() {
331                break;
332            }
333
334            let key_len_bytes = &buffer[pos..pos + 8];
335            let key_len = u64::from_be_bytes([
336                key_len_bytes[0],
337                key_len_bytes[1],
338                key_len_bytes[2],
339                key_len_bytes[3],
340                key_len_bytes[4],
341                key_len_bytes[5],
342                key_len_bytes[6],
343                key_len_bytes[7],
344            ]) as usize;
345
346            pos += 8;
347
348            // Read key
349            if pos + key_len > buffer.len() {
350                break;
351            }
352
353            let key = Bytes::from(buffer[pos..pos + key_len].to_vec());
354            pos += key_len;
355
356            // Read value length
357            if pos + 8 > buffer.len() {
358                break;
359            }
360
361            let value_len_bytes = &buffer[pos..pos + 8];
362            let value_len = u64::from_be_bytes([
363                value_len_bytes[0],
364                value_len_bytes[1],
365                value_len_bytes[2],
366                value_len_bytes[3],
367                value_len_bytes[4],
368                value_len_bytes[5],
369                value_len_bytes[6],
370                value_len_bytes[7],
371            ]) as usize;
372
373            pos += 8;
374
375            // Read value
376            if pos + value_len > buffer.len() {
377                break;
378            }
379
380            let value = Bytes::from(buffer[pos..pos + value_len].to_vec());
381            pos += value_len;
382
383            // Read term
384            if pos + 8 > buffer.len() {
385                break;
386            }
387
388            let term_bytes = &buffer[pos..pos + 8];
389            let term = u64::from_be_bytes([
390                term_bytes[0],
391                term_bytes[1],
392                term_bytes[2],
393                term_bytes[3],
394                term_bytes[4],
395                term_bytes[5],
396                term_bytes[6],
397                term_bytes[7],
398            ]);
399
400            pos += 8;
401
402            // Store in memory
403            data.insert(key, (value, term));
404        }
405
406        Ok(())
407    }
408
409    /// Replays write-ahead log for crash recovery
410    async fn replay_wal(&self) -> Result<(), Error> {
411        let wal_path = self.data_dir.join("wal.log");
412        if !wal_path.exists() {
413            debug!("No WAL file found, skipping replay");
414            return Ok(());
415        }
416
417        let mut file = File::open(wal_path).await?;
418        let mut buffer = Vec::new();
419        file.read_to_end(&mut buffer).await?;
420
421        if buffer.is_empty() {
422            debug!("WAL file is empty, skipping replay");
423            return Ok(());
424        }
425
426        let mut pos = 0;
427        let mut operations = Vec::new();
428        let mut replayed_count = 0;
429
430        while pos + 17 < buffer.len() {
431            // Read entry index (8 bytes)
432            let _index = u64::from_be_bytes(buffer[pos..pos + 8].try_into().unwrap());
433            pos += 8;
434
435            // Read entry term (8 bytes)
436            let term = u64::from_be_bytes(buffer[pos..pos + 8].try_into().unwrap());
437            pos += 8;
438
439            // Read operation code (1 byte)
440            let op_code = WalOpCode::from_u8(buffer[pos]);
441            pos += 1;
442
443            // Check if we have enough bytes for key length
444            if pos + 8 > buffer.len() {
445                warn!("Incomplete key length at position {}, stopping replay", pos);
446                break;
447            }
448
449            // Read key length (8 bytes)
450            let key_len = u64::from_be_bytes(buffer[pos..pos + 8].try_into().unwrap()) as usize;
451            pos += 8;
452
453            // Check if we have enough data for the key
454            if pos + key_len > buffer.len() {
455                warn!(
456                    "Incomplete key data at position {} (need {} bytes, have {})",
457                    pos,
458                    key_len,
459                    buffer.len() - pos
460                );
461                break;
462            }
463
464            // Read key
465            let key = Bytes::from(buffer[pos..pos + key_len].to_vec());
466            pos += key_len;
467
468            // Check if we have enough bytes for value length
469            if pos + 8 > buffer.len() {
470                warn!(
471                    "Incomplete value length at position {}, stopping replay",
472                    pos
473                );
474                break;
475            }
476
477            // Read value length (8 bytes)
478            let value_len = u64::from_be_bytes(buffer[pos..pos + 8].try_into().unwrap()) as usize;
479            pos += 8;
480
481            // Read value if present
482            let value = if value_len > 0 {
483                if pos + value_len > buffer.len() {
484                    warn!("Incomplete value data at position {}, stopping replay", pos);
485                    break;
486                }
487                let value_data = Bytes::from(buffer[pos..pos + value_len].to_vec());
488                pos += value_len;
489                Some(value_data)
490            } else {
491                None
492            };
493
494            // Read absolute expiration time (8 bytes) - 0 means no TTL
495            //
496            // WAL Format Migration Path:
497            // - Old format (pre-v0.2.0): ttl_secs (u32, 4 bytes, relative time)
498            // - New format (v0.2.0+): expire_at_secs (u64, 8 bytes, absolute time)
499            //
500            // Backward Compatibility Strategy:
501            // Since this is a breaking change and d-engine has not been deployed to production,
502            // we do NOT support reading old WAL format. All WAL entries must use the new format.
503            // If upgrading from pre-v0.2.0, users must:
504            // 1. Gracefully stop the old version (persists state.data + ttl_state.bin)
505            // 2. Upgrade to v0.2.0+
506            // 3. Start the new version (loads from persisted state, not WAL)
507            let expire_at_secs = if pos + 8 <= buffer.len() {
508                let secs = u64::from_be_bytes(buffer[pos..pos + 8].try_into().unwrap());
509                pos += 8;
510                if secs > 0 { Some(secs) } else { None }
511            } else {
512                // Incomplete WAL entry - log and skip
513                // This indicates corrupted WAL or incomplete write before crash
514                debug!(
515                    "No expiration time field at position {}, assuming no TTL (incomplete WAL entry)",
516                    pos
517                );
518                None
519            };
520
521            operations.push((op_code, key, value, term, expire_at_secs));
522            replayed_count += 1;
523        }
524
525        info!(
526            "Parsed {} WAL operations, applying to memory",
527            operations.len()
528        );
529
530        // Apply all collected operations with a single lock acquisition
531        let mut applied_count = 0;
532        let mut skipped_expired = 0;
533        let now = std::time::SystemTime::now();
534        {
535            let mut data = self.data.write();
536
537            for (op_code, key, value, term, expire_at_secs) in operations {
538                match op_code {
539                    WalOpCode::Insert => {
540                        if let Some(value_data) = value {
541                            // Check if key is already expired (crash-safe TTL semantics)
542                            let is_expired = if let Some(secs) = expire_at_secs {
543                                let expire_at =
544                                    std::time::UNIX_EPOCH + std::time::Duration::from_secs(secs);
545                                now >= expire_at
546                            } else {
547                                false
548                            };
549
550                            if is_expired {
551                                // Skip restoring expired keys (durable expiration semantics)
552                                debug!("Skipped expired key during WAL replay: key={:?}", key);
553                                skipped_expired += 1;
554                                continue;
555                            }
556
557                            data.insert(key.clone(), (value_data, term));
558
559                            // Restore TTL from WAL (if lease configured and has expiration)
560                            if let Some(secs) = expire_at_secs {
561                                if let Some(ref lease) = self.lease {
562                                    let expire_at = std::time::UNIX_EPOCH
563                                        + std::time::Duration::from_secs(secs);
564                                    let remaining = expire_at
565                                        .duration_since(now)
566                                        .map(|d| d.as_secs())
567                                        .unwrap_or(0);
568
569                                    if remaining > 0 {
570                                        lease.register(key.clone(), remaining);
571                                        debug!(
572                                            "Replayed INSERT with TTL: key={:?}, remaining={}s",
573                                            key, remaining
574                                        );
575                                    }
576                                }
577                            } else {
578                                debug!("Replayed INSERT: key={:?}", key);
579                            }
580
581                            applied_count += 1;
582                        } else {
583                            warn!("INSERT operation without value");
584                        }
585                    }
586                    WalOpCode::Delete => {
587                        data.remove(&key);
588                        if let Some(ref lease) = self.lease {
589                            lease.unregister(&key);
590                        }
591                        applied_count += 1;
592                        debug!("Replayed DELETE: key={:?}", key);
593                    }
594                    WalOpCode::Noop | WalOpCode::Config => {
595                        // No data modification needed
596                        applied_count += 1;
597                        debug!("Replayed {:?} operation", op_code);
598                    }
599                }
600            }
601        }
602
603        info!(
604            "WAL replay complete: {} operations replayed, {} applied, {} expired keys skipped",
605            replayed_count, applied_count, skipped_expired
606        );
607
608        // Clear WAL only if replay was successful
609        if applied_count > 0 {
610            self.clear_wal_async().await?;
611            debug!(
612                "Cleared WAL after successful replay of {} operations",
613                applied_count
614            );
615        }
616
617        Ok(())
618    }
619
620    /// Persists key-value data to disk
621    fn persist_data(&self) -> Result<(), Error> {
622        // Collect data first to minimize lock time
623        let data_copy: HashMap<Bytes, (Bytes, u64)> = {
624            let data = self.data.read();
625            data.iter().map(|(k, (v, t))| (k.clone(), (v.clone(), *t))).collect()
626        };
627
628        // Write to file without holding data lock
629        let data_path = self.data_dir.join("state.data");
630        let mut file = std::fs::OpenOptions::new()
631            .write(true)
632            .create(true)
633            .truncate(true)
634            .open(data_path)?;
635
636        for (key, (value, term)) in data_copy.iter() {
637            // Write key length (8 bytes)
638            let key_len = key.len() as u64;
639            file.write_all(&key_len.to_be_bytes())?;
640
641            // Write key
642            file.write_all(key)?;
643
644            // Write value length (8 bytes)
645            let value_len = value.len() as u64;
646            file.write_all(&value_len.to_be_bytes())?;
647
648            // Write value
649            file.write_all(value)?;
650
651            // Write term (8 bytes)
652            file.write_all(&term.to_be_bytes())?;
653        }
654
655        file.flush()?;
656        Ok(())
657    }
658
659    /// Persists key-value data to disk
660    async fn persist_data_async(&self) -> Result<(), Error> {
661        // Collect data first to minimize lock time
662        let data_copy: HashMap<Bytes, (Bytes, u64)> = {
663            let data = self.data.read();
664            data.iter().map(|(k, (v, t))| (k.clone(), (v.clone(), *t))).collect()
665        };
666
667        // Write to file without holding data lock
668        let data_path = self.data_dir.join("state.data");
669        let mut file = OpenOptions::new()
670            .write(true)
671            .create(true)
672            .truncate(true)
673            .open(data_path)
674            .await?;
675
676        for (key, (value, term)) in data_copy.iter() {
677            // Write key length (8 bytes)
678            let key_len = key.len() as u64;
679            file.write_all(&key_len.to_be_bytes()).await?;
680
681            // Write key
682            file.write_all(key.as_ref()).await?;
683
684            // Write value length (8 bytes)
685            let value_len = value.len() as u64;
686            file.write_all(&value_len.to_be_bytes()).await?;
687
688            // Write value
689            file.write_all(value.as_ref()).await?;
690
691            // Write term (8 bytes)
692            file.write_all(&term.to_be_bytes()).await?;
693        }
694
695        file.flush().await?;
696
697        Ok(())
698    }
699
700    /// Persists metadata to disk
701    fn persist_metadata(&self) -> Result<(), Error> {
702        let metadata_path = self.data_dir.join("metadata.bin");
703        let mut file = std::fs::OpenOptions::new()
704            .write(true)
705            .create(true)
706            .truncate(true)
707            .open(metadata_path)?;
708
709        let index = self.last_applied_index.load(Ordering::SeqCst);
710        let term = self.last_applied_term.load(Ordering::SeqCst);
711
712        file.write_all(&index.to_be_bytes())?;
713        file.write_all(&term.to_be_bytes())?;
714
715        file.flush()?;
716        Ok(())
717    }
718
719    async fn persist_metadata_async(&self) -> Result<(), Error> {
720        let metadata_path = self.data_dir.join("metadata.bin");
721        let mut file = OpenOptions::new()
722            .write(true)
723            .create(true)
724            .truncate(true)
725            .open(metadata_path)
726            .await?;
727
728        let index = self.last_applied_index.load(Ordering::SeqCst);
729        let term = self.last_applied_term.load(Ordering::SeqCst);
730
731        file.write_all(&index.to_be_bytes()).await?;
732        file.write_all(&term.to_be_bytes()).await?;
733
734        file.flush().await?;
735        Ok(())
736    }
737
738    /// Clears the write-ahead log (called after successful persistence)
739    #[allow(unused)]
740    fn clear_wal(&self) -> Result<(), Error> {
741        let wal_path = self.data_dir.join("wal.log");
742        let mut file = std::fs::OpenOptions::new()
743            .write(true)
744            .create(true)
745            .truncate(true)
746            .open(wal_path)?;
747
748        file.set_len(0)?;
749        file.flush()?;
750        Ok(())
751    }
752
753    /// Clears the write-ahead log (called after successful persistence)
754    async fn clear_wal_async(&self) -> Result<(), Error> {
755        let wal_path = self.data_dir.join("wal.log");
756        let mut file = OpenOptions::new()
757            .write(true)
758            .create(true)
759            .truncate(true)
760            .open(wal_path)
761            .await?;
762
763        file.set_len(0).await?;
764        file.flush().await?;
765        Ok(())
766    }
767
768    /// Resets the state machine to its initial empty state
769    ///
770    /// This method:
771    /// 1. Clears all in-memory data
772    /// 2. Resets Raft state to initial values
773    /// 3. Clears all persisted files
774    /// 4. Maintains operational state (running status, node ID)
775    pub async fn reset(&self) -> Result<(), Error> {
776        info!("Resetting state machine");
777
778        // Clear in-memory data
779        {
780            let mut data = self.data.write();
781            data.clear();
782        }
783
784        // Reset Raft state
785        self.last_applied_index.store(0, Ordering::SeqCst);
786        self.last_applied_term.store(0, Ordering::SeqCst);
787
788        {
789            let mut snapshot_metadata = self.last_snapshot_metadata.write();
790            *snapshot_metadata = None;
791        }
792
793        // Clear all persisted files
794        self.clear_data_file().await?;
795        self.clear_metadata_file().await?;
796        self.clear_wal_async().await?;
797
798        info!("State machine reset completed");
799        Ok(())
800    }
801
802    /// Clears the data file
803    async fn clear_data_file(&self) -> Result<(), Error> {
804        let data_path = self.data_dir.join("state.data");
805        let mut file = OpenOptions::new()
806            .write(true)
807            .create(true)
808            .truncate(true)
809            .open(data_path)
810            .await?;
811
812        file.set_len(0).await?;
813        file.flush().await?;
814        Ok(())
815    }
816
817    /// Clears the metadata file
818    async fn clear_metadata_file(&self) -> Result<(), Error> {
819        let metadata_path = self.data_dir.join("metadata.bin");
820        let mut file = OpenOptions::new()
821            .write(true)
822            .create(true)
823            .truncate(true)
824            .open(metadata_path)
825            .await?;
826
827        // Write default values (0 for both index and term)
828        file.write_all(&0u64.to_be_bytes()).await?;
829        file.write_all(&0u64.to_be_bytes()).await?;
830
831        file.flush().await?;
832        Ok(())
833    }
834
835    /// Batch WAL writes with proper durability guarantees
836    ///
837    /// Format per entry:
838    /// - 8 bytes: entry index (big-endian u64)
839    /// - 8 bytes: entry term (big-endian u64)
840    /// - 1 byte: operation code (0=NOOP, 1=INSERT, 2=DELETE, 3=CONFIG)
841    /// - 8 bytes: key length (big-endian u64)
842    /// - N bytes: key data
843    /// - 8 bytes: value length (big-endian u64, 0 if no value)
844    /// - M bytes: value data (only if length > 0)
845    pub(crate) async fn append_to_wal(
846        &self,
847        entries: Vec<(Entry, String, Bytes, Option<Bytes>, u64)>,
848    ) -> Result<(), Error> {
849        if entries.is_empty() {
850            return Ok(());
851        }
852
853        let wal_path = self.data_dir.join("wal.log");
854
855        let mut file =
856            OpenOptions::new().write(true).create(true).append(true).open(&wal_path).await?;
857
858        // Pre-allocate buffer with estimated size
859        let estimated_size: usize = entries
860            .iter()
861            .map(|(_, _, key, value, _)| {
862                8 + 8 + 1 + 8 + key.len() + 8 + value.as_ref().map_or(0, |v| v.len()) + 8
863            })
864            .sum();
865
866        // Single batched write instead of multiple small writes
867        let mut batch_buffer = Vec::with_capacity(estimated_size);
868
869        for (entry, operation, key, value, ttl_secs) in entries {
870            // Write entry index and term (16 bytes total)
871            batch_buffer.extend_from_slice(&entry.index.to_be_bytes());
872            batch_buffer.extend_from_slice(&entry.term.to_be_bytes());
873
874            // Write operation code (1 byte)
875            let op_code = WalOpCode::from_str(&operation);
876            batch_buffer.push(op_code as u8);
877
878            // Write key length and data (8 + N bytes)
879            batch_buffer.extend_from_slice(&(key.len() as u64).to_be_bytes());
880            batch_buffer.extend_from_slice(&key);
881
882            // Write value length and data (8 + M bytes)
883            // Always write length field for consistent format
884            if let Some(value_data) = value {
885                batch_buffer.extend_from_slice(&(value_data.len() as u64).to_be_bytes());
886                batch_buffer.extend_from_slice(&value_data);
887            } else {
888                // Write 0 length for operations without value
889                batch_buffer.extend_from_slice(&0u64.to_be_bytes());
890            }
891
892            // Write absolute expiration time (8 bytes) - 0 means no TTL
893            // Store UNIX timestamp (seconds since epoch) for crash-safe expiration
894            let expire_at_secs = if ttl_secs > 0 {
895                let expire_at =
896                    std::time::SystemTime::now() + std::time::Duration::from_secs(ttl_secs);
897                expire_at
898                    .duration_since(std::time::UNIX_EPOCH)
899                    .map(|d| d.as_secs())
900                    .unwrap_or(0)
901            } else {
902                0
903            };
904            batch_buffer.extend_from_slice(&expire_at_secs.to_be_bytes());
905        }
906
907        file.write_all(&batch_buffer).await?;
908        file.flush().await?;
909
910        Ok(())
911    }
912
913    /// Piggyback cleanup: Remove expired keys with time budget
914    ///
915    /// This method is called during apply_chunk to cleanup expired keys
916    /// opportunistically (piggyback on existing Raft events).
917    ///
918    /// # Arguments
919    /// * `max_duration_ms` - Maximum time budget for cleanup (milliseconds)
920    ///
921    /// # Returns
922    /// Number of keys deleted
923    ///
924    /// # Performance
925    /// - Fast-path: ~10ns if no TTL keys exist (lazy activation check)
926    /// - Cleanup: O(log N + K) where K = expired keys
927    /// - Time-bounded: stops after max_duration_ms to avoid blocking Raft
928    ///
929    /// # Checkpoint
930    /// Persist memory to disk and clear WAL.
931    /// This is the "safe point" after which WAL is no longer needed.
932    #[allow(unused)]
933    pub(crate) async fn checkpoint(&self) -> Result<(), Error> {
934        // 1. Persist current state
935        self.persist_data_async().await?;
936        self.persist_metadata_async().await?;
937
938        // 2. Clear WAL (data is now in state.data)
939        self.clear_wal_async().await?;
940
941        Ok(())
942    }
943}
944
945impl Drop for FileStateMachine {
946    fn drop(&mut self) {
947        let timer = Instant::now();
948
949        // Save state into local database including flush operation
950        match self.save_hard_state() {
951            Ok(_) => debug!("StateMachine saved in {:?}", timer.elapsed()),
952            Err(e) => error!("Failed to save StateMachine: {}", e),
953        }
954    }
955}
956
957#[async_trait]
958impl StateMachine for FileStateMachine {
    async fn start(&self) -> Result<(), Error> {
        // Mark the machine as running before recovery work so is_running()
        // reflects startup immediately.
        self.running.store(true, Ordering::SeqCst);

        // Restore persisted lease/TTL state when the lease feature is configured.
        if self.lease.is_some() {
            self.load_lease_data().await?;
            debug!("Lease data loaded during state machine initialization");
        }

        info!("File state machine started");
        Ok(())
    }
971
972    fn stop(&self) -> Result<(), Error> {
973        // Ensure all data is flushed to disk before stopping
974        self.running.store(false, Ordering::SeqCst);
975
976        // Graceful shutdown: persist TTL state to disk
977        // This ensures lease data survives across restarts
978        if let Some(ref lease) = self.lease {
979            let ttl_snapshot = lease.to_snapshot();
980            let ttl_path = self.data_dir.join("ttl_state.bin");
981            // Use blocking write since stop() is sync
982            std::fs::write(&ttl_path, ttl_snapshot)
983                .map_err(d_engine_core::StorageError::IoError)?;
984            debug!("Persisted TTL state on shutdown");
985        }
986
987        info!("File state machine stopped");
988        Ok(())
989    }
990
    fn is_running(&self) -> bool {
        // SeqCst load pairs with the stores in start()/stop().
        self.running.load(Ordering::SeqCst)
    }
994
    fn get(
        &self,
        key_buffer: &[u8],
    ) -> Result<Option<Bytes>, Error> {
        // No expiration check on the read path: expired keys are removed by the
        // background cleanup worker (`lease_background_cleanup`), which keeps
        // this hot path to a read-lock + clone. A just-expired but not-yet-swept
        // key is therefore still visible here — presumably acceptable for the
        // background strategy; confirm for the lazy strategy.
        let data = self.data.read();
        Ok(data.get(key_buffer).map(|(value, _)| value.clone()))
    }
1006
    fn entry_term(
        &self,
        entry_id: u64,
    ) -> Option<u64> {
        // NOTE(review): `data` maps key -> (value, term) — apply_chunk inserts
        // `(value, entry.term)` — yet this scan binds the second tuple field as
        // `index` and compares it against `entry_id`. As written this returns
        // `Some(entry_id)` iff any stored value carries a *term* equal to
        // `entry_id`; it cannot return "the term of the entry at this index"
        // because indexes are not stored. Confirm the intended semantics.
        let data = self.data.read();
        data.values().find(|(_, index)| *index == entry_id).map(|(_, term)| *term)
    }
1014
    /// Thread-safe: called serially by single-task CommitHandler
    ///
    /// Applies one chunk of committed Raft entries in four phases:
    /// 1. decode every entry up front (no locks held),
    /// 2. append all decoded operations to the WAL in one batched write,
    /// 3. apply them to the in-memory map under a short write lock (no awaits
    ///    while the lock is held),
    /// 4. advance last_applied, persist state/metadata, then clear the WAL.
    async fn apply_chunk(
        &self,
        chunk: Vec<Entry>,
    ) -> Result<(), Error> {
        let mut highest_index_entry: Option<LogId> = None;
        // One (entry, op tag, key, optional value, ttl_secs) tuple per decoded op.
        let mut batch_operations = Vec::new();

        // PHASE 1: Decode all operations and prepare WAL entries
        for entry in chunk {
            let entry_index = entry.index;

            assert!(entry.payload.is_some(), "Entry payload should not be None!");

            // Ensure entries are processed in order (strictly increasing index).
            if let Some(prev) = &highest_index_entry {
                assert!(
                    entry.index > prev.index,
                    "apply_chunk: received unordered entry at index {} (prev={})",
                    entry.index,
                    prev.index
                );
            }
            highest_index_entry = Some(LogId {
                index: entry.index,
                term: entry.term,
            });

            // Decode operations without holding locks
            match entry.payload.as_ref().unwrap().payload.as_ref() {
                Some(Payload::Noop(_)) => {
                    debug!("Handling NOOP command at index {}", entry.index);
                    batch_operations.push((entry, "NOOP", Bytes::new(), None, 0));
                }
                Some(Payload::Command(bytes)) => match WriteCommand::decode(&bytes[..]) {
                    Ok(write_cmd) => {
                        // Extract operation data for batch processing
                        match write_cmd.operation {
                            Some(Operation::Insert(Insert {
                                key,
                                value,
                                ttl_secs,
                            })) => {
                                batch_operations.push((
                                    entry,
                                    "INSERT",
                                    key,
                                    Some(value),
                                    ttl_secs,
                                ));
                            }
                            Some(Operation::Delete(Delete { key })) => {
                                batch_operations.push((entry, "DELETE", key, None, 0));
                            }
                            None => {
                                // Tolerate an empty command: record it as NOOP so the
                                // entry still advances last_applied.
                                warn!("WriteCommand without operation at index {}", entry.index);
                                batch_operations.push((entry, "NOOP", Bytes::new(), None, 0));
                            }
                        }
                    }
                    Err(e) => {
                        // Undecodable command: abort the whole chunk (nothing written
                        // to the WAL yet at this point).
                        error!(
                            "Failed to decode WriteCommand at index {}: {:?}",
                            entry.index, e
                        );
                        return Err(StorageError::SerializationError(e.to_string()).into());
                    }
                },
                Some(Payload::Config(_config_change)) => {
                    // Membership changes are handled elsewhere; recorded only so the
                    // entry participates in WAL/last_applied bookkeeping.
                    debug!("Ignoring config change at index {}", entry.index);
                    batch_operations.push((entry, "CONFIG", Bytes::new(), None, 0));
                }
                None => panic!("Entry payload variant should not be None!"),
            }

            info!("COMMITTED_LOG_METRIC: {}", entry_index);
        }

        // PHASE 2: Batch WAL writes (minimize I/O latency)
        // NOTE(review): this clones every entry/key/value solely to hand owned
        // data to append_to_wal while keeping batch_operations for PHASE 3 —
        // consider a borrow-taking WAL API if this shows up in profiles.
        let mut wal_entries = Vec::new();
        for (entry, operation, key, value, ttl_secs) in &batch_operations {
            // Prepare WAL data without immediate I/O - include TTL for crash recovery
            wal_entries.push((
                entry.clone(),
                operation.to_string(),
                key.clone(),
                value.clone(),
                *ttl_secs, // ttl_secs is now u64 (0 = no TTL) from protobuf
            ));
        }

        // Single batch WAL write (reduces I/O overhead)
        self.append_to_wal(wal_entries).await?;

        // PHASE 3: Fast in-memory updates with minimal lock time (ZERO-COPY)
        {
            let mut data = self.data.write();

            // Process all operations without any awaits inside the lock
            for (entry, operation, key, value, ttl_secs) in batch_operations {
                match operation {
                    "INSERT" => {
                        if let Some(value) = value {
                            // ZERO-COPY: Use existing Bytes without cloning if possible
                            data.insert(key.clone(), (value, entry.term));

                            // Register lease if TTL specified
                            if ttl_secs > 0 {
                                // Validate lease is enabled before accepting TTL requests.
                                // NOTE(review): erroring out here leaves earlier entries
                                // of this chunk applied in memory, with the WAL already
                                // written but last_applied NOT advanced — confirm the
                                // recovery/replay path tolerates this partial state.
                                if !self.lease_enabled {
                                    return Err(StorageError::FeatureNotEnabled(
                                        "TTL feature is not enabled on this server. \
                                         Enable it in config: [raft.state_machine.lease] enabled = true".into()
                                    ).into());
                                }

                                // Safety: lease_enabled invariant ensures lease.is_some()
                                let lease = unsafe { self.lease.as_ref().unwrap_unchecked() };
                                lease.register(key, ttl_secs);
                            }
                        }
                    }
                    "DELETE" => {
                        data.remove(&key);
                        if let Some(ref lease) = self.lease {
                            lease.unregister(&key);
                        }
                    }
                    "NOOP" | "CONFIG" => {
                        // No data modification needed
                    }
                    _ => warn!("Unknown operation: {}", operation),
                }
            }
        } // Lock released immediately - no awaits inside!

        // PHASE 4: Update last applied index
        // Note: Lease cleanup is now handled by:
        // - Lazy strategy: cleanup in get() method
        // - Background strategy: dedicated async task
        // This avoids blocking the Raft apply hot path

        if let Some(log_id) = highest_index_entry {
            debug!("State machine - updated last_applied: {:?}", log_id);
            self.update_last_applied(log_id);
        }

        // Persist changes to disk, then clear the WAL (the checkpoint pattern:
        // the WAL is only needed until state.data/metadata.bin are durable).
        self.persist_data_async().await?;
        self.persist_metadata_async().await?;
        self.clear_wal_async().await?;

        Ok(())
    }
1169
1170    fn len(&self) -> usize {
1171        self.data.read().len()
1172    }
1173
    fn update_last_applied(
        &self,
        last_applied: LogId,
    ) {
        // Two independent SeqCst stores: a concurrent reader can observe the new
        // index paired with the stale term (the pair is not updated atomically).
        // Writers are serialized through the single-task CommitHandler, so this
        // is presumably benign — confirm if readers run concurrently.
        self.last_applied_index.store(last_applied.index, Ordering::SeqCst);
        self.last_applied_term.store(last_applied.term, Ordering::SeqCst);
    }
1181
    fn last_applied(&self) -> LogId {
        // Two separate loads: may pair a fresh index with a stale term if it
        // races with update_last_applied (see the note there).
        LogId {
            index: self.last_applied_index.load(Ordering::SeqCst),
            term: self.last_applied_term.load(Ordering::SeqCst),
        }
    }
1188
    fn persist_last_applied(
        &self,
        last_applied: LogId,
    ) -> Result<(), Error> {
        // Update the in-memory atomics first, then synchronously write
        // metadata.bin so the position survives a restart.
        self.update_last_applied(last_applied);
        self.persist_metadata()
    }
1196
1197    fn update_last_snapshot_metadata(
1198        &self,
1199        snapshot_metadata: &SnapshotMetadata,
1200    ) -> Result<(), Error> {
1201        *self.last_snapshot_metadata.write() = Some(snapshot_metadata.clone());
1202        Ok(())
1203    }
1204
1205    fn snapshot_metadata(&self) -> Option<SnapshotMetadata> {
1206        self.last_snapshot_metadata.read().clone()
1207    }
1208
    fn persist_last_snapshot_metadata(
        &self,
        snapshot_metadata: &SnapshotMetadata,
    ) -> Result<(), Error> {
        // NOTE(review): despite the name, this only updates the in-memory cache
        // via update_last_snapshot_metadata — nothing is written to disk here.
        // Confirm whether snapshot metadata is meant to be durable.
        self.update_last_snapshot_metadata(snapshot_metadata)
    }
1215
1216    async fn apply_snapshot_from_file(
1217        &self,
1218        metadata: &SnapshotMetadata,
1219        snapshot_dir: std::path::PathBuf,
1220    ) -> Result<(), Error> {
1221        info!("Applying snapshot from file: {:?}", snapshot_dir);
1222
1223        // Read from the snapshot.bin file inside the directory
1224        let snapshot_data_path = snapshot_dir.join("snapshot.bin");
1225        let mut file = File::open(snapshot_data_path).await?;
1226        let mut buffer = Vec::new();
1227        file.read_to_end(&mut buffer).await?;
1228
1229        // Parse snapshot data
1230        let mut pos = 0;
1231        let mut new_data = HashMap::new();
1232
1233        while pos < buffer.len() {
1234            // Read key length
1235            if pos + 8 > buffer.len() {
1236                break;
1237            }
1238
1239            let key_len_bytes = &buffer[pos..pos + 8];
1240            let key_len = u64::from_be_bytes([
1241                key_len_bytes[0],
1242                key_len_bytes[1],
1243                key_len_bytes[2],
1244                key_len_bytes[3],
1245                key_len_bytes[4],
1246                key_len_bytes[5],
1247                key_len_bytes[6],
1248                key_len_bytes[7],
1249            ]) as usize;
1250
1251            pos += 8;
1252
1253            // Read key
1254            if pos + key_len > buffer.len() {
1255                break;
1256            }
1257
1258            let key = Bytes::from(buffer[pos..pos + key_len].to_vec());
1259            pos += key_len;
1260
1261            // Read value length
1262            if pos + 8 > buffer.len() {
1263                break;
1264            }
1265
1266            let value_len_bytes = &buffer[pos..pos + 8];
1267            let value_len = u64::from_be_bytes([
1268                value_len_bytes[0],
1269                value_len_bytes[1],
1270                value_len_bytes[2],
1271                value_len_bytes[3],
1272                value_len_bytes[4],
1273                value_len_bytes[5],
1274                value_len_bytes[6],
1275                value_len_bytes[7],
1276            ]) as usize;
1277
1278            pos += 8;
1279
1280            // Read value
1281            if pos + value_len > buffer.len() {
1282                break;
1283            }
1284
1285            let value = Bytes::from(buffer[pos..pos + value_len].to_vec());
1286            pos += value_len;
1287
1288            // Read term
1289            if pos + 8 > buffer.len() {
1290                break;
1291            }
1292
1293            let term_bytes = &buffer[pos..pos + 8];
1294            let term = u64::from_be_bytes([
1295                term_bytes[0],
1296                term_bytes[1],
1297                term_bytes[2],
1298                term_bytes[3],
1299                term_bytes[4],
1300                term_bytes[5],
1301                term_bytes[6],
1302                term_bytes[7],
1303            ]);
1304
1305            pos += 8;
1306
1307            // Add to new data
1308            new_data.insert(key, (value, term));
1309        }
1310
1311        // Read and reload lease data if present
1312        if pos + 8 <= buffer.len() {
1313            let ttl_len_bytes = &buffer[pos..pos + 8];
1314            let ttl_len = u64::from_be_bytes([
1315                ttl_len_bytes[0],
1316                ttl_len_bytes[1],
1317                ttl_len_bytes[2],
1318                ttl_len_bytes[3],
1319                ttl_len_bytes[4],
1320                ttl_len_bytes[5],
1321                ttl_len_bytes[6],
1322                ttl_len_bytes[7],
1323            ]) as usize;
1324            pos += 8;
1325
1326            if pos + ttl_len <= buffer.len() {
1327                let ttl_data = &buffer[pos..pos + ttl_len];
1328                if let Some(ref lease) = self.lease {
1329                    lease.reload(ttl_data)?;
1330                }
1331            }
1332        }
1333
1334        // Atomically replace the data
1335        {
1336            let mut data = self.data.write();
1337            *data = new_data;
1338        }
1339
1340        // Update metadata
1341        *self.last_snapshot_metadata.write() = Some(metadata.clone());
1342
1343        if let Some(last_included) = &metadata.last_included {
1344            self.update_last_applied(*last_included);
1345        }
1346
1347        // Persist to disk
1348        self.persist_data_async().await?;
1349        self.persist_metadata_async().await?;
1350        self.clear_wal_async().await?;
1351
1352        info!("Snapshot applied successfully");
1353        Ok(())
1354    }
1355
1356    async fn generate_snapshot_data(
1357        &self,
1358        new_snapshot_dir: std::path::PathBuf,
1359        last_included: LogId,
1360    ) -> Result<Bytes, Error> {
1361        info!("Generating snapshot data up to {:?}", last_included);
1362
1363        // Create snapshot directory
1364        fs::create_dir_all(&new_snapshot_dir).await?;
1365
1366        // Create snapshot file
1367        let snapshot_path = new_snapshot_dir.join("snapshot.bin");
1368        let mut file = File::create(&snapshot_path).await?;
1369
1370        let data_copy: HashMap<Bytes, (Bytes, u64)> = {
1371            let data = self.data.read();
1372            data.iter().map(|(k, (v, t))| (k.clone(), (v.clone(), *t))).collect()
1373        };
1374
1375        // Write data in the same format as the data file
1376        for (key, (value, term)) in data_copy.iter() {
1377            // Write key length (8 bytes)
1378            let key_len = key.len() as u64;
1379            file.write_all(&key_len.to_be_bytes()).await?;
1380
1381            // Write key
1382            file.write_all(key).await?;
1383
1384            // Write value length (8 bytes)
1385            let value_len = value.len() as u64;
1386            file.write_all(&value_len.to_be_bytes()).await?;
1387
1388            // Write value
1389            file.write_all(value).await?;
1390
1391            // Write term (8 bytes)
1392            file.write_all(&term.to_be_bytes()).await?;
1393        }
1394
1395        // Write lease state if configured
1396        let lease_snapshot = if let Some(ref lease) = self.lease {
1397            lease.to_snapshot()
1398        } else {
1399            Vec::new()
1400        };
1401
1402        // Write lease data length
1403        let lease_len = lease_snapshot.len() as u64;
1404        file.write_all(&lease_len.to_be_bytes()).await?;
1405
1406        // Write lease data
1407        file.write_all(&lease_snapshot).await?;
1408
1409        file.flush().await?;
1410
1411        // Update metadata
1412        let metadata = SnapshotMetadata {
1413            last_included: Some(last_included),
1414            checksum: Bytes::from(vec![0; 32]), // Simple checksum for demo
1415        };
1416
1417        self.update_last_snapshot_metadata(&metadata)?;
1418
1419        info!("Snapshot generated at {:?}", snapshot_path);
1420
1421        // Return dummy checksum
1422        Ok(Bytes::from_static(&[0u8; 32]))
1423    }
1424
1425    fn save_hard_state(&self) -> Result<(), Error> {
1426        let last_applied = self.last_applied();
1427        self.persist_last_applied(last_applied)?;
1428
1429        if let Some(last_snapshot_metadata) = self.snapshot_metadata() {
1430            self.persist_last_snapshot_metadata(&last_snapshot_metadata)?;
1431        }
1432
1433        self.flush()?;
1434        Ok(())
1435    }
1436
    fn flush(&self) -> Result<(), Error> {
        // Synchronous flush used on shutdown paths (e.g. Drop via save_hard_state).
        self.persist_data()?;
        self.persist_metadata()?;
        // NOTE(review): WAL clearing is disabled here — apply_chunk and
        // checkpoint() clear the WAL after persisting. Confirm the line below
        // can be deleted outright.
        // self.clear_wal()?;
        Ok(())
    }
1443
    async fn flush_async(&self) -> Result<(), Error> {
        // Async counterpart of flush(); same persistence, same disabled WAL clear.
        self.persist_data_async().await?;
        self.persist_metadata_async().await?;
        // NOTE(review): intentionally (?) disabled — see flush(); confirm removal.
        // self.clear_wal_async().await?;
        Ok(())
    }
1450
    async fn reset(&self) -> Result<(), Error> {
        // Delegates to `reset`: inherent methods win method resolution over trait
        // methods, so this calls FileStateMachine's inherent `reset` if one
        // exists elsewhere in the file.
        // NOTE(review): if there is NO inherent `reset`, this call resolves to
        // itself and recurses forever — confirm the inherent method exists.
        self.reset().await
    }
1454
1455    async fn lease_background_cleanup(&self) -> Result<Vec<Bytes>, Error> {
1456        // Fast path: no lease configured
1457        let Some(ref lease) = self.lease else {
1458            return Ok(vec![]);
1459        };
1460
1461        // Get all expired keys
1462        let now = SystemTime::now();
1463        let expired_keys = lease.get_expired_keys(now);
1464
1465        if expired_keys.is_empty() {
1466            return Ok(vec![]);
1467        }
1468
1469        debug!(
1470            "Lease background cleanup: found {} expired keys",
1471            expired_keys.len()
1472        );
1473
1474        // Delete expired keys from storage
1475        {
1476            let mut data = self.data.write();
1477            for key in &expired_keys {
1478                data.remove(key);
1479            }
1480        }
1481
1482        // Persist to disk after batch deletion
1483        if let Err(e) = self.persist_data() {
1484            error!("Failed to persist after background cleanup: {:?}", e);
1485        }
1486
1487        info!(
1488            "Lease background cleanup: deleted {} expired keys",
1489            expired_keys.len()
1490        );
1491
1492        Ok(expired_keys)
1493    }
1494}