data_modelling_core/import/
odps.rs1use super::ImportError;
6use crate::models::Tag;
7use crate::models::odps::*;
8use anyhow::{Context, Result};
9use serde_json::Value as JsonValue;
10use serde_yaml::Value as YamlValue;
11use std::str::FromStr;
12
/// Imports ODPS data products from YAML documents.
///
/// The importer can optionally be given a set of known table ids; port `contractId`
/// references are then checked against that set during import.
#[derive(Debug, Clone)]
pub struct ODPSImporter {
    /// When `Some`, the ids that port `contractId` values must match; `None` disables
    /// the check entirely.
    known_table_ids: Option<Vec<String>>,
}
18
19impl ODPSImporter {
20 pub fn new() -> Self {
22 Self {
23 known_table_ids: None,
24 }
25 }
26
27 pub fn with_table_ids(table_ids: Vec<String>) -> Self {
29 Self {
30 known_table_ids: Some(table_ids),
31 }
32 }
33
34 pub fn import(&self, yaml_content: &str) -> Result<ODPSDataProduct, ImportError> {
62 #[cfg(feature = "odps-validation")]
64 {
65 use crate::validation::schema::validate_odps_internal;
66 validate_odps_internal(yaml_content).map_err(ImportError::ValidationError)?;
67 }
68
69 let yaml_value: YamlValue = serde_yaml::from_str(yaml_content)
70 .map_err(|e| ImportError::ParseError(format!("Failed to parse YAML: {}", e)))?;
71
72 self.parse_odps_product(&yaml_value)
73 .map_err(|e| ImportError::ParseError(e.to_string()))
74 }
75
76 fn parse_odps_product(&self, yaml: &YamlValue) -> Result<ODPSDataProduct> {
78 let _obj = yaml
79 .as_mapping()
80 .ok_or_else(|| anyhow::anyhow!("ODPS YAML must be a mapping"))?;
81
82 let json_value: JsonValue =
84 serde_json::to_value(yaml).context("Failed to convert YAML to JSON")?;
85
86 let api_version = json_value
88 .get("apiVersion")
89 .and_then(|v| v.as_str())
90 .unwrap_or("v1.0.0")
91 .to_string();
92
93 let kind = json_value
94 .get("kind")
95 .and_then(|v| v.as_str())
96 .unwrap_or("DataProduct")
97 .to_string();
98
99 let id = json_value
100 .get("id")
101 .and_then(|v| v.as_str())
102 .ok_or_else(|| anyhow::anyhow!("Missing required field: id"))?
103 .to_string();
104
105 let status_str = json_value
106 .get("status")
107 .and_then(|v| v.as_str())
108 .ok_or_else(|| anyhow::anyhow!("Missing required field: status"))?;
109
110 let status = match status_str {
111 "proposed" => ODPSStatus::Proposed,
112 "draft" => ODPSStatus::Draft,
113 "active" => ODPSStatus::Active,
114 "deprecated" => ODPSStatus::Deprecated,
115 "retired" => ODPSStatus::Retired,
116 _ => return Err(anyhow::anyhow!("Invalid status: {}", status_str)),
117 };
118
119 let name = json_value
121 .get("name")
122 .and_then(|v| v.as_str())
123 .map(|s| s.to_string());
124 let version = json_value
125 .get("version")
126 .and_then(|v| v.as_str())
127 .map(|s| s.to_string());
128 let domain = json_value
129 .get("domain")
130 .and_then(|v| v.as_str())
131 .map(|s| s.to_string());
132 let tenant = json_value
133 .get("tenant")
134 .and_then(|v| v.as_str())
135 .map(|s| s.to_string());
136
137 let tags = self.parse_tags(&json_value)?;
138 let description = self.parse_description(&json_value)?;
139 let authoritative_definitions = self.parse_authoritative_definitions(&json_value)?;
140 let custom_properties = self.parse_custom_properties(&json_value)?;
141 let input_ports = self.parse_input_ports(&json_value)?;
142 let output_ports = self.parse_output_ports(&json_value)?;
143 let management_ports = self.parse_management_ports(&json_value)?;
144 let support = self.parse_support(&json_value)?;
145 let team = self.parse_team(&json_value)?;
146 let product_created_ts = json_value
147 .get("productCreatedTs")
148 .and_then(|v| v.as_str())
149 .map(|s| s.to_string());
150
151 Ok(ODPSDataProduct {
152 api_version,
153 kind,
154 id,
155 name,
156 version,
157 status,
158 domain,
159 tenant,
160 authoritative_definitions,
161 description,
162 custom_properties,
163 tags,
164 input_ports,
165 output_ports,
166 management_ports,
167 support,
168 team,
169 product_created_ts,
170 created_at: Some(chrono::Utc::now()),
171 updated_at: Some(chrono::Utc::now()),
172 })
173 }
174
175 fn parse_tags(&self, json: &JsonValue) -> Result<Vec<Tag>> {
177 let mut tags = Vec::new();
178 if let Some(tags_arr) = json.get("tags").and_then(|v| v.as_array()) {
179 for item in tags_arr {
180 if let Some(s) = item.as_str() {
181 if let Ok(tag) = Tag::from_str(s) {
182 tags.push(tag);
183 } else {
184 tags.push(Tag::Simple(s.to_string()));
185 }
186 }
187 }
188 }
189 Ok(tags)
190 }
191
192 fn parse_description(&self, json: &JsonValue) -> Result<Option<ODPSDescription>> {
194 if let Some(desc_val) = json.get("description") {
195 let desc_obj = desc_val
196 .as_object()
197 .ok_or_else(|| anyhow::anyhow!("Description must be an object"))?;
198 Ok(Some(ODPSDescription {
199 purpose: desc_obj
200 .get("purpose")
201 .and_then(|v| v.as_str())
202 .map(|s| s.to_string()),
203 limitations: desc_obj
204 .get("limitations")
205 .and_then(|v| v.as_str())
206 .map(|s| s.to_string()),
207 usage: desc_obj
208 .get("usage")
209 .and_then(|v| v.as_str())
210 .map(|s| s.to_string()),
211 authoritative_definitions: self.parse_authoritative_definitions(desc_val)?,
212 custom_properties: self.parse_custom_properties(desc_val)?,
213 }))
214 } else {
215 Ok(None)
216 }
217 }
218
219 fn parse_authoritative_definitions(
221 &self,
222 json: &JsonValue,
223 ) -> Result<Option<Vec<ODPSAuthoritativeDefinition>>> {
224 if let Some(defs_arr) = json
225 .get("authoritativeDefinitions")
226 .and_then(|v| v.as_array())
227 {
228 let mut defs = Vec::new();
229 for def_item in defs_arr {
230 if let Some(def_obj) = def_item.as_object()
231 && let (Some(r#type), Some(url)) = (
232 def_obj.get("type").and_then(|v| v.as_str()),
233 def_obj.get("url").and_then(|v| v.as_str()),
234 )
235 {
236 defs.push(ODPSAuthoritativeDefinition {
237 r#type: r#type.to_string(),
238 url: url.to_string(),
239 description: def_obj
240 .get("description")
241 .and_then(|v| v.as_str())
242 .map(|s| s.to_string()),
243 });
244 }
245 }
246 Ok(Some(defs))
248 } else {
249 Ok(None)
250 }
251 }
252
253 fn parse_custom_properties(&self, json: &JsonValue) -> Result<Option<Vec<ODPSCustomProperty>>> {
255 if let Some(props_arr) = json.get("customProperties").and_then(|v| v.as_array()) {
256 let mut props = Vec::new();
257 for prop_item in props_arr {
258 if let Some(prop_obj) = prop_item.as_object()
259 && let (Some(property), Some(value)) = (
260 prop_obj.get("property").and_then(|v| v.as_str()),
261 prop_obj.get("value"),
262 )
263 {
264 props.push(ODPSCustomProperty {
265 property: property.to_string(),
266 value: value.clone(),
267 description: prop_obj
268 .get("description")
269 .and_then(|v| v.as_str())
270 .map(|s| s.to_string()),
271 });
272 }
273 }
274 Ok(Some(props))
276 } else {
277 Ok(None)
278 }
279 }
280
281 fn parse_input_ports(&self, json: &JsonValue) -> Result<Option<Vec<ODPSInputPort>>> {
283 if let Some(ports_arr) = json.get("inputPorts").and_then(|v| v.as_array()) {
284 let mut ports = Vec::new();
285 for port_item in ports_arr {
286 if let Some(port_obj) = port_item.as_object()
287 && let (Some(name), Some(version), Some(contract_id)) = (
288 port_obj.get("name").and_then(|v| v.as_str()),
289 port_obj.get("version").and_then(|v| v.as_str()),
290 port_obj.get("contractId").and_then(|v| v.as_str()),
291 )
292 {
293 if let Some(ref table_ids) = self.known_table_ids
295 && !table_ids.contains(&contract_id.to_string())
296 {
297 return Err(anyhow::anyhow!(
298 "Input port '{}' references unknown contractId: {}",
299 name,
300 contract_id
301 ));
302 }
303
304 let port_json = JsonValue::Object(port_obj.clone());
305 ports.push(ODPSInputPort {
306 name: name.to_string(),
307 version: version.to_string(),
308 contract_id: contract_id.to_string(),
309 tags: self.parse_tags(&port_json)?,
310 custom_properties: self.parse_custom_properties(&port_json)?,
311 authoritative_definitions: self
312 .parse_authoritative_definitions(&port_json)?,
313 });
314 }
315 }
316 Ok(Some(ports))
318 } else {
319 Ok(None)
320 }
321 }
322
323 fn parse_output_ports(&self, json: &JsonValue) -> Result<Option<Vec<ODPSOutputPort>>> {
325 if let Some(ports_arr) = json.get("outputPorts").and_then(|v| v.as_array()) {
326 let mut ports = Vec::new();
327 for port_item in ports_arr {
328 if let Some(port_obj) = port_item.as_object()
329 && let (Some(name), Some(version)) = (
330 port_obj.get("name").and_then(|v| v.as_str()),
331 port_obj.get("version").and_then(|v| v.as_str()),
332 )
333 {
334 let contract_id = port_obj
335 .get("contractId")
336 .and_then(|v| v.as_str())
337 .map(|s| s.to_string());
338
339 if let Some(ref contract_id_str) = contract_id
341 && let Some(ref table_ids) = self.known_table_ids
342 && !table_ids.contains(contract_id_str)
343 {
344 return Err(anyhow::anyhow!(
345 "Output port '{}' references unknown contractId: {}",
346 name,
347 contract_id_str
348 ));
349 }
350
351 let input_contracts = if let Some(contracts_arr) =
353 port_obj.get("inputContracts").and_then(|v| v.as_array())
354 {
355 let mut contracts = Vec::new();
356 for contract_item in contracts_arr {
357 if let Some(contract_obj) = contract_item.as_object()
358 && let (Some(id), Some(version)) = (
359 contract_obj.get("id").and_then(|v| v.as_str()),
360 contract_obj.get("version").and_then(|v| v.as_str()),
361 )
362 {
363 contracts.push(ODPSInputContract {
364 id: id.to_string(),
365 version: version.to_string(),
366 });
367 }
368 }
369 if !contracts.is_empty() {
370 Some(contracts)
371 } else {
372 None
373 }
374 } else {
375 None
376 };
377
378 let sbom =
380 if let Some(sbom_arr) = port_obj.get("sbom").and_then(|v| v.as_array()) {
381 let mut sboms = Vec::new();
382 for sbom_item in sbom_arr {
383 if let Some(sbom_obj) = sbom_item.as_object()
384 && let Some(url) = sbom_obj.get("url").and_then(|v| v.as_str())
385 {
386 sboms.push(ODPSSBOM {
387 r#type: sbom_obj
388 .get("type")
389 .and_then(|v| v.as_str())
390 .map(|s| s.to_string()),
391 url: url.to_string(),
392 });
393 }
394 }
395 if !sboms.is_empty() { Some(sboms) } else { None }
396 } else {
397 None
398 };
399
400 let port_json = JsonValue::Object(port_obj.clone());
401 ports.push(ODPSOutputPort {
402 name: name.to_string(),
403 description: port_obj
404 .get("description")
405 .and_then(|v| v.as_str())
406 .map(|s| s.to_string()),
407 r#type: port_obj
408 .get("type")
409 .and_then(|v| v.as_str())
410 .map(|s| s.to_string()),
411 version: version.to_string(),
412 contract_id,
413 sbom,
414 input_contracts,
415 tags: self.parse_tags(&port_json)?,
416 custom_properties: self.parse_custom_properties(&port_json)?,
417 authoritative_definitions: self
418 .parse_authoritative_definitions(&port_json)?,
419 });
420 }
421 }
422 Ok(Some(ports))
424 } else {
425 Ok(None)
426 }
427 }
428
429 fn parse_management_ports(&self, json: &JsonValue) -> Result<Option<Vec<ODPSManagementPort>>> {
431 if let Some(ports_arr) = json.get("managementPorts").and_then(|v| v.as_array()) {
432 let mut ports = Vec::new();
433 for port_item in ports_arr {
434 if let Some(port_obj) = port_item.as_object()
435 && let (Some(name), Some(content)) = (
436 port_obj.get("name").and_then(|v| v.as_str()),
437 port_obj.get("content").and_then(|v| v.as_str()),
438 )
439 {
440 let port_json = JsonValue::Object(port_obj.clone());
441 ports.push(ODPSManagementPort {
442 name: name.to_string(),
443 content: content.to_string(),
444 r#type: port_obj
445 .get("type")
446 .and_then(|v| v.as_str())
447 .map(|s| s.to_string()),
448 url: port_obj
449 .get("url")
450 .and_then(|v| v.as_str())
451 .map(|s| s.to_string()),
452 channel: port_obj
453 .get("channel")
454 .and_then(|v| v.as_str())
455 .map(|s| s.to_string()),
456 description: port_obj
457 .get("description")
458 .and_then(|v| v.as_str())
459 .map(|s| s.to_string()),
460 tags: self.parse_tags(&port_json)?,
461 custom_properties: self.parse_custom_properties(&port_json)?,
462 authoritative_definitions: self
463 .parse_authoritative_definitions(&port_json)?,
464 });
465 }
466 }
467 Ok(Some(ports))
469 } else {
470 Ok(None)
471 }
472 }
473
474 fn parse_support(&self, json: &JsonValue) -> Result<Option<Vec<ODPSSupport>>> {
476 if let Some(support_arr) = json.get("support").and_then(|v| v.as_array()) {
477 let mut supports = Vec::new();
478 for support_item in support_arr {
479 if let Some(support_obj) = support_item.as_object()
480 && let (Some(channel), Some(url)) = (
481 support_obj.get("channel").and_then(|v| v.as_str()),
482 support_obj.get("url").and_then(|v| v.as_str()),
483 )
484 {
485 let support_json = JsonValue::Object(support_obj.clone());
486 supports.push(ODPSSupport {
487 channel: channel.to_string(),
488 url: url.to_string(),
489 description: support_obj
490 .get("description")
491 .and_then(|v| v.as_str())
492 .map(|s| s.to_string()),
493 tool: support_obj
494 .get("tool")
495 .and_then(|v| v.as_str())
496 .map(|s| s.to_string()),
497 scope: support_obj
498 .get("scope")
499 .and_then(|v| v.as_str())
500 .map(|s| s.to_string()),
501 invitation_url: support_obj
502 .get("invitationUrl")
503 .and_then(|v| v.as_str())
504 .map(|s| s.to_string()),
505 tags: self.parse_tags(&support_json)?,
506 custom_properties: self.parse_custom_properties(&support_json)?,
507 authoritative_definitions: self
508 .parse_authoritative_definitions(&support_json)?,
509 });
510 }
511 }
512 Ok(Some(supports))
514 } else {
515 Ok(None)
516 }
517 }
518
519 fn parse_team(&self, json: &JsonValue) -> Result<Option<ODPSTeam>> {
521 if let Some(team_obj) = json.get("team").and_then(|v| v.as_object()) {
522 let members = if let Some(members_arr) =
523 team_obj.get("members").and_then(|v| v.as_array())
524 {
525 let mut team_members = Vec::new();
526 for member_item in members_arr {
527 if let Some(member_obj) = member_item.as_object()
528 && let Some(username) = member_obj.get("username").and_then(|v| v.as_str())
529 {
530 let member_json = JsonValue::Object(member_obj.clone());
531 team_members.push(ODPSTeamMember {
532 username: username.to_string(),
533 name: member_obj
534 .get("name")
535 .and_then(|v| v.as_str())
536 .map(|s| s.to_string()),
537 description: member_obj
538 .get("description")
539 .and_then(|v| v.as_str())
540 .map(|s| s.to_string()),
541 role: member_obj
542 .get("role")
543 .and_then(|v| v.as_str())
544 .map(|s| s.to_string()),
545 date_in: member_obj
546 .get("dateIn")
547 .and_then(|v| v.as_str())
548 .map(|s| s.to_string()),
549 date_out: member_obj
550 .get("dateOut")
551 .and_then(|v| v.as_str())
552 .map(|s| s.to_string()),
553 replaced_by_username: member_obj
554 .get("replacedByUsername")
555 .and_then(|v| v.as_str())
556 .map(|s| s.to_string()),
557 tags: self.parse_tags(&member_json)?,
558 custom_properties: self.parse_custom_properties(&member_json)?,
559 authoritative_definitions: self
560 .parse_authoritative_definitions(&member_json)?,
561 });
562 }
563 }
564 if !team_members.is_empty() {
565 Some(team_members)
566 } else {
567 None
568 }
569 } else {
570 None
571 };
572
573 let team_json = JsonValue::Object(team_obj.clone());
574 Ok(Some(ODPSTeam {
575 name: team_obj
576 .get("name")
577 .and_then(|v| v.as_str())
578 .map(|s| s.to_string()),
579 description: team_obj
580 .get("description")
581 .and_then(|v| v.as_str())
582 .map(|s| s.to_string()),
583 members,
584 tags: self.parse_tags(&team_json)?,
585 custom_properties: self.parse_custom_properties(&team_json)?,
586 authoritative_definitions: self.parse_authoritative_definitions(&team_json)?,
587 }))
588 } else {
589 Ok(None)
590 }
591 }
592}
593
594impl Default for ODPSImporter {
595 fn default() -> Self {
596 Self::new()
597 }
598}