#![cfg(feature = "af-xdp")]
use std::os::fd::{AsFd, AsRawFd, BorrowedFd};
use std::time::Duration;
use tokio::io::Interest;
use tokio::io::unix::AsyncFd;
use crate::error::Error;
use crate::packet::OwnedPacket;
use crate::{XdpBatch, XdpSocket, XdpStats};
pub struct AsyncXdpSocket {
inner: AsyncFd<XdpSocket>,
}
impl AsyncXdpSocket {
pub fn new(socket: XdpSocket) -> Result<Self, Error> {
let fd = AsyncFd::with_interest(socket, Interest::READABLE | Interest::WRITABLE)
.map_err(Error::Io)?;
Ok(Self { inner: fd })
}
pub fn open(interface: &str) -> Result<Self, Error> {
Self::new(XdpSocket::open(interface)?)
}
pub async fn readable(&mut self) -> Result<XdpReadableGuard<'_>, Error> {
let guard = self.inner.readable_mut().await.map_err(Error::Io)?;
Ok(XdpReadableGuard { guard })
}
pub async fn try_recv_batch(&mut self) -> Result<XdpBatch<'_>, Error> {
loop {
let self_ptr: *mut Self = self;
let mut guard = unsafe { (*self_ptr).inner.readable_mut() }
.await
.map_err(Error::Io)?;
if let Some(batch) = guard.get_inner_mut().next_batch() {
let batch: XdpBatch<'_> = unsafe { std::mem::transmute(batch) };
return Ok(batch);
}
guard.clear_ready();
}
}
pub async fn recv(&mut self) -> Result<Vec<OwnedPacket>, Error> {
loop {
let mut guard = self.inner.readable_mut().await.map_err(Error::Io)?;
let packets = guard.get_inner_mut().recv()?;
if !packets.is_empty() {
return Ok(packets);
}
guard.clear_ready();
}
}
pub async fn send(&mut self, data: &[u8]) -> Result<(), Error> {
loop {
if self.inner.get_mut().send(data)? {
return Ok(());
}
let mut guard = self.inner.writable_mut().await.map_err(Error::Io)?;
guard.clear_ready();
}
}
pub async fn flush(&mut self) -> Result<(), Error> {
self.inner.get_mut().flush()
}
pub fn into_stream(self) -> XdpStream {
XdpStream { socket: self }
}
pub fn statistics(&self) -> Result<XdpStats, Error> {
self.inner.get_ref().statistics()
}
pub fn get_ref(&self) -> &XdpSocket {
self.inner.get_ref()
}
pub fn get_mut(&mut self) -> &mut XdpSocket {
self.inner.get_mut()
}
pub fn into_inner(self) -> XdpSocket {
self.inner.into_inner()
}
}
impl AsFd for AsyncXdpSocket {
fn as_fd(&self) -> BorrowedFd<'_> {
self.inner.get_ref().as_fd()
}
}
impl AsRawFd for AsyncXdpSocket {
fn as_raw_fd(&self) -> std::os::fd::RawFd {
self.inner.get_ref().as_raw_fd()
}
}
pub struct XdpReadableGuard<'a> {
guard: tokio::io::unix::AsyncFdReadyMutGuard<'a, XdpSocket>,
}
impl<'a> XdpReadableGuard<'a> {
pub fn next_batch(&mut self) -> Option<XdpBatch<'_>> {
let guard_ptr: *mut tokio::io::unix::AsyncFdReadyMutGuard<'a, XdpSocket> =
&raw mut self.guard;
let batch = unsafe { (*guard_ptr).get_inner_mut().next_batch() };
if batch.is_none() {
unsafe { (*guard_ptr).clear_ready() };
}
batch
}
pub fn get_inner_mut(&mut self) -> &mut XdpSocket {
self.guard.get_inner_mut()
}
}
pub struct XdpStream {
socket: AsyncXdpSocket,
}
impl XdpStream {
pub fn into_inner(self) -> AsyncXdpSocket {
self.socket
}
}
impl futures_core::Stream for XdpStream {
type Item = Result<Vec<OwnedPacket>, Error>;
fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let this = self.get_mut();
loop {
let mut ready = match this.socket.inner.poll_read_ready_mut(cx) {
std::task::Poll::Ready(Ok(g)) => g,
std::task::Poll::Ready(Err(e)) => {
return std::task::Poll::Ready(Some(Err(Error::Io(e))));
}
std::task::Poll::Pending => return std::task::Poll::Pending,
};
match ready.get_inner_mut().recv() {
Ok(pkts) if !pkts.is_empty() => {
return std::task::Poll::Ready(Some(Ok(pkts)));
}
Ok(_) => {
ready.clear_ready();
}
Err(e) => return std::task::Poll::Ready(Some(Err(e))),
}
}
}
}
impl AsyncXdpSocket {
pub async fn wait_drained(&mut self, timeout: Duration) -> Result<(), Error> {
tokio::select! {
ready = self.inner.writable_mut() => {
let mut guard = ready.map_err(Error::Io)?;
guard.clear_ready();
Ok(())
}
_ = tokio::time::sleep(timeout) => {
Err(Error::Io(std::io::Error::from(std::io::ErrorKind::TimedOut)))
}
}
}
}