1#![allow(dead_code)]
7
8use crate::model::{Triple, TriplePattern};
9use crate::OxirsError;
10use async_trait::async_trait;
11use serde::{Deserialize, Serialize};
12use std::collections::{HashMap, HashSet, VecDeque};
13use std::net::SocketAddr;
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16use tokio::sync::{mpsc, oneshot, Mutex, RwLock};
17use tokio::time::interval;
18
/// Static configuration for a single Raft node: cluster identity, peers,
/// timing parameters, and log-maintenance policies.
#[derive(Debug, Clone)]
pub struct RaftConfig {
    /// Unique identifier of this node within the cluster.
    pub node_id: String,
    /// Known cluster peers (voting members and learners).
    pub peers: Vec<RaftPeer>,
    /// Election timeout range in milliseconds `(min, max)`; a random value
    /// in this half-open range is drawn for each election cycle.
    pub election_timeout: (u64, u64),
    /// Leader heartbeat period in milliseconds.
    pub heartbeat_interval: u64,
    /// Log-compaction policy.
    pub compaction: CompactionConfig,
    /// Snapshot-creation policy.
    pub snapshot: SnapshotConfig,
    /// Filesystem path for persistent Raft state.
    pub storage_path: String,
}
37
/// A remote member of the Raft cluster.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RaftPeer {
    /// Unique identifier of the peer.
    pub id: String,
    /// Network address used to reach the peer.
    pub address: SocketAddr,
    /// Whether the peer participates in elections (false for learners).
    pub voting: bool,
}
48
/// Policy controlling background log compaction.
#[derive(Debug, Clone)]
pub struct CompactionConfig {
    /// Run the compaction worker automatically.
    pub auto_compact: bool,
    /// Entry count above which compaction is triggered.
    pub threshold: usize,
    /// Newest entries always kept uncompacted in memory.
    pub min_entries: usize,
    /// Compress the serialized compacted blob with zstd.
    pub delta_compression: bool,
    /// Entries processed per compaction batch.
    pub batch_size: usize,
}
63
64impl Default for CompactionConfig {
65 fn default() -> Self {
66 CompactionConfig {
67 auto_compact: true,
68 threshold: 10000,
69 min_entries: 1000,
70 delta_compression: true,
71 batch_size: 1000,
72 }
73 }
74}
75
/// Bundled arguments for handling a RequestVote RPC.
#[derive(Debug, Clone)]
struct VoteRequestParams {
    /// Term claimed by the candidate.
    pub request_term: u64,
    /// Identifier of the candidate requesting the vote.
    pub candidate_id: String,
    /// Index of the candidate's last log entry.
    pub last_log_index: u64,
    /// Term of the candidate's last log entry.
    pub last_log_term: u64,
}
84
/// Bundled arguments for handling an AppendEntries RPC.
#[derive(Debug, Clone)]
struct AppendEntriesParams {
    /// Term claimed by the leader.
    pub request_term: u64,
    /// Identifier of the sending leader.
    pub leader_id: String,
    /// Index of the entry immediately preceding the new ones.
    pub prev_log_index: u64,
    /// Term of the entry at `prev_log_index`.
    pub prev_log_term: u64,
    /// Entries to replicate (empty for heartbeats).
    pub entries: Vec<RaftLogEntry>,
    /// Leader's commit index.
    pub leader_commit: u64,
}
95
/// Policy controlling automatic snapshot creation.
#[derive(Debug, Clone)]
pub struct SnapshotConfig {
    /// Run the snapshot manager automatically.
    pub auto_snapshot: bool,
    /// Minimum number of committed entries between snapshots.
    pub interval: usize,
    /// Create incremental snapshots when possible.
    pub incremental: bool,
    /// Compress snapshot data.
    pub compression: bool,
    /// Maximum snapshots created concurrently.
    pub max_concurrent: usize,
}
110
111impl Default for SnapshotConfig {
112 fn default() -> Self {
113 SnapshotConfig {
114 auto_snapshot: true,
115 interval: 50000,
116 incremental: true,
117 compression: true,
118 max_concurrent: 2,
119 }
120 }
121}
122
/// Role of a node in the Raft protocol.
#[derive(Debug, Clone, PartialEq)]
pub enum NodeState {
    /// Passive replica following a leader.
    Follower,
    /// Running an election after an election timeout.
    Candidate,
    /// Elected leader; accepts client writes and sends heartbeats.
    Leader,
    /// Non-voting replica that only receives replicated entries.
    Learner,
}
131
/// Payload carried by a replicated log entry.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum LogEntry {
    /// Insert a single triple.
    AddTriple(Triple),
    /// Remove a single triple.
    RemoveTriple(Triple),
    /// Insert a batch of triples.
    BatchAdd(Vec<Triple>),
    /// Remove a batch of triples.
    BatchRemove(Vec<Triple>),
    /// Cluster-membership change.
    ConfigChange(ConfigChangeEntry),
    /// Marker recording that a snapshot covers the log up to a point.
    SnapshotMarker(SnapshotInfo),
    /// Placeholder for a range of entries replaced by compacted data.
    CompactedEntry(CompactedData),
}
150
/// A single cluster-membership change applied through the log.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConfigChangeEntry {
    /// Kind of membership change.
    pub change_type: ConfigChangeType,
    /// Peer the change applies to.
    pub peer: RaftPeer,
}
159
/// Supported cluster-membership operations.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ConfigChangeType {
    /// Add a new node to the cluster.
    AddNode,
    /// Remove a node from the cluster.
    RemoveNode,
    /// Promote a learner to a voting member.
    PromoteToVoter,
    /// Demote a voting member to a learner.
    DemoteToLearner,
}
168
/// Metadata describing a stored snapshot.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SnapshotInfo {
    /// Unique snapshot identifier.
    pub id: String,
    /// Highest log index included in the snapshot.
    pub last_index: u64,
    /// Term of the entry at `last_index`.
    pub last_term: u64,
    /// Size of the snapshot data in bytes.
    pub size: usize,
    /// Integrity checksum of the snapshot data.
    pub checksum: String,
}
183
/// A contiguous range of log entries serialized (and optionally compressed)
/// into a single blob by the compaction worker.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompactedData {
    /// First log index covered (inclusive).
    pub start_index: u64,
    /// One past the last log index covered (exclusive).
    pub end_index: u64,
    /// Size of `data` in bytes.
    pub size: usize,
    /// Serialized, possibly zstd-compressed entry bytes.
    pub data: Vec<u8>,
    /// Snapshot this delta is relative to, if any.
    pub base_snapshot: Option<String>,
}
198
/// Messages processed by the node's internal message loop: the standard
/// Raft RPCs plus client write requests.
#[derive(Debug)]
pub enum RaftMessage {
    /// RequestVote RPC from a candidate.
    VoteRequest {
        term: u64,
        candidate_id: String,
        last_log_index: u64,
        last_log_term: u64,
    },
    /// Reply to a RequestVote RPC.
    VoteResponse { term: u64, vote_granted: bool },
    /// AppendEntries RPC from a leader (empty `entries` = heartbeat).
    AppendEntries {
        term: u64,
        _leader_id: String,
        prev_log_index: u64,
        prev_log_term: u64,
        entries: Vec<RaftLogEntry>,
        leader_commit: u64,
    },
    /// Reply to an AppendEntries RPC; the `conflict_*` fields allow the
    /// leader to back up `next_index` faster on mismatch.
    AppendResponse {
        term: u64,
        success: bool,
        match_index: u64,
        conflict_term: Option<u64>,
        conflict_index: Option<u64>,
    },
    /// InstallSnapshot RPC, streamed in chunks (`done` marks the last one).
    InstallSnapshot {
        term: u64,
        _leader_id: String,
        last_included_index: u64,
        last_included_term: u64,
        offset: u64,
        data: Vec<u8>,
        done: bool,
    },
    /// Client write submitted via [`RaftNode::submit`]; answered through
    /// `response_tx` once handled.
    ClientRequest {
        request_id: String,
        entry: LogEntry,
        response_tx: oneshot::Sender<Result<(), OxirsError>>,
    },
}
245
/// One replicated log entry with its Raft position metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RaftLogEntry {
    /// Absolute log index (1-based).
    pub index: u64,
    /// Term in which the entry was created by the leader.
    pub term: u64,
    /// Application payload.
    pub entry: LogEntry,
    /// Creation time as seconds since the UNIX epoch.
    pub timestamp: u64,
}
258
/// A single Raft consensus participant.
///
/// All mutable state is behind `Arc<RwLock<_>>` so it can be shared with the
/// background tasks spawned by [`RaftNode::start`].
pub struct RaftNode {
    /// Static node/cluster configuration.
    config: RaftConfig,
    /// Current protocol role.
    state: Arc<RwLock<NodeState>>,
    /// Latest term this node has seen (persistent in full Raft).
    current_term: Arc<RwLock<u64>>,
    /// Candidate voted for in the current term, if any.
    voted_for: Arc<RwLock<Option<String>>>,
    /// Replicated log plus compaction/snapshot bookkeeping.
    log: Arc<RwLock<RaftLog>>,
    /// Highest log index known to be committed.
    commit_index: Arc<RwLock<u64>>,
    /// Highest log index applied to the state machine.
    last_applied: Arc<RwLock<u64>>,
    /// Leader-only bookkeeping; `None` while not leader.
    leader_state: Arc<RwLock<Option<LeaderState>>>,
    /// Sender side of the internal message loop.
    message_tx: mpsc::Sender<RaftMessage>,
    /// Receiver side, locked by the single message-processor task.
    message_rx: Arc<Mutex<mpsc::Receiver<RaftMessage>>>,
    /// Flag telling background tasks to exit.
    shutdown: Arc<RwLock<bool>>,
    /// Operational counters.
    stats: Arc<RwLock<RaftStats>>,
}
285
/// In-memory replicated log with compaction and snapshot metadata.
struct RaftLog {
    /// Live entries; the front entry has absolute index `start_index`.
    entries: VecDeque<RaftLogEntry>,
    /// Compacted ranges keyed by their starting index.
    compacted: HashMap<u64, CompactedData>,
    /// Known snapshots keyed by snapshot id.
    snapshots: HashMap<String, SnapshotInfo>,
    /// Absolute index of the first entry still held in `entries`.
    start_index: u64,
    /// Compaction progress and statistics.
    compaction_state: CompactionState,
}
299
/// Tracks compaction progress for a [`RaftLog`].
struct CompactionState {
    /// End index (exclusive) of the most recently compacted range.
    last_compacted: u64,
    /// Compaction currently in progress, if any.
    pending: Option<CompactionJob>,
    /// Cumulative compaction statistics.
    stats: CompactionStats,
}
309
/// Descriptor of an in-flight compaction run.
#[allow(dead_code)]
struct CompactionJob {
    /// First index being compacted (inclusive).
    start: u64,
    /// One past the last index being compacted (exclusive).
    end: u64,
    /// When the run began, for duration measurement.
    start_time: Instant,
}
320
/// Cumulative statistics about log compaction.
#[derive(Debug, Default)]
struct CompactionStats {
    /// Number of completed compaction runs.
    total_compactions: u64,
    /// Total entries moved out of the in-memory log.
    entries_compacted: u64,
    /// Estimated bytes saved across all runs.
    space_saved_bytes: u64,
    /// Compressed/raw size ratio of the most recent run.
    compression_ratio: f64,
}
333
/// Per-follower bookkeeping maintained only while this node is leader.
struct LeaderState {
    /// Next log index to send to each follower.
    next_index: HashMap<String, u64>,
    /// Highest log index known replicated on each follower.
    match_index: HashMap<String, u64>,
    /// Replication health/throughput per follower.
    replication_progress: HashMap<String, ReplicationProgress>,
    /// Client requests awaiting commit, keyed by request id.
    pending_requests: HashMap<String, PendingRequest>,
}
345
/// Replication health for one follower.
struct ReplicationProgress {
    /// When the last AppendEntries was sent.
    last_sent: Instant,
    /// Consecutive replication failures.
    failures: u32,
    /// Entries currently in flight.
    in_flight: u64,
    /// Estimated replication bandwidth, bytes per second.
    bandwidth_bps: f64,
}
357
/// A client write awaiting commitment of its log entry.
struct PendingRequest {
    /// Client-visible request identifier.
    request_id: String,
    /// Log index assigned to the request's entry.
    log_index: u64,
    /// Channel used to acknowledge the client.
    response_tx: oneshot::Sender<Result<(), OxirsError>>,
    /// Deadline after which the request should be failed.
    timeout: Instant,
}
369
/// Operational counters for a Raft node.
#[derive(Debug, Default)]
struct RaftStats {
    /// Elections this node has started.
    elections_held: u64,
    /// Elections this node has won.
    elections_won: u64,
    /// Messages sent to peers.
    messages_sent: u64,
    /// Messages received and processed.
    messages_received: u64,
    /// Log entries successfully replicated.
    entries_replicated: u64,
    /// Snapshots sent to followers.
    snapshots_sent: u64,
    /// Snapshots received and installed.
    snapshots_received: u64,
}
388
389impl RaftNode {
390 pub async fn new(config: RaftConfig) -> Result<Self, OxirsError> {
392 let (message_tx, message_rx) = mpsc::channel(10000);
393
394 Ok(RaftNode {
395 config,
396 state: Arc::new(RwLock::new(NodeState::Follower)),
397 current_term: Arc::new(RwLock::new(0)),
398 voted_for: Arc::new(RwLock::new(None)),
399 log: Arc::new(RwLock::new(RaftLog::new())),
400 commit_index: Arc::new(RwLock::new(0)),
401 last_applied: Arc::new(RwLock::new(0)),
402 leader_state: Arc::new(RwLock::new(None)),
403 message_tx,
404 message_rx: Arc::new(Mutex::new(message_rx)),
405 shutdown: Arc::new(RwLock::new(false)),
406 stats: Arc::new(RwLock::new(RaftStats::default())),
407 })
408 }
409
    /// Start the node's background tasks: message processing, election and
    /// heartbeat timers, and (when enabled) the compaction worker and
    /// snapshot manager. Tasks run until the shared `shutdown` flag is set.
    pub async fn start(&self) -> Result<(), OxirsError> {
        self.spawn_message_processor();

        self.spawn_election_timer();

        self.spawn_heartbeat_timer();

        // Optional maintenance workers, controlled by configuration.
        if self.config.compaction.auto_compact {
            self.spawn_compaction_worker();
        }

        if self.config.snapshot.auto_snapshot {
            self.spawn_snapshot_manager();
        }

        Ok(())
    }
433
434 pub async fn submit(&self, entry: LogEntry) -> Result<(), OxirsError> {
436 let state = self.state.read().await;
437 if *state != NodeState::Leader {
438 return Err(OxirsError::Store("Not the leader".to_string()));
439 }
440
441 let request_id = uuid::Uuid::new_v4().to_string();
442 let (response_tx, response_rx) = oneshot::channel();
443
444 self.message_tx
445 .send(RaftMessage::ClientRequest {
446 request_id,
447 entry,
448 response_tx,
449 })
450 .await
451 .map_err(|_| OxirsError::Store("Failed to send request".to_string()))?;
452
453 response_rx
454 .await
455 .map_err(|_| OxirsError::Store("Request cancelled".to_string()))?
456 }
457
458 pub async fn query(&self, pattern: &TriplePattern) -> Result<Vec<Triple>, OxirsError> {
460 let log = self.log.read().await;
461 let last_applied = *self.last_applied.read().await;
462
463 let mut results = Vec::new();
464 let mut current_state = HashSet::new();
465
466 for entry in &log.entries {
468 if entry.index > last_applied {
469 break;
470 }
471
472 match &entry.entry {
473 LogEntry::AddTriple(triple) => {
474 current_state.insert(triple.clone());
475 }
476 LogEntry::RemoveTriple(triple) => {
477 current_state.remove(triple);
478 }
479 LogEntry::BatchAdd(triples) => {
480 for triple in triples {
481 current_state.insert(triple.clone());
482 }
483 }
484 LogEntry::BatchRemove(triples) => {
485 for triple in triples {
486 current_state.remove(triple);
487 }
488 }
489 _ => {}
490 }
491 }
492
493 for triple in current_state {
495 if pattern.matches(&triple) {
496 results.push(triple);
497 }
498 }
499
500 Ok(results)
501 }
502
    /// Spawn the single task that drains the internal message channel and
    /// dispatches each message to the matching handler.
    ///
    /// The receiver mutex is held for the lifetime of the task, making this
    /// the sole consumer of the channel.
    fn spawn_message_processor(&self) {
        // Clone the shared handles the task needs to own.
        let message_rx = self.message_rx.clone();
        let state = self.state.clone();
        let current_term = self.current_term.clone();
        let voted_for = self.voted_for.clone();
        let log = self.log.clone();
        let commit_index = self.commit_index.clone();
        let leader_state = self.leader_state.clone();
        let stats = self.stats.clone();
        let node_id = self.config.node_id.clone();

        tokio::spawn(async move {
            let mut rx = message_rx.lock().await;
            // Loop ends when all senders are dropped and the channel closes.
            while let Some(message) = rx.recv().await {
                // Count the message, releasing the stats lock immediately.
                let mut stats_guard = stats.write().await;
                stats_guard.messages_received += 1;
                drop(stats_guard);

                match message {
                    RaftMessage::VoteRequest {
                        term,
                        candidate_id,
                        last_log_index,
                        last_log_term,
                    } => {
                        Self::handle_vote_request(
                            &state,
                            &current_term,
                            &voted_for,
                            &log,
                            &node_id,
                            VoteRequestParams {
                                request_term: term,
                                candidate_id,
                                last_log_index,
                                last_log_term,
                            },
                        )
                        .await;
                    }
                    RaftMessage::AppendEntries {
                        term,
                        _leader_id,
                        prev_log_index,
                        prev_log_term,
                        entries,
                        leader_commit,
                    } => {
                        Self::handle_append_entries(
                            &state,
                            &current_term,
                            &log,
                            &commit_index,
                            AppendEntriesParams {
                                request_term: term,
                                leader_id: _leader_id,
                                prev_log_index,
                                prev_log_term,
                                entries,
                                leader_commit,
                            },
                        )
                        .await;
                    }
                    RaftMessage::ClientRequest {
                        request_id,
                        entry,
                        response_tx,
                    } => {
                        Self::handle_client_request(
                            &state,
                            &current_term,
                            &log,
                            &leader_state,
                            request_id,
                            entry,
                            response_tx,
                        )
                        .await;
                    }
                    // VoteResponse / AppendResponse / InstallSnapshot are not
                    // handled yet (transport integration pending).
                    _ => {}
                }
            }
        });
    }
589
    /// Spawn the election timer: sleep for a random duration within the
    /// configured timeout range, then start an election if this node is
    /// still a follower or candidate.
    ///
    /// NOTE(review): the timeout is not reset upon receiving AppendEntries
    /// heartbeats; standard Raft requires resetting the election timer on
    /// valid leader contact — confirm intended behavior once transport is
    /// wired up.
    fn spawn_election_timer(&self) {
        let state = self.state.clone();
        let current_term = self.current_term.clone();
        let voted_for = self.voted_for.clone();
        let config = self.config.clone();
        let shutdown = self.shutdown.clone();
        let stats = self.stats.clone();

        tokio::spawn(async move {
            #[allow(unused_imports)]
            use scirs2_core::random::{Random, Rng};

            loop {
                // Randomized timeout to reduce split votes (Raft §5.2).
                let timeout = {
                    let mut random = Random::default();
                    random.gen_range(config.election_timeout.0..config.election_timeout.1)
                };
                tokio::time::sleep(Duration::from_millis(timeout)).await;

                if *shutdown.read().await {
                    break;
                }

                // Leaders and learners never start elections.
                let current_state = state.read().await.clone();
                if current_state == NodeState::Follower || current_state == NodeState::Candidate {
                    Self::start_election(&state, &current_term, &voted_for, &config, &stats).await;
                }
            }
        });
    }
623
624 fn spawn_heartbeat_timer(&self) {
626 let state = self.state.clone();
627 let config = self.config.clone();
628 let shutdown = self.shutdown.clone();
629
630 tokio::spawn(async move {
631 let mut interval = interval(Duration::from_millis(config.heartbeat_interval));
632
633 loop {
634 interval.tick().await;
635
636 if *shutdown.read().await {
637 break;
638 }
639
640 let current_state = state.read().await.clone();
641 if current_state == NodeState::Leader {
642 Self::send_heartbeats(&config).await;
644 }
645 }
646 });
647 }
648
649 fn spawn_compaction_worker(&self) {
651 let log = self.log.clone();
652 let config = self.config.clone();
653 let shutdown = self.shutdown.clone();
654
655 tokio::spawn(async move {
656 let mut interval = interval(Duration::from_secs(60)); loop {
659 interval.tick().await;
660
661 if *shutdown.read().await {
662 break;
663 }
664
665 let mut log_guard = log.write().await;
667 if log_guard.entries.len() > config.compaction.threshold {
668 Self::compact_log(&mut log_guard, &config.compaction).await;
669 }
670 }
671 });
672 }
673
674 fn spawn_snapshot_manager(&self) {
676 let log = self.log.clone();
677 let commit_index = self.commit_index.clone();
678 let config = self.config.clone();
679 let shutdown = self.shutdown.clone();
680
681 tokio::spawn(async move {
682 let mut interval = interval(Duration::from_secs(300)); loop {
685 interval.tick().await;
686
687 if *shutdown.read().await {
688 break;
689 }
690
691 let current_commit = *commit_index.read().await;
692 let log_guard = log.read().await;
693
694 if let Some(last_snapshot) =
696 log_guard.snapshots.values().max_by_key(|s| s.last_index)
697 {
698 if current_commit - last_snapshot.last_index > config.snapshot.interval as u64 {
699 drop(log_guard);
700 Self::create_snapshot(&log, current_commit, &config.snapshot).await;
701 }
702 } else if current_commit > config.snapshot.interval as u64 {
703 drop(log_guard);
704 Self::create_snapshot(&log, current_commit, &config.snapshot).await;
705 }
706 }
707 });
708 }
709
    /// Follower side of RequestVote (Raft §5.2, §5.4.1): adopt a newer term,
    /// then grant the vote only if we have not voted for someone else this
    /// term and the candidate's log is at least as up-to-date as ours.
    ///
    /// NOTE(review): the decision is currently only logged; no VoteResponse
    /// is sent back — transport integration is still a stub.
    async fn handle_vote_request(
        state: &Arc<RwLock<NodeState>>,
        current_term: &Arc<RwLock<u64>>,
        voted_for: &Arc<RwLock<Option<String>>>,
        log: &Arc<RwLock<RaftLog>>,
        node_id: &str,
        request: VoteRequestParams,
    ) {
        // Hold both term and vote locks for the whole decision so term
        // update and vote recording are atomic with respect to each other.
        let mut term = current_term.write().await;
        let mut voted = voted_for.write().await;

        // A newer term resets our vote and demotes us to follower.
        if request.request_term > *term {
            *term = request.request_term;
            *voted = None;
            *state.write().await = NodeState::Follower;
        }

        let vote_granted = if request.request_term < *term
            || (voted.as_ref().is_some_and(|v| v != &request.candidate_id))
        {
            // Stale term, or we already voted for a different candidate.
            false
        } else {
            // Up-to-date check: candidate's last entry must have a higher
            // term, or the same term with an index at least as large.
            let log_guard = log.read().await;
            let our_last_index = log_guard.last_index();
            let our_last_term = log_guard.last_term();
            drop(log_guard);

            request.last_log_term > our_last_term
                || (request.last_log_term == our_last_term
                    && request.last_log_index >= our_last_index)
        };

        if vote_granted {
            *voted = Some(request.candidate_id);
        }

        tracing::info!(
            "Node {} vote response: term={}, granted={}",
            node_id,
            *term,
            vote_granted
        );
    }
758
759 async fn handle_append_entries(
761 state: &Arc<RwLock<NodeState>>,
762 current_term: &Arc<RwLock<u64>>,
763 log: &Arc<RwLock<RaftLog>>,
764 commit_index: &Arc<RwLock<u64>>,
765 request: AppendEntriesParams,
766 ) {
767 let mut term = current_term.write().await;
768
769 if request.request_term > *term {
771 *term = request.request_term;
772 *state.write().await = NodeState::Follower;
773 }
774
775 if request.request_term < *term {
777 return;
778 }
779
780 *state.write().await = NodeState::Follower;
782
783 let mut log_guard = log.write().await;
785 let success = if request.prev_log_index == 0 {
786 true
787 } else if let Some(entry) = log_guard.get(request.prev_log_index) {
788 entry.term == request.prev_log_term
789 } else {
790 false
791 };
792
793 if success {
794 for entry in request.entries {
796 log_guard.append(entry);
797 }
798
799 if request.leader_commit > *commit_index.read().await {
801 let last_index = log_guard.last_index();
802 *commit_index.write().await = request.leader_commit.min(last_index);
803 }
804 }
805 }
806
807 async fn handle_client_request(
809 state: &Arc<RwLock<NodeState>>,
810 current_term: &Arc<RwLock<u64>>,
811 log: &Arc<RwLock<RaftLog>>,
812 leader_state: &Arc<RwLock<Option<LeaderState>>>,
813 request_id: String,
814 entry: LogEntry,
815 response_tx: oneshot::Sender<Result<(), OxirsError>>,
816 ) {
817 let current_state = state.read().await.clone();
818 if current_state != NodeState::Leader {
819 let _ = response_tx.send(Err(OxirsError::Store("Not the leader".to_string())));
820 return;
821 }
822
823 let term = *current_term.read().await;
825 let mut log_guard = log.write().await;
826 let index = log_guard.next_index();
827
828 let raft_entry = RaftLogEntry {
829 index,
830 term,
831 entry,
832 timestamp: std::time::SystemTime::now()
833 .duration_since(std::time::UNIX_EPOCH)
834 .expect("SystemTime should be after UNIX_EPOCH")
835 .as_secs(),
836 };
837
838 log_guard.append(raft_entry);
839 drop(log_guard);
840
841 if let Some(ref mut leader) = *leader_state.write().await {
843 leader.pending_requests.insert(
844 request_id.clone(),
845 PendingRequest {
846 request_id,
847 log_index: index,
848 response_tx,
849 timeout: Instant::now() + Duration::from_secs(5),
850 },
851 );
852 }
853 }
854
    /// Begin a new election (Raft §5.2): become candidate, bump the term,
    /// and vote for ourselves.
    ///
    /// NOTE(review): no RequestVote RPCs are actually sent to peers yet, so
    /// the election can never be won here — transport integration pending.
    async fn start_election(
        state: &Arc<RwLock<NodeState>>,
        current_term: &Arc<RwLock<u64>>,
        voted_for: &Arc<RwLock<Option<String>>>,
        config: &RaftConfig,
        stats: &Arc<RwLock<RaftStats>>,
    ) {
        *state.write().await = NodeState::Candidate;
        // Keep the term guard alive so the logged term matches the one we
        // just incremented.
        let mut term = current_term.write().await;
        *term += 1;
        *voted_for.write().await = Some(config.node_id.clone());

        let mut stats_guard = stats.write().await;
        stats_guard.elections_held += 1;

        tracing::info!(
            "Node {} starting election for term {}",
            config.node_id,
            *term
        );

    }
880
    /// Emit leader heartbeats to all peers.
    ///
    /// NOTE(review): currently only logs; no AppendEntries heartbeats are
    /// actually transmitted — transport integration pending.
    async fn send_heartbeats(config: &RaftConfig) {
        tracing::debug!("Leader {} sending heartbeats", config.node_id);
    }
886
887 async fn compact_log(log: &mut RaftLog, config: &CompactionConfig) {
889 if log.entries.len() <= config.min_entries {
890 return;
891 }
892 let entries_to_compact = log.entries.len() - config.min_entries;
893
894 tracing::info!(
895 "Starting log compaction, compacting {} entries",
896 entries_to_compact
897 );
898
899 let start_index = log.start_index;
901 let end_index = start_index + entries_to_compact as u64;
902
903 log.compaction_state.pending = Some(CompactionJob {
904 start: start_index,
905 end: end_index,
906 start_time: Instant::now(),
907 });
908
909 let mut compacted_data = Vec::new();
911 let mut removed_entries = Vec::new();
912
913 for _ in 0..entries_to_compact {
914 if let Some(entry) = log.entries.pop_front() {
915 let serialized = oxicode::serde::encode_to_vec(&entry, oxicode::config::standard())
917 .expect("serialization should succeed for valid entry");
918 compacted_data.extend_from_slice(&serialized);
919 removed_entries.push(entry);
920 }
921 }
922
923 let compressed = if config.delta_compression {
925 oxiarc_zstd::encode_all(&compacted_data, 3).expect("zstd compression should succeed")
927 } else {
928 compacted_data
929 };
930
931 let compressed_size = compressed.len();
933 let compacted = CompactedData {
934 start_index,
935 end_index,
936 size: compressed_size,
937 data: compressed,
938 base_snapshot: None,
939 };
940
941 log.compacted.insert(start_index, compacted);
942 log.start_index = end_index;
943
944 log.compaction_state.last_compacted = end_index;
946 log.compaction_state.pending = None;
947 log.compaction_state.stats.total_compactions += 1;
948 log.compaction_state.stats.entries_compacted += entries_to_compact as u64;
949 log.compaction_state.stats.space_saved_bytes +=
950 (removed_entries.len() * std::mem::size_of::<RaftLogEntry>()) as u64
951 - compressed_size as u64;
952
953 tracing::info!(
954 "Log compaction completed, saved {} bytes",
955 log.compaction_state.stats.space_saved_bytes
956 );
957 }
958
959 async fn create_snapshot(
961 log: &Arc<RwLock<RaftLog>>,
962 last_index: u64,
963 _config: &SnapshotConfig,
964 ) {
965 tracing::info!("Creating snapshot at index {}", last_index);
966
967 let snapshot_id = uuid::Uuid::new_v4().to_string();
969 let snapshot_info = SnapshotInfo {
970 id: snapshot_id.clone(),
971 last_index,
972 last_term: 0, size: 0, checksum: "dummy".to_string(),
975 };
976
977 let mut log_guard = log.write().await;
978 log_guard.snapshots.insert(snapshot_id, snapshot_info);
979 }
980}
981
982impl RaftLog {
983 fn new() -> Self {
985 RaftLog {
986 entries: VecDeque::new(),
987 compacted: HashMap::new(),
988 snapshots: HashMap::new(),
989 start_index: 1,
990 compaction_state: CompactionState {
991 last_compacted: 0,
992 pending: None,
993 stats: CompactionStats::default(),
994 },
995 }
996 }
997
998 fn get(&self, index: u64) -> Option<&RaftLogEntry> {
1000 if index < self.start_index {
1001 None
1003 } else {
1004 let offset = (index - self.start_index) as usize;
1005 self.entries.get(offset)
1006 }
1007 }
1008
1009 fn append(&mut self, entry: RaftLogEntry) {
1011 self.entries.push_back(entry);
1012 }
1013
1014 fn last_index(&self) -> u64 {
1016 if self.entries.is_empty() {
1017 self.start_index - 1
1018 } else {
1019 self.start_index + self.entries.len() as u64 - 1
1020 }
1021 }
1022
1023 fn last_term(&self) -> u64 {
1025 self.entries.back().map(|e| e.term).unwrap_or(0)
1026 }
1027
1028 fn next_index(&self) -> u64 {
1030 self.last_index() + 1
1031 }
1032}
1033
/// Persistence interface for durable Raft state (term, vote, log entries,
/// and snapshots). Implementations must survive process restarts.
#[async_trait]
pub trait RaftStorage: Send + Sync {
    /// Durably record the current term.
    async fn save_term(&mut self, term: u64) -> Result<(), OxirsError>;

