use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;
use crate::megakernel::io::{claim_io_requests_into, complete_io_request, io_op};
use crate::uring::stream::AsyncUringStream;
use crate::PipelineError;
/// Number of consecutive idle polls that only issue a CPU spin hint before the
/// IO loop starts parking its thread.
const IDLE_SPINS: u32 = 64;
/// Base park duration for the idle backoff; it is scaled by a power of two.
const MIN_IDLE_PARK: Duration = Duration::from_nanos(10_000);
/// Upper bound on any single idle park.
const MAX_IDLE_PARK: Duration = Duration::from_nanos(100_000);
/// A GPU destination that megakernel READ requests are allowed to target.
///
/// A READ request whose `dst_handle` equals `handle` is submitted with this
/// destination's `buf_index` and `target_offset`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct RegisteredIoDestination {
    /// Handle that READ requests name in their `dst_handle` field.
    pub handle: u32,
    /// Buffer index passed to `submit_read_fixed_at` (presumably an io_uring
    /// registered-buffer index — confirm against the stream API).
    pub buf_index: u16,
    /// Target offset passed to `submit_read_fixed_at` for reads into this
    /// destination.
    pub target_offset: u64,
}
#[derive(Default)]
struct IdleBackoff {
polls: u32,
}
impl IdleBackoff {
fn reset(&mut self) {
self.polls = 0;
}
fn wait(&mut self, shutdown: &AtomicBool) {
if shutdown.load(Ordering::Acquire) {
return;
}
self.polls = self.polls.saturating_add(1);
if self.polls <= IDLE_SPINS {
std::hint::spin_loop();
return;
}
let shift = (self.polls - IDLE_SPINS).min(7);
let multiplier = 1_u32 << shift;
let park = MIN_IDLE_PARK
.checked_mul(multiplier)
.unwrap_or(MAX_IDLE_PARK)
.min(MAX_IDLE_PARK);
thread::park_timeout(park);
}
}
/// Background thread that services megakernel IO requests from a mapped
/// request queue through an io_uring stream.
///
/// Create it with [`MegakernelIoLoop::spawn`] or
/// [`MegakernelIoLoop::spawn_with_registered_destinations`]. Call
/// [`MegakernelIoLoop::stop`] to shut the thread down and collect its result.
pub struct MegakernelIoLoop {
    /// Set by `stop` to ask the worker thread to leave its loop.
    shutdown: Arc<AtomicBool>,
    /// Worker thread handle; `None` once `stop` has joined it.
    handle: Option<JoinHandle<Result<(), PipelineError>>>,
}

impl MegakernelIoLoop {
    /// Spawns the IO loop with no registered READ destinations.
    ///
    /// With no destinations, any READ request published to the queue is
    /// rejected as targeting an unregistered destination, which ends the loop
    /// with an error. FENCE requests still complete successfully.
    pub fn spawn(stream: AsyncUringStream<'static>, io_queue_mapped: &'static mut [u8]) -> Self {
        Self::spawn_with_registered_destinations(stream, io_queue_mapped, Vec::new())
    }

