use std::io;
use std::sync::atomic::Ordering;
use std::task::Context;
use std::time::Instant;
use io_uring::cqueue;
use crate::chain::ChainEvent;
use crate::completion::{OpTag, UserData};
use crate::connection::RecvMode;
use crate::driver::Driver;
use crate::driver::sockaddr_to_socket_addr;
use crate::metrics;
use crate::runtime::handler::AsyncEventHandler;
use crate::runtime::io::{ConnCtx, DriverState, UdpCtx, clear_driver_state, set_driver_state};
use crate::runtime::waker::{STANDALONE_BIT, conn_waker, standalone_waker};
use crate::runtime::{CURRENT_TASK_ID, Executor, TimerSlotPool};
/// Single-threaded event loop that drives io_uring completions for an
/// [`AsyncEventHandler`]. `run` blocks the calling thread until shutdown.
pub(crate) struct AsyncEventLoop<A: AsyncEventHandler> {
// io_uring driver: owns the ring, connection table, buffer pools, retry
// queues, and shutdown flags.
driver: Driver,
// User-supplied callbacks (on_start, on_udp_bind, on_notify, on_tick, ...).
handler: A,
// Polls per-connection and standalone futures; owns wakers/timer slots.
executor: Executor,
}
impl<A: AsyncEventHandler> AsyncEventLoop<A> {
/// Builds the event loop: constructs the io_uring [`Driver`] from `config`
/// plus the cross-thread channels/worker pools, then sizes the [`Executor`]
/// from the configured capacities.
///
/// Each optional `*_rx`/`*_tx`/pool triple wires in an external worker
/// (acceptor thread, DNS resolver, spawner, blocking pool); `None` disables
/// that integration. `eventfd` is the wakeup fd those workers signal.
///
/// # Errors
/// Propagates any error from `Driver::new` (e.g. ring setup failure).
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
config: &crate::config::Config,
handler: A,
accept_rx: Option<crossbeam_channel::Receiver<(std::os::fd::RawFd, std::net::SocketAddr)>>,
eventfd: std::os::fd::RawFd,
shutdown_flag: std::sync::Arc<std::sync::atomic::AtomicBool>,
resolve_rx: Option<crossbeam_channel::Receiver<crate::resolver::ResolveResponse>>,
resolve_tx: Option<crossbeam_channel::Sender<crate::resolver::ResolveResponse>>,
resolver: Option<std::sync::Arc<crate::resolver::ResolverPool>>,
spawn_rx: Option<crossbeam_channel::Receiver<crate::spawner::SpawnResponse>>,
spawn_tx: Option<crossbeam_channel::Sender<crate::spawner::SpawnResponse>>,
spawner: Option<std::sync::Arc<crate::spawner::SpawnerPool>>,
blocking_rx: Option<crossbeam_channel::Receiver<crate::blocking::BlockingResponse>>,
blocking_tx: Option<crossbeam_channel::Sender<crate::blocking::BlockingResponse>>,
blocking_pool: Option<std::sync::Arc<crate::blocking::BlockingPool>>,
) -> Result<Self, crate::error::Error> {
// Driver is built first; if it fails, no executor resources are allocated.
let driver = Driver::new(
config,
accept_rx,
eventfd,
shutdown_flag,
resolve_rx,
resolve_tx,
resolver,
spawn_rx,
spawn_tx,
spawner,
blocking_rx,
blocking_tx,
blocking_pool,
)?;
// Executor capacity mirrors the driver's connection/UDP-socket limits.
let executor = Executor::new(
config.max_connections,
config.standalone_task_capacity,
config.timer_slots,
config.udp_bind.len() as u32,
);
Ok(AsyncEventLoop {
driver,
handler,
executor,
})
}
/// Main event-loop body. Arms the eventfd wakeup, spawns the handler's
/// startup tasks, then loops: wait for at least one CQE, dispatch
/// completions, retry submissions that previously hit a full SQ, poll ready
/// futures, and invoke `on_tick`. Returns after running driver shutdown
/// when either the local or shared shutdown flag is set.
pub(crate) fn run(&mut self) -> Result<(), crate::error::Error> {
// Arm the initial eventfd read so external threads can wake the ring.
self.driver
.ring
.submit_eventfd_read(self.driver.eventfd, self.driver.eventfd_buf.as_mut_ptr())?;
self.driver.eventfd_armed = true;
// Self-kick: writing to the eventfd completes the read just armed, so the
// first submit_and_wait returns immediately and any work already queued
// on the channels is drained without waiting for an external signal.
let kick: u64 = 1;
unsafe {
libc::write(
self.driver.eventfd,
&kick as *const u64 as *const libc::c_void,
8,
);
}
// Spawn one standalone task per bound UDP socket, if the handler wants one.
for udp_idx in 0..self.driver.udp_sockets.len() {
let udp_ctx = UdpCtx {
udp_index: udp_idx as u32,
};
if let Some(future) = self.handler.on_udp_bind(udp_ctx)
&& let Some(idx) = self.executor.standalone_slab.spawn(future)
{
self.executor.ready_queue.push_back(idx | STANDALONE_BIT);
}
}
// Optional startup task from the handler.
if let Some(future) = self.handler.on_start()
&& let Some(idx) = self.executor.standalone_slab.spawn(future)
{
self.executor.ready_queue.push_back(idx | STANDALONE_BIT);
}
loop {
// Re-arm the eventfd read if the previous one completed (it is one-shot).
if !self.driver.eventfd_armed && !self.driver.shutdown_flag.load(Ordering::Relaxed) {
self.driver.eventfd_armed = self
.driver
.ring
.submit_eventfd_read(self.driver.eventfd, self.driver.eventfd_buf.as_mut_ptr())
.is_ok();
}
// Arm the periodic tick timeout, if configured, so submit_and_wait
// cannot block indefinitely between on_tick calls.
if !self.driver.tick_timeout_armed
&& let Some(ref ts) = self.driver.tick_timeout_ts
{
let ud = UserData::encode(OpTag::TickTimeout, 0, 0);
let _ = self
.driver
.ring
.submit_tick_timeout(ts as *const _, ud.raw());
self.driver.tick_timeout_armed = true;
}
// Block until at least one completion is available, then dispatch all.
self.driver.ring.submit_and_wait(1)?;
self.drain_completions();
if self.driver.shutdown_local || self.driver.shutdown_flag.load(Ordering::Relaxed) {
self.driver.run_shutdown();
return Ok(());
}
// Return consumed provided buffers to the kernel's buffer ring in one batch.
if !self.driver.pending_replenish.is_empty() {
self.driver
.provided_bufs
.replenish_batch(&self.driver.pending_replenish);
self.driver.pending_replenish.clear();
}
// Retry zero-copy sends that could not be resubmitted earlier (SQ full).
if !self.driver.pending_zc_retries.is_empty() {
let retries: Vec<_> = self.driver.pending_zc_retries.drain(..).collect();
for (conn_index, generation, slab_idx) in retries {
if !self.driver.send_slab.in_use(slab_idx) {
continue; }
// Generation mismatch means the connection slot was recycled;
// release the send-slab entry instead of sending to the wrong peer.
if self.driver.connections.get(conn_index).is_none()
|| self.driver.connections.generation(conn_index) != generation
{
self.driver.send_slab.mark_awaiting_notifications(slab_idx);
if self.driver.send_slab.should_release(slab_idx) {
let pool_slot = self.driver.send_slab.release(slab_idx);
if pool_slot != u16::MAX {
self.driver.send_copy_pool.release(pool_slot);
}
}
continue;
}
let msg_ptr = self.driver.send_slab.msghdr_ptr(slab_idx);
// Second consecutive SQ-full failure is treated as fatal for the
// connection: release resources, fail the sender, close.
if self
.driver
.ring
.submit_send_msg_zc(conn_index, msg_ptr, slab_idx)
.is_err()
{
self.driver.send_slab.mark_awaiting_notifications(slab_idx);
if self.driver.send_slab.should_release(slab_idx) {
let pool_slot = self.driver.send_slab.release(slab_idx);
if pool_slot != u16::MAX {
self.driver.send_copy_pool.release(pool_slot);
}
}
self.driver.drain_conn_send_queue(conn_index);
let err = io::Error::other("SQ full during partial ZC send resubmission");
self.executor.wake_send(conn_index, Err(err));
self.driver.close_connection(conn_index);
}
}
}
// Retry copied (non-ZC) sends, routing through the TLS path when the
// connection has TLS state.
if !self.driver.pending_copy_retries.is_empty() {
let retries: Vec<_> = self.driver.pending_copy_retries.drain(..).collect();
for (conn_index, generation, pool_slot) in retries {
if !self.driver.send_copy_pool.in_use(pool_slot) {
continue;
}
if self.driver.connections.get(conn_index).is_none()
|| self.driver.connections.generation(conn_index) != generation
{
self.driver.send_copy_pool.release(pool_slot);
continue;
}
let (ptr, remaining) =
self.driver.send_copy_pool.current_ptr_remaining(pool_slot);
let is_tls = self
.driver
.tls_table
.as_ref()
.is_some_and(|t| t.has(conn_index));
let result = if is_tls {
self.driver
.ring
.submit_tls_send(conn_index, ptr, remaining, pool_slot)
} else {
self.driver
.ring
.submit_send_copied(conn_index, ptr, remaining, pool_slot)
};
if result.is_err() {
self.driver.send_copy_pool.release(pool_slot);
self.driver.drain_conn_send_queue(conn_index);
let err = io::Error::other("SQ full during partial copy send resubmission");
self.executor.wake_send(conn_index, Err(err));
self.driver.close_connection(conn_index);
}
}
}
// Retry close submissions; failures are simply queued again for next tick.
if !self.driver.pending_close_retries.is_empty() {
let retries: Vec<_> = self.driver.pending_close_retries.drain(..).collect();
for conn_index in retries {
if self.driver.ring.submit_close(conn_index).is_err() {
self.driver.pending_close_retries.push(conn_index);
}
}
}
// Move woken tasks onto the ready queue, poll them, then give the
// handler its periodic tick.
self.executor.collect_wakeups();
self.poll_ready_tasks();
let mut ctx = self.driver.make_ctx();
self.handler.on_tick(&mut ctx);
}
}
/// Polls every task currently on the ready queue.
///
/// Raw pointers to the driver and executor are published thread-locally
/// (`set_driver_state`) so leaf futures can reach driver state while being
/// polled; the state is cleared before returning.
fn poll_ready_tasks(&mut self) {
let driver = &mut self.driver as *mut Driver;
let executor = &mut self.executor as *mut Executor;
let mut driver_state = DriverState { driver, executor };
set_driver_state(&mut driver_state);
// SAFETY: both pointers are derived from `&mut self` and remain valid for
// the whole function; `clear_driver_state` below removes the thread-local
// copies before they can dangle. Futures must not retain them across polls.
let driver = unsafe { &mut *driver };
let executor = unsafe { &mut *executor };
// Index-based walk: entries appended to ready_queue while polling are
// picked up within the same pass.
let mut i = 0;
while i < executor.ready_queue.len() {
let raw_id = executor.ready_queue[i];
i += 1;
// High bit distinguishes standalone tasks from per-connection tasks.
if raw_id & STANDALONE_BIT != 0 {
let task_idx = raw_id & !STANDALONE_BIT;
// take_ready returns None if the task is mid-poll or already gone,
// which deduplicates repeated queue entries.
if let Some(mut fut) = executor.standalone_slab.take_ready(task_idx) {
let waker = standalone_waker(task_idx);
let mut cx = Context::from_waker(&waker);
// Expose the current task id so spawned wakers/timers can target it.
CURRENT_TASK_ID.with(|c| c.set(raw_id));
match fut.as_mut().poll(&mut cx) {
std::task::Poll::Ready(()) => {
executor.standalone_slab.remove(task_idx);
}
std::task::Poll::Pending => {
executor.standalone_slab.park(task_idx, fut);
}
}
}
} else {
let conn_index = raw_id;
if let Some(mut fut) = executor.task_slab.take_ready(conn_index) {
let waker = conn_waker(conn_index);
let mut cx = Context::from_waker(&waker);
CURRENT_TASK_ID.with(|c| c.set(conn_index));
match fut.as_mut().poll(&mut cx) {
std::task::Poll::Ready(()) => {
// Connection task finished: tear down the connection too.
driver.close_connection(conn_index);
executor.remove_connection(conn_index);
}
std::task::Poll::Pending => {
executor.task_slab.park(conn_index, fut);
}
}
}
}
}
clear_driver_state();
executor.ready_queue.clear();
// Wakes raised during polling are gathered now so the next loop iteration
// sees them on the ready queue.
executor.collect_wakeups();
}
/// Drains the completion queue and dispatches every CQE.
///
/// CQEs are first copied into `cqe_batch` so the CQ borrow of the ring is
/// released before `dispatch_cqe` (which needs `&mut self`) runs. Indexing
/// into the batch (rather than iterating it) avoids holding a borrow of the
/// batch across the dispatch call.
fn drain_completions(&mut self) {
self.driver.cqe_batch.clear();
{
let cq = self.driver.ring.ring.completion();
for cqe in cq {
self.driver.cqe_batch.push((
cqe.user_data(),
cqe.result(),
cqe.flags(),
*cqe.big_cqe(),
));
}
}
// With a flush interval configured, periodically flush SQEs produced by
// dispatch so downstream submissions are not delayed by a long batch.
if let Some(interval) = self.driver.flush_interval {
let mut last_flush = Instant::now();
for i in 0..self.driver.cqe_batch.len() {
let (user_data_raw, result, flags, _big_cqe) = self.driver.cqe_batch[i];
self.dispatch_cqe(user_data_raw, result, flags);
// Check the clock only every 16 CQEs to keep Instant::now() cheap.
if (i & 0xF) == 0xF {
let now = Instant::now();
if now.duration_since(last_flush) >= interval {
let _ = self.driver.ring.flush();
last_flush = now;
}
}
}
} else {
for i in 0..self.driver.cqe_batch.len() {
let (user_data_raw, result, flags, _big_cqe) = self.driver.cqe_batch[i];
self.dispatch_cqe(user_data_raw, result, flags);
}
}
}
/// Routes a single CQE to its handler based on the operation tag encoded in
/// the user data. CQEs whose tag fails to decode are silently dropped.
fn dispatch_cqe(&mut self, user_data_raw: u64, result: i32, flags: u32) {
metrics::CQE_PROCESSED.increment();
let ud = UserData(user_data_raw);
let tag = match ud.tag() {
Some(t) => t,
None => return,
};
match tag {
OpTag::RecvMulti => self.handle_recv_multi(ud, result, flags),
OpTag::Send => self.handle_send(ud, result),
OpTag::SendMsgZc => self.handle_send_msg_zc(ud, result, flags),
OpTag::Close => self.handle_close(ud),
// Shutdown and Cancel completions need no follow-up work.
OpTag::Shutdown => {}
OpTag::EventFdRead => self.handle_eventfd_read(),
OpTag::TlsSend => self.handle_tls_send(ud, result),
OpTag::Connect => self.handle_connect(ud, result),
OpTag::Timeout => self.handle_timeout(ud, result),
OpTag::Cancel => {}
// Tick timer fired; `run` re-arms it on the next loop iteration.
OpTag::TickTimeout => {
self.driver.tick_timeout_armed = false;
}
OpTag::Timer => self.handle_timer(ud, result),
OpTag::RecvMsgUdp => self.handle_recv_msg_udp(ud, result),
OpTag::SendMsgUdp => self.handle_send_msg_udp(ud, result),
OpTag::NvmeCmd => self.handle_nvme_cmd(ud, result),
OpTag::DirectIo => self.handle_direct_io(ud, result),
OpTag::Fs => self.handle_fs(ud, result),
OpTag::PidfdPoll => self.handle_pidfd_poll(ud, result),
OpTag::SendRecvBuf => self.handle_send_recv_buf(ud, result),
#[cfg(feature = "timestamps")]
OpTag::RecvMsgMultiTs => self.handle_recv_msg_multi_ts(ud, result, flags),
}
}
/// Handles a completion from a multishot recv on a TCP connection.
///
/// `result` is the byte count (> 0), 0 for EOF, or a negative errno.
/// `flags` carry the provided-buffer id and the MORE bit; when MORE is
/// absent the kernel has stopped the multishot recv and it must be re-armed.
fn handle_recv_multi(&mut self, ud: UserData, result: i32, flags: u32) {
let conn_index = ud.conn_index();
let has_more = cqueue::more(flags);
// Connection already gone: only recycle the buffer the kernel handed us.
if self.driver.connections.get(conn_index).is_none() {
if result > 0
&& let Some(bid) = cqueue::buffer_select(flags)
{
self.driver.pending_replenish.push(bid);
}
return;
}
if result <= 0 {
// result == 0 is EOF: wake the reader (it observes closure) and close.
if result == 0 {
self.executor.wake_recv(conn_index);
self.driver.close_connection(conn_index);
return;
}
let errno = -result;
if errno == libc::ENOBUFS {
// Buffer ring ran dry. Re-arm the recv if the kernel stopped it;
// failing to re-arm is fatal for the connection.
metrics::BUFFER_RING_EMPTY.increment();
if !has_more && self.driver.ring.submit_multishot_recv(conn_index).is_err() {
metrics::RECV_ARM_FAILURES.increment();
self.executor.wake_recv(conn_index);
self.driver.close_connection(conn_index);
}
} else if errno == libc::ECANCELED {
// Cancelled recv (e.g. during close): nothing further to do.
return;
} else if !has_more {
// Hard error with the multishot stopped: fail the reader and close.
self.executor.wake_recv(conn_index);
self.driver.close_connection(conn_index);
}
return;
}
// A positive result must carry a buffer id; a missing one with the recv
// stopped is treated as a terminal condition.
let bid = match cqueue::buffer_select(flags) {
Some(bid) => bid,
None => {
if !has_more {
self.executor.wake_recv(conn_index);
self.driver.close_connection(conn_index);
}
return;
}
};
let bytes_received = result as u32;
metrics::BYTES_RECEIVED.add(bytes_received as u64);
let (buf_ptr, _) = self.driver.provided_bufs.get_buffer(bid);
// SAFETY: the kernel wrote `bytes_received` bytes into the provided
// buffer identified by `bid`; the buffer stays ours until replenished.
let data = unsafe { std::slice::from_raw_parts(buf_ptr, bytes_received as usize) };
let is_tls_conn = self
.driver
.tls_table
.as_ref()
.is_some_and(|t| t.has(conn_index));
if is_tls_conn {
// TLS path copies ciphertext into the TLS engine, so the provided
// buffer can be recycled immediately.
self.driver.pending_replenish.push(bid);
{
let tls_table = self.driver.tls_table.as_mut().unwrap();
let result = crate::tls::feed_tls_recv(
tls_table,
&mut self.driver.accumulators,
&mut self.driver.ring,
&mut self.driver.send_copy_pool,
&mut self.driver.tls_scratch,
conn_index,
data,
);
match result {
crate::tls::TlsRecvResult::HandshakeJustCompleted => {
// Outbound connections resolve the pending connect future;
// inbound connections spawn their accept task now that the
// handshake is done.
let is_outbound = self
.driver
.connections
.get(conn_index)
.map(|c| c.outbound)
.unwrap_or(false);
if is_outbound {
if let Some(cs) = self.driver.connections.get_mut(conn_index) {
cs.established = true;
}
self.executor.wake_connect(conn_index, Ok(()));
} else {
if let Some(cs) = self.driver.connections.get_mut(conn_index) {
cs.established = true;
}
metrics::CONNECTIONS_ACCEPTED.increment();
metrics::CONNECTIONS_ACTIVE.increment();
self.spawn_accept_task(conn_index);
}
self.executor.wake_recv(conn_index);
}
crate::tls::TlsRecvResult::Ok => {
self.executor.wake_recv(conn_index);
}
crate::tls::TlsRecvResult::Error(e) => {
// Handshake-phase failures also fail the connect future.
let established = self
.driver
.connections
.get(conn_index)
.map(|c| c.established)
.unwrap_or(false);
if !established {
let err = std::io::Error::new(std::io::ErrorKind::ConnectionReset, e);
self.executor.wake_connect(conn_index, Err(err));
}
self.executor.wake_recv(conn_index);
self.driver.close_connection(conn_index);
}
crate::tls::TlsRecvResult::Closed => {
self.executor.wake_recv(conn_index);
self.driver.close_connection(conn_index);
}
}
}
} else {
if let Some(sink) = &mut self.executor.recv_sinks[conn_index as usize] {
// A reader supplied its own buffer: copy straight into it and spill
// any overflow into the accumulator.
self.driver.pending_replenish.push(bid);
let remaining_cap = sink.cap - sink.pos;
let to_sink = data.len().min(remaining_cap);
if to_sink > 0 {
unsafe {
std::ptr::copy_nonoverlapping(
data.as_ptr(),
sink.ptr.add(sink.pos),
to_sink,
);
}
sink.pos += to_sink;
}
if to_sink < data.len() {
self.driver
.accumulators
.append(conn_index, &data[to_sink..]);
}
} else {
// Fast path: with no buffered data and no buffer already parked,
// keep the kernel buffer parked instead of copying it, and hand it
// to the reader later.
let acc_empty = self.driver.accumulators.data(conn_index).is_empty();
let slot = &mut self.driver.pending_recv_bufs[conn_index as usize];
if acc_empty && slot.is_none() {
*slot = Some(crate::driver::PendingRecvBuf {
bid,
len: bytes_received,
ptr: buf_ptr,
});
} else {
// Slow path: flush any parked buffer plus the new data into the
// accumulator, recycling both buffer ids.
if let Some(pending) = slot.take() {
let pending_data = unsafe {
std::slice::from_raw_parts(pending.ptr, pending.len as usize)
};
self.driver.accumulators.append(conn_index, pending_data);
self.driver.pending_replenish.push(pending.bid);
}
self.driver.accumulators.append(conn_index, data);
self.driver.pending_replenish.push(bid);
}
}
self.executor.wake_recv(conn_index);
}
// Re-arm the multishot recv if the kernel stopped it and the connection
// is still in multishot mode.
if !has_more
&& let Some(conn) = self.driver.connections.get(conn_index)
&& matches!(conn.recv_mode, RecvMode::Multi)
&& self.driver.ring.submit_multishot_recv(conn_index).is_err()
{
metrics::RECV_ARM_FAILURES.increment();
self.executor.wake_recv(conn_index);
self.driver.close_connection(conn_index);
}
}
/// Handles a completion from a multishot recvmsg used for timestamped
/// receives: parses the RecvMsgOut envelope, records the packet timestamp
/// from the cmsg data, and delivers the payload to the reader.
#[cfg(feature = "timestamps")]
fn handle_recv_msg_multi_ts(&mut self, ud: UserData, result: i32, flags: u32) {
let conn_index = ud.conn_index();
let has_more = cqueue::more(flags);
// Connection already gone: only recycle the provided buffer.
if self.driver.connections.get(conn_index).is_none() {
if result > 0
&& let Some(bid) = cqueue::buffer_select(flags)
{
self.driver.pending_replenish.push(bid);
}
return;
}
if result <= 0 {
// EOF: wake the reader and close.
if result == 0 {
self.executor.wake_recv(conn_index);
self.driver.close_connection(conn_index);
return;
}
let errno = -result;
if errno == libc::ENOBUFS {
// Buffer ring exhausted; re-arm if the kernel stopped the recvmsg.
metrics::BUFFER_RING_EMPTY.increment();
if !has_more {
let msghdr_ptr = &*self.driver.recvmsg_msghdr as *const libc::msghdr;
let _ = self
.driver
.ring
.submit_multishot_recvmsg(conn_index, msghdr_ptr);
}
} else if errno == libc::ECANCELED {
return;
} else if !has_more {
self.executor.wake_recv(conn_index);
self.driver.close_connection(conn_index);
}
return;
}
let bid = match cqueue::buffer_select(flags) {
Some(bid) => bid,
None => {
if !has_more {
self.executor.wake_recv(conn_index);
self.driver.close_connection(conn_index);
}
return;
}
};
let buf_len = result as u32;
let (buf_ptr, _) = self.driver.provided_bufs.get_buffer(bid);
// SAFETY: the kernel wrote `buf_len` bytes into the provided buffer.
let buf = unsafe { std::slice::from_raw_parts(buf_ptr, buf_len as usize) };
// The payload/control data are copied out below, so the buffer can be
// queued for replenish now (replenish happens later in the loop body).
self.driver.pending_replenish.push(bid);
let msg_out = match io_uring::types::RecvMsgOut::parse(buf, &self.driver.recvmsg_msghdr) {
Ok(out) => out,
Err(()) => {
return;
}
};
let payload = msg_out.payload_data();
// Empty payload on a stream socket is treated as EOF.
if payload.is_empty() {
self.executor.wake_recv(conn_index);
self.driver.close_connection(conn_index);
return;
}
metrics::BYTES_RECEIVED.add(payload.len() as u64);
// Stash the SO_TIMESTAMPING timestamp (if present) on the connection so
// the reader can query it alongside the data.
let control = msg_out.control_data();
if let Some(ts_ns) = Self::parse_scm_timestamp(control) {
if let Some(cs) = self.driver.connections.get_mut(conn_index) {
cs.recv_timestamp_ns = ts_ns;
}
}
if let Some(sink) = &mut self.executor.recv_sinks[conn_index as usize] {
// Reader supplied a buffer: fill it, spill overflow into accumulator.
let remaining_cap = sink.cap - sink.pos;
let to_sink = payload.len().min(remaining_cap);
if to_sink > 0 {
unsafe {
std::ptr::copy_nonoverlapping(
payload.as_ptr(),
sink.ptr.add(sink.pos),
to_sink,
);
}
sink.pos += to_sink;
}
if to_sink < payload.len() {
self.driver
.accumulators
.append(conn_index, &payload[to_sink..]);
}
} else {
self.driver.accumulators.append(conn_index, payload);
}
self.executor.wake_recv(conn_index);
// Re-arm the multishot recvmsg if the kernel stopped it and the
// connection still expects this mode.
if !has_more
&& let Some(conn) = self.driver.connections.get(conn_index)
&& matches!(conn.recv_mode, RecvMode::MsgMulti)
{
let msghdr_ptr = &*self.driver.recvmsg_msghdr as *const libc::msghdr;
let _ = self
.driver
.ring
.submit_multishot_recvmsg(conn_index, msghdr_ptr);
}
}
#[cfg(feature = "timestamps")]
fn parse_scm_timestamp(control: &[u8]) -> Option<u64> {
let hdr_size = std::mem::size_of::<libc::cmsghdr>();
let align = std::mem::align_of::<libc::cmsghdr>();
let mut offset = 0usize;
while offset + hdr_size <= control.len() {
let hdr_ptr = control[offset..].as_ptr() as *const libc::cmsghdr;
let hdr = unsafe { std::ptr::read_unaligned(hdr_ptr) };
if hdr.cmsg_len < hdr_size {
break;
}
let data_offset = offset + hdr_size;
let data_len = hdr.cmsg_len - hdr_size;
if hdr.cmsg_level == libc::SOL_SOCKET && hdr.cmsg_type == libc::SO_TIMESTAMPING {
let ts_size = std::mem::size_of::<libc::timespec>();
if data_len >= ts_size && data_offset + ts_size <= control.len() {
let ts_ptr = control[data_offset..].as_ptr() as *const libc::timespec;
let ts = unsafe { std::ptr::read_unaligned(ts_ptr) };
if ts.tv_sec != 0 || ts.tv_nsec != 0 {
return Some(ts.tv_sec as u64 * 1_000_000_000 + ts.tv_nsec as u64);
}
}
}
let next = offset + ((hdr.cmsg_len + align - 1) & !(align - 1));
if next <= offset {
break;
}
offset = next;
}
None
}
/// The eventfd fired: drain every cross-thread channel (accepted sockets,
/// resolver/spawner/blocking responses), notify the handler, then re-arm
/// the one-shot eventfd read.
fn handle_eventfd_read(&mut self) {
{
// Drain accepted connections handed over by the acceptor thread.
loop {
let item = match self.driver.accept_rx {
Some(ref rx) => rx.try_recv().ok(),
None => None,
};
let Some((raw_fd, peer_addr)) = item else {
break;
};
// No free connection slot: drop the socket.
let conn_index = match self.driver.connections.allocate() {
Some(idx) => idx,
None => {
unsafe {
libc::close(raw_fd);
}
continue;
}
};
if let Some(cs) = self.driver.connections.get_mut(conn_index) {
cs.peer_addr = Some(crate::connection::PeerAddr::Tcp(peer_addr));
}
// Install the fd into the ring's fixed-file table at conn_index.
if self
.driver
.ring
.register_files_update(conn_index, &[raw_fd])
.is_err()
{
self.driver.connections.release(conn_index);
unsafe {
libc::close(raw_fd);
}
continue;
}
// The registered-file table holds its own reference, so the raw fd
// can be closed now; all I/O uses the fixed index.
unsafe {
libc::close(raw_fd);
}
// Clear any state left over from the slot's previous occupant.
if let Some(pending) = self.driver.pending_recv_bufs[conn_index as usize].take() {
self.driver.pending_replenish.push(pending.bid);
}
self.driver.accumulators.reset(conn_index);
self.arm_recv(conn_index);
// TLS server connections defer the accept task until the handshake
// completes (see handle_recv_multi's HandshakeJustCompleted arm).
if let Some(ref mut tls_table) = self.driver.tls_table
&& tls_table.has_server_config()
{
if tls_table.create(conn_index).is_err() {
self.driver.close_connection(conn_index);
}
continue;
}
if let Some(cs) = self.driver.connections.get_mut(conn_index) {
cs.established = true;
}
metrics::CONNECTIONS_ACCEPTED.increment();
metrics::CONNECTIONS_ACTIVE.increment();
self.spawn_accept_task(conn_index);
}
}
// Deliver DNS resolution results to their waiting futures.
if let Some(ref rx) = self.driver.resolve_rx {
while let Ok(response) = rx.try_recv() {
self.executor
.deliver_resolve(response.request_id, response.result);
}
}
// Deliver spawner results.
if let Some(ref rx) = self.driver.spawn_rx {
while let Ok(response) = rx.try_recv() {
self.executor
.deliver_spawn(response.request_id, response.result);
}
}
// Deliver blocking-pool results.
if let Some(ref rx) = self.driver.blocking_rx {
while let Ok(response) = rx.try_recv() {
self.executor
.deliver_blocking(response.request_id, response.result);
}
}
{
let mut ctx = self.driver.make_ctx();
self.handler.on_notify(&mut ctx);
}
// Re-arm the one-shot eventfd read unless we are shutting down.
if !self.driver.shutdown_flag.load(Ordering::Relaxed) {
self.driver.eventfd_armed = self
.driver
.ring
.submit_eventfd_read(self.driver.eventfd, self.driver.eventfd_buf.as_mut_ptr())
.is_ok();
}
}
/// Completion of a copied (non-zero-copy) send. The payload carries the
/// send-copy-pool slot holding the outgoing bytes.
fn handle_send(&mut self, ud: UserData, result: i32) {
let conn_index = ud.conn_index();
let pool_slot = ud.payload() as u16;
// Stale completion for a slot already released elsewhere: ignore.
if !self.driver.send_copy_pool.in_use(pool_slot) {
return;
}
// When a send chain is active, the result feeds the chain state machine
// instead of waking the sender directly.
if self.driver.chain_table.is_active(conn_index) {
self.driver.send_copy_pool.release(pool_slot);
let event = self.driver.chain_table.on_operation_cqe(conn_index, result);
if matches!(event, ChainEvent::Complete { .. }) {
self.fire_chain_complete(conn_index);
}
return;
}
if result > 0 {
// Short write: try_advance returns the unsent remainder to resubmit.
if let Some((ptr, remaining)) = self
.driver
.send_copy_pool
.try_advance(pool_slot, result as u32)
{
if self
.driver
.ring
.submit_send_copied(conn_index, ptr, remaining, pool_slot)
.is_err()
{
// SQ full: queue a retry keyed by the connection generation so
// a recycled slot is never written to the wrong peer.
let generation = self.driver.connections.generation(conn_index);
self.driver
.pending_copy_retries
.push((conn_index, generation, pool_slot));
}
return;
}
// Fully sent: recycle the slot, kick the next queued send, report the
// original total to the waiting task.
let total = self.driver.send_copy_pool.original_len(pool_slot);
metrics::BYTES_SENT.add(total as u64);
self.driver.send_copy_pool.release(pool_slot);
self.driver.submit_next_queued(conn_index);
self.executor.wake_send(conn_index, Ok(total));
return;
}
// Zero or negative result: recycle the slot, drop queued sends, and
// report Ok(0) or the errno to the sender.
self.driver.send_copy_pool.release(pool_slot);
self.driver.drain_conn_send_queue(conn_index);
let io_result = if result == 0 {
Ok(0u32)
} else {
Err(io::Error::from_raw_os_error(-result))
};
self.executor.wake_send(conn_index, io_result);
}
/// Completion of a send issued directly from a provided recv buffer
/// (`OpTag::SendRecvBuf`). The user-data payload packs the buffer id in the
/// low 16 bits and the remaining byte count in the high 16 bits.
fn handle_send_recv_buf(&mut self, ud: UserData, result: i32) {
    let conn_index = ud.conn_index();
    let payload = ud.payload();
    let bid = (payload & 0xFFFF) as u16;
    let remaining_before = payload >> 16;
    if result > 0 {
        let bytes_sent = result as u32;
        if bytes_sent < remaining_before {
            // Short write: resubmit the unsent tail of the same provided buffer.
            let new_remaining = remaining_before - bytes_sent;
            let (buf_ptr, _buf_size) = self.driver.provided_bufs.get_buffer(bid);
            let original_len = self.driver.send_recv_buf_original_lens[conn_index as usize];
            // Offset of the first unsent byte within the original payload.
            let offset = original_len - new_remaining;
            let new_ptr = unsafe { buf_ptr.add(offset as usize) };
            let new_payload = (bid as u32) | ((new_remaining) << 16);
            let new_ud = UserData::encode(
                crate::completion::OpTag::SendRecvBuf,
                conn_index,
                new_payload,
            );
            let entry = io_uring::opcode::Send::new(
                io_uring::types::Fixed(conn_index),
                new_ptr,
                new_remaining,
            )
            .build()
            .user_data(new_ud.raw());
            if unsafe { self.driver.ring.push_sqe(entry) }.is_err() {
                // SQ full: fail the send so the waiting task is not parked
                // forever (previously the waiter was silently dropped here).
                // Mirrors the ZC/copy resubmission-failure paths: recycle the
                // buffer, drop queued sends, report the error, close.
                self.driver.pending_replenish.push(bid);
                self.driver.drain_conn_send_queue(conn_index);
                let err = io::Error::other("SQ full during partial send resubmission");
                self.executor.wake_send(conn_index, Err(err));
                self.driver.close_connection(conn_index);
            }
            return;
        }
        // Fully sent: recycle the buffer, kick the next queued send, and
        // report the byte count to the waiting task.
        metrics::BYTES_SENT.add(remaining_before as u64);
        self.driver.pending_replenish.push(bid);
        self.driver.submit_next_queued(conn_index);
        self.executor.wake_send(conn_index, Ok(remaining_before));
        return;
    }
    // Zero or negative result: recycle the buffer and report Ok(0) or errno.
    self.driver.pending_replenish.push(bid);
    self.driver.submit_next_queued(conn_index);
    let io_result = if result == 0 {
        Ok(0u32)
    } else {
        Err(io::Error::from_raw_os_error(-result))
    };
    self.executor.wake_send(conn_index, io_result);
}
/// Completion of a zero-copy sendmsg. ZC sends produce two CQEs per
/// submission: the operation result, then a later notification (NOTIF flag)
/// once the kernel no longer references the buffer. The send-slab entry is
/// reference-counted via pending notifications and released only when both
/// have been observed.
fn handle_send_msg_zc(&mut self, ud: UserData, result: i32, flags: u32) {
let conn_index = ud.conn_index();
let slab_idx = ud.payload() as u16;
// Stale completion for a reclaimed slab entry: ignore.
if !self.driver.send_slab.in_use(slab_idx) {
return;
}
// Chain mode: feed both CQE kinds into the chain state machine.
if self.driver.chain_table.is_active(conn_index) {
if cqueue::notif(flags) {
self.driver.send_slab.dec_pending_notifs(slab_idx);
if self.driver.send_slab.should_release(slab_idx) {
let ps = self.driver.send_slab.release(slab_idx);
if ps != u16::MAX {
self.driver.send_copy_pool.release(ps);
}
}
let event = self.driver.chain_table.on_notif_cqe(conn_index);
if matches!(event, ChainEvent::Complete { .. }) {
self.fire_chain_complete(conn_index);
}
return;
}
// Operation CQE: only a positive result will be followed by a NOTIF,
// so only then does the entry gain a pending notification.
if result == -libc::ECANCELED {
let ps = self.driver.send_slab.release(slab_idx);
if ps != u16::MAX {
self.driver.send_copy_pool.release(ps);
}
} else if result > 0 {
self.driver.send_slab.inc_pending_notifs(slab_idx);
self.driver.send_slab.mark_awaiting_notifications(slab_idx);
self.driver.chain_table.inc_zc_notif(conn_index);
} else {
let ps = self.driver.send_slab.release(slab_idx);
if ps != u16::MAX {
self.driver.send_copy_pool.release(ps);
}
}
let event = self.driver.chain_table.on_operation_cqe(conn_index, result);
if matches!(event, ChainEvent::Complete { .. }) {
self.fire_chain_complete(conn_index);
}
return;
}
// Non-chain NOTIF CQE: drop one pending notification and release the
// entry (plus its copy-pool slot, if any) once fully drained.
if cqueue::notif(flags) {
self.driver.send_slab.dec_pending_notifs(slab_idx);
if self.driver.send_slab.should_release(slab_idx) {
let pool_slot = self.driver.send_slab.release(slab_idx);
if pool_slot != u16::MAX {
self.driver.send_copy_pool.release(pool_slot);
}
}
return;
}
// Operation CQE: a positive result means a NOTIF is still outstanding.
if result > 0 {
self.driver.send_slab.inc_pending_notifs(slab_idx);
}
#[allow(clippy::collapsible_if)]
if result > 0 {
// Short write: advance the msghdr past the sent bytes and resubmit;
// on SQ overflow queue a generation-checked retry.
if let Some(msg_ptr) = self.driver.send_slab.try_advance(slab_idx, result as u32) {
if self
.driver
.ring
.submit_send_msg_zc(conn_index, msg_ptr, slab_idx)
.is_ok()
{
return;
}
let generation = self.driver.connections.generation(conn_index);
self.driver
.pending_zc_retries
.push((conn_index, generation, slab_idx));
return;
}
}
// Fully sent or failed: the entry now only waits for outstanding NOTIFs.
self.driver.send_slab.mark_awaiting_notifications(slab_idx);
let total_len = self.driver.send_slab.total_len(slab_idx);
let should_release = self.driver.send_slab.should_release(slab_idx);
if should_release {
let pool_slot = self.driver.send_slab.release(slab_idx);
if pool_slot != u16::MAX {
self.driver.send_copy_pool.release(pool_slot);
}
}
if result >= 0 {
metrics::BYTES_SENT.add(total_len as u64);
self.driver.submit_next_queued(conn_index);
} else {
self.driver.drain_conn_send_queue(conn_index);
}
let io_result = if result >= 0 {
Ok(total_len)
} else {
Err(io::Error::from_raw_os_error(-result))
};
self.executor.wake_send(conn_index, io_result);
}
/// Completion of an outbound connect. Handles the interplay with the
/// optional connect-timeout timer: either side may cancel the other, and
/// ECANCELED from our own timeout must not be reported as a connect error.
fn handle_connect(&mut self, ud: UserData, result: i32) {
let conn_index = ud.conn_index();
if self.driver.connections.get(conn_index).is_none() {
return;
}
if result < 0 {
let errno = -result;
if errno == libc::ECANCELED {
// ECANCELED with the timeout still armed means our own timer
// cancelled the connect; handle_timeout already reported the
// failure, so just clear the flag. Otherwise it is a real error.
let timeout_armed = self
.driver
.connections
.get(conn_index)
.map(|c| c.connect_timeout_armed)
.unwrap_or(false);
if !timeout_armed {
let err = io::Error::from_raw_os_error(errno);
self.executor.wake_connect(conn_index, Err(err));
self.driver.close_connection(conn_index);
return;
}
if let Some(cs) = self.driver.connections.get_mut(conn_index) {
cs.connect_timeout_armed = false;
}
return;
}
// Real connect failure: cancel the outstanding timeout first so it
// cannot fire against a recycled slot.
if self
.driver
.connections
.get(conn_index)
.map(|c| c.connect_timeout_armed)
.unwrap_or(false)
{
let timeout_ud = UserData::encode(OpTag::Timeout, conn_index, 0);
let _ = self
.driver
.ring
.submit_async_cancel(timeout_ud.raw(), conn_index);
if let Some(cs) = self.driver.connections.get_mut(conn_index) {
cs.connect_timeout_armed = false;
}
}
if let Some(ref mut tls_table) = self.driver.tls_table {
tls_table.remove(conn_index);
}
let err = io::Error::from_raw_os_error(errno);
self.executor.wake_connect(conn_index, Err(err));
self.driver.close_connection(conn_index);
return;
}
// Connect succeeded. If a timeout was armed, cancel it — unless the
// connection is no longer in Connecting mode (stale completion).
let timeout_was_armed = self
.driver
.connections
.get(conn_index)
.map(|c| c.connect_timeout_armed)
.unwrap_or(false);
if timeout_was_armed {
let still_connecting = self
.driver
.connections
.get(conn_index)
.map(|c| matches!(c.recv_mode, RecvMode::Connecting))
.unwrap_or(false);
if !still_connecting {
if let Some(cs) = self.driver.connections.get_mut(conn_index) {
cs.connect_timeout_armed = false;
}
return;
}
let timeout_ud = UserData::encode(OpTag::Timeout, conn_index, 0);
let _ = self
.driver
.ring
.submit_async_cancel(timeout_ud.raw(), conn_index);
if let Some(cs) = self.driver.connections.get_mut(conn_index) {
cs.connect_timeout_armed = false;
}
}
// Clear any leftover recv state from the slot's previous occupant.
if let Some(pending) = self.driver.pending_recv_bufs[conn_index as usize].take() {
self.driver.pending_replenish.push(pending.bid);
}
self.driver.accumulators.reset(conn_index);
// TLS client: flush the initial handshake bytes and start receiving.
// wake_connect is deferred until the handshake completes (see
// handle_recv_multi's HandshakeJustCompleted arm).
if let Some(ref mut tls_table) = self.driver.tls_table
&& tls_table.get_mut(conn_index).is_some()
{
if !crate::tls::flush_tls_output(
tls_table,
&mut self.driver.ring,
&mut self.driver.send_copy_pool,
conn_index,
) {
let err = std::io::Error::other("send pool exhausted during TLS flush");
self.executor.wake_connect(conn_index, Err(err));
self.driver.close_connection(conn_index);
return;
}
if let Some(cs) = self.driver.connections.get_mut(conn_index) {
cs.recv_mode = RecvMode::Multi;
}
self.arm_recv(conn_index);
return;
}
// Plain TCP: mark established, arm the recv, resolve the connect future.
if let Some(cs) = self.driver.connections.get_mut(conn_index) {
cs.established = true;
cs.recv_mode = RecvMode::Multi;
}
self.arm_recv(conn_index);
self.executor.wake_connect(conn_index, Ok(()));
}
fn handle_timeout(&mut self, ud: UserData, result: i32) {
let conn_index = ud.conn_index();
if result != -libc::ETIME {
return;
}
let conn = match self.driver.connections.get(conn_index) {
Some(c) => c,
None => return,
};
if !matches!(conn.recv_mode, RecvMode::Connecting) {
return;
}
let connect_ud = UserData::encode(OpTag::Connect, conn_index, 0);
let _ = self
.driver
.ring
.submit_async_cancel(connect_ud.raw(), conn_index);
if let Some(ref mut tls_table) = self.driver.tls_table {
tls_table.remove(conn_index);
}
let err = io::Error::new(io::ErrorKind::TimedOut, "connect timed out");
self.executor.wake_connect(conn_index, Err(err));
self.driver.close_connection(conn_index);
}
fn handle_close(&mut self, ud: UserData) {
let conn_index = ud.conn_index();
if let Some(pending) = self.driver.pending_recv_bufs[conn_index as usize].take() {
self.driver.pending_replenish.push(pending.bid);
}
let was_established = self
.driver
.connections
.get(conn_index)
.map(|c| c.established)
.unwrap_or(false);
if let Some(ref mut tls_table) = self.driver.tls_table {
tls_table.remove(conn_index);
}
if was_established {
metrics::CONNECTIONS_CLOSED.increment();
metrics::CONNECTIONS_ACTIVE.decrement();
}
self.executor.remove_connection(conn_index);
self.driver.connections.release(conn_index);
}
/// Completion of an internal TLS send (handshake/control bytes flushed by
/// the TLS engine). No task is woken here; application-data sends complete
/// through the regular send path.
fn handle_tls_send(&mut self, ud: UserData, result: i32) {
    let conn_index = ud.conn_index();
    let pool_slot = ud.payload() as u16;
    // Guard against stale completions for an already-released slot, matching
    // handle_send / handle_send_msg_zc. Without this, a stale CQE could
    // advance or release a slot that has since been reused elsewhere.
    if !self.driver.send_copy_pool.in_use(pool_slot) {
        return;
    }
    if result > 0
        && let Some((ptr, remaining)) = self
            .driver
            .send_copy_pool
            .try_advance(pool_slot, result as u32)
    {
        // Short write: resubmit the remainder; on SQ overflow queue a retry
        // keyed by the connection generation.
        if self
            .driver
            .ring
            .submit_tls_send(conn_index, ptr, remaining, pool_slot)
            .is_err()
        {
            let generation = self.driver.connections.generation(conn_index);
            self.driver
                .pending_copy_retries
                .push((conn_index, generation, pool_slot));
        }
        return;
    }
    // Fully sent, zero, or error: release the slot; errors tear the
    // connection down.
    self.driver.send_copy_pool.release(pool_slot);
    if result < 0 {
        self.driver.close_connection(conn_index);
    }
}
/// Finishes a send chain: resolves the waiting send future with the chain's
/// outcome and either continues the send queue (success) or drains it.
fn fire_chain_complete(&mut self, conn_index: u32) {
let chain = match self.driver.chain_table.take(conn_index) {
Some(c) => c,
None => return,
};
// NOTE(review): negating here implies `first_error` stores the raw
// negative CQE result (e.g. -EPIPE), not a positive errno — confirm
// against chain_table's recording site.
let io_result = match chain.first_error {
Some(errno) => Err(io::Error::from_raw_os_error(-errno)),
None => Ok(chain.bytes_sent),
};
if chain.first_error.is_none() {
self.driver.submit_next_queued(conn_index);
} else {
self.driver.drain_conn_send_queue(conn_index);
}
self.executor.wake_send(conn_index, io_result);
}
/// A pooled timer completed. Only a real expiry (-ETIME) wakes anything;
/// cancellations and other results are dropped.
fn handle_timer(&mut self, ud: UserData, result: i32) {
    if result != -libc::ETIME {
        return;
    }
    let (slot, generation) = TimerSlotPool::decode_payload(ud.payload());
    // `fire` yields the waker id only when the generation still matches,
    // filtering out completions for recycled timer slots.
    if let Some(waker_id) = self.executor.timer_pool.fire(slot, generation) {
        self.executor.wake_task(waker_id);
    }
}
/// Completion of a UDP recvmsg: copies the datagram out of the per-socket
/// buffer, re-arms the recvmsg, and queues the datagram for the UDP task.
fn handle_recv_msg_udp(&mut self, ud: UserData, result: i32) {
    let udp_index = ud.conn_index();
    let idx = udp_index as usize;
    // Stale completion for a socket index we no longer track.
    if idx >= self.driver.udp_sockets.len() {
        return;
    }
    if result <= 0 {
        // Error or empty result: just re-arm the recv loop.
        self.driver.resubmit_udp_recvmsg(udp_index);
        return;
    }
    // Copy the payload and decode the peer address before re-arming, since
    // resubmission reuses the same buffer and msghdr.
    let bytes = result as usize;
    let sock = &self.driver.udp_sockets[idx];
    let peer = sockaddr_to_socket_addr(&sock.recv_addr, sock.recv_msghdr.msg_namelen);
    let data = sock.recv_buf[..bytes].to_vec();
    self.driver.resubmit_udp_recvmsg(udp_index);
    // Datagrams whose source address cannot be decoded are dropped.
    let Some(peer) = peer else { return };
    metrics::UDP_DATAGRAMS_RECEIVED.increment();
    if idx < self.executor.udp_recv_queues.len() {
        self.executor.udp_recv_queues[idx].push_back((data, peer));
        self.executor.wake_udp_recv(udp_index);
    }
}
/// Completion of a UDP sendmsg: frees the copy-pool slot, clears the
/// per-socket in-flight flag, and counts errors.
fn handle_send_msg_udp(&mut self, ud: UserData, result: i32) {
    let pool_slot = ud.payload() as u16;
    self.driver.send_copy_pool.release(pool_slot);
    // The socket may have been removed since the send was submitted.
    let idx = ud.conn_index() as usize;
    if idx < self.driver.udp_sockets.len() {
        self.driver.udp_sockets[idx].send_in_flight = false;
    }
    if result < 0 {
        metrics::UDP_SEND_ERRORS.increment();
    }
}
/// Completion of an NVMe passthrough command: frees the slab entry,
/// decrements the device's in-flight count, and wakes the disk-IO future.
fn handle_nvme_cmd(&mut self, ud: UserData, result: i32) {
    let slab_idx = ud.payload() as u16;
    let Some(nvme_cmd_slab) = self.driver.nvme_cmd_slab.as_mut() else {
        return;
    };
    // Stale completion for a slot that was already reclaimed.
    if !nvme_cmd_slab.in_use(slab_idx) {
        return;
    }
    let device_index = nvme_cmd_slab.release(slab_idx);
    if let Some(ref mut devices) = self.driver.nvme_devices
        && let Some(dev) = devices.get_mut(device_index)
    {
        dev.in_flight = dev.in_flight.saturating_sub(1);
    }
    self.executor.wake_disk_io(slab_idx as u32, result);
}
fn handle_direct_io(&mut self, ud: UserData, result: i32) {
let slab_idx = ud.payload() as u16;
let cmd_slab = match self.driver.direct_io_cmd_slab {
Some(ref mut s) => s,
None => return,
};
if !cmd_slab.in_use(slab_idx) {
return;
}
let (file_index, _op) = cmd_slab.release(slab_idx);
if let Some(ref mut files) = self.driver.direct_io_files
&& let Some(f) = files.get_mut(file_index)
{
f.in_flight = f.in_flight.saturating_sub(1);
}
self.executor.wake_disk_io(slab_idx as u32, result);
}
// Completion of a filesystem op (open/read/write/fsync/statx) tracked in the
// fs command slab. `payload` addresses the slab entry; `conn_index` carries
// the file-table index (used here only for failed Open cleanup).
fn handle_fs(&mut self, ud: UserData, result: i32) {
let slab_idx = ud.payload() as u16;
let file_index = ud.conn_index() as u16;
let cmd_slab = match self.driver.fs_cmd_slab {
Some(ref mut s) => s,
None => return,
};
// Stale completion for an already-released slot: drop it.
if !cmd_slab.in_use(slab_idx) {
return;
}
let op = cmd_slab.get(slab_idx).map(|e| e.op);
// Successful statx: copy the metadata out of the slab entry BEFORE the
// entry is released below, keyed by slab index for the waiting task.
if op == Some(crate::fs::FsOp::Statx)
&& result >= 0
&& let Some(entry) = cmd_slab.get(slab_idx)
&& let Some(ref statx_buf) = entry.statx_buf
{
let metadata = crate::fs::Metadata::from_statx(statx_buf);
self.executor
.fs_stat_results
.insert(slab_idx as u32, metadata);
}
// Failed Open: the pre-reserved file-table slot is never going to hold a
// valid fd, so give it back.
if op == Some(crate::fs::FsOp::Open) && result < 0 {
if let Some(ref mut files) = self.driver.fs_files {
files.release(file_index);
}
}
let (released_file_index, released_op) = cmd_slab.release(slab_idx);
// Data-path ops decrement the per-file in-flight count; saturating_sub
// guards against double-decrement on unexpected completions.
match released_op {
crate::fs::FsOp::Read | crate::fs::FsOp::Write | crate::fs::FsOp::Fsync => {
if let Some(ref mut files) = self.driver.fs_files
&& let Some(f) = files.get_mut(released_file_index)
{
f.in_flight = f.in_flight.saturating_sub(1);
}
}
_ => {}
}
self.executor.wake_disk_io(slab_idx as u32, result);
}
fn handle_pidfd_poll(&mut self, ud: UserData, result: i32) {
    // A pidfd poll completed; wake whichever task awaits this sequence number.
    self.executor.wake_pidfd(ud.payload(), result);
}
// (Re)arm multishot receive on a connection. With the `timestamps` feature
// enabled and active, uses multishot recvmsg (so ancillary timestamp data is
// delivered); otherwise plain multishot recv. On arm failure the waiter is
// woken BEFORE the connection is closed so the task observes the failure.
fn arm_recv(&mut self, conn_index: u32) {
#[cfg(feature = "timestamps")]
if self.driver.timestamps {
// Shared msghdr template owned by the driver; the kernel reads it when
// the SQE executes.
let msghdr_ptr = &*self.driver.recvmsg_msghdr as *const libc::msghdr;
if self
.driver
.ring
.submit_multishot_recvmsg(conn_index, msghdr_ptr)
.is_err()
{
metrics::RECV_ARM_FAILURES.increment();
self.executor.wake_recv(conn_index);
self.driver.close_connection(conn_index);
return;
}
if let Some(cs) = self.driver.connections.get_mut(conn_index) {
cs.recv_mode = RecvMode::MsgMulti;
}
// recvmsg path armed; skip the plain-recv fallback below.
return;
}
if self.driver.ring.submit_multishot_recv(conn_index).is_err() {
metrics::RECV_ARM_FAILURES.increment();
self.executor.wake_recv(conn_index);
self.driver.close_connection(conn_index);
}
}
fn spawn_accept_task(&mut self, conn_index: u32) {
    // Build the handler's accept future for this connection and queue it
    // for an immediate first poll.
    let generation = self.driver.connections.generation(conn_index);
    let ctx = ConnCtx::new(conn_index, generation);
    let fut = Box::pin(self.handler.on_accept(ctx));
    self.executor.owner_task[conn_index as usize] = Some(conn_index);
    self.executor.task_slab.spawn(conn_index, fut);
    self.executor.ready_queue.push_back(conn_index);
}
#[cfg(test)]
/// Test-only shim: feed a synthetic CQE straight into the dispatcher.
pub(crate) fn test_dispatch_cqe(&mut self, user_data_raw: u64, result: i32, flags: u32) {
    self.dispatch_cqe(user_data_raw, result, flags);
}
#[cfg(test)]
/// Test-only: push a NOP SQE that completes with the given user_data and
/// result, wait for its completion, then run the normal drain path.
pub(crate) fn inject_and_dispatch(&mut self, user_data_raw: u64, result: i32) {
    let ring = &mut self.driver.ring;
    ring.submit_nop_inject(user_data_raw, result)
        .expect("submit_nop_inject failed — kernel 6.6+ required");
    ring.submit_and_wait(1).expect("submit_and_wait failed");
    self.drain_completions();
}
#[cfg(test)]
// Re-attempt submissions that previously failed because the SQ was full:
// partial copied sends, partial zero-copy sends, and closes. Each retry is
// validated against the connection generation so a recycled slot is never
// written to; stale entries release their buffers instead of resubmitting.
pub(crate) fn drain_retries(&mut self) {
if !self.driver.pending_copy_retries.is_empty() {
let retries: Vec<_> = self.driver.pending_copy_retries.drain(..).collect();
for (conn_index, generation, pool_slot) in retries {
// Slot already released elsewhere — nothing to resubmit.
if !self.driver.send_copy_pool.in_use(pool_slot) {
continue;
}
// Connection gone or recycled: free the buffer and move on.
if self.driver.connections.get(conn_index).is_none()
|| self.driver.connections.generation(conn_index) != generation
{
self.driver.send_copy_pool.release(pool_slot);
continue;
}
// Resume from where the partial send stopped.
let (ptr, remaining) = self.driver.send_copy_pool.current_ptr_remaining(pool_slot);
let result = self
.driver
.ring
.submit_send_copied(conn_index, ptr, remaining, pool_slot);
if result.is_err() {
// SQ still full: give up on this connection — release, drain its
// queue, surface the error to the waiter, then close.
self.driver.send_copy_pool.release(pool_slot);
self.driver.drain_conn_send_queue(conn_index);
let err = io::Error::other("SQ full during partial copy send resubmission");
self.executor.wake_send(conn_index, Err(err));
self.driver.close_connection(conn_index);
}
}
}
if !self.driver.pending_zc_retries.is_empty() {
let retries: Vec<_> = self.driver.pending_zc_retries.drain(..).collect();
for (conn_index, generation, slab_idx) in retries {
if !self.driver.send_slab.in_use(slab_idx) {
continue;
}
if self.driver.connections.get(conn_index).is_none()
|| self.driver.connections.generation(conn_index) != generation
{
// ZC entries may still owe kernel notifications; only release once
// the slab says all notifications have arrived.
self.driver.send_slab.mark_awaiting_notifications(slab_idx);
if self.driver.send_slab.should_release(slab_idx) {
let pool_slot = self.driver.send_slab.release(slab_idx);
// u16::MAX means the entry had no attached copy-pool slot.
if pool_slot != u16::MAX {
self.driver.send_copy_pool.release(pool_slot);
}
}
continue;
}
let msg_ptr = self.driver.send_slab.msghdr_ptr(slab_idx);
if self
.driver
.ring
.submit_send_msg_zc(conn_index, msg_ptr, slab_idx)
.is_err()
{
// Same notification-aware teardown as the stale-connection path.
self.driver.send_slab.mark_awaiting_notifications(slab_idx);
if self.driver.send_slab.should_release(slab_idx) {
let pool_slot = self.driver.send_slab.release(slab_idx);
if pool_slot != u16::MAX {
self.driver.send_copy_pool.release(pool_slot);
}
}
self.driver.drain_conn_send_queue(conn_index);
let err = io::Error::other("SQ full during partial ZC send resubmission");
self.executor.wake_send(conn_index, Err(err));
self.driver.close_connection(conn_index);
}
}
}
if !self.driver.pending_close_retries.is_empty() {
let retries: Vec<_> = self.driver.pending_close_retries.drain(..).collect();
for conn_index in retries {
// Still no SQ room: push back for the next drain pass.
if self.driver.ring.submit_close(conn_index).is_err() {
self.driver.pending_close_retries.push(conn_index);
}
}
}
}
#[cfg(test)]
/// Test-only: inject a chain of NOP completions where every CQE except the
/// last is hard-linked, so they complete as one ordered chain, then drain.
///
/// Fixes: a duplicated `#[cfg(test)]` attribute, and an integer underflow
/// panic (`cqes.len() - 1`) when called with an empty slice.
pub(crate) fn inject_linked_chain_and_dispatch(&mut self, cqes: &[(u64, i32)]) {
    if cqes.is_empty() {
        return;
    }
    let last = cqes.len() - 1;
    for (i, &(user_data_raw, result)) in cqes.iter().enumerate() {
        if i < last {
            self.driver
                .ring
                .submit_nop_inject_linked(user_data_raw, result)
                .expect("submit_nop_inject_linked failed");
        } else {
            // Final entry in the chain is unlinked.
            self.driver
                .ring
                .submit_nop_inject(user_data_raw, result)
                .expect("submit_nop_inject failed");
        }
    }
    self.driver
        .ring
        .submit_and_wait(cqes.len() as u32)
        .expect("submit_and_wait failed");
    self.drain_completions();
}
#[cfg(test)]
/// Test-only: inject several independent NOP completions and dispatch them
/// all in a single drain pass.
pub(crate) fn inject_batch_and_dispatch(&mut self, cqes: &[(u64, i32)]) {
    for &(user_data_raw, result) in cqes.iter() {
        self.driver
            .ring
            .submit_nop_inject(user_data_raw, result)
            .expect("submit_nop_inject failed");
    }
    let count = cqes.len() as u32;
    self.driver
        .ring
        .submit_and_wait(count)
        .expect("submit_and_wait failed");
    self.drain_completions();
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::completion::{OpTag, UserData};
use crate::config::Config;
use crate::runtime::io::ConnCtx;
use std::future::Future;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
/// Handler whose accept task completes immediately; used by the unit tests.
struct NoopHandler;
impl AsyncEventHandler for NoopHandler {
    #[allow(clippy::manual_async_fn)]
    fn on_accept(&self, _conn: ConnCtx) -> impl Future<Output = ()> + 'static {
        // No per-connection work: the future resolves on first poll.
        async {}
    }
    fn create_for_worker(_id: usize) -> Self {
        NoopHandler
    }
}
/// Small single-worker configuration for driving the event loop in tests.
fn test_config() -> Config {
    let mut cfg = Config::default();
    cfg.worker.threads = 1;
    cfg.worker.pin_to_core = false;
    cfg.sq_entries = 32;
    cfg.recv_buffer.ring_size = 16;
    cfg.recv_buffer.buffer_size = 4096;
    cfg.max_connections = 16;
    cfg.send_copy_count = 16;
    cfg.send_slab_slots = 8;
    // Tiny fs limits so file-slot exhaustion is easy to hit in tests.
    cfg.fs = Some(crate::fs::FsConfig {
        max_files: 2,
        max_commands_in_flight: 4,
    });
    cfg
}
/// Build an event loop with the test config, a fresh eventfd, and no
/// accept/resolve/spawn/blocking channels.
fn make_test_loop() -> AsyncEventLoop<NoopHandler> {
    let cfg = test_config();
    let shutdown_flag = Arc::new(AtomicBool::new(false));
    let efd = unsafe { libc::eventfd(0, libc::EFD_NONBLOCK | libc::EFD_CLOEXEC) };
    assert!(efd >= 0, "eventfd creation failed");
    AsyncEventLoop::new(
        &cfg,
        NoopHandler,
        None, // accept_rx
        efd,
        shutdown_flag,
        None, // resolve_rx
        None, // resolve_tx
        None, // resolver
        None, // spawn_rx
        None, // spawn_tx
        None, // spawner
        None, // blocking_rx
        None, // blocking_tx
        None, // blocking_pool
    )
    .expect("failed to create test event loop")
}
/// Allocate a connection slot and mark it as an established multishot-recv
/// connection, simulating a completed accept.
fn accept_connection(el: &mut AsyncEventLoop<NoopHandler>) -> u32 {
    let conn = el.driver.connections.allocate().expect("no free slots");
    el.driver.accumulators.reset(conn);
    if let Some(state) = el.driver.connections.get_mut(conn) {
        state.recv_mode = RecvMode::Multi;
        state.established = true;
    }
    conn
}
#[test]
fn handle_send_complete_releases_pool_slot() {
    let mut ev = make_test_loop();
    let conn = accept_connection(&mut ev);
    let msg = b"hello";
    let (pool_slot, _ptr, _len) = ev.driver.send_copy_pool.copy_in(msg).unwrap();
    let baseline = ev.driver.send_copy_pool.free_count();
    // Full-length send completion must return the slot to the pool.
    let ud = UserData::encode(OpTag::Send, conn, pool_slot as u32);
    ev.test_dispatch_cqe(ud.raw(), msg.len() as i32, 0);
    assert_eq!(
        ev.driver.send_copy_pool.free_count(),
        baseline + 1,
        "pool slot not released after send complete"
    );
}
#[test]
fn handle_send_error_releases_pool_slot() {
    let mut ev = make_test_loop();
    let conn = accept_connection(&mut ev);
    let msg = b"hello";
    let (pool_slot, _ptr, _len) = ev.driver.send_copy_pool.copy_in(msg).unwrap();
    let baseline = ev.driver.send_copy_pool.free_count();
    // -104 == -ECONNRESET: an errored send must still free its buffer.
    let ud = UserData::encode(OpTag::Send, conn, pool_slot as u32);
    ev.test_dispatch_cqe(ud.raw(), -104, 0);
    assert_eq!(
        ev.driver.send_copy_pool.free_count(),
        baseline + 1,
        "pool slot not released after send error"
    );
}
#[test]
fn handle_send_wakes_send_waiter() {
    let mut ev = make_test_loop();
    let conn = accept_connection(&mut ev);
    let idx = conn as usize;
    ev.executor.send_waiters[idx] = true;
    let msg = b"hello";
    let (pool_slot, _ptr, _len) = ev.driver.send_copy_pool.copy_in(msg).unwrap();
    let ud = UserData::encode(OpTag::Send, conn, pool_slot as u32);
    ev.test_dispatch_cqe(ud.raw(), msg.len() as i32, 0);
    // Completion must clear the waiter flag and record the io result.
    assert!(
        !ev.executor.send_waiters[idx],
        "send waiter not cleared"
    );
    assert!(
        ev.executor.io_results[idx].is_some(),
        "send result not stored"
    );
}
#[test]
fn handle_send_msg_zc_notif_releases_slab() {
    let mut ev = make_test_loop();
    let conn = accept_connection(&mut ev);
    let iovecs = [libc::iovec {
        iov_base: std::ptr::null_mut(),
        iov_len: 100,
    }];
    let guards = [None, None, None, None];
    let (slab_idx, _ptr) = ev
        .driver
        .send_slab
        .allocate(conn, &iovecs, u16::MAX, guards, 0, 100)
        .unwrap();
    let baseline = ev.driver.send_slab.free_count();
    ev.driver.send_slab.inc_pending_notifs(slab_idx);
    ev.driver.send_slab.mark_awaiting_notifications(slab_idx);
    // flags bit 3 (8) — looks like the ZC notification flag; the final
    // notification should release the slab entry.
    let ud = UserData::encode(OpTag::SendMsgZc, conn, slab_idx as u32);
    let notif_flags = 8u32;
    ev.test_dispatch_cqe(ud.raw(), 0, notif_flags);
    assert_eq!(
        ev.driver.send_slab.free_count(),
        baseline + 1,
        "slab entry not released after notification"
    );
}
#[test]
fn handle_send_msg_zc_error_does_not_increment_notifs() {
    let mut ev = make_test_loop();
    let conn = accept_connection(&mut ev);
    let iovecs = [libc::iovec {
        iov_base: std::ptr::null_mut(),
        iov_len: 100,
    }];
    let guards = [None, None, None, None];
    let (slab_idx, _ptr) = ev
        .driver
        .send_slab
        .allocate(conn, &iovecs, u16::MAX, guards, 0, 100)
        .unwrap();
    // An errored ZC send must leave the entry releasable (or released).
    let ud = UserData::encode(OpTag::SendMsgZc, conn, slab_idx as u32);
    ev.test_dispatch_cqe(ud.raw(), -104, 0);
    assert!(
        ev.driver.send_slab.should_release(slab_idx) || !ev.driver.send_slab.in_use(slab_idx),
        "slab entry leaked after ZC send error"
    );
}
#[test]
fn handle_send_msg_zc_result_zero_does_not_leak_slab() {
    let mut ev = make_test_loop();
    let conn = accept_connection(&mut ev);
    let iovecs = [libc::iovec {
        iov_base: std::ptr::null_mut(),
        iov_len: 100,
    }];
    let guards = [None, None, None, None];
    let (slab_idx, _ptr) = ev
        .driver
        .send_slab
        .allocate(conn, &iovecs, u16::MAX, guards, 0, 100)
        .unwrap();
    // A zero-byte ZC result must not strand the slab entry.
    let ud = UserData::encode(OpTag::SendMsgZc, conn, slab_idx as u32);
    ev.test_dispatch_cqe(ud.raw(), 0, 0);
    assert!(
        !ev.driver.send_slab.in_use(slab_idx) || ev.driver.send_slab.should_release(slab_idx),
        "slab entry leaked on result == 0"
    );
}
#[test]
fn handle_recv_multi_eof_closes_connection() {
    let mut ev = make_test_loop();
    let conn = accept_connection(&mut ev);
    // result == 0 on recv is EOF: the connection must be torn down.
    let ud = UserData::encode(OpTag::RecvMulti, conn, 0);
    ev.test_dispatch_cqe(ud.raw(), 0, 0);
    let state = ev.driver.connections.get(conn);
    assert!(
        state.is_none() || matches!(state.unwrap().recv_mode, RecvMode::Closed),
        "connection not closed after recv EOF"
    );
}
#[test]
fn handle_recv_multi_stale_connection_replenishes_buffer() {
    let mut ev = make_test_loop();
    let replenish_before = ev.driver.pending_replenish.len();
    // No connection was accepted, so conn 0 is stale; the buffer carried in
    // the CQE flags (bid in the upper 16 bits) must still be returned.
    let bid: u16 = 5;
    let flags = 1u32 | (u32::from(bid) << 16);
    let ud = UserData::encode(OpTag::RecvMulti, 0, 0);
    ev.test_dispatch_cqe(ud.raw(), 100, flags);
    assert_eq!(
        ev.driver.pending_replenish.len(),
        replenish_before + 1,
        "buffer not replenished on stale connection CQE"
    );
    assert_eq!(ev.driver.pending_replenish[0], bid);
}
#[test]
fn handle_close_releases_connection_slot() {
    let mut ev = make_test_loop();
    let conn = accept_connection(&mut ev);
    assert!(ev.driver.connections.get(conn).is_some());
    ev.driver.close_connection(conn);
    // The Close completion is what finally frees the slot.
    let ud = UserData::encode(OpTag::Close, conn, 0);
    ev.test_dispatch_cqe(ud.raw(), 0, 0);
    assert!(
        ev.driver.connections.get(conn).is_none(),
        "connection slot not released after Close CQE"
    );
}
#[test]
fn handle_recv_multi_data_appends_to_accumulator() {
    let mut ev = make_test_loop();
    let conn = accept_connection(&mut ev);
    let bid: u16 = 0;
    // flag bits 0|1 plus the buffer id in the upper half — presumably
    // buffer-select + "more data" flags; verify against the dispatcher.
    let flags = 1u32 | 2u32 | (u32::from(bid) << 16);
    let n = 5i32;
    let (buf_ptr, _) = ev.driver.provided_bufs.get_buffer(bid);
    unsafe {
        std::ptr::copy_nonoverlapping(b"hello".as_ptr(), buf_ptr as *mut u8, 5);
    }
    let ud = UserData::encode(OpTag::RecvMulti, conn, 0);
    ev.test_dispatch_cqe(ud.raw(), n, flags);
    // Zero-copy path: the first completion pins the buffer rather than
    // copying into the accumulator.
    let acc = ev.driver.accumulators.data(conn);
    assert!(
        acc.is_empty(),
        "data should NOT be in accumulator (zero-copy)"
    );
    let pending = ev.driver.pending_recv_bufs[conn as usize];
    assert!(pending.is_some(), "pending recv buf should be set");
    let pending = pending.unwrap();
    assert_eq!(pending.bid, bid);
    assert_eq!(pending.len, n as u32);
    assert!(
        !ev.driver.pending_replenish.contains(&bid),
        "buffer should NOT be replenished yet (zero-copy deferred)"
    );
}
#[test]
fn handle_recv_multi_second_completion_flushes_to_accumulator() {
    let mut ev = make_test_loop();
    let conn = accept_connection(&mut ev);
    let flags = 1u32 | 2u32;
    // First datagram lands in buffer 0 and is held zero-copy.
    let (buf_ptr, _) = ev.driver.provided_bufs.get_buffer(0);
    unsafe {
        std::ptr::copy_nonoverlapping(b"hello".as_ptr(), buf_ptr as *mut u8, 5);
    }
    let ud = UserData::encode(OpTag::RecvMulti, conn, 0);
    ev.test_dispatch_cqe(ud.raw(), 5, flags);
    // Second completion (buffer 1) forces both buffers to be flushed into
    // the accumulator and replenished.
    let (buf_ptr, _) = ev.driver.provided_bufs.get_buffer(1);
    unsafe {
        std::ptr::copy_nonoverlapping(b" world".as_ptr(), buf_ptr as *mut u8, 6);
    }
    let ud = UserData::encode(OpTag::RecvMulti, conn, 0);
    ev.test_dispatch_cqe(ud.raw(), 6, flags | (1u32 << 16));
    assert!(
        ev.driver.pending_replenish.contains(&0),
        "first buffer should be replenished"
    );
    assert!(
        ev.driver.pending_replenish.contains(&1),
        "second buffer should be replenished"
    );
    assert!(ev.driver.pending_recv_bufs[conn as usize].is_none());
    let acc = ev.driver.accumulators.data(conn);
    assert_eq!(acc, b"hello world");
}
#[test]
fn handle_recv_multi_enobufs_does_not_close() {
    let mut ev = make_test_loop();
    let conn = accept_connection(&mut ev);
    // -105 == -ENOBUFS: recoverable, the connection must survive.
    let ud = UserData::encode(OpTag::RecvMulti, conn, 0);
    ev.test_dispatch_cqe(ud.raw(), -105, 0);
    assert!(
        ev.driver.connections.get(conn).is_some(),
        "connection closed on ENOBUFS"
    );
}
#[test]
fn handle_recv_multi_unknown_error_closes_when_no_more() {
    let mut ev = make_test_loop();
    let conn = accept_connection(&mut ev);
    // -99 is not one of the specially-handled errnos: connection closes.
    let ud = UserData::encode(OpTag::RecvMulti, conn, 0);
    ev.test_dispatch_cqe(ud.raw(), -99, 0);
    let state = ev.driver.connections.get(conn);
    assert!(
        state.is_none() || matches!(state.unwrap().recv_mode, RecvMode::Closed),
        "connection not closed on unknown recv error"
    );
}
#[test]
fn handle_recv_multi_ecanceled_does_nothing() {
    let mut ev = make_test_loop();
    let conn = accept_connection(&mut ev);
    // -125 == -ECANCELED: explicitly ignored by the recv handler.
    let ud = UserData::encode(OpTag::RecvMulti, conn, 0);
    ev.test_dispatch_cqe(ud.raw(), -125, 0);
    assert!(
        ev.driver.connections.get(conn).is_some(),
        "connection closed on ECANCELED"
    );
}
#[test]
fn handle_connect_success_wakes_waiter() {
    let mut ev = make_test_loop();
    let conn = ev
        .driver
        .connections
        .allocate_outbound()
        .expect("no free slots");
    let idx = conn as usize;
    ev.executor.connect_waiters[idx] = true;
    let ud = UserData::encode(OpTag::Connect, conn, 0);
    ev.test_dispatch_cqe(ud.raw(), 0, 0);
    assert!(
        !ev.executor.connect_waiters[idx],
        "connect waiter not cleared"
    );
    assert!(
        ev.executor.io_results[idx].is_some(),
        "connect result not stored"
    );
    // Successful connect flips the slot into an established multishot-recv
    // connection.
    let state = ev.driver.connections.get(conn).unwrap();
    assert!(state.established, "connection not marked established");
    assert!(
        matches!(state.recv_mode, RecvMode::Multi),
        "recv_mode not set to Multi after connect"
    );
}
#[test]
fn handle_connect_error_wakes_waiter_and_closes() {
    let mut ev = make_test_loop();
    let conn = ev
        .driver
        .connections
        .allocate_outbound()
        .expect("no free slots");
    let idx = conn as usize;
    ev.executor.connect_waiters[idx] = true;
    // -111 == -ECONNREFUSED.
    let ud = UserData::encode(OpTag::Connect, conn, 0);
    ev.test_dispatch_cqe(ud.raw(), -111, 0);
    assert!(
        !ev.executor.connect_waiters[idx],
        "connect waiter not cleared on error"
    );
    assert!(
        ev.executor.io_results[idx].is_some(),
        "connect error result not stored"
    );
    let state = ev.driver.connections.get(conn);
    assert!(
        state.is_none() || matches!(state.unwrap().recv_mode, RecvMode::Closed),
        "connection not closed after connect error"
    );
}
#[test]
fn handle_timer_fires_and_wakes_task() {
    let mut ev = make_test_loop();
    let waker_id = 0u32;
    let (slot, generation) = ev.executor.timer_pool.allocate(waker_id).unwrap();
    // -62 == -ETIME: a genuine expiration.
    let payload = TimerSlotPool::encode_payload(slot, generation);
    let ud = UserData::encode(OpTag::Timer, 0, payload);
    ev.test_dispatch_cqe(ud.raw(), -62, 0);
    assert!(
        ev.executor.timer_pool.is_fired(slot),
        "timer not marked as fired"
    );
}
#[test]
fn handle_timer_stale_generation_ignored() {
    let mut ev = make_test_loop();
    // Release and reallocate the slot so the original generation is stale.
    let (slot, generation) = ev.executor.timer_pool.allocate(0).unwrap();
    ev.executor.timer_pool.release(slot);
    let (_slot2, gen2) = ev.executor.timer_pool.allocate(0).unwrap();
    assert_ne!(generation, gen2, "generation should have changed");
    let payload = TimerSlotPool::encode_payload(slot, generation);
    let ud = UserData::encode(OpTag::Timer, 0, payload);
    ev.test_dispatch_cqe(ud.raw(), -62, 0);
    assert!(
        !ev.executor.timer_pool.is_fired(slot),
        "stale timer should not be fired"
    );
}
#[test]
fn handle_tls_send_complete_releases_pool_slot() {
    let mut ev = make_test_loop();
    let conn = accept_connection(&mut ev);
    let msg = b"ciphertext";
    let (pool_slot, _ptr, _len) = ev.driver.send_copy_pool.copy_in(msg).unwrap();
    let baseline = ev.driver.send_copy_pool.free_count();
    let ud = UserData::encode(OpTag::TlsSend, conn, pool_slot as u32);
    ev.test_dispatch_cqe(ud.raw(), msg.len() as i32, 0);
    assert_eq!(
        ev.driver.send_copy_pool.free_count(),
        baseline + 1,
        "pool slot not released after TLS send complete"
    );
}
#[test]
fn handle_tls_send_error_closes_connection() {
    let mut ev = make_test_loop();
    let conn = accept_connection(&mut ev);
    let msg = b"ciphertext";
    let (pool_slot, _ptr, _len) = ev.driver.send_copy_pool.copy_in(msg).unwrap();
    // TLS sends have no user-level retry: an error closes the connection.
    let ud = UserData::encode(OpTag::TlsSend, conn, pool_slot as u32);
    ev.test_dispatch_cqe(ud.raw(), -104, 0);
    assert!(
        !ev.driver.send_copy_pool.in_use(pool_slot),
        "pool slot not released after TLS send error"
    );
    let state = ev.driver.connections.get(conn);
    assert!(
        state.is_none() || matches!(state.unwrap().recv_mode, RecvMode::Closed),
        "connection not closed after TLS send error"
    );
}
#[test]
fn handle_tick_timeout_clears_armed_flag() {
    let mut ev = make_test_loop();
    ev.driver.tick_timeout_armed = true;
    // The tick timeout CQE (-ETIME) re-enables arming of the next tick.
    let ud = UserData::encode(OpTag::TickTimeout, 0, 0);
    ev.test_dispatch_cqe(ud.raw(), -62, 0);
    assert!(
        !ev.driver.tick_timeout_armed,
        "tick_timeout_armed not cleared"
    );
}
#[test]
fn handle_send_msg_udp_error_releases_pool_slot() {
    let mut ev = make_test_loop();
    // The test config declares no UDP listeners; skip if none exist.
    if ev.driver.udp_sockets.is_empty() {
        return;
    }
    let msg = b"datagram";
    let (pool_slot, _ptr, _len) = ev.driver.send_copy_pool.copy_in(msg).unwrap();
    let baseline = ev.driver.send_copy_pool.free_count();
    let ud = UserData::encode(OpTag::SendMsgUdp, 0u32, pool_slot as u32);
    ev.test_dispatch_cqe(ud.raw(), msg.len() as i32, 0);
    assert_eq!(
        ev.driver.send_copy_pool.free_count(),
        baseline + 1,
        "pool slot not released after UDP send"
    );
}
#[test]
fn handle_send_partial_queues_or_resubmits() {
    let mut ev = make_test_loop();
    let conn = accept_connection(&mut ev);
    let msg = b"0123456789";
    let (pool_slot, _ptr, _len) = ev.driver.send_copy_pool.copy_in(msg).unwrap();
    ev.driver.send_queues[conn as usize].in_flight = true;
    // Only 5 of 10 bytes sent: the slot must stay alive for the remainder.
    let ud = UserData::encode(OpTag::Send, conn, pool_slot as u32);
    ev.test_dispatch_cqe(ud.raw(), 5, 0);
    assert!(
        ev.driver.send_copy_pool.in_use(pool_slot),
        "pool slot released prematurely on partial send"
    );
}
#[test]
fn handle_send_error_wakes_send_waiter_with_error() {
    let mut ev = make_test_loop();
    let conn = accept_connection(&mut ev);
    let idx = conn as usize;
    ev.executor.send_waiters[idx] = true;
    ev.driver.send_queues[idx].in_flight = true;
    let msg = b"hello";
    let (pool_slot, _ptr, _len) = ev.driver.send_copy_pool.copy_in(msg).unwrap();
    let ud = UserData::encode(OpTag::Send, conn, pool_slot as u32);
    ev.test_dispatch_cqe(ud.raw(), -104, 0);
    assert!(
        !ev.executor.send_waiters[idx],
        "send waiter not cleared on error"
    );
    assert!(
        ev.executor.io_results[idx].is_some(),
        "send error result not stored"
    );
}
#[test]
fn handle_send_msg_zc_error_wakes_send_waiter() {
    let mut ev = make_test_loop();
    let conn = accept_connection(&mut ev);
    let idx = conn as usize;
    ev.executor.send_waiters[idx] = true;
    let iovecs = [libc::iovec {
        iov_base: std::ptr::null_mut(),
        iov_len: 100,
    }];
    let guards = [None, None, None, None];
    let (slab_idx, _ptr) = ev
        .driver
        .send_slab
        .allocate(conn, &iovecs, u16::MAX, guards, 0, 100)
        .unwrap();
    let ud = UserData::encode(OpTag::SendMsgZc, conn, slab_idx as u32);
    ev.test_dispatch_cqe(ud.raw(), -104, 0);
    assert!(
        !ev.executor.send_waiters[idx],
        "send waiter not cleared on ZC error"
    );
    assert!(
        ev.executor.io_results[idx].is_some(),
        "ZC send error result not stored"
    );
}
#[test]
fn disk_io_future_drop_clears_waiter() {
    // Dropping a DiskIoFuture must deregister its waiter through the
    // thread-local driver state.
    let mut ev = make_test_loop();
    let seq = 42u32;
    ev.executor.disk_io_waiters.insert(seq, 0u32);
    assert!(ev.executor.disk_io_waiters.contains_key(&seq));
    let mut state = DriverState {
        driver: &mut ev.driver as *mut Driver,
        executor: &mut ev.executor as *mut Executor,
    };
    set_driver_state(&mut state);
    // Create and immediately drop the future while the state is installed.
    drop(crate::runtime::io::DiskIoFuture { seq });
    clear_driver_state();
    assert!(
        !ev.executor.disk_io_waiters.contains_key(&seq),
        "disk_io_waiter not cleared on DiskIoFuture drop"
    );
}
#[test]
fn nop_inject_send_complete() {
    // Same as handle_send_complete_releases_pool_slot, but through the real
    // kernel NOP-injection path.
    let mut ev = make_test_loop();
    let conn = accept_connection(&mut ev);
    let msg = b"hello";
    let (pool_slot, _ptr, _len) = ev.driver.send_copy_pool.copy_in(msg).unwrap();
    let baseline = ev.driver.send_copy_pool.free_count();
    let ud = UserData::encode(OpTag::Send, conn, pool_slot as u32);
    ev.inject_and_dispatch(ud.raw(), msg.len() as i32);
    assert_eq!(
        ev.driver.send_copy_pool.free_count(),
        baseline + 1,
        "pool slot not released via NOP inject path"
    );
}
#[test]
fn nop_inject_send_error_releases_and_wakes() {
    let mut ev = make_test_loop();
    let conn = accept_connection(&mut ev);
    let idx = conn as usize;
    ev.executor.send_waiters[idx] = true;
    ev.driver.send_queues[idx].in_flight = true;
    let msg = b"hello";
    let (pool_slot, _ptr, _len) = ev.driver.send_copy_pool.copy_in(msg).unwrap();
    let ud = UserData::encode(OpTag::Send, conn, pool_slot as u32);
    ev.inject_and_dispatch(ud.raw(), -104);
    assert!(
        !ev.driver.send_copy_pool.in_use(pool_slot),
        "pool slot not released on injected send error"
    );
    assert!(
        !ev.executor.send_waiters[idx],
        "send waiter not cleared on injected error"
    );
    assert!(
        ev.executor.io_results[idx].is_some(),
        "error result not stored"
    );
}
#[test]
fn nop_inject_recv_eof_closes_connection() {
    let mut ev = make_test_loop();
    let conn = accept_connection(&mut ev);
    let ud = UserData::encode(OpTag::RecvMulti, conn, 0);
    ev.inject_and_dispatch(ud.raw(), 0);
    let state = ev.driver.connections.get(conn);
    assert!(
        state.is_none() || matches!(state.unwrap().recv_mode, RecvMode::Closed),
        "connection not closed on injected recv EOF"
    );
}
#[test]
fn nop_inject_zc_send_error_no_slab_leak() {
    let mut ev = make_test_loop();
    let conn = accept_connection(&mut ev);
    let iovecs = [libc::iovec {
        iov_base: std::ptr::null_mut(),
        iov_len: 100,
    }];
    let guards = [None, None, None, None];
    let (slab_idx, _ptr) = ev
        .driver
        .send_slab
        .allocate(conn, &iovecs, u16::MAX, guards, 0, 100)
        .unwrap();
    let ud = UserData::encode(OpTag::SendMsgZc, conn, slab_idx as u32);
    ev.inject_and_dispatch(ud.raw(), -104);
    assert!(
        !ev.driver.send_slab.in_use(slab_idx) || ev.driver.send_slab.should_release(slab_idx),
        "slab entry leaked on injected ZC error"
    );
}
#[test]
fn nop_inject_timer_fires() {
    let mut ev = make_test_loop();
    let (slot, generation) = ev.executor.timer_pool.allocate(0u32).unwrap();
    // -62 == -ETIME delivered through a real injected completion.
    let payload = TimerSlotPool::encode_payload(slot, generation);
    let ud = UserData::encode(OpTag::Timer, 0, payload);
    ev.inject_and_dispatch(ud.raw(), -62);
    assert!(
        ev.executor.timer_pool.is_fired(slot),
        "timer not fired via NOP inject"
    );
}
#[test]
fn nop_inject_send_wakes_waiter() {
    let mut ev = make_test_loop();
    let conn = accept_connection(&mut ev);
    let idx = conn as usize;
    ev.executor.send_waiters[idx] = true;
    ev.driver.send_queues[idx].in_flight = true;
    let msg = b"hello";
    let (pool_slot, _ptr, _len) = ev.driver.send_copy_pool.copy_in(msg).unwrap();
    let ud = UserData::encode(OpTag::Send, conn, pool_slot as u32);
    ev.inject_and_dispatch(ud.raw(), msg.len() as i32);
    assert!(!ev.executor.send_waiters[idx]);
    assert!(ev.executor.io_results[idx].is_some());
}
#[test]
fn nop_inject_zc_notif_releases_slab() {
    let mut ev = make_test_loop();
    let conn = accept_connection(&mut ev);
    let iovecs = [libc::iovec {
        iov_base: std::ptr::null_mut(),
        iov_len: 100,
    }];
    let guards = [None, None, None, None];
    let (slab_idx, _ptr) = ev
        .driver
        .send_slab
        .allocate(conn, &iovecs, u16::MAX, guards, 0, 100)
        .unwrap();
    let baseline = ev.driver.send_slab.free_count();
    ev.driver.send_slab.inc_pending_notifs(slab_idx);
    ev.driver.send_slab.mark_awaiting_notifications(slab_idx);
    // NOP injection cannot carry CQE flags, so the notification flag (8)
    // is delivered through the direct dispatch path.
    let ud = UserData::encode(OpTag::SendMsgZc, conn, slab_idx as u32);
    ev.test_dispatch_cqe(ud.raw(), 0, 8);
    assert_eq!(ev.driver.send_slab.free_count(), baseline + 1);
}
#[test]
fn nop_inject_zc_result_zero() {
    let mut ev = make_test_loop();
    let conn = accept_connection(&mut ev);
    let iovecs = [libc::iovec {
        iov_base: std::ptr::null_mut(),
        iov_len: 100,
    }];
    let guards = [None, None, None, None];
    let (slab_idx, _ptr) = ev
        .driver
        .send_slab
        .allocate(conn, &iovecs, u16::MAX, guards, 0, 100)
        .unwrap();
    let ud = UserData::encode(OpTag::SendMsgZc, conn, slab_idx as u32);
    ev.inject_and_dispatch(ud.raw(), 0);
    assert!(
        !ev.driver.send_slab.in_use(slab_idx) || ev.driver.send_slab.should_release(slab_idx),
    );
}
#[test]
fn nop_inject_recv_enobufs() {
    let mut ev = make_test_loop();
    let conn = accept_connection(&mut ev);
    let ud = UserData::encode(OpTag::RecvMulti, conn, 0);
    ev.inject_and_dispatch(ud.raw(), -105);
    assert!(
        ev.driver.connections.get(conn).is_some(),
        "connection closed on ENOBUFS"
    );
}
#[test]
fn nop_inject_recv_unknown_error_closes() {
    let mut ev = make_test_loop();
    let conn = accept_connection(&mut ev);
    let ud = UserData::encode(OpTag::RecvMulti, conn, 0);
    ev.inject_and_dispatch(ud.raw(), -99);
    let state = ev.driver.connections.get(conn);
    assert!(state.is_none() || matches!(state.unwrap().recv_mode, RecvMode::Closed),);
}
#[test]
fn nop_inject_recv_ecanceled() {
    let mut ev = make_test_loop();
    let conn = accept_connection(&mut ev);
    // ECANCELED is ignored; the connection stays up.
    let ud = UserData::encode(OpTag::RecvMulti, conn, 0);
    ev.inject_and_dispatch(ud.raw(), -125);
    assert!(ev.driver.connections.get(conn).is_some());
}
#[test]
fn nop_inject_connect_success() {
    let mut ev = make_test_loop();
    let conn = ev
        .driver
        .connections
        .allocate_outbound()
        .expect("no free slots");
    let idx = conn as usize;
    ev.executor.connect_waiters[idx] = true;
    let ud = UserData::encode(OpTag::Connect, conn, 0);
    ev.inject_and_dispatch(ud.raw(), 0);
    assert!(!ev.executor.connect_waiters[idx]);
    assert!(ev.executor.io_results[idx].is_some());
    let state = ev.driver.connections.get(conn).unwrap();
    assert!(state.established);
    assert!(matches!(state.recv_mode, RecvMode::Multi));
}
#[test]
fn nop_inject_connect_error() {
    let mut ev = make_test_loop();
    let conn = ev
        .driver
        .connections
        .allocate_outbound()
        .expect("no free slots");
    let idx = conn as usize;
    ev.executor.connect_waiters[idx] = true;
    let ud = UserData::encode(OpTag::Connect, conn, 0);
    ev.inject_and_dispatch(ud.raw(), -111);
    assert!(!ev.executor.connect_waiters[idx]);
    assert!(ev.executor.io_results[idx].is_some());
}
#[test]
fn nop_inject_timer_stale_generation() {
    let mut ev = make_test_loop();
    // Recycle the slot so the captured generation becomes stale.
    let (slot, generation) = ev.executor.timer_pool.allocate(0).unwrap();
    ev.executor.timer_pool.release(slot);
    let (_slot2, _gen2) = ev.executor.timer_pool.allocate(0).unwrap();
    let payload = TimerSlotPool::encode_payload(slot, generation);
    let ud = UserData::encode(OpTag::Timer, 0, payload);
    ev.inject_and_dispatch(ud.raw(), -62);
    assert!(!ev.executor.timer_pool.is_fired(slot));
}
#[test]
fn nop_inject_tls_send_complete() {
    let mut ev = make_test_loop();
    let conn = accept_connection(&mut ev);
    let msg = b"ciphertext";
    let (pool_slot, _ptr, _len) = ev.driver.send_copy_pool.copy_in(msg).unwrap();
    let baseline = ev.driver.send_copy_pool.free_count();
    let ud = UserData::encode(OpTag::TlsSend, conn, pool_slot as u32);
    ev.inject_and_dispatch(ud.raw(), msg.len() as i32);
    assert_eq!(ev.driver.send_copy_pool.free_count(), baseline + 1);
}
#[test]
fn nop_inject_tls_send_error_closes() {
    let mut ev = make_test_loop();
    let conn = accept_connection(&mut ev);
    let msg = b"ciphertext";
    let (pool_slot, _ptr, _len) = ev.driver.send_copy_pool.copy_in(msg).unwrap();
    let ud = UserData::encode(OpTag::TlsSend, conn, pool_slot as u32);
    ev.inject_and_dispatch(ud.raw(), -104);
    assert!(!ev.driver.send_copy_pool.in_use(pool_slot));
    let state = ev.driver.connections.get(conn);
    assert!(state.is_none() || matches!(state.unwrap().recv_mode, RecvMode::Closed));
}
#[test]
fn nop_inject_tick_timeout() {
    let mut ev = make_test_loop();
    ev.driver.tick_timeout_armed = true;
    let ud = UserData::encode(OpTag::TickTimeout, 0, 0);
    ev.inject_and_dispatch(ud.raw(), -62);
    assert!(!ev.driver.tick_timeout_armed);
}
#[test]
fn nop_inject_close_releases_slot() {
    let mut ev = make_test_loop();
    let conn = accept_connection(&mut ev);
    ev.driver.close_connection(conn);
    let ud = UserData::encode(OpTag::Close, conn, 0);
    ev.inject_and_dispatch(ud.raw(), 0);
    assert!(ev.driver.connections.get(conn).is_none());
}
#[test]
fn batch_send_error_then_recv_on_same_conn() {
    let mut ev = make_test_loop();
    let conn = accept_connection(&mut ev);
    let idx = conn as usize;
    ev.executor.send_waiters[idx] = true;
    ev.driver.send_queues[idx].in_flight = true;
    let (pool_slot, _ptr, _len) = ev.driver.send_copy_pool.copy_in(b"data").unwrap();
    // Send error followed by recv EOF, dispatched in one batch.
    let send_ud = UserData::encode(OpTag::Send, conn, pool_slot as u32);
    let recv_ud = UserData::encode(OpTag::RecvMulti, conn, 0);
    ev.inject_batch_and_dispatch(&[(send_ud.raw(), -104), (recv_ud.raw(), 0)]);
    assert!(!ev.driver.send_copy_pool.in_use(pool_slot));
    let state = ev.driver.connections.get(conn);
    assert!(state.is_none() || matches!(state.unwrap().recv_mode, RecvMode::Closed));
}
/// Recv EOF closes the connection; a stale send CQE arriving afterwards in
/// the same batch must still release its copy slot rather than leak it.
#[test]
fn batch_recv_eof_then_stale_send_cqe() {
    let mut el = make_test_loop();
    let ci = accept_connection(&mut el);
    el.driver.send_queues[ci as usize].in_flight = true;
    let (pool_slot, _ptr, _len) = el.driver.send_copy_pool.copy_in(b"data").unwrap();
    let recv_ud = UserData::encode(OpTag::RecvMulti, ci, 0);
    let send_ud = UserData::encode(OpTag::Send, ci, pool_slot as u32);
    el.inject_batch_and_dispatch(&[(recv_ud.raw(), 0), (send_ud.raw(), 4)]);
    let conn = el.driver.connections.get(ci);
    assert!(conn.map_or(true, |c| matches!(c.recv_mode, RecvMode::Closed)));
    assert!(!el.driver.send_copy_pool.in_use(pool_slot));
}
/// Two successful sends on the same connection in one batch free both slots.
#[test]
fn batch_two_sends_on_same_conn() {
    let mut el = make_test_loop();
    let ci = accept_connection(&mut el);
    el.driver.send_queues[ci as usize].in_flight = true;
    let (first, _p, _l) = el.driver.send_copy_pool.copy_in(b"aaa").unwrap();
    let (second, _p, _l) = el.driver.send_copy_pool.copy_in(b"bbb").unwrap();
    let before = el.driver.send_copy_pool.free_count();
    el.inject_batch_and_dispatch(&[
        (UserData::encode(OpTag::Send, ci, first as u32).raw(), 3),
        (UserData::encode(OpTag::Send, ci, second as u32).raw(), 3),
    ]);
    assert_eq!(
        el.driver.send_copy_pool.free_count(),
        before + 2,
        "both pool slots should be released"
    );
}
/// Interleaved CQEs for two connections — one success, one error — must
/// store an io_result for each and release both copy slots.
#[test]
fn batch_multiple_connections_interleaved() {
    let mut el = make_test_loop();
    let c1 = accept_connection(&mut el);
    let c2 = accept_connection(&mut el);
    for &c in &[c1, c2] {
        el.executor.send_waiters[c as usize] = true;
        el.driver.send_queues[c as usize].in_flight = true;
    }
    let (s1, _p, _l) = el.driver.send_copy_pool.copy_in(b"hello").unwrap();
    let (s2, _p, _l) = el.driver.send_copy_pool.copy_in(b"world").unwrap();
    let ud1 = UserData::encode(OpTag::Send, c1, s1 as u32);
    let ud2 = UserData::encode(OpTag::Send, c2, s2 as u32);
    el.inject_batch_and_dispatch(&[(ud1.raw(), 5), (ud2.raw(), -104)]);
    assert!(el.executor.io_results[c1 as usize].is_some());
    assert!(el.executor.io_results[c2 as usize].is_some());
    assert!(!el.driver.send_copy_pool.in_use(s1));
    assert!(!el.driver.send_copy_pool.in_use(s2));
}
/// Draining a queued copy-send retry whose connection has since closed must
/// drop the retry entry and return the pool slot.
#[test]
fn retry_drain_copy_send_releases_on_closed_connection() {
    let mut el = make_test_loop();
    let ci = accept_connection(&mut el);
    let (pool_slot, _ptr, _len) = el.driver.send_copy_pool.copy_in(b"retry-data").unwrap();
    let generation = el.driver.connections.generation(ci);
    el.driver.pending_copy_retries.push((ci, generation, pool_slot));
    // Close the connection before the retry gets a chance to run.
    el.driver.close_connection(ci);
    el.inject_and_dispatch(UserData::encode(OpTag::Close, ci, 0).raw(), 0);
    el.drain_retries();
    assert!(el.driver.pending_copy_retries.is_empty());
    assert!(
        !el.driver.send_copy_pool.in_use(pool_slot),
        "pool slot leaked on retry with closed connection"
    );
}
/// A retry carrying a stale generation must not touch a reused connection
/// slot: the pool slot is released on the generation mismatch and the new
/// connection survives untouched.
#[test]
fn retry_drain_copy_send_with_reused_connection() {
    let mut el = make_test_loop();
    let ci = accept_connection(&mut el);
    let old_generation = el.driver.connections.generation(ci);
    let (pool_slot, _ptr, _len) = el.driver.send_copy_pool.copy_in(b"retry-data").unwrap();
    el.driver.pending_copy_retries.push((ci, old_generation, pool_slot));
    el.driver.close_connection(ci);
    el.inject_and_dispatch(UserData::encode(OpTag::Close, ci, 0).raw(), 0);
    // Re-accept: the driver should hand back the same slot under a new generation.
    let new_ci = accept_connection(&mut el);
    assert_eq!(new_ci, ci, "expected slot reuse for generation test");
    assert_ne!(old_generation, el.driver.connections.generation(ci));
    el.drain_retries();
    assert!(
        !el.driver.send_copy_pool.in_use(pool_slot),
        "pool slot should be released on generation mismatch"
    );
    assert!(el.driver.connections.get(new_ci).is_some());
}
/// Draining a queued zero-copy retry for a closed connection must drop the
/// retry entry.
#[test]
fn retry_drain_zc_send_releases_on_closed_connection() {
    let mut el = make_test_loop();
    let ci = accept_connection(&mut el);
    let generation = el.driver.connections.generation(ci);
    let iovecs = [libc::iovec {
        iov_base: std::ptr::null_mut(),
        iov_len: 100,
    }];
    let (slab_idx, _ptr) = el
        .driver
        .send_slab
        .allocate(ci, &iovecs, u16::MAX, [None, None, None, None], 0, 100)
        .unwrap();
    // Simulate an outstanding ZC notification on the slab entry.
    el.driver.send_slab.inc_pending_notifs(slab_idx);
    el.driver.pending_zc_retries.push((ci, generation, slab_idx));
    el.driver.close_connection(ci);
    el.inject_and_dispatch(UserData::encode(OpTag::Close, ci, 0).raw(), 0);
    el.drain_retries();
    // NOTE(review): despite the test name, only the retry queue is checked;
    // slab release appears gated on the pending notification, so in_use is
    // intentionally not asserted here — confirm with send_slab semantics.
    assert!(el.driver.pending_zc_retries.is_empty());
}
/// When the first SQE of a linked chain fails (ECONNRESET), all three copy
/// slots must be released, the chain deactivated, the waiter cleared, and a
/// chain result stored.
#[test]
fn linked_chain_copy_send_first_fails_releases_all() {
    let mut el = make_test_loop();
    let ci = accept_connection(&mut el);
    el.executor.send_waiters[ci as usize] = true;
    let slots: Vec<_> = [b"aaa", b"bbb", b"ccc"]
        .iter()
        .map(|d| el.driver.send_copy_pool.copy_in(*d).unwrap().0)
        .collect();
    let before = el.driver.send_copy_pool.free_count();
    el.driver.chain_table.start(ci, 3, 9);
    let uds: Vec<_> = slots
        .iter()
        .map(|&s| UserData::encode(OpTag::Send, ci, s as u32))
        .collect();
    el.inject_linked_chain_and_dispatch(&[
        (uds[0].raw(), -104),
        (uds[1].raw(), 0),
        (uds[2].raw(), 0),
    ]);
    assert_eq!(
        el.driver.send_copy_pool.free_count(),
        before + 3,
        "not all pool slots released after chain error"
    );
    assert!(
        !el.driver.chain_table.is_active(ci),
        "chain still active after all CQEs processed"
    );
    assert!(
        !el.executor.send_waiters[ci as usize],
        "send waiter not cleared"
    );
    assert!(
        el.executor.io_results[ci as usize].is_some(),
        "chain result not stored"
    );
}
/// A failure on the middle link (first succeeded, remainder completes with 0)
/// must still free all three slots, end the chain, and clear the waiter.
#[test]
fn linked_chain_middle_fails_rest_canceled() {
    let mut el = make_test_loop();
    let ci = accept_connection(&mut el);
    el.executor.send_waiters[ci as usize] = true;
    let slots: Vec<_> = [b"aaa", b"bbb", b"ccc"]
        .iter()
        .map(|d| el.driver.send_copy_pool.copy_in(*d).unwrap().0)
        .collect();
    let before = el.driver.send_copy_pool.free_count();
    el.driver.chain_table.start(ci, 3, 9);
    let uds: Vec<_> = slots
        .iter()
        .map(|&s| UserData::encode(OpTag::Send, ci, s as u32))
        .collect();
    el.inject_linked_chain_and_dispatch(&[
        (uds[0].raw(), 3),
        (uds[1].raw(), -104),
        (uds[2].raw(), 0),
    ]);
    assert_eq!(
        el.driver.send_copy_pool.free_count(),
        before + 3,
        "not all pool slots released"
    );
    assert!(!el.driver.chain_table.is_active(ci));
    assert!(!el.executor.send_waiters[ci as usize]);
}
/// Happy path: all three linked sends succeed — slots freed, chain done,
/// waiter cleared, result stored.
#[test]
fn linked_chain_all_succeed() {
    let mut el = make_test_loop();
    let ci = accept_connection(&mut el);
    el.executor.send_waiters[ci as usize] = true;
    let slots: Vec<_> = [b"aaa", b"bbb", b"ccc"]
        .iter()
        .map(|d| el.driver.send_copy_pool.copy_in(*d).unwrap().0)
        .collect();
    let before = el.driver.send_copy_pool.free_count();
    el.driver.chain_table.start(ci, 3, 9);
    let cqes: Vec<_> = slots
        .iter()
        .map(|&s| (UserData::encode(OpTag::Send, ci, s as u32).raw(), 3))
        .collect();
    el.inject_linked_chain_and_dispatch(&cqes);
    assert_eq!(el.driver.send_copy_pool.free_count(), before + 3);
    assert!(!el.driver.chain_table.is_active(ci));
    assert!(!el.executor.send_waiters[ci as usize]);
    assert!(el.executor.io_results[ci as usize].is_some());
}
// Integration test against the real io_uring: submit a long (10s) timeout,
// then an async cancel for the same user_data, and drain both completions.
// The timer slot must not be marked fired and must be reusable afterwards.
#[test]
fn cancel_injection_timer_ecanceled() {
let mut el = make_test_loop();
let waker_id = 0u32;
let (slot, generation) = el.executor.timer_pool.allocate(waker_id).unwrap();
// 10-second timeout — far enough out that the cancel always wins the race.
el.executor.timer_pool.timespecs[slot as usize] =
io_uring::types::Timespec::new().sec(10).nsec(0);
let payload = TimerSlotPool::encode_payload(slot, generation);
let timer_ud = UserData::encode(OpTag::Timer, 0, payload);
// Pointer into the pool-owned timespec; must stay valid until the CQE lands.
let ts_ptr =
&el.executor.timer_pool.timespecs[slot as usize] as *const io_uring::types::Timespec;
el.driver
.ring
.submit_timeout(ts_ptr, timer_ud)
.expect("submit_timeout failed");
el.driver
.ring
.submit_async_cancel(timer_ud.raw(), 0)
.expect("submit_async_cancel failed");
// Wait for both CQEs: the canceled timeout and the cancel op itself.
el.driver
.ring
.submit_and_wait(2)
.expect("submit_and_wait failed");
el.drain_completions();
assert!(
!el.executor.timer_pool.is_fired(slot),
"cancelled timer should not be fired"
);
// After release, the same slot index should come back from allocate,
// proving the slot was cleanly recycled.
el.executor.timer_pool.release(slot);
let (slot2, _gen2) = el.executor.timer_pool.allocate(0).unwrap();
assert_eq!(slot2, slot, "released slot should be reusable");
el.executor.timer_pool.release(slot2);
}
// Race variant of the cancel test: a 1ns timeout may fire before the async
// cancel lands. Only one CQE is awaited, then a short sleep lets the second
// arrive; the point is that draining whichever ordering occurred must not
// panic or corrupt the timer pool. No assertion on fired-state is possible.
#[test]
fn cancel_injection_timer_fires_before_cancel() {
let mut el = make_test_loop();
let waker_id = 0u32;
let (slot, generation) = el.executor.timer_pool.allocate(waker_id).unwrap();
// 1-nanosecond timeout — expires essentially immediately.
el.executor.timer_pool.timespecs[slot as usize] =
io_uring::types::Timespec::new().sec(0).nsec(1);
let payload = TimerSlotPool::encode_payload(slot, generation);
let timer_ud = UserData::encode(OpTag::Timer, 0, payload);
let ts_ptr =
&el.executor.timer_pool.timespecs[slot as usize] as *const io_uring::types::Timespec;
el.driver
.ring
.submit_timeout(ts_ptr, timer_ud)
.expect("submit_timeout failed");
el.driver
.ring
.submit_async_cancel(timer_ud.raw(), 0)
.expect("submit_async_cancel failed");
el.driver
.ring
.submit_and_wait(1)
.expect("submit_and_wait failed");
// Give the second CQE (whichever op lost the race) time to arrive.
std::thread::sleep(std::time::Duration::from_millis(10));
el.drain_completions();
el.executor.timer_pool.release(slot);
}
/// A fully-completed SendRecvBuf replenishes the provided buffer, clears the
/// send waiter, and stores the result.
#[test]
fn handle_send_recv_buf_full_send_replenishes_and_wakes() {
    let mut el = make_test_loop();
    let ci = accept_connection(&mut el);
    el.executor.send_waiters[ci as usize] = true;
    let bid: u16 = 3;
    let data_len: u32 = 100;
    el.driver.send_recv_buf_original_lens[ci as usize] = data_len;
    // Payload layout: buffer id in the low 16 bits, remaining length above.
    let ud = UserData::encode(OpTag::SendRecvBuf, ci, u32::from(bid) | (data_len << 16));
    el.test_dispatch_cqe(ud.raw(), 100, 0);
    assert!(
        el.driver.pending_replenish.contains(&bid),
        "buffer not replenished after full send"
    );
    assert!(
        !el.executor.send_waiters[ci as usize],
        "send waiter not cleared"
    );
    assert!(
        el.executor.io_results[ci as usize].is_some(),
        "send result not stored"
    );
}
/// An errored SendRecvBuf completion must hand the buffer back and record
/// the error for the waiter.
#[test]
fn handle_send_recv_buf_error_replenishes_buffer() {
    let mut el = make_test_loop();
    let ci = accept_connection(&mut el);
    el.executor.send_waiters[ci as usize] = true;
    let bid: u16 = 5;
    let data_len: u32 = 200;
    el.driver.send_recv_buf_original_lens[ci as usize] = data_len;
    let ud = UserData::encode(OpTag::SendRecvBuf, ci, u32::from(bid) | (data_len << 16));
    // -104 = ECONNRESET.
    el.test_dispatch_cqe(ud.raw(), -104, 0);
    assert!(
        el.driver.pending_replenish.contains(&bid),
        "buffer not replenished after send error"
    );
    assert!(
        el.executor.io_results[ci as usize].is_some(),
        "error result not stored"
    );
}
/// A partial SendRecvBuf completion must keep the buffer (no premature
/// replenish); the follow-up CQE covering the remainder finally replenishes.
#[test]
fn handle_send_recv_buf_partial_send_computes_correct_offset() {
    let mut el = make_test_loop();
    let ci = accept_connection(&mut el);
    let bid: u16 = 0;
    let data_len: u32 = 100;
    let (buf_ptr, buf_size) = el.driver.provided_bufs.get_buffer(bid);
    assert!(
        buf_size > data_len,
        "test requires buffer_size > data_len to exercise the bug"
    );
    // Fill the provided buffer with a recognizable 0..100 byte pattern.
    let pattern: Vec<u8> = (0..data_len as u8).collect();
    unsafe {
        std::ptr::copy_nonoverlapping(pattern.as_ptr(), buf_ptr as *mut u8, data_len as usize);
    }
    el.driver.send_recv_buf_original_lens[ci as usize] = data_len;
    let ud = UserData::encode(OpTag::SendRecvBuf, ci, u32::from(bid) | (data_len << 16));
    // Only 60 of 100 bytes sent: the buffer must be retained for the retry.
    el.test_dispatch_cqe(ud.raw(), 60, 0);
    assert!(
        !el.driver.pending_replenish.contains(&bid),
        "buffer replenished prematurely on partial send"
    );
    // Retry CQE completes the remaining 40 bytes.
    let retry_ud = UserData::encode(OpTag::SendRecvBuf, ci, u32::from(bid) | (40u32 << 16));
    el.test_dispatch_cqe(retry_ud.raw(), 40, 0);
    assert!(
        el.driver.pending_replenish.contains(&bid),
        "buffer not replenished after retry completed"
    );
}
/// Three-step partial send (30 + 20 + 50 bytes): the buffer is only
/// replenished once the final chunk completes.
#[test]
fn handle_send_recv_buf_double_partial_send_offset() {
    let mut el = make_test_loop();
    let ci = accept_connection(&mut el);
    let bid: u16 = 2;
    let data_len: u32 = 100;
    let (buf_ptr, _) = el.driver.provided_bufs.get_buffer(bid);
    let pattern: Vec<u8> = (0u8..100).collect();
    unsafe {
        std::ptr::copy_nonoverlapping(pattern.as_ptr(), buf_ptr as *mut u8, 100);
    }
    el.driver.send_recv_buf_original_lens[ci as usize] = data_len;
    // Build a SendRecvBuf payload: bid in the low 16 bits, remaining above.
    let mk = |remaining: u32| {
        UserData::encode(OpTag::SendRecvBuf, ci, u32::from(bid) | (remaining << 16))
    };
    el.test_dispatch_cqe(mk(data_len).raw(), 30, 0);
    assert!(!el.driver.pending_replenish.contains(&bid));
    el.test_dispatch_cqe(mk(70).raw(), 20, 0);
    assert!(!el.driver.pending_replenish.contains(&bid));
    el.test_dispatch_cqe(mk(50).raw(), 50, 0);
    assert!(
        el.driver.pending_replenish.contains(&bid),
        "buffer not replenished after final partial send"
    );
}
/// Two consecutive half sends (25 + 25 of 50): after the retry completes the
/// buffer is replenished and the recorded original length is untouched.
#[test]
fn handle_send_recv_buf_partial_send_pointer_correctness() {
    let mut el = make_test_loop();
    let ci = accept_connection(&mut el);
    let bid: u16 = 1;
    let data_len: u32 = 50;
    let (_, _buf_size) = el.driver.provided_bufs.get_buffer(bid);
    el.driver.send_recv_buf_original_lens[ci as usize] = data_len;
    let ud = UserData::encode(OpTag::SendRecvBuf, ci, u32::from(bid) | (data_len << 16));
    el.test_dispatch_cqe(ud.raw(), 25, 0);
    let retry_ud = UserData::encode(OpTag::SendRecvBuf, ci, u32::from(bid) | (25u32 << 16));
    el.test_dispatch_cqe(retry_ud.raw(), 25, 0);
    assert!(
        el.driver.pending_replenish.contains(&bid),
        "buffer not replenished after partial send retry"
    );
    assert_eq!(
        el.driver.send_recv_buf_original_lens[ci as usize], data_len,
        "original_len should be preserved across partial send retries"
    );
}
/// A zero-length SendRecvBuf result still hands the buffer back.
#[test]
fn handle_send_recv_buf_zero_result_replenishes() {
    let mut el = make_test_loop();
    let ci = accept_connection(&mut el);
    let bid: u16 = 7;
    let data_len: u32 = 50;
    el.driver.send_recv_buf_original_lens[ci as usize] = data_len;
    let ud = UserData::encode(OpTag::SendRecvBuf, ci, u32::from(bid) | (data_len << 16));
    el.test_dispatch_cqe(ud.raw(), 0, 0);
    assert!(
        el.driver.pending_replenish.contains(&bid),
        "buffer not replenished on zero-length send"
    );
}
// Property-based tests: arbitrary sequences of CQE outcomes must never leak
// copy-pool slots or send-slab entries, and must never panic the dispatcher,
// regardless of ordering.
mod proptest_cqe {
use super::*;
use proptest::prelude::*;
// Outcome injected for a plain copy-pool send CQE.
#[derive(Debug, Clone)]
enum SendAction {
Ok,
Error,
Zero,
}
// Outcome injected for a zero-copy send CQE; OkThenNotif also delivers the
// trailing ZC notification CQE (flags = 8).
#[derive(Debug, Clone)]
enum ZcAction {
OkThenNotif,
Error,
Zero,
}
// Outcome injected for a multishot-recv CQE.
#[derive(Debug, Clone)]
enum RecvAction {
Eof,
Error,
Enobufs,
Ecanceled,
}
fn send_action_strategy() -> impl Strategy<Value = SendAction> {
prop_oneof![
Just(SendAction::Ok),
Just(SendAction::Error),
Just(SendAction::Zero),
]
}
fn zc_action_strategy() -> impl Strategy<Value = ZcAction> {
prop_oneof![
Just(ZcAction::OkThenNotif),
Just(ZcAction::Error),
Just(ZcAction::Zero),
]
}
fn recv_action_strategy() -> impl Strategy<Value = RecvAction> {
prop_oneof![
Just(RecvAction::Eof),
Just(RecvAction::Error),
Just(RecvAction::Enobufs),
Just(RecvAction::Ecanceled),
]
}
proptest! {
#![proptest_config(ProptestConfig::with_cases(200))]
// Any sequence of send outcomes must leave the copy pool back at its
// initial free count — every dispatched CQE returns its slot.
#[test]
fn send_sequence_no_pool_leak(actions in proptest::collection::vec(send_action_strategy(), 1..8)) {
let mut el = make_test_loop();
let conn_index = accept_connection(&mut el);
el.driver.send_queues[conn_index as usize].in_flight = true;
let initial_free = el.driver.send_copy_pool.free_count();
for action in &actions {
let data = b"test";
// Stop early if the pool is exhausted (e.g. after a close path).
let (slot, _, _) = match el.driver.send_copy_pool.copy_in(data) {
Some(s) => s,
None => break, };
let ud = UserData::encode(OpTag::Send, conn_index, slot as u32);
// -104 = ECONNRESET.
let result = match action {
SendAction::Ok => data.len() as i32,
SendAction::Error => -104, SendAction::Zero => 0,
};
el.test_dispatch_cqe(ud.raw(), result, 0);
}
prop_assert_eq!(
el.driver.send_copy_pool.free_count(),
initial_free,
"pool slot leak detected"
);
}
// Any sequence of ZC outcomes must leave the send slab at its initial
// free count; error/zero paths manually release entries the dispatcher
// left behind, mirroring what the retry machinery would do.
#[test]
fn zc_sequence_no_slab_leak(actions in proptest::collection::vec(zc_action_strategy(), 1..6)) {
let mut el = make_test_loop();
let conn_index = accept_connection(&mut el);
let initial_slab_free = el.driver.send_slab.free_count();
let _initial_pool_free = el.driver.send_copy_pool.free_count();
for action in &actions {
let iovecs = [libc::iovec { iov_base: std::ptr::null_mut(), iov_len: 100 }];
let guards = [None, None, None, None];
let (slab_idx, _) = match el.driver.send_slab.allocate(conn_index, &iovecs, u16::MAX, guards, 0, 100) {
Some(s) => s,
None => break,
};
let ud = UserData::encode(OpTag::SendMsgZc, conn_index, slab_idx as u32);
match action {
ZcAction::OkThenNotif => {
// Data CQE followed by the ZC notification CQE (flags = 8).
el.test_dispatch_cqe(ud.raw(), 100, 0);
el.test_dispatch_cqe(ud.raw(), 0, 8); }
ZcAction::Error => {
el.test_dispatch_cqe(ud.raw(), -104, 0);
if el.driver.send_slab.in_use(slab_idx) && el.driver.send_slab.should_release(slab_idx) {
el.driver.send_slab.release(slab_idx);
}
}
ZcAction::Zero => {
el.test_dispatch_cqe(ud.raw(), 0, 0);
if el.driver.send_slab.in_use(slab_idx) && el.driver.send_slab.should_release(slab_idx) {
el.driver.send_slab.release(slab_idx);
}
}
}
}
prop_assert_eq!(
el.driver.send_slab.free_count(),
initial_slab_free,
"slab entry leak detected"
);
}
// Recv CQE sequences (EOF / error / ENOBUFS / ECANCELED) must never
// panic; the loop stops once the connection is gone or closed.
#[test]
fn recv_sequence_no_panic(actions in proptest::collection::vec(recv_action_strategy(), 1..10)) {
let mut el = make_test_loop();
let conn_index = accept_connection(&mut el);
for action in &actions {
if el.driver.connections.get(conn_index).is_none()
|| matches!(
el.driver.connections.get(conn_index).unwrap().recv_mode,
RecvMode::Closed
)
{
break;
}
let ud = UserData::encode(OpTag::RecvMulti, conn_index, 0);
let result = match action {
RecvAction::Eof => 0,
RecvAction::Error => -99,
RecvAction::Enobufs => -105,
RecvAction::Ecanceled => -125,
};
el.test_dispatch_cqe(ud.raw(), result, 0);
}
}
// Random interleaving of accepts, sends (ok/error), ZC sends (ok/error),
// recv EOF/ENOBUFS, and closes across up to 8 live connections. After
// closing everything, both pools must be back at their initial counts.
#[test]
fn mixed_operations_no_leak_no_panic(
actions in proptest::collection::vec(0..10u8, 5..30)
) {
let mut el = make_test_loop();
let initial_pool_free = el.driver.send_copy_pool.free_count();
let initial_slab_free = el.driver.send_slab.free_count();
let mut live_conns: Vec<u32> = Vec::new();
let mut pool_slots_in_flight: Vec<u16> = Vec::new();
for action in actions {
match action {
// 0: accept a new connection (bounded at 8 live).
0 => {
if live_conns.len() < 8 {
let ci = accept_connection(&mut el);
el.driver.send_queues[ci as usize].in_flight = false;
live_conns.push(ci);
}
}
// 1: successful copy-pool send.
1 if !live_conns.is_empty() => {
let ci = live_conns[0];
if let Some((slot, _, _)) = el.driver.send_copy_pool.copy_in(b"data") {
let ud = UserData::encode(OpTag::Send, ci, slot as u32);
el.test_dispatch_cqe(ud.raw(), 4, 0);
}
}
// 2: copy-pool send failing with ECONNRESET.
2 if !live_conns.is_empty() => {
let ci = live_conns[0];
if let Some((slot, _, _)) = el.driver.send_copy_pool.copy_in(b"data") {
let ud = UserData::encode(OpTag::Send, ci, slot as u32);
el.test_dispatch_cqe(ud.raw(), -104, 0);
}
}
// 3: successful ZC send plus its notification CQE.
3 if !live_conns.is_empty() => {
let ci = live_conns[0];
let iovecs = [libc::iovec {
iov_base: std::ptr::null_mut(),
iov_len: 50,
}];
let guards = [None, None, None, None];
if let Some((slab_idx, _)) = el.driver.send_slab.allocate(
ci, &iovecs, u16::MAX, guards, 0, 50,
) {
let ud = UserData::encode(
OpTag::SendMsgZc, ci, slab_idx as u32,
);
el.test_dispatch_cqe(ud.raw(), 50, 0);
el.test_dispatch_cqe(ud.raw(), 0, 8);
}
}
// 4: ZC send failing with ECONNRESET; release the entry manually
// if the dispatcher left it releasable.
4 if !live_conns.is_empty() => {
let ci = live_conns[0];
let iovecs = [libc::iovec {
iov_base: std::ptr::null_mut(),
iov_len: 50,
}];
let guards = [None, None, None, None];
if let Some((slab_idx, _)) = el.driver.send_slab.allocate(
ci, &iovecs, u16::MAX, guards, 0, 50,
) {
let ud = UserData::encode(
OpTag::SendMsgZc, ci, slab_idx as u32,
);
el.test_dispatch_cqe(ud.raw(), -104, 0);
if el.driver.send_slab.in_use(slab_idx)
&& el.driver.send_slab.should_release(slab_idx)
{
el.driver.send_slab.release(slab_idx);
}
}
}
// 5: recv EOF then the resulting Close CQE; connection removed.
5 if !live_conns.is_empty() => {
let ci = live_conns.remove(0);
let ud = UserData::encode(OpTag::RecvMulti, ci, 0);
el.test_dispatch_cqe(ud.raw(), 0, 0);
let close_ud = UserData::encode(OpTag::Close, ci, 0);
el.test_dispatch_cqe(close_ud.raw(), 0, 0);
}
// 6: recv hitting ENOBUFS; connection stays live.
6 if !live_conns.is_empty() => {
let ci = live_conns[0];
let ud = UserData::encode(OpTag::RecvMulti, ci, 0);
el.test_dispatch_cqe(ud.raw(), -105, 0); }
// 7: EOF racing a still-in-flight send, then Close; if the pool
// is exhausted the connection is put back untouched.
7 if !live_conns.is_empty() => {
let ci = live_conns.remove(0);
if let Some((slot, _, _)) = el.driver.send_copy_pool.copy_in(b"data") {
pool_slots_in_flight.push(slot);
let send_ud = UserData::encode(OpTag::Send, ci, slot as u32);
let recv_ud = UserData::encode(OpTag::RecvMulti, ci, 0);
el.test_dispatch_cqe(recv_ud.raw(), 0, 0);
el.test_dispatch_cqe(send_ud.raw(), 4, 0);
let close_ud = UserData::encode(OpTag::Close, ci, 0);
el.test_dispatch_cqe(close_ud.raw(), 0, 0);
} else {
live_conns.insert(0, ci); }
}
// 8: direct close_connection followed by its Close CQE.
8 if !live_conns.is_empty() => {
let ci = live_conns.remove(0);
el.driver.close_connection(ci);
let close_ud = UserData::encode(OpTag::Close, ci, 0);
el.test_dispatch_cqe(close_ud.raw(), 0, 0);
}
_ => {}
}
}
// Tear down whatever is still open so the leak accounting is exact.
for ci in &live_conns {
el.driver.close_connection(*ci);
let close_ud = UserData::encode(OpTag::Close, *ci, 0);
el.test_dispatch_cqe(close_ud.raw(), 0, 0);
}
prop_assert_eq!(
el.driver.send_copy_pool.free_count(),
initial_pool_free,
"pool slot leak after mixed operations"
);
prop_assert_eq!(
el.driver.send_slab.free_count(),
initial_slab_free,
"slab entry leak after mixed operations"
);
}
}
}
}