1use crate::import::{ImportError, ImportResult, TableData};
10use crate::models::{Column, Table, Tag};
11use crate::validation::input::{validate_column_name, validate_table_name};
12use anyhow::{Context, Result};
13use serde_json::{Value, json};
14use std::collections::HashMap;
15use std::str::FromStr;
16use tracing::{info, warn};
17
/// Stateless importer that converts AVRO schema JSON into the crate's table model.
///
/// The type carries no data; it exists so the import entry points can be
/// called as methods. `Debug` and `Clone` are derived so the importer can be
/// logged and embedded in other derivable types, matching `ParserError`.
#[derive(Debug, Clone, Default)]
pub struct AvroImporter;
21
22impl AvroImporter {
23 pub fn new() -> Self {
33 Self
34 }
35
36 pub fn import(&self, avro_content: &str) -> Result<ImportResult, ImportError> {
64 match self.parse(avro_content) {
65 Ok((tables, errors)) => {
66 let mut sdk_tables = Vec::new();
67 for (idx, table) in tables.iter().enumerate() {
68 sdk_tables.push(TableData {
69 table_index: idx,
70 name: Some(table.name.clone()),
71 columns: table
72 .columns
73 .iter()
74 .map(|c| super::ColumnData {
75 name: c.name.clone(),
76 data_type: c.data_type.clone(),
77 physical_type: c.physical_type.clone(),
78 nullable: c.nullable,
79 primary_key: c.primary_key,
80 description: if c.description.is_empty() {
81 None
82 } else {
83 Some(c.description.clone())
84 },
85 quality: if c.quality.is_empty() {
86 None
87 } else {
88 Some(c.quality.clone())
89 },
90 relationships: c.relationships.clone(),
91 enum_values: if c.enum_values.is_empty() {
92 None
93 } else {
94 Some(c.enum_values.clone())
95 },
96 })
97 .collect(),
98 });
99 }
100 let sdk_errors: Vec<ImportError> = errors
101 .iter()
102 .map(|e| ImportError::ParseError(e.message.clone()))
103 .collect();
104 Ok(ImportResult {
105 tables: sdk_tables,
106 tables_requiring_name: Vec::new(),
107 errors: sdk_errors,
108 ai_suggestions: None,
109 })
110 }
111 Err(e) => Err(ImportError::ParseError(e.to_string())),
112 }
113 }
114
115 fn parse(&self, avro_content: &str) -> Result<(Vec<Table>, Vec<ParserError>)> {
121 let mut errors = Vec::new();
122
123 let schema: Value =
125 serde_json::from_str(avro_content).context("Failed to parse AVRO schema as JSON")?;
126
127 let mut tables = Vec::new();
128
129 if let Some(schemas) = schema.as_array() {
131 for (idx, schema_item) in schemas.iter().enumerate() {
133 match self.parse_schema(schema_item, &mut errors) {
134 Ok(table) => tables.push(table),
135 Err(e) => {
136 errors.push(ParserError {
137 error_type: "parse_error".to_string(),
138 field: Some(format!("schema[{}]", idx)),
139 message: format!("Failed to parse schema: {}", e),
140 });
141 }
142 }
143 }
144 } else {
145 match self.parse_schema(&schema, &mut errors) {
147 Ok(table) => tables.push(table),
148 Err(e) => {
149 errors.push(ParserError {
150 error_type: "parse_error".to_string(),
151 field: None,
152 message: format!("Failed to parse schema: {}", e),
153 });
154 }
155 }
156 }
157
158 Ok((tables, errors))
159 }
160
161 fn parse_schema(&self, schema: &Value, errors: &mut Vec<ParserError>) -> Result<Table> {
163 let schema_obj = schema
164 .as_object()
165 .ok_or_else(|| anyhow::anyhow!("Schema must be an object"))?;
166
167 let name = schema_obj
169 .get("name")
170 .and_then(|v| v.as_str())
171 .ok_or_else(|| anyhow::anyhow!("Missing required field: name"))?
172 .to_string();
173
174 if let Err(e) = validate_table_name(&name) {
176 warn!("Table name validation warning for '{}': {}", name, e);
177 }
178
179 let namespace = schema_obj
181 .get("namespace")
182 .and_then(|v| v.as_str())
183 .map(|s| s.to_string());
184
185 let fields = schema_obj
187 .get("fields")
188 .and_then(|v| v.as_array())
189 .ok_or_else(|| anyhow::anyhow!("Missing required field: fields"))?;
190
191 let mut columns = Vec::new();
192 for (idx, field) in fields.iter().enumerate() {
193 match self.parse_field(field, &name, errors) {
194 Ok(mut cols) => columns.append(&mut cols),
195 Err(e) => {
196 errors.push(ParserError {
197 error_type: "parse_error".to_string(),
198 field: Some(format!("fields[{}]", idx)),
199 message: format!("Failed to parse field: {}", e),
200 });
201 }
202 }
203 }
204
205 let mut tags: Vec<Tag> = Vec::new();
207 if let Some(tags_arr) = schema_obj.get("tags").and_then(|v| v.as_array()) {
208 for item in tags_arr {
209 if let Some(s) = item.as_str() {
210 if let Ok(tag) = Tag::from_str(s) {
211 tags.push(tag);
212 } else {
213 tags.push(Tag::Simple(s.to_string()));
214 }
215 }
216 }
217 }
218 if let Some(aliases_arr) = schema_obj.get("aliases").and_then(|v| v.as_array()) {
220 for item in aliases_arr {
221 if let Some(s) = item.as_str() {
222 if let Ok(tag) = Tag::from_str(s) {
224 if !tags.contains(&tag) {
225 tags.push(tag);
226 }
227 } else {
228 let simple_tag = Tag::Simple(s.to_string());
229 if !tags.contains(&simple_tag) {
230 tags.push(simple_tag);
231 }
232 }
233 }
234 }
235 }
236
237 let mut odcl_metadata = HashMap::new();
239 if let Some(ref ns) = namespace {
240 odcl_metadata.insert("namespace".to_string(), json!(ns));
241 }
242 if let Some(doc) = schema_obj.get("doc").and_then(|v| v.as_str()) {
243 odcl_metadata.insert("description".to_string(), json!(doc));
244 }
245
246 let table = Table {
247 id: crate::models::table::Table::generate_id(&name, None, None, namespace.as_deref()),
248 name: name.clone(),
249 columns,
250 database_type: None,
251 catalog_name: None,
252 schema_name: namespace.clone(),
253 medallion_layers: Vec::new(),
254 scd_pattern: None,
255 data_vault_classification: None,
256 modeling_level: None,
257 tags,
258 odcl_metadata,
259 owner: None,
260 sla: None,
261 contact_details: None,
262 infrastructure_type: None,
263 notes: None,
264 position: None,
265 yaml_file_path: None,
266 drawio_cell_id: None,
267 quality: Vec::new(),
268 errors: Vec::new(),
269 created_at: chrono::Utc::now(),
270 updated_at: chrono::Utc::now(),
271 };
272
273 info!(
274 "Parsed AVRO schema: {} with {} columns",
275 name,
276 table.columns.len()
277 );
278 Ok(table)
279 }
280
281 fn parse_field(
283 &self,
284 field: &Value,
285 _parent_name: &str,
286 errors: &mut Vec<ParserError>,
287 ) -> Result<Vec<Column>> {
288 let field_obj = field
289 .as_object()
290 .ok_or_else(|| anyhow::anyhow!("Field must be an object"))?;
291
292 let field_name = field_obj
293 .get("name")
294 .and_then(|v| v.as_str())
295 .ok_or_else(|| anyhow::anyhow!("Field missing name"))?
296 .to_string();
297
298 if let Err(e) = validate_column_name(&field_name) {
300 warn!("Column name validation warning for '{}': {}", field_name, e);
301 }
302
303 let field_type = field_obj
304 .get("type")
305 .ok_or_else(|| anyhow::anyhow!("Field missing type"))?;
306
307 let description = field_obj
308 .get("doc")
309 .and_then(|v| v.as_str())
310 .map(|s| s.to_string())
311 .unwrap_or_default();
312
313 let (avro_type, nullable) = if let Some(types) = field_type.as_array() {
315 if types.len() == 2 && types.iter().any(|t| t.as_str() == Some("null")) {
316 let non_null_type = types
318 .iter()
319 .find(|t| t.as_str() != Some("null"))
320 .ok_or_else(|| anyhow::anyhow!("Invalid union type"))?;
321 (non_null_type, true)
322 } else {
323 let first_non_null = types
326 .iter()
327 .find(|t| t.as_str() != Some("null"))
328 .unwrap_or(field_type);
329 (first_non_null, true)
330 }
331 } else {
332 (field_type, false)
333 };
334
335 let mut columns = Vec::new();
337 if let Some(type_str) = avro_type.as_str() {
338 let data_type = self.map_avro_type_to_sql(type_str);
340 columns.push(Column {
341 name: field_name,
342 data_type,
343 physical_type: None,
344 nullable,
345 primary_key: false,
346 secondary_key: false,
347 composite_key: None,
348 foreign_key: None,
349 constraints: Vec::new(),
350 description,
351 quality: Vec::new(),
352 relationships: Vec::new(),
353 enum_values: Vec::new(),
354 errors: Vec::new(),
355 column_order: 0,
356 nested_data: None,
357 });
358 } else if let Some(type_obj) = avro_type.as_object() {
359 if type_obj.get("type").and_then(|v| v.as_str()) == Some("record") {
361 let nested_name = type_obj
363 .get("name")
364 .and_then(|v| v.as_str())
365 .unwrap_or(&field_name);
366 let nested_fields = type_obj
367 .get("fields")
368 .and_then(|v| v.as_array())
369 .ok_or_else(|| anyhow::anyhow!("Nested record missing fields"))?;
370
371 for nested_field in nested_fields {
372 match self.parse_field(nested_field, nested_name, errors) {
373 Ok(mut nested_cols) => {
374 for col in nested_cols.iter_mut() {
376 col.name = format!("{}.{}", field_name, col.name);
377 }
378 columns.append(&mut nested_cols);
379 }
380 Err(e) => {
381 errors.push(ParserError {
382 error_type: "parse_error".to_string(),
383 field: Some(format!("{}.{}", field_name, nested_name)),
384 message: format!("Failed to parse nested field: {}", e),
385 });
386 }
387 }
388 }
389 } else if type_obj.get("type").and_then(|v| v.as_str()) == Some("array") {
390 let items = type_obj
392 .get("items")
393 .ok_or_else(|| anyhow::anyhow!("Array type missing items"))?;
394
395 let data_type = if let Some(items_str) = items.as_str() {
396 format!("ARRAY<{}>", self.map_avro_type_to_sql(items_str))
397 } else if let Some(items_obj) = items.as_object() {
398 if items_obj.get("type").and_then(|v| v.as_str()) == Some("record") {
399 let nested_name = items_obj
401 .get("name")
402 .and_then(|v| v.as_str())
403 .unwrap_or(&field_name);
404 let nested_fields = items_obj
405 .get("fields")
406 .and_then(|v| v.as_array())
407 .ok_or_else(|| anyhow::anyhow!("Array record missing fields"))?;
408
409 for nested_field in nested_fields {
410 match self.parse_field(nested_field, nested_name, errors) {
411 Ok(mut nested_cols) => {
412 for col in nested_cols.iter_mut() {
413 col.name = format!("{}.{}", field_name, col.name);
414 }
415 columns.append(&mut nested_cols);
416 }
417 Err(e) => {
418 errors.push(ParserError {
419 error_type: "parse_error".to_string(),
420 field: Some(format!("{}.{}", field_name, nested_name)),
421 message: format!("Failed to parse array item field: {}", e),
422 });
423 }
424 }
425 }
426 return Ok(columns);
427 } else {
428 format!("ARRAY<{}>", "STRUCT")
429 }
430 } else {
431 "ARRAY<STRING>".to_string()
432 };
433
434 columns.push(Column {
435 name: field_name,
436 data_type,
437 physical_type: None,
438 nullable,
439 primary_key: false,
440 secondary_key: false,
441 composite_key: None,
442 foreign_key: None,
443 constraints: Vec::new(),
444 description,
445 quality: Vec::new(),
446 relationships: Vec::new(),
447 enum_values: Vec::new(),
448 errors: Vec::new(),
449 column_order: 0,
450 nested_data: None,
451 });
452 } else {
453 columns.push(Column {
455 name: field_name,
456 data_type: "STRUCT".to_string(),
457 physical_type: None,
458 nullable,
459 primary_key: false,
460 secondary_key: false,
461 composite_key: None,
462 foreign_key: None,
463 constraints: Vec::new(),
464 description,
465 quality: Vec::new(),
466 relationships: Vec::new(),
467 enum_values: Vec::new(),
468 errors: Vec::new(),
469 column_order: 0,
470 nested_data: None,
471 });
472 }
473 } else {
474 return Err(anyhow::anyhow!("Unsupported field type format"));
475 }
476
477 Ok(columns)
478 }
479
480 fn map_avro_type_to_sql(&self, avro_type: &str) -> String {
482 match avro_type {
483 "int" => "INTEGER".to_string(),
484 "long" => "BIGINT".to_string(),
485 "float" => "FLOAT".to_string(),
486 "double" => "DOUBLE".to_string(),
487 "boolean" => "BOOLEAN".to_string(),
488 "bytes" => "BYTES".to_string(),
489 "string" => "STRING".to_string(),
490 "null" => "NULL".to_string(),
491 _ => "STRING".to_string(), }
493 }
494}
495
/// A recoverable problem encountered while parsing an AVRO schema.
///
/// Implements `Display` and `std::error::Error` so it can be printed, boxed
/// as `dyn Error`, or propagated with `?`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParserError {
    /// Machine-readable category of the problem (for example `"parse_error"`).
    pub error_type: String,
    /// Location of the problem within the schema (for example `fields[2]`),
    /// or `None` when no location applies.
    pub field: Option<String>,
    /// Human-readable description of the problem.
    pub message: String,
}

impl std::fmt::Display for ParserError {
    /// Formats as `<error_type> at <field>: <message>`, omitting the
    /// ` at <field>` part when no field is recorded.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match &self.field {
            Some(field) => write!(f, "{} at {}: {}", self.error_type, field, self.message),
            None => write!(f, "{}: {}", self.error_type, self.message),
        }
    }
}

impl std::error::Error for ParserError {}