Skip to main content

vela_protocol/
propagate.rs

//! Correction propagation through the frontier link graph.
//!
//! When a finding is corrected or retracted, everything that depends on it
//! should know. This module walks the link graph, flags downstream findings,
//! and creates a `ReviewEvent` record for each propagation step.
6
7use std::collections::{HashMap, HashSet, VecDeque};
8
9use chrono::Utc;
10use sha2::{Digest, Sha256};
11
12use colored::Colorize;
13
14use crate::bundle::{FindingBundle, ReviewAction, ReviewEvent};
15use crate::cli_style as style;
16use crate::events::{
17    EVENT_SCHEMA, StateActor, StateEvent, StateTarget, compute_event_id, finding_hash,
18};
19use crate::project::Project;
20use serde_json::json;
21
/// What happened to a finding, and therefore what the propagation engine
/// should do to that finding and to the findings that depend on it.
#[derive(Debug, Clone)]
pub enum PropagationAction {
    /// The source paper was retracted. The finding is marked `retracted`;
    /// every dependent reached by the cascade is flagged `upstream_retracted`
    /// and marked `contested`.
    Retracted,
    /// One field of the finding was corrected. The three strings are forwarded
    /// verbatim into `ReviewAction::Corrected` on the finding itself, and
    /// dependents are flagged `upstream_corrected`.
    ///
    /// NOTE(review): older docs said dependents are flagged only when the
    /// assertion text or direction changed; the propagation code currently
    /// flags them whatever `field` names — confirm which is intended.
    // NOTE(review): the allow presumably silences dead_code because nothing in
    // the crate constructs this variant yet — confirm before removing.
    #[allow(dead_code)]
    Corrected {
        field: String,
        original: String,
        corrected: String,
    },
    /// The finding's confidence score is overwritten with `new_score`.
    /// Dependents are flagged `upstream_at_risk` only when `new_score < 0.5`.
    ConfidenceReduced { new_score: f64 },
    /// v0.36.1: a `vrep_<id>` replication record landed against the finding.
    /// Its confidence is recomputed from `Project.replications` (via
    /// `Project::compute_confidence_for`) and a `finding.confidence_revised`
    /// state event is recorded. Dependents are flagged for review when the
    /// outcome is:
    /// - `failed` / `partial`: downstream may need to weaken;
    /// - `replicated`: downstream may now safely strengthen.
    ///
    /// `inconclusive` (and any unrecognised) outcomes do not cascade — they
    /// represent methodological ambiguity, not evidence.
    ReplicationOutcome { outcome: String, vrep_id: String },
}
47
48/// Result of a propagation pass.
49pub struct PropagationResult {
50    /// Total findings directly or transitively affected.
51    pub affected: usize,
52    /// Finding IDs affected at each depth level.
53    pub cascade: Vec<Vec<String>>,
54    /// Review events created during propagation.
55    pub events: Vec<ReviewEvent>,
56}
57
/// Maximum number of hops the cascade walks away from the corrected finding.
/// Findings at this distance are still flagged but not expanded further,
/// which keeps long dependency chains from producing runaway cascades.
const MAX_DEPTH: usize = 3;
60
61/// Propagate a correction through the frontier. Returns a PropagationResult
62/// describing the cascade.
63pub fn propagate_correction(
64    frontier: &mut Project,
65    finding_id: &str,
66    action: PropagationAction,
67) -> PropagationResult {
68    let now = Utc::now().to_rfc3339();
69
70    // Build a reverse adjacency map: target_id -> list of (source_idx, link_type).
71    // We want findings that link TO the corrected finding via supports or depends.
72    let mut reverse_links: HashMap<String, Vec<(usize, String)>> = HashMap::new();
73    for (idx, finding) in frontier.findings.iter().enumerate() {
74        for link in &finding.links {
75            if link.link_type == "supports" || link.link_type == "depends" {
76                reverse_links
77                    .entry(link.target.clone())
78                    .or_default()
79                    .push((idx, link.link_type.clone()));
80            }
81        }
82    }
83
84    // Also build forward links: source finding has links with target.
85    // Findings that the corrected finding supports or that depend on it.
86    let mut forward_deps: HashMap<String, Vec<(usize, String)>> = HashMap::new();
87    for (idx, finding) in frontier.findings.iter().enumerate() {
88        for link in &finding.links {
89            forward_deps
90                .entry(finding.id.clone())
91                .or_default()
92                .push((idx, link.link_type.clone()));
93        }
94    }
95
96    // Find the source finding index.
97    let source_idx = frontier.findings.iter().position(|f| f.id == finding_id);
98
99    let mut events: Vec<ReviewEvent> = Vec::new();
100    let mut cascade: Vec<Vec<String>> = Vec::new();
101
102    // Step 1: Apply the action to the source finding itself.
103    if let Some(idx) = source_idx {
104        match &action {
105            PropagationAction::Retracted => {
106                frontier.findings[idx].flags.retracted = true;
107                let event = make_event(
108                    finding_id,
109                    "propagation_engine",
110                    &now,
111                    ReviewAction::Flagged {
112                        flag_type: "retracted".into(),
113                    },
114                    "Source paper retracted",
115                );
116                events.push(event);
117            }
118            PropagationAction::Corrected {
119                field,
120                original,
121                corrected,
122            } => {
123                let event = make_event(
124                    finding_id,
125                    "propagation_engine",
126                    &now,
127                    ReviewAction::Corrected {
128                        field: field.clone(),
129                        original: original.clone(),
130                        corrected: corrected.clone(),
131                    },
132                    "Upstream correction applied",
133                );
134                events.push(event);
135            }
136            PropagationAction::ConfidenceReduced { new_score } => {
137                let old = frontier.findings[idx].confidence.score;
138                frontier.findings[idx].confidence.score = *new_score;
139                frontier.findings[idx].confidence.basis = format!(
140                    "Reduced from {:.3} to {:.3} (manual correction)",
141                    old, new_score
142                );
143                let event = make_event(
144                    finding_id,
145                    "propagation_engine",
146                    &now,
147                    ReviewAction::Flagged {
148                        flag_type: format!("confidence_reduced_to_{:.2}", new_score),
149                    },
150                    &format!("Confidence reduced from {:.3} to {:.3}", old, new_score),
151                );
152                events.push(event);
153            }
154            PropagationAction::ReplicationOutcome { outcome, vrep_id } => {
155                // Recompute the target finding's confidence from the
156                // current `Project.replications` collection. This is the
157                // v0.36.1 source-of-truth path: confidence is a function
158                // of recorded replications, not the legacy scalar flag.
159                let target_bundle = frontier.findings[idx].clone();
160                let new_conf = frontier.compute_confidence_for(&target_bundle);
161                let old = frontier.findings[idx].confidence.score;
162                let new_score = new_conf.score;
163                // v0.43.0 fix: capture the finding's hash *before* mutating
164                // it, then emit a canonical `finding.confidence_revised`
165                // state event after mutation so the per-finding event
166                // chain stays continuous. Pre-v0.43.0 the cascade
167                // mutated confidence silently and broke replay.
168                let before_hash = finding_hash(&frontier.findings[idx]);
169                frontier.findings[idx].confidence = new_conf;
170                let after_hash = finding_hash(&frontier.findings[idx]);
171                let revise_reason = format!(
172                    "{outcome} replication {vrep_id} recorded; confidence {:.3} -> {:.3}",
173                    old, new_score
174                );
175                let mut state_event = StateEvent {
176                    schema: EVENT_SCHEMA.to_string(),
177                    id: String::new(),
178                    kind: "finding.confidence_revised".to_string(),
179                    target: StateTarget {
180                        r#type: "finding".to_string(),
181                        id: finding_id.to_string(),
182                    },
183                    actor: StateActor {
184                        id: "propagation_engine".to_string(),
185                        r#type: "system".to_string(),
186                    },
187                    timestamp: now.clone(),
188                    reason: revise_reason.clone(),
189                    before_hash,
190                    after_hash,
191                    payload: json!({
192                        "proposal_id": format!("vpr_synthetic_{}", &vrep_id[..vrep_id.len().min(16)]),
193                        "previous_score": old,
194                        "new_score": new_score,
195                        "trigger": "replication_outcome",
196                        "vrep_id": vrep_id,
197                        "outcome": outcome,
198                    }),
199                    caveats: Vec::new(),
200                    signature: None,
201                    schema_artifact_id: None,
202                };
203                state_event.id = compute_event_id(&state_event);
204                frontier.events.push(state_event);
205
206                // Also emit the human-readable review event for the
207                // review queue (existing v0.36.1 behavior).
208                let event = make_event(
209                    finding_id,
210                    "propagation_engine",
211                    &now,
212                    ReviewAction::Flagged {
213                        flag_type: format!("replication_{}", outcome),
214                    },
215                    &revise_reason,
216                );
217                events.push(event);
218            }
219        }
220    }
221
222    // Step 2: BFS through dependents, up to MAX_DEPTH.
223    let mut visited: HashSet<String> = HashSet::new();
224    visited.insert(finding_id.to_string());
225
226    let mut queue: VecDeque<(String, usize)> = VecDeque::new();
227    queue.push_back((finding_id.to_string(), 0));
228
229    while let Some((current_id, depth)) = queue.pop_front() {
230        if depth >= MAX_DEPTH {
231            continue;
232        }
233
234        // Find all findings that have a supports/depends link targeting current_id.
235        let dependents = find_dependents(&frontier.findings, &current_id);
236
237        if dependents.is_empty() {
238            continue;
239        }
240
241        let mut level_ids: Vec<String> = Vec::new();
242
243        for dep_idx in dependents {
244            let dep_id = frontier.findings[dep_idx].id.clone();
245            if visited.contains(&dep_id) {
246                continue;
247            }
248            visited.insert(dep_id.clone());
249
250            // Flag the dependent finding.
251            let (flag_type, reason) = match &action {
252                PropagationAction::Retracted => (
253                    "upstream_retracted".to_string(),
254                    format!(
255                        "Upstream finding {} was retracted (depth {})",
256                        finding_id,
257                        depth + 1
258                    ),
259                ),
260                PropagationAction::Corrected { field, .. } => (
261                    "upstream_corrected".to_string(),
262                    format!(
263                        "Upstream finding {} had field '{}' corrected (depth {})",
264                        finding_id,
265                        field,
266                        depth + 1
267                    ),
268                ),
269                PropagationAction::ConfidenceReduced { new_score } => {
270                    if *new_score < 0.5 {
271                        (
272                            "upstream_at_risk".to_string(),
273                            format!(
274                                "Upstream finding {} confidence reduced to {:.2} (depth {})",
275                                finding_id,
276                                new_score,
277                                depth + 1
278                            ),
279                        )
280                    } else {
281                        continue; // Only propagate if below 0.5
282                    }
283                }
284                PropagationAction::ReplicationOutcome { outcome, .. } => match outcome.as_str() {
285                    "failed" => (
286                        "upstream_replication_failed".to_string(),
287                        format!(
288                            "Upstream finding {} failed replication (depth {})",
289                            finding_id,
290                            depth + 1
291                        ),
292                    ),
293                    "partial" => (
294                        "upstream_replication_partial".to_string(),
295                        format!(
296                            "Upstream finding {} partially replicated (depth {})",
297                            finding_id,
298                            depth + 1
299                        ),
300                    ),
301                    "replicated" => (
302                        "upstream_replication_succeeded".to_string(),
303                        format!(
304                            "Upstream finding {} replicated successfully (depth {})",
305                            finding_id,
306                            depth + 1
307                        ),
308                    ),
309                    // `inconclusive` and unknown outcomes do not cascade.
310                    _ => continue,
311                },
312            };
313
314            let event = make_event(
315                &dep_id,
316                "propagation_engine",
317                &now,
318                ReviewAction::Flagged {
319                    flag_type: flag_type.clone(),
320                },
321                &reason,
322            );
323            events.push(event);
324            level_ids.push(dep_id.clone());
325
326            // If retracted, also mark the dependent as contested.
327            if matches!(action, PropagationAction::Retracted) {
328                frontier.findings[dep_idx].flags.contested = true;
329            }
330
331            queue.push_back((dep_id, depth + 1));
332        }
333
334        if !level_ids.is_empty() {
335            // Ensure cascade has enough depth levels.
336            while cascade.len() <= depth {
337                cascade.push(Vec::new());
338            }
339            cascade[depth].extend(level_ids);
340        }
341    }
342
343    let affected = cascade.iter().map(|level| level.len()).sum();
344
345    PropagationResult {
346        affected,
347        cascade,
348        events,
349    }
350}
351
352/// Find indices of findings that have a supports or depends link targeting the
353/// given finding ID.
354fn find_dependents(findings: &[FindingBundle], target_id: &str) -> Vec<usize> {
355    findings
356        .iter()
357        .enumerate()
358        .filter(|(_, f)| {
359            f.links.iter().any(|l| {
360                l.target == target_id && (l.link_type == "supports" || l.link_type == "depends")
361            })
362        })
363        .map(|(idx, _)| idx)
364        .collect()
365}
366
367/// Create a content-addressed review event.
368fn make_event(
369    finding_id: &str,
370    reviewer: &str,
371    timestamp: &str,
372    action: ReviewAction,
373    reason: &str,
374) -> ReviewEvent {
375    let content = serde_json::json!({
376        "finding_id": finding_id,
377        "reviewer": reviewer,
378        "reviewed_at": timestamp,
379        "action": action,
380        "reason": reason,
381    });
382    let canonical = serde_json::to_string(&content).unwrap_or_default();
383    let hash = Sha256::digest(canonical.as_bytes());
384    let id = format!("rev_{}", &hex::encode(hash)[..16]);
385
386    ReviewEvent {
387        id,
388        workspace: None,
389        finding_id: finding_id.to_string(),
390        reviewer: reviewer.to_string(),
391        reviewed_at: timestamp.to_string(),
392        scope: None,
393        status: None,
394        action,
395        reason: reason.to_string(),
396        evidence_considered: Vec::new(),
397        state_change: None,
398    }
399}
400
401/// Create a review event recording a retraction with a human-readable reason.
402pub fn make_retraction_event(finding_id: &str, reason: &str) -> ReviewEvent {
403    let now = Utc::now().to_rfc3339();
404    make_event(
405        finding_id,
406        "retraction",
407        &now,
408        ReviewAction::Flagged {
409            flag_type: "retracted".into(),
410        },
411        reason,
412    )
413}
414
415/// Print a propagation result to stdout.
416pub fn print_result(result: &PropagationResult, action_label: &str, finding_id: &str) {
417    println!();
418    println!(
419        "  {}",
420        format!(
421            "VELA · PROPAGATE · {} · {}",
422            action_label.to_uppercase(),
423            finding_id
424        )
425        .dimmed()
426    );
427    println!("  {}", style::tick_row(60));
428    println!("  {} findings affected", result.affected);
429
430    for (depth, ids) in result.cascade.iter().enumerate() {
431        if !ids.is_empty() {
432            println!("  depth {}: {} findings", depth + 1, ids.len());
433            for id in ids {
434                println!("    · {}", id);
435            }
436        }
437    }
438
439    if !result.events.is_empty() {
440        println!();
441        println!("  review events created: {}", result.events.len());
442        for event in &result.events {
443            println!(
444                "    {} · {} · {}",
445                event.id.dimmed(),
446                event.finding_id,
447                event.reason
448            );
449        }
450    }
451    println!();
452}
453
#[cfg(test)]
mod tests {
    use super::*;
    use crate::bundle::*;
    use crate::project;

