1use std::collections::{HashMap, HashSet, VecDeque};
8
9use chrono::Utc;
10use sha2::{Digest, Sha256};
11
12use colored::Colorize;
13
14use crate::bundle::{FindingBundle, ReviewAction, ReviewEvent};
15use crate::cli_style as style;
16use crate::events::{
17 EVENT_SCHEMA, StateActor, StateEvent, StateTarget, compute_event_id, finding_hash,
18};
19use crate::project::Project;
20use serde_json::json;
21
/// The kind of change applied to a source finding by
/// [`propagate_correction`], which also decides how (and whether) the change
/// cascades to dependent findings.
///
/// `PartialEq` is derived so callers and tests can compare actions directly.
#[derive(Debug, Clone, PartialEq)]
pub enum PropagationAction {
    /// The source finding is retracted: its `retracted` flag is set and every
    /// reached dependent is flagged `upstream_retracted` and marked contested.
    Retracted,
    /// A field of the source finding was corrected. `propagate_correction`
    /// records the correction as a review event and flags dependents as
    /// `upstream_corrected`; it does not rewrite the field itself.
    // NOTE(review): the lint is silenced presumably because nothing in the
    // crate constructs this variant yet — confirm before removing the allow.
    #[allow(dead_code)]
    Corrected {
        /// Name of the corrected field.
        field: String,
        /// Value before the correction.
        original: String,
        /// Value after the correction.
        corrected: String,
    },
    /// The source finding's confidence score is overwritten with `new_score`.
    /// Dependents are only flagged (`upstream_at_risk`) when the new score is
    /// below 0.5.
    ConfidenceReduced { new_score: f64 },
    /// A replication attempt was recorded against the source finding.
    ///
    /// Only the outcomes `"failed"`, `"partial"` and `"replicated"` cascade to
    /// dependents; any other outcome string updates the source finding only.
    /// `vrep_id` is echoed into the emitted event payload and reason.
    ReplicationOutcome { outcome: String, vrep_id: String },
}
47
/// Outcome of a [`propagate_correction`] run.
pub struct PropagationResult {
    /// Total number of dependent findings that were flagged; equal to the sum
    /// of the lengths of all `cascade` levels (the source finding itself is
    /// not counted).
    pub affected: usize,
    /// IDs of flagged dependents grouped by distance from the source:
    /// `cascade[0]` holds direct dependents, `cascade[1]` their dependents,
    /// and so on.
    pub cascade: Vec<Vec<String>>,
    /// Review events created during the run: one for the source finding (when
    /// it exists in the project) followed by one per flagged dependent.
    pub events: Vec<ReviewEvent>,
}
57
/// Maximum number of dependency hops a correction is propagated. Findings
/// dequeued at this depth are not expanded, so dependents further than
/// `MAX_DEPTH` links away from the source are never flagged.
const MAX_DEPTH: usize = 3;
60
61pub fn propagate_correction(
64 frontier: &mut Project,
65 finding_id: &str,
66 action: PropagationAction,
67) -> PropagationResult {
68 let now = Utc::now().to_rfc3339();
69
70 let mut reverse_links: HashMap<String, Vec<(usize, String)>> = HashMap::new();
73 for (idx, finding) in frontier.findings.iter().enumerate() {
74 for link in &finding.links {
75 if link.link_type == "supports" || link.link_type == "depends" {
76 reverse_links
77 .entry(link.target.clone())
78 .or_default()
79 .push((idx, link.link_type.clone()));
80 }
81 }
82 }
83
84 let mut forward_deps: HashMap<String, Vec<(usize, String)>> = HashMap::new();
87 for (idx, finding) in frontier.findings.iter().enumerate() {
88 for link in &finding.links {
89 forward_deps
90 .entry(finding.id.clone())
91 .or_default()
92 .push((idx, link.link_type.clone()));
93 }
94 }
95
96 let source_idx = frontier.findings.iter().position(|f| f.id == finding_id);
98
99 let mut events: Vec<ReviewEvent> = Vec::new();
100 let mut cascade: Vec<Vec<String>> = Vec::new();
101
102 if let Some(idx) = source_idx {
104 match &action {
105 PropagationAction::Retracted => {
106 frontier.findings[idx].flags.retracted = true;
107 let event = make_event(
108 finding_id,
109 "propagation_engine",
110 &now,
111 ReviewAction::Flagged {
112 flag_type: "retracted".into(),
113 },
114 "Source paper retracted",
115 );
116 events.push(event);
117 }
118 PropagationAction::Corrected {
119 field,
120 original,
121 corrected,
122 } => {
123 let event = make_event(
124 finding_id,
125 "propagation_engine",
126 &now,
127 ReviewAction::Corrected {
128 field: field.clone(),
129 original: original.clone(),
130 corrected: corrected.clone(),
131 },
132 "Upstream correction applied",
133 );
134 events.push(event);
135 }
136 PropagationAction::ConfidenceReduced { new_score } => {
137 let old = frontier.findings[idx].confidence.score;
138 frontier.findings[idx].confidence.score = *new_score;
139 frontier.findings[idx].confidence.basis = format!(
140 "Reduced from {:.3} to {:.3} (manual correction)",
141 old, new_score
142 );
143 let event = make_event(
144 finding_id,
145 "propagation_engine",
146 &now,
147 ReviewAction::Flagged {
148 flag_type: format!("confidence_reduced_to_{:.2}", new_score),
149 },
150 &format!("Confidence reduced from {:.3} to {:.3}", old, new_score),
151 );
152 events.push(event);
153 }
154 PropagationAction::ReplicationOutcome { outcome, vrep_id } => {
155 let target_bundle = frontier.findings[idx].clone();
160 let new_conf = frontier.compute_confidence_for(&target_bundle);
161 let old = frontier.findings[idx].confidence.score;
162 let new_score = new_conf.score;
163 let before_hash = finding_hash(&frontier.findings[idx]);
169 frontier.findings[idx].confidence = new_conf;
170 let after_hash = finding_hash(&frontier.findings[idx]);
171 let revise_reason = format!(
172 "{outcome} replication {vrep_id} recorded; confidence {:.3} -> {:.3}",
173 old, new_score
174 );
175 let mut state_event = StateEvent {
176 schema: EVENT_SCHEMA.to_string(),
177 id: String::new(),
178 kind: "finding.confidence_revised".to_string(),
179 target: StateTarget {
180 r#type: "finding".to_string(),
181 id: finding_id.to_string(),
182 },
183 actor: StateActor {
184 id: "propagation_engine".to_string(),
185 r#type: "system".to_string(),
186 },
187 timestamp: now.clone(),
188 reason: revise_reason.clone(),
189 before_hash,
190 after_hash,
191 payload: json!({
192 "proposal_id": format!("vpr_synthetic_{}", &vrep_id[..vrep_id.len().min(16)]),
193 "previous_score": old,
194 "new_score": new_score,
195 "trigger": "replication_outcome",
196 "vrep_id": vrep_id,
197 "outcome": outcome,
198 }),
199 caveats: Vec::new(),
200 signature: None,
201 };
202 state_event.id = compute_event_id(&state_event);
203 frontier.events.push(state_event);
204
205 let event = make_event(
208 finding_id,
209 "propagation_engine",
210 &now,
211 ReviewAction::Flagged {
212 flag_type: format!("replication_{}", outcome),
213 },
214 &revise_reason,
215 );
216 events.push(event);
217 }
218 }
219 }
220
221 let mut visited: HashSet<String> = HashSet::new();
223 visited.insert(finding_id.to_string());
224
225 let mut queue: VecDeque<(String, usize)> = VecDeque::new();
226 queue.push_back((finding_id.to_string(), 0));
227
228 while let Some((current_id, depth)) = queue.pop_front() {
229 if depth >= MAX_DEPTH {
230 continue;
231 }
232
233 let dependents = find_dependents(&frontier.findings, ¤t_id);
235
236 if dependents.is_empty() {
237 continue;
238 }
239
240 let mut level_ids: Vec<String> = Vec::new();
241
242 for dep_idx in dependents {
243 let dep_id = frontier.findings[dep_idx].id.clone();
244 if visited.contains(&dep_id) {
245 continue;
246 }
247 visited.insert(dep_id.clone());
248
249 let (flag_type, reason) = match &action {
251 PropagationAction::Retracted => (
252 "upstream_retracted".to_string(),
253 format!(
254 "Upstream finding {} was retracted (depth {})",
255 finding_id,
256 depth + 1
257 ),
258 ),
259 PropagationAction::Corrected { field, .. } => (
260 "upstream_corrected".to_string(),
261 format!(
262 "Upstream finding {} had field '{}' corrected (depth {})",
263 finding_id,
264 field,
265 depth + 1
266 ),
267 ),
268 PropagationAction::ConfidenceReduced { new_score } => {
269 if *new_score < 0.5 {
270 (
271 "upstream_at_risk".to_string(),
272 format!(
273 "Upstream finding {} confidence reduced to {:.2} (depth {})",
274 finding_id,
275 new_score,
276 depth + 1
277 ),
278 )
279 } else {
280 continue; }
282 }
283 PropagationAction::ReplicationOutcome { outcome, .. } => match outcome.as_str() {
284 "failed" => (
285 "upstream_replication_failed".to_string(),
286 format!(
287 "Upstream finding {} failed replication (depth {})",
288 finding_id,
289 depth + 1
290 ),
291 ),
292 "partial" => (
293 "upstream_replication_partial".to_string(),
294 format!(
295 "Upstream finding {} partially replicated (depth {})",
296 finding_id,
297 depth + 1
298 ),
299 ),
300 "replicated" => (
301 "upstream_replication_succeeded".to_string(),
302 format!(
303 "Upstream finding {} replicated successfully (depth {})",
304 finding_id,
305 depth + 1
306 ),
307 ),
308 _ => continue,
310 },
311 };
312
313 let event = make_event(
314 &dep_id,
315 "propagation_engine",
316 &now,
317 ReviewAction::Flagged {
318 flag_type: flag_type.clone(),
319 },
320 &reason,
321 );
322 events.push(event);
323 level_ids.push(dep_id.clone());
324
325 if matches!(action, PropagationAction::Retracted) {
327 frontier.findings[dep_idx].flags.contested = true;
328 }
329
330 queue.push_back((dep_id, depth + 1));
331 }
332
333 if !level_ids.is_empty() {
334 while cascade.len() <= depth {
336 cascade.push(Vec::new());
337 }
338 cascade[depth].extend(level_ids);
339 }
340 }
341
342 let affected = cascade.iter().map(|level| level.len()).sum();
343
344 PropagationResult {
345 affected,
346 cascade,
347 events,
348 }
349}
350
351fn find_dependents(findings: &[FindingBundle], target_id: &str) -> Vec<usize> {
354 findings
355 .iter()
356 .enumerate()
357 .filter(|(_, f)| {
358 f.links.iter().any(|l| {
359 l.target == target_id && (l.link_type == "supports" || l.link_type == "depends")
360 })
361 })
362 .map(|(idx, _)| idx)
363 .collect()
364}
365
366fn make_event(
368 finding_id: &str,
369 reviewer: &str,
370 timestamp: &str,
371 action: ReviewAction,
372 reason: &str,
373) -> ReviewEvent {
374 let content = serde_json::json!({
375 "finding_id": finding_id,
376 "reviewer": reviewer,
377 "reviewed_at": timestamp,
378 "action": action,
379 "reason": reason,
380 });
381 let canonical = serde_json::to_string(&content).unwrap_or_default();
382 let hash = Sha256::digest(canonical.as_bytes());
383 let id = format!("rev_{}", &hex::encode(hash)[..16]);
384
385 ReviewEvent {
386 id,
387 workspace: None,
388 finding_id: finding_id.to_string(),
389 reviewer: reviewer.to_string(),
390 reviewed_at: timestamp.to_string(),
391 scope: None,
392 status: None,
393 action,
394 reason: reason.to_string(),
395 evidence_considered: Vec::new(),
396 state_change: None,
397 }
398}
399
400pub fn make_retraction_event(finding_id: &str, reason: &str) -> ReviewEvent {
402 let now = Utc::now().to_rfc3339();
403 make_event(
404 finding_id,
405 "retraction",
406 &now,
407 ReviewAction::Flagged {
408 flag_type: "retracted".into(),
409 },
410 reason,
411 )
412}
413
414pub fn print_result(result: &PropagationResult, action_label: &str, finding_id: &str) {
416 println!();
417 println!(
418 " {}",
419 format!(
420 "VELA · PROPAGATE · {} · {}",
421 action_label.to_uppercase(),
422 finding_id
423 )
424 .dimmed()
425 );
426 println!(" {}", style::tick_row(60));
427 println!(" {} findings affected", result.affected);
428
429 for (depth, ids) in result.cascade.iter().enumerate() {
430 if !ids.is_empty() {
431 println!(" depth {}: {} findings", depth + 1, ids.len());
432 for id in ids {
433 println!(" · {}", id);
434 }
435 }
436 }
437
438 if !result.events.is_empty() {
439 println!();
440 println!(" review events created: {}", result.events.len());
441 for event in &result.events {
442 println!(
443 " {} · {} · {}",
444 event.id.dimmed(),
445 event.finding_id,
446 event.reason
447 );
448 }
449 }
450 println!();
451}
452
#[cfg(test)]
mod tests {
    use super::*;
    use crate::bundle::*;
    use crate::project;

