vyre_runtime/uring/stream.rs
//! `AsyncUringStream` - drives io_uring reads into GPU-visible memory
//! and advances the megakernel tail pointer by the number of successful
//! completions reaped on each poll.
3//!
4//! The critical safety contract: every byte read lands in a
5//! [`GpuMappedBuffer`]. Compatibility ingest uses registered
6//! host-visible GPU mappings; canonical native ingest uses BAR1 peer memory
7//! via [`GpuMappedBuffer::from_bar1_peer_with_owner`] plus NVMe passthrough.
8//! The io_uring writer never targets an ordinary userspace bounce buffer.
9
10use super::ring::IoUringState;
11use crate::PipelineError;
12use core::marker::PhantomData;
13use core::sync::atomic::{AtomicU32, Ordering};
14
/// C-layout mirror of the Linux `struct iovec`, used as the scatter list
/// handed to `IORING_OP_READV`.
#[repr(C)]
#[derive(Clone, Copy, Debug)]
pub struct Iovec {
    /// Start address the kernel writes this segment to.
    pub iov_base: *mut core::ffi::c_void,
    /// Number of bytes available starting at `iov_base`.
    pub iov_len: usize,
}
24
/// `IORING_OP_READV` - scatter-read into an array of iovecs.
pub const IORING_OP_READV: u8 = 1;
/// `IORING_OP_READ_FIXED` - read into a pre-registered buffer.
///
/// Linux assigns this opcode the value 4. The value 22 is `IORING_OP_READ`,
/// a plain read that ignores registered buffers.
pub const IORING_OP_READ_FIXED: u8 = 4;
/// `IORING_OP_URING_CMD` - vendor-specific passthrough (NVMe). Kernel 6.0+.
pub const IORING_OP_URING_CMD: u8 = 46;
31
/// Handle to a GPU-visible memory region that io_uring may DMA into.
///
/// Host-visible shared mappings come in through the compatibility
/// constructors. The BAR1 constructor is the native GPUDirect path, where
/// NVMe DMA lands straight in GPU-owned memory.
pub struct GpuMappedBuffer<'a> {
    /// Base address of the mapped region.
    ptr: *mut u8,
    /// Size of the mapped region in bytes.
    len: usize,
    /// Ties the handle to the exclusive (`&'a mut`) borrow taken by the constructor.
    _owner: PhantomData<&'a mut [u8]>,
}
42
43// SAFETY: Send + Sync because (a) the constructor's safety contract
44// requires the caller to commit the lifetime invariant, and (b) the
45// raw pointer is only dereferenced by the kernel via io_uring -
46// vyre-runtime never reads through it directly.
47unsafe impl Send for GpuMappedBuffer<'_> {}
48unsafe impl Sync for GpuMappedBuffer<'_> {}
49
50impl<'a> GpuMappedBuffer<'a> {
51 /// Construct from a borrowed host-visible byte slice.
52 ///
53 /// # Safety
54 ///
55 /// The caller asserts:
56 /// - `slice` aliases a device allocation created with host-visible
57 /// host-shared usage bits by the concrete backend.
58 /// - No other code reads or writes through `slice` while the
59 /// returned handle is alive.
60 pub unsafe fn from_host_visible_slice(slice: &'a mut [u8]) -> Self {
61 Self {
62 ptr: slice.as_mut_ptr(),
63 len: slice.len(),
64 _owner: PhantomData,
65 }
66 }
67
68 /// Construct from a raw pointer + explicit owner anchor.
69 ///
70 /// This is for GPU APIs that hand back a raw mapped pointer plus an
71 /// owning handle rather than a Rust slice. The borrow on `owner`
72 /// forces the mapped region to outlive every derived
73 /// [`AsyncUringStream`].
74 ///
75 /// # Safety
76 ///
77 /// The caller asserts:
78 /// - `ptr` points into a GPU allocation owned by `owner`.
79 /// - The mapped region is `len` bytes long and host-visible.
80 /// - No other code reads or writes through `ptr` while the
81 /// returned handle is alive.
82 pub unsafe fn from_host_visible_owner<O: ?Sized>(
83 _owner: &'a mut O,
84 ptr: *mut u8,
85 len: usize,
86 ) -> Self {
87 Self {
88 ptr,
89 len,
90 _owner: PhantomData,
91 }
92 }
93
94 /// Duplicate the mapped-buffer handle for the same underlying region.
95 ///
96 /// # Safety
97 ///
98 /// The caller must uphold the same aliasing and lifetime guarantees as
99 /// [`GpuMappedBuffer::from_host_visible_slice`]. This does not clone memory;
100 /// it creates another handle to the same mapped bytes.
101 pub unsafe fn duplicate(&self) -> Self {
102 Self {
103 ptr: self.ptr,
104 len: self.len,
105 _owner: PhantomData,
106 }
107 }
108
109 /// Carve out a sub-region of this mapped buffer.
110 ///
111 /// This preserves the original constructor contract: the returned
112 /// handle aliases the same host-visible GPU allocation and carries
113 /// no ownership of its own.
114 ///
115 /// # Errors
116 ///
117 /// Returns [`PipelineError::QueueFull`] when `offset + len`
118 /// exceeds the mapped buffer bounds.
119 pub fn sub_region(&self, offset: usize, len: usize) -> Result<Self, crate::PipelineError> {
120 let _end = vyre_driver::accounting::checked_usize_byte_range_end_lazy(
121 offset,
122 len,
123 self.len,
124 || crate::PipelineError::QueueFull {
125 queue: "submission",
126 fix: "GpuMappedBuffer::sub_region offset + len overflows usize; reduce slot size or enlarge the staging buffer",
127 },
128 |_| crate::PipelineError::QueueFull {
129 queue: "submission",
130 fix: "GpuMappedBuffer::sub_region exceeds the mapped allocation; reduce slot size or enlarge the staging buffer",
131 },
132 )?;
133 Ok(Self {
134 ptr: self.ptr.wrapping_add(offset),
135 len,
136 _owner: PhantomData,
137 })
138 }
139
140 /// Byte length of the mapped region.
141 #[must_use]
142 pub fn len(&self) -> usize {
143 self.len
144 }
145
146 /// Whether the region is empty.
147 #[must_use]
148 pub fn is_empty(&self) -> bool {
149 self.len == 0
150 }
151
152 /// Raw pointer for io_uring submission. Crate-private.
153 pub(crate) fn as_ptr(&self) -> *mut u8 {
154 self.ptr
155 }
156
157 /// Borrow the mapped bytes as a mutable slice.
158 ///
159 /// # Safety
160 ///
161 /// The caller must ensure exclusive mutable access to the region for the
162 /// lifetime of the returned slice.
163 pub unsafe fn as_mut_slice(&mut self) -> &mut [u8] {
164 // SAFETY: Safe FFI / low-level operation verified and audited for Release compliance.
165 unsafe { core::slice::from_raw_parts_mut(self.ptr, self.len) }
166 }
167
168 /// Construct from a PCIe peer-memory pointer
169 /// GPUDirect Storage).
170 ///
171 /// When paired with [`crate::PipelineError::NvmePassthroughDisabled`]
172 /// via the `uring-cmd-nvme` feature, this lets NVMe DMA writes
173 /// land directly in VRAM without crossing the host PCIe bridge.
174 ///
175 /// # Safety
176 ///
177 /// The caller asserts:
178 /// - `peer_ptr` points at a region returned by `nvidia_p2p_get_pages`
179 /// (Linux nvidia-fs) or `cuMemAlloc` + `cuPointerSetAttribute`
180 /// `CU_POINTER_ATTRIBUTE_SYNC_MEMOPS`.
181 /// - The GPU allocation outlives the returned handle.
182 /// - The io_uring kernel and NVMe driver both have DMA-mapping
183 /// capability (verified at runtime by checking
184 /// `/proc/driver/nvidia-fs/stats`).
185 pub unsafe fn from_bar1_peer_with_owner<O: ?Sized>(
186 _owner: &'a mut O,
187 peer_ptr: *mut u8,
188 len: usize,
189 ) -> Self {
190 Self {
191 ptr: peer_ptr,
192 len,
193 _owner: PhantomData,
194 }
195 }
196}
197
198/// Streaming reader that pushes chunked reads into an io_uring SQ and
199/// advances an atomic tail pointer the megakernel observes.
200pub struct AsyncUringStream<'a> {
201 pub(crate) ring_state: IoUringState,
202 pub(crate) gpu_buffer: GpuMappedBuffer<'a>,
203 pub(crate) megakernel_tail: &'a AtomicU32,
204 pub(crate) inflight: u32,
205 pub(crate) pending_submissions: u32,
206}
207
208// SAFETY: raw pointer fields covered by GpuMappedBuffer's contract +
209// the constructor's safety commitment on megakernel_tail_ptr.
210unsafe impl Send for AsyncUringStream<'_> {}
211unsafe impl Sync for AsyncUringStream<'_> {}
212
213impl<'a> AsyncUringStream<'a> {
214 /// Create a stream bound to the given ring state, GPU-mapped
215 /// buffer, and megakernel tail pointer.
216 pub fn new(
217 ring_state: IoUringState,
218 gpu_buffer: GpuMappedBuffer<'a>,
219 megakernel_tail: &'a AtomicU32,
220 ) -> Self {
221 Self {
222 ring_state,
223 gpu_buffer,
224 megakernel_tail,
225 inflight: 0,
226 pending_submissions: 0,
227 }
228 }
229
230 /// Rebind the target mapped buffer for future submissions.
231 pub fn replace_buffer(&mut self, gpu_buffer: GpuMappedBuffer<'a>) {
232 self.gpu_buffer = gpu_buffer;
233 }
234
235 /// Submit a scattered read of `len` bytes at file offset `offset`
236 /// into the slot at `chunk_idx * len` within the GPU buffer.
237 ///
238 /// # Errors
239 ///
240 /// - [`PipelineError::QueueFull`] if the SQ is full OR the
241 /// destination slot exceeds buffer bounds.
242 /// - Range errors surface later as [`PipelineError::IoUringSyscall`]
243 /// on `poll` if the kernel rejects the SQE.
244 ///
245 /// # Safety
246 ///
247 /// `iovs_storage` must live until this SQE's completion is reaped;
248 /// the kernel dereferences `iov_base` at I/O time, not submit time.
249 pub unsafe fn submit_read_to_gpu(
250 &mut self,
251 fd: i32,
252 offset: u64,
253 len: u32,
254 chunk_idx: usize,
255 iovs_storage: &mut [Iovec],
256 ) -> Result<(), PipelineError> {
257 if iovs_storage.is_empty() {
258 return Err(PipelineError::QueueFull {
259 queue: "submission",
260 fix: "caller supplied empty iovs_storage; pass at least one slot",
261 });
262 }
263 let target_offset = checked_chunk_target_offset(chunk_idx, len)?;
264 // SAFETY: Safe FFI / low-level operation verified and audited for Release compliance.
265 unsafe { self.submit_read_to_gpu_at(fd, offset, len, target_offset, iovs_storage) }
266 }
267
268 /// Submit a read directly into a byte offset inside the mapped buffer.
269 ///
270 /// Unlike [`AsyncUringStream::submit_read_to_gpu`], this method does not
271 /// derive the destination from a fixed chunk index. Wrappers that stream
272 /// variable-sized shards can place each read contiguously in a staging
273 /// buffer without being forced into `chunk_idx * len` layout.
274 ///
275 /// # Errors
276 ///
277 /// - [`PipelineError::QueueFull`] if the SQ is full OR the target range
278 /// exceeds the mapped buffer bounds.
279 ///
280 /// # Safety
281 ///
282 /// `iovs_storage` must live until this SQE's completion is reaped.
283 pub unsafe fn submit_read_to_gpu_at(
284 &mut self,
285 fd: i32,
286 offset: u64,
287 len: u32,
288 target_offset: u64,
289 iovs_storage: &mut [Iovec],
290 ) -> Result<(), PipelineError> {
291 // SAFETY: registered fixed buffers + file index are valid for the lifetime
292 // of the ring; the SQE is built on the ring's own SQ slot.
293 unsafe {
294 self.submit_read_to_gpu_at_with_user_data(
295 fd,
296 offset,
297 len,
298 target_offset,
299 target_offset,
300 iovs_storage,
301 )
302 }
303 }
304
305 /// Submit a read into an arbitrary byte offset and preserve caller-defined
306 /// `user_data` for completion correlation.
307 ///
308 /// # Errors
309 ///
310 /// Returns [`PipelineError::QueueFull`] when the SQ is full, the iovec
311 /// storage is empty, or the target range exceeds the mapped GPU buffer.
312 ///
313 /// # Safety
314 ///
315 /// `iovs_storage` must live until this SQE's completion is reaped.
316 pub unsafe fn submit_read_to_gpu_at_with_user_data(
317 &mut self,
318 fd: i32,
319 offset: u64,
320 len: u32,
321 target_offset: u64,
322 user_data: u64,
323 iovs_storage: &mut [Iovec],
324 ) -> Result<(), PipelineError> {
325 if iovs_storage.is_empty() {
326 return Err(PipelineError::QueueFull {
327 queue: "submission",
328 fix: "caller supplied empty iovs_storage; pass at least one slot",
329 });
330 }
331 let end = checked_target_end(target_offset, len)?;
332 let gpu_len = usize_to_u64(self.gpu_buffer.len(), "mapped GPU buffer length")?;
333 if end > gpu_len {
334 return Err(PipelineError::QueueFull {
335 queue: "submission",
336 fix: "target_offset + len exceeds GpuMappedBuffer length; enlarge the buffer or reduce the read size",
337 });
338 }
339
340 let Some(sqe) = self.ring_state.get_sqe() else {
341 return Err(PipelineError::QueueFull {
342 queue: "submission",
343 fix: "SQ full; call AsyncUringStream::poll to drain completions then retry",
344 });
345 };
346
347 // SAFETY: bounds-checked above; writing to a sub-region of
348 // the host-visible GpuMappedBuffer the caller committed.
349 let target_addr = unsafe {
350 self.gpu_buffer
351 .as_ptr()
352 .add(u64_to_usize(target_offset, "target offset")?)
353 };
354
355 iovs_storage[0] = Iovec {
356 iov_base: target_addr.cast::<core::ffi::c_void>(),
357 iov_len: u32_to_usize(len, "read length")?,
358 };
359
360 sqe.opcode = IORING_OP_READV;
361 sqe.fd = fd;
362 sqe.user_data_or_off = offset;
363 sqe.addr = pointer_addr_u64(iovs_storage.as_ptr(), "readv iovec pointer")?;
364 sqe.len = 1;
365 sqe.user_data = user_data;
366
367 self.ring_state.commit_sqe();
368 increment_queue_counter(&mut self.inflight, "inflight SQE count")?;
369 increment_queue_counter(&mut self.pending_submissions, "pending submission count")?;
370
371 Ok(())
372 }
373
374 /// Submit any queued SQEs to the kernel.
375 ///
376 /// SQPOLL can pick up tail updates on its own, but wrappers that rely on
377 /// bounded latency should not depend on the polling thread waking
378 /// promptly. Flushing pending submissions makes progress explicit.
379 pub fn flush_submissions(&mut self) -> Result<(), PipelineError> {
380 if self.pending_submissions == 0 {
381 return Ok(());
382 }
383 if self.ring_state.uses_sqpoll() {
384 if self.ring_state.sq_needs_wakeup() {
385 self.ring_state.wake_sqpoll()?;
386 }
387 } else {
388 self.ring_state.enter(self.pending_submissions, 0, 0)?;
389 }
390 self.pending_submissions = 0;
391 Ok(())
392 }
393
394 /// Reap available completions, advancing the megakernel tail
395 /// pointer once per success. Returns completions reaped.
396 ///
397 /// # Errors
398 ///
399 /// Returns [`PipelineError::IoUringSyscall`] on the first CQE
400 /// reporting `res < 0`. Remaining CQEs are still drained so the
401 /// ring does not overflow, but only the first failure is
402 /// returned - caller re-polls to pick up subsequent errors or
403 /// successes.
404 pub fn poll(&mut self) -> Result<u32, PipelineError> {
405 self.flush_submissions()?;
406 let mut completed: u32 = 0;
407 let mut first_error: Option<PipelineError> = None;
408
409 while let Some(cqe) = self.ring_state.peek_cqe() {
410 let res = cqe.res;
411 self.ring_state.advance_cq();
412 decrement_queue_counter(&mut self.inflight, "inflight SQE count")?;
413
414 if res < 0 {
415 if first_error.is_none() {
416 first_error = Some(PipelineError::IoUringSyscall {
417 syscall: "io_uring_cqe",
418 errno: -res,
419 fix: "inspect user_data to identify the failed SQE; common causes: EIO on disk, EFAULT on bad iovec, EINVAL on misaligned offset",
420 });
421 }
422 continue;
423 }
424
425 // Successful DMA: bytes landed in GPU-visible memory. Tail
426 // publication is batched after CQ drain so one poll with N
427 // completions performs one release atomic instead of N.
428 completed = vyre_driver::accounting::checked_add_u32_value(
429 completed,
430 1,
431 PipelineError::QueueFull {
432 queue: "completion",
433 fix: "io_uring completion count overflowed u32; drain completions more frequently",
434 },
435 )?;
436 }
437
438 if completed != 0 {
439 self.megakernel_tail.fetch_add(completed, Ordering::Release);
440 }
441
442 match first_error {
443 Some(err) => Err(err),
444 None => Ok(completed),
445 }
446 }
447
448 /// Flush pending submissions + wait for at least one completion.
449 ///
450 /// # Errors
451 ///
452 /// Returns [`PipelineError::IoUringSyscall`] if `io_uring_enter`
453 /// fails.
454 pub fn wait_for_completion(&mut self) -> Result<(), PipelineError> {
455 if self.inflight > 0 {
456 self.flush_submissions()?;
457 self.ring_state.enter(0, 1, 1)?;
458 self.poll()?;
459 }
460 Ok(())
461 }
462
463 /// Number of submissions still awaiting completion.
464 #[must_use]
465 pub fn inflight(&self) -> u32 {
466 self.inflight
467 }
468
469 /// Submit an NVMe passthrough command via `IORING_OP_URING_CMD`.
470 /// Requires the `uring-cmd-nvme` feature and Linux kernel 6.0+.
471 ///
472 /// The NVMe SQE layout is encoded by the caller in `nvme_sqe_bytes`
473 /// (64 bytes) - the SQE is memcpy'd into the `addr3`+`addr` slots
474 /// the kernel forwards to the NVMe driver. `user_data` is returned
475 /// on the matching CQE so the caller can correlate completions.
476 ///
477 /// # Errors
478 ///
479 /// - [`PipelineError::NvmePassthroughDisabled`] if the
480 /// `uring-cmd-nvme` feature is not enabled at compile time. This
481 /// variant is unreachable in this cfg-gated method but remains
482 /// part of the public error contract shared with the feature-gated
483 /// implementation.
484 /// - [`PipelineError::QueueFull`] if the SQ is full or the NVMe
485 /// command buffer is malformed (must be exactly 64 bytes).
486 ///
487 /// # Safety
488 ///
489 /// - `fd` must be an open character device the caller has
490 /// `IORING_SETUP_CQE32`-compatible access to (e.g. `/dev/ng0n1`).
491 /// - `nvme_sqe_bytes` must encode a valid NVMe command - kernel
492 /// rejection returns an errno on the CQE, but a forged payload
493 /// can still trigger device-level misbehavior.
494 #[cfg(feature = "uring-cmd-nvme")]
495 pub unsafe fn submit_nvme_passthrough(
496 &mut self,
497 fd: i32,
498 user_data: u64,
499 nvme_sqe_bytes: &[u8],
500 ) -> Result<(), PipelineError> {
501 if nvme_sqe_bytes.len() != 64 {
502 return Err(PipelineError::QueueFull {
503 queue: "submission",
504 fix: "NVMe passthrough SQE must be exactly 64 bytes; see linux/nvme_ioctl.h",
505 });
506 }
507
508 let Some(sqe) = self.ring_state.get_sqe() else {
509 return Err(PipelineError::QueueFull {
510 queue: "submission",
511 fix: "SQ full; call AsyncUringStream::poll to drain completions then retry",
512 });
513 };
514
515 // SAFETY: caller-provided slice is 64 bytes as validated
516 // above; we copy into the 64-byte NVMe passthrough region
517 // the kernel expects (addr + addr3 cover the first 40 bytes;
518 // the remaining 24 live in the SQE's inline payload).
519 let nvme_ptr = nvme_sqe_bytes.as_ptr();
520 sqe.opcode = IORING_OP_URING_CMD;
521 sqe.fd = fd;
522 sqe.user_data_or_off = 0;
523 // `cmd_op` in the first 4 bytes of addr3 (kernel reads it as u32).
524 sqe.addr = pointer_addr_u64(nvme_ptr, "NVMe command pointer")?;
525 sqe.len = 64;
526 sqe.user_data = user_data;
527 // The kernel reads the remaining payload bytes out of addr3
528 // directly; downstream NVMe drivers dereference this pointer.
529 sqe.addr3 = pointer_addr_u64(nvme_ptr, "NVMe command addr3 pointer")?;
530
531 self.ring_state.commit_sqe();
532 increment_queue_counter(&mut self.inflight, "inflight SQE count")?;
533 increment_queue_counter(&mut self.pending_submissions, "pending submission count")?;
534
535 Ok(())
536 }
537
538 /// Submit an `IORING_OP_READ_FIXED` into a pre-registered buffer.
539 ///
540 /// Requires the caller to have previously called
541 /// [`super::ring::IoUringState::register_buffers`] with an iovec
542 /// slice whose entry `buf_index` covers the target range. Because
543 /// the kernel skips per-SQE iovec validation, this path is 20-40%
544 /// lower latency than `submit_read_to_gpu` on hot loops.
545 ///
546 /// # Errors
547 ///
548 /// - [`PipelineError::QueueFull`] if the SQ is full or the
549 /// destination range exceeds the GPU buffer bounds.
550 ///
551 /// # Safety
552 ///
553 /// The `buf_index` must reference a still-registered iovec whose
554 /// region overlaps `chunk_idx * len .. (chunk_idx + 1) * len`
555 /// inside the [`GpuMappedBuffer`]. Mis-indexing produces a kernel
556 /// DMA into the wrong region - silent data corruption.
557 pub unsafe fn submit_read_fixed(
558 &mut self,
559 fd: i32,
560 offset: u64,
561 len: u32,
562 chunk_idx: usize,
563 buf_index: u16,
564 ) -> Result<(), PipelineError> {
565 let target_offset = checked_chunk_target_offset(chunk_idx, len)?;
566 // SAFETY: Safe FFI / low-level operation verified and audited for Release compliance.
567 unsafe {
568 self.submit_read_fixed_at(
569 fd,
570 offset,
571 len,
572 target_offset,
573 buf_index,
574 usize_to_u64(chunk_idx, "chunk index")?,
575 )
576 }
577 }
578
579 /// Submit an `IORING_OP_READ_FIXED` into a registered buffer at an
580 /// explicit destination offset inside the mapped buffer.
581 ///
582 /// Unlike [`AsyncUringStream::submit_read_fixed`], this variant decouples
583 /// the CQE `user_data` from the destination layout so higher-level
584 /// drivers can publish their own slot ids while still using a fixed slot
585 /// stride.
586 ///
587 /// # Errors
588 ///
589 /// - [`PipelineError::QueueFull`] if the SQ is full or the
590 /// destination range exceeds the GPU buffer bounds.
591 ///
592 /// # Safety
593 ///
594 /// `buf_index` must reference a still-registered iovec covering the
595 /// target range, and `user_data` must remain meaningful to the caller
596 /// until the CQE is reaped.
597 pub unsafe fn submit_read_fixed_at(
598 &mut self,
599 fd: i32,
600 offset: u64,
601 len: u32,
602 target_offset: u64,
603 buf_index: u16,
604 user_data: u64,
605 ) -> Result<(), PipelineError> {
606 let end = checked_target_end(target_offset, len)?;
607 let gpu_len = usize_to_u64(self.gpu_buffer.len(), "mapped GPU buffer length")?;
608 if end > gpu_len {
609 return Err(PipelineError::QueueFull {
610 queue: "submission",
611 fix: "chunk_idx * len exceeds GpuMappedBuffer length",
612 });
613 }
614
615 let Some(sqe) = self.ring_state.get_sqe() else {
616 return Err(PipelineError::QueueFull {
617 queue: "submission",
618 fix: "SQ full; call AsyncUringStream::poll to drain completions then retry",
619 });
620 };
621
622 // SAFETY: bounds-checked target address inside the host-visible
623 // GpuMappedBuffer the caller committed at construction.
624 let target_addr = unsafe {
625 self.gpu_buffer
626 .as_ptr()
627 .add(u64_to_usize(target_offset, "target offset")?)
628 };
629
630 sqe.opcode = IORING_OP_READ_FIXED;
631 sqe.fd = fd;
632 sqe.user_data_or_off = offset;
633 sqe.addr = pointer_addr_u64(target_addr, "fixed-read target pointer")?;
634 sqe.len = len;
635 sqe.buf_index = buf_index;
636 sqe.user_data = user_data;
637
638 self.ring_state.commit_sqe();
639 increment_queue_counter(&mut self.inflight, "inflight SQE count")?;
640 increment_queue_counter(&mut self.pending_submissions, "pending submission count")?;
641
642 Ok(())
643 }
644
645 /// Submit a read using a registered-file-table index instead of a
646 /// live fd. Use with
647 /// [`super::ring::IoUringState::register_files`] - avoids the
648 /// per-SQE file refcount bump.
649 ///
650 /// # Errors
651 ///
652 /// Same surface as [`AsyncUringStream::submit_read_to_gpu`].
653 ///
654 /// # Safety
655 ///
656 /// `file_index` must name a still-registered fd.
657 /// `iovs_storage` must outlive the completion. All other
658 /// conditions match `submit_read_to_gpu`.
659 pub unsafe fn submit_read_to_gpu_fixed_file(
660 &mut self,
661 file_index: i32,
662 offset: u64,
663 len: u32,
664 chunk_idx: usize,
665 iovs_storage: &mut [Iovec],
666 ) -> Result<(), PipelineError> {
667 if iovs_storage.is_empty() {
668 return Err(PipelineError::QueueFull {
669 queue: "submission",
670 fix: "caller supplied empty iovs_storage; pass at least one slot",
671 });
672 }
673 let target_offset = checked_chunk_target_offset(chunk_idx, len)?;
674 let end = checked_target_end(target_offset, len)?;
675 let gpu_len = usize_to_u64(self.gpu_buffer.len(), "mapped GPU buffer length")?;
676 if end > gpu_len {
677 return Err(PipelineError::QueueFull {
678 queue: "submission",
679 fix: "chunk_idx * len exceeds GpuMappedBuffer length",
680 });
681 }
682
683 let Some(sqe) = self.ring_state.get_sqe() else {
684 return Err(PipelineError::QueueFull {
685 queue: "submission",
686 fix: "SQ full; call AsyncUringStream::poll to drain completions then retry",
687 });
688 };
689
690 // SAFETY: same invariants as submit_read_to_gpu, plus the
691 // caller committed that file_index is a registered fd.
692 let target_addr = unsafe {
693 self.gpu_buffer
694 .as_ptr()
695 .add(u64_to_usize(target_offset, "target offset")?)
696 };
697 iovs_storage[0] = Iovec {
698 iov_base: target_addr.cast::<core::ffi::c_void>(),
699 iov_len: u32_to_usize(len, "read length")?,
700 };
701
702 sqe.opcode = IORING_OP_READV;
703 sqe.flags = super::ring::IOSQE_FIXED_FILE;
704 sqe.fd = file_index;
705 sqe.user_data_or_off = offset;
706 sqe.addr = pointer_addr_u64(iovs_storage.as_ptr(), "fixed-file readv iovec pointer")?;
707 sqe.len = 1;
708 sqe.user_data = usize_to_u64(chunk_idx, "chunk index")?;
709
710 self.ring_state.commit_sqe();
711 increment_queue_counter(&mut self.inflight, "inflight SQE count")?;
712 increment_queue_counter(&mut self.pending_submissions, "pending submission count")?;
713
714 Ok(())
715 }
716
717 /// Disabled-feature implementation for NVMe passthrough. Always returns
718 /// [`PipelineError::NvmePassthroughDisabled`] so callers get a
719 /// structured error rather than a link failure.
720 #[cfg(not(feature = "uring-cmd-nvme"))]
721 #[allow(clippy::unused_self, clippy::missing_safety_doc)]
722 pub unsafe fn submit_nvme_passthrough(
723 &mut self,
724 _fd: i32,
725 _user_data: u64,
726 _nvme_sqe_bytes: &[u8],
727 ) -> Result<(), PipelineError> {
728 Err(PipelineError::NvmePassthroughDisabled)
729 }
730}
731
732
733fn checked_chunk_target_offset(chunk_idx: usize, len: u32) -> Result<u64, PipelineError> {
734 let chunk_idx = usize_to_u64(chunk_idx, "chunk index")?;
735 vyre_driver::accounting::checked_mul_u64_lazy(chunk_idx, u64::from(len), || {
736 PipelineError::QueueFull {
737 queue: "submission",
738 fix: "chunk_idx * len overflows u64; split the IO batch before submission",
739 }
740 })
741}
742
743fn checked_target_end(target_offset: u64, len: u32) -> Result<u64, PipelineError> {
744 vyre_driver::accounting::checked_add_u64_lazy(target_offset, u64::from(len), || {
745 PipelineError::QueueFull {
746 queue: "submission",
747 fix: "target_offset + len overflows u64; split the IO batch before submission",
748 }
749 })
750}
751
752fn increment_queue_counter(counter: &mut u32, label: &'static str) -> Result<(), PipelineError> {
753 *counter = vyre_driver::accounting::checked_add_u32_value(
754 *counter,
755 1,
756 PipelineError::QueueFull {
757 queue: "submission",
758 fix: match label {
759 "inflight SQE count" => {
760 "inflight SQE count overflowed u32; poll completions before submitting more work"
761 }
762 "pending submission count" => {
763 "pending submission count overflowed u32; flush submissions before queuing more work"
764 }
765 _ => {
766 "io_uring queue counter overflowed u32; drain the queue before submitting more work"
767 }
768 },
769 },
770 )?;
771 Ok(())
772}
773
774fn decrement_queue_counter(counter: &mut u32, label: &'static str) -> Result<(), PipelineError> {
775 *counter = counter.checked_sub(1).ok_or(PipelineError::QueueFull {
776 queue: "completion",
777 fix: match label {
778 "inflight SQE count" => {
779 "io_uring completion arrived with no inflight SQE; rebuild the stream state"
780 }
781 _ => "io_uring queue counter underflowed; rebuild the stream state",
782 },
783 })?;
784 Ok(())
785}
786
787fn usize_to_u64(value: usize, label: &'static str) -> Result<u64, PipelineError> {
788 u64::try_from(value).map_err(|_| PipelineError::QueueFull {
789 queue: "submission",
790 fix: match label {
791 "chunk index" => "chunk index cannot fit u64; split the IO batch before submission",
792 "mapped GPU buffer length" => {
793 "mapped GPU buffer length cannot fit u64; split the staging allocation"
794 }
795 _ => "host usize value cannot fit u64; split the IO batch before submission",
796 },
797 })
798}
799
800fn pointer_addr_u64<T>(ptr: *const T, label: &'static str) -> Result<u64, PipelineError> {
801 usize_to_u64(ptr.addr(), label)
802}
803
804fn u64_to_usize(value: u64, label: &'static str) -> Result<usize, PipelineError> {
805 usize::try_from(value).map_err(|_| PipelineError::QueueFull {
806 queue: "submission",
807 fix: match label {
808 "target offset" => {
809 "target offset cannot fit usize; split the IO batch before submission"
810 }
811 _ => "u64 value cannot fit usize; split the IO batch before submission",
812 },
813 })
814}
815
816fn u32_to_usize(value: u32, label: &'static str) -> Result<usize, PipelineError> {
817 usize::try_from(value).map_err(|_| PipelineError::QueueFull {
818 queue: "submission",
819 fix: match label {
820 "read length" => "read length cannot fit usize; split the IO request before submission",
821 _ => "u32 value cannot fit usize; split the IO request before submission",
822 },
823 })
824}
825
#[cfg(test)]
mod tests {
    use super::*;

    /// Writes through `as_mut_slice` must land in the backing storage, with
    /// no aliasing violations under Miri.
    #[test]
    fn mapped_slice_roundtrip_is_miri_clean() {
        let mut storage = [1_u8, 2, 3, 4];
        {
            // SAFETY: `storage` is reachable only through `handle` inside this scope.
            let mut handle = unsafe { GpuMappedBuffer::from_host_visible_slice(&mut storage) };
            // SAFETY: `handle` is the sole accessor of `storage` while `bytes` lives.
            let bytes = unsafe { handle.as_mut_slice() };
            bytes[0] = 9;
            bytes[3] = 7;
        }
        assert_eq!(storage, [9, 2, 3, 7]);
    }
}
842