orc_rust/reader/
metadata.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! Parse ORC file tail metadata structures from file.
//!
//! File tail structure:
//!
//! ```text
//! ------------------
//! |    Metadata    |
//! |                |
//! ------------------
//! |     Footer     |
//! |                |
//! ------------------
//! |  Postscript  |X|
//! ------------------
//! ```
//!
//! Where X is the last byte in the file, indicating the
//! Postscript length in bytes.
//!
//! Footer and Metadata lengths are encoded in the Postscript.
//! The Postscript is never compressed; Footer and Metadata
//! may be compressed, depending on the Postscript config value.
//!
//! If they are compressed then their lengths indicate their
//! compressed lengths.
43
44use std::collections::HashMap;
45use std::io::Read;
46
47use bytes::{Bytes, BytesMut};
48use prost::Message;
49use snafu::{ensure, OptionExt, ResultExt};
50
51use crate::compression::{Compression, Decompressor};
52use crate::error::{self, EmptyFileSnafu, OutOfSpecSnafu, Result};
53use crate::proto::{self, Footer, Metadata, PostScript};
54use crate::schema::RootDataType;
55use crate::statistics::ColumnStatistics;
56use crate::stripe::StripeMetadata;
57
58use crate::reader::ChunkReader;
59
/// Number of bytes requested from the end of the file on the first read (16 KiB),
/// capped at the file length by the readers below.
const DEFAULT_FOOTER_SIZE: u64 = 16 * 1024;
61
62/// The file's metadata.
63#[derive(Debug, Clone)]
64pub struct FileMetadata {
65    compression: Option<Compression>,
66    root_data_type: RootDataType,
67    number_of_rows: u64,
68    file_format_version: String,
69    /// Statistics of columns across entire file
70    column_statistics: Vec<ColumnStatistics>,
71    stripes: Vec<StripeMetadata>,
72    user_custom_metadata: HashMap<String, Vec<u8>>,
73}
74
75impl FileMetadata {
76    fn from_proto(
77        postscript: &proto::PostScript,
78        footer: &proto::Footer,
79        metadata: &proto::Metadata,
80    ) -> Result<Self> {
81        let compression =
82            Compression::from_proto(postscript.compression(), postscript.compression_block_size);
83        let root_data_type = RootDataType::from_proto(&footer.types)?;
84        let number_of_rows = footer.number_of_rows();
85        let column_statistics = footer
86            .statistics
87            .iter()
88            .map(TryFrom::try_from)
89            .collect::<Result<Vec<_>>>()?;
90        ensure!(
91            metadata.stripe_stats.is_empty() || metadata.stripe_stats.len() == footer.stripes.len(),
92            OutOfSpecSnafu {
93                msg: "stripe stats length must equal the number of stripes"
94            }
95        );
96        // TODO: confirm if this is valid
97        let stripes = if metadata.stripe_stats.is_empty() {
98            footer
99                .stripes
100                .iter()
101                .map(TryFrom::try_from)
102                .collect::<Result<Vec<_>>>()?
103        } else {
104            footer
105                .stripes
106                .iter()
107                .zip(metadata.stripe_stats.iter())
108                .map(TryFrom::try_from)
109                .collect::<Result<Vec<_>>>()?
110        };
111        let user_custom_metadata = footer
112            .metadata
113            .iter()
114            .map(|kv| (kv.name().to_owned(), kv.value().to_vec()))
115            .collect::<HashMap<_, _>>();
116
117        let file_format_version = postscript
118            .version
119            .iter()
120            .map(|v| v.to_string() + ".")
121            .collect::<String>();
122        let file_format_version = file_format_version
123            .strip_suffix('.')
124            .unwrap_or("")
125            .to_string();
126
127        Ok(Self {
128            compression,
129            root_data_type,
130            number_of_rows,
131            file_format_version,
132            column_statistics,
133            stripes,
134            user_custom_metadata,
135        })
136    }
137
138    pub fn number_of_rows(&self) -> u64 {
139        self.number_of_rows
140    }
141
142    pub fn compression(&self) -> Option<Compression> {
143        self.compression
144    }
145
146    pub fn root_data_type(&self) -> &RootDataType {
147        &self.root_data_type
148    }
149
150    pub fn column_file_statistics(&self) -> &[ColumnStatistics] {
151        &self.column_statistics
152    }
153
154    pub fn stripe_metadatas(&self) -> &[StripeMetadata] {
155        &self.stripes
156    }
157
158    pub fn user_custom_metadata(&self) -> &HashMap<String, Vec<u8>> {
159        &self.user_custom_metadata
160    }
161
162    pub fn file_format_version(&self) -> &str {
163        &self.file_format_version
164    }
165}
166
167pub fn read_metadata<R: ChunkReader>(reader: &mut R) -> Result<FileMetadata> {
168    let file_len = reader.len();
169    if file_len == 0 {
170        return EmptyFileSnafu.fail();
171    }
172
173    // Initial read of the file tail
174    // Use a default size for first read in hopes of capturing all sections with one read
175    // At worst need two reads to get all necessary bytes
176    let assume_footer_len = file_len.min(DEFAULT_FOOTER_SIZE);
177    let mut tail_bytes = reader
178        .get_bytes(file_len - assume_footer_len, assume_footer_len)
179        .context(error::IoSnafu)?;
180
181    // The final byte of the file contains the serialized length of the Postscript,
182    // which must be less than 256 bytes.
183    let postscript_len = tail_bytes[tail_bytes.len() - 1] as u64;
184    tail_bytes.truncate(tail_bytes.len() - 1);
185
186    if tail_bytes.len() < postscript_len as usize {
187        return OutOfSpecSnafu {
188            msg: "File too small for given postscript length",
189        }
190        .fail();
191    }
192    let postscript = PostScript::decode(&tail_bytes[tail_bytes.len() - postscript_len as usize..])
193        .context(error::DecodeProtoSnafu)?;
194    let compression =
195        Compression::from_proto(postscript.compression(), postscript.compression_block_size);
196    tail_bytes.truncate(tail_bytes.len() - postscript_len as usize);
197
198    let footer_length = postscript.footer_length.context(error::OutOfSpecSnafu {
199        msg: "Footer length is empty",
200    })?;
201    let metadata_length = postscript.metadata_length.context(error::OutOfSpecSnafu {
202        msg: "Metadata length is empty",
203    })?;
204
205    // Ensure we have enough bytes for Footer and Metadata
206    let mut tail_bytes = if footer_length + metadata_length > tail_bytes.len() as u64 {
207        // Need second read
208        // -1 is the postscript length byte
209        let offset = file_len - 1 - postscript_len - footer_length - metadata_length;
210        let bytes_to_read = (footer_length + metadata_length) - tail_bytes.len() as u64;
211        let prepend_bytes = reader
212            .get_bytes(offset, bytes_to_read)
213            .context(error::IoSnafu)?;
214        let mut all_bytes = BytesMut::with_capacity(prepend_bytes.len() + tail_bytes.len());
215        all_bytes.extend_from_slice(&prepend_bytes);
216        all_bytes.extend_from_slice(&tail_bytes);
217        all_bytes.into()
218    } else {
219        tail_bytes
220    };
221
222    let footer = deserialize_footer(
223        tail_bytes.slice(tail_bytes.len() - footer_length as usize..),
224        compression,
225    )?;
226    tail_bytes.truncate(tail_bytes.len() - footer_length as usize);
227
228    let metadata = deserialize_footer_metadata(
229        tail_bytes.slice(tail_bytes.len() - metadata_length as usize..),
230        compression,
231    )?;
232
233    FileMetadata::from_proto(&postscript, &footer, &metadata)
234}
235
236#[cfg(feature = "async")]
237pub async fn read_metadata_async<R: super::AsyncChunkReader>(
238    reader: &mut R,
239) -> Result<FileMetadata> {
240    let file_len = reader.len().await.context(error::IoSnafu)?;
241    if file_len == 0 {
242        return EmptyFileSnafu.fail();
243    }
244
245    // Initial read of the file tail
246    // Use a default size for first read in hopes of capturing all sections with one read
247    // At worst need two reads to get all necessary bytes
248    let assume_footer_len = file_len.min(DEFAULT_FOOTER_SIZE);
249    let mut tail_bytes = reader
250        .get_bytes(file_len - assume_footer_len, assume_footer_len)
251        .await
252        .context(error::IoSnafu)?;
253
254    // The final byte of the file contains the serialized length of the Postscript,
255    // which must be less than 256 bytes.
256    let postscript_len = tail_bytes[tail_bytes.len() - 1] as u64;
257    tail_bytes.truncate(tail_bytes.len() - 1);
258
259    if tail_bytes.len() < postscript_len as usize {
260        return OutOfSpecSnafu {
261            msg: "File too small for given postscript length",
262        }
263        .fail();
264    }
265    let postscript = PostScript::decode(&tail_bytes[tail_bytes.len() - postscript_len as usize..])
266        .context(error::DecodeProtoSnafu)?;
267    let compression =
268        Compression::from_proto(postscript.compression(), postscript.compression_block_size);
269    tail_bytes.truncate(tail_bytes.len() - postscript_len as usize);
270
271    let footer_length = postscript.footer_length.context(error::OutOfSpecSnafu {
272        msg: "Footer length is empty",
273    })?;
274    let metadata_length = postscript.metadata_length.context(error::OutOfSpecSnafu {
275        msg: "Metadata length is empty",
276    })?;
277
278    // Ensure we have enough bytes for Footer and Metadata
279    let mut tail_bytes = if footer_length + metadata_length > tail_bytes.len() as u64 {
280        // Need second read
281        // -1 is the postscript length byte
282        let offset = file_len - 1 - postscript_len - footer_length - metadata_length;
283        let bytes_to_read = (footer_length + metadata_length) - tail_bytes.len() as u64;
284        let prepend_bytes = reader
285            .get_bytes(offset, bytes_to_read)
286            .await
287            .context(error::IoSnafu)?;
288        let mut all_bytes = BytesMut::with_capacity(prepend_bytes.len() + tail_bytes.len());
289        all_bytes.extend_from_slice(&prepend_bytes);
290        all_bytes.extend_from_slice(&tail_bytes);
291        all_bytes.into()
292    } else {
293        tail_bytes
294    };
295
296    let footer = deserialize_footer(
297        tail_bytes.slice(tail_bytes.len() - footer_length as usize..),
298        compression,
299    )?;
300    tail_bytes.truncate(tail_bytes.len() - footer_length as usize);
301
302    let metadata = deserialize_footer_metadata(
303        tail_bytes.slice(tail_bytes.len() - metadata_length as usize..),
304        compression,
305    )?;
306
307    FileMetadata::from_proto(&postscript, &footer, &metadata)
308}
309
310fn deserialize_footer(bytes: Bytes, compression: Option<Compression>) -> Result<Footer> {
311    let mut buffer = vec![];
312    Decompressor::new(bytes, compression, vec![])
313        .read_to_end(&mut buffer)
314        .context(error::IoSnafu)?;
315    Footer::decode(buffer.as_slice()).context(error::DecodeProtoSnafu)
316}
317
318fn deserialize_footer_metadata(bytes: Bytes, compression: Option<Compression>) -> Result<Metadata> {
319    let mut buffer = vec![];
320    Decompressor::new(bytes, compression, vec![])
321        .read_to_end(&mut buffer)
322        .context(error::IoSnafu)?;
323    Metadata::decode(buffer.as_slice()).context(error::DecodeProtoSnafu)
324}