Skip to main content

vortex_file/
open.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::sync::Arc;
5
6use futures::executor::block_on;
7use parking_lot::RwLock;
8use vortex_array::session::ArraySessionExt;
9use vortex_buffer::Alignment;
10use vortex_buffer::ByteBuffer;
11use vortex_dtype::DType;
12use vortex_error::VortexError;
13use vortex_error::VortexExpect;
14use vortex_error::VortexResult;
15use vortex_io::VortexReadAt;
16use vortex_io::session::RuntimeSessionExt;
17use vortex_layout::segments::InstrumentedSegmentCache;
18use vortex_layout::segments::NoOpSegmentCache;
19use vortex_layout::segments::SegmentCache;
20use vortex_layout::segments::SegmentCacheSourceAdapter;
21use vortex_layout::segments::SegmentId;
22use vortex_layout::segments::SharedSegmentSource;
23use vortex_layout::session::LayoutSessionExt;
24use vortex_metrics::DefaultMetricsRegistry;
25use vortex_metrics::Label;
26use vortex_metrics::MetricsRegistry;
27use vortex_session::VortexSession;
28use vortex_utils::aliases::hash_map::HashMap;
29
30use crate::DeserializeStep;
31use crate::EOF_SIZE;
32use crate::MAX_POSTSCRIPT_SIZE;
33use crate::VortexFile;
34use crate::footer::Footer;
35use crate::segments::FileSegmentSource;
36use crate::segments::InitialReadSegmentCache;
37use crate::segments::RequestMetrics;
38
39const INITIAL_READ_SIZE: usize = MAX_POSTSCRIPT_SIZE as usize + EOF_SIZE;
40
41/// Open options for a Vortex file reader.
42pub struct VortexOpenOptions {
43    /// The session to use for opening the file.
44    session: VortexSession,
45    /// Cache to use for file segments.
46    segment_cache: Option<Arc<dyn SegmentCache>>,
47    /// The number of bytes to read when parsing the footer.
48    initial_read_size: usize,
49    /// An optional, externally provided, file size.
50    file_size: Option<u64>,
51    /// An optional, externally provided, DType.
52    dtype: Option<DType>,
53    /// An optional, externally provided, file layout.
54    footer: Option<Footer>,
55    /// The segments read during the initial read.
56    initial_read_segments: RwLock<HashMap<SegmentId, ByteBuffer>>,
57    /// A metrics registry for the file.
58    metrics_registry: Option<Arc<dyn MetricsRegistry>>,
59    /// Default labels applied to all the file's metrics
60    labels: Vec<Label>,
61}
62
63pub trait OpenOptionsSessionExt: ArraySessionExt + LayoutSessionExt + RuntimeSessionExt {
64    /// Create a new [`VortexOpenOptions`] using the provided session to open a file.
65    fn open_options(&self) -> VortexOpenOptions {
66        VortexOpenOptions {
67            session: self.session(),
68            segment_cache: None,
69            initial_read_size: INITIAL_READ_SIZE,
70            file_size: None,
71            dtype: None,
72            footer: None,
73            initial_read_segments: Default::default(),
74            metrics_registry: None,
75            labels: Vec::default(),
76        }
77    }
78}
79impl<S: ArraySessionExt + LayoutSessionExt + RuntimeSessionExt> OpenOptionsSessionExt for S {}
80
81impl VortexOpenOptions {
82    /// Configure the initial read size for the Vortex file.
83    pub fn with_initial_read_size(mut self, initial_read_size: usize) -> Self {
84        self.initial_read_size = initial_read_size;
85        self
86    }
87
88    /// Configure a custom [`SegmentCache`].
89    pub fn with_segment_cache(mut self, segment_cache: Arc<dyn SegmentCache>) -> Self {
90        self.segment_cache = Some(segment_cache);
91        self
92    }
93
94    /// Configure a known file size.
95    ///
96    /// This helps to prevent an I/O request to discover the size of the file.
97    /// Of course, all bets are off if you pass an incorrect value.
98    pub fn with_file_size(mut self, file_size: u64) -> Self {
99        self.file_size = Some(file_size);
100        self
101    }
102
103    /// Configure a known file size.
104    ///
105    /// This helps to prevent an I/O request to discover the size of the file.
106    /// Of course, all bets are off if you pass an incorrect value.
107    pub fn with_some_file_size(mut self, file_size: Option<u64>) -> Self {
108        self.file_size = file_size;
109        self
110    }
111
112    /// Configure a known DType.
113    ///
114    /// If this is provided, then the Vortex file may be opened with fewer I/O requests.
115    ///
116    /// For Vortex files that do not contain a `DType`, this is required.
117    pub fn with_dtype(mut self, dtype: DType) -> Self {
118        self.dtype = Some(dtype);
119        self
120    }
121
122    /// Configure a known file layout.
123    ///
124    /// If this is provided, then the Vortex file can be opened without performing any I/O.
125    /// Once open, the [`Footer`] can be accessed via [`crate::VortexFile::footer`].
126    pub fn with_footer(mut self, footer: Footer) -> Self {
127        self.dtype = Some(footer.layout().dtype().clone());
128        self.footer = Some(footer);
129        self
130    }
131
132    /// Configure a custom [`MetricsRegistry`] implementation.
133    pub fn with_metrics_registry(mut self, metrics: Arc<dyn MetricsRegistry>) -> Self {
134        self.metrics_registry = Some(metrics);
135        self
136    }
137
138    /// Adds labels to all the file's metrics.
139    pub fn with_labels(mut self, labels: Vec<Label>) -> Self {
140        self.labels.extend(labels);
141        self
142    }
143
144    /// Open a Vortex file using the provided I/O source.
145    ///
146    /// This is the most common way to open a [`VortexFile`] and tends to provide the best
147    /// out-of-the-box performance. The underlying I/O system will continue to be optimised for
148    /// different file systems and object stores so we encourage users to use this method
149    /// whenever possible and file issues if they encounter problems.
150    pub async fn open(self, source: Arc<dyn VortexReadAt>) -> VortexResult<VortexFile> {
151        self.open_read(source).await
152    }
153
154    /// Open a Vortex file from a filesystem path.
155    #[cfg(not(target_arch = "wasm32"))]
156    pub async fn open_path(self, path: impl AsRef<std::path::Path>) -> VortexResult<VortexFile> {
157        use vortex_io::std_file::FileReadAt;
158        let handle = self.session.handle();
159        let source = Arc::new(FileReadAt::open(path, handle)?);
160        self.open(source).await
161    }
162
163    /// Open a Vortex file from an in-memory buffer.
164    pub fn open_buffer<B: Into<ByteBuffer>>(self, buffer: B) -> VortexResult<VortexFile> {
165        // We know this is in memory, so we can open it synchronously.
166        block_on(self.with_initial_read_size(0).open_read(buffer.into()))
167    }
168
169    /// An API for opening a [`VortexFile`] using any [`VortexReadAt`] implementation.
170    pub async fn open_read<R: VortexReadAt + Clone>(self, reader: R) -> VortexResult<VortexFile> {
171        let segment_cache = self
172            .segment_cache
173            .clone()
174            .unwrap_or_else(|| Arc::new(NoOpSegmentCache));
175
176        let metrics_registry = self
177            .metrics_registry
178            .clone()
179            .unwrap_or_else(|| Arc::new(DefaultMetricsRegistry::default()));
180
181        let footer = if let Some(footer) = self.footer {
182            footer
183        } else {
184            self.read_footer(&reader).await?
185        };
186
187        let segment_cache = Arc::new(InstrumentedSegmentCache::new(
188            InitialReadSegmentCache {
189                initial: self.initial_read_segments,
190                fallback: segment_cache,
191            },
192            metrics_registry.as_ref(),
193            self.labels.clone(),
194        ));
195
196        let metrics = RequestMetrics::new(metrics_registry.as_ref(), self.labels);
197
198        // Create a segment source backed by the VortexRead implementation.
199        let segment_source = Arc::new(SharedSegmentSource::new(FileSegmentSource::open(
200            footer.segment_map().clone(),
201            reader,
202            self.session.handle(),
203            metrics,
204        )));
205
206        // Wrap up the segment source to first resolve segments from the initial read cache.
207        let segment_source = Arc::new(SegmentCacheSourceAdapter::new(
208            segment_cache,
209            segment_source,
210        ));
211
212        Ok(VortexFile {
213            footer,
214            segment_source,
215            session: self.session.clone(),
216        })
217    }
218
219    async fn read_footer(&self, read: &dyn VortexReadAt) -> VortexResult<Footer> {
220        // Fetch the file size and perform the initial read.
221        let file_size = match self.file_size {
222            None => read.size().await?,
223            Some(file_size) => file_size,
224        };
225        let mut initial_read_size = self
226            .initial_read_size
227            // Make sure we read enough to cover the postscript
228            .max(MAX_POSTSCRIPT_SIZE as usize + EOF_SIZE);
229        if let Ok(file_size) = usize::try_from(file_size) {
230            initial_read_size = initial_read_size.min(file_size);
231        }
232
233        let initial_offset = file_size - initial_read_size as u64;
234        let initial_read: ByteBuffer = read
235            .read_at(initial_offset, initial_read_size, Alignment::none())
236            .await?
237            .try_into_host()?
238            .await?;
239
240        let mut deserializer = Footer::deserializer(initial_read, self.session.clone())
241            .with_size(file_size)
242            .with_some_dtype(self.dtype.clone());
243
244        let footer = loop {
245            match deserializer.deserialize()? {
246                DeserializeStep::NeedMoreData { offset, len } => {
247                    let more_data = read
248                        .read_at(offset, len, Alignment::none())
249                        .await?
250                        .try_into_host()?
251                        .await?;
252                    deserializer.prefix_data(more_data);
253                }
254                DeserializeStep::NeedFileSize => unreachable!("We passed file_size above"),
255                DeserializeStep::Done(footer) => break Ok::<_, VortexError>(footer),
256            }
257        }?;
258
259        // If the initial read happened to cover any segments, then we can populate the
260        // segment cache
261        let initial_offset = file_size - (deserializer.buffer().len() as u64);
262        self.populate_initial_segments(initial_offset, deserializer.buffer(), &footer);
263
264        Ok(footer)
265    }
266
267    /// Populate segments in the cache that were covered by the initial read.
268    fn populate_initial_segments(
269        &self,
270        initial_offset: u64,
271        initial_read: &ByteBuffer,
272        footer: &Footer,
273    ) {
274        let first_idx = footer
275            .segment_map()
276            .partition_point(|segment| segment.offset < initial_offset);
277
278        let mut initial_read_segments = self.initial_read_segments.write();
279
280        for idx in first_idx..footer.segment_map().len() {
281            let segment = &footer.segment_map()[idx];
282            let segment_id =
283                SegmentId::from(u32::try_from(idx).vortex_expect("Invalid segment ID"));
284            let offset =
285                usize::try_from(segment.offset - initial_offset).vortex_expect("Invalid offset");
286            let buffer = initial_read
287                .slice(offset..offset + (segment.length as usize))
288                .aligned(segment.alignment);
289            initial_read_segments.insert(segment_id, buffer);
290        }
291    }
292}
293
#[cfg(feature = "object_store")]
impl VortexOpenOptions {
    /// Open the Vortex file stored at `path` in the given object store.
    ///
    /// Only available with the `object_store` feature enabled.
    pub async fn open_object_store(
        self,
        object_store: &Arc<dyn object_store::ObjectStore>,
        path: &str,
    ) -> VortexResult<VortexFile> {
        use vortex_io::object_store::ObjectStoreReadAt;

        let handle = self.session.handle();
        let store = Arc::clone(object_store);
        let reader = ObjectStoreReadAt::new(store, path.into(), handle);
        self.open(Arc::new(reader)).await
    }
}
312
313#[cfg(test)]
314mod tests {
315    use std::sync::atomic::AtomicUsize;
316    use std::sync::atomic::Ordering;
317
318    use futures::future::BoxFuture;
319    use vortex_array::IntoArray;
320    use vortex_array::buffer::BufferHandle;
321    use vortex_array::expr::session::ExprSession;
322    use vortex_array::session::ArraySession;
323    use vortex_buffer::Buffer;
324    use vortex_buffer::ByteBufferMut;
325    use vortex_dtype::session::DTypeSession;
326    use vortex_io::session::RuntimeSession;
327    use vortex_layout::session::LayoutSession;
328
329    use super::*;
330    use crate::WriteOptionsSessionExt;
331
332    #[derive(Clone)]
333    // Define CountingRead struct
334    struct CountingRead<R> {
335        inner: R,
336        total_read: Arc<AtomicUsize>,
337        first_read_len: Arc<AtomicUsize>,
338    }
339
340    impl<R: VortexReadAt + Clone> VortexReadAt for CountingRead<R> {
341        fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
342            self.inner.size()
343        }
344
345        fn read_at(
346            &self,
347            offset: u64,
348            length: usize,
349            alignment: Alignment,
350        ) -> BoxFuture<'static, VortexResult<BufferHandle>> {
351            self.total_read.fetch_add(length, Ordering::Relaxed);
352            let _ = self.first_read_len.compare_exchange(
353                0,
354                length,
355                Ordering::Relaxed,
356                Ordering::Relaxed,
357            );
358            self.inner.read_at(offset, length, alignment)
359        }
360
361        fn concurrency(&self) -> usize {
362            self.inner.concurrency()
363        }
364    }
365
366    #[tokio::test]
367    async fn test_initial_read_size() {
368        // Create a large file (> 1MB)
369        let mut buf = ByteBufferMut::empty();
370        let mut session = VortexSession::empty()
371            .with::<DTypeSession>()
372            .with::<ArraySession>()
373            .with::<LayoutSession>()
374            .with::<ExprSession>()
375            .with::<RuntimeSession>();
376
377        crate::register_default_encodings(&mut session);
378
379        // 1.5M integers -> ~6MB. We use a pattern to avoid Sequence encoding.
380        let array = Buffer::from(
381            (0i32..1_500_000)
382                .map(|i| if i % 2 == 0 { i } else { -i })
383                .collect::<Vec<i32>>(),
384        )
385        .into_array();
386
387        session
388            .write_options()
389            .write(&mut buf, array.to_array_stream())
390            .await
391            .unwrap();
392
393        let buffer = ByteBuffer::from(buf);
394        assert!(
395            buffer.len() > 1024 * 1024,
396            "Buffer length is only {} bytes",
397            buffer.len()
398        );
399
400        let total_read = Arc::new(AtomicUsize::new(0));
401        let first_read_len = Arc::new(AtomicUsize::new(0));
402        let reader = CountingRead {
403            inner: buffer,
404            total_read: total_read.clone(),
405            first_read_len: first_read_len.clone(),
406        };
407
408        // Open the file
409        let _file = session.open_options().open_read(reader).await.unwrap();
410
411        // Assert that we read approximately the postscript size, not 1MB
412        let first = first_read_len.load(Ordering::Relaxed);
413        assert_eq!(
414            first,
415            MAX_POSTSCRIPT_SIZE as usize + EOF_SIZE,
416            "Read exactly the postscript size"
417        );
418        let read = total_read.load(Ordering::Relaxed);
419        assert!(read < 1024 * 1024, "Read {} bytes, expected < 1MB", read);
420    }
421}