Skip to main content

vortex_file/
open.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::sync::Arc;
5
6use futures::executor::block_on;
7use parking_lot::RwLock;
8use vortex_array::dtype::DType;
9use vortex_array::session::ArraySessionExt;
10use vortex_buffer::Alignment;
11use vortex_buffer::ByteBuffer;
12use vortex_error::VortexError;
13use vortex_error::VortexExpect;
14use vortex_error::VortexResult;
15use vortex_io::VortexReadAt;
16use vortex_io::session::RuntimeSessionExt;
17use vortex_layout::segments::InstrumentedSegmentCache;
18use vortex_layout::segments::NoOpSegmentCache;
19use vortex_layout::segments::SegmentCache;
20use vortex_layout::segments::SegmentCacheSourceAdapter;
21use vortex_layout::segments::SegmentId;
22use vortex_layout::segments::SharedSegmentSource;
23use vortex_layout::session::LayoutSessionExt;
24use vortex_metrics::DefaultMetricsRegistry;
25use vortex_metrics::Label;
26use vortex_metrics::MetricsRegistry;
27use vortex_session::VortexSession;
28use vortex_utils::aliases::hash_map::HashMap;
29
30use crate::DeserializeStep;
31use crate::EOF_SIZE;
32use crate::MAX_POSTSCRIPT_SIZE;
33use crate::VortexFile;
34use crate::footer::Footer;
35use crate::segments::BufferSegmentSource;
36use crate::segments::FileSegmentSource;
37use crate::segments::InitialReadSegmentCache;
38use crate::segments::RequestMetrics;
39
40const INITIAL_READ_SIZE: usize = MAX_POSTSCRIPT_SIZE as usize + EOF_SIZE;
41
/// Open options for a Vortex file reader.
///
/// Obtained from a session through [`OpenOptionsSessionExt::open_options`] and configured with
/// the `with_*` builder methods before calling one of the `open*` methods.
pub struct VortexOpenOptions {
    /// The session to use for opening the file.
    session: VortexSession,
    /// Cache to use for file segments. When `None`, `open_read` falls back to a no-op cache.
    segment_cache: Option<Arc<dyn SegmentCache>>,
    /// The number of trailing bytes to read first when parsing the footer. Before reading it is
    /// raised to at least `MAX_POSTSCRIPT_SIZE + EOF_SIZE`.
    initial_read_size: usize,
    /// An optional, externally provided, file size. When `None`, the size is requested from the
    /// reader.
    file_size: Option<u64>,
    /// An optional, externally provided, DType that is handed to the footer deserializer.
    dtype: Option<DType>,
    /// An optional, externally provided, footer. When set, the footer is not read from the file.
    footer: Option<Footer>,
    /// Segments covered by the bytes read while parsing the footer, keyed by segment ID.
    /// Kept behind a lock because they are filled in through `&self` during footer reading.
    initial_read_segments: RwLock<HashMap<SegmentId, ByteBuffer>>,
    /// A metrics registry for the file. When `None`, `open_read` uses a default registry.
    metrics_registry: Option<Arc<dyn MetricsRegistry>>,
    /// Default labels applied to all the file's metrics.
    labels: Vec<Label>,
}
63
64pub trait OpenOptionsSessionExt: ArraySessionExt + LayoutSessionExt + RuntimeSessionExt {
65    /// Create a new [`VortexOpenOptions`] using the provided session to open a file.
66    fn open_options(&self) -> VortexOpenOptions {
67        VortexOpenOptions {
68            session: self.session(),
69            segment_cache: None,
70            initial_read_size: INITIAL_READ_SIZE,
71            file_size: None,
72            dtype: None,
73            footer: None,
74            initial_read_segments: Default::default(),
75            metrics_registry: None,
76            labels: Vec::default(),
77        }
78    }
79}
80impl<S: ArraySessionExt + LayoutSessionExt + RuntimeSessionExt> OpenOptionsSessionExt for S {}
81
82impl VortexOpenOptions {
83    /// Configure the initial read size for the Vortex file.
84    pub fn with_initial_read_size(mut self, initial_read_size: usize) -> Self {
85        self.initial_read_size = initial_read_size;
86        self
87    }
88
89    /// Configure a custom [`SegmentCache`].
90    pub fn with_segment_cache(mut self, segment_cache: Arc<dyn SegmentCache>) -> Self {
91        self.segment_cache = Some(segment_cache);
92        self
93    }
94
95    /// Configure a known file size.
96    ///
97    /// This helps to prevent an I/O request to discover the size of the file.
98    /// Of course, all bets are off if you pass an incorrect value.
99    pub fn with_file_size(mut self, file_size: u64) -> Self {
100        self.file_size = Some(file_size);
101        self
102    }
103
104    /// Configure a known file size.
105    ///
106    /// This helps to prevent an I/O request to discover the size of the file.
107    /// Of course, all bets are off if you pass an incorrect value.
108    pub fn with_some_file_size(mut self, file_size: Option<u64>) -> Self {
109        self.file_size = file_size;
110        self
111    }
112
113    /// Configure a known DType.
114    ///
115    /// If this is provided, then the Vortex file may be opened with fewer I/O requests.
116    ///
117    /// For Vortex files that do not contain a `DType`, this is required.
118    pub fn with_dtype(mut self, dtype: DType) -> Self {
119        self.dtype = Some(dtype);
120        self
121    }
122
123    /// Configure a known file layout.
124    ///
125    /// If this is provided, then the Vortex file can be opened without performing any I/O.
126    /// Once open, the [`Footer`] can be accessed via [`crate::VortexFile::footer`].
127    pub fn with_footer(mut self, footer: Footer) -> Self {
128        self.dtype = Some(footer.layout().dtype().clone());
129        self.footer = Some(footer);
130        self
131    }
132
133    /// Configure a custom [`MetricsRegistry`] implementation.
134    pub fn with_metrics_registry(mut self, metrics: Arc<dyn MetricsRegistry>) -> Self {
135        self.metrics_registry = Some(metrics);
136        self
137    }
138
139    /// Adds labels to all the file's metrics.
140    pub fn with_labels(mut self, labels: Vec<Label>) -> Self {
141        self.labels.extend(labels);
142        self
143    }
144
145    /// Open a Vortex file using the provided I/O source.
146    ///
147    /// This is the most common way to open a [`VortexFile`] and tends to provide the best
148    /// out-of-the-box performance. The underlying I/O system will continue to be optimised for
149    /// different file systems and object stores so we encourage users to use this method
150    /// whenever possible and file issues if they encounter problems.
151    pub async fn open(self, source: Arc<dyn VortexReadAt>) -> VortexResult<VortexFile> {
152        self.open_read(source).await
153    }
154
155    /// Open a Vortex file from a filesystem path.
156    #[cfg(not(target_arch = "wasm32"))]
157    pub async fn open_path(self, path: impl AsRef<std::path::Path>) -> VortexResult<VortexFile> {
158        use vortex_io::std_file::FileReadAt;
159        let handle = self.session.handle();
160        let source = Arc::new(FileReadAt::open(path, handle)?);
161        self.open(source).await
162    }
163
164    /// Open a Vortex file from an in-memory buffer.
165    ///
166    /// This uses a `BufferSegmentSource` that resolves segments synchronously
167    /// by slicing the buffer directly, bypassing the async I/O pipeline.
168    pub fn open_buffer<B: Into<ByteBuffer>>(self, buffer: B) -> VortexResult<VortexFile> {
169        let buffer: ByteBuffer = buffer.into();
170
171        if self.segment_cache.is_some() {
172            tracing::warn!("segment cache is ignored for in-memory `open_buffer`");
173        }
174        if self.metrics_registry.is_some() {
175            tracing::warn!("metrics registry is ignored for in-memory `open_buffer`");
176        }
177
178        let mut opts = self.with_initial_read_size(0);
179
180        let footer = match opts.footer.take() {
181            Some(footer) => footer,
182            None => block_on(opts.read_footer(&buffer))?,
183        };
184
185        let segment_source = Arc::new(BufferSegmentSource::new(
186            buffer,
187            footer.segment_map().clone(),
188        ));
189
190        Ok(VortexFile {
191            footer,
192            segment_source,
193            session: opts.session,
194        })
195    }
196
197    /// An API for opening a [`VortexFile`] using any [`VortexReadAt`] implementation.
198    pub async fn open_read<R: VortexReadAt + Clone>(self, reader: R) -> VortexResult<VortexFile> {
199        let segment_cache = self
200            .segment_cache
201            .clone()
202            .unwrap_or_else(|| Arc::new(NoOpSegmentCache));
203
204        let metrics_registry = self
205            .metrics_registry
206            .clone()
207            .unwrap_or_else(|| Arc::new(DefaultMetricsRegistry::default()));
208
209        let footer = if let Some(footer) = self.footer {
210            footer
211        } else {
212            self.read_footer(&reader).await?
213        };
214
215        let segment_cache = Arc::new(InstrumentedSegmentCache::new(
216            InitialReadSegmentCache {
217                initial: self.initial_read_segments,
218                fallback: segment_cache,
219            },
220            metrics_registry.as_ref(),
221            self.labels.clone(),
222        ));
223
224        let metrics = RequestMetrics::new(metrics_registry.as_ref(), self.labels);
225
226        // Create a segment source backed by the VortexRead implementation.
227        let segment_source = Arc::new(SharedSegmentSource::new(FileSegmentSource::open(
228            footer.segment_map().clone(),
229            reader,
230            self.session.handle(),
231            metrics,
232        )));
233
234        // Wrap up the segment source to first resolve segments from the initial read cache.
235        let segment_source = Arc::new(SegmentCacheSourceAdapter::new(
236            segment_cache,
237            segment_source,
238        ));
239
240        Ok(VortexFile {
241            footer,
242            segment_source,
243            session: self.session.clone(),
244        })
245    }
246
247    async fn read_footer(&self, read: &dyn VortexReadAt) -> VortexResult<Footer> {
248        // Fetch the file size and perform the initial read.
249        let file_size = match self.file_size {
250            None => read.size().await?,
251            Some(file_size) => file_size,
252        };
253        let mut initial_read_size = self
254            .initial_read_size
255            // Make sure we read enough to cover the postscript
256            .max(MAX_POSTSCRIPT_SIZE as usize + EOF_SIZE);
257        if let Ok(file_size) = usize::try_from(file_size) {
258            initial_read_size = initial_read_size.min(file_size);
259        }
260
261        let initial_offset = file_size - initial_read_size as u64;
262        let initial_read: ByteBuffer = read
263            .read_at(initial_offset, initial_read_size, Alignment::none())
264            .await?
265            .try_into_host()?
266            .await?;
267
268        let mut deserializer = Footer::deserializer(initial_read, self.session.clone())
269            .with_size(file_size)
270            .with_some_dtype(self.dtype.clone());
271
272        let footer = loop {
273            match deserializer.deserialize()? {
274                DeserializeStep::NeedMoreData { offset, len } => {
275                    let more_data = read
276                        .read_at(offset, len, Alignment::none())
277                        .await?
278                        .try_into_host()?
279                        .await?;
280                    deserializer.prefix_data(more_data);
281                }
282                DeserializeStep::NeedFileSize => unreachable!("We passed file_size above"),
283                DeserializeStep::Done(footer) => break Ok::<_, VortexError>(footer),
284            }
285        }?;
286
287        // If the initial read happened to cover any segments, then we can populate the
288        // segment cache
289        let initial_offset = file_size - (deserializer.buffer().len() as u64);
290        self.populate_initial_segments(initial_offset, deserializer.buffer(), &footer);
291
292        Ok(footer)
293    }
294
295    /// Populate segments in the cache that were covered by the initial read.
296    fn populate_initial_segments(
297        &self,
298        initial_offset: u64,
299        initial_read: &ByteBuffer,
300        footer: &Footer,
301    ) {
302        let first_idx = footer
303            .segment_map()
304            .partition_point(|segment| segment.offset < initial_offset);
305
306        let mut initial_read_segments = self.initial_read_segments.write();
307
308        for idx in first_idx..footer.segment_map().len() {
309            let segment = &footer.segment_map()[idx];
310            let segment_id =
311                SegmentId::from(u32::try_from(idx).vortex_expect("Invalid segment ID"));
312            let offset =
313                usize::try_from(segment.offset - initial_offset).vortex_expect("Invalid offset");
314            let buffer = initial_read
315                .slice(offset..offset + (segment.length as usize))
316                .aligned(segment.alignment);
317            initial_read_segments.insert(segment_id, buffer);
318        }
319    }
320}
321
#[cfg(feature = "object_store")]
impl VortexOpenOptions {
    /// Open a Vortex file stored at `path` within the given object store.
    ///
    /// The object is read through an `ObjectStoreReadAt` driven by this session's runtime
    /// handle, and the resulting source is opened with [`Self::open`].
    pub async fn open_object_store(
        self,
        object_store: &Arc<dyn object_store::ObjectStore>,
        path: &str,
    ) -> VortexResult<VortexFile> {
        use vortex_io::object_store::ObjectStoreReadAt;

        let reader =
            ObjectStoreReadAt::new(Arc::clone(object_store), path.into(), self.session.handle());
        self.open(Arc::new(reader)).await
    }
}
340
#[cfg(test)]
mod tests {
    use std::sync::atomic::AtomicUsize;
    use std::sync::atomic::Ordering;

