pub(crate) mod cancellation;
pub mod channel;
pub(crate) mod handler;
pub(crate) mod io;
pub(crate) mod join;
pub(crate) mod select;
pub(crate) mod stream;
pub(crate) mod task;
pub(crate) mod waker;
use std::cell::Cell;
use std::collections::{HashMap, VecDeque};
use std::io as stdio;
use self::task::{StandaloneTaskSlab, TaskSlab};
use self::waker::drain_ready_queue;
/// Outcome of a completed socket operation.
///
/// Values are stored per connection in `Executor::io_results` by
/// `Executor::wake_send` and `Executor::wake_connect`; presumably the woken
/// task takes the value out on its next poll (the consumer is not in this file).
pub(crate) enum IoResult {
/// Result handed to `Executor::wake_send`.
/// NOTE(review): the `u32` is presumably the number of bytes sent — confirm
/// against the I/O driver.
Send(stdio::Result<u32>),
/// Result handed to `Executor::wake_connect`.
Connect(stdio::Result<()>),
}
/// Raw-pointer description of a receive destination, stored per connection in
/// `Executor::recv_sinks` and dropped by `Executor::remove_connection`.
///
/// NOTE(review): who owns the memory behind `ptr`, and for how long it stays
/// valid, is not visible in this file — confirm before dereferencing.
pub(crate) struct RecvSink {
/// Raw pointer into the destination buffer (presumably its start — TODO confirm).
pub(crate) ptr: *mut u8,
/// Presumably the total capacity of the buffer in bytes — TODO confirm.
pub(crate) cap: usize,
/// Presumably the number of bytes already written (next write offset) — TODO confirm.
pub(crate) pos: usize,
}
// Per-thread `Cell<u32>` holding a task id, const-initialised to 0.
// NOTE(review): presumably set by the poll loop to the id of the task being
// polled on this thread; the writer is not in this file — confirm there.
thread_local! {
pub(crate) static CURRENT_TASK_ID: Cell<u32> = const { Cell::new(0) };
}
/// Fixed-capacity pool of timer slots, stored as parallel vectors that are all
/// indexed by slot number and sized once in `TimerSlotPool::new`.
///
/// A slot is identified by `(slot, generation)`; `release` bumps the
/// generation so that `fire` can reject completions for a previous occupant.
pub(crate) struct TimerSlotPool {
/// Per-slot timespec storage (io_uring builds). `set_relative` and
/// `set_absolute` return raw pointers into this vector.
#[cfg(has_io_uring)]
pub(crate) timespecs: Vec<io_uring::types::Timespec>,
/// Per-slot deadline (non-io_uring builds); `None` until `set_relative` or
/// `set_absolute` has been called for the slot.
#[cfg(not(has_io_uring))]
pub(crate) deadlines: Vec<Option<std::time::Instant>>,
/// Waker id recorded by `allocate` and returned by `fire`.
pub(crate) waker_ids: Vec<u32>,
/// Set by `fire`, cleared by `allocate`, read by `is_fired`.
pub(crate) fired: Vec<bool>,
/// Current generation of each slot; incremented (wrapping) by `release`.
pub(crate) generations: Vec<u16>,
/// Stack of free slot numbers: `allocate` pops, `release` pushes.
free_list: Vec<u32>,
}
impl TimerSlotPool {
    /// Creates a pool with `capacity` slots, all of them free.
    ///
    /// The free list is filled in ascending order and used as a stack, so the
    /// highest-numbered slot is handed out first.
    pub(crate) fn new(capacity: u32) -> Self {
        let cap = capacity as usize;
        TimerSlotPool {
            #[cfg(has_io_uring)]
            timespecs: vec![io_uring::types::Timespec::new(); cap],
            #[cfg(not(has_io_uring))]
            deadlines: vec![None; cap],
            waker_ids: vec![0; cap],
            fired: vec![false; cap],
            generations: vec![0; cap],
            free_list: (0..capacity).collect(),
        }
    }

    /// Takes a free slot, records `waker_id` for it and clears its fired flag.
    ///
    /// Returns `(slot, generation)`, or `None` when every slot is in use.
    pub(crate) fn allocate(&mut self, waker_id: u32) -> Option<(u32, u16)> {
        let slot = self.free_list.pop()?;
        let idx = slot as usize;
        self.waker_ids[idx] = waker_id;
        self.fired[idx] = false;
        Some((slot, self.generations[idx]))
    }

