data_modelling_sdk/import/
avro.rs

1//! AVRO schema parser for importing AVRO schemas into data models.
2//!
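//! # Validation
//!
//! Imported table and column names are checked for:
//! - Valid identifier format
//! - Maximum length limits
//!
//! A name that fails a check is reported through a `tracing` warning; the
//! table or column is still imported.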
8
9use crate::import::{ImportError, ImportResult, TableData};
10use crate::models::{Column, Table, Tag};
11use crate::validation::input::{validate_column_name, validate_table_name};
12use anyhow::{Context, Result};
13use serde_json::{Value, json};
14use std::collections::HashMap;
15use std::str::FromStr;
16use tracing::{info, warn};
17
/// Parser for AVRO schema format.
///
/// The importer is a stateless unit struct, so it is cheap to create and to
/// clone. `Debug` is derived so the type can be embedded in other
/// `#[derive(Debug)]` structures and printed in diagnostics.
#[derive(Debug, Clone, Default)]
pub struct AvroImporter;
21
22impl AvroImporter {
23    /// Create a new AVRO parser instance.
24    ///
25    /// # Example
26    ///
27    /// ```rust
28    /// use data_modelling_sdk::import::avro::AvroImporter;
29    ///
30    /// let importer = AvroImporter::new();
31    /// ```
32    pub fn new() -> Self {
33        Self
34    }
35
36    /// Import AVRO schema content and create Table(s) (SDK interface).
37    ///
38    /// # Arguments
39    ///
40    /// * `avro_content` - AVRO schema as JSON string (can be a single record or array of records)
41    ///
42    /// # Returns
43    ///
44    /// An `ImportResult` containing extracted tables and any parse errors.
45    ///
46    /// # Example
47    ///
48    /// ```rust
49    /// use data_modelling_sdk::import::avro::AvroImporter;
50    ///
51    /// let importer = AvroImporter::new();
52    /// let schema = r#"
53    /// {
54    ///   "type": "record",
55    ///   "name": "User",
56    ///   "fields": [
57    ///     {"name": "id", "type": "long"}
58    ///   ]
59    /// }
60    /// "#;
61    /// let result = importer.import(schema).unwrap();
62    /// ```
63    pub fn import(&self, avro_content: &str) -> Result<ImportResult, ImportError> {
64        match self.parse(avro_content) {
65            Ok((tables, errors)) => {
66                let mut sdk_tables = Vec::new();
67                for (idx, table) in tables.iter().enumerate() {
68                    sdk_tables.push(TableData {
69                        table_index: idx,
70                        name: Some(table.name.clone()),
71                        columns: table
72                            .columns
73                            .iter()
74                            .map(|c| super::ColumnData {
75                                name: c.name.clone(),
76                                data_type: c.data_type.clone(),
77                                nullable: c.nullable,
78                                primary_key: c.primary_key,
79                                description: if c.description.is_empty() {
80                                    None
81                                } else {
82                                    Some(c.description.clone())
83                                },
84                                quality: if c.quality.is_empty() {
85                                    None
86                                } else {
87                                    Some(c.quality.clone())
88                                },
89                                ref_path: c.ref_path.clone(),
90                                enum_values: if c.enum_values.is_empty() {
91                                    None
92                                } else {
93                                    Some(c.enum_values.clone())
94                                },
95                            })
96                            .collect(),
97                    });
98                }
99                let sdk_errors: Vec<ImportError> = errors
100                    .iter()
101                    .map(|e| ImportError::ParseError(e.message.clone()))
102                    .collect();
103                Ok(ImportResult {
104                    tables: sdk_tables,
105                    tables_requiring_name: Vec::new(),
106                    errors: sdk_errors,
107                    ai_suggestions: None,
108                })
109            }
110            Err(e) => Err(ImportError::ParseError(e.to_string())),
111        }
112    }
113
114    /// Parse AVRO schema content and create Table(s) (internal method).
115    ///
116    /// # Returns
117    ///
118    /// Returns a tuple of (Tables, list of errors/warnings).
119    fn parse(&self, avro_content: &str) -> Result<(Vec<Table>, Vec<ParserError>)> {
120        let mut errors = Vec::new();
121
122        // Parse JSON
123        let schema: Value =
124            serde_json::from_str(avro_content).context("Failed to parse AVRO schema as JSON")?;
125
126        let mut tables = Vec::new();
127
128        // AVRO can be a single record or an array of records
129        if let Some(schemas) = schema.as_array() {
130            // Multiple schemas
131            for (idx, schema_item) in schemas.iter().enumerate() {
132                match self.parse_schema(schema_item, &mut errors) {
133                    Ok(table) => tables.push(table),
134                    Err(e) => {
135                        errors.push(ParserError {
136                            error_type: "parse_error".to_string(),
137                            field: Some(format!("schema[{}]", idx)),
138                            message: format!("Failed to parse schema: {}", e),
139                        });
140                    }
141                }
142            }
143        } else {
144            // Single schema
145            match self.parse_schema(&schema, &mut errors) {
146                Ok(table) => tables.push(table),
147                Err(e) => {
148                    errors.push(ParserError {
149                        error_type: "parse_error".to_string(),
150                        field: None,
151                        message: format!("Failed to parse schema: {}", e),
152                    });
153                }
154            }
155        }
156
157        Ok((tables, errors))
158    }
159
160    /// Parse a single AVRO schema record.
161    fn parse_schema(&self, schema: &Value, errors: &mut Vec<ParserError>) -> Result<Table> {
162        let schema_obj = schema
163            .as_object()
164            .ok_or_else(|| anyhow::anyhow!("Schema must be an object"))?;
165
166        // Extract record name
167        let name = schema_obj
168            .get("name")
169            .and_then(|v| v.as_str())
170            .ok_or_else(|| anyhow::anyhow!("Missing required field: name"))?
171            .to_string();
172
173        // Validate table name
174        if let Err(e) = validate_table_name(&name) {
175            warn!("Table name validation warning for '{}': {}", name, e);
176        }
177
178        // Extract namespace (optional)
179        let namespace = schema_obj
180            .get("namespace")
181            .and_then(|v| v.as_str())
182            .map(|s| s.to_string());
183
184        // Extract fields
185        let fields = schema_obj
186            .get("fields")
187            .and_then(|v| v.as_array())
188            .ok_or_else(|| anyhow::anyhow!("Missing required field: fields"))?;
189
190        let mut columns = Vec::new();
191        for (idx, field) in fields.iter().enumerate() {
192            match self.parse_field(field, &name, errors) {
193                Ok(mut cols) => columns.append(&mut cols),
194                Err(e) => {
195                    errors.push(ParserError {
196                        error_type: "parse_error".to_string(),
197                        field: Some(format!("fields[{}]", idx)),
198                        message: format!("Failed to parse field: {}", e),
199                    });
200                }
201            }
202        }
203
204        // Extract tags from AVRO schema (can be in root or in aliases/metadata)
205        let mut tags: Vec<Tag> = Vec::new();
206        if let Some(tags_arr) = schema_obj.get("tags").and_then(|v| v.as_array()) {
207            for item in tags_arr {
208                if let Some(s) = item.as_str() {
209                    if let Ok(tag) = Tag::from_str(s) {
210                        tags.push(tag);
211                    } else {
212                        tags.push(Tag::Simple(s.to_string()));
213                    }
214                }
215            }
216        }
217        // Also check aliases/metadata for tags
218        if let Some(aliases_arr) = schema_obj.get("aliases").and_then(|v| v.as_array()) {
219            for item in aliases_arr {
220                if let Some(s) = item.as_str() {
221                    // AVRO aliases can be used as tags
222                    if let Ok(tag) = Tag::from_str(s) {
223                        if !tags.contains(&tag) {
224                            tags.push(tag);
225                        }
226                    } else {
227                        let simple_tag = Tag::Simple(s.to_string());
228                        if !tags.contains(&simple_tag) {
229                            tags.push(simple_tag);
230                        }
231                    }
232                }
233            }
234        }
235
236        // Build table metadata
237        let mut odcl_metadata = HashMap::new();
238        if let Some(ref ns) = namespace {
239            odcl_metadata.insert("namespace".to_string(), json!(ns));
240        }
241        if let Some(doc) = schema_obj.get("doc").and_then(|v| v.as_str()) {
242            odcl_metadata.insert("description".to_string(), json!(doc));
243        }
244
245        let table = Table {
246            id: crate::models::table::Table::generate_id(&name, None, None, namespace.as_deref()),
247            name: name.clone(),
248            columns,
249            database_type: None,
250            catalog_name: None,
251            schema_name: namespace.clone(),
252            medallion_layers: Vec::new(),
253            scd_pattern: None,
254            data_vault_classification: None,
255            modeling_level: None,
256            tags,
257            odcl_metadata,
258            owner: None,
259            sla: None,
260            contact_details: None,
261            infrastructure_type: None,
262            notes: None,
263            position: None,
264            yaml_file_path: None,
265            drawio_cell_id: None,
266            quality: Vec::new(),
267            errors: Vec::new(),
268            created_at: chrono::Utc::now(),
269            updated_at: chrono::Utc::now(),
270        };
271
272        info!(
273            "Parsed AVRO schema: {} with {} columns",
274            name,
275            table.columns.len()
276        );
277        Ok(table)
278    }
279
280    /// Parse an AVRO field (which can be a simple field or nested record).
281    fn parse_field(
282        &self,
283        field: &Value,
284        _parent_name: &str,
285        errors: &mut Vec<ParserError>,
286    ) -> Result<Vec<Column>> {
287        let field_obj = field
288            .as_object()
289            .ok_or_else(|| anyhow::anyhow!("Field must be an object"))?;
290
291        let field_name = field_obj
292            .get("name")
293            .and_then(|v| v.as_str())
294            .ok_or_else(|| anyhow::anyhow!("Field missing name"))?
295            .to_string();
296
297        // Validate column name
298        if let Err(e) = validate_column_name(&field_name) {
299            warn!("Column name validation warning for '{}': {}", field_name, e);
300        }
301
302        let field_type = field_obj
303            .get("type")
304            .ok_or_else(|| anyhow::anyhow!("Field missing type"))?;
305
306        let description = field_obj
307            .get("doc")
308            .and_then(|v| v.as_str())
309            .map(|s| s.to_string())
310            .unwrap_or_default();
311
312        // Handle union types (e.g., ["null", "string"] for nullable)
313        let (avro_type, nullable) = if let Some(types) = field_type.as_array() {
314            if types.len() == 2 && types.iter().any(|t| t.as_str() == Some("null")) {
315                // Nullable type
316                let non_null_type = types
317                    .iter()
318                    .find(|t| t.as_str() != Some("null"))
319                    .ok_or_else(|| anyhow::anyhow!("Invalid union type"))?;
320                (non_null_type, true)
321            } else {
322                // Complex union with multiple non-null types - use first non-null type
323                // and mark as nullable since union implies optionality
324                let first_non_null = types
325                    .iter()
326                    .find(|t| t.as_str() != Some("null"))
327                    .unwrap_or(field_type);
328                (first_non_null, true)
329            }
330        } else {
331            (field_type, false)
332        };
333
334        // Parse the actual type
335        let mut columns = Vec::new();
336        if let Some(type_str) = avro_type.as_str() {
337            // Simple type
338            let data_type = self.map_avro_type_to_sql(type_str);
339            columns.push(Column {
340                name: field_name,
341                data_type,
342                nullable,
343                primary_key: false,
344                secondary_key: false,
345                composite_key: None,
346                foreign_key: None,
347                constraints: Vec::new(),
348                description,
349                quality: Vec::new(),
350                ref_path: None,
351                enum_values: Vec::new(),
352                errors: Vec::new(),
353                column_order: 0,
354            });
355        } else if let Some(type_obj) = avro_type.as_object() {
356            // Complex type (record, array, map)
357            if type_obj.get("type").and_then(|v| v.as_str()) == Some("record") {
358                // Nested record - create nested columns with dot notation
359                let nested_name = type_obj
360                    .get("name")
361                    .and_then(|v| v.as_str())
362                    .unwrap_or(&field_name);
363                let nested_fields = type_obj
364                    .get("fields")
365                    .and_then(|v| v.as_array())
366                    .ok_or_else(|| anyhow::anyhow!("Nested record missing fields"))?;
367
368                for nested_field in nested_fields {
369                    match self.parse_field(nested_field, nested_name, errors) {
370                        Ok(mut nested_cols) => {
371                            // Prefix nested columns with parent field name
372                            for col in nested_cols.iter_mut() {
373                                col.name = format!("{}.{}", field_name, col.name);
374                            }
375                            columns.append(&mut nested_cols);
376                        }
377                        Err(e) => {
378                            errors.push(ParserError {
379                                error_type: "parse_error".to_string(),
380                                field: Some(format!("{}.{}", field_name, nested_name)),
381                                message: format!("Failed to parse nested field: {}", e),
382                            });
383                        }
384                    }
385                }
386            } else if type_obj.get("type").and_then(|v| v.as_str()) == Some("array") {
387                // Array type
388                let items = type_obj
389                    .get("items")
390                    .ok_or_else(|| anyhow::anyhow!("Array type missing items"))?;
391
392                let data_type = if let Some(items_str) = items.as_str() {
393                    format!("ARRAY<{}>", self.map_avro_type_to_sql(items_str))
394                } else if let Some(items_obj) = items.as_object() {
395                    if items_obj.get("type").and_then(|v| v.as_str()) == Some("record") {
396                        // Array of records - create nested columns
397                        let nested_name = items_obj
398                            .get("name")
399                            .and_then(|v| v.as_str())
400                            .unwrap_or(&field_name);
401                        let nested_fields = items_obj
402                            .get("fields")
403                            .and_then(|v| v.as_array())
404                            .ok_or_else(|| anyhow::anyhow!("Array record missing fields"))?;
405
406                        for nested_field in nested_fields {
407                            match self.parse_field(nested_field, nested_name, errors) {
408                                Ok(mut nested_cols) => {
409                                    for col in nested_cols.iter_mut() {
410                                        col.name = format!("{}.{}", field_name, col.name);
411                                    }
412                                    columns.append(&mut nested_cols);
413                                }
414                                Err(e) => {
415                                    errors.push(ParserError {
416                                        error_type: "parse_error".to_string(),
417                                        field: Some(format!("{}.{}", field_name, nested_name)),
418                                        message: format!("Failed to parse array item field: {}", e),
419                                    });
420                                }
421                            }
422                        }
423                        return Ok(columns);
424                    } else {
425                        format!("ARRAY<{}>", "STRUCT")
426                    }
427                } else {
428                    "ARRAY<STRING>".to_string()
429                };
430
431                columns.push(Column {
432                    name: field_name,
433                    data_type,
434                    nullable,
435                    primary_key: false,
436                    secondary_key: false,
437                    composite_key: None,
438                    foreign_key: None,
439                    constraints: Vec::new(),
440                    description,
441                    quality: Vec::new(),
442                    ref_path: None,
443                    enum_values: Vec::new(),
444                    errors: Vec::new(),
445                    column_order: 0,
446                });
447            } else {
448                // Other complex types - default to STRUCT
449                columns.push(Column {
450                    name: field_name,
451                    data_type: "STRUCT".to_string(),
452                    nullable,
453                    primary_key: false,
454                    secondary_key: false,
455                    composite_key: None,
456                    foreign_key: None,
457                    constraints: Vec::new(),
458                    description,
459                    quality: Vec::new(),
460                    ref_path: None,
461                    enum_values: Vec::new(),
462                    errors: Vec::new(),
463                    column_order: 0,
464                });
465            }
466        } else {
467            return Err(anyhow::anyhow!("Unsupported field type format"));
468        }
469
470        Ok(columns)
471    }
472
473    /// Map AVRO type to SQL/ODCL data type.
474    fn map_avro_type_to_sql(&self, avro_type: &str) -> String {
475        match avro_type {
476            "int" => "INTEGER".to_string(),
477            "long" => "BIGINT".to_string(),
478            "float" => "FLOAT".to_string(),
479            "double" => "DOUBLE".to_string(),
480            "boolean" => "BOOLEAN".to_string(),
481            "bytes" => "BYTES".to_string(),
482            "string" => "STRING".to_string(),
483            "null" => "NULL".to_string(),
484            _ => "STRING".to_string(), // Default fallback
485        }
486    }
487}
488
/// Parser error structure (matches ODCL parser format).
///
/// Describes one non-fatal problem found while parsing; the parser collects
/// these and keeps going instead of aborting.
#[derive(Debug, Clone)]
pub struct ParserError {
    /// Category of the problem; this parser always sets `"parse_error"`.
    pub error_type: String,
    /// Location of the problem, such as `schema[0]`, `fields[2]` or a dotted
    /// `<field>.<nested>` path. `None` when a single top-level schema failed.
    pub field: Option<String>,
    /// Human-readable description of what went wrong.
    pub message: String,
}