1use byteorder::{BigEndian, ByteOrder, LittleEndian};
2use linear_map::LinearMap;
3use linux_perf_event_reader::{
4 get_record_id, get_record_identifier, get_record_timestamp, AttrFlags, Endianness,
5 PerfEventHeader, RawData, RawEventRecord, RecordIdParseInfo, RecordParseInfo, RecordType,
6 SampleFormat,
7};
8
9use std::collections::{HashMap, VecDeque};
10use std::io::{Cursor, Read, Seek, SeekFrom};
11
12#[cfg(feature = "zstd")]
13use crate::decompression::ZstdDecompressor;
14
15use super::auxtrace::Auxtrace;
16use super::error::{Error, ReadError};
17use super::feature_sections::AttributeDescription;
18use super::features::Feature;
19use super::header::{PerfHeader, PerfPipeHeader};
20use super::perf_file::PerfFile;
21use super::record::{HeaderAttr, HeaderFeature, PerfFileRecord, RawUserRecord, UserRecordType};
22use super::section::PerfFileSection;
23use super::simpleperf;
24use super::sorter::Sorter;
25
// For simpleperf-recorded files we synthesize a "finished round" for the sorter
// after this many records (see `PerfRecordIter::synthetic_round_size`), presumably
// because simpleperf files don't contain PERF_RECORD_FINISHED_ROUND markers —
// without a bound the sorter would buffer indefinitely. TODO(review): confirm
// simpleperf never emits FINISHED_ROUND.
const SIMPLEPERF_SYNTHETIC_ROUND_SIZE: usize = 1024;
33
/// The result of parsing a perf.data file (or pipe stream): the file-level
/// metadata in `perf_file` plus an iterator over the records in the data
/// section in `record_iter`.
pub struct PerfFileReader<R: Read> {
    /// Header-derived metadata: endianness, feature sections, and event attributes.
    pub perf_file: PerfFile,
    /// Iterator yielding the records from the data section, sorted per round.
    pub record_iter: PerfRecordIter<R>,
}
70
impl<C: Read + Seek> PerfFileReader<C> {
    /// Parses a seekable perf.data file.
    ///
    /// Reads the `PerfHeader`, detects the byte order from the magic value
    /// (`PERFILE2` = little endian, `2ELIFREP` = big endian), and dispatches to
    /// the endian-specialized implementation.
    ///
    /// # Errors
    ///
    /// Returns `Error::UnrecognizedMagicValue` for an unknown magic, or any
    /// error produced while parsing the header and sections.
    pub fn parse_file(mut cursor: C) -> Result<Self, Error> {
        let header = PerfHeader::parse(&mut cursor)?;
        match &header.magic {
            b"PERFILE2" => {
                Self::parse_file_impl::<LittleEndian>(cursor, header, Endianness::LittleEndian)
            }
            b"2ELIFREP" => {
                Self::parse_file_impl::<BigEndian>(cursor, header, Endianness::BigEndian)
            }
            _ => Err(Error::UnrecognizedMagicValue(header.magic)),
        }
    }

