Skip to main content

vyre_runtime/uring/
driver.rs

1//! End-to-end ingest driver: file/NVMe -> io_uring -> mapped slot -> io_queue.
2
3use std::fs::File;
4use std::os::fd::AsRawFd;
5use std::path::Path;
6
7use crate::megakernel::MegakernelIoQueue;
8use crate::PipelineError;
9
10#[cfg(feature = "uring-cmd-nvme")]
11use super::gpudirect::encode_nvme_read_sqe;
12use super::gpudirect::GpuDirectCapability;
13use super::stream::{AsyncUringStream, GpuMappedBuffer, Iovec};
14
15#[derive(Debug)]
16struct PendingIngest {
17    _file: Option<File>,
18    tag: u32,
19    completion: PendingCompletion,
20}
21
/// Rule used to turn a successful CQE into a published byte count.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(not(feature = "uring-cmd-nvme"), allow(dead_code))]
enum PendingCompletion {
    /// The non-negative CQE result is itself the transferred byte count.
    ByteCountFromCqe,
    /// The CQE result is a status where zero means success; the byte count
    /// recorded at submission time is reported instead.
    NativeNvmeStatus {
        /// Byte count computed when the read was submitted.
        expected_byte_count: u32,
    },
}
28
/// Completion record handed back to the host once a read has finished and its
/// slot has been published.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CompletedIngest {
    /// Index of the queue slot that completed.
    pub slot: u32,
    /// Byte count reported for the transfer.
    pub byte_count: u32,
    /// Caller-defined tag that was mirrored into the `io_queue`.
    pub tag: u32,
}
39
/// Counters describing NVMe/file ingest into GPU-visible memory.
///
/// `cpu_bounce_bytes` is deliberately exposed in the public snapshot. The
/// `NvmeGpuIngestDriver` never targets an ordinary userspace bounce buffer, so
/// the counter is expected to stay at zero for both registered mapped reads
/// and native GPUDirect NVMe passthrough.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct NvmeGpuIngestTelemetry {
    /// Total bytes requested by reads submitted to io_uring.
    pub submitted_bytes: u64,
    /// Total bytes of reads that completed and were published into the
    /// megakernel IO queue.
    pub completed_bytes: u64,
    /// Number of read submissions accepted by io_uring.
    pub submitted_reads: u64,
    /// Number of completed reads published into the megakernel IO queue.
    pub completed_reads: u64,
    /// Submissions issued as `IORING_OP_READ_FIXED` into registered
    /// GPU-visible memory.
    pub registered_mapped_read_submissions: u64,
    /// Submissions issued as native `IORING_OP_URING_CMD` NVMe passthrough
    /// into BAR1 memory.
    pub gpudirect_nvme_submissions: u64,
    /// Bytes copied through ordinary userspace bounce buffers.
    pub cpu_bounce_bytes: u64,
    /// CQEs that finished with an error, or that had no matching pending
    /// metadata.
    pub failed_completions: u64,
}
65
66impl NvmeGpuIngestTelemetry {
67    /// Inflight read count derived from accepted submissions and terminal CQEs.
68    #[must_use]
69    pub fn inflight_reads(self) -> u64 {
70        self.submitted_reads
71            .saturating_sub(self.completed_reads)
72            .saturating_sub(self.failed_completions)
73    }
74
75    /// Read submissions recorded for a specific native ingest path.
76    #[must_use]
77    pub fn path_submissions(self, path: NativeReadPath) -> u64 {
78        match path {
79            NativeReadPath::RegisteredMappedRead => self.registered_mapped_read_submissions,
80            NativeReadPath::GpuDirectNvmePassthrough => self.gpudirect_nvme_submissions,
81        }
82    }
83
84    /// Validate that the snapshot describes a completed zero-copy ingest run for `path`.
85    ///
86    /// This method intentionally does not validate a benchmark's expected byte
87    /// count; it validates the runtime invariant that every submitted read on
88    /// the selected path completed without CPU bounce buffers or path mixing.
89    ///
90    /// # Errors
91    ///
92    /// Returns [`PipelineError::Backend`] with an actionable fix when the
93    /// snapshot reports a CPU bounce copy, failed/inflight reads, incomplete
94    /// byte accounting, incomplete read accounting, or submissions on the
95    /// wrong native ingest path.
96    pub fn validate_completed_zero_copy(self, path: NativeReadPath) -> Result<(), PipelineError> {
97        if self.cpu_bounce_bytes != 0 {
98            return Err(PipelineError::Backend(format!(
99                "NVMe GPU ingest copied {} bytes through a CPU bounce buffer. Fix: route reads through registered GPU-visible slots or native GPUDirect NVMe passthrough.",
100                self.cpu_bounce_bytes
101            )));
102        }
103        if self.failed_completions != 0 {
104            return Err(PipelineError::Backend(format!(
105                "NVMe GPU ingest reported {} failed completions. Fix: inspect CQE status before publishing slots to the megakernel IO queue.",
106                self.failed_completions
107            )));
108        }
109        let inflight = self.inflight_reads();
110        if inflight != 0 {
111            return Err(PipelineError::Backend(format!(
112                "NVMe GPU ingest left {inflight} reads inflight. Fix: drain completions before taking release telemetry snapshots."
113            )));
114        }
115        if self.submitted_bytes != self.completed_bytes {
116            return Err(PipelineError::Backend(format!(
117                "NVMe GPU ingest byte accounting mismatch: submitted={}, completed={}. Fix: account CQE byte counts exactly once.",
118                self.submitted_bytes, self.completed_bytes
119            )));
120        }
121        if self.submitted_reads != self.completed_reads {
122            return Err(PipelineError::Backend(format!(
123                "NVMe GPU ingest read accounting mismatch: submitted={}, completed={}. Fix: account every terminal CQE exactly once.",
124                self.submitted_reads, self.completed_reads
125            )));
126        }
127        let selected_path_submissions = self.path_submissions(path);
128        if selected_path_submissions != self.submitted_reads {
129            return Err(PipelineError::Backend(format!(
130                "NVMe GPU ingest path submission mismatch for {path:?}: path_submissions={}, submitted_reads={}. Fix: construct the driver with the same native read path used by the benchmark.",
131                selected_path_submissions, self.submitted_reads
132            )));
133        }
134        let mixed_path_submissions = match path {
135            NativeReadPath::RegisteredMappedRead => self.gpudirect_nvme_submissions,
136            NativeReadPath::GpuDirectNvmePassthrough => self.registered_mapped_read_submissions,
137        };
138        if mixed_path_submissions != 0 {
139            return Err(PipelineError::Backend(format!(
140                "NVMe GPU ingest mixed {mixed_path_submissions} submissions from the non-selected path into {path:?}. Fix: keep registered mapped reads and native GPUDirect passthrough telemetry separate."
141            )));
142        }
143        Ok(())
144    }
145
146    fn record_submit(
147        &mut self,
148        path: NativeReadPath,
149        byte_count: u32,
150    ) -> Result<(), PipelineError> {
151        self.submitted_bytes = checked_telemetry_add(
152            self.submitted_bytes,
153            u64::from(byte_count),
154            "submitted bytes",
155        )?;
156        self.submitted_reads = checked_telemetry_add(self.submitted_reads, 1, "submitted reads")?;
157        match path {
158            NativeReadPath::RegisteredMappedRead => {
159                self.registered_mapped_read_submissions = checked_telemetry_add(
160                    self.registered_mapped_read_submissions,
161                    1,
162                    "registered mapped read submissions",
163                )?;
164            }
165            NativeReadPath::GpuDirectNvmePassthrough => {
166                self.gpudirect_nvme_submissions = checked_telemetry_add(
167                    self.gpudirect_nvme_submissions,
168                    1,
169                    "GPUDirect NVMe submissions",
170                )?;
171            }
172        }
173        Ok(())
174    }
175
176    fn record_complete(&mut self, byte_count: u32) -> Result<(), PipelineError> {
177        self.completed_bytes = checked_telemetry_add(
178            self.completed_bytes,
179            u64::from(byte_count),
180            "completed bytes",
181        )?;
182        self.completed_reads = checked_telemetry_add(self.completed_reads, 1, "completed reads")?;
183        Ok(())
184    }
185
186    fn record_failed_completion(&mut self) -> Result<(), PipelineError> {
187        self.failed_completions =
188            checked_telemetry_add(self.failed_completions, 1, "failed completions")?;
189        Ok(())
190    }
191}
192
/// Read strategy an [`NvmeGpuIngestDriver`] is constructed for.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NativeReadPath {
    /// `IORING_OP_READ_FIXED` into a registered GPU-visible mapping.
    ///
    /// The userspace bounce buffer is gone, but the reads are still ordinary
    /// file reads submitted by the CPU. This is the compatibility path for
    /// filesystems and GPU memory APIs that do not expose BAR1 peer DMA.
    RegisteredMappedRead,
    /// `IORING_OP_URING_CMD` NVMe read into BAR1 peer memory.
    ///
    /// The canonical native ingest path: the CPU submits a single NVMe
    /// command, the device DMAs the bytes straight into GPU-owned memory, and
    /// the megakernel consumes the published slot.
    GpuDirectNvmePassthrough,
}
209
210/// Wire the Linux ingest loop end-to-end without userspace bounce buffers.
211pub struct NvmeGpuIngestDriver<'a> {
212    stream: AsyncUringStream<'a>,
213    mapped_slots: Vec<GpuMappedBuffer<'a>>,
214    registered_iovecs: Vec<Iovec>,
215    megakernel_io_queue: MegakernelIoQueue,
216    pending: Vec<Option<PendingIngest>>,
217    slot_bytes: usize,
218    read_path: NativeReadPath,
219    telemetry: NvmeGpuIngestTelemetry,
220}
221
222impl<'a> NvmeGpuIngestDriver<'a> {
223    /// Split one mapped staging buffer into `slot_count` fixed-size slots and
224    /// register those slots for `IORING_OP_READ_FIXED`.
225    ///
226    /// # Errors
227    ///
228    /// Returns [`PipelineError::QueueFull`] when the buffer cannot be evenly
229    /// partitioned into non-empty slots, or an io_uring syscall error if
230    /// buffer registration fails.
231    pub fn new(
232        stream: AsyncUringStream<'a>,
233        slot_count: u32,
234        megakernel_io_queue: MegakernelIoQueue,
235    ) -> Result<Self, PipelineError> {
236        Self::new_with_path(
237            stream,
238            slot_count,
239            megakernel_io_queue,
240            NativeReadPath::RegisteredMappedRead,
241        )
242    }
243
244    /// Construct a driver that requires native GPUDirect NVMe passthrough.
245    ///
246    /// This constructor fails loudly when `uring-cmd-nvme` is not compiled in
247    /// or `nvidia-fs` is not active. Callers that need the VYRE canonical path
248    /// should use this instead of [`NvmeGpuIngestDriver::new`].
249    ///
250    /// # Errors
251    ///
252    /// Returns [`PipelineError::NvmePassthroughDisabled`] when the feature is
253    /// absent, [`PipelineError::Backend`] when the host probe rejects
254    /// GPUDirect, or the same slot-partitioning errors as
255    /// [`NvmeGpuIngestDriver::new`].
256    pub fn new_gpudirect(
257        stream: AsyncUringStream<'a>,
258        slot_count: u32,
259        megakernel_io_queue: MegakernelIoQueue,
260    ) -> Result<Self, PipelineError> {
261        match GpuDirectCapability::probe() {
262            GpuDirectCapability::Available { .. } => Self::new_with_path(
263                stream,
264                slot_count,
265                megakernel_io_queue,
266                NativeReadPath::GpuDirectNvmePassthrough,
267            ),
268            GpuDirectCapability::FeatureDisabled => Err(PipelineError::NvmePassthroughDisabled),
269            GpuDirectCapability::Unavailable { reason } => Err(PipelineError::Backend(format!(
270                "GPUDirect native read unavailable: {reason}. Fix: install/enable nvidia-fs, use a BAR1-backed GpuMappedBuffer, or use NvmeGpuIngestDriver::new for registered mapped reads."
271            ))),
272        }
273    }
274
275    fn new_with_path(
276        stream: AsyncUringStream<'a>,
277        slot_count: u32,
278        megakernel_io_queue: MegakernelIoQueue,
279        read_path: NativeReadPath,
280    ) -> Result<Self, PipelineError> {
281        let total_len = stream.gpu_buffer.len();
282        let slot_count_usize =
283            usize::try_from(slot_count).map_err(|_| PipelineError::QueueFull {
284                queue: "submission",
285                fix: "slot_count does not fit host usize; reduce the ingest slot count",
286            })?;
287        let slot_bytes = partition_slot_bytes(total_len, slot_count_usize)?;
288
289        let mut mapped_slots = Vec::new();
290        let mut registered_iovecs = Vec::new();
291        let mut pending = Vec::new();
292        reserve_ingest_vec_capacity(
293            &mut mapped_slots,
294            slot_count_usize,
295            "mapped GPU ingest slots",
296        )?;
297        reserve_ingest_vec_capacity(
298            &mut registered_iovecs,
299            slot_count_usize,
300            "registered io_uring iovecs",
301        )?;
302        reserve_ingest_vec_capacity(&mut pending, slot_count_usize, "pending ingest slots")?;
303        for slot in 0..slot_count_usize {
304            let offset = slot * slot_bytes;
305            let slot_buffer = stream.gpu_buffer.sub_region(offset, slot_bytes)?;
306            registered_iovecs.push(Iovec {
307                iov_base: slot_buffer.as_ptr().cast(),
308                iov_len: slot_buffer.len(),
309            });
310            mapped_slots.push(slot_buffer);
311        }
312        pending.resize_with(slot_count_usize, || None);
313        Ok(Self {
314            stream,
315            mapped_slots,
316            registered_iovecs,
317            megakernel_io_queue,
318            pending,
319            slot_bytes,
320            read_path,
321            telemetry: NvmeGpuIngestTelemetry::default(),
322        })
323    }
324
325    /// Read an entire file into a fixed ingest slot.
326    ///
327    /// # Errors
328    ///
329    /// Returns [`PipelineError::QueueFull`] when the slot is already in
330    /// flight or the file is larger than the slot capacity.
331    pub fn submit_file(&mut self, path: &Path, slot: u32) -> Result<(), PipelineError> {
332        let slot_usize = self.validate_slot_for_submit(slot)?;
333
334        let file = File::open(path).map_err(|error| {
335            PipelineError::Backend(format!("open {} failed: {error}", path.display()))
336        })?;
337        let file_len = file
338            .metadata()
339            .map_err(|error| {
340                PipelineError::Backend(format!("stat {} failed: {error}", path.display()))
341            })?
342            .len();
343        let slot_bytes_u64 = usize_to_u64(self.slot_bytes, "ingest slot byte length")?;
344        if file_len > slot_bytes_u64 {
345            return Err(PipelineError::QueueFull {
346                queue: "submission",
347                fix: "file exceeds the configured ingest slot size; enlarge the mapped staging buffer or segment the file",
348            });
349        }
350
351        let byte_count = u32::try_from(file_len).map_err(|_| PipelineError::QueueFull {
352            queue: "submission",
353            fix: "file length exceeds u32 read size even though it fit the slot; split the ingest file",
354        })?;
355        let target_offset = slot_byte_offset(slot_usize, self.slot_bytes)?;
356        let slot_iovec = &mut self.registered_iovecs[slot_usize..slot_usize + 1];
357        // SAFETY: `slot_iovec` and file descriptor stay live until the CQE is reaped.
358        unsafe {
359            self.stream.submit_read_to_gpu_at(
360                file.as_raw_fd(),
361                0,
362                byte_count,
363                target_offset,
364                slot_iovec,
365            )?;
366        }
367        self.telemetry
368            .record_submit(NativeReadPath::RegisteredMappedRead, byte_count)?;
369        self.pending[slot_usize] = Some(PendingIngest {
370            _file: Some(file),
371            tag: slot,
372            completion: PendingCompletion::ByteCountFromCqe,
373        });
374        Ok(())
375    }
376
377    /// Submit one native NVMe read directly into the mapped slot.
378    ///
379    /// `nvme_fd` must name an NVMe character device such as `/dev/ng0n1`.
380    /// `mapped_slots[slot]` must be a BAR1 peer-memory region created with
381    /// [`GpuMappedBuffer::from_bar1_peer_with_owner`]. On completion the slot is
382    /// published to the megakernel `io_queue`; the CPU does not copy bytes.
383    ///
384    /// # Errors
385    ///
386    /// Returns [`PipelineError::NvmePassthroughDisabled`] when the build lacks
387    /// native NVMe passthrough support, [`PipelineError::QueueFull`] when the
388    /// slot or byte range is invalid, or [`PipelineError::Backend`] when this
389    /// driver was constructed for the compatibility path.
390    ///
391    /// # Safety
392    ///
393    /// The caller must ensure `nvme_fd`, `namespace_id`, `starting_lba`, and
394    /// `blocks` describe a valid device range, and that the mapped slot remains
395    /// a valid peer-DMA destination until its CQE is reaped.
396    #[cfg(feature = "uring-cmd-nvme")]
397    #[allow(clippy::too_many_arguments)]
398    pub unsafe fn submit_native_nvme_read(
399        &mut self,
400        nvme_fd: i32,
401        namespace_id: u32,
402        starting_lba: u64,
403        blocks: u32,
404        bytes_per_block: u32,
405        slot: u32,
406    ) -> Result<(), PipelineError> {
407        if self.read_path != NativeReadPath::GpuDirectNvmePassthrough {
408            return Err(PipelineError::Backend(
409                "native NVMe read submitted on a registered-mapped-read driver. Fix: construct with NvmeGpuIngestDriver::new_gpudirect and a BAR1-backed GpuMappedBuffer.".to_string(),
410            ));
411        }
412        let slot_usize = self.validate_slot_for_submit(slot)?;
413        if blocks == 0 || bytes_per_block == 0 {
414            return Err(PipelineError::QueueFull {
415                queue: "submission",
416                fix: "native NVMe reads require non-zero block count and bytes_per_block",
417            });
418        }
419        let byte_count = vyre_driver::accounting::checked_mul_u32_value(
420            blocks,
421            bytes_per_block,
422            PipelineError::QueueFull {
423                queue: "submission",
424                fix: "native NVMe read byte count overflowed u32; submit a smaller range",
425            },
426        )?;
427        let byte_count_usize =
428            usize::try_from(byte_count).map_err(|_| PipelineError::QueueFull {
429                queue: "submission",
430                fix: "native NVMe read byte count cannot fit host usize; submit a smaller range",
431            })?;
432        if byte_count_usize > self.slot_bytes {
433            return Err(PipelineError::QueueFull {
434                queue: "submission",
435                fix: "native NVMe read exceeds the configured ingest slot size; enlarge the BAR1 mapped slot or submit fewer blocks",
436            });
437        }
438
439        let dest = usize_to_u64(
440            self.mapped_slots[slot_usize].as_ptr().addr(),
441            "mapped BAR1 destination pointer",
442        )?;
443        let sqe = encode_nvme_read_sqe(namespace_id, starting_lba, blocks, dest);
444        let user_data = slot_byte_offset(slot_usize, self.slot_bytes)?;
445        // SAFETY: forwarded from this method's contract; the SQE is built
446        // from validated scalar fields and a slot-local BAR1 destination.
447        unsafe {
448            self.stream
449                .submit_nvme_passthrough(nvme_fd, user_data, &sqe)?;
450        }
451        self.telemetry
452            .record_submit(NativeReadPath::GpuDirectNvmePassthrough, byte_count)?;
453        self.pending[slot_usize] = Some(PendingIngest {
454            _file: None,
455            tag: slot,
456            completion: PendingCompletion::NativeNvmeStatus {
457                expected_byte_count: byte_count,
458            },
459        });
460        Ok(())
461    }
462
463    /// Disabled-feature variant of [`NvmeGpuIngestDriver::submit_native_nvme_read`].
464    ///
465    /// # Errors
466    ///
467    /// Always returns [`PipelineError::NvmePassthroughDisabled`].
468    ///
469    /// # Safety
470    ///
471    /// This method does not touch `nvme_fd`; it exists to keep the public API
472    /// structured across feature sets.
473    #[cfg(not(feature = "uring-cmd-nvme"))]
474    #[allow(clippy::too_many_arguments)]
475    pub unsafe fn submit_native_nvme_read(
476        &mut self,
477        _nvme_fd: i32,
478        _namespace_id: u32,
479        _starting_lba: u64,
480        _blocks: u32,
481        _bytes_per_block: u32,
482        _slot: u32,
483    ) -> Result<(), PipelineError> {
484        Err(PipelineError::NvmePassthroughDisabled)
485    }
486
487    /// Flush submissions, reap CQEs, and publish the completed slots into the
488    /// megakernel `io_queue`.
489    pub fn poll_completions(&mut self) -> Result<Vec<CompletedIngest>, PipelineError> {
490        let mut completed = Vec::new();
491        self.poll_completions_into(&mut completed)?;
492        Ok(completed)
493    }
494
495    /// Flush submissions, reap CQEs, and append completed slots into
496    /// caller-owned storage.
497    ///
498    /// Reusing `completed` across polls keeps the ingest hot path allocation
499    /// free after driver construction.
500    pub fn poll_completions_into(
501        &mut self,
502        completed: &mut Vec<CompletedIngest>,
503    ) -> Result<(), PipelineError> {
504        completed.clear();
505        self.stream.flush_submissions()?;
506        let inflight_capacity =
507            usize::try_from(self.stream.inflight).map_err(|_| PipelineError::Backend(
508                "io_uring inflight completion count cannot fit host usize. Fix: shard ingest submissions before polling completions."
509                    .to_string(),
510            ))?;
511        reserve_ingest_vec_capacity(completed, inflight_capacity, "completed ingest records")?;
512        let mut first_error: Option<PipelineError> = None;
513
514        while let Some(cqe) = self.stream.ring_state.peek_cqe() {
515            let res = cqe.res;
516            if self.slot_bytes == 0 {
517                return Err(PipelineError::Backend(
518                    "io_uring ingest driver has zero slot_bytes. Fix: construct NvmeGpuIngestDriver with at least one non-empty mapped slot.".to_string(),
519                ));
520            }
521            let user_data = usize::try_from(cqe.user_data).map_err(|_| {
522                PipelineError::Backend(format!(
523                    "io_uring CQE user_data {} does not fit host usize. Fix: keep slot byte offsets within host addressable range.",
524                    cqe.user_data
525                ))
526            })?;
527            let slot = user_data / self.slot_bytes;
528            self.stream.ring_state.advance_cq();
529            self.stream.inflight = self.stream.inflight.checked_sub(1).ok_or_else(|| {
530                PipelineError::Backend(
531                    "io_uring completion arrived with zero inflight submissions. Fix: audit submit/completion accounting before reusing this stream.".to_string(),
532                )
533            })?;
534
535            let pending = self.pending.get_mut(slot).and_then(Option::take);
536            if res < 0 {
537                self.telemetry.record_failed_completion()?;
538                if first_error.is_none() {
539                    first_error = Some(PipelineError::IoUringSyscall {
540                        syscall: "io_uring_cqe",
541                        errno: -res,
542                        fix: "inspect the offending file descriptor and slot metadata; common causes are EIO on disk or EFAULT on an invalid registered buffer",
543                    });
544                }
545                continue;
546            }
547
548            let pending = match pending {
549                Some(pending) => pending,
550                None => {
551                    self.telemetry.record_failed_completion()?;
552                    if first_error.is_none() {
553                        first_error = Some(PipelineError::Backend(format!(
554                            "CQE for slot {slot} arrived without matching pending metadata"
555                        )));
556                    }
557                    continue;
558                }
559            };
560            let byte_count = match pending.completion {
561                PendingCompletion::ByteCountFromCqe => {
562                    u32::try_from(res).map_err(|_| PipelineError::Backend(format!(
563                        "io_uring CQE byte count {res} cannot fit u32. Fix: split ingest reads so completions stay within the megakernel io_queue ABI."
564                    )))?
565                }
566                PendingCompletion::NativeNvmeStatus {
567                    expected_byte_count,
568                } => {
569                    if res != 0 {
570                        self.telemetry.record_failed_completion()?;
571                        if first_error.is_none() {
572                            first_error = Some(PipelineError::Backend(format!(
573                                "NVMe passthrough completion for slot {slot} returned non-zero status {res}. Fix: inspect namespace id, LBA range, permissions, and nvidia-fs state."
574                            )));
575                        }
576                        continue;
577                    }
578                    expected_byte_count
579                }
580            };
581            let slot_u32 = u32::try_from(slot).map_err(|_| PipelineError::Backend(format!(
582                "io_uring completion slot {slot} cannot fit u32. Fix: shard ingest slots before publishing to the megakernel io_queue."
583            )))?;
584            self.megakernel_io_queue
585                .publish_slot(slot_u32, slot_u32, byte_count, pending.tag)?;
586            self.telemetry.record_complete(byte_count)?;
587            completed.push(CompletedIngest {
588                slot: slot_u32,
589                byte_count,
590                tag: pending.tag,
591            });
592        }
593
594        match first_error {
595            Some(err) => Err(err),
596            None => Ok(()),
597        }
598    }
599
600    /// Borrow the raw io_queue bytes for backend upload/readback.
601    #[must_use]
602    pub fn megakernel_io_queue(&self) -> &MegakernelIoQueue {
603        &self.megakernel_io_queue
604    }
605
606    /// Mutable access to the io_queue bytes.
607    #[must_use]
608    pub fn megakernel_io_queue_mut(&mut self) -> &mut MegakernelIoQueue {
609        &mut self.megakernel_io_queue
610    }
611
612    /// Fixed slot size in bytes.
613    #[must_use]
614    pub fn slot_bytes(&self) -> usize {
615        self.slot_bytes
616    }
617
618    /// Number of registered slots.
619    #[must_use]
620    pub fn slot_count(&self) -> usize {
621        self.registered_iovecs.len()
622    }
623
624    /// Read path this driver was constructed to use.
625    #[must_use]
626    pub fn read_path(&self) -> NativeReadPath {
627        self.read_path
628    }
629
630    /// Snapshot ingest telemetry counters.
631    #[must_use]
632    pub fn telemetry_snapshot(&self) -> NvmeGpuIngestTelemetry {
633        self.telemetry
634    }
635
636    /// Reset ingest telemetry counters without changing pending submissions.
637    pub fn reset_telemetry(&mut self) {
638        self.telemetry = NvmeGpuIngestTelemetry::default();
639    }
640
641    fn validate_slot_for_submit(&self, slot: u32) -> Result<usize, PipelineError> {
642        let slot_usize = usize::try_from(slot).map_err(|_| PipelineError::QueueFull {
643            queue: "submission",
644            fix: "slot index cannot fit host usize; shard mapped ingest slots",
645        })?;
646        if slot_usize >= self.mapped_slots.len() {
647            return Err(PipelineError::QueueFull {
648                queue: "submission",
649                fix: "slot exceeds the configured mapped-slot count",
650            });
651        }
652        if self.pending[slot_usize].is_some() {
653            return Err(PipelineError::QueueFull {
654                queue: "submission",
655                fix: "slot already has an in-flight ingest; drain completions before reusing it",
656            });
657        }
658        Ok(slot_usize)
659    }
660}
661
662
663fn checked_telemetry_add(
664    current: u64,
665    increment: u64,
666    label: &'static str,
667) -> Result<u64, PipelineError> {
668    vyre_driver::accounting::checked_add_u64_lazy(current, increment, || {
669        PipelineError::Backend(format!(
670            "io_uring ingest telemetry {label} overflowed u64. Fix: snapshot and reset telemetry before counters saturate."
671        ))
672    })
673}
674
675fn usize_to_u64(value: usize, label: &'static str) -> Result<u64, PipelineError> {
676    u64::try_from(value).map_err(|_| {
677        PipelineError::Backend(format!(
678            "{label} cannot fit u64. Fix: shard io_uring GPU ingest buffers before submission."
679        ))
680    })
681}
682
683fn slot_byte_offset(slot_idx: usize, slot_bytes: usize) -> Result<u64, PipelineError> {
684    let offset = vyre_driver::accounting::checked_mul_usize_lazy(slot_idx, slot_bytes, || {
685        PipelineError::Backend(
686            "io_uring ingest slot byte offset overflowed usize. Fix: shard mapped ingest slots."
687                .to_string(),
688        )
689    })?;
690    usize_to_u64(offset, "io_uring ingest slot byte offset")
691}
692
693fn reserve_ingest_vec_capacity<T>(
694    vec: &mut Vec<T>,
695    capacity: usize,
696    field: &'static str,
697) -> Result<(), PipelineError> {
698    if vec.capacity() >= capacity {
699        return Ok(());
700    }
701    vec.try_reserve_exact(capacity - vec.capacity())
702        .map_err(|error| {
703            PipelineError::Backend(format!(
704                "io_uring GPU ingest failed to reserve {field} for {capacity} entries: {error}. Fix: reduce ingest slot fan-out or shard the ingest batch."
705            ))
706        })
707}
708
709fn partition_slot_bytes(total_len: usize, slot_count: usize) -> Result<usize, PipelineError> {
710    if slot_count == 0 {
711        return Err(PipelineError::QueueFull {
712            queue: "submission",
713            fix: "NvmeGpuIngestDriver requires at least one slot",
714        });
715    }
716    let slot_bytes = total_len / slot_count;
717    if slot_bytes == 0 {
718        return Err(PipelineError::QueueFull {
719            queue: "submission",
720            fix: "mapped staging buffer is too small to partition into the requested slot count",
721        });
722    }
723    if total_len % slot_count != 0 {
724        return Err(PipelineError::QueueFull {
725            queue: "submission",
726            fix: "mapped staging buffer length must divide evenly by slot_count so every byte belongs to exactly one DMA slot",
727        });
728    }
729    Ok(slot_bytes)
730}
731
#[cfg(test)]
mod tests {
    use super::*;

