1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::sync::Arc;
3
4use serde::{Deserialize, Serialize};
5use sha2::{Digest, Sha256};
6use time::OffsetDateTime;
7use uuid::Uuid;
8
9use crate::event_log::{
10 active_event_log, sanitize_topic_component, AnyEventLog, EventId, EventLog, LogError, LogEvent,
11 Topic,
12};
13use crate::orchestration::CapabilityPolicy;
14
/// Schema identifier written into the `schema` field by [`TrustRecord::new`].
pub const OPENTRUSTGRAPH_SCHEMA_V0: &str = "opentrustgraph/v0";
/// Name of the global trust-graph topic; chain reads try this topic first.
pub const TRUST_GRAPH_GLOBAL_TOPIC: &str = "trust_graph";
/// Dotted ("legacy") spelling of the global topic; still appended to and read from.
pub const TRUST_GRAPH_LEGACY_GLOBAL_TOPIC: &str = "trust.graph";
/// Prefix placed before a sanitized agent name to form that agent's topic.
pub const TRUST_GRAPH_TOPIC_PREFIX: &str = "trust_graph.";
/// Dotted ("legacy") prefix for per-agent topics.
pub const TRUST_GRAPH_LEGACY_TOPIC_PREFIX: &str = "trust.graph.";
/// Event `kind` carried by every trust record appended to the log.
pub const TRUST_GRAPH_EVENT_KIND: &str = "trust_recorded";
21
/// Autonomy level attached to a trust record and used to derive a capability policy.
///
/// Serialized in `snake_case`; [`AutonomyTier::ActAuto`] is the `Default`.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum AutonomyTier {
    /// Maps to side-effect level `"none"` and a recursion limit of 0 in
    /// [`policy_for_autonomy_tier`].
    Shadow,
    /// Maps to side-effect level `"read_only"`.
    Suggest,
    /// Maps to side-effect level `"read_only"`.
    ActWithApproval,
    /// Maps to side-effect level `"network"`.
    #[default]
    ActAuto,
}
31
32impl AutonomyTier {
33 pub fn as_str(self) -> &'static str {
34 match self {
35 Self::Shadow => "shadow",
36 Self::Suggest => "suggest",
37 Self::ActWithApproval => "act_with_approval",
38 Self::ActAuto => "act_auto",
39 }
40 }
41}
42
/// Result of an action recorded in the trust graph. Serialized in `snake_case`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TrustOutcome {
    /// The only outcome that counts towards a success rate.
    Success,
    /// Counted as a failure in [`TrustScore`].
    Failure,
    /// Counted as denied in [`TrustScore`].
    Denied,
    /// Counted as a timeout in [`TrustScore`].
    Timeout,
}
51
52impl TrustOutcome {
53 pub fn as_str(self) -> &'static str {
54 match self {
55 Self::Success => "success",
56 Self::Failure => "failure",
57 Self::Denied => "denied",
58 Self::Timeout => "timeout",
59 }
60 }
61}
62
/// One entry in the trust graph: who did what, with which outcome, plus the
/// hash-chain fields that link it to the preceding entry.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct TrustRecord {
    /// Schema identifier; [`TrustRecord::new`] sets it to [`OPENTRUSTGRAPH_SCHEMA_V0`].
    pub schema: String,
    /// Unique identifier of the record (a UUIDv7 string when built with `new`).
    pub record_id: String,
    /// Name of the agent the record is about; also selects the per-agent topic.
    pub agent: String,
    /// Name of the action that was performed.
    pub action: String,
    /// Optional approver of the action.
    pub approver: Option<String>,
    /// How the action ended.
    pub outcome: TrustOutcome,
    /// Trace identifier, used to group related records.
    pub trace_id: String,
    /// Autonomy tier attached to the record.
    pub autonomy_tier: AutonomyTier,
    /// Time of the record, serialized as RFC 3339.
    #[serde(with = "time::serde::rfc3339")]
    pub timestamp: OffsetDateTime,
    /// Optional cost of the action in USD.
    pub cost_usd: Option<f64>,
    /// 1-based position in the chain, assigned when the record is appended;
    /// defaults to 0 when absent from the serialized form.
    #[serde(default)]
    pub chain_index: u64,
    /// `entry_hash` of the preceding chain record, or `None` for the first one.
    #[serde(default)]
    pub previous_hash: Option<String>,
    /// `sha256:`-prefixed hash of this record computed with `entry_hash` left out;
    /// empty until the record is finalized.
    #[serde(default)]
    pub entry_hash: String,
    /// Free-form additional data.
    #[serde(default)]
    pub metadata: BTreeMap<String, serde_json::Value>,
}
85
86impl TrustRecord {
87 pub fn new(
88 agent: impl Into<String>,
89 action: impl Into<String>,
90 approver: Option<String>,
91 outcome: TrustOutcome,
92 trace_id: impl Into<String>,
93 autonomy_tier: AutonomyTier,
94 ) -> Self {
95 Self {
96 schema: OPENTRUSTGRAPH_SCHEMA_V0.to_string(),
97 record_id: Uuid::now_v7().to_string(),
98 agent: agent.into(),
99 action: action.into(),
100 approver,
101 outcome,
102 trace_id: trace_id.into(),
103 autonomy_tier,
104 timestamp: OffsetDateTime::now_utc(),
105 cost_usd: None,
106 chain_index: 0,
107 previous_hash: None,
108 entry_hash: String::new(),
109 metadata: BTreeMap::new(),
110 }
111 }
112}
113
/// Optional constraints applied by [`query_trust_records`]; unset fields match
/// everything.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct TrustQueryFilters {
    /// Exact agent name; when set, the per-agent topics are read instead of the
    /// global ones.
    pub agent: Option<String>,
    /// Exact action name.
    pub action: Option<String>,
    /// Inclusive lower bound on the record timestamp.
    #[serde(with = "time::serde::rfc3339::option")]
    pub since: Option<OffsetDateTime>,
    /// Inclusive upper bound on the record timestamp.
    #[serde(with = "time::serde::rfc3339::option")]
    pub until: Option<OffsetDateTime>,
    /// Exact autonomy tier.
    pub tier: Option<AutonomyTier>,
    /// Exact outcome.
    pub outcome: Option<TrustOutcome>,
    /// Maximum number of records to return; the newest ones are kept.
    pub limit: Option<usize>,
    /// NOTE(review): not read by the query code in this file; presumably
    /// consumed by callers to decide on trace grouping — confirm.
    pub grouped_by_trace: bool,
}
128
/// Records sharing one trace id, as produced by [`group_trust_records_by_trace`].
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct TrustTraceGroup {
    /// Trace id common to all `records`.
    pub trace_id: String,
    /// Member records, in the order they appeared in the input.
    pub records: Vec<TrustRecord>,
}
135
/// Per-agent aggregate produced by [`summarize_trust_records`].
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct TrustAgentSummary {
    /// Agent the summary is about.
    pub agent: String,
    /// Number of records seen for the agent.
    pub total: u64,
    /// Successful records divided by `total`.
    pub success_rate: f64,
    /// Mean of `cost_usd` over the records that carry a cost; `None` if none do.
    pub mean_cost_usd: Option<f64>,
    /// Record count per autonomy tier, keyed by [`AutonomyTier::as_str`].
    pub tier_distribution: BTreeMap<String, u64>,
    /// Record count per outcome, keyed by [`TrustOutcome::as_str`].
    pub outcome_distribution: BTreeMap<String, u64>,
}
146
/// Outcome tally for an agent (optionally narrowed to one action) together with
/// the capability policy derived from it.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct TrustScore {
    /// Agent the score is about.
    pub agent: String,
    /// Action the score was restricted to, if any.
    pub action: Option<String>,
    /// Number of records counted.
    pub total: u64,
    /// Records with outcome `Success`.
    pub successes: u64,
    /// Records with outcome `Failure`.
    pub failures: u64,
    /// Records with outcome `Denied`.
    pub denied: u64,
    /// Records with outcome `Timeout`.
    pub timeouts: u64,
    /// `successes / total`, or 0.0 when there are no records.
    pub success_rate: f64,
    /// Outcome of the last record counted.
    pub latest_outcome: Option<TrustOutcome>,
    /// Timestamp of the last record counted.
    #[serde(with = "time::serde::rfc3339::option")]
    pub latest_timestamp: Option<OffsetDateTime>,
    /// Autonomy tier the policy is based on.
    pub effective_tier: AutonomyTier,
    /// Capability policy derived from the tier and the tallied outcomes.
    pub policy: CapabilityPolicy,
}
164
/// Result of [`verify_trust_chain`].
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct TrustChainReport {
    /// Name of the topic whose records were checked.
    pub topic: String,
    /// Number of records checked.
    pub total: u64,
    /// `true` when `errors` is empty.
    pub verified: bool,
    /// `entry_hash` of the last record, or `None` for an empty chain.
    pub root_hash: Option<String>,
    /// Id of the first event at which an error had been recorded.
    pub broken_at_event_id: Option<EventId>,
    /// Human-readable description of every inconsistency found.
    pub errors: Vec<String>,
}
175
/// Builds the [`Topic`] named [`TRUST_GRAPH_GLOBAL_TOPIC`].
fn global_topic() -> Result<Topic, LogError> {
    Topic::new(TRUST_GRAPH_GLOBAL_TOPIC)
}
179
/// Builds the [`Topic`] named [`TRUST_GRAPH_LEGACY_GLOBAL_TOPIC`].
fn legacy_global_topic() -> Result<Topic, LogError> {
    Topic::new(TRUST_GRAPH_LEGACY_GLOBAL_TOPIC)
}
183
184pub fn topic_for_agent(agent: &str) -> Result<Topic, LogError> {
185 Topic::new(format!(
186 "{TRUST_GRAPH_TOPIC_PREFIX}{}",
187 sanitize_topic_component(agent)
188 ))
189}
190
191pub fn legacy_topic_for_agent(agent: &str) -> Result<Topic, LogError> {
192 Topic::new(format!(
193 "{TRUST_GRAPH_LEGACY_TOPIC_PREFIX}{}",
194 sanitize_topic_component(agent)
195 ))
196}
197
198pub async fn append_trust_record(
199 log: &Arc<AnyEventLog>,
200 record: &TrustRecord,
201) -> Result<TrustRecord, LogError> {
202 let finalized = finalize_trust_record(log, record.clone()).await?;
203 let payload = serde_json::to_value(&finalized)
204 .map_err(|error| LogError::Serde(format!("trust record encode error: {error}")))?;
205 let mut headers = BTreeMap::new();
206 headers.insert("trace_id".to_string(), finalized.trace_id.clone());
207 headers.insert("agent".to_string(), finalized.agent.clone());
208 headers.insert(
209 "autonomy_tier".to_string(),
210 finalized.autonomy_tier.as_str().to_string(),
211 );
212 headers.insert(
213 "outcome".to_string(),
214 finalized.outcome.as_str().to_string(),
215 );
216 headers.insert("entry_hash".to_string(), finalized.entry_hash.clone());
217 let event = LogEvent::new(TRUST_GRAPH_EVENT_KIND, payload).with_headers(headers);
218 for topic in append_topics_for_record(&finalized)? {
219 log.append(&topic, event.clone()).await?;
220 }
221 Ok(finalized)
222}
223
224pub async fn append_active_trust_record(record: &TrustRecord) -> Result<TrustRecord, LogError> {
225 let log = active_event_log()
226 .ok_or_else(|| LogError::Config("trust graph requires an active event log".to_string()))?;
227 append_trust_record(&log, record).await
228}
229
230pub async fn query_trust_records(
231 log: &Arc<AnyEventLog>,
232 filters: &TrustQueryFilters,
233) -> Result<Vec<TrustRecord>, LogError> {
234 let topics = query_topics(filters)?;
235 let mut records = Vec::new();
236 let mut seen = HashSet::new();
237 for topic in topics {
238 for (_, event) in log.read_range(&topic, None, usize::MAX).await? {
239 if event.kind != TRUST_GRAPH_EVENT_KIND {
240 continue;
241 }
242 let Ok(record) = serde_json::from_value::<TrustRecord>(event.payload) else {
243 continue;
244 };
245 if !matches_filters(&record, filters) {
246 continue;
247 }
248 let dedupe_key = trust_record_dedupe_key(&record);
249 if seen.insert(dedupe_key) {
250 records.push(record);
251 }
252 }
253 }
254 records.sort_by(|left, right| {
255 left.timestamp
256 .cmp(&right.timestamp)
257 .then(left.chain_index.cmp(&right.chain_index))
258 .then(left.agent.cmp(&right.agent))
259 .then(left.record_id.cmp(&right.record_id))
260 });
261 apply_record_limit(&mut records, filters.limit);
262 Ok(records)
263}
264
265pub async fn trust_score_for(
266 log: &Arc<AnyEventLog>,
267 agent: &str,
268 action: Option<&str>,
269) -> Result<TrustScore, LogError> {
270 let records = query_trust_records(
271 log,
272 &TrustQueryFilters {
273 agent: Some(agent.to_string()),
274 action: action.map(ToString::to_string),
275 ..TrustQueryFilters::default()
276 },
277 )
278 .await?;
279 let effective_tier = resolve_agent_autonomy_tier(log, agent, AutonomyTier::ActAuto).await?;
280 Ok(score_from_records(agent, action, effective_tier, &records))
281}
282
283pub async fn policy_for_agent(
284 log: &Arc<AnyEventLog>,
285 agent: &str,
286) -> Result<CapabilityPolicy, LogError> {
287 Ok(trust_score_for(log, agent, None).await?.policy)
288}
289
290pub async fn verify_trust_chain(log: &Arc<AnyEventLog>) -> Result<TrustChainReport, LogError> {
291 let (topic, records) = preferred_chain_records(log).await?;
292 let mut previous_hash: Option<String> = None;
293 let mut errors = Vec::new();
294 let mut broken_at_event_id = None;
295
296 for (position, (event_id, record)) in records.iter().enumerate() {
297 let expected_index = (position as u64) + 1;
298 if record.chain_index != expected_index {
299 errors.push(format!(
300 "event {event_id}: expected chain_index {expected_index}, found {}",
301 record.chain_index
302 ));
303 }
304 if record.previous_hash != previous_hash {
305 errors.push(format!(
306 "event {event_id}: previous_hash mismatch; expected {:?}, found {:?}",
307 previous_hash, record.previous_hash
308 ));
309 }
310 match compute_trust_record_hash(record) {
311 Ok(expected_hash) if expected_hash == record.entry_hash => {}
312 Ok(expected_hash) => errors.push(format!(
313 "event {event_id}: entry_hash mismatch; expected {expected_hash}, found {}",
314 record.entry_hash
315 )),
316 Err(error) => errors.push(format!("event {event_id}: {error}")),
317 }
318 if !errors.is_empty() && broken_at_event_id.is_none() {
319 broken_at_event_id = Some(*event_id);
320 }
321 previous_hash = Some(record.entry_hash.clone());
322 }
323
324 Ok(TrustChainReport {
325 topic: topic.as_str().to_string(),
326 total: records.len() as u64,
327 verified: errors.is_empty(),
328 root_hash: records.last().map(|(_, record)| record.entry_hash.clone()),
329 broken_at_event_id,
330 errors,
331 })
332}
333
334pub fn compute_trust_record_hash(record: &TrustRecord) -> Result<String, LogError> {
335 let mut value = serde_json::to_value(record)
336 .map_err(|error| LogError::Serde(format!("trust record hash encode error: {error}")))?;
337 if let Some(object) = value.as_object_mut() {
338 object.remove("entry_hash");
339 }
340 let canonical = serde_json::to_string(&value)
341 .map_err(|error| LogError::Serde(format!("trust record canonicalize error: {error}")))?;
342 let digest = Sha256::digest(canonical.as_bytes());
343 Ok(format!("sha256:{}", hex::encode(digest)))
344}
345
346pub fn group_trust_records_by_trace(records: &[TrustRecord]) -> Vec<TrustTraceGroup> {
347 let mut groups: Vec<TrustTraceGroup> = Vec::new();
348 let mut positions: HashMap<String, usize> = HashMap::new();
349 for record in records {
350 if let Some(index) = positions.get(record.trace_id.as_str()).copied() {
351 groups[index].records.push(record.clone());
352 continue;
353 }
354 positions.insert(record.trace_id.clone(), groups.len());
355 groups.push(TrustTraceGroup {
356 trace_id: record.trace_id.clone(),
357 records: vec![record.clone()],
358 });
359 }
360 groups
361}
362
363pub fn summarize_trust_records(records: &[TrustRecord]) -> Vec<TrustAgentSummary> {
364 #[derive(Default)]
365 struct RunningSummary {
366 total: u64,
367 successes: u64,
368 cost_sum: f64,
369 cost_count: u64,
370 tier_distribution: BTreeMap<String, u64>,
371 outcome_distribution: BTreeMap<String, u64>,
372 }
373
374 let mut by_agent: BTreeMap<String, RunningSummary> = BTreeMap::new();
375 for record in records {
376 let entry = by_agent.entry(record.agent.clone()).or_default();
377 entry.total += 1;
378 if record.outcome == TrustOutcome::Success {
379 entry.successes += 1;
380 }
381 if let Some(cost_usd) = record.cost_usd {
382 entry.cost_sum += cost_usd;
383 entry.cost_count += 1;
384 }
385 *entry
386 .tier_distribution
387 .entry(record.autonomy_tier.as_str().to_string())
388 .or_default() += 1;
389 *entry
390 .outcome_distribution
391 .entry(record.outcome.as_str().to_string())
392 .or_default() += 1;
393 }
394
395 by_agent
396 .into_iter()
397 .map(|(agent, summary)| TrustAgentSummary {
398 agent,
399 total: summary.total,
400 success_rate: if summary.total == 0 {
401 0.0
402 } else {
403 summary.successes as f64 / summary.total as f64
404 },
405 mean_cost_usd: (summary.cost_count > 0)
406 .then_some(summary.cost_sum / summary.cost_count as f64),
407 tier_distribution: summary.tier_distribution,
408 outcome_distribution: summary.outcome_distribution,
409 })
410 .collect()
411}
412
413pub async fn resolve_agent_autonomy_tier(
414 log: &Arc<AnyEventLog>,
415 agent: &str,
416 default: AutonomyTier,
417) -> Result<AutonomyTier, LogError> {
418 let records = query_trust_records(
419 log,
420 &TrustQueryFilters {
421 agent: Some(agent.to_string()),
422 ..TrustQueryFilters::default()
423 },
424 )
425 .await?;
426 let mut current = default;
427 for record in records {
428 if matches!(record.action.as_str(), "trust.promote" | "trust.demote")
429 && record.outcome == TrustOutcome::Success
430 {
431 current = record.autonomy_tier;
432 }
433 }
434 Ok(current)
435}
436
437fn matches_filters(record: &TrustRecord, filters: &TrustQueryFilters) -> bool {
438 if let Some(agent) = filters.agent.as_deref() {
439 if record.agent != agent {
440 return false;
441 }
442 }
443 if let Some(action) = filters.action.as_deref() {
444 if record.action != action {
445 return false;
446 }
447 }
448 if let Some(since) = filters.since {
449 if record.timestamp < since {
450 return false;
451 }
452 }
453 if let Some(until) = filters.until {
454 if record.timestamp > until {
455 return false;
456 }
457 }
458 if let Some(tier) = filters.tier {
459 if record.autonomy_tier != tier {
460 return false;
461 }
462 }
463 if let Some(outcome) = filters.outcome {
464 if record.outcome != outcome {
465 return false;
466 }
467 }
468 true
469}
470
471fn query_topics(filters: &TrustQueryFilters) -> Result<Vec<Topic>, LogError> {
472 match filters.agent.as_deref() {
473 Some(agent) => unique_topics(vec![
474 topic_for_agent(agent)?,
475 legacy_topic_for_agent(agent)?,
476 ]),
477 None => unique_topics(vec![global_topic()?, legacy_global_topic()?]),
478 }
479}
480
481fn append_topics_for_record(record: &TrustRecord) -> Result<Vec<Topic>, LogError> {
482 unique_topics(vec![
483 global_topic()?,
484 legacy_global_topic()?,
485 topic_for_agent(&record.agent)?,
486 legacy_topic_for_agent(&record.agent)?,
487 ])
488}
489
490fn unique_topics(topics: Vec<Topic>) -> Result<Vec<Topic>, LogError> {
491 let mut seen = HashSet::new();
492 Ok(topics
493 .into_iter()
494 .filter(|topic| seen.insert(topic.as_str().to_string()))
495 .collect())
496}
497
498async fn finalize_trust_record(
499 log: &Arc<AnyEventLog>,
500 mut record: TrustRecord,
501) -> Result<TrustRecord, LogError> {
502 let latest = latest_chain_record(log).await?;
503 record.chain_index = latest
504 .as_ref()
505 .map(|(_, record)| record.chain_index.saturating_add(1).max(1))
506 .unwrap_or(1);
507 record.previous_hash = latest.and_then(|(_, record)| {
508 if record.entry_hash.is_empty() {
509 compute_trust_record_hash(&record).ok()
510 } else {
511 Some(record.entry_hash)
512 }
513 });
514 record.entry_hash.clear();
515 record.entry_hash = compute_trust_record_hash(&record)?;
516 Ok(record)
517}
518
519async fn latest_chain_record(
520 log: &Arc<AnyEventLog>,
521) -> Result<Option<(EventId, TrustRecord)>, LogError> {
522 let (_, records) = preferred_chain_records(log).await?;
523 Ok(records.into_iter().last())
524}
525
526async fn preferred_chain_records(
527 log: &Arc<AnyEventLog>,
528) -> Result<(Topic, Vec<(EventId, TrustRecord)>), LogError> {
529 let canonical = global_topic()?;
530 let canonical_records = read_trust_records_from_topic(log, &canonical).await?;
531 if !canonical_records.is_empty() {
532 return Ok((canonical, canonical_records));
533 }
534 let legacy = legacy_global_topic()?;
535 let legacy_records = read_trust_records_from_topic(log, &legacy).await?;
536 if legacy_records.is_empty() {
537 Ok((canonical, Vec::new()))
538 } else {
539 Ok((legacy, legacy_records))
540 }
541}
542
543async fn read_trust_records_from_topic(
544 log: &Arc<AnyEventLog>,
545 topic: &Topic,
546) -> Result<Vec<(EventId, TrustRecord)>, LogError> {
547 let events = log.read_range(topic, None, usize::MAX).await?;
548 let mut records = Vec::new();
549 let mut seen = HashSet::new();
550 for (event_id, event) in events {
551 if event.kind != TRUST_GRAPH_EVENT_KIND {
552 continue;
553 }
554 let Ok(record) = serde_json::from_value::<TrustRecord>(event.payload) else {
555 continue;
556 };
557 if seen.insert(trust_record_dedupe_key(&record)) {
558 records.push((event_id, record));
559 }
560 }
561 Ok(records)
562}
563
564fn trust_record_dedupe_key(record: &TrustRecord) -> String {
565 if !record.entry_hash.is_empty() {
566 return record.entry_hash.clone();
567 }
568 record.record_id.clone()
569}
570
571fn score_from_records(
572 agent: &str,
573 action: Option<&str>,
574 effective_tier: AutonomyTier,
575 records: &[TrustRecord],
576) -> TrustScore {
577 let mut score = TrustScore {
578 agent: agent.to_string(),
579 action: action.map(ToString::to_string),
580 effective_tier,
581 ..TrustScore::default()
582 };
583 for record in records {
584 score.total += 1;
585 match record.outcome {
586 TrustOutcome::Success => score.successes += 1,
587 TrustOutcome::Failure => score.failures += 1,
588 TrustOutcome::Denied => score.denied += 1,
589 TrustOutcome::Timeout => score.timeouts += 1,
590 }
591 score.latest_outcome = Some(record.outcome);
592 score.latest_timestamp = Some(record.timestamp);
593 }
594 score.success_rate = if score.total == 0 {
595 0.0
596 } else {
597 score.successes as f64 / score.total as f64
598 };
599 score.policy = policy_from_score(&score);
600 score
601}
602
603fn policy_from_score(score: &TrustScore) -> CapabilityPolicy {
604 let mut policy = policy_for_autonomy_tier(score.effective_tier);
605 let latest_bad = matches!(
606 score.latest_outcome,
607 Some(TrustOutcome::Denied | TrustOutcome::Failure | TrustOutcome::Timeout)
608 );
609 if latest_bad || (score.total >= 3 && score.success_rate < 0.5) {
610 policy.side_effect_level = Some("read_only".to_string());
611 }
612 policy
613}
614
615pub fn policy_for_autonomy_tier(tier: AutonomyTier) -> CapabilityPolicy {
616 CapabilityPolicy {
617 side_effect_level: Some(
618 match tier {
619 AutonomyTier::Shadow => "none",
620 AutonomyTier::Suggest => "read_only",
621 AutonomyTier::ActWithApproval => "read_only",
622 AutonomyTier::ActAuto => "network",
623 }
624 .to_string(),
625 ),
626 recursion_limit: matches!(tier, AutonomyTier::Shadow).then_some(0),
627 ..CapabilityPolicy::default()
628 }
629}
630
631fn apply_record_limit(records: &mut Vec<TrustRecord>, limit: Option<usize>) {
632 let Some(limit) = limit else {
633 return;
634 };
635 if records.len() <= limit {
636 return;
637 }
638 let keep_from = records.len() - limit;
639 records.drain(0..keep_from);
640}
641
642#[cfg(test)]
643mod tests {
644 use super::*;
645 use crate::event_log::MemoryEventLog;
646 use time::Duration;
647
    // Spec artefacts embedded at compile time; paths are relative to this file.

    /// JSON schema for a single trust record.
    const RECORD_SCHEMA_JSON: &str =
        include_str!("../../../opentrustgraph-spec/schemas/trust-record.v0.schema.json");
    /// JSON schema for a trust chain document.
    const CHAIN_SCHEMA_JSON: &str =
        include_str!("../../../opentrustgraph-spec/schemas/trust-chain.v0.schema.json");
    /// Fixture expected to validate without errors.
    const VALID_DECISION_CHAIN_JSON: &str =
        include_str!("../../../opentrustgraph-spec/fixtures/valid/decision-chain.json");
    /// Fixture expected to validate without errors.
    const VALID_TIER_TRANSITION_JSON: &str =
        include_str!("../../../opentrustgraph-spec/fixtures/valid/tier-transition.json");
    /// Fixture expected to fail with a `previous_hash` mismatch.
    const INVALID_TAMPERED_CHAIN_JSON: &str =
        include_str!("../../../opentrustgraph-spec/fixtures/invalid/tampered-chain.json");
    /// Fixture expected to fail with an "approval required" error.
    const INVALID_MISSING_APPROVAL_JSON: &str =
        include_str!("../../../opentrustgraph-spec/fixtures/invalid/missing-approval.json");
660
    /// Shape of a chain fixture file: a schema tag, chain metadata and the
    /// records themselves.
    #[derive(Debug, serde::Deserialize)]
    struct TrustChainFixture {
        // Expected to be "opentrustgraph-chain/v0" by `validate_chain_fixture`.
        schema: String,
        chain: TrustChainFixtureMetadata,
        records: Vec<TrustRecord>,
    }
667
    /// The `chain` object of a fixture file; every field is cross-checked by
    /// `validate_chain_fixture` or `validate_fixture_hash_chain`.
    #[derive(Debug, serde::Deserialize)]
    struct TrustChainFixtureMetadata {
        topic: String,
        // Must equal the number of records in the fixture.
        total: u64,
        // Must equal the last record's `entry_hash`.
        root_hash: Option<String>,
        // Must be true exactly when validation finds no errors.
        verified: bool,
        // Must parse as RFC 3339.
        generated_at: String,
        // Must contain a non-empty string under "name".
        producer: BTreeMap<String, serde_json::Value>,
    }
677
    /// A record appended to the log comes back from an agent-filtered query with
    /// its payload intact and its chain fields populated.
    #[tokio::test]
    async fn append_and_query_round_trip() {
        let log: Arc<AnyEventLog> = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(16)));
        let mut record = TrustRecord::new(
            "github-triage-bot",
            "github.issue.opened",
            Some("reviewer".to_string()),
            TrustOutcome::Success,
            "trace-1",
            AutonomyTier::ActWithApproval,
        );
        record.cost_usd = Some(1.25);
        append_trust_record(&log, &record).await.unwrap();

        let records = query_trust_records(
            &log,
            &TrustQueryFilters {
                agent: Some("github-triage-bot".to_string()),
                ..TrustQueryFilters::default()
            },
        )
        .await
        .unwrap();

        // Exactly one copy even though the record was written to several topics.
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].agent, "github-triage-bot");
        assert_eq!(records[0].cost_usd, Some(1.25));
        // First record of the chain: index 1 and no predecessor.
        assert_eq!(records[0].chain_index, 1);
        assert!(records[0].previous_hash.is_none());
        assert!(records[0].entry_hash.starts_with("sha256:"));
    }
