vortex_serde/file/read/builder/
initial_read.rs

1use core::ops::Range;
2
3use bytes::{Bytes, BytesMut};
4use flatbuffers::{root, root_unchecked};
5use vortex_error::{vortex_bail, vortex_err, VortexResult};
6use vortex_flatbuffers::{footer, message};
7use vortex_schema::projection::Projection;
8
9use crate::file::{
10    LayoutDeserializer, LayoutReader, LazilyDeserializedDType, RelativeLayoutCache, Scan, EOF_SIZE,
11    INITIAL_READ_SIZE, MAGIC_BYTES, VERSION,
12};
13use crate::io::VortexReadAt;
14
15#[derive(Debug)]
16pub struct InitialRead {
17    /// The bytes from the initial read of the file, which is assumed (for now) to be sufficiently
18    /// large to contain the schema and layout.
19    pub buf: Bytes,
20    /// The absolute byte offset representing the start of the initial read within the file.
21    pub initial_read_offset: u64,
22    /// The byte range within `buf` representing the Postscript flatbuffer.
23    pub fb_postscript_byte_range: Range<usize>,
24}
25
26impl InitialRead {
27    pub fn fb_postscript(&self) -> VortexResult<footer::Postscript> {
28        Ok(unsafe {
29            root_unchecked::<footer::Postscript>(&self.buf[self.fb_postscript_byte_range.clone()])
30        })
31    }
32
33    /// The bytes of the `Layout` flatbuffer.
34    pub fn fb_layout_byte_range(&self) -> VortexResult<Range<usize>> {
35        let footer = self.fb_postscript()?;
36        let layout_start = (footer.layout_offset() - self.initial_read_offset) as usize;
37        let layout_end = self.fb_postscript_byte_range.start;
38        Ok(layout_start..layout_end)
39    }
40
41    /// The `Layout` flatbuffer.
42    pub fn fb_layout(&self) -> VortexResult<footer::Layout> {
43        Ok(unsafe { root_unchecked::<footer::Layout>(&self.buf[self.fb_layout_byte_range()?]) })
44    }
45
46    /// The bytes of the `Schema` flatbuffer.
47    pub fn fb_schema_byte_range(&self) -> VortexResult<Range<usize>> {
48        let footer = self.fb_postscript()?;
49        let schema_start = (footer.schema_offset() - self.initial_read_offset) as usize;
50        let schema_end = (footer.layout_offset() - self.initial_read_offset) as usize;
51        Ok(schema_start..schema_end)
52    }
53
54    /// The `Schema` flatbuffer.
55    pub fn fb_schema(&self) -> VortexResult<message::Schema> {
56        Ok(unsafe { root_unchecked::<message::Schema>(&self.buf[self.fb_schema_byte_range()?]) })
57    }
58
59    pub fn lazy_dtype(&self) -> VortexResult<LazilyDeserializedDType> {
60        // we validated the schema bytes at construction time
61        unsafe {
62            Ok(LazilyDeserializedDType::from_schema_bytes(
63                self.buf.slice(self.fb_schema_byte_range()?),
64                Projection::All,
65            ))
66        }
67    }
68}
69
70pub fn read_layout_from_initial(
71    initial_read: &InitialRead,
72    layout_serde: &LayoutDeserializer,
73    scan: Scan,
74    message_cache: RelativeLayoutCache,
75) -> VortexResult<Box<dyn LayoutReader>> {
76    let layout_bytes = initial_read.buf.slice(initial_read.fb_layout_byte_range()?);
77    let fb_loc = initial_read.fb_layout()?._tab.loc();
78    layout_serde.read_layout(layout_bytes, fb_loc, scan, message_cache)
79}
80
81pub async fn read_initial_bytes<R: VortexReadAt>(
82    read: &R,
83    file_size: u64,
84) -> VortexResult<InitialRead> {
85    if file_size < EOF_SIZE as u64 {
86        vortex_bail!(
87            "Malformed vortex file, size {} must be at least {}",
88            file_size,
89            EOF_SIZE,
90        )
91    }
92
93    let read_size = INITIAL_READ_SIZE.min(file_size as usize);
94    let mut buf = BytesMut::with_capacity(read_size);
95    unsafe { buf.set_len(read_size) }
96
97    let initial_read_offset = file_size - read_size as u64;
98    buf = read.read_at_into(initial_read_offset, buf).await?;
99
100    let eof_loc = read_size - EOF_SIZE;
101    let magic_bytes_loc = eof_loc + (EOF_SIZE - MAGIC_BYTES.len());
102    let magic_number = &buf[magic_bytes_loc..];
103    if magic_number != MAGIC_BYTES {
104        vortex_bail!("Malformed file, invalid magic bytes, got {magic_number:?}")
105    }
106
107    let version = u16::from_le_bytes(
108        buf[eof_loc..eof_loc + 2]
109            .try_into()
110            .map_err(|e| vortex_err!("Version was not a u16 {e}"))?,
111    );
112    if version != VERSION {
113        vortex_bail!("Malformed file, unsupported version {version}")
114    }
115
116    // The footer MUST fit in the initial read.
117    let ps_size = u16::from_le_bytes(
118        buf[eof_loc + 2..eof_loc + 4]
119            .try_into()
120            .map_err(|e| vortex_err!("Footer size was not a u16 {e}"))?,
121    ) as usize;
122    if ps_size > eof_loc {
123        vortex_bail!(
124            "Malformed file, postscript of size {} is too large to fit in initial read of size {} (file size {})",
125            ps_size,
126            read_size,
127            file_size,
128        )
129    }
130
131    let ps_loc = eof_loc - ps_size;
132    let fb_postscript_byte_range = ps_loc..eof_loc;
133
134    // we validate the footer here
135    let postscript = root::<footer::Postscript>(&buf[fb_postscript_byte_range.clone()])?;
136    let schema_offset = postscript.schema_offset();
137    let layout_offset = postscript.layout_offset();
138
139    if layout_offset > initial_read_offset + ps_loc as u64 {
140        vortex_bail!(
141            "Layout must come before the Footer, got layout_offset {}, but footer starts at offset {}",
142            layout_offset,
143            initial_read_offset + ps_loc as u64,
144        )
145    }
146
147    if layout_offset < schema_offset {
148        vortex_bail!(
149            "Schema must come before the Layout, got schema_offset {} and layout_offset {}",
150            schema_offset,
151            layout_offset,
152        )
153    }
154
155    if schema_offset < initial_read_offset {
156        // TODO: instead of bailing, we can just read more bytes.
157        vortex_bail!(
158            "Schema, layout, & footer must be in the initial read, got schema at {} and initial read from {}",
159            schema_offset,
160            initial_read_offset,
161        )
162    }
163
164    // validate the schema and layout
165    let schema_loc = (schema_offset - initial_read_offset) as usize;
166    let layout_loc = (layout_offset - initial_read_offset) as usize;
167    root::<message::Schema>(&buf[schema_loc..layout_loc])?;
168    root::<footer::Layout>(&buf[layout_loc..ps_loc])?;
169
170    Ok(InitialRead {
171        buf: buf.freeze(),
172        initial_read_offset,
173        fb_postscript_byte_range,
174    })
175}
176
#[cfg(test)]
mod tests {
    use crate::file::{EOF_SIZE, INITIAL_READ_SIZE, MAX_FOOTER_SIZE};

    /// The initial read must be strictly larger than the EOF marker plus the largest footer,
    /// otherwise `read_initial_bytes` could not find the whole footer in a single read.
    #[test]
    fn big_enough_initial_read() {
        assert!(INITIAL_READ_SIZE > EOF_SIZE + MAX_FOOTER_SIZE as usize);
    }
}
186}