Skip to main content

vela_protocol/
propagate.rs

1//! Correction propagation through the frontier link graph.
2//!
3//! When a finding is corrected or retracted, everything that depends on it
4//! should know. This module walks the link graph and flags downstream findings,
5//! creating ReviewEvent records for each propagation step.
6
7use std::collections::{HashMap, HashSet, VecDeque};
8
9use chrono::Utc;
10use sha2::{Digest, Sha256};
11
12use colored::Colorize;
13
14use crate::bundle::{FindingBundle, ReviewAction, ReviewEvent};
15use crate::cli_style as style;
16use crate::events::{
17    EVENT_SCHEMA, StateActor, StateEvent, StateTarget, compute_event_id, finding_hash,
18};
19use crate::project::Project;
20use serde_json::json;
21
/// The kind of correction being propagated from a source finding.
///
/// Every variant records one review event against the source finding (when it
/// exists in the frontier). Whether dependents are then flagged is described
/// per variant.
#[derive(Debug, Clone)]
pub enum PropagationAction {
    /// The source paper was retracted.
    ///
    /// The source finding is marked `retracted`. Every dependent within the
    /// cascade depth is flagged `upstream_retracted` and marked `contested`.
    Retracted,
    /// A specific field was corrected from `original` to `corrected`.
    ///
    /// The correction is only recorded as a review event; the source
    /// finding's field is not rewritten by the propagation engine. Dependents
    /// are flagged `upstream_corrected` regardless of which field changed.
    // NOTE(review): the allow suggests nothing in the crate constructs this
    // variant yet — confirm before removing it.
    #[allow(dead_code)]
    Corrected {
        field: String,
        original: String,
        corrected: String,
    },
    /// The source finding's confidence score is set to `new_score`, and its
    /// confidence basis records the old and new values.
    ///
    /// Dependents are flagged `upstream_at_risk` only when `new_score` is
    /// strictly below 0.5.
    ConfidenceReduced { new_score: f64 },
    /// v0.36.1: A `vrep_<id>` replication record landed against the
    /// target finding. The target's confidence is recomputed from the
    /// updated `Project.replications` collection (via
    /// `Project::compute_confidence_for`), and a
    /// `finding.confidence_revised` state event is appended to the
    /// frontier's event log. Dependents are flagged for review when:
    /// - `failed` / `partial`: downstream may need to weaken;
    /// - `replicated`: downstream may now safely strengthen.
    ///
    /// `inconclusive` (and any other unrecognised outcome) does not
    /// cascade — it represents methodological ambiguity, not evidence.
    ReplicationOutcome { outcome: String, vrep_id: String },
}
47
48/// Result of a propagation pass.
49pub struct PropagationResult {
50    /// Total findings directly or transitively affected.
51    pub affected: usize,
52    /// Finding IDs affected at each depth level.
53    pub cascade: Vec<Vec<String>>,
54    /// Review events created during propagation.
55    pub events: Vec<ReviewEvent>,
56}
57
/// How many link hops a correction may travel away from its source finding.
/// Dependents exactly this far away are still flagged; anything further is
/// left untouched, which keeps cascades from running away.
const MAX_DEPTH: usize = 3;
60
61/// Propagate a correction through the frontier. Returns a PropagationResult
62/// describing the cascade.
63pub fn propagate_correction(
64    frontier: &mut Project,
65    finding_id: &str,
66    action: PropagationAction,
67) -> PropagationResult {
68    let now = Utc::now().to_rfc3339();
69
70    // Build a reverse adjacency map: target_id -> list of (source_idx, link_type).
71    // We want findings that link TO the corrected finding via supports or depends.
72    let mut reverse_links: HashMap<String, Vec<(usize, String)>> = HashMap::new();
73    for (idx, finding) in frontier.findings.iter().enumerate() {
74        for link in &finding.links {
75            if link.link_type == "supports" || link.link_type == "depends" {
76                reverse_links
77                    .entry(link.target.clone())
78                    .or_default()
79                    .push((idx, link.link_type.clone()));
80            }
81        }
82    }
83
84    // Also build forward links: source finding has links with target.
85    // Findings that the corrected finding supports or that depend on it.
86    let mut forward_deps: HashMap<String, Vec<(usize, String)>> = HashMap::new();
87    for (idx, finding) in frontier.findings.iter().enumerate() {
88        for link in &finding.links {
89            forward_deps
90                .entry(finding.id.clone())
91                .or_default()
92                .push((idx, link.link_type.clone()));
93        }
94    }
95
96    // Find the source finding index.
97    let source_idx = frontier.findings.iter().position(|f| f.id == finding_id);
98
99    let mut events: Vec<ReviewEvent> = Vec::new();
100    let mut cascade: Vec<Vec<String>> = Vec::new();
101
102    // Step 1: Apply the action to the source finding itself.
103    if let Some(idx) = source_idx {
104        match &action {
105            PropagationAction::Retracted => {
106                frontier.findings[idx].flags.retracted = true;
107                let event = make_event(
108                    finding_id,
109                    "propagation_engine",
110                    &now,
111                    ReviewAction::Flagged {
112                        flag_type: "retracted".into(),
113                    },
114                    "Source paper retracted",
115                );
116                events.push(event);
117            }
118            PropagationAction::Corrected {
119                field,
120                original,
121                corrected,
122            } => {
123                let event = make_event(
124                    finding_id,
125                    "propagation_engine",
126                    &now,
127                    ReviewAction::Corrected {
128                        field: field.clone(),
129                        original: original.clone(),
130                        corrected: corrected.clone(),
131                    },
132                    "Upstream correction applied",
133                );
134                events.push(event);
135            }
136            PropagationAction::ConfidenceReduced { new_score } => {
137                let old = frontier.findings[idx].confidence.score;
138                frontier.findings[idx].confidence.score = *new_score;
139                frontier.findings[idx].confidence.basis = format!(
140                    "Reduced from {:.3} to {:.3} (manual correction)",
141                    old, new_score
142                );
143                let event = make_event(
144                    finding_id,
145                    "propagation_engine",
146                    &now,
147                    ReviewAction::Flagged {
148                        flag_type: format!("confidence_reduced_to_{:.2}", new_score),
149                    },
150                    &format!("Confidence reduced from {:.3} to {:.3}", old, new_score),
151                );
152                events.push(event);
153            }
154            PropagationAction::ReplicationOutcome { outcome, vrep_id } => {
155                // Recompute the target finding's confidence from the
156                // current `Project.replications` collection. This is the
157                // v0.36.1 source-of-truth path: confidence is a function
158                // of recorded replications, not the legacy scalar flag.
159                let target_bundle = frontier.findings[idx].clone();
160                let new_conf = frontier.compute_confidence_for(&target_bundle);
161                let old = frontier.findings[idx].confidence.score;
162                let new_score = new_conf.score;
163                // v0.43.0 fix: capture the finding's hash *before* mutating
164                // it, then emit a canonical `finding.confidence_revised`
165                // state event after mutation so the per-finding event
166                // chain stays continuous. Pre-v0.43.0 the cascade
167                // mutated confidence silently and broke replay.
168                let before_hash = finding_hash(&frontier.findings[idx]);
169                frontier.findings[idx].confidence = new_conf;
170                let after_hash = finding_hash(&frontier.findings[idx]);
171                let revise_reason = format!(
172                    "{outcome} replication {vrep_id} recorded; confidence {:.3} -> {:.3}",
173                    old, new_score
174                );
175                let mut state_event = StateEvent {
176                    schema: EVENT_SCHEMA.to_string(),
177                    id: String::new(),
178                    kind: "finding.confidence_revised".to_string(),
179                    target: StateTarget {
180                        r#type: "finding".to_string(),
181                        id: finding_id.to_string(),
182                    },
183                    actor: StateActor {
184                        id: "propagation_engine".to_string(),
185                        r#type: "system".to_string(),
186                    },
187                    timestamp: now.clone(),
188                    reason: revise_reason.clone(),
189                    before_hash,
190                    after_hash,
191                    payload: json!({
192                        "proposal_id": format!("vpr_synthetic_{}", &vrep_id[..vrep_id.len().min(16)]),
193                        "previous_score": old,
194                        "new_score": new_score,
195                        "trigger": "replication_outcome",
196                        "vrep_id": vrep_id,
197                        "outcome": outcome,
198                    }),
199                    caveats: Vec::new(),
200                    signature: None,
201                };
202                state_event.id = compute_event_id(&state_event);
203                frontier.events.push(state_event);
204
205                // Also emit the human-readable review event for the
206                // review queue (existing v0.36.1 behavior).
207                let event = make_event(
208                    finding_id,
209                    "propagation_engine",
210                    &now,
211                    ReviewAction::Flagged {
212                        flag_type: format!("replication_{}", outcome),
213                    },
214                    &revise_reason,
215                );
216                events.push(event);
217            }
218        }
219    }
220
221    // Step 2: BFS through dependents, up to MAX_DEPTH.
222    let mut visited: HashSet<String> = HashSet::new();
223    visited.insert(finding_id.to_string());
224
225    let mut queue: VecDeque<(String, usize)> = VecDeque::new();
226    queue.push_back((finding_id.to_string(), 0));
227
228    while let Some((current_id, depth)) = queue.pop_front() {
229        if depth >= MAX_DEPTH {
230            continue;
231        }
232
233        // Find all findings that have a supports/depends link targeting current_id.
234        let dependents = find_dependents(&frontier.findings, &current_id);
235
236        if dependents.is_empty() {
237            continue;
238        }
239
240        let mut level_ids: Vec<String> = Vec::new();
241
242        for dep_idx in dependents {
243            let dep_id = frontier.findings[dep_idx].id.clone();
244            if visited.contains(&dep_id) {
245                continue;
246            }
247            visited.insert(dep_id.clone());
248
249            // Flag the dependent finding.
250            let (flag_type, reason) = match &action {
251                PropagationAction::Retracted => (
252                    "upstream_retracted".to_string(),
253                    format!(
254                        "Upstream finding {} was retracted (depth {})",
255                        finding_id,
256                        depth + 1
257                    ),
258                ),
259                PropagationAction::Corrected { field, .. } => (
260                    "upstream_corrected".to_string(),
261                    format!(
262                        "Upstream finding {} had field '{}' corrected (depth {})",
263                        finding_id,
264                        field,
265                        depth + 1
266                    ),
267                ),
268                PropagationAction::ConfidenceReduced { new_score } => {
269                    if *new_score < 0.5 {
270                        (
271                            "upstream_at_risk".to_string(),
272                            format!(
273                                "Upstream finding {} confidence reduced to {:.2} (depth {})",
274                                finding_id,
275                                new_score,
276                                depth + 1
277                            ),
278                        )
279                    } else {
280                        continue; // Only propagate if below 0.5
281                    }
282                }
283                PropagationAction::ReplicationOutcome { outcome, .. } => match outcome.as_str() {
284                    "failed" => (
285                        "upstream_replication_failed".to_string(),
286                        format!(
287                            "Upstream finding {} failed replication (depth {})",
288                            finding_id,
289                            depth + 1
290                        ),
291                    ),
292                    "partial" => (
293                        "upstream_replication_partial".to_string(),
294                        format!(
295                            "Upstream finding {} partially replicated (depth {})",
296                            finding_id,
297                            depth + 1
298                        ),
299                    ),
300                    "replicated" => (
301                        "upstream_replication_succeeded".to_string(),
302                        format!(
303                            "Upstream finding {} replicated successfully (depth {})",
304                            finding_id,
305                            depth + 1
306                        ),
307                    ),
308                    // `inconclusive` and unknown outcomes do not cascade.
309                    _ => continue,
310                },
311            };
312
313            let event = make_event(
314                &dep_id,
315                "propagation_engine",
316                &now,
317                ReviewAction::Flagged {
318                    flag_type: flag_type.clone(),
319                },
320                &reason,
321            );
322            events.push(event);
323            level_ids.push(dep_id.clone());
324
325            // If retracted, also mark the dependent as contested.
326            if matches!(action, PropagationAction::Retracted) {
327                frontier.findings[dep_idx].flags.contested = true;
328            }
329
330            queue.push_back((dep_id, depth + 1));
331        }
332
333        if !level_ids.is_empty() {
334            // Ensure cascade has enough depth levels.
335            while cascade.len() <= depth {
336                cascade.push(Vec::new());
337            }
338            cascade[depth].extend(level_ids);
339        }
340    }
341
342    let affected = cascade.iter().map(|level| level.len()).sum();
343
344    PropagationResult {
345        affected,
346        cascade,
347        events,
348    }
349}
350
351/// Find indices of findings that have a supports or depends link targeting the
352/// given finding ID.
353fn find_dependents(findings: &[FindingBundle], target_id: &str) -> Vec<usize> {
354    findings
355        .iter()
356        .enumerate()
357        .filter(|(_, f)| {
358            f.links.iter().any(|l| {
359                l.target == target_id && (l.link_type == "supports" || l.link_type == "depends")
360            })
361        })
362        .map(|(idx, _)| idx)
363        .collect()
364}
365
366/// Create a content-addressed review event.
367fn make_event(
368    finding_id: &str,
369    reviewer: &str,
370    timestamp: &str,
371    action: ReviewAction,
372    reason: &str,
373) -> ReviewEvent {
374    let content = serde_json::json!({
375        "finding_id": finding_id,
376        "reviewer": reviewer,
377        "reviewed_at": timestamp,
378        "action": action,
379        "reason": reason,
380    });
381    let canonical = serde_json::to_string(&content).unwrap_or_default();
382    let hash = Sha256::digest(canonical.as_bytes());
383    let id = format!("rev_{}", &hex::encode(hash)[..16]);
384
385    ReviewEvent {
386        id,
387        workspace: None,
388        finding_id: finding_id.to_string(),
389        reviewer: reviewer.to_string(),
390        reviewed_at: timestamp.to_string(),
391        scope: None,
392        status: None,
393        action,
394        reason: reason.to_string(),
395        evidence_considered: Vec::new(),
396        state_change: None,
397    }
398}
399
400/// Create a review event recording a retraction with a human-readable reason.
401pub fn make_retraction_event(finding_id: &str, reason: &str) -> ReviewEvent {
402    let now = Utc::now().to_rfc3339();
403    make_event(
404        finding_id,
405        "retraction",
406        &now,
407        ReviewAction::Flagged {
408            flag_type: "retracted".into(),
409        },
410        reason,
411    )
412}
413
414/// Print a propagation result to stdout.
415pub fn print_result(result: &PropagationResult, action_label: &str, finding_id: &str) {
416    println!();
417    println!(
418        "  {}",
419        format!(
420            "VELA · PROPAGATE · {} · {}",
421            action_label.to_uppercase(),
422            finding_id
423        )
424        .dimmed()
425    );
426    println!("  {}", style::tick_row(60));
427    println!("  {} findings affected", result.affected);
428
429    for (depth, ids) in result.cascade.iter().enumerate() {
430        if !ids.is_empty() {
431            println!("  depth {}: {} findings", depth + 1, ids.len());
432            for id in ids {
433                println!("    · {}", id);
434            }
435        }
436    }
437
438    if !result.events.is_empty() {
439        println!();
440        println!("  review events created: {}", result.events.len());
441        for event in &result.events {
442            println!(
443                "    {} · {} · {}",
444                event.id.dimmed(),
445                event.finding_id,
446                event.reason
447            );
448        }
449    }
450    println!();
451}
452
#[cfg(test)]
mod tests {
    use super::*;
    use crate::bundle::*;
    use crate::project;

