use std::collections::HashMap;
use std::io::Write;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::time::SystemTime;

use bytes::Bytes;
use d_engine_core::Error;
use d_engine_core::Lease;
use d_engine_core::StateMachine;
use d_engine_core::StorageError;
use d_engine_proto::client::WriteCommand;
use d_engine_proto::client::write_command::Delete;
use d_engine_proto::client::write_command::Insert;
use d_engine_proto::client::write_command::Operation;
use d_engine_proto::common::Entry;
use d_engine_proto::common::LogId;
use d_engine_proto::common::entry_payload::Payload;
use d_engine_proto::server::storage::SnapshotMetadata;
use parking_lot::RwLock;
use prost::Message;
use tokio::fs;
use tokio::fs::File;
use tokio::fs::OpenOptions;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
use tokio::time::Instant;
use tonic::async_trait;
use tracing::debug;
use tracing::error;
use tracing::info;
use tracing::warn;

use crate::storage::DefaultLease;

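/// In-memory key/value map guarded by a `parking_lot::RwLock`.
/// Each key maps to `(value, term)`, where `term` is the Raft term of the
/// log entry that last wrote the key.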
type FileStateMachineDataType = RwLock<HashMap<Bytes, (Bytes, u64)>>;

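/// Operation codes written as a single byte into `wal.log` records.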
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum WalOpCode {
    Noop = 0,
    Insert = 1,
    Delete = 2,
    Config = 3,
}

impl WalOpCode {
    fn from_str(s: &str) -> Self {
        match s {
            "INSERT" => Self::Insert,
            "DELETE" => Self::Delete,
            "CONFIG" => Self::Config,
            _ => Self::Noop,
        }
    }

    fn from_u8(byte: u8) -> Self {
        match byte {
            1 => Self::Insert,
            2 => Self::Delete,
            3 => Self::Config,
            _ => Self::Noop,
        }
    }
}

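/// File-backed key/value state machine.
///
/// State lives in `data_dir` across four files: `state.data` (length-prefixed
/// key/value/term records), `metadata.bin` (last applied index and term),
/// `wal.log` (operations not yet folded into `state.data`) and, when a lease
/// is attached, `ttl_state.bin` (serialized TTL state written on shutdown).
///
/// Minimal usage sketch (illustrative path; not compiled as a doctest):
/// ```ignore
/// let sm = FileStateMachine::new(PathBuf::from("/tmp/d-engine/state")).await?;
/// sm.start().await?;
/// ```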
#[derive(Debug)]
pub struct FileStateMachine {
    data: FileStateMachineDataType,
    lease: Option<Arc<DefaultLease>>,

    lease_enabled: bool,

    last_applied_index: AtomicU64,
    last_applied_term: AtomicU64,
    last_snapshot_metadata: RwLock<Option<SnapshotMetadata>>,

    running: AtomicBool,

    data_dir: PathBuf,
}

impl FileStateMachine {
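    /// Creates (or reopens) a state machine rooted at `data_dir`, creating the
    /// directory if needed and restoring metadata, data and any pending WAL
    /// operations from disk.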
    pub async fn new(data_dir: PathBuf) -> Result<Self, Error> {
        fs::create_dir_all(&data_dir).await?;

        let machine = Self {
            data: RwLock::new(HashMap::new()),
            lease: None,
            lease_enabled: false,
            last_applied_index: AtomicU64::new(0),
            last_applied_term: AtomicU64::new(0),
            last_snapshot_metadata: RwLock::new(None),
            running: AtomicBool::new(true),
            data_dir: data_dir.clone(),
        };

        machine.load_from_disk().await?;

        Ok(machine)
    }

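    /// Attaches a lease manager and enables TTL handling for subsequent
    /// INSERT operations that carry `ttl_secs > 0`.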
    pub fn set_lease(
        &mut self,
        lease: Arc<DefaultLease>,
    ) {
        self.lease_enabled = true;
        self.lease = Some(lease);
    }

    async fn load_from_disk(&self) -> Result<(), Error> {
        self.load_metadata().await?;

        self.load_data().await?;

        self.load_ttl_data().await?;

        self.replay_wal().await?;

        info!("Loaded state machine data from disk");
        Ok(())
    }

    async fn load_ttl_data(&self) -> Result<(), Error> {
        // TTL state is restored in `load_lease_data` once a lease has been
        // attached (see `start`); at construction time there is nothing to do.
        Ok(())
    }

    pub async fn load_lease_data(&self) -> Result<(), Error> {
        let Some(ref lease) = self.lease else {
            return Ok(());
        };

        let ttl_path = self.data_dir.join("ttl_state.bin");
        if !ttl_path.exists() {
            debug!("No TTL state file found");
            return Ok(());
        }

        let ttl_data = tokio::fs::read(&ttl_path).await?;
        lease.reload(&ttl_data)?;

        info!("Loaded TTL state from disk: {} active TTLs", lease.len());
        Ok(())
    }

    async fn load_metadata(&self) -> Result<(), Error> {
        let metadata_path = self.data_dir.join("metadata.bin");
        if !metadata_path.exists() {
            return Ok(());
        }

        let mut file = File::open(metadata_path).await?;
        let mut buffer = [0u8; 16];

        if file.read_exact(&mut buffer).await.is_ok() {
            let index = u64::from_be_bytes(buffer[0..8].try_into().unwrap());
            let term = u64::from_be_bytes(buffer[8..16].try_into().unwrap());

            self.last_applied_index.store(index, Ordering::SeqCst);
            self.last_applied_term.store(term, Ordering::SeqCst);
        }

        Ok(())
    }

    async fn load_data(&self) -> Result<(), Error> {
        let data_path = self.data_dir.join("state.data");
        if !data_path.exists() {
            return Ok(());
        }

        let mut file = File::open(data_path).await?;
        let mut buffer = Vec::new();
        file.read_to_end(&mut buffer).await?;

        let mut pos = 0;
        let mut data = self.data.write();

        while pos < buffer.len() {
            if pos + 8 > buffer.len() {
                break;
            }

            let key_len = u64::from_be_bytes(buffer[pos..pos + 8].try_into().unwrap()) as usize;
            pos += 8;

            if pos + key_len > buffer.len() {
                break;
            }

            let key = Bytes::from(buffer[pos..pos + key_len].to_vec());
            pos += key_len;

            if pos + 8 > buffer.len() {
                break;
            }

            let value_len = u64::from_be_bytes(buffer[pos..pos + 8].try_into().unwrap()) as usize;
            pos += 8;

            if pos + value_len > buffer.len() {
                break;
            }

            let value = Bytes::from(buffer[pos..pos + value_len].to_vec());
            pos += value_len;

            if pos + 8 > buffer.len() {
                break;
            }

            let term = u64::from_be_bytes(buffer[pos..pos + 8].try_into().unwrap());
            pos += 8;

            data.insert(key, (value, term));
        }

        Ok(())
    }

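    /// Replays `wal.log` into memory. Each record is laid out as:
    /// `index: u64 BE | term: u64 BE | op: u8 | key_len: u64 BE | key |
    /// value_len: u64 BE | value | expire_at_secs: u64 BE` (0 means no TTL).
    /// Already-expired keys are skipped, and the WAL is truncated once at
    /// least one operation has been applied.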
    async fn replay_wal(&self) -> Result<(), Error> {
        let wal_path = self.data_dir.join("wal.log");
        if !wal_path.exists() {
            debug!("No WAL file found, skipping replay");
            return Ok(());
        }

        let mut file = File::open(wal_path).await?;
        let mut buffer = Vec::new();
        file.read_to_end(&mut buffer).await?;

        if buffer.is_empty() {
            debug!("WAL file is empty, skipping replay");
            return Ok(());
        }

        let mut pos = 0;
        let mut operations = Vec::new();
        let mut replayed_count = 0;

        while pos + 17 < buffer.len() {
            let _index = u64::from_be_bytes(buffer[pos..pos + 8].try_into().unwrap());
            pos += 8;

            let term = u64::from_be_bytes(buffer[pos..pos + 8].try_into().unwrap());
            pos += 8;

            let op_code = WalOpCode::from_u8(buffer[pos]);
            pos += 1;

            if pos + 8 > buffer.len() {
                warn!("Incomplete key length at position {}, stopping replay", pos);
                break;
            }

            let key_len = u64::from_be_bytes(buffer[pos..pos + 8].try_into().unwrap()) as usize;
            pos += 8;

            if pos + key_len > buffer.len() {
                warn!(
                    "Incomplete key data at position {} (need {} bytes, have {})",
                    pos,
                    key_len,
                    buffer.len() - pos
                );
                break;
            }

            let key = Bytes::from(buffer[pos..pos + key_len].to_vec());
            pos += key_len;

            if pos + 8 > buffer.len() {
                warn!(
                    "Incomplete value length at position {}, stopping replay",
                    pos
                );
                break;
            }

            let value_len = u64::from_be_bytes(buffer[pos..pos + 8].try_into().unwrap()) as usize;
            pos += 8;

            let value = if value_len > 0 {
                if pos + value_len > buffer.len() {
                    warn!("Incomplete value data at position {}, stopping replay", pos);
                    break;
                }
                let value_data = Bytes::from(buffer[pos..pos + value_len].to_vec());
                pos += value_len;
                Some(value_data)
            } else {
                None
            };

            let expire_at_secs = if pos + 8 <= buffer.len() {
                let secs = u64::from_be_bytes(buffer[pos..pos + 8].try_into().unwrap());
                pos += 8;
                if secs > 0 { Some(secs) } else { None }
            } else {
                debug!(
                    "No expiration time field at position {}, assuming no TTL (incomplete WAL entry)",
                    pos
                );
                None
            };

            operations.push((op_code, key, value, term, expire_at_secs));
            replayed_count += 1;
        }

        info!(
            "Parsed {} WAL operations, applying to memory",
            operations.len()
        );

        let mut applied_count = 0;
        let mut skipped_expired = 0;
        let now = std::time::SystemTime::now();
        {
            let mut data = self.data.write();

            for (op_code, key, value, term, expire_at_secs) in operations {
                match op_code {
                    WalOpCode::Insert => {
                        if let Some(value_data) = value {
                            let is_expired = if let Some(secs) = expire_at_secs {
                                let expire_at =
                                    std::time::UNIX_EPOCH + std::time::Duration::from_secs(secs);
                                now >= expire_at
                            } else {
                                false
                            };

                            if is_expired {
                                debug!("Skipped expired key during WAL replay: key={:?}", key);
                                skipped_expired += 1;
                                continue;
                            }

                            data.insert(key.clone(), (value_data, term));

                            if let Some(secs) = expire_at_secs {
                                if let Some(ref lease) = self.lease {
                                    let expire_at = std::time::UNIX_EPOCH
                                        + std::time::Duration::from_secs(secs);
                                    let remaining = expire_at
                                        .duration_since(now)
                                        .map(|d| d.as_secs())
                                        .unwrap_or(0);

                                    if remaining > 0 {
                                        lease.register(key.clone(), remaining);
                                        debug!(
                                            "Replayed INSERT with TTL: key={:?}, remaining={}s",
                                            key, remaining
                                        );
                                    }
                                }
                            } else {
                                debug!("Replayed INSERT: key={:?}", key);
                            }

                            applied_count += 1;
                        } else {
                            warn!("INSERT operation without value");
                        }
                    }
                    WalOpCode::Delete => {
                        data.remove(&key);
                        if let Some(ref lease) = self.lease {
                            lease.unregister(&key);
                        }
                        applied_count += 1;
                        debug!("Replayed DELETE: key={:?}", key);
                    }
                    WalOpCode::Noop | WalOpCode::Config => {
                        applied_count += 1;
                        debug!("Replayed {:?} operation", op_code);
                    }
                }
            }
        }

        info!(
            "WAL replay complete: {} operations replayed, {} applied, {} expired keys skipped",
            replayed_count, applied_count, skipped_expired
        );

        if applied_count > 0 {
            self.clear_wal_async().await?;
            debug!(
                "Cleared WAL after successful replay of {} operations",
                applied_count
            );
        }

        Ok(())
    }

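    /// Rewrites `state.data` with length-prefixed `key_len | key | value_len |
    /// value | term` records using blocking I/O; `persist_data_async` is the
    /// non-blocking counterpart.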
    fn persist_data(&self) -> Result<(), Error> {
        let data_copy: HashMap<Bytes, (Bytes, u64)> = {
            let data = self.data.read();
            data.iter().map(|(k, (v, t))| (k.clone(), (v.clone(), *t))).collect()
        };

        let data_path = self.data_dir.join("state.data");
        let mut file = std::fs::OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .open(data_path)?;

        for (key, (value, term)) in data_copy.iter() {
            let key_len = key.len() as u64;
            file.write_all(&key_len.to_be_bytes())?;
            file.write_all(key)?;

            let value_len = value.len() as u64;
            file.write_all(&value_len.to_be_bytes())?;
            file.write_all(value)?;

            file.write_all(&term.to_be_bytes())?;
        }

        file.flush()?;
        Ok(())
    }

    async fn persist_data_async(&self) -> Result<(), Error> {
        let data_copy: HashMap<Bytes, (Bytes, u64)> = {
            let data = self.data.read();
            data.iter().map(|(k, (v, t))| (k.clone(), (v.clone(), *t))).collect()
        };

        let data_path = self.data_dir.join("state.data");
        let mut file = OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .open(data_path)
            .await?;

        for (key, (value, term)) in data_copy.iter() {
            let key_len = key.len() as u64;
            file.write_all(&key_len.to_be_bytes()).await?;
            file.write_all(key.as_ref()).await?;

            let value_len = value.len() as u64;
            file.write_all(&value_len.to_be_bytes()).await?;
            file.write_all(value.as_ref()).await?;

            file.write_all(&term.to_be_bytes()).await?;
        }

        file.flush().await?;

        Ok(())
    }

    fn persist_metadata(&self) -> Result<(), Error> {
        let metadata_path = self.data_dir.join("metadata.bin");
        let mut file = std::fs::OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .open(metadata_path)?;

        let index = self.last_applied_index.load(Ordering::SeqCst);
        let term = self.last_applied_term.load(Ordering::SeqCst);

        file.write_all(&index.to_be_bytes())?;
        file.write_all(&term.to_be_bytes())?;

        file.flush()?;
        Ok(())
    }

    async fn persist_metadata_async(&self) -> Result<(), Error> {
        let metadata_path = self.data_dir.join("metadata.bin");
        let mut file = OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .open(metadata_path)
            .await?;

        let index = self.last_applied_index.load(Ordering::SeqCst);
        let term = self.last_applied_term.load(Ordering::SeqCst);

        file.write_all(&index.to_be_bytes()).await?;
        file.write_all(&term.to_be_bytes()).await?;

        file.flush().await?;
        Ok(())
    }

    #[allow(unused)]
    fn clear_wal(&self) -> Result<(), Error> {
        let wal_path = self.data_dir.join("wal.log");
        let mut file = std::fs::OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .open(wal_path)?;

        file.set_len(0)?;
        file.flush()?;
        Ok(())
    }

    async fn clear_wal_async(&self) -> Result<(), Error> {
        let wal_path = self.data_dir.join("wal.log");
        let mut file = OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .open(wal_path)
            .await?;

        file.set_len(0).await?;
        file.flush().await?;
        Ok(())
    }

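    /// Clears all in-memory state (data, applied index/term, snapshot
    /// metadata) and truncates the on-disk data, metadata and WAL files.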
    pub async fn reset(&self) -> Result<(), Error> {
        info!("Resetting state machine");

        {
            let mut data = self.data.write();
            data.clear();
        }

        self.last_applied_index.store(0, Ordering::SeqCst);
        self.last_applied_term.store(0, Ordering::SeqCst);

        {
            let mut snapshot_metadata = self.last_snapshot_metadata.write();
            *snapshot_metadata = None;
        }

        self.clear_data_file().await?;
        self.clear_metadata_file().await?;
        self.clear_wal_async().await?;

        info!("State machine reset completed");
        Ok(())
    }

    async fn clear_data_file(&self) -> Result<(), Error> {
        let data_path = self.data_dir.join("state.data");
        let mut file = OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .open(data_path)
            .await?;

        file.set_len(0).await?;
        file.flush().await?;
        Ok(())
    }

    async fn clear_metadata_file(&self) -> Result<(), Error> {
        let metadata_path = self.data_dir.join("metadata.bin");
        let mut file = OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .open(metadata_path)
            .await?;

        file.write_all(&0u64.to_be_bytes()).await?;
        file.write_all(&0u64.to_be_bytes()).await?;

        file.flush().await?;
        Ok(())
    }

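    /// Appends a batch of `(entry, operation, key, value, ttl_secs)` tuples to
    /// `wal.log`, encoded with the record layout described on `replay_wal`.
    /// A `ttl_secs` of 0 is written as an `expire_at_secs` of 0 (no TTL).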
    pub(crate) async fn append_to_wal(
        &self,
        entries: Vec<(Entry, String, Bytes, Option<Bytes>, u64)>,
    ) -> Result<(), Error> {
        if entries.is_empty() {
            return Ok(());
        }

        let wal_path = self.data_dir.join("wal.log");

        let mut file =
            OpenOptions::new().write(true).create(true).append(true).open(&wal_path).await?;

        let estimated_size: usize = entries
            .iter()
            .map(|(_, _, key, value, _)| {
                8 + 8 + 1 + 8 + key.len() + 8 + value.as_ref().map_or(0, |v| v.len()) + 8
            })
            .sum();

        let mut batch_buffer = Vec::with_capacity(estimated_size);

        for (entry, operation, key, value, ttl_secs) in entries {
            batch_buffer.extend_from_slice(&entry.index.to_be_bytes());
            batch_buffer.extend_from_slice(&entry.term.to_be_bytes());

            let op_code = WalOpCode::from_str(&operation);
            batch_buffer.push(op_code as u8);

            batch_buffer.extend_from_slice(&(key.len() as u64).to_be_bytes());
            batch_buffer.extend_from_slice(&key);

            if let Some(value_data) = value {
                batch_buffer.extend_from_slice(&(value_data.len() as u64).to_be_bytes());
                batch_buffer.extend_from_slice(&value_data);
            } else {
                batch_buffer.extend_from_slice(&0u64.to_be_bytes());
            }

            let expire_at_secs = if ttl_secs > 0 {
                let expire_at =
                    std::time::SystemTime::now() + std::time::Duration::from_secs(ttl_secs);
                expire_at
                    .duration_since(std::time::UNIX_EPOCH)
                    .map(|d| d.as_secs())
                    .unwrap_or(0)
            } else {
                0
            };
            batch_buffer.extend_from_slice(&expire_at_secs.to_be_bytes());
        }

        file.write_all(&batch_buffer).await?;
        file.flush().await?;

        Ok(())
    }

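    /// Folds the current in-memory state into `state.data` and `metadata.bin`,
    /// then truncates the WAL so the persisted files are self-contained.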
    #[allow(unused)]
    pub(crate) async fn checkpoint(&self) -> Result<(), Error> {
        self.persist_data_async().await?;
        self.persist_metadata_async().await?;

        self.clear_wal_async().await?;

        Ok(())
    }
}

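// Best-effort flush on drop: persists metadata and data synchronously so a
// clean shutdown does not lose applied state.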
impl Drop for FileStateMachine {
    fn drop(&mut self) {
        let timer = Instant::now();

        match self.save_hard_state() {
            Ok(_) => debug!("StateMachine saved in {:?}", timer.elapsed()),
            Err(e) => error!("Failed to save StateMachine: {}", e),
        }
    }
}

#[async_trait]
impl StateMachine for FileStateMachine {
    async fn start(&self) -> Result<(), Error> {
        self.running.store(true, Ordering::SeqCst);

        if self.lease.is_some() {
            self.load_lease_data().await?;
            debug!("Lease data loaded during state machine initialization");
        }

        info!("File state machine started");
        Ok(())
    }

    fn stop(&self) -> Result<(), Error> {
        self.running.store(false, Ordering::SeqCst);

        if let Some(ref lease) = self.lease {
            let ttl_snapshot = lease.to_snapshot();
            let ttl_path = self.data_dir.join("ttl_state.bin");
            std::fs::write(&ttl_path, ttl_snapshot).map_err(StorageError::IoError)?;
            debug!("Persisted TTL state on shutdown");
        }

        info!("File state machine stopped");
        Ok(())
    }

    fn is_running(&self) -> bool {
        self.running.load(Ordering::SeqCst)
    }

    fn get(
        &self,
        key_buffer: &[u8],
    ) -> Result<Option<Bytes>, Error> {
        let data = self.data.read();
        Ok(data.get(key_buffer).map(|(value, _)| value.clone()))
    }

    fn entry_term(
        &self,
        entry_id: u64,
    ) -> Option<u64> {
        let data = self.data.read();
        data.values().find(|(_, index)| *index == entry_id).map(|(_, term)| *term)
    }

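    /// Applies a chunk of committed log entries: decodes each payload into an
    /// INSERT/DELETE/NOOP/CONFIG operation, appends the batch to the WAL,
    /// applies it to the in-memory map, updates `last_applied`, then persists
    /// data and metadata and truncates the WAL.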
    async fn apply_chunk(
        &self,
        chunk: Vec<Entry>,
    ) -> Result<(), Error> {
        let mut highest_index_entry: Option<LogId> = None;
        let mut batch_operations = Vec::new();

        for entry in chunk {
            let entry_index = entry.index;

            assert!(entry.payload.is_some(), "Entry payload should not be None!");

            if let Some(prev) = &highest_index_entry {
                assert!(
                    entry.index > prev.index,
                    "apply_chunk: received unordered entry at index {} (prev={})",
                    entry.index,
                    prev.index
                );
            }
            highest_index_entry = Some(LogId {
                index: entry.index,
                term: entry.term,
            });

            match entry.payload.as_ref().unwrap().payload.as_ref() {
                Some(Payload::Noop(_)) => {
                    debug!("Handling NOOP command at index {}", entry.index);
                    batch_operations.push((entry, "NOOP", Bytes::new(), None, 0));
                }
                Some(Payload::Command(bytes)) => match WriteCommand::decode(&bytes[..]) {
                    Ok(write_cmd) => {
                        match write_cmd.operation {
                            Some(Operation::Insert(Insert {
                                key,
                                value,
                                ttl_secs,
                            })) => {
                                batch_operations.push((
                                    entry,
                                    "INSERT",
                                    key,
                                    Some(value),
                                    ttl_secs,
                                ));
                            }
                            Some(Operation::Delete(Delete { key })) => {
                                batch_operations.push((entry, "DELETE", key, None, 0));
                            }
                            None => {
                                warn!("WriteCommand without operation at index {}", entry.index);
                                batch_operations.push((entry, "NOOP", Bytes::new(), None, 0));
                            }
                        }
                    }
                    Err(e) => {
                        error!(
                            "Failed to decode WriteCommand at index {}: {:?}",
                            entry.index, e
                        );
                        return Err(StorageError::SerializationError(e.to_string()).into());
                    }
                },
                Some(Payload::Config(_config_change)) => {
                    debug!("Ignoring config change at index {}", entry.index);
                    batch_operations.push((entry, "CONFIG", Bytes::new(), None, 0));
                }
                None => panic!("Entry payload variant should not be None!"),
            }

            info!("COMMITTED_LOG_METRIC: {}", entry_index);
        }

        let mut wal_entries = Vec::new();
        for (entry, operation, key, value, ttl_secs) in &batch_operations {
            wal_entries.push((
                entry.clone(),
                operation.to_string(),
                key.clone(),
                value.clone(),
                *ttl_secs,
            ));
        }

        self.append_to_wal(wal_entries).await?;

        {
            let mut data = self.data.write();

            for (entry, operation, key, value, ttl_secs) in batch_operations {
                match operation {
                    "INSERT" => {
                        if let Some(value) = value {
                            data.insert(key.clone(), (value, entry.term));

                            if ttl_secs > 0 {
                                if !self.lease_enabled {
                                    return Err(StorageError::FeatureNotEnabled(
                                        "TTL feature is not enabled on this server. \
                                         Enable it in config: [raft.state_machine.lease] enabled = true".into()
                                    ).into());
                                }

                                // Safety: `lease_enabled` is only set by `set_lease`,
                                // which also sets `self.lease` to `Some`.
                                let lease = unsafe { self.lease.as_ref().unwrap_unchecked() };
                                lease.register(key, ttl_secs);
                            }
                        }
                    }
                    "DELETE" => {
                        data.remove(&key);
                        if let Some(ref lease) = self.lease {
                            lease.unregister(&key);
                        }
                    }
                    "NOOP" | "CONFIG" => {
                        // No state-machine mutation for NOOP/CONFIG entries.
                    }
                    _ => warn!("Unknown operation: {}", operation),
                }
            }
        }

        if let Some(log_id) = highest_index_entry {
            debug!("State machine - updated last_applied: {:?}", log_id);
            self.update_last_applied(log_id);
        }

        self.persist_data_async().await?;
        self.persist_metadata_async().await?;
        self.clear_wal_async().await?;

        Ok(())
    }

    fn len(&self) -> usize {
        self.data.read().len()
    }

    fn update_last_applied(
        &self,
        last_applied: LogId,
    ) {
        self.last_applied_index.store(last_applied.index, Ordering::SeqCst);
        self.last_applied_term.store(last_applied.term, Ordering::SeqCst);
    }

    fn last_applied(&self) -> LogId {
        LogId {
            index: self.last_applied_index.load(Ordering::SeqCst),
            term: self.last_applied_term.load(Ordering::SeqCst),
        }
    }

    fn persist_last_applied(
        &self,
        last_applied: LogId,
    ) -> Result<(), Error> {
        self.update_last_applied(last_applied);
        self.persist_metadata()
    }

    fn update_last_snapshot_metadata(
        &self,
        snapshot_metadata: &SnapshotMetadata,
    ) -> Result<(), Error> {
        *self.last_snapshot_metadata.write() = Some(snapshot_metadata.clone());
        Ok(())
    }

    fn snapshot_metadata(&self) -> Option<SnapshotMetadata> {
        self.last_snapshot_metadata.read().clone()
    }

    fn persist_last_snapshot_metadata(
        &self,
        snapshot_metadata: &SnapshotMetadata,
    ) -> Result<(), Error> {
        self.update_last_snapshot_metadata(snapshot_metadata)
    }

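    /// Installs a snapshot from `snapshot_dir/snapshot.bin`: length-prefixed
    /// key/value/term records followed by an optional lease-state block
    /// (`ttl_len: u64 BE | ttl_bytes`). The decoded map replaces the current
    /// state, `last_applied` is advanced to the snapshot's last included log
    /// id, and the result is persisted while the WAL is truncated.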
    async fn apply_snapshot_from_file(
        &self,
        metadata: &SnapshotMetadata,
        snapshot_dir: std::path::PathBuf,
    ) -> Result<(), Error> {
        info!("Applying snapshot from file: {:?}", snapshot_dir);

        let snapshot_data_path = snapshot_dir.join("snapshot.bin");
        let mut file = File::open(snapshot_data_path).await?;
        let mut buffer = Vec::new();
        file.read_to_end(&mut buffer).await?;

        let mut pos = 0;
        let mut new_data = HashMap::new();

        while pos < buffer.len() {
            if pos + 8 > buffer.len() {
                break;
            }

            let key_len = u64::from_be_bytes(buffer[pos..pos + 8].try_into().unwrap()) as usize;
            pos += 8;

            if pos + key_len > buffer.len() {
                break;
            }

            let key = Bytes::from(buffer[pos..pos + key_len].to_vec());
            pos += key_len;

            if pos + 8 > buffer.len() {
                break;
            }

            let value_len = u64::from_be_bytes(buffer[pos..pos + 8].try_into().unwrap()) as usize;
            pos += 8;

            if pos + value_len > buffer.len() {
                break;
            }

            let value = Bytes::from(buffer[pos..pos + value_len].to_vec());
            pos += value_len;

            if pos + 8 > buffer.len() {
                break;
            }

            let term = u64::from_be_bytes(buffer[pos..pos + 8].try_into().unwrap());
            pos += 8;

            new_data.insert(key, (value, term));
        }

        if pos + 8 <= buffer.len() {
            let ttl_len = u64::from_be_bytes(buffer[pos..pos + 8].try_into().unwrap()) as usize;
            pos += 8;

            if pos + ttl_len <= buffer.len() {
                let ttl_data = &buffer[pos..pos + ttl_len];
                if let Some(ref lease) = self.lease {
                    lease.reload(ttl_data)?;
                }
            }
        }

        {
            let mut data = self.data.write();
            *data = new_data;
        }

        *self.last_snapshot_metadata.write() = Some(metadata.clone());

        if let Some(last_included) = &metadata.last_included {
            self.update_last_applied(*last_included);
        }

        self.persist_data_async().await?;
        self.persist_metadata_async().await?;
        self.clear_wal_async().await?;

        info!("Snapshot applied successfully");
        Ok(())
    }

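    /// Writes the current state into `new_snapshot_dir/snapshot.bin` using the
    /// same layout that `apply_snapshot_from_file` reads, appends the lease
    /// snapshot (empty when no lease is attached), and records the snapshot
    /// metadata. The checksum is currently a 32-byte zero placeholder.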
    async fn generate_snapshot_data(
        &self,
        new_snapshot_dir: std::path::PathBuf,
        last_included: LogId,
    ) -> Result<Bytes, Error> {
        info!("Generating snapshot data up to {:?}", last_included);

        fs::create_dir_all(&new_snapshot_dir).await?;

        let snapshot_path = new_snapshot_dir.join("snapshot.bin");
        let mut file = File::create(&snapshot_path).await?;

        let data_copy: HashMap<Bytes, (Bytes, u64)> = {
            let data = self.data.read();
            data.iter().map(|(k, (v, t))| (k.clone(), (v.clone(), *t))).collect()
        };

        for (key, (value, term)) in data_copy.iter() {
            let key_len = key.len() as u64;
            file.write_all(&key_len.to_be_bytes()).await?;
            file.write_all(key).await?;

            let value_len = value.len() as u64;
            file.write_all(&value_len.to_be_bytes()).await?;
            file.write_all(value).await?;

            file.write_all(&term.to_be_bytes()).await?;
        }

        let lease_snapshot = if let Some(ref lease) = self.lease {
            lease.to_snapshot()
        } else {
            Vec::new()
        };

        let lease_len = lease_snapshot.len() as u64;
        file.write_all(&lease_len.to_be_bytes()).await?;
        file.write_all(&lease_snapshot).await?;

        file.flush().await?;

        let metadata = SnapshotMetadata {
            last_included: Some(last_included),
            checksum: Bytes::from(vec![0; 32]),
        };

        self.update_last_snapshot_metadata(&metadata)?;

        info!("Snapshot generated at {:?}", snapshot_path);

        Ok(Bytes::from_static(&[0u8; 32]))
    }

    fn save_hard_state(&self) -> Result<(), Error> {
        let last_applied = self.last_applied();
        self.persist_last_applied(last_applied)?;

        if let Some(last_snapshot_metadata) = self.snapshot_metadata() {
            self.persist_last_snapshot_metadata(&last_snapshot_metadata)?;
        }

        self.flush()?;
        Ok(())
    }

    fn flush(&self) -> Result<(), Error> {
        self.persist_data()?;
        self.persist_metadata()?;
        Ok(())
    }

    async fn flush_async(&self) -> Result<(), Error> {
        self.persist_data_async().await?;
        self.persist_metadata_async().await?;
        Ok(())
    }

    async fn reset(&self) -> Result<(), Error> {
        self.reset().await
    }

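    /// Removes keys whose lease has expired from the in-memory map, persists
    /// the data file, and returns the expired keys to the caller.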
    async fn lease_background_cleanup(&self) -> Result<Vec<Bytes>, Error> {
        let Some(ref lease) = self.lease else {
            return Ok(vec![]);
        };

        let now = SystemTime::now();
        let expired_keys = lease.get_expired_keys(now);

        if expired_keys.is_empty() {
            return Ok(vec![]);
        }

        debug!(
            "Lease background cleanup: found {} expired keys",
            expired_keys.len()
        );

        {
            let mut data = self.data.write();
            for key in &expired_keys {
                data.remove(key);
            }
        }

        if let Err(e) = self.persist_data() {
            error!("Failed to persist after background cleanup: {:?}", e);
        }

        info!(
            "Lease background cleanup: deleted {} expired keys",
            expired_keys.len()
        );

        Ok(expired_keys)
    }
}