1use std::collections::HashSet;
19use std::{collections::HashMap, io::Read, sync::Arc};
20
21use bytes::Bytes;
22use prost::Message;
23use snafu::ResultExt;
24
25use crate::{
26 column::Column,
27 compression::{Compression, Decompressor},
28 error::{self, IoSnafu, Result},
29 proto::{self, stream::Kind, StripeFooter},
30 reader::{metadata::FileMetadata, ChunkReader},
31 schema::RootDataType,
32 statistics::ColumnStatistics,
33};
34
/// Location and size information for a single stripe, plus optional
/// per-column statistics.
///
/// Values are copied from a `proto::StripeInformation`; the statistics are
/// only populated when a `proto::StripeStatistics` is supplied alongside it.
#[derive(Debug, Clone)]
pub struct StripeMetadata {
    /// Statistics for each column in this stripe. Empty when the metadata was
    /// built from `proto::StripeInformation` alone.
    column_statistics: Vec<ColumnStatistics>,
    /// Position at which the stripe starts; stream offsets are counted from here.
    offset: u64,
    /// Length of the stripe's index portion, as reported by the file.
    index_length: u64,
    /// Length of the stripe's data portion, as reported by the file.
    data_length: u64,
    /// Length of the stripe footer, as reported by the file.
    footer_length: u64,
    /// Number of rows stored in the stripe.
    number_of_rows: u64,
}
53
54impl StripeMetadata {
55 pub fn offset(&self) -> u64 {
56 self.offset
57 }
58
59 pub fn index_length(&self) -> u64 {
60 self.index_length
61 }
62
63 pub fn data_length(&self) -> u64 {
64 self.data_length
65 }
66
67 pub fn footer_length(&self) -> u64 {
68 self.footer_length
69 }
70
71 pub fn number_of_rows(&self) -> u64 {
72 self.number_of_rows
73 }
74
75 pub fn column_statistics(&self) -> &[ColumnStatistics] {
76 &self.column_statistics
77 }
78
79 pub fn footer_offset(&self) -> u64 {
80 self.offset + self.index_length + self.data_length
81 }
82}
83
84impl TryFrom<(&proto::StripeInformation, &proto::StripeStatistics)> for StripeMetadata {
85 type Error = error::OrcError;
86
87 fn try_from(value: (&proto::StripeInformation, &proto::StripeStatistics)) -> Result<Self> {
88 let column_statistics = value
89 .1
90 .col_stats
91 .iter()
92 .map(TryFrom::try_from)
93 .collect::<Result<Vec<_>>>()?;
94 Ok(Self {
95 column_statistics,
96 offset: value.0.offset(),
97 index_length: value.0.index_length(),
98 data_length: value.0.data_length(),
99 footer_length: value.0.footer_length(),
100 number_of_rows: value.0.number_of_rows(),
101 })
102 }
103}
104
105impl TryFrom<&proto::StripeInformation> for StripeMetadata {
106 type Error = error::OrcError;
107
108 fn try_from(value: &proto::StripeInformation) -> Result<Self> {
109 Ok(Self {
110 column_statistics: vec![],
111 offset: value.offset(),
112 index_length: value.index_length(),
113 data_length: value.data_length(),
114 footer_length: value.footer_length(),
115 number_of_rows: value.number_of_rows(),
116 })
117 }
118}
119
/// A stripe whose footer has been decoded and whose required streams have
/// been read from the underlying reader.
#[derive(Debug)]
pub struct Stripe {
    /// Columns built from the children of the projected root data type.
    columns: Vec<Column>,
    /// Stream data for the required columns, shared behind an `Arc`.
    stream_map: Arc<StreamMap>,
    /// Number of rows in the stripe, taken from the stripe metadata.
    number_of_rows: usize,
    /// Writer timezone parsed from the stripe footer, if the footer has one.
    tz: Option<chrono_tz::Tz>,
}
128
129impl Stripe {
130 pub fn new<R: ChunkReader>(
131 reader: &mut R,
132 file_metadata: &Arc<FileMetadata>,
133 projected_data_type: &RootDataType,
134 info: &StripeMetadata,
135 ) -> Result<Self> {
136 let compression = file_metadata.compression();
137
138 let footer = reader
139 .get_bytes(info.footer_offset(), info.footer_length())
140 .context(IoSnafu)?;
141 let footer = Arc::new(deserialize_stripe_footer(footer, compression)?);
142
143 let columns: Vec<Column> = projected_data_type
144 .children()
145 .iter()
146 .map(|col| Column::new(col.name(), col.data_type(), &footer))
147 .collect();
148 let column_ids = collect_required_column_ids(&columns);
149
150 let mut stream_map = HashMap::new();
151 let mut stream_offset = info.offset();
152 for stream in &footer.streams {
153 let length = stream.length();
154 let column_id = stream.column();
155 if column_ids.contains(&column_id) {
156 let kind = stream.kind();
157 let data = Column::read_stream(reader, stream_offset, length)?;
158 stream_map.insert((column_id, kind), data);
159 }
160 stream_offset += length;
161 }
162
163 let tz: Option<chrono_tz::Tz> = footer
164 .writer_timezone
165 .as_ref()
166 .map(|a| a.parse::<chrono_tz::Tz>().unwrap());
168
169 Ok(Self {
170 columns,
171 stream_map: Arc::new(StreamMap {
172 inner: stream_map,
173 compression,
174 }),
175 number_of_rows: info.number_of_rows() as usize,
176 tz,
177 })
178 }
179
180 #[cfg(feature = "async")]
182 pub async fn new_async<R: crate::reader::AsyncChunkReader>(
183 reader: &mut R,
184 file_metadata: &Arc<FileMetadata>,
185 projected_data_type: &RootDataType,
186 info: &StripeMetadata,
187 ) -> Result<Self> {
188 let compression = file_metadata.compression();
189
190 let footer = reader
191 .get_bytes(info.footer_offset(), info.footer_length())
192 .await
193 .context(IoSnafu)?;
194 let footer = Arc::new(deserialize_stripe_footer(footer, compression)?);
195
196 let columns: Vec<Column> = projected_data_type
197 .children()
198 .iter()
199 .map(|col| Column::new(col.name(), col.data_type(), &footer))
200 .collect();
201 let column_ids = collect_required_column_ids(&columns);
202
203 let mut stream_map = HashMap::new();
204 let mut stream_offset = info.offset();
205 for stream in &footer.streams {
206 let length = stream.length();
207 let column_id = stream.column();
208 if column_ids.contains(&column_id) {
209 let kind = stream.kind();
210 let data = Column::read_stream_async(reader, stream_offset, length).await?;
211 stream_map.insert((column_id, kind), data);
212 }
213
214 stream_offset += length;
215 }
216
217 let tz: Option<chrono_tz::Tz> = footer
218 .writer_timezone
219 .as_ref()
220 .map(|a| a.parse::<chrono_tz::Tz>().unwrap());
222
223 Ok(Self {
224 columns,
225 stream_map: Arc::new(StreamMap {
226 inner: stream_map,
227 compression,
228 }),
229 number_of_rows: info.number_of_rows() as usize,
230 tz,
231 })
232 }
233
234 pub fn number_of_rows(&self) -> usize {
235 self.number_of_rows
236 }
237
238 pub fn stream_map(&self) -> &StreamMap {
240 &self.stream_map
241 }
242
243 pub fn columns(&self) -> &[Column] {
244 &self.columns
245 }
246
247 pub fn writer_tz(&self) -> Option<chrono_tz::Tz> {
248 self.tz
249 }
250}
251
/// Stream bytes read for a stripe, together with the compression setting
/// needed to decode them.
#[derive(Debug)]
pub struct StreamMap {
    /// Stream contents as read from the file, keyed by `(column id, stream kind)`.
    pub inner: HashMap<(u32, Kind), Bytes>,
    /// Compression setting handed to every `Decompressor` created from this map.
    pub compression: Option<Compression>,
}
257
258impl StreamMap {
259 pub fn get(&self, column: &Column, kind: Kind) -> Decompressor {
260 self.get_opt(column, kind)
265 .unwrap_or_else(Decompressor::empty)
266 }
267
268 pub fn get_opt(&self, column: &Column, kind: Kind) -> Option<Decompressor> {
269 let column_id = column.column_id();
270
271 self.inner
272 .get(&(column_id, kind))
273 .cloned()
274 .map(|data| Decompressor::new(data, self.compression, vec![]))
275 }
276}
277
278fn deserialize_stripe_footer(
279 bytes: Bytes,
280 compression: Option<Compression>,
281) -> Result<StripeFooter> {
282 let mut buffer = vec![];
283 Decompressor::new(bytes, compression, vec![])
284 .read_to_end(&mut buffer)
285 .context(error::IoSnafu)?;
286 StripeFooter::decode(buffer.as_slice()).context(error::DecodeProtoSnafu)
287}
288
289fn collect_required_column_ids(columns: &[Column]) -> HashSet<u32> {
290 let mut set = HashSet::new();
291 for column in columns {
292 set.insert(column.column_id());
293 set.extend(collect_required_column_ids(&column.children()));
294 }
295 set
296}