// aegis_replication/engine.rs
1//! Aegis Replication Engine
2//!
3//! Main replication engine coordinating Raft consensus and cluster management.
4//!
5//! @version 0.1.0
6//! @author AutomataNexus Development Team
7
8use crate::cluster::{Cluster, ClusterConfig, ClusterError, ClusterState, ClusterStats};
9use crate::node::{NodeHealth, NodeId, NodeInfo, NodeRole};
10use crate::raft::{AppendEntriesRequest, RaftConfig, RaftNode, VoteRequest, VoteResponse};
11use crate::state::{Command, CommandResult};
12use crate::transport::{Message, MessagePayload, Transport};
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15
16// =============================================================================
17// Replication Engine Configuration
18// =============================================================================
19
/// Configuration for the replication engine.
///
/// Bundles the Raft consensus settings and the cluster-membership settings
/// together with engine-level tuning knobs.
#[derive(Debug, Clone)]
pub struct ReplicationConfig {
    /// Raft consensus parameters (defined in `crate::raft`).
    pub raft: RaftConfig,
    /// Cluster membership / failure-detection parameters (defined in `crate::cluster`).
    pub cluster: ClusterConfig,
    /// Minimum interval between event-loop ticks; `ReplicationEngine::tick`
    /// returns early (a no-op) when called again sooner than this.
    pub tick_interval: Duration,
    /// Maximum number of committed entries to apply per batch.
    /// NOTE(review): not referenced anywhere in this file — presumably
    /// consumed by the raft/state layer; confirm before relying on it.
    pub apply_batch_size: usize,
}
28
29impl Default for ReplicationConfig {
30    fn default() -> Self {
31        Self {
32            raft: RaftConfig::default(),
33            cluster: ClusterConfig::default(),
34            tick_interval: Duration::from_millis(10),
35            apply_batch_size: 100,
36        }
37    }
38}
39
40impl ReplicationConfig {
41    pub fn new() -> Self {
42        Self::default()
43    }
44
45    pub fn with_raft(mut self, raft: RaftConfig) -> Self {
46        self.raft = raft;
47        self
48    }
49
50    pub fn with_cluster(mut self, cluster: ClusterConfig) -> Self {
51        self.cluster = cluster;
52        self
53    }
54}
55
56// =============================================================================
57// Replication Engine
58// =============================================================================
59
/// The main replication engine.
///
/// Coordinates the Raft consensus node with the cluster-membership view and
/// drives both from a periodic [`ReplicationEngine::tick`]. The engine itself
/// performs no I/O; message sending is delegated to a `Transport`.
pub struct ReplicationEngine {
    // Configuration captured at construction time.
    config: ReplicationConfig,
    // Raft consensus state for this node; shares the node ID with `cluster`.
    raft: Arc<RaftNode>,
    // Cluster membership / health view.
    cluster: Arc<Cluster>,
    // Timestamp of the last processed tick; used to rate-limit `tick()`.
    last_tick: Instant,
    // Proposals made by this node that have not yet been confirmed; entries
    // older than 30 s are dropped by `cleanup_pending_proposals`.
    pending_proposals: Vec<PendingProposal>,
}
68
/// A pending proposal waiting to be committed.
///
/// Tracked so stale proposals can be expired by `cleanup_pending_proposals`.
#[derive(Debug)]
#[allow(dead_code)] // fields are only read via Debug / retained for future use
struct PendingProposal {
    // Raft log index assigned when the command was proposed.
    index: u64,
    // The proposed command, kept for potential retry / inspection.
    command: Command,
    // When the proposal was issued; used for the 30 s expiry check.
    proposed_at: Instant,
}
77
impl ReplicationEngine {
    /// Create a new replication engine.
    ///
    /// Builds a Raft node and a cluster view that share `local_node`'s ID.
    /// The engine starts with no peers and (per the tests below) in the
    /// follower role.
    pub fn new(local_node: NodeInfo, config: ReplicationConfig) -> Self {
        let node_id = local_node.id.clone();
        let raft = Arc::new(RaftNode::new(node_id, config.raft.clone()));
        let cluster = Arc::new(Cluster::new(local_node, config.cluster.clone()));

        Self {
            config,
            raft,
            cluster,
            last_tick: Instant::now(),
            pending_proposals: Vec::new(),
        }
    }

    /// Get the node ID.
    pub fn node_id(&self) -> NodeId {
        self.raft.id()
    }

    /// Get the current role (follower, candidate, or leader).
    pub fn role(&self) -> NodeRole {
        self.raft.role()
    }

