vortex_file/footer/
deserializer.rs1use flatbuffers::root;
5use vortex_buffer::ByteBuffer;
6use vortex_buffer::ByteBufferMut;
7use vortex_dtype::DType;
8use vortex_error::VortexExpect;
9use vortex_error::VortexResult;
10use vortex_error::vortex_bail;
11use vortex_error::vortex_err;
12use vortex_flatbuffers::FlatBuffer;
13use vortex_flatbuffers::ReadFlatBuffer;
14use vortex_flatbuffers::dtype as fbd;
15use vortex_session::VortexSession;
16
17use crate::EOF_SIZE;
18use crate::Footer;
19use crate::MAGIC_BYTES;
20use crate::VERSION;
21use crate::footer::FileStatistics;
22use crate::footer::postscript::Postscript;
23use crate::footer::postscript::PostscriptSegment;
24
25pub struct FooterDeserializer {
28 buffer: ByteBuffer,
32 session: VortexSession,
34 dtype: Option<DType>,
36
37 file_size: Option<u64>,
41 postscript: Option<Postscript>,
43}
44
45impl FooterDeserializer {
46 pub(super) fn new(initial_read: ByteBuffer, session: VortexSession) -> Self {
47 Self {
48 buffer: initial_read,
49 session,
50 dtype: None,
51 file_size: None,
52 postscript: None,
53 }
54 }
55
56 pub fn with_dtype(mut self, dtype: DType) -> Self {
57 self.dtype = Some(dtype);
58 self
59 }
60
61 pub fn with_some_dtype(mut self, dtype: Option<DType>) -> Self {
62 self.dtype = dtype;
63 self
64 }
65
66 pub fn with_size(mut self, file_size: u64) -> Self {
67 self.file_size = Some(file_size);
68 self
69 }
70
71 pub fn with_some_size(mut self, file_size: Option<u64>) -> Self {
72 self.file_size = file_size;
73 self
74 }
75
76 pub fn prefix_data(&mut self, more_data: ByteBuffer) {
78 let mut buffer = ByteBufferMut::with_capacity(self.buffer.len() + more_data.len());
79 buffer.extend_from_slice(&more_data);
80 buffer.extend_from_slice(&self.buffer);
81 self.buffer = buffer.freeze();
82 }
83
84 pub fn deserialize(&mut self) -> VortexResult<DeserializeStep> {
85 let postscript = if let Some(ref postscript) = self.postscript {
86 postscript
87 } else {
88 self.postscript = Some(self.parse_postscript(&self.buffer)?);
89 self.postscript
90 .as_ref()
91 .vortex_expect("Just set postscript")
92 };
93
94 let dtype_segment = self
96 .dtype
97 .is_none()
98 .then(|| {
99 postscript.dtype.as_ref().ok_or_else(|| {
100 vortex_err!(
101 "Vortex file doesn't embed a DType and none provided to VortexOpenOptions"
102 )
103 })
104 })
105 .transpose()?;
106
107 let Some(file_size) = self.file_size else {
112 return Ok(DeserializeStep::NeedFileSize);
113 };
114 let initial_offset = file_size - (self.buffer.len() as u64);
115
116 let mut read_more_offset = initial_offset;
117 if let Some(dtype_segment) = &dtype_segment {
118 read_more_offset = read_more_offset.min(dtype_segment.offset);
119 }
120 if let Some(stats_segment) = &postscript.statistics {
121 read_more_offset = read_more_offset.min(stats_segment.offset);
122 }
123 read_more_offset = read_more_offset.min(postscript.layout.offset);
124 read_more_offset = read_more_offset.min(postscript.footer.offset);
125
126 if read_more_offset < initial_offset {
128 tracing::debug!(
129 "Initial read from {initial_offset} did not cover all footer segments, reading from {read_more_offset}"
130 );
131 return Ok(DeserializeStep::NeedMoreData {
132 offset: read_more_offset,
133 len: usize::try_from(initial_offset - read_more_offset)?,
134 });
135 }
136
137 let dtype = dtype_segment
139 .map(|segment| self.parse_dtype(initial_offset, &self.buffer, segment))
140 .transpose()?
141 .unwrap_or_else(|| self.dtype.clone().vortex_expect("DType was provided"));
142 let file_stats = postscript
143 .statistics
144 .as_ref()
145 .map(|segment| self.parse_file_statistics(initial_offset, &self.buffer, segment))
146 .transpose()?;
147
148 Ok(DeserializeStep::Done(self.parse_footer(
149 initial_offset,
150 &self.buffer,
151 &postscript.footer,
152 &postscript.layout,
153 dtype,
154 file_stats,
155 )?))
156 }
157
158 pub fn buffer(&self) -> &ByteBuffer {
160 &self.buffer
161 }
162
163 fn parse_postscript(&self, initial_read: &[u8]) -> VortexResult<Postscript> {
165 if initial_read.len() < EOF_SIZE {
166 vortex_bail!(
167 "Initial read must be at least EOF_SIZE ({}) bytes",
168 EOF_SIZE
169 );
170 }
171 let eof_loc = initial_read.len() - EOF_SIZE;
172 let magic_bytes_loc = eof_loc + (EOF_SIZE - MAGIC_BYTES.len());
173
174 let magic_number = &initial_read[magic_bytes_loc..];
175 if magic_number != MAGIC_BYTES {
176 vortex_bail!("Malformed file, invalid magic bytes, got {magic_number:?}")
177 }
178
179 let version = u16::from_le_bytes(
180 initial_read[eof_loc..eof_loc + 2]
181 .try_into()
182 .map_err(|e| vortex_err!("Version was not a u16 {e}"))?,
183 );
184 if version != VERSION {
185 vortex_bail!("Malformed file, unsupported version {version}")
186 }
187
188 let ps_size = u16::from_le_bytes(
189 initial_read[eof_loc + 2..eof_loc + 4]
190 .try_into()
191 .map_err(|e| vortex_err!("Postscript size was not a u16 {e}"))?,
192 ) as usize;
193
194 if initial_read.len() < ps_size + EOF_SIZE {
195 vortex_bail!(
196 "Initial read must be at least {} bytes to include the Postscript",
197 ps_size + EOF_SIZE
198 );
199 }
200
201 Postscript::read_flatbuffer_bytes(&initial_read[eof_loc - ps_size..eof_loc])
202 }
203
204 fn parse_dtype(
206 &self,
207 initial_offset: u64,
208 initial_read: &[u8],
209 segment: &PostscriptSegment,
210 ) -> VortexResult<DType> {
211 let offset = usize::try_from(segment.offset - initial_offset)?;
212 let sliced_buffer =
213 FlatBuffer::copy_from(&initial_read[offset..offset + (segment.length as usize)]);
214 let fbd_dtype = root::<fbd::DType>(&sliced_buffer)?;
215
216 DType::try_from_view(fbd_dtype, sliced_buffer.clone())
217 }
218
219 fn parse_file_statistics(
221 &self,
222 initial_offset: u64,
223 initial_read: &[u8],
224 segment: &PostscriptSegment,
225 ) -> VortexResult<FileStatistics> {
226 let offset = usize::try_from(segment.offset - initial_offset)?;
227 let sliced_buffer =
228 FlatBuffer::copy_from(&initial_read[offset..offset + (segment.length as usize)]);
229 FileStatistics::read_flatbuffer_bytes(&sliced_buffer)
230 }
231
232 fn parse_footer(
234 &self,
235 initial_offset: u64,
236 initial_read: &[u8],
237 footer_segment: &PostscriptSegment,
238 layout_segment: &PostscriptSegment,
239 dtype: DType,
240 file_stats: Option<FileStatistics>,
241 ) -> VortexResult<Footer> {
242 let footer_offset = usize::try_from(footer_segment.offset - initial_offset)?;
243 let footer_bytes = FlatBuffer::copy_from(
244 &initial_read[footer_offset..footer_offset + (footer_segment.length as usize)],
245 );
246
247 let layout_offset = usize::try_from(layout_segment.offset - initial_offset)?;
248 let layout_bytes = FlatBuffer::copy_from(
249 &initial_read[layout_offset..layout_offset + (layout_segment.length as usize)],
250 );
251
252 Footer::from_flatbuffer(footer_bytes, layout_bytes, dtype, file_stats, &self.session)
253 }
254}
255
256#[derive(Debug)]
257pub enum DeserializeStep {
258 NeedMoreData { offset: u64, len: usize },
260 NeedFileSize,
261 Done(Footer),
262}