1use crate::import::{ImportError, ImportResult, TableData};
10use crate::models::{Column, Table};
11use crate::validation::input::{validate_column_name, validate_table_name};
12use anyhow::{Context, Result};
13use serde_json::{Value, json};
14use std::collections::HashMap;
15use tracing::{info, warn};
16
/// Importer that turns AVRO schema JSON (a single record schema or a JSON array of
/// record schemas) into the crate's table/column model.
///
/// The importer is a stateless unit struct, so it is trivially cheap to create and copy.
#[derive(Debug, Default, Clone, Copy)]
pub struct AvroImporter;
20
21impl AvroImporter {
22 pub fn new() -> Self {
32 Self
33 }
34
35 pub fn import(&self, avro_content: &str) -> Result<ImportResult, ImportError> {
63 match self.parse(avro_content) {
64 Ok((tables, errors)) => {
65 let mut sdk_tables = Vec::new();
66 for (idx, table) in tables.iter().enumerate() {
67 sdk_tables.push(TableData {
68 table_index: idx,
69 name: Some(table.name.clone()),
70 columns: table
71 .columns
72 .iter()
73 .map(|c| super::ColumnData {
74 name: c.name.clone(),
75 data_type: c.data_type.clone(),
76 nullable: c.nullable,
77 primary_key: c.primary_key,
78 })
79 .collect(),
80 });
81 }
82 let sdk_errors: Vec<ImportError> = errors
83 .iter()
84 .map(|e| ImportError::ParseError(e.message.clone()))
85 .collect();
86 Ok(ImportResult {
87 tables: sdk_tables,
88 tables_requiring_name: Vec::new(),
89 errors: sdk_errors,
90 ai_suggestions: None,
91 })
92 }
93 Err(e) => Err(ImportError::ParseError(e.to_string())),
94 }
95 }
96
97 fn parse(&self, avro_content: &str) -> Result<(Vec<Table>, Vec<ParserError>)> {
103 let mut errors = Vec::new();
104
105 let schema: Value =
107 serde_json::from_str(avro_content).context("Failed to parse AVRO schema as JSON")?;
108
109 let mut tables = Vec::new();
110
111 if let Some(schemas) = schema.as_array() {
113 for (idx, schema_item) in schemas.iter().enumerate() {
115 match self.parse_schema(schema_item, &mut errors) {
116 Ok(table) => tables.push(table),
117 Err(e) => {
118 errors.push(ParserError {
119 error_type: "parse_error".to_string(),
120 field: Some(format!("schema[{}]", idx)),
121 message: format!("Failed to parse schema: {}", e),
122 });
123 }
124 }
125 }
126 } else {
127 match self.parse_schema(&schema, &mut errors) {
129 Ok(table) => tables.push(table),
130 Err(e) => {
131 errors.push(ParserError {
132 error_type: "parse_error".to_string(),
133 field: None,
134 message: format!("Failed to parse schema: {}", e),
135 });
136 }
137 }
138 }
139
140 Ok((tables, errors))
141 }
142
143 fn parse_schema(&self, schema: &Value, errors: &mut Vec<ParserError>) -> Result<Table> {
145 let schema_obj = schema
146 .as_object()
147 .ok_or_else(|| anyhow::anyhow!("Schema must be an object"))?;
148
149 let name = schema_obj
151 .get("name")
152 .and_then(|v| v.as_str())
153 .ok_or_else(|| anyhow::anyhow!("Missing required field: name"))?
154 .to_string();
155
156 if let Err(e) = validate_table_name(&name) {
158 warn!("Table name validation warning for '{}': {}", name, e);
159 }
160
161 let namespace = schema_obj
163 .get("namespace")
164 .and_then(|v| v.as_str())
165 .map(|s| s.to_string());
166
167 let fields = schema_obj
169 .get("fields")
170 .and_then(|v| v.as_array())
171 .ok_or_else(|| anyhow::anyhow!("Missing required field: fields"))?;
172
173 let mut columns = Vec::new();
174 for (idx, field) in fields.iter().enumerate() {
175 match self.parse_field(field, &name, errors) {
176 Ok(mut cols) => columns.append(&mut cols),
177 Err(e) => {
178 errors.push(ParserError {
179 error_type: "parse_error".to_string(),
180 field: Some(format!("fields[{}]", idx)),
181 message: format!("Failed to parse field: {}", e),
182 });
183 }
184 }
185 }
186
187 let mut odcl_metadata = HashMap::new();
189 if let Some(ref ns) = namespace {
190 odcl_metadata.insert("namespace".to_string(), json!(ns));
191 }
192 if let Some(doc) = schema_obj.get("doc").and_then(|v| v.as_str()) {
193 odcl_metadata.insert("description".to_string(), json!(doc));
194 }
195
196 let table = Table {
197 id: crate::models::table::Table::generate_id(&name, None, None, namespace.as_deref()),
198 name: name.clone(),
199 columns,
200 database_type: None,
201 catalog_name: None,
202 schema_name: namespace.clone(),
203 medallion_layers: Vec::new(),
204 scd_pattern: None,
205 data_vault_classification: None,
206 modeling_level: None,
207 tags: Vec::new(),
208 odcl_metadata,
209 position: None,
210 yaml_file_path: None,
211 drawio_cell_id: None,
212 quality: Vec::new(),
213 errors: Vec::new(),
214 created_at: chrono::Utc::now(),
215 updated_at: chrono::Utc::now(),
216 };
217
218 info!(
219 "Parsed AVRO schema: {} with {} columns",
220 name,
221 table.columns.len()
222 );
223 Ok(table)
224 }
225
226 fn parse_field(
228 &self,
229 field: &Value,
230 _parent_name: &str,
231 errors: &mut Vec<ParserError>,
232 ) -> Result<Vec<Column>> {
233 let field_obj = field
234 .as_object()
235 .ok_or_else(|| anyhow::anyhow!("Field must be an object"))?;
236
237 let field_name = field_obj
238 .get("name")
239 .and_then(|v| v.as_str())
240 .ok_or_else(|| anyhow::anyhow!("Field missing name"))?
241 .to_string();
242
243 if let Err(e) = validate_column_name(&field_name) {
245 warn!("Column name validation warning for '{}': {}", field_name, e);
246 }
247
248 let field_type = field_obj
249 .get("type")
250 .ok_or_else(|| anyhow::anyhow!("Field missing type"))?;
251
252 let description = field_obj
253 .get("doc")
254 .and_then(|v| v.as_str())
255 .map(|s| s.to_string())
256 .unwrap_or_default();
257
258 let (avro_type, nullable) = if let Some(types) = field_type.as_array() {
260 if types.len() == 2 && types.iter().any(|t| t.as_str() == Some("null")) {
261 let non_null_type = types
263 .iter()
264 .find(|t| t.as_str() != Some("null"))
265 .ok_or_else(|| anyhow::anyhow!("Invalid union type"))?;
266 (non_null_type, true)
267 } else {
268 let first_non_null = types
271 .iter()
272 .find(|t| t.as_str() != Some("null"))
273 .unwrap_or(field_type);
274 (first_non_null, true)
275 }
276 } else {
277 (field_type, false)
278 };
279
280 let mut columns = Vec::new();
282 if let Some(type_str) = avro_type.as_str() {
283 let data_type = self.map_avro_type_to_sql(type_str);
285 columns.push(Column {
286 name: field_name,
287 data_type,
288 nullable,
289 primary_key: false,
290 secondary_key: false,
291 composite_key: None,
292 foreign_key: None,
293 constraints: Vec::new(),
294 description,
295 quality: Vec::new(),
296 enum_values: Vec::new(),
297 errors: Vec::new(),
298 column_order: 0,
299 });
300 } else if let Some(type_obj) = avro_type.as_object() {
301 if type_obj.get("type").and_then(|v| v.as_str()) == Some("record") {
303 let nested_name = type_obj
305 .get("name")
306 .and_then(|v| v.as_str())
307 .unwrap_or(&field_name);
308 let nested_fields = type_obj
309 .get("fields")
310 .and_then(|v| v.as_array())
311 .ok_or_else(|| anyhow::anyhow!("Nested record missing fields"))?;
312
313 for nested_field in nested_fields {
314 match self.parse_field(nested_field, nested_name, errors) {
315 Ok(mut nested_cols) => {
316 for col in nested_cols.iter_mut() {
318 col.name = format!("{}.{}", field_name, col.name);
319 }
320 columns.append(&mut nested_cols);
321 }
322 Err(e) => {
323 errors.push(ParserError {
324 error_type: "parse_error".to_string(),
325 field: Some(format!("{}.{}", field_name, nested_name)),
326 message: format!("Failed to parse nested field: {}", e),
327 });
328 }
329 }
330 }
331 } else if type_obj.get("type").and_then(|v| v.as_str()) == Some("array") {
332 let items = type_obj
334 .get("items")
335 .ok_or_else(|| anyhow::anyhow!("Array type missing items"))?;
336
337 let data_type = if let Some(items_str) = items.as_str() {
338 format!("ARRAY<{}>", self.map_avro_type_to_sql(items_str))
339 } else if let Some(items_obj) = items.as_object() {
340 if items_obj.get("type").and_then(|v| v.as_str()) == Some("record") {
341 let nested_name = items_obj
343 .get("name")
344 .and_then(|v| v.as_str())
345 .unwrap_or(&field_name);
346 let nested_fields = items_obj
347 .get("fields")
348 .and_then(|v| v.as_array())
349 .ok_or_else(|| anyhow::anyhow!("Array record missing fields"))?;
350
351 for nested_field in nested_fields {
352 match self.parse_field(nested_field, nested_name, errors) {
353 Ok(mut nested_cols) => {
354 for col in nested_cols.iter_mut() {
355 col.name = format!("{}.{}", field_name, col.name);
356 }
357 columns.append(&mut nested_cols);
358 }
359 Err(e) => {
360 errors.push(ParserError {
361 error_type: "parse_error".to_string(),
362 field: Some(format!("{}.{}", field_name, nested_name)),
363 message: format!("Failed to parse array item field: {}", e),
364 });
365 }
366 }
367 }
368 return Ok(columns);
369 } else {
370 format!("ARRAY<{}>", "STRUCT")
371 }
372 } else {
373 "ARRAY<STRING>".to_string()
374 };
375
376 columns.push(Column {
377 name: field_name,
378 data_type,
379 nullable,
380 primary_key: false,
381 secondary_key: false,
382 composite_key: None,
383 foreign_key: None,
384 constraints: Vec::new(),
385 description,
386 quality: Vec::new(),
387 enum_values: Vec::new(),
388 errors: Vec::new(),
389 column_order: 0,
390 });
391 } else {
392 columns.push(Column {
394 name: field_name,
395 data_type: "STRUCT".to_string(),
396 nullable,
397 primary_key: false,
398 secondary_key: false,
399 composite_key: None,
400 foreign_key: None,
401 constraints: Vec::new(),
402 description,
403 quality: Vec::new(),
404 enum_values: Vec::new(),
405 errors: Vec::new(),
406 column_order: 0,
407 });
408 }
409 } else {
410 return Err(anyhow::anyhow!("Unsupported field type format"));
411 }
412
413 Ok(columns)
414 }
415
416 fn map_avro_type_to_sql(&self, avro_type: &str) -> String {
418 match avro_type {
419 "int" => "INTEGER".to_string(),
420 "long" => "BIGINT".to_string(),
421 "float" => "FLOAT".to_string(),
422 "double" => "DOUBLE".to_string(),
423 "boolean" => "BOOLEAN".to_string(),
424 "bytes" => "BYTES".to_string(),
425 "string" => "STRING".to_string(),
426 "null" => "NULL".to_string(),
427 _ => "STRING".to_string(), }
429 }
430}
431
/// A non-fatal problem found while parsing an AVRO schema.
///
/// These are collected next to the successfully parsed tables, so one bad schema
/// or field does not abort the whole import.
#[derive(Clone, Debug)]
pub struct ParserError {
    /// Category of the problem; this importer always records `"parse_error"`.
    pub error_type: String,
    /// Where the problem occurred (e.g. `schema[0]` or `fields[2]`), if known.
    pub field: Option<String>,
    /// Human-readable description of the problem.
    pub message: String,
}