use super::ring::IoUringState;
use crate::PipelineError;
use core::marker::PhantomData;
use core::sync::atomic::{AtomicU32, Ordering};
/// Rust mirror of the C `struct iovec` handed to `IORING_OP_READV` through the
/// SQE's `addr` field.
///
/// `#[repr(C)]` together with this exact field order is the ABI contract; do not
/// reorder or add fields.
#[repr(C)]
#[derive(Debug, Clone, Copy)]
pub struct Iovec {
/// Start address of the region the read fills.
pub iov_base: *mut core::ffi::c_void,
/// Length of that region in bytes.
pub iov_len: usize,
}
/// `IORING_OP_READV`: vectored read described by an `iovec` array (`addr` = array
/// pointer, `len` = number of entries).
pub const IORING_OP_READV: u8 = 1;
/// `IORING_OP_READ_FIXED`: read into a buffer previously registered with the ring,
/// selected by the SQE's `buf_index`.
///
/// The kernel ABI value is 4 (`enum io_uring_op` in `<linux/io_uring.h>`). The value
/// 22 belongs to `IORING_OP_READ`, a plain read that does not use a registered
/// buffer.
pub const IORING_OP_READ_FIXED: u8 = 4;
/// `IORING_OP_URING_CMD`: driver-specific passthrough command.
pub const IORING_OP_URING_CMD: u8 = 46;
/// Non-owning handle to a byte range that io_uring reads are aimed at.
///
/// Stores a raw pointer and a length. `'a` ties the handle to the exclusive borrow
/// taken by whichever `from_*` constructor created it.
pub struct GpuMappedBuffer<'a> {
/// First byte of the range.
ptr: *mut u8,
/// Length of the range in bytes.
len: usize,
/// Zero-sized marker standing in for the `&'a mut` borrow the handle represents.
_owner: PhantomData<&'a mut [u8]>,
}
// SAFETY (NOTE(review): confirm this is the intended argument): the raw pointer
// makes these auto traits opt-in. Every safe method of this type (`sub_region`,
// `len`, `is_empty`, `as_ptr`) only reads `ptr`/`len`. Writing through the pointer
// requires an `unsafe` API (`as_mut_slice` or the `AsyncUringStream::submit_*`
// functions), and those APIs push the aliasing and cross-thread obligations onto
// their callers.
unsafe impl Send for GpuMappedBuffer<'_> {}
unsafe impl Sync for GpuMappedBuffer<'_> {}
impl<'a> GpuMappedBuffer<'a> {
    /// Wraps a mutable byte slice, e.g. a host-visible mapping already exposed as a
    /// slice.
    ///
    /// The handle keeps the slice's exclusive borrow for `'a`. The slice therefore
    /// cannot be used through its original binding while the handle is alive.
    ///
    /// # Safety
    ///
    /// Building the handle performs no unsafe operation. The `unsafe` marker
    /// records the caller's promise that `slice` is memory that asynchronous I/O
    /// may target. NOTE(review): confirm whether a further requirement (for
    /// example device visibility) is intended.
    pub unsafe fn from_host_visible_slice(slice: &'a mut [u8]) -> Self {
        Self {
            ptr: slice.as_mut_ptr(),
            len: slice.len(),
            _owner: PhantomData,
        }
    }

    /// Wraps `len` bytes at `ptr` that belong to `_owner`.
    ///
    /// The exclusive borrow of `_owner` is never read. It only ties the handle's
    /// lifetime to the owner of the mapping.
    ///
    /// # Safety
    ///
    /// - `ptr` must be non-null, even when `len == 0`, because
    ///   [`Self::as_mut_slice`] builds a slice from it.
    /// - `ptr..ptr + len` must be valid for reads and writes for all of `'a`.
    /// - The range must not be accessed through any other path while this handle,
    ///   or a handle derived from it, is in use.
    pub unsafe fn from_host_visible_owner<O: ?Sized>(
        _owner: &'a mut O,
        ptr: *mut u8,
        len: usize,
    ) -> Self {
        Self {
            ptr,
            len,
            _owner: PhantomData,
        }
    }

    /// Returns a second handle to the same bytes with the same lifetime.
    ///
    /// # Safety
    ///
    /// Both handles alias the same memory. The caller must ensure they are never
    /// used for overlapping accesses at the same time when at least one access
    /// writes. This includes io_uring reads in flight through either handle.
    pub unsafe fn duplicate(&self) -> Self {
        Self {
            ptr: self.ptr,
            len: self.len,
            _owner: PhantomData,
        }
    }

