Skip to main content

dag_ml_core/
provenance.rs

1use std::collections::{BTreeMap, BTreeSet};
2
3use serde::{de::DeserializeOwned, Deserialize, Serialize};
4use serde_json::{json, Value};
5use sha2::{Digest, Sha256};
6
7use crate::bundle::ExecutionBundle;
8use crate::data::ExternalDataPlanEnvelope;
9use crate::error::{DagMlError, Result};
10use crate::ids::{ArtifactId, LineageId};
11use crate::plan::ExecutionPlan;
12use crate::runtime::{
13    FileArtifactManifest, FilePredictionCacheManifest, LineageRecord, FILE_ARTIFACT_MANIFEST_FILE,
14    FILE_PREDICTION_CACHE_MANIFEST_FILE,
15};
16
/// Schema version written into provenance exports, packages and validation reports.
pub const RESEARCH_PROVENANCE_SCHEMA_VERSION: u32 = 1;
/// Package-relative file name under which the execution plan is stored.
pub const EXECUTION_PLAN_FILE: &str = "execution_plan.json";
/// Package-relative file name under which the execution bundle is stored.
pub const EXECUTION_BUNDLE_FILE: &str = "execution_bundle.json";
/// Package-relative file name under which the lineage records are stored.
pub const LINEAGE_RECORDS_FILE: &str = "lineage_records.json";
/// Package-relative file name of the PROV JSON-LD document.
pub const PROV_JSONLD_FILE: &str = "lineage.prov.jsonld";
/// Package-relative file name of the RO-Crate metadata descriptor.
pub const RO_CRATE_METADATA_FILE: &str = "ro-crate-metadata.json";
/// Value emitted as `schemaURL` on OpenLineage run events.
pub const OPENLINEAGE_RUN_EVENT_SCHEMA_URL: &str =
    "https://openlineage.io/spec/1-0-0/OpenLineage.json#/definitions/RunEvent";
/// Schema URL for the DAG-ML specific OpenLineage facets.
///
/// NOTE(review): presumably consumed by the `dagml_openlineage_*` facet builders; it is not
/// referenced in this part of the file — confirm against those helpers.
pub const DAGML_OPENLINEAGE_FACET_SCHEMA_URL: &str =
    "https://github.com/GBeurier/dag-ml/schemas/openlineage_dagml_facets.v1.schema.json";
