1use self::types::{
2 AlignedCursor, EnumerationMappings, EventHeaderParser, EventParser, EventPayloadMemberParser,
3 EventPayloadParser, FieldTypeParser, PacketContextParser, PacketContextParserArgs,
4 PacketHeaderParser, Size, StreamParser, StreamReader, UIntParser, UuidParser,
5};
6use crate::{
7 config::{ClockType, Config, FieldType, NativeByteOrder},
8 error::Error,
9 types::{Event, EventId, LogLevel, Packet, PacketContext, PacketHeader, StreamId},
10};
11use bytes::{Buf, BytesMut};
12use fxhash::FxHashMap;
13use internment::Intern;
14use itertools::Itertools;
15use std::io::Read;
16use tokio_util::codec::Decoder;
17use tracing::{debug, warn};
18use uuid::Uuid;
19
20pub(crate) mod types;
21
22#[derive(Debug)]
24pub struct Parser {
25 byte_order: NativeByteOrder,
26 trace_uuid: Option<Uuid>,
27 pkt_header: PacketHeaderParser,
28 streams: FxHashMap<StreamId, StreamParser>,
29 stream_clocks: FxHashMap<StreamId, Intern<String>>,
30 stream_clock_types: FxHashMap<StreamId, Intern<ClockType>>,
31}
32
33impl Parser {
34 pub fn new(cfg: &Config) -> Result<Self, Error> {
35 if let Some(magic_ft) = cfg.trace.typ.features.magic_field_type.as_ft() {
37 if magic_ft.field_type.size != 32 {
38 return Err(Error::UnsupportedFieldType(
39 "magic-field-type".to_owned(),
40 magic_ft.field_type.size,
41 magic_ft.field_type.alignment,
42 ));
43 }
44 }
45
46 let magic = UIntParser::from_opt_uint_ft(&cfg.trace.typ.features.magic_field_type)
47 .map_err(|e| Error::unsupported_ft("magic-field-type", e))?;
48 let uuid = UuidParser::from_bool_ft(cfg.trace.typ.features.uuid_field_type);
49 let stream_id =
50 UIntParser::from_uint_ft(&cfg.trace.typ.features.data_stream_type_id_field_type)
51 .map_err(|e| Error::unsupported_ft("data-stream-type-id-field-type", e))?;
52 let pkt_header = PacketHeaderParser::new(
53 magic,
54 uuid,
55 stream_id,
56 Size::from_bits(cfg.trace.typ.features.alignment())
57 .ok_or_else(|| Error::unsupported_alignment("trace.type.$features"))?,
58 );
59
60 let mut streams = FxHashMap::default();
63 let mut stream_clocks = FxHashMap::default();
64 let mut stream_clock_types = FxHashMap::default();
65 for (stream_id, (stream_name, stream)) in cfg
66 .trace
67 .typ
68 .data_stream_types
69 .iter()
70 .sorted_by_key(|(name, _)| name.as_str())
71 .enumerate()
72 {
73 if let Some(default_clock) = stream.default_clock_type_name.as_ref() {
74 stream_clocks.insert(stream_id as StreamId, Intern::new(default_clock.to_owned()));
75 if let Some(clock_type) = cfg.trace.typ.clock_types.get(default_clock) {
76 stream_clock_types
77 .insert(stream_id as StreamId, Intern::new(clock_type.clone()));
78 }
79 }
80
81 let packet_size = UIntParser::from_uint_ft(
83 &stream.features.packet.total_size_field_type,
84 )
85 .map_err(|e| {
86 Error::unsupported_ft(
87 format!(
88 "stream.{}.$features.packet.total-size-field-type",
89 stream_name
90 ),
91 e,
92 )
93 })?;
94 let content_size = UIntParser::from_uint_ft(
95 &stream.features.packet.content_size_field_type,
96 )
97 .map_err(|e| {
98 Error::unsupported_ft(
99 format!(
100 "stream.{}.$features.packet.content-size-field-type",
101 stream_name
102 ),
103 e,
104 )
105 })?;
106 let beginning_timestamp = UIntParser::from_opt_uint_ft(
107 &stream.features.packet.beginning_timestamp_field_type,
108 )
109 .map_err(|e| {
110 Error::unsupported_ft(
111 format!(
112 "stream.{}.$features.packet.beginning-timestamp-field-type",
113 stream_name
114 ),
115 e,
116 )
117 })?;
118 let end_timestamp =
119 UIntParser::from_opt_uint_ft(&stream.features.packet.end_timestamp_field_type)
120 .map_err(|e| {
121 Error::unsupported_ft(
122 format!(
123 "stream.{}.$features.packet.end-timestamp-field-type",
124 stream_name
125 ),
126 e,
127 )
128 })?;
129 let events_discarded = UIntParser::from_opt_uint_ft(
130 &stream
131 .features
132 .packet
133 .discarded_event_records_counter_snapshot_field_type,
134 )
135 .map_err(|e| {
136 Error::unsupported_ft(
137 format!("stream.{}.$features.packet.discarded-event-records-counter-snapshot-field-type", stream_name),
138 e,
139 )
140 })?;
141 let sequence_number =
142 UIntParser::from_opt_uint_ft(&stream.features.packet.sequence_number_field_type)
143 .map_err(|e| {
144 Error::unsupported_ft(
145 format!(
146 "stream.{}.$features.packet.sequence-number-field-type",
147 stream_name
148 ),
149 e,
150 )
151 })?;
152
153 let mut pc_extra_members = Vec::new();
154 let pc_extra_member_alignment =
155 stream.packet_context_field_type_extra_members.alignment();
156 for (pc_member_name, pc_member) in stream
157 .packet_context_field_type_extra_members
158 .0
159 .iter()
160 .flat_map(|m| m.iter())
161 {
162 pc_extra_members.push(EventPayloadMemberParser {
163 member_name: Intern::new(pc_member_name.clone()),
164 preferred_display_base: pc_member.field_type.preferred_display_base(),
165 enum_mappings: EnumerationMappings::from_struct_ft(&pc_member.field_type),
166 value: FieldTypeParser::from_ft(&pc_member.field_type).map_err(|e| {
167 Error::unsupported_ft(
168 format!(
169 "stream.{}.packet-context-field-type-extra-members.{}",
170 stream_name, pc_member_name
171 ),
172 e,
173 )
174 })?,
175 });
176 }
177
178 let common_context = if let Some(cc_field_type) =
180 stream.event_record_common_context_field_type.as_ref()
181 {
182 let mut members = Vec::new();
183 for (member_name, member) in cc_field_type.members.iter().flat_map(|m| m.iter()) {
184 members.push(EventPayloadMemberParser {
185 member_name: Intern::new(member_name.clone()),
186 preferred_display_base: member.field_type.preferred_display_base(),
187 enum_mappings: EnumerationMappings::from_struct_ft(&member.field_type),
188 value: FieldTypeParser::from_ft(&member.field_type).map_err(|e| {
189 Error::unsupported_ft(
190 format!(
191 "stream.{}.event-record-common-context-field-type.{}",
192 stream_name, member_name
193 ),
194 e,
195 )
196 })?,
197 });
198 }
199
200 Some(EventPayloadParser {
201 alignment: Size::from_bits(cc_field_type.alignment()).ok_or_else(|| {
202 Error::unsupported_alignment(format!(
203 "stream.{}.event-record-common-context-field-type",
204 stream_name
205 ))
206 })?,
207 members,
208 })
209 } else {
210 None
211 };
212
213 let mut events = FxHashMap::default();
216 for (event_id, (event_name, event)) in stream
217 .event_record_types
218 .iter()
219 .sorted_by_key(|(name, _)| name.as_str())
220 .enumerate()
221 {
222 let specific_context = if let Some(sc_field_type) =
223 event.specific_context_field_type.as_ref()
224 {
225 let mut members = Vec::new();
226 for (member_name, member) in sc_field_type.members.iter().flat_map(|m| m.iter())
227 {
228 members.push(EventPayloadMemberParser {
229 member_name: Intern::new(member_name.clone()),
230 preferred_display_base: member.field_type.preferred_display_base(),
231 enum_mappings: EnumerationMappings::from_struct_ft(&member.field_type),
232 value: FieldTypeParser::from_ft(&member.field_type)
233 .map_err(|e| {
234 Error::unsupported_ft(
235 format!(
236 "stream.{}.event-record-types.{}.specific-context-field-type.{}",
237 stream_name,
238 event_name,
239 member_name
240 ),
241 e,
242 )
243 })?,
244 });
245 }
246
247 Some(EventPayloadParser {
248 alignment: Size::from_bits(sc_field_type.alignment()).ok_or_else(|| {
249 Error::unsupported_alignment(format!(
250 "stream.{}.event-record-types.{}.specific-context-field-type",
251 stream_name, event_name
252 ))
253 })?,
254 members,
255 })
256 } else {
257 None
258 };
259
260 let payload = if let Some(payload_field_type) = event.payload_field_type.as_ref() {
261 let mut members = Vec::new();
262 for (member_name, member) in
263 payload_field_type.members.iter().flat_map(|m| m.iter())
264 {
265 members.push(EventPayloadMemberParser {
266 member_name: Intern::new(member_name.clone()),
267 preferred_display_base: member.field_type.preferred_display_base(),
268 enum_mappings: EnumerationMappings::from_struct_ft(&member.field_type),
269 value: FieldTypeParser::from_ft(&member.field_type).map_err(|e| {
270 Error::unsupported_ft(
271 format!(
272 "stream.{}.event-record-types.{}.payload-field-type.{}",
273 stream_name, event_name, member_name
274 ),
275 e,
276 )
277 })?,
278 });
279 }
280
281 Some(EventPayloadParser {
282 alignment: Size::from_bits(payload_field_type.alignment()).ok_or_else(
283 || {
284 Error::unsupported_alignment(format!(
285 "stream.{}.event-record-types.{}.payload-field-type",
286 stream_name, event_name
287 ))
288 },
289 )?,
290 members,
291 })
292 } else {
293 None
294 };
295
296 events.insert(
297 event_id as EventId,
298 EventParser {
299 event_name: Intern::new(event_name.clone()),
300 log_level: event.log_level,
301 specific_context,
302 payload,
303 },
304 );
305 }
306
307 streams.insert(
308 stream_id as StreamId,
309 StreamParser {
310 stream_name: Intern::new(stream_name.clone()),
311 packet_context: PacketContextParser::new(
312 PacketContextParserArgs {
313 packet_size,
314 content_size,
315 beginning_timestamp,
316 end_timestamp,
317 events_discarded,
318 sequence_number,
319 extra_members: pc_extra_members,
320 alignment: Size::from_bits(
321 stream
322 .features
323 .packet
324 .alignment()
325 .max(pc_extra_member_alignment),
326 )
327 .ok_or_else(|| {
328 Error::unsupported_alignment(format!(
329 "stream.{}.$features.packet",
330 stream_name
331 ))
332 })?,
333 },
334 &pkt_header.wire_size_hint,
335 ),
336 event_header: EventHeaderParser {
337 event_id: UIntParser::from_uint_ft(
338 &stream.features.event_record.type_id_field_type,
339 )
340 .map_err(|e| {
341 Error::unsupported_ft(
342 format!(
343 "stream.{}.$features.event-record.type-id-field-type",
344 stream_name
345 ),
346 e,
347 )
348 })?,
349 timestamp: UIntParser::from_uint_ft(
350 &stream.features.event_record.timestamp_field_type,
351 )
352 .map_err(|e| {
353 Error::unsupported_ft(
354 format!(
355 "stream.{}.$features.event-record.timestamp-field-type",
356 stream_name
357 ),
358 e,
359 )
360 })?,
361 alignment: Size::from_bits(stream.features.event_record.alignment())
362 .ok_or_else(|| {
363 Error::unsupported_alignment(format!(
364 "stream.{}.$features.event-record",
365 stream_name
366 ))
367 })?,
368 },
369 common_context,
370 events,
371 },
372 );
373 }
374
375 Ok(Self {
376 byte_order: cfg.trace.typ.native_byte_order,
377 trace_uuid: cfg.trace.typ.uuid,
378 pkt_header,
379 streams,
380 stream_clocks,
381 stream_clock_types,
382 })
383 }
384
385 pub fn into_packet_decoder(self) -> PacketDecoder {
386 PacketDecoder {
387 parser: self,
388 state: PacketDecoderState::Header,
389 }
390 }
391
392 pub fn parse<R: Read>(&self, r: &mut R) -> Result<Packet, Error> {
393 let mut r = StreamReader::new(self.byte_order, r);
394
395 let header = self.parse_header(&mut r)?;
396
397 let stream = self
399 .streams
400 .get(&header.stream_id)
401 .ok_or(Error::UndefinedStreamId(header.stream_id))?;
402
403 let context = Self::parse_packet_context(stream, &mut r)?;
404
405 let events = Self::parse_events(stream, &context, &mut r)?;
406
407 Ok(Packet {
408 header,
409 context,
410 events,
411 })
412 }
413
414 fn parse_header<R: Read>(&self, r: &mut StreamReader<R>) -> Result<PacketHeader, Error> {
415 r.align_to(self.pkt_header.alignment)?;
417
418 let magic = self
420 .pkt_header
421 .magic
422 .as_ref()
423 .map(|p| p.parse(r))
424 .transpose()?
425 .map(|m| m as u32);
426 let trace_uuid = self
427 .pkt_header
428 .uuid
429 .as_ref()
430 .map(|p| p.parse(r))
431 .transpose()?;
432 let stream_id = self.pkt_header.stream_id.parse(r)?;
433 debug!(stream_id, ?magic, ?trace_uuid, "Parsed packet header");
434 if let Some(m) = magic {
435 if m != PacketHeader::MAGIC {
436 warn!(
437 "Invalid packet header magic number 0x{m:X} (expected 0x{:X})",
438 PacketHeader::MAGIC
439 );
440 }
441 }
442 if let (Some(uuid), Some(schema_uuid)) = (trace_uuid.as_ref(), self.trace_uuid.as_ref()) {
443 if uuid != schema_uuid {
444 warn!(
445 trace_uuid = %uuid,
446 %schema_uuid, "Trace type UUID doesn't match"
447 );
448 }
449 }
450
451 debug_assert_eq!(
453 r.cursor_bits(),
454 self.pkt_header.wire_size_hint.cursor_bits()
455 );
456
457 let stream = self
458 .streams
459 .get(&stream_id)
460 .ok_or(Error::UndefinedStreamId(stream_id))?;
461
462 Ok(PacketHeader {
463 magic_number: magic,
464 trace_uuid,
465 stream_id,
466 stream_name: stream.stream_name,
467 clock_name: self.stream_clocks.get(&stream_id).copied(),
468 clock_type: self.stream_clock_types.get(&stream_id).copied(),
469 })
470 }
471
472 fn parse_packet_context<R: Read>(
473 stream: &StreamParser,
474 r: &mut StreamReader<R>,
475 ) -> Result<PacketContext, Error> {
476 r.align_to(stream.packet_context.alignment)?;
478
479 let pkt_size_bits = stream.packet_context.packet_size.parse(r)?;
481 let content_size_bits = stream.packet_context.content_size.parse(r)?;
482 let beginning_timestamp = stream
483 .packet_context
484 .beginning_timestamp
485 .as_ref()
486 .map(|p| p.parse(r))
487 .transpose()?;
488 let end_timestamp = stream
489 .packet_context
490 .end_timestamp
491 .as_ref()
492 .map(|p| p.parse(r))
493 .transpose()?;
494 let events_discarded = stream
495 .packet_context
496 .events_discarded
497 .as_ref()
498 .map(|p| p.parse(r))
499 .transpose()?;
500 let sequence_number = stream
501 .packet_context
502 .sequence_number
503 .as_ref()
504 .map(|p| p.parse(r))
505 .transpose()?;
506
507 let mut extra_members = Vec::new();
509 for member in stream.packet_context.extra_members.iter() {
510 let val = member.parse(r)?;
511 extra_members.push((member.member_name, val));
512 }
513
514 debug!(
515 packet_size = pkt_size_bits,
516 content_size = content_size_bits,
517 ?events_discarded,
518 ?sequence_number,
519 "Parsed packet context"
520 );
521 debug_assert_eq!(
523 r.cursor_bits(),
524 stream.packet_context.wire_size_hint.cursor_bits()
525 );
526
527 Ok(PacketContext {
528 packet_size_bits: pkt_size_bits as _,
529 content_size_bits: content_size_bits as _,
530 beginning_timestamp,
531 end_timestamp,
532 events_discarded,
533 sequence_number,
534 extra_members,
535 })
536 }
537
538 fn parse_events<R: Read>(
539 stream: &StreamParser,
540 packet_context: &PacketContext,
541 r: &mut StreamReader<R>,
542 ) -> Result<Vec<Event>, Error> {
543 let mut events = Vec::new();
544
545 loop {
547 r.align_to(stream.event_header.alignment)?;
549
550 let event_id = stream.event_header.event_id.parse(r)?;
552 let timestamp = stream.event_header.timestamp.parse(r)?;
553 debug!(event_id, timestamp, "Parsed event header");
554
555 let mut common_context = Vec::new();
558 if let Some(p) = stream.common_context.as_ref() {
559 r.align_to(p.alignment)?;
561
562 for member in p.members.iter() {
564 let val = member.parse(r)?;
565 common_context.push((member.member_name, val));
566 }
567 }
568
569 let event = stream
571 .events
572 .get(&event_id)
573 .ok_or(Error::UndefinedEventId(event_id))?;
574
575 let mut specific_context = Vec::new();
577 if let Some(p) = event.specific_context.as_ref() {
578 r.align_to(p.alignment)?;
580
581 for member in p.members.iter() {
583 let val = member.parse(r)?;
584 specific_context.push((member.member_name, val));
585 }
586 }
587
588 let mut payload = Vec::new();
590 if let Some(p) = event.payload.as_ref() {
591 r.align_to(p.alignment)?;
593
594 for member in p.members.iter() {
596 let val = member.parse(r)?;
597 payload.push((member.member_name, val));
598 }
599 }
600
601 events.push(Event {
602 id: event_id,
603 name: event.event_name,
604 timestamp,
605 log_level: event.log_level.map(LogLevel::from),
606 specific_context,
607 common_context,
608 payload,
609 });
610
611 debug_assert!(r.cursor_bits() <= packet_context.content_size_bits);
613 if r.cursor_bits() == packet_context.content_size_bits {
614 break;
615 }
616 }
617
618 let remaining_bits = packet_context.packet_size_bits - packet_context.content_size_bits;
620 if remaining_bits != 0 {
621 let remaining_bytes = remaining_bits >> 3;
622 for _ in 0..remaining_bytes {
623 r.inner.read_u8()?;
625 }
626 }
627
628 Ok(events)
629 }
630}
631
632#[derive(Debug)]
634pub struct PacketDecoder {
635 parser: Parser,
636 state: PacketDecoderState,
637}
638
639#[derive(Debug)]
640enum PacketDecoderState {
641 Header,
642 PacketContext(PacketHeader, AlignedCursor),
643 Events(PacketHeader, PacketContext, AlignedCursor),
644}
645
646impl Decoder for PacketDecoder {
647 type Item = Packet;
648 type Error = Error;
649
650 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
651 loop {
653 match std::mem::replace(&mut self.state, PacketDecoderState::Header) {
654 PacketDecoderState::Header => {
655 if src.len() < self.parser.pkt_header.wire_size_hint.cursor_bytes() {
656 self.state = PacketDecoderState::Header;
658 return Ok(None);
659 }
660
661 let mut src_reader = src.reader();
662 let mut r = StreamReader::new(self.parser.byte_order, &mut src_reader);
663 let header = self.parser.parse_header(&mut r)?;
664 let cursor = r.into_cursor();
665
666 self.state = PacketDecoderState::PacketContext(header, cursor);
667 }
668 PacketDecoderState::PacketContext(header, cursor) => {
669 let stream = self
671 .parser
672 .streams
673 .get(&header.stream_id)
674 .ok_or(Error::UndefinedStreamId(header.stream_id))?;
675
676 let context_bytes_remaining =
677 stream.packet_context.wire_size_hint.cursor_bytes() - cursor.cursor_bytes();
678 if src.len() < context_bytes_remaining {
679 self.state = PacketDecoderState::PacketContext(header, cursor);
681 return Ok(None);
682 }
683
684 let mut src_reader = src.reader();
685 let mut r = StreamReader::new_with_cursor(
686 self.parser.byte_order,
687 cursor,
688 &mut src_reader,
689 );
690
691 let packet_context = Parser::parse_packet_context(stream, &mut r)?;
692 let cursor = r.into_cursor();
693
694 self.state = PacketDecoderState::Events(header, packet_context, cursor);
695 }
696 PacketDecoderState::Events(header, packet_context, cursor) => {
697 let remaining_bytes = packet_context.packet_size() - cursor.cursor_bytes();
698 if src.len() < remaining_bytes {
699 self.state = PacketDecoderState::Events(header, packet_context, cursor);
701 return Ok(None);
702 }
703
704 let stream = self
705 .parser
706 .streams
707 .get(&header.stream_id)
708 .ok_or(Error::UndefinedStreamId(header.stream_id))?;
709
710 let mut src_reader = src.reader();
711 let mut r = StreamReader::new_with_cursor(
712 self.parser.byte_order,
713 cursor,
714 &mut src_reader,
715 );
716
717 let events = Parser::parse_events(stream, &packet_context, &mut r)?;
718
719 let pkt = Packet {
720 header,
721 context: packet_context,
722 events,
723 };
724 self.state = PacketDecoderState::Header;
725 return Ok(Some(pkt));
726 }
727 }
728 }
729 }
730}