vortex_file/
generic.rs

1use std::ops::Range;
2use std::sync::Arc;
3
4use dashmap::DashMap;
5use futures::{StreamExt, pin_mut};
6use vortex_buffer::{Alignment, ByteBuffer, ByteBufferMut};
7use vortex_error::{VortexExpect, VortexResult, vortex_err};
8use vortex_io::{Dispatch, InstrumentedReadAt, IoDispatcher, VortexReadAt};
9use vortex_layout::segments::{SegmentEvents, SegmentId, SegmentSource};
10use vortex_metrics::VortexMetrics;
11
12use crate::driver::CoalescedDriver;
13use crate::segments::{
14    InitialReadSegmentCache, MokaSegmentCache, NoOpSegmentCache, SegmentCache, SegmentCacheMetrics,
15    SegmentCacheSourceAdapter,
16};
17use crate::{
18    EOF_SIZE, FileType, Footer, MAX_FOOTER_SIZE, SegmentSourceFactory, SegmentSpec, VortexFile,
19    VortexOpenOptions,
20};
21
/// Process-wide Tokio I/O dispatcher, constructed lazily on first access and then shared by
/// every file opened through the Tokio-backed entry points (`open`, `open_object_store`).
#[cfg(feature = "tokio")]
static TOKIO_DISPATCHER: std::sync::LazyLock<IoDispatcher> =
    std::sync::LazyLock::new(build_tokio_dispatcher);

