Skip to main content

vortex_file/segments/
source.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::pin::Pin;
5use std::sync::Arc;
6use std::sync::atomic::AtomicUsize;
7use std::sync::atomic::Ordering;
8use std::task::Context;
9use std::task::Poll;
10
11use futures::FutureExt;
12use futures::StreamExt;
13use futures::channel::mpsc;
14use futures::future;
15use vortex_array::buffer::BufferHandle;
16use vortex_buffer::Alignment;
17use vortex_error::VortexResult;
18use vortex_error::vortex_err;
19use vortex_error::vortex_panic;
20use vortex_io::VortexReadAt;
21use vortex_io::runtime::Handle;
22use vortex_layout::segments::SegmentFuture;
23use vortex_layout::segments::SegmentId;
24use vortex_layout::segments::SegmentSource;
25use vortex_metrics::Counter;
26use vortex_metrics::Histogram;
27use vortex_metrics::Label;
28use vortex_metrics::MetricBuilder;
29use vortex_metrics::MetricsRegistry;
30
31use crate::SegmentSpec;
32use crate::read::IoRequestStream;
33use crate::read::ReadRequest;
34use crate::read::RequestId;
35
/// An event sent from a [`FileSegmentSource`] (or one of its read futures) to the I/O request
/// stream that services reads.
#[derive(Debug)]
pub enum ReadEvent {
    /// A new read request has been registered (sent eagerly when a segment is requested).
    Request(ReadRequest),
    /// The read future for the request with this ID was polled while still pending, for the
    /// first time.
    Polled(RequestId),
    /// The read future for the request with this ID was dropped before it finished.
    Dropped(RequestId),
}
42
/// A [`SegmentSource`] for file-like IO.
///
/// ## Coalescing and Pre-fetching
///
/// It is important to understand the semantics of the read futures returned by a
/// [`FileSegmentSource`]. Under the hood, each instance is backed by a stream that services read
/// requests by applying coalescing and concurrency constraints.
///
/// Each read future has four states:
/// * `registered` - the read future has been created, but not yet polled.
/// * `requested` - the read future has been polled.
/// * `in-flight` - the read request has been sent to the underlying storage system.
/// * `resolved` - the read future has completed and resolved a result.
///
/// When a read request is `registered`, it does not itself trigger any I/O, but it is eligible
/// to be coalesced with other requests.
///
/// If a read future is dropped, it will be canceled if possible. This depends on the current
/// state of the request, as well as on whether the underlying storage system supports
/// cancellation.
///
/// I/O requests are processed in the order they are `registered`; however, coalescing may mean
/// that other registered requests are lumped together with them into a single I/O operation.
pub struct FileSegmentSource {
    /// Segment specifications (offset, length, alignment), indexed by [`SegmentId`].
    segments: Arc<[SegmentSpec]>,
    /// A queue for sending read request events to the I/O stream.
    events: mpsc::UnboundedSender<ReadEvent>,
    /// Counter from which read request IDs are allocated (one per `request` call).
    next_id: Arc<AtomicUsize>,
}
71
72impl FileSegmentSource {
73    pub fn open<R: VortexReadAt + Clone>(
74        segments: Arc<[SegmentSpec]>,
75        reader: R,
76        handle: Handle,
77        metrics: RequestMetrics,
78    ) -> Self {
79        let (send, recv) = mpsc::unbounded();
80
81        let max_alignment = segments
82            .iter()
83            .map(|segment| segment.alignment)
84            .max()
85            .unwrap_or_else(Alignment::none);
86        let coalesce_config = reader.coalesce_config().map(|mut config| {
87            // Aligning the coalesced start down can add up to (alignment - 1) bytes.
88            // Increase max_size to keep the effective payload window consistent.
89            let extra = (*max_alignment as u64).saturating_sub(1);
90            config.max_size = config.max_size.saturating_add(extra);
91            config
92        });
93        let concurrency = reader.concurrency();
94        if concurrency == 0 {
95            vortex_panic!(
96                "VortexReadAt::concurrency returned 0 (uri={:?}); this would stall I/O",
97                reader.uri()
98            );
99        }
100
101        let stream = IoRequestStream::new(
102            StreamExt::boxed(recv),
103            coalesce_config,
104            max_alignment,
105            metrics,
106        )
107        .boxed();
108
109        let drive_fut = async move {
110            stream
111                .map(move |req| {
112                    let reader = reader.clone();
113                    async move {
114                        let result = reader
115                            .read_at(req.offset(), req.len(), req.alignment())
116                            .await;
117                        req.resolve(result);
118                    }
119                })
120                .buffer_unordered(concurrency)
121                .collect::<()>()
122                .await
123        };
124
125        handle.spawn(drive_fut).detach();
126
127        Self {
128            segments,
129            events: send,
130            next_id: Arc::new(AtomicUsize::new(0)),
131        }
132    }
133}
134
135impl SegmentSource for FileSegmentSource {
136    fn request(&self, id: SegmentId) -> SegmentFuture {
137        // We eagerly register the read request here assuming the behaviour of [`FileRead`], where
138        // coalescing becomes effective prior to the future being polled.
139        let spec = match self.segments.get(*id as usize).cloned() {
140            Some(spec) => spec,
141            None => {
142                return future::ready(Err(vortex_err!("Missing segment: {}", id))).boxed();
143            }
144        };
145
146        let SegmentSpec {
147            offset,
148            length,
149            alignment,
150        } = spec;
151
152        let (send, recv) = oneshot::channel();
153        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
154        let event = ReadEvent::Request(ReadRequest {
155            id,
156            offset,
157            length: length as usize,
158            alignment,
159            callback: send,
160        });
161
162        // If we fail to submit the event, we create a future that has failed.
163        if let Err(e) = self.events.unbounded_send(event) {
164            return future::ready(Err(vortex_err!("Failed to submit read request: {e}"))).boxed();
165        }
166
167        let fut = ReadFuture {
168            id,
169            recv,
170            polled: false,
171            finished: false,
172            events: self.events.clone(),
173        };
174
175        // One allocation: we only box the returned SegmentFuture, not the inner ReadFuture.
176        fut.boxed()
177    }
178}
179
/// A future that resolves a single read request issued by a [`FileSegmentSource`].
///
/// See the documentation for [`FileSegmentSource`] for details on coalescing and pre-fetching.
/// If dropped before it finishes, the read request will be canceled where possible.
struct ReadFuture {
    /// ID of the read request; reported to the I/O stream in `Polled` and `Dropped` events.
    id: usize,
    /// Receives the read result sent through the request's `callback`.
    recv: oneshot::Receiver<VortexResult<BufferHandle>>,
    /// Whether a `Polled` event has already been sent (or attempted) for this request.
    polled: bool,
    /// When set, the request needs no cancellation signal, so `Drop` skips the `Dropped` event.
    finished: bool,
    /// Channel used to report `Polled` / `Dropped` events to the I/O stream.
    events: mpsc::UnboundedSender<ReadEvent>,
}
191
192impl Future for ReadFuture {
193    type Output = VortexResult<BufferHandle>;
194
195    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
196        match self.recv.poll_unpin(cx) {
197            Poll::Ready(result) => {
198                self.finished = true;
199                // note: we are skipping polled and dropped events for this if the future
200                //       is ready on the first poll, that means this request was completed
201                //       before it was polled, as part of a coalesced request.
202                Poll::Ready(
203                    result.unwrap_or_else(|e| {
204                        Err(vortex_err!("ReadRequest dropped by runtime: {e}"))
205                    }),
206                )
207            }
208            Poll::Pending if !self.polled => {
209                self.polled = true;
210                // Notify the I/O stream that this request has been polled.
211                match self.events.unbounded_send(ReadEvent::Polled(self.id)) {
212                    Ok(()) => Poll::Pending,
213                    Err(e) => Poll::Ready(Err(vortex_err!("ReadRequest dropped by runtime: {e}"))),
214                }
215            }
216            _ => Poll::Pending,
217        }
218    }
219}
220
221impl Drop for ReadFuture {
222    fn drop(&mut self) {
223        // Completed requests have already left driver state.
224        if self.finished {
225            return;
226        }
227
228        // Best-effort cancellation signal to the I/O stream.
229        drop(self.events.unbounded_send(ReadEvent::Dropped(self.id)));
230    }
231}
232
/// I/O request metrics, handed to the I/O request stream by [`FileSegmentSource::open`].
pub struct RequestMetrics {
    /// Counter registered as `io.requests.individual` (presumably reads issued without
    /// coalescing — confirm in `IoRequestStream`).
    pub individual_requests: Counter,
    /// Counter registered as `io.requests.coalesced` (presumably coalesced reads issued).
    pub coalesced_requests: Counter,
    /// Histogram registered as `io.requests.coalesced.num_coalesced` (presumably the number of
    /// requests merged into each coalesced read).
    pub num_requests_coalesced: Histogram,
}
238
239impl RequestMetrics {
240    pub fn new(metrics_registry: &dyn MetricsRegistry, labels: Vec<Label>) -> Self {
241        Self {
242            individual_requests: MetricBuilder::new(metrics_registry)
243                .add_labels(labels.clone())
244                .counter("io.requests.individual"),
245            coalesced_requests: MetricBuilder::new(metrics_registry)
246                .add_labels(labels.clone())
247                .counter("io.requests.coalesced"),
248            num_requests_coalesced: MetricBuilder::new(metrics_registry)
249                .add_labels(labels)
250                .histogram("io.requests.coalesced.num_coalesced"),
251        }
252    }
253}