Skip to main content

harn_vm/
trust_graph.rs

1use std::collections::{BTreeMap, HashMap};
2use std::sync::Arc;
3
4use serde::{Deserialize, Serialize};
5use time::OffsetDateTime;
6use uuid::Uuid;
7
8use crate::event_log::{
9    active_event_log, sanitize_topic_component, AnyEventLog, EventLog, LogError, LogEvent, Topic,
10};
11
/// Schema identifier stamped into every [`TrustRecord`] created by [`TrustRecord::new`].
pub const OPENTRUSTGRAPH_SCHEMA_V0: &str = "opentrustgraph/v0";
/// Event-log topic that receives every trust record, regardless of agent.
pub const TRUST_GRAPH_GLOBAL_TOPIC: &str = "trust.graph";
/// Prefix for per-agent topics; the sanitized agent name is appended by [`topic_for_agent`].
pub const TRUST_GRAPH_TOPIC_PREFIX: &str = "trust.graph.";
15
16#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
17#[serde(rename_all = "snake_case")]
18pub enum AutonomyTier {
19    Shadow,
20    Suggest,
21    ActWithApproval,
22    #[default]
23    ActAuto,
24}
25
26impl AutonomyTier {
27    pub fn as_str(self) -> &'static str {
28        match self {
29            Self::Shadow => "shadow",
30            Self::Suggest => "suggest",
31            Self::ActWithApproval => "act_with_approval",
32            Self::ActAuto => "act_auto",
33        }
34    }
35}
36
37#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
38#[serde(rename_all = "snake_case")]
39pub enum TrustOutcome {
40    Success,
41    Failure,
42    Denied,
43    Timeout,
44}
45
46impl TrustOutcome {
47    pub fn as_str(self) -> &'static str {
48        match self {
49            Self::Success => "success",
50            Self::Failure => "failure",
51            Self::Denied => "denied",
52            Self::Timeout => "timeout",
53        }
54    }
55}
56
57#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
58pub struct TrustRecord {
59    pub schema: String,
60    pub record_id: String,
61    pub agent: String,
62    pub action: String,
63    pub approver: Option<String>,
64    pub outcome: TrustOutcome,
65    pub trace_id: String,
66    pub autonomy_tier: AutonomyTier,
67    #[serde(with = "time::serde::rfc3339")]
68    pub timestamp: OffsetDateTime,
69    pub cost_usd: Option<f64>,
70    #[serde(default)]
71    pub metadata: BTreeMap<String, serde_json::Value>,
72}
73
74impl TrustRecord {
75    pub fn new(
76        agent: impl Into<String>,
77        action: impl Into<String>,
78        approver: Option<String>,
79        outcome: TrustOutcome,
80        trace_id: impl Into<String>,
81        autonomy_tier: AutonomyTier,
82    ) -> Self {
83        Self {
84            schema: OPENTRUSTGRAPH_SCHEMA_V0.to_string(),
85            record_id: Uuid::now_v7().to_string(),
86            agent: agent.into(),
87            action: action.into(),
88            approver,
89            outcome,
90            trace_id: trace_id.into(),
91            autonomy_tier,
92            timestamp: OffsetDateTime::now_utc(),
93            cost_usd: None,
94            metadata: BTreeMap::new(),
95        }
96    }
97}
98
99#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
100#[serde(default)]
101pub struct TrustQueryFilters {
102    pub agent: Option<String>,
103    pub action: Option<String>,
104    #[serde(with = "time::serde::rfc3339::option")]
105    pub since: Option<OffsetDateTime>,
106    #[serde(with = "time::serde::rfc3339::option")]
107    pub until: Option<OffsetDateTime>,
108    pub tier: Option<AutonomyTier>,
109    pub outcome: Option<TrustOutcome>,
110    pub limit: Option<usize>,
111    pub grouped_by_trace: bool,
112}
113
114#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
115#[serde(default)]
116pub struct TrustTraceGroup {
117    pub trace_id: String,
118    pub records: Vec<TrustRecord>,
119}
120
121#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
122#[serde(default)]
123pub struct TrustAgentSummary {
124    pub agent: String,
125    pub total: u64,
126    pub success_rate: f64,
127    pub mean_cost_usd: Option<f64>,
128    pub tier_distribution: BTreeMap<String, u64>,
129    pub outcome_distribution: BTreeMap<String, u64>,
130}
131
132fn global_topic() -> Result<Topic, LogError> {
133    Topic::new(TRUST_GRAPH_GLOBAL_TOPIC)
134}
135
136pub fn topic_for_agent(agent: &str) -> Result<Topic, LogError> {
137    Topic::new(format!(
138        "{TRUST_GRAPH_TOPIC_PREFIX}{}",
139        sanitize_topic_component(agent)
140    ))
141}
142
143pub async fn append_trust_record(
144    log: &Arc<AnyEventLog>,
145    record: &TrustRecord,
146) -> Result<(), LogError> {
147    let payload = serde_json::to_value(record)
148        .map_err(|error| LogError::Serde(format!("trust record encode error: {error}")))?;
149    let mut headers = BTreeMap::new();
150    headers.insert("trace_id".to_string(), record.trace_id.clone());
151    headers.insert("agent".to_string(), record.agent.clone());
152    headers.insert(
153        "autonomy_tier".to_string(),
154        record.autonomy_tier.as_str().to_string(),
155    );
156    headers.insert("outcome".to_string(), record.outcome.as_str().to_string());
157    let event = LogEvent::new("trust_recorded", payload).with_headers(headers);
158    let per_agent = topic_for_agent(&record.agent)?;
159    log.append(&global_topic()?, event.clone()).await?;
160    log.append(&per_agent, event).await?;
161    Ok(())
162}
163
164pub async fn append_active_trust_record(record: &TrustRecord) -> Result<(), LogError> {
165    let log = active_event_log()
166        .ok_or_else(|| LogError::Config("trust graph requires an active event log".to_string()))?;
167    append_trust_record(&log, record).await
168}
169
170pub async fn query_trust_records(
171    log: &Arc<AnyEventLog>,
172    filters: &TrustQueryFilters,
173) -> Result<Vec<TrustRecord>, LogError> {
174    let topic = query_topic(filters)?;
175    let events = log.read_range(&topic, None, usize::MAX).await?;
176    let mut records = Vec::new();
177    for (_, event) in events {
178        if event.kind != "trust_recorded" {
179            continue;
180        }
181        let Ok(record) = serde_json::from_value::<TrustRecord>(event.payload) else {
182            continue;
183        };
184        if !matches_filters(&record, filters) {
185            continue;
186        }
187        records.push(record);
188    }
189    records.sort_by(|left, right| {
190        left.timestamp
191            .cmp(&right.timestamp)
192            .then(left.agent.cmp(&right.agent))
193            .then(left.record_id.cmp(&right.record_id))
194    });
195    apply_record_limit(&mut records, filters.limit);
196    Ok(records)
197}
198
199pub fn group_trust_records_by_trace(records: &[TrustRecord]) -> Vec<TrustTraceGroup> {
200    let mut groups: Vec<TrustTraceGroup> = Vec::new();
201    let mut positions: HashMap<String, usize> = HashMap::new();
202    for record in records {
203        if let Some(index) = positions.get(record.trace_id.as_str()).copied() {
204            groups[index].records.push(record.clone());
205            continue;
206        }
207        positions.insert(record.trace_id.clone(), groups.len());
208        groups.push(TrustTraceGroup {
209            trace_id: record.trace_id.clone(),
210            records: vec![record.clone()],
211        });
212    }
213    groups
214}
215
216pub fn summarize_trust_records(records: &[TrustRecord]) -> Vec<TrustAgentSummary> {
217    #[derive(Default)]
218    struct RunningSummary {
219        total: u64,
220        successes: u64,
221        cost_sum: f64,
222        cost_count: u64,
223        tier_distribution: BTreeMap<String, u64>,
224        outcome_distribution: BTreeMap<String, u64>,
225    }
226
227    let mut by_agent: BTreeMap<String, RunningSummary> = BTreeMap::new();
228    for record in records {
229        let entry = by_agent.entry(record.agent.clone()).or_default();
230        entry.total += 1;
231        if record.outcome == TrustOutcome::Success {
232            entry.successes += 1;
233        }
234        if let Some(cost_usd) = record.cost_usd {
235            entry.cost_sum += cost_usd;
236            entry.cost_count += 1;
237        }
238        *entry
239            .tier_distribution
240            .entry(record.autonomy_tier.as_str().to_string())
241            .or_default() += 1;
242        *entry
243            .outcome_distribution
244            .entry(record.outcome.as_str().to_string())
245            .or_default() += 1;
246    }
247
248    by_agent
249        .into_iter()
250        .map(|(agent, summary)| TrustAgentSummary {
251            agent,
252            total: summary.total,
253            success_rate: if summary.total == 0 {
254                0.0
255            } else {
256                summary.successes as f64 / summary.total as f64
257            },
258            mean_cost_usd: (summary.cost_count > 0)
259                .then_some(summary.cost_sum / summary.cost_count as f64),
260            tier_distribution: summary.tier_distribution,
261            outcome_distribution: summary.outcome_distribution,
262        })
263        .collect()
264}
265
266pub async fn resolve_agent_autonomy_tier(
267    log: &Arc<AnyEventLog>,
268    agent: &str,
269    default: AutonomyTier,
270) -> Result<AutonomyTier, LogError> {
271    let records = query_trust_records(
272        log,
273        &TrustQueryFilters {
274            agent: Some(agent.to_string()),
275            ..TrustQueryFilters::default()
276        },
277    )
278    .await?;
279    let mut current = default;
280    for record in records {
281        if matches!(record.action.as_str(), "trust.promote" | "trust.demote")
282            && record.outcome == TrustOutcome::Success
283        {
284            current = record.autonomy_tier;
285        }
286    }
287    Ok(current)
288}
289
290fn matches_filters(record: &TrustRecord, filters: &TrustQueryFilters) -> bool {
291    if let Some(agent) = filters.agent.as_deref() {
292        if record.agent != agent {
293            return false;
294        }
295    }
296    if let Some(action) = filters.action.as_deref() {
297        if record.action != action {
298            return false;
299        }
300    }
301    if let Some(since) = filters.since {
302        if record.timestamp < since {
303            return false;
304        }
305    }
306    if let Some(until) = filters.until {
307        if record.timestamp > until {
308            return false;
309        }
310    }
311    if let Some(tier) = filters.tier {
312        if record.autonomy_tier != tier {
313            return false;
314        }
315    }
316    if let Some(outcome) = filters.outcome {
317        if record.outcome != outcome {
318            return false;
319        }
320    }
321    true
322}
323
324fn query_topic(filters: &TrustQueryFilters) -> Result<Topic, LogError> {
325    match filters.agent.as_deref() {
326        Some(agent) => topic_for_agent(agent),
327        None => global_topic(),
328    }
329}
330
/// Truncates `records` in place so that at most `limit` trailing entries remain.
///
/// Callers in this module sort records oldest-first before calling, so dropping from the front
/// keeps the newest entries. `None` leaves the vector untouched; `Some(0)` empties it.
/// Generic over the element type so the policy is independent of [`TrustRecord`].
fn apply_record_limit<T>(records: &mut Vec<T>, limit: Option<usize>) {
    let Some(limit) = limit else {
        return;
    };
    // Number of leading (oldest) entries to discard; zero when already within the limit.
    let excess = records.len().saturating_sub(limit);
    records.drain(..excess);
}
341
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event_log::MemoryEventLog;
    use time::Duration;

    /// Fresh in-memory event log so each test starts from an empty history.
    fn memory_log() -> Arc<AnyEventLog> {
        Arc::new(AnyEventLog::Memory(MemoryEventLog::new(16)))
    }

    #[tokio::test]
    async fn append_and_query_round_trip() {
        let log = memory_log();
        let mut record = TrustRecord::new(
            "github-triage-bot",
            "github.issue.opened",
            Some("reviewer".to_string()),
            TrustOutcome::Success,
            "trace-1",
            AutonomyTier::ActWithApproval,
        );
        record.cost_usd = Some(1.25);
        append_trust_record(&log, &record).await.unwrap();

        let records = query_trust_records(
            &log,
            &TrustQueryFilters {
                agent: Some("github-triage-bot".to_string()),
                ..TrustQueryFilters::default()
            },
        )
        .await
        .unwrap();

        assert_eq!(records.len(), 1);
        assert_eq!(records[0].record_id, record.record_id);
        assert_eq!(records[0].agent, "github-triage-bot");
        assert_eq!(records[0].cost_usd, Some(1.25));
    }

    #[tokio::test]
    async fn resolve_autonomy_tier_prefers_latest_control_record() {
        let log = memory_log();
        append_trust_record(
            &log,
            &TrustRecord::new(
                "bot",
                "trust.promote",
                None,
                TrustOutcome::Success,
                "trace-1",
                AutonomyTier::ActWithApproval,
            ),
        )
        .await
        .unwrap();
        append_trust_record(
            &log,
            &TrustRecord::new(
                "bot",
                "trust.demote",
                None,
                TrustOutcome::Success,
                "trace-2",
                AutonomyTier::Shadow,
            ),
        )
        .await
        .unwrap();

        let tier = resolve_agent_autonomy_tier(&log, "bot", AutonomyTier::ActAuto)
            .await
            .unwrap();
        assert_eq!(tier, AutonomyTier::Shadow);
    }

    #[tokio::test]
    async fn resolve_autonomy_tier_falls_back_without_successful_control_record() {
        let log = memory_log();
        // A denied promotion must not change the tier.
        append_trust_record(
            &log,
            &TrustRecord::new(
                "bot",
                "trust.promote",
                None,
                TrustOutcome::Denied,
                "trace-1",
                AutonomyTier::ActAuto,
            ),
        )
        .await
        .unwrap();
        // A successful non-control action must not change the tier either.
        append_trust_record(
            &log,
            &TrustRecord::new(
                "bot",
                "github.issue.opened",
                None,
                TrustOutcome::Success,
                "trace-2",
                AutonomyTier::Suggest,
            ),
        )
        .await
        .unwrap();

        let tier = resolve_agent_autonomy_tier(&log, "bot", AutonomyTier::Shadow)
            .await
            .unwrap();
        assert_eq!(tier, AutonomyTier::Shadow);
    }

    #[tokio::test]
    async fn query_limit_keeps_newest_matching_records() {
        let log = memory_log();
        let base = OffsetDateTime::now_utc();
        for (offset, action) in ["first", "second", "third"].into_iter().enumerate() {
            let mut record = TrustRecord::new(
                "bot",
                action,
                None,
                TrustOutcome::Success,
                format!("trace-{action}"),
                AutonomyTier::ActAuto,
            );
            record.timestamp = base + Duration::seconds(offset as i64);
            append_trust_record(&log, &record).await.unwrap();
        }

        let records = query_trust_records(
            &log,
            &TrustQueryFilters {
                agent: Some("bot".to_string()),
                limit: Some(2),
                ..TrustQueryFilters::default()
            },
        )
        .await
        .unwrap();

        assert_eq!(records.len(), 2);
        assert_eq!(records[0].action, "second");
        assert_eq!(records[1].action, "third");
    }

    #[tokio::test]
    async fn query_filters_by_outcome_and_time_window_on_global_topic() {
        let log = memory_log();
        let base = OffsetDateTime::now_utc();
        let outcomes = [
            TrustOutcome::Success,
            TrustOutcome::Failure,
            TrustOutcome::Success,
        ];
        for (offset, outcome) in outcomes.into_iter().enumerate() {
            let mut record = TrustRecord::new(
                "bot",
                format!("step-{offset}"),
                None,
                outcome,
                format!("trace-{offset}"),
                AutonomyTier::ActAuto,
            );
            record.timestamp = base + Duration::seconds(offset as i64);
            append_trust_record(&log, &record).await.unwrap();
        }

        // No agent filter, so this reads the global topic.
        let records = query_trust_records(
            &log,
            &TrustQueryFilters {
                outcome: Some(TrustOutcome::Success),
                since: Some(base + Duration::seconds(1)),
                ..TrustQueryFilters::default()
            },
        )
        .await
        .unwrap();

        assert_eq!(records.len(), 1);
        assert_eq!(records[0].action, "step-2");
    }

    #[test]
    fn group_by_trace_preserves_chronological_group_order() {
        let make_record = |trace_id: &str, action: &str| {
            TrustRecord::new(
                "bot",
                action,
                None,
                TrustOutcome::Success,
                trace_id,
                AutonomyTier::ActAuto,
            )
        };
        let grouped = group_trust_records_by_trace(&[
            make_record("trace-1", "first"),
            make_record("trace-2", "second"),
            make_record("trace-1", "third"),
        ]);

        assert_eq!(grouped.len(), 2);
        assert_eq!(grouped[0].trace_id, "trace-1");
        assert_eq!(grouped[0].records.len(), 2);
        assert_eq!(grouped[0].records[1].action, "third");
        assert_eq!(grouped[1].trace_id, "trace-2");
    }

    #[test]
    fn summarize_reports_rates_costs_and_distributions() {
        let mut first = TrustRecord::new(
            "bot",
            "a",
            None,
            TrustOutcome::Success,
            "t1",
            AutonomyTier::ActAuto,
        );
        first.cost_usd = Some(1.0);
        let mut second = TrustRecord::new(
            "bot",
            "b",
            None,
            TrustOutcome::Failure,
            "t2",
            AutonomyTier::Shadow,
        );
        second.cost_usd = Some(3.0);
        let third = TrustRecord::new(
            "other",
            "c",
            None,
            TrustOutcome::Denied,
            "t3",
            AutonomyTier::Suggest,
        );

        let summaries = summarize_trust_records(&[first, second, third]);

        assert_eq!(summaries.len(), 2);
        assert_eq!(summaries[0].agent, "bot");
        assert_eq!(summaries[0].total, 2);
        assert_eq!(summaries[0].success_rate, 0.5);
        assert_eq!(summaries[0].mean_cost_usd, Some(2.0));
        assert_eq!(summaries[0].tier_distribution.get("act_auto"), Some(&1));
        assert_eq!(summaries[0].tier_distribution.get("shadow"), Some(&1));
        assert_eq!(summaries[0].outcome_distribution.get("success"), Some(&1));
        assert_eq!(summaries[0].outcome_distribution.get("failure"), Some(&1));
        assert_eq!(summaries[1].agent, "other");
        assert_eq!(summaries[1].total, 1);
        assert_eq!(summaries[1].success_rate, 0.0);
        assert_eq!(summaries[1].mean_cost_usd, None);
    }
}