    fn make_finding(id: &str, score: f64) -> FindingBundle {
        FindingBundle {
            id: id.into(),
            version: 1,
            previous_version: None,
            assertion: Assertion {
                text: format!("Finding {id}"),
                assertion_type: "mechanism".into(),
                entities: vec![],
                relation: None,
                direction: None,
                causal_claim: None,
                causal_evidence_grade: None,
            },
            evidence: Evidence {
                evidence_type: "experimental".into(),
                model_system: String::new(),
                species: None,
                method: String::new(),
                sample_size: None,
                effect_size: None,
                p_value: None,
                replicated: false,
                replication_count: None,
                evidence_spans: vec![],
            },
            conditions: Conditions {
                text: String::new(),
                species_verified: vec![],
                species_unverified: vec![],
                in_vitro: false,
                in_vivo: false,
                human_data: false,
                clinical_trial: false,
                concentration_range: None,
                duration: None,
                age_group: None,
                cell_type: None,
            },
            confidence: Confidence::raw(score, "test", 0.85),
            provenance: Provenance {
                source_type: "published_paper".into(),
                doi: None,
                pmid: None,
                pmc: None,
                openalex_id: None,
                url: None,
                title: "Test".into(),
                authors: vec![],
                year: Some(2025),
                journal: None,
                license: None,
                publisher: None,
                funders: vec![],
                extraction: Extraction::default(),
                review: None,
                citation_count: None,
            },
            flags: Flags {
                gap: false,
                negative_space: false,
                contested: false,
                retracted: false,
                declining: false,
                gravity_well: false,
                review_state: None,
                superseded: false,
                signature_threshold: None,
                jointly_accepted: false,
            },
            links: vec![],
            attachments: vec![],
            annotations: vec![],
            created: String::new(),
            updated: None,

            access_tier: crate::access_tier::AccessTier::Public,
        }
    }

