1use reqwest::{Client, RequestBuilder};
36use serde::{Deserialize, Serialize};
37use std::collections::HashMap;
38
39#[derive(Debug, thiserror::Error)]
41pub enum PrometheusError {
42 #[error("Failed to connect to Prometheus: {0}")]
43 ConnectionFailed(String),
44
45 #[error("HTTP request failed: {0}")]
46 HttpError(#[from] reqwest::Error),
47
48 #[error("Invalid Prometheus URL: {0}")]
49 InvalidUrl(String),
50
51 #[error("Query failed: {0}")]
52 QueryFailed(String),
53
54 #[error("No data available for the specified time range")]
55 NoData,
56
57 #[error("Failed to parse response: {0}")]
58 ParseError(String),
59
60 #[error("Authentication failed: {0}")]
61 AuthError(String),
62}
63
64#[derive(Debug, Clone, Default, Serialize, Deserialize)]
70pub enum PrometheusAuth {
71 #[default]
73 None,
74 Basic { username: String, password: String },
76 Bearer(String),
78}
79
80#[derive(Debug, Clone, Serialize, Deserialize)]
82pub struct ContainerHistory {
83 pub pod_name: String,
85 pub container_name: String,
87 pub namespace: String,
89 pub time_range: String,
91 pub sample_count: usize,
93 pub cpu_min: u64,
95 pub cpu_p50: u64,
96 pub cpu_p95: u64,
97 pub cpu_p99: u64,
98 pub cpu_max: u64,
99 pub cpu_avg: u64,
100 pub memory_min: u64,
102 pub memory_p50: u64,
103 pub memory_p95: u64,
104 pub memory_p99: u64,
105 pub memory_max: u64,
106 pub memory_avg: u64,
107}
108
109#[derive(Debug, Clone, Serialize, Deserialize)]
111pub struct WorkloadHistory {
112 pub workload_name: String,
114 pub workload_kind: String,
116 pub namespace: String,
118 pub containers: Vec<ContainerHistory>,
120 pub time_range: String,
122}
123
124#[derive(Debug, Clone, Serialize, Deserialize)]
126pub struct HistoricalRecommendation {
127 pub workload_name: String,
129 pub container_name: String,
131 pub current_cpu_request: Option<u64>,
133 pub recommended_cpu_request: u64,
135 pub cpu_savings_pct: f32,
137 pub current_memory_request: Option<u64>,
139 pub recommended_memory_request: u64,
141 pub memory_savings_pct: f32,
143 pub confidence: u8,
145 pub safety_margin_pct: u8,
147}
148
149pub struct PrometheusClient {
151 base_url: String,
152 http_client: Client,
153 auth: PrometheusAuth,
154}
155
156impl PrometheusClient {
157 pub fn new(url: &str) -> Result<Self, PrometheusError> {
162 Self::with_auth(url, PrometheusAuth::None)
163 }
164
165 pub fn with_auth(url: &str, auth: PrometheusAuth) -> Result<Self, PrometheusError> {
169 let base_url = url.trim_end_matches('/').to_string();
170
171 if !base_url.starts_with("http://") && !base_url.starts_with("https://") {
173 return Err(PrometheusError::InvalidUrl(
174 "URL must start with http:// or https://".to_string(),
175 ));
176 }
177
178 let http_client = Client::builder()
179 .timeout(std::time::Duration::from_secs(30))
180 .build()?;
181
182 Ok(Self {
183 base_url,
184 http_client,
185 auth,
186 })
187 }
188
189 fn add_auth(&self, req: RequestBuilder) -> RequestBuilder {
191 match &self.auth {
192 PrometheusAuth::None => req,
193 PrometheusAuth::Basic { username, password } => {
194 req.basic_auth(username, Some(password))
195 }
196 PrometheusAuth::Bearer(token) => req.bearer_auth(token),
197 }
198 }
199
200 pub async fn is_available(&self) -> bool {
202 let url = format!("{}/-/healthy", self.base_url);
204 let req = self
205 .http_client
206 .get(&url)
207 .timeout(std::time::Duration::from_secs(5));
208 match self.add_auth(req).send().await {
209 Ok(response) => response.status().is_success(),
210 Err(_) => false,
211 }
212 }
213
214 pub async fn get_container_history(
216 &self,
217 namespace: &str,
218 pod_pattern: &str,
219 container: &str,
220 time_range: &str,
221 ) -> Result<ContainerHistory, PrometheusError> {
222 let duration = parse_duration(time_range)?;
223
224 let cpu_query = format!(
226 r#"rate(container_cpu_usage_seconds_total{{namespace="{}", pod=~"{}.*", container="{}"}}[5m]) * 1000"#,
227 namespace, pod_pattern, container
228 );
229
230 let memory_query = format!(
232 r#"container_memory_working_set_bytes{{namespace="{}", pod=~"{}.*", container="{}"}}"#,
233 namespace, pod_pattern, container
234 );
235
236 let cpu_values = self.query_range(&cpu_query, &duration).await?;
237 let memory_values = self.query_range(&memory_query, &duration).await?;
238
239 if cpu_values.is_empty() && memory_values.is_empty() {
240 return Err(PrometheusError::NoData);
241 }
242
243 Ok(ContainerHistory {
244 pod_name: pod_pattern.to_string(),
245 container_name: container.to_string(),
246 namespace: namespace.to_string(),
247 time_range: time_range.to_string(),
248 sample_count: cpu_values.len().max(memory_values.len()),
249 cpu_min: percentile(&cpu_values, 0.0) as u64,
250 cpu_p50: percentile(&cpu_values, 0.50) as u64,
251 cpu_p95: percentile(&cpu_values, 0.95) as u64,
252 cpu_p99: percentile(&cpu_values, 0.99) as u64,
253 cpu_max: percentile(&cpu_values, 1.0) as u64,
254 cpu_avg: average(&cpu_values) as u64,
255 memory_min: percentile(&memory_values, 0.0) as u64,
256 memory_p50: percentile(&memory_values, 0.50) as u64,
257 memory_p95: percentile(&memory_values, 0.95) as u64,
258 memory_p99: percentile(&memory_values, 0.99) as u64,
259 memory_max: percentile(&memory_values, 1.0) as u64,
260 memory_avg: average(&memory_values) as u64,
261 })
262 }
263
264 pub async fn get_workload_history(
266 &self,
267 namespace: &str,
268 workload_name: &str,
269 workload_kind: &str,
270 time_range: &str,
271 ) -> Result<WorkloadHistory, PrometheusError> {
272 let containers = self.discover_containers(namespace, workload_name).await?;
274
275 let mut container_histories = Vec::new();
276
277 for container_name in containers {
278 match self
279 .get_container_history(namespace, workload_name, &container_name, time_range)
280 .await
281 {
282 Ok(history) => container_histories.push(history),
283 Err(PrometheusError::NoData) => continue, Err(e) => return Err(e),
285 }
286 }
287
288 Ok(WorkloadHistory {
289 workload_name: workload_name.to_string(),
290 workload_kind: workload_kind.to_string(),
291 namespace: namespace.to_string(),
292 containers: container_histories,
293 time_range: time_range.to_string(),
294 })
295 }
296
297 pub fn generate_recommendation(
299 history: &ContainerHistory,
300 current_cpu_request: Option<u64>,
301 current_memory_request: Option<u64>,
302 safety_margin_pct: u8,
303 ) -> HistoricalRecommendation {
304 let margin_multiplier = 1.0 + (safety_margin_pct as f64 / 100.0);
305
306 let recommended_cpu = (history.cpu_p99 as f64 * margin_multiplier).ceil() as u64;
308 let recommended_memory = (history.memory_p99 as f64 * margin_multiplier).ceil() as u64;
309
310 let recommended_cpu = round_cpu(recommended_cpu);
312 let recommended_memory = round_memory(recommended_memory);
314
315 let cpu_savings_pct = current_cpu_request
316 .map(|curr| ((curr as f32 - recommended_cpu as f32) / curr as f32) * 100.0)
317 .unwrap_or(0.0);
318
319 let memory_savings_pct = current_memory_request
320 .map(|curr| ((curr as f32 - recommended_memory as f32) / curr as f32) * 100.0)
321 .unwrap_or(0.0);
322
323 let confidence = match history.sample_count {
325 0..=10 => 20,
326 11..=50 => 40,
327 51..=100 => 60,
328 101..=500 => 80,
329 _ => 95,
330 };
331
332 HistoricalRecommendation {
333 workload_name: history.pod_name.clone(),
334 container_name: history.container_name.clone(),
335 current_cpu_request,
336 recommended_cpu_request: recommended_cpu,
337 cpu_savings_pct,
338 current_memory_request,
339 recommended_memory_request: recommended_memory,
340 memory_savings_pct,
341 confidence,
342 safety_margin_pct,
343 }
344 }
345
346 async fn query_range(&self, query: &str, duration: &str) -> Result<Vec<f64>, PrometheusError> {
348 let now = std::time::SystemTime::now()
350 .duration_since(std::time::UNIX_EPOCH)
351 .unwrap()
352 .as_secs();
353 let duration_secs = parse_duration_to_seconds(duration)?;
354 let start = now - duration_secs;
355
356 let step = if duration_secs > 86400 * 3 {
358 "1h"
359 } else {
360 "5m"
361 };
362
363 let url = format!(
364 "{}/api/v1/query_range?query={}&start={}&end={}&step={}",
365 self.base_url,
366 urlencoding::encode(query),
367 start,
368 now,
369 step
370 );
371
372 let req = self.http_client.get(&url);
373 let response = self.add_auth(req).send().await?;
374
375 if !response.status().is_success() {
376 return Err(PrometheusError::QueryFailed(format!(
377 "HTTP {}: {}",
378 response.status(),
379 response.text().await.unwrap_or_default()
380 )));
381 }
382
383 let body: PrometheusResponse = response
384 .json()
385 .await
386 .map_err(|e| PrometheusError::ParseError(format!("Failed to parse response: {}", e)))?;
387
388 if body.status != "success" {
389 return Err(PrometheusError::QueryFailed(
390 body.error.unwrap_or_else(|| "Unknown error".to_string()),
391 ));
392 }
393
394 let mut values = Vec::new();
396 if let Some(result) = body.data.result {
397 for series in result {
398 for (_, value) in series.values.unwrap_or_default() {
399 if let Ok(v) = value.parse::<f64>() {
400 if !v.is_nan() && v.is_finite() {
401 values.push(v);
402 }
403 }
404 }
405 }
406 }
407
408 Ok(values)
409 }
410
411 async fn discover_containers(
413 &self,
414 namespace: &str,
415 workload_pattern: &str,
416 ) -> Result<Vec<String>, PrometheusError> {
417 let query = format!(
418 r#"count by (container) (container_cpu_usage_seconds_total{{namespace="{}", pod=~"{}.*", container!="POD", container!=""}})"#,
419 namespace, workload_pattern
420 );
421
422 let url = format!(
423 "{}/api/v1/query?query={}",
424 self.base_url,
425 urlencoding::encode(&query)
426 );
427
428 let req = self.http_client.get(&url);
429 let response = self.add_auth(req).send().await?;
430
431 if !response.status().is_success() {
432 return Err(PrometheusError::QueryFailed(format!(
433 "HTTP {}",
434 response.status()
435 )));
436 }
437
438 let body: PrometheusResponse = response
439 .json()
440 .await
441 .map_err(|e| PrometheusError::ParseError(format!("Failed to parse response: {}", e)))?;
442
443 let mut containers = Vec::new();
444 if let Some(result) = body.data.result {
445 for series in result {
446 if let Some(container) = series.metric.get("container") {
447 containers.push(container.clone());
448 }
449 }
450 }
451
452 Ok(containers)
453 }
454}
455
456#[derive(Debug, Deserialize)]
461struct PrometheusResponse {
462 status: String,
463 error: Option<String>,
464 data: PrometheusData,
465}
466
467#[derive(Debug, Deserialize)]
468struct PrometheusData {
469 #[serde(rename = "resultType")]
470 #[allow(dead_code)]
471 result_type: Option<String>,
472 result: Option<Vec<PrometheusResult>>,
473}
474
475#[derive(Debug, Deserialize)]
476struct PrometheusResult {
477 metric: HashMap<String, String>,
478 #[allow(dead_code)]
479 value: Option<(f64, String)>, values: Option<Vec<(f64, String)>>, }
482
483fn parse_duration(duration: &str) -> Result<String, PrometheusError> {
489 let duration = duration.trim().to_lowercase();
490
491 if duration.ends_with("days") {
493 let num: u32 = duration
494 .trim_end_matches("days")
495 .trim()
496 .parse()
497 .map_err(|_| PrometheusError::ParseError("Invalid duration number".to_string()))?;
498 Ok(format!("{}d", num))
499 } else if duration.ends_with("day") {
500 let num: u32 = duration
501 .trim_end_matches("day")
502 .trim()
503 .parse()
504 .map_err(|_| PrometheusError::ParseError("Invalid duration number".to_string()))?;
505 Ok(format!("{}d", num))
506 } else if duration.ends_with("weeks") {
507 let num: u32 = duration
508 .trim_end_matches("weeks")
509 .trim()
510 .parse()
511 .map_err(|_| PrometheusError::ParseError("Invalid duration number".to_string()))?;
512 Ok(format!("{}d", num * 7))
513 } else if duration.ends_with("week") {
514 let num: u32 = duration
515 .trim_end_matches("week")
516 .trim()
517 .parse()
518 .map_err(|_| PrometheusError::ParseError("Invalid duration number".to_string()))?;
519 Ok(format!("{}d", num * 7))
520 } else if duration.ends_with('d')
521 || duration.ends_with('h')
522 || duration.ends_with('m')
523 || duration.ends_with('s')
524 {
525 Ok(duration)
527 } else {
528 let num: u32 = duration
530 .parse()
531 .map_err(|_| PrometheusError::ParseError(format!("Invalid duration: {}", duration)))?;
532 Ok(format!("{}d", num))
533 }
534}
535
536fn parse_duration_to_seconds(duration: &str) -> Result<u64, PrometheusError> {
538 let duration = duration.trim().to_lowercase();
539
540 let (num_str, unit) = if duration.ends_with("days") {
542 (duration.trim_end_matches("days").trim(), "d")
543 } else if duration.ends_with("day") {
544 (duration.trim_end_matches("day").trim(), "d")
545 } else if duration.ends_with("weeks") {
546 (duration.trim_end_matches("weeks").trim(), "w")
547 } else if duration.ends_with("week") {
548 (duration.trim_end_matches("week").trim(), "w")
549 } else if duration.ends_with('d') {
550 (duration.trim_end_matches('d'), "d")
551 } else if duration.ends_with('h') {
552 (duration.trim_end_matches('h'), "h")
553 } else if duration.ends_with('m') {
554 (duration.trim_end_matches('m'), "m")
555 } else if duration.ends_with('s') {
556 (duration.trim_end_matches('s'), "s")
557 } else {
558 (duration.as_str(), "d")
560 };
561
562 let num: u64 = num_str.parse().map_err(|_| {
563 PrometheusError::ParseError(format!("Invalid duration number: {}", duration))
564 })?;
565
566 let seconds = match unit {
567 "w" => num * 7 * 24 * 60 * 60,
568 "d" => num * 24 * 60 * 60,
569 "h" => num * 60 * 60,
570 "m" => num * 60,
571 "s" => num,
572 _ => num * 24 * 60 * 60, };
574
575 Ok(seconds)
576}
577
/// Returns the `p`-th percentile of `values` using the nearest-rank method.
///
/// `p` is a fraction: `0.0` yields the minimum, `1.0` the maximum, and values
/// outside that range are clamped. NaN samples are ignored. An input without any
/// usable sample yields `0.0`.
fn percentile(values: &[f64], p: f64) -> f64 {
    // NaN has no place in an ordering; dropping it keeps the sort well defined and
    // prevents NaN from being reported as the minimum or maximum.
    let mut sorted: Vec<f64> = values.iter().copied().filter(|v| !v.is_nan()).collect();
    if sorted.is_empty() {
        return 0.0;
    }
    sorted.sort_unstable_by(f64::total_cmp);

    let last = sorted.len() - 1;
    let rank = (p.clamp(0.0, 1.0) * last as f64).round() as usize;
    sorted[rank.min(last)]
}
597
/// Returns the arithmetic mean of `values`, or `0.0` for an empty slice.
fn average(values: &[f64]) -> f64 {
    match values.len() {
        0 => 0.0,
        count => {
            let total: f64 = values.iter().sum();
            total / count as f64
        }
    }
}
605
/// Rounds a CPU amount in millicores to a conventional request size.
///
/// * `0` stays `0`.
/// * Up to 100m: rounded up to the next multiple of 25m.
/// * Up to 1000m: rounded to the nearest multiple of 50m.
/// * Above 1000m: rounded to the nearest multiple of 100m.
fn round_cpu(millicores: u64) -> u64 {
    match millicores {
        0 => 0,
        1..=100 => (millicores + 24) / 25 * 25,
        101..=1000 => (millicores + 25) / 50 * 50,
        _ => (millicores + 50) / 100 * 100,
    }
}
622
/// Rounds a memory amount in bytes to a conventional request size.
///
/// Amounts up to 128 MiB are rounded to the nearest multiple of 32 MiB, larger
/// amounts to the nearest multiple of 64 MiB. `0` stays `0`, but any non-zero amount
/// yields at least one increment, so that a container with a small but real
/// footprint never receives a zero-byte recommendation.
fn round_memory(bytes: u64) -> u64 {
    const MI: u64 = 1024 * 1024;
    const SMALL_INCREMENT: u64 = 32 * MI;
    const LARGE_INCREMENT: u64 = 64 * MI;

    if bytes == 0 {
        return 0;
    }

    let increment = if bytes <= 128 * MI {
        SMALL_INCREMENT
    } else {
        LARGE_INCREMENT
    };

    // Saturating keeps the half-increment bias from overflowing near `u64::MAX`;
    // the following divide-then-multiply can never exceed its input.
    let rounded = bytes.saturating_add(increment / 2) / increment * increment;
    rounded.max(increment)
}
637
#[cfg(test)]
mod tests {
    use super::*;

    const MI: u64 = 1024 * 1024;
    const DAY_SECS: u64 = 24 * 60 * 60;

    #[test]
    fn test_parse_duration() {
        let cases = [
            ("7d", "7d"),
            ("24h", "24h"),
            ("30m", "30m"),
            ("1week", "7d"),
            ("2weeks", "14d"),
            ("1day", "1d"),
            ("3 days", "3d"),
            // A bare number means days; case and outer whitespace are ignored.
            ("7", "7d"),
            (" 7D ", "7d"),
        ];
        for (input, expected) in cases {
            assert_eq!(parse_duration(input).unwrap(), expected, "input: {:?}", input);
        }
    }

    #[test]
    fn test_parse_duration_rejects_garbage() {
        assert!(parse_duration("abc").is_err());
        assert!(parse_duration("two weeks").is_err());
    }

    #[test]
    fn test_percentile() {
        let values = vec![10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0];
        assert!((percentile(&values, 0.0) - 10.0).abs() < 0.1);
        assert!((percentile(&values, 0.5) - 55.0).abs() < 5.1);
        assert!((percentile(&values, 1.0) - 100.0).abs() < 0.1);
    }

    #[test]
    fn test_percentile_edge_cases() {
        assert_eq!(percentile(&[], 0.5), 0.0);
        assert_eq!(percentile(&[42.0], 0.99), 42.0);
        // Input order must not matter.
        assert_eq!(percentile(&[3.0, 1.0, 2.0], 0.5), 2.0);
    }

    #[test]
    fn test_average() {
        assert_eq!(average(&[]), 0.0);
        assert_eq!(average(&[1.0, 2.0, 3.0]), 2.0);
    }

    #[test]
    fn test_round_cpu() {
        let cases = [
            (0, 0),
            (12, 25),
            (23, 25),
            (37, 50),
            (100, 100),
            (120, 100),
            (175, 200),
            (1000, 1000),
            (1234, 1200),
        ];
        for (input, expected) in cases {
            assert_eq!(round_cpu(input), expected, "input: {}", input);
        }
    }

    #[test]
    fn test_round_memory() {
        let cases = [
            (0, 0),
            (50 * MI, 64 * MI),
            (100 * MI, 96 * MI),
            (128 * MI, 128 * MI),
            (200 * MI, 192 * MI),
            (500 * MI, 512 * MI),
        ];
        for (input, expected) in cases {
            assert_eq!(round_memory(input), expected, "input: {}", input);
        }
    }

    #[test]
    fn test_parse_duration_to_seconds() {
        let cases = [
            ("7d", 7 * DAY_SECS),
            ("1d", DAY_SECS),
            ("24h", DAY_SECS),
            ("1h", 60 * 60),
            ("30m", 30 * 60),
            ("90s", 90),
            ("1week", 7 * DAY_SECS),
            ("2weeks", 14 * DAY_SECS),
            ("3 days", 3 * DAY_SECS),
            // A bare number means days.
            ("2", 2 * DAY_SECS),
        ];
        for (input, expected) in cases {
            assert_eq!(
                parse_duration_to_seconds(input).unwrap(),
                expected,
                "input: {:?}",
                input
            );
        }
    }

    #[test]
    fn test_parse_duration_to_seconds_rejects_garbage() {
        assert!(parse_duration_to_seconds("abc").is_err());
        assert!(parse_duration_to_seconds("1h30m").is_err());
    }
}