//! trazaeo 0.5.0
//!
//! Open-source provenance SDK and specification for verifiable EO and climate data workflows.
use std::collections::BTreeMap;
use time::format_description::well_known::Rfc3339;
use time::OffsetDateTime;

/// A telemetry event recorded against a stage and a subject.
///
/// None of the string fields are validated when an event is built or buffered.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TelemetryEvent {
    /// Stage label (the tests use values such as `"S0"` and `"S7"`); matched exactly by
    /// `TelemetryBuffer::count_by_stage`.
    pub stage: String,
    /// Identifier of the subject the event concerns; matched exactly by
    /// `TelemetryBuffer::query_by_subject`.
    pub subject_id: String,
    /// Free-form status string; not interpreted by `TelemetryBuffer`.
    pub status: String,
    /// Arbitrary string key/value attributes, kept in sorted key order by the `BTreeMap`
    /// (the tests store `trace_id` and `run_id` here).
    pub attrs: BTreeMap<String, String>,
}

/// A single numeric measurement tied to a metric name, stage, and subject.
// `Eq` cannot be derived here because `value` is an `f64`, which is only `PartialEq`.
#[derive(Debug, Clone, PartialEq)]
pub struct MetricPoint {
    /// Metric name; matched exactly by `TelemetryBuffer::query_metric`.
    pub metric_name: String,
    /// Stage label; matched exactly by `TelemetryBuffer::query_metric`.
    pub stage: String,
    /// Identifier of the subject the measurement concerns.
    pub subject_id: String,
    /// The measured value, expressed in `unit`.
    pub value: f64,
    /// Free-form unit label (the tests use `"ms"`).
    pub unit: String,
    /// Observation timestamp stored verbatim; `TelemetryBuffer` never parses it.
    /// NOTE(review): the tests pass RFC 3339 strings, but nothing enforces that format.
    pub observed_at: String,
}

/// One stage of a trace, opened with `TelemetryBuffer::start_span` and closed with
/// `TelemetryBuffer::end_span`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TraceSpan {
    /// Trace identifier; matched by `TelemetryBuffer::query_trace` and `end_span`.
    pub trace_id: String,
    /// Run identifier; carried along but not used by `TelemetryBuffer`'s lookups.
    pub run_id: String,
    /// Stage label; `end_span` matches on it together with `trace_id`.
    pub stage: String,
    /// Identifier of the subject the span concerns.
    pub subject_id: String,
    /// Start timestamp stored verbatim (not parsed).
    pub started_at: String,
    /// `None` while the span is open; set by `end_span` when the span is closed.
    pub ended_at: Option<String>,
    /// `None` while the span is open; set by `end_span` alongside `ended_at`.
    pub status: Option<String>,
}

/// An audit-trail entry: who did what, to which subject, when, and with what evidence.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AuditRecord {
    /// Identifier of the acting party; exact-match filter in `TelemetryBuffer::query_audit`.
    pub actor_id: String,
    /// Kind of action (the tests use `"publish"` and `"verify"`); exact-match filter in
    /// `TelemetryBuffer::query_audit`.
    pub action_type: String,
    /// Identifier of the subject acted upon.
    pub subject_id: String,
    /// Event timestamp. `TelemetryBuffer::query_audit` parses it as RFC 3339 and never
    /// returns records whose value fails to parse.
    pub event_time: String,
    /// Opaque reference to supporting evidence (the tests use values like `"publish://1"`);
    /// not interpreted by `TelemetryBuffer`.
    pub evidence_ref: String,
}

/// In-memory store for telemetry events, metrics, trace spans, and audit records.
///
/// Each collection is a `Vec` that is only ever appended to, so queries return results in
/// insertion order. `Default` yields an empty buffer.
#[derive(Debug, Clone, Default)]
pub struct TelemetryBuffer {
    /// Events appended by `push`.
    events: Vec<TelemetryEvent>,
    /// Metric samples appended by `record_metric`.
    metrics: Vec<MetricPoint>,
    /// Spans appended by `start_span`; `end_span` mutates them in place.
    spans: Vec<TraceSpan>,
    /// Audit records appended by `record_audit`.
    audits: Vec<AuditRecord>,
}

impl TelemetryBuffer {
    /// Appends `event` to the end of the event log.
    pub fn push(&mut self, event: TelemetryEvent) {
        self.events.push(event);
    }

    /// Returns clones of every event whose `subject_id` equals `subject_id` exactly,
    /// in insertion order. Returns an empty `Vec` when nothing matches.
    pub fn query_by_subject(&self, subject_id: &str) -> Vec<TelemetryEvent> {
        self.events
            .iter()
            .filter(|e| e.subject_id == subject_id)
            .cloned()
            .collect()
    }

