vortex_file/footer/
deserializer.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use flatbuffers::root;
5use vortex_buffer::ByteBuffer;
6use vortex_buffer::ByteBufferMut;
7use vortex_dtype::DType;
8use vortex_error::VortexExpect;
9use vortex_error::VortexResult;
10use vortex_error::vortex_bail;
11use vortex_error::vortex_err;
12use vortex_flatbuffers::FlatBuffer;
13use vortex_flatbuffers::ReadFlatBuffer;
14use vortex_flatbuffers::dtype as fbd;
15use vortex_session::VortexSession;
16
17use crate::EOF_SIZE;
18use crate::Footer;
19use crate::MAGIC_BYTES;
20use crate::VERSION;
21use crate::footer::FileStatistics;
22use crate::footer::postscript::Postscript;
23use crate::footer::postscript::PostscriptSegment;
24
25/// Deserialize a footer from the end of a Vortex file or created from a
26/// [`crate::footer::FooterSerializer`].
27pub struct FooterDeserializer {
28    // A buffer representing the end of a Vortex file.
29    // During deserialization, we may need to expand this buffer by requesting more data from
30    // the caller.
31    buffer: ByteBuffer,
32    // The session to use for deserialization.
33    session: VortexSession,
34    // The DType, if provided externally.
35    dtype: Option<DType>,
36
37    // Internal state that we accumulate
38
39    // The file size, possibly provided externally.
40    file_size: Option<u64>,
41    // The postscript, once we've parsed it.
42    postscript: Option<Postscript>,
43}
44
45impl FooterDeserializer {
46    pub(super) fn new(initial_read: ByteBuffer, session: VortexSession) -> Self {
47        Self {
48            buffer: initial_read,
49            session,
50            dtype: None,
51            file_size: None,
52            postscript: None,
53        }
54    }
55
56    pub fn with_dtype(mut self, dtype: DType) -> Self {
57        self.dtype = Some(dtype);
58        self
59    }
60
61    pub fn with_some_dtype(mut self, dtype: Option<DType>) -> Self {
62        self.dtype = dtype;
63        self
64    }
65
66    pub fn with_size(mut self, file_size: u64) -> Self {
67        self.file_size = Some(file_size);
68        self
69    }
70
71    pub fn with_some_size(mut self, file_size: Option<u64>) -> Self {
72        self.file_size = file_size;
73        self
74    }
75
76    /// Prefix more data to the existing buffer when requested by the deserializer.
77    pub fn prefix_data(&mut self, more_data: ByteBuffer) {
78        let mut buffer = ByteBufferMut::with_capacity(self.buffer.len() + more_data.len());
79        buffer.extend_from_slice(&more_data);
80        buffer.extend_from_slice(&self.buffer);
81        self.buffer = buffer.freeze();
82    }
83
84    pub fn deserialize(&mut self) -> VortexResult<DeserializeStep> {
85        let postscript = if let Some(ref postscript) = self.postscript {
86            postscript
87        } else {
88            self.postscript = Some(self.parse_postscript(&self.buffer)?);
89            self.postscript
90                .as_ref()
91                .vortex_expect("Just set postscript")
92        };
93
94        // If we haven't been provided a DType, we must read one from the file.
95        let dtype_segment = self
96            .dtype
97            .is_none()
98            .then(|| {
99                postscript.dtype.as_ref().ok_or_else(|| {
100                    vortex_err!(
101                        "Vortex file doesn't embed a DType and none provided to VortexOpenOptions"
102                    )
103                })
104            })
105            .transpose()?;
106
107        // The other postscript segments are required, so now we figure out our the offset that
108        // contains all the required segments.
109
110        // The initial offset is the file size - the size of our initial read.
111        let Some(file_size) = self.file_size else {
112            return Ok(DeserializeStep::NeedFileSize);
113        };
114        let initial_offset = file_size - (self.buffer.len() as u64);
115
116        let mut read_more_offset = initial_offset;
117        if let Some(dtype_segment) = &dtype_segment {
118            read_more_offset = read_more_offset.min(dtype_segment.offset);
119        }
120        if let Some(stats_segment) = &postscript.statistics {
121            read_more_offset = read_more_offset.min(stats_segment.offset);
122        }
123        read_more_offset = read_more_offset.min(postscript.layout.offset);
124        read_more_offset = read_more_offset.min(postscript.footer.offset);
125
126        // Read more bytes if necessary.
127        if read_more_offset < initial_offset {
128            tracing::debug!(
129                "Initial read from {initial_offset} did not cover all footer segments, reading from {read_more_offset}"
130            );
131            return Ok(DeserializeStep::NeedMoreData {
132                offset: read_more_offset,
133                len: usize::try_from(initial_offset - read_more_offset)?,
134            });
135        }
136
137        // Now we read our initial segments.
138        let dtype = dtype_segment
139            .map(|segment| self.parse_dtype(initial_offset, &self.buffer, segment))
140            .transpose()?
141            .unwrap_or_else(|| self.dtype.clone().vortex_expect("DType was provided"));
142        let file_stats = postscript
143            .statistics
144            .as_ref()
145            .map(|segment| self.parse_file_statistics(initial_offset, &self.buffer, segment))
146            .transpose()?;
147
148        Ok(DeserializeStep::Done(self.parse_footer(
149            initial_offset,
150            &self.buffer,
151            &postscript.footer,
152            &postscript.layout,
153            dtype,
154            file_stats,
155        )?))
156    }
157
158    /// The current buffer being used for deserialization.
159    pub fn buffer(&self) -> &ByteBuffer {
160        &self.buffer
161    }
162
163    /// Parse the postscript from the initial read.
164    fn parse_postscript(&self, initial_read: &[u8]) -> VortexResult<Postscript> {
165        if initial_read.len() < EOF_SIZE {
166            vortex_bail!(
167                "Initial read must be at least EOF_SIZE ({}) bytes",
168                EOF_SIZE
169            );
170        }
171        let eof_loc = initial_read.len() - EOF_SIZE;
172        let magic_bytes_loc = eof_loc + (EOF_SIZE - MAGIC_BYTES.len());
173
174        let magic_number = &initial_read[magic_bytes_loc..];
175        if magic_number != MAGIC_BYTES {
176            vortex_bail!("Malformed file, invalid magic bytes, got {magic_number:?}")
177        }
178
179        let version = u16::from_le_bytes(
180            initial_read[eof_loc..eof_loc + 2]
181                .try_into()
182                .map_err(|e| vortex_err!("Version was not a u16 {e}"))?,
183        );
184        if version != VERSION {
185            vortex_bail!("Malformed file, unsupported version {version}")
186        }
187
188        let ps_size = u16::from_le_bytes(
189            initial_read[eof_loc + 2..eof_loc + 4]
190                .try_into()
191                .map_err(|e| vortex_err!("Postscript size was not a u16 {e}"))?,
192        ) as usize;
193
194        if initial_read.len() < ps_size + EOF_SIZE {
195            vortex_bail!(
196                "Initial read must be at least {} bytes to include the Postscript",
197                ps_size + EOF_SIZE
198            );
199        }
200
201        Postscript::read_flatbuffer_bytes(&initial_read[eof_loc - ps_size..eof_loc])
202    }
203
204    /// Parse the DType from the initial read.
205    fn parse_dtype(
206        &self,
207        initial_offset: u64,
208        initial_read: &[u8],
209        segment: &PostscriptSegment,
210    ) -> VortexResult<DType> {
211        let offset = usize::try_from(segment.offset - initial_offset)?;
212        let sliced_buffer =
213            FlatBuffer::copy_from(&initial_read[offset..offset + (segment.length as usize)]);
214        let fbd_dtype = root::<fbd::DType>(&sliced_buffer)?;
215
216        DType::try_from_view(fbd_dtype, sliced_buffer.clone())
217    }
218
219    /// Parse the [`FileStatistics`] from the initial read buffer.
220    fn parse_file_statistics(
221        &self,
222        initial_offset: u64,
223        initial_read: &[u8],
224        segment: &PostscriptSegment,
225    ) -> VortexResult<FileStatistics> {
226        let offset = usize::try_from(segment.offset - initial_offset)?;
227        let sliced_buffer =
228            FlatBuffer::copy_from(&initial_read[offset..offset + (segment.length as usize)]);
229        FileStatistics::read_flatbuffer_bytes(&sliced_buffer)
230    }
231
232    /// Parse the rest of the footer from the initial read.
233    fn parse_footer(
234        &self,
235        initial_offset: u64,
236        initial_read: &[u8],
237        footer_segment: &PostscriptSegment,
238        layout_segment: &PostscriptSegment,
239        dtype: DType,
240        file_stats: Option<FileStatistics>,
241    ) -> VortexResult<Footer> {
242        let footer_offset = usize::try_from(footer_segment.offset - initial_offset)?;
243        let footer_bytes = FlatBuffer::copy_from(
244            &initial_read[footer_offset..footer_offset + (footer_segment.length as usize)],
245        );
246
247        let layout_offset = usize::try_from(layout_segment.offset - initial_offset)?;
248        let layout_bytes = FlatBuffer::copy_from(
249            &initial_read[layout_offset..layout_offset + (layout_segment.length as usize)],
250        );
251
252        Footer::from_flatbuffer(footer_bytes, layout_bytes, dtype, file_stats, &self.session)
253    }
254}
255
256#[derive(Debug)]
257pub enum DeserializeStep {
258    // The offset and length of additional data needed to continue deserialization.
259    NeedMoreData { offset: u64, len: usize },
260    NeedFileSize,
261    Done(Footer),
262}