use std::ops::Range;
use std::sync::Arc;

use futures::{StreamExt, pin_mut};
use vortex_buffer::{Alignment, ByteBuffer, ByteBufferMut};
use vortex_error::{VortexExpect, VortexResult, vortex_err};
use vortex_io::{Dispatch, InstrumentedReadAt, IoDispatcher, VortexReadAt};
use vortex_layout::segments::{SegmentEvents, SegmentId};
use vortex_utils::aliases::dash_map::DashMap;

use crate::driver::CoalescedDriver;
use crate::segments::{
    InitialReadSegmentCache, MokaSegmentCache, NoOpSegmentCache, SegmentCache, SegmentCacheMetrics,
    SegmentCacheSourceAdapter,
};
use crate::{EOF_SIZE, FileType, Footer, MAX_FOOTER_SIZE, VortexFile, VortexOpenOptions};

#[cfg(feature = "tokio")]
static TOKIO_DISPATCHER: std::sync::LazyLock<IoDispatcher> =
    std::sync::LazyLock::new(|| IoDispatcher::new_tokio(1));

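/// The generic [`FileType`] used with [`VortexOpenOptions`] to read Vortex files from
/// arbitrary [`VortexReadAt`] sources such as local files or object stores.
///
/// A minimal usage sketch (illustrative only; the file path and enclosing async
/// context are assumptions, and `open` requires the `tokio` feature):
///
/// ```ignore
/// let file = VortexOpenOptions::file()
///     .with_io_concurrency(16)
///     .open("example.vortex")
///     .await?;
/// ```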
pub struct GenericVortexFile;

impl FileType for GenericVortexFile {
    type Options = GenericFileOptions;
}

impl VortexOpenOptions<GenericVortexFile> {
    /// Default number of bytes speculatively read from the tail of the file.
    const INITIAL_READ_SIZE: u64 = 1 << 20;

    /// Create open options for a generic Vortex file, configured with a 256 MiB Moka
    /// segment cache and the default initial read size.
    pub fn file() -> Self {
        Self::new(Default::default())
            .with_segment_cache(Arc::new(MokaSegmentCache::new(256 << 20)))
            .with_initial_read_size(Self::INITIAL_READ_SIZE)
    }

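    /// Set the number of bytes speculatively read from the end of the file when it is
    /// first opened. The value is clamped to at least `MAX_FOOTER_SIZE + EOF_SIZE` and
    /// to at most the file size.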
    pub fn with_initial_read_size(mut self, initial_read_size: u64) -> Self {
        self.options.initial_read_size = initial_read_size;
        self
    }

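    /// Set the [`SegmentCache`] used to serve segment requests that are not already
    /// satisfied by the initial read.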
    pub fn with_segment_cache(mut self, segment_cache: Arc<dyn SegmentCache>) -> Self {
        self.options.segment_cache = segment_cache;
        self
    }

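    /// Disable segment caching by installing a [`NoOpSegmentCache`].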
    pub fn without_segment_cache(self) -> Self {
        self.with_segment_cache(Arc::new(NoOpSegmentCache))
    }

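    /// Set how many coalesced I/O requests may be in flight concurrently.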
    pub fn with_io_concurrency(mut self, io_concurrency: usize) -> Self {
        self.options.io_concurrency = io_concurrency;
        self
    }

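    /// Open a Vortex file at the given path, blocking the current thread on the async
    /// [`Self::open`] call.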
    #[cfg(feature = "tokio")]
    pub fn open_blocking(self, read: impl AsRef<std::path::Path>) -> VortexResult<VortexFile> {
        futures::executor::block_on(self.open(read))
    }

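    /// Open a Vortex file at the given path using a Tokio-backed I/O dispatcher.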
    #[cfg(feature = "tokio")]
    pub async fn open(mut self, read: impl AsRef<std::path::Path>) -> VortexResult<VortexFile> {
        self.options.io_dispatcher = TOKIO_DISPATCHER.clone();
        self.open_read_at(vortex_io::TokioFile::open(read)?).await
    }

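    /// Open a Vortex file from any [`VortexReadAt`] source: read (or reuse) the footer,
    /// wire up the segment cache and the background I/O driver, and return the resulting
    /// [`VortexFile`].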
    pub async fn open_read_at<R: VortexReadAt + Send + Sync>(
        self,
        read: R,
    ) -> VortexResult<VortexFile> {
        let read = Arc::new(read);

        let footer = if let Some(footer) = self.footer {
            footer
        } else {
            self.read_footer(read.clone()).await?
        };

        let segment_cache = Arc::new(SegmentCacheMetrics::new(
            InitialReadSegmentCache {
                initial: self.options.initial_read_segments,
                fallback: self.options.segment_cache,
            },
            self.metrics.clone(),
        ));

        let (events_source, events) = SegmentEvents::create();

        let segment_source = Arc::new(SegmentCacheSourceAdapter::new(segment_cache, events_source));

        let read = InstrumentedReadAt::new(read.clone(), &self.metrics);

        let driver = CoalescedDriver::new(
            read.performance_hint(),
            footer.segment_map().clone(),
            events,
            self.metrics.clone(),
        );

        let io_concurrency = self.options.io_concurrency;
        let io_dispatcher = self.options.io_dispatcher.clone();
        // Spawn the coalescing I/O driver onto the dispatcher: it consumes segment
        // events, coalesces them into read requests, and runs those reads with
        // bounded concurrency.
        self.options
            .io_dispatcher
            .dispatch(move || {
                async move {
                    let stream = driver
                        .map(|coalesced_req| {
                            let read = read.clone();
                            io_dispatcher
                                .dispatch(move || coalesced_req.launch(read))
                                .vortex_expect("Failed to dispatch I/O request")
                        })
                        .buffer_unordered(io_concurrency)
                        .map(|result| result.vortex_expect("infallible"));
                    pin_mut!(stream);

                    stream.collect::<()>().await
                }
            })
            .vortex_expect("Failed to spawn I/O driver");

        Ok(VortexFile {
            footer,
            segment_source,
            metrics: self.metrics,
        })
    }

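    /// Read and parse the [`Footer`], starting with a speculative read of the file's tail
    /// and issuing one follow-up read if the footer segments begin before the initial
    /// read window.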
    async fn read_footer<R: VortexReadAt + Send + Sync>(
        &self,
        read: Arc<R>,
    ) -> VortexResult<Footer> {
        let file_size = match self.file_size {
            None => self.dispatched_size(read.clone()).await?,
            Some(file_size) => file_size,
        };
        let initial_read_size = self
            .options
            .initial_read_size
            .max(MAX_FOOTER_SIZE as u64 + EOF_SIZE as u64)
            .min(file_size);
        let mut initial_offset = file_size - initial_read_size;
        let mut initial_read: ByteBuffer = self
            .dispatched_read(read.clone(), initial_offset..file_size)
            .await?;

        let postscript = self.parse_postscript(&initial_read)?;

        let dtype_segment = self
            .dtype
            .is_none()
            .then(|| {
                postscript.dtype.ok_or_else(|| {
                    vortex_err!(
                        "Vortex file doesn't embed a DType and none provided to VortexOpenOptions"
                    )
                })
            })
            .transpose()?;

        // Find the lowest offset needed to cover the DType, statistics, layout,
        // and footer segments referenced by the postscript.
        let mut read_more_offset = initial_offset;
        if let Some(dtype_segment) = &dtype_segment {
            read_more_offset = read_more_offset.min(dtype_segment.offset);
        }
        if let Some(stats_segment) = &postscript.statistics {
            read_more_offset = read_more_offset.min(stats_segment.offset);
        }
        read_more_offset = read_more_offset.min(postscript.layout.offset);
        read_more_offset = read_more_offset.min(postscript.footer.offset);

        if read_more_offset < initial_offset {
            log::debug!(
                "Initial read from {initial_offset} did not cover all footer segments, reading from {read_more_offset}"
            );

            let mut new_initial_read =
                ByteBufferMut::with_capacity(usize::try_from(file_size - read_more_offset)?);
            new_initial_read.extend_from_slice(
                &self
                    .dispatched_read(read, read_more_offset..initial_offset)
                    .await?,
            );
            new_initial_read.extend_from_slice(&initial_read);

            initial_offset = read_more_offset;
            initial_read = new_initial_read.freeze();
        }

        let dtype = dtype_segment
            .map(|segment| self.parse_dtype(initial_offset, &initial_read, &segment))
            .transpose()?
            .unwrap_or_else(|| self.dtype.clone().vortex_expect("DType was provided"));
        let file_stats = postscript
            .statistics
            .map(|segment| self.parse_file_statistics(initial_offset, &initial_read, &segment))
            .transpose()?;
        let footer = self.parse_footer(
            initial_offset,
            &initial_read,
            &postscript.footer,
            &postscript.layout,
            dtype,
            file_stats,
        )?;

        // Seed the initial-read segment cache with any segments already covered by
        // the bytes we just read.
        self.populate_initial_segments(initial_offset, &initial_read, &footer);

        Ok(footer)
    }

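    /// Fetch the file size, running the request on the configured I/O dispatcher.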
    async fn dispatched_size<R: VortexReadAt + Send + Sync>(
        &self,
        read: Arc<R>,
    ) -> VortexResult<u64> {
        Ok(self
            .options
            .io_dispatcher
            .dispatch(move || async move { read.size().await })?
            .await??)
    }

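    /// Read a byte range from the file, running the request on the configured I/O
    /// dispatcher.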
    async fn dispatched_read<R: VortexReadAt + Send + Sync>(
        &self,
        read: Arc<R>,
        range: Range<u64>,
    ) -> VortexResult<ByteBuffer> {
        Ok(self
            .options
            .io_dispatcher
            .dispatch(move || async move { read.read_byte_range(range, Alignment::none()).await })?
            .await??)
    }

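    /// Insert every segment that falls within the initial read buffer into the
    /// initial-read segment cache.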
    fn populate_initial_segments(
        &self,
        initial_offset: u64,
        initial_read: &ByteBuffer,
        footer: &Footer,
    ) {
        let first_idx = footer
            .segment_map()
            .partition_point(|segment| segment.offset < initial_offset);

        for idx in first_idx..footer.segment_map().len() {
            let segment = &footer.segment_map()[idx];
            let segment_id =
                SegmentId::from(u32::try_from(idx).vortex_expect("Invalid segment ID"));
            let offset =
                usize::try_from(segment.offset - initial_offset).vortex_expect("Invalid offset");
            let buffer = initial_read
                .slice(offset..offset + (segment.length as usize))
                .aligned(segment.alignment);
            self.options
                .initial_read_segments
                .insert(segment_id, buffer);
        }
    }
}

#[cfg(feature = "object_store")]
impl VortexOpenOptions<GenericVortexFile> {
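    /// Open a Vortex file stored in an [`object_store::ObjectStore`], preferring a direct
    /// local filesystem read when the path also exists on disk.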
    pub async fn open_object_store(
        mut self,
        object_store: &Arc<dyn object_store::ObjectStore>,
        path: &str,
    ) -> VortexResult<VortexFile> {
        use std::path::Path;

        use vortex_io::ObjectStoreReadAt;

        self.options.io_dispatcher = TOKIO_DISPATCHER.clone();

        // If the object-store path also exists on the local filesystem, prefer
        // reading it directly from disk.
        let local_path = Path::new("/").join(path);
        if local_path.exists() {
            self.open(local_path).await
        } else {
            self.open_read_at(ObjectStoreReadAt::new(
                object_store.clone(),
                path.into(),
                None,
            ))
            .await
        }
    }
}

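/// Options controlling how a generic Vortex file is opened and read.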
pub struct GenericFileOptions {
    /// Cache consulted for segment requests not satisfied by the initial read.
    segment_cache: Arc<dyn SegmentCache>,
    /// Number of bytes speculatively read from the end of the file on open.
    initial_read_size: u64,
    /// Segments captured during the initial read, keyed by segment ID.
    initial_read_segments: DashMap<SegmentId, ByteBuffer>,
    /// Maximum number of coalesced I/O requests in flight at once.
    io_concurrency: usize,
    /// Dispatcher on which all I/O for the file is executed.
    io_dispatcher: IoDispatcher,
}

impl Default for GenericFileOptions {
    fn default() -> Self {
        Self {
            segment_cache: Arc::new(NoOpSegmentCache),
            initial_read_size: 0,
            initial_read_segments: Default::default(),
            io_concurrency: 8,
            io_dispatcher: IoDispatcher::shared(),
        }
    }
}