orc_rust/reader/
metadata.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Parse ORC file tail metadata structures from file.
19//!
20//! File tail structure:
21//!
22//! ```text
23//! ------------------
24//! |    Metadata    |
25//! |                |
26//! ------------------
27//! |     Footer     |
28//! |                |
29//! ------------------
30//! |  Postscript  |X|
31//! ------------------
32//! ```
33//!
34//! Where X is last byte in file indicating
35//! Postscript length in bytes.
36//!
37//! Footer and Metadata lengths are encoded in Postscript.
38//! Postscript is never compressed, Footer and Metadata
39//! may be compressed depending on the Postscript config value.
40//!
41//! If they are compressed then their lengths indicate their
42//! compressed lengths.
43
44use std::collections::HashMap;
45use std::io::Read;
46
47use bytes::{Bytes, BytesMut};
48use prost::Message;
49use snafu::{ensure, OptionExt, ResultExt};
50
51use crate::compression::{Compression, Decompressor};
52use crate::error::{self, EmptyFileSnafu, OutOfSpecSnafu, Result};
53use crate::proto::{self, Footer, Metadata, PostScript};
54use crate::schema::RootDataType;
55use crate::statistics::ColumnStatistics;
56use crate::stripe::StripeMetadata;
57
58use crate::reader::ChunkReader;
59
/// Upper bound, in bytes, on the first read taken from the end of the file
/// when fetching the file tail (16 KiB).
const DEFAULT_FOOTER_SIZE: u64 = 16 << 10;
61
62/// The file's metadata.
63#[derive(Debug, Clone)]
64pub struct FileMetadata {
65    compression: Option<Compression>,
66    root_data_type: RootDataType,
67    number_of_rows: u64,
68    file_format_version: String,
69    /// Statistics of columns across entire file
70    column_statistics: Vec<ColumnStatistics>,
71    stripes: Vec<StripeMetadata>,
72    user_custom_metadata: HashMap<String, Vec<u8>>,
73    /// The maximum number of rows in each index entry (default 10,000)
74    row_index_stride: Option<u32>,
75}
76
77impl FileMetadata {
78    fn from_proto(
79        postscript: &proto::PostScript,
80        footer: &proto::Footer,
81        metadata: &proto::Metadata,
82    ) -> Result<Self> {
83        let compression =
84            Compression::from_proto(postscript.compression(), postscript.compression_block_size);
85        let root_data_type = RootDataType::from_proto(&footer.types)?;
86        let number_of_rows = footer.number_of_rows();
87        let column_statistics = footer
88            .statistics
89            .iter()
90            .map(TryFrom::try_from)
91            .collect::<Result<Vec<_>>>()?;
92        ensure!(
93            metadata.stripe_stats.is_empty() || metadata.stripe_stats.len() == footer.stripes.len(),
94            OutOfSpecSnafu {
95                msg: "stripe stats length must equal the number of stripes"
96            }
97        );
98        // TODO: confirm if this is valid
99        let stripes = if metadata.stripe_stats.is_empty() {
100            footer
101                .stripes
102                .iter()
103                .map(TryFrom::try_from)
104                .collect::<Result<Vec<_>>>()?
105        } else {
106            footer
107                .stripes
108                .iter()
109                .zip(metadata.stripe_stats.iter())
110                .map(TryFrom::try_from)
111                .collect::<Result<Vec<_>>>()?
112        };
113        let user_custom_metadata = footer
114            .metadata
115            .iter()
116            .map(|kv| (kv.name().to_owned(), kv.value().to_vec()))
117            .collect::<HashMap<_, _>>();
118
119        let file_format_version = postscript
120            .version
121            .iter()
122            .map(|v| v.to_string() + ".")
123            .collect::<String>();
124        let file_format_version = file_format_version
125            .strip_suffix('.')
126            .unwrap_or("")
127            .to_string();
128
129        Ok(Self {
130            compression,
131            root_data_type,
132            number_of_rows,
133            file_format_version,
134            column_statistics,
135            stripes,
136            user_custom_metadata,
137            row_index_stride: footer.row_index_stride,
138        })
139    }
140
141    pub fn number_of_rows(&self) -> u64 {
142        self.number_of_rows
143    }
144
145    pub fn compression(&self) -> Option<Compression> {
146        self.compression
147    }
148
149    pub fn root_data_type(&self) -> &RootDataType {
150        &self.root_data_type
151    }
152
153    pub fn column_file_statistics(&self) -> &[ColumnStatistics] {
154        &self.column_statistics
155    }
156
157    pub fn stripe_metadatas(&self) -> &[StripeMetadata] {
158        &self.stripes
159    }
160
161    pub fn user_custom_metadata(&self) -> &HashMap<String, Vec<u8>> {
162        &self.user_custom_metadata
163    }
164
165    /// Get the row index stride (rows per row group)
166    ///
167    /// Returns the number of rows per row group used for row-level indexes.
168    /// Default is 10,000 according to ORC spec.
169    ///
170    /// If `None` is returned, it means row indexes are not enabled for this file.
171    pub fn row_index_stride(&self) -> Option<usize> {
172        self.row_index_stride.map(|s| s as usize)
173    }
174
175    pub fn file_format_version(&self) -> &str {
176        &self.file_format_version
177    }
178}
179
180pub fn read_metadata<R: ChunkReader>(reader: &mut R) -> Result<FileMetadata> {
181    let file_len = reader.len();
182    if file_len == 0 {
183        return EmptyFileSnafu.fail();
184    }
185
186    // Initial read of the file tail
187    // Use a default size for first read in hopes of capturing all sections with one read
188    // At worst need two reads to get all necessary bytes
189    let assume_footer_len = file_len.min(DEFAULT_FOOTER_SIZE);
190    let mut tail_bytes = reader
191        .get_bytes(file_len - assume_footer_len, assume_footer_len)
192        .context(error::IoSnafu)?;
193
194    // The final byte of the file contains the serialized length of the Postscript,
195    // which must be less than 256 bytes.
196    let postscript_len = tail_bytes[tail_bytes.len() - 1] as u64;
197    tail_bytes.truncate(tail_bytes.len() - 1);
198
199    if tail_bytes.len() < postscript_len as usize {
200        return OutOfSpecSnafu {
201            msg: "File too small for given postscript length",
202        }
203        .fail();
204    }
205    let postscript = PostScript::decode(&tail_bytes[tail_bytes.len() - postscript_len as usize..])
206        .context(error::DecodeProtoSnafu)?;
207    let compression =
208        Compression::from_proto(postscript.compression(), postscript.compression_block_size);
209    tail_bytes.truncate(tail_bytes.len() - postscript_len as usize);
210
211    let footer_length = postscript.footer_length.context(error::OutOfSpecSnafu {
212        msg: "Footer length is empty",
213    })?;
214    let metadata_length = postscript.metadata_length.context(error::OutOfSpecSnafu {
215        msg: "Metadata length is empty",
216    })?;
217
218    // Ensure we have enough bytes for Footer and Metadata
219    let mut tail_bytes = if footer_length + metadata_length > tail_bytes.len() as u64 {
220        // Need second read
221        // -1 is the postscript length byte
222        let offset = file_len - 1 - postscript_len - footer_length - metadata_length;
223        let bytes_to_read = (footer_length + metadata_length) - tail_bytes.len() as u64;
224        let prepend_bytes = reader
225            .get_bytes(offset, bytes_to_read)
226            .context(error::IoSnafu)?;
227        let mut all_bytes = BytesMut::with_capacity(prepend_bytes.len() + tail_bytes.len());
228        all_bytes.extend_from_slice(&prepend_bytes);
229        all_bytes.extend_from_slice(&tail_bytes);
230        all_bytes.into()
231    } else {
232        tail_bytes
233    };
234
235    let footer = deserialize_footer(
236        tail_bytes.slice(tail_bytes.len() - footer_length as usize..),
237        compression,
238    )?;
239    tail_bytes.truncate(tail_bytes.len() - footer_length as usize);
240
241    let metadata = deserialize_footer_metadata(
242        tail_bytes.slice(tail_bytes.len() - metadata_length as usize..),
243        compression,
244    )?;
245
246    FileMetadata::from_proto(&postscript, &footer, &metadata)
247}
248
/// Asynchronously reads the ORC file tail from `reader` and parses it into a
/// [`FileMetadata`].
///
/// The tail is fetched with at most two reads: a first read of up to
/// `DEFAULT_FOOTER_SIZE` bytes from the end of the file and, only when the
/// Footer and Metadata sections do not fit in those bytes, a second read for
/// the missing leading part.
///
/// # Errors
///
/// * An I/O error if querying the length or reading from `reader` fails, or
///   if decompressing a section fails.
/// * An empty-file error if the reader reports a length of zero.
/// * An out-of-spec error if the Postscript does not fit in the bytes read,
///   lacks the Footer or Metadata length, or if the recorded lengths do not
///   fit inside the file (corrupt or truncated file).
/// * A protobuf decode error if a section cannot be decoded.
#[cfg(feature = "async")]
pub async fn read_metadata_async<R: super::AsyncChunkReader>(
    reader: &mut R,
) -> Result<FileMetadata> {
    let file_len = reader.len().await.context(error::IoSnafu)?;
    if file_len == 0 {
        return EmptyFileSnafu.fail();
    }

    // Initial read of the file tail
    // Use a default size for first read in hopes of capturing all sections with one read
    // At worst need two reads to get all necessary bytes
    let assume_footer_len = file_len.min(DEFAULT_FOOTER_SIZE);
    let mut tail_bytes = reader
        .get_bytes(file_len - assume_footer_len, assume_footer_len)
        .await
        .context(error::IoSnafu)?;
    // A reader that hands back no bytes would make the indexing below panic.
    ensure!(
        !tail_bytes.is_empty(),
        OutOfSpecSnafu {
            msg: "Unable to read any bytes of the file tail"
        }
    );

    // The final byte of the file contains the serialized length of the Postscript,
    // which must be less than 256 bytes.
    let postscript_len = u64::from(tail_bytes[tail_bytes.len() - 1]);
    tail_bytes.truncate(tail_bytes.len() - 1);

    ensure!(
        tail_bytes.len() as u64 >= postscript_len,
        OutOfSpecSnafu {
            msg: "File too small for given postscript length"
        }
    );
    // Lossless: postscript_len came from a single byte.
    let postscript_start = tail_bytes.len() - postscript_len as usize;
    let postscript =
        PostScript::decode(&tail_bytes[postscript_start..]).context(error::DecodeProtoSnafu)?;
    let compression =
        Compression::from_proto(postscript.compression(), postscript.compression_block_size);
    tail_bytes.truncate(postscript_start);

    let footer_length = postscript.footer_length.context(error::OutOfSpecSnafu {
        msg: "Footer length is empty",
    })?;
    let metadata_length = postscript.metadata_length.context(error::OutOfSpecSnafu {
        msg: "Metadata length is empty",
    })?;

    // Both lengths come straight from the file and cannot be trusted, so every
    // computation involving them is checked instead of risking overflow/underflow.
    let footer_and_metadata_len =
        footer_length
            .checked_add(metadata_length)
            .context(error::OutOfSpecSnafu {
                msg: "Footer and metadata lengths overflow",
            })?;

    // Ensure we have enough bytes for Footer and Metadata
    let mut tail_bytes: Bytes = if footer_and_metadata_len > tail_bytes.len() as u64 {
        // Need second read
        // The extra 1 is the postscript length byte
        let offset = file_len
            .checked_sub(1 + postscript_len)
            .and_then(|sections_end| sections_end.checked_sub(footer_and_metadata_len))
            .context(error::OutOfSpecSnafu {
                msg: "File too small for given footer and metadata lengths",
            })?;
        let bytes_to_read = footer_and_metadata_len - tail_bytes.len() as u64;
        let prepend_bytes = reader
            .get_bytes(offset, bytes_to_read)
            .await
            .context(error::IoSnafu)?;
        let mut all_bytes = BytesMut::with_capacity(prepend_bytes.len() + tail_bytes.len());
        all_bytes.extend_from_slice(&prepend_bytes);
        all_bytes.extend_from_slice(&tail_bytes);
        all_bytes.into()
    } else {
        tail_bytes
    };

    // Guards against a short second read; also makes the usize casts below lossless
    // because both lengths are now bounded by the buffer length.
    ensure!(
        tail_bytes.len() as u64 >= footer_and_metadata_len,
        OutOfSpecSnafu {
            msg: "File tail is shorter than the given footer and metadata lengths"
        }
    );

    // The Footer is the last section left in the buffer.
    let footer_start = tail_bytes.len() - footer_length as usize;
    let footer = deserialize_footer(tail_bytes.slice(footer_start..), compression)?;
    tail_bytes.truncate(footer_start);

    // With the Footer removed, the Metadata section sits at the end.
    let metadata_start = tail_bytes.len() - metadata_length as usize;
    let metadata = deserialize_footer_metadata(tail_bytes.slice(metadata_start..), compression)?;

    FileMetadata::from_proto(&postscript, &footer, &metadata)
}
322
323fn deserialize_footer(bytes: Bytes, compression: Option<Compression>) -> Result<Footer> {
324    let mut buffer = vec![];
325    Decompressor::new(bytes, compression, vec![])
326        .read_to_end(&mut buffer)
327        .context(error::IoSnafu)?;
328    Footer::decode(buffer.as_slice()).context(error::DecodeProtoSnafu)
329}
330
331fn deserialize_footer_metadata(bytes: Bytes, compression: Option<Compression>) -> Result<Metadata> {
332    let mut buffer = vec![];
333    Decompressor::new(bytes, compression, vec![])
334        .read_to_end(&mut buffer)
335        .context(error::IoSnafu)?;
336    Metadata::decode(buffer.as_slice()).context(error::DecodeProtoSnafu)
337}