1use std::sync::Arc;
5
6use futures::FutureExt;
7use futures::future::BoxFuture;
8use vortex_array::buffer::BufferHandle;
9use vortex_buffer::Alignment;
10use vortex_buffer::ByteBuffer;
11use vortex_error::VortexExpect;
12use vortex_error::VortexResult;
13use vortex_error::vortex_bail;
14use vortex_metrics::Counter;
15use vortex_metrics::Histogram;
16use vortex_metrics::Label;
17use vortex_metrics::MetricBuilder;
18use vortex_metrics::MetricsRegistry;
19use vortex_metrics::Timer;
20
/// Pair of tuning knobs returned by [`VortexReadAt::coalesce_config`].
///
/// NOTE(review): both values are presumably byte counts (the presets below are
/// written as KiB/MiB multiples) — confirm against the coalescing logic, which
/// is not part of this file.
#[derive(Clone, Copy, Debug)]
pub struct CoalesceConfig {
    /// Presumably the largest gap between two reads that may still be merged —
    /// TODO confirm with the consumer of this config.
    pub distance: u64,
    /// Presumably the upper bound on the size of a merged read — TODO confirm.
    pub max_size: u64,
}

impl CoalesceConfig {
    /// 1 KiB, used to spell out the presets below.
    const KIB: u64 = 1024;
    /// 1 MiB, used to spell out the presets below.
    const MIB: u64 = 1 << 20;

    /// Builds a configuration from explicit `distance` and `max_size` values.
    pub const fn new(distance: u64, max_size: u64) -> Self {
        Self { distance, max_size }
    }

    /// Preset with `distance` = 8 KiB and `max_size` = 8 KiB.
    pub const fn in_memory() -> Self {
        Self::new(8 * Self::KIB, 8 * Self::KIB)
    }

    /// Preset with `distance` = 1 MiB and `max_size` = 4 MiB.
    pub const fn file() -> Self {
        Self::new(Self::MIB, 4 * Self::MIB)
    }

