use std::collections::BTreeMap;
use time::format_description::well_known::Rfc3339;
use time::OffsetDateTime;
/// A single telemetry event recorded for a subject at a given stage.
///
/// Every field is free-form text; nothing in this file validates the values.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct TelemetryEvent {
    /// Stage label; compared for exact equality by `TelemetryBuffer::count_by_stage`.
    pub stage: String,
    /// Subject identifier; compared for exact equality by
    /// `TelemetryBuffer::query_by_subject`.
    pub subject_id: String,
    /// Outcome label for the event (the tests in this file use `"ok"`).
    pub status: String,
    /// Extra key/value attributes attached to the event, ordered by key.
    pub attrs: BTreeMap<String, String>,
}
/// One numeric measurement recorded for a subject at a given stage.
///
/// Only `PartialEq` is derived (not `Eq`) because `value` is an `f64`.
#[derive(Clone, Debug, PartialEq)]
pub struct MetricPoint {
    /// Metric name; compared for exact equality by `TelemetryBuffer::query_metric`.
    pub metric_name: String,
    /// Stage label; compared for exact equality by `TelemetryBuffer::query_metric`.
    pub stage: String,
    /// Identifier of the subject the measurement belongs to.
    pub subject_id: String,
    /// The measured value, expressed in `unit`.
    pub value: f64,
    /// Unit label for `value` (the tests in this file use `"ms"`).
    pub unit: String,
    /// Observation time as text. It is not parsed anywhere in this file; the
    /// tests use RFC 3339 strings.
    pub observed_at: String,
}
/// A span of work within a trace, optionally closed with an end time and status.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct TraceSpan {
    /// Trace identifier; used by `TelemetryBuffer::query_trace` and
    /// `TelemetryBuffer::end_span` to locate spans.
    pub trace_id: String,
    /// Identifier of the run the span belongs to.
    pub run_id: String,
    /// Stage label; together with `trace_id` it selects the span to close in
    /// `TelemetryBuffer::end_span`.
    pub stage: String,
    /// Identifier of the subject the span is about.
    pub subject_id: String,
    /// Start time as text (the tests in this file use RFC 3339 strings).
    pub started_at: String,
    /// End time as text; `None` marks the span as still open.
    pub ended_at: Option<String>,
    /// Final status label; written together with `ended_at` when the span is closed.
    pub status: Option<String>,
}
/// An audit trail entry describing an action an actor performed on a subject.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct AuditRecord {
    /// Actor identifier; optional exact-match filter in `TelemetryBuffer::query_audit`.
    pub actor_id: String,
    /// Action label; optional exact-match filter in `TelemetryBuffer::query_audit`.
    pub action_type: String,
    /// Identifier of the subject the action was applied to.
    pub subject_id: String,
    /// Time of the action as text. `TelemetryBuffer::query_audit` parses it as
    /// RFC 3339 and never returns records whose value fails to parse.
    pub event_time: String,
    /// Reference to supporting evidence (the tests in this file use values such
    /// as `"publish://1"`).
    pub evidence_ref: String,
}
/// In-memory store for telemetry events, metric points, trace spans and audit
/// records.
///
/// Each collection is an append-only `Vec` kept in insertion order; the
/// `Default` value is a buffer with all four collections empty.
#[derive(Clone, Debug, Default)]
pub struct TelemetryBuffer {
    /// Events added through `push`.
    events: Vec<TelemetryEvent>,
    /// Measurements added through `record_metric`.
    metrics: Vec<MetricPoint>,
    /// Spans added through `start_span` and closed in place by `end_span`.
    spans: Vec<TraceSpan>,
    /// Audit entries added through `record_audit`.
    audits: Vec<AuditRecord>,
}
impl TelemetryBuffer {
    /// Appends `event` to the end of the event list.
    pub fn push(&mut self, event: TelemetryEvent) {
        self.events.push(event);
    }

    /// Returns clones of every buffered event whose `subject_id` equals
    /// `subject_id` exactly, in insertion order.
    pub fn query_by_subject(&self, subject_id: &str) -> Vec<TelemetryEvent> {
        self.events
            .iter()
            .filter(|e| e.subject_id == subject_id)
            .cloned()
            .collect()
    }

    /// Counts the buffered events whose `stage` equals `stage` exactly.
    pub fn count_by_stage(&self, stage: &str) -> usize {
        self.events.iter().filter(|e| e.stage == stage).count()
    }

    /// Appends `point` to the end of the metric list.
    pub fn record_metric(&mut self, point: MetricPoint) {
        self.metrics.push(point);
    }

