1use crate::import::{ImportError, ImportResult, TableData};
10use crate::models::{Column, Table};
11use crate::validation::input::{validate_column_name, validate_table_name};
12use anyhow::{Context, Result};
13use serde_json::{Value, json};
14use std::collections::HashMap;
15use tracing::{info, warn};
16
/// Importer that converts AVRO schemas (in their JSON form) into tables.
///
/// The importer holds no state; construct it with `AvroImporter::new()` or
/// `AvroImporter::default()`.
#[derive(Debug, Default, Clone)]
pub struct AvroImporter;
20
21impl AvroImporter {
22 pub fn new() -> Self {
32 Self
33 }
34
35 pub fn import(&self, avro_content: &str) -> Result<ImportResult, ImportError> {
63 match self.parse(avro_content) {
64 Ok((tables, errors)) => {
65 let mut sdk_tables = Vec::new();
66 for (idx, table) in tables.iter().enumerate() {
67 sdk_tables.push(TableData {
68 table_index: idx,
69 name: Some(table.name.clone()),
70 columns: table
71 .columns
72 .iter()
73 .map(|c| super::ColumnData {
74 name: c.name.clone(),
75 data_type: c.data_type.clone(),
76 nullable: c.nullable,
77 primary_key: c.primary_key,
78 })
79 .collect(),
80 });
81 }
82 let sdk_errors: Vec<ImportError> = errors
83 .iter()
84 .map(|e| ImportError::ParseError(e.message.clone()))
85 .collect();
86 Ok(ImportResult {
87 tables: sdk_tables,
88 tables_requiring_name: Vec::new(),
89 errors: sdk_errors,
90 ai_suggestions: None,
91 })
92 }
93 Err(e) => Err(ImportError::ParseError(e.to_string())),
94 }
95 }
96
97 fn parse(&self, avro_content: &str) -> Result<(Vec<Table>, Vec<ParserError>)> {
103 let mut errors = Vec::new();
104
105 let schema: Value =
107 serde_json::from_str(avro_content).context("Failed to parse AVRO schema as JSON")?;
108
109 let mut tables = Vec::new();
110
111 if let Some(schemas) = schema.as_array() {
113 for (idx, schema_item) in schemas.iter().enumerate() {
115 match self.parse_schema(schema_item, &mut errors) {
116 Ok(table) => tables.push(table),
117 Err(e) => {
118 errors.push(ParserError {
119 error_type: "parse_error".to_string(),
120 field: Some(format!("schema[{}]", idx)),
121 message: format!("Failed to parse schema: {}", e),
122 });
123 }
124 }
125 }
126 } else {
127 match self.parse_schema(&schema, &mut errors) {
129 Ok(table) => tables.push(table),
130 Err(e) => {
131 errors.push(ParserError {
132 error_type: "parse_error".to_string(),
133 field: None,
134 message: format!("Failed to parse schema: {}", e),
135 });
136 }
137 }
138 }
139
140 Ok((tables, errors))
141 }
142
143 fn parse_schema(&self, schema: &Value, errors: &mut Vec<ParserError>) -> Result<Table> {
145 let schema_obj = schema
146 .as_object()
147 .ok_or_else(|| anyhow::anyhow!("Schema must be an object"))?;
148
149 let name = schema_obj
151 .get("name")
152 .and_then(|v| v.as_str())
153 .ok_or_else(|| anyhow::anyhow!("Missing required field: name"))?
154 .to_string();
155
156 if let Err(e) = validate_table_name(&name) {
158 warn!("Table name validation warning for '{}': {}", name, e);
159 }
160
161 let namespace = schema_obj
163 .get("namespace")
164 .and_then(|v| v.as_str())
165 .map(|s| s.to_string());
166
167 let fields = schema_obj
169 .get("fields")
170 .and_then(|v| v.as_array())
171 .ok_or_else(|| anyhow::anyhow!("Missing required field: fields"))?;
172
173 let mut columns = Vec::new();
174 for (idx, field) in fields.iter().enumerate() {
175 match self.parse_field(field, &name, errors) {
176 Ok(mut cols) => columns.append(&mut cols),
177 Err(e) => {
178 errors.push(ParserError {
179 error_type: "parse_error".to_string(),
180 field: Some(format!("fields[{}]", idx)),
181 message: format!("Failed to parse field: {}", e),
182 });
183 }
184 }
185 }
186
187 let mut odcl_metadata = HashMap::new();
189 if let Some(ref ns) = namespace {
190 odcl_metadata.insert("namespace".to_string(), json!(ns));
191 }
192 if let Some(doc) = schema_obj.get("doc").and_then(|v| v.as_str()) {
193 odcl_metadata.insert("description".to_string(), json!(doc));
194 }
195
196 let table = Table {
197 id: crate::models::table::Table::generate_id(&name, None, None, namespace.as_deref()),
198 name: name.clone(),
199 columns,
200 database_type: None,
201 catalog_name: None,
202 schema_name: namespace.clone(),
203 medallion_layers: Vec::new(),
204 scd_pattern: None,
205 data_vault_classification: None,
206 modeling_level: None,
207 tags: Vec::new(),
208 odcl_metadata,
209 owner: None,
210 sla: None,
211 contact_details: None,
212 infrastructure_type: None,
213 notes: None,
214 position: None,
215 yaml_file_path: None,
216 drawio_cell_id: None,
217 quality: Vec::new(),
218 errors: Vec::new(),
219 created_at: chrono::Utc::now(),
220 updated_at: chrono::Utc::now(),
221 };
222
223 info!(
224 "Parsed AVRO schema: {} with {} columns",
225 name,
226 table.columns.len()
227 );
228 Ok(table)
229 }
230
231 fn parse_field(
233 &self,
234 field: &Value,
235 _parent_name: &str,
236 errors: &mut Vec<ParserError>,
237 ) -> Result<Vec<Column>> {
238 let field_obj = field
239 .as_object()
240 .ok_or_else(|| anyhow::anyhow!("Field must be an object"))?;
241
242 let field_name = field_obj
243 .get("name")
244 .and_then(|v| v.as_str())
245 .ok_or_else(|| anyhow::anyhow!("Field missing name"))?
246 .to_string();
247
248 if let Err(e) = validate_column_name(&field_name) {
250 warn!("Column name validation warning for '{}': {}", field_name, e);
251 }
252
253 let field_type = field_obj
254 .get("type")
255 .ok_or_else(|| anyhow::anyhow!("Field missing type"))?;
256
257 let description = field_obj
258 .get("doc")
259 .and_then(|v| v.as_str())
260 .map(|s| s.to_string())
261 .unwrap_or_default();
262
263 let (avro_type, nullable) = if let Some(types) = field_type.as_array() {
265 if types.len() == 2 && types.iter().any(|t| t.as_str() == Some("null")) {
266 let non_null_type = types
268 .iter()
269 .find(|t| t.as_str() != Some("null"))
270 .ok_or_else(|| anyhow::anyhow!("Invalid union type"))?;
271 (non_null_type, true)
272 } else {
273 let first_non_null = types
276 .iter()
277 .find(|t| t.as_str() != Some("null"))
278 .unwrap_or(field_type);
279 (first_non_null, true)
280 }
281 } else {
282 (field_type, false)
283 };
284
285 let mut columns = Vec::new();
287 if let Some(type_str) = avro_type.as_str() {
288 let data_type = self.map_avro_type_to_sql(type_str);
290 columns.push(Column {
291 name: field_name,
292 data_type,
293 nullable,
294 primary_key: false,
295 secondary_key: false,
296 composite_key: None,
297 foreign_key: None,
298 constraints: Vec::new(),
299 description,
300 quality: Vec::new(),
301 enum_values: Vec::new(),
302 errors: Vec::new(),
303 column_order: 0,
304 });
305 } else if let Some(type_obj) = avro_type.as_object() {
306 if type_obj.get("type").and_then(|v| v.as_str()) == Some("record") {
308 let nested_name = type_obj
310 .get("name")
311 .and_then(|v| v.as_str())
312 .unwrap_or(&field_name);
313 let nested_fields = type_obj
314 .get("fields")
315 .and_then(|v| v.as_array())
316 .ok_or_else(|| anyhow::anyhow!("Nested record missing fields"))?;
317
318 for nested_field in nested_fields {
319 match self.parse_field(nested_field, nested_name, errors) {
320 Ok(mut nested_cols) => {
321 for col in nested_cols.iter_mut() {
323 col.name = format!("{}.{}", field_name, col.name);
324 }
325 columns.append(&mut nested_cols);
326 }
327 Err(e) => {
328 errors.push(ParserError {
329 error_type: "parse_error".to_string(),
330 field: Some(format!("{}.{}", field_name, nested_name)),
331 message: format!("Failed to parse nested field: {}", e),
332 });
333 }
334 }
335 }
336 } else if type_obj.get("type").and_then(|v| v.as_str()) == Some("array") {
337 let items = type_obj
339 .get("items")
340 .ok_or_else(|| anyhow::anyhow!("Array type missing items"))?;
341
342 let data_type = if let Some(items_str) = items.as_str() {
343 format!("ARRAY<{}>", self.map_avro_type_to_sql(items_str))
344 } else if let Some(items_obj) = items.as_object() {
345 if items_obj.get("type").and_then(|v| v.as_str()) == Some("record") {
346 let nested_name = items_obj
348 .get("name")
349 .and_then(|v| v.as_str())
350 .unwrap_or(&field_name);
351 let nested_fields = items_obj
352 .get("fields")
353 .and_then(|v| v.as_array())
354 .ok_or_else(|| anyhow::anyhow!("Array record missing fields"))?;
355
356 for nested_field in nested_fields {
357 match self.parse_field(nested_field, nested_name, errors) {
358 Ok(mut nested_cols) => {
359 for col in nested_cols.iter_mut() {
360 col.name = format!("{}.{}", field_name, col.name);
361 }
362 columns.append(&mut nested_cols);
363 }
364 Err(e) => {
365 errors.push(ParserError {
366 error_type: "parse_error".to_string(),
367 field: Some(format!("{}.{}", field_name, nested_name)),
368 message: format!("Failed to parse array item field: {}", e),
369 });
370 }
371 }
372 }
373 return Ok(columns);
374 } else {
375 format!("ARRAY<{}>", "STRUCT")
376 }
377 } else {
378 "ARRAY<STRING>".to_string()
379 };
380
381 columns.push(Column {
382 name: field_name,
383 data_type,
384 nullable,
385 primary_key: false,
386 secondary_key: false,
387 composite_key: None,
388 foreign_key: None,
389 constraints: Vec::new(),
390 description,
391 quality: Vec::new(),
392 enum_values: Vec::new(),
393 errors: Vec::new(),
394 column_order: 0,
395 });
396 } else {
397 columns.push(Column {
399 name: field_name,
400 data_type: "STRUCT".to_string(),
401 nullable,
402 primary_key: false,
403 secondary_key: false,
404 composite_key: None,
405 foreign_key: None,
406 constraints: Vec::new(),
407 description,
408 quality: Vec::new(),
409 enum_values: Vec::new(),
410 errors: Vec::new(),
411 column_order: 0,
412 });
413 }
414 } else {
415 return Err(anyhow::anyhow!("Unsupported field type format"));
416 }
417
418 Ok(columns)
419 }
420
421 fn map_avro_type_to_sql(&self, avro_type: &str) -> String {
423 match avro_type {
424 "int" => "INTEGER".to_string(),
425 "long" => "BIGINT".to_string(),
426 "float" => "FLOAT".to_string(),
427 "double" => "DOUBLE".to_string(),
428 "boolean" => "BOOLEAN".to_string(),
429 "bytes" => "BYTES".to_string(),
430 "string" => "STRING".to_string(),
431 "null" => "NULL".to_string(),
432 _ => "STRING".to_string(), }
434 }
435}
436
/// A non-fatal problem found while parsing an AVRO schema.
///
/// These are collected alongside successfully parsed tables, so one bad
/// schema or field does not abort the whole import.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParserError {
    /// Error category; the parser in this file sets `"parse_error"`.
    pub error_type: String,
    /// Location of the problem (e.g. `schema[0]` or `fields[2]`), when known.
    pub field: Option<String>,
    /// Human-readable description of the problem.
    pub message: String,
}