vortex_io/file/read/
request.rs1use std::fmt;
5use std::fmt::{Debug, Formatter};
6use std::ops::Range;
7use std::sync::Arc;
8
9use vortex_buffer::{Alignment, ByteBuffer};
10use vortex_error::{VortexError, VortexExpect, VortexResult};
11
12pub struct IoRequest(IoRequestInner);
14
15impl IoRequest {
16 pub(crate) fn new_single(request: ReadRequest) -> Self {
17 IoRequest(IoRequestInner::Single(request))
18 }
19
20 pub(crate) fn new_coalesced(request: CoalescedRequest) -> Self {
21 IoRequest(IoRequestInner::Coalesced(request))
22 }
23
24 #[cfg(test)]
26 pub(crate) fn inner(&self) -> &IoRequestInner {
27 &self.0
28 }
29
30 pub fn offset(&self) -> u64 {
32 match &self.0 {
33 IoRequestInner::Single(r) => r.offset,
34 IoRequestInner::Coalesced(r) => r.range.start,
35 }
36 }
37
38 pub fn range(&self) -> Range<u64> {
40 match &self.0 {
41 IoRequestInner::Single(r) => {
42 r.offset
43 ..(r.offset + u64::try_from(r.length).vortex_expect("length too big for u64"))
44 }
45 IoRequestInner::Coalesced(r) => r.range.clone(),
46 }
47 }
48
49 pub fn is_empty(&self) -> bool {
51 match &self.0 {
52 IoRequestInner::Single(r) => r.length == 0,
53 IoRequestInner::Coalesced(r) => r.range.start == r.range.end,
54 }
55 }
56
57 pub fn len(&self) -> usize {
59 match &self.0 {
60 IoRequestInner::Single(r) => r.length,
61 IoRequestInner::Coalesced(r) => usize::try_from(r.range.end - r.range.start)
62 .vortex_expect("range too big for usize"),
63 }
64 }
65
66 pub fn alignment(&self) -> Alignment {
68 match &self.0 {
69 IoRequestInner::Single(r) => r.alignment,
70 IoRequestInner::Coalesced(r) => r.alignment,
71 }
72 }
73
74 pub fn is_canceled(&self) -> bool {
77 match &self.0 {
78 IoRequestInner::Single(req) => req.callback.is_closed(),
79 IoRequestInner::Coalesced(req) => req.requests.iter().all(|r| r.callback.is_closed()),
80 }
81 }
82
83 pub fn resolve(self, result: VortexResult<ByteBuffer>) {
85 match self.0 {
86 IoRequestInner::Single(req) => req.resolve(result),
87 IoRequestInner::Coalesced(req) => req.resolve(result),
88 }
89 }
90}
91
92pub(crate) enum IoRequestInner {
93 Single(ReadRequest),
94 Coalesced(CoalescedRequest),
95}
96
97pub(crate) type RequestId = usize;
98
99pub(crate) struct ReadRequest {
100 pub(crate) id: RequestId,
101 pub(crate) offset: u64,
102 pub(crate) length: usize,
103 pub(crate) alignment: Alignment,
104 pub(crate) callback: oneshot::Sender<VortexResult<ByteBuffer>>,
105}
106
107impl Debug for ReadRequest {
108 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
109 f.debug_struct("ReadRequest")
110 .field("id", &self.id)
111 .field("offset", &self.offset)
112 .field("length", &self.length)
113 .field("alignment", &self.alignment)
114 .field("is_closed", &self.callback.is_closed())
115 .finish()
116 }
117}
118
119impl ReadRequest {
120 pub(crate) fn resolve(self, result: VortexResult<ByteBuffer>) {
121 if let Err(e) = self.callback.send(result) {
122 log::debug!("ReadRequest {} dropped before resolving: {e}", self.id);
123 }
124 }
125}
126
127pub(crate) struct CoalescedRequest {
129 pub(crate) range: Range<u64>,
130 pub(crate) alignment: Alignment, pub(crate) requests: Vec<ReadRequest>, }
133
134impl Debug for CoalescedRequest {
135 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
136 f.debug_struct("CoalescedRequest")
137 .field("#", &self.requests.len())
138 .field("length", &(self.range.end - self.range.start))
139 .field("range", &self.range)
140 .field("alignment", &self.alignment)
141 .finish()
142 }
143}
144
145impl CoalescedRequest {
146 pub fn resolve(self, result: VortexResult<ByteBuffer>) {
147 match result {
148 Ok(buffer) => {
149 let buffer = buffer.aligned(Alignment::none());
150 for req in self.requests.into_iter() {
151 let start = usize::try_from(req.offset - self.range.start)
152 .vortex_expect("invalid offset");
153 let end = start + req.length;
154 let slice = buffer.slice(start..end).aligned(req.alignment);
155 req.resolve(Ok(slice));
156 }
157 }
158 Err(e) => {
159 let e = Arc::new(e);
160 for req in self.requests.into_iter() {
161 req.resolve(Err(VortexError::from(e.clone())));
162 }
163 }
164 }
165 }
166}