1use crate::cluster::{Cluster, ClusterConfig, ClusterError, ClusterState, ClusterStats};
9use crate::node::{NodeHealth, NodeId, NodeInfo, NodeRole};
10use crate::raft::{AppendEntriesRequest, RaftConfig, RaftNode, VoteRequest, VoteResponse};
11use crate::state::{Command, CommandResult};
12use crate::transport::{Message, MessagePayload, Transport};
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15
/// Tunable configuration for a `ReplicationEngine`: consensus (raft)
/// settings, membership (cluster) settings, and local pacing knobs.
#[derive(Debug, Clone)]
pub struct ReplicationConfig {
    /// Settings forwarded to the underlying `RaftNode`.
    pub raft: RaftConfig,
    /// Settings forwarded to the underlying `Cluster`.
    pub cluster: ClusterConfig,
    /// Minimum interval between effective `ReplicationEngine::tick` runs;
    /// ticks arriving sooner are no-ops.
    pub tick_interval: Duration,
    /// Upper bound on entries applied per batch.
    /// NOTE(review): not read anywhere in this file — presumably consumed
    /// by the raft apply path; confirm.
    pub apply_batch_size: usize,
}
28
29impl Default for ReplicationConfig {
30 fn default() -> Self {
31 Self {
32 raft: RaftConfig::default(),
33 cluster: ClusterConfig::default(),
34 tick_interval: Duration::from_millis(10),
35 apply_batch_size: 100,
36 }
37 }
38}
39
40impl ReplicationConfig {
41 pub fn new() -> Self {
42 Self::default()
43 }
44
45 pub fn with_raft(mut self, raft: RaftConfig) -> Self {
46 self.raft = raft;
47 self
48 }
49
50 pub fn with_cluster(mut self, cluster: ClusterConfig) -> Self {
51 self.cluster = cluster;
52 self
53 }
54}
55
/// Coordinates a `RaftNode` (consensus) with a `Cluster` (membership and
/// failure detection), exposing a single replication facade.
pub struct ReplicationEngine {
    // Configuration snapshot taken at construction.
    config: ReplicationConfig,
    // Consensus state machine; shared via Arc (handed out by `raft()`).
    raft: Arc<RaftNode>,
    // Membership / health view; shared via Arc (handed out by `cluster()`).
    cluster: Arc<Cluster>,
    // Timestamp of the last effective tick, used to throttle `tick()`.
    last_tick: Instant,
    // Proposals made while leader, retained until a 30 s timeout elapses.
    pending_proposals: Vec<PendingProposal>,
}
68
/// A command proposed by this node while leader, tracked locally so stale
/// proposals can be pruned (see `cleanup_pending_proposals`).
#[derive(Debug)]
#[allow(dead_code)] // fields retained for debugging even if not all are read
struct PendingProposal {
    // Log index assigned by raft at proposal time.
    index: u64,
    // The proposed command, kept for diagnostics.
    command: Command,
    // Submission time; proposals older than the timeout are dropped.
    proposed_at: Instant,
}
77
78impl ReplicationEngine {
79 pub fn new(local_node: NodeInfo, config: ReplicationConfig) -> Self {
81 let node_id = local_node.id.clone();
82 let raft = Arc::new(RaftNode::new(node_id, config.raft.clone()));
83 let cluster = Arc::new(Cluster::new(local_node, config.cluster.clone()));
84
85 Self {
86 config,
87 raft,
88 cluster,
89 last_tick: Instant::now(),
90 pending_proposals: Vec::new(),
91 }
92 }
93
94 pub fn node_id(&self) -> NodeId {
96 self.raft.id()
97 }
98
99 pub fn role(&self) -> NodeRole {
101 self.raft.role()
102 }
103
104 pub fn is_leader(&self) -> bool {
106 self.raft.is_leader()
107 }
108
109 pub fn leader_id(&self) -> Option<NodeId> {
111 self.raft.leader_id()
112 }
113
114 pub fn current_term(&self) -> u64 {
116 self.raft.current_term()
117 }
118
119 pub fn cluster_state(&self) -> ClusterState {
121 self.cluster.state()
122 }
123
124 pub fn cluster_stats(&self) -> ClusterStats {
126 self.cluster.stats()
127 }
128
129 pub fn add_peer(&self, peer: NodeInfo) -> Result<(), ClusterError> {
135 let peer_id = peer.id.clone();
136 self.cluster.add_node(peer)?;
137 self.raft.add_peer(peer_id);
138 Ok(())
139 }
140
141 pub fn remove_peer(&self, peer_id: &NodeId) -> Result<NodeInfo, ClusterError> {
143 self.raft.remove_peer(peer_id);
144 self.cluster.remove_node(peer_id)
145 }
146
147 pub fn peers(&self) -> Vec<NodeInfo> {
149 self.cluster.peers()
150 }
151
152 pub fn peer_ids(&self) -> Vec<NodeId> {
154 self.cluster.peer_ids()
155 }
156
157 pub fn propose(&mut self, command: Command) -> Result<u64, ReplicationError> {
163 if !self.is_leader() {
164 return Err(ReplicationError::NotLeader(self.leader_id()));
165 }
166
167 let index = self
168 .raft
169 .propose(command.clone())
170 .map_err(ReplicationError::ProposalFailed)?;
171
172 self.pending_proposals.push(PendingProposal {
173 index,
174 command,
175 proposed_at: Instant::now(),
176 });
177
178 Ok(index)
179 }
180
181 pub fn get(&self, key: &str) -> Option<Vec<u8>> {
183 self.raft.get(key)
184 }
185
186 pub fn set(&mut self, key: impl Into<String>, value: Vec<u8>) -> Result<u64, ReplicationError> {
188 let command = Command::set(key, value);
189 self.propose(command)
190 }
191
192 pub fn delete(&mut self, key: impl Into<String>) -> Result<u64, ReplicationError> {
194 let command = Command::delete(key);
195 self.propose(command)
196 }
197
198 pub fn apply_committed(&self) -> Vec<CommandResult> {
200 self.raft.apply_committed()
201 }
202
203 pub fn tick(&mut self) -> TickResult {
209 let mut result = TickResult::default();
210
211 let elapsed = self.last_tick.elapsed();
212 if elapsed < self.config.tick_interval {
213 return result;
214 }
215 self.last_tick = Instant::now();
216
217 match self.raft.role() {
218 NodeRole::Follower | NodeRole::Candidate => {
219 if self.raft.election_timeout_elapsed() {
220 result.should_start_election = true;
221 }
222 }
223 NodeRole::Leader => {
224 result.should_send_heartbeats = true;
225 }
226 }
227
228 let failed = self.cluster.check_failures();
229 result.failed_nodes = failed;
230
231 let applied = self.apply_committed();
232 result.applied_count = applied.len();
233
234 self.cleanup_pending_proposals();
235
236 result
237 }
238
239 pub fn start_election(&self) -> VoteRequest {
241 self.raft.start_election()
242 }
243
244 pub fn handle_vote_request(&self, request: &VoteRequest) -> VoteResponse {
246 let response = self.raft.handle_vote_request(request);
247 self.sync_role_to_cluster();
248 response
249 }
250
251 pub fn handle_vote_response(&self, response: &VoteResponse) -> bool {
253 let became_leader = self.raft.handle_vote_response(response);
254 if became_leader {
255 self.cluster.set_leader(Some(self.node_id()));
256 self.cluster
257 .set_node_role(&self.node_id(), NodeRole::Leader);
258 }
259 self.sync_role_to_cluster();
260 became_leader
261 }
262
263 pub fn create_append_entries_for_peers(&self) -> Vec<(NodeId, AppendEntriesRequest)> {
265 self.peer_ids()
266 .into_iter()
267 .filter_map(|peer_id| {
268 self.raft
269 .create_append_entries(&peer_id)
270 .map(|req| (peer_id, req))
271 })
272 .collect()
273 }
274
275 pub fn handle_append_entries(
277 &self,
278 request: &AppendEntriesRequest,
279 ) -> crate::raft::AppendEntriesResponse {
280 let response = self.raft.handle_append_entries(request);
281
282 if response.success {
283 self.cluster.set_leader(Some(request.leader_id.clone()));
284 self.cluster.heartbeat(&request.leader_id);
285 }
286
287 self.sync_role_to_cluster();
288 response
289 }
290
291 pub fn handle_append_entries_response(
293 &self,
294 peer_id: &NodeId,
295 response: &crate::raft::AppendEntriesResponse,
296 ) {
297 self.raft.handle_append_entries_response(peer_id, response);
298 self.sync_role_to_cluster();
299 }
300
301 fn sync_role_to_cluster(&self) {
302 let role = self.raft.role();
303 self.cluster.set_node_role(&self.node_id(), role);
304
305 if role != NodeRole::Leader && self.cluster.is_leader() {
306 self.cluster.set_leader(None);
307 }
308 }
309
310 fn cleanup_pending_proposals(&mut self) {
311 let timeout = Duration::from_secs(30);
312 self.pending_proposals
313 .retain(|p| p.proposed_at.elapsed() < timeout);
314 }
315
316 pub fn update_health(&self, health: NodeHealth) {
322 self.cluster.update_health(health);
323 }
324
325 pub fn heartbeat(&self, peer_id: &NodeId) {
327 self.cluster.heartbeat(peer_id);
328 self.raft.reset_heartbeat();
329 }
330
331 pub fn process_message(&mut self, message: Message) -> Option<Message> {
337 match message.payload {
338 MessagePayload::VoteRequest(ref req) => {
339 let response = self.handle_vote_request(req);
340 Some(Message::vote_response(
341 self.node_id(),
342 message.from,
343 response,
344 ))
345 }
346 MessagePayload::VoteResponse(ref resp) => {
347 self.handle_vote_response(resp);
348 None
349 }
350 MessagePayload::AppendEntries(ref req) => {
351 let response = self.handle_append_entries(req);
352 Some(Message::append_entries_response(
353 self.node_id(),
354 message.from,
355 response,
356 ))
357 }
358 MessagePayload::AppendEntriesResponse(ref resp) => {
359 self.handle_append_entries_response(&message.from, resp);
360 None
361 }
362 MessagePayload::Heartbeat => {
363 self.heartbeat(&message.from);
364 None
365 }
366 _ => None,
367 }
368 }
369
370 pub fn send_heartbeats(&self, transport: &dyn Transport) {
372 if !self.is_leader() {
373 return;
374 }
375
376 for (peer_id, request) in self.create_append_entries_for_peers() {
377 let msg = Message::append_entries(self.node_id(), peer_id, request);
378 let _ = transport.send(msg);
379 }
380 }
381
382 pub fn broadcast_election(&self, transport: &dyn Transport) {
384 let request = self.start_election();
385 for peer_id in self.peer_ids() {
386 let msg = Message::vote_request(self.node_id(), peer_id, request.clone());
387 let _ = transport.send(msg);
388 }
389 }
390
391 pub fn raft(&self) -> &RaftNode {
397 &self.raft
398 }
399
400 pub fn cluster(&self) -> &Cluster {
402 &self.cluster
403 }
404
405 pub fn config(&self) -> &ReplicationConfig {
407 &self.config
408 }
409}
410
/// Outcome of a single `ReplicationEngine::tick`, telling the caller what
/// follow-up actions to take.
#[derive(Debug, Default)]
pub struct TickResult {
    /// True when the election timeout elapsed and the caller should
    /// broadcast a vote request.
    pub should_start_election: bool,
    /// True when this node is leader and should send heartbeats.
    pub should_send_heartbeats: bool,
    /// Peers newly detected as failed during this tick.
    pub failed_nodes: Vec<NodeId>,
    /// Number of committed entries applied this tick.
    pub applied_count: usize,
}
423
/// Errors surfaced by the replication engine.
#[derive(Debug)]
pub enum ReplicationError {
    /// This node is not the leader; carries the known leader id, if any,
    /// so the caller can redirect.
    NotLeader(Option<NodeId>),
    /// Raft rejected the proposal; carries its error message.
    ProposalFailed(String),
    /// Wrapped cluster membership error.
    ClusterError(ClusterError),
    /// An operation exceeded its deadline.
    Timeout,
}
436
437impl std::fmt::Display for ReplicationError {
438 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
439 match self {
440 Self::NotLeader(leader) => match leader {
441 Some(id) => write!(f, "Not the leader, current leader: {}", id),
442 None => write!(f, "Not the leader, no leader elected"),
443 },
444 Self::ProposalFailed(e) => write!(f, "Proposal failed: {}", e),
445 Self::ClusterError(e) => write!(f, "Cluster error: {}", e),
446 Self::Timeout => write!(f, "Operation timed out"),
447 }
448 }
449}
450
// Marker impl: `Debug` + `Display` above satisfy `Error`'s defaults.
impl std::error::Error for ReplicationError {}
452
/// Lets `?` convert cluster failures into `ReplicationError` transparently.
impl From<ClusterError> for ReplicationError {
    fn from(e: ClusterError) -> Self {
        Self::ClusterError(e)
    }
}
458
#[cfg(test)]
mod tests {
    use super::*;

