1pub mod config;
17pub mod consensus;
18pub mod fault_tolerance;
19pub mod sharding;
20pub mod transport;
21
22pub use config::*;
24pub use consensus::{
25 ConsensusFactory, ConsensusManager, PbftConsensus, RaftConsensus, SimpleMajorityConsensus,
26};
27pub use fault_tolerance::{
28 FaultRecoveryManager, HealthMonitor, HealthSummary, NodeMetrics, RecoveryAction,
29};
30pub use sharding::{DataShard, ShardManager, ShardMigration, ShardingStats};
31
32#[derive(Debug, Clone, Default, Serialize, Deserialize)]
35pub struct DistributedConfig {
36 pub cluster_size: usize,
37 pub node_id: String,
38 pub timeout_ms: u64,
39}
40
41#[derive(Debug, Clone)]
43pub struct DistributedMetricsBuilder {
44 config: DistributedConfig,
45}
46
47impl DistributedMetricsBuilder {
48 pub fn new(config: DistributedConfig) -> Self {
49 Self { config }
50 }
51}
52
53#[derive(Debug)]
55pub struct DistributedMetricsCoordinator {
56 config: DistributedConfig,
57}
58
59impl DistributedMetricsCoordinator {
60 pub fn new(config: DistributedConfig) -> Self {
61 Self { config }
62 }
63}
64
65use crate::error::{MetricsError, Result};
66use serde::{Deserialize, Serialize};
67use std::collections::HashMap;
68use std::sync::{Arc, RwLock};
69use std::time::{Duration, Instant, SystemTime};
70
71pub struct AdvancedDistributedCoordinator {
73 config: AdvancedClusterConfig,
75 #[allow(dead_code)]
77 consensus: Option<Box<dyn ConsensusManager>>,
78 shard_manager: ShardManager,
80 fault_manager: FaultRecoveryManager,
82 cluster_state: Arc<RwLock<ClusterState>>,
84 performance_metrics: Arc<RwLock<ClusterPerformanceMetrics>>,
86 status: CoordinatorStatus,
88}
89
90impl AdvancedDistributedCoordinator {
91 pub fn new(config: AdvancedClusterConfig) -> Result<Self> {
93 let shard_manager = ShardManager::new(config.sharding_config.clone());
95
96 let fault_manager = FaultRecoveryManager::new(config.fault_tolerance.clone());
98
99 let cluster_state = Arc::new(RwLock::new(ClusterState::new()));
101
102 let performance_metrics = Arc::new(RwLock::new(ClusterPerformanceMetrics::new()));
104
105 Ok(Self {
106 config,
107 consensus: None,
108 shard_manager,
109 fault_manager,
110 cluster_state,
111 performance_metrics,
112 status: CoordinatorStatus::Stopped,
113 })
114 }
115
116 pub fn start(&mut self, node_id: String, peers: Vec<String>) -> Result<()> {
118 if self.config.consensus_config.algorithm != ConsensusAlgorithm::None {
120 let consensus = ConsensusFactory::create_consensus(
121 self.config.consensus_config.algorithm.clone(),
122 node_id.clone(),
123 peers.clone(),
124 self.config.consensus_config.clone(),
125 )?;
126 self.consensus = Some(consensus);
127
128 if let Some(ref mut consensus) = self.consensus {
129 consensus.start()?;
130 }
131 }
132
133 self.shard_manager.initialize(peers.clone())?;
135
136 self.fault_manager.start()?;
138
139 {
141 let mut state = self.cluster_state.write().expect("Operation failed");
142 state.local_node_id = node_id;
143 state.cluster_size = peers.len() + 1; state.status = ClusterStatus::Active;
145 state.last_updated = SystemTime::now();
146 }
147
148 self.status = CoordinatorStatus::Running;
149
150 Ok(())
151 }
152
153 pub fn stop(&mut self) -> Result<()> {
155 self.fault_manager.stop()?;
157
158 self.status = CoordinatorStatus::Stopped;
160
161 {
163 let mut state = self.cluster_state.write().expect("Operation failed");
164 state.status = ClusterStatus::Stopped;
165 state.last_updated = SystemTime::now();
166 }
167
168 Ok(())
169 }
170
171 pub fn submit_consensus(&mut self, data: Vec<u8>) -> Result<String> {
173 if let Some(ref mut consensus) = self.consensus {
174 consensus.propose(data)
175 } else {
176 Err(MetricsError::ConsensusError(
177 "Consensus not initialized".to_string(),
178 ))
179 }
180 }
181
182 pub fn get_consensus_state(&self) -> Option<consensus::ConsensusState> {
184 self.consensus.as_ref().map(|c| c.get_state())
185 }
186
187 pub fn find_shard(&self, key: &str) -> Result<String> {
189 self.shard_manager.find_shard(key)
190 }
191
192 pub fn get_node_for_key(&self, key: &str) -> Result<String> {
194 self.shard_manager.get_node_for_key(key)
195 }
196
197 pub fn add_node(&mut self, node_id: String) -> Result<()> {
199 self.shard_manager.add_node(node_id.clone())?;
201
202 let metrics = NodeMetrics::healthy();
204 self.fault_manager.register_node(node_id.clone(), metrics)?;
205
206 {
208 let mut state = self.cluster_state.write().expect("Operation failed");
209 state.cluster_size += 1;
210 state.last_updated = SystemTime::now();
211 }
212
213 Ok(())
214 }
215
216 pub fn remove_node(&mut self, node_id: &str) -> Result<()> {
218 self.shard_manager.remove_node(node_id)?;
220
221 self.fault_manager.unregister_node(node_id)?;
223
224 {
226 let mut state = self.cluster_state.write().expect("Operation failed");
227 state.cluster_size = state.cluster_size.saturating_sub(1);
228 state.last_updated = SystemTime::now();
229 }
230
231 Ok(())
232 }
233
234 pub fn update_node_metrics(&mut self, node_id: &str, metrics: NodeMetrics) -> Result<()> {
236 self.fault_manager.update_node_metrics(node_id, metrics)
237 }
238
239 pub fn get_health_summary(&self) -> HealthSummary {
241 self.fault_manager.get_health_summary()
242 }
243
244 pub fn get_sharding_stats(&self) -> ShardingStats {
246 self.shard_manager.get_stats()
247 }
248
249 pub fn get_cluster_state(&self) -> ClusterState {
251 let state = self.cluster_state.read().expect("Operation failed");
252 state.clone()
253 }
254
255 pub fn get_performance_metrics(&self) -> ClusterPerformanceMetrics {
257 let metrics = self.performance_metrics.read().expect("Operation failed");
258 metrics.clone()
259 }
260
261 pub fn get_status(&self) -> CoordinatorStatus {
263 self.status.clone()
264 }
265
266 pub fn migrate_shard(&mut self, shard_id: &str, target_node: Option<String>) -> Result<String> {
268 self.shard_manager.migrate_shard(shard_id, target_node)
269 }
270
271 pub fn process_recovery_actions(&mut self) -> Result<Vec<RecoveryAction>> {
273 Ok(self.fault_manager.get_recovery_history())
274 }
275
276 pub fn update_performance_metrics(&mut self, metrics: ClusterPerformanceMetrics) {
278 let mut perf_metrics = self.performance_metrics.write().expect("Operation failed");
279 *perf_metrics = metrics;
280 }
281
282 pub fn get_active_recoveries(&self) -> Vec<fault_tolerance::RecoveryOperation> {
284 self.fault_manager.get_active_recoveries()
285 }
286
287 pub fn list_shards(&self) -> Vec<DataShard> {
289 self.shard_manager.list_shards()
290 }
291
292 pub fn get_shard(&self, shard_id: &str) -> Option<DataShard> {
294 self.shard_manager.get_shard(shard_id)
295 }
296
297 pub fn update_shard_stats(
299 &mut self,
300 shard_id: &str,
301 size_bytes: u64,
302 key_count: usize,
303 ) -> Result<()> {
304 self.shard_manager
305 .update_shard_stats(shard_id, size_bytes, key_count)
306 }
307}
308
/// Lifecycle status of a coordinator.
///
/// Equality is total (every variant compares by value), so the type derives
/// [`Eq`] in addition to [`PartialEq`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CoordinatorStatus {
    /// Not running.
    Stopped,
    /// Start-up in progress.
    Starting,
    /// Running.
    Running,
    /// Shutdown in progress.
    Stopping,
    /// Failed; the payload is a free-form description.
    Error(String),
}
323
324#[derive(Debug, Clone, Serialize, Deserialize)]
326pub struct ClusterState {
327 pub local_node_id: String,
329 pub cluster_size: usize,
331 pub status: ClusterStatus,
333 pub last_updated: SystemTime,
335 pub nodes: HashMap<String, NodeInfo>,
337 pub active_tasks: usize,
339 pub config_version: u64,
341}
342
343impl ClusterState {
344 pub fn new() -> Self {
346 Self {
347 local_node_id: String::new(),
348 cluster_size: 0,
349 status: ClusterStatus::Stopped,
350 last_updated: SystemTime::now(),
351 nodes: HashMap::new(),
352 active_tasks: 0,
353 config_version: 1,
354 }
355 }
356
357 pub fn add_node(&mut self, node_id: String, info: NodeInfo) {
359 self.nodes.insert(node_id, info);
360 self.last_updated = SystemTime::now();
361 }
362
363 pub fn remove_node(&mut self, node_id: &str) {
365 self.nodes.remove(node_id);
366 self.last_updated = SystemTime::now();
367 }
368
369 pub fn update_node(&mut self, node_id: &str, info: NodeInfo) {
371 if self.nodes.contains_key(node_id) {
372 self.nodes.insert(node_id.to_string(), info);
373 self.last_updated = SystemTime::now();
374 }
375 }
376
377 pub fn healthy_node_count(&self) -> usize {
379 self.nodes
380 .values()
381 .filter(|node| node.status == NodeStatus::Healthy)
382 .count()
383 }
384}
385
386impl Default for ClusterState {
387 fn default() -> Self {
388 Self::new()
389 }
390}
391
392#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
394pub enum ClusterStatus {
395 Stopped,
397 Starting,
399 Active,
401 Degraded,
403 Failed,
405 Maintenance,
407}
408
409#[derive(Debug, Clone, Serialize, Deserialize)]
411pub struct NodeInfo {
412 pub id: String,
414 pub address: Option<String>,
416 pub status: NodeStatus,
418 pub role: NodeRole,
420 pub resources: ResourceInfo,
422 pub last_seen: SystemTime,
424 pub metadata: HashMap<String, String>,
426}
427
428#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
430pub enum NodeStatus {
431 Healthy,
433 Degraded,
435 Failed,
437 Unknown,
439 Maintenance,
441}
442
443#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
445pub enum NodeRole {
446 Master,
448 Worker,
450 Standby,
452 Storage,
454 Compute,
456 Gateway,
458 Mixed(Vec<String>),
460}
461
462#[derive(Debug, Clone, Serialize, Deserialize)]
464pub struct ResourceInfo {
465 pub cpu_cores: f64,
467 pub memory_gb: f64,
469 pub storage_gb: f64,
471 pub network_gbps: f64,
473 pub gpu_info: Option<GpuInfo>,
475 pub custom_resources: HashMap<String, f64>,
477}
478
479#[derive(Debug, Clone, Serialize, Deserialize)]
481pub struct GpuInfo {
482 pub model: String,
484 pub memory_gb: f64,
486 pub cores: usize,
488 pub utilization: f64,
490}
491
492#[derive(Debug, Clone, Serialize, Deserialize)]
494pub struct ClusterPerformanceMetrics {
495 pub throughput: f64,
497 pub latency_ms: f64,
499 pub error_rate: f64,
501 pub resource_utilization: ResourceUtilization,
503 pub network_stats: NetworkStats,
505 pub storage_stats: StorageStats,
507 pub last_updated: SystemTime,
509}
510
511impl ClusterPerformanceMetrics {
512 pub fn new() -> Self {
514 Self {
515 throughput: 0.0,
516 latency_ms: 0.0,
517 error_rate: 0.0,
518 resource_utilization: ResourceUtilization::default(),
519 network_stats: NetworkStats::default(),
520 storage_stats: StorageStats::default(),
521 last_updated: SystemTime::now(),
522 }
523 }
524}
525
526impl Default for ClusterPerformanceMetrics {
527 fn default() -> Self {
528 Self::new()
529 }
530}
531
532#[derive(Debug, Clone, Serialize, Deserialize)]
534pub struct ResourceUtilization {
535 pub cpu_percent: f64,
537 pub memory_percent: f64,
539 pub storage_percent: f64,
541 pub network_percent: f64,
543 pub gpu_percent: Option<f64>,
545}
546
547impl Default for ResourceUtilization {
548 fn default() -> Self {
549 Self {
550 cpu_percent: 0.0,
551 memory_percent: 0.0,
552 storage_percent: 0.0,
553 network_percent: 0.0,
554 gpu_percent: None,
555 }
556 }
557}
558
559#[derive(Debug, Clone, Serialize, Deserialize)]
561pub struct NetworkStats {
562 pub bytes_sent: u64,
564 pub bytes_received: u64,
566 pub packets_sent: u64,
568 pub packets_received: u64,
570 pub errors: u64,
572 pub bandwidth_mbps: f64,
574}
575
576impl Default for NetworkStats {
577 fn default() -> Self {
578 Self {
579 bytes_sent: 0,
580 bytes_received: 0,
581 packets_sent: 0,
582 packets_received: 0,
583 errors: 0,
584 bandwidth_mbps: 0.0,
585 }
586 }
587}
588
589#[derive(Debug, Clone, Serialize, Deserialize)]
591pub struct StorageStats {
592 pub reads: u64,
594 pub writes: u64,
596 pub bytes_read: u64,
598 pub bytes_written: u64,
600 pub errors: u64,
602 pub iops: f64,
604}
605
606impl Default for StorageStats {
607 fn default() -> Self {
608 Self {
609 reads: 0,
610 writes: 0,
611 bytes_read: 0,
612 bytes_written: 0,
613 errors: 0,
614 iops: 0.0,
615 }
616 }
617}
618
619mod duration_serde {
621 use serde::{Deserialize, Deserializer, Serializer};
622 use std::time::Duration;
623
624 pub fn serialize<S>(duration: &Duration, serializer: S) -> std::result::Result<S::Ok, S::Error>
625 where
626 S: Serializer,
627 {
628 serializer.serialize_u64(duration.as_millis() as u64)
629 }
630
631 pub fn deserialize<'de, D>(deserializer: D) -> std::result::Result<Duration, D::Error>
632 where
633 D: Deserializer<'de>,
634 {
635 let millis = u64::deserialize(deserializer)?;
636 Ok(Duration::from_millis(millis))
637 }
638}
639
/// Placeholder metrics collector; it currently carries no state.
///
/// [`Default`] is derived and produces the same value as
/// [`MetricsCollector::new`].
#[derive(Debug, Clone, Default)]
pub struct MetricsCollector {}