709
    /// A two-record chain verifies; after a copy of the second record with forged
    /// hashes is appended directly to the global topic, verification fails with a
    /// `previous_hash` mismatch.
    #[tokio::test]
    async fn verify_chain_detects_hash_tampering() {
        let log: Arc<AnyEventLog> = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(16)));
        let first = append_trust_record(
            &log,
            &TrustRecord::new(
                "bot",
                "first",
                None,
                TrustOutcome::Success,
                "trace-1",
                AutonomyTier::Suggest,
            ),
        )
        .await
        .unwrap();
        let mut second = append_trust_record(
            &log,
            &TrustRecord::new(
                "bot",
                "second",
                None,
                TrustOutcome::Success,
                "trace-2",
                AutonomyTier::Suggest,
            ),
        )
        .await
        .unwrap();

        // Untampered chain: verified, rooted at the second record, correctly linked.
        let report = verify_trust_chain(&log).await.unwrap();
        assert!(report.verified);
        assert_eq!(
            report.root_hash.as_deref(),
            Some(second.entry_hash.as_str())
        );
        assert_eq!(
            second.previous_hash.as_deref(),
            Some(first.entry_hash.as_str())
        );

        // Forge both hashes and append the copy, bypassing `append_trust_record`.
        second.previous_hash = Some(
            "sha256:0000000000000000000000000000000000000000000000000000000000000000".to_string(),
        );
        second.entry_hash =
            "sha256:1111111111111111111111111111111111111111111111111111111111111111".to_string();
        log.append(
            &global_topic().unwrap(),
            LogEvent::new(
                TRUST_GRAPH_EVENT_KIND,
                serde_json::to_value(second).unwrap(),
            ),
        )
        .await
        .unwrap();
        let report = verify_trust_chain(&log).await.unwrap();
        assert!(!report.verified);
        assert!(report
            .errors
            .iter()
            .any(|error| error.contains("previous_hash mismatch")));
    }
