pub(crate) mod handler;
pub(crate) mod io;
pub(crate) mod join;
pub(crate) mod select;
pub(crate) mod task;
pub(crate) mod waker;
use std::cell::Cell;
use std::collections::{HashMap, VecDeque};
use std::io as stdio;
use self::task::{StandaloneTaskSlab, TaskSlab};
use self::waker::drain_ready_queue;
/// Completion value stored for a connection's pending send or connect.
///
/// `Executor::wake_send` and `Executor::wake_connect` place one of these into the
/// connection's `io_results` slot before waking the owning task.
// NOTE(review): the blanket `dead_code` allowance has no stated reason in this file;
// presumably the payloads are only read from sibling modules — confirm it is still needed.
#[allow(dead_code)]
pub(crate) enum IoResult {
    /// Outcome handed to `wake_send`. The `u32` is presumably a byte count — confirm
    /// against the code that produces it.
    Send(stdio::Result<u32>),
    /// Outcome handed to `wake_connect`.
    Connect(stdio::Result<()>),
}
/// Raw-pointer description of a byte buffer, kept per connection in `Executor::recv_sinks`.
///
/// NOTE(review): nothing in this file reads or writes through `ptr`, so the field meanings
/// below are inferred from their names and must be confirmed against the code that does.
/// Because this holds a raw pointer, whoever installs a sink is responsible for keeping the
/// pointed-to memory alive while the sink is registered.
pub(crate) struct RecvSink {
    /// Pointer to the buffer (presumably its first byte — confirm).
    pub(crate) ptr: *mut u8,
    /// Presumably the buffer's capacity in bytes — confirm.
    pub(crate) cap: usize,
    /// Presumably the number of bytes already written into the buffer — confirm.
    pub(crate) pos: usize,
}
thread_local! {
    /// Per-thread task id, const-initialised to `0` on every thread.
    ///
    /// NOTE(review): presumably the id of the task currently being polled on this thread;
    /// the code that sets it lives outside this file. The initial `0` is indistinguishable
    /// from a real task id of `0` — confirm that callers never rely on telling them apart.
    pub(crate) static CURRENT_TASK_ID: Cell<u32> = const { Cell::new(0) };
}
/// Fixed-capacity pool of timer slots stored as parallel vectors indexed by slot number.
///
/// Each slot carries a generation counter that is bumped on release, so a completion that
/// still names an old generation is rejected by `fire`.
pub(crate) struct TimerSlotPool {
    /// One `Timespec` per slot. Only default-initialised in this file; written elsewhere.
    pub(crate) timespecs: Vec<io_uring::types::Timespec>,
    /// Waker id recorded for each slot by `allocate` and returned by `fire`.
    pub(crate) waker_ids: Vec<u32>,
    /// Per-slot flag: set by `fire`, cleared by `allocate`.
    pub(crate) fired: Vec<bool>,
    /// Per-slot generation counter; incremented (wrapping) each time the slot is released.
    pub(crate) generations: Vec<u16>,
    /// Stack of currently free slot indices; `allocate` pops from the back.
    free_list: Vec<u32>,
}
impl TimerSlotPool {
    /// Builds a pool with `capacity` slots, all of them free.
    ///
    /// Every per-slot vector has length `capacity`. The free list holds `0..capacity` in
    /// ascending order, so `allocate` hands out the highest index first.
    pub(crate) fn new(capacity: u32) -> Self {
        let slots = capacity as usize;
        Self {
            timespecs: vec![io_uring::types::Timespec::new(); slots],
            waker_ids: vec![0; slots],
            fired: vec![false; slots],
            generations: vec![0; slots],
            free_list: (0..capacity).collect(),
        }
    }

    /// Takes a free slot, binds it to `waker_id`, and clears its fired flag.
    ///
    /// Returns `(slot, generation)` for the claimed slot, or `None` when no slot is free.
    pub(crate) fn allocate(&mut self, waker_id: u32) -> Option<(u32, u16)> {
        let slot = self.free_list.pop()?;
        let at = slot as usize;
        self.waker_ids[at] = waker_id;
        self.fired[at] = false;
        Some((slot, self.generations[at]))
    }