    /// Builds a minimal finding with the given id and raw confidence score;
    /// every other field is empty or defaulted and it carries no links.
    fn make_finding(id: &str, score: f64) -> FindingBundle {
        FindingBundle {
            id: id.into(),
            version: 1,
            previous_version: None,
            assertion: Assertion {
                text: format!("Finding {id}"),
                assertion_type: "mechanism".into(),
                entities: vec![],
                relation: None,
                direction: None,
                causal_claim: None,
                causal_evidence_grade: None,
            },
            evidence: Evidence {
                evidence_type: "experimental".into(),
                model_system: String::new(),
                species: None,
                method: String::new(),
                sample_size: None,
                effect_size: None,
                p_value: None,
                replicated: false,
                replication_count: None,
                evidence_spans: vec![],
            },
            conditions: Conditions {
                text: String::new(),
                species_verified: vec![],
                species_unverified: vec![],
                in_vitro: false,
                in_vivo: false,
                human_data: false,
                clinical_trial: false,
                concentration_range: None,
                duration: None,
                age_group: None,
                cell_type: None,
            },
            confidence: Confidence::raw(score, "test", 0.85),
            provenance: Provenance {
                source_type: "published_paper".into(),
                doi: None,
                pmid: None,
                pmc: None,
                openalex_id: None,
                url: None,
                title: "Test".into(),
                authors: vec![],
                year: Some(2025),
                journal: None,
                license: None,
                publisher: None,
                funders: vec![],
                extraction: Extraction::default(),
                review: None,
                citation_count: None,
            },
            flags: Flags {
                gap: false,
                negative_space: false,
                contested: false,
                retracted: false,
                declining: false,
                gravity_well: false,
                review_state: None,
                superseded: false,
                signature_threshold: None,
                jointly_accepted: false,
            },
            links: vec![],
            attachments: vec![],
            annotations: vec![],
            created: String::new(),
            updated: None,

            access_tier: crate::access_tier::AccessTier::Public,
        }
    }

