syncable_cli/analyzer/k8s_optimize/
live_analyzer.rs

//! Live Cluster Analyzer for Kubernetes resource optimization.
//!
//! Combines metrics from the Kubernetes metrics-server (real-time) and
//! Prometheus (historical) to provide data-driven right-sizing recommendations.
//!
//! # Architecture
//!
//! ```text
//! ┌─────────────────────────────────────────────────────────────────────┐
//! │                        Live Analyzer                                │
//! │                                                                     │
//! │  ┌─────────────────┐    ┌──────────────────┐    ┌───────────────┐  │
//! │  │  MetricsClient  │    │ PrometheusClient │    │ Static Rules  │  │
//! │  │  (Real-time)    │    │ (Historical)     │    │ (Fallback)    │  │
//! │  └────────┬────────┘    └────────┬─────────┘    └───────┬───────┘  │
//! │           │                      │                      │          │
//! │           └──────────────────────┴──────────────────────┘          │
//! │                                  │                                  │
//! │                                  ▼                                  │
//! │                       ┌──────────────────┐                         │
//! │                       │  Recommendations │                         │
//! │                       │  (Data-Driven)   │                         │
//! │                       └──────────────────┘                         │
//! └─────────────────────────────────────────────────────────────────────┘
//! ```
26
27use super::metrics_client::{MetricsClient, MetricsError, PodResources, ResourceComparison};
28use super::prometheus_client::{
29    ContainerHistory, HistoricalRecommendation, PrometheusClient, PrometheusError,
30};
31use super::types::Severity;
32use serde::{Deserialize, Serialize};
33
/// Error type for live analysis operations.
///
/// The `#[from]` conversions let `?` lift client errors directly into this
/// type at the analyzer boundary.
#[derive(Debug, thiserror::Error)]
pub enum LiveAnalyzerError {
    /// Failure from the Kubernetes metrics client.
    #[error("Kubernetes API error: {0}")]
    KubernetesError(#[from] MetricsError),

    /// Failure from the Prometheus client.
    #[error("Prometheus error: {0}")]
    PrometheusError(#[from] PrometheusError),

    /// The required backend client was never constructed/connected.
    #[error("No cluster connection available")]
    NoClusterConnection,

    /// Not enough samples to produce a reliable recommendation.
    #[error("Insufficient data for reliable recommendations")]
    InsufficientData,
}
49
/// Data source for recommendations, in increasing order of accuracy
/// (Static < MetricsServer < Prometheus < Combined).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum DataSource {
    /// Real-time metrics from metrics-server (current snapshot)
    MetricsServer,
    /// Historical data from Prometheus (7-30 days)
    Prometheus,
    /// Combined real-time + historical (most accurate)
    Combined,
    /// Static heuristics only (no cluster data)
    Static,
}
62
/// Configuration for live analysis.
#[derive(Debug, Clone)]
pub struct LiveAnalyzerConfig {
    /// Prometheus base URL; `None` disables historical analysis.
    pub prometheus_url: Option<String>,
    /// Time range for historical data (e.g., "7d", "30d")
    pub history_period: String,
    /// Safety margin percentage applied on top of observed usage (default: 20%)
    pub safety_margin_pct: u8,
    /// Minimum samples required for high-confidence recommendations
    pub min_samples: usize,
    /// Waste threshold percentage to report; smaller waste is not flagged
    pub waste_threshold_pct: f32,
    /// Target namespace (None = all namespaces)
    pub namespace: Option<String>,
    /// Include system namespaces (kube-system, etc.) in the analysis
    pub include_system: bool,
}
81
82impl Default for LiveAnalyzerConfig {
83    fn default() -> Self {
84        Self {
85            prometheus_url: None,
86            history_period: "7d".to_string(),
87            safety_margin_pct: 20,
88            min_samples: 100,
89            waste_threshold_pct: 10.0,
90            namespace: None,
91            include_system: false,
92        }
93    }
94}
95
/// Live cluster analyzer.
///
/// Either client may be absent; analysis methods pick the best source that is
/// actually reachable and fall back to static results otherwise.
pub struct LiveAnalyzer {
    // Real-time snapshot source (metrics-server); None when unreachable.
    metrics_client: Option<MetricsClient>,
    // Historical source; None when no URL was configured or creation failed.
    prometheus_client: Option<PrometheusClient>,
    config: LiveAnalyzerConfig,
}
102
103impl LiveAnalyzer {
104    /// Create a new live analyzer, attempting to connect to the cluster.
105    pub async fn new(config: LiveAnalyzerConfig) -> Result<Self, LiveAnalyzerError> {
106        // Try to create Kubernetes client
107        let metrics_client = match MetricsClient::new().await {
108            Ok(client) => Some(client),
109            Err(e) => {
110                eprintln!("Warning: Could not connect to Kubernetes cluster: {}", e);
111                None
112            }
113        };
114
115        // Try to create Prometheus client if URL provided
116        let prometheus_client =
117            config
118                .prometheus_url
119                .as_ref()
120                .and_then(|url| match PrometheusClient::new(url) {
121                    Ok(client) => Some(client),
122                    Err(e) => {
123                        eprintln!("Warning: Could not create Prometheus client: {}", e);
124                        None
125                    }
126                });
127
128        Ok(Self {
129            metrics_client,
130            prometheus_client,
131            config,
132        })
133    }
134
135    /// Create analyzer with specific context.
136    pub async fn with_context(
137        context: &str,
138        config: LiveAnalyzerConfig,
139    ) -> Result<Self, LiveAnalyzerError> {
140        let metrics_client = match MetricsClient::with_context(context).await {
141            Ok(client) => Some(client),
142            Err(e) => {
143                eprintln!("Warning: Could not connect to context '{}': {}", context, e);
144                None
145            }
146        };
147
148        let prometheus_client = config
149            .prometheus_url
150            .as_ref()
151            .and_then(|url| PrometheusClient::new(url).ok());
152
153        Ok(Self {
154            metrics_client,
155            prometheus_client,
156            config,
157        })
158    }
159
160    /// Check what data sources are available.
161    pub async fn available_sources(&self) -> Vec<DataSource> {
162        let mut sources = vec![DataSource::Static]; // Always available
163
164        if let Some(ref metrics) = self.metrics_client
165            && metrics.is_metrics_available().await
166        {
167            sources.push(DataSource::MetricsServer);
168        }
169
170        if let Some(ref prometheus) = self.prometheus_client
171            && prometheus.is_available().await
172        {
173            sources.push(DataSource::Prometheus);
174        }
175
176        if sources.contains(&DataSource::MetricsServer) && sources.contains(&DataSource::Prometheus)
177        {
178            sources.push(DataSource::Combined);
179        }
180
181        sources
182    }
183
184    /// Analyze cluster and generate recommendations.
185    pub async fn analyze(&self) -> Result<LiveAnalysisResult, LiveAnalyzerError> {
186        let sources = self.available_sources().await;
187
188        let best_source = if sources.contains(&DataSource::Combined) {
189            DataSource::Combined
190        } else if sources.contains(&DataSource::Prometheus) {
191            DataSource::Prometheus
192        } else if sources.contains(&DataSource::MetricsServer) {
193            DataSource::MetricsServer
194        } else {
195            DataSource::Static
196        };
197
198        match best_source {
199            DataSource::Combined => self.analyze_combined().await,
200            DataSource::Prometheus => self.analyze_prometheus().await,
201            DataSource::MetricsServer => self.analyze_metrics_server().await,
202            DataSource::Static => Ok(LiveAnalysisResult::static_fallback()),
203        }
204    }
205
206    /// Analyze using metrics-server data (real-time snapshot).
207    async fn analyze_metrics_server(&self) -> Result<LiveAnalysisResult, LiveAnalyzerError> {
208        let client = self
209            .metrics_client
210            .as_ref()
211            .ok_or(LiveAnalyzerError::NoClusterConnection)?;
212
213        let namespace = self.config.namespace.as_deref();
214        let comparisons = client.compare_usage(namespace).await?;
215        let total_count = comparisons.len();
216
217        let mut recommendations = Vec::new();
218        let mut total_cpu_waste: u64 = 0;
219        let mut total_memory_waste: u64 = 0;
220        let mut over_provisioned = 0;
221        let mut under_provisioned = 0;
222
223        for comp in comparisons {
224            // Skip system namespaces unless configured
225            if !self.config.include_system && is_system_namespace(&comp.namespace) {
226                continue;
227            }
228
229            // Skip if waste is below threshold
230            if comp.cpu_waste_pct.abs() < self.config.waste_threshold_pct
231                && comp.memory_waste_pct.abs() < self.config.waste_threshold_pct
232            {
233                continue;
234            }
235
236            let recommendation = self.comparison_to_recommendation(&comp);
237
238            if comp.cpu_waste_pct > 0.0 || comp.memory_waste_pct > 0.0 {
239                over_provisioned += 1;
240                if let Some(req) = comp.cpu_request {
241                    total_cpu_waste += (req as f32 * (comp.cpu_waste_pct / 100.0)) as u64;
242                }
243                if let Some(req) = comp.memory_request {
244                    total_memory_waste += (req as f32 * (comp.memory_waste_pct / 100.0)) as u64;
245                }
246            } else {
247                under_provisioned += 1;
248            }
249
250            recommendations.push(recommendation);
251        }
252
253        Ok(LiveAnalysisResult {
254            source: DataSource::MetricsServer,
255            recommendations,
256            summary: AnalysisSummary {
257                resources_analyzed: total_count,
258                over_provisioned,
259                under_provisioned,
260                optimal: total_count.saturating_sub(over_provisioned + under_provisioned),
261                total_cpu_waste_millicores: total_cpu_waste,
262                total_memory_waste_bytes: total_memory_waste,
263                confidence: 60, // Lower confidence for point-in-time data
264            },
265            warnings: vec![
266                "Real-time snapshot only. For accurate recommendations, enable Prometheus for historical data.".to_string()
267            ],
268        })
269    }
270
271    /// Analyze using Prometheus historical data.
272    async fn analyze_prometheus(&self) -> Result<LiveAnalysisResult, LiveAnalyzerError> {
273        let client = self
274            .prometheus_client
275            .as_ref()
276            .ok_or(LiveAnalyzerError::NoClusterConnection)?;
277
278        let metrics_client = self.metrics_client.as_ref();
279
280        // Get pod resources to understand current requests
281        let pod_resources = if let Some(mc) = metrics_client {
282            mc.get_pod_resources(self.config.namespace.as_deref())
283                .await
284                .ok()
285        } else {
286            None
287        };
288
289        let mut recommendations = Vec::new();
290        let mut over_provisioned = 0;
291        let mut under_provisioned = 0;
292        let mut total_cpu_waste: u64 = 0;
293        let mut total_memory_waste: u64 = 0;
294
295        // Group by unique workloads
296        let workloads = if let Some(ref resources) = pod_resources {
297            extract_workloads(resources)
298        } else {
299            Vec::new()
300        };
301
302        let resources_analyzed = workloads.len();
303
304        for (namespace, owner_name, containers) in workloads {
305            if !self.config.include_system && is_system_namespace(&namespace) {
306                continue;
307            }
308
309            for (container_name, cpu_request, memory_request) in containers {
310                match client
311                    .get_container_history(
312                        &namespace,
313                        &owner_name,
314                        &container_name,
315                        &self.config.history_period,
316                    )
317                    .await
318                {
319                    Ok(history) => {
320                        let rec = PrometheusClient::generate_recommendation(
321                            &history,
322                            cpu_request,
323                            memory_request,
324                            self.config.safety_margin_pct,
325                        );
326
327                        if rec.cpu_savings_pct.abs() < self.config.waste_threshold_pct
328                            && rec.memory_savings_pct.abs() < self.config.waste_threshold_pct
329                        {
330                            continue;
331                        }
332
333                        if rec.cpu_savings_pct > 0.0 || rec.memory_savings_pct > 0.0 {
334                            over_provisioned += 1;
335                            if let Some(req) = cpu_request {
336                                total_cpu_waste +=
337                                    (req as f32 * (rec.cpu_savings_pct / 100.0)) as u64;
338                            }
339                            if let Some(req) = memory_request {
340                                total_memory_waste +=
341                                    (req as f32 * (rec.memory_savings_pct / 100.0)) as u64;
342                            }
343                        } else {
344                            under_provisioned += 1;
345                        }
346
347                        recommendations
348                            .push(self.history_to_recommendation(&rec, &namespace, &history));
349                    }
350                    Err(_) => continue,
351                }
352            }
353        }
354
355        Ok(LiveAnalysisResult {
356            source: DataSource::Prometheus,
357            recommendations,
358            summary: AnalysisSummary {
359                resources_analyzed,
360                over_provisioned,
361                under_provisioned,
362                optimal: resources_analyzed - over_provisioned - under_provisioned,
363                total_cpu_waste_millicores: total_cpu_waste,
364                total_memory_waste_bytes: total_memory_waste,
365                confidence: 85,
366            },
367            warnings: vec![],
368        })
369    }
370
371    /// Analyze using both real-time and historical data (highest accuracy).
372    async fn analyze_combined(&self) -> Result<LiveAnalysisResult, LiveAnalyzerError> {
373        // Get Prometheus-based recommendations (more accurate)
374        let mut result = self.analyze_prometheus().await?;
375
376        // Get real-time data for current state
377        if let Ok(_realtime) = self.analyze_metrics_server().await {
378            // Merge real-time data with historical
379            result.source = DataSource::Combined;
380            result.summary.confidence = 95;
381            result.warnings = vec![];
382        }
383
384        Ok(result)
385    }
386
387    /// Convert a ResourceComparison to a recommendation.
388    fn comparison_to_recommendation(&self, comp: &ResourceComparison) -> LiveRecommendation {
389        let severity = if comp.memory_waste_pct < -25.0 {
390            Severity::Critical // Significantly under-provisioned memory
391        } else if comp.cpu_waste_pct < -25.0 || comp.memory_waste_pct < -10.0 {
392            Severity::High
393        } else if comp.cpu_waste_pct > 50.0 || comp.memory_waste_pct > 50.0 {
394            Severity::High
395        } else if comp.cpu_waste_pct > 25.0 || comp.memory_waste_pct > 25.0 {
396            Severity::Medium
397        } else {
398            Severity::Low
399        };
400
401        let margin = 1.0 + (self.config.safety_margin_pct as f64 / 100.0);
402        let recommended_cpu = round_cpu((comp.cpu_actual as f64 * margin) as u64);
403        let recommended_memory = round_memory((comp.memory_actual as f64 * margin) as u64);
404
405        LiveRecommendation {
406            workload_name: comp
407                .owner_name
408                .clone()
409                .unwrap_or_else(|| comp.pod_name.clone()),
410            workload_kind: comp.owner_kind.clone().unwrap_or_else(|| "Pod".to_string()),
411            namespace: comp.namespace.clone(),
412            container_name: comp.container_name.clone(),
413            severity,
414            current_cpu_millicores: comp.cpu_request,
415            current_memory_bytes: comp.memory_request,
416            actual_cpu_millicores: comp.cpu_actual,
417            actual_memory_bytes: comp.memory_actual,
418            recommended_cpu_millicores: recommended_cpu,
419            recommended_memory_bytes: recommended_memory,
420            cpu_waste_pct: comp.cpu_waste_pct,
421            memory_waste_pct: comp.memory_waste_pct,
422            confidence: 60,
423            data_source: DataSource::MetricsServer,
424        }
425    }
426
427    /// Convert historical recommendation to our format.
428    fn history_to_recommendation(
429        &self,
430        rec: &HistoricalRecommendation,
431        namespace: &str,
432        history: &ContainerHistory,
433    ) -> LiveRecommendation {
434        let severity = if rec.memory_savings_pct < -25.0 {
435            Severity::Critical
436        } else if rec.cpu_savings_pct > 50.0 || rec.memory_savings_pct > 50.0 {
437            Severity::High
438        } else if rec.cpu_savings_pct > 25.0 || rec.memory_savings_pct > 25.0 {
439            Severity::Medium
440        } else {
441            Severity::Low
442        };
443
444        LiveRecommendation {
445            workload_name: rec.workload_name.clone(),
446            workload_kind: "Deployment".to_string(), // Assume deployment
447            namespace: namespace.to_string(),
448            container_name: rec.container_name.clone(),
449            severity,
450            current_cpu_millicores: rec.current_cpu_request,
451            current_memory_bytes: rec.current_memory_request,
452            actual_cpu_millicores: history.cpu_p99,
453            actual_memory_bytes: history.memory_p99,
454            recommended_cpu_millicores: rec.recommended_cpu_request,
455            recommended_memory_bytes: rec.recommended_memory_request,
456            cpu_waste_pct: rec.cpu_savings_pct,
457            memory_waste_pct: rec.memory_savings_pct,
458            confidence: rec.confidence,
459            data_source: DataSource::Prometheus,
460        }
461    }
462}
463
/// Result of live cluster analysis.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LiveAnalysisResult {
    /// Data source used for recommendations
    pub source: DataSource,
    /// Individual recommendations (only resources above the waste threshold)
    pub recommendations: Vec<LiveRecommendation>,
    /// Summary statistics
    pub summary: AnalysisSummary,
    /// Warnings or notes for the user (e.g. degraded data source)
    pub warnings: Vec<String>,
}
476
477impl LiveAnalysisResult {
478    /// Create a static fallback result when no cluster connection is available.
479    fn static_fallback() -> Self {
480        Self {
481            source: DataSource::Static,
482            recommendations: vec![],
483            summary: AnalysisSummary {
484                resources_analyzed: 0,
485                over_provisioned: 0,
486                under_provisioned: 0,
487                optimal: 0,
488                total_cpu_waste_millicores: 0,
489                total_memory_waste_bytes: 0,
490                confidence: 0,
491            },
492            warnings: vec![
493                "No cluster connection available. Using static analysis only.".to_string(),
494                "Connect to a cluster with --cluster for data-driven recommendations.".to_string(),
495            ],
496        }
497    }
498}
499
/// Summary of analysis results.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AnalysisSummary {
    // Total resources inspected (workloads or pod comparisons, per source).
    pub resources_analyzed: usize,
    // Count flagged with positive waste (requests exceed usage).
    pub over_provisioned: usize,
    // Count flagged with negative waste (usage exceeds requests).
    pub under_provisioned: usize,
    // Remainder not flagged either way.
    pub optimal: usize,
    // Aggregate reclaimable CPU across over-provisioned resources.
    pub total_cpu_waste_millicores: u64,
    // Aggregate reclaimable memory across over-provisioned resources.
    pub total_memory_waste_bytes: u64,
    /// Confidence percentage (0-100)
    pub confidence: u8,
}
512
/// A single recommendation from live analysis.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LiveRecommendation {
    // Owner workload name, falling back to the pod name for ownerless pods.
    pub workload_name: String,
    // Owner kind (e.g. "Deployment", or "Pod" when no owner is known).
    pub workload_kind: String,
    pub namespace: String,
    pub container_name: String,
    pub severity: Severity,
    /// Current CPU request (millicores)
    pub current_cpu_millicores: Option<u64>,
    /// Current memory request (bytes)
    pub current_memory_bytes: Option<u64>,
    /// Actual CPU usage (millicores)
    pub actual_cpu_millicores: u64,
    /// Actual memory usage (bytes)
    pub actual_memory_bytes: u64,
    /// Recommended CPU request (millicores)
    pub recommended_cpu_millicores: u64,
    /// Recommended memory request (bytes)
    pub recommended_memory_bytes: u64,
    /// CPU waste percentage (positive = over-provisioned)
    pub cpu_waste_pct: f32,
    /// Memory waste percentage (positive = over-provisioned)
    pub memory_waste_pct: f32,
    /// Confidence level (0-100)
    pub confidence: u8,
    /// Source of the data
    pub data_source: DataSource,
}
542
543impl LiveRecommendation {
544    /// Generate a YAML fix snippet for this recommendation.
545    pub fn generate_fix_yaml(&self) -> String {
546        let cpu_str = format_cpu_millicores(self.recommended_cpu_millicores);
547        let mem_str = format_memory_bytes(self.recommended_memory_bytes);
548
549        format!(
550            "# Fix for {}/{} container {}
551# Source: {:?} (confidence: {}%)
552resources:
553  requests:
554    cpu: \"{}\"
555    memory: \"{}\"
556  limits:
557    cpu: \"{}\"    # Consider 2x request for burst
558    memory: \"{}\"  # Same as request to prevent OOM",
559            self.namespace,
560            self.workload_name,
561            self.container_name,
562            self.data_source,
563            self.confidence,
564            cpu_str,
565            mem_str,
566            format_cpu_millicores(self.recommended_cpu_millicores * 2), // 2x for limit
567            mem_str, // Memory limit = request to prevent OOM
568        )
569    }
570}
571
/// Format CPU millicores as a Kubernetes resource string.
///
/// Whole cores render without a suffix (e.g. `"2"`); everything else keeps
/// the `m` suffix so fractional cores are not silently truncated
/// (e.g. 1300 -> `"1300m"`, not `"1"` as the old integer division produced).
fn format_cpu_millicores(millicores: u64) -> String {
    if millicores >= 1000 && millicores % 1000 == 0 {
        format!("{}", millicores / 1000) // Exact number of full cores
    } else {
        format!("{}m", millicores)
    }
}
580
/// Format memory bytes as a Kubernetes resource string.
///
/// Uses `Gi` only for exact gibibyte multiples; anything else renders in
/// `Mi` (rounded up) so non-aligned values are not silently truncated
/// (e.g. 1.5 GiB -> `"1536Mi"`, not `"1Gi"` as the old division produced).
fn format_memory_bytes(bytes: u64) -> String {
    const GI: u64 = 1024 * 1024 * 1024;
    const MI: u64 = 1024 * 1024;

    if bytes >= GI && bytes % GI == 0 {
        format!("{}Gi", bytes / GI)
    } else {
        // Ceiling so a rendered request is never below the computed need.
        format!("{}Mi", bytes.div_ceil(MI))
    }
}
592
593// ============================================================================
594// Helper functions
595// ============================================================================
596
/// Check if a namespace is a well-known system/infrastructure namespace.
///
/// NOTE: `default` is deliberately in this list, so workloads running there
/// are skipped unless `include_system` is set.
fn is_system_namespace(namespace: &str) -> bool {
    const SYSTEM_NAMESPACES: &[&str] = &[
        "kube-system",
        "kube-public",
        "kube-node-lease",
        "default",
        "ingress-nginx",
        "cert-manager",
        "monitoring",
        "logging",
        "istio-system",
    ];
    SYSTEM_NAMESPACES.contains(&namespace)
}
612
613/// Extract unique workloads from pod resources.
614fn extract_workloads(
615    resources: &[PodResources],
616) -> Vec<(String, String, Vec<(String, Option<u64>, Option<u64>)>)> {
617    use std::collections::HashMap;
618
619    let mut workloads: HashMap<(String, String), Vec<(String, Option<u64>, Option<u64>)>> =
620        HashMap::new();
621
622    for pod in resources {
623        let owner = pod.owner_name.clone().unwrap_or_else(|| pod.name.clone());
624        let key = (pod.namespace.clone(), owner);
625
626        let containers: Vec<_> = pod
627            .containers
628            .iter()
629            .map(|c| (c.name.clone(), c.cpu_request, c.memory_request))
630            .collect();
631
632        workloads.entry(key).or_default().extend(containers);
633    }
634
635    workloads
636        .into_iter()
637        .map(|((ns, owner), containers)| (ns, owner, containers))
638        .collect()
639}
640
/// Round CPU to nice values.
/// Small values use ceiling (to prevent under-provisioning), larger values use rounding.
fn round_cpu(millicores: u64) -> u64 {
    match millicores {
        0 => 0,
        // Ceiling to the next 25m step for tiny requests.
        1..=100 => millicores.div_ceil(25) * 25,
        // Round half-up to the nearest 50m.
        101..=1000 => (millicores + 25) / 50 * 50,
        // Round half-up to the nearest 100m.
        _ => (millicores + 50) / 100 * 100,
    }
}
657
/// Round memory to nice values: nearest 32 MiB up to 128 MiB, nearest 64 MiB
/// above that (half-up in both cases).
fn round_memory(bytes: u64) -> u64 {
    const MI: u64 = 1024 * 1024;
    let step = if bytes <= 128 * MI { 32 * MI } else { 64 * MI };
    // Adding half a step before the integer division gives round-half-up.
    (bytes + step / 2) / step * step
}
667
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_is_system_namespace() {
        assert!(is_system_namespace("kube-system"));
        assert!(is_system_namespace("kube-public"));
        assert!(is_system_namespace("istio-system"));
        assert!(!is_system_namespace("production"));
        assert!(!is_system_namespace("my-app"));
    }

    #[test]
    fn test_round_cpu() {
        assert_eq!(round_cpu(0), 0);
        assert_eq!(round_cpu(10), 25);
        assert_eq!(round_cpu(90), 100);
        // Boundary of the ceiling branch.
        assert_eq!(round_cpu(100), 100);
        assert_eq!(round_cpu(150), 150);
        assert_eq!(round_cpu(1250), 1300);
    }

    #[test]
    fn test_round_memory() {
        const MI: u64 = 1024 * 1024;
        assert_eq!(round_memory(0), 0);
        // <=128Mi rounds half-up to 32Mi steps.
        assert_eq!(round_memory(100 * MI), 96 * MI);
        assert_eq!(round_memory(112 * MI), 128 * MI);
        // >128Mi rounds half-up to 64Mi steps.
        assert_eq!(round_memory(200 * MI), 192 * MI);
    }

    #[test]
    fn test_format_helpers() {
        // Values chosen so the expectation is stable for exact multiples.
        assert_eq!(format_cpu_millicores(500), "500m");
        assert_eq!(format_cpu_millicores(2000), "2");
        assert_eq!(format_memory_bytes(256 * 1024 * 1024), "256Mi");
        assert_eq!(format_memory_bytes(2 * 1024 * 1024 * 1024), "2Gi");
    }
}