1use std::{collections::HashMap, io::Read, sync::Arc};
19
20use bytes::Bytes;
21use prost::Message;
22use snafu::ResultExt;
23
24use crate::{
25 column::Column,
26 compression::{Compression, Decompressor},
27 error::{self, IoSnafu, Result},
28 proto::{self, stream::Kind, StripeFooter},
29 reader::{metadata::FileMetadata, ChunkReader},
30 schema::RootDataType,
31 statistics::ColumnStatistics,
32};
33
/// Location, size, and row-count information describing a single stripe.
///
/// Values are copied from a `proto::StripeInformation` message by the
/// `TryFrom` conversions in this file; per-column statistics are only
/// populated when a `proto::StripeStatistics` message is supplied as well.
#[derive(Debug, Clone)]
pub struct StripeMetadata {
    /// Statistics for each column of this stripe; empty when the metadata was
    /// built without a `proto::StripeStatistics` message.
    column_statistics: Vec<ColumnStatistics>,
    /// Offset of the stripe within the file; stream reads start at this position.
    offset: u64,
    /// Length of the stripe's index section, as reported by the file.
    index_length: u64,
    /// Length of the stripe's data section, as reported by the file.
    data_length: u64,
    /// Length of the stripe footer, which is located at
    /// `offset + index_length + data_length`.
    footer_length: u64,
    /// Number of rows in the stripe, as reported by the file.
    number_of_rows: u64,
}
52
53impl StripeMetadata {
54 pub fn offset(&self) -> u64 {
55 self.offset
56 }
57
58 pub fn index_length(&self) -> u64 {
59 self.index_length
60 }
61
62 pub fn data_length(&self) -> u64 {
63 self.data_length
64 }
65
66 pub fn footer_length(&self) -> u64 {
67 self.footer_length
68 }
69
70 pub fn number_of_rows(&self) -> u64 {
71 self.number_of_rows
72 }
73
74 pub fn column_statistics(&self) -> &[ColumnStatistics] {
75 &self.column_statistics
76 }
77
78 pub fn footer_offset(&self) -> u64 {
79 self.offset + self.index_length + self.data_length
80 }
81}
82
83impl TryFrom<(&proto::StripeInformation, &proto::StripeStatistics)> for StripeMetadata {
84 type Error = error::OrcError;
85
86 fn try_from(value: (&proto::StripeInformation, &proto::StripeStatistics)) -> Result<Self> {
87 let (info, statistics) = value;
88 let column_statistics = statistics
89 .col_stats
90 .iter()
91 .map(TryFrom::try_from)
92 .collect::<Result<Vec<_>>>()?;
93 Ok(Self {
94 column_statistics,
95 offset: info.offset(),
96 index_length: info.index_length(),
97 data_length: info.data_length(),
98 footer_length: info.footer_length(),
99 number_of_rows: info.number_of_rows(),
100 })
101 }
102}
103
104impl TryFrom<&proto::StripeInformation> for StripeMetadata {
105 type Error = error::OrcError;
106
107 fn try_from(value: &proto::StripeInformation) -> Result<Self> {
108 Ok(Self {
109 column_statistics: vec![],
110 offset: value.offset(),
111 index_length: value.index_length(),
112 data_length: value.data_length(),
113 footer_length: value.footer_length(),
114 number_of_rows: value.number_of_rows(),
115 })
116 }
117}
118
/// A single stripe loaded into memory: its projected columns plus the raw
/// bytes of every stream belonging to those columns.
#[derive(Debug)]
pub struct Stripe {
    /// One entry per child of the projected root data type.
    columns: Vec<Column>,
    /// Raw stream bytes keyed by `(column id, stream kind)`, together with the
    /// file's compression setting.
    stream_map: StreamMap,
    /// Number of rows in the stripe, taken from its `StripeMetadata`.
    number_of_rows: usize,
    /// Writer timezone parsed from the stripe footer, if the footer names one.
    tz: Option<chrono_tz::Tz>,
}
126
127impl Stripe {
128 pub fn new<R: ChunkReader>(
129 reader: &mut R,
130 file_metadata: &FileMetadata,
131 projected_data_type: &RootDataType,
132 info: &StripeMetadata,
133 ) -> Result<Self> {
134 let footer = reader
135 .get_bytes(info.footer_offset(), info.footer_length())
136 .context(IoSnafu)?;
137 let footer = Arc::new(deserialize_stripe_footer(
138 footer,
139 file_metadata.compression(),
140 )?);
141
142 let columns: Vec<Column> = projected_data_type
143 .children()
144 .iter()
145 .map(|col| {
146 Column::new(
147 col.name().to_string(),
148 col.data_type().clone(),
149 footer.clone(),
150 )
151 })
152 .collect();
153
154 let mut stream_map = HashMap::new();
155 let mut stream_offset = info.offset();
156 for stream in &footer.streams {
157 let length = stream.length();
158 let column_id = stream.column();
159 if projected_data_type.contains_column_index(column_id as usize) {
160 let kind = stream.kind();
161 let data = reader.get_bytes(stream_offset, length).context(IoSnafu)?;
162 stream_map.insert((column_id, kind), data);
163 }
164 stream_offset += length;
165 }
166
167 let tz = footer
168 .writer_timezone
169 .as_ref()
170 .map(|a| a.parse::<chrono_tz::Tz>().unwrap());
172
173 Ok(Self {
174 columns,
175 stream_map: StreamMap {
176 inner: stream_map,
177 compression: file_metadata.compression(),
178 },
179 number_of_rows: info.number_of_rows() as usize,
180 tz,
181 })
182 }
183
184 #[cfg(feature = "async")]
186 pub async fn new_async<R: crate::reader::AsyncChunkReader>(
187 reader: &mut R,
188 file_metadata: &FileMetadata,
189 projected_data_type: &RootDataType,
190 info: &StripeMetadata,
191 ) -> Result<Self> {
192 let footer = reader
193 .get_bytes(info.footer_offset(), info.footer_length())
194 .await
195 .context(IoSnafu)?;
196 let footer = Arc::new(deserialize_stripe_footer(
197 footer,
198 file_metadata.compression(),
199 )?);
200
201 let columns: Vec<Column> = projected_data_type
202 .children()
203 .iter()
204 .map(|col| {
205 Column::new(
206 col.name().to_string(),
207 col.data_type().clone(),
208 footer.clone(),
209 )
210 })
211 .collect();
212
213 let mut stream_map = HashMap::new();
214 let mut stream_offset = info.offset();
215 for stream in &footer.streams {
216 let length = stream.length();
217 let column_id = stream.column();
218 if projected_data_type.contains_column_index(column_id as usize) {
219 let kind = stream.kind();
220 let data = reader
221 .get_bytes(stream_offset, length)
222 .await
223 .context(IoSnafu)?;
224 stream_map.insert((column_id, kind), data);
225 }
226
227 stream_offset += length;
228 }
229
230 let tz = footer
231 .writer_timezone
232 .as_ref()
233 .map(|a| a.parse::<chrono_tz::Tz>().unwrap());
235
236 Ok(Self {
237 columns,
238 stream_map: StreamMap {
239 inner: stream_map,
240 compression: file_metadata.compression(),
241 },
242 number_of_rows: info.number_of_rows() as usize,
243 tz,
244 })
245 }
246
247 pub fn number_of_rows(&self) -> usize {
248 self.number_of_rows
249 }
250
251 pub fn stream_map(&self) -> &StreamMap {
252 &self.stream_map
253 }
254
255 pub fn columns(&self) -> &[Column] {
256 &self.columns
257 }
258
259 pub fn writer_tz(&self) -> Option<chrono_tz::Tz> {
260 self.tz
261 }
262}
263
/// Raw bytes of the streams loaded for a stripe, plus the compression setting
/// handed to the `Decompressor` when a stream is read.
#[derive(Debug)]
pub struct StreamMap {
    /// Stream bytes keyed by `(column id, stream kind)`.
    inner: HashMap<(u32, Kind), Bytes>,
    /// Compression setting passed to `Decompressor::new` for every stream.
    compression: Option<Compression>,
}
270
271impl StreamMap {
272 pub fn get(&self, column: &Column, kind: Kind) -> Decompressor {
273 self.get_opt(column, kind)
278 .unwrap_or_else(Decompressor::empty)
279 }
280
281 pub fn get_opt(&self, column: &Column, kind: Kind) -> Option<Decompressor> {
282 let column_id = column.column_id();
283
284 self.inner
285 .get(&(column_id, kind))
286 .cloned()
287 .map(|data| Decompressor::new(data, self.compression, vec![]))
288 }
289}
290
291fn deserialize_stripe_footer(
292 bytes: Bytes,
293 compression: Option<Compression>,
294) -> Result<StripeFooter> {
295 let mut buffer = vec![];
296 Decompressor::new(bytes, compression, vec![])
297 .read_to_end(&mut buffer)
298 .context(error::IoSnafu)?;
299 StripeFooter::decode(buffer.as_slice()).context(error::DecodeProtoSnafu)
300}