vyre_runtime/uring/stream.rs
//! `AsyncUringStream` - drives io_uring reads into GPU-visible memory
//! and advances the megakernel tail pointer by one per successful completion.
3//!
4//! The critical safety contract: every byte read lands in a
5//! [`GpuMappedBuffer`]. Compatibility ingest uses registered
6//! host-visible GPU mappings; canonical native ingest uses BAR1 peer memory
7//! via [`GpuMappedBuffer::from_bar1_peer_with_owner`] plus NVMe passthrough.
8//! The io_uring writer never targets an ordinary userspace bounce buffer.
9
10use super::ring::IoUringState;
11use crate::PipelineError;
12use core::marker::PhantomData;
13use core::sync::atomic::{AtomicU32, Ordering};
14
/// Linux `struct iovec` as consumed by `readv` / `IORING_OP_READV`:
/// one destination buffer described by its start address and byte length.
#[derive(Clone, Copy, Debug)]
#[repr(C)]
pub struct Iovec {
    /// Start address of the buffer this chunk of the read lands in.
    pub iov_base: *mut core::ffi::c_void,
    /// Size of that buffer in bytes.
    pub iov_len: usize,
}
24
/// `IORING_OP_READV` - scatter-read into an array of iovecs.
pub const IORING_OP_READV: u8 = 1;
/// `IORING_OP_READ_FIXED` - read into a pre-registered buffer.
///
/// Value from the Linux UAPI `enum io_uring_op`. Note that 22 is
/// `IORING_OP_READ`, a plain read that ignores `buf_index`.
pub const IORING_OP_READ_FIXED: u8 = 4;
/// `IORING_OP_URING_CMD` - vendor-specific passthrough (NVMe). Kernel 6.0+.
pub const IORING_OP_URING_CMD: u8 = 46;
31
/// GPU-visible memory region that io_uring is allowed to DMA into.
///
/// Compatibility constructors cover host-visible shared mappings. The BAR1
/// constructor covers the native GPUDirect path where NVMe DMA lands directly
/// in GPU-owned memory.
///
/// The handle is a `(pointer, length)` view; `'a` ties it to whatever borrow
/// anchors the underlying mapping.
pub struct GpuMappedBuffer<'a> {
    /// Start of the mapped region.
    ptr: *mut u8,
    /// Length of the mapped region in bytes.
    len: usize,
    /// Anchors the borrow that keeps the mapping alive for `'a`.
    _owner: PhantomData<&'a mut [u8]>,
}

