d_engine/storage/adaptors/file/
file_state_machine.rs

1use std::collections::HashMap;
2use std::io::Write;
3use std::path::PathBuf;
4use std::sync::atomic::AtomicBool;
5use std::sync::atomic::AtomicU64;
6use std::sync::atomic::Ordering;
7
8use bytes::Bytes;
9use parking_lot::RwLock;
10use prost::Message;
11use tokio::fs;
12use tokio::fs::File;
13use tokio::fs::OpenOptions;
14use tokio::io::AsyncReadExt;
15use tokio::io::AsyncWriteExt;
16use tokio::time::Instant;
17use tonic::async_trait;
18use tracing::debug;
19use tracing::error;
20use tracing::info;
21use tracing::trace;
22use tracing::warn;
23
24use crate::proto::client::write_command::Delete;
25use crate::proto::client::write_command::Insert;
26use crate::proto::client::write_command::Operation;
27use crate::proto::client::WriteCommand;
28use crate::proto::common::entry_payload::Payload;
29use crate::proto::common::Entry;
30use crate::proto::common::LogId;
31use crate::proto::storage::SnapshotMetadata;
32use crate::Error;
33use crate::StateMachine;
34use crate::StorageError;
35
/// In-memory key-value table of the state machine: every key maps to a
/// `(value, term)` pair, and the whole map sits behind a read-write lock.
type FileStateMachineDataType = RwLock<HashMap<Bytes, (Bytes, u64)>>;
37
/// One-byte operation tag written into every WAL record.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum WalOpCode {
    Noop = 0,
    Insert = 1,
    Delete = 2,
    Config = 3,
}

impl WalOpCode {
    /// Translates an operation name into its tag.
    ///
    /// The comparison is exact (case-sensitive); any name other than
    /// `"INSERT"`, `"DELETE"` or `"CONFIG"` yields [`WalOpCode::Noop`].
    fn from_str(s: &str) -> Self {
        if s == "INSERT" {
            Self::Insert
        } else if s == "DELETE" {
            Self::Delete
        } else if s == "CONFIG" {
            Self::Config
        } else {
            Self::Noop
        }
    }

    /// Decodes a tag byte read back from the WAL.
    ///
    /// Bytes that do not match a known discriminant (including `0`) decode
    /// to [`WalOpCode::Noop`].
    fn from_u8(byte: u8) -> Self {
        const NON_NOOP: [WalOpCode; 3] =
            [WalOpCode::Insert, WalOpCode::Delete, WalOpCode::Config];

        NON_NOOP
            .iter()
            .copied()
            .find(|op| *op as u8 == byte)
            .unwrap_or(Self::Noop)
    }
}
67
/// File-based state machine implementation with persistence
///
/// The key-value contents live in an in-memory map that serves reads, and are
/// mirrored to files inside `data_dir`:
/// - `state.data`: the full key-value contents
/// - `metadata.bin`: last applied index and term
/// - `wal.log`: write-ahead log of operations, replayed on startup
#[derive(Debug)]
pub struct FileStateMachine {
    // In-memory key-value table; every entry holds (value, term)
    data: FileStateMachineDataType,

    // Index and term of the last applied log entry
    last_applied_index: AtomicU64,
    last_applied_term: AtomicU64,
    // Most recently recorded snapshot metadata, if any
    last_snapshot_metadata: RwLock<Option<SnapshotMetadata>>,

    // Whether the state machine is currently flagged as running
    running: AtomicBool,

