Skip to main content

vela_protocol/
state_integrity.rs

1//! Structural integrity checks for accepted frontier state.
2//!
3//! These checks are intentionally about substrate correctness, not scientific
4//! completeness. Missing evidence spans can keep a frontier out of strict proof
5//! use; duplicate events, broken replay, and applied proposals without events
6//! are harder failures because they mean the state history itself is suspect.
7
8use std::collections::{BTreeMap, BTreeSet};
9use std::path::Path;
10
11use serde::{Deserialize, Serialize};
12
13use crate::events::{self, ReplayReport};
14use crate::project::Project;
15use crate::repo;
16
17pub const STATE_INTEGRITY_SCHEMA: &str = "vela.state_integrity_report.v0.1";
18
19#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
20pub struct IntegrityIssue {
21    pub rule_id: String,
22    pub message: String,
23    #[serde(default, skip_serializing_if = "Option::is_none")]
24    pub object_id: Option<String>,
25}
26
27#[derive(Debug, Clone, Serialize, Deserialize)]
28pub struct StateIntegrityReport {
29    pub schema: String,
30    pub status: String,
31    #[serde(default)]
32    pub structural_errors: Vec<IntegrityIssue>,
33    #[serde(default)]
34    pub warnings: Vec<IntegrityIssue>,
35    pub proof_freshness: String,
36    pub replay: ReplayReport,
37    #[serde(default)]
38    pub summary: BTreeMap<String, usize>,
39}
40
41pub fn analyze_path(path: &Path) -> Result<StateIntegrityReport, String> {
42    let frontier = repo::load_from_path(path)?;
43    let mut report = analyze(&frontier);
44    for layout_issue in crate::frontier_repo::layout_issues(path, &frontier) {
45        report.structural_errors.push(IntegrityIssue {
46            rule_id: layout_issue.rule_id,
47            message: layout_issue.message,
48            object_id: None,
49        });
50    }
51    if !report.structural_errors.is_empty() {
52        report.status = "fail".to_string();
53    } else if !report.warnings.is_empty() {
54        report.status = "warn".to_string();
55    } else {
56        report.status = "ok".to_string();
57    }
58    report.summary.insert(
59        "structural_errors".to_string(),
60        report.structural_errors.len(),
61    );
62    report
63        .summary
64        .insert("warnings".to_string(), report.warnings.len());
65    Ok(report)
66}
67
68pub fn analyze(frontier: &Project) -> StateIntegrityReport {
69    let replay = events::replay_report(frontier);
70    let mut structural_errors = Vec::new();
71    let mut warnings = Vec::new();
72    let event_ids = frontier
73        .events
74        .iter()
75        .map(|event| event.id.as_str())
76        .collect::<BTreeSet<_>>();
77
78    for id in &replay.event_log.duplicate_ids {
79        structural_errors.push(issue(
80            "duplicate_event_id",
81            format!("Duplicate canonical event id {id}."),
82            Some(id.clone()),
83        ));
84    }
85    for id in &replay.event_log.orphan_targets {
86        structural_errors.push(issue(
87            "orphan_event_target",
88            format!("Canonical event targets missing finding {id}."),
89            Some(id.clone()),
90        ));
91    }
92    if !replay.ok {
93        for conflict in &replay.conflicts {
94            if conflict.starts_with("duplicate event id:")
95                || conflict.starts_with("orphan event target:")
96            {
97                continue;
98            }
99            structural_errors.push(issue("replay_conflict", conflict.clone(), None));
100        }
101    }
102
103    for event in &frontier.events {
104        if is_accepted_state_event(&event.kind)
105            && event
106                .payload
107                .get("proposal_id")
108                .and_then(|value| value.as_str())
109                .is_none_or(|value| value.trim().is_empty())
110        {
111            structural_errors.push(issue(
112                "accepted_event_missing_proposal_id",
113                format!("Accepted event {} has no payload.proposal_id.", event.id),
114                Some(event.id.clone()),
115            ));
116        }
117    }
118
119    for proposal in &frontier.proposals {
120        if matches!(proposal.status.as_str(), "accepted" | "applied") {
121            let Some(event_id) = proposal.applied_event_id.as_deref() else {
122                structural_errors.push(issue(
123                    "applied_proposal_missing_event",
124                    format!(
125                        "Proposal {} is {} without an applied event id.",
126                        proposal.id, proposal.status
127                    ),
128                    Some(proposal.id.clone()),
129                ));
130                continue;
131            };
132            if !event_ids.contains(event_id) {
133                structural_errors.push(issue(
134                    "applied_proposal_event_missing",
135                    format!(
136                        "Proposal {} points to missing event {event_id}.",
137                        proposal.id
138                    ),
139                    Some(proposal.id.clone()),
140                ));
141            }
142        }
143
144        if proposal.kind == "artifact.assert"
145            && matches!(proposal.status.as_str(), "accepted" | "applied")
146        {
147            let artifact = proposal.payload.get("artifact");
148            let locator_missing = artifact
149                .and_then(|value| value.get("locator"))
150                .and_then(|value| value.as_str())
151                .is_none_or(|value| value.trim().is_empty());
152            let hash_missing = artifact
153                .and_then(|value| value.get("content_hash"))
154                .and_then(|value| value.as_str())
155                .is_none_or(|value| !value.starts_with("sha256:"));
156            if locator_missing || hash_missing {
157                structural_errors.push(issue(
158                    "accepted_artifact_missing_locator_or_hash",
159                    format!(
160                        "Artifact proposal {} is accepted without locator or content hash.",
161                        proposal.id
162                    ),
163                    Some(proposal.id.clone()),
164                ));
165            }
166        }
167    }
168
169    let proof_freshness = proof_freshness(frontier);
170    if proof_freshness == "stale" {
171        structural_errors.push(issue(
172            "stale_proof_packet",
173            "Recorded proof packet is stale relative to accepted events.".to_string(),
174            None,
175        ));
176    } else if proof_freshness == "unknown" {
177        warnings.push(issue(
178            "proof_freshness_unknown",
179            "No current proof packet is recorded for this frontier.".to_string(),
180            None,
181        ));
182    }
183
184    let status = if structural_errors.is_empty() {
185        if warnings.is_empty() { "ok" } else { "warn" }
186    } else {
187        "fail"
188    }
189    .to_string();
190
191    let mut summary = BTreeMap::new();
192    summary.insert("events".to_string(), frontier.events.len());
193    summary.insert("proposals".to_string(), frontier.proposals.len());
194    summary.insert("structural_errors".to_string(), structural_errors.len());
195    summary.insert("warnings".to_string(), warnings.len());
196
197    StateIntegrityReport {
198        schema: STATE_INTEGRITY_SCHEMA.to_string(),
199        status,
200        structural_errors,
201        warnings,
202        proof_freshness,
203        replay,
204        summary,
205    }
206}
207
208fn issue(rule_id: &str, message: String, object_id: Option<String>) -> IntegrityIssue {
209    IntegrityIssue {
210        rule_id: rule_id.to_string(),
211        message,
212        object_id,
213    }
214}
215
216fn is_accepted_state_event(kind: &str) -> bool {
217    matches!(
218        kind,
219        "finding.asserted"
220            | "finding.reviewed"
221            | "finding.noted"
222            | "finding.caveated"
223            | "finding.confidence_revised"
224            | "finding.rejected"
225            | "finding.retracted"
226            | "finding.dependency_invalidated"
227            | "artifact.asserted"
228            | "artifact.reviewed"
229            | "artifact.retracted"
230            | "negative_result.asserted"
231            | "negative_result.reviewed"
232            | "negative_result.retracted"
233            | "trajectory.created"
234            | "trajectory.step_appended"
235            | "trajectory.reviewed"
236            | "trajectory.retracted"
237    )
238}
239
240fn proof_freshness(frontier: &Project) -> String {
241    let state = &frontier.proof_state.latest_packet;
242    if state.status == "never_exported" {
243        return "unknown".to_string();
244    }
245    if state.status == "stale" {
246        return "stale".to_string();
247    }
248    let current_event_hash = events::event_log_hash(&frontier.events);
249    match state.event_log_hash.as_deref() {
250        Some(hash) if hash == current_event_hash => "fresh".to_string(),
251        Some(_) => "stale".to_string(),
252        None => "unknown".to_string(),
253    }
254}