1use std::ops::Range;
2use std::sync::{Arc, RwLock};
3
4use futures::{StreamExt, pin_mut};
5use vortex_array::aliases::hash_map::HashMap;
6use vortex_buffer::{Alignment, ByteBuffer, ByteBufferMut};
7use vortex_error::{VortexExpect, VortexResult, vortex_err};
8use vortex_io::{Dispatch, InstrumentedReadAt, IoDispatcher, VortexReadAt};
9use vortex_layout::segments::{SegmentEvents, SegmentId, SegmentSource};
10use vortex_metrics::VortexMetrics;
11
12use crate::driver::CoalescedDriver;
13use crate::segments::{
14 InitialReadSegmentCache, MokaSegmentCache, NoOpSegmentCache, SegmentCache, SegmentCacheMetrics,
15 SegmentCacheSourceAdapter,
16};
17use crate::{
18 EOF_SIZE, FileType, Footer, MAX_FOOTER_SIZE, SegmentSourceFactory, SegmentSpec, VortexFile,
19 VortexOpenOptions,
20};
21
// A single lazily-initialized Tokio-backed I/O dispatcher (1 thread), shared by all
// files opened through the tokio-enabled entry points (`open`, `open_object_store`).
#[cfg(feature = "tokio")]
static TOKIO_DISPATCHER: std::sync::LazyLock<IoDispatcher> =
    std::sync::LazyLock::new(|| IoDispatcher::new_tokio(1));