    /// Check if this node is the leader.
    pub fn is_leader(&self) -> bool {
        self.raft.is_leader()
    }

    /// Get the current leader ID, if one is known.
    pub fn leader_id(&self) -> Option<NodeId> {
        self.raft.leader_id()
    }

    /// Get the current Raft term.
    pub fn current_term(&self) -> u64 {
        self.raft.current_term()
    }

    /// Get cluster state.
    pub fn cluster_state(&self) -> ClusterState {
        self.cluster.state()
    }

    /// Get cluster stats.
    pub fn cluster_stats(&self) -> ClusterStats {
        self.cluster.stats()
    }

    // =========================================================================
    // Cluster Management
    // =========================================================================

    /// Add a peer to the cluster.
    ///
    /// The peer is registered with the cluster view first; only if that
    /// succeeds is it also added to the Raft peer set, keeping the two in sync.
    pub fn add_peer(&self, peer: NodeInfo) -> Result<(), ClusterError> {
        let peer_id = peer.id.clone();
        self.cluster.add_node(peer)?;
        self.raft.add_peer(peer_id);
        Ok(())
    }

    /// Remove a peer from the cluster.
    ///
    /// NOTE(review): the peer is removed from Raft even if the subsequent
    /// cluster removal fails — asymmetric with `add_peer`; confirm intended.
    pub fn remove_peer(&self, peer_id: &NodeId) -> Result<NodeInfo, ClusterError> {
        self.raft.remove_peer(peer_id);
        self.cluster.remove_node(peer_id)
    }

    /// Get all peers (excluding the local node, per the cluster API naming).
    pub fn peers(&self) -> Vec<NodeInfo> {
        self.cluster.peers()
    }

    /// Get peer IDs.
    pub fn peer_ids(&self) -> Vec<NodeId> {
        self.cluster.peer_ids()
    }

    // =========================================================================
    // State Machine Operations
    // =========================================================================

    /// Propose a command to the cluster.
    ///
    /// Returns the Raft log index assigned to the command, or
    /// `ReplicationError::NotLeader` (carrying the known leader, if any)
    /// when this node cannot accept writes. Accepted proposals are tracked
    /// in `pending_proposals` until applied or expired.
    pub fn propose(&mut self, command: Command) -> Result<u64, ReplicationError> {
        if !self.is_leader() {
            return Err(ReplicationError::NotLeader(self.leader_id()));
        }

        let index = self
            .raft
            .propose(command.clone())
            .map_err(ReplicationError::ProposalFailed)?;

        self.pending_proposals.push(PendingProposal {
            index,
            command,
            proposed_at: Instant::now(),
        });

        Ok(index)
    }

    /// Get a value from the local state machine (reads are not replicated).
    pub fn get(&self, key: &str) -> Option<Vec<u8>> {
        self.raft.get(key)
    }

    /// Set a value (proposes to cluster; errors if not leader).
    pub fn set(&mut self, key: impl Into<String>, value: Vec<u8>) -> Result<u64, ReplicationError> {
        let command = Command::set(key, value);
        self.propose(command)
    }

    /// Delete a value (proposes to cluster; errors if not leader).
    pub fn delete(&mut self, key: impl Into<String>) -> Result<u64, ReplicationError> {
        let command = Command::delete(key);
        self.propose(command)
    }

    /// Apply committed entries to the state machine, returning their results.
    pub fn apply_committed(&self) -> Vec<CommandResult> {
        self.raft.apply_committed()
    }

    // =========================================================================
    // Tick / Event Loop
    // =========================================================================

    /// Process a tick of the event loop.
    ///
    /// Rate-limited: returns a default (all-false/empty) `TickResult` if
    /// called again before `config.tick_interval` has elapsed. Otherwise it
    /// flags follow-up actions for the caller (start election / send
    /// heartbeats), collects failed nodes, applies committed entries, and
    /// expires stale pending proposals. It does not send any messages itself.
    pub fn tick(&mut self) -> TickResult {
        let mut result = TickResult::default();

        let elapsed = self.last_tick.elapsed();
        if elapsed < self.config.tick_interval {
            return result;
        }
        self.last_tick = Instant::now();

        match self.raft.role() {
            NodeRole::Follower | NodeRole::Candidate => {
                // Both roles restart an election when the timeout elapses.
                if self.raft.election_timeout_elapsed() {
                    result.should_start_election = true;
                }
            }
            NodeRole::Leader => {
                result.should_send_heartbeats = true;
            }
        }

        let failed = self.cluster.check_failures();
        result.failed_nodes = failed;

        let applied = self.apply_committed();
        result.applied_count = applied.len();

        self.cleanup_pending_proposals();

        result
    }