    /// Returns `slot` to the free list and bumps its generation so stale completions for the
    /// previous occupant no longer match in `fire`.
    ///
    /// Out-of-range slots are ignored.
    ///
    /// NOTE(review): nothing here detects a slot being released twice; a double release
    /// would put the same index on the free list twice. Confirm callers release exactly once.
    pub(crate) fn release(&mut self, slot: u32) {
        let Some(stamp) = self.generations.get_mut(slot as usize) else {
            return;
        };
        *stamp = stamp.wrapping_add(1);
        self.free_list.push(slot);
    }

    /// Marks `slot` as fired if `generation` matches the slot's current generation.
    ///
    /// Returns the waker id bound to the slot, or `None` when the slot is out of range or
    /// the generation is stale.
    pub(crate) fn fire(&mut self, slot: u32, generation: u16) -> Option<u32> {
        let at = slot as usize;
        match self.generations.get(at) {
            Some(&current) if current == generation => {
                self.fired[at] = true;
                Some(self.waker_ids[at])
            }
            _ => None,
        }
    }

    /// Reports whether `slot`'s fired flag is set; out-of-range slots read as `false`.
    ///
    /// The generation is not consulted here.
    pub(crate) fn is_fired(&self, slot: u32) -> bool {
        matches!(self.fired.get(slot as usize), Some(&true))
    }

    /// Packs a slot and generation into one `u32`: generation in the high 16 bits, slot in
    /// the low 16 bits.
    ///
    /// Slot bits above the low 16 are discarded, so the round trip through
    /// `decode_payload` is lossless only for `slot <= 0xFFFF`.
    pub(crate) fn encode_payload(slot: u32, generation: u16) -> u32 {
        (u32::from(generation) << 16) | (slot & 0xFFFF)
    }

