iceberg_rust/file_format/
parquet.rs

1/*!
2 * Helpers for parquet files
3*/
4
5use std::{
6    collections::{hash_map::Entry, HashMap},
7    sync::Arc,
8};
9
10use iceberg_rust_spec::{
11    partition::BoundPartitionField,
12    spec::{
13        manifest::{AvroMap, Content, DataFile, FileFormat},
14        partition::PartitionField,
15        schema::Schema,
16        types::Type,
17        values::{Struct, Value},
18    },
19};
20use parquet::{
21    file::{metadata::RowGroupMetaData, writer::TrackedWrite},
22    format::FileMetaData,
23    schema::types::{from_thrift, SchemaDescriptor},
24};
25use thrift::protocol::{TCompactOutputProtocol, TSerializable};
26
27use crate::error::Error;
28
29/// Read datafile statistics from parquetfile
30pub fn parquet_to_datafile(
31    location: &str,
32    file_size: u64,
33    file_metadata: &FileMetaData,
34    schema: &Schema,
35    partition_fields: &[BoundPartitionField<'_>],
36    equality_ids: Option<&[i32]>,
37) -> Result<DataFile, Error> {
38    let mut partition = partition_fields
39        .iter()
40        .map(|field| Ok((field.name().to_owned(), None)))
41        .collect::<Result<Struct, Error>>()?;
42    let partition_fields = partition_fields
43        .iter()
44        .map(|field| {
45            Ok((
46                field.source_name().to_owned(),
47                field.partition_field().clone(),
48            ))
49        })
50        .collect::<Result<HashMap<String, PartitionField>, Error>>()?;
51    let parquet_schema = Arc::new(SchemaDescriptor::new(from_thrift(&file_metadata.schema)?));
52
53    let mut column_sizes = AvroMap(HashMap::new());
54    let mut value_counts = AvroMap(HashMap::new());
55    let mut null_value_counts = AvroMap(HashMap::new());
56    let mut distinct_counts = AvroMap(HashMap::new());
57    let mut lower_bounds: HashMap<i32, Value> = HashMap::new();
58    let mut upper_bounds: HashMap<i32, Value> = HashMap::new();
59
60    for row_group in &file_metadata.row_groups {
61        let row_group = RowGroupMetaData::from_thrift(parquet_schema.clone(), row_group.clone())?;
62
63        for column in row_group.columns() {
64            let column_name = column.column_descr().name();
65            let id = schema
66                .get_name(column_name)
67                .ok_or_else(|| Error::Schema(column_name.to_string(), "".to_string()))?
68                .id;
69            column_sizes
70                .entry(id)
71                .and_modify(|x| *x += column.compressed_size())
72                .or_insert(column.compressed_size());
73            value_counts
74                .entry(id)
75                .and_modify(|x| *x += row_group.num_rows())
76                .or_insert(row_group.num_rows());
77
78            if let Some(statistics) = column.statistics() {
79                if let Some(null_count) = statistics.null_count_opt() {
80                    null_value_counts
81                        .entry(id)
82                        .and_modify(|x| *x += null_count as i64)
83                        .or_insert(null_count as i64);
84                }
85                if let Some(distinct_count) = statistics.distinct_count_opt() {
86                    distinct_counts
87                        .entry(id)
88                        .and_modify(|x| *x += distinct_count as i64)
89                        .or_insert(distinct_count as i64);
90                }
91                let data_type = &schema
92                    .fields()
93                    .get(id as usize)
94                    .ok_or_else(|| Error::Schema(column_name.to_string(), "".to_string()))?
95                    .field_type;
96
97                if let Some(min_bytes) = statistics.min_bytes_opt() {
98                    if let Type::Primitive(_) = &data_type {
99                        let new = Value::try_from_bytes(min_bytes, data_type)?;
100                        match lower_bounds.entry(id) {
101                            Entry::Occupied(mut entry) => {
102                                let entry = entry.get_mut();
103                                match (&entry, &new) {
104                                    (Value::Int(current), Value::Int(new_val)) => {
105                                        if *current > *new_val {
106                                            *entry = new
107                                        }
108                                    }
109                                    (Value::LongInt(current), Value::LongInt(new_val)) => {
110                                        if *current > *new_val {
111                                            *entry = new
112                                        }
113                                    }
114                                    (Value::Float(current), Value::Float(new_val)) => {
115                                        if *current > *new_val {
116                                            *entry = new
117                                        }
118                                    }
119                                    (Value::Double(current), Value::Double(new_val)) => {
120                                        if *current > *new_val {
121                                            *entry = new
122                                        }
123                                    }
124                                    (Value::Date(current), Value::Date(new_val)) => {
125                                        if *current > *new_val {
126                                            *entry = new
127                                        }
128                                    }
129                                    (Value::Time(current), Value::Time(new_val)) => {
130                                        if *current > *new_val {
131                                            *entry = new
132                                        }
133                                    }
134                                    (Value::Timestamp(current), Value::Timestamp(new_val)) => {
135                                        if *current > *new_val {
136                                            *entry = new
137                                        }
138                                    }
139                                    (Value::TimestampTZ(current), Value::TimestampTZ(new_val)) => {
140                                        if *current > *new_val {
141                                            *entry = new
142                                        }
143                                    }
144                                    _ => (),
145                                }
146                            }
147                            Entry::Vacant(entry) => {
148                                entry.insert(new);
149                            }
150                        }
151                    }
152                    if let Some(max_bytes) = statistics.max_bytes_opt() {
153                        let new = Value::try_from_bytes(max_bytes, data_type)?;
154                        match upper_bounds.entry(id) {
155                            Entry::Occupied(mut entry) => {
156                                let entry = entry.get_mut();
157                                match (&entry, &new) {
158                                    (Value::Int(current), Value::Int(new_val)) => {
159                                        if *current < *new_val {
160                                            *entry = new
161                                        }
162                                    }
163                                    (Value::LongInt(current), Value::LongInt(new_val)) => {
164                                        if *current < *new_val {
165                                            *entry = new
166                                        }
167                                    }
168                                    (Value::Float(current), Value::Float(new_val)) => {
169                                        if *current < *new_val {
170                                            *entry = new
171                                        }
172                                    }
173                                    (Value::Double(current), Value::Double(new_val)) => {
174                                        if *current < *new_val {
175                                            *entry = new
176                                        }
177                                    }
178                                    (Value::Date(current), Value::Date(new_val)) => {
179                                        if *current < *new_val {
180                                            *entry = new
181                                        }
182                                    }
183                                    (Value::Time(current), Value::Time(new_val)) => {
184                                        if *current < *new_val {
185                                            *entry = new
186                                        }
187                                    }
188                                    (Value::Timestamp(current), Value::Timestamp(new_val)) => {
189                                        if *current < *new_val {
190                                            *entry = new
191                                        }
192                                    }
193                                    (Value::TimestampTZ(current), Value::TimestampTZ(new_val)) => {
194                                        if *current < *new_val {
195                                            *entry = new
196                                        }
197                                    }
198                                    _ => (),
199                                }
200                            }
201                            Entry::Vacant(entry) => {
202                                entry.insert(new);
203                            }
204                        }
205
206                        if let Some(partition_field) = partition_fields.get(column_name) {
207                            if let Some(partition_value) = partition.get_mut(partition_field.name())
208                            {
209                                if partition_value.is_none() {
210                                    let partition_field =
211                                        partition_fields.get(column_name).ok_or_else(|| {
212                                            Error::InvalidFormat("transform".to_string())
213                                        })?;
214                                    if let (Some(min_bytes), Some(max_bytes)) =
215                                        (statistics.min_bytes_opt(), statistics.max_bytes_opt())
216                                    {
217                                        let min = Value::try_from_bytes(min_bytes, data_type)?
218                                            .transform(partition_field.transform())?;
219                                        let max = Value::try_from_bytes(max_bytes, data_type)?
220                                            .transform(partition_field.transform())?;
221                                        if min == max {
222                                            *partition_value = Some(min)
223                                        } else {
224                                            return Err(Error::InvalidFormat(
225                                                "Partition value of data file".to_owned(),
226                                            ));
227                                        }
228                                    }
229                                }
230                            }
231                        }
232                    }
233                }
234            }
235        }
236    }
237    let mut builder = DataFile::builder();
238    builder
239        .with_content(if equality_ids.is_none() {
240            Content::Data
241        } else {
242            Content::EqualityDeletes
243        })
244        .with_file_path(location.to_string())
245        .with_file_format(FileFormat::Parquet)
246        .with_partition(partition)
247        .with_record_count(file_metadata.num_rows)
248        .with_file_size_in_bytes(file_size as i64)
249        .with_column_sizes(Some(column_sizes))
250        .with_value_counts(Some(value_counts))
251        .with_null_value_counts(Some(null_value_counts))
252        .with_nan_value_counts(None)
253        .with_lower_bounds(Some(lower_bounds))
254        .with_upper_bounds(Some(upper_bounds));
255
256    if let Some(equality_ids) = equality_ids {
257        builder.with_equality_ids(Some(equality_ids.to_vec()));
258    }
259
260    let content = builder
261        .build()
262        .map_err(iceberg_rust_spec::error::Error::from)?;
263    Ok(content)
264}
265
266/// Get parquet metadata size
267pub fn thrift_size<T: TSerializable>(metadata: &T) -> Result<usize, Error> {
268    let mut buffer = TrackedWrite::new(Vec::<u8>::new());
269    let mut protocol = TCompactOutputProtocol::new(&mut buffer);
270    metadata.write_to_out_protocol(&mut protocol)?;
271    Ok(buffer.bytes_written())
272}