    /// Load the most recently saved term.
    async fn load_term(&self) -> Result<u64, OxirsError>;

    /// Durably record the candidate voted for in the current term.
    async fn save_voted_for(&mut self, voted_for: Option<String>) -> Result<(), OxirsError>;

    /// Load the most recently saved vote.
    async fn load_voted_for(&self) -> Result<Option<String>, OxirsError>;

    /// Durably append log entries.
    async fn append_entries(&mut self, entries: Vec<RaftLogEntry>) -> Result<(), OxirsError>;

    /// Load the entries in the index range `[start, end)`.
    async fn load_entries(&self, start: u64, end: u64) -> Result<Vec<RaftLogEntry>, OxirsError>;

    /// Durably store a snapshot and its metadata.
    async fn save_snapshot(
        &mut self,
        snapshot: SnapshotInfo,
        data: Vec<u8>,
    ) -> Result<(), OxirsError>;

    /// Load a snapshot and its metadata by id.
    async fn load_snapshot(&self, id: &str) -> Result<(SnapshotInfo, Vec<u8>), OxirsError>;
}
1065
#[cfg(test)]
mod tests {
    use super::*;
    use crate::model::{Literal, NamedNode};

    /// A freshly created node starts as a follower at term 0 with nothing
    /// committed.
    #[tokio::test]
    async fn test_raft_node_creation() {
        let config = RaftConfig {
            node_id: "node1".to_string(),
            peers: vec![],
            election_timeout: (150, 300),
            heartbeat_interval: 50,
            compaction: CompactionConfig::default(),
            snapshot: SnapshotConfig::default(),
            storage_path: "/tmp/raft_test".to_string(),
        };

        let node = RaftNode::new(config)
            .await
            .expect("async operation should succeed");

        assert_eq!(*node.state.read().await, NodeState::Follower);
        assert_eq!(*node.current_term.read().await, 0);
        assert_eq!(*node.commit_index.read().await, 0);
    }

