// linux_perf_data/file_reader.rs

1use byteorder::{BigEndian, ByteOrder, LittleEndian};
2use linear_map::LinearMap;
3use linux_perf_event_reader::{
4    get_record_id, get_record_identifier, get_record_timestamp, AttrFlags, Endianness,
5    PerfEventHeader, RawData, RawEventRecord, RecordIdParseInfo, RecordParseInfo, RecordType,
6    SampleFormat,
7};
8
9use std::collections::{HashMap, VecDeque};
10use std::io::{Cursor, Read, Seek, SeekFrom};
11
12#[cfg(feature = "zstd")]
13use crate::decompression::ZstdDecompressor;
14
15use super::auxtrace::Auxtrace;
16use super::error::{Error, ReadError};
17use super::feature_sections::AttributeDescription;
18use super::features::Feature;
19use super::header::{PerfHeader, PerfPipeHeader};
20use super::perf_file::PerfFile;
21use super::record::{HeaderAttr, HeaderFeature, PerfFileRecord, RawUserRecord, UserRecordType};
22use super::section::PerfFileSection;
23use super::simpleperf;
24use super::sorter::Sorter;
25
/// How many records between synthetic round boundaries when reading simpleperf
/// files. Simpleperf heap-merges per-CPU buffers per wake-up but not globally,
/// so a thread migrating across CPUs at a wake-up boundary can produce small
/// clusters of out-of-order records. A delta of up to ~200 records has been
/// observed in practice; 1024 gives comfortable headroom while keeping the
/// sorter's in-flight buffer to O(2 * 1024) records.
const SIMPLEPERF_SYNTHETIC_ROUND_SIZE: usize = 1024;
33
/// A parser for the perf.data file format.
///
/// # Example
///
/// ```
/// use linux_perf_data::{AttributeDescription, PerfFileReader, PerfFileRecord};
///
/// # fn wrapper() -> Result<(), linux_perf_data::Error> {
/// let file = std::fs::File::open("perf.data")?;
/// let reader = std::io::BufReader::new(file);
/// let PerfFileReader { mut perf_file, mut record_iter } = PerfFileReader::parse_file(reader)?;
/// let event_names: Vec<_> =
///     perf_file.event_attributes().iter().filter_map(AttributeDescription::name).collect();
/// println!("perf events: {}", event_names.join(", "));
///
/// while let Some(record) = record_iter.next_record(&mut perf_file)? {
///     match record {
///         PerfFileRecord::EventRecord { attr_index, record } => {
///             let record_type = record.record_type;
///             let parsed_record = record.parse()?;
///             println!("{:?} for event {}: {:?}", record_type, attr_index, parsed_record);
///         }
///         PerfFileRecord::UserRecord(record) => {
///             let record_type = record.record_type;
///             let parsed_record = record.parse()?;
///             println!("{:?}: {:?}", record_type, parsed_record);
///         }
///     }
/// }
/// # Ok(())
/// # }
/// ```
pub struct PerfFileReader<R: Read> {
    /// The parsed file-level metadata: event attributes, features, and
    /// feature section contents.
    pub perf_file: PerfFile,
    /// The iterator over the records in the data section / stream.
    pub record_iter: PerfRecordIter<R>,
}
70
71impl<C: Read + Seek> PerfFileReader<C> {
72    pub fn parse_file(mut cursor: C) -> Result<Self, Error> {
73        let header = PerfHeader::parse(&mut cursor)?;
74        match &header.magic {
75            b"PERFILE2" => {
76                Self::parse_file_impl::<LittleEndian>(cursor, header, Endianness::LittleEndian)
77            }
78            b"2ELIFREP" => {
79                Self::parse_file_impl::<BigEndian>(cursor, header, Endianness::BigEndian)
80            }
81            _ => Err(Error::UnrecognizedMagicValue(header.magic)),
82        }
83    }
84
85    fn parse_file_impl<T>(
86        mut cursor: C,
87        header: PerfHeader,
88        endian: Endianness,
89    ) -> Result<Self, Error>
90    where
91        T: ByteOrder,
92    {
93        // Read the section information for each feature, starting just after the data section.
94        let feature_pos = header.data_section.offset + header.data_section.size;
95        cursor.seek(SeekFrom::Start(feature_pos))?;
96        let mut feature_sections_info = Vec::new();
97        for feature in header.features.iter() {
98            let section = PerfFileSection::parse::<_, T>(&mut cursor)?;
99            feature_sections_info.push((feature, section));
100        }
101
102        let mut feature_sections = LinearMap::new();
103        for (feature, section) in feature_sections_info {
104            let offset = section.offset;
105            let size = usize::try_from(section.size).map_err(|_| Error::SectionSizeTooBig)?;
106            let mut data = vec![0; size];
107            cursor.seek(SeekFrom::Start(offset))?;
108            cursor.read_exact(&mut data)?;
109            feature_sections.insert(feature, data);
110        }
111
112        let attributes =
113            if let Some(event_desc_section) = feature_sections.get(&Feature::EVENT_DESC) {
114                AttributeDescription::parse_event_desc_section::<_, T>(Cursor::new(
115                    &event_desc_section[..],
116                ))?
117            } else if header.event_types_section.size != 0 {
118                AttributeDescription::parse_event_types_section::<_, T>(
119                    &mut cursor,
120                    &header.event_types_section,
121                    header.attr_size,
122                )?
123            } else if let Some(simpleperf_meta_info) =
124                feature_sections.get(&Feature::SIMPLEPERF_META_INFO)
125            {
126                let info_map = simpleperf::parse_meta_info_map(&simpleperf_meta_info[..])?;
127                let event_types = simpleperf::get_event_types(&info_map)
128                    .ok_or(Error::NoEventTypesInSimpleperfMetaInfo)?;
129                AttributeDescription::parse_simpleperf_attr_section::<_, T>(
130                    &mut cursor,
131                    &header.attr_section,
132                    header.attr_size,
133                    &event_types,
134                )?
135            } else {
136                AttributeDescription::parse_attr_section::<_, T>(
137                    &mut cursor,
138                    &header.attr_section,
139                    header.attr_size,
140                )?
141            };
142
143        let mut event_id_to_attr_index = HashMap::new();
144        for (attr_index, AttributeDescription { event_ids, .. }) in attributes.iter().enumerate() {
145            for event_id in event_ids {
146                event_id_to_attr_index.insert(*event_id, attr_index);
147            }
148        }
149
150        let parse_infos: Vec<_> = attributes
151            .iter()
152            .map(|attr| RecordParseInfo::new(&attr.attr, endian))
153            .collect();
154
155        let first_attr = attributes.first().ok_or(Error::NoAttributes)?;
156
157        let first_has_sample_id_all = first_attr.attr.flags.contains(AttrFlags::SAMPLE_ID_ALL);
158        let (first_parse_info, remaining_parse_infos) = parse_infos.split_first().unwrap();
159
160        let id_parse_infos = if remaining_parse_infos.is_empty() {
161            IdParseInfos::OnlyOneEvent
162        } else if remaining_parse_infos
163            .iter()
164            .all(|parse_info| parse_info.id_parse_info == first_parse_info.id_parse_info)
165        {
166            IdParseInfos::Same(first_parse_info.id_parse_info)
167        } else {
168            // Make sure that all attributes have IDENTIFIER and the same SAMPLE_ID_ALL setting.
169            // Otherwise we won't be able to know which attr a record belongs to; we need to know
170            // the record's ID for that, and we can only read the ID if it's in the same location
171            // regardless of attr.
172            // In theory we could make the requirements weaker, and take the record type into
173            // account for disambiguation. For example, if there are two events, but one of them
174            // only creates SAMPLE records and the other only non-SAMPLE records, we don't
175            // necessarily need IDENTIFIER in order to be able to read the record ID.
176            for (attr_index, AttributeDescription { attr, .. }) in attributes.iter().enumerate() {
177                if !attr.sample_format.contains(SampleFormat::IDENTIFIER) {
178                    return Err(Error::NoIdentifierDespiteMultiEvent(attr_index));
179                }
180                if attr.flags.contains(AttrFlags::SAMPLE_ID_ALL) != first_has_sample_id_all {
181                    return Err(Error::InconsistentSampleIdAllWithMultiEvent(attr_index));
182                }
183            }
184
185            IdParseInfos::PerAttribute(first_has_sample_id_all)
186        };
187
188        // Move the cursor to the start of the data section so that we can start
189        // reading records from it.
190        cursor.seek(SeekFrom::Start(header.data_section.offset))?;
191
192        let is_simpleperf = feature_sections
193            .get(&Feature::SIMPLEPERF_META_INFO)
194            .is_some();
195
196        let perf_file = PerfFile {
197            endian,
198            features: header.features,
199            feature_sections,
200            attributes,
201        };
202
203        let record_iter = PerfRecordIter {
204            reader: cursor,
205            endian,
206            id_parse_infos,
207            parse_infos,
208            event_id_to_attr_index,
209            read_offset: 0,
210            record_data_len: Some(header.data_section.size),
211            sorter: Sorter::new(),
212            synthetic_round_size: if is_simpleperf {
213                Some(SIMPLEPERF_SYNTHETIC_ROUND_SIZE)
214            } else {
215                None
216            },
217            records_since_last_finished_round: 0,
218            buffers_for_recycling: VecDeque::new(),
219            current_event_body: Vec::new(),
220            pending_first_record: None,
221            #[cfg(feature = "zstd")]
222            zstd_decompressor: ZstdDecompressor::new(),
223            #[cfg(feature = "zstd")]
224            pending_decompressed_data: Vec::new(),
225        };
226
227        Ok(Self {
228            perf_file,
229            record_iter,
230        })
231    }
232}
233
234impl<R: Read> PerfFileReader<R> {
235    /// Parse a perf.data file in pipe mode (streaming format).
236    ///
237    /// Pipe mode is designed for streaming and does not require seeking.
238    /// Metadata (attributes and features) is embedded in the stream as
239    /// synthesized records (PERF_RECORD_HEADER_ATTR, PERF_RECORD_HEADER_FEATURE).
240    pub fn parse_pipe(mut reader: R) -> Result<Self, Error> {
241        let pipe_header = PerfPipeHeader::parse(&mut reader)?;
242        match &pipe_header.magic {
243            b"PERFILE2" => Self::parse_pipe_impl::<LittleEndian>(reader, Endianness::LittleEndian),
244            b"2ELIFREP" => Self::parse_pipe_impl::<BigEndian>(reader, Endianness::BigEndian),
245            _ => Err(Error::UnrecognizedMagicValue(pipe_header.magic)),
246        }
247    }
248
249    fn parse_pipe_impl<T: ByteOrder>(mut reader: R, endian: Endianness) -> Result<Self, Error> {
250        let mut attributes = Vec::new();
251        let mut feature_sections = LinearMap::new();
252        let mut pending_first_record: Option<(PerfEventHeader, Vec<u8>)> = None;
253
254        // Read records from the stream until we hit a non-metadata record or EOF
255        loop {
256            let header = match PerfEventHeader::parse::<_, T>(&mut reader) {
257                Ok(header) => header,
258                Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
259                    // Stream ended with only metadata records - this is valid
260                    break;
261                }
262                Err(e) => return Err(e.into()),
263            };
264
265            let size = header.size as usize;
266            if size < PerfEventHeader::STRUCT_SIZE {
267                return Err(Error::InvalidPerfEventSize);
268            }
269
270            let event_body_len = size - PerfEventHeader::STRUCT_SIZE;
271            let mut buffer = vec![0; event_body_len];
272            match reader.read_exact(&mut buffer) {
273                Ok(()) => {}
274                Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
275                    // Incomplete record at end of stream
276                    return Err(e.into());
277                }
278                Err(e) => return Err(e.into()),
279            }
280
281            let record_type = RecordType(header.type_);
282
283            match UserRecordType::try_from(record_type) {
284                Some(UserRecordType::PERF_HEADER_ATTR) => {
285                    let data = RawData::from(&buffer[..]);
286                    let header_attr = HeaderAttr::parse::<T>(data)?;
287
288                    attributes.push(AttributeDescription {
289                        attr: header_attr.attr,
290                        name: None,
291                        event_ids: header_attr.ids,
292                    });
293                }
294                Some(UserRecordType::PERF_HEADER_FEATURE) => {
295                    let data = RawData::from(&buffer[..]);
296                    let header_feature = HeaderFeature::parse::<T>(data)?;
297
298                    feature_sections.insert(header_feature.feature, header_feature.data);
299                }
300                _ => {
301                    // Not a metadata record - this is the first real event
302                    pending_first_record = Some((header, buffer));
303                    break;
304                }
305            }
306        }
307
308        if attributes.is_empty() {
309            return Err(Error::NoAttributes);
310        }
311
312        if let Some(event_desc_data) = feature_sections.get(&Feature::EVENT_DESC) {
313            let event_desc_attrs = AttributeDescription::parse_event_desc_section::<_, T>(
314                Cursor::new(&event_desc_data[..]),
315            )?;
316
317            // Match attributes by event IDs and update names
318            for attr in attributes.iter_mut() {
319                // Find matching event in EVENT_DESC by comparing event IDs
320                if let Some(event_desc_attr) = event_desc_attrs.iter().find(|desc| {
321                    !desc.event_ids.is_empty()
322                        && !attr.event_ids.is_empty()
323                        && desc.event_ids[0] == attr.event_ids[0]
324                }) {
325                    attr.name = event_desc_attr.name.clone();
326                }
327            }
328        }
329
330        let mut event_id_to_attr_index = HashMap::new();
331        for (attr_index, AttributeDescription { event_ids, .. }) in attributes.iter().enumerate() {
332            for event_id in event_ids {
333                event_id_to_attr_index.insert(*event_id, attr_index);
334            }
335        }
336
337        let parse_infos: Vec<_> = attributes
338            .iter()
339            .map(|attr| RecordParseInfo::new(&attr.attr, endian))
340            .collect();
341
342        let first_attr = attributes.first().ok_or(Error::NoAttributes)?;
343        let first_has_sample_id_all = first_attr.attr.flags.contains(AttrFlags::SAMPLE_ID_ALL);
344        let (first_parse_info, remaining_parse_infos) = parse_infos.split_first().unwrap();
345
346        let id_parse_infos = if remaining_parse_infos.is_empty() {
347            IdParseInfos::OnlyOneEvent
348        } else if remaining_parse_infos
349            .iter()
350            .all(|parse_info| parse_info.id_parse_info == first_parse_info.id_parse_info)
351        {
352            IdParseInfos::Same(first_parse_info.id_parse_info)
353        } else {
354            for (attr_index, AttributeDescription { attr, .. }) in attributes.iter().enumerate() {
355                if !attr.sample_format.contains(SampleFormat::IDENTIFIER) {
356                    return Err(Error::NoIdentifierDespiteMultiEvent(attr_index));
357                }
358                if attr.flags.contains(AttrFlags::SAMPLE_ID_ALL) != first_has_sample_id_all {
359                    return Err(Error::InconsistentSampleIdAllWithMultiEvent(attr_index));
360                }
361            }
362            IdParseInfos::PerAttribute(first_has_sample_id_all)
363        };
364
365        // Infer features from the feature_sections we collected
366        let mut features_array = [0u64; 4];
367        for feature in feature_sections.keys() {
368            let feature_bit = feature.0;
369            if feature_bit < 256 {
370                let chunk_index = (feature_bit / 64) as usize;
371                let bit_in_chunk = feature_bit % 64;
372                features_array[chunk_index] |= 1u64 << bit_in_chunk;
373            }
374        }
375
376        let is_simpleperf = feature_sections
377            .get(&Feature::SIMPLEPERF_META_INFO)
378            .is_some();
379
380        let perf_file = PerfFile {
381            endian,
382            features: super::features::FeatureSet(features_array),
383            feature_sections,
384            attributes,
385        };
386
387        let record_iter = PerfRecordIter {
388            reader,
389            endian,
390            id_parse_infos,
391            parse_infos,
392            event_id_to_attr_index,
393            read_offset: 0,
394            record_data_len: None, // Unbounded for pipes
395            sorter: Sorter::new(),
396            synthetic_round_size: if is_simpleperf {
397                Some(SIMPLEPERF_SYNTHETIC_ROUND_SIZE)
398            } else {
399                None
400            },
401            records_since_last_finished_round: 0,
402            buffers_for_recycling: VecDeque::new(),
403            current_event_body: Vec::new(),
404            pending_first_record,
405            #[cfg(feature = "zstd")]
406            zstd_decompressor: ZstdDecompressor::new(),
407            #[cfg(feature = "zstd")]
408            pending_decompressed_data: Vec::new(),
409        };
410
411        Ok(Self {
412            perf_file,
413            record_iter,
414        })
415    }
416}
417
/// An iterator which incrementally reads and sorts the records from a perf.data file.
pub struct PerfRecordIter<R: Read> {
    /// The underlying reader, positioned at the next unread record.
    reader: R,
    /// The byte order of the file, detected from the header magic.
    endian: Endianness,
    /// Number of bytes of record data consumed from `reader` so far.
    read_offset: u64,
    /// None for pipe mode
    record_data_len: Option<u64>,
    /// Body of the record most recently yielded from `next_record`; the
    /// returned `RawData` borrows from this buffer.
    current_event_body: Vec<u8>,
    /// How record IDs are recovered from raw records, used to attribute
    /// each record to its event attr.
    id_parse_infos: IdParseInfos,
    /// Guaranteed to have at least one element
    parse_infos: Vec<RecordParseInfo>,
    /// Maps each event ID to the index of its attr in `parse_infos`.
    event_id_to_attr_index: HashMap<u64, usize>,
    /// Buffers and sorts records between round boundaries.
    sorter: Sorter<RecordSortKey, PendingRecord>,
    /// If `Some(n)`, inject a synthetic `finish_round()` call every `n`
    /// records. Used for files whose writer (e.g. simpleperf) already emits
    /// records in near-timestamp order but doesn't write
    /// `PERF_RECORD_FINISHED_ROUND` markers. The synthetic rounds bound the
    /// sorter's buffering — without them the sorter would hold the entire
    /// file. The chosen `n` must comfortably exceed the worst-case
    /// out-of-order window (in record count) of the writer.
    synthetic_round_size: Option<usize>,
    /// Counter used together with `synthetic_round_size`.
    records_since_last_finished_round: usize,
    /// Previously-yielded record bodies, kept for reuse to avoid reallocating.
    buffers_for_recycling: VecDeque<Vec<u8>>,
    /// For pipe mode: the first non-metadata record that was read during initialization
    pending_first_record: Option<(PerfEventHeader, Vec<u8>)>,
    /// Zstd decompressor for handling COMPRESSED records
    #[cfg(feature = "zstd")]
    zstd_decompressor: ZstdDecompressor,
    /// Decompressed data from the end of previous compressed record which
    /// wasn't enough to form a full record and needs to be concatenated
    /// with upcoming decompressed data.
    #[cfg(feature = "zstd")]
    pending_decompressed_data: Vec<u8>,
}
453
impl<R: Read> PerfRecordIter<R> {
    /// Iterates the records in this file. The records are *mostly* emitted
    /// in the correct order, i.e. sorted by time.
    ///
    /// `next_record` does some internal buffering so that the sort order can
    /// be guaranteed. This buffering takes advantage of `FINISHED_ROUND`
    /// records so that we don't buffer more records than necessary.
    ///
    /// However, `FINISHED_ROUND` records are only emitted by Linux perf, not
    /// by Android simpleperf. When reading simpleperf files, the following
    /// caveats apply:
    ///
    /// - `MMAP` / `MMAP2` records can appear out-of-order and with non-sensical
    ///   timestamps.
    /// - Other records can, in very rare cases, appear out-of-order if they were
    ///   originally emitted by different CPUs.
    pub fn next_record(
        &mut self,
        _perf_file: &mut PerfFile,
    ) -> Result<Option<PerfFileRecord<'_>>, Error> {
        // Refill the sorter once the current round is exhausted.
        if !self.sorter.has_more() {
            self.read_next_round()?;
        }
        if let Some(pending_record) = self.sorter.get_next() {
            let record = self.convert_pending_record(pending_record);
            return Ok(Some(record));
        }
        // Sorter still empty after a refill attempt: end of file/stream.
        Ok(None)
    }

    /// Reads events into self.sorter until a round boundary is found
    /// and self.sorter is non-empty, or until we've run out of records to read.
    fn read_next_round(&mut self) -> Result<(), Error> {
        // Dispatch to the byte-order-monomorphized implementation.
        if self.endian == Endianness::LittleEndian {
            self.read_next_round_impl::<byteorder::LittleEndian>()
        } else {
            self.read_next_round_impl::<byteorder::BigEndian>()
        }
    }

    /// Reads events into self.sorter until a round boundary is found (either
    /// a FINISHED_ROUND record or a synthetic boundary for simpleperf files)
    /// and self.sorter is non-empty, or until we've run out of records to read.
    fn read_next_round_impl<T: ByteOrder>(&mut self) -> Result<(), Error> {
        // Handle pending first record from pipe mode initialization
        if let Some((pending_header, pending_buffer)) = self.pending_first_record.take() {
            self.process_record::<T>(pending_header, pending_buffer, Some(self.read_offset))?;
            self.read_offset += u64::from(pending_header.size);
        }

        // File mode stops at the end of the data section; pipe mode
        // (record_data_len == None) runs until EOF.
        while self
            .record_data_len
            .is_none_or(|len| self.read_offset < len)
        {
            let offset = self.read_offset;

            // Try to parse the next header. For pipe mode (unbounded), EOF is normal termination.
            let header = match PerfEventHeader::parse::<_, T>(&mut self.reader) {
                Ok(header) => header,
                Err(e) => {
                    // For pipe mode with unbounded length, EOF just means end of stream
                    if self.record_data_len.is_none()
                        && e.kind() == std::io::ErrorKind::UnexpectedEof
                    {
                        break;
                    }
                    return Err(e.into());
                }
            };

            let size = header.size as usize;
            if size < PerfEventHeader::STRUCT_SIZE {
                return Err(Error::InvalidPerfEventSize);
            }
            self.read_offset += u64::from(header.size);

            let user_record_type = UserRecordType::try_from(RecordType(header.type_));

            if user_record_type == Some(UserRecordType::PERF_FINISHED_ROUND) {
                self.sorter.finish_round();
                self.records_since_last_finished_round = 0;
                if self.sorter.has_more() {
                    // The sorter is non-empty. We're done.
                    return Ok(());
                }

                // Keep going so that we never exit the loop with sorter
                // being empty, unless we've truly run out of data to read.
                continue;
            }

            let event_body_len = size - PerfEventHeader::STRUCT_SIZE;
            // Reuse a previously-yielded buffer when one is available.
            let mut buffer = self.buffers_for_recycling.pop_front().unwrap_or_default();
            buffer.resize(event_body_len, 0);
            // Try to read the event body. For pipe mode, EOF here also means end of stream.
            match self.reader.read_exact(&mut buffer) {
                Ok(()) => {}
                Err(e) => {
                    // For pipe mode with unbounded length, EOF just means end of stream
                    if self.record_data_len.is_none()
                        && e.kind() == std::io::ErrorKind::UnexpectedEof
                    {
                        break;
                    }
                    return Err(ReadError::PerfEventData.into());
                }
            }

            match user_record_type {
                Some(UserRecordType::PERF_COMPRESSED | UserRecordType::PERF_COMPRESSED2) => {
                    #[cfg(not(feature = "zstd"))]
                    {
                        return Err(Error::IoError(std::io::Error::new(std::io::ErrorKind::Unsupported,
                        "Compression support is not enabled. Please rebuild with the 'zstd' feature flag.",
                    )));
                    }
                    #[cfg(feature = "zstd")]
                    {
                        let compressed_data =
                            if user_record_type.unwrap() == UserRecordType::PERF_COMPRESSED {
                                // PERF_RECORD_COMPRESSED (type 81) was introduced in Linux 5.2 (2019).
                                // The entire body is the compressed data.
                                &buffer[..]
                            } else {
                                // PERF_RECORD_COMPRESSED2 (type 83) was introduced in Linux 6.x (May 2025)
                                // The record now specifies the compressed size explicitly because
                                // the compressed data may not fill the entire body (it shouldn't
                                // include the alignment padding).
                                Self::get_compressed_data_for_compressed2_record::<T>(&buffer)?
                            };
                        self.process_compressed_record_data::<T>(compressed_data)?;
                    }
                }
                Some(UserRecordType::PERF_AUXTRACE) => {
                    // So far the buffer contents only contain the auxtrace struct
                    // values, but not the actual auxtrace data.
                    // Get the length of the auxtrace data, and then read the bytes
                    // and append them to the record buffer.
                    let auxtrace = Auxtrace::parse::<T>(RawData::Single(&buffer))?;
                    let auxtrace_size = auxtrace.size;

                    let auxtrace_len =
                        usize::try_from(auxtrace.size).map_err(|_| ReadError::PerfEventData)?;
                    let actual_event_body_len = event_body_len
                        .checked_add(auxtrace_len)
                        .ok_or(ReadError::PerfEventData)?;
                    buffer.resize(actual_event_body_len, 0);
                    self.reader
                        .read_exact(&mut buffer[event_body_len..])
                        .map_err(|_| ReadError::PerfEventData)?;
                    // The trailing auxtrace data counts toward the data section size.
                    self.read_offset += auxtrace_size;

                    self.process_record::<T>(header, buffer, Some(offset))?
                }
                _ => self.process_record::<T>(header, buffer, Some(offset))?,
            }

            // Auto-flush for simpleperf files
            if let Some(round_size) = self.synthetic_round_size {
                self.records_since_last_finished_round += 1;
                if self.records_since_last_finished_round >= round_size {
                    self.records_since_last_finished_round = 0;
                    self.sorter.finish_round();
                    if self.sorter.has_more() {
                        return Ok(());
                    }
                }
            }
        }

        // Everything has been read.
        self.sorter.finish();

        Ok(())
    }

    /// Process a single record and add it to the sorter
    ///
    /// `offset` is the record's position in the file, used as a sort-key
    /// tiebreaker; it is `None` for records from decompressed payloads.
    fn process_record<T: ByteOrder>(
        &mut self,
        header: PerfEventHeader,
        buffer: Vec<u8>,
        offset: Option<u64>,
    ) -> Result<(), Error> {
        let data = RawData::from(&buffer[..]);
        let record_type = RecordType(header.type_);

        let (attr_index, timestamp) = if record_type.is_builtin_type() {
            let attr_index = match &self.id_parse_infos {
                IdParseInfos::OnlyOneEvent => 0,
                IdParseInfos::Same(id_parse_info) => {
                    // Fall back to attr 0 if the ID can't be read or isn't known.
                    get_record_id::<T>(record_type, data, id_parse_info)
                        .and_then(|id| self.event_id_to_attr_index.get(&id).cloned())
                        .unwrap_or(0)
                }
                IdParseInfos::PerAttribute(sample_id_all) => {
                    // We have IDENTIFIER (guaranteed by PerAttribute).
                    get_record_identifier::<T>(record_type, data, *sample_id_all)
                        .and_then(|id| self.event_id_to_attr_index.get(&id).cloned())
                        .unwrap_or(0)
                }
            };
            let parse_info = self.parse_infos[attr_index];
            let timestamp = get_record_timestamp::<T>(record_type, data, &parse_info);
            (Some(attr_index), timestamp)
        } else {
            // user type
            (None, None)
        };

        let sort_key = RecordSortKey { timestamp, offset };
        let misc = header.misc;
        let pending_record = PendingRecord {
            record_type,
            misc,
            buffer,
            attr_index,
        };
        self.sorter.insert_unordered(sort_key, pending_record);
        Ok(())
    }

    /// Return the actual compressed part of a COMPRESSED2 record body.
    ///
    /// PERF_RECORD_COMPRESSED2 (type 83) was introduced in Linux 6.x (May 2025)
    /// to fix 8-byte alignment issues with the original format.
    /// Format: header (8 bytes) + data_size (8 bytes) + compressed data + padding
    /// The header.size includes padding for 8-byte alignment; data_size has the actual size.
    ///
    /// Returns `ReadError::PerfEventData` if the body is too short to hold
    /// the `data_size` field, or if `data_size` exceeds the body length.
    #[cfg(feature = "zstd")]
    fn get_compressed_data_for_compressed2_record<T: ByteOrder>(
        buffer: &[u8],
    ) -> Result<&[u8], Error> {
        if buffer.len() < 8 {
            return Err(ReadError::PerfEventData.into());
        }
        let data_size = T::read_u64(&buffer[0..8]) as usize;
        if data_size > buffer.len() - 8 {
            return Err(ReadError::PerfEventData.into());
        }
        Ok(&buffer[8..8 + data_size])
    }

    /// Decompress `compressed_data` (prefixed with any leftover bytes from the
    /// previous compressed record) and process the contained records. Any
    /// trailing partial record is carried over in `pending_decompressed_data`.
    #[cfg(feature = "zstd")]
    fn process_compressed_record_data<T: ByteOrder>(
        &mut self,
        compressed_data: &[u8],
    ) -> Result<(), Error> {
        let mut decompressed = core::mem::take(&mut self.pending_decompressed_data);
        self.zstd_decompressor
            .decompress_into(compressed_data, &mut decompressed)?;
        let remaining_len = self.process_decompressed_records::<T>(&decompressed)?;
        // Keep only the unconsumed tail for the next compressed record.
        decompressed.drain(0..(decompressed.len() - remaining_len));
        self.pending_decompressed_data = decompressed;
        Ok(())
    }

    /// Processes decompressed data as a sequence of perf records.
    /// Shared by both COMPRESSED and COMPRESSED2 handlers.
    ///
    /// Returns the number of bytes remaining which need to be carried
    /// over into the next call.
    #[cfg(feature = "zstd")]
    fn process_decompressed_records<T: ByteOrder>(
        &mut self,
        decompressed: &[u8],
    ) -> Result<usize, Error> {
        let mut remaining = decompressed;

        while let Some((header_data, after_header_data)) =
            remaining.split_at_checked(PerfEventHeader::STRUCT_SIZE)
        {
            let header = PerfEventHeader::parse::<_, T>(header_data)?;
            let record_size = header.size as usize;
            let Some(record_body_len) = record_size.checked_sub(PerfEventHeader::STRUCT_SIZE)
            else {
                return Err(Error::InvalidPerfEventSize);
            };

            let Some((record_body_data, after_record_data)) =
                after_header_data.split_at_checked(record_body_len)
            else {
                // Not enough remaining data for the full record
                break;
            };

            let mut record_body_buffer = self.buffers_for_recycling.pop_front().unwrap_or_default();
            record_body_buffer.clear();
            record_body_buffer.extend_from_slice(record_body_data);

            // No file offset is available for records inside a compressed payload.
            self.process_record::<T>(header, record_body_buffer, None)?;

            remaining = after_record_data;
        }
        Ok(remaining.len())
    }

    /// Converts pending_record into an RawRecord which references the data in self.current_event_body.
    fn convert_pending_record(&mut self, pending_record: PendingRecord) -> PerfFileRecord<'_> {
        let PendingRecord {
            record_type,
            misc,
            buffer,
            attr_index,
            ..
        } = pending_record;
        // Swap the record's buffer in, and recycle the previous one.
        let prev_buffer = std::mem::replace(&mut self.current_event_body, buffer);
        self.buffers_for_recycling.push_back(prev_buffer);

        let data = RawData::from(&self.current_event_body[..]);

        if let Some(record_type) = UserRecordType::try_from(record_type) {
            let endian = self.endian;
            PerfFileRecord::UserRecord(RawUserRecord {
                record_type,
                misc,
                data,
                endian,
            })
        } else {
            // Built-in records always got an attr_index in process_record.
            let attr_index = attr_index.unwrap();
            let parse_info = self.parse_infos[attr_index];
            let record = RawEventRecord {
                record_type,
                misc,
                data,
                parse_info,
            };
            PerfFileRecord::EventRecord { attr_index, record }
        }
    }
}
784
/// A record that has been read and buffered in the sorter but not yet
/// yielded to the caller.
#[derive(Clone, Debug, PartialEq, Eq)]
struct PendingRecord {
    record_type: RecordType,
    /// The `misc` field from the record header.
    misc: u16,
    /// The record body (everything after the header).
    buffer: Vec<u8>,
    /// Index of the attr this record belongs to; `None` for user records.
    attr_index: Option<usize>,
}
792
/// Sort key for record ordering. The derived `Ord` is lexicographic:
/// primarily by timestamp, with the file offset as tiebreaker; `None`
/// sorts before `Some`.
#[derive(Clone, Copy, Default, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct RecordSortKey {
    /// The record's timestamp, if it has one (user records do not).
    timestamp: Option<u64>,
    /// The record's offset in the file; `None` for records that came out
    /// of a decompressed COMPRESSED payload.
    offset: Option<u64>,
}
798
/// Describes how a record's event ID can be recovered from its raw bytes,
/// which determines how records are attributed to their event attr.
#[derive(Debug, Clone)]
enum IdParseInfos {
    /// There is only one event.
    OnlyOneEvent,
    /// There are multiple events, but all events are parsed the same way.
    Same(RecordIdParseInfo),
    /// All elements are guaranteed to have [`SampleFormat::IDENTIFIER`] set in `attr.sample_format`.
    /// The inner element indicates sample_id_all.
    PerAttribute(bool),
}