// pilgrimage/broker/data_replication.rs

//! Data Replication System
//!
//! Implements distributed data replication with strong consistency guarantees,
//! automatic failover, and conflict resolution for production environments.

6use crate::message::message::Message;
7/// Node identifier type
8pub type NodeId = String;
9
/// Lifecycle state of a cluster node as seen by the replication layer.
///
/// Fieldless enum, so `Copy`/`Eq`/`Hash` are derived in addition to the
/// original `Debug`/`Clone`/`PartialEq` — this lets the state be used as a
/// map key and passed by value without explicit clones.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum NodeState {
    /// Node is healthy and participating in replication.
    Active,
    /// Node is known but not currently participating.
    Inactive,
    /// Node is catching up with the replication log.
    Syncing,
    /// Node has been marked as failed.
    Failed,
}

use crate::network::error::{NetworkError, NetworkResult};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use tokio::sync::{mpsc, Mutex, RwLock};
use tokio::time::{interval, timeout};

/// Tunable parameters controlling replication behaviour.
#[derive(Debug, Clone)]
pub struct ReplicationConfig {
    /// Number of replicas each entry is written to.
    pub replication_factor: usize,
    /// Minimum number of in-sync replicas required to accept a write.
    pub min_in_sync_replicas: usize,
    /// How long to wait for replication acknowledgments.
    pub ack_timeout: Duration,
    /// Maximum lag (milliseconds) a replica may have and still count as in-sync.
    pub max_lag_ms: u64,
    /// Number of entries replicated per batch.
    pub batch_size: usize,
    /// Maximum retry attempts for a failed replication.
    pub max_retries: usize,
}

impl Default for ReplicationConfig {
    /// Production-oriented defaults: 3 replicas, quorum of 2, 5 s ack timeout.
    fn default() -> Self {
        ReplicationConfig {
            replication_factor: 3,
            min_in_sync_replicas: 2,
            ack_timeout: Duration::from_secs(5),
            max_lag_ms: 1000,
            batch_size: 100,
            max_retries: 3,
        }
    }
}

57/// Replication log entry
58#[derive(Debug, Clone, Serialize, Deserialize)]
59pub struct ReplicationLogEntry {
60    /// Log sequence number
61    pub lsn: u64,
62    /// Topic name
63    pub topic: String,
64    /// Partition ID
65    pub partition_id: u32,
66    /// Message data
67    pub message: Message,
68    /// Timestamp
69    pub timestamp: u64,
70    /// Checksum for integrity
71    pub checksum: u64,
72}
73
74/// Replica state
75#[derive(Debug, Clone)]
76pub struct ReplicaState {
77    /// Node ID of the replica
78    pub node_id: NodeId,
79    /// Last synced LSN
80    pub last_synced_lsn: u64,
81    /// Last heartbeat timestamp
82    pub last_heartbeat: SystemTime,
83    /// In-sync status
84    pub in_sync: bool,
85    /// Lag in milliseconds
86    pub lag_ms: u64,
87}
88
89/// Replication manager for distributed data synchronization
90pub struct ReplicationManager {
91    /// Configuration
92    config: ReplicationConfig,
93    /// Current node ID
94    node_id: NodeId,
95    /// Replication log
96    replication_log: Arc<RwLock<VecDeque<ReplicationLogEntry>>>,
97    /// Current LSN (Log Sequence Number)
98    current_lsn: Arc<Mutex<u64>>,
99    /// Replica states
100    replica_states: Arc<RwLock<HashMap<NodeId, ReplicaState>>>,
101    /// In-sync replica set
102    in_sync_replicas: Arc<RwLock<Vec<NodeId>>>,
103    /// Pending acknowledgments
104    pending_acks: Arc<RwLock<HashMap<u64, PendingAck>>>,
105    /// Replication channels
106    replication_tx: mpsc::UnboundedSender<ReplicationMessage>,
107    /// Statistics
108    stats: Arc<RwLock<ReplicationStats>>,
109}
110
111/// Pending acknowledgment tracking
112#[derive(Debug)]
113#[allow(dead_code)]
114struct PendingAck {
115    lsn: u64,
116    required_acks: usize,
117    received_acks: Vec<NodeId>,
118    created_at: SystemTime,
119    sender: Option<tokio::sync::oneshot::Sender<NetworkResult<()>>>,
120}
121
122/// Replication message types
123#[derive(Debug, Clone, Serialize, Deserialize)]
124pub enum ReplicationMessage {
125    /// Replicate log entries to followers
126    ReplicateEntries {
127        entries: Vec<ReplicationLogEntry>,
128        leader_commit: u64,
129    },
130    /// Acknowledgment from follower
131    ReplicationAck {
132        node_id: NodeId,
133        last_synced_lsn: u64,
134        success: bool,
135    },
136    /// Heartbeat for replica liveness
137    Heartbeat {
138        node_id: NodeId,
139        last_lsn: u64,
140    },
141    /// Request for log compaction
142    CompactionRequest {
143        before_lsn: u64,
144    },
145}
146
/// Aggregate replication metrics.
///
/// `Clone` is derived so snapshots can be copied without the field-by-field
/// hand copy previously required (all fields are plain `Copy` scalars).
#[derive(Debug, Default, Clone)]
pub struct ReplicationStats {
    /// Entries successfully replicated to the required quorum.
    pub total_replicated_entries: u64,
    /// Replications that failed or timed out.
    pub failed_replications: u64,
    /// Mean replica lag across all known replicas, in milliseconds.
    pub average_replication_lag_ms: f64,
    /// Size of the current in-sync replica set.
    pub in_sync_replica_count: usize,
    /// Total acknowledgments received from replicas.
    pub total_acks_received: u64,
    /// Errors encountered while handling replication messages.
    pub replication_errors: u64,
}

