vortex_serde/file/read/builder/
initial_read.rs1use core::ops::Range;
2
3use bytes::{Bytes, BytesMut};
4use flatbuffers::{root, root_unchecked};
5use vortex_error::{vortex_bail, vortex_err, VortexResult};
6use vortex_flatbuffers::{footer, message};
7use vortex_schema::projection::Projection;
8
9use crate::file::{
10 LayoutDeserializer, LayoutReader, LazilyDeserializedDType, RelativeLayoutCache, Scan, EOF_SIZE,
11 INITIAL_READ_SIZE, MAGIC_BYTES, VERSION,
12};
13use crate::io::VortexReadAt;
14
/// The result of reading the tail of a file with [`read_initial_bytes`]: the raw bytes plus the
/// bookkeeping needed to find the postscript, layout and schema flatbuffers inside them.
#[derive(Debug)]
pub struct InitialRead {
    /// The bytes that were read from the end of the file.
    pub buf: Bytes,
    /// Absolute offset within the file at which `buf` begins.
    pub initial_read_offset: u64,
    /// Range of the postscript flatbuffer, expressed as indices into `buf` (not file offsets).
    pub fb_postscript_byte_range: Range<usize>,
}
25
26impl InitialRead {
27 pub fn fb_postscript(&self) -> VortexResult<footer::Postscript> {
28 Ok(unsafe {
29 root_unchecked::<footer::Postscript>(&self.buf[self.fb_postscript_byte_range.clone()])
30 })
31 }
32
33 pub fn fb_layout_byte_range(&self) -> VortexResult<Range<usize>> {
35 let footer = self.fb_postscript()?;
36 let layout_start = (footer.layout_offset() - self.initial_read_offset) as usize;
37 let layout_end = self.fb_postscript_byte_range.start;
38 Ok(layout_start..layout_end)
39 }
40
41 pub fn fb_layout(&self) -> VortexResult<footer::Layout> {
43 Ok(unsafe { root_unchecked::<footer::Layout>(&self.buf[self.fb_layout_byte_range()?]) })
44 }
45
46 pub fn fb_schema_byte_range(&self) -> VortexResult<Range<usize>> {
48 let footer = self.fb_postscript()?;
49 let schema_start = (footer.schema_offset() - self.initial_read_offset) as usize;
50 let schema_end = (footer.layout_offset() - self.initial_read_offset) as usize;
51 Ok(schema_start..schema_end)
52 }
53
54 pub fn fb_schema(&self) -> VortexResult<message::Schema> {
56 Ok(unsafe { root_unchecked::<message::Schema>(&self.buf[self.fb_schema_byte_range()?]) })
57 }
58
59 pub fn lazy_dtype(&self) -> VortexResult<LazilyDeserializedDType> {
60 unsafe {
62 Ok(LazilyDeserializedDType::from_schema_bytes(
63 self.buf.slice(self.fb_schema_byte_range()?),
64 Projection::All,
65 ))
66 }
67 }
68}
69
70pub fn read_layout_from_initial(
71 initial_read: &InitialRead,
72 layout_serde: &LayoutDeserializer,
73 scan: Scan,
74 message_cache: RelativeLayoutCache,
75) -> VortexResult<Box<dyn LayoutReader>> {
76 let layout_bytes = initial_read.buf.slice(initial_read.fb_layout_byte_range()?);
77 let fb_loc = initial_read.fb_layout()?._tab.loc();
78 layout_serde.read_layout(layout_bytes, fb_loc, scan, message_cache)
79}
80
81pub async fn read_initial_bytes<R: VortexReadAt>(
82 read: &R,
83 file_size: u64,
84) -> VortexResult<InitialRead> {
85 if file_size < EOF_SIZE as u64 {
86 vortex_bail!(
87 "Malformed vortex file, size {} must be at least {}",
88 file_size,
89 EOF_SIZE,
90 )
91 }
92
93 let read_size = INITIAL_READ_SIZE.min(file_size as usize);
94 let mut buf = BytesMut::with_capacity(read_size);
95 unsafe { buf.set_len(read_size) }
96
97 let initial_read_offset = file_size - read_size as u64;
98 buf = read.read_at_into(initial_read_offset, buf).await?;
99
100 let eof_loc = read_size - EOF_SIZE;
101 let magic_bytes_loc = eof_loc + (EOF_SIZE - MAGIC_BYTES.len());
102 let magic_number = &buf[magic_bytes_loc..];
103 if magic_number != MAGIC_BYTES {
104 vortex_bail!("Malformed file, invalid magic bytes, got {magic_number:?}")
105 }
106
107 let version = u16::from_le_bytes(
108 buf[eof_loc..eof_loc + 2]
109 .try_into()
110 .map_err(|e| vortex_err!("Version was not a u16 {e}"))?,
111 );
112 if version != VERSION {
113 vortex_bail!("Malformed file, unsupported version {version}")
114 }
115
116 let ps_size = u16::from_le_bytes(
118 buf[eof_loc + 2..eof_loc + 4]
119 .try_into()
120 .map_err(|e| vortex_err!("Footer size was not a u16 {e}"))?,
121 ) as usize;
122 if ps_size > eof_loc {
123 vortex_bail!(
124 "Malformed file, postscript of size {} is too large to fit in initial read of size {} (file size {})",
125 ps_size,
126 read_size,
127 file_size,
128 )
129 }
130
131 let ps_loc = eof_loc - ps_size;
132 let fb_postscript_byte_range = ps_loc..eof_loc;
133
134 let postscript = root::<footer::Postscript>(&buf[fb_postscript_byte_range.clone()])?;
136 let schema_offset = postscript.schema_offset();
137 let layout_offset = postscript.layout_offset();
138
139 if layout_offset > initial_read_offset + ps_loc as u64 {
140 vortex_bail!(
141 "Layout must come before the Footer, got layout_offset {}, but footer starts at offset {}",
142 layout_offset,
143 initial_read_offset + ps_loc as u64,
144 )
145 }
146
147 if layout_offset < schema_offset {
148 vortex_bail!(
149 "Schema must come before the Layout, got schema_offset {} and layout_offset {}",
150 schema_offset,
151 layout_offset,
152 )
153 }
154
155 if schema_offset < initial_read_offset {
156 vortex_bail!(
158 "Schema, layout, & footer must be in the initial read, got schema at {} and initial read from {}",
159 schema_offset,
160 initial_read_offset,
161 )
162 }
163
164 let schema_loc = (schema_offset - initial_read_offset) as usize;
166 let layout_loc = (layout_offset - initial_read_offset) as usize;
167 root::<message::Schema>(&buf[schema_loc..layout_loc])?;
168 root::<footer::Layout>(&buf[layout_loc..ps_loc])?;
169
170 Ok(InitialRead {
171 buf: buf.freeze(),
172 initial_read_offset,
173 fb_postscript_byte_range,
174 })
175}
176
#[cfg(test)]
mod tests {
    use super::*;
    use crate::file::MAX_FOOTER_SIZE;

    // Guards the constants: the initial read must be strictly larger than the fixed-size
    // trailer (`EOF_SIZE`) plus `MAX_FOOTER_SIZE`.
    #[test]
    fn big_enough_initial_read() {
        assert!(INITIAL_READ_SIZE > EOF_SIZE + MAX_FOOTER_SIZE as usize);
    }
}