data_modelling_sdk/import/
avro.rs

1//! AVRO schema parser for importing AVRO schemas into data models.
2//!
3//! # Validation
4//!
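//! All imported table and column names are checked for:
//! - Valid identifier format
//! - Maximum length limits
//!
//! A name that fails these checks is reported through a `warn!` log entry;
//! the table or column is still imported.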
8
9use crate::import::{ImportError, ImportResult, TableData};
10use crate::models::{Column, Table, Tag};
11use crate::validation::input::{validate_column_name, validate_table_name};
12use anyhow::{Context, Result};
13use serde_json::{Value, json};
14use std::collections::HashMap;
15use std::str::FromStr;
16use tracing::{info, warn};
17
/// Parser for AVRO schema format.
///
/// The importer is a stateless unit struct, so it is cheap to copy and can be
/// printed in diagnostics.
#[derive(Debug, Clone, Copy, Default)]
pub struct AvroImporter;
21
22impl AvroImporter {
23    /// Create a new AVRO parser instance.
24    ///
25    /// # Example
26    ///
27    /// ```rust
28    /// use data_modelling_sdk::import::avro::AvroImporter;
29    ///
30    /// let importer = AvroImporter::new();
31    /// ```
32    pub fn new() -> Self {
33        Self
34    }
35
36    /// Import AVRO schema content and create Table(s) (SDK interface).
37    ///
38    /// # Arguments
39    ///
40    /// * `avro_content` - AVRO schema as JSON string (can be a single record or array of records)
41    ///
42    /// # Returns
43    ///
44    /// An `ImportResult` containing extracted tables and any parse errors.
45    ///
46    /// # Example
47    ///
48    /// ```rust
49    /// use data_modelling_sdk::import::avro::AvroImporter;
50    ///
51    /// let importer = AvroImporter::new();
52    /// let schema = r#"
53    /// {
54    ///   "type": "record",
55    ///   "name": "User",
56    ///   "fields": [
57    ///     {"name": "id", "type": "long"}
58    ///   ]
59    /// }
60    /// "#;
61    /// let result = importer.import(schema).unwrap();
62    /// ```
63    pub fn import(&self, avro_content: &str) -> Result<ImportResult, ImportError> {
64        match self.parse(avro_content) {
65            Ok((tables, errors)) => {
66                let mut sdk_tables = Vec::new();
67                for (idx, table) in tables.iter().enumerate() {
68                    sdk_tables.push(TableData {
69                        table_index: idx,
70                        name: Some(table.name.clone()),
71                        columns: table
72                            .columns
73                            .iter()
74                            .map(|c| super::ColumnData {
75                                name: c.name.clone(),
76                                data_type: c.data_type.clone(),
77                                nullable: c.nullable,
78                                primary_key: c.primary_key,
79                                description: if c.description.is_empty() {
80                                    None
81                                } else {
82                                    Some(c.description.clone())
83                                },
84                                quality: if c.quality.is_empty() {
85                                    None
86                                } else {
87                                    Some(c.quality.clone())
88                                },
89                                ref_path: c.ref_path.clone(),
90                            })
91                            .collect(),
92                    });
93                }
94                let sdk_errors: Vec<ImportError> = errors
95                    .iter()
96                    .map(|e| ImportError::ParseError(e.message.clone()))
97                    .collect();
98                Ok(ImportResult {
99                    tables: sdk_tables,
100                    tables_requiring_name: Vec::new(),
101                    errors: sdk_errors,
102                    ai_suggestions: None,
103                })
104            }
105            Err(e) => Err(ImportError::ParseError(e.to_string())),
106        }
107    }
108
109    /// Parse AVRO schema content and create Table(s) (internal method).
110    ///
111    /// # Returns
112    ///
113    /// Returns a tuple of (Tables, list of errors/warnings).
114    fn parse(&self, avro_content: &str) -> Result<(Vec<Table>, Vec<ParserError>)> {
115        let mut errors = Vec::new();
116
117        // Parse JSON
118        let schema: Value =
119            serde_json::from_str(avro_content).context("Failed to parse AVRO schema as JSON")?;
120
121        let mut tables = Vec::new();
122
123        // AVRO can be a single record or an array of records
124        if let Some(schemas) = schema.as_array() {
125            // Multiple schemas
126            for (idx, schema_item) in schemas.iter().enumerate() {
127                match self.parse_schema(schema_item, &mut errors) {
128                    Ok(table) => tables.push(table),
129                    Err(e) => {
130                        errors.push(ParserError {
131                            error_type: "parse_error".to_string(),
132                            field: Some(format!("schema[{}]", idx)),
133                            message: format!("Failed to parse schema: {}", e),
134                        });
135                    }
136                }
137            }
138        } else {
139            // Single schema
140            match self.parse_schema(&schema, &mut errors) {
141                Ok(table) => tables.push(table),
142                Err(e) => {
143                    errors.push(ParserError {
144                        error_type: "parse_error".to_string(),
145                        field: None,
146                        message: format!("Failed to parse schema: {}", e),
147                    });
148                }
149            }
150        }
151
152        Ok((tables, errors))
153    }
154
155    /// Parse a single AVRO schema record.
156    fn parse_schema(&self, schema: &Value, errors: &mut Vec<ParserError>) -> Result<Table> {
157        let schema_obj = schema
158            .as_object()
159            .ok_or_else(|| anyhow::anyhow!("Schema must be an object"))?;
160
161        // Extract record name
162        let name = schema_obj
163            .get("name")
164            .and_then(|v| v.as_str())
165            .ok_or_else(|| anyhow::anyhow!("Missing required field: name"))?
166            .to_string();
167
168        // Validate table name
169        if let Err(e) = validate_table_name(&name) {
170            warn!("Table name validation warning for '{}': {}", name, e);
171        }
172
173        // Extract namespace (optional)
174        let namespace = schema_obj
175            .get("namespace")
176            .and_then(|v| v.as_str())
177            .map(|s| s.to_string());
178
179        // Extract fields
180        let fields = schema_obj
181            .get("fields")
182            .and_then(|v| v.as_array())
183            .ok_or_else(|| anyhow::anyhow!("Missing required field: fields"))?;
184
185        let mut columns = Vec::new();
186        for (idx, field) in fields.iter().enumerate() {
187            match self.parse_field(field, &name, errors) {
188                Ok(mut cols) => columns.append(&mut cols),
189                Err(e) => {
190                    errors.push(ParserError {
191                        error_type: "parse_error".to_string(),
192                        field: Some(format!("fields[{}]", idx)),
193                        message: format!("Failed to parse field: {}", e),
194                    });
195                }
196            }
197        }
198
199        // Extract tags from AVRO schema (can be in root or in aliases/metadata)
200        let mut tags: Vec<Tag> = Vec::new();
201        if let Some(tags_arr) = schema_obj.get("tags").and_then(|v| v.as_array()) {
202            for item in tags_arr {
203                if let Some(s) = item.as_str() {
204                    if let Ok(tag) = Tag::from_str(s) {
205                        tags.push(tag);
206                    } else {
207                        tags.push(Tag::Simple(s.to_string()));
208                    }
209                }
210            }
211        }
212        // Also check aliases/metadata for tags
213        if let Some(aliases_arr) = schema_obj.get("aliases").and_then(|v| v.as_array()) {
214            for item in aliases_arr {
215                if let Some(s) = item.as_str() {
216                    // AVRO aliases can be used as tags
217                    if let Ok(tag) = Tag::from_str(s) {
218                        if !tags.contains(&tag) {
219                            tags.push(tag);
220                        }
221                    } else {
222                        let simple_tag = Tag::Simple(s.to_string());
223                        if !tags.contains(&simple_tag) {
224                            tags.push(simple_tag);
225                        }
226                    }
227                }
228            }
229        }
230
231        // Build table metadata
232        let mut odcl_metadata = HashMap::new();
233        if let Some(ref ns) = namespace {
234            odcl_metadata.insert("namespace".to_string(), json!(ns));
235        }
236        if let Some(doc) = schema_obj.get("doc").and_then(|v| v.as_str()) {
237            odcl_metadata.insert("description".to_string(), json!(doc));
238        }
239
240        let table = Table {
241            id: crate::models::table::Table::generate_id(&name, None, None, namespace.as_deref()),
242            name: name.clone(),
243            columns,
244            database_type: None,
245            catalog_name: None,
246            schema_name: namespace.clone(),
247            medallion_layers: Vec::new(),
248            scd_pattern: None,
249            data_vault_classification: None,
250            modeling_level: None,
251            tags,
252            odcl_metadata,
253            owner: None,
254            sla: None,
255            contact_details: None,
256            infrastructure_type: None,
257            notes: None,
258            position: None,
259            yaml_file_path: None,
260            drawio_cell_id: None,
261            quality: Vec::new(),
262            errors: Vec::new(),
263            created_at: chrono::Utc::now(),
264            updated_at: chrono::Utc::now(),
265        };
266
267        info!(
268            "Parsed AVRO schema: {} with {} columns",
269            name,
270            table.columns.len()
271        );
272        Ok(table)
273    }
274
275    /// Parse an AVRO field (which can be a simple field or nested record).
276    fn parse_field(
277        &self,
278        field: &Value,
279        _parent_name: &str,
280        errors: &mut Vec<ParserError>,
281    ) -> Result<Vec<Column>> {
282        let field_obj = field
283            .as_object()
284            .ok_or_else(|| anyhow::anyhow!("Field must be an object"))?;
285
286        let field_name = field_obj
287            .get("name")
288            .and_then(|v| v.as_str())
289            .ok_or_else(|| anyhow::anyhow!("Field missing name"))?
290            .to_string();
291
292        // Validate column name
293        if let Err(e) = validate_column_name(&field_name) {
294            warn!("Column name validation warning for '{}': {}", field_name, e);
295        }
296
297        let field_type = field_obj
298            .get("type")
299            .ok_or_else(|| anyhow::anyhow!("Field missing type"))?;
300
301        let description = field_obj
302            .get("doc")
303            .and_then(|v| v.as_str())
304            .map(|s| s.to_string())
305            .unwrap_or_default();
306
307        // Handle union types (e.g., ["null", "string"] for nullable)
308        let (avro_type, nullable) = if let Some(types) = field_type.as_array() {
309            if types.len() == 2 && types.iter().any(|t| t.as_str() == Some("null")) {
310                // Nullable type
311                let non_null_type = types
312                    .iter()
313                    .find(|t| t.as_str() != Some("null"))
314                    .ok_or_else(|| anyhow::anyhow!("Invalid union type"))?;
315                (non_null_type, true)
316            } else {
317                // Complex union with multiple non-null types - use first non-null type
318                // and mark as nullable since union implies optionality
319                let first_non_null = types
320                    .iter()
321                    .find(|t| t.as_str() != Some("null"))
322                    .unwrap_or(field_type);
323                (first_non_null, true)
324            }
325        } else {
326            (field_type, false)
327        };
328
329        // Parse the actual type
330        let mut columns = Vec::new();
331        if let Some(type_str) = avro_type.as_str() {
332            // Simple type
333            let data_type = self.map_avro_type_to_sql(type_str);
334            columns.push(Column {
335                name: field_name,
336                data_type,
337                nullable,
338                primary_key: false,
339                secondary_key: false,
340                composite_key: None,
341                foreign_key: None,
342                constraints: Vec::new(),
343                description,
344                quality: Vec::new(),
345                ref_path: None,
346                enum_values: Vec::new(),
347                errors: Vec::new(),
348                column_order: 0,
349            });
350        } else if let Some(type_obj) = avro_type.as_object() {
351            // Complex type (record, array, map)
352            if type_obj.get("type").and_then(|v| v.as_str()) == Some("record") {
353                // Nested record - create nested columns with dot notation
354                let nested_name = type_obj
355                    .get("name")
356                    .and_then(|v| v.as_str())
357                    .unwrap_or(&field_name);
358                let nested_fields = type_obj
359                    .get("fields")
360                    .and_then(|v| v.as_array())
361                    .ok_or_else(|| anyhow::anyhow!("Nested record missing fields"))?;
362
363                for nested_field in nested_fields {
364                    match self.parse_field(nested_field, nested_name, errors) {
365                        Ok(mut nested_cols) => {
366                            // Prefix nested columns with parent field name
367                            for col in nested_cols.iter_mut() {
368                                col.name = format!("{}.{}", field_name, col.name);
369                            }
370                            columns.append(&mut nested_cols);
371                        }
372                        Err(e) => {
373                            errors.push(ParserError {
374                                error_type: "parse_error".to_string(),
375                                field: Some(format!("{}.{}", field_name, nested_name)),
376                                message: format!("Failed to parse nested field: {}", e),
377                            });
378                        }
379                    }
380                }
381            } else if type_obj.get("type").and_then(|v| v.as_str()) == Some("array") {
382                // Array type
383                let items = type_obj
384                    .get("items")
385                    .ok_or_else(|| anyhow::anyhow!("Array type missing items"))?;
386
387                let data_type = if let Some(items_str) = items.as_str() {
388                    format!("ARRAY<{}>", self.map_avro_type_to_sql(items_str))
389                } else if let Some(items_obj) = items.as_object() {
390                    if items_obj.get("type").and_then(|v| v.as_str()) == Some("record") {
391                        // Array of records - create nested columns
392                        let nested_name = items_obj
393                            .get("name")
394                            .and_then(|v| v.as_str())
395                            .unwrap_or(&field_name);
396                        let nested_fields = items_obj
397                            .get("fields")
398                            .and_then(|v| v.as_array())
399                            .ok_or_else(|| anyhow::anyhow!("Array record missing fields"))?;
400
401                        for nested_field in nested_fields {
402                            match self.parse_field(nested_field, nested_name, errors) {
403                                Ok(mut nested_cols) => {
404                                    for col in nested_cols.iter_mut() {
405                                        col.name = format!("{}.{}", field_name, col.name);
406                                    }
407                                    columns.append(&mut nested_cols);
408                                }
409                                Err(e) => {
410                                    errors.push(ParserError {
411                                        error_type: "parse_error".to_string(),
412                                        field: Some(format!("{}.{}", field_name, nested_name)),
413                                        message: format!("Failed to parse array item field: {}", e),
414                                    });
415                                }
416                            }
417                        }
418                        return Ok(columns);
419                    } else {
420                        format!("ARRAY<{}>", "STRUCT")
421                    }
422                } else {
423                    "ARRAY<STRING>".to_string()
424                };
425
426                columns.push(Column {
427                    name: field_name,
428                    data_type,
429                    nullable,
430                    primary_key: false,
431                    secondary_key: false,
432                    composite_key: None,
433                    foreign_key: None,
434                    constraints: Vec::new(),
435                    description,
436                    quality: Vec::new(),
437                    ref_path: None,
438                    enum_values: Vec::new(),
439                    errors: Vec::new(),
440                    column_order: 0,
441                });
442            } else {
443                // Other complex types - default to STRUCT
444                columns.push(Column {
445                    name: field_name,
446                    data_type: "STRUCT".to_string(),
447                    nullable,
448                    primary_key: false,
449                    secondary_key: false,
450                    composite_key: None,
451                    foreign_key: None,
452                    constraints: Vec::new(),
453                    description,
454                    quality: Vec::new(),
455                    ref_path: None,
456                    enum_values: Vec::new(),
457                    errors: Vec::new(),
458                    column_order: 0,
459                });
460            }
461        } else {
462            return Err(anyhow::anyhow!("Unsupported field type format"));
463        }
464
465        Ok(columns)
466    }
467
468    /// Map AVRO type to SQL/ODCL data type.
469    fn map_avro_type_to_sql(&self, avro_type: &str) -> String {
470        match avro_type {
471            "int" => "INTEGER".to_string(),
472            "long" => "BIGINT".to_string(),
473            "float" => "FLOAT".to_string(),
474            "double" => "DOUBLE".to_string(),
475            "boolean" => "BOOLEAN".to_string(),
476            "bytes" => "BYTES".to_string(),
477            "string" => "STRING".to_string(),
478            "null" => "NULL".to_string(),
479            _ => "STRING".to_string(), // Default fallback
480        }
481    }
482}
483
/// Parser error structure (matches ODCL parser format).
///
/// The AVRO parser collects these for problems that do not abort parsing,
/// such as a schema or field that had to be skipped.
#[derive(Debug, Clone)]
pub struct ParserError {
    /// Category of the problem; the AVRO parser in this file always sets
    /// `"parse_error"`.
    pub error_type: String,
    /// Location of the problem when known, e.g. `schema[0]`, `fields[2]` or
    /// `<field>.<nested record>`; `None` for a single top-level schema.
    pub field: Option<String>,
    /// Human-readable description of what went wrong.
    pub message: String,
}