    /// Start an election, returning the vote request to broadcast to peers.
    pub fn start_election(&self) -> VoteRequest {
        self.raft.start_election()
    }

    /// Handle a vote request from a peer; role changes (e.g. stepping down
    /// on a higher term) are mirrored into the cluster view.
    pub fn handle_vote_request(&self, request: &VoteRequest) -> VoteResponse {
        let response = self.raft.handle_vote_request(request);
        self.sync_role_to_cluster();
        response
    }

    /// Handle a vote response. Returns `true` if this vote won the election;
    /// in that case the cluster view is updated to record self as leader.
    pub fn handle_vote_response(&self, response: &VoteResponse) -> bool {
        let became_leader = self.raft.handle_vote_response(response);
        if became_leader {
            self.cluster.set_leader(Some(self.node_id()));
            self.cluster
                .set_node_role(&self.node_id(), NodeRole::Leader);
        }
        self.sync_role_to_cluster();
        became_leader
    }

    /// Create append-entries requests for all peers.
    ///
    /// Peers for which the Raft layer produces no request (e.g. when this
    /// node is not leader) are silently skipped via `filter_map`.
    pub fn create_append_entries_for_peers(&self) -> Vec<(NodeId, AppendEntriesRequest)> {
        self.peer_ids()
            .into_iter()
            .filter_map(|peer_id| {
                self.raft
                    .create_append_entries(&peer_id)
                    .map(|req| (peer_id, req))
            })
            .collect()
    }

    /// Handle an append-entries request from the leader.
    ///
    /// On success the sender is recorded as the current leader and its
    /// liveness heartbeat is refreshed in the cluster view.
    pub fn handle_append_entries(
        &self,
        request: &AppendEntriesRequest,
    ) -> crate::raft::AppendEntriesResponse {
        let response = self.raft.handle_append_entries(request);

        if response.success {
            self.cluster.set_leader(Some(request.leader_id.clone()));
            self.cluster.heartbeat(&request.leader_id);
        }

        self.sync_role_to_cluster();
        response
    }

    /// Handle an append-entries response from a peer (leader side).
    pub fn handle_append_entries_response(
        &self,
        peer_id: &NodeId,
        response: &crate::raft::AppendEntriesResponse,
    ) {
        self.raft.handle_append_entries_response(peer_id, response);
        self.sync_role_to_cluster();
    }

    // Mirror the Raft role into the cluster view, and clear a stale
    // local-leadership claim if Raft says we are no longer leader.
    fn sync_role_to_cluster(&self) {
        let role = self.raft.role();
        self.cluster.set_node_role(&self.node_id(), role);

        if role != NodeRole::Leader && self.cluster.is_leader() {
            self.cluster.set_leader(None);
        }
    }

    // Drop proposals older than a hard-coded 30 s.
    // NOTE(review): timeout is not configurable and expired proposals are
    // dropped silently (no ReplicationError::Timeout surfaced) — confirm.
    fn cleanup_pending_proposals(&mut self) {
        let timeout = Duration::from_secs(30);
        self.pending_proposals
            .retain(|p| p.proposed_at.elapsed() < timeout);
    }

    // =========================================================================
    // Health Management
    // =========================================================================

    /// Update health for a node.
    pub fn update_health(&self, health: NodeHealth) {
        self.cluster.update_health(health);
    }

    /// Report heartbeat from a peer.
    ///
    /// NOTE(review): this also resets the Raft heartbeat timer for a
    /// heartbeat from *any* peer, not just the current leader — confirm
    /// this cannot suppress legitimate elections.
    pub fn heartbeat(&self, peer_id: &NodeId) {
        self.cluster.heartbeat(peer_id);
        self.raft.reset_heartbeat();
    }

    // =========================================================================
    // Transport Integration
    // =========================================================================

    /// Process incoming message, returning a reply to send (if any).
    ///
    /// Only request-type payloads produce a reply; responses and heartbeats
    /// update local state and return `None`. Unhandled payload kinds are
    /// deliberately ignored via the `_` arm.
    pub fn process_message(&mut self, message: Message) -> Option<Message> {
        match message.payload {
            MessagePayload::VoteRequest(ref req) => {
                let response = self.handle_vote_request(req);
                Some(Message::vote_response(
                    self.node_id(),
                    message.from,
                    response,
                ))
            }
            MessagePayload::VoteResponse(ref resp) => {
                self.handle_vote_response(resp);
                None
            }
            MessagePayload::AppendEntries(ref req) => {
                let response = self.handle_append_entries(req);
                Some(Message::append_entries_response(
                    self.node_id(),
                    message.from,
                    response,
                ))
            }
            MessagePayload::AppendEntriesResponse(ref resp) => {
                self.handle_append_entries_response(&message.from, resp);
                None
            }
            MessagePayload::Heartbeat => {
                self.heartbeat(&message.from);
                None
            }
            _ => None,
        }
    }

