vortex_io/file/read/
request.rs1use std::fmt;
5use std::fmt::Debug;
6use std::fmt::Formatter;
7use std::ops::Range;
8use std::sync::Arc;
9
10use vortex_buffer::Alignment;
11use vortex_buffer::ByteBuffer;
12use vortex_error::VortexError;
13use vortex_error::VortexExpect;
14use vortex_error::VortexResult;
15
16pub struct IoRequest(IoRequestInner);
18
19impl IoRequest {
20 pub(crate) fn new_single(request: ReadRequest) -> Self {
21 IoRequest(IoRequestInner::Single(request))
22 }
23
24 pub(crate) fn new_coalesced(request: CoalescedRequest) -> Self {
25 IoRequest(IoRequestInner::Coalesced(request))
26 }
27
28 #[cfg(test)]
30 pub(crate) fn inner(&self) -> &IoRequestInner {
31 &self.0
32 }
33
34 pub fn offset(&self) -> u64 {
36 match &self.0 {
37 IoRequestInner::Single(r) => r.offset,
38 IoRequestInner::Coalesced(r) => r.range.start,
39 }
40 }
41
42 pub fn range(&self) -> Range<u64> {
44 match &self.0 {
45 IoRequestInner::Single(r) => {
46 r.offset
47 ..(r.offset + u64::try_from(r.length).vortex_expect("length too big for u64"))
48 }
49 IoRequestInner::Coalesced(r) => r.range.clone(),
50 }
51 }
52
53 pub fn is_empty(&self) -> bool {
55 match &self.0 {
56 IoRequestInner::Single(r) => r.length == 0,
57 IoRequestInner::Coalesced(r) => r.range.start == r.range.end,
58 }
59 }
60
61 pub fn len(&self) -> usize {
63 match &self.0 {
64 IoRequestInner::Single(r) => r.length,
65 IoRequestInner::Coalesced(r) => usize::try_from(r.range.end - r.range.start)
66 .vortex_expect("range too big for usize"),
67 }
68 }
69
70 pub fn alignment(&self) -> Alignment {
72 match &self.0 {
73 IoRequestInner::Single(r) => r.alignment,
74 IoRequestInner::Coalesced(r) => r.alignment,
75 }
76 }
77
78 pub fn is_canceled(&self) -> bool {
81 match &self.0 {
82 IoRequestInner::Single(req) => req.callback.is_closed(),
83 IoRequestInner::Coalesced(req) => req.requests.iter().all(|r| r.callback.is_closed()),
84 }
85 }
86
87 pub fn resolve(self, result: VortexResult<ByteBuffer>) {
89 match self.0 {
90 IoRequestInner::Single(req) => req.resolve(result),
91 IoRequestInner::Coalesced(req) => req.resolve(result),
92 }
93 }
94}
95
96pub(crate) enum IoRequestInner {
97 Single(ReadRequest),
98 Coalesced(CoalescedRequest),
99}
100
101pub(crate) type RequestId = usize;
102
103pub(crate) struct ReadRequest {
104 pub(crate) id: RequestId,
105 pub(crate) offset: u64,
106 pub(crate) length: usize,
107 pub(crate) alignment: Alignment,
108 pub(crate) callback: oneshot::Sender<VortexResult<ByteBuffer>>,
109}
110
111impl Debug for ReadRequest {
112 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
113 f.debug_struct("ReadRequest")
114 .field("id", &self.id)
115 .field("offset", &self.offset)
116 .field("length", &self.length)
117 .field("alignment", &self.alignment)
118 .field("is_closed", &self.callback.is_closed())
119 .finish()
120 }
121}
122
123impl ReadRequest {
124 pub(crate) fn resolve(self, result: VortexResult<ByteBuffer>) {
125 if let Err(e) = self.callback.send(result) {
126 tracing::debug!("ReadRequest {} dropped before resolving: {e}", self.id);
127 }
128 }
129}
130
131pub(crate) struct CoalescedRequest {
133 pub(crate) range: Range<u64>,
134 pub(crate) alignment: Alignment, pub(crate) requests: Vec<ReadRequest>, }
137
138impl Debug for CoalescedRequest {
139 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
140 f.debug_struct("CoalescedRequest")
141 .field("#", &self.requests.len())
142 .field("length", &(self.range.end - self.range.start))
143 .field("range", &self.range)
144 .field("alignment", &self.alignment)
145 .finish()
146 }
147}
148
149impl CoalescedRequest {
150 pub fn resolve(self, result: VortexResult<ByteBuffer>) {
151 match result {
152 Ok(buffer) => {
153 let buffer = buffer.aligned(Alignment::none());
154 for req in self.requests.into_iter() {
155 let start = usize::try_from(req.offset - self.range.start)
156 .vortex_expect("invalid offset");
157 let end = start + req.length;
158 let slice = buffer.slice(start..end).aligned(req.alignment);
159 req.resolve(Ok(slice));
160 }
161 }
162 Err(e) => {
163 let e = Arc::new(e);
164 for req in self.requests.into_iter() {
165 req.resolve(Err(VortexError::from(e.clone())));
166 }
167 }
168 }
169 }
170}