use crate::*;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{RwLock, mpsc};
use tokio::time::{self, MissedTickBehavior};

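/// Coordinates replication of graph operations across cluster nodes.
///
/// Operations are queued per replica node and drained by background tasks
/// spawned from [`ReplicationManager::start`]; replication progress and node
/// failures are tracked in a shared [`ReplicationStatus`].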
pub struct ReplicationManager {
    /// Shared view of cluster membership and partition assignment.
    cluster_state: Arc<ClusterState>,
    /// Tunable replication parameters.
    config: ReplicationConfig,
    /// Pending replication items, keyed by target node.
    replication_queues: Arc<RwLock<HashMap<NodeId, ReplicationQueue>>>,
    /// Shared replication progress and failure tracking.
    status: Arc<RwLock<ReplicationStatus>>,
    /// Sender half of the command channel, handed out to other components.
    command_tx: mpsc::Sender<ReplicationCommand>,
    /// Receiver half of the command channel, consumed by the manager.
    command_rx: mpsc::Receiver<ReplicationCommand>,
}

impl ReplicationManager {
    pub fn new(cluster_state: Arc<ClusterState>, config: ReplicationConfig) -> Self {
        let (command_tx, command_rx) = mpsc::channel(1000);

        Self {
            cluster_state,
            config,
            replication_queues: Arc::new(RwLock::new(HashMap::new())),
            status: Arc::new(RwLock::new(ReplicationStatus::new())),
            command_tx,
            command_rx,
        }
    }

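    /// Starts the background tasks that drive replication: the status
    /// monitor, the queue processor, and the full-sync scheduler.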
    pub async fn start(&mut self) -> Result<(), ReplicationError> {
        self.start_status_monitor(Arc::clone(&self.cluster_state)).await?;
        self.start_queue_processor(Arc::clone(&self.cluster_state)).await?;
        self.start_sync_scheduler(Arc::clone(&self.cluster_state)).await?;
        Ok(())
    }

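    /// Queues `operation` for asynchronous replication to every node that
    /// owns the operation's key, except `primary_node`.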
    pub async fn replicate_operation(&self, operation: Operation, primary_node: &NodeId) -> Result<(), ReplicationError> {
        let mut queues = self.replication_queues.write().await;

        let key = self.get_operation_key(&operation);
        let replica_nodes = self.cluster_state.get_nodes_for_key(&key).await;

        for node_id in replica_nodes {
            if &node_id != primary_node {
                let queue = queues.entry(node_id.clone()).or_insert_with(ReplicationQueue::new);
                queue.push(ReplicationItem {
                    operation: operation.clone(),
                    timestamp: Instant::now(),
                    retries: 0,
                });
            }
        }

        Ok(())
    }

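    /// Synchronizes the partitions assigned to `node_id`. Currently this only
    /// looks up and logs the node's partition set.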
    pub async fn sync_with_node(&self, node_id: &NodeId) -> Result<(), ReplicationError> {
        let config = self.cluster_state.config.read().await;
        let partitions = config.nodes.get(node_id)
            .map(|info| info.partitions.clone())
            .unwrap_or_default();

        println!("Synchronizing {} partitions with node {}", partitions.len(), node_id.0);

        Ok(())
    }

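    /// Handles the failure of `failed_node` by redistributing its partitions
    /// and marking the node as failed in the replication status.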
    pub async fn handle_node_failure(&self, failed_node: &NodeId) -> Result<(), ReplicationError> {
        println!("Handling failure of node {}", failed_node.0);

        let partitions = self.cluster_state.get_partitions_for_node(failed_node).await;

        for partition in partitions {
            self.redistribute_partition(&partition).await?;
        }

        let mut status = self.status.write().await;
        status.mark_node_failed(failed_node);

        Ok(())
    }

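    /// Returns a health summary derived from current queue depths, failed
    /// nodes, and average replication lag.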
    pub async fn check_health(&self) -> ReplicationHealth {
        let status = self.status.read().await;
        let queues = self.replication_queues.read().await;

        let total_queued = queues.values().map(|q| q.len()).sum();
        let failed_nodes = status.failed_nodes.len();

        ReplicationHealth {
            total_queued_operations: total_queued,
            failed_nodes_count: failed_nodes,
            replication_lag: status.get_average_lag(),
            is_healthy: failed_nodes == 0 && total_queued < 1000,
        }
    }

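    /// Builds a per-node snapshot of queue depth, last sync time, and lag,
    /// together with the aggregate success and failure counters.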
    pub async fn get_status(&self) -> ReplicationStatusSnapshot {
        let status = self.status.read().await;
        let queues = self.replication_queues.read().await;

        let node_statuses: HashMap<NodeId, NodeReplicationStatus> = queues.iter()
            .map(|(node_id, queue)| {
                let node_status = status.node_status.get(node_id)
                    .cloned()
                    .unwrap_or_default();

                (node_id.clone(), NodeReplicationStatus {
                    queued_operations: queue.len(),
                    last_sync: node_status.last_sync,
                    replication_lag: node_status.replication_lag,
                    is_synced: node_status.is_synced,
                })
            })
            .collect();

        ReplicationStatusSnapshot {
            node_statuses,
            total_operations_processed: status.total_operations_processed,
            total_operations_failed: status.total_operations_failed,
        }
    }

    async fn start_status_monitor(&self, _cluster_state: Arc<ClusterState>) -> Result<(), ReplicationError> {
        let status = Arc::clone(&self.status);
        let queues = Arc::clone(&self.replication_queues);
        let config = self.config.clone();

        tokio::spawn(async move {
            let mut interval = time::interval(config.status_check_interval);
            interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

            loop {
                interval.tick().await;

                let mut status = status.write().await;
                let queues = queues.read().await;

                // Update replication lag from the oldest queued item per node.
                for (node_id, queue) in queues.iter() {
                    if let Some(oldest) = queue.oldest_item() {
                        let lag = oldest.timestamp.elapsed();
                        status.update_node_lag(node_id, lag);
                    }
                }

                // Mark nodes as failed if they have not synced within the timeout.
                let timeout = config.node_failure_timeout;
                let mut failed_nodes = Vec::new();

                for (node_id, node_status) in &status.node_status {
                    if let Some(last_sync) = node_status.last_sync {
                        if last_sync.elapsed() > timeout {
                            failed_nodes.push(node_id.clone());
                        }
                    }
                }

                for node_id in failed_nodes {
                    status.mark_node_failed(&node_id);
                    println!("Node {} marked as failed due to timeout", node_id.0);
                }
            }
        });

        Ok(())
    }

    async fn start_queue_processor(&self, _cluster_state: Arc<ClusterState>) -> Result<(), ReplicationError> {
        let queues = Arc::clone(&self.replication_queues);
        let status = Arc::clone(&self.status);
        let config = self.config.clone();

        tokio::spawn(async move {
            loop {
                let mut queues = queues.write().await;
                let mut status = status.write().await;

                for (node_id, queue) in queues.iter_mut() {
                    while let Some(item) = queue.pop() {
                        match Self::replicate_item_to_node(&item, node_id, &config).await {
                            Ok(_) => {
                                status.record_operation_success(node_id);
                            }
                            Err(e) => {
                                status.record_operation_failure(node_id);
                                println!("Replication failed for node {}: {}", node_id.0, e);

                                // Re-queue the item until it exhausts its retry budget.
                                if item.retries < config.max_retries {
                                    let mut retry_item = item;
                                    retry_item.retries += 1;
                                    queue.push(retry_item);
                                }
                            }
                        }
                    }
                }

                // Release the locks before sleeping so other tasks can enqueue
                // operations and read status between processing passes.
                drop(status);
                drop(queues);

                tokio::time::sleep(config.queue_processing_interval).await;
            }
        });

        Ok(())
    }

    async fn start_sync_scheduler(&self, _cluster_state: Arc<ClusterState>) -> Result<(), ReplicationError> {
        let queues = Arc::clone(&self.replication_queues);
        let config = self.config.clone();

        tokio::spawn(async move {
            let mut interval = time::interval(config.full_sync_interval);
            interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

            loop {
                interval.tick().await;

                let queues = queues.read().await;

                for node_id in queues.keys() {
                    // The sync itself is not performed here yet; it is only logged.
                    println!("Scheduling full sync for node {}", node_id.0);
                }
            }
        });

        Ok(())
    }

    async fn replicate_item_to_node(
        _item: &ReplicationItem,
        node_id: &NodeId,
        config: &ReplicationConfig,
    ) -> Result<(), ReplicationError> {
        // Simulate network latency for the replication call.
        tokio::time::sleep(Duration::from_millis(10)).await;

        // Inject failures at the configured rate to exercise the retry path.
        if rand::random::<f32>() < config.failure_rate {
            return Err(ReplicationError::NetworkError("Simulated failure".to_string()));
        }

        println!("Replicated operation to node {}", node_id.0);
        Ok(())
    }

    async fn redistribute_partition(&self, partition: &PartitionId) -> Result<(), ReplicationError> {
        let new_nodes = self.cluster_state.get_nodes_for_key(&partition.0.to_le_bytes()).await;

        // Placeholder: the new owners are computed and logged, but no data is
        // copied to them here yet.
        println!("Redistributing partition {} to {} nodes", partition.0, new_nodes.len());

        Ok(())
    }

    fn get_operation_key(&self, operation: &Operation) -> Vec<u8> {
        match operation {
            Operation::CreateNode { .. } => {
                // Newly created nodes have no CID yet, so route them by a
                // fixed default key.
                b"default".to_vec()
            }
            Operation::UpdateNode { cid, .. } | Operation::DeleteNode { cid } => {
                cid.as_bytes().to_vec()
            }
            Operation::CreateEdge { source_cid, .. } => {
                source_cid.as_bytes().to_vec()
            }
            Operation::UpdateEdge { cid, .. } | Operation::DeleteEdge { cid } => {
                cid.as_bytes().to_vec()
            }
        }
    }

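    /// Returns a clone of the command-channel sender so other components can
    /// submit [`ReplicationCommand`]s to this manager.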
    pub fn command_sender(&self) -> mpsc::Sender<ReplicationCommand> {
        self.command_tx.clone()
    }
}

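/// Tunable parameters for the replication subsystem.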
#[derive(Debug, Clone)]
pub struct ReplicationConfig {
    /// Number of replicas to maintain for each key.
    pub replication_factor: usize,
    /// Maximum number of times a failed replication is retried.
    pub max_retries: usize,
    /// How often the status monitor recomputes lag and checks for failures.
    pub status_check_interval: Duration,
    /// Delay between queue-processing passes.
    pub queue_processing_interval: Duration,
    /// How often full synchronizations are scheduled.
    pub full_sync_interval: Duration,
    /// How long a node may go without syncing before it is marked failed.
    pub node_failure_timeout: Duration,
    /// Probability of a simulated replication failure (used for testing).
    pub failure_rate: f32,
}

impl Default for ReplicationConfig {
    fn default() -> Self {
        Self {
            replication_factor: 3,
            max_retries: 3,
            status_check_interval: Duration::from_secs(5),
            queue_processing_interval: Duration::from_millis(100),
            full_sync_interval: Duration::from_secs(300),
            node_failure_timeout: Duration::from_secs(30),
            failure_rate: 0.01,
        }
    }
}

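/// FIFO queue of operations waiting to be replicated to a single node.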
#[derive(Debug)]
pub struct ReplicationQueue {
    items: Vec<ReplicationItem>,
}

impl ReplicationQueue {
    pub fn new() -> Self {
        Self {
            items: Vec::new(),
        }
    }

    pub fn push(&mut self, item: ReplicationItem) {
        self.items.push(item);
    }

    pub fn pop(&mut self) -> Option<ReplicationItem> {
        if !self.items.is_empty() {
            Some(self.items.remove(0))
        } else {
            None
        }
    }

    pub fn len(&self) -> usize {
        self.items.len()
    }

    pub fn oldest_item(&self) -> Option<&ReplicationItem> {
        self.items.first()
    }
}

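/// A single queued operation together with its enqueue time and retry count.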
#[derive(Debug, Clone)]
pub struct ReplicationItem {
    pub operation: Operation,
    pub timestamp: Instant,
    pub retries: usize,
}

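/// Shared bookkeeping of per-node replication state, failed nodes, and
/// aggregate operation counters.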
#[derive(Debug)]
pub struct ReplicationStatus {
    pub node_status: HashMap<NodeId, NodeStatus>,
    pub failed_nodes: HashSet<NodeId>,
    pub total_operations_processed: u64,
    pub total_operations_failed: u64,
}

impl ReplicationStatus {
    pub fn new() -> Self {
        Self {
            node_status: HashMap::new(),
            failed_nodes: HashSet::new(),
            total_operations_processed: 0,
            total_operations_failed: 0,
        }
    }

    pub fn update_node_lag(&mut self, node_id: &NodeId, lag: Duration) {
        let status = self.node_status.entry(node_id.clone()).or_default();
        status.replication_lag = lag;
        status.last_sync = Some(Instant::now());
        status.is_synced = lag < Duration::from_secs(5);
    }

    pub fn mark_node_failed(&mut self, node_id: &NodeId) {
        self.failed_nodes.insert(node_id.clone());
    }

    pub fn record_operation_success(&mut self, node_id: &NodeId) {
        let status = self.node_status.entry(node_id.clone()).or_default();
        status.last_sync = Some(Instant::now());
        self.total_operations_processed += 1;
    }

    pub fn record_operation_failure(&mut self, _node_id: &NodeId) {
        self.total_operations_failed += 1;
    }

    pub fn get_average_lag(&self) -> Duration {
        if self.node_status.is_empty() {
            return Duration::from_secs(0);
        }

        let total_lag: Duration = self.node_status.values()
            .map(|status| status.replication_lag)
            .sum();

        total_lag / self.node_status.len() as u32
    }
}

#[derive(Debug, Clone, Default)]
pub struct NodeStatus {
    pub last_sync: Option<Instant>,
    pub replication_lag: Duration,
    pub is_synced: bool,
}

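/// Health summary returned by [`ReplicationManager::check_health`].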
#[derive(Debug, Clone)]
pub struct ReplicationHealth {
    pub total_queued_operations: usize,
    pub failed_nodes_count: usize,
    pub replication_lag: Duration,
    pub is_healthy: bool,
}

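/// Point-in-time snapshot of replication state across all known nodes.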
#[derive(Debug, Clone)]
pub struct ReplicationStatusSnapshot {
    pub node_statuses: HashMap<NodeId, NodeReplicationStatus>,
    pub total_operations_processed: u64,
    pub total_operations_failed: u64,
}

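/// Per-node replication state as reported in a [`ReplicationStatusSnapshot`].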
#[derive(Debug, Clone)]
pub struct NodeReplicationStatus {
    pub queued_operations: usize,
    pub last_sync: Option<Instant>,
    pub replication_lag: Duration,
    pub is_synced: bool,
}

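/// Commands that other components can send to the replication manager.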
#[derive(Debug)]
pub enum ReplicationCommand {
    SyncWithNode { node_id: NodeId },
    RedistributePartition { partition_id: PartitionId },
    CheckHealth,
}

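/// Errors produced by the replication subsystem.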
#[derive(Debug, thiserror::Error)]
pub enum ReplicationError {
    #[error("Network communication error: {0}")]
    NetworkError(String),

    #[error("Serialization error: {0}")]
    Serialization(String),

    #[error("Node failure: {0}")]
    NodeFailure(String),

    #[error("Partition error: {0}")]
    PartitionError(String),

    #[error("Timeout error")]
    Timeout,

    #[error("Configuration error: {0}")]
    ConfigError(String),
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_replication_queue() {
        let mut queue = ReplicationQueue::new();

        let item = ReplicationItem {
            operation: Operation::CreateNode {
                properties: HashMap::new(),
            },
            timestamp: Instant::now(),
            retries: 0,
        };

        queue.push(item);
        assert_eq!(queue.len(), 1);

        let popped = queue.pop();
        assert!(popped.is_some());
        assert_eq!(queue.len(), 0);
    }

    #[test]
    fn test_replication_status() {
        let mut status = ReplicationStatus::new();
        let node_id = NodeId("test-node".to_string());

        status.record_operation_success(&node_id);
        status.record_operation_failure(&node_id);

        assert_eq!(status.total_operations_processed, 1);
        assert_eq!(status.total_operations_failed, 1);
    }

    #[tokio::test]
    async fn test_replication_manager_creation() {
        let cluster_state = Arc::new(ClusterState::new(NodeId("test".to_string())));
        let config = ReplicationConfig::default();

        let manager = ReplicationManager::new(cluster_state, config);

        let health = manager.check_health().await;
        assert!(health.is_healthy);
    }
}