    /// Wraps `findings` in a throwaway project.
    fn make_frontier(findings: Vec<FindingBundle>) -> Project {
        project::assemble("test", findings, 1, 0, "test frontier")
    }

    #[test]
    fn retraction_propagates() {
        let a = make_finding("a", 0.8);
        let mut b = make_finding("b", 0.7);
        b.add_link("a", "depends", "b depends on a");

        let mut c = make_frontier(vec![a, b]);
        let result = propagate_correction(&mut c, "a", PropagationAction::Retracted);

        // The source is retracted; its direct dependent becomes contested.
        assert!(c.findings[0].flags.retracted);
        assert!(c.findings[1].flags.contested);
        assert_eq!(result.affected, 1);
    }

    #[test]
    fn confidence_reduction_propagates_below_half() {
        let a = make_finding("a", 0.8);
        let mut b = make_finding("b", 0.7);
        b.add_link("a", "supports", "b supports a");

        let mut c = make_frontier(vec![a, b]);
        let result = propagate_correction(
            &mut c,
            "a",
            PropagationAction::ConfidenceReduced { new_score: 0.3 },
        );

        assert!((c.findings[0].confidence.score - 0.3).abs() < 0.001);
        assert_eq!(result.affected, 1);
    }

    #[test]
    fn confidence_above_half_does_not_propagate() {
        let a = make_finding("a", 0.8);
        let mut b = make_finding("b", 0.7);
        b.add_link("a", "supports", "b supports a");

        let mut c = make_frontier(vec![a, b]);
        let result = propagate_correction(
            &mut c,
            "a",
            PropagationAction::ConfidenceReduced { new_score: 0.6 },
        );

        // The source score is still updated, but nothing cascades.
        assert!((c.findings[0].confidence.score - 0.6).abs() < 0.001);
        assert_eq!(result.affected, 0);
    }

