use std::borrow::Cow;
use std::convert::{AsMut, AsRef};
use std::ops::{Deref, DerefMut};
use std::os::unix::io::{AsRawFd, IntoRawFd, RawFd};
use std::sync::atomic::Ordering;
use std::time::{Duration, Instant};
use crate::data::parse::{ParseBuf, ParseBufChunk, ParseError, ParseResult, Parser};
use crate::events::Hardware;
use crate::sys::bindings::{
__BindgenBitfieldUnit, perf_event_header, perf_event_mmap_page,
perf_event_mmap_page__bindgen_ty_1__bindgen_ty_1 as MmapPageFlags,
};
use crate::{check_errno_syscall, data, Counter};
used_in_docs!(Hardware);
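/// A sampled perf event.
///
/// A `Sampler` owns a [`Counter`] along with a memory-mapped ring buffer into
/// which the kernel writes records. It derefs to [`Counter`], so all counter
/// methods are available on the sampler as well.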
pub struct Sampler {
counter: Counter,
mmap: memmap2::MmapRaw,
}
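/// A single record read out of a [`Sampler`]'s ring buffer.
///
/// The record borrows its bytes directly from the ring buffer. Dropping it
/// advances `data_tail`, handing the space back to the kernel.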
pub struct Record<'a> {
sampler: &'a Sampler,
header: perf_event_header,
data: ByteBuffer<'a>,
}
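/// Bytes from the ring buffer, which are either contiguous or split in two
/// when a record wraps around the end of the buffer.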
#[derive(Copy, Clone)]
enum ByteBuffer<'a> {
Single(&'a [u8]),
Split([&'a [u8]; 2]),
}
impl Sampler {
pub(crate) fn new(counter: Counter, mmap: memmap2::MmapRaw) -> Self {
assert!(!mmap.as_ptr().is_null());
Self { counter, mmap }
}
pub fn into_counter(self) -> Counter {
self.counter
}
pub fn as_counter(&self) -> &Counter {
&self.counter
}
pub fn as_counter_mut(&mut self) -> &mut Counter {
&mut self.counter
}
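    /// Read the next record from the ring buffer, if one is available.
    ///
    /// This never blocks: it returns `None` when the kernel has not published
    /// any new data. Dropping the returned [`Record`] is what consumes it.
    ///
    /// A minimal drain loop, as a sketch (marked `ignore` since it needs a
    /// live sampler to run against):
    ///
    /// ```ignore
    /// fn drain(sampler: &mut Sampler) {
    ///     while let Some(record) = sampler.next_record() {
    ///         println!("record type {} with {} bytes", record.ty(), record.len());
    ///     }
    /// }
    /// ```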
pub fn next_record(&mut self) -> Option<Record> {
use std::{mem, ptr, slice};
let page = self.page();
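        // `data_tail` is only ever written by userspace (by us, in `Record`'s
        // `Drop` impl), so a plain read is sufficient here.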
let tail = unsafe { ptr::read(ptr::addr_of!((*page).data_tail)) };
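        // The kernel publishes new data with a release store to `data_head`,
        // so pair it with an acquire load to make the record bytes visible.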
let head = unsafe { atomic_load(ptr::addr_of!((*page).data_head), Ordering::Acquire) };
if tail == head {
return None;
}
let data_size = unsafe { ptr::read(ptr::addr_of!((*page).data_size)) };
let data_offset = unsafe { ptr::read(ptr::addr_of!((*page).data_offset)) };
let mod_tail = (tail % data_size) as usize;
let mod_head = (head % data_size) as usize;
let data_start = unsafe { self.mmap.as_ptr().add(data_offset as usize) };
let tail_start = unsafe { data_start.add(mod_tail) };
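        // The valid bytes may wrap around the end of the ring buffer; if so,
        // expose them as two slices.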
let mut buffer = if mod_head > mod_tail {
ByteBuffer::Single(unsafe { slice::from_raw_parts(tail_start, mod_head - mod_tail) })
} else {
ByteBuffer::Split([
unsafe { slice::from_raw_parts(tail_start, data_size as usize - mod_tail) },
unsafe { slice::from_raw_parts(data_start, mod_head) },
])
};
let header = buffer.parse_header();
assert!(header.size as usize >= mem::size_of::<perf_event_header>());
buffer.truncate(header.size as usize - mem::size_of::<perf_event_header>());
Some(Record {
sampler: self,
header,
data: buffer,
})
}
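    /// Like [`next_record`](Self::next_record), but blocks until a record is
    /// available or `timeout` expires. A `timeout` of `None` blocks forever.
    ///
    /// A sketch of waiting with a timeout (marked `ignore`, as above):
    ///
    /// ```ignore
    /// use std::time::Duration;
    ///
    /// if let Some(record) = sampler.next_blocking(Some(Duration::from_millis(100))) {
    ///     println!("got a record of type {}", record.ty());
    /// }
    /// ```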
pub fn next_blocking(&mut self, timeout: Option<Duration>) -> Option<Record> {
let deadline = timeout.map(|timeout| Instant::now() + timeout);
loop {
if let Some(record) = self.next_record() {
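                // This transmute only changes lifetimes. It works around the
                // borrow checker being unable to see that the borrow of
                // `self` ends here (NLL Problem Case #3).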
return Some(unsafe { std::mem::transmute::<Record, Record>(record) });
}
let timeout = match deadline {
Some(deadline) => deadline
.checked_duration_since(Instant::now())?
.as_millis()
.min(libc::c_int::MAX as u128) as libc::c_int,
None => -1,
};
let mut pollfd = libc::pollfd {
fd: self.as_raw_fd(),
events: libc::POLLIN,
revents: 0,
};
match check_errno_syscall(|| unsafe { libc::poll(&mut pollfd, 1, timeout) }) {
Ok(0) => return None,
Ok(_) if pollfd.revents & libc::POLLHUP != 0 => return self.next_record(),
Ok(_) => continue,
Err(e) => match e.raw_os_error() {
Some(libc::EINTR) => continue,
_ => panic!(
"polling a perf-event fd returned an unexpected error: {}",
e
),
},
}
}
}
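    /// Read the counter's value and enabled/running times directly from
    /// userspace, without making a syscall.
    ///
    /// This follows the seqlock protocol documented for
    /// `perf_event_mmap_page`: snapshot the page fields, read the PMC and TSC
    /// where the kernel advertises that as permitted, then retry if the
    /// sequence lock changed underneath us.
    ///
    /// A sketch of consuming the result (marked `ignore`, as above):
    ///
    /// ```ignore
    /// let data = sampler.read_user();
    /// if let Some(count) = data.scaled_count() {
    ///     println!("estimated count: {count}");
    /// }
    /// ```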
pub fn read_user(&self) -> UserReadData {
#[cfg(target_arch = "x86")]
use std::arch::x86::_rdtsc;
#[cfg(target_arch = "x86_64")]
use std::arch::x86_64::_rdtsc;
loop {
let mut data = unsafe { PmcReadData::new(self.page()) };
if let Some(index) = data.index() {
#[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
data.with_pmc(unsafe { rdpmc(index) });
}
if data.cap_user_time() {
#[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
data.with_tsc(unsafe { _rdtsc() });
}
if let Some(data) = data.finish() {
return data;
}
}
}
fn page(&self) -> *const perf_event_mmap_page {
self.mmap.as_ptr() as *const _
}
}
impl Deref for Sampler {
type Target = Counter;
fn deref(&self) -> &Self::Target {
&self.counter
}
}
impl DerefMut for Sampler {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.counter
}
}
impl AsRef<Counter> for Sampler {
fn as_ref(&self) -> &Counter {
&self.counter
}
}
impl AsMut<Counter> for Sampler {
fn as_mut(&mut self) -> &mut Counter {
&mut self.counter
}
}
impl AsRawFd for Sampler {
fn as_raw_fd(&self) -> RawFd {
self.counter.as_raw_fd()
}
}
impl IntoRawFd for Sampler {
fn into_raw_fd(self) -> RawFd {
self.counter.into_raw_fd()
}
}
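// `read_once!` mirrors the kernel's READ_ONCE: a relaxed atomic load of a
// plain field in the shared mmap page. `barrier!` mirrors barrier() and keeps
// those reads ordered relative to the seqlock reads around them.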
macro_rules! read_once {
($place:expr) => {
atomic_load(::std::ptr::addr_of!($place), Ordering::Relaxed)
};
}
macro_rules! barrier {
() => {
std::sync::atomic::compiler_fence(Ordering::SeqCst)
};
}
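/// A snapshot of the `perf_event_mmap_page` fields needed for a userspace
/// counter read, taken under the page's sequence lock (`lock`). The snapshot
/// is only valid if `PmcReadData::finish` confirms the lock did not change.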
struct PmcReadData {
page: *const perf_event_mmap_page,
seq: u32,
flags: MmapPageFlags,
enabled: u64,
running: u64,
index: u32,
count: i64,
has_pmc_value: bool,
}
#[allow(dead_code)]
impl PmcReadData {
pub unsafe fn new(page: *const perf_event_mmap_page) -> Self {
let seq = atomic_load(std::ptr::addr_of!((*page).lock), Ordering::Acquire);
barrier!();
let capabilities = read_once!((*page).__bindgen_anon_1.capabilities);
Self {
page,
seq,
flags: {
let mut flags = MmapPageFlags::default();
flags._bitfield_1 = __BindgenBitfieldUnit::new(capabilities.to_ne_bytes());
flags
},
enabled: read_once!((*page).time_enabled),
running: read_once!((*page).time_running),
index: read_once!((*page).index),
count: read_once!((*page).offset),
has_pmc_value: false,
}
}
pub fn cap_user_rdpmc(&self) -> bool {
self.flags.cap_user_rdpmc() != 0
}
pub fn cap_user_time(&self) -> bool {
self.flags.cap_user_time() != 0
}
pub fn cap_user_time_short(&self) -> bool {
self.flags.cap_user_time_short() != 0
}
pub fn index(&self) -> Option<u32> {
if self.cap_user_rdpmc() && self.index != 0 {
Some(self.index - 1)
} else {
None
}
}
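    /// Fold a TSC reading into the enabled and running times.
    ///
    /// This implements the conversion documented for `perf_event_mmap_page`:
    ///
    /// ```text
    /// quot  = cyc >> time_shift;
    /// rem   = cyc & ((1 << time_shift) - 1);
    /// delta = time_offset + quot * time_mult + ((rem * time_mult) >> time_shift);
    /// ```
    ///
    /// The delta is added to `time_enabled` and, if the event is currently
    /// scheduled on the CPU (`index != 0`), to `time_running` as well.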
pub fn with_tsc(&mut self, mut cyc: u64) {
assert!(self.cap_user_time());
let page = self.page;
let time_offset = unsafe { read_once!((*page).time_offset) };
let time_mult = unsafe { read_once!((*page).time_mult) };
let time_shift = unsafe { read_once!((*page).time_shift) };
if self.cap_user_time_short() {
let time_cycles = unsafe { read_once!((*page).time_cycles) };
let time_mask = unsafe { read_once!((*page).time_mask) };
            // Use wrapping arithmetic: the kernel may have updated the page
            // after `cyc` was sampled; the seqlock check catches the skew.
            cyc = time_cycles.wrapping_add(cyc.wrapping_sub(time_cycles) & time_mask);
}
let time_mult = time_mult as u64;
let quot = cyc >> time_shift;
let rem = cyc & ((1u64 << time_shift) - 1);
let delta = quot * time_mult + ((rem * time_mult) >> time_shift);
let delta = time_offset.wrapping_add(delta);
self.enabled += delta;
if self.index != 0 {
self.running += delta;
}
}
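    /// Fold a raw `rdpmc` reading into the counter value.
    ///
    /// The hardware counter is only `pmc_width` bits wide, so the raw value
    /// is sign-extended before being added to the kernel-supplied offset.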
pub fn with_pmc(&mut self, pmc: u64) {
assert!(self.index().is_some());
let Self { page, .. } = *self;
let width = unsafe { read_once!((*page).pmc_width) };
let mut pmc = pmc as i64;
pmc <<= 64 - width;
pmc >>= 64 - width;
self.count = self.count.wrapping_add(pmc);
self.has_pmc_value = true;
}
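    /// Validate the snapshot against the page's sequence lock.
    ///
    /// Returns `None` if the kernel modified the page while we were reading
    /// it, in which case the caller should retry the entire read.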
pub fn finish(self) -> Option<UserReadData> {
let page = self.page;
let seq = self.seq;
barrier!();
let nseq = unsafe { atomic_load(std::ptr::addr_of!((*page).lock), Ordering::Acquire) };
if nseq != seq {
return None;
}
Some(UserReadData {
time_enabled: self.enabled,
time_running: self.running,
value: if self.has_pmc_value {
Some(self.count as u64)
} else {
None
},
})
}
}
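/// Counter data gathered by [`Sampler::read_user`].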
#[derive(Copy, Clone, Debug)]
pub struct UserReadData {
time_enabled: u64,
time_running: u64,
value: Option<u64>,
}
impl UserReadData {
pub fn time_enabled(&self) -> Duration {
Duration::from_nanos(self.time_enabled)
}
pub fn time_running(&self) -> Duration {
Duration::from_nanos(self.time_running)
}
pub fn count(&self) -> Option<u64> {
self.value
}
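    /// The counter value scaled to account for multiplexing, computed as the
    /// kernel documents it: `count * time_enabled / time_running`, evaluated
    /// as `quot * enabled + (rem * enabled) / running` to avoid overflow.
    ///
    /// Note that this divides by `time_running`, so it is only meaningful
    /// once the counter has actually been scheduled.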
pub fn scaled_count(&self) -> Option<u64> {
self.count().map(|count| {
let quot = count / self.time_running;
let rem = count % self.time_running;
quot * self.time_enabled + (rem * self.time_enabled) / self.time_running
})
}
}
impl<'s> Record<'s> {
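    /// The type of this record, one of the `PERF_RECORD_*` values from the
    /// kernel headers.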
pub fn ty(&self) -> u32 {
self.header.type_
}
pub fn misc(&self) -> u16 {
self.header.misc
}
    #[allow(clippy::len_without_is_empty)]
    pub fn len(&self) -> usize {
self.data.len()
}
pub fn data(&self) -> &[&[u8]] {
match &self.data {
ByteBuffer::Single(buf) => std::slice::from_ref(buf),
ByteBuffer::Split(bufs) => &bufs[..],
}
}
pub fn to_vec(&self) -> Vec<u8> {
self.to_contiguous().into_owned()
}
pub fn to_contiguous(&self) -> Cow<[u8]> {
match self.data {
ByteBuffer::Single(data) => Cow::Borrowed(data),
ByteBuffer::Split([a, b]) => {
let mut vec = Vec::with_capacity(a.len() + b.len());
vec.extend_from_slice(a);
vec.extend_from_slice(b);
Cow::Owned(vec)
}
}
}
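    /// Parse this record into a structured [`data::Record`], using this
    /// sampler's parse configuration.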
pub fn parse_record(&self) -> ParseResult<data::Record> {
let mut parser = Parser::new(self.data, self.sampler.config().clone());
data::Record::parse_with_header(&mut parser, self.header)
}
}
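// Dropping a record is what consumes it: advance `data_tail` past the record
// so the kernel can reuse that part of the ring buffer.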
impl<'s> Drop for Record<'s> {
fn drop(&mut self) {
use std::ptr;
let page = self.sampler.page();
unsafe {
let tail = ptr::read(ptr::addr_of!((*page).data_tail));
atomic_store(
ptr::addr_of!((*page).data_tail),
tail + (self.header.size as u64),
Ordering::Release,
);
}
}
}
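// SAFETY: a `Record` only ever reads from the ring buffer, and the
// `data_tail` update in `Drop` is an atomic release store, so sharing or
// sending one across threads is sound.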
unsafe impl<'s> Sync for Record<'s> {}
unsafe impl<'s> Send for Record<'s> {}
impl<'a> ByteBuffer<'a> {
fn parse_header(&mut self) -> perf_event_header {
let mut bytes = [0; std::mem::size_of::<perf_event_header>()];
self.copy_to_slice(&mut bytes);
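        // SAFETY: `perf_event_header` is plain old data, so any initialized
        // byte pattern of the right size is a valid value.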
unsafe { std::mem::transmute(bytes) }
}
fn len(&self) -> usize {
match self {
Self::Single(buf) => buf.len(),
Self::Split([a, b]) => a.len() + b.len(),
}
}
fn truncate(&mut self, new_len: usize) {
assert!(new_len <= self.len());
*self = match *self {
Self::Single(buf) => Self::Single(&buf[..new_len]),
Self::Split([a, b]) => {
if new_len <= a.len() {
Self::Single(&a[..new_len])
} else {
Self::Split([a, &b[..new_len - a.len()]])
}
}
}
}
fn copy_to_slice(&mut self, dst: &mut [u8]) {
assert!(self.len() >= dst.len());
match self {
Self::Single(buf) => {
let (head, rest) = buf.split_at(dst.len());
dst.copy_from_slice(head);
*buf = rest;
}
Self::Split([buf, _]) if buf.len() >= dst.len() => {
let (head, rest) = buf.split_at(dst.len());
dst.copy_from_slice(head);
*buf = rest;
}
&mut Self::Split([a, b]) => {
let (d_head, d_rest) = dst.split_at_mut(a.len());
let (b_head, b_rest) = b.split_at(d_rest.len());
d_head.copy_from_slice(a);
d_rest.copy_from_slice(b_head);
*self = Self::Single(b_rest);
}
}
}
}
unsafe impl<'a> ParseBuf<'a> for ByteBuffer<'a> {
fn chunk(&mut self) -> ParseResult<ParseBufChunk<'_, 'a>> {
match self {
Self::Single([]) => Err(ParseError::eof()),
Self::Single(chunk) => Ok(ParseBufChunk::External(chunk)),
Self::Split([chunk, _]) => Ok(ParseBufChunk::External(chunk)),
}
}
fn advance(&mut self, mut count: usize) {
match self {
Self::Single(chunk) => chunk.advance(count),
Self::Split([chunk, _]) if count < chunk.len() => chunk.advance(count),
Self::Split([a, b]) => {
count -= a.len();
b.advance(count);
*self = Self::Single(b);
}
}
}
}
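// Compile-time check that two types have the same size, used to justify the
// pointer casts in the atomic helpers below.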
macro_rules! assert_same_size {
($a:ty, $b:ty) => {{
if false {
let _assert_same_size: [u8; ::std::mem::size_of::<$b>()] =
[0u8; ::std::mem::size_of::<$a>()];
}
}};
}
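// Helpers for doing atomic loads and stores to the plain integer fields of
// the shared mmap page, by casting to the matching atomic type.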
trait Atomic: Sized + Copy {
type Atomic;
unsafe fn store(ptr: *const Self, val: Self, order: Ordering);
unsafe fn load(ptr: *const Self, order: Ordering) -> Self;
}
macro_rules! impl_atomic {
($base:ty, $atomic:ty) => {
impl Atomic for $base {
type Atomic = $atomic;
unsafe fn store(ptr: *const Self, val: Self, order: Ordering) {
assert_same_size!(Self, Self::Atomic);
let ptr = ptr as *const Self::Atomic;
(*ptr).store(val, order)
}
unsafe fn load(ptr: *const Self, order: Ordering) -> Self {
assert_same_size!(Self, Self::Atomic);
let ptr = ptr as *const Self::Atomic;
(*ptr).load(order)
}
}
};
}
impl_atomic!(u64, std::sync::atomic::AtomicU64);
impl_atomic!(u32, std::sync::atomic::AtomicU32);
impl_atomic!(u16, std::sync::atomic::AtomicU16);
impl_atomic!(i64, std::sync::atomic::AtomicI64);
unsafe fn atomic_store<T: Atomic>(ptr: *const T, val: T, order: Ordering) {
T::store(ptr, val, order)
}
unsafe fn atomic_load<T: Atomic>(ptr: *const T, order: Ordering) -> T {
T::load(ptr, order)
}
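// Read performance counter `index` via the `rdpmc` instruction. The 64-bit
// result is returned in EDX:EAX.
//
// SAFETY: the kernel must have granted userspace rdpmc access for this event
// (`cap_user_rdpmc`), otherwise the instruction faults.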
#[cfg(any(target_arch = "x86_64", target_arch = "x86"))]
unsafe fn rdpmc(index: u32) -> u64 {
#[cfg(target_arch = "x86_64")]
{
let lo: u64;
let hi: u64;
std::arch::asm!(
"rdpmc",
in("ecx") index,
out("rax") lo,
out("rdx") hi
);
lo | (hi << u32::BITS)
}
#[cfg(target_arch = "x86")]
{
let lo: u32;
let hi: u32;
std::arch::asm!(
"rdpmc",
in("ecx") index,
out("eax") lo,
out("edx") hi
);
(lo as u64) | ((hi as u64) << u32::BITS)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn buf_copy_over_split() {
let mut out = [0; 7];
let mut buf = ByteBuffer::Split([b"aaaaaa", b"bbbbb"]);
buf.copy_to_slice(&mut out);
assert_eq!(&out, b"aaaaaab");
assert_eq!(buf.len(), 4);
}
#[test]
fn buf_copy_to_split() {
let mut out = [0; 6];
let mut buf = ByteBuffer::Split([b"aaaaaa", b"bbbbb"]);
buf.copy_to_slice(&mut out);
assert_eq!(&out, b"aaaaaa");
assert_eq!(buf.len(), 5);
}
#[test]
fn buf_copy_before_split() {
let mut out = [0; 5];
let mut buf = ByteBuffer::Split([b"aaaaaa", b"bbbbb"]);
buf.copy_to_slice(&mut out);
assert_eq!(&out, b"aaaaa");
assert_eq!(buf.len(), 6);
}
#[test]
fn buf_truncate_over_split() {
let mut out = [0u8; 11];
let mut buf = ByteBuffer::Split([b"1234567890", b"abc"]);
buf.truncate(11);
assert_eq!(buf.len(), 11);
buf.copy_to_slice(&mut out);
assert_eq!(&out, b"1234567890a");
}
#[test]
fn buf_truncate_before_split() {
let mut out = [0u8; 5];
let mut buf = ByteBuffer::Split([b"1234567890", b"abc"]);
buf.truncate(5);
assert_eq!(buf.len(), 5);
buf.copy_to_slice(&mut out);
assert_eq!(&out, b"12345");
}
}