d_engine/storage/adaptors/file/
file_state_machine.rs

1use std::collections::HashMap;
2use std::io::Write;
3use std::path::PathBuf;
4use std::sync::atomic::AtomicBool;
5use std::sync::atomic::AtomicU64;
6use std::sync::atomic::Ordering;
7
8use parking_lot::RwLock;
9use prost::Message;
10use tokio::fs;
11use tokio::fs::File;
12use tokio::fs::OpenOptions;
13use tokio::io::AsyncReadExt;
14use tokio::io::AsyncWriteExt;
15use tokio::time::Instant;
16use tonic::async_trait;
17use tracing::debug;
18use tracing::error;
19use tracing::info;
20use tracing::trace;
21use tracing::warn;
22
23use crate::proto::client::write_command::Delete;
24use crate::proto::client::write_command::Insert;
25use crate::proto::client::write_command::Operation;
26use crate::proto::client::WriteCommand;
27use crate::proto::common::entry_payload::Payload;
28use crate::proto::common::Entry;
29use crate::proto::common::LogId;
30use crate::proto::storage::SnapshotMetadata;
31use crate::Error;
32use crate::StateMachine;
33use crate::StorageError;
34
/// In-memory key-value table guarded by a `parking_lot` read-write lock.
///
/// Each key maps to `(value, term)`; `term` is the term of the log entry that last wrote the key.
type FileStateMachineDataType = RwLock<HashMap<Vec<u8>, (Vec<u8>, u64)>>;
/// File-based state machine implementation with persistence
///
/// Design principles:
/// - All data is persisted to disk for durability
/// - In-memory cache for fast read operations
/// - Write-ahead logging for crash consistency
/// - Efficient snapshot handling with file-based storage
/// - Thread-safe with minimal lock contention
///
/// On-disk layout inside `data_dir`:
/// - `state.data`: the full key-value table, one record per key
/// - `metadata.bin`: last applied index and term, two big-endian `u64` values
/// - `wal.log`: records appended while a chunk is being applied
///
/// NOTE(review): the WAL is written and truncated, but nothing in this file parses it back;
/// recovery currently relies on `state.data` and `metadata.bin` only.
#[derive(Debug)]
pub struct FileStateMachine {
    // Key-value storage with disk persistence
    data: FileStateMachineDataType, // (value, term)

    // Raft state with disk persistence
    last_applied_index: AtomicU64,
    last_applied_term: AtomicU64,
    // Held in memory only; no method in this file writes it to disk.
    last_snapshot_metadata: RwLock<Option<SnapshotMetadata>>,

    // Operational state
    running: AtomicBool,
    node_id: u32,

