1use super::metrics_client::{MetricsClient, MetricsError, PodResources, ResourceComparison};
28use super::prometheus_client::{
29 ContainerHistory, HistoricalRecommendation, PrometheusClient, PrometheusError,
30};
31use super::types::Severity;
32use serde::{Deserialize, Serialize};
33
/// Errors that can occur while running live (cluster-backed) analysis.
#[derive(Debug, thiserror::Error)]
pub enum LiveAnalyzerError {
    /// Failure talking to the Kubernetes metrics API (wrapped from the metrics client).
    #[error("Kubernetes API error: {0}")]
    KubernetesError(#[from] MetricsError),

    /// Failure querying Prometheus (wrapped from the Prometheus client).
    #[error("Prometheus error: {0}")]
    PrometheusError(#[from] PrometheusError),

    /// Neither a metrics client nor a Prometheus client is available.
    #[error("No cluster connection available")]
    NoClusterConnection,

    /// Data exists but is too sparse for trustworthy recommendations.
    // NOTE(review): not constructed anywhere in this file — presumably raised
    // by callers or other modules; confirm before removing.
    #[error("Insufficient data for reliable recommendations")]
    InsufficientData,
}
49
/// Where the analysis data came from.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum DataSource {
    /// Real-time snapshot from the Kubernetes metrics-server.
    MetricsServer,
    /// Historical time-series data from Prometheus.
    Prometheus,
    /// Both metrics-server and Prometheus were reachable.
    Combined,
    /// No cluster connection; static analysis only.
    Static,
}
62
/// Tunables for a live analysis run.
#[derive(Debug, Clone)]
pub struct LiveAnalyzerConfig {
    /// Base URL of a Prometheus server; `None` disables historical analysis.
    pub prometheus_url: Option<String>,
    /// Prometheus lookback window as a PromQL duration string (e.g. "7d").
    pub history_period: String,
    /// Percentage added on top of observed usage when recommending requests.
    pub safety_margin_pct: u8,
    /// Minimum sample count for a reliable recommendation.
    // NOTE(review): not read anywhere in this file — presumably enforced by
    // the Prometheus client; confirm.
    pub min_samples: usize,
    /// Deviations (in percent) below this threshold are treated as optimal.
    pub waste_threshold_pct: f32,
    /// Restrict analysis to one namespace; `None` means all namespaces.
    pub namespace: Option<String>,
    /// Include well-known system namespaces (kube-system, ...) in analysis.
    pub include_system: bool,
}
81
82impl Default for LiveAnalyzerConfig {
83 fn default() -> Self {
84 Self {
85 prometheus_url: None,
86 history_period: "7d".to_string(),
87 safety_margin_pct: 20,
88 min_samples: 100,
89 waste_threshold_pct: 10.0,
90 namespace: None,
91 include_system: false,
92 }
93 }
94}
95
/// Orchestrates live analysis over whichever data sources are reachable:
/// metrics-server snapshots, Prometheus history, or both.
pub struct LiveAnalyzer {
    // Present only when the cluster connection succeeded.
    metrics_client: Option<MetricsClient>,
    // Present only when a Prometheus URL was configured and the client built.
    prometheus_client: Option<PrometheusClient>,
    config: LiveAnalyzerConfig,
}
102
103impl LiveAnalyzer {
104 pub async fn new(config: LiveAnalyzerConfig) -> Result<Self, LiveAnalyzerError> {
106 let metrics_client = match MetricsClient::new().await {
108 Ok(client) => Some(client),
109 Err(e) => {
110 eprintln!("Warning: Could not connect to Kubernetes cluster: {}", e);
111 None
112 }
113 };
114
115 let prometheus_client =
117 config
118 .prometheus_url
119 .as_ref()
120 .and_then(|url| match PrometheusClient::new(url) {
121 Ok(client) => Some(client),
122 Err(e) => {
123 eprintln!("Warning: Could not create Prometheus client: {}", e);
124 None
125 }
126 });
127
128 Ok(Self {
129 metrics_client,
130 prometheus_client,
131 config,
132 })
133 }
134
135 pub async fn with_context(
137 context: &str,
138 config: LiveAnalyzerConfig,
139 ) -> Result<Self, LiveAnalyzerError> {
140 let metrics_client = match MetricsClient::with_context(context).await {
141 Ok(client) => Some(client),
142 Err(e) => {
143 eprintln!("Warning: Could not connect to context '{}': {}", context, e);
144 None
145 }
146 };
147
148 let prometheus_client = config
149 .prometheus_url
150 .as_ref()
151 .and_then(|url| PrometheusClient::new(url).ok());
152
153 Ok(Self {
154 metrics_client,
155 prometheus_client,
156 config,
157 })
158 }
159
160 pub async fn available_sources(&self) -> Vec<DataSource> {
162 let mut sources = vec![DataSource::Static]; if let Some(ref metrics) = self.metrics_client
165 && metrics.is_metrics_available().await
166 {
167 sources.push(DataSource::MetricsServer);
168 }
169
170 if let Some(ref prometheus) = self.prometheus_client
171 && prometheus.is_available().await
172 {
173 sources.push(DataSource::Prometheus);
174 }
175
176 if sources.contains(&DataSource::MetricsServer) && sources.contains(&DataSource::Prometheus)
177 {
178 sources.push(DataSource::Combined);
179 }
180
181 sources
182 }
183
184 pub async fn analyze(&self) -> Result<LiveAnalysisResult, LiveAnalyzerError> {
186 let sources = self.available_sources().await;
187
188 let best_source = if sources.contains(&DataSource::Combined) {
189 DataSource::Combined
190 } else if sources.contains(&DataSource::Prometheus) {
191 DataSource::Prometheus
192 } else if sources.contains(&DataSource::MetricsServer) {
193 DataSource::MetricsServer
194 } else {
195 DataSource::Static
196 };
197
198 match best_source {
199 DataSource::Combined => self.analyze_combined().await,
200 DataSource::Prometheus => self.analyze_prometheus().await,
201 DataSource::MetricsServer => self.analyze_metrics_server().await,
202 DataSource::Static => Ok(LiveAnalysisResult::static_fallback()),
203 }
204 }
205
    /// Produce recommendations from a single real-time metrics-server
    /// snapshot. Confidence is fixed at 60 because a point-in-time sample
    /// says nothing about peaks or trends; the result carries a warning
    /// saying as much.
    ///
    /// # Errors
    /// `NoClusterConnection` when no metrics client exists; propagates
    /// metrics API errors from the usage comparison.
    async fn analyze_metrics_server(&self) -> Result<LiveAnalysisResult, LiveAnalyzerError> {
        let client = self
            .metrics_client
            .as_ref()
            .ok_or(LiveAnalyzerError::NoClusterConnection)?;

        let namespace = self.config.namespace.as_deref();
        let comparisons = client.compare_usage(namespace).await?;
        // Counted before filtering: entries skipped below (system namespaces,
        // below-threshold deviations) are implicitly lumped into `optimal`.
        let total_count = comparisons.len();

        let mut recommendations = Vec::new();
        let mut total_cpu_waste: u64 = 0;
        let mut total_memory_waste: u64 = 0;
        let mut over_provisioned = 0;
        let mut under_provisioned = 0;

        for comp in comparisons {
            // Skip infrastructure namespaces unless explicitly requested.
            if !self.config.include_system && is_system_namespace(&comp.namespace) {
                continue;
            }

            // Both CPU and memory deviations inside the waste threshold
            // means the entry is effectively optimal — no recommendation.
            if comp.cpu_waste_pct.abs() < self.config.waste_threshold_pct
                && comp.memory_waste_pct.abs() < self.config.waste_threshold_pct
            {
                continue;
            }

            let recommendation = self.comparison_to_recommendation(&comp);

            // Positive waste => requests exceed usage (over-provisioned);
            // otherwise usage exceeds requests (under-provisioned).
            if comp.cpu_waste_pct > 0.0 || comp.memory_waste_pct > 0.0 {
                over_provisioned += 1;
                // Accumulate absolute waste from the request and waste %.
                if let Some(req) = comp.cpu_request {
                    total_cpu_waste += (req as f32 * (comp.cpu_waste_pct / 100.0)) as u64;
                }
                if let Some(req) = comp.memory_request {
                    total_memory_waste += (req as f32 * (comp.memory_waste_pct / 100.0)) as u64;
                }
            } else {
                under_provisioned += 1;
            }

            recommendations.push(recommendation);
        }

        Ok(LiveAnalysisResult {
            source: DataSource::MetricsServer,
            recommendations,
            summary: AnalysisSummary {
                resources_analyzed: total_count,
                over_provisioned,
                under_provisioned,
                // Saturating: counts are against the unfiltered total.
                optimal: total_count.saturating_sub(over_provisioned + under_provisioned),
                total_cpu_waste_millicores: total_cpu_waste,
                total_memory_waste_bytes: total_memory_waste,
                // Snapshot-only data earns modest confidence.
                confidence: 60,
            },
            warnings: vec![
                "Real-time snapshot only. For accurate recommendations, enable Prometheus for historical data.".to_string()
            ],
        })
    }
270
271 async fn analyze_prometheus(&self) -> Result<LiveAnalysisResult, LiveAnalyzerError> {
273 let client = self
274 .prometheus_client
275 .as_ref()
276 .ok_or(LiveAnalyzerError::NoClusterConnection)?;
277
278 let metrics_client = self.metrics_client.as_ref();
279
280 let pod_resources = if let Some(mc) = metrics_client {
282 mc.get_pod_resources(self.config.namespace.as_deref())
283 .await
284 .ok()
285 } else {
286 None
287 };
288
289 let mut recommendations = Vec::new();
290 let mut over_provisioned = 0;
291 let mut under_provisioned = 0;
292 let mut total_cpu_waste: u64 = 0;
293 let mut total_memory_waste: u64 = 0;
294
295 let workloads = if let Some(ref resources) = pod_resources {
297 extract_workloads(resources)
298 } else {
299 Vec::new()
300 };
301
302 let resources_analyzed = workloads.len();
303
304 for (namespace, owner_name, containers) in workloads {
305 if !self.config.include_system && is_system_namespace(&namespace) {
306 continue;
307 }
308
309 for (container_name, cpu_request, memory_request) in containers {
310 match client
311 .get_container_history(
312 &namespace,
313 &owner_name,
314 &container_name,
315 &self.config.history_period,
316 )
317 .await
318 {
319 Ok(history) => {
320 let rec = PrometheusClient::generate_recommendation(
321 &history,
322 cpu_request,
323 memory_request,
324 self.config.safety_margin_pct,
325 );
326
327 if rec.cpu_savings_pct.abs() < self.config.waste_threshold_pct
328 && rec.memory_savings_pct.abs() < self.config.waste_threshold_pct
329 {
330 continue;
331 }
332
333 if rec.cpu_savings_pct > 0.0 || rec.memory_savings_pct > 0.0 {
334 over_provisioned += 1;
335 if let Some(req) = cpu_request {
336 total_cpu_waste +=
337 (req as f32 * (rec.cpu_savings_pct / 100.0)) as u64;
338 }
339 if let Some(req) = memory_request {
340 total_memory_waste +=
341 (req as f32 * (rec.memory_savings_pct / 100.0)) as u64;
342 }
343 } else {
344 under_provisioned += 1;
345 }
346
347 recommendations
348 .push(self.history_to_recommendation(&rec, &namespace, &history));
349 }
350 Err(_) => continue,
351 }
352 }
353 }
354
355 Ok(LiveAnalysisResult {
356 source: DataSource::Prometheus,
357 recommendations,
358 summary: AnalysisSummary {
359 resources_analyzed,
360 over_provisioned,
361 under_provisioned,
362 optimal: resources_analyzed - over_provisioned - under_provisioned,
363 total_cpu_waste_millicores: total_cpu_waste,
364 total_memory_waste_bytes: total_memory_waste,
365 confidence: 85,
366 },
367 warnings: vec![],
368 })
369 }
370
    /// Combine Prometheus history with a metrics-server snapshot.
    ///
    /// NOTE(review): the snapshot result is currently discarded
    /// (`_realtime`) — only the source label and confidence are changed and
    /// the Prometheus-side warnings are cleared. Actual cross-validation of
    /// the two sources does not happen yet; confirm whether that is intended.
    async fn analyze_combined(&self) -> Result<LiveAnalysisResult, LiveAnalyzerError> {
        // Prometheus recommendations form the base result.
        let mut result = self.analyze_prometheus().await?;

        // If a real-time snapshot is also obtainable, upgrade the labelling.
        if let Ok(_realtime) = self.analyze_metrics_server().await {
            result.source = DataSource::Combined;
            result.summary.confidence = 95;
            result.warnings = vec![];
        }

        Ok(result)
    }
386
    /// Convert a metrics-server usage comparison into a [`LiveRecommendation`].
    ///
    /// Severity ladder (first match wins):
    /// - memory under-provisioned by more than 25% => Critical,
    /// - CPU under-provisioned by more than 25%, or memory by more than 10% => High,
    /// - CPU or memory over-provisioned by more than 50% => High,
    /// - either over-provisioned by more than 25% => Medium,
    /// - otherwise Low.
    // NOTE(review): CPU under-provisioning alone never reaches Critical —
    // presumably intentional (CPU throttles, memory OOM-kills); confirm.
    fn comparison_to_recommendation(&self, comp: &ResourceComparison) -> LiveRecommendation {
        let severity = if comp.memory_waste_pct < -25.0 {
            Severity::Critical
        } else if comp.cpu_waste_pct < -25.0 || comp.memory_waste_pct < -10.0 {
            Severity::High
        } else if comp.cpu_waste_pct > 50.0 || comp.memory_waste_pct > 50.0 {
            Severity::High
        } else if comp.cpu_waste_pct > 25.0 || comp.memory_waste_pct > 25.0 {
            Severity::Medium
        } else {
            Severity::Low
        };

        // Recommend actual usage plus the configured safety margin, rounded
        // to human-friendly steps.
        let margin = 1.0 + (self.config.safety_margin_pct as f64 / 100.0);
        let recommended_cpu = round_cpu((comp.cpu_actual as f64 * margin) as u64);
        let recommended_memory = round_memory((comp.memory_actual as f64 * margin) as u64);

        LiveRecommendation {
            // Prefer the controller name; fall back to the bare pod name for
            // unowned pods, whose kind defaults to "Pod" below.
            workload_name: comp
                .owner_name
                .clone()
                .unwrap_or_else(|| comp.pod_name.clone()),
            workload_kind: comp.owner_kind.clone().unwrap_or_else(|| "Pod".to_string()),
            namespace: comp.namespace.clone(),
            container_name: comp.container_name.clone(),
            severity,
            current_cpu_millicores: comp.cpu_request,
            current_memory_bytes: comp.memory_request,
            actual_cpu_millicores: comp.cpu_actual,
            actual_memory_bytes: comp.memory_actual,
            recommended_cpu_millicores: recommended_cpu,
            recommended_memory_bytes: recommended_memory,
            cpu_waste_pct: comp.cpu_waste_pct,
            memory_waste_pct: comp.memory_waste_pct,
            // Matches the snapshot confidence used in analyze_metrics_server.
            confidence: 60,
            data_source: DataSource::MetricsServer,
        }
    }
426
    /// Convert a Prometheus-derived [`HistoricalRecommendation`] into the
    /// unified [`LiveRecommendation`] shape.
    fn history_to_recommendation(
        &self,
        rec: &HistoricalRecommendation,
        namespace: &str,
        history: &ContainerHistory,
    ) -> LiveRecommendation {
        // Similar ladder to comparison_to_recommendation, except CPU
        // under-provisioning is not checked here — only memory deficits can
        // reach Critical.
        let severity = if rec.memory_savings_pct < -25.0 {
            Severity::Critical
        } else if rec.cpu_savings_pct > 50.0 || rec.memory_savings_pct > 50.0 {
            Severity::High
        } else if rec.cpu_savings_pct > 25.0 || rec.memory_savings_pct > 25.0 {
            Severity::Medium
        } else {
            Severity::Low
        };

        LiveRecommendation {
            workload_name: rec.workload_name.clone(),
            // NOTE(review): kind is hard-coded; the actual owner kind
            // (StatefulSet, DaemonSet, ...) is not threaded through from
            // workload discovery — confirm / propagate the real kind.
            workload_kind: "Deployment".to_string(),
            namespace: namespace.to_string(),
            container_name: rec.container_name.clone(),
            severity,
            current_cpu_millicores: rec.current_cpu_request,
            current_memory_bytes: rec.current_memory_request,
            // "Actual" here is the historical p99, not a point sample.
            actual_cpu_millicores: history.cpu_p99,
            actual_memory_bytes: history.memory_p99,
            recommended_cpu_millicores: rec.recommended_cpu_request,
            recommended_memory_bytes: rec.recommended_memory_request,
            cpu_waste_pct: rec.cpu_savings_pct,
            memory_waste_pct: rec.memory_savings_pct,
            confidence: rec.confidence,
            data_source: DataSource::Prometheus,
        }
    }
462}
463
/// Outcome of a live analysis run: which source produced it, the individual
/// recommendations, aggregate statistics, and any caveats for the user.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LiveAnalysisResult {
    /// The data source that actually produced this result.
    pub source: DataSource,
    /// Recommendations that crossed the waste threshold.
    pub recommendations: Vec<LiveRecommendation>,
    /// Aggregate counts and waste totals.
    pub summary: AnalysisSummary,
    /// Human-readable caveats (e.g. snapshot-only data).
    pub warnings: Vec<String>,
}
476
impl LiveAnalysisResult {
    /// Empty result used when no cluster connection is available at all;
    /// carries warnings telling the user how to enable live data.
    fn static_fallback() -> Self {
        Self {
            source: DataSource::Static,
            recommendations: vec![],
            summary: AnalysisSummary {
                resources_analyzed: 0,
                over_provisioned: 0,
                under_provisioned: 0,
                optimal: 0,
                total_cpu_waste_millicores: 0,
                total_memory_waste_bytes: 0,
                confidence: 0,
            },
            warnings: vec![
                "No cluster connection available. Using static analysis only.".to_string(),
                "Connect to a cluster with --cluster for data-driven recommendations.".to_string(),
            ],
        }
    }
}
499
/// Aggregate statistics for one analysis run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AnalysisSummary {
    /// Number of entries examined (per pod for metrics-server, per workload
    /// for Prometheus — see the producing methods).
    pub resources_analyzed: usize,
    /// Entries whose requests exceed observed usage.
    pub over_provisioned: usize,
    /// Entries whose observed usage exceeds requests.
    pub under_provisioned: usize,
    /// Entries that fell inside the waste threshold (or were skipped).
    pub optimal: usize,
    /// Total estimated CPU waste, in millicores.
    pub total_cpu_waste_millicores: u64,
    /// Total estimated memory waste, in bytes.
    pub total_memory_waste_bytes: u64,
    /// 0-100 confidence score set by the data source used.
    pub confidence: u8,
}
512
/// A single right-sizing recommendation for one container of one workload.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LiveRecommendation {
    /// Workload (controller) name, or pod name for unowned pods.
    pub workload_name: String,
    /// Kubernetes kind of the workload (e.g. "Deployment", "Pod").
    pub workload_kind: String,
    pub namespace: String,
    pub container_name: String,
    /// How urgent the change is (driven by waste percentages).
    pub severity: Severity,
    /// Currently configured CPU request, if any, in millicores.
    pub current_cpu_millicores: Option<u64>,
    /// Currently configured memory request, if any, in bytes.
    pub current_memory_bytes: Option<u64>,
    /// Observed CPU usage (snapshot or historical p99), in millicores.
    pub actual_cpu_millicores: u64,
    /// Observed memory usage (snapshot or historical p99), in bytes.
    pub actual_memory_bytes: u64,
    /// Suggested CPU request (usage + safety margin, rounded), in millicores.
    pub recommended_cpu_millicores: u64,
    /// Suggested memory request (usage + safety margin, rounded), in bytes.
    pub recommended_memory_bytes: u64,
    /// Positive => over-provisioned; negative => under-provisioned.
    pub cpu_waste_pct: f32,
    pub memory_waste_pct: f32,
    /// 0-100 confidence score inherited from the data source.
    pub confidence: u8,
    /// Which source this recommendation came from.
    pub data_source: DataSource,
}
542
impl LiveRecommendation {
    /// Render this recommendation as a YAML snippet the user can paste into a
    /// container spec: requests set to the recommendation, with a suggested
    /// CPU limit at 2x the request and a memory limit equal to the request.
    pub fn generate_fix_yaml(&self) -> String {
        let cpu_str = format_cpu_millicores(self.recommended_cpu_millicores);
        let mem_str = format_memory_bytes(self.recommended_memory_bytes);

        format!(
            "# Fix for {}/{} container {}
# Source: {:?} (confidence: {}%)
resources:
  requests:
    cpu: \"{}\"
    memory: \"{}\"
  limits:
    cpu: \"{}\" # Consider 2x request for burst
    memory: \"{}\" # Same as request to prevent OOM",
            self.namespace,
            self.workload_name,
            self.container_name,
            self.data_source,
            self.confidence,
            cpu_str,
            mem_str,
            // CPU limit at 2x the request; memory limit equals the request.
            format_cpu_millicores(self.recommended_cpu_millicores * 2),
            mem_str,
        )
    }
}
571
/// Format a millicore count as a Kubernetes CPU quantity string.
///
/// Whole cores render as plain core counts (e.g. 2000 -> "2"); everything
/// else keeps the explicit millicore suffix (e.g. 1500 -> "1500m").
fn format_cpu_millicores(millicores: u64) -> String {
    // Bug fix: the old code used truncating division for any value >= 1000,
    // so 1500m rendered as "1" — a full 500m silently lost. Only drop the
    // "m" suffix when the value is an exact number of cores.
    if millicores >= 1000 && millicores % 1000 == 0 {
        format!("{}", millicores / 1000)
    } else {
        format!("{}m", millicores)
    }
}
580
/// Format a byte count as a Kubernetes memory quantity string.
///
/// Exact gibibyte multiples render as "NGi"; everything else renders in Mi,
/// rounded up so the formatted value never understates the recommendation.
fn format_memory_bytes(bytes: u64) -> String {
    const GI: u64 = 1024 * 1024 * 1024;
    const MI: u64 = 1024 * 1024;

    // Bug fix: truncating division rendered e.g. 1.5Gi as "1Gi", silently
    // understating the recommendation by half a gibibyte. Use Gi only for
    // exact multiples; otherwise express the value in Mi (rounded up, so
    // sub-Mi remainders don't get dropped either).
    if bytes >= GI && bytes % GI == 0 {
        format!("{}Gi", bytes / GI)
    } else {
        format!("{}Mi", bytes.div_ceil(MI))
    }
}
592
/// True when `namespace` is one of the well-known infrastructure namespaces
/// that are skipped unless `include_system` is set.
// NOTE(review): "default" is treated as a system namespace even though user
// workloads commonly live there — confirm this is intended.
fn is_system_namespace(namespace: &str) -> bool {
    const SYSTEM_NAMESPACES: [&str; 9] = [
        "kube-system",
        "kube-public",
        "kube-node-lease",
        "default",
        "ingress-nginx",
        "cert-manager",
        "monitoring",
        "logging",
        "istio-system",
    ];
    SYSTEM_NAMESPACES.contains(&namespace)
}
612
/// Group per-pod container resources by (namespace, owner) for per-workload
/// analysis.
///
/// Returns tuples of (namespace, owner-or-pod name, containers), where each
/// container entry is (name, cpu_request_millicores, memory_request_bytes).
///
/// NOTE(review): replicas of the same workload each contribute their own
/// container entries, so a 3-replica Deployment yields three duplicate
/// entries per container — downstream Prometheus queries and waste totals
/// are repeated per replica. Confirm whether that multiplication is intended
/// or whether entries should be deduplicated per container name.
fn extract_workloads(
    resources: &[PodResources],
) -> Vec<(String, String, Vec<(String, Option<u64>, Option<u64>)>)> {
    use std::collections::HashMap;

    let mut workloads: HashMap<(String, String), Vec<(String, Option<u64>, Option<u64>)>> =
        HashMap::new();

    for pod in resources {
        // Pods without a controller are keyed by their own name.
        let owner = pod.owner_name.clone().unwrap_or_else(|| pod.name.clone());
        let key = (pod.namespace.clone(), owner);

        let containers: Vec<_> = pod
            .containers
            .iter()
            .map(|c| (c.name.clone(), c.cpu_request, c.memory_request))
            .collect();

        workloads.entry(key).or_default().extend(containers);
    }

    // Flatten the map back into a vector (iteration order is unspecified).
    workloads
        .into_iter()
        .map(|((ns, owner), containers)| (ns, owner, containers))
        .collect()
}
640
/// Round a CPU recommendation up to a human-friendly step: 25m steps up to
/// 100m, 50m steps up to 1 core, 100m steps beyond. Zero stays zero.
fn round_cpu(millicores: u64) -> u64 {
    // Bug fix: the mid/high branches previously rounded to the *nearest*
    // step ((x + 25) / 50, (x + 50) / 100), which could round below the
    // input (101 -> 100, 1001 -> 1000) and recommend less CPU than the
    // measured need plus safety margin. Round up consistently, matching the
    // <=100m branch which already used div_ceil.
    if millicores == 0 {
        0
    } else if millicores <= 100 {
        millicores.div_ceil(25) * 25
    } else if millicores <= 1000 {
        millicores.div_ceil(50) * 50
    } else {
        millicores.div_ceil(100) * 100
    }
}
657
/// Round a memory recommendation up to a human-friendly step: 32Mi steps up
/// to 128Mi, 64Mi steps beyond. Zero stays zero.
fn round_memory(bytes: u64) -> u64 {
    const MI: u64 = 1024 * 1024;
    // Bug fix: nearest-step rounding ((b + 16Mi) / 32Mi, (b + 32Mi) / 64Mi)
    // could round *down* below the input (100Mi -> 96Mi, 129Mi -> 128Mi),
    // recommending less memory than measured usage plus the safety margin —
    // an OOM risk. Round up instead, consistent with round_cpu.
    if bytes <= 128 * MI {
        bytes.div_ceil(32 * MI) * (32 * MI)
    } else {
        bytes.div_ceil(64 * MI) * (64 * MI)
    }
}
667
#[cfg(test)]
mod tests {
    use super::*;

    // System-namespace classification: well-known infrastructure namespaces
    // are flagged; ordinary user namespaces are not.
    #[test]
    fn test_is_system_namespace() {
        assert!(is_system_namespace("kube-system"));
        assert!(is_system_namespace("kube-public"));
        assert!(!is_system_namespace("production"));
        assert!(!is_system_namespace("my-app"));
    }

    // CPU rounding snaps to 25m/50m/100m steps depending on magnitude.
    #[test]
    fn test_round_cpu() {
        assert_eq!(round_cpu(10), 25);
        assert_eq!(round_cpu(90), 100);
        assert_eq!(round_cpu(150), 150);
        assert_eq!(round_cpu(1250), 1300);
    }
}