vortex_file/segments/
source.rs1use std::pin::Pin;
5use std::sync::Arc;
6use std::sync::atomic::AtomicUsize;
7use std::sync::atomic::Ordering;
8use std::task::Context;
9use std::task::Poll;
10
11use futures::FutureExt;
12use futures::StreamExt;
13use futures::channel::mpsc;
14use futures::future;
15use vortex_array::buffer::BufferHandle;
16use vortex_buffer::Alignment;
17use vortex_buffer::ByteBuffer;
18use vortex_error::VortexResult;
19use vortex_error::vortex_err;
20use vortex_error::vortex_panic;
21use vortex_io::VortexReadAt;
22use vortex_io::runtime::Handle;
23use vortex_layout::segments::SegmentFuture;
24use vortex_layout::segments::SegmentId;
25use vortex_layout::segments::SegmentSource;
26use vortex_metrics::Counter;
27use vortex_metrics::Histogram;
28use vortex_metrics::Label;
29use vortex_metrics::MetricBuilder;
30use vortex_metrics::MetricsRegistry;
31
32use crate::SegmentSpec;
33use crate::read::IoRequestStream;
34use crate::read::ReadRequest;
35use crate::read::RequestId;
36
/// Message sent from the segment source (and its read futures) to the background I/O task
/// spawned by `FileSegmentSource::open`.
// NOTE(review): how each event is acted upon lives in `crate::read::IoRequestStream`, which is
// not visible here; the per-variant notes below only describe where each event is produced.
#[derive(Debug)]
pub enum ReadEvent {
    /// A new read, submitted by `FileSegmentSource::request`.
    Request(ReadRequest),
    /// Sent by a `ReadFuture` the first time it is polled and its result is not yet available.
    Polled(RequestId),
    /// Sent by a `ReadFuture` that is dropped before it received its result.
    Dropped(RequestId),
}
43
44pub struct FileSegmentSource {
66 segments: Arc<[SegmentSpec]>,
67 events: mpsc::UnboundedSender<ReadEvent>,
69 next_id: Arc<AtomicUsize>,
71}
72
73impl FileSegmentSource {
74 pub fn open<R: VortexReadAt + Clone>(
75 segments: Arc<[SegmentSpec]>,
76 reader: R,
77 handle: Handle,
78 metrics: RequestMetrics,
79 ) -> Self {
80 let (send, recv) = mpsc::unbounded();
81
82 let max_alignment = segments
83 .iter()
84 .map(|segment| segment.alignment)
85 .max()
86 .unwrap_or_else(Alignment::none);
87 let coalesce_config = reader.coalesce_config().map(|mut config| {
88 let extra = (*max_alignment as u64).saturating_sub(1);
91 config.max_size = config.max_size.saturating_add(extra);
92 config
93 });
94 let concurrency = reader.concurrency();
95 if concurrency == 0 {
96 vortex_panic!(
97 "VortexReadAt::concurrency returned 0 (uri={:?}); this would stall I/O",
98 reader.uri()
99 );
100 }
101
102 let stream = IoRequestStream::new(
103 StreamExt::boxed(recv),
104 coalesce_config,
105 max_alignment,
106 metrics,
107 )
108 .boxed();
109
110 let drive_fut = async move {
111 stream
112 .map(move |req| {
113 let reader = reader.clone();
114 async move {
115 let result = reader
116 .read_at(req.offset(), req.len(), req.alignment())
117 .await;
118 req.resolve(result);
119 }
120 })
121 .buffer_unordered(concurrency)
122 .collect::<()>()
123 .await
124 };
125
126 handle.spawn(drive_fut).detach();
127
128 Self {
129 segments,
130 events: send,
131 next_id: Arc::new(AtomicUsize::new(0)),
132 }
133 }
134}
135
136impl SegmentSource for FileSegmentSource {
137 fn request(&self, id: SegmentId) -> SegmentFuture {
138 let spec = *match self.segments.get(*id as usize) {
141 Some(spec) => spec,
142 None => {
143 return future::ready(Err(vortex_err!("Missing segment: {}", id))).boxed();
144 }
145 };
146
147 let SegmentSpec {
148 offset,
149 length,
150 alignment,
151 } = spec;
152
153 let (send, recv) = oneshot::channel();
154 let id = self.next_id.fetch_add(1, Ordering::Relaxed);
155 let event = ReadEvent::Request(ReadRequest {
156 id,
157 offset,
158 length: length as usize,
159 alignment,
160 callback: send,
161 });
162
163 if let Err(e) = self.events.unbounded_send(event) {
165 return future::ready(Err(vortex_err!("Failed to submit read request: {e}"))).boxed();
166 }
167
168 let fut = ReadFuture {
169 id,
170 recv,
171 polled: false,
172 finished: false,
173 events: self.events.clone(),
174 };
175
176 fut.boxed()
178 }
179}
180
181struct ReadFuture {
186 id: usize,
187 recv: oneshot::Receiver<VortexResult<BufferHandle>>,
188 polled: bool,
189 finished: bool,
190 events: mpsc::UnboundedSender<ReadEvent>,
191}
192
193impl Future for ReadFuture {
194 type Output = VortexResult<BufferHandle>;
195
196 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
197 match self.recv.poll_unpin(cx) {
198 Poll::Ready(result) => {
199 self.finished = true;
200 Poll::Ready(
204 result.unwrap_or_else(|e| {
205 Err(vortex_err!("ReadRequest dropped by runtime: {e}"))
206 }),
207 )
208 }
209 Poll::Pending if !self.polled => {
210 self.polled = true;
211 match self.events.unbounded_send(ReadEvent::Polled(self.id)) {
213 Ok(()) => Poll::Pending,
214 Err(e) => Poll::Ready(Err(vortex_err!("ReadRequest dropped by runtime: {e}"))),
215 }
216 }
217 _ => Poll::Pending,
218 }
219 }
220}
221
222impl Drop for ReadFuture {
223 fn drop(&mut self) {
224 if self.finished {
226 return;
227 }
228
229 drop(self.events.unbounded_send(ReadEvent::Dropped(self.id)));
231 }
232}
233
234pub struct RequestMetrics {
235 pub individual_requests: Counter,
236 pub coalesced_requests: Counter,
237 pub num_requests_coalesced: Histogram,
238}
239
240impl RequestMetrics {
241 pub fn new(metrics_registry: &dyn MetricsRegistry, labels: Vec<Label>) -> Self {
242 Self {
243 individual_requests: MetricBuilder::new(metrics_registry)
244 .add_labels(labels.clone())
245 .counter("io.requests.individual"),
246 coalesced_requests: MetricBuilder::new(metrics_registry)
247 .add_labels(labels.clone())
248 .counter("io.requests.coalesced"),
249 num_requests_coalesced: MetricBuilder::new(metrics_registry)
250 .add_labels(labels)
251 .histogram("io.requests.coalesced.num_coalesced"),
252 }
253 }
254}
255
256pub(crate) struct BufferSegmentSource {
261 buffer: ByteBuffer,
262 segments: Arc<[SegmentSpec]>,
263}
264
265impl BufferSegmentSource {
266 pub fn new(buffer: ByteBuffer, segments: Arc<[SegmentSpec]>) -> Self {
268 Self { buffer, segments }
269 }
270}
271
272impl SegmentSource for BufferSegmentSource {
273 fn request(&self, id: SegmentId) -> SegmentFuture {
274 let spec = match self.segments.get(*id as usize) {
275 Some(spec) => spec,
276 None => {
277 return future::ready(Err(vortex_err!("Missing segment: {}", id))).boxed();
278 }
279 };
280
281 let start = spec.offset as usize;
282 let end = start + spec.length as usize;
283 if end > self.buffer.len() {
284 return future::ready(Err(vortex_err!(
285 "Segment {} range {}..{} out of bounds for buffer of length {}",
286 *id,
287 start,
288 end,
289 self.buffer.len()
290 )))
291 .boxed();
292 }
293
294 let slice = self.buffer.slice(start..end).aligned(spec.alignment);
295 future::ready(Ok(BufferHandle::new_host(slice))).boxed()
296 }
297}