772
    /// With a successful promote followed by a successful demote, the tier of the
    /// later record (Shadow) is the resolved tier, not the supplied default.
    #[tokio::test]
    async fn resolve_autonomy_tier_prefers_latest_control_record() {
        let log: Arc<AnyEventLog> = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(16)));
        append_trust_record(
            &log,
            &TrustRecord::new(
                "bot",
                "trust.promote",
                None,
                TrustOutcome::Success,
                "trace-1",
                AutonomyTier::ActWithApproval,
            ),
        )
        .await
        .unwrap();
        append_trust_record(
            &log,
            &TrustRecord::new(
                "bot",
                "trust.demote",
                None,
                TrustOutcome::Success,
                "trace-2",
                AutonomyTier::Shadow,
            ),
        )
        .await
        .unwrap();

        let tier = resolve_agent_autonomy_tier(&log, "bot", AutonomyTier::ActAuto)
            .await
            .unwrap();
        assert_eq!(tier, AutonomyTier::Shadow);
    }
808
    /// With three records one second apart and `limit: Some(2)`, the query
    /// returns the two newest, still ordered oldest to newest.
    #[tokio::test]
    async fn query_limit_keeps_newest_matching_records() {
        let log: Arc<AnyEventLog> = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(16)));
        let base = OffsetDateTime::from_unix_timestamp(1_775_000_000).unwrap();
        for (offset, action) in ["first", "second", "third"].into_iter().enumerate() {
            let mut record = TrustRecord::new(
                "bot",
                action,
                None,
                TrustOutcome::Success,
                format!("trace-{action}"),
                AutonomyTier::ActAuto,
            );
            // Fixed, strictly increasing timestamps make the ordering deterministic.
            record.timestamp = base + Duration::seconds(offset as i64);
            append_trust_record(&log, &record).await.unwrap();
        }

        let records = query_trust_records(
            &log,
            &TrustQueryFilters {
                agent: Some("bot".to_string()),
                limit: Some(2),
                ..TrustQueryFilters::default()
            },
        )
        .await
        .unwrap();

        assert_eq!(records.len(), 2);
        assert_eq!(records[0].action, "second");
        assert_eq!(records[1].action, "third");
    }