    use futures::future::BoxFuture;
    use vortex_array::IntoArray;
    use vortex_array::buffer::BufferHandle;
    use vortex_array::dtype::session::DTypeSession;
    use vortex_array::scalar_fn::session::ScalarFnSession;
    use vortex_array::session::ArraySession;
    use vortex_buffer::Buffer;
    use vortex_buffer::ByteBufferMut;
    use vortex_io::session::RuntimeSession;
    use vortex_layout::session::LayoutSession;

    use super::*;
    use crate::WriteOptionsSessionExt;

    /// A [`VortexReadAt`] wrapper that forwards every call to `inner`. It records the total
    /// number of bytes requested and the length of the first read request with a non-zero
    /// length.
    #[derive(Clone)]
    struct CountingRead<R> {
        inner: R,
        total_read: Arc<AtomicUsize>,
        first_read_len: Arc<AtomicUsize>,
    }

    impl<R> VortexReadAt for CountingRead<R>
    where
        R: VortexReadAt + Clone,
    {
        fn concurrency(&self) -> usize {
            self.inner.concurrency()
        }

        fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
            self.inner.size()
        }

        fn read_at(
            &self,
            offset: u64,
            length: usize,
            alignment: Alignment,
        ) -> BoxFuture<'static, VortexResult<BufferHandle>> {
            self.total_read.fetch_add(length, Ordering::Relaxed);
            // The exchange only succeeds while the slot still holds 0, so the first recorded
            // request length sticks.
            let _ = self.first_read_len.compare_exchange(
                0,
                length,
                Ordering::Relaxed,
                Ordering::Relaxed,
            );
            self.inner.read_at(offset, length, alignment)
        }
    }

    /// Opening a multi-megabyte file should start with a postscript-sized tail read and fetch
    /// well under 1MB in total.
    #[tokio::test]
    async fn test_initial_read_size() {
        let mut session = VortexSession::empty()
            .with::<DTypeSession>()
            .with::<ArraySession>()
            .with::<LayoutSession>()
            .with::<ScalarFnSession>()
            .with::<RuntimeSession>();
        crate::register_default_encodings(&mut session);

        // 1.5M integers -> ~6MB. Alternating signs keep the data from collapsing into a
        // Sequence encoding, so the written file stays large (> 1MB).
        let values = (0i32..1_500_000)
            .map(|i| if i % 2 == 0 { i } else { -i })
            .collect::<Vec<i32>>();
        let array = Buffer::from(values).into_array();

        let mut sink = ByteBufferMut::empty();
        session
            .write_options()
            .write(&mut sink, array.to_array_stream())
            .await
            .unwrap();

        let file_bytes = ByteBuffer::from(sink);
        assert!(
            file_bytes.len() > 1024 * 1024,
            "Buffer length is only {} bytes",
            file_bytes.len()
        );

        let total_read = Arc::new(AtomicUsize::new(0));
        let first_read_len = Arc::new(AtomicUsize::new(0));
        let reader = CountingRead {
            inner: file_bytes,
            total_read: Arc::clone(&total_read),
            first_read_len: Arc::clone(&first_read_len),
        };

        let _file = session.open_options().open_read(reader).await.unwrap();

        // The first request must be exactly the postscript-sized tail read, not 1MB.
        assert_eq!(
            first_read_len.load(Ordering::Relaxed),
            MAX_POSTSCRIPT_SIZE as usize + EOF_SIZE,
            "Read exactly the postscript size"
        );
        let read = total_read.load(Ordering::Relaxed);
        assert!(read < 1024 * 1024, "Read {} bytes, expected < 1MB", read);
    }
}