1use crate::import::{ImportError, ImportResult, TableData};
10use crate::models::{Column, Table, Tag};
11use crate::validation::input::{validate_column_name, validate_table_name};
12use anyhow::{Context, Result};
13use serde_json::{Value, json};
14use std::collections::HashMap;
15use std::str::FromStr;
16use tracing::{info, warn};
17
/// Importer that converts AVRO schema JSON into data-model tables.
///
/// The importer carries no state; construct it with `AvroImporter::new()` or
/// `AvroImporter::default()`.
#[derive(Debug, Default, Clone, Copy)]
pub struct AvroImporter;
21
22impl AvroImporter {
23 pub fn new() -> Self {
33 Self
34 }
35
36 pub fn import(&self, avro_content: &str) -> Result<ImportResult, ImportError> {
64 match self.parse(avro_content) {
65 Ok((tables, errors)) => {
66 let mut sdk_tables = Vec::new();
67 for (idx, table) in tables.iter().enumerate() {
68 sdk_tables.push(TableData {
69 table_index: idx,
70 name: Some(table.name.clone()),
71 columns: table
72 .columns
73 .iter()
74 .map(|c| super::ColumnData {
75 name: c.name.clone(),
76 data_type: c.data_type.clone(),
77 nullable: c.nullable,
78 primary_key: c.primary_key,
79 description: if c.description.is_empty() {
80 None
81 } else {
82 Some(c.description.clone())
83 },
84 quality: if c.quality.is_empty() {
85 None
86 } else {
87 Some(c.quality.clone())
88 },
89 ref_path: c.ref_path.clone(),
90 enum_values: if c.enum_values.is_empty() {
91 None
92 } else {
93 Some(c.enum_values.clone())
94 },
95 })
96 .collect(),
97 });
98 }
99 let sdk_errors: Vec<ImportError> = errors
100 .iter()
101 .map(|e| ImportError::ParseError(e.message.clone()))
102 .collect();
103 Ok(ImportResult {
104 tables: sdk_tables,
105 tables_requiring_name: Vec::new(),
106 errors: sdk_errors,
107 ai_suggestions: None,
108 })
109 }
110 Err(e) => Err(ImportError::ParseError(e.to_string())),
111 }
112 }
113
114 fn parse(&self, avro_content: &str) -> Result<(Vec<Table>, Vec<ParserError>)> {
120 let mut errors = Vec::new();
121
122 let schema: Value =
124 serde_json::from_str(avro_content).context("Failed to parse AVRO schema as JSON")?;
125
126 let mut tables = Vec::new();
127
128 if let Some(schemas) = schema.as_array() {
130 for (idx, schema_item) in schemas.iter().enumerate() {
132 match self.parse_schema(schema_item, &mut errors) {
133 Ok(table) => tables.push(table),
134 Err(e) => {
135 errors.push(ParserError {
136 error_type: "parse_error".to_string(),
137 field: Some(format!("schema[{}]", idx)),
138 message: format!("Failed to parse schema: {}", e),
139 });
140 }
141 }
142 }
143 } else {
144 match self.parse_schema(&schema, &mut errors) {
146 Ok(table) => tables.push(table),
147 Err(e) => {
148 errors.push(ParserError {
149 error_type: "parse_error".to_string(),
150 field: None,
151 message: format!("Failed to parse schema: {}", e),
152 });
153 }
154 }
155 }
156
157 Ok((tables, errors))
158 }
159
160 fn parse_schema(&self, schema: &Value, errors: &mut Vec<ParserError>) -> Result<Table> {
162 let schema_obj = schema
163 .as_object()
164 .ok_or_else(|| anyhow::anyhow!("Schema must be an object"))?;
165
166 let name = schema_obj
168 .get("name")
169 .and_then(|v| v.as_str())
170 .ok_or_else(|| anyhow::anyhow!("Missing required field: name"))?
171 .to_string();
172
173 if let Err(e) = validate_table_name(&name) {
175 warn!("Table name validation warning for '{}': {}", name, e);
176 }
177
178 let namespace = schema_obj
180 .get("namespace")
181 .and_then(|v| v.as_str())
182 .map(|s| s.to_string());
183
184 let fields = schema_obj
186 .get("fields")
187 .and_then(|v| v.as_array())
188 .ok_or_else(|| anyhow::anyhow!("Missing required field: fields"))?;
189
190 let mut columns = Vec::new();
191 for (idx, field) in fields.iter().enumerate() {
192 match self.parse_field(field, &name, errors) {
193 Ok(mut cols) => columns.append(&mut cols),
194 Err(e) => {
195 errors.push(ParserError {
196 error_type: "parse_error".to_string(),
197 field: Some(format!("fields[{}]", idx)),
198 message: format!("Failed to parse field: {}", e),
199 });
200 }
201 }
202 }
203
204 let mut tags: Vec<Tag> = Vec::new();
206 if let Some(tags_arr) = schema_obj.get("tags").and_then(|v| v.as_array()) {
207 for item in tags_arr {
208 if let Some(s) = item.as_str() {
209 if let Ok(tag) = Tag::from_str(s) {
210 tags.push(tag);
211 } else {
212 tags.push(Tag::Simple(s.to_string()));
213 }
214 }
215 }
216 }
217 if let Some(aliases_arr) = schema_obj.get("aliases").and_then(|v| v.as_array()) {
219 for item in aliases_arr {
220 if let Some(s) = item.as_str() {
221 if let Ok(tag) = Tag::from_str(s) {
223 if !tags.contains(&tag) {
224 tags.push(tag);
225 }
226 } else {
227 let simple_tag = Tag::Simple(s.to_string());
228 if !tags.contains(&simple_tag) {
229 tags.push(simple_tag);
230 }
231 }
232 }
233 }
234 }
235
236 let mut odcl_metadata = HashMap::new();
238 if let Some(ref ns) = namespace {
239 odcl_metadata.insert("namespace".to_string(), json!(ns));
240 }
241 if let Some(doc) = schema_obj.get("doc").and_then(|v| v.as_str()) {
242 odcl_metadata.insert("description".to_string(), json!(doc));
243 }
244
245 let table = Table {
246 id: crate::models::table::Table::generate_id(&name, None, None, namespace.as_deref()),
247 name: name.clone(),
248 columns,
249 database_type: None,
250 catalog_name: None,
251 schema_name: namespace.clone(),
252 medallion_layers: Vec::new(),
253 scd_pattern: None,
254 data_vault_classification: None,
255 modeling_level: None,
256 tags,
257 odcl_metadata,
258 owner: None,
259 sla: None,
260 contact_details: None,
261 infrastructure_type: None,
262 notes: None,
263 position: None,
264 yaml_file_path: None,
265 drawio_cell_id: None,
266 quality: Vec::new(),
267 errors: Vec::new(),
268 created_at: chrono::Utc::now(),
269 updated_at: chrono::Utc::now(),
270 };
271
272 info!(
273 "Parsed AVRO schema: {} with {} columns",
274 name,
275 table.columns.len()
276 );
277 Ok(table)
278 }
279
280 fn parse_field(
282 &self,
283 field: &Value,
284 _parent_name: &str,
285 errors: &mut Vec<ParserError>,
286 ) -> Result<Vec<Column>> {
287 let field_obj = field
288 .as_object()
289 .ok_or_else(|| anyhow::anyhow!("Field must be an object"))?;
290
291 let field_name = field_obj
292 .get("name")
293 .and_then(|v| v.as_str())
294 .ok_or_else(|| anyhow::anyhow!("Field missing name"))?
295 .to_string();
296
297 if let Err(e) = validate_column_name(&field_name) {
299 warn!("Column name validation warning for '{}': {}", field_name, e);
300 }
301
302 let field_type = field_obj
303 .get("type")
304 .ok_or_else(|| anyhow::anyhow!("Field missing type"))?;
305
306 let description = field_obj
307 .get("doc")
308 .and_then(|v| v.as_str())
309 .map(|s| s.to_string())
310 .unwrap_or_default();
311
312 let (avro_type, nullable) = if let Some(types) = field_type.as_array() {
314 if types.len() == 2 && types.iter().any(|t| t.as_str() == Some("null")) {
315 let non_null_type = types
317 .iter()
318 .find(|t| t.as_str() != Some("null"))
319 .ok_or_else(|| anyhow::anyhow!("Invalid union type"))?;
320 (non_null_type, true)
321 } else {
322 let first_non_null = types
325 .iter()
326 .find(|t| t.as_str() != Some("null"))
327 .unwrap_or(field_type);
328 (first_non_null, true)
329 }
330 } else {
331 (field_type, false)
332 };
333
334 let mut columns = Vec::new();
336 if let Some(type_str) = avro_type.as_str() {
337 let data_type = self.map_avro_type_to_sql(type_str);
339 columns.push(Column {
340 name: field_name,
341 data_type,
342 nullable,
343 primary_key: false,
344 secondary_key: false,
345 composite_key: None,
346 foreign_key: None,
347 constraints: Vec::new(),
348 description,
349 quality: Vec::new(),
350 ref_path: None,
351 enum_values: Vec::new(),
352 errors: Vec::new(),
353 column_order: 0,
354 });
355 } else if let Some(type_obj) = avro_type.as_object() {
356 if type_obj.get("type").and_then(|v| v.as_str()) == Some("record") {
358 let nested_name = type_obj
360 .get("name")
361 .and_then(|v| v.as_str())
362 .unwrap_or(&field_name);
363 let nested_fields = type_obj
364 .get("fields")
365 .and_then(|v| v.as_array())
366 .ok_or_else(|| anyhow::anyhow!("Nested record missing fields"))?;
367
368 for nested_field in nested_fields {
369 match self.parse_field(nested_field, nested_name, errors) {
370 Ok(mut nested_cols) => {
371 for col in nested_cols.iter_mut() {
373 col.name = format!("{}.{}", field_name, col.name);
374 }
375 columns.append(&mut nested_cols);
376 }
377 Err(e) => {
378 errors.push(ParserError {
379 error_type: "parse_error".to_string(),
380 field: Some(format!("{}.{}", field_name, nested_name)),
381 message: format!("Failed to parse nested field: {}", e),
382 });
383 }
384 }
385 }
386 } else if type_obj.get("type").and_then(|v| v.as_str()) == Some("array") {
387 let items = type_obj
389 .get("items")
390 .ok_or_else(|| anyhow::anyhow!("Array type missing items"))?;
391
392 let data_type = if let Some(items_str) = items.as_str() {
393 format!("ARRAY<{}>", self.map_avro_type_to_sql(items_str))
394 } else if let Some(items_obj) = items.as_object() {
395 if items_obj.get("type").and_then(|v| v.as_str()) == Some("record") {
396 let nested_name = items_obj
398 .get("name")
399 .and_then(|v| v.as_str())
400 .unwrap_or(&field_name);
401 let nested_fields = items_obj
402 .get("fields")
403 .and_then(|v| v.as_array())
404 .ok_or_else(|| anyhow::anyhow!("Array record missing fields"))?;
405
406 for nested_field in nested_fields {
407 match self.parse_field(nested_field, nested_name, errors) {
408 Ok(mut nested_cols) => {
409 for col in nested_cols.iter_mut() {
410 col.name = format!("{}.{}", field_name, col.name);
411 }
412 columns.append(&mut nested_cols);
413 }
414 Err(e) => {
415 errors.push(ParserError {
416 error_type: "parse_error".to_string(),
417 field: Some(format!("{}.{}", field_name, nested_name)),
418 message: format!("Failed to parse array item field: {}", e),
419 });
420 }
421 }
422 }
423 return Ok(columns);
424 } else {
425 format!("ARRAY<{}>", "STRUCT")
426 }
427 } else {
428 "ARRAY<STRING>".to_string()
429 };
430
431 columns.push(Column {
432 name: field_name,
433 data_type,
434 nullable,
435 primary_key: false,
436 secondary_key: false,
437 composite_key: None,
438 foreign_key: None,
439 constraints: Vec::new(),
440 description,
441 quality: Vec::new(),
442 ref_path: None,
443 enum_values: Vec::new(),
444 errors: Vec::new(),
445 column_order: 0,
446 });
447 } else {
448 columns.push(Column {
450 name: field_name,
451 data_type: "STRUCT".to_string(),
452 nullable,
453 primary_key: false,
454 secondary_key: false,
455 composite_key: None,
456 foreign_key: None,
457 constraints: Vec::new(),
458 description,
459 quality: Vec::new(),
460 ref_path: None,
461 enum_values: Vec::new(),
462 errors: Vec::new(),
463 column_order: 0,
464 });
465 }
466 } else {
467 return Err(anyhow::anyhow!("Unsupported field type format"));
468 }
469
470 Ok(columns)
471 }
472
473 fn map_avro_type_to_sql(&self, avro_type: &str) -> String {
475 match avro_type {
476 "int" => "INTEGER".to_string(),
477 "long" => "BIGINT".to_string(),
478 "float" => "FLOAT".to_string(),
479 "double" => "DOUBLE".to_string(),
480 "boolean" => "BOOLEAN".to_string(),
481 "bytes" => "BYTES".to_string(),
482 "string" => "STRING".to_string(),
483 "null" => "NULL".to_string(),
484 _ => "STRING".to_string(), }
486 }
487}
488
/// A recoverable problem found while parsing an AVRO schema.
///
/// The AVRO importer keeps parsing after such errors and returns them alongside the
/// tables it did manage to parse.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParserError {
    /// Error category; the AVRO importer in this module sets it to `"parse_error"`.
    pub error_type: String,
    /// Path of the offending element (e.g. `schema[0]` or `fields[2]`), when known.
    pub field: Option<String>,
    /// Human-readable description of the problem.
    pub message: String,
}