1use std::sync::Arc;
5
6use async_trait::async_trait;
7use futures::FutureExt;
8use futures::future::BoxFuture;
9use vortex_buffer::{Alignment, ByteBuffer};
10use vortex_error::{VortexExpect, VortexResult, vortex_bail};
11use vortex_metrics::{Histogram, Timer, VortexMetrics};
12
13pub trait VortexReadAt: Send + Sync + 'static {
22 fn read_at(
48 &self,
49 offset: u64,
50 length: usize,
51 alignment: Alignment,
52 ) -> BoxFuture<'static, VortexResult<ByteBuffer>>;
53
54 fn size(&self) -> BoxFuture<'static, VortexResult<u64>>;
56
57 fn performance_hint(&self) -> PerformanceHint {
59 PerformanceHint::local()
60 }
61}
62
/// Read-tuning parameters reported by a [`VortexReadAt`] implementation.
///
/// NOTE(review): the exact meaning of both fields is defined by whatever consumes the hint;
/// presumably `coalescing_window` bounds the gap across which nearby reads are merged and
/// `max_read` caps the size of a single merged read — confirm against the caller.
#[derive(Debug, Clone)]
pub struct PerformanceHint {
    coalescing_window: u64,
    max_read: Option<u64>,
}

impl PerformanceHint {
    /// Builds a hint from an explicit coalescing window and an optional maximum read size.
    pub fn new(coalescing_window: u64, max_read: Option<u64>) -> Self {
        Self {
            coalescing_window,
            max_read,
        }
    }

    /// Preset used as the default hint: a window of 8192 and a maximum read of 8192.
    pub fn local() -> Self {
        let window = 8192;
        Self {
            coalescing_window: window,
            max_read: Some(window),
        }
    }

    /// Preset with a window of `1 << 20` (1_048_576) and a maximum read of `8 << 20`
    /// (8_388_608).
    pub fn object_storage() -> Self {
        let coalescing_window = 1 << 20;
        let max_read = Some(8 << 20);
        Self::new(coalescing_window, max_read)
    }

    /// Returns the configured coalescing window.
    pub fn coalescing_window(&self) -> u64 {
        self.coalescing_window
    }

