1use std::sync::Arc;
5
6use async_trait::async_trait;
7use futures::FutureExt;
8use futures::future::BoxFuture;
9use vortex_buffer::Alignment;
10use vortex_buffer::ByteBuffer;
11use vortex_error::VortexExpect;
12use vortex_error::VortexResult;
13use vortex_error::vortex_bail;
14use vortex_metrics::Histogram;
15use vortex_metrics::Timer;
16use vortex_metrics::VortexMetrics;
17
18pub trait VortexReadAt: Send + Sync + 'static {
27 fn read_at(
53 &self,
54 offset: u64,
55 length: usize,
56 alignment: Alignment,
57 ) -> BoxFuture<'static, VortexResult<ByteBuffer>>;
58
59 fn size(&self) -> BoxFuture<'static, VortexResult<u64>>;
61
62 fn performance_hint(&self) -> PerformanceHint {
64 PerformanceHint::local()
65 }
66}
67
/// Tuning hints describing the I/O characteristics of a [`VortexReadAt`] source.
///
/// Both values are presumably byte counts: a window within which nearby reads may be merged,
/// and an optional cap on a single read. TODO confirm against the consumers of
/// [`PerformanceHint::coalescing_window`] and [`PerformanceHint::max_read`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PerformanceHint {
    /// Coalescing window (see [`PerformanceHint::coalescing_window`]).
    coalescing_window: u64,
    /// Optional maximum read size; `None` presumably means "no limit".
    max_read: Option<u64>,
}

impl PerformanceHint {
    /// Creates a hint from an explicit coalescing window and optional maximum read size.
    ///
    /// This is a `const fn`, so hints can be declared as constants.
    pub const fn new(coalescing_window: u64, max_read: Option<u64>) -> Self {
        Self {
            coalescing_window,
            max_read,
        }
    }

    /// Hint used for local sources (and the trait default): an 8 KiB (8192-byte) coalescing
    /// window and an 8 KiB maximum read.
    pub const fn local() -> Self {
        Self::new(8 * 1024, Some(8 * 1024))
    }

    /// Hint for object storage: a 1 MiB coalescing window and an 8 MiB maximum read.
    pub const fn object_storage() -> Self {
        Self::new(1 << 20, Some(8 << 20))
    }

    /// Returns the coalescing window.
    pub const fn coalescing_window(&self) -> u64 {
        self.coalescing_window
    }