    /// Send heartbeats (append-entries) to all peers using the transport.
    /// No-op unless this node is leader; send errors are ignored.
    pub fn send_heartbeats(&self, transport: &dyn Transport) {
        if !self.is_leader() {
            return;
        }

        for (peer_id, request) in self.create_append_entries_for_peers() {
            let msg = Message::append_entries(self.node_id(), peer_id, request);
            let _ = transport.send(msg);
        }
    }

    /// Start an election and broadcast the vote request to every peer.
    /// Send errors are ignored (best-effort broadcast).
    pub fn broadcast_election(&self, transport: &dyn Transport) {
        let request = self.start_election();
        for peer_id in self.peer_ids() {
            let msg = Message::vote_request(self.node_id(), peer_id, request.clone());
            let _ = transport.send(msg);
        }
    }

    // =========================================================================
    // Accessors
    // =========================================================================

    /// Get the Raft node.
    pub fn raft(&self) -> &RaftNode {
        &self.raft
    }

    /// Get the cluster.
    pub fn cluster(&self) -> &Cluster {
        &self.cluster
    }

    /// Get the configuration.
    pub fn config(&self) -> &ReplicationConfig {
        &self.config
    }
}
410
411// =============================================================================
412// Tick Result
413// =============================================================================
414
/// Result of a tick operation.
///
/// Describes follow-up actions the caller's event loop should take;
/// `tick()` itself sends no messages.
#[derive(Debug, Default)]
pub struct TickResult {
    /// True when the election timeout elapsed while follower/candidate.
    pub should_start_election: bool,
    /// True when this node is leader and should broadcast heartbeats.
    pub should_send_heartbeats: bool,
    /// Nodes the cluster failure detector marked as failed this tick.
    pub failed_nodes: Vec<NodeId>,
    /// Number of committed log entries applied to the state machine this tick.
    pub applied_count: usize,
}
423
424// =============================================================================
425// Replication Error
426// =============================================================================
427
/// Errors that can occur during replication.
#[derive(Debug)]
pub enum ReplicationError {
    /// This node is not the leader; carries the current leader's ID if known.
    NotLeader(Option<NodeId>),
    /// The Raft layer rejected the proposal; the payload is its error message.
    ProposalFailed(String),
    /// A cluster-membership operation failed.
    ClusterError(ClusterError),
    /// An operation timed out.
    /// NOTE(review): never constructed in this file — confirm it is
    /// produced elsewhere before relying on it.
    Timeout,
}
436
437impl std::fmt::Display for ReplicationError {
438    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
439        match self {
440            Self::NotLeader(leader) => match leader {
441                Some(id) => write!(f, "Not the leader, current leader: {}", id),
442                None => write!(f, "Not the leader, no leader elected"),
443            },
444            Self::ProposalFailed(e) => write!(f, "Proposal failed: {}", e),
445            Self::ClusterError(e) => write!(f, "Cluster error: {}", e),
446            Self::Timeout => write!(f, "Operation timed out"),
447        }
448    }
449}
450
451impl std::error::Error for ReplicationError {}
452
/// Lets `?` promote cluster-layer failures to `ReplicationError::ClusterError`.
impl From<ClusterError> for ReplicationError {
    fn from(e: ClusterError) -> Self {
        Self::ClusterError(e)
    }
}
458
459// =============================================================================
460// Tests
461// =============================================================================
462
#[cfg(test)]
mod tests {
    use super::*;

    // Build a single-node engine with default configuration on a fixed port.
    fn create_engine(id: &str) -> ReplicationEngine {
        let node = NodeInfo::new(id, "127.0.0.1", 5000);
        ReplicationEngine::new(node, ReplicationConfig::default())
    }

    // A fresh engine starts as a non-leader follower with its own ID.
    #[test]
    fn test_engine_creation() {
        let engine = create_engine("node1");

        assert_eq!(engine.node_id().as_str(), "node1");
        assert_eq!(engine.role(), NodeRole::Follower);
        assert!(!engine.is_leader());
    }

