vortex_io/
read.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::future::Future;
5use std::io;
6use std::ops::Range;
7use std::sync::Arc;
8
9use vortex_buffer::{Alignment, ByteBuffer};
10use vortex_error::{VortexExpect, vortex_err};
11use vortex_metrics::{Histogram, Timer, VortexMetrics};
12
13/// A trait for types that support asynchronous reads.
14///
15/// References to the type must be safe to [share across threads][Send], but spawned
16/// futures may be `!Send` to support thread-per-core implementations.
17///
18/// Readers must be cheaply cloneable to allow for easy sharing across tasks or threads.
19pub trait VortexReadAt: 'static {
20    /// Request an asynchronous positional read. Results will be returned as a [`ByteBuffer`].
21    ///
22    /// If the reader does not have the requested number of bytes, the returned Future will complete
23    /// with an [`UnexpectedEof`][std::io::ErrorKind::UnexpectedEof].
24    ///
25    /// ## Thread Safety
26    ///
27    /// The resultant Future need not be [`Send`], allowing implementations that use thread-per-core
28    /// executors.
29    fn read_byte_range(
30        &self,
31        range: Range<u64>,
32        alignment: Alignment,
33    ) -> impl Future<Output = io::Result<ByteBuffer>>;
34
35    // TODO(ngates): the read implementation should be able to hint at its latency/throughput
36    //  allowing the caller to make better decisions about how to coalesce reads.
37    fn performance_hint(&self) -> PerformanceHint {
38        PerformanceHint::local()
39    }
40
41    /// Asynchronously get the number of bytes of data readable.
42    ///
43    /// For a file it will be the size in bytes, for an object in an
44    /// `ObjectStore` it will be the `ObjectMeta::size`.
45    fn size(&self) -> impl Future<Output = io::Result<u64>>;
46}
47
/// Tuning parameters that tell callers how to coalesce nearby reads against a reader.
///
/// See `PerformanceHint::local` and `PerformanceHint::object_storage` for preset profiles.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PerformanceHint {
    /// The maximum distance, in bytes, between two reads that should be merged into one.
    coalescing_window: u64,
    /// The maximum number of bytes in a single coalesced read, or `None` when no limit is given.
    max_read: Option<u64>,
}
53
54impl PerformanceHint {
55    pub fn new(coalescing_window: u64, max_read: Option<u64>) -> Self {
56        Self {
57            coalescing_window,
58            max_read,
59        }
60    }
61
62    /// Creates a new instance with a profile appropriate for fast local storage, like memory or files on NVMe devices.
63    pub fn local() -> Self {
64        // Coalesce ~8K page size, also ensures we span padding for adjacent segments.
65        Self::new(8192, Some(8192))
66    }
67
68    pub fn object_storage() -> Self {
69        Self::new(
70            1 << 20,       // 1MB,
71            Some(8 << 20), // 8MB,
72        )
73    }
74
75    /// The maximum distance between two reads that should coalesced into a single operation.
76    pub fn coalescing_window(&self) -> u64 {
77        self.coalescing_window
78    }
79
80    /// Maximum number of bytes in a coalesced read.
81    pub fn max_read(&self) -> Option<u64> {
82        self.max_read
83    }
84}
85
86impl<T: VortexReadAt> VortexReadAt for Arc<T> {
87    async fn read_byte_range(
88        &self,
89        range: Range<u64>,
90        alignment: Alignment,
91    ) -> io::Result<ByteBuffer> {
92        T::read_byte_range(self, range, alignment).await
93    }
94
95    fn performance_hint(&self) -> PerformanceHint {
96        T::performance_hint(self)
97    }
98
99    async fn size(&self) -> io::Result<u64> {
100        T::size(self).await
101    }
102}
103
104impl VortexReadAt for ByteBuffer {
105    async fn read_byte_range(
106        &self,
107        range: Range<u64>,
108        alignment: Alignment,
109    ) -> io::Result<ByteBuffer> {
110        let start = usize::try_from(range.start).vortex_expect("start too big for usize");
111        let end = usize::try_from(range.end).vortex_expect("end too big for usize");
112        if end > self.len() {
113            return Err(io::Error::new(
114                io::ErrorKind::UnexpectedEof,
115                vortex_err!("unexpected eof"),
116            ));
117        }
118        Ok(self.clone().slice_unaligned(start..end).aligned(alignment))
119    }
120
121    fn performance_hint(&self) -> PerformanceHint {
122        PerformanceHint::local()
123    }
124
125    async fn size(&self) -> io::Result<u64> {
126        Ok(self.len() as u64)
127    }
128}
129
130#[derive(Clone)]
131pub struct InstrumentedReadAt<T: VortexReadAt> {
132    read: T,
133    sizes: Arc<Histogram>,
134    durations: Arc<Timer>,
135}
136
137impl<T: VortexReadAt> InstrumentedReadAt<T> {
138    pub fn new(read: T, metrics: &VortexMetrics) -> Self {
139        Self {
140            read,
141            sizes: metrics.histogram("vortex.io.read.size"),
142            durations: metrics.timer("vortex.io.read.duration"),
143        }
144    }
145}
146
147impl<T: VortexReadAt> VortexReadAt for InstrumentedReadAt<T> {
148    async fn read_byte_range(
149        &self,
150        range: Range<u64>,
151        alignment: Alignment,
152    ) -> io::Result<ByteBuffer> {
153        let _timer = self.durations.time();
154        let size = range.end - range.start;
155        let buf = self.read.read_byte_range(range, alignment).await;
156        let _ = size.try_into().map(|size| self.sizes.update(size));
157        buf
158    }
159
160    #[inline]
161    async fn size(&self) -> io::Result<u64> {
162        self.read.size().await
163    }
164
165    fn performance_hint(&self) -> PerformanceHint {
166        self.read.performance_hint()
167    }
168}
169
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use vortex_buffer::{Alignment, ByteBuffer};

    use super::*;

    /// Asserts that `hint` carries exactly the given coalescing window and maximum read size.
    fn assert_hint(hint: &PerformanceHint, coalescing_window: u64, max_read: Option<u64>) {
        assert_eq!(hint.coalescing_window(), coalescing_window);
        assert_eq!(hint.max_read(), max_read);
    }

    #[test]
    fn test_performance_hint_local() {
        assert_hint(&PerformanceHint::local(), 8192, Some(8192));
    }

    #[test]
    fn test_performance_hint_object_storage() {
        // 1 MiB coalescing window, 8 MiB maximum read.
        assert_hint(&PerformanceHint::object_storage(), 1 << 20, Some(8 << 20));
    }

    #[test]
    fn test_performance_hint_custom() {
        assert_hint(&PerformanceHint::new(4096, Some(16384)), 4096, Some(16384));
    }

    #[test]
    fn test_performance_hint_no_max() {
        assert_hint(&PerformanceHint::new(2048, None), 2048, None);
    }

    #[tokio::test]
    async fn test_byte_buffer_read_at() {
        let buffer = ByteBuffer::from(vec![1, 2, 3, 4, 5]);

        let middle = buffer
            .read_byte_range(1..4, Alignment::none())
            .await
            .unwrap();
        assert_eq!(middle.as_ref(), &[2, 3, 4]);
    }

    #[tokio::test]
    async fn test_byte_buffer_read_out_of_bounds() {
        let buffer = ByteBuffer::from(vec![1, 2, 3]);

        let outcome = buffer.read_byte_range(1..10, Alignment::none()).await;
        assert!(outcome.is_err());
        let err = outcome.unwrap_err();
        assert_eq!(err.kind(), io::ErrorKind::UnexpectedEof);
    }

    #[tokio::test]
    async fn test_arc_read_at() {
        let shared = Arc::new(ByteBuffer::from(vec![1, 2, 3, 4, 5]));

        let tail = shared
            .read_byte_range(2..5, Alignment::none())
            .await
            .unwrap();
        assert_eq!(tail.as_ref(), &[3, 4, 5]);

        let total = shared.size().await.unwrap();
        assert_eq!(total, 5);
    }
}