    /// Splits a payload produced by `encode_payload` back into `(slot, generation)`.
    pub(crate) fn decode_payload(payload: u32) -> (u32, u16) {
        (payload & 0xFFFF, (payload >> 16) as u16)
    }
}
/// Task storage plus the bookkeeping tables that route I/O completions to parked tasks.
///
/// The per-connection vectors are all created with length `max_connections` and are indexed
/// by connection index; the UDP vectors are created with length `udp_count`.
pub(crate) struct Executor {
    /// Tasks addressed by ids that do not carry `waker::STANDALONE_BIT`.
    pub(crate) task_slab: TaskSlab,
    /// Tasks addressed by ids that carry `waker::STANDALONE_BIT` (the bit is masked off
    /// before indexing this slab).
    pub(crate) standalone_slab: StandaloneTaskSlab,
    /// Timer slot pool.
    pub(crate) timer_pool: TimerSlotPool,
    /// Ids of tasks that `wake_task` successfully woke, in wake order.
    pub(crate) ready_queue: VecDeque<u32>,
    /// Per connection: `wake_recv` only wakes when this flag is set, and clears it.
    pub(crate) recv_waiters: Vec<bool>,
    /// Per connection: `wake_send` only wakes when this flag is set, and clears it.
    pub(crate) send_waiters: Vec<bool>,
    /// Per connection: `wake_connect` only wakes when this flag is set, and clears it.
    pub(crate) connect_waiters: Vec<bool>,
    /// Per connection: the completion stored by `wake_send` / `wake_connect`.
    pub(crate) io_results: Vec<Option<IoResult>>,
    /// Per connection: task id to wake instead of the connection's own index, if set.
    pub(crate) owner_task: Vec<Option<u32>>,
    /// Per connection: optional receive sink; cleared by `remove_connection`.
    pub(crate) recv_sinks: Vec<Option<RecvSink>>,
    /// Per UDP index: queue of `(bytes, address)` pairs (presumably received datagrams and
    /// their senders — confirm against the code that fills it).
    pub(crate) udp_recv_queues: Vec<VecDeque<(Vec<u8>, std::net::SocketAddr)>>,
    /// Per UDP index: id of the task that `wake_udp_recv` should wake, taken when it fires.
    pub(crate) udp_recv_waiters: Vec<Option<u32>>,
    /// Disk I/O sequence number -> id of the task to wake on completion.
    pub(crate) disk_io_waiters: HashMap<u32, u32>,
    /// Disk I/O sequence number -> raw `i32` result recorded by `wake_disk_io`.
    pub(crate) disk_io_results: HashMap<u32, i32>,
}
impl Executor {
    /// Creates an executor with fixed-size per-connection and per-UDP tables.
    ///
    /// * `max_connections` - length of every per-connection vector; also passed to
    ///   `TaskSlab::new`.
    /// * `standalone_capacity` - passed to `StandaloneTaskSlab::new`.
    /// * `timer_slots` - number of slots in the timer pool.
    /// * `udp_count` - length of the UDP queue and waiter tables.
    pub(crate) fn new(
        max_connections: u32,
        standalone_capacity: u32,
        timer_slots: u32,
        udp_count: u32,
    ) -> Self {
        let conns = max_connections as usize;
        let udp_sockets = udp_count as usize;
        Self {
            task_slab: TaskSlab::new(max_connections),
            standalone_slab: StandaloneTaskSlab::new(standalone_capacity),
            timer_pool: TimerSlotPool::new(timer_slots),
            ready_queue: VecDeque::with_capacity(64),
            recv_waiters: vec![false; conns],
            send_waiters: vec![false; conns],
            connect_waiters: vec![false; conns],
            // Filled element by element: neither `IoResult` nor `RecvSink` derives `Clone`,
            // so `vec![None; n]` is not available for these two tables.
            io_results: (0..conns).map(|_| None).collect(),
            owner_task: vec![None; conns],
            recv_sinks: (0..conns).map(|_| None).collect(),
            udp_recv_queues: (0..udp_sockets).map(|_| VecDeque::new()).collect(),
            udp_recv_waiters: vec![None; udp_sockets],
            disk_io_waiters: HashMap::new(),
            disk_io_results: HashMap::new(),
        }
    }

    /// Passes the ready queue to `waker::drain_ready_queue`.
    ///
    /// NOTE(review): that helper is defined in the `waker` module; whether it fills or
    /// empties the queue is not visible from this file.
    pub(crate) fn collect_wakeups(&mut self) {
        drain_ready_queue(&mut self.ready_queue);
    }

    /// Drops all per-connection state for `conn_index` and removes its task from the slab.
    ///
    /// The table resets are skipped for out-of-range indices; `task_slab.remove` is called
    /// regardless.
    pub(crate) fn remove_connection(&mut self, conn_index: u32) {
        let slot = conn_index as usize;
        // The sink is cleared before the task is removed. NOTE(review): presumably because
        // the sink points into memory owned by the task — keep this order.
        if let Some(sink) = self.recv_sinks.get_mut(slot) {
            *sink = None;
        }
        self.task_slab.remove(conn_index);
        if slot >= self.recv_waiters.len() {
            return;
        }
        self.recv_waiters[slot] = false;
        self.send_waiters[slot] = false;
        self.connect_waiters[slot] = false;
        self.io_results[slot] = None;
        self.owner_task[slot] = None;
    }

    /// Wakes the task identified by `task_id` and, if the slab reports success, queues the id
    /// on `ready_queue`.
    ///
    /// Ids with `waker::STANDALONE_BIT` set go to the standalone slab (with the bit masked
    /// off); all others go to the connection task slab. The id is queued unmasked.
    ///
    /// Returns `true` when the task was woken and queued.
    pub(crate) fn wake_task(&mut self, task_id: u32) -> bool {
        let is_standalone = task_id & waker::STANDALONE_BIT != 0;
        let woke = if is_standalone {
            self.standalone_slab.wake(task_id & !waker::STANDALONE_BIT)
        } else {
            self.task_slab.wake(task_id)
        };
        if woke {
            self.ready_queue.push_back(task_id);
        }
        woke
    }