    /// Endian-specialized body of [`Self::parse_file`].
    ///
    /// Reads the feature sections, determines the event attributes (trying
    /// several formats in priority order), decides how record IDs will be
    /// parsed, and finally positions the cursor at the start of the data
    /// section so that the returned `record_iter` can stream the records.
    fn parse_file_impl<T>(
        mut cursor: C,
        header: PerfHeader,
        endian: Endianness,
    ) -> Result<Self, Error>
    where
        T: ByteOrder,
    {
        // The feature section table is stored immediately after the data section:
        // one PerfFileSection (offset + size) per feature bit that is set.
        let feature_pos = header.data_section.offset + header.data_section.size;
        cursor.seek(SeekFrom::Start(feature_pos))?;
        let mut feature_sections_info = Vec::new();
        for feature in header.features.iter() {
            let section = PerfFileSection::parse::<_, T>(&mut cursor)?;
            feature_sections_info.push((feature, section));
        }

        // Read the raw bytes of every feature section into memory.
        let mut feature_sections = LinearMap::new();
        for (feature, section) in feature_sections_info {
            let offset = section.offset;
            let size = usize::try_from(section.size).map_err(|_| Error::SectionSizeTooBig)?;
            let mut data = vec![0; size];
            cursor.seek(SeekFrom::Start(offset))?;
            cursor.read_exact(&mut data)?;
            feature_sections.insert(feature, data);
        }

        // Obtain the event attributes, trying sources in priority order:
        // 1. the EVENT_DESC feature section,
        // 2. the (legacy) event_types section from the header,
        // 3. simpleperf's META_INFO feature section + attr section,
        // 4. the plain attr section from the header.
        let attributes =
            if let Some(event_desc_section) = feature_sections.get(&Feature::EVENT_DESC) {
                AttributeDescription::parse_event_desc_section::<_, T>(Cursor::new(
                    &event_desc_section[..],
                ))?
            } else if header.event_types_section.size != 0 {
                AttributeDescription::parse_event_types_section::<_, T>(
                    &mut cursor,
                    &header.event_types_section,
                    header.attr_size,
                )?
            } else if let Some(simpleperf_meta_info) =
                feature_sections.get(&Feature::SIMPLEPERF_META_INFO)
            {
                let info_map = simpleperf::parse_meta_info_map(&simpleperf_meta_info[..])?;
                let event_types = simpleperf::get_event_types(&info_map)
                    .ok_or(Error::NoEventTypesInSimpleperfMetaInfo)?;
                AttributeDescription::parse_simpleperf_attr_section::<_, T>(
                    &mut cursor,
                    &header.attr_section,
                    header.attr_size,
                    &event_types,
                )?
            } else {
                AttributeDescription::parse_attr_section::<_, T>(
                    &mut cursor,
                    &header.attr_section,
                    header.attr_size,
                )?
            };

        // Map every event ID to the index of the attribute it belongs to, so
        // records can later be attributed to the right event.
        let mut event_id_to_attr_index = HashMap::new();
        for (attr_index, AttributeDescription { event_ids, .. }) in attributes.iter().enumerate() {
            for event_id in event_ids {
                event_id_to_attr_index.insert(*event_id, attr_index);
            }
        }

        // One RecordParseInfo per attribute, in the same order.
        let parse_infos: Vec<_> = attributes
            .iter()
            .map(|attr| RecordParseInfo::new(&attr.attr, endian))
            .collect();

        let first_attr = attributes.first().ok_or(Error::NoAttributes)?;

        let first_has_sample_id_all = first_attr.attr.flags.contains(AttrFlags::SAMPLE_ID_ALL);
        let (first_parse_info, remaining_parse_infos) = parse_infos.split_first().unwrap();

        // Decide how to find the event ID (and thus the attribute) of each
        // record. With a single event, or when all events share the same ID
        // layout, we can parse IDs uniformly. Otherwise we require that every
        // event uses SAMPLE_IDENTIFIER and a consistent SAMPLE_ID_ALL setting,
        // so the identifier can be found without knowing the attribute first.
        let id_parse_infos = if remaining_parse_infos.is_empty() {
            IdParseInfos::OnlyOneEvent
        } else if remaining_parse_infos
            .iter()
            .all(|parse_info| parse_info.id_parse_info == first_parse_info.id_parse_info)
        {
            IdParseInfos::Same(first_parse_info.id_parse_info)
        } else {
            for (attr_index, AttributeDescription { attr, .. }) in attributes.iter().enumerate() {
                if !attr.sample_format.contains(SampleFormat::IDENTIFIER) {
                    return Err(Error::NoIdentifierDespiteMultiEvent(attr_index));
                }
                if attr.flags.contains(AttrFlags::SAMPLE_ID_ALL) != first_has_sample_id_all {
                    return Err(Error::InconsistentSampleIdAllWithMultiEvent(attr_index));
                }
            }

            IdParseInfos::PerAttribute(first_has_sample_id_all)
        };

        // Position the reader at the start of the data section; the record
        // iterator tracks its progress relative to this point via read_offset.
        cursor.seek(SeekFrom::Start(header.data_section.offset))?;

        let is_simpleperf = feature_sections
            .get(&Feature::SIMPLEPERF_META_INFO)
            .is_some();

        let perf_file = PerfFile {
            endian,
            features: header.features,
            feature_sections,
            attributes,
        };

        let record_iter = PerfRecordIter {
            reader: cursor,
            endian,
            id_parse_infos,
            parse_infos,
            event_id_to_attr_index,
            read_offset: 0,
            // File mode: the data section size bounds how much we read.
            record_data_len: Some(header.data_section.size),
            sorter: Sorter::new(),
            // simpleperf files get synthetic rounds; see the constant's comment.
            synthetic_round_size: if is_simpleperf {
                Some(SIMPLEPERF_SYNTHETIC_ROUND_SIZE)
            } else {
                None
            },
            records_since_last_finished_round: 0,
            buffers_for_recycling: VecDeque::new(),
            current_event_body: Vec::new(),
            pending_first_record: None,
            #[cfg(feature = "zstd")]
            zstd_decompressor: ZstdDecompressor::new(),
            #[cfg(feature = "zstd")]
            pending_decompressed_data: Vec::new(),
        };

        Ok(Self {
            perf_file,
            record_iter,
        })
    }
}
233
impl<R: Read> PerfFileReader<R> {
    /// Parses a perf.data stream in "pipe mode" (`perf record -o -`), which
    /// has no seekable section table: metadata arrives as user records
    /// (PERF_HEADER_ATTR / PERF_HEADER_FEATURE) interleaved at the start of
    /// the record stream.
    ///
    /// # Errors
    ///
    /// Returns `Error::UnrecognizedMagicValue` for an unknown magic, or any
    /// error produced while parsing the stream.
    pub fn parse_pipe(mut reader: R) -> Result<Self, Error> {
        let pipe_header = PerfPipeHeader::parse(&mut reader)?;
        match &pipe_header.magic {
            b"PERFILE2" => Self::parse_pipe_impl::<LittleEndian>(reader, Endianness::LittleEndian),
            b"2ELIFREP" => Self::parse_pipe_impl::<BigEndian>(reader, Endianness::BigEndian),
            _ => Err(Error::UnrecognizedMagicValue(pipe_header.magic)),
        }
    }