// SAFETY: the handle is a (pointer, length) view of a mapping whose lifetime
// is anchored by `'a`. It is Send + Sync because:
// (a) every root constructor is `unsafe` and makes the caller promise the
//     mapping stays valid and is not accessed through other paths while the
//     handle is alive;
// (b) `&self` methods (`len`, `is_empty`, `as_ptr`, `sub_region`) never
//     dereference the pointer, so sharing `&GpuMappedBuffer` across threads
//     exposes no memory access;
// (c) the only in-crate dereference is `as_mut_slice`, which is `unsafe`,
//     takes `&mut self`, and requires caller-guaranteed exclusivity.
//     Otherwise the pointer is only handed to the kernel through io_uring
//     SQEs.
unsafe impl Send for GpuMappedBuffer<'_> {}
unsafe impl Sync for GpuMappedBuffer<'_> {}
49
50impl<'a> GpuMappedBuffer<'a> {
51 /// Construct from a borrowed host-visible byte slice.
52 ///
53 /// # Safety
54 ///
55 /// The caller asserts:
56 /// - `slice` aliases a device allocation created with host-visible
57 /// host-shared usage bits by the concrete backend.
58 /// - No other code reads or writes through `slice` while the
59 /// returned handle is alive.
60 pub unsafe fn from_host_visible_slice(slice: &'a mut [u8]) -> Self {
61 Self {
62 ptr: slice.as_mut_ptr(),
63 len: slice.len(),
64 _owner: PhantomData,
65 }
66 }
67
68 /// Construct from a raw pointer + explicit owner anchor.
69 ///
70 /// This is for GPU APIs that hand back a raw mapped pointer plus an
71 /// owning handle rather than a Rust slice. The borrow on `owner`
72 /// forces the mapped region to outlive every derived
73 /// [`AsyncUringStream`].
74 ///
75 /// # Safety
76 ///
77 /// The caller asserts:
78 /// - `ptr` points into a GPU allocation owned by `owner`.
79 /// - The mapped region is `len` bytes long and host-visible.
80 /// - No other code reads or writes through `ptr` while the
81 /// returned handle is alive.
82 pub unsafe fn from_host_visible_owner<O: ?Sized>(
83 _owner: &'a mut O,
84 ptr: *mut u8,
85 len: usize,
86 ) -> Self {
87 Self {
88 ptr,
89 len,
90 _owner: PhantomData,
91 }
92 }
93
94 /// Duplicate the mapped-buffer handle for the same underlying region.
95 ///
96 /// # Safety
97 ///
98 /// The caller must uphold the same aliasing and lifetime guarantees as
99 /// [`GpuMappedBuffer::from_host_visible_slice`]. This does not clone memory;
100 /// it creates another handle to the same mapped bytes.
101 pub unsafe fn duplicate(&self) -> Self {
102 Self {
103 ptr: self.ptr,
104 len: self.len,
105 _owner: PhantomData,
106 }
107 }
108
109 /// Carve out a sub-region of this mapped buffer.
110 ///
111 /// This preserves the original constructor contract: the returned
112 /// handle aliases the same host-visible GPU allocation and carries
113 /// no ownership of its own.
114 ///
115 /// # Errors
116 ///
117 /// Returns [`PipelineError::QueueFull`] when `offset + len`
118 /// exceeds the mapped buffer bounds.
119 pub fn sub_region(&self, offset: usize, len: usize) -> Result<Self, crate::PipelineError> {
120 let _end = vyre_driver::accounting::checked_usize_byte_range_end_lazy(
121 offset,
122 len,
123 self.len,
124 || {
125 crate::PipelineError::QueueFull {
126 queue: "submission",
127 fix: "GpuMappedBuffer::sub_region offset + len overflows usize; reduce slot size or enlarge the staging buffer",
128 }
129 },
130 |_| {
131 crate::PipelineError::QueueFull {
132 queue: "submission",
133 fix: "GpuMappedBuffer::sub_region exceeds the mapped allocation; reduce slot size or enlarge the staging buffer",
134 }
135 },
136 )?;
137 Ok(Self {
138 ptr: self.ptr.wrapping_add(offset),
139 len,
140 _owner: PhantomData,
141 })
142 }
143
144 /// Byte length of the mapped region.
145 #[must_use]
146 pub fn len(&self) -> usize {
147 self.len
148 }
149
150 /// Whether the region is empty.
151 #[must_use]
152 pub fn is_empty(&self) -> bool {
153 self.len == 0
154 }
155
156 /// Raw pointer for io_uring submission. Crate-private.
157 pub(crate) fn as_ptr(&self) -> *mut u8 {
158 self.ptr
159 }
160
161 /// Borrow the mapped bytes as a mutable slice.
162 ///
163 /// # Safety
164 ///
165 /// The caller must ensure exclusive mutable access to the region for the
166 /// lifetime of the returned slice.
167 pub unsafe fn as_mut_slice(&mut self) -> &mut [u8] {
168 // SAFETY: Safe FFI / low-level operation verified and audited for Release compliance.
169 unsafe { core::slice::from_raw_parts_mut(self.ptr, self.len) }
170 }
171
172 /// Construct from a PCIe peer-memory pointer
173 /// GPUDirect Storage).
174 ///
175 /// When paired with [`crate::PipelineError::NvmePassthroughDisabled`]
176 /// via the `uring-cmd-nvme` feature, this lets NVMe DMA writes
177 /// land directly in VRAM without crossing the host PCIe bridge.
178 ///
179 /// # Safety
180 ///
181 /// The caller asserts:
182 /// - `peer_ptr` points at a region returned by `nvidia_p2p_get_pages`
183 /// (Linux nvidia-fs) or `cuMemAlloc` + `cuPointerSetAttribute`
184 /// `CU_POINTER_ATTRIBUTE_SYNC_MEMOPS`.
185 /// - The GPU allocation outlives the returned handle.
186 /// - The io_uring kernel and NVMe driver both have DMA-mapping
187 /// capability (verified at runtime by checking
188 /// `/proc/driver/nvidia-fs/stats`).
189 pub unsafe fn from_bar1_peer_with_owner<O: ?Sized>(
190 _owner: &'a mut O,
191 peer_ptr: *mut u8,
192 len: usize,
193 ) -> Self {
194 Self {
195 ptr: peer_ptr,
196 len,
197 _owner: PhantomData,
198 }
199 }
200}
201
202/// Streaming reader that pushes chunked reads into an io_uring SQ and
203/// advances an atomic tail pointer the megakernel observes.
204pub struct AsyncUringStream<'a> {
205 pub(crate) ring_state: IoUringState,
206 pub(crate) gpu_buffer: GpuMappedBuffer<'a>,
207 pub(crate) megakernel_tail: &'a AtomicU32,
208 pub(crate) inflight: u32,
209 pub(crate) pending_submissions: u32,
210}
211
212// SAFETY: raw pointer fields covered by GpuMappedBuffer's contract +
213// the constructor's safety commitment on megakernel_tail_ptr.
214unsafe impl Send for AsyncUringStream<'_> {}
215unsafe impl Sync for AsyncUringStream<'_> {}
216
217impl<'a> AsyncUringStream<'a> {
218 /// Create a stream bound to the given ring state, GPU-mapped
219 /// buffer, and megakernel tail pointer.
220 pub fn new(
221 ring_state: IoUringState,
222 gpu_buffer: GpuMappedBuffer<'a>,
223 megakernel_tail: &'a AtomicU32,
224 ) -> Self {
225 Self {
226 ring_state,
227 gpu_buffer,
228 megakernel_tail,
229 inflight: 0,
230 pending_submissions: 0,
231 }
232 }
233
234 /// Rebind the target mapped buffer for future submissions.
235 pub fn replace_buffer(&mut self, gpu_buffer: GpuMappedBuffer<'a>) {
236 self.gpu_buffer = gpu_buffer;
237 }
238
239 /// Submit a scattered read of `len` bytes at file offset `offset`
240 /// into the slot at `chunk_idx * len` within the GPU buffer.
241 ///
242 /// # Errors
243 ///
244 /// - [`PipelineError::QueueFull`] if the SQ is full OR the
245 /// destination slot exceeds buffer bounds.
246 /// - Range errors surface later as [`PipelineError::IoUringSyscall`]
247 /// on `poll` if the kernel rejects the SQE.
248 ///
249 /// # Safety
250 ///
251 /// `iovs_storage` must live until this SQE's completion is reaped;
252 /// the kernel dereferences `iov_base` at I/O time, not submit time.
253 pub unsafe fn submit_read_to_gpu(
254 &mut self,
255 fd: i32,
256 offset: u64,
257 len: u32,
258 chunk_idx: usize,
259 iovs_storage: &mut [Iovec],
260 ) -> Result<(), PipelineError> {
261 if iovs_storage.is_empty() {
262 return Err(PipelineError::QueueFull {
263 queue: "submission",
264 fix: "caller supplied empty iovs_storage; pass at least one slot",
265 });
266 }
267 let target_offset = checked_chunk_target_offset(chunk_idx, len)?;
268 // SAFETY: Safe FFI / low-level operation verified and audited for Release compliance.
269 unsafe { self.submit_read_to_gpu_at(fd, offset, len, target_offset, iovs_storage) }
270 }
271
272 /// Submit a read directly into a byte offset inside the mapped buffer.
273 ///
274 /// Unlike [`AsyncUringStream::submit_read_to_gpu`], this method does not
275 /// derive the destination from a fixed chunk index. Wrappers that stream
276 /// variable-sized shards can place each read contiguously in a staging
277 /// buffer without being forced into `chunk_idx * len` layout.
278 ///
279 /// # Errors
280 ///
281 /// - [`PipelineError::QueueFull`] if the SQ is full OR the target range
282 /// exceeds the mapped buffer bounds.
283 ///
284 /// # Safety
285 ///
286 /// `iovs_storage` must live until this SQE's completion is reaped.
287 pub unsafe fn submit_read_to_gpu_at(
288 &mut self,
289 fd: i32,
290 offset: u64,
291 len: u32,
292 target_offset: u64,
293 iovs_storage: &mut [Iovec],
294 ) -> Result<(), PipelineError> {
295 // SAFETY: registered fixed buffers + file index are valid for the lifetime
296 // of the ring; the SQE is built on the ring's own SQ slot.
297 unsafe {
298 self.submit_read_to_gpu_at_with_user_data(
299 fd,
300 offset,
301 len,
302 target_offset,
303 target_offset,
304 iovs_storage,
305 )
306 }
307 }
308
309 /// Submit a read into an arbitrary byte offset and preserve caller-defined
310 /// `user_data` for completion correlation.
311 ///
312 /// # Errors
313 ///
314 /// Returns [`PipelineError::QueueFull`] when the SQ is full, the iovec
315 /// storage is empty, or the target range exceeds the mapped GPU buffer.
316 ///
317 /// # Safety
318 ///
319 /// `iovs_storage` must live until this SQE's completion is reaped.
320 pub unsafe fn submit_read_to_gpu_at_with_user_data(
321 &mut self,
322 fd: i32,
323 offset: u64,
324 len: u32,
325 target_offset: u64,
326 user_data: u64,
327 iovs_storage: &mut [Iovec],
328 ) -> Result<(), PipelineError> {
329 if iovs_storage.is_empty() {
330 return Err(PipelineError::QueueFull {
331 queue: "submission",
332 fix: "caller supplied empty iovs_storage; pass at least one slot",
333 });
334 }
335 let end = checked_target_end(target_offset, len)?;
336 let gpu_len = usize_to_u64(self.gpu_buffer.len(), "mapped GPU buffer length")?;
337 if end > gpu_len {
338 return Err(PipelineError::QueueFull {
339 queue: "submission",
340 fix: "target_offset + len exceeds GpuMappedBuffer length; enlarge the buffer or reduce the read size",
341 });
342 }
343
344 let Some(sqe) = self.ring_state.get_sqe() else {
345 return Err(PipelineError::QueueFull {
346 queue: "submission",
347 fix: "SQ full; call AsyncUringStream::poll to drain completions then retry",
348 });
349 };
350
351 // SAFETY: bounds-checked above; writing to a sub-region of
352 // the host-visible GpuMappedBuffer the caller committed.
353 let target_addr = unsafe {
354 self.gpu_buffer
355 .as_ptr()
356 .add(u64_to_usize(target_offset, "target offset")?)
357 };
358
359 iovs_storage[0] = Iovec {
360 iov_base: target_addr.cast::<core::ffi::c_void>(),
361 iov_len: u32_to_usize(len, "read length")?,
362 };
363
364 sqe.opcode = IORING_OP_READV;
365 sqe.fd = fd;
366 sqe.user_data_or_off = offset;
367 sqe.addr = pointer_addr_u64(iovs_storage.as_ptr(), "readv iovec pointer")?;
368 sqe.len = 1;
369 sqe.user_data = user_data;
370
371 self.ring_state.commit_sqe();
372 increment_queue_counter(&mut self.inflight, "inflight SQE count")?;
373 increment_queue_counter(&mut self.pending_submissions, "pending submission count")?;
374
375 Ok(())
376 }
377
378 /// Submit any queued SQEs to the kernel.
379 ///
380 /// SQPOLL can pick up tail updates on its own, but wrappers that rely on
381 /// bounded latency should not depend on the polling thread waking
382 /// promptly. Flushing pending submissions makes progress explicit.
383 pub fn flush_submissions(&mut self) -> Result<(), PipelineError> {
384 if self.pending_submissions == 0 {
385 return Ok(());
386 }
387 if self.ring_state.uses_sqpoll() {
388 if self.ring_state.sq_needs_wakeup() {
389 self.ring_state.wake_sqpoll()?;
390 }
391 } else {
392 self.ring_state.enter(self.pending_submissions, 0, 0)?;
393 }
394 self.pending_submissions = 0;
395 Ok(())
396 }
397
398 /// Reap available completions, advancing the megakernel tail
399 /// pointer once per success. Returns completions reaped.
400 ///
401 /// # Errors
402 ///
403 /// Returns [`PipelineError::IoUringSyscall`] on the first CQE
404 /// reporting `res < 0`. Remaining CQEs are still drained so the
405 /// ring does not overflow, but only the first failure is
406 /// returned - caller re-polls to pick up subsequent errors or
407 /// successes.
408 pub fn poll(&mut self) -> Result<u32, PipelineError> {
409 self.flush_submissions()?;
410 let mut completed: u32 = 0;
411 let mut first_error: Option<PipelineError> = None;
412
413 while let Some(cqe) = self.ring_state.peek_cqe() {
414 let res = cqe.res;
415 self.ring_state.advance_cq();
416 decrement_queue_counter(&mut self.inflight, "inflight SQE count")?;
417
418 if res < 0 {
419 if first_error.is_none() {
420 first_error = Some(PipelineError::IoUringSyscall {
421 syscall: "io_uring_cqe",
422 errno: -res,
423 fix: "inspect user_data to identify the failed SQE; common causes: EIO on disk, EFAULT on bad iovec, EINVAL on misaligned offset",
424 });
425 }
426 continue;
427 }
428
429 // Successful DMA: bytes landed in GPU-visible memory. Tail
430 // publication is batched after CQ drain so one poll with N
431 // completions performs one release atomic instead of N.
432 completed = vyre_driver::accounting::checked_add_u32_value(
433 completed,
434 1,
435 PipelineError::QueueFull {
436 queue: "completion",
437 fix: "io_uring completion count overflowed u32; drain completions more frequently",
438 },
439 )?;
440 }
441
442 if completed != 0 {
443 self.megakernel_tail.fetch_add(completed, Ordering::Release);
444 }
445
446 match first_error {
447 Some(err) => Err(err),
448 None => Ok(completed),
449 }
450 }
451
452 /// Flush pending submissions + wait for at least one completion.
453 ///
454 /// # Errors
455 ///
456 /// Returns [`PipelineError::IoUringSyscall`] if `io_uring_enter`
457 /// fails.
458 pub fn wait_for_completion(&mut self) -> Result<(), PipelineError> {
459 if self.inflight > 0 {
460 self.flush_submissions()?;
461 self.ring_state.enter(0, 1, 1)?;
462 self.poll()?;
463 }
464 Ok(())
465 }
466
467 /// Number of submissions still awaiting completion.
468 #[must_use]
469 pub fn inflight(&self) -> u32 {
470 self.inflight
471 }
472
473 /// Submit an NVMe passthrough command via `IORING_OP_URING_CMD`.
474 /// Requires the `uring-cmd-nvme` feature and Linux kernel 6.0+.
475 ///
476 /// The NVMe SQE layout is encoded by the caller in `nvme_sqe_bytes`
477 /// (64 bytes) - the SQE is memcpy'd into the `addr3`+`addr` slots
478 /// the kernel forwards to the NVMe driver. `user_data` is returned
479 /// on the matching CQE so the caller can correlate completions.
480 ///
481 /// # Errors
482 ///
483 /// - [`PipelineError::NvmePassthroughDisabled`] if the
484 /// `uring-cmd-nvme` feature is not enabled at compile time. This
485 /// variant is unreachable in this cfg-gated method but remains
486 /// part of the public error contract shared with the feature-gated
487 /// implementation.
488 /// - [`PipelineError::QueueFull`] if the SQ is full or the NVMe
489 /// command buffer is malformed (must be exactly 64 bytes).
490 ///
491 /// # Safety
492 ///
493 /// - `fd` must be an open character device the caller has
494 /// `IORING_SETUP_CQE32`-compatible access to (e.g. `/dev/ng0n1`).
495 /// - `nvme_sqe_bytes` must encode a valid NVMe command - kernel
496 /// rejection returns an errno on the CQE, but a forged payload
497 /// can still trigger device-level misbehavior.
498 #[cfg(feature = "uring-cmd-nvme")]
499 pub unsafe fn submit_nvme_passthrough(
500 &mut self,
501 fd: i32,
502 user_data: u64,
503 nvme_sqe_bytes: &[u8],
504 ) -> Result<(), PipelineError> {
505 if nvme_sqe_bytes.len() != 64 {
506 return Err(PipelineError::QueueFull {
507 queue: "submission",
508 fix: "NVMe passthrough SQE must be exactly 64 bytes; see linux/nvme_ioctl.h",
509 });
510 }
511
512 let Some(sqe) = self.ring_state.get_sqe() else {
513 return Err(PipelineError::QueueFull {
514 queue: "submission",
515 fix: "SQ full; call AsyncUringStream::poll to drain completions then retry",
516 });
517 };
518
519 // SAFETY: caller-provided slice is 64 bytes as validated
520 // above; we copy into the 64-byte NVMe passthrough region
521 // the kernel expects (addr + addr3 cover the first 40 bytes;
522 // the remaining 24 live in the SQE's inline payload).
523 let nvme_ptr = nvme_sqe_bytes.as_ptr();
524 sqe.opcode = IORING_OP_URING_CMD;
525 sqe.fd = fd;
526 sqe.user_data_or_off = 0;
527 // `cmd_op` in the first 4 bytes of addr3 (kernel reads it as u32).
528 sqe.addr = pointer_addr_u64(nvme_ptr, "NVMe command pointer")?;
529 sqe.len = 64;
530 sqe.user_data = user_data;
531 // The kernel reads the remaining payload bytes out of addr3
532 // directly; downstream NVMe drivers dereference this pointer.
533 sqe.addr3 = pointer_addr_u64(nvme_ptr, "NVMe command addr3 pointer")?;
534
535 self.ring_state.commit_sqe();
536 increment_queue_counter(&mut self.inflight, "inflight SQE count")?;
537 increment_queue_counter(&mut self.pending_submissions, "pending submission count")?;
538
539 Ok(())
540 }
541
542 /// Submit an `IORING_OP_READ_FIXED` into a pre-registered buffer.
543 ///
544 /// Requires the caller to have previously called
545 /// [`super::ring::IoUringState::register_buffers`] with an iovec
546 /// slice whose entry `buf_index` covers the target range. Because
547 /// the kernel skips per-SQE iovec validation, this path is 20-40%
548 /// lower latency than `submit_read_to_gpu` on hot loops.
549 ///
550 /// # Errors
551 ///
552 /// - [`PipelineError::QueueFull`] if the SQ is full or the
553 /// destination range exceeds the GPU buffer bounds.
554 ///
555 /// # Safety
556 ///
557 /// The `buf_index` must reference a still-registered iovec whose
558 /// region overlaps `chunk_idx * len .. (chunk_idx + 1) * len`
559 /// inside the [`GpuMappedBuffer`]. Mis-indexing produces a kernel
560 /// DMA into the wrong region - silent data corruption.
561 pub unsafe fn submit_read_fixed(
562 &mut self,
563 fd: i32,
564 offset: u64,
565 len: u32,
566 chunk_idx: usize,
567 buf_index: u16,
568 ) -> Result<(), PipelineError> {
569 let target_offset = checked_chunk_target_offset(chunk_idx, len)?;
570 // SAFETY: Safe FFI / low-level operation verified and audited for Release compliance.
571 unsafe {
572 self.submit_read_fixed_at(
573 fd,
574 offset,
575 len,
576 target_offset,
577 buf_index,
578 usize_to_u64(chunk_idx, "chunk index")?,
579 )
580 }
581 }
582
583 /// Submit an `IORING_OP_READ_FIXED` into a registered buffer at an
584 /// explicit destination offset inside the mapped buffer.
585 ///
586 /// Unlike [`AsyncUringStream::submit_read_fixed`], this variant decouples
587 /// the CQE `user_data` from the destination layout so higher-level
588 /// drivers can publish their own slot ids while still using a fixed slot
589 /// stride.
590 ///
591 /// # Errors
592 ///
593 /// - [`PipelineError::QueueFull`] if the SQ is full or the
594 /// destination range exceeds the GPU buffer bounds.
595 ///
596 /// # Safety
597 ///
598 /// `buf_index` must reference a still-registered iovec covering the
599 /// target range, and `user_data` must remain meaningful to the caller
600 /// until the CQE is reaped.
601 pub unsafe fn submit_read_fixed_at(
602 &mut self,
603 fd: i32,
604 offset: u64,
605 len: u32,
606 target_offset: u64,
607 buf_index: u16,
608 user_data: u64,
609 ) -> Result<(), PipelineError> {
610 let end = checked_target_end(target_offset, len)?;
611 let gpu_len = usize_to_u64(self.gpu_buffer.len(), "mapped GPU buffer length")?;
612 if end > gpu_len {
613 return Err(PipelineError::QueueFull {
614 queue: "submission",
615 fix: "chunk_idx * len exceeds GpuMappedBuffer length",
616 });
617 }
618
619 let Some(sqe) = self.ring_state.get_sqe() else {
620 return Err(PipelineError::QueueFull {
621 queue: "submission",
622 fix: "SQ full; call AsyncUringStream::poll to drain completions then retry",
623 });
624 };
625
626 // SAFETY: bounds-checked target address inside the host-visible
627 // GpuMappedBuffer the caller committed at construction.
628 let target_addr = unsafe {
629 self.gpu_buffer
630 .as_ptr()
631 .add(u64_to_usize(target_offset, "target offset")?)
632 };
633
634 sqe.opcode = IORING_OP_READ_FIXED;
635 sqe.fd = fd;
636 sqe.user_data_or_off = offset;
637 sqe.addr = pointer_addr_u64(target_addr, "fixed-read target pointer")?;
638 sqe.len = len;
639 sqe.buf_index = buf_index;
640 sqe.user_data = user_data;
641
642 self.ring_state.commit_sqe();
643 increment_queue_counter(&mut self.inflight, "inflight SQE count")?;
644 increment_queue_counter(&mut self.pending_submissions, "pending submission count")?;
645
646 Ok(())
647 }
648
649 /// Submit a read using a registered-file-table index instead of a
650 /// live fd. Use with
651 /// [`super::ring::IoUringState::register_files`] - avoids the
652 /// per-SQE file refcount bump.
653 ///
654 /// # Errors
655 ///
656 /// Same surface as [`AsyncUringStream::submit_read_to_gpu`].
657 ///
658 /// # Safety
659 ///
660 /// `file_index` must name a still-registered fd.
661 /// `iovs_storage` must outlive the completion. All other
662 /// conditions match `submit_read_to_gpu`.
663 pub unsafe fn submit_read_to_gpu_fixed_file(
664 &mut self,
665 file_index: i32,
666 offset: u64,
667 len: u32,
668 chunk_idx: usize,
669 iovs_storage: &mut [Iovec],
670 ) -> Result<(), PipelineError> {
671 if iovs_storage.is_empty() {
672 return Err(PipelineError::QueueFull {
673 queue: "submission",
674 fix: "caller supplied empty iovs_storage; pass at least one slot",
675 });
676 }
677 let target_offset = checked_chunk_target_offset(chunk_idx, len)?;
678 let end = checked_target_end(target_offset, len)?;
679 let gpu_len = usize_to_u64(self.gpu_buffer.len(), "mapped GPU buffer length")?;
680 if end > gpu_len {
681 return Err(PipelineError::QueueFull {
682 queue: "submission",
683 fix: "chunk_idx * len exceeds GpuMappedBuffer length",
684 });
685 }
686
687 let Some(sqe) = self.ring_state.get_sqe() else {
688 return Err(PipelineError::QueueFull {
689 queue: "submission",
690 fix: "SQ full; call AsyncUringStream::poll to drain completions then retry",
691 });
692 };
693
694 // SAFETY: same invariants as submit_read_to_gpu, plus the
695 // caller committed that file_index is a registered fd.
696 let target_addr = unsafe {
697 self.gpu_buffer
698 .as_ptr()
699 .add(u64_to_usize(target_offset, "target offset")?)
700 };
701 iovs_storage[0] = Iovec {
702 iov_base: target_addr.cast::<core::ffi::c_void>(),
703 iov_len: u32_to_usize(len, "read length")?,
704 };
705
706 sqe.opcode = IORING_OP_READV;
707 sqe.flags = super::ring::IOSQE_FIXED_FILE;
708 sqe.fd = file_index;
709 sqe.user_data_or_off = offset;
710 sqe.addr = pointer_addr_u64(iovs_storage.as_ptr(), "fixed-file readv iovec pointer")?;
711 sqe.len = 1;
712 sqe.user_data = usize_to_u64(chunk_idx, "chunk index")?;
713
714 self.ring_state.commit_sqe();
715 increment_queue_counter(&mut self.inflight, "inflight SQE count")?;
716 increment_queue_counter(&mut self.pending_submissions, "pending submission count")?;
717
718 Ok(())
719 }
720
721 /// Disabled-feature implementation for NVMe passthrough. Always returns
722 /// [`PipelineError::NvmePassthroughDisabled`] so callers get a
723 /// structured error rather than a link failure.
724 #[cfg(not(feature = "uring-cmd-nvme"))]
725 #[allow(clippy::unused_self, clippy::missing_safety_doc)]
726 pub unsafe fn submit_nvme_passthrough(
727 &mut self,
728 _fd: i32,
729 _user_data: u64,
730 _nvme_sqe_bytes: &[u8],
731 ) -> Result<(), PipelineError> {
732 Err(PipelineError::NvmePassthroughDisabled)
733 }
734}
735
736fn checked_chunk_target_offset(chunk_idx: usize, len: u32) -> Result<u64, PipelineError> {
737 let chunk_idx = usize_to_u64(chunk_idx, "chunk index")?;
738 vyre_driver::accounting::checked_mul_u64_lazy(chunk_idx, u64::from(len), || {
739 PipelineError::QueueFull {
740 queue: "submission",
741 fix: "chunk_idx * len overflows u64; split the IO batch before submission",
742 }
743 })
744}
745
746fn checked_target_end(target_offset: u64, len: u32) -> Result<u64, PipelineError> {
747 vyre_driver::accounting::checked_add_u64_lazy(target_offset, u64::from(len), || {
748 PipelineError::QueueFull {
749 queue: "submission",
750 fix: "target_offset + len overflows u64; split the IO batch before submission",
751 }
752 })
753}
754
755fn increment_queue_counter(counter: &mut u32, label: &'static str) -> Result<(), PipelineError> {
756 *counter = vyre_driver::accounting::checked_add_u32_value(
757 *counter,
758 1,
759 PipelineError::QueueFull {
760 queue: "submission",
761 fix: match label {
762 "inflight SQE count" => {
763 "inflight SQE count overflowed u32; poll completions before submitting more work"
764 }
765 "pending submission count" => {
766 "pending submission count overflowed u32; flush submissions before queuing more work"
767 }
768 _ => {
769 "io_uring queue counter overflowed u32; drain the queue before submitting more work"
770 }
771 },
772 },
773 )?;
774 Ok(())
775}
776
777fn decrement_queue_counter(counter: &mut u32, label: &'static str) -> Result<(), PipelineError> {
778 *counter = counter.checked_sub(1).ok_or(PipelineError::QueueFull {
779 queue: "completion",
780 fix: match label {
781 "inflight SQE count" => {
782 "io_uring completion arrived with no inflight SQE; rebuild the stream state"
783 }
784 _ => "io_uring queue counter underflowed; rebuild the stream state",
785 },
786 })?;
787 Ok(())
788}
789
790fn usize_to_u64(value: usize, label: &'static str) -> Result<u64, PipelineError> {
791 u64::try_from(value).map_err(|_| PipelineError::QueueFull {
792 queue: "submission",
793 fix: match label {
794 "chunk index" => "chunk index cannot fit u64; split the IO batch before submission",
795 "mapped GPU buffer length" => {
796 "mapped GPU buffer length cannot fit u64; split the staging allocation"
797 }
798 _ => "host usize value cannot fit u64; split the IO batch before submission",
799 },
800 })
801}
802
803fn pointer_addr_u64<T>(ptr: *const T, label: &'static str) -> Result<u64, PipelineError> {
804 usize_to_u64(ptr.addr(), label)
805}
806
807fn u64_to_usize(value: u64, label: &'static str) -> Result<usize, PipelineError> {
808 usize::try_from(value).map_err(|_| PipelineError::QueueFull {
809 queue: "submission",
810 fix: match label {
811 "target offset" => {
812 "target offset cannot fit usize; split the IO batch before submission"
813 }
814 _ => "u64 value cannot fit usize; split the IO batch before submission",
815 },
816 })
817}
818
819fn u32_to_usize(value: u32, label: &'static str) -> Result<usize, PipelineError> {
820 usize::try_from(value).map_err(|_| PipelineError::QueueFull {
821 queue: "submission",
822 fix: match label {
823 "read length" => "read length cannot fit usize; split the IO request before submission",
824 _ => "u32 value cannot fit usize; split the IO request before submission",
825 },
826 })
827}
828
#[cfg(test)]
mod tests {
    use super::*;

    /// Writes made through `as_mut_slice` must be visible in the backing
    /// array once the mapped handle is no longer used (run under Miri).
    #[test]
    fn mapped_slice_roundtrip_is_miri_clean() {
        let mut bytes = [1_u8, 2, 3, 4];
        {
            // SAFETY: `bytes` outlives the handle and is reachable only
            // through it inside this scope.
            let mut handle = unsafe { GpuMappedBuffer::from_host_visible_slice(&mut bytes) };
            // SAFETY: `handle` is the sole access path to `bytes` here.
            let view = unsafe { handle.as_mut_slice() };
            view[0] = 9;
            view[3] = 7;
        }
        assert_eq!(bytes, [9, 2, 3, 7]);
    }
}
844}