1use crate::import::{ImportError, ImportResult, TableData};
10use crate::models::{Column, Table, Tag};
11use crate::validation::input::{validate_column_name, validate_table_name};
12use anyhow::{Context, Result};
13use serde_json::{Value, json};
14use std::collections::HashMap;
15use std::str::FromStr;
16use tracing::{info, warn};
17
/// Stateless importer that turns AVRO schema JSON text into table definitions.
///
/// The type carries no fields; all work happens in its methods.
#[derive(Default)]
pub struct AvroImporter;
21
22impl AvroImporter {
23 pub fn new() -> Self {
33 Self
34 }
35
36 pub fn import(&self, avro_content: &str) -> Result<ImportResult, ImportError> {
64 match self.parse(avro_content) {
65 Ok((tables, errors)) => {
66 let mut sdk_tables = Vec::new();
67 for (idx, table) in tables.iter().enumerate() {
68 sdk_tables.push(TableData {
69 table_index: idx,
70 name: Some(table.name.clone()),
71 columns: table
72 .columns
73 .iter()
74 .map(|c| super::ColumnData {
75 name: c.name.clone(),
76 data_type: c.data_type.clone(),
77 nullable: c.nullable,
78 primary_key: c.primary_key,
79 description: if c.description.is_empty() {
80 None
81 } else {
82 Some(c.description.clone())
83 },
84 quality: if c.quality.is_empty() {
85 None
86 } else {
87 Some(c.quality.clone())
88 },
89 ref_path: c.ref_path.clone(),
90 })
91 .collect(),
92 });
93 }
94 let sdk_errors: Vec<ImportError> = errors
95 .iter()
96 .map(|e| ImportError::ParseError(e.message.clone()))
97 .collect();
98 Ok(ImportResult {
99 tables: sdk_tables,
100 tables_requiring_name: Vec::new(),
101 errors: sdk_errors,
102 ai_suggestions: None,
103 })
104 }
105 Err(e) => Err(ImportError::ParseError(e.to_string())),
106 }
107 }
108
109 fn parse(&self, avro_content: &str) -> Result<(Vec<Table>, Vec<ParserError>)> {
115 let mut errors = Vec::new();
116
117 let schema: Value =
119 serde_json::from_str(avro_content).context("Failed to parse AVRO schema as JSON")?;
120
121 let mut tables = Vec::new();
122
123 if let Some(schemas) = schema.as_array() {
125 for (idx, schema_item) in schemas.iter().enumerate() {
127 match self.parse_schema(schema_item, &mut errors) {
128 Ok(table) => tables.push(table),
129 Err(e) => {
130 errors.push(ParserError {
131 error_type: "parse_error".to_string(),
132 field: Some(format!("schema[{}]", idx)),
133 message: format!("Failed to parse schema: {}", e),
134 });
135 }
136 }
137 }
138 } else {
139 match self.parse_schema(&schema, &mut errors) {
141 Ok(table) => tables.push(table),
142 Err(e) => {
143 errors.push(ParserError {
144 error_type: "parse_error".to_string(),
145 field: None,
146 message: format!("Failed to parse schema: {}", e),
147 });
148 }
149 }
150 }
151
152 Ok((tables, errors))
153 }
154
155 fn parse_schema(&self, schema: &Value, errors: &mut Vec<ParserError>) -> Result<Table> {
157 let schema_obj = schema
158 .as_object()
159 .ok_or_else(|| anyhow::anyhow!("Schema must be an object"))?;
160
161 let name = schema_obj
163 .get("name")
164 .and_then(|v| v.as_str())
165 .ok_or_else(|| anyhow::anyhow!("Missing required field: name"))?
166 .to_string();
167
168 if let Err(e) = validate_table_name(&name) {
170 warn!("Table name validation warning for '{}': {}", name, e);
171 }
172
173 let namespace = schema_obj
175 .get("namespace")
176 .and_then(|v| v.as_str())
177 .map(|s| s.to_string());
178
179 let fields = schema_obj
181 .get("fields")
182 .and_then(|v| v.as_array())
183 .ok_or_else(|| anyhow::anyhow!("Missing required field: fields"))?;
184
185 let mut columns = Vec::new();
186 for (idx, field) in fields.iter().enumerate() {
187 match self.parse_field(field, &name, errors) {
188 Ok(mut cols) => columns.append(&mut cols),
189 Err(e) => {
190 errors.push(ParserError {
191 error_type: "parse_error".to_string(),
192 field: Some(format!("fields[{}]", idx)),
193 message: format!("Failed to parse field: {}", e),
194 });
195 }
196 }
197 }
198
199 let mut tags: Vec<Tag> = Vec::new();
201 if let Some(tags_arr) = schema_obj.get("tags").and_then(|v| v.as_array()) {
202 for item in tags_arr {
203 if let Some(s) = item.as_str() {
204 if let Ok(tag) = Tag::from_str(s) {
205 tags.push(tag);
206 } else {
207 tags.push(Tag::Simple(s.to_string()));
208 }
209 }
210 }
211 }
212 if let Some(aliases_arr) = schema_obj.get("aliases").and_then(|v| v.as_array()) {
214 for item in aliases_arr {
215 if let Some(s) = item.as_str() {
216 if let Ok(tag) = Tag::from_str(s) {
218 if !tags.contains(&tag) {
219 tags.push(tag);
220 }
221 } else {
222 let simple_tag = Tag::Simple(s.to_string());
223 if !tags.contains(&simple_tag) {
224 tags.push(simple_tag);
225 }
226 }
227 }
228 }
229 }
230
231 let mut odcl_metadata = HashMap::new();
233 if let Some(ref ns) = namespace {
234 odcl_metadata.insert("namespace".to_string(), json!(ns));
235 }
236 if let Some(doc) = schema_obj.get("doc").and_then(|v| v.as_str()) {
237 odcl_metadata.insert("description".to_string(), json!(doc));
238 }
239
240 let table = Table {
241 id: crate::models::table::Table::generate_id(&name, None, None, namespace.as_deref()),
242 name: name.clone(),
243 columns,
244 database_type: None,
245 catalog_name: None,
246 schema_name: namespace.clone(),
247 medallion_layers: Vec::new(),
248 scd_pattern: None,
249 data_vault_classification: None,
250 modeling_level: None,
251 tags,
252 odcl_metadata,
253 owner: None,
254 sla: None,
255 contact_details: None,
256 infrastructure_type: None,
257 notes: None,
258 position: None,
259 yaml_file_path: None,
260 drawio_cell_id: None,
261 quality: Vec::new(),
262 errors: Vec::new(),
263 created_at: chrono::Utc::now(),
264 updated_at: chrono::Utc::now(),
265 };
266
267 info!(
268 "Parsed AVRO schema: {} with {} columns",
269 name,
270 table.columns.len()
271 );
272 Ok(table)
273 }
274
275 fn parse_field(
277 &self,
278 field: &Value,
279 _parent_name: &str,
280 errors: &mut Vec<ParserError>,
281 ) -> Result<Vec<Column>> {
282 let field_obj = field
283 .as_object()
284 .ok_or_else(|| anyhow::anyhow!("Field must be an object"))?;
285
286 let field_name = field_obj
287 .get("name")
288 .and_then(|v| v.as_str())
289 .ok_or_else(|| anyhow::anyhow!("Field missing name"))?
290 .to_string();
291
292 if let Err(e) = validate_column_name(&field_name) {
294 warn!("Column name validation warning for '{}': {}", field_name, e);
295 }
296
297 let field_type = field_obj
298 .get("type")
299 .ok_or_else(|| anyhow::anyhow!("Field missing type"))?;
300
301 let description = field_obj
302 .get("doc")
303 .and_then(|v| v.as_str())
304 .map(|s| s.to_string())
305 .unwrap_or_default();
306
307 let (avro_type, nullable) = if let Some(types) = field_type.as_array() {
309 if types.len() == 2 && types.iter().any(|t| t.as_str() == Some("null")) {
310 let non_null_type = types
312 .iter()
313 .find(|t| t.as_str() != Some("null"))
314 .ok_or_else(|| anyhow::anyhow!("Invalid union type"))?;
315 (non_null_type, true)
316 } else {
317 let first_non_null = types
320 .iter()
321 .find(|t| t.as_str() != Some("null"))
322 .unwrap_or(field_type);
323 (first_non_null, true)
324 }
325 } else {
326 (field_type, false)
327 };
328
329 let mut columns = Vec::new();
331 if let Some(type_str) = avro_type.as_str() {
332 let data_type = self.map_avro_type_to_sql(type_str);
334 columns.push(Column {
335 name: field_name,
336 data_type,
337 nullable,
338 primary_key: false,
339 secondary_key: false,
340 composite_key: None,
341 foreign_key: None,
342 constraints: Vec::new(),
343 description,
344 quality: Vec::new(),
345 ref_path: None,
346 enum_values: Vec::new(),
347 errors: Vec::new(),
348 column_order: 0,
349 });
350 } else if let Some(type_obj) = avro_type.as_object() {
351 if type_obj.get("type").and_then(|v| v.as_str()) == Some("record") {
353 let nested_name = type_obj
355 .get("name")
356 .and_then(|v| v.as_str())
357 .unwrap_or(&field_name);
358 let nested_fields = type_obj
359 .get("fields")
360 .and_then(|v| v.as_array())
361 .ok_or_else(|| anyhow::anyhow!("Nested record missing fields"))?;
362
363 for nested_field in nested_fields {
364 match self.parse_field(nested_field, nested_name, errors) {
365 Ok(mut nested_cols) => {
366 for col in nested_cols.iter_mut() {
368 col.name = format!("{}.{}", field_name, col.name);
369 }
370 columns.append(&mut nested_cols);
371 }
372 Err(e) => {
373 errors.push(ParserError {
374 error_type: "parse_error".to_string(),
375 field: Some(format!("{}.{}", field_name, nested_name)),
376 message: format!("Failed to parse nested field: {}", e),
377 });
378 }
379 }
380 }
381 } else if type_obj.get("type").and_then(|v| v.as_str()) == Some("array") {
382 let items = type_obj
384 .get("items")
385 .ok_or_else(|| anyhow::anyhow!("Array type missing items"))?;
386
387 let data_type = if let Some(items_str) = items.as_str() {
388 format!("ARRAY<{}>", self.map_avro_type_to_sql(items_str))
389 } else if let Some(items_obj) = items.as_object() {
390 if items_obj.get("type").and_then(|v| v.as_str()) == Some("record") {
391 let nested_name = items_obj
393 .get("name")
394 .and_then(|v| v.as_str())
395 .unwrap_or(&field_name);
396 let nested_fields = items_obj
397 .get("fields")
398 .and_then(|v| v.as_array())
399 .ok_or_else(|| anyhow::anyhow!("Array record missing fields"))?;
400
401 for nested_field in nested_fields {
402 match self.parse_field(nested_field, nested_name, errors) {
403 Ok(mut nested_cols) => {
404 for col in nested_cols.iter_mut() {
405 col.name = format!("{}.{}", field_name, col.name);
406 }
407 columns.append(&mut nested_cols);
408 }
409 Err(e) => {
410 errors.push(ParserError {
411 error_type: "parse_error".to_string(),
412 field: Some(format!("{}.{}", field_name, nested_name)),
413 message: format!("Failed to parse array item field: {}", e),
414 });
415 }
416 }
417 }
418 return Ok(columns);
419 } else {
420 format!("ARRAY<{}>", "STRUCT")
421 }
422 } else {
423 "ARRAY<STRING>".to_string()
424 };
425
426 columns.push(Column {
427 name: field_name,
428 data_type,
429 nullable,
430 primary_key: false,
431 secondary_key: false,
432 composite_key: None,
433 foreign_key: None,
434 constraints: Vec::new(),
435 description,
436 quality: Vec::new(),
437 ref_path: None,
438 enum_values: Vec::new(),
439 errors: Vec::new(),
440 column_order: 0,
441 });
442 } else {
443 columns.push(Column {
445 name: field_name,
446 data_type: "STRUCT".to_string(),
447 nullable,
448 primary_key: false,
449 secondary_key: false,
450 composite_key: None,
451 foreign_key: None,
452 constraints: Vec::new(),
453 description,
454 quality: Vec::new(),
455 ref_path: None,
456 enum_values: Vec::new(),
457 errors: Vec::new(),
458 column_order: 0,
459 });
460 }
461 } else {
462 return Err(anyhow::anyhow!("Unsupported field type format"));
463 }
464
465 Ok(columns)
466 }
467
468 fn map_avro_type_to_sql(&self, avro_type: &str) -> String {
470 match avro_type {
471 "int" => "INTEGER".to_string(),
472 "long" => "BIGINT".to_string(),
473 "float" => "FLOAT".to_string(),
474 "double" => "DOUBLE".to_string(),
475 "boolean" => "BOOLEAN".to_string(),
476 "bytes" => "BYTES".to_string(),
477 "string" => "STRING".to_string(),
478 "null" => "NULL".to_string(),
479 _ => "STRING".to_string(), }
481 }
482}
483
/// A problem recorded while parsing an AVRO schema.
///
/// In the code visible in this file, these are collected alongside the successfully
/// parsed tables rather than aborting the parse.
#[derive(Debug, Clone)]
pub struct ParserError {
    /// Category of the problem; the importer in this file always sets `"parse_error"`.
    pub error_type: String,
    /// Where the problem occurred (e.g. `schema[0]`, `fields[2]`, or a dotted
    /// `parent.nested` path), or `None` when no location applies.
    pub field: Option<String>,
    /// Human-readable description of what went wrong.
    pub message: String,
}