use futures::prelude::*;
use std::os::unix::io::RawFd;
use std::pin::Pin;
use std::slice;
use std::task::{Context, Poll};
use tokio::io::unix::AsyncFd;
use tokio::io::Interest;
use tracing::error;
use crate::{Event, PerfMap};
/// Asynchronous stream of raw sample payloads read from a [`PerfMap`].
///
/// Each item yielded by the `Stream` implementation is the batch of samples
/// collected by one drain of the map, every sample copied into its own
/// `Box<[u8]>`.
pub struct PerfMessageStream {
// Readiness handle for the map's file descriptor (`map.fd`), registered for
// read interest in `new`. Fields are dropped in declaration order, so this
// registration is dropped before `map`.
// NOTE(review): whether that ordering is required depends on what `PerfMap`
// does with the descriptor on drop — confirm before reordering fields.
poll: AsyncFd<RawFd>,
// The perf map that events are read from.
map: PerfMap,
// Label included in the log line emitted when lost samples are reported.
name: String,
}
impl PerfMessageStream {
    /// Creates a stream over `map`, registering `map.fd` for read-readiness
    /// notifications.
    ///
    /// `name` is only used to label log output and the panic message below.
    ///
    /// # Panics
    ///
    /// Panics if `AsyncFd::with_interest` returns an error for `map.fd`. The
    /// panic message names the map and carries the underlying I/O error.
    /// Per the tokio documentation, `AsyncFd` construction also panics when
    /// no reactor is available.
    pub fn new(name: String, map: PerfMap) -> Self {
        // The signature cannot report failure, so keep panicking, but say
        // which map could not be registered and why instead of a bare unwrap.
        let poll = AsyncFd::with_interest(map.fd, Interest::READABLE).unwrap_or_else(|e| {
            panic!(
                "failed to register perf map fd of {} for read readiness: {}",
                name, e
            )
        });
        PerfMessageStream { poll, map, name }
    }

    /// Reads events from the map until `self.map.read()` returns `None`.
    ///
    /// Sample payloads are copied into owned boxed slices and returned in the
    /// order they were read. Lost-sample events are logged and produce no
    /// entry, so the returned vector may be empty.
    fn read_messages(&mut self) -> Vec<Box<[u8]>> {
        let mut ret = Vec::new();
        while let Some(ev) = self.map.read() {
            match ev {
                Event::Lost(lost) => {
                    error!("Possibly lost {} samples for {}", lost.count, &self.name);
                }
                Event::Sample(sample) => {
                    // SAFETY: relies on `sample.data` being the start of at
                    // least `sample.size` initialized, readable bytes for as
                    // long as `sample` is borrowed.
                    // NOTE(review): that guarantee comes from `PerfMap::read`
                    // and the `Sample` layout, neither visible here — confirm.
                    let payload: &[u8] = unsafe {
                        slice::from_raw_parts(sample.data.as_ptr(), sample.size as usize)
                    };
                    // Copy immediately so the returned message does not
                    // borrow from the map's buffer.
                    ret.push(Box::<[u8]>::from(payload));
                }
            }
        }
        ret
    }
}
impl Stream for PerfMessageStream {
    /// One batch of sample payloads, as produced by a single drain of the map.
    type Item = Vec<Box<[u8]>>;

    /// Waits for the map's file descriptor to become readable, then yields
    /// everything `read_messages` returns.
    ///
    /// * Returns `Poll::Pending` while the descriptor is not readable.
    /// * If the readiness poll fails, the error is logged and the stream
    ///   ends (`Poll::Ready(None)`).
    /// * Otherwise yields `Some(batch)`; the batch may be empty when the
    ///   drain produced no samples.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        {
            // Keep the guard in its own scope so its borrow of `self.poll`
            // has ended before `self` is borrowed mutably for the drain.
            let mut guard = match self.poll.poll_read_ready(cx) {
                Poll::Ready(Ok(guard)) => guard,
                Poll::Ready(Err(e)) => {
                    error!("PerfMessageStream error: {:?}", e);
                    return Poll::Ready(None);
                }
                Poll::Pending => return Poll::Pending,
            };
            // Readiness is reset before draining rather than after.
            // NOTE(review): presumably so that data arriving during the drain
            // marks the descriptor ready again — confirm.
            guard.clear_ready();
        }
        Poll::Ready(Some(self.read_messages()))
    }
}