barectf_parser/parser/mod.rs

1use self::types::{
2    AlignedCursor, EnumerationMappings, EventHeaderParser, EventParser, EventPayloadMemberParser,
3    EventPayloadParser, FieldTypeParser, PacketContextParser, PacketContextParserArgs,
4    PacketHeaderParser, Size, StreamParser, StreamReader, UIntParser, UuidParser,
5};
6use crate::{
7    config::{Config, FieldType, NativeByteOrder},
8    error::Error,
9    types::{Event, EventId, LogLevel, Packet, PacketContext, PacketHeader, StreamId},
10};
11use bytes::{Buf, BytesMut};
12use fxhash::FxHashMap;
13use internment::Intern;
14use itertools::Itertools;
15use std::io::Read;
16use tokio_util::codec::Decoder;
17use tracing::{debug, warn};
18use uuid::Uuid;
19
20pub(crate) mod types;
21
/// A barectf CTF byte-stream parser.
///
/// Built once from a barectf [`Config`] with [`Parser::new`]. It holds the
/// packet-header parser shared by every stream plus one [`StreamParser`] per
/// data stream type. Use [`Parser::parse`] with a blocking reader, or
/// [`Parser::into_packet_decoder`] to obtain a `tokio_util` decoder.
#[derive(Debug)]
pub struct Parser {
    /// Byte order used for every read (taken from `cfg.trace.typ.native_byte_order`).
    byte_order: NativeByteOrder,
    /// Trace type UUID from the config; packet-header UUIDs are compared against it
    /// and a mismatch is logged as a warning.
    trace_uuid: Option<Uuid>,
    /// Parser for the packet header, which is common to all streams.
    pkt_header: PacketHeaderParser,
    /// Per-stream packet parsers, keyed by stream ID (assigned in alphabetical
    /// order of stream name).
    streams: FxHashMap<StreamId, StreamParser>,
    /// Default clock type name for each stream that declares one.
    stream_clocks: FxHashMap<StreamId, Intern<String>>,
}
31
32impl Parser {
    /// Builds a parser from a barectf [`Config`].
    ///
    /// Stream IDs and event IDs are assigned by sorting the data stream type
    /// names and event record type names alphabetically and numbering them in
    /// that order, matching how barectf numbers them.
    ///
    /// # Errors
    ///
    /// * [`Error::UnsupportedFieldType`] if a magic field type is configured
    ///   with a size other than 32 bits.
    /// * The errors built by `Error::unsupported_ft` / `Error::unsupported_alignment`
    ///   when a configured field type or alignment cannot be handled. Each error
    ///   carries the config path of the offending entry.
    pub fn new(cfg: &Config) -> Result<Self, Error> {
        // Do some basic semantic checks
        // (`parse_header` narrows the decoded magic value to `u32`, so only a
        // 32-bit magic field is accepted)
        if let Some(magic_ft) = cfg.trace.typ.features.magic_field_type.as_ft() {
            if magic_ft.field_type.size != 32 {
                return Err(Error::UnsupportedFieldType(
                    "magic-field-type".to_owned(),
                    magic_ft.field_type.size,
                    magic_ft.field_type.alignment,
                ));
            }
        }

        // Packet header parser, shared by every stream
        let magic = UIntParser::from_opt_uint_ft(&cfg.trace.typ.features.magic_field_type)
            .map_err(|e| Error::unsupported_ft("magic-field-type", e))?;
        let uuid = UuidParser::from_bool_ft(cfg.trace.typ.features.uuid_field_type);
        let stream_id =
            UIntParser::from_uint_ft(&cfg.trace.typ.features.data_stream_type_id_field_type)
                .map_err(|e| Error::unsupported_ft("data-stream-type-id-field-type", e))?;
        let pkt_header = PacketHeaderParser::new(
            magic,
            uuid,
            stream_id,
            Size::from_bits(cfg.trace.typ.features.alignment())
                .ok_or_else(|| Error::unsupported_alignment("trace.type.$features"))?,
        );

        // Per-stream packet parsers
        // NOTE: barectf generates stream IDs based on alphabetical order of stream name
        // (the loop's `stream_id` is that enumeration index; it shadows the header
        // parser above, which has already been moved into `pkt_header`)
        let mut streams = FxHashMap::default();
        let mut stream_clocks = FxHashMap::default();
        for (stream_id, (stream_name, stream)) in cfg
            .trace
            .typ
            .data_stream_types
            .iter()
            .sorted_by_key(|(name, _)| name.as_str())
            .enumerate()
        {
            // Remember the stream's default clock, reported later in each packet header
            if let Some(default_clock) = stream.default_clock_type_name.as_ref() {
                stream_clocks.insert(stream_id as StreamId, Intern::new(default_clock.to_owned()));
            }

            // These are required by barectf
            let packet_size = UIntParser::from_uint_ft(
                &stream.features.packet.total_size_field_type,
            )
            .map_err(|e| {
                Error::unsupported_ft(
                    format!(
                        "stream.{}.$features.packet.total-size-field-type",
                        stream_name
                    ),
                    e,
                )
            })?;
            let content_size = UIntParser::from_uint_ft(
                &stream.features.packet.content_size_field_type,
            )
            .map_err(|e| {
                Error::unsupported_ft(
                    format!(
                        "stream.{}.$features.packet.content-size-field-type",
                        stream_name
                    ),
                    e,
                )
            })?;
            // Optional packet context fields (`None` when the feature is disabled)
            let beginning_timestamp = UIntParser::from_opt_uint_ft(
                &stream.features.packet.beginning_timestamp_field_type,
            )
            .map_err(|e| {
                Error::unsupported_ft(
                    format!(
                        "stream.{}.$features.packet.beginning-timestamp-field-type",
                        stream_name
                    ),
                    e,
                )
            })?;
            let end_timestamp =
                UIntParser::from_opt_uint_ft(&stream.features.packet.end_timestamp_field_type)
                    .map_err(|e| {
                        Error::unsupported_ft(
                            format!(
                                "stream.{}.$features.packet.end-timestamp-field-type",
                                stream_name
                            ),
                            e,
                        )
                    })?;
            let events_discarded = UIntParser::from_opt_uint_ft(
                &stream
                    .features
                    .packet
                    .discarded_event_records_counter_snapshot_field_type,
            )
            .map_err(|e| {
                Error::unsupported_ft(
                    format!("stream.{}.$features.packet.discarded-event-records-counter-snapshot-field-type", stream_name),
                    e,
                )
            })?;
            let sequence_number =
                UIntParser::from_opt_uint_ft(&stream.features.packet.sequence_number_field_type)
                    .map_err(|e| {
                        Error::unsupported_ft(
                            format!(
                                "stream.{}.$features.packet.sequence-number-field-type",
                                stream_name
                            ),
                            e,
                        )
                    })?;

            // User-defined packet context members, parsed after the standard fields.
            // Their alignment also contributes to the packet context's alignment below.
            let mut pc_extra_members = Vec::new();
            let pc_extra_member_alignment =
                stream.packet_context_field_type_extra_members.alignment();
            for (pc_member_name, pc_member) in stream
                .packet_context_field_type_extra_members
                .0
                .iter()
                .flat_map(|m| m.iter())
            {
                pc_extra_members.push(EventPayloadMemberParser {
                    member_name: Intern::new(pc_member_name.clone()),
                    preferred_display_base: pc_member.field_type.preferred_display_base(),
                    enum_mappings: EnumerationMappings::from_struct_ft(&pc_member.field_type),
                    value: FieldTypeParser::from_ft(&pc_member.field_type).map_err(|e| {
                        Error::unsupported_ft(
                            format!(
                                "stream.{}.packet-context-field-type-extra-members.{}",
                                stream_name, pc_member_name
                            ),
                            e,
                        )
                    })?,
                });
            }

            // Event common context
            let common_context = if let Some(cc_field_type) =
                stream.event_record_common_context_field_type.as_ref()
            {
                let mut members = Vec::new();
                for (member_name, member) in cc_field_type.members.iter().flat_map(|m| m.iter()) {
                    members.push(EventPayloadMemberParser {
                        member_name: Intern::new(member_name.clone()),
                        preferred_display_base: member.field_type.preferred_display_base(),
                        enum_mappings: EnumerationMappings::from_struct_ft(&member.field_type),
                        value: FieldTypeParser::from_ft(&member.field_type).map_err(|e| {
                            Error::unsupported_ft(
                                format!(
                                    "stream.{}.event-record-common-context-field-type.{}",
                                    stream_name, member_name
                                ),
                                e,
                            )
                        })?,
                    });
                }

                Some(EventPayloadParser {
                    alignment: Size::from_bits(cc_field_type.alignment()).ok_or_else(|| {
                        Error::unsupported_alignment(format!(
                            "stream.{}.event-record-common-context-field-type",
                            stream_name
                        ))
                    })?,
                    members,
                })
            } else {
                None
            };

            // Per-event event parsers
            // NOTE: barectf generates event IDs based on alphabetical order of event name
            let mut events = FxHashMap::default();
            for (event_id, (event_name, event)) in stream
                .event_record_types
                .iter()
                .sorted_by_key(|(name, _)| name.as_str())
                .enumerate()
            {
                // Optional event-specific context structure
                let specific_context = if let Some(sc_field_type) =
                    event.specific_context_field_type.as_ref()
                {
                    let mut members = Vec::new();
                    for (member_name, member) in sc_field_type.members.iter().flat_map(|m| m.iter())
                    {
                        members.push(EventPayloadMemberParser {
                            member_name: Intern::new(member_name.clone()),
                            preferred_display_base: member.field_type.preferred_display_base(),
                            enum_mappings: EnumerationMappings::from_struct_ft(&member.field_type),
                            value: FieldTypeParser::from_ft(&member.field_type)
                                .map_err(|e| {
                                Error::unsupported_ft(
                                    format!(
                                        "stream.{}.event-record-types.{}.specific-context-field-type.{}",
                                        stream_name,
                                        event_name,
                                        member_name
                                    ),
                                    e,
                                )
                            })?,
                        });
                    }

                    Some(EventPayloadParser {
                        alignment: Size::from_bits(sc_field_type.alignment()).ok_or_else(|| {
                            Error::unsupported_alignment(format!(
                                "stream.{}.event-record-types.{}.specific-context-field-type",
                                stream_name, event_name
                            ))
                        })?,
                        members,
                    })
                } else {
                    None
                };

                // Optional event payload structure
                let payload = if let Some(payload_field_type) = event.payload_field_type.as_ref() {
                    let mut members = Vec::new();
                    for (member_name, member) in
                        payload_field_type.members.iter().flat_map(|m| m.iter())
                    {
                        members.push(EventPayloadMemberParser {
                            member_name: Intern::new(member_name.clone()),
                            preferred_display_base: member.field_type.preferred_display_base(),
                            enum_mappings: EnumerationMappings::from_struct_ft(&member.field_type),
                            value: FieldTypeParser::from_ft(&member.field_type).map_err(|e| {
                                Error::unsupported_ft(
                                    format!(
                                        "stream.{}.event-record-types.{}.payload-field-type.{}",
                                        stream_name, event_name, member_name
                                    ),
                                    e,
                                )
                            })?,
                        });
                    }

                    Some(EventPayloadParser {
                        alignment: Size::from_bits(payload_field_type.alignment()).ok_or_else(
                            || {
                                Error::unsupported_alignment(format!(
                                    "stream.{}.event-record-types.{}.payload-field-type",
                                    stream_name, event_name
                                ))
                            },
                        )?,
                        members,
                    })
                } else {
                    None
                };

                events.insert(
                    event_id as EventId,
                    EventParser {
                        event_name: Intern::new(event_name.clone()),
                        log_level: event.log_level,
                        specific_context,
                        payload,
                    },
                );
            }

            streams.insert(
                stream_id as StreamId,
                StreamParser {
                    stream_name: Intern::new(stream_name.clone()),
                    // The packet context parser receives the header's wire size hint,
                    // presumably so its own hint starts where the header ends
                    packet_context: PacketContextParser::new(
                        PacketContextParserArgs {
                            packet_size,
                            content_size,
                            beginning_timestamp,
                            end_timestamp,
                            events_discarded,
                            sequence_number,
                            extra_members: pc_extra_members,
                            // Context alignment is the larger of the standard fields'
                            // alignment and the extra members' alignment
                            alignment: Size::from_bits(
                                stream
                                    .features
                                    .packet
                                    .alignment()
                                    .max(pc_extra_member_alignment),
                            )
                            .ok_or_else(|| {
                                Error::unsupported_alignment(format!(
                                    "stream.{}.$features.packet",
                                    stream_name
                                ))
                            })?,
                        },
                        &pkt_header.wire_size_hint,
                    ),
                    event_header: EventHeaderParser {
                        event_id: UIntParser::from_uint_ft(
                            &stream.features.event_record.type_id_field_type,
                        )
                        .map_err(|e| {
                            Error::unsupported_ft(
                                format!(
                                    "stream.{}.$features.event-record.type-id-field-type",
                                    stream_name
                                ),
                                e,
                            )
                        })?,
                        timestamp: UIntParser::from_uint_ft(
                            &stream.features.event_record.timestamp_field_type,
                        )
                        .map_err(|e| {
                            Error::unsupported_ft(
                                format!(
                                    "stream.{}.$features.event-record.timestamp-field-type",
                                    stream_name
                                ),
                                e,
                            )
                        })?,
                        alignment: Size::from_bits(stream.features.event_record.alignment())
                            .ok_or_else(|| {
                                Error::unsupported_alignment(format!(
                                    "stream.{}.$features.event-record",
                                    stream_name
                                ))
                            })?,
                    },
                    common_context,
                    events,
                },
            );
        }

        Ok(Self {
            byte_order: cfg.trace.typ.native_byte_order,
            trace_uuid: cfg.trace.typ.uuid,
            pkt_header,
            streams,
            stream_clocks,
        })
    }
377
378    pub fn into_packet_decoder(self) -> PacketDecoder {
379        PacketDecoder {
380            parser: self,
381            state: PacketDecoderState::Header,
382        }
383    }
384
385    pub fn parse<R: Read>(&self, r: &mut R) -> Result<Packet, Error> {
386        let mut r = StreamReader::new(self.byte_order, r);
387
388        let header = self.parse_header(&mut r)?;
389
390        // Stream-specific from here on
391        let stream = self
392            .streams
393            .get(&header.stream_id)
394            .ok_or(Error::UndefinedStreamId(header.stream_id))?;
395
396        let context = Self::parse_packet_context(stream, &mut r)?;
397
398        let events = Self::parse_events(stream, &context, &mut r)?;
399
400        Ok(Packet {
401            header,
402            context,
403            events,
404        })
405    }
406
407    fn parse_header<R: Read>(&self, r: &mut StreamReader<R>) -> Result<PacketHeader, Error> {
408        // Align for packet header structure
409        r.align_to(self.pkt_header.alignment)?;
410
411        // Parse the header
412        let magic = self
413            .pkt_header
414            .magic
415            .as_ref()
416            .map(|p| p.parse(r))
417            .transpose()?
418            .map(|m| m as u32);
419        let trace_uuid = self
420            .pkt_header
421            .uuid
422            .as_ref()
423            .map(|p| p.parse(r))
424            .transpose()?;
425        let stream_id = self.pkt_header.stream_id.parse(r)?;
426        debug!(stream_id, ?magic, ?trace_uuid, "Parsed packet header");
427        if let Some(m) = magic {
428            if m != PacketHeader::MAGIC {
429                warn!(
430                    "Invalid packet header magic number 0x{m:X} (expected 0x{:X})",
431                    PacketHeader::MAGIC
432                );
433            }
434        }
435        if let (Some(uuid), Some(schema_uuid)) = (trace_uuid.as_ref(), self.trace_uuid.as_ref()) {
436            if uuid != schema_uuid {
437                warn!(
438                    trace_uuid = %uuid,
439                    %schema_uuid, "Trace type UUID doesn't match"
440                );
441            }
442        }
443
444        // Ensure the wire size hint is aligned with reality
445        debug_assert_eq!(
446            r.cursor_bits(),
447            self.pkt_header.wire_size_hint.cursor_bits()
448        );
449
450        let stream = self
451            .streams
452            .get(&stream_id)
453            .ok_or(Error::UndefinedStreamId(stream_id))?;
454
455        Ok(PacketHeader {
456            magic_number: magic,
457            trace_uuid,
458            stream_id,
459            stream_name: stream.stream_name,
460            clock_name: self.stream_clocks.get(&stream_id).copied(),
461        })
462    }
463
464    fn parse_packet_context<R: Read>(
465        stream: &StreamParser,
466        r: &mut StreamReader<R>,
467    ) -> Result<PacketContext, Error> {
468        // Align for packet context structure
469        r.align_to(stream.packet_context.alignment)?;
470
471        // Parse packet context
472        let pkt_size_bits = stream.packet_context.packet_size.parse(r)?;
473        let content_size_bits = stream.packet_context.content_size.parse(r)?;
474        let beginning_timestamp = stream
475            .packet_context
476            .beginning_timestamp
477            .as_ref()
478            .map(|p| p.parse(r))
479            .transpose()?;
480        let end_timestamp = stream
481            .packet_context
482            .end_timestamp
483            .as_ref()
484            .map(|p| p.parse(r))
485            .transpose()?;
486        let events_discarded = stream
487            .packet_context
488            .events_discarded
489            .as_ref()
490            .map(|p| p.parse(r))
491            .transpose()?;
492        let sequence_number = stream
493            .packet_context
494            .sequence_number
495            .as_ref()
496            .map(|p| p.parse(r))
497            .transpose()?;
498
499        // Align for and read each extra member
500        let mut extra_members = Vec::new();
501        for member in stream.packet_context.extra_members.iter() {
502            let val = member.parse(r)?;
503            extra_members.push((member.member_name, val));
504        }
505
506        debug!(
507            packet_size = pkt_size_bits,
508            content_size = content_size_bits,
509            ?events_discarded,
510            ?sequence_number,
511            "Parsed packet context"
512        );
513        // Ensure the wire size hint is aligned with reality
514        debug_assert_eq!(
515            r.cursor_bits(),
516            stream.packet_context.wire_size_hint.cursor_bits()
517        );
518
519        Ok(PacketContext {
520            packet_size_bits: pkt_size_bits as _,
521            content_size_bits: content_size_bits as _,
522            beginning_timestamp,
523            end_timestamp,
524            events_discarded,
525            sequence_number,
526            extra_members,
527        })
528    }
529
530    fn parse_events<R: Read>(
531        stream: &StreamParser,
532        packet_context: &PacketContext,
533        r: &mut StreamReader<R>,
534    ) -> Result<Vec<Event>, Error> {
535        let mut events = Vec::new();
536
537        // Read until we reach the end of the actual packet content
538        loop {
539            // Align for header structure
540            r.align_to(stream.event_header.alignment)?;
541
542            // Parse event header structure
543            let event_id = stream.event_header.event_id.parse(r)?;
544            let timestamp = stream.event_header.timestamp.parse(r)?;
545            debug!(event_id, timestamp, "Parsed event header");
546
547            // Align for common context structure
548            // Common context
549            let mut common_context = Vec::new();
550            if let Some(p) = stream.common_context.as_ref() {
551                // Align for common context structure
552                r.align_to(p.alignment)?;
553
554                // Align for and read each member
555                for member in p.members.iter() {
556                    let val = member.parse(r)?;
557                    common_context.push((member.member_name, val));
558                }
559            }
560
561            // Event-specific from here on
562            let event = stream
563                .events
564                .get(&event_id)
565                .ok_or(Error::UndefinedEventId(event_id))?;
566
567            // Specific context
568            let mut specific_context = Vec::new();
569            if let Some(p) = event.specific_context.as_ref() {
570                // Align for specific context structure
571                r.align_to(p.alignment)?;
572
573                // Align for and read each member
574                for member in p.members.iter() {
575                    let val = member.parse(r)?;
576                    specific_context.push((member.member_name, val));
577                }
578            }
579
580            // Payload
581            let mut payload = Vec::new();
582            if let Some(p) = event.payload.as_ref() {
583                // Align for payload structure
584                r.align_to(p.alignment)?;
585
586                // Align for and read each member
587                for member in p.members.iter() {
588                    let val = member.parse(r)?;
589                    payload.push((member.member_name, val));
590                }
591            }
592
593            events.push(Event {
594                id: event_id,
595                name: event.event_name,
596                timestamp,
597                log_level: event.log_level.map(LogLevel::from),
598                specific_context,
599                common_context,
600                payload,
601            });
602
603            // Done with actual packet data, may still be residual bits to skip over
604            debug_assert!(r.cursor_bits() <= packet_context.content_size_bits);
605            if r.cursor_bits() == packet_context.content_size_bits {
606                break;
607            }
608        }
609
610        // Skip the remaining in the packet
611        let remaining_bits = packet_context.packet_size_bits - packet_context.content_size_bits;
612        if remaining_bits != 0 {
613            let remaining_bytes = remaining_bits >> 3;
614            for _ in 0..remaining_bytes {
615                // No need to maintain alignment/etc, we're done with the reader
616                r.inner.read_u8()?;
617            }
618        }
619
620        Ok(events)
621    }
622}
623
/// A barectf CTF byte-stream decoder.
///
/// Implements [`tokio_util::codec::Decoder`], yielding one [`Packet`] each
/// time a complete packet is available in the input buffer. Create one with
/// [`Parser::into_packet_decoder`].
#[derive(Debug)]
pub struct PacketDecoder {
    /// The configured parser used to decode every packet.
    parser: Parser,
    /// How far decoding of the current packet has progressed.
    state: PacketDecoderState,
}
630
/// Incremental decoding progress of a [`PacketDecoder`].
///
/// Each variant keeps what has been decoded so far, plus the reader cursor
/// saved with `into_cursor()`. Decoding can therefore resume when more bytes
/// arrive.
#[derive(Debug)]
enum PacketDecoderState {
    /// At the start of a packet, waiting to decode its header.
    Header,
    /// Header decoded; waiting to decode the packet context.
    PacketContext(PacketHeader, AlignedCursor),
    /// Header and context decoded; waiting for the rest of the packet's bytes.
    Events(PacketHeader, PacketContext, AlignedCursor),
}
637
638impl Decoder for PacketDecoder {
639    type Item = Packet;
640    type Error = Error;
641
642    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
643        // Loop until we've got a full packet or need more data
644        loop {
645            match std::mem::replace(&mut self.state, PacketDecoderState::Header) {
646                PacketDecoderState::Header => {
647                    if src.len() < self.parser.pkt_header.wire_size_hint.cursor_bytes() {
648                        // Not enough data for header
649                        self.state = PacketDecoderState::Header;
650                        return Ok(None);
651                    }
652
653                    let mut src_reader = src.reader();
654                    let mut r = StreamReader::new(self.parser.byte_order, &mut src_reader);
655                    let header = self.parser.parse_header(&mut r)?;
656                    let cursor = r.into_cursor();
657
658                    self.state = PacketDecoderState::PacketContext(header, cursor);
659                }
660                PacketDecoderState::PacketContext(header, cursor) => {
661                    // Stream-specific from here on
662                    let stream = self
663                        .parser
664                        .streams
665                        .get(&header.stream_id)
666                        .ok_or(Error::UndefinedStreamId(header.stream_id))?;
667
668                    let context_bytes_remaining =
669                        stream.packet_context.wire_size_hint.cursor_bytes() - cursor.cursor_bytes();
670                    if src.len() < context_bytes_remaining {
671                        // Not enough data for context
672                        self.state = PacketDecoderState::PacketContext(header, cursor);
673                        return Ok(None);
674                    }
675
676                    let mut src_reader = src.reader();
677                    let mut r = StreamReader::new_with_cursor(
678                        self.parser.byte_order,
679                        cursor,
680                        &mut src_reader,
681                    );
682
683                    let packet_context = Parser::parse_packet_context(stream, &mut r)?;
684                    let cursor = r.into_cursor();
685
686                    self.state = PacketDecoderState::Events(header, packet_context, cursor);
687                }
688                PacketDecoderState::Events(header, packet_context, cursor) => {
689                    let remaining_bytes = packet_context.packet_size() - cursor.cursor_bytes();
690                    if src.len() < remaining_bytes {
691                        // Not enough data for the remaining payload
692                        self.state = PacketDecoderState::Events(header, packet_context, cursor);
693                        return Ok(None);
694                    }
695
696                    let stream = self
697                        .parser
698                        .streams
699                        .get(&header.stream_id)
700                        .ok_or(Error::UndefinedStreamId(header.stream_id))?;
701
702                    let mut src_reader = src.reader();
703                    let mut r = StreamReader::new_with_cursor(
704                        self.parser.byte_order,
705                        cursor,
706                        &mut src_reader,
707                    );
708
709                    let events = Parser::parse_events(stream, &packet_context, &mut r)?;
710
711                    let pkt = Packet {
712                        header,
713                        context: packet_context,
714                        events,
715                    };
716                    self.state = PacketDecoderState::Header;
717                    return Ok(Some(pkt));
718                }
719            }
720        }
721    }
722}