vortex_file/generic.rs

1use std::ops::Range;
2use std::sync::{Arc, RwLock};
3
4use futures::{StreamExt, pin_mut};
5use vortex_array::aliases::hash_map::HashMap;
6use vortex_buffer::{Alignment, ByteBuffer, ByteBufferMut};
7use vortex_error::{VortexExpect, VortexResult, vortex_err};
8use vortex_io::{Dispatch, InstrumentedReadAt, IoDispatcher, VortexReadAt};
9use vortex_layout::segments::{SegmentEvents, SegmentId, SegmentSource};
10use vortex_metrics::VortexMetrics;
11
12use crate::driver::CoalescedDriver;
13use crate::segments::{
14    InitialReadSegmentCache, MokaSegmentCache, NoOpSegmentCache, SegmentCache, SegmentCacheMetrics,
15    SegmentCacheSourceAdapter,
16};
17use crate::{
18    EOF_SIZE, FileType, Footer, MAX_FOOTER_SIZE, SegmentSourceFactory, SegmentSpec, VortexFile,
19    VortexOpenOptions,
20};
21
// Lazily-initialized Tokio-backed I/O dispatcher (1 thread), shared by all files opened
// through the tokio-based entry points in this module (`open`, `open_object_store`).
#[cfg(feature = "tokio")]
static TOKIO_DISPATCHER: std::sync::LazyLock<IoDispatcher> =
    std::sync::LazyLock::new(|| IoDispatcher::new_tokio(1));
25
/// A type of Vortex file that supports any [`VortexReadAt`] implementation.
///
/// This is a reasonable choice for files backed by a network since it performs I/O coalescing.
// TODO(ngates): rename to TokioVortexFile
pub struct GenericVortexFile;

// Marker impl wiring `GenericVortexFile` to its option type for `VortexOpenOptions`.
impl FileType for GenericVortexFile {
    type Options = GenericFileOptions;
}
35
impl VortexOpenOptions<GenericVortexFile> {
    // Default number of bytes speculatively fetched from the tail of the file, chosen so
    // that the postscript and footer of typical files are covered by a single read.
    const INITIAL_READ_SIZE: u64 = 1 << 20; // 1 MB

    /// Open a file using the provided [`VortexReadAt`] implementation.
    pub fn file() -> Self {
        Self::new(Default::default())
            // Start with an initial in-memory cache of 256MB.
            // TODO(ngates): would it be better to default to a home directory disk cache?
            .with_segment_cache(Arc::new(MokaSegmentCache::new(256 << 20)))
            .with_initial_read_size(Self::INITIAL_READ_SIZE)
    }

    /// Configure the initial read size for the Vortex file.
    ///
    /// A larger value increases the chance that the footer (and any trailing segments)
    /// are captured by a single tail read; `read_footer` still clamps it to at least the
    /// postscript size and at most the file size.
    pub fn with_initial_read_size(mut self, initial_read_size: u64) -> Self {
        self.options.initial_read_size = initial_read_size;
        self
    }

    /// Configure a custom [`SegmentCache`].
    pub fn with_segment_cache(mut self, segment_cache: Arc<dyn SegmentCache>) -> Self {
        self.options.segment_cache = segment_cache;
        self
    }

    /// Disable segment caching entirely.
    pub fn without_segment_cache(self) -> Self {
        self.with_segment_cache(Arc::new(NoOpSegmentCache))
    }

    /// Configure the number of concurrent I/O requests spawned by the I/O driver.
    pub fn with_io_concurrency(mut self, io_concurrency: usize) -> Self {
        self.options.io_concurrency = io_concurrency;
        self
    }

    /// Blocking call to open a Vortex file using the provided [`std::path::Path`].
    #[cfg(feature = "tokio")]
    pub fn open_blocking(self, read: impl AsRef<std::path::Path>) -> VortexResult<VortexFile> {
        // Since we dispatch all I/O to a dedicated Tokio dispatcher thread, we can just
        // block-on the async call to open.
        futures::executor::block_on(self.open(read))
    }