841
842 #[test]
843 fn group_by_trace_preserves_chronological_group_order() {
844 let make_record = |trace_id: &str, action: &str| TrustRecord {
845 trace_id: trace_id.to_string(),
846 action: action.to_string(),
847 ..TrustRecord::new(
848 "bot",
849 action,
850 None,
851 TrustOutcome::Success,
852 trace_id,
853 AutonomyTier::ActAuto,
854 )
855 };
856 let grouped = group_trust_records_by_trace(&[
857 make_record("trace-1", "first"),
858 make_record("trace-2", "second"),
859 make_record("trace-1", "third"),
860 ]);
861
862 assert_eq!(grouped.len(), 2);
863 assert_eq!(grouped[0].trace_id, "trace-1");
864 assert_eq!(grouped[0].records.len(), 2);
865 assert_eq!(grouped[0].records[1].action, "third");
866 assert_eq!(grouped[1].trace_id, "trace-2");
867 }
868
    /// Both schema files parse as JSON, pin the expected `schema` constants, and
    /// list every runtime outcome and autonomy tier in their enums.
    #[test]
    fn opentrustgraph_schema_files_are_parseable_and_match_runtime_enums() {
        let record_schema: serde_json::Value = serde_json::from_str(RECORD_SCHEMA_JSON).unwrap();
        let chain_schema: serde_json::Value = serde_json::from_str(CHAIN_SCHEMA_JSON).unwrap();

        assert_eq!(
            record_schema["properties"]["schema"]["const"],
            serde_json::json!(OPENTRUSTGRAPH_SCHEMA_V0)
        );
        assert_eq!(
            chain_schema["properties"]["schema"]["const"],
            serde_json::json!("opentrustgraph-chain/v0")
        );

        // Every runtime variant must be allowed by the schema (the schema may list more).
        let outcomes = record_schema["properties"]["outcome"]["enum"]
            .as_array()
            .unwrap();
        for outcome in [
            TrustOutcome::Success,
            TrustOutcome::Failure,
            TrustOutcome::Denied,
            TrustOutcome::Timeout,
        ] {
            assert!(outcomes.contains(&serde_json::json!(outcome.as_str())));
        }

        let tiers = record_schema["properties"]["autonomy_tier"]["enum"]
            .as_array()
            .unwrap();
        for tier in [
            AutonomyTier::Shadow,
            AutonomyTier::Suggest,
            AutonomyTier::ActWithApproval,
            AutonomyTier::ActAuto,
        ] {
            assert!(tiers.contains(&serde_json::json!(tier.as_str())));
        }
    }