25
26pub struct GenericVortexFile;
31
// Generic files are configured via `GenericFileOptions`.
impl FileType for GenericVortexFile {
    type Options = GenericFileOptions;
}
35
36impl VortexOpenOptions<GenericVortexFile> {
37 const INITIAL_READ_SIZE: u64 = 1 << 20; pub fn file() -> Self {
41 Self::new(Default::default())
42 .with_segment_cache(Arc::new(MokaSegmentCache::new(256 << 20)))
45 .with_initial_read_size(Self::INITIAL_READ_SIZE)
46 }
47
48 pub fn with_initial_read_size(mut self, initial_read_size: u64) -> Self {
50 self.options.initial_read_size = initial_read_size;
51 self
52 }
53
54 pub fn with_segment_cache(mut self, segment_cache: Arc<dyn SegmentCache>) -> Self {
56 self.options.segment_cache = segment_cache;
57 self
58 }
59
60 pub fn without_segment_cache(self) -> Self {
62 self.with_segment_cache(Arc::new(NoOpSegmentCache))
63 }
64
65 pub fn with_io_concurrency(mut self, io_concurrency: usize) -> Self {
66 self.options.io_concurrency = io_concurrency;
67 self
68 }
69
70 #[cfg(feature = "tokio")]
72 pub fn open_blocking(self, read: impl AsRef<std::path::Path>) -> VortexResult<VortexFile> {
73 futures::executor::block_on(self.open(read))
76 }
77
78 #[cfg(feature = "tokio")]
80 pub async fn open(mut self, read: impl AsRef<std::path::Path>) -> VortexResult<VortexFile> {
81 self.options.io_dispatcher = TOKIO_DISPATCHER.clone();
82 self.open_read_at(vortex_io::TokioFile::open(read)?).await
83 }
84
85 pub async fn open_read_at<R: VortexReadAt + Send + Sync>(
88 self,
89 read: R,
90 ) -> VortexResult<VortexFile> {
91 let read = Arc::new(read);
92
93 let footer = if let Some(footer) = self.footer {
94 footer
95 } else {
96 self.read_footer(read.clone()).await?
97 };
98
99 let segment_cache = Arc::new(SegmentCacheMetrics::new(
100 InitialReadSegmentCache {
101 initial: self.options.initial_read_segments,
102 fallback: self.options.segment_cache,
103 },
104 self.metrics.clone(),
105 ));
106
107 let segment_source_factory = Arc::new(GenericVortexFileIo {
108 read,
109 segment_map: footer.segment_map().clone(),
110 segment_cache,
111 io_dispatcher: self.options.io_dispatcher,
112 io_concurrency: self.options.io_concurrency,
113 });
114
115 Ok(VortexFile {
116 footer,
117 segment_source_factory,
118 metrics: self.metrics,
119 })
120 }
121
122 async fn read_footer<R: VortexReadAt + Send + Sync>(
123 &self,
124 read: Arc<R>,
125 ) -> VortexResult<Footer> {
126 let file_size = match self.file_size {
128 None => self.dispatched_size(read.clone()).await?,
129 Some(file_size) => file_size,
130 };
131 let initial_read_size = self
132 .options
133 .initial_read_size
134 .max(MAX_FOOTER_SIZE as u64 + EOF_SIZE as u64)
136 .min(file_size);
137 let mut initial_offset = file_size - initial_read_size;
138 let mut initial_read: ByteBuffer = self
139 .dispatched_read(read.clone(), initial_offset..file_size)
140 .await?;
141
142 let postscript = self.parse_postscript(&initial_read)?;
143
144 let dtype_segment = self
146 .dtype
147 .is_none()
148 .then(|| {
149 postscript.dtype.ok_or_else(|| {
150 vortex_err!(
151 "Vortex file doesn't embed a DType and none provided to VortexOpenOptions"
152 )
153 })
154 })
155 .transpose()?;
156
157 let mut read_more_offset = initial_offset;
160 if let Some(dtype_segment) = &dtype_segment {
161 read_more_offset = read_more_offset.min(dtype_segment.offset);
162 }
163 if let Some(stats_segment) = &postscript.statistics {
164 read_more_offset = read_more_offset.min(stats_segment.offset);
165 }
166 read_more_offset = read_more_offset.min(postscript.layout.offset);
167 read_more_offset = read_more_offset.min(postscript.footer.offset);
168
169 if read_more_offset < initial_offset {
171 log::info!(
172 "Initial read from {} did not cover all footer segments, reading from {}",
173 initial_offset,
174 read_more_offset
175 );
176
177 let mut new_initial_read =
178 ByteBufferMut::with_capacity(usize::try_from(file_size - read_more_offset)?);
179 new_initial_read.extend_from_slice(
180 &self
181 .dispatched_read(read, read_more_offset..initial_offset)
182 .await?,
183 );
184 new_initial_read.extend_from_slice(&initial_read);
185
186 initial_offset = read_more_offset;
187 initial_read = new_initial_read.freeze();
188 }
189
190 let dtype = dtype_segment
192 .map(|segment| self.parse_dtype(initial_offset, &initial_read, &segment))
193 .transpose()?
194 .unwrap_or_else(|| self.dtype.clone().vortex_expect("DType was provided"));
195 let file_stats = postscript
196 .statistics
197 .map(|segment| self.parse_file_statistics(initial_offset, &initial_read, &segment))
198 .transpose()?;
199 let footer = self.parse_footer(
200 initial_offset,
201 &initial_read,
202 &postscript.footer,
203 &postscript.layout,
204 dtype,
205 file_stats,
206 )?;
207
208 self.populate_initial_segments(initial_offset, &initial_read, &footer);
211
212 Ok(footer)
213 }
214
215 async fn dispatched_size<R: VortexReadAt + Send + Sync>(
217 &self,
218 read: Arc<R>,
219 ) -> VortexResult<u64> {
220 Ok(self
221 .options
222 .io_dispatcher
223 .dispatch(move || async move { read.size().await })?
224 .await??)
225 }
226
227 async fn dispatched_read<R: VortexReadAt + Send + Sync>(
229 &self,
230 read: Arc<R>,
231 range: Range<u64>,
232 ) -> VortexResult<ByteBuffer> {
233 Ok(self
234 .options
235 .io_dispatcher
236 .dispatch(move || async move { read.read_byte_range(range, Alignment::none()).await })?
237 .await??)
238 }
239
240 fn populate_initial_segments(
242 &self,
243 initial_offset: u64,
244 initial_read: &ByteBuffer,
245 footer: &Footer,
246 ) {
247 let first_idx = footer
248 .segment_map()
249 .partition_point(|segment| segment.offset < initial_offset);
250
251 let mut initial_segments = self
252 .options
253 .initial_read_segments
254 .write()
255 .vortex_expect("poisoned lock");
256
257 for idx in first_idx..footer.segment_map().len() {
258 let segment = &footer.segment_map()[idx];
259 let segment_id =
260 SegmentId::from(u32::try_from(idx).vortex_expect("Invalid segment ID"));
261 let offset =
262 usize::try_from(segment.offset - initial_offset).vortex_expect("Invalid offset");
263 let buffer = initial_read
264 .slice(offset..offset + (segment.length as usize))
265 .aligned(segment.alignment);
266 initial_segments.insert(segment_id, buffer);
267 }
268 }
269}
270
/// Per-file I/O state used to build segment sources: resolves segment requests
/// against the underlying reader, with caching and request coalescing.
struct GenericVortexFileIo<R> {
    /// The underlying positional reader for the file.
    read: Arc<R>,
    /// Byte ranges (offset/length/alignment) of every segment in the file.
    segment_map: Arc<[SegmentSpec]>,
    /// Cache consulted before issuing reads against `read`.
    segment_cache: Arc<dyn SegmentCache>,
    /// Dispatcher on which the background I/O driver is spawned.
    io_dispatcher: IoDispatcher,
    /// Maximum number of concurrently in-flight coalesced reads.
    io_concurrency: usize,
}
278
279impl<R: VortexReadAt + Send + Sync> SegmentSourceFactory for GenericVortexFileIo<R> {
280 fn segment_source(&self, metrics: VortexMetrics) -> Arc<dyn SegmentSource> {
281 let (segment_source, events) = SegmentEvents::create();
283
284 let segment_source = Arc::new(SegmentCacheSourceAdapter::new(
286 self.segment_cache.clone(),
287 segment_source,
288 ));
289
290 let read = InstrumentedReadAt::new(self.read.clone(), &metrics);
291
292 let driver = CoalescedDriver::new(
293 read.performance_hint(),
294 self.segment_map.clone(),
295 events,
296 metrics,
297 );
298
299 let io_concurrency = self.io_concurrency;
301 self.io_dispatcher
302 .dispatch(move || {
303 async move {
304 let stream = driver
306 .map(|coalesced_req| coalesced_req.launch(&read))
307 .buffer_unordered(io_concurrency);
308 pin_mut!(stream);
309
310 stream.collect::<()>().await
312 }
313 })
314 .vortex_expect("Failed to spawn I/O driver");
315
316 segment_source
317 }
318}
319
#[cfg(feature = "object_store")]
impl VortexOpenOptions<GenericVortexFile> {
    /// Open a Vortex file from an object store.
    ///
    /// If the path also exists on the local filesystem (rooted at `/`), it is
    /// opened directly as a local file; otherwise reads go through the object
    /// store. Either way, I/O runs on the shared Tokio dispatcher.
    pub async fn open_object_store(
        mut self,
        object_store: &Arc<dyn object_store::ObjectStore>,
        path: &str,
    ) -> VortexResult<VortexFile> {
        use std::path::Path;

        use vortex_io::ObjectStoreReadAt;

        self.options.io_dispatcher = TOKIO_DISPATCHER.clone();

        // NOTE(review): treats the object-store path as rooted at `/` when
        // probing the local filesystem — confirm this matches deployment layout.
        let candidate = Path::new("/").join(path);
        match candidate.exists() {
            true => self.open(candidate).await,
            false => {
                let reader = ObjectStoreReadAt::new(object_store.clone(), path.into(), None);
                self.open_read_at(reader).await
            }
        }
    }
}
352
/// Options controlling how a generic Vortex file is opened and read.
pub struct GenericFileOptions {
    /// Cache consulted for segment reads after the initial-read segments.
    segment_cache: Arc<dyn SegmentCache>,
    /// Number of bytes speculatively read from the end of the file at open.
    initial_read_size: u64,
    /// Segments captured from the initial tail read, keyed by segment ID.
    initial_read_segments: RwLock<HashMap<SegmentId, ByteBuffer>>,
    /// Maximum number of concurrently in-flight I/O requests.
    io_concurrency: usize,
    /// Dispatcher on which all file I/O is executed.
    io_dispatcher: IoDispatcher,
}
363
impl Default for GenericFileOptions {
    fn default() -> Self {
        Self {
            // No caching by default; `VortexOpenOptions::file` swaps in a Moka cache.
            segment_cache: Arc::new(NoOpSegmentCache),
            // 0 here still results in a minimum-sized tail read, since the open
            // path clamps to at least MAX_FOOTER_SIZE + EOF_SIZE bytes.
            initial_read_size: 0,
            initial_read_segments: Default::default(),
            // Up to 8 coalesced reads in flight at once.
            io_concurrency: 8,
            io_dispatcher: IoDispatcher::shared(),
        }
    }
}