vortex_file/
open.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::sync::Arc;
5
6use futures::executor::block_on;
7use parking_lot::RwLock;
8use vortex_array::session::ArraySessionExt;
9use vortex_buffer::Alignment;
10use vortex_buffer::ByteBuffer;
11use vortex_dtype::DType;
12use vortex_error::VortexError;
13use vortex_error::VortexExpect;
14use vortex_error::VortexResult;
15use vortex_io::InstrumentedReadAt;
16use vortex_io::VortexReadAt;
17use vortex_io::file::IntoReadSource;
18use vortex_io::session::RuntimeSessionExt;
19use vortex_layout::segments::NoOpSegmentCache;
20use vortex_layout::segments::SegmentCache;
21use vortex_layout::segments::SegmentCacheMetrics;
22use vortex_layout::segments::SegmentCacheSourceAdapter;
23use vortex_layout::segments::SegmentId;
24use vortex_layout::segments::SharedSegmentSource;
25use vortex_layout::session::LayoutSessionExt;
26use vortex_metrics::MetricsSessionExt;
27use vortex_metrics::VortexMetrics;
28use vortex_session::VortexSession;
29use vortex_utils::aliases::hash_map::HashMap;
30
31use crate::DeserializeStep;
32use crate::EOF_SIZE;
33use crate::MAX_POSTSCRIPT_SIZE;
34use crate::VortexFile;
35use crate::footer::Footer;
36use crate::segments::FileSegmentSource;
37use crate::segments::InitialReadSegmentCache;
38
/// Default number of bytes requested by the initial read when opening a file (1 MiB).
const INITIAL_READ_SIZE: usize = 1024 * 1024;
40
41/// Open options for a Vortex file reader.
42pub struct VortexOpenOptions {
43    /// The session to use for opening the file.
44    session: VortexSession,
45    /// Cache to use for file segments.
46    segment_cache: Arc<dyn SegmentCache>,
47    /// The number of bytes to read when parsing the footer.
48    initial_read_size: usize,
49    /// An optional, externally provided, file size.
50    file_size: Option<u64>,
51    /// An optional, externally provided, DType.
52    dtype: Option<DType>,
53    /// An optional, externally provided, file layout.
54    footer: Option<Footer>,
55    /// The segments read during the initial read.
56    initial_read_segments: RwLock<HashMap<SegmentId, ByteBuffer>>,
57    /// A metrics registry for the file.
58    metrics: VortexMetrics,
59}
60
61pub trait OpenOptionsSessionExt:
62    ArraySessionExt + LayoutSessionExt + MetricsSessionExt + RuntimeSessionExt
63{
64    /// Create a new [`VortexOpenOptions`] using the provided session to open a file.
65    fn open_options(&self) -> VortexOpenOptions {
66        VortexOpenOptions {
67            session: self.session(),
68            segment_cache: Arc::new(NoOpSegmentCache),
69            initial_read_size: INITIAL_READ_SIZE,
70            file_size: None,
71            dtype: None,
72            footer: None,
73            initial_read_segments: Default::default(),
74            metrics: self.metrics(),
75        }
76    }
77}
78impl<S: ArraySessionExt + LayoutSessionExt + MetricsSessionExt + RuntimeSessionExt>
79    OpenOptionsSessionExt for S
80{
81}
82
83impl VortexOpenOptions {
84    /// Configure the initial read size for the Vortex file.
85    pub fn with_initial_read_size(mut self, initial_read_size: usize) -> Self {
86        self.initial_read_size = initial_read_size;
87        self
88    }
89
90    /// Configure a custom [`SegmentCache`].
91    pub fn with_segment_cache(mut self, segment_cache: Arc<dyn SegmentCache>) -> Self {
92        self.segment_cache = segment_cache;
93        self
94    }
95
96    /// Disable segment caching entirely.
97    pub fn without_segment_cache(self) -> Self {
98        self.with_segment_cache(Arc::new(NoOpSegmentCache))
99    }
100
101    /// Configure a known file size.
102    ///
103    /// This helps to prevent an I/O request to discover the size of the file.
104    /// Of course, all bets are off if you pass an incorrect value.
105    pub fn with_file_size(mut self, file_size: u64) -> Self {
106        self.file_size = Some(file_size);
107        self
108    }
109
110    /// Configure a known DType.
111    ///
112    /// If this is provided, then the Vortex file may be opened with fewer I/O requests.
113    ///
114    /// For Vortex files that do not contain a `DType`, this is required.
115    pub fn with_dtype(mut self, dtype: DType) -> Self {
116        self.dtype = Some(dtype);
117        self
118    }
119
120    /// Configure a known file layout.
121    ///
122    /// If this is provided, then the Vortex file can be opened without performing any I/O.
123    /// Once open, the [`Footer`] can be accessed via [`crate::VortexFile::footer`].
124    pub fn with_footer(mut self, footer: Footer) -> Self {
125        self.dtype = Some(footer.layout().dtype().clone());
126        self.footer = Some(footer);
127        self
128    }
129
130    /// Configure a custom [`VortexMetrics`].
131    pub fn with_metrics(mut self, metrics: VortexMetrics) -> Self {
132        self.metrics = metrics;
133        self
134    }
135
136    /// Open a Vortex file using the provided I/O source.
137    ///
138    /// This is the most common way to open a [`VortexFile`] and tends to provide the best
139    /// out-of-the-box performance. The underlying I/O system will continue to be optimised for
140    /// different file systems and object stores so we encourage users to use this method
141    /// whenever possible and file issues if they encounter problems.
142    pub async fn open<S: IntoReadSource>(self, source: S) -> VortexResult<VortexFile> {
143        let handle = self.session.handle();
144        let metrics = self.metrics.clone();
145        self.open_read_at(handle.open_read(source, metrics)?).await
146    }
147
148    /// Open a Vortex file from an in-memory buffer.
149    pub fn open_buffer<B: Into<ByteBuffer>>(self, buffer: B) -> VortexResult<VortexFile> {
150        // We know this is in memory, so we can open it synchronously.
151        block_on(
152            self.with_initial_read_size(0)
153                .without_segment_cache()
154                .open_read_at(buffer.into()),
155        )
156    }
157
158    /// An API for opening a [`VortexFile`] using any [`VortexReadAt`] implementation.
159    ///
160    /// This is a low-level API and we strongly recommend using [`VortexOpenOptions::open`].
161    pub async fn open_read_at<R: VortexReadAt>(self, read: R) -> VortexResult<VortexFile> {
162        let read = Arc::new(InstrumentedReadAt::new(Arc::new(read), &self.metrics));
163
164        let footer = if let Some(footer) = self.footer {
165            footer
166        } else {
167            self.read_footer(read.clone()).await?
168        };
169
170        let segment_cache = Arc::new(SegmentCacheMetrics::new(
171            InitialReadSegmentCache {
172                initial: self.initial_read_segments,
173                fallback: self.segment_cache,
174            },
175            self.metrics.clone(),
176        ));
177
178        // Create a segment source backed by the VortexReadAt implementation.
179        let segment_source = Arc::new(SharedSegmentSource::new(FileSegmentSource::new(
180            footer.segment_map().clone(),
181            read,
182        )));
183
184        // Wrap up the segment source to first resolve segments from the initial read cache.
185        let segment_source = Arc::new(SegmentCacheSourceAdapter::new(
186            segment_cache,
187            segment_source,
188        ));
189
190        Ok(VortexFile {
191            footer,
192            segment_source,
193            metrics: self.metrics,
194            session: self.session.clone(),
195        })
196    }
197
198    async fn read_footer(&self, read: Arc<dyn VortexReadAt>) -> VortexResult<Footer> {
199        // Fetch the file size and perform the initial read.
200        let file_size = match self.file_size {
201            None => read.size().await?,
202            Some(file_size) => file_size,
203        };
204        let mut initial_read_size = self
205            .initial_read_size
206            // Make sure we read enough to cover the postscript
207            .max(MAX_POSTSCRIPT_SIZE as usize + EOF_SIZE);
208        if let Ok(file_size) = usize::try_from(file_size) {
209            initial_read_size = initial_read_size.min(file_size);
210        }
211
212        let initial_offset = file_size - initial_read_size as u64;
213        let initial_read: ByteBuffer = read
214            .clone()
215            .read_at(initial_offset, initial_read_size, Alignment::none())
216            .await?;
217
218        let mut deserializer = Footer::deserializer(initial_read, self.session.clone())
219            .with_size(file_size)
220            .with_some_dtype(self.dtype.clone());
221
222        let footer = loop {
223            match deserializer.deserialize()? {
224                DeserializeStep::NeedMoreData { offset, len } => {
225                    let more_data = read.clone().read_at(offset, len, Alignment::none()).await?;
226                    deserializer.prefix_data(more_data);
227                }
228                DeserializeStep::NeedFileSize => unreachable!("We passed file_size above"),
229                DeserializeStep::Done(footer) => break Ok::<_, VortexError>(footer),
230            }
231        }?;
232
233        // If the initial read happened to cover any segments, then we can populate the
234        // segment cache
235        let initial_offset = file_size - (deserializer.buffer().len() as u64);
236        self.populate_initial_segments(initial_offset, deserializer.buffer(), &footer);
237
238        Ok(footer)
239    }
240
241    /// Populate segments in the cache that were covered by the initial read.
242    fn populate_initial_segments(
243        &self,
244        initial_offset: u64,
245        initial_read: &ByteBuffer,
246        footer: &Footer,
247    ) {
248        let first_idx = footer
249            .segment_map()
250            .partition_point(|segment| segment.offset < initial_offset);
251
252        let mut initial_read_segments = self.initial_read_segments.write();
253
254        for idx in first_idx..footer.segment_map().len() {
255            let segment = &footer.segment_map()[idx];
256            let segment_id =
257                SegmentId::from(u32::try_from(idx).vortex_expect("Invalid segment ID"));
258            let offset =
259                usize::try_from(segment.offset - initial_offset).vortex_expect("Invalid offset");
260            let buffer = initial_read
261                .slice(offset..offset + (segment.length as usize))
262                .aligned(segment.alignment);
263            initial_read_segments.insert(segment_id, buffer);
264        }
265    }
266}
267
#[cfg(feature = "object_store")]
impl VortexOpenOptions {
    /// Open the Vortex file located at `path` in the given object store.
    ///
    /// This builds an `ObjectStoreReadSource` for the store and path and delegates to
    /// [`VortexOpenOptions::open`].
    pub async fn open_object_store(
        self,
        object_store: &Arc<dyn object_store::ObjectStore>,
        path: &str,
    ) -> VortexResult<VortexFile> {
        use vortex_io::file::object_store::ObjectStoreReadSource;

        let source = ObjectStoreReadSource::new(Arc::clone(object_store), path.into());
        self.open(source).await
    }
}