vortex_file/footer/
deserializer.rs1use flatbuffers::root;
5use vortex_array::dtype::DType;
6use vortex_buffer::ByteBuffer;
7use vortex_buffer::ByteBufferMut;
8use vortex_error::VortexExpect;
9use vortex_error::VortexResult;
10use vortex_error::vortex_bail;
11use vortex_error::vortex_err;
12use vortex_flatbuffers::FlatBuffer;
13use vortex_flatbuffers::ReadFlatBuffer;
14use vortex_session::VortexSession;
15
16use crate::EOF_SIZE;
17use crate::Footer;
18use crate::MAGIC_BYTES;
19use crate::VERSION;
20use crate::footer::FileStatistics;
21use crate::footer::postscript::Postscript;
22use crate::footer::postscript::PostscriptSegment;
23
24pub struct FooterDeserializer {
27 buffer: ByteBuffer,
31 session: VortexSession,
33 dtype: Option<DType>,
35
36 file_size: Option<u64>,
40 postscript: Option<Postscript>,
42}
43
44impl FooterDeserializer {
45 pub(super) fn new(initial_read: ByteBuffer, session: VortexSession) -> Self {
46 Self {
47 buffer: initial_read,
48 session,
49 dtype: None,
50 file_size: None,
51 postscript: None,
52 }
53 }
54
55 pub fn with_dtype(mut self, dtype: DType) -> Self {
56 self.dtype = Some(dtype);
57 self
58 }
59
60 pub fn with_some_dtype(mut self, dtype: Option<DType>) -> Self {
61 self.dtype = dtype;
62 self
63 }
64
65 pub fn with_size(mut self, file_size: u64) -> Self {
66 self.file_size = Some(file_size);
67 self
68 }
69
70 pub fn with_some_size(mut self, file_size: Option<u64>) -> Self {
71 self.file_size = file_size;
72 self
73 }
74
75 pub fn prefix_data(&mut self, more_data: ByteBuffer) {
77 let mut buffer = ByteBufferMut::with_capacity(self.buffer.len() + more_data.len());
78 buffer.extend_from_slice(&more_data);
79 buffer.extend_from_slice(&self.buffer);
80 self.buffer = buffer.freeze();
81 }
82
83 pub fn deserialize(&mut self) -> VortexResult<DeserializeStep> {
84 let postscript = if let Some(ref postscript) = self.postscript {
85 postscript
86 } else {
87 self.postscript = Some(self.parse_postscript(&self.buffer)?);
88 self.postscript
89 .as_ref()
90 .vortex_expect("Just set postscript")
91 };
92
93 let dtype_segment = self
95 .dtype
96 .is_none()
97 .then(|| {
98 postscript.dtype.as_ref().ok_or_else(|| {
99 vortex_err!(
100 "Vortex file doesn't embed a DType and none provided to VortexOpenOptions"
101 )
102 })
103 })
104 .transpose()?;
105
106 let Some(file_size) = self.file_size else {
111 return Ok(DeserializeStep::NeedFileSize);
112 };
113 let initial_offset = file_size - (self.buffer.len() as u64);
114
115 let mut read_more_offset = initial_offset;
116 if let Some(dtype_segment) = &dtype_segment {
117 read_more_offset = read_more_offset.min(dtype_segment.offset);
118 }
119 if let Some(stats_segment) = &postscript.statistics {
120 read_more_offset = read_more_offset.min(stats_segment.offset);
121 }
122 read_more_offset = read_more_offset.min(postscript.layout.offset);
123 read_more_offset = read_more_offset.min(postscript.footer.offset);
124
125 if read_more_offset < initial_offset {
127 tracing::debug!(
128 "Initial read from {initial_offset} did not cover all footer segments, reading from {read_more_offset}"
129 );
130 return Ok(DeserializeStep::NeedMoreData {
131 offset: read_more_offset,
132 len: usize::try_from(initial_offset - read_more_offset)?,
133 });
134 }
135
136 let dtype = dtype_segment
138 .map(|segment| self.parse_dtype(initial_offset, &self.buffer, segment))
139 .transpose()?
140 .unwrap_or_else(|| self.dtype.clone().vortex_expect("DType was provided"));
141 let file_stats = postscript
142 .statistics
143 .as_ref()
144 .map(|segment| {
145 self.parse_file_statistics(
146 initial_offset,
147 &self.buffer,
148 segment,
149 &dtype,
150 &self.session,
151 )
152 })
153 .transpose()?;
154
155 Ok(DeserializeStep::Done(self.parse_footer(
156 initial_offset,
157 &self.buffer,
158 &postscript.footer,
159 &postscript.layout,
160 dtype,
161 file_stats,
162 )?))
163 }
164
165 pub fn buffer(&self) -> &ByteBuffer {
167 &self.buffer
168 }
169
170 fn parse_postscript(&self, initial_read: &[u8]) -> VortexResult<Postscript> {
172 if initial_read.len() < EOF_SIZE {
173 vortex_bail!(
174 "Initial read must be at least EOF_SIZE ({}) bytes",
175 EOF_SIZE
176 );
177 }
178 let eof_loc = initial_read.len() - EOF_SIZE;
179 let magic_bytes_loc = eof_loc + (EOF_SIZE - MAGIC_BYTES.len());
180
181 let magic_number = &initial_read[magic_bytes_loc..];
182 if magic_number != MAGIC_BYTES {
183 vortex_bail!("Malformed file, invalid magic bytes, got {magic_number:?}")
184 }
185
186 let version = u16::from_le_bytes(
187 initial_read[eof_loc..eof_loc + 2]
188 .try_into()
189 .map_err(|e| vortex_err!("Version was not a u16 {e}"))?,
190 );
191 if version != VERSION {
192 vortex_bail!("Malformed file, unsupported version {version}")
193 }
194
195 let ps_size = u16::from_le_bytes(
196 initial_read[eof_loc + 2..eof_loc + 4]
197 .try_into()
198 .map_err(|e| vortex_err!("Postscript size was not a u16 {e}"))?,
199 ) as usize;
200
201 if initial_read.len() < ps_size + EOF_SIZE {
202 vortex_bail!(
203 "Initial read must be at least {} bytes to include the Postscript",
204 ps_size + EOF_SIZE
205 );
206 }
207
208 Postscript::read_flatbuffer_bytes(&initial_read[eof_loc - ps_size..eof_loc])
209 }
210
211 fn parse_dtype(
213 &self,
214 initial_offset: u64,
215 initial_read: &[u8],
216 segment: &PostscriptSegment,
217 ) -> VortexResult<DType> {
218 let offset = usize::try_from(segment.offset - initial_offset)?;
219 let sliced_buffer =
220 FlatBuffer::copy_from(&initial_read[offset..offset + (segment.length as usize)]);
221 DType::from_flatbuffer(sliced_buffer, &self.session)
222 }
223
224 fn parse_file_statistics(
226 &self,
227 initial_offset: u64,
228 initial_read: &[u8],
229 segment: &PostscriptSegment,
230 dtype: &DType,
231 session: &VortexSession,
232 ) -> VortexResult<FileStatistics> {
233 let offset = usize::try_from(segment.offset - initial_offset)?;
234 let sliced_buffer =
235 FlatBuffer::copy_from(&initial_read[offset..offset + (segment.length as usize)]);
236
237 let fb = root::<vortex_flatbuffers::footer::FileStatistics>(&sliced_buffer)?;
238 FileStatistics::from_flatbuffer(&fb, dtype, session)
239 }
240
241 fn parse_footer(
243 &self,
244 initial_offset: u64,
245 initial_read: &[u8],
246 footer_segment: &PostscriptSegment,
247 layout_segment: &PostscriptSegment,
248 dtype: DType,
249 file_stats: Option<FileStatistics>,
250 ) -> VortexResult<Footer> {
251 let footer_offset = usize::try_from(footer_segment.offset - initial_offset)?;
252 let footer_bytes = FlatBuffer::copy_from(
253 &initial_read[footer_offset..footer_offset + (footer_segment.length as usize)],
254 );
255
256 let layout_offset = usize::try_from(layout_segment.offset - initial_offset)?;
257 let layout_bytes = FlatBuffer::copy_from(
258 &initial_read[layout_offset..layout_offset + (layout_segment.length as usize)],
259 );
260
261 Footer::from_flatbuffer(footer_bytes, layout_bytes, dtype, file_stats, &self.session)
262 }
263}
264
265#[derive(Debug)]
266pub enum DeserializeStep {
267 NeedMoreData { offset: u64, len: usize },
269 NeedFileSize,
270 Done(Footer),
271}