Skip to main content

vortex_io/
read_at.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::sync::Arc;
5
6use futures::FutureExt;
7use futures::future::BoxFuture;
8use vortex_array::buffer::BufferHandle;
9use vortex_buffer::Alignment;
10use vortex_buffer::ByteBuffer;
11use vortex_error::VortexExpect;
12use vortex_error::VortexResult;
13use vortex_error::vortex_bail;
14use vortex_metrics::Counter;
15use vortex_metrics::Histogram;
16use vortex_metrics::Label;
17use vortex_metrics::MetricBuilder;
18use vortex_metrics::MetricsRegistry;
19use vortex_metrics::Timer;
20
/// Tuning knobs that decide when neighbouring I/O requests are merged into one read.
#[derive(Clone, Copy, Debug)]
pub struct CoalesceConfig {
    /// Largest gap of unrequested ("empty") bytes allowed between two requests for them to
    /// still be considered for coalescing.
    pub distance: u64,
    /// Upper bound on the total number of bytes a single coalesced request may span.
    pub max_size: u64,
}

impl CoalesceConfig {
    /// Builds a configuration from an explicit gap `distance` and `max_size`.
    pub const fn new(distance: u64, max_size: u64) -> Self {
        Self { distance, max_size }
    }

    /// Preset intended for fast local storage (memory, NVMe): 8 KiB gap, 8 KiB maximum span.
    pub const fn local() -> Self {
        const KIB: u64 = 1024;

        Self {
            distance: 8 * KIB,
            max_size: 8 * KIB,
        }
    }

