1use std::fs::File;
4use std::os::fd::AsRawFd;
5use std::path::Path;
6
7use crate::megakernel::MegakernelIoQueue;
8use crate::PipelineError;
9
10#[cfg(feature = "uring-cmd-nvme")]
11use super::gpudirect::encode_nvme_read_sqe;
12use super::gpudirect::GpuDirectCapability;
13use super::stream::{AsyncUringStream, GpuMappedBuffer, Iovec};
14
/// Bookkeeping for the single read currently in flight on one ingest slot.
///
/// An entry lives in `NvmeGpuIngestDriver::pending[slot]` from submission until
/// the matching CQE is reaped, at which point it is taken (and dropped).
#[derive(Debug)]
struct PendingIngest {
    /// Source file for `submit_file` reads; owned here so the raw descriptor
    /// handed to io_uring stays open until the completion is processed.
    /// `None` for native NVMe reads, whose descriptor is owned by the caller.
    _file: Option<File>,
    /// Tag forwarded to the megakernel IO queue on completion (currently the
    /// slot index in both submit paths).
    tag: u32,
    /// How the CQE result of this read must be interpreted.
    completion: PendingCompletion,
}

/// Interpretation of the CQE `res` field for a pending read.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
// Only the registered-read variant is constructed when the NVMe passthrough
// feature is compiled out.
#[cfg_attr(not(feature = "uring-cmd-nvme"), allow(dead_code))]
enum PendingCompletion {
    /// A non-negative `res` is the number of bytes transferred.
    ByteCountFromCqe,
    /// `res` is an NVMe status: zero means success, in which case the
    /// completion is accounted as `expected_byte_count` bytes.
    NativeNvmeStatus { expected_byte_count: u32 },
}
28
/// One successful ingest completion that was published to the megakernel IO queue.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct CompletedIngest {
    /// Index of the slot the data landed in.
    pub slot: u32,
    /// Bytes accounted for this read: the CQE result for file reads, or the
    /// requested size for native NVMe reads that completed with status zero.
    pub byte_count: u32,
    /// Tag recorded at submission time.
    pub tag: u32,
}
39
40#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
47pub struct NvmeGpuIngestTelemetry {
48 pub submitted_bytes: u64,
50 pub completed_bytes: u64,
52 pub submitted_reads: u64,
54 pub completed_reads: u64,
56 pub registered_mapped_read_submissions: u64,
58 pub gpudirect_nvme_submissions: u64,
60 pub cpu_bounce_bytes: u64,
62 pub failed_completions: u64,
64}
65
66impl NvmeGpuIngestTelemetry {
67 #[must_use]
69 pub fn inflight_reads(self) -> u64 {
70 self.submitted_reads
71 .saturating_sub(self.completed_reads)
72 .saturating_sub(self.failed_completions)
73 }
74
75 #[must_use]
77 pub fn path_submissions(self, path: NativeReadPath) -> u64 {
78 match path {
79 NativeReadPath::RegisteredMappedRead => self.registered_mapped_read_submissions,
80 NativeReadPath::GpuDirectNvmePassthrough => self.gpudirect_nvme_submissions,
81 }
82 }
83
84 pub fn validate_completed_zero_copy(self, path: NativeReadPath) -> Result<(), PipelineError> {
97 if self.cpu_bounce_bytes != 0 {
98 return Err(PipelineError::Backend(format!(
99 "NVMe GPU ingest copied {} bytes through a CPU bounce buffer. Fix: route reads through registered GPU-visible slots or native GPUDirect NVMe passthrough.",
100 self.cpu_bounce_bytes
101 )));
102 }
103 if self.failed_completions != 0 {
104 return Err(PipelineError::Backend(format!(
105 "NVMe GPU ingest reported {} failed completions. Fix: inspect CQE status before publishing slots to the megakernel IO queue.",
106 self.failed_completions
107 )));
108 }
109 let inflight = self.inflight_reads();
110 if inflight != 0 {
111 return Err(PipelineError::Backend(format!(
112 "NVMe GPU ingest left {inflight} reads inflight. Fix: drain completions before taking release telemetry snapshots."
113 )));
114 }
115 if self.submitted_bytes != self.completed_bytes {
116 return Err(PipelineError::Backend(format!(
117 "NVMe GPU ingest byte accounting mismatch: submitted={}, completed={}. Fix: account CQE byte counts exactly once.",
118 self.submitted_bytes, self.completed_bytes
119 )));
120 }
121 if self.submitted_reads != self.completed_reads {
122 return Err(PipelineError::Backend(format!(
123 "NVMe GPU ingest read accounting mismatch: submitted={}, completed={}. Fix: account every terminal CQE exactly once.",
124 self.submitted_reads, self.completed_reads
125 )));
126 }
127 let selected_path_submissions = self.path_submissions(path);
128 if selected_path_submissions != self.submitted_reads {
129 return Err(PipelineError::Backend(format!(
130 "NVMe GPU ingest path submission mismatch for {path:?}: path_submissions={}, submitted_reads={}. Fix: construct the driver with the same native read path used by the benchmark.",
131 selected_path_submissions, self.submitted_reads
132 )));
133 }
134 let mixed_path_submissions = match path {
135 NativeReadPath::RegisteredMappedRead => self.gpudirect_nvme_submissions,
136 NativeReadPath::GpuDirectNvmePassthrough => self.registered_mapped_read_submissions,
137 };
138 if mixed_path_submissions != 0 {
139 return Err(PipelineError::Backend(format!(
140 "NVMe GPU ingest mixed {mixed_path_submissions} submissions from the non-selected path into {path:?}. Fix: keep registered mapped reads and native GPUDirect passthrough telemetry separate."
141 )));
142 }
143 Ok(())
144 }
145
146 fn record_submit(
147 &mut self,
148 path: NativeReadPath,
149 byte_count: u32,
150 ) -> Result<(), PipelineError> {
151 self.submitted_bytes = checked_telemetry_add(
152 self.submitted_bytes,
153 u64::from(byte_count),
154 "submitted bytes",
155 )?;
156 self.submitted_reads = checked_telemetry_add(self.submitted_reads, 1, "submitted reads")?;
157 match path {
158 NativeReadPath::RegisteredMappedRead => {
159 self.registered_mapped_read_submissions = checked_telemetry_add(
160 self.registered_mapped_read_submissions,
161 1,
162 "registered mapped read submissions",
163 )?;
164 }
165 NativeReadPath::GpuDirectNvmePassthrough => {
166 self.gpudirect_nvme_submissions = checked_telemetry_add(
167 self.gpudirect_nvme_submissions,
168 1,
169 "GPUDirect NVMe submissions",
170 )?;
171 }
172 }
173 Ok(())
174 }
175
176 fn record_complete(&mut self, byte_count: u32) -> Result<(), PipelineError> {
177 self.completed_bytes = checked_telemetry_add(
178 self.completed_bytes,
179 u64::from(byte_count),
180 "completed bytes",
181 )?;
182 self.completed_reads = checked_telemetry_add(self.completed_reads, 1, "completed reads")?;
183 Ok(())
184 }
185
186 fn record_failed_completion(&mut self) -> Result<(), PipelineError> {
187 self.failed_completions =
188 checked_telemetry_add(self.failed_completions, 1, "failed completions")?;
189 Ok(())
190 }
191}
192
/// Transport an ingest driver uses to move bytes into its GPU-visible slots.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum NativeReadPath {
    /// io_uring reads from an open file into the slot's iovec
    /// (`NvmeGpuIngestDriver::submit_file`); the CQE result is the byte count.
    RegisteredMappedRead,
    /// NVMe read commands passed through to the device with the slot's mapped
    /// address as destination (`NvmeGpuIngestDriver::submit_native_nvme_read`,
    /// requires the `uring-cmd-nvme` feature); the CQE result is an NVMe status.
    GpuDirectNvmePassthrough,
}
209
210pub struct NvmeGpuIngestDriver<'a> {
212 stream: AsyncUringStream<'a>,
213 mapped_slots: Vec<GpuMappedBuffer<'a>>,
214 registered_iovecs: Vec<Iovec>,
215 megakernel_io_queue: MegakernelIoQueue,
216 pending: Vec<Option<PendingIngest>>,
217 slot_bytes: usize,
218 read_path: NativeReadPath,
219 telemetry: NvmeGpuIngestTelemetry,
220}
221
222impl<'a> NvmeGpuIngestDriver<'a> {
223 pub fn new(
232 stream: AsyncUringStream<'a>,
233 slot_count: u32,
234 megakernel_io_queue: MegakernelIoQueue,
235 ) -> Result<Self, PipelineError> {
236 Self::new_with_path(
237 stream,
238 slot_count,
239 megakernel_io_queue,
240 NativeReadPath::RegisteredMappedRead,
241 )
242 }
243
244 pub fn new_gpudirect(
257 stream: AsyncUringStream<'a>,
258 slot_count: u32,
259 megakernel_io_queue: MegakernelIoQueue,
260 ) -> Result<Self, PipelineError> {
261 match GpuDirectCapability::probe() {
262 GpuDirectCapability::Available { .. } => Self::new_with_path(
263 stream,
264 slot_count,
265 megakernel_io_queue,
266 NativeReadPath::GpuDirectNvmePassthrough,
267 ),
268 GpuDirectCapability::FeatureDisabled => Err(PipelineError::NvmePassthroughDisabled),
269 GpuDirectCapability::Unavailable { reason } => Err(PipelineError::Backend(format!(
270 "GPUDirect native read unavailable: {reason}. Fix: install/enable nvidia-fs, use a BAR1-backed GpuMappedBuffer, or use NvmeGpuIngestDriver::new for registered mapped reads."
271 ))),
272 }
273 }
274
275 fn new_with_path(
276 stream: AsyncUringStream<'a>,
277 slot_count: u32,
278 megakernel_io_queue: MegakernelIoQueue,
279 read_path: NativeReadPath,
280 ) -> Result<Self, PipelineError> {
281 let total_len = stream.gpu_buffer.len();
282 let slot_count_usize =
283 usize::try_from(slot_count).map_err(|_| PipelineError::QueueFull {
284 queue: "submission",
285 fix: "slot_count does not fit host usize; reduce the ingest slot count",
286 })?;
287 let slot_bytes = partition_slot_bytes(total_len, slot_count_usize)?;
288
289 let mut mapped_slots = Vec::new();
290 let mut registered_iovecs = Vec::new();
291 let mut pending = Vec::new();
292 reserve_ingest_vec_capacity(
293 &mut mapped_slots,
294 slot_count_usize,
295 "mapped GPU ingest slots",
296 )?;
297 reserve_ingest_vec_capacity(
298 &mut registered_iovecs,
299 slot_count_usize,
300 "registered io_uring iovecs",
301 )?;
302 reserve_ingest_vec_capacity(&mut pending, slot_count_usize, "pending ingest slots")?;
303 for slot in 0..slot_count_usize {
304 let offset = slot * slot_bytes;
305 let slot_buffer = stream.gpu_buffer.sub_region(offset, slot_bytes)?;
306 registered_iovecs.push(Iovec {
307 iov_base: slot_buffer.as_ptr().cast(),
308 iov_len: slot_buffer.len(),
309 });
310 mapped_slots.push(slot_buffer);
311 }
312 pending.resize_with(slot_count_usize, || None);
313 Ok(Self {
314 stream,
315 mapped_slots,
316 registered_iovecs,
317 megakernel_io_queue,
318 pending,
319 slot_bytes,
320 read_path,
321 telemetry: NvmeGpuIngestTelemetry::default(),
322 })
323 }
324
325 pub fn submit_file(&mut self, path: &Path, slot: u32) -> Result<(), PipelineError> {
332 let slot_usize = self.validate_slot_for_submit(slot)?;
333
334 let file = File::open(path).map_err(|error| {
335 PipelineError::Backend(format!("open {} failed: {error}", path.display()))
336 })?;
337 let file_len = file
338 .metadata()
339 .map_err(|error| {
340 PipelineError::Backend(format!("stat {} failed: {error}", path.display()))
341 })?
342 .len();
343 let slot_bytes_u64 = usize_to_u64(self.slot_bytes, "ingest slot byte length")?;
344 if file_len > slot_bytes_u64 {
345 return Err(PipelineError::QueueFull {
346 queue: "submission",
347 fix: "file exceeds the configured ingest slot size; enlarge the mapped staging buffer or segment the file",
348 });
349 }
350
351 let byte_count = u32::try_from(file_len).map_err(|_| PipelineError::QueueFull {
352 queue: "submission",
353 fix: "file length exceeds u32 read size even though it fit the slot; split the ingest file",
354 })?;
355 let target_offset = slot_byte_offset(slot_usize, self.slot_bytes)?;
356 let slot_iovec = &mut self.registered_iovecs[slot_usize..slot_usize + 1];
357 unsafe {
359 self.stream.submit_read_to_gpu_at(
360 file.as_raw_fd(),
361 0,
362 byte_count,
363 target_offset,
364 slot_iovec,
365 )?;
366 }
367 self.telemetry
368 .record_submit(NativeReadPath::RegisteredMappedRead, byte_count)?;
369 self.pending[slot_usize] = Some(PendingIngest {
370 _file: Some(file),
371 tag: slot,
372 completion: PendingCompletion::ByteCountFromCqe,
373 });
374 Ok(())
375 }
376
377 #[cfg(feature = "uring-cmd-nvme")]
397 #[allow(clippy::too_many_arguments)]
398 pub unsafe fn submit_native_nvme_read(
399 &mut self,
400 nvme_fd: i32,
401 namespace_id: u32,
402 starting_lba: u64,
403 blocks: u32,
404 bytes_per_block: u32,
405 slot: u32,
406 ) -> Result<(), PipelineError> {
407 if self.read_path != NativeReadPath::GpuDirectNvmePassthrough {
408 return Err(PipelineError::Backend(
409 "native NVMe read submitted on a registered-mapped-read driver. Fix: construct with NvmeGpuIngestDriver::new_gpudirect and a BAR1-backed GpuMappedBuffer.".to_string(),
410 ));
411 }
412 let slot_usize = self.validate_slot_for_submit(slot)?;
413 if blocks == 0 || bytes_per_block == 0 {
414 return Err(PipelineError::QueueFull {
415 queue: "submission",
416 fix: "native NVMe reads require non-zero block count and bytes_per_block",
417 });
418 }
419 let byte_count = vyre_driver::accounting::checked_mul_u32_value(
420 blocks,
421 bytes_per_block,
422 PipelineError::QueueFull {
423 queue: "submission",
424 fix: "native NVMe read byte count overflowed u32; submit a smaller range",
425 },
426 )?;
427 let byte_count_usize =
428 usize::try_from(byte_count).map_err(|_| PipelineError::QueueFull {
429 queue: "submission",
430 fix: "native NVMe read byte count cannot fit host usize; submit a smaller range",
431 })?;
432 if byte_count_usize > self.slot_bytes {
433 return Err(PipelineError::QueueFull {
434 queue: "submission",
435 fix: "native NVMe read exceeds the configured ingest slot size; enlarge the BAR1 mapped slot or submit fewer blocks",
436 });
437 }
438
439 let dest = usize_to_u64(
440 self.mapped_slots[slot_usize].as_ptr().addr(),
441 "mapped BAR1 destination pointer",
442 )?;
443 let sqe = encode_nvme_read_sqe(namespace_id, starting_lba, blocks, dest);
444 let user_data = slot_byte_offset(slot_usize, self.slot_bytes)?;
445 unsafe {
448 self.stream
449 .submit_nvme_passthrough(nvme_fd, user_data, &sqe)?;
450 }
451 self.telemetry
452 .record_submit(NativeReadPath::GpuDirectNvmePassthrough, byte_count)?;
453 self.pending[slot_usize] = Some(PendingIngest {
454 _file: None,
455 tag: slot,
456 completion: PendingCompletion::NativeNvmeStatus {
457 expected_byte_count: byte_count,
458 },
459 });
460 Ok(())
461 }
462
463 #[cfg(not(feature = "uring-cmd-nvme"))]
474 #[allow(clippy::too_many_arguments)]
475 pub unsafe fn submit_native_nvme_read(
476 &mut self,
477 _nvme_fd: i32,
478 _namespace_id: u32,
479 _starting_lba: u64,
480 _blocks: u32,
481 _bytes_per_block: u32,
482 _slot: u32,
483 ) -> Result<(), PipelineError> {
484 Err(PipelineError::NvmePassthroughDisabled)
485 }
486
487 pub fn poll_completions(&mut self) -> Result<Vec<CompletedIngest>, PipelineError> {
490 let mut completed = Vec::new();
491 self.poll_completions_into(&mut completed)?;
492 Ok(completed)
493 }
494
495 pub fn poll_completions_into(
501 &mut self,
502 completed: &mut Vec<CompletedIngest>,
503 ) -> Result<(), PipelineError> {
504 completed.clear();
505 self.stream.flush_submissions()?;
506 let inflight_capacity =
507 usize::try_from(self.stream.inflight).map_err(|_| PipelineError::Backend(
508 "io_uring inflight completion count cannot fit host usize. Fix: shard ingest submissions before polling completions."
509 .to_string(),
510 ))?;
511 reserve_ingest_vec_capacity(completed, inflight_capacity, "completed ingest records")?;
512 let mut first_error: Option<PipelineError> = None;
513
514 while let Some(cqe) = self.stream.ring_state.peek_cqe() {
515 let res = cqe.res;
516 if self.slot_bytes == 0 {
517 return Err(PipelineError::Backend(
518 "io_uring ingest driver has zero slot_bytes. Fix: construct NvmeGpuIngestDriver with at least one non-empty mapped slot.".to_string(),
519 ));
520 }
521 let user_data = usize::try_from(cqe.user_data).map_err(|_| {
522 PipelineError::Backend(format!(
523 "io_uring CQE user_data {} does not fit host usize. Fix: keep slot byte offsets within host addressable range.",
524 cqe.user_data
525 ))
526 })?;
527 let slot = user_data / self.slot_bytes;
528 self.stream.ring_state.advance_cq();
529 self.stream.inflight = self.stream.inflight.checked_sub(1).ok_or_else(|| {
530 PipelineError::Backend(
531 "io_uring completion arrived with zero inflight submissions. Fix: audit submit/completion accounting before reusing this stream.".to_string(),
532 )
533 })?;
534
535 let pending = self.pending.get_mut(slot).and_then(Option::take);
536 if res < 0 {
537 self.telemetry.record_failed_completion()?;
538 if first_error.is_none() {
539 first_error = Some(PipelineError::IoUringSyscall {
540 syscall: "io_uring_cqe",
541 errno: -res,
542 fix: "inspect the offending file descriptor and slot metadata; common causes are EIO on disk or EFAULT on an invalid registered buffer",
543 });
544 }
545 continue;
546 }
547
548 let pending = match pending {
549 Some(pending) => pending,
550 None => {
551 self.telemetry.record_failed_completion()?;
552 if first_error.is_none() {
553 first_error = Some(PipelineError::Backend(format!(
554 "CQE for slot {slot} arrived without matching pending metadata"
555 )));
556 }
557 continue;
558 }
559 };
560 let byte_count = match pending.completion {
561 PendingCompletion::ByteCountFromCqe => {
562 u32::try_from(res).map_err(|_| PipelineError::Backend(format!(
563 "io_uring CQE byte count {res} cannot fit u32. Fix: split ingest reads so completions stay within the megakernel io_queue ABI."
564 )))?
565 }
566 PendingCompletion::NativeNvmeStatus {
567 expected_byte_count,
568 } => {
569 if res != 0 {
570 self.telemetry.record_failed_completion()?;
571 if first_error.is_none() {
572 first_error = Some(PipelineError::Backend(format!(
573 "NVMe passthrough completion for slot {slot} returned non-zero status {res}. Fix: inspect namespace id, LBA range, permissions, and nvidia-fs state."
574 )));
575 }
576 continue;
577 }
578 expected_byte_count
579 }
580 };
581 let slot_u32 = u32::try_from(slot).map_err(|_| PipelineError::Backend(format!(
582 "io_uring completion slot {slot} cannot fit u32. Fix: shard ingest slots before publishing to the megakernel io_queue."
583 )))?;
584 self.megakernel_io_queue
585 .publish_slot(slot_u32, slot_u32, byte_count, pending.tag)?;
586 self.telemetry.record_complete(byte_count)?;
587 completed.push(CompletedIngest {
588 slot: slot_u32,
589 byte_count,
590 tag: pending.tag,
591 });
592 }
593
594 match first_error {
595 Some(err) => Err(err),
596 None => Ok(()),
597 }
598 }
599
600 #[must_use]
602 pub fn megakernel_io_queue(&self) -> &MegakernelIoQueue {
603 &self.megakernel_io_queue
604 }
605
606 #[must_use]
608 pub fn megakernel_io_queue_mut(&mut self) -> &mut MegakernelIoQueue {
609 &mut self.megakernel_io_queue
610 }
611
612 #[must_use]
614 pub fn slot_bytes(&self) -> usize {
615 self.slot_bytes
616 }
617
618 #[must_use]
620 pub fn slot_count(&self) -> usize {
621 self.registered_iovecs.len()
622 }
623
624 #[must_use]
626 pub fn read_path(&self) -> NativeReadPath {
627 self.read_path
628 }
629
630 #[must_use]
632 pub fn telemetry_snapshot(&self) -> NvmeGpuIngestTelemetry {
633 self.telemetry
634 }
635
636 pub fn reset_telemetry(&mut self) {
638 self.telemetry = NvmeGpuIngestTelemetry::default();
639 }
640
641 fn validate_slot_for_submit(&self, slot: u32) -> Result<usize, PipelineError> {
642 let slot_usize = usize::try_from(slot).map_err(|_| PipelineError::QueueFull {
643 queue: "submission",
644 fix: "slot index cannot fit host usize; shard mapped ingest slots",
645 })?;
646 if slot_usize >= self.mapped_slots.len() {
647 return Err(PipelineError::QueueFull {
648 queue: "submission",
649 fix: "slot exceeds the configured mapped-slot count",
650 });
651 }
652 if self.pending[slot_usize].is_some() {
653 return Err(PipelineError::QueueFull {
654 queue: "submission",
655 fix: "slot already has an in-flight ingest; drain completions before reusing it",
656 });
657 }
658 Ok(slot_usize)
659 }
660}
661
662
663fn checked_telemetry_add(
664 current: u64,
665 increment: u64,
666 label: &'static str,
667) -> Result<u64, PipelineError> {
668 vyre_driver::accounting::checked_add_u64_lazy(current, increment, || {
669 PipelineError::Backend(format!(
670 "io_uring ingest telemetry {label} overflowed u64. Fix: snapshot and reset telemetry before counters saturate."
671 ))
672 })
673}
674
675fn usize_to_u64(value: usize, label: &'static str) -> Result<u64, PipelineError> {
676 u64::try_from(value).map_err(|_| {
677 PipelineError::Backend(format!(
678 "{label} cannot fit u64. Fix: shard io_uring GPU ingest buffers before submission."
679 ))
680 })
681}
682
683fn slot_byte_offset(slot_idx: usize, slot_bytes: usize) -> Result<u64, PipelineError> {
684 let offset = vyre_driver::accounting::checked_mul_usize_lazy(slot_idx, slot_bytes, || {
685 PipelineError::Backend(
686 "io_uring ingest slot byte offset overflowed usize. Fix: shard mapped ingest slots."
687 .to_string(),
688 )
689 })?;
690 usize_to_u64(offset, "io_uring ingest slot byte offset")
691}
692
693fn reserve_ingest_vec_capacity<T>(
694 vec: &mut Vec<T>,
695 capacity: usize,
696 field: &'static str,
697) -> Result<(), PipelineError> {
698 if vec.capacity() >= capacity {
699 return Ok(());
700 }
701 vec.try_reserve_exact(capacity - vec.capacity())
702 .map_err(|error| {
703 PipelineError::Backend(format!(
704 "io_uring GPU ingest failed to reserve {field} for {capacity} entries: {error}. Fix: reduce ingest slot fan-out or shard the ingest batch."
705 ))
706 })
707}
708
709fn partition_slot_bytes(total_len: usize, slot_count: usize) -> Result<usize, PipelineError> {
710 if slot_count == 0 {
711 return Err(PipelineError::QueueFull {
712 queue: "submission",
713 fix: "NvmeGpuIngestDriver requires at least one slot",
714 });
715 }
716 let slot_bytes = total_len / slot_count;
717 if slot_bytes == 0 {
718 return Err(PipelineError::QueueFull {
719 queue: "submission",
720 fix: "mapped staging buffer is too small to partition into the requested slot count",
721 });
722 }
723 if total_len % slot_count != 0 {
724 return Err(PipelineError::QueueFull {
725 queue: "submission",
726 fix: "mapped staging buffer length must divide evenly by slot_count so every byte belongs to exactly one DMA slot",
727 });
728 }
729 Ok(slot_bytes)
730}
731
#[cfg(test)]
mod tests {
    use super::*;

