Skip to main content

vortex_file/segments/
source.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::pin::Pin;
5use std::sync::Arc;
6use std::sync::atomic::AtomicUsize;
7use std::sync::atomic::Ordering;
8use std::task;
9use std::task::Context;
10use std::task::Poll;
11
12use futures::FutureExt;
13use futures::StreamExt;
14use futures::channel::mpsc;
15use futures::future;
16use vortex_array::buffer::BufferHandle;
17use vortex_buffer::Alignment;
18use vortex_error::VortexResult;
19use vortex_error::vortex_err;
20use vortex_error::vortex_panic;
21use vortex_io::VortexReadAt;
22use vortex_io::runtime::Handle;
23use vortex_layout::segments::SegmentFuture;
24use vortex_layout::segments::SegmentId;
25use vortex_layout::segments::SegmentSource;
26use vortex_metrics::Counter;
27use vortex_metrics::Histogram;
28use vortex_metrics::Label;
29use vortex_metrics::MetricBuilder;
30use vortex_metrics::MetricsRegistry;
31
32use crate::SegmentSpec;
33use crate::read::IoRequestStream;
34use crate::read::ReadRequest;
35use crate::read::RequestId;
36
37#[derive(Debug)]
38pub enum ReadEvent {
39    Request(ReadRequest),
40    Polled(RequestId),
41    Dropped(RequestId),
42}
43
44/// A [`SegmentSource`] for file-like IO.
45/// ## Coalescing and Pre-fetching
46///
47/// It is important to understand the semantics of the read futures returned by a [`FileSegmentSource`].
48/// Under the hood, each instance is backed by a stream that services read requests by
49/// applying coalescing and concurrency constraints.
50///
51/// Each read future has four states:
52/// * `registered` - the read future has been created, but not yet polled.
53/// * `requested` - the read future has been polled.
54/// * `in-flight` - the read request has been sent to the underlying storage system.
55/// * `resolved` - the read future has completed and resolved a result.
56///
57/// When a read request is `registered`, it will not itself trigger any I/O, but is eligible to
58/// be coalesced with other requests.
59///
60/// If a read future is dropped, it will be canceled if possible. This depends on the current
61/// state of the request, as well as whether the underlying storage system supports cancellation.
62///
63/// I/O requests will be processed in the order they are `registered`, however coalescing may mean
64/// other registered requests are lumped together into a single I/O operation.
65pub struct FileSegmentSource {
66    segments: Arc<[SegmentSpec]>,
67    /// A queue for sending read request events to the I/O stream.
68    events: mpsc::UnboundedSender<ReadEvent>,
69    /// The next read request ID.
70    next_id: Arc<AtomicUsize>,
71}
72
73impl FileSegmentSource {
74    pub fn open<R: VortexReadAt + Clone>(
75        segments: Arc<[SegmentSpec]>,
76        reader: R,
77        handle: Handle,
78        metrics: RequestMetrics,
79    ) -> Self {
80        let (send, recv) = mpsc::unbounded();
81
82        let max_alignment = segments
83            .iter()
84            .map(|segment| segment.alignment)
85            .max()
86            .unwrap_or_else(Alignment::none);
87        let coalesce_config = reader.coalesce_config().map(|mut config| {
88            // Aligning the coalesced start down can add up to (alignment - 1) bytes.
89            // Increase max_size to keep the effective payload window consistent.
90            let extra = (*max_alignment as u64).saturating_sub(1);
91            config.max_size = config.max_size.saturating_add(extra);
92            config
93        });
94        let concurrency = reader.concurrency();
95        if concurrency == 0 {
96            vortex_panic!(
97                "VortexReadAt::concurrency returned 0 (uri={:?}); this would stall I/O",
98                reader.uri()
99            );
100        }
101
102        let stream = IoRequestStream::new(
103            StreamExt::boxed(recv),
104            coalesce_config,
105            max_alignment,
106            metrics,
107        )
108        .boxed();
109
110        let drive_fut = async move {
111            stream
112                .map(move |req| {
113                    let reader = reader.clone();
114                    async move {
115                        let result = reader
116                            .read_at(req.offset(), req.len(), req.alignment())
117                            .await;
118                        req.resolve(result);
119                    }
120                })
121                .buffer_unordered(concurrency)
122                .collect::<()>()
123                .await
124        };
125
126        handle.spawn(drive_fut).detach();
127
128        Self {
129            segments,
130            events: send,
131            next_id: Arc::new(AtomicUsize::new(0)),
132        }
133    }
134}
135
136impl SegmentSource for FileSegmentSource {
137    fn request(&self, id: SegmentId) -> SegmentFuture {
138        // We eagerly register the read request here assuming the behaviour of [`FileRead`], where
139        // coalescing becomes effective prior to the future being polled.
140        let spec = match self.segments.get(*id as usize).cloned() {
141            Some(spec) => spec,
142            None => {
143                return future::ready(Err(vortex_err!("Missing segment: {}", id))).boxed();
144            }
145        };
146
147        let SegmentSpec {
148            offset,
149            length,
150            alignment,
151        } = spec;
152
153        let (send, recv) = oneshot::channel();
154        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
155        let event = ReadEvent::Request(ReadRequest {
156            id,
157            offset,
158            length: length as usize,
159            alignment,
160            callback: send,
161        });
162
163        // If we fail to submit the event, we create a future that has failed.
164        if let Err(e) = self.events.unbounded_send(event) {
165            return future::ready(Err(vortex_err!("Failed to submit read request: {e}"))).boxed();
166        }
167
168        let fut = ReadFuture {
169            id,
170            recv,
171            polled: false,
172            events: self.events.clone(),
173        };
174
175        // One allocation: we only box the returned SegmentFuture, not the inner ReadFuture.
176        fut.boxed()
177    }
178}
179
180/// A future that resolves a read request from a [`FileRead`].
181///
182/// See the documentation for [`FileRead`] for details on coalescing and pre-fetching.
183/// If dropped, the read request will be canceled where possible.
184struct ReadFuture {
185    id: usize,
186    recv: oneshot::Receiver<VortexResult<BufferHandle>>,
187    polled: bool,
188    events: mpsc::UnboundedSender<ReadEvent>,
189}
190
191impl Future for ReadFuture {
192    type Output = VortexResult<BufferHandle>;
193
194    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
195        if !self.polled {
196            self.polled = true;
197            // Notify the I/O stream that this request has been polled.
198            if let Err(e) = self.events.unbounded_send(ReadEvent::Polled(self.id)) {
199                return Poll::Ready(Err(vortex_err!("ReadRequest dropped by runtime: {e}")));
200            }
201        }
202
203        match task::ready!(self.recv.poll_unpin(cx)) {
204            Ok(result) => Poll::Ready(result),
205            Err(e) => Poll::Ready(Err(vortex_err!("ReadRequest dropped by runtime: {e}"))),
206        }
207    }
208}
209
210impl Drop for ReadFuture {
211    fn drop(&mut self) {
212        // When the FileHandle is dropped, we can send a shutdown event to the I/O stream.
213        // If the I/O stream has already been dropped, this will fail silently.
214        drop(self.events.unbounded_send(ReadEvent::Dropped(self.id)));
215    }
216}
217
218pub struct RequestMetrics {
219    pub individual_requests: Counter,
220    pub coalesced_requests: Counter,
221    pub num_requests_coalesced: Histogram,
222}
223
224impl RequestMetrics {
225    pub fn new(metrics_registry: &dyn MetricsRegistry, labels: Vec<Label>) -> Self {
226        Self {
227            individual_requests: MetricBuilder::new(metrics_registry)
228                .add_labels(labels.clone())
229                .counter("io.requests.individual"),
230            coalesced_requests: MetricBuilder::new(metrics_registry)
231                .add_labels(labels.clone())
232                .counter("io.requests.coalesced"),
233            num_requests_coalesced: MetricBuilder::new(metrics_registry)
234                .add_labels(labels)
235                .histogram("io.requests.coalesced.num_coalesced"),
236        }
237    }
238}