orc_rust/
stripe.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

18use std::collections::HashSet;
19use std::{collections::HashMap, io::Read, sync::Arc};
20
21use bytes::Bytes;
22use prost::Message;
23use snafu::ResultExt;
24
25use crate::{
26    column::Column,
27    compression::{Compression, Decompressor},
28    error::{self, IoSnafu, Result},
29    proto::{self, stream::Kind, StripeFooter},
30    reader::{metadata::FileMetadata, ChunkReader},
31    schema::RootDataType,
32    statistics::ColumnStatistics,
33};
34
35/// Stripe metadata parsed from the file tail metadata sections.
36/// Does not contain the actual stripe bytes, as those are decoded
37/// when they are required.
38#[derive(Debug, Clone)]
39pub struct StripeMetadata {
40    /// Statistics of columns across this specific stripe
41    column_statistics: Vec<ColumnStatistics>,
42    /// Byte offset of start of stripe from start of file
43    offset: u64,
44    /// Byte length of index section
45    index_length: u64,
46    /// Byte length of data section
47    data_length: u64,
48    /// Byte length of footer section
49    footer_length: u64,
50    /// Number of rows in the stripe
51    number_of_rows: u64,
52}
53
54impl StripeMetadata {
55    pub fn offset(&self) -> u64 {
56        self.offset
57    }
58
59    pub fn index_length(&self) -> u64 {
60        self.index_length
61    }
62
63    pub fn data_length(&self) -> u64 {
64        self.data_length
65    }
66
67    pub fn footer_length(&self) -> u64 {
68        self.footer_length
69    }
70
71    pub fn number_of_rows(&self) -> u64 {
72        self.number_of_rows
73    }
74
75    pub fn column_statistics(&self) -> &[ColumnStatistics] {
76        &self.column_statistics
77    }
78
79    pub fn footer_offset(&self) -> u64 {
80        self.offset + self.index_length + self.data_length
81    }
82}
83
84impl TryFrom<(&proto::StripeInformation, &proto::StripeStatistics)> for StripeMetadata {
85    type Error = error::OrcError;
86
87    fn try_from(value: (&proto::StripeInformation, &proto::StripeStatistics)) -> Result<Self> {
88        let column_statistics = value
89            .1
90            .col_stats
91            .iter()
92            .map(TryFrom::try_from)
93            .collect::<Result<Vec<_>>>()?;
94        Ok(Self {
95            column_statistics,
96            offset: value.0.offset(),
97            index_length: value.0.index_length(),
98            data_length: value.0.data_length(),
99            footer_length: value.0.footer_length(),
100            number_of_rows: value.0.number_of_rows(),
101        })
102    }
103}
104
105impl TryFrom<&proto::StripeInformation> for StripeMetadata {
106    type Error = error::OrcError;
107
108    fn try_from(value: &proto::StripeInformation) -> Result<Self> {
109        Ok(Self {
110            column_statistics: vec![],
111            offset: value.offset(),
112            index_length: value.index_length(),
113            data_length: value.data_length(),
114            footer_length: value.footer_length(),
115            number_of_rows: value.number_of_rows(),
116        })
117    }
118}
119
120#[derive(Debug)]
121pub struct Stripe {
122    columns: Vec<Column>,
123    /// <(ColumnId, Kind), Bytes>
124    stream_map: Arc<StreamMap>,
125    number_of_rows: usize,
126    tz: Option<chrono_tz::Tz>,
127}
128
129impl Stripe {
130    pub fn new<R: ChunkReader>(
131        reader: &mut R,
132        file_metadata: &Arc<FileMetadata>,
133        projected_data_type: &RootDataType,
134        info: &StripeMetadata,
135    ) -> Result<Self> {
136        let compression = file_metadata.compression();
137
138        let footer = reader
139            .get_bytes(info.footer_offset(), info.footer_length())
140            .context(IoSnafu)?;
141        let footer = Arc::new(deserialize_stripe_footer(footer, compression)?);
142
143        let columns: Vec<Column> = projected_data_type
144            .children()
145            .iter()
146            .map(|col| Column::new(col.name(), col.data_type(), &footer))
147            .collect();
148        let column_ids = collect_required_column_ids(&columns);
149
150        let mut stream_map = HashMap::new();
151        let mut stream_offset = info.offset();
152        for stream in &footer.streams {
153            let length = stream.length();
154            let column_id = stream.column();
155            if column_ids.contains(&column_id) {
156                let kind = stream.kind();
157                let data = Column::read_stream(reader, stream_offset, length)?;
158                stream_map.insert((column_id, kind), data);
159            }
160            stream_offset += length;
161        }
162
163        let tz: Option<chrono_tz::Tz> = footer
164            .writer_timezone
165            .as_ref()
166            // TODO: make this return error
167            .map(|a| a.parse::<chrono_tz::Tz>().unwrap());
168
169        Ok(Self {
170            columns,
171            stream_map: Arc::new(StreamMap {
172                inner: stream_map,
173                compression,
174            }),
175            number_of_rows: info.number_of_rows() as usize,
176            tz,
177        })
178    }
179
180    // TODO: reduce duplication with above
181    #[cfg(feature = "async")]
182    pub async fn new_async<R: crate::reader::AsyncChunkReader>(
183        reader: &mut R,
184        file_metadata: &Arc<FileMetadata>,
185        projected_data_type: &RootDataType,
186        info: &StripeMetadata,
187    ) -> Result<Self> {
188        let compression = file_metadata.compression();
189
190        let footer = reader
191            .get_bytes(info.footer_offset(), info.footer_length())
192            .await
193            .context(IoSnafu)?;
194        let footer = Arc::new(deserialize_stripe_footer(footer, compression)?);
195
196        let columns: Vec<Column> = projected_data_type
197            .children()
198            .iter()
199            .map(|col| Column::new(col.name(), col.data_type(), &footer))
200            .collect();
201        let column_ids = collect_required_column_ids(&columns);
202
203        let mut stream_map = HashMap::new();
204        let mut stream_offset = info.offset();
205        for stream in &footer.streams {
206            let length = stream.length();
207            let column_id = stream.column();
208            if column_ids.contains(&column_id) {
209                let kind = stream.kind();
210                let data = Column::read_stream_async(reader, stream_offset, length).await?;
211                stream_map.insert((column_id, kind), data);
212            }
213
214            stream_offset += length;
215        }
216
217        let tz: Option<chrono_tz::Tz> = footer
218            .writer_timezone
219            .as_ref()
220            // TODO: make this return error
221            .map(|a| a.parse::<chrono_tz::Tz>().unwrap());
222
223        Ok(Self {
224            columns,
225            stream_map: Arc::new(StreamMap {
226                inner: stream_map,
227                compression,
228            }),
229            number_of_rows: info.number_of_rows() as usize,
230            tz,
231        })
232    }
233
234    pub fn number_of_rows(&self) -> usize {
235        self.number_of_rows
236    }
237
238    /// Fetch the stream map
239    pub fn stream_map(&self) -> &StreamMap {
240        &self.stream_map
241    }
242
243    pub fn columns(&self) -> &[Column] {
244        &self.columns
245    }
246
247    pub fn writer_tz(&self) -> Option<chrono_tz::Tz> {
248        self.tz
249    }
250}
251
252#[derive(Debug)]
253pub struct StreamMap {
254    pub inner: HashMap<(u32, Kind), Bytes>,
255    pub compression: Option<Compression>,
256}
257
258impl StreamMap {
259    pub fn get(&self, column: &Column, kind: Kind) -> Decompressor {
260        // There is edge case where if column has no data then the stream might be omitted entirely
261        // (e.g. if there is only 1 null element, then it'll have present stream, but no data stream)
262        // See the integration::meta_data test for an example of this
263        // TODO: some better way to handle this?
264        self.get_opt(column, kind)
265            .unwrap_or_else(Decompressor::empty)
266    }
267
268    pub fn get_opt(&self, column: &Column, kind: Kind) -> Option<Decompressor> {
269        let column_id = column.column_id();
270
271        self.inner
272            .get(&(column_id, kind))
273            .cloned()
274            .map(|data| Decompressor::new(data, self.compression, vec![]))
275    }
276}
277
278fn deserialize_stripe_footer(
279    bytes: Bytes,
280    compression: Option<Compression>,
281) -> Result<StripeFooter> {
282    let mut buffer = vec![];
283    Decompressor::new(bytes, compression, vec![])
284        .read_to_end(&mut buffer)
285        .context(error::IoSnafu)?;
286    StripeFooter::decode(buffer.as_slice()).context(error::DecodeProtoSnafu)
287}
288
289fn collect_required_column_ids(columns: &[Column]) -> HashSet<u32> {
290    let mut set = HashSet::new();
291    for column in columns {
292        set.insert(column.column_id());
293        set.extend(collect_required_column_ids(&column.children()));
294    }
295    set
296}