1use std::future::Future;
5use std::io;
6use std::ops::Range;
7use std::sync::Arc;
8
9use vortex_buffer::{Alignment, ByteBuffer};
10use vortex_error::{VortexExpect, vortex_err};
11use vortex_metrics::{Histogram, Timer, VortexMetrics};
12
13pub trait VortexReadAt: 'static {
20 fn read_byte_range(
30 &self,
31 range: Range<u64>,
32 alignment: Alignment,
33 ) -> impl Future<Output = io::Result<ByteBuffer>>;
34
35 fn performance_hint(&self) -> PerformanceHint {
38 PerformanceHint::local()
39 }
40
41 fn size(&self) -> impl Future<Output = io::Result<u64>>;
46}
47
/// Read-planning parameters reported by [`VortexReadAt::performance_hint`].
#[derive(Clone, Debug)]
pub struct PerformanceHint {
    // NOTE(review): presumably the largest gap between two reads that is still worth merging
    // into a single request — confirm against the code that consumes this hint.
    coalescing_window: u64,
    // NOTE(review): presumably an upper bound on the size of a single read, with `None`
    // meaning "no bound" — confirm against the code that consumes this hint.
    max_read: Option<u64>,
}

impl PerformanceHint {
    /// Builds a hint from an explicit coalescing window and an optional maximum read size.
    pub fn new(coalescing_window: u64, max_read: Option<u64>) -> Self {
        Self {
            coalescing_window,
            max_read,
        }
    }

    /// Preset used as the default by [`VortexReadAt::performance_hint`]: a coalescing window
    /// of 8192 and a maximum read of 8192.
    pub fn local() -> Self {
        const LOCAL_LIMIT: u64 = 8192;

        Self {
            coalescing_window: LOCAL_LIMIT,
            max_read: Some(LOCAL_LIMIT),
        }
    }

    /// Preset for object storage: a coalescing window of `1 << 20` and a maximum read of
    /// `8 << 20`.
    pub fn object_storage() -> Self {
        const WINDOW: u64 = 1 << 20;
        const MAX_READ: u64 = 8 << 20;

        Self {
            coalescing_window: WINDOW,
            max_read: Some(MAX_READ),
        }
    }

    /// Returns the configured coalescing window.
    pub fn coalescing_window(&self) -> u64 {
        self.coalescing_window
    }

