Skip to main content

vortex_io/
read_at.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::sync::Arc;
5
6use futures::FutureExt;
7use futures::future::BoxFuture;
8use vortex_array::buffer::BufferHandle;
9use vortex_buffer::Alignment;
10use vortex_buffer::ByteBuffer;
11use vortex_error::VortexExpect;
12use vortex_error::VortexResult;
13use vortex_error::vortex_bail;
14use vortex_metrics::Counter;
15use vortex_metrics::Histogram;
16use vortex_metrics::Label;
17use vortex_metrics::MetricBuilder;
18use vortex_metrics::MetricsRegistry;
19use vortex_metrics::Timer;
20
/// Tuning knobs that decide when nearby I/O requests are merged into a single operation.
#[derive(Debug, Clone, Copy)]
pub struct CoalesceConfig {
    /// Largest "empty" gap between two requests for them to still be considered for coalescing.
    pub distance: u64,
    /// Upper bound on the total size spanned by one coalesced request.
    pub max_size: u64,
}

impl CoalesceConfig {
    /// One kibibyte, expressed in bytes.
    const KIB: u64 = 1 << 10;
    /// One mebibyte, expressed in bytes.
    const MIB: u64 = 1 << 20;

    /// Builds a configuration from an explicit `distance` and `max_size`.
    pub const fn new(distance: u64, max_size: u64) -> Self {
        Self { distance, max_size }
    }

    /// Preset for in-memory / low-latency sources: 8KB distance, 8KB max size.
    pub const fn in_memory() -> Self {
        Self {
            distance: 8 * Self::KIB,
            max_size: 8 * Self::KIB,
        }
    }

    /// Preset for local filesystem access: 1MB distance, 4MB max size.
    pub const fn file() -> Self {
        Self {
            distance: Self::MIB,
            max_size: 4 * Self::MIB,
        }
    }

