vortex_io/file/read/
request.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::fmt;
5use std::fmt::Debug;
6use std::fmt::Formatter;
7use std::ops::Range;
8use std::sync::Arc;
9
10use vortex_buffer::Alignment;
11use vortex_buffer::ByteBuffer;
12use vortex_error::VortexError;
13use vortex_error::VortexExpect;
14use vortex_error::VortexResult;
15
16/// An I/O request, either a single read or a coalesced set of reads.
17pub struct IoRequest(IoRequestInner);
18
19impl IoRequest {
20    pub(crate) fn new_single(request: ReadRequest) -> Self {
21        IoRequest(IoRequestInner::Single(request))
22    }
23
24    pub(crate) fn new_coalesced(request: CoalescedRequest) -> Self {
25        IoRequest(IoRequestInner::Coalesced(request))
26    }
27
28    // For debugging purposes.
29    #[cfg(test)]
30    pub(crate) fn inner(&self) -> &IoRequestInner {
31        &self.0
32    }
33
34    /// Returns the starting offset of this request within the file.
35    pub fn offset(&self) -> u64 {
36        match &self.0 {
37            IoRequestInner::Single(r) => r.offset,
38            IoRequestInner::Coalesced(r) => r.range.start,
39        }
40    }
41
42    /// Returns the byte range this request within the file.
43    pub fn range(&self) -> Range<u64> {
44        match &self.0 {
45            IoRequestInner::Single(r) => {
46                r.offset
47                    ..(r.offset + u64::try_from(r.length).vortex_expect("length too big for u64"))
48            }
49            IoRequestInner::Coalesced(r) => r.range.clone(),
50        }
51    }
52
53    /// Returns true if this request has zero length.
54    pub fn is_empty(&self) -> bool {
55        match &self.0 {
56            IoRequestInner::Single(r) => r.length == 0,
57            IoRequestInner::Coalesced(r) => r.range.start == r.range.end,
58        }
59    }
60
61    /// Returns the length of this request in bytes.
62    pub fn len(&self) -> usize {
63        match &self.0 {
64            IoRequestInner::Single(r) => r.length,
65            IoRequestInner::Coalesced(r) => usize::try_from(r.range.end - r.range.start)
66                .vortex_expect("range too big for usize"),
67        }
68    }
69
70    /// Returns the alignment requirement for this request.
71    pub fn alignment(&self) -> Alignment {
72        match &self.0 {
73            IoRequestInner::Single(r) => r.alignment,
74            IoRequestInner::Coalesced(r) => r.alignment,
75        }
76    }
77
78    /// Returns true if all callbacks associated with this request have been dropped.
79    /// In other words, there is no one waiting for the result of this request.
80    pub fn is_canceled(&self) -> bool {
81        match &self.0 {
82            IoRequestInner::Single(req) => req.callback.is_closed(),
83            IoRequestInner::Coalesced(req) => req.requests.iter().all(|r| r.callback.is_closed()),
84        }
85    }
86
87    /// Resolves the request with the given result.
88    pub fn resolve(self, result: VortexResult<ByteBuffer>) {
89        match self.0 {
90            IoRequestInner::Single(req) => req.resolve(result),
91            IoRequestInner::Coalesced(req) => req.resolve(result),
92        }
93    }
94}
95
96pub(crate) enum IoRequestInner {
97    Single(ReadRequest),
98    Coalesced(CoalescedRequest),
99}
100
/// Identifier carried by each [`ReadRequest`].
pub(crate) type RequestId = usize;
102
103pub(crate) struct ReadRequest {
104    pub(crate) id: RequestId,
105    pub(crate) offset: u64,
106    pub(crate) length: usize,
107    pub(crate) alignment: Alignment,
108    pub(crate) callback: oneshot::Sender<VortexResult<ByteBuffer>>,
109}
110
111impl Debug for ReadRequest {
112    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
113        f.debug_struct("ReadRequest")
114            .field("id", &self.id)
115            .field("offset", &self.offset)
116            .field("length", &self.length)
117            .field("alignment", &self.alignment)
118            .field("is_closed", &self.callback.is_closed())
119            .finish()
120    }
121}
122
123impl ReadRequest {
124    pub(crate) fn resolve(self, result: VortexResult<ByteBuffer>) {
125        if let Err(e) = self.callback.send(result) {
126            tracing::debug!("ReadRequest {} dropped before resolving: {e}", self.id);
127        }
128    }
129}
130
131/// A set of I/O requests that have been coalesced into a single larger request.
132pub(crate) struct CoalescedRequest {
133    pub(crate) range: Range<u64>,
134    pub(crate) alignment: Alignment, // The alignment of the first request in the coalesced range.
135    pub(crate) requests: Vec<ReadRequest>, // TODO(ngates): we could have enum of Single/Many to avoid Vec.
136}
137
138impl Debug for CoalescedRequest {
139    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
140        f.debug_struct("CoalescedRequest")
141            .field("#", &self.requests.len())
142            .field("length", &(self.range.end - self.range.start))
143            .field("range", &self.range)
144            .field("alignment", &self.alignment)
145            .finish()
146    }
147}
148
149impl CoalescedRequest {
150    pub fn resolve(self, result: VortexResult<ByteBuffer>) {
151        match result {
152            Ok(buffer) => {
153                let buffer = buffer.aligned(Alignment::none());
154                for req in self.requests.into_iter() {
155                    let start = usize::try_from(req.offset - self.range.start)
156                        .vortex_expect("invalid offset");
157                    let end = start + req.length;
158                    let slice = buffer.slice(start..end).aligned(req.alignment);
159                    req.resolve(Ok(slice));
160                }
161            }
162            Err(e) => {
163                let e = Arc::new(e);
164                for req in self.requests.into_iter() {
165                    req.resolve(Err(VortexError::from(e.clone())));
166                }
167            }
168        }
169    }
170}