1use std::collections::HashMap;
85use std::io::Write;
86use std::path::PathBuf;
87use std::sync::Arc;
88use std::sync::Mutex;
89use std::sync::atomic::AtomicBool;
90use std::sync::atomic::AtomicU64;
91use std::sync::atomic::Ordering;
92use std::time::SystemTime;
93
94use bytes::Bytes;
95use d_engine_core::ApplyResult;
96use d_engine_core::Error;
97use d_engine_core::Lease;
98use d_engine_core::StateMachine;
99use d_engine_core::StorageError;
100use d_engine_proto::client::WriteCommand;
101use d_engine_proto::client::write_command::CompareAndSwap;
102use d_engine_proto::client::write_command::Delete;
103use d_engine_proto::client::write_command::Insert;
104use d_engine_proto::client::write_command::Operation;
105use d_engine_proto::common::Entry;
106use d_engine_proto::common::LogId;
107use d_engine_proto::common::entry_payload::Payload;
108use d_engine_proto::server::storage::SnapshotMetadata;
109use parking_lot::RwLock;
110use prost::Message;
111use tokio::fs;
112use tokio::fs::File;
113use tokio::fs::OpenOptions;
114use tokio::io::AsyncReadExt;
115use tokio::io::AsyncWriteExt;
116use tokio::time::Instant;
117use tonic::async_trait;
118use tracing::debug;
119use tracing::error;
120use tracing::info;
121use tracing::warn;
122
123use crate::storage::DefaultLease;
124
/// In-memory KV store: key -> (value, term of the log entry that wrote it),
/// behind a parking_lot RwLock for concurrent readers.
type FileStateMachineDataType = RwLock<HashMap<Bytes, (Bytes, u64)>>;
126
/// Operation tag stored as a single byte in each WAL record.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum WalOpCode {
    Noop = 0,
    Insert = 1,
    Delete = 2,
    Config = 3,
    CompareAndSwap = 4,
}

impl WalOpCode {
    /// Maps the textual operation name used by `apply_chunk` to its
    /// opcode; any unrecognized name degrades to `Noop`.
    fn from_str(s: &str) -> Self {
        const NAMES: [(&str, WalOpCode); 4] = [
            ("INSERT", WalOpCode::Insert),
            ("DELETE", WalOpCode::Delete),
            ("CONFIG", WalOpCode::Config),
            ("CAS", WalOpCode::CompareAndSwap),
        ];
        NAMES
            .iter()
            .find(|(name, _)| *name == s)
            .map(|(_, op)| *op)
            .unwrap_or(WalOpCode::Noop)
    }