    /// Open a Vortex file using the provided [`std::path::Path`].
    #[cfg(feature = "tokio")]
    pub async fn open(mut self, read: impl AsRef<std::path::Path>) -> VortexResult<VortexFile> {
        // File I/O is forced onto the shared Tokio dispatcher, overriding any configured one.
        self.options.io_dispatcher = TOKIO_DISPATCHER.clone();
        self.open_read_at(vortex_io::TokioFile::open(read)?).await
    }

    /// Low-level API for opening any [`VortexReadAt`]. Note that the user is responsible for
    /// ensuring the `VortexReadAt` implementation is compatible with the chosen I/O dispatcher.
    pub async fn open_read_at<R: VortexReadAt + Send + Sync>(
        self,
        read: R,
    ) -> VortexResult<VortexFile> {
        let read = Arc::new(read);

        // Use a pre-supplied footer if the caller provided one, otherwise read it from
        // the tail of the file.
        let footer = if let Some(footer) = self.footer {
            footer
        } else {
            self.read_footer(read.clone()).await?
        };

        // Layer the caches: segments captured by the initial read are consulted first,
        // falling back to the configured cache; the whole stack reports metrics.
        let segment_cache = Arc::new(SegmentCacheMetrics::new(
            InitialReadSegmentCache {
                initial: self.options.initial_read_segments,
                fallback: self.options.segment_cache,
            },
            self.metrics.clone(),
        ));

        let segment_source_factory = Arc::new(GenericVortexFileIo {
            read,
            segment_map: footer.segment_map().clone(),
            segment_cache,
            io_dispatcher: self.options.io_dispatcher,
            io_concurrency: self.options.io_concurrency,
        });

        Ok(VortexFile {
            footer,
            segment_source_factory,
            metrics: self.metrics,
        })
    }

    /// Read and parse the file footer: one speculative tail read, plus a second, larger
    /// read if the first did not reach back far enough to cover every footer segment.
    async fn read_footer<R: VortexReadAt + Send + Sync>(
        &self,
        read: Arc<R>,
    ) -> VortexResult<Footer> {
        // Fetch the file size and perform the initial read.
        let file_size = match self.file_size {
            None => self.dispatched_size(read.clone()).await?,
            Some(file_size) => file_size,
        };
        let initial_read_size = self
            .options
            .initial_read_size
            // Make sure we read enough to cover the postscript
            .max(MAX_FOOTER_SIZE as u64 + EOF_SIZE as u64)
            // ...but never more than the whole file.
            .min(file_size);
        let mut initial_offset = file_size - initial_read_size;
        let mut initial_read: ByteBuffer = self
            .dispatched_read(read.clone(), initial_offset..file_size)
            .await?;

        let postscript = self.parse_postscript(&initial_read)?;

        // If we haven't been provided a DType, we must read one from the file.
        let dtype_segment = self
            .dtype
            .is_none()
            .then(|| {
                postscript.dtype.ok_or_else(|| {
                    vortex_err!(
                        "Vortex file doesn't embed a DType and none provided to VortexOpenOptions"
                    )
                })
            })
            .transpose()?;

        // The other postscript segments are required, so now we figure out the lowest
        // offset that covers all of the required segments.
        let mut read_more_offset = initial_offset;
        if let Some(dtype_segment) = &dtype_segment {
            read_more_offset = read_more_offset.min(dtype_segment.offset);
        }
        if let Some(stats_segment) = &postscript.statistics {
            read_more_offset = read_more_offset.min(stats_segment.offset);
        }
        read_more_offset = read_more_offset.min(postscript.layout.offset);
        read_more_offset = read_more_offset.min(postscript.footer.offset);

        // Read more bytes if necessary.
        if read_more_offset < initial_offset {
            log::info!(
                "Initial read from {} did not cover all footer segments, reading from {}",
                initial_offset,
                read_more_offset
            );

            // Prepend the newly-read bytes to the ones we already hold, so the buffer
            // contiguously spans `read_more_offset..file_size`.
            let mut new_initial_read =
                ByteBufferMut::with_capacity(usize::try_from(file_size - read_more_offset)?);
            new_initial_read.extend_from_slice(
                &self
                    .dispatched_read(read, read_more_offset..initial_offset)
                    .await?,
            );
            new_initial_read.extend_from_slice(&initial_read);

            initial_offset = read_more_offset;
            initial_read = new_initial_read.freeze();
        }

        // Now we read our initial segments.
        let dtype = dtype_segment
            .map(|segment| self.parse_dtype(initial_offset, &initial_read, &segment))
            .transpose()?
            .unwrap_or_else(|| self.dtype.clone().vortex_expect("DType was provided"));
        let file_stats = postscript
            .statistics
            .map(|segment| self.parse_file_statistics(initial_offset, &initial_read, &segment))
            .transpose()?;
        let footer = self.parse_footer(
            initial_offset,
            &initial_read,
            &postscript.footer,
            &postscript.layout,
            dtype,
            file_stats,
        )?;

        // If the initial read happened to cover any segments, then we can populate the
        // segment cache
        self.populate_initial_segments(initial_offset, &initial_read, &footer);

        Ok(footer)
    }

