data_modelling_core/import/
avro.rs1use crate::import::odcs_shared::column_to_column_data;
10use crate::import::{ImportError, ImportResult, TableData};
11use crate::models::{Column, Table, Tag};
12use crate::validation::input::{validate_column_name, validate_table_name};
13use anyhow::{Context, Result};
14use serde_json::{Value, json};
15use std::collections::HashMap;
16use std::str::FromStr;
17use tracing::{info, warn};
18
/// Importer that converts AVRO schema documents (JSON text) into tables.
///
/// The type is a stateless unit struct: all work happens in its methods, so
/// an instance is free to create, clone and share.
#[derive(Debug, Clone, Default)]
pub struct AvroImporter;
22
23impl AvroImporter {
24 pub fn new() -> Self {
34 Self
35 }
36
37 pub fn import(&self, avro_content: &str) -> Result<ImportResult, ImportError> {
65 match self.parse(avro_content) {
66 Ok((tables, errors)) => {
67 let mut sdk_tables = Vec::new();
68 for (idx, table) in tables.iter().enumerate() {
69 sdk_tables.push(TableData {
70 table_index: idx,
71 id: Some(table.id.to_string()),
72 name: Some(table.name.clone()),
73 columns: table.columns.iter().map(column_to_column_data).collect(),
74 ..Default::default()
75 });
76 }
77 let sdk_errors: Vec<ImportError> = errors
78 .iter()
79 .map(|e| ImportError::ParseError(e.message.clone()))
80 .collect();
81 Ok(ImportResult {
82 tables: sdk_tables,
83 tables_requiring_name: Vec::new(),
84 errors: sdk_errors,
85 ai_suggestions: None,
86 })
87 }
88 Err(e) => Err(ImportError::ParseError(e.to_string())),
89 }
90 }
91
92 fn parse(&self, avro_content: &str) -> Result<(Vec<Table>, Vec<ParserError>)> {
98 let mut errors = Vec::new();
99
100 let schema: Value =
102 serde_json::from_str(avro_content).context("Failed to parse AVRO schema as JSON")?;
103
104 let mut tables = Vec::new();
105
106 if let Some(schemas) = schema.as_array() {
108 for (idx, schema_item) in schemas.iter().enumerate() {
110 match self.parse_schema(schema_item, &mut errors) {
111 Ok(table) => tables.push(table),
112 Err(e) => {
113 errors.push(ParserError {
114 error_type: "parse_error".to_string(),
115 field: Some(format!("schema[{}]", idx)),
116 message: format!("Failed to parse schema: {}", e),
117 });
118 }
119 }
120 }
121 } else {
122 match self.parse_schema(&schema, &mut errors) {
124 Ok(table) => tables.push(table),
125 Err(e) => {
126 errors.push(ParserError {
127 error_type: "parse_error".to_string(),
128 field: None,
129 message: format!("Failed to parse schema: {}", e),
130 });
131 }
132 }
133 }
134
135 Ok((tables, errors))
136 }
137
138 fn parse_schema(&self, schema: &Value, errors: &mut Vec<ParserError>) -> Result<Table> {
140 let schema_obj = schema
141 .as_object()
142 .ok_or_else(|| anyhow::anyhow!("Schema must be an object"))?;
143
144 let name = schema_obj
146 .get("name")
147 .and_then(|v| v.as_str())
148 .ok_or_else(|| anyhow::anyhow!("Missing required field: name"))?
149 .to_string();
150
151 if let Err(e) = validate_table_name(&name) {
153 warn!("Table name validation warning for '{}': {}", name, e);
154 }
155
156 let namespace = schema_obj
158 .get("namespace")
159 .and_then(|v| v.as_str())
160 .map(|s| s.to_string());
161
162 let fields = schema_obj
164 .get("fields")
165 .and_then(|v| v.as_array())
166 .ok_or_else(|| anyhow::anyhow!("Missing required field: fields"))?;
167
168 let mut columns = Vec::new();
169 for (idx, field) in fields.iter().enumerate() {
170 match self.parse_field(field, &name, errors) {
171 Ok(mut cols) => columns.append(&mut cols),
172 Err(e) => {
173 errors.push(ParserError {
174 error_type: "parse_error".to_string(),
175 field: Some(format!("fields[{}]", idx)),
176 message: format!("Failed to parse field: {}", e),
177 });
178 }
179 }
180 }
181
182 let mut tags: Vec<Tag> = Vec::new();
184 if let Some(tags_arr) = schema_obj.get("tags").and_then(|v| v.as_array()) {
185 for item in tags_arr {
186 if let Some(s) = item.as_str() {
187 if let Ok(tag) = Tag::from_str(s) {
188 tags.push(tag);
189 } else {
190 tags.push(Tag::Simple(s.to_string()));
191 }
192 }
193 }
194 }
195 if let Some(aliases_arr) = schema_obj.get("aliases").and_then(|v| v.as_array()) {
197 for item in aliases_arr {
198 if let Some(s) = item.as_str() {
199 if let Ok(tag) = Tag::from_str(s) {
201 if !tags.contains(&tag) {
202 tags.push(tag);
203 }
204 } else {
205 let simple_tag = Tag::Simple(s.to_string());
206 if !tags.contains(&simple_tag) {
207 tags.push(simple_tag);
208 }
209 }
210 }
211 }
212 }
213
214 let mut odcl_metadata = HashMap::new();
216 if let Some(ref ns) = namespace {
217 odcl_metadata.insert("namespace".to_string(), json!(ns));
218 }
219 if let Some(doc) = schema_obj.get("doc").and_then(|v| v.as_str()) {
220 odcl_metadata.insert("description".to_string(), json!(doc));
221 }
222
223 let table = Table {
224 id: crate::models::table::Table::generate_id(&name, None, None, namespace.as_deref()),
225 name: name.clone(),
226 columns,
227 database_type: None,
228 catalog_name: None,
229 schema_name: namespace.clone(),
230 medallion_layers: Vec::new(),
231 scd_pattern: None,
232 data_vault_classification: None,
233 modeling_level: None,
234 tags,
235 odcl_metadata,
236 owner: None,
237 sla: None,
238 contact_details: None,
239 infrastructure_type: None,
240 notes: None,
241 position: None,
242 yaml_file_path: None,
243 drawio_cell_id: None,
244 quality: Vec::new(),
245 errors: Vec::new(),
246 created_at: chrono::Utc::now(),
247 updated_at: chrono::Utc::now(),
248 };
249
250 info!(
251 "Parsed AVRO schema: {} with {} columns",
252 name,
253 table.columns.len()
254 );
255 Ok(table)
256 }
257
258 fn parse_field(
260 &self,
261 field: &Value,
262 _parent_name: &str,
263 errors: &mut Vec<ParserError>,
264 ) -> Result<Vec<Column>> {
265 let field_obj = field
266 .as_object()
267 .ok_or_else(|| anyhow::anyhow!("Field must be an object"))?;
268
269 let field_name = field_obj
270 .get("name")
271 .and_then(|v| v.as_str())
272 .ok_or_else(|| anyhow::anyhow!("Field missing name"))?
273 .to_string();
274
275 if let Err(e) = validate_column_name(&field_name) {
277 warn!("Column name validation warning for '{}': {}", field_name, e);
278 }
279
280 let field_type = field_obj
281 .get("type")
282 .ok_or_else(|| anyhow::anyhow!("Field missing type"))?;
283
284 let description = field_obj
285 .get("doc")
286 .and_then(|v| v.as_str())
287 .map(|s| s.to_string())
288 .unwrap_or_default();
289
290 let (avro_type, nullable) = if let Some(types) = field_type.as_array() {
292 if types.len() == 2 && types.iter().any(|t| t.as_str() == Some("null")) {
293 let non_null_type = types
295 .iter()
296 .find(|t| t.as_str() != Some("null"))
297 .ok_or_else(|| anyhow::anyhow!("Invalid union type"))?;
298 (non_null_type, true)
299 } else {
300 let first_non_null = types
303 .iter()
304 .find(|t| t.as_str() != Some("null"))
305 .unwrap_or(field_type);
306 (first_non_null, true)
307 }
308 } else {
309 (field_type, false)
310 };
311
312 let mut columns = Vec::new();
314 if let Some(type_str) = avro_type.as_str() {
315 let data_type = self.map_avro_type_to_sql(type_str);
317 columns.push(Column {
318 name: field_name,
319 data_type,
320 nullable,
321 description,
322 ..Default::default()
323 });
324 } else if let Some(type_obj) = avro_type.as_object() {
325 if type_obj.get("type").and_then(|v| v.as_str()) == Some("record") {
327 let nested_name = type_obj
329 .get("name")
330 .and_then(|v| v.as_str())
331 .unwrap_or(&field_name);
332 let nested_fields = type_obj
333 .get("fields")
334 .and_then(|v| v.as_array())
335 .ok_or_else(|| anyhow::anyhow!("Nested record missing fields"))?;
336
337 for nested_field in nested_fields {
338 match self.parse_field(nested_field, nested_name, errors) {
339 Ok(mut nested_cols) => {
340 for col in nested_cols.iter_mut() {
342 col.name = format!("{}.{}", field_name, col.name);
343 }
344 columns.append(&mut nested_cols);
345 }
346 Err(e) => {
347 errors.push(ParserError {
348 error_type: "parse_error".to_string(),
349 field: Some(format!("{}.{}", field_name, nested_name)),
350 message: format!("Failed to parse nested field: {}", e),
351 });
352 }
353 }
354 }
355 } else if type_obj.get("type").and_then(|v| v.as_str()) == Some("array") {
356 let items = type_obj
358 .get("items")
359 .ok_or_else(|| anyhow::anyhow!("Array type missing items"))?;
360
361 let data_type = if let Some(items_str) = items.as_str() {
362 format!("ARRAY<{}>", self.map_avro_type_to_sql(items_str))
363 } else if let Some(items_obj) = items.as_object() {
364 if items_obj.get("type").and_then(|v| v.as_str()) == Some("record") {
365 let nested_name = items_obj
367 .get("name")
368 .and_then(|v| v.as_str())
369 .unwrap_or(&field_name);
370 let nested_fields = items_obj
371 .get("fields")
372 .and_then(|v| v.as_array())
373 .ok_or_else(|| anyhow::anyhow!("Array record missing fields"))?;
374
375 for nested_field in nested_fields {
376 match self.parse_field(nested_field, nested_name, errors) {
377 Ok(mut nested_cols) => {
378 for col in nested_cols.iter_mut() {
379 col.name = format!("{}.{}", field_name, col.name);
380 }
381 columns.append(&mut nested_cols);
382 }
383 Err(e) => {
384 errors.push(ParserError {
385 error_type: "parse_error".to_string(),
386 field: Some(format!("{}.{}", field_name, nested_name)),
387 message: format!("Failed to parse array item field: {}", e),
388 });
389 }
390 }
391 }
392 return Ok(columns);
393 } else {
394 format!("ARRAY<{}>", "STRUCT")
395 }
396 } else {
397 "ARRAY<STRING>".to_string()
398 };
399
400 columns.push(Column {
401 name: field_name,
402 data_type,
403 nullable,
404 description,
405 ..Default::default()
406 });
407 } else {
408 columns.push(Column {
410 name: field_name,
411 data_type: "STRUCT".to_string(),
412 nullable,
413 description,
414 ..Default::default()
415 });
416 }
417 } else {
418 return Err(anyhow::anyhow!("Unsupported field type format"));
419 }
420
421 Ok(columns)
422 }
423
424 fn map_avro_type_to_sql(&self, avro_type: &str) -> String {
426 match avro_type {
427 "int" => "INTEGER".to_string(),
428 "long" => "BIGINT".to_string(),
429 "float" => "FLOAT".to_string(),
430 "double" => "DOUBLE".to_string(),
431 "boolean" => "BOOLEAN".to_string(),
432 "bytes" => "BYTES".to_string(),
433 "string" => "STRING".to_string(),
434 "null" => "NULL".to_string(),
435 _ => "STRING".to_string(), }
437 }
438}
439
/// A recoverable problem found while parsing an AVRO schema.
///
/// These are collected alongside the successfully parsed tables rather than
/// aborting the parse.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParserError {
    /// Category of the problem; this importer always uses `"parse_error"`.
    pub error_type: String,
    /// Location of the problem when known, e.g. `schema[0]`, `fields[2]` or
    /// `<field>.<record name>` for nested records.
    pub field: Option<String>,
    /// Human-readable description of the problem.
    pub message: String,
}