1use crate::message::message::Message;
/// Identifier of a cluster node, in plain string form.
pub type NodeId = String;
9
/// Lifecycle state of a cluster node.
///
/// NOTE(review): nothing in this file constructs or matches these variants;
/// the per-variant meanings below are inferred from the names — confirm
/// against the callers that use `NodeState`.
#[derive(Debug, Clone, PartialEq)]
pub enum NodeState {
    /// Node is serving normally.
    Active,
    /// Node is known but not currently participating.
    Inactive,
    /// Node is catching up with the leader.
    Syncing,
    /// Node is considered failed.
    Failed,
}
18
19use crate::network::error::{NetworkError, NetworkResult};
20use serde::{Deserialize, Serialize};
21use std::collections::{HashMap, VecDeque};
22use std::sync::Arc;
23use std::time::{Duration, SystemTime};
24use tokio::sync::{mpsc, RwLock, Mutex};
25use tokio::time::{interval, timeout};
26
/// Tunable parameters for the replication manager.
///
/// NOTE(review): only `min_in_sync_replicas`, `ack_timeout` and `max_lag_ms`
/// are read within this file; `replication_factor`, `batch_size` and
/// `max_retries` appear unused here — confirm they are consumed elsewhere.
#[derive(Debug, Clone)]
pub struct ReplicationConfig {
    /// Desired number of copies of each entry across the cluster.
    pub replication_factor: usize,
    /// Minimum size of the in-sync set required to accept a write, and the
    /// number of acknowledgments needed to commit it.
    pub min_in_sync_replicas: usize,
    /// How long `replicate_message` waits for a quorum of acknowledgments.
    pub ack_timeout: Duration,
    /// Replicas whose heartbeat age or lag exceeds this many milliseconds
    /// drop out of the in-sync set.
    pub max_lag_ms: u64,
    /// Maximum entries per replication batch.
    pub batch_size: usize,
    /// Maximum retry attempts for a failed replication.
    pub max_retries: usize,
}
43
44impl Default for ReplicationConfig {
45 fn default() -> Self {
46 Self {
47 replication_factor: 3,
48 min_in_sync_replicas: 2,
49 ack_timeout: Duration::from_secs(5),
50 max_lag_ms: 1000,
51 batch_size: 100,
52 max_retries: 3,
53 }
54 }
55}
56
/// A single entry in the replication log.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicationLogEntry {
    /// Log sequence number, monotonically increasing (see `next_lsn`).
    pub lsn: u64,
    /// Topic the message belongs to (copied from `Message::topic_id`).
    pub topic: String,
    /// Partition within the topic (cast from `Message::partition_id`).
    pub partition_id: u32,
    /// The replicated payload.
    pub message: Message,
    /// Creation time, in whole seconds since the Unix epoch.
    pub timestamp: u64,
    /// Hash of the message content bytes (see `calculate_checksum`).
    pub checksum: u64,
}
73
/// Leader-side view of a single follower replica.
#[derive(Debug, Clone)]
pub struct ReplicaState {
    /// Identifier of the replica node.
    pub node_id: NodeId,
    /// Highest LSN the replica has acknowledged.
    pub last_synced_lsn: u64,
    /// When the replica was last heard from (ack or heartbeat).
    pub last_heartbeat: SystemTime,
    /// Whether the replica is in the in-sync set.
    /// NOTE(review): this flag is never written in this file — membership is
    /// actually tracked by `ReplicationManager::in_sync_replicas`; confirm
    /// whether this field is stale by design.
    pub in_sync: bool,
    /// Estimated replication lag in milliseconds.
    pub lag_ms: u64,
}
88
/// Coordinates log replication from this node to a set of replicas.
///
/// All mutable state lives behind `Arc`-wrapped async locks, so the manager
/// can be cloned cheaply and shared with background tasks (see the manual
/// `Clone` impl).
pub struct ReplicationManager {
    /// Replication tuning parameters.
    config: ReplicationConfig,
    /// This node's identifier.
    node_id: NodeId,
    /// Ordered log of replicated entries; trimmed by `compact_log`.
    replication_log: Arc<RwLock<VecDeque<ReplicationLogEntry>>>,
    /// Last assigned log sequence number (first issued LSN is 1).
    current_lsn: Arc<Mutex<u64>>,
    /// Per-replica sync/heartbeat state, keyed by node id.
    replica_states: Arc<RwLock<HashMap<NodeId, ReplicaState>>>,
    /// Replicas currently considered in sync (rebuilt once per second).
    in_sync_replicas: Arc<RwLock<Vec<NodeId>>>,
    /// Writes awaiting a quorum of acknowledgments, keyed by LSN.
    pending_acks: Arc<RwLock<HashMap<u64, PendingAck>>>,
    /// Sender for the internal replication message loop.
    /// NOTE(review): messages sent here are consumed by this same node's
    /// `handle_replication_messages` task — confirm whether a network
    /// transport is meant to sit behind this channel.
    replication_tx: mpsc::UnboundedSender<ReplicationMessage>,
    /// Aggregate replication metrics.
    stats: Arc<RwLock<ReplicationStats>>,
}
110
/// Bookkeeping for a write that is waiting on replica acknowledgments.
#[derive(Debug)]
#[allow(dead_code)]
struct PendingAck {
    /// LSN of the entry being acknowledged.
    lsn: u64,
    /// Acknowledgments needed before the write counts as committed.
    required_acks: usize,
    /// Nodes that have acknowledged so far (deduplicated by node id).
    received_acks: Vec<NodeId>,
    /// When the wait began; consulted by `cleanup_expired_acks`.
    created_at: SystemTime,
    /// Completion channel back to the waiting `replicate_message` call;
    /// `take()`n (left as `None`) once the quorum is reached.
    sender: Option<tokio::sync::oneshot::Sender<NetworkResult<()>>>,
}
121
/// Protocol messages exchanged on the replication channel.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ReplicationMessage {
    /// Leader → follower: append these entries to the local log.
    ReplicateEntries {
        entries: Vec<ReplicationLogEntry>,
        /// Highest LSN the leader considers committed.
        leader_commit: u64,
    },
    /// Follower → leader: outcome of applying replicated entries.
    ReplicationAck {
        node_id: NodeId,
        last_synced_lsn: u64,
        success: bool,
    },
    /// Periodic liveness signal carrying the sender's latest LSN.
    Heartbeat {
        node_id: NodeId,
        last_lsn: u64,
    },
    /// Request to drop log entries with LSN below `before_lsn`.
    CompactionRequest {
        before_lsn: u64,
    },
}
146
/// Aggregate counters describing replication health.
///
/// `Clone` is derived so callers (e.g. `get_statistics`) can hand out
/// snapshots without copying each field by hand.
#[derive(Debug, Default, Clone)]
pub struct ReplicationStats {
    /// Entries successfully acknowledged by a quorum of replicas.
    pub total_replicated_entries: u64,
    /// Replication attempts that timed out or failed.
    pub failed_replications: u64,
    /// Mean per-replica lag, in milliseconds.
    pub average_replication_lag_ms: f64,
    /// Number of replicas currently considered in sync.
    pub in_sync_replica_count: usize,
    /// Quorum acknowledgments completed.
    pub total_acks_received: u64,
    /// Errors observed during replication handling.
    pub replication_errors: u64,
}
157
impl ReplicationManager {
    /// Creates a manager for `node_id`, spawns the internal message loop and
    /// the once-per-second maintenance task, and returns the ready handle.
    ///
    /// NOTE(review): the spawned loop consumes messages sent on this node's
    /// own `replication_tx`, so replication traffic currently loops back to
    /// the local node instead of crossing a network — confirm intent.
    pub async fn new(config: ReplicationConfig, node_id: NodeId) -> NetworkResult<Self> {
        let (replication_tx, replication_rx) = mpsc::unbounded_channel();

        let manager = Self {
            config: config.clone(),
            node_id: node_id.clone(),
            replication_log: Arc::new(RwLock::new(VecDeque::new())),
            current_lsn: Arc::new(Mutex::new(0)),
            replica_states: Arc::new(RwLock::new(HashMap::new())),
            in_sync_replicas: Arc::new(RwLock::new(Vec::new())),
            pending_acks: Arc::new(RwLock::new(HashMap::new())),
            replication_tx: replication_tx.clone(),
            stats: Arc::new(RwLock::new(ReplicationStats::default())),
        };

        // The message loop runs on a clone; all Arc-backed state is shared.
        let manager_clone = manager.clone();
        tokio::spawn(async move {
            manager_clone.handle_replication_messages(replication_rx).await;
        });

        manager.start_background_tasks().await;

        println!("✅ Replication manager initialized for node: {}", node_id);
        Ok(manager)
    }