    #[test]
    fn failed_replication_flags_dependents() {
        let a = make_finding("vf_aaaa", 0.8);
        let mut b = make_finding("vf_bbbb", 0.7);
        b.add_link("vf_aaaa", "supports", "b supports a");
        let mut frontier = make_frontier(vec![a, b]);
        let result = propagate_correction(
            &mut frontier,
            "vf_aaaa",
            PropagationAction::ReplicationOutcome {
                outcome: "failed".into(),
                vrep_id: "vrep_test01".into(),
            },
        );
        assert_eq!(result.affected, 1);
        assert!(
            result
                .events
                .iter()
                .any(|e| matches!(&e.action,
                    ReviewAction::Flagged { flag_type } if flag_type == "upstream_replication_failed"))
        );
    }

    #[test]
    fn successful_replication_recomputes_target_and_flags_dependents() {
        let a = make_finding("vf_aaaa", 0.5);
        let mut b = make_finding("vf_bbbb", 0.5);
        b.add_link("vf_aaaa", "depends", "b depends on a");
        let mut frontier = make_frontier(vec![a, b]);

        // Record the replication the outcome refers to before propagating.
        frontier.replications.push(Replication {
            id: "vrep_test02".into(),
            target_finding: "vf_aaaa".into(),
            attempted_by: "lab:test".into(),
            outcome: "replicated".into(),
            evidence: frontier.findings[0].evidence.clone(),
            conditions: frontier.findings[0].conditions.clone(),
            provenance: frontier.findings[0].provenance.clone(),
            notes: String::new(),
            created: String::new(),
            previous_attempt: None,
        });

        let result = propagate_correction(
            &mut frontier,
            "vf_aaaa",
            PropagationAction::ReplicationOutcome {
                outcome: "replicated".into(),
                vrep_id: "vrep_test02".into(),
            },
        );

        // The source's raw confidence is replaced by a computed one.
        assert_eq!(
            frontier.findings[0].confidence.method,
            ConfidenceMethod::Computed
        );
        assert_eq!(result.affected, 1);
        assert!(
            result
                .events
                .iter()
                .any(|e| matches!(&e.action,
                    ReviewAction::Flagged { flag_type } if flag_type == "upstream_replication_succeeded"))
        );
    }