907
908 #[test]
909 fn opentrustgraph_valid_fixtures_match_runtime_contract() {
910 for (name, fixture) in [
911 ("decision-chain", VALID_DECISION_CHAIN_JSON),
912 ("tier-transition", VALID_TIER_TRANSITION_JSON),
913 ] {
914 let fixture = parse_chain_fixture(fixture);
915 let errors = validate_chain_fixture(&fixture);
916 assert!(errors.is_empty(), "{name} errors: {errors:?}");
917 }
918 }
919
    /// Each invalid fixture fails for the reason it was written to demonstrate.
    #[test]
    fn opentrustgraph_invalid_fixtures_exercise_expected_failures() {
        // Tampered chain: the link is broken, but each record's own hash is intact.
        let tampered = parse_chain_fixture(INVALID_TAMPERED_CHAIN_JSON);
        let tampered_errors = validate_chain_fixture(&tampered);
        assert!(
            tampered_errors
                .iter()
                .any(|error| error.contains("previous_hash mismatch")),
            "tampered-chain errors: {tampered_errors:?}"
        );
        assert!(
            !tampered_errors
                .iter()
                .any(|error| error.contains("entry_hash mismatch")),
            "tampered-chain should isolate hash-link tampering: {tampered_errors:?}"
        );

        // Missing approval: an approval-gated success without approver/signatures.
        let missing_approval = parse_chain_fixture(INVALID_MISSING_APPROVAL_JSON);
        let missing_errors = validate_chain_fixture(&missing_approval);
        assert!(
            missing_errors
                .iter()
                .any(|error| error.contains("approval required")),
            "missing-approval errors: {missing_errors:?}"
        );
    }