27
28#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
29pub struct ResearchProvenanceExport {
30    pub schema_version: u32,
31    pub prov_jsonld: Value,
32    pub ro_crate_metadata: Value,
33}
34
35#[derive(Clone, Debug, Eq, PartialEq)]
36pub struct ResearchProvenancePackage {
37    pub schema_version: u32,
38    pub files: BTreeMap<String, ResearchProvenancePackageFile>,
39}
40
/// One file of a provenance package together with its integrity metadata.
///
/// NOTE(review): the field meanings below are inferred from their names; the code that
/// populates them is not visible in this part of the file — confirm there.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ResearchProvenancePackageFile {
    /// Presumably the package-relative path of the file.
    pub path: String,
    /// Presumably the SHA-256 digest of `bytes` in textual form.
    pub sha256: String,
    /// Presumably the length of `bytes`.
    pub size_bytes: usize,
    /// Raw file content.
    pub bytes: Vec<u8>,
}
48
49#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
50pub struct ResearchProvenancePackageValidation {
51    pub schema_version: u32,
52    pub plan_id: String,
53    pub bundle_id: String,
54    pub file_count: usize,
55    pub checksummed_file_count: usize,
56    pub lineage_record_count: usize,
57    pub data_envelope_count: usize,
58    pub has_prediction_cache_manifest: bool,
59    pub has_artifact_manifest: bool,
60}
61
/// Caller-supplied values for an OpenLineage run event.
///
/// Both fields are validated by `build_openlineage_run_event` before the event is built.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OpenLineageRunEventOptions {
    /// Emitted as the job `namespace` of the event.
    pub namespace: String,
    /// Emitted verbatim as the `eventTime` of the event.
    pub event_time: String,
}
67
68pub fn build_research_provenance_export(
69    plan: &ExecutionPlan,
70    bundle: &ExecutionBundle,
71    lineage: &[LineageRecord],
72    data_envelopes: &BTreeMap<String, ExternalDataPlanEnvelope>,
73    prediction_cache_manifest: Option<&FilePredictionCacheManifest>,
74    artifact_manifest: Option<&FileArtifactManifest>,
75) -> Result<ResearchProvenanceExport> {
76    validate_provenance_inputs(
77        plan,
78        bundle,
79        lineage,
80        data_envelopes,
81        prediction_cache_manifest,
82        artifact_manifest,
83    )?;
84
85    Ok(ResearchProvenanceExport {
86        schema_version: RESEARCH_PROVENANCE_SCHEMA_VERSION,
87        prov_jsonld: build_prov_jsonld(
88            plan,
89            bundle,
90            lineage,
91            data_envelopes,
92            prediction_cache_manifest,
93            artifact_manifest,
94        )?,
95        ro_crate_metadata: build_ro_crate_metadata(
96            plan,
97            bundle,
98            data_envelopes,
99            prediction_cache_manifest,
100            artifact_manifest,
101        )?,
102    })
103}
104
105pub fn build_research_provenance_package(
106    plan: &ExecutionPlan,
107    bundle: &ExecutionBundle,
108    lineage: &[LineageRecord],
109    data_envelopes: &BTreeMap<String, ExternalDataPlanEnvelope>,
110    prediction_cache_manifest: Option<&FilePredictionCacheManifest>,
111    artifact_manifest: Option<&FileArtifactManifest>,
112) -> Result<ResearchProvenancePackage> {
113    let export = build_research_provenance_export(
114        plan,
115        bundle,
116        lineage,
117        data_envelopes,
118        prediction_cache_manifest,
119        artifact_manifest,
120    )?;
121    let mut files = BTreeMap::new();
122    add_json_package_file(&mut files, EXECUTION_PLAN_FILE, plan, "execution plan")?;
123    add_json_package_file(
124        &mut files,
125        EXECUTION_BUNDLE_FILE,
126        bundle,
127        "execution bundle",
128    )?;
129    add_json_package_file(
130        &mut files,
131        LINEAGE_RECORDS_FILE,
132        &lineage,
133        "lineage records",
134    )?;
135    add_json_package_file(
136        &mut files,
137        PROV_JSONLD_FILE,
138        &export.prov_jsonld,
139        "PROV JSON-LD",
140    )?;
141    if let Some(manifest) = prediction_cache_manifest {
142        add_json_package_file(
143            &mut files,
144            FILE_PREDICTION_CACHE_MANIFEST_FILE,
145            manifest,
146            "prediction cache manifest",
147        )?;
148    }
149    if let Some(manifest) = artifact_manifest {
150        add_json_package_file(
151            &mut files,
152            FILE_ARTIFACT_MANIFEST_FILE,
153            manifest,
154            "artifact manifest",
155        )?;
156    }
157    for (key, envelope) in data_envelopes {
158        add_json_package_file(
159            &mut files,
160            &data_envelope_file_path(key)?,
161            envelope,
162            "data envelope",
163        )?;
164    }
165
166    let mut ro_crate_metadata = export.ro_crate_metadata;
167    annotate_ro_crate_package_files(&mut ro_crate_metadata, &files)?;
168    add_json_package_file(
169        &mut files,
170        RO_CRATE_METADATA_FILE,
171        &ro_crate_metadata,
172        "RO-Crate metadata",
173    )?;
174
175    Ok(ResearchProvenancePackage {
176        schema_version: RESEARCH_PROVENANCE_SCHEMA_VERSION,
177        files,
178    })
179}
180
181pub fn validate_research_provenance_package_files(
182    files: &BTreeMap<String, Vec<u8>>,
183) -> Result<ResearchProvenancePackageValidation> {
184    if files.is_empty() {
185        return Err(DagMlError::RuntimeValidation(
186            "research provenance package has no files".to_string(),
187        ));
188    }
189    for path in files.keys() {
190        validate_package_path(path)?;
191    }
192    require_package_file(files, EXECUTION_PLAN_FILE)?;
193    require_package_file(files, EXECUTION_BUNDLE_FILE)?;
194    require_package_file(files, LINEAGE_RECORDS_FILE)?;
195    require_package_file(files, PROV_JSONLD_FILE)?;
196    let ro_crate_metadata: Value = parse_package_json(
197        require_package_file(files, RO_CRATE_METADATA_FILE)?,
198        RO_CRATE_METADATA_FILE,
199    )?;
200
201    let checksummed_file_count = validate_ro_crate_package_checksums(&ro_crate_metadata, files)?;
202    validate_prov_jsonld_root(parse_package_json(
203        require_package_file(files, PROV_JSONLD_FILE)?,
204        PROV_JSONLD_FILE,
205    )?)?;
206
207    let plan: ExecutionPlan = parse_package_json(
208        require_package_file(files, EXECUTION_PLAN_FILE)?,
209        EXECUTION_PLAN_FILE,
210    )?;
211    let bundle: ExecutionBundle = parse_package_json(
212        require_package_file(files, EXECUTION_BUNDLE_FILE)?,
213        EXECUTION_BUNDLE_FILE,
214    )?;
215    let lineage: Vec<LineageRecord> = parse_package_json(
216        require_package_file(files, LINEAGE_RECORDS_FILE)?,
217        LINEAGE_RECORDS_FILE,
218    )?;
219    let data_envelopes = parse_package_data_envelopes(files)?;
220    let prediction_cache_manifest: Option<FilePredictionCacheManifest> = files
221        .get(FILE_PREDICTION_CACHE_MANIFEST_FILE)
222        .map(|bytes| parse_package_json(bytes, FILE_PREDICTION_CACHE_MANIFEST_FILE))
223        .transpose()?;
224    let artifact_manifest: Option<FileArtifactManifest> = files
225        .get(FILE_ARTIFACT_MANIFEST_FILE)
226        .map(|bytes| parse_package_json(bytes, FILE_ARTIFACT_MANIFEST_FILE))
227        .transpose()?;
228
229    validate_provenance_inputs(
230        &plan,
231        &bundle,
232        &lineage,
233        &data_envelopes,
234        prediction_cache_manifest.as_ref(),
235        artifact_manifest.as_ref(),
236    )?;
237
238    Ok(ResearchProvenancePackageValidation {
239        schema_version: RESEARCH_PROVENANCE_SCHEMA_VERSION,
240        plan_id: plan.id.to_string(),
241        bundle_id: bundle.bundle_id.to_string(),
242        file_count: files.len(),
243        checksummed_file_count,
244        lineage_record_count: lineage.len(),
245        data_envelope_count: data_envelopes.len(),
246        has_prediction_cache_manifest: prediction_cache_manifest.is_some(),
247        has_artifact_manifest: artifact_manifest.is_some(),
248    })
249}
250
251pub fn build_openlineage_run_event_from_package_files(
252    files: &BTreeMap<String, Vec<u8>>,
253    namespace: &str,
254    event_time: &str,
255) -> Result<Value> {
256    validate_research_provenance_package_files(files)?;
257    let plan: ExecutionPlan = parse_package_json(
258        require_package_file(files, EXECUTION_PLAN_FILE)?,
259        EXECUTION_PLAN_FILE,
260    )?;
261    let bundle: ExecutionBundle = parse_package_json(
262        require_package_file(files, EXECUTION_BUNDLE_FILE)?,
263        EXECUTION_BUNDLE_FILE,
264    )?;
265    let lineage: Vec<LineageRecord> = parse_package_json(
266        require_package_file(files, LINEAGE_RECORDS_FILE)?,
267        LINEAGE_RECORDS_FILE,
268    )?;
269    let data_envelopes = parse_package_data_envelopes(files)?;
270    let prediction_cache_manifest: Option<FilePredictionCacheManifest> = files
271        .get(FILE_PREDICTION_CACHE_MANIFEST_FILE)
272        .map(|bytes| parse_package_json(bytes, FILE_PREDICTION_CACHE_MANIFEST_FILE))
273        .transpose()?;
274    let artifact_manifest: Option<FileArtifactManifest> = files
275        .get(FILE_ARTIFACT_MANIFEST_FILE)
276        .map(|bytes| parse_package_json(bytes, FILE_ARTIFACT_MANIFEST_FILE))
277        .transpose()?;
278    let options = OpenLineageRunEventOptions {
279        namespace: namespace.to_string(),
280        event_time: event_time.to_string(),
281    };
282
283    build_openlineage_run_event(
284        &plan,
285        &bundle,
286        &lineage,
287        &data_envelopes,
288        prediction_cache_manifest.as_ref(),
289        artifact_manifest.as_ref(),
290        &options,
291    )
292}
293
294pub fn build_openlineage_run_event(
295    plan: &ExecutionPlan,
296    bundle: &ExecutionBundle,
297    lineage: &[LineageRecord],
298    data_envelopes: &BTreeMap<String, ExternalDataPlanEnvelope>,
299    prediction_cache_manifest: Option<&FilePredictionCacheManifest>,
300    artifact_manifest: Option<&FileArtifactManifest>,
301    options: &OpenLineageRunEventOptions,
302) -> Result<Value> {
303    validate_provenance_inputs(
304        plan,
305        bundle,
306        lineage,
307        data_envelopes,
308        prediction_cache_manifest,
309        artifact_manifest,
310    )?;
311    validate_openlineage_namespace(options.namespace.as_str())?;
312    validate_openlineage_event_time(options.event_time.as_str())?;
313
314    Ok(json!({
315        "eventType": "COMPLETE",
316        "eventTime": options.event_time.as_str(),
317        "run": {
318            "runId": openlineage_run_id(plan, bundle),
319            "facets": {
320                "dagml_reproducibility": dagml_openlineage_reproducibility_run_facet(plan, bundle),
321                "dagml_oof_safety": dagml_openlineage_oof_safety_run_facet(bundle, lineage),
322            },
323        },
324        "job": {
325            "namespace": options.namespace.as_str(),
326            "name": format!("{}::{}", plan.id, bundle.bundle_id),
327            "facets": {
328                "dagml_plan": dagml_openlineage_plan_job_facet(plan, bundle),
329            },
330        },
331        "inputs": openlineage_input_datasets(bundle, data_envelopes),
332        "outputs": openlineage_output_datasets(bundle, prediction_cache_manifest, artifact_manifest),
333        "producer": "https://github.com/GBeurier/dag-ml",
334        "schemaURL": OPENLINEAGE_RUN_EVENT_SCHEMA_URL,
335    }))
336}
337
338fn validate_provenance_inputs(
339    plan: &ExecutionPlan,
340    bundle: &ExecutionBundle,
341    lineage: &[LineageRecord],
342    data_envelopes: &BTreeMap<String, ExternalDataPlanEnvelope>,
343    prediction_cache_manifest: Option<&FilePredictionCacheManifest>,
344    artifact_manifest: Option<&FileArtifactManifest>,
345) -> Result<()> {
346    plan.validate()?;
347    bundle.validate_against_plan(plan)?;
348    if !data_envelopes.is_empty() {
349        bundle.validate_replay_envelopes(data_envelopes)?;
350    }
351    if let Some(manifest) = prediction_cache_manifest {
352        manifest.validate_against_bundle(bundle)?;
353    }
354    if let Some(manifest) = artifact_manifest {
355        manifest.validate_against_bundle(bundle)?;
356    }
357
358    let mut lineage_ids = BTreeSet::<&LineageId>::new();
359    for record in lineage {
360        record.validate()?;
361        if !plan.node_plans.contains_key(&record.node_id) {
362            return Err(DagMlError::RuntimeValidation(format!(
363                "provenance lineage `{}` references unknown node `{}`",
364                record.record_id, record.node_id
365            )));
366        }
367        if !plan
368            .controller_manifests
369            .contains_key(&record.controller_id)
370        {
371            return Err(DagMlError::RuntimeValidation(format!(
372                "provenance lineage `{}` references unknown controller `{}`",
373                record.record_id, record.controller_id
374            )));
375        }
376        if !lineage_ids.insert(&record.record_id) {
377            return Err(DagMlError::RuntimeValidation(format!(
378                "duplicate provenance lineage record `{}`",
379                record.record_id
380            )));
381        }
382    }
383    for record in lineage {
384        for input_id in &record.input_lineage {
385            if !lineage_ids.contains(input_id) {
386                return Err(DagMlError::RuntimeValidation(format!(
387                    "provenance lineage `{}` references missing input lineage `{}`",
388                    record.record_id, input_id
389                )));
390            }
391        }
392    }
393    Ok(())
394}
395
396fn build_prov_jsonld(
397    plan: &ExecutionPlan,
398    bundle: &ExecutionBundle,
399    lineage: &[LineageRecord],
400    data_envelopes: &BTreeMap<String, ExternalDataPlanEnvelope>,
401    prediction_cache_manifest: Option<&FilePredictionCacheManifest>,
402    artifact_manifest: Option<&FileArtifactManifest>,
403) -> Result<Value> {
404    let plan_entity_id = format!("dagml:execution-plan:{}", plan.id);
405    let bundle_entity_id = format!("dagml:execution-bundle:{}", bundle.bundle_id);
406    let packaging_activity_id = format!("dagml:activity:package-bundle:{}", bundle.bundle_id);
407    let coordinator_agent_id = "dagml:agent:dag-ml".to_string();
408
409    let mut entity = BTreeMap::<String, Value>::new();
410    entity.insert(
411        plan_entity_id.clone(),
412        json!({
413            "prov:type": ["prov:Entity", "dagml:ExecutionPlan"],
414            "dagml:plan_id": plan.id,
415            "dagml:graph_fingerprint": plan.graph_fingerprint,
416            "dagml:campaign_fingerprint": plan.campaign_fingerprint,
417            "dagml:controller_fingerprint": plan.controller_fingerprint,
418            "dagml:variant_count": plan.variants.len(),
419            "dagml:has_fold_set": plan.fold_set.is_some(),
420        }),
421    );
422    entity.insert(
423        bundle_entity_id.clone(),
424        json!({
425            "prov:type": ["prov:Entity", "dagml:ExecutionBundle"],
426            "dagml:bundle_id": bundle.bundle_id,
427            "dagml:schema_version": bundle.schema_version,
428            "dagml:plan_id": bundle.plan_id,
429            "dagml:selected_variant_id": bundle.selected_variant_id,
430            "dagml:graph_fingerprint": bundle.graph_fingerprint,
431            "dagml:campaign_fingerprint": bundle.campaign_fingerprint,
432            "dagml:controller_fingerprint": bundle.controller_fingerprint,
433            "dagml:unsafe_flags": bundle.unsafe_flags,
434            "dagml:selection_count": bundle.selections.len(),
435        }),
436    );
437
438    for requirement in &bundle.data_requirements {
439        let key = requirement.key();
440        entity.insert(
441            data_requirement_entity_id(&key),
442            json!({
443                "prov:type": ["prov:Entity", "dagml:DataRequirement"],
444                "dagml:requirement_key": key,
445                "dagml:node_id": requirement.node_id,
446                "dagml:input_name": requirement.input_name,
447                "dagml:schema_fingerprint": requirement.schema_fingerprint,
448                "dagml:plan_fingerprint": requirement.plan_fingerprint,
449                "dagml:relation_fingerprint": requirement.relation_fingerprint,
450                "dagml:feature_set_id": requirement.feature_set_id,
451            }),
452        );
453    }
454    for (key, envelope) in data_envelopes {
455        entity.insert(
456            data_envelope_entity_id(key),
457            json!({
458                "prov:type": ["prov:Entity", "dagml:ExternalDataPlanEnvelope"],
459                "dagml:envelope_key": key,
460                "dagml:schema_version": envelope.schema_version,
461                "dagml:schema_fingerprint": envelope.schema_fingerprint,
462                "dagml:plan_fingerprint": envelope.plan_fingerprint,
463                "dagml:relation_fingerprint": envelope.relation_fingerprint,
464            }),
465        );
466    }
467    for requirement in &bundle.prediction_requirements {
468        let key = requirement.key();
469        entity.insert(
470            prediction_requirement_entity_id(&key),
471            json!({
472                "prov:type": ["prov:Entity", "dagml:PredictionRequirement"],
473                "dagml:requirement_key": key,
474                "dagml:producer_node": requirement.producer_node,
475                "dagml:consumer_node": requirement.consumer_node,
476                "dagml:prediction_level": requirement.prediction_level,
477                "dagml:fold_ids": requirement.fold_ids,
478                "dagml:unit_ids": requirement.unit_ids,
479                "dagml:sample_ids": requirement.sample_ids,
480                "dagml:prediction_width": requirement.prediction_width,
481                "dagml:target_names": requirement.target_names,
482            }),
483        );
484    }
485    for cache in &bundle.prediction_caches {
486        entity.insert(
487            prediction_cache_entity_id(&cache.cache_id),
488            json!({
489                "prov:type": ["prov:Entity", "dagml:PredictionCache"],
490                "dagml:requirement_key": cache.requirement_key,
491                "dagml:cache_id": cache.cache_id,
492                "dagml:format": cache.format,
493                "dagml:prediction_level": cache.prediction_level,
494                "dagml:unit_ids": cache.unit_ids,
495                "dagml:block_count": cache.block_count,
496                "dagml:row_count": cache.row_count,
497                "dagml:content_fingerprint": cache.content_fingerprint,
498            }),
499        );
500    }
501    if let Some(manifest) = prediction_cache_manifest {
502        entity.insert(
503            "dagml:file:prediction-cache-manifest".to_string(),
504            json!({
505                "prov:type": ["prov:Entity", "dagml:PredictionCacheManifest"],
506                "dagml:file": FILE_PREDICTION_CACHE_MANIFEST_FILE,
507                "dagml:schema_version": manifest.schema_version,
508                "dagml:cache_count": manifest.caches.len(),
509            }),
510        );
511    }
512    for record in &bundle.refit_artifacts {
513        entity.insert(
514            artifact_entity_id(&record.artifact.id),
515            json!({
516                "prov:type": ["prov:Entity", "dagml:ModelArtifact"],
517                "dagml:artifact_id": record.artifact.id,
518                "dagml:kind": record.artifact.kind,
519                "dagml:node_id": record.node_id,
520                "dagml:controller_id": record.controller_id,
521                "dagml:backend": record.artifact.backend,
522                "dagml:uri": record.artifact.uri,
523                "dagml:content_fingerprint": record.artifact.content_fingerprint,
524                "dagml:size_bytes": record.artifact.size_bytes,
525                "dagml:plugin": record.artifact.plugin,
526                "dagml:plugin_version": record.artifact.plugin_version,
527                "dagml:params_fingerprint": record.params_fingerprint,
528                "dagml:data_requirement_keys": record.data_requirement_keys,
529                "dagml:prediction_requirement_keys": record.prediction_requirement_keys,
530            }),
531        );
532    }
533    if let Some(manifest) = artifact_manifest {
534        entity.insert(
535            "dagml:file:artifact-manifest".to_string(),
536            json!({
537                "prov:type": ["prov:Entity", "dagml:ArtifactManifest"],
538                "dagml:file": FILE_ARTIFACT_MANIFEST_FILE,
539                "dagml:schema_version": manifest.schema_version,
540                "dagml:artifact_count": manifest.artifacts.len(),
541            }),
542        );
543    }
544    for record in lineage {
545        entity.insert(
546            lineage_record_entity_id(&record.record_id),
547            json!({
548                "prov:type": ["prov:Entity", "dagml:LineageRecord"],
549                "dagml:lineage_id": record.record_id,
550                "dagml:run_id": record.run_id,
551                "dagml:node_id": record.node_id,
552                "dagml:phase": record.phase,
553                "dagml:controller_id": record.controller_id,
554                "dagml:variant_id": record.variant_id,
555                "dagml:fold_id": record.fold_id,
556                "dagml:branch_path": record.branch_path,
557                "dagml:input_lineage": record.input_lineage,
558                "dagml:artifact_refs": record
559                    .artifact_refs
560                    .iter()
561                    .map(|artifact| artifact.id.clone())
562                    .collect::<Vec<_>>(),
563            }),
564        );
565    }
566
567    let mut agent = BTreeMap::<String, Value>::new();
568    agent.insert(
569        coordinator_agent_id.clone(),
570        json!({
571            "prov:type": ["prov:Agent", "dagml:Coordinator"],
572            "dagml:name": "dag-ml",
573            "dagml:provenance_schema_version": RESEARCH_PROVENANCE_SCHEMA_VERSION,
574        }),
575    );
576    for manifest in plan.controller_manifests.values() {
577        agent.insert(
578            controller_agent_id(manifest.controller_id.as_str()),
579            json!({
580                "prov:type": ["prov:Agent", "dagml:Controller"],
581                "dagml:controller_id": manifest.controller_id,
582                "dagml:controller_version": manifest.controller_version,
583                "dagml:operator_kind": manifest.operator_kind,
584                "dagml:fit_scope": manifest.fit_scope,
585                "dagml:rng_policy": manifest.rng_policy,
586                "dagml:artifact_policy": manifest.artifact_policy,
587                "dagml:capabilities": manifest.capabilities,
588            }),
589        );
590    }
591
592    let mut activity = BTreeMap::<String, Value>::new();
593    activity.insert(
594        packaging_activity_id.clone(),
595        json!({
596            "prov:type": ["prov:Activity", "dagml:BundlePackaging"],
597            "dagml:bundle_id": bundle.bundle_id,
598            "dagml:plan_id": bundle.plan_id,
599            "dagml:selected_variant_id": bundle.selected_variant_id,
600        }),
601    );
602    for record in lineage {
603        activity.insert(
604            lineage_activity_id(record),
605            json!({
606                "prov:type": ["prov:Activity", "dagml:NodeExecution"],
607                "dagml:lineage_id": record.record_id,
608                "dagml:run_id": record.run_id,
609                "dagml:node_id": record.node_id,
610                "dagml:phase": record.phase,
611                "dagml:controller_id": record.controller_id,
612                "dagml:controller_version": record.controller_version,
613                "dagml:variant_id": record.variant_id,
614                "dagml:fold_id": record.fold_id,
615                "dagml:branch_path": record.branch_path,
616                "dagml:params_fingerprint": record.params_fingerprint,
617                "dagml:data_model_shape_fingerprint": record.data_model_shape_fingerprint,
618                "dagml:aggregation_policy_fingerprint": record.aggregation_policy_fingerprint,
619                "dagml:seed": record.seed,
620                "dagml:unsafe_flags": record.unsafe_flags,
621                "dagml:metrics": record.metrics,
622            }),
623        );
624    }
625
626    let mut used = BTreeMap::<String, Value>::new();
627    used.insert(
628        "dagml:used:bundle-plan".to_string(),
629        json!({
630            "prov:activity": packaging_activity_id,
631            "prov:entity": plan_entity_id,
632        }),
633    );
634    for record in lineage {
635        for input_id in &record.input_lineage {
636            used.insert(
637                format!("dagml:used:{}:{}", record.record_id, input_id),
638                json!({
639                    "prov:activity": lineage_activity_id(record),
640                    "prov:entity": lineage_record_entity_id(input_id),
641                    "dagml:input_lineage_id": input_id,
642                }),
643            );
644        }
645    }
646
647    let lineage_by_artifact = lineage_artifact_index(lineage);
648    let mut was_generated_by = BTreeMap::<String, Value>::new();
649    was_generated_by.insert(
650        "dagml:generated:bundle".to_string(),
651        json!({
652            "prov:entity": bundle_entity_id,
653            "prov:activity": packaging_activity_id,
654        }),
655    );
656    for record in lineage {
657        was_generated_by.insert(
658            format!("dagml:generated:lineage:{}", record.record_id),
659            json!({
660                "prov:entity": lineage_record_entity_id(&record.record_id),
661                "prov:activity": lineage_activity_id(record),
662            }),
663        );
664    }
665    for record in &bundle.refit_artifacts {
666        let activity_id = lineage_by_artifact
667            .get(&record.artifact.id)
668            .cloned()
669            .unwrap_or_else(|| packaging_activity_id.clone());
670        was_generated_by.insert(
671            format!("dagml:generated:artifact:{}", record.artifact.id),
672            json!({
673                "prov:entity": artifact_entity_id(&record.artifact.id),
674                "prov:activity": activity_id,
675            }),
676        );
677    }
678
679    let mut was_derived_from = BTreeMap::<String, Value>::new();
680    was_derived_from.insert(
681        "dagml:derived:bundle-plan".to_string(),
682        json!({
683            "prov:generatedEntity": bundle_entity_id,
684            "prov:usedEntity": plan_entity_id,
685        }),
686    );
687    for record in &bundle.refit_artifacts {
688        for key in &record.data_requirement_keys {
689            was_derived_from.insert(
690                format!("dagml:derived:{}:data:{key}", record.artifact.id),
691                json!({
692                    "prov:generatedEntity": artifact_entity_id(&record.artifact.id),
693                    "prov:usedEntity": data_requirement_entity_id(key),
694                    "dagml:refit_dependency": "data_requirement",
695                }),
696            );
697        }
698        for key in &record.prediction_requirement_keys {
699            was_derived_from.insert(
700                format!("dagml:derived:{}:prediction:{key}", record.artifact.id),
701                json!({
702                    "prov:generatedEntity": artifact_entity_id(&record.artifact.id),
703                    "prov:usedEntity": prediction_requirement_entity_id(key),
704                    "dagml:refit_dependency": "prediction_requirement",
705                    "dagml:oof_dependency": true,
706                }),
707            );
708        }
709    }
710    for cache in &bundle.prediction_caches {
711        was_derived_from.insert(
712            format!("dagml:derived:cache:{}", cache.cache_id),
713            json!({
714                "prov:generatedEntity": prediction_cache_entity_id(&cache.cache_id),
715                "prov:usedEntity": prediction_requirement_entity_id(&cache.requirement_key),
716            }),
717        );
718    }
719    for record in lineage {
720        for input_id in &record.input_lineage {
721            was_derived_from.insert(
722                format!("dagml:derived:lineage:{}:{input_id}", record.record_id),
723                json!({
724                    "prov:generatedEntity": lineage_record_entity_id(&record.record_id),
725                    "prov:usedEntity": lineage_record_entity_id(input_id),
726                    "dagml:lineage_dependency": true,
727                }),
728            );
729        }
730    }
731
732    let mut was_associated_with = BTreeMap::<String, Value>::new();
733    was_associated_with.insert(
734        "dagml:associated:bundle-packaging".to_string(),
735        json!({
736            "prov:activity": packaging_activity_id,
737            "prov:agent": coordinator_agent_id,
738        }),
739    );
740    for record in lineage {
741        was_associated_with.insert(
742            format!("dagml:associated:{}", record.record_id),
743            json!({
744                "prov:activity": lineage_activity_id(record),
745                "prov:agent": controller_agent_id(record.controller_id.as_str()),
746            }),
747        );
748    }
749
750    Ok(json!({
751        "@context": {
752            "prov": "http://www.w3.org/ns/prov#",
753            "dagml": "https://dag-ml.dev/ns#",
754        },
755        "entity": entity,
756        "activity": activity,
757        "agent": agent,
758        "used": used,
759        "wasGeneratedBy": was_generated_by,
760        "wasDerivedFrom": was_derived_from,
761        "wasAssociatedWith": was_associated_with,
762    }))
763}
764
765fn build_ro_crate_metadata(
766    plan: &ExecutionPlan,
767    bundle: &ExecutionBundle,
768    data_envelopes: &BTreeMap<String, ExternalDataPlanEnvelope>,
769    prediction_cache_manifest: Option<&FilePredictionCacheManifest>,
770    artifact_manifest: Option<&FileArtifactManifest>,
771) -> Result<Value> {
772    let mut has_part = vec![
773        json!({"@id": "execution_plan.json"}),
774        json!({"@id": "execution_bundle.json"}),
775        json!({"@id": PROV_JSONLD_FILE}),
776    ];
777    let mut graph = vec![
778        json!({
779            "@id": RO_CRATE_METADATA_FILE,
780            "@type": "CreativeWork",
781            "about": {"@id": "./"},
782            "conformsTo": {"@id": "https://w3id.org/ro/crate/1.1"},
783        }),
784        json!({
785            "@id": "./",
786            "@type": "Dataset",
787            "name": format!("DAG-ML research bundle {}", bundle.bundle_id),
788            "mainEntity": {"@id": "#workflow"},
789            "hasPart": has_part.clone(),
790            "dagml:schema_version": RESEARCH_PROVENANCE_SCHEMA_VERSION,
791            "dagml:bundle_id": bundle.bundle_id,
792            "dagml:plan_id": plan.id,
793            "dagml:unsafe_flags": bundle.unsafe_flags,
794        }),
795        json!({
796            "@id": "#workflow",
797            "@type": ["ComputationalWorkflow", "SoftwareSourceCode"],
798            "name": "DAG-ML compiled workflow",
799            "programmingLanguage": "Rust",
800            "dagml:plan_id": plan.id,
801            "dagml:graph_fingerprint": plan.graph_fingerprint,
802            "dagml:campaign_fingerprint": plan.campaign_fingerprint,
803            "dagml:controller_fingerprint": plan.controller_fingerprint,
804            "dagml:selected_variant_id": bundle.selected_variant_id,
805            "dagml:variant_count": plan.variants.len(),
806        }),
807        file_entity(
808            "execution_plan.json",
809            "DAG-ML execution plan",
810            "dagml:ExecutionPlan",
811        ),
812        file_entity(
813            "execution_bundle.json",
814            "DAG-ML execution bundle",
815            "dagml:ExecutionBundle",
816        ),
817        file_entity(PROV_JSONLD_FILE, "DAG-ML W3C PROV export", "prov:Bundle"),
818    ];
819
820    if prediction_cache_manifest.is_some() {
821        has_part.push(json!({"@id": FILE_PREDICTION_CACHE_MANIFEST_FILE}));
822        graph.push(file_entity(
823            FILE_PREDICTION_CACHE_MANIFEST_FILE,
824            "DAG-ML prediction cache manifest",
825            "dagml:PredictionCacheManifest",
826        ));
827    }
828    if artifact_manifest.is_some() {
829        has_part.push(json!({"@id": FILE_ARTIFACT_MANIFEST_FILE}));
830        graph.push(file_entity(
831            FILE_ARTIFACT_MANIFEST_FILE,
832            "DAG-ML artifact manifest",
833            "dagml:ArtifactManifest",
834        ));
835    }
836    for (key, envelope) in data_envelopes {
837        let id = format!("data_envelopes/{key}.json");
838        has_part.push(json!({"@id": id}));
839        graph.push(json!({
840            "@id": id,
841            "@type": ["File", "dagml:ExternalDataPlanEnvelope"],
842            "name": format!("DAG-ML data envelope {key}"),
843            "dagml:envelope_key": key,
844            "dagml:schema_version": envelope.schema_version,
845            "dagml:schema_fingerprint": envelope.schema_fingerprint,
846            "dagml:plan_fingerprint": envelope.plan_fingerprint,
847            "dagml:relation_fingerprint": envelope.relation_fingerprint,
848        }));
849    }
850
851    graph[1]["hasPart"] = Value::Array(has_part);
852
853    for manifest in plan.controller_manifests.values() {
854        graph.push(json!({
855            "@id": controller_agent_id(manifest.controller_id.as_str()),
856            "@type": ["SoftwareApplication", "dagml:Controller"],
857            "name": manifest.controller_id,
858            "softwareVersion": manifest.controller_version,
859            "dagml:operator_kind": manifest.operator_kind,
860            "dagml:capabilities": manifest.capabilities,
861            "dagml:artifact_policy": manifest.artifact_policy,
862        }));
863    }
864    for artifact in &bundle.refit_artifacts {
865        graph.push(json!({
866            "@id": artifact_entity_id(&artifact.artifact.id),
867            "@type": ["File", "dagml:ModelArtifact"],
868            "name": artifact.artifact.id,
869            "encodingFormat": artifact.artifact.kind,
870            "dagml:node_id": artifact.node_id,
871            "dagml:controller_id": artifact.controller_id,
872            "dagml:backend": artifact.artifact.backend,
873            "dagml:uri": artifact.artifact.uri,
874            "dagml:content_fingerprint": artifact.artifact.content_fingerprint,
875            "dagml:plugin": artifact.artifact.plugin,
876            "dagml:plugin_version": artifact.artifact.plugin_version,
877            "dagml:refit_data_requirement_keys": artifact.data_requirement_keys,
878            "dagml:refit_prediction_requirement_keys": artifact.prediction_requirement_keys,
879        }));
880    }
881
882    Ok(json!({
883        "@context": [
884            "https://w3id.org/ro/crate/1.1/context",
885            {
886                "dagml": "https://dag-ml.dev/ns#",
887                "prov": "http://www.w3.org/ns/prov#",
888            }
889        ],
890        "@graph": graph,
891    }))
892}
893
894fn file_entity(id: &str, name: &str, dagml_type: &str) -> Value {
895    json!({
896        "@id": id,
897        "@type": ["File", dagml_type],
898        "name": name,
899    })
900}
901
902fn add_json_package_file<T: Serialize + ?Sized>(
903    files: &mut BTreeMap<String, ResearchProvenancePackageFile>,
904    path: &str,
905    value: &T,
906    label: &str,
907) -> Result<()> {
908    validate_package_path(path)?;
909    let mut bytes = serde_json::to_vec_pretty(value).map_err(|err| {
910        DagMlError::RuntimeValidation(format!("failed to serialize {label}: {err}"))
911    })?;
912    bytes.push(b'\n');
913    let sha256 = sha256_hex(&bytes);
914    let previous = files.insert(
915        path.to_string(),
916        ResearchProvenancePackageFile {
917            path: path.to_string(),
918            sha256,
919            size_bytes: bytes.len(),
920            bytes,
921        },
922    );
923    if previous.is_some() {
924        return Err(DagMlError::RuntimeValidation(format!(
925            "duplicate research provenance package file `{path}`"
926        )));
927    }
928    Ok(())
929}
930
931fn validate_package_path(path: &str) -> Result<()> {
932    if path.is_empty() {
933        return Err(DagMlError::RuntimeValidation(
934            "research provenance package path is empty".to_string(),
935        ));
936    }
937    if path.starts_with('/') || path.starts_with('\\') {
938        return Err(DagMlError::RuntimeValidation(format!(
939            "research provenance package path `{path}` must be relative"
940        )));
941    }
942    if path.chars().any(char::is_control) {
943        return Err(DagMlError::RuntimeValidation(format!(
944            "research provenance package path `{path}` has control characters"
945        )));
946    }
947    for segment in path.split(['/', '\\']) {
948        if segment.is_empty() || segment == "." || segment == ".." {
949            return Err(DagMlError::RuntimeValidation(format!(
950                "research provenance package path `{path}` has an invalid path component"
951            )));
952        }
953    }
954    Ok(())
955}
956
957fn data_envelope_file_path(key: &str) -> Result<String> {
958    if key.contains(['/', '\\']) {
959        return Err(DagMlError::RuntimeValidation(format!(
960            "data envelope key `{key}` cannot be used as a research provenance package path"
961        )));
962    }
963    Ok(format!("data_envelopes/{key}.json"))
964}
965
966fn require_package_file<'a>(files: &'a BTreeMap<String, Vec<u8>>, path: &str) -> Result<&'a [u8]> {
967    files.get(path).map(Vec::as_slice).ok_or_else(|| {
968        DagMlError::RuntimeValidation(format!("research provenance package is missing `{path}`"))
969    })
970}
971
972fn parse_package_json<T: DeserializeOwned>(bytes: &[u8], path: &str) -> Result<T> {
973    serde_json::from_slice(bytes).map_err(|err| {
974        DagMlError::RuntimeValidation(format!(
975            "failed to parse research provenance package JSON `{path}`: {err}"
976        ))
977    })
978}
979
980fn parse_package_data_envelopes(
981    files: &BTreeMap<String, Vec<u8>>,
982) -> Result<BTreeMap<String, ExternalDataPlanEnvelope>> {
983    let mut envelopes = BTreeMap::new();
984    for (path, bytes) in files {
985        let Some(key) = path
986            .strip_prefix("data_envelopes/")
987            .and_then(|suffix| suffix.strip_suffix(".json"))
988        else {
989            continue;
990        };
991        if key.is_empty() || key.contains(['/', '\\']) {
992            return Err(DagMlError::RuntimeValidation(format!(
993                "research provenance data envelope path `{path}` has an invalid key"
994            )));
995        }
996        let previous = envelopes.insert(key.to_string(), parse_package_json(bytes, path)?);
997        if previous.is_some() {
998            return Err(DagMlError::RuntimeValidation(format!(
999                "duplicate research provenance data envelope key `{key}`"
1000            )));
1001        }
1002    }
1003    Ok(envelopes)
1004}
1005
1006fn validate_prov_jsonld_root(prov_jsonld: Value) -> Result<()> {
1007    if prov_jsonld.get("@context").is_none()
1008        || prov_jsonld.get("entity").is_none()
1009        || prov_jsonld.get("activity").is_none()
1010        || prov_jsonld.get("agent").is_none()
1011    {
1012        return Err(DagMlError::RuntimeValidation(
1013            "research provenance PROV JSON-LD root is missing required sections".to_string(),
1014        ));
1015    }
1016    Ok(())
1017}
1018
1019fn validate_openlineage_namespace(namespace: &str) -> Result<()> {
1020    if namespace.trim().is_empty() {
1021        return Err(DagMlError::RuntimeValidation(
1022            "OpenLineage namespace must not be empty".to_string(),
1023        ));
1024    }
1025    if namespace.chars().any(char::is_control) {
1026        return Err(DagMlError::RuntimeValidation(
1027            "OpenLineage namespace contains control characters".to_string(),
1028        ));
1029    }
1030    Ok(())
1031}
1032
1033fn validate_openlineage_event_time(event_time: &str) -> Result<()> {
1034    if event_time.trim().is_empty() || !event_time.contains('T') {
1035        return Err(DagMlError::RuntimeValidation(
1036            "OpenLineage event_time must be a non-empty RFC3339-like timestamp".to_string(),
1037        ));
1038    }
1039    if event_time.chars().any(char::is_control) {
1040        return Err(DagMlError::RuntimeValidation(
1041            "OpenLineage event_time contains control characters".to_string(),
1042        ));
1043    }
1044    Ok(())
1045}
1046
1047fn dagml_openlineage_reproducibility_run_facet(
1048    plan: &ExecutionPlan,
1049    bundle: &ExecutionBundle,
1050) -> Value {
1051    json!({
1052        "_schemaURL": format!("{DAGML_OPENLINEAGE_FACET_SCHEMA_URL}#/$defs/DagmlReproducibilityRunFacet"),
1053        "plan_id": plan.id,
1054        "bundle_id": bundle.bundle_id,
1055        "graph_fingerprint": bundle.graph_fingerprint,
1056        "campaign_fingerprint": bundle.campaign_fingerprint,
1057        "controller_fingerprint": bundle.controller_fingerprint,
1058        "selected_variant_id": bundle.selected_variant_id,
1059        "variant_count": plan.variants.len(),
1060        "unsafe_flags": bundle.unsafe_flags,
1061    })
1062}
1063
1064fn dagml_openlineage_oof_safety_run_facet(
1065    bundle: &ExecutionBundle,
1066    lineage: &[LineageRecord],
1067) -> Value {
1068    json!({
1069        "_schemaURL": format!("{DAGML_OPENLINEAGE_FACET_SCHEMA_URL}#/$defs/DagmlOofSafetyRunFacet"),
1070        "prediction_requirement_count": bundle.prediction_requirements.len(),
1071        "prediction_cache_count": bundle.prediction_caches.len(),
1072        "lineage_record_count": lineage.len(),
1073        "requires_oof_prediction_count": bundle.prediction_requirements.len(),
1074        "refit_artifact_count": bundle.refit_artifacts.len(),
1075    })
1076}
1077
1078fn dagml_openlineage_plan_job_facet(plan: &ExecutionPlan, bundle: &ExecutionBundle) -> Value {
1079    json!({
1080        "_schemaURL": format!("{DAGML_OPENLINEAGE_FACET_SCHEMA_URL}#/$defs/DagmlPlanJobFacet"),
1081        "plan_id": plan.id,
1082        "bundle_id": bundle.bundle_id,
1083        "node_count": plan.node_plans.len(),
1084        "controller_count": plan.controller_manifests.len(),
1085        "has_fold_set": plan.fold_set.is_some(),
1086        "selected_variant_id": bundle.selected_variant_id,
1087    })
1088}
1089
1090fn openlineage_input_datasets(
1091    bundle: &ExecutionBundle,
1092    data_envelopes: &BTreeMap<String, ExternalDataPlanEnvelope>,
1093) -> Vec<Value> {
1094    bundle
1095        .data_requirements
1096        .iter()
1097        .map(|requirement| {
1098            let key = requirement.key();
1099            let envelope = data_envelopes.get(&key);
1100            json!({
1101                "namespace": "dagml:data-requirement",
1102                "name": key,
1103                "facets": {
1104                    "dagml_contract": {
1105                        "_schemaURL": format!("{DAGML_OPENLINEAGE_FACET_SCHEMA_URL}#/$defs/DagmlDatasetContractFacet"),
1106                        "node_id": requirement.node_id,
1107                        "input_name": requirement.input_name,
1108                        "schema_fingerprint": requirement.schema_fingerprint,
1109                        "plan_fingerprint": requirement.plan_fingerprint,
1110                        "relation_fingerprint": requirement.relation_fingerprint,
1111                        "feature_set_id": requirement.feature_set_id,
1112                        "envelope_schema_fingerprint": envelope.map(|envelope| envelope.schema_fingerprint.clone()),
1113                        "envelope_plan_fingerprint": envelope.map(|envelope| envelope.plan_fingerprint.clone()),
1114                    }
1115                }
1116            })
1117        })
1118        .collect()
1119}
1120
1121fn openlineage_output_datasets(
1122    bundle: &ExecutionBundle,
1123    prediction_cache_manifest: Option<&FilePredictionCacheManifest>,
1124    artifact_manifest: Option<&FileArtifactManifest>,
1125) -> Vec<Value> {
1126    let mut outputs = vec![json!({
1127        "namespace": "dagml:bundle",
1128        "name": bundle.bundle_id,
1129        "facets": {
1130            "dagml_contract": {
1131                "_schemaURL": format!("{DAGML_OPENLINEAGE_FACET_SCHEMA_URL}#/$defs/DagmlDatasetContractFacet"),
1132                "schema_version": bundle.schema_version,
1133                "plan_id": bundle.plan_id,
1134                "selected_variant_id": bundle.selected_variant_id,
1135                "graph_fingerprint": bundle.graph_fingerprint,
1136                "campaign_fingerprint": bundle.campaign_fingerprint,
1137                "controller_fingerprint": bundle.controller_fingerprint,
1138            }
1139        }
1140    })];
1141    for cache in &bundle.prediction_caches {
1142        outputs.push(json!({
1143            "namespace": "dagml:prediction-cache",
1144            "name": cache.cache_id,
1145            "facets": {
1146                "dagml_contract": {
1147                    "_schemaURL": format!("{DAGML_OPENLINEAGE_FACET_SCHEMA_URL}#/$defs/DagmlDatasetContractFacet"),
1148                    "requirement_key": cache.requirement_key,
1149                    "prediction_level": cache.prediction_level,
1150                    "row_count": cache.row_count,
1151                    "block_count": cache.block_count,
1152                    "content_fingerprint": cache.content_fingerprint,
1153                    "has_file_manifest": prediction_cache_manifest.is_some(),
1154                }
1155            }
1156        }));
1157    }
1158    for artifact in &bundle.refit_artifacts {
1159        outputs.push(json!({
1160            "namespace": "dagml:artifact",
1161            "name": artifact.artifact.id,
1162            "facets": {
1163                "dagml_contract": {
1164                    "_schemaURL": format!("{DAGML_OPENLINEAGE_FACET_SCHEMA_URL}#/$defs/DagmlDatasetContractFacet"),
1165                    "node_id": artifact.node_id,
1166                    "controller_id": artifact.controller_id,
1167                    "backend": artifact.artifact.backend,
1168                    "uri": artifact.artifact.uri,
1169                    "content_fingerprint": artifact.artifact.content_fingerprint,
1170                    "plugin": artifact.artifact.plugin,
1171                    "plugin_version": artifact.artifact.plugin_version,
1172                    "data_requirement_keys": artifact.data_requirement_keys,
1173                    "prediction_requirement_keys": artifact.prediction_requirement_keys,
1174                    "has_file_manifest": artifact_manifest.is_some(),
1175                }
1176            }
1177        }));
1178    }
1179    outputs
1180}
1181
1182fn openlineage_run_id(plan: &ExecutionPlan, bundle: &ExecutionBundle) -> String {
1183    let input = format!("dag-ml/openlineage/run/{}/{}", plan.id, bundle.bundle_id);
1184    let digest = Sha256::digest(input.as_bytes());
1185    let mut bytes = [0u8; 16];
1186    bytes.copy_from_slice(&digest[..16]);
1187    bytes[6] = (bytes[6] & 0x0f) | 0x50;
1188    bytes[8] = (bytes[8] & 0x3f) | 0x80;
1189    format!(
1190        "{:02x}{:02x}{:02x}{:02x}-{:02x}{:02x}-{:02x}{:02x}-{:02x}{:02x}-{:02x}{:02x}{:02x}{:02x}{:02x}{:02x}",
1191        bytes[0],
1192        bytes[1],
1193        bytes[2],
1194        bytes[3],
1195        bytes[4],
1196        bytes[5],
1197        bytes[6],
1198        bytes[7],
1199        bytes[8],
1200        bytes[9],
1201        bytes[10],
1202        bytes[11],
1203        bytes[12],
1204        bytes[13],
1205        bytes[14],
1206        bytes[15],
1207    )
1208}
1209
1210fn validate_ro_crate_package_checksums(
1211    ro_crate_metadata: &Value,
1212    files: &BTreeMap<String, Vec<u8>>,
1213) -> Result<usize> {
1214    let graph = ro_crate_metadata
1215        .get("@graph")
1216        .and_then(Value::as_array)
1217        .ok_or_else(|| {
1218            DagMlError::RuntimeValidation("RO-Crate metadata has no @graph array".to_string())
1219        })?;
1220    let root = graph
1221        .iter()
1222        .find(|entry| entry.get("@id").and_then(Value::as_str) == Some("./"))
1223        .ok_or_else(|| {
1224            DagMlError::RuntimeValidation("RO-Crate metadata has no root dataset".to_string())
1225        })?;
1226    if root.get("dagml:schema_version").and_then(Value::as_u64)
1227        != Some(RESEARCH_PROVENANCE_SCHEMA_VERSION as u64)
1228    {
1229        return Err(DagMlError::RuntimeValidation(format!(
1230            "RO-Crate root has unsupported dagml:schema_version, expected {}",
1231            RESEARCH_PROVENANCE_SCHEMA_VERSION
1232        )));
1233    }
1234    let root_has_part = root
1235        .get("hasPart")
1236        .and_then(Value::as_array)
1237        .ok_or_else(|| {
1238            DagMlError::RuntimeValidation("RO-Crate root hasPart is not an array".to_string())
1239        })?;
1240    let root_has_part_ids = root_has_part
1241        .iter()
1242        .filter_map(|entry| entry.get("@id").and_then(Value::as_str))
1243        .collect::<BTreeSet<_>>();
1244
1245    let mut checksummed = 0;
1246    for (path, bytes) in files {
1247        if path == RO_CRATE_METADATA_FILE {
1248            continue;
1249        }
1250        if !root_has_part_ids.contains(path.as_str()) {
1251            return Err(DagMlError::RuntimeValidation(format!(
1252                "RO-Crate root does not list package file `{path}` in hasPart"
1253            )));
1254        }
1255        let entry = graph
1256            .iter()
1257            .find(|entry| entry.get("@id").and_then(Value::as_str) == Some(path.as_str()))
1258            .ok_or_else(|| {
1259                DagMlError::RuntimeValidation(format!(
1260                    "RO-Crate metadata is missing package file entry `{path}`"
1261                ))
1262            })?;
1263        let expected_sha256 = sha256_hex(bytes);
1264        let declared_sha256 = entry.get("sha256").and_then(Value::as_str).ok_or_else(|| {
1265            DagMlError::RuntimeValidation(format!(
1266                "RO-Crate package file `{path}` is missing sha256"
1267            ))
1268        })?;
1269        if declared_sha256 != expected_sha256 {
1270            return Err(DagMlError::RuntimeValidation(format!(
1271                "RO-Crate package file `{path}` sha256 mismatch"
1272            )));
1273        }
1274        let declared_dagml_sha256 = entry
1275            .get("dagml:sha256")
1276            .and_then(Value::as_str)
1277            .ok_or_else(|| {
1278                DagMlError::RuntimeValidation(format!(
1279                    "RO-Crate package file `{path}` is missing dagml:sha256"
1280                ))
1281            })?;
1282        if declared_dagml_sha256 != declared_sha256 {
1283            return Err(DagMlError::RuntimeValidation(format!(
1284                "RO-Crate package file `{path}` has inconsistent checksum fields"
1285            )));
1286        }
1287        if entry.get("contentSize").and_then(Value::as_u64) != Some(bytes.len() as u64) {
1288            return Err(DagMlError::RuntimeValidation(format!(
1289                "RO-Crate package file `{path}` contentSize mismatch"
1290            )));
1291        }
1292        if entry.get("encodingFormat").and_then(Value::as_str) != Some("application/json") {
1293            return Err(DagMlError::RuntimeValidation(format!(
1294                "RO-Crate package file `{path}` must declare application/json"
1295            )));
1296        }
1297        checksummed += 1;
1298    }
1299
1300    for has_part_id in root_has_part_ids {
1301        if has_part_id != RO_CRATE_METADATA_FILE && !files.contains_key(has_part_id) {
1302            return Err(DagMlError::RuntimeValidation(format!(
1303                "RO-Crate root references missing package file `{has_part_id}`"
1304            )));
1305        }
1306    }
1307
1308    Ok(checksummed)
1309}
1310
1311fn annotate_ro_crate_package_files(
1312    ro_crate_metadata: &mut Value,
1313    files: &BTreeMap<String, ResearchProvenancePackageFile>,
1314) -> Result<()> {
1315    let graph = ro_crate_metadata
1316        .get_mut("@graph")
1317        .and_then(Value::as_array_mut)
1318        .ok_or_else(|| {
1319            DagMlError::RuntimeValidation("RO-Crate metadata has no @graph array".to_string())
1320        })?;
1321
1322    let mut existing_ids = graph
1323        .iter()
1324        .filter_map(|entry| entry.get("@id").and_then(Value::as_str).map(str::to_string))
1325        .collect::<BTreeSet<_>>();
1326    for file in files.values() {
1327        if !existing_ids.contains(&file.path) {
1328            graph.push(file_entity(
1329                &file.path,
1330                &format!("DAG-ML contract file {}", file.path),
1331                "dagml:ContractArtifact",
1332            ));
1333            existing_ids.insert(file.path.clone());
1334        }
1335    }
1336
1337    for entry in graph.iter_mut() {
1338        let Some(id) = entry.get("@id").and_then(Value::as_str).map(str::to_string) else {
1339            continue;
1340        };
1341        let Some(file) = files.get(id.as_str()) else {
1342            continue;
1343        };
1344        let object = entry.as_object_mut().ok_or_else(|| {
1345            DagMlError::RuntimeValidation(format!("RO-Crate graph entry `{id}` is not an object"))
1346        })?;
1347        object.insert("encodingFormat".to_string(), json!("application/json"));
1348        object.insert("contentSize".to_string(), json!(file.size_bytes));
1349        object.insert("sha256".to_string(), json!(file.sha256));
1350        object.insert("dagml:sha256".to_string(), json!(file.sha256));
1351    }
1352
1353    let root = graph
1354        .iter_mut()
1355        .find(|entry| entry.get("@id") == Some(&json!("./")))
1356        .ok_or_else(|| {
1357            DagMlError::RuntimeValidation("RO-Crate metadata has no root dataset".to_string())
1358        })?;
1359    let root_object = root.as_object_mut().ok_or_else(|| {
1360        DagMlError::RuntimeValidation("RO-Crate root dataset is not an object".to_string())
1361    })?;
1362    let has_part = root_object
1363        .entry("hasPart".to_string())
1364        .or_insert_with(|| Value::Array(Vec::new()));
1365    let has_part = has_part.as_array_mut().ok_or_else(|| {
1366        DagMlError::RuntimeValidation("RO-Crate root hasPart is not an array".to_string())
1367    })?;
1368    let mut has_part_ids = has_part
1369        .iter()
1370        .filter_map(|entry| entry.get("@id").and_then(Value::as_str).map(str::to_string))
1371        .collect::<BTreeSet<_>>();
1372    for path in files.keys() {
1373        if has_part_ids.insert(path.clone()) {
1374            has_part.push(json!({"@id": path}));
1375        }
1376    }
1377    Ok(())
1378}
1379
1380fn sha256_hex(bytes: &[u8]) -> String {
1381    let digest = Sha256::digest(bytes);
1382    let mut out = String::with_capacity(digest.len() * 2);
1383    for byte in digest {
1384        use std::fmt::Write;
1385        write!(&mut out, "{byte:02x}").expect("writing to string cannot fail");
1386    }
1387    out
1388}
1389
1390fn lineage_artifact_index(lineage: &[LineageRecord]) -> BTreeMap<ArtifactId, String> {
1391    let mut index = BTreeMap::new();
1392    for record in lineage {
1393        for artifact in &record.artifact_refs {
1394            index.insert(artifact.id.clone(), lineage_activity_id(record));
1395        }
1396    }
1397    index
1398}
1399
1400fn lineage_activity_id(record: &LineageRecord) -> String {
1401    format!("dagml:activity:{}", record.record_id)
1402}
1403
/// Returns the agent id for a controller: `dagml:controller:` followed by the controller id.
fn controller_agent_id(controller_id: &str) -> String {
    ["dagml:controller:", controller_id].concat()
}
1407
1408fn artifact_entity_id(artifact_id: &ArtifactId) -> String {
1409    format!("dagml:artifact:{artifact_id}")
1410}
1411
1412fn lineage_record_entity_id(lineage_id: &LineageId) -> String {
1413    format!("dagml:lineage-record:{lineage_id}")
1414}
1415
/// Returns the entity id for a data requirement: `dagml:data-requirement:` followed by the
/// key.
fn data_requirement_entity_id(key: &str) -> String {
    const PREFIX: &str = "dagml:data-requirement:";
    let mut id = String::with_capacity(PREFIX.len() + key.len());
    id.push_str(PREFIX);
    id.push_str(key);
    id
}
1419
/// Returns the entity id for a data envelope: `dagml:data-envelope:` followed by the key.
fn data_envelope_entity_id(key: &str) -> String {
    "dagml:data-envelope:".to_owned() + key
}
1423
/// Returns the entity id for a prediction requirement: `dagml:prediction-requirement:`
/// followed by the key.
fn prediction_requirement_entity_id(key: &str) -> String {
    let mut id = String::from("dagml:prediction-requirement:");
    id += key;
    id
}
1427
/// Returns the entity id for a prediction cache: `dagml:prediction-cache:` followed by the
/// cache id.
fn prediction_cache_entity_id(cache_id: &str) -> String {
    ["dagml:prediction-cache:", cache_id].join("")
}
1431
1432#[cfg(test)]
1433mod tests {
1434    use super::*;
1435    use crate::controller::{ControllerManifest, ControllerRegistry};
1436    use crate::ids::{ControllerId, LineageId, NodeId, RunId};
1437    use crate::plan::build_execution_plan;
1438    use crate::{CampaignSpec, GraphSpec, Phase};
1439
1440    fn fixture_plan() -> ExecutionPlan {
1441        let graph: GraphSpec =
1442            serde_json::from_str(include_str!("../../../examples/minimal_graph.json")).unwrap();
1443        let campaign: CampaignSpec = serde_json::from_str(include_str!(
1444            "../../../examples/campaign_oof_generation.json"
1445        ))
1446        .unwrap();
1447        let manifests: Vec<ControllerManifest> =
1448            serde_json::from_str(include_str!("../../../examples/controller_manifests.json"))
1449                .unwrap();
1450        let mut registry = ControllerRegistry::new();
1451        for manifest in manifests {
1452            registry.register(manifest).unwrap();
1453        }
1454        build_execution_plan("plan:cli.bundle", graph, campaign, &registry).unwrap()
1455    }
1456
1457    fn fixture_bundle() -> ExecutionBundle {
1458        serde_json::from_str(include_str!(
1459            "../../../examples/generated/execution_bundle_minimal.json"
1460        ))
1461        .unwrap()
1462    }
1463
1464    fn fixture_lineage(plan: &ExecutionPlan) -> LineageRecord {
1465        let node_id = NodeId::new("model:base").unwrap();
1466        let node_plan = plan.node_plans.get(&node_id).unwrap();
1467        LineageRecord {
1468            record_id: LineageId::new("lineage:test:model:base").unwrap(),
1469            run_id: RunId::new("run:provenance").unwrap(),
1470            node_id,
1471            phase: Phase::Refit,
1472            controller_id: node_plan.controller_id.clone(),
1473            controller_version: node_plan.controller_version.clone(),
1474            variant_id: plan
1475                .variants
1476                .first()
1477                .map(|variant| variant.variant_id.clone()),
1478            fold_id: None,
1479            branch_path: Vec::new(),
1480            input_lineage: Vec::new(),
1481            artifact_refs: vec![fixture_bundle().refit_artifacts[0].artifact.clone()],
1482            params_fingerprint: node_plan.params_fingerprint.clone(),
1483            data_model_shape_fingerprint: None,
1484            aggregation_policy_fingerprint: None,
1485            seed: Some(42),
1486            unsafe_flags: BTreeSet::new(),
1487            metrics: BTreeMap::new(),
1488        }
1489    }
1490
1491    #[test]
1492    fn research_provenance_export_contains_prov_and_ro_crate_contracts() {
1493        let plan = fixture_plan();
1494        let bundle = fixture_bundle();
1495        let lineage = vec![fixture_lineage(&plan)];
1496        let export = build_research_provenance_export(
1497            &plan,
1498            &bundle,
1499            &lineage,
1500            &BTreeMap::new(),
1501            None,
1502            None,
1503        )
1504        .unwrap();
1505
1506        assert_eq!(export.schema_version, RESEARCH_PROVENANCE_SCHEMA_VERSION);
1507        assert!(export.prov_jsonld["@context"]["prov"]
1508            .as_str()
1509            .unwrap()
1510            .contains("prov"));
1511        assert!(export.prov_jsonld["activity"]
1512            .as_object()
1513            .unwrap()
1514            .contains_key("dagml:activity:lineage:test:model:base"));
1515        assert!(export.prov_jsonld["agent"]
1516            .as_object()
1517            .unwrap()
1518            .contains_key("dagml:controller:controller:model.mock"));
1519        assert!(export.prov_jsonld["entity"]
1520            .as_object()
1521            .unwrap()
1522            .contains_key("dagml:artifact:artifact:model:base:refit"));
1523
1524        let graph = export.ro_crate_metadata["@graph"].as_array().unwrap();
1525        assert!(graph
1526            .iter()
1527            .any(|entry| entry["@type"].to_string().contains("ComputationalWorkflow")));
1528        assert!(graph
1529            .iter()
1530            .any(|entry| entry["@id"] == json!("lineage.prov.jsonld")));
1531        assert!(graph
1532            .iter()
1533            .any(|entry| entry["@id"] == json!("execution_bundle.json")));
1534    }
1535
/// Builds a package from the fixtures and checks that every contract file is
/// present, that each file's bookkeeping (path, digest length, size) is
/// self-consistent, and that the RO-Crate metadata describes the packaged files.
#[test]
fn research_provenance_package_contains_contract_files_and_checksums() {
    let plan = fixture_plan();
    let bundle = fixture_bundle();
    let lineage = vec![fixture_lineage(&plan)];
    let built =
        build_research_provenance_package(&plan, &bundle, &lineage, &BTreeMap::new(), None, None);
    let package = built.unwrap();

    // Every contract file must be shipped in the package.
    let required_files = [
        EXECUTION_PLAN_FILE,
        EXECUTION_BUNDLE_FILE,
        LINEAGE_RECORDS_FILE,
        PROV_JSONLD_FILE,
        RO_CRATE_METADATA_FILE,
    ];
    for path in required_files {
        let present = package.files.contains_key(path);
        assert!(present, "package is missing {path}");
    }

    // The map key, recorded path, digest length and recorded size must agree
    // for each packaged file.
    for (path, file) in package.files.iter() {
        assert_eq!(file.path, *path);
        assert_eq!(file.sha256.len(), 64, "invalid sha256 for {path}");
        assert!(file.size_bytes > 0, "empty package file {path}");
        assert_eq!(file.size_bytes, file.bytes.len());
    }

    // Re-parse the RO-Crate metadata straight from the packaged bytes.
    let ro_crate_bytes = &package.files.get(RO_CRATE_METADATA_FILE).unwrap().bytes;
    let ro_crate_metadata: Value = serde_json::from_slice(ro_crate_bytes).unwrap();
    let graph = ro_crate_metadata["@graph"].as_array().unwrap();

    // Each of these files needs a graph entry carrying its digest, format and size.
    let described_files = [
        EXECUTION_PLAN_FILE,
        EXECUTION_BUNDLE_FILE,
        LINEAGE_RECORDS_FILE,
        PROV_JSONLD_FILE,
    ];
    for path in described_files {
        let entry = match graph.iter().find(|node| node["@id"].as_str() == Some(path)) {
            Some(node) => node,
            None => panic!("RO-Crate metadata is missing file entry {path}"),
        };
        let digest = entry["sha256"].as_str();
        assert_eq!(digest.map(str::len), Some(64));
        assert_eq!(entry["dagml:sha256"].as_str(), digest);
        assert_eq!(entry["encodingFormat"], json!("application/json"));
        assert!(entry["contentSize"].as_u64().unwrap() > 0);
    }

    // The root dataset must list the lineage records file among its parts.
    let root = graph
        .iter()
        .find(|node| node["@id"].as_str() == Some("./"))
        .expect("RO-Crate root dataset is present");
    let lists_lineage_records = root["hasPart"]
        .as_array()
        .unwrap()
        .iter()
        .any(|part| part["@id"].as_str() == Some(LINEAGE_RECORDS_FILE));
    assert!(lists_lineage_records);
}
1597
/// Feeds the raw bytes of a freshly built package to
/// `validate_research_provenance_package_files` and checks the returned summary
/// against the fixtures that produced the package.
#[test]
fn research_provenance_package_validation_reopens_exported_contracts() {
    let plan = fixture_plan();
    let bundle = fixture_bundle();
    let lineage = vec![fixture_lineage(&plan)];
    let built =
        build_research_provenance_package(&plan, &bundle, &lineage, &BTreeMap::new(), None, None);
    let package = built.unwrap();

    // The validator is handed path -> bytes only, without the package bookkeeping.
    let mut files = BTreeMap::new();
    for (path, file) in &package.files {
        files.insert(path.clone(), file.bytes.clone());
    }

    let validation = validate_research_provenance_package_files(&files).unwrap();
    let packaged_file_count = package.files.len();

    assert_eq!(
        validation.schema_version,
        RESEARCH_PROVENANCE_SCHEMA_VERSION
    );
    assert_eq!(validation.plan_id, plan.id.to_string());
    assert_eq!(validation.bundle_id, bundle.bundle_id.to_string());
    assert_eq!(validation.file_count, packaged_file_count);
    // One file fewer is expected to be checksummed -- presumably the RO-Crate
    // metadata file that carries the checksums itself (confirm against the validator).
    assert_eq!(validation.checksummed_file_count, packaged_file_count - 1);
    assert_eq!(validation.lineage_record_count, 1);
}
1630
/// Overwriting a packaged file's bytes after the build must make validation
/// fail with a "sha256 mismatch" error.
#[test]
fn research_provenance_package_validation_refuses_tampered_file() {
    let plan = fixture_plan();
    let bundle = fixture_bundle();
    let built =
        build_research_provenance_package(&plan, &bundle, &[], &BTreeMap::new(), None, None);
    let package = built.unwrap();

    // Copy out the raw bytes of every packaged file.
    let mut files = BTreeMap::new();
    for (path, file) in &package.files {
        files.insert(path.clone(), file.bytes.clone());
    }
    // Overwrite the lineage records entry after the build; the test expects its
    // recorded checksum to no longer match these bytes.
    files.insert(String::from(LINEAGE_RECORDS_FILE), b"[]\n\n".to_vec());

    let outcome = validate_research_provenance_package_files(&files);
    let error = outcome.unwrap_err().to_string();

    assert!(
        error.contains("sha256 mismatch"),
        "unexpected error: {error}"
    );
}
1654
/// Builds an OpenLineage run event from the raw bytes of a packaged export and
/// checks the event's type, schema URL, job namespace, run id length,
/// reproducibility facet, and its input and output datasets.
#[test]
fn openlineage_export_uses_validated_research_package() {
    let plan = fixture_plan();
    let bundle = fixture_bundle();
    let lineage = vec![fixture_lineage(&plan)];
    let built =
        build_research_provenance_package(&plan, &bundle, &lineage, &BTreeMap::new(), None, None);
    let package = built.unwrap();

    // The event builder consumes path -> bytes, like the package validator.
    let mut files = BTreeMap::new();
    for (path, file) in &package.files {
        files.insert(path.clone(), file.bytes.clone());
    }

    let event = build_openlineage_run_event_from_package_files(
        &files,
        "dag-ml-test",
        "2026-05-27T00:00:00Z",
    )
    .unwrap();

    assert_eq!(event["eventType"], json!("COMPLETE"));
    assert_eq!(event["schemaURL"], json!(OPENLINEAGE_RUN_EVENT_SCHEMA_URL));
    assert_eq!(event["job"]["namespace"], json!("dag-ml-test"));

    let run = &event["run"];
    // 36 characters is the length of a hyphenated UUID string.
    assert_eq!(run["runId"].as_str().map(str::len), Some(36));
    let fingerprint = &run["facets"]["dagml_reproducibility"]["graph_fingerprint"];
    assert!(fingerprint.is_string());

    let inputs = event["inputs"].as_array().unwrap();
    assert!(!inputs.is_empty());

    let outputs = event["outputs"].as_array().unwrap();
    let has_bundle_output = outputs
        .iter()
        .any(|output| output["namespace"].as_str() == Some("dagml:bundle"));
    assert!(has_bundle_output);
}
1698
/// A lineage record whose node id is replaced with "model:missing" must make
/// the export fail with an "unknown node" error.
#[test]
fn research_provenance_export_refuses_unknown_lineage_node() {
    let plan = fixture_plan();
    let bundle = fixture_bundle();
    // Start from the valid fixture record and swap in the replacement node id.
    let lineage = {
        let mut record = fixture_lineage(&plan);
        record.node_id = NodeId::new("model:missing").unwrap();
        record
    };

    let outcome =
        build_research_provenance_export(&plan, &bundle, &[lineage], &BTreeMap::new(), None, None);
    let error = outcome.unwrap_err().to_string();

    assert!(error.contains("unknown node"), "unexpected error: {error}");
}
1719
/// An artifact manifest carrying the bundle id "bundle:wrong" must make the
/// export fail with a "does not match bundle" error.
#[test]
fn research_provenance_export_refuses_mismatched_artifact_manifest() {
    let plan = fixture_plan();
    let bundle = fixture_bundle();
    // Build the manifest with the foreign bundle id from the start; it is
    // presumably different from the fixture bundle's id (the test relies on that).
    let manifest = FileArtifactManifest {
        bundle_id: crate::ids::BundleId::new("bundle:wrong").unwrap(),
        schema_version: crate::runtime::FILE_ARTIFACT_MANIFEST_SCHEMA_VERSION,
        artifacts: Vec::new(),
    };

    let outcome = build_research_provenance_export(
        &plan,
        &bundle,
        &[],
        &BTreeMap::new(),
        None,
        Some(&manifest),
    );
    let error = outcome.unwrap_err().to_string();

    assert!(
        error.contains("does not match bundle"),
        "unexpected error: {error}"
    );
}
1747
/// A lineage record whose controller id is replaced with "controller:missing"
/// must make the export fail with an "unknown controller" error.
#[test]
fn research_provenance_export_refuses_unknown_lineage_controller() {
    let plan = fixture_plan();
    let bundle = fixture_bundle();
    // Start from the valid fixture record and swap in the replacement controller id.
    let lineage = {
        let mut record = fixture_lineage(&plan);
        record.controller_id = ControllerId::new("controller:missing").unwrap();
        record
    };

    let outcome =
        build_research_provenance_export(&plan, &bundle, &[lineage], &BTreeMap::new(), None, None);
    let error = outcome.unwrap_err().to_string();

    assert!(
        error.contains("unknown controller"),
        "unexpected error: {error}"
    );
}
1771}