1use std::collections::{HashMap, HashSet};
85use std::io::Write;
86use std::path::PathBuf;
87use std::sync::Arc;
88use std::sync::Mutex;
89use std::sync::atomic::AtomicBool;
90use std::sync::atomic::AtomicU64;
91use std::sync::atomic::Ordering;
92use std::time::SystemTime;
93
94use async_trait::async_trait;
95use bytes::Bytes;
96use d_engine_core::ApplyEntry;
97use d_engine_core::ApplyResult;
98use d_engine_core::Command;
99use d_engine_core::Error;
100use d_engine_core::Lease;
101use d_engine_core::ScanResult;
102use d_engine_core::StateMachine;
103use d_engine_core::StorageError;
104use d_engine_proto::common::LogId;
105use d_engine_proto::server::storage::SnapshotMetadata;
106use parking_lot::RwLock;
107use tokio::fs;
108use tokio::fs::File;
109use tokio::fs::OpenOptions;
110use tokio::io::AsyncReadExt;
111use tokio::io::AsyncWriteExt;
112use tokio::time::Instant;
113use tracing::debug;
114use tracing::error;
115use tracing::info;
116use tracing::warn;
117
118use crate::storage::DefaultLease;
119
/// In-memory key/value map guarded by a `parking_lot` read/write lock.
/// Each key maps to `(value, term)`, where the `u64` is the log term of the entry that
/// last wrote the key.
type FileStateMachineDataType = RwLock<HashMap<Bytes, (Bytes, u64)>>;
121
/// Opcode byte stored in every `wal.log` record, right after the index and term.
///
/// The discriminants are part of the on-disk format and must never change.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum WalOpCode {
    /// No-op entry; key and value are empty.
    Noop = 0,
    /// Key/value write. `encode_wal_entry` also logs a successful CAS this way.
    Insert = 1,
    /// Key removal; only the key is stored.
    Delete = 2,
    /// Never emitted by `encode_wal_entry`; replay treats it as a no-op.
    Config = 3,
    /// Legacy CAS record; replay stores its (non-empty) value unconditionally.
    CompareAndSwap = 4,
    /// CAS whose comparison failed; no key or value is stored and replay ignores it.
    CasFailed = 5,
}

impl WalOpCode {
    /// Every opcode, in discriminant order.
    const ALL: [WalOpCode; 6] = [
        WalOpCode::Noop,
        WalOpCode::Insert,
        WalOpCode::Delete,
        WalOpCode::Config,
        WalOpCode::CompareAndSwap,
        WalOpCode::CasFailed,
    ];

    /// Parses an upper-case opcode name such as `"INSERT"` or `"CAS_FAILED"`.
    #[allow(dead_code)] // no caller in this file; kept for name-based lookups
    fn from_str(s: &str) -> Option<Self> {
        let op = match s {
            "NOOP" => Self::Noop,
            "INSERT" => Self::Insert,
            "DELETE" => Self::Delete,
            "CONFIG" => Self::Config,
            "CAS" => Self::CompareAndSwap,
            "CAS_FAILED" => Self::CasFailed,
            _ => return None,
        };
        Some(op)
    }

