data_modelling_sdk/import/
avro.rs1use crate::import::odcs_shared::column_to_column_data;
10use crate::import::{ImportError, ImportResult, TableData};
11use crate::models::{Column, Table, Tag};
12use crate::validation::input::{validate_column_name, validate_table_name};
13use anyhow::{Context, Result};
14use serde_json::{Value, json};
15use std::collections::HashMap;
16use std::str::FromStr;
17use tracing::{info, warn};
18
/// Importer that converts AVRO schema JSON into the SDK's table model.
///
/// The type carries no state; every call to an import method works only on
/// the content passed to it.
#[derive(Debug, Clone, Default)]
pub struct AvroImporter;
22
23impl AvroImporter {
24 pub fn new() -> Self {
34 Self
35 }
36
37 pub fn import(&self, avro_content: &str) -> Result<ImportResult, ImportError> {
65 match self.parse(avro_content) {
66 Ok((tables, errors)) => {
67 let mut sdk_tables = Vec::new();
68 for (idx, table) in tables.iter().enumerate() {
69 sdk_tables.push(TableData {
70 table_index: idx,
71 name: Some(table.name.clone()),
72 columns: table.columns.iter().map(column_to_column_data).collect(),
73 });
74 }
75 let sdk_errors: Vec<ImportError> = errors
76 .iter()
77 .map(|e| ImportError::ParseError(e.message.clone()))
78 .collect();
79 Ok(ImportResult {
80 tables: sdk_tables,
81 tables_requiring_name: Vec::new(),
82 errors: sdk_errors,
83 ai_suggestions: None,
84 })
85 }
86 Err(e) => Err(ImportError::ParseError(e.to_string())),
87 }
88 }
89
90 fn parse(&self, avro_content: &str) -> Result<(Vec<Table>, Vec<ParserError>)> {
96 let mut errors = Vec::new();
97
98 let schema: Value =
100 serde_json::from_str(avro_content).context("Failed to parse AVRO schema as JSON")?;
101
102 let mut tables = Vec::new();
103
104 if let Some(schemas) = schema.as_array() {
106 for (idx, schema_item) in schemas.iter().enumerate() {
108 match self.parse_schema(schema_item, &mut errors) {
109 Ok(table) => tables.push(table),
110 Err(e) => {
111 errors.push(ParserError {
112 error_type: "parse_error".to_string(),
113 field: Some(format!("schema[{}]", idx)),
114 message: format!("Failed to parse schema: {}", e),
115 });
116 }
117 }
118 }
119 } else {
120 match self.parse_schema(&schema, &mut errors) {
122 Ok(table) => tables.push(table),
123 Err(e) => {
124 errors.push(ParserError {
125 error_type: "parse_error".to_string(),
126 field: None,
127 message: format!("Failed to parse schema: {}", e),
128 });
129 }
130 }
131 }
132
133 Ok((tables, errors))
134 }
135
136 fn parse_schema(&self, schema: &Value, errors: &mut Vec<ParserError>) -> Result<Table> {
138 let schema_obj = schema
139 .as_object()
140 .ok_or_else(|| anyhow::anyhow!("Schema must be an object"))?;
141
142 let name = schema_obj
144 .get("name")
145 .and_then(|v| v.as_str())
146 .ok_or_else(|| anyhow::anyhow!("Missing required field: name"))?
147 .to_string();
148
149 if let Err(e) = validate_table_name(&name) {
151 warn!("Table name validation warning for '{}': {}", name, e);
152 }
153
154 let namespace = schema_obj
156 .get("namespace")
157 .and_then(|v| v.as_str())
158 .map(|s| s.to_string());
159
160 let fields = schema_obj
162 .get("fields")
163 .and_then(|v| v.as_array())
164 .ok_or_else(|| anyhow::anyhow!("Missing required field: fields"))?;
165
166 let mut columns = Vec::new();
167 for (idx, field) in fields.iter().enumerate() {
168 match self.parse_field(field, &name, errors) {
169 Ok(mut cols) => columns.append(&mut cols),
170 Err(e) => {
171 errors.push(ParserError {
172 error_type: "parse_error".to_string(),
173 field: Some(format!("fields[{}]", idx)),
174 message: format!("Failed to parse field: {}", e),
175 });
176 }
177 }
178 }
179
180 let mut tags: Vec<Tag> = Vec::new();
182 if let Some(tags_arr) = schema_obj.get("tags").and_then(|v| v.as_array()) {
183 for item in tags_arr {
184 if let Some(s) = item.as_str() {
185 if let Ok(tag) = Tag::from_str(s) {
186 tags.push(tag);
187 } else {
188 tags.push(Tag::Simple(s.to_string()));
189 }
190 }
191 }
192 }
193 if let Some(aliases_arr) = schema_obj.get("aliases").and_then(|v| v.as_array()) {
195 for item in aliases_arr {
196 if let Some(s) = item.as_str() {
197 if let Ok(tag) = Tag::from_str(s) {
199 if !tags.contains(&tag) {
200 tags.push(tag);
201 }
202 } else {
203 let simple_tag = Tag::Simple(s.to_string());
204 if !tags.contains(&simple_tag) {
205 tags.push(simple_tag);
206 }
207 }
208 }
209 }
210 }
211
212 let mut odcl_metadata = HashMap::new();
214 if let Some(ref ns) = namespace {
215 odcl_metadata.insert("namespace".to_string(), json!(ns));
216 }
217 if let Some(doc) = schema_obj.get("doc").and_then(|v| v.as_str()) {
218 odcl_metadata.insert("description".to_string(), json!(doc));
219 }
220
221 let table = Table {
222 id: crate::models::table::Table::generate_id(&name, None, None, namespace.as_deref()),
223 name: name.clone(),
224 columns,
225 database_type: None,
226 catalog_name: None,
227 schema_name: namespace.clone(),
228 medallion_layers: Vec::new(),
229 scd_pattern: None,
230 data_vault_classification: None,
231 modeling_level: None,
232 tags,
233 odcl_metadata,
234 owner: None,
235 sla: None,
236 contact_details: None,
237 infrastructure_type: None,
238 notes: None,
239 position: None,
240 yaml_file_path: None,
241 drawio_cell_id: None,
242 quality: Vec::new(),
243 errors: Vec::new(),
244 created_at: chrono::Utc::now(),
245 updated_at: chrono::Utc::now(),
246 };
247
248 info!(
249 "Parsed AVRO schema: {} with {} columns",
250 name,
251 table.columns.len()
252 );
253 Ok(table)
254 }
255
256 fn parse_field(
258 &self,
259 field: &Value,
260 _parent_name: &str,
261 errors: &mut Vec<ParserError>,
262 ) -> Result<Vec<Column>> {
263 let field_obj = field
264 .as_object()
265 .ok_or_else(|| anyhow::anyhow!("Field must be an object"))?;
266
267 let field_name = field_obj
268 .get("name")
269 .and_then(|v| v.as_str())
270 .ok_or_else(|| anyhow::anyhow!("Field missing name"))?
271 .to_string();
272
273 if let Err(e) = validate_column_name(&field_name) {
275 warn!("Column name validation warning for '{}': {}", field_name, e);
276 }
277
278 let field_type = field_obj
279 .get("type")
280 .ok_or_else(|| anyhow::anyhow!("Field missing type"))?;
281
282 let description = field_obj
283 .get("doc")
284 .and_then(|v| v.as_str())
285 .map(|s| s.to_string())
286 .unwrap_or_default();
287
288 let (avro_type, nullable) = if let Some(types) = field_type.as_array() {
290 if types.len() == 2 && types.iter().any(|t| t.as_str() == Some("null")) {
291 let non_null_type = types
293 .iter()
294 .find(|t| t.as_str() != Some("null"))
295 .ok_or_else(|| anyhow::anyhow!("Invalid union type"))?;
296 (non_null_type, true)
297 } else {
298 let first_non_null = types
301 .iter()
302 .find(|t| t.as_str() != Some("null"))
303 .unwrap_or(field_type);
304 (first_non_null, true)
305 }
306 } else {
307 (field_type, false)
308 };
309
310 let mut columns = Vec::new();
312 if let Some(type_str) = avro_type.as_str() {
313 let data_type = self.map_avro_type_to_sql(type_str);
315 columns.push(Column {
316 name: field_name,
317 data_type,
318 nullable,
319 description,
320 ..Default::default()
321 });
322 } else if let Some(type_obj) = avro_type.as_object() {
323 if type_obj.get("type").and_then(|v| v.as_str()) == Some("record") {
325 let nested_name = type_obj
327 .get("name")
328 .and_then(|v| v.as_str())
329 .unwrap_or(&field_name);
330 let nested_fields = type_obj
331 .get("fields")
332 .and_then(|v| v.as_array())
333 .ok_or_else(|| anyhow::anyhow!("Nested record missing fields"))?;
334
335 for nested_field in nested_fields {
336 match self.parse_field(nested_field, nested_name, errors) {
337 Ok(mut nested_cols) => {
338 for col in nested_cols.iter_mut() {
340 col.name = format!("{}.{}", field_name, col.name);
341 }
342 columns.append(&mut nested_cols);
343 }
344 Err(e) => {
345 errors.push(ParserError {
346 error_type: "parse_error".to_string(),
347 field: Some(format!("{}.{}", field_name, nested_name)),
348 message: format!("Failed to parse nested field: {}", e),
349 });
350 }
351 }
352 }
353 } else if type_obj.get("type").and_then(|v| v.as_str()) == Some("array") {
354 let items = type_obj
356 .get("items")
357 .ok_or_else(|| anyhow::anyhow!("Array type missing items"))?;
358
359 let data_type = if let Some(items_str) = items.as_str() {
360 format!("ARRAY<{}>", self.map_avro_type_to_sql(items_str))
361 } else if let Some(items_obj) = items.as_object() {
362 if items_obj.get("type").and_then(|v| v.as_str()) == Some("record") {
363 let nested_name = items_obj
365 .get("name")
366 .and_then(|v| v.as_str())
367 .unwrap_or(&field_name);
368 let nested_fields = items_obj
369 .get("fields")
370 .and_then(|v| v.as_array())
371 .ok_or_else(|| anyhow::anyhow!("Array record missing fields"))?;
372
373 for nested_field in nested_fields {
374 match self.parse_field(nested_field, nested_name, errors) {
375 Ok(mut nested_cols) => {
376 for col in nested_cols.iter_mut() {
377 col.name = format!("{}.{}", field_name, col.name);
378 }
379 columns.append(&mut nested_cols);
380 }
381 Err(e) => {
382 errors.push(ParserError {
383 error_type: "parse_error".to_string(),
384 field: Some(format!("{}.{}", field_name, nested_name)),
385 message: format!("Failed to parse array item field: {}", e),
386 });
387 }
388 }
389 }
390 return Ok(columns);
391 } else {
392 format!("ARRAY<{}>", "STRUCT")
393 }
394 } else {
395 "ARRAY<STRING>".to_string()
396 };
397
398 columns.push(Column {
399 name: field_name,
400 data_type,
401 nullable,
402 description,
403 ..Default::default()
404 });
405 } else {
406 columns.push(Column {
408 name: field_name,
409 data_type: "STRUCT".to_string(),
410 nullable,
411 description,
412 ..Default::default()
413 });
414 }
415 } else {
416 return Err(anyhow::anyhow!("Unsupported field type format"));
417 }
418
419 Ok(columns)
420 }
421
422 fn map_avro_type_to_sql(&self, avro_type: &str) -> String {
424 match avro_type {
425 "int" => "INTEGER".to_string(),
426 "long" => "BIGINT".to_string(),
427 "float" => "FLOAT".to_string(),
428 "double" => "DOUBLE".to_string(),
429 "boolean" => "BOOLEAN".to_string(),
430 "bytes" => "BYTES".to_string(),
431 "string" => "STRING".to_string(),
432 "null" => "NULL".to_string(),
433 _ => "STRING".to_string(), }
435 }
436}
437
/// A non-fatal problem recorded while parsing an AVRO schema.
///
/// Derives `PartialEq`/`Eq` so diagnostics can be compared by value.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParserError {
    /// Machine-readable category of the problem (for example `parse_error`).
    pub error_type: String,
    /// Location of the problem within the input (for example `fields[2]`),
    /// or `None` when no location applies.
    pub field: Option<String>,
    /// Human-readable description of the problem.
    pub message: String,
}