data_modelling_sdk/import/
avro.rs

//! AVRO schema parser for importing AVRO schemas into data models.
//!
//! # Validation
//!
//! All imported table and column names are checked for:
//! - Valid identifier format
//! - Maximum length limits
//!
//! A name that fails one of these checks is logged as a warning; the table or
//! column is still imported.
8
9use crate::import::{ImportError, ImportResult, TableData};
10use crate::models::{Column, Table};
11use crate::validation::input::{validate_column_name, validate_table_name};
12use anyhow::{Context, Result};
13use serde_json::{Value, json};
14use std::collections::HashMap;
15use tracing::{info, warn};
16
/// Parser for AVRO schema format.
///
/// The importer is a stateless unit struct: it holds no configuration, so an
/// instance can be created with [`AvroImporter::new`] or `Default::default()`
/// and cloned freely.
#[derive(Debug, Default, Clone)]
pub struct AvroImporter;
20
21impl AvroImporter {
22    /// Create a new AVRO parser instance.
23    ///
24    /// # Example
25    ///
26    /// ```rust
27    /// use data_modelling_sdk::import::avro::AvroImporter;
28    ///
29    /// let importer = AvroImporter::new();
30    /// ```
31    pub fn new() -> Self {
32        Self
33    }
34
35    /// Import AVRO schema content and create Table(s) (SDK interface).
36    ///
37    /// # Arguments
38    ///
39    /// * `avro_content` - AVRO schema as JSON string (can be a single record or array of records)
40    ///
41    /// # Returns
42    ///
43    /// An `ImportResult` containing extracted tables and any parse errors.
44    ///
45    /// # Example
46    ///
47    /// ```rust
48    /// use data_modelling_sdk::import::avro::AvroImporter;
49    ///
50    /// let importer = AvroImporter::new();
51    /// let schema = r#"
52    /// {
53    ///   "type": "record",
54    ///   "name": "User",
55    ///   "fields": [
56    ///     {"name": "id", "type": "long"}
57    ///   ]
58    /// }
59    /// "#;
60    /// let result = importer.import(schema).unwrap();
61    /// ```
62    pub fn import(&self, avro_content: &str) -> Result<ImportResult, ImportError> {
63        match self.parse(avro_content) {
64            Ok((tables, errors)) => {
65                let mut sdk_tables = Vec::new();
66                for (idx, table) in tables.iter().enumerate() {
67                    sdk_tables.push(TableData {
68                        table_index: idx,
69                        name: Some(table.name.clone()),
70                        columns: table
71                            .columns
72                            .iter()
73                            .map(|c| super::ColumnData {
74                                name: c.name.clone(),
75                                data_type: c.data_type.clone(),
76                                nullable: c.nullable,
77                                primary_key: c.primary_key,
78                            })
79                            .collect(),
80                    });
81                }
82                let sdk_errors: Vec<ImportError> = errors
83                    .iter()
84                    .map(|e| ImportError::ParseError(e.message.clone()))
85                    .collect();
86                Ok(ImportResult {
87                    tables: sdk_tables,
88                    tables_requiring_name: Vec::new(),
89                    errors: sdk_errors,
90                    ai_suggestions: None,
91                })
92            }
93            Err(e) => Err(ImportError::ParseError(e.to_string())),
94        }
95    }
96
97    /// Parse AVRO schema content and create Table(s) (internal method).
98    ///
99    /// # Returns
100    ///
101    /// Returns a tuple of (Tables, list of errors/warnings).
102    fn parse(&self, avro_content: &str) -> Result<(Vec<Table>, Vec<ParserError>)> {
103        let mut errors = Vec::new();
104
105        // Parse JSON
106        let schema: Value =
107            serde_json::from_str(avro_content).context("Failed to parse AVRO schema as JSON")?;
108
109        let mut tables = Vec::new();
110
111        // AVRO can be a single record or an array of records
112        if let Some(schemas) = schema.as_array() {
113            // Multiple schemas
114            for (idx, schema_item) in schemas.iter().enumerate() {
115                match self.parse_schema(schema_item, &mut errors) {
116                    Ok(table) => tables.push(table),
117                    Err(e) => {
118                        errors.push(ParserError {
119                            error_type: "parse_error".to_string(),
120                            field: Some(format!("schema[{}]", idx)),
121                            message: format!("Failed to parse schema: {}", e),
122                        });
123                    }
124                }
125            }
126        } else {
127            // Single schema
128            match self.parse_schema(&schema, &mut errors) {
129                Ok(table) => tables.push(table),
130                Err(e) => {
131                    errors.push(ParserError {
132                        error_type: "parse_error".to_string(),
133                        field: None,
134                        message: format!("Failed to parse schema: {}", e),
135                    });
136                }
137            }
138        }
139
140        Ok((tables, errors))
141    }
142
143    /// Parse a single AVRO schema record.
144    fn parse_schema(&self, schema: &Value, errors: &mut Vec<ParserError>) -> Result<Table> {
145        let schema_obj = schema
146            .as_object()
147            .ok_or_else(|| anyhow::anyhow!("Schema must be an object"))?;
148
149        // Extract record name
150        let name = schema_obj
151            .get("name")
152            .and_then(|v| v.as_str())
153            .ok_or_else(|| anyhow::anyhow!("Missing required field: name"))?
154            .to_string();
155
156        // Validate table name
157        if let Err(e) = validate_table_name(&name) {
158            warn!("Table name validation warning for '{}': {}", name, e);
159        }
160
161        // Extract namespace (optional)
162        let namespace = schema_obj
163            .get("namespace")
164            .and_then(|v| v.as_str())
165            .map(|s| s.to_string());
166
167        // Extract fields
168        let fields = schema_obj
169            .get("fields")
170            .and_then(|v| v.as_array())
171            .ok_or_else(|| anyhow::anyhow!("Missing required field: fields"))?;
172
173        let mut columns = Vec::new();
174        for (idx, field) in fields.iter().enumerate() {
175            match self.parse_field(field, &name, errors) {
176                Ok(mut cols) => columns.append(&mut cols),
177                Err(e) => {
178                    errors.push(ParserError {
179                        error_type: "parse_error".to_string(),
180                        field: Some(format!("fields[{}]", idx)),
181                        message: format!("Failed to parse field: {}", e),
182                    });
183                }
184            }
185        }
186
187        // Build table metadata
188        let mut odcl_metadata = HashMap::new();
189        if let Some(ref ns) = namespace {
190            odcl_metadata.insert("namespace".to_string(), json!(ns));
191        }
192        if let Some(doc) = schema_obj.get("doc").and_then(|v| v.as_str()) {
193            odcl_metadata.insert("description".to_string(), json!(doc));
194        }
195
196        let table = Table {
197            id: crate::models::table::Table::generate_id(&name, None, None, namespace.as_deref()),
198            name: name.clone(),
199            columns,
200            database_type: None,
201            catalog_name: None,
202            schema_name: namespace.clone(),
203            medallion_layers: Vec::new(),
204            scd_pattern: None,
205            data_vault_classification: None,
206            modeling_level: None,
207            tags: Vec::new(),
208            odcl_metadata,
209            owner: None,
210            sla: None,
211            contact_details: None,
212            infrastructure_type: None,
213            notes: None,
214            position: None,
215            yaml_file_path: None,
216            drawio_cell_id: None,
217            quality: Vec::new(),
218            errors: Vec::new(),
219            created_at: chrono::Utc::now(),
220            updated_at: chrono::Utc::now(),
221        };
222
223        info!(
224            "Parsed AVRO schema: {} with {} columns",
225            name,
226            table.columns.len()
227        );
228        Ok(table)
229    }
230
231    /// Parse an AVRO field (which can be a simple field or nested record).
232    fn parse_field(
233        &self,
234        field: &Value,
235        _parent_name: &str,
236        errors: &mut Vec<ParserError>,
237    ) -> Result<Vec<Column>> {
238        let field_obj = field
239            .as_object()
240            .ok_or_else(|| anyhow::anyhow!("Field must be an object"))?;
241
242        let field_name = field_obj
243            .get("name")
244            .and_then(|v| v.as_str())
245            .ok_or_else(|| anyhow::anyhow!("Field missing name"))?
246            .to_string();
247
248        // Validate column name
249        if let Err(e) = validate_column_name(&field_name) {
250            warn!("Column name validation warning for '{}': {}", field_name, e);
251        }
252
253        let field_type = field_obj
254            .get("type")
255            .ok_or_else(|| anyhow::anyhow!("Field missing type"))?;
256
257        let description = field_obj
258            .get("doc")
259            .and_then(|v| v.as_str())
260            .map(|s| s.to_string())
261            .unwrap_or_default();
262
263        // Handle union types (e.g., ["null", "string"] for nullable)
264        let (avro_type, nullable) = if let Some(types) = field_type.as_array() {
265            if types.len() == 2 && types.iter().any(|t| t.as_str() == Some("null")) {
266                // Nullable type
267                let non_null_type = types
268                    .iter()
269                    .find(|t| t.as_str() != Some("null"))
270                    .ok_or_else(|| anyhow::anyhow!("Invalid union type"))?;
271                (non_null_type, true)
272            } else {
273                // Complex union with multiple non-null types - use first non-null type
274                // and mark as nullable since union implies optionality
275                let first_non_null = types
276                    .iter()
277                    .find(|t| t.as_str() != Some("null"))
278                    .unwrap_or(field_type);
279                (first_non_null, true)
280            }
281        } else {
282            (field_type, false)
283        };
284
285        // Parse the actual type
286        let mut columns = Vec::new();
287        if let Some(type_str) = avro_type.as_str() {
288            // Simple type
289            let data_type = self.map_avro_type_to_sql(type_str);
290            columns.push(Column {
291                name: field_name,
292                data_type,
293                nullable,
294                primary_key: false,
295                secondary_key: false,
296                composite_key: None,
297                foreign_key: None,
298                constraints: Vec::new(),
299                description,
300                quality: Vec::new(),
301                enum_values: Vec::new(),
302                errors: Vec::new(),
303                column_order: 0,
304            });
305        } else if let Some(type_obj) = avro_type.as_object() {
306            // Complex type (record, array, map)
307            if type_obj.get("type").and_then(|v| v.as_str()) == Some("record") {
308                // Nested record - create nested columns with dot notation
309                let nested_name = type_obj
310                    .get("name")
311                    .and_then(|v| v.as_str())
312                    .unwrap_or(&field_name);
313                let nested_fields = type_obj
314                    .get("fields")
315                    .and_then(|v| v.as_array())
316                    .ok_or_else(|| anyhow::anyhow!("Nested record missing fields"))?;
317
318                for nested_field in nested_fields {
319                    match self.parse_field(nested_field, nested_name, errors) {
320                        Ok(mut nested_cols) => {
321                            // Prefix nested columns with parent field name
322                            for col in nested_cols.iter_mut() {
323                                col.name = format!("{}.{}", field_name, col.name);
324                            }
325                            columns.append(&mut nested_cols);
326                        }
327                        Err(e) => {
328                            errors.push(ParserError {
329                                error_type: "parse_error".to_string(),
330                                field: Some(format!("{}.{}", field_name, nested_name)),
331                                message: format!("Failed to parse nested field: {}", e),
332                            });
333                        }
334                    }
335                }
336            } else if type_obj.get("type").and_then(|v| v.as_str()) == Some("array") {
337                // Array type
338                let items = type_obj
339                    .get("items")
340                    .ok_or_else(|| anyhow::anyhow!("Array type missing items"))?;
341
342                let data_type = if let Some(items_str) = items.as_str() {
343                    format!("ARRAY<{}>", self.map_avro_type_to_sql(items_str))
344                } else if let Some(items_obj) = items.as_object() {
345                    if items_obj.get("type").and_then(|v| v.as_str()) == Some("record") {
346                        // Array of records - create nested columns
347                        let nested_name = items_obj
348                            .get("name")
349                            .and_then(|v| v.as_str())
350                            .unwrap_or(&field_name);
351                        let nested_fields = items_obj
352                            .get("fields")
353                            .and_then(|v| v.as_array())
354                            .ok_or_else(|| anyhow::anyhow!("Array record missing fields"))?;
355
356                        for nested_field in nested_fields {
357                            match self.parse_field(nested_field, nested_name, errors) {
358                                Ok(mut nested_cols) => {
359                                    for col in nested_cols.iter_mut() {
360                                        col.name = format!("{}.{}", field_name, col.name);
361                                    }
362                                    columns.append(&mut nested_cols);
363                                }
364                                Err(e) => {
365                                    errors.push(ParserError {
366                                        error_type: "parse_error".to_string(),
367                                        field: Some(format!("{}.{}", field_name, nested_name)),
368                                        message: format!("Failed to parse array item field: {}", e),
369                                    });
370                                }
371                            }
372                        }
373                        return Ok(columns);
374                    } else {
375                        format!("ARRAY<{}>", "STRUCT")
376                    }
377                } else {
378                    "ARRAY<STRING>".to_string()
379                };
380
381                columns.push(Column {
382                    name: field_name,
383                    data_type,
384                    nullable,
385                    primary_key: false,
386                    secondary_key: false,
387                    composite_key: None,
388                    foreign_key: None,
389                    constraints: Vec::new(),
390                    description,
391                    quality: Vec::new(),
392                    enum_values: Vec::new(),
393                    errors: Vec::new(),
394                    column_order: 0,
395                });
396            } else {
397                // Other complex types - default to STRUCT
398                columns.push(Column {
399                    name: field_name,
400                    data_type: "STRUCT".to_string(),
401                    nullable,
402                    primary_key: false,
403                    secondary_key: false,
404                    composite_key: None,
405                    foreign_key: None,
406                    constraints: Vec::new(),
407                    description,
408                    quality: Vec::new(),
409                    enum_values: Vec::new(),
410                    errors: Vec::new(),
411                    column_order: 0,
412                });
413            }
414        } else {
415            return Err(anyhow::anyhow!("Unsupported field type format"));
416        }
417
418        Ok(columns)
419    }
420
421    /// Map AVRO type to SQL/ODCL data type.
422    fn map_avro_type_to_sql(&self, avro_type: &str) -> String {
423        match avro_type {
424            "int" => "INTEGER".to_string(),
425            "long" => "BIGINT".to_string(),
426            "float" => "FLOAT".to_string(),
427            "double" => "DOUBLE".to_string(),
428            "boolean" => "BOOLEAN".to_string(),
429            "bytes" => "BYTES".to_string(),
430            "string" => "STRING".to_string(),
431            "null" => "NULL".to_string(),
432            _ => "STRING".to_string(), // Default fallback
433        }
434    }
435}
436
/// Parser error structure (matches ODCL parser format).
///
/// Describes a non-fatal problem found while parsing: the offending schema or
/// field is skipped and parsing continues with the rest of the input.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParserError {
    /// Machine-readable category; the AVRO importer emits `"parse_error"`.
    pub error_type: String,
    /// Location of the problem, e.g. `schema[0]`, `fields[2]` or
    /// `parent.nested`; `None` when a lone top-level schema fails.
    pub field: Option<String>,
    /// Human-readable description of what went wrong.
    pub message: String,
}