1use std::collections::{BTreeMap, BTreeSet};
2
3use serde::{de::DeserializeOwned, Deserialize, Serialize};
4use serde_json::{json, Value};
5use sha2::{Digest, Sha256};
6
7use crate::bundle::ExecutionBundle;
8use crate::data::ExternalDataPlanEnvelope;
9use crate::error::{DagMlError, Result};
10use crate::ids::{ArtifactId, LineageId};
11use crate::plan::ExecutionPlan;
12use crate::runtime::{
13 FileArtifactManifest, FilePredictionCacheManifest, LineageRecord, FILE_ARTIFACT_MANIFEST_FILE,
14 FILE_PREDICTION_CACHE_MANIFEST_FILE,
15};
16
/// Version stamped on every research provenance export, package and validation summary.
pub const RESEARCH_PROVENANCE_SCHEMA_VERSION: u32 = 1;

// ---- Fixed file names inside a research provenance package ----

/// Package file holding the serialized `ExecutionPlan`.
pub const EXECUTION_PLAN_FILE: &str = "execution_plan.json";
/// Package file holding the serialized `ExecutionBundle`.
pub const EXECUTION_BUNDLE_FILE: &str = "execution_bundle.json";
/// Package file holding the serialized lineage records.
pub const LINEAGE_RECORDS_FILE: &str = "lineage_records.json";
/// Package file holding the W3C PROV JSON-LD export.
pub const PROV_JSONLD_FILE: &str = "lineage.prov.jsonld";
/// RO-Crate metadata descriptor; the package builder writes it last, after the other files.
pub const RO_CRATE_METADATA_FILE: &str = "ro-crate-metadata.json";

// ---- Schema URLs used by the OpenLineage export ----

/// `schemaURL` of the OpenLineage 1.0.0 `RunEvent` definition.
pub const OPENLINEAGE_RUN_EVENT_SCHEMA_URL: &str =
    "https://openlineage.io/spec/1-0-0/OpenLineage.json#/definitions/RunEvent";
/// Schema URL for the DAG-ML (`dagml_*`) OpenLineage facets.
pub const DAGML_OPENLINEAGE_FACET_SCHEMA_URL: &str =
    "https://github.com/GBeurier/dag-ml/schemas/openlineage_dagml_facets.v1.schema.json";