/// Initializer for [`TOKIO_DISPATCHER`].
///
/// NOTE(review): the `1` is presumably the number of dispatcher threads (the `open_blocking`
/// comment refers to "a dedicated Tokio dispatcher thread") — confirm against `vortex_io`.
#[cfg(feature = "tokio")]
fn build_tokio_dispatcher() -> IoDispatcher {
    IoDispatcher::new_tokio(1)
}
25
/// A type of Vortex file that supports any [`VortexReadAt`] implementation.
///
/// Segment reads are coalesced into larger requests and executed on an [`IoDispatcher`] with
/// bounded concurrency. This is a reasonable choice for files backed by a network since it
/// performs I/O coalescing.
// TODO(ngates): rename to TokioVortexFile
pub struct GenericVortexFile;
31
impl FileType for GenericVortexFile {
    /// Options consumed by `VortexOpenOptions<GenericVortexFile>` when opening the file
    /// (segment cache, initial read size, I/O concurrency and dispatcher).
    type Options = GenericFileOptions;
}
35
36impl VortexOpenOptions<GenericVortexFile> {
37    const INITIAL_READ_SIZE: u64 = 1 << 20; // 1 MB
38
39    /// Open a file using the provided [`VortexReadAt`] implementation.
40    pub fn file() -> Self {
41        Self::new(Default::default())
42            // Start with an initial in-memory cache of 256MB.
43            // TODO(ngates): would it be better to default to a home directory disk cache?
44            .with_segment_cache(Arc::new(MokaSegmentCache::new(256 << 20)))
45            .with_initial_read_size(Self::INITIAL_READ_SIZE)
46    }
47
48    /// Configure the initial read size for the Vortex file.
49    pub fn with_initial_read_size(mut self, initial_read_size: u64) -> Self {
50        self.options.initial_read_size = initial_read_size;
51        self
52    }
53
54    /// Configure a custom [`SegmentCache`].
55    pub fn with_segment_cache(mut self, segment_cache: Arc<dyn SegmentCache>) -> Self {
56        self.options.segment_cache = segment_cache;
57        self
58    }
59
60    /// Disable segment caching entirely.
61    pub fn without_segment_cache(self) -> Self {
62        self.with_segment_cache(Arc::new(NoOpSegmentCache))
63    }
64
65    pub fn with_io_concurrency(mut self, io_concurrency: usize) -> Self {
66        self.options.io_concurrency = io_concurrency;
67        self
68    }
69
70    /// Blocking call to open a Vortex file using the provided [`std::path::Path`].
71    #[cfg(feature = "tokio")]
72    pub fn open_blocking(self, read: impl AsRef<std::path::Path>) -> VortexResult<VortexFile> {
73        // Since we dispatch all I/O to a dedicated Tokio dispatcher thread, we can just
74        // block-on the async call to open.
75        futures::executor::block_on(self.open(read))
76    }
77
78    /// Open a Vortex file using the provided [`std::path::Path`].
79    #[cfg(feature = "tokio")]
80    pub async fn open(mut self, read: impl AsRef<std::path::Path>) -> VortexResult<VortexFile> {
81        self.options.io_dispatcher = TOKIO_DISPATCHER.clone();
82        self.open_read_at(vortex_io::TokioFile::open(read)?).await
83    }
84
85    /// Low-level API for opening any [`VortexReadAt`]. Note that the user is responsible for
86    /// ensuring the `VortexReadAt` implementation is compatible with the chosen I/O dispatcher.
87    pub async fn open_read_at<R: VortexReadAt + Send + Sync>(
88        self,
89        read: R,
90    ) -> VortexResult<VortexFile> {
91        let read = Arc::new(read);
92
93        let footer = if let Some(footer) = self.footer {
94            footer
95        } else {
96            self.read_footer(read.clone()).await?
97        };
98
99        let segment_cache = Arc::new(SegmentCacheMetrics::new(
100            InitialReadSegmentCache {
101                initial: self.options.initial_read_segments,
102                fallback: self.options.segment_cache,
103            },
104            self.metrics.clone(),
105        ));
106
107        let segment_source_factory = Arc::new(GenericVortexFileIo {
108            read,
109            segment_map: footer.segment_map().clone(),
110            segment_cache,
111            io_dispatcher: self.options.io_dispatcher,
112            io_concurrency: self.options.io_concurrency,
113        });
114
115        Ok(VortexFile {
116            footer,
117            segment_source_factory,
118            metrics: self.metrics,
119        })
120    }
121
122    async fn read_footer<R: VortexReadAt + Send + Sync>(
123        &self,
124        read: Arc<R>,
125    ) -> VortexResult<Footer> {
126        // Fetch the file size and perform the initial read.
127        let file_size = match self.file_size {
128            None => self.dispatched_size(read.clone()).await?,
129            Some(file_size) => file_size,
130        };
131        let initial_read_size = self
132            .options
133            .initial_read_size
134            // Make sure we read enough to cover the postscript
135            .max(MAX_FOOTER_SIZE as u64 + EOF_SIZE as u64)
136            .min(file_size);
137        let mut initial_offset = file_size - initial_read_size;
138        let mut initial_read: ByteBuffer = self
139            .dispatched_read(read.clone(), initial_offset..file_size)
140            .await?;
141
142        let postscript = self.parse_postscript(&initial_read)?;
143
144        // If we haven't been provided a DType, we must read one from the file.
145        let dtype_segment = self
146            .dtype
147            .is_none()
148            .then(|| {
149                postscript.dtype.ok_or_else(|| {
150                    vortex_err!(
151                        "Vortex file doesn't embed a DType and none provided to VortexOpenOptions"
152                    )
153                })
154            })
155            .transpose()?;
156
157        // The other postscript segments are required, so now we figure out our the offset that
158        // contains all the required segments.
159        let mut read_more_offset = initial_offset;
160        if let Some(dtype_segment) = &dtype_segment {
161            read_more_offset = read_more_offset.min(dtype_segment.offset);
162        }
163        if let Some(stats_segment) = &postscript.statistics {
164            read_more_offset = read_more_offset.min(stats_segment.offset);
165        }
166        read_more_offset = read_more_offset.min(postscript.layout.offset);
167        read_more_offset = read_more_offset.min(postscript.footer.offset);
168
169        // Read more bytes if necessary.
170        if read_more_offset < initial_offset {
171            log::info!(
172                "Initial read from {} did not cover all footer segments, reading from {}",
173                initial_offset,
174                read_more_offset
175            );
176
177            let mut new_initial_read =
178                ByteBufferMut::with_capacity(usize::try_from(file_size - read_more_offset)?);
179            new_initial_read.extend_from_slice(
180                &self
181                    .dispatched_read(read, read_more_offset..initial_offset)
182                    .await?,
183            );
184            new_initial_read.extend_from_slice(&initial_read);
185
186            initial_offset = read_more_offset;
187            initial_read = new_initial_read.freeze();
188        }
189
190        // Now we read our initial segments.
191        let dtype = dtype_segment
192            .map(|segment| self.parse_dtype(initial_offset, &initial_read, &segment))
193            .transpose()?
194            .unwrap_or_else(|| self.dtype.clone().vortex_expect("DType was provided"));
195        let file_stats = postscript
196            .statistics
197            .map(|segment| self.parse_file_statistics(initial_offset, &initial_read, &segment))
198            .transpose()?;
199        let footer = self.parse_footer(
200            initial_offset,
201            &initial_read,
202            &postscript.footer,
203            &postscript.layout,
204            dtype,
205            file_stats,
206        )?;
207
208        // If the initial read happened to cover any segments, then we can populate the
209        // segment cache
210        self.populate_initial_segments(initial_offset, &initial_read, &footer);
211
212        Ok(footer)
213    }
214
215    /// Dispatch a [`VortexReadAt::size`] request onto the configured I/O dispatcher.
216    async fn dispatched_size<R: VortexReadAt + Send + Sync>(
217        &self,
218        read: Arc<R>,
219    ) -> VortexResult<u64> {
220        Ok(self
221            .options
222            .io_dispatcher
223            .dispatch(move || async move { read.size().await })?
224            .await??)
225    }
226
227    /// Dispatch a read onto the configured I/O dispatcher.
228    async fn dispatched_read<R: VortexReadAt + Send + Sync>(
229        &self,
230        read: Arc<R>,
231        range: Range<u64>,
232    ) -> VortexResult<ByteBuffer> {
233        Ok(self
234            .options
235            .io_dispatcher
236            .dispatch(move || async move { read.read_byte_range(range, Alignment::none()).await })?
237            .await??)
238    }
239
240    /// Populate segments in the cache that were covered by the initial read.
241    fn populate_initial_segments(
242        &self,
243        initial_offset: u64,
244        initial_read: &ByteBuffer,
245        footer: &Footer,
246    ) {
247        let first_idx = footer
248            .segment_map()
249            .partition_point(|segment| segment.offset < initial_offset);
250
251        for idx in first_idx..footer.segment_map().len() {
252            let segment = &footer.segment_map()[idx];
253            let segment_id =
254                SegmentId::from(u32::try_from(idx).vortex_expect("Invalid segment ID"));
255            let offset =
256                usize::try_from(segment.offset - initial_offset).vortex_expect("Invalid offset");
257            let buffer = initial_read
258                .slice(offset..offset + (segment.length as usize))
259                .aligned(segment.alignment);
260            self.options
261                .initial_read_segments
262                .insert(segment_id, buffer);
263        }
264    }
265}
266
/// Segment-source factory for an opened [`GenericVortexFile`].
///
/// Holds the shared state that every segment source created for the file needs.
struct GenericVortexFileIo<R> {
    /// The underlying reader, shared by every I/O driver spawned from this factory.
    read: Arc<R>,
    /// Location specs for every segment in the file; a segment's index is its segment ID.
    segment_map: Arc<[SegmentSpec]>,
    /// Cache used to resolve segments before falling back to issuing I/O.
    segment_cache: Arc<dyn SegmentCache>,
    /// Dispatcher onto which each segment source's I/O driver is spawned.
    io_dispatcher: IoDispatcher,
    /// Maximum number of coalesced requests in flight per spawned I/O driver.
    io_concurrency: usize,
}
274
275impl<R: VortexReadAt + Send + Sync> SegmentSourceFactory for GenericVortexFileIo<R> {
276    fn segment_source(&self, metrics: VortexMetrics) -> Arc<dyn SegmentSource> {
277        // We use segment events for driving I/O.
278        let (segment_source, events) = SegmentEvents::create();
279
280        // Wrap the source to resolve segments from the initial read cache.
281        let segment_source = Arc::new(SegmentCacheSourceAdapter::new(
282            self.segment_cache.clone(),
283            segment_source,
284        ));
285
286        let read = InstrumentedReadAt::new(self.read.clone(), &metrics);
287
288        let driver = CoalescedDriver::new(
289            read.performance_hint(),
290            self.segment_map.clone(),
291            events,
292            metrics,
293        );
294
295        // Spawn an I/O driver onto the dispatcher.
296        let io_concurrency = self.io_concurrency;
297        self.io_dispatcher
298            .dispatch(move || {
299                async move {
300                    // Drive the segment event stream.
301                    let stream = driver
302                        .map(|coalesced_req| coalesced_req.launch(&read))
303                        .buffer_unordered(io_concurrency);
304                    pin_mut!(stream);
305
306                    // Drive the stream to completion.
307                    stream.collect::<()>().await
308                }
309            })
310            .vortex_expect("Failed to spawn I/O driver");
311
312        segment_source
313    }
314}
315
#[cfg(feature = "object_store")]
impl VortexOpenOptions<GenericVortexFile> {
    /// Open the Vortex file stored at `path` in `object_store`.
    ///
    /// I/O always runs on the shared Tokio dispatcher. If `path` also resolves to an existing
    /// file when joined onto the local root (`/`), that local file is opened directly instead
    /// of going through the object store.
    pub async fn open_object_store(
        mut self,
        object_store: &Arc<dyn object_store::ObjectStore>,
        path: &str,
    ) -> VortexResult<VortexFile> {
        use std::path::Path;

        use vortex_io::ObjectStoreReadAt;

        // Object store _must_ use tokio for I/O.
        self.options.io_dispatcher = TOKIO_DISPATCHER.clone();

        // If the file is local, we much prefer to use TokioFile since object store re-opens the
        // file on every read. This check is a little naive... but we hope that ObjectStore will
        // soon expose the scheme in a way that we can check more thoroughly.
        // See: https://github.com/apache/arrow-rs-object-store/issues/259
        let local_path = Path::new("/").join(path);
        if !local_path.exists() {
            let remote = ObjectStoreReadAt::new(Arc::clone(object_store), path.into(), None);
            return self.open_read_at(remote).await;
        }

        // Local disk is too fast to justify prefetching.
        self.open(local_path).await
    }
}
348
/// Open options specific to [`GenericVortexFile`].
pub struct GenericFileOptions {
    /// Cache consulted for segments that were not captured by the initial read.
    segment_cache: Arc<dyn SegmentCache>,
    /// Number of bytes to read from the end of the file when opening it. This is raised to at
    /// least the maximum footer plus EOF marker size and capped at the file size.
    initial_read_size: u64,
    /// Segments whose bytes were fully covered by the initial read, keyed by segment ID.
    initial_read_segments: DashMap<SegmentId, ByteBuffer>,
    /// The number of concurrent I/O requests to spawn.
    /// This should be smaller than execution concurrency for coalescing to occur.
    io_concurrency: usize,
    /// The dispatcher to use for I/O requests.
    io_dispatcher: IoDispatcher,
}
359
360impl Default for GenericFileOptions {
361    fn default() -> Self {
362        Self {
363            segment_cache: Arc::new(NoOpSegmentCache),
364            initial_read_size: 0,
365            initial_read_segments: Default::default(),
366            io_concurrency: 8,
367            io_dispatcher: IoDispatcher::shared(),
368        }
369    }
370}