vortex_file/footer/
deserializer.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::sync::Arc;
5
6use flatbuffers::root;
7use vortex_array::ArrayRegistry;
8use vortex_buffer::{ByteBuffer, ByteBufferMut};
9use vortex_dtype::DType;
10use vortex_error::{VortexExpect, VortexResult, vortex_bail, vortex_err};
11use vortex_flatbuffers::{FlatBuffer, ReadFlatBuffer, dtype as fbd};
12use vortex_layout::{LayoutRegistry, LayoutRegistryExt};
13
14use crate::footer::FileStatistics;
15use crate::footer::postscript::{Postscript, PostscriptSegment};
16use crate::{DEFAULT_REGISTRY, EOF_SIZE, Footer, MAGIC_BYTES, VERSION};
17
18/// Deserialize a footer from the end of a Vortex file or created from a
19/// [`crate::footer::FooterSerializer`].
20pub struct FooterDeserializer {
21    // A buffer representing the end of a Vortex file.
22    // During deserialization, we may need to expand this buffer by requesting more data from
23    // the caller.
24    buffer: ByteBuffer,
25    // The registry of array encodings.
26    array_registry: Arc<ArrayRegistry>,
27    // The registry of layouts.
28    layout_registry: Arc<LayoutRegistry>,
29    // The DType, if provided externally.
30    dtype: Option<DType>,
31
32    // Internal state that we accumulate
33
34    // The file size, possibly provided externally.
35    file_size: Option<u64>,
36    // The postscript, once we've parsed it.
37    postscript: Option<Postscript>,
38}
39
40impl FooterDeserializer {
41    pub(super) fn new(initial_read: ByteBuffer) -> Self {
42        Self {
43            buffer: initial_read,
44            array_registry: DEFAULT_REGISTRY.clone(),
45            layout_registry: Arc::new(LayoutRegistry::default()),
46            dtype: None,
47            file_size: None,
48            postscript: None,
49        }
50    }
51
52    pub fn with_dtype(mut self, dtype: DType) -> Self {
53        self.dtype = Some(dtype);
54        self
55    }
56
57    pub fn with_some_dtype(mut self, dtype: Option<DType>) -> Self {
58        self.dtype = dtype;
59        self
60    }
61
62    pub fn with_size(mut self, file_size: u64) -> Self {
63        self.file_size = Some(file_size);
64        self
65    }
66
67    pub fn with_some_size(mut self, file_size: Option<u64>) -> Self {
68        self.file_size = file_size;
69        self
70    }
71
72    pub fn with_array_registry(mut self, registry: Arc<ArrayRegistry>) -> Self {
73        self.array_registry = registry;
74        self
75    }
76
77    pub fn with_layout_registry(mut self, registry: Arc<LayoutRegistry>) -> Self {
78        self.layout_registry = registry;
79        self
80    }
81
82    /// Prefix more data to the existing buffer when requested by the deserializer.
83    pub fn prefix_data(&mut self, more_data: ByteBuffer) {
84        let mut buffer = ByteBufferMut::with_capacity(self.buffer.len() + more_data.len());
85        buffer.extend_from_slice(&more_data);
86        buffer.extend_from_slice(&self.buffer);
87        self.buffer = buffer.freeze();
88    }
89
90    pub fn deserialize(&mut self) -> VortexResult<DeserializeStep> {
91        let postscript = if let Some(ref postscript) = self.postscript {
92            postscript
93        } else {
94            self.postscript = Some(self.parse_postscript(&self.buffer)?);
95            self.postscript
96                .as_ref()
97                .vortex_expect("Just set postscript")
98        };
99
100        // If we haven't been provided a DType, we must read one from the file.
101        let dtype_segment = self
102            .dtype
103            .is_none()
104            .then(|| {
105                postscript.dtype.as_ref().ok_or_else(|| {
106                    vortex_err!(
107                        "Vortex file doesn't embed a DType and none provided to VortexOpenOptions"
108                    )
109                })
110            })
111            .transpose()?;
112
113        // The other postscript segments are required, so now we figure out our the offset that
114        // contains all the required segments.
115
116        // The initial offset is the file size - the size of our initial read.
117        let Some(file_size) = self.file_size else {
118            return Ok(DeserializeStep::NeedFileSize);
119        };
120        let initial_offset = file_size - (self.buffer.len() as u64);
121
122        let mut read_more_offset = initial_offset;
123        if let Some(dtype_segment) = &dtype_segment {
124            read_more_offset = read_more_offset.min(dtype_segment.offset);
125        }
126        if let Some(stats_segment) = &postscript.statistics {
127            read_more_offset = read_more_offset.min(stats_segment.offset);
128        }
129        read_more_offset = read_more_offset.min(postscript.layout.offset);
130        read_more_offset = read_more_offset.min(postscript.footer.offset);
131
132        // Read more bytes if necessary.
133        if read_more_offset < initial_offset {
134            log::debug!(
135                "Initial read from {initial_offset} did not cover all footer segments, reading from {read_more_offset}"
136            );
137            return Ok(DeserializeStep::NeedMoreData {
138                offset: read_more_offset,
139                len: usize::try_from(initial_offset - read_more_offset)?,
140            });
141        }
142
143        // Now we read our initial segments.
144        let dtype = dtype_segment
145            .map(|segment| self.parse_dtype(initial_offset, &self.buffer, segment))
146            .transpose()?
147            .unwrap_or_else(|| self.dtype.clone().vortex_expect("DType was provided"));
148        let file_stats = postscript
149            .statistics
150            .as_ref()
151            .map(|segment| self.parse_file_statistics(initial_offset, &self.buffer, segment))
152            .transpose()?;
153
154        Ok(DeserializeStep::Done(self.parse_footer(
155            initial_offset,
156            &self.buffer,
157            &postscript.footer,
158            &postscript.layout,
159            dtype,
160            file_stats,
161        )?))
162    }
163
164    /// The current buffer being used for deserialization.
165    pub fn buffer(&self) -> &ByteBuffer {
166        &self.buffer
167    }
168
169    /// Parse the postscript from the initial read.
170    fn parse_postscript(&self, initial_read: &[u8]) -> VortexResult<Postscript> {
171        if initial_read.len() < EOF_SIZE {
172            vortex_bail!(
173                "Initial read must be at least EOF_SIZE ({}) bytes",
174                EOF_SIZE
175            );
176        }
177        let eof_loc = initial_read.len() - EOF_SIZE;
178        let magic_bytes_loc = eof_loc + (EOF_SIZE - MAGIC_BYTES.len());
179
180        let magic_number = &initial_read[magic_bytes_loc..];
181        if magic_number != MAGIC_BYTES {
182            vortex_bail!("Malformed file, invalid magic bytes, got {magic_number:?}")
183        }
184
185        let version = u16::from_le_bytes(
186            initial_read[eof_loc..eof_loc + 2]
187                .try_into()
188                .map_err(|e| vortex_err!("Version was not a u16 {e}"))?,
189        );
190        if version != VERSION {
191            vortex_bail!("Malformed file, unsupported version {version}")
192        }
193
194        let ps_size = u16::from_le_bytes(
195            initial_read[eof_loc + 2..eof_loc + 4]
196                .try_into()
197                .map_err(|e| vortex_err!("Postscript size was not a u16 {e}"))?,
198        ) as usize;
199
200        if initial_read.len() < ps_size + EOF_SIZE {
201            vortex_bail!(
202                "Initial read must be at least {} bytes to include the Postscript",
203                ps_size + EOF_SIZE
204            );
205        }
206
207        Postscript::read_flatbuffer_bytes(&initial_read[eof_loc - ps_size..eof_loc])
208    }
209
210    /// Parse the DType from the initial read.
211    fn parse_dtype(
212        &self,
213        initial_offset: u64,
214        initial_read: &[u8],
215        segment: &PostscriptSegment,
216    ) -> VortexResult<DType> {
217        let offset = usize::try_from(segment.offset - initial_offset)?;
218        let sliced_buffer =
219            FlatBuffer::copy_from(&initial_read[offset..offset + (segment.length as usize)]);
220        let fbd_dtype = root::<fbd::DType>(&sliced_buffer)?;
221
222        DType::try_from_view(fbd_dtype, sliced_buffer.clone())
223    }
224
225    /// Parse the [`FileStatistics`] from the initial read buffer.
226    fn parse_file_statistics(
227        &self,
228        initial_offset: u64,
229        initial_read: &[u8],
230        segment: &PostscriptSegment,
231    ) -> VortexResult<FileStatistics> {
232        let offset = usize::try_from(segment.offset - initial_offset)?;
233        let sliced_buffer =
234            FlatBuffer::copy_from(&initial_read[offset..offset + (segment.length as usize)]);
235        FileStatistics::read_flatbuffer_bytes(&sliced_buffer)
236    }
237
238    /// Parse the rest of the footer from the initial read.
239    fn parse_footer(
240        &self,
241        initial_offset: u64,
242        initial_read: &[u8],
243        footer_segment: &PostscriptSegment,
244        layout_segment: &PostscriptSegment,
245        dtype: DType,
246        file_stats: Option<FileStatistics>,
247    ) -> VortexResult<Footer> {
248        let footer_offset = usize::try_from(footer_segment.offset - initial_offset)?;
249        let footer_bytes = FlatBuffer::copy_from(
250            &initial_read[footer_offset..footer_offset + (footer_segment.length as usize)],
251        );
252
253        let layout_offset = usize::try_from(layout_segment.offset - initial_offset)?;
254        let layout_bytes = FlatBuffer::copy_from(
255            &initial_read[layout_offset..layout_offset + (layout_segment.length as usize)],
256        );
257
258        Footer::from_flatbuffer(
259            footer_bytes,
260            layout_bytes,
261            dtype,
262            file_stats,
263            &self.array_registry,
264            &self.layout_registry,
265        )
266    }
267}
268
269#[derive(Debug)]
270pub enum DeserializeStep {
271    // The offset and length of additional data needed to continue deserialization.
272    NeedMoreData { offset: u64, len: usize },
273    NeedFileSize,
274    Done(Footer),
275}