    /// Returns clones of every metric point whose `metric_name` and `stage`
    /// both match exactly, in insertion order.
    pub fn query_metric(&self, metric_name: &str, stage: &str) -> Vec<MetricPoint> {
        self.metrics
            .iter()
            .filter(|m| m.metric_name == metric_name && m.stage == stage)
            .cloned()
            .collect()
    }

    /// Stores `span` as given; no check is made that it is open or unique.
    pub fn start_span(&mut self, span: TraceSpan) {
        self.spans.push(span);
    }

    /// Closes the most recently stored open span for `trace_id` and `stage`.
    ///
    /// A span is open while its `ended_at` is `None`. On a match, `ended_at`
    /// and `status` are overwritten with the given values and `true` is
    /// returned; `false` means no open span matched and nothing was changed.
    pub fn end_span(&mut self, trace_id: &str, stage: &str, ended_at: &str, status: &str) -> bool {
        // Search newest-first so that, when the same trace/stage was started
        // more than once, the latest still-open span is the one closed.
        if let Some(span) = self
            .spans
            .iter_mut()
            .rev()
            .find(|s| s.trace_id == trace_id && s.stage == stage && s.ended_at.is_none())
        {
            span.ended_at = Some(ended_at.to_string());
            span.status = Some(status.to_string());
            true
        } else {
            false
        }
    }

    /// Returns clones of every span whose `trace_id` matches exactly, open or
    /// closed, in insertion order.
    pub fn query_trace(&self, trace_id: &str) -> Vec<TraceSpan> {
        self.spans
            .iter()
            .filter(|s| s.trace_id == trace_id)
            .cloned()
            .collect()
    }

    /// Appends `record` to the end of the audit list.
    pub fn record_audit(&mut self, record: AuditRecord) {
        self.audits.push(record);
    }