    // Peers added via add_peer appear in peers() and are gone after removal.
    #[test]
    fn test_add_remove_peer() {
        let engine = create_engine("node1");

        let peer = NodeInfo::new("node2", "127.0.0.1", 5001);
        engine.add_peer(peer).unwrap();

        assert_eq!(engine.peers().len(), 1);

        engine.remove_peer(&NodeId::new("node2")).unwrap();
        assert_eq!(engine.peers().len(), 0);
    }

    // Proposals are rejected with NotLeader while the node is a follower.
    #[test]
    fn test_propose_not_leader() {
        let mut engine = create_engine("node1");

        let command = Command::set("key", b"value".to_vec());
        let result = engine.propose(command);

        assert!(matches!(result, Err(ReplicationError::NotLeader(_))));
    }

    // Winning a 2-node election (one granted vote = majority) enables writes.
    #[test]
    fn test_become_leader_and_propose() {
        let mut engine = create_engine("node1");

        let peer = NodeInfo::new("node2", "127.0.0.1", 5001);
        engine.add_peer(peer).unwrap();

        let request = engine.start_election();
        let response = VoteResponse {
            term: request.term,
            vote_granted: true,
            voter_id: NodeId::new("node2"),
        };
        engine.handle_vote_response(&response);

        assert!(engine.is_leader());

        let index = engine.set("key1", b"value1".to_vec()).unwrap();
        assert!(index > 0);
    }

    // After sleeping past tick_interval (10 ms default), tick() does real work.
    #[test]
    fn test_tick_result() {
        let mut engine = create_engine("node1");

        std::thread::sleep(Duration::from_millis(15));
        let result = engine.tick();

        // Loose assertion: election timing is nondeterministic, but nothing
        // can have been applied on a node that never committed entries.
        assert!(result.should_start_election || result.applied_count == 0);
    }

    // A newly elected leader produces one append-entries request per peer.
    #[test]
    fn test_leader_creates_append_entries() {
        let engine = create_engine("node1");

        let peer = NodeInfo::new("node2", "127.0.0.1", 5001);
        engine.add_peer(peer).unwrap();

        engine.start_election();
        engine.handle_vote_response(&VoteResponse {
            term: 1,
            vote_granted: true,
            voter_id: NodeId::new("node2"),
        });

        let requests = engine.create_append_entries_for_peers();
        assert_eq!(requests.len(), 1);
        assert_eq!(requests[0].0.as_str(), "node2");
    }

    // A vote request routed through process_message yields a granted
    // VoteResponse from a follower that has not voted this term.
    #[test]
    #[allow(unused_mut)]
    fn test_message_processing() {
        let mut engine1 = create_engine("node1");
        let mut engine2 = create_engine("node2");

        engine1
            .add_peer(NodeInfo::new("node2", "127.0.0.1", 5001))
            .unwrap();
        engine2
            .add_peer(NodeInfo::new("node1", "127.0.0.1", 5000))
            .unwrap();

        let vote_request = engine1.start_election();
        let msg = Message::vote_request(NodeId::new("node1"), NodeId::new("node2"), vote_request);

        let response_msg = engine2.process_message(msg).unwrap();

        if let MessagePayload::VoteResponse(resp) = response_msg.payload {
            assert!(resp.vote_granted);
        } else {
            panic!("Expected VoteResponse");
        }
    }

    // Stats count the local node plus every added peer.
    #[test]
    fn test_cluster_stats() {
        let engine = create_engine("node1");

        let mut peer = NodeInfo::new("node2", "127.0.0.1", 5001);
        peer.mark_healthy();
        engine.add_peer(peer).unwrap();

        let stats = engine.cluster_stats();
        assert_eq!(stats.total_nodes, 2);
    }

    // End-to-end: propose a set, force-commit the log, apply, then read back.
    #[test]
    fn test_get_set() {
        let mut engine = create_engine("node1");
        let peer = NodeInfo::new("node2", "127.0.0.1", 5001);
        engine.add_peer(peer).unwrap();

        engine.start_election();
        engine.handle_vote_response(&VoteResponse {
            term: 1,
            vote_granted: true,
            voter_id: NodeId::new("node2"),
        });

        engine.set("key1", b"value1".to_vec()).unwrap();

        // Bypass replication: mark everything committed so apply_committed
        // will run the entry through the state machine.
        engine
            .raft
            .log()
            .set_commit_index(engine.raft.log().last_index());
        engine.apply_committed();

        let value = engine.get("key1");
        assert_eq!(value, Some(b"value1".to_vec()));
    }
}