    /// Appends `message` to the local replication log under a fresh LSN and
    /// waits (up to `config.ack_timeout`) for `min_in_sync_replicas`
    /// acknowledgments. Returns the assigned LSN on success.
    ///
    /// # Errors
    /// `ReplicationError` when fewer than `min_in_sync_replicas` replicas are
    /// in sync, when sending or awaiting the ack channel fails, or on timeout.
    ///
    /// NOTE(review): `_topic` / `_partition_id` are ignored — the entry's
    /// topic and partition come from the `Message` itself; confirm the unused
    /// parameters are intentional.
    pub async fn replicate_message(
        &self,
        _topic: &str,
        _partition_id: u32,
        message: Message,
    ) -> NetworkResult<u64> {
        let lsn = self.next_lsn().await;

        let entry = ReplicationLogEntry {
            lsn,
            topic: message.topic_id.clone(),
            partition_id: message.partition_id as u32,
            message: message.clone(),
            timestamp: std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_secs(),
            checksum: self.calculate_checksum(&message.content.as_bytes()),
        }; self.replication_log.write().await.push_back(entry.clone());

        let in_sync_replicas = self.in_sync_replicas.read().await.clone();

        // Refuse the write up front if a quorum cannot be reached.
        if in_sync_replicas.len() < self.config.min_in_sync_replicas {
            return Err(NetworkError::ReplicationError(
                "Insufficient in-sync replicas".to_string()
            ));
        }

        // Register the pending write BEFORE fanning out so fast acks still
        // find their entry; the scope drops the write lock promptly.
        let (ack_tx, ack_rx) = tokio::sync::oneshot::channel();
        {
            let mut pending_acks = self.pending_acks.write().await;
            pending_acks.insert(lsn, PendingAck {
                lsn,
                required_acks: self.config.min_in_sync_replicas,
                received_acks: Vec::new(),
                created_at: SystemTime::now(),
                sender: Some(ack_tx),
            });
        }

        let replication_msg = ReplicationMessage::ReplicateEntries {
            entries: vec![entry],
            leader_commit: lsn,
        };

        // NOTE(review): every send below goes to this node's own channel; the
        // local handler re-appends the entry and acks under the local node id,
        // so a per-replica network transport appears to be missing — confirm.
        for replica_id in &in_sync_replicas {
            if *replica_id != self.node_id {
                self.replication_tx.send(replication_msg.clone())
                    .map_err(|e| NetworkError::ReplicationError(e.to_string()))?;
            }
        }

        // Wait for quorum, channel closure, or the configured timeout.
        match timeout(self.config.ack_timeout, ack_rx).await {
            Ok(Ok(Ok(()))) => {
                self.stats.write().await.total_replicated_entries += 1;
                Ok(lsn)
            }
            Ok(Ok(Err(e))) => Err(NetworkError::ReplicationError(
                format!("Replication acknowledgment error: {}", e)
            )),
            Ok(Err(_)) => Err(NetworkError::ReplicationError(
                "Replication acknowledgment channel closed".to_string()
            )),
            Err(_) => {
                // Timed out: drop the pending entry so late acks are ignored.
                self.pending_acks.write().await.remove(&lsn);
                self.stats.write().await.failed_replications += 1;
                Err(NetworkError::ReplicationError(
                    "Replication acknowledgment timeout".to_string()
                ))
            }
        }
    }