    fn make_frontier(findings: Vec<FindingBundle>) -> Project {
        project::assemble("test", findings, 1, 0, "test frontier")
    }

    /// True when `result` contains a `Flagged` review event with exactly `flag`.
    fn has_flag(result: &PropagationResult, flag: &str) -> bool {
        result.events.iter().any(|e| {
            matches!(&e.action, ReviewAction::Flagged { flag_type } if flag_type == flag)
        })
    }

    #[test]
    fn retraction_propagates() {
        let a = make_finding("a", 0.8);
        let mut b = make_finding("b", 0.7);
        // b depends on a
        b.add_link("a", "depends", "b depends on a");

        let mut c = make_frontier(vec![a, b]);
        let result = propagate_correction(&mut c, "a", PropagationAction::Retracted);

        // a should be retracted
        assert!(c.findings[0].flags.retracted);
        // b should be contested (flagged)
        assert!(c.findings[1].flags.contested);
        assert_eq!(result.affected, 1);
        // One event on the source, one on the dependent.
        assert_eq!(result.events.len(), 2);
        assert!(has_flag(&result, "upstream_retracted"));
    }

    #[test]
    fn confidence_reduction_propagates_below_half() {
        let a = make_finding("a", 0.8);
        let mut b = make_finding("b", 0.7);
        b.add_link("a", "supports", "b supports a");

        let mut c = make_frontier(vec![a, b]);
        let result = propagate_correction(
            &mut c,
            "a",
            PropagationAction::ConfidenceReduced { new_score: 0.3 },
        );

        assert!((c.findings[0].confidence.score - 0.3).abs() < 0.001);
        assert_eq!(result.affected, 1);
        assert!(has_flag(&result, "upstream_at_risk"));
    }

