data_modelling_core/import/
odps.rs

1//! ODPS (Open Data Product Standard) importer
2//!
3//! Parses ODPS YAML files and converts them to ODPSDataProduct models.
4
5use super::ImportError;
6use crate::models::Tag;
7use crate::models::odps::*;
8use anyhow::{Context, Result};
9use serde_json::Value as JsonValue;
10use serde_yaml::Value as YamlValue;
11use std::str::FromStr;
12
/// Importer that turns ODPS (Open Data Product Standard) YAML into `ODPSDataProduct` models.
///
/// The importer carries only configuration, so it is cheap to clone and safe to log.
#[derive(Debug, Clone)]
pub struct ODPSImporter {
    /// Known ODCS table IDs used to validate port `contractId` values.
    ///
    /// `None` disables the check.
    known_table_ids: Option<Vec<String>>,
}
18
19impl ODPSImporter {
20    /// Create a new ODPS importer instance
21    pub fn new() -> Self {
22        Self {
23            known_table_ids: None,
24        }
25    }
26
27    /// Create a new ODPS importer with known table IDs for contractId validation
28    pub fn with_table_ids(table_ids: Vec<String>) -> Self {
29        Self {
30            known_table_ids: Some(table_ids),
31        }
32    }
33
34    /// Import ODPS YAML content and create ODPSDataProduct
35    ///
36    /// # Arguments
37    ///
38    /// * `yaml_content` - ODPS YAML content as a string
39    ///
40    /// # Returns
41    ///
42    /// A `ODPSDataProduct` parsed from the YAML content
43    ///
44    /// # Example
45    ///
46    /// ```rust
47    /// use data_modelling_core::import::odps::ODPSImporter;
48    ///
49    /// let importer = ODPSImporter::new();
50    /// let yaml = r#"
51    /// apiVersion: v1.0.0
52    /// kind: DataProduct
53    /// id: 550e8400-e29b-41d4-a716-446655440000
54    /// name: customer-data-product
55    /// version: 1.0.0
56    /// status: active
57    /// "#;
58    /// let product = importer.import(yaml).unwrap();
59    /// assert_eq!(product.name, Some("customer-data-product".to_string()));
60    /// ```
61    pub fn import(&self, yaml_content: &str) -> Result<ODPSDataProduct, ImportError> {
62        // Validate against ODPS schema before parsing (if feature enabled)
63        #[cfg(feature = "odps-validation")]
64        {
65            use crate::validation::schema::validate_odps_internal;
66            validate_odps_internal(yaml_content).map_err(ImportError::ValidationError)?;
67        }
68
69        let yaml_value: YamlValue = serde_yaml::from_str(yaml_content)
70            .map_err(|e| ImportError::ParseError(format!("Failed to parse YAML: {}", e)))?;
71
72        self.parse_odps_product(&yaml_value)
73            .map_err(|e| ImportError::ParseError(e.to_string()))
74    }
75
76    /// Parse ODPS data product from YAML value
77    fn parse_odps_product(&self, yaml: &YamlValue) -> Result<ODPSDataProduct> {
78        let _obj = yaml
79            .as_mapping()
80            .ok_or_else(|| anyhow::anyhow!("ODPS YAML must be a mapping"))?;
81
82        // Convert YAML to JSON for easier parsing
83        let json_value: JsonValue =
84            serde_json::to_value(yaml).context("Failed to convert YAML to JSON")?;
85
86        // Parse required fields
87        let api_version = json_value
88            .get("apiVersion")
89            .and_then(|v| v.as_str())
90            .unwrap_or("v1.0.0")
91            .to_string();
92
93        let kind = json_value
94            .get("kind")
95            .and_then(|v| v.as_str())
96            .unwrap_or("DataProduct")
97            .to_string();
98
99        let id = json_value
100            .get("id")
101            .and_then(|v| v.as_str())
102            .ok_or_else(|| anyhow::anyhow!("Missing required field: id"))?
103            .to_string();
104
105        let status_str = json_value
106            .get("status")
107            .and_then(|v| v.as_str())
108            .ok_or_else(|| anyhow::anyhow!("Missing required field: status"))?;
109
110        let status = match status_str {
111            "proposed" => ODPSStatus::Proposed,
112            "draft" => ODPSStatus::Draft,
113            "active" => ODPSStatus::Active,
114            "deprecated" => ODPSStatus::Deprecated,
115            "retired" => ODPSStatus::Retired,
116            _ => return Err(anyhow::anyhow!("Invalid status: {}", status_str)),
117        };
118
119        // Parse optional fields
120        let name = json_value
121            .get("name")
122            .and_then(|v| v.as_str())
123            .map(|s| s.to_string());
124        let version = json_value
125            .get("version")
126            .and_then(|v| v.as_str())
127            .map(|s| s.to_string());
128        let domain = json_value
129            .get("domain")
130            .and_then(|v| v.as_str())
131            .map(|s| s.to_string());
132        let tenant = json_value
133            .get("tenant")
134            .and_then(|v| v.as_str())
135            .map(|s| s.to_string());
136
137        let tags = self.parse_tags(&json_value)?;
138        let description = self.parse_description(&json_value)?;
139        let authoritative_definitions = self.parse_authoritative_definitions(&json_value)?;
140        let custom_properties = self.parse_custom_properties(&json_value)?;
141        let input_ports = self.parse_input_ports(&json_value)?;
142        let output_ports = self.parse_output_ports(&json_value)?;
143        let management_ports = self.parse_management_ports(&json_value)?;
144        let support = self.parse_support(&json_value)?;
145        let team = self.parse_team(&json_value)?;
146        let product_created_ts = json_value
147            .get("productCreatedTs")
148            .and_then(|v| v.as_str())
149            .map(|s| s.to_string());
150
151        Ok(ODPSDataProduct {
152            api_version,
153            kind,
154            id,
155            name,
156            version,
157            status,
158            domain,
159            tenant,
160            authoritative_definitions,
161            description,
162            custom_properties,
163            tags,
164            input_ports,
165            output_ports,
166            management_ports,
167            support,
168            team,
169            product_created_ts,
170            created_at: Some(chrono::Utc::now()),
171            updated_at: Some(chrono::Utc::now()),
172        })
173    }
174
175    /// Parse tags array
176    fn parse_tags(&self, json: &JsonValue) -> Result<Vec<Tag>> {
177        let mut tags = Vec::new();
178        if let Some(tags_arr) = json.get("tags").and_then(|v| v.as_array()) {
179            for item in tags_arr {
180                if let Some(s) = item.as_str() {
181                    if let Ok(tag) = Tag::from_str(s) {
182                        tags.push(tag);
183                    } else {
184                        tags.push(Tag::Simple(s.to_string()));
185                    }
186                }
187            }
188        }
189        Ok(tags)
190    }
191
192    /// Parse description object
193    fn parse_description(&self, json: &JsonValue) -> Result<Option<ODPSDescription>> {
194        if let Some(desc_val) = json.get("description") {
195            let desc_obj = desc_val
196                .as_object()
197                .ok_or_else(|| anyhow::anyhow!("Description must be an object"))?;
198            Ok(Some(ODPSDescription {
199                purpose: desc_obj
200                    .get("purpose")
201                    .and_then(|v| v.as_str())
202                    .map(|s| s.to_string()),
203                limitations: desc_obj
204                    .get("limitations")
205                    .and_then(|v| v.as_str())
206                    .map(|s| s.to_string()),
207                usage: desc_obj
208                    .get("usage")
209                    .and_then(|v| v.as_str())
210                    .map(|s| s.to_string()),
211                authoritative_definitions: self.parse_authoritative_definitions(desc_val)?,
212                custom_properties: self.parse_custom_properties(desc_val)?,
213            }))
214        } else {
215            Ok(None)
216        }
217    }
218
219    /// Parse authoritative definitions array
220    fn parse_authoritative_definitions(
221        &self,
222        json: &JsonValue,
223    ) -> Result<Option<Vec<ODPSAuthoritativeDefinition>>> {
224        if let Some(defs_arr) = json
225            .get("authoritativeDefinitions")
226            .and_then(|v| v.as_array())
227        {
228            let mut defs = Vec::new();
229            for def_item in defs_arr {
230                if let Some(def_obj) = def_item.as_object()
231                    && let (Some(r#type), Some(url)) = (
232                        def_obj.get("type").and_then(|v| v.as_str()),
233                        def_obj.get("url").and_then(|v| v.as_str()),
234                    )
235                {
236                    defs.push(ODPSAuthoritativeDefinition {
237                        r#type: r#type.to_string(),
238                        url: url.to_string(),
239                        description: def_obj
240                            .get("description")
241                            .and_then(|v| v.as_str())
242                            .map(|s| s.to_string()),
243                    });
244                }
245            }
246            // Preserve empty arrays as Some(vec![]) to maintain structure
247            Ok(Some(defs))
248        } else {
249            Ok(None)
250        }
251    }
252
253    /// Parse custom properties array
254    fn parse_custom_properties(&self, json: &JsonValue) -> Result<Option<Vec<ODPSCustomProperty>>> {
255        if let Some(props_arr) = json.get("customProperties").and_then(|v| v.as_array()) {
256            let mut props = Vec::new();
257            for prop_item in props_arr {
258                if let Some(prop_obj) = prop_item.as_object()
259                    && let (Some(property), Some(value)) = (
260                        prop_obj.get("property").and_then(|v| v.as_str()),
261                        prop_obj.get("value"),
262                    )
263                {
264                    props.push(ODPSCustomProperty {
265                        property: property.to_string(),
266                        value: value.clone(),
267                        description: prop_obj
268                            .get("description")
269                            .and_then(|v| v.as_str())
270                            .map(|s| s.to_string()),
271                    });
272                }
273            }
274            // Preserve empty arrays as Some(vec![]) to maintain structure
275            Ok(Some(props))
276        } else {
277            Ok(None)
278        }
279    }
280
281    /// Parse input ports array
282    fn parse_input_ports(&self, json: &JsonValue) -> Result<Option<Vec<ODPSInputPort>>> {
283        if let Some(ports_arr) = json.get("inputPorts").and_then(|v| v.as_array()) {
284            let mut ports = Vec::new();
285            for port_item in ports_arr {
286                if let Some(port_obj) = port_item.as_object()
287                    && let (Some(name), Some(version), Some(contract_id)) = (
288                        port_obj.get("name").and_then(|v| v.as_str()),
289                        port_obj.get("version").and_then(|v| v.as_str()),
290                        port_obj.get("contractId").and_then(|v| v.as_str()),
291                    )
292                {
293                    // Validate contractId if known table IDs provided
294                    if let Some(ref table_ids) = self.known_table_ids
295                        && !table_ids.contains(&contract_id.to_string())
296                    {
297                        return Err(anyhow::anyhow!(
298                            "Input port '{}' references unknown contractId: {}",
299                            name,
300                            contract_id
301                        ));
302                    }
303
304                    let port_json = JsonValue::Object(port_obj.clone());
305                    ports.push(ODPSInputPort {
306                        name: name.to_string(),
307                        version: version.to_string(),
308                        contract_id: contract_id.to_string(),
309                        tags: self.parse_tags(&port_json)?,
310                        custom_properties: self.parse_custom_properties(&port_json)?,
311                        authoritative_definitions: self
312                            .parse_authoritative_definitions(&port_json)?,
313                    });
314                }
315            }
316            // Preserve empty arrays as Some(vec![]) to maintain structure
317            Ok(Some(ports))
318        } else {
319            Ok(None)
320        }
321    }
322
323    /// Parse output ports array
324    fn parse_output_ports(&self, json: &JsonValue) -> Result<Option<Vec<ODPSOutputPort>>> {
325        if let Some(ports_arr) = json.get("outputPorts").and_then(|v| v.as_array()) {
326            let mut ports = Vec::new();
327            for port_item in ports_arr {
328                if let Some(port_obj) = port_item.as_object()
329                    && let (Some(name), Some(version)) = (
330                        port_obj.get("name").and_then(|v| v.as_str()),
331                        port_obj.get("version").and_then(|v| v.as_str()),
332                    )
333                {
334                    let contract_id = port_obj
335                        .get("contractId")
336                        .and_then(|v| v.as_str())
337                        .map(|s| s.to_string());
338
339                    // Validate contractId if known table IDs provided
340                    if let Some(ref contract_id_str) = contract_id
341                        && let Some(ref table_ids) = self.known_table_ids
342                        && !table_ids.contains(contract_id_str)
343                    {
344                        return Err(anyhow::anyhow!(
345                            "Output port '{}' references unknown contractId: {}",
346                            name,
347                            contract_id_str
348                        ));
349                    }
350
351                    // Parse input contracts
352                    let input_contracts = if let Some(contracts_arr) =
353                        port_obj.get("inputContracts").and_then(|v| v.as_array())
354                    {
355                        let mut contracts = Vec::new();
356                        for contract_item in contracts_arr {
357                            if let Some(contract_obj) = contract_item.as_object()
358                                && let (Some(id), Some(version)) = (
359                                    contract_obj.get("id").and_then(|v| v.as_str()),
360                                    contract_obj.get("version").and_then(|v| v.as_str()),
361                                )
362                            {
363                                contracts.push(ODPSInputContract {
364                                    id: id.to_string(),
365                                    version: version.to_string(),
366                                });
367                            }
368                        }
369                        if !contracts.is_empty() {
370                            Some(contracts)
371                        } else {
372                            None
373                        }
374                    } else {
375                        None
376                    };
377
378                    // Parse SBOM
379                    let sbom =
380                        if let Some(sbom_arr) = port_obj.get("sbom").and_then(|v| v.as_array()) {
381                            let mut sboms = Vec::new();
382                            for sbom_item in sbom_arr {
383                                if let Some(sbom_obj) = sbom_item.as_object()
384                                    && let Some(url) = sbom_obj.get("url").and_then(|v| v.as_str())
385                                {
386                                    sboms.push(ODPSSBOM {
387                                        r#type: sbom_obj
388                                            .get("type")
389                                            .and_then(|v| v.as_str())
390                                            .map(|s| s.to_string()),
391                                        url: url.to_string(),
392                                    });
393                                }
394                            }
395                            if !sboms.is_empty() { Some(sboms) } else { None }
396                        } else {
397                            None
398                        };
399
400                    let port_json = JsonValue::Object(port_obj.clone());
401                    ports.push(ODPSOutputPort {
402                        name: name.to_string(),
403                        description: port_obj
404                            .get("description")
405                            .and_then(|v| v.as_str())
406                            .map(|s| s.to_string()),
407                        r#type: port_obj
408                            .get("type")
409                            .and_then(|v| v.as_str())
410                            .map(|s| s.to_string()),
411                        version: version.to_string(),
412                        contract_id,
413                        sbom,
414                        input_contracts,
415                        tags: self.parse_tags(&port_json)?,
416                        custom_properties: self.parse_custom_properties(&port_json)?,
417                        authoritative_definitions: self
418                            .parse_authoritative_definitions(&port_json)?,
419                    });
420                }
421            }
422            // Preserve empty arrays as Some(vec![]) to maintain structure
423            Ok(Some(ports))
424        } else {
425            Ok(None)
426        }
427    }
428
429    /// Parse management ports array
430    fn parse_management_ports(&self, json: &JsonValue) -> Result<Option<Vec<ODPSManagementPort>>> {
431        if let Some(ports_arr) = json.get("managementPorts").and_then(|v| v.as_array()) {
432            let mut ports = Vec::new();
433            for port_item in ports_arr {
434                if let Some(port_obj) = port_item.as_object()
435                    && let (Some(name), Some(content)) = (
436                        port_obj.get("name").and_then(|v| v.as_str()),
437                        port_obj.get("content").and_then(|v| v.as_str()),
438                    )
439                {
440                    let port_json = JsonValue::Object(port_obj.clone());
441                    ports.push(ODPSManagementPort {
442                        name: name.to_string(),
443                        content: content.to_string(),
444                        r#type: port_obj
445                            .get("type")
446                            .and_then(|v| v.as_str())
447                            .map(|s| s.to_string()),
448                        url: port_obj
449                            .get("url")
450                            .and_then(|v| v.as_str())
451                            .map(|s| s.to_string()),
452                        channel: port_obj
453                            .get("channel")
454                            .and_then(|v| v.as_str())
455                            .map(|s| s.to_string()),
456                        description: port_obj
457                            .get("description")
458                            .and_then(|v| v.as_str())
459                            .map(|s| s.to_string()),
460                        tags: self.parse_tags(&port_json)?,
461                        custom_properties: self.parse_custom_properties(&port_json)?,
462                        authoritative_definitions: self
463                            .parse_authoritative_definitions(&port_json)?,
464                    });
465                }
466            }
467            // Preserve empty arrays as Some(vec![]) to maintain structure
468            Ok(Some(ports))
469        } else {
470            Ok(None)
471        }
472    }
473
474    /// Parse support array
475    fn parse_support(&self, json: &JsonValue) -> Result<Option<Vec<ODPSSupport>>> {
476        if let Some(support_arr) = json.get("support").and_then(|v| v.as_array()) {
477            let mut supports = Vec::new();
478            for support_item in support_arr {
479                if let Some(support_obj) = support_item.as_object()
480                    && let (Some(channel), Some(url)) = (
481                        support_obj.get("channel").and_then(|v| v.as_str()),
482                        support_obj.get("url").and_then(|v| v.as_str()),
483                    )
484                {
485                    let support_json = JsonValue::Object(support_obj.clone());
486                    supports.push(ODPSSupport {
487                        channel: channel.to_string(),
488                        url: url.to_string(),
489                        description: support_obj
490                            .get("description")
491                            .and_then(|v| v.as_str())
492                            .map(|s| s.to_string()),
493                        tool: support_obj
494                            .get("tool")
495                            .and_then(|v| v.as_str())
496                            .map(|s| s.to_string()),
497                        scope: support_obj
498                            .get("scope")
499                            .and_then(|v| v.as_str())
500                            .map(|s| s.to_string()),
501                        invitation_url: support_obj
502                            .get("invitationUrl")
503                            .and_then(|v| v.as_str())
504                            .map(|s| s.to_string()),
505                        tags: self.parse_tags(&support_json)?,
506                        custom_properties: self.parse_custom_properties(&support_json)?,
507                        authoritative_definitions: self
508                            .parse_authoritative_definitions(&support_json)?,
509                    });
510                }
511            }
512            // Preserve empty arrays as Some(vec![]) to maintain structure
513            Ok(Some(supports))
514        } else {
515            Ok(None)
516        }
517    }
518
519    /// Parse team object
520    fn parse_team(&self, json: &JsonValue) -> Result<Option<ODPSTeam>> {
521        if let Some(team_obj) = json.get("team").and_then(|v| v.as_object()) {
522            let members = if let Some(members_arr) =
523                team_obj.get("members").and_then(|v| v.as_array())
524            {
525                let mut team_members = Vec::new();
526                for member_item in members_arr {
527                    if let Some(member_obj) = member_item.as_object()
528                        && let Some(username) = member_obj.get("username").and_then(|v| v.as_str())
529                    {
530                        let member_json = JsonValue::Object(member_obj.clone());
531                        team_members.push(ODPSTeamMember {
532                            username: username.to_string(),
533                            name: member_obj
534                                .get("name")
535                                .and_then(|v| v.as_str())
536                                .map(|s| s.to_string()),
537                            description: member_obj
538                                .get("description")
539                                .and_then(|v| v.as_str())
540                                .map(|s| s.to_string()),
541                            role: member_obj
542                                .get("role")
543                                .and_then(|v| v.as_str())
544                                .map(|s| s.to_string()),
545                            date_in: member_obj
546                                .get("dateIn")
547                                .and_then(|v| v.as_str())
548                                .map(|s| s.to_string()),
549                            date_out: member_obj
550                                .get("dateOut")
551                                .and_then(|v| v.as_str())
552                                .map(|s| s.to_string()),
553                            replaced_by_username: member_obj
554                                .get("replacedByUsername")
555                                .and_then(|v| v.as_str())
556                                .map(|s| s.to_string()),
557                            tags: self.parse_tags(&member_json)?,
558                            custom_properties: self.parse_custom_properties(&member_json)?,
559                            authoritative_definitions: self
560                                .parse_authoritative_definitions(&member_json)?,
561                        });
562                    }
563                }
564                if !team_members.is_empty() {
565                    Some(team_members)
566                } else {
567                    None
568                }
569            } else {
570                None
571            };
572
573            let team_json = JsonValue::Object(team_obj.clone());
574            Ok(Some(ODPSTeam {
575                name: team_obj
576                    .get("name")
577                    .and_then(|v| v.as_str())
578                    .map(|s| s.to_string()),
579                description: team_obj
580                    .get("description")
581                    .and_then(|v| v.as_str())
582                    .map(|s| s.to_string()),
583                members,
584                tags: self.parse_tags(&team_json)?,
585                custom_properties: self.parse_custom_properties(&team_json)?,
586                authoritative_definitions: self.parse_authoritative_definitions(&team_json)?,
587            }))
588        } else {
589            Ok(None)
590        }
591    }
592}
593
594impl Default for ODPSImporter {
595    fn default() -> Self {
596        Self::new()
597    }
598}