vortex_io/
read.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::sync::Arc;
5
6use async_trait::async_trait;
7use futures::FutureExt;
8use futures::future::BoxFuture;
9use vortex_buffer::Alignment;
10use vortex_buffer::ByteBuffer;
11use vortex_error::VortexExpect;
12use vortex_error::VortexResult;
13use vortex_error::vortex_bail;
14use vortex_metrics::Histogram;
15use vortex_metrics::Timer;
16use vortex_metrics::VortexMetrics;
17
18/// The read trait used within Vortex.
19///
20/// This trait provides async positional reads to underlying storage and is used by the vortex-file
21/// crate to read data from files or object stores.
22///
23/// It behaves a little differently from a typical async read trait in order to provide us with
24/// some nice additional semantics for use within Vortex. See the [`VortexReadAt::read_at`] method
25/// for details.
26pub trait VortexReadAt: Send + Sync + 'static {
27    /// Request an asynchronous positional read. Results will be returned as a [`ByteBuffer`].
28    ///
29    /// If the reader does not have the requested number of bytes, the returned Future will complete
30    /// with an [`UnexpectedEof`][std::io::ErrorKind::UnexpectedEof].
31    ///
32    /// This function returns a future with a `'static` lifetime. This allows us to define the
33    /// following semantics:
34    ///
35    /// This function returns a future with a `'static` lifetime, allowing us to define the
36    /// following semantics:
37    ///
38    /// * Creation of the future hints to the implementation that a read _may_ be required.
39    /// * Polling of the future indicates that the read _is now_ required.
40    /// * Dropping of the future indicates that the read is not required, and may be cancelled.
41    ///
42    /// Implementations may choose to ignore these semantics, but they allow optimizations such as
43    /// coalescing and cancellation. See [`crate::file::FileRead`] for an example of such an
44    /// implementation.
45    ///
46    /// ## For Developers
47    ///
48    /// This trait is left unsealed to provide maximum flexibility for users of the Vortex, however
49    /// we strongly recommend using the [`crate::file::FileRead`] abstraction where possible as we
50    /// will continue to evolve and optimize its implementation for the best performance across
51    /// as many filesystems and platforms as possible.
52    fn read_at(
53        &self,
54        offset: u64,
55        length: usize,
56        alignment: Alignment,
57    ) -> BoxFuture<'static, VortexResult<ByteBuffer>>;
58
59    /// Asynchronously get the number of bytes of the underlying file.
60    fn size(&self) -> BoxFuture<'static, VortexResult<u64>>;
61
62    // TODO(ngates): this is deprecated, but cannot yet be removed.
63    fn performance_hint(&self) -> PerformanceHint {
64        PerformanceHint::local()
65    }
66}
67
/// Tuning parameters describing how reads against a storage backend should be coalesced.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PerformanceHint {
    // Maximum distance between two reads that should still be merged into one operation.
    coalescing_window: u64,
    // Upper bound on the size of a coalesced read in bytes; `None` means no bound is set.
    max_read: Option<u64>,
}

impl PerformanceHint {
    /// Creates a hint from an explicit coalescing window and an optional maximum read size.
    pub fn new(coalescing_window: u64, max_read: Option<u64>) -> Self {
        Self {
            coalescing_window,
            max_read,
        }
    }

    /// Creates a new instance with a profile appropriate for fast local storage, like memory or
    /// files on NVMe devices.
    pub fn local() -> Self {
        // Coalesce ~8K page size, also ensures we span padding for adjacent segments.
        Self::new(8192, Some(8192))
    }

    /// Creates a new instance with a profile for object storage: a 1MB coalescing window and
    /// coalesced reads capped at 8MB.
    pub fn object_storage() -> Self {
        Self::new(
            1 << 20,       // 1MB,
            Some(8 << 20), // 8MB,
        )
    }

    /// The maximum distance between two reads that should be coalesced into a single operation.
    pub fn coalescing_window(&self) -> u64 {
        self.coalescing_window
    }