    /// Returns a handle covering `len` bytes that start `offset` bytes into this
    /// buffer.
    ///
    /// The result aliases `self` and is not tied to the borrow of `self`. Any
    /// (unsafe) code that dereferences both handles must keep its accesses
    /// disjoint.
    ///
    /// # Errors
    ///
    /// Returns `PipelineError::QueueFull` when `offset + len` overflows `usize` or
    /// runs past the end of this buffer.
    pub fn sub_region(&self, offset: usize, len: usize) -> Result<Self, crate::PipelineError> {
        // Only the validation matters here; the computed end offset is discarded.
        let _end = vyre_driver::accounting::checked_usize_byte_range_end_lazy(
            offset,
            len,
            self.len,
            || crate::PipelineError::QueueFull {
                queue: "submission",
                fix: "GpuMappedBuffer::sub_region offset + len overflows usize; reduce slot size or enlarge the staging buffer",
            },
            |_| crate::PipelineError::QueueFull {
                queue: "submission",
                fix: "GpuMappedBuffer::sub_region exceeds the mapped allocation; reduce slot size or enlarge the staging buffer",
            },
        )?;
        Ok(Self {
            // After the range check above, `offset` presumably lies within (or one
            // past) the parent range. `wrapping_add` keeps this computation free of
            // UB either way.
            ptr: self.ptr.wrapping_add(offset),
            len,
            _owner: PhantomData,
        })
    }

    /// Length of the range in bytes.
    #[must_use]
    pub fn len(&self) -> usize {
        self.len
    }

    /// Whether the range is zero bytes long.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Raw start pointer of the range. Dereferencing it is the caller's
    /// responsibility.
    pub(crate) fn as_ptr(&self) -> *mut u8 {
        self.ptr
    }

    /// Views the whole range as a mutable byte slice.
    ///
    /// # Safety
    ///
    /// For as long as the returned slice lives:
    /// - no other handle aliasing these bytes (see [`Self::duplicate`] and
    ///   [`Self::sub_region`]) may be used to access them;
    /// - no io_uring read targeting them may be in flight.
    pub unsafe fn as_mut_slice(&mut self) -> &mut [u8] {
        // SAFETY: the constructor's caller vouched that `ptr` is non-null and that
        // `ptr..ptr + len` is valid for reads and writes for `'a`. Exclusivity is
        // this function's caller's obligation, as documented above.
        unsafe { core::slice::from_raw_parts_mut(self.ptr, self.len) }
    }

