use std::collections::HashMap;
use std::io::Write;
use std::path::PathBuf;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;

use bytes::Bytes;
use parking_lot::RwLock;
use prost::Message;
use tokio::fs;
use tokio::fs::File;
use tokio::fs::OpenOptions;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
use tokio::time::Instant;
use tonic::async_trait;
use tracing::debug;
use tracing::error;
use tracing::info;
use tracing::trace;
use tracing::warn;

use crate::proto::client::write_command::Delete;
use crate::proto::client::write_command::Insert;
use crate::proto::client::write_command::Operation;
use crate::proto::client::WriteCommand;
use crate::proto::common::entry_payload::Payload;
use crate::proto::common::Entry;
use crate::proto::common::LogId;
use crate::proto::storage::SnapshotMetadata;
use crate::Error;
use crate::StateMachine;
use crate::StorageError;

type FileStateMachineDataType = RwLock<HashMap<Bytes, (Bytes, u64)>>;

#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum WalOpCode {
    Noop = 0,
    Insert = 1,
    Delete = 2,
    Config = 3,
}

impl WalOpCode {
    fn from_str(s: &str) -> Self {
        match s {
            "INSERT" => Self::Insert,
            "DELETE" => Self::Delete,
            "CONFIG" => Self::Config,
            _ => Self::Noop,
        }
    }

    fn from_u8(byte: u8) -> Self {
        match byte {
            1 => Self::Insert,
            2 => Self::Delete,
            3 => Self::Config,
            _ => Self::Noop,
        }
    }
}
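
/// A file-backed key/value state machine.
///
/// Entries are held in memory as `key -> (value, term)` and persisted to three
/// files under `data_dir`:
/// * `metadata.bin` — last applied index and term (two big-endian `u64`s),
/// * `state.data`   — a full dump of the map, rewritten on every checkpoint,
/// * `wal.log`      — per-entry records appended before a chunk is applied and
///   replayed (then cleared) on startup.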
#[derive(Debug)]
pub struct FileStateMachine {
    data: FileStateMachineDataType,
    last_applied_index: AtomicU64,
    last_applied_term: AtomicU64,
    last_snapshot_metadata: RwLock<Option<SnapshotMetadata>>,

    running: AtomicBool,

    data_dir: PathBuf,
}

impl FileStateMachine {
    pub async fn new(data_dir: PathBuf) -> Result<Self, Error> {
        fs::create_dir_all(&data_dir).await?;

        let machine = Self {
            data: RwLock::new(HashMap::new()),
            last_applied_index: AtomicU64::new(0),
            last_applied_term: AtomicU64::new(0),
            last_snapshot_metadata: RwLock::new(None),
            running: AtomicBool::new(true),
            data_dir,
        };

        machine.load_from_disk().await?;

        Ok(machine)
    }

    async fn load_from_disk(&self) -> Result<(), Error> {
        self.load_metadata().await?;

        self.load_data().await?;

        self.replay_wal().await?;

        info!("Loaded state machine data from disk");
        Ok(())
    }

    async fn load_metadata(&self) -> Result<(), Error> {
        let metadata_path = self.data_dir.join("metadata.bin");
        if !metadata_path.exists() {
            return Ok(());
        }

        let mut file = File::open(metadata_path).await?;
        let mut buffer = [0u8; 16];

        if file.read_exact(&mut buffer).await.is_ok() {
            let index = u64::from_be_bytes(buffer[0..8].try_into().unwrap());
            let term = u64::from_be_bytes(buffer[8..16].try_into().unwrap());

            self.last_applied_index.store(index, Ordering::SeqCst);
            self.last_applied_term.store(term, Ordering::SeqCst);
        }

        Ok(())
    }
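
    /// Loads the `state.data` file: a sequence of records laid out as
    /// `key_len (u64 BE) | key | value_len (u64 BE) | value | term (u64 BE)`.
    /// Truncated trailing records are ignored.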
    async fn load_data(&self) -> Result<(), Error> {
        let data_path = self.data_dir.join("state.data");
        if !data_path.exists() {
            return Ok(());
        }

        let mut file = File::open(data_path).await?;
        let mut buffer = Vec::new();
        file.read_to_end(&mut buffer).await?;

        let mut pos = 0;
        let mut data = self.data.write();

        while pos < buffer.len() {
            if pos + 8 > buffer.len() {
                break;
            }

            let key_len = u64::from_be_bytes(buffer[pos..pos + 8].try_into().unwrap()) as usize;
            pos += 8;

            if pos + key_len > buffer.len() {
                break;
            }

            let key = Bytes::from(buffer[pos..pos + key_len].to_vec());
            pos += key_len;

            if pos + 8 > buffer.len() {
                break;
            }

            let value_len = u64::from_be_bytes(buffer[pos..pos + 8].try_into().unwrap()) as usize;
            pos += 8;

            if pos + value_len > buffer.len() {
                break;
            }

            let value = Bytes::from(buffer[pos..pos + value_len].to_vec());
            pos += value_len;

            if pos + 8 > buffer.len() {
                break;
            }

            let term = u64::from_be_bytes(buffer[pos..pos + 8].try_into().unwrap());
            pos += 8;

            data.insert(key, (value, term));
        }

        Ok(())
    }
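
    /// Replays `wal.log` into memory. Each record is laid out as
    /// `index (u64 BE) | term (u64 BE) | op code (u8) | key_len (u64 BE) | key |
    /// value_len (u64 BE) | value`, where the value is omitted when `value_len`
    /// is 0. Incomplete trailing records are logged and skipped; the WAL is
    /// cleared after a successful replay.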
    async fn replay_wal(&self) -> Result<(), Error> {
        let wal_path = self.data_dir.join("wal.log");
        if !wal_path.exists() {
            debug!("No WAL file found, skipping replay");
            return Ok(());
        }

        let mut file = File::open(wal_path).await?;
        let mut buffer = Vec::new();
        file.read_to_end(&mut buffer).await?;

        if buffer.is_empty() {
            debug!("WAL file is empty, skipping replay");
            return Ok(());
        }

        let mut pos = 0;
        let mut operations = Vec::new();
        let mut replayed_count = 0;

        // The fixed-size record header is 17 bytes: index (8) + term (8) + op code (1).
        while pos + 17 <= buffer.len() {
            let _index = u64::from_be_bytes(buffer[pos..pos + 8].try_into().unwrap());
            pos += 8;

            let term = u64::from_be_bytes(buffer[pos..pos + 8].try_into().unwrap());
            pos += 8;

            let op_code = WalOpCode::from_u8(buffer[pos]);
            pos += 1;

            if pos + 8 > buffer.len() {
                warn!("Incomplete key length at position {}, stopping replay", pos);
                break;
            }

            let key_len = u64::from_be_bytes(buffer[pos..pos + 8].try_into().unwrap()) as usize;
            pos += 8;

            if pos + key_len > buffer.len() {
                warn!(
                    "Incomplete key data at position {} (need {} bytes, have {})",
                    pos,
                    key_len,
                    buffer.len() - pos
                );
                break;
            }

            let key = Bytes::from(buffer[pos..pos + key_len].to_vec());
            pos += key_len;

            if pos + 8 > buffer.len() {
                warn!(
                    "Incomplete value length at position {}, stopping replay",
                    pos
                );
                break;
            }

            let value_len = u64::from_be_bytes(buffer[pos..pos + 8].try_into().unwrap()) as usize;
            pos += 8;

            let value = if value_len > 0 {
                if pos + value_len > buffer.len() {
                    warn!("Incomplete value data at position {}, stopping replay", pos);
                    break;
                }
                let value_data = Bytes::from(buffer[pos..pos + value_len].to_vec());
                pos += value_len;
                Some(value_data)
            } else {
                None
            };

            operations.push((op_code, key, value, term));
            replayed_count += 1;
        }

        info!(
            "Parsed {} WAL operations, applying to memory",
            operations.len()
        );

        let mut applied_count = 0;
        {
            let mut data = self.data.write();
            for (op_code, key, value, term) in operations {
                match op_code {
                    WalOpCode::Insert => {
                        if let Some(value_data) = value {
                            data.insert(key, (value_data, term));
                            applied_count += 1;
                            debug!("Replayed INSERT");
                        } else {
                            warn!("INSERT operation without value");
                        }
                    }
                    WalOpCode::Delete => {
                        data.remove(&key);
                        applied_count += 1;
                        debug!("Replayed DELETE: key={:?}", key);
                    }
                    WalOpCode::Noop | WalOpCode::Config => {
                        applied_count += 1;
                        debug!("Replayed {:?} operation", op_code);
                    }
                }
            }
        }

        info!(
            "WAL replay complete: {} operations parsed, {} operations applied",
            replayed_count, applied_count
        );

        if applied_count > 0 {
            self.clear_wal_async().await?;
            debug!(
                "Cleared WAL after successful replay of {} operations",
                applied_count
            );
        }

        Ok(())
    }

    fn persist_data(&self) -> Result<(), Error> {
        let data_copy: HashMap<Bytes, (Bytes, u64)> = {
            let data = self.data.read();
            data.iter().map(|(k, (v, t))| (k.clone(), (v.clone(), *t))).collect()
        };

        let data_path = self.data_dir.join("state.data");
        let mut file = std::fs::OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .open(data_path)?;

        for (key, (value, term)) in data_copy.iter() {
            let key_len = key.len() as u64;
            file.write_all(&key_len.to_be_bytes())?;

            file.write_all(key)?;

            let value_len = value.len() as u64;
            file.write_all(&value_len.to_be_bytes())?;

            file.write_all(value)?;

            file.write_all(&term.to_be_bytes())?;
        }

        file.flush()?;
        Ok(())
    }

    async fn persist_data_async(&self) -> Result<(), Error> {
        let data_copy: HashMap<Bytes, (Bytes, u64)> = {
            let data = self.data.read();
            data.iter().map(|(k, (v, t))| (k.clone(), (v.clone(), *t))).collect()
        };

        let data_path = self.data_dir.join("state.data");
        let mut file = OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .open(data_path)
            .await?;

        for (key, (value, term)) in data_copy.iter() {
            let key_len = key.len() as u64;
            file.write_all(&key_len.to_be_bytes()).await?;

            file.write_all(key.as_ref()).await?;

            let value_len = value.len() as u64;
            file.write_all(&value_len.to_be_bytes()).await?;

            file.write_all(value.as_ref()).await?;

            file.write_all(&term.to_be_bytes()).await?;
        }

        file.flush().await?;
        Ok(())
    }

    fn persist_metadata(&self) -> Result<(), Error> {
        let metadata_path = self.data_dir.join("metadata.bin");
        let mut file = std::fs::OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .open(metadata_path)?;

        let index = self.last_applied_index.load(Ordering::SeqCst);
        let term = self.last_applied_term.load(Ordering::SeqCst);

        file.write_all(&index.to_be_bytes())?;
        file.write_all(&term.to_be_bytes())?;

        file.flush()?;
        Ok(())
    }

    async fn persist_metadata_async(&self) -> Result<(), Error> {
        let metadata_path = self.data_dir.join("metadata.bin");
        let mut file = OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .open(metadata_path)
            .await?;

        let index = self.last_applied_index.load(Ordering::SeqCst);
        let term = self.last_applied_term.load(Ordering::SeqCst);

        file.write_all(&index.to_be_bytes()).await?;
        file.write_all(&term.to_be_bytes()).await?;

        file.flush().await?;
        Ok(())
    }

    #[allow(unused)]
    fn clear_wal(&self) -> Result<(), Error> {
        let wal_path = self.data_dir.join("wal.log");
        let mut file = std::fs::OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .open(wal_path)?;

        file.set_len(0)?;
        file.flush()?;
        Ok(())
    }

    async fn clear_wal_async(&self) -> Result<(), Error> {
        let wal_path = self.data_dir.join("wal.log");
        let mut file = OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .open(wal_path)
            .await?;

        file.set_len(0).await?;
        file.flush().await?;
        Ok(())
    }

    pub async fn reset(&self) -> Result<(), Error> {
        info!("Resetting state machine");

        {
            let mut data = self.data.write();
            data.clear();
        }

        self.last_applied_index.store(0, Ordering::SeqCst);
        self.last_applied_term.store(0, Ordering::SeqCst);

        {
            let mut snapshot_metadata = self.last_snapshot_metadata.write();
            *snapshot_metadata = None;
        }

        self.clear_data_file().await?;
        self.clear_metadata_file().await?;
        self.clear_wal_async().await?;

        info!("State machine reset completed");
        Ok(())
    }

    async fn clear_data_file(&self) -> Result<(), Error> {
        let data_path = self.data_dir.join("state.data");
        let mut file = OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .open(data_path)
            .await?;

        file.set_len(0).await?;
        file.flush().await?;
        Ok(())
    }

    async fn clear_metadata_file(&self) -> Result<(), Error> {
        let metadata_path = self.data_dir.join("metadata.bin");
        let mut file = OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .open(metadata_path)
            .await?;

        file.write_all(&0u64.to_be_bytes()).await?;
        file.write_all(&0u64.to_be_bytes()).await?;

        file.flush().await?;
        Ok(())
    }
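
    /// Appends a batch of entries to `wal.log` in a single write, using the
    /// same record layout that `replay_wal` parses. A missing value is encoded
    /// as `value_len = 0`.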
    pub(crate) async fn append_to_wal(
        &self,
        entries: Vec<(Entry, String, Bytes, Option<Bytes>)>,
    ) -> Result<(), Error> {
        if entries.is_empty() {
            return Ok(());
        }

        let wal_path = self.data_dir.join("wal.log");

        let mut file =
            OpenOptions::new().write(true).create(true).append(true).open(&wal_path).await?;

        let estimated_size: usize = entries
            .iter()
            .map(|(_, _, key, value)| {
                8 + 8 + 1 + 8 + key.len() + 8 + value.as_ref().map_or(0, |v| v.len())
            })
            .sum();

        let mut batch_buffer = Vec::with_capacity(estimated_size);

        for (entry, operation, key, value) in entries {
            batch_buffer.extend_from_slice(&entry.index.to_be_bytes());
            batch_buffer.extend_from_slice(&entry.term.to_be_bytes());

            let op_code = WalOpCode::from_str(&operation);
            batch_buffer.push(op_code as u8);

            batch_buffer.extend_from_slice(&(key.len() as u64).to_be_bytes());
            batch_buffer.extend_from_slice(&key);

            if let Some(value_data) = value {
                batch_buffer.extend_from_slice(&(value_data.len() as u64).to_be_bytes());
                batch_buffer.extend_from_slice(&value_data);
            } else {
                batch_buffer.extend_from_slice(&0u64.to_be_bytes());
            }
        }

        file.write_all(&batch_buffer).await?;
        file.flush().await?;

        Ok(())
    }

    #[allow(unused)]
    pub(crate) async fn checkpoint(&self) -> Result<(), Error> {
        self.persist_data_async().await?;
        self.persist_metadata_async().await?;

        self.clear_wal_async().await?;

        Ok(())
    }
}

impl Drop for FileStateMachine {
    fn drop(&mut self) {
        let timer = Instant::now();

        match self.save_hard_state() {
            Ok(_) => debug!("StateMachine saved in {:?}", timer.elapsed()),
            Err(e) => error!("Failed to save StateMachine: {}", e),
        }
    }
}

#[async_trait]
impl StateMachine for FileStateMachine {
    fn start(&self) -> Result<(), Error> {
        self.running.store(true, Ordering::SeqCst);
        info!("File state machine started");
        Ok(())
    }

    fn stop(&self) -> Result<(), Error> {
        self.running.store(false, Ordering::SeqCst);
        info!("File state machine stopped");
        Ok(())
    }

    fn is_running(&self) -> bool {
        self.running.load(Ordering::SeqCst)
    }

    fn get(
        &self,
        key_buffer: &[u8],
    ) -> Result<Option<Bytes>, Error> {
        let data = self.data.read();
        Ok(data.get(key_buffer).map(|(value, _)| value.clone()))
    }

    fn entry_term(
        &self,
        entry_id: u64,
    ) -> Option<u64> {
        // Values are stored as (value, term); return the stored term that matches `entry_id`.
        let data = self.data.read();
        data.values().find(|(_, term)| *term == entry_id).map(|(_, term)| *term)
    }
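
    /// Applies a chunk of committed log entries. Entries must arrive in
    /// strictly increasing index order. Decoded operations are first appended
    /// to the WAL, then applied to the in-memory map; finally the data and
    /// metadata files are rewritten and the WAL is cleared.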
    async fn apply_chunk(
        &self,
        chunk: Vec<Entry>,
    ) -> Result<(), Error> {
        trace!("Applying chunk: {:?}.", chunk);

        let mut highest_index_entry: Option<LogId> = None;
        let mut batch_operations = Vec::new();

        for entry in chunk {
            let entry_index = entry.index;

            assert!(entry.payload.is_some(), "Entry payload should not be None!");

            if let Some(prev) = &highest_index_entry {
                assert!(
                    entry.index > prev.index,
                    "apply_chunk: received unordered entry at index {} (prev={})",
                    entry.index,
                    prev.index
                );
            }
            highest_index_entry = Some(LogId {
                index: entry.index,
                term: entry.term,
            });

            match entry.payload.as_ref().unwrap().payload.as_ref() {
                Some(Payload::Noop(_)) => {
                    debug!("Handling NOOP command at index {}", entry.index);
                    batch_operations.push((entry, "NOOP", Bytes::new(), None));
                }
                Some(Payload::Command(bytes)) => match WriteCommand::decode(&bytes[..]) {
                    Ok(write_cmd) => {
                        match write_cmd.operation {
                            Some(Operation::Insert(Insert { key, value })) => {
                                batch_operations.push((entry, "INSERT", key, Some(value)));
                            }
                            Some(Operation::Delete(Delete { key })) => {
                                batch_operations.push((entry, "DELETE", key, None));
                            }
                            None => {
                                warn!("WriteCommand without operation at index {}", entry.index);
                                batch_operations.push((entry, "NOOP", Bytes::new(), None));
                            }
                        }
                    }
                    Err(e) => {
                        error!(
                            "Failed to decode WriteCommand at index {}: {:?}",
                            entry.index, e
                        );
                        return Err(StorageError::SerializationError(e.to_string()).into());
                    }
                },
                Some(Payload::Config(_config_change)) => {
                    debug!("Ignoring config change at index {}", entry.index);
                    batch_operations.push((entry, "CONFIG", Bytes::new(), None));
                }
                None => panic!("Entry payload variant should not be None!"),
            }

            info!("COMMITTED_LOG_METRIC: {}", entry_index);
        }

        let mut wal_entries = Vec::new();
        for (entry, operation, key, value) in &batch_operations {
            wal_entries.push((
                entry.clone(),
                operation.to_string(),
                key.clone(),
                value.clone(),
            ));
        }

        self.append_to_wal(wal_entries).await?;

        {
            let mut data = self.data.write();

            for (entry, operation, key, value) in batch_operations {
                match operation {
                    "INSERT" => {
                        if let Some(value) = value {
                            data.insert(key, (value, entry.term));
                        }
                    }
                    "DELETE" => {
                        data.remove(&key);
                    }
                    "NOOP" | "CONFIG" => {}
                    _ => warn!("Unknown operation: {}", operation),
                }
            }
        }

        if let Some(log_id) = highest_index_entry {
            debug!("State machine - updated last_applied: {:?}", log_id);
            self.update_last_applied(log_id);
        }

        self.persist_data_async().await?;
        self.persist_metadata_async().await?;
        self.clear_wal_async().await?;

        Ok(())
    }

    fn len(&self) -> usize {
        self.data.read().len()
    }

    fn update_last_applied(
        &self,
        last_applied: LogId,
    ) {
        self.last_applied_index.store(last_applied.index, Ordering::SeqCst);
        self.last_applied_term.store(last_applied.term, Ordering::SeqCst);
    }

    fn last_applied(&self) -> LogId {
        LogId {
            index: self.last_applied_index.load(Ordering::SeqCst),
            term: self.last_applied_term.load(Ordering::SeqCst),
        }
    }

    fn persist_last_applied(
        &self,
        last_applied: LogId,
    ) -> Result<(), Error> {
        self.update_last_applied(last_applied);
        self.persist_metadata()
    }

    fn update_last_snapshot_metadata(
        &self,
        snapshot_metadata: &SnapshotMetadata,
    ) -> Result<(), Error> {
        *self.last_snapshot_metadata.write() = Some(snapshot_metadata.clone());
        Ok(())
    }

    fn snapshot_metadata(&self) -> Option<SnapshotMetadata> {
        self.last_snapshot_metadata.read().clone()
    }

    fn persist_last_snapshot_metadata(
        &self,
        snapshot_metadata: &SnapshotMetadata,
    ) -> Result<(), Error> {
        self.update_last_snapshot_metadata(snapshot_metadata)
    }

    async fn apply_snapshot_from_file(
        &self,
        metadata: &SnapshotMetadata,
        snapshot_dir: std::path::PathBuf,
    ) -> Result<(), Error> {
        info!("Applying snapshot from file: {:?}", snapshot_dir);

        let snapshot_data_path = snapshot_dir.join("snapshot.bin");
        let mut file = File::open(snapshot_data_path).await?;
        let mut buffer = Vec::new();
        file.read_to_end(&mut buffer).await?;

        let mut pos = 0;
        let mut new_data = HashMap::new();

        while pos < buffer.len() {
            if pos + 8 > buffer.len() {
                break;
            }

            let key_len = u64::from_be_bytes(buffer[pos..pos + 8].try_into().unwrap()) as usize;
            pos += 8;

            if pos + key_len > buffer.len() {
                break;
            }

            let key = Bytes::from(buffer[pos..pos + key_len].to_vec());
            pos += key_len;

            if pos + 8 > buffer.len() {
                break;
            }

            let value_len = u64::from_be_bytes(buffer[pos..pos + 8].try_into().unwrap()) as usize;
            pos += 8;

            if pos + value_len > buffer.len() {
                break;
            }

            let value = Bytes::from(buffer[pos..pos + value_len].to_vec());
            pos += value_len;

            if pos + 8 > buffer.len() {
                break;
            }

            let term = u64::from_be_bytes(buffer[pos..pos + 8].try_into().unwrap());
            pos += 8;

            new_data.insert(key, (value, term));
        }

        {
            let mut data = self.data.write();
            *data = new_data;
        }

        *self.last_snapshot_metadata.write() = Some(metadata.clone());

        if let Some(last_included) = &metadata.last_included {
            self.update_last_applied(*last_included);
        }

        self.persist_data_async().await?;
        self.persist_metadata_async().await?;
        self.clear_wal_async().await?;

        info!("Snapshot applied successfully");
        Ok(())
    }

    async fn generate_snapshot_data(
        &self,
        new_snapshot_dir: std::path::PathBuf,
        last_included: LogId,
    ) -> Result<Bytes, Error> {
        info!("Generating snapshot data up to {:?}", last_included);

        fs::create_dir_all(&new_snapshot_dir).await?;

        let snapshot_path = new_snapshot_dir.join("snapshot.bin");
        let mut file = File::create(&snapshot_path).await?;

        let data_copy: HashMap<Bytes, (Bytes, u64)> = {
            let data = self.data.read();
            data.iter().map(|(k, (v, t))| (k.clone(), (v.clone(), *t))).collect()
        };

        for (key, (value, term)) in data_copy.iter() {
            let key_len = key.len() as u64;
            file.write_all(&key_len.to_be_bytes()).await?;

            file.write_all(key).await?;

            let value_len = value.len() as u64;
            file.write_all(&value_len.to_be_bytes()).await?;

            file.write_all(value).await?;

            file.write_all(&term.to_be_bytes()).await?;
        }

        file.flush().await?;

        let metadata = SnapshotMetadata {
            last_included: Some(last_included),
            // Placeholder checksum; no digest is computed over the snapshot contents.
            checksum: Bytes::from(vec![0; 32]),
        };

        self.update_last_snapshot_metadata(&metadata)?;

        info!("Snapshot generated at {:?}", snapshot_path);

        Ok(Bytes::from_static(&[0u8; 32]))
    }

    fn save_hard_state(&self) -> Result<(), Error> {
        let last_applied = self.last_applied();
        self.persist_last_applied(last_applied)?;

        if let Some(last_snapshot_metadata) = self.snapshot_metadata() {
            self.persist_last_snapshot_metadata(&last_snapshot_metadata)?;
        }

        self.flush()?;
        Ok(())
    }

    fn flush(&self) -> Result<(), Error> {
        self.persist_data()?;
        self.persist_metadata()?;
        Ok(())
    }

    async fn flush_async(&self) -> Result<(), Error> {
        self.persist_data_async().await?;
        self.persist_metadata_async().await?;
        Ok(())
    }

    async fn reset(&self) -> Result<(), Error> {
        // Delegates to the inherent `FileStateMachine::reset`, which clears the
        // in-memory map and truncates the on-disk files.
        self.reset().await
    }
}
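
// A minimal smoke test sketching how `FileStateMachine` is constructed and
// queried. It assumes `#[tokio::test]` is available (tokio with the `macros`
// and `rt` features) and only exercises the empty-state paths, so it does not
// need to construct protobuf log entries.
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn new_machine_starts_empty_and_persists_metadata() {
        // Use a process-unique directory under the system temp dir so the test
        // does not require an extra dev-dependency for temp file handling.
        let dir = std::env::temp_dir()
            .join(format!("file_state_machine_test_{}", std::process::id()));
        let machine = FileStateMachine::new(dir.clone()).await.expect("create state machine");

        assert!(machine.is_running());
        assert_eq!(machine.len(), 0);
        assert_eq!(machine.get(b"missing").expect("get"), None);

        let last = machine.last_applied();
        assert_eq!(last.index, 0);
        assert_eq!(last.term, 0);

        // Checkpointing an empty machine should still create the on-disk files.
        machine.checkpoint().await.expect("checkpoint");
        assert!(dir.join("metadata.bin").exists());
        assert!(dir.join("state.data").exists());

        machine.stop().expect("stop");
        assert!(!machine.is_running());

        drop(machine);
        let _ = std::fs::remove_dir_all(&dir);
    }
}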