Skip to main content

vortex_io/
read_at.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::sync::Arc;
5
6use futures::FutureExt;
7use futures::future::BoxFuture;
8use vortex_array::buffer::BufferHandle;
9use vortex_buffer::Alignment;
10use vortex_buffer::ByteBuffer;
11use vortex_error::VortexExpect;
12use vortex_error::VortexResult;
13use vortex_error::vortex_bail;
14use vortex_metrics::Counter;
15use vortex_metrics::Histogram;
16use vortex_metrics::Label;
17use vortex_metrics::MetricBuilder;
18use vortex_metrics::MetricsRegistry;
19use vortex_metrics::Timer;
20
/// Tuning knobs that decide when neighbouring I/O requests are merged into one larger read.
#[derive(Clone, Copy, Debug)]
pub struct CoalesceConfig {
    /// Largest gap of unrequested ("empty") bytes allowed between two requests for them to
    /// still be considered for coalescing.
    pub distance: u64,
    /// Upper bound on the total number of bytes a single coalesced request may span.
    pub max_size: u64,
}
29
30impl CoalesceConfig {
31    /// Creates a new coalesce configuration.
32    pub const fn new(distance: u64, max_size: u64) -> Self {
33        Self { distance, max_size }
34    }
35
36    /// Configuration appropriate for in-memory / low-latency sources.
37    pub const fn in_memory() -> Self {
38        Self::new(8 * 1024, 8 * 1024) // 8KB
39    }
40
41    /// Configuration appropriate for local filesystem access.
42    pub const fn file() -> Self {
43        Self::new(1 << 20, 4 << 20) // 1MB distance, 4MB max
44    }
45
46    /// Configuration appropriate for object storage (S3, GCS, etc.).
47    pub const fn object_storage() -> Self {
48        Self::new(1 << 20, 16 << 20) // 1MB distance, 16MB max
49    }
50}
51
52/// The unified read trait for Vortex I/O sources.
53///
54/// This trait provides async positional reads to underlying storage and is used by the vortex-file
55/// crate to read data from files or object stores.
56pub trait VortexReadAt: Send + Sync + 'static {
57    /// URI for debugging/logging. Returns `None` for anonymous sources.
58    fn uri(&self) -> Option<&Arc<str>> {
59        None
60    }
61
62    /// Configuration for merging nearby I/O requests into fewer, larger reads.
63    fn coalesce_config(&self) -> Option<CoalesceConfig> {
64        None
65    }
66
67    /// Maximum number of concurrent I/O requests for that should be pulled from this source.
68    ///
69    /// This value is used to control how many [`VortexReadAt::read_at`] calls can
70    /// be in-flight simultaneously. Higher values allow more parallelism but consume
71    /// more resources (memory, file descriptors, network connections).
72    ///
73    /// Implementations should choose a value appropriate for their underlying storage
74    /// characteristics. Low-latency sources benefit less from high concurrency, while
75    /// high-latency sources (like remote storage) benefit significantly from issuing
76    /// many requests in parallel.
77    fn concurrency(&self) -> usize;
78
79    /// Asynchronously get the number of bytes of the underlying source.
80    fn size(&self) -> BoxFuture<'static, VortexResult<u64>>;
81
82    /// Request an asynchronous positional read. Results will be returned as a [`BufferHandle`].
83    ///
84    /// If the reader does not have the requested number of bytes, the returned Future will complete
85    /// with an [`UnexpectedEof`][std::io::ErrorKind::UnexpectedEof] error.
86    fn read_at(
87        &self,
88        offset: u64,
89        length: usize,
90        alignment: Alignment,
91    ) -> BoxFuture<'static, VortexResult<BufferHandle>>;
92}
93
94impl VortexReadAt for Arc<dyn VortexReadAt> {
95    fn uri(&self) -> Option<&Arc<str>> {
96        self.as_ref().uri()
97    }
98
99    fn coalesce_config(&self) -> Option<CoalesceConfig> {
100        self.as_ref().coalesce_config()
101    }
102
103    fn concurrency(&self) -> usize {
104        self.as_ref().concurrency()
105    }
106
107    fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
108        self.as_ref().size()
109    }
110
111    fn read_at(
112        &self,
113        offset: u64,
114        length: usize,
115        alignment: Alignment,
116    ) -> BoxFuture<'static, VortexResult<BufferHandle>> {
117        self.as_ref().read_at(offset, length, alignment)
118    }
119}
120
121impl<R: VortexReadAt> VortexReadAt for Arc<R> {
122    fn uri(&self) -> Option<&Arc<str>> {
123        self.as_ref().uri()
124    }
125
126    fn coalesce_config(&self) -> Option<CoalesceConfig> {
127        self.as_ref().coalesce_config()
128    }
129
130    fn concurrency(&self) -> usize {
131        self.as_ref().concurrency()
132    }
133
134    fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
135        self.as_ref().size()
136    }
137
138    fn read_at(
139        &self,
140        offset: u64,
141        length: usize,
142        alignment: Alignment,
143    ) -> BoxFuture<'static, VortexResult<BufferHandle>> {
144        self.as_ref().read_at(offset, length, alignment)
145    }
146}
147
148impl VortexReadAt for ByteBuffer {
149    fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
150        let length = self.len() as u64;
151        async move { Ok(length) }.boxed()
152    }
153
154    fn concurrency(&self) -> usize {
155        16
156    }
157
158    fn read_at(
159        &self,
160        offset: u64,
161        length: usize,
162        alignment: Alignment,
163    ) -> BoxFuture<'static, VortexResult<BufferHandle>> {
164        let buffer = self.clone();
165        async move {
166            let start = usize::try_from(offset).vortex_expect("start too big for usize");
167            let end =
168                usize::try_from(offset + length as u64).vortex_expect("end too big for usize");
169            if end > buffer.len() {
170                vortex_bail!(
171                    "Requested range {}..{} out of bounds for buffer of length {}",
172                    start,
173                    end,
174                    buffer.len()
175                );
176            }
177            Ok(BufferHandle::new_host(
178                buffer.slice_unaligned(start..end).aligned(alignment),
179            ))
180        }
181        .boxed()
182    }
183}
184
185/// A wrapper that instruments a [`VortexReadAt`] with metrics.
186#[derive(Clone)]
187pub struct InstrumentedReadAt<T: VortexReadAt + Clone> {
188    read: T,
189    // We use `Arc` to take care of all the complexity that's potentially associated with reference counting
190    // and dropping
191    metrics: Arc<InnerMetrics>,
192}
193
194struct InnerMetrics {
195    sizes: Histogram,
196    total_size: Counter,
197    durations: Timer,
198}
199
200impl<T: VortexReadAt + Clone> InstrumentedReadAt<T> {
201    pub fn new(read: T, metrics_registry: &dyn MetricsRegistry) -> Self {
202        Self::new_with_labels(read, metrics_registry, Vec::<Label>::default())
203    }
204
205    pub fn new_with_labels<I, L>(read: T, metrics_registry: &dyn MetricsRegistry, labels: I) -> Self
206    where
207        I: IntoIterator<Item = L>,
208        L: Into<Label>,
209    {
210        let labels = labels.into_iter().map(|l| l.into()).collect::<Vec<Label>>();
211        let sizes = MetricBuilder::new(metrics_registry)
212            .add_labels(labels.clone())
213            .histogram("vortex.io.read.size");
214        let total_size = MetricBuilder::new(metrics_registry)
215            .add_labels(labels.clone())
216            .counter("vortex.io.read.total_size");
217        let durations = MetricBuilder::new(metrics_registry)
218            .add_labels(labels)
219            .timer("vortex.io.read.duration");
220
221        Self {
222            read,
223            metrics: Arc::new(InnerMetrics {
224                sizes,
225                total_size,
226                durations,
227            }),
228        }
229    }
230}
231
232// We implement drop for `InnerMetrics` so this will be logged only when we eventually drop the final instance of `InstrumentedRead`
233impl Drop for InnerMetrics {
234    #[allow(clippy::cognitive_complexity)]
235    fn drop(&mut self) {
236        tracing::debug!("Reads: {}", self.sizes.count());
237        if !self.sizes.is_empty() {
238            tracing::debug!(
239                "Read size: p50={} p95={} p99={} p999={}",
240                self.sizes.quantile(0.5).vortex_expect("must not be empty"),
241                self.sizes.quantile(0.95).vortex_expect("must not be empty"),
242                self.sizes.quantile(0.99).vortex_expect("must not be empty"),
243                self.sizes
244                    .quantile(0.999)
245                    .vortex_expect("must not be empty"),
246            );
247        }
248
249        let total_size = self.total_size.value();
250        tracing::debug!("Total read size: {total_size}");
251
252        if !self.durations.is_empty() {
253            tracing::debug!(
254                "Read duration: p50={}ms p95={}ms p99={}ms p999={}ms",
255                self.durations
256                    .quantile(0.5)
257                    .vortex_expect("must not be empty")
258                    .as_millis(),
259                self.durations
260                    .quantile(0.95)
261                    .vortex_expect("must not be empty")
262                    .as_millis(),
263                self.durations
264                    .quantile(0.99)
265                    .vortex_expect("must not be empty")
266                    .as_millis(),
267                self.durations
268                    .quantile(0.999)
269                    .vortex_expect("must not be empty")
270                    .as_millis(),
271            );
272        }
273    }
274}
275
276impl<T: VortexReadAt + Clone> VortexReadAt for InstrumentedReadAt<T> {
277    fn uri(&self) -> Option<&Arc<str>> {
278        self.read.uri()
279    }
280
281    fn coalesce_config(&self) -> Option<CoalesceConfig> {
282        self.read.coalesce_config()
283    }
284
285    fn concurrency(&self) -> usize {
286        self.read.concurrency()
287    }
288
289    fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
290        self.read.size()
291    }
292
293    fn read_at(
294        &self,
295        offset: u64,
296        length: usize,
297        alignment: Alignment,
298    ) -> BoxFuture<'static, VortexResult<BufferHandle>> {
299        let durations = self.metrics.durations.clone();
300        let sizes = self.metrics.sizes.clone();
301        let total_size = self.metrics.total_size.clone();
302
303        let read_fut = self.read.read_at(offset, length, alignment);
304        async move {
305            let _timer = durations.time();
306            let buf = read_fut.await;
307            sizes.update(length as f64);
308            total_size.add(length as u64);
309            buf
310        }
311        .boxed()
312    }
313}
314
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use vortex_buffer::Alignment;
    use vortex_buffer::ByteBuffer;

    use super::*;

    // The three preset tests pin the exact byte values of each `CoalesceConfig` constructor.

    #[test]
    fn test_coalesce_config_in_memory() {
        let CoalesceConfig { distance, max_size } = CoalesceConfig::in_memory();
        assert_eq!(distance, 8 * 1024);
        assert_eq!(max_size, 8 * 1024);
    }

    #[test]
    fn test_coalesce_config_file() {
        let CoalesceConfig { distance, max_size } = CoalesceConfig::file();
        assert_eq!(distance, 1 << 20); // 1MB
        assert_eq!(max_size, 4 << 20); // 4MB
    }

    #[test]
    fn test_coalesce_config_object_storage() {
        let CoalesceConfig { distance, max_size } = CoalesceConfig::object_storage();
        assert_eq!(distance, 1 << 20); // 1MB
        assert_eq!(max_size, 16 << 20); // 16MB
    }

    /// A read from the middle of a buffer returns exactly the requested bytes.
    #[tokio::test]
    async fn test_byte_buffer_read_at() {
        let source = ByteBuffer::from(vec![1, 2, 3, 4, 5]);

        let handle = source.read_at(1, 3, Alignment::none()).await.unwrap();
        assert_eq!(handle.to_host().await.as_ref(), &[2, 3, 4]);
    }

    /// A read that runs past the end of the buffer is reported as an error.
    #[tokio::test]
    async fn test_byte_buffer_read_out_of_bounds() {
        let source = ByteBuffer::from(vec![1, 2, 3]);

        let outcome = source.read_at(1, 9, Alignment::none()).await;
        assert!(outcome.is_err());
    }

    /// An `Arc`-wrapped buffer forwards both `read_at` and `size` to the inner buffer.
    #[tokio::test]
    async fn test_arc_read_at() {
        let shared = Arc::new(ByteBuffer::from(vec![1, 2, 3, 4, 5]));

        let handle = shared.read_at(2, 3, Alignment::none()).await.unwrap();
        assert_eq!(handle.to_host().await.as_ref(), &[3, 4, 5]);

        let total_bytes = shared.size().await.unwrap();
        assert_eq!(total_bytes, 5);
    }
}