syncable_cli/analyzer/k8s_optimize/
prometheus_client.rs

1//! Prometheus Client for historical Kubernetes metrics.
2//!
3//! Fetches historical CPU/memory usage data from Prometheus to calculate
4//! percentile values (P50, P95, P99, max) for accurate right-sizing.
5//!
6//! # Prerequisites
7//!
8//! - Prometheus accessible (via port-forward, ingress, or direct URL)
//! - Prometheus scraping the cAdvisor container metrics `container_cpu_usage_seconds_total` and `container_memory_working_set_bytes` (typically exposed through the kubelet)
10//!
11//! # Authentication
12//!
13//! Authentication is **optional** and typically not needed when using `kubectl port-forward`
14//! because the connection goes directly to the pod, bypassing ingress/auth layers.
15//! Auth is only needed for externally exposed Prometheus instances.
16//!
17//! # Example
18//!
19//! ```rust,ignore
20//! use syncable_cli::analyzer::k8s_optimize::prometheus_client::{PrometheusClient, PrometheusAuth};
21//!
22//! // Default: No authentication (works with port-forward)
23//! let client = PrometheusClient::new("http://localhost:9090")?;
24//!
25//! // With authentication (for external Prometheus)
26//! let client = PrometheusClient::with_auth(
27//!     "https://prometheus.example.com",
28//!     PrometheusAuth::Bearer("token123".to_string())
29//! )?;
30//!
31//! let history = client.get_container_history("default", "api-gateway", "main", "7d").await?;
32//! println!("CPU P99: {}m", history.cpu_p99);
33//! ```
34
35use reqwest::{Client, RequestBuilder};
36use serde::{Deserialize, Serialize};
37use std::collections::HashMap;
38
39/// Error type for Prometheus client operations.
40#[derive(Debug, thiserror::Error)]
41pub enum PrometheusError {
42    #[error("Failed to connect to Prometheus: {0}")]
43    ConnectionFailed(String),
44
45    #[error("HTTP request failed: {0}")]
46    HttpError(#[from] reqwest::Error),
47
48    #[error("Invalid Prometheus URL: {0}")]
49    InvalidUrl(String),
50
51    #[error("Query failed: {0}")]
52    QueryFailed(String),
53
54    #[error("No data available for the specified time range")]
55    NoData,
56
57    #[error("Failed to parse response: {0}")]
58    ParseError(String),
59
60    #[error("Authentication failed: {0}")]
61    AuthError(String),
62}
63
64/// Authentication method for Prometheus (optional).
65///
66/// Authentication is typically NOT needed when using `kubectl port-forward`
67/// because the connection goes directly to the pod, bypassing ingress/auth layers.
68/// Auth is only needed for externally exposed Prometheus instances.
69#[derive(Debug, Clone, Default, Serialize, Deserialize)]
70pub enum PrometheusAuth {
71    /// No authentication (default - works for port-forward)
72    #[default]
73    None,
74    /// Basic auth (for externally exposed Prometheus)
75    Basic { username: String, password: String },
76    /// Bearer token (for externally exposed Prometheus with OAuth/OIDC)
77    Bearer(String),
78}
79
80/// Historical resource usage data for a container.
81#[derive(Debug, Clone, Serialize, Deserialize)]
82pub struct ContainerHistory {
83    /// Pod name
84    pub pod_name: String,
85    /// Container name
86    pub container_name: String,
87    /// Namespace
88    pub namespace: String,
89    /// Time range queried (e.g., "7d", "30d")
90    pub time_range: String,
91    /// Number of data points
92    pub sample_count: usize,
93    /// CPU usage percentiles (in millicores)
94    pub cpu_min: u64,
95    pub cpu_p50: u64,
96    pub cpu_p95: u64,
97    pub cpu_p99: u64,
98    pub cpu_max: u64,
99    pub cpu_avg: u64,
100    /// Memory usage percentiles (in bytes)
101    pub memory_min: u64,
102    pub memory_p50: u64,
103    pub memory_p95: u64,
104    pub memory_p99: u64,
105    pub memory_max: u64,
106    pub memory_avg: u64,
107}
108
109/// Aggregated history for a workload (Deployment/StatefulSet/etc).
110#[derive(Debug, Clone, Serialize, Deserialize)]
111pub struct WorkloadHistory {
112    /// Workload name
113    pub workload_name: String,
114    /// Workload kind (Deployment, StatefulSet, etc.)
115    pub workload_kind: String,
116    /// Namespace
117    pub namespace: String,
118    /// Container histories
119    pub containers: Vec<ContainerHistory>,
120    /// Time range queried
121    pub time_range: String,
122}
123
124/// Right-sizing recommendation based on historical data.
125#[derive(Debug, Clone, Serialize, Deserialize)]
126pub struct HistoricalRecommendation {
127    /// Workload name
128    pub workload_name: String,
129    /// Container name
130    pub container_name: String,
131    /// Current CPU request (millicores)
132    pub current_cpu_request: Option<u64>,
133    /// Recommended CPU request (millicores)
134    pub recommended_cpu_request: u64,
135    /// CPU savings percentage (negative if under-provisioned)
136    pub cpu_savings_pct: f32,
137    /// Current memory request (bytes)
138    pub current_memory_request: Option<u64>,
139    /// Recommended memory request (bytes)
140    pub recommended_memory_request: u64,
141    /// Memory savings percentage (negative if under-provisioned)
142    pub memory_savings_pct: f32,
143    /// Confidence level (0-100, based on sample count)
144    pub confidence: u8,
145    /// Safety margin applied
146    pub safety_margin_pct: u8,
147}
148
149/// Prometheus client for querying historical metrics.
150pub struct PrometheusClient {
151    base_url: String,
152    http_client: Client,
153    auth: PrometheusAuth,
154}
155
156impl PrometheusClient {
157    /// Create a new Prometheus client without authentication.
158    ///
159    /// This is the default and works for `kubectl port-forward` connections
160    /// where no authentication is needed.
161    pub fn new(url: &str) -> Result<Self, PrometheusError> {
162        Self::with_auth(url, PrometheusAuth::None)
163    }
164
165    /// Create a new Prometheus client with optional authentication.
166    ///
167    /// Use this for externally exposed Prometheus instances that require auth.
168    pub fn with_auth(url: &str, auth: PrometheusAuth) -> Result<Self, PrometheusError> {
169        let base_url = url.trim_end_matches('/').to_string();
170
171        // Validate URL format
172        if !base_url.starts_with("http://") && !base_url.starts_with("https://") {
173            return Err(PrometheusError::InvalidUrl(
174                "URL must start with http:// or https://".to_string(),
175            ));
176        }
177
178        let http_client = Client::builder()
179            .timeout(std::time::Duration::from_secs(30))
180            .build()?;
181
182        Ok(Self {
183            base_url,
184            http_client,
185            auth,
186        })
187    }
188
189    /// Add authentication headers to a request (if configured).
190    fn add_auth(&self, req: RequestBuilder) -> RequestBuilder {
191        match &self.auth {
192            PrometheusAuth::None => req,
193            PrometheusAuth::Basic { username, password } => {
194                req.basic_auth(username, Some(password))
195            }
196            PrometheusAuth::Bearer(token) => req.bearer_auth(token),
197        }
198    }
199
200    /// Check if Prometheus is reachable.
201    pub async fn is_available(&self) -> bool {
202        // Use the health endpoint which is faster and simpler
203        let url = format!("{}/-/healthy", self.base_url);
204        let req = self
205            .http_client
206            .get(&url)
207            .timeout(std::time::Duration::from_secs(5));
208        match self.add_auth(req).send().await {
209            Ok(response) => response.status().is_success(),
210            Err(_) => false,
211        }
212    }
213
214    /// Get container CPU/memory history.
215    pub async fn get_container_history(
216        &self,
217        namespace: &str,
218        pod_pattern: &str,
219        container: &str,
220        time_range: &str,
221    ) -> Result<ContainerHistory, PrometheusError> {
222        let duration = parse_duration(time_range)?;
223
224        // Query CPU usage (rate of CPU seconds over time, converted to millicores)
225        let cpu_query = format!(
226            r#"rate(container_cpu_usage_seconds_total{{namespace="{}", pod=~"{}.*", container="{}"}}[5m]) * 1000"#,
227            namespace, pod_pattern, container
228        );
229
230        // Query memory usage
231        let memory_query = format!(
232            r#"container_memory_working_set_bytes{{namespace="{}", pod=~"{}.*", container="{}"}}"#,
233            namespace, pod_pattern, container
234        );
235
236        let cpu_values = self.query_range(&cpu_query, &duration).await?;
237        let memory_values = self.query_range(&memory_query, &duration).await?;
238
239        if cpu_values.is_empty() && memory_values.is_empty() {
240            return Err(PrometheusError::NoData);
241        }
242
243        Ok(ContainerHistory {
244            pod_name: pod_pattern.to_string(),
245            container_name: container.to_string(),
246            namespace: namespace.to_string(),
247            time_range: time_range.to_string(),
248            sample_count: cpu_values.len().max(memory_values.len()),
249            cpu_min: percentile(&cpu_values, 0.0) as u64,
250            cpu_p50: percentile(&cpu_values, 0.50) as u64,
251            cpu_p95: percentile(&cpu_values, 0.95) as u64,
252            cpu_p99: percentile(&cpu_values, 0.99) as u64,
253            cpu_max: percentile(&cpu_values, 1.0) as u64,
254            cpu_avg: average(&cpu_values) as u64,
255            memory_min: percentile(&memory_values, 0.0) as u64,
256            memory_p50: percentile(&memory_values, 0.50) as u64,
257            memory_p95: percentile(&memory_values, 0.95) as u64,
258            memory_p99: percentile(&memory_values, 0.99) as u64,
259            memory_max: percentile(&memory_values, 1.0) as u64,
260            memory_avg: average(&memory_values) as u64,
261        })
262    }
263
264    /// Get history for all containers in a workload.
265    pub async fn get_workload_history(
266        &self,
267        namespace: &str,
268        workload_name: &str,
269        workload_kind: &str,
270        time_range: &str,
271    ) -> Result<WorkloadHistory, PrometheusError> {
272        // First, discover containers in this workload
273        let containers = self.discover_containers(namespace, workload_name).await?;
274
275        let mut container_histories = Vec::new();
276
277        for container_name in containers {
278            match self
279                .get_container_history(namespace, workload_name, &container_name, time_range)
280                .await
281            {
282                Ok(history) => container_histories.push(history),
283                Err(PrometheusError::NoData) => continue, // Skip containers with no data
284                Err(e) => return Err(e),
285            }
286        }
287
288        Ok(WorkloadHistory {
289            workload_name: workload_name.to_string(),
290            workload_kind: workload_kind.to_string(),
291            namespace: namespace.to_string(),
292            containers: container_histories,
293            time_range: time_range.to_string(),
294        })
295    }
296
297    /// Generate right-sizing recommendations based on historical data.
298    pub fn generate_recommendation(
299        history: &ContainerHistory,
300        current_cpu_request: Option<u64>,
301        current_memory_request: Option<u64>,
302        safety_margin_pct: u8,
303    ) -> HistoricalRecommendation {
304        let margin_multiplier = 1.0 + (safety_margin_pct as f64 / 100.0);
305
306        // Use P99 + safety margin for recommendations
307        let recommended_cpu = (history.cpu_p99 as f64 * margin_multiplier).ceil() as u64;
308        let recommended_memory = (history.memory_p99 as f64 * margin_multiplier).ceil() as u64;
309
310        // Round CPU to nice values (nearest 25m for small, 100m for larger)
311        let recommended_cpu = round_cpu(recommended_cpu);
312        // Round memory to nice values (nearest 64Mi)
313        let recommended_memory = round_memory(recommended_memory);
314
315        let cpu_savings_pct = current_cpu_request
316            .map(|curr| ((curr as f32 - recommended_cpu as f32) / curr as f32) * 100.0)
317            .unwrap_or(0.0);
318
319        let memory_savings_pct = current_memory_request
320            .map(|curr| ((curr as f32 - recommended_memory as f32) / curr as f32) * 100.0)
321            .unwrap_or(0.0);
322
323        // Confidence based on sample count
324        let confidence = match history.sample_count {
325            0..=10 => 20,
326            11..=50 => 40,
327            51..=100 => 60,
328            101..=500 => 80,
329            _ => 95,
330        };
331
332        HistoricalRecommendation {
333            workload_name: history.pod_name.clone(),
334            container_name: history.container_name.clone(),
335            current_cpu_request,
336            recommended_cpu_request: recommended_cpu,
337            cpu_savings_pct,
338            current_memory_request,
339            recommended_memory_request: recommended_memory,
340            memory_savings_pct,
341            confidence,
342            safety_margin_pct,
343        }
344    }
345
346    /// Query Prometheus for a range of values.
347    async fn query_range(&self, query: &str, duration: &str) -> Result<Vec<f64>, PrometheusError> {
348        // Prometheus API requires Unix timestamps, not relative strings like "now-7d"
349        let now = std::time::SystemTime::now()
350            .duration_since(std::time::UNIX_EPOCH)
351            .unwrap()
352            .as_secs();
353        let duration_secs = parse_duration_to_seconds(duration)?;
354        let start = now - duration_secs;
355
356        // Use 1h step for 7d+ queries to avoid too many data points
357        let step = if duration_secs > 86400 * 3 {
358            "1h"
359        } else {
360            "5m"
361        };
362
363        let url = format!(
364            "{}/api/v1/query_range?query={}&start={}&end={}&step={}",
365            self.base_url,
366            urlencoding::encode(query),
367            start,
368            now,
369            step
370        );
371
372        let req = self.http_client.get(&url);
373        let response = self.add_auth(req).send().await?;
374
375        if !response.status().is_success() {
376            return Err(PrometheusError::QueryFailed(format!(
377                "HTTP {}: {}",
378                response.status(),
379                response.text().await.unwrap_or_default()
380            )));
381        }
382
383        let body: PrometheusResponse = response
384            .json()
385            .await
386            .map_err(|e| PrometheusError::ParseError(format!("Failed to parse response: {}", e)))?;
387
388        if body.status != "success" {
389            return Err(PrometheusError::QueryFailed(
390                body.error.unwrap_or_else(|| "Unknown error".to_string()),
391            ));
392        }
393
394        // Extract all values from the result
395        let mut values = Vec::new();
396        if let Some(result) = body.data.result {
397            for series in result {
398                for (_, value) in series.values.unwrap_or_default() {
399                    if let Ok(v) = value.parse::<f64>() {
400                        if !v.is_nan() && v.is_finite() {
401                            values.push(v);
402                        }
403                    }
404                }
405            }
406        }
407
408        Ok(values)
409    }
410
411    /// Discover containers in a workload.
412    async fn discover_containers(
413        &self,
414        namespace: &str,
415        workload_pattern: &str,
416    ) -> Result<Vec<String>, PrometheusError> {
417        let query = format!(
418            r#"count by (container) (container_cpu_usage_seconds_total{{namespace="{}", pod=~"{}.*", container!="POD", container!=""}})"#,
419            namespace, workload_pattern
420        );
421
422        let url = format!(
423            "{}/api/v1/query?query={}",
424            self.base_url,
425            urlencoding::encode(&query)
426        );
427
428        let req = self.http_client.get(&url);
429        let response = self.add_auth(req).send().await?;
430
431        if !response.status().is_success() {
432            return Err(PrometheusError::QueryFailed(format!(
433                "HTTP {}",
434                response.status()
435            )));
436        }
437
438        let body: PrometheusResponse = response
439            .json()
440            .await
441            .map_err(|e| PrometheusError::ParseError(format!("Failed to parse response: {}", e)))?;
442
443        let mut containers = Vec::new();
444        if let Some(result) = body.data.result {
445            for series in result {
446                if let Some(container) = series.metric.get("container") {
447                    containers.push(container.clone());
448                }
449            }
450        }
451
452        Ok(containers)
453    }
454}
455
456// ============================================================================
457// Prometheus API response types
458// ============================================================================
459
460#[derive(Debug, Deserialize)]
461struct PrometheusResponse {
462    status: String,
463    error: Option<String>,
464    data: PrometheusData,
465}
466
467#[derive(Debug, Deserialize)]
468struct PrometheusData {
469    #[serde(rename = "resultType")]
470    #[allow(dead_code)]
471    result_type: Option<String>,
472    result: Option<Vec<PrometheusResult>>,
473}
474
475#[derive(Debug, Deserialize)]
476struct PrometheusResult {
477    metric: HashMap<String, String>,
478    #[allow(dead_code)]
479    value: Option<(f64, String)>, // For instant queries
480    values: Option<Vec<(f64, String)>>, // For range queries
481}
482
483// ============================================================================
484// Helper functions
485// ============================================================================
486
487/// Parse a duration string (e.g., "7d", "24h", "30m") to Prometheus format.
488fn parse_duration(duration: &str) -> Result<String, PrometheusError> {
489    let duration = duration.trim().to_lowercase();
490
491    // Check for human-readable formats first (before single-char suffixes)
492    if duration.ends_with("days") {
493        let num: u32 = duration
494            .trim_end_matches("days")
495            .trim()
496            .parse()
497            .map_err(|_| PrometheusError::ParseError("Invalid duration number".to_string()))?;
498        Ok(format!("{}d", num))
499    } else if duration.ends_with("day") {
500        let num: u32 = duration
501            .trim_end_matches("day")
502            .trim()
503            .parse()
504            .map_err(|_| PrometheusError::ParseError("Invalid duration number".to_string()))?;
505        Ok(format!("{}d", num))
506    } else if duration.ends_with("weeks") {
507        let num: u32 = duration
508            .trim_end_matches("weeks")
509            .trim()
510            .parse()
511            .map_err(|_| PrometheusError::ParseError("Invalid duration number".to_string()))?;
512        Ok(format!("{}d", num * 7))
513    } else if duration.ends_with("week") {
514        let num: u32 = duration
515            .trim_end_matches("week")
516            .trim()
517            .parse()
518            .map_err(|_| PrometheusError::ParseError("Invalid duration number".to_string()))?;
519        Ok(format!("{}d", num * 7))
520    } else if duration.ends_with('d')
521        || duration.ends_with('h')
522        || duration.ends_with('m')
523        || duration.ends_with('s')
524    {
525        // Prometheus already understands these formats
526        Ok(duration)
527    } else {
528        // Default to treating as days
529        let num: u32 = duration
530            .parse()
531            .map_err(|_| PrometheusError::ParseError(format!("Invalid duration: {}", duration)))?;
532        Ok(format!("{}d", num))
533    }
534}
535
536/// Parse a duration string (e.g., "7d", "24h", "30m") to seconds.
537fn parse_duration_to_seconds(duration: &str) -> Result<u64, PrometheusError> {
538    let duration = duration.trim().to_lowercase();
539
540    // Extract the numeric part and unit
541    let (num_str, unit) = if duration.ends_with("days") {
542        (duration.trim_end_matches("days").trim(), "d")
543    } else if duration.ends_with("day") {
544        (duration.trim_end_matches("day").trim(), "d")
545    } else if duration.ends_with("weeks") {
546        (duration.trim_end_matches("weeks").trim(), "w")
547    } else if duration.ends_with("week") {
548        (duration.trim_end_matches("week").trim(), "w")
549    } else if duration.ends_with('d') {
550        (duration.trim_end_matches('d'), "d")
551    } else if duration.ends_with('h') {
552        (duration.trim_end_matches('h'), "h")
553    } else if duration.ends_with('m') {
554        (duration.trim_end_matches('m'), "m")
555    } else if duration.ends_with('s') {
556        (duration.trim_end_matches('s'), "s")
557    } else {
558        // Default to days
559        (duration.as_str(), "d")
560    };
561
562    let num: u64 = num_str.parse().map_err(|_| {
563        PrometheusError::ParseError(format!("Invalid duration number: {}", duration))
564    })?;
565
566    let seconds = match unit {
567        "w" => num * 7 * 24 * 60 * 60,
568        "d" => num * 24 * 60 * 60,
569        "h" => num * 60 * 60,
570        "m" => num * 60,
571        "s" => num,
572        _ => num * 24 * 60 * 60, // Default to days
573    };
574
575    Ok(seconds)
576}
577
/// Return the `p`-th percentile (`p` in `0.0..=1.0`) of `values`.
///
/// `values` does not need to be sorted: a sorted copy is made and the element
/// at index `round(p * (len - 1))` is returned (no interpolation).
/// `p <= 0.0` yields the minimum, `p >= 1.0` the maximum, and an empty slice
/// yields `0.0`.
fn percentile(values: &[f64], p: f64) -> f64 {
    if values.is_empty() {
        return 0.0;
    }

    let mut sorted = values.to_vec();
    // total_cmp is a total order, so the sort stays well-defined even if a NaN
    // slips through (partial_cmp + "Equal" fallback is not a consistent order).
    sorted.sort_unstable_by(f64::total_cmp);

    let last = sorted.len() - 1;
    let index = if p <= 0.0 {
        0
    } else if p >= 1.0 {
        last
    } else {
        (p * last as f64).round() as usize
    };
    sorted[index]
}
597
/// Arithmetic mean of `values`; an empty slice averages to `0.0`.
fn average(values: &[f64]) -> f64 {
    match values.len() {
        0 => 0.0,
        count => {
            let total: f64 = values.iter().sum();
            total / count as f64
        }
    }
}
605
/// Round a CPU amount in millicores to a "nice" request value.
///
/// - `0` stays `0`;
/// - up to 100m: rounded *up* to a multiple of 25m, so small requests never shrink;
/// - 101m to 1000m: rounded to the nearest 50m (halves round up);
/// - above 1000m: rounded to the nearest 100m (halves round up).
fn round_cpu(millicores: u64) -> u64 {
    // The bias added before integer division picks the rounding mode:
    // `step - 1` rounds up, `step / 2` rounds to nearest.
    let (step, bias) = match millicores {
        0 => return 0,
        1..=100 => (25, 24),
        101..=1000 => (50, 25),
        _ => (100, 50),
    };
    (millicores + bias) / step * step
}
622
/// Round a memory amount in bytes to a "nice" request value.
///
/// - `0` stays `0`;
/// - up to 128Mi: rounded to the nearest 32Mi, but never below 32Mi;
/// - above 128Mi: rounded to the nearest 64Mi.
///
/// Halves round up.
fn round_memory(bytes: u64) -> u64 {
    const MI: u64 = 1024 * 1024;
    const SMALL_INCREMENT: u64 = 32 * MI;
    const INCREMENT: u64 = 64 * MI;

    if bytes == 0 {
        return 0;
    }

    let increment = if bytes <= 128 * MI {
        SMALL_INCREMENT
    } else {
        INCREMENT
    };
    // saturating_add: avoid overflow for values near u64::MAX.
    let rounded = bytes.saturating_add(increment / 2) / increment * increment;

    // Round-to-nearest would send anything under 16Mi to zero, i.e. recommend
    // no memory at all for a container that does use some.
    rounded.max(SMALL_INCREMENT)
}
637
#[cfg(test)]
mod tests {
    use super::*;

    const MI: u64 = 1024 * 1024;

    /// Build a history with the given P99 values and sample count. The other
    /// statistics are not read by `generate_recommendation`.
    fn history(cpu_p99: u64, memory_p99: u64, sample_count: usize) -> ContainerHistory {
        ContainerHistory {
            pod_name: "api-gateway".to_string(),
            container_name: "main".to_string(),
            namespace: "default".to_string(),
            time_range: "7d".to_string(),
            sample_count,
            cpu_min: 0,
            cpu_p50: cpu_p99,
            cpu_p95: cpu_p99,
            cpu_p99,
            cpu_max: cpu_p99,
            cpu_avg: cpu_p99,
            memory_min: 0,
            memory_p50: memory_p99,
            memory_p95: memory_p99,
            memory_p99,
            memory_max: memory_p99,
            memory_avg: memory_p99,
        }
    }

    #[test]
    fn test_parse_duration() {
        let cases = [
            ("7d", "7d"),
            ("24h", "24h"),
            ("30m", "30m"),
            ("1week", "7d"),
            ("2weeks", "14d"),
            ("3days", "3d"),
            ("7 Days", "7d"),
            ("14", "14d"),
        ];
        for (input, expected) in cases {
            assert_eq!(parse_duration(input).unwrap(), expected, "input {:?}", input);
        }
        assert!(parse_duration("abc").is_err());
    }

    #[test]
    fn test_percentile() {
        let values = vec![10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0];
        assert!((percentile(&values, 0.0) - 10.0).abs() < 0.1);
        assert!((percentile(&values, 0.5) - 55.0).abs() < 5.1); // ~50th percentile
        assert!((percentile(&values, 1.0) - 100.0).abs() < 0.1);
        // Input order does not matter; an empty slice yields 0.
        assert!((percentile(&[30.0, 10.0, 20.0], 0.5) - 20.0).abs() < 0.1);
        assert_eq!(percentile(&[], 0.99), 0.0);
    }

    #[test]
    fn test_average() {
        assert_eq!(average(&[]), 0.0);
        assert!((average(&[1.0, 2.0, 3.0, 4.0]) - 2.5).abs() < 1e-12);
    }

    #[test]
    fn test_round_cpu() {
        let cases = [
            (0, 0),
            (12, 25),
            (23, 25),
            (37, 50),
            (100, 100),
            (120, 100),
            (175, 200),
            (1000, 1000),
            (1234, 1200),
        ];
        for (input, expected) in cases {
            assert_eq!(round_cpu(input), expected, "input {}", input);
        }
    }

    #[test]
    fn test_round_memory() {
        assert_eq!(round_memory(50 * MI), 64 * MI);
        assert_eq!(round_memory(100 * MI), 96 * MI);
        assert_eq!(round_memory(200 * MI), 192 * MI);
        assert_eq!(round_memory(500 * MI), 512 * MI);
    }

    #[test]
    fn test_parse_duration_to_seconds() {
        const HOUR: u64 = 60 * 60;
        const DAY: u64 = 24 * HOUR;
        let cases = [
            ("7d", 7 * DAY),
            ("1d", DAY),
            ("24h", 24 * HOUR),
            ("1h", HOUR),
            ("30m", 30 * 60),
            ("90s", 90),
            ("1week", 7 * DAY),
            ("2weeks", 14 * DAY),
            ("2days", 2 * DAY),
            ("7", 7 * DAY),
        ];
        for (input, expected) in cases {
            assert_eq!(
                parse_duration_to_seconds(input).unwrap(),
                expected,
                "input {:?}",
                input
            );
        }
        assert!(parse_duration_to_seconds("soon").is_err());
    }

    #[test]
    fn test_generate_recommendation() {
        let rec = PrometheusClient::generate_recommendation(
            &history(100, 100 * MI, 1_000),
            Some(200),
            Some(256 * MI),
            20,
        );
        // 100m * 1.2 = 120m -> nearest 50m = 100m; 100Mi * 1.2 = 120Mi -> nearest 32Mi = 128Mi.
        assert_eq!(rec.recommended_cpu_request, 100);
        assert_eq!(rec.recommended_memory_request, 128 * MI);
        assert!((rec.cpu_savings_pct - 50.0).abs() < 1e-3);
        assert!((rec.memory_savings_pct - 50.0).abs() < 1e-3);
        assert_eq!(rec.confidence, 95);
        assert_eq!(rec.safety_margin_pct, 20);
        assert_eq!(rec.workload_name, "api-gateway");
        assert_eq!(rec.container_name, "main");

        // Unknown current requests report no savings; few samples => low confidence.
        let unknown =
            PrometheusClient::generate_recommendation(&history(100, 100 * MI, 5), None, None, 20);
        assert_eq!(unknown.cpu_savings_pct, 0.0);
        assert_eq!(unknown.memory_savings_pct, 0.0);
        assert_eq!(unknown.confidence, 20);
    }
}