    // Directory that holds every persisted file; files are opened by path on each operation
    data_dir: PathBuf,
    // data_file: RwLock<File>,
    // metadata_file: RwLock<File>,
    // wal_file: RwLock<File>, // Write-ahead log for crash recovery
}
64
65impl FileStateMachine {
66    /// Creates a new file-based state machine with persistence
67    ///
68    /// # Arguments
69    /// * `data_dir` - Directory where data files will be stored
70    /// * `node_id` - Unique identifier for this node
71    ///
72    /// # Returns
73    /// Result containing the initialized FileStateMachine
74    pub async fn new(
75        data_dir: PathBuf,
76        node_id: u32,
77    ) -> Result<Self, Error> {
78        // Ensure data directory exists
79        fs::create_dir_all(&data_dir).await?;
80
81        let machine = Self {
82            data: RwLock::new(HashMap::new()),
83            last_applied_index: AtomicU64::new(0),
84            last_applied_term: AtomicU64::new(0),
85            last_snapshot_metadata: RwLock::new(None),
86            running: AtomicBool::new(true),
87            node_id,
88            data_dir: data_dir.clone(),
89        };
90
91        // Load existing data from disk
92        machine.load_from_disk().await?;
93
94        Ok(machine)
95    }
96
97    /// Loads state machine data from disk files
98    async fn load_from_disk(&self) -> Result<(), Error> {
99        // Load last applied index and term from metadata file
100        self.load_metadata().await?;
101
102        // Load key-value data from data file
103        self.load_data().await?;
104
105        // Replay write-ahead log for crash recovery
106        self.replay_wal().await?;
107
108        info!(
109            "[Node-{}] Loaded state machine data from disk",
110            self.node_id
111        );
112        Ok(())
113    }
114
115    /// Loads metadata from disk
116    async fn load_metadata(&self) -> Result<(), Error> {
117        let metadata_path = self.data_dir.join("metadata.bin");
118        if !metadata_path.exists() {
119            return Ok(());
120        }
121
122        let mut file = File::open(metadata_path).await?;
123        let mut buffer = [0u8; 16];
124
125        if file.read_exact(&mut buffer).await.is_ok() {
126            let index = u64::from_be_bytes([
127                buffer[0], buffer[1], buffer[2], buffer[3], buffer[4], buffer[5], buffer[6],
128                buffer[7],
129            ]);
130
131            let term = u64::from_be_bytes([
132                buffer[8], buffer[9], buffer[10], buffer[11], buffer[12], buffer[13], buffer[14],
133                buffer[15],
134            ]);
135
136            self.last_applied_index.store(index, Ordering::SeqCst);
137            self.last_applied_term.store(term, Ordering::SeqCst);
138        }
139
140        Ok(())
141    }
142
143    /// Loads key-value data from disk
144    async fn load_data(&self) -> Result<(), Error> {
145        let data_path = self.data_dir.join("state.data");
146        if !data_path.exists() {
147            return Ok(());
148        }
149
150        let mut file = File::open(data_path).await?;
151        let mut buffer = Vec::new();
152        file.read_to_end(&mut buffer).await?;
153
154        let mut pos = 0;
155        let mut data = self.data.write();
156
157        while pos < buffer.len() {
158            // Read key length
159            if pos + 8 > buffer.len() {
160                break;
161            }
162
163            let key_len_bytes = &buffer[pos..pos + 8];
164            let key_len = u64::from_be_bytes([
165                key_len_bytes[0],
166                key_len_bytes[1],
167                key_len_bytes[2],
168                key_len_bytes[3],
169                key_len_bytes[4],
170                key_len_bytes[5],
171                key_len_bytes[6],
172                key_len_bytes[7],
173            ]) as usize;
174
175            pos += 8;
176
177            // Read key
178            if pos + key_len > buffer.len() {
179                break;
180            }
181
182            let key = buffer[pos..pos + key_len].to_vec();
183            pos += key_len;
184
185            // Read value length
186            if pos + 8 > buffer.len() {
187                break;
188            }
189
190            let value_len_bytes = &buffer[pos..pos + 8];
191            let value_len = u64::from_be_bytes([
192                value_len_bytes[0],
193                value_len_bytes[1],
194                value_len_bytes[2],
195                value_len_bytes[3],
196                value_len_bytes[4],
197                value_len_bytes[5],
198                value_len_bytes[6],
199                value_len_bytes[7],
200            ]) as usize;
201
202            pos += 8;
203
204            // Read value
205            if pos + value_len > buffer.len() {
206                break;
207            }
208
209            let value = buffer[pos..pos + value_len].to_vec();
210            pos += value_len;
211
212            // Read term
213            if pos + 8 > buffer.len() {
214                break;
215            }
216
217            let term_bytes = &buffer[pos..pos + 8];
218            let term = u64::from_be_bytes([
219                term_bytes[0],
220                term_bytes[1],
221                term_bytes[2],
222                term_bytes[3],
223                term_bytes[4],
224                term_bytes[5],
225                term_bytes[6],
226                term_bytes[7],
227            ]);
228
229            pos += 8;
230
231            // Store in memory
232            data.insert(key, (value, term));
233        }
234
235        Ok(())
236    }
237
238    /// Replays write-ahead log for crash recovery
239    async fn replay_wal(&self) -> Result<(), Error> {
240        let wal_path = self.data_dir.join("wal.log");
241        if !wal_path.exists() {
242            return Ok(());
243        }
244
245        let mut file = File::open(wal_path).await?;
246        let mut buffer = Vec::new();
247        file.read_to_end(&mut buffer).await?;
248
249        // If WAL has content, we need to replay it
250        if !buffer.is_empty() {
251            warn!(
252                "[Node-{}] Replaying write-ahead log for crash recovery",
253                self.node_id
254            );
255
256            // For simplicity, we'll just clear and rebuild from data file
257            // In a production system, you'd parse and replay each WAL entry
258            file.set_len(0).await?; // Clear WAL after recovery
259        }
260
261        Ok(())
262    }
263
264    /// Persists key-value data to disk
265    fn persist_data(&self) -> Result<(), Error> {
266        // Collect data first to minimize lock time
267        let data_copy: HashMap<Vec<u8>, (Vec<u8>, u64)> = {
268            let data = self.data.read();
269            data.iter().map(|(k, (v, t))| (k.clone(), (v.clone(), *t))).collect()
270        };
271
272        // Write to file without holding data lock
273        let data_path = self.data_dir.join("state.data");
274        let mut file = std::fs::OpenOptions::new()
275            .write(true)
276            .create(true)
277            .truncate(true)
278            .open(data_path)?;
279
280        for (key, (value, term)) in data_copy.iter() {
281            // Write key length (8 bytes)
282            let key_len = key.len() as u64;
283            file.write_all(&key_len.to_be_bytes())?;
284
285            // Write key
286            file.write_all(key)?;
287
288            // Write value length (8 bytes)
289            let value_len = value.len() as u64;
290            file.write_all(&value_len.to_be_bytes())?;
291
292            // Write value
293            file.write_all(value)?;
294
295            // Write term (8 bytes)
296            file.write_all(&term.to_be_bytes())?;
297        }
298
299        file.flush()?;
300        Ok(())
301    }
302
303    /// Persists key-value data to disk
304    async fn persist_data_async(&self) -> Result<(), Error> {
305        // Collect data first to minimize lock time
306        let data_copy: HashMap<Vec<u8>, (Vec<u8>, u64)> = {
307            let data = self.data.read();
308            data.iter().map(|(k, (v, t))| (k.clone(), (v.clone(), *t))).collect()
309        };
310
311        // Write to file without holding data lock
312        let data_path = self.data_dir.join("state.data");
313        let mut file = OpenOptions::new()
314            .write(true)
315            .create(true)
316            .truncate(true)
317            .open(data_path)
318            .await?;
319
320        for (key, (value, term)) in data_copy.iter() {
321            // Write key length (8 bytes)
322            let key_len = key.len() as u64;
323            file.write_all(&key_len.to_be_bytes()).await?;
324
325            // Write key
326            file.write_all(key).await?;
327
328            // Write value length (8 bytes)
329            let value_len = value.len() as u64;
330            file.write_all(&value_len.to_be_bytes()).await?;
331
332            // Write value
333            file.write_all(value).await?;
334
335            // Write term (8 bytes)
336            file.write_all(&term.to_be_bytes()).await?;
337        }
338
339        file.flush().await?;
340        Ok(())
341    }
342
343    /// Persists metadata to disk
344    fn persist_metadata(&self) -> Result<(), Error> {
345        let metadata_path = self.data_dir.join("metadata.bin");
346        let mut file = std::fs::OpenOptions::new()
347            .write(true)
348            .create(true)
349            .truncate(true)
350            .open(metadata_path)?;
351
352        let index = self.last_applied_index.load(Ordering::SeqCst);
353        let term = self.last_applied_term.load(Ordering::SeqCst);
354
355        file.write_all(&index.to_be_bytes())?;
356        file.write_all(&term.to_be_bytes())?;
357
358        file.flush()?;
359        Ok(())
360    }
361
362    async fn persist_metadata_async(&self) -> Result<(), Error> {
363        let metadata_path = self.data_dir.join("metadata.bin");
364        let mut file = OpenOptions::new()
365            .write(true)
366            .create(true)
367            .truncate(true)
368            .open(metadata_path)
369            .await?;
370
371        let index = self.last_applied_index.load(Ordering::SeqCst);
372        let term = self.last_applied_term.load(Ordering::SeqCst);
373
374        file.write_all(&index.to_be_bytes()).await?;
375        file.write_all(&term.to_be_bytes()).await?;
376
377        file.flush().await?;
378        Ok(())
379    }
380
381    /// Appends an operation to the write-ahead log
382    async fn append_to_wal(
383        &self,
384        entry: &Entry,
385        operation: &str,
386        key: &[u8],
387        value: Option<&[u8]>,
388    ) -> Result<(), Error> {
389        let wal_path = self.data_dir.join("wal.log");
390        let mut file =
391            OpenOptions::new().write(true).create(true).append(true).open(wal_path).await?;
392
393        // Write entry index and term
394        file.write_all(&entry.index.to_be_bytes()).await?;
395        file.write_all(&entry.term.to_be_bytes()).await?;
396
397        // Write operation type (insert/delete)
398        file.write_all(operation.as_bytes()).await?;
399
400        // Write key length and key
401        file.write_all(&(key.len() as u64).to_be_bytes()).await?;
402        file.write_all(key).await?;
403
404        // For insert operations, write value length and value
405        if let Some(value_data) = value {
406            file.write_all(&(value_data.len() as u64).to_be_bytes()).await?;
407            file.write_all(value_data).await?;
408        }
409
410        file.flush().await?;
411        Ok(())
412    }
413
414    /// Clears the write-ahead log (called after successful persistence)
415    fn clear_wal(&self) -> Result<(), Error> {
416        let wal_path = self.data_dir.join("wal.log");
417        let mut file = std::fs::OpenOptions::new()
418            .write(true)
419            .create(true)
420            .truncate(true)
421            .open(wal_path)?;
422
423        file.set_len(0)?;
424        file.flush()?;
425        Ok(())
426    }
427
428    /// Clears the write-ahead log (called after successful persistence)
429    async fn clear_wal_async(&self) -> Result<(), Error> {
430        let wal_path = self.data_dir.join("wal.log");
431        let mut file = OpenOptions::new()
432            .write(true)
433            .create(true)
434            .truncate(true)
435            .open(wal_path)
436            .await?;
437
438        file.set_len(0).await?;
439        file.flush().await?;
440        Ok(())
441    }
442
443    /// Resets the state machine to its initial empty state
444    ///
445    /// This method:
446    /// 1. Clears all in-memory data
447    /// 2. Resets Raft state to initial values
448    /// 3. Clears all persisted files
449    /// 4. Maintains operational state (running status, node ID)
450    pub async fn reset(&self) -> Result<(), Error> {
451        info!("[Node-{}] Resetting state machine", self.node_id);
452
453        // Clear in-memory data
454        {
455            let mut data = self.data.write();
456            data.clear();
457        }
458
459        // Reset Raft state
460        self.last_applied_index.store(0, Ordering::SeqCst);
461        self.last_applied_term.store(0, Ordering::SeqCst);
462
463        {
464            let mut snapshot_metadata = self.last_snapshot_metadata.write();
465            *snapshot_metadata = None;
466        }
467
468        // Clear all persisted files
469        self.clear_data_file().await?;
470        self.clear_metadata_file().await?;
471        self.clear_wal_async().await?;
472
473        info!("[Node-{}] State machine reset completed", self.node_id);
474        Ok(())
475    }
476
477    /// Clears the data file
478    async fn clear_data_file(&self) -> Result<(), Error> {
479        let data_path = self.data_dir.join("state.data");
480        let mut file = OpenOptions::new()
481            .write(true)
482            .create(true)
483            .truncate(true)
484            .open(data_path)
485            .await?;
486
487        file.set_len(0).await?;
488        file.flush().await?;
489        Ok(())
490    }
491
492    /// Clears the metadata file
493    async fn clear_metadata_file(&self) -> Result<(), Error> {
494        let metadata_path = self.data_dir.join("metadata.bin");
495        let mut file = OpenOptions::new()
496            .write(true)
497            .create(true)
498            .truncate(true)
499            .open(metadata_path)
500            .await?;
501
502        // Write default values (0 for both index and term)
503        file.write_all(&0u64.to_be_bytes()).await?;
504        file.write_all(&0u64.to_be_bytes()).await?;
505
506        file.flush().await?;
507        Ok(())
508    }
509
510    // pub(super) async fn decompress_snapshot(
511    //     &self,
512    //     compressed_path: &Path,
513    //     dest_dir: &Path,
514    // ) -> Result<(), Error> {
515    //     let file = File::open(compressed_path).await.map_err(StorageError::IoError)?;
516    //     let buf_reader = BufReader::new(file);
517    //     let gzip_decoder = GzipDecoder::new(buf_reader);
518    //     let mut archive = Archive::new(gzip_decoder);
519
520    //     archive.unpack(dest_dir).await.map_err(StorageError::IoError)?;
521    //     Ok(())
522    // }
523}
524
525impl Drop for FileStateMachine {
526    fn drop(&mut self) {
527        let timer = Instant::now();
528
529        // Save state into local database including flush operation
530        match self.save_hard_state() {
531            Ok(_) => debug!("StateMachine saved in {:?}", timer.elapsed()),
532            Err(e) => error!("Failed to save StateMachine: {}", e),
533        }
534    }
535}
536
537#[async_trait]
538impl StateMachine for FileStateMachine {
539    fn start(&self) -> Result<(), Error> {
540        self.running.store(true, Ordering::SeqCst);
541        info!("[Node-{}] File state machine started", self.node_id);
542        Ok(())
543    }
544
    /// Marks the state machine as stopped. Always returns `Ok`.
    ///
    /// Only the `running` flag changes; no file is written here.
    fn stop(&self) -> Result<(), Error> {
        // NOTE(review): this comment used to say data is flushed to disk before stopping, but
        // no flush call exists in this method. Confirm whether `flush()` was meant to run here.
        self.running.store(false, Ordering::SeqCst);
        info!("[Node-{}] File state machine stopped", self.node_id);
        Ok(())
    }
551
    /// Reports the current value of the `running` flag.
    fn is_running(&self) -> bool {
        self.running.load(Ordering::SeqCst)
    }
555
556    fn get(
557        &self,
558        key_buffer: &[u8],
559    ) -> Result<Option<Vec<u8>>, Error> {
560        let data = self.data.read();
561        Ok(data.get(key_buffer).map(|(value, _)| value.clone()))
562    }
563
564    fn entry_term(
565        &self,
566        entry_id: u64,
567    ) -> Option<u64> {
568        let data = self.data.read();
569        data.values().find(|(_, index)| *index == entry_id).map(|(_, term)| *term)
570    }
571
    /// Applies a batch of log entries to the in-memory table and persists the result.
    ///
    /// For each entry, in order, a record is appended to the WAL and then, for `Insert` and
    /// `Delete` commands, the in-memory table is updated. `Noop` and `Config` payloads only
    /// produce a WAL record. After the loop, `last_applied` is set to the id of the final entry
    /// (if the chunk was non-empty), the data and metadata files are rewritten, and the WAL is
    /// cleared. The three persistence calls run even for an empty chunk.
    ///
    /// # Errors
    /// Returns `StorageError::SerializationError` if a command payload fails to decode, or any
    /// error from the WAL and persistence helpers.
    /// NOTE(review): on such an early return, entries earlier in the chunk have already changed
    /// the in-memory table, but `last_applied` is not advanced and nothing is persisted.
    /// Confirm that callers tolerate this.
    ///
    /// # Panics
    /// Panics if an entry has no payload, if the payload has no variant, or if entry indexes
    /// are not strictly increasing within the chunk.
    async fn apply_chunk(
        &self,
        chunk: Vec<Entry>,
    ) -> Result<(), Error> {
        trace!("Applying chunk: {:?}.", chunk);

        // Id of the most recently processed entry; doubles as the ordering check state.
        let mut highest_index_entry: Option<LogId> = None;

        // Process each entry in the chunk
        for entry in chunk {
            assert!(entry.payload.is_some(), "Entry payload should not be None!");

            // Ensure entries are processed in order
            if let Some(prev) = &highest_index_entry {
                assert!(
                    entry.index > prev.index,
                    "apply_chunk: received unordered entry at index {} (prev={})",
                    entry.index,
                    prev.index
                );
            }
            highest_index_entry = Some(LogId {
                index: entry.index,
                term: entry.term,
            });

            // The `unwrap` cannot fail: the payload was asserted to be `Some` above.
            match entry.payload.as_ref().unwrap().payload.as_ref() {
                Some(Payload::Noop(_)) => {
                    debug!("Handling NOOP command at index {}", entry.index);
                    self.append_to_wal(&entry, "NOOP", &[], None).await?;
                }
                Some(Payload::Command(bytes)) => match WriteCommand::decode(&bytes[..]) {
                    Ok(write_cmd) => match write_cmd.operation {
                        Some(Operation::Insert(Insert { key, value })) => {
                            debug!(
                                "[Node-{}] Applying INSERT at index {}: {:?}",
                                self.node_id, entry.index, key
                            );

                            // Write to WAL first
                            self.append_to_wal(&entry, "INSERT", &key, Some(&value)).await?;

                            // Update in-memory data
                            // The write guard is local to this arm, so it is released before
                            // the next `.await`.
                            let mut data = self.data.write();
                            data.insert(key, (value, entry.term));
                        }
                        Some(Operation::Delete(Delete { key })) => {
                            debug!(
                                "[Node-{}] Applying DELETE at index {}: {:?}",
                                self.node_id, entry.index, key
                            );

                            // Write to WAL first
                            self.append_to_wal(&entry, "DELETE", &key, None).await?;

                            // Update in-memory data
                            let mut data = self.data.write();
                            data.remove(&key);
                        }
                        None => {
                            // A command without an operation is skipped; it still counts
                            // toward `last_applied` below.
                            warn!(
                                "[Node-{}] WriteCommand without operation at index {}",
                                self.node_id, entry.index
                            );
                        }
                    },
                    Err(e) => {
                        error!(
                            "[Node-{}] Failed to decode WriteCommand at index {}: {:?}",
                            self.node_id, entry.index, e
                        );
                        return Err(StorageError::SerializationError(e.to_string()).into());
                    }
                },
                Some(Payload::Config(_config_change)) => {
                    debug!(
                        "[Node-{}] Ignoring config change at index {}",
                        self.node_id, entry.index
                    );
                    self.append_to_wal(&entry, "CONFIG", &[], None).await?;
                }
                None => panic!("Entry payload variant should not be None!"),
            }

            info!("[{}]- COMMITTED_LOG_METRIC: {} ", self.node_id, entry.index);
        }

        if let Some(log_id) = highest_index_entry {
            debug!(
                "[Node-{}] State machine - updated last_applied: {:?}",
                self.node_id, log_id
            );
            self.update_last_applied(log_id);
        }

        // Persist changes to disk and clear WAL
        // Order matters: the WAL is cleared only after data and metadata have been written.
        self.persist_data_async().await?;
        self.persist_metadata_async().await?;
        self.clear_wal_async().await?;

        Ok(())
    }
674
    /// Returns the number of keys currently held in the in-memory table.
    fn len(&self) -> usize {
        self.data.read().len()
    }
678
679    fn update_last_applied(
680        &self,
681        last_applied: LogId,
682    ) {
683        self.last_applied_index.store(last_applied.index, Ordering::SeqCst);
684        self.last_applied_term.store(last_applied.term, Ordering::SeqCst);
685    }
686
687    fn last_applied(&self) -> LogId {
688        LogId {
689            index: self.last_applied_index.load(Ordering::SeqCst),
690            term: self.last_applied_term.load(Ordering::SeqCst),
691        }
692    }
693
694    fn persist_last_applied(
695        &self,
696        last_applied: LogId,
697    ) -> Result<(), Error> {
698        self.update_last_applied(last_applied);
699        self.persist_metadata()
700    }
701
702    fn update_last_snapshot_metadata(
703        &self,
704        snapshot_metadata: &SnapshotMetadata,
705    ) -> Result<(), Error> {
706        *self.last_snapshot_metadata.write() = Some(snapshot_metadata.clone());
707        Ok(())
708    }
709
    /// Returns a copy of the in-memory snapshot metadata, or `None` if none has been set.
    fn snapshot_metadata(&self) -> Option<SnapshotMetadata> {
        self.last_snapshot_metadata.read().clone()
    }
713
714    fn persist_last_snapshot_metadata(
715        &self,
716        snapshot_metadata: &SnapshotMetadata,
717    ) -> Result<(), Error> {
718        self.update_last_snapshot_metadata(snapshot_metadata)
719    }
720
721    async fn apply_snapshot_from_file(
722        &self,
723        metadata: &SnapshotMetadata,
724        snapshot_dir: std::path::PathBuf,
725    ) -> Result<(), Error> {
726        info!(
727            "[Node-{}] Applying snapshot from file: {:?}",
728            self.node_id, snapshot_dir
729        );
730        println!(
731            "[Node-{}] Applying snapshot from file: {:?}",
732            self.node_id, snapshot_dir
733        );
734
735        // Read from the snapshot.bin file inside the directory
736        let snapshot_data_path = snapshot_dir.join("snapshot.bin");
737        let mut file = File::open(snapshot_data_path).await?;
738        let mut buffer = Vec::new();
739        file.read_to_end(&mut buffer).await?;
740
741        // Parse snapshot data
742        let mut pos = 0;
743        let mut new_data = HashMap::new();
744
745        while pos < buffer.len() {
746            // Read key length
747            if pos + 8 > buffer.len() {
748                break;
749            }
750
751            let key_len_bytes = &buffer[pos..pos + 8];
752            let key_len = u64::from_be_bytes([
753                key_len_bytes[0],
754                key_len_bytes[1],
755                key_len_bytes[2],
756                key_len_bytes[3],
757                key_len_bytes[4],
758                key_len_bytes[5],
759                key_len_bytes[6],
760                key_len_bytes[7],
761            ]) as usize;
762
763            pos += 8;
764
765            // Read key
766            if pos + key_len > buffer.len() {
767                break;
768            }
769
770            let key = buffer[pos..pos + key_len].to_vec();
771            pos += key_len;
772
773            // Read value length
774            if pos + 8 > buffer.len() {
775                break;
776            }
777
778            let value_len_bytes = &buffer[pos..pos + 8];
779            let value_len = u64::from_be_bytes([
780                value_len_bytes[0],
781                value_len_bytes[1],
782                value_len_bytes[2],
783                value_len_bytes[3],
784                value_len_bytes[4],
785                value_len_bytes[5],
786                value_len_bytes[6],
787                value_len_bytes[7],
788            ]) as usize;
789
790            pos += 8;
791
792            // Read value
793            if pos + value_len > buffer.len() {
794                break;
795            }
796
797            let value = buffer[pos..pos + value_len].to_vec();
798            pos += value_len;
799
800            // Read term
801            if pos + 8 > buffer.len() {
802                break;
803            }
804
805            let term_bytes = &buffer[pos..pos + 8];
806            let term = u64::from_be_bytes([
807                term_bytes[0],
808                term_bytes[1],
809                term_bytes[2],
810                term_bytes[3],
811                term_bytes[4],
812                term_bytes[5],
813                term_bytes[6],
814                term_bytes[7],
815            ]);
816
817            pos += 8;
818
819            // Add to new data
820            new_data.insert(key, (value, term));
821        }
822
823        // Atomically replace the data
824        {
825            let mut data = self.data.write();
826            *data = new_data;
827        }
828
829        // Update metadata
830        *self.last_snapshot_metadata.write() = Some(metadata.clone());
831
832        if let Some(last_included) = &metadata.last_included {
833            self.update_last_applied(*last_included);
834        }
835
836        // Persist to disk
837        self.persist_data_async().await?;
838        self.persist_metadata_async().await?;
839        self.clear_wal_async().await?;
840
841        info!("[Node-{}] Snapshot applied successfully", self.node_id);
842        Ok(())
843    }
844
845    async fn generate_snapshot_data(
846        &self,
847        new_snapshot_dir: std::path::PathBuf,
848        last_included: LogId,
849    ) -> Result<[u8; 32], Error> {
850        info!(
851            "[Node-{}] Generating snapshot data up to {:?}",
852            self.node_id, last_included
853        );
854
855        // Create snapshot directory
856        fs::create_dir_all(&new_snapshot_dir).await?;
857
858        // Create snapshot file
859        let snapshot_path = new_snapshot_dir.join("snapshot.bin");
860        let mut file = File::create(&snapshot_path).await?;
861
862        let data_copy: HashMap<Vec<u8>, (Vec<u8>, u64)> = {
863            let data = self.data.read();
864            data.iter().map(|(k, (v, t))| (k.clone(), (v.clone(), *t))).collect()
865        };
866
867        // Write data in the same format as the data file
868        for (key, (value, term)) in data_copy.iter() {
869            // Write key length (8 bytes)
870            let key_len = key.len() as u64;
871            file.write_all(&key_len.to_be_bytes()).await?;
872
873            // Write key
874            file.write_all(key).await?;
875
876            // Write value length (8 bytes)
877            let value_len = value.len() as u64;
878            file.write_all(&value_len.to_be_bytes()).await?;
879
880            // Write value
881            file.write_all(value).await?;
882
883            // Write term (8 bytes)
884            file.write_all(&term.to_be_bytes()).await?;
885        }
886
887        file.flush().await?;
888
889        // Update metadata
890        let metadata = SnapshotMetadata {
891            last_included: Some(last_included),
892            checksum: vec![0; 32], // Simple checksum for demo
893        };
894
895        self.update_last_snapshot_metadata(&metadata)?;
896
897        info!(
898            "[Node-{}] Snapshot generated at {:?}",
899            self.node_id, snapshot_path
900        );
901
902        // Return dummy checksum
903        Ok([0; 32])
904    }
905
906    fn save_hard_state(&self) -> Result<(), Error> {
907        let last_applied = self.last_applied();
908        self.persist_last_applied(last_applied)?;
909
910        if let Some(last_snapshot_metadata) = self.snapshot_metadata() {
911            self.persist_last_snapshot_metadata(&last_snapshot_metadata)?;
912        }
913
914        self.flush()?;
915        Ok(())
916    }
917
918    fn flush(&self) -> Result<(), Error> {
919        self.persist_data()?;
920        self.persist_metadata()?;
921        self.clear_wal()?;
922        Ok(())
923    }
924
925    async fn flush_async(&self) -> Result<(), Error> {
926        self.persist_data_async().await?;
927        self.persist_metadata_async().await?;
928        self.clear_wal_async().await?;
929        Ok(())
930    }
931
932    async fn reset(&self) -> Result<(), Error> {
933        self.reset().await
934    }
935}