1use arrow::array::{Int64Array, StringArray};
19use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
20use arrow::record_batch::RecordBatch;
21use async_trait::async_trait;
22
23use datafusion::catalog::Session;
24use datafusion::catalog::TableFunctionImpl;
25use datafusion::common::{plan_err, Column};
26use datafusion::datasource::memory::MemorySourceConfig;
27use datafusion::datasource::TableProvider;
28use datafusion::error::Result;
29use datafusion::logical_expr::Expr;
30use datafusion::physical_plan::ExecutionPlan;
31use datafusion::scalar::ScalarValue;
32use parquet::basic::ConvertedType;
33use parquet::data_type::{ByteArray, FixedLenByteArray};
34use parquet::file::reader::FileReader;
35use parquet::file::serialized_reader::SerializedFileReader;
36use parquet::file::statistics::Statistics;
37use std::fs::File;
38use std::sync::Arc;
39
/// In-memory [`TableProvider`] that serves pre-extracted parquet metadata
/// rows (one per row-group/column pair) built by [`ParquetMetadataFunc`].
#[derive(Debug)]
struct ParquetMetadataTable {
    // Schema of `batch`; also what `TableProvider::schema` returns.
    schema: SchemaRef,
    // All metadata rows, materialized eagerly when the function is called.
    batch: RecordBatch,
}
47
48#[async_trait]
49impl TableProvider for ParquetMetadataTable {
50 fn as_any(&self) -> &dyn std::any::Any {
51 self
52 }
53
54 fn schema(&self) -> arrow::datatypes::SchemaRef {
55 self.schema.clone()
56 }
57
58 fn table_type(&self) -> datafusion::logical_expr::TableType {
59 datafusion::logical_expr::TableType::Base
60 }
61
62 async fn scan(
63 &self,
64 _state: &dyn Session,
65 projection: Option<&Vec<usize>>,
66 _filters: &[Expr],
67 _limit: Option<usize>,
68 ) -> Result<Arc<dyn ExecutionPlan>> {
69 Ok(MemorySourceConfig::try_new_exec(
70 &[vec![self.batch.clone()]],
71 TableProvider::schema(self),
72 projection.cloned(),
73 )?)
74 }
75}
76
77fn convert_parquet_statistics(
78 value: &Statistics,
79 converted_type: ConvertedType,
80) -> (Option<String>, Option<String>) {
81 match (value, converted_type) {
82 (Statistics::Boolean(val), _) => (
83 val.min_opt().map(|v| v.to_string()),
84 val.max_opt().map(|v| v.to_string()),
85 ),
86 (Statistics::Int32(val), _) => (
87 val.min_opt().map(|v| v.to_string()),
88 val.max_opt().map(|v| v.to_string()),
89 ),
90 (Statistics::Int64(val), _) => (
91 val.min_opt().map(|v| v.to_string()),
92 val.max_opt().map(|v| v.to_string()),
93 ),
94 (Statistics::Int96(val), _) => (
95 val.min_opt().map(|v| v.to_string()),
96 val.max_opt().map(|v| v.to_string()),
97 ),
98 (Statistics::Float(val), _) => (
99 val.min_opt().map(|v| v.to_string()),
100 val.max_opt().map(|v| v.to_string()),
101 ),
102 (Statistics::Double(val), _) => (
103 val.min_opt().map(|v| v.to_string()),
104 val.max_opt().map(|v| v.to_string()),
105 ),
106 (Statistics::ByteArray(val), ConvertedType::UTF8) => (
107 byte_array_to_string(val.min_opt()),
108 byte_array_to_string(val.max_opt()),
109 ),
110 (Statistics::ByteArray(val), _) => (
111 val.min_opt().map(|v| v.to_string()),
112 val.max_opt().map(|v| v.to_string()),
113 ),
114 (Statistics::FixedLenByteArray(val), ConvertedType::UTF8) => (
115 fixed_len_byte_array_to_string(val.min_opt()),
116 fixed_len_byte_array_to_string(val.max_opt()),
117 ),
118 (Statistics::FixedLenByteArray(val), _) => (
119 val.min_opt().map(|v| v.to_string()),
120 val.max_opt().map(|v| v.to_string()),
121 ),
122 }
123}
124
125fn byte_array_to_string(val: Option<&ByteArray>) -> Option<String> {
127 val.map(|v| {
128 v.as_utf8()
129 .map(|s| s.to_string())
130 .unwrap_or_else(|_e| v.to_string())
131 })
132}
133
134fn fixed_len_byte_array_to_string(val: Option<&FixedLenByteArray>) -> Option<String> {
136 val.map(|v| {
137 v.as_utf8()
138 .map(|s| s.to_string())
139 .unwrap_or_else(|_e| v.to_string())
140 })
141}
142
/// Implementation of the `parquet_metadata` table function.
///
/// Given a parquet file name as its argument, reads the file footer and
/// exposes per-row-group, per-column metadata (sizes, offsets, encodings,
/// statistics) as a queryable table.
#[derive(Debug)]
pub struct ParquetMetadataFunc {}
145
146impl TableFunctionImpl for ParquetMetadataFunc {
147 fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
148 let filename = match exprs.first() {
149 Some(Expr::Literal(ScalarValue::Utf8(Some(s)), _)) => s, Some(Expr::Column(Column { name, .. })) => name, _ => {
152 return plan_err!("parquet_metadata requires string argument as its input");
153 }
154 };
155
156 let file = File::open(filename.clone())?;
157 let reader = SerializedFileReader::new(file)?;
158 let metadata = reader.metadata();
159
160 let schema = Arc::new(Schema::new(vec![
161 Field::new("filename", DataType::Utf8, true),
162 Field::new("row_group_id", DataType::Int64, true),
163 Field::new("row_group_num_rows", DataType::Int64, true),
164 Field::new("row_group_num_columns", DataType::Int64, true),
165 Field::new("row_group_bytes", DataType::Int64, true),
166 Field::new("column_id", DataType::Int64, true),
167 Field::new("file_offset", DataType::Int64, true),
168 Field::new("num_values", DataType::Int64, true),
169 Field::new("path_in_schema", DataType::Utf8, true),
170 Field::new("type", DataType::Utf8, true),
171 Field::new("logical_type", DataType::Utf8, true),
172 Field::new("stats_min", DataType::Utf8, true),
173 Field::new("stats_max", DataType::Utf8, true),
174 Field::new("stats_null_count", DataType::Int64, true),
175 Field::new("stats_distinct_count", DataType::Int64, true),
176 Field::new("stats_min_value", DataType::Utf8, true),
177 Field::new("stats_max_value", DataType::Utf8, true),
178 Field::new("compression", DataType::Utf8, true),
179 Field::new("encodings", DataType::Utf8, true),
180 Field::new("index_page_offset", DataType::Int64, true),
181 Field::new("dictionary_page_offset", DataType::Int64, true),
182 Field::new("data_page_offset", DataType::Int64, true),
183 Field::new("total_compressed_size", DataType::Int64, true),
184 Field::new("total_uncompressed_size", DataType::Int64, true),
185 ]));
186
187 let mut filename_arr = vec![];
189 let mut row_group_id_arr = vec![];
190 let mut row_group_num_rows_arr = vec![];
191 let mut row_group_num_columns_arr = vec![];
192 let mut row_group_bytes_arr = vec![];
193 let mut column_id_arr = vec![];
194 let mut file_offset_arr = vec![];
195 let mut num_values_arr = vec![];
196 let mut path_in_schema_arr = vec![];
197 let mut type_arr = vec![];
198 let mut logical_type_arr = vec![];
199 let mut stats_min_arr = vec![];
200 let mut stats_max_arr = vec![];
201 let mut stats_null_count_arr = vec![];
202 let mut stats_distinct_count_arr = vec![];
203 let mut stats_min_value_arr = vec![];
204 let mut stats_max_value_arr = vec![];
205 let mut compression_arr = vec![];
206 let mut encodings_arr = vec![];
207 let mut index_page_offset_arr = vec![];
208 let mut dictionary_page_offset_arr = vec![];
209 let mut data_page_offset_arr = vec![];
210 let mut total_compressed_size_arr = vec![];
211 let mut total_uncompressed_size_arr = vec![];
212 for (rg_idx, row_group) in metadata.row_groups().iter().enumerate() {
213 for (col_idx, column) in row_group.columns().iter().enumerate() {
214 filename_arr.push(filename.clone());
215 row_group_id_arr.push(rg_idx as i64);
216 row_group_num_rows_arr.push(row_group.num_rows());
217 row_group_num_columns_arr.push(row_group.num_columns() as i64);
218 row_group_bytes_arr.push(row_group.total_byte_size());
219 column_id_arr.push(col_idx as i64);
220 file_offset_arr.push(column.file_offset());
221 num_values_arr.push(column.num_values());
222 path_in_schema_arr.push(column.column_path().to_string());
223 type_arr.push(column.column_type().to_string());
224 logical_type_arr.push(
225 column
226 .column_descr()
227 .logical_type_ref()
228 .map(|lt| format!("{:?}", lt)),
229 );
230 let converted_type = column.column_descr().converted_type();
231
232 if let Some(s) = column.statistics() {
233 let (min_val, max_val) = convert_parquet_statistics(s, converted_type);
234 stats_min_arr.push(min_val.clone());
235 stats_max_arr.push(max_val.clone());
236 stats_null_count_arr.push(s.null_count_opt().map(|c| c as i64));
237 stats_distinct_count_arr.push(s.distinct_count_opt().map(|c| c as i64));
238 stats_min_value_arr.push(min_val);
239 stats_max_value_arr.push(max_val);
240 } else {
241 stats_min_arr.push(None);
242 stats_max_arr.push(None);
243 stats_null_count_arr.push(None);
244 stats_distinct_count_arr.push(None);
245 stats_min_value_arr.push(None);
246 stats_max_value_arr.push(None);
247 };
248 compression_arr.push(format!("{:?}", column.compression()));
249 encodings_arr.push(format!("{:?}", column.encodings().collect::<Vec<_>>()));
250 index_page_offset_arr.push(column.index_page_offset());
251 dictionary_page_offset_arr.push(column.dictionary_page_offset());
252 data_page_offset_arr.push(column.data_page_offset());
253 total_compressed_size_arr.push(column.compressed_size());
254 total_uncompressed_size_arr.push(column.uncompressed_size());
255 }
256 }
257
258 let rb = RecordBatch::try_new(
259 schema.clone(),
260 vec![
261 Arc::new(StringArray::from(filename_arr)),
262 Arc::new(Int64Array::from(row_group_id_arr)),
263 Arc::new(Int64Array::from(row_group_num_rows_arr)),
264 Arc::new(Int64Array::from(row_group_num_columns_arr)),
265 Arc::new(Int64Array::from(row_group_bytes_arr)),
266 Arc::new(Int64Array::from(column_id_arr)),
267 Arc::new(Int64Array::from(file_offset_arr)),
268 Arc::new(Int64Array::from(num_values_arr)),
269 Arc::new(StringArray::from(path_in_schema_arr)),
270 Arc::new(StringArray::from(type_arr)),
271 Arc::new(StringArray::from(logical_type_arr)),
272 Arc::new(StringArray::from(stats_min_arr)),
273 Arc::new(StringArray::from(stats_max_arr)),
274 Arc::new(Int64Array::from(stats_null_count_arr)),
275 Arc::new(Int64Array::from(stats_distinct_count_arr)),
276 Arc::new(StringArray::from(stats_min_value_arr)),
277 Arc::new(StringArray::from(stats_max_value_arr)),
278 Arc::new(StringArray::from(compression_arr)),
279 Arc::new(StringArray::from(encodings_arr)),
280 Arc::new(Int64Array::from(index_page_offset_arr)),
281 Arc::new(Int64Array::from(dictionary_page_offset_arr)),
282 Arc::new(Int64Array::from(data_page_offset_arr)),
283 Arc::new(Int64Array::from(total_compressed_size_arr)),
284 Arc::new(Int64Array::from(total_uncompressed_size_arr)),
285 ],
286 )?;
287
288 let parquet_metadata = ParquetMetadataTable { schema, batch: rb };
289 Ok(Arc::new(parquet_metadata))
290 }
291}