data_modelling_sdk/import/
avro.rs

1//! AVRO schema parser for importing AVRO schemas into data models.
2//!
3//! # Validation
4//!
//! All imported table and column names are checked for:
//! - Valid identifier format
//! - Maximum length limits
//!
//! Names that fail these checks are logged as warnings; the import still proceeds.
8
9use crate::import::odcs_shared::column_to_column_data;
10use crate::import::{ImportError, ImportResult, TableData};
11use crate::models::{Column, Table, Tag};
12use crate::validation::input::{validate_column_name, validate_table_name};
13use anyhow::{Context, Result};
14use serde_json::{Value, json};
15use std::collections::HashMap;
16use std::str::FromStr;
17use tracing::{info, warn};
18
/// Parser for AVRO schema format.
///
/// The importer is a stateless unit struct, so it is trivially cheap to
/// construct, copy, and print for diagnostics.
#[derive(Debug, Clone, Copy, Default)]
pub struct AvroImporter;
22
23impl AvroImporter {
24    /// Create a new AVRO parser instance.
25    ///
26    /// # Example
27    ///
28    /// ```rust
29    /// use data_modelling_sdk::import::avro::AvroImporter;
30    ///
31    /// let importer = AvroImporter::new();
32    /// ```
33    pub fn new() -> Self {
34        Self
35    }
36
37    /// Import AVRO schema content and create Table(s) (SDK interface).
38    ///
39    /// # Arguments
40    ///
41    /// * `avro_content` - AVRO schema as JSON string (can be a single record or array of records)
42    ///
43    /// # Returns
44    ///
45    /// An `ImportResult` containing extracted tables and any parse errors.
46    ///
47    /// # Example
48    ///
49    /// ```rust
50    /// use data_modelling_sdk::import::avro::AvroImporter;
51    ///
52    /// let importer = AvroImporter::new();
53    /// let schema = r#"
54    /// {
55    ///   "type": "record",
56    ///   "name": "User",
57    ///   "fields": [
58    ///     {"name": "id", "type": "long"}
59    ///   ]
60    /// }
61    /// "#;
62    /// let result = importer.import(schema).unwrap();
63    /// ```
64    pub fn import(&self, avro_content: &str) -> Result<ImportResult, ImportError> {
65        match self.parse(avro_content) {
66            Ok((tables, errors)) => {
67                let mut sdk_tables = Vec::new();
68                for (idx, table) in tables.iter().enumerate() {
69                    sdk_tables.push(TableData {
70                        table_index: idx,
71                        name: Some(table.name.clone()),
72                        columns: table.columns.iter().map(column_to_column_data).collect(),
73                    });
74                }
75                let sdk_errors: Vec<ImportError> = errors
76                    .iter()
77                    .map(|e| ImportError::ParseError(e.message.clone()))
78                    .collect();
79                Ok(ImportResult {
80                    tables: sdk_tables,
81                    tables_requiring_name: Vec::new(),
82                    errors: sdk_errors,
83                    ai_suggestions: None,
84                })
85            }
86            Err(e) => Err(ImportError::ParseError(e.to_string())),
87        }
88    }
89
90    /// Parse AVRO schema content and create Table(s) (internal method).
91    ///
92    /// # Returns
93    ///
94    /// Returns a tuple of (Tables, list of errors/warnings).
95    fn parse(&self, avro_content: &str) -> Result<(Vec<Table>, Vec<ParserError>)> {
96        let mut errors = Vec::new();
97
98        // Parse JSON
99        let schema: Value =
100            serde_json::from_str(avro_content).context("Failed to parse AVRO schema as JSON")?;
101
102        let mut tables = Vec::new();
103
104        // AVRO can be a single record or an array of records
105        if let Some(schemas) = schema.as_array() {
106            // Multiple schemas
107            for (idx, schema_item) in schemas.iter().enumerate() {
108                match self.parse_schema(schema_item, &mut errors) {
109                    Ok(table) => tables.push(table),
110                    Err(e) => {
111                        errors.push(ParserError {
112                            error_type: "parse_error".to_string(),
113                            field: Some(format!("schema[{}]", idx)),
114                            message: format!("Failed to parse schema: {}", e),
115                        });
116                    }
117                }
118            }
119        } else {
120            // Single schema
121            match self.parse_schema(&schema, &mut errors) {
122                Ok(table) => tables.push(table),
123                Err(e) => {
124                    errors.push(ParserError {
125                        error_type: "parse_error".to_string(),
126                        field: None,
127                        message: format!("Failed to parse schema: {}", e),
128                    });
129                }
130            }
131        }
132
133        Ok((tables, errors))
134    }
135
136    /// Parse a single AVRO schema record.
137    fn parse_schema(&self, schema: &Value, errors: &mut Vec<ParserError>) -> Result<Table> {
138        let schema_obj = schema
139            .as_object()
140            .ok_or_else(|| anyhow::anyhow!("Schema must be an object"))?;
141
142        // Extract record name
143        let name = schema_obj
144            .get("name")
145            .and_then(|v| v.as_str())
146            .ok_or_else(|| anyhow::anyhow!("Missing required field: name"))?
147            .to_string();
148
149        // Validate table name
150        if let Err(e) = validate_table_name(&name) {
151            warn!("Table name validation warning for '{}': {}", name, e);
152        }
153
154        // Extract namespace (optional)
155        let namespace = schema_obj
156            .get("namespace")
157            .and_then(|v| v.as_str())
158            .map(|s| s.to_string());
159
160        // Extract fields
161        let fields = schema_obj
162            .get("fields")
163            .and_then(|v| v.as_array())
164            .ok_or_else(|| anyhow::anyhow!("Missing required field: fields"))?;
165
166        let mut columns = Vec::new();
167        for (idx, field) in fields.iter().enumerate() {
168            match self.parse_field(field, &name, errors) {
169                Ok(mut cols) => columns.append(&mut cols),
170                Err(e) => {
171                    errors.push(ParserError {
172                        error_type: "parse_error".to_string(),
173                        field: Some(format!("fields[{}]", idx)),
174                        message: format!("Failed to parse field: {}", e),
175                    });
176                }
177            }
178        }
179
180        // Extract tags from AVRO schema (can be in root or in aliases/metadata)
181        let mut tags: Vec<Tag> = Vec::new();
182        if let Some(tags_arr) = schema_obj.get("tags").and_then(|v| v.as_array()) {
183            for item in tags_arr {
184                if let Some(s) = item.as_str() {
185                    if let Ok(tag) = Tag::from_str(s) {
186                        tags.push(tag);
187                    } else {
188                        tags.push(Tag::Simple(s.to_string()));
189                    }
190                }
191            }
192        }
193        // Also check aliases/metadata for tags
194        if let Some(aliases_arr) = schema_obj.get("aliases").and_then(|v| v.as_array()) {
195            for item in aliases_arr {
196                if let Some(s) = item.as_str() {
197                    // AVRO aliases can be used as tags
198                    if let Ok(tag) = Tag::from_str(s) {
199                        if !tags.contains(&tag) {
200                            tags.push(tag);
201                        }
202                    } else {
203                        let simple_tag = Tag::Simple(s.to_string());
204                        if !tags.contains(&simple_tag) {
205                            tags.push(simple_tag);
206                        }
207                    }
208                }
209            }
210        }
211
212        // Build table metadata
213        let mut odcl_metadata = HashMap::new();
214        if let Some(ref ns) = namespace {
215            odcl_metadata.insert("namespace".to_string(), json!(ns));
216        }
217        if let Some(doc) = schema_obj.get("doc").and_then(|v| v.as_str()) {
218            odcl_metadata.insert("description".to_string(), json!(doc));
219        }
220
221        let table = Table {
222            id: crate::models::table::Table::generate_id(&name, None, None, namespace.as_deref()),
223            name: name.clone(),
224            columns,
225            database_type: None,
226            catalog_name: None,
227            schema_name: namespace.clone(),
228            medallion_layers: Vec::new(),
229            scd_pattern: None,
230            data_vault_classification: None,
231            modeling_level: None,
232            tags,
233            odcl_metadata,
234            owner: None,
235            sla: None,
236            contact_details: None,
237            infrastructure_type: None,
238            notes: None,
239            position: None,
240            yaml_file_path: None,
241            drawio_cell_id: None,
242            quality: Vec::new(),
243            errors: Vec::new(),
244            created_at: chrono::Utc::now(),
245            updated_at: chrono::Utc::now(),
246        };
247
248        info!(
249            "Parsed AVRO schema: {} with {} columns",
250            name,
251            table.columns.len()
252        );
253        Ok(table)
254    }
255
256    /// Parse an AVRO field (which can be a simple field or nested record).
257    fn parse_field(
258        &self,
259        field: &Value,
260        _parent_name: &str,
261        errors: &mut Vec<ParserError>,
262    ) -> Result<Vec<Column>> {
263        let field_obj = field
264            .as_object()
265            .ok_or_else(|| anyhow::anyhow!("Field must be an object"))?;
266
267        let field_name = field_obj
268            .get("name")
269            .and_then(|v| v.as_str())
270            .ok_or_else(|| anyhow::anyhow!("Field missing name"))?
271            .to_string();
272
273        // Validate column name
274        if let Err(e) = validate_column_name(&field_name) {
275            warn!("Column name validation warning for '{}': {}", field_name, e);
276        }
277
278        let field_type = field_obj
279            .get("type")
280            .ok_or_else(|| anyhow::anyhow!("Field missing type"))?;
281
282        let description = field_obj
283            .get("doc")
284            .and_then(|v| v.as_str())
285            .map(|s| s.to_string())
286            .unwrap_or_default();
287
288        // Handle union types (e.g., ["null", "string"] for nullable)
289        let (avro_type, nullable) = if let Some(types) = field_type.as_array() {
290            if types.len() == 2 && types.iter().any(|t| t.as_str() == Some("null")) {
291                // Nullable type
292                let non_null_type = types
293                    .iter()
294                    .find(|t| t.as_str() != Some("null"))
295                    .ok_or_else(|| anyhow::anyhow!("Invalid union type"))?;
296                (non_null_type, true)
297            } else {
298                // Complex union with multiple non-null types - use first non-null type
299                // and mark as nullable since union implies optionality
300                let first_non_null = types
301                    .iter()
302                    .find(|t| t.as_str() != Some("null"))
303                    .unwrap_or(field_type);
304                (first_non_null, true)
305            }
306        } else {
307            (field_type, false)
308        };
309
310        // Parse the actual type
311        let mut columns = Vec::new();
312        if let Some(type_str) = avro_type.as_str() {
313            // Simple type
314            let data_type = self.map_avro_type_to_sql(type_str);
315            columns.push(Column {
316                name: field_name,
317                data_type,
318                nullable,
319                description,
320                ..Default::default()
321            });
322        } else if let Some(type_obj) = avro_type.as_object() {
323            // Complex type (record, array, map)
324            if type_obj.get("type").and_then(|v| v.as_str()) == Some("record") {
325                // Nested record - create nested columns with dot notation
326                let nested_name = type_obj
327                    .get("name")
328                    .and_then(|v| v.as_str())
329                    .unwrap_or(&field_name);
330                let nested_fields = type_obj
331                    .get("fields")
332                    .and_then(|v| v.as_array())
333                    .ok_or_else(|| anyhow::anyhow!("Nested record missing fields"))?;
334
335                for nested_field in nested_fields {
336                    match self.parse_field(nested_field, nested_name, errors) {
337                        Ok(mut nested_cols) => {
338                            // Prefix nested columns with parent field name
339                            for col in nested_cols.iter_mut() {
340                                col.name = format!("{}.{}", field_name, col.name);
341                            }
342                            columns.append(&mut nested_cols);
343                        }
344                        Err(e) => {
345                            errors.push(ParserError {
346                                error_type: "parse_error".to_string(),
347                                field: Some(format!("{}.{}", field_name, nested_name)),
348                                message: format!("Failed to parse nested field: {}", e),
349                            });
350                        }
351                    }
352                }
353            } else if type_obj.get("type").and_then(|v| v.as_str()) == Some("array") {
354                // Array type
355                let items = type_obj
356                    .get("items")
357                    .ok_or_else(|| anyhow::anyhow!("Array type missing items"))?;
358
359                let data_type = if let Some(items_str) = items.as_str() {
360                    format!("ARRAY<{}>", self.map_avro_type_to_sql(items_str))
361                } else if let Some(items_obj) = items.as_object() {
362                    if items_obj.get("type").and_then(|v| v.as_str()) == Some("record") {
363                        // Array of records - create nested columns
364                        let nested_name = items_obj
365                            .get("name")
366                            .and_then(|v| v.as_str())
367                            .unwrap_or(&field_name);
368                        let nested_fields = items_obj
369                            .get("fields")
370                            .and_then(|v| v.as_array())
371                            .ok_or_else(|| anyhow::anyhow!("Array record missing fields"))?;
372
373                        for nested_field in nested_fields {
374                            match self.parse_field(nested_field, nested_name, errors) {
375                                Ok(mut nested_cols) => {
376                                    for col in nested_cols.iter_mut() {
377                                        col.name = format!("{}.{}", field_name, col.name);
378                                    }
379                                    columns.append(&mut nested_cols);
380                                }
381                                Err(e) => {
382                                    errors.push(ParserError {
383                                        error_type: "parse_error".to_string(),
384                                        field: Some(format!("{}.{}", field_name, nested_name)),
385                                        message: format!("Failed to parse array item field: {}", e),
386                                    });
387                                }
388                            }
389                        }
390                        return Ok(columns);
391                    } else {
392                        format!("ARRAY<{}>", "STRUCT")
393                    }
394                } else {
395                    "ARRAY<STRING>".to_string()
396                };
397
398                columns.push(Column {
399                    name: field_name,
400                    data_type,
401                    nullable,
402                    description,
403                    ..Default::default()
404                });
405            } else {
406                // Other complex types - default to STRUCT
407                columns.push(Column {
408                    name: field_name,
409                    data_type: "STRUCT".to_string(),
410                    nullable,
411                    description,
412                    ..Default::default()
413                });
414            }
415        } else {
416            return Err(anyhow::anyhow!("Unsupported field type format"));
417        }
418
419        Ok(columns)
420    }
421
422    /// Map AVRO type to SQL/ODCL data type.
423    fn map_avro_type_to_sql(&self, avro_type: &str) -> String {
424        match avro_type {
425            "int" => "INTEGER".to_string(),
426            "long" => "BIGINT".to_string(),
427            "float" => "FLOAT".to_string(),
428            "double" => "DOUBLE".to_string(),
429            "boolean" => "BOOLEAN".to_string(),
430            "bytes" => "BYTES".to_string(),
431            "string" => "STRING".to_string(),
432            "null" => "NULL".to_string(),
433            _ => "STRING".to_string(), // Default fallback
434        }
435    }
436}
437
/// Parser error structure (matches ODCL parser format).
///
/// Describes one non-fatal problem recorded while parsing a schema; parsing
/// continues after such an error is collected.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParserError {
    /// Machine-readable category; the AVRO importer in this file uses `"parse_error"`.
    pub error_type: String,
    /// Location of the problem within the input (for example `schema[0]` or
    /// `fields[2]`), or `None` when no location applies.
    pub field: Option<String>,
    /// Human-readable description of what went wrong.
    pub message: String,
}