    // Builds a single-node engine on a fixed loopback address.
    fn create_engine(id: &str) -> ReplicationEngine {
        let node = NodeInfo::new(id, "127.0.0.1", 5000);
        ReplicationEngine::new(node, ReplicationConfig::default())
    }

    // A fresh engine starts life as a follower.
    #[test]
    fn test_engine_creation() {
        let engine = create_engine("node1");

        assert_eq!(engine.node_id().as_str(), "node1");
        assert_eq!(engine.role(), NodeRole::Follower);
        assert!(!engine.is_leader());
    }

    // Peers can be added and removed symmetrically.
    #[test]
    fn test_add_remove_peer() {
        let engine = create_engine("node1");

        let peer = NodeInfo::new("node2", "127.0.0.1", 5001);
        engine.add_peer(peer).unwrap();

        assert_eq!(engine.peers().len(), 1);

        engine.remove_peer(&NodeId::new("node2")).unwrap();
        assert_eq!(engine.peers().len(), 0);
    }

    // Proposals are rejected while not leader.
    #[test]
    fn test_propose_not_leader() {
        let mut engine = create_engine("node1");

        let command = Command::set("key", b"value".to_vec());
        let result = engine.propose(command);

        assert!(matches!(result, Err(ReplicationError::NotLeader(_))));
    }

