iceberg_rust/file_format/parquet.rs

use std::{
    collections::{hash_map::Entry, HashMap},
    sync::Arc,
};

use iceberg_rust_spec::{
    partition::BoundPartitionField,
    spec::{
        manifest::{AvroMap, Content, DataFile, FileFormat},
        partition::PartitionField,
        schema::Schema,
        types::Type,
        values::{Struct, Value},
    },
};
use parquet::{
    file::{metadata::RowGroupMetaData, writer::TrackedWrite},
    format::FileMetaData,
    schema::types::{from_thrift, SchemaDescriptor},
};
use thrift::protocol::{TCompactOutputProtocol, TSerializable};

use crate::error::Error;

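/// Converts the footer metadata of a written Parquet file into an Iceberg
/// [`DataFile`] manifest entry.
///
/// Walks every row group and column chunk to accumulate per-column sizes,
/// value/null/distinct counts, and lower/upper bounds, and derives the
/// partition values from the column statistics. Passing `equality_ids`
/// marks the file as an equality-delete file instead of a data file.
///
/// A minimal usage sketch (hedged: `file_size`, `file_metadata`,
/// `table_schema`, and `bound_partition_fields` are placeholder variables
/// the caller is assumed to have, not part of this crate's API):
///
/// ```ignore
/// let data_file = parquet_to_datafile(
///     "s3://bucket/table/data/part-00000.parquet",
///     file_size,                // size of the written file in bytes
///     &file_metadata,           // thrift footer returned by the parquet writer
///     &table_schema,            // current Iceberg schema of the table
///     &bound_partition_fields,  // partition fields bound to the schema
///     None,                     // Some(ids) would mark an equality-delete file
/// )?;
/// ```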
pub fn parquet_to_datafile(
    location: &str,
    file_size: u64,
    file_metadata: &FileMetaData,
    schema: &Schema,
    partition_fields: &[BoundPartitionField<'_>],
    equality_ids: Option<&[i32]>,
) -> Result<DataFile, Error> {
    // Start with every partition field set to `None`; the actual values are
    // derived from the column statistics below.
    let mut partition = partition_fields
        .iter()
        .map(|field| (field.name().to_owned(), None))
        .collect::<Struct>();
    // Index the partition fields by their source column name for quick lookup
    // while walking the parquet columns.
    let partition_fields = partition_fields
        .iter()
        .map(|field| {
            (
                field.source_name().to_owned(),
                field.partition_field().clone(),
            )
        })
        .collect::<HashMap<String, PartitionField>>();
    let parquet_schema = Arc::new(SchemaDescriptor::new(from_thrift(&file_metadata.schema)?));

    // Per-column statistics accumulated over all row groups, keyed by the
    // Iceberg field id of the column.
    let mut column_sizes = AvroMap(HashMap::new());
    let mut value_counts = AvroMap(HashMap::new());
    let mut null_value_counts = AvroMap(HashMap::new());
    let mut distinct_counts = AvroMap(HashMap::new());
    let mut lower_bounds: HashMap<i32, Value> = HashMap::new();
    let mut upper_bounds: HashMap<i32, Value> = HashMap::new();
    for row_group in &file_metadata.row_groups {
        let row_group = RowGroupMetaData::from_thrift(parquet_schema.clone(), row_group.clone())?;

        for column in row_group.columns() {
            // Map the Parquet column back to its Iceberg field id.
            let column_name = column.column_descr().name();
            let id = schema
                .get_name(column_name)
                .ok_or_else(|| Error::Schema(column_name.to_string(), "".to_string()))?
                .id;
            column_sizes
                .entry(id)
                .and_modify(|x| *x += column.compressed_size())
                .or_insert(column.compressed_size());
            value_counts
                .entry(id)
                .and_modify(|x| *x += row_group.num_rows())
                .or_insert(row_group.num_rows());

            if let Some(statistics) = column.statistics() {
                if let Some(null_count) = statistics.null_count_opt() {
                    null_value_counts
                        .entry(id)
                        .and_modify(|x| *x += null_count as i64)
                        .or_insert(null_count as i64);
                }
                if let Some(distinct_count) = statistics.distinct_count_opt() {
                    // Note: distinct counts are not additive across row groups,
                    // so this sum is only an upper bound on the true count.
                    distinct_counts
                        .entry(id)
                        .and_modify(|x| *x += distinct_count as i64)
                        .or_insert(distinct_count as i64);
                }
                let data_type = &schema
                    .fields()
                    .get(id as usize)
                    .ok_or_else(|| Error::Schema(column_name.to_string(), "".to_string()))?
                    .field_type;

                // Lower/upper bounds and partition values can only be derived
                // for primitive types. The guard is applied once here so min
                // and max are handled consistently and independently.
                if let Type::Primitive(_) = &data_type {
                    if let Some(min_bytes) = statistics.min_bytes_opt() {
                        let new = Value::try_from_bytes(min_bytes, data_type)?;
                        match lower_bounds.entry(id) {
                            Entry::Occupied(mut entry) => {
                                let entry = entry.get_mut();
                                // Keep the smaller of the stored and new minimum.
                                match (&entry, &new) {
                                    (Value::Int(current), Value::Int(new_val)) => {
                                        if *current > *new_val {
                                            *entry = new
                                        }
                                    }
                                    (Value::LongInt(current), Value::LongInt(new_val)) => {
                                        if *current > *new_val {
                                            *entry = new
                                        }
                                    }
                                    (Value::Float(current), Value::Float(new_val)) => {
                                        if *current > *new_val {
                                            *entry = new
                                        }
                                    }
                                    (Value::Double(current), Value::Double(new_val)) => {
                                        if *current > *new_val {
                                            *entry = new
                                        }
                                    }
                                    (Value::Date(current), Value::Date(new_val)) => {
                                        if *current > *new_val {
                                            *entry = new
                                        }
                                    }
                                    (Value::Time(current), Value::Time(new_val)) => {
                                        if *current > *new_val {
                                            *entry = new
                                        }
                                    }
                                    (Value::Timestamp(current), Value::Timestamp(new_val)) => {
                                        if *current > *new_val {
                                            *entry = new
                                        }
                                    }
                                    (Value::TimestampTZ(current), Value::TimestampTZ(new_val)) => {
                                        if *current > *new_val {
                                            *entry = new
                                        }
                                    }
                                    _ => (),
                                }
                            }
                            Entry::Vacant(entry) => {
                                entry.insert(new);
                            }
                        }
                    }
                    if let Some(max_bytes) = statistics.max_bytes_opt() {
                        let new = Value::try_from_bytes(max_bytes, data_type)?;
                        match upper_bounds.entry(id) {
                            Entry::Occupied(mut entry) => {
                                let entry = entry.get_mut();
                                // Keep the larger of the stored and new maximum.
                                match (&entry, &new) {
                                    (Value::Int(current), Value::Int(new_val)) => {
                                        if *current < *new_val {
                                            *entry = new
                                        }
                                    }
                                    (Value::LongInt(current), Value::LongInt(new_val)) => {
                                        if *current < *new_val {
                                            *entry = new
                                        }
                                    }
                                    (Value::Float(current), Value::Float(new_val)) => {
                                        if *current < *new_val {
                                            *entry = new
                                        }
                                    }
                                    (Value::Double(current), Value::Double(new_val)) => {
                                        if *current < *new_val {
                                            *entry = new
                                        }
                                    }
                                    (Value::Date(current), Value::Date(new_val)) => {
                                        if *current < *new_val {
                                            *entry = new
                                        }
                                    }
                                    (Value::Time(current), Value::Time(new_val)) => {
                                        if *current < *new_val {
                                            *entry = new
                                        }
                                    }
                                    (Value::Timestamp(current), Value::Timestamp(new_val)) => {
                                        if *current < *new_val {
                                            *entry = new
                                        }
                                    }
                                    (Value::TimestampTZ(current), Value::TimestampTZ(new_val)) => {
                                        if *current < *new_val {
                                            *entry = new
                                        }
                                    }
                                    _ => (),
                                }
                            }
                            Entry::Vacant(entry) => {
                                entry.insert(new);
                            }
                        }
                    }

                    // A data file must lie entirely within one partition: the
                    // transformed min and max of the source column have to
                    // agree on the partition value, otherwise the file spans
                    // multiple partitions and is rejected.
                    if let Some(partition_field) = partition_fields.get(column_name) {
                        if let Some(partition_value) = partition.get_mut(partition_field.name()) {
                            if partition_value.is_none() {
                                if let (Some(min_bytes), Some(max_bytes)) =
                                    (statistics.min_bytes_opt(), statistics.max_bytes_opt())
                                {
                                    let min = Value::try_from_bytes(min_bytes, data_type)?
                                        .transform(partition_field.transform())?;
                                    let max = Value::try_from_bytes(max_bytes, data_type)?
                                        .transform(partition_field.transform())?;
                                    if min == max {
                                        *partition_value = Some(min)
                                    } else {
                                        return Err(Error::InvalidFormat(
                                            "Partition value of data file".to_owned(),
                                        ));
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
    }
    // Assemble the manifest entry. A file that carries equality ids is an
    // equality-delete file; otherwise it is a plain data file.
    let mut builder = DataFile::builder();
    builder
        .with_content(if equality_ids.is_none() {
            Content::Data
        } else {
            Content::EqualityDeletes
        })
        .with_file_path(location.to_string())
        .with_file_format(FileFormat::Parquet)
        .with_partition(partition)
        .with_record_count(file_metadata.num_rows)
        .with_file_size_in_bytes(file_size as i64)
        .with_column_sizes(Some(column_sizes))
        .with_value_counts(Some(value_counts))
        .with_null_value_counts(Some(null_value_counts))
        .with_nan_value_counts(None)
        .with_lower_bounds(Some(lower_bounds))
        .with_upper_bounds(Some(upper_bounds));

    if let Some(equality_ids) = equality_ids {
        builder.with_equality_ids(Some(equality_ids.to_vec()));
    }

    let data_file = builder
        .build()
        .map_err(iceberg_rust_spec::error::Error::from)?;
    Ok(data_file)
}

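/// Returns the number of bytes `metadata` occupies when serialized with the
/// thrift compact protocol, by writing it into an in-memory buffer and
/// reading back the byte count.
///
/// A minimal sketch (hedged: assumes you already hold a thrift-encodable
/// value such as a `parquet::format::FileMetaData`):
///
/// ```ignore
/// let footer_size = thrift_size(&file_metadata)?;
/// assert!(footer_size > 0);
/// ```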
pub fn thrift_size<T: TSerializable>(metadata: &T) -> Result<usize, Error> {
    let mut buffer = TrackedWrite::new(Vec::<u8>::new());
    let mut protocol = TCompactOutputProtocol::new(&mut buffer);
    metadata.write_to_out_protocol(&mut protocol)?;
    Ok(buffer.bytes_written())
}