27
/// In-memory research provenance export: a W3C PROV JSON-LD document plus the
/// matching RO-Crate metadata document, tagged with the export schema version.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ResearchProvenanceExport {
    /// Set to `RESEARCH_PROVENANCE_SCHEMA_VERSION` by `build_research_provenance_export`.
    pub schema_version: u32,
    /// PROV JSON-LD document produced by `build_prov_jsonld`.
    pub prov_jsonld: Value,
    /// RO-Crate metadata document produced by `build_ro_crate_metadata`.
    pub ro_crate_metadata: Value,
}
34
/// A complete research provenance package held in memory, ready to be written out.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ResearchProvenancePackage {
    /// Set to `RESEARCH_PROVENANCE_SCHEMA_VERSION` by `build_research_provenance_package`.
    pub schema_version: u32,
    /// Package files keyed by their package-relative path (the same value as each
    /// entry's `path` field).
    pub files: BTreeMap<String, ResearchProvenancePackageFile>,
}
40
/// One serialized file of a research provenance package.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ResearchProvenancePackageFile {
    /// Package-relative path; checked by `validate_package_path` before insertion.
    pub path: String,
    /// SHA-256 digest of `bytes`, as returned by `sha256_hex`.
    pub sha256: String,
    /// Length of `bytes`.
    pub size_bytes: usize,
    /// File contents: pretty-printed JSON followed by a trailing newline.
    pub bytes: Vec<u8>,
}
48
/// Summary returned after a research provenance package passes validation.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct ResearchProvenancePackageValidation {
    /// Provenance schema version the package was validated against.
    pub schema_version: u32,
    /// Id of the decoded execution plan.
    pub plan_id: String,
    /// Id of the decoded execution bundle.
    pub bundle_id: String,
    /// Total number of files in the package (including unrecognised ones).
    pub file_count: usize,
    /// Count reported by the RO-Crate checksum validation step.
    pub checksummed_file_count: usize,
    /// Number of records in `lineage_records.json`.
    pub lineage_record_count: usize,
    /// Number of `data_envelopes/<key>.json` files decoded.
    pub data_envelope_count: usize,
    /// Whether the prediction cache manifest file was present.
    pub has_prediction_cache_manifest: bool,
    /// Whether the artifact manifest file was present.
    pub has_artifact_manifest: bool,
}
61
/// Caller-supplied settings for building an OpenLineage run event.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct OpenLineageRunEventOptions {
    /// OpenLineage job namespace; must be non-blank and free of control characters.
    pub namespace: String,
    /// Value emitted as `eventTime`; must pass `validate_openlineage_event_time`
    /// (presumably an ISO-8601 timestamp — TODO confirm the exact accepted format).
    pub event_time: String,
}
67
68pub fn build_research_provenance_export(
69 plan: &ExecutionPlan,
70 bundle: &ExecutionBundle,
71 lineage: &[LineageRecord],
72 data_envelopes: &BTreeMap<String, ExternalDataPlanEnvelope>,
73 prediction_cache_manifest: Option<&FilePredictionCacheManifest>,
74 artifact_manifest: Option<&FileArtifactManifest>,
75) -> Result<ResearchProvenanceExport> {
76 validate_provenance_inputs(
77 plan,
78 bundle,
79 lineage,
80 data_envelopes,
81 prediction_cache_manifest,
82 artifact_manifest,
83 )?;
84
85 Ok(ResearchProvenanceExport {
86 schema_version: RESEARCH_PROVENANCE_SCHEMA_VERSION,
87 prov_jsonld: build_prov_jsonld(
88 plan,
89 bundle,
90 lineage,
91 data_envelopes,
92 prediction_cache_manifest,
93 artifact_manifest,
94 )?,
95 ro_crate_metadata: build_ro_crate_metadata(
96 plan,
97 bundle,
98 data_envelopes,
99 prediction_cache_manifest,
100 artifact_manifest,
101 )?,
102 })
103}
104
105pub fn build_research_provenance_package(
106 plan: &ExecutionPlan,
107 bundle: &ExecutionBundle,
108 lineage: &[LineageRecord],
109 data_envelopes: &BTreeMap<String, ExternalDataPlanEnvelope>,
110 prediction_cache_manifest: Option<&FilePredictionCacheManifest>,
111 artifact_manifest: Option<&FileArtifactManifest>,
112) -> Result<ResearchProvenancePackage> {
113 let export = build_research_provenance_export(
114 plan,
115 bundle,
116 lineage,
117 data_envelopes,
118 prediction_cache_manifest,
119 artifact_manifest,
120 )?;
121 let mut files = BTreeMap::new();
122 add_json_package_file(&mut files, EXECUTION_PLAN_FILE, plan, "execution plan")?;
123 add_json_package_file(
124 &mut files,
125 EXECUTION_BUNDLE_FILE,
126 bundle,
127 "execution bundle",
128 )?;
129 add_json_package_file(
130 &mut files,
131 LINEAGE_RECORDS_FILE,
132 &lineage,
133 "lineage records",
134 )?;
135 add_json_package_file(
136 &mut files,
137 PROV_JSONLD_FILE,
138 &export.prov_jsonld,
139 "PROV JSON-LD",
140 )?;
141 if let Some(manifest) = prediction_cache_manifest {
142 add_json_package_file(
143 &mut files,
144 FILE_PREDICTION_CACHE_MANIFEST_FILE,
145 manifest,
146 "prediction cache manifest",
147 )?;
148 }
149 if let Some(manifest) = artifact_manifest {
150 add_json_package_file(
151 &mut files,
152 FILE_ARTIFACT_MANIFEST_FILE,
153 manifest,
154 "artifact manifest",
155 )?;
156 }
157 for (key, envelope) in data_envelopes {
158 add_json_package_file(
159 &mut files,
160 &data_envelope_file_path(key)?,
161 envelope,
162 "data envelope",
163 )?;
164 }
165
166 let mut ro_crate_metadata = export.ro_crate_metadata;
167 annotate_ro_crate_package_files(&mut ro_crate_metadata, &files)?;
168 add_json_package_file(
169 &mut files,
170 RO_CRATE_METADATA_FILE,
171 &ro_crate_metadata,
172 "RO-Crate metadata",
173 )?;
174
175 Ok(ResearchProvenancePackage {
176 schema_version: RESEARCH_PROVENANCE_SCHEMA_VERSION,
177 files,
178 })
179}
180
181pub fn validate_research_provenance_package_files(
182 files: &BTreeMap<String, Vec<u8>>,
183) -> Result<ResearchProvenancePackageValidation> {
184 if files.is_empty() {
185 return Err(DagMlError::RuntimeValidation(
186 "research provenance package has no files".to_string(),
187 ));
188 }
189 for path in files.keys() {
190 validate_package_path(path)?;
191 }
192 require_package_file(files, EXECUTION_PLAN_FILE)?;
193 require_package_file(files, EXECUTION_BUNDLE_FILE)?;
194 require_package_file(files, LINEAGE_RECORDS_FILE)?;
195 require_package_file(files, PROV_JSONLD_FILE)?;
196 let ro_crate_metadata: Value = parse_package_json(
197 require_package_file(files, RO_CRATE_METADATA_FILE)?,
198 RO_CRATE_METADATA_FILE,
199 )?;
200
201 let checksummed_file_count = validate_ro_crate_package_checksums(&ro_crate_metadata, files)?;
202 validate_prov_jsonld_root(parse_package_json(
203 require_package_file(files, PROV_JSONLD_FILE)?,
204 PROV_JSONLD_FILE,
205 )?)?;
206
207 let plan: ExecutionPlan = parse_package_json(
208 require_package_file(files, EXECUTION_PLAN_FILE)?,
209 EXECUTION_PLAN_FILE,
210 )?;
211 let bundle: ExecutionBundle = parse_package_json(
212 require_package_file(files, EXECUTION_BUNDLE_FILE)?,
213 EXECUTION_BUNDLE_FILE,
214 )?;
215 let lineage: Vec<LineageRecord> = parse_package_json(
216 require_package_file(files, LINEAGE_RECORDS_FILE)?,
217 LINEAGE_RECORDS_FILE,
218 )?;
219 let data_envelopes = parse_package_data_envelopes(files)?;
220 let prediction_cache_manifest: Option<FilePredictionCacheManifest> = files
221 .get(FILE_PREDICTION_CACHE_MANIFEST_FILE)
222 .map(|bytes| parse_package_json(bytes, FILE_PREDICTION_CACHE_MANIFEST_FILE))
223 .transpose()?;
224 let artifact_manifest: Option<FileArtifactManifest> = files
225 .get(FILE_ARTIFACT_MANIFEST_FILE)
226 .map(|bytes| parse_package_json(bytes, FILE_ARTIFACT_MANIFEST_FILE))
227 .transpose()?;
228
229 validate_provenance_inputs(
230 &plan,
231 &bundle,
232 &lineage,
233 &data_envelopes,
234 prediction_cache_manifest.as_ref(),
235 artifact_manifest.as_ref(),
236 )?;
237
238 Ok(ResearchProvenancePackageValidation {
239 schema_version: RESEARCH_PROVENANCE_SCHEMA_VERSION,
240 plan_id: plan.id.to_string(),
241 bundle_id: bundle.bundle_id.to_string(),
242 file_count: files.len(),
243 checksummed_file_count,
244 lineage_record_count: lineage.len(),
245 data_envelope_count: data_envelopes.len(),
246 has_prediction_cache_manifest: prediction_cache_manifest.is_some(),
247 has_artifact_manifest: artifact_manifest.is_some(),
248 })
249}
250
251pub fn build_openlineage_run_event_from_package_files(
252 files: &BTreeMap<String, Vec<u8>>,
253 namespace: &str,
254 event_time: &str,
255) -> Result<Value> {
256 validate_research_provenance_package_files(files)?;
257 let plan: ExecutionPlan = parse_package_json(
258 require_package_file(files, EXECUTION_PLAN_FILE)?,
259 EXECUTION_PLAN_FILE,
260 )?;
261 let bundle: ExecutionBundle = parse_package_json(
262 require_package_file(files, EXECUTION_BUNDLE_FILE)?,
263 EXECUTION_BUNDLE_FILE,
264 )?;
265 let lineage: Vec<LineageRecord> = parse_package_json(
266 require_package_file(files, LINEAGE_RECORDS_FILE)?,
267 LINEAGE_RECORDS_FILE,
268 )?;
269 let data_envelopes = parse_package_data_envelopes(files)?;
270 let prediction_cache_manifest: Option<FilePredictionCacheManifest> = files
271 .get(FILE_PREDICTION_CACHE_MANIFEST_FILE)
272 .map(|bytes| parse_package_json(bytes, FILE_PREDICTION_CACHE_MANIFEST_FILE))
273 .transpose()?;
274 let artifact_manifest: Option<FileArtifactManifest> = files
275 .get(FILE_ARTIFACT_MANIFEST_FILE)
276 .map(|bytes| parse_package_json(bytes, FILE_ARTIFACT_MANIFEST_FILE))
277 .transpose()?;
278 let options = OpenLineageRunEventOptions {
279 namespace: namespace.to_string(),
280 event_time: event_time.to_string(),
281 };
282
283 build_openlineage_run_event(
284 &plan,
285 &bundle,
286 &lineage,
287 &data_envelopes,
288 prediction_cache_manifest.as_ref(),
289 artifact_manifest.as_ref(),
290 &options,
291 )
292}
293
294pub fn build_openlineage_run_event(
295 plan: &ExecutionPlan,
296 bundle: &ExecutionBundle,
297 lineage: &[LineageRecord],
298 data_envelopes: &BTreeMap<String, ExternalDataPlanEnvelope>,
299 prediction_cache_manifest: Option<&FilePredictionCacheManifest>,
300 artifact_manifest: Option<&FileArtifactManifest>,
301 options: &OpenLineageRunEventOptions,
302) -> Result<Value> {
303 validate_provenance_inputs(
304 plan,
305 bundle,
306 lineage,
307 data_envelopes,
308 prediction_cache_manifest,
309 artifact_manifest,
310 )?;
311 validate_openlineage_namespace(options.namespace.as_str())?;
312 validate_openlineage_event_time(options.event_time.as_str())?;
313
314 Ok(json!({
315 "eventType": "COMPLETE",
316 "eventTime": options.event_time.as_str(),
317 "run": {
318 "runId": openlineage_run_id(plan, bundle),
319 "facets": {
320 "dagml_reproducibility": dagml_openlineage_reproducibility_run_facet(plan, bundle),
321 "dagml_oof_safety": dagml_openlineage_oof_safety_run_facet(bundle, lineage),
322 },
323 },
324 "job": {
325 "namespace": options.namespace.as_str(),
326 "name": format!("{}::{}", plan.id, bundle.bundle_id),
327 "facets": {
328 "dagml_plan": dagml_openlineage_plan_job_facet(plan, bundle),
329 },
330 },
331 "inputs": openlineage_input_datasets(bundle, data_envelopes),
332 "outputs": openlineage_output_datasets(bundle, prediction_cache_manifest, artifact_manifest),
333 "producer": "https://github.com/GBeurier/dag-ml",
334 "schemaURL": OPENLINEAGE_RUN_EVENT_SCHEMA_URL,
335 }))
336}
337
338fn validate_provenance_inputs(
339 plan: &ExecutionPlan,
340 bundle: &ExecutionBundle,
341 lineage: &[LineageRecord],
342 data_envelopes: &BTreeMap<String, ExternalDataPlanEnvelope>,
343 prediction_cache_manifest: Option<&FilePredictionCacheManifest>,
344 artifact_manifest: Option<&FileArtifactManifest>,
345) -> Result<()> {
346 plan.validate()?;
347 bundle.validate_against_plan(plan)?;
348 if !data_envelopes.is_empty() {
349 bundle.validate_replay_envelopes(data_envelopes)?;
350 }
351 if let Some(manifest) = prediction_cache_manifest {
352 manifest.validate_against_bundle(bundle)?;
353 }
354 if let Some(manifest) = artifact_manifest {
355 manifest.validate_against_bundle(bundle)?;
356 }
357
358 let mut lineage_ids = BTreeSet::<&LineageId>::new();
359 for record in lineage {
360 record.validate()?;
361 if !plan.node_plans.contains_key(&record.node_id) {
362 return Err(DagMlError::RuntimeValidation(format!(
363 "provenance lineage `{}` references unknown node `{}`",
364 record.record_id, record.node_id
365 )));
366 }
367 if !plan
368 .controller_manifests
369 .contains_key(&record.controller_id)
370 {
371 return Err(DagMlError::RuntimeValidation(format!(
372 "provenance lineage `{}` references unknown controller `{}`",
373 record.record_id, record.controller_id
374 )));
375 }
376 if !lineage_ids.insert(&record.record_id) {
377 return Err(DagMlError::RuntimeValidation(format!(
378 "duplicate provenance lineage record `{}`",
379 record.record_id
380 )));
381 }
382 }
383 for record in lineage {
384 for input_id in &record.input_lineage {
385 if !lineage_ids.contains(input_id) {
386 return Err(DagMlError::RuntimeValidation(format!(
387 "provenance lineage `{}` references missing input lineage `{}`",
388 record.record_id, input_id
389 )));
390 }
391 }
392 }
393 Ok(())
394}
395
/// Builds a W3C PROV document, with PROV-JSON-style sections and a JSON-LD
/// `@context` that binds the `prov` and `dagml` prefixes.
///
/// Sections produced:
/// - `entity`:
///   - the execution plan and the execution bundle;
///   - every bundle data requirement and every supplied data envelope;
///   - every bundle prediction requirement and prediction cache;
///   - the optional prediction-cache and artifact manifest files;
///   - every refit artifact and every lineage record.
/// - `agent`: the `dag-ml` coordinator plus one agent per controller manifest in the plan.
/// - `activity`: one bundle-packaging activity plus one node-execution activity
///   per lineage record.
/// - `used`, `wasGeneratedBy`, `wasDerivedFrom`, `wasAssociatedWith`: the
///   relations connecting the above (see the inline comments).
///
/// Each section is a `BTreeMap`: ids are emitted in sorted order, and an id
/// inserted twice keeps only its last value.
///
/// This function does not re-check cross references. Its caller in this file
/// (`build_research_provenance_export`) runs `validate_provenance_inputs` first.
/// The body never returns `Err`.
fn build_prov_jsonld(
    plan: &ExecutionPlan,
    bundle: &ExecutionBundle,
    lineage: &[LineageRecord],
    data_envelopes: &BTreeMap<String, ExternalDataPlanEnvelope>,
    prediction_cache_manifest: Option<&FilePredictionCacheManifest>,
    artifact_manifest: Option<&FileArtifactManifest>,
) -> Result<Value> {
    // Identifiers shared between the node sections and the relation sections below.
    let plan_entity_id = format!("dagml:execution-plan:{}", plan.id);
    let bundle_entity_id = format!("dagml:execution-bundle:{}", bundle.bundle_id);
    let packaging_activity_id = format!("dagml:activity:package-bundle:{}", bundle.bundle_id);
    let coordinator_agent_id = "dagml:agent:dag-ml".to_string();

    // ---- Entities ----
    let mut entity = BTreeMap::<String, Value>::new();
    entity.insert(
        plan_entity_id.clone(),
        json!({
            "prov:type": ["prov:Entity", "dagml:ExecutionPlan"],
            "dagml:plan_id": plan.id,
            "dagml:graph_fingerprint": plan.graph_fingerprint,
            "dagml:campaign_fingerprint": plan.campaign_fingerprint,
            "dagml:controller_fingerprint": plan.controller_fingerprint,
            "dagml:variant_count": plan.variants.len(),
            "dagml:has_fold_set": plan.fold_set.is_some(),
        }),
    );
    entity.insert(
        bundle_entity_id.clone(),
        json!({
            "prov:type": ["prov:Entity", "dagml:ExecutionBundle"],
            "dagml:bundle_id": bundle.bundle_id,
            "dagml:schema_version": bundle.schema_version,
            "dagml:plan_id": bundle.plan_id,
            "dagml:selected_variant_id": bundle.selected_variant_id,
            "dagml:graph_fingerprint": bundle.graph_fingerprint,
            "dagml:campaign_fingerprint": bundle.campaign_fingerprint,
            "dagml:controller_fingerprint": bundle.controller_fingerprint,
            "dagml:unsafe_flags": bundle.unsafe_flags,
            "dagml:selection_count": bundle.selections.len(),
        }),
    );

    // One entity per data requirement declared by the bundle, keyed by `requirement.key()`.
    for requirement in &bundle.data_requirements {
        let key = requirement.key();
        entity.insert(
            data_requirement_entity_id(&key),
            json!({
                "prov:type": ["prov:Entity", "dagml:DataRequirement"],
                "dagml:requirement_key": key,
                "dagml:node_id": requirement.node_id,
                "dagml:input_name": requirement.input_name,
                "dagml:schema_fingerprint": requirement.schema_fingerprint,
                "dagml:plan_fingerprint": requirement.plan_fingerprint,
                "dagml:relation_fingerprint": requirement.relation_fingerprint,
                "dagml:feature_set_id": requirement.feature_set_id,
            }),
        );
    }
    // One entity per supplied data envelope, keyed by its map key.
    for (key, envelope) in data_envelopes {
        entity.insert(
            data_envelope_entity_id(key),
            json!({
                "prov:type": ["prov:Entity", "dagml:ExternalDataPlanEnvelope"],
                "dagml:envelope_key": key,
                "dagml:schema_version": envelope.schema_version,
                "dagml:schema_fingerprint": envelope.schema_fingerprint,
                "dagml:plan_fingerprint": envelope.plan_fingerprint,
                "dagml:relation_fingerprint": envelope.relation_fingerprint,
            }),
        );
    }
    for requirement in &bundle.prediction_requirements {
        let key = requirement.key();
        entity.insert(
            prediction_requirement_entity_id(&key),
            json!({
                "prov:type": ["prov:Entity", "dagml:PredictionRequirement"],
                "dagml:requirement_key": key,
                "dagml:producer_node": requirement.producer_node,
                "dagml:consumer_node": requirement.consumer_node,
                "dagml:prediction_level": requirement.prediction_level,
                "dagml:fold_ids": requirement.fold_ids,
                "dagml:unit_ids": requirement.unit_ids,
                "dagml:sample_ids": requirement.sample_ids,
                "dagml:prediction_width": requirement.prediction_width,
                "dagml:target_names": requirement.target_names,
            }),
        );
    }
    for cache in &bundle.prediction_caches {
        entity.insert(
            prediction_cache_entity_id(&cache.cache_id),
            json!({
                "prov:type": ["prov:Entity", "dagml:PredictionCache"],
                "dagml:requirement_key": cache.requirement_key,
                "dagml:cache_id": cache.cache_id,
                "dagml:format": cache.format,
                "dagml:prediction_level": cache.prediction_level,
                "dagml:unit_ids": cache.unit_ids,
                "dagml:block_count": cache.block_count,
                "dagml:row_count": cache.row_count,
                "dagml:content_fingerprint": cache.content_fingerprint,
            }),
        );
    }
    // Manifest file entities exist only when the corresponding manifest was supplied.
    if let Some(manifest) = prediction_cache_manifest {
        entity.insert(
            "dagml:file:prediction-cache-manifest".to_string(),
            json!({
                "prov:type": ["prov:Entity", "dagml:PredictionCacheManifest"],
                "dagml:file": FILE_PREDICTION_CACHE_MANIFEST_FILE,
                "dagml:schema_version": manifest.schema_version,
                "dagml:cache_count": manifest.caches.len(),
            }),
        );
    }
    for record in &bundle.refit_artifacts {
        entity.insert(
            artifact_entity_id(&record.artifact.id),
            json!({
                "prov:type": ["prov:Entity", "dagml:ModelArtifact"],
                "dagml:artifact_id": record.artifact.id,
                "dagml:kind": record.artifact.kind,
                "dagml:node_id": record.node_id,
                "dagml:controller_id": record.controller_id,
                "dagml:backend": record.artifact.backend,
                "dagml:uri": record.artifact.uri,
                "dagml:content_fingerprint": record.artifact.content_fingerprint,
                "dagml:size_bytes": record.artifact.size_bytes,
                "dagml:plugin": record.artifact.plugin,
                "dagml:plugin_version": record.artifact.plugin_version,
                "dagml:params_fingerprint": record.params_fingerprint,
                "dagml:data_requirement_keys": record.data_requirement_keys,
                "dagml:prediction_requirement_keys": record.prediction_requirement_keys,
            }),
        );
    }
    if let Some(manifest) = artifact_manifest {
        entity.insert(
            "dagml:file:artifact-manifest".to_string(),
            json!({
                "prov:type": ["prov:Entity", "dagml:ArtifactManifest"],
                "dagml:file": FILE_ARTIFACT_MANIFEST_FILE,
                "dagml:schema_version": manifest.schema_version,
                "dagml:artifact_count": manifest.artifacts.len(),
            }),
        );
    }
    // Each lineage record is an entity; only the ids of its artifact refs are kept.
    for record in lineage {
        entity.insert(
            lineage_record_entity_id(&record.record_id),
            json!({
                "prov:type": ["prov:Entity", "dagml:LineageRecord"],
                "dagml:lineage_id": record.record_id,
                "dagml:run_id": record.run_id,
                "dagml:node_id": record.node_id,
                "dagml:phase": record.phase,
                "dagml:controller_id": record.controller_id,
                "dagml:variant_id": record.variant_id,
                "dagml:fold_id": record.fold_id,
                "dagml:branch_path": record.branch_path,
                "dagml:input_lineage": record.input_lineage,
                "dagml:artifact_refs": record
                    .artifact_refs
                    .iter()
                    .map(|artifact| artifact.id.clone())
                    .collect::<Vec<_>>(),
            }),
        );
    }

    // ---- Agents: the coordinator plus one per controller manifest in the plan ----
    let mut agent = BTreeMap::<String, Value>::new();
    agent.insert(
        coordinator_agent_id.clone(),
        json!({
            "prov:type": ["prov:Agent", "dagml:Coordinator"],
            "dagml:name": "dag-ml",
            "dagml:provenance_schema_version": RESEARCH_PROVENANCE_SCHEMA_VERSION,
        }),
    );
    for manifest in plan.controller_manifests.values() {
        agent.insert(
            controller_agent_id(manifest.controller_id.as_str()),
            json!({
                "prov:type": ["prov:Agent", "dagml:Controller"],
                "dagml:controller_id": manifest.controller_id,
                "dagml:controller_version": manifest.controller_version,
                "dagml:operator_kind": manifest.operator_kind,
                "dagml:fit_scope": manifest.fit_scope,
                "dagml:rng_policy": manifest.rng_policy,
                "dagml:artifact_policy": manifest.artifact_policy,
                "dagml:capabilities": manifest.capabilities,
            }),
        );
    }

    // ---- Activities: bundle packaging plus one node execution per lineage record ----
    let mut activity = BTreeMap::<String, Value>::new();
    activity.insert(
        packaging_activity_id.clone(),
        json!({
            "prov:type": ["prov:Activity", "dagml:BundlePackaging"],
            "dagml:bundle_id": bundle.bundle_id,
            "dagml:plan_id": bundle.plan_id,
            "dagml:selected_variant_id": bundle.selected_variant_id,
        }),
    );
    for record in lineage {
        activity.insert(
            lineage_activity_id(record),
            json!({
                "prov:type": ["prov:Activity", "dagml:NodeExecution"],
                "dagml:lineage_id": record.record_id,
                "dagml:run_id": record.run_id,
                "dagml:node_id": record.node_id,
                "dagml:phase": record.phase,
                "dagml:controller_id": record.controller_id,
                "dagml:controller_version": record.controller_version,
                "dagml:variant_id": record.variant_id,
                "dagml:fold_id": record.fold_id,
                "dagml:branch_path": record.branch_path,
                "dagml:params_fingerprint": record.params_fingerprint,
                "dagml:data_model_shape_fingerprint": record.data_model_shape_fingerprint,
                "dagml:aggregation_policy_fingerprint": record.aggregation_policy_fingerprint,
                "dagml:seed": record.seed,
                "dagml:unsafe_flags": record.unsafe_flags,
                "dagml:metrics": record.metrics,
            }),
        );
    }

    // ---- used: packaging uses the plan; each node execution uses its input lineage records ----
    let mut used = BTreeMap::<String, Value>::new();
    used.insert(
        "dagml:used:bundle-plan".to_string(),
        json!({
            "prov:activity": packaging_activity_id,
            "prov:entity": plan_entity_id,
        }),
    );
    for record in lineage {
        for input_id in &record.input_lineage {
            used.insert(
                format!("dagml:used:{}:{}", record.record_id, input_id),
                json!({
                    "prov:activity": lineage_activity_id(record),
                    "prov:entity": lineage_record_entity_id(input_id),
                    "dagml:input_lineage_id": input_id,
                }),
            );
        }
    }

    // ---- wasGeneratedBy ----
    // `lineage_artifact_index` (defined elsewhere) maps an artifact id to an activity id.
    // Artifacts missing from it are credited to the packaging activity.
    let lineage_by_artifact = lineage_artifact_index(lineage);
    let mut was_generated_by = BTreeMap::<String, Value>::new();
    was_generated_by.insert(
        "dagml:generated:bundle".to_string(),
        json!({
            "prov:entity": bundle_entity_id,
            "prov:activity": packaging_activity_id,
        }),
    );
    for record in lineage {
        was_generated_by.insert(
            format!("dagml:generated:lineage:{}", record.record_id),
            json!({
                "prov:entity": lineage_record_entity_id(&record.record_id),
                "prov:activity": lineage_activity_id(record),
            }),
        );
    }
    for record in &bundle.refit_artifacts {
        let activity_id = lineage_by_artifact
            .get(&record.artifact.id)
            .cloned()
            .unwrap_or_else(|| packaging_activity_id.clone());
        was_generated_by.insert(
            format!("dagml:generated:artifact:{}", record.artifact.id),
            json!({
                "prov:entity": artifact_entity_id(&record.artifact.id),
                "prov:activity": activity_id,
            }),
        );
    }

    // ---- wasDerivedFrom ----
    // - the bundle derives from the plan;
    // - refit artifacts derive from their data and prediction requirements
    //   (prediction edges are flagged `dagml:oof_dependency`);
    // - caches derive from their prediction requirement;
    // - lineage records derive from their inputs.
    let mut was_derived_from = BTreeMap::<String, Value>::new();
    was_derived_from.insert(
        "dagml:derived:bundle-plan".to_string(),
        json!({
            "prov:generatedEntity": bundle_entity_id,
            "prov:usedEntity": plan_entity_id,
        }),
    );
    for record in &bundle.refit_artifacts {
        for key in &record.data_requirement_keys {
            was_derived_from.insert(
                format!("dagml:derived:{}:data:{key}", record.artifact.id),
                json!({
                    "prov:generatedEntity": artifact_entity_id(&record.artifact.id),
                    "prov:usedEntity": data_requirement_entity_id(key),
                    "dagml:refit_dependency": "data_requirement",
                }),
            );
        }
        for key in &record.prediction_requirement_keys {
            was_derived_from.insert(
                format!("dagml:derived:{}:prediction:{key}", record.artifact.id),
                json!({
                    "prov:generatedEntity": artifact_entity_id(&record.artifact.id),
                    "prov:usedEntity": prediction_requirement_entity_id(key),
                    "dagml:refit_dependency": "prediction_requirement",
                    "dagml:oof_dependency": true,
                }),
            );
        }
    }
    for cache in &bundle.prediction_caches {
        was_derived_from.insert(
            format!("dagml:derived:cache:{}", cache.cache_id),
            json!({
                "prov:generatedEntity": prediction_cache_entity_id(&cache.cache_id),
                "prov:usedEntity": prediction_requirement_entity_id(&cache.requirement_key),
            }),
        );
    }
    for record in lineage {
        for input_id in &record.input_lineage {
            was_derived_from.insert(
                format!("dagml:derived:lineage:{}:{input_id}", record.record_id),
                json!({
                    "prov:generatedEntity": lineage_record_entity_id(&record.record_id),
                    "prov:usedEntity": lineage_record_entity_id(input_id),
                    "dagml:lineage_dependency": true,
                }),
            );
        }
    }

    // ---- wasAssociatedWith ----
    // Packaging is associated with the coordinator; each node execution is
    // associated with its controller.
    let mut was_associated_with = BTreeMap::<String, Value>::new();
    was_associated_with.insert(
        "dagml:associated:bundle-packaging".to_string(),
        json!({
            "prov:activity": packaging_activity_id,
            "prov:agent": coordinator_agent_id,
        }),
    );
    for record in lineage {
        was_associated_with.insert(
            format!("dagml:associated:{}", record.record_id),
            json!({
                "prov:activity": lineage_activity_id(record),
                "prov:agent": controller_agent_id(record.controller_id.as_str()),
            }),
        );
    }

    Ok(json!({
        "@context": {
            "prov": "http://www.w3.org/ns/prov#",
            "dagml": "https://dag-ml.dev/ns#",
        },
        "entity": entity,
        "activity": activity,
        "agent": agent,
        "used": used,
        "wasGeneratedBy": was_generated_by,
        "wasDerivedFrom": was_derived_from,
        "wasAssociatedWith": was_associated_with,
    }))
}
764
765fn build_ro_crate_metadata(
766 plan: &ExecutionPlan,
767 bundle: &ExecutionBundle,
768 data_envelopes: &BTreeMap<String, ExternalDataPlanEnvelope>,
769 prediction_cache_manifest: Option<&FilePredictionCacheManifest>,
770 artifact_manifest: Option<&FileArtifactManifest>,
771) -> Result<Value> {
772 let mut has_part = vec![
773 json!({"@id": "execution_plan.json"}),
774 json!({"@id": "execution_bundle.json"}),
775 json!({"@id": PROV_JSONLD_FILE}),
776 ];
777 let mut graph = vec![
778 json!({
779 "@id": RO_CRATE_METADATA_FILE,
780 "@type": "CreativeWork",
781 "about": {"@id": "./"},
782 "conformsTo": {"@id": "https://w3id.org/ro/crate/1.1"},
783 }),
784 json!({
785 "@id": "./",
786 "@type": "Dataset",
787 "name": format!("DAG-ML research bundle {}", bundle.bundle_id),
788 "mainEntity": {"@id": "#workflow"},
789 "hasPart": has_part.clone(),
790 "dagml:schema_version": RESEARCH_PROVENANCE_SCHEMA_VERSION,
791 "dagml:bundle_id": bundle.bundle_id,
792 "dagml:plan_id": plan.id,
793 "dagml:unsafe_flags": bundle.unsafe_flags,
794 }),
795 json!({
796 "@id": "#workflow",
797 "@type": ["ComputationalWorkflow", "SoftwareSourceCode"],
798 "name": "DAG-ML compiled workflow",
799 "programmingLanguage": "Rust",
800 "dagml:plan_id": plan.id,
801 "dagml:graph_fingerprint": plan.graph_fingerprint,
802 "dagml:campaign_fingerprint": plan.campaign_fingerprint,
803 "dagml:controller_fingerprint": plan.controller_fingerprint,
804 "dagml:selected_variant_id": bundle.selected_variant_id,
805 "dagml:variant_count": plan.variants.len(),
806 }),
807 file_entity(
808 "execution_plan.json",
809 "DAG-ML execution plan",
810 "dagml:ExecutionPlan",
811 ),
812 file_entity(
813 "execution_bundle.json",
814 "DAG-ML execution bundle",
815 "dagml:ExecutionBundle",
816 ),
817 file_entity(PROV_JSONLD_FILE, "DAG-ML W3C PROV export", "prov:Bundle"),
818 ];
819
820 if prediction_cache_manifest.is_some() {
821 has_part.push(json!({"@id": FILE_PREDICTION_CACHE_MANIFEST_FILE}));
822 graph.push(file_entity(
823 FILE_PREDICTION_CACHE_MANIFEST_FILE,
824 "DAG-ML prediction cache manifest",
825 "dagml:PredictionCacheManifest",
826 ));
827 }
828 if artifact_manifest.is_some() {
829 has_part.push(json!({"@id": FILE_ARTIFACT_MANIFEST_FILE}));
830 graph.push(file_entity(
831 FILE_ARTIFACT_MANIFEST_FILE,
832 "DAG-ML artifact manifest",
833 "dagml:ArtifactManifest",
834 ));
835 }
836 for (key, envelope) in data_envelopes {
837 let id = format!("data_envelopes/{key}.json");
838 has_part.push(json!({"@id": id}));
839 graph.push(json!({
840 "@id": id,
841 "@type": ["File", "dagml:ExternalDataPlanEnvelope"],
842 "name": format!("DAG-ML data envelope {key}"),
843 "dagml:envelope_key": key,
844 "dagml:schema_version": envelope.schema_version,
845 "dagml:schema_fingerprint": envelope.schema_fingerprint,
846 "dagml:plan_fingerprint": envelope.plan_fingerprint,
847 "dagml:relation_fingerprint": envelope.relation_fingerprint,
848 }));
849 }
850
851 graph[1]["hasPart"] = Value::Array(has_part);
852
853 for manifest in plan.controller_manifests.values() {
854 graph.push(json!({
855 "@id": controller_agent_id(manifest.controller_id.as_str()),
856 "@type": ["SoftwareApplication", "dagml:Controller"],
857 "name": manifest.controller_id,
858 "softwareVersion": manifest.controller_version,
859 "dagml:operator_kind": manifest.operator_kind,
860 "dagml:capabilities": manifest.capabilities,
861 "dagml:artifact_policy": manifest.artifact_policy,
862 }));
863 }
864 for artifact in &bundle.refit_artifacts {
865 graph.push(json!({
866 "@id": artifact_entity_id(&artifact.artifact.id),
867 "@type": ["File", "dagml:ModelArtifact"],
868 "name": artifact.artifact.id,
869 "encodingFormat": artifact.artifact.kind,
870 "dagml:node_id": artifact.node_id,
871 "dagml:controller_id": artifact.controller_id,
872 "dagml:backend": artifact.artifact.backend,
873 "dagml:uri": artifact.artifact.uri,
874 "dagml:content_fingerprint": artifact.artifact.content_fingerprint,
875 "dagml:plugin": artifact.artifact.plugin,
876 "dagml:plugin_version": artifact.artifact.plugin_version,
877 "dagml:refit_data_requirement_keys": artifact.data_requirement_keys,
878 "dagml:refit_prediction_requirement_keys": artifact.prediction_requirement_keys,
879 }));
880 }
881
882 Ok(json!({
883 "@context": [
884 "https://w3id.org/ro/crate/1.1/context",
885 {
886 "dagml": "https://dag-ml.dev/ns#",
887 "prov": "http://www.w3.org/ns/prov#",
888 }
889 ],
890 "@graph": graph,
891 }))
892}
893
894fn file_entity(id: &str, name: &str, dagml_type: &str) -> Value {
895 json!({
896 "@id": id,
897 "@type": ["File", dagml_type],
898 "name": name,
899 })
900}
901
902fn add_json_package_file<T: Serialize + ?Sized>(
903 files: &mut BTreeMap<String, ResearchProvenancePackageFile>,
904 path: &str,
905 value: &T,
906 label: &str,
907) -> Result<()> {
908 validate_package_path(path)?;
909 let mut bytes = serde_json::to_vec_pretty(value).map_err(|err| {
910 DagMlError::RuntimeValidation(format!("failed to serialize {label}: {err}"))
911 })?;
912 bytes.push(b'\n');
913 let sha256 = sha256_hex(&bytes);
914 let previous = files.insert(
915 path.to_string(),
916 ResearchProvenancePackageFile {
917 path: path.to_string(),
918 sha256,
919 size_bytes: bytes.len(),
920 bytes,
921 },
922 );
923 if previous.is_some() {
924 return Err(DagMlError::RuntimeValidation(format!(
925 "duplicate research provenance package file `{path}`"
926 )));
927 }
928 Ok(())
929}
930
931fn validate_package_path(path: &str) -> Result<()> {
932 if path.is_empty() {
933 return Err(DagMlError::RuntimeValidation(
934 "research provenance package path is empty".to_string(),
935 ));
936 }
937 if path.starts_with('/') || path.starts_with('\\') {
938 return Err(DagMlError::RuntimeValidation(format!(
939 "research provenance package path `{path}` must be relative"
940 )));
941 }
942 if path.chars().any(char::is_control) {
943 return Err(DagMlError::RuntimeValidation(format!(
944 "research provenance package path `{path}` has control characters"
945 )));
946 }
947 for segment in path.split(['/', '\\']) {
948 if segment.is_empty() || segment == "." || segment == ".." {
949 return Err(DagMlError::RuntimeValidation(format!(
950 "research provenance package path `{path}` has an invalid path component"
951 )));
952 }
953 }
954 Ok(())
955}
956
957fn data_envelope_file_path(key: &str) -> Result<String> {
958 if key.contains(['/', '\\']) {
959 return Err(DagMlError::RuntimeValidation(format!(
960 "data envelope key `{key}` cannot be used as a research provenance package path"
961 )));
962 }
963 Ok(format!("data_envelopes/{key}.json"))
964}
965
966fn require_package_file<'a>(files: &'a BTreeMap<String, Vec<u8>>, path: &str) -> Result<&'a [u8]> {
967 files.get(path).map(Vec::as_slice).ok_or_else(|| {
968 DagMlError::RuntimeValidation(format!("research provenance package is missing `{path}`"))
969 })
970}
971
972fn parse_package_json<T: DeserializeOwned>(bytes: &[u8], path: &str) -> Result<T> {
973 serde_json::from_slice(bytes).map_err(|err| {
974 DagMlError::RuntimeValidation(format!(
975 "failed to parse research provenance package JSON `{path}`: {err}"
976 ))
977 })
978}
979
980fn parse_package_data_envelopes(
981 files: &BTreeMap<String, Vec<u8>>,
982) -> Result<BTreeMap<String, ExternalDataPlanEnvelope>> {
983 let mut envelopes = BTreeMap::new();
984 for (path, bytes) in files {
985 let Some(key) = path
986 .strip_prefix("data_envelopes/")
987 .and_then(|suffix| suffix.strip_suffix(".json"))
988 else {
989 continue;
990 };
991 if key.is_empty() || key.contains(['/', '\\']) {
992 return Err(DagMlError::RuntimeValidation(format!(
993 "research provenance data envelope path `{path}` has an invalid key"
994 )));
995 }
996 let previous = envelopes.insert(key.to_string(), parse_package_json(bytes, path)?);
997 if previous.is_some() {
998 return Err(DagMlError::RuntimeValidation(format!(
999 "duplicate research provenance data envelope key `{key}`"
1000 )));
1001 }
1002 }
1003 Ok(envelopes)
1004}
1005
1006fn validate_prov_jsonld_root(prov_jsonld: Value) -> Result<()> {
1007 if prov_jsonld.get("@context").is_none()
1008 || prov_jsonld.get("entity").is_none()
1009 || prov_jsonld.get("activity").is_none()
1010 || prov_jsonld.get("agent").is_none()
1011 {
1012 return Err(DagMlError::RuntimeValidation(
1013 "research provenance PROV JSON-LD root is missing required sections".to_string(),
1014 ));
1015 }
1016 Ok(())
1017}
1018
1019fn validate_openlineage_namespace(namespace: &str) -> Result<()> {
1020 if namespace.trim().is_empty() {
1021 return Err(DagMlError::RuntimeValidation(
1022 "OpenLineage namespace must not be empty".to_string(),
1023 ));
1024 }
1025 if namespace.chars().any(char::is_control) {
1026 return Err(DagMlError::RuntimeValidation(
1027 "OpenLineage namespace contains control characters".to_string(),
1028 ));
1029 }
1030 Ok(())
1031}
1032
1033fn validate_openlineage_event_time(event_time: &str) -> Result<()> {
1034 if event_time.trim().is_empty() || !event_time.contains('T') {
1035 return Err(DagMlError::RuntimeValidation(
1036 "OpenLineage event_time must be a non-empty RFC3339-like timestamp".to_string(),
1037 ));
1038 }
1039 if event_time.chars().any(char::is_control) {
1040 return Err(DagMlError::RuntimeValidation(
1041 "OpenLineage event_time contains control characters".to_string(),
1042 ));
1043 }
1044 Ok(())
1045}
1046
1047fn dagml_openlineage_reproducibility_run_facet(
1048 plan: &ExecutionPlan,
1049 bundle: &ExecutionBundle,
1050) -> Value {
1051 json!({
1052 "_schemaURL": format!("{DAGML_OPENLINEAGE_FACET_SCHEMA_URL}#/$defs/DagmlReproducibilityRunFacet"),
1053 "plan_id": plan.id,
1054 "bundle_id": bundle.bundle_id,
1055 "graph_fingerprint": bundle.graph_fingerprint,
1056 "campaign_fingerprint": bundle.campaign_fingerprint,
1057 "controller_fingerprint": bundle.controller_fingerprint,
1058 "selected_variant_id": bundle.selected_variant_id,
1059 "variant_count": plan.variants.len(),
1060 "unsafe_flags": bundle.unsafe_flags,
1061 })
1062}
1063
1064fn dagml_openlineage_oof_safety_run_facet(
1065 bundle: &ExecutionBundle,
1066 lineage: &[LineageRecord],
1067) -> Value {
1068 json!({
1069 "_schemaURL": format!("{DAGML_OPENLINEAGE_FACET_SCHEMA_URL}#/$defs/DagmlOofSafetyRunFacet"),
1070 "prediction_requirement_count": bundle.prediction_requirements.len(),
1071 "prediction_cache_count": bundle.prediction_caches.len(),
1072 "lineage_record_count": lineage.len(),
1073 "requires_oof_prediction_count": bundle.prediction_requirements.len(),
1074 "refit_artifact_count": bundle.refit_artifacts.len(),
1075 })
1076}
1077
1078fn dagml_openlineage_plan_job_facet(plan: &ExecutionPlan, bundle: &ExecutionBundle) -> Value {
1079 json!({
1080 "_schemaURL": format!("{DAGML_OPENLINEAGE_FACET_SCHEMA_URL}#/$defs/DagmlPlanJobFacet"),
1081 "plan_id": plan.id,
1082 "bundle_id": bundle.bundle_id,
1083 "node_count": plan.node_plans.len(),
1084 "controller_count": plan.controller_manifests.len(),
1085 "has_fold_set": plan.fold_set.is_some(),
1086 "selected_variant_id": bundle.selected_variant_id,
1087 })
1088}
1089
1090fn openlineage_input_datasets(
1091 bundle: &ExecutionBundle,
1092 data_envelopes: &BTreeMap<String, ExternalDataPlanEnvelope>,
1093) -> Vec<Value> {
1094 bundle
1095 .data_requirements
1096 .iter()
1097 .map(|requirement| {
1098 let key = requirement.key();
1099 let envelope = data_envelopes.get(&key);
1100 json!({
1101 "namespace": "dagml:data-requirement",
1102 "name": key,
1103 "facets": {
1104 "dagml_contract": {
1105 "_schemaURL": format!("{DAGML_OPENLINEAGE_FACET_SCHEMA_URL}#/$defs/DagmlDatasetContractFacet"),
1106 "node_id": requirement.node_id,
1107 "input_name": requirement.input_name,
1108 "schema_fingerprint": requirement.schema_fingerprint,
1109 "plan_fingerprint": requirement.plan_fingerprint,
1110 "relation_fingerprint": requirement.relation_fingerprint,
1111 "feature_set_id": requirement.feature_set_id,
1112 "envelope_schema_fingerprint": envelope.map(|envelope| envelope.schema_fingerprint.clone()),
1113 "envelope_plan_fingerprint": envelope.map(|envelope| envelope.plan_fingerprint.clone()),
1114 }
1115 }
1116 })
1117 })
1118 .collect()
1119}
1120
1121fn openlineage_output_datasets(
1122 bundle: &ExecutionBundle,
1123 prediction_cache_manifest: Option<&FilePredictionCacheManifest>,
1124 artifact_manifest: Option<&FileArtifactManifest>,
1125) -> Vec<Value> {
1126 let mut outputs = vec![json!({
1127 "namespace": "dagml:bundle",
1128 "name": bundle.bundle_id,
1129 "facets": {
1130 "dagml_contract": {
1131 "_schemaURL": format!("{DAGML_OPENLINEAGE_FACET_SCHEMA_URL}#/$defs/DagmlDatasetContractFacet"),
1132 "schema_version": bundle.schema_version,
1133 "plan_id": bundle.plan_id,
1134 "selected_variant_id": bundle.selected_variant_id,
1135 "graph_fingerprint": bundle.graph_fingerprint,
1136 "campaign_fingerprint": bundle.campaign_fingerprint,
1137 "controller_fingerprint": bundle.controller_fingerprint,
1138 }
1139 }
1140 })];
1141 for cache in &bundle.prediction_caches {
1142 outputs.push(json!({
1143 "namespace": "dagml:prediction-cache",
1144 "name": cache.cache_id,
1145 "facets": {
1146 "dagml_contract": {
1147 "_schemaURL": format!("{DAGML_OPENLINEAGE_FACET_SCHEMA_URL}#/$defs/DagmlDatasetContractFacet"),
1148 "requirement_key": cache.requirement_key,
1149 "prediction_level": cache.prediction_level,
1150 "row_count": cache.row_count,
1151 "block_count": cache.block_count,
1152 "content_fingerprint": cache.content_fingerprint,
1153 "has_file_manifest": prediction_cache_manifest.is_some(),
1154 }
1155 }
1156 }));
1157 }
1158 for artifact in &bundle.refit_artifacts {
1159 outputs.push(json!({
1160 "namespace": "dagml:artifact",
1161 "name": artifact.artifact.id,
1162 "facets": {
1163 "dagml_contract": {
1164 "_schemaURL": format!("{DAGML_OPENLINEAGE_FACET_SCHEMA_URL}#/$defs/DagmlDatasetContractFacet"),
1165 "node_id": artifact.node_id,
1166 "controller_id": artifact.controller_id,
1167 "backend": artifact.artifact.backend,
1168 "uri": artifact.artifact.uri,
1169 "content_fingerprint": artifact.artifact.content_fingerprint,
1170 "plugin": artifact.artifact.plugin,
1171 "plugin_version": artifact.artifact.plugin_version,
1172 "data_requirement_keys": artifact.data_requirement_keys,
1173 "prediction_requirement_keys": artifact.prediction_requirement_keys,
1174 "has_file_manifest": artifact_manifest.is_some(),
1175 }
1176 }
1177 }));
1178 }
1179 outputs
1180}
1181
1182fn openlineage_run_id(plan: &ExecutionPlan, bundle: &ExecutionBundle) -> String {
1183 let input = format!("dag-ml/openlineage/run/{}/{}", plan.id, bundle.bundle_id);
1184 let digest = Sha256::digest(input.as_bytes());
1185 let mut bytes = [0u8; 16];
1186 bytes.copy_from_slice(&digest[..16]);
1187 bytes[6] = (bytes[6] & 0x0f) | 0x50;
1188 bytes[8] = (bytes[8] & 0x3f) | 0x80;
1189 format!(
1190 "{:02x}{:02x}{:02x}{:02x}-{:02x}{:02x}-{:02x}{:02x}-{:02x}{:02x}-{:02x}{:02x}{:02x}{:02x}{:02x}{:02x}",
1191 bytes[0],
1192 bytes[1],
1193 bytes[2],
1194 bytes[3],
1195 bytes[4],
1196 bytes[5],
1197 bytes[6],
1198 bytes[7],
1199 bytes[8],
1200 bytes[9],
1201 bytes[10],
1202 bytes[11],
1203 bytes[12],
1204 bytes[13],
1205 bytes[14],
1206 bytes[15],
1207 )
1208}
1209
1210fn validate_ro_crate_package_checksums(
1211 ro_crate_metadata: &Value,
1212 files: &BTreeMap<String, Vec<u8>>,
1213) -> Result<usize> {
1214 let graph = ro_crate_metadata
1215 .get("@graph")
1216 .and_then(Value::as_array)
1217 .ok_or_else(|| {
1218 DagMlError::RuntimeValidation("RO-Crate metadata has no @graph array".to_string())
1219 })?;
1220 let root = graph
1221 .iter()
1222 .find(|entry| entry.get("@id").and_then(Value::as_str) == Some("./"))
1223 .ok_or_else(|| {
1224 DagMlError::RuntimeValidation("RO-Crate metadata has no root dataset".to_string())
1225 })?;
1226 if root.get("dagml:schema_version").and_then(Value::as_u64)
1227 != Some(RESEARCH_PROVENANCE_SCHEMA_VERSION as u64)
1228 {
1229 return Err(DagMlError::RuntimeValidation(format!(
1230 "RO-Crate root has unsupported dagml:schema_version, expected {}",
1231 RESEARCH_PROVENANCE_SCHEMA_VERSION
1232 )));
1233 }
1234 let root_has_part = root
1235 .get("hasPart")
1236 .and_then(Value::as_array)
1237 .ok_or_else(|| {
1238 DagMlError::RuntimeValidation("RO-Crate root hasPart is not an array".to_string())
1239 })?;
1240 let root_has_part_ids = root_has_part
1241 .iter()
1242 .filter_map(|entry| entry.get("@id").and_then(Value::as_str))
1243 .collect::<BTreeSet<_>>();
1244
1245 let mut checksummed = 0;
1246 for (path, bytes) in files {
1247 if path == RO_CRATE_METADATA_FILE {
1248 continue;
1249 }
1250 if !root_has_part_ids.contains(path.as_str()) {
1251 return Err(DagMlError::RuntimeValidation(format!(
1252 "RO-Crate root does not list package file `{path}` in hasPart"
1253 )));
1254 }
1255 let entry = graph
1256 .iter()
1257 .find(|entry| entry.get("@id").and_then(Value::as_str) == Some(path.as_str()))
1258 .ok_or_else(|| {
1259 DagMlError::RuntimeValidation(format!(
1260 "RO-Crate metadata is missing package file entry `{path}`"
1261 ))
1262 })?;
1263 let expected_sha256 = sha256_hex(bytes);
1264 let declared_sha256 = entry.get("sha256").and_then(Value::as_str).ok_or_else(|| {
1265 DagMlError::RuntimeValidation(format!(
1266 "RO-Crate package file `{path}` is missing sha256"
1267 ))
1268 })?;
1269 if declared_sha256 != expected_sha256 {
1270 return Err(DagMlError::RuntimeValidation(format!(
1271 "RO-Crate package file `{path}` sha256 mismatch"
1272 )));
1273 }
1274 let declared_dagml_sha256 = entry
1275 .get("dagml:sha256")
1276 .and_then(Value::as_str)
1277 .ok_or_else(|| {
1278 DagMlError::RuntimeValidation(format!(
1279 "RO-Crate package file `{path}` is missing dagml:sha256"
1280 ))
1281 })?;
1282 if declared_dagml_sha256 != declared_sha256 {
1283 return Err(DagMlError::RuntimeValidation(format!(
1284 "RO-Crate package file `{path}` has inconsistent checksum fields"
1285 )));
1286 }
1287 if entry.get("contentSize").and_then(Value::as_u64) != Some(bytes.len() as u64) {
1288 return Err(DagMlError::RuntimeValidation(format!(
1289 "RO-Crate package file `{path}` contentSize mismatch"
1290 )));
1291 }
1292 if entry.get("encodingFormat").and_then(Value::as_str) != Some("application/json") {
1293 return Err(DagMlError::RuntimeValidation(format!(
1294 "RO-Crate package file `{path}` must declare application/json"
1295 )));
1296 }
1297 checksummed += 1;
1298 }
1299
1300 for has_part_id in root_has_part_ids {
1301 if has_part_id != RO_CRATE_METADATA_FILE && !files.contains_key(has_part_id) {
1302 return Err(DagMlError::RuntimeValidation(format!(
1303 "RO-Crate root references missing package file `{has_part_id}`"
1304 )));
1305 }
1306 }
1307
1308 Ok(checksummed)
1309}
1310
1311fn annotate_ro_crate_package_files(
1312 ro_crate_metadata: &mut Value,
1313 files: &BTreeMap<String, ResearchProvenancePackageFile>,
1314) -> Result<()> {
1315 let graph = ro_crate_metadata
1316 .get_mut("@graph")
1317 .and_then(Value::as_array_mut)
1318 .ok_or_else(|| {
1319 DagMlError::RuntimeValidation("RO-Crate metadata has no @graph array".to_string())
1320 })?;
1321
1322 let mut existing_ids = graph
1323 .iter()
1324 .filter_map(|entry| entry.get("@id").and_then(Value::as_str).map(str::to_string))
1325 .collect::<BTreeSet<_>>();
1326 for file in files.values() {
1327 if !existing_ids.contains(&file.path) {
1328 graph.push(file_entity(
1329 &file.path,
1330 &format!("DAG-ML contract file {}", file.path),
1331 "dagml:ContractArtifact",
1332 ));
1333 existing_ids.insert(file.path.clone());
1334 }
1335 }
1336
1337 for entry in graph.iter_mut() {
1338 let Some(id) = entry.get("@id").and_then(Value::as_str).map(str::to_string) else {
1339 continue;
1340 };
1341 let Some(file) = files.get(id.as_str()) else {
1342 continue;
1343 };
1344 let object = entry.as_object_mut().ok_or_else(|| {
1345 DagMlError::RuntimeValidation(format!("RO-Crate graph entry `{id}` is not an object"))
1346 })?;
1347 object.insert("encodingFormat".to_string(), json!("application/json"));
1348 object.insert("contentSize".to_string(), json!(file.size_bytes));
1349 object.insert("sha256".to_string(), json!(file.sha256));
1350 object.insert("dagml:sha256".to_string(), json!(file.sha256));
1351 }
1352
1353 let root = graph
1354 .iter_mut()
1355 .find(|entry| entry.get("@id") == Some(&json!("./")))
1356 .ok_or_else(|| {
1357 DagMlError::RuntimeValidation("RO-Crate metadata has no root dataset".to_string())
1358 })?;
1359 let root_object = root.as_object_mut().ok_or_else(|| {
1360 DagMlError::RuntimeValidation("RO-Crate root dataset is not an object".to_string())
1361 })?;
1362 let has_part = root_object
1363 .entry("hasPart".to_string())
1364 .or_insert_with(|| Value::Array(Vec::new()));
1365 let has_part = has_part.as_array_mut().ok_or_else(|| {
1366 DagMlError::RuntimeValidation("RO-Crate root hasPart is not an array".to_string())
1367 })?;
1368 let mut has_part_ids = has_part
1369 .iter()
1370 .filter_map(|entry| entry.get("@id").and_then(Value::as_str).map(str::to_string))
1371 .collect::<BTreeSet<_>>();
1372 for path in files.keys() {
1373 if has_part_ids.insert(path.clone()) {
1374 has_part.push(json!({"@id": path}));
1375 }
1376 }
1377 Ok(())
1378}
1379
1380fn sha256_hex(bytes: &[u8]) -> String {
1381 let digest = Sha256::digest(bytes);
1382 let mut out = String::with_capacity(digest.len() * 2);
1383 for byte in digest {
1384 use std::fmt::Write;
1385 write!(&mut out, "{byte:02x}").expect("writing to string cannot fail");
1386 }
1387 out
1388}
1389
1390fn lineage_artifact_index(lineage: &[LineageRecord]) -> BTreeMap<ArtifactId, String> {
1391 let mut index = BTreeMap::new();
1392 for record in lineage {
1393 for artifact in &record.artifact_refs {
1394 index.insert(artifact.id.clone(), lineage_activity_id(record));
1395 }
1396 }
1397 index
1398}
1399
1400fn lineage_activity_id(record: &LineageRecord) -> String {
1401 format!("dagml:activity:{}", record.record_id)
1402}
1403
/// Returns the PROV agent id (`dagml:controller:<controller_id>`) for a controller.
fn controller_agent_id(controller_id: &str) -> String {
    ["dagml:controller:", controller_id].concat()
}
1407
1408fn artifact_entity_id(artifact_id: &ArtifactId) -> String {
1409 format!("dagml:artifact:{artifact_id}")
1410}
1411
1412fn lineage_record_entity_id(lineage_id: &LineageId) -> String {
1413 format!("dagml:lineage-record:{lineage_id}")
1414}
1415
/// Returns the PROV entity id (`dagml:data-requirement:<key>`) for a data requirement.
fn data_requirement_entity_id(key: &str) -> String {
    ["dagml:data-requirement:", key].concat()
}
1419
/// Returns the PROV entity id (`dagml:data-envelope:<key>`) for a data envelope.
fn data_envelope_entity_id(key: &str) -> String {
    ["dagml:data-envelope:", key].concat()
}
1423
/// Returns the PROV entity id (`dagml:prediction-requirement:<key>`) for a prediction
/// requirement.
fn prediction_requirement_entity_id(key: &str) -> String {
    ["dagml:prediction-requirement:", key].concat()
}
1427
/// Returns the PROV entity id (`dagml:prediction-cache:<cache_id>`) for a prediction cache.
fn prediction_cache_entity_id(cache_id: &str) -> String {
    ["dagml:prediction-cache:", cache_id].concat()
}
1431
#[cfg(test)]
mod tests {
    use super::*;
    use crate::controller::{ControllerManifest, ControllerRegistry};
    use crate::ids::{ControllerId, LineageId, NodeId, RunId};
    use crate::plan::build_execution_plan;
    use crate::{CampaignSpec, GraphSpec, Phase};

