1use std::sync::Arc;
5
6use async_trait::async_trait;
7use futures::FutureExt;
8use futures::future::BoxFuture;
9use vortex_buffer::Alignment;
10use vortex_buffer::ByteBuffer;
11use vortex_error::VortexExpect;
12use vortex_error::VortexResult;
13use vortex_error::vortex_bail;
14use vortex_metrics::Counter;
15use vortex_metrics::Histogram;
16use vortex_metrics::Timer;
17use vortex_metrics::VortexMetrics;
18
19pub trait VortexReadAt: Send + Sync + 'static {
28 fn read_at(
54 &self,
55 offset: u64,
56 length: usize,
57 alignment: Alignment,
58 ) -> BoxFuture<'static, VortexResult<ByteBuffer>>;
59
60 fn size(&self) -> BoxFuture<'static, VortexResult<u64>>;
62
63 fn performance_hint(&self) -> PerformanceHint {
65 PerformanceHint::local()
66 }
67}
68
/// Read-shaping parameters advertised by a [`VortexReadAt`] implementation.
///
/// NOTE(review): judging by the names, `coalescing_window` is the largest gap
/// (in bytes) across which neighbouring reads get merged and `max_read` caps
/// the size of a single read — confirm against the code that consumes them.
#[derive(Debug, Clone)]
pub struct PerformanceHint {
    coalescing_window: u64,
    max_read: Option<u64>,
}

impl PerformanceHint {
    /// Builds a hint from explicit values; `max_read == None` means no limit
    /// is stored.
    pub fn new(coalescing_window: u64, max_read: Option<u64>) -> Self {
        PerformanceHint {
            coalescing_window,
            max_read,
        }
    }

    /// Preset with an 8192-byte coalescing window and an 8192-byte maximum
    /// read.
    pub fn local() -> Self {
        const LOCAL_BYTES: u64 = 8192;
        Self::new(LOCAL_BYTES, Some(LOCAL_BYTES))
    }

    /// Preset with a 1 MiB coalescing window and an 8 MiB maximum read.
    pub fn object_storage() -> Self {
        const MIB: u64 = 1 << 20;
        Self::new(MIB, Some(8 * MIB))
    }

    /// Returns the stored coalescing window.
    pub fn coalescing_window(&self) -> u64 {
        let Self {
            coalescing_window, ..
        } = self;
        *coalescing_window
    }