    #[test]
    fn inconclusive_replication_does_not_cascade() {
        let a = make_finding("vf_aaaa", 0.7);
        let mut b = make_finding("vf_bbbb", 0.7);
        b.add_link("vf_aaaa", "supports", "");
        let mut frontier = make_frontier(vec![a, b]);
        let result = propagate_correction(
            &mut frontier,
            "vf_aaaa",
            PropagationAction::ReplicationOutcome {
                outcome: "inconclusive".into(),
                vrep_id: "vrep_test03".into(),
            },
        );
        assert_eq!(result.affected, 0);
    }

    #[test]
    fn depth_limit_respected() {
        // Linear chain a <- b <- c <- d <- e: `e` sits four hops from `a`,
        // one past MAX_DEPTH.
        let a = make_finding("a", 0.8);
        let mut b = make_finding("b", 0.7);
        b.add_link("a", "depends", "");
        let mut c_f = make_finding("c", 0.7);
        c_f.add_link("b", "depends", "");
        let mut d = make_finding("d", 0.7);
        d.add_link("c", "depends", "");
        let mut e = make_finding("e", 0.7);
        e.add_link("d", "depends", "");

        let mut frontier = make_frontier(vec![a, b, c_f, d, e]);
        let result = propagate_correction(&mut frontier, "a", PropagationAction::Retracted);

        // Exactly the first three hops are flagged, one finding per level.
        // (`<= 3` would also pass if nothing propagated at all.)
        let expected: Vec<Vec<String>> = vec![
            vec!["b".to_string()],
            vec!["c".to_string()],
            vec!["d".to_string()],
        ];
        assert_eq!(result.affected, 3);
        assert_eq!(result.cascade, expected);

        // One event for the source plus one per flagged dependent, and none
        // for the finding beyond the depth limit.
        assert_eq!(result.events.len(), 4);
        assert!(result.events.iter().all(|ev| ev.finding_id != "e"));
    }
}