1use crate::{ConversionRequest, ConversionResult, Error, Result};
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10use std::sync::{Arc, RwLock};
11use std::time::{Duration, Instant, SystemTime};
12use tokio::sync::{mpsc, RwLock as AsyncRwLock, Semaphore};
13
/// Configuration governing cluster auto-scaling, load balancing, and
/// distributed request handling.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CloudScalingConfig {
    /// Upper bound on the number of nodes the cluster may scale up to.
    pub max_nodes: usize,
    /// Lower bound on cluster size; scale-down decisions never go below this.
    pub min_nodes: usize,
    /// Target CPU utilization as a fraction (0.0–1.0); exceeding it triggers scale-up.
    pub target_cpu_utilization: f32,
    /// Target memory utilization as a fraction (0.0–1.0); exceeding it triggers scale-up.
    pub target_memory_utilization: f32,
    /// Minimum time between consecutive scaling actions.
    pub scaling_cooldown: Duration,
    /// Strategy used to pick a node for each incoming request.
    pub load_balancing_strategy: LoadBalancingStrategy,
    /// When false, background scaling/health tasks are not started.
    pub auto_scaling_enabled: bool,
    /// Interval between periodic node health checks.
    pub health_check_interval: Duration,
    /// Per-request timeout budget carried in dispatch bookkeeping.
    pub request_timeout: Duration,
    /// Maximum per-node queue depth used to normalize queue pressure in scoring.
    pub max_queue_size: usize,
    /// Retry/backoff policy for failed requests.
    pub retry_config: RetryConfig,
}
40
/// Node-selection strategies supported by the controller.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum LoadBalancingStrategy {
    /// Cycle through healthy nodes in order.
    RoundRobin,
    /// Pick the node with the fewest active connections.
    LeastConnections,
    /// Randomized selection weighted by node capacity.
    WeightedRoundRobin,
    /// Pick the node with the lowest average CPU/memory load.
    LoadBased,
    /// Prefer nodes in the request's preferred region, falling back to load-based.
    Geographic,
    /// Composite score of load, capacity, queue pressure, and priority fit.
    Custom,
}
57
/// Retry policy with exponential backoff for failed distributed requests.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryConfig {
    /// Maximum number of retry attempts before giving up.
    pub max_retries: u32,
    /// Delay before the first retry.
    pub base_delay: Duration,
    /// Ceiling applied to the backoff delay.
    pub max_delay: Duration,
    /// Multiplier applied to the delay after each attempt.
    pub backoff_multiplier: f32,
    /// Whether random jitter is added to backoff delays.
    pub jitter_enabled: bool,
}
72
/// A single worker node registered with the scaling controller.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CloudNode {
    /// Unique node identifier used as the routing key.
    pub id: String,
    /// Network endpoint of the node (e.g. host:port).
    pub endpoint: String,
    /// Cloud region the node lives in; used for geographic routing.
    pub region: String,
    /// Availability zone within the region.
    pub availability_zone: String,
    /// Relative capacity weight used by weighted round-robin and custom scoring.
    pub capacity: f32,
    /// Most recently observed resource utilization.
    pub resource_usage: NodeResourceUsage,
    /// Current health status; only `Healthy` nodes receive traffic.
    pub status: NodeStatus,
    /// Timestamp of the last completed health check.
    pub last_health_check: SystemTime,
    /// Static capabilities advertised by the node.
    pub capabilities: NodeCapabilities,
    /// Number of requests currently queued on the node.
    pub queue_size: usize,
    /// Number of in-flight requests; drives least-connections routing.
    pub active_connections: usize,
}
99
/// Snapshot of a node's resource utilization; all values are fractions (0.0–1.0).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeResourceUsage {
    /// CPU utilization fraction.
    pub cpu_utilization: f32,
    /// Memory utilization fraction.
    pub memory_utilization: f32,
    /// Network bandwidth utilization fraction.
    pub network_utilization: f32,
    /// Storage utilization fraction.
    pub storage_utilization: f32,
    /// GPU utilization fraction, if the node has a GPU.
    pub gpu_utilization: Option<f32>,
}
114
/// Health state of a node as determined by periodic health checks.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
pub enum NodeStatus {
    /// Node is responsive and within resource limits; eligible for traffic.
    Healthy,
    /// Node is responsive but under resource pressure (CPU > 90% or memory > 95%).
    Degraded,
    /// Node failed its health check (slow or unresponsive).
    Unhealthy,
    /// Node is being drained prior to removal.
    Draining,
    /// Node is not reachable.
    Offline,
}
129
/// Static capabilities advertised by a node, used for routing decisions.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeCapabilities {
    /// Model identifiers this node can serve.
    pub supported_models: Vec<String>,
    /// Whether the node has GPU acceleration available.
    pub gpu_acceleration: bool,
    /// Whether the node supports realtime processing; boosts its score
    /// for `Critical`-priority requests under the custom strategy.
    pub realtime_processing: bool,
    /// Whether the node supports batch processing.
    pub batch_processing: bool,
    /// Maximum number of requests the node can serve concurrently.
    pub max_concurrent_requests: usize,
    /// Installed memory in gigabytes.
    pub memory_capacity_gb: f32,
    /// Number of CPU cores.
    pub cpu_cores: usize,
}
148
/// A conversion request wrapped with routing metadata for distributed dispatch.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DistributedConversionRequest {
    /// The underlying conversion request payload.
    pub request: ConversionRequest,
    /// Unique identifier used for retry tracking and result correlation.
    pub request_id: String,
    /// Scheduling priority; affects node scoring and processing latency.
    pub priority: RequestPriority,
    /// Preferred region for geographic routing, if any.
    pub geographic_preference: Option<String>,
    /// Capability names the serving node must provide.
    pub required_capabilities: Vec<String>,
    /// Per-request timeout budget.
    pub timeout: Duration,
    /// Optional identifier of the submitting client.
    pub client_id: Option<String>,
}
167
/// Request priority levels; the derived ordering makes higher discriminants
/// compare greater (`Critical > High > Normal > Low`).
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum RequestPriority {
    Low = 1,
    Normal = 2,
    High = 3,
    Critical = 4,
}
180
/// Result of a distributed conversion, including dispatch bookkeeping.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DistributedConversionResult {
    /// The conversion output produced by the serving node.
    pub result: ConversionResult,
    /// Identifier of the originating request.
    pub request_id: String,
    /// Id of the node that processed the request.
    pub processing_node: String,
    /// Wall-clock processing time on the node, in milliseconds.
    pub processing_time_ms: u64,
    /// Time spent before processing began (node selection/queueing), in milliseconds.
    pub queue_time_ms: u64,
    /// Number of retries performed for this request.
    pub retry_count: u32,
}
197
/// Aggregated cluster-wide metrics used as input to scaling decisions.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClusterMetrics {
    /// Total number of registered nodes.
    pub total_nodes: usize,
    /// Number of nodes currently reporting `Healthy`.
    pub healthy_nodes: usize,
    /// Mean CPU utilization across all nodes (0.0–1.0).
    pub avg_cpu_utilization: f32,
    /// Mean memory utilization across all nodes (0.0–1.0).
    pub avg_memory_utilization: f32,
    /// Cluster request throughput; currently a placeholder (always 0.0).
    pub requests_per_second: f32,
    /// Mean response time; currently a placeholder (always 0.0).
    pub avg_response_time_ms: f32,
    /// Sum of queue depths across all nodes.
    pub total_queue_depth: usize,
    /// Error rate; currently a placeholder (always 0.0).
    pub error_rate_percent: f32,
    /// Timestamp of the most recent scaling decision, if any.
    pub last_scaling_action: Option<SystemTime>,
}
220
/// Record of one auto-scaling evaluation: the chosen action and why.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ScalingDecision {
    /// The scaling action to take (scale up/down or nothing).
    pub action: ScalingAction,
    /// Human-readable justification for the action.
    pub reason: String,
    /// Cluster size the action aims for.
    pub target_nodes: usize,
    /// Snapshot of the metrics that triggered this decision.
    pub triggering_metrics: ClusterMetrics,
    /// When the decision was made.
    pub timestamp: SystemTime,
}
235
/// Concrete action produced by a scaling evaluation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ScalingAction {
    /// Add capacity.
    ScaleUp {
        /// Number of nodes to provision.
        nodes_to_add: usize,
        /// Regions to prefer when placing new nodes.
        preferred_regions: Vec<String>,
    },
    /// Remove capacity.
    ScaleDown {
        /// Number of nodes to decommission.
        nodes_to_remove: usize,
        /// Specific node ids to drain first; may be empty (selection deferred).
        nodes_to_drain: Vec<String>,
    },
    /// Leave the cluster unchanged.
    NoAction,
}
256
/// Central controller coordinating node registry, load balancing, health
/// monitoring, and auto-scaling for a cluster of conversion nodes.
///
/// Shared state uses `std::sync::RwLock` for lock-free-of-await sections and
/// `tokio::sync::RwLock` where guards are held across `.await` points.
pub struct CloudScalingController {
    /// Static configuration (thresholds, strategy, intervals).
    config: CloudScalingConfig,
    /// Registry of known nodes keyed by node id.
    nodes: Arc<RwLock<HashMap<String, CloudNode>>>,
    /// Mutable routing bookkeeping (round-robin counter, region index).
    routing_state: Arc<RwLock<RoutingState>>,
    /// Cached cluster metrics. NOTE(review): not read/written in this module —
    /// presumably maintained elsewhere or reserved for future use; verify.
    metrics: Arc<RwLock<ClusterMetrics>>,
    /// Auto-scaling state (cooldown clock, decision history).
    auto_scaler: Arc<AsyncRwLock<AutoScaler>>,
    /// Per-node health-check history.
    health_monitor: Arc<AsyncRwLock<HealthMonitor>>,
    /// Pending-request queues and retry tracking.
    request_dispatcher: RequestDispatcher,
}
274
/// Mutable bookkeeping shared by the node-selection strategies.
#[derive(Debug)]
struct RoutingState {
    /// Monotonic counter (mod healthy-node count) for round-robin selection.
    round_robin_counter: usize,
    /// History of selected node ids. NOTE(review): never appended to in this
    /// module — confirm whether it is still needed.
    selection_history: Vec<String>,
    /// Region name -> ids of nodes in that region; maintained by
    /// `add_node`/`remove_node`.
    geographic_nodes: HashMap<String, Vec<String>>,
}
285
/// Auto-scaling state guarded by an async lock.
struct AutoScaler {
    /// When the last non-`NoAction` scaling decision was applied; enforces cooldown.
    last_scaling_time: Instant,
    /// Chronological log of all scaling decisions (including `NoAction`).
    scaling_history: Vec<ScalingDecision>,
    /// Aggressiveness profile. NOTE(review): set to `Balanced` at construction
    /// but never read in this module — confirm intended use.
    scaling_mode: AutoScalingMode,
}
295
/// Aggressiveness profile for the auto-scaler.
#[derive(Debug, Clone)]
enum AutoScalingMode {
    /// Scale cautiously.
    Conservative,
    /// Default middle-ground behavior.
    Balanced,
    /// Scale eagerly.
    Aggressive,
    /// Fully caller-specified thresholds.
    Custom {
        /// Utilization fraction above which to scale up.
        scale_up_threshold: f32,
        /// Utilization fraction below which to scale down.
        scale_down_threshold: f32,
        /// Factor applied to the configured cooldown.
        cooldown_multiplier: f32,
    },
}
312
/// Accumulated health-check results, keyed by node id.
struct HealthMonitor {
    /// Per-node chronological list of health-check results.
    health_history: HashMap<String, Vec<HealthCheckResult>>,
    /// When the last full health-check sweep completed.
    last_check_time: Instant,
}
320
/// Outcome of a single health probe against one node.
#[derive(Debug, Clone)]
struct HealthCheckResult {
    /// Id of the probed node.
    node_id: String,
    /// Status derived from response time and resource pressure.
    status: NodeStatus,
    /// Probe round-trip time in milliseconds.
    response_time_ms: u32,
    /// Resource usage snapshot taken at check time.
    resource_usage: NodeResourceUsage,
    /// When the check completed.
    timestamp: Instant,
}
335
/// Request queuing and retry bookkeeping for the controller.
struct RequestDispatcher {
    /// Pending requests bucketed by priority. NOTE(review): not consumed in
    /// this module — confirm whether queue-based dispatch is implemented elsewhere.
    pending_requests: Arc<RwLock<HashMap<RequestPriority, Vec<DistributedConversionRequest>>>>,
    /// Request id -> dispatch start time, for timeout enforcement.
    timeout_tracker: Arc<RwLock<HashMap<String, Instant>>>,
    /// Retry counters and failed-request backlog.
    retry_manager: RetryManager,
}
345
/// Tracks retry attempts per request and collects requests that exhausted retries.
struct RetryManager {
    /// Request id -> number of retries performed so far; read by
    /// `process_request` to report `retry_count`.
    retry_attempts: Arc<RwLock<HashMap<String, u32>>>,
    /// Requests that ultimately failed. NOTE(review): never written in this
    /// module — verify the producer.
    failed_requests: Arc<RwLock<Vec<DistributedConversionRequest>>>,
}
353
354impl Default for CloudScalingConfig {
355 fn default() -> Self {
356 Self {
357 max_nodes: 100,
358 min_nodes: 2,
359 target_cpu_utilization: 0.7,
360 target_memory_utilization: 0.8,
361 scaling_cooldown: Duration::from_secs(300), load_balancing_strategy: LoadBalancingStrategy::LoadBased,
363 auto_scaling_enabled: true,
364 health_check_interval: Duration::from_secs(30),
365 request_timeout: Duration::from_secs(60),
366 max_queue_size: 1000,
367 retry_config: RetryConfig::default(),
368 }
369 }
370}
371
372impl Default for RetryConfig {
373 fn default() -> Self {
374 Self {
375 max_retries: 3,
376 base_delay: Duration::from_millis(100),
377 max_delay: Duration::from_secs(5),
378 backoff_multiplier: 2.0,
379 jitter_enabled: true,
380 }
381 }
382}
383
384impl CloudScalingController {
385 pub fn new(config: CloudScalingConfig) -> Self {
387 Self {
388 config: config.clone(),
389 nodes: Arc::new(RwLock::new(HashMap::new())),
390 routing_state: Arc::new(RwLock::new(RoutingState {
391 round_robin_counter: 0,
392 selection_history: Vec::new(),
393 geographic_nodes: HashMap::new(),
394 })),
395 metrics: Arc::new(RwLock::new(ClusterMetrics::default())),
396 auto_scaler: Arc::new(AsyncRwLock::new(AutoScaler {
397 last_scaling_time: Instant::now(),
398 scaling_history: Vec::new(),
399 scaling_mode: AutoScalingMode::Balanced,
400 })),
401 health_monitor: Arc::new(AsyncRwLock::new(HealthMonitor {
402 health_history: HashMap::new(),
403 last_check_time: Instant::now(),
404 })),
405 request_dispatcher: RequestDispatcher {
406 pending_requests: Arc::new(RwLock::new(HashMap::new())),
407 timeout_tracker: Arc::new(RwLock::new(HashMap::new())),
408 retry_manager: RetryManager {
409 retry_attempts: Arc::new(RwLock::new(HashMap::new())),
410 failed_requests: Arc::new(RwLock::new(Vec::new())),
411 },
412 },
413 }
414 }
415
416 pub async fn add_node(&self, node: CloudNode) -> Result<()> {
418 {
420 let mut nodes = self
421 .nodes
422 .write()
423 .map_err(|_| Error::runtime("Failed to acquire write lock on nodes".to_string()))?;
424
425 {
427 let mut routing_state = self.routing_state.write().map_err(|_| {
428 Error::runtime("Failed to acquire write lock on routing state".to_string())
429 })?;
430
431 routing_state
432 .geographic_nodes
433 .entry(node.region.clone())
434 .or_default()
435 .push(node.id.clone());
436 }
437
438 nodes.insert(node.id.clone(), node);
439 }
441
442 self.perform_health_check().await?;
444
445 Ok(())
446 }
447
448 pub async fn remove_node(&self, node_id: &str) -> Result<()> {
450 let mut nodes = self
451 .nodes
452 .write()
453 .map_err(|_| Error::runtime("Failed to acquire write lock on nodes".to_string()))?;
454
455 if let Some(node) = nodes.remove(node_id) {
456 let mut routing_state = self.routing_state.write().map_err(|_| {
458 Error::runtime("Failed to acquire write lock on routing state".to_string())
459 })?;
460
461 if let Some(region_nodes) = routing_state.geographic_nodes.get_mut(&node.region) {
462 region_nodes.retain(|id| id != node_id);
463 if region_nodes.is_empty() {
464 routing_state.geographic_nodes.remove(&node.region);
465 }
466 }
467 }
468
469 Ok(())
470 }
471
472 pub async fn process_request(
474 &self,
475 request: DistributedConversionRequest,
476 ) -> Result<DistributedConversionResult> {
477 let start_time = Instant::now();
478
479 let selected_node = self.select_node(&request).await?;
481
482 let queue_time = start_time.elapsed();
483
484 let processing_start = Instant::now();
486 let result = self.execute_on_node(&selected_node, &request).await?;
487 let processing_time = processing_start.elapsed();
488
489 let retry_count = {
491 let retry_attempts = self
492 .request_dispatcher
493 .retry_manager
494 .retry_attempts
495 .read()
496 .map_err(|_| {
497 Error::runtime("Failed to acquire read lock on retry attempts".to_string())
498 })?;
499 retry_attempts
500 .get(&request.request_id)
501 .copied()
502 .unwrap_or(0)
503 };
504
505 Ok(DistributedConversionResult {
506 result,
507 request_id: request.request_id,
508 processing_node: selected_node,
509 processing_time_ms: processing_time.as_millis() as u64,
510 queue_time_ms: queue_time.as_millis() as u64,
511 retry_count,
512 })
513 }
514
515 async fn select_node(&self, request: &DistributedConversionRequest) -> Result<String> {
517 let nodes = self
518 .nodes
519 .read()
520 .map_err(|_| Error::runtime("Failed to acquire read lock on nodes".to_string()))?;
521
522 let healthy_nodes: Vec<&CloudNode> = nodes
523 .values()
524 .filter(|node| node.status == NodeStatus::Healthy)
525 .collect();
526
527 if healthy_nodes.is_empty() {
528 return Err(Error::runtime("No healthy nodes available".to_string()));
529 }
530
531 match self.config.load_balancing_strategy {
532 LoadBalancingStrategy::RoundRobin => self.select_round_robin(&healthy_nodes),
533 LoadBalancingStrategy::LeastConnections => {
534 self.select_least_connections(&healthy_nodes)
535 }
536 LoadBalancingStrategy::WeightedRoundRobin => {
537 self.select_weighted_round_robin(&healthy_nodes)
538 }
539 LoadBalancingStrategy::LoadBased => self.select_load_based(&healthy_nodes),
540 LoadBalancingStrategy::Geographic => self.select_geographic(&healthy_nodes, request),
541 LoadBalancingStrategy::Custom => self.select_custom(&healthy_nodes, request),
542 }
543 }
544
545 fn select_round_robin(&self, nodes: &[&CloudNode]) -> Result<String> {
547 let mut routing_state = self.routing_state.write().map_err(|_| {
548 Error::runtime("Failed to acquire write lock on routing state".to_string())
549 })?;
550
551 let index = routing_state.round_robin_counter % nodes.len();
552 routing_state.round_robin_counter = (routing_state.round_robin_counter + 1) % nodes.len();
553
554 Ok(nodes[index].id.clone())
555 }
556
557 fn select_least_connections(&self, nodes: &[&CloudNode]) -> Result<String> {
559 let min_connections_node = nodes
560 .iter()
561 .min_by_key(|node| node.active_connections)
562 .ok_or_else(|| Error::runtime("No nodes available for selection".to_string()))?;
563
564 Ok(min_connections_node.id.clone())
565 }
566
567 fn select_weighted_round_robin(&self, nodes: &[&CloudNode]) -> Result<String> {
569 let total_capacity: f32 = nodes.iter().map(|node| node.capacity).sum();
570 let mut cumulative_weight = 0.0;
571 let target_weight = fastrand::f32() * total_capacity;
572
573 for node in nodes {
574 cumulative_weight += node.capacity;
575 if cumulative_weight >= target_weight {
576 return Ok(node.id.clone());
577 }
578 }
579
580 Ok(nodes[0].id.clone())
582 }
583
584 fn select_load_based(&self, nodes: &[&CloudNode]) -> Result<String> {
586 let best_node = nodes
587 .iter()
588 .min_by(|a, b| {
589 let load_a =
590 (a.resource_usage.cpu_utilization + a.resource_usage.memory_utilization) / 2.0;
591 let load_b =
592 (b.resource_usage.cpu_utilization + b.resource_usage.memory_utilization) / 2.0;
593 load_a
594 .partial_cmp(&load_b)
595 .unwrap_or(std::cmp::Ordering::Equal)
596 })
597 .ok_or_else(|| Error::runtime("No nodes available for selection".to_string()))?;
598
599 Ok(best_node.id.clone())
600 }
601
602 fn select_geographic(
604 &self,
605 nodes: &[&CloudNode],
606 request: &DistributedConversionRequest,
607 ) -> Result<String> {
608 if let Some(preferred_region) = &request.geographic_preference {
609 let region_nodes: Vec<&CloudNode> = nodes
610 .iter()
611 .filter(|node| &node.region == preferred_region)
612 .copied()
613 .collect();
614
615 if !region_nodes.is_empty() {
616 return self.select_load_based(®ion_nodes);
617 }
618 }
619
620 self.select_load_based(nodes)
622 }
623
624 fn select_custom(
626 &self,
627 nodes: &[&CloudNode],
628 request: &DistributedConversionRequest,
629 ) -> Result<String> {
630 let mut scored_nodes: Vec<(f32, &CloudNode)> = nodes
631 .iter()
632 .map(|node| {
633 let mut score = 0.0;
634
635 let load = (node.resource_usage.cpu_utilization
637 + node.resource_usage.memory_utilization)
638 / 2.0;
639 score += (1.0 - load) * 0.4;
640
641 score += node.capacity * 0.3;
643
644 let queue_factor =
646 1.0 - (node.queue_size as f32 / self.config.max_queue_size as f32).min(1.0);
647 score += queue_factor * 0.2;
648
649 if request.priority == RequestPriority::Critical
651 && node.capabilities.realtime_processing
652 {
653 score += 0.1;
654 }
655
656 (score, *node)
657 })
658 .collect();
659
660 scored_nodes.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
661
662 Ok(scored_nodes[0].1.id.clone())
663 }
664
665 async fn execute_on_node(
667 &self,
668 node_id: &str,
669 request: &DistributedConversionRequest,
670 ) -> Result<ConversionResult> {
671 {
676 let mut nodes = self
677 .nodes
678 .write()
679 .map_err(|_| Error::runtime("Failed to acquire write lock on nodes".to_string()))?;
680
681 if let Some(node) = nodes.get_mut(node_id) {
682 node.active_connections += 1;
683 node.queue_size = node.queue_size.saturating_sub(1);
684 }
685 }
686
687 let processing_delay = match request.priority {
689 RequestPriority::Critical => Duration::from_millis(50),
690 RequestPriority::High => Duration::from_millis(100),
691 RequestPriority::Normal => Duration::from_millis(200),
692 RequestPriority::Low => Duration::from_millis(500),
693 };
694
695 tokio::time::sleep(processing_delay).await;
696
697 Ok(ConversionResult {
699 request_id: request.request_id.clone(),
700 converted_audio: vec![0.0; 1000], output_sample_rate: 22050,
702 quality_metrics: HashMap::new(),
703 artifacts: None,
704 objective_quality: None,
705 processing_time: processing_delay,
706 conversion_type: crate::types::ConversionType::SpeakerConversion,
707 success: true,
708 error_message: None,
709 timestamp: std::time::SystemTime::now(),
710 })
711 }
712
713 async fn perform_health_check(&self) -> Result<()> {
715 let nodes = {
716 let nodes_guard = self
717 .nodes
718 .read()
719 .map_err(|_| Error::runtime("Failed to acquire read lock on nodes".to_string()))?;
720 nodes_guard.clone()
721 };
722
723 let mut health_results = Vec::new();
724
725 for (node_id, node) in &nodes {
726 let health_result = self.check_node_health(node_id, node).await;
727 health_results.push(health_result);
728 }
729
730 {
732 let mut health_monitor = self.health_monitor.write().await;
733 for result in health_results {
734 health_monitor
735 .health_history
736 .entry(result.node_id.clone())
737 .or_insert_with(Vec::new)
738 .push(result);
739 }
740 health_monitor.last_check_time = Instant::now();
741 }
742
743 Ok(())
744 }
745
746 async fn check_node_health(&self, node_id: &str, node: &CloudNode) -> HealthCheckResult {
748 let start_time = Instant::now();
749
750 let simulated_delay = Duration::from_millis(fastrand::u64(10..100));
752 tokio::time::sleep(simulated_delay).await;
753
754 let response_time = start_time.elapsed();
755
756 let status = if response_time > Duration::from_millis(1000) {
758 NodeStatus::Unhealthy
759 } else if node.resource_usage.cpu_utilization > 0.9
760 || node.resource_usage.memory_utilization > 0.95
761 {
762 NodeStatus::Degraded
763 } else {
764 NodeStatus::Healthy
765 };
766
767 HealthCheckResult {
768 node_id: node_id.to_string(),
769 status,
770 response_time_ms: response_time.as_millis() as u32,
771 resource_usage: node.resource_usage.clone(),
772 timestamp: Instant::now(),
773 }
774 }
775
776 pub async fn get_cluster_metrics(&self) -> Result<ClusterMetrics> {
778 let (
779 total_nodes,
780 healthy_nodes,
781 avg_cpu_utilization,
782 avg_memory_utilization,
783 total_queue_depth,
784 ) = {
785 let nodes = self
786 .nodes
787 .read()
788 .map_err(|_| Error::runtime("Failed to acquire read lock on nodes".to_string()))?;
789
790 let total_nodes = nodes.len();
791 let healthy_nodes = nodes
792 .values()
793 .filter(|node| node.status == NodeStatus::Healthy)
794 .count();
795
796 let avg_cpu_utilization = if !nodes.is_empty() {
797 nodes
798 .values()
799 .map(|node| node.resource_usage.cpu_utilization)
800 .sum::<f32>()
801 / nodes.len() as f32
802 } else {
803 0.0
804 };
805
806 let avg_memory_utilization = if !nodes.is_empty() {
807 nodes
808 .values()
809 .map(|node| node.resource_usage.memory_utilization)
810 .sum::<f32>()
811 / nodes.len() as f32
812 } else {
813 0.0
814 };
815
816 let total_queue_depth = nodes.values().map(|node| node.queue_size).sum();
817
818 (
819 total_nodes,
820 healthy_nodes,
821 avg_cpu_utilization,
822 avg_memory_utilization,
823 total_queue_depth,
824 )
825 };
826
827 let auto_scaler = self.auto_scaler.read().await;
828 let last_scaling_action = auto_scaler
829 .scaling_history
830 .last()
831 .map(|decision| decision.timestamp);
832
833 Ok(ClusterMetrics {
834 total_nodes,
835 healthy_nodes,
836 avg_cpu_utilization,
837 avg_memory_utilization,
838 requests_per_second: 0.0, avg_response_time_ms: 0.0, total_queue_depth,
841 error_rate_percent: 0.0, last_scaling_action,
843 })
844 }
845
846 pub async fn evaluate_scaling(&self) -> Result<ScalingDecision> {
848 let metrics = self.get_cluster_metrics().await?;
849 let mut auto_scaler = self.auto_scaler.write().await;
850
851 if auto_scaler.last_scaling_time.elapsed() < self.config.scaling_cooldown {
853 return Ok(ScalingDecision {
854 action: ScalingAction::NoAction,
855 reason: "Scaling cooldown period active".to_string(),
856 target_nodes: metrics.total_nodes,
857 triggering_metrics: metrics,
858 timestamp: SystemTime::now(),
859 });
860 }
861
862 let decision = if metrics.avg_cpu_utilization > self.config.target_cpu_utilization
864 || metrics.avg_memory_utilization > self.config.target_memory_utilization
865 {
866 let nodes_to_add = ((metrics
868 .avg_cpu_utilization
869 .max(metrics.avg_memory_utilization)
870 - 0.5)
871 * 4.0)
872 .ceil() as usize;
873 ScalingDecision {
874 action: ScalingAction::ScaleUp {
875 nodes_to_add,
876 preferred_regions: vec!["us-west-2".to_string(), "us-east-1".to_string()],
877 },
878 reason: format!(
879 "High resource utilization: CPU {:.1}%, Memory {:.1}%",
880 metrics.avg_cpu_utilization * 100.0,
881 metrics.avg_memory_utilization * 100.0
882 ),
883 target_nodes: metrics.total_nodes + nodes_to_add,
884 triggering_metrics: metrics,
885 timestamp: SystemTime::now(),
886 }
887 } else if metrics.total_nodes > self.config.min_nodes
888 && metrics.avg_cpu_utilization < 0.3
889 && metrics.avg_memory_utilization < 0.4
890 {
891 let nodes_to_remove = ((0.5
893 - metrics
894 .avg_cpu_utilization
895 .max(metrics.avg_memory_utilization))
896 * 2.0)
897 .ceil() as usize;
898 ScalingDecision {
899 action: ScalingAction::ScaleDown {
900 nodes_to_remove,
901 nodes_to_drain: Vec::new(), },
903 reason: format!(
904 "Low resource utilization: CPU {:.1}%, Memory {:.1}%",
905 metrics.avg_cpu_utilization * 100.0,
906 metrics.avg_memory_utilization * 100.0
907 ),
908 target_nodes: metrics
909 .total_nodes
910 .saturating_sub(nodes_to_remove)
911 .max(self.config.min_nodes),
912 triggering_metrics: metrics,
913 timestamp: SystemTime::now(),
914 }
915 } else {
916 ScalingDecision {
917 action: ScalingAction::NoAction,
918 reason: "Resource utilization within target thresholds".to_string(),
919 target_nodes: metrics.total_nodes,
920 triggering_metrics: metrics,
921 timestamp: SystemTime::now(),
922 }
923 };
924
925 auto_scaler.scaling_history.push(decision.clone());
927 if !matches!(decision.action, ScalingAction::NoAction) {
928 auto_scaler.last_scaling_time = Instant::now();
929 }
930
931 Ok(decision)
932 }
933
934 pub fn start_background_tasks(controller: Arc<CloudScalingController>) -> Result<()> {
936 if !controller.config.auto_scaling_enabled {
937 return Ok(());
938 }
939
940 let health_controller = Arc::clone(&controller);
942 tokio::spawn(async move {
943 let mut interval =
944 tokio::time::interval(health_controller.config.health_check_interval);
945 loop {
946 interval.tick().await;
947 if let Err(e) = health_controller.perform_health_check().await {
948 eprintln!("Health check failed: {e}");
949 }
950 }
951 });
952
953 let scaling_controller = Arc::clone(&controller);
955 tokio::spawn(async move {
956 let mut interval = tokio::time::interval(Duration::from_secs(60)); loop {
958 interval.tick().await;
959 if let Ok(decision) = scaling_controller.evaluate_scaling().await {
960 if !matches!(decision.action, ScalingAction::NoAction) {
961 println!("Scaling decision: {decision:?}");
962 }
964 }
965 }
966 });
967
968 Ok(())
969 }
970}
971
972impl Default for ClusterMetrics {
973 fn default() -> Self {
974 Self {
975 total_nodes: 0,
976 healthy_nodes: 0,
977 avg_cpu_utilization: 0.0,
978 avg_memory_utilization: 0.0,
979 requests_per_second: 0.0,
980 avg_response_time_ms: 0.0,
981 total_queue_depth: 0,
982 error_rate_percent: 0.0,
983 last_scaling_action: None,
984 }
985 }
986}
987
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a healthy test node with the given id in us-west-2.
    fn sample_node(id: &str) -> CloudNode {
        CloudNode {
            id: id.to_string(),
            endpoint: "192.168.1.100:8080".to_string(),
            region: "us-west-2".to_string(),
            availability_zone: "us-west-2a".to_string(),
            capacity: 1.0,
            resource_usage: NodeResourceUsage {
                cpu_utilization: 0.5,
                memory_utilization: 0.6,
                network_utilization: 0.3,
                storage_utilization: 0.4,
                gpu_utilization: Some(0.2),
            },
            status: NodeStatus::Healthy,
            last_health_check: SystemTime::now(),
            capabilities: NodeCapabilities {
                supported_models: vec!["voice-conversion-v1".to_string()],
                gpu_acceleration: true,
                realtime_processing: true,
                batch_processing: true,
                max_concurrent_requests: 10,
                memory_capacity_gb: 8.0,
                cpu_cores: 4,
            },
            queue_size: 0,
            active_connections: 0,
        }
    }

    #[tokio::test]
    async fn test_cloud_scaling_controller_creation() {
        let config = CloudScalingConfig::default();
        let controller = CloudScalingController::new(config);

        let metrics = controller.get_cluster_metrics().await.unwrap();
        assert_eq!(metrics.total_nodes, 0);
        assert_eq!(metrics.healthy_nodes, 0);
    }

    // Previously this test built a node but never registered it and asserted
    // the registry was empty — it exercised nothing. It now actually calls
    // `add_node` (async, so it needs the tokio test runtime) and verifies
    // registration.
    #[tokio::test]
    async fn test_add_node() {
        let config = CloudScalingConfig::default();
        let controller = CloudScalingController::new(config);

        controller.add_node(sample_node("node-1")).await.unwrap();

        let nodes = controller.nodes.read().unwrap();
        assert_eq!(nodes.len(), 1);
        assert!(nodes.contains_key("node-1"));
    }

    #[test]
    fn test_retry_config_default() {
        let config = RetryConfig::default();
        assert_eq!(config.max_retries, 3);
        assert_eq!(config.base_delay, Duration::from_millis(100));
        assert!(config.jitter_enabled);
    }

    #[test]
    fn test_request_priority_ordering() {
        assert!(RequestPriority::Critical > RequestPriority::High);
        assert!(RequestPriority::High > RequestPriority::Normal);
        assert!(RequestPriority::Normal > RequestPriority::Low);
    }

    #[test]
    fn test_scaling_decision() {
        let config = CloudScalingConfig::default();

        assert_eq!(config.target_cpu_utilization, 0.7);
        assert_eq!(config.target_memory_utilization, 0.8);

        // Utilization values above the targets should be classified as high.
        let high_cpu = 0.9f32;
        let high_memory = 0.85f32;

        assert!(high_cpu > config.target_cpu_utilization);
        assert!(high_memory > config.target_memory_utilization);
    }
}