    /// Registers a new replica in an out-of-sync state; it joins the in-sync
    /// set only once `update_in_sync_replicas` observes fresh heartbeats.
    pub async fn add_replica(&self, node_id: NodeId) -> NetworkResult<()> {
        let replica_state = ReplicaState {
            node_id: node_id.clone(),
            last_synced_lsn: 0,
            last_heartbeat: SystemTime::now(),
            in_sync: false,
            lag_ms: 0,
        };

        self.replica_states.write().await.insert(node_id.clone(), replica_state);

        println!("📥 Added replica: {}", node_id);
        Ok(())
    }

    /// Removes a replica from both the state map and the in-sync set.
    pub async fn remove_replica(&self, node_id: &NodeId) -> NetworkResult<()> {
        self.replica_states.write().await.remove(node_id);

        let mut in_sync_replicas = self.in_sync_replicas.write().await;
        in_sync_replicas.retain(|id| id != node_id);

        println!("📤 Removed replica: {}", node_id);
        Ok(())
    }

    /// Records an acknowledgment from `node_id` for `lsn`.
    ///
    /// On success, updates the replica's sync position/heartbeat and resets
    /// its lag, then counts the ack toward any pending quorum for `lsn`,
    /// completing the waiting `replicate_message` call once `required_acks`
    /// distinct nodes have acknowledged. Unknown nodes or LSNs are ignored.
    pub async fn handle_replication_ack(
        &self,
        node_id: NodeId,
        lsn: u64,
        success: bool,
    ) -> NetworkResult<()> {
        if let Some(replica_state) = self.replica_states.write().await.get_mut(&node_id) {
            if success {
                replica_state.last_synced_lsn = lsn;
                replica_state.last_heartbeat = SystemTime::now();
                replica_state.lag_ms = 0; }
        }

        let mut pending_acks = self.pending_acks.write().await;
        if let Some(pending_ack) = pending_acks.get_mut(&lsn) {
            // Deduplicate: each node contributes at most one ack per LSN.
            if success && !pending_ack.received_acks.contains(&node_id) {
                pending_ack.received_acks.push(node_id.clone());

                if pending_ack.received_acks.len() >= pending_ack.required_acks {
                    // Quorum reached: wake the waiter and retire the entry.
                    if let Some(sender) = pending_ack.sender.take() {
                        let _ = sender.send(Ok(()));
                    }
                    pending_acks.remove(&lsn);
                    self.stats.write().await.total_acks_received += 1;
                }
            }
        }

        Ok(())
    }