    /// Builds the execution plan for the example minimal graph, OOF-generation campaign and
    /// controller manifests shipped under `examples/`.
    fn fixture_plan() -> ExecutionPlan {
        let graph: GraphSpec =
            serde_json::from_str(include_str!("../../../examples/minimal_graph.json")).unwrap();
        let campaign: CampaignSpec = serde_json::from_str(include_str!(
            "../../../examples/campaign_oof_generation.json"
        ))
        .unwrap();
        let manifests: Vec<ControllerManifest> =
            serde_json::from_str(include_str!("../../../examples/controller_manifests.json"))
                .unwrap();
        let mut registry = ControllerRegistry::new();
        for manifest in manifests {
            registry.register(manifest).unwrap();
        }
        build_execution_plan("plan:cli.bundle", graph, campaign, &registry).unwrap()
    }

    /// Loads the pre-generated minimal execution bundle from `examples/generated/`.
    fn fixture_bundle() -> ExecutionBundle {
        serde_json::from_str(include_str!(
            "../../../examples/generated/execution_bundle_minimal.json"
        ))
        .unwrap()
    }

    /// Builds a refit lineage record for `model:base` that references the bundle's first refit
    /// artifact.
    fn fixture_lineage(plan: &ExecutionPlan) -> LineageRecord {
        let node_id = NodeId::new("model:base").unwrap();
        let node_plan = plan.node_plans.get(&node_id).unwrap();
        LineageRecord {
            record_id: LineageId::new("lineage:test:model:base").unwrap(),
            run_id: RunId::new("run:provenance").unwrap(),
            node_id,
            phase: Phase::Refit,
            controller_id: node_plan.controller_id.clone(),
            controller_version: node_plan.controller_version.clone(),
            variant_id: plan
                .variants
                .first()
                .map(|variant| variant.variant_id.clone()),
            fold_id: None,
            branch_path: Vec::new(),
            input_lineage: Vec::new(),
            artifact_refs: vec![fixture_bundle().refit_artifacts[0].artifact.clone()],
            params_fingerprint: node_plan.params_fingerprint.clone(),
            data_model_shape_fingerprint: None,
            aggregation_policy_fingerprint: None,
            seed: Some(42),
            unsafe_flags: BTreeSet::new(),
            metrics: BTreeMap::new(),
        }
    }