    /// Unwrap the error side of a partition attempt and check that it is a
    /// `QueueFull` rejection.
    fn assert_partition_rejected(outcome: Result<usize, PipelineError>, why: &str) {
        let error = outcome.expect_err(why);
        assert!(matches!(error, PipelineError::QueueFull { .. }));
    }

    #[test]
    fn partition_slot_bytes_accepts_exact_slot_geometry() {
        let slot_bytes = partition_slot_bytes(4096 * 8, 8).unwrap();
        assert_eq!(slot_bytes, 4096);
    }

    #[test]
    fn partition_slot_bytes_rejects_zero_slots() {
        assert_partition_rejected(partition_slot_bytes(4096, 0), "zero slots must fail");
    }

    #[test]
    fn partition_slot_bytes_rejects_remainder_bytes() {
        assert_partition_rejected(
            partition_slot_bytes(4097, 4),
            "remainder bytes create unreachable DMA capacity",
        );
    }

    #[test]
    fn partition_slot_bytes_rejects_zero_byte_slots() {
        assert_partition_rejected(partition_slot_bytes(3, 4), "zero-byte DMA slots must fail");
    }

    #[test]
    fn ingest_telemetry_tracks_zero_cpu_bounce_registered_reads() {
        // One registered read submitted and completed in full.
        let mut telemetry = NvmeGpuIngestTelemetry::default();
        telemetry
            .record_submit(NativeReadPath::RegisteredMappedRead, 4096)
            .expect("Fix: telemetry submit accounting must fit.");
        telemetry
            .record_complete(4096)
            .expect("Fix: telemetry completion accounting must fit.");

        let NvmeGpuIngestTelemetry {
            submitted_bytes,
            completed_bytes,
            submitted_reads,
            completed_reads,
            registered_mapped_read_submissions,
            gpudirect_nvme_submissions,
            cpu_bounce_bytes,
            ..
        } = telemetry;
        assert_eq!(submitted_bytes, 4096);
        assert_eq!(completed_bytes, 4096);
        assert_eq!(submitted_reads, 1);
        assert_eq!(completed_reads, 1);
        assert_eq!(registered_mapped_read_submissions, 1);
        assert_eq!(gpudirect_nvme_submissions, 0);
        assert_eq!(cpu_bounce_bytes, 0);
        assert_eq!(telemetry.inflight_reads(), 0);
    }

