1use reqwest::{Client, RequestBuilder};
36use serde::{Deserialize, Serialize};
37use std::collections::HashMap;
38
39#[derive(Debug, thiserror::Error)]
41pub enum PrometheusError {
42 #[error("Failed to connect to Prometheus: {0}")]
43 ConnectionFailed(String),
44
45 #[error("HTTP request failed: {0}")]
46 HttpError(#[from] reqwest::Error),
47
48 #[error("Invalid Prometheus URL: {0}")]
49 InvalidUrl(String),
50
51 #[error("Query failed: {0}")]
52 QueryFailed(String),
53
54 #[error("No data available for the specified time range")]
55 NoData,
56
57 #[error("Failed to parse response: {0}")]
58 ParseError(String),
59
60 #[error("Authentication failed: {0}")]
61 AuthError(String),
62}
63
64#[derive(Debug, Clone, Default, Serialize, Deserialize)]
70pub enum PrometheusAuth {
71 #[default]
73 None,
74 Basic { username: String, password: String },
76 Bearer(String),
78}
79
80#[derive(Debug, Clone, Serialize, Deserialize)]
82pub struct ContainerHistory {
83 pub pod_name: String,
85 pub container_name: String,
87 pub namespace: String,
89 pub time_range: String,
91 pub sample_count: usize,
93 pub cpu_min: u64,
95 pub cpu_p50: u64,
96 pub cpu_p95: u64,
97 pub cpu_p99: u64,
98 pub cpu_max: u64,
99 pub cpu_avg: u64,
100 pub memory_min: u64,
102 pub memory_p50: u64,
103 pub memory_p95: u64,
104 pub memory_p99: u64,
105 pub memory_max: u64,
106 pub memory_avg: u64,
107}
108
109#[derive(Debug, Clone, Serialize, Deserialize)]
111pub struct WorkloadHistory {
112 pub workload_name: String,
114 pub workload_kind: String,
116 pub namespace: String,
118 pub containers: Vec<ContainerHistory>,
120 pub time_range: String,
122}
123
124#[derive(Debug, Clone, Serialize, Deserialize)]
126pub struct HistoricalRecommendation {
127 pub workload_name: String,
129 pub container_name: String,
131 pub current_cpu_request: Option<u64>,
133 pub recommended_cpu_request: u64,
135 pub cpu_savings_pct: f32,
137 pub current_memory_request: Option<u64>,
139 pub recommended_memory_request: u64,
141 pub memory_savings_pct: f32,
143 pub confidence: u8,
145 pub safety_margin_pct: u8,
147}
148
149pub struct PrometheusClient {
151 base_url: String,
152 http_client: Client,
153 auth: PrometheusAuth,
154}
155
156impl PrometheusClient {
157 pub fn new(url: &str) -> Result<Self, PrometheusError> {
162 Self::with_auth(url, PrometheusAuth::None)
163 }
164
165 pub fn with_auth(url: &str, auth: PrometheusAuth) -> Result<Self, PrometheusError> {
169 let base_url = url.trim_end_matches('/').to_string();
170
171 if !base_url.starts_with("http://") && !base_url.starts_with("https://") {
173 return Err(PrometheusError::InvalidUrl(
174 "URL must start with http:// or https://".to_string(),
175 ));
176 }
177
178 let http_client = Client::builder()
179 .timeout(std::time::Duration::from_secs(30))
180 .build()?;
181
182 Ok(Self {
183 base_url,
184 http_client,
185 auth,
186 })
187 }
188
189 fn add_auth(&self, req: RequestBuilder) -> RequestBuilder {
191 match &self.auth {
192 PrometheusAuth::None => req,
193 PrometheusAuth::Basic { username, password } => {
194 req.basic_auth(username, Some(password))
195 }
196 PrometheusAuth::Bearer(token) => req.bearer_auth(token),
197 }
198 }
199
200 pub async fn is_available(&self) -> bool {
202 let url = format!("{}/-/healthy", self.base_url);
204 let req = self
205 .http_client
206 .get(&url)
207 .timeout(std::time::Duration::from_secs(5));
208 match self.add_auth(req).send().await {
209 Ok(response) => response.status().is_success(),
210 Err(_) => false,
211 }
212 }
213
214 pub async fn get_container_history(
216 &self,
217 namespace: &str,
218 pod_pattern: &str,
219 container: &str,
220 time_range: &str,
221 ) -> Result<ContainerHistory, PrometheusError> {
222 let duration = parse_duration(time_range)?;
223
224 let cpu_query = format!(
226 r#"rate(container_cpu_usage_seconds_total{{namespace="{}", pod=~"{}.*", container="{}"}}[5m]) * 1000"#,
227 namespace, pod_pattern, container
228 );
229
230 let memory_query = format!(
232 r#"container_memory_working_set_bytes{{namespace="{}", pod=~"{}.*", container="{}"}}"#,
233 namespace, pod_pattern, container
234 );
235
236 let cpu_values = self.query_range(&cpu_query, &duration).await?;
237 let memory_values = self.query_range(&memory_query, &duration).await?;
238
239 if cpu_values.is_empty() && memory_values.is_empty() {
240 return Err(PrometheusError::NoData);
241 }
242
243 Ok(ContainerHistory {
244 pod_name: pod_pattern.to_string(),
245 container_name: container.to_string(),
246 namespace: namespace.to_string(),
247 time_range: time_range.to_string(),
248 sample_count: cpu_values.len().max(memory_values.len()),
249 cpu_min: percentile(&cpu_values, 0.0) as u64,
250 cpu_p50: percentile(&cpu_values, 0.50) as u64,
251 cpu_p95: percentile(&cpu_values, 0.95) as u64,
252 cpu_p99: percentile(&cpu_values, 0.99) as u64,
253 cpu_max: percentile(&cpu_values, 1.0) as u64,
254 cpu_avg: average(&cpu_values) as u64,
255 memory_min: percentile(&memory_values, 0.0) as u64,
256 memory_p50: percentile(&memory_values, 0.50) as u64,
257 memory_p95: percentile(&memory_values, 0.95) as u64,
258 memory_p99: percentile(&memory_values, 0.99) as u64,
259 memory_max: percentile(&memory_values, 1.0) as u64,
260 memory_avg: average(&memory_values) as u64,
261 })
262 }
263
264 pub async fn get_workload_history(
266 &self,
267 namespace: &str,
268 workload_name: &str,
269 workload_kind: &str,
270 time_range: &str,
271 ) -> Result<WorkloadHistory, PrometheusError> {
272 let containers = self.discover_containers(namespace, workload_name).await?;
274
275 let mut container_histories = Vec::new();
276
277 for container_name in containers {
278 match self
279 .get_container_history(namespace, workload_name, &container_name, time_range)
280 .await
281 {
282 Ok(history) => container_histories.push(history),
283 Err(PrometheusError::NoData) => continue, Err(e) => return Err(e),
285 }
286 }
287
288 Ok(WorkloadHistory {
289 workload_name: workload_name.to_string(),
290 workload_kind: workload_kind.to_string(),
291 namespace: namespace.to_string(),
292 containers: container_histories,
293 time_range: time_range.to_string(),
294 })
295 }
296
297 pub fn generate_recommendation(
299 history: &ContainerHistory,
300 current_cpu_request: Option<u64>,
301 current_memory_request: Option<u64>,
302 safety_margin_pct: u8,
303 ) -> HistoricalRecommendation {
304 let margin_multiplier = 1.0 + (safety_margin_pct as f64 / 100.0);
305
306 let recommended_cpu = (history.cpu_p99 as f64 * margin_multiplier).ceil() as u64;
308 let recommended_memory = (history.memory_p99 as f64 * margin_multiplier).ceil() as u64;
309
310 let recommended_cpu = round_cpu(recommended_cpu);
312 let recommended_memory = round_memory(recommended_memory);
314
315 let cpu_savings_pct = current_cpu_request
316 .map(|curr| ((curr as f32 - recommended_cpu as f32) / curr as f32) * 100.0)
317 .unwrap_or(0.0);
318
319 let memory_savings_pct = current_memory_request
320 .map(|curr| ((curr as f32 - recommended_memory as f32) / curr as f32) * 100.0)
321 .unwrap_or(0.0);
322
323 let confidence = match history.sample_count {
325 0..=10 => 20,
326 11..=50 => 40,
327 51..=100 => 60,
328 101..=500 => 80,
329 _ => 95,
330 };
331
332 HistoricalRecommendation {
333 workload_name: history.pod_name.clone(),
334 container_name: history.container_name.clone(),
335 current_cpu_request,
336 recommended_cpu_request: recommended_cpu,
337 cpu_savings_pct,
338 current_memory_request,
339 recommended_memory_request: recommended_memory,
340 memory_savings_pct,
341 confidence,
342 safety_margin_pct,
343 }
344 }
345
346 async fn query_range(&self, query: &str, duration: &str) -> Result<Vec<f64>, PrometheusError> {
348 let now = std::time::SystemTime::now()
350 .duration_since(std::time::UNIX_EPOCH)
351 .unwrap()
352 .as_secs();
353 let duration_secs = parse_duration_to_seconds(duration)?;
354 let start = now - duration_secs;
355
356 let step = if duration_secs > 86400 * 3 {
358 "1h"
359 } else {
360 "5m"
361 };
362
363 let url = format!(
364 "{}/api/v1/query_range?query={}&start={}&end={}&step={}",
365 self.base_url,
366 urlencoding::encode(query),
367 start,
368 now,
369 step
370 );
371
372 let req = self.http_client.get(&url);
373 let response = self.add_auth(req).send().await?;
374
375 if !response.status().is_success() {
376 return Err(PrometheusError::QueryFailed(format!(
377 "HTTP {}: {}",
378 response.status(),
379 response.text().await.unwrap_or_default()
380 )));
381 }
382
383 let body: PrometheusResponse = response
384 .json()
385 .await
386 .map_err(|e| PrometheusError::ParseError(format!("Failed to parse response: {}", e)))?;
387
388 if body.status != "success" {
389 return Err(PrometheusError::QueryFailed(
390 body.error.unwrap_or_else(|| "Unknown error".to_string()),
391 ));
392 }
393
394 let mut values = Vec::new();
396 if let Some(result) = body.data.result {
397 for series in result {
398 for (_, value) in series.values.unwrap_or_default() {
399 if let Ok(v) = value.parse::<f64>()
400 && !v.is_nan()
401 && v.is_finite()
402 {
403 values.push(v);
404 }
405 }
406 }
407 }
408
409 Ok(values)
410 }
411
412 async fn discover_containers(
414 &self,
415 namespace: &str,
416 workload_pattern: &str,
417 ) -> Result<Vec<String>, PrometheusError> {
418 let query = format!(
419 r#"count by (container) (container_cpu_usage_seconds_total{{namespace="{}", pod=~"{}.*", container!="POD", container!=""}})"#,
420 namespace, workload_pattern
421 );
422
423 let url = format!(
424 "{}/api/v1/query?query={}",
425 self.base_url,
426 urlencoding::encode(&query)
427 );
428
429 let req = self.http_client.get(&url);
430 let response = self.add_auth(req).send().await?;
431
432 if !response.status().is_success() {
433 return Err(PrometheusError::QueryFailed(format!(
434 "HTTP {}",
435 response.status()
436 )));
437 }
438
439 let body: PrometheusResponse = response
440 .json()
441 .await
442 .map_err(|e| PrometheusError::ParseError(format!("Failed to parse response: {}", e)))?;
443
444 let mut containers = Vec::new();
445 if let Some(result) = body.data.result {
446 for series in result {
447 if let Some(container) = series.metric.get("container") {
448 containers.push(container.clone());
449 }
450 }
451 }
452
453 Ok(containers)
454 }
455}
456
457#[derive(Debug, Deserialize)]
462struct PrometheusResponse {
463 status: String,
464 error: Option<String>,
465 data: PrometheusData,
466}
467
468#[derive(Debug, Deserialize)]
469struct PrometheusData {
470 #[serde(rename = "resultType")]
471 #[allow(dead_code)]
472 result_type: Option<String>,
473 result: Option<Vec<PrometheusResult>>,
474}
475
476#[derive(Debug, Deserialize)]
477struct PrometheusResult {
478 metric: HashMap<String, String>,
479 #[allow(dead_code)]
480 value: Option<(f64, String)>, values: Option<Vec<(f64, String)>>, }
483
484fn parse_duration(duration: &str) -> Result<String, PrometheusError> {
490 let duration = duration.trim().to_lowercase();
491
492 if duration.ends_with("days") {
494 let num: u32 = duration
495 .trim_end_matches("days")
496 .trim()
497 .parse()
498 .map_err(|_| PrometheusError::ParseError("Invalid duration number".to_string()))?;
499 Ok(format!("{}d", num))
500 } else if duration.ends_with("day") {
501 let num: u32 = duration
502 .trim_end_matches("day")
503 .trim()
504 .parse()
505 .map_err(|_| PrometheusError::ParseError("Invalid duration number".to_string()))?;
506 Ok(format!("{}d", num))
507 } else if duration.ends_with("weeks") {
508 let num: u32 = duration
509 .trim_end_matches("weeks")
510 .trim()
511 .parse()
512 .map_err(|_| PrometheusError::ParseError("Invalid duration number".to_string()))?;
513 Ok(format!("{}d", num * 7))
514 } else if duration.ends_with("week") {
515 let num: u32 = duration
516 .trim_end_matches("week")
517 .trim()
518 .parse()
519 .map_err(|_| PrometheusError::ParseError("Invalid duration number".to_string()))?;
520 Ok(format!("{}d", num * 7))
521 } else if duration.ends_with('d')
522 || duration.ends_with('h')
523 || duration.ends_with('m')
524 || duration.ends_with('s')
525 {
526 Ok(duration)
528 } else {
529 let num: u32 = duration
531 .parse()
532 .map_err(|_| PrometheusError::ParseError(format!("Invalid duration: {}", duration)))?;
533 Ok(format!("{}d", num))
534 }
535}
536
537fn parse_duration_to_seconds(duration: &str) -> Result<u64, PrometheusError> {
539 let duration = duration.trim().to_lowercase();
540
541 let (num_str, unit) = if duration.ends_with("days") {
543 (duration.trim_end_matches("days").trim(), "d")
544 } else if duration.ends_with("day") {
545 (duration.trim_end_matches("day").trim(), "d")
546 } else if duration.ends_with("weeks") {
547 (duration.trim_end_matches("weeks").trim(), "w")
548 } else if duration.ends_with("week") {
549 (duration.trim_end_matches("week").trim(), "w")
550 } else if duration.ends_with('d') {
551 (duration.trim_end_matches('d'), "d")
552 } else if duration.ends_with('h') {
553 (duration.trim_end_matches('h'), "h")
554 } else if duration.ends_with('m') {
555 (duration.trim_end_matches('m'), "m")
556 } else if duration.ends_with('s') {
557 (duration.trim_end_matches('s'), "s")
558 } else {
559 (duration.as_str(), "d")
561 };
562
563 let num: u64 = num_str.parse().map_err(|_| {
564 PrometheusError::ParseError(format!("Invalid duration number: {}", duration))
565 })?;
566
567 let seconds = match unit {
568 "w" => num * 7 * 24 * 60 * 60,
569 "d" => num * 24 * 60 * 60,
570 "h" => num * 60 * 60,
571 "m" => num * 60,
572 "s" => num,
573 _ => num * 24 * 60 * 60, };
575
576 Ok(seconds)
577}
578
/// Returns the `p`-quantile of `values` using the nearest-rank method.
///
/// `p` is a fraction: anything at or below `0.0` yields the minimum, anything
/// at or above `1.0` yields the maximum, and values in between select the
/// element at index `round(p * (len - 1))` of the sorted data. An empty slice
/// yields `0.0`. The input slice itself is left untouched.
fn percentile(values: &[f64], p: f64) -> f64 {
    let mut ordered: Vec<f64> = values.to_vec();
    // Incomparable pairs (NaN) are treated as equal so the sort never panics.
    ordered.sort_by(|lhs, rhs| lhs.partial_cmp(rhs).unwrap_or(std::cmp::Ordering::Equal));

    let (smallest, largest) = match (ordered.first(), ordered.last()) {
        (Some(&lo), Some(&hi)) => (lo, hi),
        _ => return 0.0,
    };

    if p <= 0.0 {
        smallest
    } else if p >= 1.0 {
        largest
    } else {
        let rank = (p * (ordered.len() - 1) as f64).round() as usize;
        ordered[rank]
    }
}
598
/// Returns the arithmetic mean of `values`, or `0.0` for an empty slice.
fn average(values: &[f64]) -> f64 {
    match values.len() {
        0 => 0.0,
        count => values.iter().sum::<f64>() / count as f64,
    }
}
606
/// Rounds a CPU amount in millicores to a tidy value.
///
/// * `0` stays `0`.
/// * Up to `100`: rounded up to the next multiple of 25.
/// * Up to `1000`: rounded to the nearest multiple of 50.
/// * Above `1000`: rounded to the nearest multiple of 100.
fn round_cpu(millicores: u64) -> u64 {
    match millicores {
        0 => 0,
        1..=100 => millicores.div_ceil(25) * 25,
        101..=1000 => (millicores + 25) / 50 * 50,
        _ => (millicores + 50) / 100 * 100,
    }
}
623
/// Rounds a memory amount in bytes to a tidy value.
///
/// Amounts up to 128 MiB are rounded to the nearest multiple of 32 MiB,
/// larger amounts to the nearest multiple of 64 MiB. `0` stays `0`, but any
/// non-zero amount yields at least one 32 MiB step, so a small but real
/// footprint is never rounded down to a zero-byte recommendation. Amounts
/// close to `u64::MAX` saturate instead of overflowing.
fn round_memory(bytes: u64) -> u64 {
    const MI: u64 = 1024 * 1024;
    const SMALL_STEP: u64 = 32 * MI;
    const LARGE_STEP: u64 = 64 * MI;

    if bytes == 0 {
        return 0;
    }

    let step = if bytes <= 128 * MI {
        SMALL_STEP
    } else {
        LARGE_STEP
    };

    // Adding half a step before the integer division rounds to the nearest multiple.
    let rounded = bytes.saturating_add(step / 2) / step * step;

    // Only relevant below 16 MiB, where rounding to nearest would give 0.
    rounded.max(step)
}
638
#[cfg(test)]
mod tests {
    use super::*;

    /// Durations are normalized to the Prometheus notation.
    #[test]
    fn test_parse_duration() {
        let cases = [
            ("7d", "7d"),
            ("24h", "24h"),
            ("30m", "30m"),
            ("1week", "7d"),
            ("2weeks", "14d"),
        ];
        for &(input, expected) in &cases {
            assert_eq!(parse_duration(input).unwrap(), expected);
        }
    }

    /// Minimum, median and maximum of an evenly spaced series.
    #[test]
    fn test_percentile() {
        let values = vec![10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0];
        let lowest = percentile(&values, 0.0);
        let median = percentile(&values, 0.5);
        let highest = percentile(&values, 1.0);
        assert!((lowest - 10.0).abs() < 0.1);
        // Nearest-rank selection may return either neighbour of the true median.
        assert!((median - 55.0).abs() < 5.1);
        assert!((highest - 100.0).abs() < 0.1);
    }

    /// CPU values snap to the step size of their bracket.
    #[test]
    fn test_round_cpu() {
        let cases: [(u64, u64); 6] = [
            (12, 25),
            (23, 25),
            (37, 50),
            (120, 100),
            (175, 200),
            (1234, 1200),
        ];
        for &(input, expected) in &cases {
            assert_eq!(round_cpu(input), expected);
        }
    }

    /// Memory values snap to 32 MiB or 64 MiB steps.
    #[test]
    fn test_round_memory() {
        const MI: u64 = 1024 * 1024;
        let cases: [(u64, u64); 4] = [(50, 64), (100, 96), (200, 192), (500, 512)];
        for &(input_mi, expected_mi) in &cases {
            assert_eq!(round_memory(input_mi * MI), expected_mi * MI);
        }
    }

    /// Every supported unit converts to the right number of seconds.
    #[test]
    fn test_parse_duration_to_seconds() {
        const HOUR: u64 = 60 * 60;
        const DAY: u64 = 24 * HOUR;
        let cases = [
            ("7d", 7 * DAY),
            ("1d", DAY),
            ("24h", DAY),
            ("1h", HOUR),
            ("30m", 30 * 60),
            ("1week", 7 * DAY),
            ("2weeks", 14 * DAY),
        ];
        for &(input, expected) in &cases {
            assert_eq!(parse_duration_to_seconds(input).unwrap(), expected);
        }
    }
}