perf_event_open/sample/iter/
cow.rs

1use std::fs::File;
2use std::future::Future;
3use std::io::Result;
4use std::mem::{transmute, MaybeUninit};
5use std::pin::Pin;
6use std::sync::mpsc::{sync_channel, SyncSender};
7use std::task::{Context, Poll, Waker};
8use std::thread;
9
10use crate::ffi::syscall::{epoll_create1, epoll_ctl, epoll_wait};
11use crate::sample::rb::{CowChunk, Rb};
12use crate::sample::record::Parser;
13
14/// COW (copy-on-write) record iterator.
15///
16/// This type allows you to access the raw bytes of record in the
17/// underlying ring-buffer directly without copy it to the outside.
18pub struct CowIter<'a> {
19    pub(in crate::sample) rb: Rb<'a>,
20    pub(in crate::sample) perf: &'a File,
21    pub(in crate::sample) parser: &'a Parser,
22}
23
24impl<'a> CowIter<'a> {
25    /// Advances the iterator and returns the next value.
26    ///
27    /// If sampling is in happening, operations in the closure should be
28    /// quick and cheap. Slow iteration of raw bytes may throttle kernel
29    /// threads from outputting new data to the ring-buffer, and heavy
30    /// operations may affect the performance of the target process.
31    ///
32    /// # Examples
33    ///
34    /// ``` rust
35    /// use perf_event_open::config::{Cpu, Opts, Proc, SampleOn, Size};
36    /// use perf_event_open::count::Counter;
37    /// use perf_event_open::event::sw::Software;
38    ///
39    /// let event = Software::TaskClock;
40    /// let target = (Proc::ALL, Cpu(0));
41    ///
42    /// let mut opts = Opts::default();
43    /// opts.sample_on = SampleOn::Count(50_000); // 50us
44    /// opts.sample_format.user_stack = Some(Size(8)); // Dump 8-bytes user stack in sample.
45    ///
46    /// let counter = Counter::new(event, target, &opts).unwrap();
47    /// let sampler = counter.sampler(5).unwrap();
48    /// let mut iter = sampler.iter().into_cow();
49    ///
50    /// counter.enable().unwrap();
51    ///
52    /// let mut skipped = 0;
53    /// let it = loop {
54    ///     let it = iter
55    ///         .next(|cc, p| {
56    ///             // ABI layout:
57    ///             // u32 type
58    ///             // u16 misc
59    ///             // u16 size
60    ///             // u64 len
61    ///             // [u8; len] bytes
62    ///
63    ///             let ptr = cc.as_bytes().as_ptr();
64    ///             let ty = ptr as *const u32;
65    ///
66    ///             // Only parse sample record with stack dumped.
67    ///             if unsafe { *ty } == 9 {
68    ///                 let len = unsafe { ptr.offset(8) } as *const u64;
69    ///                 if unsafe { *len } > 0 {
70    ///                     return Some(p.parse(cc));
71    ///                 }
72    ///             }
73    ///
74    ///             skipped += 1;
75    ///             None
76    ///         })
77    ///         .flatten();
78    ///
79    ///     if let Some(it) = it {
80    ///         break it;
81    ///     }
82    /// };
83    ///
84    /// println!("skipped: {}", skipped);
85    /// println!("{:-?}", it);
86    /// ```
87    pub fn next<F, R>(&mut self, f: F) -> Option<R>
88    where
89        F: FnOnce(CowChunk<'_>, &Parser) -> R,
90    {
91        self.rb.lending_pop().map(|cc| f(cc, self.parser))
92    }
93
94    /// Creates an asynchronous iterator.
95    pub fn into_async(self) -> Result<AsyncCowIter<'a>> {
96        let epoll = epoll_create1(libc::O_CLOEXEC)?;
97        let mut event = libc::epoll_event {
98            events: (libc::EPOLLIN | libc::EPOLLHUP) as _,
99            u64: 0,
100        };
101        epoll_ctl(&epoll, libc::EPOLL_CTL_ADD, self.perf, &mut event)?;
102
103        let (tx, rx) = sync_channel::<Waker>(1);
104
105        thread::spawn(move || {
106            let mut events = {
107                let src = [MaybeUninit::<libc::epoll_event>::uninit()];
108                // We don't care which event triggers epoll because we only monitor one event
109                // but `epoll_wait` requires a non-empty buffer
110                unsafe { transmute::<[_; 1], [_; 1]>(src) }
111            };
112            'exit: while let Ok(waker) = rx.recv() {
113                loop {
114                    match epoll_wait(&epoll, &mut events, -1).map(|it| it[0].events as _) {
115                        Ok(libc::EPOLLIN) => {
116                            waker.wake();
117                            break;
118                        }
119                        Ok(libc::EPOLLHUP) => {
120                            drop(rx);
121                            waker.wake();
122                            break 'exit;
123                        }
124                        _ => (), // Error can only be `EINTR`, ignore it and try again.
125                    }
126                }
127            }
128        });
129
130        Ok(AsyncCowIter {
131            inner: self,
132            waker: tx,
133        })
134    }
135}
136
137/// Asynchronous COW record iterator.
138pub struct AsyncCowIter<'a> {
139    inner: CowIter<'a>,
140    waker: SyncSender<Waker>,
141}
142
143impl AsyncCowIter<'_> {
144    /// Advances the iterator and returns the next value.
145    ///
146    /// [`WakeUp::on`][crate::config::WakeUp::on] must be properly set to make this work.
147    ///
148    /// See also [`CowIter::next`].
149    pub async fn next<F, R>(&mut self, f: F) -> Option<R>
150    where
151        F: FnOnce(CowChunk<'_>, &Parser) -> R + Unpin,
152    {
153        struct Fut<I, F>(I, Option<F>);
154
155        impl<F, R> Future for Fut<&mut AsyncCowIter<'_>, F>
156        where
157            F: FnOnce(CowChunk<'_>, &Parser) -> R + Unpin,
158        {
159            type Output = Option<R>;
160
161            fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
162                let Fut(iter, f) = self.get_mut();
163
164                if let Some(cc) = iter.inner.rb.lending_pop() {
165                    let f = f.take();
166                    // We only take `f` once, so there is always a value there.
167                    let f = unsafe { f.unwrap_unchecked() };
168                    return Poll::Ready(Some(f(cc, iter.inner.parser)));
169                }
170
171                let waker = cx.waker().clone();
172                match iter.waker.send(waker) {
173                    Ok(()) => Poll::Pending,
174                    // The task we were monitoring exited, so the epoll thread died.
175                    // No more data needs to be produced.
176                    Err(_) => Poll::Ready(None),
177                }
178            }
179        }
180
181        Fut(self, Some(f)).await
182    }
183}