data_modelling_core/import/
avro.rs

1//! AVRO schema parser for importing AVRO schemas into data models.
2//!
3//! # Validation
4//!
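//! All imported table and column names are checked for:
//! - Valid identifier format
//! - Maximum length limits
//!
//! A name that fails these checks is logged as a warning; it does not cause the
//! table or column to be rejected.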
8
9use crate::import::odcs_shared::column_to_column_data;
10use crate::import::{ImportError, ImportResult, TableData};
11use crate::models::{Column, Table, Tag};
12use crate::validation::input::{validate_column_name, validate_table_name};
13use anyhow::{Context, Result};
14use serde_json::{Value, json};
15use std::collections::HashMap;
16use std::str::FromStr;
17use tracing::{info, warn};
18
/// Parser for AVRO schema format.
///
/// The importer is a stateless unit struct, so it is cheap to copy and can be
/// created with `AvroImporter::default()` (or the `new` constructor).
#[derive(Debug, Default, Clone, Copy)]
pub struct AvroImporter;
22
23impl AvroImporter {
24    /// Create a new AVRO parser instance.
25    ///
26    /// # Example
27    ///
28    /// ```rust
29    /// use data_modelling_core::import::avro::AvroImporter;
30    ///
31    /// let importer = AvroImporter::new();
32    /// ```
33    pub fn new() -> Self {
34        Self
35    }
36
37    /// Import AVRO schema content and create Table(s) (SDK interface).
38    ///
39    /// # Arguments
40    ///
41    /// * `avro_content` - AVRO schema as JSON string (can be a single record or array of records)
42    ///
43    /// # Returns
44    ///
45    /// An `ImportResult` containing extracted tables and any parse errors.
46    ///
47    /// # Example
48    ///
49    /// ```rust
50    /// use data_modelling_core::import::avro::AvroImporter;
51    ///
52    /// let importer = AvroImporter::new();
53    /// let schema = r#"
54    /// {
55    ///   "type": "record",
56    ///   "name": "User",
57    ///   "fields": [
58    ///     {"name": "id", "type": "long"}
59    ///   ]
60    /// }
61    /// "#;
62    /// let result = importer.import(schema).unwrap();
63    /// ```
64    pub fn import(&self, avro_content: &str) -> Result<ImportResult, ImportError> {
65        match self.parse(avro_content) {
66            Ok((tables, errors)) => {
67                let mut sdk_tables = Vec::new();
68                for (idx, table) in tables.iter().enumerate() {
69                    sdk_tables.push(TableData {
70                        table_index: idx,
71                        id: Some(table.id.to_string()),
72                        name: Some(table.name.clone()),
73                        columns: table.columns.iter().map(column_to_column_data).collect(),
74                        ..Default::default()
75                    });
76                }
77                let sdk_errors: Vec<ImportError> = errors
78                    .iter()
79                    .map(|e| ImportError::ParseError(e.message.clone()))
80                    .collect();
81                Ok(ImportResult {
82                    tables: sdk_tables,
83                    tables_requiring_name: Vec::new(),
84                    errors: sdk_errors,
85                    ai_suggestions: None,
86                })
87            }
88            Err(e) => Err(ImportError::ParseError(e.to_string())),
89        }
90    }
91
92    /// Parse AVRO schema content and create Table(s) (internal method).
93    ///
94    /// # Returns
95    ///
96    /// Returns a tuple of (Tables, list of errors/warnings).
97    fn parse(&self, avro_content: &str) -> Result<(Vec<Table>, Vec<ParserError>)> {
98        let mut errors = Vec::new();
99
100        // Parse JSON
101        let schema: Value =
102            serde_json::from_str(avro_content).context("Failed to parse AVRO schema as JSON")?;
103
104        let mut tables = Vec::new();
105
106        // AVRO can be a single record or an array of records
107        if let Some(schemas) = schema.as_array() {
108            // Multiple schemas
109            for (idx, schema_item) in schemas.iter().enumerate() {
110                match self.parse_schema(schema_item, &mut errors) {
111                    Ok(table) => tables.push(table),
112                    Err(e) => {
113                        errors.push(ParserError {
114                            error_type: "parse_error".to_string(),
115                            field: Some(format!("schema[{}]", idx)),
116                            message: format!("Failed to parse schema: {}", e),
117                        });
118                    }
119                }
120            }
121        } else {
122            // Single schema
123            match self.parse_schema(&schema, &mut errors) {
124                Ok(table) => tables.push(table),
125                Err(e) => {
126                    errors.push(ParserError {
127                        error_type: "parse_error".to_string(),
128                        field: None,
129                        message: format!("Failed to parse schema: {}", e),
130                    });
131                }
132            }
133        }
134
135        Ok((tables, errors))
136    }
137
138    /// Parse a single AVRO schema record.
139    fn parse_schema(&self, schema: &Value, errors: &mut Vec<ParserError>) -> Result<Table> {
140        let schema_obj = schema
141            .as_object()
142            .ok_or_else(|| anyhow::anyhow!("Schema must be an object"))?;
143
144        // Extract record name
145        let name = schema_obj
146            .get("name")
147            .and_then(|v| v.as_str())
148            .ok_or_else(|| anyhow::anyhow!("Missing required field: name"))?
149            .to_string();
150
151        // Validate table name
152        if let Err(e) = validate_table_name(&name) {
153            warn!("Table name validation warning for '{}': {}", name, e);
154        }
155
156        // Extract namespace (optional)
157        let namespace = schema_obj
158            .get("namespace")
159            .and_then(|v| v.as_str())
160            .map(|s| s.to_string());
161
162        // Extract fields
163        let fields = schema_obj
164            .get("fields")
165            .and_then(|v| v.as_array())
166            .ok_or_else(|| anyhow::anyhow!("Missing required field: fields"))?;
167
168        let mut columns = Vec::new();
169        for (idx, field) in fields.iter().enumerate() {
170            match self.parse_field(field, &name, errors) {
171                Ok(mut cols) => columns.append(&mut cols),
172                Err(e) => {
173                    errors.push(ParserError {
174                        error_type: "parse_error".to_string(),
175                        field: Some(format!("fields[{}]", idx)),
176                        message: format!("Failed to parse field: {}", e),
177                    });
178                }
179            }
180        }
181
182        // Extract tags from AVRO schema (can be in root or in aliases/metadata)
183        let mut tags: Vec<Tag> = Vec::new();
184        if let Some(tags_arr) = schema_obj.get("tags").and_then(|v| v.as_array()) {
185            for item in tags_arr {
186                if let Some(s) = item.as_str() {
187                    if let Ok(tag) = Tag::from_str(s) {
188                        tags.push(tag);
189                    } else {
190                        tags.push(Tag::Simple(s.to_string()));
191                    }
192                }
193            }
194        }
195        // Also check aliases/metadata for tags
196        if let Some(aliases_arr) = schema_obj.get("aliases").and_then(|v| v.as_array()) {
197            for item in aliases_arr {
198                if let Some(s) = item.as_str() {
199                    // AVRO aliases can be used as tags
200                    if let Ok(tag) = Tag::from_str(s) {
201                        if !tags.contains(&tag) {
202                            tags.push(tag);
203                        }
204                    } else {
205                        let simple_tag = Tag::Simple(s.to_string());
206                        if !tags.contains(&simple_tag) {
207                            tags.push(simple_tag);
208                        }
209                    }
210                }
211            }
212        }
213
214        // Build table metadata
215        let mut odcl_metadata = HashMap::new();
216        if let Some(ref ns) = namespace {
217            odcl_metadata.insert("namespace".to_string(), json!(ns));
218        }
219        if let Some(doc) = schema_obj.get("doc").and_then(|v| v.as_str()) {
220            odcl_metadata.insert("description".to_string(), json!(doc));
221        }
222
223        let table = Table {
224            id: crate::models::table::Table::generate_id(&name, None, None, namespace.as_deref()),
225            name: name.clone(),
226            columns,
227            database_type: None,
228            catalog_name: None,
229            schema_name: namespace.clone(),
230            medallion_layers: Vec::new(),
231            scd_pattern: None,
232            data_vault_classification: None,
233            modeling_level: None,
234            tags,
235            odcl_metadata,
236            owner: None,
237            sla: None,
238            contact_details: None,
239            infrastructure_type: None,
240            notes: None,
241            position: None,
242            yaml_file_path: None,
243            drawio_cell_id: None,
244            quality: Vec::new(),
245            errors: Vec::new(),
246            created_at: chrono::Utc::now(),
247            updated_at: chrono::Utc::now(),
248        };
249
250        info!(
251            "Parsed AVRO schema: {} with {} columns",
252            name,
253            table.columns.len()
254        );
255        Ok(table)
256    }
257
258    /// Parse an AVRO field (which can be a simple field or nested record).
259    fn parse_field(
260        &self,
261        field: &Value,
262        _parent_name: &str,
263        errors: &mut Vec<ParserError>,
264    ) -> Result<Vec<Column>> {
265        let field_obj = field
266            .as_object()
267            .ok_or_else(|| anyhow::anyhow!("Field must be an object"))?;
268
269        let field_name = field_obj
270            .get("name")
271            .and_then(|v| v.as_str())
272            .ok_or_else(|| anyhow::anyhow!("Field missing name"))?
273            .to_string();
274
275        // Validate column name
276        if let Err(e) = validate_column_name(&field_name) {
277            warn!("Column name validation warning for '{}': {}", field_name, e);
278        }
279
280        let field_type = field_obj
281            .get("type")
282            .ok_or_else(|| anyhow::anyhow!("Field missing type"))?;
283
284        let description = field_obj
285            .get("doc")
286            .and_then(|v| v.as_str())
287            .map(|s| s.to_string())
288            .unwrap_or_default();
289
290        // Handle union types (e.g., ["null", "string"] for nullable)
291        let (avro_type, nullable) = if let Some(types) = field_type.as_array() {
292            if types.len() == 2 && types.iter().any(|t| t.as_str() == Some("null")) {
293                // Nullable type
294                let non_null_type = types
295                    .iter()
296                    .find(|t| t.as_str() != Some("null"))
297                    .ok_or_else(|| anyhow::anyhow!("Invalid union type"))?;
298                (non_null_type, true)
299            } else {
300                // Complex union with multiple non-null types - use first non-null type
301                // and mark as nullable since union implies optionality
302                let first_non_null = types
303                    .iter()
304                    .find(|t| t.as_str() != Some("null"))
305                    .unwrap_or(field_type);
306                (first_non_null, true)
307            }
308        } else {
309            (field_type, false)
310        };
311
312        // Parse the actual type
313        let mut columns = Vec::new();
314        if let Some(type_str) = avro_type.as_str() {
315            // Simple type
316            let data_type = self.map_avro_type_to_sql(type_str);
317            columns.push(Column {
318                name: field_name,
319                data_type,
320                nullable,
321                description,
322                ..Default::default()
323            });
324        } else if let Some(type_obj) = avro_type.as_object() {
325            // Complex type (record, array, map)
326            if type_obj.get("type").and_then(|v| v.as_str()) == Some("record") {
327                // Nested record - create nested columns with dot notation
328                let nested_name = type_obj
329                    .get("name")
330                    .and_then(|v| v.as_str())
331                    .unwrap_or(&field_name);
332                let nested_fields = type_obj
333                    .get("fields")
334                    .and_then(|v| v.as_array())
335                    .ok_or_else(|| anyhow::anyhow!("Nested record missing fields"))?;
336
337                for nested_field in nested_fields {
338                    match self.parse_field(nested_field, nested_name, errors) {
339                        Ok(mut nested_cols) => {
340                            // Prefix nested columns with parent field name
341                            for col in nested_cols.iter_mut() {
342                                col.name = format!("{}.{}", field_name, col.name);
343                            }
344                            columns.append(&mut nested_cols);
345                        }
346                        Err(e) => {
347                            errors.push(ParserError {
348                                error_type: "parse_error".to_string(),
349                                field: Some(format!("{}.{}", field_name, nested_name)),
350                                message: format!("Failed to parse nested field: {}", e),
351                            });
352                        }
353                    }
354                }
355            } else if type_obj.get("type").and_then(|v| v.as_str()) == Some("array") {
356                // Array type
357                let items = type_obj
358                    .get("items")
359                    .ok_or_else(|| anyhow::anyhow!("Array type missing items"))?;
360
361                let data_type = if let Some(items_str) = items.as_str() {
362                    format!("ARRAY<{}>", self.map_avro_type_to_sql(items_str))
363                } else if let Some(items_obj) = items.as_object() {
364                    if items_obj.get("type").and_then(|v| v.as_str()) == Some("record") {
365                        // Array of records - create nested columns
366                        let nested_name = items_obj
367                            .get("name")
368                            .and_then(|v| v.as_str())
369                            .unwrap_or(&field_name);
370                        let nested_fields = items_obj
371                            .get("fields")
372                            .and_then(|v| v.as_array())
373                            .ok_or_else(|| anyhow::anyhow!("Array record missing fields"))?;
374
375                        for nested_field in nested_fields {
376                            match self.parse_field(nested_field, nested_name, errors) {
377                                Ok(mut nested_cols) => {
378                                    for col in nested_cols.iter_mut() {
379                                        col.name = format!("{}.{}", field_name, col.name);
380                                    }
381                                    columns.append(&mut nested_cols);
382                                }
383                                Err(e) => {
384                                    errors.push(ParserError {
385                                        error_type: "parse_error".to_string(),
386                                        field: Some(format!("{}.{}", field_name, nested_name)),
387                                        message: format!("Failed to parse array item field: {}", e),
388                                    });
389                                }
390                            }
391                        }
392                        return Ok(columns);
393                    } else {
394                        format!("ARRAY<{}>", "STRUCT")
395                    }
396                } else {
397                    "ARRAY<STRING>".to_string()
398                };
399
400                columns.push(Column {
401                    name: field_name,
402                    data_type,
403                    nullable,
404                    description,
405                    ..Default::default()
406                });
407            } else {
408                // Other complex types - default to STRUCT
409                columns.push(Column {
410                    name: field_name,
411                    data_type: "STRUCT".to_string(),
412                    nullable,
413                    description,
414                    ..Default::default()
415                });
416            }
417        } else {
418            return Err(anyhow::anyhow!("Unsupported field type format"));
419        }
420
421        Ok(columns)
422    }
423
424    /// Map AVRO type to SQL/ODCL data type.
425    fn map_avro_type_to_sql(&self, avro_type: &str) -> String {
426        match avro_type {
427            "int" => "INTEGER".to_string(),
428            "long" => "BIGINT".to_string(),
429            "float" => "FLOAT".to_string(),
430            "double" => "DOUBLE".to_string(),
431            "boolean" => "BOOLEAN".to_string(),
432            "bytes" => "BYTES".to_string(),
433            "string" => "STRING".to_string(),
434            "null" => "NULL".to_string(),
435            _ => "STRING".to_string(), // Default fallback
436        }
437    }
438}
439
/// A single non-fatal problem found while parsing an AVRO schema.
///
/// Shaped to match the error format used by the ODCL parser.
#[derive(Clone, Debug)]
pub struct ParserError {
    /// Category of the problem, e.g. `"parse_error"`.
    pub error_type: String,
    /// Where the problem was found (such as `schema[0]` or `fields[2]`), when known.
    pub field: Option<String>,
    /// Human-readable description of what went wrong.
    pub message: String,
}