    /// Preset intended for object storage (S3, GCS, etc.): 1 MiB gap, 16 MiB maximum span.
    pub const fn object_storage() -> Self {
        const MIB: u64 = 1 << 20;

        Self {
            distance: MIB,
            max_size: 16 * MIB,
        }
    }
}
46
47/// The unified read trait for Vortex I/O sources.
48///
49/// This trait provides async positional reads to underlying storage and is used by the vortex-file
50/// crate to read data from files or object stores.
51pub trait VortexReadAt: Send + Sync + 'static {
52    /// URI for debugging/logging. Returns `None` for anonymous sources.
53    fn uri(&self) -> Option<&Arc<str>> {
54        None
55    }
56
57    /// Configuration for merging nearby I/O requests into fewer, larger reads.
58    fn coalesce_config(&self) -> Option<CoalesceConfig> {
59        None
60    }
61
62    /// Maximum number of concurrent I/O requests for that should be pulled from this source.
63    ///
64    /// This value is used to control how many [`VortexReadAt::read_at`] calls can
65    /// be in-flight simultaneously. Higher values allow more parallelism but consume
66    /// more resources (memory, file descriptors, network connections).
67    ///
68    /// Implementations should choose a value appropriate for their underlying storage
69    /// characteristics. Low-latency sources benefit less from high concurrency, while
70    /// high-latency sources (like remote storage) benefit significantly from issuing
71    /// many requests in parallel.
72    fn concurrency(&self) -> usize;
73
74    /// Asynchronously get the number of bytes of the underlying source.
75    fn size(&self) -> BoxFuture<'static, VortexResult<u64>>;
76
77    /// Request an asynchronous positional read. Results will be returned as a [`BufferHandle`].
78    ///
79    /// If the reader does not have the requested number of bytes, the returned Future will complete
80    /// with an [`UnexpectedEof`][std::io::ErrorKind::UnexpectedEof] error.
81    fn read_at(
82        &self,
83        offset: u64,
84        length: usize,
85        alignment: Alignment,
86    ) -> BoxFuture<'static, VortexResult<BufferHandle>>;
87}
88
89impl VortexReadAt for Arc<dyn VortexReadAt> {
90    fn uri(&self) -> Option<&Arc<str>> {
91        self.as_ref().uri()
92    }
93
94    fn coalesce_config(&self) -> Option<CoalesceConfig> {
95        self.as_ref().coalesce_config()
96    }
97
98    fn concurrency(&self) -> usize {
99        self.as_ref().concurrency()
100    }
101
102    fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
103        self.as_ref().size()
104    }
105
106    fn read_at(
107        &self,
108        offset: u64,
109        length: usize,
110        alignment: Alignment,
111    ) -> BoxFuture<'static, VortexResult<BufferHandle>> {
112        self.as_ref().read_at(offset, length, alignment)
113    }
114}
115
116impl<R: VortexReadAt> VortexReadAt for Arc<R> {
117    fn uri(&self) -> Option<&Arc<str>> {
118        self.as_ref().uri()
119    }
120
121    fn coalesce_config(&self) -> Option<CoalesceConfig> {
122        self.as_ref().coalesce_config()
123    }
124
125    fn concurrency(&self) -> usize {
126        self.as_ref().concurrency()
127    }
128
129    fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
130        self.as_ref().size()
131    }
132
133    fn read_at(
134        &self,
135        offset: u64,
136        length: usize,
137        alignment: Alignment,
138    ) -> BoxFuture<'static, VortexResult<BufferHandle>> {
139        self.as_ref().read_at(offset, length, alignment)
140    }
141}
142
143impl VortexReadAt for ByteBuffer {
144    fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
145        let length = self.len() as u64;
146        async move { Ok(length) }.boxed()
147    }
148
149    fn concurrency(&self) -> usize {
150        16
151    }
152
153    fn read_at(
154        &self,
155        offset: u64,
156        length: usize,
157        alignment: Alignment,
158    ) -> BoxFuture<'static, VortexResult<BufferHandle>> {
159        let buffer = self.clone();
160        async move {
161            let start = usize::try_from(offset).vortex_expect("start too big for usize");
162            let end =
163                usize::try_from(offset + length as u64).vortex_expect("end too big for usize");
164            if end > buffer.len() {
165                vortex_bail!(
166                    "Requested range {}..{} out of bounds for buffer of length {}",
167                    start,
168                    end,
169                    buffer.len()
170                );
171            }
172            Ok(BufferHandle::new_host(
173                buffer.slice_unaligned(start..end).aligned(alignment),
174            ))
175        }
176        .boxed()
177    }
178}
179
180/// A wrapper that instruments a [`VortexReadAt`] with metrics.
181#[derive(Clone)]
182pub struct InstrumentedReadAt<T: VortexReadAt + Clone> {
183    read: T,
184    // We use `Arc` to take care of all the complexity that's potentially associated with reference counting
185    // and dropping
186    metrics: Arc<InnerMetrics>,
187}
188
189struct InnerMetrics {
190    sizes: Histogram,
191    total_size: Counter,
192    durations: Timer,
193}
194
195impl<T: VortexReadAt + Clone> InstrumentedReadAt<T> {
196    pub fn new(read: T, metrics_registry: &dyn MetricsRegistry) -> Self {
197        Self::new_with_labels(read, metrics_registry, Vec::<Label>::default())
198    }
199
200    pub fn new_with_labels<I, L>(read: T, metrics_registry: &dyn MetricsRegistry, labels: I) -> Self
201    where
202        I: IntoIterator<Item = L>,
203        L: Into<Label>,
204    {
205        let labels = labels.into_iter().map(|l| l.into()).collect::<Vec<Label>>();
206        let sizes = MetricBuilder::new(metrics_registry)
207            .add_labels(labels.clone())
208            .histogram("vortex.io.read.size");
209        let total_size = MetricBuilder::new(metrics_registry)
210            .add_labels(labels.clone())
211            .counter("vortex.io.read.total_size");
212        let durations = MetricBuilder::new(metrics_registry)
213            .add_labels(labels)
214            .timer("vortex.io.read.duration");
215
216        Self {
217            read,
218            metrics: Arc::new(InnerMetrics {
219                sizes,
220                total_size,
221                durations,
222            }),
223        }
224    }
225}
226
227// We implement drop for `InnerMetrics` so this will be logged only when we eventually drop the final instance of `InstrumentedRead`
228impl Drop for InnerMetrics {
229    #[allow(clippy::cognitive_complexity)]
230    fn drop(&mut self) {
231        tracing::debug!("Reads: {}", self.sizes.count());
232        if !self.sizes.is_empty() {
233            tracing::debug!(
234                "Read size: p50={} p95={} p99={} p999={}",
235                self.sizes.quantile(0.5).vortex_expect("must not be empty"),
236                self.sizes.quantile(0.95).vortex_expect("must not be empty"),
237                self.sizes.quantile(0.99).vortex_expect("must not be empty"),
238                self.sizes
239                    .quantile(0.999)
240                    .vortex_expect("must not be empty"),
241            );
242        }
243
244        let total_size = self.total_size.value();
245        tracing::debug!("Total read size: {total_size}");
246
247        if !self.durations.is_empty() {
248            tracing::debug!(
249                "Read duration: p50={}ms p95={}ms p99={}ms p999={}ms",
250                self.durations
251                    .quantile(0.5)
252                    .vortex_expect("must not be empty")
253                    .as_millis(),
254                self.durations
255                    .quantile(0.95)
256                    .vortex_expect("must not be empty")
257                    .as_millis(),
258                self.durations
259                    .quantile(0.99)
260                    .vortex_expect("must not be empty")
261                    .as_millis(),
262                self.durations
263                    .quantile(0.999)
264                    .vortex_expect("must not be empty")
265                    .as_millis(),
266            );
267        }
268    }
269}
270
271impl<T: VortexReadAt + Clone> VortexReadAt for InstrumentedReadAt<T> {
272    fn uri(&self) -> Option<&Arc<str>> {
273        self.read.uri()
274    }
275
276    fn coalesce_config(&self) -> Option<CoalesceConfig> {
277        self.read.coalesce_config()
278    }
279
280    fn concurrency(&self) -> usize {
281        self.read.concurrency()
282    }
283
284    fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
285        self.read.size()
286    }
287
288    fn read_at(
289        &self,
290        offset: u64,
291        length: usize,
292        alignment: Alignment,
293    ) -> BoxFuture<'static, VortexResult<BufferHandle>> {
294        let durations = self.metrics.durations.clone();
295        let sizes = self.metrics.sizes.clone();
296        let total_size = self.metrics.total_size.clone();
297
298        let read_fut = self.read.read_at(offset, length, alignment);
299        async move {
300            let _timer = durations.time();
301            let buf = read_fut.await;
302            sizes.update(length as f64);
303            total_size.add(length as u64);
304            buf
305        }
306        .boxed()
307    }
308}
309
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use vortex_buffer::Alignment;
    use vortex_buffer::ByteBuffer;

    use super::*;

    /// The local preset uses 8 KiB for both the gap distance and the maximum span.
    #[test]
    fn test_coalesce_config_local() {
        let CoalesceConfig { distance, max_size } = CoalesceConfig::local();
        assert_eq!(distance, 8 * 1024);
        assert_eq!(max_size, 8 * 1024);
    }

    /// The object-storage preset uses a 1 MiB gap distance and a 16 MiB maximum span.
    #[test]
    fn test_coalesce_config_object_storage() {
        let CoalesceConfig { distance, max_size } = CoalesceConfig::object_storage();
        assert_eq!(distance, 1 << 20); // 1MB
        assert_eq!(max_size, 16 << 20); // 16MB
    }

    /// An in-bounds read on a `ByteBuffer` yields exactly the requested bytes.
    #[tokio::test]
    async fn test_byte_buffer_read_at() {
        let source = ByteBuffer::from(vec![1, 2, 3, 4, 5]);

        let handle = source.read_at(1, 3, Alignment::none()).await.unwrap();
        let host = handle.to_host().await;
        assert_eq!(host.as_ref(), &[2, 3, 4]);
    }

    /// A read extending past the end of the buffer resolves to an error.
    #[tokio::test]
    async fn test_byte_buffer_read_out_of_bounds() {
        let source = ByteBuffer::from(vec![1, 2, 3]);

        let outcome = source.read_at(1, 9, Alignment::none()).await;
        assert!(outcome.is_err());
    }

    /// Reads and size queries work through an `Arc`-wrapped reader.
    #[tokio::test]
    async fn test_arc_read_at() {
        let shared = Arc::new(ByteBuffer::from(vec![1, 2, 3, 4, 5]));

        let handle = shared.read_at(2, 3, Alignment::none()).await.unwrap();
        let host = handle.to_host().await;
        assert_eq!(host.as_ref(), &[3, 4, 5]);

        let total = shared.size().await.unwrap();
        assert_eq!(total, 5);
    }
}