    #[test]
    fn confidence_above_half_does_not_propagate() {
        let a = make_finding("a", 0.8);
        let mut b = make_finding("b", 0.7);
        b.add_link("a", "supports", "b supports a");

        let mut c = make_frontier(vec![a, b]);
        let result = propagate_correction(
            &mut c,
            "a",
            PropagationAction::ConfidenceReduced { new_score: 0.6 },
        );

        // Confidence updated on source, but no cascade.
        assert!((c.findings[0].confidence.score - 0.6).abs() < 0.001);
        assert_eq!(result.affected, 0);
        // Only the source event is recorded.
        assert_eq!(result.events.len(), 1);
        assert!(!c.findings[1].flags.contested);
    }

    #[test]
    fn correction_flags_dependents_without_contesting_them() {
        let a = make_finding("a", 0.8);
        let mut b = make_finding("b", 0.7);
        b.add_link("a", "depends", "b depends on a");

        let mut c = make_frontier(vec![a, b]);
        let result = propagate_correction(
            &mut c,
            "a",
            PropagationAction::Corrected {
                field: "assertion.text".into(),
                original: "old".into(),
                corrected: "new".into(),
            },
        );

        assert_eq!(result.affected, 1);
        assert!(
            result
                .events
                .iter()
                .any(|e| matches!(&e.action, ReviewAction::Corrected { .. }))
        );
        assert!(has_flag(&result, "upstream_corrected"));
        // Only retractions mark dependents as contested.
        assert!(!c.findings[1].flags.contested);
    }

