syncable_cli/analyzer/k8s_optimize/
prometheus_client.rs

//! Prometheus client for historical Kubernetes metrics.
//!
//! Fetches historical CPU/memory usage data from Prometheus and computes
//! percentile values (P50, P95, P99, max) for accurate right-sizing.
//!
//! # Prerequisites
//!
//! - Prometheus reachable (via port-forward, ingress, or direct URL)
//! - Prometheus scraping cAdvisor container metrics (`container_cpu_usage_seconds_total`
//!   and `container_memory_working_set_bytes`), which are the series this client queries
//!
//! # Authentication
//!
//! Authentication is **optional** and typically not needed when using `kubectl port-forward`,
//! because the connection goes directly to the pod, bypassing ingress/auth layers.
//! Auth is only needed for externally exposed Prometheus instances.
//!
//! # Example
//!
//! ```rust,ignore
//! use syncable_cli::analyzer::k8s_optimize::prometheus_client::{PrometheusClient, PrometheusAuth};
//!
//! // Default: no authentication (works with port-forward)
//! let client = PrometheusClient::new("http://localhost:9090")?;
//!
//! // With authentication (for external Prometheus)
//! let client = PrometheusClient::with_auth(
//!     "https://prometheus.example.com",
//!     PrometheusAuth::Bearer("token123".to_string())
//! )?;
//!
//! let history = client.get_container_history("default", "api-gateway", "main", "7d").await?;
//! println!("CPU P99: {}m", history.cpu_p99);
//! ```
34
35use reqwest::{Client, RequestBuilder};
36use serde::{Deserialize, Serialize};
37use std::collections::HashMap;
38
39/// Error type for Prometheus client operations.
40#[derive(Debug, thiserror::Error)]
41pub enum PrometheusError {
42    #[error("Failed to connect to Prometheus: {0}")]
43    ConnectionFailed(String),
44
45    #[error("HTTP request failed: {0}")]
46    HttpError(#[from] reqwest::Error),
47
48    #[error("Invalid Prometheus URL: {0}")]
49    InvalidUrl(String),
50
51    #[error("Query failed: {0}")]
52    QueryFailed(String),
53
54    #[error("No data available for the specified time range")]
55    NoData,
56
57    #[error("Failed to parse response: {0}")]
58    ParseError(String),
59
60    #[error("Authentication failed: {0}")]
61    AuthError(String),
62}
63
64/// Authentication method for Prometheus (optional).
65///
66/// Authentication is typically NOT needed when using `kubectl port-forward`
67/// because the connection goes directly to the pod, bypassing ingress/auth layers.
68/// Auth is only needed for externally exposed Prometheus instances.
69#[derive(Debug, Clone, Default, Serialize, Deserialize)]
70pub enum PrometheusAuth {
71    /// No authentication (default - works for port-forward)
72    #[default]
73    None,
74    /// Basic auth (for externally exposed Prometheus)
75    Basic { username: String, password: String },
76    /// Bearer token (for externally exposed Prometheus with OAuth/OIDC)
77    Bearer(String),
78}
79
80/// Historical resource usage data for a container.
81#[derive(Debug, Clone, Serialize, Deserialize)]
82pub struct ContainerHistory {
83    /// Pod name
84    pub pod_name: String,
85    /// Container name
86    pub container_name: String,
87    /// Namespace
88    pub namespace: String,
89    /// Time range queried (e.g., "7d", "30d")
90    pub time_range: String,
91    /// Number of data points
92    pub sample_count: usize,
93    /// CPU usage percentiles (in millicores)
94    pub cpu_min: u64,
95    pub cpu_p50: u64,
96    pub cpu_p95: u64,
97    pub cpu_p99: u64,
98    pub cpu_max: u64,
99    pub cpu_avg: u64,
100    /// Memory usage percentiles (in bytes)
101    pub memory_min: u64,
102    pub memory_p50: u64,
103    pub memory_p95: u64,
104    pub memory_p99: u64,
105    pub memory_max: u64,
106    pub memory_avg: u64,
107}
108
109/// Aggregated history for a workload (Deployment/StatefulSet/etc).
110#[derive(Debug, Clone, Serialize, Deserialize)]
111pub struct WorkloadHistory {
112    /// Workload name
113    pub workload_name: String,
114    /// Workload kind (Deployment, StatefulSet, etc.)
115    pub workload_kind: String,
116    /// Namespace
117    pub namespace: String,
118    /// Container histories
119    pub containers: Vec<ContainerHistory>,
120    /// Time range queried
121    pub time_range: String,
122}
123
124/// Right-sizing recommendation based on historical data.
125#[derive(Debug, Clone, Serialize, Deserialize)]
126pub struct HistoricalRecommendation {
127    /// Workload name
128    pub workload_name: String,
129    /// Container name
130    pub container_name: String,
131    /// Current CPU request (millicores)
132    pub current_cpu_request: Option<u64>,
133    /// Recommended CPU request (millicores)
134    pub recommended_cpu_request: u64,
135    /// CPU savings percentage (negative if under-provisioned)
136    pub cpu_savings_pct: f32,
137    /// Current memory request (bytes)
138    pub current_memory_request: Option<u64>,
139    /// Recommended memory request (bytes)
140    pub recommended_memory_request: u64,
141    /// Memory savings percentage (negative if under-provisioned)
142    pub memory_savings_pct: f32,
143    /// Confidence level (0-100, based on sample count)
144    pub confidence: u8,
145    /// Safety margin applied
146    pub safety_margin_pct: u8,
147}
148
149/// Prometheus client for querying historical metrics.
150pub struct PrometheusClient {
151    base_url: String,
152    http_client: Client,
153    auth: PrometheusAuth,
154}
155
156impl PrometheusClient {
157    /// Create a new Prometheus client without authentication.
158    ///
159    /// This is the default and works for `kubectl port-forward` connections
160    /// where no authentication is needed.
161    pub fn new(url: &str) -> Result<Self, PrometheusError> {
162        Self::with_auth(url, PrometheusAuth::None)
163    }
164
165    /// Create a new Prometheus client with optional authentication.
166    ///
167    /// Use this for externally exposed Prometheus instances that require auth.
168    pub fn with_auth(url: &str, auth: PrometheusAuth) -> Result<Self, PrometheusError> {
169        let base_url = url.trim_end_matches('/').to_string();
170
171        // Validate URL format
172        if !base_url.starts_with("http://") && !base_url.starts_with("https://") {
173            return Err(PrometheusError::InvalidUrl(
174                "URL must start with http:// or https://".to_string(),
175            ));
176        }
177
178        let http_client = Client::builder()
179            .timeout(std::time::Duration::from_secs(30))
180            .build()?;
181
182        Ok(Self {
183            base_url,
184            http_client,
185            auth,
186        })
187    }
188
189    /// Add authentication headers to a request (if configured).
190    fn add_auth(&self, req: RequestBuilder) -> RequestBuilder {
191        match &self.auth {
192            PrometheusAuth::None => req,
193            PrometheusAuth::Basic { username, password } => {
194                req.basic_auth(username, Some(password))
195            }
196            PrometheusAuth::Bearer(token) => req.bearer_auth(token),
197        }
198    }
199
200    /// Check if Prometheus is reachable.
201    pub async fn is_available(&self) -> bool {
202        // Use the health endpoint which is faster and simpler
203        let url = format!("{}/-/healthy", self.base_url);
204        let req = self
205            .http_client
206            .get(&url)
207            .timeout(std::time::Duration::from_secs(5));
208        match self.add_auth(req).send().await {
209            Ok(response) => response.status().is_success(),
210            Err(_) => false,
211        }
212    }
213
214    /// Get container CPU/memory history.
215    pub async fn get_container_history(
216        &self,
217        namespace: &str,
218        pod_pattern: &str,
219        container: &str,
220        time_range: &str,
221    ) -> Result<ContainerHistory, PrometheusError> {
222        let duration = parse_duration(time_range)?;
223
224        // Query CPU usage (rate of CPU seconds over time, converted to millicores)
225        let cpu_query = format!(
226            r#"rate(container_cpu_usage_seconds_total{{namespace="{}", pod=~"{}.*", container="{}"}}[5m]) * 1000"#,
227            namespace, pod_pattern, container
228        );
229
230        // Query memory usage
231        let memory_query = format!(
232            r#"container_memory_working_set_bytes{{namespace="{}", pod=~"{}.*", container="{}"}}"#,
233            namespace, pod_pattern, container
234        );
235
236        let cpu_values = self.query_range(&cpu_query, &duration).await?;
237        let memory_values = self.query_range(&memory_query, &duration).await?;
238
239        if cpu_values.is_empty() && memory_values.is_empty() {
240            return Err(PrometheusError::NoData);
241        }
242
243        Ok(ContainerHistory {
244            pod_name: pod_pattern.to_string(),
245            container_name: container.to_string(),
246            namespace: namespace.to_string(),
247            time_range: time_range.to_string(),
248            sample_count: cpu_values.len().max(memory_values.len()),
249            cpu_min: percentile(&cpu_values, 0.0) as u64,
250            cpu_p50: percentile(&cpu_values, 0.50) as u64,
251            cpu_p95: percentile(&cpu_values, 0.95) as u64,
252            cpu_p99: percentile(&cpu_values, 0.99) as u64,
253            cpu_max: percentile(&cpu_values, 1.0) as u64,
254            cpu_avg: average(&cpu_values) as u64,
255            memory_min: percentile(&memory_values, 0.0) as u64,
256            memory_p50: percentile(&memory_values, 0.50) as u64,
257            memory_p95: percentile(&memory_values, 0.95) as u64,
258            memory_p99: percentile(&memory_values, 0.99) as u64,
259            memory_max: percentile(&memory_values, 1.0) as u64,
260            memory_avg: average(&memory_values) as u64,
261        })
262    }
263
264    /// Get history for all containers in a workload.
265    pub async fn get_workload_history(
266        &self,
267        namespace: &str,
268        workload_name: &str,
269        workload_kind: &str,
270        time_range: &str,
271    ) -> Result<WorkloadHistory, PrometheusError> {
272        // First, discover containers in this workload
273        let containers = self.discover_containers(namespace, workload_name).await?;
274
275        let mut container_histories = Vec::new();
276
277        for container_name in containers {
278            match self
279                .get_container_history(namespace, workload_name, &container_name, time_range)
280                .await
281            {
282                Ok(history) => container_histories.push(history),
283                Err(PrometheusError::NoData) => continue, // Skip containers with no data
284                Err(e) => return Err(e),
285            }
286        }
287
288        Ok(WorkloadHistory {
289            workload_name: workload_name.to_string(),
290            workload_kind: workload_kind.to_string(),
291            namespace: namespace.to_string(),
292            containers: container_histories,
293            time_range: time_range.to_string(),
294        })
295    }
296
297    /// Generate right-sizing recommendations based on historical data.
298    pub fn generate_recommendation(
299        history: &ContainerHistory,
300        current_cpu_request: Option<u64>,
301        current_memory_request: Option<u64>,
302        safety_margin_pct: u8,
303    ) -> HistoricalRecommendation {
304        let margin_multiplier = 1.0 + (safety_margin_pct as f64 / 100.0);
305
306        // Use P99 + safety margin for recommendations
307        let recommended_cpu = (history.cpu_p99 as f64 * margin_multiplier).ceil() as u64;
308        let recommended_memory = (history.memory_p99 as f64 * margin_multiplier).ceil() as u64;
309
310        // Round CPU to nice values (nearest 25m for small, 100m for larger)
311        let recommended_cpu = round_cpu(recommended_cpu);
312        // Round memory to nice values (nearest 64Mi)
313        let recommended_memory = round_memory(recommended_memory);
314
315        let cpu_savings_pct = current_cpu_request
316            .map(|curr| ((curr as f32 - recommended_cpu as f32) / curr as f32) * 100.0)
317            .unwrap_or(0.0);
318
319        let memory_savings_pct = current_memory_request
320            .map(|curr| ((curr as f32 - recommended_memory as f32) / curr as f32) * 100.0)
321            .unwrap_or(0.0);
322
323        // Confidence based on sample count
324        let confidence = match history.sample_count {
325            0..=10 => 20,
326            11..=50 => 40,
327            51..=100 => 60,
328            101..=500 => 80,
329            _ => 95,
330        };
331
332        HistoricalRecommendation {
333            workload_name: history.pod_name.clone(),
334            container_name: history.container_name.clone(),
335            current_cpu_request,
336            recommended_cpu_request: recommended_cpu,
337            cpu_savings_pct,
338            current_memory_request,
339            recommended_memory_request: recommended_memory,
340            memory_savings_pct,
341            confidence,
342            safety_margin_pct,
343        }
344    }
345
346    /// Query Prometheus for a range of values.
347    async fn query_range(&self, query: &str, duration: &str) -> Result<Vec<f64>, PrometheusError> {
348        // Prometheus API requires Unix timestamps, not relative strings like "now-7d"
349        let now = std::time::SystemTime::now()
350            .duration_since(std::time::UNIX_EPOCH)
351            .unwrap()
352            .as_secs();
353        let duration_secs = parse_duration_to_seconds(duration)?;
354        let start = now - duration_secs;
355
356        // Use 1h step for 7d+ queries to avoid too many data points
357        let step = if duration_secs > 86400 * 3 {
358            "1h"
359        } else {
360            "5m"
361        };
362
363        let url = format!(
364            "{}/api/v1/query_range?query={}&start={}&end={}&step={}",
365            self.base_url,
366            urlencoding::encode(query),
367            start,
368            now,
369            step
370        );
371
372        let req = self.http_client.get(&url);
373        let response = self.add_auth(req).send().await?;
374
375        if !response.status().is_success() {
376            return Err(PrometheusError::QueryFailed(format!(
377                "HTTP {}: {}",
378                response.status(),
379                response.text().await.unwrap_or_default()
380            )));
381        }
382
383        let body: PrometheusResponse = response
384            .json()
385            .await
386            .map_err(|e| PrometheusError::ParseError(format!("Failed to parse response: {}", e)))?;
387
388        if body.status != "success" {
389            return Err(PrometheusError::QueryFailed(
390                body.error.unwrap_or_else(|| "Unknown error".to_string()),
391            ));
392        }
393
394        // Extract all values from the result
395        let mut values = Vec::new();
396        if let Some(result) = body.data.result {
397            for series in result {
398                for (_, value) in series.values.unwrap_or_default() {
399                    if let Ok(v) = value.parse::<f64>()
400                        && !v.is_nan()
401                        && v.is_finite()
402                    {
403                        values.push(v);
404                    }
405                }
406            }
407        }
408
409        Ok(values)
410    }
411
412    /// Discover containers in a workload.
413    async fn discover_containers(
414        &self,
415        namespace: &str,
416        workload_pattern: &str,
417    ) -> Result<Vec<String>, PrometheusError> {
418        let query = format!(
419            r#"count by (container) (container_cpu_usage_seconds_total{{namespace="{}", pod=~"{}.*", container!="POD", container!=""}})"#,
420            namespace, workload_pattern
421        );
422
423        let url = format!(
424            "{}/api/v1/query?query={}",
425            self.base_url,
426            urlencoding::encode(&query)
427        );
428
429        let req = self.http_client.get(&url);
430        let response = self.add_auth(req).send().await?;
431
432        if !response.status().is_success() {
433            return Err(PrometheusError::QueryFailed(format!(
434                "HTTP {}",
435                response.status()
436            )));
437        }
438
439        let body: PrometheusResponse = response
440            .json()
441            .await
442            .map_err(|e| PrometheusError::ParseError(format!("Failed to parse response: {}", e)))?;
443
444        let mut containers = Vec::new();
445        if let Some(result) = body.data.result {
446            for series in result {
447                if let Some(container) = series.metric.get("container") {
448                    containers.push(container.clone());
449                }
450            }
451        }
452
453        Ok(containers)
454    }
455}
456
457// ============================================================================
458// Prometheus API response types
459// ============================================================================
460
461#[derive(Debug, Deserialize)]
462struct PrometheusResponse {
463    status: String,
464    error: Option<String>,
465    data: PrometheusData,
466}
467
468#[derive(Debug, Deserialize)]
469struct PrometheusData {
470    #[serde(rename = "resultType")]
471    #[allow(dead_code)]
472    result_type: Option<String>,
473    result: Option<Vec<PrometheusResult>>,
474}
475
476#[derive(Debug, Deserialize)]
477struct PrometheusResult {
478    metric: HashMap<String, String>,
479    #[allow(dead_code)]
480    value: Option<(f64, String)>, // For instant queries
481    values: Option<Vec<(f64, String)>>, // For range queries
482}
483
484// ============================================================================
485// Helper functions
486// ============================================================================
487
488/// Parse a duration string (e.g., "7d", "24h", "30m") to Prometheus format.
489fn parse_duration(duration: &str) -> Result<String, PrometheusError> {
490    let duration = duration.trim().to_lowercase();
491
492    // Check for human-readable formats first (before single-char suffixes)
493    if duration.ends_with("days") {
494        let num: u32 = duration
495            .trim_end_matches("days")
496            .trim()
497            .parse()
498            .map_err(|_| PrometheusError::ParseError("Invalid duration number".to_string()))?;
499        Ok(format!("{}d", num))
500    } else if duration.ends_with("day") {
501        let num: u32 = duration
502            .trim_end_matches("day")
503            .trim()
504            .parse()
505            .map_err(|_| PrometheusError::ParseError("Invalid duration number".to_string()))?;
506        Ok(format!("{}d", num))
507    } else if duration.ends_with("weeks") {
508        let num: u32 = duration
509            .trim_end_matches("weeks")
510            .trim()
511            .parse()
512            .map_err(|_| PrometheusError::ParseError("Invalid duration number".to_string()))?;
513        Ok(format!("{}d", num * 7))
514    } else if duration.ends_with("week") {
515        let num: u32 = duration
516            .trim_end_matches("week")
517            .trim()
518            .parse()
519            .map_err(|_| PrometheusError::ParseError("Invalid duration number".to_string()))?;
520        Ok(format!("{}d", num * 7))
521    } else if duration.ends_with('d')
522        || duration.ends_with('h')
523        || duration.ends_with('m')
524        || duration.ends_with('s')
525    {
526        // Prometheus already understands these formats
527        Ok(duration)
528    } else {
529        // Default to treating as days
530        let num: u32 = duration
531            .parse()
532            .map_err(|_| PrometheusError::ParseError(format!("Invalid duration: {}", duration)))?;
533        Ok(format!("{}d", num))
534    }
535}
536
537/// Parse a duration string (e.g., "7d", "24h", "30m") to seconds.
538fn parse_duration_to_seconds(duration: &str) -> Result<u64, PrometheusError> {
539    let duration = duration.trim().to_lowercase();
540
541    // Extract the numeric part and unit
542    let (num_str, unit) = if duration.ends_with("days") {
543        (duration.trim_end_matches("days").trim(), "d")
544    } else if duration.ends_with("day") {
545        (duration.trim_end_matches("day").trim(), "d")
546    } else if duration.ends_with("weeks") {
547        (duration.trim_end_matches("weeks").trim(), "w")
548    } else if duration.ends_with("week") {
549        (duration.trim_end_matches("week").trim(), "w")
550    } else if duration.ends_with('d') {
551        (duration.trim_end_matches('d'), "d")
552    } else if duration.ends_with('h') {
553        (duration.trim_end_matches('h'), "h")
554    } else if duration.ends_with('m') {
555        (duration.trim_end_matches('m'), "m")
556    } else if duration.ends_with('s') {
557        (duration.trim_end_matches('s'), "s")
558    } else {
559        // Default to days
560        (duration.as_str(), "d")
561    };
562
563    let num: u64 = num_str.parse().map_err(|_| {
564        PrometheusError::ParseError(format!("Invalid duration number: {}", duration))
565    })?;
566
567    let seconds = match unit {
568        "w" => num * 7 * 24 * 60 * 60,
569        "d" => num * 24 * 60 * 60,
570        "h" => num * 60 * 60,
571        "m" => num * 60,
572        "s" => num,
573        _ => num * 24 * 60 * 60, // Default to days
574    };
575
576    Ok(seconds)
577}
578
/// Percentile of `values` using the nearest index; the input need not be sorted.
///
/// `p` is a fraction in `[0.0, 1.0]`. Values at or below 0 return the minimum and
/// values at or above 1 return the maximum. In between, the element at index
/// `round(p * (len - 1))` of the sorted data is returned (no interpolation).
/// Returns `0.0` for an empty slice.
fn percentile(values: &[f64], p: f64) -> f64 {
    if values.is_empty() {
        return 0.0;
    }

    let mut sorted = values.to_vec();
    // total_cmp is a total order, so the sort stays well-defined even if a NaN
    // slips through (partial_cmp would need an arbitrary fallback).
    sorted.sort_unstable_by(f64::total_cmp);

    let last = sorted.len() - 1;
    let index = if p <= 0.0 {
        0
    } else if p >= 1.0 {
        last
    } else {
        (p * last as f64).round() as usize
    };
    sorted[index]
}
598
/// Arithmetic mean of `values`, or `0.0` when the slice is empty.
fn average(values: &[f64]) -> f64 {
    match values.len() {
        0 => 0.0,
        count => values.iter().sum::<f64>() / count as f64,
    }
}
606
/// Round a CPU amount in millicores to a tidy value.
///
/// - `0` stays `0`.
/// - Up to 100m: round *up* to a multiple of 25m, so small requests are never cut.
/// - Up to 1000m: round to the nearest 50m.
/// - Above that: round to the nearest 100m.
fn round_cpu(millicores: u64) -> u64 {
    match millicores {
        0 => 0,
        1..=100 => millicores.div_ceil(25) * 25,
        101..=1000 => (millicores + 25) / 50 * 50,
        _ => (millicores + 50) / 100 * 100,
    }
}
623
/// Round a memory amount in bytes to a tidy value.
///
/// Values up to 128Mi are rounded to the nearest 32Mi, and larger values to the
/// nearest 64Mi. A non-zero input never rounds down to zero: it is clamped to at
/// least one increment (32Mi), so a tiny workload does not get a 0-byte request.
/// `0` stays `0`.
fn round_memory(bytes: u64) -> u64 {
    const MI: u64 = 1024 * 1024;
    const SMALL_INCREMENT: u64 = 32 * MI;
    const LARGE_INCREMENT: u64 = 64 * MI;

    if bytes == 0 {
        return 0;
    }

    let increment = if bytes <= 128 * MI {
        SMALL_INCREMENT
    } else {
        LARGE_INCREMENT
    };
    // Round to nearest; saturating_add avoids overflow for absurdly large inputs.
    let rounded = bytes.saturating_add(increment / 2) / increment * increment;
    rounded.max(increment)
}
638
#[cfg(test)]
mod tests {
    use super::*;

    const MI: u64 = 1024 * 1024;

    #[test]
    fn test_parse_duration() {
        assert_eq!(parse_duration("7d").unwrap(), "7d");
        assert_eq!(parse_duration("24h").unwrap(), "24h");
        assert_eq!(parse_duration("30m").unwrap(), "30m");
        assert_eq!(parse_duration("1week").unwrap(), "7d");
        assert_eq!(parse_duration("2weeks").unwrap(), "14d");
    }

    #[test]
    fn test_parse_duration_word_and_bare_forms() {
        assert_eq!(parse_duration("3 days").unwrap(), "3d");
        assert_eq!(parse_duration("1day").unwrap(), "1d");
        assert_eq!(parse_duration("7").unwrap(), "7d");
        assert_eq!(parse_duration("7D").unwrap(), "7d");
        assert!(parse_duration("abc").is_err());
    }

    #[test]
    fn test_percentile() {
        let values = vec![10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0];
        assert_eq!(percentile(&values, 0.0), 10.0);
        // Index round(0.5 * 9) = 5 of the sorted data.
        assert_eq!(percentile(&values, 0.5), 60.0);
        assert_eq!(percentile(&values, 1.0), 100.0);
    }

    #[test]
    fn test_percentile_unsorted_and_empty() {
        assert_eq!(percentile(&[3.0, 1.0, 2.0], 0.5), 2.0);
        assert_eq!(percentile(&[], 0.99), 0.0);
    }

    #[test]
    fn test_average() {
        assert_eq!(average(&[]), 0.0);
        assert_eq!(average(&[1.0, 2.0, 3.0]), 2.0);
    }

    #[test]
    fn test_round_cpu() {
        assert_eq!(round_cpu(0), 0);
        assert_eq!(round_cpu(12), 25);
        assert_eq!(round_cpu(23), 25);
        assert_eq!(round_cpu(37), 50);
        assert_eq!(round_cpu(120), 100);
        assert_eq!(round_cpu(175), 200);
        assert_eq!(round_cpu(1234), 1200);
    }

    #[test]
    fn test_round_memory() {
        assert_eq!(round_memory(0), 0);
        assert_eq!(round_memory(50 * MI), 64 * MI);
        assert_eq!(round_memory(100 * MI), 96 * MI);
        assert_eq!(round_memory(200 * MI), 192 * MI);
        assert_eq!(round_memory(500 * MI), 512 * MI);
    }

    #[test]
    fn test_parse_duration_to_seconds() {
        // Days
        assert_eq!(parse_duration_to_seconds("7d").unwrap(), 7 * 24 * 60 * 60);
        assert_eq!(parse_duration_to_seconds("1d").unwrap(), 24 * 60 * 60);
        // Hours
        assert_eq!(parse_duration_to_seconds("24h").unwrap(), 24 * 60 * 60);
        assert_eq!(parse_duration_to_seconds("1h").unwrap(), 60 * 60);
        // Minutes
        assert_eq!(parse_duration_to_seconds("30m").unwrap(), 30 * 60);
        // Weeks
        assert_eq!(
            parse_duration_to_seconds("1week").unwrap(),
            7 * 24 * 60 * 60
        );
        assert_eq!(
            parse_duration_to_seconds("2weeks").unwrap(),
            14 * 24 * 60 * 60
        );
    }

    #[test]
    fn test_parse_duration_to_seconds_other_forms() {
        assert_eq!(parse_duration_to_seconds("90s").unwrap(), 90);
        // A bare number means days.
        assert_eq!(parse_duration_to_seconds("3").unwrap(), 3 * 24 * 60 * 60);
        assert_eq!(parse_duration_to_seconds("2 days").unwrap(), 2 * 24 * 60 * 60);
        assert!(parse_duration_to_seconds("soon").is_err());
    }

    fn sample_history() -> ContainerHistory {
        ContainerHistory {
            pod_name: "api".to_string(),
            container_name: "main".to_string(),
            namespace: "default".to_string(),
            time_range: "7d".to_string(),
            sample_count: 200,
            cpu_min: 10,
            cpu_p50: 50,
            cpu_p95: 90,
            cpu_p99: 100,
            cpu_max: 120,
            cpu_avg: 55,
            memory_min: 50 * MI,
            memory_p50: 80 * MI,
            memory_p95: 95 * MI,
            memory_p99: 100 * MI,
            memory_max: 110 * MI,
            memory_avg: 82 * MI,
        }
    }

    #[test]
    fn test_generate_recommendation() {
        let history = sample_history();

        let rec =
            PrometheusClient::generate_recommendation(&history, Some(200), Some(256 * MI), 20);
        assert_eq!(rec.workload_name, "api");
        assert_eq!(rec.container_name, "main");
        // 100m * 1.2 = 120m, which round_cpu rounds to the nearest 50m.
        assert_eq!(rec.recommended_cpu_request, 100);
        // 100Mi * 1.2 = 120Mi, rounded to the nearest 32Mi.
        assert_eq!(rec.recommended_memory_request, 128 * MI);
        assert!((rec.cpu_savings_pct - 50.0).abs() < 1e-3);
        assert!((rec.memory_savings_pct - 50.0).abs() < 1e-3);
        assert_eq!(rec.confidence, 80);
        assert_eq!(rec.safety_margin_pct, 20);

        let rec = PrometheusClient::generate_recommendation(&history, None, None, 20);
        assert_eq!(rec.cpu_savings_pct, 0.0);
        assert_eq!(rec.memory_savings_pct, 0.0);
    }
}