barectf_parser/parser/
mod.rs

1use self::types::{
2    AlignedCursor, EnumerationMappings, EventHeaderParser, EventParser, EventPayloadMemberParser,
3    EventPayloadParser, FieldTypeParser, PacketContextParser, PacketContextParserArgs,
4    PacketHeaderParser, Size, StreamParser, StreamReader, UIntParser, UuidParser,
5};
6use crate::{
7    config::{ClockType, Config, FieldType, NativeByteOrder},
8    error::Error,
9    types::{Event, EventId, LogLevel, Packet, PacketContext, PacketHeader, StreamId},
10};
11use bytes::{Buf, BytesMut};
12use fxhash::FxHashMap;
13use internment::Intern;
14use itertools::Itertools;
15use std::io::Read;
16use tokio_util::codec::Decoder;
17use tracing::{debug, warn};
18use uuid::Uuid;
19
20pub(crate) mod types;
21
/// A barectf CTF byte-stream parser.
///
/// Built from a [`Config`] by [`Parser::new`]. Holds the trace-wide packet
/// header parser plus one set of parsers per data stream type.
#[derive(Debug)]
pub struct Parser {
    /// Byte order handed to every `StreamReader` this parser creates.
    byte_order: NativeByteOrder,
    /// UUID taken from the trace type configuration, if any. A packet header
    /// UUID that differs from it is reported with a warning.
    trace_uuid: Option<Uuid>,
    /// Parser for the packet header structure.
    pkt_header: PacketHeaderParser,
    /// Per-stream parsers, keyed by stream ID.
    streams: FxHashMap<StreamId, StreamParser>,
    /// Default clock type name of each stream that declares one.
    stream_clocks: FxHashMap<StreamId, Intern<String>>,
    /// Clock type of each stream's default clock, when the configuration
    /// defines a clock type with that name.
    stream_clock_types: FxHashMap<StreamId, Intern<ClockType>>,
}
32
33impl Parser {
34    pub fn new(cfg: &Config) -> Result<Self, Error> {
35        // Do some basic semantic checks
36        if let Some(magic_ft) = cfg.trace.typ.features.magic_field_type.as_ft() {
37            if magic_ft.field_type.size != 32 {
38                return Err(Error::UnsupportedFieldType(
39                    "magic-field-type".to_owned(),
40                    magic_ft.field_type.size,
41                    magic_ft.field_type.alignment,
42                ));
43            }
44        }
45
46        let magic = UIntParser::from_opt_uint_ft(&cfg.trace.typ.features.magic_field_type)
47            .map_err(|e| Error::unsupported_ft("magic-field-type", e))?;
48        let uuid = UuidParser::from_bool_ft(cfg.trace.typ.features.uuid_field_type);
49        let stream_id =
50            UIntParser::from_uint_ft(&cfg.trace.typ.features.data_stream_type_id_field_type)
51                .map_err(|e| Error::unsupported_ft("data-stream-type-id-field-type", e))?;
52        let pkt_header = PacketHeaderParser::new(
53            magic,
54            uuid,
55            stream_id,
56            Size::from_bits(cfg.trace.typ.features.alignment())
57                .ok_or_else(|| Error::unsupported_alignment("trace.type.$features"))?,
58        );
59
60        // Per-stream packet parsers
61        // NOTE: barectf generates stream IDs based on alphabetical order of stream name
62        let mut streams = FxHashMap::default();
63        let mut stream_clocks = FxHashMap::default();
64        let mut stream_clock_types = FxHashMap::default();
65        for (stream_id, (stream_name, stream)) in cfg
66            .trace
67            .typ
68            .data_stream_types
69            .iter()
70            .sorted_by_key(|(name, _)| name.as_str())
71            .enumerate()
72        {
73            if let Some(default_clock) = stream.default_clock_type_name.as_ref() {
74                stream_clocks.insert(stream_id as StreamId, Intern::new(default_clock.to_owned()));
75                if let Some(clock_type) = cfg.trace.typ.clock_types.get(default_clock) {
76                    stream_clock_types
77                        .insert(stream_id as StreamId, Intern::new(clock_type.clone()));
78                }
79            }
80
81            // These are required by barectf
82            let packet_size = UIntParser::from_uint_ft(
83                &stream.features.packet.total_size_field_type,
84            )
85            .map_err(|e| {
86                Error::unsupported_ft(
87                    format!(
88                        "stream.{}.$features.packet.total-size-field-type",
89                        stream_name
90                    ),
91                    e,
92                )
93            })?;
94            let content_size = UIntParser::from_uint_ft(
95                &stream.features.packet.content_size_field_type,
96            )
97            .map_err(|e| {
98                Error::unsupported_ft(
99                    format!(
100                        "stream.{}.$features.packet.content-size-field-type",
101                        stream_name
102                    ),
103                    e,
104                )
105            })?;
106            let beginning_timestamp = UIntParser::from_opt_uint_ft(
107                &stream.features.packet.beginning_timestamp_field_type,
108            )
109            .map_err(|e| {
110                Error::unsupported_ft(
111                    format!(
112                        "stream.{}.$features.packet.beginning-timestamp-field-type",
113                        stream_name
114                    ),
115                    e,
116                )
117            })?;
118            let end_timestamp =
119                UIntParser::from_opt_uint_ft(&stream.features.packet.end_timestamp_field_type)
120                    .map_err(|e| {
121                        Error::unsupported_ft(
122                            format!(
123                                "stream.{}.$features.packet.end-timestamp-field-type",
124                                stream_name
125                            ),
126                            e,
127                        )
128                    })?;
129            let events_discarded = UIntParser::from_opt_uint_ft(
130                &stream
131                    .features
132                    .packet
133                    .discarded_event_records_counter_snapshot_field_type,
134            )
135            .map_err(|e| {
136                Error::unsupported_ft(
137                    format!("stream.{}.$features.packet.discarded-event-records-counter-snapshot-field-type", stream_name),
138                    e,
139                )
140            })?;
141            let sequence_number =
142                UIntParser::from_opt_uint_ft(&stream.features.packet.sequence_number_field_type)
143                    .map_err(|e| {
144                        Error::unsupported_ft(
145                            format!(
146                                "stream.{}.$features.packet.sequence-number-field-type",
147                                stream_name
148                            ),
149                            e,
150                        )
151                    })?;
152
153            let mut pc_extra_members = Vec::new();
154            let pc_extra_member_alignment =
155                stream.packet_context_field_type_extra_members.alignment();
156            for (pc_member_name, pc_member) in stream
157                .packet_context_field_type_extra_members
158                .0
159                .iter()
160                .flat_map(|m| m.iter())
161            {
162                pc_extra_members.push(EventPayloadMemberParser {
163                    member_name: Intern::new(pc_member_name.clone()),
164                    preferred_display_base: pc_member.field_type.preferred_display_base(),
165                    enum_mappings: EnumerationMappings::from_struct_ft(&pc_member.field_type),
166                    value: FieldTypeParser::from_ft(&pc_member.field_type).map_err(|e| {
167                        Error::unsupported_ft(
168                            format!(
169                                "stream.{}.packet-context-field-type-extra-members.{}",
170                                stream_name, pc_member_name
171                            ),
172                            e,
173                        )
174                    })?,
175                });
176            }
177
178            // Event common context
179            let common_context = if let Some(cc_field_type) =
180                stream.event_record_common_context_field_type.as_ref()
181            {
182                let mut members = Vec::new();
183                for (member_name, member) in cc_field_type.members.iter().flat_map(|m| m.iter()) {
184                    members.push(EventPayloadMemberParser {
185                        member_name: Intern::new(member_name.clone()),
186                        preferred_display_base: member.field_type.preferred_display_base(),
187                        enum_mappings: EnumerationMappings::from_struct_ft(&member.field_type),
188                        value: FieldTypeParser::from_ft(&member.field_type).map_err(|e| {
189                            Error::unsupported_ft(
190                                format!(
191                                    "stream.{}.event-record-common-context-field-type.{}",
192                                    stream_name, member_name
193                                ),
194                                e,
195                            )
196                        })?,
197                    });
198                }
199
200                Some(EventPayloadParser {
201                    alignment: Size::from_bits(cc_field_type.alignment()).ok_or_else(|| {
202                        Error::unsupported_alignment(format!(
203                            "stream.{}.event-record-common-context-field-type",
204                            stream_name
205                        ))
206                    })?,
207                    members,
208                })
209            } else {
210                None
211            };
212
213            // Per-event event parsers
214            // NOTE: barectf generates event IDs based on alphabetical order of event name
215            let mut events = FxHashMap::default();
216            for (event_id, (event_name, event)) in stream
217                .event_record_types
218                .iter()
219                .sorted_by_key(|(name, _)| name.as_str())
220                .enumerate()
221            {
222                let specific_context = if let Some(sc_field_type) =
223                    event.specific_context_field_type.as_ref()
224                {
225                    let mut members = Vec::new();
226                    for (member_name, member) in sc_field_type.members.iter().flat_map(|m| m.iter())
227                    {
228                        members.push(EventPayloadMemberParser {
229                            member_name: Intern::new(member_name.clone()),
230                            preferred_display_base: member.field_type.preferred_display_base(),
231                            enum_mappings: EnumerationMappings::from_struct_ft(&member.field_type),
232                            value: FieldTypeParser::from_ft(&member.field_type)
233                                .map_err(|e| {
234                                Error::unsupported_ft(
235                                    format!(
236                                        "stream.{}.event-record-types.{}.specific-context-field-type.{}",
237                                        stream_name,
238                                        event_name,
239                                        member_name
240                                    ),
241                                    e,
242                                )
243                            })?,
244                        });
245                    }
246
247                    Some(EventPayloadParser {
248                        alignment: Size::from_bits(sc_field_type.alignment()).ok_or_else(|| {
249                            Error::unsupported_alignment(format!(
250                                "stream.{}.event-record-types.{}.specific-context-field-type",
251                                stream_name, event_name
252                            ))
253                        })?,
254                        members,
255                    })
256                } else {
257                    None
258                };
259
260                let payload = if let Some(payload_field_type) = event.payload_field_type.as_ref() {
261                    let mut members = Vec::new();
262                    for (member_name, member) in
263                        payload_field_type.members.iter().flat_map(|m| m.iter())
264                    {
265                        members.push(EventPayloadMemberParser {
266                            member_name: Intern::new(member_name.clone()),
267                            preferred_display_base: member.field_type.preferred_display_base(),
268                            enum_mappings: EnumerationMappings::from_struct_ft(&member.field_type),
269                            value: FieldTypeParser::from_ft(&member.field_type).map_err(|e| {
270                                Error::unsupported_ft(
271                                    format!(
272                                        "stream.{}.event-record-types.{}.payload-field-type.{}",
273                                        stream_name, event_name, member_name
274                                    ),
275                                    e,
276                                )
277                            })?,
278                        });
279                    }
280
281                    Some(EventPayloadParser {
282                        alignment: Size::from_bits(payload_field_type.alignment()).ok_or_else(
283                            || {
284                                Error::unsupported_alignment(format!(
285                                    "stream.{}.event-record-types.{}.payload-field-type",
286                                    stream_name, event_name
287                                ))
288                            },
289                        )?,
290                        members,
291                    })
292                } else {
293                    None
294                };
295
296                events.insert(
297                    event_id as EventId,
298                    EventParser {
299                        event_name: Intern::new(event_name.clone()),
300                        log_level: event.log_level,
301                        specific_context,
302                        payload,
303                    },
304                );
305            }
306
307            streams.insert(
308                stream_id as StreamId,
309                StreamParser {
310                    stream_name: Intern::new(stream_name.clone()),
311                    packet_context: PacketContextParser::new(
312                        PacketContextParserArgs {
313                            packet_size,
314                            content_size,
315                            beginning_timestamp,
316                            end_timestamp,
317                            events_discarded,
318                            sequence_number,
319                            extra_members: pc_extra_members,
320                            alignment: Size::from_bits(
321                                stream
322                                    .features
323                                    .packet
324                                    .alignment()
325                                    .max(pc_extra_member_alignment),
326                            )
327                            .ok_or_else(|| {
328                                Error::unsupported_alignment(format!(
329                                    "stream.{}.$features.packet",
330                                    stream_name
331                                ))
332                            })?,
333                        },
334                        &pkt_header.wire_size_hint,
335                    ),
336                    event_header: EventHeaderParser {
337                        event_id: UIntParser::from_uint_ft(
338                            &stream.features.event_record.type_id_field_type,
339                        )
340                        .map_err(|e| {
341                            Error::unsupported_ft(
342                                format!(
343                                    "stream.{}.$features.event-record.type-id-field-type",
344                                    stream_name
345                                ),
346                                e,
347                            )
348                        })?,
349                        timestamp: UIntParser::from_uint_ft(
350                            &stream.features.event_record.timestamp_field_type,
351                        )
352                        .map_err(|e| {
353                            Error::unsupported_ft(
354                                format!(
355                                    "stream.{}.$features.event-record.timestamp-field-type",
356                                    stream_name
357                                ),
358                                e,
359                            )
360                        })?,
361                        alignment: Size::from_bits(stream.features.event_record.alignment())
362                            .ok_or_else(|| {
363                                Error::unsupported_alignment(format!(
364                                    "stream.{}.$features.event-record",
365                                    stream_name
366                                ))
367                            })?,
368                    },
369                    common_context,
370                    events,
371                },
372            );
373        }
374
375        Ok(Self {
376            byte_order: cfg.trace.typ.native_byte_order,
377            trace_uuid: cfg.trace.typ.uuid,
378            pkt_header,
379            streams,
380            stream_clocks,
381            stream_clock_types,
382        })
383    }
384
385    pub fn into_packet_decoder(self) -> PacketDecoder {
386        PacketDecoder {
387            parser: self,
388            state: PacketDecoderState::Header,
389        }
390    }
391
392    pub fn parse<R: Read>(&self, r: &mut R) -> Result<Packet, Error> {
393        let mut r = StreamReader::new(self.byte_order, r);
394
395        let header = self.parse_header(&mut r)?;
396
397        // Stream-specific from here on
398        let stream = self
399            .streams
400            .get(&header.stream_id)
401            .ok_or(Error::UndefinedStreamId(header.stream_id))?;
402
403        let context = Self::parse_packet_context(stream, &mut r)?;
404
405        let events = Self::parse_events(stream, &context, &mut r)?;
406
407        Ok(Packet {
408            header,
409            context,
410            events,
411        })
412    }
413
414    fn parse_header<R: Read>(&self, r: &mut StreamReader<R>) -> Result<PacketHeader, Error> {
415        // Align for packet header structure
416        r.align_to(self.pkt_header.alignment)?;
417
418        // Parse the header
419        let magic = self
420            .pkt_header
421            .magic
422            .as_ref()
423            .map(|p| p.parse(r))
424            .transpose()?
425            .map(|m| m as u32);
426        let trace_uuid = self
427            .pkt_header
428            .uuid
429            .as_ref()
430            .map(|p| p.parse(r))
431            .transpose()?;
432        let stream_id = self.pkt_header.stream_id.parse(r)?;
433        debug!(stream_id, ?magic, ?trace_uuid, "Parsed packet header");
434        if let Some(m) = magic {
435            if m != PacketHeader::MAGIC {
436                warn!(
437                    "Invalid packet header magic number 0x{m:X} (expected 0x{:X})",
438                    PacketHeader::MAGIC
439                );
440            }
441        }
442        if let (Some(uuid), Some(schema_uuid)) = (trace_uuid.as_ref(), self.trace_uuid.as_ref()) {
443            if uuid != schema_uuid {
444                warn!(
445                    trace_uuid = %uuid,
446                    %schema_uuid, "Trace type UUID doesn't match"
447                );
448            }
449        }
450
451        // Ensure the wire size hint is aligned with reality
452        debug_assert_eq!(
453            r.cursor_bits(),
454            self.pkt_header.wire_size_hint.cursor_bits()
455        );
456
457        let stream = self
458            .streams
459            .get(&stream_id)
460            .ok_or(Error::UndefinedStreamId(stream_id))?;
461
462        Ok(PacketHeader {
463            magic_number: magic,
464            trace_uuid,
465            stream_id,
466            stream_name: stream.stream_name,
467            clock_name: self.stream_clocks.get(&stream_id).copied(),
468            clock_type: self.stream_clock_types.get(&stream_id).copied(),
469        })
470    }
471
472    fn parse_packet_context<R: Read>(
473        stream: &StreamParser,
474        r: &mut StreamReader<R>,
475    ) -> Result<PacketContext, Error> {
476        // Align for packet context structure
477        r.align_to(stream.packet_context.alignment)?;
478
479        // Parse packet context
480        let pkt_size_bits = stream.packet_context.packet_size.parse(r)?;
481        let content_size_bits = stream.packet_context.content_size.parse(r)?;
482        let beginning_timestamp = stream
483            .packet_context
484            .beginning_timestamp
485            .as_ref()
486            .map(|p| p.parse(r))
487            .transpose()?;
488        let end_timestamp = stream
489            .packet_context
490            .end_timestamp
491            .as_ref()
492            .map(|p| p.parse(r))
493            .transpose()?;
494        let events_discarded = stream
495            .packet_context
496            .events_discarded
497            .as_ref()
498            .map(|p| p.parse(r))
499            .transpose()?;
500        let sequence_number = stream
501            .packet_context
502            .sequence_number
503            .as_ref()
504            .map(|p| p.parse(r))
505            .transpose()?;
506
507        // Align for and read each extra member
508        let mut extra_members = Vec::new();
509        for member in stream.packet_context.extra_members.iter() {
510            let val = member.parse(r)?;
511            extra_members.push((member.member_name, val));
512        }
513
514        debug!(
515            packet_size = pkt_size_bits,
516            content_size = content_size_bits,
517            ?events_discarded,
518            ?sequence_number,
519            "Parsed packet context"
520        );
521        // Ensure the wire size hint is aligned with reality
522        debug_assert_eq!(
523            r.cursor_bits(),
524            stream.packet_context.wire_size_hint.cursor_bits()
525        );
526
527        Ok(PacketContext {
528            packet_size_bits: pkt_size_bits as _,
529            content_size_bits: content_size_bits as _,
530            beginning_timestamp,
531            end_timestamp,
532            events_discarded,
533            sequence_number,
534            extra_members,
535        })
536    }
537
538    fn parse_events<R: Read>(
539        stream: &StreamParser,
540        packet_context: &PacketContext,
541        r: &mut StreamReader<R>,
542    ) -> Result<Vec<Event>, Error> {
543        let mut events = Vec::new();
544
545        // Read until we reach the end of the actual packet content
546        loop {
547            // Align for header structure
548            r.align_to(stream.event_header.alignment)?;
549
550            // Parse event header structure
551            let event_id = stream.event_header.event_id.parse(r)?;
552            let timestamp = stream.event_header.timestamp.parse(r)?;
553            debug!(event_id, timestamp, "Parsed event header");
554
555            // Align for common context structure
556            // Common context
557            let mut common_context = Vec::new();
558            if let Some(p) = stream.common_context.as_ref() {
559                // Align for common context structure
560                r.align_to(p.alignment)?;
561
562                // Align for and read each member
563                for member in p.members.iter() {
564                    let val = member.parse(r)?;
565                    common_context.push((member.member_name, val));
566                }
567            }
568
569            // Event-specific from here on
570            let event = stream
571                .events
572                .get(&event_id)
573                .ok_or(Error::UndefinedEventId(event_id))?;
574
575            // Specific context
576            let mut specific_context = Vec::new();
577            if let Some(p) = event.specific_context.as_ref() {
578                // Align for specific context structure
579                r.align_to(p.alignment)?;
580
581                // Align for and read each member
582                for member in p.members.iter() {
583                    let val = member.parse(r)?;
584                    specific_context.push((member.member_name, val));
585                }
586            }
587
588            // Payload
589            let mut payload = Vec::new();
590            if let Some(p) = event.payload.as_ref() {
591                // Align for payload structure
592                r.align_to(p.alignment)?;
593
594                // Align for and read each member
595                for member in p.members.iter() {
596                    let val = member.parse(r)?;
597                    payload.push((member.member_name, val));
598                }
599            }
600
601            events.push(Event {
602                id: event_id,
603                name: event.event_name,
604                timestamp,
605                log_level: event.log_level.map(LogLevel::from),
606                specific_context,
607                common_context,
608                payload,
609            });
610
611            // Done with actual packet data, may still be residual bits to skip over
612            debug_assert!(r.cursor_bits() <= packet_context.content_size_bits);
613            if r.cursor_bits() == packet_context.content_size_bits {
614                break;
615            }
616        }
617
618        // Skip the remaining in the packet
619        let remaining_bits = packet_context.packet_size_bits - packet_context.content_size_bits;
620        if remaining_bits != 0 {
621            let remaining_bytes = remaining_bits >> 3;
622            for _ in 0..remaining_bytes {
623                // No need to maintain alignment/etc, we're done with the reader
624                r.inner.read_u8()?;
625            }
626        }
627
628        Ok(events)
629    }
630}
631
/// A barectf CTF byte-stream decoder.
///
/// Implements [`Decoder`], producing one [`Packet`] at a time from a byte
/// buffer that may be filled incrementally.
#[derive(Debug)]
pub struct PacketDecoder {
    /// Parser holding the per-stream parsers used to decode each packet.
    parser: Parser,
    /// Which part of the current packet is expected next, together with what
    /// has been parsed from it so far.
    state: PacketDecoderState,
}
638
/// Progress of [`PacketDecoder`] through the packet currently being decoded.
#[derive(Debug)]
enum PacketDecoderState {
    /// Waiting for enough bytes to parse the packet header.
    Header,
    /// The header has been parsed; waiting for the packet context. Carries the
    /// reader cursor as it was after the header.
    PacketContext(PacketHeader, AlignedCursor),
    /// The header and packet context have been parsed; waiting for the rest of
    /// the packet. Carries the reader cursor as it was after the context.
    Events(PacketHeader, PacketContext, AlignedCursor),
}
645
646impl Decoder for PacketDecoder {
647    type Item = Packet;
648    type Error = Error;
649
650    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
651        // Loop until we've got a full packet or need more data
652        loop {
653            match std::mem::replace(&mut self.state, PacketDecoderState::Header) {
654                PacketDecoderState::Header => {
655                    if src.len() < self.parser.pkt_header.wire_size_hint.cursor_bytes() {
656                        // Not enough data for header
657                        self.state = PacketDecoderState::Header;
658                        return Ok(None);
659                    }
660
661                    let mut src_reader = src.reader();
662                    let mut r = StreamReader::new(self.parser.byte_order, &mut src_reader);
663                    let header = self.parser.parse_header(&mut r)?;
664                    let cursor = r.into_cursor();
665
666                    self.state = PacketDecoderState::PacketContext(header, cursor);
667                }
668                PacketDecoderState::PacketContext(header, cursor) => {
669                    // Stream-specific from here on
670                    let stream = self
671                        .parser
672                        .streams
673                        .get(&header.stream_id)
674                        .ok_or(Error::UndefinedStreamId(header.stream_id))?;
675
676                    let context_bytes_remaining =
677                        stream.packet_context.wire_size_hint.cursor_bytes() - cursor.cursor_bytes();
678                    if src.len() < context_bytes_remaining {
679                        // Not enough data for context
680                        self.state = PacketDecoderState::PacketContext(header, cursor);
681                        return Ok(None);
682                    }
683
684                    let mut src_reader = src.reader();
685                    let mut r = StreamReader::new_with_cursor(
686                        self.parser.byte_order,
687                        cursor,
688                        &mut src_reader,
689                    );
690
691                    let packet_context = Parser::parse_packet_context(stream, &mut r)?;
692                    let cursor = r.into_cursor();
693
694                    self.state = PacketDecoderState::Events(header, packet_context, cursor);
695                }
696                PacketDecoderState::Events(header, packet_context, cursor) => {
697                    let remaining_bytes = packet_context.packet_size() - cursor.cursor_bytes();
698                    if src.len() < remaining_bytes {
699                        // Not enough data for the remaining payload
700                        self.state = PacketDecoderState::Events(header, packet_context, cursor);
701                        return Ok(None);
702                    }
703
704                    let stream = self
705                        .parser
706                        .streams
707                        .get(&header.stream_id)
708                        .ok_or(Error::UndefinedStreamId(header.stream_id))?;
709
710                    let mut src_reader = src.reader();
711                    let mut r = StreamReader::new_with_cursor(
712                        self.parser.byte_order,
713                        cursor,
714                        &mut src_reader,
715                    );
716
717                    let events = Parser::parse_events(stream, &packet_context, &mut r)?;
718
719                    let pkt = Packet {
720                        header,
721                        context: packet_context,
722                        events,
723                    };
724                    self.state = PacketDecoderState::Header;
725                    return Ok(Some(pkt));
726                }
727            }
728        }
729    }
730}