data_modelling_sdk/import/
protobuf.rs1use crate::import::odcs_shared::column_to_column_data;
23use crate::import::{ImportError, ImportResult, TableData};
24use crate::models::{Column, Table, Tag};
25use crate::validation::input::{validate_column_name, validate_data_type, validate_table_name};
26use anyhow::Result;
27use std::collections::HashMap;
28use tracing::{info, warn};
29
/// Imports Protocol Buffers (`.proto`) schema text as data-model tables.
///
/// The importer holds no state; each `message` declaration becomes one table.
/// It derives the common traits (`Debug`, `Clone`, `Copy`, `Default`) so it can
/// be logged, stored in other `Debug` types and constructed via `Default`.
#[derive(Debug, Clone, Copy, Default)]
pub struct ProtobufImporter;

39impl ProtobufImporter {
40 pub fn new() -> Self {
50 Self
51 }
52
53 pub fn import(&self, proto_content: &str) -> Result<ImportResult, ImportError> {
79 match self.parse(proto_content) {
80 Ok((tables, errors)) => {
81 let mut sdk_tables = Vec::new();
82 for (idx, table) in tables.iter().enumerate() {
83 sdk_tables.push(TableData {
84 table_index: idx,
85 name: Some(table.name.clone()),
86 columns: table.columns.iter().map(column_to_column_data).collect(),
87 });
88 }
89 let sdk_errors: Vec<ImportError> = errors
90 .iter()
91 .map(|e| ImportError::ParseError(e.message.clone()))
92 .collect();
93 Ok(ImportResult {
94 tables: sdk_tables,
95 tables_requiring_name: Vec::new(),
96 errors: sdk_errors,
97 ai_suggestions: None,
98 })
99 }
100 Err(e) => Err(ImportError::ParseError(e.to_string())),
101 }
102 }
103
104 fn parse(&self, proto_content: &str) -> Result<(Vec<Table>, Vec<ParserError>)> {
117 let mut errors = Vec::new();
118 let mut tables = Vec::new();
119
120 let lines: Vec<&str> = proto_content.lines().collect();
122 let mut current_message: Option<Message> = None;
123 let mut messages = Vec::new();
124
125 for (_line_num, line) in lines.iter().enumerate() {
126 let trimmed = line.trim();
127
128 if trimmed.is_empty() || trimmed.starts_with("//") || trimmed.starts_with("/*") {
130 continue;
131 }
132
133 if trimmed.starts_with("message ") {
135 if let Some(msg) = current_message.take() {
137 messages.push(msg);
138 }
139
140 let msg_name = trimmed
142 .strip_prefix("message ")
143 .and_then(|s| {
144 let s = s.trim_end();
146 if let Some(stripped) = s.strip_suffix("{") {
147 Some(stripped)
148 } else if let Some(stripped) = s.strip_suffix(" {") {
149 Some(stripped)
150 } else {
151 s.split_whitespace().next()
152 }
153 })
154 .map(|s| s.trim())
155 .filter(|s| !s.is_empty())
156 .ok_or_else(|| anyhow::anyhow!("Invalid message syntax: {}", trimmed))?;
157
158 if let Err(e) = validate_table_name(msg_name) {
160 warn!("Message name validation warning for '{}': {}", msg_name, e);
161 }
162
163 current_message = Some(Message {
164 name: msg_name.to_string(),
165 fields: Vec::new(),
166 });
167 } else if trimmed == "}" || trimmed == "};" {
168 if let Some(msg) = current_message.take() {
170 messages.push(msg);
171 }
172 } else if trimmed.starts_with("enum ") {
173 continue;
175 } else if let Some(ref mut msg) = current_message {
176 if let Ok(field) = self.parse_field(trimmed, _line_num) {
178 msg.fields.push(field);
179 } else {
180 if !trimmed.is_empty() && !trimmed.starts_with("//") {
182 errors.push(ParserError {
183 error_type: "parse_error".to_string(),
184 field: Some(format!("line {}", _line_num + 1)),
185 message: format!("Failed to parse field: {}", trimmed),
186 });
187 }
188 }
189 }
190 }
191
192 if let Some(msg) = current_message {
194 messages.push(msg);
195 }
196
197 for message in &messages {
199 match self.message_to_table(message, &messages, &mut errors) {
200 Ok(table) => tables.push(table),
201 Err(e) => {
202 errors.push(ParserError {
203 error_type: "parse_error".to_string(),
204 field: Some(message.name.clone()),
205 message: format!("Failed to convert message to table: {}", e),
206 });
207 }
208 }
209 }
210
211 Ok((tables, errors))
212 }
213
214 fn parse_field(&self, line: &str, _line_num: usize) -> Result<ProtobufField> {
216 let line = line.split("//").next().unwrap_or(line).trim();
218
219 let parts: Vec<&str> = line.split_whitespace().collect();
221 if parts.len() < 3 {
222 return Err(anyhow::anyhow!("Invalid field syntax"));
223 }
224
225 let mut idx = 0;
226 let mut repeated = false;
227 let mut optional = false;
228
229 while idx < parts.len() {
231 match parts[idx] {
232 "repeated" => {
233 repeated = true;
234 idx += 1;
235 }
236 "optional" => {
237 optional = true;
238 idx += 1;
239 }
240 _ => break,
241 }
242 }
243
244 if idx >= parts.len() {
245 return Err(anyhow::anyhow!("Missing field type"));
246 }
247
248 let field_type = parts[idx].to_string();
249 idx += 1;
250
251 if idx >= parts.len() {
252 return Err(anyhow::anyhow!("Missing field name"));
253 }
254
255 let field_name = parts[idx]
256 .strip_suffix(";")
257 .unwrap_or(parts[idx])
258 .to_string();
259 idx += 1;
260
261 if let Err(e) = validate_column_name(&field_name) {
263 warn!("Field name validation warning for '{}': {}", field_name, e);
264 }
265 if let Err(e) = validate_data_type(&field_type) {
266 warn!("Field type validation warning for '{}': {}", field_type, e);
267 }
268
269 let _field_number = if idx < parts.len() {
271 parts[idx]
272 .strip_prefix("=")
273 .and_then(|s| s.strip_suffix(";"))
274 .and_then(|s| s.parse::<u32>().ok())
275 } else {
276 None
277 };
278
279 Ok(ProtobufField {
280 name: field_name,
281 field_type,
282 repeated,
283 nullable: optional || repeated, })
285 }
286
287 fn message_to_table(
289 &self,
290 message: &Message,
291 all_messages: &[Message],
292 _errors: &mut Vec<ParserError>,
293 ) -> Result<Table> {
294 let mut columns = Vec::new();
295
296 for field in &message.fields {
297 if let Some(nested_msg) = all_messages.iter().find(|m| m.name == field.field_type) {
299 for nested_field in &nested_msg.fields {
302 let nested_field_name = format!("{}.{}", field.name, nested_field.name);
303
304 if let Some(deep_nested_msg) = all_messages
306 .iter()
307 .find(|m| m.name == nested_field.field_type)
308 {
309 for deep_nested_field in &deep_nested_msg.fields {
311 let data_type = if deep_nested_field.repeated {
312 format!(
313 "ARRAY<{}>",
314 self.map_proto_type_to_sql(&deep_nested_field.field_type)
315 )
316 } else {
317 self.map_proto_type_to_sql(&deep_nested_field.field_type)
318 };
319
320 columns.push(Column {
321 name: format!("{}.{}", nested_field_name, deep_nested_field.name),
322 data_type,
323 nullable: nested_field.nullable || deep_nested_field.nullable,
324 ..Default::default()
325 });
326 }
327 } else {
328 let data_type = if nested_field.repeated {
330 format!(
331 "ARRAY<{}>",
332 self.map_proto_type_to_sql(&nested_field.field_type)
333 )
334 } else {
335 self.map_proto_type_to_sql(&nested_field.field_type)
336 };
337
338 columns.push(Column {
339 name: nested_field_name,
340 data_type,
341 nullable: nested_field.nullable,
342 ..Default::default()
343 });
344 }
345 }
346 } else {
347 let data_type = if field.repeated {
349 format!("ARRAY<{}>", self.map_proto_type_to_sql(&field.field_type))
350 } else {
351 self.map_proto_type_to_sql(&field.field_type)
352 };
353
354 columns.push(Column {
355 name: field.name.clone(),
356 data_type,
357 nullable: field.nullable,
358 ..Default::default()
359 });
360 }
361 }
362
363 let tags: Vec<Tag> = Vec::new(); let mut odcl_metadata = HashMap::new();
370 odcl_metadata.insert(
371 "syntax".to_string(),
372 serde_json::Value::String("proto3".to_string()),
373 );
374
375 let table = Table {
376 id: crate::models::table::Table::generate_id(&message.name, None, None, None),
377 name: message.name.clone(),
378 columns,
379 database_type: None,
380 catalog_name: None,
381 schema_name: None,
382 medallion_layers: Vec::new(),
383 scd_pattern: None,
384 data_vault_classification: None,
385 modeling_level: None,
386 tags,
387 odcl_metadata,
388 owner: None,
389 sla: None,
390 contact_details: None,
391 infrastructure_type: None,
392 notes: None,
393 position: None,
394 yaml_file_path: None,
395 drawio_cell_id: None,
396 quality: Vec::new(),
397 errors: Vec::new(),
398 created_at: chrono::Utc::now(),
399 updated_at: chrono::Utc::now(),
400 };
401
402 info!(
403 "Parsed Protobuf message: {} with {} columns",
404 message.name,
405 table.columns.len()
406 );
407 Ok(table)
408 }
409
410 fn map_proto_type_to_sql(&self, proto_type: &str) -> String {
412 match proto_type {
413 "int32" | "int" => "INTEGER".to_string(),
415 "int64" | "long" => "BIGINT".to_string(),
416 "uint32" => "INTEGER".to_string(), "uint64" => "BIGINT".to_string(),
418 "sint32" => "INTEGER".to_string(), "sint64" => "BIGINT".to_string(),
420 "fixed32" => "INTEGER".to_string(), "fixed64" => "BIGINT".to_string(), "sfixed32" => "INTEGER".to_string(), "sfixed64" => "BIGINT".to_string(), "float" => "FLOAT".to_string(),
425 "double" => "DOUBLE".to_string(),
426 "bool" | "boolean" => "BOOLEAN".to_string(),
427 "bytes" => "BYTES".to_string(),
428 "string" => "STRING".to_string(),
429 "google.protobuf.StringValue" => "STRING".to_string(),
431 "google.protobuf.BytesValue" => "BYTES".to_string(),
432 "google.protobuf.Int32Value" => "INTEGER".to_string(),
433 "google.protobuf.Int64Value" => "BIGINT".to_string(),
434 "google.protobuf.UInt32Value" => "INTEGER".to_string(),
435 "google.protobuf.UInt64Value" => "BIGINT".to_string(),
436 "google.protobuf.FloatValue" => "FLOAT".to_string(),
437 "google.protobuf.DoubleValue" => "DOUBLE".to_string(),
438 "google.protobuf.BoolValue" => "BOOLEAN".to_string(),
439 "google.protobuf.Timestamp" => "TIMESTAMP".to_string(),
441 "google.protobuf.Duration" => "STRING".to_string(),
442 "google.protobuf.Any" => "STRING".to_string(),
444 "google.protobuf.Struct" => "STRING".to_string(),
445 "google.protobuf.Value" => "STRING".to_string(),
446 "google.protobuf.ListValue" => "STRING".to_string(),
447 "google.protobuf.FieldMask" => "STRING".to_string(),
448 "google.protobuf.Empty" => "STRING".to_string(),
449 _ => "STRING".to_string(),
451 }
452 }
453}
454
455#[derive(Debug, Clone)]
457struct Message {
458 name: String,
459 fields: Vec<ProtobufField>,
460}
461
/// One field declaration parsed from a message body.
#[derive(Clone, Debug)]
struct ProtobufField {
    /// Field name; used as (part of) the column name.
    name: String,
    /// Protobuf type as written, e.g. `int32` or the name of another message.
    field_type: String,
    /// `true` when the field carries the `repeated` label.
    repeated: bool,
    /// Whether the column produced for this field may hold null.
    nullable: bool,
}

/// A recoverable problem found while parsing a `.proto` schema.
#[derive(Clone, Debug)]
pub struct ParserError {
    /// Error category; this importer emits `"parse_error"`.
    pub error_type: String,
    /// Location of the problem, such as `"line 7"` or a message name.
    pub field: Option<String>,
    /// Human-readable description of what went wrong.
    pub message: String,
}