orc_rust/
stripe.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::{collections::HashMap, io::Read, sync::Arc};
19
20use bytes::Bytes;
21use prost::Message;
22use snafu::ResultExt;
23
24use crate::{
25    column::Column,
26    compression::{Compression, Decompressor},
27    error::{self, IoSnafu, Result},
28    proto::{self, stream::Kind, StripeFooter},
29    reader::{metadata::FileMetadata, ChunkReader},
30    schema::RootDataType,
31    statistics::ColumnStatistics,
32};
33
/// Stripe metadata parsed from the file tail metadata sections.
/// Does not contain the actual stripe bytes, as those are decoded
/// when they are required.
///
/// Instances are built from a `proto::StripeInformation`, optionally paired
/// with the matching `proto::StripeStatistics` (see the `TryFrom` impls).
#[derive(Debug, Clone)]
pub struct StripeMetadata {
    /// Statistics of columns across this specific stripe.
    /// Empty when the metadata was built without stripe statistics.
    column_statistics: Vec<ColumnStatistics>,
    /// Byte offset of start of stripe from start of file
    offset: u64,
    /// Byte length of index section
    index_length: u64,
    /// Byte length of data section
    data_length: u64,
    /// Byte length of footer section.
    /// The footer starts at `offset + index_length + data_length`.
    footer_length: u64,
    /// Number of rows in the stripe
    number_of_rows: u64,
}
52
53impl StripeMetadata {
54    pub fn offset(&self) -> u64 {
55        self.offset
56    }
57
58    pub fn index_length(&self) -> u64 {
59        self.index_length
60    }
61
62    pub fn data_length(&self) -> u64 {
63        self.data_length
64    }
65
66    pub fn footer_length(&self) -> u64 {
67        self.footer_length
68    }
69
70    pub fn number_of_rows(&self) -> u64 {
71        self.number_of_rows
72    }
73
74    pub fn column_statistics(&self) -> &[ColumnStatistics] {
75        &self.column_statistics
76    }
77
78    pub fn footer_offset(&self) -> u64 {
79        self.offset + self.index_length + self.data_length
80    }
81}
82
83impl TryFrom<(&proto::StripeInformation, &proto::StripeStatistics)> for StripeMetadata {
84    type Error = error::OrcError;
85
86    fn try_from(value: (&proto::StripeInformation, &proto::StripeStatistics)) -> Result<Self> {
87        let (info, statistics) = value;
88        let column_statistics = statistics
89            .col_stats
90            .iter()
91            .map(TryFrom::try_from)
92            .collect::<Result<Vec<_>>>()?;
93        Ok(Self {
94            column_statistics,
95            offset: info.offset(),
96            index_length: info.index_length(),
97            data_length: info.data_length(),
98            footer_length: info.footer_length(),
99            number_of_rows: info.number_of_rows(),
100        })
101    }
102}
103
104impl TryFrom<&proto::StripeInformation> for StripeMetadata {
105    type Error = error::OrcError;
106
107    fn try_from(value: &proto::StripeInformation) -> Result<Self> {
108        Ok(Self {
109            column_statistics: vec![],
110            offset: value.offset(),
111            index_length: value.index_length(),
112            data_length: value.data_length(),
113            footer_length: value.footer_length(),
114            number_of_rows: value.number_of_rows(),
115        })
116    }
117}
118
/// A single stripe loaded for reading: its projected columns plus the
/// bytes of every stream belonging to those columns.
#[derive(Debug)]
pub struct Stripe {
    /// One entry per child of the projected root data type.
    columns: Vec<Column>,
    /// Bytes of the projected streams, keyed by column id and stream kind.
    stream_map: StreamMap,
    /// Number of rows in the stripe, taken from the stripe metadata.
    number_of_rows: usize,
    /// Writer timezone parsed from the stripe footer, if the footer has one.
    tz: Option<chrono_tz::Tz>,
}
126
127impl Stripe {
128    pub fn new<R: ChunkReader>(
129        reader: &mut R,
130        file_metadata: &FileMetadata,
131        projected_data_type: &RootDataType,
132        info: &StripeMetadata,
133    ) -> Result<Self> {
134        let footer = reader
135            .get_bytes(info.footer_offset(), info.footer_length())
136            .context(IoSnafu)?;
137        let footer = Arc::new(deserialize_stripe_footer(
138            footer,
139            file_metadata.compression(),
140        )?);
141
142        let columns: Vec<Column> = projected_data_type
143            .children()
144            .iter()
145            .map(|col| {
146                Column::new(
147                    col.name().to_string(),
148                    col.data_type().clone(),
149                    footer.clone(),
150                )
151            })
152            .collect();
153
154        let mut stream_map = HashMap::new();
155        let mut stream_offset = info.offset();
156        for stream in &footer.streams {
157            let length = stream.length();
158            let column_id = stream.column();
159            if projected_data_type.contains_column_index(column_id as usize) {
160                let kind = stream.kind();
161                let data = reader.get_bytes(stream_offset, length).context(IoSnafu)?;
162                stream_map.insert((column_id, kind), data);
163            }
164            stream_offset += length;
165        }
166
167        let tz = footer
168            .writer_timezone
169            .as_ref()
170            // TODO: make this return error
171            .map(|a| a.parse::<chrono_tz::Tz>().unwrap());
172
173        Ok(Self {
174            columns,
175            stream_map: StreamMap {
176                inner: stream_map,
177                compression: file_metadata.compression(),
178            },
179            number_of_rows: info.number_of_rows() as usize,
180            tz,
181        })
182    }
183
184    // TODO: reduce duplication with above
185    #[cfg(feature = "async")]
186    pub async fn new_async<R: crate::reader::AsyncChunkReader>(
187        reader: &mut R,
188        file_metadata: &FileMetadata,
189        projected_data_type: &RootDataType,
190        info: &StripeMetadata,
191    ) -> Result<Self> {
192        let footer = reader
193            .get_bytes(info.footer_offset(), info.footer_length())
194            .await
195            .context(IoSnafu)?;
196        let footer = Arc::new(deserialize_stripe_footer(
197            footer,
198            file_metadata.compression(),
199        )?);
200
201        let columns: Vec<Column> = projected_data_type
202            .children()
203            .iter()
204            .map(|col| {
205                Column::new(
206                    col.name().to_string(),
207                    col.data_type().clone(),
208                    footer.clone(),
209                )
210            })
211            .collect();
212
213        let mut stream_map = HashMap::new();
214        let mut stream_offset = info.offset();
215        for stream in &footer.streams {
216            let length = stream.length();
217            let column_id = stream.column();
218            if projected_data_type.contains_column_index(column_id as usize) {
219                let kind = stream.kind();
220                let data = reader
221                    .get_bytes(stream_offset, length)
222                    .await
223                    .context(IoSnafu)?;
224                stream_map.insert((column_id, kind), data);
225            }
226
227            stream_offset += length;
228        }
229
230        let tz = footer
231            .writer_timezone
232            .as_ref()
233            // TODO: make this return error
234            .map(|a| a.parse::<chrono_tz::Tz>().unwrap());
235
236        Ok(Self {
237            columns,
238            stream_map: StreamMap {
239                inner: stream_map,
240                compression: file_metadata.compression(),
241            },
242            number_of_rows: info.number_of_rows() as usize,
243            tz,
244        })
245    }
246
247    pub fn number_of_rows(&self) -> usize {
248        self.number_of_rows
249    }
250
251    pub fn stream_map(&self) -> &StreamMap {
252        &self.stream_map
253    }
254
255    pub fn columns(&self) -> &[Column] {
256        &self.columns
257    }
258
259    pub fn writer_tz(&self) -> Option<chrono_tz::Tz> {
260        self.tz
261    }
262}
263
/// Lookup table from a column's stream to the bytes read for it, together
/// with the compression setting handed to the [`Decompressor`] on access.
#[derive(Debug)]
pub struct StreamMap {
    /// Stream bytes keyed by `(column id, stream kind)`.
    inner: HashMap<(u32, Kind), Bytes>,
    /// Compression passed to every `Decompressor` created from this map;
    /// `None` is forwarded unchanged.
    compression: Option<Compression>,
}
270
271impl StreamMap {
272    pub fn get(&self, column: &Column, kind: Kind) -> Decompressor {
273        // There is edge case where if column has no data then the stream might be omitted entirely
274        // (e.g. if there is only 1 null element, then it'll have present stream, but no data stream)
275        // See the integration::meta_data test for an example of this
276        // TODO: some better way to handle this?
277        self.get_opt(column, kind)
278            .unwrap_or_else(Decompressor::empty)
279    }
280
281    pub fn get_opt(&self, column: &Column, kind: Kind) -> Option<Decompressor> {
282        let column_id = column.column_id();
283
284        self.inner
285            .get(&(column_id, kind))
286            .cloned()
287            .map(|data| Decompressor::new(data, self.compression, vec![]))
288    }
289}
290
291fn deserialize_stripe_footer(
292    bytes: Bytes,
293    compression: Option<Compression>,
294) -> Result<StripeFooter> {
295    let mut buffer = vec![];
296    Decompressor::new(bytes, compression, vec![])
297        .read_to_end(&mut buffer)
298        .context(error::IoSnafu)?;
299    StripeFooter::decode(buffer.as_slice()).context(error::DecodeProtoSnafu)
300}