Skip to main content

vela_protocol/
source_adapters.rs

1//! Source adapter runs that turn live source records into artifact packets.
2//!
3//! The shipped adapters read the reviewed frontier-owned source ingest plan,
4//! fetch selected public records, compare content hashes against the current
5//! artifact projection, and route changed source records through the existing
6//! artifact-to-state path.
7
8use std::collections::BTreeMap;
9use std::fs;
10use std::path::{Path, PathBuf};
11
12use chrono::Utc;
13use serde::{Deserialize, Serialize};
14use serde_json::{Value, json};
15use sha2::{Digest, Sha256};
16
17use crate::artifact_to_state::{
18    ARTIFACT_PACKET_SCHEMA, ArtifactPacket, ImportIdempotency, PacketArtifact, PacketProducer,
19};
20use crate::bundle::{Artifact, valid_artifact_kind};
21use crate::canonical;
22use crate::decision::{SOURCE_INGEST_PLAN_SCHEMA, SourceIngestEntry, SourceIngestPlan};
23use crate::events::StateTarget;
24use crate::proposals;
25use crate::{artifact_to_state, project, repo};
26
/// Adapter id selecting the ClinicalTrials.gov API v2 source adapter.
pub const CLINICALTRIALS_GOV_V2: &str = "clinicaltrials-gov-v2";
/// Adapter id selecting the regulatory document source adapter.
pub const REGULATORY_DOCUMENTS_V1: &str = "regulatory-documents-v1";
29
/// Which shipped adapter a run uses.
///
/// Selects the adapter-specific change detection, field diffing, and identifier
/// handling in this module.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AdapterKind {
    /// ClinicalTrials.gov API v2 registry records (`CLINICALTRIALS_GOV_V2`).
    ClinicalTrialsGovV2,
    /// Regulatory source documents (`REGULATORY_DOCUMENTS_V1`).
    RegulatoryDocumentsV1,
}
35
/// Static description of one supported source adapter, built by `adapter_spec`.
#[derive(Debug, Clone, Copy)]
struct AdapterSpec {
    /// Adapter id, one of the public adapter id constants.
    id: &'static str,
    /// Adapter family used to branch adapter-specific behavior.
    kind: AdapterKind,
    /// Source ingest plan `category` whose entries this adapter processes.
    category: &'static str,
    // NOTE(review): the next three fields are consumed by code outside this view
    // (the run manifest / artifact packet builders receive the spec) — confirm usage.
    /// Human-readable title for the run manifest.
    manifest_title: &'static str,
    /// Human-readable producer name for emitted packets.
    producer_name: &'static str,
    /// Label copied onto every per-record report produced by this adapter.
    source_label: &'static str,
    /// Caveat text describing what the adapter output is not.
    caveat: &'static str,
}
46
47fn adapter_spec(adapter: &str) -> Result<AdapterSpec, String> {
48    match adapter {
49        CLINICALTRIALS_GOV_V2 => Ok(AdapterSpec {
50            id: CLINICALTRIALS_GOV_V2,
51            kind: AdapterKind::ClinicalTrialsGovV2,
52            category: "clinical_trial_registry",
53            manifest_title: "ClinicalTrials.gov adapter run",
54            producer_name: "ClinicalTrials.gov API v2 source adapter",
55            source_label: "ClinicalTrials.gov record",
56            caveat: "Source adapter output is registry metadata, not a clinical conclusion.",
57        }),
58        REGULATORY_DOCUMENTS_V1 => Ok(AdapterSpec {
59            id: REGULATORY_DOCUMENTS_V1,
60            kind: AdapterKind::RegulatoryDocumentsV1,
61            category: "regulatory",
62            manifest_title: "Regulatory document adapter run",
63            producer_name: "Regulatory document source adapter",
64            source_label: "regulatory source record",
65            caveat: "Source adapter output is regulatory source metadata, not a clinical conclusion.",
66        }),
67        _ => Err(format!("unsupported source adapter '{adapter}'")),
68    }
69}
70
/// Options accepted by [`run`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct SourceAdapterRunOptions {
    /// Adapter id; must be `clinicaltrials-gov-v2` or `regulatory-documents-v1`.
    pub adapter: String,
    /// Actor forwarded to the packet import and review-note proposal steps; must be
    /// non-empty after trimming.
    pub actor: String,
    /// Plan entry ids to run; empty selects every entry in the adapter's category.
    #[serde(default)]
    pub entries: Vec<String>,
    /// Optional priority filter; must be `P0`, `P1`, or `P2` when present.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub priority: Option<String>,
    /// Also select plan entries whose `ingest_status` is `"excluded"`.
    #[serde(default)]
    pub include_excluded: bool,
    /// Report per-record failures instead of failing the whole run.
    #[serde(default)]
    pub allow_partial: bool,
    /// Fetch and diff records, then return before writing run files or importing
    /// the artifact packet.
    #[serde(default)]
    pub dry_run: bool,
    /// Optional directory handed to the record fetcher; presumably a local source of
    /// pre-fetched records used instead of the network — TODO confirm in
    /// `fetch_source_record`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub input_dir: Option<PathBuf>,
    /// Forwarded to the packet import; when set, the ingest plan is also updated
    /// with the artifact ids that the changed records now map to.
    #[serde(default)]
    pub apply_artifacts: bool,
}
90
/// Summary returned by [`run`] for both dry and real runs.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct SourceAdapterRunReport {
    /// Always `true` in a returned report; failures are reported as `Err`.
    pub ok: bool,
    /// Command name, `"source-adapter.run"`.
    pub command: String,
    /// Adapter id from the run options.
    pub adapter: String,
    /// Deterministic id for this run; also the run directory name under `ingest/runs/`.
    pub run_id: String,
    /// Frontier project name.
    pub frontier: String,
    /// Whether this was a dry run.
    pub dry_run: bool,
    /// Number of plan entries selected for fetching.
    pub selected_entries: usize,
    /// Excluded entries in the adapter's category that were not selected. This count
    /// ignores the `entries` / `priority` filters and is zero with `include_excluded`.
    pub skipped_excluded: usize,
    /// Records fetched successfully.
    pub fetched_records: usize,
    /// Fetched records judged unchanged against the frontier.
    pub unchanged_records: usize,
    /// Fetched records judged new or changed.
    pub changed_records: usize,
    /// Artifact proposals created by the packet import (zero for dry runs).
    pub artifact_proposals: usize,
    /// Review-note proposals created for this run (zero for dry runs).
    #[serde(default)]
    pub review_note_proposals: usize,
    /// Artifact proposal ids from the import, followed by the review-note proposal ids.
    pub proposal_ids: Vec<String>,
    /// Review-note proposal ids only.
    #[serde(default)]
    pub review_proposal_ids: Vec<String>,
    /// Event ids applied by the packet import.
    pub applied_event_ids: Vec<String>,
    /// Import idempotency info; for dry runs, `packet_hash` is the SHA-256 of the run id.
    pub idempotency: ImportIdempotency,
    /// Effect on trusted state as reported by the import; `"none"` for dry runs.
    pub trusted_state_effect: String,
    /// Per-record failures; only non-empty when `allow_partial` was set.
    pub failed_records: Vec<SourceAdapterFailure>,
    /// One report per fetched record.
    pub records: Vec<ClinicalTrialsRecordReport>,
    /// Artifact packet id; `None` for dry runs.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub packet_id: Option<String>,
    /// Path of the written artifact packet; `None` for dry runs.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub packet_path: Option<PathBuf>,
    /// Run directory under `ingest/runs/`; `None` for dry runs.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub run_path: Option<PathBuf>,
}
122
/// A selected plan entry that could not be resolved or fetched.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct SourceAdapterFailure {
    /// Source ingest plan entry id.
    pub entry_id: String,
    /// Adapter-specific source id; empty when it could not be resolved.
    #[serde(default)]
    pub source_id: String,
    /// NCT id for the ClinicalTrials.gov adapter, the entry id for other adapters,
    /// and empty when the source id could not be resolved.
    pub nct_id: String,
    /// Locator copied from the plan entry.
    pub locator: String,
    /// Error message from source-id resolution or fetching.
    pub error: String,
}
132
/// Per-record outcome of a run. Despite the name, it is used for every adapter.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ClinicalTrialsRecordReport {
    /// Source ingest plan entry id.
    pub entry_id: String,
    /// Adapter-specific source id.
    #[serde(default)]
    pub source_id: String,
    /// NCT id for ClinicalTrials.gov records; the entry id for other adapters.
    pub nct_id: String,
    /// The adapter's `source_label`.
    #[serde(default)]
    pub source_label: String,
    /// Artifact kind derived from the plan entry.
    #[serde(default)]
    pub artifact_kind: String,
    /// `"new"` (no current artifact hash), `"changed"`, or `"unchanged"`.
    pub status: String,
    /// Source URL reported by the fetcher.
    pub source_url: String,
    /// API URL reported by the fetcher.
    pub api_url: String,
    /// Content hash of the fetched record.
    pub content_hash: String,
    /// Frontier artifact id the plan entry currently points at, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub old_artifact_id: Option<String>,
    /// Content hash of that artifact, when it exists in the frontier.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub old_content_hash: Option<String>,
    /// Target findings copied from the plan entry.
    #[serde(default)]
    pub target_findings: Vec<String>,
    /// Tracked field changes; always empty for unchanged records.
    #[serde(default)]
    pub changed_fields: Vec<ClinicalTrialsFieldChange>,
}
156
/// One tracked field that differs between the previous and the fetched record.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ClinicalTrialsFieldChange {
    /// Stable field id, e.g. `overall_status` or `content_hash`.
    pub field: String,
    /// Human-readable field label.
    pub label: String,
    /// JSON pointer of the field within the record, e.g. `/content_hash`.
    pub path: String,
    /// String summary of the old value; `None` when absent or JSON null.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub old_value: Option<String>,
    /// String summary of the new value; `None` when absent or JSON null.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub new_value: Option<String>,
}
167
/// A successfully fetched record plus its comparison against the frontier.
#[derive(Debug, Clone)]
struct FetchedRecord {
    /// The plan entry the record was fetched for.
    entry: SourceIngestEntry,
    /// Adapter-specific source id; also the record file stem under `records/`.
    source_id: String,
    /// NCT id for ClinicalTrials.gov records; the entry id otherwise.
    nct_id: String,
    /// The adapter's `source_label`.
    source_label: String,
    /// Artifact kind derived from the plan entry.
    artifact_kind: String,
    /// Source URL reported by the fetcher.
    source_url: String,
    /// API URL reported by the fetcher.
    api_url: String,
    /// Fetched record body, written to `records/<source_id>.json` on real runs.
    value: Value,
    /// Content hash of the fetched record.
    content_hash: String,
    /// Content hash of the current frontier artifact, if any.
    old_content_hash: Option<String>,
    /// Tracked field changes; cleared when `changed` is false.
    changed_fields: Vec<ClinicalTrialsFieldChange>,
    /// Whether the record is new or differs from the current artifact.
    changed: bool,
}
183
/// Raw result of fetching one source record.
// NOTE(review): presumably the return type of `fetch_source_record`, which is
// defined outside this view — its fields match what `run` reads from the fetch result.
#[derive(Debug, Clone)]
struct SourceFetch {
    /// Fetched record body.
    value: Value,
    /// Content hash of the fetched record.
    content_hash: String,
    /// Human-facing URL of the source record.
    source_url: String,
    /// URL the record was fetched from.
    api_url: String,
}
191
192pub async fn run(
193    frontier_path: &Path,
194    options: SourceAdapterRunOptions,
195) -> Result<SourceAdapterRunReport, String> {
196    let spec = adapter_spec(&options.adapter)?;
197    if options.actor.trim().is_empty() {
198        return Err("actor must be non-empty".to_string());
199    }
200    if let Some(priority) = options.priority.as_deref()
201        && !matches!(priority, "P0" | "P1" | "P2")
202    {
203        return Err("priority must be P0, P1, or P2".to_string());
204    }
205
206    let frontier = repo::load_from_path(frontier_path)?;
207    let ingest_dir = source_ingest_dir(frontier_path)?;
208    let frontier_root = frontier_root_for_ingest_dir(&ingest_dir, frontier_path);
209    let plan = load_ingest_plan(&ingest_dir)?;
210    let selected = selected_entries(&plan, &options, &spec);
211    let skipped_excluded = plan
212        .entries
213        .iter()
214        .filter(|entry| {
215            entry.category == spec.category
216                && entry.ingest_status == "excluded"
217                && !options.include_excluded
218        })
219        .count();
220
221    let mut fetched = Vec::new();
222    let mut failures = Vec::new();
223    for entry in selected.iter().cloned() {
224        let source_id = match source_id_for_entry(&entry, &spec) {
225            Ok(source_id) => source_id,
226            Err(error) => {
227                failures.push(SourceAdapterFailure {
228                    entry_id: entry.id,
229                    source_id: String::new(),
230                    nct_id: String::new(),
231                    locator: entry.locator,
232                    error,
233                });
234                continue;
235            }
236        };
237        match fetch_source_record(&entry, &spec, &source_id, options.input_dir.as_deref()).await {
238            Ok(fetched_record) => {
239                let nct_id = if spec.kind == AdapterKind::ClinicalTrialsGovV2 {
240                    source_id.clone()
241                } else {
242                    entry.id.clone()
243                };
244                let artifact_kind = artifact_kind_for_entry(&entry);
245                let current_artifact = entry
246                    .current_frontier_artifact_id
247                    .as_deref()
248                    .and_then(|id| frontier.artifacts.iter().find(|artifact| artifact.id == id));
249                let old_content_hash =
250                    current_artifact.map(|artifact| artifact.content_hash.clone());
251                let previous_record = previous_source_record(
252                    &frontier,
253                    &ingest_dir,
254                    &entry,
255                    &source_id,
256                    old_content_hash.as_deref(),
257                );
258                let mut changed_fields = tracked_source_changes(
259                    &spec,
260                    previous_record.as_ref(),
261                    &fetched_record.value,
262                    old_content_hash.as_deref(),
263                    &fetched_record.content_hash,
264                );
265                let changed = source_record_changed(
266                    &spec,
267                    current_artifact,
268                    &entry,
269                    &frontier_root,
270                    previous_record.as_ref(),
271                    old_content_hash.as_deref(),
272                    &fetched_record.content_hash,
273                );
274                if spec.kind == AdapterKind::RegulatoryDocumentsV1
275                    && previous_record.is_none()
276                    && let Some(old_locator) = current_artifact.and_then(|artifact| {
277                        regulatory_locator_manifest_url(&frontier_root, artifact, &entry)
278                    })
279                {
280                    changed_fields = if old_locator == entry.locator {
281                        Vec::new()
282                    } else {
283                        vec![ClinicalTrialsFieldChange {
284                            field: "locator".to_string(),
285                            label: "source locator".to_string(),
286                            path: "/locator".to_string(),
287                            old_value: Some(old_locator),
288                            new_value: Some(entry.locator.clone()),
289                        }]
290                    };
291                }
292                if !changed {
293                    changed_fields.clear();
294                }
295                fetched.push(FetchedRecord {
296                    entry,
297                    source_id: source_id.clone(),
298                    source_label: spec.source_label.to_string(),
299                    artifact_kind,
300                    source_url: fetched_record.source_url,
301                    api_url: fetched_record.api_url,
302                    nct_id,
303                    value: fetched_record.value,
304                    content_hash: fetched_record.content_hash,
305                    old_content_hash,
306                    changed_fields,
307                    changed,
308                });
309            }
310            Err(error) => failures.push(SourceAdapterFailure {
311                entry_id: entry.id.clone(),
312                source_id: source_id.clone(),
313                nct_id: if spec.kind == AdapterKind::ClinicalTrialsGovV2 {
314                    source_id
315                } else {
316                    entry.id
317                },
318                locator: entry.locator,
319                error,
320            }),
321        }
322    }
323
324    if !failures.is_empty() && !options.allow_partial {
325        let failed = failures
326            .iter()
327            .map(|failure| format!("{} ({})", failure.nct_id, failure.error))
328            .collect::<Vec<_>>()
329            .join(", ");
330        return Err(format!("source adapter run failed for {failed}"));
331    }
332
333    let changed_records = fetched.iter().filter(|record| record.changed).count();
334    let unchanged_records = fetched.len() - changed_records;
335    let run_id = run_id(&options.adapter, &fetched, &failures);
336    let records = fetched
337        .iter()
338        .map(|record| ClinicalTrialsRecordReport {
339            entry_id: record.entry.id.clone(),
340            source_id: record.source_id.clone(),
341            nct_id: record.nct_id.clone(),
342            source_label: record.source_label.clone(),
343            artifact_kind: record.artifact_kind.clone(),
344            status: if record.changed {
345                if record.old_content_hash.is_some() {
346                    "changed".to_string()
347                } else {
348                    "new".to_string()
349                }
350            } else {
351                "unchanged".to_string()
352            },
353            source_url: record.source_url.clone(),
354            api_url: record.api_url.clone(),
355            content_hash: record.content_hash.clone(),
356            old_artifact_id: record.entry.current_frontier_artifact_id.clone(),
357            old_content_hash: record.old_content_hash.clone(),
358            target_findings: record.entry.target_findings.clone(),
359            changed_fields: record.changed_fields.clone(),
360        })
361        .collect::<Vec<_>>();
362
363    if options.dry_run {
364        return Ok(SourceAdapterRunReport {
365            ok: true,
366            command: "source-adapter.run".to_string(),
367            adapter: options.adapter,
368            run_id: run_id.clone(),
369            frontier: frontier.project.name,
370            dry_run: true,
371            selected_entries: selected.len(),
372            skipped_excluded,
373            fetched_records: fetched.len(),
374            unchanged_records,
375            changed_records,
376            artifact_proposals: 0,
377            review_note_proposals: 0,
378            proposal_ids: Vec::new(),
379            review_proposal_ids: Vec::new(),
380            applied_event_ids: Vec::new(),
381            idempotency: ImportIdempotency {
382                packet_hash: format!("sha256:{}", hex::encode(Sha256::digest(run_id.as_bytes()))),
383                duplicate_packet: false,
384                skipped_existing_proposals: Vec::new(),
385                skipped_existing_artifacts: Vec::new(),
386            },
387            trusted_state_effect: "none".to_string(),
388            failed_records: failures,
389            records,
390            packet_id: None,
391            packet_path: None,
392            run_path: None,
393        });
394    }
395
396    let run_dir = ingest_dir.join("runs").join(&run_id);
397    fs::create_dir_all(run_dir.join("records"))
398        .map_err(|e| format!("create source adapter run dir '{}': {e}", run_dir.display()))?;
399    for record in &fetched {
400        fs::write(
401            run_dir
402                .join("records")
403                .join(format!("{}.json", record.source_id)),
404            serde_json::to_vec_pretty(&record.value)
405                .map_err(|e| format!("serialize record: {e}"))?,
406        )
407        .map_err(|e| format!("write fetched record {}: {e}", record.source_id))?;
408    }
409
410    let manifest = run_manifest(
411        &options,
412        &run_id,
413        selected.len(),
414        skipped_excluded,
415        &records,
416        &failures,
417    );
418    let manifest_bytes = canonical::to_canonical_bytes(&manifest)?;
419    let manifest_hash = sha256_for_bytes(&manifest_bytes);
420    let packet_id = packet_id(&options.adapter, &run_id, &manifest_hash);
421    let packet = artifact_packet(
422        &options,
423        &packet_id,
424        &run_id,
425        &manifest_hash,
426        &records,
427        &fetched,
428        &spec,
429    )?;
430    let packet_path = run_dir.join("artifact-packet.json");
431    fs::write(
432        &packet_path,
433        serde_json::to_vec_pretty(&packet).map_err(|e| format!("serialize packet: {e}"))?,
434    )
435    .map_err(|e| format!("write artifact packet '{}': {e}", packet_path.display()))?;
436
437    let import_report = artifact_to_state::import_packet_at_path(
438        frontier_path,
439        &packet_path,
440        &options.actor,
441        options.apply_artifacts,
442    )?;
443    let review_proposal_ids = create_review_note_proposals(
444        frontier_path,
445        &options,
446        &spec,
447        &run_id,
448        &packet_id,
449        &fetched,
450    )?;
451    let mut proposal_ids = import_report.proposal_ids;
452    proposal_ids.extend(review_proposal_ids.clone());
453    if options.apply_artifacts {
454        update_ingest_plan_after_apply(frontier_path, &ingest_dir, &fetched)?;
455    }
456
457    let final_run = json!({
458        "schema": "vela.source-adapter-run.v1",
459        "run_id": run_id,
460        "adapter": options.adapter,
461        "frontier": frontier.project.name,
462        "started_at": packet.created_at,
463        "selected_entries": selected.len(),
464        "skipped_excluded": skipped_excluded,
465        "fetched_records": fetched.len(),
466        "unchanged_records": unchanged_records,
467        "changed_records": changed_records,
468        "failed_records": failures,
469        "records": records,
470        "packet_id": packet_id,
471        "packet_path": "artifact-packet.json",
472        "proposal_ids": proposal_ids,
473        "review_proposal_ids": review_proposal_ids,
474        "applied_event_ids": import_report.applied_event_ids,
475        "idempotency": import_report.idempotency,
476        "trusted_state_effect": import_report.trusted_state_effect,
477    });
478    fs::write(
479        run_dir.join("run.json"),
480        serde_json::to_vec_pretty(&final_run).map_err(|e| format!("serialize run: {e}"))?,
481    )
482    .map_err(|e| format!("write run manifest '{}': {e}", run_dir.display()))?;
483
484    Ok(SourceAdapterRunReport {
485        ok: true,
486        command: "source-adapter.run".to_string(),
487        adapter: options.adapter,
488        run_id,
489        frontier: frontier.project.name,
490        dry_run: false,
491        selected_entries: selected.len(),
492        skipped_excluded,
493        fetched_records: fetched.len(),
494        unchanged_records,
495        changed_records,
496        artifact_proposals: import_report.artifact_proposals,
497        review_note_proposals: review_proposal_ids.len(),
498        proposal_ids,
499        review_proposal_ids,
500        applied_event_ids: import_report.applied_event_ids,
501        idempotency: import_report.idempotency,
502        trusted_state_effect: import_report.trusted_state_effect,
503        failed_records: failures,
504        records,
505        packet_id: Some(packet_id),
506        packet_path: Some(packet_path),
507        run_path: Some(run_dir),
508    })
509}
510
511fn source_ingest_dir(frontier_path: &Path) -> Result<PathBuf, String> {
512    match repo::detect(frontier_path)? {
513        repo::VelaSource::VelaRepo(root) => Ok(root.join("ingest")),
514        repo::VelaSource::ProjectFile(path) => path
515            .parent()
516            .map(|parent| parent.join("ingest"))
517            .ok_or_else(|| format!("frontier file '{}' has no parent", path.display())),
518        repo::VelaSource::PacketDir(dir) => Ok(dir.join("ingest")),
519    }
520}
521
/// Returns the frontier root that artifact locators are resolved against.
///
/// This is normally the parent of `ingest_dir`. When `ingest_dir` has no parent,
/// it falls back to `frontier_path` itself if that is a directory, or to its
/// parent otherwise. If no candidate remains, it uses the current directory (`.`).
fn frontier_root_for_ingest_dir(ingest_dir: &Path, frontier_path: &Path) -> PathBuf {
    if let Some(parent) = ingest_dir.parent() {
        return parent.to_path_buf();
    }
    let fallback = if frontier_path.is_dir() {
        Some(frontier_path)
    } else {
        frontier_path.parent()
    };
    match fallback {
        Some(root) => root.to_path_buf(),
        None => PathBuf::from("."),
    }
}
535
536fn load_ingest_plan(ingest_dir: &Path) -> Result<SourceIngestPlan, String> {
537    let path = ingest_dir.join("source-ingest-plan.v1.json");
538    let data = fs::read_to_string(&path)
539        .map_err(|e| format!("read source ingest plan '{}': {e}", path.display()))?;
540    let plan: SourceIngestPlan =
541        serde_json::from_str(&data).map_err(|e| format!("parse source ingest plan: {e}"))?;
542    if plan.schema != SOURCE_INGEST_PLAN_SCHEMA {
543        return Err(format!(
544            "source ingest plan schema must be {SOURCE_INGEST_PLAN_SCHEMA}"
545        ));
546    }
547    Ok(plan)
548}
549
550fn source_record_changed(
551    spec: &AdapterSpec,
552    current_artifact: Option<&Artifact>,
553    entry: &SourceIngestEntry,
554    frontier_root: &Path,
555    previous_record: Option<&Value>,
556    old_content_hash: Option<&str>,
557    new_content_hash: &str,
558) -> bool {
559    let Some(old_content_hash) = old_content_hash else {
560        return true;
561    };
562    match spec.kind {
563        AdapterKind::ClinicalTrialsGovV2 => old_content_hash != new_content_hash,
564        AdapterKind::RegulatoryDocumentsV1 => {
565            if previous_record.is_some() {
566                return old_content_hash != new_content_hash;
567            }
568            if let Some(old_locator) = current_artifact.and_then(|artifact| {
569                regulatory_locator_manifest_url(frontier_root, artifact, entry)
570            }) {
571                return old_locator != entry.locator;
572            }
573            old_content_hash != new_content_hash
574        }
575    }
576}
577
578fn regulatory_locator_manifest_url(
579    frontier_root: &Path,
580    artifact: &Artifact,
581    entry: &SourceIngestEntry,
582) -> Option<String> {
583    if entry.source_type == "frontier_projection" {
584        return Some(entry.locator.clone());
585    }
586    if artifact.metadata.contains_key("source_adapter")
587        || artifact.metadata.contains_key("source_adapter_run_id")
588        || artifact.metadata.contains_key("run_id")
589    {
590        return None;
591    }
592    if artifact.storage_mode != "local_blob" {
593        return None;
594    }
595    let is_json = artifact
596        .media_type
597        .as_deref()
598        .is_some_and(|media_type| media_type.contains("json"));
599    if !is_json {
600        return None;
601    }
602    let locator = artifact.locator.as_deref()?;
603    let path = resolve_artifact_locator(frontier_root, locator);
604    let Ok(bytes) = fs::read(path) else {
605        return None;
606    };
607    let Ok(value) = serde_json::from_slice::<Value>(&bytes) else {
608        return None;
609    };
610    let manifest_url = value
611        .get("url")
612        .or_else(|| value.get("source_url"))
613        .and_then(Value::as_str)
614        .map(str::to_string)?;
615    let manifest_only = value
616        .get("access_terms")
617        .and_then(Value::as_str)
618        .is_some_and(|terms| terms.contains("stored as a Vela manifest"));
619    if manifest_only || value.get("content_hash").is_none() {
620        Some(manifest_url)
621    } else {
622        None
623    }
624}
625
/// Resolves an artifact locator against the frontier root.
///
/// Relative locators are joined onto `frontier_root`; absolute locators are
/// returned unchanged. `Path::join` already replaces the base when its argument
/// is absolute, so a single join covers both cases.
fn resolve_artifact_locator(frontier_root: &Path, locator: &str) -> PathBuf {
    frontier_root.join(Path::new(locator))
}
634
635fn update_ingest_plan_after_apply(
636    frontier_path: &Path,
637    ingest_dir: &Path,
638    fetched: &[FetchedRecord],
639) -> Result<(), String> {
640    let path = ingest_dir.join("source-ingest-plan.v1.json");
641    let mut plan = load_ingest_plan(ingest_dir)?;
642    let frontier = repo::load_from_path(frontier_path)?;
643    let mut changed = false;
644    for record in fetched.iter().filter(|record| record.changed) {
645        let Some(artifact_id) = frontier
646            .artifacts
647            .iter()
648            .find(|artifact| {
649                artifact.kind == record.artifact_kind
650                    && artifact.content_hash == record.content_hash
651                    && artifact
652                        .metadata
653                        .get("source_id")
654                        .or_else(|| artifact.metadata.get("entry_id"))
655                        .or_else(|| artifact.metadata.get("nct_id"))
656                        .and_then(Value::as_str)
657                        .is_some_and(|source| source == record.source_id)
658            })
659            .map(|artifact| artifact.id.clone())
660        else {
661            continue;
662        };
663        if let Some(entry) = plan
664            .entries
665            .iter_mut()
666            .find(|entry| entry.id == record.entry.id)
667            && entry.current_frontier_artifact_id.as_deref() != Some(artifact_id.as_str())
668        {
669            entry.current_frontier_artifact_id = Some(artifact_id);
670            entry.ingest_status = "ingested".to_string();
671            changed = true;
672        }
673    }
674    if changed {
675        fs::write(
676            &path,
677            serde_json::to_vec_pretty(&plan)
678                .map_err(|e| format!("serialize source ingest plan: {e}"))?,
679        )
680        .map_err(|e| format!("write source ingest plan '{}': {e}", path.display()))?;
681    }
682    Ok(())
683}
684
685fn selected_entries(
686    plan: &SourceIngestPlan,
687    options: &SourceAdapterRunOptions,
688    spec: &AdapterSpec,
689) -> Vec<SourceIngestEntry> {
690    plan.entries
691        .iter()
692        .filter(|entry| entry.category == spec.category)
693        .filter(|entry| options.include_excluded || entry.ingest_status != "excluded")
694        .filter(|entry| {
695            options.entries.is_empty() || options.entries.iter().any(|wanted| wanted == &entry.id)
696        })
697        .filter(|entry| {
698            options
699                .priority
700                .as_deref()
701                .is_none_or(|priority| entry.priority == priority)
702        })
703        .cloned()
704        .collect()
705}
706
707fn previous_source_record(
708    frontier: &project::Project,
709    ingest_dir: &Path,
710    source_entry: &SourceIngestEntry,
711    source_id: &str,
712    expected_hash: Option<&str>,
713) -> Option<Value> {
714    let artifact = source_entry
715        .current_frontier_artifact_id
716        .as_deref()
717        .and_then(|id| frontier.artifacts.iter().find(|artifact| artifact.id == id))?;
718    let expected_hash = expected_hash.unwrap_or(&artifact.content_hash);
719
720    if let Some(run_id) = artifact
721        .metadata
722        .get("run_id")
723        .or_else(|| artifact.metadata.get("source_adapter_run_id"))
724        .and_then(Value::as_str)
725    {
726        let path = ingest_dir
727            .join("runs")
728            .join(run_id)
729            .join("records")
730            .join(format!("{source_id}.json"));
731        if let Some(value) = read_record_if_hash_matches(&path, expected_hash) {
732            return Some(value);
733        }
734    }
735
736    let runs_dir = ingest_dir.join("runs");
737    let entries = fs::read_dir(runs_dir).ok()?;
738    for entry in entries.flatten() {
739        let path = entry
740            .path()
741            .join("records")
742            .join(format!("{source_id}.json"));
743        if let Some(value) = read_record_if_hash_matches(&path, expected_hash) {
744            return Some(value);
745        }
746    }
747    None
748}
749
750fn read_record_if_hash_matches(path: &Path, expected_hash: &str) -> Option<Value> {
751    let text = fs::read_to_string(path).ok()?;
752    let value: Value = serde_json::from_str(&text).ok()?;
753    if value
754        .get("content_hash")
755        .and_then(Value::as_str)
756        .is_some_and(|hash| hash == expected_hash)
757    {
758        return Some(value);
759    }
760    let bytes = canonical::to_canonical_bytes(&value).ok()?;
761    if sha256_for_bytes(&bytes) == expected_hash {
762        Some(value)
763    } else {
764        None
765    }
766}
767
768fn tracked_source_changes(
769    spec: &AdapterSpec,
770    previous_record: Option<&Value>,
771    new_record: &Value,
772    old_content_hash: Option<&str>,
773    new_content_hash: &str,
774) -> Vec<ClinicalTrialsFieldChange> {
775    match spec.kind {
776        AdapterKind::ClinicalTrialsGovV2 => previous_record
777            .map(|previous| tracked_clinicaltrials_changes(previous, new_record))
778            .unwrap_or_default(),
779        AdapterKind::RegulatoryDocumentsV1 => {
780            if let Some(previous) = previous_record {
781                tracked_regulatory_changes(previous, new_record)
782            } else if old_content_hash.is_some_and(|old| old != new_content_hash) {
783                vec![ClinicalTrialsFieldChange {
784                    field: "content_hash".to_string(),
785                    label: "content hash".to_string(),
786                    path: "/content_hash".to_string(),
787                    old_value: old_content_hash.map(str::to_string),
788                    new_value: Some(new_content_hash.to_string()),
789                }]
790            } else {
791                Vec::new()
792            }
793        }
794    }
795}
796
797fn tracked_regulatory_changes(
798    old_record: &Value,
799    new_record: &Value,
800) -> Vec<ClinicalTrialsFieldChange> {
801    const FIELDS: &[(&str, &str, &str)] = &[
802        ("content_hash", "content hash", "/content_hash"),
803        ("content_length", "content length", "/content_length"),
804        ("content_type", "content type", "/content_type"),
805        ("locator", "source locator", "/locator"),
806    ];
807
808    FIELDS
809        .iter()
810        .filter_map(|(field, label, path)| {
811            let old_value = old_record.pointer(path);
812            let new_value = new_record.pointer(path);
813            if old_value == new_value {
814                return None;
815            }
816            Some(ClinicalTrialsFieldChange {
817                field: (*field).to_string(),
818                label: (*label).to_string(),
819                path: (*path).to_string(),
820                old_value: field_value_summary(old_value),
821                new_value: field_value_summary(new_value),
822            })
823        })
824        .collect()
825}
826
827fn tracked_clinicaltrials_changes(
828    old_record: &Value,
829    new_record: &Value,
830) -> Vec<ClinicalTrialsFieldChange> {
831    const FIELDS: &[(&str, &str, &str)] = &[
832        (
833            "overall_status",
834            "overall status",
835            "/protocolSection/statusModule/overallStatus",
836        ),
837        (
838            "start_date",
839            "start date",
840            "/protocolSection/statusModule/startDateStruct/date",
841        ),
842        (
843            "primary_completion_date",
844            "primary completion date",
845            "/protocolSection/statusModule/primaryCompletionDateStruct/date",
846        ),
847        (
848            "completion_date",
849            "completion date",
850            "/protocolSection/statusModule/completionDateStruct/date",
851        ),
852        (
853            "phases",
854            "trial phase",
855            "/protocolSection/designModule/phases",
856        ),
857        (
858            "enrollment_count",
859            "enrollment count",
860            "/protocolSection/designModule/enrollmentInfo/count",
861        ),
862        (
863            "enrollment_type",
864            "enrollment type",
865            "/protocolSection/designModule/enrollmentInfo/type",
866        ),
867        (
868            "primary_outcomes",
869            "primary outcomes",
870            "/protocolSection/outcomesModule/primaryOutcomes",
871        ),
872        ("has_results", "posted results section", "/resultsSection"),
873    ];
874
875    FIELDS
876        .iter()
877        .filter_map(|(field, label, path)| {
878            let old_value = old_record.pointer(path);
879            let new_value = new_record.pointer(path);
880            if old_value == new_value {
881                return None;
882            }
883            Some(ClinicalTrialsFieldChange {
884                field: (*field).to_string(),
885                label: (*label).to_string(),
886                path: (*path).to_string(),
887                old_value: field_value_summary(old_value),
888                new_value: field_value_summary(new_value),
889            })
890        })
891        .collect()
892}
893
894fn field_value_summary(value: Option<&Value>) -> Option<String> {
895    let value = value?;
896    match value {
897        Value::Null => None,
898        Value::String(value) => Some(value.clone()),
899        Value::Number(value) => Some(value.to_string()),
900        Value::Bool(value) => Some(value.to_string()),
901        Value::Array(_) | Value::Object(_) => serde_json::to_string(value).ok(),
902    }
903}
904
905fn create_review_note_proposals(
906    frontier_path: &Path,
907    options: &SourceAdapterRunOptions,
908    spec: &AdapterSpec,
909    run_id: &str,
910    packet_id: &str,
911    fetched: &[FetchedRecord],
912) -> Result<Vec<String>, String> {
913    let mut ids = Vec::new();
914    for record in fetched
915        .iter()
916        .filter(|record| !record.changed_fields.is_empty())
917    {
918        let targets = record
919            .entry
920            .target_findings
921            .iter()
922            .filter(|id| id.starts_with("vf_"))
923            .cloned()
924            .collect::<std::collections::BTreeSet<_>>();
925        if targets.is_empty() {
926            continue;
927        }
928        let note_text = review_note_text(record);
929        for finding_id in targets {
930            let proposal = proposals::new_proposal(
931                "finding.note",
932                StateTarget {
933                    r#type: "finding".to_string(),
934                    id: finding_id,
935                },
936                options.actor.clone(),
937                if options.actor.starts_with("agent:") {
938                    "agent"
939                } else {
940                    "human"
941                },
942                format!(
943                    "Review {} delta for {} from run {}",
944                    spec.source_label, record.source_id, run_id
945                ),
946                json!({
947                    "text": note_text,
948                    "source_adapter": options.adapter,
949                    "source_adapter_run_id": run_id,
950                    "artifact_packet_id": packet_id,
951                    "entry_id": record.entry.id,
952                    "source_id": record.source_id,
953                    "source_label": record.source_label,
954                    "nct_id": record.nct_id,
955                    "api_url": record.api_url,
956                    "source_url": record.source_url,
957                    "old_content_hash": record.old_content_hash,
958                    "new_content_hash": record.content_hash,
959                    "changed_fields": record.changed_fields,
960                }),
961                vec![
962                    record.api_url.clone(),
963                    record.source_url.clone(),
964                    format!("source_adapter_run:{run_id}"),
965                    format!("artifact_packet:{packet_id}"),
966                ],
967                review_note_caveats(spec),
968            );
969            let result = proposals::create_or_apply(frontier_path, proposal, false)?;
970            ids.push(result.proposal_id);
971        }
972    }
973    Ok(ids)
974}
975
976fn review_note_text(record: &FetchedRecord) -> String {
977    let fields = record
978        .changed_fields
979        .iter()
980        .map(|change| {
981            format!(
982                "{} changed from {} to {}",
983                change.label,
984                change.old_value.as_deref().unwrap_or("missing"),
985                change.new_value.as_deref().unwrap_or("missing")
986            )
987        })
988        .collect::<Vec<_>>()
989        .join("; ");
990    if record.source_label == "ClinicalTrials.gov record" {
991        format!(
992            "ClinicalTrials.gov record {} changed tracked registry fields: {}. Review whether this affects the linked finding scope, trial table, or decision brief. Registry metadata alone does not change the claim.",
993            record.source_id, fields
994        )
995    } else {
996        format!(
997            "Regulatory source {} changed tracked source fields: {}. Review whether this affects the linked finding scope, trial table, or decision brief; source metadata alone does not change the claim.",
998            record.source_id, fields
999        )
1000    }
1001}
1002
1003fn review_note_caveats(spec: &AdapterSpec) -> Vec<String> {
1004    match spec.kind {
1005        AdapterKind::ClinicalTrialsGovV2 => vec![
1006            "ClinicalTrials.gov metadata changed; this is a review task, not a claim update."
1007                .to_string(),
1008            "Accepting this note records reviewer awareness only.".to_string(),
1009        ],
1010        AdapterKind::RegulatoryDocumentsV1 => vec![
1011            "Regulatory source metadata changed; this is a review task, not a claim update."
1012                .to_string(),
1013            "Accepting this note records reviewer awareness only.".to_string(),
1014        ],
1015    }
1016}
1017
1018async fn fetch_source_record(
1019    entry: &SourceIngestEntry,
1020    spec: &AdapterSpec,
1021    source_id: &str,
1022    input_dir: Option<&Path>,
1023) -> Result<SourceFetch, String> {
1024    match spec.kind {
1025        AdapterKind::ClinicalTrialsGovV2 => fetch_clinicaltrials_record(source_id, input_dir).await,
1026        AdapterKind::RegulatoryDocumentsV1 => {
1027            fetch_regulatory_record(entry, source_id, input_dir).await
1028        }
1029    }
1030}
1031
1032async fn fetch_clinicaltrials_record(
1033    nct_id: &str,
1034    input_dir: Option<&Path>,
1035) -> Result<SourceFetch, String> {
1036    let raw = if let Some(dir) = input_dir {
1037        let path = dir.join(format!("{nct_id}.json"));
1038        fs::read_to_string(&path)
1039            .map_err(|e| format!("read ClinicalTrials.gov fixture '{}': {e}", path.display()))?
1040    } else {
1041        let url = format!("https://clinicaltrials.gov/api/v2/studies/{nct_id}");
1042        let response = reqwest::get(&url)
1043            .await
1044            .map_err(|e| format!("fetch {url}: {e}"))?;
1045        let response = response
1046            .error_for_status()
1047            .map_err(|e| format!("fetch {url}: {e}"))?;
1048        response
1049            .text()
1050            .await
1051            .map_err(|e| format!("read {url}: {e}"))?
1052    };
1053    let value: Value =
1054        serde_json::from_str(&raw).map_err(|e| format!("parse ClinicalTrials.gov record: {e}"))?;
1055    let canonical_bytes = canonical::to_canonical_bytes(&value)?;
1056    Ok(SourceFetch {
1057        value,
1058        content_hash: sha256_for_bytes(&canonical_bytes),
1059        source_url: format!("https://clinicaltrials.gov/study/{nct_id}"),
1060        api_url: format!("https://clinicaltrials.gov/api/v2/studies/{nct_id}"),
1061    })
1062}
1063
1064async fn fetch_regulatory_record(
1065    entry: &SourceIngestEntry,
1066    source_id: &str,
1067    input_dir: Option<&Path>,
1068) -> Result<SourceFetch, String> {
1069    let (bytes, content_type) = if let Some(dir) = input_dir {
1070        let path = fixture_path_for_source(dir, source_id)
1071            .ok_or_else(|| format!("read regulatory fixture for {source_id}: file not found"))?;
1072        let bytes = fs::read(&path)
1073            .map_err(|e| format!("read regulatory fixture '{}': {e}", path.display()))?;
1074        (bytes, content_type_for_path(&path))
1075    } else {
1076        let client = reqwest::Client::builder()
1077            .user_agent("vela-source-adapter/0.55 (+https://vela.science)")
1078            .build()
1079            .map_err(|e| format!("create http client: {e}"))?;
1080        let response = client
1081            .get(&entry.locator)
1082            .send()
1083            .await
1084            .map_err(|e| format!("fetch {}: {e}", entry.locator))?;
1085        let response = response
1086            .error_for_status()
1087            .map_err(|e| format!("fetch {}: {e}", entry.locator))?;
1088        let content_type = response
1089            .headers()
1090            .get(reqwest::header::CONTENT_TYPE)
1091            .and_then(|value| value.to_str().ok())
1092            .unwrap_or("application/octet-stream")
1093            .to_string();
1094        let bytes = response
1095            .bytes()
1096            .await
1097            .map_err(|e| format!("read {}: {e}", entry.locator))?
1098            .to_vec();
1099        (bytes, content_type)
1100    };
1101    let content_hash = sha256_for_bytes(&bytes);
1102    let value = json!({
1103        "schema": "vela.regulatory-source-record.v1",
1104        "entry_id": entry.id,
1105        "source_id": source_id,
1106        "name": entry.name,
1107        "source_type": entry.source_type,
1108        "representation": entry.representation,
1109        "locator": entry.locator,
1110        "content_type": content_type,
1111        "content_length": bytes.len(),
1112        "content_hash": content_hash,
1113    });
1114    Ok(SourceFetch {
1115        value,
1116        content_hash,
1117        source_url: entry.locator.clone(),
1118        api_url: entry.locator.clone(),
1119    })
1120}
1121
/// Returns the first existing fixture file for `source_id` inside `dir`.
///
/// Extensions are tried in priority order: `json`, `txt`, `html`, `pdf`.
/// Returns `None` when none of the candidates exist.
fn fixture_path_for_source(dir: &Path, source_id: &str) -> Option<PathBuf> {
    const EXTENSIONS: [&str; 4] = ["json", "txt", "html", "pdf"];
    for extension in EXTENSIONS {
        let candidate = dir.join(format!("{source_id}.{extension}"));
        if candidate.exists() {
            return Some(candidate);
        }
    }
    None
}
1128
/// Maps a fixture file's extension to a MIME type.
///
/// Only the lowercase extensions `json`, `html`, `pdf` and `txt` are
/// recognized. Anything else, including a missing or non-UTF-8 extension, maps
/// to `application/octet-stream`.
fn content_type_for_path(path: &Path) -> String {
    let extension = path
        .extension()
        .and_then(|extension| extension.to_str())
        .unwrap_or_default();
    let mime = match extension {
        "json" => "application/json",
        "html" => "text/html",
        "pdf" => "application/pdf",
        "txt" => "text/plain",
        _ => "application/octet-stream",
    };
    mime.to_string()
}
1138
/// Extracts the ClinicalTrials.gov NCT identifier from the last path segment
/// of `locator` (e.g. `https://clinicaltrials.gov/study/NCT04437511`).
///
/// Query strings, fragments and trailing slashes are ignored, so locators such
/// as `.../NCT04437511?tab=history` or `.../NCT04437511/` still resolve. The
/// segment must be `NCT` followed by one or more ASCII digits. Anything else is
/// rejected, because the id is interpolated verbatim into API URLs and fixture
/// file names.
///
/// # Errors
/// Returns a message naming the locator when no NCT id can be extracted.
fn nct_id_from_locator(locator: &str) -> Result<String, String> {
    // `split` always yields at least one item; the fallback is purely defensive.
    let without_query = locator.split(['?', '#']).next().unwrap_or(locator);
    without_query
        .trim_end_matches('/')
        .rsplit('/')
        .next()
        .filter(|segment| {
            let digits = segment.strip_prefix("NCT").unwrap_or("");
            !digits.is_empty() && digits.bytes().all(|byte| byte.is_ascii_digit())
        })
        .map(str::to_string)
        .ok_or_else(|| format!("ClinicalTrials.gov locator does not end in an NCT id: {locator}"))
}
1147
1148fn source_id_for_entry(entry: &SourceIngestEntry, spec: &AdapterSpec) -> Result<String, String> {
1149    match spec.kind {
1150        AdapterKind::ClinicalTrialsGovV2 => nct_id_from_locator(&entry.locator),
1151        AdapterKind::RegulatoryDocumentsV1 => Ok(entry.id.clone()),
1152    }
1153}
1154
1155fn artifact_kind_for_entry(entry: &SourceIngestEntry) -> String {
1156    if valid_artifact_kind(&entry.representation) {
1157        entry.representation.clone()
1158    } else if valid_artifact_kind(&entry.source_type) {
1159        entry.source_type.clone()
1160    } else {
1161        "registry_record".to_string()
1162    }
1163}
1164
1165fn run_manifest(
1166    options: &SourceAdapterRunOptions,
1167    run_id: &str,
1168    selected_entries: usize,
1169    skipped_excluded: usize,
1170    records: &[ClinicalTrialsRecordReport],
1171    failures: &[SourceAdapterFailure],
1172) -> Value {
1173    json!({
1174        "schema": "vela.source-adapter-run.v1",
1175        "run_id": run_id,
1176        "adapter": options.adapter,
1177        "actor": options.actor,
1178        "created_at": Utc::now().to_rfc3339(),
1179        "selected_entries": selected_entries,
1180        "skipped_excluded": skipped_excluded,
1181        "records": records,
1182        "failed_records": failures,
1183        "policy": {
1184            "apply_artifacts": options.apply_artifacts,
1185            "allow_partial": options.allow_partial,
1186            "include_excluded": options.include_excluded,
1187            "priority": options.priority,
1188            "entries": options.entries,
1189        }
1190    })
1191}
1192
1193fn artifact_packet(
1194    options: &SourceAdapterRunOptions,
1195    packet_id: &str,
1196    run_id: &str,
1197    manifest_hash: &str,
1198    records: &[ClinicalTrialsRecordReport],
1199    fetched: &[FetchedRecord],
1200    spec: &AdapterSpec,
1201) -> Result<ArtifactPacket, String> {
1202    let created_at = Utc::now().to_rfc3339();
1203    let target_findings = records
1204        .iter()
1205        .flat_map(|record| record.target_findings.clone())
1206        .collect::<std::collections::BTreeSet<_>>()
1207        .into_iter()
1208        .collect::<Vec<_>>();
1209    let mut artifacts = vec![PacketArtifact {
1210        id: format!("{run_id}_manifest"),
1211        kind: "source_file".to_string(),
1212        title: format!("{} {run_id}", spec.manifest_title),
1213        locator: format!(
1214            "https://github.com/vela-science/vela/tree/main/projects/anti-amyloid-translation/ingest/runs/{run_id}/run.json"
1215        ),
1216        content_hash: manifest_hash.to_string(),
1217        parents: Vec::new(),
1218        metadata: BTreeMap::from([
1219            ("adapter".to_string(), json!(options.adapter)),
1220            ("run_id".to_string(), json!(run_id)),
1221            ("records".to_string(), json!(records)),
1222            ("target_findings".to_string(), json!(target_findings)),
1223        ]),
1224    }];
1225
1226    for record in fetched.iter().filter(|record| record.changed) {
1227        artifacts.push(PacketArtifact {
1228            id: format!(
1229                "{}_{}",
1230                safe_id_fragment(spec.id),
1231                safe_id_fragment(&record.source_id)
1232            ),
1233            kind: record.artifact_kind.clone(),
1234            title: source_record_title(spec, record),
1235            locator: record.source_url.clone(),
1236            content_hash: record.content_hash.clone(),
1237            parents: vec![format!("{run_id}_manifest")],
1238            metadata: BTreeMap::from([
1239                ("adapter".to_string(), json!(options.adapter)),
1240                ("run_id".to_string(), json!(run_id)),
1241                ("entry_id".to_string(), json!(record.entry.id)),
1242                ("source_id".to_string(), json!(record.source_id)),
1243                ("source_label".to_string(), json!(record.source_label)),
1244                ("artifact_kind".to_string(), json!(record.artifact_kind)),
1245                ("nct_id".to_string(), json!(record.nct_id)),
1246                ("api_url".to_string(), json!(record.api_url)),
1247                ("source_url".to_string(), json!(record.source_url)),
1248                (
1249                    "old_artifact_id".to_string(),
1250                    json!(record.entry.current_frontier_artifact_id),
1251                ),
1252                (
1253                    "old_content_hash".to_string(),
1254                    json!(record.old_content_hash),
1255                ),
1256                ("new_content_hash".to_string(), json!(record.content_hash)),
1257                (
1258                    "target_findings".to_string(),
1259                    json!(record.entry.target_findings),
1260                ),
1261                ("retrieved_at".to_string(), json!(created_at)),
1262            ]),
1263        });
1264    }
1265
1266    Ok(ArtifactPacket {
1267        schema: ARTIFACT_PACKET_SCHEMA.to_string(),
1268        packet_id: packet_id.to_string(),
1269        producer: PacketProducer {
1270            kind: "source_adapter".to_string(),
1271            id: format!("adapter:{}", options.adapter),
1272            name: spec.producer_name.to_string(),
1273        },
1274        topic: "Anti-amyloid translation in Alzheimer's disease".to_string(),
1275        created_at,
1276        artifacts,
1277        candidate_claims: Vec::new(),
1278        open_needs: Vec::new(),
1279        caveats: vec![
1280            spec.caveat.to_string(),
1281            "Truth-changing frontier updates require reviewer acceptance.".to_string(),
1282        ],
1283    })
1284}
1285
1286fn source_record_title(spec: &AdapterSpec, record: &FetchedRecord) -> String {
1287    match spec.kind {
1288        AdapterKind::ClinicalTrialsGovV2 => record
1289            .value
1290            .pointer("/protocolSection/identificationModule/briefTitle")
1291            .and_then(Value::as_str)
1292            .or_else(|| {
1293                record
1294                    .value
1295                    .pointer("/protocolSection/identificationModule/officialTitle")
1296                    .and_then(Value::as_str)
1297            })
1298            .map_or_else(
1299                || format!("ClinicalTrials.gov {}", record.source_id),
1300                |title| format!("ClinicalTrials.gov {} · {title}", record.source_id),
1301            ),
1302        AdapterKind::RegulatoryDocumentsV1 => {
1303            format!("{} · {}", record.entry.name, record.source_id)
1304        }
1305    }
1306}
1307
/// Makes `value` safe to embed in an artifact id.
///
/// Every character that is not an ASCII letter, ASCII digit or `_` is replaced
/// by `_`. The result has the same number of characters as the input.
fn safe_id_fragment(value: &str) -> String {
    let mut fragment = String::with_capacity(value.len());
    for ch in value.chars() {
        let keep = ch.is_ascii_alphanumeric() || ch == '_';
        fragment.push(if keep { ch } else { '_' });
    }
    fragment
}
1320
1321fn sha256_for_bytes(bytes: &[u8]) -> String {
1322    format!("sha256:{}", hex::encode(Sha256::digest(bytes)))
1323}
1324
1325fn run_id(adapter: &str, fetched: &[FetchedRecord], failures: &[SourceAdapterFailure]) -> String {
1326    let preimage = json!({
1327        "adapter": adapter,
1328        "created_at": Utc::now().to_rfc3339(),
1329        "records": fetched.iter().map(|record| json!({
1330            "entry_id": record.entry.id,
1331            "source_id": record.source_id,
1332            "nct_id": record.nct_id,
1333            "content_hash": record.content_hash,
1334        })).collect::<Vec<_>>(),
1335        "failures": failures,
1336    });
1337    let bytes = canonical::to_canonical_bytes(&preimage).unwrap_or_else(|_| Vec::new());
1338    format!("sir_{}", &hex::encode(Sha256::digest(&bytes))[..16])
1339}
1340
1341fn packet_id(adapter: &str, run_id: &str, manifest_hash: &str) -> String {
1342    let preimage = format!("{adapter}|{run_id}|{manifest_hash}");
1343    format!(
1344        "cap_{}",
1345        &hex::encode(Sha256::digest(preimage.as_bytes()))[..16]
1346    )
1347}
1348
/// Returns the compiler version string exported as
/// `project::VELA_COMPILER_VERSION`.
///
/// NOTE(review): nothing in this part of the file calls this function. It
/// appears to exist only so the `project` import stays referenced. The leading
/// underscore already silences the unused-function lint, so the `allow` below
/// may be redundant. Confirm and consider removing both if `project` is
/// otherwise unused.
#[allow(dead_code)]
fn _compiler_version() -> &'static str {
    project::VELA_COMPILER_VERSION
}