    /// Returns the configured maximum read size, if one was set.
    pub fn max_read(&self) -> Option<u64> {
        self.max_read
    }
}
85
86impl<T: VortexReadAt> VortexReadAt for Arc<T> {
87 async fn read_byte_range(
88 &self,
89 range: Range<u64>,
90 alignment: Alignment,
91 ) -> io::Result<ByteBuffer> {
92 T::read_byte_range(self, range, alignment).await
93 }
94
95 fn performance_hint(&self) -> PerformanceHint {
96 T::performance_hint(self)
97 }
98
99 async fn size(&self) -> io::Result<u64> {
100 T::size(self).await
101 }
102}
103
104impl VortexReadAt for ByteBuffer {
105 async fn read_byte_range(
106 &self,
107 range: Range<u64>,
108 alignment: Alignment,
109 ) -> io::Result<ByteBuffer> {
110 let start = usize::try_from(range.start).vortex_expect("start too big for usize");
111 let end = usize::try_from(range.end).vortex_expect("end too big for usize");
112 if end > self.len() {
113 return Err(io::Error::new(
114 io::ErrorKind::UnexpectedEof,
115 vortex_err!("unexpected eof"),
116 ));
117 }
118 Ok(self.clone().slice_unaligned(start..end).aligned(alignment))
119 }
120
121 fn performance_hint(&self) -> PerformanceHint {
122 PerformanceHint::local()
123 }
124
125 async fn size(&self) -> io::Result<u64> {
126 Ok(self.len() as u64)
127 }
128}
129
130#[derive(Clone)]
131pub struct InstrumentedReadAt<T: VortexReadAt> {
132 read: T,
133 sizes: Arc<Histogram>,
134 durations: Arc<Timer>,
135}
136
137impl<T: VortexReadAt> InstrumentedReadAt<T> {
138 pub fn new(read: T, metrics: &VortexMetrics) -> Self {
139 Self {
140 read,
141 sizes: metrics.histogram("vortex.io.read.size"),
142 durations: metrics.timer("vortex.io.read.duration"),
143 }
144 }
145}
146
147impl<T: VortexReadAt> VortexReadAt for InstrumentedReadAt<T> {
148 async fn read_byte_range(
149 &self,
150 range: Range<u64>,
151 alignment: Alignment,
152 ) -> io::Result<ByteBuffer> {
153 let _timer = self.durations.time();
154 let size = range.end - range.start;
155 let buf = self.read.read_byte_range(range, alignment).await;
156 let _ = size.try_into().map(|size| self.sizes.update(size));
157 buf
158 }
159
160 #[inline]
161 async fn size(&self) -> io::Result<u64> {
162 self.read.size().await
163 }
164
165 fn performance_hint(&self) -> PerformanceHint {
166 self.read.performance_hint()
167 }
168}
169
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use vortex_buffer::{Alignment, ByteBuffer};

    use super::*;

    /// The local preset exposes 8192 for both parameters.
    #[test]
    fn test_performance_hint_local() {
        let local = PerformanceHint::local();
        assert_eq!(
            (local.coalescing_window(), local.max_read()),
            (8192, Some(8192))
        );
    }

    /// The object-storage preset exposes `1 << 20` and `8 << 20`.
    #[test]
    fn test_performance_hint_object_storage() {
        let remote = PerformanceHint::object_storage();
        assert_eq!(
            (remote.coalescing_window(), remote.max_read()),
            (1 << 20, Some(8 << 20))
        );
    }

    /// Values passed to `new` come back unchanged from the accessors.
    #[test]
    fn test_performance_hint_custom() {
        let custom = PerformanceHint::new(4096, Some(16384));
        assert_eq!(
            (custom.coalescing_window(), custom.max_read()),
            (4096, Some(16384))
        );
    }

    /// A hint built without a maximum read reports `None`.
    #[test]
    fn test_performance_hint_no_max() {
        let unbounded = PerformanceHint::new(2048, None);
        assert_eq!(
            (unbounded.coalescing_window(), unbounded.max_read()),
            (2048, None)
        );
    }

    /// Reading an in-bounds range of a buffer yields exactly those bytes.
    #[tokio::test]
    async fn test_byte_buffer_read_at() {
        let buffer = ByteBuffer::from(vec![1, 2, 3, 4, 5]);

        let bytes = buffer
            .read_byte_range(1..4, Alignment::none())
            .await
            .unwrap();

        assert_eq!(bytes.as_ref(), &[2, 3, 4]);
    }

    /// Reading past the end of a buffer fails with `UnexpectedEof`.
    #[tokio::test]
    async fn test_byte_buffer_read_out_of_bounds() {
        let buffer = ByteBuffer::from(vec![1, 2, 3]);

        let outcome = buffer.read_byte_range(1..10, Alignment::none()).await;

        assert!(matches!(
            outcome,
            Err(ref err) if err.kind() == io::ErrorKind::UnexpectedEof
        ));
    }

    /// An `Arc`-wrapped buffer serves reads and reports its size like the buffer itself.
    #[tokio::test]
    async fn test_arc_read_at() {
        let shared = Arc::new(ByteBuffer::from(vec![1, 2, 3, 4, 5]));

        let bytes = shared
            .read_byte_range(2..5, Alignment::none())
            .await
            .unwrap();
        assert_eq!(bytes.as_ref(), &[3, 4, 5]);

        let total = shared.size().await.unwrap();
        assert_eq!(total, 5);
    }
}