    /// Wakes whoever is waiting for a receive on `conn_index`.
    ///
    /// Does nothing unless the connection's receive-waiter flag is set. The flag is cleared
    /// and the connection's owner task is woken, or the task whose id equals `conn_index`
    /// when no owner is recorded.
    pub(crate) fn wake_recv(&mut self, conn_index: u32) {
        let slot = conn_index as usize;
        let waiting = slot < self.recv_waiters.len() && self.recv_waiters[slot];
        if !waiting {
            return;
        }
        self.recv_waiters[slot] = false;
        let target = match self.owner_task[slot] {
            Some(owner) => owner,
            None => conn_index,
        };
        self.wake_task(target);
    }

    /// Delivers a send completion for `conn_index`.
    ///
    /// When the send-waiter flag is set, the flag is cleared, `result` is stored in the
    /// connection's `io_results` slot, and the owner task (or the task with id `conn_index`)
    /// is woken. When nobody is waiting, `result` is dropped.
    pub(crate) fn wake_send(&mut self, conn_index: u32, result: stdio::Result<u32>) {
        let slot = conn_index as usize;
        let waiting = slot < self.send_waiters.len() && self.send_waiters[slot];
        if !waiting {
            return;
        }
        self.send_waiters[slot] = false;
        self.io_results[slot] = Some(IoResult::Send(result));
        let target = match self.owner_task[slot] {
            Some(owner) => owner,
            None => conn_index,
        };
        self.wake_task(target);
    }

    /// Wakes the task registered as the receive waiter for `udp_index`, if there is one.
    ///
    /// The registration is consumed; out-of-range indices are ignored.
    pub(crate) fn wake_udp_recv(&mut self, udp_index: u32) {
        let parked = self
            .udp_recv_waiters
            .get_mut(udp_index as usize)
            .and_then(|waiter| waiter.take());
        if let Some(task_id) = parked {
            self.wake_task(task_id);
        }
    }

    /// Delivers a connect completion for `conn_index`.
    ///
    /// When the connect-waiter flag is set, the flag is cleared, `result` is stored in the
    /// connection's `io_results` slot, and the owner task (or the task with id `conn_index`)
    /// is woken. When nobody is waiting, `result` is dropped.
    pub(crate) fn wake_connect(&mut self, conn_index: u32, result: stdio::Result<()>) {
        let slot = conn_index as usize;
        let waiting = slot < self.connect_waiters.len() && self.connect_waiters[slot];
        if !waiting {
            return;
        }
        self.connect_waiters[slot] = false;
        self.io_results[slot] = Some(IoResult::Connect(result));
        let target = match self.owner_task[slot] {
            Some(owner) => owner,
            None => conn_index,
        };
        self.wake_task(target);
    }