    /// Counts the events whose `stage` equals `stage` exactly.
    pub fn count_by_stage(&self, stage: &str) -> usize {
        self.events.iter().filter(|e| e.stage == stage).count()
    }

    /// Appends a metric sample.
    pub fn record_metric(&mut self, point: MetricPoint) {
        self.metrics.push(point);
    }

    /// Returns clones of every metric sample whose `metric_name` and `stage` both match
    /// exactly, in insertion order.
    pub fn query_metric(&self, metric_name: &str, stage: &str) -> Vec<MetricPoint> {
        self.metrics
            .iter()
            .filter(|m| m.metric_name == metric_name && m.stage == stage)
            .cloned()
            .collect()
    }

    /// Stores `span` as given.
    ///
    /// [`make_span`] builds spans with `ended_at` and `status` set to `None`, which is
    /// what [`TelemetryBuffer::end_span`] looks for when closing a span.
    pub fn start_span(&mut self, span: TraceSpan) {
        self.spans.push(span);
    }

    /// Closes the most recently stored open span for `trace_id` and `stage`.
    ///
    /// A span is "open" while its `ended_at` is `None`. On a match, `ended_at` and
    /// `status` are set to the given strings (stored verbatim, not parsed or compared to
    /// `started_at`).
    ///
    /// Returns `true` if a span was closed, `false` if no open span matched.
    pub fn end_span(&mut self, trace_id: &str, stage: &str, ended_at: &str, status: &str) -> bool {
        // Search newest-first so that, if the same (trace, stage) pair was opened more
        // than once, the latest still-open span is the one closed.
        let open = self
            .spans
            .iter_mut()
            .rev()
            .find(|s| s.trace_id == trace_id && s.stage == stage && s.ended_at.is_none());
        let Some(span) = open else {
            return false;
        };
        span.ended_at = Some(ended_at.to_string());
        span.status = Some(status.to_string());
        true
    }

    /// Returns clones of every span whose `trace_id` equals `trace_id`, open or closed,
    /// in insertion order.
    pub fn query_trace(&self, trace_id: &str) -> Vec<TraceSpan> {
        self.spans
            .iter()
            .filter(|s| s.trace_id == trace_id)
            .cloned()
            .collect()
    }

    /// Appends an audit record.
    pub fn record_audit(&mut self, record: AuditRecord) {
        self.audits.push(record);
    }

    /// Returns clones of the audit records that satisfy every supplied filter, in
    /// insertion order.
    ///
    /// * `actor_id` / `action_type`: exact-match filters; `None` disables the filter.
    /// * `start_time` / `end_time`: inclusive RFC 3339 bounds on `event_time`; `None`
    ///   leaves that side unbounded.
    ///
    /// Records whose `event_time` is not valid RFC 3339 are never returned.
    ///
    /// If a bound is supplied but is not valid RFC 3339, no records are returned. A
    /// malformed bound must not silently widen an audit query to "unbounded".
    pub fn query_audit(
        &self,
        actor_id: Option<&str>,
        action_type: Option<&str>,
        start_time: Option<&str>,
        end_time: Option<&str>,
    ) -> Vec<AuditRecord> {
        // `Some(None)` means the caller supplied a bound that failed to parse.
        let start = match start_time.map(parse_rfc3339) {
            Some(None) => return Vec::new(),
            parsed => parsed.flatten(),
        };
        let end = match end_time.map(parse_rfc3339) {
            Some(None) => return Vec::new(),
            parsed => parsed.flatten(),
        };
        self.audits
            .iter()
            .filter(|record| {
                actor_id.is_none_or(|actor| record.actor_id == actor)
                    && action_type.is_none_or(|action| record.action_type == action)
                    && parse_rfc3339(&record.event_time).is_some_and(|event_time| {
                        start.is_none_or(|s| event_time >= s) && end.is_none_or(|e| event_time <= e)
                    })
            })
            .cloned()
            .collect()
    }
}

/// Parses `value` as an RFC 3339 timestamp (for example `2026-01-01T00:30:00Z`).
///
/// Returns `None` when `value` is not valid RFC 3339; the underlying parse error is
/// discarded.
fn parse_rfc3339(value: &str) -> Option<OffsetDateTime> {
    let format = &Rfc3339;
    let parsed = OffsetDateTime::parse(value, format);
    parsed.ok()
}

/// Builds a [`TelemetryEvent`] from borrowed strings, starting with no attributes.
pub fn make_event(stage: &str, subject_id: &str, status: &str) -> TelemetryEvent {
    let attrs = BTreeMap::default();
    TelemetryEvent {
        attrs,
        status: String::from(status),
        subject_id: String::from(subject_id),
        stage: String::from(stage),
    }
}