    /// Returns `slot` to the free list and bumps its generation (wrapping at
    /// `u16::MAX`) so that stale completions are rejected by [`Self::fire`].
    ///
    /// Out-of-range slots are ignored. The pool does not track whether a slot
    /// is currently allocated, so releasing the same slot twice queues it
    /// twice; callers must release each allocation exactly once.
    pub(crate) fn release(&mut self, slot: u32) {
        if let Some(generation) = self.generations.get_mut(slot as usize) {
            *generation = generation.wrapping_add(1);
            self.free_list.push(slot);
        }
    }

    /// Marks `slot` as fired and returns the waker id stored for it.
    ///
    /// Returns `None` (and changes nothing) when `slot` is out of range or
    /// `generation` does not match the slot's current generation.
    pub(crate) fn fire(&mut self, slot: u32, generation: u16) -> Option<u32> {
        let idx = slot as usize;
        if self.generations.get(idx) != Some(&generation) {
            return None;
        }
        self.fired[idx] = true;
        Some(self.waker_ids[idx])
    }

    /// Reports whether `slot` has fired; out-of-range slots read as `false`.
    pub(crate) fn is_fired(&self, slot: u32) -> bool {
        self.fired.get(slot as usize).copied().unwrap_or(false)
    }

    /// Packs `slot` into the low 16 bits and `generation` into the high 16 bits.
    ///
    /// Only the low 16 bits of `slot` are kept, so slot numbers above `0xFFFF`
    /// do not survive a round trip through [`Self::decode_payload`].
    #[cfg_attr(not(has_io_uring), allow(dead_code))]
    pub(crate) fn encode_payload(slot: u32, generation: u16) -> u32 {
        (slot & 0xFFFF) | ((generation as u32) << 16)
    }

    /// Inverse of [`Self::encode_payload`]: returns `(slot, generation)`.
    #[cfg_attr(not(has_io_uring), allow(dead_code))]
    pub(crate) fn decode_payload(payload: u32) -> (u32, u16) {
        let slot = payload & 0xFFFF;
        let generation = (payload >> 16) as u16;
        (slot, generation)
    }

    /// Stores `duration` as the slot's timespec and returns a pointer to it.
    ///
    /// The pointer refers to an element of `self.timespecs`; it stays valid
    /// only while that vector is not reallocated and the slot is not
    /// overwritten. Panics if `slot` is out of range.
    #[cfg(has_io_uring)]
    pub(crate) fn set_relative(
        &mut self,
        slot: u32,
        duration: std::time::Duration,
    ) -> *const io_uring::types::Timespec {
        let idx = slot as usize;
        self.timespecs[idx] = io_uring::types::Timespec::new()
            .sec(duration.as_secs())
            .nsec(duration.subsec_nanos());
        &self.timespecs[idx] as *const _
    }

    /// Stores the absolute time `secs` + `nsecs` as the slot's timespec and
    /// returns a pointer to it (same validity rules as `set_relative`).
    /// Panics if `slot` is out of range.
    #[cfg(has_io_uring)]
    pub(crate) fn set_absolute(
        &mut self,
        slot: u32,
        secs: u64,
        nsecs: u32,
    ) -> *const io_uring::types::Timespec {
        let idx = slot as usize;
        self.timespecs[idx] = io_uring::types::Timespec::new().sec(secs).nsec(nsecs);
        &self.timespecs[idx] as *const _
    }

    /// Sets the slot's deadline to `duration` from now.
    ///
    /// If `now + duration` is not representable as an `Instant` (for example
    /// `Duration::MAX` used as "sleep forever"), the deadline is clamped to
    /// roughly 30 years from now instead of panicking. Panics if `slot` is out
    /// of range.
    #[cfg(not(has_io_uring))]
    pub(crate) fn set_relative(&mut self, slot: u32, duration: std::time::Duration) {
        // Far enough away to mean "never", small enough to be representable.
        const FAR_FUTURE: std::time::Duration =
            std::time::Duration::from_secs(30 * 365 * 24 * 60 * 60);
        let idx = slot as usize;
        let now = std::time::Instant::now();
        let deadline = now
            .checked_add(duration)
            .unwrap_or_else(|| now + FAR_FUTURE);
        self.deadlines[idx] = Some(deadline);
    }