    #[test]
    fn research_provenance_export_contains_prov_and_ro_crate_contracts() {
        let plan = fixture_plan();
        let bundle = fixture_bundle();
        let lineage = vec![fixture_lineage(&plan)];
        let export = build_research_provenance_export(
            &plan,
            &bundle,
            &lineage,
            &BTreeMap::new(),
            None,
            None,
        )
        .unwrap();

        assert_eq!(export.schema_version, RESEARCH_PROVENANCE_SCHEMA_VERSION);
        assert!(export.prov_jsonld["@context"]["prov"]
            .as_str()
            .unwrap()
            .contains("prov"));
        assert!(export.prov_jsonld["activity"]
            .as_object()
            .unwrap()
            .contains_key("dagml:activity:lineage:test:model:base"));
        assert!(export.prov_jsonld["agent"]
            .as_object()
            .unwrap()
            .contains_key("dagml:controller:controller:model.mock"));
        assert!(export.prov_jsonld["entity"]
            .as_object()
            .unwrap()
            .contains_key("dagml:artifact:artifact:model:base:refit"));

        let graph = export.ro_crate_metadata["@graph"].as_array().unwrap();
        assert!(graph
            .iter()
            .any(|entry| entry["@type"].to_string().contains("ComputationalWorkflow")));
        assert!(graph
            .iter()
            .any(|entry| entry["@id"] == json!("lineage.prov.jsonld")));
        assert!(graph
            .iter()
            .any(|entry| entry["@id"] == json!("execution_bundle.json")));
    }

