use std::time::Duration;
use crate::event::SpanEvent;
use crate::http_client::{self, HttpClient};
use crate::ingest::auth_header::AuthHeader;
use crate::ingest::lookback::LookbackError;
use crate::ingest::otlp::convert_otlp_request;
use crate::ingest::url_enc::{percent_encode_query_value, validate_http_endpoint};
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
use prost::Message;
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum TempoError {
#[error("invalid Tempo endpoint: {0}")]
InvalidEndpoint(String),
#[error("invalid lookback duration: {0}")]
InvalidLookback(#[from] LookbackError),
#[error("invalid auth header: {0}")]
InvalidAuthHeader(String),
#[error("HTTP transport error: {0}")]
Transport(String),
#[error("Tempo returned HTTP {status} for {url}")]
HttpStatus { status: u16, url: String },
#[error("request timed out")]
Timeout,
#[error("failed to read response body: {0}")]
BodyRead(String),
#[error("failed to decode protobuf response: {0}")]
ProtobufDecode(String),
#[error("failed to parse JSON response: {0}")]
JsonParse(String),
#[error("trace not found: {0}")]
TraceNotFound(String),
#[error("no traces found for the given search criteria")]
NoTracesFound,
#[error("Tempo fetch was interrupted by Ctrl-C before any trace completed")]
Interrupted,
}
pub fn parse_lookback(s: &str) -> Result<Duration, TempoError> {
crate::ingest::lookback::parse(s).map_err(Into::into)
}
#[derive(serde::Deserialize)]
struct SearchResponse {
#[serde(default)]
traces: Vec<TraceMeta>,
}
#[derive(serde::Deserialize)]
struct TraceMeta {
#[serde(rename = "traceID")]
trace_id: String,
}
const MAX_SEARCH_BODY_BYTES: usize = 1024 * 1024;
const MAX_TRACE_BODY_BYTES: usize = 64 * 1024 * 1024;
const SEARCH_TIMEOUT: Duration = Duration::from_secs(5);
const FETCH_TRACE_TIMEOUT: Duration = Duration::from_secs(30);
const FETCH_CONCURRENCY: usize = 16;
async fn fetch_raw(
client: &HttpClient,
uri: hyper::Uri,
accept: &'static str,
max_bytes: usize,
map_404: bool,
timeout: Duration,
auth: Option<&AuthHeader>,
) -> Result<bytes::Bytes, TempoError> {
let mut builder = hyper::Request::builder()
.method(hyper::Method::GET)
.uri(&uri)
.header("Accept", accept)
.header("User-Agent", "perf-sentinel");
if let Some(auth) = auth {
builder = builder.header(&auth.name, &auth.value);
}
let req = builder
.body(http_body_util::Empty::<bytes::Bytes>::new())
.map_err(|e| TempoError::Transport(e.to_string()))?;
let resp = tokio::time::timeout(timeout, client.request(req))
.await
.map_err(|_| TempoError::Timeout)?
.map_err(|e| TempoError::Transport(e.to_string()))?;
let status = resp.status().as_u16();
if map_404 && status == 404 {
return Err(TempoError::TraceNotFound(http_client::redact_endpoint(
&uri,
)));
}
if status != 200 {
return Err(TempoError::HttpStatus {
status,
url: http_client::redact_endpoint(&uri),
});
}
let limited = http_body_util::Limited::new(resp.into_body(), max_bytes);
let body = http_body_util::BodyExt::collect(limited)
.await
.map_err(|e| TempoError::BodyRead(e.to_string()))?
.to_bytes();
Ok(body)
}
async fn fetch_bytes(
client: &HttpClient,
uri: hyper::Uri,
max_bytes: usize,
auth: Option<&AuthHeader>,
) -> Result<bytes::Bytes, TempoError> {
fetch_raw(
client,
uri,
"application/protobuf",
max_bytes,
true,
FETCH_TRACE_TIMEOUT,
auth,
)
.await
}
async fn fetch_json(
client: &HttpClient,
uri: hyper::Uri,
max_bytes: usize,
auth: Option<&AuthHeader>,
) -> Result<String, TempoError> {
let body = fetch_raw(
client,
uri,
"application/json",
max_bytes,
false,
SEARCH_TIMEOUT,
auth,
)
.await?;
String::from_utf8(body.to_vec()).map_err(|e| TempoError::BodyRead(e.to_string()))
}
pub async fn search_traces(
client: &HttpClient,
endpoint: &str,
service: &str,
lookback: Duration,
limit: usize,
auth: Option<&AuthHeader>,
) -> Result<Vec<String>, TempoError> {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default();
let end = now.as_secs();
let start = end.saturating_sub(lookback.as_secs());
let encoded_service = percent_encode_query_value(service);
let uri_str = format!(
"{endpoint}/api/search?tags=service.name%3D{encoded_service}&start={start}&end={end}&limit={limit}"
);
let uri: hyper::Uri = uri_str
.parse()
.map_err(|_| TempoError::InvalidEndpoint(endpoint.to_string()))?;
let json = fetch_json(client, uri, MAX_SEARCH_BODY_BYTES, auth).await?;
let response: SearchResponse =
serde_json::from_str(&json).map_err(|e| TempoError::JsonParse(e.to_string()))?;
let ids: Vec<String> = response.traces.into_iter().map(|t| t.trace_id).collect();
if ids.is_empty() {
return Err(TempoError::NoTracesFound);
}
Ok(ids)
}
pub async fn fetch_trace(
client: &HttpClient,
endpoint: &str,
trace_id: &str,
auth: Option<&AuthHeader>,
) -> Result<Vec<SpanEvent>, TempoError> {
if !trace_id.bytes().all(|b| b.is_ascii_hexdigit()) {
return Err(TempoError::InvalidEndpoint(format!(
"trace ID '{trace_id}' contains non-hex characters"
)));
}
let uri_str = format!("{endpoint}/api/traces/{trace_id}");
let uri: hyper::Uri = uri_str
.parse()
.map_err(|_| TempoError::InvalidEndpoint(endpoint.to_string()))?;
let body = fetch_bytes(client, uri, MAX_TRACE_BODY_BYTES, auth).await?;
let request = ExportTraceServiceRequest::decode(body)
.map_err(|e| TempoError::ProtobufDecode(e.to_string()))?;
Ok(convert_otlp_request(&request))
}
fn classify_fetch_error(error: &TempoError) -> &'static str {
match error {
TempoError::Timeout => "timeout",
TempoError::Transport(_) => "transport",
TempoError::HttpStatus { .. } => "http_status",
TempoError::ProtobufDecode(_) => "protobuf_decode",
TempoError::BodyRead(_) => "body_read",
TempoError::JsonParse(_) => "json_parse",
_ => "other",
}
}
fn emit_fetch_summary(
fail_counts: &std::collections::HashMap<&'static str, usize>,
total_attempted: usize,
interrupted: bool,
collected_events: usize,
) {
if !fail_counts.is_empty() {
let total_failures: usize = fail_counts.values().sum();
let only_not_found = fail_counts.keys().all(|k| *k == "not_found");
if only_not_found {
tracing::warn!(
total_failures,
total_attempted,
counts = ?fail_counts,
"Tempo fetch completed with skipped traces"
);
} else {
tracing::error!(
total_failures,
total_attempted,
counts = ?fail_counts,
"Tempo fetch completed with failures"
);
}
}
if interrupted {
tracing::warn!(
collected = collected_events,
total_attempted,
"Tempo fetch interrupted by Ctrl-C, proceeding with partial results"
);
}
}
pub async fn ingest_from_tempo(
endpoint: &str,
service: Option<&str>,
trace_id: Option<&str>,
lookback: Duration,
max_traces: usize,
auth_header: Option<&str>,
) -> Result<Vec<SpanEvent>, TempoError> {
validate_http_endpoint(endpoint)
.map_err(|msg| TempoError::InvalidEndpoint(format!("{msg}, got '{endpoint}'")))?;
let parsed_auth = auth_header
.map(AuthHeader::parse)
.transpose()
.map_err(|msg| TempoError::InvalidAuthHeader(msg.to_string()))?;
if let Some(auth) = parsed_auth.as_ref() {
tracing::info!(header_name = %auth.name, "Using auth header for Tempo requests");
if endpoint.starts_with("http://") {
tracing::warn!(
"Sending auth header over cleartext HTTP, prefer https:// to avoid credential leak"
);
}
}
let client = http_client::build_client();
if let Some(tid) = trace_id {
tracing::info!(trace_id = tid, "Fetching single trace from Tempo");
return fetch_trace(&client, endpoint, tid, parsed_auth.as_ref()).await;
}
let svc = service.ok_or_else(|| {
TempoError::InvalidEndpoint("either --trace-id or --service is required".to_string())
})?;
tracing::info!(
service = svc,
lookback_secs = lookback.as_secs(),
max_traces,
"Searching Tempo for traces"
);
let trace_ids = search_traces(
&client,
endpoint,
svc,
lookback,
max_traces,
parsed_auth.as_ref(),
)
.await?;
let total = trace_ids.len();
tracing::info!(count = total, "Found traces, fetching...");
let semaphore = std::sync::Arc::new(tokio::sync::Semaphore::new(FETCH_CONCURRENCY));
let mut set: tokio::task::JoinSet<(String, Result<Vec<SpanEvent>, TempoError>)> =
tokio::task::JoinSet::new();
for tid in trace_ids {
let client_clone = client.clone();
let endpoint_owned = endpoint.to_string();
let auth_clone = parsed_auth.clone();
let sem = std::sync::Arc::clone(&semaphore);
set.spawn(async move {
let Ok(_permit) = sem.acquire_owned().await else {
return (
tid,
Err(TempoError::Transport("semaphore closed".to_string())),
);
};
let result =
fetch_trace(&client_clone, &endpoint_owned, &tid, auth_clone.as_ref()).await;
(tid, result)
});
}
let drained = drain_fetch_set(set, total).await;
emit_fetch_summary(
&drained.fail_counts,
total,
drained.interrupted,
drained.events.len(),
);
if drained.events.is_empty() {
if drained.interrupted {
return Err(TempoError::Interrupted);
}
return Err(TempoError::NoTracesFound);
}
Ok(drained.events)
}
struct FetchLoopOutcome {
events: Vec<SpanEvent>,
fail_counts: std::collections::HashMap<&'static str, usize>,
interrupted: bool,
}
async fn drain_fetch_set(
mut set: tokio::task::JoinSet<(String, Result<Vec<SpanEvent>, TempoError>)>,
total: usize,
) -> FetchLoopOutcome {
let shutdown_signal = tokio::signal::ctrl_c();
tokio::pin!(shutdown_signal);
let mut events = Vec::new();
let mut done: usize = 0;
let mut fail_counts: std::collections::HashMap<&'static str, usize> =
std::collections::HashMap::new();
let mut interrupted = false;
loop {
tokio::select! {
biased;
_ = &mut shutdown_signal, if !interrupted => {
tracing::warn!(
completed = done,
pending = set.len(),
"Received Ctrl-C, aborting in-flight Tempo fetches"
);
set.abort_all();
interrupted = true;
}
maybe_result = set.join_next() => {
match maybe_result {
None => break,
Some(Ok((tid, Ok(batch)))) => {
done += 1;
tracing::debug!(
trace_id = %tid,
events = batch.len(),
progress = format!("{done}/{total}"),
"Fetched trace"
);
events.extend(batch);
}
Some(Ok((tid, Err(TempoError::TraceNotFound(_))))) => {
done += 1;
*fail_counts.entry("not_found").or_insert(0) += 1;
tracing::debug!(trace_id = %tid, "Trace not found, skipping");
}
Some(Ok((tid, Err(e)))) => {
done += 1;
*fail_counts.entry(classify_fetch_error(&e)).or_insert(0) += 1;
tracing::debug!(
trace_id = %tid,
error = %e,
"Failed to fetch trace, skipping"
);
}
Some(Err(e)) if e.is_cancelled() => {
}
Some(Err(e)) => {
done += 1;
*fail_counts.entry("task_panic").or_insert(0) += 1;
tracing::error!(error = %e, "Trace fetch task panicked");
}
}
}
}
}
FetchLoopOutcome {
events,
fail_counts,
interrupted,
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn parse_lookback_wraps_shared_helper() {
assert_eq!(parse_lookback("1h").unwrap(), Duration::from_hours(1));
let err = parse_lookback("").expect_err("empty must fail");
assert!(matches!(err, TempoError::InvalidLookback(_)));
}
#[test]
fn parse_search_response() {
let json = r#"{"traces":[{"traceID":"abc123"},{"traceID":"def456"}]}"#;
let response: SearchResponse = serde_json::from_str(json).unwrap();
assert_eq!(response.traces.len(), 2);
assert_eq!(response.traces[0].trace_id, "abc123");
assert_eq!(response.traces[1].trace_id, "def456");
}
#[test]
fn parse_search_response_empty() {
let json = r#"{"traces":[]}"#;
let response: SearchResponse = serde_json::from_str(json).unwrap();
assert!(response.traces.is_empty());
}
#[test]
fn parse_search_response_missing_traces() {
let json = r"{}";
let response: SearchResponse = serde_json::from_str(json).unwrap();
assert!(response.traces.is_empty());
}
#[test]
fn protobuf_decode_empty_request() {
let request = ExportTraceServiceRequest {
resource_spans: vec![],
};
let mut buf = Vec::new();
request.encode(&mut buf).unwrap();
let decoded = ExportTraceServiceRequest::decode(bytes::Bytes::from(buf)).unwrap();
let events = convert_otlp_request(&decoded);
assert!(events.is_empty());
}
use crate::test_helpers::{http_200_bytes, http_200_text, http_status, spawn_one_shot_server};
fn http_200_json(body: &str) -> Vec<u8> {
http_200_text("application/json", body)
}
fn http_200_proto(body: &[u8]) -> Vec<u8> {
http_200_bytes("application/protobuf", body)
}
#[tokio::test]
async fn ingest_from_tempo_rejects_non_http_scheme() {
let err = ingest_from_tempo(
"ftp://tempo.local",
Some("foo-svc"),
None,
Duration::from_mins(1),
10,
None,
)
.await
.expect_err("non-http must be rejected");
match err {
TempoError::InvalidEndpoint(msg) => assert!(msg.contains("http://")),
other => panic!("expected InvalidEndpoint, got {other:?}"),
}
}
#[tokio::test]
async fn ingest_from_tempo_rejects_credentials_in_endpoint() {
let err = ingest_from_tempo(
"http://user:pass@tempo.local",
None,
Some("abc"),
Duration::from_mins(1),
10,
None,
)
.await
.expect_err("credentials must be rejected");
match err {
TempoError::InvalidEndpoint(msg) => assert!(msg.contains("credentials")),
other => panic!("expected InvalidEndpoint, got {other:?}"),
}
}
#[tokio::test]
async fn ingest_from_tempo_rejects_missing_service_and_trace_id() {
let err = ingest_from_tempo(
"http://tempo.local",
None,
None,
Duration::from_mins(1),
10,
None,
)
.await
.expect_err("missing both must be rejected");
match err {
TempoError::InvalidEndpoint(msg) => {
assert!(msg.contains("trace-id") || msg.contains("service"));
}
other => panic!("expected InvalidEndpoint, got {other:?}"),
}
}
#[tokio::test]
async fn ingest_from_tempo_accepts_percent_encoded_at_in_query_string() {
let result = ingest_from_tempo(
"http://127.0.0.1:1/api/traces?owner=foo%40example.com",
None,
Some("abc123"),
Duration::from_mins(1),
10,
None,
)
.await;
match result {
Err(TempoError::InvalidEndpoint(msg)) if msg.contains("credentials") => {
panic!("validator must not reject `@` in the query string");
}
_ => {} }
}
#[tokio::test]
async fn fetch_trace_rejects_non_hex_trace_id() {
let client = http_client::build_client();
let err = fetch_trace(&client, "http://tempo.local", "not-hex-id!", None)
.await
.expect_err("non-hex must be rejected");
match err {
TempoError::InvalidEndpoint(msg) => assert!(msg.contains("non-hex")),
other => panic!("expected InvalidEndpoint, got {other:?}"),
}
}
#[tokio::test]
async fn fetch_trace_decodes_empty_otlp_request() {
let request = ExportTraceServiceRequest {
resource_spans: vec![],
};
let mut buf = Vec::new();
request.encode(&mut buf).unwrap();
let (endpoint, server) = spawn_one_shot_server(http_200_proto(&buf)).await;
let client = http_client::build_client();
let events = fetch_trace(&client, &endpoint, "abc123def456", None)
.await
.expect("valid OTLP must decode");
assert!(events.is_empty());
server.await.unwrap();
}
#[tokio::test]
async fn fetch_trace_surfaces_404_as_trace_not_found() {
let (endpoint, server) = spawn_one_shot_server(http_status(404, "Not Found")).await;
let client = http_client::build_client();
let err = fetch_trace(&client, &endpoint, "abc123", None)
.await
.expect_err("404 must surface as TraceNotFound");
assert!(matches!(err, TempoError::TraceNotFound(_)));
server.await.unwrap();
}
#[tokio::test]
async fn fetch_trace_surfaces_500_as_http_status() {
let (endpoint, server) = spawn_one_shot_server(http_status(500, "Internal")).await;
let client = http_client::build_client();
let err = fetch_trace(&client, &endpoint, "abc123", None)
.await
.expect_err("500 must surface as HttpStatus");
match err {
TempoError::HttpStatus { status: 500, .. } => {}
other => panic!("expected HttpStatus {{ status: 500, .. }}, got {other:?}"),
}
server.await.unwrap();
}
#[tokio::test]
async fn fetch_trace_rejects_malformed_protobuf() {
let garbage = http_200_proto(b"\xff\xff\xff\xff\xff\xff\xff\xff");
let (endpoint, server) = spawn_one_shot_server(garbage).await;
let client = http_client::build_client();
let err = fetch_trace(&client, &endpoint, "abc123", None)
.await
.expect_err("malformed protobuf must surface as ProtobufDecode");
assert!(matches!(err, TempoError::ProtobufDecode(_)));
server.await.unwrap();
}
#[tokio::test]
async fn search_traces_happy_path_returns_ids() {
let body = r#"{"traces":[{"traceID":"aaa111"},{"traceID":"bbb222"}]}"#;
let (endpoint, server) = spawn_one_shot_server(http_200_json(body)).await;
let client = http_client::build_client();
let ids = search_traces(
&client,
&endpoint,
"foo-svc",
Duration::from_mins(5),
10,
None,
)
.await
.expect("search must succeed");
assert_eq!(ids, vec!["aaa111".to_string(), "bbb222".to_string()]);
server.await.unwrap();
}
#[tokio::test]
async fn search_traces_empty_result_surfaces_no_traces_found() {
let body = r#"{"traces":[]}"#;
let (endpoint, server) = spawn_one_shot_server(http_200_json(body)).await;
let client = http_client::build_client();
let err = search_traces(
&client,
&endpoint,
"foo-svc",
Duration::from_mins(1),
10,
None,
)
.await
.expect_err("empty search result must be NoTracesFound");
assert!(matches!(err, TempoError::NoTracesFound));
server.await.unwrap();
}
#[tokio::test]
async fn search_traces_malformed_json_surfaces_json_parse() {
let (endpoint, server) = spawn_one_shot_server(http_200_json("not json")).await;
let client = http_client::build_client();
let err = search_traces(
&client,
&endpoint,
"foo-svc",
Duration::from_mins(1),
10,
None,
)
.await
.expect_err("malformed JSON must be JsonParse");
assert!(matches!(err, TempoError::JsonParse(_)));
server.await.unwrap();
}
#[tokio::test]
async fn search_traces_http_500_surfaces_http_status() {
let (endpoint, server) = spawn_one_shot_server(http_status(500, "Internal")).await;
let client = http_client::build_client();
let err = search_traces(
&client,
&endpoint,
"foo-svc",
Duration::from_mins(1),
10,
None,
)
.await
.expect_err("500 must surface as HttpStatus");
match err {
TempoError::HttpStatus { status: 500, .. } => {}
other => panic!("expected HttpStatus {{ status: 500, .. }}, got {other:?}"),
}
server.await.unwrap();
}
#[tokio::test]
async fn ingest_from_tempo_sends_auth_header_on_both_connections() {
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;
let search_body = r#"{"traces":[{"traceID":"abcdef"}]}"#;
let search_resp = http_200_json(search_body);
let mut proto_buf = Vec::new();
ExportTraceServiceRequest {
resource_spans: vec![],
}
.encode(&mut proto_buf)
.expect("encode protobuf");
let trace_resp = http_200_proto(&proto_buf);
let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind");
let addr = listener.local_addr().expect("addr");
let endpoint = format!("http://{addr}");
let (tx, mut rx) = tokio::sync::mpsc::channel::<Vec<u8>>(2);
let server = tokio::spawn(async move {
let (mut socket, _) = listener.accept().await.expect("accept 1");
let mut rbuf = vec![0u8; 4096];
let n = socket.read(&mut rbuf).await.expect("read 1");
rbuf.truncate(n);
tx.send(rbuf).await.expect("send 1");
socket.write_all(&search_resp).await.expect("write 1");
let _ = socket.shutdown().await;
drop(socket);
let (mut socket, _) = listener.accept().await.expect("accept 2");
let mut rbuf = vec![0u8; 4096];
let n = socket.read(&mut rbuf).await.expect("read 2");
rbuf.truncate(n);
tx.send(rbuf).await.expect("send 2");
socket.write_all(&trace_resp).await.expect("write 2");
let _ = socket.shutdown().await;
});
let _ = ingest_from_tempo(
&endpoint,
Some("foo-svc"),
None,
Duration::from_mins(5),
5,
Some("Authorization: Bearer topsecret"),
)
.await;
for label in ["search", "fetch"] {
let captured = rx.recv().await.expect("captured request");
let text = std::str::from_utf8(&captured).expect("utf8");
assert!(
text.to_lowercase()
.contains("authorization: bearer topsecret"),
"auth header missing from {label} request, got:\n{text}"
);
}
server.await.expect("server join");
}
#[tokio::test]
async fn ingest_from_tempo_search_then_fetch_aggregates_events() {
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;
let search_body = r#"{"traces":[{"traceID":"abcdef"}]}"#;
let search_resp = http_200_json(search_body);
let mut proto_buf = Vec::new();
ExportTraceServiceRequest {
resource_spans: vec![],
}
.encode(&mut proto_buf)
.unwrap();
let trace_resp = http_200_proto(&proto_buf);
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let endpoint = format!("http://{addr}");
let server = tokio::spawn(async move {
let (mut socket, _) = listener.accept().await.unwrap();
let mut rbuf = [0u8; 4096];
let _ = socket.read(&mut rbuf).await;
let _ = socket.write_all(&search_resp).await;
let _ = socket.shutdown().await;
drop(socket);
let (mut socket, _) = listener.accept().await.unwrap();
let _ = socket.read(&mut rbuf).await;
let _ = socket.write_all(&trace_resp).await;
let _ = socket.shutdown().await;
});
let err = ingest_from_tempo(
&endpoint,
Some("foo-svc"),
None,
Duration::from_mins(5),
5,
None,
)
.await
.expect_err("empty trace must surface as NoTracesFound after loop");
assert!(matches!(err, TempoError::NoTracesFound));
server.await.unwrap();
}
#[test]
fn tempo_error_display_messages_are_informative() {
let e1 = TempoError::InvalidEndpoint("bad".to_string());
let e2 = TempoError::Transport("oops".to_string());
let e3 = TempoError::BodyRead("body".to_string());
let e4 = TempoError::HttpStatus {
status: 418,
url: "http://tempo.example/api/search".to_string(),
};
let e5 = TempoError::Timeout;
let e6 = TempoError::JsonParse("json".to_string());
let e7 = TempoError::ProtobufDecode("proto".to_string());
let e8 = TempoError::TraceNotFound("http://x".to_string());
let e9 = TempoError::NoTracesFound;
let e10 = TempoError::Interrupted;
assert!(format!("{e1}").contains("endpoint"));
assert!(format!("{e2}").contains("transport") || format!("{e2}").contains("Transport"));
assert!(format!("{e3}").contains("body"));
assert!(format!("{e4}").contains("418"));
assert!(format!("{e5}").contains("timed out"));
assert!(format!("{e6}").contains("JSON"));
assert!(format!("{e7}").contains("protobuf") || format!("{e7}").contains("Protobuf"));
assert!(format!("{e8}").contains("not found") || format!("{e8}").contains("Not found"));
assert!(format!("{e9}").contains("no traces") || format!("{e9}").contains("No traces"));
assert!(
format!("{e10}").contains("interrupted") || format!("{e10}").contains("Interrupted")
);
}
#[test]
fn classify_fetch_error_buckets_every_hard_failure_variant() {
assert_eq!(classify_fetch_error(&TempoError::Timeout), "timeout");
assert_eq!(
classify_fetch_error(&TempoError::Transport("x".into())),
"transport"
);
assert_eq!(
classify_fetch_error(&TempoError::HttpStatus {
status: 500,
url: "u".into()
}),
"http_status"
);
assert_eq!(
classify_fetch_error(&TempoError::ProtobufDecode("p".into())),
"protobuf_decode"
);
assert_eq!(
classify_fetch_error(&TempoError::BodyRead("b".into())),
"body_read"
);
assert_eq!(
classify_fetch_error(&TempoError::JsonParse("j".into())),
"json_parse"
);
assert_eq!(
classify_fetch_error(&TempoError::InvalidEndpoint("x".into())),
"other"
);
assert_eq!(classify_fetch_error(&TempoError::NoTracesFound), "other");
}
#[tokio::test]
async fn ingest_from_tempo_drains_mixed_per_trace_outcomes() {
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;
let search_body =
r#"{"traces":[{"traceID":"aaa111"},{"traceID":"bbb222"},{"traceID":"ccc333"}]}"#;
let search_resp = http_200_json(search_body);
let mut empty_proto = Vec::new();
ExportTraceServiceRequest {
resource_spans: vec![],
}
.encode(&mut empty_proto)
.unwrap();
let ok_empty_resp = http_200_proto(&empty_proto);
let http_500 = http_status(500, "Internal");
let http_404 = http_status(404, "Not Found");
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let endpoint = format!("http://{addr}");
let server = tokio::spawn(async move {
let (mut sock, _) = listener.accept().await.unwrap();
let mut rbuf = [0u8; 4096];
let _ = sock.read(&mut rbuf).await;
let _ = sock.write_all(&search_resp).await;
let _ = sock.shutdown().await;
drop(sock);
for _ in 0..3 {
let (mut sock, _) = listener.accept().await.unwrap();
let mut rbuf = [0u8; 4096];
let n = sock.read(&mut rbuf).await.unwrap_or(0);
let req = std::str::from_utf8(&rbuf[..n]).unwrap_or("");
let resp: &[u8] = if req.contains("/api/traces/bbb222") {
&http_500
} else if req.contains("/api/traces/ccc333") {
&http_404
} else {
&ok_empty_resp
};
let _ = sock.write_all(resp).await;
let _ = sock.shutdown().await;
}
});
let err = ingest_from_tempo(
&endpoint,
Some("foo-svc"),
None,
Duration::from_mins(5),
10,
None,
)
.await
.expect_err("mixed-outcome run with only empty successes must surface NoTracesFound");
assert!(
matches!(err, TempoError::NoTracesFound),
"expected NoTracesFound, got {err:?}"
);
server.await.unwrap();
}
}