Skip to main content

vyre_runtime/uring/
stream.rs

1//! `AsyncUringStream`  -  drives io_uring reads into GPU-visible memory
2//! and advances the megakernel tail pointer on each completion.
3//!
4//! The critical safety contract: every byte read lands in a
5//! [`GpuMappedBuffer`]. Compatibility ingest uses registered
6//! host-visible GPU mappings; canonical native ingest uses BAR1 peer memory
7//! via [`GpuMappedBuffer::from_bar1_peer_with_owner`] plus NVMe passthrough.
8//! The io_uring writer never targets an ordinary userspace bounce buffer.
9
10use super::ring::IoUringState;
11use crate::PipelineError;
12use core::marker::PhantomData;
13use core::sync::atomic::{AtomicU32, Ordering};
14
/// One scatter/gather element laid out like the Linux `struct iovec`, so a
/// pointer to an array of these can be placed directly in a `readv`-style SQE.
#[repr(C)]
#[derive(Clone, Copy, Debug)]
pub struct Iovec {
    /// Start address of the destination range for this element.
    pub iov_base: *mut core::ffi::c_void,
    /// Number of bytes the destination range spans.
    pub iov_len: usize,
}
24
/// `IORING_OP_READV` - scatter-read into an array of iovecs.
pub const IORING_OP_READV: u8 = 1;
/// `IORING_OP_READ_FIXED` - read into a pre-registered buffer.
///
/// The value matches `enum io_uring_op` in `linux/io_uring.h`: NOP=0,
/// READV=1, WRITEV=2, FSYNC=3, READ_FIXED=4. Opcode 22 is `IORING_OP_READ`,
/// a plain read that ignores `buf_index` and the registered-buffer table.
pub const IORING_OP_READ_FIXED: u8 = 4;
/// `IORING_OP_URING_CMD` - vendor-specific passthrough (NVMe). Kernel 6.0+.
pub const IORING_OP_URING_CMD: u8 = 46;
31
/// Region of GPU-visible memory that io_uring is permitted to DMA into.
///
/// There are two constructor families:
/// - host-visible shared mappings, used by the compatibility ingest path;
/// - a BAR1 peer-memory constructor, used by the native GPUDirect path where
///   NVMe DMA lands directly in GPU-owned memory.
pub struct GpuMappedBuffer<'a> {
    /// First byte of the mapped region.
    ptr: *mut u8,
    /// Size of the mapped region in bytes.
    len: usize,
    /// Ties this handle to the `'a` borrow of whatever owns the allocation.
    _owner: PhantomData<&'a mut [u8]>,
}
42
43// SAFETY: Send + Sync because (a) the constructor's safety contract
44// requires the caller to commit the lifetime invariant, and (b) the
45// raw pointer is only dereferenced by the kernel via io_uring  -
46// vyre-runtime never reads through it directly.
47unsafe impl Send for GpuMappedBuffer<'_> {}
48unsafe impl Sync for GpuMappedBuffer<'_> {}
49
50impl<'a> GpuMappedBuffer<'a> {
51    /// Construct from a borrowed host-visible byte slice.
52    ///
53    /// # Safety
54    ///
55    /// The caller asserts:
56    /// - `slice` aliases a device allocation created with host-visible
57    ///   host-shared usage bits by the concrete backend.
58    /// - No other code reads or writes through `slice` while the
59    ///   returned handle is alive.
60    pub unsafe fn from_host_visible_slice(slice: &'a mut [u8]) -> Self {
61        Self {
62            ptr: slice.as_mut_ptr(),
63            len: slice.len(),
64            _owner: PhantomData,
65        }
66    }
67
68    /// Construct from a raw pointer + explicit owner anchor.
69    ///
70    /// This is for GPU APIs that hand back a raw mapped pointer plus an
71    /// owning handle rather than a Rust slice. The borrow on `owner`
72    /// forces the mapped region to outlive every derived
73    /// [`AsyncUringStream`].
74    ///
75    /// # Safety
76    ///
77    /// The caller asserts:
78    /// - `ptr` points into a GPU allocation owned by `owner`.
79    /// - The mapped region is `len` bytes long and host-visible.
80    /// - No other code reads or writes through `ptr` while the
81    ///   returned handle is alive.
82    pub unsafe fn from_host_visible_owner<O: ?Sized>(
83        _owner: &'a mut O,
84        ptr: *mut u8,
85        len: usize,
86    ) -> Self {
87        Self {
88            ptr,
89            len,
90            _owner: PhantomData,
91        }
92    }
93
94    /// Duplicate the mapped-buffer handle for the same underlying region.
95    ///
96    /// # Safety
97    ///
98    /// The caller must uphold the same aliasing and lifetime guarantees as
99    /// [`GpuMappedBuffer::from_host_visible_slice`]. This does not clone memory;
100    /// it creates another handle to the same mapped bytes.
101    pub unsafe fn duplicate(&self) -> Self {
102        Self {
103            ptr: self.ptr,
104            len: self.len,
105            _owner: PhantomData,
106        }
107    }
108
109    /// Carve out a sub-region of this mapped buffer.
110    ///
111    /// This preserves the original constructor contract: the returned
112    /// handle aliases the same host-visible GPU allocation and carries
113    /// no ownership of its own.
114    ///
115    /// # Errors
116    ///
117    /// Returns [`PipelineError::QueueFull`] when `offset + len`
118    /// exceeds the mapped buffer bounds.
119    pub fn sub_region(&self, offset: usize, len: usize) -> Result<Self, crate::PipelineError> {
120        let _end = vyre_driver::accounting::checked_usize_byte_range_end_lazy(
121            offset,
122            len,
123            self.len,
124            || crate::PipelineError::QueueFull {
125                queue: "submission",
126                fix: "GpuMappedBuffer::sub_region offset + len overflows usize; reduce slot size or enlarge the staging buffer",
127            },
128            |_| crate::PipelineError::QueueFull {
129                queue: "submission",
130                fix: "GpuMappedBuffer::sub_region exceeds the mapped allocation; reduce slot size or enlarge the staging buffer",
131            },
132        )?;
133        Ok(Self {
134            ptr: self.ptr.wrapping_add(offset),
135            len,
136            _owner: PhantomData,
137        })
138    }
139
140    /// Byte length of the mapped region.
141    #[must_use]
142    pub fn len(&self) -> usize {
143        self.len
144    }
145
146    /// Whether the region is empty.
147    #[must_use]
148    pub fn is_empty(&self) -> bool {
149        self.len == 0
150    }
151
152    /// Raw pointer for io_uring submission. Crate-private.
153    pub(crate) fn as_ptr(&self) -> *mut u8 {
154        self.ptr
155    }
156
157    /// Borrow the mapped bytes as a mutable slice.
158    ///
159    /// # Safety
160    ///
161    /// The caller must ensure exclusive mutable access to the region for the
162    /// lifetime of the returned slice.
163    pub unsafe fn as_mut_slice(&mut self) -> &mut [u8] {
164        // SAFETY: Safe FFI / low-level operation verified and audited for Release compliance.
165        unsafe { core::slice::from_raw_parts_mut(self.ptr, self.len) }
166    }
167
168    /// Construct from a PCIe peer-memory pointer
169    /// GPUDirect Storage).
170    ///
171    /// When paired with [`crate::PipelineError::NvmePassthroughDisabled`]
172    /// via the `uring-cmd-nvme` feature, this lets NVMe DMA writes
173    /// land directly in VRAM without crossing the host PCIe bridge.
174    ///
175    /// # Safety
176    ///
177    /// The caller asserts:
178    /// - `peer_ptr` points at a region returned by `nvidia_p2p_get_pages`
179    ///   (Linux nvidia-fs) or `cuMemAlloc` + `cuPointerSetAttribute`
180    ///   `CU_POINTER_ATTRIBUTE_SYNC_MEMOPS`.
181    /// - The GPU allocation outlives the returned handle.
182    /// - The io_uring kernel and NVMe driver both have DMA-mapping
183    ///   capability (verified at runtime by checking
184    ///   `/proc/driver/nvidia-fs/stats`).
185    pub unsafe fn from_bar1_peer_with_owner<O: ?Sized>(
186        _owner: &'a mut O,
187        peer_ptr: *mut u8,
188        len: usize,
189    ) -> Self {
190        Self {
191            ptr: peer_ptr,
192            len,
193            _owner: PhantomData,
194        }
195    }
196}
197
198/// Streaming reader that pushes chunked reads into an io_uring SQ and
199/// advances an atomic tail pointer the megakernel observes.
200pub struct AsyncUringStream<'a> {
201    pub(crate) ring_state: IoUringState,
202    pub(crate) gpu_buffer: GpuMappedBuffer<'a>,
203    pub(crate) megakernel_tail: &'a AtomicU32,
204    pub(crate) inflight: u32,
205    pub(crate) pending_submissions: u32,
206}
207
208// SAFETY: raw pointer fields covered by GpuMappedBuffer's contract +
209// the constructor's safety commitment on megakernel_tail_ptr.
210unsafe impl Send for AsyncUringStream<'_> {}
211unsafe impl Sync for AsyncUringStream<'_> {}
212
213impl<'a> AsyncUringStream<'a> {
214    /// Create a stream bound to the given ring state, GPU-mapped
215    /// buffer, and megakernel tail pointer.
216    pub fn new(
217        ring_state: IoUringState,
218        gpu_buffer: GpuMappedBuffer<'a>,
219        megakernel_tail: &'a AtomicU32,
220    ) -> Self {
221        Self {
222            ring_state,
223            gpu_buffer,
224            megakernel_tail,
225            inflight: 0,
226            pending_submissions: 0,
227        }
228    }
229
230    /// Rebind the target mapped buffer for future submissions.
231    pub fn replace_buffer(&mut self, gpu_buffer: GpuMappedBuffer<'a>) {
232        self.gpu_buffer = gpu_buffer;
233    }
234
235    /// Submit a scattered read of `len` bytes at file offset `offset`
236    /// into the slot at `chunk_idx * len` within the GPU buffer.
237    ///
238    /// # Errors
239    ///
240    /// - [`PipelineError::QueueFull`] if the SQ is full OR the
241    ///   destination slot exceeds buffer bounds.
242    /// - Range errors surface later as [`PipelineError::IoUringSyscall`]
243    ///   on `poll` if the kernel rejects the SQE.
244    ///
245    /// # Safety
246    ///
247    /// `iovs_storage` must live until this SQE's completion is reaped;
248    /// the kernel dereferences `iov_base` at I/O time, not submit time.
249    pub unsafe fn submit_read_to_gpu(
250        &mut self,
251        fd: i32,
252        offset: u64,
253        len: u32,
254        chunk_idx: usize,
255        iovs_storage: &mut [Iovec],
256    ) -> Result<(), PipelineError> {
257        if iovs_storage.is_empty() {
258            return Err(PipelineError::QueueFull {
259                queue: "submission",
260                fix: "caller supplied empty iovs_storage; pass at least one slot",
261            });
262        }
263        let target_offset = checked_chunk_target_offset(chunk_idx, len)?;
264        // SAFETY: Safe FFI / low-level operation verified and audited for Release compliance.
265        unsafe { self.submit_read_to_gpu_at(fd, offset, len, target_offset, iovs_storage) }
266    }
267
268    /// Submit a read directly into a byte offset inside the mapped buffer.
269    ///
270    /// Unlike [`AsyncUringStream::submit_read_to_gpu`], this method does not
271    /// derive the destination from a fixed chunk index. Wrappers that stream
272    /// variable-sized shards can place each read contiguously in a staging
273    /// buffer without being forced into `chunk_idx * len` layout.
274    ///
275    /// # Errors
276    ///
277    /// - [`PipelineError::QueueFull`] if the SQ is full OR the target range
278    ///   exceeds the mapped buffer bounds.
279    ///
280    /// # Safety
281    ///
282    /// `iovs_storage` must live until this SQE's completion is reaped.
283    pub unsafe fn submit_read_to_gpu_at(
284        &mut self,
285        fd: i32,
286        offset: u64,
287        len: u32,
288        target_offset: u64,
289        iovs_storage: &mut [Iovec],
290    ) -> Result<(), PipelineError> {
291        // SAFETY: registered fixed buffers + file index are valid for the lifetime
292        // of the ring; the SQE is built on the ring's own SQ slot.
293        unsafe {
294            self.submit_read_to_gpu_at_with_user_data(
295                fd,
296                offset,
297                len,
298                target_offset,
299                target_offset,
300                iovs_storage,
301            )
302        }
303    }
304
305    /// Submit a read into an arbitrary byte offset and preserve caller-defined
306    /// `user_data` for completion correlation.
307    ///
308    /// # Errors
309    ///
310    /// Returns [`PipelineError::QueueFull`] when the SQ is full, the iovec
311    /// storage is empty, or the target range exceeds the mapped GPU buffer.
312    ///
313    /// # Safety
314    ///
315    /// `iovs_storage` must live until this SQE's completion is reaped.
316    pub unsafe fn submit_read_to_gpu_at_with_user_data(
317        &mut self,
318        fd: i32,
319        offset: u64,
320        len: u32,
321        target_offset: u64,
322        user_data: u64,
323        iovs_storage: &mut [Iovec],
324    ) -> Result<(), PipelineError> {
325        if iovs_storage.is_empty() {
326            return Err(PipelineError::QueueFull {
327                queue: "submission",
328                fix: "caller supplied empty iovs_storage; pass at least one slot",
329            });
330        }
331        let end = checked_target_end(target_offset, len)?;
332        let gpu_len = usize_to_u64(self.gpu_buffer.len(), "mapped GPU buffer length")?;
333        if end > gpu_len {
334            return Err(PipelineError::QueueFull {
335                queue: "submission",
336                fix: "target_offset + len exceeds GpuMappedBuffer length; enlarge the buffer or reduce the read size",
337            });
338        }
339
340        let Some(sqe) = self.ring_state.get_sqe() else {
341            return Err(PipelineError::QueueFull {
342                queue: "submission",
343                fix: "SQ full; call AsyncUringStream::poll to drain completions then retry",
344            });
345        };
346
347        // SAFETY: bounds-checked above; writing to a sub-region of
348        // the host-visible GpuMappedBuffer the caller committed.
349        let target_addr = unsafe {
350            self.gpu_buffer
351                .as_ptr()
352                .add(u64_to_usize(target_offset, "target offset")?)
353        };
354
355        iovs_storage[0] = Iovec {
356            iov_base: target_addr.cast::<core::ffi::c_void>(),
357            iov_len: u32_to_usize(len, "read length")?,
358        };
359
360        sqe.opcode = IORING_OP_READV;
361        sqe.fd = fd;
362        sqe.user_data_or_off = offset;
363        sqe.addr = pointer_addr_u64(iovs_storage.as_ptr(), "readv iovec pointer")?;
364        sqe.len = 1;
365        sqe.user_data = user_data;
366
367        self.ring_state.commit_sqe();
368        increment_queue_counter(&mut self.inflight, "inflight SQE count")?;
369        increment_queue_counter(&mut self.pending_submissions, "pending submission count")?;
370
371        Ok(())
372    }
373
374    /// Submit any queued SQEs to the kernel.
375    ///
376    /// SQPOLL can pick up tail updates on its own, but wrappers that rely on
377    /// bounded latency should not depend on the polling thread waking
378    /// promptly. Flushing pending submissions makes progress explicit.
379    pub fn flush_submissions(&mut self) -> Result<(), PipelineError> {
380        if self.pending_submissions == 0 {
381            return Ok(());
382        }
383        if self.ring_state.uses_sqpoll() {
384            if self.ring_state.sq_needs_wakeup() {
385                self.ring_state.wake_sqpoll()?;
386            }
387        } else {
388            self.ring_state.enter(self.pending_submissions, 0, 0)?;
389        }
390        self.pending_submissions = 0;
391        Ok(())
392    }
393
394    /// Reap available completions, advancing the megakernel tail
395    /// pointer once per success. Returns completions reaped.
396    ///
397    /// # Errors
398    ///
399    /// Returns [`PipelineError::IoUringSyscall`] on the first CQE
400    /// reporting `res < 0`. Remaining CQEs are still drained so the
401    /// ring does not overflow, but only the first failure is
402    /// returned  -  caller re-polls to pick up subsequent errors or
403    /// successes.
404    pub fn poll(&mut self) -> Result<u32, PipelineError> {
405        self.flush_submissions()?;
406        let mut completed: u32 = 0;
407        let mut first_error: Option<PipelineError> = None;
408
409        while let Some(cqe) = self.ring_state.peek_cqe() {
410            let res = cqe.res;
411            self.ring_state.advance_cq();
412            decrement_queue_counter(&mut self.inflight, "inflight SQE count")?;
413
414            if res < 0 {
415                if first_error.is_none() {
416                    first_error = Some(PipelineError::IoUringSyscall {
417                        syscall: "io_uring_cqe",
418                        errno: -res,
419                        fix: "inspect user_data to identify the failed SQE; common causes: EIO on disk, EFAULT on bad iovec, EINVAL on misaligned offset",
420                    });
421                }
422                continue;
423            }
424
425            // Successful DMA: bytes landed in GPU-visible memory. Tail
426            // publication is batched after CQ drain so one poll with N
427            // completions performs one release atomic instead of N.
428            completed = vyre_driver::accounting::checked_add_u32_value(
429                completed,
430                1,
431                PipelineError::QueueFull {
432                    queue: "completion",
433                    fix: "io_uring completion count overflowed u32; drain completions more frequently",
434                },
435            )?;
436        }
437
438        if completed != 0 {
439            self.megakernel_tail.fetch_add(completed, Ordering::Release);
440        }
441
442        match first_error {
443            Some(err) => Err(err),
444            None => Ok(completed),
445        }
446    }
447
448    /// Flush pending submissions + wait for at least one completion.
449    ///
450    /// # Errors
451    ///
452    /// Returns [`PipelineError::IoUringSyscall`] if `io_uring_enter`
453    /// fails.
454    pub fn wait_for_completion(&mut self) -> Result<(), PipelineError> {
455        if self.inflight > 0 {
456            self.flush_submissions()?;
457            self.ring_state.enter(0, 1, 1)?;
458            self.poll()?;
459        }
460        Ok(())
461    }
462
463    /// Number of submissions still awaiting completion.
464    #[must_use]
465    pub fn inflight(&self) -> u32 {
466        self.inflight
467    }
468
469    /// Submit an NVMe passthrough command via `IORING_OP_URING_CMD`.
470    /// Requires the `uring-cmd-nvme` feature and Linux kernel 6.0+.
471    ///
472    /// The NVMe SQE layout is encoded by the caller in `nvme_sqe_bytes`
473    /// (64 bytes)  -  the SQE is memcpy'd into the `addr3`+`addr` slots
474    /// the kernel forwards to the NVMe driver. `user_data` is returned
475    /// on the matching CQE so the caller can correlate completions.
476    ///
477    /// # Errors
478    ///
479    /// - [`PipelineError::NvmePassthroughDisabled`] if the
480    ///   `uring-cmd-nvme` feature is not enabled at compile time. This
481    ///   variant is unreachable in this cfg-gated method but remains
482    ///   part of the public error contract shared with the feature-gated
483    ///   implementation.
484    /// - [`PipelineError::QueueFull`] if the SQ is full or the NVMe
485    ///   command buffer is malformed (must be exactly 64 bytes).
486    ///
487    /// # Safety
488    ///
489    /// - `fd` must be an open character device the caller has
490    ///   `IORING_SETUP_CQE32`-compatible access to (e.g. `/dev/ng0n1`).
491    /// - `nvme_sqe_bytes` must encode a valid NVMe command  -  kernel
492    ///   rejection returns an errno on the CQE, but a forged payload
493    ///   can still trigger device-level misbehavior.
494    #[cfg(feature = "uring-cmd-nvme")]
495    pub unsafe fn submit_nvme_passthrough(
496        &mut self,
497        fd: i32,
498        user_data: u64,
499        nvme_sqe_bytes: &[u8],
500    ) -> Result<(), PipelineError> {
501        if nvme_sqe_bytes.len() != 64 {
502            return Err(PipelineError::QueueFull {
503                queue: "submission",
504                fix: "NVMe passthrough SQE must be exactly 64 bytes; see linux/nvme_ioctl.h",
505            });
506        }
507
508        let Some(sqe) = self.ring_state.get_sqe() else {
509            return Err(PipelineError::QueueFull {
510                queue: "submission",
511                fix: "SQ full; call AsyncUringStream::poll to drain completions then retry",
512            });
513        };
514
515        // SAFETY: caller-provided slice is 64 bytes as validated
516        // above; we copy into the 64-byte NVMe passthrough region
517        // the kernel expects (addr + addr3 cover the first 40 bytes;
518        // the remaining 24 live in the SQE's inline payload).
519        let nvme_ptr = nvme_sqe_bytes.as_ptr();
520        sqe.opcode = IORING_OP_URING_CMD;
521        sqe.fd = fd;
522        sqe.user_data_or_off = 0;
523        // `cmd_op` in the first 4 bytes of addr3 (kernel reads it as u32).
524        sqe.addr = pointer_addr_u64(nvme_ptr, "NVMe command pointer")?;
525        sqe.len = 64;
526        sqe.user_data = user_data;
527        // The kernel reads the remaining payload bytes out of addr3
528        // directly; downstream NVMe drivers dereference this pointer.
529        sqe.addr3 = pointer_addr_u64(nvme_ptr, "NVMe command addr3 pointer")?;
530
531        self.ring_state.commit_sqe();
532        increment_queue_counter(&mut self.inflight, "inflight SQE count")?;
533        increment_queue_counter(&mut self.pending_submissions, "pending submission count")?;
534
535        Ok(())
536    }
537
538    /// Submit an `IORING_OP_READ_FIXED` into a pre-registered buffer.
539    ///
540    /// Requires the caller to have previously called
541    /// [`super::ring::IoUringState::register_buffers`] with an iovec
542    /// slice whose entry `buf_index` covers the target range. Because
543    /// the kernel skips per-SQE iovec validation, this path is 20-40%
544    /// lower latency than `submit_read_to_gpu` on hot loops.
545    ///
546    /// # Errors
547    ///
548    /// - [`PipelineError::QueueFull`] if the SQ is full or the
549    ///   destination range exceeds the GPU buffer bounds.
550    ///
551    /// # Safety
552    ///
553    /// The `buf_index` must reference a still-registered iovec whose
554    /// region overlaps `chunk_idx * len .. (chunk_idx + 1) * len`
555    /// inside the [`GpuMappedBuffer`]. Mis-indexing produces a kernel
556    /// DMA into the wrong region  -  silent data corruption.
557    pub unsafe fn submit_read_fixed(
558        &mut self,
559        fd: i32,
560        offset: u64,
561        len: u32,
562        chunk_idx: usize,
563        buf_index: u16,
564    ) -> Result<(), PipelineError> {
565        let target_offset = checked_chunk_target_offset(chunk_idx, len)?;
566        // SAFETY: Safe FFI / low-level operation verified and audited for Release compliance.
567        unsafe {
568            self.submit_read_fixed_at(
569                fd,
570                offset,
571                len,
572                target_offset,
573                buf_index,
574                usize_to_u64(chunk_idx, "chunk index")?,
575            )
576        }
577    }
578
579    /// Submit an `IORING_OP_READ_FIXED` into a registered buffer at an
580    /// explicit destination offset inside the mapped buffer.
581    ///
582    /// Unlike [`AsyncUringStream::submit_read_fixed`], this variant decouples
583    /// the CQE `user_data` from the destination layout so higher-level
584    /// drivers can publish their own slot ids while still using a fixed slot
585    /// stride.
586    ///
587    /// # Errors
588    ///
589    /// - [`PipelineError::QueueFull`] if the SQ is full or the
590    ///   destination range exceeds the GPU buffer bounds.
591    ///
592    /// # Safety
593    ///
594    /// `buf_index` must reference a still-registered iovec covering the
595    /// target range, and `user_data` must remain meaningful to the caller
596    /// until the CQE is reaped.
597    pub unsafe fn submit_read_fixed_at(
598        &mut self,
599        fd: i32,
600        offset: u64,
601        len: u32,
602        target_offset: u64,
603        buf_index: u16,
604        user_data: u64,
605    ) -> Result<(), PipelineError> {
606        let end = checked_target_end(target_offset, len)?;
607        let gpu_len = usize_to_u64(self.gpu_buffer.len(), "mapped GPU buffer length")?;
608        if end > gpu_len {
609            return Err(PipelineError::QueueFull {
610                queue: "submission",
611                fix: "chunk_idx * len exceeds GpuMappedBuffer length",
612            });
613        }
614
615        let Some(sqe) = self.ring_state.get_sqe() else {
616            return Err(PipelineError::QueueFull {
617                queue: "submission",
618                fix: "SQ full; call AsyncUringStream::poll to drain completions then retry",
619            });
620        };
621
622        // SAFETY: bounds-checked target address inside the host-visible
623        // GpuMappedBuffer the caller committed at construction.
624        let target_addr = unsafe {
625            self.gpu_buffer
626                .as_ptr()
627                .add(u64_to_usize(target_offset, "target offset")?)
628        };
629
630        sqe.opcode = IORING_OP_READ_FIXED;
631        sqe.fd = fd;
632        sqe.user_data_or_off = offset;
633        sqe.addr = pointer_addr_u64(target_addr, "fixed-read target pointer")?;
634        sqe.len = len;
635        sqe.buf_index = buf_index;
636        sqe.user_data = user_data;
637
638        self.ring_state.commit_sqe();
639        increment_queue_counter(&mut self.inflight, "inflight SQE count")?;
640        increment_queue_counter(&mut self.pending_submissions, "pending submission count")?;
641
642        Ok(())
643    }
644
645    /// Submit a read using a registered-file-table index instead of a
646    /// live fd. Use with
647    /// [`super::ring::IoUringState::register_files`]  -  avoids the
648    /// per-SQE file refcount bump.
649    ///
650    /// # Errors
651    ///
652    /// Same surface as [`AsyncUringStream::submit_read_to_gpu`].
653    ///
654    /// # Safety
655    ///
656    /// `file_index` must name a still-registered fd.
657    /// `iovs_storage` must outlive the completion. All other
658    /// conditions match `submit_read_to_gpu`.
659    pub unsafe fn submit_read_to_gpu_fixed_file(
660        &mut self,
661        file_index: i32,
662        offset: u64,
663        len: u32,
664        chunk_idx: usize,
665        iovs_storage: &mut [Iovec],
666    ) -> Result<(), PipelineError> {
667        if iovs_storage.is_empty() {
668            return Err(PipelineError::QueueFull {
669                queue: "submission",
670                fix: "caller supplied empty iovs_storage; pass at least one slot",
671            });
672        }
673        let target_offset = checked_chunk_target_offset(chunk_idx, len)?;
674        let end = checked_target_end(target_offset, len)?;
675        let gpu_len = usize_to_u64(self.gpu_buffer.len(), "mapped GPU buffer length")?;
676        if end > gpu_len {
677            return Err(PipelineError::QueueFull {
678                queue: "submission",
679                fix: "chunk_idx * len exceeds GpuMappedBuffer length",
680            });
681        }
682
683        let Some(sqe) = self.ring_state.get_sqe() else {
684            return Err(PipelineError::QueueFull {
685                queue: "submission",
686                fix: "SQ full; call AsyncUringStream::poll to drain completions then retry",
687            });
688        };
689
690        // SAFETY: same invariants as submit_read_to_gpu, plus the
691        // caller committed that file_index is a registered fd.
692        let target_addr = unsafe {
693            self.gpu_buffer
694                .as_ptr()
695                .add(u64_to_usize(target_offset, "target offset")?)
696        };
697        iovs_storage[0] = Iovec {
698            iov_base: target_addr.cast::<core::ffi::c_void>(),
699            iov_len: u32_to_usize(len, "read length")?,
700        };
701
702        sqe.opcode = IORING_OP_READV;
703        sqe.flags = super::ring::IOSQE_FIXED_FILE;
704        sqe.fd = file_index;
705        sqe.user_data_or_off = offset;
706        sqe.addr = pointer_addr_u64(iovs_storage.as_ptr(), "fixed-file readv iovec pointer")?;
707        sqe.len = 1;
708        sqe.user_data = usize_to_u64(chunk_idx, "chunk index")?;
709
710        self.ring_state.commit_sqe();
711        increment_queue_counter(&mut self.inflight, "inflight SQE count")?;
712        increment_queue_counter(&mut self.pending_submissions, "pending submission count")?;
713
714        Ok(())
715    }
716
717    /// Disabled-feature implementation for NVMe passthrough. Always returns
718    /// [`PipelineError::NvmePassthroughDisabled`] so callers get a
719    /// structured error rather than a link failure.
720    #[cfg(not(feature = "uring-cmd-nvme"))]
721    #[allow(clippy::unused_self, clippy::missing_safety_doc)]
722    pub unsafe fn submit_nvme_passthrough(
723        &mut self,
724        _fd: i32,
725        _user_data: u64,
726        _nvme_sqe_bytes: &[u8],
727    ) -> Result<(), PipelineError> {
728        Err(PipelineError::NvmePassthroughDisabled)
729    }
730}
731
732
733fn checked_chunk_target_offset(chunk_idx: usize, len: u32) -> Result<u64, PipelineError> {
734    let chunk_idx = usize_to_u64(chunk_idx, "chunk index")?;
735    vyre_driver::accounting::checked_mul_u64_lazy(chunk_idx, u64::from(len), || {
736        PipelineError::QueueFull {
737            queue: "submission",
738            fix: "chunk_idx * len overflows u64; split the IO batch before submission",
739        }
740    })
741}
742
743fn checked_target_end(target_offset: u64, len: u32) -> Result<u64, PipelineError> {
744    vyre_driver::accounting::checked_add_u64_lazy(target_offset, u64::from(len), || {
745        PipelineError::QueueFull {
746            queue: "submission",
747            fix: "target_offset + len overflows u64; split the IO batch before submission",
748        }
749    })
750}
751
752fn increment_queue_counter(counter: &mut u32, label: &'static str) -> Result<(), PipelineError> {
753    *counter = vyre_driver::accounting::checked_add_u32_value(
754        *counter,
755        1,
756        PipelineError::QueueFull {
757            queue: "submission",
758            fix: match label {
759                "inflight SQE count" => {
760                    "inflight SQE count overflowed u32; poll completions before submitting more work"
761                }
762                "pending submission count" => {
763                    "pending submission count overflowed u32; flush submissions before queuing more work"
764                }
765                _ => {
766                    "io_uring queue counter overflowed u32; drain the queue before submitting more work"
767                }
768            },
769        },
770    )?;
771    Ok(())
772}
773
774fn decrement_queue_counter(counter: &mut u32, label: &'static str) -> Result<(), PipelineError> {
775    *counter = counter.checked_sub(1).ok_or(PipelineError::QueueFull {
776        queue: "completion",
777        fix: match label {
778            "inflight SQE count" => {
779                "io_uring completion arrived with no inflight SQE; rebuild the stream state"
780            }
781            _ => "io_uring queue counter underflowed; rebuild the stream state",
782        },
783    })?;
784    Ok(())
785}
786
787fn usize_to_u64(value: usize, label: &'static str) -> Result<u64, PipelineError> {
788    u64::try_from(value).map_err(|_| PipelineError::QueueFull {
789        queue: "submission",
790        fix: match label {
791            "chunk index" => "chunk index cannot fit u64; split the IO batch before submission",
792            "mapped GPU buffer length" => {
793                "mapped GPU buffer length cannot fit u64; split the staging allocation"
794            }
795            _ => "host usize value cannot fit u64; split the IO batch before submission",
796        },
797    })
798}
799
800fn pointer_addr_u64<T>(ptr: *const T, label: &'static str) -> Result<u64, PipelineError> {
801    usize_to_u64(ptr.addr(), label)
802}
803
804fn u64_to_usize(value: u64, label: &'static str) -> Result<usize, PipelineError> {
805    usize::try_from(value).map_err(|_| PipelineError::QueueFull {
806        queue: "submission",
807        fix: match label {
808            "target offset" => {
809                "target offset cannot fit usize; split the IO batch before submission"
810            }
811            _ => "u64 value cannot fit usize; split the IO batch before submission",
812        },
813    })
814}
815
816fn u32_to_usize(value: u32, label: &'static str) -> Result<usize, PipelineError> {
817    usize::try_from(value).map_err(|_| PipelineError::QueueFull {
818        queue: "submission",
819        fix: match label {
820            "read length" => "read length cannot fit usize; split the IO request before submission",
821            _ => "u32 value cannot fit usize; split the IO request before submission",
822        },
823    })
824}
825
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn mapped_slice_roundtrip_is_miri_clean() {
        let mut backing = [1_u8, 2, 3, 4];
        // SAFETY: `backing` stays live and uniquely borrowed for the mapped buffer lifetime.
        let mut mapped = unsafe { GpuMappedBuffer::from_host_visible_slice(&mut backing) };
        // SAFETY: the mapped buffer was built from `backing` and remains uniquely borrowed.
        let slice = unsafe { mapped.as_mut_slice() };
        slice[0] = 9;
        slice[3] = 7;
        assert_eq!(backing, [9, 2, 3, 7]);
    }

    #[test]
    fn empty_mapping_yields_empty_slice() {
        let mut backing: [u8; 0] = [];
        // SAFETY: `backing` stays live and uniquely borrowed for the mapped buffer lifetime.
        let mut mapped = unsafe { GpuMappedBuffer::from_host_visible_slice(&mut backing) };
        assert!(mapped.is_empty());
        // SAFETY: the mapped buffer was built from `backing` and remains uniquely borrowed.
        let slice = unsafe { mapped.as_mut_slice() };
        assert!(slice.is_empty());
    }

    #[test]
    fn sub_region_respects_bounds() {
        let mut backing = [0_u8; 8];
        // SAFETY: `backing` stays live and uniquely borrowed for the mapped buffer lifetime.
        let mapped = unsafe { GpuMappedBuffer::from_host_visible_slice(&mut backing) };

        let Ok(middle) = mapped.sub_region(2, 4) else {
            panic!("in-bounds sub-region was rejected");
        };
        assert_eq!(middle.len(), 4);
        assert_eq!(middle.as_ptr(), mapped.as_ptr().wrapping_add(2));

        // A range ending exactly at the mapping's end is in bounds.
        assert!(mapped.sub_region(4, 4).is_ok());
        assert!(mapped.sub_region(8, 0).is_ok());
        // One byte past the end, and arithmetic overflow, are both rejected.
        assert!(mapped.sub_region(5, 4).is_err());
        assert!(mapped.sub_region(usize::MAX, 2).is_err());
    }
}
842