158impl ReplicationManager {
159    /// Create new replication manager
160    pub async fn new(config: ReplicationConfig, node_id: NodeId) -> NetworkResult<Self> {
161        let (replication_tx, replication_rx) = mpsc::unbounded_channel();
162
163        let manager = Self {
164            config: config.clone(),
165            node_id: node_id.clone(),
166            replication_log: Arc::new(RwLock::new(VecDeque::new())),
167            current_lsn: Arc::new(Mutex::new(0)),
168            replica_states: Arc::new(RwLock::new(HashMap::new())),
169            in_sync_replicas: Arc::new(RwLock::new(Vec::new())),
170            pending_acks: Arc::new(RwLock::new(HashMap::new())),
171            replication_tx: replication_tx.clone(),
172            stats: Arc::new(RwLock::new(ReplicationStats::default())),
173        };
174
175        // Start replication message handler
176        let manager_clone = manager.clone();
177        tokio::spawn(async move {
178            manager_clone.handle_replication_messages(replication_rx).await;
179        });
180
181        // Start heartbeat and cleanup tasks
182        manager.start_background_tasks().await;
183
184        println!("✅ Replication manager initialized for node: {}", node_id);
185        Ok(manager)
186    }
187
188    /// Replicate message to all replicas
189    pub async fn replicate_message(
190        &self,
191        _topic: &str,
192        _partition_id: u32,
193        message: Message,
194    ) -> NetworkResult<u64> {
195        // Generate LSN
196        let lsn = self.next_lsn().await;
197
198        // Create replication log entry
199            let entry = ReplicationLogEntry {
200                lsn,
201                topic: message.topic_id.clone(),
202                partition_id: message.partition_id as u32,
203                message: message.clone(),
204                timestamp: std::time::SystemTime::now()
205                    .duration_since(std::time::UNIX_EPOCH)
206                    .unwrap()
207                    .as_secs(),
208                checksum: self.calculate_checksum(&message.content.as_bytes()),
209            };        // Add to local log
210        self.replication_log.write().await.push_back(entry.clone());
211
212        // Get current in-sync replicas
213        let in_sync_replicas = self.in_sync_replicas.read().await.clone();
214
215        if in_sync_replicas.len() < self.config.min_in_sync_replicas {
216            return Err(NetworkError::ReplicationError(
217                "Insufficient in-sync replicas".to_string()
218            ));
219        }
220
221        // Create pending acknowledgment
222        let (ack_tx, ack_rx) = tokio::sync::oneshot::channel();
223        {
224            let mut pending_acks = self.pending_acks.write().await;
225            pending_acks.insert(lsn, PendingAck {
226                lsn,
227                required_acks: self.config.min_in_sync_replicas,
228                received_acks: Vec::new(),
229                created_at: SystemTime::now(),
230                sender: Some(ack_tx),
231            });
232        }
233
234        // Send replication message to all replicas
235        let replication_msg = ReplicationMessage::ReplicateEntries {
236            entries: vec![entry],
237            leader_commit: lsn,
238        };
239
240        for replica_id in &in_sync_replicas {
241            if *replica_id != self.node_id {
242                // In a real implementation, this would send over network
243                self.replication_tx.send(replication_msg.clone())
244                    .map_err(|e| NetworkError::ReplicationError(e.to_string()))?;
245            }
246        }
247
248        // Wait for acknowledgments with timeout
249        match timeout(self.config.ack_timeout, ack_rx).await {
250            Ok(Ok(Ok(()))) => {
251                self.stats.write().await.total_replicated_entries += 1;
252                Ok(lsn)
253            }
254            Ok(Ok(Err(e))) => Err(NetworkError::ReplicationError(
255                format!("Replication acknowledgment error: {}", e)
256            )),
257            Ok(Err(_)) => Err(NetworkError::ReplicationError(
258                "Replication acknowledgment channel closed".to_string()
259            )),
260            Err(_) => {
261                // Remove pending ack on timeout
262                self.pending_acks.write().await.remove(&lsn);
263                self.stats.write().await.failed_replications += 1;
264                Err(NetworkError::ReplicationError(
265                    "Replication acknowledgment timeout".to_string()
266                ))
267            }
268        }
269    }
270
271    /// Add replica to the cluster
272    pub async fn add_replica(&self, node_id: NodeId) -> NetworkResult<()> {
273        let replica_state = ReplicaState {
274            node_id: node_id.clone(),
275            last_synced_lsn: 0,
276            last_heartbeat: SystemTime::now(),
277            in_sync: false,
278            lag_ms: 0,
279        };
280
281        self.replica_states.write().await.insert(node_id.clone(), replica_state);
282
283        println!("📥 Added replica: {}", node_id);
284        Ok(())
285    }
286
287    /// Remove replica from the cluster
288    pub async fn remove_replica(&self, node_id: &NodeId) -> NetworkResult<()> {
289        self.replica_states.write().await.remove(node_id);
290
291        // Remove from in-sync replicas
292        let mut in_sync_replicas = self.in_sync_replicas.write().await;
293        in_sync_replicas.retain(|id| id != node_id);
294
295        println!("📤 Removed replica: {}", node_id);
296        Ok(())
297    }
298
299    /// Handle replication acknowledgment
300    pub async fn handle_replication_ack(
301        &self,
302        node_id: NodeId,
303        lsn: u64,
304        success: bool,
305    ) -> NetworkResult<()> {
306        // Update replica state
307        if let Some(replica_state) = self.replica_states.write().await.get_mut(&node_id) {
308            if success {
309                replica_state.last_synced_lsn = lsn;
310                replica_state.last_heartbeat = SystemTime::now();
311                replica_state.lag_ms = 0; // Would calculate actual lag in real implementation
312            }
313        }
314
315        // Check pending acknowledgments
316        let mut pending_acks = self.pending_acks.write().await;
317        if let Some(pending_ack) = pending_acks.get_mut(&lsn) {
318            if success && !pending_ack.received_acks.contains(&node_id) {
319                pending_ack.received_acks.push(node_id.clone());
320
321                // Check if we have enough acknowledgments
322                if pending_ack.received_acks.len() >= pending_ack.required_acks {
323                    if let Some(sender) = pending_ack.sender.take() {
324                        let _ = sender.send(Ok(()));
325                    }
326                    pending_acks.remove(&lsn);
327                    self.stats.write().await.total_acks_received += 1;
328                }
329            }
330        }
331
332        Ok(())
333    }
334
335    /// Get replication statistics
336    pub async fn get_statistics(&self) -> ReplicationStats {
337        let stats = self.stats.read().await;
338        let mut result = ReplicationStats {
339            total_replicated_entries: stats.total_replicated_entries,
340            failed_replications: stats.failed_replications,
341            average_replication_lag_ms: stats.average_replication_lag_ms,
342            in_sync_replica_count: 0, // Will be set below
343            total_acks_received: stats.total_acks_received,
344            replication_errors: stats.replication_errors,
345        };
346        result.in_sync_replica_count = self.in_sync_replicas.read().await.len();
347
348        // Calculate average lag
349        let replica_states = self.replica_states.read().await;
350        if !replica_states.is_empty() {
351            let total_lag: u64 = replica_states.values().map(|s| s.lag_ms).sum();
352            result.average_replication_lag_ms = total_lag as f64 / replica_states.len() as f64;
353        }
354
355        result
356    }
357
358    /// Get next LSN
359    async fn next_lsn(&self) -> u64 {
360        let mut current_lsn = self.current_lsn.lock().await;
361        *current_lsn += 1;
362        *current_lsn
363    }
364
365    /// Calculate byte checksum
366    fn calculate_checksum(&self, data: &[u8]) -> u64 {
367        use std::collections::hash_map::DefaultHasher;
368        use std::hash::{Hash, Hasher};
369
370        let mut hasher = DefaultHasher::new();
371        data.hash(&mut hasher);
372        hasher.finish()
373    }
374
375    /// Calculate message checksum
376    #[allow(dead_code)]
377    fn calculate_message_checksum(&self, message: &Message) -> u64 {
378        use std::collections::hash_map::DefaultHasher;
379        use std::hash::{Hash, Hasher};
380
381        let mut hasher = DefaultHasher::new();
382        message.id.hash(&mut hasher);
383        message.content.hash(&mut hasher);
384        hasher.finish()
385    }
386
387    /// Handle replication messages
388    async fn handle_replication_messages(&self, mut rx: mpsc::UnboundedReceiver<ReplicationMessage>) {
389        while let Some(message) = rx.recv().await {
390            match message {
391                ReplicationMessage::ReplicateEntries { entries, leader_commit: _ } => {
392                    // Handle replication from leader
393                    for entry in entries {
394                        // Add to local log
395                        self.replication_log.write().await.push_back(entry.clone());
396
397                        // Send acknowledgment
398                        let ack_msg = ReplicationMessage::ReplicationAck {
399                            node_id: self.node_id.clone(),
400                            last_synced_lsn: entry.lsn,
401                            success: true,
402                        };
403
404                        let _ = self.replication_tx.send(ack_msg);
405                    }
406                }
407                ReplicationMessage::ReplicationAck { node_id, last_synced_lsn, success } => {
408                    let _ = self.handle_replication_ack(node_id, last_synced_lsn, success).await;
409                }
410                ReplicationMessage::Heartbeat { node_id, last_lsn } => {
411                    // Update replica heartbeat
412                    if let Some(replica_state) = self.replica_states.write().await.get_mut(&node_id) {
413                        replica_state.last_heartbeat = SystemTime::now();
414                        replica_state.last_synced_lsn = last_lsn;
415                    }
416                }
417                ReplicationMessage::CompactionRequest { before_lsn } => {
418                    // Handle log compaction
419                    self.compact_log(before_lsn).await;
420                }
421            }
422        }
423    }
424
425    /// Start background tasks
426    async fn start_background_tasks(&self) {
427        let manager = self.clone();
428
429        // Heartbeat task
430        tokio::spawn(async move {
431            let mut interval = interval(Duration::from_secs(1));
432            loop {
433                interval.tick().await;
434                manager.send_heartbeats().await;
435                manager.update_in_sync_replicas().await;
436                manager.cleanup_expired_acks().await;
437            }
438        });
439    }
440
441    /// Send heartbeats to all replicas
442    async fn send_heartbeats(&self) {
443        let current_lsn = *self.current_lsn.lock().await;
444        let heartbeat_msg = ReplicationMessage::Heartbeat {
445            node_id: self.node_id.clone(),
446            last_lsn: current_lsn,
447        };
448
449        let _ = self.replication_tx.send(heartbeat_msg);
450    }
451
452    /// Update in-sync replica set
453    async fn update_in_sync_replicas(&self) {
454        let now = SystemTime::now();
455        let mut new_in_sync_replicas = Vec::new();
456
457        let replica_states = self.replica_states.read().await;
458        for (node_id, state) in replica_states.iter() {
459            // Check if replica is in sync based on lag and heartbeat
460            let heartbeat_age = now.duration_since(state.last_heartbeat)
461                .unwrap_or_default()
462                .as_millis() as u64;
463
464            if heartbeat_age < self.config.max_lag_ms && state.lag_ms < self.config.max_lag_ms {
465                new_in_sync_replicas.push(node_id.clone());
466            }
467        }
468
469        *self.in_sync_replicas.write().await = new_in_sync_replicas;
470    }
471
472    /// Clean up expired acknowledgments
473    async fn cleanup_expired_acks(&self) {
474        let now = SystemTime::now();
475        let mut pending_acks = self.pending_acks.write().await;
476
477        pending_acks.retain(|_, ack| {
478            let age = now.duration_since(ack.created_at).unwrap_or_default();
479            age < self.config.ack_timeout * 2
480        });
481    }
482
483    /// Compact replication log
484    async fn compact_log(&self, before_lsn: u64) {
485        let mut log = self.replication_log.write().await;
486        while let Some(entry) = log.front() {
487            if entry.lsn < before_lsn {
488                log.pop_front();
489            } else {
490                break;
491            }
492        }
493
494        println!("🗜️ Compacted replication log up to LSN: {}", before_lsn);
495    }
496}
497
498impl Clone for ReplicationManager {
499    fn clone(&self) -> Self {
500        Self {
501            config: self.config.clone(),
502            node_id: self.node_id.clone(),
503            replication_log: Arc::clone(&self.replication_log),
504            current_lsn: Arc::clone(&self.current_lsn),
505            replica_states: Arc::clone(&self.replica_states),
506            in_sync_replicas: Arc::clone(&self.in_sync_replicas),
507            pending_acks: Arc::clone(&self.pending_acks),
508            replication_tx: self.replication_tx.clone(),
509            stats: Arc::clone(&self.stats),
510        }
511    }
512}
513
514// Add ReplicationError to NetworkError enum
515impl NetworkError {
516    pub fn replication_error(msg: String) -> Self {
517        NetworkError::ReplicationError(msg)
518    }
519}
520
// NetworkError Display implementation is handled by thiserror derive macro

#[cfg(test)]
mod tests {
    use super::*;

    /// Manager construction should succeed with default configuration.
    #[tokio::test]
    async fn test_replication_manager_creation() {
        let node_id = "test-node-1".to_string();
        let created = ReplicationManager::new(ReplicationConfig::default(), node_id).await;
        assert!(created.is_ok());
    }

    /// Replicas can be added and removed without error.
    #[tokio::test]
    async fn test_replica_management() {
        let manager = ReplicationManager::new(ReplicationConfig::default(), "leader".to_string())
            .await
            .unwrap();

        assert!(manager.add_replica("replica-1".to_string()).await.is_ok());
        assert!(manager.remove_replica(&"replica-1".to_string()).await.is_ok());
    }

    /// A fresh manager reports zeroed statistics.
    #[tokio::test]
    async fn test_statistics() {
        let manager = ReplicationManager::new(ReplicationConfig::default(), "test-node".to_string())
            .await
            .unwrap();

        let stats = manager.get_statistics().await;
        assert_eq!(stats.total_replicated_entries, 0);
        assert_eq!(stats.in_sync_replica_count, 0);
    }
}