1use std::collections::{BTreeMap, HashMap};
2use std::sync::Arc;
3
4use serde::{Deserialize, Serialize};
5use time::OffsetDateTime;
6use uuid::Uuid;
7
8use crate::event_log::{
9 active_event_log, sanitize_topic_component, AnyEventLog, EventLog, LogError, LogEvent, Topic,
10};
11
/// Schema identifier written into the `schema` field of records built by `TrustRecord::new`.
pub const OPENTRUSTGRAPH_SCHEMA_V0: &str = "opentrustgraph/v0";

/// Name of the event-log topic that receives every appended trust record.
pub const TRUST_GRAPH_GLOBAL_TOPIC: &str = "trust.graph";

/// Prefix to which a sanitized agent name is appended to form that agent's own topic name.
pub const TRUST_GRAPH_TOPIC_PREFIX: &str = "trust.graph.";
15
16#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
17#[serde(rename_all = "snake_case")]
18pub enum AutonomyTier {
19 Shadow,
20 Suggest,
21 ActWithApproval,
22 #[default]
23 ActAuto,
24}
25
26impl AutonomyTier {
27 pub fn as_str(self) -> &'static str {
28 match self {
29 Self::Shadow => "shadow",
30 Self::Suggest => "suggest",
31 Self::ActWithApproval => "act_with_approval",
32 Self::ActAuto => "act_auto",
33 }
34 }
35}
36
37#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
38#[serde(rename_all = "snake_case")]
39pub enum TrustOutcome {
40 Success,
41 Failure,
42 Denied,
43 Timeout,
44}
45
46impl TrustOutcome {
47 pub fn as_str(self) -> &'static str {
48 match self {
49 Self::Success => "success",
50 Self::Failure => "failure",
51 Self::Denied => "denied",
52 Self::Timeout => "timeout",
53 }
54 }
55}
56
57#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
58pub struct TrustRecord {
59 pub schema: String,
60 pub record_id: String,
61 pub agent: String,
62 pub action: String,
63 pub approver: Option<String>,
64 pub outcome: TrustOutcome,
65 pub trace_id: String,
66 pub autonomy_tier: AutonomyTier,
67 #[serde(with = "time::serde::rfc3339")]
68 pub timestamp: OffsetDateTime,
69 pub cost_usd: Option<f64>,
70 #[serde(default)]
71 pub metadata: BTreeMap<String, serde_json::Value>,
72}
73
74impl TrustRecord {
75 pub fn new(
76 agent: impl Into<String>,
77 action: impl Into<String>,
78 approver: Option<String>,
79 outcome: TrustOutcome,
80 trace_id: impl Into<String>,
81 autonomy_tier: AutonomyTier,
82 ) -> Self {
83 Self {
84 schema: OPENTRUSTGRAPH_SCHEMA_V0.to_string(),
85 record_id: Uuid::now_v7().to_string(),
86 agent: agent.into(),
87 action: action.into(),
88 approver,
89 outcome,
90 trace_id: trace_id.into(),
91 autonomy_tier,
92 timestamp: OffsetDateTime::now_utc(),
93 cost_usd: None,
94 metadata: BTreeMap::new(),
95 }
96 }
97}
98
99#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
100#[serde(default)]
101pub struct TrustQueryFilters {
102 pub agent: Option<String>,
103 pub action: Option<String>,
104 #[serde(with = "time::serde::rfc3339::option")]
105 pub since: Option<OffsetDateTime>,
106 #[serde(with = "time::serde::rfc3339::option")]
107 pub until: Option<OffsetDateTime>,
108 pub tier: Option<AutonomyTier>,
109 pub outcome: Option<TrustOutcome>,
110 pub limit: Option<usize>,
111 pub grouped_by_trace: bool,
112}
113
114#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
115#[serde(default)]
116pub struct TrustTraceGroup {
117 pub trace_id: String,
118 pub records: Vec<TrustRecord>,
119}
120
121#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
122#[serde(default)]
123pub struct TrustAgentSummary {
124 pub agent: String,
125 pub total: u64,
126 pub success_rate: f64,
127 pub mean_cost_usd: Option<f64>,
128 pub tier_distribution: BTreeMap<String, u64>,
129 pub outcome_distribution: BTreeMap<String, u64>,
130}
131
132fn global_topic() -> Result<Topic, LogError> {
133 Topic::new(TRUST_GRAPH_GLOBAL_TOPIC)
134}
135
136pub fn topic_for_agent(agent: &str) -> Result<Topic, LogError> {
137 Topic::new(format!(
138 "{TRUST_GRAPH_TOPIC_PREFIX}{}",
139 sanitize_topic_component(agent)
140 ))
141}
142
143pub async fn append_trust_record(
144 log: &Arc<AnyEventLog>,
145 record: &TrustRecord,
146) -> Result<(), LogError> {
147 let payload = serde_json::to_value(record)
148 .map_err(|error| LogError::Serde(format!("trust record encode error: {error}")))?;
149 let mut headers = BTreeMap::new();
150 headers.insert("trace_id".to_string(), record.trace_id.clone());
151 headers.insert("agent".to_string(), record.agent.clone());
152 headers.insert(
153 "autonomy_tier".to_string(),
154 record.autonomy_tier.as_str().to_string(),
155 );
156 headers.insert("outcome".to_string(), record.outcome.as_str().to_string());
157 let event = LogEvent::new("trust_recorded", payload).with_headers(headers);
158 let per_agent = topic_for_agent(&record.agent)?;
159 log.append(&global_topic()?, event.clone()).await?;
160 log.append(&per_agent, event).await?;
161 Ok(())
162}
163
164pub async fn append_active_trust_record(record: &TrustRecord) -> Result<(), LogError> {
165 let log = active_event_log()
166 .ok_or_else(|| LogError::Config("trust graph requires an active event log".to_string()))?;
167 append_trust_record(&log, record).await
168}
169
170pub async fn query_trust_records(
171 log: &Arc<AnyEventLog>,
172 filters: &TrustQueryFilters,
173) -> Result<Vec<TrustRecord>, LogError> {
174 let topic = query_topic(filters)?;
175 let events = log.read_range(&topic, None, usize::MAX).await?;
176 let mut records = Vec::new();
177 for (_, event) in events {
178 if event.kind != "trust_recorded" {
179 continue;
180 }
181 let Ok(record) = serde_json::from_value::<TrustRecord>(event.payload) else {
182 continue;
183 };
184 if !matches_filters(&record, filters) {
185 continue;
186 }
187 records.push(record);
188 }
189 records.sort_by(|left, right| {
190 left.timestamp
191 .cmp(&right.timestamp)
192 .then(left.agent.cmp(&right.agent))
193 .then(left.record_id.cmp(&right.record_id))
194 });
195 apply_record_limit(&mut records, filters.limit);
196 Ok(records)
197}
198
199pub fn group_trust_records_by_trace(records: &[TrustRecord]) -> Vec<TrustTraceGroup> {
200 let mut groups: Vec<TrustTraceGroup> = Vec::new();
201 let mut positions: HashMap<String, usize> = HashMap::new();
202 for record in records {
203 if let Some(index) = positions.get(record.trace_id.as_str()).copied() {
204 groups[index].records.push(record.clone());
205 continue;
206 }
207 positions.insert(record.trace_id.clone(), groups.len());
208 groups.push(TrustTraceGroup {
209 trace_id: record.trace_id.clone(),
210 records: vec![record.clone()],
211 });
212 }
213 groups
214}
215
216pub fn summarize_trust_records(records: &[TrustRecord]) -> Vec<TrustAgentSummary> {
217 #[derive(Default)]
218 struct RunningSummary {
219 total: u64,
220 successes: u64,
221 cost_sum: f64,
222 cost_count: u64,
223 tier_distribution: BTreeMap<String, u64>,
224 outcome_distribution: BTreeMap<String, u64>,
225 }
226
227 let mut by_agent: BTreeMap<String, RunningSummary> = BTreeMap::new();
228 for record in records {
229 let entry = by_agent.entry(record.agent.clone()).or_default();
230 entry.total += 1;
231 if record.outcome == TrustOutcome::Success {
232 entry.successes += 1;
233 }
234 if let Some(cost_usd) = record.cost_usd {
235 entry.cost_sum += cost_usd;
236 entry.cost_count += 1;
237 }
238 *entry
239 .tier_distribution
240 .entry(record.autonomy_tier.as_str().to_string())
241 .or_default() += 1;
242 *entry
243 .outcome_distribution
244 .entry(record.outcome.as_str().to_string())
245 .or_default() += 1;
246 }
247
248 by_agent
249 .into_iter()
250 .map(|(agent, summary)| TrustAgentSummary {
251 agent,
252 total: summary.total,
253 success_rate: if summary.total == 0 {
254 0.0
255 } else {
256 summary.successes as f64 / summary.total as f64
257 },
258 mean_cost_usd: (summary.cost_count > 0)
259 .then_some(summary.cost_sum / summary.cost_count as f64),
260 tier_distribution: summary.tier_distribution,
261 outcome_distribution: summary.outcome_distribution,
262 })
263 .collect()
264}
265
266pub async fn resolve_agent_autonomy_tier(
267 log: &Arc<AnyEventLog>,
268 agent: &str,
269 default: AutonomyTier,
270) -> Result<AutonomyTier, LogError> {
271 let records = query_trust_records(
272 log,
273 &TrustQueryFilters {
274 agent: Some(agent.to_string()),
275 ..TrustQueryFilters::default()
276 },
277 )
278 .await?;
279 let mut current = default;
280 for record in records {
281 if matches!(record.action.as_str(), "trust.promote" | "trust.demote")
282 && record.outcome == TrustOutcome::Success
283 {
284 current = record.autonomy_tier;
285 }
286 }
287 Ok(current)
288}
289
290fn matches_filters(record: &TrustRecord, filters: &TrustQueryFilters) -> bool {
291 if let Some(agent) = filters.agent.as_deref() {
292 if record.agent != agent {
293 return false;
294 }
295 }
296 if let Some(action) = filters.action.as_deref() {
297 if record.action != action {
298 return false;
299 }
300 }
301 if let Some(since) = filters.since {
302 if record.timestamp < since {
303 return false;
304 }
305 }
306 if let Some(until) = filters.until {
307 if record.timestamp > until {
308 return false;
309 }
310 }
311 if let Some(tier) = filters.tier {
312 if record.autonomy_tier != tier {
313 return false;
314 }
315 }
316 if let Some(outcome) = filters.outcome {
317 if record.outcome != outcome {
318 return false;
319 }
320 }
321 true
322}
323
324fn query_topic(filters: &TrustQueryFilters) -> Result<Topic, LogError> {
325 match filters.agent.as_deref() {
326 Some(agent) => topic_for_agent(agent),
327 None => global_topic(),
328 }
329}
330
331fn apply_record_limit(records: &mut Vec<TrustRecord>, limit: Option<usize>) {
332 let Some(limit) = limit else {
333 return;
334 };
335 if records.len() <= limit {
336 return;
337 }
338 let keep_from = records.len() - limit;
339 records.drain(0..keep_from);
340}
341
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event_log::MemoryEventLog;
    use time::Duration;

    /// Creates a fresh in-memory event log for a single test.
    fn memory_log() -> Arc<AnyEventLog> {
        Arc::new(AnyEventLog::Memory(MemoryEventLog::new(16)))
    }

    /// Filters that restrict a query to `agent` and nothing else.
    fn only_agent(agent: &str) -> TrustQueryFilters {
        TrustQueryFilters {
            agent: Some(agent.to_string()),
            ..TrustQueryFilters::default()
        }
    }

    /// A record written for an agent can be read back with its fields intact.
    #[tokio::test]
    async fn append_and_query_round_trip() {
        let log = memory_log();
        let record = TrustRecord {
            cost_usd: Some(1.25),
            ..TrustRecord::new(
                "github-triage-bot",
                "github.issue.opened",
                Some("reviewer".to_string()),
                TrustOutcome::Success,
                "trace-1",
                AutonomyTier::ActWithApproval,
            )
        };
        append_trust_record(&log, &record).await.unwrap();

        let records = query_trust_records(&log, &only_agent("github-triage-bot"))
            .await
            .unwrap();

        assert_eq!(records.len(), 1);
        assert_eq!(records[0].agent, "github-triage-bot");
        assert_eq!(records[0].cost_usd, Some(1.25));
    }

    /// The most recent successful promote/demote decides the resolved tier.
    #[tokio::test]
    async fn resolve_autonomy_tier_prefers_latest_control_record() {
        let log = memory_log();
        let controls = [
            ("trust.promote", "trace-1", AutonomyTier::ActWithApproval),
            ("trust.demote", "trace-2", AutonomyTier::Shadow),
        ];
        for (action, trace_id, tier) in controls {
            let record =
                TrustRecord::new("bot", action, None, TrustOutcome::Success, trace_id, tier);
            append_trust_record(&log, &record).await.unwrap();
        }

        let tier = resolve_agent_autonomy_tier(&log, "bot", AutonomyTier::ActAuto)
            .await
            .unwrap();
        assert_eq!(tier, AutonomyTier::Shadow);
    }

    /// With a limit, the oldest matches are dropped and the newest are kept.
    #[tokio::test]
    async fn query_limit_keeps_newest_matching_records() {
        let log = memory_log();
        let base = OffsetDateTime::now_utc();
        for (seconds, action) in (0_i64..).zip(["first", "second", "third"]) {
            let mut record = TrustRecord::new(
                "bot",
                action,
                None,
                TrustOutcome::Success,
                format!("trace-{action}"),
                AutonomyTier::ActAuto,
            );
            // Space the records one second apart so their order is unambiguous.
            record.timestamp = base + Duration::seconds(seconds);
            append_trust_record(&log, &record).await.unwrap();
        }

        let filters = TrustQueryFilters {
            limit: Some(2),
            ..only_agent("bot")
        };
        let records = query_trust_records(&log, &filters).await.unwrap();

        assert_eq!(records.len(), 2);
        assert_eq!(records[0].action, "second");
        assert_eq!(records[1].action, "third");
    }

    /// Groups follow first-seen trace order and keep their records in input order.
    #[test]
    fn group_by_trace_preserves_chronological_group_order() {
        let bot_record = |trace_id: &str, action: &str| {
            TrustRecord::new(
                "bot",
                action,
                None,
                TrustOutcome::Success,
                trace_id,
                AutonomyTier::ActAuto,
            )
        };
        let input = [
            bot_record("trace-1", "first"),
            bot_record("trace-2", "second"),
            bot_record("trace-1", "third"),
        ];

        let grouped = group_trust_records_by_trace(&input);

        assert_eq!(grouped.len(), 2);
        assert_eq!(grouped[0].trace_id, "trace-1");
        assert_eq!(grouped[0].records.len(), 2);
        assert_eq!(grouped[0].records[1].action, "third");
        assert_eq!(grouped[1].trace_id, "trace-2");
    }
}