vortex_file/generic.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::ops::Range;
use std::sync::Arc;

use dashmap::DashMap;
use futures::{StreamExt, pin_mut};
use vortex_buffer::{Alignment, ByteBuffer, ByteBufferMut};
use vortex_error::{VortexExpect, VortexResult, vortex_err};
use vortex_io::{Dispatch, InstrumentedReadAt, IoDispatcher, VortexReadAt};
use vortex_layout::segments::{SegmentEvents, SegmentId};

use crate::driver::CoalescedDriver;
use crate::segments::{
    InitialReadSegmentCache, MokaSegmentCache, NoOpSegmentCache, SegmentCache, SegmentCacheMetrics,
    SegmentCacheSourceAdapter,
};
use crate::{EOF_SIZE, FileType, Footer, MAX_FOOTER_SIZE, VortexFile, VortexOpenOptions};

#[cfg(feature = "tokio")]
static TOKIO_DISPATCHER: std::sync::LazyLock<IoDispatcher> =
    std::sync::LazyLock::new(|| IoDispatcher::new_tokio(1));

/// A type of Vortex file that supports any [`VortexReadAt`] implementation.
///
/// This is a reasonable choice for files backed by a network since it performs I/O coalescing.
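///
/// # Example
///
/// A minimal sketch of opening a file (assumes the `tokio` feature is enabled and that a
/// local file `"example.vortex"` exists):
///
/// ```ignore
/// let file = VortexOpenOptions::<GenericVortexFile>::file()
///     .open("example.vortex")
///     .await?;
/// ```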
// TODO(ngates): rename to TokioVortexFile
pub struct GenericVortexFile;

impl FileType for GenericVortexFile {
    type Options = GenericFileOptions;
}

impl VortexOpenOptions<GenericVortexFile> {
    const INITIAL_READ_SIZE: u64 = 1 << 20; // 1 MB

    /// Create open options for a Vortex file backed by any [`VortexReadAt`] implementation.
    pub fn file() -> Self {
        Self::new(Default::default())
            // Start with an initial in-memory cache of 256 MB.
            // TODO(ngates): would it be better to default to a home directory disk cache?
            .with_segment_cache(Arc::new(MokaSegmentCache::new(256 << 20)))
            .with_initial_read_size(Self::INITIAL_READ_SIZE)
    }

    /// Configure the initial read size for the Vortex file.
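    ///
    /// A minimal sketch, raising the initial read to 4 MB (the value is a byte count, and is
    /// clamped internally so it always covers at least the postscript):
    ///
    /// ```ignore
    /// let opts = VortexOpenOptions::<GenericVortexFile>::file()
    ///     .with_initial_read_size(4 << 20);
    /// ```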
    pub fn with_initial_read_size(mut self, initial_read_size: u64) -> Self {
        self.options.initial_read_size = initial_read_size;
        self
    }

    /// Configure a custom [`SegmentCache`].
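    ///
    /// A sketch using the bundled Moka cache (the argument is the cache capacity in bytes):
    ///
    /// ```ignore
    /// let opts = VortexOpenOptions::<GenericVortexFile>::file()
    ///     .with_segment_cache(Arc::new(MokaSegmentCache::new(64 << 20)));
    /// ```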
    pub fn with_segment_cache(mut self, segment_cache: Arc<dyn SegmentCache>) -> Self {
        self.options.segment_cache = segment_cache;
        self
    }

    /// Disable segment caching entirely.
    pub fn without_segment_cache(self) -> Self {
        self.with_segment_cache(Arc::new(NoOpSegmentCache))
    }

    /// Configure the number of concurrent I/O requests to spawn.
    pub fn with_io_concurrency(mut self, io_concurrency: usize) -> Self {
        self.options.io_concurrency = io_concurrency;
        self
    }

    /// Blocking call to open a Vortex file using the provided [`std::path::Path`].
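    ///
    /// A minimal sketch (assumes the `tokio` feature and a local file `"example.vortex"`):
    ///
    /// ```ignore
    /// let file = VortexOpenOptions::<GenericVortexFile>::file()
    ///     .open_blocking("example.vortex")?;
    /// ```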
    #[cfg(feature = "tokio")]
    pub fn open_blocking(self, read: impl AsRef<std::path::Path>) -> VortexResult<VortexFile> {
        // Since we dispatch all I/O to a dedicated Tokio dispatcher thread, we can simply
        // block on the async call to open.
        futures::executor::block_on(self.open(read))
    }

    /// Open a Vortex file using the provided [`std::path::Path`].
    #[cfg(feature = "tokio")]
    pub async fn open(mut self, read: impl AsRef<std::path::Path>) -> VortexResult<VortexFile> {
        self.options.io_dispatcher = TOKIO_DISPATCHER.clone();
        self.open_read_at(vortex_io::TokioFile::open(read)?).await
    }

    /// Low-level API for opening any [`VortexReadAt`]. Note that the user is responsible for
    /// ensuring the `VortexReadAt` implementation is compatible with the chosen I/O dispatcher.
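    ///
    /// A sketch, assuming `reader` is some `VortexReadAt` implementation that is compatible
    /// with the configured dispatcher (e.g. a `vortex_io::TokioFile` with the Tokio dispatcher):
    ///
    /// ```ignore
    /// let file = VortexOpenOptions::<GenericVortexFile>::file()
    ///     .open_read_at(reader)
    ///     .await?;
    /// ```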
    pub async fn open_read_at<R: VortexReadAt + Send + Sync>(
        self,
        read: R,
    ) -> VortexResult<VortexFile> {
        let read = Arc::new(read);

        let footer = if let Some(footer) = self.footer {
            footer
        } else {
            self.read_footer(read.clone()).await?
        };

        let segment_cache = Arc::new(SegmentCacheMetrics::new(
            InitialReadSegmentCache {
                initial: self.options.initial_read_segments,
                fallback: self.options.segment_cache,
            },
            self.metrics.clone(),
        ));

        // We use segment events for driving I/O.
        let (events_source, events) = SegmentEvents::create();

        // Wrap the events source to first resolve segments from the initial read cache.
        let segment_source = Arc::new(SegmentCacheSourceAdapter::new(segment_cache, events_source));

        let read = InstrumentedReadAt::new(read.clone(), &self.metrics);

        let driver = CoalescedDriver::new(
            read.performance_hint(),
            footer.segment_map().clone(),
            events,
            self.metrics.clone(),
        );

        // Spawn an I/O driver onto the dispatcher.
        let io_concurrency = self.options.io_concurrency;
        self.options
            .io_dispatcher
            .dispatch(move || {
                async move {
                    // Drive the segment event stream.
                    let stream = driver
                        .map(|coalesced_req| coalesced_req.launch(&read))
                        .buffer_unordered(io_concurrency);
                    pin_mut!(stream);

                    // Drive the stream to completion.
                    stream.collect::<()>().await
                }
            })
            .vortex_expect("Failed to spawn I/O driver");

        Ok(VortexFile {
            footer,
            segment_source,
            metrics: self.metrics,
        })
    }

    async fn read_footer<R: VortexReadAt + Send + Sync>(
        &self,
        read: Arc<R>,
    ) -> VortexResult<Footer> {
        // Fetch the file size and perform the initial read.
        let file_size = match self.file_size {
            None => self.dispatched_size(read.clone()).await?,
            Some(file_size) => file_size,
        };
        let initial_read_size = self
            .options
            .initial_read_size
            // Make sure we read enough to cover the postscript.
            .max(MAX_FOOTER_SIZE as u64 + EOF_SIZE as u64)
            .min(file_size);
        let mut initial_offset = file_size - initial_read_size;
        let mut initial_read: ByteBuffer = self
            .dispatched_read(read.clone(), initial_offset..file_size)
            .await?;

        let postscript = self.parse_postscript(&initial_read)?;

        // If we haven't been provided a DType, we must read one from the file.
        let dtype_segment = self
            .dtype
            .is_none()
            .then(|| {
                postscript.dtype.ok_or_else(|| {
                    vortex_err!(
                        "Vortex file doesn't embed a DType and none provided to VortexOpenOptions"
                    )
                })
            })
            .transpose()?;

        // The other postscript segments are required, so now we figure out the lowest offset
        // that covers all of the required segments.
        let mut read_more_offset = initial_offset;
        if let Some(dtype_segment) = &dtype_segment {
            read_more_offset = read_more_offset.min(dtype_segment.offset);
        }
        if let Some(stats_segment) = &postscript.statistics {
            read_more_offset = read_more_offset.min(stats_segment.offset);
        }
        read_more_offset = read_more_offset.min(postscript.layout.offset);
        read_more_offset = read_more_offset.min(postscript.footer.offset);

        // Read more bytes if necessary.
        if read_more_offset < initial_offset {
            log::info!(
                "Initial read from {initial_offset} did not cover all footer segments, reading from {read_more_offset}"
            );

            let mut new_initial_read =
                ByteBufferMut::with_capacity(usize::try_from(file_size - read_more_offset)?);
            new_initial_read.extend_from_slice(
                &self
                    .dispatched_read(read, read_more_offset..initial_offset)
                    .await?,
            );
            new_initial_read.extend_from_slice(&initial_read);

            initial_offset = read_more_offset;
            initial_read = new_initial_read.freeze();
        }

        // Now we read our initial segments.
        let dtype = dtype_segment
            .map(|segment| self.parse_dtype(initial_offset, &initial_read, &segment))
            .transpose()?
            .unwrap_or_else(|| self.dtype.clone().vortex_expect("DType was provided"));
        let file_stats = postscript
            .statistics
            .map(|segment| self.parse_file_statistics(initial_offset, &initial_read, &segment))
            .transpose()?;
        let footer = self.parse_footer(
            initial_offset,
            &initial_read,
            &postscript.footer,
            &postscript.layout,
            dtype,
            file_stats,
        )?;

        // If the initial read happened to cover any segments, then we can populate the
        // segment cache.
        self.populate_initial_segments(initial_offset, &initial_read, &footer);

        Ok(footer)
    }

    /// Dispatch a [`VortexReadAt::size`] request onto the configured I/O dispatcher.
    async fn dispatched_size<R: VortexReadAt + Send + Sync>(
        &self,
        read: Arc<R>,
    ) -> VortexResult<u64> {
        Ok(self
            .options
            .io_dispatcher
            .dispatch(move || async move { read.size().await })?
            .await??)
    }

    /// Dispatch a read onto the configured I/O dispatcher.
    async fn dispatched_read<R: VortexReadAt + Send + Sync>(
        &self,
        read: Arc<R>,
        range: Range<u64>,
    ) -> VortexResult<ByteBuffer> {
        Ok(self
            .options
            .io_dispatcher
            .dispatch(move || async move { read.read_byte_range(range, Alignment::none()).await })?
            .await??)
    }

    /// Populate segments in the cache that were covered by the initial read.
    fn populate_initial_segments(
        &self,
        initial_offset: u64,
        initial_read: &ByteBuffer,
        footer: &Footer,
    ) {
        // Find the index of the first segment that starts within the initial read.
        let first_idx = footer
            .segment_map()
            .partition_point(|segment| segment.offset < initial_offset);

        for idx in first_idx..footer.segment_map().len() {
            let segment = &footer.segment_map()[idx];
            let segment_id =
                SegmentId::from(u32::try_from(idx).vortex_expect("Invalid segment ID"));
            let offset =
                usize::try_from(segment.offset - initial_offset).vortex_expect("Invalid offset");
            let buffer = initial_read
                .slice(offset..offset + (segment.length as usize))
                .aligned(segment.alignment);
            self.options
                .initial_read_segments
                .insert(segment_id, buffer);
        }
    }
}

#[cfg(feature = "object_store")]
impl VortexOpenOptions<GenericVortexFile> {
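    /// Open a Vortex file from an [`object_store::ObjectStore`] at the given path.
    ///
    /// A minimal sketch (assumes a configured `store: Arc<dyn ObjectStore>`):
    ///
    /// ```ignore
    /// let file = VortexOpenOptions::<GenericVortexFile>::file()
    ///     .open_object_store(&store, "data/example.vortex")
    ///     .await?;
    /// ```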
    pub async fn open_object_store(
        mut self,
        object_store: &Arc<dyn object_store::ObjectStore>,
        path: &str,
    ) -> VortexResult<VortexFile> {
        use std::path::Path;

        use vortex_io::ObjectStoreReadAt;

        // Object store _must_ use tokio for I/O.
        self.options.io_dispatcher = TOKIO_DISPATCHER.clone();

        // If the file is local, we much prefer to use TokioFile since object store re-opens the
        // file on every read. This check is a little naive... but we hope that ObjectStore will
        // soon expose the scheme in a way that we can check more thoroughly.
        // See: https://github.com/apache/arrow-rs-object-store/issues/259
        let local_path = Path::new("/").join(path);
        if local_path.exists() {
            // Local disk is too fast to justify prefetching.
            self.open(local_path).await
        } else {
            self.open_read_at(ObjectStoreReadAt::new(
                object_store.clone(),
                path.into(),
                None,
            ))
            .await
        }
    }
}

pub struct GenericFileOptions {
    /// The segment cache used to serve segment reads before any I/O is issued.
    segment_cache: Arc<dyn SegmentCache>,
    /// The number of trailing bytes to fetch in the initial read.
    initial_read_size: u64,
    /// Segments that happened to be covered by the initial read.
    initial_read_segments: DashMap<SegmentId, ByteBuffer>,
    /// The number of concurrent I/O requests to spawn.
    /// This should be smaller than execution concurrency for coalescing to occur.
    io_concurrency: usize,
    /// The dispatcher to use for I/O requests.
    io_dispatcher: IoDispatcher,
}

impl Default for GenericFileOptions {
    fn default() -> Self {
        Self {
            segment_cache: Arc::new(NoOpSegmentCache),
            initial_read_size: 0,
            initial_read_segments: Default::default(),
            io_concurrency: 8,
            io_dispatcher: IoDispatcher::shared(),
        }
    }
}