    #[test]
    fn failed_replication_flags_dependents() {
        // a is supported by b. A failed replication of a lands.
        // b should be flagged with `upstream_replication_failed`.
        let a = make_finding("vf_aaaa", 0.8);
        let mut b = make_finding("vf_bbbb", 0.7);
        b.add_link("vf_aaaa", "supports", "b supports a");
        let mut frontier = make_frontier(vec![a, b]);
        let result = propagate_correction(
            &mut frontier,
            "vf_aaaa",
            PropagationAction::ReplicationOutcome {
                outcome: "failed".into(),
                vrep_id: "vrep_test01".into(),
            },
        );
        // b should be flagged.
        assert_eq!(result.affected, 1);
        assert!(has_flag(&result, "upstream_replication_failed"));
    }

    #[test]
    fn successful_replication_recomputes_target_and_flags_dependents() {
        // a has a successful replication. After propagation, a's
        // confidence is recomputed from Project.replications, and
        // b is flagged for review.
        let a = make_finding("vf_aaaa", 0.5);
        let mut b = make_finding("vf_bbbb", 0.5);
        b.add_link("vf_aaaa", "depends", "b depends on a");
        let mut frontier = make_frontier(vec![a, b]);

        // Inject a replicated record so compute_confidence_for has
        // something to count.
        frontier.replications.push(Replication {
            id: "vrep_test02".into(),
            target_finding: "vf_aaaa".into(),
            attempted_by: "lab:test".into(),
            outcome: "replicated".into(),
            evidence: frontier.findings[0].evidence.clone(),
            conditions: frontier.findings[0].conditions.clone(),
            provenance: frontier.findings[0].provenance.clone(),
            notes: String::new(),
            created: String::new(),
            previous_attempt: None,
        });

        let result = propagate_correction(
            &mut frontier,
            "vf_aaaa",
            PropagationAction::ReplicationOutcome {
                outcome: "replicated".into(),
                vrep_id: "vrep_test02".into(),
            },
        );

        // Target's confidence was recomputed.
        assert_eq!(
            frontier.findings[0].confidence.method,
            ConfidenceMethod::Computed
        );
        // Dependent flagged for review.
        assert_eq!(result.affected, 1);
        assert!(has_flag(&result, "upstream_replication_succeeded"));
    }