946
    /// Deserializes a fixture file; panics if the JSON does not match
    /// `TrustChainFixture`.
    fn parse_chain_fixture(input: &str) -> TrustChainFixture {
        serde_json::from_str(input).unwrap()
    }
950
    /// Validates a whole fixture and returns every problem found.
    ///
    /// Checks the chain-level metadata, then each record's contract, then the
    /// hash chain, and finally that the fixture's `verified` flag agrees with
    /// whether any error was found.
    fn validate_chain_fixture(fixture: &TrustChainFixture) -> Vec<String> {
        let mut errors = Vec::new();
        if fixture.schema != "opentrustgraph-chain/v0" {
            errors.push(format!("unsupported chain schema {}", fixture.schema));
        }
        if fixture.chain.topic.trim().is_empty() {
            errors.push("chain topic is empty".to_string());
        }
        if fixture.chain.total != fixture.records.len() as u64 {
            errors.push(format!(
                "chain total mismatch; expected {}, found {}",
                fixture.records.len(),
                fixture.chain.total
            ));
        }
        // A missing or non-string producer name counts as empty.
        if fixture
            .chain
            .producer
            .get("name")
            .and_then(|value| value.as_str())
            .unwrap_or_default()
            .trim()
            .is_empty()
        {
            errors.push("chain producer.name is empty".to_string());
        }
        if OffsetDateTime::parse(
            &fixture.chain.generated_at,
            &time::format_description::well_known::Rfc3339,
        )
        .is_err()
        {
            errors.push("chain generated_at is not RFC3339".to_string());
        }

        for (index, record) in fixture.records.iter().enumerate() {
            errors.extend(validate_fixture_record_contract(index, record));
        }
        errors.extend(validate_fixture_hash_chain(fixture));

        // Evaluated last: the flag must reflect all errors collected above.
        let expected_verified = errors.is_empty();
        if fixture.chain.verified != expected_verified {
            errors.push(format!(
                "chain verified flag mismatch; expected {expected_verified}, found {}",
                fixture.chain.verified
            ));
        }
        errors
    }