    /// Decodes an opcode byte read back from the WAL; out-of-range
    /// bytes degrade to `Noop`.
    fn from_u8(byte: u8) -> Self {
        const OPS: [WalOpCode; 4] = [
            WalOpCode::Insert,
            WalOpCode::Delete,
            WalOpCode::Config,
            WalOpCode::CompareAndSwap,
        ];
        match byte {
            1..=4 => OPS[(byte - 1) as usize],
            _ => WalOpCode::Noop,
        }
    }
}
159
#[derive(Debug)]
pub struct FileStateMachine {
    /// In-memory store: key -> (value, term of the writing entry).
    data: FileStateMachineDataType,
    /// Optional TTL lease manager, attached via `set_lease`.
    lease: Option<Arc<DefaultLease>>,
    /// True once a lease manager has been attached.
    lease_enabled: bool,
    /// Index of the last applied log entry.
    last_applied_index: AtomicU64,
    /// Term of the last applied log entry.
    last_applied_term: AtomicU64,
    /// Metadata of the most recent snapshot, if any (in-memory only).
    last_snapshot_metadata: RwLock<Option<SnapshotMetadata>>,
    /// Toggled by `start`/`stop`.
    running: AtomicBool,
    /// WAL records appended since the last checkpoint (drives checkpoint policy).
    wal_entries_since_checkpoint: AtomicU64,
    /// Time of the last checkpoint. std Mutex — never held across an await.
    last_checkpoint: Mutex<Instant>,
    /// Directory holding state.data, metadata.bin, wal.log and ttl_state.bin.
    data_dir: PathBuf,
}
205
206impl FileStateMachine {
207 pub async fn new(data_dir: PathBuf) -> Result<Self, Error> {
217 fs::create_dir_all(&data_dir).await?;
219
220 let machine = Self {
221 data: RwLock::new(HashMap::new()),
222 lease: None, lease_enabled: false, last_applied_index: AtomicU64::new(0),
225 last_applied_term: AtomicU64::new(0),
226 last_snapshot_metadata: RwLock::new(None),
227 running: AtomicBool::new(true),
228 wal_entries_since_checkpoint: AtomicU64::new(0),
229 last_checkpoint: Mutex::new(Instant::now()),
230 data_dir: data_dir.clone(),
231 };
232
233 machine.load_from_disk().await?;
235
236 Ok(machine)
237 }
238
239 pub fn set_lease(
245 &mut self,
246 lease: Arc<DefaultLease>,
247 ) {
248 self.lease_enabled = true;
250 self.lease = Some(lease);
251 }
252
253 async fn load_from_disk(&self) -> Result<(), Error> {
258 self.load_metadata().await?;
260
261 self.load_data().await?;
263
264 self.load_ttl_data().await?;
266
267 self.replay_wal().await?;
269
270 info!("Loaded state machine data from disk");
271 Ok(())
272 }
273
274 async fn load_ttl_data(&self) -> Result<(), Error> {
276 Ok(())
280 }
281
282 pub async fn load_lease_data(&self) -> Result<(), Error> {
287 let Some(ref lease) = self.lease else {
288 return Ok(()); };
290
291 let ttl_path = self.data_dir.join("ttl_state.bin");
292 if !ttl_path.exists() {
293 debug!("No TTL state file found");
294 return Ok(());
295 }
296
297 let ttl_data = tokio::fs::read(&ttl_path).await?;
298 lease.reload(&ttl_data)?;
299
300 info!("Loaded TTL state from disk: {} active TTLs", lease.len());
301 Ok(())
302 }
303
304 async fn load_metadata(&self) -> Result<(), Error> {
306 let metadata_path = self.data_dir.join("metadata.bin");
307 if !metadata_path.exists() {
308 return Ok(());
309 }
310
311 let mut file = File::open(metadata_path).await?;
312 let mut buffer = [0u8; 16];
313
314 if file.read_exact(&mut buffer).await.is_ok() {
315 let index = u64::from_be_bytes([
316 buffer[0], buffer[1], buffer[2], buffer[3], buffer[4], buffer[5], buffer[6],
317 buffer[7],
318 ]);
319
320 let term = u64::from_be_bytes([
321 buffer[8], buffer[9], buffer[10], buffer[11], buffer[12], buffer[13], buffer[14],
322 buffer[15],
323 ]);
324
325 self.last_applied_index.store(index, Ordering::SeqCst);
326 self.last_applied_term.store(term, Ordering::SeqCst);
327 }
328
329 Ok(())
330 }
331
332 async fn load_data(&self) -> Result<(), Error> {
334 let data_path = self.data_dir.join("state.data");
335 if !data_path.exists() {
336 return Ok(());
337 }
338
339 let mut file = File::open(data_path).await?;
340 let mut buffer = Vec::new();
341 file.read_to_end(&mut buffer).await?;
342
343 let mut pos = 0;
344 let mut data = self.data.write();
345
346 while pos < buffer.len() {
347 if pos + 8 > buffer.len() {
349 break;
350 }
351
352 let key_len_bytes = &buffer[pos..pos + 8];
353 let key_len = u64::from_be_bytes([
354 key_len_bytes[0],
355 key_len_bytes[1],
356 key_len_bytes[2],
357 key_len_bytes[3],
358 key_len_bytes[4],
359 key_len_bytes[5],
360 key_len_bytes[6],
361 key_len_bytes[7],
362 ]) as usize;
363
364 pos += 8;
365
366 if pos + key_len > buffer.len() {
368 break;
369 }
370
371 let key = Bytes::from(buffer[pos..pos + key_len].to_vec());
372 pos += key_len;
373
374 if pos + 8 > buffer.len() {
376 break;
377 }
378
379 let value_len_bytes = &buffer[pos..pos + 8];
380 let value_len = u64::from_be_bytes([
381 value_len_bytes[0],
382 value_len_bytes[1],
383 value_len_bytes[2],
384 value_len_bytes[3],
385 value_len_bytes[4],
386 value_len_bytes[5],
387 value_len_bytes[6],
388 value_len_bytes[7],
389 ]) as usize;
390
391 pos += 8;
392
393 if pos + value_len > buffer.len() {
395 break;
396 }
397
398 let value = Bytes::from(buffer[pos..pos + value_len].to_vec());
399 pos += value_len;
400
401 if pos + 8 > buffer.len() {
403 break;
404 }
405
406 let term_bytes = &buffer[pos..pos + 8];
407 let term = u64::from_be_bytes([
408 term_bytes[0],
409 term_bytes[1],
410 term_bytes[2],
411 term_bytes[3],
412 term_bytes[4],
413 term_bytes[5],
414 term_bytes[6],
415 term_bytes[7],
416 ]);
417
418 pos += 8;
419
420 data.insert(key, (value, term));
422 }
423
424 Ok(())
425 }
426
    /// Replays wal.log on top of the data loaded by `load_data`.
    ///
    /// Record layout (all integers big-endian; written by `append_to_wal`):
    ///   [index u64 | term u64 | opcode u8 | key_len u64 | key |
    ///    value_len u64 | value (absent when value_len == 0) |
    ///    expire_at_secs u64 (0 = no TTL)]
    ///
    /// Parsing stops with a warning at the first truncated record. After a
    /// successful replay the WAL is truncated so records are not applied
    /// twice on the next startup.
    ///
    /// NOTE(review): CAS records are re-applied unconditionally here (no
    /// compare against the expected value), while `apply_chunk` logs a CAS
    /// to the WAL before evaluating it — so a CAS that failed at apply time
    /// could be applied during replay. Confirm this is intended.
    async fn replay_wal(&self) -> Result<(), Error> {
        let wal_path = self.data_dir.join("wal.log");
        if !wal_path.exists() {
            debug!("No WAL file found, skipping replay");
            return Ok(());
        }

        let mut file = File::open(wal_path).await?;
        let mut buffer = Vec::new();
        file.read_to_end(&mut buffer).await?;

        if buffer.is_empty() {
            debug!("WAL file is empty, skipping replay");
            return Ok(());
        }

        let mut pos = 0;
        let mut operations = Vec::new();
        let mut replayed_count = 0;

        // 17 = index (8) + term (8) + opcode (1): the fixed record header.
        while pos + 17 < buffer.len() {
            let _index = u64::from_be_bytes(buffer[pos..pos + 8].try_into().unwrap());
            pos += 8;

            let term = u64::from_be_bytes(buffer[pos..pos + 8].try_into().unwrap());
            pos += 8;

            let op_code = WalOpCode::from_u8(buffer[pos]);
            pos += 1;

            if pos + 8 > buffer.len() {
                warn!("Incomplete key length at position {}, stopping replay", pos);
                break;
            }

            let key_len = u64::from_be_bytes(buffer[pos..pos + 8].try_into().unwrap()) as usize;
            pos += 8;

            if pos + key_len > buffer.len() {
                warn!(
                    "Incomplete key data at position {} (need {} bytes, have {})",
                    pos,
                    key_len,
                    buffer.len() - pos
                );
                break;
            }

            let key = Bytes::from(buffer[pos..pos + key_len].to_vec());
            pos += key_len;

            if pos + 8 > buffer.len() {
                warn!(
                    "Incomplete value length at position {}, stopping replay",
                    pos
                );
                break;
            }

            let value_len = u64::from_be_bytes(buffer[pos..pos + 8].try_into().unwrap()) as usize;
            pos += 8;

            // value_len == 0 encodes "no value" (e.g. DELETE records).
            let value = if value_len > 0 {
                if pos + value_len > buffer.len() {
                    warn!("Incomplete value data at position {}, stopping replay", pos);
                    break;
                }
                let value_data = Bytes::from(buffer[pos..pos + value_len].to_vec());
                pos += value_len;
                Some(value_data)
            } else {
                None
            };

            // Trailing expiration field (absolute unix seconds, 0 = none).
            // Tolerates older/truncated records that lack the field.
            let expire_at_secs = if pos + 8 <= buffer.len() {
                let secs = u64::from_be_bytes(buffer[pos..pos + 8].try_into().unwrap());
                pos += 8;
                if secs > 0 { Some(secs) } else { None }
            } else {
                debug!(
                    "No expiration time field at position {}, assuming no TTL (incomplete WAL entry)",
                    pos
                );
                None
            };

            operations.push((op_code, key, value, term, expire_at_secs));
            replayed_count += 1;
        }

        info!(
            "Parsed {} WAL operations, applying to memory",
            operations.len()
        );

        let mut applied_count = 0;
        let mut skipped_expired = 0;
        let now = std::time::SystemTime::now();
        {
            let mut data = self.data.write();

            for (op_code, key, value, term, expire_at_secs) in operations {
                match op_code {
                    WalOpCode::Insert => {
                        if let Some(value_data) = value {
                            // Skip keys whose TTL already lapsed while we were down.
                            let is_expired = if let Some(secs) = expire_at_secs {
                                let expire_at =
                                    std::time::UNIX_EPOCH + std::time::Duration::from_secs(secs);
                                now >= expire_at
                            } else {
                                false
                            };

                            if is_expired {
                                debug!("Skipped expired key during WAL replay: key={:?}", key);
                                skipped_expired += 1;
                                continue;
                            }

                            data.insert(key.clone(), (value_data, term));

                            // Re-register the remaining TTL with the lease manager.
                            if let Some(secs) = expire_at_secs {
                                if let Some(ref lease) = self.lease {
                                    let expire_at = std::time::UNIX_EPOCH
                                        + std::time::Duration::from_secs(secs);
                                    let remaining = expire_at
                                        .duration_since(now)
                                        .map(|d| d.as_secs())
                                        .unwrap_or(0);

                                    if remaining > 0 {
                                        lease.register(key.clone(), remaining);
                                        debug!(
                                            "Replayed INSERT with TTL: key={:?}, remaining={}s",
                                            key, remaining
                                        );
                                    }
                                }
                            } else {
                                debug!("Replayed INSERT: key={:?}", key);
                            }

                            applied_count += 1;
                        } else {
                            warn!("INSERT operation without value");
                        }
                    }
                    WalOpCode::Delete => {
                        data.remove(&key);
                        if let Some(ref lease) = self.lease {
                            lease.unregister(&key);
                        }
                        applied_count += 1;
                        debug!("Replayed DELETE: key={:?}", key);
                    }
                    WalOpCode::CompareAndSwap => {
                        // See NOTE(review) above: applied without comparing.
                        if let Some(new_value) = value {
                            data.insert(key.clone(), (new_value, term));
                            applied_count += 1;
                            debug!("Replayed CAS: key={:?}", key);
                        } else {
                            warn!("CAS operation without new_value in WAL");
                        }
                    }
                    WalOpCode::Noop | WalOpCode::Config => {
                        applied_count += 1;
                        debug!("Replayed {:?} operation", op_code);
                    }
                }
            }
        }

        info!(
            "WAL replay complete: {} operations replayed, {} applied, {} expired keys skipped",
            replayed_count, applied_count, skipped_expired
        );

        // Truncate the WAL so these records are not re-applied next boot.
        if applied_count > 0 {
            self.clear_wal_async().await?;
            debug!(
                "Cleared WAL after successful replay of {} operations",
                applied_count
            );
        }

        Ok(())
    }
649
650 fn persist_data(&self) -> Result<(), Error> {
652 let data_copy: HashMap<Bytes, (Bytes, u64)> = {
654 let data = self.data.read();
655 data.iter().map(|(k, (v, t))| (k.clone(), (v.clone(), *t))).collect()
656 };
657
658 let data_path = self.data_dir.join("state.data");
660
661 let estimated: usize =
662 data_copy.iter().map(|(k, (v, _))| 8 + k.len() + 8 + v.len() + 8).sum();
663 let mut buf = Vec::with_capacity(estimated);
664
665 for (key, (value, term)) in &data_copy {
666 buf.extend_from_slice(&(key.len() as u64).to_be_bytes());
667 buf.extend_from_slice(key);
668 buf.extend_from_slice(&(value.len() as u64).to_be_bytes());
669 buf.extend_from_slice(value);
670 buf.extend_from_slice(&term.to_be_bytes());
671 }
672
673 std::fs::write(data_path, buf)?;
674 Ok(())
675 }
676
677 async fn persist_data_async(&self) -> Result<(), Error> {
679 let data_copy: HashMap<Bytes, (Bytes, u64)> = {
681 let data = self.data.read();
682 data.iter().map(|(k, (v, t))| (k.clone(), (v.clone(), *t))).collect()
683 };
684
685 let data_path = self.data_dir.join("state.data");
686 let mut file = OpenOptions::new()
687 .write(true)
688 .create(true)
689 .truncate(true)
690 .open(data_path)
691 .await?;
692
693 let estimated: usize =
696 data_copy.iter().map(|(k, (v, _))| 8 + k.len() + 8 + v.len() + 8).sum();
697 let mut buf = Vec::with_capacity(estimated);
698
699 for (key, (value, term)) in &data_copy {
700 buf.extend_from_slice(&(key.len() as u64).to_be_bytes());
701 buf.extend_from_slice(key);
702 buf.extend_from_slice(&(value.len() as u64).to_be_bytes());
703 buf.extend_from_slice(value);
704 buf.extend_from_slice(&term.to_be_bytes());
705 }
706
707 file.write_all(&buf).await?;
708 file.flush().await?;
709
710 Ok(())
711 }
712
713 fn persist_metadata(&self) -> Result<(), Error> {
715 let metadata_path = self.data_dir.join("metadata.bin");
716 let mut file = std::fs::OpenOptions::new()
717 .write(true)
718 .create(true)
719 .truncate(true)
720 .open(metadata_path)?;
721
722 let index = self.last_applied_index.load(Ordering::SeqCst);
723 let term = self.last_applied_term.load(Ordering::SeqCst);
724
725 file.write_all(&index.to_be_bytes())?;
726 file.write_all(&term.to_be_bytes())?;
727
728 file.flush()?;
729 Ok(())
730 }
731
732 async fn persist_metadata_async(&self) -> Result<(), Error> {
733 let metadata_path = self.data_dir.join("metadata.bin");
734 let mut file = OpenOptions::new()
735 .write(true)
736 .create(true)
737 .truncate(true)
738 .open(metadata_path)
739 .await?;
740
741 let index = self.last_applied_index.load(Ordering::SeqCst);
742 let term = self.last_applied_term.load(Ordering::SeqCst);
743
744 file.write_all(&index.to_be_bytes()).await?;
745 file.write_all(&term.to_be_bytes()).await?;
746
747 file.flush().await?;
748 Ok(())
749 }
750
751 #[allow(unused)]
753 fn clear_wal(&self) -> Result<(), Error> {
754 let wal_path = self.data_dir.join("wal.log");
755 let mut file = std::fs::OpenOptions::new()
756 .write(true)
757 .create(true)
758 .truncate(true)
759 .open(wal_path)?;
760
761 file.set_len(0)?;
762 file.flush()?;
763 Ok(())
764 }
765
766 async fn clear_wal_async(&self) -> Result<(), Error> {
768 let wal_path = self.data_dir.join("wal.log");
769 let mut file = OpenOptions::new()
770 .write(true)
771 .create(true)
772 .truncate(true)
773 .open(wal_path)
774 .await?;
775
776 file.set_len(0).await?;
777 file.flush().await?;
778 Ok(())
779 }
780
781 fn should_checkpoint(&self) -> bool {
787 const WAL_ENTRY_THRESHOLD: u64 = 1000;
788 const TIME_THRESHOLD_SECS: u64 = 10;
789
790 if self.wal_entries_since_checkpoint.load(Ordering::Relaxed) >= WAL_ENTRY_THRESHOLD {
791 return true;
792 }
793 if let Ok(last) = self.last_checkpoint.lock() {
794 return last.elapsed().as_secs() >= TIME_THRESHOLD_SECS;
795 }
796 false
797 }
798
799 pub(crate) async fn checkpoint(&self) -> Result<(), Error> {
803 self.persist_data_async().await?;
804 self.persist_metadata_async().await?;
805 self.clear_wal_async().await?;
806
807 self.wal_entries_since_checkpoint.store(0, Ordering::Relaxed);
808 if let Ok(mut last) = self.last_checkpoint.lock() {
809 *last = Instant::now();
810 }
811 debug!("Checkpoint complete");
812 Ok(())
813 }
814
815 pub async fn reset(&self) -> Result<(), Error> {
823 info!("Resetting state machine");
824
825 {
827 let mut data = self.data.write();
828 data.clear();
829 }
830
831 self.last_applied_index.store(0, Ordering::SeqCst);
833 self.last_applied_term.store(0, Ordering::SeqCst);
834
835 {
836 let mut snapshot_metadata = self.last_snapshot_metadata.write();
837 *snapshot_metadata = None;
838 }
839
840 self.clear_data_file().await?;
842 self.clear_metadata_file().await?;
843 self.clear_wal_async().await?;
844
845 info!("State machine reset completed");
846 Ok(())
847 }
848
849 async fn clear_data_file(&self) -> Result<(), Error> {
851 let data_path = self.data_dir.join("state.data");
852 let mut file = OpenOptions::new()
853 .write(true)
854 .create(true)
855 .truncate(true)
856 .open(data_path)
857 .await?;
858
859 file.set_len(0).await?;
860 file.flush().await?;
861 Ok(())
862 }
863
864 async fn clear_metadata_file(&self) -> Result<(), Error> {
866 let metadata_path = self.data_dir.join("metadata.bin");
867 let mut file = OpenOptions::new()
868 .write(true)
869 .create(true)
870 .truncate(true)
871 .open(metadata_path)
872 .await?;
873
874 file.write_all(&0u64.to_be_bytes()).await?;
876 file.write_all(&0u64.to_be_bytes()).await?;
877
878 file.flush().await?;
879 Ok(())
880 }
881
    /// Appends a batch of operations to wal.log in a single buffered write.
    ///
    /// Each record is encoded as (all integers big-endian):
    ///   [index u64 | term u64 | opcode u8 | key_len u64 | key |
    ///    value_len u64 | value | expire_at_secs u64]
    /// where `value_len == 0` means "no value" and `expire_at_secs == 0`
    /// means "no TTL". `replay_wal` must be kept in sync with this layout.
    ///
    /// `entries` tuples are (entry, operation name, key, optional value,
    /// ttl_secs); `ttl_secs` is a relative TTL converted here to an
    /// absolute unix timestamp.
    pub(crate) async fn append_to_wal(
        &self,
        entries: Vec<(Entry, String, Bytes, Option<Bytes>, u64)>,
    ) -> Result<(), Error> {
        if entries.is_empty() {
            return Ok(());
        }

        let wal_path = self.data_dir.join("wal.log");

        let mut file =
            OpenOptions::new().write(true).create(true).append(true).open(&wal_path).await?;

        // Worst-case size per record: header (17) + two length prefixes +
        // payloads + expiration field.
        let estimated_size: usize = entries
            .iter()
            .map(|(_, _, key, value, _)| {
                8 + 8 + 1 + 8 + key.len() + 8 + value.as_ref().map_or(0, |v| v.len()) + 8
            })
            .sum();

        let mut batch_buffer = Vec::with_capacity(estimated_size);

        for (entry, operation, key, value, ttl_secs) in entries {
            batch_buffer.extend_from_slice(&entry.index.to_be_bytes());
            batch_buffer.extend_from_slice(&entry.term.to_be_bytes());

            let op_code = WalOpCode::from_str(&operation);
            batch_buffer.push(op_code as u8);

            batch_buffer.extend_from_slice(&(key.len() as u64).to_be_bytes());
            batch_buffer.extend_from_slice(&key);

            if let Some(value_data) = value {
                batch_buffer.extend_from_slice(&(value_data.len() as u64).to_be_bytes());
                batch_buffer.extend_from_slice(&value_data);
            } else {
                // Zero length encodes "no value" (e.g. DELETE records).
                batch_buffer.extend_from_slice(&0u64.to_be_bytes());
            }

            // Convert relative TTL to an absolute unix timestamp; 0 = no TTL.
            let expire_at_secs = if ttl_secs > 0 {
                let expire_at =
                    std::time::SystemTime::now() + std::time::Duration::from_secs(ttl_secs);
                expire_at
                    .duration_since(std::time::UNIX_EPOCH)
                    .map(|d| d.as_secs())
                    .unwrap_or(0)
            } else {
                0
            };
            batch_buffer.extend_from_slice(&expire_at_secs.to_be_bytes());
        }

        file.write_all(&batch_buffer).await?;
        file.flush().await?;

        Ok(())
    }
959}
960
961impl Drop for FileStateMachine {
962 fn drop(&mut self) {
963 let timer = Instant::now();
964
965 match self.save_hard_state() {
967 Ok(_) => debug!("StateMachine saved in {:?}", timer.elapsed()),
968 Err(e) => error!("Failed to save StateMachine: {}", e),
969 }
970 }
971}
972
973#[async_trait]
974impl StateMachine for FileStateMachine {
975 async fn start(&self) -> Result<(), Error> {
976 self.running.store(true, Ordering::SeqCst);
977
978 if self.lease.is_some() {
980 self.load_lease_data().await?;
981 debug!("Lease data loaded during state machine initialization");
982 }
983
984 info!("File state machine started");
985 Ok(())
986 }
987
988 fn stop(&self) -> Result<(), Error> {
989 self.running.store(false, Ordering::SeqCst);
991
992 if let Some(ref lease) = self.lease {
995 let ttl_snapshot = lease.to_snapshot();
996 let ttl_path = self.data_dir.join("ttl_state.bin");
997 std::fs::write(&ttl_path, ttl_snapshot)
999 .map_err(d_engine_core::StorageError::IoError)?;
1000 debug!("Persisted TTL state on shutdown");
1001 }
1002
1003 info!("File state machine stopped");
1004 Ok(())
1005 }
1006
    /// Reports whether `start` has been called more recently than `stop`.
    fn is_running(&self) -> bool {
        self.running.load(Ordering::SeqCst)
    }
1010
1011 fn get(
1012 &self,
1013 key_buffer: &[u8],
1014 ) -> Result<Option<Bytes>, Error> {
1015 let data = self.data.read();
1020 Ok(data.get(key_buffer).map(|(value, _)| value.clone()))
1021 }
1022
    /// Scans stored (value, term) pairs for a term matching `entry_id`.
    ///
    /// NOTE(review): the closure binds the stored *term* under the name
    /// `index` and compares it to `entry_id`, then returns that same
    /// field — i.e. this yields `Some(entry_id)` iff some entry's term
    /// equals `entry_id`. The data map does not store log indices at all,
    /// so this looks suspicious; confirm intended semantics against the
    /// `StateMachine` trait contract before relying on it.
    fn entry_term(
        &self,
        entry_id: u64,
    ) -> Option<u64> {
        let data = self.data.read();
        data.values().find(|(_, index)| *index == entry_id).map(|(_, term)| *term)
    }
1030
    /// Applies a committed, index-ordered chunk of log entries.
    ///
    /// Three phases: (1) decode each entry into an (entry, op, key,
    /// value, ttl) tuple and pre-build results; (2) append the whole
    /// batch to the WAL; (3) mutate the in-memory map under one write
    /// lock, then bump the applied position and maybe checkpoint.
    ///
    /// NOTE(review): results for NOOP/INSERT/DELETE are pushed in phase 1
    /// but CAS results only in phase 3, and CONFIG / op-less commands get
    /// no result at all — so `results` can be shorter than the chunk and
    /// not ordered by entry index. Confirm against the trait contract.
    /// NOTE(review): the WAL is written before CAS is evaluated, so a
    /// failed CAS is still logged and `replay_wal` re-applies it
    /// unconditionally — verify this is intended.
    async fn apply_chunk(
        &self,
        chunk: Vec<Entry>,
    ) -> Result<Vec<ApplyResult>, Error> {
        let chunk_len = chunk.len();
        let mut highest_index_entry: Option<LogId> = None;
        let mut batch_operations = Vec::new();
        let mut results = Vec::with_capacity(chunk_len);

        // Phase 1: decode entries and stage operations.
        for entry in chunk {
            let entry_index = entry.index;

            assert!(entry.payload.is_some(), "Entry payload should not be None!");

            // Entries must arrive in strictly increasing index order.
            if let Some(prev) = &highest_index_entry {
                assert!(
                    entry.index > prev.index,
                    "apply_chunk: received unordered entry at index {} (prev={})",
                    entry.index,
                    prev.index
                );
            }
            highest_index_entry = Some(LogId {
                index: entry.index,
                term: entry.term,
            });

            match entry.payload.as_ref().unwrap().payload.as_ref() {
                Some(Payload::Noop(_)) => {
                    let entry_index = entry.index;
                    debug!("Handling NOOP command at index {}", entry_index);
                    batch_operations.push((entry, "NOOP", Bytes::new(), None, 0));
                    results.push(ApplyResult::success(entry_index));
                }
                Some(Payload::Command(bytes)) => match WriteCommand::decode(&bytes[..]) {
                    Ok(write_cmd) => {
                        match write_cmd.operation {
                            Some(Operation::Insert(Insert {
                                key,
                                value,
                                ttl_secs,
                            })) => {
                                let entry_index = entry.index;
                                batch_operations.push((
                                    entry,
                                    "INSERT",
                                    key,
                                    Some(value),
                                    ttl_secs,
                                ));
                                results.push(ApplyResult::success(entry_index));
                            }
                            Some(Operation::Delete(Delete { key })) => {
                                let entry_index = entry.index;
                                batch_operations.push((entry, "DELETE", key, None, 0));
                                results.push(ApplyResult::success(entry_index));
                            }
                            Some(Operation::CompareAndSwap(CompareAndSwap {
                                key,
                                expected_value: _,
                                new_value,
                            })) => {
                                // CAS result is decided in phase 3 where the
                                // current value can be compared.
                                batch_operations.push((entry, "CAS", key, Some(new_value), 0));
                            }
                            None => {
                                warn!("WriteCommand without operation at index {}", entry.index);
                                batch_operations.push((entry, "NOOP", Bytes::new(), None, 0));
                            }
                        }
                    }
                    Err(e) => {
                        error!(
                            "Failed to decode WriteCommand at index {}: {:?}",
                            entry.index, e
                        );
                        return Err(StorageError::SerializationError(e.to_string()).into());
                    }
                },
                Some(Payload::Config(_config_change)) => {
                    debug!("Ignoring config change at index {}", entry.index);
                    batch_operations.push((entry, "CONFIG", Bytes::new(), None, 0));
                }
                None => panic!("Entry payload variant should not be None!"),
            }

            info!("COMMITTED_LOG_METRIC: {}", entry_index);
        }

        // Phase 2: write-ahead log the whole batch before mutating memory.
        let mut wal_entries = Vec::new();
        for (entry, operation, key, value, ttl_secs) in &batch_operations {
            wal_entries.push((
                entry.clone(),
                operation.to_string(),
                key.clone(),
                value.clone(),
                *ttl_secs,
            ));
        }

        self.append_to_wal(wal_entries).await?;

        // Phase 3: apply to the in-memory map under a single write lock.
        {
            let mut data = self.data.write();

            for (entry, operation, key, value, ttl_secs) in batch_operations {
                match operation {
                    "NOOP" => {
                    }
                    "INSERT" => {
                        if let Some(value) = value {
                            data.insert(key.clone(), (value, entry.term));

                            if ttl_secs > 0 {
                                // TTL requested but feature disabled: hard error.
                                if !self.lease_enabled {
                                    return Err(StorageError::FeatureNotEnabled(
                                        "TTL feature is not enabled on this server. \
                                        Enable it in config: [raft.state_machine.lease] enabled = true".into()
                                    ).into());
                                }

                                // SAFETY: lease_enabled is only set together
                                // with `lease = Some(..)` in `set_lease`.
                                let lease = unsafe { self.lease.as_ref().unwrap_unchecked() };
                                lease.register(key, ttl_secs);
                            }
                        }
                    }
                    "DELETE" => {
                        data.remove(&key);
                        if let Some(ref lease) = self.lease {
                            lease.unregister(&key);
                        }
                    }
                    "CAS" => {
                        // Re-decode the command to recover expected_value
                        // (dropped in phase 1).
                        if let Some(Payload::Command(bytes)) =
                            entry.payload.as_ref().unwrap().payload.as_ref()
                        {
                            if let Ok(write_cmd) = WriteCommand::decode(&bytes[..]) {
                                if let Some(Operation::CompareAndSwap(CompareAndSwap {
                                    expected_value,
                                    ..
                                })) = write_cmd.operation
                                {
                                    let current_value = data.get(&key);

                                    // None/None means "insert if absent" succeeds.
                                    let cas_success = match (current_value, &expected_value) {
                                        (Some((current, _)), Some(expected)) => {
                                            current.as_ref() == expected.as_ref()
                                        }
                                        (None, None) => true,
                                        _ => false,
                                    };

                                    results.push(if cas_success {
                                        ApplyResult::success(entry.index)
                                    } else {
                                        ApplyResult::failure(entry.index)
                                    });

                                    debug!(
                                        "CAS at index {}: key={:?}, success={}",
                                        entry.index,
                                        String::from_utf8_lossy(&key),
                                        cas_success
                                    );

                                    if cas_success {
                                        if let Some(new_value) = value {
                                            data.insert(key, (new_value, entry.term));
                                        }
                                    }
                                }
                            }
                        }
                    }
                    "CONFIG" => {
                    }
                    _ => warn!("Unknown operation: {}", operation),
                }
            }
        }

        if let Some(log_id) = highest_index_entry {
            debug!("State machine - updated last_applied: {:?}", log_id);
            self.update_last_applied(log_id);
        }

        self.wal_entries_since_checkpoint.fetch_add(chunk_len as u64, Ordering::Relaxed);

        if self.should_checkpoint() {
            self.checkpoint().await?;
        }

        Ok(results)
    }
1249
    /// Number of live keys in the in-memory store.
    fn len(&self) -> usize {
        self.data.read().len()
    }
1253
    /// Records the applied log position in the atomics (memory only;
    /// see `persist_last_applied` for the durable variant).
    fn update_last_applied(
        &self,
        last_applied: LogId,
    ) {
        self.last_applied_index.store(last_applied.index, Ordering::SeqCst);
        self.last_applied_term.store(last_applied.term, Ordering::SeqCst);
    }
1261
    /// Returns the last applied log position from the atomics.
    fn last_applied(&self) -> LogId {
        LogId {
            index: self.last_applied_index.load(Ordering::SeqCst),
            term: self.last_applied_term.load(Ordering::SeqCst),
        }
    }
1268
    /// Updates the in-memory applied position and writes it through to
    /// metadata.bin synchronously.
    fn persist_last_applied(
        &self,
        last_applied: LogId,
    ) -> Result<(), Error> {
        self.update_last_applied(last_applied);
        self.persist_metadata()
    }
1276
1277 fn update_last_snapshot_metadata(
1278 &self,
1279 snapshot_metadata: &SnapshotMetadata,
1280 ) -> Result<(), Error> {
1281 *self.last_snapshot_metadata.write() = Some(snapshot_metadata.clone());
1282 Ok(())
1283 }
1284
    /// Returns a clone of the latest snapshot metadata, if any.
    fn snapshot_metadata(&self) -> Option<SnapshotMetadata> {
        self.last_snapshot_metadata.read().clone()
    }
1288
    /// NOTE(review): despite the name, this only updates the in-memory
    /// snapshot metadata (it delegates to `update_last_snapshot_metadata`,
    /// which performs no disk write). Confirm whether durable persistence
    /// is expected here.
    fn persist_last_snapshot_metadata(
        &self,
        snapshot_metadata: &SnapshotMetadata,
    ) -> Result<(), Error> {
        self.update_last_snapshot_metadata(snapshot_metadata)
    }
1295
    /// Replaces the entire state with the contents of a snapshot file.
    ///
    /// snapshot.bin layout (written by `generate_snapshot_data`): a
    /// sequence of [key_len u64 | key | value_len u64 | value | term u64]
    /// records followed by a trailing [ttl_len u64 | ttl blob] section,
    /// all integers big-endian. Parsing of the KV section stops silently
    /// at the first truncated record. After loading, data/metadata are
    /// persisted and the WAL is truncated.
    async fn apply_snapshot_from_file(
        &self,
        metadata: &SnapshotMetadata,
        snapshot_dir: std::path::PathBuf,
    ) -> Result<(), Error> {
        info!("Applying snapshot from file: {:?}", snapshot_dir);

        let snapshot_data_path = snapshot_dir.join("snapshot.bin");
        let mut file = File::open(snapshot_data_path).await?;
        let mut buffer = Vec::new();
        file.read_to_end(&mut buffer).await?;

        let mut pos = 0;
        // Built off to the side so the live map is swapped atomically below.
        let mut new_data = HashMap::new();

        while pos < buffer.len() {
            if pos + 8 > buffer.len() {
                break;
            }

            let key_len_bytes = &buffer[pos..pos + 8];
            let key_len = u64::from_be_bytes([
                key_len_bytes[0],
                key_len_bytes[1],
                key_len_bytes[2],
                key_len_bytes[3],
                key_len_bytes[4],
                key_len_bytes[5],
                key_len_bytes[6],
                key_len_bytes[7],
            ]) as usize;

            pos += 8;

            if pos + key_len > buffer.len() {
                break;
            }

            let key = Bytes::from(buffer[pos..pos + key_len].to_vec());
            pos += key_len;

            if pos + 8 > buffer.len() {
                break;
            }

            let value_len_bytes = &buffer[pos..pos + 8];
            let value_len = u64::from_be_bytes([
                value_len_bytes[0],
                value_len_bytes[1],
                value_len_bytes[2],
                value_len_bytes[3],
                value_len_bytes[4],
                value_len_bytes[5],
                value_len_bytes[6],
                value_len_bytes[7],
            ]) as usize;

            pos += 8;

            if pos + value_len > buffer.len() {
                break;
            }

            let value = Bytes::from(buffer[pos..pos + value_len].to_vec());
            pos += value_len;

            if pos + 8 > buffer.len() {
                break;
            }

            let term_bytes = &buffer[pos..pos + 8];
            let term = u64::from_be_bytes([
                term_bytes[0],
                term_bytes[1],
                term_bytes[2],
                term_bytes[3],
                term_bytes[4],
                term_bytes[5],
                term_bytes[6],
                term_bytes[7],
            ]);

            pos += 8;

            new_data.insert(key, (value, term));
        }

        // Trailing TTL section: length-prefixed lease-manager blob.
        if pos + 8 <= buffer.len() {
            let ttl_len_bytes = &buffer[pos..pos + 8];
            let ttl_len = u64::from_be_bytes([
                ttl_len_bytes[0],
                ttl_len_bytes[1],
                ttl_len_bytes[2],
                ttl_len_bytes[3],
                ttl_len_bytes[4],
                ttl_len_bytes[5],
                ttl_len_bytes[6],
                ttl_len_bytes[7],
            ]) as usize;
            pos += 8;

            if pos + ttl_len <= buffer.len() {
                let ttl_data = &buffer[pos..pos + ttl_len];
                if let Some(ref lease) = self.lease {
                    lease.reload(ttl_data)?;
                }
            }
        }

        // Swap the fully-parsed map in under a short write lock.
        {
            let mut data = self.data.write();
            *data = new_data;
        }

        *self.last_snapshot_metadata.write() = Some(metadata.clone());

        if let Some(last_included) = &metadata.last_included {
            self.update_last_applied(*last_included);
        }

        // Make the new state durable and drop the now-stale WAL.
        self.persist_data_async().await?;
        self.persist_metadata_async().await?;
        self.clear_wal_async().await?;

        info!("Snapshot applied successfully");
        Ok(())
    }
1435
    /// Writes a full snapshot (KV records followed by a length-prefixed
    /// TTL blob — the layout `apply_snapshot_from_file` parses) to
    /// `new_snapshot_dir/snapshot.bin` and records its metadata.
    ///
    /// NOTE(review): the checksum is a 32-byte zero placeholder, both in
    /// the stored metadata and in the returned `Bytes` — no digest is
    /// actually computed. Confirm whether callers verify it.
    async fn generate_snapshot_data(
        &self,
        new_snapshot_dir: std::path::PathBuf,
        last_included: LogId,
    ) -> Result<Bytes, Error> {
        info!("Generating snapshot data up to {:?}", last_included);

        fs::create_dir_all(&new_snapshot_dir).await?;

        let snapshot_path = new_snapshot_dir.join("snapshot.bin");
        let mut file = File::create(&snapshot_path).await?;

        // Copy the map under a short read lock; no lock held during I/O.
        let data_copy: HashMap<Bytes, (Bytes, u64)> = {
            let data = self.data.read();
            data.iter().map(|(k, (v, t))| (k.clone(), (v.clone(), *t))).collect()
        };

        let lease_snapshot = if let Some(ref lease) = self.lease {
            lease.to_snapshot()
        } else {
            Vec::new()
        };
        // Per-record overhead (24 bytes) + TTL length prefix + TTL blob.
        let estimated: usize =
            data_copy.iter().map(|(k, (v, _))| 8 + k.len() + 8 + v.len() + 8).sum::<usize>()
                + 8
                + lease_snapshot.len();
        let mut buf = Vec::with_capacity(estimated);

        for (key, (value, term)) in &data_copy {
            buf.extend_from_slice(&(key.len() as u64).to_be_bytes());
            buf.extend_from_slice(key);
            buf.extend_from_slice(&(value.len() as u64).to_be_bytes());
            buf.extend_from_slice(value);
            buf.extend_from_slice(&term.to_be_bytes());
        }

        buf.extend_from_slice(&(lease_snapshot.len() as u64).to_be_bytes());
        buf.extend_from_slice(&lease_snapshot);

        file.write_all(&buf).await?;

        file.flush().await?;

        let metadata = SnapshotMetadata {
            last_included: Some(last_included),
            checksum: Bytes::from(vec![0; 32]),
        };

        self.update_last_snapshot_metadata(&metadata)?;

        info!("Snapshot generated at {:?}", snapshot_path);

        Ok(Bytes::from_static(&[0u8; 32]))
    }
1495
1496 fn save_hard_state(&self) -> Result<(), Error> {
1497 let last_applied = self.last_applied();
1498 self.persist_last_applied(last_applied)?;
1499
1500 if let Some(last_snapshot_metadata) = self.snapshot_metadata() {
1501 self.persist_last_snapshot_metadata(&last_snapshot_metadata)?;
1502 }
1503
1504 self.flush()?;
1505 Ok(())
1506 }
1507
1508 fn flush(&self) -> Result<(), Error> {
1509 self.persist_data()?;
1510 self.persist_metadata()?;
1511 Ok(())
1513 }
1514
    /// Async flush implemented as a full checkpoint (persists data and
    /// metadata, then truncates the WAL).
    async fn flush_async(&self) -> Result<(), Error> {
        self.checkpoint().await
    }
1519
    /// Delegates to the inherent `reset` (inherent methods take
    /// precedence over trait methods, so this does not recurse).
    async fn reset(&self) -> Result<(), Error> {
        self.reset().await
    }
1523
1524 async fn lease_background_cleanup(&self) -> Result<Vec<Bytes>, Error> {
1525 let Some(ref lease) = self.lease else {
1527 return Ok(vec![]);
1528 };
1529
1530 let now = SystemTime::now();
1532 let expired_keys = lease.get_expired_keys(now);
1533
1534 if expired_keys.is_empty() {
1535 return Ok(vec![]);
1536 }
1537
1538 debug!(
1539 "Lease background cleanup: found {} expired keys",
1540 expired_keys.len()
1541 );
1542
1543 {
1545 let mut data = self.data.write();
1546 for key in &expired_keys {
1547 data.remove(key);
1548 }
1549 }
1550
1551 self.persist_data_async().await?;
1553
1554 info!(
1555 "Lease background cleanup: deleted {} expired keys",
1556 expired_keys.len()
1557 );
1558
1559 Ok(expired_keys)
1560 }
1561}