// vela_protocol/state_integrity.rs
use std::collections::{BTreeMap, BTreeSet};
use std::path::Path;

use serde::{Deserialize, Serialize};

use crate::events::{self, ReplayReport};
use crate::project::Project;
use crate::repo;
16
/// Schema identifier written into the `schema` field of every report built by [`analyze`].
pub const STATE_INTEGRITY_SCHEMA: &str = "vela.state_integrity_report.v0.1";
18
/// One problem found while checking a frontier's state.
///
/// The same shape is used for both structural errors and warnings; which list
/// an issue lands in is decided by the code that produces it.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct IntegrityIssue {
    /// Stable machine-readable identifier of the rule that fired
    /// (for example `"duplicate_event_id"`).
    pub rule_id: String,
    /// Human-readable description of the problem.
    pub message: String,
    /// Id of the object the issue is about, when there is one. Left out of the
    /// serialized form when `None`, and defaults to `None` when absent on input.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub object_id: Option<String>,
}
26
/// Result of a state-integrity analysis over a frontier.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StateIntegrityReport {
    /// Schema identifier of the report; `analyze` fills it with
    /// `STATE_INTEGRITY_SCHEMA`.
    pub schema: String,
    /// Overall verdict: `"fail"` when there are structural errors, otherwise
    /// `"warn"` when there are warnings, otherwise `"ok"`.
    pub status: String,
    /// Issues that make the state structurally invalid. Defaults to empty when
    /// missing on input.
    #[serde(default)]
    pub structural_errors: Vec<IntegrityIssue>,
    /// Issues that do not fail the report on their own. Defaults to empty when
    /// missing on input.
    #[serde(default)]
    pub warnings: Vec<IntegrityIssue>,
    /// Freshness of the recorded proof packet: `"fresh"`, `"stale"` or `"unknown"`.
    pub proof_freshness: String,
    /// Replay report the analysis was derived from.
    pub replay: ReplayReport,
    /// Named counters. `analyze` records `"events"`, `"proposals"`,
    /// `"structural_errors"` and `"warnings"`. Defaults to empty when missing on input.
    #[serde(default)]
    pub summary: BTreeMap<String, usize>,
}
40
41pub fn analyze_path(path: &Path) -> Result<StateIntegrityReport, String> {
42 let frontier = repo::load_from_path(path)?;
43 let mut report = analyze(&frontier);
44 for layout_issue in crate::frontier_repo::layout_issues(path, &frontier) {
45 report.structural_errors.push(IntegrityIssue {
46 rule_id: layout_issue.rule_id,
47 message: layout_issue.message,
48 object_id: None,
49 });
50 }
51 if !report.structural_errors.is_empty() {
52 report.status = "fail".to_string();
53 } else if !report.warnings.is_empty() {
54 report.status = "warn".to_string();
55 } else {
56 report.status = "ok".to_string();
57 }
58 report.summary.insert(
59 "structural_errors".to_string(),
60 report.structural_errors.len(),
61 );
62 report
63 .summary
64 .insert("warnings".to_string(), report.warnings.len());
65 Ok(report)
66}
67
68pub fn analyze(frontier: &Project) -> StateIntegrityReport {
69 let replay = events::replay_report(frontier);
70 let mut structural_errors = Vec::new();
71 let mut warnings = Vec::new();
72 let event_ids = frontier
73 .events
74 .iter()
75 .map(|event| event.id.as_str())
76 .collect::<BTreeSet<_>>();
77
78 for id in &replay.event_log.duplicate_ids {
79 structural_errors.push(issue(
80 "duplicate_event_id",
81 format!("Duplicate canonical event id {id}."),
82 Some(id.clone()),
83 ));
84 }
85 for id in &replay.event_log.orphan_targets {
86 structural_errors.push(issue(
87 "orphan_event_target",
88 format!("Canonical event targets missing finding {id}."),
89 Some(id.clone()),
90 ));
91 }
92 if !replay.ok {
93 for conflict in &replay.conflicts {
94 if conflict.starts_with("duplicate event id:")
95 || conflict.starts_with("orphan event target:")
96 {
97 continue;
98 }
99 structural_errors.push(issue("replay_conflict", conflict.clone(), None));
100 }
101 }
102
103 for event in &frontier.events {
104 if is_accepted_state_event(&event.kind)
105 && event
106 .payload
107 .get("proposal_id")
108 .and_then(|value| value.as_str())
109 .is_none_or(|value| value.trim().is_empty())
110 {
111 structural_errors.push(issue(
112 "accepted_event_missing_proposal_id",
113 format!("Accepted event {} has no payload.proposal_id.", event.id),
114 Some(event.id.clone()),
115 ));
116 }
117 }
118
119 for proposal in &frontier.proposals {
120 if matches!(proposal.status.as_str(), "accepted" | "applied") {
121 let Some(event_id) = proposal.applied_event_id.as_deref() else {
122 structural_errors.push(issue(
123 "applied_proposal_missing_event",
124 format!(
125 "Proposal {} is {} without an applied event id.",
126 proposal.id, proposal.status
127 ),
128 Some(proposal.id.clone()),
129 ));
130 continue;
131 };
132 if !event_ids.contains(event_id) {
133 structural_errors.push(issue(
134 "applied_proposal_event_missing",
135 format!(
136 "Proposal {} points to missing event {event_id}.",
137 proposal.id
138 ),
139 Some(proposal.id.clone()),
140 ));
141 }
142 }
143
144 if proposal.kind == "artifact.assert"
145 && matches!(proposal.status.as_str(), "accepted" | "applied")
146 {
147 let artifact = proposal.payload.get("artifact");
148 let locator_missing = artifact
149 .and_then(|value| value.get("locator"))
150 .and_then(|value| value.as_str())
151 .is_none_or(|value| value.trim().is_empty());
152 let hash_missing = artifact
153 .and_then(|value| value.get("content_hash"))
154 .and_then(|value| value.as_str())
155 .is_none_or(|value| !value.starts_with("sha256:"));
156 if locator_missing || hash_missing {
157 structural_errors.push(issue(
158 "accepted_artifact_missing_locator_or_hash",
159 format!(
160 "Artifact proposal {} is accepted without locator or content hash.",
161 proposal.id
162 ),
163 Some(proposal.id.clone()),
164 ));
165 }
166 }
167 }
168
169 let proof_freshness = proof_freshness(frontier);
170 if proof_freshness == "stale" {
171 structural_errors.push(issue(
172 "stale_proof_packet",
173 "Recorded proof packet is stale relative to accepted events.".to_string(),
174 None,
175 ));
176 } else if proof_freshness == "unknown" {
177 warnings.push(issue(
178 "proof_freshness_unknown",
179 "No current proof packet is recorded for this frontier.".to_string(),
180 None,
181 ));
182 }
183
184 let status = if structural_errors.is_empty() {
185 if warnings.is_empty() { "ok" } else { "warn" }
186 } else {
187 "fail"
188 }
189 .to_string();
190
191 let mut summary = BTreeMap::new();
192 summary.insert("events".to_string(), frontier.events.len());
193 summary.insert("proposals".to_string(), frontier.proposals.len());
194 summary.insert("structural_errors".to_string(), structural_errors.len());
195 summary.insert("warnings".to_string(), warnings.len());
196
197 StateIntegrityReport {
198 schema: STATE_INTEGRITY_SCHEMA.to_string(),
199 status,
200 structural_errors,
201 warnings,
202 proof_freshness,
203 replay,
204 summary,
205 }
206}
207
208fn issue(rule_id: &str, message: String, object_id: Option<String>) -> IntegrityIssue {
209 IntegrityIssue {
210 rule_id: rule_id.to_string(),
211 message,
212 object_id,
213 }
214}
215
/// Returns `true` when `kind` is one of the event kinds that `analyze` requires
/// to carry a `proposal_id` in its payload.
///
/// The comparison is exact: no trimming, no case folding, no prefix matching.
fn is_accepted_state_event(kind: &str) -> bool {
    const ACCEPTED_STATE_EVENT_KINDS: &[&str] = &[
        "finding.asserted",
        "finding.reviewed",
        "finding.noted",
        "finding.caveated",
        "finding.confidence_revised",
        "finding.rejected",
        "finding.retracted",
        "finding.dependency_invalidated",
        "artifact.asserted",
        "artifact.reviewed",
        "artifact.retracted",
        "negative_result.asserted",
        "negative_result.reviewed",
        "negative_result.retracted",
        "trajectory.created",
        "trajectory.step_appended",
        "trajectory.reviewed",
        "trajectory.retracted",
    ];

    ACCEPTED_STATE_EVENT_KINDS
        .iter()
        .any(|accepted| *accepted == kind)
}
239
240fn proof_freshness(frontier: &Project) -> String {
241 let state = &frontier.proof_state.latest_packet;
242 if state.status == "never_exported" {
243 return "unknown".to_string();
244 }
245 if state.status == "stale" {
246 return "stale".to_string();
247 }
248 let current_event_hash = events::event_log_hash(&frontier.events);
249 match state.event_log_hash.as_deref() {
250 Some(hash) if hash == current_event_hash => "fresh".to_string(),
251 Some(_) => "stale".to_string(),
252 None => "unknown".to_string(),
253 }
254}