    /// Returns a point-in-time snapshot of replication metrics.
    ///
    /// The in-sync count and average lag are recomputed from live state
    /// rather than taken from the stored counters.
    pub async fn get_statistics(&self) -> ReplicationStats {
        let stats = self.stats.read().await;
        let mut result = ReplicationStats {
            total_replicated_entries: stats.total_replicated_entries,
            failed_replications: stats.failed_replications,
            average_replication_lag_ms: stats.average_replication_lag_ms,
            in_sync_replica_count: 0, total_acks_received: stats.total_acks_received,
            replication_errors: stats.replication_errors,
        };
        result.in_sync_replica_count = self.in_sync_replicas.read().await.len();

        let replica_states = self.replica_states.read().await;
        if !replica_states.is_empty() {
            let total_lag: u64 = replica_states.values().map(|s| s.lag_ms).sum();
            result.average_replication_lag_ms = total_lag as f64 / replica_states.len() as f64;
        }

        result
    }

    /// Atomically increments and returns the next log sequence number.
    /// The first LSN handed out is 1.
    async fn next_lsn(&self) -> u64 {
        let mut current_lsn = self.current_lsn.lock().await;
        *current_lsn += 1;
        *current_lsn
    }

    /// Hashes `data` with the standard library's `DefaultHasher`.
    ///
    /// NOTE(review): `DefaultHasher` output is not guaranteed stable across
    /// Rust releases or processes — unsuitable if checksums are persisted or
    /// compared across nodes; confirm this is in-process-only.
    fn calculate_checksum(&self, data: &[u8]) -> u64 {
        use std::collections::hash_map::DefaultHasher;
        use std::hash::{Hash, Hasher};

        let mut hasher = DefaultHasher::new();
        data.hash(&mut hasher);
        hasher.finish()
    }