    /// Appending sequential entries keeps index/term bookkeeping and random
    /// access consistent.
    #[tokio::test]
    async fn test_log_operations() {
        let mut log = RaftLog::new();

        // Append entries with indices 1..=10, all in term 1.
        for i in 1..=10 {
            let entry = RaftLogEntry {
                index: i,
                term: 1,
                entry: LogEntry::AddTriple(Triple::new(
                    NamedNode::new(format!("http://example.org/s{i}"))
                        .expect("valid IRI from format"),
                    NamedNode::new("http://example.org/p").expect("valid IRI"),
                    crate::model::Object::Literal(Literal::new(format!("value{i}"))),
                )),
                timestamp: i,
            };
            log.append(entry);
        }

        assert_eq!(log.last_index(), 10);
        assert_eq!(log.last_term(), 1);
        assert_eq!(log.entries.len(), 10);

        // Random access by absolute index.
        assert!(log.get(5).is_some());
        assert_eq!(log.get(5).expect("index should be valid").index, 5);
    }

    /// Compaction evicts the oldest entries, advances `start_index`, stores
    /// a compacted blob, and records statistics.
    #[tokio::test]
    async fn test_log_compaction() {
        let mut log = RaftLog::new();

        // Fill the log well past the compaction threshold used below.
        for i in 1..=100 {
            let entry = RaftLogEntry {
                index: i,
                term: 1,
                entry: LogEntry::AddTriple(Triple::new(
                    NamedNode::new(format!("http://example.org/s{i}"))
                        .expect("valid IRI from format"),
                    NamedNode::new("http://example.org/p").expect("valid IRI"),
                    crate::model::Object::Literal(Literal::new(format!("value{i}"))),
                )),
                timestamp: i,
            };
            log.append(entry);
        }

        let config = CompactionConfig {
            auto_compact: true,
            threshold: 50,
            min_entries: 10,
            delta_compression: true,
            batch_size: 10,
        };

        RaftNode::compact_log(&mut log, &config).await;

        // Only the newest `min_entries` remain in memory.
        assert!(log.entries.len() <= config.min_entries);
        assert!(log.start_index > 1);
        assert!(!log.compacted.is_empty());
        assert_eq!(log.compaction_state.stats.total_compactions, 1);
    }
}
1159}