impl MetricsCollector {
    /// Creates an empty collector.
    pub fn new() -> Self {
        Self {}
    }
}
657
#[cfg(test)]
mod tests {
    use super::*;

    /// The default advanced configuration has a non-zero quorum, at least one
    /// shard, and optimization enabled.
    #[test]
    fn test_advanced_cluster_config_creation() {
        let defaults = AdvancedClusterConfig::default();

        assert!(defaults.consensus_config.quorum_size > 0);
        assert!(defaults.sharding_config.shard_count > 0);
        assert!(defaults.optimization_config.enabled);
    }

    /// A coordinator can be built from the default configuration and starts
    /// out stopped.
    #[test]
    fn test_distributed_coordinator_creation() {
        let built = AdvancedDistributedCoordinator::new(AdvancedClusterConfig::default());
        assert!(built.is_ok());

        let coordinator = built.expect("Operation failed");
        assert_eq!(coordinator.get_status(), CoordinatorStatus::Stopped);
    }

    /// Adding a healthy node is reflected in the node map and the healthy
    /// count.
    #[test]
    fn test_cluster_state_operations() {
        let mut state = ClusterState::new();
        assert_eq!(state.cluster_size, 0);

        let resources = ResourceInfo {
            cpu_cores: 4.0,
            memory_gb: 16.0,
            storage_gb: 100.0,
            network_gbps: 1.0,
            gpu_info: None,
            custom_resources: HashMap::new(),
        };
        let node_info = NodeInfo {
            id: "node1".to_string(),
            address: Some("localhost:8080".to_string()),
            status: NodeStatus::Healthy,
            role: NodeRole::Worker,
            resources,
            last_seen: SystemTime::now(),
            metadata: HashMap::new(),
        };

        state.add_node("node1".to_string(), node_info);

        assert_eq!(state.nodes.len(), 1);
        assert_eq!(state.healthy_node_count(), 1);
    }

    /// Fresh performance metrics start at zero.
    #[test]
    fn test_cluster_performance_metrics() {
        let fresh = ClusterPerformanceMetrics::new();

        assert_eq!(fresh.throughput, 0.0);
        assert_eq!(fresh.error_rate, 0.0);
    }

    /// Starting moves the coordinator to `Running`; stopping returns it to
    /// `Stopped`.
    #[test]
    fn test_coordinator_start_stop() {
        let mut coordinator =
            AdvancedDistributedCoordinator::new(AdvancedClusterConfig::default())
                .expect("Operation failed");

        let peers = vec!["node1".to_string(), "node2".to_string()];
        coordinator
            .start("node0".to_string(), peers)
            .expect("Operation failed");
        assert_eq!(coordinator.get_status(), CoordinatorStatus::Running);

        coordinator.stop().expect("Operation failed");
        assert_eq!(coordinator.get_status(), CoordinatorStatus::Stopped);
    }
}