1use std::fs::File;
4use std::os::fd::AsRawFd;
5use std::path::Path;
6
7use crate::megakernel::MegakernelIoQueue;
8use crate::PipelineError;
9
10#[cfg(feature = "uring-cmd-nvme")]
11use super::gpudirect::encode_nvme_read_sqe;
12use super::gpudirect::GpuDirectCapability;
13use super::stream::{AsyncUringStream, GpuMappedBuffer, Iovec};
14
/// Bookkeeping for one slot's in-flight read, held in `NvmeGpuIngestDriver::pending`
/// from submission until its CQE is reaped.
#[derive(Debug)]
struct PendingIngest {
    // Source file for `submit_file` reads (`None` for native NVMe passthrough). Holding
    // the `File` here keeps its descriptor open until the completion is reaped.
    _file: Option<File>,
    // Tag published with the completion; the submit paths currently store the slot index.
    tag: u32,
    // How the CQE `res` value is interpreted for this read.
    completion: PendingCompletion,
}
21
/// How `poll_completions_into` interprets a CQE `res` value for a pending read.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
// Only the `uring-cmd-nvme` submission path constructs `NativeNvmeStatus`, so the
// variant is unused when that feature is disabled.
#[cfg_attr(not(feature = "uring-cmd-nvme"), allow(dead_code))]
enum PendingCompletion {
    /// A non-negative `res` is the number of bytes read.
    ByteCountFromCqe,
    /// `res` is a passthrough status: zero is success, and the byte count reported
    /// downstream is `expected_byte_count` (blocks × bytes per block at submission).
    NativeNvmeStatus { expected_byte_count: u32 },
}
28
/// A successful completion that has been published to the megakernel IO queue.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CompletedIngest {
    /// Index of the staging slot that received the data.
    pub slot: u32,
    /// Bytes reported for the read: the CQE byte count for file reads, or the expected
    /// size (blocks × bytes per block) for native NVMe passthrough reads.
    pub byte_count: u32,
    /// Tag recorded for the slot at submission time.
    pub tag: u32,
}
39
40#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
47pub struct NvmeGpuIngestTelemetry {
48 pub submitted_bytes: u64,
50 pub completed_bytes: u64,
52 pub submitted_reads: u64,
54 pub completed_reads: u64,
56 pub registered_mapped_read_submissions: u64,
58 pub gpudirect_nvme_submissions: u64,
60 pub cpu_bounce_bytes: u64,
62 pub failed_completions: u64,
64}
65
66impl NvmeGpuIngestTelemetry {
67 #[must_use]
69 pub fn inflight_reads(self) -> u64 {
70 self.submitted_reads
71 .saturating_sub(self.completed_reads)
72 .saturating_sub(self.failed_completions)
73 }
74
75 #[must_use]
77 pub fn path_submissions(self, path: NativeReadPath) -> u64 {
78 match path {
79 NativeReadPath::RegisteredMappedRead => self.registered_mapped_read_submissions,
80 NativeReadPath::GpuDirectNvmePassthrough => self.gpudirect_nvme_submissions,
81 }
82 }
83
84 pub fn validate_completed_zero_copy(self, path: NativeReadPath) -> Result<(), PipelineError> {
97 if self.cpu_bounce_bytes != 0 {
98 return Err(PipelineError::Backend(format!(
99 "NVMe GPU ingest copied {} bytes through a CPU bounce buffer. Fix: route reads through registered GPU-visible slots or native GPUDirect NVMe passthrough.",
100 self.cpu_bounce_bytes
101 )));
102 }
103 if self.failed_completions != 0 {
104 return Err(PipelineError::Backend(format!(
105 "NVMe GPU ingest reported {} failed completions. Fix: inspect CQE status before publishing slots to the megakernel IO queue.",
106 self.failed_completions
107 )));
108 }
109 let inflight = self.inflight_reads();
110 if inflight != 0 {
111 return Err(PipelineError::Backend(format!(
112 "NVMe GPU ingest left {inflight} reads inflight. Fix: drain completions before taking release telemetry snapshots."
113 )));
114 }
115 if self.submitted_bytes != self.completed_bytes {
116 return Err(PipelineError::Backend(format!(
117 "NVMe GPU ingest byte accounting mismatch: submitted={}, completed={}. Fix: account CQE byte counts exactly once.",
118 self.submitted_bytes, self.completed_bytes
119 )));
120 }
121 if self.submitted_reads != self.completed_reads {
122 return Err(PipelineError::Backend(format!(
123 "NVMe GPU ingest read accounting mismatch: submitted={}, completed={}. Fix: account every terminal CQE exactly once.",
124 self.submitted_reads, self.completed_reads
125 )));
126 }
127 let selected_path_submissions = self.path_submissions(path);
128 if selected_path_submissions != self.submitted_reads {
129 return Err(PipelineError::Backend(format!(
130 "NVMe GPU ingest path submission mismatch for {path:?}: path_submissions={}, submitted_reads={}. Fix: construct the driver with the same native read path used by the benchmark.",
131 selected_path_submissions, self.submitted_reads
132 )));
133 }
134 let mixed_path_submissions = match path {
135 NativeReadPath::RegisteredMappedRead => self.gpudirect_nvme_submissions,
136 NativeReadPath::GpuDirectNvmePassthrough => self.registered_mapped_read_submissions,
137 };
138 if mixed_path_submissions != 0 {
139 return Err(PipelineError::Backend(format!(
140 "NVMe GPU ingest mixed {mixed_path_submissions} submissions from the non-selected path into {path:?}. Fix: keep registered mapped reads and native GPUDirect passthrough telemetry separate."
141 )));
142 }
143 Ok(())
144 }
145
146 fn record_submit(
147 &mut self,
148 path: NativeReadPath,
149 byte_count: u32,
150 ) -> Result<(), PipelineError> {
151 self.submitted_bytes = checked_telemetry_add(
152 self.submitted_bytes,
153 u64::from(byte_count),
154 "submitted bytes",
155 )?;
156 self.submitted_reads = checked_telemetry_add(self.submitted_reads, 1, "submitted reads")?;
157 match path {
158 NativeReadPath::RegisteredMappedRead => {
159 self.registered_mapped_read_submissions = checked_telemetry_add(
160 self.registered_mapped_read_submissions,
161 1,
162 "registered mapped read submissions",
163 )?;
164 }
165 NativeReadPath::GpuDirectNvmePassthrough => {
166 self.gpudirect_nvme_submissions = checked_telemetry_add(
167 self.gpudirect_nvme_submissions,
168 1,
169 "GPUDirect NVMe submissions",
170 )?;
171 }
172 }
173 Ok(())
174 }
175
176 fn record_complete(&mut self, byte_count: u32) -> Result<(), PipelineError> {
177 self.completed_bytes = checked_telemetry_add(
178 self.completed_bytes,
179 u64::from(byte_count),
180 "completed bytes",
181 )?;
182 self.completed_reads = checked_telemetry_add(self.completed_reads, 1, "completed reads")?;
183 Ok(())
184 }
185
186 fn record_failed_completion(&mut self) -> Result<(), PipelineError> {
187 self.failed_completions =
188 checked_telemetry_add(self.failed_completions, 1, "failed completions")?;
189 Ok(())
190 }
191}
192
/// Selects how an `NvmeGpuIngestDriver` moves data into its GPU-mapped slots.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NativeReadPath {
    /// File reads queued on the io_uring stream into the mapped slot buffers
    /// (`NvmeGpuIngestDriver::submit_file`).
    RegisteredMappedRead,
    /// NVMe read commands submitted through io_uring passthrough, with the mapped slot's
    /// address encoded as the read destination
    /// (`NvmeGpuIngestDriver::submit_native_nvme_read`, `uring-cmd-nvme` feature).
    GpuDirectNvmePassthrough,
}
209
210pub struct NvmeGpuIngestDriver<'a> {
212 stream: AsyncUringStream<'a>,
213 mapped_slots: Vec<GpuMappedBuffer<'a>>,
214 registered_iovecs: Vec<Iovec>,
215 megakernel_io_queue: MegakernelIoQueue,
216 pending: Vec<Option<PendingIngest>>,
217 slot_bytes: usize,
218 read_path: NativeReadPath,
219 telemetry: NvmeGpuIngestTelemetry,
220}
221
222impl<'a> NvmeGpuIngestDriver<'a> {
223 pub fn new(
232 stream: AsyncUringStream<'a>,
233 slot_count: u32,
234 megakernel_io_queue: MegakernelIoQueue,
235 ) -> Result<Self, PipelineError> {
236 Self::new_with_path(
237 stream,
238 slot_count,
239 megakernel_io_queue,
240 NativeReadPath::RegisteredMappedRead,
241 )
242 }
243
244 pub fn new_gpudirect(
257 stream: AsyncUringStream<'a>,
258 slot_count: u32,
259 megakernel_io_queue: MegakernelIoQueue,
260 ) -> Result<Self, PipelineError> {
261 match GpuDirectCapability::probe() {
262 GpuDirectCapability::Available { .. } => Self::new_with_path(
263 stream,
264 slot_count,
265 megakernel_io_queue,
266 NativeReadPath::GpuDirectNvmePassthrough,
267 ),
268 GpuDirectCapability::FeatureDisabled => Err(PipelineError::NvmePassthroughDisabled),
269 GpuDirectCapability::Unavailable { reason } => Err(PipelineError::Backend(format!(
270 "GPUDirect native read unavailable: {reason}. Fix: install/enable nvidia-fs, use a BAR1-backed GpuMappedBuffer, or use NvmeGpuIngestDriver::new for registered mapped reads."
271 ))),
272 }
273 }
274
275 fn new_with_path(
276 stream: AsyncUringStream<'a>,
277 slot_count: u32,
278 megakernel_io_queue: MegakernelIoQueue,
279 read_path: NativeReadPath,
280 ) -> Result<Self, PipelineError> {
281 let total_len = stream.gpu_buffer.len();
282 let slot_count_usize =
283 usize::try_from(slot_count).map_err(|_| PipelineError::QueueFull {
284 queue: "submission",
285 fix: "slot_count does not fit host usize; reduce the ingest slot count",
286 })?;
287 let slot_bytes = partition_slot_bytes(total_len, slot_count_usize)?;
288
289 let mut mapped_slots = Vec::new();
290 let mut registered_iovecs = Vec::new();
291 let mut pending = Vec::new();
292 reserve_ingest_vec_capacity(
293 &mut mapped_slots,
294 slot_count_usize,
295 "mapped GPU ingest slots",
296 )?;
297 reserve_ingest_vec_capacity(
298 &mut registered_iovecs,
299 slot_count_usize,
300 "registered io_uring iovecs",
301 )?;
302 reserve_ingest_vec_capacity(&mut pending, slot_count_usize, "pending ingest slots")?;
303 for slot in 0..slot_count_usize {
304 let offset = slot * slot_bytes;
305 let slot_buffer = stream.gpu_buffer.sub_region(offset, slot_bytes)?;
306 registered_iovecs.push(Iovec {
307 iov_base: slot_buffer.as_ptr().cast(),
308 iov_len: slot_buffer.len(),
309 });
310 mapped_slots.push(slot_buffer);
311 }
312 pending.resize_with(slot_count_usize, || None);
313 Ok(Self {
314 stream,
315 mapped_slots,
316 registered_iovecs,
317 megakernel_io_queue,
318 pending,
319 slot_bytes,
320 read_path,
321 telemetry: NvmeGpuIngestTelemetry::default(),
322 })
323 }
324
325 pub fn submit_file(&mut self, path: &Path, slot: u32) -> Result<(), PipelineError> {
332 let slot_usize = self.validate_slot_for_submit(slot)?;
333
334 let file = File::open(path).map_err(|error| {
335 PipelineError::Backend(format!("open {} failed: {error}", path.display()))
336 })?;
337 let file_len = file
338 .metadata()
339 .map_err(|error| {
340 PipelineError::Backend(format!("stat {} failed: {error}", path.display()))
341 })?
342 .len();
343 let slot_bytes_u64 = usize_to_u64(self.slot_bytes, "ingest slot byte length")?;
344 if file_len > slot_bytes_u64 {
345 return Err(PipelineError::QueueFull {
346 queue: "submission",
347 fix: "file exceeds the configured ingest slot size; enlarge the mapped staging buffer or segment the file",
348 });
349 }
350
351 let byte_count = u32::try_from(file_len).map_err(|_| PipelineError::QueueFull {
352 queue: "submission",
353 fix: "file length exceeds u32 read size even though it fit the slot; split the ingest file",
354 })?;
355 let target_offset = slot_byte_offset(slot_usize, self.slot_bytes)?;
356 let slot_iovec = &mut self.registered_iovecs[slot_usize..slot_usize + 1];
357 unsafe {
359 self.stream.submit_read_to_gpu_at(
360 file.as_raw_fd(),
361 0,
362 byte_count,
363 target_offset,
364 slot_iovec,
365 )?;
366 }
367 self.telemetry
368 .record_submit(NativeReadPath::RegisteredMappedRead, byte_count)?;
369 self.pending[slot_usize] = Some(PendingIngest {
370 _file: Some(file),
371 tag: slot,
372 completion: PendingCompletion::ByteCountFromCqe,
373 });
374 Ok(())
375 }
376
377 #[cfg(feature = "uring-cmd-nvme")]
397 #[allow(clippy::too_many_arguments)]
398 pub unsafe fn submit_native_nvme_read(
399 &mut self,
400 nvme_fd: i32,
401 namespace_id: u32,
402 starting_lba: u64,
403 blocks: u32,
404 bytes_per_block: u32,
405 slot: u32,
406 ) -> Result<(), PipelineError> {
407 if self.read_path != NativeReadPath::GpuDirectNvmePassthrough {
408 return Err(PipelineError::Backend(
409 "native NVMe read submitted on a registered-mapped-read driver. Fix: construct with NvmeGpuIngestDriver::new_gpudirect and a BAR1-backed GpuMappedBuffer.".to_string(),
410 ));
411 }
412 let slot_usize = self.validate_slot_for_submit(slot)?;
413 if blocks == 0 || bytes_per_block == 0 {
414 return Err(PipelineError::QueueFull {
415 queue: "submission",
416 fix: "native NVMe reads require non-zero block count and bytes_per_block",
417 });
418 }
419 let byte_count = vyre_driver::accounting::checked_mul_u32_value(
420 blocks,
421 bytes_per_block,
422 PipelineError::QueueFull {
423 queue: "submission",
424 fix: "native NVMe read byte count overflowed u32; submit a smaller range",
425 },
426 )?;
427 let byte_count_usize =
428 usize::try_from(byte_count).map_err(|_| PipelineError::QueueFull {
429 queue: "submission",
430 fix: "native NVMe read byte count cannot fit host usize; submit a smaller range",
431 })?;
432 if byte_count_usize > self.slot_bytes {
433 return Err(PipelineError::QueueFull {
434 queue: "submission",
435 fix: "native NVMe read exceeds the configured ingest slot size; enlarge the BAR1 mapped slot or submit fewer blocks",
436 });
437 }
438
439 let dest = usize_to_u64(
440 self.mapped_slots[slot_usize].as_ptr().addr(),
441 "mapped BAR1 destination pointer",
442 )?;
443 let sqe = encode_nvme_read_sqe(namespace_id, starting_lba, blocks, dest);
444 let user_data = slot_byte_offset(slot_usize, self.slot_bytes)?;
445 unsafe {
448 self.stream
449 .submit_nvme_passthrough(nvme_fd, user_data, &sqe)?;
450 }
451 self.telemetry
452 .record_submit(NativeReadPath::GpuDirectNvmePassthrough, byte_count)?;
453 self.pending[slot_usize] = Some(PendingIngest {
454 _file: None,
455 tag: slot,
456 completion: PendingCompletion::NativeNvmeStatus {
457 expected_byte_count: byte_count,
458 },
459 });
460 Ok(())
461 }
462
463 #[cfg(not(feature = "uring-cmd-nvme"))]
474 #[allow(clippy::too_many_arguments)]
475 pub unsafe fn submit_native_nvme_read(
476 &mut self,
477 _nvme_fd: i32,
478 _namespace_id: u32,
479 _starting_lba: u64,
480 _blocks: u32,
481 _bytes_per_block: u32,
482 _slot: u32,
483 ) -> Result<(), PipelineError> {
484 Err(PipelineError::NvmePassthroughDisabled)
485 }
486
487 pub fn poll_completions(&mut self) -> Result<Vec<CompletedIngest>, PipelineError> {
490 let mut completed = Vec::new();
491 self.poll_completions_into(&mut completed)?;
492 Ok(completed)
493 }
494
495 pub fn poll_completions_into(
501 &mut self,
502 completed: &mut Vec<CompletedIngest>,
503 ) -> Result<(), PipelineError> {
504 completed.clear();
505 self.stream.flush_submissions()?;
506 let inflight_capacity =
507 usize::try_from(self.stream.inflight).map_err(|_| PipelineError::Backend(
508 "io_uring inflight completion count cannot fit host usize. Fix: shard ingest submissions before polling completions."
509 .to_string(),
510 ))?;
511 reserve_ingest_vec_capacity(completed, inflight_capacity, "completed ingest records")?;
512 let mut first_error: Option<PipelineError> = None;
513
514 while let Some(cqe) = self.stream.ring_state.peek_cqe() {
515 let res = cqe.res;
516 if self.slot_bytes == 0 {
517 return Err(PipelineError::Backend(
518 "io_uring ingest driver has zero slot_bytes. Fix: construct NvmeGpuIngestDriver with at least one non-empty mapped slot.".to_string(),
519 ));
520 }
521 let user_data = usize::try_from(cqe.user_data).map_err(|_| {
522 PipelineError::Backend(format!(
523 "io_uring CQE user_data {} does not fit host usize. Fix: keep slot byte offsets within host addressable range.",
524 cqe.user_data
525 ))
526 })?;
527 let slot = user_data / self.slot_bytes;
528 self.stream.ring_state.advance_cq();
529 self.stream.inflight = self.stream.inflight.checked_sub(1).ok_or_else(|| {
530 PipelineError::Backend(
531 "io_uring completion arrived with zero inflight submissions. Fix: audit submit/completion accounting before reusing this stream.".to_string(),
532 )
533 })?;
534
535 let pending = self.pending.get_mut(slot).and_then(Option::take);
536 if res < 0 {
537 self.telemetry.record_failed_completion()?;
538 if first_error.is_none() {
539 first_error = Some(PipelineError::IoUringSyscall {
540 syscall: "io_uring_cqe",
541 errno: -res,
542 fix: "inspect the offending file descriptor and slot metadata; common causes are EIO on disk or EFAULT on an invalid registered buffer",
543 });
544 }
545 continue;
546 }
547
548 let pending = match pending {
549 Some(pending) => pending,
550 None => {
551 self.telemetry.record_failed_completion()?;
552 if first_error.is_none() {
553 first_error = Some(PipelineError::Backend(format!(
554 "CQE for slot {slot} arrived without matching pending metadata"
555 )));
556 }
557 continue;
558 }
559 };
560 let byte_count = match pending.completion {
561 PendingCompletion::ByteCountFromCqe => {
562 u32::try_from(res).map_err(|_| PipelineError::Backend(format!(
563 "io_uring CQE byte count {res} cannot fit u32. Fix: split ingest reads so completions stay within the megakernel io_queue ABI."
564 )))?
565 }
566 PendingCompletion::NativeNvmeStatus {
567 expected_byte_count,
568 } => {
569 if res != 0 {
570 self.telemetry.record_failed_completion()?;
571 if first_error.is_none() {
572 first_error = Some(PipelineError::Backend(format!(
573 "NVMe passthrough completion for slot {slot} returned non-zero status {res}. Fix: inspect namespace id, LBA range, permissions, and nvidia-fs state."
574 )));
575 }
576 continue;
577 }
578 expected_byte_count
579 }
580 };
581 let slot_u32 = u32::try_from(slot).map_err(|_| PipelineError::Backend(format!(
582 "io_uring completion slot {slot} cannot fit u32. Fix: shard ingest slots before publishing to the megakernel io_queue."
583 )))?;
584 self.megakernel_io_queue
585 .publish_slot(slot_u32, slot_u32, byte_count, pending.tag)?;
586 self.telemetry.record_complete(byte_count)?;
587 completed.push(CompletedIngest {
588 slot: slot_u32,
589 byte_count,
590 tag: pending.tag,
591 });
592 }
593
594 match first_error {
595 Some(err) => Err(err),
596 None => Ok(()),
597 }
598 }
599
600 #[must_use]
602 pub fn megakernel_io_queue(&self) -> &MegakernelIoQueue {
603 &self.megakernel_io_queue
604 }
605
606 #[must_use]
608 pub fn megakernel_io_queue_mut(&mut self) -> &mut MegakernelIoQueue {
609 &mut self.megakernel_io_queue
610 }
611
612 #[must_use]
614 pub fn slot_bytes(&self) -> usize {
615 self.slot_bytes
616 }
617
618 #[must_use]
620 pub fn slot_count(&self) -> usize {
621 self.registered_iovecs.len()
622 }
623
624 #[must_use]
626 pub fn read_path(&self) -> NativeReadPath {
627 self.read_path
628 }
629
630 #[must_use]
632 pub fn telemetry_snapshot(&self) -> NvmeGpuIngestTelemetry {
633 self.telemetry
634 }
635
636 pub fn reset_telemetry(&mut self) {
638 self.telemetry = NvmeGpuIngestTelemetry::default();
639 }
640
641 fn validate_slot_for_submit(&self, slot: u32) -> Result<usize, PipelineError> {
642 let slot_usize = usize::try_from(slot).map_err(|_| PipelineError::QueueFull {
643 queue: "submission",
644 fix: "slot index cannot fit host usize; shard mapped ingest slots",
645 })?;
646 if slot_usize >= self.mapped_slots.len() {
647 return Err(PipelineError::QueueFull {
648 queue: "submission",
649 fix: "slot exceeds the configured mapped-slot count",
650 });
651 }
652 if self.pending[slot_usize].is_some() {
653 return Err(PipelineError::QueueFull {
654 queue: "submission",
655 fix: "slot already has an in-flight ingest; drain completions before reusing it",
656 });
657 }
658 Ok(slot_usize)
659 }
660}
661
662fn checked_telemetry_add(
663 current: u64,
664 increment: u64,
665 label: &'static str,
666) -> Result<u64, PipelineError> {
667 vyre_driver::accounting::checked_add_u64_lazy(current, increment, || {
668 PipelineError::Backend(format!(
669 "io_uring ingest telemetry {label} overflowed u64. Fix: snapshot and reset telemetry before counters saturate."
670 ))
671 })
672}
673
674fn usize_to_u64(value: usize, label: &'static str) -> Result<u64, PipelineError> {
675 u64::try_from(value).map_err(|_| {
676 PipelineError::Backend(format!(
677 "{label} cannot fit u64. Fix: shard io_uring GPU ingest buffers before submission."
678 ))
679 })
680}
681
682fn slot_byte_offset(slot_idx: usize, slot_bytes: usize) -> Result<u64, PipelineError> {
683 let offset = vyre_driver::accounting::checked_mul_usize_lazy(slot_idx, slot_bytes, || {
684 PipelineError::Backend(
685 "io_uring ingest slot byte offset overflowed usize. Fix: shard mapped ingest slots."
686 .to_string(),
687 )
688 })?;
689 usize_to_u64(offset, "io_uring ingest slot byte offset")
690}
691
692fn reserve_ingest_vec_capacity<T>(
693 vec: &mut Vec<T>,
694 capacity: usize,
695 field: &'static str,
696) -> Result<(), PipelineError> {
697 if vec.capacity() >= capacity {
698 return Ok(());
699 }
700 vec.try_reserve_exact(capacity - vec.capacity())
701 .map_err(|error| {
702 PipelineError::Backend(format!(
703 "io_uring GPU ingest failed to reserve {field} for {capacity} entries: {error}. Fix: reduce ingest slot fan-out or shard the ingest batch."
704 ))
705 })
706}
707
708fn partition_slot_bytes(total_len: usize, slot_count: usize) -> Result<usize, PipelineError> {
709 if slot_count == 0 {
710 return Err(PipelineError::QueueFull {
711 queue: "submission",
712 fix: "NvmeGpuIngestDriver requires at least one slot",
713 });
714 }
715 let slot_bytes = total_len / slot_count;
716 if slot_bytes == 0 {
717 return Err(PipelineError::QueueFull {
718 queue: "submission",
719 fix: "mapped staging buffer is too small to partition into the requested slot count",
720 });
721 }
722 if total_len % slot_count != 0 {
723 return Err(PipelineError::QueueFull {
724 queue: "submission",
725 fix: "mapped staging buffer length must divide evenly by slot_count so every byte belongs to exactly one DMA slot",
726 });
727 }
728 Ok(slot_bytes)
729}
730
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that partitioning `total_len` bytes into `slot_count` slots is rejected
    /// with a `QueueFull` error. `why` is the panic message if it unexpectedly succeeds.
    fn assert_partition_rejected(total_len: usize, slot_count: usize, why: &str) {
        let error = partition_slot_bytes(total_len, slot_count).expect_err(why);
        assert!(matches!(error, PipelineError::QueueFull { .. }));
    }

    #[test]
    fn partition_slot_bytes_accepts_exact_slot_geometry() {
        let slot_bytes = partition_slot_bytes(8 * 4096, 8).unwrap();
        assert_eq!(slot_bytes, 4096);
    }

    #[test]
    fn partition_slot_bytes_rejects_zero_slots() {
        assert_partition_rejected(4096, 0, "zero slots must fail");
    }

    #[test]
    fn partition_slot_bytes_rejects_remainder_bytes() {
        assert_partition_rejected(4097, 4, "remainder bytes create unreachable DMA capacity");
    }

    #[test]
    fn partition_slot_bytes_rejects_zero_byte_slots() {
        assert_partition_rejected(3, 4, "zero-byte DMA slots must fail");
    }

    #[test]
    fn ingest_telemetry_tracks_zero_cpu_bounce_registered_reads() {
        let mut telemetry = NvmeGpuIngestTelemetry::default();
        telemetry
            .record_submit(NativeReadPath::RegisteredMappedRead, 4096)
            .expect("Fix: telemetry submit accounting must fit.");
        telemetry
            .record_complete(4096)
            .expect("Fix: telemetry completion accounting must fit.");

        // (submitted, completed) bytes and reads.
        assert_eq!(
            (telemetry.submitted_bytes, telemetry.completed_bytes),
            (4096, 4096)
        );
        assert_eq!((telemetry.submitted_reads, telemetry.completed_reads), (1, 1));
        // (registered, gpudirect) path submissions.
        assert_eq!(
            (
                telemetry.registered_mapped_read_submissions,
                telemetry.gpudirect_nvme_submissions
            ),
            (1, 0)
        );
        assert_eq!(telemetry.cpu_bounce_bytes, 0);
        assert_eq!(telemetry.inflight_reads(), 0);
    }

    #[test]
    fn ingest_telemetry_tracks_zero_cpu_bounce_gpudirect_reads() {
        let mut telemetry = NvmeGpuIngestTelemetry::default();
        telemetry
            .record_submit(NativeReadPath::GpuDirectNvmePassthrough, 8192)
            .expect("Fix: telemetry submit accounting must fit.");
        telemetry
            .record_failed_completion()
            .expect("Fix: telemetry failure accounting must fit.");

        // (submitted, completed) bytes: a failed completion contributes no bytes.
        assert_eq!(
            (telemetry.submitted_bytes, telemetry.completed_bytes),
            (8192, 0)
        );
        // (gpudirect, registered) path submissions.
        assert_eq!(
            (
                telemetry.gpudirect_nvme_submissions,
                telemetry.registered_mapped_read_submissions
            ),
            (1, 0)
        );
        assert_eq!(telemetry.failed_completions, 1);
        assert_eq!(telemetry.cpu_bounce_bytes, 0);
        assert_eq!(telemetry.inflight_reads(), 0);
    }

    #[test]
    fn ingest_telemetry_reports_overflow_instead_of_wrapping() {
        let message = checked_telemetry_add(u64::MAX, 1, "test counter")
            .expect_err("Fix: telemetry counters must fail before wrapping.")
            .to_string();
        assert!(
            message.contains("overflowed u64"),
            "Fix: telemetry overflow errors must be actionable: {message}"
        );
    }

    #[test]
    fn ingest_staging_reservation_reports_capacity_overflow() {
        let mut bytes: Vec<u8> = Vec::new();
        let message = reserve_ingest_vec_capacity(&mut bytes, usize::MAX, "test ingest bytes")
            .expect_err("Fix: impossible ingest staging capacity must be a typed error.")
            .to_string();

        assert!(
            message.contains("failed to reserve test ingest bytes"),
            "Fix: ingest staging reserve failure must name the failed field: {message}"
        );
    }

    #[test]
    fn ingest_staging_reservation_reuses_existing_capacity() {
        let mut bytes: Vec<u8> = Vec::with_capacity(8);
        let capacity_before = bytes.capacity();

        reserve_ingest_vec_capacity(&mut bytes, 4, "test ingest bytes")
            .expect("Fix: lower target capacity should reuse existing staging.");

        assert_eq!(bytes.capacity(), capacity_before);
    }
}