parquet2/read/
metadata.rs

1use std::convert::TryInto;
2use std::{
3    cmp::min,
4    io::{Read, Seek, SeekFrom},
5};
6
7use parquet_format_safe::thrift::protocol::TCompactInputProtocol;
8use parquet_format_safe::FileMetaData as TFileMetaData;
9
10use super::super::{
11    metadata::FileMetaData, DEFAULT_FOOTER_READ_SIZE, FOOTER_SIZE, HEADER_SIZE, PARQUET_MAGIC,
12};
13
14use crate::error::{Error, Result};
15
16pub(super) fn metadata_len(buffer: &[u8], len: usize) -> i32 {
17    i32::from_le_bytes(buffer[len - 8..len - 4].try_into().unwrap())
18}
19
/// Returns the total length of the stream in bytes, leaving the cursor where it was.
///
/// Stand-in for the (unstable) `Seek::stream_len`: the current position is recorded,
/// the stream is sought to its end to learn the length, and the position is restored.
fn stream_len(seek: &mut impl Seek) -> std::result::Result<u64, std::io::Error> {
    let start = seek.seek(SeekFrom::Current(0))?;
    let end = seek.seek(SeekFrom::End(0))?;

    // Already at the end: skip the third seek, a branch is usually far cheaper.
    if start == end {
        return Ok(end);
    }

    seek.seek(SeekFrom::Start(start))?;
    Ok(end)
}
33
34/// Reads a [`FileMetaData`] from the reader, located at the end of the file.
35pub fn read_metadata<R: Read + Seek>(reader: &mut R) -> Result<FileMetaData> {
36    // check file is large enough to hold footer
37    let file_size = stream_len(reader)?;
38    if file_size < HEADER_SIZE + FOOTER_SIZE {
39        return Err(Error::oos(
40            "A parquet file must containt a header and footer with at least 12 bytes",
41        ));
42    }
43
44    // read and cache up to DEFAULT_FOOTER_READ_SIZE bytes from the end and process the footer
45    let default_end_len = min(DEFAULT_FOOTER_READ_SIZE, file_size) as usize;
46    reader.seek(SeekFrom::End(-(default_end_len as i64)))?;
47
48    let mut buffer = Vec::with_capacity(default_end_len);
49    reader
50        .by_ref()
51        .take(default_end_len as u64)
52        .read_to_end(&mut buffer)?;
53
54    // check this is indeed a parquet file
55    if buffer[default_end_len - 4..] != PARQUET_MAGIC {
56        return Err(Error::oos("The file must end with PAR1"));
57    }
58
59    let metadata_len = metadata_len(&buffer, default_end_len);
60
61    let metadata_len: u64 = metadata_len.try_into()?;
62
63    let footer_len = FOOTER_SIZE + metadata_len;
64    if footer_len > file_size {
65        return Err(Error::oos(
66            "The footer size must be smaller or equal to the file's size",
67        ));
68    }
69
70    let reader: &[u8] = if (footer_len as usize) < buffer.len() {
71        // the whole metadata is in the bytes we already read
72        let remaining = buffer.len() - footer_len as usize;
73        &buffer[remaining..]
74    } else {
75        // the end of file read by default is not long enough, read again including the metadata.
76        reader.seek(SeekFrom::End(-(footer_len as i64)))?;
77
78        buffer.clear();
79        buffer.try_reserve(footer_len as usize)?;
80        reader.take(footer_len).read_to_end(&mut buffer)?;
81
82        &buffer
83    };
84
85    // a highly nested but sparse struct could result in many allocations
86    let max_size = reader.len() * 2 + 1024;
87
88    deserialize_metadata(reader, max_size)
89}
90
91/// Parse loaded metadata bytes
92pub fn deserialize_metadata<R: Read>(reader: R, max_size: usize) -> Result<FileMetaData> {
93    let mut prot = TCompactInputProtocol::new(reader, max_size);
94    let metadata = TFileMetaData::read_from_in_protocol(&mut prot)?;
95
96    FileMetaData::try_from_thrift(metadata)
97}