vortex_io/
read.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::sync::Arc;
5
6use async_trait::async_trait;
7use futures::FutureExt;
8use futures::future::BoxFuture;
9use vortex_buffer::Alignment;
10use vortex_buffer::ByteBuffer;
11use vortex_error::VortexExpect;
12use vortex_error::VortexResult;
13use vortex_error::vortex_bail;
14use vortex_metrics::Counter;
15use vortex_metrics::Histogram;
16use vortex_metrics::Timer;
17use vortex_metrics::VortexMetrics;
18
/// The read trait used within Vortex.
///
/// This trait provides async positional reads to underlying storage and is used by the vortex-file
/// crate to read data from files or object stores.
///
/// It behaves a little differently from a typical async read trait in order to provide us with
/// some nice additional semantics for use within Vortex. See the [`VortexReadAt::read_at`] method
/// for details.
pub trait VortexReadAt: Send + Sync + 'static {
    /// Request an asynchronous positional read. Results will be returned as a [`ByteBuffer`].
    ///
    /// `offset` is the position of the first byte to read, `length` is the number of bytes
    /// requested, and `alignment` is the alignment requested for the returned buffer.
    ///
    /// If the reader does not have the requested number of bytes, the returned Future will complete
    /// with an [`UnexpectedEof`][std::io::ErrorKind::UnexpectedEof].
    ///
    /// This function returns a future with a `'static` lifetime, allowing us to define the
    /// following semantics:
    ///
    /// * Creation of the future hints to the implementation that a read _may_ be required.
    /// * Polling of the future indicates that the read _is now_ required.
    /// * Dropping of the future indicates that the read is not required, and may be cancelled.
    ///
    /// Implementations may choose to ignore these semantics, but they allow optimizations such as
    /// coalescing and cancellation. See [`crate::file::FileRead`] for an example of such an
    /// implementation.
    ///
    /// ## For Developers
    ///
    /// This trait is left unsealed to provide maximum flexibility for users of Vortex, however
    /// we strongly recommend using the [`crate::file::FileRead`] abstraction where possible as we
    /// will continue to evolve and optimize its implementation for the best performance across
    /// as many filesystems and platforms as possible.
    fn read_at(
        &self,
        offset: u64,
        length: usize,
        alignment: Alignment,
    ) -> BoxFuture<'static, VortexResult<ByteBuffer>>;

    /// Asynchronously get the number of bytes of the underlying file.
    fn size(&self) -> BoxFuture<'static, VortexResult<u64>>;

    /// Returns the [`PerformanceHint`] describing how reads against this source should be
    /// coalesced.
    ///
    /// The default implementation returns [`PerformanceHint::local`].
    // TODO(ngates): this is deprecated, but cannot yet be removed.
    fn performance_hint(&self) -> PerformanceHint {
        PerformanceHint::local()
    }
}
68
/// Parameters describing how reads against a storage backend should be coalesced.
#[derive(Debug, Clone)]
pub struct PerformanceHint {
    /// Largest gap, in bytes, between two reads that may still be merged into one operation.
    coalescing_window: u64,
    /// Cap, in bytes, on the size of a merged read; `None` when no cap is configured.
    max_read: Option<u64>,
}

impl PerformanceHint {
    /// Builds a hint from an explicit coalescing window and an optional maximum read size.
    pub fn new(coalescing_window: u64, max_read: Option<u64>) -> Self {
        PerformanceHint {
            coalescing_window,
            max_read,
        }
    }

    /// Profile intended for fast local storage, such as memory or files on NVMe devices.
    pub fn local() -> Self {
        // Roughly one 8K page; this also ensures we span padding between adjacent segments.
        const PAGE: u64 = 8 * 1024;
        PerformanceHint {
            coalescing_window: PAGE,
            max_read: Some(PAGE),
        }
    }

    /// Profile intended for object storage: a 1 MiB coalescing window and reads capped at 8 MiB.
    pub fn object_storage() -> Self {
        const MIB: u64 = 1 << 20;
        PerformanceHint {
            coalescing_window: MIB,
            max_read: Some(8 * MIB),
        }
    }

    /// The maximum distance between two reads that should be coalesced into a single operation.
    pub fn coalescing_window(&self) -> u64 {
        let Self {
            coalescing_window, ..
        } = self;
        *coalescing_window
    }