    #[test]
    fn inconclusive_replication_does_not_cascade() {
        let a = make_finding("vf_aaaa", 0.7);
        let mut b = make_finding("vf_bbbb", 0.7);
        b.add_link("vf_aaaa", "supports", "");
        let mut frontier = make_frontier(vec![a, b]);
        let result = propagate_correction(
            &mut frontier,
            "vf_aaaa",
            PropagationAction::ReplicationOutcome {
                outcome: "inconclusive".into(),
                vrep_id: "vrep_test03".into(),
            },
        );
        // Source still gets a recompute event, but no dependents are flagged.
        assert_eq!(result.affected, 0);
        assert_eq!(result.events.len(), 1);
        assert!(
            frontier
                .events
                .iter()
                .any(|e| e.kind == "finding.confidence_revised")
        );
    }

    #[test]
    fn depth_limit_respected() {
        // Chain: a <- b <- c <- d <- e (each depends on previous)
        let a = make_finding("a", 0.8);
        let mut b = make_finding("b", 0.7);
        b.add_link("a", "depends", "");
        let mut c_f = make_finding("c", 0.7);
        c_f.add_link("b", "depends", "");
        let mut d = make_finding("d", 0.7);
        d.add_link("c", "depends", "");
        let mut e = make_finding("e", 0.7);
        e.add_link("d", "depends", "");

        let mut frontier = make_frontier(vec![a, b, c_f, d, e]);
        let result = propagate_correction(&mut frontier, "a", PropagationAction::Retracted);

        // Stops at depth 3: b, c, d get flagged (one per level); e does not.
        assert_eq!(result.affected, 3);
        let per_level: Vec<usize> = result.cascade.iter().map(Vec::len).collect();
        assert_eq!(per_level, vec![1, 1, 1]);
        assert_eq!(result.cascade[2], vec!["d".to_string()]);
        assert!(frontier.findings[3].flags.contested);
        assert!(!frontier.findings[4].flags.contested);
    }

    #[test]
    fn retraction_terminates_on_cycles() {
        // a and b depend on each other.
        let mut a = make_finding("a", 0.8);
        a.add_link("b", "depends", "");
        let mut b = make_finding("b", 0.7);
        b.add_link("a", "depends", "");

        let mut frontier = make_frontier(vec![a, b]);
        let result = propagate_correction(&mut frontier, "a", PropagationAction::Retracted);

        // Only b is reached; the source is never re-flagged as its own dependent.
        assert_eq!(result.affected, 1);
        assert!(frontier.findings[0].flags.retracted);
        assert!(!frontier.findings[0].flags.contested);
        assert!(frontier.findings[1].flags.contested);
    }
}