vortex_file/
open.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::sync::Arc;
5
6use futures::executor::block_on;
7use parking_lot::RwLock;
8use vortex_array::ArrayRegistry;
9use vortex_buffer::{Alignment, ByteBuffer};
10use vortex_dtype::DType;
11use vortex_error::{VortexError, VortexExpect, VortexResult, vortex_bail};
12use vortex_io::file::IntoReadSource;
13use vortex_io::runtime::Handle;
14use vortex_io::{InstrumentedReadAt, VortexReadAt};
15use vortex_layout::segments::{
16    NoOpSegmentCache, SegmentCache, SegmentCacheMetrics, SegmentCacheSourceAdapter, SegmentId,
17    SharedSegmentSource,
18};
19use vortex_layout::{LayoutRegistry, LayoutRegistryExt};
20use vortex_metrics::VortexMetrics;
21use vortex_utils::aliases::hash_map::HashMap;
22
23use crate::footer::Footer;
24use crate::segments::{FileSegmentSource, InitialReadSegmentCache};
25use crate::{DEFAULT_REGISTRY, DeserializeStep, EOF_SIZE, MAX_POSTSCRIPT_SIZE, VortexFile};
26
/// Default number of bytes fetched from the end of the file by the initial read (1 MiB).
const INITIAL_READ_SIZE: usize = 1024 * 1024;
28
29/// Open options for a Vortex file reader.
30pub struct VortexOpenOptions {
31    /// The handle used by the open file.
32    handle: Option<Handle>,
33    /// Cache to use for file segments.
34    segment_cache: Arc<dyn SegmentCache>,
35    /// The number of bytes to read when parsing the footer.
36    initial_read_size: usize,
37    /// The registry of array encodings.
38    registry: Arc<ArrayRegistry>,
39    /// The registry of layouts.
40    layout_registry: Arc<LayoutRegistry>,
41    /// An optional, externally provided, file size.
42    file_size: Option<u64>,
43    /// An optional, externally provided, DType.
44    dtype: Option<DType>,
45    /// An optional, externally provided, file layout.
46    footer: Option<Footer>,
47    /// The segments read during the initial read.
48    initial_read_segments: RwLock<HashMap<SegmentId, ByteBuffer>>,
49    /// A metrics registry for the file.
50    metrics: VortexMetrics,
51}
52
53impl Default for VortexOpenOptions {
54    fn default() -> Self {
55        Self::new()
56    }
57}
58
59impl VortexOpenOptions {
60    /// Create a new [`VortexOpenOptions`] with the expected options for the file source.
61    ///
62    /// This should not be used directly, instead public API clients are expected to
63    /// access either `VortexOpenOptions::new()` or `VortexOpenOptions::memory()`
64    pub fn new() -> Self {
65        Self {
66            handle: Handle::find(),
67            segment_cache: Arc::new(NoOpSegmentCache),
68            initial_read_size: INITIAL_READ_SIZE,
69            registry: DEFAULT_REGISTRY.clone(),
70            layout_registry: Arc::new(LayoutRegistry::default()),
71            file_size: None,
72            dtype: None,
73            footer: None,
74            initial_read_segments: Default::default(),
75            metrics: VortexMetrics::default(),
76        }
77    }
78
79    /// Configure the initial read size for the Vortex file.
80    pub fn with_initial_read_size(mut self, initial_read_size: usize) -> Self {
81        self.initial_read_size = initial_read_size;
82        self
83    }
84
85    /// Configure a custom [`SegmentCache`].
86    pub fn with_segment_cache(mut self, segment_cache: Arc<dyn SegmentCache>) -> Self {
87        self.segment_cache = segment_cache;
88        self
89    }
90
91    /// Disable segment caching entirely.
92    pub fn without_segment_cache(self) -> Self {
93        self.with_segment_cache(Arc::new(NoOpSegmentCache))
94    }
95
96    /// Configure a [`Handle`] to use for opening the file.
97    ///
98    /// **Warning**: it is important that the runtime associated with the handle remains alive
99    /// while the file is being used. If the runtime is dropped, any I/O operations on the
100    /// file will fail.
101    ///
102    /// We tried to enforce this with Rust lifetimes, but sadly Rust async cannot express scoped
103    /// futures in a safe way, so we need static lifetimes for now. If you're interested in the
104    /// details, see [this post](https://without.boats/blog/the-scoped-task-trilemma/).
105    pub fn with_handle(mut self, handle: Handle) -> Self {
106        self.handle = Some(handle);
107        self
108    }
109
110    /// Configure a Vortex array registry.
111    pub fn with_array_registry(mut self, registry: Arc<ArrayRegistry>) -> Self {
112        self.registry = registry;
113        self
114    }
115
116    /// Configure a Vortex array registry.
117    pub fn with_layout_registry(mut self, registry: Arc<LayoutRegistry>) -> Self {
118        self.layout_registry = registry;
119        self
120    }
121
122    /// Configure a known file size.
123    ///
124    /// This helps to prevent an I/O request to discover the size of the file.
125    /// Of course, all bets are off if you pass an incorrect value.
126    pub fn with_file_size(mut self, file_size: u64) -> Self {
127        self.file_size = Some(file_size);
128        self
129    }
130
131    /// Configure a known DType.
132    ///
133    /// If this is provided, then the Vortex file may be opened with fewer I/O requests.
134    ///
135    /// For Vortex files that do not contain a `DType`, this is required.
136    pub fn with_dtype(mut self, dtype: DType) -> Self {
137        self.dtype = Some(dtype);
138        self
139    }
140
141    /// Configure a known file layout.
142    ///
143    /// If this is provided, then the Vortex file can be opened without performing any I/O.
144    /// Once open, the [`Footer`] can be accessed via [`crate::VortexFile::footer`].
145    pub fn with_footer(mut self, footer: Footer) -> Self {
146        self.dtype = Some(footer.layout().dtype().clone());
147        self.footer = Some(footer);
148        self
149    }
150
151    /// Configure a custom [`VortexMetrics`].
152    pub fn with_metrics(mut self, metrics: VortexMetrics) -> Self {
153        self.metrics = metrics;
154        self
155    }
156
157    /// Open a Vortex file using the provided I/O source.
158    ///
159    /// This is the most common way to open a [`VortexFile`] and tends to provide the best
160    /// out-of-the-box performance. The underlying I/O system will continue to be optimised for
161    /// different file systems and object stores so we encourage users to use this method
162    /// whenever possible and file issues if they encounter problems.
163    pub async fn open<S: IntoReadSource>(self, source: S) -> VortexResult<VortexFile> {
164        let Some(handle) = self.handle.clone() else {
165            vortex_bail!("VortexOpenOptions::handle must be set, or else be running inside Tokio");
166        };
167        let metrics = self.metrics.clone();
168        self.open_read_at(handle.open_read(source, metrics)?).await
169    }
170
171    /// Open a Vortex file from an in-memory buffer.
172    pub fn open_buffer<B: Into<ByteBuffer>>(self, buffer: B) -> VortexResult<VortexFile> {
173        // We know this is in memory, so we can open it synchronously.
174        block_on(
175            self.with_initial_read_size(0)
176                .without_segment_cache()
177                .open_read_at(buffer.into()),
178        )
179    }
180
181    /// An API for opening a [`VortexFile`] using any [`VortexReadAt`] implementation.
182    ///
183    /// This is a low-level API and we strongly recommend using [`VortexOpenOptions::open`].
184    pub async fn open_read_at<R: VortexReadAt>(self, read: R) -> VortexResult<VortexFile> {
185        let read = Arc::new(InstrumentedReadAt::new(Arc::new(read), &self.metrics));
186
187        let footer = if let Some(footer) = self.footer {
188            footer
189        } else {
190            self.read_footer(read.clone()).await?
191        };
192
193        let segment_cache = Arc::new(SegmentCacheMetrics::new(
194            InitialReadSegmentCache {
195                initial: self.initial_read_segments,
196                fallback: self.segment_cache,
197            },
198            self.metrics.clone(),
199        ));
200
201        // Create a segment source backed by the VortexReadAt implementation.
202        let segment_source = Arc::new(SharedSegmentSource::new(FileSegmentSource::new(
203            footer.segment_map().clone(),
204            read,
205        )));
206
207        // Wrap up the segment source to first resolve segments from the initial read cache.
208        let segment_source = Arc::new(SegmentCacheSourceAdapter::new(
209            segment_cache,
210            segment_source,
211        ));
212
213        Ok(VortexFile {
214            footer,
215            segment_source,
216            metrics: self.metrics,
217        })
218    }
219
220    async fn read_footer(&self, read: Arc<dyn VortexReadAt>) -> VortexResult<Footer> {
221        // Fetch the file size and perform the initial read.
222        let file_size = match self.file_size {
223            None => read.size().await?,
224            Some(file_size) => file_size,
225        };
226        let mut initial_read_size = self
227            .initial_read_size
228            // Make sure we read enough to cover the postscript
229            .max(MAX_POSTSCRIPT_SIZE as usize + EOF_SIZE);
230        if let Ok(file_size) = usize::try_from(file_size) {
231            initial_read_size = initial_read_size.min(file_size);
232        }
233
234        let initial_offset = file_size - initial_read_size as u64;
235        let initial_read: ByteBuffer = read
236            .clone()
237            .read_at(initial_offset, initial_read_size, Alignment::none())
238            .await?;
239
240        let mut deserializer = Footer::deserializer(initial_read)
241            .with_size(file_size)
242            .with_some_dtype(self.dtype.clone())
243            .with_array_registry(self.registry.clone())
244            .with_layout_registry(self.layout_registry.clone());
245
246        let footer = loop {
247            match deserializer.deserialize()? {
248                DeserializeStep::NeedMoreData { offset, len } => {
249                    let more_data = read.clone().read_at(offset, len, Alignment::none()).await?;
250                    deserializer.prefix_data(more_data);
251                }
252                DeserializeStep::NeedFileSize => unreachable!("We passed file_size above"),
253                DeserializeStep::Done(footer) => break Ok::<_, VortexError>(footer),
254            }
255        }?;
256
257        // If the initial read happened to cover any segments, then we can populate the
258        // segment cache
259        let initial_offset = file_size - (deserializer.buffer().len() as u64);
260        self.populate_initial_segments(initial_offset, deserializer.buffer(), &footer);
261
262        Ok(footer)
263    }
264
265    /// Populate segments in the cache that were covered by the initial read.
266    fn populate_initial_segments(
267        &self,
268        initial_offset: u64,
269        initial_read: &ByteBuffer,
270        footer: &Footer,
271    ) {
272        let first_idx = footer
273            .segment_map()
274            .partition_point(|segment| segment.offset < initial_offset);
275
276        let mut initial_read_segments = self.initial_read_segments.write();
277
278        for idx in first_idx..footer.segment_map().len() {
279            let segment = &footer.segment_map()[idx];
280            let segment_id =
281                SegmentId::from(u32::try_from(idx).vortex_expect("Invalid segment ID"));
282            let offset =
283                usize::try_from(segment.offset - initial_offset).vortex_expect("Invalid offset");
284            let buffer = initial_read
285                .slice(offset..offset + (segment.length as usize))
286                .aligned(segment.alignment);
287            initial_read_segments.insert(segment_id, buffer);
288        }
289    }
290}
291
#[cfg(feature = "object_store")]
impl VortexOpenOptions {
    /// Open a Vortex file stored at `path` in the given object store.
    ///
    /// Builds an `ObjectStoreReadSource` for the store and path, then delegates to
    /// [`VortexOpenOptions::open`].
    pub async fn open_object_store(
        self,
        object_store: &Arc<dyn object_store::ObjectStore>,
        path: &str,
    ) -> VortexResult<VortexFile> {
        use vortex_io::file::object_store::ObjectStoreReadSource;

        let source = ObjectStoreReadSource::new(Arc::clone(object_store), path.into());
        self.open(source).await
    }
}