data_modelling_core/import/
protobuf.rs1use crate::import::odcs_shared::column_to_column_data;
23use crate::import::{ImportError, ImportResult, TableData};
24use crate::models::{Column, Table, Tag};
25use crate::validation::input::{validate_column_name, validate_data_type, validate_table_name};
26use anyhow::Result;
27use std::collections::HashMap;
28use tracing::{info, warn};
29
30pub struct ProtobufImporter;
32
33impl Default for ProtobufImporter {
34 fn default() -> Self {
35 Self::new()
36 }
37}
38
39impl ProtobufImporter {
40 pub fn new() -> Self {
50 Self
51 }
52
53 pub fn import(&self, proto_content: &str) -> Result<ImportResult, ImportError> {
79 match self.parse(proto_content) {
80 Ok((tables, errors)) => {
81 let mut sdk_tables = Vec::new();
82 for (idx, table) in tables.iter().enumerate() {
83 sdk_tables.push(TableData {
84 table_index: idx,
85 id: Some(table.id.to_string()),
86 name: Some(table.name.clone()),
87 columns: table.columns.iter().map(column_to_column_data).collect(),
88 ..Default::default()
89 });
90 }
91 let sdk_errors: Vec<ImportError> = errors
92 .iter()
93 .map(|e| ImportError::ParseError(e.message.clone()))
94 .collect();
95 Ok(ImportResult {
96 tables: sdk_tables,
97 tables_requiring_name: Vec::new(),
98 errors: sdk_errors,
99 ai_suggestions: None,
100 })
101 }
102 Err(e) => Err(ImportError::ParseError(e.to_string())),
103 }
104 }
105
106 fn parse(&self, proto_content: &str) -> Result<(Vec<Table>, Vec<ParserError>)> {
119 let mut errors = Vec::new();
120 let mut tables = Vec::new();
121
122 let lines: Vec<&str> = proto_content.lines().collect();
124 let mut current_message: Option<Message> = None;
125 let mut messages = Vec::new();
126
127 for (_line_num, line) in lines.iter().enumerate() {
128 let trimmed = line.trim();
129
130 if trimmed.is_empty() || trimmed.starts_with("//") || trimmed.starts_with("/*") {
132 continue;
133 }
134
135 if trimmed.starts_with("message ") {
137 if let Some(msg) = current_message.take() {
139 messages.push(msg);
140 }
141
142 let msg_name = trimmed
144 .strip_prefix("message ")
145 .and_then(|s| {
146 let s = s.trim_end();
148 if let Some(stripped) = s.strip_suffix("{") {
149 Some(stripped)
150 } else if let Some(stripped) = s.strip_suffix(" {") {
151 Some(stripped)
152 } else {
153 s.split_whitespace().next()
154 }
155 })
156 .map(|s| s.trim())
157 .filter(|s| !s.is_empty())
158 .ok_or_else(|| anyhow::anyhow!("Invalid message syntax: {}", trimmed))?;
159
160 if let Err(e) = validate_table_name(msg_name) {
162 warn!("Message name validation warning for '{}': {}", msg_name, e);
163 }
164
165 current_message = Some(Message {
166 name: msg_name.to_string(),
167 fields: Vec::new(),
168 });
169 } else if trimmed == "}" || trimmed == "};" {
170 if let Some(msg) = current_message.take() {
172 messages.push(msg);
173 }
174 } else if trimmed.starts_with("enum ") {
175 continue;
177 } else if let Some(ref mut msg) = current_message {
178 if let Ok(field) = self.parse_field(trimmed, _line_num) {
180 msg.fields.push(field);
181 } else {
182 if !trimmed.is_empty() && !trimmed.starts_with("//") {
184 errors.push(ParserError {
185 error_type: "parse_error".to_string(),
186 field: Some(format!("line {}", _line_num + 1)),
187 message: format!("Failed to parse field: {}", trimmed),
188 });
189 }
190 }
191 }
192 }
193
194 if let Some(msg) = current_message {
196 messages.push(msg);
197 }
198
199 for message in &messages {
201 match self.message_to_table(message, &messages, &mut errors) {
202 Ok(table) => tables.push(table),
203 Err(e) => {
204 errors.push(ParserError {
205 error_type: "parse_error".to_string(),
206 field: Some(message.name.clone()),
207 message: format!("Failed to convert message to table: {}", e),
208 });
209 }
210 }
211 }
212
213 Ok((tables, errors))
214 }
215
216 fn parse_field(&self, line: &str, _line_num: usize) -> Result<ProtobufField> {
218 let line = line.split("//").next().unwrap_or(line).trim();
220
221 let parts: Vec<&str> = line.split_whitespace().collect();
223 if parts.len() < 3 {
224 return Err(anyhow::anyhow!("Invalid field syntax"));
225 }
226
227 let mut idx = 0;
228 let mut repeated = false;
229 let mut optional = false;
230
231 while idx < parts.len() {
233 match parts[idx] {
234 "repeated" => {
235 repeated = true;
236 idx += 1;
237 }
238 "optional" => {
239 optional = true;
240 idx += 1;
241 }
242 _ => break,
243 }
244 }
245
246 if idx >= parts.len() {
247 return Err(anyhow::anyhow!("Missing field type"));
248 }
249
250 let field_type = parts[idx].to_string();
251 idx += 1;
252
253 if idx >= parts.len() {
254 return Err(anyhow::anyhow!("Missing field name"));
255 }
256
257 let field_name = parts[idx]
258 .strip_suffix(";")
259 .unwrap_or(parts[idx])
260 .to_string();
261 idx += 1;
262
263 if let Err(e) = validate_column_name(&field_name) {
265 warn!("Field name validation warning for '{}': {}", field_name, e);
266 }
267 if let Err(e) = validate_data_type(&field_type) {
268 warn!("Field type validation warning for '{}': {}", field_type, e);
269 }
270
271 let _field_number = if idx < parts.len() {
273 parts[idx]
274 .strip_prefix("=")
275 .and_then(|s| s.strip_suffix(";"))
276 .and_then(|s| s.parse::<u32>().ok())
277 } else {
278 None
279 };
280
281 Ok(ProtobufField {
282 name: field_name,
283 field_type,
284 repeated,
285 nullable: optional || repeated, })
287 }
288
289 fn message_to_table(
291 &self,
292 message: &Message,
293 all_messages: &[Message],
294 _errors: &mut Vec<ParserError>,
295 ) -> Result<Table> {
296 let mut columns = Vec::new();
297
298 for field in &message.fields {
299 if let Some(nested_msg) = all_messages.iter().find(|m| m.name == field.field_type) {
301 for nested_field in &nested_msg.fields {
304 let nested_field_name = format!("{}.{}", field.name, nested_field.name);
305
306 if let Some(deep_nested_msg) = all_messages
308 .iter()
309 .find(|m| m.name == nested_field.field_type)
310 {
311 for deep_nested_field in &deep_nested_msg.fields {
313 let data_type = if deep_nested_field.repeated {
314 format!(
315 "ARRAY<{}>",
316 self.map_proto_type_to_sql(&deep_nested_field.field_type)
317 )
318 } else {
319 self.map_proto_type_to_sql(&deep_nested_field.field_type)
320 };
321
322 columns.push(Column {
323 name: format!("{}.{}", nested_field_name, deep_nested_field.name),
324 data_type,
325 nullable: nested_field.nullable || deep_nested_field.nullable,
326 ..Default::default()
327 });
328 }
329 } else {
330 let data_type = if nested_field.repeated {
332 format!(
333 "ARRAY<{}>",
334 self.map_proto_type_to_sql(&nested_field.field_type)
335 )
336 } else {
337 self.map_proto_type_to_sql(&nested_field.field_type)
338 };
339
340 columns.push(Column {
341 name: nested_field_name,
342 data_type,
343 nullable: nested_field.nullable,
344 ..Default::default()
345 });
346 }
347 }
348 } else {
349 let data_type = if field.repeated {
351 format!("ARRAY<{}>", self.map_proto_type_to_sql(&field.field_type))
352 } else {
353 self.map_proto_type_to_sql(&field.field_type)
354 };
355
356 columns.push(Column {
357 name: field.name.clone(),
358 data_type,
359 nullable: field.nullable,
360 ..Default::default()
361 });
362 }
363 }
364
365 let tags: Vec<Tag> = Vec::new(); let mut odcl_metadata = HashMap::new();
372 odcl_metadata.insert(
373 "syntax".to_string(),
374 serde_json::Value::String("proto3".to_string()),
375 );
376
377 let table = Table {
378 id: crate::models::table::Table::generate_id(&message.name, None, None, None),
379 name: message.name.clone(),
380 columns,
381 database_type: None,
382 catalog_name: None,
383 schema_name: None,
384 medallion_layers: Vec::new(),
385 scd_pattern: None,
386 data_vault_classification: None,
387 modeling_level: None,
388 tags,
389 odcl_metadata,
390 owner: None,
391 sla: None,
392 contact_details: None,
393 infrastructure_type: None,
394 notes: None,
395 position: None,
396 yaml_file_path: None,
397 drawio_cell_id: None,
398 quality: Vec::new(),
399 errors: Vec::new(),
400 created_at: chrono::Utc::now(),
401 updated_at: chrono::Utc::now(),
402 };
403
404 info!(
405 "Parsed Protobuf message: {} with {} columns",
406 message.name,
407 table.columns.len()
408 );
409 Ok(table)
410 }
411
412 fn map_proto_type_to_sql(&self, proto_type: &str) -> String {
414 match proto_type {
415 "int32" | "int" => "INTEGER".to_string(),
417 "int64" | "long" => "BIGINT".to_string(),
418 "uint32" => "INTEGER".to_string(), "uint64" => "BIGINT".to_string(),
420 "sint32" => "INTEGER".to_string(), "sint64" => "BIGINT".to_string(),
422 "fixed32" => "INTEGER".to_string(), "fixed64" => "BIGINT".to_string(), "sfixed32" => "INTEGER".to_string(), "sfixed64" => "BIGINT".to_string(), "float" => "FLOAT".to_string(),
427 "double" => "DOUBLE".to_string(),
428 "bool" | "boolean" => "BOOLEAN".to_string(),
429 "bytes" => "BYTES".to_string(),
430 "string" => "STRING".to_string(),
431 "google.protobuf.StringValue" => "STRING".to_string(),
433 "google.protobuf.BytesValue" => "BYTES".to_string(),
434 "google.protobuf.Int32Value" => "INTEGER".to_string(),
435 "google.protobuf.Int64Value" => "BIGINT".to_string(),
436 "google.protobuf.UInt32Value" => "INTEGER".to_string(),
437 "google.protobuf.UInt64Value" => "BIGINT".to_string(),
438 "google.protobuf.FloatValue" => "FLOAT".to_string(),
439 "google.protobuf.DoubleValue" => "DOUBLE".to_string(),
440 "google.protobuf.BoolValue" => "BOOLEAN".to_string(),
441 "google.protobuf.Timestamp" => "TIMESTAMP".to_string(),
443 "google.protobuf.Duration" => "STRING".to_string(),
444 "google.protobuf.Any" => "STRING".to_string(),
446 "google.protobuf.Struct" => "STRING".to_string(),
447 "google.protobuf.Value" => "STRING".to_string(),
448 "google.protobuf.ListValue" => "STRING".to_string(),
449 "google.protobuf.FieldMask" => "STRING".to_string(),
450 "google.protobuf.Empty" => "STRING".to_string(),
451 _ => "STRING".to_string(),
453 }
454 }
455}
456
457#[derive(Debug, Clone)]
459struct Message {
460 name: String,
461 fields: Vec<ProtobufField>,
462}
463
/// A single field declaration parsed from a `message` body.
#[derive(Clone, Debug)]
struct ProtobufField {
    /// Field name as declared.
    name: String,
    /// Field type as written (scalar keyword, message name, ...).
    field_type: String,
    /// Whether the field carries the `repeated` label.
    repeated: bool,
    /// Whether the resulting column may hold NULL.
    nullable: bool,
}
472
/// A non-fatal problem encountered while parsing a `.proto` file.
#[derive(Clone, Debug)]
pub struct ParserError {
    /// Category of the problem (this parser reports `"parse_error"`).
    pub error_type: String,
    /// Location of the problem, such as a `"line N"` label or a message name.
    pub field: Option<String>,
    /// Human-readable description of the problem.
    pub message: String,
}