1use std::collections::{HashMap, HashSet, VecDeque};
8
9use chrono::Utc;
10use sha2::{Digest, Sha256};
11
12use colored::Colorize;
13
14use crate::bundle::{FindingBundle, ReviewAction, ReviewEvent};
15use crate::cli_style as style;
16use crate::events::{
17 EVENT_SCHEMA, StateActor, StateEvent, StateTarget, compute_event_id, finding_hash,
18};
19use crate::project::Project;
20use serde_json::json;
21
22#[derive(Debug, Clone)]
24pub enum PropagationAction {
25 Retracted,
27 #[allow(dead_code)]
29 Corrected {
30 field: String,
31 original: String,
32 corrected: String,
33 },
34 ConfidenceReduced { new_score: f64 },
36 ReplicationOutcome { outcome: String, vrep_id: String },
46}
47
48pub struct PropagationResult {
50 pub affected: usize,
52 pub cascade: Vec<Vec<String>>,
54 pub events: Vec<ReviewEvent>,
56}
57
58const MAX_DEPTH: usize = 3;
60
61pub fn propagate_correction(
64 frontier: &mut Project,
65 finding_id: &str,
66 action: PropagationAction,
67) -> PropagationResult {
68 let now = Utc::now().to_rfc3339();
69
70 let mut reverse_links: HashMap<String, Vec<(usize, String)>> = HashMap::new();
73 for (idx, finding) in frontier.findings.iter().enumerate() {
74 for link in &finding.links {
75 if link.link_type == "supports" || link.link_type == "depends" {
76 reverse_links
77 .entry(link.target.clone())
78 .or_default()
79 .push((idx, link.link_type.clone()));
80 }
81 }
82 }
83
84 let mut forward_deps: HashMap<String, Vec<(usize, String)>> = HashMap::new();
87 for (idx, finding) in frontier.findings.iter().enumerate() {
88 for link in &finding.links {
89 forward_deps
90 .entry(finding.id.clone())
91 .or_default()
92 .push((idx, link.link_type.clone()));
93 }
94 }
95
96 let source_idx = frontier.findings.iter().position(|f| f.id == finding_id);
98
99 let mut events: Vec<ReviewEvent> = Vec::new();
100 let mut cascade: Vec<Vec<String>> = Vec::new();
101
102 if let Some(idx) = source_idx {
104 match &action {
105 PropagationAction::Retracted => {
106 frontier.findings[idx].flags.retracted = true;
107 let event = make_event(
108 finding_id,
109 "propagation_engine",
110 &now,
111 ReviewAction::Flagged {
112 flag_type: "retracted".into(),
113 },
114 "Source paper retracted",
115 );
116 events.push(event);
117 }
118 PropagationAction::Corrected {
119 field,
120 original,
121 corrected,
122 } => {
123 let event = make_event(
124 finding_id,
125 "propagation_engine",
126 &now,
127 ReviewAction::Corrected {
128 field: field.clone(),
129 original: original.clone(),
130 corrected: corrected.clone(),
131 },
132 "Upstream correction applied",
133 );
134 events.push(event);
135 }
136 PropagationAction::ConfidenceReduced { new_score } => {
137 let old = frontier.findings[idx].confidence.score;
138 frontier.findings[idx].confidence.score = *new_score;
139 frontier.findings[idx].confidence.basis = format!(
140 "Reduced from {:.3} to {:.3} (manual correction)",
141 old, new_score
142 );
143 let event = make_event(
144 finding_id,
145 "propagation_engine",
146 &now,
147 ReviewAction::Flagged {
148 flag_type: format!("confidence_reduced_to_{:.2}", new_score),
149 },
150 &format!("Confidence reduced from {:.3} to {:.3}", old, new_score),
151 );
152 events.push(event);
153 }
154 PropagationAction::ReplicationOutcome { outcome, vrep_id } => {
155 let target_bundle = frontier.findings[idx].clone();
160 let new_conf = frontier.compute_confidence_for(&target_bundle);
161 let old = frontier.findings[idx].confidence.score;
162 let new_score = new_conf.score;
163 let before_hash = finding_hash(&frontier.findings[idx]);
169 frontier.findings[idx].confidence = new_conf;
170 let after_hash = finding_hash(&frontier.findings[idx]);
171 let revise_reason = format!(
172 "{outcome} replication {vrep_id} recorded; confidence {:.3} -> {:.3}",
173 old, new_score
174 );
175 let mut state_event = StateEvent {
176 schema: EVENT_SCHEMA.to_string(),
177 id: String::new(),
178 kind: "finding.confidence_revised".to_string(),
179 target: StateTarget {
180 r#type: "finding".to_string(),
181 id: finding_id.to_string(),
182 },
183 actor: StateActor {
184 id: "propagation_engine".to_string(),
185 r#type: "system".to_string(),
186 },
187 timestamp: now.clone(),
188 reason: revise_reason.clone(),
189 before_hash,
190 after_hash,
191 payload: json!({
192 "proposal_id": format!("vpr_synthetic_{}", &vrep_id[..vrep_id.len().min(16)]),
193 "previous_score": old,
194 "new_score": new_score,
195 "trigger": "replication_outcome",
196 "vrep_id": vrep_id,
197 "outcome": outcome,
198 }),
199 caveats: Vec::new(),
200 signature: None,
201 schema_artifact_id: None,
202 };
203 state_event.id = compute_event_id(&state_event);
204 frontier.events.push(state_event);
205
206 let event = make_event(
209 finding_id,
210 "propagation_engine",
211 &now,
212 ReviewAction::Flagged {
213 flag_type: format!("replication_{}", outcome),
214 },
215 &revise_reason,
216 );
217 events.push(event);
218 }
219 }
220 }
221
222 let mut visited: HashSet<String> = HashSet::new();
224 visited.insert(finding_id.to_string());
225
226 let mut queue: VecDeque<(String, usize)> = VecDeque::new();
227 queue.push_back((finding_id.to_string(), 0));
228
229 while let Some((current_id, depth)) = queue.pop_front() {
230 if depth >= MAX_DEPTH {
231 continue;
232 }
233
234 let dependents = find_dependents(&frontier.findings, ¤t_id);
236
237 if dependents.is_empty() {
238 continue;
239 }
240
241 let mut level_ids: Vec<String> = Vec::new();
242
243 for dep_idx in dependents {
244 let dep_id = frontier.findings[dep_idx].id.clone();
245 if visited.contains(&dep_id) {
246 continue;
247 }
248 visited.insert(dep_id.clone());
249
250 let (flag_type, reason) = match &action {
252 PropagationAction::Retracted => (
253 "upstream_retracted".to_string(),
254 format!(
255 "Upstream finding {} was retracted (depth {})",
256 finding_id,
257 depth + 1
258 ),
259 ),
260 PropagationAction::Corrected { field, .. } => (
261 "upstream_corrected".to_string(),
262 format!(
263 "Upstream finding {} had field '{}' corrected (depth {})",
264 finding_id,
265 field,
266 depth + 1
267 ),
268 ),
269 PropagationAction::ConfidenceReduced { new_score } => {
270 if *new_score < 0.5 {
271 (
272 "upstream_at_risk".to_string(),
273 format!(
274 "Upstream finding {} confidence reduced to {:.2} (depth {})",
275 finding_id,
276 new_score,
277 depth + 1
278 ),
279 )
280 } else {
281 continue; }
283 }
284 PropagationAction::ReplicationOutcome { outcome, .. } => match outcome.as_str() {
285 "failed" => (
286 "upstream_replication_failed".to_string(),
287 format!(
288 "Upstream finding {} failed replication (depth {})",
289 finding_id,
290 depth + 1
291 ),
292 ),
293 "partial" => (
294 "upstream_replication_partial".to_string(),
295 format!(
296 "Upstream finding {} partially replicated (depth {})",
297 finding_id,
298 depth + 1
299 ),
300 ),
301 "replicated" => (
302 "upstream_replication_succeeded".to_string(),
303 format!(
304 "Upstream finding {} replicated successfully (depth {})",
305 finding_id,
306 depth + 1
307 ),
308 ),
309 _ => continue,
311 },
312 };
313
314 let event = make_event(
315 &dep_id,
316 "propagation_engine",
317 &now,
318 ReviewAction::Flagged {
319 flag_type: flag_type.clone(),
320 },
321 &reason,
322 );
323 events.push(event);
324 level_ids.push(dep_id.clone());
325
326 if matches!(action, PropagationAction::Retracted) {
328 frontier.findings[dep_idx].flags.contested = true;
329 }
330
331 queue.push_back((dep_id, depth + 1));
332 }
333
334 if !level_ids.is_empty() {
335 while cascade.len() <= depth {
337 cascade.push(Vec::new());
338 }
339 cascade[depth].extend(level_ids);
340 }
341 }
342
343 let affected = cascade.iter().map(|level| level.len()).sum();
344
345 PropagationResult {
346 affected,
347 cascade,
348 events,
349 }
350}
351
352fn find_dependents(findings: &[FindingBundle], target_id: &str) -> Vec<usize> {
355 findings
356 .iter()
357 .enumerate()
358 .filter(|(_, f)| {
359 f.links.iter().any(|l| {
360 l.target == target_id && (l.link_type == "supports" || l.link_type == "depends")
361 })
362 })
363 .map(|(idx, _)| idx)
364 .collect()
365}
366
367fn make_event(
369 finding_id: &str,
370 reviewer: &str,
371 timestamp: &str,
372 action: ReviewAction,
373 reason: &str,
374) -> ReviewEvent {
375 let content = serde_json::json!({
376 "finding_id": finding_id,
377 "reviewer": reviewer,
378 "reviewed_at": timestamp,
379 "action": action,
380 "reason": reason,
381 });
382 let canonical = serde_json::to_string(&content).unwrap_or_default();
383 let hash = Sha256::digest(canonical.as_bytes());
384 let id = format!("rev_{}", &hex::encode(hash)[..16]);
385
386 ReviewEvent {
387 id,
388 workspace: None,
389 finding_id: finding_id.to_string(),
390 reviewer: reviewer.to_string(),
391 reviewed_at: timestamp.to_string(),
392 scope: None,
393 status: None,
394 action,
395 reason: reason.to_string(),
396 evidence_considered: Vec::new(),
397 state_change: None,
398 }
399}
400
401pub fn make_retraction_event(finding_id: &str, reason: &str) -> ReviewEvent {
403 let now = Utc::now().to_rfc3339();
404 make_event(
405 finding_id,
406 "retraction",
407 &now,
408 ReviewAction::Flagged {
409 flag_type: "retracted".into(),
410 },
411 reason,
412 )
413}
414
415pub fn print_result(result: &PropagationResult, action_label: &str, finding_id: &str) {
417 println!();
418 println!(
419 " {}",
420 format!(
421 "VELA · PROPAGATE · {} · {}",
422 action_label.to_uppercase(),
423 finding_id
424 )
425 .dimmed()
426 );
427 println!(" {}", style::tick_row(60));
428 println!(" {} findings affected", result.affected);
429
430 for (depth, ids) in result.cascade.iter().enumerate() {
431 if !ids.is_empty() {
432 println!(" depth {}: {} findings", depth + 1, ids.len());
433 for id in ids {
434 println!(" · {}", id);
435 }
436 }
437 }
438
439 if !result.events.is_empty() {
440 println!();
441 println!(" review events created: {}", result.events.len());
442 for event in &result.events {
443 println!(
444 " {} · {} · {}",
445 event.id.dimmed(),
446 event.finding_id,
447 event.reason
448 );
449 }
450 }
451 println!();
452}
453
454#[cfg(test)]
455mod tests {
456 use super::*;
457 use crate::bundle::*;
458 use crate::project;
459
460 fn make_finding(id: &str, score: f64) -> FindingBundle {
461 FindingBundle {
462 id: id.into(),
463 version: 1,
464 previous_version: None,
465 assertion: Assertion {
466 text: format!("Finding {id}"),
467 assertion_type: "mechanism".into(),
468 entities: vec![],
469 relation: None,
470 direction: None,
471 causal_claim: None,
472 causal_evidence_grade: None,
473 },
474 evidence: Evidence {
475 evidence_type: "experimental".into(),
476 model_system: String::new(),
477 species: None,
478 method: String::new(),
479 sample_size: None,
480 effect_size: None,
481 p_value: None,
482 replicated: false,
483 replication_count: None,
484 evidence_spans: vec![],
485 },
486 conditions: Conditions {
487 text: String::new(),
488 species_verified: vec![],
489 species_unverified: vec![],
490 in_vitro: false,
491 in_vivo: false,
492 human_data: false,
493 clinical_trial: false,
494 concentration_range: None,
495 duration: None,
496 age_group: None,
497 cell_type: None,
498 },
499 confidence: Confidence::raw(score, "test", 0.85),
500 provenance: Provenance {
501 source_type: "published_paper".into(),
502 doi: None,
503 pmid: None,
504 pmc: None,
505 openalex_id: None,
506 url: None,
507 title: "Test".into(),
508 authors: vec![],
509 year: Some(2025),
510 journal: None,
511 license: None,
512 publisher: None,
513 funders: vec![],
514 extraction: Extraction::default(),
515 review: None,
516 citation_count: None,
517 },
518 flags: Flags {
519 gap: false,
520 negative_space: false,
521 contested: false,
522 retracted: false,
523 declining: false,
524 gravity_well: false,
525 review_state: None,
526 superseded: false,
527 signature_threshold: None,
528 jointly_accepted: false,
529 },
530 links: vec![],
531 attachments: vec![],
532 annotations: vec![],
533 created: String::new(),
534 updated: None,
535
536 access_tier: crate::access_tier::AccessTier::Public,
537 }
538 }
539
540 fn make_frontier(findings: Vec<FindingBundle>) -> Project {
541 project::assemble("test", findings, 1, 0, "test frontier")
542 }
543
544 #[test]
545 fn retraction_propagates() {
546 let a = make_finding("a", 0.8);
547 let mut b = make_finding("b", 0.7);
548 b.add_link("a", "depends", "b depends on a");
550
551 let mut c = make_frontier(vec![a, b]);
552 let result = propagate_correction(&mut c, "a", PropagationAction::Retracted);
553
554 assert!(c.findings[0].flags.retracted);
556 assert!(c.findings[1].flags.contested);
558 assert_eq!(result.affected, 1);
559 }
560
561 #[test]
562 fn confidence_reduction_propagates_below_half() {
563 let a = make_finding("a", 0.8);
564 let mut b = make_finding("b", 0.7);
565 b.add_link("a", "supports", "b supports a");
566
567 let mut c = make_frontier(vec![a, b]);
568 let result = propagate_correction(
569 &mut c,
570 "a",
571 PropagationAction::ConfidenceReduced { new_score: 0.3 },
572 );
573
574 assert!((c.findings[0].confidence.score - 0.3).abs() < 0.001);
575 assert_eq!(result.affected, 1);
576 }
577
578 #[test]
579 fn confidence_above_half_does_not_propagate() {
580 let a = make_finding("a", 0.8);
581 let mut b = make_finding("b", 0.7);
582 b.add_link("a", "supports", "b supports a");
583
584 let mut c = make_frontier(vec![a, b]);
585 let result = propagate_correction(
586 &mut c,
587 "a",
588 PropagationAction::ConfidenceReduced { new_score: 0.6 },
589 );
590
591 assert!((c.findings[0].confidence.score - 0.6).abs() < 0.001);
593 assert_eq!(result.affected, 0);
594 }
595
596 #[test]
597 fn failed_replication_flags_dependents() {
598 let a = make_finding("vf_aaaa", 0.8);
601 let mut b = make_finding("vf_bbbb", 0.7);
602 b.add_link("vf_aaaa", "supports", "b supports a");
603 let mut frontier = make_frontier(vec![a, b]);
604 let result = propagate_correction(
605 &mut frontier,
606 "vf_aaaa",
607 PropagationAction::ReplicationOutcome {
608 outcome: "failed".into(),
609 vrep_id: "vrep_test01".into(),
610 },
611 );
612 assert_eq!(result.affected, 1);
614 assert!(
615 result
616 .events
617 .iter()
618 .any(|e| matches!(&e.action,
619 ReviewAction::Flagged { flag_type } if flag_type == "upstream_replication_failed"))
620 );
621 }
622
623 #[test]
624 fn successful_replication_recomputes_target_and_flags_dependents() {
625 let a = make_finding("vf_aaaa", 0.5);
629 let mut b = make_finding("vf_bbbb", 0.5);
630 b.add_link("vf_aaaa", "depends", "b depends on a");
631 let mut frontier = make_frontier(vec![a, b]);
632
633 frontier.replications.push(Replication {
636 id: "vrep_test02".into(),
637 target_finding: "vf_aaaa".into(),
638 attempted_by: "lab:test".into(),
639 outcome: "replicated".into(),
640 evidence: frontier.findings[0].evidence.clone(),
641 conditions: frontier.findings[0].conditions.clone(),
642 provenance: frontier.findings[0].provenance.clone(),
643 notes: String::new(),
644 created: String::new(),
645 previous_attempt: None,
646 });
647
648 let result = propagate_correction(
649 &mut frontier,
650 "vf_aaaa",
651 PropagationAction::ReplicationOutcome {
652 outcome: "replicated".into(),
653 vrep_id: "vrep_test02".into(),
654 },
655 );
656
657 assert_eq!(
659 frontier.findings[0].confidence.method,
660 ConfidenceMethod::Computed
661 );
662 assert_eq!(result.affected, 1);
664 assert!(
665 result
666 .events
667 .iter()
668 .any(|e| matches!(&e.action,
669 ReviewAction::Flagged { flag_type } if flag_type == "upstream_replication_succeeded"))
670 );
671 }
672
673 #[test]
674 fn inconclusive_replication_does_not_cascade() {
675 let a = make_finding("vf_aaaa", 0.7);
676 let mut b = make_finding("vf_bbbb", 0.7);
677 b.add_link("vf_aaaa", "supports", "");
678 let mut frontier = make_frontier(vec![a, b]);
679 let result = propagate_correction(
680 &mut frontier,
681 "vf_aaaa",
682 PropagationAction::ReplicationOutcome {
683 outcome: "inconclusive".into(),
684 vrep_id: "vrep_test03".into(),
685 },
686 );
687 assert_eq!(result.affected, 0);
689 }
690
691 #[test]
692 fn depth_limit_respected() {
693 let a = make_finding("a", 0.8);
695 let mut b = make_finding("b", 0.7);
696 b.add_link("a", "depends", "");
697 let mut c_f = make_finding("c", 0.7);
698 c_f.add_link("b", "depends", "");
699 let mut d = make_finding("d", 0.7);
700 d.add_link("c", "depends", "");
701 let mut e = make_finding("e", 0.7);
702 e.add_link("d", "depends", "");
703
704 let mut frontier = make_frontier(vec![a, b, c_f, d, e]);
705 let result = propagate_correction(&mut frontier, "a", PropagationAction::Retracted);
706
707 assert!(result.affected <= 3);
709 }
710}