// data_modelling_sdk/import/avro.rs

//! AVRO schema parser for importing AVRO schemas into data models.
//!
//! # Validation
//!
//! All imported table and column names are validated for:
//! - Valid identifier format
//! - Maximum length limits
//!
//! Validation failures are logged as warnings; they do not reject the import.

9use crate::import::{ImportError, ImportResult, TableData};
10use crate::models::{Column, Table};
11use crate::validation::input::{validate_column_name, validate_table_name};
12use anyhow::{Context, Result};
13use serde_json::{Value, json};
14use std::collections::HashMap;
15use tracing::{info, warn};
16
/// Parser for AVRO schema format.
///
/// The importer is a stateless unit struct, so it is free to construct,
/// copy, and share between callers.
#[derive(Debug, Default, Clone, Copy)]
pub struct AvroImporter;

21impl AvroImporter {
22    /// Create a new AVRO parser instance.
23    ///
24    /// # Example
25    ///
26    /// ```rust
27    /// use data_modelling_sdk::import::avro::AvroImporter;
28    ///
29    /// let importer = AvroImporter::new();
30    /// ```
31    pub fn new() -> Self {
32        Self
33    }
34
35    /// Import AVRO schema content and create Table(s) (SDK interface).
36    ///
37    /// # Arguments
38    ///
39    /// * `avro_content` - AVRO schema as JSON string (can be a single record or array of records)
40    ///
41    /// # Returns
42    ///
43    /// An `ImportResult` containing extracted tables and any parse errors.
44    ///
45    /// # Example
46    ///
47    /// ```rust
48    /// use data_modelling_sdk::import::avro::AvroImporter;
49    ///
50    /// let importer = AvroImporter::new();
51    /// let schema = r#"
52    /// {
53    ///   "type": "record",
54    ///   "name": "User",
55    ///   "fields": [
56    ///     {"name": "id", "type": "long"}
57    ///   ]
58    /// }
59    /// "#;
60    /// let result = importer.import(schema).unwrap();
61    /// ```
62    pub fn import(&self, avro_content: &str) -> Result<ImportResult, ImportError> {
63        match self.parse(avro_content) {
64            Ok((tables, errors)) => {
65                let mut sdk_tables = Vec::new();
66                for (idx, table) in tables.iter().enumerate() {
67                    sdk_tables.push(TableData {
68                        table_index: idx,
69                        name: Some(table.name.clone()),
70                        columns: table
71                            .columns
72                            .iter()
73                            .map(|c| super::ColumnData {
74                                name: c.name.clone(),
75                                data_type: c.data_type.clone(),
76                                nullable: c.nullable,
77                                primary_key: c.primary_key,
78                            })
79                            .collect(),
80                    });
81                }
82                let sdk_errors: Vec<ImportError> = errors
83                    .iter()
84                    .map(|e| ImportError::ParseError(e.message.clone()))
85                    .collect();
86                Ok(ImportResult {
87                    tables: sdk_tables,
88                    tables_requiring_name: Vec::new(),
89                    errors: sdk_errors,
90                    ai_suggestions: None,
91                })
92            }
93            Err(e) => Err(ImportError::ParseError(e.to_string())),
94        }
95    }
96
97    /// Parse AVRO schema content and create Table(s) (internal method).
98    ///
99    /// # Returns
100    ///
101    /// Returns a tuple of (Tables, list of errors/warnings).
102    fn parse(&self, avro_content: &str) -> Result<(Vec<Table>, Vec<ParserError>)> {
103        let mut errors = Vec::new();
104
105        // Parse JSON
106        let schema: Value =
107            serde_json::from_str(avro_content).context("Failed to parse AVRO schema as JSON")?;
108
109        let mut tables = Vec::new();
110
111        // AVRO can be a single record or an array of records
112        if let Some(schemas) = schema.as_array() {
113            // Multiple schemas
114            for (idx, schema_item) in schemas.iter().enumerate() {
115                match self.parse_schema(schema_item, &mut errors) {
116                    Ok(table) => tables.push(table),
117                    Err(e) => {
118                        errors.push(ParserError {
119                            error_type: "parse_error".to_string(),
120                            field: Some(format!("schema[{}]", idx)),
121                            message: format!("Failed to parse schema: {}", e),
122                        });
123                    }
124                }
125            }
126        } else {
127            // Single schema
128            match self.parse_schema(&schema, &mut errors) {
129                Ok(table) => tables.push(table),
130                Err(e) => {
131                    errors.push(ParserError {
132                        error_type: "parse_error".to_string(),
133                        field: None,
134                        message: format!("Failed to parse schema: {}", e),
135                    });
136                }
137            }
138        }
139
140        Ok((tables, errors))
141    }
142
143    /// Parse a single AVRO schema record.
144    fn parse_schema(&self, schema: &Value, errors: &mut Vec<ParserError>) -> Result<Table> {
145        let schema_obj = schema
146            .as_object()
147            .ok_or_else(|| anyhow::anyhow!("Schema must be an object"))?;
148
149        // Extract record name
150        let name = schema_obj
151            .get("name")
152            .and_then(|v| v.as_str())
153            .ok_or_else(|| anyhow::anyhow!("Missing required field: name"))?
154            .to_string();
155
156        // Validate table name
157        if let Err(e) = validate_table_name(&name) {
158            warn!("Table name validation warning for '{}': {}", name, e);
159        }
160
161        // Extract namespace (optional)
162        let namespace = schema_obj
163            .get("namespace")
164            .and_then(|v| v.as_str())
165            .map(|s| s.to_string());
166
167        // Extract fields
168        let fields = schema_obj
169            .get("fields")
170            .and_then(|v| v.as_array())
171            .ok_or_else(|| anyhow::anyhow!("Missing required field: fields"))?;
172
173        let mut columns = Vec::new();
174        for (idx, field) in fields.iter().enumerate() {
175            match self.parse_field(field, &name, errors) {
176                Ok(mut cols) => columns.append(&mut cols),
177                Err(e) => {
178                    errors.push(ParserError {
179                        error_type: "parse_error".to_string(),
180                        field: Some(format!("fields[{}]", idx)),
181                        message: format!("Failed to parse field: {}", e),
182                    });
183                }
184            }
185        }
186
187        // Build table metadata
188        let mut odcl_metadata = HashMap::new();
189        if let Some(ref ns) = namespace {
190            odcl_metadata.insert("namespace".to_string(), json!(ns));
191        }
192        if let Some(doc) = schema_obj.get("doc").and_then(|v| v.as_str()) {
193            odcl_metadata.insert("description".to_string(), json!(doc));
194        }
195
196        let table = Table {
197            id: crate::models::table::Table::generate_id(&name, None, None, namespace.as_deref()),
198            name: name.clone(),
199            columns,
200            database_type: None,
201            catalog_name: None,
202            schema_name: namespace.clone(),
203            medallion_layers: Vec::new(),
204            scd_pattern: None,
205            data_vault_classification: None,
206            modeling_level: None,
207            tags: Vec::new(),
208            odcl_metadata,
209            position: None,
210            yaml_file_path: None,
211            drawio_cell_id: None,
212            quality: Vec::new(),
213            errors: Vec::new(),
214            created_at: chrono::Utc::now(),
215            updated_at: chrono::Utc::now(),
216        };
217
218        info!(
219            "Parsed AVRO schema: {} with {} columns",
220            name,
221            table.columns.len()
222        );
223        Ok(table)
224    }
225
226    /// Parse an AVRO field (which can be a simple field or nested record).
227    fn parse_field(
228        &self,
229        field: &Value,
230        _parent_name: &str,
231        errors: &mut Vec<ParserError>,
232    ) -> Result<Vec<Column>> {
233        let field_obj = field
234            .as_object()
235            .ok_or_else(|| anyhow::anyhow!("Field must be an object"))?;
236
237        let field_name = field_obj
238            .get("name")
239            .and_then(|v| v.as_str())
240            .ok_or_else(|| anyhow::anyhow!("Field missing name"))?
241            .to_string();
242
243        // Validate column name
244        if let Err(e) = validate_column_name(&field_name) {
245            warn!("Column name validation warning for '{}': {}", field_name, e);
246        }
247
248        let field_type = field_obj
249            .get("type")
250            .ok_or_else(|| anyhow::anyhow!("Field missing type"))?;
251
252        let description = field_obj
253            .get("doc")
254            .and_then(|v| v.as_str())
255            .map(|s| s.to_string())
256            .unwrap_or_default();
257
258        // Handle union types (e.g., ["null", "string"] for nullable)
259        let (avro_type, nullable) = if let Some(types) = field_type.as_array() {
260            if types.len() == 2 && types.iter().any(|t| t.as_str() == Some("null")) {
261                // Nullable type
262                let non_null_type = types
263                    .iter()
264                    .find(|t| t.as_str() != Some("null"))
265                    .ok_or_else(|| anyhow::anyhow!("Invalid union type"))?;
266                (non_null_type, true)
267            } else {
268                // Complex union with multiple non-null types - use first non-null type
269                // and mark as nullable since union implies optionality
270                let first_non_null = types
271                    .iter()
272                    .find(|t| t.as_str() != Some("null"))
273                    .unwrap_or(field_type);
274                (first_non_null, true)
275            }
276        } else {
277            (field_type, false)
278        };
279
280        // Parse the actual type
281        let mut columns = Vec::new();
282        if let Some(type_str) = avro_type.as_str() {
283            // Simple type
284            let data_type = self.map_avro_type_to_sql(type_str);
285            columns.push(Column {
286                name: field_name,
287                data_type,
288                nullable,
289                primary_key: false,
290                secondary_key: false,
291                composite_key: None,
292                foreign_key: None,
293                constraints: Vec::new(),
294                description,
295                quality: Vec::new(),
296                enum_values: Vec::new(),
297                errors: Vec::new(),
298                column_order: 0,
299            });
300        } else if let Some(type_obj) = avro_type.as_object() {
301            // Complex type (record, array, map)
302            if type_obj.get("type").and_then(|v| v.as_str()) == Some("record") {
303                // Nested record - create nested columns with dot notation
304                let nested_name = type_obj
305                    .get("name")
306                    .and_then(|v| v.as_str())
307                    .unwrap_or(&field_name);
308                let nested_fields = type_obj
309                    .get("fields")
310                    .and_then(|v| v.as_array())
311                    .ok_or_else(|| anyhow::anyhow!("Nested record missing fields"))?;
312
313                for nested_field in nested_fields {
314                    match self.parse_field(nested_field, nested_name, errors) {
315                        Ok(mut nested_cols) => {
316                            // Prefix nested columns with parent field name
317                            for col in nested_cols.iter_mut() {
318                                col.name = format!("{}.{}", field_name, col.name);
319                            }
320                            columns.append(&mut nested_cols);
321                        }
322                        Err(e) => {
323                            errors.push(ParserError {
324                                error_type: "parse_error".to_string(),
325                                field: Some(format!("{}.{}", field_name, nested_name)),
326                                message: format!("Failed to parse nested field: {}", e),
327                            });
328                        }
329                    }
330                }
331            } else if type_obj.get("type").and_then(|v| v.as_str()) == Some("array") {
332                // Array type
333                let items = type_obj
334                    .get("items")
335                    .ok_or_else(|| anyhow::anyhow!("Array type missing items"))?;
336
337                let data_type = if let Some(items_str) = items.as_str() {
338                    format!("ARRAY<{}>", self.map_avro_type_to_sql(items_str))
339                } else if let Some(items_obj) = items.as_object() {
340                    if items_obj.get("type").and_then(|v| v.as_str()) == Some("record") {
341                        // Array of records - create nested columns
342                        let nested_name = items_obj
343                            .get("name")
344                            .and_then(|v| v.as_str())
345                            .unwrap_or(&field_name);
346                        let nested_fields = items_obj
347                            .get("fields")
348                            .and_then(|v| v.as_array())
349                            .ok_or_else(|| anyhow::anyhow!("Array record missing fields"))?;
350
351                        for nested_field in nested_fields {
352                            match self.parse_field(nested_field, nested_name, errors) {
353                                Ok(mut nested_cols) => {
354                                    for col in nested_cols.iter_mut() {
355                                        col.name = format!("{}.{}", field_name, col.name);
356                                    }
357                                    columns.append(&mut nested_cols);
358                                }
359                                Err(e) => {
360                                    errors.push(ParserError {
361                                        error_type: "parse_error".to_string(),
362                                        field: Some(format!("{}.{}", field_name, nested_name)),
363                                        message: format!("Failed to parse array item field: {}", e),
364                                    });
365                                }
366                            }
367                        }
368                        return Ok(columns);
369                    } else {
370                        format!("ARRAY<{}>", "STRUCT")
371                    }
372                } else {
373                    "ARRAY<STRING>".to_string()
374                };
375
376                columns.push(Column {
377                    name: field_name,
378                    data_type,
379                    nullable,
380                    primary_key: false,
381                    secondary_key: false,
382                    composite_key: None,
383                    foreign_key: None,
384                    constraints: Vec::new(),
385                    description,
386                    quality: Vec::new(),
387                    enum_values: Vec::new(),
388                    errors: Vec::new(),
389                    column_order: 0,
390                });
391            } else {
392                // Other complex types - default to STRUCT
393                columns.push(Column {
394                    name: field_name,
395                    data_type: "STRUCT".to_string(),
396                    nullable,
397                    primary_key: false,
398                    secondary_key: false,
399                    composite_key: None,
400                    foreign_key: None,
401                    constraints: Vec::new(),
402                    description,
403                    quality: Vec::new(),
404                    enum_values: Vec::new(),
405                    errors: Vec::new(),
406                    column_order: 0,
407                });
408            }
409        } else {
410            return Err(anyhow::anyhow!("Unsupported field type format"));
411        }
412
413        Ok(columns)
414    }
415
416    /// Map AVRO type to SQL/ODCL data type.
417    fn map_avro_type_to_sql(&self, avro_type: &str) -> String {
418        match avro_type {
419            "int" => "INTEGER".to_string(),
420            "long" => "BIGINT".to_string(),
421            "float" => "FLOAT".to_string(),
422            "double" => "DOUBLE".to_string(),
423            "boolean" => "BOOLEAN".to_string(),
424            "bytes" => "BYTES".to_string(),
425            "string" => "STRING".to_string(),
426            "null" => "NULL".to_string(),
427            _ => "STRING".to_string(), // Default fallback
428        }
429    }
430}
431
/// Parser error structure (matches ODCL parser format).
///
/// Errors are collected while parsing rather than aborting it, so a single
/// bad schema or field does not prevent the rest of the input from importing.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParserError {
    /// Error category; the AVRO importer currently always sets `"parse_error"`.
    pub error_type: String,
    /// Location of the problem (e.g. `schema[1]` or `fields[3]`), when known.
    pub field: Option<String>,
    /// Human-readable description of the problem.
    pub message: String,
}