    /// Endian-specialized body of [`Self::parse_pipe`].
    ///
    /// Consumes PERF_HEADER_ATTR and PERF_HEADER_FEATURE records from the
    /// front of the stream to build the attribute list and feature sections.
    /// The first non-metadata record terminates this phase and is stashed in
    /// `pending_first_record` so the record iterator can emit it later.
    fn parse_pipe_impl<T: ByteOrder>(mut reader: R, endian: Endianness) -> Result<Self, Error> {
        let mut attributes = Vec::new();
        let mut feature_sections = LinearMap::new();
        let mut pending_first_record: Option<(PerfEventHeader, Vec<u8>)> = None;

        loop {
            let header = match PerfEventHeader::parse::<_, T>(&mut reader) {
                Ok(header) => header,
                // A clean EOF before any "real" record simply ends the
                // metadata phase (e.g. a stream containing only metadata).
                Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
                    break;
                }
                Err(e) => return Err(e.into()),
            };

            let size = header.size as usize;
            if size < PerfEventHeader::STRUCT_SIZE {
                return Err(Error::InvalidPerfEventSize);
            }

            // Read the record body (everything after the fixed-size header).
            let event_body_len = size - PerfEventHeader::STRUCT_SIZE;
            let mut buffer = vec![0; event_body_len];
            match reader.read_exact(&mut buffer) {
                Ok(()) => {}
                // EOF in the middle of a record body is a truncated stream.
                Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
                    return Err(e.into());
                }
                Err(e) => return Err(e.into()),
            }

            let record_type = RecordType(header.type_);

