// clnrm_core/services/service_manager.rs

1//! Intelligent Service Manager - AI-driven service lifecycle management
2//!
3//! Provides autonomous service management with:
4//! - Auto-scaling based on load prediction
5//! - Resource optimization and pooling
6//! - Service health prediction
7//! - Cost optimization recommendations
8
9use crate::cleanroom::{HealthStatus, ServiceHandle};
10use crate::error::{CleanroomError, Result};
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13use std::time::{SystemTime, UNIX_EPOCH};
14use tracing::{debug, info, warn};
15
/// Point-in-time resource and performance metrics for a single service.
///
/// Samples are recorded via `ServiceManager::record_metrics` and fed into
/// a `MetricsHistory` for averaging and load prediction.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServiceMetrics {
    /// Service identifier (unique per tracked service)
    pub service_id: String,
    /// Human-readable service name
    pub service_name: String,
    /// CPU usage percentage (0-100)
    pub cpu_usage: f64,
    /// Memory usage in MB
    pub memory_usage: f64,
    /// Network I/O in MB/s
    pub network_io: f64,
    /// Number of active connections
    pub active_connections: u32,
    /// Request rate (requests per second)
    pub request_rate: f64,
    /// Average response time in milliseconds
    pub response_time_ms: f64,
    /// Error rate as a fraction (0.0-1.0)
    pub error_rate: f64,
    /// Unix timestamp in seconds when the metrics were collected
    pub timestamp: u64,
}
40
41impl ServiceMetrics {
42    /// Create new service metrics
43    pub fn new(service_id: String, service_name: String) -> Self {
44        Self {
45            service_id,
46            service_name,
47            cpu_usage: 0.0,
48            memory_usage: 0.0,
49            network_io: 0.0,
50            active_connections: 0,
51            request_rate: 0.0,
52            response_time_ms: 0.0,
53            error_rate: 0.0,
54            timestamp: SystemTime::now()
55                .duration_since(UNIX_EPOCH)
56                .unwrap_or_default()
57                .as_secs(),
58        }
59    }
60
61    /// Calculate health score (0-100)
62    pub fn health_score(&self) -> f64 {
63        let cpu_score = (100.0 - self.cpu_usage).max(0.0);
64        let memory_score = (100.0 - (self.memory_usage / 10.24)).max(0.0); // Assuming 1GB max
65        let error_score = (1.0 - self.error_rate) * 100.0;
66        let response_score = (1000.0 / (self.response_time_ms + 1.0)).min(100.0);
67
68        cpu_score * 0.3 + memory_score * 0.3 + error_score * 0.2 + response_score * 0.2
69    }
70}
71
/// Rolling window of historical metrics used for averaging and load prediction.
#[derive(Debug, Clone)]
pub struct MetricsHistory {
    /// Service identifier
    pub service_id: String,
    /// Historical metrics, oldest first (bounded by `max_size`)
    pub history: Vec<ServiceMetrics>,
    /// Maximum number of retained entries (set to 1000 by `new`)
    max_size: usize,
}
82
83impl MetricsHistory {
84    /// Create new metrics history
85    pub fn new(service_id: String) -> Self {
86        Self {
87            service_id,
88            history: Vec::new(),
89            max_size: 1000,
90        }
91    }
92
93    /// Add metrics to history
94    pub fn add_metrics(&mut self, metrics: ServiceMetrics) {
95        self.history.push(metrics);
96        if self.history.len() > self.max_size {
97            self.history.remove(0);
98        }
99    }
100
101    /// Get average metrics over last N entries
102    pub fn average_metrics(&self, last_n: usize) -> Option<ServiceMetrics> {
103        if self.history.is_empty() {
104            return None;
105        }
106
107        let n = last_n.min(self.history.len());
108        let slice = &self.history[self.history.len() - n..];
109
110        let avg_cpu = slice.iter().map(|m| m.cpu_usage).sum::<f64>() / n as f64;
111        let avg_memory = slice.iter().map(|m| m.memory_usage).sum::<f64>() / n as f64;
112        let avg_network = slice.iter().map(|m| m.network_io).sum::<f64>() / n as f64;
113        let avg_connections = slice.iter().map(|m| m.active_connections).sum::<u32>() / n as u32;
114        let avg_request_rate = slice.iter().map(|m| m.request_rate).sum::<f64>() / n as f64;
115        let avg_response_time = slice.iter().map(|m| m.response_time_ms).sum::<f64>() / n as f64;
116        let avg_error_rate = slice.iter().map(|m| m.error_rate).sum::<f64>() / n as f64;
117
118        Some(ServiceMetrics {
119            service_id: self.service_id.clone(),
120            service_name: self.history.last()?.service_name.clone(),
121            cpu_usage: avg_cpu,
122            memory_usage: avg_memory,
123            network_io: avg_network,
124            active_connections: avg_connections,
125            request_rate: avg_request_rate,
126            response_time_ms: avg_response_time,
127            error_rate: avg_error_rate,
128            timestamp: SystemTime::now()
129                .duration_since(UNIX_EPOCH)
130                .unwrap_or_default()
131                .as_secs(),
132        })
133    }
134
135    /// Predict future load using simple moving average
136    pub fn predict_load(&self, horizon_minutes: u32) -> Option<ServiceMetrics> {
137        if self.history.len() < 3 {
138            return None;
139        }
140
141        // Use exponential moving average for better prediction
142        let alpha = 0.3; // Smoothing factor
143        let mut ema_cpu = self.history[0].cpu_usage;
144        let mut ema_memory = self.history[0].memory_usage;
145        let mut ema_request_rate = self.history[0].request_rate;
146
147        for metrics in &self.history[1..] {
148            ema_cpu = alpha * metrics.cpu_usage + (1.0 - alpha) * ema_cpu;
149            ema_memory = alpha * metrics.memory_usage + (1.0 - alpha) * ema_memory;
150            ema_request_rate = alpha * metrics.request_rate + (1.0 - alpha) * ema_request_rate;
151        }
152
153        // Apply trend factor based on recent growth
154        let recent = self.average_metrics(5)?;
155        let older = self.average_metrics(20)?;
156
157        let cpu_trend = (recent.cpu_usage - older.cpu_usage) / older.cpu_usage.max(1.0);
158        let memory_trend = (recent.memory_usage - older.memory_usage) / older.memory_usage.max(1.0);
159        let request_trend =
160            (recent.request_rate - older.request_rate) / older.request_rate.max(1.0);
161
162        Some(ServiceMetrics {
163            service_id: self.service_id.clone(),
164            service_name: recent.service_name.clone(),
165            cpu_usage: (ema_cpu * (1.0 + cpu_trend)).min(100.0),
166            memory_usage: ema_memory * (1.0 + memory_trend),
167            network_io: recent.network_io,
168            active_connections: recent.active_connections,
169            request_rate: ema_request_rate * (1.0 + request_trend),
170            response_time_ms: recent.response_time_ms,
171            error_rate: recent.error_rate,
172            timestamp: SystemTime::now()
173                .duration_since(UNIX_EPOCH)
174                .unwrap_or_default()
175                .as_secs()
176                + (horizon_minutes as u64 * 60),
177        })
178    }
179}
180
/// Per-service auto-scaling thresholds and limits.
///
/// Consumed by `ServiceManager::determine_scaling_action`; scale-up
/// triggers when ANY up-threshold is exceeded, scale-down only when ALL
/// down-thresholds are satisfied.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutoScaleConfig {
    /// Minimum number of instances (scale-down floor)
    pub min_instances: u32,
    /// Maximum number of instances (scale-up ceiling)
    pub max_instances: u32,
    /// CPU threshold for scaling up (percentage)
    pub cpu_scale_up_threshold: f64,
    /// CPU threshold for scaling down (percentage)
    pub cpu_scale_down_threshold: f64,
    /// Memory threshold for scaling up (MB)
    pub memory_scale_up_threshold: f64,
    /// Memory threshold for scaling down (MB)
    pub memory_scale_down_threshold: f64,
    /// Request rate threshold for scaling up (req/s)
    pub request_rate_scale_up_threshold: f64,
    /// Cool-down period between scaling actions (seconds)
    pub cooldown_seconds: u64,
}
201
202impl Default for AutoScaleConfig {
203    fn default() -> Self {
204        Self {
205            min_instances: 1,
206            max_instances: 10,
207            cpu_scale_up_threshold: 70.0,
208            cpu_scale_down_threshold: 30.0,
209            memory_scale_up_threshold: 512.0,
210            memory_scale_down_threshold: 128.0,
211            request_rate_scale_up_threshold: 100.0,
212            cooldown_seconds: 60,
213        }
214    }
215}
216
/// Outcome of one auto-scaling evaluation for a service.
#[derive(Debug, Clone, PartialEq)]
pub enum ScalingAction {
    /// Scale up by N instances
    ScaleUp(u32),
    /// Scale down by N instances
    ScaleDown(u32),
    /// No scaling needed (thresholds satisfied, at limits, or in cooldown)
    NoAction,
}
227
/// Pool of reusable service instances, split into idle and checked-out sets.
#[derive(Debug, Clone)]
pub struct ResourcePool {
    /// Service name
    pub service_name: String,
    /// Idle service instances available for acquisition
    pub available: Vec<ServiceHandle>,
    /// Service instances currently checked out
    pub in_use: Vec<ServiceHandle>,
    /// Maximum number of idle instances retained on release
    pub max_size: usize,
}
240
241impl ResourcePool {
242    /// Create new resource pool
243    pub fn new(service_name: String, max_size: usize) -> Self {
244        Self {
245            service_name,
246            available: Vec::new(),
247            in_use: Vec::new(),
248            max_size,
249        }
250    }
251
252    /// Acquire a service from pool
253    pub fn acquire(&mut self) -> Option<ServiceHandle> {
254        if let Some(handle) = self.available.pop() {
255            self.in_use.push(handle.clone());
256            Some(handle)
257        } else {
258            None
259        }
260    }
261
262    /// Release a service back to pool
263    pub fn release(&mut self, handle: ServiceHandle) {
264        if let Some(pos) = self.in_use.iter().position(|h| h.id == handle.id) {
265            self.in_use.remove(pos);
266            if self.available.len() < self.max_size {
267                self.available.push(handle);
268            }
269        }
270    }
271
272    /// Get pool utilization (0-1)
273    pub fn utilization(&self) -> f64 {
274        let total = self.available.len() + self.in_use.len();
275        if total == 0 {
276            0.0
277        } else {
278            self.in_use.len() as f64 / total as f64
279        }
280    }
281}
282
/// A single cost-optimization suggestion produced by
/// `ServiceManager::generate_cost_recommendations`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CostRecommendation {
    /// Service identifier
    pub service_id: String,
    /// Service name
    pub service_name: String,
    /// Recommendation category (e.g. "Downsize", "Optimize")
    pub recommendation_type: String,
    /// Human-readable description of the suggested change
    pub description: String,
    /// Estimated savings (percentage of current cost)
    pub estimated_savings: f64,
    /// Priority (1-5, 5 being highest)
    pub priority: u32,
}
299
/// Intelligent Service Manager.
///
/// Central coordinator that records per-service metrics, evaluates
/// auto-scaling decisions, predicts health, manages resource pools, and
/// produces cost recommendations. Not internally synchronized; wrap in a
/// lock for concurrent use.
pub struct ServiceManager {
    /// Per-service rolling metrics history (keyed by service id)
    metrics_history: HashMap<String, MetricsHistory>,
    /// Per-service auto-scaling configurations (keyed by service id)
    auto_scale_configs: HashMap<String, AutoScaleConfig>,
    /// Current instance count per service (keyed by service id)
    pub service_instances: HashMap<String, u32>,
    /// Unix timestamp (seconds) of the last scaling action per service
    last_scaling_action: HashMap<String, u64>,
    /// Resource pools keyed by service name
    pub resource_pools: HashMap<String, ResourcePool>,
}
313
314impl ServiceManager {
315    /// Create new service manager
316    pub fn new() -> Self {
317        Self {
318            metrics_history: HashMap::new(),
319            auto_scale_configs: HashMap::new(),
320            service_instances: HashMap::new(),
321            last_scaling_action: HashMap::new(),
322            resource_pools: HashMap::new(),
323        }
324    }
325
326    /// Record service metrics
327    pub fn record_metrics(&mut self, metrics: ServiceMetrics) {
328        let service_id = metrics.service_id.clone();
329
330        self.metrics_history
331            .entry(service_id)
332            .or_insert_with(|| MetricsHistory::new(metrics.service_id.clone()))
333            .add_metrics(metrics);
334    }
335
336    /// Set auto-scaling configuration for a service
337    pub fn set_auto_scale_config(&mut self, service_id: String, config: AutoScaleConfig) {
338        self.auto_scale_configs.insert(service_id, config);
339    }
340
341    /// Predict service load
342    pub fn predict_load(&self, service_id: &str, horizon_minutes: u32) -> Option<ServiceMetrics> {
343        self.metrics_history
344            .get(service_id)
345            .and_then(|history| history.predict_load(horizon_minutes))
346    }
347
348    /// Determine scaling action based on metrics and prediction
349    pub fn determine_scaling_action(&mut self, service_id: &str) -> Result<ScalingAction> {
350        let config = self.auto_scale_configs.get(service_id).ok_or_else(|| {
351            CleanroomError::internal_error("No auto-scale config found")
352                .with_context(format!("Service: {}", service_id))
353        })?;
354
355        // Check cooldown period
356        let now = SystemTime::now()
357            .duration_since(UNIX_EPOCH)
358            .unwrap_or_default()
359            .as_secs();
360
361        if let Some(&last_action) = self.last_scaling_action.get(service_id) {
362            if now - last_action < config.cooldown_seconds {
363                debug!(
364                    "Scaling action in cooldown period for service: {}",
365                    service_id
366                );
367                return Ok(ScalingAction::NoAction);
368            }
369        }
370
371        // Get current and predicted metrics
372        let history = self.metrics_history.get(service_id).ok_or_else(|| {
373            CleanroomError::internal_error("No metrics history found")
374                .with_context(format!("Service: {}", service_id))
375        })?;
376
377        let current = history
378            .average_metrics(5)
379            .ok_or_else(|| CleanroomError::internal_error("Insufficient metrics data"))?;
380
381        let predicted = history.predict_load(5).unwrap_or_else(|| current.clone());
382
383        let current_instances = *self.service_instances.get(service_id).unwrap_or(&1);
384
385        // Determine if scaling is needed based on current + predicted metrics
386        let max_cpu = current.cpu_usage.max(predicted.cpu_usage);
387        let max_memory = current.memory_usage.max(predicted.memory_usage);
388        let max_request_rate = current.request_rate.max(predicted.request_rate);
389
390        if max_cpu > config.cpu_scale_up_threshold
391            || max_memory > config.memory_scale_up_threshold
392            || max_request_rate > config.request_rate_scale_up_threshold
393        {
394            if current_instances < config.max_instances {
395                let scale_up = ((max_cpu / config.cpu_scale_up_threshold).ceil() as u32)
396                    .min(config.max_instances - current_instances);
397
398                self.last_scaling_action.insert(service_id.to_string(), now);
399                info!(
400                    "Scaling up service {} by {} instances",
401                    service_id, scale_up
402                );
403                return Ok(ScalingAction::ScaleUp(scale_up));
404            }
405        } else if max_cpu < config.cpu_scale_down_threshold
406            && max_memory < config.memory_scale_down_threshold
407            && current_instances > config.min_instances
408        {
409            let scale_down = 1.min(current_instances - config.min_instances);
410            self.last_scaling_action.insert(service_id.to_string(), now);
411            info!(
412                "Scaling down service {} by {} instances",
413                service_id, scale_down
414            );
415            return Ok(ScalingAction::ScaleDown(scale_down));
416        }
417
418        Ok(ScalingAction::NoAction)
419    }
420
421    /// Update service instance count
422    pub fn update_instance_count(&mut self, service_id: String, count: u32) {
423        self.service_instances.insert(service_id, count);
424    }
425
426    /// Predict service health
427    pub fn predict_service_health(&self, service_id: &str) -> Result<HealthStatus> {
428        let history = self.metrics_history.get(service_id).ok_or_else(|| {
429            CleanroomError::internal_error("No metrics history found")
430                .with_context(format!("Service: {}", service_id))
431        })?;
432
433        let predicted = history.predict_load(5);
434
435        if let Some(metrics) = predicted {
436            let health_score = metrics.health_score();
437
438            if health_score > 70.0 {
439                Ok(HealthStatus::Healthy)
440            } else if health_score > 40.0 {
441                warn!(
442                    "Service {} predicted to be degraded (score: {})",
443                    service_id, health_score
444                );
445                Ok(HealthStatus::Unknown)
446            } else {
447                warn!(
448                    "Service {} predicted to be unhealthy (score: {})",
449                    service_id, health_score
450                );
451                Ok(HealthStatus::Unhealthy)
452            }
453        } else {
454            Ok(HealthStatus::Unknown)
455        }
456    }
457
458    /// Get or create resource pool for a service
459    pub fn get_or_create_pool(
460        &mut self,
461        service_name: String,
462        max_size: usize,
463    ) -> &mut ResourcePool {
464        self.resource_pools
465            .entry(service_name.clone())
466            .or_insert_with(|| ResourcePool::new(service_name, max_size))
467    }
468
469    /// Generate cost optimization recommendations
470    pub fn generate_cost_recommendations(&self, service_id: &str) -> Vec<CostRecommendation> {
471        let mut recommendations = Vec::new();
472
473        if let Some(history) = self.metrics_history.get(service_id) {
474            if let Some(avg_metrics) = history.average_metrics(100) {
475                let service_name = avg_metrics.service_name.clone();
476
477                // Check for over-provisioning
478                if avg_metrics.cpu_usage < 20.0 && avg_metrics.memory_usage < 100.0 {
479                    recommendations.push(CostRecommendation {
480                        service_id: service_id.to_string(),
481                        service_name: service_name.clone(),
482                        recommendation_type: "Downsize".to_string(),
483                        description: "Service is significantly under-utilized. Consider reducing instance size.".to_string(),
484                        estimated_savings: 30.0,
485                        priority: 5,
486                    });
487                }
488
489                // Check for high error rate
490                if avg_metrics.error_rate > 0.05 {
491                    recommendations.push(CostRecommendation {
492                        service_id: service_id.to_string(),
493                        service_name: service_name.clone(),
494                        recommendation_type: "Optimize".to_string(),
495                        description:
496                            "High error rate detected. Investigate and fix to reduce retry costs."
497                                .to_string(),
498                        estimated_savings: 15.0,
499                        priority: 4,
500                    });
501                }
502
503                // Check for resource pooling opportunity
504                if let Some(pool) = self.resource_pools.get(&service_name) {
505                    if pool.utilization() < 0.5 && pool.available.len() > 2 {
506                        recommendations.push(CostRecommendation {
507                            service_id: service_id.to_string(),
508                            service_name: service_name.clone(),
509                            recommendation_type: "Pool Optimization".to_string(),
510                            description:
511                                "Resource pool has low utilization. Consider reducing pool size."
512                                    .to_string(),
513                            estimated_savings: 20.0,
514                            priority: 3,
515                        });
516                    }
517                }
518
519                // Check for consistent low load
520                if avg_metrics.request_rate < 10.0 {
521                    recommendations.push(CostRecommendation {
522                        service_id: service_id.to_string(),
523                        service_name,
524                        recommendation_type: "Serverless Migration".to_string(),
525                        description:
526                            "Low consistent load. Consider migrating to serverless architecture."
527                                .to_string(),
528                        estimated_savings: 40.0,
529                        priority: 4,
530                    });
531                }
532            }
533        }
534
535        recommendations.sort_by(|a, b| b.priority.cmp(&a.priority));
536        recommendations
537    }
538
539    /// Get summary statistics
540    pub fn get_summary(&self) -> HashMap<String, serde_json::Value> {
541        let mut summary = HashMap::new();
542
543        summary.insert(
544            "total_services".to_string(),
545            serde_json::json!(self.metrics_history.len()),
546        );
547
548        summary.insert(
549            "total_instances".to_string(),
550            serde_json::json!(self.service_instances.values().sum::<u32>()),
551        );
552
553        summary.insert(
554            "total_pools".to_string(),
555            serde_json::json!(self.resource_pools.len()),
556        );
557
558        let avg_utilization: f64 = self
559            .resource_pools
560            .values()
561            .map(|p| p.utilization())
562            .sum::<f64>()
563            / self.resource_pools.len().max(1) as f64;
564
565        summary.insert(
566            "avg_pool_utilization".to_string(),
567            serde_json::json!(format!("{:.1}%", avg_utilization * 100.0)),
568        );
569
570        summary
571    }
572}
573
/// `Default` delegates to [`ServiceManager::new`], yielding an empty manager.
impl Default for ServiceManager {
    fn default() -> Self {
        Self::new()
    }
}