use std::os::fd::{AsFd, AsRawFd, BorrowedFd};
use std::time::Duration;
use tokio::io::unix::AsyncFd;
use crate::afpacket::tx::Injector;
use crate::error::Error;
pub struct AsyncInjector {
inner: AsyncFd<Injector>,
}
impl AsyncInjector {
pub fn new(tx: Injector) -> Result<Self, Error> {
let fd = AsyncFd::with_interest(tx, tokio::io::Interest::WRITABLE).map_err(Error::Io)?;
Ok(Self { inner: fd })
}
pub async fn send(&mut self, data: &[u8]) -> Result<(), Error> {
let cap = self.inner.get_ref().frame_capacity();
if data.len() > cap {
return Err(Error::Config(format!(
"packet length {} exceeds TX frame capacity {}",
data.len(),
cap
)));
}
loop {
if let Some(mut slot) = self.inner.get_mut().allocate(data.len()) {
slot.data_mut()[..data.len()].copy_from_slice(data);
slot.set_len(data.len());
slot.send();
return Ok(());
}
let mut guard = self.inner.writable_mut().await.map_err(Error::Io)?;
guard.clear_ready();
drop(guard);
}
}
pub async fn flush(&mut self) -> Result<usize, Error> {
self.inner.get_mut().flush()
}
pub async fn wait_drained(&mut self, timeout: Duration) -> Result<(), Error> {
let deadline = tokio::time::Instant::now() + timeout;
loop {
if self.inner.get_ref().pending_count() == 0 {
return Ok(());
}
let remaining = match deadline.checked_duration_since(tokio::time::Instant::now()) {
Some(r) => r,
None => {
return Err(Error::Io(std::io::Error::from(
std::io::ErrorKind::TimedOut,
)));
}
};
let slice = remaining.min(Duration::from_millis(10));
tokio::select! {
ready = self.inner.writable_mut() => {
let mut guard = ready.map_err(Error::Io)?;
guard.clear_ready();
}
_ = tokio::time::sleep(slice) => {}
}
}
}
pub fn get_ref(&self) -> &Injector {
self.inner.get_ref()
}
pub fn get_mut(&mut self) -> &mut Injector {
self.inner.get_mut()
}
pub fn into_inner(self) -> Injector {
self.inner.into_inner()
}
#[inline]
pub fn frame_capacity(&self) -> usize {
self.inner.get_ref().frame_capacity()
}
#[inline]
pub fn frame_count(&self) -> usize {
self.inner.get_ref().frame_count()
}
pub fn available_slots(&self) -> usize {
self.inner.get_ref().available_slots()
}
pub fn rejected_slots(&self) -> usize {
self.inner.get_ref().rejected_slots()
}
pub fn pending_count(&self) -> usize {
self.inner.get_ref().pending_count()
}
}
impl AsFd for AsyncInjector {
fn as_fd(&self) -> BorrowedFd<'_> {
self.inner.get_ref().as_fd()
}
}
impl AsRawFd for AsyncInjector {
fn as_raw_fd(&self) -> std::os::fd::RawFd {
self.inner.get_ref().as_raw_fd()
}
}