1use crate::io::blocking::Buf;
2use crate::io::uring::utils::{ArcFd, UringFd};
3use crate::runtime::driver::op::{CancelData, Cancellable, Completable, CqeResult, Op};
4
5use io_uring::{opcode, types};
6use std::fmt;
7use std::io::{self, Error};
8use std::os::fd::OwnedFd;
9
/// A buffer that an io_uring read can fill in place.
///
/// The buffer first hands out a raw pointer to writable storage
/// (`uring_read_prepare`) and is later told how many bytes were written
/// through that pointer (`uring_read_complete`).
pub(crate) trait ReadBuffer: Send + 'static {
    /// Prepares the buffer for a read of at most `max_len` bytes.
    ///
    /// Returns the destination pointer together with the byte length that is
    /// placed in the read submission.
    ///
    /// NOTE(review): `read_at` moves the buffer after calling this, so the
    /// returned pointer presumably has to stay valid across a move of `self`
    /// (e.g. point into a heap allocation) — confirm for new implementors.
    fn uring_read_prepare(&mut self, max_len: usize) -> (*mut u8, u32);

    /// Records that `n` bytes were written through the prepared pointer.
    ///
    /// # Safety
    ///
    /// `n` bytes starting at the pointer returned by the preceding
    /// `uring_read_prepare` call must have been initialized, and `n` must not
    /// exceed the length that call returned.
    unsafe fn uring_read_complete(&mut self, n: u32);
}
24
25impl ReadBuffer for Vec<u8> {
26 fn uring_read_prepare(&mut self, max_len: usize) -> (*mut u8, u32) {
27 assert!(self.spare_capacity_mut().len() >= max_len);
28 let ptr = self.spare_capacity_mut().as_mut_ptr().cast();
29 (ptr, max_len as u32)
30 }
31
32 unsafe fn uring_read_complete(&mut self, n: u32) {
33 unsafe { self.set_len(self.len() + n as usize) };
36 }
37}
38
39impl ReadBuffer for Buf {
40 fn uring_read_prepare(&mut self, max_len: usize) -> (*mut u8, u32) {
41 self.prepare_uring_read(max_len)
42 }
43
44 unsafe fn uring_read_complete(&mut self, n: u32) {
45 unsafe { self.complete_uring_read(n as usize) };
47 }
48}
49
50pub(crate) struct Read<B, F = ArcFd> {
51 fd: F,
52 buf: B,
53}
54
55impl<B: fmt::Debug, F> fmt::Debug for Read<B, F> {
56 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
57 f.debug_struct("Read")
58 .field("buf", &self.buf)
59 .finish_non_exhaustive()
60 }
61}
62
63impl<B: ReadBuffer, F: UringFd> Completable for Read<B, F> {
64 type Output = (io::Result<u32>, F, B);
65
66 fn complete(self, cqe: CqeResult) -> Self::Output {
67 let mut buf = self.buf;
68 if let Ok(len) = cqe.result {
69 unsafe { buf.uring_read_complete(len) };
71 }
72 (cqe.result, self.fd, buf)
73 }
74
75 fn complete_with_error(self, err: Error) -> Self::Output {
76 (Err(err), self.fd, self.buf)
77 }
78}
79
80impl Cancellable for Read<Vec<u8>, OwnedFd> {
81 fn cancel(self) -> CancelData {
82 CancelData::ReadVec(self)
83 }
84}
85
86impl Cancellable for Read<Buf, ArcFd> {
87 fn cancel(self) -> CancelData {
88 CancelData::ReadBuf(self)
89 }
90}
91
92impl<B, F> Op<Read<B, F>>
93where
94 B: ReadBuffer + fmt::Debug,
95 F: UringFd,
96 Read<B, F>: Cancellable,
97{
98 pub(crate) fn read_at(fd: F, mut buf: B, max_len: usize, offset: u64) -> Self {
103 let (ptr, len) = buf.uring_read_prepare(max_len);
104
105 let sqe = opcode::Read::new(types::Fd(UringFd::as_raw_fd(&fd)), ptr, len)
106 .offset(offset)
107 .build();
108
109 unsafe { Op::new(sqe, Read { fd, buf }) }
113 }
114}