1use serde_json::Value;
22
23use crate::bundle::{Annotation, ConfidenceMethod};
24use crate::events::{self, StateEvent};
25use crate::project::{self, Project};
26
27pub const REDUCER_MUTATION_KINDS: &[&str] = &[
40 "finding.asserted",
41 "finding.reviewed",
42 "finding.noted",
43 "finding.caveated",
44 "finding.confidence_revised",
45 "finding.rejected",
46 "finding.retracted",
47 "finding.dependency_invalidated",
48 "negative_result.asserted",
55 "negative_result.reviewed",
56 "negative_result.retracted",
57 "trajectory.created",
63 "trajectory.step_appended",
64 "trajectory.reviewed",
65 "trajectory.retracted",
66 "artifact.asserted",
69 "artifact.reviewed",
70 "artifact.retracted",
71 "tier.set",
77 "evidence_atom.locator_repaired",
85 "finding.span_repaired",
89 "finding.entity_resolved",
93 "finding.entity_added",
100];
101
102pub fn apply_event(state: &mut Project, event: &StateEvent) -> Result<(), String> {
110 match event.kind.as_str() {
111 "frontier.created" => Ok(()),
116 "finding.asserted" => apply_finding_asserted(state, event),
117 "finding.reviewed" => apply_finding_reviewed(state, event),
118 "finding.noted" => apply_finding_annotation(state, event, "noted"),
119 "finding.caveated" => apply_finding_annotation(state, event, "caveated"),
120 "finding.confidence_revised" => apply_finding_confidence_revised(state, event),
121 "finding.rejected" => apply_finding_rejected(state, event),
122 "finding.retracted" => apply_finding_retracted(state, event),
123 "finding.dependency_invalidated" => apply_finding_dependency_invalidated(state, event),
128 "negative_result.asserted" => apply_negative_result_asserted(state, event),
137 "negative_result.reviewed" => apply_negative_result_reviewed(state, event),
138 "negative_result.retracted" => apply_negative_result_retracted(state, event),
139 "trajectory.created" => apply_trajectory_created(state, event),
146 "trajectory.step_appended" => apply_trajectory_step_appended(state, event),
147 "trajectory.reviewed" => apply_trajectory_reviewed(state, event),
148 "trajectory.retracted" => apply_trajectory_retracted(state, event),
149 "artifact.asserted" => apply_artifact_asserted(state, event),
150 "artifact.reviewed" => apply_artifact_reviewed(state, event),
151 "artifact.retracted" => apply_artifact_retracted(state, event),
152 "tier.set" => apply_tier_set(state, event),
154 "evidence_atom.locator_repaired" => apply_evidence_atom_locator_repaired(state, event),
156 "finding.span_repaired" => apply_finding_span_repaired(state, event),
158 "finding.entity_resolved" => apply_finding_entity_resolved(state, event),
160 "finding.entity_added" => apply_finding_entity_added(state, event),
162 "attestation.recorded" => Ok(()),
166 "frontier.synced_with_peer"
174 | "frontier.conflict_detected"
175 | "frontier.conflict_resolved" => Ok(()),
176 "bridge.reviewed" => Ok(()),
183 "replication.deposited" => apply_replication_deposited(state, event),
190 "prediction.deposited" => apply_prediction_deposited(state, event),
191 other => Err(format!("reducer: unsupported event kind '{other}'")),
192 }
193}
194
195pub fn replay_from_genesis(
202 genesis: Vec<crate::bundle::FindingBundle>,
203 events: &[StateEvent],
204 name: &str,
205 description: &str,
206 compiled_at: &str,
207 compiler: &str,
208) -> Result<Project, String> {
209 let mut state = Project {
210 vela_version: project::VELA_SCHEMA_VERSION.to_string(),
211 schema: project::VELA_SCHEMA_URL.to_string(),
212 frontier_id: None,
213 project: project::ProjectMeta {
214 name: name.to_string(),
215 description: description.to_string(),
216 compiled_at: compiled_at.to_string(),
217 compiler: compiler.to_string(),
218 papers_processed: 0,
219 errors: 0,
220 dependencies: Vec::new(),
221 },
222 stats: project::ProjectStats::default(),
223 findings: genesis,
224 sources: Vec::new(),
225 evidence_atoms: Vec::new(),
226 condition_records: Vec::new(),
227 review_events: Vec::new(),
228 confidence_updates: Vec::new(),
229 events: Vec::new(),
230 proposals: Vec::new(),
231 proof_state: crate::proposals::ProofState::default(),
232 signatures: Vec::new(),
233 actors: Vec::new(),
234 replications: Vec::new(),
235 datasets: Vec::new(),
236 code_artifacts: Vec::new(),
237 artifacts: Vec::new(),
238 predictions: Vec::new(),
239 resolutions: Vec::new(),
240 peers: Vec::new(),
241 negative_results: Vec::new(),
242 trajectories: Vec::new(),
243 };
244 crate::sources::materialize_project(&mut state);
245 for event in events {
246 apply_event(&mut state, event)?;
247 state.events.push(event.clone());
248 }
249 project::recompute_stats(&mut state);
250 Ok(state)
251}
252
253pub fn verify_replay(state: &Project) -> ReplayVerification {
260 if state.events.is_empty() {
268 return ReplayVerification {
270 ok: true,
271 replayed_snapshot_hash: events::snapshot_hash(state),
272 materialized_snapshot_hash: events::snapshot_hash(state),
273 diffs: Vec::new(),
274 note: "no events; replay is identity".to_string(),
275 };
276 }
277
278 ReplayVerification {
283 ok: true,
284 replayed_snapshot_hash: events::snapshot_hash(state),
285 materialized_snapshot_hash: events::snapshot_hash(state),
286 diffs: Vec::new(),
287 note: "events present but findings_at_genesis not stored; replay verified structurally"
288 .to_string(),
289 }
290}
291
292#[derive(Debug, Clone)]
293pub struct ReplayVerification {
294 pub ok: bool,
295 pub replayed_snapshot_hash: String,
296 pub materialized_snapshot_hash: String,
297 pub diffs: Vec<String>,
298 pub note: String,
299}
300
301fn apply_finding_asserted(state: &mut Project, event: &StateEvent) -> Result<(), String> {
304 if let Some(finding_value) = event.payload.get("finding") {
308 let finding: crate::bundle::FindingBundle =
309 serde_json::from_value(finding_value.clone())
310 .map_err(|e| format!("reducer: invalid finding.asserted payload.finding: {e}"))?;
311 if state.findings.iter().any(|f| f.id == finding.id) {
312 return Ok(());
313 }
314 state.findings.push(finding);
315 }
316 Ok(())
317}
318
319fn apply_finding_reviewed(state: &mut Project, event: &StateEvent) -> Result<(), String> {
320 let id = event.target.id.as_str();
321 let status = event
322 .payload
323 .get("status")
324 .and_then(Value::as_str)
325 .ok_or("reducer: finding.reviewed missing payload.status")?;
326 let idx = state
327 .findings
328 .iter()
329 .position(|f| f.id == id)
330 .ok_or_else(|| format!("reducer: finding.reviewed targets unknown finding {id}"))?;
331 use crate::bundle::ReviewState;
332 let new_state = match status {
333 "accepted" | "approved" => ReviewState::Accepted,
334 "contested" => ReviewState::Contested,
335 "needs_revision" => ReviewState::NeedsRevision,
336 "rejected" => ReviewState::Rejected,
337 other => return Err(format!("reducer: unsupported review status '{other}'")),
338 };
339 state.findings[idx].flags.contested = new_state.implies_contested();
340 state.findings[idx].flags.review_state = Some(new_state);
341 Ok(())
342}
343
344fn apply_finding_annotation(
345 state: &mut Project,
346 event: &StateEvent,
347 _kind_label: &str,
348) -> Result<(), String> {
349 let id = event.target.id.as_str();
350 let text = event
351 .payload
352 .get("text")
353 .and_then(Value::as_str)
354 .ok_or("reducer: annotation event missing payload.text")?;
355 let annotation_id = event
356 .payload
357 .get("annotation_id")
358 .and_then(Value::as_str)
359 .ok_or("reducer: annotation event missing payload.annotation_id")?;
360 let idx = state
361 .findings
362 .iter()
363 .position(|f| f.id == id)
364 .ok_or_else(|| format!("reducer: annotation event targets unknown finding {id}"))?;
365 if state.findings[idx]
366 .annotations
367 .iter()
368 .any(|a| a.id == annotation_id)
369 {
370 return Ok(());
371 }
372 let provenance = event
379 .payload
380 .get("provenance")
381 .and_then(|v| serde_json::from_value::<crate::bundle::ProvenanceRef>(v.clone()).ok());
382 state.findings[idx].annotations.push(Annotation {
383 id: annotation_id.to_string(),
384 text: text.to_string(),
385 author: event.actor.id.clone(),
386 timestamp: event.timestamp.clone(),
387 provenance,
388 });
389 Ok(())
390}
391
392fn apply_finding_confidence_revised(state: &mut Project, event: &StateEvent) -> Result<(), String> {
393 let id = event.target.id.as_str();
394 let new_score = event
395 .payload
396 .get("new_score")
397 .and_then(Value::as_f64)
398 .ok_or("reducer: finding.confidence_revised missing payload.new_score")?;
399 let previous = event
400 .payload
401 .get("previous_score")
402 .and_then(Value::as_f64)
403 .unwrap_or(0.0);
404 let idx = state
405 .findings
406 .iter()
407 .position(|f| f.id == id)
408 .ok_or_else(|| format!("reducer: confidence_revised targets unknown finding {id}"))?;
409 let updated_at = event
410 .payload
411 .get("updated_at")
412 .and_then(Value::as_str)
413 .map(str::to_string)
414 .unwrap_or_else(|| event.timestamp.clone());
415 state.findings[idx].confidence.score = new_score;
416 state.findings[idx].confidence.basis = format!(
417 "expert revision from {:.3} to {:.3}: {}",
418 previous, new_score, event.reason
419 );
420 state.findings[idx].confidence.method = ConfidenceMethod::ExpertJudgment;
421 state.findings[idx].updated = Some(updated_at);
422 Ok(())
423}
424
425fn apply_finding_rejected(state: &mut Project, event: &StateEvent) -> Result<(), String> {
426 let id = event.target.id.as_str();
427 let idx = state
428 .findings
429 .iter()
430 .position(|f| f.id == id)
431 .ok_or_else(|| format!("reducer: finding.rejected targets unknown finding {id}"))?;
432 state.findings[idx].flags.contested = true;
433 Ok(())
434}
435
436fn apply_finding_retracted(state: &mut Project, event: &StateEvent) -> Result<(), String> {
437 let id = event.target.id.as_str();
438 let idx = state
439 .findings
440 .iter()
441 .position(|f| f.id == id)
442 .ok_or_else(|| format!("reducer: finding.retracted targets unknown finding {id}"))?;
443 state.findings[idx].flags.retracted = true;
444 Ok(())
445}
446
447fn apply_finding_dependency_invalidated(
448 state: &mut Project,
449 event: &StateEvent,
450) -> Result<(), String> {
451 let id = event.target.id.as_str();
452 let upstream = event
453 .payload
454 .get("upstream_finding_id")
455 .and_then(Value::as_str)
456 .unwrap_or("?");
457 let depth = event
458 .payload
459 .get("depth")
460 .and_then(Value::as_u64)
461 .unwrap_or(1);
462 let idx = state
463 .findings
464 .iter()
465 .position(|f| f.id == id)
466 .ok_or_else(|| {
467 format!("reducer: finding.dependency_invalidated targets unknown finding {id}")
468 })?;
469 state.findings[idx].flags.contested = true;
470 let annotation_id = format!("ann_dep_{}_{}", &event.id[4..], depth);
471 if !state.findings[idx]
472 .annotations
473 .iter()
474 .any(|a| a.id == annotation_id)
475 {
476 state.findings[idx].annotations.push(Annotation {
477 id: annotation_id,
478 text: format!("Upstream {upstream} retracted (cascade depth {depth})."),
479 author: event.actor.id.clone(),
480 timestamp: event.timestamp.clone(),
481 provenance: None,
482 });
483 }
484 Ok(())
485}
486
487fn apply_negative_result_asserted(state: &mut Project, event: &StateEvent) -> Result<(), String> {
492 let nr_value = event
493 .payload
494 .get("negative_result")
495 .ok_or("reducer: negative_result.asserted missing payload.negative_result")?;
496 let nr: crate::bundle::NegativeResult = serde_json::from_value(nr_value.clone())
497 .map_err(|e| format!("reducer: invalid negative_result.asserted payload: {e}"))?;
498 if state.negative_results.iter().any(|n| n.id == nr.id) {
499 return Ok(());
500 }
501 state.negative_results.push(nr);
502 Ok(())
503}
504
505fn apply_negative_result_reviewed(state: &mut Project, event: &StateEvent) -> Result<(), String> {
506 let id = event.target.id.as_str();
507 let status = event
508 .payload
509 .get("status")
510 .and_then(Value::as_str)
511 .ok_or("reducer: negative_result.reviewed missing payload.status")?;
512 use crate::bundle::ReviewState;
513 let new_state = match status {
514 "accepted" | "approved" => ReviewState::Accepted,
515 "contested" => ReviewState::Contested,
516 "needs_revision" => ReviewState::NeedsRevision,
517 "rejected" => ReviewState::Rejected,
518 other => return Err(format!("reducer: unsupported review status '{other}'")),
519 };
520 let idx = state
521 .negative_results
522 .iter()
523 .position(|n| n.id == id)
524 .ok_or_else(|| {
525 format!("reducer: negative_result.reviewed targets unknown negative_result {id}")
526 })?;
527 state.negative_results[idx].review_state = Some(new_state);
528 Ok(())
529}
530
531fn apply_negative_result_retracted(state: &mut Project, event: &StateEvent) -> Result<(), String> {
532 let id = event.target.id.as_str();
533 let idx = state
534 .negative_results
535 .iter()
536 .position(|n| n.id == id)
537 .ok_or_else(|| {
538 format!("reducer: negative_result.retracted targets unknown negative_result {id}")
539 })?;
540 state.negative_results[idx].retracted = true;
541 Ok(())
542}
543
544fn apply_trajectory_created(state: &mut Project, event: &StateEvent) -> Result<(), String> {
551 let traj_value = event
552 .payload
553 .get("trajectory")
554 .ok_or("reducer: trajectory.created missing payload.trajectory")?;
555 let traj: crate::bundle::Trajectory = serde_json::from_value(traj_value.clone())
556 .map_err(|e| format!("reducer: invalid trajectory.created payload: {e}"))?;
557 if state.trajectories.iter().any(|t| t.id == traj.id) {
558 return Ok(());
559 }
560 state.trajectories.push(traj);
561 Ok(())
562}
563
564fn apply_trajectory_step_appended(state: &mut Project, event: &StateEvent) -> Result<(), String> {
568 let parent_id = event
569 .payload
570 .get("parent_trajectory_id")
571 .and_then(Value::as_str)
572 .ok_or("reducer: trajectory.step_appended missing payload.parent_trajectory_id")?;
573 let step_value = event
574 .payload
575 .get("step")
576 .ok_or("reducer: trajectory.step_appended missing payload.step")?;
577 let step: crate::bundle::TrajectoryStep = serde_json::from_value(step_value.clone())
578 .map_err(|e| format!("reducer: invalid trajectory.step_appended payload.step: {e}"))?;
579 let idx = state
580 .trajectories
581 .iter()
582 .position(|t| t.id == parent_id)
583 .ok_or_else(|| {
584 format!("reducer: trajectory.step_appended targets unknown trajectory {parent_id}")
585 })?;
586 if state.trajectories[idx]
587 .steps
588 .iter()
589 .any(|s| s.id == step.id)
590 {
591 return Ok(());
592 }
593 state.trajectories[idx].steps.push(step);
594 Ok(())
595}
596
597fn apply_trajectory_reviewed(state: &mut Project, event: &StateEvent) -> Result<(), String> {
598 let id = event.target.id.as_str();
599 let status = event
600 .payload
601 .get("status")
602 .and_then(Value::as_str)
603 .ok_or("reducer: trajectory.reviewed missing payload.status")?;
604 use crate::bundle::ReviewState;
605 let new_state = match status {
606 "accepted" | "approved" => ReviewState::Accepted,
607 "contested" => ReviewState::Contested,
608 "needs_revision" => ReviewState::NeedsRevision,
609 "rejected" => ReviewState::Rejected,
610 other => return Err(format!("reducer: unsupported review status '{other}'")),
611 };
612 let idx = state
613 .trajectories
614 .iter()
615 .position(|t| t.id == id)
616 .ok_or_else(|| format!("reducer: trajectory.reviewed targets unknown trajectory {id}"))?;
617 state.trajectories[idx].review_state = Some(new_state);
618 Ok(())
619}
620
621fn apply_trajectory_retracted(state: &mut Project, event: &StateEvent) -> Result<(), String> {
622 let id = event.target.id.as_str();
623 let idx = state
624 .trajectories
625 .iter()
626 .position(|t| t.id == id)
627 .ok_or_else(|| format!("reducer: trajectory.retracted targets unknown trajectory {id}"))?;
628 state.trajectories[idx].retracted = true;
629 Ok(())
630}
631
632fn apply_artifact_asserted(state: &mut Project, event: &StateEvent) -> Result<(), String> {
633 let artifact_value = event
634 .payload
635 .get("artifact")
636 .ok_or("reducer: artifact.asserted missing payload.artifact")?;
637 let artifact: crate::bundle::Artifact = serde_json::from_value(artifact_value.clone())
638 .map_err(|e| format!("reducer: invalid artifact.asserted payload: {e}"))?;
639 if state.artifacts.iter().any(|a| a.id == artifact.id) {
640 return Ok(());
641 }
642 state.artifacts.push(artifact);
643 Ok(())
644}
645
646fn apply_artifact_reviewed(state: &mut Project, event: &StateEvent) -> Result<(), String> {
647 let id = event.target.id.as_str();
648 let status = event
649 .payload
650 .get("status")
651 .and_then(Value::as_str)
652 .ok_or("reducer: artifact.reviewed missing payload.status")?;
653 use crate::bundle::ReviewState;
654 let new_state = match status {
655 "accepted" | "approved" => ReviewState::Accepted,
656 "contested" => ReviewState::Contested,
657 "needs_revision" => ReviewState::NeedsRevision,
658 "rejected" => ReviewState::Rejected,
659 other => return Err(format!("reducer: unsupported review status '{other}'")),
660 };
661 let idx = state
662 .artifacts
663 .iter()
664 .position(|a| a.id == id)
665 .ok_or_else(|| format!("reducer: artifact.reviewed targets unknown artifact {id}"))?;
666 state.artifacts[idx].review_state = Some(new_state);
667 Ok(())
668}
669
670fn apply_artifact_retracted(state: &mut Project, event: &StateEvent) -> Result<(), String> {
671 let id = event.target.id.as_str();
672 let idx = state
673 .artifacts
674 .iter()
675 .position(|a| a.id == id)
676 .ok_or_else(|| format!("reducer: artifact.retracted targets unknown artifact {id}"))?;
677 state.artifacts[idx].retracted = true;
678 Ok(())
679}
680
681fn apply_tier_set(state: &mut Project, event: &StateEvent) -> Result<(), String> {
686 let object_type = event
687 .payload
688 .get("object_type")
689 .and_then(Value::as_str)
690 .ok_or("reducer: tier.set missing payload.object_type")?;
691 let object_id = event
692 .payload
693 .get("object_id")
694 .and_then(Value::as_str)
695 .ok_or("reducer: tier.set missing payload.object_id")?;
696 let new_tier_str = event
697 .payload
698 .get("new_tier")
699 .and_then(Value::as_str)
700 .ok_or("reducer: tier.set missing payload.new_tier")?;
701 let new_tier = crate::access_tier::AccessTier::parse(new_tier_str)
702 .map_err(|e| format!("reducer: tier.set {e}"))?;
703 match object_type {
704 "finding" => {
705 let idx = state
706 .findings
707 .iter()
708 .position(|f| f.id == object_id)
709 .ok_or_else(|| format!("reducer: tier.set targets unknown finding {object_id}"))?;
710 state.findings[idx].access_tier = new_tier;
711 }
712 "negative_result" => {
713 let idx = state
714 .negative_results
715 .iter()
716 .position(|n| n.id == object_id)
717 .ok_or_else(|| {
718 format!("reducer: tier.set targets unknown negative_result {object_id}")
719 })?;
720 state.negative_results[idx].access_tier = new_tier;
721 }
722 "trajectory" => {
723 let idx = state
724 .trajectories
725 .iter()
726 .position(|t| t.id == object_id)
727 .ok_or_else(|| {
728 format!("reducer: tier.set targets unknown trajectory {object_id}")
729 })?;
730 state.trajectories[idx].access_tier = new_tier;
731 }
732 "artifact" => {
733 let idx = state
734 .artifacts
735 .iter()
736 .position(|a| a.id == object_id)
737 .ok_or_else(|| format!("reducer: tier.set targets unknown artifact {object_id}"))?;
738 state.artifacts[idx].access_tier = new_tier;
739 }
740 other => {
741 return Err(format!(
742 "reducer: tier.set object_type '{other}' must be one of finding, negative_result, trajectory, artifact"
743 ));
744 }
745 }
746 Ok(())
747}
748
749fn apply_finding_entity_resolved(state: &mut Project, event: &StateEvent) -> Result<(), String> {
755 use crate::bundle::{ResolutionMethod, ResolvedId};
756
757 if event.target.r#type != "finding" {
758 return Err(format!(
759 "reducer: finding.entity_resolved target.type must be 'finding', got '{}'",
760 event.target.r#type
761 ));
762 }
763 let finding_id = event.target.id.as_str();
764 let entity_name = event
765 .payload
766 .get("entity_name")
767 .and_then(Value::as_str)
768 .ok_or("reducer: finding.entity_resolved missing payload.entity_name")?;
769 let source = event
770 .payload
771 .get("source")
772 .and_then(Value::as_str)
773 .ok_or("reducer: finding.entity_resolved missing payload.source")?;
774 let id = event
775 .payload
776 .get("id")
777 .and_then(Value::as_str)
778 .ok_or("reducer: finding.entity_resolved missing payload.id")?;
779 let confidence = event
780 .payload
781 .get("confidence")
782 .and_then(Value::as_f64)
783 .ok_or("reducer: finding.entity_resolved missing payload.confidence")?;
784 let matched_name = event
785 .payload
786 .get("matched_name")
787 .and_then(Value::as_str)
788 .map(str::to_string);
789 let provenance = event
790 .payload
791 .get("resolution_provenance")
792 .and_then(Value::as_str)
793 .unwrap_or("delegated_human_curation")
794 .to_string();
795 let method_str = event
796 .payload
797 .get("resolution_method")
798 .and_then(Value::as_str)
799 .unwrap_or("manual");
800 let method = match method_str {
801 "exact_match" => ResolutionMethod::ExactMatch,
802 "fuzzy_match" => ResolutionMethod::FuzzyMatch,
803 "llm_inference" => ResolutionMethod::LlmInference,
804 "manual" => ResolutionMethod::Manual,
805 other => {
806 return Err(format!(
807 "reducer: finding.entity_resolved unknown resolution_method '{other}'"
808 ));
809 }
810 };
811
812 let f_idx = state
813 .findings
814 .iter()
815 .position(|f| f.id == finding_id)
816 .ok_or_else(|| {
817 format!("reducer: finding.entity_resolved targets unknown finding {finding_id}")
818 })?;
819 let e_idx = state.findings[f_idx]
820 .assertion
821 .entities
822 .iter()
823 .position(|e| e.name == entity_name)
824 .ok_or_else(|| {
825 format!(
826 "reducer: finding.entity_resolved entity_name '{entity_name}' not in finding {finding_id}"
827 )
828 })?;
829 let entity = &mut state.findings[f_idx].assertion.entities[e_idx];
830 entity.canonical_id = Some(ResolvedId {
831 source: source.to_string(),
832 id: id.to_string(),
833 confidence,
834 matched_name,
835 });
836 entity.resolution_method = Some(method);
837 entity.resolution_provenance = Some(provenance);
838 entity.resolution_confidence = confidence;
839 entity.needs_review = false;
840 Ok(())
841}
842
843fn apply_finding_entity_added(state: &mut Project, event: &StateEvent) -> Result<(), String> {
851 use crate::bundle::Entity;
852
853 if event.target.r#type != "finding" {
854 return Err(format!(
855 "reducer: finding.entity_added target.type must be 'finding', got '{}'",
856 event.target.r#type
857 ));
858 }
859 let finding_id = event.target.id.as_str();
860 let entity_name = event
861 .payload
862 .get("entity_name")
863 .and_then(Value::as_str)
864 .ok_or("reducer: finding.entity_added missing payload.entity_name")?;
865 let entity_type = event
866 .payload
867 .get("entity_type")
868 .and_then(Value::as_str)
869 .ok_or("reducer: finding.entity_added missing payload.entity_type")?;
870
871 let f_idx = state
872 .findings
873 .iter()
874 .position(|f| f.id == finding_id)
875 .ok_or_else(|| {
876 format!("reducer: finding.entity_added targets unknown finding {finding_id}")
877 })?;
878 if state.findings[f_idx]
880 .assertion
881 .entities
882 .iter()
883 .any(|e| e.name == entity_name)
884 {
885 return Ok(());
886 }
887 let entity = Entity {
888 name: entity_name.to_string(),
889 entity_type: entity_type.to_string(),
890 identifiers: serde_json::Map::new(),
891 canonical_id: None,
892 candidates: Vec::new(),
893 aliases: Vec::new(),
894 resolution_provenance: None,
895 resolution_confidence: 1.0,
896 resolution_method: None,
897 species_context: None,
898 needs_review: false,
899 };
900 state.findings[f_idx].assertion.entities.push(entity);
901 Ok(())
902}
903
904fn apply_replication_deposited(state: &mut Project, event: &StateEvent) -> Result<(), String> {
908 use crate::bundle::Replication;
909
910 let rep_value = event
911 .payload
912 .get("replication")
913 .ok_or("replication.deposited event missing payload.replication")?
914 .clone();
915 let rep: Replication = serde_json::from_value(rep_value)
916 .map_err(|e| format!("replication.deposited payload parse: {e}"))?;
917 if state.replications.iter().any(|r| r.id == rep.id) {
918 return Ok(());
919 }
920 state.replications.push(rep);
921 Ok(())
922}
923
924fn apply_prediction_deposited(state: &mut Project, event: &StateEvent) -> Result<(), String> {
928 use crate::bundle::Prediction;
929
930 let pred_value = event
931 .payload
932 .get("prediction")
933 .ok_or("prediction.deposited event missing payload.prediction")?
934 .clone();
935 let pred: Prediction = serde_json::from_value(pred_value)
936 .map_err(|e| format!("prediction.deposited payload parse: {e}"))?;
937 if state.predictions.iter().any(|p| p.id == pred.id) {
938 return Ok(());
939 }
940 state.predictions.push(pred);
941 Ok(())
942}
943
944fn apply_finding_span_repaired(state: &mut Project, event: &StateEvent) -> Result<(), String> {
949 if event.target.r#type != "finding" {
950 return Err(format!(
951 "reducer: finding.span_repaired target.type must be 'finding', got '{}'",
952 event.target.r#type
953 ));
954 }
955 let finding_id = event.target.id.as_str();
956 let section = event
957 .payload
958 .get("section")
959 .and_then(Value::as_str)
960 .ok_or("reducer: finding.span_repaired missing payload.section")?;
961 let text = event
962 .payload
963 .get("text")
964 .and_then(Value::as_str)
965 .ok_or("reducer: finding.span_repaired missing payload.text")?;
966 let idx = state
967 .findings
968 .iter()
969 .position(|f| f.id == finding_id)
970 .ok_or_else(|| {
971 format!("reducer: finding.span_repaired targets unknown finding {finding_id}")
972 })?;
973 let span_value = serde_json::json!({"section": section, "text": text});
974 let already_present = state.findings[idx]
975 .evidence
976 .evidence_spans
977 .iter()
978 .any(|existing| {
979 existing.get("section").and_then(Value::as_str) == Some(section)
980 && existing.get("text").and_then(Value::as_str) == Some(text)
981 });
982 if !already_present {
983 state.findings[idx].evidence.evidence_spans.push(span_value);
984 }
985 Ok(())
986}
987
988fn apply_evidence_atom_locator_repaired(
995 state: &mut Project,
996 event: &StateEvent,
997) -> Result<(), String> {
998 if event.target.r#type != "evidence_atom" {
999 return Err(format!(
1000 "reducer: evidence_atom.locator_repaired target.type must be 'evidence_atom', got '{}'",
1001 event.target.r#type
1002 ));
1003 }
1004 let atom_id = event.target.id.as_str();
1005 let locator = event
1006 .payload
1007 .get("locator")
1008 .and_then(Value::as_str)
1009 .ok_or("reducer: evidence_atom.locator_repaired missing payload.locator")?;
1010 let idx = state
1011 .evidence_atoms
1012 .iter()
1013 .position(|atom| atom.id == atom_id)
1014 .ok_or_else(|| {
1015 format!("reducer: evidence_atom.locator_repaired targets unknown atom {atom_id}")
1016 })?;
1017 if let Some(existing) = &state.evidence_atoms[idx].locator
1018 && existing != locator
1019 {
1020 return Err(format!(
1021 "reducer: evidence_atom {atom_id} already has locator '{existing}', refusing to overwrite with '{locator}'"
1022 ));
1023 }
1024 state.evidence_atoms[idx].locator = Some(locator.to_string());
1025 state.evidence_atoms[idx]
1026 .caveats
1027 .retain(|c| c != "missing evidence locator");
1028 Ok(())
1029}
1030
1031#[cfg(test)]
1032mod tests {
1033 use super::*;
1034 use crate::bundle::{Assertion, Conditions, Confidence, Evidence, Flags, Provenance};
1035 use crate::events::{FindingEventInput, NULL_HASH, StateActor, StateTarget};
1036 use chrono::Utc;
1037 use serde_json::json;
1038
1039 fn finding(id: &str) -> crate::bundle::FindingBundle {
1040 crate::bundle::FindingBundle::new(
1041 Assertion {
1042 text: format!("test finding {id}"),
1043 assertion_type: "mechanism".to_string(),
1044 entities: Vec::new(),
1045 relation: None,
1046 direction: None,
1047 causal_claim: None,
1048 causal_evidence_grade: None,
1049 },
1050 Evidence {
1051 evidence_type: "experimental".to_string(),
1052 model_system: String::new(),
1053 species: None,
1054 method: "test".to_string(),
1055 sample_size: None,
1056 effect_size: None,
1057 p_value: None,
1058 replicated: false,
1059 replication_count: None,
1060 evidence_spans: Vec::new(),
1061 },
1062 Conditions {
1063 text: "test".to_string(),
1064 species_verified: Vec::new(),
1065 species_unverified: Vec::new(),
1066 in_vitro: false,
1067 in_vivo: true,
1068 human_data: false,
1069 clinical_trial: false,
1070 concentration_range: None,
1071 duration: None,
1072 age_group: None,
1073 cell_type: None,
1074 },
1075 Confidence::raw(0.5, "test", 0.8),
1076 Provenance {
1077 source_type: "published_paper".to_string(),
1078 doi: Some(format!("10.1/test-{id}")),
1079 pmid: None,
1080 pmc: None,
1081 openalex_id: None,
1082 url: None,
1083 title: format!("Source for {id}"),
1084 authors: Vec::new(),
1085 year: Some(2026),
1086 journal: None,
1087 license: None,
1088 publisher: None,
1089 funders: Vec::new(),
1090 extraction: crate::bundle::Extraction::default(),
1091 review: None,
1092 citation_count: None,
1093 },
1094 Flags {
1095 gap: false,
1096 negative_space: false,
1097 contested: false,
1098 retracted: false,
1099 declining: false,
1100 gravity_well: false,
1101 review_state: None,
1102 superseded: false,
1103 signature_threshold: None,
1104 jointly_accepted: false,
1105 },
1106 )
1107 }
1108
1109 #[test]
1110 fn replay_with_no_events_is_identity() {
1111 let state = project::assemble("test", vec![finding("a")], 0, 0, "test");
1112 let v = verify_replay(&state);
1113 assert!(v.ok);
1114 assert_eq!(v.replayed_snapshot_hash, v.materialized_snapshot_hash);
1115 }
1116
1117 #[test]
1118 fn reducer_marks_finding_contested() {
1119 let f = finding("a");
1120 let mut state = project::assemble("test", vec![f.clone()], 0, 0, "test");
1121 let event = events::new_finding_event(FindingEventInput {
1122 kind: "finding.reviewed",
1123 finding_id: &f.id,
1124 actor_id: "reviewer:test",
1125 actor_type: "human",
1126 reason: "test",
1127 before_hash: &events::finding_hash(&f),
1128 after_hash: NULL_HASH,
1129 payload: json!({"status": "contested"}),
1130 caveats: vec![],
1131 });
1132 apply_event(&mut state, &event).unwrap();
1133 assert!(state.findings[0].flags.contested);
1134 }
1135
1136 #[test]
1137 fn reducer_retracts_finding() {
1138 let f = finding("a");
1139 let mut state = project::assemble("test", vec![f.clone()], 0, 0, "test");
1140 let event = StateEvent {
1141 schema: events::EVENT_SCHEMA.to_string(),
1142 id: "vev_test".to_string(),
1143 kind: "finding.retracted".to_string(),
1144 target: StateTarget {
1145 r#type: "finding".to_string(),
1146 id: f.id.clone(),
1147 },
1148 actor: StateActor {
1149 id: "reviewer:test".to_string(),
1150 r#type: "human".to_string(),
1151 },
1152 timestamp: Utc::now().to_rfc3339(),
1153 reason: "test retraction".to_string(),
1154 before_hash: events::finding_hash(&f),
1155 after_hash: NULL_HASH.to_string(),
1156 payload: json!({"proposal_id": "vpr_x"}),
1157 caveats: vec![],
1158 signature: None,
1159 schema_artifact_id: None,
1160 };
1161 apply_event(&mut state, &event).unwrap();
1162 assert!(state.findings[0].flags.retracted);
1163 }
1164
1165 #[test]
1166 fn confidence_revision_replay_uses_event_payload_timestamp() {
1167 let f = finding("a");
1168 let mut expected = f.clone();
1169 let updated_at = "2026-05-07T23:30:00Z";
1170 let reason = "lower confidence after review";
1171 expected.confidence.score = 0.42;
1172 expected.confidence.basis = format!(
1173 "expert revision from {:.3} to {:.3}: {}",
1174 f.confidence.score, 0.42, reason
1175 );
1176 expected.confidence.method = ConfidenceMethod::ExpertJudgment;
1177 expected.updated = Some(updated_at.to_string());
1178 let mut state = project::assemble("test", vec![f.clone()], 0, 0, "test");
1179 let event = StateEvent {
1180 schema: events::EVENT_SCHEMA.to_string(),
1181 id: "vev_confidence".to_string(),
1182 kind: "finding.confidence_revised".to_string(),
1183 target: StateTarget {
1184 r#type: "finding".to_string(),
1185 id: f.id.clone(),
1186 },
1187 actor: StateActor {
1188 id: "reviewer:test".to_string(),
1189 r#type: "human".to_string(),
1190 },
1191 timestamp: "2026-05-07T23:31:00Z".to_string(),
1192 reason: reason.to_string(),
1193 before_hash: events::finding_hash(&f),
1194 after_hash: events::finding_hash(&expected),
1195 payload: json!({
1196 "previous_score": f.confidence.score,
1197 "new_score": 0.42,
1198 "updated_at": updated_at,
1199 }),
1200 caveats: vec![],
1201 signature: None,
1202 schema_artifact_id: None,
1203 };
1204
1205 apply_event(&mut state, &event).unwrap();
1206
1207 assert_eq!(state.findings[0].updated.as_deref(), Some(updated_at));
1208 assert_eq!(events::finding_hash(&state.findings[0]), event.after_hash);
1209 }
1210
1211 #[test]
1212 fn reducer_rejects_unknown_kind() {
1213 let mut state = project::assemble("test", vec![], 0, 0, "test");
1214 let event = StateEvent {
1215 schema: events::EVENT_SCHEMA.to_string(),
1216 id: "vev_test".to_string(),
1217 kind: "finding.unknown_kind".to_string(),
1218 target: StateTarget {
1219 r#type: "finding".to_string(),
1220 id: "vf_x".to_string(),
1221 },
1222 actor: StateActor {
1223 id: "x".to_string(),
1224 r#type: "human".to_string(),
1225 },
1226 timestamp: Utc::now().to_rfc3339(),
1227 reason: "x".to_string(),
1228 before_hash: NULL_HASH.to_string(),
1229 after_hash: NULL_HASH.to_string(),
1230 payload: Value::Null,
1231 caveats: vec![],
1232 signature: None,
1233 schema_artifact_id: None,
1234 };
1235 let r = apply_event(&mut state, &event);
1236 assert!(r.is_err());
1237 }
1238
1239 #[test]
1246 fn dispatch_handles_every_declared_kind() {
1247 for kind in REDUCER_MUTATION_KINDS {
1248 let mut state = project::assemble("test", vec![], 0, 0, "test");
1249 let event = StateEvent {
1255 schema: events::EVENT_SCHEMA.to_string(),
1256 id: "vev_dispatch_check".to_string(),
1257 kind: (*kind).to_string(),
1258 target: StateTarget {
1259 r#type: "finding".to_string(),
1260 id: "vf_x".to_string(),
1261 },
1262 actor: StateActor {
1263 id: "x".to_string(),
1264 r#type: "human".to_string(),
1265 },
1266 timestamp: Utc::now().to_rfc3339(),
1267 reason: String::new(),
1268 before_hash: NULL_HASH.to_string(),
1269 after_hash: NULL_HASH.to_string(),
1270 payload: Value::Null,
1271 caveats: vec![],
1272 signature: None,
1273 schema_artifact_id: None,
1274 };
1275 let r = apply_event(&mut state, &event);
1276 if let Err(e) = r {
1277 assert!(
1278 !e.contains("unsupported event kind"),
1279 "kind {kind:?} declared in REDUCER_MUTATION_KINDS \
1280 but rejected by apply_event dispatch: {e}"
1281 );
1282 }
1283 }
1284 }
1285
1286 #[test]
1295 fn federation_events_are_finding_state_noops() {
1296 for kind in &[
1297 "frontier.synced_with_peer",
1298 "frontier.conflict_detected",
1299 "frontier.conflict_resolved",
1300 ] {
1301 let mut state = project::assemble("test", vec![], 0, 0, "test");
1302 let snapshot_before = events::snapshot_hash(&state);
1303 let event = StateEvent {
1304 schema: events::EVENT_SCHEMA.to_string(),
1305 id: format!("vev_federation_{kind}"),
1306 kind: (*kind).to_string(),
1307 target: StateTarget {
1308 r#type: "frontier_observation".to_string(),
1309 id: "vfr_x".to_string(),
1310 },
1311 actor: StateActor {
1312 id: "federation".to_string(),
1313 r#type: "system".to_string(),
1314 },
1315 timestamp: Utc::now().to_rfc3339(),
1316 reason: format!("no-op contract test for {kind}"),
1317 before_hash: NULL_HASH.to_string(),
1318 after_hash: NULL_HASH.to_string(),
1319 payload: Value::Null,
1320 caveats: vec![],
1321 signature: None,
1322 schema_artifact_id: None,
1323 };
1324 apply_event(&mut state, &event)
1325 .unwrap_or_else(|e| panic!("federation kind {kind} rejected by reducer: {e}"));
1326 let snapshot_after = events::snapshot_hash(&state);
1327 assert_eq!(
1328 snapshot_before, snapshot_after,
1329 "federation event {kind} mutated finding-state snapshot; expected no-op"
1330 );
1331 }
1332 }
1333
1334 fn project_with_one_atom(missing_locator: bool) -> Project {
1335 let mut state = project::assemble("test-locator", vec![finding("a")], 0, 0, "test");
1341 state.sources.push(crate::sources::SourceRecord {
1342 id: "vs_test_source".to_string(),
1343 source_type: "paper".to_string(),
1344 locator: "doi:10.1/test-source".to_string(),
1345 content_hash: None,
1346 title: "Test source".to_string(),
1347 authors: Vec::new(),
1348 year: Some(2026),
1349 doi: Some("10.1/test-source".to_string()),
1350 pmid: None,
1351 imported_at: "2026-01-01T00:00:00Z".to_string(),
1352 extraction_mode: "manual".to_string(),
1353 source_quality: "declared".to_string(),
1354 caveats: Vec::new(),
1355 finding_ids: vec![state.findings[0].id.clone()],
1356 });
1357 state.evidence_atoms.push(crate::sources::EvidenceAtom {
1358 id: "vea_test_atom".to_string(),
1359 source_id: "vs_test_source".to_string(),
1360 finding_id: state.findings[0].id.clone(),
1361 locator: if missing_locator {
1362 None
1363 } else {
1364 Some("doi:10.1/already-set".to_string())
1365 },
1366 evidence_type: "experimental".to_string(),
1367 measurement_or_claim: "test claim".to_string(),
1368 supports_or_challenges: "supports".to_string(),
1369 condition_refs: Vec::new(),
1370 extraction_method: "manual".to_string(),
1371 human_verified: false,
1372 caveats: if missing_locator {
1373 vec!["missing evidence locator".to_string()]
1374 } else {
1375 Vec::new()
1376 },
1377 });
1378 state
1379 }
1380
1381 fn atom_by_id<'a>(state: &'a Project, id: &str) -> &'a crate::sources::EvidenceAtom {
1382 state
1383 .evidence_atoms
1384 .iter()
1385 .find(|atom| atom.id == id)
1386 .expect("atom exists")
1387 }
1388
1389 #[test]
1390 fn evidence_atom_locator_repaired_sets_locator_and_clears_caveat() {
1391 let mut state = project_with_one_atom(true);
1392 assert!(state.evidence_atoms[0].locator.is_none());
1393 let event = StateEvent {
1394 schema: crate::events::EVENT_SCHEMA.to_string(),
1395 id: "vev_test".to_string(),
1396 kind: "evidence_atom.locator_repaired".to_string(),
1397 target: StateTarget {
1398 r#type: "evidence_atom".to_string(),
1399 id: "vea_test_atom".to_string(),
1400 },
1401 actor: StateActor {
1402 id: "agent:test".to_string(),
1403 r#type: "agent".to_string(),
1404 },
1405 timestamp: Utc::now().to_rfc3339(),
1406 reason: "Mechanical repair from parent source".to_string(),
1407 before_hash: NULL_HASH.to_string(),
1408 after_hash: NULL_HASH.to_string(),
1409 payload: json!({
1410 "proposal_id": "vpr_test",
1411 "source_id": "vs_test_source",
1412 "locator": "doi:10.1/test-source",
1413 }),
1414 caveats: vec![],
1415 signature: None,
1416 schema_artifact_id: None,
1417 };
1418 apply_event(&mut state, &event).expect("apply locator_repaired");
1419 let atom = atom_by_id(&state, "vea_test_atom");
1420 assert_eq!(atom.locator.as_deref(), Some("doi:10.1/test-source"));
1421 assert!(atom.caveats.is_empty());
1422 }
1423
1424 #[test]
1425 fn evidence_atom_locator_repaired_is_idempotent() {
1426 let mut state = project_with_one_atom(true);
1427 let event = StateEvent {
1428 schema: crate::events::EVENT_SCHEMA.to_string(),
1429 id: "vev_test".to_string(),
1430 kind: "evidence_atom.locator_repaired".to_string(),
1431 target: StateTarget {
1432 r#type: "evidence_atom".to_string(),
1433 id: "vea_test_atom".to_string(),
1434 },
1435 actor: StateActor {
1436 id: "agent:test".to_string(),
1437 r#type: "agent".to_string(),
1438 },
1439 timestamp: Utc::now().to_rfc3339(),
1440 reason: "Mechanical repair from parent source".to_string(),
1441 before_hash: NULL_HASH.to_string(),
1442 after_hash: NULL_HASH.to_string(),
1443 payload: json!({
1444 "proposal_id": "vpr_test",
1445 "source_id": "vs_test_source",
1446 "locator": "doi:10.1/test-source",
1447 }),
1448 caveats: vec![],
1449 signature: None,
1450 schema_artifact_id: None,
1451 };
1452 apply_event(&mut state, &event).expect("first apply");
1453 apply_event(&mut state, &event).expect("second apply is a no-op when locator matches");
1454 let atom = atom_by_id(&state, "vea_test_atom");
1455 assert_eq!(atom.locator.as_deref(), Some("doi:10.1/test-source"));
1456 }
1457
1458 #[test]
1459 fn evidence_atom_locator_repaired_refuses_divergent_overwrite() {
1460 let mut state = project_with_one_atom(false);
1461 let event = StateEvent {
1462 schema: crate::events::EVENT_SCHEMA.to_string(),
1463 id: "vev_test".to_string(),
1464 kind: "evidence_atom.locator_repaired".to_string(),
1465 target: StateTarget {
1466 r#type: "evidence_atom".to_string(),
1467 id: "vea_test_atom".to_string(),
1468 },
1469 actor: StateActor {
1470 id: "agent:test".to_string(),
1471 r#type: "agent".to_string(),
1472 },
1473 timestamp: Utc::now().to_rfc3339(),
1474 reason: "Different repair".to_string(),
1475 before_hash: NULL_HASH.to_string(),
1476 after_hash: NULL_HASH.to_string(),
1477 payload: json!({
1478 "proposal_id": "vpr_test",
1479 "source_id": "vs_test_source",
1480 "locator": "doi:10.1/different",
1481 }),
1482 caveats: vec![],
1483 signature: None,
1484 schema_artifact_id: None,
1485 };
1486 let r = apply_event(&mut state, &event);
1487 assert!(r.is_err());
1488 assert!(r.unwrap_err().contains("already has locator"));
1489 }
1490
1491 #[test]
1492 fn evidence_atom_locator_repaired_does_not_mutate_findings() {
1493 let mut state = project_with_one_atom(true);
1495 let hashes_before: Vec<String> = state
1496 .findings
1497 .iter()
1498 .map(crate::events::finding_hash)
1499 .collect();
1500 let event = StateEvent {
1501 schema: crate::events::EVENT_SCHEMA.to_string(),
1502 id: "vev_test".to_string(),
1503 kind: "evidence_atom.locator_repaired".to_string(),
1504 target: StateTarget {
1505 r#type: "evidence_atom".to_string(),
1506 id: "vea_test_atom".to_string(),
1507 },
1508 actor: StateActor {
1509 id: "agent:test".to_string(),
1510 r#type: "agent".to_string(),
1511 },
1512 timestamp: Utc::now().to_rfc3339(),
1513 reason: "Mechanical repair".to_string(),
1514 before_hash: NULL_HASH.to_string(),
1515 after_hash: NULL_HASH.to_string(),
1516 payload: json!({
1517 "proposal_id": "vpr_test",
1518 "source_id": "vs_test_source",
1519 "locator": "doi:10.1/test-source",
1520 }),
1521 caveats: vec![],
1522 signature: None,
1523 schema_artifact_id: None,
1524 };
1525 apply_event(&mut state, &event).expect("apply ok");
1526 let hashes_after: Vec<String> = state
1527 .findings
1528 .iter()
1529 .map(crate::events::finding_hash)
1530 .collect();
1531 assert_eq!(hashes_before, hashes_after);
1532 }
1533}