    fn make_finding(id: &str, score: f64) -> FindingBundle {
        FindingBundle {
            id: id.into(),
            version: 1,
            previous_version: None,
            assertion: Assertion {
                text: format!("Finding {id}"),
                assertion_type: "mechanism".into(),
                entities: vec![],
                relation: None,
                direction: None,
                causal_claim: None,
                causal_evidence_grade: None,
            },
            evidence: Evidence {
                evidence_type: "experimental".into(),
                model_system: String::new(),
                species: None,
                method: String::new(),
                sample_size: None,
                effect_size: None,
                p_value: None,
                replicated: false,
                replication_count: None,
                evidence_spans: vec![],
            },
            conditions: Conditions {
                text: String::new(),
                species_verified: vec![],
                species_unverified: vec![],
                in_vitro: false,
                in_vivo: false,
                human_data: false,
                clinical_trial: false,
                concentration_range: None,
                duration: None,
                age_group: None,
                cell_type: None,
            },
            confidence: Confidence::raw(score, "test", 0.85),
            provenance: Provenance {
                source_type: "published_paper".into(),
                doi: None,
                pmid: None,
                pmc: None,
                openalex_id: None,
                url: None,
                title: "Test".into(),
                authors: vec![],
                year: Some(2025),
                journal: None,
                license: None,
                publisher: None,
                funders: vec![],
                extraction: Extraction::default(),
                review: None,
                citation_count: None,
            },
            flags: Flags {
                gap: false,
                negative_space: false,
                contested: false,
                retracted: false,
                declining: false,
                gravity_well: false,
                review_state: None,
                superseded: false,
                signature_threshold: None,
                jointly_accepted: false,
            },
            links: vec![],
            attachments: vec![],
            annotations: vec![],
            created: String::new(),
            updated: None,
            access_tier: crate::access_tier::AccessTier::Public,
        }
    }