    /// The maximum number of bytes in a coalesced read, if a cap is configured.
    pub fn max_read(&self) -> Option<u64> {
        let Self { max_read, .. } = self;
        *max_read
    }
}
106
107impl<R: VortexReadAt> VortexReadAt for Arc<R> {
108    fn read_at(
109        &self,
110        offset: u64,
111        length: usize,
112        alignment: Alignment,
113    ) -> BoxFuture<'static, VortexResult<ByteBuffer>> {
114        self.as_ref().read_at(offset, length, alignment)
115    }
116
117    fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
118        self.as_ref().size()
119    }
120
121    fn performance_hint(&self) -> PerformanceHint {
122        self.as_ref().performance_hint()
123    }
124}
125
126impl VortexReadAt for ByteBuffer {
127    fn read_at(
128        &self,
129        offset: u64,
130        length: usize,
131        alignment: Alignment,
132    ) -> BoxFuture<'static, VortexResult<ByteBuffer>> {
133        let buffer = self.clone();
134        async move {
135            let start = usize::try_from(offset).vortex_expect("start too big for usize");
136            let end =
137                usize::try_from(offset + length as u64).vortex_expect("end too big for usize");
138            if end > buffer.len() {
139                vortex_bail!(
140                    "Requested range {}..{} out of bounds for buffer of length {}",
141                    start,
142                    end,
143                    buffer.len()
144                );
145            }
146            Ok(buffer.slice_unaligned(start..end).aligned(alignment))
147        }
148        .boxed()
149    }
150
151    fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
152        let length = self.len() as u64;
153        async move { Ok(length) }.boxed()
154    }
155
156    fn performance_hint(&self) -> PerformanceHint {
157        PerformanceHint::local()
158    }
159}
160
161#[derive(Clone)]
162pub struct InstrumentedReadAt<T: VortexReadAt> {
163    read: Arc<T>,
164    sizes: Arc<Histogram>,
165    total_size: Arc<Counter>,
166    durations: Arc<Timer>,
167}
168
169impl<T: VortexReadAt> InstrumentedReadAt<T> {
170    pub fn new(read: Arc<T>, metrics: &VortexMetrics) -> Self {
171        Self {
172            read,
173            sizes: metrics.histogram("vortex.io.read.size"),
174            total_size: metrics.counter("vortex.io.read.total_size"),
175            durations: metrics.timer("vortex.io.read.duration"),
176        }
177    }
178}
179
180impl<T> Drop for InstrumentedReadAt<T>
181where
182    T: VortexReadAt,
183{
184    #[allow(clippy::cognitive_complexity)]
185    fn drop(&mut self) {
186        let sizes = self.sizes.snapshot();
187        tracing::debug!("Reads: {}", self.sizes.count());
188        tracing::debug!(
189            "Read size: p50={} p95={} p99={} p999={}",
190            sizes.value(0.5),
191            sizes.value(0.95),
192            sizes.value(0.99),
193            sizes.value(0.999),
194        );
195
196        let total_size = self.total_size.count();
197        tracing::debug!("Total read size: {total_size}");
198
199        let durations = self.durations.snapshot();
200        tracing::debug!(
201            "Read duration: p50={}ms p95={}ms p99={}ms p999={}ms",
202            durations.value(0.5) / 1_000_000.0,
203            durations.value(0.95) / 1_000_000.0,
204            durations.value(0.99) / 1_000_000.0,
205            durations.value(0.999) / 1_000_000.0
206        );
207    }
208}
209
210#[async_trait]
211impl<T: VortexReadAt> VortexReadAt for InstrumentedReadAt<T> {
212    fn read_at(
213        &self,
214        offset: u64,
215        length: usize,
216        alignment: Alignment,
217    ) -> BoxFuture<'static, VortexResult<ByteBuffer>> {
218        let durations = self.durations.clone();
219        let sizes = self.sizes.clone();
220        let total_size = self.total_size.clone();
221        let read_fut = self.read.read_at(offset, length, alignment);
222        async move {
223            let _timer = durations.time();
224            let buf = read_fut.await;
225            sizes.update(length as i64);
226            total_size.add(length as i64);
227            buf
228        }
229        .boxed()
230    }
231
232    #[inline]
233    fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
234        self.read.size()
235    }
236
237    fn performance_hint(&self) -> PerformanceHint {
238        self.read.performance_hint()
239    }
240}
241
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use vortex_buffer::Alignment;
    use vortex_buffer::ByteBuffer;

    use super::*;

    /// The local profile uses 8192 for both the window and the read cap.
    #[test]
    fn test_performance_hint_local() {
        let local = PerformanceHint::local();
        assert_eq!(local.coalescing_window(), 8192);
        assert_eq!(local.max_read(), Some(8192));
    }

    /// The object-storage profile uses a 1MB window and an 8MB read cap.
    #[test]
    fn test_performance_hint_object_storage() {
        let remote = PerformanceHint::object_storage();
        assert_eq!(remote.coalescing_window(), 1 << 20); // 1MB
        assert_eq!(remote.max_read(), Some(8 << 20)); // 8MB
    }

    /// Values passed to `new` are returned unchanged by the getters.
    #[test]
    fn test_performance_hint_custom() {
        let custom = PerformanceHint::new(4096, Some(16384));
        assert_eq!(custom.coalescing_window(), 4096);
        assert_eq!(custom.max_read(), Some(16384));
    }

    /// A hint may be built without a read cap.
    #[test]
    fn test_performance_hint_no_max() {
        let uncapped = PerformanceHint::new(2048, None);
        assert_eq!(uncapped.coalescing_window(), 2048);
        assert_eq!(uncapped.max_read(), None);
    }

    /// An in-bounds read returns exactly the requested sub-range.
    #[tokio::test]
    async fn test_byte_buffer_read_at() {
        let source = ByteBuffer::from(vec![1, 2, 3, 4, 5]);

        let result = source.read_at(1, 3, Alignment::none()).await.unwrap();
        assert_eq!(result.as_ref(), &[2, 3, 4]);
    }

    /// A read that runs past the end of the buffer resolves to an error.
    #[tokio::test]
    async fn test_byte_buffer_read_out_of_bounds() {
        let source = ByteBuffer::from(vec![1, 2, 3]);

        let outcome = source.read_at(1, 9, Alignment::none()).await;
        assert!(outcome.is_err());
    }

    /// Reads and size queries work through an `Arc`-wrapped reader.
    #[tokio::test]
    async fn test_arc_read_at() {
        let shared = Arc::new(ByteBuffer::from(vec![1, 2, 3, 4, 5]));

        let result = shared.read_at(2, 3, Alignment::none()).await.unwrap();
        assert_eq!(result.as_ref(), &[3, 4, 5]);

        let total = shared.size().await.unwrap();
        assert_eq!(total, 5);
    }
}
305}