1000
    /// Checks the per-record rules for the record at position `index` and returns
    /// the violations, each prefixed with `record {index}`.
    ///
    /// Covers required non-empty fields, the `sha256:` hash prefix, non-negative
    /// cost, and the approval rule for successful `ActWithApproval` records whose
    /// metadata marks approval as required.
    fn validate_fixture_record_contract(index: usize, record: &TrustRecord) -> Vec<String> {
        let mut errors = Vec::new();
        let label = format!("record {index}");
        if record.schema != OPENTRUSTGRAPH_SCHEMA_V0 {
            errors.push(format!("{label}: unsupported schema {}", record.schema));
        }
        if record.record_id.trim().is_empty() {
            errors.push(format!("{label}: record_id is empty"));
        }
        if record.agent.trim().is_empty() {
            errors.push(format!("{label}: agent is empty"));
        }
        if record.action.trim().is_empty() {
            errors.push(format!("{label}: action is empty"));
        }
        if record.trace_id.trim().is_empty() {
            errors.push(format!("{label}: trace_id is empty"));
        }
        if !record.entry_hash.starts_with("sha256:") {
            errors.push(format!("{label}: entry_hash is not sha256-prefixed"));
        }
        if let Some(cost_usd) = record.cost_usd {
            if cost_usd < 0.0 {
                errors.push(format!("{label}: cost_usd is negative"));
            }
        }

        // Approval rule: both an approver and at least one signature are required.
        if record.outcome == TrustOutcome::Success
            && record.autonomy_tier == AutonomyTier::ActWithApproval
            && approval_required(record)
        {
            if record
                .approver
                .as_deref()
                .unwrap_or_default()
                .trim()
                .is_empty()
            {
                errors.push(format!("{label}: approval required but approver is empty"));
            }
            if approval_signature_count(record) == 0 {
                errors.push(format!(
                    "{label}: approval required but signatures are empty"
                ));
            }
        }

        errors
    }
