Skip to main content

datasynth_runtime/
prov.rs

//! W3C PROV-JSON export for data lineage interoperability.
//!
//! Exports generation lineage in the W3C PROV-JSON format as defined by
//! <https://www.w3.org/Submission/2013/SUBM-prov-json-20130424/>.

6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8
9use super::run_manifest::RunManifest;
10
11/// A W3C PROV-JSON document.
12#[derive(Debug, Clone, Serialize, Deserialize)]
13pub struct ProvDocument {
14    /// Namespace prefixes.
15    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
16    pub prefix: HashMap<String, String>,
17    /// Entities (data artifacts).
18    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
19    pub entity: HashMap<String, ProvEntity>,
20    /// Activities (processes).
21    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
22    pub activity: HashMap<String, ProvActivity>,
23    /// Agents (software/people).
24    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
25    pub agent: HashMap<String, ProvAgent>,
26    /// Generation relationships.
27    #[serde(
28        default,
29        rename = "wasGeneratedBy",
30        skip_serializing_if = "HashMap::is_empty"
31    )]
32    pub was_generated_by: HashMap<String, ProvGeneration>,
33    /// Usage relationships.
34    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
35    pub used: HashMap<String, ProvUsage>,
36    /// Attribution relationships.
37    #[serde(
38        default,
39        rename = "wasAttributedTo",
40        skip_serializing_if = "HashMap::is_empty"
41    )]
42    pub was_attributed_to: HashMap<String, ProvAttribution>,
43    /// Derivation relationships.
44    #[serde(
45        default,
46        rename = "wasDerivedFrom",
47        skip_serializing_if = "HashMap::is_empty"
48    )]
49    pub was_derived_from: HashMap<String, ProvDerivation>,
50}
51
52/// A PROV entity.
53#[derive(Debug, Clone, Serialize, Deserialize)]
54pub struct ProvEntity {
55    /// Entity type.
56    #[serde(rename = "prov:type", skip_serializing_if = "Option::is_none")]
57    pub prov_type: Option<String>,
58    /// Entity label.
59    #[serde(rename = "prov:label", skip_serializing_if = "Option::is_none")]
60    pub prov_label: Option<String>,
61    /// Additional attributes.
62    #[serde(flatten)]
63    pub attributes: HashMap<String, serde_json::Value>,
64}
65
66/// A PROV activity.
67#[derive(Debug, Clone, Serialize, Deserialize)]
68pub struct ProvActivity {
69    /// Activity type.
70    #[serde(rename = "prov:type", skip_serializing_if = "Option::is_none")]
71    pub prov_type: Option<String>,
72    /// Activity label.
73    #[serde(rename = "prov:label", skip_serializing_if = "Option::is_none")]
74    pub prov_label: Option<String>,
75    /// Start time.
76    #[serde(rename = "prov:startTime", skip_serializing_if = "Option::is_none")]
77    pub start_time: Option<String>,
78    /// End time.
79    #[serde(rename = "prov:endTime", skip_serializing_if = "Option::is_none")]
80    pub end_time: Option<String>,
81    /// Additional attributes.
82    #[serde(flatten)]
83    pub attributes: HashMap<String, serde_json::Value>,
84}
85
86/// A PROV agent.
87#[derive(Debug, Clone, Serialize, Deserialize)]
88pub struct ProvAgent {
89    /// Agent type.
90    #[serde(rename = "prov:type", skip_serializing_if = "Option::is_none")]
91    pub prov_type: Option<String>,
92    /// Agent label.
93    #[serde(rename = "prov:label", skip_serializing_if = "Option::is_none")]
94    pub prov_label: Option<String>,
95    /// Additional attributes.
96    #[serde(flatten)]
97    pub attributes: HashMap<String, serde_json::Value>,
98}
99
100/// A PROV generation relationship.
101#[derive(Debug, Clone, Serialize, Deserialize)]
102pub struct ProvGeneration {
103    /// The generated entity.
104    #[serde(rename = "prov:entity")]
105    pub entity: String,
106    /// The activity that generated it.
107    #[serde(rename = "prov:activity")]
108    pub activity: String,
109}
110
111/// A PROV usage relationship.
112#[derive(Debug, Clone, Serialize, Deserialize)]
113pub struct ProvUsage {
114    /// The activity that used the entity.
115    #[serde(rename = "prov:activity")]
116    pub activity: String,
117    /// The entity that was used.
118    #[serde(rename = "prov:entity")]
119    pub entity: String,
120}
121
122/// A PROV attribution relationship.
123#[derive(Debug, Clone, Serialize, Deserialize)]
124pub struct ProvAttribution {
125    /// The entity that is attributed.
126    #[serde(rename = "prov:entity")]
127    pub entity: String,
128    /// The agent it is attributed to.
129    #[serde(rename = "prov:agent")]
130    pub agent: String,
131}
132
133/// A PROV derivation relationship.
134#[derive(Debug, Clone, Serialize, Deserialize)]
135pub struct ProvDerivation {
136    /// The derived entity.
137    #[serde(rename = "prov:generatedEntity")]
138    pub generated_entity: String,
139    /// The source entity.
140    #[serde(rename = "prov:usedEntity")]
141    pub used_entity: String,
142}
143
144/// Converts a RunManifest into a W3C PROV-JSON document.
145pub fn manifest_to_prov(manifest: &RunManifest) -> ProvDocument {
146    let mut doc = ProvDocument {
147        prefix: HashMap::new(),
148        entity: HashMap::new(),
149        activity: HashMap::new(),
150        agent: HashMap::new(),
151        was_generated_by: HashMap::new(),
152        used: HashMap::new(),
153        was_attributed_to: HashMap::new(),
154        was_derived_from: HashMap::new(),
155    };
156
157    // Prefixes
158    doc.prefix
159        .insert("dsf".to_string(), "https://datasynth.io/ns/".to_string());
160    doc.prefix
161        .insert("prov".to_string(), "http://www.w3.org/ns/prov#".to_string());
162
163    let run_id = &manifest.run_id;
164
165    // The generation run as an Activity
166    let activity_id = format!("dsf:run/{}", run_id);
167    doc.activity.insert(
168        activity_id.clone(),
169        ProvActivity {
170            prov_type: Some("dsf:GenerationRun".to_string()),
171            prov_label: Some(format!("DataSynth generation run {}", run_id)),
172            start_time: Some(manifest.started_at.to_rfc3339()),
173            end_time: manifest.completed_at.map(|t| t.to_rfc3339()),
174            attributes: {
175                let mut attrs = HashMap::new();
176                attrs.insert(
177                    "dsf:seed".to_string(),
178                    serde_json::Value::Number(manifest.seed.into()),
179                );
180                attrs.insert(
181                    "dsf:generatorVersion".to_string(),
182                    serde_json::Value::String(manifest.generator_version.clone()),
183                );
184                attrs
185            },
186        },
187    );
188
189    // DataSynth as the Agent
190    let agent_id = format!("dsf:agent/datasynth-{}", manifest.generator_version);
191    doc.agent.insert(
192        agent_id.clone(),
193        ProvAgent {
194            prov_type: Some("prov:SoftwareAgent".to_string()),
195            prov_label: Some(format!("DataSynth v{}", manifest.generator_version)),
196            attributes: HashMap::new(),
197        },
198    );
199
200    // Config as an input Entity
201    let config_entity_id = format!("dsf:config/{}", manifest.config_hash);
202    doc.entity.insert(
203        config_entity_id.clone(),
204        ProvEntity {
205            prov_type: Some("dsf:GeneratorConfig".to_string()),
206            prov_label: Some("Generation configuration".to_string()),
207            attributes: {
208                let mut attrs = HashMap::new();
209                attrs.insert(
210                    "dsf:configHash".to_string(),
211                    serde_json::Value::String(manifest.config_hash.clone()),
212                );
213                attrs
214            },
215        },
216    );
217
218    // Activity used the config
219    doc.used.insert(
220        format!("dsf:usage/{}/config", run_id),
221        ProvUsage {
222            activity: activity_id.clone(),
223            entity: config_entity_id,
224        },
225    );
226
227    // Each output file as an Entity
228    for (i, file_info) in manifest.output_files.iter().enumerate() {
229        let entity_id = format!("dsf:output/{}/{}", run_id, file_info.path.replace('/', "_"));
230        let mut attrs = HashMap::new();
231        attrs.insert(
232            "dsf:format".to_string(),
233            serde_json::Value::String(file_info.format.clone()),
234        );
235        if let Some(count) = file_info.record_count {
236            attrs.insert("dsf:recordCount".to_string(), serde_json::json!(count));
237        }
238        if let Some(size) = file_info.size_bytes {
239            attrs.insert("dsf:sizeBytes".to_string(), serde_json::json!(size));
240        }
241        if let Some(ref checksum) = file_info.sha256_checksum {
242            attrs.insert(
243                "dsf:sha256".to_string(),
244                serde_json::Value::String(checksum.clone()),
245            );
246        }
247
248        doc.entity.insert(
249            entity_id.clone(),
250            ProvEntity {
251                prov_type: Some("dsf:OutputFile".to_string()),
252                prov_label: Some(file_info.path.clone()),
253                attributes: attrs,
254            },
255        );
256
257        // wasGeneratedBy
258        doc.was_generated_by.insert(
259            format!("dsf:gen/{}/{}", run_id, i),
260            ProvGeneration {
261                entity: entity_id.clone(),
262                activity: activity_id.clone(),
263            },
264        );
265
266        // wasAttributedTo
267        doc.was_attributed_to.insert(
268            format!("dsf:attr/{}/{}", run_id, i),
269            ProvAttribution {
270                entity: entity_id,
271                agent: agent_id.clone(),
272            },
273        );
274    }
275
276    doc
277}
278
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use crate::run_manifest::{OutputFileInfo, RunManifest};
    use datasynth_config::schema::*;

    /// Builds a manifest with two output files: one with full metadata
    /// (`journal_entries.csv`) and one with only a record count (`vendors.csv`).
    fn create_test_manifest() -> RunManifest {
        let config = GeneratorConfig {
            global: GlobalConfig {
                industry: datasynth_core::models::IndustrySector::Manufacturing,
                start_date: "2024-01-01".to_string(),
                period_months: 1,
                seed: Some(42),
                parallel: false,
                group_currency: "USD".to_string(),
                worker_threads: 1,
                memory_limit_mb: 512,
            },
            companies: vec![CompanyConfig {
                code: "TEST".to_string(),
                name: "Test Company".to_string(),
                currency: "USD".to_string(),
                country: "US".to_string(),
                annual_transaction_volume: TransactionVolume::TenK,
                volume_weight: 1.0,
                fiscal_year_variant: "K4".to_string(),
            }],
            chart_of_accounts: ChartOfAccountsConfig::default(),
            transactions: TransactionConfig::default(),
            output: OutputConfig::default(),
            fraud: FraudConfig::default(),
            internal_controls: InternalControlsConfig::default(),
            business_processes: BusinessProcessConfig::default(),
            user_personas: UserPersonaConfig::default(),
            templates: TemplateConfig::default(),
            approval: ApprovalConfig::default(),
            departments: DepartmentConfig::default(),
            master_data: MasterDataConfig::default(),
            document_flows: DocumentFlowConfig::default(),
            intercompany: IntercompanyConfig::default(),
            balance: BalanceConfig::default(),
            ocpm: OcpmConfig::default(),
            audit: AuditGenerationConfig::default(),
            banking: datasynth_banking::BankingConfig::default(),
            data_quality: DataQualitySchemaConfig::default(),
            scenario: ScenarioConfig::default(),
            temporal: TemporalDriftConfig::default(),
            graph_export: GraphExportConfig::default(),
            streaming: StreamingSchemaConfig::default(),
            rate_limit: RateLimitSchemaConfig::default(),
            temporal_attributes: TemporalAttributeSchemaConfig::default(),
            relationships: RelationshipSchemaConfig::default(),
            accounting_standards: AccountingStandardsConfig::default(),
            audit_standards: AuditStandardsConfig::default(),
            distributions: Default::default(),
            temporal_patterns: Default::default(),
            vendor_network: VendorNetworkSchemaConfig::default(),
            customer_segmentation: CustomerSegmentationSchemaConfig::default(),
            relationship_strength: RelationshipStrengthSchemaConfig::default(),
            cross_process_links: CrossProcessLinksSchemaConfig::default(),
            organizational_events: OrganizationalEventsSchemaConfig::default(),
            behavioral_drift: BehavioralDriftSchemaConfig::default(),
            market_drift: MarketDriftSchemaConfig::default(),
            drift_labeling: DriftLabelingSchemaConfig::default(),
            anomaly_injection: Default::default(),
            industry_specific: Default::default(),
            fingerprint_privacy: Default::default(),
            quality_gates: Default::default(),
            compliance: Default::default(),
            webhooks: Default::default(),
            llm: Default::default(),
            diffusion: Default::default(),
            causal: Default::default(),
            source_to_pay: Default::default(),
            financial_reporting: Default::default(),
            hr: Default::default(),
            manufacturing: Default::default(),
            sales_quotes: Default::default(),
        };

        let mut manifest = RunManifest::new(&config, 42);
        manifest.add_output_file(OutputFileInfo {
            path: "journal_entries.csv".to_string(),
            format: "csv".to_string(),
            record_count: Some(1000),
            size_bytes: Some(102400),
            sha256_checksum: Some("abc123".to_string()),
            first_record_index: Some(0),
            last_record_index: Some(999),
        });
        manifest.add_output_file(OutputFileInfo {
            path: "vendors.csv".to_string(),
            format: "csv".to_string(),
            record_count: Some(50),
            size_bytes: None,
            sha256_checksum: None,
            first_record_index: None,
            last_record_index: None,
        });
        manifest
    }

    #[test]
    fn test_manifest_to_prov_structure() {
        let manifest = create_test_manifest();
        let prov = manifest_to_prov(&manifest);

        // Should have prefixes
        assert!(prov.prefix.contains_key("dsf"));
        assert!(prov.prefix.contains_key("prov"));

        // Should have 1 activity (the run)
        assert_eq!(prov.activity.len(), 1);

        // Should have 1 agent (DataSynth)
        assert_eq!(prov.agent.len(), 1);

        // Should have 3 entities: config + 2 output files
        assert_eq!(prov.entity.len(), 3);

        // Each output file should have wasGeneratedBy
        assert_eq!(prov.was_generated_by.len(), 2);

        // Each output file should have wasAttributedTo
        assert_eq!(prov.was_attributed_to.len(), 2);

        // Config should be used
        assert_eq!(prov.used.len(), 1);
    }

    #[test]
    fn test_prov_json_roundtrip() {
        let manifest = create_test_manifest();
        let prov = manifest_to_prov(&manifest);

        let json = serde_json::to_string_pretty(&prov).expect("serialize");
        let deserialized: ProvDocument = serde_json::from_str(&json).expect("deserialize");

        // Compare full JSON values, not just collection sizes, so that
        // attributes lost or altered on the way through `#[serde(flatten)]`
        // are caught. JSON object equality does not depend on key order.
        assert_eq!(
            serde_json::to_value(&deserialized).expect("re-serialize"),
            serde_json::to_value(&prov).expect("serialize to value")
        );
    }

    #[test]
    fn test_all_output_files_have_was_generated_by() {
        let manifest = create_test_manifest();
        let prov = manifest_to_prov(&manifest);

        // Every output file entity should have a corresponding wasGeneratedBy
        let generated_entities: Vec<_> = prov
            .was_generated_by
            .values()
            .map(|g| g.entity.clone())
            .collect();

        for (id, entity) in &prov.entity {
            if entity.prov_type.as_deref() == Some("dsf:OutputFile") {
                assert!(
                    generated_entities.contains(id),
                    "Output file {} has no wasGeneratedBy",
                    id
                );
            }
        }
    }

    #[test]
    fn test_relations_reference_existing_records() {
        let manifest = create_test_manifest();
        let prov = manifest_to_prov(&manifest);

        // Every relation must point at records that exist in the document;
        // a dangling reference means an entity was dropped or mis-named.
        for generation in prov.was_generated_by.values() {
            assert!(prov.entity.contains_key(&generation.entity));
            assert!(prov.activity.contains_key(&generation.activity));
        }
        for attribution in prov.was_attributed_to.values() {
            assert!(prov.entity.contains_key(&attribution.entity));
            assert!(prov.agent.contains_key(&attribution.agent));
        }
        for usage in prov.used.values() {
            assert!(prov.activity.contains_key(&usage.activity));
            assert!(prov.entity.contains_key(&usage.entity));
        }
    }

    #[test]
    fn test_prov_checksum_included() {
        let manifest = create_test_manifest();
        let prov = manifest_to_prov(&manifest);

        // Find the journal_entries entity
        let je_entity = prov
            .entity
            .values()
            .find(|e| e.prov_label.as_deref() == Some("journal_entries.csv"))
            .expect("should find journal_entries entity");

        assert_eq!(
            je_entity.attributes.get("dsf:sha256"),
            Some(&serde_json::Value::String("abc123".to_string()))
        );

        // A file without a checksum must not get a `dsf:sha256` attribute.
        let vendors_entity = prov
            .entity
            .values()
            .find(|e| e.prov_label.as_deref() == Some("vendors.csv"))
            .expect("should find vendors entity");

        assert!(!vendors_entity.attributes.contains_key("dsf:sha256"));
    }
}