use std::{
borrow::{Borrow, BorrowMut},
path::Path,
};
#[cfg(all(not(feature = "async_tokio"), feature = "async_std"))]
use async_io::Async;
use bytes::BytesMut;
#[cfg(feature = "async_tokio")]
use tokio::io::unix::AsyncFd;
use crate::maps::{
perf::{Events, PerfBufferError, PerfEventArray, PerfEventArrayBuffer},
MapData, MapError, PinError,
};
#[doc(alias = "BPF_MAP_TYPE_PERF_EVENT_ARRAY")]
pub struct AsyncPerfEventArray<T> {
perf_map: PerfEventArray<T>,
}
impl<T: BorrowMut<MapData>> AsyncPerfEventArray<T> {
pub fn open(
&mut self,
index: u32,
page_count: Option<usize>,
) -> Result<AsyncPerfEventArrayBuffer<T>, PerfBufferError> {
let Self { perf_map } = self;
let buf = perf_map.open(index, page_count)?;
#[cfg(feature = "async_tokio")]
let buf = AsyncFd::new(buf)?;
#[cfg(all(not(feature = "async_tokio"), feature = "async_std"))]
let buf = Async::new(buf)?;
Ok(AsyncPerfEventArrayBuffer { buf })
}
pub fn pin<P: AsRef<Path>>(&self, path: P) -> Result<(), PinError> {
self.perf_map.pin(path)
}
}
impl<T: Borrow<MapData>> AsyncPerfEventArray<T> {
pub(crate) fn new(map: T) -> Result<Self, MapError> {
Ok(Self {
perf_map: PerfEventArray::new(map)?,
})
}
}
pub struct AsyncPerfEventArrayBuffer<T: BorrowMut<MapData>> {
#[cfg(not(any(feature = "async_tokio", feature = "async_std")))]
buf: PerfEventArrayBuffer<T>,
#[cfg(feature = "async_tokio")]
buf: AsyncFd<PerfEventArrayBuffer<T>>,
#[cfg(all(not(feature = "async_tokio"), feature = "async_std"))]
buf: Async<PerfEventArrayBuffer<T>>,
}
impl<T: BorrowMut<MapData>> AsyncPerfEventArrayBuffer<T> {
pub async fn read_events(
&mut self,
buffers: &mut [BytesMut],
) -> Result<Events, PerfBufferError> {
let Self { buf } = self;
loop {
#[cfg(feature = "async_tokio")]
let mut guard = buf.readable_mut().await?;
#[cfg(feature = "async_tokio")]
let buf = guard.get_inner_mut();
#[cfg(all(not(feature = "async_tokio"), feature = "async_std"))]
let buf = {
if !buf.get_ref().readable() {
buf.readable().await?;
}
unsafe { buf.get_mut() }
};
let events = buf.read_events(buffers)?;
const EMPTY: Events = Events { read: 0, lost: 0 };
if events != EMPTY {
break Ok(events);
}
#[cfg(feature = "async_tokio")]
guard.clear_ready();
}
}
}