1use crate::cleanroom::{HealthStatus, ServiceHandle};
10use crate::error::{CleanroomError, Result};
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13use std::time::{SystemTime, UNIX_EPOCH};
14use tracing::{debug, info, warn};
15
/// A point-in-time snapshot of one service's runtime metrics.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServiceMetrics {
    /// Identifier of the service this sample belongs to.
    pub service_id: String,
    /// Human-readable service name.
    pub service_name: String,
    /// CPU utilization — `health_score` treats this as a 0-100 percentage.
    pub cpu_usage: f64,
    /// Memory usage — unit not stated here; `health_score` divides by 10.24,
    /// which suggests MB against a 1024 MB budget (TODO confirm unit).
    pub memory_usage: f64,
    /// Network I/O throughput (unit not established in this file).
    pub network_io: f64,
    /// Number of currently active connections.
    pub active_connections: u32,
    /// Incoming request rate (requests per unit time; unit not established here).
    pub request_rate: f64,
    /// Average response latency in milliseconds.
    pub response_time_ms: f64,
    /// Error rate — `health_score` treats this as a 0.0-1.0 fraction.
    pub error_rate: f64,
    /// Unix timestamp (seconds) when the sample was taken.
    pub timestamp: u64,
}
40
41impl ServiceMetrics {
42 pub fn new(service_id: String, service_name: String) -> Self {
44 Self {
45 service_id,
46 service_name,
47 cpu_usage: 0.0,
48 memory_usage: 0.0,
49 network_io: 0.0,
50 active_connections: 0,
51 request_rate: 0.0,
52 response_time_ms: 0.0,
53 error_rate: 0.0,
54 timestamp: SystemTime::now()
55 .duration_since(UNIX_EPOCH)
56 .unwrap_or_default()
57 .as_secs(),
58 }
59 }
60
61 pub fn health_score(&self) -> f64 {
63 let cpu_score = (100.0 - self.cpu_usage).max(0.0);
64 let memory_score = (100.0 - (self.memory_usage / 10.24)).max(0.0); let error_score = (1.0 - self.error_rate) * 100.0;
66 let response_score = (1000.0 / (self.response_time_ms + 1.0)).min(100.0);
67
68 cpu_score * 0.3 + memory_score * 0.3 + error_score * 0.2 + response_score * 0.2
69 }
70}
71
/// Rolling window of metrics samples for a single service.
#[derive(Debug, Clone)]
pub struct MetricsHistory {
    /// Service this history belongs to.
    pub service_id: String,
    /// Samples in arrival order (oldest first).
    pub history: Vec<ServiceMetrics>,
    /// Maximum number of retained samples; the oldest is evicted beyond this.
    max_size: usize,
}
82
83impl MetricsHistory {
84 pub fn new(service_id: String) -> Self {
86 Self {
87 service_id,
88 history: Vec::new(),
89 max_size: 1000,
90 }
91 }
92
93 pub fn add_metrics(&mut self, metrics: ServiceMetrics) {
95 self.history.push(metrics);
96 if self.history.len() > self.max_size {
97 self.history.remove(0);
98 }
99 }
100
101 pub fn average_metrics(&self, last_n: usize) -> Option<ServiceMetrics> {
103 if self.history.is_empty() {
104 return None;
105 }
106
107 let n = last_n.min(self.history.len());
108 let slice = &self.history[self.history.len() - n..];
109
110 let avg_cpu = slice.iter().map(|m| m.cpu_usage).sum::<f64>() / n as f64;
111 let avg_memory = slice.iter().map(|m| m.memory_usage).sum::<f64>() / n as f64;
112 let avg_network = slice.iter().map(|m| m.network_io).sum::<f64>() / n as f64;
113 let avg_connections = slice.iter().map(|m| m.active_connections).sum::<u32>() / n as u32;
114 let avg_request_rate = slice.iter().map(|m| m.request_rate).sum::<f64>() / n as f64;
115 let avg_response_time = slice.iter().map(|m| m.response_time_ms).sum::<f64>() / n as f64;
116 let avg_error_rate = slice.iter().map(|m| m.error_rate).sum::<f64>() / n as f64;
117
118 Some(ServiceMetrics {
119 service_id: self.service_id.clone(),
120 service_name: self.history.last()?.service_name.clone(),
121 cpu_usage: avg_cpu,
122 memory_usage: avg_memory,
123 network_io: avg_network,
124 active_connections: avg_connections,
125 request_rate: avg_request_rate,
126 response_time_ms: avg_response_time,
127 error_rate: avg_error_rate,
128 timestamp: SystemTime::now()
129 .duration_since(UNIX_EPOCH)
130 .unwrap_or_default()
131 .as_secs(),
132 })
133 }
134
135 pub fn predict_load(&self, horizon_minutes: u32) -> Option<ServiceMetrics> {
137 if self.history.len() < 3 {
138 return None;
139 }
140
141 let alpha = 0.3; let mut ema_cpu = self.history[0].cpu_usage;
144 let mut ema_memory = self.history[0].memory_usage;
145 let mut ema_request_rate = self.history[0].request_rate;
146
147 for metrics in &self.history[1..] {
148 ema_cpu = alpha * metrics.cpu_usage + (1.0 - alpha) * ema_cpu;
149 ema_memory = alpha * metrics.memory_usage + (1.0 - alpha) * ema_memory;
150 ema_request_rate = alpha * metrics.request_rate + (1.0 - alpha) * ema_request_rate;
151 }
152
153 let recent = self.average_metrics(5)?;
155 let older = self.average_metrics(20)?;
156
157 let cpu_trend = (recent.cpu_usage - older.cpu_usage) / older.cpu_usage.max(1.0);
158 let memory_trend = (recent.memory_usage - older.memory_usage) / older.memory_usage.max(1.0);
159 let request_trend =
160 (recent.request_rate - older.request_rate) / older.request_rate.max(1.0);
161
162 Some(ServiceMetrics {
163 service_id: self.service_id.clone(),
164 service_name: recent.service_name.clone(),
165 cpu_usage: (ema_cpu * (1.0 + cpu_trend)).min(100.0),
166 memory_usage: ema_memory * (1.0 + memory_trend),
167 network_io: recent.network_io,
168 active_connections: recent.active_connections,
169 request_rate: ema_request_rate * (1.0 + request_trend),
170 response_time_ms: recent.response_time_ms,
171 error_rate: recent.error_rate,
172 timestamp: SystemTime::now()
173 .duration_since(UNIX_EPOCH)
174 .unwrap_or_default()
175 .as_secs()
176 + (horizon_minutes as u64 * 60),
177 })
178 }
179}
180
/// Thresholds and bounds governing auto-scaling decisions for one service.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutoScaleConfig {
    /// Lower bound on instance count; scale-down never goes below this.
    pub min_instances: u32,
    /// Upper bound on instance count; scale-up never exceeds this.
    pub max_instances: u32,
    /// CPU usage above which scale-up is considered.
    pub cpu_scale_up_threshold: f64,
    /// CPU usage below which scale-down is considered.
    pub cpu_scale_down_threshold: f64,
    /// Memory usage above which scale-up is considered.
    pub memory_scale_up_threshold: f64,
    /// Memory usage below which scale-down is considered.
    pub memory_scale_down_threshold: f64,
    /// Request rate above which scale-up is considered.
    pub request_rate_scale_up_threshold: f64,
    /// Minimum seconds between consecutive scaling actions for a service.
    pub cooldown_seconds: u64,
}
201
202impl Default for AutoScaleConfig {
203 fn default() -> Self {
204 Self {
205 min_instances: 1,
206 max_instances: 10,
207 cpu_scale_up_threshold: 70.0,
208 cpu_scale_down_threshold: 30.0,
209 memory_scale_up_threshold: 512.0,
210 memory_scale_down_threshold: 128.0,
211 request_rate_scale_up_threshold: 100.0,
212 cooldown_seconds: 60,
213 }
214 }
215}
216
/// Decision produced by `ServiceManager::determine_scaling_action`.
#[derive(Debug, Clone, PartialEq)]
pub enum ScalingAction {
    /// Add this many instances.
    ScaleUp(u32),
    /// Remove this many instances.
    ScaleDown(u32),
    /// Leave the instance count unchanged.
    NoAction,
}
227
/// A simple checkout pool of service handles for one service.
#[derive(Debug, Clone)]
pub struct ResourcePool {
    /// Name of the service the pooled handles belong to.
    pub service_name: String,
    /// Idle handles ready to be acquired.
    pub available: Vec<ServiceHandle>,
    /// Handles currently checked out.
    pub in_use: Vec<ServiceHandle>,
    /// Maximum number of idle handles retained; handles released beyond
    /// this limit are dropped.
    pub max_size: usize,
}
240
241impl ResourcePool {
242 pub fn new(service_name: String, max_size: usize) -> Self {
244 Self {
245 service_name,
246 available: Vec::new(),
247 in_use: Vec::new(),
248 max_size,
249 }
250 }
251
252 pub fn acquire(&mut self) -> Option<ServiceHandle> {
254 if let Some(handle) = self.available.pop() {
255 self.in_use.push(handle.clone());
256 Some(handle)
257 } else {
258 None
259 }
260 }
261
262 pub fn release(&mut self, handle: ServiceHandle) {
264 if let Some(pos) = self.in_use.iter().position(|h| h.id == handle.id) {
265 self.in_use.remove(pos);
266 if self.available.len() < self.max_size {
267 self.available.push(handle);
268 }
269 }
270 }
271
272 pub fn utilization(&self) -> f64 {
274 let total = self.available.len() + self.in_use.len();
275 if total == 0 {
276 0.0
277 } else {
278 self.in_use.len() as f64 / total as f64
279 }
280 }
281}
282
/// A single cost-optimization suggestion for a service.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CostRecommendation {
    /// Service the recommendation applies to.
    pub service_id: String,
    /// Human-readable service name.
    pub service_name: String,
    /// Category label, e.g. "Downsize" or "Optimize".
    pub recommendation_type: String,
    /// Explanation of the suggested change.
    pub description: String,
    /// Estimated savings — a bare number here; presumably a percentage
    /// (TODO confirm against consumers).
    pub estimated_savings: f64,
    /// Relative importance; higher sorts first in the recommendation list.
    pub priority: u32,
}
299
/// Central coordinator for metrics collection, auto-scaling decisions,
/// resource pooling, and cost recommendations across services.
pub struct ServiceManager {
    /// Rolling metrics history keyed by service id.
    metrics_history: HashMap<String, MetricsHistory>,
    /// Auto-scaling configuration keyed by service id.
    auto_scale_configs: HashMap<String, AutoScaleConfig>,
    /// Last known instance count keyed by service id.
    pub service_instances: HashMap<String, u32>,
    /// Unix timestamp (seconds) of the last scaling action per service id,
    /// used to enforce the configured cooldown.
    last_scaling_action: HashMap<String, u64>,
    /// Resource pools keyed by service *name* (not id).
    pub resource_pools: HashMap<String, ResourcePool>,
}
313
314impl ServiceManager {
315 pub fn new() -> Self {
317 Self {
318 metrics_history: HashMap::new(),
319 auto_scale_configs: HashMap::new(),
320 service_instances: HashMap::new(),
321 last_scaling_action: HashMap::new(),
322 resource_pools: HashMap::new(),
323 }
324 }
325
326 pub fn record_metrics(&mut self, metrics: ServiceMetrics) {
328 let service_id = metrics.service_id.clone();
329
330 self.metrics_history
331 .entry(service_id)
332 .or_insert_with(|| MetricsHistory::new(metrics.service_id.clone()))
333 .add_metrics(metrics);
334 }
335
336 pub fn set_auto_scale_config(&mut self, service_id: String, config: AutoScaleConfig) {
338 self.auto_scale_configs.insert(service_id, config);
339 }
340
341 pub fn predict_load(&self, service_id: &str, horizon_minutes: u32) -> Option<ServiceMetrics> {
343 self.metrics_history
344 .get(service_id)
345 .and_then(|history| history.predict_load(horizon_minutes))
346 }
347
348 pub fn determine_scaling_action(&mut self, service_id: &str) -> Result<ScalingAction> {
350 let config = self.auto_scale_configs.get(service_id).ok_or_else(|| {
351 CleanroomError::internal_error("No auto-scale config found")
352 .with_context(format!("Service: {}", service_id))
353 })?;
354
355 let now = SystemTime::now()
357 .duration_since(UNIX_EPOCH)
358 .unwrap_or_default()
359 .as_secs();
360
361 if let Some(&last_action) = self.last_scaling_action.get(service_id) {
362 if now - last_action < config.cooldown_seconds {
363 debug!(
364 "Scaling action in cooldown period for service: {}",
365 service_id
366 );
367 return Ok(ScalingAction::NoAction);
368 }
369 }
370
371 let history = self.metrics_history.get(service_id).ok_or_else(|| {
373 CleanroomError::internal_error("No metrics history found")
374 .with_context(format!("Service: {}", service_id))
375 })?;
376
377 let current = history
378 .average_metrics(5)
379 .ok_or_else(|| CleanroomError::internal_error("Insufficient metrics data"))?;
380
381 let predicted = history.predict_load(5).unwrap_or_else(|| current.clone());
382
383 let current_instances = *self.service_instances.get(service_id).unwrap_or(&1);
384
385 let max_cpu = current.cpu_usage.max(predicted.cpu_usage);
387 let max_memory = current.memory_usage.max(predicted.memory_usage);
388 let max_request_rate = current.request_rate.max(predicted.request_rate);
389
390 if max_cpu > config.cpu_scale_up_threshold
391 || max_memory > config.memory_scale_up_threshold
392 || max_request_rate > config.request_rate_scale_up_threshold
393 {
394 if current_instances < config.max_instances {
395 let scale_up = ((max_cpu / config.cpu_scale_up_threshold).ceil() as u32)
396 .min(config.max_instances - current_instances);
397
398 self.last_scaling_action.insert(service_id.to_string(), now);
399 info!(
400 "Scaling up service {} by {} instances",
401 service_id, scale_up
402 );
403 return Ok(ScalingAction::ScaleUp(scale_up));
404 }
405 } else if max_cpu < config.cpu_scale_down_threshold
406 && max_memory < config.memory_scale_down_threshold
407 && current_instances > config.min_instances
408 {
409 let scale_down = 1.min(current_instances - config.min_instances);
410 self.last_scaling_action.insert(service_id.to_string(), now);
411 info!(
412 "Scaling down service {} by {} instances",
413 service_id, scale_down
414 );
415 return Ok(ScalingAction::ScaleDown(scale_down));
416 }
417
418 Ok(ScalingAction::NoAction)
419 }
420
421 pub fn update_instance_count(&mut self, service_id: String, count: u32) {
423 self.service_instances.insert(service_id, count);
424 }
425
426 pub fn predict_service_health(&self, service_id: &str) -> Result<HealthStatus> {
428 let history = self.metrics_history.get(service_id).ok_or_else(|| {
429 CleanroomError::internal_error("No metrics history found")
430 .with_context(format!("Service: {}", service_id))
431 })?;
432
433 let predicted = history.predict_load(5);
434
435 if let Some(metrics) = predicted {
436 let health_score = metrics.health_score();
437
438 if health_score > 70.0 {
439 Ok(HealthStatus::Healthy)
440 } else if health_score > 40.0 {
441 warn!(
442 "Service {} predicted to be degraded (score: {})",
443 service_id, health_score
444 );
445 Ok(HealthStatus::Unknown)
446 } else {
447 warn!(
448 "Service {} predicted to be unhealthy (score: {})",
449 service_id, health_score
450 );
451 Ok(HealthStatus::Unhealthy)
452 }
453 } else {
454 Ok(HealthStatus::Unknown)
455 }
456 }
457
458 pub fn get_or_create_pool(
460 &mut self,
461 service_name: String,
462 max_size: usize,
463 ) -> &mut ResourcePool {
464 self.resource_pools
465 .entry(service_name.clone())
466 .or_insert_with(|| ResourcePool::new(service_name, max_size))
467 }
468
469 pub fn generate_cost_recommendations(&self, service_id: &str) -> Vec<CostRecommendation> {
471 let mut recommendations = Vec::new();
472
473 if let Some(history) = self.metrics_history.get(service_id) {
474 if let Some(avg_metrics) = history.average_metrics(100) {
475 let service_name = avg_metrics.service_name.clone();
476
477 if avg_metrics.cpu_usage < 20.0 && avg_metrics.memory_usage < 100.0 {
479 recommendations.push(CostRecommendation {
480 service_id: service_id.to_string(),
481 service_name: service_name.clone(),
482 recommendation_type: "Downsize".to_string(),
483 description: "Service is significantly under-utilized. Consider reducing instance size.".to_string(),
484 estimated_savings: 30.0,
485 priority: 5,
486 });
487 }
488
489 if avg_metrics.error_rate > 0.05 {
491 recommendations.push(CostRecommendation {
492 service_id: service_id.to_string(),
493 service_name: service_name.clone(),
494 recommendation_type: "Optimize".to_string(),
495 description:
496 "High error rate detected. Investigate and fix to reduce retry costs."
497 .to_string(),
498 estimated_savings: 15.0,
499 priority: 4,
500 });
501 }
502
503 if let Some(pool) = self.resource_pools.get(&service_name) {
505 if pool.utilization() < 0.5 && pool.available.len() > 2 {
506 recommendations.push(CostRecommendation {
507 service_id: service_id.to_string(),
508 service_name: service_name.clone(),
509 recommendation_type: "Pool Optimization".to_string(),
510 description:
511 "Resource pool has low utilization. Consider reducing pool size."
512 .to_string(),
513 estimated_savings: 20.0,
514 priority: 3,
515 });
516 }
517 }
518
519 if avg_metrics.request_rate < 10.0 {
521 recommendations.push(CostRecommendation {
522 service_id: service_id.to_string(),
523 service_name,
524 recommendation_type: "Serverless Migration".to_string(),
525 description:
526 "Low consistent load. Consider migrating to serverless architecture."
527 .to_string(),
528 estimated_savings: 40.0,
529 priority: 4,
530 });
531 }
532 }
533 }
534
535 recommendations.sort_by(|a, b| b.priority.cmp(&a.priority));
536 recommendations
537 }
538
539 pub fn get_summary(&self) -> HashMap<String, serde_json::Value> {
541 let mut summary = HashMap::new();
542
543 summary.insert(
544 "total_services".to_string(),
545 serde_json::json!(self.metrics_history.len()),
546 );
547
548 summary.insert(
549 "total_instances".to_string(),
550 serde_json::json!(self.service_instances.values().sum::<u32>()),
551 );
552
553 summary.insert(
554 "total_pools".to_string(),
555 serde_json::json!(self.resource_pools.len()),
556 );
557
558 let avg_utilization: f64 = self
559 .resource_pools
560 .values()
561 .map(|p| p.utilization())
562 .sum::<f64>()
563 / self.resource_pools.len().max(1) as f64;
564
565 summary.insert(
566 "avg_pool_utilization".to_string(),
567 serde_json::json!(format!("{:.1}%", avg_utilization * 100.0)),
568 );
569
570 summary
571 }
572}
573
impl Default for ServiceManager {
    /// Delegates to `new`; an empty manager is the natural default.
    fn default() -> Self {
        Self::new()
    }
}