            match UserRecordType::try_from(record_type) {
                Some(UserRecordType::PERF_HEADER_ATTR) => {
                    let data = RawData::from(&buffer[..]);
                    let header_attr = HeaderAttr::parse::<T>(data)?;

                    attributes.push(AttributeDescription {
                        attr: header_attr.attr,
                        name: None,
                        event_ids: header_attr.ids,
                    });
                }
                Some(UserRecordType::PERF_HEADER_FEATURE) => {
                    let data = RawData::from(&buffer[..]);
                    let header_feature = HeaderFeature::parse::<T>(data)?;

                    feature_sections.insert(header_feature.feature, header_feature.data);
                }
                _ => {
                    // First non-metadata record: keep it for the iterator and
                    // stop consuming metadata.
                    pending_first_record = Some((header, buffer));
                    break;
                }
            }
        }

        if attributes.is_empty() {
            return Err(Error::NoAttributes);
        }

        // PERF_HEADER_ATTR records carry no event names; if an EVENT_DESC
        // feature section was sent, use it to fill in names, matching
        // attributes by their first event ID.
        if let Some(event_desc_data) = feature_sections.get(&Feature::EVENT_DESC) {
            let event_desc_attrs = AttributeDescription::parse_event_desc_section::<_, T>(
                Cursor::new(&event_desc_data[..]),
            )?;

            for attr in attributes.iter_mut() {
                if let Some(event_desc_attr) = event_desc_attrs.iter().find(|desc| {
                    !desc.event_ids.is_empty()
                        && !attr.event_ids.is_empty()
                        && desc.event_ids[0] == attr.event_ids[0]
                }) {
                    attr.name = event_desc_attr.name.clone();
                }
            }
        }

        // Map every event ID to the index of the attribute it belongs to.
        let mut event_id_to_attr_index = HashMap::new();
        for (attr_index, AttributeDescription { event_ids, .. }) in attributes.iter().enumerate() {
            for event_id in event_ids {
                event_id_to_attr_index.insert(*event_id, attr_index);
            }
        }

        let parse_infos: Vec<_> = attributes
            .iter()
            .map(|attr| RecordParseInfo::new(&attr.attr, endian))
            .collect();

        let first_attr = attributes.first().ok_or(Error::NoAttributes)?;
        let first_has_sample_id_all = first_attr.attr.flags.contains(AttrFlags::SAMPLE_ID_ALL);
        let (first_parse_info, remaining_parse_infos) = parse_infos.split_first().unwrap();

        // Same record-ID strategy selection as in parse_file_impl; see there.
        let id_parse_infos = if remaining_parse_infos.is_empty() {
            IdParseInfos::OnlyOneEvent
        } else if remaining_parse_infos
            .iter()
            .all(|parse_info| parse_info.id_parse_info == first_parse_info.id_parse_info)
        {
            IdParseInfos::Same(first_parse_info.id_parse_info)
        } else {
            for (attr_index, AttributeDescription { attr, .. }) in attributes.iter().enumerate() {
                if !attr.sample_format.contains(SampleFormat::IDENTIFIER) {
                    return Err(Error::NoIdentifierDespiteMultiEvent(attr_index));
                }
                if attr.flags.contains(AttrFlags::SAMPLE_ID_ALL) != first_has_sample_id_all {
                    return Err(Error::InconsistentSampleIdAllWithMultiEvent(attr_index));
                }
            }
            IdParseInfos::PerAttribute(first_has_sample_id_all)
        };

        // Pipe mode has no header feature bitset, so reconstruct one (4 x u64
        // = 256 bits) from the features actually received.
        let mut features_array = [0u64; 4];
        for feature in feature_sections.keys() {
            let feature_bit = feature.0;
            if feature_bit < 256 {
                let chunk_index = (feature_bit / 64) as usize;
                let bit_in_chunk = feature_bit % 64;
                features_array[chunk_index] |= 1u64 << bit_in_chunk;
            }
        }

        let is_simpleperf = feature_sections
            .get(&Feature::SIMPLEPERF_META_INFO)
            .is_some();

        let perf_file = PerfFile {
            endian,
            features: super::features::FeatureSet(features_array),
            feature_sections,
            attributes,
        };

        let record_iter = PerfRecordIter {
            reader,
            endian,
            id_parse_infos,
            parse_infos,
            event_id_to_attr_index,
            read_offset: 0,
            // Pipe mode: no known data length; read until EOF.
            record_data_len: None,
            sorter: Sorter::new(),
            synthetic_round_size: if is_simpleperf {
                Some(SIMPLEPERF_SYNTHETIC_ROUND_SIZE)
            } else {
                None
            },
            records_since_last_finished_round: 0,
            buffers_for_recycling: VecDeque::new(),
            current_event_body: Vec::new(),
            pending_first_record,
            #[cfg(feature = "zstd")]
            zstd_decompressor: ZstdDecompressor::new(),
            #[cfg(feature = "zstd")]
            pending_decompressed_data: Vec::new(),
        };

        Ok(Self {
            perf_file,
            record_iter,
        })
    }
}
417
/// Streams the records of a perf.data file / pipe, buffering them round by
/// round in a sorter so they can be yielded in timestamp order within a round.
pub struct PerfRecordIter<R: Read> {
    /// Underlying reader, already positioned at the start of the record data.
    reader: R,
    /// Byte order of the file, used to pick the ByteOrder impl at read time.
    endian: Endianness,
    /// Bytes of record data consumed so far (relative to the data start).
    read_offset: u64,
    /// Total size of the data section, or `None` in pipe mode (read to EOF).
    record_data_len: Option<u64>,
    /// Body of the record most recently returned by `next_record`; the
    /// returned `PerfFileRecord` borrows from this buffer.
    current_event_body: Vec<u8>,
    /// Strategy for extracting the event ID from a record (see `IdParseInfos`).
    id_parse_infos: IdParseInfos,
    /// Per-attribute parse info, indexed by attribute index.
    parse_infos: Vec<RecordParseInfo>,
    /// Maps an event ID to the index of its attribute.
    event_id_to_attr_index: HashMap<u64, usize>,
    /// Buffers records of the current round and yields them in sorted order.
    sorter: Sorter<RecordSortKey, PendingRecord>,
    /// If `Some(n)`, synthesize a finished round every `n` records (used for
    /// simpleperf files; see `SIMPLEPERF_SYNTHETIC_ROUND_SIZE`).
    synthetic_round_size: Option<usize>,
    /// Counter for the synthetic-round logic above.
    records_since_last_finished_round: usize,
    /// Previously-used record body buffers, recycled to avoid reallocating.
    buffers_for_recycling: VecDeque<Vec<u8>>,
    /// In pipe mode, the first non-metadata record encountered during header
    /// parsing, to be emitted before reading anything else.
    pending_first_record: Option<(PerfEventHeader, Vec<u8>)>,
    /// Decompressor for PERF_RECORD_COMPRESSED(2) payloads.
    #[cfg(feature = "zstd")]
    zstd_decompressor: ZstdDecompressor,
    /// Decompressed bytes left over from the previous compressed record: a
    /// record may straddle two compressed chunks, so the partial tail is kept
    /// and prepended implicitly on the next decompression.
    #[cfg(feature = "zstd")]
    pending_decompressed_data: Vec<u8>,
}
453
impl<R: Read> PerfRecordIter<R> {
    /// Returns the next record, or `Ok(None)` when the stream is exhausted.
    ///
    /// Records are yielded in sorted order within each "round" (see
    /// `read_next_round`). The returned record borrows from this iterator's
    /// internal buffer, so only one record can be held at a time.
    ///
    /// NOTE(review): `_perf_file` is currently unused — presumably kept in the
    /// signature for future use or API symmetry; confirm before removing.
    pub fn next_record(
        &mut self,
        _perf_file: &mut PerfFile,
    ) -> Result<Option<PerfFileRecord<'_>>, Error> {
        if !self.sorter.has_more() {
            self.read_next_round()?;
        }
        if let Some(pending_record) = self.sorter.get_next() {
            let record = self.convert_pending_record(pending_record);
            return Ok(Some(record));
        }
        Ok(None)
    }

    /// Reads the next round of records into the sorter, dispatching on the
    /// file's endianness.
    fn read_next_round(&mut self) -> Result<(), Error> {
        if self.endian == Endianness::LittleEndian {
            self.read_next_round_impl::<byteorder::LittleEndian>()
        } else {
            self.read_next_round_impl::<byteorder::BigEndian>()
        }
    }

    /// Reads records into the sorter until a round is complete
    /// (PERF_RECORD_FINISHED_ROUND, a synthetic round boundary, or the end of
    /// the data), then finishes the sorter's round so records can be yielded.
    fn read_next_round_impl<T: ByteOrder>(&mut self) -> Result<(), Error> {
        // Pipe mode: emit the record that was read during header parsing first.
        if let Some((pending_header, pending_buffer)) = self.pending_first_record.take() {
            self.process_record::<T>(pending_header, pending_buffer, Some(self.read_offset))?;
            self.read_offset += u64::from(pending_header.size);
        }

        // Loop until the data section is exhausted (file mode) or EOF (pipe mode).
        while self
            .record_data_len
            .is_none_or(|len| self.read_offset < len)
        {
            let offset = self.read_offset;

            let header = match PerfEventHeader::parse::<_, T>(&mut self.reader) {
                Ok(header) => header,
                Err(e) => {
                    // In pipe mode (no known length), EOF at a record boundary
                    // is the normal end of the stream.
                    if self.record_data_len.is_none()
                        && e.kind() == std::io::ErrorKind::UnexpectedEof
                    {
                        break;
                    }
                    return Err(e.into());
                }
            };

            let size = header.size as usize;
            if size < PerfEventHeader::STRUCT_SIZE {
                return Err(Error::InvalidPerfEventSize);
            }
            self.read_offset += u64::from(header.size);

            let user_record_type = UserRecordType::try_from(RecordType(header.type_));

            // FINISHED_ROUND marks a flush point: everything buffered so far
            // can now be emitted in sorted order.
            if user_record_type == Some(UserRecordType::PERF_FINISHED_ROUND) {
                self.sorter.finish_round();
                self.records_since_last_finished_round = 0;
                if self.sorter.has_more() {
                    return Ok(());
                }

                // Empty round — keep reading.
                continue;
            }

            let event_body_len = size - PerfEventHeader::STRUCT_SIZE;
            let mut buffer = self.buffers_for_recycling.pop_front().unwrap_or_default();
            buffer.resize(event_body_len, 0);
            match self.reader.read_exact(&mut buffer) {
                Ok(()) => {}
                Err(e) => {
                    // Pipe mode tolerates a truncated final record.
                    if self.record_data_len.is_none()
                        && e.kind() == std::io::ErrorKind::UnexpectedEof
                    {
                        break;
                    }
                    return Err(ReadError::PerfEventData.into());
                }
            }

            match user_record_type {
                Some(UserRecordType::PERF_COMPRESSED | UserRecordType::PERF_COMPRESSED2) => {
                    #[cfg(not(feature = "zstd"))]
                    {
                        return Err(Error::IoError(std::io::Error::new(std::io::ErrorKind::Unsupported,
                        "Compression support is not enabled. Please rebuild with the 'zstd' feature flag.",
                        )));
                    }
                    #[cfg(feature = "zstd")]
                    {
                        // COMPRESSED carries the zstd payload directly;
                        // COMPRESSED2 prefixes it with an 8-byte size field.
                        let compressed_data =
                            if user_record_type.unwrap() == UserRecordType::PERF_COMPRESSED {
                                &buffer[..]
                            } else {
                                Self::get_compressed_data_for_compressed2_record::<T>(&buffer)?
                            };
                        self.process_compressed_record_data::<T>(compressed_data)?;
                    }
                }
                Some(UserRecordType::PERF_AUXTRACE) => {
                    // An AUXTRACE record is followed by `auxtrace.size` bytes
                    // of trace data that are not counted in header.size; read
                    // them into the same buffer and advance read_offset.
                    let auxtrace = Auxtrace::parse::<T>(RawData::Single(&buffer))?;
                    let auxtrace_size = auxtrace.size;

                    let auxtrace_len =
                        usize::try_from(auxtrace.size).map_err(|_| ReadError::PerfEventData)?;
                    let actual_event_body_len = event_body_len
                        .checked_add(auxtrace_len)
                        .ok_or(ReadError::PerfEventData)?;
                    buffer.resize(actual_event_body_len, 0);
                    self.reader
                        .read_exact(&mut buffer[event_body_len..])
                        .map_err(|_| ReadError::PerfEventData)?;
                    self.read_offset += auxtrace_size;

                    self.process_record::<T>(header, buffer, Some(offset))?
                }
                _ => self.process_record::<T>(header, buffer, Some(offset))?,
            }

            // Synthetic round handling for files without FINISHED_ROUND
            // markers (simpleperf).
            if let Some(round_size) = self.synthetic_round_size {
                self.records_since_last_finished_round += 1;
                if self.records_since_last_finished_round >= round_size {
                    self.records_since_last_finished_round = 0;
                    self.sorter.finish_round();
                    if self.sorter.has_more() {
                        return Ok(());
                    }
                }
            }
        }

        // End of data: flush everything still buffered.
        self.sorter.finish();

        Ok(())
    }

    /// Determines the record's attribute index and timestamp (for built-in
    /// kernel record types) and inserts it into the sorter.
    ///
    /// `offset` is the record's position in the stream, used as a sort
    /// tiebreaker; records coming from decompressed data pass `None`.
    fn process_record<T: ByteOrder>(
        &mut self,
        header: PerfEventHeader,
        buffer: Vec<u8>,
        offset: Option<u64>,
    ) -> Result<(), Error> {
        let data = RawData::from(&buffer[..]);
        let record_type = RecordType(header.type_);

        let (attr_index, timestamp) = if record_type.is_builtin_type() {
            // Find the owning attribute via the event ID; fall back to
            // attribute 0 when no ID can be extracted or matched.
            let attr_index = match &self.id_parse_infos {
                IdParseInfos::OnlyOneEvent => 0,
                IdParseInfos::Same(id_parse_info) => {
                    get_record_id::<T>(record_type, data, id_parse_info)
                        .and_then(|id| self.event_id_to_attr_index.get(&id).cloned())
                        .unwrap_or(0)
                }
                IdParseInfos::PerAttribute(sample_id_all) => {
                    get_record_identifier::<T>(record_type, data, *sample_id_all)
                        .and_then(|id| self.event_id_to_attr_index.get(&id).cloned())
                        .unwrap_or(0)
                }
            };
            let parse_info = self.parse_infos[attr_index];
            let timestamp = get_record_timestamp::<T>(record_type, data, &parse_info);
            (Some(attr_index), timestamp)
        } else {
            // User records have no attribute or timestamp; they sort first.
            (None, None)
        };

        let sort_key = RecordSortKey { timestamp, offset };
        let misc = header.misc;
        let pending_record = PendingRecord {
            record_type,
            misc,
            buffer,
            attr_index,
        };
        self.sorter.insert_unordered(sort_key, pending_record);
        Ok(())
    }

    /// Extracts the zstd payload from a PERF_RECORD_COMPRESSED2 body, which
    /// starts with a u64 giving the compressed data size, possibly followed
    /// by padding that must not be fed to the decompressor.
    #[cfg(feature = "zstd")]
    fn get_compressed_data_for_compressed2_record<T: ByteOrder>(
        buffer: &[u8],
    ) -> Result<&[u8], Error> {
        if buffer.len() < 8 {
            return Err(ReadError::PerfEventData.into());
        }
        let data_size = T::read_u64(&buffer[0..8]) as usize;
        if data_size > buffer.len() - 8 {
            return Err(ReadError::PerfEventData.into());
        }
        Ok(&buffer[8..8 + data_size])
    }

    /// Decompresses a compressed record's payload (appending to any partial
    /// record left over from the previous chunk) and processes the complete
    /// records it contains. Whatever incomplete tail remains is kept in
    /// `pending_decompressed_data` for the next compressed record.
    #[cfg(feature = "zstd")]
    fn process_compressed_record_data<T: ByteOrder>(
        &mut self,
        compressed_data: &[u8],
    ) -> Result<(), Error> {
        let mut decompressed = core::mem::take(&mut self.pending_decompressed_data);
        self.zstd_decompressor
            .decompress_into(compressed_data, &mut decompressed)?;
        let remaining_len = self.process_decompressed_records::<T>(&decompressed)?;
        // Drop the consumed prefix, keeping only the unparsed tail.
        decompressed.drain(0..(decompressed.len() - remaining_len));
        self.pending_decompressed_data = decompressed;
        Ok(())
    }

    /// Parses and processes as many complete records as `decompressed`
    /// contains, returning the length of the incomplete tail (0 if all bytes
    /// were consumed).
    #[cfg(feature = "zstd")]
    fn process_decompressed_records<T: ByteOrder>(
        &mut self,
        decompressed: &[u8],
    ) -> Result<usize, Error> {
        let mut remaining = decompressed;

        while let Some((header_data, after_header_data)) =
            remaining.split_at_checked(PerfEventHeader::STRUCT_SIZE)
        {
            let header = PerfEventHeader::parse::<_, T>(header_data)?;
            let record_size = header.size as usize;
            let Some(record_body_len) = record_size.checked_sub(PerfEventHeader::STRUCT_SIZE)
            else {
                return Err(Error::InvalidPerfEventSize);
            };

            // Incomplete body: stop here; the caller keeps the tail
            // (header included) for the next decompressed chunk.
            let Some((record_body_data, after_record_data)) =
                after_header_data.split_at_checked(record_body_len)
            else {
                break;
            };

            let mut record_body_buffer = self.buffers_for_recycling.pop_front().unwrap_or_default();
            record_body_buffer.clear();
            record_body_buffer.extend_from_slice(record_body_data);

            // No stream offset is meaningful for records inside a compressed
            // payload, hence `None`.
            self.process_record::<T>(header, record_body_buffer, None)?;

            remaining = after_record_data;
        }
        Ok(remaining.len())
    }

    /// Turns a `PendingRecord` from the sorter into a `PerfFileRecord` that
    /// borrows from `self.current_event_body`, recycling the previous buffer.
    fn convert_pending_record(&mut self, pending_record: PendingRecord) -> PerfFileRecord<'_> {
        let PendingRecord {
            record_type,
            misc,
            buffer,
            attr_index,
            ..
        } = pending_record;
        // Swap the new body in and recycle the old one.
        let prev_buffer = std::mem::replace(&mut self.current_event_body, buffer);
        self.buffers_for_recycling.push_back(prev_buffer);

        let data = RawData::from(&self.current_event_body[..]);

        if let Some(record_type) = UserRecordType::try_from(record_type) {
            let endian = self.endian;
            PerfFileRecord::UserRecord(RawUserRecord {
                record_type,
                misc,
                data,
                endian,
            })
        } else {
            // Built-in records always got Some(attr_index) in process_record.
            let attr_index = attr_index.unwrap();
            let parse_info = self.parse_infos[attr_index];
            let record = RawEventRecord {
                record_type,
                misc,
                data,
                parse_info,
            };
            PerfFileRecord::EventRecord { attr_index, record }
        }
    }
}
784
/// A record buffered in the sorter, waiting to be yielded once its round
/// is finished. Converted to a `PerfFileRecord` by `convert_pending_record`.
#[derive(Clone, Debug, PartialEq, Eq)]
struct PendingRecord {
    record_type: RecordType,
    // The `misc` field from the record's perf_event_header.
    misc: u16,
    // The record body (bytes after the header); recycled after emission.
    buffer: Vec<u8>,
    // Index into `parse_infos` / `attributes`; `None` for user records.
    attr_index: Option<usize>,
}
792
/// Sort key for ordering records within a round. The derived `Ord` compares
/// `timestamp` first and then `offset`; `None` orders before `Some`, so
/// records without a timestamp (user records) come first, and the stream
/// offset breaks ties between equal timestamps.
#[derive(Clone, Copy, Default, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct RecordSortKey {
    // Record timestamp, if one could be parsed.
    timestamp: Option<u64>,
    // Byte offset in the record stream; `None` for decompressed records.
    offset: Option<u64>,
}
798
/// Strategy for extracting the event ID from a record, chosen once at parse
/// time based on the file's attributes (see `parse_file_impl` / `parse_pipe_impl`).
#[derive(Debug, Clone)]
enum IdParseInfos {
    /// Only one attribute exists; every record belongs to it (index 0).
    OnlyOneEvent,
    /// Multiple attributes, but all share the same ID layout, so one
    /// `RecordIdParseInfo` works for every record.
    Same(RecordIdParseInfo),
    /// Attributes have differing layouts; every one uses SAMPLE_IDENTIFIER,
    /// so the ID is read via `get_record_identifier`. The `bool` is the
    /// (consistent) SAMPLE_ID_ALL flag value.
    PerAttribute(bool),
}