    /// Maximum number of bytes in a coalesced read.
    pub fn max_read(&self) -> Option<u64> {
        self.max_read
    }
}
105
106impl<R: VortexReadAt> VortexReadAt for Arc<R> {
107    fn read_at(
108        &self,
109        offset: u64,
110        length: usize,
111        alignment: Alignment,
112    ) -> BoxFuture<'static, VortexResult<ByteBuffer>> {
113        self.as_ref().read_at(offset, length, alignment)
114    }
115
116    fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
117        self.as_ref().size()
118    }
119
120    fn performance_hint(&self) -> PerformanceHint {
121        self.as_ref().performance_hint()
122    }
123}
124
125impl VortexReadAt for ByteBuffer {
126    fn read_at(
127        &self,
128        offset: u64,
129        length: usize,
130        alignment: Alignment,
131    ) -> BoxFuture<'static, VortexResult<ByteBuffer>> {
132        let buffer = self.clone();
133        async move {
134            let start = usize::try_from(offset).vortex_expect("start too big for usize");
135            let end =
136                usize::try_from(offset + length as u64).vortex_expect("end too big for usize");
137            if end > buffer.len() {
138                vortex_bail!(
139                    "Requested range {}..{} out of bounds for buffer of length {}",
140                    start,
141                    end,
142                    buffer.len()
143                );
144            }
145            Ok(buffer.slice_unaligned(start..end).aligned(alignment))
146        }
147        .boxed()
148    }
149
150    fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
151        let length = self.len() as u64;
152        async move { Ok(length) }.boxed()
153    }
154
155    fn performance_hint(&self) -> PerformanceHint {
156        PerformanceHint::local()
157    }
158}
159
160#[derive(Clone)]
161pub struct InstrumentedReadAt<T: VortexReadAt> {
162    read: Arc<T>,
163    sizes: Arc<Histogram>,
164    durations: Arc<Timer>,
165}
166
167impl<T: VortexReadAt> InstrumentedReadAt<T> {
168    pub fn new(read: Arc<T>, metrics: &VortexMetrics) -> Self {
169        Self {
170            read,
171            sizes: metrics.histogram("vortex.io.read.size"),
172            durations: metrics.timer("vortex.io.read.duration"),
173        }
174    }
175}
176
177impl<T> Drop for InstrumentedReadAt<T>
178where
179    T: VortexReadAt,
180{
181    fn drop(&mut self) {
182        let sizes = self.sizes.snapshot();
183        log::debug!("Reads: {}", self.sizes.count());
184        log::debug!(
185            "Read size: p50={} p95={} p99={} p999={}",
186            sizes.value(0.5),
187            sizes.value(0.95),
188            sizes.value(0.99),
189            sizes.value(0.999),
190        );
191        let durations = self.durations.snapshot();
192        log::debug!(
193            "Read duration: p50={}ms p95={}ms p99={}ms p999={}ms",
194            durations.value(0.5) / 1_000_000.0,
195            durations.value(0.95) / 1_000_000.0,
196            durations.value(0.99) / 1_000_000.0,
197            durations.value(0.999) / 1_000_000.0
198        );
199    }
200}
201
202#[async_trait]
203impl<T: VortexReadAt> VortexReadAt for InstrumentedReadAt<T> {
204    fn read_at(
205        &self,
206        offset: u64,
207        length: usize,
208        alignment: Alignment,
209    ) -> BoxFuture<'static, VortexResult<ByteBuffer>> {
210        let durations = self.durations.clone();
211        let sizes = self.sizes.clone();
212        let read_fut = self.read.read_at(offset, length, alignment);
213        async move {
214            let _timer = durations.time();
215            let buf = read_fut.await;
216            sizes.update(length as i64);
217            buf
218        }
219        .boxed()
220    }
221
222    #[inline]
223    fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
224        self.read.size()
225    }
226
227    fn performance_hint(&self) -> PerformanceHint {
228        self.read.performance_hint()
229    }
230}
231
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use vortex_buffer::Alignment;
    use vortex_buffer::ByteBuffer;

    use super::*;

    /// The local profile uses 8192 for both the window and the read cap.
    #[test]
    fn test_performance_hint_local() {
        let local = PerformanceHint::local();
        assert_eq!(local.coalescing_window(), 8192);
        assert_eq!(local.max_read(), Some(8192));
    }

    /// The object-storage profile uses a 1MB window and an 8MB read cap.
    #[test]
    fn test_performance_hint_object_storage() {
        let remote = PerformanceHint::object_storage();
        assert_eq!(remote.coalescing_window(), 1024 * 1024);
        assert_eq!(remote.max_read(), Some(8 * 1024 * 1024));
    }

    /// Values passed to `new` come back unchanged from the accessors.
    #[test]
    fn test_performance_hint_custom() {
        let custom = PerformanceHint::new(4096, Some(16384));
        assert_eq!(custom.coalescing_window(), 4096);
        assert_eq!(custom.max_read(), Some(16384));
    }

    /// A hint may be created without a maximum read size.
    #[test]
    fn test_performance_hint_no_max() {
        let unbounded = PerformanceHint::new(2048, None);
        assert_eq!(unbounded.coalescing_window(), 2048);
        assert_eq!(unbounded.max_read(), None);
    }

    /// An in-bounds read returns exactly the requested bytes.
    #[tokio::test]
    async fn test_byte_buffer_read_at() {
        let buffer = ByteBuffer::from(vec![1, 2, 3, 4, 5]);

        let bytes = buffer.read_at(1, 3, Alignment::none()).await.unwrap();
        assert_eq!(bytes.as_ref(), &[2, 3, 4]);
    }

    /// A read running past the end of the buffer yields an error.
    #[tokio::test]
    async fn test_byte_buffer_read_out_of_bounds() {
        let buffer = ByteBuffer::from(vec![1, 2, 3]);

        let outcome = buffer.read_at(1, 9, Alignment::none()).await;
        assert!(outcome.is_err());
    }

    /// Reads and size queries made through an `Arc` reach the wrapped buffer.
    #[tokio::test]
    async fn test_arc_read_at() {
        let shared = Arc::new(ByteBuffer::from(vec![1, 2, 3, 4, 5]));

        let bytes = shared.read_at(2, 3, Alignment::none()).await.unwrap();
        assert_eq!(bytes.as_ref(), &[3, 4, 5]);

        let total = shared.size().await.unwrap();
        assert_eq!(total, 5);
    }
}
295}