1use std::future::Future;
5use std::io;
6use std::ops::Range;
7use std::sync::Arc;
8
9use vortex_buffer::{Alignment, ByteBuffer};
10use vortex_error::{VortexExpect, vortex_err};
11use vortex_metrics::{Histogram, Timer, VortexMetrics};
12
13pub trait VortexReadAt: 'static {
20 fn read_byte_range(
30 &self,
31 range: Range<u64>,
32 alignment: Alignment,
33 ) -> impl Future<Output = io::Result<ByteBuffer>>;
34
35 fn performance_hint(&self) -> PerformanceHint {
38 PerformanceHint::local()
39 }
40
41 fn size(&self) -> impl Future<Output = io::Result<u64>>;
46}
47
/// Read-planning parameters advertised by a reader.
///
/// NOTE(review): both values are presumably byte counts — confirm against the
/// code that consumes them.
#[derive(Debug, Clone)]
pub struct PerformanceHint {
    // Window used when deciding whether to coalesce reads (see `coalescing_window()`).
    coalescing_window: u64,
    // Optional upper bound for a single read; `None` when no bound is given.
    max_read: Option<u64>,
}

impl PerformanceHint {
    /// Builds a hint from an explicit coalescing window and optional maximum read.
    pub fn new(coalescing_window: u64, max_read: Option<u64>) -> Self {
        Self {
            coalescing_window,
            max_read,
        }
    }

    /// Preset with a coalescing window of 8192 and a maximum read of 8192.
    pub fn local() -> Self {
        const WINDOW: u64 = 8192;
        const MAX_READ: u64 = 8192;

        Self {
            coalescing_window: WINDOW,
            max_read: Some(MAX_READ),
        }
    }

    /// Preset with a coalescing window of `1 << 20` and a maximum read of `8 << 20`.
    pub fn object_storage() -> Self {
        const WINDOW: u64 = 1 << 20;
        const MAX_READ: u64 = 8 << 20;

        Self {
            coalescing_window: WINDOW,
            max_read: Some(MAX_READ),
        }
    }

    /// Returns the configured coalescing window.
    pub fn coalescing_window(&self) -> u64 {
        self.coalescing_window
    }

    /// Returns the configured maximum read, if one was set.
    pub fn max_read(&self) -> Option<u64> {
        self.max_read
    }
}
85
86impl<T: VortexReadAt> VortexReadAt for Arc<T> {
87 async fn read_byte_range(
88 &self,
89 range: Range<u64>,
90 alignment: Alignment,
91 ) -> io::Result<ByteBuffer> {
92 T::read_byte_range(self, range, alignment).await
93 }
94
95 fn performance_hint(&self) -> PerformanceHint {
96 T::performance_hint(self)
97 }
98
99 async fn size(&self) -> io::Result<u64> {
100 T::size(self).await
101 }
102}
103
104impl VortexReadAt for ByteBuffer {
105 async fn read_byte_range(
106 &self,
107 range: Range<u64>,
108 alignment: Alignment,
109 ) -> io::Result<ByteBuffer> {
110 let start = usize::try_from(range.start).vortex_expect("start too big for usize");
111 let end = usize::try_from(range.end).vortex_expect("end too big for usize");
112 if end > self.len() {
113 return Err(io::Error::new(
114 io::ErrorKind::UnexpectedEof,
115 vortex_err!("unexpected eof"),
116 ));
117 }
118 Ok(self.clone().slice_unaligned(start..end).aligned(alignment))
119 }
120
121 fn performance_hint(&self) -> PerformanceHint {
122 PerformanceHint::local()
123 }
124
125 async fn size(&self) -> io::Result<u64> {
126 Ok(self.len() as u64)
127 }
128}
129
130#[derive(Clone)]
131pub struct InstrumentedReadAt<T: VortexReadAt> {
132 read: T,
133 sizes: Arc<Histogram>,
134 durations: Arc<Timer>,
135}
136
137impl<T: VortexReadAt> InstrumentedReadAt<T> {
138 pub fn new(read: T, metrics: &VortexMetrics) -> Self {
139 Self {
140 read,
141 sizes: metrics.histogram("vortex.io.read.size"),
142 durations: metrics.timer("vortex.io.read.duration"),
143 }
144 }
145}
146
147impl<T: VortexReadAt> VortexReadAt for InstrumentedReadAt<T> {
148 async fn read_byte_range(
149 &self,
150 range: Range<u64>,
151 alignment: Alignment,
152 ) -> io::Result<ByteBuffer> {
153 let _timer = self.durations.time();
154 let size = range.end - range.start;
155 let buf = self.read.read_byte_range(range, alignment).await;
156 let _ = size.try_into().map(|size| self.sizes.update(size));
157 buf
158 }
159
160 #[inline]
161 async fn size(&self) -> io::Result<u64> {
162 self.read.size().await
163 }
164
165 fn performance_hint(&self) -> PerformanceHint {
166 self.read.performance_hint()
167 }
168}