1use std::sync::Arc;
5
6use futures::FutureExt;
7use futures::future::BoxFuture;
8use vortex_array::buffer::BufferHandle;
9use vortex_buffer::Alignment;
10use vortex_buffer::ByteBuffer;
11use vortex_error::VortexExpect;
12use vortex_error::VortexResult;
13use vortex_error::vortex_bail;
14use vortex_metrics::Counter;
15use vortex_metrics::Histogram;
16use vortex_metrics::Label;
17use vortex_metrics::MetricBuilder;
18use vortex_metrics::MetricsRegistry;
19use vortex_metrics::Timer;
20
/// A pair of byte counts used to tune read coalescing.
///
/// NOTE(review): nothing in this file consumes these values, so the field
/// descriptions below state their presumed meaning; confirm against the code
/// that actually performs coalescing.
#[derive(Clone, Copy, Debug)]
pub struct CoalesceConfig {
    /// Presumably the largest gap, in bytes, between two reads that may still be merged.
    pub distance: u64,
    /// Presumably the upper bound, in bytes, on the size of a merged read.
    pub max_size: u64,
}

impl CoalesceConfig {
    /// Builds a configuration from an explicit `distance` and `max_size`.
    pub const fn new(distance: u64, max_size: u64) -> Self {
        Self { distance, max_size }
    }

    /// Preset named for in-memory sources: 8 KiB for both `distance` and `max_size`.
    pub const fn in_memory() -> Self {
        Self {
            distance: 8 << 10,
            max_size: 8 << 10,
        }
    }

    /// Preset named for files: 1 MiB `distance`, 4 MiB `max_size`.
    pub const fn file() -> Self {
        Self {
            distance: 1 << 20,
            max_size: 4 << 20,
        }
    }

