vortex_file/generic.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::ops::Range;
use std::sync::Arc;

use futures::{StreamExt, pin_mut};
use vortex_buffer::{Alignment, ByteBuffer, ByteBufferMut};
use vortex_error::{VortexExpect, VortexResult, vortex_err};
use vortex_io::{Dispatch, InstrumentedReadAt, IoDispatcher, VortexReadAt};
use vortex_layout::segments::{SegmentEvents, SegmentId};
use vortex_utils::aliases::dash_map::DashMap;

use crate::driver::CoalescedDriver;
use crate::segments::{
    InitialReadSegmentCache, MokaSegmentCache, NoOpSegmentCache, SegmentCache, SegmentCacheMetrics,
    SegmentCacheSourceAdapter,
};
use crate::{EOF_SIZE, FileType, Footer, MAX_FOOTER_SIZE, VortexFile, VortexOpenOptions};

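/// A dedicated single-threaded Tokio [`IoDispatcher`], lazily created and shared by all
/// path-based opens (and, with the `object_store` feature, object store reads).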
#[cfg(feature = "tokio")]
static TOKIO_DISPATCHER: std::sync::LazyLock<IoDispatcher> =
    std::sync::LazyLock::new(|| IoDispatcher::new_tokio(1));

/// A type of Vortex file that supports any [`VortexReadAt`] implementation.
///
/// This is a reasonable choice for files backed by a network since it performs I/O coalescing.
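///
/// A sketch of typical usage, assuming the `tokio` feature is enabled (the path is
/// illustrative):
///
/// ```ignore
/// let file = VortexOpenOptions::file()
///     .open("example.vortex")
///     .await?;
/// ```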
// TODO(ngates): rename to TokioVortexFile
pub struct GenericVortexFile;

impl FileType for GenericVortexFile {
    type Options = GenericFileOptions;
}

impl VortexOpenOptions<GenericVortexFile> {
    const INITIAL_READ_SIZE: u64 = 1 << 20; // 1 MiB

    /// Create open options for a file backed by any [`VortexReadAt`] implementation.
    pub fn file() -> Self {
        Self::new(Default::default())
            // Start with an initial in-memory cache of 256 MiB.
            // TODO(ngates): would it be better to default to a home directory disk cache?
            .with_segment_cache(Arc::new(MokaSegmentCache::new(256 << 20)))
            .with_initial_read_size(Self::INITIAL_READ_SIZE)
    }

    /// Configure the initial read size for the Vortex file.
    pub fn with_initial_read_size(mut self, initial_read_size: u64) -> Self {
        self.options.initial_read_size = initial_read_size;
        self
    }

    /// Configure a custom [`SegmentCache`].
    pub fn with_segment_cache(mut self, segment_cache: Arc<dyn SegmentCache>) -> Self {
        self.options.segment_cache = segment_cache;
        self
    }

    /// Disable segment caching entirely.
    pub fn without_segment_cache(self) -> Self {
        self.with_segment_cache(Arc::new(NoOpSegmentCache))
    }

    /// Configure the number of concurrent I/O requests to issue.
    pub fn with_io_concurrency(mut self, io_concurrency: usize) -> Self {
        self.options.io_concurrency = io_concurrency;
        self
    }

    /// Blocking call to open a Vortex file using the provided [`std::path::Path`].
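    ///
    /// A sketch, assuming the `tokio` feature (the path is illustrative):
    ///
    /// ```ignore
    /// let file = VortexOpenOptions::file().open_blocking("example.vortex")?;
    /// ```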
    #[cfg(feature = "tokio")]
    pub fn open_blocking(self, read: impl AsRef<std::path::Path>) -> VortexResult<VortexFile> {
        // Since we dispatch all I/O to a dedicated Tokio dispatcher thread, we can simply
        // block on the async `open` call.
        futures::executor::block_on(self.open(read))
    }

    /// Open a Vortex file using the provided [`std::path::Path`].
    #[cfg(feature = "tokio")]
    pub async fn open(mut self, read: impl AsRef<std::path::Path>) -> VortexResult<VortexFile> {
        self.options.io_dispatcher = TOKIO_DISPATCHER.clone();
        self.open_read_at(vortex_io::TokioFile::open(read)?).await
    }

    /// Low-level API for opening any [`VortexReadAt`]. Note that the user is responsible for
    /// ensuring the `VortexReadAt` implementation is compatible with the chosen I/O dispatcher.
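    ///
    /// A sketch of direct usage (illustrative; per the note above, the caller must ensure
    /// the reader suits the configured dispatcher):
    ///
    /// ```ignore
    /// let read = vortex_io::TokioFile::open("example.vortex")?;
    /// let file = VortexOpenOptions::file().open_read_at(read).await?;
    /// ```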
    pub async fn open_read_at<R: VortexReadAt + Send + Sync>(
        self,
        read: R,
    ) -> VortexResult<VortexFile> {
        let read = Arc::new(read);

        let footer = if let Some(footer) = self.footer {
            footer
        } else {
            self.read_footer(read.clone()).await?
        };

        let segment_cache = Arc::new(SegmentCacheMetrics::new(
            InitialReadSegmentCache {
                initial: self.options.initial_read_segments,
                fallback: self.options.segment_cache,
            },
            self.metrics.clone(),
        ));

        // We use segment events for driving I/O.
        let (events_source, events) = SegmentEvents::create();

        // Wrap the events source to first resolve segments from the initial read cache.
        let segment_source = Arc::new(SegmentCacheSourceAdapter::new(segment_cache, events_source));

        let read = InstrumentedReadAt::new(read.clone(), &self.metrics);

        let driver = CoalescedDriver::new(
            read.performance_hint(),
            footer.segment_map().clone(),
            events,
            self.metrics.clone(),
        );

        // Spawn an I/O driver onto the dispatcher. Each coalesced request is itself
        // dispatched, with at most `io_concurrency` of them in flight at once.
        let io_concurrency = self.options.io_concurrency;
        let io_dispatcher = self.options.io_dispatcher.clone();
        self.options
            .io_dispatcher
            .dispatch(move || {
                async move {
                    // Drive the segment event stream, dispatching each coalesced read.
                    let stream = driver
                        .map(|coalesced_req| {
                            let read = read.clone();
                            io_dispatcher
                                .dispatch(move || coalesced_req.launch(read))
                                .vortex_expect("Failed to dispatch I/O request")
                        })
                        .buffer_unordered(io_concurrency)
                        .map(|result| result.vortex_expect("infallible"));
                    pin_mut!(stream);

                    // Drive the stream to completion.
                    stream.collect::<()>().await
                }
            })
            .vortex_expect("Failed to spawn I/O driver");

        Ok(VortexFile {
            footer,
            segment_source,
            metrics: self.metrics,
        })
    }

    async fn read_footer<R: VortexReadAt + Send + Sync>(
        &self,
        read: Arc<R>,
    ) -> VortexResult<Footer> {
        // Fetch the file size and perform the initial read.
        let file_size = match self.file_size {
            None => self.dispatched_size(read.clone()).await?,
            Some(file_size) => file_size,
        };
        let initial_read_size = self
            .options
            .initial_read_size
            // Make sure we read enough to cover the postscript
            .max(MAX_FOOTER_SIZE as u64 + EOF_SIZE as u64)
            .min(file_size);
        let mut initial_offset = file_size - initial_read_size;
        let mut initial_read: ByteBuffer = self
            .dispatched_read(read.clone(), initial_offset..file_size)
            .await?;

        let postscript = self.parse_postscript(&initial_read)?;

        // If we haven't been provided a DType, we must read one from the file.
        let dtype_segment = self
            .dtype
            .is_none()
            .then(|| {
                postscript.dtype.ok_or_else(|| {
                    vortex_err!(
                        "Vortex file doesn't embed a DType and none provided to VortexOpenOptions"
                    )
                })
            })
            .transpose()?;

        // The other postscript segments are required, so now we figure out the lowest offset
        // that covers all of them.
        let mut read_more_offset = initial_offset;
        if let Some(dtype_segment) = &dtype_segment {
            read_more_offset = read_more_offset.min(dtype_segment.offset);
        }
        if let Some(stats_segment) = &postscript.statistics {
            read_more_offset = read_more_offset.min(stats_segment.offset);
        }
        read_more_offset = read_more_offset.min(postscript.layout.offset);
        read_more_offset = read_more_offset.min(postscript.footer.offset);

        // Read more bytes if necessary.
        if read_more_offset < initial_offset {
            log::debug!(
                "Initial read from {initial_offset} did not cover all footer segments, reading from {read_more_offset}"
            );

            let mut new_initial_read =
                ByteBufferMut::with_capacity(usize::try_from(file_size - read_more_offset)?);
            new_initial_read.extend_from_slice(
                &self
                    .dispatched_read(read, read_more_offset..initial_offset)
                    .await?,
            );
            new_initial_read.extend_from_slice(&initial_read);

            initial_offset = read_more_offset;
            initial_read = new_initial_read.freeze();
        }

        // Now we read our initial segments.
        let dtype = dtype_segment
            .map(|segment| self.parse_dtype(initial_offset, &initial_read, &segment))
            .transpose()?
            .unwrap_or_else(|| self.dtype.clone().vortex_expect("DType was provided"));
        let file_stats = postscript
            .statistics
            .map(|segment| self.parse_file_statistics(initial_offset, &initial_read, &segment))
            .transpose()?;
        let footer = self.parse_footer(
            initial_offset,
            &initial_read,
            &postscript.footer,
            &postscript.layout,
            dtype,
            file_stats,
        )?;

        // If the initial read happened to cover any segments, then we can populate the
        // segment cache.
        self.populate_initial_segments(initial_offset, &initial_read, &footer);

        Ok(footer)
    }

    /// Dispatch a [`VortexReadAt::size`] request onto the configured I/O dispatcher.
    async fn dispatched_size<R: VortexReadAt + Send + Sync>(
        &self,
        read: Arc<R>,
    ) -> VortexResult<u64> {
        Ok(self
            .options
            .io_dispatcher
            .dispatch(move || async move { read.size().await })?
            .await??)
    }

    /// Dispatch a read onto the configured I/O dispatcher.
    async fn dispatched_read<R: VortexReadAt + Send + Sync>(
        &self,
        read: Arc<R>,
        range: Range<u64>,
    ) -> VortexResult<ByteBuffer> {
        Ok(self
            .options
            .io_dispatcher
            .dispatch(move || async move { read.read_byte_range(range, Alignment::none()).await })?
            .await??)
    }

    /// Populate segments in the cache that were covered by the initial read.
    fn populate_initial_segments(
        &self,
        initial_offset: u64,
        initial_read: &ByteBuffer,
        footer: &Footer,
    ) {
        let first_idx = footer
            .segment_map()
            .partition_point(|segment| segment.offset < initial_offset);

        for idx in first_idx..footer.segment_map().len() {
            let segment = &footer.segment_map()[idx];
            let segment_id =
                SegmentId::from(u32::try_from(idx).vortex_expect("Invalid segment ID"));
            let offset =
                usize::try_from(segment.offset - initial_offset).vortex_expect("Invalid offset");
            let buffer = initial_read
                .slice(offset..offset + (segment.length as usize))
                .aligned(segment.alignment);
            self.options
                .initial_read_segments
                .insert(segment_id, buffer);
        }
    }
}

#[cfg(feature = "object_store")]
impl VortexOpenOptions<GenericVortexFile> {
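    /// Open a Vortex file from the given [`object_store::ObjectStore`] at `path`.
    ///
    /// A sketch of typical usage (the in-memory store is illustrative):
    ///
    /// ```ignore
    /// let store: Arc<dyn ObjectStore> = Arc::new(object_store::memory::InMemory::new());
    /// let file = VortexOpenOptions::file()
    ///     .open_object_store(&store, "data.vortex")
    ///     .await?;
    /// ```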
    pub async fn open_object_store(
        mut self,
        object_store: &Arc<dyn object_store::ObjectStore>,
        path: &str,
    ) -> VortexResult<VortexFile> {
        use std::path::Path;

        use vortex_io::ObjectStoreReadAt;

        // Object store _must_ use tokio for I/O.
        self.options.io_dispatcher = TOKIO_DISPATCHER.clone();

        // If the file is local, we much prefer to use TokioFile since object store re-opens the
        // file on every read. This check is a little naive... but we hope that ObjectStore will
        // soon expose the scheme in a way that we can check more thoroughly.
        // See: https://github.com/apache/arrow-rs-object-store/issues/259
        let local_path = Path::new("/").join(path);
        if local_path.exists() {
            // Local disk is too fast to justify prefetching.
            self.open(local_path).await
        } else {
            self.open_read_at(ObjectStoreReadAt::new(
                object_store.clone(),
                path.into(),
                None,
            ))
            .await
        }
    }
}

/// Options for opening a [`GenericVortexFile`].
pub struct GenericFileOptions {
    /// The segment cache used to serve repeated segment reads.
    segment_cache: Arc<dyn SegmentCache>,
    /// The number of trailing bytes to fetch in the initial (footer) read.
    initial_read_size: u64,
    /// Segments that happened to be covered by the initial read, keyed by segment ID.
    initial_read_segments: DashMap<SegmentId, ByteBuffer>,
    /// The number of concurrent I/O requests to spawn.
    /// This should be smaller than the execution concurrency for coalescing to occur.
    io_concurrency: usize,
    /// The dispatcher to use for I/O requests.
    io_dispatcher: IoDispatcher,
}

impl Default for GenericFileOptions {
    fn default() -> Self {
        Self {
            segment_cache: Arc::new(NoOpSegmentCache),
            initial_read_size: 0,
            initial_read_segments: Default::default(),
            io_concurrency: 8,
            io_dispatcher: IoDispatcher::shared(),
        }
    }
}