1050
1051 fn validate_fixture_hash_chain(fixture: &TrustChainFixture) -> Vec<String> {
1052 let mut errors = Vec::new();
1053 let mut previous_hash: Option<String> = None;
1054
1055 for (position, record) in fixture.records.iter().enumerate() {
1056 let expected_index = position as u64 + 1;
1057 if record.chain_index != expected_index {
1058 errors.push(format!(
1059 "record {position}: expected chain_index {expected_index}, found {}",
1060 record.chain_index
1061 ));
1062 }
1063 if record.previous_hash != previous_hash {
1064 errors.push(format!(
1065 "record {position}: previous_hash mismatch; expected {:?}, found {:?}",
1066 previous_hash, record.previous_hash
1067 ));
1068 }
1069 let expected_hash = compute_trust_record_hash(record).unwrap();
1070 if expected_hash != record.entry_hash {
1071 errors.push(format!(
1072 "record {position}: entry_hash mismatch; expected {expected_hash}, found {}",
1073 record.entry_hash
1074 ));
1075 }
1076 previous_hash = Some(record.entry_hash.clone());
1077 }
1078
1079 if fixture.chain.root_hash != previous_hash {
1080 errors.push(format!(
1081 "chain root_hash mismatch; expected {:?}, found {:?}",
1082 previous_hash, fixture.chain.root_hash
1083 ));
1084 }
1085 errors
1086 }
1087
1088 fn approval_required(record: &TrustRecord) -> bool {
1089 record
1090 .metadata
1091 .get("approval")
1092 .and_then(|approval| approval.get("required"))
1093 .and_then(|required| required.as_bool())
1094 .unwrap_or(false)
1095 }
1096
1097 fn approval_signature_count(record: &TrustRecord) -> usize {
1098 record
1099 .metadata
1100 .get("approval")
1101 .and_then(|approval| approval.get("signatures"))
1102 .and_then(|signatures| signatures.as_array())
1103 .map(Vec::len)
1104 .unwrap_or(0)
1105 }
1106}