perf_event_open/sample/iter/cow.rs
1use std::fs::File;
2use std::future::Future;
3use std::io::Result;
4use std::mem::{transmute, MaybeUninit};
5use std::pin::Pin;
6use std::sync::mpsc::{sync_channel, SyncSender};
7use std::task::{Context, Poll, Waker};
8use std::thread;
9
10use crate::ffi::syscall::{epoll_create1, epoll_ctl, epoll_wait};
11use crate::sample::rb::{CowChunk, Rb};
12use crate::sample::record::Parser;
13
14/// COW (copy-on-write) record iterator.
15///
16/// This type allows you to access the raw bytes of record in the
17/// underlying ring-buffer directly without copy it to the outside.
18pub struct CowIter<'a> {
19 pub(in crate::sample) rb: Rb<'a>,
20 pub(in crate::sample) perf: &'a File,
21 pub(in crate::sample) parser: &'a Parser,
22}
23
24impl<'a> CowIter<'a> {
25 /// Advances the iterator and returns the next value.
26 ///
27 /// If sampling is in happening, operations in the closure should be
28 /// quick and cheap. Slow iteration of raw bytes may throttle kernel
29 /// threads from outputting new data to the ring-buffer, and heavy
30 /// operations may affect the performance of the target process.
31 ///
32 /// # Examples
33 ///
34 /// ``` rust
35 /// use perf_event_open::config::{Cpu, Opts, Proc, SampleOn, Size};
36 /// use perf_event_open::count::Counter;
37 /// use perf_event_open::event::sw::Software;
38 ///
39 /// let event = Software::TaskClock;
40 /// let target = (Proc::ALL, Cpu(0));
41 ///
42 /// let mut opts = Opts::default();
43 /// opts.sample_on = SampleOn::Count(50_000); // 50us
44 /// opts.sample_format.user_stack = Some(Size(8)); // Dump 8-bytes user stack in sample.
45 ///
46 /// let counter = Counter::new(event, target, &opts).unwrap();
47 /// let sampler = counter.sampler(5).unwrap();
48 /// let mut iter = sampler.iter().into_cow();
49 ///
50 /// counter.enable().unwrap();
51 ///
52 /// let mut skipped = 0;
53 /// let it = loop {
54 /// let it = iter
55 /// .next(|cc, p| {
56 /// // ABI layout:
57 /// // u32 type
58 /// // u16 misc
59 /// // u16 size
60 /// // u64 len
61 /// // [u8; len] bytes
62 ///
63 /// let ptr = cc.as_bytes().as_ptr();
64 /// let ty = ptr as *const u32;
65 ///
66 /// // Only parse sample record with stack dumped.
67 /// if unsafe { *ty } == 9 {
68 /// let len = unsafe { ptr.offset(8) } as *const u64;
69 /// if unsafe { *len } > 0 {
70 /// return Some(p.parse(cc));
71 /// }
72 /// }
73 ///
74 /// skipped += 1;
75 /// None
76 /// })
77 /// .flatten();
78 ///
79 /// if let Some(it) = it {
80 /// break it;
81 /// }
82 /// };
83 ///
84 /// println!("skipped: {}", skipped);
85 /// println!("{:-?}", it);
86 /// ```
87 pub fn next<F, R>(&mut self, f: F) -> Option<R>
88 where
89 F: FnOnce(CowChunk<'_>, &Parser) -> R,
90 {
91 self.rb.lending_pop().map(|cc| f(cc, self.parser))
92 }
93
94 /// Creates an asynchronous iterator.
95 pub fn into_async(self) -> Result<AsyncCowIter<'a>> {
96 let epoll = epoll_create1(libc::O_CLOEXEC)?;
97 let mut event = libc::epoll_event {
98 events: (libc::EPOLLIN | libc::EPOLLHUP) as _,
99 u64: 0,
100 };
101 epoll_ctl(&epoll, libc::EPOLL_CTL_ADD, self.perf, &mut event)?;
102
103 let (tx, rx) = sync_channel::<Waker>(1);
104
105 thread::spawn(move || {
106 let mut events = {
107 let src = [MaybeUninit::<libc::epoll_event>::uninit()];
108 // We don't care which event triggers epoll because we only monitor one event
109 // but `epoll_wait` requires a non-empty buffer
110 unsafe { transmute::<[_; 1], [_; 1]>(src) }
111 };
112 'exit: while let Ok(waker) = rx.recv() {
113 loop {
114 match epoll_wait(&epoll, &mut events, -1).map(|it| it[0].events as _) {
115 Ok(libc::EPOLLIN) => {
116 waker.wake();
117 break;
118 }
119 Ok(libc::EPOLLHUP) => {
120 drop(rx);
121 waker.wake();
122 break 'exit;
123 }
124 _ => (), // Error can only be `EINTR`, ignore it and try again.
125 }
126 }
127 }
128 });
129
130 Ok(AsyncCowIter {
131 inner: self,
132 waker: tx,
133 })
134 }
135}
136
137/// Asynchronous COW record iterator.
138pub struct AsyncCowIter<'a> {
139 inner: CowIter<'a>,
140 waker: SyncSender<Waker>,
141}
142
143impl AsyncCowIter<'_> {
144 /// Advances the iterator and returns the next value.
145 ///
146 /// [`WakeUp::on`][crate::config::WakeUp::on] must be properly set to make this work.
147 ///
148 /// See also [`CowIter::next`].
149 pub async fn next<F, R>(&mut self, f: F) -> Option<R>
150 where
151 F: FnOnce(CowChunk<'_>, &Parser) -> R + Unpin,
152 {
153 struct Fut<I, F>(I, Option<F>);
154
155 impl<F, R> Future for Fut<&mut AsyncCowIter<'_>, F>
156 where
157 F: FnOnce(CowChunk<'_>, &Parser) -> R + Unpin,
158 {
159 type Output = Option<R>;
160
161 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
162 let Fut(iter, f) = self.get_mut();
163
164 if let Some(cc) = iter.inner.rb.lending_pop() {
165 let f = f.take();
166 // We only take `f` once, so there is always a value there.
167 let f = unsafe { f.unwrap_unchecked() };
168 return Poll::Ready(Some(f(cc, iter.inner.parser)));
169 }
170
171 let waker = cx.waker().clone();
172 match iter.waker.send(waker) {
173 Ok(()) => Poll::Pending,
174 // The task we were monitoring exited, so the epoll thread died.
175 // No more data needs to be produced.
176 Err(_) => Poll::Ready(None),
177 }
178 }
179 }
180
181 Fut(self, Some(f)).await
182 }
183}