vortex_file/
open.rs

1use std::sync::{Arc, RwLock};
2
3use flatbuffers::root;
4use vortex_array::ArrayRegistry;
5use vortex_array::aliases::hash_map::HashMap;
6use vortex_buffer::{Alignment, ByteBuffer, ByteBufferMut};
7use vortex_dtype::DType;
8use vortex_error::{VortexError, VortexExpect, VortexResult, vortex_bail, vortex_err};
9use vortex_flatbuffers::{FlatBuffer, ReadFlatBuffer, dtype as fbd};
10use vortex_io::VortexReadAt;
11use vortex_layout::segments::SegmentId;
12use vortex_layout::{LayoutRegistry, LayoutRegistryExt};
13use vortex_metrics::VortexMetrics;
14
15use crate::footer::{FileStatistics, Footer, Postscript, PostscriptSegment};
16use crate::segments::{NoOpSegmentCache, SegmentCache};
17use crate::{DEFAULT_REGISTRY, EOF_SIZE, MAGIC_BYTES, MAX_FOOTER_SIZE, VERSION};
18
/// A kind of file that can be opened through [`VortexOpenOptions`].
///
/// Implementors choose the type of the file-specific options that [`VortexOpenOptions`] carries
/// alongside its generic settings.
pub trait FileType
where
    Self: Sized,
{
    /// Options that only make sense for this kind of file.
    type Options;
}
22
23/// Open options for a Vortex file reader.
24pub struct VortexOpenOptions<F: FileType> {
25    /// File-specific options
26    pub(crate) options: F::Options,
27    /// The registry of array encodings.
28    pub(crate) registry: Arc<ArrayRegistry>,
29    /// The registry of layouts.
30    pub(crate) layout_registry: Arc<LayoutRegistry>,
31    /// An optional, externally provided, file size.
32    pub(crate) file_size: Option<u64>,
33    /// An optional, externally provided, DType.
34    pub(crate) dtype: Option<DType>,
35    /// An optional, externally provided, file layout.
36    // TODO(ngates): add an optional DType so we only read the layout segment.
37    footer: Option<Footer>,
38    pub(crate) segment_cache: Arc<dyn SegmentCache>,
39    pub(crate) initial_read_size: u64,
40    pub(crate) initial_read_segments: RwLock<HashMap<SegmentId, ByteBuffer>>,
41    pub(crate) metrics: VortexMetrics,
42}
43
44impl<F: FileType> VortexOpenOptions<F> {
45    pub(crate) fn new(options: F::Options) -> Self {
46        Self {
47            options,
48            registry: DEFAULT_REGISTRY.clone(),
49            layout_registry: Arc::new(LayoutRegistry::default()),
50            file_size: None,
51            dtype: None,
52            footer: None,
53            segment_cache: Arc::new(NoOpSegmentCache),
54            initial_read_size: 0,
55            initial_read_segments: Default::default(),
56            metrics: VortexMetrics::default(),
57        }
58    }
59
60    /// Configure a Vortex array registry.
61    pub fn with_array_registry(mut self, registry: Arc<ArrayRegistry>) -> Self {
62        self.registry = registry;
63        self
64    }
65
66    /// Configure a Vortex array registry.
67    pub fn with_layout_registry(mut self, registry: Arc<LayoutRegistry>) -> Self {
68        self.layout_registry = registry;
69        self
70    }
71
72    /// Configure a known file size.
73    ///
74    /// This helps to prevent an I/O request to discover the size of the file.
75    /// Of course, all bets are off if you pass an incorrect value.
76    pub fn with_file_size(mut self, file_size: u64) -> Self {
77        self.file_size = Some(file_size);
78        self
79    }
80
81    /// Configure a known DType.
82    ///
83    /// If this is provided, then the Vortex file may be opened with fewer I/O requests.
84    ///
85    /// For Vortex files that do not contain a `DType`, this is required.
86    pub fn with_dtype(mut self, dtype: DType) -> Self {
87        self.dtype = Some(dtype);
88        self
89    }
90
91    /// Configure a known file layout.
92    ///
93    /// If this is provided, then the Vortex file can be opened without performing any I/O.
94    /// Once open, the [`Footer`] can be accessed via [`crate::VortexFile::footer`].
95    pub fn with_footer(mut self, footer: Footer) -> Self {
96        self.dtype = Some(footer.layout().dtype().clone());
97        self.footer = Some(footer);
98        self
99    }
100
101    /// Configure the initial read size for the Vortex file.
102    pub fn with_initial_read_size(mut self, initial_read_size: u64) -> Self {
103        self.initial_read_size = initial_read_size;
104        self
105    }
106
107    /// Configure a custom [`SegmentCache`].
108    pub fn with_segment_cache(mut self, segment_cache: Arc<dyn SegmentCache>) -> Self {
109        self.segment_cache = segment_cache;
110        self
111    }
112
113    /// Disable segment caching entirely.
114    pub fn without_segment_cache(self) -> Self {
115        self.with_segment_cache(Arc::new(NoOpSegmentCache))
116    }
117
118    /// Configure a custom [`VortexMetrics`].
119    pub fn with_metrics(mut self, metrics: VortexMetrics) -> Self {
120        self.metrics = metrics;
121        self
122    }
123}
124
125impl<F: FileType> VortexOpenOptions<F> {
126    /// Read the [`Footer`] from the file.
127    pub(crate) async fn read_footer<R: VortexReadAt>(&self, read: &R) -> VortexResult<Footer> {
128        if let Some(footer) = self.footer.as_ref() {
129            return Ok(footer.clone());
130        }
131
132        // Fetch the file size and perform the initial read.
133        let file_size = match self.file_size {
134            None => read.size().await?,
135            Some(file_size) => file_size,
136        };
137        let initial_read_size = self
138            .initial_read_size
139            // Make sure we read enough to cover the postscript
140            .max(MAX_FOOTER_SIZE as u64 + EOF_SIZE as u64)
141            .min(file_size);
142        let mut initial_offset = file_size - initial_read_size;
143        let mut initial_read: ByteBuffer = read
144            .read_byte_range(initial_offset..file_size, Alignment::none())
145            .await?;
146
147        // We know the initial read _must_ contain at least the Postscript.
148        let postscript = self.parse_postscript(&initial_read)?;
149
150        // If we haven't been provided a DType, we must read one from the file.
151        let dtype_segment = self.dtype.is_none().then(|| postscript.dtype.ok_or_else(|| vortex_err!("Vortex file doesn't embed a DType and one has not been provided to VortexOpenOptions"))).transpose()?;
152
153        // The other postscript segments are required, so now we figure out our the offset that
154        // contains all the required segments.
155        let mut read_more_offset = initial_offset;
156        if let Some(dtype_segment) = &dtype_segment {
157            read_more_offset = read_more_offset.min(dtype_segment.offset);
158        }
159        if let Some(stats_segment) = &postscript.statistics {
160            read_more_offset = read_more_offset.min(stats_segment.offset);
161        }
162        read_more_offset = read_more_offset.min(postscript.layout.offset);
163
164        // Read more bytes if necessary.
165        if read_more_offset < initial_offset {
166            log::info!(
167                "Initial read from {} did not cover all footer segments, reading from {}",
168                initial_offset,
169                read_more_offset
170            );
171
172            let mut new_initial_read =
173                ByteBufferMut::with_capacity(usize::try_from(file_size - read_more_offset)?);
174            new_initial_read.extend_from_slice(
175                &read
176                    .read_byte_range(read_more_offset..initial_offset, Alignment::none())
177                    .await?,
178            );
179            new_initial_read.extend_from_slice(&initial_read);
180
181            initial_offset = read_more_offset;
182            initial_read = new_initial_read.freeze();
183        }
184
185        // Now we read our initial segments.
186        let dtype = dtype_segment
187            .map(|segment| self.parse_dtype(initial_offset, &initial_read, &segment))
188            .transpose()?
189            .unwrap_or_else(|| self.dtype.clone().vortex_expect("DType was provided"));
190        let file_stats = postscript
191            .statistics
192            .map(|segment| {
193                self.parse_flatbuffer::<FileStatistics>(initial_offset, &initial_read, &segment)
194            })
195            .transpose()?;
196        let footer = self.parse_file_layout(
197            initial_offset,
198            &initial_read,
199            &postscript.layout,
200            dtype,
201            file_stats,
202        )?;
203
204        // If the initial read happened to cover any segments, then we can populate the
205        // segment cache
206        self.populate_initial_segments(initial_offset, &initial_read, &footer);
207
208        Ok(footer)
209    }
210
211    /// Parse the postscript from the initial read.
212    fn parse_postscript(&self, initial_read: &[u8]) -> VortexResult<Postscript> {
213        let eof_loc = initial_read.len() - EOF_SIZE;
214        let magic_bytes_loc = eof_loc + (EOF_SIZE - MAGIC_BYTES.len());
215
216        let magic_number = &initial_read[magic_bytes_loc..];
217        if magic_number != MAGIC_BYTES {
218            vortex_bail!("Malformed file, invalid magic bytes, got {magic_number:?}")
219        }
220
221        let version = u16::from_le_bytes(
222            initial_read[eof_loc..eof_loc + 2]
223                .try_into()
224                .map_err(|e| vortex_err!("Version was not a u16 {e}"))?,
225        );
226        if version != VERSION {
227            vortex_bail!("Malformed file, unsupported version {version}")
228        }
229
230        let ps_size = u16::from_le_bytes(
231            initial_read[eof_loc + 2..eof_loc + 4]
232                .try_into()
233                .map_err(|e| vortex_err!("Postscript size was not a u16 {e}"))?,
234        ) as usize;
235
236        Postscript::read_flatbuffer_bytes(&initial_read[eof_loc - ps_size..eof_loc])
237    }
238
239    /// Parse the DType from the initial read.
240    fn parse_dtype(
241        &self,
242        initial_offset: u64,
243        initial_read: &ByteBuffer,
244        segment: &PostscriptSegment,
245    ) -> VortexResult<DType> {
246        let offset = usize::try_from(segment.offset - initial_offset)?;
247        let sliced_buffer =
248            FlatBuffer::align_from(initial_read.slice(offset..offset + (segment.length as usize)));
249        let fbd_dtype = root::<fbd::DType>(&sliced_buffer)?;
250
251        DType::try_from_view(fbd_dtype, sliced_buffer.clone())
252    }
253
254    /// Parse a [`ReadFlatBuffer`] from the initial read buffer.
255    fn parse_flatbuffer<T: ReadFlatBuffer<Error = VortexError>>(
256        &self,
257        initial_offset: u64,
258        initial_read: &ByteBuffer,
259        segment: &PostscriptSegment,
260    ) -> VortexResult<T> {
261        let offset = usize::try_from(segment.offset - initial_offset)?;
262        let sliced_buffer =
263            FlatBuffer::align_from(initial_read.slice(offset..offset + (segment.length as usize)));
264        T::read_flatbuffer_bytes(&sliced_buffer)
265    }
266
267    /// Parse the rest of the footer from the initial read.
268    fn parse_file_layout(
269        &self,
270        initial_offset: u64,
271        initial_read: &ByteBuffer,
272        layout_segment: &PostscriptSegment,
273        dtype: DType,
274        file_stats: Option<FileStatistics>,
275    ) -> VortexResult<Footer> {
276        let offset = usize::try_from(layout_segment.offset - initial_offset)?;
277        let bytes = FlatBuffer::align_from(
278            initial_read.slice(offset..offset + (layout_segment.length as usize)),
279        );
280        Footer::from_flatbuffer(
281            bytes,
282            dtype,
283            file_stats,
284            &self.registry,
285            &self.layout_registry,
286        )
287    }
288
289    /// Populate segments in the cache that were covered by the initial read.
290    fn populate_initial_segments(
291        &self,
292        initial_offset: u64,
293        initial_read: &ByteBuffer,
294        footer: &Footer,
295    ) {
296        let first_idx = footer
297            .segment_map()
298            .partition_point(|segment| segment.offset < initial_offset);
299
300        let mut initial_segments = self
301            .initial_read_segments
302            .write()
303            .vortex_expect("poisoned lock");
304
305        for idx in first_idx..footer.segment_map().len() {
306            let segment = &footer.segment_map()[idx];
307            let segment_id =
308                SegmentId::from(u32::try_from(idx).vortex_expect("Invalid segment ID"));
309            let offset =
310                usize::try_from(segment.offset - initial_offset).vortex_expect("Invalid offset");
311            let buffer = initial_read
312                .slice(offset..offset + (segment.length as usize))
313                .aligned(segment.alignment);
314            initial_segments.insert(segment_id, buffer);
315        }
316    }
317}