    #[test]
    fn research_provenance_package_contains_contract_files_and_checksums() {
        let plan = fixture_plan();
        let bundle = fixture_bundle();
        let lineage = vec![fixture_lineage(&plan)];
        let package = build_research_provenance_package(
            &plan,
            &bundle,
            &lineage,
            &BTreeMap::new(),
            None,
            None,
        )
        .unwrap();

        for path in [
            EXECUTION_PLAN_FILE,
            EXECUTION_BUNDLE_FILE,
            LINEAGE_RECORDS_FILE,
            PROV_JSONLD_FILE,
            RO_CRATE_METADATA_FILE,
        ] {
            assert!(
                package.files.contains_key(path),
                "package is missing {path}"
            );
        }
        for (path, file) in &package.files {
            assert_eq!(file.path, *path);
            assert_eq!(file.sha256.len(), 64, "invalid sha256 for {path}");
            assert!(file.size_bytes > 0, "empty package file {path}");
            assert_eq!(file.size_bytes, file.bytes.len());
        }

        let ro_crate_file = package.files.get(RO_CRATE_METADATA_FILE).unwrap();
        let ro_crate_metadata: Value = serde_json::from_slice(&ro_crate_file.bytes).unwrap();
        let graph = ro_crate_metadata["@graph"].as_array().unwrap();
        for path in [
            EXECUTION_PLAN_FILE,
            EXECUTION_BUNDLE_FILE,
            LINEAGE_RECORDS_FILE,
            PROV_JSONLD_FILE,
        ] {
            let entry = graph
                .iter()
                .find(|entry| entry["@id"] == json!(path))
                .unwrap_or_else(|| panic!("RO-Crate metadata is missing file entry {path}"));
            assert_eq!(entry["sha256"].as_str().map(str::len), Some(64));
            assert_eq!(entry["dagml:sha256"].as_str(), entry["sha256"].as_str());
            assert_eq!(entry["encodingFormat"], json!("application/json"));
            assert!(entry["contentSize"].as_u64().unwrap() > 0);
        }
        let root = graph
            .iter()
            .find(|entry| entry["@id"] == json!("./"))
            .expect("RO-Crate root dataset is present");
        let has_part = root["hasPart"].as_array().unwrap();
        assert!(has_part
            .iter()
            .any(|entry| entry["@id"] == json!(LINEAGE_RECORDS_FILE)));
    }

