use std::collections::HashMap;
use std::io::Write;
use std::path::PathBuf;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;

use parking_lot::RwLock;
use prost::Message;
use tokio::fs;
use tokio::fs::File;
use tokio::fs::OpenOptions;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
use tokio::time::Instant;
use tonic::async_trait;
use tracing::debug;
use tracing::error;
use tracing::info;
use tracing::trace;
use tracing::warn;

use crate::proto::client::write_command::Delete;
use crate::proto::client::write_command::Insert;
use crate::proto::client::write_command::Operation;
use crate::proto::client::WriteCommand;
use crate::proto::common::entry_payload::Payload;
use crate::proto::common::Entry;
use crate::proto::common::LogId;
use crate::proto::storage::SnapshotMetadata;
use crate::Error;
use crate::StateMachine;
use crate::StorageError;

/// In-memory map of key -> (value, term of the log entry that last wrote it).
type FileStateMachineDataType = RwLock<HashMap<Vec<u8>, (Vec<u8>, u64)>>;
#[derive(Debug)]
pub struct FileStateMachine {
    /// Key/value data plus the term of the entry that last wrote each key.
    data: FileStateMachineDataType,
    /// Index of the last applied log entry.
    last_applied_index: AtomicU64,
    /// Term of the last applied log entry.
    last_applied_term: AtomicU64,
    /// Metadata of the most recent snapshot, if any.
    last_snapshot_metadata: RwLock<Option<SnapshotMetadata>>,

    running: AtomicBool,

    node_id: u32,

    /// Directory holding `state.data`, `metadata.bin`, and `wal.log`.
    data_dir: PathBuf,
}
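
// On-disk layout, as produced and consumed by the persistence code below:
// - `metadata.bin`: last applied index (u64, big-endian) followed by last applied term (u64, big-endian).
// - `state.data`:   repeated records of [key_len: u64 BE][key][value_len: u64 BE][value][term: u64 BE].
// - `wal.log`:      append-only records written by `append_to_wal` and truncated once state is persisted.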

impl FileStateMachine {
    pub async fn new(
        data_dir: PathBuf,
        node_id: u32,
    ) -> Result<Self, Error> {
        fs::create_dir_all(&data_dir).await?;

        let machine = Self {
            data: RwLock::new(HashMap::new()),
            last_applied_index: AtomicU64::new(0),
            last_applied_term: AtomicU64::new(0),
            last_snapshot_metadata: RwLock::new(None),
            running: AtomicBool::new(true),
            node_id,
            data_dir: data_dir.clone(),
        };

        machine.load_from_disk().await?;

        Ok(machine)
    }

    async fn load_from_disk(&self) -> Result<(), Error> {
        self.load_metadata().await?;

        self.load_data().await?;

        self.replay_wal().await?;

        info!(
            "[Node-{}] Loaded state machine data from disk",
            self.node_id
        );
        Ok(())
    }

    async fn load_metadata(&self) -> Result<(), Error> {
        let metadata_path = self.data_dir.join("metadata.bin");
        if !metadata_path.exists() {
            return Ok(());
        }

        let mut file = File::open(metadata_path).await?;
        let mut buffer = [0u8; 16];

        if file.read_exact(&mut buffer).await.is_ok() {
            let index = u64::from_be_bytes(buffer[0..8].try_into().unwrap());
            let term = u64::from_be_bytes(buffer[8..16].try_into().unwrap());

            self.last_applied_index.store(index, Ordering::SeqCst);
            self.last_applied_term.store(term, Ordering::SeqCst);
        }

        Ok(())
    }

    async fn load_data(&self) -> Result<(), Error> {
        let data_path = self.data_dir.join("state.data");
        if !data_path.exists() {
            return Ok(());
        }

        let mut file = File::open(data_path).await?;
        let mut buffer = Vec::new();
        file.read_to_end(&mut buffer).await?;

        let mut pos = 0;
        let mut data = self.data.write();

        while pos < buffer.len() {
            if pos + 8 > buffer.len() {
                break;
            }
            let key_len = u64::from_be_bytes(buffer[pos..pos + 8].try_into().unwrap()) as usize;
            pos += 8;

            if pos + key_len > buffer.len() {
                break;
            }
            let key = buffer[pos..pos + key_len].to_vec();
            pos += key_len;

            if pos + 8 > buffer.len() {
                break;
            }
            let value_len = u64::from_be_bytes(buffer[pos..pos + 8].try_into().unwrap()) as usize;
            pos += 8;

            if pos + value_len > buffer.len() {
                break;
            }
            let value = buffer[pos..pos + value_len].to_vec();
            pos += value_len;

            if pos + 8 > buffer.len() {
                break;
            }
            let term = u64::from_be_bytes(buffer[pos..pos + 8].try_into().unwrap());
            pos += 8;

            data.insert(key, (value, term));
        }

        Ok(())
    }

    async fn replay_wal(&self) -> Result<(), Error> {
        let wal_path = self.data_dir.join("wal.log");
        if !wal_path.exists() {
            return Ok(());
        }

        let mut file = File::open(&wal_path).await?;
        let mut buffer = Vec::new();
        file.read_to_end(&mut buffer).await?;

        if !buffer.is_empty() {
            warn!(
                "[Node-{}] Replaying write-ahead log for crash recovery",
                self.node_id
            );

            // NOTE: the logged records are not re-applied here; the WAL is only
            // cleared. `File::open` returns a read-only handle, so reopen the file
            // for writing with `truncate` instead of truncating the read handle.
            OpenOptions::new()
                .write(true)
                .truncate(true)
                .open(&wal_path)
                .await?;
        }

        Ok(())
    }

    fn persist_data(&self) -> Result<(), Error> {
        // Copy the map under the read lock so the lock is not held across file I/O.
        let data_copy: HashMap<Vec<u8>, (Vec<u8>, u64)> = {
            let data = self.data.read();
            data.iter().map(|(k, (v, t))| (k.clone(), (v.clone(), *t))).collect()
        };

        let data_path = self.data_dir.join("state.data");
        let mut file = std::fs::OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .open(data_path)?;

        for (key, (value, term)) in data_copy.iter() {
            let key_len = key.len() as u64;
            file.write_all(&key_len.to_be_bytes())?;

            file.write_all(key)?;

            let value_len = value.len() as u64;
            file.write_all(&value_len.to_be_bytes())?;

            file.write_all(value)?;

            file.write_all(&term.to_be_bytes())?;
        }

        file.flush()?;
        Ok(())
    }

    async fn persist_data_async(&self) -> Result<(), Error> {
        let data_copy: HashMap<Vec<u8>, (Vec<u8>, u64)> = {
            let data = self.data.read();
            data.iter().map(|(k, (v, t))| (k.clone(), (v.clone(), *t))).collect()
        };

        let data_path = self.data_dir.join("state.data");
        let mut file = OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .open(data_path)
            .await?;

        for (key, (value, term)) in data_copy.iter() {
            let key_len = key.len() as u64;
            file.write_all(&key_len.to_be_bytes()).await?;

            file.write_all(key).await?;

            let value_len = value.len() as u64;
            file.write_all(&value_len.to_be_bytes()).await?;

            file.write_all(value).await?;

            file.write_all(&term.to_be_bytes()).await?;
        }

        file.flush().await?;
        Ok(())
    }

    fn persist_metadata(&self) -> Result<(), Error> {
        let metadata_path = self.data_dir.join("metadata.bin");
        let mut file = std::fs::OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .open(metadata_path)?;

        let index = self.last_applied_index.load(Ordering::SeqCst);
        let term = self.last_applied_term.load(Ordering::SeqCst);

        file.write_all(&index.to_be_bytes())?;
        file.write_all(&term.to_be_bytes())?;

        file.flush()?;
        Ok(())
    }

    async fn persist_metadata_async(&self) -> Result<(), Error> {
        let metadata_path = self.data_dir.join("metadata.bin");
        let mut file = OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .open(metadata_path)
            .await?;

        let index = self.last_applied_index.load(Ordering::SeqCst);
        let term = self.last_applied_term.load(Ordering::SeqCst);

        file.write_all(&index.to_be_bytes()).await?;
        file.write_all(&term.to_be_bytes()).await?;

        file.flush().await?;
        Ok(())
    }

    async fn append_to_wal(
        &self,
        entry: &Entry,
        operation: &str,
        key: &[u8],
        value: Option<&[u8]>,
    ) -> Result<(), Error> {
        let wal_path = self.data_dir.join("wal.log");
        let mut file = OpenOptions::new()
            .write(true)
            .create(true)
            .append(true)
            .open(wal_path)
            .await?;

        file.write_all(&entry.index.to_be_bytes()).await?;
        file.write_all(&entry.term.to_be_bytes()).await?;

        file.write_all(operation.as_bytes()).await?;

        file.write_all(&(key.len() as u64).to_be_bytes()).await?;
        file.write_all(key).await?;

        if let Some(value_data) = value {
            file.write_all(&(value_data.len() as u64).to_be_bytes()).await?;
            file.write_all(value_data).await?;
        }

        file.flush().await?;
        Ok(())
    }

    fn clear_wal(&self) -> Result<(), Error> {
        let wal_path = self.data_dir.join("wal.log");
        let mut file = std::fs::OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .open(wal_path)?;

        file.set_len(0)?;
        file.flush()?;
        Ok(())
    }

    async fn clear_wal_async(&self) -> Result<(), Error> {
        let wal_path = self.data_dir.join("wal.log");
        let mut file = OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .open(wal_path)
            .await?;

        file.set_len(0).await?;
        file.flush().await?;
        Ok(())
    }

    pub async fn reset(&self) -> Result<(), Error> {
        info!("[Node-{}] Resetting state machine", self.node_id);

        {
            let mut data = self.data.write();
            data.clear();
        }

        self.last_applied_index.store(0, Ordering::SeqCst);
        self.last_applied_term.store(0, Ordering::SeqCst);

        {
            let mut snapshot_metadata = self.last_snapshot_metadata.write();
            *snapshot_metadata = None;
        }

        self.clear_data_file().await?;
        self.clear_metadata_file().await?;
        self.clear_wal_async().await?;

        info!("[Node-{}] State machine reset completed", self.node_id);
        Ok(())
    }

    async fn clear_data_file(&self) -> Result<(), Error> {
        let data_path = self.data_dir.join("state.data");
        let mut file = OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .open(data_path)
            .await?;

        file.set_len(0).await?;
        file.flush().await?;
        Ok(())
    }

    async fn clear_metadata_file(&self) -> Result<(), Error> {
        let metadata_path = self.data_dir.join("metadata.bin");
        let mut file = OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .open(metadata_path)
            .await?;

        file.write_all(&0u64.to_be_bytes()).await?;
        file.write_all(&0u64.to_be_bytes()).await?;

        file.flush().await?;
        Ok(())
    }
}

impl Drop for FileStateMachine {
    fn drop(&mut self) {
        let timer = Instant::now();

        match self.save_hard_state() {
            Ok(_) => debug!("StateMachine saved in {:?}", timer.elapsed()),
            Err(e) => error!("Failed to save StateMachine: {}", e),
        }
    }
}

#[async_trait]
impl StateMachine for FileStateMachine {
    fn start(&self) -> Result<(), Error> {
        self.running.store(true, Ordering::SeqCst);
        info!("[Node-{}] File state machine started", self.node_id);
        Ok(())
    }

    fn stop(&self) -> Result<(), Error> {
        self.running.store(false, Ordering::SeqCst);
        info!("[Node-{}] File state machine stopped", self.node_id);
        Ok(())
    }

    fn is_running(&self) -> bool {
        self.running.load(Ordering::SeqCst)
    }

    fn get(
        &self,
        key_buffer: &[u8],
    ) -> Result<Option<Vec<u8>>, Error> {
        let data = self.data.read();
        Ok(data.get(key_buffer).map(|(value, _)| value.clone()))
    }

    fn entry_term(
        &self,
        entry_id: u64,
    ) -> Option<u64> {
        // The map stores (value, term) keyed by user key and does not record log
        // indexes, so this lookup can only match entries against their stored term.
        let data = self.data.read();
        data.values().find(|(_, term)| *term == entry_id).map(|(_, term)| *term)
    }

    async fn apply_chunk(
        &self,
        chunk: Vec<Entry>,
    ) -> Result<(), Error> {
        trace!("Applying chunk: {:?}.", chunk);

        let mut highest_index_entry: Option<LogId> = None;

        for entry in chunk {
            assert!(entry.payload.is_some(), "Entry payload should not be None!");

            if let Some(prev) = &highest_index_entry {
                assert!(
                    entry.index > prev.index,
                    "apply_chunk: received unordered entry at index {} (prev={})",
                    entry.index,
                    prev.index
                );
            }
            highest_index_entry = Some(LogId {
                index: entry.index,
                term: entry.term,
            });

            match entry.payload.as_ref().unwrap().payload.as_ref() {
                Some(Payload::Noop(_)) => {
                    debug!("Handling NOOP command at index {}", entry.index);
                    self.append_to_wal(&entry, "NOOP", &[], None).await?;
                }
                Some(Payload::Command(bytes)) => match WriteCommand::decode(&bytes[..]) {
                    Ok(write_cmd) => match write_cmd.operation {
                        Some(Operation::Insert(Insert { key, value })) => {
                            debug!(
                                "[Node-{}] Applying INSERT at index {}: {:?}",
                                self.node_id, entry.index, key
                            );

                            self.append_to_wal(&entry, "INSERT", &key, Some(&value)).await?;

                            let mut data = self.data.write();
                            data.insert(key, (value, entry.term));
                        }
                        Some(Operation::Delete(Delete { key })) => {
                            debug!(
                                "[Node-{}] Applying DELETE at index {}: {:?}",
                                self.node_id, entry.index, key
                            );

                            self.append_to_wal(&entry, "DELETE", &key, None).await?;

                            let mut data = self.data.write();
                            data.remove(&key);
                        }
                        None => {
                            warn!(
                                "[Node-{}] WriteCommand without operation at index {}",
                                self.node_id, entry.index
                            );
                        }
                    },
                    Err(e) => {
                        error!(
                            "[Node-{}] Failed to decode WriteCommand at index {}: {:?}",
                            self.node_id, entry.index, e
                        );
                        return Err(StorageError::SerializationError(e.to_string()).into());
                    }
                },
                Some(Payload::Config(_config_change)) => {
                    debug!(
                        "[Node-{}] Ignoring config change at index {}",
                        self.node_id, entry.index
                    );
                    self.append_to_wal(&entry, "CONFIG", &[], None).await?;
                }
                None => panic!("Entry payload variant should not be None!"),
            }

            info!("[{}]- COMMITTED_LOG_METRIC: {} ", self.node_id, entry.index);
        }

        if let Some(log_id) = highest_index_entry {
            debug!(
                "[Node-{}] State machine - updated last_applied: {:?}",
                self.node_id, log_id
            );
            self.update_last_applied(log_id);
        }

        self.persist_data_async().await?;
        self.persist_metadata_async().await?;
        self.clear_wal_async().await?;

        Ok(())
    }

    fn len(&self) -> usize {
        self.data.read().len()
    }

    fn update_last_applied(
        &self,
        last_applied: LogId,
    ) {
        self.last_applied_index.store(last_applied.index, Ordering::SeqCst);
        self.last_applied_term.store(last_applied.term, Ordering::SeqCst);
    }

    fn last_applied(&self) -> LogId {
        LogId {
            index: self.last_applied_index.load(Ordering::SeqCst),
            term: self.last_applied_term.load(Ordering::SeqCst),
        }
    }

    fn persist_last_applied(
        &self,
        last_applied: LogId,
    ) -> Result<(), Error> {
        self.update_last_applied(last_applied);
        self.persist_metadata()
    }

    fn update_last_snapshot_metadata(
        &self,
        snapshot_metadata: &SnapshotMetadata,
    ) -> Result<(), Error> {
        *self.last_snapshot_metadata.write() = Some(snapshot_metadata.clone());
        Ok(())
    }

    fn snapshot_metadata(&self) -> Option<SnapshotMetadata> {
        self.last_snapshot_metadata.read().clone()
    }

    fn persist_last_snapshot_metadata(
        &self,
        snapshot_metadata: &SnapshotMetadata,
    ) -> Result<(), Error> {
        self.update_last_snapshot_metadata(snapshot_metadata)
    }

    async fn apply_snapshot_from_file(
        &self,
        metadata: &SnapshotMetadata,
        snapshot_dir: std::path::PathBuf,
    ) -> Result<(), Error> {
        info!(
            "[Node-{}] Applying snapshot from file: {:?}",
            self.node_id, snapshot_dir
        );
        println!(
            "[Node-{}] Applying snapshot from file: {:?}",
            self.node_id, snapshot_dir
        );

        let snapshot_data_path = snapshot_dir.join("snapshot.bin");
        let mut file = File::open(snapshot_data_path).await?;
        let mut buffer = Vec::new();
        file.read_to_end(&mut buffer).await?;

        let mut pos = 0;
        let mut new_data = HashMap::new();

        while pos < buffer.len() {
            if pos + 8 > buffer.len() {
                break;
            }
            let key_len = u64::from_be_bytes(buffer[pos..pos + 8].try_into().unwrap()) as usize;
            pos += 8;

            if pos + key_len > buffer.len() {
                break;
            }
            let key = buffer[pos..pos + key_len].to_vec();
            pos += key_len;

            if pos + 8 > buffer.len() {
                break;
            }
            let value_len = u64::from_be_bytes(buffer[pos..pos + 8].try_into().unwrap()) as usize;
            pos += 8;

            if pos + value_len > buffer.len() {
                break;
            }
            let value = buffer[pos..pos + value_len].to_vec();
            pos += value_len;

            if pos + 8 > buffer.len() {
                break;
            }
            let term = u64::from_be_bytes(buffer[pos..pos + 8].try_into().unwrap());
            pos += 8;

            new_data.insert(key, (value, term));
        }

        {
            let mut data = self.data.write();
            *data = new_data;
        }

        *self.last_snapshot_metadata.write() = Some(metadata.clone());

        if let Some(last_included) = &metadata.last_included {
            self.update_last_applied(*last_included);
        }

        self.persist_data_async().await?;
        self.persist_metadata_async().await?;
        self.clear_wal_async().await?;

        info!("[Node-{}] Snapshot applied successfully", self.node_id);
        Ok(())
    }

    async fn generate_snapshot_data(
        &self,
        new_snapshot_dir: std::path::PathBuf,
        last_included: LogId,
    ) -> Result<[u8; 32], Error> {
        info!(
            "[Node-{}] Generating snapshot data up to {:?}",
            self.node_id, last_included
        );

        fs::create_dir_all(&new_snapshot_dir).await?;

        let snapshot_path = new_snapshot_dir.join("snapshot.bin");
        let mut file = File::create(&snapshot_path).await?;

        let data_copy: HashMap<Vec<u8>, (Vec<u8>, u64)> = {
            let data = self.data.read();
            data.iter().map(|(k, (v, t))| (k.clone(), (v.clone(), *t))).collect()
        };

        for (key, (value, term)) in data_copy.iter() {
            let key_len = key.len() as u64;
            file.write_all(&key_len.to_be_bytes()).await?;

            file.write_all(key).await?;

            let value_len = value.len() as u64;
            file.write_all(&value_len.to_be_bytes()).await?;

            file.write_all(value).await?;

            file.write_all(&term.to_be_bytes()).await?;
        }

        file.flush().await?;

        let metadata = SnapshotMetadata {
            last_included: Some(last_included),
            // Placeholder checksum: the snapshot contents are not hashed here.
            checksum: vec![0; 32],
        };

        self.update_last_snapshot_metadata(&metadata)?;

        info!(
            "[Node-{}] Snapshot generated at {:?}",
            self.node_id, snapshot_path
        );

        Ok([0; 32])
    }

    fn save_hard_state(&self) -> Result<(), Error> {
        let last_applied = self.last_applied();
        self.persist_last_applied(last_applied)?;

        if let Some(last_snapshot_metadata) = self.snapshot_metadata() {
            self.persist_last_snapshot_metadata(&last_snapshot_metadata)?;
        }

        self.flush()?;
        Ok(())
    }

    fn flush(&self) -> Result<(), Error> {
        self.persist_data()?;
        self.persist_metadata()?;
        self.clear_wal()?;
        Ok(())
    }

    async fn flush_async(&self) -> Result<(), Error> {
        self.persist_data_async().await?;
        self.persist_metadata_async().await?;
        self.clear_wal_async().await?;
        Ok(())
    }

    async fn reset(&self) -> Result<(), Error> {
        // Delegates to the inherent `FileStateMachine::reset`, which takes
        // precedence over this trait method during method resolution.
        self.reset().await
    }
}
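
// --- Usage sketch (not part of the original module) ---------------------------
// A minimal, hedged example of driving the state machine: build an INSERT
// WriteCommand, wrap it in an Entry, apply it, and read the value back.
// Assumptions not confirmed by this file: the oneof wrapper message is
// `crate::proto::common::EntryPayload`, the prost-generated types implement
// `Default`, `tokio`'s test macro is available as a dev-dependency, and the
// temp-dir path and test name below are purely illustrative.
#[cfg(test)]
mod usage_sketch {
    use super::*;
    use crate::proto::common::EntryPayload;

    #[tokio::test]
    async fn insert_then_get() -> Result<(), Error> {
        // Hypothetical scratch directory for the sketch.
        let dir = std::env::temp_dir().join("file_state_machine_usage_sketch");
        let sm = FileStateMachine::new(dir, 1).await?;

        // Encode an INSERT command exactly as `apply_chunk` expects to decode it.
        let cmd = WriteCommand {
            operation: Some(Operation::Insert(Insert {
                key: b"key".to_vec(),
                value: b"value".to_vec(),
            })),
            ..Default::default()
        };

        let entry = Entry {
            index: 1,
            term: 1,
            payload: Some(EntryPayload {
                payload: Some(Payload::Command(cmd.encode_to_vec())),
                ..Default::default()
            }),
            ..Default::default()
        };

        sm.apply_chunk(vec![entry]).await?;
        assert_eq!(sm.get(b"key")?, Some(b"value".to_vec()));
        assert_eq!(sm.last_applied().index, 1);
        Ok(())
    }
}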