1use crate::import::{ImportError, ImportResult, TableData};
23use crate::models::{Column, Table};
24use crate::validation::input::{validate_column_name, validate_data_type, validate_table_name};
25use anyhow::Result;
26use std::collections::HashMap;
27use tracing::{info, warn};
28
29pub struct ProtobufImporter;
31
32impl Default for ProtobufImporter {
33 fn default() -> Self {
34 Self::new()
35 }
36}
37
38impl ProtobufImporter {
39 pub fn new() -> Self {
49 Self
50 }
51
52 pub fn import(&self, proto_content: &str) -> Result<ImportResult, ImportError> {
78 match self.parse(proto_content) {
79 Ok((tables, errors)) => {
80 let mut sdk_tables = Vec::new();
81 for (idx, table) in tables.iter().enumerate() {
82 sdk_tables.push(TableData {
83 table_index: idx,
84 name: Some(table.name.clone()),
85 columns: table
86 .columns
87 .iter()
88 .map(|c| super::ColumnData {
89 name: c.name.clone(),
90 data_type: c.data_type.clone(),
91 nullable: c.nullable,
92 primary_key: c.primary_key,
93 })
94 .collect(),
95 });
96 }
97 let sdk_errors: Vec<ImportError> = errors
98 .iter()
99 .map(|e| ImportError::ParseError(e.message.clone()))
100 .collect();
101 Ok(ImportResult {
102 tables: sdk_tables,
103 tables_requiring_name: Vec::new(),
104 errors: sdk_errors,
105 ai_suggestions: None,
106 })
107 }
108 Err(e) => Err(ImportError::ParseError(e.to_string())),
109 }
110 }
111
112 fn parse(&self, proto_content: &str) -> Result<(Vec<Table>, Vec<ParserError>)> {
125 let mut errors = Vec::new();
126 let mut tables = Vec::new();
127
128 let lines: Vec<&str> = proto_content.lines().collect();
130 let mut current_message: Option<Message> = None;
131 let mut messages = Vec::new();
132
133 for (_line_num, line) in lines.iter().enumerate() {
134 let trimmed = line.trim();
135
136 if trimmed.is_empty() || trimmed.starts_with("//") || trimmed.starts_with("/*") {
138 continue;
139 }
140
141 if trimmed.starts_with("message ") {
143 if let Some(msg) = current_message.take() {
145 messages.push(msg);
146 }
147
148 let msg_name = trimmed
150 .strip_prefix("message ")
151 .and_then(|s| {
152 let s = s.trim_end();
154 if let Some(stripped) = s.strip_suffix("{") {
155 Some(stripped)
156 } else if let Some(stripped) = s.strip_suffix(" {") {
157 Some(stripped)
158 } else {
159 s.split_whitespace().next()
160 }
161 })
162 .map(|s| s.trim())
163 .filter(|s| !s.is_empty())
164 .ok_or_else(|| anyhow::anyhow!("Invalid message syntax: {}", trimmed))?;
165
166 if let Err(e) = validate_table_name(msg_name) {
168 warn!("Message name validation warning for '{}': {}", msg_name, e);
169 }
170
171 current_message = Some(Message {
172 name: msg_name.to_string(),
173 fields: Vec::new(),
174 nested_messages: Vec::new(),
175 });
176 } else if trimmed == "}" || trimmed == "};" {
177 if let Some(msg) = current_message.take() {
179 messages.push(msg);
180 }
181 } else if trimmed.starts_with("enum ") {
182 continue;
184 } else if let Some(ref mut msg) = current_message {
185 if let Ok(field) = self.parse_field(trimmed, _line_num) {
187 msg.fields.push(field);
188 } else {
189 if !trimmed.is_empty() && !trimmed.starts_with("//") {
191 errors.push(ParserError {
192 error_type: "parse_error".to_string(),
193 field: Some(format!("line {}", _line_num + 1)),
194 message: format!("Failed to parse field: {}", trimmed),
195 });
196 }
197 }
198 }
199 }
200
201 if let Some(msg) = current_message {
203 messages.push(msg);
204 }
205
206 for message in &messages {
208 match self.message_to_table(message, &messages, &mut errors) {
209 Ok(table) => tables.push(table),
210 Err(e) => {
211 errors.push(ParserError {
212 error_type: "parse_error".to_string(),
213 field: Some(message.name.clone()),
214 message: format!("Failed to convert message to table: {}", e),
215 });
216 }
217 }
218 }
219
220 Ok((tables, errors))
221 }
222
223 fn parse_field(&self, line: &str, _line_num: usize) -> Result<ProtobufField> {
225 let line = line.split("//").next().unwrap_or(line).trim();
227
228 let parts: Vec<&str> = line.split_whitespace().collect();
230 if parts.len() < 3 {
231 return Err(anyhow::anyhow!("Invalid field syntax"));
232 }
233
234 let mut idx = 0;
235 let mut repeated = false;
236 let mut optional = false;
237
238 while idx < parts.len() {
240 match parts[idx] {
241 "repeated" => {
242 repeated = true;
243 idx += 1;
244 }
245 "optional" => {
246 optional = true;
247 idx += 1;
248 }
249 _ => break,
250 }
251 }
252
253 if idx >= parts.len() {
254 return Err(anyhow::anyhow!("Missing field type"));
255 }
256
257 let field_type = parts[idx].to_string();
258 idx += 1;
259
260 if idx >= parts.len() {
261 return Err(anyhow::anyhow!("Missing field name"));
262 }
263
264 let field_name = parts[idx]
265 .strip_suffix(";")
266 .unwrap_or(parts[idx])
267 .to_string();
268 idx += 1;
269
270 if let Err(e) = validate_column_name(&field_name) {
272 warn!("Field name validation warning for '{}': {}", field_name, e);
273 }
274 if let Err(e) = validate_data_type(&field_type) {
275 warn!("Field type validation warning for '{}': {}", field_type, e);
276 }
277
278 let _field_number = if idx < parts.len() {
280 parts[idx]
281 .strip_prefix("=")
282 .and_then(|s| s.strip_suffix(";"))
283 .and_then(|s| s.parse::<u32>().ok())
284 } else {
285 None
286 };
287
288 Ok(ProtobufField {
289 name: field_name,
290 field_type,
291 repeated,
292 nullable: optional || repeated, })
294 }
295
296 fn message_to_table(
298 &self,
299 message: &Message,
300 all_messages: &[Message],
301 _errors: &mut Vec<ParserError>,
302 ) -> Result<Table> {
303 let mut columns = Vec::new();
304
305 for field in &message.fields {
306 if let Some(nested_msg) = all_messages.iter().find(|m| m.name == field.field_type) {
308 for nested_field in &nested_msg.fields {
311 let nested_field_name = format!("{}.{}", field.name, nested_field.name);
312
313 if let Some(deep_nested_msg) = all_messages
315 .iter()
316 .find(|m| m.name == nested_field.field_type)
317 {
318 for deep_nested_field in &deep_nested_msg.fields {
320 let data_type = if deep_nested_field.repeated {
321 format!(
322 "ARRAY<{}>",
323 self.map_proto_type_to_sql(&deep_nested_field.field_type)
324 )
325 } else {
326 self.map_proto_type_to_sql(&deep_nested_field.field_type)
327 };
328
329 columns.push(Column {
330 name: format!("{}.{}", nested_field_name, deep_nested_field.name),
331 data_type,
332 nullable: nested_field.nullable || deep_nested_field.nullable,
333 primary_key: false,
334 secondary_key: false,
335 composite_key: None,
336 foreign_key: None,
337 constraints: Vec::new(),
338 description: String::new(),
339 quality: Vec::new(),
340 enum_values: Vec::new(),
341 errors: Vec::new(),
342 column_order: 0,
343 });
344 }
345 } else {
346 let data_type = if nested_field.repeated {
348 format!(
349 "ARRAY<{}>",
350 self.map_proto_type_to_sql(&nested_field.field_type)
351 )
352 } else {
353 self.map_proto_type_to_sql(&nested_field.field_type)
354 };
355
356 columns.push(Column {
357 name: nested_field_name,
358 data_type,
359 nullable: nested_field.nullable,
360 primary_key: false,
361 secondary_key: false,
362 composite_key: None,
363 foreign_key: None,
364 constraints: Vec::new(),
365 description: String::new(),
366 quality: Vec::new(),
367 enum_values: Vec::new(),
368 errors: Vec::new(),
369 column_order: 0,
370 });
371 }
372 }
373 } else {
374 let data_type = if field.repeated {
376 format!("ARRAY<{}>", self.map_proto_type_to_sql(&field.field_type))
377 } else {
378 self.map_proto_type_to_sql(&field.field_type)
379 };
380
381 columns.push(Column {
382 name: field.name.clone(),
383 data_type,
384 nullable: field.nullable,
385 primary_key: false,
386 secondary_key: false,
387 composite_key: None,
388 foreign_key: None,
389 constraints: Vec::new(),
390 description: String::new(),
391 quality: Vec::new(),
392 enum_values: Vec::new(),
393 errors: Vec::new(),
394 column_order: 0,
395 });
396 }
397 }
398
399 let mut odcl_metadata = HashMap::new();
400 odcl_metadata.insert(
401 "syntax".to_string(),
402 serde_json::Value::String("proto3".to_string()),
403 );
404
405 let table = Table {
406 id: crate::models::table::Table::generate_id(&message.name, None, None, None),
407 name: message.name.clone(),
408 columns,
409 database_type: None,
410 catalog_name: None,
411 schema_name: None,
412 medallion_layers: Vec::new(),
413 scd_pattern: None,
414 data_vault_classification: None,
415 modeling_level: None,
416 tags: Vec::new(),
417 odcl_metadata,
418 owner: None,
419 sla: None,
420 contact_details: None,
421 infrastructure_type: None,
422 notes: None,
423 position: None,
424 yaml_file_path: None,
425 drawio_cell_id: None,
426 quality: Vec::new(),
427 errors: Vec::new(),
428 created_at: chrono::Utc::now(),
429 updated_at: chrono::Utc::now(),
430 };
431
432 info!(
433 "Parsed Protobuf message: {} with {} columns",
434 message.name,
435 table.columns.len()
436 );
437 Ok(table)
438 }
439
440 fn map_proto_type_to_sql(&self, proto_type: &str) -> String {
442 match proto_type {
443 "int32" | "int" => "INTEGER".to_string(),
444 "int64" | "long" => "BIGINT".to_string(),
445 "uint32" => "INTEGER".to_string(), "uint64" => "BIGINT".to_string(),
447 "sint32" => "INTEGER".to_string(), "sint64" => "BIGINT".to_string(),
449 "fixed32" => "INTEGER".to_string(), "fixed64" => "BIGINT".to_string(), "sfixed32" => "INTEGER".to_string(), "sfixed64" => "BIGINT".to_string(), "float" => "FLOAT".to_string(),
454 "double" => "DOUBLE".to_string(),
455 "bool" | "boolean" => "BOOLEAN".to_string(),
456 "bytes" => "BYTES".to_string(),
457 "string" => "STRING".to_string(),
458 _ => "STRING".to_string(), }
460 }
461}
462
/// One parsed Protobuf `message` definition; each one is converted into a table.
#[derive(Debug, Clone)]
struct Message {
    /// Message name as written after the `message` keyword.
    name: String,
    /// Fields parsed from the message body, in source order.
    fields: Vec<ProtobufField>,
    // Always constructed empty by the parser in this file and never read, hence the
    // `dead_code` allowance. Kept as a placeholder for recording nested messages.
    #[allow(dead_code)]
    nested_messages: Vec<Message>,
}
471
/// One field declaration parsed from a message body.
#[derive(Debug, Clone)]
struct ProtobufField {
    /// Field name, with any trailing `;` removed.
    name: String,
    /// Declared type token, e.g. `int32`, `string`, or the name of another message.
    field_type: String,
    /// Whether the field carries the `repeated` label (mapped to an `ARRAY<...>` column).
    repeated: bool,
    /// Whether the resulting column may be null; derived by the parser from the field's
    /// label (e.g. `optional` or `repeated`).
    nullable: bool,
}
480
/// A non-fatal problem found while parsing Protobuf text.
#[derive(Debug, Clone)]
pub struct ParserError {
    /// Error category; the Protobuf parser in this file always uses `"parse_error"`.
    pub error_type: String,
    /// Where the problem occurred: a `"line N"` location (1-based) or a message name.
    pub field: Option<String>,
    /// Human-readable description of the problem.
    pub message: String,
}