oxirs_core/distributed/raft.rs

//! Raft consensus with optimized log compaction
//!
//! This module implements the Raft consensus algorithm optimized for RDF data,
//! with efficient log compaction and snapshot management.
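//!
//! # Example
//!
//! A minimal, illustrative sketch of standing up a single node (the import
//! path, peer list, and storage path here are placeholders, not a prescribed
//! deployment):
//!
//! ```ignore
//! use oxirs_core::distributed::raft::{
//!     CompactionConfig, RaftConfig, RaftNode, SnapshotConfig,
//! };
//!
//! async fn demo() -> Result<(), oxirs_core::OxirsError> {
//!     let config = RaftConfig {
//!         node_id: "node1".to_string(),
//!         peers: vec![],
//!         election_timeout: (150, 300),
//!         heartbeat_interval: 50,
//!         compaction: CompactionConfig::default(),
//!         snapshot: SnapshotConfig::default(),
//!         storage_path: "/tmp/raft".to_string(),
//!     };
//!     let node = RaftNode::new(config).await?;
//!     node.start().await?;
//!     Ok(())
//! }
//! ```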

#![allow(dead_code)]

use crate::model::{Triple, TriplePattern};
use crate::OxirsError;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet, VecDeque};
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{mpsc, oneshot, Mutex, RwLock};
use tokio::time::interval;

/// Raft configuration
#[derive(Debug, Clone)]
pub struct RaftConfig {
    /// Node ID
    pub node_id: String,
    /// Cluster peers
    pub peers: Vec<RaftPeer>,
    /// Election timeout range (ms)
    pub election_timeout: (u64, u64),
    /// Heartbeat interval (ms)
    pub heartbeat_interval: u64,
    /// Log compaction configuration
    pub compaction: CompactionConfig,
    /// Snapshot configuration
    pub snapshot: SnapshotConfig,
    /// Storage path
    pub storage_path: String,
}

/// Raft peer information
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RaftPeer {
    /// Peer ID
    pub id: String,
    /// Peer address
    pub address: SocketAddr,
    /// Voting member
    pub voting: bool,
}

/// Log compaction configuration
#[derive(Debug, Clone)]
pub struct CompactionConfig {
    /// Enable automatic compaction
    pub auto_compact: bool,
    /// Compaction threshold (number of entries)
    pub threshold: usize,
    /// Minimum entries to keep
    pub min_entries: usize,
    /// Delta compression for similar entries
    pub delta_compression: bool,
    /// Batch size for compaction
    pub batch_size: usize,
}

impl Default for CompactionConfig {
    fn default() -> Self {
        CompactionConfig {
            auto_compact: true,
            threshold: 10000,
            min_entries: 1000,
            delta_compression: true,
            batch_size: 1000,
        }
    }
}

/// Vote request parameters
#[derive(Debug, Clone)]
struct VoteRequestParams {
    pub request_term: u64,
    pub candidate_id: String,
    pub last_log_index: u64,
    pub last_log_term: u64,
}

/// Append entries request parameters
#[derive(Debug, Clone)]
struct AppendEntriesParams {
    pub request_term: u64,
    pub leader_id: String,
    pub prev_log_index: u64,
    pub prev_log_term: u64,
    pub entries: Vec<RaftLogEntry>,
    pub leader_commit: u64,
}

/// Snapshot configuration
#[derive(Debug, Clone)]
pub struct SnapshotConfig {
    /// Enable automatic snapshots
    pub auto_snapshot: bool,
    /// Snapshot interval (entries)
    pub interval: usize,
    /// Incremental snapshots
    pub incremental: bool,
    /// Compression for snapshots
    pub compression: bool,
    /// Maximum concurrent snapshots
    pub max_concurrent: usize,
}

impl Default for SnapshotConfig {
    fn default() -> Self {
        SnapshotConfig {
            auto_snapshot: true,
            interval: 50000,
            incremental: true,
            compression: true,
            max_concurrent: 2,
        }
    }
}

/// Raft node states
#[derive(Debug, Clone, PartialEq)]
pub enum NodeState {
    Follower,
    Candidate,
    Leader,
    Learner,
}
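
// `Learner` is a non-voting member: it receives replicated log entries but
// does not count toward election quorum until promoted via
// `ConfigChangeType::PromoteToVoter` below.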

/// Log entry types
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum LogEntry {
    /// Add triple
    AddTriple(Triple),
    /// Remove triple
    RemoveTriple(Triple),
    /// Batch add
    BatchAdd(Vec<Triple>),
    /// Batch remove
    BatchRemove(Vec<Triple>),
    /// Configuration change
    ConfigChange(ConfigChangeEntry),
    /// Snapshot marker
    SnapshotMarker(SnapshotInfo),
    /// Compacted entry
    CompactedEntry(CompactedData),
}

/// Configuration change entry
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConfigChangeEntry {
    /// Change type
    pub change_type: ConfigChangeType,
    /// Peer info
    pub peer: RaftPeer,
}

/// Configuration change types
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ConfigChangeType {
    AddNode,
    RemoveNode,
    PromoteToVoter,
    DemoteToLearner,
}

/// Snapshot information
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SnapshotInfo {
    /// Snapshot ID
    pub id: String,
    /// Index of last included entry
    pub last_index: u64,
    /// Term of last included entry
    pub last_term: u64,
    /// Snapshot size
    pub size: usize,
    /// Checksum
    pub checksum: String,
}

/// Compacted log data
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompactedData {
    /// Start index
    pub start_index: u64,
    /// End index
    pub end_index: u64,
    /// Compacted size
    pub size: usize,
    /// Delta-compressed data
    pub data: Vec<u8>,
    /// Reference snapshot
    pub base_snapshot: Option<String>,
}

/// Raft message types
#[derive(Debug)]
pub enum RaftMessage {
    /// Request vote
    VoteRequest {
        term: u64,
        candidate_id: String,
        last_log_index: u64,
        last_log_term: u64,
    },
    /// Vote response
    VoteResponse { term: u64, vote_granted: bool },
    /// Append entries (heartbeat/replication)
    AppendEntries {
        term: u64,
        leader_id: String,
        prev_log_index: u64,
        prev_log_term: u64,
        entries: Vec<RaftLogEntry>,
        leader_commit: u64,
    },
    /// Append entries response
    AppendResponse {
        term: u64,
        success: bool,
        match_index: u64,
        conflict_term: Option<u64>,
        conflict_index: Option<u64>,
    },
    /// Install snapshot
    InstallSnapshot {
        term: u64,
        leader_id: String,
        last_included_index: u64,
        last_included_term: u64,
        offset: u64,
        data: Vec<u8>,
        done: bool,
    },
    /// Client request
    ClientRequest {
        request_id: String,
        entry: LogEntry,
        response_tx: oneshot::Sender<Result<(), OxirsError>>,
    },
}
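
// Note: the `conflict_term` and `conflict_index` fields of `AppendResponse`
// carry the accelerated log backtracking hint from the Raft paper (section
// 5.3): on a consistency-check failure the follower reports the conflicting
// term and the first index it holds for that term, letting the leader rewind
// `next_index` a whole term at a time instead of one entry per round trip.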

/// Raft log entry with metadata
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RaftLogEntry {
    /// Entry index
    pub index: u64,
    /// Entry term
    pub term: u64,
    /// Entry data
    pub entry: LogEntry,
    /// Timestamp
    pub timestamp: u64,
}

/// Raft node implementation
pub struct RaftNode {
    /// Configuration
    config: RaftConfig,
    /// Current state
    state: Arc<RwLock<NodeState>>,
    /// Current term
    current_term: Arc<RwLock<u64>>,
    /// Voted for
    voted_for: Arc<RwLock<Option<String>>>,
    /// Log entries
    log: Arc<RwLock<RaftLog>>,
    /// Commit index
    commit_index: Arc<RwLock<u64>>,
    /// Last applied
    last_applied: Arc<RwLock<u64>>,
    /// Leader state
    leader_state: Arc<RwLock<Option<LeaderState>>>,
    /// Message channels
    message_tx: mpsc::Sender<RaftMessage>,
    message_rx: Arc<Mutex<mpsc::Receiver<RaftMessage>>>,
    /// Shutdown signal
    shutdown: Arc<RwLock<bool>>,
    /// Statistics
    stats: Arc<RwLock<RaftStats>>,
}

/// Raft log with optimized compaction
struct RaftLog {
    /// Log entries
    entries: VecDeque<RaftLogEntry>,
    /// Compacted entries
    compacted: HashMap<u64, CompactedData>,
    /// Snapshot metadata
    snapshots: HashMap<String, SnapshotInfo>,
    /// Start index (after compaction)
    start_index: u64,
    /// Compaction state
    compaction_state: CompactionState,
}

/// Compaction state
struct CompactionState {
    /// Last compaction index
    last_compacted: u64,
    /// Pending compaction
    pending: Option<CompactionJob>,
    /// Compaction statistics
    stats: CompactionStats,
}

/// Compaction job
#[allow(dead_code)]
struct CompactionJob {
    /// Start index
    start: u64,
    /// End index
    end: u64,
    /// Start time
    start_time: Instant,
}

/// Compaction statistics
#[derive(Debug, Default)]
struct CompactionStats {
    /// Total compactions
    total_compactions: u64,
    /// Entries compacted
    entries_compacted: u64,
    /// Space saved
    space_saved_bytes: u64,
    /// Compression ratio
    compression_ratio: f64,
}

/// Leader-specific state
struct LeaderState {
    /// Next index for each peer
    next_index: HashMap<String, u64>,
    /// Match index for each peer
    match_index: HashMap<String, u64>,
    /// Replication progress
    replication_progress: HashMap<String, ReplicationProgress>,
    /// Pending client requests
    pending_requests: HashMap<String, PendingRequest>,
}

/// Replication progress tracking
struct ReplicationProgress {
    /// Last sent time
    last_sent: Instant,
    /// Consecutive failures
    failures: u32,
    /// In-flight entries
    in_flight: u64,
    /// Bandwidth estimate
    bandwidth_bps: f64,
}

/// Pending client request
struct PendingRequest {
    /// Request ID
    request_id: String,
    /// Log index
    log_index: u64,
    /// Response channel
    response_tx: oneshot::Sender<Result<(), OxirsError>>,
    /// Timeout
    timeout: Instant,
}

/// Raft statistics
#[derive(Debug, Default)]
struct RaftStats {
    /// Elections held
    elections_held: u64,
    /// Elections won
    elections_won: u64,
    /// Messages sent
    messages_sent: u64,
    /// Messages received
    messages_received: u64,
    /// Entries replicated
    entries_replicated: u64,
    /// Snapshots sent
    snapshots_sent: u64,
    /// Snapshots received
    snapshots_received: u64,
}

impl RaftNode {
    /// Create new Raft node
    pub async fn new(config: RaftConfig) -> Result<Self, OxirsError> {
        let (message_tx, message_rx) = mpsc::channel(10000);

        Ok(RaftNode {
            config,
            state: Arc::new(RwLock::new(NodeState::Follower)),
            current_term: Arc::new(RwLock::new(0)),
            voted_for: Arc::new(RwLock::new(None)),
            log: Arc::new(RwLock::new(RaftLog::new())),
            commit_index: Arc::new(RwLock::new(0)),
            last_applied: Arc::new(RwLock::new(0)),
            leader_state: Arc::new(RwLock::new(None)),
            message_tx,
            message_rx: Arc::new(Mutex::new(message_rx)),
            shutdown: Arc::new(RwLock::new(false)),
            stats: Arc::new(RwLock::new(RaftStats::default())),
        })
    }

    /// Start Raft node
    pub async fn start(&self) -> Result<(), OxirsError> {
        // Start message processing
        self.spawn_message_processor();

        // Start election timer
        self.spawn_election_timer();

        // Start heartbeat timer (for leaders)
        self.spawn_heartbeat_timer();

        // Start log compaction
        if self.config.compaction.auto_compact {
            self.spawn_compaction_worker();
        }

        // Start snapshot manager
        if self.config.snapshot.auto_snapshot {
            self.spawn_snapshot_manager();
        }

        Ok(())
    }

    /// Submit a client request
    pub async fn submit(&self, entry: LogEntry) -> Result<(), OxirsError> {
        // Scope the state read so the lock is released before we await the
        // response; holding a lock guard across a long await is a hazard.
        {
            let state = self.state.read().await;
            if *state != NodeState::Leader {
                return Err(OxirsError::Store("Not the leader".to_string()));
            }
        }

        let request_id = uuid::Uuid::new_v4().to_string();
        let (response_tx, response_rx) = oneshot::channel();

        self.message_tx
            .send(RaftMessage::ClientRequest {
                request_id,
                entry,
                response_tx,
            })
            .await
            .map_err(|_| OxirsError::Store("Failed to send request".to_string()))?;

        response_rx
            .await
            .map_err(|_| OxirsError::Store("Request cancelled".to_string()))?
    }

    /// Query committed data by replaying the log.
    ///
    /// Note: this rebuilds the triple set from the in-memory log on every
    /// call and does not consult compacted segments or snapshots.
    pub async fn query(&self, pattern: &TriplePattern) -> Result<Vec<Triple>, OxirsError> {
        let log = self.log.read().await;
        let last_applied = *self.last_applied.read().await;

        let mut results = Vec::new();
        let mut current_state = HashSet::new();

        // Apply log entries up to last_applied
        for entry in &log.entries {
            if entry.index > last_applied {
                break;
            }

            match &entry.entry {
                LogEntry::AddTriple(triple) => {
                    current_state.insert(triple.clone());
                }
                LogEntry::RemoveTriple(triple) => {
                    current_state.remove(triple);
                }
                LogEntry::BatchAdd(triples) => {
                    for triple in triples {
                        current_state.insert(triple.clone());
                    }
                }
                LogEntry::BatchRemove(triples) => {
                    for triple in triples {
                        current_state.remove(triple);
                    }
                }
                _ => {}
            }
        }

        // Filter by pattern
        for triple in current_state {
            if pattern.matches(&triple) {
                results.push(triple);
            }
        }

        Ok(results)
    }

    /// Spawn message processor
    fn spawn_message_processor(&self) {
        let message_rx = self.message_rx.clone();
        let state = self.state.clone();
        let current_term = self.current_term.clone();
        let voted_for = self.voted_for.clone();
        let log = self.log.clone();
        let commit_index = self.commit_index.clone();
        let leader_state = self.leader_state.clone();
        let stats = self.stats.clone();
        let node_id = self.config.node_id.clone();

        tokio::spawn(async move {
            let mut rx = message_rx.lock().await;
            while let Some(message) = rx.recv().await {
                let mut stats_guard = stats.write().await;
                stats_guard.messages_received += 1;
                drop(stats_guard);

                match message {
                    RaftMessage::VoteRequest {
                        term,
                        candidate_id,
                        last_log_index,
                        last_log_term,
                    } => {
                        Self::handle_vote_request(
                            &state,
                            &current_term,
                            &voted_for,
                            &log,
                            &node_id,
                            VoteRequestParams {
                                request_term: term,
                                candidate_id,
                                last_log_index,
                                last_log_term,
                            },
                        )
                        .await;
                    }
                    RaftMessage::AppendEntries {
                        term,
                        leader_id,
                        prev_log_index,
                        prev_log_term,
                        entries,
                        leader_commit,
                    } => {
                        Self::handle_append_entries(
                            &state,
                            &current_term,
                            &log,
                            &commit_index,
                            AppendEntriesParams {
                                request_term: term,
                                leader_id,
                                prev_log_index,
                                prev_log_term,
                                entries,
                                leader_commit,
                            },
                        )
                        .await;
                    }
                    RaftMessage::ClientRequest {
                        request_id,
                        entry,
                        response_tx,
                    } => {
                        Self::handle_client_request(
                            &state,
                            &current_term,
                            &log,
                            &leader_state,
                            request_id,
                            entry,
                            response_tx,
                        )
                        .await;
                    }
                    _ => {}
                }
            }
        });
    }

    /// Spawn election timer
    fn spawn_election_timer(&self) {
        let state = self.state.clone();
        let current_term = self.current_term.clone();
        let voted_for = self.voted_for.clone();
        let config = self.config.clone();
        let shutdown = self.shutdown.clone();
        let stats = self.stats.clone();

        tokio::spawn(async move {
            #[allow(unused_imports)]
            use scirs2_core::random::{Random, Rng};

            loop {
                // Random election timeout
                let timeout = {
                    let mut random = Random::default();
                    random.gen_range(config.election_timeout.0..config.election_timeout.1)
                };
                tokio::time::sleep(Duration::from_millis(timeout)).await;

                if *shutdown.read().await {
                    break;
                }

                let current_state = state.read().await.clone();
                if current_state == NodeState::Follower || current_state == NodeState::Candidate {
                    // Start election
                    Self::start_election(&state, &current_term, &voted_for, &config, &stats).await;
                }
            }
        });
    }

    /// Spawn heartbeat timer
    fn spawn_heartbeat_timer(&self) {
        let state = self.state.clone();
        let config = self.config.clone();
        let shutdown = self.shutdown.clone();

        tokio::spawn(async move {
            let mut interval = interval(Duration::from_millis(config.heartbeat_interval));

            loop {
                interval.tick().await;

                if *shutdown.read().await {
                    break;
                }

                let current_state = state.read().await.clone();
                if current_state == NodeState::Leader {
                    // Send heartbeats
                    Self::send_heartbeats(&config).await;
                }
            }
        });
    }

    /// Spawn compaction worker
    fn spawn_compaction_worker(&self) {
        let log = self.log.clone();
        let config = self.config.clone();
        let shutdown = self.shutdown.clone();

        tokio::spawn(async move {
            let mut interval = interval(Duration::from_secs(60)); // Check every minute

            loop {
                interval.tick().await;

                if *shutdown.read().await {
                    break;
                }

                // Check if compaction needed
                let mut log_guard = log.write().await;
                if log_guard.entries.len() > config.compaction.threshold {
                    Self::compact_log(&mut log_guard, &config.compaction).await;
                }
            }
        });
    }

    /// Spawn snapshot manager
    fn spawn_snapshot_manager(&self) {
        let log = self.log.clone();
        let commit_index = self.commit_index.clone();
        let config = self.config.clone();
        let shutdown = self.shutdown.clone();

        tokio::spawn(async move {
            let mut interval = interval(Duration::from_secs(300)); // Check every 5 minutes

            loop {
                interval.tick().await;

                if *shutdown.read().await {
                    break;
                }

                let current_commit = *commit_index.read().await;
                let log_guard = log.read().await;

                // Check if snapshot needed
                if let Some(last_snapshot) =
                    log_guard.snapshots.values().max_by_key(|s| s.last_index)
                {
                    if current_commit - last_snapshot.last_index > config.snapshot.interval as u64 {
                        drop(log_guard);
                        Self::create_snapshot(&log, current_commit, &config.snapshot).await;
                    }
                } else if current_commit > config.snapshot.interval as u64 {
                    drop(log_guard);
                    Self::create_snapshot(&log, current_commit, &config.snapshot).await;
                }
            }
        });
    }

    /// Handle vote request
    async fn handle_vote_request(
        state: &Arc<RwLock<NodeState>>,
        current_term: &Arc<RwLock<u64>>,
        voted_for: &Arc<RwLock<Option<String>>>,
        log: &Arc<RwLock<RaftLog>>,
        node_id: &str,
        request: VoteRequestParams,
    ) {
        let mut term = current_term.write().await;
        let mut voted = voted_for.write().await;

        // Update term if needed
        if request.request_term > *term {
            *term = request.request_term;
            *voted = None;
            *state.write().await = NodeState::Follower;
        }

        // Check if we can vote
        let vote_granted = if request.request_term < *term
            || (voted.as_ref().is_some_and(|v| v != &request.candidate_id))
        {
            false
        } else {
            // Check log up-to-date
            let log_guard = log.read().await;
            let our_last_index = log_guard.last_index();
            let our_last_term = log_guard.last_term();
            drop(log_guard);

            request.last_log_term > our_last_term
                || (request.last_log_term == our_last_term
                    && request.last_log_index >= our_last_index)
        };

        if vote_granted {
            *voted = Some(request.candidate_id);
        }

        // Send response (would use actual networking)
        tracing::info!(
            "Node {} vote response: term={}, granted={}",
            node_id,
            *term,
            vote_granted
        );
    }

    /// Handle append entries
    async fn handle_append_entries(
        state: &Arc<RwLock<NodeState>>,
        current_term: &Arc<RwLock<u64>>,
        log: &Arc<RwLock<RaftLog>>,
        commit_index: &Arc<RwLock<u64>>,
        request: AppendEntriesParams,
    ) {
        let mut term = current_term.write().await;

        // Update term if needed
        if request.request_term > *term {
            *term = request.request_term;
        }

        // Reject if term is old
        if request.request_term < *term {
            return;
        }

        // Reset to follower (covers both the heartbeat and new-term cases)
        *state.write().await = NodeState::Follower;

        // Check log consistency
        let mut log_guard = log.write().await;
        let success = if request.prev_log_index == 0 {
            true
        } else if let Some(entry) = log_guard.get(request.prev_log_index) {
            entry.term == request.prev_log_term
        } else {
            false
        };

        if success {
            // Append entries. Simplified: a full implementation would first
            // truncate any conflicting suffix (Raft paper, section 5.3).
            for entry in request.entries {
                log_guard.append(entry);
            }

            // Update commit index under a single write lock to avoid a
            // read-then-write race
            let mut commit = commit_index.write().await;
            if request.leader_commit > *commit {
                let last_index = log_guard.last_index();
                *commit = request.leader_commit.min(last_index);
            }
        }
    }

    /// Handle client request
    async fn handle_client_request(
        state: &Arc<RwLock<NodeState>>,
        current_term: &Arc<RwLock<u64>>,
        log: &Arc<RwLock<RaftLog>>,
        leader_state: &Arc<RwLock<Option<LeaderState>>>,
        request_id: String,
        entry: LogEntry,
        response_tx: oneshot::Sender<Result<(), OxirsError>>,
    ) {
        let current_state = state.read().await.clone();
        if current_state != NodeState::Leader {
            let _ = response_tx.send(Err(OxirsError::Store("Not the leader".to_string())));
            return;
        }

        // Append to log
        let term = *current_term.read().await;
        let mut log_guard = log.write().await;
        let index = log_guard.next_index();

        let raft_entry = RaftLogEntry {
            index,
            term,
            entry,
            timestamp: std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .expect("SystemTime should be after UNIX_EPOCH")
                .as_secs(),
        };

        log_guard.append(raft_entry);
        drop(log_guard);

        // Track pending request
        if let Some(ref mut leader) = *leader_state.write().await {
            leader.pending_requests.insert(
                request_id.clone(),
                PendingRequest {
                    request_id,
                    log_index: index,
                    response_tx,
                    timeout: Instant::now() + Duration::from_secs(5),
                },
            );
        } else {
            // Fail fast instead of silently dropping the response channel,
            // which would surface to the client as "Request cancelled"
            let _ = response_tx.send(Err(OxirsError::Store(
                "Leader state not initialized".to_string(),
            )));
        }
    }

    /// Start election
    async fn start_election(
        state: &Arc<RwLock<NodeState>>,
        current_term: &Arc<RwLock<u64>>,
        voted_for: &Arc<RwLock<Option<String>>>,
        config: &RaftConfig,
        stats: &Arc<RwLock<RaftStats>>,
    ) {
        *state.write().await = NodeState::Candidate;
        let mut term = current_term.write().await;
        *term += 1;
        *voted_for.write().await = Some(config.node_id.clone());

        let mut stats_guard = stats.write().await;
        stats_guard.elections_held += 1;

        tracing::info!(
            "Node {} starting election for term {}",
            config.node_id,
            *term
        );

        // In real implementation, would send vote requests to all peers
        // and handle responses
    }

    /// Send heartbeats
    async fn send_heartbeats(config: &RaftConfig) {
        // In real implementation, would send heartbeat messages to all followers
        tracing::debug!("Leader {} sending heartbeats", config.node_id);
    }

    /// Compact log
    async fn compact_log(log: &mut RaftLog, config: &CompactionConfig) {
        if log.entries.len() <= config.min_entries {
            return;
        }
        let entries_to_compact = log.entries.len() - config.min_entries;

        tracing::info!(
            "Starting log compaction, compacting {} entries",
            entries_to_compact
        );

        // Create compaction job
        let start_index = log.start_index;
        let end_index = start_index + entries_to_compact as u64;

        log.compaction_state.pending = Some(CompactionJob {
            start: start_index,
            end: end_index,
            start_time: Instant::now(),
        });

        // Perform compaction (simplified)
        let mut compacted_data = Vec::new();
        let mut removed_entries = Vec::new();

        for _ in 0..entries_to_compact {
            if let Some(entry) = log.entries.pop_front() {
                // Serialize entry for compaction
                let serialized = oxicode::serde::encode_to_vec(&entry, oxicode::config::standard())
                    .expect("serialization should succeed for valid entry");
                compacted_data.extend_from_slice(&serialized);
                removed_entries.push(entry);
            }
        }

        // Apply compression
        let raw_size = compacted_data.len();
        let compressed = if config.delta_compression {
            // Delta compression would go here
            oxiarc_zstd::encode_all(&compacted_data, 3).expect("zstd compression should succeed")
        } else {
            compacted_data
        };

        // Store compacted data
        let compressed_size = compressed.len();
        let compacted = CompactedData {
            start_index,
            end_index,
            size: compressed_size,
            data: compressed,
            base_snapshot: None,
        };

        log.compacted.insert(start_index, compacted);
        log.start_index = end_index;

        // Update compaction state; saturate so a compressed blob larger than
        // the in-memory estimate cannot underflow the counter
        log.compaction_state.last_compacted = end_index;
        log.compaction_state.pending = None;
        log.compaction_state.stats.total_compactions += 1;
        log.compaction_state.stats.entries_compacted += entries_to_compact as u64;
        log.compaction_state.stats.space_saved_bytes +=
            ((removed_entries.len() * std::mem::size_of::<RaftLogEntry>()) as u64)
                .saturating_sub(compressed_size as u64);
        if raw_size > 0 {
            log.compaction_state.stats.compression_ratio =
                compressed_size as f64 / raw_size as f64;
        }

        tracing::info!(
            "Log compaction completed, saved {} bytes",
            log.compaction_state.stats.space_saved_bytes
        );
    }

    /// Create snapshot
    async fn create_snapshot(
        log: &Arc<RwLock<RaftLog>>,
        last_index: u64,
        _config: &SnapshotConfig,
    ) {
        tracing::info!("Creating snapshot at index {}", last_index);

        // In real implementation, would create actual snapshot
        let snapshot_id = uuid::Uuid::new_v4().to_string();
        let snapshot_info = SnapshotInfo {
            id: snapshot_id.clone(),
            last_index,
            last_term: 0, // Would get from log
            size: 0,      // Would calculate
            checksum: "dummy".to_string(),
        };

        let mut log_guard = log.write().await;
        log_guard.snapshots.insert(snapshot_id, snapshot_info);
    }
}

impl RaftLog {
    /// Create new log
    fn new() -> Self {
        RaftLog {
            entries: VecDeque::new(),
            compacted: HashMap::new(),
            snapshots: HashMap::new(),
            start_index: 1,
            compaction_state: CompactionState {
                last_compacted: 0,
                pending: None,
                stats: CompactionStats::default(),
            },
        }
    }

    /// Get entry at index
    fn get(&self, index: u64) -> Option<&RaftLogEntry> {
        if index < self.start_index {
            // Entry is compacted
            None
        } else {
            let offset = (index - self.start_index) as usize;
            self.entries.get(offset)
        }
    }

    /// Append entry
    fn append(&mut self, entry: RaftLogEntry) {
        self.entries.push_back(entry);
    }

    /// Get last index
    fn last_index(&self) -> u64 {
        if self.entries.is_empty() {
            self.start_index - 1
        } else {
            self.start_index + self.entries.len() as u64 - 1
        }
    }

    /// Get last term
    fn last_term(&self) -> u64 {
        self.entries.back().map(|e| e.term).unwrap_or(0)
    }

    /// Get next index
    fn next_index(&self) -> u64 {
        self.last_index() + 1
    }
}
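
// Worked example of the index arithmetic above: with `start_index == 101`
// (entries 1..=100 compacted away) and 50 live entries, `last_index()` is
// 150 and `get(120)` reads `entries[19]`, while `get(100)` returns `None`
// because that entry now lives only in a compacted blob.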

/// Raft storage trait for persistence
#[async_trait]
pub trait RaftStorage: Send + Sync {
    /// Save current term
    async fn save_term(&mut self, term: u64) -> Result<(), OxirsError>;

    /// Load current term
    async fn load_term(&self) -> Result<u64, OxirsError>;

    /// Save voted for
    async fn save_voted_for(&mut self, voted_for: Option<String>) -> Result<(), OxirsError>;

    /// Load voted for
    async fn load_voted_for(&self) -> Result<Option<String>, OxirsError>;

    /// Append log entries
    async fn append_entries(&mut self, entries: Vec<RaftLogEntry>) -> Result<(), OxirsError>;

    /// Load log entries
    async fn load_entries(&self, start: u64, end: u64) -> Result<Vec<RaftLogEntry>, OxirsError>;

    /// Save snapshot
    async fn save_snapshot(
        &mut self,
        snapshot: SnapshotInfo,
        data: Vec<u8>,
    ) -> Result<(), OxirsError>;

    /// Load snapshot
    async fn load_snapshot(&self, id: &str) -> Result<(SnapshotInfo, Vec<u8>), OxirsError>;
}
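
// Illustrative in-memory `RaftStorage` backend. This is a sketch for tests
// and documentation only: the struct name and field layout are assumptions,
// not part of the crate's public API, and `load_entries` is assumed to take
// a half-open index range (`start..end`). A production backend would persist
// to `RaftConfig::storage_path`.
#[derive(Default)]
pub struct InMemoryRaftStorage {
    term: u64,
    voted_for: Option<String>,
    entries: Vec<RaftLogEntry>,
    snapshots: HashMap<String, (SnapshotInfo, Vec<u8>)>,
}

#[async_trait]
impl RaftStorage for InMemoryRaftStorage {
    async fn save_term(&mut self, term: u64) -> Result<(), OxirsError> {
        self.term = term;
        Ok(())
    }

    async fn load_term(&self) -> Result<u64, OxirsError> {
        Ok(self.term)
    }

    async fn save_voted_for(&mut self, voted_for: Option<String>) -> Result<(), OxirsError> {
        self.voted_for = voted_for;
        Ok(())
    }

    async fn load_voted_for(&self) -> Result<Option<String>, OxirsError> {
        Ok(self.voted_for.clone())
    }

    async fn append_entries(&mut self, entries: Vec<RaftLogEntry>) -> Result<(), OxirsError> {
        self.entries.extend(entries);
        Ok(())
    }

    async fn load_entries(&self, start: u64, end: u64) -> Result<Vec<RaftLogEntry>, OxirsError> {
        Ok(self
            .entries
            .iter()
            .filter(|e| e.index >= start && e.index < end)
            .cloned()
            .collect())
    }

    async fn save_snapshot(
        &mut self,
        snapshot: SnapshotInfo,
        data: Vec<u8>,
    ) -> Result<(), OxirsError> {
        self.snapshots.insert(snapshot.id.clone(), (snapshot, data));
        Ok(())
    }

    async fn load_snapshot(&self, id: &str) -> Result<(SnapshotInfo, Vec<u8>), OxirsError> {
        self.snapshots
            .get(id)
            .cloned()
            .ok_or_else(|| OxirsError::Store(format!("snapshot {id} not found")))
    }
}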

#[cfg(test)]
mod tests {
    use super::*;
    use crate::model::{Literal, NamedNode};

    #[tokio::test]
    async fn test_raft_node_creation() {
        let config = RaftConfig {
            node_id: "node1".to_string(),
            peers: vec![],
            election_timeout: (150, 300),
            heartbeat_interval: 50,
            compaction: CompactionConfig::default(),
            snapshot: SnapshotConfig::default(),
            storage_path: "/tmp/raft_test".to_string(),
        };

        let node = RaftNode::new(config)
            .await
            .expect("async operation should succeed");

        // Check initial state
        assert_eq!(*node.state.read().await, NodeState::Follower);
        assert_eq!(*node.current_term.read().await, 0);
        assert_eq!(*node.commit_index.read().await, 0);
    }

    #[tokio::test]
    async fn test_log_operations() {
        let mut log = RaftLog::new();

        // Add entries
        for i in 1..=10 {
            let entry = RaftLogEntry {
                index: i,
                term: 1,
                entry: LogEntry::AddTriple(Triple::new(
                    NamedNode::new(format!("http://example.org/s{i}"))
                        .expect("valid IRI from format"),
                    NamedNode::new("http://example.org/p").expect("valid IRI"),
                    crate::model::Object::Literal(Literal::new(format!("value{i}"))),
                )),
                timestamp: i,
            };
            log.append(entry);
        }

        assert_eq!(log.last_index(), 10);
        assert_eq!(log.last_term(), 1);
        assert_eq!(log.entries.len(), 10);

        // Test get
        assert!(log.get(5).is_some());
        assert_eq!(log.get(5).expect("index should be valid").index, 5);
    }

    #[tokio::test]
    async fn test_log_compaction() {
        let mut log = RaftLog::new();

        // Add many entries
        for i in 1..=100 {
            let entry = RaftLogEntry {
                index: i,
                term: 1,
                entry: LogEntry::AddTriple(Triple::new(
                    NamedNode::new(format!("http://example.org/s{i}"))
                        .expect("valid IRI from format"),
                    NamedNode::new("http://example.org/p").expect("valid IRI"),
                    crate::model::Object::Literal(Literal::new(format!("value{i}"))),
                )),
                timestamp: i,
            };
            log.append(entry);
        }

        let config = CompactionConfig {
            auto_compact: true,
            threshold: 50,
            min_entries: 10,
            delta_compression: true,
            batch_size: 10,
        };

        // Compact log
        RaftNode::compact_log(&mut log, &config).await;

        // Check compaction
        assert!(log.entries.len() <= config.min_entries);
        assert!(log.start_index > 1);
        assert!(!log.compacted.is_empty());
        assert_eq!(log.compaction_state.stats.total_compactions, 1);
    }
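
    /// A small additional sketch (same setup as the test above, smaller
    /// numbers): after compaction, indices below `start_index` are only
    /// reachable through the compacted blobs, so `get` returns `None`.
    #[tokio::test]
    async fn test_get_after_compaction() {
        let mut log = RaftLog::new();

        for i in 1..=20 {
            let entry = RaftLogEntry {
                index: i,
                term: 1,
                entry: LogEntry::AddTriple(Triple::new(
                    NamedNode::new(format!("http://example.org/s{i}"))
                        .expect("valid IRI from format"),
                    NamedNode::new("http://example.org/p").expect("valid IRI"),
                    crate::model::Object::Literal(Literal::new(format!("value{i}"))),
                )),
                timestamp: i,
            };
            log.append(entry);
        }

        let config = CompactionConfig {
            threshold: 10,
            min_entries: 5,
            ..CompactionConfig::default()
        };

        RaftNode::compact_log(&mut log, &config).await;

        // Entries 1..=15 were compacted; the live window starts at 16.
        assert_eq!(log.start_index, 16);
        assert!(log.get(1).is_none());
        assert!(log.get(15).is_none());
        assert_eq!(log.get(16).expect("live entry").index, 16);
        assert_eq!(log.last_index(), 20);
    }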
}