orc_rust/reader/
metadata.rs1use std::collections::HashMap;
45use std::io::Read;
46
47use bytes::{Bytes, BytesMut};
48use prost::Message;
49use snafu::{ensure, OptionExt, ResultExt};
50
51use crate::compression::{Compression, Decompressor};
52use crate::error::{self, EmptyFileSnafu, OutOfSpecSnafu, Result};
53use crate::proto::{self, Footer, Metadata, PostScript};
54use crate::schema::RootDataType;
55use crate::statistics::ColumnStatistics;
56use crate::stripe::StripeMetadata;
57
58use crate::reader::ChunkReader;
59
/// Upper bound, in bytes, on the first speculative read from the end of the
/// file; the tail readers in this module request `min(file length, this)`.
const DEFAULT_FOOTER_SIZE: u64 = 16 * 1024;
61
62#[derive(Debug, Clone)]
64pub struct FileMetadata {
65 compression: Option<Compression>,
66 root_data_type: RootDataType,
67 number_of_rows: u64,
68 file_format_version: String,
69 column_statistics: Vec<ColumnStatistics>,
71 stripes: Vec<StripeMetadata>,
72 user_custom_metadata: HashMap<String, Vec<u8>>,
73}
74
75impl FileMetadata {
76 fn from_proto(
77 postscript: &proto::PostScript,
78 footer: &proto::Footer,
79 metadata: &proto::Metadata,
80 ) -> Result<Self> {
81 let compression =
82 Compression::from_proto(postscript.compression(), postscript.compression_block_size);
83 let root_data_type = RootDataType::from_proto(&footer.types)?;
84 let number_of_rows = footer.number_of_rows();
85 let column_statistics = footer
86 .statistics
87 .iter()
88 .map(TryFrom::try_from)
89 .collect::<Result<Vec<_>>>()?;
90 ensure!(
91 metadata.stripe_stats.is_empty() || metadata.stripe_stats.len() == footer.stripes.len(),
92 OutOfSpecSnafu {
93 msg: "stripe stats length must equal the number of stripes"
94 }
95 );
96 let stripes = if metadata.stripe_stats.is_empty() {
98 footer
99 .stripes
100 .iter()
101 .map(TryFrom::try_from)
102 .collect::<Result<Vec<_>>>()?
103 } else {
104 footer
105 .stripes
106 .iter()
107 .zip(metadata.stripe_stats.iter())
108 .map(TryFrom::try_from)
109 .collect::<Result<Vec<_>>>()?
110 };
111 let user_custom_metadata = footer
112 .metadata
113 .iter()
114 .map(|kv| (kv.name().to_owned(), kv.value().to_vec()))
115 .collect::<HashMap<_, _>>();
116
117 let file_format_version = postscript
118 .version
119 .iter()
120 .map(|v| v.to_string() + ".")
121 .collect::<String>();
122 let file_format_version = file_format_version
123 .strip_suffix('.')
124 .unwrap_or("")
125 .to_string();
126
127 Ok(Self {
128 compression,
129 root_data_type,
130 number_of_rows,
131 file_format_version,
132 column_statistics,
133 stripes,
134 user_custom_metadata,
135 })
136 }
137
138 pub fn number_of_rows(&self) -> u64 {
139 self.number_of_rows
140 }
141
142 pub fn compression(&self) -> Option<Compression> {
143 self.compression
144 }
145
146 pub fn root_data_type(&self) -> &RootDataType {
147 &self.root_data_type
148 }
149
150 pub fn column_file_statistics(&self) -> &[ColumnStatistics] {
151 &self.column_statistics
152 }
153
154 pub fn stripe_metadatas(&self) -> &[StripeMetadata] {
155 &self.stripes
156 }
157
158 pub fn user_custom_metadata(&self) -> &HashMap<String, Vec<u8>> {
159 &self.user_custom_metadata
160 }
161
162 pub fn file_format_version(&self) -> &str {
163 &self.file_format_version
164 }
165}
166
167pub fn read_metadata<R: ChunkReader>(reader: &mut R) -> Result<FileMetadata> {
168 let file_len = reader.len();
169 if file_len == 0 {
170 return EmptyFileSnafu.fail();
171 }
172
173 let assume_footer_len = file_len.min(DEFAULT_FOOTER_SIZE);
177 let mut tail_bytes = reader
178 .get_bytes(file_len - assume_footer_len, assume_footer_len)
179 .context(error::IoSnafu)?;
180
181 let postscript_len = tail_bytes[tail_bytes.len() - 1] as u64;
184 tail_bytes.truncate(tail_bytes.len() - 1);
185
186 if tail_bytes.len() < postscript_len as usize {
187 return OutOfSpecSnafu {
188 msg: "File too small for given postscript length",
189 }
190 .fail();
191 }
192 let postscript = PostScript::decode(&tail_bytes[tail_bytes.len() - postscript_len as usize..])
193 .context(error::DecodeProtoSnafu)?;
194 let compression =
195 Compression::from_proto(postscript.compression(), postscript.compression_block_size);
196 tail_bytes.truncate(tail_bytes.len() - postscript_len as usize);
197
198 let footer_length = postscript.footer_length.context(error::OutOfSpecSnafu {
199 msg: "Footer length is empty",
200 })?;
201 let metadata_length = postscript.metadata_length.context(error::OutOfSpecSnafu {
202 msg: "Metadata length is empty",
203 })?;
204
205 let mut tail_bytes = if footer_length + metadata_length > tail_bytes.len() as u64 {
207 let offset = file_len - 1 - postscript_len - footer_length - metadata_length;
210 let bytes_to_read = (footer_length + metadata_length) - tail_bytes.len() as u64;
211 let prepend_bytes = reader
212 .get_bytes(offset, bytes_to_read)
213 .context(error::IoSnafu)?;
214 let mut all_bytes = BytesMut::with_capacity(prepend_bytes.len() + tail_bytes.len());
215 all_bytes.extend_from_slice(&prepend_bytes);
216 all_bytes.extend_from_slice(&tail_bytes);
217 all_bytes.into()
218 } else {
219 tail_bytes
220 };
221
222 let footer = deserialize_footer(
223 tail_bytes.slice(tail_bytes.len() - footer_length as usize..),
224 compression,
225 )?;
226 tail_bytes.truncate(tail_bytes.len() - footer_length as usize);
227
228 let metadata = deserialize_footer_metadata(
229 tail_bytes.slice(tail_bytes.len() - metadata_length as usize..),
230 compression,
231 )?;
232
233 FileMetadata::from_proto(&postscript, &footer, &metadata)
234}
235
/// Asynchronously reads the postscript, footer and metadata sections from
/// the end of `reader` and assembles them into a [`FileMetadata`].
///
/// The tail of the file is fetched with one speculative read of at most
/// `DEFAULT_FOOTER_SIZE` bytes; a second read is issued only when the footer
/// and metadata sections do not fit inside that first chunk.
///
/// # Errors
///
/// * An I/O error when querying the length, reading, or decompressing fails.
/// * An empty-file error when the reported length is zero.
/// * A decode error when a section is not valid protobuf.
/// * An out-of-spec error when the lengths stored in the file are missing,
///   overflow, or do not fit inside the file. These lengths come from the
///   file itself, so they are validated instead of being allowed to
///   overflow or underflow.
#[cfg(feature = "async")]
pub async fn read_metadata_async<R: super::AsyncChunkReader>(
    reader: &mut R,
) -> Result<FileMetadata> {
    let file_len = reader.len().await.context(error::IoSnafu)?;
    if file_len == 0 {
        return EmptyFileSnafu.fail();
    }

    // First read: grab a default-sized chunk from the end of the file.
    let assume_footer_len = file_len.min(DEFAULT_FOOTER_SIZE);
    let mut tail_bytes = reader
        .get_bytes(file_len - assume_footer_len, assume_footer_len)
        .await
        .context(error::IoSnafu)?;

    // The final byte of the file stores the length of the postscript.
    let postscript_len = tail_bytes
        .last()
        .copied()
        .map(u64::from)
        .context(error::OutOfSpecSnafu {
            msg: "File tail is empty, cannot read postscript length",
        })?;
    tail_bytes.truncate(tail_bytes.len() - 1);

    if tail_bytes.len() < postscript_len as usize {
        return OutOfSpecSnafu {
            msg: "File too small for given postscript length",
        }
        .fail();
    }
    let postscript = PostScript::decode(&tail_bytes[tail_bytes.len() - postscript_len as usize..])
        .context(error::DecodeProtoSnafu)?;
    let compression =
        Compression::from_proto(postscript.compression(), postscript.compression_block_size);
    tail_bytes.truncate(tail_bytes.len() - postscript_len as usize);

    let footer_length = postscript.footer_length.context(error::OutOfSpecSnafu {
        msg: "Footer length is empty",
    })?;
    let metadata_length = postscript.metadata_length.context(error::OutOfSpecSnafu {
        msg: "Metadata length is empty",
    })?;

    // Both lengths are untrusted, so their sum must not be allowed to wrap.
    let sections_len = footer_length
        .checked_add(metadata_length)
        .context(error::OutOfSpecSnafu {
            msg: "Footer and metadata lengths overflow",
        })?;

    let mut tail_bytes = if sections_len > tail_bytes.len() as u64 {
        // Second read: fetch the part of footer + metadata that precedes the
        // first chunk. The `1` accounts for the postscript length byte.
        let offset = file_len
            .checked_sub(1 + postscript_len)
            .and_then(|before_postscript| before_postscript.checked_sub(sections_len))
            .context(error::OutOfSpecSnafu {
                msg: "Footer and metadata lengths exceed file size",
            })?;
        let bytes_to_read = sections_len - tail_bytes.len() as u64;
        let prepend_bytes = reader
            .get_bytes(offset, bytes_to_read)
            .await
            .context(error::IoSnafu)?;
        let mut all_bytes = BytesMut::with_capacity(prepend_bytes.len() + tail_bytes.len());
        all_bytes.extend_from_slice(&prepend_bytes);
        all_bytes.extend_from_slice(&tail_bytes);
        all_bytes.into()
    } else {
        tail_bytes
    };

    // Guards the slicing below against a reader that returned fewer bytes
    // than requested. Once this holds, both lengths fit in `usize`.
    ensure!(
        tail_bytes.len() as u64 >= sections_len,
        OutOfSpecSnafu {
            msg: "Read fewer bytes than footer and metadata lengths require"
        }
    );

    // The footer is the last section in the buffer; metadata sits before it.
    let footer = deserialize_footer(
        tail_bytes.slice(tail_bytes.len() - footer_length as usize..),
        compression,
    )?;
    tail_bytes.truncate(tail_bytes.len() - footer_length as usize);

    let metadata = deserialize_footer_metadata(
        tail_bytes.slice(tail_bytes.len() - metadata_length as usize..),
        compression,
    )?;

    FileMetadata::from_proto(&postscript, &footer, &metadata)
}
309
310fn deserialize_footer(bytes: Bytes, compression: Option<Compression>) -> Result<Footer> {
311 let mut buffer = vec![];
312 Decompressor::new(bytes, compression, vec![])
313 .read_to_end(&mut buffer)
314 .context(error::IoSnafu)?;
315 Footer::decode(buffer.as_slice()).context(error::DecodeProtoSnafu)
316}
317
318fn deserialize_footer_metadata(bytes: Bytes, compression: Option<Compression>) -> Result<Metadata> {
319 let mut buffer = vec![];
320 Decompressor::new(bytes, compression, vec![])
321 .read_to_end(&mut buffer)
322 .context(error::IoSnafu)?;
323 Metadata::decode(buffer.as_slice()).context(error::DecodeProtoSnafu)
324}