Skip to main content

vortex_file/footer/
deserializer.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use flatbuffers::root;
5use vortex_array::dtype::DType;
6use vortex_buffer::ByteBuffer;
7use vortex_buffer::ByteBufferMut;
8use vortex_error::VortexExpect;
9use vortex_error::VortexResult;
10use vortex_error::vortex_bail;
11use vortex_error::vortex_err;
12use vortex_flatbuffers::FlatBuffer;
13use vortex_flatbuffers::ReadFlatBuffer;
14use vortex_session::VortexSession;
15
16use crate::EOF_SIZE;
17use crate::Footer;
18use crate::MAGIC_BYTES;
19use crate::VERSION;
20use crate::footer::FileStatistics;
21use crate::footer::postscript::Postscript;
22use crate::footer::postscript::PostscriptSegment;
23
24/// Deserialize a footer from the end of a Vortex file or created from a
25/// [`crate::footer::FooterSerializer`].
26pub struct FooterDeserializer {
27    // A buffer representing the end of a Vortex file.
28    // During deserialization, we may need to expand this buffer by requesting more data from
29    // the caller.
30    buffer: ByteBuffer,
31    // The session to use for deserialization.
32    session: VortexSession,
33    // The DType, if provided externally.
34    dtype: Option<DType>,
35
36    // Internal state that we accumulate
37
38    // The file size, possibly provided externally.
39    file_size: Option<u64>,
40    // The postscript, once we've parsed it.
41    postscript: Option<Postscript>,
42}
43
44impl FooterDeserializer {
45    pub(super) fn new(initial_read: ByteBuffer, session: VortexSession) -> Self {
46        Self {
47            buffer: initial_read,
48            session,
49            dtype: None,
50            file_size: None,
51            postscript: None,
52        }
53    }
54
55    pub fn with_dtype(mut self, dtype: DType) -> Self {
56        self.dtype = Some(dtype);
57        self
58    }
59
60    pub fn with_some_dtype(mut self, dtype: Option<DType>) -> Self {
61        self.dtype = dtype;
62        self
63    }
64
65    pub fn with_size(mut self, file_size: u64) -> Self {
66        self.file_size = Some(file_size);
67        self
68    }
69
70    pub fn with_some_size(mut self, file_size: Option<u64>) -> Self {
71        self.file_size = file_size;
72        self
73    }
74
75    /// Prefix more data to the existing buffer when requested by the deserializer.
76    pub fn prefix_data(&mut self, more_data: ByteBuffer) {
77        let mut buffer = ByteBufferMut::with_capacity(self.buffer.len() + more_data.len());
78        buffer.extend_from_slice(&more_data);
79        buffer.extend_from_slice(&self.buffer);
80        self.buffer = buffer.freeze();
81    }
82
83    pub fn deserialize(&mut self) -> VortexResult<DeserializeStep> {
84        let postscript = if let Some(ref postscript) = self.postscript {
85            postscript
86        } else {
87            self.postscript = Some(self.parse_postscript(&self.buffer)?);
88            self.postscript
89                .as_ref()
90                .vortex_expect("Just set postscript")
91        };
92
93        // If we haven't been provided a DType, we must read one from the file.
94        let dtype_segment = self
95            .dtype
96            .is_none()
97            .then(|| {
98                postscript.dtype.as_ref().ok_or_else(|| {
99                    vortex_err!(
100                        "Vortex file doesn't embed a DType and none provided to VortexOpenOptions"
101                    )
102                })
103            })
104            .transpose()?;
105
106        // The other postscript segments are required, so now we figure out our the offset that
107        // contains all the required segments.
108
109        // The initial offset is the file size - the size of our initial read.
110        let Some(file_size) = self.file_size else {
111            return Ok(DeserializeStep::NeedFileSize);
112        };
113        let initial_offset = file_size - (self.buffer.len() as u64);
114
115        let mut read_more_offset = initial_offset;
116        if let Some(dtype_segment) = &dtype_segment {
117            read_more_offset = read_more_offset.min(dtype_segment.offset);
118        }
119        if let Some(stats_segment) = &postscript.statistics {
120            read_more_offset = read_more_offset.min(stats_segment.offset);
121        }
122        read_more_offset = read_more_offset.min(postscript.layout.offset);
123        read_more_offset = read_more_offset.min(postscript.footer.offset);
124
125        // Read more bytes if necessary.
126        if read_more_offset < initial_offset {
127            tracing::debug!(
128                "Initial read from {initial_offset} did not cover all footer segments, reading from {read_more_offset}"
129            );
130            return Ok(DeserializeStep::NeedMoreData {
131                offset: read_more_offset,
132                len: usize::try_from(initial_offset - read_more_offset)?,
133            });
134        }
135
136        // Now we read our initial segments.
137        let dtype = dtype_segment
138            .map(|segment| self.parse_dtype(initial_offset, &self.buffer, segment))
139            .transpose()?
140            .unwrap_or_else(|| self.dtype.clone().vortex_expect("DType was provided"));
141        let file_stats = postscript
142            .statistics
143            .as_ref()
144            .map(|segment| {
145                self.parse_file_statistics(
146                    initial_offset,
147                    &self.buffer,
148                    segment,
149                    &dtype,
150                    &self.session,
151                )
152            })
153            .transpose()?;
154
155        Ok(DeserializeStep::Done(self.parse_footer(
156            initial_offset,
157            &self.buffer,
158            &postscript.footer,
159            &postscript.layout,
160            dtype,
161            file_stats,
162        )?))
163    }
164
165    /// The current buffer being used for deserialization.
166    pub fn buffer(&self) -> &ByteBuffer {
167        &self.buffer
168    }
169
170    /// Parse the postscript from the initial read.
171    fn parse_postscript(&self, initial_read: &[u8]) -> VortexResult<Postscript> {
172        if initial_read.len() < EOF_SIZE {
173            vortex_bail!(
174                "Initial read must be at least EOF_SIZE ({}) bytes",
175                EOF_SIZE
176            );
177        }
178        let eof_loc = initial_read.len() - EOF_SIZE;
179        let magic_bytes_loc = eof_loc + (EOF_SIZE - MAGIC_BYTES.len());
180
181        let magic_number = &initial_read[magic_bytes_loc..];
182        if magic_number != MAGIC_BYTES {
183            vortex_bail!("Malformed file, invalid magic bytes, got {magic_number:?}")
184        }
185
186        let version = u16::from_le_bytes(
187            initial_read[eof_loc..eof_loc + 2]
188                .try_into()
189                .map_err(|e| vortex_err!("Version was not a u16 {e}"))?,
190        );
191        if version != VERSION {
192            vortex_bail!("Malformed file, unsupported version {version}")
193        }
194
195        let ps_size = u16::from_le_bytes(
196            initial_read[eof_loc + 2..eof_loc + 4]
197                .try_into()
198                .map_err(|e| vortex_err!("Postscript size was not a u16 {e}"))?,
199        ) as usize;
200
201        if initial_read.len() < ps_size + EOF_SIZE {
202            vortex_bail!(
203                "Initial read must be at least {} bytes to include the Postscript",
204                ps_size + EOF_SIZE
205            );
206        }
207
208        Postscript::read_flatbuffer_bytes(&initial_read[eof_loc - ps_size..eof_loc])
209    }
210
211    /// Parse the DType from the initial read.
212    fn parse_dtype(
213        &self,
214        initial_offset: u64,
215        initial_read: &[u8],
216        segment: &PostscriptSegment,
217    ) -> VortexResult<DType> {
218        let offset = usize::try_from(segment.offset - initial_offset)?;
219        let sliced_buffer =
220            FlatBuffer::copy_from(&initial_read[offset..offset + (segment.length as usize)]);
221        DType::from_flatbuffer(sliced_buffer, &self.session)
222    }
223
224    /// Parse the [`FileStatistics`] from the initial read buffer.
225    fn parse_file_statistics(
226        &self,
227        initial_offset: u64,
228        initial_read: &[u8],
229        segment: &PostscriptSegment,
230        dtype: &DType,
231        session: &VortexSession,
232    ) -> VortexResult<FileStatistics> {
233        let offset = usize::try_from(segment.offset - initial_offset)?;
234        let sliced_buffer =
235            FlatBuffer::copy_from(&initial_read[offset..offset + (segment.length as usize)]);
236
237        let fb = root::<vortex_flatbuffers::footer::FileStatistics>(&sliced_buffer)?;
238        FileStatistics::from_flatbuffer(&fb, dtype, session)
239    }
240
241    /// Parse the rest of the footer from the initial read.
242    fn parse_footer(
243        &self,
244        initial_offset: u64,
245        initial_read: &[u8],
246        footer_segment: &PostscriptSegment,
247        layout_segment: &PostscriptSegment,
248        dtype: DType,
249        file_stats: Option<FileStatistics>,
250    ) -> VortexResult<Footer> {
251        let footer_offset = usize::try_from(footer_segment.offset - initial_offset)?;
252        let footer_bytes = FlatBuffer::copy_from(
253            &initial_read[footer_offset..footer_offset + (footer_segment.length as usize)],
254        );
255
256        let layout_offset = usize::try_from(layout_segment.offset - initial_offset)?;
257        let layout_bytes = FlatBuffer::copy_from(
258            &initial_read[layout_offset..layout_offset + (layout_segment.length as usize)],
259        );
260
261        Footer::from_flatbuffer(footer_bytes, layout_bytes, dtype, file_stats, &self.session)
262    }
263}
264
265#[derive(Debug)]
266pub enum DeserializeStep {
267    // The offset and length of additional data needed to continue deserialization.
268    NeedMoreData { offset: u64, len: usize },
269    NeedFileSize,
270    Done(Footer),
271}