parquet2/read/
metadata.rs1use std::convert::TryInto;
2use std::{
3 cmp::min,
4 io::{Read, Seek, SeekFrom},
5};
6
7use parquet_format_safe::thrift::protocol::TCompactInputProtocol;
8use parquet_format_safe::FileMetaData as TFileMetaData;
9
10use super::super::{
11 metadata::FileMetaData, DEFAULT_FOOTER_READ_SIZE, FOOTER_SIZE, HEADER_SIZE, PARQUET_MAGIC,
12};
13
14use crate::error::{Error, Result};
15
16pub(super) fn metadata_len(buffer: &[u8], len: usize) -> i32 {
17 i32::from_le_bytes(buffer[len - 8..len - 4].try_into().unwrap())
18}
19
/// Returns the total length of the stream in bytes, leaving the cursor where it was.
///
/// The length is found by seeking to the end; the previous position is restored
/// afterwards unless the cursor was already at the end.
///
/// # Errors
/// Propagates any I/O error returned by the underlying `seek` calls.
fn stream_len(seek: &mut impl Seek) -> std::result::Result<u64, std::io::Error> {
    let start = seek.seek(SeekFrom::Current(0))?;
    let end = seek.seek(SeekFrom::End(0))?;

    // Already at the end: nothing to restore, so skip the extra seek.
    if start == end {
        return Ok(end);
    }

    seek.seek(SeekFrom::Start(start))?;
    Ok(end)
}
33
34pub fn read_metadata<R: Read + Seek>(reader: &mut R) -> Result<FileMetaData> {
36 let file_size = stream_len(reader)?;
38 if file_size < HEADER_SIZE + FOOTER_SIZE {
39 return Err(Error::oos(
40 "A parquet file must containt a header and footer with at least 12 bytes",
41 ));
42 }
43
44 let default_end_len = min(DEFAULT_FOOTER_READ_SIZE, file_size) as usize;
46 reader.seek(SeekFrom::End(-(default_end_len as i64)))?;
47
48 let mut buffer = Vec::with_capacity(default_end_len);
49 reader
50 .by_ref()
51 .take(default_end_len as u64)
52 .read_to_end(&mut buffer)?;
53
54 if buffer[default_end_len - 4..] != PARQUET_MAGIC {
56 return Err(Error::oos("The file must end with PAR1"));
57 }
58
59 let metadata_len = metadata_len(&buffer, default_end_len);
60
61 let metadata_len: u64 = metadata_len.try_into()?;
62
63 let footer_len = FOOTER_SIZE + metadata_len;
64 if footer_len > file_size {
65 return Err(Error::oos(
66 "The footer size must be smaller or equal to the file's size",
67 ));
68 }
69
70 let reader: &[u8] = if (footer_len as usize) < buffer.len() {
71 let remaining = buffer.len() - footer_len as usize;
73 &buffer[remaining..]
74 } else {
75 reader.seek(SeekFrom::End(-(footer_len as i64)))?;
77
78 buffer.clear();
79 buffer.try_reserve(footer_len as usize)?;
80 reader.take(footer_len).read_to_end(&mut buffer)?;
81
82 &buffer
83 };
84
85 let max_size = reader.len() * 2 + 1024;
87
88 deserialize_metadata(reader, max_size)
89}
90
91pub fn deserialize_metadata<R: Read>(reader: R, max_size: usize) -> Result<FileMetaData> {
93 let mut prot = TCompactInputProtocol::new(reader, max_size);
94 let metadata = TFileMetaData::read_from_in_protocol(&mut prot)?;
95
96 FileMetaData::try_from_thrift(metadata)
97}