    // Winning a 2-node election (self vote + one granted vote) makes us
    // leader and enables proposals.
    #[test]
    fn test_become_leader_and_propose() {
        let mut engine = create_engine("node1");

        let peer = NodeInfo::new("node2", "127.0.0.1", 5001);
        engine.add_peer(peer).unwrap();

        let request = engine.start_election();
        let response = VoteResponse {
            term: request.term,
            vote_granted: true,
            voter_id: NodeId::new("node2"),
        };
        engine.handle_vote_response(&response);

        assert!(engine.is_leader());

        let index = engine.set("key1", b"value1".to_vec()).unwrap();
        assert!(index > 0);
    }

    // After sleeping past tick_interval (10 ms default), tick() does real
    // work; the assertion is intentionally loose since election timing is
    // randomized.
    #[test]
    fn test_tick_result() {
        let mut engine = create_engine("node1");

        std::thread::sleep(Duration::from_millis(15));
        let result = engine.tick();

        assert!(result.should_start_election || result.applied_count == 0);
    }

    // A leader produces one AppendEntries request per peer.
    #[test]
    fn test_leader_creates_append_entries() {
        let engine = create_engine("node1");

        let peer = NodeInfo::new("node2", "127.0.0.1", 5001);
        engine.add_peer(peer).unwrap();

        engine.start_election();
        engine.handle_vote_response(&VoteResponse {
            term: 1,
            vote_granted: true,
            voter_id: NodeId::new("node2"),
        });

        let requests = engine.create_append_entries_for_peers();
        assert_eq!(requests.len(), 1);
        assert_eq!(requests[0].0.as_str(), "node2");
    }

