vortex_io/file/read/
request.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::fmt;
5use std::fmt::{Debug, Formatter};
6use std::ops::Range;
7use std::sync::Arc;
8
9use vortex_buffer::{Alignment, ByteBuffer};
10use vortex_error::{VortexError, VortexExpect, VortexResult};
11
12/// An I/O request, either a single read or a coalesced set of reads.
13pub struct IoRequest(IoRequestInner);
14
15impl IoRequest {
16    pub(crate) fn new_single(request: ReadRequest) -> Self {
17        IoRequest(IoRequestInner::Single(request))
18    }
19
20    pub(crate) fn new_coalesced(request: CoalescedRequest) -> Self {
21        IoRequest(IoRequestInner::Coalesced(request))
22    }
23
24    // For debugging purposes.
25    #[cfg(test)]
26    pub(crate) fn inner(&self) -> &IoRequestInner {
27        &self.0
28    }
29
30    /// Returns the starting offset of this request within the file.
31    pub fn offset(&self) -> u64 {
32        match &self.0 {
33            IoRequestInner::Single(r) => r.offset,
34            IoRequestInner::Coalesced(r) => r.range.start,
35        }
36    }
37
38    /// Returns the byte range this request within the file.
39    pub fn range(&self) -> Range<u64> {
40        match &self.0 {
41            IoRequestInner::Single(r) => {
42                r.offset
43                    ..(r.offset + u64::try_from(r.length).vortex_expect("length too big for u64"))
44            }
45            IoRequestInner::Coalesced(r) => r.range.clone(),
46        }
47    }
48
49    /// Returns true if this request has zero length.
50    pub fn is_empty(&self) -> bool {
51        match &self.0 {
52            IoRequestInner::Single(r) => r.length == 0,
53            IoRequestInner::Coalesced(r) => r.range.start == r.range.end,
54        }
55    }
56
57    /// Returns the length of this request in bytes.
58    pub fn len(&self) -> usize {
59        match &self.0 {
60            IoRequestInner::Single(r) => r.length,
61            IoRequestInner::Coalesced(r) => usize::try_from(r.range.end - r.range.start)
62                .vortex_expect("range too big for usize"),
63        }
64    }
65
66    /// Returns the alignment requirement for this request.
67    pub fn alignment(&self) -> Alignment {
68        match &self.0 {
69            IoRequestInner::Single(r) => r.alignment,
70            IoRequestInner::Coalesced(r) => r.alignment,
71        }
72    }
73
74    /// Returns true if all callbacks associated with this request have been dropped.
75    /// In other words, there is no one waiting for the result of this request.
76    pub fn is_canceled(&self) -> bool {
77        match &self.0 {
78            IoRequestInner::Single(req) => req.callback.is_closed(),
79            IoRequestInner::Coalesced(req) => req.requests.iter().all(|r| r.callback.is_closed()),
80        }
81    }
82
83    /// Resolves the request with the given result.
84    pub fn resolve(self, result: VortexResult<ByteBuffer>) {
85        match self.0 {
86            IoRequestInner::Single(req) => req.resolve(result),
87            IoRequestInner::Coalesced(req) => req.resolve(result),
88        }
89    }
90}
91
92pub(crate) enum IoRequestInner {
93    Single(ReadRequest),
94    Coalesced(CoalescedRequest),
95}
96
97pub(crate) type RequestId = usize;
98
99pub(crate) struct ReadRequest {
100    pub(crate) id: RequestId,
101    pub(crate) offset: u64,
102    pub(crate) length: usize,
103    pub(crate) alignment: Alignment,
104    pub(crate) callback: oneshot::Sender<VortexResult<ByteBuffer>>,
105}
106
107impl Debug for ReadRequest {
108    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
109        f.debug_struct("ReadRequest")
110            .field("id", &self.id)
111            .field("offset", &self.offset)
112            .field("length", &self.length)
113            .field("alignment", &self.alignment)
114            .field("is_closed", &self.callback.is_closed())
115            .finish()
116    }
117}
118
119impl ReadRequest {
120    pub(crate) fn resolve(self, result: VortexResult<ByteBuffer>) {
121        if let Err(e) = self.callback.send(result) {
122            log::debug!("ReadRequest {} dropped before resolving: {e}", self.id);
123        }
124    }
125}
126
127/// A set of I/O requests that have been coalesced into a single larger request.
128pub(crate) struct CoalescedRequest {
129    pub(crate) range: Range<u64>,
130    pub(crate) alignment: Alignment, // The alignment of the first request in the coalesced range.
131    pub(crate) requests: Vec<ReadRequest>, // TODO(ngates): we could have enum of Single/Many to avoid Vec.
132}
133
134impl Debug for CoalescedRequest {
135    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
136        f.debug_struct("CoalescedRequest")
137            .field("#", &self.requests.len())
138            .field("length", &(self.range.end - self.range.start))
139            .field("range", &self.range)
140            .field("alignment", &self.alignment)
141            .finish()
142    }
143}
144
145impl CoalescedRequest {
146    pub fn resolve(self, result: VortexResult<ByteBuffer>) {
147        match result {
148            Ok(buffer) => {
149                let buffer = buffer.aligned(Alignment::none());
150                for req in self.requests.into_iter() {
151                    let start = usize::try_from(req.offset - self.range.start)
152                        .vortex_expect("invalid offset");
153                    let end = start + req.length;
154                    let slice = buffer.slice(start..end).aligned(req.alignment);
155                    req.resolve(Ok(slice));
156                }
157            }
158            Err(e) => {
159                let e = Arc::new(e);
160                for req in self.requests.into_iter() {
161                    req.resolve(Err(VortexError::from(e.clone())));
162                }
163            }
164        }
165    }
166}