    #[test]
    fn ingest_telemetry_tracks_zero_cpu_bounce_gpudirect_reads() {
        // One GPUDirect read submitted whose completion failed.
        let mut telemetry = NvmeGpuIngestTelemetry::default();
        telemetry
            .record_submit(NativeReadPath::GpuDirectNvmePassthrough, 8192)
            .expect("Fix: telemetry submit accounting must fit.");
        telemetry
            .record_failed_completion()
            .expect("Fix: telemetry failure accounting must fit.");

        let NvmeGpuIngestTelemetry {
            submitted_bytes,
            completed_bytes,
            registered_mapped_read_submissions,
            gpudirect_nvme_submissions,
            cpu_bounce_bytes,
            failed_completions,
            ..
        } = telemetry;
        assert_eq!(submitted_bytes, 8192);
        assert_eq!(completed_bytes, 0);
        assert_eq!(gpudirect_nvme_submissions, 1);
        assert_eq!(registered_mapped_read_submissions, 0);
        assert_eq!(failed_completions, 1);
        assert_eq!(cpu_bounce_bytes, 0);
        assert_eq!(telemetry.inflight_reads(), 0);
    }

    #[test]
    fn ingest_telemetry_reports_overflow_instead_of_wrapping() {
        let outcome = checked_telemetry_add(u64::MAX, 1, "test counter");
        let error = outcome.expect_err("Fix: telemetry counters must fail before wrapping.");
        let rendered = error.to_string();
        assert!(
            rendered.contains("overflowed u64"),
            "Fix: telemetry overflow errors must be actionable: {error}"
        );
    }

    #[test]
    fn ingest_staging_reservation_reports_capacity_overflow() {
        let mut bytes: Vec<u8> = Vec::new();
        let outcome = reserve_ingest_vec_capacity(&mut bytes, usize::MAX, "test ingest bytes");
        let error =
            outcome.expect_err("Fix: impossible ingest staging capacity must be a typed error.");

        let rendered = error.to_string();
        assert!(
            rendered.contains("failed to reserve test ingest bytes"),
            "Fix: ingest staging reserve failure must name the failed field: {error}"
        );
    }

    #[test]
    fn ingest_staging_reservation_reuses_existing_capacity() {
        let mut bytes: Vec<u8> = Vec::with_capacity(8);
        let original_capacity = bytes.capacity();

        let outcome = reserve_ingest_vec_capacity(&mut bytes, 4, "test ingest bytes");
        outcome.expect("Fix: lower target capacity should reuse existing staging.");

        assert_eq!(bytes.capacity(), original_capacity);
    }
}
834