use crate::{PduRx, PduTx, error::Error, fmt, std::ParkSignal, std::unix::RawSocketDesc};
use core::{mem::MaybeUninit, task::Waker};
use io_uring::{IoUring, opcode};
use smallvec::{SmallVec, smallvec};
use std::{io, os::fd::AsRawFd, sync::Arc, time::Instant};
/// Bit set in an operation's `user_data` to tag it as a socket write.
///
/// The remaining bits carry the key of the operation's slot in the buffer slab. Completions
/// whose `user_data` does not have this bit set are handled as reads.
const WRITE_MASK: u64 = 1 << 63;
/// Number of entries requested when creating the `io_uring` instance.
///
/// The buffer slab is pre-allocated with twice this many slots.
const ENTRIES: usize = 256;
/// Run the PDU TX/RX loop for `interface` on the calling thread using `io_uring`.
///
/// Every sendable frame taken from `pdu_tx` is queued as one socket write plus one socket read.
/// Completed reads are handed to `pdu_rx`. When no operations are in flight the thread waits on
/// a [`ParkSignal`] whose waker is registered with `pdu_tx`.
///
/// On a requested exit, returns the values produced by `pdu_tx.release()` and
/// `pdu_rx.release()`. The exit request is only checked after the thread has been woken while
/// no operations were in flight.
///
/// # Errors
///
/// Returns an [`io::Error`] if:
///
/// - the socket cannot be opened or its MTU cannot be read,
/// - the ring cannot be created or probed, or it lacks the read/write opcodes,
/// - submitting to the ring fails,
/// - an operation completes with an error other than `EWOULDBLOCK`,
/// - a received buffer has no byte at offset `0x11`, or
/// - `pdu_rx` rejects a received frame.
///
/// # Panics
///
/// Panics if sending a frame fails, if flushing a full submission queue fails, or if a
/// completion refers to a buffer key that is no longer present.
pub fn tx_rx_task_io_uring<'sto>(
    interface: &str,
    mut pdu_tx: PduTx<'sto>,
    mut pdu_rx: PduRx<'sto>,
) -> Result<(PduTx<'sto>, PduRx<'sto>), io::Error> {
    let mut socket = RawSocketDesc::new(interface)?;

    let mtu = socket.interface_mtu()?;

    fmt::debug!(
        "Opening {} with MTU {}, blocking, using io_uring",
        interface,
        mtu
    );

    // Buffers are 18 bytes larger than the interface MTU.
    // NOTE(review): presumably headroom for the Ethernet framing around the payload - confirm.
    let mtu = mtu + 18;

    // One slot per queued operation: the submission entry and the buffer it reads or writes.
    // A slot is removed again when its completion has been handled.
    let mut bufs: slab::Slab<(io_uring::squeue::Entry, SmallVec<[u8; 1518]>)> =
        slab::Slab::with_capacity(ENTRIES * 2);

    let mut ring = IoUring::new(ENTRIES as u32)?;

    // Refuse to run on kernels that lack the two opcodes used below.
    let mut probe = io_uring::register::Probe::new();
    ring.submitter().register_probe(&mut probe)?;

    if !(probe.is_supported(opcode::Read::CODE) && probe.is_supported(opcode::Write::CODE)) {
        log::error!("io_uring does not support read and/or write opcodes");

        return Err(io::Error::other(Error::Internal));
    }

    let signal = Arc::new(ParkSignal::new());
    let waker = Waker::from(Arc::clone(&signal));

    loop {
        pdu_tx.replace_waker(&waker);

        // Number of frames queued in this iteration. Each contributes one write and one read.
        let mut sent = 0;

        while let Some(frame) = pdu_tx.next_sendable_frame() {
            let idx = frame.storage_slot_index();

            let tx_b = bufs.vacant_entry();
            let tx_key = tx_b.key();
            let (tx_entry, tx_buf) = tx_b.insert((
                // SAFETY: this is only a placeholder; it is overwritten with a real write entry
                // in the closure below before it is pushed to the ring.
                // NOTE(review): relies on an all-zero `squeue::Entry` being a valid value -
                // confirm against the io_uring crate.
                unsafe { MaybeUninit::zeroed().assume_init() },
                smallvec![0; mtu],
            ));

            frame
                .send_blocking(|data: &[u8]| {
                    // NOTE(review): the write op points at `data`, not at the copy made into
                    // `tx_buf` below - confirm `data` stays valid until the write completes.
                    *tx_entry = opcode::Write::new(
                        io_uring::types::Fd(socket.as_raw_fd()),
                        data.as_ptr(),
                        data.len() as _,
                    )
                    .build()
                    // Tag as a write so the completion handler only frees the slot.
                    .user_data(tx_key as u64 | WRITE_MASK);

                    tx_buf
                        .get_mut(0..data.len())
                        .ok_or(Error::Internal)?
                        .copy_from_slice(data);

                    // SAFETY: NOTE(review): pushing requires the entry's buffer to remain valid
                    // until the operation completes; see the note on `data` above.
                    while unsafe { ring.submission().push(tx_entry).is_err() } {
                        // The submission queue is full: flush it to the kernel and try again.
                        ring.submit().expect("Internal error, failed to submit ops");
                    }

                    sent += 1;

                    Ok(data.len())
                })
                .expect("Send blocking");

            // Queue a read alongside every write.
            let rx_b = bufs.vacant_entry();
            let rx_key = rx_b.key();
            let (rx_entry, rx_buf) = rx_b.insert((
                // SAFETY: placeholder only; overwritten with the read entry directly below.
                // NOTE(review): relies on an all-zero `squeue::Entry` being a valid value.
                unsafe { MaybeUninit::zeroed().assume_init() },
                smallvec![0; mtu],
            ));

            *rx_entry = opcode::Read::new(
                io_uring::types::Fd(socket.as_raw_fd()),
                rx_buf.as_mut_ptr() as _,
                rx_buf.len() as _,
            )
            .build()
            .user_data(rx_key as u64);

            fmt::trace!(
                "Insert frame TX {:#04x}, key {}, RX key {}",
                idx,
                tx_key,
                rx_key
            );

            // SAFETY: the read buffer is stored in `bufs` and is only removed once the
            // completion for `rx_key` has been handled.
            // NOTE(review): the pointer targets storage owned by the slab, so this also relies
            // on the slab not relocating its entries while the read is in flight.
            while unsafe { ring.submission().push(rx_entry).is_err() } {
                // The submission queue is full: flush it to the kernel and try again.
                ring.submit().expect("Internal error, failed to submit ops");
            }
        }

        ring.submission().sync();

        let now = Instant::now();

        if sent > 0 {
            ring.submit_and_wait(sent * 2)?;
        } else if !ring.submission().is_empty() {
            // Nothing new was sent, but reads re-queued after `EWOULDBLOCK` are still waiting
            // in the submission queue. Entries are only handed to the kernel on submit, so
            // without this they would never be retried.
            ring.submit()?;
        }

        fmt::trace!(
            "Submitted, waited for {} completions for {} us",
            ring.completion().len(),
            now.elapsed().as_micros(),
        );

        // SAFETY: NOTE(review): `completion_shared` requires that no other completion queue
        // handle for this ring is in use at the same time; none is created inside this loop.
        for recv in unsafe { ring.completion_shared() } {
            let result = recv.result();

            // A failed operation reports its error as a negated errno in the completion
            // result, so the error must be built from that value rather than from `errno`.
            if result < 0 && result != -libc::EWOULDBLOCK {
                return Err(io::Error::from_raw_os_error(-result));
            }

            let key = recv.user_data();

            let received = Instant::now();

            fmt::trace!(
                "Got a frame by key {} -> {} {}",
                key,
                key & !WRITE_MASK,
                if key & WRITE_MASK == WRITE_MASK {
                    "---->"
                } else {
                    "<--"
                }
            );

            // Write completions carry no data; just free the slot.
            // NOTE(review): a write that completed with `EWOULDBLOCK` also lands here and is
            // dropped without being retried - confirm this is intended.
            if key & WRITE_MASK == WRITE_MASK {
                let key = key & !WRITE_MASK;

                bufs.remove(key as usize);

                continue;
            }

            if result == -libc::EWOULDBLOCK {
                fmt::trace!("Frame key {} would block. Queuing for retry", key);

                // Re-queue the stored read entry unchanged; its buffer stays in `bufs`.
                let (rx_entry, _buf) = bufs.get(key as usize).expect("Could not get retry entry");

                // SAFETY: NOTE(review): `submission_shared` requires that no other submission
                // queue handle for this ring is in use at the same time; none is held here.
                // The entry's buffer is still stored in `bufs`.
                while unsafe { ring.submission_shared().push(rx_entry).is_err() } {
                    // The submission queue is full: flush it to the kernel and try again.
                    ring.submit().expect("Internal error, failed to submit ops");
                }
            } else {
                let (_entry, frame) = bufs.remove(key as usize);

                // Byte 0x11 of the raw frame is only used for the trace output below, but a
                // buffer too short to contain it is reported as an error.
                let frame_index = frame
                    .get(0x11)
                    .ok_or_else(|| io::Error::other(Error::Internal))?;

                fmt::trace!(
                    "Raw frame {:#04x} result {} buffer key {}",
                    frame_index,
                    result,
                    key,
                );

                pdu_rx.receive_frame(&frame).map_err(io::Error::other)?;

                fmt::trace!("Received frame in {} ns", received.elapsed().as_nanos());
            }
        }

        if bufs.is_empty() {
            fmt::trace!("No frames in flight, waiting to be woken with new frames to send");

            let start = Instant::now();

            signal.wait();

            fmt::trace!("--> Waited for {} ns", start.elapsed().as_nanos());

            if pdu_tx.should_exit() {
                fmt::debug!("io_uring TX/RX was asked to exit");

                return Ok((pdu_tx.release(), pdu_rx.release()));
            }
        } else {
            fmt::trace!(
                "Buf keys {:?} in flight",
                bufs.iter().map(|(k, _v)| k).collect::<Vec<_>>(),
            );
        }
    }
}