use anyhow::{Context, Result};
use reqwest::{
header::{HeaderMap, HeaderName, HeaderValue},
Client as ReqwestClient,
};
use std::str::FromStr;
use crate::processing::{
compact_telemetry_payloads,
send_telemetry_payload,
SpanCompactionConfig,
TelemetryData,
};
/// Parses `Key=Value` strings into a [`HeaderMap`] to attach to OTLP requests.
///
/// Each entry is split at its first `=`, so a value may itself contain `=`
/// (or any other character accepted by [`HeaderValue::from_str`]). Whitespace
/// surrounding the key and the value is trimmed before validation, so
/// `"Key = Value"` is treated the same as `"Key=Value"`.
///
/// Entries that contain no `=` at all are skipped with a warning instead of
/// failing the whole call. When the same header name appears more than once,
/// the last entry wins because the map entry is replaced on insert.
///
/// # Errors
///
/// Returns an error as soon as an entry has a key that is not a valid HTTP
/// header name or a value that is not a valid HTTP header value. The offending
/// value is deliberately left out of the error message because header values
/// commonly carry credentials (API keys, bearer tokens).
pub fn parse_otlp_headers_from_vec(headers_vec: &[String]) -> Result<HeaderMap> {
    let mut otlp_header_map = HeaderMap::new();

    for header_str in headers_vec {
        // `split_once` splits at the first '=' only and avoids allocating an
        // intermediate Vec for every entry.
        if let Some((raw_name, raw_value)) = header_str.split_once('=') {
            let name = raw_name.trim();
            let value = raw_value.trim();

            let header_name = HeaderName::from_str(name)
                .with_context(|| format!("Invalid OTLP header name: {}", name))?;
            // Only the header name is reported: the value may be a secret.
            let header_value = HeaderValue::from_str(value)
                .with_context(|| format!("Invalid OTLP header value for {}", name))?;

            otlp_header_map.insert(header_name, header_value);
        } else {
            tracing::warn!(
                "Ignoring malformed OTLP header (expected Key=Value): {}",
                header_str
            );
        }
    }

    Ok(otlp_header_map)
}
pub async fn send_batch(
http_client: &ReqwestClient,
endpoint: &str,
batch: Vec<TelemetryData>,
compaction_config: &SpanCompactionConfig,
headers: HeaderMap,
) -> Result<()> {
if batch.is_empty() {
return Ok(());
}
tracing::debug!("Compacting batch of {} item(s)...", batch.len());
match compact_telemetry_payloads(batch, compaction_config) {
Ok(compacted_data) => {
tracing::debug!(
"Sending compacted batch ({} bytes) to {}",
compacted_data.payload.len(),
endpoint
);
if let Err(e) =
send_telemetry_payload(http_client, endpoint, compacted_data.payload, headers).await
{
tracing::error!("Failed to send compacted batch: {}", e);
}
}
Err(e) => {
tracing::error!("Failed to compact telemetry batch: {}", e);
}
}
Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use reqwest::header::HeaderValue;

    /// Turns string literals into the owned `String`s the parser takes.
    fn owned(entries: &[&str]) -> Vec<String> {
        entries.iter().map(|entry| entry.to_string()).collect()
    }

    /// Well-formed entries are all parsed; lookups are case-insensitive.
    #[test]
    fn test_parse_otlp_headers_valid() {
        let input = owned(&["Authorization=Bearer 123", "X-Custom-Header=Value456"]);

        let parsed = parse_otlp_headers_from_vec(&input).unwrap();

        assert_eq!(parsed.len(), 2);
        assert_eq!(
            parsed.get("Authorization"),
            Some(&HeaderValue::from_static("Bearer 123"))
        );
        assert_eq!(
            parsed.get("x-custom-header"),
            Some(&HeaderValue::from_static("Value456"))
        );
    }

    /// A mixed list fails as a whole. The entry with an empty key (`=NoKey`)
    /// is the one expected to trigger the error; the entry without any `=`
    /// only hits the "ignore with a warning" branch of the parser.
    #[test]
    fn test_parse_otlp_headers_malformed() {
        let input = owned(&[
            "Authorization=Bearer 123",
            "MalformedHeader",
            "EmptyValue=",
            "=NoKey",
            "Key=Val1,Key=Val2",
        ]);

        let outcome = parse_otlp_headers_from_vec(&input);

        assert!(outcome.is_err());
    }

    /// No input entries yields an empty header map.
    #[test]
    fn test_parse_otlp_headers_empty_input() {
        let input: Vec<String> = Vec::new();

        let parsed = parse_otlp_headers_from_vec(&input).unwrap();

        assert!(parsed.is_empty());
    }

    /// An empty key is rejected and the error names the header-name problem.
    #[test]
    fn test_parse_otlp_headers_invalid_name() {
        let input = owned(&["=NoKey"]);

        let outcome = parse_otlp_headers_from_vec(&input);

        assert!(outcome.is_err());
        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("Invalid OTLP header name"));
    }
}