    /// Decodes an opcode byte read from the WAL; unknown bytes yield `None`.
    fn from_u8(byte: u8) -> Option<Self> {
        Self::ALL.iter().copied().find(|op| *op as u8 == byte)
    }
}
160
161fn encode_wal_entry(
167 buf: &mut Vec<u8>,
168 entry: &ApplyEntry,
169 cas_success: bool,
170) {
171 buf.extend_from_slice(&entry.index.to_be_bytes());
172 buf.extend_from_slice(&entry.term.to_be_bytes());
173 match &entry.command {
174 Command::Noop => {
175 buf.push(WalOpCode::Noop as u8);
176 buf.extend_from_slice(&0u64.to_be_bytes());
177 buf.extend_from_slice(&0u64.to_be_bytes());
178 buf.extend_from_slice(&0u64.to_be_bytes());
179 }
180 Command::Insert {
181 key,
182 value,
183 ttl_secs,
184 } => {
185 buf.push(WalOpCode::Insert as u8);
186 buf.extend_from_slice(&(key.len() as u64).to_be_bytes());
187 buf.extend_from_slice(key);
188 buf.extend_from_slice(&(value.len() as u64).to_be_bytes());
189 buf.extend_from_slice(value);
190 let expire_at_secs = if let Some(ttl) = ttl_secs {
195 let expire_at = std::time::SystemTime::now() + std::time::Duration::from_secs(*ttl);
196 expire_at
197 .duration_since(std::time::UNIX_EPOCH)
198 .map(|d| d.as_secs())
199 .unwrap_or(0)
200 } else {
201 0
202 };
203 buf.extend_from_slice(&expire_at_secs.to_be_bytes());
204 }
205 Command::Delete { key } => {
206 buf.push(WalOpCode::Delete as u8);
207 buf.extend_from_slice(&(key.len() as u64).to_be_bytes());
208 buf.extend_from_slice(key);
209 buf.extend_from_slice(&0u64.to_be_bytes()); buf.extend_from_slice(&0u64.to_be_bytes()); }
212 Command::CompareAndSwap { key, value, .. } => {
213 if cas_success {
214 buf.push(WalOpCode::Insert as u8);
215 buf.extend_from_slice(&(key.len() as u64).to_be_bytes());
216 buf.extend_from_slice(key);
217 buf.extend_from_slice(&(value.len() as u64).to_be_bytes());
218 buf.extend_from_slice(value);
219 buf.extend_from_slice(&0u64.to_be_bytes()); } else {
221 buf.push(WalOpCode::CasFailed as u8);
222 buf.extend_from_slice(&0u64.to_be_bytes()); buf.extend_from_slice(&0u64.to_be_bytes()); buf.extend_from_slice(&0u64.to_be_bytes()); }
226 }
227 }
228}
229
230#[derive(Debug)]
240pub struct FileStateMachine {
241 data: FileStateMachineDataType, lease: Option<Arc<DefaultLease>>,
248
249 lease_enabled: bool,
255
256 last_applied_index: AtomicU64,
258 last_applied_term: AtomicU64,
259 last_snapshot_metadata: RwLock<Option<SnapshotMetadata>>,
260
261 running: AtomicBool,
263
264 wal_entries_since_checkpoint: AtomicU64,
267 last_checkpoint: Mutex<Instant>,
268
269 data_dir: PathBuf,
271 }
275
276impl FileStateMachine {
277 pub async fn new(data_dir: PathBuf) -> Result<Self, Error> {
287 fs::create_dir_all(&data_dir).await?;
289
290 let machine = Self {
291 data: RwLock::new(HashMap::new()),
292 lease: None, lease_enabled: false, last_applied_index: AtomicU64::new(0),
295 last_applied_term: AtomicU64::new(0),
296 last_snapshot_metadata: RwLock::new(None),
297 running: AtomicBool::new(true),
298 wal_entries_since_checkpoint: AtomicU64::new(0),
299 last_checkpoint: Mutex::new(Instant::now()),
300 data_dir: data_dir.clone(),
301 };
302
303 machine.load_from_disk().await?;
305
306 Ok(machine)
307 }
308
309 pub fn set_lease(
315 &mut self,
316 lease: Arc<DefaultLease>,
317 ) {
318 self.lease_enabled = true;
320 self.lease = Some(lease);
321 }
322
323 async fn load_from_disk(&self) -> Result<(), Error> {
328 self.load_metadata().await?;
330
331 self.load_data().await?;
333
334 self.load_ttl_data().await?;
336
337 self.replay_wal().await?;
339
340 info!("Loaded state machine data from disk");
341 Ok(())
342 }
343
344 async fn load_ttl_data(&self) -> Result<(), Error> {
346 Ok(())
350 }
351
352 pub async fn load_lease_data(&self) -> Result<(), Error> {
357 let Some(ref lease) = self.lease else {
358 return Ok(()); };
360
361 let ttl_path = self.data_dir.join("ttl_state.bin");
362 if !ttl_path.exists() {
363 debug!("No TTL state file found");
364 return Ok(());
365 }
366
367 let ttl_data = tokio::fs::read(&ttl_path).await?;
368 lease.reload(&ttl_data)?;
369
370 info!("Loaded TTL state from disk: {} active TTLs", lease.len());
371 Ok(())
372 }
373
374 async fn load_metadata(&self) -> Result<(), Error> {
376 let metadata_path = self.data_dir.join("metadata.bin");
377 if !metadata_path.exists() {
378 return Ok(());
379 }
380
381 let mut file = File::open(metadata_path).await?;
382 let mut buffer = [0u8; 16];
383
384 if file.read_exact(&mut buffer).await.is_ok() {
385 let index = u64::from_be_bytes([
386 buffer[0], buffer[1], buffer[2], buffer[3], buffer[4], buffer[5], buffer[6],
387 buffer[7],
388 ]);
389
390 let term = u64::from_be_bytes([
391 buffer[8], buffer[9], buffer[10], buffer[11], buffer[12], buffer[13], buffer[14],
392 buffer[15],
393 ]);
394
395 self.last_applied_index.store(index, Ordering::SeqCst);
396 self.last_applied_term.store(term, Ordering::SeqCst);
397 }
398
399 Ok(())
400 }
401
402 async fn load_data(&self) -> Result<(), Error> {
404 let data_path = self.data_dir.join("state.data");
405 if !data_path.exists() {
406 return Ok(());
407 }
408
409 let mut file = File::open(data_path).await?;
410 let mut buffer = Vec::new();
411 file.read_to_end(&mut buffer).await?;
412
413 let mut pos = 0;
414 let mut data = self.data.write();
415
416 while pos < buffer.len() {
417 if pos + 8 > buffer.len() {
419 break;
420 }
421
422 let key_len_bytes = &buffer[pos..pos + 8];
423 let key_len = u64::from_be_bytes([
424 key_len_bytes[0],
425 key_len_bytes[1],
426 key_len_bytes[2],
427 key_len_bytes[3],
428 key_len_bytes[4],
429 key_len_bytes[5],
430 key_len_bytes[6],
431 key_len_bytes[7],
432 ]) as usize;
433
434 pos += 8;
435
436 if pos + key_len > buffer.len() {
438 break;
439 }
440
441 let key = Bytes::from(buffer[pos..pos + key_len].to_vec());
442 pos += key_len;
443
444 if pos + 8 > buffer.len() {
446 break;
447 }
448
449 let value_len_bytes = &buffer[pos..pos + 8];
450 let value_len = u64::from_be_bytes([
451 value_len_bytes[0],
452 value_len_bytes[1],
453 value_len_bytes[2],
454 value_len_bytes[3],
455 value_len_bytes[4],
456 value_len_bytes[5],
457 value_len_bytes[6],
458 value_len_bytes[7],
459 ]) as usize;
460
461 pos += 8;
462
463 if pos + value_len > buffer.len() {
465 break;
466 }
467
468 let value = Bytes::from(buffer[pos..pos + value_len].to_vec());
469 pos += value_len;
470
471 if pos + 8 > buffer.len() {
473 break;
474 }
475
476 let term_bytes = &buffer[pos..pos + 8];
477 let term = u64::from_be_bytes([
478 term_bytes[0],
479 term_bytes[1],
480 term_bytes[2],
481 term_bytes[3],
482 term_bytes[4],
483 term_bytes[5],
484 term_bytes[6],
485 term_bytes[7],
486 ]);
487
488 pos += 8;
489
490 data.insert(key, (value, term));
492 }
493
494 Ok(())
495 }
496
497 async fn replay_wal(&self) -> Result<(), Error> {
499 let wal_path = self.data_dir.join("wal.log");
500 if !wal_path.exists() {
501 debug!("No WAL file found, skipping replay");
502 return Ok(());
503 }
504
505 let mut file = File::open(wal_path).await?;
506 let mut buffer = Vec::new();
507 file.read_to_end(&mut buffer).await?;
508
509 if buffer.is_empty() {
510 debug!("WAL file is empty, skipping replay");
511 return Ok(());
512 }
513
514 let mut pos = 0;
515 let mut operations = Vec::new();
516 let mut replayed_count = 0;
517
518 while pos + 17 < buffer.len() {
519 let _index = u64::from_be_bytes(buffer[pos..pos + 8].try_into().unwrap());
521 pos += 8;
522
523 let term = u64::from_be_bytes(buffer[pos..pos + 8].try_into().unwrap());
525 pos += 8;
526
527 let op_code =
529 WalOpCode::from_u8(buffer[pos]).ok_or_else(|| StorageError::DataCorruption {
530 location: format!("WAL opcode {} at byte offset {}", buffer[pos], pos),
531 })?;
532 pos += 1;
533
534 if pos + 8 > buffer.len() {
536 warn!("Incomplete key length at position {}, stopping replay", pos);
537 break;
538 }
539
540 let key_len = u64::from_be_bytes(buffer[pos..pos + 8].try_into().unwrap()) as usize;
542 pos += 8;
543
544 if pos + key_len > buffer.len() {
546 warn!(
547 "Incomplete key data at position {} (need {} bytes, have {})",
548 pos,
549 key_len,
550 buffer.len() - pos
551 );
552 break;
553 }
554
555 let key = Bytes::from(buffer[pos..pos + key_len].to_vec());
557 pos += key_len;
558
559 if pos + 8 > buffer.len() {
561 warn!(
562 "Incomplete value length at position {}, stopping replay",
563 pos
564 );
565 break;
566 }
567
568 let value_len = u64::from_be_bytes(buffer[pos..pos + 8].try_into().unwrap()) as usize;
570 pos += 8;
571
572 let value = if value_len > 0 {
574 if pos + value_len > buffer.len() {
575 warn!("Incomplete value data at position {}, stopping replay", pos);
576 break;
577 }
578 let value_data = Bytes::from(buffer[pos..pos + value_len].to_vec());
579 pos += value_len;
580 Some(value_data)
581 } else {
582 None
583 };
584
585 if pos + 8 > buffer.len() {
602 warn!(
603 "Incomplete WAL entry at byte offset {} (expected expire_at field), truncating replay here",
604 pos
605 );
606 break;
607 }
608 let secs = u64::from_be_bytes(buffer[pos..pos + 8].try_into().unwrap());
609 pos += 8;
610 let expire_at_secs = if secs > 0 { Some(secs) } else { None };
611
612 operations.push((op_code, key, value, term, expire_at_secs));
613 replayed_count += 1;
614 }
615
616 info!(
617 "Parsed {} WAL operations, applying to memory",
618 operations.len()
619 );
620
621 let mut applied_count = 0;
623 let mut skipped_expired = 0;
624 let now = std::time::SystemTime::now();
625 {
626 let mut data = self.data.write();
627
628 for (op_code, key, value, term, expire_at_secs) in operations {
629 match op_code {
630 WalOpCode::Insert => {
631 if let Some(value_data) = value {
632 let is_expired = if let Some(secs) = expire_at_secs {
634 let expire_at =
635 std::time::UNIX_EPOCH + std::time::Duration::from_secs(secs);
636 now >= expire_at
637 } else {
638 false
639 };
640
641 if is_expired {
642 debug!("Skipped expired key during WAL replay: key={:?}", key);
644 skipped_expired += 1;
645 continue;
646 }
647
648 data.insert(key.clone(), (value_data, term));
649
650 if let Some(secs) = expire_at_secs {
652 if let Some(ref lease) = self.lease {
653 let expire_at = std::time::UNIX_EPOCH
654 + std::time::Duration::from_secs(secs);
655 let remaining = expire_at
656 .duration_since(now)
657 .map(|d| d.as_secs())
658 .unwrap_or(0);
659
660 if remaining > 0 {
661 lease.register(key.clone(), remaining);
662 debug!(
663 "Replayed INSERT with TTL: key={:?}, remaining={}s",
664 key, remaining
665 );
666 }
667 }
668 } else {
669 debug!("Replayed INSERT: key={:?}", key);
670 }
671
672 applied_count += 1;
673 } else {
674 warn!("INSERT operation without value");
675 }
676 }
677 WalOpCode::Delete => {
678 data.remove(&key);
679 if let Some(ref lease) = self.lease {
680 lease.unregister(&key);
681 }
682 applied_count += 1;
683 debug!("Replayed DELETE: key={:?}", key);
684 }
685 WalOpCode::CompareAndSwap => {
686 if let Some(new_value) = value {
691 data.insert(key.clone(), (new_value, term));
692 applied_count += 1;
693 debug!("Replayed legacy CAS: key={:?}", key);
694 } else {
695 warn!("Legacy CAS WAL entry missing new_value");
696 }
697 }
698 WalOpCode::Noop | WalOpCode::Config | WalOpCode::CasFailed => {
699 applied_count += 1;
701 debug!("Replayed {:?} operation", op_code);
702 }
703 }
704 }
705 }
706
707 info!(
708 "WAL replay complete: {} operations replayed, {} applied, {} expired keys skipped",
709 replayed_count, applied_count, skipped_expired
710 );
711
712 self.clear_wal_async().await?;
717 debug!(
718 "Cleared WAL after replay ({} operations applied)",
719 applied_count
720 );
721
722 Ok(())
723 }
724
725 fn persist_data(&self) -> Result<(), Error> {
727 let data_copy: HashMap<Bytes, (Bytes, u64)> = {
729 let data = self.data.read();
730 data.iter().map(|(k, (v, t))| (k.clone(), (v.clone(), *t))).collect()
731 };
732
733 let data_path = self.data_dir.join("state.data");
735
736 let estimated: usize =
737 data_copy.iter().map(|(k, (v, _))| 8 + k.len() + 8 + v.len() + 8).sum();
738 let mut buf = Vec::with_capacity(estimated);
739
740 for (key, (value, term)) in &data_copy {
741 buf.extend_from_slice(&(key.len() as u64).to_be_bytes());
742 buf.extend_from_slice(key);
743 buf.extend_from_slice(&(value.len() as u64).to_be_bytes());
744 buf.extend_from_slice(value);
745 buf.extend_from_slice(&term.to_be_bytes());
746 }
747
748 std::fs::write(data_path, buf)?;
749 Ok(())
750 }
751
752 async fn persist_data_async(&self) -> Result<(), Error> {
754 let data_copy: HashMap<Bytes, (Bytes, u64)> = {
756 let data = self.data.read();
757 data.iter().map(|(k, (v, t))| (k.clone(), (v.clone(), *t))).collect()
758 };
759
760 let data_path = self.data_dir.join("state.data");
761 let mut file = OpenOptions::new()
762 .write(true)
763 .create(true)
764 .truncate(true)
765 .open(data_path)
766 .await?;
767
768 let estimated: usize =
771 data_copy.iter().map(|(k, (v, _))| 8 + k.len() + 8 + v.len() + 8).sum();
772 let mut buf = Vec::with_capacity(estimated);
773
774 for (key, (value, term)) in &data_copy {
775 buf.extend_from_slice(&(key.len() as u64).to_be_bytes());
776 buf.extend_from_slice(key);
777 buf.extend_from_slice(&(value.len() as u64).to_be_bytes());
778 buf.extend_from_slice(value);
779 buf.extend_from_slice(&term.to_be_bytes());
780 }
781
782 file.write_all(&buf).await?;
783 file.flush().await?;
784
785 Ok(())
786 }
787
788 fn persist_metadata(&self) -> Result<(), Error> {
790 let metadata_path = self.data_dir.join("metadata.bin");
791 let mut file = std::fs::OpenOptions::new()
792 .write(true)
793 .create(true)
794 .truncate(true)
795 .open(metadata_path)?;
796
797 let index = self.last_applied_index.load(Ordering::SeqCst);
798 let term = self.last_applied_term.load(Ordering::SeqCst);
799
800 file.write_all(&index.to_be_bytes())?;
801 file.write_all(&term.to_be_bytes())?;
802
803 file.flush()?;
804 Ok(())
805 }
806
807 async fn persist_metadata_async(&self) -> Result<(), Error> {
808 let metadata_path = self.data_dir.join("metadata.bin");
809 let mut file = OpenOptions::new()
810 .write(true)
811 .create(true)
812 .truncate(true)
813 .open(metadata_path)
814 .await?;
815
816 let index = self.last_applied_index.load(Ordering::SeqCst);
817 let term = self.last_applied_term.load(Ordering::SeqCst);
818
819 file.write_all(&index.to_be_bytes()).await?;
820 file.write_all(&term.to_be_bytes()).await?;
821
822 file.flush().await?;
823 Ok(())
824 }
825
826 #[allow(unused)]
828 fn clear_wal(&self) -> Result<(), Error> {
829 let wal_path = self.data_dir.join("wal.log");
830 let mut file = std::fs::OpenOptions::new()
831 .write(true)
832 .create(true)
833 .truncate(true)
834 .open(wal_path)?;
835
836 file.set_len(0)?;
837 file.flush()?;
838 Ok(())
839 }
840
841 async fn clear_wal_async(&self) -> Result<(), Error> {
843 let wal_path = self.data_dir.join("wal.log");
844 let mut file = OpenOptions::new()
845 .write(true)
846 .create(true)
847 .truncate(true)
848 .open(wal_path)
849 .await?;
850
851 file.set_len(0).await?;
852 file.flush().await?;
853 Ok(())
854 }
855
856 fn should_checkpoint(&self) -> bool {
862 const WAL_ENTRY_THRESHOLD: u64 = 1000;
863 const TIME_THRESHOLD_SECS: u64 = 10;
864
865 if self.wal_entries_since_checkpoint.load(Ordering::Relaxed) >= WAL_ENTRY_THRESHOLD {
866 return true;
867 }
868 if let Ok(last) = self.last_checkpoint.lock() {
869 return last.elapsed().as_secs() >= TIME_THRESHOLD_SECS;
870 }
871 false
872 }
873
874 pub(crate) async fn checkpoint(&self) -> Result<(), Error> {
878 self.persist_data_async().await?;
879 self.persist_metadata_async().await?;
880 self.clear_wal_async().await?;
881
882 self.wal_entries_since_checkpoint.store(0, Ordering::Relaxed);
883 if let Ok(mut last) = self.last_checkpoint.lock() {
884 *last = Instant::now();
885 }
886 debug!("Checkpoint complete");
887 Ok(())
888 }
889
890 pub async fn reset(&self) -> Result<(), Error> {
898 info!("Resetting state machine");
899
900 {
902 let mut data = self.data.write();
903 data.clear();
904 }
905
906 self.last_applied_index.store(0, Ordering::SeqCst);
908 self.last_applied_term.store(0, Ordering::SeqCst);
909
910 {
911 let mut snapshot_metadata = self.last_snapshot_metadata.write();
912 *snapshot_metadata = None;
913 }
914
915 self.clear_data_file().await?;
917 self.clear_metadata_file().await?;
918 self.clear_wal_async().await?;
919
920 info!("State machine reset completed");
921 Ok(())
922 }
923
924 async fn clear_data_file(&self) -> Result<(), Error> {
926 let data_path = self.data_dir.join("state.data");
927 let mut file = OpenOptions::new()
928 .write(true)
929 .create(true)
930 .truncate(true)
931 .open(data_path)
932 .await?;
933
934 file.set_len(0).await?;
935 file.flush().await?;
936 Ok(())
937 }
938
939 async fn clear_metadata_file(&self) -> Result<(), Error> {
941 let metadata_path = self.data_dir.join("metadata.bin");
942 let mut file = OpenOptions::new()
943 .write(true)
944 .create(true)
945 .truncate(true)
946 .open(metadata_path)
947 .await?;
948
949 file.write_all(&0u64.to_be_bytes()).await?;
951 file.write_all(&0u64.to_be_bytes()).await?;
952
953 file.flush().await?;
954 Ok(())
955 }
956
/// Test helper: encodes `entries` with the given CAS outcomes and appends them to
/// `wal.log` in a single write.
///
/// # Panics
/// If `entries` and `cas_outcomes` differ in length.
#[cfg(test)]
pub(crate) async fn append_to_wal(
    &self,
    entries: &[ApplyEntry],
    cas_outcomes: &[bool],
) -> Result<(), Error> {
    assert_eq!(
        entries.len(),
        cas_outcomes.len(),
        "cas_outcomes must have the same length as entries"
    );
    if entries.is_empty() {
        return Ok(());
    }

    let mut batch = Vec::with_capacity(entries.len() * 64);
    for (entry, &cas_success) in entries.iter().zip(cas_outcomes) {
        encode_wal_entry(&mut batch, entry, cas_success);
    }

    let mut wal = OpenOptions::new()
        .write(true)
        .create(true)
        .append(true)
        .open(self.data_dir.join("wal.log"))
        .await?;
    wal.write_all(&batch).await?;
    wal.flush().await?;

    Ok(())
}
1001}
1002
1003impl Drop for FileStateMachine {
1004 fn drop(&mut self) {
1005 let timer = Instant::now();
1006
1007 match self.save_hard_state() {
1009 Ok(_) => debug!("StateMachine saved in {:?}", timer.elapsed()),
1010 Err(e) => error!("Failed to save StateMachine: {}", e),
1011 }
1012 }
1013}
1014
1015#[async_trait]
1016impl StateMachine for FileStateMachine {
1017 async fn start(&self) -> Result<(), Error> {
1018 self.running.store(true, Ordering::SeqCst);
1019
1020 if self.lease.is_some() {
1022 self.load_lease_data().await?;
1023 debug!("Lease data loaded during state machine initialization");
1024 }
1025
1026 info!("File state machine started");
1027 Ok(())
1028 }
1029
1030 fn stop(&self) -> Result<(), Error> {
1031 self.running.store(false, Ordering::SeqCst);
1033
1034 if let Some(ref lease) = self.lease {
1037 let ttl_snapshot = lease.to_snapshot();
1038 let ttl_path = self.data_dir.join("ttl_state.bin");
1039 std::fs::write(&ttl_path, ttl_snapshot)
1041 .map_err(d_engine_core::StorageError::IoError)?;
1042 debug!("Persisted TTL state on shutdown");
1043 }
1044
1045 info!("File state machine stopped");
1046 Ok(())
1047 }
1048
1049 fn is_running(&self) -> bool {
1050 self.running.load(Ordering::SeqCst)
1051 }
1052
1053 fn get(
1054 &self,
1055 key_buffer: &[u8],
1056 ) -> Result<Option<Bytes>, Error> {
1057 let data = self.data.read();
1062 Ok(data.get(key_buffer).map(|(value, _)| value.clone()))
1063 }
1064
1065 fn entry_term(
1066 &self,
1067 entry_id: u64,
1068 ) -> Option<u64> {
1069 let data = self.data.read();
1070 data.values().find(|(_, index)| *index == entry_id).map(|(_, term)| *term)
1071 }
1072
1073 async fn apply_chunk(
1075 &self,
1076 chunk: &[ApplyEntry],
1077 ) -> Result<Vec<ApplyResult>, Error> {
1078 let chunk_len = chunk.len();
1079 let mut results = Vec::with_capacity(chunk_len);
1080 let mut highest_log_id: Option<LogId> = None;
1081
1082 for entry in chunk {
1084 if let Some(prev) = &highest_log_id {
1085 assert!(
1086 entry.index > prev.index,
1087 "apply_chunk: received unordered entry at index {} (prev={})",
1088 entry.index,
1089 prev.index
1090 );
1091 }
1092 highest_log_id = Some(LogId {
1093 index: entry.index,
1094 term: entry.term,
1095 });
1096 info!("COMMITTED_LOG_METRIC: {}", entry.index);
1097 }
1098
1099 let (cas_outcomes, wal_buf) = {
1112 let cas_keys: HashSet<&Bytes> = chunk
1114 .iter()
1115 .filter_map(|e| match &e.command {
1116 Command::CompareAndSwap { key, .. } => Some(key),
1117 _ => None,
1118 })
1119 .collect();
1120
1121 let base: HashMap<Bytes, (Bytes, u64)> = {
1123 let guard = self.data.read();
1124 cas_keys
1125 .iter()
1126 .filter_map(|k| guard.get(k.as_ref()).map(|v| ((*k).clone(), v.clone())))
1127 .collect()
1128 };
1129
1130 let mut delta: HashMap<Bytes, Option<(Bytes, u64)>> = HashMap::new();
1133 let mut wal_buf: Vec<u8> = Vec::with_capacity(chunk.len() * 64);
1134
1135 let outcomes: Vec<bool> = chunk
1136 .iter()
1137 .map(|entry| {
1138 let success = match &entry.command {
1139 Command::Insert { key, value, .. } => {
1140 delta.insert(key.clone(), Some((value.clone(), entry.term)));
1141 false
1142 }
1143 Command::Delete { key } => {
1144 delta.insert(key.clone(), None);
1145 false
1146 }
1147 Command::CompareAndSwap {
1148 key,
1149 expected,
1150 value: new_value,
1151 } => {
1152 let current =
1154 delta.get(key).map(|v| v.as_ref()).unwrap_or_else(|| base.get(key));
1155 let success = match (current, expected) {
1156 (Some((c, _)), Some(e)) => c.as_ref() == e.as_ref(),
1157 (None, None) => true,
1158 _ => false,
1159 };
1160 if success {
1161 delta.insert(key.clone(), Some((new_value.clone(), entry.term)));
1162 }
1163 success
1164 }
1165 Command::Noop => false,
1166 };
1167 encode_wal_entry(&mut wal_buf, entry, success);
1168 success
1169 })
1170 .collect();
1171
1172 (outcomes, wal_buf)
1173 };
1174
1175 if !wal_buf.is_empty() {
1180 let wal_path = self.data_dir.join("wal.log");
1181 let mut file =
1182 OpenOptions::new().write(true).create(true).append(true).open(&wal_path).await?;
1183 file.write_all(&wal_buf).await?;
1184 file.flush().await?;
1185 }
1186
1187 {
1190 let mut data = self.data.write();
1191
1192 for (entry, &cas_success) in chunk.iter().zip(cas_outcomes.iter()) {
1193 match &entry.command {
1194 Command::Noop => {
1195 results.push(ApplyResult::success(entry.index));
1196 }
1197 Command::Insert {
1198 key,
1199 value,
1200 ttl_secs,
1201 } => {
1202 data.insert(key.clone(), (value.clone(), entry.term));
1203 if let Some(ttl) = ttl_secs {
1204 if !self.lease_enabled {
1205 return Err(StorageError::FeatureNotEnabled(
1206 "TTL feature is not enabled on this server. \
1207 Enable it in config: [raft.state_machine.lease] enabled = true"
1208 .into(),
1209 )
1210 .into());
1211 }
1212 let lease = unsafe { self.lease.as_ref().unwrap_unchecked() };
1214 lease.register(key.clone(), *ttl);
1215 }
1216 results.push(ApplyResult::success(entry.index));
1217 }
1218 Command::Delete { key } => {
1219 data.remove(key.as_ref());
1220 if let Some(ref lease) = self.lease {
1221 lease.unregister(key);
1222 }
1223 results.push(ApplyResult::success(entry.index));
1224 }
1225 Command::CompareAndSwap {
1226 key,
1227 value: new_value,
1228 ..
1229 } => {
1230 debug!(
1231 "CAS at index {}: key={:?}, success={}",
1232 entry.index,
1233 String::from_utf8_lossy(key),
1234 cas_success
1235 );
1236 results.push(if cas_success {
1237 ApplyResult::success(entry.index)
1238 } else {
1239 ApplyResult::failure(entry.index)
1240 });
1241 if cas_success {
1242 data.insert(key.clone(), (new_value.clone(), entry.term));
1243 }
1244 }
1245 }
1246 }
1247 } if let Some(log_id) = highest_log_id {
1253 debug!("State machine - updated last_applied: {:?}", log_id);
1254 self.update_last_applied(log_id);
1255 }
1256
1257 self.wal_entries_since_checkpoint.fetch_add(chunk_len as u64, Ordering::Relaxed);
1258
1259 if self.should_checkpoint() {
1260 self.checkpoint().await?;
1261 }
1262
1263 Ok(results)
1264 }
1265
1266 fn len(&self) -> usize {
1267 self.data.read().len()
1268 }
1269
1270 fn update_last_applied(
1271 &self,
1272 last_applied: LogId,
1273 ) {
1274 self.last_applied_index.store(last_applied.index, Ordering::SeqCst);
1275 self.last_applied_term.store(last_applied.term, Ordering::SeqCst);
1276 }
1277
1278 fn last_applied(&self) -> LogId {
1279 LogId {
1280 index: self.last_applied_index.load(Ordering::SeqCst),
1281 term: self.last_applied_term.load(Ordering::SeqCst),
1282 }
1283 }
1284
1285 fn persist_last_applied(
1286 &self,
1287 last_applied: LogId,
1288 ) -> Result<(), Error> {
1289 self.update_last_applied(last_applied);
1290 self.persist_metadata()
1291 }
1292
1293 fn update_last_snapshot_metadata(
1294 &self,
1295 snapshot_metadata: &SnapshotMetadata,
1296 ) -> Result<(), Error> {
1297 *self.last_snapshot_metadata.write() = Some(snapshot_metadata.clone());
1298 Ok(())
1299 }
1300
1301 fn snapshot_metadata(&self) -> Option<SnapshotMetadata> {
1302 self.last_snapshot_metadata.read().clone()
1303 }
1304
1305 fn persist_last_snapshot_metadata(
1306 &self,
1307 snapshot_metadata: &SnapshotMetadata,
1308 ) -> Result<(), Error> {
1309 self.update_last_snapshot_metadata(snapshot_metadata)
1310 }
1311
1312 async fn apply_snapshot_from_file(
1313 &self,
1314 metadata: &SnapshotMetadata,
1315 snapshot_dir: std::path::PathBuf,
1316 ) -> Result<(), Error> {
1317 info!("Applying snapshot from file: {:?}", snapshot_dir);
1318
1319 let snapshot_data_path = snapshot_dir.join("snapshot.bin");
1321 let mut file = File::open(snapshot_data_path).await?;
1322 let mut buffer = Vec::new();
1323 file.read_to_end(&mut buffer).await?;
1324
1325 let mut pos = 0;
1327 let mut new_data = HashMap::new();
1328
1329 while pos < buffer.len() {
1330 if pos + 8 > buffer.len() {
1332 break;
1333 }
1334
1335 let key_len_bytes = &buffer[pos..pos + 8];
1336 let key_len = u64::from_be_bytes([
1337 key_len_bytes[0],
1338 key_len_bytes[1],
1339 key_len_bytes[2],
1340 key_len_bytes[3],
1341 key_len_bytes[4],
1342 key_len_bytes[5],
1343 key_len_bytes[6],
1344 key_len_bytes[7],
1345 ]) as usize;
1346
1347 pos += 8;
1348
1349 if pos + key_len > buffer.len() {
1351 break;
1352 }
1353
1354 let key = Bytes::from(buffer[pos..pos + key_len].to_vec());
1355 pos += key_len;
1356
1357 if pos + 8 > buffer.len() {
1359 break;
1360 }
1361
1362 let value_len_bytes = &buffer[pos..pos + 8];
1363 let value_len = u64::from_be_bytes([
1364 value_len_bytes[0],
1365 value_len_bytes[1],
1366 value_len_bytes[2],
1367 value_len_bytes[3],
1368 value_len_bytes[4],
1369 value_len_bytes[5],
1370 value_len_bytes[6],
1371 value_len_bytes[7],
1372 ]) as usize;
1373
1374 pos += 8;
1375
1376 if pos + value_len > buffer.len() {
1378 break;
1379 }
1380
1381 let value = Bytes::from(buffer[pos..pos + value_len].to_vec());
1382 pos += value_len;
1383
1384 if pos + 8 > buffer.len() {
1386 break;
1387 }
1388
1389 let term_bytes = &buffer[pos..pos + 8];
1390 let term = u64::from_be_bytes([
1391 term_bytes[0],
1392 term_bytes[1],
1393 term_bytes[2],
1394 term_bytes[3],
1395 term_bytes[4],
1396 term_bytes[5],
1397 term_bytes[6],
1398 term_bytes[7],
1399 ]);
1400
1401 pos += 8;
1402
1403 new_data.insert(key, (value, term));
1405 }
1406
1407 if pos + 8 <= buffer.len() {
1409 let ttl_len_bytes = &buffer[pos..pos + 8];
1410 let ttl_len = u64::from_be_bytes([
1411 ttl_len_bytes[0],
1412 ttl_len_bytes[1],
1413 ttl_len_bytes[2],
1414 ttl_len_bytes[3],
1415 ttl_len_bytes[4],
1416 ttl_len_bytes[5],
1417 ttl_len_bytes[6],
1418 ttl_len_bytes[7],
1419 ]) as usize;
1420 pos += 8;
1421
1422 if pos + ttl_len <= buffer.len() {
1423 let ttl_data = &buffer[pos..pos + ttl_len];
1424 if let Some(ref lease) = self.lease {
1425 lease.reload(ttl_data)?;
1426 }
1427 }
1428 }
1429
1430 {
1432 let mut data = self.data.write();
1433 *data = new_data;
1434 }
1435
1436 *self.last_snapshot_metadata.write() = Some(metadata.clone());
1438
1439 if let Some(last_included) = &metadata.last_included {
1440 self.update_last_applied(*last_included);
1441 }
1442
1443 self.persist_data_async().await?;
1445 self.persist_metadata_async().await?;
1446 self.clear_wal_async().await?;
1447
1448 info!("Snapshot applied successfully");
1449 Ok(())
1450 }
1451
1452 async fn generate_snapshot_data(
1453 &self,
1454 new_snapshot_dir: std::path::PathBuf,
1455 last_included: LogId,
1456 ) -> Result<Bytes, Error> {
1457 info!("Generating snapshot data up to {:?}", last_included);
1458
1459 fs::create_dir_all(&new_snapshot_dir).await?;
1461
1462 let snapshot_path = new_snapshot_dir.join("snapshot.bin");
1464 let mut file = File::create(&snapshot_path).await?;
1465
1466 let data_copy: HashMap<Bytes, (Bytes, u64)> = {
1467 let data = self.data.read();
1468 data.iter().map(|(k, (v, t))| (k.clone(), (v.clone(), *t))).collect()
1469 };
1470
1471 let lease_snapshot = if let Some(ref lease) = self.lease {
1473 lease.to_snapshot()
1474 } else {
1475 Vec::new()
1476 };
1477 let estimated: usize =
1478 data_copy.iter().map(|(k, (v, _))| 8 + k.len() + 8 + v.len() + 8).sum::<usize>()
1479 + 8
1480 + lease_snapshot.len();
1481 let mut buf = Vec::with_capacity(estimated);
1482
1483 for (key, (value, term)) in &data_copy {
1484 buf.extend_from_slice(&(key.len() as u64).to_be_bytes());
1485 buf.extend_from_slice(key);
1486 buf.extend_from_slice(&(value.len() as u64).to_be_bytes());
1487 buf.extend_from_slice(value);
1488 buf.extend_from_slice(&term.to_be_bytes());
1489 }
1490
1491 buf.extend_from_slice(&(lease_snapshot.len() as u64).to_be_bytes());
1492 buf.extend_from_slice(&lease_snapshot);
1493
1494 file.write_all(&buf).await?;
1495
1496 file.flush().await?;
1497
1498 let metadata = SnapshotMetadata {
1500 last_included: Some(last_included),
1501 checksum: Bytes::from(vec![0; 32]), };
1503
1504 self.update_last_snapshot_metadata(&metadata)?;
1505
1506 info!("Snapshot generated at {:?}", snapshot_path);
1507
1508 Ok(Bytes::from_static(&[0u8; 32]))
1510 }
1511
1512 fn save_hard_state(&self) -> Result<(), Error> {
1513 let last_applied = self.last_applied();
1514 self.persist_last_applied(last_applied)?;
1515
1516 if let Some(last_snapshot_metadata) = self.snapshot_metadata() {
1517 self.persist_last_snapshot_metadata(&last_snapshot_metadata)?;
1518 }
1519
1520 self.flush()?;
1521 Ok(())
1522 }
1523
1524 fn flush(&self) -> Result<(), Error> {
1525 self.persist_data()?;
1526 self.persist_metadata()?;
1527 Ok(())
1529 }
1530
1531 async fn flush_async(&self) -> Result<(), Error> {
1532 self.checkpoint().await
1534 }
1535
1536 async fn reset(&self) -> Result<(), Error> {
1537 self.reset().await
1538 }
1539
1540 async fn lease_background_cleanup(&self) -> Result<Vec<Bytes>, Error> {
1541 let Some(ref lease) = self.lease else {
1543 return Ok(vec![]);
1544 };
1545
1546 let now = SystemTime::now();
1548 let expired_keys = lease.get_expired_keys(now);
1549
1550 if expired_keys.is_empty() {
1551 return Ok(vec![]);
1552 }
1553
1554 debug!(
1555 "Lease background cleanup: found {} expired keys",
1556 expired_keys.len()
1557 );
1558
1559 {
1561 let mut data = self.data.write();
1562 for key in &expired_keys {
1563 data.remove(key);
1564 }
1565 }
1566
1567 self.persist_data_async().await?;
1569
1570 info!(
1571 "Lease background cleanup: deleted {} expired keys",
1572 expired_keys.len()
1573 );
1574
1575 Ok(expired_keys)
1576 }
1577
1578 fn scan_prefix(
1579 &self,
1580 prefix: &[u8],
1581 ) -> Result<ScanResult, Error> {
1582 let data = self.data.read();
1583 let entries: Vec<(Bytes, Bytes)> = data
1584 .iter()
1585 .filter(|(k, _)| k.starts_with(prefix))
1586 .map(|(k, (v, _))| (k.clone(), v.clone()))
1587 .collect();
1588 let revision = self.last_applied_index.load(Ordering::SeqCst);
1589 Ok(ScanResult { entries, revision })
1590 }
1591}