    /// Sets the slot's deadline from an absolute `CLOCK_MONOTONIC` time given
    /// as `secs` + `nsecs`.
    ///
    /// The remaining time is computed against the current monotonic clock and
    /// converted into an `Instant`; a deadline already in the past becomes
    /// "now". Panics if `slot` is out of range.
    #[cfg(not(has_io_uring))]
    pub(crate) fn set_absolute(&mut self, slot: u32, secs: u64, nsecs: u32) {
        let mut ts = libc::timespec {
            tv_sec: 0,
            tv_nsec: 0,
        };
        // SAFETY: `ts` is a valid, exclusively borrowed `timespec` that lives
        // for the whole call, which is all `clock_gettime` requires.
        // The return value is ignored: on failure `ts` stays zeroed and the
        // full `secs`/`nsecs` value is treated as the remaining time.
        unsafe {
            libc::clock_gettime(libc::CLOCK_MONOTONIC, &mut ts);
        }
        let now_ns = ts.tv_sec as u128 * 1_000_000_000 + ts.tv_nsec as u128;
        let deadline_ns = secs as u128 * 1_000_000_000 + nsecs as u128;
        // Saturate rather than wrap: a plain `as u64` cast would turn a
        // far-future deadline into an arbitrarily early one.
        let remaining_ns = deadline_ns
            .saturating_sub(now_ns)
            .min(u128::from(u64::MAX)) as u64;
        self.set_relative(slot, std::time::Duration::from_nanos(remaining_ns));
    }
}
/// Central bookkeeping for the runtime: task storage, the ready queue, and the
/// per-connection / per-request tables that completion handlers use to find
/// and wake the task waiting on an operation.
///
/// The per-connection vectors are all created with `max_connections` entries
/// in `Executor::new` and indexed by connection index; the UDP vectors are
/// sized by `udp_count`.
pub(crate) struct Executor {
/// Tasks addressed by ids without `waker::STANDALONE_BIT` (see `wake_task`).
pub(crate) task_slab: TaskSlab,
/// Tasks addressed by ids with `waker::STANDALONE_BIT` set (see `wake_task`).
pub(crate) standalone_slab: StandaloneTaskSlab,
/// Timer slot storage.
pub(crate) timer_pool: TimerSlotPool,
/// Task ids queued by `wake_task`.
pub(crate) ready_queue: VecDeque<u32>,
/// Per-connection flag: `wake_recv` only wakes when set, and clears it.
pub(crate) recv_waiters: Vec<bool>,
/// Per-connection flag: `wake_send` only wakes when set, and clears it.
pub(crate) send_waiters: Vec<bool>,
/// Per-connection flag: `wake_connect` only wakes when set, and clears it.
pub(crate) connect_waiters: Vec<bool>,
/// Per-connection result slot written by `wake_send` / `wake_connect`.
pub(crate) io_results: Vec<Option<IoResult>>,
/// Per-connection override of the task to wake; `None` means the task id
/// equals the connection index.
pub(crate) owner_task: Vec<Option<u32>>,
/// Per-connection receive sink; cleared by `remove_connection`.
pub(crate) recv_sinks: Vec<Option<RecvSink>>,
/// Per-UDP-socket queue of `(bytes, address)` pairs — presumably received
/// datagrams with their peer address (TODO confirm against the producer).
pub(crate) udp_recv_queues: Vec<VecDeque<(Vec<u8>, std::net::SocketAddr)>>,
/// Per-UDP-socket waiting task id; taken by `wake_udp_recv`.
pub(crate) udp_recv_waiters: Vec<Option<u32>>,
/// Sequence number -> task id waiting for that disk operation.
pub(crate) disk_io_waiters: HashMap<u32, u32>,
/// Sequence number -> raw result stored by `wake_disk_io`.
pub(crate) disk_io_results: HashMap<u32, i32>,
/// Metadata results keyed by `u32` (presumably the disk-I/O sequence number
/// — TODO confirm; nothing in this file reads or writes it).
pub(crate) fs_stat_results: HashMap<u32, crate::fs::Metadata>,
/// Request id -> (waiting task id, result slot filled by `deliver_resolve`).
pub(crate) pending_resolves: HashMap<u64, (u32, Option<stdio::Result<std::net::SocketAddr>>)>,
/// Starts at 0; presumably the next resolve request id to hand out.
pub(crate) next_resolve_id: u64,
/// Request id -> (waiting task id, result slot filled by `deliver_spawn`).
pub(crate) pending_spawns:
HashMap<u64, (u32, Option<stdio::Result<crate::spawner::SpawnResult>>)>,
/// Starts at 0; presumably the next spawn request id to hand out.
pub(crate) next_spawn_id: u64,
/// Sequence number -> task id waiting on that pidfd.
pub(crate) pidfd_waiters: HashMap<u32, u32>,
/// Sequence number -> raw result stored by `wake_pidfd`.
pub(crate) pidfd_results: HashMap<u32, i32>,
/// Starts at 0; presumably the next pidfd sequence number. Unused (hence the
/// `dead_code` allowance) when io_uring support is compiled out.
#[cfg_attr(not(has_io_uring), allow(dead_code))]
pub(crate) next_pidfd_seq: u32,
/// Request id -> (waiting task id, result slot filled by `deliver_blocking`).
pub(crate) pending_blocking: HashMap<u64, (u32, Option<Box<dyn std::any::Any + Send>>)>,
/// Starts at 0; presumably the next blocking request id to hand out.
pub(crate) next_blocking_id: u64,
}
impl Executor {
    /// Builds an executor with room for `max_connections` connection tasks,
    /// `standalone_capacity` standalone tasks, `timer_slots` timers and
    /// `udp_count` UDP sockets. Every table starts empty / cleared.
    pub(crate) fn new(
        max_connections: u32,
        standalone_capacity: u32,
        timer_slots: u32,
        udp_count: u32,
    ) -> Self {
        let conn_slots = max_connections as usize;
        let udp_slots = udp_count as usize;
        Executor {
            task_slab: TaskSlab::new(max_connections),
            standalone_slab: StandaloneTaskSlab::new(standalone_capacity),
            timer_pool: TimerSlotPool::new(timer_slots),
            ready_queue: VecDeque::with_capacity(64),
            recv_waiters: vec![false; conn_slots],
            send_waiters: vec![false; conn_slots],
            connect_waiters: vec![false; conn_slots],
            // `IoResult` and `RecvSink` are not `Clone`, so `vec![None; n]`
            // is unavailable for these two tables.
            io_results: (0..conn_slots).map(|_| None).collect(),
            owner_task: vec![None; conn_slots],
            recv_sinks: (0..conn_slots).map(|_| None).collect(),
            udp_recv_queues: std::iter::repeat_with(VecDeque::new)
                .take(udp_slots)
                .collect(),
            udp_recv_waiters: vec![None; udp_slots],
            disk_io_waiters: HashMap::new(),
            disk_io_results: HashMap::new(),
            fs_stat_results: HashMap::new(),
            pending_resolves: HashMap::new(),
            next_resolve_id: 0,
            pending_spawns: HashMap::new(),
            next_spawn_id: 0,
            pidfd_waiters: HashMap::new(),
            pidfd_results: HashMap::new(),
            next_pidfd_seq: 0,
            pending_blocking: HashMap::new(),
            next_blocking_id: 0,
        }
    }