    /// Fails the current test unless `error` is the typed `QueueFull` rejection.
    fn assert_queue_full(error: PipelineError) {
        assert!(matches!(error, PipelineError::QueueFull { .. }));
    }

    #[test]
    fn partition_slot_bytes_accepts_exact_slot_geometry() {
        let per_slot = partition_slot_bytes(4096 * 8, 8).unwrap();
        assert_eq!(per_slot, 4096);
    }

    #[test]
    fn partition_slot_bytes_rejects_zero_slots() {
        assert_queue_full(partition_slot_bytes(4096, 0).expect_err("zero slots must fail"));
    }

    #[test]
    fn partition_slot_bytes_rejects_remainder_bytes() {
        assert_queue_full(
            partition_slot_bytes(4097, 4)
                .expect_err("remainder bytes create unreachable DMA capacity"),
        );
    }

    #[test]
    fn partition_slot_bytes_rejects_zero_byte_slots() {
        assert_queue_full(partition_slot_bytes(3, 4).expect_err("zero-byte DMA slots must fail"));
    }

    #[test]
    fn ingest_telemetry_tracks_zero_cpu_bounce_registered_reads() {
        let mut snapshot = NvmeGpuIngestTelemetry::default();
        snapshot
            .record_submit(NativeReadPath::RegisteredMappedRead, 4096)
            .expect("Fix: telemetry submit accounting must fit.");
        snapshot
            .record_complete(4096)
            .expect("Fix: telemetry completion accounting must fit.");

        assert_eq!(
            (snapshot.submitted_bytes, snapshot.completed_bytes),
            (4096, 4096)
        );
        assert_eq!((snapshot.submitted_reads, snapshot.completed_reads), (1, 1));
        assert_eq!(
            (
                snapshot.registered_mapped_read_submissions,
                snapshot.gpudirect_nvme_submissions
            ),
            (1, 0)
        );
        assert_eq!(snapshot.cpu_bounce_bytes, 0);
        assert_eq!(snapshot.inflight_reads(), 0);
    }