    /// Alternative checksum over the message id and content (currently
    /// unused; same `DefaultHasher` stability caveat as above).
    #[allow(dead_code)]
    fn calculate_message_checksum(&self, message: &Message) -> u64 {
        use std::collections::hash_map::DefaultHasher;
        use std::hash::{Hash, Hasher};

        let mut hasher = DefaultHasher::new();
        message.id.hash(&mut hasher);
        message.content.hash(&mut hasher);
        hasher.finish()
    }

    /// Event loop driving the replication protocol; runs until the channel
    /// closes (i.e. every sender is dropped).
    ///
    /// NOTE(review): `ReplicateEntries` received here appends to the SAME log
    /// that `replicate_message` already wrote — entries replicated locally are
    /// therefore appended twice; confirm whether this loop is meant to run on
    /// follower nodes only.
    async fn handle_replication_messages(&self, mut rx: mpsc::UnboundedReceiver<ReplicationMessage>) {
        while let Some(message) = rx.recv().await {
            match message {
                ReplicationMessage::ReplicateEntries { entries, leader_commit: _ } => {
                    for entry in entries {
                        // Follower role: append and acknowledge each entry.
                        self.replication_log.write().await.push_back(entry.clone());

                        let ack_msg = ReplicationMessage::ReplicationAck {
                            node_id: self.node_id.clone(),
                            last_synced_lsn: entry.lsn,
                            success: true,
                        };

                        let _ = self.replication_tx.send(ack_msg);
                    }
                }
                ReplicationMessage::ReplicationAck { node_id, last_synced_lsn, success } => {
                    let _ = self.handle_replication_ack(node_id, last_synced_lsn, success).await;
                }
                ReplicationMessage::Heartbeat { node_id, last_lsn } => {
                    // Refresh liveness and sync position for known replicas.
                    if let Some(replica_state) = self.replica_states.write().await.get_mut(&node_id) {
                        replica_state.last_heartbeat = SystemTime::now();
                        replica_state.last_synced_lsn = last_lsn;
                    }
                }
                ReplicationMessage::CompactionRequest { before_lsn } => {
                    self.compact_log(before_lsn).await;
                }
            }
        }
    }

    /// Spawns the once-per-second maintenance task: heartbeats, in-sync set
    /// refresh, and expired-ack cleanup. The task runs for the lifetime of
    /// the process (it holds a clone of the manager).
    async fn start_background_tasks(&self) {
        let manager = self.clone();

        tokio::spawn(async move {
            let mut interval = interval(Duration::from_secs(1));
            loop {
                interval.tick().await;
                manager.send_heartbeats().await;
                manager.update_in_sync_replicas().await;
                manager.cleanup_expired_acks().await;
            }
        });
    }

    /// Emits a heartbeat carrying this node's current LSN.
    /// NOTE(review): sent on the local channel, so it only reaches this
    /// node's own handler — confirm the intended recipients.
    async fn send_heartbeats(&self) {
        let current_lsn = *self.current_lsn.lock().await;
        let heartbeat_msg = ReplicationMessage::Heartbeat {
            node_id: self.node_id.clone(),
            last_lsn: current_lsn,
        };

        let _ = self.replication_tx.send(heartbeat_msg);
    }