    /// Hands the ready queue to `waker::drain_ready_queue`.
    pub(crate) fn collect_wakeups(&mut self) {
        drain_ready_queue(&mut self.ready_queue);
    }

    /// Forgets everything recorded for connection `conn_index`: its receive
    /// sink, its task, its waiter flags, its pending I/O result and its owner
    /// override. Out-of-range indices skip the table resets.
    pub(crate) fn remove_connection(&mut self, conn_index: u32) {
        let idx = conn_index as usize;
        // The sink is cleared before the task is removed; keep this order.
        if let Some(sink) = self.recv_sinks.get_mut(idx) {
            *sink = None;
        }
        self.task_slab.remove(conn_index);
        if idx >= self.recv_waiters.len() {
            return;
        }
        self.recv_waiters[idx] = false;
        self.send_waiters[idx] = false;
        self.connect_waiters[idx] = false;
        self.io_results[idx] = None;
        self.owner_task[idx] = None;
    }

    /// Wakes the task identified by `task_id` and queues it as ready.
    ///
    /// Ids carrying `waker::STANDALONE_BIT` address the standalone slab (with
    /// the bit stripped); all others address the connection slab. Returns
    /// `true` only when the slab reported a wake-up, in which case the
    /// original `task_id` is pushed onto the ready queue.
    pub(crate) fn wake_task(&mut self, task_id: u32) -> bool {
        let is_standalone = task_id & waker::STANDALONE_BIT != 0;
        let woke = if is_standalone {
            self.standalone_slab.wake(task_id & !waker::STANDALONE_BIT)
        } else {
            self.task_slab.wake(task_id)
        };
        if woke {
            self.ready_queue.push_back(task_id);
        }
        woke
    }