    /// Spawns the IO loop thread, allowing READs into `registered_destinations`.
    ///
    /// Each iteration the worker:
    /// 1. reaps every available io_uring completion and reports it to the queue
    ///    slot carried in the CQE's `user_data` (success iff `res >= 0`);
    /// 2. claims newly published requests from `io_queue_mapped`;
    /// 3. services them. A READ is submitted via `submit_read_fixed_at` into
    ///    the destination whose `handle` equals the request's `dst_handle`.
    ///    FENCE completes successfully at once. WRITE and unknown ops complete
    ///    as failed;
    /// 4. flushes the queued submissions.
    ///
    /// When nothing was claimed, the worker either waits on the ring (if reads
    /// are in flight) or backs off with [`IdleBackoff`].
    ///
    /// The thread returns `Err` on the first fatal error. Before returning, it
    /// marks as failed (best effort) every request of the current batch it had
    /// not finished, because nothing will complete those slots after the
    /// thread exits.
    ///
    /// NOTE(review): reads that were already in flight from earlier batches
    /// when the loop exits (on error or shutdown) are not completed here.
    ///
    /// # Panics
    /// Panics if the OS fails to create the thread (see `std::thread::spawn`).
    pub fn spawn_with_registered_destinations(
        mut stream: AsyncUringStream<'static>,
        io_queue_mapped: &'static mut [u8],
        registered_destinations: Vec<RegisteredIoDestination>,
    ) -> Self {
        let shutdown = Arc::new(AtomicBool::new(false));
        let shutdown_clone = Arc::clone(&shutdown);
        let handle = thread::spawn(move || {
            let mut backoff = IdleBackoff::default();
            let mut requests = Vec::new();
            let mut registered_destinations = registered_destinations;
            // Sorted so READ requests can locate their destination by binary search.
            registered_destinations.sort_unstable_by_key(|destination| destination.handle);

            while !shutdown_clone.load(Ordering::Acquire) {
                // Reap every completion that is already available.
                while let Some(cqe) = stream.ring_state.peek_cqe() {
                    let res = cqe.res;
                    let user_data = cqe.user_data;
                    stream.ring_state.advance_cq();
                    stream.inflight = stream.inflight.checked_sub(1).ok_or_else(|| {
                        PipelineError::QueueFull {
                            queue: "io_uring",
                            fix: "megakernel IO loop completion arrived with no inflight SQE; rebuild the IO stream state",
                        }
                    })?;
                    // READ submissions carry the queue slot index as user_data.
                    let slot_idx = u32::try_from(user_data).map_err(|_| PipelineError::QueueFull {
                        queue: "completion",
                        fix: "io_uring completion user_data does not fit megakernel IO slot index; keep user_data in u32 slot-id range",
                    })?;
                    complete_io_request(io_queue_mapped, slot_idx, res >= 0)?;
                    backoff.reset();
                }

                claim_io_requests_into(io_queue_mapped, &mut requests)?;
                if requests.is_empty() {
                    if stream.inflight() > 0 {
                        // Reads are outstanding: submit anything queued, then wait
                        // on the ring (presumably io_uring_enter with
                        // min_complete = 1, i.e. until a CQE is available).
                        stream.flush_submissions()?;
                        stream.ring_state.enter(0, 1, 1)?;
                    } else {
                        backoff.wait(&shutdown_clone);
                    }
                    continue;
                }
                backoff.reset();

                // Service the batch. The first fatal error stops processing and
                // records the failing request's position for the cleanup below.
                let mut batch_failure: Option<(usize, PipelineError)> = None;
                for (index, req) in requests.iter().copied().enumerate() {
                    let outcome: Result<(), PipelineError> = match req.op_type {
                        io_op::READ => match registered_destinations
                            .binary_search_by_key(&req.dst_handle, |destination| destination.handle)
                        {
                            Ok(destination_idx) => {
                                let destination = registered_destinations[destination_idx];
                                // NOTE(review): `as` wraps if src_handle does not fit an i32 fd.
                                let fd = req.src_handle as i32;
                                // SAFETY (assumed contract — confirm against
                                // `submit_read_fixed_at`'s docs): the destination buffer
                                // was registered by the caller of
                                // `spawn_with_registered_destinations` and must stay
                                // valid, with `target_offset + byte_count` in bounds,
                                // until this read's completion is reaped.
                                let submitted = unsafe {
                                    stream.submit_read_fixed_at(
                                        fd,
                                        req.offset,
                                        req.byte_count,
                                        destination.target_offset,
                                        destination.buf_index,
                                        u64::from(req.slot_idx),
                                    )
                                };
                                submitted
                                    .map(|_| ())
                                    .map_err(|e| PipelineError::Backend(e.to_string()))
                            }
                            Err(_) => Err(PipelineError::Backend(format!(
                                "megakernel IO READ requested unregistered GPU destination handle {} in slot {}. Fix: register the destination with MegakernelIoLoop::spawn_with_registered_destinations before publishing READ requests.",
                                req.dst_handle, req.slot_idx
                            ))),
                        },
                        io_op::FENCE => complete_io_request(io_queue_mapped, req.slot_idx, true)
                            .map_err(PipelineError::from),
                        // WRITE is not serviced by this loop; report it as failed.
                        io_op::WRITE => complete_io_request(io_queue_mapped, req.slot_idx, false)
                            .map_err(PipelineError::from),
                        _ => complete_io_request(io_queue_mapped, req.slot_idx, false)
                            .map_err(PipelineError::from),
                    };
                    if let Err(error) = outcome {
                        batch_failure = Some((index, error));
                        break;
                    }
                }

                if let Some((failed_at, error)) = batch_failure {
                    // The thread is about to exit, so nothing will finish the rest
                    // of this batch. Three kinds of request are affected:
                    // - READs queued before the failure will not be reaped;
                    // - a failing READ never reached the ring;
                    // - requests after the failure were never processed.
                    // Fail all of these (best effort) so no slot stays pending.
                    // FENCE/WRITE entries before the failure were already
                    // completed and are skipped.
                    for (index, req) in requests.iter().copied().enumerate() {
                        if index > failed_at || req.op_type == io_op::READ {
                            let _ = complete_io_request(io_queue_mapped, req.slot_idx, false);
                        }
                    }
                    return Err(error);
                }

                if let Err(e) = stream.flush_submissions() {
                    // The loop exits now, so no completion for this batch's READs
                    // will be reaped; fail them. Non-READ requests were already
                    // completed above.
                    for req in requests.iter().copied() {
                        if req.op_type == io_op::READ {
                            let _ = complete_io_request(io_queue_mapped, req.slot_idx, false);
                        }
                    }
                    return Err(e);
                }
            }
            Ok(())
        });
        Self {
            shutdown,
            handle: Some(handle),
        }
    }

    /// Signals the worker to exit, wakes it if parked, and joins it.
    ///
    /// Returns the worker's own result, or a `Backend` error if the thread
    /// panicked. Calling `stop` again after a successful join returns `Ok(())`.
    ///
    /// NOTE(review): a worker blocked in `ring_state.enter` waiting for a
    /// completion is presumably not woken by `unpark`. In that case `stop`
    /// waits until that completion arrives.
    pub fn stop(&mut self) -> Result<(), PipelineError> {
        self.shutdown.store(true, Ordering::Release);
        match self.handle.take() {
            Some(handle) => {
                handle.thread().unpark();
                handle
                    .join()
                    .map_err(|_| PipelineError::Backend("IO loop thread panicked".to_string()))?
            }
            None => Ok(()),
        }
    }
}