    /// Returns the maximum size of a single read, if one is set.
    pub const fn max_read(&self) -> Option<u64> {
        self.max_read
    }
}
105
106impl<R: VortexReadAt> VortexReadAt for Arc<R> {
107 fn read_at(
108 &self,
109 offset: u64,
110 length: usize,
111 alignment: Alignment,
112 ) -> BoxFuture<'static, VortexResult<ByteBuffer>> {
113 self.as_ref().read_at(offset, length, alignment)
114 }
115
116 fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
117 self.as_ref().size()
118 }
119
120 fn performance_hint(&self) -> PerformanceHint {
121 self.as_ref().performance_hint()
122 }
123}
124
125impl VortexReadAt for ByteBuffer {
126 fn read_at(
127 &self,
128 offset: u64,
129 length: usize,
130 alignment: Alignment,
131 ) -> BoxFuture<'static, VortexResult<ByteBuffer>> {
132 let buffer = self.clone();
133 async move {
134 let start = usize::try_from(offset).vortex_expect("start too big for usize");
135 let end =
136 usize::try_from(offset + length as u64).vortex_expect("end too big for usize");
137 if end > buffer.len() {
138 vortex_bail!(
139 "Requested range {}..{} out of bounds for buffer of length {}",
140 start,
141 end,
142 buffer.len()
143 );
144 }
145 Ok(buffer.slice_unaligned(start..end).aligned(alignment))
146 }
147 .boxed()
148 }
149
150 fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
151 let length = self.len() as u64;
152 async move { Ok(length) }.boxed()
153 }
154
155 fn performance_hint(&self) -> PerformanceHint {
156 PerformanceHint::local()
157 }
158}
159
160#[derive(Clone)]
161pub struct InstrumentedReadAt<T: VortexReadAt> {
162 read: Arc<T>,
163 sizes: Arc<Histogram>,
164 durations: Arc<Timer>,
165}
166
167impl<T: VortexReadAt> InstrumentedReadAt<T> {
168 pub fn new(read: Arc<T>, metrics: &VortexMetrics) -> Self {
169 Self {
170 read,
171 sizes: metrics.histogram("vortex.io.read.size"),
172 durations: metrics.timer("vortex.io.read.duration"),
173 }
174 }
175}
176
177impl<T> Drop for InstrumentedReadAt<T>
178where
179 T: VortexReadAt,
180{
181 fn drop(&mut self) {
182 let sizes = self.sizes.snapshot();
183 log::debug!("Reads: {}", self.sizes.count());
184 log::debug!(
185 "Read size: p50={} p95={} p99={} p999={}",
186 sizes.value(0.5),
187 sizes.value(0.95),
188 sizes.value(0.99),
189 sizes.value(0.999),
190 );
191 let durations = self.durations.snapshot();
192 log::debug!(
193 "Read duration: p50={}ms p95={}ms p99={}ms p999={}ms",
194 durations.value(0.5) / 1_000_000.0,
195 durations.value(0.95) / 1_000_000.0,
196 durations.value(0.99) / 1_000_000.0,
197 durations.value(0.999) / 1_000_000.0
198 );
199 }
200}
201
202#[async_trait]
203impl<T: VortexReadAt> VortexReadAt for InstrumentedReadAt<T> {
204 fn read_at(
205 &self,
206 offset: u64,
207 length: usize,
208 alignment: Alignment,
209 ) -> BoxFuture<'static, VortexResult<ByteBuffer>> {
210 let durations = self.durations.clone();
211 let sizes = self.sizes.clone();
212 let read_fut = self.read.read_at(offset, length, alignment);
213 async move {
214 let _timer = durations.time();
215 let buf = read_fut.await;
216 sizes.update(length as i64);
217 buf
218 }
219 .boxed()
220 }
221
222 #[inline]
223 fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
224 self.read.size()
225 }
226
227 fn performance_hint(&self) -> PerformanceHint {
228 self.read.performance_hint()
229 }
230}
231
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use vortex_buffer::Alignment;
    use vortex_buffer::ByteBuffer;

    use super::*;

    #[test]
    fn test_performance_hint_local() {
        let hint = PerformanceHint::local();
        assert_eq!(hint.coalescing_window(), 8192);
        assert_eq!(hint.max_read(), Some(8192));
    }

    #[test]
    fn test_performance_hint_object_storage() {
        let hint = PerformanceHint::object_storage();
        assert_eq!(hint.coalescing_window(), 1 << 20);
        assert_eq!(hint.max_read(), Some(8 << 20));
    }

    #[test]
    fn test_performance_hint_custom() {
        let hint = PerformanceHint::new(4096, Some(16384));
        assert_eq!(hint.coalescing_window(), 4096);
        assert_eq!(hint.max_read(), Some(16384));
    }

    #[test]
    fn test_performance_hint_no_max() {
        let hint = PerformanceHint::new(2048, None);
        assert_eq!(hint.coalescing_window(), 2048);
        assert_eq!(hint.max_read(), None);
    }

    #[tokio::test]
    async fn test_byte_buffer_read_at() {
        let data = ByteBuffer::from(vec![1, 2, 3, 4, 5]);

        let result = data.read_at(1, 3, Alignment::none()).await.unwrap();
        assert_eq!(result.as_ref(), &[2, 3, 4]);
    }

    #[tokio::test]
    async fn test_byte_buffer_read_out_of_bounds() {
        let data = ByteBuffer::from(vec![1, 2, 3]);

        let result = data.read_at(1, 9, Alignment::none()).await;
        assert!(result.is_err());
    }

    /// A read whose range ends exactly at the buffer end is in bounds.
    #[tokio::test]
    async fn test_byte_buffer_read_to_exact_end() {
        let data = ByteBuffer::from(vec![1, 2, 3]);

        let result = data.read_at(0, 3, Alignment::none()).await.unwrap();
        assert_eq!(result.as_ref(), &[1, 2, 3]);
    }

    /// A zero-length read positioned at the buffer end yields an empty buffer.
    #[tokio::test]
    async fn test_byte_buffer_zero_length_read_at_end() {
        let data = ByteBuffer::from(vec![1, 2, 3]);

        let result = data.read_at(3, 0, Alignment::none()).await.unwrap();
        assert_eq!(result.len(), 0);
    }

    /// Even a zero-length read is rejected when it starts past the buffer end.
    #[tokio::test]
    async fn test_byte_buffer_read_starting_past_end() {
        let data = ByteBuffer::from(vec![1, 2, 3]);

        let result = data.read_at(4, 0, Alignment::none()).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_byte_buffer_size_and_hint() {
        let data = ByteBuffer::from(vec![1, 2, 3, 4]);

        let size = VortexReadAt::size(&data).await.unwrap();
        assert_eq!(size, 4);

        let hint = VortexReadAt::performance_hint(&data);
        assert_eq!(hint.coalescing_window(), 8192);
        assert_eq!(hint.max_read(), Some(8192));
    }

    #[tokio::test]
    async fn test_arc_read_at() {
        let data = Arc::new(ByteBuffer::from(vec![1, 2, 3, 4, 5]));

        let result = data.read_at(2, 3, Alignment::none()).await.unwrap();
        assert_eq!(result.as_ref(), &[3, 4, 5]);

        let size = data.size().await.unwrap();
        assert_eq!(size, 5);
    }
}