    /// Wakes the task waiting to receive on `conn_index`, if one is flagged.
    /// The flag is cleared; the woken task is the connection's owner task, or
    /// the task with id `conn_index` when no owner is set.
    pub(crate) fn wake_recv(&mut self, conn_index: u32) {
        let idx = conn_index as usize;
        let waiting = self.recv_waiters.get(idx).copied().unwrap_or(false);
        if !waiting {
            return;
        }
        self.recv_waiters[idx] = false;
        let task_id = self.owner_task[idx].unwrap_or(conn_index);
        self.wake_task(task_id);
    }

    /// Delivers a send `result` for `conn_index` and wakes the waiting task.
    /// When no send waiter is flagged the result is discarded.
    pub(crate) fn wake_send(&mut self, conn_index: u32, result: stdio::Result<u32>) {
        let idx = conn_index as usize;
        let waiting = self.send_waiters.get(idx).copied().unwrap_or(false);
        if !waiting {
            return;
        }
        self.send_waiters[idx] = false;
        self.io_results[idx] = Some(IoResult::Send(result));
        let task_id = self.owner_task[idx].unwrap_or(conn_index);
        self.wake_task(task_id);
    }

    /// Wakes (and unregisters) the task waiting on UDP socket `udp_index`,
    /// if any. Out-of-range indices are ignored.
    pub(crate) fn wake_udp_recv(&mut self, udp_index: u32) {
        let waiter = self
            .udp_recv_waiters
            .get_mut(udp_index as usize)
            .and_then(|slot| slot.take());
        if let Some(task_id) = waiter {
            self.wake_task(task_id);
        }
    }

    /// Delivers a connect `result` for `conn_index` and wakes the waiting
    /// task. When no connect waiter is flagged the result is discarded.
    pub(crate) fn wake_connect(&mut self, conn_index: u32, result: stdio::Result<()>) {
        let idx = conn_index as usize;
        let waiting = self.connect_waiters.get(idx).copied().unwrap_or(false);
        if !waiting {
            return;
        }
        self.connect_waiters[idx] = false;
        self.io_results[idx] = Some(IoResult::Connect(result));
        let task_id = self.owner_task[idx].unwrap_or(conn_index);
        self.wake_task(task_id);
    }

    /// Records the disk I/O `result` for `seq` (always) and wakes the task
    /// registered for that sequence number, if there is one.
    #[cfg_attr(not(has_io_uring), allow(dead_code))]
    pub(crate) fn wake_disk_io(&mut self, seq: u32, result: i32) {
        self.disk_io_results.insert(seq, result);
        let waiter = self.disk_io_waiters.remove(&seq);
        if let Some(task_id) = waiter {
            self.wake_task(task_id);
        }
    }

    /// Stores a spawn `result` in the pending entry for `request_id` and
    /// wakes its task. Unknown request ids are ignored and the result dropped.
    pub(crate) fn deliver_spawn(
        &mut self,
        request_id: u64,
        result: stdio::Result<crate::spawner::SpawnResult>,
    ) {
        let waiter = self.pending_spawns.get_mut(&request_id).map(|entry| {
            entry.1 = Some(result);
            entry.0
        });
        if let Some(task_id) = waiter {
            self.wake_task(task_id);
        }
    }

    /// Records the pidfd `result` for `seq` (always) and wakes the task
    /// registered for that sequence number, if there is one.
    #[cfg_attr(not(has_io_uring), allow(dead_code))]
    pub(crate) fn wake_pidfd(&mut self, seq: u32, result: i32) {
        self.pidfd_results.insert(seq, result);
        let waiter = self.pidfd_waiters.remove(&seq);
        if let Some(task_id) = waiter {
            self.wake_task(task_id);
        }
    }

    /// Stores a blocking-call `result` in the pending entry for `request_id`
    /// and wakes its task. Unknown request ids are ignored and the result
    /// dropped.
    pub(crate) fn deliver_blocking(
        &mut self,
        request_id: u64,
        result: Box<dyn std::any::Any + Send>,
    ) {
        let waiter = self.pending_blocking.get_mut(&request_id).map(|entry| {
            entry.1 = Some(result);
            entry.0
        });
        if let Some(task_id) = waiter {
            self.wake_task(task_id);
        }
    }

