vortex_io/
read.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::future::Future;
5use std::io;
6use std::ops::Range;
7use std::sync::Arc;
8
9use vortex_buffer::{Alignment, ByteBuffer};
10use vortex_error::{VortexExpect, vortex_err};
11use vortex_metrics::{Histogram, Timer, VortexMetrics};
12
/// A trait for types that support asynchronous reads.
///
/// References to the type must be safe to [share across threads][Sync], but spawned
/// futures may be `!Send` to support thread-per-core implementations.
///
/// Readers must be cheaply cloneable to allow for easy sharing across tasks or threads.
///
/// NOTE(review): the only supertrait bound is `'static`; neither `Sync` nor `Clone` is required
/// by the trait itself, so the two requirements above are conventions the compiler does not
/// enforce.
pub trait VortexReadAt: 'static {
    /// Request an asynchronous positional read. Results will be returned as a [`ByteBuffer`].
    ///
    /// `range` is the half-open byte range `start..end` to read, and `alignment` is the
    /// alignment requested for the returned buffer.
    ///
    /// If the reader does not have the requested number of bytes, the returned Future will complete
    /// with an [`UnexpectedEof`][std::io::ErrorKind::UnexpectedEof].
    ///
    /// ## Thread Safety
    ///
    /// The resultant Future need not be [`Send`], allowing implementations that use thread-per-core
    /// executors.
    fn read_byte_range(
        &self,
        range: Range<u64>,
        alignment: Alignment,
    ) -> impl Future<Output = io::Result<ByteBuffer>>;

    // TODO(ngates): the read implementation should be able to hint at its latency/throughput
    //  allowing the caller to make better decisions about how to coalesce reads.
    /// Returns hints describing how reads against this reader should be coalesced.
    ///
    /// The default implementation returns [`PerformanceHint::local`].
    fn performance_hint(&self) -> PerformanceHint {
        PerformanceHint::local()
    }

    /// Asynchronously get the number of bytes of data readable.
    ///
    /// For a file it will be the size in bytes, for an object in an
    /// `ObjectStore` it will be the `ObjectMeta::size`.
    fn size(&self) -> impl Future<Output = io::Result<u64>>;
}
47
/// Hints that a [`VortexReadAt`] implementation gives its callers about how reads against it
/// should be coalesced.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PerformanceHint {
    /// Maximum distance, in bytes, between two reads that should be coalesced into one.
    coalescing_window: u64,
    /// Maximum number of bytes in a single coalesced read.
    max_read: Option<u64>,
}

impl PerformanceHint {
    /// Creates a hint from an explicit coalescing window and maximum coalesced read size.
    ///
    /// NOTE(review): `max_read = None` presumably means coalesced reads are unbounded; the
    /// interpretation is up to whichever caller consumes the hint.
    pub const fn new(coalescing_window: u64, max_read: Option<u64>) -> Self {
        Self {
            coalescing_window,
            max_read,
        }
    }

    /// Creates a new instance with a profile appropriate for fast local storage, like memory or
    /// files on NVMe devices.
    pub const fn local() -> Self {
        // Coalesce ~8K page size, also ensures we span padding for adjacent segments.
        Self::new(8192, Some(8192))
    }

    /// Creates a new instance with a profile appropriate for object storage: reads up to 1 MiB
    /// apart are coalesced, and a single coalesced read may span up to 8 MiB.
    pub const fn object_storage() -> Self {
        Self::new(
            1 << 20,       // 1 MiB
            Some(8 << 20), // 8 MiB
        )
    }

    /// The maximum distance between two reads that should be coalesced into a single operation.
    pub const fn coalescing_window(&self) -> u64 {
        self.coalescing_window
    }

    /// Maximum number of bytes in a coalesced read.
    pub const fn max_read(&self) -> Option<u64> {
        self.max_read
    }
}
85
86impl<T: VortexReadAt> VortexReadAt for Arc<T> {
87    async fn read_byte_range(
88        &self,
89        range: Range<u64>,
90        alignment: Alignment,
91    ) -> io::Result<ByteBuffer> {
92        T::read_byte_range(self, range, alignment).await
93    }
94
95    fn performance_hint(&self) -> PerformanceHint {
96        T::performance_hint(self)
97    }
98
99    async fn size(&self) -> io::Result<u64> {
100        T::size(self).await
101    }
102}
103
104impl VortexReadAt for ByteBuffer {
105    async fn read_byte_range(
106        &self,
107        range: Range<u64>,
108        alignment: Alignment,
109    ) -> io::Result<ByteBuffer> {
110        let start = usize::try_from(range.start).vortex_expect("start too big for usize");
111        let end = usize::try_from(range.end).vortex_expect("end too big for usize");
112        if end > self.len() {
113            return Err(io::Error::new(
114                io::ErrorKind::UnexpectedEof,
115                vortex_err!("unexpected eof"),
116            ));
117        }
118        Ok(self.clone().slice_unaligned(start..end).aligned(alignment))
119    }
120
121    fn performance_hint(&self) -> PerformanceHint {
122        PerformanceHint::local()
123    }
124
125    async fn size(&self) -> io::Result<u64> {
126        Ok(self.len() as u64)
127    }
128}
129
130#[derive(Clone)]
131pub struct InstrumentedReadAt<T: VortexReadAt> {
132    read: T,
133    sizes: Arc<Histogram>,
134    durations: Arc<Timer>,
135}
136
137impl<T: VortexReadAt> InstrumentedReadAt<T> {
138    pub fn new(read: T, metrics: &VortexMetrics) -> Self {
139        Self {
140            read,
141            sizes: metrics.histogram("vortex.io.read.size"),
142            durations: metrics.timer("vortex.io.read.duration"),
143        }
144    }
145}
146
147impl<T: VortexReadAt> VortexReadAt for InstrumentedReadAt<T> {
148    async fn read_byte_range(
149        &self,
150        range: Range<u64>,
151        alignment: Alignment,
152    ) -> io::Result<ByteBuffer> {
153        let _timer = self.durations.time();
154        let size = range.end - range.start;
155        let buf = self.read.read_byte_range(range, alignment).await;
156        let _ = size.try_into().map(|size| self.sizes.update(size));
157        buf
158    }
159
160    #[inline]
161    async fn size(&self) -> io::Result<u64> {
162        self.read.size().await
163    }
164
165    fn performance_hint(&self) -> PerformanceHint {
166        self.read.performance_hint()
167    }
168}