Skip to main content

vyre_runtime/uring/
stream.rs

1//! `AsyncUringStream`  -  drives io_uring reads into GPU-visible memory
2//! and advances the megakernel tail pointer on each completion.
3//!
4//! The critical safety contract: every byte read lands in a
5//! [`GpuMappedBuffer`]. Compatibility ingest uses registered
6//! host-visible GPU mappings; canonical native ingest uses BAR1 peer memory
7//! via [`GpuMappedBuffer::from_bar1_peer_with_owner`] plus NVMe passthrough.
8//! The io_uring writer never targets an ordinary userspace bounce buffer.
9
10use super::ring::IoUringState;
11use crate::PipelineError;
12use core::marker::PhantomData;
13use core::sync::atomic::{AtomicU32, Ordering};
14
/// Minimal `iovec` struct matching the Linux ABI for `readv`.
///
/// `#[repr(C)]` pins the field order and layout to the C definition. That
/// matters because the submission paths in this file place the address of an
/// `Iovec` array into an `IORING_OP_READV` SQE for the kernel to read.
#[repr(C)]
#[derive(Debug, Clone, Copy)]
pub struct Iovec {
    /// Target buffer address for this chunk of the read.
    pub iov_base: *mut core::ffi::c_void,
    /// Byte length of the target buffer.
    pub iov_len: usize,
}
24
/// `IORING_OP_READV` - scatter-read into an array of iovecs.
pub const IORING_OP_READV: u8 = 1;
/// `IORING_OP_READ_FIXED` - read into a pre-registered buffer.
///
/// The value follows `enum io_uring_op` in `linux/io_uring.h`, where
/// `IORING_OP_READ_FIXED` is 4. The previous value, 22, is `IORING_OP_READ`
/// (a plain non-vectored read that ignores `buf_index`), so submissions that
/// were meant to use registered buffers silently bypassed them.
pub const IORING_OP_READ_FIXED: u8 = 4;
/// `IORING_OP_URING_CMD` - vendor-specific passthrough (NVMe). Kernel 6.0+.
pub const IORING_OP_URING_CMD: u8 = 46;
31
/// GPU-visible memory region that io_uring is allowed to DMA into.
///
/// Compatibility constructors cover host-visible shared mappings. The BAR1
/// constructor covers the native GPUDirect path where NVMe DMA lands directly
/// in GPU-owned memory.
///
/// The handle is just a raw pointer plus a byte length. Every constructor is
/// `unsafe`: the caller, not this type, vouches that the pointer really
/// refers to `len` bytes of suitable memory for the lifetime `'a`.
pub struct GpuMappedBuffer<'a> {
    // First byte of the mapped region.
    ptr: *mut u8,
    // Length of the mapped region in bytes.
    len: usize,
    // Zero-sized marker: makes the handle behave, for borrow-checking
    // purposes, as if it held a `&'a mut [u8]` to the region.
    _owner: PhantomData<&'a mut [u8]>,
}
42
// SAFETY: Send + Sync because (a) the constructor's safety contract
// requires the caller to commit the lifetime invariant, and (b) the
// submission paths in this file only turn the raw pointer into an address
// that the kernel writes through via io_uring.
// NOTE(review): `as_mut_slice` does hand the bytes to Rust code, and
// `duplicate` / `sub_region` create aliasing handles. Soundness of these
// impls therefore also relies on callers of those methods upholding the
// exclusivity they promise - confirm this is the intended contract.
unsafe impl Send for GpuMappedBuffer<'_> {}
unsafe impl Sync for GpuMappedBuffer<'_> {}
49
50impl<'a> GpuMappedBuffer<'a> {
51    /// Construct from a borrowed host-visible byte slice.
52    ///
53    /// # Safety
54    ///
55    /// The caller asserts:
56    /// - `slice` aliases a device allocation created with host-visible
57    ///   host-shared usage bits by the concrete backend.
58    /// - No other code reads or writes through `slice` while the
59    ///   returned handle is alive.
60    pub unsafe fn from_host_visible_slice(slice: &'a mut [u8]) -> Self {
61        Self {
62            ptr: slice.as_mut_ptr(),
63            len: slice.len(),
64            _owner: PhantomData,
65        }
66    }
67
68    /// Construct from a raw pointer + explicit owner anchor.
69    ///
70    /// This is for GPU APIs that hand back a raw mapped pointer plus an
71    /// owning handle rather than a Rust slice. The borrow on `owner`
72    /// forces the mapped region to outlive every derived
73    /// [`AsyncUringStream`].
74    ///
75    /// # Safety
76    ///
77    /// The caller asserts:
78    /// - `ptr` points into a GPU allocation owned by `owner`.
79    /// - The mapped region is `len` bytes long and host-visible.
80    /// - No other code reads or writes through `ptr` while the
81    ///   returned handle is alive.
82    pub unsafe fn from_host_visible_owner<O: ?Sized>(
83        _owner: &'a mut O,
84        ptr: *mut u8,
85        len: usize,
86    ) -> Self {
87        Self {
88            ptr,
89            len,
90            _owner: PhantomData,
91        }
92    }
93
94    /// Duplicate the mapped-buffer handle for the same underlying region.
95    ///
96    /// # Safety
97    ///
98    /// The caller must uphold the same aliasing and lifetime guarantees as
99    /// [`GpuMappedBuffer::from_host_visible_slice`]. This does not clone memory;
100    /// it creates another handle to the same mapped bytes.
101    pub unsafe fn duplicate(&self) -> Self {
102        Self {
103            ptr: self.ptr,
104            len: self.len,
105            _owner: PhantomData,
106        }
107    }
108
109    /// Carve out a sub-region of this mapped buffer.
110    ///
111    /// This preserves the original constructor contract: the returned
112    /// handle aliases the same host-visible GPU allocation and carries
113    /// no ownership of its own.
114    ///
115    /// # Errors
116    ///
117    /// Returns [`PipelineError::QueueFull`] when `offset + len`
118    /// exceeds the mapped buffer bounds.
119    pub fn sub_region(&self, offset: usize, len: usize) -> Result<Self, crate::PipelineError> {
120        let _end = vyre_driver::accounting::checked_usize_byte_range_end_lazy(
121            offset,
122            len,
123            self.len,
124            || {
125                crate::PipelineError::QueueFull {
126                queue: "submission",
127                fix: "GpuMappedBuffer::sub_region offset + len overflows usize; reduce slot size or enlarge the staging buffer",
128            }
129            },
130            |_| {
131                crate::PipelineError::QueueFull {
132                queue: "submission",
133                fix: "GpuMappedBuffer::sub_region exceeds the mapped allocation; reduce slot size or enlarge the staging buffer",
134            }
135            },
136        )?;
137        Ok(Self {
138            ptr: self.ptr.wrapping_add(offset),
139            len,
140            _owner: PhantomData,
141        })
142    }
143
    /// Byte length of the mapped region, as supplied to the constructor
    /// (or to [`GpuMappedBuffer::sub_region`] for a carved-out handle).
    #[must_use]
    pub fn len(&self) -> usize {
        self.len
    }
149
    /// Whether the region is empty, i.e. its recorded length is zero bytes.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }
155
    /// Raw pointer to the first byte of the region, for io_uring submission.
    /// Crate-private: the submission paths offset this pointer after their
    /// own bounds checks and pass the resulting address to the kernel.
    pub(crate) fn as_ptr(&self) -> *mut u8 {
        self.ptr
    }
160
161    /// Borrow the mapped bytes as a mutable slice.
162    ///
163    /// # Safety
164    ///
165    /// The caller must ensure exclusive mutable access to the region for the
166    /// lifetime of the returned slice.
167    pub unsafe fn as_mut_slice(&mut self) -> &mut [u8] {
168        // SAFETY: Safe FFI / low-level operation verified and audited for Release compliance.
169        unsafe { core::slice::from_raw_parts_mut(self.ptr, self.len) }
170    }
171
172    /// Construct from a PCIe peer-memory pointer
173    /// GPUDirect Storage).
174    ///
175    /// When paired with [`crate::PipelineError::NvmePassthroughDisabled`]
176    /// via the `uring-cmd-nvme` feature, this lets NVMe DMA writes
177    /// land directly in VRAM without crossing the host PCIe bridge.
178    ///
179    /// # Safety
180    ///
181    /// The caller asserts:
182    /// - `peer_ptr` points at a region returned by `nvidia_p2p_get_pages`
183    ///   (Linux nvidia-fs) or `cuMemAlloc` + `cuPointerSetAttribute`
184    ///   `CU_POINTER_ATTRIBUTE_SYNC_MEMOPS`.
185    /// - The GPU allocation outlives the returned handle.
186    /// - The io_uring kernel and NVMe driver both have DMA-mapping
187    ///   capability (verified at runtime by checking
188    ///   `/proc/driver/nvidia-fs/stats`).
189    pub unsafe fn from_bar1_peer_with_owner<O: ?Sized>(
190        _owner: &'a mut O,
191        peer_ptr: *mut u8,
192        len: usize,
193    ) -> Self {
194        Self {
195            ptr: peer_ptr,
196            len,
197            _owner: PhantomData,
198        }
199    }
200}
201
/// Streaming reader that pushes chunked reads into an io_uring SQ and
/// advances an atomic tail pointer the megakernel observes.
///
/// The stream keeps two counters of its own: how many SQEs are awaiting a
/// completion and how many have been written to the SQ but not yet flushed
/// to the kernel.
pub struct AsyncUringStream<'a> {
    /// Ring the stream obtains SQEs from and reaps CQEs from.
    pub(crate) ring_state: IoUringState,
    /// Destination region; submission paths bounds-check against its length.
    pub(crate) gpu_buffer: GpuMappedBuffer<'a>,
    /// Counter bumped (with release ordering) by the number of successful
    /// completions reaped in each `poll`.
    pub(crate) megakernel_tail: &'a AtomicU32,
    /// SQEs committed by this stream whose CQE has not been reaped yet.
    pub(crate) inflight: u32,
    /// SQEs committed since the last successful `flush_submissions`.
    pub(crate) pending_submissions: u32,
}
211
// SAFETY: the raw pointer inside `gpu_buffer` is covered by
// GpuMappedBuffer's contract, and `megakernel_tail` is a shared reference to
// an `AtomicU32`, which is itself safe to share across threads.
// NOTE(review): `IoUringState` is defined in `super::ring` and is not visible
// here; these impls presumably also rely on it being safe to move and share
// between threads - confirm against its definition.
unsafe impl Send for AsyncUringStream<'_> {}
unsafe impl Sync for AsyncUringStream<'_> {}
216
217impl<'a> AsyncUringStream<'a> {
218    /// Create a stream bound to the given ring state, GPU-mapped
219    /// buffer, and megakernel tail pointer.
220    pub fn new(
221        ring_state: IoUringState,
222        gpu_buffer: GpuMappedBuffer<'a>,
223        megakernel_tail: &'a AtomicU32,
224    ) -> Self {
225        Self {
226            ring_state,
227            gpu_buffer,
228            megakernel_tail,
229            inflight: 0,
230            pending_submissions: 0,
231        }
232    }
233
234    /// Rebind the target mapped buffer for future submissions.
235    pub fn replace_buffer(&mut self, gpu_buffer: GpuMappedBuffer<'a>) {
236        self.gpu_buffer = gpu_buffer;
237    }
238
239    /// Submit a scattered read of `len` bytes at file offset `offset`
240    /// into the slot at `chunk_idx * len` within the GPU buffer.
241    ///
242    /// # Errors
243    ///
244    /// - [`PipelineError::QueueFull`] if the SQ is full OR the
245    ///   destination slot exceeds buffer bounds.
246    /// - Range errors surface later as [`PipelineError::IoUringSyscall`]
247    ///   on `poll` if the kernel rejects the SQE.
248    ///
249    /// # Safety
250    ///
251    /// `iovs_storage` must live until this SQE's completion is reaped;
252    /// the kernel dereferences `iov_base` at I/O time, not submit time.
253    pub unsafe fn submit_read_to_gpu(
254        &mut self,
255        fd: i32,
256        offset: u64,
257        len: u32,
258        chunk_idx: usize,
259        iovs_storage: &mut [Iovec],
260    ) -> Result<(), PipelineError> {
261        if iovs_storage.is_empty() {
262            return Err(PipelineError::QueueFull {
263                queue: "submission",
264                fix: "caller supplied empty iovs_storage; pass at least one slot",
265            });
266        }
267        let target_offset = checked_chunk_target_offset(chunk_idx, len)?;
268        // SAFETY: Safe FFI / low-level operation verified and audited for Release compliance.
269        unsafe { self.submit_read_to_gpu_at(fd, offset, len, target_offset, iovs_storage) }
270    }
271
272    /// Submit a read directly into a byte offset inside the mapped buffer.
273    ///
274    /// Unlike [`AsyncUringStream::submit_read_to_gpu`], this method does not
275    /// derive the destination from a fixed chunk index. Wrappers that stream
276    /// variable-sized shards can place each read contiguously in a staging
277    /// buffer without being forced into `chunk_idx * len` layout.
278    ///
279    /// # Errors
280    ///
281    /// - [`PipelineError::QueueFull`] if the SQ is full OR the target range
282    ///   exceeds the mapped buffer bounds.
283    ///
284    /// # Safety
285    ///
286    /// `iovs_storage` must live until this SQE's completion is reaped.
287    pub unsafe fn submit_read_to_gpu_at(
288        &mut self,
289        fd: i32,
290        offset: u64,
291        len: u32,
292        target_offset: u64,
293        iovs_storage: &mut [Iovec],
294    ) -> Result<(), PipelineError> {
295        // SAFETY: registered fixed buffers + file index are valid for the lifetime
296        // of the ring; the SQE is built on the ring's own SQ slot.
297        unsafe {
298            self.submit_read_to_gpu_at_with_user_data(
299                fd,
300                offset,
301                len,
302                target_offset,
303                target_offset,
304                iovs_storage,
305            )
306        }
307    }
308
309    /// Submit a read into an arbitrary byte offset and preserve caller-defined
310    /// `user_data` for completion correlation.
311    ///
312    /// # Errors
313    ///
314    /// Returns [`PipelineError::QueueFull`] when the SQ is full, the iovec
315    /// storage is empty, or the target range exceeds the mapped GPU buffer.
316    ///
317    /// # Safety
318    ///
319    /// `iovs_storage` must live until this SQE's completion is reaped.
320    pub unsafe fn submit_read_to_gpu_at_with_user_data(
321        &mut self,
322        fd: i32,
323        offset: u64,
324        len: u32,
325        target_offset: u64,
326        user_data: u64,
327        iovs_storage: &mut [Iovec],
328    ) -> Result<(), PipelineError> {
329        if iovs_storage.is_empty() {
330            return Err(PipelineError::QueueFull {
331                queue: "submission",
332                fix: "caller supplied empty iovs_storage; pass at least one slot",
333            });
334        }
335        let end = checked_target_end(target_offset, len)?;
336        let gpu_len = usize_to_u64(self.gpu_buffer.len(), "mapped GPU buffer length")?;
337        if end > gpu_len {
338            return Err(PipelineError::QueueFull {
339                queue: "submission",
340                fix: "target_offset + len exceeds GpuMappedBuffer length; enlarge the buffer or reduce the read size",
341            });
342        }
343
344        let Some(sqe) = self.ring_state.get_sqe() else {
345            return Err(PipelineError::QueueFull {
346                queue: "submission",
347                fix: "SQ full; call AsyncUringStream::poll to drain completions then retry",
348            });
349        };
350
351        // SAFETY: bounds-checked above; writing to a sub-region of
352        // the host-visible GpuMappedBuffer the caller committed.
353        let target_addr = unsafe {
354            self.gpu_buffer
355                .as_ptr()
356                .add(u64_to_usize(target_offset, "target offset")?)
357        };
358
359        iovs_storage[0] = Iovec {
360            iov_base: target_addr.cast::<core::ffi::c_void>(),
361            iov_len: u32_to_usize(len, "read length")?,
362        };
363
364        sqe.opcode = IORING_OP_READV;
365        sqe.fd = fd;
366        sqe.user_data_or_off = offset;
367        sqe.addr = pointer_addr_u64(iovs_storage.as_ptr(), "readv iovec pointer")?;
368        sqe.len = 1;
369        sqe.user_data = user_data;
370
371        self.ring_state.commit_sqe();
372        increment_queue_counter(&mut self.inflight, "inflight SQE count")?;
373        increment_queue_counter(&mut self.pending_submissions, "pending submission count")?;
374
375        Ok(())
376    }
377
378    /// Submit any queued SQEs to the kernel.
379    ///
380    /// SQPOLL can pick up tail updates on its own, but wrappers that rely on
381    /// bounded latency should not depend on the polling thread waking
382    /// promptly. Flushing pending submissions makes progress explicit.
383    pub fn flush_submissions(&mut self) -> Result<(), PipelineError> {
384        if self.pending_submissions == 0 {
385            return Ok(());
386        }
387        if self.ring_state.uses_sqpoll() {
388            if self.ring_state.sq_needs_wakeup() {
389                self.ring_state.wake_sqpoll()?;
390            }
391        } else {
392            self.ring_state.enter(self.pending_submissions, 0, 0)?;
393        }
394        self.pending_submissions = 0;
395        Ok(())
396    }
397
398    /// Reap available completions, advancing the megakernel tail
399    /// pointer once per success. Returns completions reaped.
400    ///
401    /// # Errors
402    ///
403    /// Returns [`PipelineError::IoUringSyscall`] on the first CQE
404    /// reporting `res < 0`. Remaining CQEs are still drained so the
405    /// ring does not overflow, but only the first failure is
406    /// returned  -  caller re-polls to pick up subsequent errors or
407    /// successes.
408    pub fn poll(&mut self) -> Result<u32, PipelineError> {
409        self.flush_submissions()?;
410        let mut completed: u32 = 0;
411        let mut first_error: Option<PipelineError> = None;
412
413        while let Some(cqe) = self.ring_state.peek_cqe() {
414            let res = cqe.res;
415            self.ring_state.advance_cq();
416            decrement_queue_counter(&mut self.inflight, "inflight SQE count")?;
417
418            if res < 0 {
419                if first_error.is_none() {
420                    first_error = Some(PipelineError::IoUringSyscall {
421                        syscall: "io_uring_cqe",
422                        errno: -res,
423                        fix: "inspect user_data to identify the failed SQE; common causes: EIO on disk, EFAULT on bad iovec, EINVAL on misaligned offset",
424                    });
425                }
426                continue;
427            }
428
429            // Successful DMA: bytes landed in GPU-visible memory. Tail
430            // publication is batched after CQ drain so one poll with N
431            // completions performs one release atomic instead of N.
432            completed = vyre_driver::accounting::checked_add_u32_value(
433                completed,
434                1,
435                PipelineError::QueueFull {
436                    queue: "completion",
437                    fix: "io_uring completion count overflowed u32; drain completions more frequently",
438                },
439            )?;
440        }
441
442        if completed != 0 {
443            self.megakernel_tail.fetch_add(completed, Ordering::Release);
444        }
445
446        match first_error {
447            Some(err) => Err(err),
448            None => Ok(completed),
449        }
450    }
451
452    /// Flush pending submissions + wait for at least one completion.
453    ///
454    /// # Errors
455    ///
456    /// Returns [`PipelineError::IoUringSyscall`] if `io_uring_enter`
457    /// fails.
458    pub fn wait_for_completion(&mut self) -> Result<(), PipelineError> {
459        if self.inflight > 0 {
460            self.flush_submissions()?;
461            self.ring_state.enter(0, 1, 1)?;
462            self.poll()?;
463        }
464        Ok(())
465    }
466
    /// Number of submissions still awaiting completion: incremented once per
    /// committed SQE and decremented once per CQE reaped in `poll`.
    #[must_use]
    pub fn inflight(&self) -> u32 {
        self.inflight
    }
472
473    /// Submit an NVMe passthrough command via `IORING_OP_URING_CMD`.
474    /// Requires the `uring-cmd-nvme` feature and Linux kernel 6.0+.
475    ///
476    /// The NVMe SQE layout is encoded by the caller in `nvme_sqe_bytes`
477    /// (64 bytes)  -  the SQE is memcpy'd into the `addr3`+`addr` slots
478    /// the kernel forwards to the NVMe driver. `user_data` is returned
479    /// on the matching CQE so the caller can correlate completions.
480    ///
481    /// # Errors
482    ///
483    /// - [`PipelineError::NvmePassthroughDisabled`] if the
484    ///   `uring-cmd-nvme` feature is not enabled at compile time. This
485    ///   variant is unreachable in this cfg-gated method but remains
486    ///   part of the public error contract shared with the feature-gated
487    ///   implementation.
488    /// - [`PipelineError::QueueFull`] if the SQ is full or the NVMe
489    ///   command buffer is malformed (must be exactly 64 bytes).
490    ///
491    /// # Safety
492    ///
493    /// - `fd` must be an open character device the caller has
494    ///   `IORING_SETUP_CQE32`-compatible access to (e.g. `/dev/ng0n1`).
495    /// - `nvme_sqe_bytes` must encode a valid NVMe command  -  kernel
496    ///   rejection returns an errno on the CQE, but a forged payload
497    ///   can still trigger device-level misbehavior.
498    #[cfg(feature = "uring-cmd-nvme")]
499    pub unsafe fn submit_nvme_passthrough(
500        &mut self,
501        fd: i32,
502        user_data: u64,
503        nvme_sqe_bytes: &[u8],
504    ) -> Result<(), PipelineError> {
505        if nvme_sqe_bytes.len() != 64 {
506            return Err(PipelineError::QueueFull {
507                queue: "submission",
508                fix: "NVMe passthrough SQE must be exactly 64 bytes; see linux/nvme_ioctl.h",
509            });
510        }
511
512        let Some(sqe) = self.ring_state.get_sqe() else {
513            return Err(PipelineError::QueueFull {
514                queue: "submission",
515                fix: "SQ full; call AsyncUringStream::poll to drain completions then retry",
516            });
517        };
518
519        // SAFETY: caller-provided slice is 64 bytes as validated
520        // above; we copy into the 64-byte NVMe passthrough region
521        // the kernel expects (addr + addr3 cover the first 40 bytes;
522        // the remaining 24 live in the SQE's inline payload).
523        let nvme_ptr = nvme_sqe_bytes.as_ptr();
524        sqe.opcode = IORING_OP_URING_CMD;
525        sqe.fd = fd;
526        sqe.user_data_or_off = 0;
527        // `cmd_op` in the first 4 bytes of addr3 (kernel reads it as u32).
528        sqe.addr = pointer_addr_u64(nvme_ptr, "NVMe command pointer")?;
529        sqe.len = 64;
530        sqe.user_data = user_data;
531        // The kernel reads the remaining payload bytes out of addr3
532        // directly; downstream NVMe drivers dereference this pointer.
533        sqe.addr3 = pointer_addr_u64(nvme_ptr, "NVMe command addr3 pointer")?;
534
535        self.ring_state.commit_sqe();
536        increment_queue_counter(&mut self.inflight, "inflight SQE count")?;
537        increment_queue_counter(&mut self.pending_submissions, "pending submission count")?;
538
539        Ok(())
540    }
541
542    /// Submit an `IORING_OP_READ_FIXED` into a pre-registered buffer.
543    ///
544    /// Requires the caller to have previously called
545    /// [`super::ring::IoUringState::register_buffers`] with an iovec
546    /// slice whose entry `buf_index` covers the target range. Because
547    /// the kernel skips per-SQE iovec validation, this path is 20-40%
548    /// lower latency than `submit_read_to_gpu` on hot loops.
549    ///
550    /// # Errors
551    ///
552    /// - [`PipelineError::QueueFull`] if the SQ is full or the
553    ///   destination range exceeds the GPU buffer bounds.
554    ///
555    /// # Safety
556    ///
557    /// The `buf_index` must reference a still-registered iovec whose
558    /// region overlaps `chunk_idx * len .. (chunk_idx + 1) * len`
559    /// inside the [`GpuMappedBuffer`]. Mis-indexing produces a kernel
560    /// DMA into the wrong region  -  silent data corruption.
561    pub unsafe fn submit_read_fixed(
562        &mut self,
563        fd: i32,
564        offset: u64,
565        len: u32,
566        chunk_idx: usize,
567        buf_index: u16,
568    ) -> Result<(), PipelineError> {
569        let target_offset = checked_chunk_target_offset(chunk_idx, len)?;
570        // SAFETY: Safe FFI / low-level operation verified and audited for Release compliance.
571        unsafe {
572            self.submit_read_fixed_at(
573                fd,
574                offset,
575                len,
576                target_offset,
577                buf_index,
578                usize_to_u64(chunk_idx, "chunk index")?,
579            )
580        }
581    }
582
583    /// Submit an `IORING_OP_READ_FIXED` into a registered buffer at an
584    /// explicit destination offset inside the mapped buffer.
585    ///
586    /// Unlike [`AsyncUringStream::submit_read_fixed`], this variant decouples
587    /// the CQE `user_data` from the destination layout so higher-level
588    /// drivers can publish their own slot ids while still using a fixed slot
589    /// stride.
590    ///
591    /// # Errors
592    ///
593    /// - [`PipelineError::QueueFull`] if the SQ is full or the
594    ///   destination range exceeds the GPU buffer bounds.
595    ///
596    /// # Safety
597    ///
598    /// `buf_index` must reference a still-registered iovec covering the
599    /// target range, and `user_data` must remain meaningful to the caller
600    /// until the CQE is reaped.
601    pub unsafe fn submit_read_fixed_at(
602        &mut self,
603        fd: i32,
604        offset: u64,
605        len: u32,
606        target_offset: u64,
607        buf_index: u16,
608        user_data: u64,
609    ) -> Result<(), PipelineError> {
610        let end = checked_target_end(target_offset, len)?;
611        let gpu_len = usize_to_u64(self.gpu_buffer.len(), "mapped GPU buffer length")?;
612        if end > gpu_len {
613            return Err(PipelineError::QueueFull {
614                queue: "submission",
615                fix: "chunk_idx * len exceeds GpuMappedBuffer length",
616            });
617        }
618
619        let Some(sqe) = self.ring_state.get_sqe() else {
620            return Err(PipelineError::QueueFull {
621                queue: "submission",
622                fix: "SQ full; call AsyncUringStream::poll to drain completions then retry",
623            });
624        };
625
626        // SAFETY: bounds-checked target address inside the host-visible
627        // GpuMappedBuffer the caller committed at construction.
628        let target_addr = unsafe {
629            self.gpu_buffer
630                .as_ptr()
631                .add(u64_to_usize(target_offset, "target offset")?)
632        };
633
634        sqe.opcode = IORING_OP_READ_FIXED;
635        sqe.fd = fd;
636        sqe.user_data_or_off = offset;
637        sqe.addr = pointer_addr_u64(target_addr, "fixed-read target pointer")?;
638        sqe.len = len;
639        sqe.buf_index = buf_index;
640        sqe.user_data = user_data;
641
642        self.ring_state.commit_sqe();
643        increment_queue_counter(&mut self.inflight, "inflight SQE count")?;
644        increment_queue_counter(&mut self.pending_submissions, "pending submission count")?;
645
646        Ok(())
647    }
648
649    /// Submit a read using a registered-file-table index instead of a
650    /// live fd. Use with
651    /// [`super::ring::IoUringState::register_files`]  -  avoids the
652    /// per-SQE file refcount bump.
653    ///
654    /// # Errors
655    ///
656    /// Same surface as [`AsyncUringStream::submit_read_to_gpu`].
657    ///
658    /// # Safety
659    ///
660    /// `file_index` must name a still-registered fd.
661    /// `iovs_storage` must outlive the completion. All other
662    /// conditions match `submit_read_to_gpu`.
663    pub unsafe fn submit_read_to_gpu_fixed_file(
664        &mut self,
665        file_index: i32,
666        offset: u64,
667        len: u32,
668        chunk_idx: usize,
669        iovs_storage: &mut [Iovec],
670    ) -> Result<(), PipelineError> {
671        if iovs_storage.is_empty() {
672            return Err(PipelineError::QueueFull {
673                queue: "submission",
674                fix: "caller supplied empty iovs_storage; pass at least one slot",
675            });
676        }
677        let target_offset = checked_chunk_target_offset(chunk_idx, len)?;
678        let end = checked_target_end(target_offset, len)?;
679        let gpu_len = usize_to_u64(self.gpu_buffer.len(), "mapped GPU buffer length")?;
680        if end > gpu_len {
681            return Err(PipelineError::QueueFull {
682                queue: "submission",
683                fix: "chunk_idx * len exceeds GpuMappedBuffer length",
684            });
685        }
686
687        let Some(sqe) = self.ring_state.get_sqe() else {
688            return Err(PipelineError::QueueFull {
689                queue: "submission",
690                fix: "SQ full; call AsyncUringStream::poll to drain completions then retry",
691            });
692        };
693
694        // SAFETY: same invariants as submit_read_to_gpu, plus the
695        // caller committed that file_index is a registered fd.
696        let target_addr = unsafe {
697            self.gpu_buffer
698                .as_ptr()
699                .add(u64_to_usize(target_offset, "target offset")?)
700        };
701        iovs_storage[0] = Iovec {
702            iov_base: target_addr.cast::<core::ffi::c_void>(),
703            iov_len: u32_to_usize(len, "read length")?,
704        };
705
706        sqe.opcode = IORING_OP_READV;
707        sqe.flags = super::ring::IOSQE_FIXED_FILE;
708        sqe.fd = file_index;
709        sqe.user_data_or_off = offset;
710        sqe.addr = pointer_addr_u64(iovs_storage.as_ptr(), "fixed-file readv iovec pointer")?;
711        sqe.len = 1;
712        sqe.user_data = usize_to_u64(chunk_idx, "chunk index")?;
713
714        self.ring_state.commit_sqe();
715        increment_queue_counter(&mut self.inflight, "inflight SQE count")?;
716        increment_queue_counter(&mut self.pending_submissions, "pending submission count")?;
717
718        Ok(())
719    }
720
    /// Disabled-feature implementation for NVMe passthrough. Always returns
    /// [`PipelineError::NvmePassthroughDisabled`] so callers get a
    /// structured error rather than a link failure.
    ///
    /// The signature mirrors the feature-enabled method, including `unsafe`,
    /// so call sites compile unchanged whether or not `uring-cmd-nvme` is on.
    /// None of the arguments are inspected and the ring is not touched.
    #[cfg(not(feature = "uring-cmd-nvme"))]
    // `self` is unused and there is no real safety contract in this build;
    // both lints are silenced only to keep the two signatures identical.
    #[allow(clippy::unused_self, clippy::missing_safety_doc)]
    pub unsafe fn submit_nvme_passthrough(
        &mut self,
        _fd: i32,
        _user_data: u64,
        _nvme_sqe_bytes: &[u8],
    ) -> Result<(), PipelineError> {
        Err(PipelineError::NvmePassthroughDisabled)
    }
734}
735
736fn checked_chunk_target_offset(chunk_idx: usize, len: u32) -> Result<u64, PipelineError> {
737    let chunk_idx = usize_to_u64(chunk_idx, "chunk index")?;
738    vyre_driver::accounting::checked_mul_u64_lazy(chunk_idx, u64::from(len), || {
739        PipelineError::QueueFull {
740            queue: "submission",
741            fix: "chunk_idx * len overflows u64; split the IO batch before submission",
742        }
743    })
744}
745
746fn checked_target_end(target_offset: u64, len: u32) -> Result<u64, PipelineError> {
747    vyre_driver::accounting::checked_add_u64_lazy(target_offset, u64::from(len), || {
748        PipelineError::QueueFull {
749            queue: "submission",
750            fix: "target_offset + len overflows u64; split the IO batch before submission",
751        }
752    })
753}
754
755fn increment_queue_counter(counter: &mut u32, label: &'static str) -> Result<(), PipelineError> {
756    *counter = vyre_driver::accounting::checked_add_u32_value(
757        *counter,
758        1,
759        PipelineError::QueueFull {
760            queue: "submission",
761            fix: match label {
762                "inflight SQE count" => {
763                    "inflight SQE count overflowed u32; poll completions before submitting more work"
764                }
765                "pending submission count" => {
766                    "pending submission count overflowed u32; flush submissions before queuing more work"
767                }
768                _ => {
769                    "io_uring queue counter overflowed u32; drain the queue before submitting more work"
770                }
771            },
772        },
773    )?;
774    Ok(())
775}
776
777fn decrement_queue_counter(counter: &mut u32, label: &'static str) -> Result<(), PipelineError> {
778    *counter = counter.checked_sub(1).ok_or(PipelineError::QueueFull {
779        queue: "completion",
780        fix: match label {
781            "inflight SQE count" => {
782                "io_uring completion arrived with no inflight SQE; rebuild the stream state"
783            }
784            _ => "io_uring queue counter underflowed; rebuild the stream state",
785        },
786    })?;
787    Ok(())
788}
789
790fn usize_to_u64(value: usize, label: &'static str) -> Result<u64, PipelineError> {
791    u64::try_from(value).map_err(|_| PipelineError::QueueFull {
792        queue: "submission",
793        fix: match label {
794            "chunk index" => "chunk index cannot fit u64; split the IO batch before submission",
795            "mapped GPU buffer length" => {
796                "mapped GPU buffer length cannot fit u64; split the staging allocation"
797            }
798            _ => "host usize value cannot fit u64; split the IO batch before submission",
799        },
800    })
801}
802
803fn pointer_addr_u64<T>(ptr: *const T, label: &'static str) -> Result<u64, PipelineError> {
804    usize_to_u64(ptr.addr(), label)
805}
806
807fn u64_to_usize(value: u64, label: &'static str) -> Result<usize, PipelineError> {
808    usize::try_from(value).map_err(|_| PipelineError::QueueFull {
809        queue: "submission",
810        fix: match label {
811            "target offset" => {
812                "target offset cannot fit usize; split the IO batch before submission"
813            }
814            _ => "u64 value cannot fit usize; split the IO batch before submission",
815        },
816    })
817}
818
819fn u32_to_usize(value: u32, label: &'static str) -> Result<usize, PipelineError> {
820    usize::try_from(value).map_err(|_| PipelineError::QueueFull {
821        queue: "submission",
822        fix: match label {
823            "read length" => "read length cannot fit usize; split the IO request before submission",
824            _ => "u32 value cannot fit usize; split the IO request before submission",
825        },
826    })
827}
828
#[cfg(test)]
mod tests {
    use super::*;

    /// Writes made through `as_mut_slice` must be visible in the array the
    /// handle was built from.
    #[test]
    fn mapped_slice_roundtrip_is_miri_clean() {
        let mut storage = [1_u8, 2, 3, 4];
        {
            // SAFETY: `storage` stays live and uniquely borrowed for the
            // whole lifetime of the handle.
            let mut handle = unsafe { GpuMappedBuffer::from_host_visible_slice(&mut storage) };
            // SAFETY: `handle` is the only handle to `storage`, so the
            // returned slice has exclusive access.
            let bytes = unsafe { handle.as_mut_slice() };
            bytes[0] = 9;
            bytes[3] = 7;
        }
        assert_eq!(storage, [9, 2, 3, 7]);
    }
}
844}