use crate::megakernel::Megakernel;
use crate::uring::stream::AsyncUringStream;
use crate::PipelineError;
use core::sync::atomic::Ordering;
use std::collections::VecDeque;
/// Ring-slot publication recorded at submit time and replayed on completion.
///
/// `submit_file_scan` pushes one of these per accepted submission and
/// `drain_into_ring` pops one per reaped CQE. On a successful completion every
/// field except `chunk_idx` is forwarded to `Megakernel::publish_slot`.
#[derive(Debug, Clone, Copy)]
struct PendingPublish {
/// Chunk index passed to `submit_read_to_gpu`; in the drain path it is only
/// used in the warning logged for a failed CQE.
chunk_idx: u32,
/// Slot index forwarded to `Megakernel::publish_slot`.
slot_idx: u32,
/// Tenant id forwarded to `Megakernel::publish_slot`.
tenant_id: u32,
/// Opcode forwarded to `Megakernel::publish_slot`.
opcode: u32,
/// Three argument words forwarded (by reference) to `Megakernel::publish_slot`.
args: [u32; 3],
}
/// Couples an [`AsyncUringStream`] with a queue of deferred ring-slot publishes.
///
/// Reads are queued with `submit_file_scan`; `drain_into_ring` reaps their
/// completions and publishes the corresponding slots into a caller-provided
/// ring buffer.
pub struct UringMegakernelPump<'a> {
/// The wrapped stream that performs the actual submissions.
stream: AsyncUringStream<'a>,
/// The only `len` that `submit_file_scan` accepts.
chunk_bytes: u32,
/// One boxed iovec per outstanding submission: pushed at the back on submit,
/// popped from the front on each reaped completion.
/// NOTE(review): boxing presumably keeps each iovec at a stable address while
/// the submission is outstanding — confirm against `submit_read_to_gpu`.
iovec_scratch: VecDeque<Box<super::stream::Iovec>>,
/// Cleared iovec boxes kept for reuse by `acquire_iovec`.
iovec_free: Vec<Box<super::stream::Iovec>>,
/// Publishes waiting for their read to complete, in submission order.
pending: VecDeque<PendingPublish>,
}
impl<'a> UringMegakernelPump<'a> {
/// Wraps `stream` in a pump that only accepts reads of exactly `chunk_bytes` bytes.
///
/// The scratch queue, the free pool and the pending-publish queue all start empty.
#[must_use]
pub fn new(stream: AsyncUringStream<'a>, chunk_bytes: u32) -> Self {
    let iovec_scratch = VecDeque::new();
    let iovec_free = Vec::new();
    let pending = VecDeque::new();
    Self {
        stream,
        chunk_bytes,
        iovec_scratch,
        iovec_free,
        pending,
    }
}
/// Returns a boxed iovec, preferring one from the free pool.
///
/// A freshly allocated iovec has a null base pointer and zero length.
fn acquire_iovec(&mut self) -> Box<super::stream::Iovec> {
    // Reuse a previously released box when one is available.
    if let Some(recycled) = self.iovec_free.pop() {
        return recycled;
    }
    Box::new(super::stream::Iovec {
        iov_base: core::ptr::null_mut(),
        iov_len: 0,
    })
}
/// Clears `iovec` and returns its box to the free pool for later reuse.
///
/// The base pointer is nulled and the length zeroed so the pooled entry never
/// carries a pointer from its previous use.
fn release_iovec(&mut self, mut iovec: Box<super::stream::Iovec>) {
    let cleared: &mut super::stream::Iovec = &mut iovec;
    cleared.iov_len = 0;
    cleared.iov_base = core::ptr::null_mut();
    self.iovec_free.push(iovec);
}
pub fn into_stream(self) -> AsyncUringStream<'a> {
self.stream
}
/// Returns the wrapped stream's `inflight()` value.
///
/// NOTE(review): presumably the number of submitted reads whose completions
/// have not been reaped yet — confirm against `AsyncUringStream::inflight`.
#[must_use]
pub fn inflight(&self) -> u32 {
self.stream.inflight()
}
/// Queues one chunk-sized read from `fd` and records the ring-slot publish to
/// perform once that read completes.
///
/// `fd`, `file_offset`, `len` and `chunk_idx` are forwarded to
/// `AsyncUringStream::submit_read_to_gpu`. `slot_idx`, `tenant_id`, `opcode`
/// and `args` are stored and later handed to `Megakernel::publish_slot` by
/// `drain_into_ring`.
///
/// # Errors
///
/// * `PipelineError::QueueFull` when `len` differs from the pump's
///   `chunk_bytes`, or when `chunk_idx` does not fit in `usize`. Both are
///   checked before any pump state is touched.
/// * Whatever `submit_read_to_gpu` returns on failure; the iovec staged for
///   the attempt is recycled first and no publish is recorded.
///
/// # Safety
///
/// This forwards to the `unsafe` `AsyncUringStream::submit_read_to_gpu`; the
/// caller must uphold that function's contract for `fd`, `file_offset`, `len`
/// and `chunk_idx`.
#[allow(clippy::too_many_arguments)]
pub unsafe fn submit_file_scan(
    &mut self,
    fd: i32,
    file_offset: u64,
    len: u32,
    chunk_idx: u32,
    slot_idx: u32,
    tenant_id: u32,
    opcode: u32,
    args: [u32; 3],
) -> Result<(), PipelineError> {
    if len != self.chunk_bytes {
        return Err(PipelineError::QueueFull {
            queue: "submission",
            fix: "submit_file_scan len must equal pump's chunk_bytes; construct a new pump for a different chunk size",
        });
    }
    // Convert before staging the iovec: an early return after the push would
    // leave a scratch entry with no matching `pending` entry and
    // desynchronize the two FIFOs that `drain_into_ring` pops in lockstep.
    let chunk = usize::try_from(chunk_idx).map_err(|_| PipelineError::QueueFull {
        queue: "submission",
        fix: "chunk_idx cannot fit host usize; shard io_uring megakernel pump chunks",
    })?;
    let scratch = self.acquire_iovec();
    self.iovec_scratch.push_back(scratch);
    let submit_result = {
        let slot = self
            .iovec_scratch
            .back_mut()
            .ok_or(PipelineError::QueueFull {
                queue: "submission",
                fix: "just-pushed iovec scratch slot is missing; keep io_uring scratch ownership synchronized with submit staging",
            })?;
        // SAFETY: the caller of this `unsafe fn` upholds the contract of
        // `submit_read_to_gpu`. The iovec handed over is boxed and remains
        // owned by `iovec_scratch` until `drain_into_ring` pops it (or until
        // the error path below recycles it).
        unsafe {
            self.stream.submit_read_to_gpu(
                fd,
                file_offset,
                len,
                chunk,
                std::slice::from_mut(slot.as_mut()),
            )
        }
    };
    if let Err(error) = submit_result {
        // Undo the staging so scratch and pending stay the same length.
        if let Some(iovec) = self.iovec_scratch.pop_back() {
            self.release_iovec(iovec);
        }
        return Err(error);
    }
    self.pending.push_back(PendingPublish {
        chunk_idx,
        slot_idx,
        tenant_id,
        opcode,
        args,
    });
    Ok(())
}
/// Reaps every currently available CQE and publishes the ring slot recorded
/// for each successful one into `ring_bytes`.
///
/// For each CQE the completion queue is advanced, the stream's `inflight`
/// counter is decremented, and the oldest pending publish and the oldest
/// scratch iovec are popped (the iovec goes back to the free pool).
///
/// # Returns
///
/// The number of CQEs with a non-negative result that were processed.
///
/// # Errors
///
/// * `PipelineError::IoUringSyscall` for the first CQE with a negative result.
///   Draining continues past failed CQEs, and the error is returned only once
///   no more CQEs are available.
/// * `PipelineError::Backend` if a CQE is seen while `inflight` is already
///   zero. This returns immediately.
/// * Any error from `Megakernel::publish_slot`. This also returns immediately.
///
/// The two immediate returns discard any earlier recorded CQE error and leave
/// remaining CQEs in the queue for a later call.
///
/// NOTE(review): CQEs are paired with `pending` / `iovec_scratch` entries
/// strictly in FIFO order, which assumes completions arrive in submission
/// order — confirm that the stream guarantees this.
pub fn drain_into_ring(&mut self, ring_bytes: &mut [u8]) -> Result<u32, PipelineError> {
let mut completed: u32 = 0;
let mut first_error: Option<PipelineError> = None;
while let Some(cqe) = self.stream.ring_state.peek_cqe() {
// Copy the result out before the entry is consumed.
let res = cqe.res;
self.stream.ring_state.advance_cq();
// A completion with nothing in flight means submit/drain accounting is broken.
self.stream.inflight = self.stream.inflight.checked_sub(1).ok_or_else(|| {
PipelineError::Backend(
"io_uring pump completion arrived with zero inflight submissions. Fix: audit submit/drain accounting before reusing this pump.".to_string(),
)
})?;
// Pop both FIFOs together so they stay the same length.
let publish = self.pending.pop_front();
if let Some(iovec) = self.iovec_scratch.pop_front() {
self.release_iovec(iovec);
}
if res < 0 {
// Failed read: log which chunk it was, remember only the first error,
// and keep draining without publishing anything for this entry.
if let Some(p) = publish.as_ref() {
tracing::warn!(
chunk_idx = p.chunk_idx,
slot_idx = p.slot_idx,
tenant_id = p.tenant_id,
opcode = p.opcode,
errno = -res,
"uring CQE failure for pending GPU-resident chunk; failed offset is chunk_idx * chunk_bytes"
);
}
if first_error.is_none() {
first_error = Some(PipelineError::IoUringSyscall {
syscall: "io_uring_cqe",
errno: -res,
fix: "see preceding tracing::warn! for chunk_idx of the failed offset; check disk health on the source fd and verify the registered DMA buffer covers the addressed range",
});
}
continue;
}
// Successful read: bump the stream's tail counter, then publish the slot.
self.stream.megakernel_tail.fetch_add(1, Ordering::Release);
if let Some(p) = publish {
Megakernel::publish_slot(ring_bytes, p.slot_idx, p.tenant_id, p.opcode, &p.args)?;
}
completed += 1;
}
match first_error {
Some(err) => Err(err),
None => Ok(completed),
}
}
/// Returns the epoch read from `control_bytes` by `Megakernel::read_epoch`.
///
/// Does not read or modify any pump state.
#[must_use]
pub fn observe_epoch(&self, control_bytes: &[u8]) -> u32 {
Megakernel::read_epoch(control_bytes)
}
}
#[cfg(test)]
mod tests {
use super::*;
/// A `PendingPublish` record can be published into a fresh ring once, and a
/// second publish to the same slot is rejected with `QueueFull`.
#[test]
fn pending_publish_layout_matches_ring_slot() {
    let mut ring = Megakernel::try_encode_empty_ring(4).unwrap();
    let PendingPublish {
        slot_idx,
        tenant_id,
        opcode,
        args,
        ..
    } = PendingPublish {
        chunk_idx: 0,
        slot_idx: 2,
        tenant_id: 7,
        opcode: 0x4000_0000,
        args: [11, 22, 33],
    };

    // First publish claims the slot.
    Megakernel::publish_slot(&mut ring, slot_idx, tenant_id, opcode, &args)
        .expect("Fix: publish slot; restore this invariant before continuing.");

    // Publishing the same slot again must fail.
    let second = Megakernel::publish_slot(&mut ring, slot_idx, tenant_id, opcode, &args);
    let err = second.expect_err("second publish on in-flight slot must reject");
    assert!(matches!(err, PipelineError::QueueFull { .. }));
}
/// A boxed iovec that is cleared and pushed to a free list comes back at the
/// same heap address with a null base and zero length.
#[test]
fn iovec_pool_reuses_stable_box_without_retaining_stale_pointer() {
    use super::super::stream::Iovec;

    // Address of the boxed value as an integer, so it can be compared across moves.
    let heap_addr = |boxed: &Iovec| boxed as *const Iovec as usize;

    let mut iovec = Box::new(Iovec {
        iov_base: core::ptr::dangling_mut::<core::ffi::c_void>(),
        iov_len: 4096,
    });
    let original_addr = heap_addr(&*iovec);
    iovec.iov_len = 8192;

    // Mirror what releasing does: clear both fields, then pool the box.
    iovec.iov_base = core::ptr::null_mut();
    iovec.iov_len = 0;
    let mut free = vec![iovec];

    let reused = free.pop().expect("Fix: released iovec must be reusable");
    assert_eq!(heap_addr(&*reused), original_addr);
    assert!(reused.iov_base.is_null());
    assert_eq!(reused.iov_len, 0);
}
// NOTE(review): this test has an empty body, so it always passes and does not
// verify that `submit_file_scan` rejects a `len` different from `chunk_bytes`.
// TODO: construct a pump and assert the `QueueFull` error, or remove the stub.
#[test]
#[cfg(target_os = "linux")]
fn submit_rejects_mismatched_len() {
}
}