use crate::triples;
use crate::tuples;
use futures::StreamExt;
use k8s_openapi::api::core::v1::Pod;
use kube::api::Api;
use regex::Regex;
use sqlx::sqlite::SqlitePool;
use std::error::Error;
use tokio::io::AsyncWriteExt;
use tracing::{error, info, warn};
fn parse_help_type(line: &str) -> Result<(String, String), Box<dyn Error>> {
let parts: Vec<&str> = line.split_whitespace().collect();
if parts.len() < 3 {
return Err(From::from("Invalid line format"));
}
let first_word = format!("{} {}", parts[0], parts[1]);
if first_word != "# HELP" && first_word != "# TYPE" {
return Err(From::from("First word must be '# HELP' or '# TYPE'"));
}
let name = parts[2].to_string();
let value = if parts.len() > 3 {
parts[3..].join(" ")
} else {
name.replace('_', " ")
};
Ok((name, value))
}
fn parse_metric(
metrics_text: &str,
k8p_description: &str,
k8p_type: &str,
) -> Result<Vec<(String, String)>, Box<dyn Error>> {
let mut result = Vec::new();
if metrics_text.contains('{') {
let re = Regex::new(r"(?P<metric_name>[^{]+)\{(?P<labels>.*)\} (?P<value>.*)")?;
let caps = re
.captures(metrics_text)
.ok_or(format!("Failed to parse the input string: {metrics_text}"))?;
result.push((
"k8p_metric_name".to_string(),
caps["metric_name"].trim().to_string(),
));
result.push(("k8p_value".to_string(), caps["value"].trim().to_string()));
let labels_text = &caps["labels"];
let label_re = Regex::new(r#"(?P<key>[^=,]+)="(?P<value>[^"]*)""#)?;
for caps in label_re.captures_iter(labels_text) {
let key = caps["key"].trim();
let value = caps["value"].trim();
result.push((key.to_string(), value.to_string()));
}
} else {
let split: Vec<&str> = metrics_text.split_whitespace().collect();
if split.len() != 2 {
warn!("Failed to parse the input string: {}", metrics_text);
return Ok(Vec::new());
}
result.push(("k8p_metric_name".to_string(), split[0].to_string()));
result.push(("k8p_value".to_string(), split[1].to_string()));
}
result.push(("k8p_description".to_string(), k8p_description.to_string()));
result.push(("k8p_type".to_string(), k8p_type.to_string()));
Ok(result)
}
fn parse_all(metrics_text: &str) -> Vec<Vec<(String, String)>> {
let mut result = Vec::new();
let mut k8p_description = String::new();
let mut k8p_type = String::new();
for line in metrics_text.lines() {
let line = line.trim();
if line.starts_with("# HELP") || line.starts_with("# TYPE") {
match parse_help_type(line) {
Ok((_name, value)) => {
if line.starts_with("# HELP") {
k8p_description = value;
} else {
k8p_type = value;
}
}
Err(err) => warn!("Failed to parse line '{}': {}", line, err),
}
} else if !line.is_empty() {
match parse_metric(line, &k8p_description, &k8p_type) {
Ok(metric) => result.push(metric),
Err(err) => warn!("Failed to parse line '{}': {}", line, err),
}
}
}
result
}
pub async fn get_text(
pods: &Api<Pod>,
metadata_name: &str,
path: &str,
port: &str,
) -> Result<String, Box<dyn std::error::Error>> {
let local_port: u16 = port.parse()?;
let mut port_forwarder = pods.portforward(metadata_name, &[local_port]).await?;
let Some(mut port_stream) = port_forwarder.take_stream(local_port) else {
return Err(Box::new(std::io::Error::new(
std::io::ErrorKind::Other,
"Unable to take stream",
)));
};
let request = format!(
"GET {path} HTTP/1.1\r\nHost: 127.0.0.1\r\nConnection: close\r\nAccept: */*\r\n\r\n"
);
port_stream.write_all(request.as_bytes()).await?;
let mut response_stream = tokio_util::io::ReaderStream::new(port_stream);
let mut metrics_text = String::new();
while let Some(response) = response_stream.next().await {
match response {
Ok(bytes) => {
metrics_text.push_str(std::str::from_utf8(&bytes[..])?);
}
Err(err) => error!("Error reading response: {:?}", err),
}
}
Ok(metrics_text)
}
pub async fn process(
pool: &SqlitePool,
pods: &Api<Pod>,
metadata_name: &str,
path: &str,
port: &str,
appname: &str,
namespace: &str,
) -> Result<(), Box<dyn std::error::Error>> {
info!("getting health from {}{}:{}", metadata_name, path, port);
let metrics_text = get_text(pods, metadata_name, path, port).await?;
let metrics = parse_all(&metrics_text);
let tuples = tuples::format(metrics, metadata_name, appname, namespace);
let triples = triples::format(tuples);
triples::persist(triples, pool).await
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parse_help_type_valid() {
let result =
parse_help_type("# HELP http_requests_total The total number of HTTP requests.")
.unwrap();
assert_eq!(
(
"http_requests_total".to_string(),
"The total number of HTTP requests.".to_string()
),
result
);
let result = parse_help_type("# TYPE http_requests_total counter").unwrap();
assert_eq!(
("http_requests_total".to_string(), "counter".to_string()),
result
);
}
#[test]
#[should_panic(expected = "Invalid line format")]
fn test_parse_help_type_invalid_format() {
parse_help_type("# HELP").unwrap();
}
#[test]
#[should_panic(expected = "First word must be '# HELP' or '# TYPE'")]
fn test_parse_help_type_invalid_first_word() {
parse_help_type("# INVALID http_requests_total counter").unwrap();
}
#[test]
fn test_parse_metric_with_labels() {
let result = parse_metric(
"http_requests_total{method=\"post\",code=\"200\"} 1027",
"The total number of HTTP requests.",
"counter",
)
.unwrap();
let expected = vec![
(
"k8p_metric_name".to_string(),
"http_requests_total".to_string(),
),
("k8p_value".to_string(), "1027".to_string()),
("method".to_string(), "post".to_string()),
("code".to_string(), "200".to_string()),
(
"k8p_description".to_string(),
"The total number of HTTP requests.".to_string(),
),
("k8p_type".to_string(), "counter".to_string()),
];
assert_eq!(expected, result);
}
#[test]
fn test_parse_metric_without_labels() {
let result = parse_metric(
"http_requests_total 1027",
"The total number of HTTP requests.",
"counter",
)
.unwrap();
let expected = vec![
(
"k8p_metric_name".to_string(),
"http_requests_total".to_string(),
),
("k8p_value".to_string(), "1027".to_string()),
(
"k8p_description".to_string(),
"The total number of HTTP requests.".to_string(),
),
("k8p_type".to_string(), "counter".to_string()),
];
assert_eq!(expected, result);
}
}