    fn make_frontier(findings: Vec<FindingBundle>) -> Project {
        project::assemble("test", findings, 1, 0, "test frontier")
    }

    /// Look up a finding by id so assertions do not depend on the order in
    /// which `project::assemble` stores findings.
    fn by_id<'a>(frontier: &'a Project, id: &str) -> &'a FindingBundle {
        frontier
            .findings
            .iter()
            .find(|f| f.id == id)
            .expect("finding should exist in the test frontier")
    }

    /// True when some review event flags `finding_id` with `flag`.
    fn has_flag(result: &PropagationResult, finding_id: &str, flag: &str) -> bool {
        result.events.iter().any(|e| {
            e.finding_id == finding_id
                && matches!(&e.action, ReviewAction::Flagged { flag_type } if flag_type == flag)
        })
    }

    #[test]
    fn retraction_propagates() {
        let a = make_finding("a", 0.8);
        let mut b = make_finding("b", 0.7);
        // b depends on a
        b.add_link("a", "depends", "b depends on a");

        let mut frontier = make_frontier(vec![a, b]);
        let result = propagate_correction(&mut frontier, "a", PropagationAction::Retracted);

        assert!(by_id(&frontier, "a").flags.retracted);
        assert!(by_id(&frontier, "b").flags.contested);
        assert_eq!(result.affected, 1);
        assert!(has_flag(&result, "b", "upstream_retracted"));
    }

    #[test]
    fn correction_flags_dependents_without_contesting_them() {
        let a = make_finding("a", 0.8);
        let mut b = make_finding("b", 0.7);
        b.add_link("a", "depends", "b depends on a");

        let mut frontier = make_frontier(vec![a, b]);
        let result = propagate_correction(
            &mut frontier,
            "a",
            PropagationAction::Corrected {
                field: "assertion.text".into(),
                original: "old".into(),
                corrected: "new".into(),
            },
        );

        assert_eq!(result.affected, 1);
        assert!(has_flag(&result, "b", "upstream_corrected"));
        // Only retraction marks dependents as contested.
        assert!(!by_id(&frontier, "b").flags.contested);
    }

    #[test]
    fn confidence_reduction_propagates_below_half() {
        let a = make_finding("a", 0.8);
        let mut b = make_finding("b", 0.7);
        b.add_link("a", "supports", "b supports a");

        let mut frontier = make_frontier(vec![a, b]);
        let result = propagate_correction(
            &mut frontier,
            "a",
            PropagationAction::ConfidenceReduced { new_score: 0.3 },
        );

        assert!((by_id(&frontier, "a").confidence.score - 0.3).abs() < 0.001);
        assert_eq!(result.affected, 1);
        assert!(has_flag(&result, "b", "upstream_at_risk"));
    }

    #[test]
    fn confidence_above_half_does_not_propagate() {
        let a = make_finding("a", 0.8);
        let mut b = make_finding("b", 0.7);
        b.add_link("a", "supports", "b supports a");

        let mut frontier = make_frontier(vec![a, b]);
        let result = propagate_correction(
            &mut frontier,
            "a",
            PropagationAction::ConfidenceReduced { new_score: 0.6 },
        );

        // Confidence updated on source, but no cascade.
        assert!((by_id(&frontier, "a").confidence.score - 0.6).abs() < 0.001);
        assert_eq!(result.affected, 0);
        assert!(result.cascade.iter().all(Vec::is_empty));
    }

    #[test]
    fn failed_replication_flags_dependents() {
        // a is supported by b. A failed replication of a lands.
        // b should be flagged with `upstream_replication_failed`.
        let a = make_finding("vf_aaaa", 0.8);
        let mut b = make_finding("vf_bbbb", 0.7);
        b.add_link("vf_aaaa", "supports", "b supports a");
        let mut frontier = make_frontier(vec![a, b]);
        let result = propagate_correction(
            &mut frontier,
            "vf_aaaa",
            PropagationAction::ReplicationOutcome {
                outcome: "failed".into(),
                vrep_id: "vrep_test01".into(),
            },
        );
        assert_eq!(result.affected, 1);
        assert!(has_flag(&result, "vf_bbbb", "upstream_replication_failed"));
    }

    #[test]
    fn successful_replication_recomputes_target_and_flags_dependents() {
        // a has a successful replication. After propagation, a's
        // confidence is recomputed from Project.replications, and
        // b is flagged for review.
        let a = make_finding("vf_aaaa", 0.5);
        let mut b = make_finding("vf_bbbb", 0.5);
        b.add_link("vf_aaaa", "depends", "b depends on a");
        let mut frontier = make_frontier(vec![a, b]);

        // Inject a replicated record so compute_confidence_for has
        // something to count.
        frontier.replications.push(Replication {
            id: "vrep_test02".into(),
            target_finding: "vf_aaaa".into(),
            attempted_by: "lab:test".into(),
            outcome: "replicated".into(),
            evidence: frontier.findings[0].evidence.clone(),
            conditions: frontier.findings[0].conditions.clone(),
            provenance: frontier.findings[0].provenance.clone(),
            notes: String::new(),
            created: String::new(),
            previous_attempt: None,
        });

        let result = propagate_correction(
            &mut frontier,
            "vf_aaaa",
            PropagationAction::ReplicationOutcome {
                outcome: "replicated".into(),
                vrep_id: "vrep_test02".into(),
            },
        );

        // Target's confidence was recomputed.
        assert_eq!(
            by_id(&frontier, "vf_aaaa").confidence.method,
            ConfidenceMethod::Computed
        );
        // Dependent flagged for review.
        assert_eq!(result.affected, 1);
        assert!(has_flag(&result, "vf_bbbb", "upstream_replication_succeeded"));
    }

    #[test]
    fn inconclusive_replication_does_not_cascade() {
        let a = make_finding("vf_aaaa", 0.7);
        let mut b = make_finding("vf_bbbb", 0.7);
        b.add_link("vf_aaaa", "supports", "");
        let mut frontier = make_frontier(vec![a, b]);
        let result = propagate_correction(
            &mut frontier,
            "vf_aaaa",
            PropagationAction::ReplicationOutcome {
                outcome: "inconclusive".into(),
                vrep_id: "vrep_test03".into(),
            },
        );
        // Source still gets a recompute (state event + review event), but no
        // dependents are flagged.
        assert_eq!(result.affected, 0);
        assert_eq!(result.events.len(), 1);
        assert!(frontier
            .events
            .iter()
            .any(|e| e.kind == "finding.confidence_revised" && e.target.id == "vf_aaaa"));
    }

    #[test]
    fn cycles_do_not_reflag_the_source() {
        // a <-> b depend on each other.
        let mut a = make_finding("a", 0.8);
        a.add_link("b", "depends", "a depends on b");
        let mut b = make_finding("b", 0.7);
        b.add_link("a", "depends", "b depends on a");

        let mut frontier = make_frontier(vec![a, b]);
        let result = propagate_correction(&mut frontier, "a", PropagationAction::Retracted);

        assert_eq!(result.affected, 1);
        assert_eq!(result.cascade, vec![vec!["b".to_string()]]);
    }

    #[test]
    fn dependent_with_multiple_links_is_flagged_once() {
        let a = make_finding("a", 0.8);
        let mut b = make_finding("b", 0.7);
        b.add_link("a", "supports", "");
        b.add_link("a", "depends", "");

        let mut frontier = make_frontier(vec![a, b]);
        let result = propagate_correction(&mut frontier, "a", PropagationAction::Retracted);

        assert_eq!(result.affected, 1);
        // One review event for the source, one for b.
        assert_eq!(result.events.len(), 2);
    }

    #[test]
    fn depth_limit_respected() {
        // Chain: a <- b <- c <- d <- e (each depends on previous)
        let a = make_finding("a", 0.8);
        let mut b = make_finding("b", 0.7);
        b.add_link("a", "depends", "");
        let mut c_f = make_finding("c", 0.7);
        c_f.add_link("b", "depends", "");
        let mut d = make_finding("d", 0.7);
        d.add_link("c", "depends", "");
        let mut e = make_finding("e", 0.7);
        e.add_link("d", "depends", "");

        let mut frontier = make_frontier(vec![a, b, c_f, d, e]);
        let result = propagate_correction(&mut frontier, "a", PropagationAction::Retracted);

        // Stops at depth 3: b, c, d get flagged one level each; e does not.
        assert_eq!(result.affected, 3);
        assert_eq!(
            result.cascade,
            vec![
                vec!["b".to_string()],
                vec!["c".to_string()],
                vec!["d".to_string()],
            ]
        );
        assert!(by_id(&frontier, "d").flags.contested);
        assert!(!by_id(&frontier, "e").flags.contested);
        assert!(!has_flag(&result, "e", "upstream_retracted"));
    }
}