syncable_cli/analyzer/k8s_optimize/
metrics_client.rs

//! Kubernetes Metrics Client for live cluster resource usage.
//!
//! Connects to a Kubernetes cluster and fetches actual CPU/memory usage
//! from the metrics-server API. This provides the "ground truth" data
//! needed for precise right-sizing recommendations.
//!
//! # Prerequisites
//!
//! - Valid kubeconfig (uses the default context or a specified one)
//! - metrics-server installed in the cluster
//! - RBAC permissions to read pods and metrics
//!
//! # Example
//!
//! ```rust,ignore
//! use syncable_cli::analyzer::k8s_optimize::metrics_client::MetricsClient;
//!
//! let client = MetricsClient::new().await?;
//! let metrics = client.get_pod_metrics(Some("default")).await?;
//!
//! for pod in metrics {
//!     println!(
//!         "{}: CPU={}m, Memory={} bytes",
//!         pod.name, pod.total_cpu_millicores, pod.total_memory_bytes
//!     );
//! }
//! ```
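//!
//! Comparing live usage against the requests declared in pod specs follows the
//! same pattern; a rough sketch using the types defined below (the namespace
//! name is illustrative):
//!
//! ```rust,ignore
//! let client = MetricsClient::new().await?;
//! for cmp in client.compare_usage(Some("default")).await? {
//!     println!(
//!         "{}/{}: CPU waste {:.1}%, memory waste {:.1}%",
//!         cmp.pod_name, cmp.container_name, cmp.cpu_waste_pct, cmp.memory_waste_pct
//!     );
//! }
//! ```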

use k8s_openapi::api::core::v1::{Container, Pod};
use kube::{
    Client, Config,
    api::{Api, ListParams},
};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;

/// Error type for metrics client operations.
#[derive(Debug, thiserror::Error)]
pub enum MetricsError {
    #[error("Failed to create Kubernetes client: {0}")]
    ClientCreation(#[from] kube::Error),

    #[error("Failed to infer Kubernetes config: {0}")]
    ConfigError(#[from] kube::config::InferConfigError),

    #[error("Failed to read kubeconfig: {0}")]
    KubeconfigError(#[from] kube::config::KubeconfigError),

    #[error("Metrics server not available or not installed")]
    MetricsServerUnavailable,

    #[error("Namespace not found: {0}")]
    NamespaceNotFound(String),

    #[error("Failed to parse resource quantity: {0}")]
    QuantityParse(String),

    #[error("API request failed: {0}")]
    ApiError(String),
}

/// Metrics for a single pod.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PodMetrics {
    /// Pod name
    pub name: String,
    /// Namespace
    pub namespace: String,
    /// Container metrics
    pub containers: Vec<ContainerMetrics>,
    /// Total CPU usage in millicores
    pub total_cpu_millicores: u64,
    /// Total memory usage in bytes
    pub total_memory_bytes: u64,
    /// Timestamp of the metrics
    pub timestamp: String,
}

/// Metrics for a single container.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ContainerMetrics {
    /// Container name
    pub name: String,
    /// CPU usage in millicores
    pub cpu_millicores: u64,
    /// Memory usage in bytes
    pub memory_bytes: u64,
}

/// Resource specifications from the pod spec.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PodResources {
    /// Pod name
    pub name: String,
    /// Namespace
    pub namespace: String,
    /// Owner reference kind (Deployment, StatefulSet, etc.)
    pub owner_kind: Option<String>,
    /// Owner name
    pub owner_name: Option<String>,
    /// Container resources
    pub containers: Vec<ContainerResources>,
}

/// Resource specifications for a container.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ContainerResources {
    /// Container name
    pub name: String,
    /// Container image
    pub image: String,
    /// CPU request in millicores
    pub cpu_request: Option<u64>,
    /// Memory request in bytes
    pub memory_request: Option<u64>,
    /// CPU limit in millicores
    pub cpu_limit: Option<u64>,
    /// Memory limit in bytes
    pub memory_limit: Option<u64>,
}

/// Comparison between requested and actual resource usage.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceComparison {
    /// Pod name
    pub pod_name: String,
    /// Namespace
    pub namespace: String,
    /// Container name
    pub container_name: String,
    /// Owner kind (Deployment, StatefulSet, etc.)
    pub owner_kind: Option<String>,
    /// Owner name
    pub owner_name: Option<String>,
    /// CPU request in millicores
    pub cpu_request: Option<u64>,
    /// Actual CPU usage in millicores
    pub cpu_actual: u64,
    /// CPU waste percentage (negative if under-provisioned)
    pub cpu_waste_pct: f32,
    /// Memory request in bytes
    pub memory_request: Option<u64>,
    /// Actual memory usage in bytes
    pub memory_actual: u64,
    /// Memory waste percentage (negative if under-provisioned)
    pub memory_waste_pct: f32,
}

/// Kubernetes metrics client.
pub struct MetricsClient {
    client: Client,
}

impl MetricsClient {
    /// Create a new metrics client using the default kubeconfig.
    pub async fn new() -> Result<Self, MetricsError> {
        let config = Config::infer().await?;
        let client = Client::try_from(config)?;
        Ok(Self { client })
    }

    /// Create a new metrics client with a specific kubeconfig context.
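    ///
    /// A minimal sketch of selecting a context explicitly; the context name
    /// here is illustrative and would normally come from `list_contexts` or
    /// user input:
    ///
    /// ```rust,ignore
    /// let contexts = MetricsClient::list_contexts().await?;
    /// if contexts.iter().any(|c| c == "staging") {
    ///     let client = MetricsClient::with_context("staging").await?;
    /// }
    /// ```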
    pub async fn with_context(context: &str) -> Result<Self, MetricsError> {
        let kubeconfig = kube::config::Kubeconfig::read()?;
        let config = Config::from_custom_kubeconfig(
            kubeconfig,
            &kube::config::KubeConfigOptions {
                context: Some(context.to_string()),
                ..Default::default()
            },
        )
        .await?;
        let client = Client::try_from(config)?;
        Ok(Self { client })
    }

    /// Get the current context name.
    pub async fn current_context() -> Result<String, MetricsError> {
        let kubeconfig = kube::config::Kubeconfig::read()?;
        Ok(kubeconfig
            .current_context
            .unwrap_or_else(|| "default".to_string()))
    }

    /// List available contexts.
    pub async fn list_contexts() -> Result<Vec<String>, MetricsError> {
        let kubeconfig = kube::config::Kubeconfig::read()?;
        Ok(kubeconfig.contexts.into_iter().map(|c| c.name).collect())
    }

    /// Get pod resource specifications from the cluster.
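    ///
    /// Sketch of listing the declared requests for one namespace (namespace
    /// name and output format are illustrative):
    ///
    /// ```rust,ignore
    /// for pod in client.get_pod_resources(Some("default")).await? {
    ///     for c in &pod.containers {
    ///         println!("{}/{}: requests cpu={:?} memory={:?}", pod.name, c.name, c.cpu_request, c.memory_request);
    ///     }
    /// }
    /// ```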
    pub async fn get_pod_resources(
        &self,
        namespace: Option<&str>,
    ) -> Result<Vec<PodResources>, MetricsError> {
        let pods: Api<Pod> = match namespace {
            Some(ns) => Api::namespaced(self.client.clone(), ns),
            None => Api::all(self.client.clone()),
        };

        let pod_list = pods
            .list(&ListParams::default())
            .await
            .map_err(|e| MetricsError::ApiError(format!("Failed to list pods: {}", e)))?;

        let mut results = Vec::new();

        for pod in pod_list.items {
            let metadata = pod.metadata;
            let spec = match pod.spec {
                Some(s) => s,
                None => continue,
            };

            let name = metadata.name.unwrap_or_default();
            let namespace = metadata.namespace.unwrap_or_else(|| "default".to_string());

            // Take the first owner reference (if any) to identify the controlling workload
            let (owner_kind, owner_name) = metadata
                .owner_references
                .and_then(|refs| refs.into_iter().next())
                .map(|owner| (Some(owner.kind), Some(owner.name)))
                .unwrap_or((None, None));

            let containers: Vec<ContainerResources> = spec
                .containers
                .into_iter()
                .map(|c| container_to_resources(&c))
                .collect();

            results.push(PodResources {
                name,
                namespace,
                owner_kind,
                owner_name,
                containers,
            });
        }

        Ok(results)
    }

    /// Get pod metrics from the metrics-server.
    ///
    /// Note: This requires the metrics-server to be installed in the cluster.
    /// The metrics API (`metrics.k8s.io`) is an aggregated API without typed
    /// bindings here, so we issue a raw request.
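    ///
    /// If the metrics API group is not served (e.g. metrics-server is not
    /// installed), this returns `MetricsError::MetricsServerUnavailable`. A
    /// per-container breakdown looks roughly like this sketch (namespace name
    /// is illustrative):
    ///
    /// ```rust,ignore
    /// for pod in client.get_pod_metrics(Some("kube-system")).await? {
    ///     for c in &pod.containers {
    ///         println!("{}/{}: {}m CPU, {} bytes", pod.name, c.name, c.cpu_millicores, c.memory_bytes);
    ///     }
    /// }
    /// ```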
    pub async fn get_pod_metrics(
        &self,
        namespace: Option<&str>,
    ) -> Result<Vec<PodMetrics>, MetricsError> {
        // The metrics API path depends on whether we're querying a specific namespace
        let path = match namespace {
            Some(ns) => format!("/apis/metrics.k8s.io/v1beta1/namespaces/{}/pods", ns),
            None => "/apis/metrics.k8s.io/v1beta1/pods".to_string(),
        };

        // Make raw API request
        let request = http::Request::builder()
            .method("GET")
            .uri(&path)
            .body(Vec::new())
            .map_err(|e| MetricsError::ApiError(format!("Failed to build request: {}", e)))?;

        let response = self
            .client
            .request::<PodMetricsList>(request)
            .await
            .map_err(|e| {
                if e.to_string().contains("404") || e.to_string().contains("not found") {
                    MetricsError::MetricsServerUnavailable
                } else {
                    MetricsError::ApiError(format!("Metrics API error: {}", e))
                }
            })?;

        let results: Vec<PodMetrics> = response
            .items
            .into_iter()
            .map(|pm| {
                let containers: Vec<ContainerMetrics> = pm
                    .containers
                    .into_iter()
                    .map(|c| ContainerMetrics {
                        name: c.name,
                        cpu_millicores: parse_cpu_quantity(&c.usage.cpu),
                        memory_bytes: parse_memory_quantity(&c.usage.memory),
                    })
                    .collect();

                let total_cpu: u64 = containers.iter().map(|c| c.cpu_millicores).sum();
                let total_memory: u64 = containers.iter().map(|c| c.memory_bytes).sum();

                PodMetrics {
                    name: pm.metadata.name,
                    namespace: pm.metadata.namespace,
                    containers,
                    total_cpu_millicores: total_cpu,
                    total_memory_bytes: total_memory,
                    timestamp: pm.timestamp,
                }
            })
            .collect();

        Ok(results)
    }

    /// Compare actual usage against requested resources.
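    ///
    /// A usage sketch that flags containers whose requests look heavily
    /// over-provisioned; the 50% threshold is illustrative, not part of this API:
    ///
    /// ```rust,ignore
    /// let over_provisioned: Vec<_> = client
    ///     .compare_usage(None)
    ///     .await?
    ///     .into_iter()
    ///     .filter(|c| c.cpu_waste_pct > 50.0 || c.memory_waste_pct > 50.0)
    ///     .collect();
    /// ```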
    pub async fn compare_usage(
        &self,
        namespace: Option<&str>,
    ) -> Result<Vec<ResourceComparison>, MetricsError> {
        let resources = self.get_pod_resources(namespace).await?;
        let metrics = self.get_pod_metrics(namespace).await?;

        // Build a map of (namespace, pod, container) -> (cpu millicores, memory bytes)
        let mut metrics_map: HashMap<(String, String, String), (u64, u64)> = HashMap::new();
        for pm in &metrics {
            for cm in &pm.containers {
                metrics_map.insert(
                    (pm.namespace.clone(), pm.name.clone(), cm.name.clone()),
                    (cm.cpu_millicores, cm.memory_bytes),
                );
            }
        }

        let mut comparisons = Vec::new();

        for pod in resources {
            for container in pod.containers {
                let key = (
                    pod.namespace.clone(),
                    pod.name.clone(),
                    container.name.clone(),
                );

                if let Some((cpu_actual, memory_actual)) = metrics_map.get(&key) {
                    let cpu_waste_pct = calculate_waste_pct(container.cpu_request, *cpu_actual);
                    let memory_waste_pct =
                        calculate_waste_pct(container.memory_request, *memory_actual);

                    comparisons.push(ResourceComparison {
                        pod_name: pod.name.clone(),
                        namespace: pod.namespace.clone(),
                        container_name: container.name,
                        owner_kind: pod.owner_kind.clone(),
                        owner_name: pod.owner_name.clone(),
                        cpu_request: container.cpu_request,
                        cpu_actual: *cpu_actual,
                        cpu_waste_pct,
                        memory_request: container.memory_request,
                        memory_actual: *memory_actual,
                        memory_waste_pct,
                    });
                }
            }
        }

        Ok(comparisons)
    }

    /// Check if metrics-server is available.
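    ///
    /// Intended as a cheap preflight check before calling the metrics
    /// endpoints, e.g.:
    ///
    /// ```rust,ignore
    /// if !client.is_metrics_available().await {
    ///     eprintln!("metrics-server not reachable; skipping live usage analysis");
    /// }
    /// ```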
    pub async fn is_metrics_available(&self) -> bool {
        let request = http::Request::builder()
            .method("GET")
            .uri("/apis/metrics.k8s.io/v1beta1")
            .body(Vec::new());

        match request {
            Ok(req) => self.client.request::<serde_json::Value>(req).await.is_ok(),
            Err(_) => false,
        }
    }
}

// ============================================================================
// Internal types for metrics API responses
// ============================================================================

#[derive(Debug, Deserialize)]
struct PodMetricsList {
    items: Vec<PodMetricsItem>,
}

#[derive(Debug, Deserialize)]
struct PodMetricsItem {
    metadata: PodMetricsMetadata,
    timestamp: String,
    containers: Vec<ContainerMetricsItem>,
}

#[derive(Debug, Deserialize)]
struct PodMetricsMetadata {
    name: String,
    namespace: String,
}

#[derive(Debug, Deserialize)]
struct ContainerMetricsItem {
    name: String,
    usage: ResourceUsage,
}

#[derive(Debug, Deserialize)]
struct ResourceUsage {
    cpu: String,
    memory: String,
}

// ============================================================================
// Helper functions
// ============================================================================

/// Convert a K8s container spec to our resource struct.
fn container_to_resources(container: &Container) -> ContainerResources {
    let resources = container.resources.as_ref();

    let cpu_request = resources
        .and_then(|r| r.requests.as_ref())
        .and_then(|req| req.get("cpu"))
        .map(|q| parse_cpu_quantity(&q.0));

    let memory_request = resources
        .and_then(|r| r.requests.as_ref())
        .and_then(|req| req.get("memory"))
        .map(|q| parse_memory_quantity(&q.0));

    let cpu_limit = resources
        .and_then(|r| r.limits.as_ref())
        .and_then(|lim| lim.get("cpu"))
        .map(|q| parse_cpu_quantity(&q.0));

    let memory_limit = resources
        .and_then(|r| r.limits.as_ref())
        .and_then(|lim| lim.get("memory"))
        .map(|q| parse_memory_quantity(&q.0));

    ContainerResources {
        name: container.name.clone(),
        image: container.image.clone().unwrap_or_default(),
        cpu_request,
        memory_request,
        cpu_limit,
        memory_limit,
    }
}

/// Parse a CPU quantity string (e.g., "100m", "1", "500n") to millicores.
fn parse_cpu_quantity(quantity: &str) -> u64 {
    let quantity = quantity.trim();

    if let Some(val) = quantity.strip_suffix('n') {
        // Nanocores to millicores
        val.parse::<u64>().map(|n| n / 1_000_000).unwrap_or(0)
    } else if let Some(val) = quantity.strip_suffix('u') {
        // Microcores to millicores
        val.parse::<u64>().map(|u| u / 1_000).unwrap_or(0)
    } else if let Some(val) = quantity.strip_suffix('m') {
        // Already in millicores
        val.parse::<u64>().unwrap_or(0)
    } else {
        // Whole cores to millicores
        quantity
            .parse::<f64>()
            .map(|c| (c * 1000.0) as u64)
            .unwrap_or(0)
    }
}

/// Parse a memory quantity string (e.g., "128Mi", "1Gi", "256000Ki") to bytes.
fn parse_memory_quantity(quantity: &str) -> u64 {
    let quantity = quantity.trim();

    if let Some(val) = quantity.strip_suffix("Ki") {
        val.parse::<u64>().map(|k| k * 1024).unwrap_or(0)
    } else if let Some(val) = quantity.strip_suffix("Mi") {
        val.parse::<u64>().map(|m| m * 1024 * 1024).unwrap_or(0)
    } else if let Some(val) = quantity.strip_suffix("Gi") {
        val.parse::<u64>()
            .map(|g| g * 1024 * 1024 * 1024)
            .unwrap_or(0)
    } else if let Some(val) = quantity.strip_suffix("Ti") {
        val.parse::<u64>()
            .map(|t| t * 1024 * 1024 * 1024 * 1024)
            .unwrap_or(0)
    } else if let Some(val) = quantity.strip_suffix('K').or_else(|| quantity.strip_suffix('k')) {
        val.parse::<u64>().map(|k| k * 1000).unwrap_or(0)
    } else if let Some(val) = quantity.strip_suffix('M') {
        val.parse::<u64>().map(|m| m * 1_000_000).unwrap_or(0)
    } else if let Some(val) = quantity.strip_suffix('G') {
        val.parse::<u64>().map(|g| g * 1_000_000_000).unwrap_or(0)
    } else {
        // Plain bytes
        quantity.parse::<u64>().unwrap_or(0)
    }
}

/// Calculate the waste percentage relative to the request.
/// Positive means over-provisioned; negative means under-provisioned.
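/// For example, a request of 1000m with 250m actually used gives
/// `(1000 - 250) / 1000 * 100 = 75.0` (75% wasted), while a request of 500m
/// with 1000m used gives `-100.0` (under-provisioned).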
fn calculate_waste_pct(request: Option<u64>, actual: u64) -> f32 {
    match request {
        Some(req) if req > 0 => {
            let waste = req as f32 - actual as f32;
            (waste / req as f32) * 100.0
        }
        _ => 0.0, // No request defined, can't calculate waste
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_parse_cpu_quantity() {
        assert_eq!(parse_cpu_quantity("100m"), 100);
        assert_eq!(parse_cpu_quantity("1"), 1000);
        assert_eq!(parse_cpu_quantity("0.5"), 500);
        assert_eq!(parse_cpu_quantity("2.5"), 2500);
        assert_eq!(parse_cpu_quantity("500000000n"), 500);
        assert_eq!(parse_cpu_quantity("500000u"), 500);
    }

    #[test]
    fn test_parse_memory_quantity() {
        assert_eq!(parse_memory_quantity("128Mi"), 128 * 1024 * 1024);
        assert_eq!(parse_memory_quantity("1Gi"), 1024 * 1024 * 1024);
        assert_eq!(parse_memory_quantity("256Ki"), 256 * 1024);
        assert_eq!(parse_memory_quantity("500M"), 500_000_000);
        assert_eq!(parse_memory_quantity("1G"), 1_000_000_000);
        assert_eq!(parse_memory_quantity("1000000"), 1_000_000);
    }

    #[test]
    fn test_calculate_waste_pct() {
        // 50% over-provisioned
        assert!((calculate_waste_pct(Some(1000), 500) - 50.0).abs() < 0.1);
        // 100% over-provisioned (no usage)
        assert!((calculate_waste_pct(Some(1000), 0) - 100.0).abs() < 0.1);
        // Under-provisioned (using more than requested)
        assert!((calculate_waste_pct(Some(500), 1000) - (-100.0)).abs() < 0.1);
        // No request defined
        assert!((calculate_waste_pct(None, 500) - 0.0).abs() < 0.1);
    }
}