// datafusion_functions_parquet/lib.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use arrow::array::{Int64Array, StringArray};
19use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
20use arrow::record_batch::RecordBatch;
21use async_trait::async_trait;
22
23use datafusion::catalog::Session;
24use datafusion::catalog::TableFunctionImpl;
25use datafusion::common::{plan_err, Column};
26use datafusion::datasource::memory::MemorySourceConfig;
27use datafusion::datasource::TableProvider;
28use datafusion::error::Result;
29use datafusion::logical_expr::Expr;
30use datafusion::physical_plan::ExecutionPlan;
31use datafusion::scalar::ScalarValue;
32use parquet::basic::ConvertedType;
33use parquet::data_type::{ByteArray, FixedLenByteArray};
34use parquet::file::reader::FileReader;
35use parquet::file::serialized_reader::SerializedFileReader;
36use parquet::file::statistics::Statistics;
37use std::fs::File;
38use std::sync::Arc;
39
// Copied from https://github.com/apache/datafusion/blob/main/datafusion-cli/src/functions.rs
/// PARQUET_META table function
///
/// Table provider backing `parquet_metadata(...)`: holds a pre-materialized
/// [`RecordBatch`] with one row per (row group, column chunk) of the file,
/// which `scan` serves from memory.
#[derive(Debug)]
struct ParquetMetadataTable {
    /// Schema of `batch` (one field per metadata column).
    schema: SchemaRef,
    /// The fully materialized metadata rows returned by `scan`.
    batch: RecordBatch,
}
47
48#[async_trait]
49impl TableProvider for ParquetMetadataTable {
50    fn as_any(&self) -> &dyn std::any::Any {
51        self
52    }
53
54    fn schema(&self) -> arrow::datatypes::SchemaRef {
55        self.schema.clone()
56    }
57
58    fn table_type(&self) -> datafusion::logical_expr::TableType {
59        datafusion::logical_expr::TableType::Base
60    }
61
62    async fn scan(
63        &self,
64        _state: &dyn Session,
65        projection: Option<&Vec<usize>>,
66        _filters: &[Expr],
67        _limit: Option<usize>,
68    ) -> Result<Arc<dyn ExecutionPlan>> {
69        Ok(MemorySourceConfig::try_new_exec(
70            &[vec![self.batch.clone()]],
71            TableProvider::schema(self),
72            projection.cloned(),
73        )?)
74    }
75}
76
77fn convert_parquet_statistics(
78    value: &Statistics,
79    converted_type: ConvertedType,
80) -> (Option<String>, Option<String>) {
81    match (value, converted_type) {
82        (Statistics::Boolean(val), _) => (
83            val.min_opt().map(|v| v.to_string()),
84            val.max_opt().map(|v| v.to_string()),
85        ),
86        (Statistics::Int32(val), _) => (
87            val.min_opt().map(|v| v.to_string()),
88            val.max_opt().map(|v| v.to_string()),
89        ),
90        (Statistics::Int64(val), _) => (
91            val.min_opt().map(|v| v.to_string()),
92            val.max_opt().map(|v| v.to_string()),
93        ),
94        (Statistics::Int96(val), _) => (
95            val.min_opt().map(|v| v.to_string()),
96            val.max_opt().map(|v| v.to_string()),
97        ),
98        (Statistics::Float(val), _) => (
99            val.min_opt().map(|v| v.to_string()),
100            val.max_opt().map(|v| v.to_string()),
101        ),
102        (Statistics::Double(val), _) => (
103            val.min_opt().map(|v| v.to_string()),
104            val.max_opt().map(|v| v.to_string()),
105        ),
106        (Statistics::ByteArray(val), ConvertedType::UTF8) => (
107            byte_array_to_string(val.min_opt()),
108            byte_array_to_string(val.max_opt()),
109        ),
110        (Statistics::ByteArray(val), _) => (
111            val.min_opt().map(|v| v.to_string()),
112            val.max_opt().map(|v| v.to_string()),
113        ),
114        (Statistics::FixedLenByteArray(val), ConvertedType::UTF8) => (
115            fixed_len_byte_array_to_string(val.min_opt()),
116            fixed_len_byte_array_to_string(val.max_opt()),
117        ),
118        (Statistics::FixedLenByteArray(val), _) => (
119            val.min_opt().map(|v| v.to_string()),
120            val.max_opt().map(|v| v.to_string()),
121        ),
122    }
123}
124
125/// Convert to a string if it has utf8 encoding, otherwise print bytes directly
126fn byte_array_to_string(val: Option<&ByteArray>) -> Option<String> {
127    val.map(|v| {
128        v.as_utf8()
129            .map(|s| s.to_string())
130            .unwrap_or_else(|_e| v.to_string())
131    })
132}
133
134/// Convert to a string if it has utf8 encoding, otherwise print bytes directly
135fn fixed_len_byte_array_to_string(val: Option<&FixedLenByteArray>) -> Option<String> {
136    val.map(|v| {
137        v.as_utf8()
138            .map(|s| s.to_string())
139            .unwrap_or_else(|_e| v.to_string())
140    })
141}
142
/// Factory for the `parquet_metadata(<file>)` table function; `call` builds a
/// `ParquetMetadataTable` from the named parquet file's footer metadata.
#[derive(Debug)]
pub struct ParquetMetadataFunc {}
145
146impl TableFunctionImpl for ParquetMetadataFunc {
147    fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
148        let filename = match exprs.first() {
149            Some(Expr::Literal(ScalarValue::Utf8(Some(s)), _)) => s, // single quote: parquet_metadata('x.parquet')
150            Some(Expr::Column(Column { name, .. })) => name, // double quote: parquet_metadata("x.parquet")
151            _ => {
152                return plan_err!("parquet_metadata requires string argument as its input");
153            }
154        };
155
156        let file = File::open(filename.clone())?;
157        let reader = SerializedFileReader::new(file)?;
158        let metadata = reader.metadata();
159
160        let schema = Arc::new(Schema::new(vec![
161            Field::new("filename", DataType::Utf8, true),
162            Field::new("row_group_id", DataType::Int64, true),
163            Field::new("row_group_num_rows", DataType::Int64, true),
164            Field::new("row_group_num_columns", DataType::Int64, true),
165            Field::new("row_group_bytes", DataType::Int64, true),
166            Field::new("column_id", DataType::Int64, true),
167            Field::new("file_offset", DataType::Int64, true),
168            Field::new("num_values", DataType::Int64, true),
169            Field::new("path_in_schema", DataType::Utf8, true),
170            Field::new("type", DataType::Utf8, true),
171            Field::new("logical_type", DataType::Utf8, true),
172            Field::new("stats_min", DataType::Utf8, true),
173            Field::new("stats_max", DataType::Utf8, true),
174            Field::new("stats_null_count", DataType::Int64, true),
175            Field::new("stats_distinct_count", DataType::Int64, true),
176            Field::new("stats_min_value", DataType::Utf8, true),
177            Field::new("stats_max_value", DataType::Utf8, true),
178            Field::new("compression", DataType::Utf8, true),
179            Field::new("encodings", DataType::Utf8, true),
180            Field::new("index_page_offset", DataType::Int64, true),
181            Field::new("dictionary_page_offset", DataType::Int64, true),
182            Field::new("data_page_offset", DataType::Int64, true),
183            Field::new("total_compressed_size", DataType::Int64, true),
184            Field::new("total_uncompressed_size", DataType::Int64, true),
185        ]));
186
187        // construct record batch from metadata
188        let mut filename_arr = vec![];
189        let mut row_group_id_arr = vec![];
190        let mut row_group_num_rows_arr = vec![];
191        let mut row_group_num_columns_arr = vec![];
192        let mut row_group_bytes_arr = vec![];
193        let mut column_id_arr = vec![];
194        let mut file_offset_arr = vec![];
195        let mut num_values_arr = vec![];
196        let mut path_in_schema_arr = vec![];
197        let mut type_arr = vec![];
198        let mut logical_type_arr = vec![];
199        let mut stats_min_arr = vec![];
200        let mut stats_max_arr = vec![];
201        let mut stats_null_count_arr = vec![];
202        let mut stats_distinct_count_arr = vec![];
203        let mut stats_min_value_arr = vec![];
204        let mut stats_max_value_arr = vec![];
205        let mut compression_arr = vec![];
206        let mut encodings_arr = vec![];
207        let mut index_page_offset_arr = vec![];
208        let mut dictionary_page_offset_arr = vec![];
209        let mut data_page_offset_arr = vec![];
210        let mut total_compressed_size_arr = vec![];
211        let mut total_uncompressed_size_arr = vec![];
212        for (rg_idx, row_group) in metadata.row_groups().iter().enumerate() {
213            for (col_idx, column) in row_group.columns().iter().enumerate() {
214                filename_arr.push(filename.clone());
215                row_group_id_arr.push(rg_idx as i64);
216                row_group_num_rows_arr.push(row_group.num_rows());
217                row_group_num_columns_arr.push(row_group.num_columns() as i64);
218                row_group_bytes_arr.push(row_group.total_byte_size());
219                column_id_arr.push(col_idx as i64);
220                file_offset_arr.push(column.file_offset());
221                num_values_arr.push(column.num_values());
222                path_in_schema_arr.push(column.column_path().to_string());
223                type_arr.push(column.column_type().to_string());
224                logical_type_arr.push(
225                    column
226                        .column_descr()
227                        .logical_type_ref()
228                        .map(|lt| format!("{:?}", lt)),
229                );
230                let converted_type = column.column_descr().converted_type();
231
232                if let Some(s) = column.statistics() {
233                    let (min_val, max_val) = convert_parquet_statistics(s, converted_type);
234                    stats_min_arr.push(min_val.clone());
235                    stats_max_arr.push(max_val.clone());
236                    stats_null_count_arr.push(s.null_count_opt().map(|c| c as i64));
237                    stats_distinct_count_arr.push(s.distinct_count_opt().map(|c| c as i64));
238                    stats_min_value_arr.push(min_val);
239                    stats_max_value_arr.push(max_val);
240                } else {
241                    stats_min_arr.push(None);
242                    stats_max_arr.push(None);
243                    stats_null_count_arr.push(None);
244                    stats_distinct_count_arr.push(None);
245                    stats_min_value_arr.push(None);
246                    stats_max_value_arr.push(None);
247                };
248                compression_arr.push(format!("{:?}", column.compression()));
249                encodings_arr.push(format!("{:?}", column.encodings().collect::<Vec<_>>()));
250                index_page_offset_arr.push(column.index_page_offset());
251                dictionary_page_offset_arr.push(column.dictionary_page_offset());
252                data_page_offset_arr.push(column.data_page_offset());
253                total_compressed_size_arr.push(column.compressed_size());
254                total_uncompressed_size_arr.push(column.uncompressed_size());
255            }
256        }
257
258        let rb = RecordBatch::try_new(
259            schema.clone(),
260            vec![
261                Arc::new(StringArray::from(filename_arr)),
262                Arc::new(Int64Array::from(row_group_id_arr)),
263                Arc::new(Int64Array::from(row_group_num_rows_arr)),
264                Arc::new(Int64Array::from(row_group_num_columns_arr)),
265                Arc::new(Int64Array::from(row_group_bytes_arr)),
266                Arc::new(Int64Array::from(column_id_arr)),
267                Arc::new(Int64Array::from(file_offset_arr)),
268                Arc::new(Int64Array::from(num_values_arr)),
269                Arc::new(StringArray::from(path_in_schema_arr)),
270                Arc::new(StringArray::from(type_arr)),
271                Arc::new(StringArray::from(logical_type_arr)),
272                Arc::new(StringArray::from(stats_min_arr)),
273                Arc::new(StringArray::from(stats_max_arr)),
274                Arc::new(Int64Array::from(stats_null_count_arr)),
275                Arc::new(Int64Array::from(stats_distinct_count_arr)),
276                Arc::new(StringArray::from(stats_min_value_arr)),
277                Arc::new(StringArray::from(stats_max_value_arr)),
278                Arc::new(StringArray::from(compression_arr)),
279                Arc::new(StringArray::from(encodings_arr)),
280                Arc::new(Int64Array::from(index_page_offset_arr)),
281                Arc::new(Int64Array::from(dictionary_page_offset_arr)),
282                Arc::new(Int64Array::from(data_page_offset_arr)),
283                Arc::new(Int64Array::from(total_compressed_size_arr)),
284                Arc::new(Int64Array::from(total_uncompressed_size_arr)),
285            ],
286        )?;
287
288        let parquet_metadata = ParquetMetadataTable { schema, batch: rb };
289        Ok(Arc::new(parquet_metadata))
290    }
291}