vortex_io/
read.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::sync::Arc;
5
6use async_trait::async_trait;
7use futures::FutureExt;
8use futures::future::BoxFuture;
9use vortex_buffer::{Alignment, ByteBuffer};
10use vortex_error::{VortexExpect, VortexResult, vortex_bail};
11use vortex_metrics::{Histogram, Timer, VortexMetrics};
12
13/// The read trait used within Vortex.
14///
15/// This trait provides async positional reads to underlying storage and is used by the vortex-file
16/// crate to read data from files or object stores.
17///
18/// It behaves a little differently from a typical async read trait in order to provide us with
19/// some nice additional semantics for use within Vortex. See the [`VortexReadAt::read_at`] method
20/// for details.
21pub trait VortexReadAt: Send + Sync + 'static {
22    /// Request an asynchronous positional read. Results will be returned as a [`ByteBuffer`].
23    ///
24    /// If the reader does not have the requested number of bytes, the returned Future will complete
25    /// with an [`UnexpectedEof`][std::io::ErrorKind::UnexpectedEof].
26    ///
27    /// This function returns a future with a `'static` lifetime. This allows us to define the
28    /// following semantics:
29    ///
30    /// This function returns a future with a `'static` lifetime, allowing us to define the
31    /// following semantics:
32    ///
33    /// * Creation of the future hints to the implementation that a read _may_ be required.
34    /// * Polling of the future indicates that the read _is now_ required.
35    /// * Dropping of the future indicates that the read is not required, and may be cancelled.
36    ///
37    /// Implementations may choose to ignore these semantics, but they allow optimizations such as
38    /// coalescing and cancellation. See [`crate::file::FileRead`] for an example of such an
39    /// implementation.
40    ///
41    /// ## For Developers
42    ///
43    /// This trait is left unsealed to provide maximum flexibility for users of the Vortex, however
44    /// we strongly recommend using the [`crate::file::FileRead`] abstraction where possible as we
45    /// will continue to evolve and optimize its implementation for the best performance across
46    /// as many filesystems and platforms as possible.
47    fn read_at(
48        &self,
49        offset: u64,
50        length: usize,
51        alignment: Alignment,
52    ) -> BoxFuture<'static, VortexResult<ByteBuffer>>;
53
54    /// Asynchronously get the number of bytes of the underlying file.
55    fn size(&self) -> BoxFuture<'static, VortexResult<u64>>;
56
57    // TODO(ngates): this is deprecated, but cannot yet be removed.
58    fn performance_hint(&self) -> PerformanceHint {
59        PerformanceHint::local()
60    }
61}
62
/// Hints describing how reads against a [`VortexReadAt`] may be coalesced.
#[derive(Debug, Clone)]
pub struct PerformanceHint {
    /// Maximum distance between two reads that should be merged into one operation.
    coalescing_window: u64,
    /// Upper bound on the size of a coalesced read, or `None` for no limit.
    max_read: Option<u64>,
}

impl PerformanceHint {
    /// Creates a hint from an explicit coalescing window and optional maximum read size.
    pub fn new(coalescing_window: u64, max_read: Option<u64>) -> Self {
        PerformanceHint {
            coalescing_window,
            max_read,
        }
    }

    /// Creates a new instance with a profile appropriate for fast local storage, like memory or
    /// files on NVMe devices.
    pub fn local() -> Self {
        // Coalesce ~8K page size, also ensures we span padding for adjacent segments.
        const PAGE: u64 = 8 * 1024;
        PerformanceHint {
            coalescing_window: PAGE,
            max_read: Some(PAGE),
        }
    }

    /// Creates a new instance with a profile for object storage: a 1MB coalescing window and
    /// coalesced reads capped at 8MB.
    pub fn object_storage() -> Self {
        const MB: u64 = 1024 * 1024;
        PerformanceHint {
            coalescing_window: MB,
            max_read: Some(8 * MB),
        }
    }

    /// The maximum distance between two reads that should be coalesced into a single operation.
    pub fn coalescing_window(&self) -> u64 {
        self.coalescing_window
    }

