data_modelling_sdk/import/
avro.rs

1//! AVRO schema parser for importing AVRO schemas into data models.
2//!
3//! # Validation
4//!
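//! All imported table and column names are checked for:
//! - Valid identifier format
//! - Maximum length limits
//!
//! A name that fails these checks is logged as a warning; the import itself
//! still proceeds with the name unchanged.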
8
9use crate::import::{ImportError, ImportResult, TableData};
10use crate::models::{Column, Table, Tag};
11use crate::validation::input::{validate_column_name, validate_table_name};
12use anyhow::{Context, Result};
13use serde_json::{Value, json};
14use std::collections::HashMap;
15use std::str::FromStr;
16use tracing::{info, warn};
17
/// Parser for AVRO schema format.
///
/// The importer carries no state, so a single instance can be reused for any
/// number of schemas.
#[derive(Debug, Default, Clone)]
pub struct AvroImporter;
21
22impl AvroImporter {
23    /// Create a new AVRO parser instance.
24    ///
25    /// # Example
26    ///
27    /// ```rust
28    /// use data_modelling_sdk::import::avro::AvroImporter;
29    ///
30    /// let importer = AvroImporter::new();
31    /// ```
32    pub fn new() -> Self {
33        Self
34    }
35
36    /// Import AVRO schema content and create Table(s) (SDK interface).
37    ///
38    /// # Arguments
39    ///
40    /// * `avro_content` - AVRO schema as JSON string (can be a single record or array of records)
41    ///
42    /// # Returns
43    ///
44    /// An `ImportResult` containing extracted tables and any parse errors.
45    ///
46    /// # Example
47    ///
48    /// ```rust
49    /// use data_modelling_sdk::import::avro::AvroImporter;
50    ///
51    /// let importer = AvroImporter::new();
52    /// let schema = r#"
53    /// {
54    ///   "type": "record",
55    ///   "name": "User",
56    ///   "fields": [
57    ///     {"name": "id", "type": "long"}
58    ///   ]
59    /// }
60    /// "#;
61    /// let result = importer.import(schema).unwrap();
62    /// ```
63    pub fn import(&self, avro_content: &str) -> Result<ImportResult, ImportError> {
64        match self.parse(avro_content) {
65            Ok((tables, errors)) => {
66                let mut sdk_tables = Vec::new();
67                for (idx, table) in tables.iter().enumerate() {
68                    sdk_tables.push(TableData {
69                        table_index: idx,
70                        name: Some(table.name.clone()),
71                        columns: table
72                            .columns
73                            .iter()
74                            .map(|c| super::ColumnData {
75                                name: c.name.clone(),
76                                data_type: c.data_type.clone(),
77                                physical_type: c.physical_type.clone(),
78                                nullable: c.nullable,
79                                primary_key: c.primary_key,
80                                description: if c.description.is_empty() {
81                                    None
82                                } else {
83                                    Some(c.description.clone())
84                                },
85                                quality: if c.quality.is_empty() {
86                                    None
87                                } else {
88                                    Some(c.quality.clone())
89                                },
90                                relationships: c.relationships.clone(),
91                                enum_values: if c.enum_values.is_empty() {
92                                    None
93                                } else {
94                                    Some(c.enum_values.clone())
95                                },
96                            })
97                            .collect(),
98                    });
99                }
100                let sdk_errors: Vec<ImportError> = errors
101                    .iter()
102                    .map(|e| ImportError::ParseError(e.message.clone()))
103                    .collect();
104                Ok(ImportResult {
105                    tables: sdk_tables,
106                    tables_requiring_name: Vec::new(),
107                    errors: sdk_errors,
108                    ai_suggestions: None,
109                })
110            }
111            Err(e) => Err(ImportError::ParseError(e.to_string())),
112        }
113    }
114
115    /// Parse AVRO schema content and create Table(s) (internal method).
116    ///
117    /// # Returns
118    ///
119    /// Returns a tuple of (Tables, list of errors/warnings).
120    fn parse(&self, avro_content: &str) -> Result<(Vec<Table>, Vec<ParserError>)> {
121        let mut errors = Vec::new();
122
123        // Parse JSON
124        let schema: Value =
125            serde_json::from_str(avro_content).context("Failed to parse AVRO schema as JSON")?;
126
127        let mut tables = Vec::new();
128
129        // AVRO can be a single record or an array of records
130        if let Some(schemas) = schema.as_array() {
131            // Multiple schemas
132            for (idx, schema_item) in schemas.iter().enumerate() {
133                match self.parse_schema(schema_item, &mut errors) {
134                    Ok(table) => tables.push(table),
135                    Err(e) => {
136                        errors.push(ParserError {
137                            error_type: "parse_error".to_string(),
138                            field: Some(format!("schema[{}]", idx)),
139                            message: format!("Failed to parse schema: {}", e),
140                        });
141                    }
142                }
143            }
144        } else {
145            // Single schema
146            match self.parse_schema(&schema, &mut errors) {
147                Ok(table) => tables.push(table),
148                Err(e) => {
149                    errors.push(ParserError {
150                        error_type: "parse_error".to_string(),
151                        field: None,
152                        message: format!("Failed to parse schema: {}", e),
153                    });
154                }
155            }
156        }
157
158        Ok((tables, errors))
159    }
160
161    /// Parse a single AVRO schema record.
162    fn parse_schema(&self, schema: &Value, errors: &mut Vec<ParserError>) -> Result<Table> {
163        let schema_obj = schema
164            .as_object()
165            .ok_or_else(|| anyhow::anyhow!("Schema must be an object"))?;
166
167        // Extract record name
168        let name = schema_obj
169            .get("name")
170            .and_then(|v| v.as_str())
171            .ok_or_else(|| anyhow::anyhow!("Missing required field: name"))?
172            .to_string();
173
174        // Validate table name
175        if let Err(e) = validate_table_name(&name) {
176            warn!("Table name validation warning for '{}': {}", name, e);
177        }
178
179        // Extract namespace (optional)
180        let namespace = schema_obj
181            .get("namespace")
182            .and_then(|v| v.as_str())
183            .map(|s| s.to_string());
184
185        // Extract fields
186        let fields = schema_obj
187            .get("fields")
188            .and_then(|v| v.as_array())
189            .ok_or_else(|| anyhow::anyhow!("Missing required field: fields"))?;
190
191        let mut columns = Vec::new();
192        for (idx, field) in fields.iter().enumerate() {
193            match self.parse_field(field, &name, errors) {
194                Ok(mut cols) => columns.append(&mut cols),
195                Err(e) => {
196                    errors.push(ParserError {
197                        error_type: "parse_error".to_string(),
198                        field: Some(format!("fields[{}]", idx)),
199                        message: format!("Failed to parse field: {}", e),
200                    });
201                }
202            }
203        }
204
205        // Extract tags from AVRO schema (can be in root or in aliases/metadata)
206        let mut tags: Vec<Tag> = Vec::new();
207        if let Some(tags_arr) = schema_obj.get("tags").and_then(|v| v.as_array()) {
208            for item in tags_arr {
209                if let Some(s) = item.as_str() {
210                    if let Ok(tag) = Tag::from_str(s) {
211                        tags.push(tag);
212                    } else {
213                        tags.push(Tag::Simple(s.to_string()));
214                    }
215                }
216            }
217        }
218        // Also check aliases/metadata for tags
219        if let Some(aliases_arr) = schema_obj.get("aliases").and_then(|v| v.as_array()) {
220            for item in aliases_arr {
221                if let Some(s) = item.as_str() {
222                    // AVRO aliases can be used as tags
223                    if let Ok(tag) = Tag::from_str(s) {
224                        if !tags.contains(&tag) {
225                            tags.push(tag);
226                        }
227                    } else {
228                        let simple_tag = Tag::Simple(s.to_string());
229                        if !tags.contains(&simple_tag) {
230                            tags.push(simple_tag);
231                        }
232                    }
233                }
234            }
235        }
236
237        // Build table metadata
238        let mut odcl_metadata = HashMap::new();
239        if let Some(ref ns) = namespace {
240            odcl_metadata.insert("namespace".to_string(), json!(ns));
241        }
242        if let Some(doc) = schema_obj.get("doc").and_then(|v| v.as_str()) {
243            odcl_metadata.insert("description".to_string(), json!(doc));
244        }
245
246        let table = Table {
247            id: crate::models::table::Table::generate_id(&name, None, None, namespace.as_deref()),
248            name: name.clone(),
249            columns,
250            database_type: None,
251            catalog_name: None,
252            schema_name: namespace.clone(),
253            medallion_layers: Vec::new(),
254            scd_pattern: None,
255            data_vault_classification: None,
256            modeling_level: None,
257            tags,
258            odcl_metadata,
259            owner: None,
260            sla: None,
261            contact_details: None,
262            infrastructure_type: None,
263            notes: None,
264            position: None,
265            yaml_file_path: None,
266            drawio_cell_id: None,
267            quality: Vec::new(),
268            errors: Vec::new(),
269            created_at: chrono::Utc::now(),
270            updated_at: chrono::Utc::now(),
271        };
272
273        info!(
274            "Parsed AVRO schema: {} with {} columns",
275            name,
276            table.columns.len()
277        );
278        Ok(table)
279    }
280
281    /// Parse an AVRO field (which can be a simple field or nested record).
282    fn parse_field(
283        &self,
284        field: &Value,
285        _parent_name: &str,
286        errors: &mut Vec<ParserError>,
287    ) -> Result<Vec<Column>> {
288        let field_obj = field
289            .as_object()
290            .ok_or_else(|| anyhow::anyhow!("Field must be an object"))?;
291
292        let field_name = field_obj
293            .get("name")
294            .and_then(|v| v.as_str())
295            .ok_or_else(|| anyhow::anyhow!("Field missing name"))?
296            .to_string();
297
298        // Validate column name
299        if let Err(e) = validate_column_name(&field_name) {
300            warn!("Column name validation warning for '{}': {}", field_name, e);
301        }
302
303        let field_type = field_obj
304            .get("type")
305            .ok_or_else(|| anyhow::anyhow!("Field missing type"))?;
306
307        let description = field_obj
308            .get("doc")
309            .and_then(|v| v.as_str())
310            .map(|s| s.to_string())
311            .unwrap_or_default();
312
313        // Handle union types (e.g., ["null", "string"] for nullable)
314        let (avro_type, nullable) = if let Some(types) = field_type.as_array() {
315            if types.len() == 2 && types.iter().any(|t| t.as_str() == Some("null")) {
316                // Nullable type
317                let non_null_type = types
318                    .iter()
319                    .find(|t| t.as_str() != Some("null"))
320                    .ok_or_else(|| anyhow::anyhow!("Invalid union type"))?;
321                (non_null_type, true)
322            } else {
323                // Complex union with multiple non-null types - use first non-null type
324                // and mark as nullable since union implies optionality
325                let first_non_null = types
326                    .iter()
327                    .find(|t| t.as_str() != Some("null"))
328                    .unwrap_or(field_type);
329                (first_non_null, true)
330            }
331        } else {
332            (field_type, false)
333        };
334
335        // Parse the actual type
336        let mut columns = Vec::new();
337        if let Some(type_str) = avro_type.as_str() {
338            // Simple type
339            let data_type = self.map_avro_type_to_sql(type_str);
340            columns.push(Column {
341                name: field_name,
342                data_type,
343                physical_type: None,
344                nullable,
345                primary_key: false,
346                secondary_key: false,
347                composite_key: None,
348                foreign_key: None,
349                constraints: Vec::new(),
350                description,
351                quality: Vec::new(),
352                relationships: Vec::new(),
353                enum_values: Vec::new(),
354                errors: Vec::new(),
355                column_order: 0,
356                nested_data: None,
357            });
358        } else if let Some(type_obj) = avro_type.as_object() {
359            // Complex type (record, array, map)
360            if type_obj.get("type").and_then(|v| v.as_str()) == Some("record") {
361                // Nested record - create nested columns with dot notation
362                let nested_name = type_obj
363                    .get("name")
364                    .and_then(|v| v.as_str())
365                    .unwrap_or(&field_name);
366                let nested_fields = type_obj
367                    .get("fields")
368                    .and_then(|v| v.as_array())
369                    .ok_or_else(|| anyhow::anyhow!("Nested record missing fields"))?;
370
371                for nested_field in nested_fields {
372                    match self.parse_field(nested_field, nested_name, errors) {
373                        Ok(mut nested_cols) => {
374                            // Prefix nested columns with parent field name
375                            for col in nested_cols.iter_mut() {
376                                col.name = format!("{}.{}", field_name, col.name);
377                            }
378                            columns.append(&mut nested_cols);
379                        }
380                        Err(e) => {
381                            errors.push(ParserError {
382                                error_type: "parse_error".to_string(),
383                                field: Some(format!("{}.{}", field_name, nested_name)),
384                                message: format!("Failed to parse nested field: {}", e),
385                            });
386                        }
387                    }
388                }
389            } else if type_obj.get("type").and_then(|v| v.as_str()) == Some("array") {
390                // Array type
391                let items = type_obj
392                    .get("items")
393                    .ok_or_else(|| anyhow::anyhow!("Array type missing items"))?;
394
395                let data_type = if let Some(items_str) = items.as_str() {
396                    format!("ARRAY<{}>", self.map_avro_type_to_sql(items_str))
397                } else if let Some(items_obj) = items.as_object() {
398                    if items_obj.get("type").and_then(|v| v.as_str()) == Some("record") {
399                        // Array of records - create nested columns
400                        let nested_name = items_obj
401                            .get("name")
402                            .and_then(|v| v.as_str())
403                            .unwrap_or(&field_name);
404                        let nested_fields = items_obj
405                            .get("fields")
406                            .and_then(|v| v.as_array())
407                            .ok_or_else(|| anyhow::anyhow!("Array record missing fields"))?;
408
409                        for nested_field in nested_fields {
410                            match self.parse_field(nested_field, nested_name, errors) {
411                                Ok(mut nested_cols) => {
412                                    for col in nested_cols.iter_mut() {
413                                        col.name = format!("{}.{}", field_name, col.name);
414                                    }
415                                    columns.append(&mut nested_cols);
416                                }
417                                Err(e) => {
418                                    errors.push(ParserError {
419                                        error_type: "parse_error".to_string(),
420                                        field: Some(format!("{}.{}", field_name, nested_name)),
421                                        message: format!("Failed to parse array item field: {}", e),
422                                    });
423                                }
424                            }
425                        }
426                        return Ok(columns);
427                    } else {
428                        format!("ARRAY<{}>", "STRUCT")
429                    }
430                } else {
431                    "ARRAY<STRING>".to_string()
432                };
433
434                columns.push(Column {
435                    name: field_name,
436                    data_type,
437                    physical_type: None,
438                    nullable,
439                    primary_key: false,
440                    secondary_key: false,
441                    composite_key: None,
442                    foreign_key: None,
443                    constraints: Vec::new(),
444                    description,
445                    quality: Vec::new(),
446                    relationships: Vec::new(),
447                    enum_values: Vec::new(),
448                    errors: Vec::new(),
449                    column_order: 0,
450                    nested_data: None,
451                });
452            } else {
453                // Other complex types - default to STRUCT
454                columns.push(Column {
455                    name: field_name,
456                    data_type: "STRUCT".to_string(),
457                    physical_type: None,
458                    nullable,
459                    primary_key: false,
460                    secondary_key: false,
461                    composite_key: None,
462                    foreign_key: None,
463                    constraints: Vec::new(),
464                    description,
465                    quality: Vec::new(),
466                    relationships: Vec::new(),
467                    enum_values: Vec::new(),
468                    errors: Vec::new(),
469                    column_order: 0,
470                    nested_data: None,
471                });
472            }
473        } else {
474            return Err(anyhow::anyhow!("Unsupported field type format"));
475        }
476
477        Ok(columns)
478    }
479
480    /// Map AVRO type to SQL/ODCL data type.
481    fn map_avro_type_to_sql(&self, avro_type: &str) -> String {
482        match avro_type {
483            "int" => "INTEGER".to_string(),
484            "long" => "BIGINT".to_string(),
485            "float" => "FLOAT".to_string(),
486            "double" => "DOUBLE".to_string(),
487            "boolean" => "BOOLEAN".to_string(),
488            "bytes" => "BYTES".to_string(),
489            "string" => "STRING".to_string(),
490            "null" => "NULL".to_string(),
491            _ => "STRING".to_string(), // Default fallback
492        }
493    }
494}
495
/// Parser error structure (matches ODCL parser format).
///
/// Describes one problem found while parsing a schema. Values can be compared
/// for equality, which makes them convenient to assert on in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParserError {
    /// Category of the problem, e.g. `"parse_error"`.
    pub error_type: String,
    /// Location of the problem inside the schema (e.g. `schema[0]` or
    /// `fields[2]`), or `None` when no location applies.
    pub field: Option<String>,
    /// Human-readable description of what went wrong.
    pub message: String,
}