vortex_file/footer/
deserializer.rs1use std::sync::Arc;
5
6use flatbuffers::root;
7use vortex_array::ArrayRegistry;
8use vortex_buffer::{ByteBuffer, ByteBufferMut};
9use vortex_dtype::DType;
10use vortex_error::{VortexExpect, VortexResult, vortex_bail, vortex_err};
11use vortex_flatbuffers::{FlatBuffer, ReadFlatBuffer, dtype as fbd};
12use vortex_layout::{LayoutRegistry, LayoutRegistryExt};
13
14use crate::footer::FileStatistics;
15use crate::footer::postscript::{Postscript, PostscriptSegment};
16use crate::{DEFAULT_REGISTRY, EOF_SIZE, Footer, MAGIC_BYTES, VERSION};
17
18pub struct FooterDeserializer {
21 buffer: ByteBuffer,
25 array_registry: Arc<ArrayRegistry>,
27 layout_registry: Arc<LayoutRegistry>,
29 dtype: Option<DType>,
31
32 file_size: Option<u64>,
36 postscript: Option<Postscript>,
38}
39
40impl FooterDeserializer {
41 pub(super) fn new(initial_read: ByteBuffer) -> Self {
42 Self {
43 buffer: initial_read,
44 array_registry: DEFAULT_REGISTRY.clone(),
45 layout_registry: Arc::new(LayoutRegistry::default()),
46 dtype: None,
47 file_size: None,
48 postscript: None,
49 }
50 }
51
52 pub fn with_dtype(mut self, dtype: DType) -> Self {
53 self.dtype = Some(dtype);
54 self
55 }
56
57 pub fn with_some_dtype(mut self, dtype: Option<DType>) -> Self {
58 self.dtype = dtype;
59 self
60 }
61
62 pub fn with_size(mut self, file_size: u64) -> Self {
63 self.file_size = Some(file_size);
64 self
65 }
66
67 pub fn with_some_size(mut self, file_size: Option<u64>) -> Self {
68 self.file_size = file_size;
69 self
70 }
71
72 pub fn with_array_registry(mut self, registry: Arc<ArrayRegistry>) -> Self {
73 self.array_registry = registry;
74 self
75 }
76
77 pub fn with_layout_registry(mut self, registry: Arc<LayoutRegistry>) -> Self {
78 self.layout_registry = registry;
79 self
80 }
81
82 pub fn prefix_data(&mut self, more_data: ByteBuffer) {
84 let mut buffer = ByteBufferMut::with_capacity(self.buffer.len() + more_data.len());
85 buffer.extend_from_slice(&more_data);
86 buffer.extend_from_slice(&self.buffer);
87 self.buffer = buffer.freeze();
88 }
89
90 pub fn deserialize(&mut self) -> VortexResult<DeserializeStep> {
91 let postscript = if let Some(ref postscript) = self.postscript {
92 postscript
93 } else {
94 self.postscript = Some(self.parse_postscript(&self.buffer)?);
95 self.postscript
96 .as_ref()
97 .vortex_expect("Just set postscript")
98 };
99
100 let dtype_segment = self
102 .dtype
103 .is_none()
104 .then(|| {
105 postscript.dtype.as_ref().ok_or_else(|| {
106 vortex_err!(
107 "Vortex file doesn't embed a DType and none provided to VortexOpenOptions"
108 )
109 })
110 })
111 .transpose()?;
112
113 let Some(file_size) = self.file_size else {
118 return Ok(DeserializeStep::NeedFileSize);
119 };
120 let initial_offset = file_size - (self.buffer.len() as u64);
121
122 let mut read_more_offset = initial_offset;
123 if let Some(dtype_segment) = &dtype_segment {
124 read_more_offset = read_more_offset.min(dtype_segment.offset);
125 }
126 if let Some(stats_segment) = &postscript.statistics {
127 read_more_offset = read_more_offset.min(stats_segment.offset);
128 }
129 read_more_offset = read_more_offset.min(postscript.layout.offset);
130 read_more_offset = read_more_offset.min(postscript.footer.offset);
131
132 if read_more_offset < initial_offset {
134 log::debug!(
135 "Initial read from {initial_offset} did not cover all footer segments, reading from {read_more_offset}"
136 );
137 return Ok(DeserializeStep::NeedMoreData {
138 offset: read_more_offset,
139 len: usize::try_from(initial_offset - read_more_offset)?,
140 });
141 }
142
143 let dtype = dtype_segment
145 .map(|segment| self.parse_dtype(initial_offset, &self.buffer, segment))
146 .transpose()?
147 .unwrap_or_else(|| self.dtype.clone().vortex_expect("DType was provided"));
148 let file_stats = postscript
149 .statistics
150 .as_ref()
151 .map(|segment| self.parse_file_statistics(initial_offset, &self.buffer, segment))
152 .transpose()?;
153
154 Ok(DeserializeStep::Done(self.parse_footer(
155 initial_offset,
156 &self.buffer,
157 &postscript.footer,
158 &postscript.layout,
159 dtype,
160 file_stats,
161 )?))
162 }
163
164 pub fn buffer(&self) -> &ByteBuffer {
166 &self.buffer
167 }
168
169 fn parse_postscript(&self, initial_read: &[u8]) -> VortexResult<Postscript> {
171 if initial_read.len() < EOF_SIZE {
172 vortex_bail!(
173 "Initial read must be at least EOF_SIZE ({}) bytes",
174 EOF_SIZE
175 );
176 }
177 let eof_loc = initial_read.len() - EOF_SIZE;
178 let magic_bytes_loc = eof_loc + (EOF_SIZE - MAGIC_BYTES.len());
179
180 let magic_number = &initial_read[magic_bytes_loc..];
181 if magic_number != MAGIC_BYTES {
182 vortex_bail!("Malformed file, invalid magic bytes, got {magic_number:?}")
183 }
184
185 let version = u16::from_le_bytes(
186 initial_read[eof_loc..eof_loc + 2]
187 .try_into()
188 .map_err(|e| vortex_err!("Version was not a u16 {e}"))?,
189 );
190 if version != VERSION {
191 vortex_bail!("Malformed file, unsupported version {version}")
192 }
193
194 let ps_size = u16::from_le_bytes(
195 initial_read[eof_loc + 2..eof_loc + 4]
196 .try_into()
197 .map_err(|e| vortex_err!("Postscript size was not a u16 {e}"))?,
198 ) as usize;
199
200 if initial_read.len() < ps_size + EOF_SIZE {
201 vortex_bail!(
202 "Initial read must be at least {} bytes to include the Postscript",
203 ps_size + EOF_SIZE
204 );
205 }
206
207 Postscript::read_flatbuffer_bytes(&initial_read[eof_loc - ps_size..eof_loc])
208 }
209
210 fn parse_dtype(
212 &self,
213 initial_offset: u64,
214 initial_read: &[u8],
215 segment: &PostscriptSegment,
216 ) -> VortexResult<DType> {
217 let offset = usize::try_from(segment.offset - initial_offset)?;
218 let sliced_buffer =
219 FlatBuffer::copy_from(&initial_read[offset..offset + (segment.length as usize)]);
220 let fbd_dtype = root::<fbd::DType>(&sliced_buffer)?;
221
222 DType::try_from_view(fbd_dtype, sliced_buffer.clone())
223 }
224
225 fn parse_file_statistics(
227 &self,
228 initial_offset: u64,
229 initial_read: &[u8],
230 segment: &PostscriptSegment,
231 ) -> VortexResult<FileStatistics> {
232 let offset = usize::try_from(segment.offset - initial_offset)?;
233 let sliced_buffer =
234 FlatBuffer::copy_from(&initial_read[offset..offset + (segment.length as usize)]);
235 FileStatistics::read_flatbuffer_bytes(&sliced_buffer)
236 }
237
238 fn parse_footer(
240 &self,
241 initial_offset: u64,
242 initial_read: &[u8],
243 footer_segment: &PostscriptSegment,
244 layout_segment: &PostscriptSegment,
245 dtype: DType,
246 file_stats: Option<FileStatistics>,
247 ) -> VortexResult<Footer> {
248 let footer_offset = usize::try_from(footer_segment.offset - initial_offset)?;
249 let footer_bytes = FlatBuffer::copy_from(
250 &initial_read[footer_offset..footer_offset + (footer_segment.length as usize)],
251 );
252
253 let layout_offset = usize::try_from(layout_segment.offset - initial_offset)?;
254 let layout_bytes = FlatBuffer::copy_from(
255 &initial_read[layout_offset..layout_offset + (layout_segment.length as usize)],
256 );
257
258 Footer::from_flatbuffer(
259 footer_bytes,
260 layout_bytes,
261 dtype,
262 file_stats,
263 &self.array_registry,
264 &self.layout_registry,
265 )
266 }
267}
268
269#[derive(Debug)]
270pub enum DeserializeStep {
271 NeedMoreData { offset: u64, len: usize },
273 NeedFileSize,
274 Done(Footer),
275}