    /// Preset for object storage (S3, GCS, etc.): 1MB distance, 16MB max size.
    pub const fn object_storage() -> Self {
        Self {
            distance: Self::MIB,
            max_size: 16 * Self::MIB,
        }
    }
}
51
52/// The unified read trait for Vortex I/O sources.
53///
54/// This trait provides async positional reads to underlying storage and is used by the vortex-file
55/// crate to read data from files or object stores.
56pub trait VortexReadAt: Send + Sync + 'static {
57    /// URI for debugging/logging. Returns `None` for anonymous sources.
58    fn uri(&self) -> Option<&Arc<str>> {
59        None
60    }
61
62    /// Configuration for merging nearby I/O requests into fewer, larger reads.
63    fn coalesce_config(&self) -> Option<CoalesceConfig> {
64        None
65    }
66
67    /// Maximum number of concurrent I/O requests for that should be pulled from this source.
68    ///
69    /// This value is used to control how many [`VortexReadAt::read_at`] calls can
70    /// be in-flight simultaneously. Higher values allow more parallelism but consume
71    /// more resources (memory, file descriptors, network connections).
72    ///
73    /// Implementations should choose a value appropriate for their underlying storage
74    /// characteristics. Low-latency sources benefit less from high concurrency, while
75    /// high-latency sources (like remote storage) benefit significantly from issuing
76    /// many requests in parallel.
77    fn concurrency(&self) -> usize;
78
79    /// Asynchronously get the number of bytes of the underlying source.
80    fn size(&self) -> BoxFuture<'static, VortexResult<u64>>;
81
82    /// Request an asynchronous positional read. Results will be returned as a [`BufferHandle`].
83    ///
84    /// If the reader does not have the requested number of bytes, the returned Future will complete
85    /// with an [`UnexpectedEof`][std::io::ErrorKind::UnexpectedEof] error.
86    fn read_at(
87        &self,
88        offset: u64,
89        length: usize,
90        alignment: Alignment,
91    ) -> BoxFuture<'static, VortexResult<BufferHandle>>;
92}
93
94impl VortexReadAt for Arc<dyn VortexReadAt> {
95    fn uri(&self) -> Option<&Arc<str>> {
96        self.as_ref().uri()
97    }
98
99    fn coalesce_config(&self) -> Option<CoalesceConfig> {
100        self.as_ref().coalesce_config()
101    }
102
103    fn concurrency(&self) -> usize {
104        self.as_ref().concurrency()
105    }
106
107    fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
108        self.as_ref().size()
109    }
110
111    fn read_at(
112        &self,
113        offset: u64,
114        length: usize,
115        alignment: Alignment,
116    ) -> BoxFuture<'static, VortexResult<BufferHandle>> {
117        self.as_ref().read_at(offset, length, alignment)
118    }
119}
120
121impl<R: VortexReadAt> VortexReadAt for Arc<R> {
122    fn uri(&self) -> Option<&Arc<str>> {
123        self.as_ref().uri()
124    }
125
126    fn coalesce_config(&self) -> Option<CoalesceConfig> {
127        self.as_ref().coalesce_config()
128    }
129
130    fn concurrency(&self) -> usize {
131        self.as_ref().concurrency()
132    }
133
134    fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
135        self.as_ref().size()
136    }
137
138    fn read_at(
139        &self,
140        offset: u64,
141        length: usize,
142        alignment: Alignment,
143    ) -> BoxFuture<'static, VortexResult<BufferHandle>> {
144        self.as_ref().read_at(offset, length, alignment)
145    }
146}
147
148impl VortexReadAt for ByteBuffer {
149    fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
150        let length = self.len() as u64;
151        async move { Ok(length) }.boxed()
152    }
153
154    fn concurrency(&self) -> usize {
155        16
156    }
157
158    fn read_at(
159        &self,
160        offset: u64,
161        length: usize,
162        alignment: Alignment,
163    ) -> BoxFuture<'static, VortexResult<BufferHandle>> {
164        let buffer = self.clone();
165        async move {
166            let start = usize::try_from(offset).vortex_expect("start too big for usize");
167            let end =
168                usize::try_from(offset + length as u64).vortex_expect("end too big for usize");
169            if end > buffer.len() {
170                vortex_bail!(
171                    "Requested range {}..{} out of bounds for buffer of length {}",
172                    start,
173                    end,
174                    buffer.len()
175                );
176            }
177            Ok(BufferHandle::new_host(
178                buffer.slice_unaligned(start..end).aligned(alignment),
179            ))
180        }
181        .boxed()
182    }
183}
184
185/// A wrapper that instruments a [`VortexReadAt`] with metrics.
186#[derive(Clone)]
187pub struct InstrumentedReadAt<T: VortexReadAt + Clone> {
188    read: T,
189    // We use `Arc` to take care of all the complexity that's potentially associated with reference counting
190    // and dropping
191    metrics: Arc<InnerMetrics>,
192}
193
194struct InnerMetrics {
195    sizes: Histogram,
196    total_size: Counter,
197    durations: Timer,
198}
199
200impl<T: VortexReadAt + Clone> InstrumentedReadAt<T> {
201    pub fn new(read: T, metrics_registry: &dyn MetricsRegistry) -> Self {
202        Self::new_with_labels(read, metrics_registry, Vec::<Label>::default())
203    }
204
205    pub fn new_with_labels<I, L>(read: T, metrics_registry: &dyn MetricsRegistry, labels: I) -> Self
206    where
207        I: IntoIterator<Item = L>,
208        L: Into<Label>,
209    {
210        let labels = labels.into_iter().map(|l| l.into()).collect::<Vec<Label>>();
211        let sizes = MetricBuilder::new(metrics_registry)
212            .add_labels(labels.clone())
213            .histogram("vortex.io.read.size");
214        let total_size = MetricBuilder::new(metrics_registry)
215            .add_labels(labels.clone())
216            .counter("vortex.io.read.total_size");
217        let durations = MetricBuilder::new(metrics_registry)
218            .add_labels(labels)
219            .timer("vortex.io.read.duration");
220
221        Self {
222            read,
223            metrics: Arc::new(InnerMetrics {
224                sizes,
225                total_size,
226                durations,
227            }),
228        }
229    }
230}
231
232impl InnerMetrics {
233    fn log_sizes(&self) {
234        tracing::debug!("Reads: {}", self.sizes.count());
235        if !self.sizes.is_empty() {
236            tracing::debug!(
237                "Read size: p50={} p95={} p99={} p999={}",
238                self.sizes.quantile(0.5).vortex_expect("must not be empty"),
239                self.sizes.quantile(0.95).vortex_expect("must not be empty"),
240                self.sizes.quantile(0.99).vortex_expect("must not be empty"),
241                self.sizes
242                    .quantile(0.999)
243                    .vortex_expect("must not be empty"),
244            );
245        }
246        tracing::debug!("Total read size: {}", self.total_size.value());
247    }
248
249    fn log_durations(&self) {
250        if !self.durations.is_empty() {
251            tracing::debug!(
252                "Read duration: p50={}ms p95={}ms p99={}ms p999={}ms",
253                self.durations
254                    .quantile(0.5)
255                    .vortex_expect("must not be empty")
256                    .as_millis(),
257                self.durations
258                    .quantile(0.95)
259                    .vortex_expect("must not be empty")
260                    .as_millis(),
261                self.durations
262                    .quantile(0.99)
263                    .vortex_expect("must not be empty")
264                    .as_millis(),
265                self.durations
266                    .quantile(0.999)
267                    .vortex_expect("must not be empty")
268                    .as_millis(),
269            );
270        }
271    }
272}
273
274// We implement drop for `InnerMetrics` so this will be logged only when we eventually drop the final instance of `InstrumentedRead`
275impl Drop for InnerMetrics {
276    fn drop(&mut self) {
277        self.log_sizes();
278        self.log_durations();
279    }
280}
281
282impl<T: VortexReadAt + Clone> VortexReadAt for InstrumentedReadAt<T> {
283    fn uri(&self) -> Option<&Arc<str>> {
284        self.read.uri()
285    }
286
287    fn coalesce_config(&self) -> Option<CoalesceConfig> {
288        self.read.coalesce_config()
289    }
290
291    fn concurrency(&self) -> usize {
292        self.read.concurrency()
293    }
294
295    fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
296        self.read.size()
297    }
298
299    fn read_at(
300        &self,
301        offset: u64,
302        length: usize,
303        alignment: Alignment,
304    ) -> BoxFuture<'static, VortexResult<BufferHandle>> {
305        let durations = self.metrics.durations.clone();
306        let sizes = self.metrics.sizes.clone();
307        let total_size = self.metrics.total_size.clone();
308
309        let read_fut = self.read.read_at(offset, length, alignment);
310        async move {
311            let _timer = durations.time();
312            let buf = read_fut.await;
313            sizes.update(length as f64);
314            total_size.add(length as u64);
315            buf
316        }
317        .boxed()
318    }
319}
320
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use vortex_buffer::Alignment;
    use vortex_buffer::ByteBuffer;

    use super::*;

    /// The in-memory preset uses 8KB for both the distance and the maximum size.
    #[test]
    fn test_coalesce_config_in_memory() {
        let CoalesceConfig { distance, max_size } = CoalesceConfig::in_memory();
        assert_eq!(distance, 8 * 1024);
        assert_eq!(max_size, 8 * 1024);
    }

    /// The local-file preset uses a 1MB distance and a 4MB maximum size.
    #[test]
    fn test_coalesce_config_file() {
        let CoalesceConfig { distance, max_size } = CoalesceConfig::file();
        assert_eq!(distance, 1 << 20); // 1MB
        assert_eq!(max_size, 4 << 20); // 4MB
    }

    /// The object-storage preset uses a 1MB distance and a 16MB maximum size.
    #[test]
    fn test_coalesce_config_object_storage() {
        let CoalesceConfig { distance, max_size } = CoalesceConfig::object_storage();
        assert_eq!(distance, 1 << 20); // 1MB
        assert_eq!(max_size, 16 << 20); // 16MB
    }

    /// An in-bounds read returns exactly the requested bytes.
    #[tokio::test]
    async fn test_byte_buffer_read_at() {
        let source = ByteBuffer::from(vec![1, 2, 3, 4, 5]);

        let handle = source.read_at(1, 3, Alignment::none()).await.unwrap();
        assert_eq!(handle.to_host().await.as_ref(), &[2, 3, 4]);
    }

    /// A read running past the end of the buffer yields an error.
    #[tokio::test]
    async fn test_byte_buffer_read_out_of_bounds() {
        let source = ByteBuffer::from(vec![1, 2, 3]);

        let outcome = source.read_at(1, 9, Alignment::none()).await;
        assert!(outcome.is_err());
    }

    /// Reads and size queries work through an `Arc`-wrapped source.
    #[tokio::test]
    async fn test_arc_read_at() {
        let shared = Arc::new(ByteBuffer::from(vec![1, 2, 3, 4, 5]));

        let handle = shared.read_at(2, 3, Alignment::none()).await.unwrap();
        assert_eq!(handle.to_host().await.as_ref(), &[3, 4, 5]);

        let total = shared.size().await.unwrap();
        assert_eq!(total, 5);
    }
}