    /// Returns the configured maximum read size, or `None` when no maximum is set.
    pub fn max_read(&self) -> Option<u64> {
        self.max_read
    }
}
100
101impl<R: VortexReadAt> VortexReadAt for Arc<R> {
102 fn read_at(
103 &self,
104 offset: u64,
105 length: usize,
106 alignment: Alignment,
107 ) -> BoxFuture<'static, VortexResult<ByteBuffer>> {
108 self.as_ref().read_at(offset, length, alignment)
109 }
110
111 fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
112 self.as_ref().size()
113 }
114
115 fn performance_hint(&self) -> PerformanceHint {
116 self.as_ref().performance_hint()
117 }
118}
119
120impl VortexReadAt for ByteBuffer {
121 fn read_at(
122 &self,
123 offset: u64,
124 length: usize,
125 alignment: Alignment,
126 ) -> BoxFuture<'static, VortexResult<ByteBuffer>> {
127 let buffer = self.clone();
128 async move {
129 let start = usize::try_from(offset).vortex_expect("start too big for usize");
130 let end =
131 usize::try_from(offset + length as u64).vortex_expect("end too big for usize");
132 if end > buffer.len() {
133 vortex_bail!(
134 "Requested range {}..{} out of bounds for buffer of length {}",
135 start,
136 end,
137 buffer.len()
138 );
139 }
140 Ok(buffer.slice_unaligned(start..end).aligned(alignment))
141 }
142 .boxed()
143 }
144
145 fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
146 let length = self.len() as u64;
147 async move { Ok(length) }.boxed()
148 }
149
150 fn performance_hint(&self) -> PerformanceHint {
151 PerformanceHint::local()
152 }
153}
154
155#[derive(Clone)]
156pub struct InstrumentedReadAt<T: VortexReadAt> {
157 read: Arc<T>,
158 sizes: Arc<Histogram>,
159 durations: Arc<Timer>,
160}
161
162impl<T: VortexReadAt> InstrumentedReadAt<T> {
163 pub fn new(read: Arc<T>, metrics: &VortexMetrics) -> Self {
164 Self {
165 read,
166 sizes: metrics.histogram("vortex.io.read.size"),
167 durations: metrics.timer("vortex.io.read.duration"),
168 }
169 }
170}
171
172impl<T> Drop for InstrumentedReadAt<T>
173where
174 T: VortexReadAt,
175{
176 fn drop(&mut self) {
177 let sizes = self.sizes.snapshot();
178 log::debug!("Reads: {}", self.sizes.count());
179 log::debug!(
180 "Read size: p50={} p95={} p99={} p999={}",
181 sizes.value(0.5),
182 sizes.value(0.95),
183 sizes.value(0.99),
184 sizes.value(0.999),
185 );
186 let durations = self.durations.snapshot();
187 log::debug!(
188 "Read duration: p50={}ms p95={}ms p99={}ms p999={}ms",
189 durations.value(0.5) / 1_000_000.0,
190 durations.value(0.95) / 1_000_000.0,
191 durations.value(0.99) / 1_000_000.0,
192 durations.value(0.999) / 1_000_000.0
193 );
194 }
195}
196
197#[async_trait]
198impl<T: VortexReadAt> VortexReadAt for InstrumentedReadAt<T> {
199 fn read_at(
200 &self,
201 offset: u64,
202 length: usize,
203 alignment: Alignment,
204 ) -> BoxFuture<'static, VortexResult<ByteBuffer>> {
205 let durations = self.durations.clone();
206 let sizes = self.sizes.clone();
207 let read_fut = self.read.read_at(offset, length, alignment);
208 async move {
209 let _timer = durations.time();
210 let buf = read_fut.await;
211 sizes.update(length as i64);
212 buf
213 }
214 .boxed()
215 }
216
217 #[inline]
218 fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
219 self.read.size()
220 }
221
222 fn performance_hint(&self) -> PerformanceHint {
223 self.read.performance_hint()
224 }
225}
226
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use vortex_buffer::{Alignment, ByteBuffer};

    use super::*;

    // The `local` preset reports 8192 for both the window and the maximum read.
    #[test]
    fn test_performance_hint_local() {
        let local = PerformanceHint::local();
        assert_eq!(local.coalescing_window(), 8192);
        assert_eq!(local.max_read(), Some(8192));
    }

    // The `object_storage` preset reports a 1 << 20 window and an 8 << 20 maximum read.
    #[test]
    fn test_performance_hint_object_storage() {
        let remote = PerformanceHint::object_storage();
        assert_eq!(remote.coalescing_window(), 1 << 20);
        assert_eq!(remote.max_read(), Some(8 << 20));
    }

    // Values passed to `new` are returned unchanged by the accessors.
    #[test]
    fn test_performance_hint_custom() {
        let custom = PerformanceHint::new(4096, Some(16384));
        assert_eq!(custom.coalescing_window(), 4096);
        assert_eq!(custom.max_read(), Some(16384));
    }

    // A hint may be built without a maximum read size.
    #[test]
    fn test_performance_hint_no_max() {
        let unbounded = PerformanceHint::new(2048, None);
        assert_eq!(unbounded.coalescing_window(), 2048);
        assert_eq!(unbounded.max_read(), None);
    }

    // An in-bounds read returns exactly the requested slice.
    #[tokio::test]
    async fn test_byte_buffer_read_at() {
        let buffer = ByteBuffer::from(vec![1, 2, 3, 4, 5]);

        let bytes = buffer.read_at(1, 3, Alignment::none()).await.unwrap();
        assert_eq!(bytes.as_ref(), &[2, 3, 4]);
    }

    // A read extending past the end of the buffer yields an error.
    #[tokio::test]
    async fn test_byte_buffer_read_out_of_bounds() {
        let buffer = ByteBuffer::from(vec![1, 2, 3]);

        let outcome = buffer.read_at(1, 9, Alignment::none()).await;
        assert!(outcome.is_err());
    }

    // Reads and size queries through an `Arc` reach the wrapped buffer.
    #[tokio::test]
    async fn test_arc_read_at() {
        let shared = Arc::new(ByteBuffer::from(vec![1, 2, 3, 4, 5]));

        let bytes = shared.read_at(2, 3, Alignment::none()).await.unwrap();
        assert_eq!(bytes.as_ref(), &[3, 4, 5]);

        let total = shared.size().await.unwrap();
        assert_eq!(total, 5);
    }
}