    // Directory that holds the persisted files
    data_dir: PathBuf,
}
95
96impl FileStateMachine {
97    /// Creates a new file-based state machine with persistence
98    ///
99    /// # Arguments
100    /// * `data_dir` - Directory where data files will be stored
101    /// * `node_id` - Unique identifier for this node
102    ///
103    /// # Returns
104    /// Result containing the initialized FileStateMachine
105    pub async fn new(data_dir: PathBuf) -> Result<Self, Error> {
106        // Ensure data directory exists
107        fs::create_dir_all(&data_dir).await?;
108
109        let machine = Self {
110            data: RwLock::new(HashMap::new()),
111            last_applied_index: AtomicU64::new(0),
112            last_applied_term: AtomicU64::new(0),
113            last_snapshot_metadata: RwLock::new(None),
114            running: AtomicBool::new(true),
115            data_dir: data_dir.clone(),
116        };
117
118        // Load existing data from disk
119        machine.load_from_disk().await?;
120
121        Ok(machine)
122    }
123
124    /// Loads state machine data from disk files
125    async fn load_from_disk(&self) -> Result<(), Error> {
126        // Load last applied index and term from metadata file
127        self.load_metadata().await?;
128
129        // Load key-value data from data file
130        self.load_data().await?;
131
132        // Replay write-ahead log for crash recovery
133        self.replay_wal().await?;
134
135        info!("Loaded state machine data from disk");
136        Ok(())
137    }
138
139    /// Loads metadata from disk
140    async fn load_metadata(&self) -> Result<(), Error> {
141        let metadata_path = self.data_dir.join("metadata.bin");
142        if !metadata_path.exists() {
143            return Ok(());
144        }
145
146        let mut file = File::open(metadata_path).await?;
147        let mut buffer = [0u8; 16];
148
149        if file.read_exact(&mut buffer).await.is_ok() {
150            let index = u64::from_be_bytes([
151                buffer[0], buffer[1], buffer[2], buffer[3], buffer[4], buffer[5], buffer[6],
152                buffer[7],
153            ]);
154
155            let term = u64::from_be_bytes([
156                buffer[8], buffer[9], buffer[10], buffer[11], buffer[12], buffer[13], buffer[14],
157                buffer[15],
158            ]);
159
160            self.last_applied_index.store(index, Ordering::SeqCst);
161            self.last_applied_term.store(term, Ordering::SeqCst);
162        }
163
164        Ok(())
165    }
166
167    /// Loads key-value data from disk
168    async fn load_data(&self) -> Result<(), Error> {
169        let data_path = self.data_dir.join("state.data");
170        if !data_path.exists() {
171            return Ok(());
172        }
173
174        let mut file = File::open(data_path).await?;
175        let mut buffer = Vec::new();
176        file.read_to_end(&mut buffer).await?;
177
178        let mut pos = 0;
179        let mut data = self.data.write();
180
181        while pos < buffer.len() {
182            // Read key length
183            if pos + 8 > buffer.len() {
184                break;
185            }
186
187            let key_len_bytes = &buffer[pos..pos + 8];
188            let key_len = u64::from_be_bytes([
189                key_len_bytes[0],
190                key_len_bytes[1],
191                key_len_bytes[2],
192                key_len_bytes[3],
193                key_len_bytes[4],
194                key_len_bytes[5],
195                key_len_bytes[6],
196                key_len_bytes[7],
197            ]) as usize;
198
199            pos += 8;
200
201            // Read key
202            if pos + key_len > buffer.len() {
203                break;
204            }
205
206            let key = Bytes::from(buffer[pos..pos + key_len].to_vec());
207            pos += key_len;
208
209            // Read value length
210            if pos + 8 > buffer.len() {
211                break;
212            }
213
214            let value_len_bytes = &buffer[pos..pos + 8];
215            let value_len = u64::from_be_bytes([
216                value_len_bytes[0],
217                value_len_bytes[1],
218                value_len_bytes[2],
219                value_len_bytes[3],
220                value_len_bytes[4],
221                value_len_bytes[5],
222                value_len_bytes[6],
223                value_len_bytes[7],
224            ]) as usize;
225
226            pos += 8;
227
228            // Read value
229            if pos + value_len > buffer.len() {
230                break;
231            }
232
233            let value = Bytes::from(buffer[pos..pos + value_len].to_vec());
234            pos += value_len;
235
236            // Read term
237            if pos + 8 > buffer.len() {
238                break;
239            }
240
241            let term_bytes = &buffer[pos..pos + 8];
242            let term = u64::from_be_bytes([
243                term_bytes[0],
244                term_bytes[1],
245                term_bytes[2],
246                term_bytes[3],
247                term_bytes[4],
248                term_bytes[5],
249                term_bytes[6],
250                term_bytes[7],
251            ]);
252
253            pos += 8;
254
255            // Store in memory
256            data.insert(key, (value, term));
257        }
258
259        Ok(())
260    }
261
262    /// Replays write-ahead log for crash recovery
263    async fn replay_wal(&self) -> Result<(), Error> {
264        let wal_path = self.data_dir.join("wal.log");
265        if !wal_path.exists() {
266            debug!("No WAL file found, skipping replay");
267            return Ok(());
268        }
269
270        let mut file = File::open(wal_path).await?;
271        let mut buffer = Vec::new();
272        file.read_to_end(&mut buffer).await?;
273
274        if buffer.is_empty() {
275            debug!("WAL file is empty, skipping replay");
276            return Ok(());
277        }
278
279        let mut pos = 0;
280        let mut operations = Vec::new();
281        let mut replayed_count = 0;
282
283        while pos + 17 < buffer.len() {
284            // Read entry index (8 bytes)
285            let _index = u64::from_be_bytes(buffer[pos..pos + 8].try_into().unwrap());
286            pos += 8;
287
288            // Read entry term (8 bytes)
289            let term = u64::from_be_bytes(buffer[pos..pos + 8].try_into().unwrap());
290            pos += 8;
291
292            // Read operation code (1 byte)
293            let op_code = WalOpCode::from_u8(buffer[pos]);
294            pos += 1;
295
296            // Check if we have enough bytes for key length
297            if pos + 8 > buffer.len() {
298                warn!("Incomplete key length at position {}, stopping replay", pos);
299                break;
300            }
301
302            // Read key length (8 bytes)
303            let key_len = u64::from_be_bytes(buffer[pos..pos + 8].try_into().unwrap()) as usize;
304            pos += 8;
305
306            // Check if we have enough data for the key
307            if pos + key_len > buffer.len() {
308                warn!(
309                    "Incomplete key data at position {} (need {} bytes, have {})",
310                    pos,
311                    key_len,
312                    buffer.len() - pos
313                );
314                break;
315            }
316
317            // Read key
318            let key = Bytes::from(buffer[pos..pos + key_len].to_vec());
319            pos += key_len;
320
321            // Check if we have enough bytes for value length
322            if pos + 8 > buffer.len() {
323                warn!(
324                    "Incomplete value length at position {}, stopping replay",
325                    pos
326                );
327                break;
328            }
329
330            // Read value length (8 bytes)
331            let value_len = u64::from_be_bytes(buffer[pos..pos + 8].try_into().unwrap()) as usize;
332            pos += 8;
333
334            // Read value if present
335            let value = if value_len > 0 {
336                if pos + value_len > buffer.len() {
337                    warn!("Incomplete value data at position {}, stopping replay", pos);
338                    break;
339                }
340                let value_data = Bytes::from(buffer[pos..pos + value_len].to_vec());
341                pos += value_len;
342                Some(value_data)
343            } else {
344                None
345            };
346
347            operations.push((op_code, key, value, term));
348            replayed_count += 1;
349        }
350
351        info!(
352            "Parsed {} WAL operations, applying to memory",
353            operations.len()
354        );
355
356        // Apply all collected operations with a single lock acquisition
357        let mut applied_count = 0;
358        {
359            let mut data = self.data.write();
360            for (op_code, key, value, term) in operations {
361                match op_code {
362                    WalOpCode::Insert => {
363                        if let Some(value_data) = value {
364                            data.insert(key, (value_data, term));
365                            applied_count += 1;
366                            debug!("Applied INSERT");
367                        } else {
368                            warn!("INSERT operation without value");
369                        }
370                    }
371                    WalOpCode::Delete => {
372                        data.remove(&key);
373                        applied_count += 1;
374                        debug!("Replayed DELETE: key={:?}", key);
375                    }
376                    WalOpCode::Noop | WalOpCode::Config => {
377                        // No data modification needed
378                        applied_count += 1;
379                        debug!("Replayed {:?} operation", op_code);
380                    }
381                }
382            }
383        }
384
385        info!(
386            "WAL replay complete: {} operations replayed_count, {} operations applied",
387            replayed_count, applied_count
388        );
389
390        // Clear WAL only if replay was successful
391        if applied_count > 0 {
392            self.clear_wal_async().await?;
393            debug!(
394                "Cleared WAL after successful replay of {} operations",
395                applied_count
396            );
397        }
398
399        Ok(())
400    }
401
402    /// Persists key-value data to disk
403    fn persist_data(&self) -> Result<(), Error> {
404        // Collect data first to minimize lock time
405        let data_copy: HashMap<Bytes, (Bytes, u64)> = {
406            let data = self.data.read();
407            data.iter().map(|(k, (v, t))| (k.clone(), (v.clone(), *t))).collect()
408        };
409
410        // Write to file without holding data lock
411        let data_path = self.data_dir.join("state.data");
412        let mut file = std::fs::OpenOptions::new()
413            .write(true)
414            .create(true)
415            .truncate(true)
416            .open(data_path)?;
417
418        for (key, (value, term)) in data_copy.iter() {
419            // Write key length (8 bytes)
420            let key_len = key.len() as u64;
421            file.write_all(&key_len.to_be_bytes())?;
422
423            // Write key
424            file.write_all(key)?;
425
426            // Write value length (8 bytes)
427            let value_len = value.len() as u64;
428            file.write_all(&value_len.to_be_bytes())?;
429
430            // Write value
431            file.write_all(value)?;
432
433            // Write term (8 bytes)
434            file.write_all(&term.to_be_bytes())?;
435        }
436
437        file.flush()?;
438        Ok(())
439    }
440
441    /// Persists key-value data to disk
442    async fn persist_data_async(&self) -> Result<(), Error> {
443        // Collect data first to minimize lock time
444        let data_copy: HashMap<Bytes, (Bytes, u64)> = {
445            let data = self.data.read();
446            data.iter().map(|(k, (v, t))| (k.clone(), (v.clone(), *t))).collect()
447        };
448
449        // Write to file without holding data lock
450        let data_path = self.data_dir.join("state.data");
451        let mut file = OpenOptions::new()
452            .write(true)
453            .create(true)
454            .truncate(true)
455            .open(data_path)
456            .await?;
457
458        for (key, (value, term)) in data_copy.iter() {
459            // Write key length (8 bytes)
460            let key_len = key.len() as u64;
461            file.write_all(&key_len.to_be_bytes()).await?;
462
463            // Write key
464            file.write_all(key.as_ref()).await?;
465
466            // Write value length (8 bytes)
467            let value_len = value.len() as u64;
468            file.write_all(&value_len.to_be_bytes()).await?;
469
470            // Write value
471            file.write_all(value.as_ref()).await?;
472
473            // Write term (8 bytes)
474            file.write_all(&term.to_be_bytes()).await?;
475        }
476
477        file.flush().await?;
478        Ok(())
479    }
480
481    /// Persists metadata to disk
482    fn persist_metadata(&self) -> Result<(), Error> {
483        let metadata_path = self.data_dir.join("metadata.bin");
484        let mut file = std::fs::OpenOptions::new()
485            .write(true)
486            .create(true)
487            .truncate(true)
488            .open(metadata_path)?;
489
490        let index = self.last_applied_index.load(Ordering::SeqCst);
491        let term = self.last_applied_term.load(Ordering::SeqCst);
492
493        file.write_all(&index.to_be_bytes())?;
494        file.write_all(&term.to_be_bytes())?;
495
496        file.flush()?;
497        Ok(())
498    }
499
500    async fn persist_metadata_async(&self) -> Result<(), Error> {
501        let metadata_path = self.data_dir.join("metadata.bin");
502        let mut file = OpenOptions::new()
503            .write(true)
504            .create(true)
505            .truncate(true)
506            .open(metadata_path)
507            .await?;
508
509        let index = self.last_applied_index.load(Ordering::SeqCst);
510        let term = self.last_applied_term.load(Ordering::SeqCst);
511
512        file.write_all(&index.to_be_bytes()).await?;
513        file.write_all(&term.to_be_bytes()).await?;
514
515        file.flush().await?;
516        Ok(())
517    }
518
519    /// Clears the write-ahead log (called after successful persistence)
520    #[allow(unused)]
521    fn clear_wal(&self) -> Result<(), Error> {
522        let wal_path = self.data_dir.join("wal.log");
523        let mut file = std::fs::OpenOptions::new()
524            .write(true)
525            .create(true)
526            .truncate(true)
527            .open(wal_path)?;
528
529        file.set_len(0)?;
530        file.flush()?;
531        Ok(())
532    }
533
534    /// Clears the write-ahead log (called after successful persistence)
535    async fn clear_wal_async(&self) -> Result<(), Error> {
536        let wal_path = self.data_dir.join("wal.log");
537        let mut file = OpenOptions::new()
538            .write(true)
539            .create(true)
540            .truncate(true)
541            .open(wal_path)
542            .await?;
543
544        file.set_len(0).await?;
545        file.flush().await?;
546        Ok(())
547    }
548
549    /// Resets the state machine to its initial empty state
550    ///
551    /// This method:
552    /// 1. Clears all in-memory data
553    /// 2. Resets Raft state to initial values
554    /// 3. Clears all persisted files
555    /// 4. Maintains operational state (running status, node ID)
556    pub async fn reset(&self) -> Result<(), Error> {
557        info!("Resetting state machine");
558
559        // Clear in-memory data
560        {
561            let mut data = self.data.write();
562            data.clear();
563        }
564
565        // Reset Raft state
566        self.last_applied_index.store(0, Ordering::SeqCst);
567        self.last_applied_term.store(0, Ordering::SeqCst);
568
569        {
570            let mut snapshot_metadata = self.last_snapshot_metadata.write();
571            *snapshot_metadata = None;
572        }
573
574        // Clear all persisted files
575        self.clear_data_file().await?;
576        self.clear_metadata_file().await?;
577        self.clear_wal_async().await?;
578
579        info!("State machine reset completed");
580        Ok(())
581    }
582
583    /// Clears the data file
584    async fn clear_data_file(&self) -> Result<(), Error> {
585        let data_path = self.data_dir.join("state.data");
586        let mut file = OpenOptions::new()
587            .write(true)
588            .create(true)
589            .truncate(true)
590            .open(data_path)
591            .await?;
592
593        file.set_len(0).await?;
594        file.flush().await?;
595        Ok(())
596    }
597
598    /// Clears the metadata file
599    async fn clear_metadata_file(&self) -> Result<(), Error> {
600        let metadata_path = self.data_dir.join("metadata.bin");
601        let mut file = OpenOptions::new()
602            .write(true)
603            .create(true)
604            .truncate(true)
605            .open(metadata_path)
606            .await?;
607
608        // Write default values (0 for both index and term)
609        file.write_all(&0u64.to_be_bytes()).await?;
610        file.write_all(&0u64.to_be_bytes()).await?;
611
612        file.flush().await?;
613        Ok(())
614    }
615
616    /// Batch WAL writes with proper durability guarantees
617    ///
618    /// Format per entry:
619    /// - 8 bytes: entry index (big-endian u64)
620    /// - 8 bytes: entry term (big-endian u64)
621    /// - 1 byte: operation code (0=NOOP, 1=INSERT, 2=DELETE, 3=CONFIG)
622    /// - 8 bytes: key length (big-endian u64)
623    /// - N bytes: key data
624    /// - 8 bytes: value length (big-endian u64, 0 if no value)
625    /// - M bytes: value data (only if length > 0)
626    pub(crate) async fn append_to_wal(
627        &self,
628        entries: Vec<(Entry, String, Bytes, Option<Bytes>)>,
629    ) -> Result<(), Error> {
630        if entries.is_empty() {
631            return Ok(());
632        }
633
634        let wal_path = self.data_dir.join("wal.log");
635
636        let mut file =
637            OpenOptions::new().write(true).create(true).append(true).open(&wal_path).await?;
638
639        // Pre-allocate buffer with estimated size
640        let estimated_size: usize = entries
641            .iter()
642            .map(|(_, _, key, value)| {
643                8 + 8 + 1 + 8 + key.len() + 8 + value.as_ref().map_or(0, |v| v.len())
644            })
645            .sum();
646
647        // Single batched write instead of multiple small writes
648        let mut batch_buffer = Vec::with_capacity(estimated_size);
649
650        for (entry, operation, key, value) in entries {
651            // Write entry index and term (16 bytes total)
652            batch_buffer.extend_from_slice(&entry.index.to_be_bytes());
653            batch_buffer.extend_from_slice(&entry.term.to_be_bytes());
654
655            // Write operation code (1 byte)
656            let op_code = WalOpCode::from_str(&operation);
657            batch_buffer.push(op_code as u8);
658
659            // Write key length and data (8 + N bytes)
660            batch_buffer.extend_from_slice(&(key.len() as u64).to_be_bytes());
661            batch_buffer.extend_from_slice(&key);
662
663            // Write value length and data (8 + M bytes)
664            // Always write length field for consistent format
665            if let Some(value_data) = value {
666                batch_buffer.extend_from_slice(&(value_data.len() as u64).to_be_bytes());
667                batch_buffer.extend_from_slice(&value_data);
668            } else {
669                // Write 0 length for operations without value
670                batch_buffer.extend_from_slice(&0u64.to_be_bytes());
671            }
672        }
673
674        file.write_all(&batch_buffer).await?;
675        file.flush().await?;
676
677        Ok(())
678    }
679
680    /// Checkpoint: Persist memory to disk and clear WAL
681    /// This is the "safe point" after which WAL is no longer needed
682    #[allow(unused)]
683    pub(crate) async fn checkpoint(&self) -> Result<(), Error> {
684        // 1. Persist current state
685        self.persist_data_async().await?;
686        self.persist_metadata_async().await?;
687
688        // 2. Clear WAL (data is now in state.data)
689        self.clear_wal_async().await?;
690
691        Ok(())
692    }
693}
694
695impl Drop for FileStateMachine {
696    fn drop(&mut self) {
697        let timer = Instant::now();
698
699        // Save state into local database including flush operation
700        match self.save_hard_state() {
701            Ok(_) => debug!("StateMachine saved in {:?}", timer.elapsed()),
702            Err(e) => error!("Failed to save StateMachine: {}", e),
703        }
704    }
705}
706
707#[async_trait]
708impl StateMachine for FileStateMachine {
709    fn start(&self) -> Result<(), Error> {
710        self.running.store(true, Ordering::SeqCst);
711        info!("File state machine started");
712        Ok(())
713    }
714
715    fn stop(&self) -> Result<(), Error> {
716        // Ensure all data is flushed to disk before stopping
717        self.running.store(false, Ordering::SeqCst);
718        info!("File state machine stopped");
719        Ok(())
720    }
721
722    fn is_running(&self) -> bool {
723        self.running.load(Ordering::SeqCst)
724    }
725
726    fn get(
727        &self,
728        key_buffer: &[u8],
729    ) -> Result<Option<Bytes>, Error> {
730        let data = self.data.read();
731        Ok(data.get(key_buffer).map(|(value, _)| value.clone()))
732    }
733
734    fn entry_term(
735        &self,
736        entry_id: u64,
737    ) -> Option<u64> {
738        let data = self.data.read();
739        data.values().find(|(_, index)| *index == entry_id).map(|(_, term)| *term)
740    }
741
742    async fn apply_chunk(
743        &self,
744        chunk: Vec<Entry>,
745    ) -> Result<(), Error> {
746        trace!("Applying chunk: {:?}.", chunk);
747
748        let mut highest_index_entry: Option<LogId> = None;
749        let mut batch_operations = Vec::new();
750
751        // PHASE 1: Decode all operations and prepare WAL entries
752        for entry in chunk {
753            let entry_index = entry.index;
754
755            assert!(entry.payload.is_some(), "Entry payload should not be None!");
756
757            // Ensure entries are processed in order
758            if let Some(prev) = &highest_index_entry {
759                assert!(
760                    entry.index > prev.index,
761                    "apply_chunk: received unordered entry at index {} (prev={})",
762                    entry.index,
763                    prev.index
764                );
765            }
766            highest_index_entry = Some(LogId {
767                index: entry.index,
768                term: entry.term,
769            });
770
771            // Decode operations without holding locks
772            match entry.payload.as_ref().unwrap().payload.as_ref() {
773                Some(Payload::Noop(_)) => {
774                    debug!("Handling NOOP command at index {}", entry.index);
775                    batch_operations.push((entry, "NOOP", Bytes::new(), None));
776                }
777                Some(Payload::Command(bytes)) => match WriteCommand::decode(&bytes[..]) {
778                    Ok(write_cmd) => {
779                        // Extract operation data for batch processing
780                        match write_cmd.operation {
781                            Some(Operation::Insert(Insert { key, value })) => {
782                                batch_operations.push((entry, "INSERT", key, Some(value)));
783                            }
784                            Some(Operation::Delete(Delete { key })) => {
785                                batch_operations.push((entry, "DELETE", key, None));
786                            }
787                            None => {
788                                warn!("WriteCommand without operation at index {}", entry.index);
789                                batch_operations.push((entry, "NOOP", Bytes::new(), None));
790                            }
791                        }
792                    }
793                    Err(e) => {
794                        error!(
795                            "Failed to decode WriteCommand at index {}: {:?}",
796                            entry.index, e
797                        );
798                        return Err(StorageError::SerializationError(e.to_string()).into());
799                    }
800                },
801                Some(Payload::Config(_config_change)) => {
802                    debug!("Ignoring config change at index {}", entry.index);
803                    batch_operations.push((entry, "CONFIG", Bytes::new(), None));
804                }
805                None => panic!("Entry payload variant should not be None!"),
806            }
807
808            info!("COMMITTED_LOG_METRIC: {}", entry_index);
809        }
810
811        // PHASE 2: Batch WAL writes (minimize I/O latency)
812        let mut wal_entries = Vec::new();
813        for (entry, operation, key, value) in &batch_operations {
814            // Prepare WAL data without immediate I/O
815            wal_entries.push((
816                entry.clone(),
817                operation.to_string(),
818                key.clone(),
819                value.clone(),
820            ));
821        }
822
823        // Single batch WAL write (reduces I/O overhead)
824        self.append_to_wal(wal_entries).await?;
825
826        // PHASE 3: Fast in-memory updates with minimal lock time (ZERO-COPY)
827        {
828            let mut data = self.data.write();
829
830            // Process all operations without any awaits inside the lock
831            for (entry, operation, key, value) in batch_operations {
832                match operation {
833                    "INSERT" => {
834                        if let Some(value) = value {
835                            // ZERO-COPY: Use existing Bytes without cloning if possible
836                            data.insert(key, (value, entry.term));
837                        }
838                    }
839                    "DELETE" => {
840                        data.remove(&key);
841                    }
842                    "NOOP" | "CONFIG" => {
843                        // No data modification needed
844                    }
845                    _ => warn!("Unknown operation: {}", operation),
846                }
847            }
848        } // Lock released immediately - no awaits inside!
849
850        if let Some(log_id) = highest_index_entry {
851            debug!("State machine - updated last_applied: {:?}", log_id);
852            self.update_last_applied(log_id);
853        }
854
855        // Persist changes to disk and clear WAL
856        self.persist_data_async().await?;
857        self.persist_metadata_async().await?;
858        self.clear_wal_async().await?;
859
860        Ok(())
861    }
862
863    fn len(&self) -> usize {
864        self.data.read().len()
865    }
866
867    fn update_last_applied(
868        &self,
869        last_applied: LogId,
870    ) {
871        self.last_applied_index.store(last_applied.index, Ordering::SeqCst);
872        self.last_applied_term.store(last_applied.term, Ordering::SeqCst);
873    }
874
875    fn last_applied(&self) -> LogId {
876        LogId {
877            index: self.last_applied_index.load(Ordering::SeqCst),
878            term: self.last_applied_term.load(Ordering::SeqCst),
879        }
880    }
881
882    fn persist_last_applied(
883        &self,
884        last_applied: LogId,
885    ) -> Result<(), Error> {
886        self.update_last_applied(last_applied);
887        self.persist_metadata()
888    }
889
890    fn update_last_snapshot_metadata(
891        &self,
892        snapshot_metadata: &SnapshotMetadata,
893    ) -> Result<(), Error> {
894        *self.last_snapshot_metadata.write() = Some(snapshot_metadata.clone());
895        Ok(())
896    }
897
898    fn snapshot_metadata(&self) -> Option<SnapshotMetadata> {
899        self.last_snapshot_metadata.read().clone()
900    }
901
902    fn persist_last_snapshot_metadata(
903        &self,
904        snapshot_metadata: &SnapshotMetadata,
905    ) -> Result<(), Error> {
906        self.update_last_snapshot_metadata(snapshot_metadata)
907    }
908
909    async fn apply_snapshot_from_file(
910        &self,
911        metadata: &SnapshotMetadata,
912        snapshot_dir: std::path::PathBuf,
913    ) -> Result<(), Error> {
914        info!("Applying snapshot from file: {:?}", snapshot_dir);
915
916        // Read from the snapshot.bin file inside the directory
917        let snapshot_data_path = snapshot_dir.join("snapshot.bin");
918        let mut file = File::open(snapshot_data_path).await?;
919        let mut buffer = Vec::new();
920        file.read_to_end(&mut buffer).await?;
921
922        // Parse snapshot data
923        let mut pos = 0;
924        let mut new_data = HashMap::new();
925
926        while pos < buffer.len() {
927            // Read key length
928            if pos + 8 > buffer.len() {
929                break;
930            }
931
932            let key_len_bytes = &buffer[pos..pos + 8];
933            let key_len = u64::from_be_bytes([
934                key_len_bytes[0],
935                key_len_bytes[1],
936                key_len_bytes[2],
937                key_len_bytes[3],
938                key_len_bytes[4],
939                key_len_bytes[5],
940                key_len_bytes[6],
941                key_len_bytes[7],
942            ]) as usize;
943
944            pos += 8;
945
946            // Read key
947            if pos + key_len > buffer.len() {
948                break;
949            }
950
951            let key = Bytes::from(buffer[pos..pos + key_len].to_vec());
952            pos += key_len;
953
954            // Read value length
955            if pos + 8 > buffer.len() {
956                break;
957            }
958
959            let value_len_bytes = &buffer[pos..pos + 8];
960            let value_len = u64::from_be_bytes([
961                value_len_bytes[0],
962                value_len_bytes[1],
963                value_len_bytes[2],
964                value_len_bytes[3],
965                value_len_bytes[4],
966                value_len_bytes[5],
967                value_len_bytes[6],
968                value_len_bytes[7],
969            ]) as usize;
970
971            pos += 8;
972
973            // Read value
974            if pos + value_len > buffer.len() {
975                break;
976            }
977
978            let value = Bytes::from(buffer[pos..pos + value_len].to_vec());
979            pos += value_len;
980
981            // Read term
982            if pos + 8 > buffer.len() {
983                break;
984            }
985
986            let term_bytes = &buffer[pos..pos + 8];
987            let term = u64::from_be_bytes([
988                term_bytes[0],
989                term_bytes[1],
990                term_bytes[2],
991                term_bytes[3],
992                term_bytes[4],
993                term_bytes[5],
994                term_bytes[6],
995                term_bytes[7],
996            ]);
997
998            pos += 8;
999
1000            // Add to new data
1001            new_data.insert(key, (value, term));
1002        }
1003
1004        // Atomically replace the data
1005        {
1006            let mut data = self.data.write();
1007            *data = new_data;
1008        }
1009
1010        // Update metadata
1011        *self.last_snapshot_metadata.write() = Some(metadata.clone());
1012
1013        if let Some(last_included) = &metadata.last_included {
1014            self.update_last_applied(*last_included);
1015        }
1016
1017        // Persist to disk
1018        self.persist_data_async().await?;
1019        self.persist_metadata_async().await?;
1020        self.clear_wal_async().await?;
1021
1022        info!("Snapshot applied successfully");
1023        Ok(())
1024    }
1025
1026    async fn generate_snapshot_data(
1027        &self,
1028        new_snapshot_dir: std::path::PathBuf,
1029        last_included: LogId,
1030    ) -> Result<Bytes, Error> {
1031        info!("Generating snapshot data up to {:?}", last_included);
1032
1033        // Create snapshot directory
1034        fs::create_dir_all(&new_snapshot_dir).await?;
1035
1036        // Create snapshot file
1037        let snapshot_path = new_snapshot_dir.join("snapshot.bin");
1038        let mut file = File::create(&snapshot_path).await?;
1039
1040        let data_copy: HashMap<Bytes, (Bytes, u64)> = {
1041            let data = self.data.read();
1042            data.iter().map(|(k, (v, t))| (k.clone(), (v.clone(), *t))).collect()
1043        };
1044
1045        // Write data in the same format as the data file
1046        for (key, (value, term)) in data_copy.iter() {
1047            // Write key length (8 bytes)
1048            let key_len = key.len() as u64;
1049            file.write_all(&key_len.to_be_bytes()).await?;
1050
1051            // Write key
1052            file.write_all(key).await?;
1053
1054            // Write value length (8 bytes)
1055            let value_len = value.len() as u64;
1056            file.write_all(&value_len.to_be_bytes()).await?;
1057
1058            // Write value
1059            file.write_all(value).await?;
1060
1061            // Write term (8 bytes)
1062            file.write_all(&term.to_be_bytes()).await?;
1063        }
1064
1065        file.flush().await?;
1066
1067        // Update metadata
1068        let metadata = SnapshotMetadata {
1069            last_included: Some(last_included),
1070            checksum: Bytes::from(vec![0; 32]), // Simple checksum for demo
1071        };
1072
1073        self.update_last_snapshot_metadata(&metadata)?;
1074
1075        info!("Snapshot generated at {:?}", snapshot_path);
1076
1077        // Return dummy checksum
1078        Ok(Bytes::from_static(&[0u8; 32]))
1079    }
1080
1081    fn save_hard_state(&self) -> Result<(), Error> {
1082        let last_applied = self.last_applied();
1083        self.persist_last_applied(last_applied)?;
1084
1085        if let Some(last_snapshot_metadata) = self.snapshot_metadata() {
1086            self.persist_last_snapshot_metadata(&last_snapshot_metadata)?;
1087        }
1088
1089        self.flush()?;
1090        Ok(())
1091    }
1092
1093    fn flush(&self) -> Result<(), Error> {
1094        self.persist_data()?;
1095        self.persist_metadata()?;
1096        // self.clear_wal()?;
1097        Ok(())
1098    }
1099
1100    async fn flush_async(&self) -> Result<(), Error> {
1101        self.persist_data_async().await?;
1102        self.persist_metadata_async().await?;
1103        // self.clear_wal_async().await?;
1104        Ok(())
1105    }
1106
    /// Resets the state machine by awaiting `self.reset()`.
    ///
    /// NOTE(review): the body calls a method with the same name as the one
    /// being defined. Presumably an inherent `reset` on the concrete type wins
    /// method resolution here; if no such method exists this call would
    /// recurse into itself — confirm against the rest of the file.
    async fn reset(&self) -> Result<(), Error> {
        self.reset().await
    }
1110}