1use std::sync::Arc;
7
8use crate::helpers::{
9 decode_bool, decode_double, decode_fixed32, decode_fixed64, decode_float, decode_int32,
10 decode_int64, decode_sfixed32, decode_sfixed64, decode_sint32, decode_sint64, decode_uint32,
11 decode_uint64, parse_varint, parse_wiretag, WiretagResult, WT_END_GROUP, WT_I32, WT_I64,
12 WT_LEN, WT_START_GROUP, WT_VARINT,
13};
14use crate::schema::{proto_label, proto_type as pt, FieldInfo, MessageSchema, ParsedSchema};
15
16#[derive(Debug, Default, Clone)]
21pub struct ProtoTextMessage {
22 pub fields: Vec<ProtoTextField>,
24}
25
26#[derive(Debug, Default, Clone)]
29pub struct ProtoTextField {
30 pub annotations: Vec<String>,
33 pub field_number: Option<u64>,
35
36 pub content: ProtoTextContent,
38
39 pub tag_overhang_count: Option<u64>,
41 pub tag_is_out_of_range: bool,
42 pub value_overhang_count: Option<u64>,
43 pub length_overhang_count: Option<u64>,
44 pub missing_bytes_count: Option<u64>,
45 pub mismatched_group_end: Option<u64>,
46 pub open_ended_group: bool,
47 pub end_tag_overhang_count: Option<u64>,
48 pub end_tag_is_out_of_range: bool,
49
50 pub proto2_has_type_mismatch: bool,
52 pub records_overhung_count: Vec<u64>,
53}
54
55#[derive(Debug, Clone, Default)]
58pub enum ProtoTextContent {
59 #[default]
60 Unset,
61 WireVarint(u64), WireFixed64(u64), WireBytes(Vec<u8>), WireGroup(Box<ProtoTextMessage>), WireFixed32(u32), InvalidTagType(Vec<u8>), InvalidVarint(Vec<u8>), InvalidFixed64(Vec<u8>), InvalidBytesLength(Vec<u8>), TruncatedBytes(Vec<u8>), InvalidPackedRecords(Vec<u8>), InvalidString(Vec<u8>), InvalidGroupEnd(Vec<u8>), InvalidFixed32(Vec<u8>), Double(f64), Float(f32), Int64(i64), Uint64(u64), Int32(i32), PFixed64(u64), PFixed32(u32), Bool(bool), StringVal(String), Group(Box<ProtoTextMessage>), MessageVal(Box<ProtoTextMessage>), BytesVal(Vec<u8>), Uint32(u32), Enum(i32), Sfixed32(i32), Sfixed64(i64), Sint32(i32), Sint64(i64), Doubles(Vec<f64>), Floats(Vec<f32>), Int64s(Vec<i64>), Uint64s(Vec<u64>), Int32s(Vec<i32>), Fixed64s(Vec<u64>), Fixed32s(Vec<u32>), Bools(Vec<bool>), Uint32s(Vec<u32>), Enums(Vec<i32>), Sfixed32s(Vec<i32>), Sfixed64s(Vec<i64>), Sint32s(Vec<i32>), Sint64s(Vec<i64>), }
111
112pub fn ingest_pb(
118 pb_bytes: &[u8],
119 full_schema: &ParsedSchema,
120 annotations: bool,
121) -> ProtoTextMessage {
122 let root = full_schema.root_schema();
123 let (msg, _, _) = parse_message(pb_bytes, 0, None, root.as_deref(), full_schema, annotations);
124 msg
125}
126
127pub fn parse_message(
137 buf: &[u8],
138 start: usize,
139 my_group: Option<u64>, schema: Option<&MessageSchema>,
141 full_schema: &ParsedSchema, annotations: bool,
143) -> (ProtoTextMessage, usize, Option<WiretagResult>) {
144 let buflen = buf.len();
145 let mut pos = start;
146 let mut message = ProtoTextMessage::default();
147
148 loop {
149 if pos == buflen {
150 return (message, pos, None);
151 }
152
153 let mut field = ProtoTextField::default();
154
155 let tag = parse_wiretag(buf, pos);
158
159 if let Some(wtag_gar) = tag.wtag_gar {
160 if annotations {
162 field.annotations.push("invalid field".to_string());
163 }
164 field.field_number = Some(0);
165 field.content = ProtoTextContent::InvalidTagType(wtag_gar);
166 pos = buflen;
167 message.fields.push(field);
168 continue;
169 }
170
171 let field_number = tag.wfield.unwrap();
172 let wire_type = tag.wtype.unwrap();
173 field.field_number = Some(field_number);
174 if let Some(ohb) = tag.wfield_ohb {
175 field.tag_overhang_count = Some(ohb);
176 }
177 if tag.wfield_oor.is_some() {
178 field.tag_is_out_of_range = true;
179 }
180 pos = tag.next_pos;
181
182 let field_schema: Option<&FieldInfo> =
185 schema.and_then(|s| s.fields.get(&(field_number as u32)));
186
187 if annotations {
188 if schema.is_none() {
189 field.annotations.push("no schema".to_string());
190 } else if let Some(fs) = field_schema {
191 field.annotations.push(format_annotation(fs));
192 } else {
193 field.annotations.push("unknown field".to_string());
194 }
195 }
196
197 match wire_type {
200 WT_VARINT => {
202 let vr = parse_varint(buf, pos);
203 if let Some(varint_gar) = vr.varint_gar {
204 field.content = ProtoTextContent::InvalidVarint(varint_gar);
205 pos = buflen;
206 message.fields.push(field);
207 continue;
208 }
209 pos = vr.next_pos;
210 if let Some(ohb) = vr.varint_ohb {
211 field.value_overhang_count = Some(ohb);
212 }
213 let val = vr.varint.unwrap();
214
215 if let Some(fs) = field_schema {
216 match decode_varint_by_type(val, fs.proto_type) {
217 Ok(content) => field.content = content,
218 Err(TypeMismatch) => {
219 field.proto2_has_type_mismatch = true;
220 field.content = ProtoTextContent::WireVarint(val);
221 }
222 }
223 } else {
224 field.content = ProtoTextContent::WireVarint(val);
225 }
226 }
227
228 WT_I64 => {
230 if pos + 8 > buflen {
231 field.content = ProtoTextContent::InvalidFixed64(buf[pos..].to_vec());
232 pos = buflen;
233 message.fields.push(field);
234 continue;
235 }
236 let data = &buf[pos..pos + 8];
237 pos += 8;
238
239 if let Some(fs) = field_schema {
240 field.content = match fs.proto_type {
241 pt::DOUBLE => ProtoTextContent::Double(decode_double(data)),
242 pt::FIXED64 => ProtoTextContent::PFixed64(decode_fixed64(data)),
243 pt::SFIXED64 => ProtoTextContent::Sfixed64(decode_sfixed64(data)),
244 _ => ProtoTextContent::WireFixed64(decode_fixed64(data)),
245 };
246 } else {
247 field.content = ProtoTextContent::WireFixed64(decode_fixed64(data));
248 }
249 }
250
251 WT_LEN => {
253 let lr = parse_varint(buf, pos);
254 if lr.varint_gar.is_some() {
255 field.content = ProtoTextContent::InvalidBytesLength(buf[pos..].to_vec());
256 pos = buflen;
257 message.fields.push(field);
258 continue;
259 }
260 pos = lr.next_pos;
261 if let Some(ohb) = lr.varint_ohb {
262 field.length_overhang_count = Some(ohb);
263 }
264 let length = lr.varint.unwrap() as usize;
265
266 if pos + length > buflen {
267 field.missing_bytes_count = Some((length - (buflen - pos)) as u64);
268 field.content = ProtoTextContent::TruncatedBytes(buf[pos..].to_vec());
269 pos = buflen;
270 message.fields.push(field);
271 continue;
272 }
273 let data = &buf[pos..pos + length];
274 pos += length;
275
276 decode_len_field(data, field_schema, full_schema, annotations, &mut field);
277 }
278
279 WT_START_GROUP => {
281 let nested_arc: Option<Arc<MessageSchema>> = field_schema
283 .filter(|fs| fs.proto_type == pt::GROUP)
284 .and_then(|fs| fs.nested_type_name.as_deref())
285 .and_then(|name| full_schema.messages.get(name))
286 .cloned();
287
288 let (nested_msg, new_pos, end_tag) = parse_message(
289 buf,
290 pos,
291 Some(field_number),
292 nested_arc.as_deref(),
293 full_schema,
294 annotations,
295 );
296 pos = new_pos;
297
298 if end_tag.is_none() {
299 field.open_ended_group = true;
300 } else if let Some(ref et) = end_tag {
301 if let Some(ohb) = et.wfield_ohb {
302 field.end_tag_overhang_count = Some(ohb);
303 }
304 if et.wfield_oor.is_some() {
305 field.end_tag_is_out_of_range = true;
306 }
307 let end_field = et.wfield.unwrap_or(0);
308 if end_field != field_number {
309 field.mismatched_group_end = Some(end_field);
310 }
311 }
312 field.content = ProtoTextContent::Group(Box::new(nested_msg));
314 }
315
316 WT_END_GROUP => {
318 if my_group.is_none() {
319 field.content = ProtoTextContent::InvalidGroupEnd(buf[pos..].to_vec());
321 pos = buflen;
322 message.fields.push(field);
323 continue;
324 }
325 return (message, pos, Some(tag));
327 }
328
329 WT_I32 => {
331 if pos + 4 > buflen {
332 field.content = ProtoTextContent::InvalidFixed32(buf[pos..].to_vec());
333 pos = buflen;
334 message.fields.push(field);
335 continue;
336 }
337 let data = &buf[pos..pos + 4];
338 pos += 4;
339
340 if let Some(fs) = field_schema {
341 field.content = match fs.proto_type {
342 pt::FLOAT => ProtoTextContent::Float(decode_float(data)),
343 pt::FIXED32 => ProtoTextContent::PFixed32(decode_fixed32(data)),
344 pt::SFIXED32 => ProtoTextContent::Sfixed32(decode_sfixed32(data)),
345 _ => ProtoTextContent::PFixed32(decode_fixed32(data)),
347 };
348 } else {
349 field.content = ProtoTextContent::PFixed32(decode_fixed32(data));
351 }
352 }
353
354 _ => {
355 unreachable!("wire type > 5 should have been caught by parse_wiretag");
357 }
358 }
359
360 message.fields.push(field);
361 }
362}
363
/// Error marker returned by `decode_varint_by_type` when a varint cannot be represented
/// as the schema-declared type. Derives `Debug` so `Result::unwrap`/`assert_eq!` work on it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct TypeMismatch;
367
368fn decode_varint_by_type(val: u64, proto_type: i32) -> Result<ProtoTextContent, TypeMismatch> {
372 match proto_type {
373 pt::INT64 => {
374 Ok(ProtoTextContent::Int64(decode_int64(val)))
377 }
378 pt::UINT64 => Ok(ProtoTextContent::Uint64(decode_uint64(val))),
379 pt::INT32 => {
380 if val >= (1u64 << 32) {
381 return Err(TypeMismatch);
382 }
383 Ok(ProtoTextContent::Int32(decode_int32(val)))
384 }
385 pt::BOOL => {
386 if val > 1 {
387 return Err(TypeMismatch);
388 }
389 Ok(ProtoTextContent::Bool(decode_bool(val)))
390 }
391 pt::UINT32 => {
392 if val >= (1u64 << 32) {
393 return Err(TypeMismatch);
394 }
395 Ok(ProtoTextContent::Uint32(decode_uint32(val)))
396 }
397 pt::ENUM => {
398 if val >= (1u64 << 32) {
399 return Err(TypeMismatch);
400 }
401 Ok(ProtoTextContent::Enum(decode_int32(val)))
402 }
403 pt::SINT32 => {
404 if val >= (1u64 << 32) {
405 return Err(TypeMismatch);
406 }
407 Ok(ProtoTextContent::Sint32(decode_sint32(val)))
408 }
409 pt::SINT64 => Ok(ProtoTextContent::Sint64(decode_sint64(val))),
410 _ => Err(TypeMismatch),
411 }
412}
413
414fn decode_len_field(
418 data: &[u8],
419 field_schema: Option<&FieldInfo>,
420 full_schema: &ParsedSchema,
421 annotations: bool,
422 field: &mut ProtoTextField,
423) {
424 let Some(fs) = field_schema else {
425 field.content = ProtoTextContent::WireBytes(data.to_vec());
426 return;
427 };
428
429 let is_repeated = fs.label == proto_label::REPEATED;
430
431 if is_repeated && fs.is_packed {
433 decode_packed(data, fs, field);
434 return;
435 }
436
437 if fs.proto_type == pt::STRING {
439 match std::str::from_utf8(data) {
440 Ok(s) => field.content = ProtoTextContent::StringVal(s.to_string()),
441 Err(_) => field.content = ProtoTextContent::InvalidString(data.to_vec()),
442 }
443 return;
444 }
445
446 if fs.proto_type == pt::BYTES {
448 field.content = ProtoTextContent::BytesVal(data.to_vec());
449 return;
450 }
451
452 if fs.proto_type == pt::MESSAGE {
454 let nested_schema = fs
455 .nested_type_name
456 .as_deref()
457 .and_then(|name| full_schema.messages.get(name))
458 .map(|arc| arc.as_ref());
459 let (nested_msg, _, _) =
460 parse_message(data, 0, None, nested_schema, full_schema, annotations);
461 field.content = ProtoTextContent::MessageVal(Box::new(nested_msg));
462 return;
463 }
464
465 field.content = ProtoTextContent::WireBytes(data.to_vec());
467}
468
469fn decode_packed(data: &[u8], fs: &FieldInfo, field: &mut ProtoTextField) {
471 let length = data.len();
472
473 match fs.proto_type {
476 pt::DOUBLE => {
477 let mut vals = Vec::new();
478 let mut i = 0;
479 while i < length {
480 if i + 8 > length {
481 field.content = ProtoTextContent::InvalidPackedRecords(data.to_vec());
482 return;
483 }
484 vals.push(decode_double(&data[i..i + 8]));
485 i += 8;
486 }
487 field.content = ProtoTextContent::Doubles(vals);
488 }
489 pt::FLOAT => {
490 let mut vals = Vec::new();
491 let mut i = 0;
492 while i < length {
493 if i + 4 > length {
494 field.content = ProtoTextContent::InvalidPackedRecords(data.to_vec());
495 return;
496 }
497 vals.push(decode_float(&data[i..i + 4]));
498 i += 4;
499 }
500 field.content = ProtoTextContent::Floats(vals);
501 }
502 pt::FIXED64 => {
503 let mut vals = Vec::new();
504 let mut i = 0;
505 while i < length {
506 if i + 8 > length {
507 field.content = ProtoTextContent::InvalidPackedRecords(data.to_vec());
508 return;
509 }
510 vals.push(decode_fixed64(&data[i..i + 8]));
511 i += 8;
512 }
513 field.content = ProtoTextContent::Fixed64s(vals);
514 }
515 pt::SFIXED64 => {
516 let mut vals = Vec::new();
517 let mut i = 0;
518 while i < length {
519 if i + 8 > length {
520 field.content = ProtoTextContent::InvalidPackedRecords(data.to_vec());
521 return;
522 }
523 vals.push(decode_sfixed64(&data[i..i + 8]));
524 i += 8;
525 }
526 field.content = ProtoTextContent::Sfixed64s(vals);
527 }
528 pt::FIXED32 => {
529 let mut vals = Vec::new();
530 let mut i = 0;
531 while i < length {
532 if i + 4 > length {
533 field.content = ProtoTextContent::InvalidPackedRecords(data.to_vec());
534 return;
535 }
536 vals.push(decode_fixed32(&data[i..i + 4]));
537 i += 4;
538 }
539 field.content = ProtoTextContent::Fixed32s(vals);
540 }
541 pt::SFIXED32 => {
542 let mut vals = Vec::new();
543 let mut i = 0;
544 while i < length {
545 if i + 4 > length {
546 field.content = ProtoTextContent::InvalidPackedRecords(data.to_vec());
547 return;
548 }
549 vals.push(decode_sfixed32(&data[i..i + 4]));
550 i += 4;
551 }
552 field.content = ProtoTextContent::Sfixed32s(vals);
553 }
554 _ => decode_packed_varints(data, fs, field),
556 }
557}
558
559fn decode_packed_varints(data: &[u8], fs: &FieldInfo, field: &mut ProtoTextField) {
562 let length = data.len();
563
564 let mut vals_i64: Vec<i64> = Vec::new();
568 let mut vals_u64: Vec<u64> = Vec::new();
569 let mut vals_i32: Vec<i32> = Vec::new();
570 let mut vals_u32: Vec<u32> = Vec::new();
571 let mut vals_bool: Vec<bool> = Vec::new();
572 let mut vals_enum: Vec<i32> = Vec::new();
573 let mut ohbs: Vec<u64> = Vec::new();
574 let mut i = 0;
575
576 while i < length {
577 let vr = parse_varint(data, i);
578 if vr.varint_gar.is_some() {
579 field.content = ProtoTextContent::InvalidPackedRecords(data.to_vec());
580 return;
581 }
582 i = vr.next_pos;
583 ohbs.push(vr.varint_ohb.unwrap_or(0));
584 let v = vr.varint.unwrap();
585
586 match fs.proto_type {
587 pt::INT64 => {
588 vals_i64.push(decode_int64(v));
589 } pt::UINT64 => {
591 vals_u64.push(v);
592 }
593 pt::INT32 => {
594 if v >= (1u64 << 32) {
595 field.content = ProtoTextContent::InvalidPackedRecords(data.to_vec());
596 return;
597 }
598 vals_i32.push(decode_int32(v));
599 }
600 pt::BOOL => {
601 if v > 1 {
602 field.content = ProtoTextContent::InvalidPackedRecords(data.to_vec());
603 return;
604 }
605 vals_bool.push(v != 0);
606 }
607 pt::UINT32 => {
608 if v >= (1u64 << 32) {
609 field.content = ProtoTextContent::InvalidPackedRecords(data.to_vec());
610 return;
611 }
612 vals_u32.push(decode_uint32(v));
613 }
614 pt::ENUM => {
615 if v >= (1u64 << 32) {
616 field.content = ProtoTextContent::InvalidPackedRecords(data.to_vec());
617 return;
618 }
619 vals_enum.push(decode_int32(v));
620 }
621 pt::SINT32 => {
622 if v >= (1u64 << 32) {
623 field.content = ProtoTextContent::InvalidPackedRecords(data.to_vec());
624 return;
625 }
626 vals_i32.push(decode_sint32(v));
627 }
628 pt::SINT64 => {
629 vals_i64.push(decode_sint64(v));
630 }
631 _ => {
632 field.content = ProtoTextContent::InvalidPackedRecords(data.to_vec());
633 return;
634 }
635 }
636 }
637
638 if ohbs.iter().any(|&x| x > 0) {
640 field.records_overhung_count = ohbs;
641 }
642
643 field.content = match fs.proto_type {
644 pt::INT64 => ProtoTextContent::Int64s(vals_i64),
645 pt::UINT64 => ProtoTextContent::Uint64s(vals_u64),
646 pt::INT32 => ProtoTextContent::Int32s(vals_i32),
647 pt::BOOL => ProtoTextContent::Bools(vals_bool),
648 pt::UINT32 => ProtoTextContent::Uint32s(vals_u32),
649 pt::ENUM => ProtoTextContent::Enums(vals_enum),
650 pt::SINT32 => ProtoTextContent::Sint32s(vals_i32),
651 pt::SINT64 => ProtoTextContent::Sint64s(vals_i64),
652 _ => ProtoTextContent::InvalidPackedRecords(data.to_vec()),
653 };
654}
655
656fn format_annotation(fs: &FieldInfo) -> String {
658 let label = match fs.label {
659 1 => "optional",
660 2 => "required",
661 3 => "repeated",
662 _ => "?",
663 };
664 let type_str = match fs.proto_type {
665 1 => "double",
666 2 => "float",
667 3 => "int64",
668 4 => "uint64",
669 5 => "int32",
670 6 => "fixed64",
671 7 => "fixed32",
672 8 => "bool",
673 9 => "string",
674 10 => "group",
675 11 => "message",
676 12 => "bytes",
677 13 => "uint32",
678 14 => "enum",
679 15 => "sfixed32",
680 16 => "sfixed64",
681 17 => "sint32",
682 18 => "sint64",
683 _ => "?",
684 };
685 let type_display = if matches!(fs.proto_type, 10 | 11 | 14) {
687 fs.type_display_name.as_deref().unwrap_or(type_str)
688 } else {
689 type_str
690 };
691 let packed_suffix = if fs.is_packed { " [packed=true]" } else { "" };
692 format!("{}: {} {}{}", fs.name, label, type_display, packed_suffix)
693}