use std::fs;
use std::path::PathBuf;
use serde_json::json;
use super::otlp::{ExportTraceServiceRequest, KeyValue, Status, key_values_to_map};
use super::store::{MAX_SPANS_PER_TRACE, TraceStore, nanos_to_iso8601};
/// Trace id carried by every span in the `sample_request` fixture; tests
/// expect it back as the run id and use it to look up the span tree.
const TRACE_ID: &str = "13d08563f4c89941166adde225dfd18e";
/// Returns a scratch directory path under the system temp dir that is unique
/// to this test process and `tag`, after deleting anything left there by an
/// earlier run.
///
/// The directory itself is not created here; only the path is returned.
fn temp_dir(tag: &str) -> PathBuf {
    let pid = std::process::id();
    let mut path = std::env::temp_dir();
    path.push(format!("polaris-otel-test-{pid}-{tag}"));
    // Best effort: the directory normally does not exist yet, so a removal
    // error is expected and deliberately ignored.
    fs::remove_dir_all(&path).ok();
    path
}
/// Builds a one-span OTLP export request for `trace_id`.
///
/// The span has `spanId` "root", name "op", lives in scope "svc" with no
/// resource attributes, starts at `startTimeUnixNano` 1000000000 and carries
/// status `code`. When `end` is `Some`, it becomes `endTimeUnixNano`;
/// otherwise the span has no end time.
///
/// # Panics
///
/// Panics if the generated JSON does not deserialise into
/// `ExportTraceServiceRequest`.
fn single_span_request(trace_id: &str, end: Option<&str>, code: i64) -> ExportTraceServiceRequest {
    let mut root_span = json!({
        "traceId": trace_id,
        "spanId": "root",
        "name": "op",
        "startTimeUnixNano": "1000000000",
        "status": {"code": code}
    });
    if let Some(end_nanos) = end {
        root_span
            .as_object_mut()
            .expect("span literal is a JSON object")
            .insert("endTimeUnixNano".to_owned(), json!(end_nanos));
    }
    let scope_spans = json!([{"scope": {"name": "svc"}, "spans": [root_span]}]);
    let request = json!({
        "resourceSpans": [{
            "resource": {"attributes": []},
            "scopeSpans": scope_spans
        }]
    });
    serde_json::from_value(request).expect("fixture parses as OTLP")
}
/// Builds the shared six-span OTLP fixture used by most tests in this module.
///
/// One resource (attributes `service.name`, `service.namespace`,
/// `gen_ai.system`) and one scope (`example.otlp-adapter`) carry six spans,
/// all in trace `TRACE_ID`:
/// - a root `agent.call` span, whose attributes include
///   `agent.cost.total_usd = 0.42`;
/// - three `agent.turn.*` children (system, bot, user);
/// - two `agent.tool_call` children with the same start time. The second one
///   (`submit_form`) has `agent.tool.error = true` and status code 2.
///
/// # Panics
///
/// Panics if the literal no longer deserialises into
/// `ExportTraceServiceRequest`.
fn sample_request() -> ExportTraceServiceRequest {
    let body = serde_json::json!({
        "resourceSpans": [{
            "resource": {
                "attributes": [
                    {"key": "service.name", "value": {"stringValue": "example-service"}},
                    {"key": "service.namespace", "value": {"stringValue": "example"}},
                    {"key": "gen_ai.system", "value": {"stringValue": "example-agent"}}
                ]
            },
            "scopeSpans": [{
                "scope": {"name": "example.otlp-adapter"},
                "spans": [
                    // Root span: no parentSpanId; carries call-level attributes.
                    {
                        "traceId": TRACE_ID, "spanId": "e58ea7c047ae866f",
                        "name": "agent.call", "kind": 1,
                        "startTimeUnixNano": "1779796800000000000",
                        "endTimeUnixNano": "1779797100000000000",
                        "attributes": [
                            {"key": "agent.call.id", "value": {"stringValue": "call-smoke-123"}},
                            {"key": "agent.ended_reason", "value": {"stringValue": "customer-ended-call"}},
                            {"key": "agent.cost.total_usd", "value": {"doubleValue": 0.42}},
                            {"key": "agent.message.count", "value": {"intValue": "6"}}
                        ],
                        "status": {"code": 0}
                    },
                    // Zero-duration system turn; it has the earliest start time among the children.
                    {
                        "traceId": TRACE_ID, "spanId": "0c94567ef3122bb4",
                        "parentSpanId": "e58ea7c047ae866f", "name": "agent.turn.system",
                        "startTimeUnixNano": "1779796800000000000",
                        "endTimeUnixNano": "1779796800000000000",
                        "attributes": [
                            {"key": "agent.message.role", "value": {"stringValue": "system"}},
                            {"key": "agent.message.index", "value": {"intValue": "0"}}
                        ],
                        "status": {"code": 0}
                    },
                    {
                        "traceId": TRACE_ID, "spanId": "6978312d69e7b7e7",
                        "parentSpanId": "e58ea7c047ae866f", "name": "agent.turn.bot",
                        "startTimeUnixNano": "1779796801000000000",
                        "endTimeUnixNano": "1779796803000000000",
                        "attributes": [
                            {"key": "agent.message.role", "value": {"stringValue": "bot"}},
                            {"key": "agent.message.index", "value": {"intValue": "1"}}
                        ],
                        "status": {"code": 0}
                    },
                    {
                        "traceId": TRACE_ID, "spanId": "7e1484fdbd0e7555",
                        "parentSpanId": "e58ea7c047ae866f", "name": "agent.turn.user",
                        "startTimeUnixNano": "1779796804000000000",
                        "endTimeUnixNano": "1779796806000000000",
                        "attributes": [
                            {"key": "agent.message.role", "value": {"stringValue": "user"}},
                            {"key": "agent.message.index", "value": {"intValue": "2"}}
                        ],
                        "status": {"code": 0}
                    },
                    // Successful tool call; same start time and message index as the next span.
                    {
                        "traceId": TRACE_ID, "spanId": "2a04d3d2109ebb0f",
                        "parentSpanId": "e58ea7c047ae866f", "name": "agent.tool_call",
                        "startTimeUnixNano": "1779796807000000000",
                        "endTimeUnixNano": "1779796807500000000",
                        "attributes": [
                            {"key": "agent.message.index", "value": {"intValue": "3"}},
                            {"key": "agent.tool.name", "value": {"stringValue": "verify_account"}},
                            {"key": "agent.tool.error", "value": {"boolValue": false}}
                        ],
                        "status": {"code": 0}
                    },
                    // The only failing span (status code 2); tests rely on it to mark the run as errored.
                    {
                        "traceId": TRACE_ID, "spanId": "2ea4d228e11111be",
                        "parentSpanId": "e58ea7c047ae866f", "name": "agent.tool_call",
                        "startTimeUnixNano": "1779796807000000000",
                        "endTimeUnixNano": "1779796808200000000",
                        "attributes": [
                            {"key": "agent.message.index", "value": {"intValue": "3"}},
                            {"key": "agent.tool.name", "value": {"stringValue": "submit_form"}},
                            {"key": "agent.tool.error", "value": {"boolValue": true}}
                        ],
                        "status": {"code": 2, "message": "submit failed"}
                    }
                ]
            }]
        }]
    });
    serde_json::from_value(body).expect("fixture parses as OTLP")
}
/// All six fixture spans share one trace id, so they collapse into a single run.
#[test]
fn ingest_groups_spans_into_one_run() {
    let store = TraceStore::new();
    let accepted = store.ingest(sample_request());
    assert_eq!(accepted, 6, "all six spans accepted");

    let runs = store.list_runs();
    assert_eq!(runs.len(), 1);
    let only = &runs[0];
    // The run is keyed by the shared trace id.
    assert_eq!(only.run_id, TRACE_ID);
    // The fixture yields no agent name.
    assert!(only.agent_name.is_none());
}
/// Each resource-level attribute of the fixture surfaces as a run label.
#[test]
fn resource_attributes_become_run_labels() {
    let store = TraceStore::new();
    store.ingest(sample_request());
    let runs = store.list_runs();
    let labels = &runs[0].labels;
    for (key, expected) in [
        ("service.name", "example-service"),
        ("service.namespace", "example"),
        ("gen_ai.system", "example-agent"),
    ] {
        assert_eq!(labels[key], expected);
    }
}
/// The root span's `agent.cost.total_usd` becomes the run cost, while token
/// counts stay zero because the fixture reports no token usage.
#[test]
fn run_cost_generalises_to_non_polaris_total() {
    let store = TraceStore::new();
    store.ingest(sample_request());
    let runs = store.list_runs();
    let summary = &runs[0];

    let drift = (summary.cost_usd - 0.42).abs();
    assert!(drift < 1e-9);
    assert_eq!(summary.input_tokens, 0);
    assert_eq!(summary.output_tokens, 0);
}
/// One fixture span has status code 2, which makes the whole run's outcome "error".
#[test]
fn error_status_marks_the_run_failed() {
    let store = TraceStore::new();
    store.ingest(sample_request());
    let runs = store.list_runs();
    let outcome = runs[0].outcome.as_deref();
    assert_eq!(outcome, Some("error"));
}
/// The fixture assembles into one `agent.call` root with five children. The
/// system turn comes first, and the failing `submit_form` tool call carries
/// error metadata.
#[test]
fn tree_has_one_root_with_five_sorted_children() {
    let store = TraceStore::new();
    store.ingest(sample_request());
    let tree = store.run_tree(TRACE_ID).expect("tree exists");
    assert_eq!(tree.roots.len(), 1, "single root span");
    assert!(tree.orphans.is_empty());

    let call = &tree.roots[0];
    assert_eq!(call.name, "agent.call");
    assert_eq!(call.children.len(), 5);
    assert_eq!(call.children[0].name, "agent.turn.system");

    let mut submit_forms = call.children.iter().filter(|child| {
        let tool = child.fields.get("agent.tool.name").and_then(|value| value.as_str());
        tool == Some("submit_form")
    });
    let errored = submit_forms.next().expect("submit_form span present");
    assert_eq!(errored.level, "error");
    assert_eq!(errored.fields["otel.status_code"], json!(2));
    assert_eq!(errored.target, "example.otlp-adapter");
}
/// Nanosecond epoch offsets render as millisecond-precision UTC ISO-8601 strings.
#[test]
fn timestamps_render_as_iso8601() {
    let cases = [
        (1_700_000_000_000_000_000, "2023-11-14T22:13:20.000Z"),
        (0, "1970-01-01T00:00:00.000Z"),
    ];
    for (nanos, expected) in cases {
        assert_eq!(nanos_to_iso8601(nanos), expected);
    }
}
/// With room for two traces, ingesting a third evicts the oldest. The two
/// newest traces remain, listed newest first.
#[test]
fn capacity_evicts_oldest_traces() {
    let store = TraceStore::with_capacity(2);
    for i in 0..3u8 {
        store.ingest(single_span_request(&format!("trace-{i}"), Some("2000000000"), 0));
    }
    let runs = store.list_runs();
    assert_eq!(runs.len(), 2, "capacity bounds retained traces");
    assert_eq!(runs[0].run_id, "trace-2");
    // Eviction must remove only the oldest trace, not over-evict.
    assert_eq!(runs[1].run_id, "trace-1");
    assert!(store.run_tree("trace-0").is_none());
    assert!(
        store.run_tree("trace-1").is_some(),
        "the second-newest trace survives eviction"
    );
}
/// A trace idle for longer than the TTL is evicted. A trace that keeps
/// receiving spans stays live past its original deadline.
#[test]
fn ttl_evicts_idle_traces_but_keeps_active_ones() {
    use std::time::{Duration, Instant};
    let ttl = Duration::from_secs(60);
    let store = TraceStore::with_capacity(10).with_ttl(ttl);
    let t0 = Instant::now();
    store.ingest_at(single_span_request("old", Some("2000000000"), 0), t0);

    // Just inside the TTL the trace is still visible.
    let almost = t0 + ttl - Duration::from_secs(1);
    assert_eq!(store.list_runs_at(almost).len(), 1);
    assert!(store.run_tree_at("old", almost).is_some());

    // 90s after its only activity, "old" has aged out.
    let later = t0 + Duration::from_secs(90);
    store.ingest_at(single_span_request("new", Some("2000000000"), 0), later);
    let runs = store.list_runs_at(later);
    assert_eq!(runs.len(), 1, "the idle trace aged out");
    assert_eq!(runs[0].run_id, "new");
    assert!(store.run_tree_at("old", later).is_none());

    // Deliver a new span for "new" 30s after `later`. The check at
    // `touched + ttl - 1s` falls *after* `later + ttl`, so it can only pass
    // if this activity pushed the trace's deadline out.
    let touched = later + Duration::from_secs(30);
    let refresh: ExportTraceServiceRequest = serde_json::from_value(json!({
        "resourceSpans": [{
            "resource": {"attributes": []},
            "scopeSpans": [{"scope": {"name": "svc"}, "spans": [{
                "traceId": "new", "spanId": "child", "parentSpanId": "root",
                "name": "op", "startTimeUnixNano": "1500000000",
                "endTimeUnixNano": "1800000000", "status": {"code": 0}
            }]}]
        }]
    }))
    .expect("fixture parses as OTLP");
    store.ingest_at(refresh, touched);
    let kept_alive = touched + ttl - Duration::from_secs(1);
    assert!(kept_alive > later + ttl, "check lands past the unrefreshed deadline");
    assert!(
        store.run_tree_at("new", kept_alive).is_some(),
        "recent activity keeps the trace alive"
    );
}
/// A child delivered before its parent is held as an orphan. The parent is
/// adopted once a later batch delivers the parent.
#[test]
fn out_of_order_and_multi_batch_ingest_assembles_one_tree() {
    // Wraps one span in a single-resource, single-scope export batch.
    let one_span_batch = |span: serde_json::Value| {
        json!({
            "resourceSpans": [{
                "resource": {"attributes": [{"key": "service.name", "value": {"stringValue": "svc"}}]},
                "scopeSpans": [{
                    "scope": {"name": "svc"},
                    "spans": [span]
                }]
            }]
        })
    };
    let store = TraceStore::new();

    // Batch 1: only the child, whose parent has not been seen yet.
    let child_first = one_span_batch(json!({
        "traceId": "t", "spanId": "child", "parentSpanId": "root",
        "name": "child", "startTimeUnixNano": "2000000000",
        "endTimeUnixNano": "3000000000", "status": {"code": 0}
    }));
    store.ingest(serde_json::from_value(child_first).unwrap());
    assert_eq!(store.run_tree("t").unwrap().orphans.len(), 1);

    // Batch 2: the parent arrives.
    let root_second = one_span_batch(json!({
        "traceId": "t", "spanId": "root",
        "name": "root", "startTimeUnixNano": "1000000000",
        "endTimeUnixNano": "4000000000", "status": {"code": 0}
    }));
    store.ingest(serde_json::from_value(root_second).unwrap());

    let assembled = store.run_tree("t").unwrap();
    assert!(assembled.orphans.is_empty());
    assert_eq!(assembled.roots.len(), 1);
    assert_eq!(assembled.roots[0].children.len(), 1);
}
/// Traces written by one persisting store are reloaded by a fresh store
/// pointed at the same directory.
#[test]
fn persistence_survives_a_restart() {
    let dir = temp_dir("roundtrip");

    // First lifetime: ingest through a persisting store, then drop it.
    let accepted = {
        let writer = TraceStore::new().with_persistence(&dir);
        writer.ingest(sample_request())
    };
    assert_eq!(accepted, 6);

    // Second lifetime: a new store over the same directory.
    let reader = TraceStore::new().with_persistence(&dir);
    let runs = reader.list_runs();
    assert_eq!(runs.len(), 1, "the persisted trace is loaded back");
    assert_eq!(runs[0].run_id, TRACE_ID);

    let restored = reader.run_tree(TRACE_ID).expect("tree restored from disk");
    assert_eq!(restored.roots.len(), 1);
    // One root plus five children accounts for all six spans.
    assert_eq!(restored.roots[0].children.len(), 5, "all six spans round-tripped");

    fs::remove_dir_all(&dir).ok();
}
/// A trace id containing path-traversal segments must still be persisted as a
/// single, sanitised `.jsonl` file inside the persistence directory.
#[test]
fn persistence_neutralises_hostile_trace_ids() {
    let dir = temp_dir("traversal");
    let store = TraceStore::new().with_persistence(&dir);
    store.ingest(single_span_request("../../etc/passwd", Some("2000000000"), 0));

    let entries: Vec<_> = fs::read_dir(&dir).unwrap().flatten().collect();
    assert_eq!(
        entries.len(),
        1,
        "exactly one trace file, written inside the dir"
    );
    let name = entries[0].file_name().into_string().unwrap();
    assert!(
        !name.contains('/') && !name.contains('\\') && !name.contains(".."),
        "filename neutralised, got {name:?}"
    );
    assert!(name.ends_with(".jsonl"));

    // A naive `dir.join(format!("{id}.jsonl"))` resolves "../../etc/passwd.jsonl"
    // against the *grandparent* of `dir`, so check that level as well as the
    // parent.
    let parent = dir.parent().unwrap();
    assert!(!parent.join("etc").exists());
    if let Some(grandparent) = parent.parent() {
        let escaped = grandparent.join("etc").join("passwd.jsonl");
        assert!(
            !escaped.exists(),
            "trace escaped the persistence dir to {escaped:?}"
        );
    }
    fs::remove_dir_all(&dir).ok();
}
/// When a trace expires from memory, its persisted file is deleted too, while
/// the live trace's file remains.
#[test]
fn disk_ttl_removes_file_as_trace_ages_out() {
    use std::time::{Duration, Instant};

    let dir = temp_dir("disk-ttl-evict");
    let ttl = Duration::from_secs(60);
    let store = TraceStore::with_capacity(10)
        .with_ttl(ttl)
        .with_persistence(&dir);
    let old_file = dir.join("old.jsonl");
    let new_file = dir.join("new.jsonl");

    let start = Instant::now();
    store.ingest_at(single_span_request("old", Some("2000000000"), 0), start);
    assert!(old_file.exists(), "trace persisted");

    // 90s is past the 60s TTL for "old".
    let past_ttl = start + Duration::from_secs(90);
    store.ingest_at(single_span_request("new", Some("2000000000"), 0), past_ttl);
    assert!(
        !old_file.exists(),
        "expired trace's file removed alongside the in-memory eviction"
    );
    assert!(new_file.exists(), "the live trace stays on disk");

    fs::remove_dir_all(&dir).ok();
}
/// A trace that is past its TTL but re-ingested keeps its persisted file and
/// remains live.
#[test]
fn disk_ttl_keeps_file_for_a_trace_revived_in_the_same_batch() {
    use std::time::{Duration, Instant};

    let dir = temp_dir("disk-ttl-revive");
    let ttl = Duration::from_secs(60);
    let store = TraceStore::with_capacity(10)
        .with_ttl(ttl)
        .with_persistence(&dir);
    let ingest_t = |at: Instant| {
        store.ingest_at(single_span_request("t", Some("2000000000"), 0), at);
    };

    let first = Instant::now();
    ingest_t(first);
    // 90s exceeds the 60s TTL, so "t" is stale when this batch re-ingests it.
    let second = first + Duration::from_secs(90);
    ingest_t(second);

    assert!(
        dir.join("t.jsonl").exists(),
        "a trace revived by the same batch keeps its file"
    );
    assert!(store.run_tree_at("t", second).is_some(), "and stays live");
    fs::remove_dir_all(&dir).ok();
}
/// On load, files last modified more than one TTL before the reload instant
/// are skipped and deleted.
#[test]
fn disk_ttl_prunes_stale_files_on_load() {
    use std::time::Duration;

    let dir = temp_dir("disk-ttl-load");
    let ttl = Duration::from_secs(60);
    // Seed the directory with one trace; the temporary store is dropped at the
    // end of the statement.
    TraceStore::new()
        .with_persistence(&dir)
        .ingest(single_span_request("stale", Some("2000000000"), 0));

    let file = dir.join("stale.jsonl");
    let written_at = fs::metadata(&file).unwrap().modified().unwrap();
    // Reload one second after the file's TTL has lapsed.
    let reload_at = written_at + ttl + Duration::from_secs(1);
    let reloaded = TraceStore::new()
        .with_ttl(ttl)
        .with_persistence_at(&dir, reload_at);

    assert!(
        reloaded.list_runs().is_empty(),
        "stale history is not reloaded"
    );
    assert!(!file.exists(), "stale file is deleted on load");
    fs::remove_dir_all(&dir).ok();
}
/// On load, files still within their TTL are reloaded and kept on disk.
#[test]
fn disk_ttl_keeps_fresh_files_on_load() {
    use std::time::Duration;

    let dir = temp_dir("disk-ttl-load-keep");
    let ttl = Duration::from_secs(60);
    TraceStore::new()
        .with_persistence(&dir)
        .ingest(single_span_request("fresh", Some("2000000000"), 0));

    let fresh = dir.join("fresh.jsonl");
    let written_at = fs::metadata(&fresh).unwrap().modified().unwrap();
    // One second after the write is well inside the 60s TTL.
    let reload_at = written_at + Duration::from_secs(1);
    let reloaded = TraceStore::new()
        .with_ttl(ttl)
        .with_persistence_at(&dir, reload_at);

    assert_eq!(reloaded.list_runs().len(), 1, "fresh history reloads");
    assert!(fresh.exists());
    fs::remove_dir_all(&dir).ok();
}
/// Configuring the TTL *after* persistence must still prune stale files.
#[test]
fn disk_ttl_set_after_persistence_still_prunes_on_load() {
    use std::time::Duration;

    let dir = temp_dir("disk-ttl-after-persist");
    let ttl = Duration::from_secs(60);
    {
        let seed = TraceStore::new().with_persistence(&dir);
        seed.ingest(single_span_request("stale", Some("2000000000"), 0));
    }

    let stale = dir.join("stale.jsonl");
    let written_at = fs::metadata(&stale).unwrap().modified().unwrap();
    let expired_by = written_at + ttl + Duration::from_secs(1);
    // Persistence is attached first and the TTL second, unlike the other load tests.
    let reloaded = TraceStore::new()
        .with_persistence(&dir)
        .with_ttl_at(ttl, expired_by);

    assert!(
        reloaded.list_runs().is_empty(),
        "stale history pruned even though the TTL was set after persistence"
    );
    assert!(!stale.exists(), "stale file deleted by the re-run load");
    fs::remove_dir_all(&dir).ok();
}
/// Without a TTL, capacity eviction only drops traces from memory; every
/// persisted file stays on disk.
#[test]
fn persistence_without_ttl_stays_an_unbounded_archive() {
    use std::time::{Duration, Instant};

    let dir = temp_dir("no-ttl-archive");
    let store = TraceStore::with_capacity(1).with_persistence(&dir);
    let start = Instant::now();
    let much_later = start + Duration::from_secs(99999);
    for (id, at) in [("a", start), ("b", much_later)] {
        store.ingest_at(single_span_request(id, Some("2000000000"), 0), at);
    }

    assert!(store.run_tree("a").is_none(), "capacity-evicted from memory");
    let archived = |id: &str| dir.join(format!("{id}.jsonl")).exists();
    assert!(archived("a"), "no TTL → file retained on disk");
    assert!(archived("b"));
    fs::remove_dir_all(&dir).ok();
}
/// `Status::is_error` recognises the error code (2) as a number, a numeric
/// string or an enum-style name, case-insensitively. Other codes and a
/// missing status are not errors.
#[test]
fn status_is_error_across_numeric_and_string_forms() {
    let status = |code| Status {
        code: Some(code),
        message: None,
    };

    for code in [json!(2), json!("2"), json!("STATUS_CODE_ERROR")] {
        assert!(status(code).is_error());
    }
    assert!(status(json!("error")).is_error(), "case-insensitive");

    for code in [json!(0), json!(1), json!("OK")] {
        assert!(!status(code).is_error());
    }
    assert!(
        !Status::default().is_error(),
        "absent status is not an error"
    );
}
/// An ended span with status code 0 yields a "success" outcome.
#[test]
fn outcome_is_success_when_all_spans_close_without_error() {
    let store = TraceStore::new();
    store.ingest(single_span_request("ok", Some("2000000000"), 0));
    let runs = store.list_runs();
    let outcome = runs[0].outcome.as_deref();
    assert_eq!(outcome, Some("success"));
}
/// While a span has no end time, the run has no outcome yet.
#[test]
fn outcome_is_open_while_a_span_has_no_end() {
    let store = TraceStore::new();
    store.ingest(single_span_request("open", None, 0));
    let runs = store.list_runs();
    assert!(runs[0].outcome.is_none());
}
/// `gen_ai.usage.*` token counts and cost are summed across all spans of a run.
#[test]
fn gen_ai_tokens_and_cost_accumulate() {
    let store = TraceStore::new();
    // One LLM span in trace "g" reporting the given usage.
    let llm_span = |span_id: &str, input: &str, output: &str, cost: f64| {
        json!({
            "traceId": "g", "spanId": span_id, "name": "llm",
            "startTimeUnixNano": "1000000000", "endTimeUnixNano": "2000000000",
            "attributes": [
                {"key": "gen_ai.usage.input_tokens", "value": {"intValue": input}},
                {"key": "gen_ai.usage.output_tokens", "value": {"intValue": output}},
                {"key": "gen_ai.usage.cost_usd", "value": {"doubleValue": cost}}
            ],
            "status": {"code": 0}
        })
    };
    // Two spans, so the totals only match if values are accumulated rather
    // than taken from a single span.
    let body = json!({
        "resourceSpans": [{
            "resource": {"attributes": []},
            "scopeSpans": [{
                "scope": {"name": "svc"},
                "spans": [
                    llm_span("r", "100", "50", 0.0125),
                    llm_span("s", "20", "5", 0.0025)
                ]
            }]
        }]
    });
    store.ingest(serde_json::from_value(body).unwrap());
    let run = &store.list_runs()[0];
    assert_eq!(run.input_tokens, 120);
    assert_eq!(run.output_tokens, 55);
    assert!((run.cost_usd - 0.015).abs() < 1e-9);
}
/// Every OTLP `AnyValue` variant is flattened into plain JSON by `key_values_to_map`.
#[test]
fn any_value_collapses_every_otlp_variant() {
    let raw = json!([
        {"key": "s", "value": {"stringValue": "hi"}},
        {"key": "i", "value": {"intValue": "7"}},
        {"key": "d", "value": {"doubleValue": 1.5}},
        {"key": "b", "value": {"boolValue": true}},
        {"key": "arr", "value": {"arrayValue": {"values": [{"stringValue": "x"}, {"intValue": "2"}]}}},
        {"key": "kv", "value": {"kvlistValue": {"values": [{"key": "nested", "value": {"stringValue": "y"}}]}}},
        {"key": "by", "value": {"bytesValue": "deadbeef"}}
    ]);
    let kvs: Vec<KeyValue> = serde_json::from_value(raw).unwrap();
    let map = key_values_to_map(&kvs);

    assert_eq!(map["s"], json!("hi"));
    assert_eq!(
        map["i"],
        json!(7),
        "intValue string parses back to an integer"
    );
    let remaining = [
        ("d", json!(1.5)),
        ("b", json!(true)),
        ("arr", json!(["x", 2])),
        ("kv", json!({"nested": "y"})),
        ("by", json!("deadbeef")),
    ];
    for (key, expected) in remaining {
        assert_eq!(map[key], expected);
    }
}
/// A zero capacity is raised to one, so the newest trace is kept.
#[test]
fn with_capacity_zero_clamps_to_one() {
    let store = TraceStore::with_capacity(0);
    for id in ["a", "b"] {
        store.ingest(single_span_request(id, Some("2000000000"), 0));
    }
    let runs = store.list_runs();
    assert_eq!(runs.len(), 1, "capacity floored at one, not zero");
    assert_eq!(runs[0].run_id, "b", "newest trace retained");
}
/// Raw OTLP/JSON bytes are parsed and ingested; malformed input is reported
/// as an error.
#[test]
fn ingest_otlp_json_accepts_bytes_and_rejects_garbage() {
    let payload = br#"{"resourceSpans":[{"resource":{"attributes":[]},
"scopeSpans":[{"scope":{"name":"svc"},"spans":[
{"traceId":"j","spanId":"root","name":"op",
"startTimeUnixNano":"1000000000","endTimeUnixNano":"2000000000",
"status":{"code":0}}]}]}]}"#;
    let store = TraceStore::new();

    let accepted = store.ingest_otlp_json(payload).unwrap();
    assert_eq!(accepted, 1);
    assert_eq!(store.list_runs().len(), 1);

    let rejected = store.ingest_otlp_json(b"not json");
    assert!(rejected.is_err());
}
/// A single trace accepts at most `MAX_SPANS_PER_TRACE` spans; the overflow is dropped.
#[test]
fn per_trace_span_cap_bounds_growth() {
    let store = TraceStore::new();
    // Parentless spans, so every accepted span shows up as a root.
    let offered = MAX_SPANS_PER_TRACE + 50;
    let mut spans = Vec::with_capacity(offered);
    for i in 0..offered {
        spans.push(json!({
            "traceId": "flood", "spanId": format!("s{i}"), "name": "op",
            "startTimeUnixNano": "1000000000", "endTimeUnixNano": "2000000000",
            "status": {"code": 0}
        }));
    }
    let request = json!({
        "resourceSpans": [{
            "resource": {"attributes": []},
            "scopeSpans": [{"scope": {"name": "svc"}, "spans": spans}]
        }]
    });
    store.ingest(serde_json::from_value(request).unwrap());

    let tree = store.run_tree("flood").expect("trace exists");
    assert!(tree.orphans.is_empty());
    assert_eq!(
        tree.roots.len(),
        MAX_SPANS_PER_TRACE,
        "per-trace span growth is capped; excess new spans are dropped"
    );
}
/// Token totals clamp at `u64::MAX` instead of overflowing.
#[test]
fn token_sums_saturate_instead_of_overflowing() {
    let store = TraceStore::new();
    // Both spans report u64::MAX (as a stringValue) for each token counter,
    // so a plain u64 sum of the two would overflow.
    let max_tokens = u64::MAX.to_string();
    let usage_span = |span_id: &str| {
        json!({
            "traceId": "sat", "spanId": span_id, "name": "llm",
            "startTimeUnixNano": "1000000000", "endTimeUnixNano": "2000000000",
            "attributes": [
                {"key": "gen_ai.usage.input_tokens", "value": {"stringValue": max_tokens}},
                {"key": "gen_ai.usage.output_tokens", "value": {"stringValue": max_tokens}}
            ],
            "status": {"code": 0}
        })
    };
    let request = json!({
        "resourceSpans": [{
            "resource": {"attributes": []},
            "scopeSpans": [{"scope": {"name": "svc"}, "spans": [usage_span("a"), usage_span("b")]}]
        }]
    });
    store.ingest(serde_json::from_value(request).unwrap());

    let runs = store.list_runs();
    let totals = (runs[0].input_tokens, runs[0].output_tokens);
    assert_eq!(totals, (u64::MAX, u64::MAX));
}
/// A plugin built with persistence reloads traces written by an earlier
/// plugin that used the same directory.
#[test]
fn plugin_with_persistence_reloads_history() {
    let dir = temp_dir("plugin-persist");
    let build = || {
        super::OtelTracingPlugin::builder()
            .with_persistence(&dir)
            .build()
    };
    {
        let first = build();
        assert_eq!(first.store().ingest(sample_request()), 6);
    }
    let second = build();
    let reloaded = second.store().list_runs().len();
    assert_eq!(reloaded, 1, "history reloaded through the plugin builder");
    fs::remove_dir_all(&dir).ok();
}
/// TTL, capacity and persistence set on the builder all take effect together.
#[test]
fn plugin_builder_composes_every_knob() {
    let dir = temp_dir("builder-compose");
    let plugin = super::OtelTracingPlugin::builder()
        .with_ttl(std::time::Duration::from_secs(3600))
        .with_capacity(2)
        .with_persistence(&dir)
        .build();
    let store = plugin.store();
    for id in ["a", "b", "c"] {
        store.ingest(single_span_request(id, Some("2000000000"), 0));
    }

    let runs = store.list_runs();
    assert_eq!(runs.len(), 2, "capacity bound applied from the builder");
    assert_eq!(runs[0].run_id, "c");
    // At least one trace file was written to the persistence directory.
    let wrote_something = fs::read_dir(&dir).unwrap().flatten().next().is_some();
    assert!(wrote_something);
    fs::remove_dir_all(&dir).ok();
}
/// Negative offsets render as instants before 1970, e.g. exactly one day earlier.
#[test]
fn timestamps_render_before_the_epoch() {
    let one_day_nanos = 86_400 * 1_000_000_000;
    let rendered = nanos_to_iso8601(-one_day_nanos);
    assert_eq!(rendered, "1969-12-31T00:00:00.000Z");
}