    /// Records the result of disk operation `seq` and wakes its waiter, if one is registered.
    ///
    /// The result is stored even when no waiter exists yet; the waiter entry, when present,
    /// is removed.
    pub(crate) fn wake_disk_io(&mut self, seq: u32, result: i32) {
        self.disk_io_results.insert(seq, result);
        let Some(task_id) = self.disk_io_waiters.remove(&seq) else {
            return;
        };
        self.wake_task(task_id);
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Spawns a never-completing connection task under `id` and parks it, so that a later
    /// wake for `id` has a parked task to act on.
    fn park_pending_conn_task(exec: &mut Executor, id: u32) {
        exec.task_slab
            .spawn(id, Box::pin(std::future::pending::<()>()));
        let fut = exec.task_slab.take_ready(id).unwrap();
        exec.task_slab.park(id, fut);
    }

    /// Snapshot of the ready queue in order, for whole-queue comparisons.
    fn ready_ids(exec: &Executor) -> Vec<u32> {
        exec.ready_queue.iter().copied().collect()
    }

    #[test]
    fn executor_new() {
        let exec = Executor::new(16, 8, 8, 0);
        assert!(exec.ready_queue.is_empty());
        let table_lens = [
            exec.recv_waiters.len(),
            exec.send_waiters.len(),
            exec.connect_waiters.len(),
            exec.io_results.len(),
            exec.owner_task.len(),
        ];
        assert_eq!(table_lens, [16usize; 5]);
    }

    #[test]
    fn remove_connection_clears_state() {
        let mut exec = Executor::new(4, 4, 4, 0);
        let conn = 1usize;
        exec.recv_waiters[conn] = true;
        exec.send_waiters[conn] = true;
        exec.connect_waiters[conn] = true;
        exec.io_results[conn] = Some(IoResult::Send(Ok(42)));
        exec.owner_task[conn] = Some(0);

        exec.remove_connection(1);

        let flags = [
            exec.recv_waiters[conn],
            exec.send_waiters[conn],
            exec.connect_waiters[conn],
        ];
        assert_eq!(flags, [false; 3]);
        assert!(exec.io_results[conn].is_none());
        assert!(exec.owner_task[conn].is_none());
    }

    #[test]
    fn wake_task_connection_task() {
        let mut exec = Executor::new(4, 4, 4, 0);
        park_pending_conn_task(&mut exec, 1);

        assert!(exec.wake_task(1));
        assert_eq!(ready_ids(&exec), vec![1u32]);
    }

    #[test]
    fn wake_task_standalone_task() {
        let mut exec = Executor::new(4, 4, 4, 0);
        let idx = exec
            .standalone_slab
            .spawn(Box::pin(std::future::pending::<()>()))
            .unwrap();
        let fut = exec.standalone_slab.take_ready(idx).unwrap();
        exec.standalone_slab.park(idx, fut);

        // Standalone tasks are addressed with the marker bit set; the queue keeps it.
        let task_id = idx | waker::STANDALONE_BIT;
        assert!(exec.wake_task(task_id));
        assert_eq!(ready_ids(&exec), vec![task_id]);
    }

    #[test]
    fn owner_task_routes_recv_wakeup() {
        let mut exec = Executor::new(16, 4, 4, 0);
        park_pending_conn_task(&mut exec, 5);
        exec.owner_task[12] = Some(5);
        exec.recv_waiters[12] = true;

        exec.wake_recv(12);

        assert_eq!(ready_ids(&exec), vec![5u32]);
        assert!(!exec.recv_waiters[12]);
    }

    #[test]
    fn owner_task_routes_send_wakeup() {
        let mut exec = Executor::new(16, 4, 4, 0);
        park_pending_conn_task(&mut exec, 3);
        exec.owner_task[10] = Some(3);
        exec.send_waiters[10] = true;

        exec.wake_send(10, Ok(42));

        assert_eq!(ready_ids(&exec), vec![3u32]);
        assert!(!exec.send_waiters[10]);
        assert!(matches!(exec.io_results[10], Some(IoResult::Send(Ok(42)))));
    }

    #[test]
    fn owner_task_routes_connect_wakeup() {
        let mut exec = Executor::new(16, 4, 4, 0);
        park_pending_conn_task(&mut exec, 2);
        exec.owner_task[8] = Some(2);
        exec.connect_waiters[8] = true;

        exec.wake_connect(8, Ok(()));

        assert_eq!(ready_ids(&exec), vec![2u32]);
        assert!(!exec.connect_waiters[8]);
    }

    #[test]
    fn owner_task_none_falls_back_to_conn_index() {
        let mut exec = Executor::new(4, 4, 4, 0);
        park_pending_conn_task(&mut exec, 1);
        // No owner recorded for connection 1, so the connection index doubles as the task id.
        exec.recv_waiters[1] = true;

        exec.wake_recv(1);

        assert_eq!(ready_ids(&exec), vec![1u32]);
    }
}