    #[test]
    fn research_provenance_package_validation_reopens_exported_contracts() {
        let plan = fixture_plan();
        let bundle = fixture_bundle();
        let lineage = vec![fixture_lineage(&plan)];
        let package = build_research_provenance_package(
            &plan,
            &bundle,
            &lineage,
            &BTreeMap::new(),
            None,
            None,
        )
        .unwrap();
        let files = package
            .files
            .iter()
            .map(|(path, file)| (path.clone(), file.bytes.clone()))
            .collect::<BTreeMap<_, _>>();

        let validation = validate_research_provenance_package_files(&files).unwrap();

        assert_eq!(
            validation.schema_version,
            RESEARCH_PROVENANCE_SCHEMA_VERSION
        );
        assert_eq!(validation.plan_id, plan.id.to_string());
        assert_eq!(validation.bundle_id, bundle.bundle_id.to_string());
        assert_eq!(validation.file_count, package.files.len());
        assert_eq!(validation.checksummed_file_count, package.files.len() - 1);
        assert_eq!(validation.lineage_record_count, 1);
    }

    #[test]
    fn research_provenance_package_validation_refuses_tampered_file() {
        let plan = fixture_plan();
        let bundle = fixture_bundle();
        let package =
            build_research_provenance_package(&plan, &bundle, &[], &BTreeMap::new(), None, None)
                .unwrap();
        let mut files = package
            .files
            .iter()
            .map(|(path, file)| (path.clone(), file.bytes.clone()))
            .collect::<BTreeMap<_, _>>();
        files.insert(LINEAGE_RECORDS_FILE.to_string(), b"[]\n\n".to_vec());

        let error = validate_research_provenance_package_files(&files)
            .unwrap_err()
            .to_string();

        assert!(
            error.contains("sha256 mismatch"),
            "unexpected error: {error}"
        );
    }

