orc_rust/reader/
metadata.rs1use std::collections::HashMap;
45use std::io::Read;
46
47use bytes::{Bytes, BytesMut};
48use prost::Message;
49use snafu::{ensure, OptionExt, ResultExt};
50
51use crate::compression::{Compression, Decompressor};
52use crate::error::{self, EmptyFileSnafu, OutOfSpecSnafu, Result};
53use crate::proto::{self, Footer, Metadata, PostScript};
54use crate::schema::RootDataType;
55use crate::statistics::ColumnStatistics;
56use crate::stripe::StripeMetadata;
57
58use crate::reader::ChunkReader;
59
/// Upper bound, in bytes, on the first speculative read from the end of the
/// file performed by `read_metadata` (16 KiB).
const DEFAULT_FOOTER_SIZE: u64 = 16_384;
61
62#[derive(Debug, Clone)]
64pub struct FileMetadata {
65 compression: Option<Compression>,
66 root_data_type: RootDataType,
67 number_of_rows: u64,
68 file_format_version: String,
69 column_statistics: Vec<ColumnStatistics>,
71 stripes: Vec<StripeMetadata>,
72 user_custom_metadata: HashMap<String, Vec<u8>>,
73 row_index_stride: Option<u32>,
75}
76
77impl FileMetadata {
78 fn from_proto(
79 postscript: &proto::PostScript,
80 footer: &proto::Footer,
81 metadata: &proto::Metadata,
82 ) -> Result<Self> {
83 let compression =
84 Compression::from_proto(postscript.compression(), postscript.compression_block_size);
85 let root_data_type = RootDataType::from_proto(&footer.types)?;
86 let number_of_rows = footer.number_of_rows();
87 let column_statistics = footer
88 .statistics
89 .iter()
90 .map(TryFrom::try_from)
91 .collect::<Result<Vec<_>>>()?;
92 ensure!(
93 metadata.stripe_stats.is_empty() || metadata.stripe_stats.len() == footer.stripes.len(),
94 OutOfSpecSnafu {
95 msg: "stripe stats length must equal the number of stripes"
96 }
97 );
98 let stripes = if metadata.stripe_stats.is_empty() {
100 footer
101 .stripes
102 .iter()
103 .map(TryFrom::try_from)
104 .collect::<Result<Vec<_>>>()?
105 } else {
106 footer
107 .stripes
108 .iter()
109 .zip(metadata.stripe_stats.iter())
110 .map(TryFrom::try_from)
111 .collect::<Result<Vec<_>>>()?
112 };
113 let user_custom_metadata = footer
114 .metadata
115 .iter()
116 .map(|kv| (kv.name().to_owned(), kv.value().to_vec()))
117 .collect::<HashMap<_, _>>();
118
119 let file_format_version = postscript
120 .version
121 .iter()
122 .map(|v| v.to_string() + ".")
123 .collect::<String>();
124 let file_format_version = file_format_version
125 .strip_suffix('.')
126 .unwrap_or("")
127 .to_string();
128
129 Ok(Self {
130 compression,
131 root_data_type,
132 number_of_rows,
133 file_format_version,
134 column_statistics,
135 stripes,
136 user_custom_metadata,
137 row_index_stride: footer.row_index_stride,
138 })
139 }
140
141 pub fn number_of_rows(&self) -> u64 {
142 self.number_of_rows
143 }
144
145 pub fn compression(&self) -> Option<Compression> {
146 self.compression
147 }
148
149 pub fn root_data_type(&self) -> &RootDataType {
150 &self.root_data_type
151 }
152
153 pub fn column_file_statistics(&self) -> &[ColumnStatistics] {
154 &self.column_statistics
155 }
156
157 pub fn stripe_metadatas(&self) -> &[StripeMetadata] {
158 &self.stripes
159 }
160
161 pub fn user_custom_metadata(&self) -> &HashMap<String, Vec<u8>> {
162 &self.user_custom_metadata
163 }
164
165 pub fn row_index_stride(&self) -> Option<usize> {
172 self.row_index_stride.map(|s| s as usize)
173 }
174
175 pub fn file_format_version(&self) -> &str {
176 &self.file_format_version
177 }
178}
179
180pub fn read_metadata<R: ChunkReader>(reader: &mut R) -> Result<FileMetadata> {
181 let file_len = reader.len();
182 if file_len == 0 {
183 return EmptyFileSnafu.fail();
184 }
185
186 let assume_footer_len = file_len.min(DEFAULT_FOOTER_SIZE);
190 let mut tail_bytes = reader
191 .get_bytes(file_len - assume_footer_len, assume_footer_len)
192 .context(error::IoSnafu)?;
193
194 let postscript_len = tail_bytes[tail_bytes.len() - 1] as u64;
197 tail_bytes.truncate(tail_bytes.len() - 1);
198
199 if tail_bytes.len() < postscript_len as usize {
200 return OutOfSpecSnafu {
201 msg: "File too small for given postscript length",
202 }
203 .fail();
204 }
205 let postscript = PostScript::decode(&tail_bytes[tail_bytes.len() - postscript_len as usize..])
206 .context(error::DecodeProtoSnafu)?;
207 let compression =
208 Compression::from_proto(postscript.compression(), postscript.compression_block_size);
209 tail_bytes.truncate(tail_bytes.len() - postscript_len as usize);
210
211 let footer_length = postscript.footer_length.context(error::OutOfSpecSnafu {
212 msg: "Footer length is empty",
213 })?;
214 let metadata_length = postscript.metadata_length.context(error::OutOfSpecSnafu {
215 msg: "Metadata length is empty",
216 })?;
217
218 let mut tail_bytes = if footer_length + metadata_length > tail_bytes.len() as u64 {
220 let offset = file_len - 1 - postscript_len - footer_length - metadata_length;
223 let bytes_to_read = (footer_length + metadata_length) - tail_bytes.len() as u64;
224 let prepend_bytes = reader
225 .get_bytes(offset, bytes_to_read)
226 .context(error::IoSnafu)?;
227 let mut all_bytes = BytesMut::with_capacity(prepend_bytes.len() + tail_bytes.len());
228 all_bytes.extend_from_slice(&prepend_bytes);
229 all_bytes.extend_from_slice(&tail_bytes);
230 all_bytes.into()
231 } else {
232 tail_bytes
233 };
234
235 let footer = deserialize_footer(
236 tail_bytes.slice(tail_bytes.len() - footer_length as usize..),
237 compression,
238 )?;
239 tail_bytes.truncate(tail_bytes.len() - footer_length as usize);
240
241 let metadata = deserialize_footer_metadata(
242 tail_bytes.slice(tail_bytes.len() - metadata_length as usize..),
243 compression,
244 )?;
245
246 FileMetadata::from_proto(&postscript, &footer, &metadata)
247}
248
/// Asynchronous counterpart of `read_metadata`: reads and decodes the
/// file-level metadata stored at the tail of an ORC file.
///
/// The tail is parsed back to front: the final byte gives the postscript
/// length, the postscript gives the footer and metadata lengths, and those two
/// sections sit immediately before the postscript (metadata first, then
/// footer). The last `DEFAULT_FOOTER_SIZE` bytes (or the whole file when it is
/// smaller) are fetched first; a second read is issued only when the footer
/// and metadata do not fit inside that window.
///
/// # Errors
///
/// - an I/O error when the length query, a read, or decompression fails;
/// - an empty-file error when the reported length is zero;
/// - an out-of-spec error when the postscript length exceeds the bytes read,
///   when the footer or metadata length is missing, or when the declared
///   lengths overflow or do not fit inside the file;
/// - a protobuf decode error when a section cannot be decoded.
#[cfg(feature = "async")]
pub async fn read_metadata_async<R: super::AsyncChunkReader>(
    reader: &mut R,
) -> Result<FileMetadata> {
    let file_len = reader.len().await.context(error::IoSnafu)?;
    if file_len == 0 {
        return EmptyFileSnafu.fail();
    }

    // Speculatively fetch a fixed-size window from the end of the file.
    let assume_footer_len = file_len.min(DEFAULT_FOOTER_SIZE);
    let mut tail_bytes = reader
        .get_bytes(file_len - assume_footer_len, assume_footer_len)
        .await
        .context(error::IoSnafu)?;

    // The very last byte holds the postscript length; drop it once read.
    // NOTE(review): assumes `get_bytes` returns exactly the requested number
    // of bytes (non-empty here) — confirm against the `AsyncChunkReader` contract.
    let postscript_len = tail_bytes[tail_bytes.len() - 1] as u64;
    tail_bytes.truncate(tail_bytes.len() - 1);

    if tail_bytes.len() < postscript_len as usize {
        return OutOfSpecSnafu {
            msg: "File too small for given postscript length",
        }
        .fail();
    }
    let postscript = PostScript::decode(&tail_bytes[tail_bytes.len() - postscript_len as usize..])
        .context(error::DecodeProtoSnafu)?;
    let compression =
        Compression::from_proto(postscript.compression(), postscript.compression_block_size);
    tail_bytes.truncate(tail_bytes.len() - postscript_len as usize);

    let footer_length = postscript.footer_length.context(error::OutOfSpecSnafu {
        msg: "Footer length is empty",
    })?;
    let metadata_length = postscript.metadata_length.context(error::OutOfSpecSnafu {
        msg: "Metadata length is empty",
    })?;

    // Both lengths come straight from the file, so they must not be trusted:
    // use checked arithmetic instead of letting the sum overflow.
    let footer_and_metadata_len =
        footer_length
            .checked_add(metadata_length)
            .context(error::OutOfSpecSnafu {
                msg: "Footer and metadata lengths overflow",
            })?;

    let mut tail_bytes = if footer_and_metadata_len > tail_bytes.len() as u64 {
        // The initial window was too small. Locate where the metadata section
        // starts, rejecting lengths that would place it before the start of
        // the file instead of underflowing.
        let offset = file_len
            .checked_sub(1 + postscript_len)
            .and_then(|postscript_start| postscript_start.checked_sub(footer_and_metadata_len))
            .context(error::OutOfSpecSnafu {
                msg: "File too small for given footer and metadata lengths",
            })?;
        // Only the part not already held in `tail_bytes` needs to be read.
        let bytes_to_read = footer_and_metadata_len - tail_bytes.len() as u64;
        let prepend_bytes = reader
            .get_bytes(offset, bytes_to_read)
            .await
            .context(error::IoSnafu)?;
        let mut all_bytes = BytesMut::with_capacity(prepend_bytes.len() + tail_bytes.len());
        all_bytes.extend_from_slice(&prepend_bytes);
        all_bytes.extend_from_slice(&tail_bytes);
        all_bytes.into()
    } else {
        tail_bytes
    };

    // The footer is the last section remaining in the buffer.
    let footer = deserialize_footer(
        tail_bytes.slice(tail_bytes.len() - footer_length as usize..),
        compression,
    )?;
    tail_bytes.truncate(tail_bytes.len() - footer_length as usize);

    // The metadata section immediately precedes the footer.
    let metadata = deserialize_footer_metadata(
        tail_bytes.slice(tail_bytes.len() - metadata_length as usize..),
        compression,
    )?;

    FileMetadata::from_proto(&postscript, &footer, &metadata)
}
322
323fn deserialize_footer(bytes: Bytes, compression: Option<Compression>) -> Result<Footer> {
324 let mut buffer = vec![];
325 Decompressor::new(bytes, compression, vec![])
326 .read_to_end(&mut buffer)
327 .context(error::IoSnafu)?;
328 Footer::decode(buffer.as_slice()).context(error::DecodeProtoSnafu)
329}
330
331fn deserialize_footer_metadata(bytes: Bytes, compression: Option<Compression>) -> Result<Metadata> {
332 let mut buffer = vec![];
333 Decompressor::new(bytes, compression, vec![])
334 .read_to_end(&mut buffer)
335 .context(error::IoSnafu)?;
336 Metadata::decode(buffer.as_slice()).context(error::DecodeProtoSnafu)
337}