iceberg_rust/file_format/parquet.rs

use std::{
    collections::{hash_map::Entry, HashMap},
    ops::Sub,
    sync::Arc,
};

use iceberg_rust_spec::{
    partition::BoundPartitionField,
    spec::{
        manifest::{AvroMap, Content, DataFile, FileFormat},
        partition::PartitionField,
        schema::Schema,
        types::Type,
        values::{Struct, Value},
    },
};
use parquet::{
    file::{metadata::RowGroupMetaData, writer::TrackedWrite},
    format::FileMetaData,
    schema::types::{from_thrift, SchemaDescriptor},
};
use thrift::protocol::{TCompactOutputProtocol, TSerializable};
use tracing::instrument;

use crate::error::Error;

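/// Converts the footer metadata of a written parquet file into an Iceberg
/// [`DataFile`] manifest entry.
///
/// Column sizes, value counts, null counts, distinct-count estimates and
/// lower/upper bounds are aggregated across all row groups. Partition values
/// are derived from the statistics of the partition source columns; an error
/// is returned if a partition column is not constant within the file after
/// applying its transform. If `equality_ids` is given, the file is recorded
/// as equality deletes instead of data.
///
/// A minimal usage sketch (marked `ignore` since the writer, location and
/// partition fields are hypothetical):
///
/// ```ignore
/// // `writer` is any parquet writer that returns the thrift `FileMetaData`
/// // on close, e.g. an `ArrowWriter`.
/// let file_metadata = writer.close()?;
/// let datafile = parquet_to_datafile(
///     "s3://bucket/table/data/part-0.parquet",
///     file_size,
///     &file_metadata,
///     &schema,
///     &partition_fields,
///     None, // a data file, not equality deletes
/// )?;
/// ```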
#[instrument(
    name = "iceberg_rust::file_format::parquet::parquet_to_datafile",
    level = "debug",
    skip(file_metadata, schema, partition_fields),
    fields(
        location = location,
        file_size = file_size,
        partition_field_count = partition_fields.len(),
        has_equality_ids = equality_ids.is_some()
    )
)]
pub fn parquet_to_datafile(
    location: &str,
    file_size: u64,
    file_metadata: &FileMetaData,
    schema: &Schema,
    partition_fields: &[BoundPartitionField<'_>],
    equality_ids: Option<&[i32]>,
) -> Result<DataFile, Error> {
    // Start every partition field out as null; concrete values are derived
    // from the column statistics below.
    let mut partition = partition_fields
        .iter()
        .map(|field| Ok((field.name().to_owned(), None)))
        .collect::<Result<Struct, Error>>()?;
    // Index the partition fields by the name of their source column.
    let partition_fields = partition_fields
        .iter()
        .map(|field| {
            Ok((
                field.source_name().to_owned(),
                field.partition_field().clone(),
            ))
        })
        .collect::<Result<HashMap<String, PartitionField>, Error>>()?;
    let parquet_schema = Arc::new(SchemaDescriptor::new(from_thrift(&file_metadata.schema)?));

    let mut column_sizes = AvroMap(HashMap::new());
    let mut value_counts = AvroMap(HashMap::new());
    let mut null_value_counts = AvroMap(HashMap::new());
    let mut distinct_counts = AvroMap(HashMap::new());
    let mut lower_bounds: HashMap<i32, Value> = HashMap::new();
    let mut upper_bounds: HashMap<i32, Value> = HashMap::new();

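    // Aggregate the statistics column by column across all row groups.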
    for row_group in &file_metadata.row_groups {
        let row_group = RowGroupMetaData::from_thrift(parquet_schema.clone(), row_group.clone())?;

        for column in row_group.columns() {
            let column_name = column.column_descr().name();
            // Resolve the Iceberg field id from the full column path.
            let id = schema
                .get_name(&column.column_path().parts().join("."))
                .ok_or_else(|| Error::Schema(column_name.to_string(), "".to_string()))?
                .id;
            column_sizes
                .entry(id)
                .and_modify(|x| *x += column.compressed_size())
                .or_insert(column.compressed_size());
            value_counts
                .entry(id)
                .and_modify(|x| *x += row_group.num_rows())
                .or_insert(row_group.num_rows());

            if let Some(statistics) = column.statistics() {
                if let Some(null_count) = statistics.null_count_opt() {
                    null_value_counts
                        .entry(id)
                        .and_modify(|x| *x += null_count as i64)
                        .or_insert(null_count as i64);
                }

                // The Iceberg type of the field, needed to decode the raw
                // statistics bytes into values.
                let data_type = &schema
                    .fields()
                    .get(id as usize)
                    .ok_or_else(|| Error::Schema(column_name.to_string(), "".to_string()))?
                    .field_type;

                // Fold this row group's distinct count into the running
                // estimate, discounting values that likely overlap the range
                // seen in earlier row groups.
                if let (Some(distinct_count), Some(min_bytes), Some(max_bytes)) = (
                    statistics.distinct_count_opt(),
                    statistics.min_bytes_opt(),
                    statistics.max_bytes_opt(),
                ) {
                    let min = Value::try_from_bytes(min_bytes, data_type)?;
                    let max = Value::try_from_bytes(max_bytes, data_type)?;
                    let current_min = lower_bounds.get(&id);
                    let current_max = upper_bounds.get(&id);
                    match (min, max, current_min, current_max) {
                        (
                            Value::Int(min),
                            Value::Int(max),
                            Some(Value::Int(current_min)),
                            Some(Value::Int(current_max)),
                        ) => {
                            distinct_counts
                                .entry(id)
                                .and_modify(|x| {
                                    *x += estimate_distinct_count(
                                        &[current_min, current_max],
                                        &[&min, &max],
                                        *x,
                                        distinct_count as i64,
                                    );
                                })
                                .or_insert(distinct_count as i64);
                        }
                        (
                            Value::LongInt(min),
                            Value::LongInt(max),
                            Some(Value::LongInt(current_min)),
                            Some(Value::LongInt(current_max)),
                        ) => {
                            distinct_counts
                                .entry(id)
                                .and_modify(|x| {
                                    *x += estimate_distinct_count(
                                        &[current_min, current_max],
                                        &[&min, &max],
                                        *x,
                                        distinct_count as i64,
                                    );
                                })
                                .or_insert(distinct_count as i64);
                        }
                        (_, _, None, None) => {
                            distinct_counts.entry(id).or_insert(distinct_count as i64);
                        }
                        _ => (),
                    }
                }

                // Track the smallest value seen so far as the lower bound.
                if let Some(min_bytes) = statistics.min_bytes_opt() {
                    if let Type::Primitive(_) = &data_type {
                        let new = Value::try_from_bytes(min_bytes, data_type)?;
                        match lower_bounds.entry(id) {
                            Entry::Occupied(mut entry) => {
                                let entry = entry.get_mut();
                                match (&entry, &new) {
                                    (Value::Int(current), Value::Int(new_val)) => {
                                        if *current > *new_val {
                                            *entry = new
                                        }
                                    }
                                    (Value::LongInt(current), Value::LongInt(new_val)) => {
                                        if *current > *new_val {
                                            *entry = new
                                        }
                                    }
                                    (Value::Float(current), Value::Float(new_val)) => {
                                        if *current > *new_val {
                                            *entry = new
                                        }
                                    }
                                    (Value::Double(current), Value::Double(new_val)) => {
                                        if *current > *new_val {
                                            *entry = new
                                        }
                                    }
                                    (Value::Date(current), Value::Date(new_val)) => {
                                        if *current > *new_val {
                                            *entry = new
                                        }
                                    }
                                    (Value::Time(current), Value::Time(new_val)) => {
                                        if *current > *new_val {
                                            *entry = new
                                        }
                                    }
                                    (Value::Timestamp(current), Value::Timestamp(new_val)) => {
                                        if *current > *new_val {
                                            *entry = new
                                        }
                                    }
                                    (Value::TimestampTZ(current), Value::TimestampTZ(new_val)) => {
                                        if *current > *new_val {
                                            *entry = new
                                        }
                                    }
                                    _ => (),
                                }
                            }
                            Entry::Vacant(entry) => {
                                entry.insert(new);
                            }
                        }
                    }
                }
                // Track the largest value seen so far as the upper bound.
                if let Some(max_bytes) = statistics.max_bytes_opt() {
                    if let Type::Primitive(_) = &data_type {
                        let new = Value::try_from_bytes(max_bytes, data_type)?;
                        match upper_bounds.entry(id) {
                            Entry::Occupied(mut entry) => {
                                let entry = entry.get_mut();
                                match (&entry, &new) {
                                    (Value::Int(current), Value::Int(new_val)) => {
                                        if *current < *new_val {
                                            *entry = new
                                        }
                                    }
                                    (Value::LongInt(current), Value::LongInt(new_val)) => {
                                        if *current < *new_val {
                                            *entry = new
                                        }
                                    }
                                    (Value::Float(current), Value::Float(new_val)) => {
                                        if *current < *new_val {
                                            *entry = new
                                        }
                                    }
                                    (Value::Double(current), Value::Double(new_val)) => {
                                        if *current < *new_val {
                                            *entry = new
                                        }
                                    }
                                    (Value::Date(current), Value::Date(new_val)) => {
                                        if *current < *new_val {
                                            *entry = new
                                        }
                                    }
                                    (Value::Time(current), Value::Time(new_val)) => {
                                        if *current < *new_val {
                                            *entry = new
                                        }
                                    }
                                    (Value::Timestamp(current), Value::Timestamp(new_val)) => {
                                        if *current < *new_val {
                                            *entry = new
                                        }
                                    }
                                    (Value::TimestampTZ(current), Value::TimestampTZ(new_val)) => {
                                        if *current < *new_val {
                                            *entry = new
                                        }
                                    }
                                    _ => (),
                                }
                            }
                            Entry::Vacant(entry) => {
                                entry.insert(new);
                            }
                        }
                    }
                }

                // Derive the partition value for transforms on this source
                // column: the transformed min and max must agree, otherwise
                // the file spans more than one partition.
                if let Some(partition_field) = partition_fields.get(column_name) {
                    if let Some(partition_value) = partition.get_mut(partition_field.name()) {
                        if partition_value.is_none() {
                            let partition_field = partition_fields
                                .get(column_name)
                                .ok_or_else(|| Error::InvalidFormat("transform".to_string()))?;
                            if let (Some(min_bytes), Some(max_bytes)) =
                                (statistics.min_bytes_opt(), statistics.max_bytes_opt())
                            {
                                let min = Value::try_from_bytes(min_bytes, data_type)?
                                    .transform(partition_field.transform())?;
                                let max = Value::try_from_bytes(max_bytes, data_type)?
                                    .transform(partition_field.transform())?;
                                if min == max {
                                    *partition_value = Some(min)
                                } else {
                                    return Err(Error::InvalidFormat(
                                        "Partition value of data file".to_owned(),
                                    ));
                                }
                            }
                        }
                    }
                }
            }
        }
    }
    // Assemble the manifest entry; a file written for equality ids is an
    // equality-delete file rather than a data file.
    let mut builder = DataFile::builder();
    builder
        .with_content(if equality_ids.is_none() {
            Content::Data
        } else {
            Content::EqualityDeletes
        })
        .with_file_path(location.to_string())
        .with_file_format(FileFormat::Parquet)
        .with_partition(partition)
        .with_record_count(file_metadata.num_rows)
        .with_file_size_in_bytes(file_size as i64)
        .with_column_sizes(Some(column_sizes))
        .with_value_counts(Some(value_counts))
        .with_null_value_counts(Some(null_value_counts))
        .with_nan_value_counts(None)
        .with_lower_bounds(Some(lower_bounds))
        .with_upper_bounds(Some(upper_bounds));

    if let Some(equality_ids) = equality_ids {
        builder.with_equality_ids(Some(equality_ids.to_vec()));
    }

    let content = builder.build()?;
    Ok(content)
}

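/// Returns the number of bytes `metadata` occupies when serialized with the
/// thrift compact protocol, determined by writing it through a counting
/// buffer.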
pub fn thrift_size<T: TSerializable>(metadata: &T) -> Result<usize, Error> {
    let mut buffer = TrackedWrite::new(Vec::<u8>::new());
    let mut protocol = TCompactOutputProtocol::new(&mut buffer);
    metadata.write_to_out_protocol(&mut protocol)?;
    Ok(buffer.bytes_written())
}

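/// Returns the length of the overlap between two inclusive ranges, each given
/// as `[&low, &high]`. For the signed integer types used here the result is
/// negative when the ranges are disjoint, which is how
/// [`estimate_distinct_count`] detects the no-overlap case.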
fn range_overlap<T: Ord + Sub + Copy>(
    old_range: &[&T; 2],
    new_range: &[&T; 2],
) -> <T as Sub>::Output {
    let overlap_start = (*old_range[0]).max(*new_range[0]);
    let overlap_end = (*old_range[1]).min(*new_range[1]);
    overlap_end - overlap_start
}

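/// Lossy conversion into `f64`, allowing [`estimate_distinct_count`] to do
/// its floating-point arithmetic generically over the integer statistics
/// types.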
pub trait ToF64 {
    fn to_f64(self) -> f64;
}

impl ToF64 for i32 {
    fn to_f64(self) -> f64 {
        self as f64
    }
}

impl ToF64 for i64 {
    fn to_f64(self) -> f64 {
        self as f64
    }
}

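/// Estimates how many additional distinct values a new row group contributes
/// to a column's running distinct count, given the old and new value ranges.
///
/// Values in the part of the new range that does not overlap the old range
/// are counted in full. Inside the overlap, old values are assumed to be
/// uniformly distributed, and a new value is only counted with the
/// probability that it does not coincide with an old one, approximated as
/// `((L - 1) / L)^k` for an overlap of length `L` expected to contain `k`
/// old values.
///
/// A minimal sketch of the disjoint-range case (marked `ignore` since it is
/// illustrative only):
///
/// ```ignore
/// // Old values span 0..=9, new values span 20..=29: the ranges do not
/// // overlap, so all 10 new distinct values are added to the estimate.
/// assert_eq!(estimate_distinct_count(&[&0, &9], &[&20, &29], 10, 10), 10);
/// ```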
pub fn estimate_distinct_count<T>(
    old_range: &[&T; 2],
    new_range: &[&T; 2],
    old_distinct_count: i64,
    new_distinct_count: i64,
) -> i64
where
    T: Ord + Sub<Output = T> + Copy + Default + ToF64,
{
    let new_range_size = (*new_range[1] - *new_range[0]).to_f64();
    let current_range_size = (*old_range[1] - *old_range[0]).to_f64();
    let overlap = range_overlap(old_range, new_range);
    let overlap_size: f64 = if overlap >= T::default() {
        overlap.to_f64()
    } else {
        0.0
    };
    let n2 = new_distinct_count as f64;
    let n1 = old_distinct_count as f64;

    // New values outside the overlap cannot have been seen before, so they
    // count in full.
    let outside_overlap = ((new_range_size - overlap_size) / new_range_size * n2).max(0.0);

    // Expected numbers of new and old distinct values that fall inside the
    // overlap, assuming uniform distributions over their ranges.
    let n2_overlap = (overlap_size / new_range_size * n2).max(0.0);
    let expected_n1_in_overlap = (overlap_size / current_range_size * n1).max(0.0);

    // A new value in the overlap only counts if none of the old values
    // occupies the same point.
    let new_in_overlap = if overlap_size > 0.0 {
        let prob_not_covered = ((overlap_size - 1.0) / overlap_size).powf(expected_n1_in_overlap);
        n2_overlap * prob_not_covered
    } else {
        0.0
    };

    (outside_overlap + new_in_overlap).round() as i64
}