    /// Rebuilds the in-sync set: a replica qualifies when both its heartbeat
    /// age and its reported lag are below `config.max_lag_ms`.
    async fn update_in_sync_replicas(&self) {
        let now = SystemTime::now();
        let mut new_in_sync_replicas = Vec::new();

        let replica_states = self.replica_states.read().await;
        for (node_id, state) in replica_states.iter() {
            let heartbeat_age = now.duration_since(state.last_heartbeat)
                .unwrap_or_default()
                .as_millis() as u64;

            if heartbeat_age < self.config.max_lag_ms && state.lag_ms < self.config.max_lag_ms {
                new_in_sync_replicas.push(node_id.clone());
            }
        }

        *self.in_sync_replicas.write().await = new_in_sync_replicas;
    }

    /// Drops pending acks older than twice the ack timeout. By then the
    /// waiting `replicate_message` call has already timed out on its own
    /// (it waits only `ack_timeout`), so this is pure garbage collection.
    async fn cleanup_expired_acks(&self) {
        let now = SystemTime::now();
        let mut pending_acks = self.pending_acks.write().await;

        pending_acks.retain(|_, ack| {
            let age = now.duration_since(ack.created_at).unwrap_or_default();
            age < self.config.ack_timeout * 2
        });
    }

    /// Removes log entries with LSN below `before_lsn` from the front of the
    /// log. Relies on the log being LSN-ordered: stops at the first entry
    /// that is new enough.
    async fn compact_log(&self, before_lsn: u64) {
        let mut log = self.replication_log.write().await;
        while let Some(entry) = log.front() {
            if entry.lsn < before_lsn {
                log.pop_front();
            } else {
                break;
            }
        }

        println!("🗜️ Compacted replication log up to LSN: {}", before_lsn);
    }
}
497
498impl Clone for ReplicationManager {
499 fn clone(&self) -> Self {
500 Self {
501 config: self.config.clone(),
502 node_id: self.node_id.clone(),
503 replication_log: Arc::clone(&self.replication_log),
504 current_lsn: Arc::clone(&self.current_lsn),
505 replica_states: Arc::clone(&self.replica_states),
506 in_sync_replicas: Arc::clone(&self.in_sync_replicas),
507 pending_acks: Arc::clone(&self.pending_acks),
508 replication_tx: self.replication_tx.clone(),
509 stats: Arc::clone(&self.stats),
510 }
511 }
512}
513
514impl NetworkError {
516 pub fn replication_error(msg: String) -> Self {
517 NetworkError::ReplicationError(msg)
518 }
519}
520
#[cfg(test)]
mod tests {
    use super::*;

    /// Constructing a manager with default settings succeeds.
    #[tokio::test]
    async fn test_replication_manager_creation() {
        let result =
            ReplicationManager::new(ReplicationConfig::default(), String::from("test-node-1"))
                .await;
        assert!(result.is_ok());
    }

    /// Replicas can be added and subsequently removed without error.
    #[tokio::test]
    async fn test_replica_management() {
        let manager =
            ReplicationManager::new(ReplicationConfig::default(), String::from("leader"))
                .await
                .unwrap();

        let replica = String::from("replica-1");
        assert!(manager.add_replica(replica.clone()).await.is_ok());
        assert!(manager.remove_replica(&replica).await.is_ok());
    }

    /// A fresh manager reports zeroed statistics.
    #[tokio::test]
    async fn test_statistics() {
        let manager =
            ReplicationManager::new(ReplicationConfig::default(), String::from("test-node"))
                .await
                .unwrap();

        let stats = manager.get_statistics().await;
        assert_eq!(stats.total_replicated_entries, 0);
        assert_eq!(stats.in_sync_replica_count, 0);
    }
}
559}