    /// Returns clones of the audit records that satisfy every supplied filter,
    /// in insertion order.
    ///
    /// * `actor_id` / `action_type` - when `Some`, the record's field must
    ///   equal the value exactly; `None` disables that filter.
    /// * `start_time` / `end_time` - optional RFC 3339 bounds, both inclusive,
    ///   compared against the record's parsed `event_time`.
    ///
    /// A bound that is supplied but is not valid RFC 3339 makes the query
    /// return no records. Dropping such a bound instead would silently widen
    /// the time window and hand back records the caller asked to exclude.
    ///
    /// Records whose own `event_time` is not valid RFC 3339 are never
    /// returned, even when no time bound is given.
    pub fn query_audit(
        &self,
        actor_id: Option<&str>,
        action_type: Option<&str>,
        start_time: Option<&str>,
        end_time: Option<&str>,
    ) -> Vec<AuditRecord> {
        // `Some(None)` means "a bound was given but could not be parsed":
        // fail closed rather than treating it as an absent bound.
        let start = match start_time.map(parse_rfc3339) {
            Some(None) => return Vec::new(),
            parsed => parsed.flatten(),
        };
        let end = match end_time.map(parse_rfc3339) {
            Some(None) => return Vec::new(),
            parsed => parsed.flatten(),
        };

        self.audits
            .iter()
            .filter(|record| {
                actor_id.is_none_or(|actor| record.actor_id == actor)
                    && action_type.is_none_or(|action| record.action_type == action)
                    && parse_rfc3339(&record.event_time).is_some_and(|event_time| {
                        start.is_none_or(|s| event_time >= s)
                            && end.is_none_or(|e| event_time <= e)
                    })
            })
            .cloned()
            .collect()
    }
}
/// Parses `value` as an RFC 3339 timestamp.
///
/// Returns `None` when parsing fails; the parse error itself is discarded, so
/// callers only learn whether the text was valid.
fn parse_rfc3339(value: &str) -> Option<OffsetDateTime> {
    let parsed = OffsetDateTime::parse(value, &Rfc3339);
    parsed.ok()
}
/// Builds a [`TelemetryEvent`] from borrowed strings, with an empty `attrs` map.
///
/// Each argument is copied into an owned `String`; no validation is performed.
pub fn make_event(stage: &str, subject_id: &str, status: &str) -> TelemetryEvent {
    let stage = stage.to_owned();
    let subject_id = subject_id.to_owned();
    let status = status.to_owned();
    TelemetryEvent {
        stage,
        subject_id,
        status,
        attrs: BTreeMap::new(),
    }
}
/// Builds a [`MetricPoint`] from borrowed strings and a numeric value.
///
/// String arguments are copied into owned `String`s and `value` is stored
/// unchanged; no validation is performed on any argument.
pub fn make_metric(
    metric_name: &str,
    stage: &str,
    subject_id: &str,
    value: f64,
    unit: &str,
    observed_at: &str,
) -> MetricPoint {
    let metric_name = metric_name.to_owned();
    let stage = stage.to_owned();
    let subject_id = subject_id.to_owned();
    let unit = unit.to_owned();
    let observed_at = observed_at.to_owned();
    MetricPoint {
        metric_name,
        stage,
        subject_id,
        value,
        unit,
        observed_at,
    }
}
/// Builds an open [`TraceSpan`]: `ended_at` and `status` both start as `None`.
///
/// String arguments are copied into owned `String`s; no validation is performed.
pub fn make_span(
    trace_id: &str,
    run_id: &str,
    stage: &str,
    subject_id: &str,
    started_at: &str,
) -> TraceSpan {
    let trace_id = trace_id.to_owned();
    let run_id = run_id.to_owned();
    let stage = stage.to_owned();
    let subject_id = subject_id.to_owned();
    let started_at = started_at.to_owned();
    TraceSpan {
        trace_id,
        run_id,
        stage,
        subject_id,
        started_at,
        ended_at: None,
        status: None,
    }
}
/// Builds an [`AuditRecord`] from borrowed strings.
///
/// Each argument is copied into an owned `String`. `event_time` is stored as
/// given and is not checked for being a valid timestamp here.
pub fn make_audit_record(
    actor_id: &str,
    action_type: &str,
    subject_id: &str,
    event_time: &str,
    evidence_ref: &str,
) -> AuditRecord {
    let actor_id = actor_id.to_owned();
    let action_type = action_type.to_owned();
    let subject_id = subject_id.to_owned();
    let event_time = event_time.to_owned();
    let evidence_ref = evidence_ref.to_owned();
    AuditRecord {
        actor_id,
        action_type,
        subject_id,
        event_time,
        evidence_ref,
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::BTreeMap;

    /// Pushed events can be looked up by subject and counted by stage.
    #[test]
    fn telemetry_buffer_push_and_query() {
        let mut buffer = TelemetryBuffer::default();
        for (stage, subject) in [("S0", "sub-1"), ("S1", "sub-1"), ("S1", "sub-2")] {
            buffer.push(make_event(stage, subject, "ok"));
        }

        let for_subject = buffer.query_by_subject("sub-1");
        assert_eq!(for_subject.len(), 2);
        assert_eq!(buffer.count_by_stage("S1"), 2);
    }

    /// Metrics are retrievable by name and stage, and a started span can be
    /// closed and then read back with its final status.
    #[test]
    fn telemetry_metrics_and_trace_query() {
        let mut buffer = TelemetryBuffer::default();

        let latency = make_metric(
            "verification_latency_ms",
            "S7",
            "sub-1",
            41.0,
            "ms",
            "2026-01-01T00:00:01Z",
        );
        buffer.record_metric(latency);

        let attrs = BTreeMap::from([
            ("trace_id".to_string(), "trace-1".to_string()),
            ("run_id".to_string(), "run-1".to_string()),
        ]);
        let event = TelemetryEvent {
            stage: "S7".to_string(),
            subject_id: "sub-1".to_string(),
            status: "ok".to_string(),
            attrs,
        };
        buffer.push(event);

        let span = make_span("trace-1", "run-1", "S7", "sub-1", "2026-01-01T00:00:00Z");
        buffer.start_span(span);
        let closed = buffer.end_span("trace-1", "S7", "2026-01-01T00:00:02Z", "ok");
        assert!(closed);

        let recorded = buffer.query_metric("verification_latency_ms", "S7");
        assert_eq!(recorded.len(), 1);
        assert_eq!(recorded[0].value, 41.0);

        let trace = buffer.query_trace("trace-1");
        assert_eq!(trace.len(), 1);
        assert_eq!(trace[0].status.as_deref(), Some("ok"));
    }

    /// The audit query applies actor, action and time-window filters together.
    #[test]
    fn telemetry_audit_query_filters_actor_action_and_time() {
        let mut buffer = TelemetryBuffer::default();
        let entries = [
            ("actor-1", "publish", "2026-01-01T00:30:00Z", "publish://1"),
            ("actor-2", "verify", "2026-01-01T00:40:00Z", "verify://1"),
        ];
        for (actor, action, at, evidence) in entries {
            buffer.record_audit(make_audit_record(actor, action, "dataset-1", at, evidence));
        }

        let window_start = Some("2026-01-01T00:00:00Z");
        let window_end = Some("2026-01-01T00:35:00Z");
        let matches = buffer.query_audit(
            Some("actor-1"),
            Some("publish"),
            window_start,
            window_end,
        );

        assert_eq!(matches.len(), 1);
        assert_eq!(matches[0].evidence_ref, "publish://1");
    }
}