iceberg_rust/file_format/
parquet.rs

/*!
 * Helpers for working with Parquet files
*/
4
5use std::{
6    collections::{hash_map::Entry, HashMap},
7    ops::Sub,
8    sync::Arc,
9};
10
11use iceberg_rust_spec::{
12    partition::BoundPartitionField,
13    spec::{
14        manifest::{AvroMap, Content, DataFile, FileFormat},
15        partition::PartitionField,
16        schema::Schema,
17        types::Type,
18        values::{Struct, Value},
19    },
20};
21use parquet::{
22    file::{metadata::RowGroupMetaData, writer::TrackedWrite},
23    format::FileMetaData,
24    schema::types::{from_thrift, SchemaDescriptor},
25};
26use thrift::protocol::{TCompactOutputProtocol, TSerializable};
27use tracing::instrument;
28
29use crate::error::Error;
30
31/// Read datafile statistics from parquetfile
32#[instrument(name = "iceberg_rust::file_format::parquet::parquet_to_datafile", level = "debug", skip(file_metadata, schema, partition_fields), fields(
33    location = location,
34    file_size = file_size,
35    partition_field_count = partition_fields.len(),
36    has_equality_ids = equality_ids.is_some()
37))]
38pub fn parquet_to_datafile(
39    location: &str,
40    file_size: u64,
41    file_metadata: &FileMetaData,
42    schema: &Schema,
43    partition_fields: &[BoundPartitionField<'_>],
44    equality_ids: Option<&[i32]>,
45) -> Result<DataFile, Error> {
46    let mut partition = partition_fields
47        .iter()
48        .map(|field| Ok((field.name().to_owned(), None)))
49        .collect::<Result<Struct, Error>>()?;
50    let partition_fields = partition_fields
51        .iter()
52        .map(|field| {
53            Ok((
54                field.source_name().to_owned(),
55                field.partition_field().clone(),
56            ))
57        })
58        .collect::<Result<HashMap<String, PartitionField>, Error>>()?;
59    let parquet_schema = Arc::new(SchemaDescriptor::new(from_thrift(&file_metadata.schema)?));
60
61    let mut column_sizes = AvroMap(HashMap::new());
62    let mut value_counts = AvroMap(HashMap::new());
63    let mut null_value_counts = AvroMap(HashMap::new());
64    let mut distinct_counts = AvroMap(HashMap::new());
65    let mut lower_bounds: HashMap<i32, Value> = HashMap::new();
66    let mut upper_bounds: HashMap<i32, Value> = HashMap::new();
67
68    for row_group in &file_metadata.row_groups {
69        let row_group = RowGroupMetaData::from_thrift(parquet_schema.clone(), row_group.clone())?;
70
71        for column in row_group.columns() {
72            let column_name = column.column_descr().name();
73            let id = schema
74                .get_name(&column.column_path().parts().join("."))
75                .ok_or_else(|| Error::Schema(column_name.to_string(), "".to_string()))?
76                .id;
77            column_sizes
78                .entry(id)
79                .and_modify(|x| *x += column.compressed_size())
80                .or_insert(column.compressed_size());
81            value_counts
82                .entry(id)
83                .and_modify(|x| *x += row_group.num_rows())
84                .or_insert(row_group.num_rows());
85
86            if let Some(statistics) = column.statistics() {
87                if let Some(null_count) = statistics.null_count_opt() {
88                    null_value_counts
89                        .entry(id)
90                        .and_modify(|x| *x += null_count as i64)
91                        .or_insert(null_count as i64);
92                }
93
94                let data_type = &schema
95                    .fields()
96                    .get(id as usize)
97                    .ok_or_else(|| Error::Schema(column_name.to_string(), "".to_string()))?
98                    .field_type;
99
100                if let (Some(distinct_count), Some(min_bytes), Some(max_bytes)) = (
101                    statistics.distinct_count_opt(),
102                    statistics.min_bytes_opt(),
103                    statistics.max_bytes_opt(),
104                ) {
105                    let min = Value::try_from_bytes(min_bytes, data_type)?;
106                    let max = Value::try_from_bytes(max_bytes, data_type)?;
107                    let current_min = lower_bounds.get(&id);
108                    let current_max = upper_bounds.get(&id);
109                    match (min, max, current_min, current_max) {
110                        (
111                            Value::Int(min),
112                            Value::Int(max),
113                            Some(Value::Int(current_min)),
114                            Some(Value::Int(current_max)),
115                        ) => {
116                            distinct_counts
117                                .entry(id)
118                                .and_modify(|x| {
119                                    *x += estimate_distinct_count(
120                                        &[current_min, current_max],
121                                        &[&min, &max],
122                                        *x,
123                                        distinct_count as i64,
124                                    );
125                                })
126                                .or_insert(distinct_count as i64);
127                        }
128                        (
129                            Value::LongInt(min),
130                            Value::LongInt(max),
131                            Some(Value::LongInt(current_min)),
132                            Some(Value::LongInt(current_max)),
133                        ) => {
134                            distinct_counts
135                                .entry(id)
136                                .and_modify(|x| {
137                                    *x += estimate_distinct_count(
138                                        &[current_min, current_max],
139                                        &[&min, &max],
140                                        *x,
141                                        distinct_count as i64,
142                                    );
143                                })
144                                .or_insert(distinct_count as i64);
145                        }
146                        (_, _, None, None) => {
147                            distinct_counts.entry(id).or_insert(distinct_count as i64);
148                        }
149                        _ => (),
150                    }
151                }
152
153                if let Some(min_bytes) = statistics.min_bytes_opt() {
154                    if let Type::Primitive(_) = &data_type {
155                        let new = Value::try_from_bytes(min_bytes, data_type)?;
156                        match lower_bounds.entry(id) {
157                            Entry::Occupied(mut entry) => {
158                                let entry = entry.get_mut();
159                                match (&entry, &new) {
160                                    (Value::Int(current), Value::Int(new_val)) => {
161                                        if *current > *new_val {
162                                            *entry = new
163                                        }
164                                    }
165                                    (Value::LongInt(current), Value::LongInt(new_val)) => {
166                                        if *current > *new_val {
167                                            *entry = new
168                                        }
169                                    }
170                                    (Value::Float(current), Value::Float(new_val)) => {
171                                        if *current > *new_val {
172                                            *entry = new
173                                        }
174                                    }
175                                    (Value::Double(current), Value::Double(new_val)) => {
176                                        if *current > *new_val {
177                                            *entry = new
178                                        }
179                                    }
180                                    (Value::Date(current), Value::Date(new_val)) => {
181                                        if *current > *new_val {
182                                            *entry = new
183                                        }
184                                    }
185                                    (Value::Time(current), Value::Time(new_val)) => {
186                                        if *current > *new_val {
187                                            *entry = new
188                                        }
189                                    }
190                                    (Value::Timestamp(current), Value::Timestamp(new_val)) => {
191                                        if *current > *new_val {
192                                            *entry = new
193                                        }
194                                    }
195                                    (Value::TimestampTZ(current), Value::TimestampTZ(new_val)) => {
196                                        if *current > *new_val {
197                                            *entry = new
198                                        }
199                                    }
200                                    _ => (),
201                                }
202                            }
203                            Entry::Vacant(entry) => {
204                                entry.insert(new);
205                            }
206                        }
207                    }
208                }
209                if let Some(max_bytes) = statistics.max_bytes_opt() {
210                    if let Type::Primitive(_) = &data_type {
211                        let new = Value::try_from_bytes(max_bytes, data_type)?;
212                        match upper_bounds.entry(id) {
213                            Entry::Occupied(mut entry) => {
214                                let entry = entry.get_mut();
215                                match (&entry, &new) {
216                                    (Value::Int(current), Value::Int(new_val)) => {
217                                        if *current < *new_val {
218                                            *entry = new
219                                        }
220                                    }
221                                    (Value::LongInt(current), Value::LongInt(new_val)) => {
222                                        if *current < *new_val {
223                                            *entry = new
224                                        }
225                                    }
226                                    (Value::Float(current), Value::Float(new_val)) => {
227                                        if *current < *new_val {
228                                            *entry = new
229                                        }
230                                    }
231                                    (Value::Double(current), Value::Double(new_val)) => {
232                                        if *current < *new_val {
233                                            *entry = new
234                                        }
235                                    }
236                                    (Value::Date(current), Value::Date(new_val)) => {
237                                        if *current < *new_val {
238                                            *entry = new
239                                        }
240                                    }
241                                    (Value::Time(current), Value::Time(new_val)) => {
242                                        if *current < *new_val {
243                                            *entry = new
244                                        }
245                                    }
246                                    (Value::Timestamp(current), Value::Timestamp(new_val)) => {
247                                        if *current < *new_val {
248                                            *entry = new
249                                        }
250                                    }
251                                    (Value::TimestampTZ(current), Value::TimestampTZ(new_val)) => {
252                                        if *current < *new_val {
253                                            *entry = new
254                                        }
255                                    }
256                                    _ => (),
257                                }
258                            }
259                            Entry::Vacant(entry) => {
260                                entry.insert(new);
261                            }
262                        }
263                    }
264                }
265
266                if let Some(partition_field) = partition_fields.get(column_name) {
267                    if let Some(partition_value) = partition.get_mut(partition_field.name()) {
268                        if partition_value.is_none() {
269                            let partition_field = partition_fields
270                                .get(column_name)
271                                .ok_or_else(|| Error::InvalidFormat("transform".to_string()))?;
272                            if let (Some(min_bytes), Some(max_bytes)) =
273                                (statistics.min_bytes_opt(), statistics.max_bytes_opt())
274                            {
275                                let min = Value::try_from_bytes(min_bytes, data_type)?
276                                    .transform(partition_field.transform())?;
277                                let max = Value::try_from_bytes(max_bytes, data_type)?
278                                    .transform(partition_field.transform())?;
279                                if min == max {
280                                    *partition_value = Some(min)
281                                } else {
282                                    return Err(Error::InvalidFormat(
283                                        "Partition value of data file".to_owned(),
284                                    ));
285                                }
286                            }
287                        }
288                    }
289                }
290            }
291        }
292    }
293    let mut builder = DataFile::builder();
294    builder
295        .with_content(if equality_ids.is_none() {
296            Content::Data
297        } else {
298            Content::EqualityDeletes
299        })
300        .with_file_path(location.to_string())
301        .with_file_format(FileFormat::Parquet)
302        .with_partition(partition)
303        .with_record_count(file_metadata.num_rows)
304        .with_file_size_in_bytes(file_size as i64)
305        .with_column_sizes(Some(column_sizes))
306        .with_value_counts(Some(value_counts))
307        .with_null_value_counts(Some(null_value_counts))
308        .with_nan_value_counts(None)
309        .with_lower_bounds(Some(lower_bounds))
310        .with_upper_bounds(Some(upper_bounds));
311
312    if let Some(equality_ids) = equality_ids {
313        builder.with_equality_ids(Some(equality_ids.to_vec()));
314    }
315
316    let content = builder.build()?;
317    Ok(content)
318}
319
320/// Get parquet metadata size
321pub fn thrift_size<T: TSerializable>(metadata: &T) -> Result<usize, Error> {
322    let mut buffer = TrackedWrite::new(Vec::<u8>::new());
323    let mut protocol = TCompactOutputProtocol::new(&mut buffer);
324    metadata.write_to_out_protocol(&mut protocol)?;
325    Ok(buffer.bytes_written())
326}
327
/// Width of the intersection of two inclusive `[min, max]` ranges.
///
/// Returns `None` when the ranges do not intersect. The width is computed in
/// `f64` so that ranges spanning most of an integer type cannot overflow.
fn range_overlap<T: Ord + Copy + ToF64>(old_range: &[&T; 2], new_range: &[&T; 2]) -> Option<f64> {
    let overlap_start = (*old_range[0]).max(*new_range[0]);
    let overlap_end = (*old_range[1]).min(*new_range[1]);
    if overlap_start <= overlap_end {
        Some(overlap_end.to_f64() - overlap_start.to_f64())
    } else {
        None
    }
}

/// Helper trait to convert numeric types to f64 for statistical calculations.
///
/// This trait provides a uniform interface for converting integer types to f64,
/// which is necessary for the statistical estimation algorithms. The conversion
/// may be lossy for very large i64 values (beyond 2^53), but this is acceptable
/// for statistical approximations.
pub trait ToF64 {
    /// Converts the value to f64.
    ///
    /// # Note
    ///
    /// For i64 values larger than 2^53, precision may be lost in the conversion.
    /// This is acceptable for statistical calculations where exact precision is
    /// not required.
    fn to_f64(self) -> f64;
}

impl ToF64 for i32 {
    fn to_f64(self) -> f64 {
        self as f64
    }
}

impl ToF64 for i64 {
    fn to_f64(self) -> f64 {
        self as f64
    }
}

/// Estimates the number of new distinct values when merging two sets of statistics.
///
/// This function assumes uniform distribution of distinct values within their respective ranges
/// and uses an independence approximation to estimate overlap probability.
///
/// # Algorithm
///
/// The estimation is split into two parts:
/// 1. **Non-overlapping region**: All values in the new range that fall outside the old range
///    are guaranteed to be new.
/// 2. **Overlapping region**: Uses the independence approximation:
///    - P(specific value not covered) = ((R-1)/R)^k
///    - where R is the overlap size and k is the expected number of old values in the overlap
///    - Expected new values = n2_overlap × P(not covered)
///
/// All range arithmetic is carried out in `f64`, so ranges that span most of the
/// integer type (e.g. `i32::MIN..=i32::MAX`) do not overflow.
///
/// A zero-width new range (`min == max`) is handled explicitly: if it lies outside
/// the old range all of its values count as new, otherwise none do.
///
/// # Parameters
///
/// * `old_range` - [min, max] of the existing value range
/// * `new_range` - [min, max] of the new value range
/// * `old_distinct_count` - Number of distinct values in the old range
/// * `new_distinct_count` - Number of distinct values in the new range
///
/// # Returns
///
/// Estimated number of new distinct values to add to the running total
///
/// # Example
///
/// ```ignore
/// // Old range [0, 1000] with 100 distinct values
/// // New range [500, 1500] with 50 distinct values
/// let new_count = estimate_distinct_count(&[&0, &1000], &[&500, &1500], 100, 50);
/// ```
pub fn estimate_distinct_count<T>(
    old_range: &[&T; 2],
    new_range: &[&T; 2],
    old_distinct_count: i64,
    new_distinct_count: i64,
) -> i64
where
    T: Ord + Sub<Output = T> + Copy + Default + ToF64,
{
    // Subtract after converting to f64: `max - min` in `T` overflows for wide ranges.
    let new_range_size = (*new_range[1]).to_f64() - (*new_range[0]).to_f64();
    let current_range_size = (*old_range[1]).to_f64() - (*old_range[0]).to_f64();
    let overlap = range_overlap(old_range, new_range);
    let overlap_size = overlap.unwrap_or(0.0);
    let n2 = new_distinct_count as f64;
    let n1 = old_distinct_count as f64;

    // Split the new distinct values into those outside the overlap (definitely new)
    // and those inside it (possibly already seen).
    let (outside_overlap, n2_overlap) = if new_range_size > 0.0 {
        (
            ((new_range_size - overlap_size) / new_range_size * n2).max(0.0),
            (overlap_size / new_range_size * n2).max(0.0),
        )
    } else if overlap.is_some() {
        // Zero-width new range that lies inside the old range.
        (0.0, n2)
    } else {
        // Zero-width new range entirely outside the old range: everything is new.
        // (Dividing by the zero width here would produce NaN and lose these values.)
        (n2, 0.0)
    };

    // For overlap region: estimate how many new values exist
    // using independence approximation: P(value not covered) = ((R-1)/R)^k
    // Expected new values in overlap = n2_overlap * ((R-1)/R)^(n1_overlap)
    let expected_n1_in_overlap = if current_range_size > 0.0 {
        (overlap_size / current_range_size * n1).max(0.0)
    } else {
        0.0
    };

    let new_in_overlap = if overlap_size > 0.0 {
        let prob_not_covered = ((overlap_size - 1.0) / overlap_size).powf(expected_n1_in_overlap);
        n2_overlap * prob_not_covered
    } else {
        0.0
    };

    (outside_overlap + new_in_overlap).round() as i64
}