1use std::sync::Arc;
5
6use futures::FutureExt;
7use futures::future::BoxFuture;
8use vortex_array::buffer::BufferHandle;
9use vortex_buffer::Alignment;
10use vortex_buffer::ByteBuffer;
11use vortex_error::VortexExpect;
12use vortex_error::VortexResult;
13use vortex_error::vortex_bail;
14use vortex_metrics::Counter;
15use vortex_metrics::Histogram;
16use vortex_metrics::Label;
17use vortex_metrics::MetricBuilder;
18use vortex_metrics::MetricsRegistry;
19use vortex_metrics::Timer;
20
/// Configuration for coalescing reads issued against a [`VortexReadAt`] source.
///
/// Returned by [`VortexReadAt::coalesce_config`]; `None` there means the source does not
/// provide a configuration.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct CoalesceConfig {
    /// Gap threshold in bytes used when deciding whether to merge neighbouring ranges.
    /// NOTE(review): presumably the maximum gap between two ranges that still get merged —
    /// confirm against the coalescing implementation.
    pub distance: u64,
    /// Size threshold in bytes for coalesced requests.
    /// NOTE(review): presumably the largest size a single merged request may reach —
    /// confirm against the coalescing implementation.
    pub max_size: u64,
}
29
30impl CoalesceConfig {
31 pub const fn new(distance: u64, max_size: u64) -> Self {
33 Self { distance, max_size }
34 }
35
36 pub const fn local() -> Self {
38 Self::new(8 * 1024, 8 * 1024) }
40
41 pub const fn object_storage() -> Self {
43 Self::new(1 << 20, 16 << 20) }
45}
46
47pub trait VortexReadAt: Send + Sync + 'static {
52 fn uri(&self) -> Option<&Arc<str>> {
54 None
55 }
56
57 fn coalesce_config(&self) -> Option<CoalesceConfig> {
59 None
60 }
61
62 fn concurrency(&self) -> usize;
73
74 fn size(&self) -> BoxFuture<'static, VortexResult<u64>>;
76
77 fn read_at(
82 &self,
83 offset: u64,
84 length: usize,
85 alignment: Alignment,
86 ) -> BoxFuture<'static, VortexResult<BufferHandle>>;
87}
88
89impl VortexReadAt for Arc<dyn VortexReadAt> {
90 fn uri(&self) -> Option<&Arc<str>> {
91 self.as_ref().uri()
92 }
93
94 fn coalesce_config(&self) -> Option<CoalesceConfig> {
95 self.as_ref().coalesce_config()
96 }
97
98 fn concurrency(&self) -> usize {
99 self.as_ref().concurrency()
100 }
101
102 fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
103 self.as_ref().size()
104 }
105
106 fn read_at(
107 &self,
108 offset: u64,
109 length: usize,
110 alignment: Alignment,
111 ) -> BoxFuture<'static, VortexResult<BufferHandle>> {
112 self.as_ref().read_at(offset, length, alignment)
113 }
114}
115
116impl<R: VortexReadAt> VortexReadAt for Arc<R> {
117 fn uri(&self) -> Option<&Arc<str>> {
118 self.as_ref().uri()
119 }
120
121 fn coalesce_config(&self) -> Option<CoalesceConfig> {
122 self.as_ref().coalesce_config()
123 }
124
125 fn concurrency(&self) -> usize {
126 self.as_ref().concurrency()
127 }
128
129 fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
130 self.as_ref().size()
131 }
132
133 fn read_at(
134 &self,
135 offset: u64,
136 length: usize,
137 alignment: Alignment,
138 ) -> BoxFuture<'static, VortexResult<BufferHandle>> {
139 self.as_ref().read_at(offset, length, alignment)
140 }
141}
142
143impl VortexReadAt for ByteBuffer {
144 fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
145 let length = self.len() as u64;
146 async move { Ok(length) }.boxed()
147 }
148
149 fn concurrency(&self) -> usize {
150 16
151 }
152
153 fn read_at(
154 &self,
155 offset: u64,
156 length: usize,
157 alignment: Alignment,
158 ) -> BoxFuture<'static, VortexResult<BufferHandle>> {
159 let buffer = self.clone();
160 async move {
161 let start = usize::try_from(offset).vortex_expect("start too big for usize");
162 let end =
163 usize::try_from(offset + length as u64).vortex_expect("end too big for usize");
164 if end > buffer.len() {
165 vortex_bail!(
166 "Requested range {}..{} out of bounds for buffer of length {}",
167 start,
168 end,
169 buffer.len()
170 );
171 }
172 Ok(BufferHandle::new_host(
173 buffer.slice_unaligned(start..end).aligned(alignment),
174 ))
175 }
176 .boxed()
177 }
178}
179
180#[derive(Clone)]
182pub struct InstrumentedReadAt<T: VortexReadAt + Clone> {
183 read: T,
184 metrics: Arc<InnerMetrics>,
187}
188
/// Metric handles shared (through an `Arc`) by all clones of an `InstrumentedReadAt`.
///
/// Its `Drop` impl logs a debug-level summary of these metrics.
struct InnerMetrics {
    // Histogram of read lengths (the `length` passed to `read_at`), registered as
    // `vortex.io.read.size`.
    sizes: Histogram,
    // Running sum of read lengths, registered as `vortex.io.read.total_size`.
    total_size: Counter,
    // Per-read timer, registered as `vortex.io.read.duration`.
    durations: Timer,
}
194
195impl<T: VortexReadAt + Clone> InstrumentedReadAt<T> {
196 pub fn new(read: T, metrics_registry: &dyn MetricsRegistry) -> Self {
197 Self::new_with_labels(read, metrics_registry, Vec::<Label>::default())
198 }
199
200 pub fn new_with_labels<I, L>(read: T, metrics_registry: &dyn MetricsRegistry, labels: I) -> Self
201 where
202 I: IntoIterator<Item = L>,
203 L: Into<Label>,
204 {
205 let labels = labels.into_iter().map(|l| l.into()).collect::<Vec<Label>>();
206 let sizes = MetricBuilder::new(metrics_registry)
207 .add_labels(labels.clone())
208 .histogram("vortex.io.read.size");
209 let total_size = MetricBuilder::new(metrics_registry)
210 .add_labels(labels.clone())
211 .counter("vortex.io.read.total_size");
212 let durations = MetricBuilder::new(metrics_registry)
213 .add_labels(labels)
214 .timer("vortex.io.read.duration");
215
216 Self {
217 read,
218 metrics: Arc::new(InnerMetrics {
219 sizes,
220 total_size,
221 durations,
222 }),
223 }
224 }
225}
226
227impl Drop for InnerMetrics {
229 #[allow(clippy::cognitive_complexity)]
230 fn drop(&mut self) {
231 tracing::debug!("Reads: {}", self.sizes.count());
232 if !self.sizes.is_empty() {
233 tracing::debug!(
234 "Read size: p50={} p95={} p99={} p999={}",
235 self.sizes.quantile(0.5).vortex_expect("must not be empty"),
236 self.sizes.quantile(0.95).vortex_expect("must not be empty"),
237 self.sizes.quantile(0.99).vortex_expect("must not be empty"),
238 self.sizes
239 .quantile(0.999)
240 .vortex_expect("must not be empty"),
241 );
242 }
243
244 let total_size = self.total_size.value();
245 tracing::debug!("Total read size: {total_size}");
246
247 if !self.durations.is_empty() {
248 tracing::debug!(
249 "Read duration: p50={}ms p95={}ms p99={}ms p999={}ms",
250 self.durations
251 .quantile(0.5)
252 .vortex_expect("must not be empty")
253 .as_millis(),
254 self.durations
255 .quantile(0.95)
256 .vortex_expect("must not be empty")
257 .as_millis(),
258 self.durations
259 .quantile(0.99)
260 .vortex_expect("must not be empty")
261 .as_millis(),
262 self.durations
263 .quantile(0.999)
264 .vortex_expect("must not be empty")
265 .as_millis(),
266 );
267 }
268 }
269}
270
271impl<T: VortexReadAt + Clone> VortexReadAt for InstrumentedReadAt<T> {
272 fn uri(&self) -> Option<&Arc<str>> {
273 self.read.uri()
274 }
275
276 fn coalesce_config(&self) -> Option<CoalesceConfig> {
277 self.read.coalesce_config()
278 }
279
280 fn concurrency(&self) -> usize {
281 self.read.concurrency()
282 }
283
284 fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
285 self.read.size()
286 }
287
288 fn read_at(
289 &self,
290 offset: u64,
291 length: usize,
292 alignment: Alignment,
293 ) -> BoxFuture<'static, VortexResult<BufferHandle>> {
294 let durations = self.metrics.durations.clone();
295 let sizes = self.metrics.sizes.clone();
296 let total_size = self.metrics.total_size.clone();
297
298 let read_fut = self.read.read_at(offset, length, alignment);
299 async move {
300 let _timer = durations.time();
301 let buf = read_fut.await;
302 sizes.update(length as f64);
303 total_size.add(length as u64);
304 buf
305 }
306 .boxed()
307 }
308}
309
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use vortex_buffer::Alignment;
    use vortex_buffer::ByteBuffer;

    use super::*;

    #[test]
    fn test_coalesce_config_new() {
        let config = CoalesceConfig::new(3, 7);
        assert_eq!(config.distance, 3);
        assert_eq!(config.max_size, 7);
    }

    #[test]
    fn test_coalesce_config_local() {
        let config = CoalesceConfig::local();
        assert_eq!(config.distance, 8 * 1024);
        assert_eq!(config.max_size, 8 * 1024);
    }

    #[test]
    fn test_coalesce_config_object_storage() {
        let config = CoalesceConfig::object_storage();
        assert_eq!(config.distance, 1 << 20);
        assert_eq!(config.max_size, 16 << 20);
    }

    #[tokio::test]
    async fn test_byte_buffer_size_and_defaults() {
        let data = ByteBuffer::from(vec![1, 2, 3]);

        // Fully-qualified calls avoid any clash with inherent `ByteBuffer` methods.
        assert_eq!(VortexReadAt::size(&data).await.unwrap(), 3);
        assert_eq!(VortexReadAt::concurrency(&data), 16);
        assert!(VortexReadAt::uri(&data).is_none());
        assert!(VortexReadAt::coalesce_config(&data).is_none());
    }

    #[tokio::test]
    async fn test_byte_buffer_read_at() {
        let data = ByteBuffer::from(vec![1, 2, 3, 4, 5]);

        let result = data.read_at(1, 3, Alignment::none()).await.unwrap();
        assert_eq!(result.to_host().await.as_ref(), &[2, 3, 4]);
    }

    #[tokio::test]
    async fn test_byte_buffer_read_full_range() {
        let data = ByteBuffer::from(vec![1, 2, 3]);

        let result = data.read_at(0, 3, Alignment::none()).await.unwrap();
        assert_eq!(result.to_host().await.as_ref(), &[1, 2, 3]);
    }

    #[tokio::test]
    async fn test_byte_buffer_read_empty_at_end() {
        let data = ByteBuffer::from(vec![1, 2, 3]);

        let result = data.read_at(3, 0, Alignment::none()).await.unwrap();
        let host = result.to_host().await;
        let bytes: &[u8] = host.as_ref();
        assert!(bytes.is_empty());
    }

    #[tokio::test]
    async fn test_byte_buffer_read_out_of_bounds() {
        let data = ByteBuffer::from(vec![1, 2, 3]);

        let result = data.read_at(1, 9, Alignment::none()).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_byte_buffer_read_start_past_end() {
        let data = ByteBuffer::from(vec![1, 2, 3]);

        let result = data.read_at(4, 0, Alignment::none()).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_arc_read_at() {
        let data = Arc::new(ByteBuffer::from(vec![1, 2, 3, 4, 5]));

        let result = data.read_at(2, 3, Alignment::none()).await.unwrap();
        assert_eq!(result.to_host().await.as_ref(), &[3, 4, 5]);

        let size = data.size().await.unwrap();
        assert_eq!(size, 5);
    }

    #[tokio::test]
    async fn test_arc_dyn_read_at() {
        let data: Arc<dyn VortexReadAt> = Arc::new(ByteBuffer::from(vec![1, 2, 3, 4, 5]));

        assert_eq!(data.concurrency(), 16);
        assert!(data.uri().is_none());
        assert!(data.coalesce_config().is_none());
        assert_eq!(data.size().await.unwrap(), 5);

        let result = data.read_at(0, 2, Alignment::none()).await.unwrap();
        assert_eq!(result.to_host().await.as_ref(), &[1, 2]);
    }
}
359}