    // A vote request routed through process_message yields a granted
    // VoteResponse from a fresh peer.
    #[test]
    #[allow(unused_mut)]
    fn test_message_processing() {
        let mut engine1 = create_engine("node1");
        let mut engine2 = create_engine("node2");

        engine1
            .add_peer(NodeInfo::new("node2", "127.0.0.1", 5001))
            .unwrap();
        engine2
            .add_peer(NodeInfo::new("node1", "127.0.0.1", 5000))
            .unwrap();

        let vote_request = engine1.start_election();
        let msg = Message::vote_request(NodeId::new("node1"), NodeId::new("node2"), vote_request);

        let response_msg = engine2.process_message(msg).unwrap();

        if let MessagePayload::VoteResponse(resp) = response_msg.payload {
            assert!(resp.vote_granted);
        } else {
            panic!("Expected VoteResponse");
        }
    }

    // Stats count the local node plus added peers.
    #[test]
    fn test_cluster_stats() {
        let engine = create_engine("node1");

        let mut peer = NodeInfo::new("node2", "127.0.0.1", 5001);
        peer.mark_healthy();
        engine.add_peer(peer).unwrap();

        let stats = engine.cluster_stats();
        assert_eq!(stats.total_nodes, 2);
    }

    // End-to-end set/get: propose as leader, force-commit the log, apply,
    // then read the value back.
    #[test]
    fn test_get_set() {
        let mut engine = create_engine("node1");
        let peer = NodeInfo::new("node2", "127.0.0.1", 5001);
        engine.add_peer(peer).unwrap();

        engine.start_election();
        engine.handle_vote_response(&VoteResponse {
            term: 1,
            vote_granted: true,
            voter_id: NodeId::new("node2"),
        });

        engine.set("key1", b"value1".to_vec()).unwrap();

        engine
            .raft
            .log()
            .set_commit_index(engine.raft.log().last_index());
        engine.apply_committed();

        let value = engine.get("key1");
        assert_eq!(value, Some(b"value1".to_vec()));
    }
}