vortex_file/
open.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::sync::Arc;
5
6use futures::executor::block_on;
7use parking_lot::RwLock;
8use vortex_array::ArraySessionExt;
9use vortex_buffer::{Alignment, ByteBuffer};
10use vortex_dtype::DType;
11use vortex_error::{VortexError, VortexExpect, VortexResult};
12use vortex_io::file::IntoReadSource;
13use vortex_io::session::RuntimeSessionExt;
14use vortex_io::{InstrumentedReadAt, VortexReadAt};
15use vortex_layout::segments::{
16    NoOpSegmentCache, SegmentCache, SegmentCacheMetrics, SegmentCacheSourceAdapter, SegmentId,
17    SharedSegmentSource,
18};
19use vortex_layout::session::LayoutSessionExt;
20use vortex_metrics::{MetricsSessionExt, VortexMetrics};
21use vortex_session::VortexSession;
22use vortex_utils::aliases::hash_map::HashMap;
23
24use crate::footer::Footer;
25use crate::segments::{FileSegmentSource, InitialReadSegmentCache};
26use crate::{DeserializeStep, EOF_SIZE, MAX_POSTSCRIPT_SIZE, VortexFile};
27
/// Default number of bytes requested by the initial read when opening a file: 1 MiB.
const INITIAL_READ_SIZE: usize = 1024 * 1024;
29
/// Open options for a Vortex file reader.
///
/// Values are created through [`OpenOptionsSessionExt::open_options`] and refined with the
/// `with_*` builder methods before calling one of the `open*` methods, which consume the options.
pub struct VortexOpenOptions {
    /// The session to use for opening the file.
    ///
    /// It supplies the I/O handle used by `open`, is passed to the footer deserializer, and a
    /// clone of it is stored in the resulting `VortexFile`.
    session: VortexSession,
    /// Cache to use for file segments.
    ///
    /// Defaults to `NoOpSegmentCache`. It is used as the fallback behind the segments captured
    /// by the initial read.
    segment_cache: Arc<dyn SegmentCache>,
    /// The number of bytes to read when parsing the footer.
    ///
    /// The effective read is raised to at least `MAX_POSTSCRIPT_SIZE + EOF_SIZE` and capped at
    /// the file size.
    initial_read_size: usize,
    /// An optional, externally provided, file size.
    ///
    /// When `None`, the size is requested from the reader.
    file_size: Option<u64>,
    /// An optional, externally provided, DType.
    ///
    /// Forwarded to the footer deserializer; also set by `with_footer`.
    dtype: Option<DType>,
    /// An optional, externally provided, file layout.
    ///
    /// When present, the footer is not read from the file.
    footer: Option<Footer>,
    /// The segments read during the initial read, keyed by segment ID.
    initial_read_segments: RwLock<HashMap<SegmentId, ByteBuffer>>,
    /// A metrics registry for the file.
    ///
    /// Used to instrument the reader and the segment cache, and stored in the resulting
    /// `VortexFile`.
    metrics: VortexMetrics,
}
49
50pub trait OpenOptionsSessionExt:
51    ArraySessionExt + LayoutSessionExt + MetricsSessionExt + RuntimeSessionExt
52{
53    /// Create a new [`VortexOpenOptions`] using the provided session to open a file.
54    fn open_options(&self) -> VortexOpenOptions {
55        VortexOpenOptions {
56            session: self.session(),
57            segment_cache: Arc::new(NoOpSegmentCache),
58            initial_read_size: INITIAL_READ_SIZE,
59            file_size: None,
60            dtype: None,
61            footer: None,
62            initial_read_segments: Default::default(),
63            metrics: self.metrics(),
64        }
65    }
66}
67impl<S: ArraySessionExt + LayoutSessionExt + MetricsSessionExt + RuntimeSessionExt>
68    OpenOptionsSessionExt for S
69{
70}
71
72impl VortexOpenOptions {
73    /// Configure the initial read size for the Vortex file.
74    pub fn with_initial_read_size(mut self, initial_read_size: usize) -> Self {
75        self.initial_read_size = initial_read_size;
76        self
77    }
78
79    /// Configure a custom [`SegmentCache`].
80    pub fn with_segment_cache(mut self, segment_cache: Arc<dyn SegmentCache>) -> Self {
81        self.segment_cache = segment_cache;
82        self
83    }
84
85    /// Disable segment caching entirely.
86    pub fn without_segment_cache(self) -> Self {
87        self.with_segment_cache(Arc::new(NoOpSegmentCache))
88    }
89
90    /// Configure a known file size.
91    ///
92    /// This helps to prevent an I/O request to discover the size of the file.
93    /// Of course, all bets are off if you pass an incorrect value.
94    pub fn with_file_size(mut self, file_size: u64) -> Self {
95        self.file_size = Some(file_size);
96        self
97    }
98
99    /// Configure a known DType.
100    ///
101    /// If this is provided, then the Vortex file may be opened with fewer I/O requests.
102    ///
103    /// For Vortex files that do not contain a `DType`, this is required.
104    pub fn with_dtype(mut self, dtype: DType) -> Self {
105        self.dtype = Some(dtype);
106        self
107    }
108
109    /// Configure a known file layout.
110    ///
111    /// If this is provided, then the Vortex file can be opened without performing any I/O.
112    /// Once open, the [`Footer`] can be accessed via [`crate::VortexFile::footer`].
113    pub fn with_footer(mut self, footer: Footer) -> Self {
114        self.dtype = Some(footer.layout().dtype().clone());
115        self.footer = Some(footer);
116        self
117    }
118
119    /// Configure a custom [`VortexMetrics`].
120    pub fn with_metrics(mut self, metrics: VortexMetrics) -> Self {
121        self.metrics = metrics;
122        self
123    }
124
125    /// Open a Vortex file using the provided I/O source.
126    ///
127    /// This is the most common way to open a [`VortexFile`] and tends to provide the best
128    /// out-of-the-box performance. The underlying I/O system will continue to be optimised for
129    /// different file systems and object stores so we encourage users to use this method
130    /// whenever possible and file issues if they encounter problems.
131    pub async fn open<S: IntoReadSource>(self, source: S) -> VortexResult<VortexFile> {
132        let handle = self.session.handle();
133        let metrics = self.metrics.clone();
134        self.open_read_at(handle.open_read(source, metrics)?).await
135    }
136
137    /// Open a Vortex file from an in-memory buffer.
138    pub fn open_buffer<B: Into<ByteBuffer>>(self, buffer: B) -> VortexResult<VortexFile> {
139        // We know this is in memory, so we can open it synchronously.
140        block_on(
141            self.with_initial_read_size(0)
142                .without_segment_cache()
143                .open_read_at(buffer.into()),
144        )
145    }
146
147    /// An API for opening a [`VortexFile`] using any [`VortexReadAt`] implementation.
148    ///
149    /// This is a low-level API and we strongly recommend using [`VortexOpenOptions::open`].
150    pub async fn open_read_at<R: VortexReadAt>(self, read: R) -> VortexResult<VortexFile> {
151        let read = Arc::new(InstrumentedReadAt::new(Arc::new(read), &self.metrics));
152
153        let footer = if let Some(footer) = self.footer {
154            footer
155        } else {
156            self.read_footer(read.clone()).await?
157        };
158
159        let segment_cache = Arc::new(SegmentCacheMetrics::new(
160            InitialReadSegmentCache {
161                initial: self.initial_read_segments,
162                fallback: self.segment_cache,
163            },
164            self.metrics.clone(),
165        ));
166
167        // Create a segment source backed by the VortexReadAt implementation.
168        let segment_source = Arc::new(SharedSegmentSource::new(FileSegmentSource::new(
169            footer.segment_map().clone(),
170            read,
171        )));
172
173        // Wrap up the segment source to first resolve segments from the initial read cache.
174        let segment_source = Arc::new(SegmentCacheSourceAdapter::new(
175            segment_cache,
176            segment_source,
177        ));
178
179        Ok(VortexFile {
180            footer,
181            segment_source,
182            metrics: self.metrics,
183            session: self.session.clone(),
184        })
185    }
186
187    async fn read_footer(&self, read: Arc<dyn VortexReadAt>) -> VortexResult<Footer> {
188        // Fetch the file size and perform the initial read.
189        let file_size = match self.file_size {
190            None => read.size().await?,
191            Some(file_size) => file_size,
192        };
193        let mut initial_read_size = self
194            .initial_read_size
195            // Make sure we read enough to cover the postscript
196            .max(MAX_POSTSCRIPT_SIZE as usize + EOF_SIZE);
197        if let Ok(file_size) = usize::try_from(file_size) {
198            initial_read_size = initial_read_size.min(file_size);
199        }
200
201        let initial_offset = file_size - initial_read_size as u64;
202        let initial_read: ByteBuffer = read
203            .clone()
204            .read_at(initial_offset, initial_read_size, Alignment::none())
205            .await?;
206
207        let mut deserializer = Footer::deserializer(initial_read, self.session.clone())
208            .with_size(file_size)
209            .with_some_dtype(self.dtype.clone());
210
211        let footer = loop {
212            match deserializer.deserialize()? {
213                DeserializeStep::NeedMoreData { offset, len } => {
214                    let more_data = read.clone().read_at(offset, len, Alignment::none()).await?;
215                    deserializer.prefix_data(more_data);
216                }
217                DeserializeStep::NeedFileSize => unreachable!("We passed file_size above"),
218                DeserializeStep::Done(footer) => break Ok::<_, VortexError>(footer),
219            }
220        }?;
221
222        // If the initial read happened to cover any segments, then we can populate the
223        // segment cache
224        let initial_offset = file_size - (deserializer.buffer().len() as u64);
225        self.populate_initial_segments(initial_offset, deserializer.buffer(), &footer);
226
227        Ok(footer)
228    }
229
230    /// Populate segments in the cache that were covered by the initial read.
231    fn populate_initial_segments(
232        &self,
233        initial_offset: u64,
234        initial_read: &ByteBuffer,
235        footer: &Footer,
236    ) {
237        let first_idx = footer
238            .segment_map()
239            .partition_point(|segment| segment.offset < initial_offset);
240
241        let mut initial_read_segments = self.initial_read_segments.write();
242
243        for idx in first_idx..footer.segment_map().len() {
244            let segment = &footer.segment_map()[idx];
245            let segment_id =
246                SegmentId::from(u32::try_from(idx).vortex_expect("Invalid segment ID"));
247            let offset =
248                usize::try_from(segment.offset - initial_offset).vortex_expect("Invalid offset");
249            let buffer = initial_read
250                .slice(offset..offset + (segment.length as usize))
251                .aligned(segment.alignment);
252            initial_read_segments.insert(segment_id, buffer);
253        }
254    }
255}
256
#[cfg(feature = "object_store")]
impl VortexOpenOptions {
    /// Open the Vortex file located at `path` in the given object store.
    ///
    /// The store and path are wrapped in an `ObjectStoreReadSource` and passed to
    /// [`VortexOpenOptions::open`].
    ///
    /// # Errors
    ///
    /// Returns any error produced by [`VortexOpenOptions::open`].
    pub async fn open_object_store(
        self,
        object_store: &Arc<dyn object_store::ObjectStore>,
        path: &str,
    ) -> VortexResult<VortexFile> {
        use vortex_io::file::object_store::ObjectStoreReadSource;

        let source = ObjectStoreReadSource::new(Arc::clone(object_store), path.into());
        self.open(source).await
    }
}