    /// Returns the stored maximum read size, if any.
    pub fn max_read(&self) -> Option<u64> {
        let Self { max_read, .. } = self;
        *max_read
    }
}
106
107impl<R: VortexReadAt> VortexReadAt for Arc<R> {
108 fn read_at(
109 &self,
110 offset: u64,
111 length: usize,
112 alignment: Alignment,
113 ) -> BoxFuture<'static, VortexResult<ByteBuffer>> {
114 self.as_ref().read_at(offset, length, alignment)
115 }
116
117 fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
118 self.as_ref().size()
119 }
120
121 fn performance_hint(&self) -> PerformanceHint {
122 self.as_ref().performance_hint()
123 }
124}
125
126impl VortexReadAt for ByteBuffer {
127 fn read_at(
128 &self,
129 offset: u64,
130 length: usize,
131 alignment: Alignment,
132 ) -> BoxFuture<'static, VortexResult<ByteBuffer>> {
133 let buffer = self.clone();
134 async move {
135 let start = usize::try_from(offset).vortex_expect("start too big for usize");
136 let end =
137 usize::try_from(offset + length as u64).vortex_expect("end too big for usize");
138 if end > buffer.len() {
139 vortex_bail!(
140 "Requested range {}..{} out of bounds for buffer of length {}",
141 start,
142 end,
143 buffer.len()
144 );
145 }
146 Ok(buffer.slice_unaligned(start..end).aligned(alignment))
147 }
148 .boxed()
149 }
150
151 fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
152 let length = self.len() as u64;
153 async move { Ok(length) }.boxed()
154 }
155
156 fn performance_hint(&self) -> PerformanceHint {
157 PerformanceHint::local()
158 }
159}
160
161#[derive(Clone)]
162pub struct InstrumentedReadAt<T: VortexReadAt> {
163 read: Arc<T>,
164 sizes: Arc<Histogram>,
165 total_size: Arc<Counter>,
166 durations: Arc<Timer>,
167}
168
169impl<T: VortexReadAt> InstrumentedReadAt<T> {
170 pub fn new(read: Arc<T>, metrics: &VortexMetrics) -> Self {
171 Self {
172 read,
173 sizes: metrics.histogram("vortex.io.read.size"),
174 total_size: metrics.counter("vortex.io.read.total_size"),
175 durations: metrics.timer("vortex.io.read.duration"),
176 }
177 }
178}
179
180impl<T> Drop for InstrumentedReadAt<T>
181where
182 T: VortexReadAt,
183{
184 #[allow(clippy::cognitive_complexity)]
185 fn drop(&mut self) {
186 let sizes = self.sizes.snapshot();
187 tracing::debug!("Reads: {}", self.sizes.count());
188 tracing::debug!(
189 "Read size: p50={} p95={} p99={} p999={}",
190 sizes.value(0.5),
191 sizes.value(0.95),
192 sizes.value(0.99),
193 sizes.value(0.999),
194 );
195
196 let total_size = self.total_size.count();
197 tracing::debug!("Total read size: {total_size}");
198
199 let durations = self.durations.snapshot();
200 tracing::debug!(
201 "Read duration: p50={}ms p95={}ms p99={}ms p999={}ms",
202 durations.value(0.5) / 1_000_000.0,
203 durations.value(0.95) / 1_000_000.0,
204 durations.value(0.99) / 1_000_000.0,
205 durations.value(0.999) / 1_000_000.0
206 );
207 }
208}
209
210#[async_trait]
211impl<T: VortexReadAt> VortexReadAt for InstrumentedReadAt<T> {
212 fn read_at(
213 &self,
214 offset: u64,
215 length: usize,
216 alignment: Alignment,
217 ) -> BoxFuture<'static, VortexResult<ByteBuffer>> {
218 let durations = self.durations.clone();
219 let sizes = self.sizes.clone();
220 let total_size = self.total_size.clone();
221 let read_fut = self.read.read_at(offset, length, alignment);
222 async move {
223 let _timer = durations.time();
224 let buf = read_fut.await;
225 sizes.update(length as i64);
226 total_size.add(length as i64);
227 buf
228 }
229 .boxed()
230 }
231
232 #[inline]
233 fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
234 self.read.size()
235 }
236
237 fn performance_hint(&self) -> PerformanceHint {
238 self.read.performance_hint()
239 }
240}
241
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use vortex_buffer::Alignment;
    use vortex_buffer::ByteBuffer;

    use super::*;

    /// Checks both accessors of `hint` against the expected values.
    fn assert_hint(hint: &PerformanceHint, window: u64, max_read: Option<u64>) {
        assert_eq!(hint.coalescing_window(), window);
        assert_eq!(hint.max_read(), max_read);
    }

    #[test]
    fn test_performance_hint_local() {
        assert_hint(&PerformanceHint::local(), 8192, Some(8192));
    }

    #[test]
    fn test_performance_hint_object_storage() {
        assert_hint(&PerformanceHint::object_storage(), 1 << 20, Some(8 << 20));
    }

    #[test]
    fn test_performance_hint_custom() {
        assert_hint(&PerformanceHint::new(4096, Some(16384)), 4096, Some(16384));
    }

    #[test]
    fn test_performance_hint_no_max() {
        assert_hint(&PerformanceHint::new(2048, None), 2048, None);
    }

    // A read that lies inside the buffer returns exactly the requested bytes.
    #[tokio::test]
    async fn test_byte_buffer_read_at() {
        let source = ByteBuffer::from(vec![1, 2, 3, 4, 5]);

        let bytes = source.read_at(1, 3, Alignment::none()).await.unwrap();
        assert_eq!(bytes.as_ref(), &[2, 3, 4]);
    }

    // A read that runs past the end of the buffer is an error.
    #[tokio::test]
    async fn test_byte_buffer_read_out_of_bounds() {
        let source = ByteBuffer::from(vec![1, 2, 3]);

        let outcome = source.read_at(1, 9, Alignment::none()).await;
        assert!(outcome.is_err());
    }

    // The `Arc` wrapper forwards both `read_at` and `size`.
    #[tokio::test]
    async fn test_arc_read_at() {
        let shared = Arc::new(ByteBuffer::from(vec![1, 2, 3, 4, 5]));

        let bytes = shared.read_at(2, 3, Alignment::none()).await.unwrap();
        assert_eq!(bytes.as_ref(), &[3, 4, 5]);

        let total = shared.size().await.unwrap();
        assert_eq!(total, 5);
    }
}