1use self::types::{
2 AlignedCursor, EnumerationMappings, EventHeaderParser, EventParser, EventPayloadMemberParser,
3 EventPayloadParser, FieldTypeParser, PacketContextParser, PacketContextParserArgs,
4 PacketHeaderParser, Size, StreamParser, StreamReader, UIntParser, UuidParser,
5};
6use crate::{
7 config::{Config, FieldType, NativeByteOrder},
8 error::Error,
9 types::{Event, EventId, LogLevel, Packet, PacketContext, PacketHeader, StreamId},
10};
11use bytes::{Buf, BytesMut};
12use fxhash::FxHashMap;
13use internment::Intern;
14use itertools::Itertools;
15use std::io::Read;
16use tokio_util::codec::Decoder;
17use tracing::{debug, warn};
18use uuid::Uuid;
19
20pub(crate) mod types;
21
22#[derive(Debug)]
24pub struct Parser {
25 byte_order: NativeByteOrder,
26 trace_uuid: Option<Uuid>,
27 pkt_header: PacketHeaderParser,
28 streams: FxHashMap<StreamId, StreamParser>,
29 stream_clocks: FxHashMap<StreamId, Intern<String>>,
30}
31
32impl Parser {
33 pub fn new(cfg: &Config) -> Result<Self, Error> {
34 if let Some(magic_ft) = cfg.trace.typ.features.magic_field_type.as_ft() {
36 if magic_ft.field_type.size != 32 {
37 return Err(Error::UnsupportedFieldType(
38 "magic-field-type".to_owned(),
39 magic_ft.field_type.size,
40 magic_ft.field_type.alignment,
41 ));
42 }
43 }
44
45 let magic = UIntParser::from_opt_uint_ft(&cfg.trace.typ.features.magic_field_type)
46 .map_err(|e| Error::unsupported_ft("magic-field-type", e))?;
47 let uuid = UuidParser::from_bool_ft(cfg.trace.typ.features.uuid_field_type);
48 let stream_id =
49 UIntParser::from_uint_ft(&cfg.trace.typ.features.data_stream_type_id_field_type)
50 .map_err(|e| Error::unsupported_ft("data-stream-type-id-field-type", e))?;
51 let pkt_header = PacketHeaderParser::new(
52 magic,
53 uuid,
54 stream_id,
55 Size::from_bits(cfg.trace.typ.features.alignment())
56 .ok_or_else(|| Error::unsupported_alignment("trace.type.$features"))?,
57 );
58
59 let mut streams = FxHashMap::default();
62 let mut stream_clocks = FxHashMap::default();
63 for (stream_id, (stream_name, stream)) in cfg
64 .trace
65 .typ
66 .data_stream_types
67 .iter()
68 .sorted_by_key(|(name, _)| name.as_str())
69 .enumerate()
70 {
71 if let Some(default_clock) = stream.default_clock_type_name.as_ref() {
72 stream_clocks.insert(stream_id as StreamId, Intern::new(default_clock.to_owned()));
73 }
74
75 let packet_size = UIntParser::from_uint_ft(
77 &stream.features.packet.total_size_field_type,
78 )
79 .map_err(|e| {
80 Error::unsupported_ft(
81 format!(
82 "stream.{}.$features.packet.total-size-field-type",
83 stream_name
84 ),
85 e,
86 )
87 })?;
88 let content_size = UIntParser::from_uint_ft(
89 &stream.features.packet.content_size_field_type,
90 )
91 .map_err(|e| {
92 Error::unsupported_ft(
93 format!(
94 "stream.{}.$features.packet.content-size-field-type",
95 stream_name
96 ),
97 e,
98 )
99 })?;
100 let beginning_timestamp = UIntParser::from_opt_uint_ft(
101 &stream.features.packet.beginning_timestamp_field_type,
102 )
103 .map_err(|e| {
104 Error::unsupported_ft(
105 format!(
106 "stream.{}.$features.packet.beginning-timestamp-field-type",
107 stream_name
108 ),
109 e,
110 )
111 })?;
112 let end_timestamp =
113 UIntParser::from_opt_uint_ft(&stream.features.packet.end_timestamp_field_type)
114 .map_err(|e| {
115 Error::unsupported_ft(
116 format!(
117 "stream.{}.$features.packet.end-timestamp-field-type",
118 stream_name
119 ),
120 e,
121 )
122 })?;
123 let events_discarded = UIntParser::from_opt_uint_ft(
124 &stream
125 .features
126 .packet
127 .discarded_event_records_counter_snapshot_field_type,
128 )
129 .map_err(|e| {
130 Error::unsupported_ft(
131 format!("stream.{}.$features.packet.discarded-event-records-counter-snapshot-field-type", stream_name),
132 e,
133 )
134 })?;
135 let sequence_number =
136 UIntParser::from_opt_uint_ft(&stream.features.packet.sequence_number_field_type)
137 .map_err(|e| {
138 Error::unsupported_ft(
139 format!(
140 "stream.{}.$features.packet.sequence-number-field-type",
141 stream_name
142 ),
143 e,
144 )
145 })?;
146
147 let mut pc_extra_members = Vec::new();
148 let pc_extra_member_alignment =
149 stream.packet_context_field_type_extra_members.alignment();
150 for (pc_member_name, pc_member) in stream
151 .packet_context_field_type_extra_members
152 .0
153 .iter()
154 .flat_map(|m| m.iter())
155 {
156 pc_extra_members.push(EventPayloadMemberParser {
157 member_name: Intern::new(pc_member_name.clone()),
158 preferred_display_base: pc_member.field_type.preferred_display_base(),
159 enum_mappings: EnumerationMappings::from_struct_ft(&pc_member.field_type),
160 value: FieldTypeParser::from_ft(&pc_member.field_type).map_err(|e| {
161 Error::unsupported_ft(
162 format!(
163 "stream.{}.packet-context-field-type-extra-members.{}",
164 stream_name, pc_member_name
165 ),
166 e,
167 )
168 })?,
169 });
170 }
171
172 let common_context = if let Some(cc_field_type) =
174 stream.event_record_common_context_field_type.as_ref()
175 {
176 let mut members = Vec::new();
177 for (member_name, member) in cc_field_type.members.iter().flat_map(|m| m.iter()) {
178 members.push(EventPayloadMemberParser {
179 member_name: Intern::new(member_name.clone()),
180 preferred_display_base: member.field_type.preferred_display_base(),
181 enum_mappings: EnumerationMappings::from_struct_ft(&member.field_type),
182 value: FieldTypeParser::from_ft(&member.field_type).map_err(|e| {
183 Error::unsupported_ft(
184 format!(
185 "stream.{}.event-record-common-context-field-type.{}",
186 stream_name, member_name
187 ),
188 e,
189 )
190 })?,
191 });
192 }
193
194 Some(EventPayloadParser {
195 alignment: Size::from_bits(cc_field_type.alignment()).ok_or_else(|| {
196 Error::unsupported_alignment(format!(
197 "stream.{}.event-record-common-context-field-type",
198 stream_name
199 ))
200 })?,
201 members,
202 })
203 } else {
204 None
205 };
206
207 let mut events = FxHashMap::default();
210 for (event_id, (event_name, event)) in stream
211 .event_record_types
212 .iter()
213 .sorted_by_key(|(name, _)| name.as_str())
214 .enumerate()
215 {
216 let specific_context = if let Some(sc_field_type) =
217 event.specific_context_field_type.as_ref()
218 {
219 let mut members = Vec::new();
220 for (member_name, member) in sc_field_type.members.iter().flat_map(|m| m.iter())
221 {
222 members.push(EventPayloadMemberParser {
223 member_name: Intern::new(member_name.clone()),
224 preferred_display_base: member.field_type.preferred_display_base(),
225 enum_mappings: EnumerationMappings::from_struct_ft(&member.field_type),
226 value: FieldTypeParser::from_ft(&member.field_type)
227 .map_err(|e| {
228 Error::unsupported_ft(
229 format!(
230 "stream.{}.event-record-types.{}.specific-context-field-type.{}",
231 stream_name,
232 event_name,
233 member_name
234 ),
235 e,
236 )
237 })?,
238 });
239 }
240
241 Some(EventPayloadParser {
242 alignment: Size::from_bits(sc_field_type.alignment()).ok_or_else(|| {
243 Error::unsupported_alignment(format!(
244 "stream.{}.event-record-types.{}.specific-context-field-type",
245 stream_name, event_name
246 ))
247 })?,
248 members,
249 })
250 } else {
251 None
252 };
253
254 let payload = if let Some(payload_field_type) = event.payload_field_type.as_ref() {
255 let mut members = Vec::new();
256 for (member_name, member) in
257 payload_field_type.members.iter().flat_map(|m| m.iter())
258 {
259 members.push(EventPayloadMemberParser {
260 member_name: Intern::new(member_name.clone()),
261 preferred_display_base: member.field_type.preferred_display_base(),
262 enum_mappings: EnumerationMappings::from_struct_ft(&member.field_type),
263 value: FieldTypeParser::from_ft(&member.field_type).map_err(|e| {
264 Error::unsupported_ft(
265 format!(
266 "stream.{}.event-record-types.{}.payload-field-type.{}",
267 stream_name, event_name, member_name
268 ),
269 e,
270 )
271 })?,
272 });
273 }
274
275 Some(EventPayloadParser {
276 alignment: Size::from_bits(payload_field_type.alignment()).ok_or_else(
277 || {
278 Error::unsupported_alignment(format!(
279 "stream.{}.event-record-types.{}.payload-field-type",
280 stream_name, event_name
281 ))
282 },
283 )?,
284 members,
285 })
286 } else {
287 None
288 };
289
290 events.insert(
291 event_id as EventId,
292 EventParser {
293 event_name: Intern::new(event_name.clone()),
294 log_level: event.log_level,
295 specific_context,
296 payload,
297 },
298 );
299 }
300
301 streams.insert(
302 stream_id as StreamId,
303 StreamParser {
304 stream_name: Intern::new(stream_name.clone()),
305 packet_context: PacketContextParser::new(
306 PacketContextParserArgs {
307 packet_size,
308 content_size,
309 beginning_timestamp,
310 end_timestamp,
311 events_discarded,
312 sequence_number,
313 extra_members: pc_extra_members,
314 alignment: Size::from_bits(
315 stream
316 .features
317 .packet
318 .alignment()
319 .max(pc_extra_member_alignment),
320 )
321 .ok_or_else(|| {
322 Error::unsupported_alignment(format!(
323 "stream.{}.$features.packet",
324 stream_name
325 ))
326 })?,
327 },
328 &pkt_header.wire_size_hint,
329 ),
330 event_header: EventHeaderParser {
331 event_id: UIntParser::from_uint_ft(
332 &stream.features.event_record.type_id_field_type,
333 )
334 .map_err(|e| {
335 Error::unsupported_ft(
336 format!(
337 "stream.{}.$features.event-record.type-id-field-type",
338 stream_name
339 ),
340 e,
341 )
342 })?,
343 timestamp: UIntParser::from_uint_ft(
344 &stream.features.event_record.timestamp_field_type,
345 )
346 .map_err(|e| {
347 Error::unsupported_ft(
348 format!(
349 "stream.{}.$features.event-record.timestamp-field-type",
350 stream_name
351 ),
352 e,
353 )
354 })?,
355 alignment: Size::from_bits(stream.features.event_record.alignment())
356 .ok_or_else(|| {
357 Error::unsupported_alignment(format!(
358 "stream.{}.$features.event-record",
359 stream_name
360 ))
361 })?,
362 },
363 common_context,
364 events,
365 },
366 );
367 }
368
369 Ok(Self {
370 byte_order: cfg.trace.typ.native_byte_order,
371 trace_uuid: cfg.trace.typ.uuid,
372 pkt_header,
373 streams,
374 stream_clocks,
375 })
376 }
377
378 pub fn into_packet_decoder(self) -> PacketDecoder {
379 PacketDecoder {
380 parser: self,
381 state: PacketDecoderState::Header,
382 }
383 }
384
385 pub fn parse<R: Read>(&self, r: &mut R) -> Result<Packet, Error> {
386 let mut r = StreamReader::new(self.byte_order, r);
387
388 let header = self.parse_header(&mut r)?;
389
390 let stream = self
392 .streams
393 .get(&header.stream_id)
394 .ok_or(Error::UndefinedStreamId(header.stream_id))?;
395
396 let context = Self::parse_packet_context(stream, &mut r)?;
397
398 let events = Self::parse_events(stream, &context, &mut r)?;
399
400 Ok(Packet {
401 header,
402 context,
403 events,
404 })
405 }
406
407 fn parse_header<R: Read>(&self, r: &mut StreamReader<R>) -> Result<PacketHeader, Error> {
408 r.align_to(self.pkt_header.alignment)?;
410
411 let magic = self
413 .pkt_header
414 .magic
415 .as_ref()
416 .map(|p| p.parse(r))
417 .transpose()?
418 .map(|m| m as u32);
419 let trace_uuid = self
420 .pkt_header
421 .uuid
422 .as_ref()
423 .map(|p| p.parse(r))
424 .transpose()?;
425 let stream_id = self.pkt_header.stream_id.parse(r)?;
426 debug!(stream_id, ?magic, ?trace_uuid, "Parsed packet header");
427 if let Some(m) = magic {
428 if m != PacketHeader::MAGIC {
429 warn!(
430 "Invalid packet header magic number 0x{m:X} (expected 0x{:X})",
431 PacketHeader::MAGIC
432 );
433 }
434 }
435 if let (Some(uuid), Some(schema_uuid)) = (trace_uuid.as_ref(), self.trace_uuid.as_ref()) {
436 if uuid != schema_uuid {
437 warn!(
438 trace_uuid = %uuid,
439 %schema_uuid, "Trace type UUID doesn't match"
440 );
441 }
442 }
443
444 debug_assert_eq!(
446 r.cursor_bits(),
447 self.pkt_header.wire_size_hint.cursor_bits()
448 );
449
450 let stream = self
451 .streams
452 .get(&stream_id)
453 .ok_or(Error::UndefinedStreamId(stream_id))?;
454
455 Ok(PacketHeader {
456 magic_number: magic,
457 trace_uuid,
458 stream_id,
459 stream_name: stream.stream_name,
460 clock_name: self.stream_clocks.get(&stream_id).copied(),
461 })
462 }
463
464 fn parse_packet_context<R: Read>(
465 stream: &StreamParser,
466 r: &mut StreamReader<R>,
467 ) -> Result<PacketContext, Error> {
468 r.align_to(stream.packet_context.alignment)?;
470
471 let pkt_size_bits = stream.packet_context.packet_size.parse(r)?;
473 let content_size_bits = stream.packet_context.content_size.parse(r)?;
474 let beginning_timestamp = stream
475 .packet_context
476 .beginning_timestamp
477 .as_ref()
478 .map(|p| p.parse(r))
479 .transpose()?;
480 let end_timestamp = stream
481 .packet_context
482 .end_timestamp
483 .as_ref()
484 .map(|p| p.parse(r))
485 .transpose()?;
486 let events_discarded = stream
487 .packet_context
488 .events_discarded
489 .as_ref()
490 .map(|p| p.parse(r))
491 .transpose()?;
492 let sequence_number = stream
493 .packet_context
494 .sequence_number
495 .as_ref()
496 .map(|p| p.parse(r))
497 .transpose()?;
498
499 let mut extra_members = Vec::new();
501 for member in stream.packet_context.extra_members.iter() {
502 let val = member.parse(r)?;
503 extra_members.push((member.member_name, val));
504 }
505
506 debug!(
507 packet_size = pkt_size_bits,
508 content_size = content_size_bits,
509 ?events_discarded,
510 ?sequence_number,
511 "Parsed packet context"
512 );
513 debug_assert_eq!(
515 r.cursor_bits(),
516 stream.packet_context.wire_size_hint.cursor_bits()
517 );
518
519 Ok(PacketContext {
520 packet_size_bits: pkt_size_bits as _,
521 content_size_bits: content_size_bits as _,
522 beginning_timestamp,
523 end_timestamp,
524 events_discarded,
525 sequence_number,
526 extra_members,
527 })
528 }
529
530 fn parse_events<R: Read>(
531 stream: &StreamParser,
532 packet_context: &PacketContext,
533 r: &mut StreamReader<R>,
534 ) -> Result<Vec<Event>, Error> {
535 let mut events = Vec::new();
536
537 loop {
539 r.align_to(stream.event_header.alignment)?;
541
542 let event_id = stream.event_header.event_id.parse(r)?;
544 let timestamp = stream.event_header.timestamp.parse(r)?;
545 debug!(event_id, timestamp, "Parsed event header");
546
547 let mut common_context = Vec::new();
550 if let Some(p) = stream.common_context.as_ref() {
551 r.align_to(p.alignment)?;
553
554 for member in p.members.iter() {
556 let val = member.parse(r)?;
557 common_context.push((member.member_name, val));
558 }
559 }
560
561 let event = stream
563 .events
564 .get(&event_id)
565 .ok_or(Error::UndefinedEventId(event_id))?;
566
567 let mut specific_context = Vec::new();
569 if let Some(p) = event.specific_context.as_ref() {
570 r.align_to(p.alignment)?;
572
573 for member in p.members.iter() {
575 let val = member.parse(r)?;
576 specific_context.push((member.member_name, val));
577 }
578 }
579
580 let mut payload = Vec::new();
582 if let Some(p) = event.payload.as_ref() {
583 r.align_to(p.alignment)?;
585
586 for member in p.members.iter() {
588 let val = member.parse(r)?;
589 payload.push((member.member_name, val));
590 }
591 }
592
593 events.push(Event {
594 id: event_id,
595 name: event.event_name,
596 timestamp,
597 log_level: event.log_level.map(LogLevel::from),
598 specific_context,
599 common_context,
600 payload,
601 });
602
603 debug_assert!(r.cursor_bits() <= packet_context.content_size_bits);
605 if r.cursor_bits() == packet_context.content_size_bits {
606 break;
607 }
608 }
609
610 let remaining_bits = packet_context.packet_size_bits - packet_context.content_size_bits;
612 if remaining_bits != 0 {
613 let remaining_bytes = remaining_bits >> 3;
614 for _ in 0..remaining_bytes {
615 r.inner.read_u8()?;
617 }
618 }
619
620 Ok(events)
621 }
622}
623
624#[derive(Debug)]
626pub struct PacketDecoder {
627 parser: Parser,
628 state: PacketDecoderState,
629}
630
631#[derive(Debug)]
632enum PacketDecoderState {
633 Header,
634 PacketContext(PacketHeader, AlignedCursor),
635 Events(PacketHeader, PacketContext, AlignedCursor),
636}
637
638impl Decoder for PacketDecoder {
639 type Item = Packet;
640 type Error = Error;
641
642 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
643 loop {
645 match std::mem::replace(&mut self.state, PacketDecoderState::Header) {
646 PacketDecoderState::Header => {
647 if src.len() < self.parser.pkt_header.wire_size_hint.cursor_bytes() {
648 self.state = PacketDecoderState::Header;
650 return Ok(None);
651 }
652
653 let mut src_reader = src.reader();
654 let mut r = StreamReader::new(self.parser.byte_order, &mut src_reader);
655 let header = self.parser.parse_header(&mut r)?;
656 let cursor = r.into_cursor();
657
658 self.state = PacketDecoderState::PacketContext(header, cursor);
659 }
660 PacketDecoderState::PacketContext(header, cursor) => {
661 let stream = self
663 .parser
664 .streams
665 .get(&header.stream_id)
666 .ok_or(Error::UndefinedStreamId(header.stream_id))?;
667
668 let context_bytes_remaining =
669 stream.packet_context.wire_size_hint.cursor_bytes() - cursor.cursor_bytes();
670 if src.len() < context_bytes_remaining {
671 self.state = PacketDecoderState::PacketContext(header, cursor);
673 return Ok(None);
674 }
675
676 let mut src_reader = src.reader();
677 let mut r = StreamReader::new_with_cursor(
678 self.parser.byte_order,
679 cursor,
680 &mut src_reader,
681 );
682
683 let packet_context = Parser::parse_packet_context(stream, &mut r)?;
684 let cursor = r.into_cursor();
685
686 self.state = PacketDecoderState::Events(header, packet_context, cursor);
687 }
688 PacketDecoderState::Events(header, packet_context, cursor) => {
689 let remaining_bytes = packet_context.packet_size() - cursor.cursor_bytes();
690 if src.len() < remaining_bytes {
691 self.state = PacketDecoderState::Events(header, packet_context, cursor);
693 return Ok(None);
694 }
695
696 let stream = self
697 .parser
698 .streams
699 .get(&header.stream_id)
700 .ok_or(Error::UndefinedStreamId(header.stream_id))?;
701
702 let mut src_reader = src.reader();
703 let mut r = StreamReader::new_with_cursor(
704 self.parser.byte_order,
705 cursor,
706 &mut src_reader,
707 );
708
709 let events = Parser::parse_events(stream, &packet_context, &mut r)?;
710
711 let pkt = Packet {
712 header,
713 context: packet_context,
714 events,
715 };
716 self.state = PacketDecoderState::Header;
717 return Ok(Some(pkt));
718 }
719 }
720 }
721 }
722}