    #[test]
    fn openlineage_export_uses_validated_research_package() {
        let plan = fixture_plan();
        let bundle = fixture_bundle();
        let lineage = vec![fixture_lineage(&plan)];
        let package = build_research_provenance_package(
            &plan,
            &bundle,
            &lineage,
            &BTreeMap::new(),
            None,
            None,
        )
        .unwrap();
        let files = package
            .files
            .iter()
            .map(|(path, file)| (path.clone(), file.bytes.clone()))
            .collect::<BTreeMap<_, _>>();

        let event = build_openlineage_run_event_from_package_files(
            &files,
            "dag-ml-test",
            "2026-05-27T00:00:00Z",
        )
        .unwrap();

        assert_eq!(event["eventType"], json!("COMPLETE"));
        assert_eq!(event["schemaURL"], json!(OPENLINEAGE_RUN_EVENT_SCHEMA_URL));
        assert_eq!(event["job"]["namespace"], json!("dag-ml-test"));
        assert_eq!(event["run"]["runId"].as_str().map(str::len), Some(36));
        assert!(
            event["run"]["facets"]["dagml_reproducibility"]["graph_fingerprint"]
                .as_str()
                .is_some()
        );
        assert!(!event["inputs"].as_array().unwrap().is_empty());
        assert!(event["outputs"]
            .as_array()
            .unwrap()
            .iter()
            .any(|output| output["namespace"] == json!("dagml:bundle")));
    }

