use std::ops::Range;
use std::sync::Arc;

use dashmap::DashMap;
use futures::{StreamExt, pin_mut};
use vortex_buffer::{Alignment, ByteBuffer, ByteBufferMut};
use vortex_error::{VortexExpect, VortexResult, vortex_err};
use vortex_io::{Dispatch, InstrumentedReadAt, IoDispatcher, VortexReadAt};
use vortex_layout::segments::{SegmentEvents, SegmentId};

use crate::driver::CoalescedDriver;
use crate::segments::{
    InitialReadSegmentCache, MokaSegmentCache, NoOpSegmentCache, SegmentCache, SegmentCacheMetrics,
    SegmentCacheSourceAdapter,
};
use crate::{EOF_SIZE, FileType, Footer, MAX_FOOTER_SIZE, VortexFile, VortexOpenOptions};

#[cfg(feature = "tokio")]
static TOKIO_DISPATCHER: std::sync::LazyLock<IoDispatcher> =
    std::sync::LazyLock::new(|| IoDispatcher::new_tokio(1));
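/// Marker [`FileType`] for Vortex files read from a generic byte source, e.g. a
/// local file or an object store.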
pub struct GenericVortexFile;

impl FileType for GenericVortexFile {
    type Options = GenericFileOptions;
}
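// A minimal usage sketch (assuming the `tokio` feature is enabled; the file path
// is illustrative only):
//
//     let file = VortexOpenOptions::file()
//         .with_io_concurrency(16)
//         .open("example.vortex")
//         .await?;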
impl VortexOpenOptions<GenericVortexFile> {
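    /// Default number of bytes read from the end of the file when opening (1 MiB).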
    const INITIAL_READ_SIZE: u64 = 1 << 20;

    pub fn file() -> Self {
        Self::new(Default::default())
            .with_segment_cache(Arc::new(MokaSegmentCache::new(256 << 20)))
            .with_initial_read_size(Self::INITIAL_READ_SIZE)
    }

    pub fn with_initial_read_size(mut self, initial_read_size: u64) -> Self {
        self.options.initial_read_size = initial_read_size;
        self
    }

    pub fn with_segment_cache(mut self, segment_cache: Arc<dyn SegmentCache>) -> Self {
        self.options.segment_cache = segment_cache;
        self
    }

    pub fn without_segment_cache(self) -> Self {
        self.with_segment_cache(Arc::new(NoOpSegmentCache))
    }

    pub fn with_io_concurrency(mut self, io_concurrency: usize) -> Self {
        self.options.io_concurrency = io_concurrency;
        self
    }
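    // Note: `open_blocking` blocks the calling thread on `open`; the reads
    // themselves still run on the shared `TOKIO_DISPATCHER`.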
    #[cfg(feature = "tokio")]
    pub fn open_blocking(self, read: impl AsRef<std::path::Path>) -> VortexResult<VortexFile> {
        futures::executor::block_on(self.open(read))
    }

    #[cfg(feature = "tokio")]
    pub async fn open(mut self, read: impl AsRef<std::path::Path>) -> VortexResult<VortexFile> {
        self.options.io_dispatcher = TOKIO_DISPATCHER.clone();
        self.open_read_at(vortex_io::TokioFile::open(read)?).await
    }
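    /// Open a Vortex file from any [`VortexReadAt`] implementation.
    ///
    /// Resolves the footer (reading it from the file if one was not supplied),
    /// layers the initial-read segments over the configured segment cache, and
    /// spawns a coalescing I/O driver on the configured dispatcher to service
    /// segment requests.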
    pub async fn open_read_at<R: VortexReadAt + Send + Sync>(
        self,
        read: R,
    ) -> VortexResult<VortexFile> {
        let read = Arc::new(read);

        // Use the pre-parsed footer if one was provided, otherwise read it from the file.
        let footer = if let Some(footer) = self.footer {
            footer
        } else {
            self.read_footer(read.clone()).await?
        };

        // Serve segments from the initial read first, falling back to the configured
        // segment cache, and record cache metrics.
        let segment_cache = Arc::new(SegmentCacheMetrics::new(
            InitialReadSegmentCache {
                initial: self.options.initial_read_segments,
                fallback: self.options.segment_cache,
            },
            self.metrics.clone(),
        ));

        let (events_source, events) = SegmentEvents::create();

        let segment_source = Arc::new(SegmentCacheSourceAdapter::new(segment_cache, events_source));

        let read = InstrumentedReadAt::new(read.clone(), &self.metrics);

        let driver = CoalescedDriver::new(
            read.performance_hint(),
            footer.segment_map().clone(),
            events,
            self.metrics.clone(),
        );

        // Spawn the I/O driver onto the dispatcher, running up to `io_concurrency`
        // coalesced reads at a time.
        let io_concurrency = self.options.io_concurrency;
        self.options
            .io_dispatcher
            .dispatch(move || {
                async move {
                    let stream = driver
                        .map(|coalesced_req| coalesced_req.launch(&read))
                        .buffer_unordered(io_concurrency);
                    pin_mut!(stream);

                    // Drive all coalesced reads to completion.
                    stream.collect::<()>().await
                }
            })
            .vortex_expect("Failed to spawn I/O driver");

        Ok(VortexFile {
            footer,
            segment_source,
            metrics: self.metrics,
        })
    }
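    /// Read and parse the footer.
    ///
    /// Fetches the last `initial_read_size` bytes of the file, parses the
    /// postscript, and issues at most one additional read if any footer segment
    /// (dtype, statistics, layout, or footer) begins before the initial window.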
    async fn read_footer<R: VortexReadAt + Send + Sync>(
        &self,
        read: Arc<R>,
    ) -> VortexResult<Footer> {
        let file_size = match self.file_size {
            None => self.dispatched_size(read.clone()).await?,
            Some(file_size) => file_size,
        };
        let initial_read_size = self
            .options
            .initial_read_size
            .max(MAX_FOOTER_SIZE as u64 + EOF_SIZE as u64)
            .min(file_size);
        let mut initial_offset = file_size - initial_read_size;
        let mut initial_read: ByteBuffer = self
            .dispatched_read(read.clone(), initial_offset..file_size)
            .await?;

        let postscript = self.parse_postscript(&initial_read)?;

        // The DType may be provided externally; otherwise the file must embed one.
        let dtype_segment = self
            .dtype
            .is_none()
            .then(|| {
                postscript.dtype.ok_or_else(|| {
                    vortex_err!(
                        "Vortex file doesn't embed a DType and none provided to VortexOpenOptions"
                    )
                })
            })
            .transpose()?;

        // Find the earliest offset required by any of the footer segments.
        let mut read_more_offset = initial_offset;
        if let Some(dtype_segment) = &dtype_segment {
            read_more_offset = read_more_offset.min(dtype_segment.offset);
        }
        if let Some(stats_segment) = &postscript.statistics {
            read_more_offset = read_more_offset.min(stats_segment.offset);
        }
        read_more_offset = read_more_offset.min(postscript.layout.offset);
        read_more_offset = read_more_offset.min(postscript.footer.offset);
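        // Illustration with made-up numbers: for a 10 MiB file and a 1 MiB initial
        // read, initial_offset is 9 MiB; if the layout segment starts at 8.5 MiB,
        // read_more_offset drops to 8.5 MiB and the block below prepends the
        // missing 0.5 MiB to the buffer.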
        if read_more_offset < initial_offset {
            log::info!(
                "Initial read from {initial_offset} did not cover all footer segments, reading from {read_more_offset}"
            );

            let mut new_initial_read =
                ByteBufferMut::with_capacity(usize::try_from(file_size - read_more_offset)?);
            new_initial_read.extend_from_slice(
                &self
                    .dispatched_read(read, read_more_offset..initial_offset)
                    .await?,
            );
            new_initial_read.extend_from_slice(&initial_read);

            initial_offset = read_more_offset;
            initial_read = new_initial_read.freeze();
        }

        let dtype = dtype_segment
            .map(|segment| self.parse_dtype(initial_offset, &initial_read, &segment))
            .transpose()?
            .unwrap_or_else(|| self.dtype.clone().vortex_expect("DType was provided"));
        let file_stats = postscript
            .statistics
            .map(|segment| self.parse_file_statistics(initial_offset, &initial_read, &segment))
            .transpose()?;
        let footer = self.parse_footer(
            initial_offset,
            &initial_read,
            &postscript.footer,
            &postscript.layout,
            dtype,
            file_stats,
        )?;

        // Seed the segment cache with any segments covered by the initial read.
        self.populate_initial_segments(initial_offset, &initial_read, &footer);

        Ok(footer)
    }

    async fn dispatched_size<R: VortexReadAt + Send + Sync>(
        &self,
        read: Arc<R>,
    ) -> VortexResult<u64> {
        Ok(self
            .options
            .io_dispatcher
            .dispatch(move || async move { read.size().await })?
            .await??)
    }

    async fn dispatched_read<R: VortexReadAt + Send + Sync>(
        &self,
        read: Arc<R>,
        range: Range<u64>,
    ) -> VortexResult<ByteBuffer> {
        Ok(self
            .options
            .io_dispatcher
            .dispatch(move || async move { read.read_byte_range(range, Alignment::none()).await })?
            .await??)
    }
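    /// Copy every segment that lies within the initial tail read into the
    /// initial-read segment map, so it can be served without further I/O.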
    fn populate_initial_segments(
        &self,
        initial_offset: u64,
        initial_read: &ByteBuffer,
        footer: &Footer,
    ) {
        // The segment map is ordered by file offset, so every segment from
        // `first_idx` onwards falls inside the initial read.
        let first_idx = footer
            .segment_map()
            .partition_point(|segment| segment.offset < initial_offset);

        for idx in first_idx..footer.segment_map().len() {
            let segment = &footer.segment_map()[idx];
            let segment_id =
                SegmentId::from(u32::try_from(idx).vortex_expect("Invalid segment ID"));
            let offset =
                usize::try_from(segment.offset - initial_offset).vortex_expect("Invalid offset");
            let buffer = initial_read
                .slice(offset..offset + (segment.length as usize))
                .aligned(segment.alignment);
            self.options
                .initial_read_segments
                .insert(segment_id, buffer);
        }
    }
}
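// A minimal sketch of opening from an object store (assuming the `object_store`
// feature is enabled; the store construction and path are illustrative only):
//
//     let store: Arc<dyn object_store::ObjectStore> =
//         Arc::new(object_store::memory::InMemory::new());
//     let file = VortexOpenOptions::file()
//         .open_object_store(&store, "data/example.vortex")
//         .await?;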
#[cfg(feature = "object_store")]
impl VortexOpenOptions<GenericVortexFile> {
    pub async fn open_object_store(
        mut self,
        object_store: &Arc<dyn object_store::ObjectStore>,
        path: &str,
    ) -> VortexResult<VortexFile> {
        use std::path::Path;

        use vortex_io::ObjectStoreReadAt;

        self.options.io_dispatcher = TOKIO_DISPATCHER.clone();

        // If the object resolves to a file on the local filesystem, open it directly
        // rather than going through the object store.
        let local_path = Path::new("/").join(path);
        if local_path.exists() {
            self.open(local_path).await
        } else {
            self.open_read_at(ObjectStoreReadAt::new(
                object_store.clone(),
                path.into(),
                None,
            ))
            .await
        }
    }
}
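/// Options for opening a [`GenericVortexFile`].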
pub struct GenericFileOptions {
    segment_cache: Arc<dyn SegmentCache>,
    initial_read_size: u64,
    initial_read_segments: DashMap<SegmentId, ByteBuffer>,
    /// Maximum number of coalesced I/O requests in flight at once.
    io_concurrency: usize,
    /// Dispatcher used to run all I/O for this file.
    io_dispatcher: IoDispatcher,
}

impl Default for GenericFileOptions {
    fn default() -> Self {
        Self {
            segment_cache: Arc::new(NoOpSegmentCache),
            initial_read_size: 0,
            initial_read_segments: Default::default(),
            io_concurrency: 8,
            io_dispatcher: IoDispatcher::shared(),
        }
    }
}