    /// Dispatch a [`VortexReadAt::size`] request onto the configured I/O dispatcher.
    async fn dispatched_size<R: VortexReadAt + Send + Sync>(
        &self,
        read: Arc<R>,
    ) -> VortexResult<u64> {
        // The `?` after dispatch propagates spawn failures; the trailing `??` unwraps
        // the join result and then the I/O result.
        Ok(self
            .options
            .io_dispatcher
            .dispatch(move || async move { read.size().await })?
            .await??)
    }

    /// Dispatch a read onto the configured I/O dispatcher.
    async fn dispatched_read<R: VortexReadAt + Send + Sync>(
        &self,
        read: Arc<R>,
        range: Range<u64>,
    ) -> VortexResult<ByteBuffer> {
        Ok(self
            .options
            .io_dispatcher
            .dispatch(move || async move { read.read_byte_range(range, Alignment::none()).await })?
            .await??)
    }

    /// Populate segments in the cache that were covered by the initial read.
    fn populate_initial_segments(
        &self,
        initial_offset: u64,
        initial_read: &ByteBuffer,
        footer: &Footer,
    ) {
        // Index of the first segment starting at or after the initial read offset; relies
        // on the segment map being ordered by offset (a `partition_point` precondition).
        let first_idx = footer
            .segment_map()
            .partition_point(|segment| segment.offset < initial_offset);

        let mut initial_segments = self
            .options
            .initial_read_segments
            .write()
            .vortex_expect("poisoned lock");

        for idx in first_idx..footer.segment_map().len() {
            let segment = &footer.segment_map()[idx];
            // A segment's ID is its index within the segment map.
            let segment_id =
                SegmentId::from(u32::try_from(idx).vortex_expect("Invalid segment ID"));
            let offset =
                usize::try_from(segment.offset - initial_offset).vortex_expect("Invalid offset");
            // Slice the segment's bytes out of the initial read and re-align to the
            // segment's required alignment.
            let buffer = initial_read
                .slice(offset..offset + (segment.length as usize))
                .aligned(segment.alignment);
            initial_segments.insert(segment_id, buffer);
        }
    }
}
270
/// Shared state from which segment sources are created for a generic Vortex file.
struct GenericVortexFileIo<R> {
    /// The underlying reader, shared by every segment source created from this factory.
    read: Arc<R>,
    /// Byte ranges (offset, length, alignment) of every segment in the file.
    segment_map: Arc<[SegmentSpec]>,
    /// Cache consulted before issuing real I/O for a segment.
    segment_cache: Arc<dyn SegmentCache>,
    /// Dispatcher onto which the I/O driver task is spawned.
    io_dispatcher: IoDispatcher,
    /// Maximum number of coalesced I/O requests in flight at once.
    io_concurrency: usize,
}
278
279impl<R: VortexReadAt + Send + Sync> SegmentSourceFactory for GenericVortexFileIo<R> {
280    fn segment_source(&self, metrics: VortexMetrics) -> Arc<dyn SegmentSource> {
281        // We use segment events for driving I/O.
282        let (segment_source, events) = SegmentEvents::create();
283
284        // Wrap the source to resolve segments from the initial read cache.
285        let segment_source = Arc::new(SegmentCacheSourceAdapter::new(
286            self.segment_cache.clone(),
287            segment_source,
288        ));
289
290        let read = InstrumentedReadAt::new(self.read.clone(), &metrics);
291
292        let driver = CoalescedDriver::new(
293            read.performance_hint(),
294            self.segment_map.clone(),
295            events,
296            metrics,
297        );
298
299        // Spawn an I/O driver onto the dispatcher.
300        let io_concurrency = self.io_concurrency;
301        self.io_dispatcher
302            .dispatch(move || {
303                async move {
304                    // Drive the segment event stream.
305                    let stream = driver
306                        .map(|coalesced_req| coalesced_req.launch(&read))
307                        .buffer_unordered(io_concurrency);
308                    pin_mut!(stream);
309
310                    // Drive the stream to completion.
311                    stream.collect::<()>().await
312                }
313            })
314            .vortex_expect("Failed to spawn I/O driver");
315
316        segment_source
317    }
318}
319
#[cfg(feature = "object_store")]
impl VortexOpenOptions<GenericVortexFile> {
    /// Open a Vortex file stored in an [`object_store::ObjectStore`] at the given path.
    pub async fn open_object_store(
        mut self,
        object_store: &Arc<dyn object_store::ObjectStore>,
        path: &str,
    ) -> VortexResult<VortexFile> {
        use std::path::Path;

        use vortex_io::ObjectStoreReadAt;

        // Object store _must_ use tokio for I/O.
        self.options.io_dispatcher = TOKIO_DISPATCHER.clone();

        // When the path resolves to a local file we much prefer TokioFile, since object
        // store re-opens the file on every read. This existence check is a little
        // naive... but we hope that ObjectStore will soon expose the scheme in a way
        // that we can check more thoroughly.
        // See: https://github.com/apache/arrow-rs-object-store/issues/259
        let candidate = Path::new("/").join(path);
        if !candidate.exists() {
            return self
                .open_read_at(ObjectStoreReadAt::new(
                    object_store.clone(),
                    path.into(),
                    None,
                ))
                .await;
        }

        // Local disk is too fast to justify prefetching.
        self.open(candidate).await
    }
}
352
/// Options controlling how a [`GenericVortexFile`] is opened.
pub struct GenericFileOptions {
    /// Cache used for segments once the file is open (fallback behind the initial-read cache).
    segment_cache: Arc<dyn SegmentCache>,
    /// Number of bytes speculatively read from the end of the file to capture the footer.
    initial_read_size: u64,
    /// Segments whose bytes were already covered by the initial tail read.
    initial_read_segments: RwLock<HashMap<SegmentId, ByteBuffer>>,
    /// The number of concurrent I/O requests to spawn.
    /// This should be smaller than execution concurrency for coalescing to occur.
    io_concurrency: usize,
    /// The dispatcher to use for I/O requests.
    io_dispatcher: IoDispatcher,
}
363
impl Default for GenericFileOptions {
    fn default() -> Self {
        Self {
            // No caching by default; `VortexOpenOptions::file` swaps in a Moka cache.
            segment_cache: Arc::new(NoOpSegmentCache),
            // 0 here; open paths raise it to at least the postscript size (see `read_footer`).
            initial_read_size: 0,
            initial_read_segments: Default::default(),
            io_concurrency: 8,
            io_dispatcher: IoDispatcher::shared(),
        }
    }
}