    #[test]
    fn research_provenance_export_refuses_unknown_lineage_node() {
        let plan = fixture_plan();
        let bundle = fixture_bundle();
        let mut lineage = fixture_lineage(&plan);
        lineage.node_id = NodeId::new("model:missing").unwrap();

        let error = build_research_provenance_export(
            &plan,
            &bundle,
            &[lineage],
            &BTreeMap::new(),
            None,
            None,
        )
        .unwrap_err()
        .to_string();

        assert!(error.contains("unknown node"), "unexpected error: {error}");
    }

    #[test]
    fn research_provenance_export_refuses_mismatched_artifact_manifest() {
        let plan = fixture_plan();
        let bundle = fixture_bundle();
        let mut manifest = FileArtifactManifest {
            bundle_id: bundle.bundle_id.clone(),
            schema_version: crate::runtime::FILE_ARTIFACT_MANIFEST_SCHEMA_VERSION,
            artifacts: Vec::new(),
        };
        manifest.bundle_id = crate::ids::BundleId::new("bundle:wrong").unwrap();

        let error = build_research_provenance_export(
            &plan,
            &bundle,
            &[],
            &BTreeMap::new(),
            None,
            Some(&manifest),
        )
        .unwrap_err()
        .to_string();

        assert!(
            error.contains("does not match bundle"),
            "unexpected error: {error}"
        );
    }

    #[test]
    fn research_provenance_export_refuses_unknown_lineage_controller() {
        let plan = fixture_plan();
        let bundle = fixture_bundle();
        let mut lineage = fixture_lineage(&plan);
        lineage.controller_id = ControllerId::new("controller:missing").unwrap();

        let error = build_research_provenance_export(
            &plan,
            &bundle,
            &[lineage],
            &BTreeMap::new(),
            None,
            None,
        )
        .unwrap_err()
        .to_string();

        assert!(
            error.contains("unknown controller"),
            "unexpected error: {error}"
        );
    }
}