    /// Maximum number of bytes in a coalesced read.
    pub fn max_read(&self) -> Option<u64> {
        self.max_read
    }
}
100
101impl<R: VortexReadAt> VortexReadAt for Arc<R> {
102    fn read_at(
103        &self,
104        offset: u64,
105        length: usize,
106        alignment: Alignment,
107    ) -> BoxFuture<'static, VortexResult<ByteBuffer>> {
108        self.as_ref().read_at(offset, length, alignment)
109    }
110
111    fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
112        self.as_ref().size()
113    }
114
115    fn performance_hint(&self) -> PerformanceHint {
116        self.as_ref().performance_hint()
117    }
118}
119
120impl VortexReadAt for ByteBuffer {
121    fn read_at(
122        &self,
123        offset: u64,
124        length: usize,
125        alignment: Alignment,
126    ) -> BoxFuture<'static, VortexResult<ByteBuffer>> {
127        let buffer = self.clone();
128        async move {
129            let start = usize::try_from(offset).vortex_expect("start too big for usize");
130            let end =
131                usize::try_from(offset + length as u64).vortex_expect("end too big for usize");
132            if end > buffer.len() {
133                vortex_bail!(
134                    "Requested range {}..{} out of bounds for buffer of length {}",
135                    start,
136                    end,
137                    buffer.len()
138                );
139            }
140            Ok(buffer.slice_unaligned(start..end).aligned(alignment))
141        }
142        .boxed()
143    }
144
145    fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
146        let length = self.len() as u64;
147        async move { Ok(length) }.boxed()
148    }
149
150    fn performance_hint(&self) -> PerformanceHint {
151        PerformanceHint::local()
152    }
153}
154
155#[derive(Clone)]
156pub struct InstrumentedReadAt<T: VortexReadAt> {
157    read: Arc<T>,
158    sizes: Arc<Histogram>,
159    durations: Arc<Timer>,
160}
161
162impl<T: VortexReadAt> InstrumentedReadAt<T> {
163    pub fn new(read: Arc<T>, metrics: &VortexMetrics) -> Self {
164        Self {
165            read,
166            sizes: metrics.histogram("vortex.io.read.size"),
167            durations: metrics.timer("vortex.io.read.duration"),
168        }
169    }
170}
171
172impl<T> Drop for InstrumentedReadAt<T>
173where
174    T: VortexReadAt,
175{
176    fn drop(&mut self) {
177        let sizes = self.sizes.snapshot();
178        log::debug!("Reads: {}", self.sizes.count());
179        log::debug!(
180            "Read size: p50={} p95={} p99={} p999={}",
181            sizes.value(0.5),
182            sizes.value(0.95),
183            sizes.value(0.99),
184            sizes.value(0.999),
185        );
186        let durations = self.durations.snapshot();
187        log::debug!(
188            "Read duration: p50={}ms p95={}ms p99={}ms p999={}ms",
189            durations.value(0.5) / 1_000_000.0,
190            durations.value(0.95) / 1_000_000.0,
191            durations.value(0.99) / 1_000_000.0,
192            durations.value(0.999) / 1_000_000.0
193        );
194    }
195}
196
197#[async_trait]
198impl<T: VortexReadAt> VortexReadAt for InstrumentedReadAt<T> {
199    fn read_at(
200        &self,
201        offset: u64,
202        length: usize,
203        alignment: Alignment,
204    ) -> BoxFuture<'static, VortexResult<ByteBuffer>> {
205        let durations = self.durations.clone();
206        let sizes = self.sizes.clone();
207        let read_fut = self.read.read_at(offset, length, alignment);
208        async move {
209            let _timer = durations.time();
210            let buf = read_fut.await;
211            sizes.update(length as i64);
212            buf
213        }
214        .boxed()
215    }
216
217    #[inline]
218    fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
219        self.read.size()
220    }
221
222    fn performance_hint(&self) -> PerformanceHint {
223        self.read.performance_hint()
224    }
225}
226
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use vortex_buffer::{Alignment, ByteBuffer};

    use super::*;

    /// The local profile uses 8 KiB for both the window and the read cap.
    #[test]
    fn test_performance_hint_local() {
        let local = PerformanceHint::local();
        assert_eq!(local.coalescing_window(), 8 * 1024);
        assert_eq!(local.max_read(), Some(8 * 1024));
    }

    /// The object-storage profile uses a 1MB window and an 8MB read cap.
    #[test]
    fn test_performance_hint_object_storage() {
        let remote = PerformanceHint::object_storage();
        assert_eq!(remote.coalescing_window(), 1024 * 1024);
        assert_eq!(remote.max_read(), Some(8 * 1024 * 1024));
    }

    /// Explicit values passed to `new` are returned unchanged by the accessors.
    #[test]
    fn test_performance_hint_custom() {
        let custom = PerformanceHint::new(4096, Some(16384));
        assert_eq!(custom.coalescing_window(), 4096);
        assert_eq!(custom.max_read(), Some(16384));
    }

    /// A hint may be created without a maximum read size.
    #[test]
    fn test_performance_hint_no_max() {
        let unbounded = PerformanceHint::new(2048, None);
        assert_eq!(unbounded.coalescing_window(), 2048);
        assert_eq!(unbounded.max_read(), None);
    }

    /// Reading a range inside the buffer returns exactly those bytes.
    #[tokio::test]
    async fn test_byte_buffer_read_at() {
        let buffer = ByteBuffer::from(vec![1, 2, 3, 4, 5]);

        let bytes = buffer.read_at(1, 3, Alignment::none()).await.unwrap();
        assert_eq!(bytes.as_ref(), &[2, 3, 4]);
    }

    /// Reading past the end of the buffer yields an error.
    #[tokio::test]
    async fn test_byte_buffer_read_out_of_bounds() {
        let buffer = ByteBuffer::from(vec![1, 2, 3]);

        let outcome = buffer.read_at(1, 9, Alignment::none()).await;
        assert!(outcome.is_err());
    }

    /// An `Arc`-wrapped reader forwards both reads and size queries.
    #[tokio::test]
    async fn test_arc_read_at() {
        let shared = Arc::new(ByteBuffer::from(vec![1, 2, 3, 4, 5]));

        let bytes = shared.read_at(2, 3, Alignment::none()).await.unwrap();
        assert_eq!(bytes.as_ref(), &[3, 4, 5]);

        let total = shared.size().await.unwrap();
        assert_eq!(total, 5);
    }
}