1use std::collections::BTreeMap;
9use std::fs;
10use std::path::{Path, PathBuf};
11
12use chrono::Utc;
13use serde::{Deserialize, Serialize};
14use serde_json::{Value, json};
15use sha2::{Digest, Sha256};
16
17use crate::artifact_to_state::{
18 ARTIFACT_PACKET_SCHEMA, ArtifactPacket, ImportIdempotency, PacketArtifact, PacketProducer,
19};
20use crate::bundle::{Artifact, valid_artifact_kind};
21use crate::canonical;
22use crate::decision::{SOURCE_INGEST_PLAN_SCHEMA, SourceIngestEntry, SourceIngestPlan};
23use crate::events::StateTarget;
24use crate::proposals;
25use crate::{artifact_to_state, project, repo};
26
/// Adapter id that selects the ClinicalTrials.gov API v2 source adapter.
pub const CLINICALTRIALS_GOV_V2: &str = "clinicaltrials-gov-v2";
/// Adapter id that selects the regulatory document source adapter.
pub const REGULATORY_DOCUMENTS_V1: &str = "regulatory-documents-v1";
29
/// Internal discriminator for the source adapters this module supports.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AdapterKind {
    /// The ClinicalTrials.gov API v2 adapter.
    ClinicalTrialsGovV2,
    /// The regulatory documents adapter.
    RegulatoryDocumentsV1,
}
35
36#[derive(Debug, Clone, Copy)]
37struct AdapterSpec {
38 id: &'static str,
39 kind: AdapterKind,
40 category: &'static str,
41 manifest_title: &'static str,
42 producer_name: &'static str,
43 source_label: &'static str,
44 caveat: &'static str,
45}
46
47fn adapter_spec(adapter: &str) -> Result<AdapterSpec, String> {
48 match adapter {
49 CLINICALTRIALS_GOV_V2 => Ok(AdapterSpec {
50 id: CLINICALTRIALS_GOV_V2,
51 kind: AdapterKind::ClinicalTrialsGovV2,
52 category: "clinical_trial_registry",
53 manifest_title: "ClinicalTrials.gov adapter run",
54 producer_name: "ClinicalTrials.gov API v2 source adapter",
55 source_label: "ClinicalTrials.gov record",
56 caveat: "Source adapter output is registry metadata, not a clinical conclusion.",
57 }),
58 REGULATORY_DOCUMENTS_V1 => Ok(AdapterSpec {
59 id: REGULATORY_DOCUMENTS_V1,
60 kind: AdapterKind::RegulatoryDocumentsV1,
61 category: "regulatory",
62 manifest_title: "Regulatory document adapter run",
63 producer_name: "Regulatory document source adapter",
64 source_label: "regulatory source record",
65 caveat: "Source adapter output is regulatory source metadata, not a clinical conclusion.",
66 }),
67 _ => Err(format!("unsupported source adapter '{adapter}'")),
68 }
69}
70
71#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
72pub struct SourceAdapterRunOptions {
73 pub adapter: String,
74 pub actor: String,
75 #[serde(default)]
76 pub entries: Vec<String>,
77 #[serde(default, skip_serializing_if = "Option::is_none")]
78 pub priority: Option<String>,
79 #[serde(default)]
80 pub include_excluded: bool,
81 #[serde(default)]
82 pub allow_partial: bool,
83 #[serde(default)]
84 pub dry_run: bool,
85 #[serde(default, skip_serializing_if = "Option::is_none")]
86 pub input_dir: Option<PathBuf>,
87 #[serde(default)]
88 pub apply_artifacts: bool,
89}
90
91#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
92pub struct SourceAdapterRunReport {
93 pub ok: bool,
94 pub command: String,
95 pub adapter: String,
96 pub run_id: String,
97 pub frontier: String,
98 pub dry_run: bool,
99 pub selected_entries: usize,
100 pub skipped_excluded: usize,
101 pub fetched_records: usize,
102 pub unchanged_records: usize,
103 pub changed_records: usize,
104 pub artifact_proposals: usize,
105 #[serde(default)]
106 pub review_note_proposals: usize,
107 pub proposal_ids: Vec<String>,
108 #[serde(default)]
109 pub review_proposal_ids: Vec<String>,
110 pub applied_event_ids: Vec<String>,
111 pub idempotency: ImportIdempotency,
112 pub trusted_state_effect: String,
113 pub failed_records: Vec<SourceAdapterFailure>,
114 pub records: Vec<ClinicalTrialsRecordReport>,
115 #[serde(default, skip_serializing_if = "Option::is_none")]
116 pub packet_id: Option<String>,
117 #[serde(default, skip_serializing_if = "Option::is_none")]
118 pub packet_path: Option<PathBuf>,
119 #[serde(default, skip_serializing_if = "Option::is_none")]
120 pub run_path: Option<PathBuf>,
121}
122
123#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
124pub struct SourceAdapterFailure {
125 pub entry_id: String,
126 #[serde(default)]
127 pub source_id: String,
128 pub nct_id: String,
129 pub locator: String,
130 pub error: String,
131}
132
133#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
134pub struct ClinicalTrialsRecordReport {
135 pub entry_id: String,
136 #[serde(default)]
137 pub source_id: String,
138 pub nct_id: String,
139 #[serde(default)]
140 pub source_label: String,
141 #[serde(default)]
142 pub artifact_kind: String,
143 pub status: String,
144 pub source_url: String,
145 pub api_url: String,
146 pub content_hash: String,
147 #[serde(default, skip_serializing_if = "Option::is_none")]
148 pub old_artifact_id: Option<String>,
149 #[serde(default, skip_serializing_if = "Option::is_none")]
150 pub old_content_hash: Option<String>,
151 #[serde(default)]
152 pub target_findings: Vec<String>,
153 #[serde(default)]
154 pub changed_fields: Vec<ClinicalTrialsFieldChange>,
155}
156
157#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
158pub struct ClinicalTrialsFieldChange {
159 pub field: String,
160 pub label: String,
161 pub path: String,
162 #[serde(default, skip_serializing_if = "Option::is_none")]
163 pub old_value: Option<String>,
164 #[serde(default, skip_serializing_if = "Option::is_none")]
165 pub new_value: Option<String>,
166}
167
168#[derive(Debug, Clone)]
169struct FetchedRecord {
170 entry: SourceIngestEntry,
171 source_id: String,
172 nct_id: String,
173 source_label: String,
174 artifact_kind: String,
175 source_url: String,
176 api_url: String,
177 value: Value,
178 content_hash: String,
179 old_content_hash: Option<String>,
180 changed_fields: Vec<ClinicalTrialsFieldChange>,
181 changed: bool,
182}
183
184#[derive(Debug, Clone)]
185struct SourceFetch {
186 value: Value,
187 content_hash: String,
188 source_url: String,
189 api_url: String,
190}
191
192pub async fn run(
193 frontier_path: &Path,
194 options: SourceAdapterRunOptions,
195) -> Result<SourceAdapterRunReport, String> {
196 let spec = adapter_spec(&options.adapter)?;
197 if options.actor.trim().is_empty() {
198 return Err("actor must be non-empty".to_string());
199 }
200 if let Some(priority) = options.priority.as_deref()
201 && !matches!(priority, "P0" | "P1" | "P2")
202 {
203 return Err("priority must be P0, P1, or P2".to_string());
204 }
205
206 let frontier = repo::load_from_path(frontier_path)?;
207 let ingest_dir = source_ingest_dir(frontier_path)?;
208 let frontier_root = frontier_root_for_ingest_dir(&ingest_dir, frontier_path);
209 let plan = load_ingest_plan(&ingest_dir)?;
210 let selected = selected_entries(&plan, &options, &spec);
211 let skipped_excluded = plan
212 .entries
213 .iter()
214 .filter(|entry| {
215 entry.category == spec.category
216 && entry.ingest_status == "excluded"
217 && !options.include_excluded
218 })
219 .count();
220
221 let mut fetched = Vec::new();
222 let mut failures = Vec::new();
223 for entry in selected.iter().cloned() {
224 let source_id = match source_id_for_entry(&entry, &spec) {
225 Ok(source_id) => source_id,
226 Err(error) => {
227 failures.push(SourceAdapterFailure {
228 entry_id: entry.id,
229 source_id: String::new(),
230 nct_id: String::new(),
231 locator: entry.locator,
232 error,
233 });
234 continue;
235 }
236 };
237 match fetch_source_record(&entry, &spec, &source_id, options.input_dir.as_deref()).await {
238 Ok(fetched_record) => {
239 let nct_id = if spec.kind == AdapterKind::ClinicalTrialsGovV2 {
240 source_id.clone()
241 } else {
242 entry.id.clone()
243 };
244 let artifact_kind = artifact_kind_for_entry(&entry);
245 let current_artifact = entry
246 .current_frontier_artifact_id
247 .as_deref()
248 .and_then(|id| frontier.artifacts.iter().find(|artifact| artifact.id == id));
249 let old_content_hash =
250 current_artifact.map(|artifact| artifact.content_hash.clone());
251 let previous_record = previous_source_record(
252 &frontier,
253 &ingest_dir,
254 &entry,
255 &source_id,
256 old_content_hash.as_deref(),
257 );
258 let mut changed_fields = tracked_source_changes(
259 &spec,
260 previous_record.as_ref(),
261 &fetched_record.value,
262 old_content_hash.as_deref(),
263 &fetched_record.content_hash,
264 );
265 let changed = source_record_changed(
266 &spec,
267 current_artifact,
268 &entry,
269 &frontier_root,
270 previous_record.as_ref(),
271 old_content_hash.as_deref(),
272 &fetched_record.content_hash,
273 );
274 if spec.kind == AdapterKind::RegulatoryDocumentsV1
275 && previous_record.is_none()
276 && let Some(old_locator) = current_artifact.and_then(|artifact| {
277 regulatory_locator_manifest_url(&frontier_root, artifact, &entry)
278 })
279 {
280 changed_fields = if old_locator == entry.locator {
281 Vec::new()
282 } else {
283 vec![ClinicalTrialsFieldChange {
284 field: "locator".to_string(),
285 label: "source locator".to_string(),
286 path: "/locator".to_string(),
287 old_value: Some(old_locator),
288 new_value: Some(entry.locator.clone()),
289 }]
290 };
291 }
292 if !changed {
293 changed_fields.clear();
294 }
295 fetched.push(FetchedRecord {
296 entry,
297 source_id: source_id.clone(),
298 source_label: spec.source_label.to_string(),
299 artifact_kind,
300 source_url: fetched_record.source_url,
301 api_url: fetched_record.api_url,
302 nct_id,
303 value: fetched_record.value,
304 content_hash: fetched_record.content_hash,
305 old_content_hash,
306 changed_fields,
307 changed,
308 });
309 }
310 Err(error) => failures.push(SourceAdapterFailure {
311 entry_id: entry.id.clone(),
312 source_id: source_id.clone(),
313 nct_id: if spec.kind == AdapterKind::ClinicalTrialsGovV2 {
314 source_id
315 } else {
316 entry.id
317 },
318 locator: entry.locator,
319 error,
320 }),
321 }
322 }
323
324 if !failures.is_empty() && !options.allow_partial {
325 let failed = failures
326 .iter()
327 .map(|failure| format!("{} ({})", failure.nct_id, failure.error))
328 .collect::<Vec<_>>()
329 .join(", ");
330 return Err(format!("source adapter run failed for {failed}"));
331 }
332
333 let changed_records = fetched.iter().filter(|record| record.changed).count();
334 let unchanged_records = fetched.len() - changed_records;
335 let run_id = run_id(&options.adapter, &fetched, &failures);
336 let records = fetched
337 .iter()
338 .map(|record| ClinicalTrialsRecordReport {
339 entry_id: record.entry.id.clone(),
340 source_id: record.source_id.clone(),
341 nct_id: record.nct_id.clone(),
342 source_label: record.source_label.clone(),
343 artifact_kind: record.artifact_kind.clone(),
344 status: if record.changed {
345 if record.old_content_hash.is_some() {
346 "changed".to_string()
347 } else {
348 "new".to_string()
349 }
350 } else {
351 "unchanged".to_string()
352 },
353 source_url: record.source_url.clone(),
354 api_url: record.api_url.clone(),
355 content_hash: record.content_hash.clone(),
356 old_artifact_id: record.entry.current_frontier_artifact_id.clone(),
357 old_content_hash: record.old_content_hash.clone(),
358 target_findings: record.entry.target_findings.clone(),
359 changed_fields: record.changed_fields.clone(),
360 })
361 .collect::<Vec<_>>();
362
363 if options.dry_run {
364 return Ok(SourceAdapterRunReport {
365 ok: true,
366 command: "source-adapter.run".to_string(),
367 adapter: options.adapter,
368 run_id: run_id.clone(),
369 frontier: frontier.project.name,
370 dry_run: true,
371 selected_entries: selected.len(),
372 skipped_excluded,
373 fetched_records: fetched.len(),
374 unchanged_records,
375 changed_records,
376 artifact_proposals: 0,
377 review_note_proposals: 0,
378 proposal_ids: Vec::new(),
379 review_proposal_ids: Vec::new(),
380 applied_event_ids: Vec::new(),
381 idempotency: ImportIdempotency {
382 packet_hash: format!("sha256:{}", hex::encode(Sha256::digest(run_id.as_bytes()))),
383 duplicate_packet: false,
384 skipped_existing_proposals: Vec::new(),
385 skipped_existing_artifacts: Vec::new(),
386 },
387 trusted_state_effect: "none".to_string(),
388 failed_records: failures,
389 records,
390 packet_id: None,
391 packet_path: None,
392 run_path: None,
393 });
394 }
395
396 let run_dir = ingest_dir.join("runs").join(&run_id);
397 fs::create_dir_all(run_dir.join("records"))
398 .map_err(|e| format!("create source adapter run dir '{}': {e}", run_dir.display()))?;
399 for record in &fetched {
400 fs::write(
401 run_dir
402 .join("records")
403 .join(format!("{}.json", record.source_id)),
404 serde_json::to_vec_pretty(&record.value)
405 .map_err(|e| format!("serialize record: {e}"))?,
406 )
407 .map_err(|e| format!("write fetched record {}: {e}", record.source_id))?;
408 }
409
410 let manifest = run_manifest(
411 &options,
412 &run_id,
413 selected.len(),
414 skipped_excluded,
415 &records,
416 &failures,
417 );
418 let manifest_bytes = canonical::to_canonical_bytes(&manifest)?;
419 let manifest_hash = sha256_for_bytes(&manifest_bytes);
420 let packet_id = packet_id(&options.adapter, &run_id, &manifest_hash);
421 let packet = artifact_packet(
422 &options,
423 &packet_id,
424 &run_id,
425 &manifest_hash,
426 &records,
427 &fetched,
428 &spec,
429 )?;
430 let packet_path = run_dir.join("artifact-packet.json");
431 fs::write(
432 &packet_path,
433 serde_json::to_vec_pretty(&packet).map_err(|e| format!("serialize packet: {e}"))?,
434 )
435 .map_err(|e| format!("write artifact packet '{}': {e}", packet_path.display()))?;
436
437 let import_report = artifact_to_state::import_packet_at_path(
438 frontier_path,
439 &packet_path,
440 &options.actor,
441 options.apply_artifacts,
442 )?;
443 let review_proposal_ids = create_review_note_proposals(
444 frontier_path,
445 &options,
446 &spec,
447 &run_id,
448 &packet_id,
449 &fetched,
450 )?;
451 let mut proposal_ids = import_report.proposal_ids;
452 proposal_ids.extend(review_proposal_ids.clone());
453 if options.apply_artifacts {
454 update_ingest_plan_after_apply(frontier_path, &ingest_dir, &fetched)?;
455 }
456
457 let final_run = json!({
458 "schema": "vela.source-adapter-run.v1",
459 "run_id": run_id,
460 "adapter": options.adapter,
461 "frontier": frontier.project.name,
462 "started_at": packet.created_at,
463 "selected_entries": selected.len(),
464 "skipped_excluded": skipped_excluded,
465 "fetched_records": fetched.len(),
466 "unchanged_records": unchanged_records,
467 "changed_records": changed_records,
468 "failed_records": failures,
469 "records": records,
470 "packet_id": packet_id,
471 "packet_path": "artifact-packet.json",
472 "proposal_ids": proposal_ids,
473 "review_proposal_ids": review_proposal_ids,
474 "applied_event_ids": import_report.applied_event_ids,
475 "idempotency": import_report.idempotency,
476 "trusted_state_effect": import_report.trusted_state_effect,
477 });
478 fs::write(
479 run_dir.join("run.json"),
480 serde_json::to_vec_pretty(&final_run).map_err(|e| format!("serialize run: {e}"))?,
481 )
482 .map_err(|e| format!("write run manifest '{}': {e}", run_dir.display()))?;
483
484 Ok(SourceAdapterRunReport {
485 ok: true,
486 command: "source-adapter.run".to_string(),
487 adapter: options.adapter,
488 run_id,
489 frontier: frontier.project.name,
490 dry_run: false,
491 selected_entries: selected.len(),
492 skipped_excluded,
493 fetched_records: fetched.len(),
494 unchanged_records,
495 changed_records,
496 artifact_proposals: import_report.artifact_proposals,
497 review_note_proposals: review_proposal_ids.len(),
498 proposal_ids,
499 review_proposal_ids,
500 applied_event_ids: import_report.applied_event_ids,
501 idempotency: import_report.idempotency,
502 trusted_state_effect: import_report.trusted_state_effect,
503 failed_records: failures,
504 records,
505 packet_id: Some(packet_id),
506 packet_path: Some(packet_path),
507 run_path: Some(run_dir),
508 })
509}
510
511fn source_ingest_dir(frontier_path: &Path) -> Result<PathBuf, String> {
512 match repo::detect(frontier_path)? {
513 repo::VelaSource::VelaRepo(root) => Ok(root.join("ingest")),
514 repo::VelaSource::ProjectFile(path) => path
515 .parent()
516 .map(|parent| parent.join("ingest"))
517 .ok_or_else(|| format!("frontier file '{}' has no parent", path.display())),
518 repo::VelaSource::PacketDir(dir) => Ok(dir.join("ingest")),
519 }
520}
521
/// Returns the directory that relative artifact locators are resolved against.
///
/// This is the parent of `ingest_dir` when it has one. Otherwise it falls back
/// to `frontier_path` itself if that is a directory, then to the parent of
/// `frontier_path`, and finally to `"."`.
fn frontier_root_for_ingest_dir(ingest_dir: &Path, frontier_path: &Path) -> PathBuf {
    if let Some(parent) = ingest_dir.parent() {
        return parent.to_path_buf();
    }
    let fallback = if frontier_path.is_dir() {
        Some(frontier_path)
    } else {
        frontier_path.parent()
    };
    match fallback {
        Some(root) => root.to_path_buf(),
        None => PathBuf::from("."),
    }
}
535
536fn load_ingest_plan(ingest_dir: &Path) -> Result<SourceIngestPlan, String> {
537 let path = ingest_dir.join("source-ingest-plan.v1.json");
538 let data = fs::read_to_string(&path)
539 .map_err(|e| format!("read source ingest plan '{}': {e}", path.display()))?;
540 let plan: SourceIngestPlan =
541 serde_json::from_str(&data).map_err(|e| format!("parse source ingest plan: {e}"))?;
542 if plan.schema != SOURCE_INGEST_PLAN_SCHEMA {
543 return Err(format!(
544 "source ingest plan schema must be {SOURCE_INGEST_PLAN_SCHEMA}"
545 ));
546 }
547 Ok(plan)
548}
549
550fn source_record_changed(
551 spec: &AdapterSpec,
552 current_artifact: Option<&Artifact>,
553 entry: &SourceIngestEntry,
554 frontier_root: &Path,
555 previous_record: Option<&Value>,
556 old_content_hash: Option<&str>,
557 new_content_hash: &str,
558) -> bool {
559 let Some(old_content_hash) = old_content_hash else {
560 return true;
561 };
562 match spec.kind {
563 AdapterKind::ClinicalTrialsGovV2 => old_content_hash != new_content_hash,
564 AdapterKind::RegulatoryDocumentsV1 => {
565 if previous_record.is_some() {
566 return old_content_hash != new_content_hash;
567 }
568 if let Some(old_locator) = current_artifact.and_then(|artifact| {
569 regulatory_locator_manifest_url(frontier_root, artifact, entry)
570 }) {
571 return old_locator != entry.locator;
572 }
573 old_content_hash != new_content_hash
574 }
575 }
576}
577
578fn regulatory_locator_manifest_url(
579 frontier_root: &Path,
580 artifact: &Artifact,
581 entry: &SourceIngestEntry,
582) -> Option<String> {
583 if entry.source_type == "frontier_projection" {
584 return Some(entry.locator.clone());
585 }
586 if artifact.metadata.contains_key("source_adapter")
587 || artifact.metadata.contains_key("source_adapter_run_id")
588 || artifact.metadata.contains_key("run_id")
589 {
590 return None;
591 }
592 if artifact.storage_mode != "local_blob" {
593 return None;
594 }
595 let is_json = artifact
596 .media_type
597 .as_deref()
598 .is_some_and(|media_type| media_type.contains("json"));
599 if !is_json {
600 return None;
601 }
602 let locator = artifact.locator.as_deref()?;
603 let path = resolve_artifact_locator(frontier_root, locator);
604 let Ok(bytes) = fs::read(path) else {
605 return None;
606 };
607 let Ok(value) = serde_json::from_slice::<Value>(&bytes) else {
608 return None;
609 };
610 let manifest_url = value
611 .get("url")
612 .or_else(|| value.get("source_url"))
613 .and_then(Value::as_str)
614 .map(str::to_string)?;
615 let manifest_only = value
616 .get("access_terms")
617 .and_then(Value::as_str)
618 .is_some_and(|terms| terms.contains("stored as a Vela manifest"));
619 if manifest_only || value.get("content_hash").is_none() {
620 Some(manifest_url)
621 } else {
622 None
623 }
624}
625
/// Turns an artifact locator into a filesystem path.
///
/// Absolute locators are returned unchanged; anything else is joined onto
/// `frontier_root`.
fn resolve_artifact_locator(frontier_root: &Path, locator: &str) -> PathBuf {
    let candidate = Path::new(locator);
    if !candidate.is_absolute() {
        return frontier_root.join(candidate);
    }
    candidate.to_path_buf()
}
634
635fn update_ingest_plan_after_apply(
636 frontier_path: &Path,
637 ingest_dir: &Path,
638 fetched: &[FetchedRecord],
639) -> Result<(), String> {
640 let path = ingest_dir.join("source-ingest-plan.v1.json");
641 let mut plan = load_ingest_plan(ingest_dir)?;
642 let frontier = repo::load_from_path(frontier_path)?;
643 let mut changed = false;
644 for record in fetched.iter().filter(|record| record.changed) {
645 let Some(artifact_id) = frontier
646 .artifacts
647 .iter()
648 .find(|artifact| {
649 artifact.kind == record.artifact_kind
650 && artifact.content_hash == record.content_hash
651 && artifact
652 .metadata
653 .get("source_id")
654 .or_else(|| artifact.metadata.get("entry_id"))
655 .or_else(|| artifact.metadata.get("nct_id"))
656 .and_then(Value::as_str)
657 .is_some_and(|source| source == record.source_id)
658 })
659 .map(|artifact| artifact.id.clone())
660 else {
661 continue;
662 };
663 if let Some(entry) = plan
664 .entries
665 .iter_mut()
666 .find(|entry| entry.id == record.entry.id)
667 && entry.current_frontier_artifact_id.as_deref() != Some(artifact_id.as_str())
668 {
669 entry.current_frontier_artifact_id = Some(artifact_id);
670 entry.ingest_status = "ingested".to_string();
671 changed = true;
672 }
673 }
674 if changed {
675 fs::write(
676 &path,
677 serde_json::to_vec_pretty(&plan)
678 .map_err(|e| format!("serialize source ingest plan: {e}"))?,
679 )
680 .map_err(|e| format!("write source ingest plan '{}': {e}", path.display()))?;
681 }
682 Ok(())
683}
684
685fn selected_entries(
686 plan: &SourceIngestPlan,
687 options: &SourceAdapterRunOptions,
688 spec: &AdapterSpec,
689) -> Vec<SourceIngestEntry> {
690 plan.entries
691 .iter()
692 .filter(|entry| entry.category == spec.category)
693 .filter(|entry| options.include_excluded || entry.ingest_status != "excluded")
694 .filter(|entry| {
695 options.entries.is_empty() || options.entries.iter().any(|wanted| wanted == &entry.id)
696 })
697 .filter(|entry| {
698 options
699 .priority
700 .as_deref()
701 .is_none_or(|priority| entry.priority == priority)
702 })
703 .cloned()
704 .collect()
705}
706
707fn previous_source_record(
708 frontier: &project::Project,
709 ingest_dir: &Path,
710 source_entry: &SourceIngestEntry,
711 source_id: &str,
712 expected_hash: Option<&str>,
713) -> Option<Value> {
714 let artifact = source_entry
715 .current_frontier_artifact_id
716 .as_deref()
717 .and_then(|id| frontier.artifacts.iter().find(|artifact| artifact.id == id))?;
718 let expected_hash = expected_hash.unwrap_or(&artifact.content_hash);
719
720 if let Some(run_id) = artifact
721 .metadata
722 .get("run_id")
723 .or_else(|| artifact.metadata.get("source_adapter_run_id"))
724 .and_then(Value::as_str)
725 {
726 let path = ingest_dir
727 .join("runs")
728 .join(run_id)
729 .join("records")
730 .join(format!("{source_id}.json"));
731 if let Some(value) = read_record_if_hash_matches(&path, expected_hash) {
732 return Some(value);
733 }
734 }
735
736 let runs_dir = ingest_dir.join("runs");
737 let entries = fs::read_dir(runs_dir).ok()?;
738 for entry in entries.flatten() {
739 let path = entry
740 .path()
741 .join("records")
742 .join(format!("{source_id}.json"));
743 if let Some(value) = read_record_if_hash_matches(&path, expected_hash) {
744 return Some(value);
745 }
746 }
747 None
748}
749
750fn read_record_if_hash_matches(path: &Path, expected_hash: &str) -> Option<Value> {
751 let text = fs::read_to_string(path).ok()?;
752 let value: Value = serde_json::from_str(&text).ok()?;
753 if value
754 .get("content_hash")
755 .and_then(Value::as_str)
756 .is_some_and(|hash| hash == expected_hash)
757 {
758 return Some(value);
759 }
760 let bytes = canonical::to_canonical_bytes(&value).ok()?;
761 if sha256_for_bytes(&bytes) == expected_hash {
762 Some(value)
763 } else {
764 None
765 }
766}
767
768fn tracked_source_changes(
769 spec: &AdapterSpec,
770 previous_record: Option<&Value>,
771 new_record: &Value,
772 old_content_hash: Option<&str>,
773 new_content_hash: &str,
774) -> Vec<ClinicalTrialsFieldChange> {
775 match spec.kind {
776 AdapterKind::ClinicalTrialsGovV2 => previous_record
777 .map(|previous| tracked_clinicaltrials_changes(previous, new_record))
778 .unwrap_or_default(),
779 AdapterKind::RegulatoryDocumentsV1 => {
780 if let Some(previous) = previous_record {
781 tracked_regulatory_changes(previous, new_record)
782 } else if old_content_hash.is_some_and(|old| old != new_content_hash) {
783 vec![ClinicalTrialsFieldChange {
784 field: "content_hash".to_string(),
785 label: "content hash".to_string(),
786 path: "/content_hash".to_string(),
787 old_value: old_content_hash.map(str::to_string),
788 new_value: Some(new_content_hash.to_string()),
789 }]
790 } else {
791 Vec::new()
792 }
793 }
794 }
795}
796
797fn tracked_regulatory_changes(
798 old_record: &Value,
799 new_record: &Value,
800) -> Vec<ClinicalTrialsFieldChange> {
801 const FIELDS: &[(&str, &str, &str)] = &[
802 ("content_hash", "content hash", "/content_hash"),
803 ("content_length", "content length", "/content_length"),
804 ("content_type", "content type", "/content_type"),
805 ("locator", "source locator", "/locator"),
806 ];
807
808 FIELDS
809 .iter()
810 .filter_map(|(field, label, path)| {
811 let old_value = old_record.pointer(path);
812 let new_value = new_record.pointer(path);
813 if old_value == new_value {
814 return None;
815 }
816 Some(ClinicalTrialsFieldChange {
817 field: (*field).to_string(),
818 label: (*label).to_string(),
819 path: (*path).to_string(),
820 old_value: field_value_summary(old_value),
821 new_value: field_value_summary(new_value),
822 })
823 })
824 .collect()
825}
826
827fn tracked_clinicaltrials_changes(
828 old_record: &Value,
829 new_record: &Value,
830) -> Vec<ClinicalTrialsFieldChange> {
831 const FIELDS: &[(&str, &str, &str)] = &[
832 (
833 "overall_status",
834 "overall status",
835 "/protocolSection/statusModule/overallStatus",
836 ),
837 (
838 "start_date",
839 "start date",
840 "/protocolSection/statusModule/startDateStruct/date",
841 ),
842 (
843 "primary_completion_date",
844 "primary completion date",
845 "/protocolSection/statusModule/primaryCompletionDateStruct/date",
846 ),
847 (
848 "completion_date",
849 "completion date",
850 "/protocolSection/statusModule/completionDateStruct/date",
851 ),
852 (
853 "phases",
854 "trial phase",
855 "/protocolSection/designModule/phases",
856 ),
857 (
858 "enrollment_count",
859 "enrollment count",
860 "/protocolSection/designModule/enrollmentInfo/count",
861 ),
862 (
863 "enrollment_type",
864 "enrollment type",
865 "/protocolSection/designModule/enrollmentInfo/type",
866 ),
867 (
868 "primary_outcomes",
869 "primary outcomes",
870 "/protocolSection/outcomesModule/primaryOutcomes",
871 ),
872 ("has_results", "posted results section", "/resultsSection"),
873 ];
874
875 FIELDS
876 .iter()
877 .filter_map(|(field, label, path)| {
878 let old_value = old_record.pointer(path);
879 let new_value = new_record.pointer(path);
880 if old_value == new_value {
881 return None;
882 }
883 Some(ClinicalTrialsFieldChange {
884 field: (*field).to_string(),
885 label: (*label).to_string(),
886 path: (*path).to_string(),
887 old_value: field_value_summary(old_value),
888 new_value: field_value_summary(new_value),
889 })
890 })
891 .collect()
892}
893
894fn field_value_summary(value: Option<&Value>) -> Option<String> {
895 let value = value?;
896 match value {
897 Value::Null => None,
898 Value::String(value) => Some(value.clone()),
899 Value::Number(value) => Some(value.to_string()),
900 Value::Bool(value) => Some(value.to_string()),
901 Value::Array(_) | Value::Object(_) => serde_json::to_string(value).ok(),
902 }
903}
904
905fn create_review_note_proposals(
906 frontier_path: &Path,
907 options: &SourceAdapterRunOptions,
908 spec: &AdapterSpec,
909 run_id: &str,
910 packet_id: &str,
911 fetched: &[FetchedRecord],
912) -> Result<Vec<String>, String> {
913 let mut ids = Vec::new();
914 for record in fetched
915 .iter()
916 .filter(|record| !record.changed_fields.is_empty())
917 {
918 let targets = record
919 .entry
920 .target_findings
921 .iter()
922 .filter(|id| id.starts_with("vf_"))
923 .cloned()
924 .collect::<std::collections::BTreeSet<_>>();
925 if targets.is_empty() {
926 continue;
927 }
928 let note_text = review_note_text(record);
929 for finding_id in targets {
930 let proposal = proposals::new_proposal(
931 "finding.note",
932 StateTarget {
933 r#type: "finding".to_string(),
934 id: finding_id,
935 },
936 options.actor.clone(),
937 if options.actor.starts_with("agent:") {
938 "agent"
939 } else {
940 "human"
941 },
942 format!(
943 "Review {} delta for {} from run {}",
944 spec.source_label, record.source_id, run_id
945 ),
946 json!({
947 "text": note_text,
948 "source_adapter": options.adapter,
949 "source_adapter_run_id": run_id,
950 "artifact_packet_id": packet_id,
951 "entry_id": record.entry.id,
952 "source_id": record.source_id,
953 "source_label": record.source_label,
954 "nct_id": record.nct_id,
955 "api_url": record.api_url,
956 "source_url": record.source_url,
957 "old_content_hash": record.old_content_hash,
958 "new_content_hash": record.content_hash,
959 "changed_fields": record.changed_fields,
960 }),
961 vec![
962 record.api_url.clone(),
963 record.source_url.clone(),
964 format!("source_adapter_run:{run_id}"),
965 format!("artifact_packet:{packet_id}"),
966 ],
967 review_note_caveats(spec),
968 );
969 let result = proposals::create_or_apply(frontier_path, proposal, false)?;
970 ids.push(result.proposal_id);
971 }
972 }
973 Ok(ids)
974}
975
976fn review_note_text(record: &FetchedRecord) -> String {
977 let fields = record
978 .changed_fields
979 .iter()
980 .map(|change| {
981 format!(
982 "{} changed from {} to {}",
983 change.label,
984 change.old_value.as_deref().unwrap_or("missing"),
985 change.new_value.as_deref().unwrap_or("missing")
986 )
987 })
988 .collect::<Vec<_>>()
989 .join("; ");
990 if record.source_label == "ClinicalTrials.gov record" {
991 format!(
992 "ClinicalTrials.gov record {} changed tracked registry fields: {}. Review whether this affects the linked finding scope, trial table, or decision brief. Registry metadata alone does not change the claim.",
993 record.source_id, fields
994 )
995 } else {
996 format!(
997 "Regulatory source {} changed tracked source fields: {}. Review whether this affects the linked finding scope, trial table, or decision brief; source metadata alone does not change the claim.",
998 record.source_id, fields
999 )
1000 }
1001}
1002
1003fn review_note_caveats(spec: &AdapterSpec) -> Vec<String> {
1004 match spec.kind {
1005 AdapterKind::ClinicalTrialsGovV2 => vec![
1006 "ClinicalTrials.gov metadata changed; this is a review task, not a claim update."
1007 .to_string(),
1008 "Accepting this note records reviewer awareness only.".to_string(),
1009 ],
1010 AdapterKind::RegulatoryDocumentsV1 => vec![
1011 "Regulatory source metadata changed; this is a review task, not a claim update."
1012 .to_string(),
1013 "Accepting this note records reviewer awareness only.".to_string(),
1014 ],
1015 }
1016}
1017
1018async fn fetch_source_record(
1019 entry: &SourceIngestEntry,
1020 spec: &AdapterSpec,
1021 source_id: &str,
1022 input_dir: Option<&Path>,
1023) -> Result<SourceFetch, String> {
1024 match spec.kind {
1025 AdapterKind::ClinicalTrialsGovV2 => fetch_clinicaltrials_record(source_id, input_dir).await,
1026 AdapterKind::RegulatoryDocumentsV1 => {
1027 fetch_regulatory_record(entry, source_id, input_dir).await
1028 }
1029 }
1030}
1031
1032async fn fetch_clinicaltrials_record(
1033 nct_id: &str,
1034 input_dir: Option<&Path>,
1035) -> Result<SourceFetch, String> {
1036 let raw = if let Some(dir) = input_dir {
1037 let path = dir.join(format!("{nct_id}.json"));
1038 fs::read_to_string(&path)
1039 .map_err(|e| format!("read ClinicalTrials.gov fixture '{}': {e}", path.display()))?
1040 } else {
1041 let url = format!("https://clinicaltrials.gov/api/v2/studies/{nct_id}");
1042 let response = reqwest::get(&url)
1043 .await
1044 .map_err(|e| format!("fetch {url}: {e}"))?;
1045 let response = response
1046 .error_for_status()
1047 .map_err(|e| format!("fetch {url}: {e}"))?;
1048 response
1049 .text()
1050 .await
1051 .map_err(|e| format!("read {url}: {e}"))?
1052 };
1053 let value: Value =
1054 serde_json::from_str(&raw).map_err(|e| format!("parse ClinicalTrials.gov record: {e}"))?;
1055 let canonical_bytes = canonical::to_canonical_bytes(&value)?;
1056 Ok(SourceFetch {
1057 value,
1058 content_hash: sha256_for_bytes(&canonical_bytes),
1059 source_url: format!("https://clinicaltrials.gov/study/{nct_id}"),
1060 api_url: format!("https://clinicaltrials.gov/api/v2/studies/{nct_id}"),
1061 })
1062}
1063
1064async fn fetch_regulatory_record(
1065 entry: &SourceIngestEntry,
1066 source_id: &str,
1067 input_dir: Option<&Path>,
1068) -> Result<SourceFetch, String> {
1069 let (bytes, content_type) = if let Some(dir) = input_dir {
1070 let path = fixture_path_for_source(dir, source_id)
1071 .ok_or_else(|| format!("read regulatory fixture for {source_id}: file not found"))?;
1072 let bytes = fs::read(&path)
1073 .map_err(|e| format!("read regulatory fixture '{}': {e}", path.display()))?;
1074 (bytes, content_type_for_path(&path))
1075 } else {
1076 let client = reqwest::Client::builder()
1077 .user_agent("vela-source-adapter/0.55 (+https://vela.science)")
1078 .build()
1079 .map_err(|e| format!("create http client: {e}"))?;
1080 let response = client
1081 .get(&entry.locator)
1082 .send()
1083 .await
1084 .map_err(|e| format!("fetch {}: {e}", entry.locator))?;
1085 let response = response
1086 .error_for_status()
1087 .map_err(|e| format!("fetch {}: {e}", entry.locator))?;
1088 let content_type = response
1089 .headers()
1090 .get(reqwest::header::CONTENT_TYPE)
1091 .and_then(|value| value.to_str().ok())
1092 .unwrap_or("application/octet-stream")
1093 .to_string();
1094 let bytes = response
1095 .bytes()
1096 .await
1097 .map_err(|e| format!("read {}: {e}", entry.locator))?
1098 .to_vec();
1099 (bytes, content_type)
1100 };
1101 let content_hash = sha256_for_bytes(&bytes);
1102 let value = json!({
1103 "schema": "vela.regulatory-source-record.v1",
1104 "entry_id": entry.id,
1105 "source_id": source_id,
1106 "name": entry.name,
1107 "source_type": entry.source_type,
1108 "representation": entry.representation,
1109 "locator": entry.locator,
1110 "content_type": content_type,
1111 "content_length": bytes.len(),
1112 "content_hash": content_hash,
1113 });
1114 Ok(SourceFetch {
1115 value,
1116 content_hash,
1117 source_url: entry.locator.clone(),
1118 api_url: entry.locator.clone(),
1119 })
1120}
1121
/// Finds the fixture file for `source_id` inside `dir`.
///
/// Candidates are `<source_id>.json`, `.txt`, `.html` and `.pdf`, tried in
/// that order; the first one that is a regular file wins.
///
/// Only regular files qualify: a directory that happens to be named like a
/// fixture is skipped so a later extension can still match, instead of being
/// returned and failing when the caller tries to read it.
///
/// Returns `None` when no candidate file exists.
fn fixture_path_for_source(dir: &Path, source_id: &str) -> Option<PathBuf> {
    ["json", "txt", "html", "pdf"]
        .iter()
        .map(|extension| dir.join(format!("{source_id}.{extension}")))
        .find(|candidate| candidate.is_file())
}
1128
/// Maps a fixture path's extension to a MIME type string.
///
/// Recognizes `json`, `html`, `pdf` and `txt` (exact, lowercase match).
/// Anything else, including a missing or non-UTF-8 extension, yields
/// `application/octet-stream`.
fn content_type_for_path(path: &Path) -> String {
    let extension = path.extension().and_then(|ext| ext.to_str());
    let mime = match extension {
        Some("json") => "application/json",
        Some("html") => "text/html",
        Some("pdf") => "application/pdf",
        Some("txt") => "text/plain",
        _ => "application/octet-stream",
    };
    mime.to_string()
}
1138
/// Extracts the NCT identifier from a ClinicalTrials.gov locator.
///
/// The identifier is the last path segment of the locator and must start with
/// `NCT`. Any `?query` or `#fragment` suffix is removed first, and trailing
/// slashes are ignored, so `.../study/NCT01234567/` and
/// `.../study/NCT01234567?tab=results` both resolve to `NCT01234567`. This
/// keeps URL decorations out of the id returned to the caller.
///
/// # Errors
/// Returns a message naming the original locator when the last path segment
/// does not start with `NCT`.
fn nct_id_from_locator(locator: &str) -> Result<String, String> {
    // Cut the query/fragment before splitting on '/', since a query value may
    // itself contain slashes.
    let path = locator
        .split(|ch: char| matches!(ch, '?' | '#'))
        .next()
        .unwrap_or(locator);
    path.trim_end_matches('/')
        .rsplit('/')
        .next()
        .filter(|segment| segment.starts_with("NCT"))
        .map(str::to_string)
        .ok_or_else(|| format!("ClinicalTrials.gov locator does not end in an NCT id: {locator}"))
}
1147
1148fn source_id_for_entry(entry: &SourceIngestEntry, spec: &AdapterSpec) -> Result<String, String> {
1149 match spec.kind {
1150 AdapterKind::ClinicalTrialsGovV2 => nct_id_from_locator(&entry.locator),
1151 AdapterKind::RegulatoryDocumentsV1 => Ok(entry.id.clone()),
1152 }
1153}
1154
1155fn artifact_kind_for_entry(entry: &SourceIngestEntry) -> String {
1156 if valid_artifact_kind(&entry.representation) {
1157 entry.representation.clone()
1158 } else if valid_artifact_kind(&entry.source_type) {
1159 entry.source_type.clone()
1160 } else {
1161 "registry_record".to_string()
1162 }
1163}
1164
1165fn run_manifest(
1166 options: &SourceAdapterRunOptions,
1167 run_id: &str,
1168 selected_entries: usize,
1169 skipped_excluded: usize,
1170 records: &[ClinicalTrialsRecordReport],
1171 failures: &[SourceAdapterFailure],
1172) -> Value {
1173 json!({
1174 "schema": "vela.source-adapter-run.v1",
1175 "run_id": run_id,
1176 "adapter": options.adapter,
1177 "actor": options.actor,
1178 "created_at": Utc::now().to_rfc3339(),
1179 "selected_entries": selected_entries,
1180 "skipped_excluded": skipped_excluded,
1181 "records": records,
1182 "failed_records": failures,
1183 "policy": {
1184 "apply_artifacts": options.apply_artifacts,
1185 "allow_partial": options.allow_partial,
1186 "include_excluded": options.include_excluded,
1187 "priority": options.priority,
1188 "entries": options.entries,
1189 }
1190 })
1191}
1192
1193fn artifact_packet(
1194 options: &SourceAdapterRunOptions,
1195 packet_id: &str,
1196 run_id: &str,
1197 manifest_hash: &str,
1198 records: &[ClinicalTrialsRecordReport],
1199 fetched: &[FetchedRecord],
1200 spec: &AdapterSpec,
1201) -> Result<ArtifactPacket, String> {
1202 let created_at = Utc::now().to_rfc3339();
1203 let target_findings = records
1204 .iter()
1205 .flat_map(|record| record.target_findings.clone())
1206 .collect::<std::collections::BTreeSet<_>>()
1207 .into_iter()
1208 .collect::<Vec<_>>();
1209 let mut artifacts = vec![PacketArtifact {
1210 id: format!("{run_id}_manifest"),
1211 kind: "source_file".to_string(),
1212 title: format!("{} {run_id}", spec.manifest_title),
1213 locator: format!(
1214 "https://github.com/vela-science/vela/tree/main/projects/anti-amyloid-translation/ingest/runs/{run_id}/run.json"
1215 ),
1216 content_hash: manifest_hash.to_string(),
1217 parents: Vec::new(),
1218 metadata: BTreeMap::from([
1219 ("adapter".to_string(), json!(options.adapter)),
1220 ("run_id".to_string(), json!(run_id)),
1221 ("records".to_string(), json!(records)),
1222 ("target_findings".to_string(), json!(target_findings)),
1223 ]),
1224 }];
1225
1226 for record in fetched.iter().filter(|record| record.changed) {
1227 artifacts.push(PacketArtifact {
1228 id: format!(
1229 "{}_{}",
1230 safe_id_fragment(spec.id),
1231 safe_id_fragment(&record.source_id)
1232 ),
1233 kind: record.artifact_kind.clone(),
1234 title: source_record_title(spec, record),
1235 locator: record.source_url.clone(),
1236 content_hash: record.content_hash.clone(),
1237 parents: vec![format!("{run_id}_manifest")],
1238 metadata: BTreeMap::from([
1239 ("adapter".to_string(), json!(options.adapter)),
1240 ("run_id".to_string(), json!(run_id)),
1241 ("entry_id".to_string(), json!(record.entry.id)),
1242 ("source_id".to_string(), json!(record.source_id)),
1243 ("source_label".to_string(), json!(record.source_label)),
1244 ("artifact_kind".to_string(), json!(record.artifact_kind)),
1245 ("nct_id".to_string(), json!(record.nct_id)),
1246 ("api_url".to_string(), json!(record.api_url)),
1247 ("source_url".to_string(), json!(record.source_url)),
1248 (
1249 "old_artifact_id".to_string(),
1250 json!(record.entry.current_frontier_artifact_id),
1251 ),
1252 (
1253 "old_content_hash".to_string(),
1254 json!(record.old_content_hash),
1255 ),
1256 ("new_content_hash".to_string(), json!(record.content_hash)),
1257 (
1258 "target_findings".to_string(),
1259 json!(record.entry.target_findings),
1260 ),
1261 ("retrieved_at".to_string(), json!(created_at)),
1262 ]),
1263 });
1264 }
1265
1266 Ok(ArtifactPacket {
1267 schema: ARTIFACT_PACKET_SCHEMA.to_string(),
1268 packet_id: packet_id.to_string(),
1269 producer: PacketProducer {
1270 kind: "source_adapter".to_string(),
1271 id: format!("adapter:{}", options.adapter),
1272 name: spec.producer_name.to_string(),
1273 },
1274 topic: "Anti-amyloid translation in Alzheimer's disease".to_string(),
1275 created_at,
1276 artifacts,
1277 candidate_claims: Vec::new(),
1278 open_needs: Vec::new(),
1279 caveats: vec![
1280 spec.caveat.to_string(),
1281 "Truth-changing frontier updates require reviewer acceptance.".to_string(),
1282 ],
1283 })
1284}
1285
1286fn source_record_title(spec: &AdapterSpec, record: &FetchedRecord) -> String {
1287 match spec.kind {
1288 AdapterKind::ClinicalTrialsGovV2 => record
1289 .value
1290 .pointer("/protocolSection/identificationModule/briefTitle")
1291 .and_then(Value::as_str)
1292 .or_else(|| {
1293 record
1294 .value
1295 .pointer("/protocolSection/identificationModule/officialTitle")
1296 .and_then(Value::as_str)
1297 })
1298 .map_or_else(
1299 || format!("ClinicalTrials.gov {}", record.source_id),
1300 |title| format!("ClinicalTrials.gov {} · {title}", record.source_id),
1301 ),
1302 AdapterKind::RegulatoryDocumentsV1 => {
1303 format!("{} · {}", record.entry.name, record.source_id)
1304 }
1305 }
1306}
1307
/// Rewrites `value` so it contains only ASCII letters, ASCII digits and `_`.
///
/// Every other character, including non-ASCII ones, is replaced by a single
/// underscore, so the result has the same number of characters as the input.
fn safe_id_fragment(value: &str) -> String {
    let is_unsafe = |ch: char| !(ch.is_ascii_alphanumeric() || ch == '_');
    value.replace(is_unsafe, "_")
}
1320
1321fn sha256_for_bytes(bytes: &[u8]) -> String {
1322 format!("sha256:{}", hex::encode(Sha256::digest(bytes)))
1323}
1324
1325fn run_id(adapter: &str, fetched: &[FetchedRecord], failures: &[SourceAdapterFailure]) -> String {
1326 let preimage = json!({
1327 "adapter": adapter,
1328 "created_at": Utc::now().to_rfc3339(),
1329 "records": fetched.iter().map(|record| json!({
1330 "entry_id": record.entry.id,
1331 "source_id": record.source_id,
1332 "nct_id": record.nct_id,
1333 "content_hash": record.content_hash,
1334 })).collect::<Vec<_>>(),
1335 "failures": failures,
1336 });
1337 let bytes = canonical::to_canonical_bytes(&preimage).unwrap_or_else(|_| Vec::new());
1338 format!("sir_{}", &hex::encode(Sha256::digest(&bytes))[..16])
1339}
1340
1341fn packet_id(adapter: &str, run_id: &str, manifest_hash: &str) -> String {
1342 let preimage = format!("{adapter}|{run_id}|{manifest_hash}");
1343 format!(
1344 "cap_{}",
1345 &hex::encode(Sha256::digest(preimage.as_bytes()))[..16]
1346 )
1347}
1348
1349#[allow(dead_code)]
1350fn _compiler_version() -> &'static str {
1351 project::VELA_COMPILER_VERSION
1352}