    /// Wraps `len` bytes at `peer_ptr` that belong to `_owner`.
    ///
    /// Judging by the name, this is meant for a peer device's BAR1 mapping.
    /// NOTE(review): confirm. The code itself treats `peer_ptr` exactly like
    /// [`Self::from_host_visible_owner`] does.
    ///
    /// # Safety
    ///
    /// Same requirements as [`Self::from_host_visible_owner`]:
    /// - `peer_ptr` must be non-null;
    /// - `peer_ptr..peer_ptr + len` must be valid for reads and writes for `'a`;
    /// - the range must not be accessed through other paths while the handle is
    ///   in use.
    pub unsafe fn from_bar1_peer_with_owner<O: ?Sized>(
        _owner: &'a mut O,
        peer_ptr: *mut u8,
        len: usize,
    ) -> Self {
        Self {
            ptr: peer_ptr,
            len,
            _owner: PhantomData,
        }
    }
}
/// io_uring reader that fills a [`GpuMappedBuffer`] and reports finished reads by
/// adding to a shared `AtomicU32` tail counter.
pub struct AsyncUringStream<'a> {
/// Ring used to acquire/commit SQEs and to reap CQEs.
pub(crate) ring_state: IoUringState,
/// Destination of submitted reads.
pub(crate) gpu_buffer: GpuMappedBuffer<'a>,
/// `poll` adds the number of successful CQEs to this counter with `Release` ordering.
pub(crate) megakernel_tail: &'a AtomicU32,
/// SQEs committed whose CQEs have not been reaped yet.
pub(crate) inflight: u32,
/// SQEs committed since the last `flush_submissions`.
pub(crate) pending_submissions: u32,
}
// SAFETY NOTE(review): confirm. Through the methods in this file, shared access
// (`&AsyncUringStream`) only exposes `inflight()`, which reads a `u32`; every other
// method takes `&mut self`. `Send` additionally assumes that `IoUringState` may be
// driven from whichever thread currently owns the stream.
unsafe impl Send for AsyncUringStream<'_> {}
unsafe impl Sync for AsyncUringStream<'_> {}
impl<'a> AsyncUringStream<'a> {
pub fn new(
ring_state: IoUringState,
gpu_buffer: GpuMappedBuffer<'a>,
megakernel_tail: &'a AtomicU32,
) -> Self {
Self {
ring_state,
gpu_buffer,
megakernel_tail,
inflight: 0,
pending_submissions: 0,
}
}
pub fn replace_buffer(&mut self, gpu_buffer: GpuMappedBuffer<'a>) {
self.gpu_buffer = gpu_buffer;
}
pub unsafe fn submit_read_to_gpu(
&mut self,
fd: i32,
offset: u64,
len: u32,
chunk_idx: usize,
iovs_storage: &mut [Iovec],
) -> Result<(), PipelineError> {
if iovs_storage.is_empty() {
return Err(PipelineError::QueueFull {
queue: "submission",
fix: "caller supplied empty iovs_storage; pass at least one slot",
});
}
let target_offset = checked_chunk_target_offset(chunk_idx, len)?;
unsafe { self.submit_read_to_gpu_at(fd, offset, len, target_offset, iovs_storage) }
}
pub unsafe fn submit_read_to_gpu_at(
&mut self,
fd: i32,
offset: u64,
len: u32,
target_offset: u64,
iovs_storage: &mut [Iovec],
) -> Result<(), PipelineError> {
unsafe {
self.submit_read_to_gpu_at_with_user_data(
fd,
offset,
len,
target_offset,
target_offset,
iovs_storage,
)
}
}
pub unsafe fn submit_read_to_gpu_at_with_user_data(
&mut self,
fd: i32,
offset: u64,
len: u32,
target_offset: u64,
user_data: u64,
iovs_storage: &mut [Iovec],
) -> Result<(), PipelineError> {
if iovs_storage.is_empty() {
return Err(PipelineError::QueueFull {
queue: "submission",
fix: "caller supplied empty iovs_storage; pass at least one slot",
});
}
let end = checked_target_end(target_offset, len)?;
let gpu_len = usize_to_u64(self.gpu_buffer.len(), "mapped GPU buffer length")?;
if end > gpu_len {
return Err(PipelineError::QueueFull {
queue: "submission",
fix: "target_offset + len exceeds GpuMappedBuffer length; enlarge the buffer or reduce the read size",
});
}
let Some(sqe) = self.ring_state.get_sqe() else {
return Err(PipelineError::QueueFull {
queue: "submission",
fix: "SQ full; call AsyncUringStream::poll to drain completions then retry",
});
};
let target_addr = unsafe {
self.gpu_buffer
.as_ptr()
.add(u64_to_usize(target_offset, "target offset")?)
};
iovs_storage[0] = Iovec {
iov_base: target_addr.cast::<core::ffi::c_void>(),
iov_len: u32_to_usize(len, "read length")?,
};
sqe.opcode = IORING_OP_READV;
sqe.fd = fd;
sqe.user_data_or_off = offset;
sqe.addr = pointer_addr_u64(iovs_storage.as_ptr(), "readv iovec pointer")?;
sqe.len = 1;
sqe.user_data = user_data;
self.ring_state.commit_sqe();
increment_queue_counter(&mut self.inflight, "inflight SQE count")?;
increment_queue_counter(&mut self.pending_submissions, "pending submission count")?;
Ok(())
}
pub fn flush_submissions(&mut self) -> Result<(), PipelineError> {
if self.pending_submissions == 0 {
return Ok(());
}
if self.ring_state.uses_sqpoll() {
if self.ring_state.sq_needs_wakeup() {
self.ring_state.wake_sqpoll()?;
}
} else {
self.ring_state.enter(self.pending_submissions, 0, 0)?;
}
self.pending_submissions = 0;
Ok(())
}
pub fn poll(&mut self) -> Result<u32, PipelineError> {
self.flush_submissions()?;
let mut completed: u32 = 0;
let mut first_error: Option<PipelineError> = None;
while let Some(cqe) = self.ring_state.peek_cqe() {
let res = cqe.res;
self.ring_state.advance_cq();
decrement_queue_counter(&mut self.inflight, "inflight SQE count")?;
if res < 0 {
if first_error.is_none() {
first_error = Some(PipelineError::IoUringSyscall {
syscall: "io_uring_cqe",
errno: -res,
fix: "inspect user_data to identify the failed SQE; common causes: EIO on disk, EFAULT on bad iovec, EINVAL on misaligned offset",
});
}
continue;
}
completed = vyre_driver::accounting::checked_add_u32_value(
completed,
1,
PipelineError::QueueFull {
queue: "completion",
fix: "io_uring completion count overflowed u32; drain completions more frequently",
},
)?;
}
if completed != 0 {
self.megakernel_tail.fetch_add(completed, Ordering::Release);
}
match first_error {
Some(err) => Err(err),
None => Ok(completed),
}
}
pub fn wait_for_completion(&mut self) -> Result<(), PipelineError> {
if self.inflight > 0 {
self.flush_submissions()?;
self.ring_state.enter(0, 1, 1)?;
self.poll()?;
}
Ok(())
}
#[must_use]
pub fn inflight(&self) -> u32 {
self.inflight
}
#[cfg(feature = "uring-cmd-nvme")]
pub unsafe fn submit_nvme_passthrough(
&mut self,
fd: i32,
user_data: u64,
nvme_sqe_bytes: &[u8],
) -> Result<(), PipelineError> {
if nvme_sqe_bytes.len() != 64 {
return Err(PipelineError::QueueFull {
queue: "submission",
fix: "NVMe passthrough SQE must be exactly 64 bytes; see linux/nvme_ioctl.h",
});
}
let Some(sqe) = self.ring_state.get_sqe() else {
return Err(PipelineError::QueueFull {
queue: "submission",
fix: "SQ full; call AsyncUringStream::poll to drain completions then retry",
});
};
let nvme_ptr = nvme_sqe_bytes.as_ptr();
sqe.opcode = IORING_OP_URING_CMD;
sqe.fd = fd;
sqe.user_data_or_off = 0;
sqe.addr = pointer_addr_u64(nvme_ptr, "NVMe command pointer")?;
sqe.len = 64;
sqe.user_data = user_data;
sqe.addr3 = pointer_addr_u64(nvme_ptr, "NVMe command addr3 pointer")?;
self.ring_state.commit_sqe();
increment_queue_counter(&mut self.inflight, "inflight SQE count")?;
increment_queue_counter(&mut self.pending_submissions, "pending submission count")?;
Ok(())
}
pub unsafe fn submit_read_fixed(
&mut self,
fd: i32,
offset: u64,
len: u32,
chunk_idx: usize,
buf_index: u16,
) -> Result<(), PipelineError> {
let target_offset = checked_chunk_target_offset(chunk_idx, len)?;
unsafe {
self.submit_read_fixed_at(
fd,
offset,
len,
target_offset,
buf_index,
usize_to_u64(chunk_idx, "chunk index")?,
)
}
}
pub unsafe fn submit_read_fixed_at(
&mut self,
fd: i32,
offset: u64,
len: u32,
target_offset: u64,
buf_index: u16,
user_data: u64,
) -> Result<(), PipelineError> {
let end = checked_target_end(target_offset, len)?;
let gpu_len = usize_to_u64(self.gpu_buffer.len(), "mapped GPU buffer length")?;
if end > gpu_len {
return Err(PipelineError::QueueFull {
queue: "submission",
fix: "chunk_idx * len exceeds GpuMappedBuffer length",
});
}
let Some(sqe) = self.ring_state.get_sqe() else {
return Err(PipelineError::QueueFull {
queue: "submission",
fix: "SQ full; call AsyncUringStream::poll to drain completions then retry",
});
};
let target_addr = unsafe {
self.gpu_buffer
.as_ptr()
.add(u64_to_usize(target_offset, "target offset")?)
};
sqe.opcode = IORING_OP_READ_FIXED;
sqe.fd = fd;
sqe.user_data_or_off = offset;
sqe.addr = pointer_addr_u64(target_addr, "fixed-read target pointer")?;
sqe.len = len;
sqe.buf_index = buf_index;
sqe.user_data = user_data;
self.ring_state.commit_sqe();
increment_queue_counter(&mut self.inflight, "inflight SQE count")?;
increment_queue_counter(&mut self.pending_submissions, "pending submission count")?;
Ok(())
}
pub unsafe fn submit_read_to_gpu_fixed_file(
&mut self,
file_index: i32,
offset: u64,
len: u32,
chunk_idx: usize,
iovs_storage: &mut [Iovec],
) -> Result<(), PipelineError> {
if iovs_storage.is_empty() {
return Err(PipelineError::QueueFull {
queue: "submission",
fix: "caller supplied empty iovs_storage; pass at least one slot",
});
}
let target_offset = checked_chunk_target_offset(chunk_idx, len)?;
let end = checked_target_end(target_offset, len)?;
let gpu_len = usize_to_u64(self.gpu_buffer.len(), "mapped GPU buffer length")?;
if end > gpu_len {
return Err(PipelineError::QueueFull {
queue: "submission",
fix: "chunk_idx * len exceeds GpuMappedBuffer length",
});
}
let Some(sqe) = self.ring_state.get_sqe() else {
return Err(PipelineError::QueueFull {
queue: "submission",
fix: "SQ full; call AsyncUringStream::poll to drain completions then retry",
});
};
let target_addr = unsafe {
self.gpu_buffer
.as_ptr()
.add(u64_to_usize(target_offset, "target offset")?)
};
iovs_storage[0] = Iovec {
iov_base: target_addr.cast::<core::ffi::c_void>(),
iov_len: u32_to_usize(len, "read length")?,
};
sqe.opcode = IORING_OP_READV;
sqe.flags = super::ring::IOSQE_FIXED_FILE;
sqe.fd = file_index;
sqe.user_data_or_off = offset;
sqe.addr = pointer_addr_u64(iovs_storage.as_ptr(), "fixed-file readv iovec pointer")?;
sqe.len = 1;
sqe.user_data = usize_to_u64(chunk_idx, "chunk index")?;
self.ring_state.commit_sqe();
increment_queue_counter(&mut self.inflight, "inflight SQE count")?;
increment_queue_counter(&mut self.pending_submissions, "pending submission count")?;
Ok(())
}
#[cfg(not(feature = "uring-cmd-nvme"))]
#[allow(clippy::unused_self, clippy::missing_safety_doc)]
pub unsafe fn submit_nvme_passthrough(
&mut self,
_fd: i32,
_user_data: u64,
_nvme_sqe_bytes: &[u8],
) -> Result<(), PipelineError> {
Err(PipelineError::NvmePassthroughDisabled)
}
}
/// Returns the byte offset of chunk `chunk_idx` when chunks are `len` bytes apart
/// (`chunk_idx * len`).
///
/// Fails with `QueueFull` if the product overflows `u64`.
fn checked_chunk_target_offset(chunk_idx: usize, len: u32) -> Result<u64, PipelineError> {
    let stride = u64::from(len);
    let index = usize_to_u64(chunk_idx, "chunk index")?;
    let overflow = || PipelineError::QueueFull {
        queue: "submission",
        fix: "chunk_idx * len overflows u64; split the IO batch before submission",
    };
    vyre_driver::accounting::checked_mul_u64_lazy(index, stride, overflow)
}
/// Returns the exclusive end (`target_offset + len`) of a read's target range.
///
/// Fails with `QueueFull` if the sum overflows `u64`.
fn checked_target_end(target_offset: u64, len: u32) -> Result<u64, PipelineError> {
    let read_len = u64::from(len);
    let overflow = || PipelineError::QueueFull {
        queue: "submission",
        fix: "target_offset + len overflows u64; split the IO batch before submission",
    };
    vyre_driver::accounting::checked_add_u64_lazy(target_offset, read_len, overflow)
}
/// Adds one to `counter`.
///
/// On `u32` overflow, `counter` is left unchanged and `QueueFull` is returned.
/// `label` only selects the `fix` hint attached to that error.
fn increment_queue_counter(counter: &mut u32, label: &'static str) -> Result<(), PipelineError> {
    let fix = if label == "inflight SQE count" {
        "inflight SQE count overflowed u32; poll completions before submitting more work"
    } else if label == "pending submission count" {
        "pending submission count overflowed u32; flush submissions before queuing more work"
    } else {
        "io_uring queue counter overflowed u32; drain the queue before submitting more work"
    };
    let overflow = PipelineError::QueueFull {
        queue: "submission",
        fix,
    };
    let bumped = vyre_driver::accounting::checked_add_u32_value(*counter, 1, overflow)?;
    *counter = bumped;
    Ok(())
}
/// Subtracts one from `counter`.
///
/// On underflow, `counter` stays at zero and a `QueueFull` error tagged
/// `"completion"` is returned. `label` only selects the `fix` hint.
fn decrement_queue_counter(counter: &mut u32, label: &'static str) -> Result<(), PipelineError> {
    match counter.checked_sub(1) {
        Some(lowered) => {
            *counter = lowered;
            Ok(())
        }
        None => Err(PipelineError::QueueFull {
            queue: "completion",
            fix: if label == "inflight SQE count" {
                "io_uring completion arrived with no inflight SQE; rebuild the stream state"
            } else {
                "io_uring queue counter underflowed; rebuild the stream state"
            },
        }),
    }
}
/// Widens a host `usize` to `u64`.
///
/// Fails with `QueueFull` on platforms where `usize` is wider than 64 bits.
/// `label` only selects the `fix` hint.
fn usize_to_u64(value: usize, label: &'static str) -> Result<u64, PipelineError> {
    let Ok(widened) = u64::try_from(value) else {
        let fix = if label == "chunk index" {
            "chunk index cannot fit u64; split the IO batch before submission"
        } else if label == "mapped GPU buffer length" {
            "mapped GPU buffer length cannot fit u64; split the staging allocation"
        } else {
            "host usize value cannot fit u64; split the IO batch before submission"
        };
        return Err(PipelineError::QueueFull {
            queue: "submission",
            fix,
        });
    };
    Ok(widened)
}
/// Returns the numeric address of `ptr` as the `u64` an SQE field expects.
///
/// `label` only selects the error hint used if the address cannot fit.
fn pointer_addr_u64<T>(ptr: *const T, label: &'static str) -> Result<u64, PipelineError> {
    let address: usize = ptr.addr();
    usize_to_u64(address, label)
}
/// Narrows a `u64` to a host `usize`.
///
/// Fails with `QueueFull` when the value exceeds `usize::MAX`. `label` only
/// selects the `fix` hint.
fn u64_to_usize(value: u64, label: &'static str) -> Result<usize, PipelineError> {
    match usize::try_from(value) {
        Ok(narrowed) => Ok(narrowed),
        Err(_) => {
            let fix = if label == "target offset" {
                "target offset cannot fit usize; split the IO batch before submission"
            } else {
                "u64 value cannot fit usize; split the IO batch before submission"
            };
            Err(PipelineError::QueueFull {
                queue: "submission",
                fix,
            })
        }
    }
}
/// Widens a `u32` to a host `usize`.
///
/// This can only fail on targets where `usize` is narrower than 32 bits. `label`
/// only selects the `fix` hint.
fn u32_to_usize(value: u32, label: &'static str) -> Result<usize, PipelineError> {
    if let Ok(widened) = usize::try_from(value) {
        return Ok(widened);
    }
    let fix = match label {
        "read length" => "read length cannot fit usize; split the IO request before submission",
        _ => "u32 value cannot fit usize; split the IO request before submission",
    };
    Err(PipelineError::QueueFull {
        queue: "submission",
        fix,
    })
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Writes through `as_mut_slice` must land in the backing array.
    #[test]
    fn mapped_slice_roundtrip_is_miri_clean() {
        let mut backing = [1_u8, 2, 3, 4];
        let mut mapped = unsafe { GpuMappedBuffer::from_host_visible_slice(&mut backing) };
        let slice = unsafe { mapped.as_mut_slice() };
        slice[0] = 9;
        slice[3] = 7;
        assert_eq!(backing, [9, 2, 3, 7]);
    }

    /// `sub_region` must accept ranges inside the parent and reject ranges that
    /// overflow or run past its end.
    #[test]
    fn sub_region_stays_inside_parent() {
        let mut backing = [0_u8; 8];
        let mapped = unsafe { GpuMappedBuffer::from_host_visible_slice(&mut backing) };

        let Ok(sub) = mapped.sub_region(2, 4) else {
            panic!("range [2, 6) fits inside an 8-byte buffer");
        };
        assert_eq!(sub.len(), 4);
        assert_eq!(sub.as_ptr(), mapped.as_ptr().wrapping_add(2));

        assert!(mapped.sub_region(0, 8).is_ok());
        assert!(mapped.sub_region(5, 4).is_err());
        assert!(mapped.sub_region(usize::MAX, 2).is_err());
    }

    /// `Iovec` must keep the C `struct iovec` layout: a pointer followed by a
    /// `size_t`.
    #[test]
    fn iovec_matches_c_layout() {
        assert_eq!(
            core::mem::size_of::<Iovec>(),
            2 * core::mem::size_of::<usize>()
        );
        assert_eq!(
            core::mem::align_of::<Iovec>(),
            core::mem::align_of::<usize>()
        );
    }
}