    /// Preset named for object storage: 1 MiB `distance`, 16 MiB `max_size`.
    pub const fn object_storage() -> Self {
        Self {
            distance: 1 << 20,
            max_size: 16 << 20,
        }
    }
}
51
52pub trait VortexReadAt: Send + Sync + 'static {
57 fn uri(&self) -> Option<&Arc<str>> {
59 None
60 }
61
62 fn coalesce_config(&self) -> Option<CoalesceConfig> {
64 None
65 }
66
67 fn concurrency(&self) -> usize;
78
79 fn size(&self) -> BoxFuture<'static, VortexResult<u64>>;
81
82 fn read_at(
87 &self,
88 offset: u64,
89 length: usize,
90 alignment: Alignment,
91 ) -> BoxFuture<'static, VortexResult<BufferHandle>>;
92}
93
94impl VortexReadAt for Arc<dyn VortexReadAt> {
95 fn uri(&self) -> Option<&Arc<str>> {
96 self.as_ref().uri()
97 }
98
99 fn coalesce_config(&self) -> Option<CoalesceConfig> {
100 self.as_ref().coalesce_config()
101 }
102
103 fn concurrency(&self) -> usize {
104 self.as_ref().concurrency()
105 }
106
107 fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
108 self.as_ref().size()
109 }
110
111 fn read_at(
112 &self,
113 offset: u64,
114 length: usize,
115 alignment: Alignment,
116 ) -> BoxFuture<'static, VortexResult<BufferHandle>> {
117 self.as_ref().read_at(offset, length, alignment)
118 }
119}
120
121impl<R: VortexReadAt> VortexReadAt for Arc<R> {
122 fn uri(&self) -> Option<&Arc<str>> {
123 self.as_ref().uri()
124 }
125
126 fn coalesce_config(&self) -> Option<CoalesceConfig> {
127 self.as_ref().coalesce_config()
128 }
129
130 fn concurrency(&self) -> usize {
131 self.as_ref().concurrency()
132 }
133
134 fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
135 self.as_ref().size()
136 }
137
138 fn read_at(
139 &self,
140 offset: u64,
141 length: usize,
142 alignment: Alignment,
143 ) -> BoxFuture<'static, VortexResult<BufferHandle>> {
144 self.as_ref().read_at(offset, length, alignment)
145 }
146}
147
148impl VortexReadAt for ByteBuffer {
149 fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
150 let length = self.len() as u64;
151 async move { Ok(length) }.boxed()
152 }
153
154 fn concurrency(&self) -> usize {
155 16
156 }
157
158 fn read_at(
159 &self,
160 offset: u64,
161 length: usize,
162 alignment: Alignment,
163 ) -> BoxFuture<'static, VortexResult<BufferHandle>> {
164 let buffer = self.clone();
165 async move {
166 let start = usize::try_from(offset).vortex_expect("start too big for usize");
167 let end =
168 usize::try_from(offset + length as u64).vortex_expect("end too big for usize");
169 if end > buffer.len() {
170 vortex_bail!(
171 "Requested range {}..{} out of bounds for buffer of length {}",
172 start,
173 end,
174 buffer.len()
175 );
176 }
177 Ok(BufferHandle::new_host(
178 buffer.slice_unaligned(start..end).aligned(alignment),
179 ))
180 }
181 .boxed()
182 }
183}
184
185#[derive(Clone)]
187pub struct InstrumentedReadAt<T: VortexReadAt + Clone> {
188 read: T,
189 metrics: Arc<InnerMetrics>,
192}
193
194struct InnerMetrics {
195 sizes: Histogram,
196 total_size: Counter,
197 durations: Timer,
198}
199
200impl<T: VortexReadAt + Clone> InstrumentedReadAt<T> {
201 pub fn new(read: T, metrics_registry: &dyn MetricsRegistry) -> Self {
202 Self::new_with_labels(read, metrics_registry, Vec::<Label>::default())
203 }
204
205 pub fn new_with_labels<I, L>(read: T, metrics_registry: &dyn MetricsRegistry, labels: I) -> Self
206 where
207 I: IntoIterator<Item = L>,
208 L: Into<Label>,
209 {
210 let labels = labels.into_iter().map(|l| l.into()).collect::<Vec<Label>>();
211 let sizes = MetricBuilder::new(metrics_registry)
212 .add_labels(labels.clone())
213 .histogram("vortex.io.read.size");
214 let total_size = MetricBuilder::new(metrics_registry)
215 .add_labels(labels.clone())
216 .counter("vortex.io.read.total_size");
217 let durations = MetricBuilder::new(metrics_registry)
218 .add_labels(labels)
219 .timer("vortex.io.read.duration");
220
221 Self {
222 read,
223 metrics: Arc::new(InnerMetrics {
224 sizes,
225 total_size,
226 durations,
227 }),
228 }
229 }
230}
231
232impl InnerMetrics {
233 fn log_sizes(&self) {
234 tracing::debug!("Reads: {}", self.sizes.count());
235 if !self.sizes.is_empty() {
236 tracing::debug!(
237 "Read size: p50={} p95={} p99={} p999={}",
238 self.sizes.quantile(0.5).vortex_expect("must not be empty"),
239 self.sizes.quantile(0.95).vortex_expect("must not be empty"),
240 self.sizes.quantile(0.99).vortex_expect("must not be empty"),
241 self.sizes
242 .quantile(0.999)
243 .vortex_expect("must not be empty"),
244 );
245 }
246 tracing::debug!("Total read size: {}", self.total_size.value());
247 }
248
249 fn log_durations(&self) {
250 if !self.durations.is_empty() {
251 tracing::debug!(
252 "Read duration: p50={}ms p95={}ms p99={}ms p999={}ms",
253 self.durations
254 .quantile(0.5)
255 .vortex_expect("must not be empty")
256 .as_millis(),
257 self.durations
258 .quantile(0.95)
259 .vortex_expect("must not be empty")
260 .as_millis(),
261 self.durations
262 .quantile(0.99)
263 .vortex_expect("must not be empty")
264 .as_millis(),
265 self.durations
266 .quantile(0.999)
267 .vortex_expect("must not be empty")
268 .as_millis(),
269 );
270 }
271 }
272}
273
274impl Drop for InnerMetrics {
276 fn drop(&mut self) {
277 self.log_sizes();
278 self.log_durations();
279 }
280}
281
282impl<T: VortexReadAt + Clone> VortexReadAt for InstrumentedReadAt<T> {
283 fn uri(&self) -> Option<&Arc<str>> {
284 self.read.uri()
285 }
286
287 fn coalesce_config(&self) -> Option<CoalesceConfig> {
288 self.read.coalesce_config()
289 }
290
291 fn concurrency(&self) -> usize {
292 self.read.concurrency()
293 }
294
295 fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
296 self.read.size()
297 }
298
299 fn read_at(
300 &self,
301 offset: u64,
302 length: usize,
303 alignment: Alignment,
304 ) -> BoxFuture<'static, VortexResult<BufferHandle>> {
305 let durations = self.metrics.durations.clone();
306 let sizes = self.metrics.sizes.clone();
307 let total_size = self.metrics.total_size.clone();
308
309 let read_fut = self.read.read_at(offset, length, alignment);
310 async move {
311 let _timer = durations.time();
312 let buf = read_fut.await;
313 sizes.update(length as f64);
314 total_size.add(length as u64);
315 buf
316 }
317 .boxed()
318 }
319}
320
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use vortex_buffer::Alignment;
    use vortex_buffer::ByteBuffer;

    use super::*;

    /// The in-memory preset uses 8 KiB for both fields.
    #[test]
    fn test_coalesce_config_in_memory() {
        let CoalesceConfig { distance, max_size } = CoalesceConfig::in_memory();
        assert_eq!(distance, 8 * 1024);
        assert_eq!(max_size, 8 * 1024);
    }

    /// The file preset uses a 1 MiB distance and a 4 MiB maximum size.
    #[test]
    fn test_coalesce_config_file() {
        let CoalesceConfig { distance, max_size } = CoalesceConfig::file();
        assert_eq!(distance, 1 << 20);
        assert_eq!(max_size, 4 << 20);
    }

    /// The object-storage preset uses a 1 MiB distance and a 16 MiB maximum size.
    #[test]
    fn test_coalesce_config_object_storage() {
        let CoalesceConfig { distance, max_size } = CoalesceConfig::object_storage();
        assert_eq!(distance, 1 << 20);
        assert_eq!(max_size, 16 << 20);
    }

    /// Reading an in-bounds range returns exactly the requested bytes.
    #[tokio::test]
    async fn test_byte_buffer_read_at() {
        let source = ByteBuffer::from(vec![1, 2, 3, 4, 5]);

        let handle = source.read_at(1, 3, Alignment::none()).await.unwrap();
        assert_eq!(handle.to_host().await.as_ref(), &[2, 3, 4]);
    }

    /// A range extending past the end of the buffer yields an error.
    #[tokio::test]
    async fn test_byte_buffer_read_out_of_bounds() {
        let source = ByteBuffer::from(vec![1, 2, 3]);

        let outcome = source.read_at(1, 9, Alignment::none()).await;
        assert!(outcome.is_err());
    }

    /// An `Arc`-wrapped buffer forwards both `read_at` and `size`.
    #[tokio::test]
    async fn test_arc_read_at() {
        let shared = Arc::new(ByteBuffer::from(vec![1, 2, 3, 4, 5]));

        let handle = shared.read_at(2, 3, Alignment::none()).await.unwrap();
        assert_eq!(handle.to_host().await.as_ref(), &[3, 4, 5]);

        let total = shared.size().await.unwrap();
        assert_eq!(total, 5);
    }
}