    /// Preset with `distance` = 1 MiB and `max_size` = 16 MiB.
    pub const fn object_storage() -> Self {
        Self::new(Self::MIB, 16 * Self::MIB)
    }
}
51
52pub trait VortexReadAt: Send + Sync + 'static {
57 fn uri(&self) -> Option<&Arc<str>> {
59 None
60 }
61
62 fn coalesce_config(&self) -> Option<CoalesceConfig> {
64 None
65 }
66
67 fn concurrency(&self) -> usize;
78
79 fn size(&self) -> BoxFuture<'static, VortexResult<u64>>;
81
82 fn read_at(
87 &self,
88 offset: u64,
89 length: usize,
90 alignment: Alignment,
91 ) -> BoxFuture<'static, VortexResult<BufferHandle>>;
92}
93
94impl VortexReadAt for Arc<dyn VortexReadAt> {
95 fn uri(&self) -> Option<&Arc<str>> {
96 self.as_ref().uri()
97 }
98
99 fn coalesce_config(&self) -> Option<CoalesceConfig> {
100 self.as_ref().coalesce_config()
101 }
102
103 fn concurrency(&self) -> usize {
104 self.as_ref().concurrency()
105 }
106
107 fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
108 self.as_ref().size()
109 }
110
111 fn read_at(
112 &self,
113 offset: u64,
114 length: usize,
115 alignment: Alignment,
116 ) -> BoxFuture<'static, VortexResult<BufferHandle>> {
117 self.as_ref().read_at(offset, length, alignment)
118 }
119}
120
121impl<R: VortexReadAt> VortexReadAt for Arc<R> {
122 fn uri(&self) -> Option<&Arc<str>> {
123 self.as_ref().uri()
124 }
125
126 fn coalesce_config(&self) -> Option<CoalesceConfig> {
127 self.as_ref().coalesce_config()
128 }
129
130 fn concurrency(&self) -> usize {
131 self.as_ref().concurrency()
132 }
133
134 fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
135 self.as_ref().size()
136 }
137
138 fn read_at(
139 &self,
140 offset: u64,
141 length: usize,
142 alignment: Alignment,
143 ) -> BoxFuture<'static, VortexResult<BufferHandle>> {
144 self.as_ref().read_at(offset, length, alignment)
145 }
146}
147
148impl VortexReadAt for ByteBuffer {
149 fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
150 let length = self.len() as u64;
151 async move { Ok(length) }.boxed()
152 }
153
154 fn concurrency(&self) -> usize {
155 16
156 }
157
158 fn read_at(
159 &self,
160 offset: u64,
161 length: usize,
162 alignment: Alignment,
163 ) -> BoxFuture<'static, VortexResult<BufferHandle>> {
164 let buffer = self.clone();
165 async move {
166 let start = usize::try_from(offset).vortex_expect("start too big for usize");
167 let end =
168 usize::try_from(offset + length as u64).vortex_expect("end too big for usize");
169 if end > buffer.len() {
170 vortex_bail!(
171 "Requested range {}..{} out of bounds for buffer of length {}",
172 start,
173 end,
174 buffer.len()
175 );
176 }
177 Ok(BufferHandle::new_host(
178 buffer.slice_unaligned(start..end).aligned(alignment),
179 ))
180 }
181 .boxed()
182 }
183}
184
185#[derive(Clone)]
187pub struct InstrumentedReadAt<T: VortexReadAt + Clone> {
188 read: T,
189 metrics: Arc<InnerMetrics>,
192}
193
194struct InnerMetrics {
195 sizes: Histogram,
196 total_size: Counter,
197 durations: Timer,
198}
199
200impl<T: VortexReadAt + Clone> InstrumentedReadAt<T> {
201 pub fn new(read: T, metrics_registry: &dyn MetricsRegistry) -> Self {
202 Self::new_with_labels(read, metrics_registry, Vec::<Label>::default())
203 }
204
205 pub fn new_with_labels<I, L>(read: T, metrics_registry: &dyn MetricsRegistry, labels: I) -> Self
206 where
207 I: IntoIterator<Item = L>,
208 L: Into<Label>,
209 {
210 let labels = labels.into_iter().map(|l| l.into()).collect::<Vec<Label>>();
211 let sizes = MetricBuilder::new(metrics_registry)
212 .add_labels(labels.clone())
213 .histogram("vortex.io.read.size");
214 let total_size = MetricBuilder::new(metrics_registry)
215 .add_labels(labels.clone())
216 .counter("vortex.io.read.total_size");
217 let durations = MetricBuilder::new(metrics_registry)
218 .add_labels(labels)
219 .timer("vortex.io.read.duration");
220
221 Self {
222 read,
223 metrics: Arc::new(InnerMetrics {
224 sizes,
225 total_size,
226 durations,
227 }),
228 }
229 }
230}
231
232impl Drop for InnerMetrics {
234 #[allow(clippy::cognitive_complexity)]
235 fn drop(&mut self) {
236 tracing::debug!("Reads: {}", self.sizes.count());
237 if !self.sizes.is_empty() {
238 tracing::debug!(
239 "Read size: p50={} p95={} p99={} p999={}",
240 self.sizes.quantile(0.5).vortex_expect("must not be empty"),
241 self.sizes.quantile(0.95).vortex_expect("must not be empty"),
242 self.sizes.quantile(0.99).vortex_expect("must not be empty"),
243 self.sizes
244 .quantile(0.999)
245 .vortex_expect("must not be empty"),
246 );
247 }
248
249 let total_size = self.total_size.value();
250 tracing::debug!("Total read size: {total_size}");
251
252 if !self.durations.is_empty() {
253 tracing::debug!(
254 "Read duration: p50={}ms p95={}ms p99={}ms p999={}ms",
255 self.durations
256 .quantile(0.5)
257 .vortex_expect("must not be empty")
258 .as_millis(),
259 self.durations
260 .quantile(0.95)
261 .vortex_expect("must not be empty")
262 .as_millis(),
263 self.durations
264 .quantile(0.99)
265 .vortex_expect("must not be empty")
266 .as_millis(),
267 self.durations
268 .quantile(0.999)
269 .vortex_expect("must not be empty")
270 .as_millis(),
271 );
272 }
273 }
274}
275
276impl<T: VortexReadAt + Clone> VortexReadAt for InstrumentedReadAt<T> {
277 fn uri(&self) -> Option<&Arc<str>> {
278 self.read.uri()
279 }
280
281 fn coalesce_config(&self) -> Option<CoalesceConfig> {
282 self.read.coalesce_config()
283 }
284
285 fn concurrency(&self) -> usize {
286 self.read.concurrency()
287 }
288
289 fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
290 self.read.size()
291 }
292
293 fn read_at(
294 &self,
295 offset: u64,
296 length: usize,
297 alignment: Alignment,
298 ) -> BoxFuture<'static, VortexResult<BufferHandle>> {
299 let durations = self.metrics.durations.clone();
300 let sizes = self.metrics.sizes.clone();
301 let total_size = self.metrics.total_size.clone();
302
303 let read_fut = self.read.read_at(offset, length, alignment);
304 async move {
305 let _timer = durations.time();
306 let buf = read_fut.await;
307 sizes.update(length as f64);
308 total_size.add(length as u64);
309 buf
310 }
311 .boxed()
312 }
313}
314
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use vortex_buffer::Alignment;
    use vortex_buffer::ByteBuffer;

    use super::*;

    /// The in-memory preset uses 8 KiB for both fields.
    #[test]
    fn test_coalesce_config_in_memory() {
        let CoalesceConfig { distance, max_size } = CoalesceConfig::in_memory();
        assert_eq!(distance, 8 * 1024);
        assert_eq!(max_size, 8 * 1024);
    }

    /// The file preset uses 1 MiB / 4 MiB.
    #[test]
    fn test_coalesce_config_file() {
        let CoalesceConfig { distance, max_size } = CoalesceConfig::file();
        assert_eq!(distance, 1 << 20);
        assert_eq!(max_size, 4 << 20);
    }

    /// The object-storage preset uses 1 MiB / 16 MiB.
    #[test]
    fn test_coalesce_config_object_storage() {
        let CoalesceConfig { distance, max_size } = CoalesceConfig::object_storage();
        assert_eq!(distance, 1 << 20);
        assert_eq!(max_size, 16 << 20);
    }

    /// Reading a sub-range of a `ByteBuffer` yields exactly those bytes.
    #[tokio::test]
    async fn test_byte_buffer_read_at() {
        let source = ByteBuffer::from(vec![1, 2, 3, 4, 5]);

        let handle = source.read_at(1, 3, Alignment::none()).await.unwrap();
        assert_eq!(handle.to_host().await.as_ref(), &[2, 3, 4]);
    }

    /// A range running past the end of the buffer is reported as an error.
    #[tokio::test]
    async fn test_byte_buffer_read_out_of_bounds() {
        let source = ByteBuffer::from(vec![1, 2, 3]);

        let outcome = source.read_at(1, 9, Alignment::none()).await;
        assert!(outcome.is_err());
    }

    /// An `Arc`-wrapped buffer forwards both `read_at` and `size`.
    #[tokio::test]
    async fn test_arc_read_at() {
        let shared = Arc::new(ByteBuffer::from(vec![1, 2, 3, 4, 5]));

        let handle = shared.read_at(2, 3, Alignment::none()).await.unwrap();
        assert_eq!(handle.to_host().await.as_ref(), &[3, 4, 5]);

        let total = shared.size().await.unwrap();
        assert_eq!(total, 5);
    }
}