    #[test]
    fn ingest_telemetry_tracks_zero_cpu_bounce_gpudirect_reads() {
        let mut snapshot = NvmeGpuIngestTelemetry::default();
        snapshot
            .record_submit(NativeReadPath::GpuDirectNvmePassthrough, 8192)
            .expect("Fix: telemetry submit accounting must fit.");
        snapshot
            .record_failed_completion()
            .expect("Fix: telemetry failure accounting must fit.");

        assert_eq!(
            (snapshot.submitted_bytes, snapshot.completed_bytes),
            (8192, 0)
        );
        assert_eq!(
            (
                snapshot.gpudirect_nvme_submissions,
                snapshot.registered_mapped_read_submissions
            ),
            (1, 0)
        );
        assert_eq!(snapshot.failed_completions, 1);
        assert_eq!(snapshot.cpu_bounce_bytes, 0);
        assert_eq!(snapshot.inflight_reads(), 0);
    }

    #[test]
    fn ingest_telemetry_reports_overflow_instead_of_wrapping() {
        let error = checked_telemetry_add(u64::MAX, 1, "test counter")
            .expect_err("Fix: telemetry counters must fail before wrapping.");
        let rendered = error.to_string();
        assert!(
            rendered.contains("overflowed u64"),
            "Fix: telemetry overflow errors must be actionable: {error}"
        );
    }

    #[test]
    fn ingest_staging_reservation_reports_capacity_overflow() {
        let mut staging = Vec::<u8>::new();
        let error = reserve_ingest_vec_capacity(&mut staging, usize::MAX, "test ingest bytes")
            .expect_err("Fix: impossible ingest staging capacity must be a typed error.");

        let rendered = error.to_string();
        assert!(
            rendered.contains("failed to reserve test ingest bytes"),
            "Fix: ingest staging reserve failure must name the failed field: {error}"
        );
    }

    #[test]
    fn ingest_staging_reservation_reuses_existing_capacity() {
        let mut staging = Vec::<u8>::with_capacity(8);
        let capacity_before = staging.capacity();

        reserve_ingest_vec_capacity(&mut staging, 4, "test ingest bytes")
            .expect("Fix: lower target capacity should reuse existing staging.");

        assert_eq!(staging.capacity(), capacity_before);
    }
}
834