    /// Stores a resolve `result` in the pending entry for `request_id` and
    /// wakes its task. Unknown request ids are ignored and the result dropped.
    pub(crate) fn deliver_resolve(
        &mut self,
        request_id: u64,
        result: stdio::Result<std::net::SocketAddr>,
    ) {
        let waiter = self.pending_resolves.get_mut(&request_id).map(|entry| {
            entry.1 = Some(result);
            entry.0
        });
        if let Some(task_id) = waiter {
            self.wake_task(task_id);
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Spawns a never-completing future as connection task `task_id`, takes it
    /// out as ready and parks it again, so the wake paths under test have a
    /// parked task to act on.
    fn park_connection_task(exec: &mut Executor, task_id: u32) {
        exec.task_slab
            .spawn(task_id, Box::pin(std::future::pending::<()>()));
        let parked = exec.task_slab.take_ready(task_id).unwrap();
        exec.task_slab.park(task_id, parked);
    }

    /// Asserts that the ready queue holds exactly one entry, `expected`.
    #[track_caller]
    fn assert_only_ready(exec: &Executor, expected: u32) {
        assert_eq!(exec.ready_queue.len(), 1);
        assert_eq!(exec.ready_queue[0], expected);
    }

    // ---- Executor -------------------------------------------------------

    #[test]
    fn executor_new() {
        let exec = Executor::new(16, 8, 8, 0);

        assert!(exec.ready_queue.is_empty());
        assert_eq!(exec.recv_waiters.len(), 16);
        assert_eq!(exec.send_waiters.len(), 16);
        assert_eq!(exec.connect_waiters.len(), 16);
        assert_eq!(exec.io_results.len(), 16);
        assert_eq!(exec.owner_task.len(), 16);
    }

    #[test]
    fn remove_connection_clears_state() {
        let mut exec = Executor::new(4, 4, 4, 0);
        exec.recv_waiters[1] = true;
        exec.send_waiters[1] = true;
        exec.connect_waiters[1] = true;
        exec.io_results[1] = Some(IoResult::Send(Ok(42)));
        exec.owner_task[1] = Some(0);

        exec.remove_connection(1);

        assert!(!exec.recv_waiters[1]);
        assert!(!exec.send_waiters[1]);
        assert!(!exec.connect_waiters[1]);
        assert!(exec.io_results[1].is_none());
        assert!(exec.owner_task[1].is_none());
    }

    #[test]
    fn wake_task_connection_task() {
        let mut exec = Executor::new(4, 4, 4, 0);
        park_connection_task(&mut exec, 1);

        assert!(exec.wake_task(1));
        assert_only_ready(&exec, 1);
    }

    #[test]
    fn wake_task_standalone_task() {
        let mut exec = Executor::new(4, 4, 4, 0);
        let idx = exec
            .standalone_slab
            .spawn(Box::pin(std::future::pending::<()>()))
            .unwrap();
        let parked = exec.standalone_slab.take_ready(idx).unwrap();
        exec.standalone_slab.park(idx, parked);

        let task_id = idx | waker::STANDALONE_BIT;
        assert!(exec.wake_task(task_id));
        assert_only_ready(&exec, task_id);
    }

    #[test]
    fn owner_task_routes_recv_wakeup() {
        let mut exec = Executor::new(16, 4, 4, 0);
        park_connection_task(&mut exec, 5);
        exec.owner_task[12] = Some(5);
        exec.recv_waiters[12] = true;

        exec.wake_recv(12);

        assert_only_ready(&exec, 5);
        assert!(!exec.recv_waiters[12]);
    }

    #[test]
    fn owner_task_routes_send_wakeup() {
        let mut exec = Executor::new(16, 4, 4, 0);
        park_connection_task(&mut exec, 3);
        exec.owner_task[10] = Some(3);
        exec.send_waiters[10] = true;

        exec.wake_send(10, Ok(42));

        assert_only_ready(&exec, 3);
        assert!(!exec.send_waiters[10]);
        assert!(matches!(exec.io_results[10], Some(IoResult::Send(Ok(42)))));
    }

    #[test]
    fn owner_task_routes_connect_wakeup() {
        let mut exec = Executor::new(16, 4, 4, 0);
        park_connection_task(&mut exec, 2);
        exec.owner_task[8] = Some(2);
        exec.connect_waiters[8] = true;

        exec.wake_connect(8, Ok(()));

        assert_only_ready(&exec, 2);
        assert!(!exec.connect_waiters[8]);
    }

    #[test]
    fn owner_task_none_falls_back_to_conn_index() {
        let mut exec = Executor::new(4, 4, 4, 0);
        park_connection_task(&mut exec, 1);
        exec.recv_waiters[1] = true;

        exec.wake_recv(1);

        assert_only_ready(&exec, 1);
    }

    // ---- TimerSlotPool --------------------------------------------------

    #[test]
    fn timer_allocate_and_fire() {
        let mut pool = TimerSlotPool::new(4);
        let (slot, generation) = pool.allocate(42).unwrap();
        assert!(!pool.is_fired(slot));

        let waker_id = pool.fire(slot, generation).unwrap();

        assert_eq!(waker_id, 42);
        assert!(pool.is_fired(slot));
    }

    #[test]
    fn timer_fire_stale_generation_returns_none() {
        let mut pool = TimerSlotPool::new(4);
        let (slot, generation) = pool.allocate(10).unwrap();

        pool.release(slot);

        assert!(pool.fire(slot, generation).is_none());
    }

    #[test]
    fn timer_release_increments_generation() {
        let mut pool = TimerSlotPool::new(4);
        let (slot, gen0) = pool.allocate(1).unwrap();
        pool.release(slot);

        let (slot2, gen1) = pool.allocate(2).unwrap();

        assert_eq!(slot2, slot);
        assert_eq!(gen1, gen0 + 1);
        pool.release(slot2);
    }

    #[test]
    fn timer_generation_wraps_at_u16_max() {
        let mut pool = TimerSlotPool::new(1);
        let (slot, _) = pool.allocate(1).unwrap();
        pool.generations[slot as usize] = u16::MAX;

        pool.release(slot);
        let (slot2, generation) = pool.allocate(2).unwrap();

        assert_eq!(slot2, slot);
        assert_eq!(generation, 0);
    }

    #[test]
    fn timer_encode_decode_round_trip() {
        let slot = 1234u32;
        let generation = 5678u16;

        let payload = TimerSlotPool::encode_payload(slot, generation);
        let (decoded_slot, decoded_gen) = TimerSlotPool::decode_payload(payload);

        assert_eq!(decoded_slot, slot);
        assert_eq!(decoded_gen, generation);
    }

    #[test]
    fn timer_encode_decode_boundary_values() {
        let payload = TimerSlotPool::encode_payload(0xFFFF, 0xFFFF);
        let (slot, generation) = TimerSlotPool::decode_payload(payload);
        assert_eq!(slot, 0xFFFF);
        assert_eq!(generation, 0xFFFF);

        let payload = TimerSlotPool::encode_payload(0, 0);
        let (slot, generation) = TimerSlotPool::decode_payload(payload);
        assert_eq!(slot, 0);
        assert_eq!(generation, 0);
    }

    #[test]
    fn timer_exhaust_pool() {
        let mut pool = TimerSlotPool::new(2);
        let (s0, _) = pool.allocate(1).unwrap();
        let (s1, _) = pool.allocate(2).unwrap();
        assert!(pool.allocate(3).is_none());

        pool.release(s0);
        assert!(pool.allocate(4).is_some());
        assert!(pool.allocate(5).is_none());

        pool.release(s1);
    }

    #[test]
    fn timer_is_fired_out_of_bounds() {
        let pool = TimerSlotPool::new(2);
        assert!(!pool.is_fired(99));
    }

    #[test]
    fn timer_fire_out_of_bounds_returns_none() {
        let mut pool = TimerSlotPool::new(2);
        assert!(pool.fire(99, 0).is_none());
    }

    #[test]
    fn timer_allocate_resets_fired_flag() {
        let mut pool = TimerSlotPool::new(2);
        let (slot, generation) = pool.allocate(1).unwrap();
        pool.fire(slot, generation).unwrap();
        assert!(pool.is_fired(slot));

        pool.release(slot);
        let (slot2, _) = pool.allocate(2).unwrap();

        assert_eq!(slot2, slot);
        assert!(!pool.is_fired(slot2));
    }
}