use std::collections::HashMap;
use crate::{ConfigError, SondaError};
/// Outcome of parsing one CSV column header: an optional metric name plus
/// the labels found in the header's `{...}` block.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParsedColumnHeader {
/// Metric name: the text before `{`, the whole header when it has no `{`,
/// or the `__name__` label when the header starts with `{`. `None` when the
/// header is blank or a leading-brace header carries no `__name__` label.
pub metric_name: Option<String>,
/// Label key/value pairs from the `{...}` block (empty when there is none).
/// For headers that start with `{`, the `__name__` entry is removed from this
/// map because it is reported through `metric_name` instead.
pub labels: HashMap<String, String>,
}
/// Parses a single CSV column header into a metric name and a label map.
///
/// Shapes recognised after trimming surrounding whitespace:
/// - blank text: no name and no labels;
/// - `name`: the whole text becomes the metric name;
/// - `{k="v", ...}`: labels only; a `__name__` label, if present, is lifted
///   out of the map and used as the metric name;
/// - `name{k="v", ...}`: the (trimmed) text before `{` is the metric name and
///   everything from `{` onwards is the label block.
///
/// # Errors
/// Returns whatever error `parse_label_block` reports for a malformed
/// label block.
pub(crate) fn parse_column_header(header: &str) -> Result<ParsedColumnHeader, SondaError> {
    let trimmed = header.trim();

    let (metric_name, labels) = if trimmed.is_empty() {
        (None, HashMap::new())
    } else if let Some(brace_at) = trimmed.find('{') {
        let mut labels = parse_label_block(&trimmed[brace_at..])?;
        let metric_name = if brace_at == 0 {
            // Labels-only form: the name, if any, travels as `__name__`.
            labels.remove("__name__")
        } else {
            Some(trimmed[..brace_at].trim().to_string())
        };
        (metric_name, labels)
    } else {
        // No label block at all: the header is just a name.
        (Some(trimmed.to_string()), HashMap::new())
    };

    Ok(ParsedColumnHeader {
        metric_name,
        labels,
    })
}
/// Parses a brace-delimited label block such as `{a="1", b="2"}`.
///
/// The block is trimmed, must begin with `{`, and is cut at its *last* `}`;
/// anything after that closing brace is ignored. An empty (or all-whitespace)
/// interior produces an empty map; otherwise the interior is handed to
/// `parse_label_pairs`.
///
/// # Errors
/// Returns a configuration error when the block does not start with `{` or
/// has no closing `}`, and propagates errors from `parse_label_pairs`.
fn parse_label_block(block: &str) -> Result<HashMap<String, String>, SondaError> {
    let body = match block.trim().strip_prefix('{') {
        Some(after_open) => after_open,
        None => {
            return Err(SondaError::Config(ConfigError::invalid(
                "csv_header: label block must start with '{'",
            )));
        }
    };

    let end = match body.rfind('}') {
        Some(at) => at,
        None => {
            return Err(SondaError::Config(ConfigError::invalid(
                "csv_header: unmatched '{' — missing closing '}'",
            )));
        }
    };

    let pairs = body[..end].trim();
    if pairs.is_empty() {
        Ok(HashMap::new())
    } else {
        parse_label_pairs(pairs)
    }
}
/// Parses the interior of a label block (`k1="v1", k2=v2, ...`) into a map.
///
/// Each pair is `key = value`, where the value is either double-quoted
/// (handled by `parse_quoted_value`) or unquoted (handled by
/// `parse_unquoted_value`). Whitespace around keys, `=` and `,` is ignored,
/// a trailing comma is tolerated, and a repeated key keeps its last value.
///
/// The `=` of a pair must appear before the next `,`. Searching for `=`
/// across a comma would glue a stray token onto the following key (for
/// example `instance, job="x"` would produce the key `"instance, job"`), so
/// such input is rejected with the same error as a pair that has no `=`.
///
/// # Errors
/// Returns a configuration error when a pair has no `=`, when a key is
/// empty, or when a value parser reports an error.
fn parse_label_pairs(inner: &str) -> Result<HashMap<String, String>, SondaError> {
    let mut labels = HashMap::new();
    let mut remaining = inner.trim();

    while !remaining.is_empty() {
        // Stop at whichever comes first, `=` or `,`; only `=` is acceptable.
        let eq_pos = remaining
            .find(|c: char| matches!(c, '=' | ','))
            .filter(|&pos| remaining[pos..].starts_with('='))
            .ok_or_else(|| {
                SondaError::Config(ConfigError::invalid(format!(
                    "csv_header: expected '=' in label pair, got: {:?}",
                    remaining
                )))
            })?;

        let key = remaining[..eq_pos].trim();
        if key.is_empty() {
            return Err(SondaError::Config(ConfigError::invalid(
                "csv_header: empty label key",
            )));
        }

        remaining = remaining[eq_pos + 1..].trim_start();
        let (value, rest) = match remaining.strip_prefix('"') {
            Some(stripped) => parse_quoted_value(stripped)?,
            None => parse_unquoted_value(remaining)?,
        };
        labels.insert(key.to_string(), value);

        // Consume at most one separator before the next pair.
        remaining = rest.trim_start();
        if let Some(after_comma) = remaining.strip_prefix(',') {
            remaining = after_comma.trim_start();
        }
    }

    Ok(labels)
}
/// Reads a double-quoted label value whose opening `"` has already been
/// consumed by the caller.
///
/// Returns the decoded value together with the input that follows the
/// closing quote. Backslash escapes are decoded as follows:
/// - `\n` becomes a line feed, as in the Prometheus text format;
/// - any other escaped character (including `\"` and `\\`) stands for itself.
///
/// # Errors
/// Returns a configuration error when the input ends before the closing
/// quote, or directly after a backslash.
fn parse_quoted_value(input: &str) -> Result<(String, &str), SondaError> {
    let mut value = String::new();
    let mut chars = input.char_indices();

    while let Some((idx, ch)) = chars.next() {
        match ch {
            // `"` is one byte wide, so the remainder starts at `idx + 1`.
            '"' => return Ok((value, &input[idx + 1..])),
            '\\' => match chars.next() {
                Some((_, 'n')) => value.push('\n'),
                Some((_, escaped)) => value.push(escaped),
                None => {
                    return Err(SondaError::Config(ConfigError::invalid(
                        "csv_header: unterminated escape in quoted value",
                    )));
                }
            },
            _ => value.push(ch),
        }
    }

    Err(SondaError::Config(ConfigError::invalid(
        "csv_header: unterminated quoted value",
    )))
}
/// Reads an unquoted label value, which runs up to the next `,` or to the
/// end of the input.
///
/// Returns the trimmed value and the remainder. The remainder starts *at* the
/// comma (so the caller can consume the separator) and is empty when there is
/// no comma. This function always returns `Ok`; it shares the `Result`
/// signature of `parse_quoted_value` so callers can treat both alike.
fn parse_unquoted_value(input: &str) -> Result<(String, &str), SondaError> {
    let cut = input.find(',').unwrap_or(input.len());
    let raw = &input[..cut];
    let rest = &input[cut..];
    Ok((raw.trim().to_string(), rest))
}
/// Heuristically decides whether a CSV line is a header row rather than data.
///
/// The line is split on every `,` (no quote awareness) and each field is
/// trimmed before being tested:
/// - with a single field, the line is a header when that field does not parse
///   as an `f64` (so an empty line counts as a header);
/// - with several fields, the first column is ignored and the line is a
///   header when any later field does not parse as an `f64`.
pub fn is_header_line(line: &str) -> bool {
    let is_text = |cell: &str| cell.trim().parse::<f64>().is_err();

    match line.split_once(',') {
        // No comma at all: the whole line is the only field.
        None => is_text(line),
        // Skip the first column and inspect everything after it.
        Some((_first, tail)) => tail.split(',').any(is_text),
    }
}
/// Splits a CSV header line into its fields.
///
/// Two kinds of double quotes are recognised:
/// - **CSV quoting** (RFC 4180 style): a `"` that opens a field (only
///   whitespace precedes it in that field) wraps the field. The wrapping
///   quotes are dropped, `""` inside them stands for one literal `"`, and
///   commas inside them do not split the field.
/// - **Inline quotes**: a `"` that appears after other field content, as in
///   the unquoted header `up{path="a,b"}`, belongs to the field's own syntax.
///   It is kept verbatim, commas up to the matching quote do not split the
///   field, and a backslash keeps the following character (so `\"` does not
///   end the quoted run).
///
/// Keeping inline quotes matters because the label parser needs them to
/// tell where a value containing `,` or an escaped `"` ends.
///
/// The result always contains at least one field; an empty line yields a
/// single empty string. An unterminated quote extends to the end of the line.
pub fn split_csv_header_fields(line: &str) -> Vec<String> {
    // Where the scanner currently is relative to double quotes.
    #[derive(Clone, Copy)]
    enum Quote {
        Outside,
        Csv,
        Inline,
    }

    let mut fields = Vec::new();
    let mut current = String::new();
    let mut state = Quote::Outside;
    // Once a field has used CSV quoting, later quotes in it stay CSV quotes.
    let mut csv_quoted_field = false;
    let mut chars = line.chars().peekable();

    while let Some(ch) = chars.next() {
        match state {
            Quote::Csv => match ch {
                '"' if chars.peek() == Some(&'"') => {
                    // Doubled quote: one literal quote, still inside quotes.
                    current.push('"');
                    chars.next();
                }
                '"' => state = Quote::Outside,
                _ => current.push(ch),
            },
            Quote::Inline => {
                current.push(ch);
                match ch {
                    '\\' => {
                        // Keep the escaped character so it cannot close the run.
                        if let Some(escaped) = chars.next() {
                            current.push(escaped);
                        }
                    }
                    '"' => state = Quote::Outside,
                    _ => {}
                }
            }
            Quote::Outside => match ch {
                ',' => {
                    fields.push(current);
                    current = String::new();
                    csv_quoted_field = false;
                }
                '"' if csv_quoted_field || current.trim().is_empty() => {
                    csv_quoted_field = true;
                    state = Quote::Csv;
                }
                '"' => {
                    current.push('"');
                    state = Quote::Inline;
                }
                _ => current.push(ch),
            },
        }
    }

    fields.push(current);
    fields
}
/// Parses a whole CSV header line into one `ParsedColumnHeader` per column.
///
/// The line is split with `split_csv_header_fields` and every resulting field
/// is parsed with `parse_column_header`, preserving column order.
///
/// # Errors
/// Stops at, and returns, the first error produced by `parse_column_header`.
pub fn parse_header_row(line: &str) -> Result<Vec<ParsedColumnHeader>, SondaError> {
    let mut headers = Vec::new();
    for field in split_csv_header_fields(line) {
        headers.push(parse_column_header(&field)?);
    }
    Ok(headers)
}
// Unit tests for the CSV header parser: single-column parsing, error cases,
// field splitting, whole-row parsing, and header-line detection.
#[cfg(test)]
mod tests {
use super::*;
// --- parse_column_header: leading-brace headers carrying `__name__` ---
#[test]
fn format1_name_from_dunder_name_label() {
let h =
parse_column_header(r#"{__name__="up", instance="localhost:9090", job="prometheus"}"#)
.expect("format 1 must parse");
assert_eq!(h.metric_name.as_deref(), Some("up"));
assert_eq!(
h.labels.get("instance").map(|s| s.as_str()),
Some("localhost:9090")
);
assert_eq!(h.labels.get("job").map(|s| s.as_str()), Some("prometheus"));
assert!(
!h.labels.contains_key("__name__"),
"__name__ must be removed from labels"
);
}
#[test]
fn format1_name_only_in_dunder() {
let h = parse_column_header(r#"{__name__="process_cpu_seconds_total"}"#)
.expect("single __name__ must parse");
assert_eq!(h.metric_name.as_deref(), Some("process_cpu_seconds_total"));
assert!(h.labels.is_empty());
}
// --- parse_column_header: `name{labels}` headers ---
#[test]
fn format2_name_before_brace() {
let h = parse_column_header(r#"up{instance="localhost:9090", job="prometheus"}"#)
.expect("format 2 must parse");
assert_eq!(h.metric_name.as_deref(), Some("up"));
assert_eq!(
h.labels.get("instance").map(|s| s.as_str()),
Some("localhost:9090")
);
assert_eq!(h.labels.get("job").map(|s| s.as_str()), Some("prometheus"));
}
#[test]
fn format2_name_with_empty_labels() {
let h = parse_column_header("up{}").expect("empty braces must parse");
assert_eq!(h.metric_name.as_deref(), Some("up"));
assert!(h.labels.is_empty());
}
#[test]
fn format2_name_with_single_label() {
let h = parse_column_header(r#"http_requests_total{method="GET"}"#)
.expect("single label must parse");
assert_eq!(h.metric_name.as_deref(), Some("http_requests_total"));
assert_eq!(h.labels.len(), 1);
assert_eq!(h.labels.get("method").map(|s| s.as_str()), Some("GET"));
}
// --- parse_column_header: labels without a name, and plain names ---
#[test]
fn format3_labels_only_no_name() {
let h = parse_column_header(r#"{instance="foo", job="bar"}"#).expect("format 3 must parse");
assert!(h.metric_name.is_none(), "format 3 must have no metric name");
assert_eq!(h.labels.get("instance").map(|s| s.as_str()), Some("foo"));
assert_eq!(h.labels.get("job").map(|s| s.as_str()), Some("bar"));
}
#[test]
fn format4_plain_name() {
let h = parse_column_header("cpu_percent").expect("plain name must parse");
assert_eq!(h.metric_name.as_deref(), Some("cpu_percent"));
assert!(h.labels.is_empty());
}
#[test]
fn format5_simple_word() {
let h = parse_column_header("prometheus").expect("simple word must parse");
assert_eq!(h.metric_name.as_deref(), Some("prometheus"));
assert!(h.labels.is_empty());
}
// --- parse_column_header: whitespace and empty input ---
#[test]
fn plain_name_with_whitespace() {
let h = parse_column_header(" cpu_percent ").expect("trimmed plain name must parse");
assert_eq!(h.metric_name.as_deref(), Some("cpu_percent"));
assert!(h.labels.is_empty());
}
#[test]
fn empty_header_returns_no_name_no_labels() {
let h = parse_column_header("").expect("empty header must parse");
assert!(h.metric_name.is_none());
assert!(h.labels.is_empty());
}
#[test]
fn whitespace_only_header() {
let h = parse_column_header(" ").expect("whitespace header must parse");
assert!(h.metric_name.is_none());
assert!(h.labels.is_empty());
}
#[test]
fn empty_braces() {
let h = parse_column_header("{}").expect("empty braces must parse");
assert!(h.metric_name.is_none());
assert!(h.labels.is_empty());
}
#[test]
fn spaces_around_label_pairs() {
let h = parse_column_header(r#"{ instance = "foo" , job = "bar" }"#)
.expect("spaces around pairs must parse");
assert!(h.metric_name.is_none());
assert_eq!(h.labels.get("instance").map(|s| s.as_str()), Some("foo"));
assert_eq!(h.labels.get("job").map(|s| s.as_str()), Some("bar"));
}
// --- parse_column_header: quoted values with escapes and commas ---
#[test]
fn label_value_with_escaped_quote() {
let h =
parse_column_header(r#"{path="say \"hello\""}"#).expect("escaped quotes must parse");
assert_eq!(
h.labels.get("path").map(|s| s.as_str()),
Some(r#"say "hello""#)
);
}
#[test]
fn label_value_with_comma_inside_quotes() {
let h = parse_column_header(r#"{path="a,b"}"#).expect("comma in quoted value must parse");
assert_eq!(h.labels.get("path").map(|s| s.as_str()), Some("a,b"));
}
#[test]
fn multiple_labels_three() {
let h = parse_column_header(
r#"{__name__="metric", instance="host:9090", job="prom", env="prod"}"#,
)
.expect("multiple labels must parse");
assert_eq!(h.metric_name.as_deref(), Some("metric"));
assert_eq!(h.labels.len(), 3);
assert_eq!(h.labels.get("env").map(|s| s.as_str()), Some("prod"));
}
// --- parse_column_header: malformed label blocks must be rejected ---
#[test]
fn unmatched_open_brace_returns_error() {
let result = parse_column_header("{instance=\"foo\"");
assert!(result.is_err(), "unmatched brace must error");
let msg = result.unwrap_err().to_string();
assert!(msg.contains("missing closing '}'"), "got: {msg}");
}
#[test]
fn missing_equals_returns_error() {
let result = parse_column_header("{instance}");
assert!(result.is_err(), "missing = must error");
let msg = result.unwrap_err().to_string();
assert!(msg.contains("'='"), "got: {msg}");
}
#[test]
fn empty_key_returns_error() {
let result = parse_column_header(r#"{="value"}"#);
assert!(result.is_err(), "empty key must error");
let msg = result.unwrap_err().to_string();
assert!(msg.contains("empty label key"), "got: {msg}");
}
#[test]
fn unterminated_quoted_value_returns_error() {
let result = parse_column_header(r#"{key="unterminated}"#);
assert!(result.is_err(), "unterminated quote must error");
}
// --- split_csv_header_fields: plain, quoted, and doubled-quote fields ---
#[test]
fn split_simple_unquoted_fields() {
let fields = split_csv_header_fields("timestamp,cpu,mem");
assert_eq!(fields, vec!["timestamp", "cpu", "mem"]);
}
#[test]
fn split_quoted_fields_strip_outer_quotes() {
let fields = split_csv_header_fields(r#""Time","Value""#);
assert_eq!(fields, vec!["Time", "Value"]);
}
#[test]
fn split_rfc4180_escaped_quotes() {
let line = r#""Time","{__name__=""up"", job=""prom""}""#;
let fields = split_csv_header_fields(line);
assert_eq!(fields.len(), 2);
assert_eq!(fields[0], "Time");
assert_eq!(fields[1], r#"{__name__="up", job="prom"}"#);
}
#[test]
fn split_empty_line() {
// An empty line still yields exactly one (empty) field.
let fields = split_csv_header_fields("");
assert_eq!(fields, vec![""]);
}
#[test]
fn split_single_field() {
let fields = split_csv_header_fields("timestamp");
assert_eq!(fields, vec!["timestamp"]);
}
#[test]
fn split_mixed_quoted_and_unquoted() {
let fields = split_csv_header_fields(r#"Time,"cpu_percent",mem"#);
assert_eq!(fields, vec!["Time", "cpu_percent", "mem"]);
}
#[test]
fn split_grafana_style_header() {
let line = r#""Time","{__name__=""up"", instance=""localhost:9090"", job=""prometheus""}","{__name__=""up"", instance=""localhost:9100"", job=""node""}""#;
let fields = split_csv_header_fields(line);
assert_eq!(fields.len(), 3);
assert_eq!(fields[0], "Time");
assert_eq!(
fields[1],
r#"{__name__="up", instance="localhost:9090", job="prometheus"}"#
);
assert_eq!(
fields[2],
r#"{__name__="up", instance="localhost:9100", job="node"}"#
);
}
// --- parse_header_row: splitting and per-column parsing together ---
#[test]
fn parse_header_row_plain_columns() {
let headers = parse_header_row("timestamp,cpu_percent,mem_percent")
.expect("plain headers must parse");
assert_eq!(headers.len(), 3);
assert_eq!(headers[0].metric_name.as_deref(), Some("timestamp"));
assert_eq!(headers[1].metric_name.as_deref(), Some("cpu_percent"));
assert_eq!(headers[2].metric_name.as_deref(), Some("mem_percent"));
}
#[test]
fn parse_header_row_grafana_export() {
let line = r#""Time","{__name__=""up"", instance=""localhost:9090"", job=""prometheus""}","{__name__=""up"", instance=""localhost:9100"", job=""node""}""#;
let headers = parse_header_row(line).expect("grafana headers must parse");
assert_eq!(headers.len(), 3);
assert_eq!(headers[0].metric_name.as_deref(), Some("Time"));
assert!(headers[0].labels.is_empty());
assert_eq!(headers[1].metric_name.as_deref(), Some("up"));
assert_eq!(
headers[1].labels.get("instance").map(|s| s.as_str()),
Some("localhost:9090")
);
assert_eq!(
headers[1].labels.get("job").map(|s| s.as_str()),
Some("prometheus")
);
assert_eq!(headers[2].metric_name.as_deref(), Some("up"));
assert_eq!(
headers[2].labels.get("instance").map(|s| s.as_str()),
Some("localhost:9100")
);
assert_eq!(
headers[2].labels.get("job").map(|s| s.as_str()),
Some("node")
);
}
#[test]
fn parse_header_row_format2_mixed() {
// Columns here are not CSV-quoted; the label quotes sit inside the field.
let line = r#"Time,up{instance="host1"},up{instance="host2"}"#;
let headers = parse_header_row(line).expect("format2 headers must parse");
assert_eq!(headers.len(), 3);
assert_eq!(headers[1].metric_name.as_deref(), Some("up"));
assert_eq!(
headers[1].labels.get("instance").map(|s| s.as_str()),
Some("host1")
);
assert_eq!(headers[2].metric_name.as_deref(), Some("up"));
assert_eq!(
headers[2].labels.get("instance").map(|s| s.as_str()),
Some("host2")
);
}
#[test]
fn unquoted_label_value() {
let h = parse_column_header("{key=value}").expect("unquoted value must parse");
assert_eq!(h.labels.get("key").map(|s| s.as_str()), Some("value"));
}
// --- type-level and determinism checks ---
// Compiles only when `T` is both `Send` and `Sync`.
fn assert_send_sync<T: Send + Sync>() {}
#[test]
fn parsed_column_header_is_send_and_sync() {
assert_send_sync::<ParsedColumnHeader>();
}
#[test]
fn determinism_same_header_twice() {
let header = r#"{__name__="up", instance="localhost:9090", job="prometheus"}"#;
let a = parse_column_header(header).expect("first parse");
let b = parse_column_header(header).expect("second parse");
assert_eq!(a, b);
}
// --- is_header_line: numeric vs. textual rows ---
#[test]
fn is_header_line_detects_text_header() {
assert!(is_header_line("timestamp,cpu,mem"));
}
#[test]
fn is_header_line_rejects_all_numeric() {
assert!(!is_header_line("1000,42.5,99.1"));
}
#[test]
fn is_header_line_single_column_text() {
assert!(is_header_line("metric_name"));
}
#[test]
fn is_header_line_single_column_numeric() {
assert!(!is_header_line("42.5"));
}
#[test]
fn is_header_line_first_col_numeric_second_text() {
// The first column is ignored when there is more than one field.
assert!(is_header_line("1000,cpu_percent"));
}
#[test]
fn is_header_line_empty_string_is_non_numeric() {
assert!(is_header_line(""));
}
}