/// Builds a [`MetricPoint`], copying each borrowed string into an owned field.
///
/// No validation is performed: `unit` and `observed_at` are stored exactly as given.
pub fn make_metric(
    metric_name: &str,
    stage: &str,
    subject_id: &str,
    value: f64,
    unit: &str,
    observed_at: &str,
) -> MetricPoint {
    let owned = |text: &str| text.to_owned();
    MetricPoint {
        metric_name: owned(metric_name),
        stage: owned(stage),
        subject_id: owned(subject_id),
        value,
        unit: owned(unit),
        observed_at: owned(observed_at),
    }
}

/// Builds an open [`TraceSpan`]: `ended_at` and `status` start as `None` and are
/// filled in later by `TelemetryBuffer::end_span`.
pub fn make_span(
    trace_id: &str,
    run_id: &str,
    stage: &str,
    subject_id: &str,
    started_at: &str,
) -> TraceSpan {
    TraceSpan {
        trace_id: trace_id.into(),
        run_id: run_id.into(),
        stage: stage.into(),
        subject_id: subject_id.into(),
        started_at: started_at.into(),
        ended_at: Option::None,
        status: Option::None,
    }
}

/// Builds an [`AuditRecord`] from borrowed strings.
///
/// `event_time` is stored verbatim; it is only parsed later, when
/// `TelemetryBuffer::query_audit` filters records.
pub fn make_audit_record(
    actor_id: &str,
    action_type: &str,
    subject_id: &str,
    event_time: &str,
    evidence_ref: &str,
) -> AuditRecord {
    let record = AuditRecord {
        actor_id: actor_id.to_owned(),
        action_type: action_type.to_owned(),
        subject_id: subject_id.to_owned(),
        event_time: event_time.to_owned(),
        evidence_ref: evidence_ref.to_owned(),
    };
    record
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::BTreeMap;

    /// Events can be looked up by subject and tallied by stage.
    #[test]
    fn telemetry_buffer_push_and_query() {
        let mut buffer = TelemetryBuffer::default();
        for (stage, subject) in [("S0", "sub-1"), ("S1", "sub-1"), ("S1", "sub-2")] {
            buffer.push(make_event(stage, subject, "ok"));
        }

        let sub_one = buffer.query_by_subject("sub-1");
        assert_eq!(sub_one.len(), 2);
        assert_eq!(buffer.count_by_stage("S1"), 2);
    }

    /// A recorded metric and a closed span are both retrievable afterwards.
    #[test]
    fn telemetry_metrics_and_trace_query() {
        let mut buffer = TelemetryBuffer::default();
        let latency = make_metric(
            "verification_latency_ms",
            "S7",
            "sub-1",
            41.0,
            "ms",
            "2026-01-01T00:00:01Z",
        );
        buffer.record_metric(latency);

        let attrs = BTreeMap::from([
            ("trace_id".to_string(), "trace-1".to_string()),
            ("run_id".to_string(), "run-1".to_string()),
        ]);
        let event = TelemetryEvent {
            stage: "S7".to_string(),
            subject_id: "sub-1".to_string(),
            status: "ok".to_string(),
            attrs,
        };
        buffer.push(event);

        let span = make_span("trace-1", "run-1", "S7", "sub-1", "2026-01-01T00:00:00Z");
        buffer.start_span(span);
        let closed = buffer.end_span("trace-1", "S7", "2026-01-01T00:00:02Z", "ok");
        assert!(closed);

        let metrics = buffer.query_metric("verification_latency_ms", "S7");
        assert_eq!(metrics.len(), 1);
        assert_eq!(metrics[0].value, 41.0);

        let spans = buffer.query_trace("trace-1");
        assert_eq!(spans.len(), 1);
        assert_eq!(spans[0].status.as_deref(), Some("ok"));
    }

    /// Actor, action, and time-window filters are applied together.
    #[test]
    fn telemetry_audit_query_filters_actor_action_and_time() {
        let mut buffer = TelemetryBuffer::default();
        let records = [
            ("actor-1", "publish", "2026-01-01T00:30:00Z", "publish://1"),
            ("actor-2", "verify", "2026-01-01T00:40:00Z", "verify://1"),
        ];
        for (actor, action, at, evidence) in records {
            buffer.record_audit(make_audit_record(actor, action, "dataset-1", at, evidence));
        }

        let filtered = buffer.query_audit(
            Some("actor-1"),
            Some("publish"),
            Some("2026-01-01T00:00:00Z"),
            Some("2026-01-01T00:35:00Z"),
        );
        assert_eq!(filtered.len(), 1);
        assert_eq!(filtered[0].evidence_ref, "publish://1");
    }
}