use std::fmt;
use std::fs::File;
use std::str::FromStr;
use std::sync::Arc;

use arrow::array::{Int64Array, StringArray, TimestampMillisecondArray, UInt64Array};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
use arrow::record_batch::RecordBatch;
use arrow::util::pretty::pretty_format_batches;
use datafusion::catalog::{Session, TableFunctionImpl};
use datafusion::common::{plan_err, Column};
use datafusion::datasource::memory::MemorySourceConfig;
use datafusion::datasource::TableProvider;
use datafusion::error::Result;
use datafusion::execution::cache::cache_manager::CacheManager;
use datafusion::logical_expr::Expr;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::scalar::ScalarValue;

use async_trait::async_trait;
use parquet::basic::ConvertedType;
use parquet::data_type::{ByteArray, FixedLenByteArray};
use parquet::file::reader::FileReader;
use parquet::file::serialized_reader::SerializedFileReader;
use parquet::file::statistics::Statistics;

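/// Available help topics, one per supported SQL statement.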
#[derive(Debug)]
pub enum Function {
    Select,
    Explain,
    Show,
    CreateTable,
    CreateTableAs,
    Insert,
    DropTable,
}

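/// All help topics, in the alphabetical order used for display.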
const ALL_FUNCTIONS: [Function; 7] = [
    Function::CreateTable,
    Function::CreateTableAs,
    Function::DropTable,
    Function::Explain,
    Function::Insert,
    Function::Select,
    Function::Show,
];

impl Function {
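    /// Returns the help text describing this statement's syntax.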
    pub fn function_details(&self) -> Result<&str> {
        let details = match self {
            Function::Select => {
                r#"
Command: SELECT
Description: retrieve rows from a table or view
Syntax:
SELECT [ ALL | DISTINCT [ ON ( expression [, ...] ) ] ]
    [ * | expression [ [ AS ] output_name ] [, ...] ]
    [ FROM from_item [, ...] ]
    [ WHERE condition ]
    [ GROUP BY [ ALL | DISTINCT ] grouping_element [, ...] ]
    [ HAVING condition ]
    [ WINDOW window_name AS ( window_definition ) [, ...] ]
    [ { UNION | INTERSECT | EXCEPT } [ ALL | DISTINCT ] select ]
    [ ORDER BY expression [ ASC | DESC | USING operator ] [ NULLS { FIRST | LAST } ] [, ...] ]
    [ LIMIT { count | ALL } ]
    [ OFFSET start [ ROW | ROWS ] ]

where from_item can be one of:

    [ ONLY ] table_name [ * ] [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
                [ TABLESAMPLE sampling_method ( argument [, ...] ) [ REPEATABLE ( seed ) ] ]
    [ LATERAL ] ( select ) [ AS ] alias [ ( column_alias [, ...] ) ]
    with_query_name [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
    [ LATERAL ] function_name ( [ argument [, ...] ] )
                [ WITH ORDINALITY ] [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
    [ LATERAL ] function_name ( [ argument [, ...] ] ) [ AS ] alias ( column_definition [, ...] )
    [ LATERAL ] function_name ( [ argument [, ...] ] ) AS ( column_definition [, ...] )
    [ LATERAL ] ROWS FROM( function_name ( [ argument [, ...] ] ) [ AS ( column_definition [, ...] ) ] [, ...] )
                [ WITH ORDINALITY ] [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
    from_item [ NATURAL ] join_type from_item [ ON join_condition | USING ( join_column [, ...] ) [ AS join_using_alias ] ]

and grouping_element can be one of:

    ( )
    expression
    ( expression [, ...] )

and with_query is:

    with_query_name [ ( column_name [, ...] ) ] AS [ [ NOT ] MATERIALIZED ] ( select | values | insert | update | delete )

TABLE [ ONLY ] table_name [ * ]"#
            }
            Function::Explain => {
                r#"
Command: EXPLAIN
Description: show the execution plan of a statement
Syntax:
EXPLAIN [ ANALYZE ] statement
"#
            }
            Function::Show => {
                r#"
Command: SHOW
Description: show the value of a run-time parameter
Syntax:
SHOW name
"#
            }
            Function::CreateTable => {
                r#"
Command: CREATE TABLE
Description: define a new table
Syntax:
CREATE [ EXTERNAL ] TABLE table_name ( [
    { column_name data_type }
    [, ... ]
] )
"#
            }
            Function::CreateTableAs => {
                r#"
Command: CREATE TABLE AS
Description: define a new table from the results of a query
Syntax:
CREATE TABLE table_name
    [ (column_name [, ...] ) ]
    AS query
    [ WITH [ NO ] DATA ]
"#
            }
            Function::Insert => {
                r#"
Command: INSERT
Description: create new rows in a table
Syntax:
INSERT INTO table_name [ ( column_name [, ...] ) ]
    { VALUES ( { expression } [, ...] ) [, ...] }
"#
            }
            Function::DropTable => {
                r#"
Command: DROP TABLE
Description: remove a table
Syntax:
DROP TABLE [ IF EXISTS ] name [, ...]
"#
            }
        };
        Ok(details)
    }
}

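// Parses a help topic from user input, case-insensitively (e.g. "select" or "SELECT").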
impl FromStr for Function {
    type Err = ();

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        Ok(match s.trim().to_uppercase().as_str() {
            "SELECT" => Self::Select,
            "EXPLAIN" => Self::Explain,
            "SHOW" => Self::Show,
            "CREATE TABLE" => Self::CreateTable,
            "CREATE TABLE AS" => Self::CreateTableAs,
            "INSERT" => Self::Insert,
            "DROP TABLE" => Self::DropTable,
            _ => return Err(()),
        })
    }
}

impl fmt::Display for Function {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match *self {
            Function::Select => write!(f, "SELECT"),
            Function::Explain => write!(f, "EXPLAIN"),
            Function::Show => write!(f, "SHOW"),
            Function::CreateTable => write!(f, "CREATE TABLE"),
            Function::CreateTableAs => write!(f, "CREATE TABLE AS"),
            Function::Insert => write!(f, "INSERT"),
            Function::DropTable => write!(f, "DROP TABLE"),
        }
    }
}

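/// Prints the list of available help topics as a one-column table.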
pub fn display_all_functions() -> Result<()> {
    println!("Available help:");
    let array = StringArray::from(
        ALL_FUNCTIONS
            .iter()
            .map(|f| format!("{f}"))
            .collect::<Vec<String>>(),
    );
    let schema = Schema::new(vec![Field::new("Function", DataType::Utf8, false)]);
    let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)])?;
    println!("{}", pretty_format_batches(&[batch])?);
    Ok(())
}

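/// Table provider that serves a Parquet file's metadata from a single in-memory batch.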
#[derive(Debug)]
struct ParquetMetadataTable {
    schema: SchemaRef,
    batch: RecordBatch,
}

#[async_trait]
impl TableProvider for ParquetMetadataTable {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn schema(&self) -> arrow::datatypes::SchemaRef {
        self.schema.clone()
    }

    fn table_type(&self) -> datafusion::logical_expr::TableType {
        datafusion::logical_expr::TableType::Base
    }

    async fn scan(
        &self,
        _state: &dyn Session,
        projection: Option<&Vec<usize>>,
        _filters: &[Expr],
        _limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(MemorySourceConfig::try_new_exec(
            &[vec![self.batch.clone()]],
            TableProvider::schema(self),
            projection.cloned(),
        )?)
    }
}

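/// Converts the min/max values of Parquet column statistics to display strings,
/// decoding byte arrays as UTF-8 when the converted type marks them as strings.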
fn convert_parquet_statistics(
    value: &Statistics,
    converted_type: ConvertedType,
) -> (Option<String>, Option<String>) {
    match (value, converted_type) {
        (Statistics::Boolean(val), _) => (
            val.min_opt().map(|v| v.to_string()),
            val.max_opt().map(|v| v.to_string()),
        ),
        (Statistics::Int32(val), _) => (
            val.min_opt().map(|v| v.to_string()),
            val.max_opt().map(|v| v.to_string()),
        ),
        (Statistics::Int64(val), _) => (
            val.min_opt().map(|v| v.to_string()),
            val.max_opt().map(|v| v.to_string()),
        ),
        (Statistics::Int96(val), _) => (
            val.min_opt().map(|v| v.to_string()),
            val.max_opt().map(|v| v.to_string()),
        ),
        (Statistics::Float(val), _) => (
            val.min_opt().map(|v| v.to_string()),
            val.max_opt().map(|v| v.to_string()),
        ),
        (Statistics::Double(val), _) => (
            val.min_opt().map(|v| v.to_string()),
            val.max_opt().map(|v| v.to_string()),
        ),
        (Statistics::ByteArray(val), ConvertedType::UTF8) => (
            byte_array_to_string(val.min_opt()),
            byte_array_to_string(val.max_opt()),
        ),
        (Statistics::ByteArray(val), _) => (
            val.min_opt().map(|v| v.to_string()),
            val.max_opt().map(|v| v.to_string()),
        ),
        (Statistics::FixedLenByteArray(val), ConvertedType::UTF8) => (
            fixed_len_byte_array_to_string(val.min_opt()),
            fixed_len_byte_array_to_string(val.max_opt()),
        ),
        (Statistics::FixedLenByteArray(val), _) => (
            val.min_opt().map(|v| v.to_string()),
            val.max_opt().map(|v| v.to_string()),
        ),
    }
}

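/// Decodes a `ByteArray` statistic as UTF-8, falling back to its `Display`
/// form when the bytes are not valid UTF-8.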
fn byte_array_to_string(val: Option<&ByteArray>) -> Option<String> {
    val.map(|v| {
        v.as_utf8()
            .map(|s| s.to_string())
            .unwrap_or_else(|_e| v.to_string())
    })
}

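/// Decodes a `FixedLenByteArray` statistic as UTF-8, falling back to its
/// `Display` form when the bytes are not valid UTF-8.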
fn fixed_len_byte_array_to_string(val: Option<&FixedLenByteArray>) -> Option<String> {
    val.map(|v| {
        v.as_utf8()
            .map(|s| s.to_string())
            .unwrap_or_else(|_e| v.to_string())
    })
}

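/// Implements the `parquet_metadata('data.parquet')` table function, which
/// reports row-group and column-chunk metadata for a single Parquet file.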
#[derive(Debug)]
pub struct ParquetMetadataFunc {}

impl TableFunctionImpl for ParquetMetadataFunc {
    fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
        let filename = match exprs.first() {
            Some(Expr::Literal(ScalarValue::Utf8(Some(s)), _)) => s, // single quoted, e.g. parquet_metadata('x.parquet')
            Some(Expr::Column(Column { name, .. })) => name, // bare or double quoted, e.g. parquet_metadata("x.parquet")
            _ => {
                return plan_err!(
                    "parquet_metadata requires a string argument as its input"
                );
            }
        };

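        // Read the Parquet footer eagerly; the returned table serves it from memory.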
        let file = File::open(filename)?;
        let reader = SerializedFileReader::new(file)?;
        let metadata = reader.metadata();

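        // Output schema: one row per column chunk, keyed by row group and column id.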
        let schema = Arc::new(Schema::new(vec![
            Field::new("filename", DataType::Utf8, true),
            Field::new("row_group_id", DataType::Int64, true),
            Field::new("row_group_num_rows", DataType::Int64, true),
            Field::new("row_group_num_columns", DataType::Int64, true),
            Field::new("row_group_bytes", DataType::Int64, true),
            Field::new("column_id", DataType::Int64, true),
            Field::new("file_offset", DataType::Int64, true),
            Field::new("num_values", DataType::Int64, true),
            Field::new("path_in_schema", DataType::Utf8, true),
            Field::new("type", DataType::Utf8, true),
            Field::new("stats_min", DataType::Utf8, true),
            Field::new("stats_max", DataType::Utf8, true),
            Field::new("stats_null_count", DataType::Int64, true),
            Field::new("stats_distinct_count", DataType::Int64, true),
            Field::new("stats_min_value", DataType::Utf8, true),
            Field::new("stats_max_value", DataType::Utf8, true),
            Field::new("compression", DataType::Utf8, true),
            Field::new("encodings", DataType::Utf8, true),
            Field::new("index_page_offset", DataType::Int64, true),
            Field::new("dictionary_page_offset", DataType::Int64, true),
            Field::new("data_page_offset", DataType::Int64, true),
            Field::new("total_compressed_size", DataType::Int64, true),
            Field::new("total_uncompressed_size", DataType::Int64, true),
        ]));

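        // One value vector per output column, filled in file order.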
        let mut filename_arr = vec![];
        let mut row_group_id_arr = vec![];
        let mut row_group_num_rows_arr = vec![];
        let mut row_group_num_columns_arr = vec![];
        let mut row_group_bytes_arr = vec![];
        let mut column_id_arr = vec![];
        let mut file_offset_arr = vec![];
        let mut num_values_arr = vec![];
        let mut path_in_schema_arr = vec![];
        let mut type_arr = vec![];
        let mut stats_min_arr = vec![];
        let mut stats_max_arr = vec![];
        let mut stats_null_count_arr = vec![];
        let mut stats_distinct_count_arr = vec![];
        let mut stats_min_value_arr = vec![];
        let mut stats_max_value_arr = vec![];
        let mut compression_arr = vec![];
        let mut encodings_arr = vec![];
        let mut index_page_offset_arr = vec![];
        let mut dictionary_page_offset_arr = vec![];
        let mut data_page_offset_arr = vec![];
        let mut total_compressed_size_arr = vec![];
        let mut total_uncompressed_size_arr = vec![];
        for (rg_idx, row_group) in metadata.row_groups().iter().enumerate() {
            for (col_idx, column) in row_group.columns().iter().enumerate() {
                filename_arr.push(filename.clone());
                row_group_id_arr.push(rg_idx as i64);
                row_group_num_rows_arr.push(row_group.num_rows());
                row_group_num_columns_arr.push(row_group.num_columns() as i64);
                row_group_bytes_arr.push(row_group.total_byte_size());
                column_id_arr.push(col_idx as i64);
                file_offset_arr.push(column.file_offset());
                num_values_arr.push(column.num_values());
                path_in_schema_arr.push(column.column_path().to_string());
                type_arr.push(column.column_type().to_string());
                let converted_type = column.column_descr().converted_type();

                // Statistics are optional per column chunk; emit NULLs when absent.
                if let Some(s) = column.statistics() {
                    let (min_val, max_val) =
                        convert_parquet_statistics(s, converted_type);
                    stats_min_arr.push(min_val.clone());
                    stats_max_arr.push(max_val.clone());
                    stats_null_count_arr.push(s.null_count_opt().map(|c| c as i64));
                    stats_distinct_count_arr
                        .push(s.distinct_count_opt().map(|c| c as i64));
                    stats_min_value_arr.push(min_val);
                    stats_max_value_arr.push(max_val);
                } else {
                    stats_min_arr.push(None);
                    stats_max_arr.push(None);
                    stats_null_count_arr.push(None);
                    stats_distinct_count_arr.push(None);
                    stats_min_value_arr.push(None);
                    stats_max_value_arr.push(None);
                }
                compression_arr.push(format!("{:?}", column.compression()));
                let encodings: Vec<_> = column.encodings().collect();
                encodings_arr.push(format!("{:?}", encodings));
                index_page_offset_arr.push(column.index_page_offset());
                dictionary_page_offset_arr.push(column.dictionary_page_offset());
                data_page_offset_arr.push(column.data_page_offset());
                total_compressed_size_arr.push(column.compressed_size());
                total_uncompressed_size_arr.push(column.uncompressed_size());
            }
        }

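        // Assemble the collected values into a single record batch in schema order.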
        let rb = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(filename_arr)),
                Arc::new(Int64Array::from(row_group_id_arr)),
                Arc::new(Int64Array::from(row_group_num_rows_arr)),
                Arc::new(Int64Array::from(row_group_num_columns_arr)),
                Arc::new(Int64Array::from(row_group_bytes_arr)),
                Arc::new(Int64Array::from(column_id_arr)),
                Arc::new(Int64Array::from(file_offset_arr)),
                Arc::new(Int64Array::from(num_values_arr)),
                Arc::new(StringArray::from(path_in_schema_arr)),
                Arc::new(StringArray::from(type_arr)),
                Arc::new(StringArray::from(stats_min_arr)),
                Arc::new(StringArray::from(stats_max_arr)),
                Arc::new(Int64Array::from(stats_null_count_arr)),
                Arc::new(Int64Array::from(stats_distinct_count_arr)),
                Arc::new(StringArray::from(stats_min_value_arr)),
                Arc::new(StringArray::from(stats_max_value_arr)),
                Arc::new(StringArray::from(compression_arr)),
                Arc::new(StringArray::from(encodings_arr)),
                Arc::new(Int64Array::from(index_page_offset_arr)),
                Arc::new(Int64Array::from(dictionary_page_offset_arr)),
                Arc::new(Int64Array::from(data_page_offset_arr)),
                Arc::new(Int64Array::from(total_compressed_size_arr)),
                Arc::new(Int64Array::from(total_uncompressed_size_arr)),
            ],
        )?;

        let parquet_metadata = ParquetMetadataTable { schema, batch: rb };
        Ok(Arc::new(parquet_metadata))
    }
}

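/// Table provider that serves the contents of the file-metadata cache from a
/// single in-memory batch.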
#[derive(Debug)]
struct MetadataCacheTable {
    schema: SchemaRef,
    batch: RecordBatch,
}

#[async_trait]
impl TableProvider for MetadataCacheTable {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn schema(&self) -> arrow::datatypes::SchemaRef {
        self.schema.clone()
    }

    fn table_type(&self) -> datafusion::logical_expr::TableType {
        datafusion::logical_expr::TableType::Base
    }

    async fn scan(
        &self,
        _state: &dyn Session,
        projection: Option<&Vec<usize>>,
        _filters: &[Expr],
        _limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(MemorySourceConfig::try_new_exec(
            &[vec![self.batch.clone()]],
            TableProvider::schema(self),
            projection.cloned(),
        )?)
    }
}

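/// Implements the `metadata_cache()` table function, which lists every file
/// whose metadata is currently held by the `CacheManager`'s file-metadata cache.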
#[derive(Debug)]
pub struct MetadataCacheFunc {
    cache_manager: Arc<CacheManager>,
}

impl MetadataCacheFunc {
    pub fn new(cache_manager: Arc<CacheManager>) -> Self {
        Self { cache_manager }
    }
}

impl TableFunctionImpl for MetadataCacheFunc {
    fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
        if !exprs.is_empty() {
            return plan_err!("metadata_cache takes no arguments");
        }

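        // Output schema: one row per cached file.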
        let schema = Arc::new(Schema::new(vec![
            Field::new("path", DataType::Utf8, false),
            Field::new(
                "file_modified",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("file_size_bytes", DataType::UInt64, false),
            Field::new("e_tag", DataType::Utf8, true),
            Field::new("version", DataType::Utf8, true),
            Field::new("metadata_size_bytes", DataType::UInt64, false),
            Field::new("hits", DataType::UInt64, false),
            Field::new("extra", DataType::Utf8, true),
        ]));

        let mut path_arr = vec![];
        let mut file_modified_arr = vec![];
        let mut file_size_bytes_arr = vec![];
        let mut e_tag_arr = vec![];
        let mut version_arr = vec![];
        let mut metadata_size_bytes_arr = vec![];
        let mut hits_arr = vec![];
        let mut extra_arr = vec![];

        let cached_entries =
            self.cache_manager.get_file_metadata_cache().list_entries();

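        // Flatten each cache entry into one value per output column.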
        for (path, entry) in cached_entries {
            path_arr.push(path.to_string());
            file_modified_arr
                .push(Some(entry.object_meta.last_modified.timestamp_millis()));
            file_size_bytes_arr.push(entry.object_meta.size);
            e_tag_arr.push(entry.object_meta.e_tag);
            version_arr.push(entry.object_meta.version);
            metadata_size_bytes_arr.push(entry.size_bytes as u64);
            hits_arr.push(entry.hits as u64);

            // Render the extra key/value pairs deterministically, sorted by key.
            let mut extra = entry
                .extra
                .iter()
                .map(|(k, v)| format!("{k}={v}"))
                .collect::<Vec<_>>();
            extra.sort();
            extra_arr.push(extra.join(" "));
        }

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(path_arr)),
                Arc::new(TimestampMillisecondArray::from(file_modified_arr)),
                Arc::new(UInt64Array::from(file_size_bytes_arr)),
                Arc::new(StringArray::from(e_tag_arr)),
                Arc::new(StringArray::from(version_arr)),
                Arc::new(UInt64Array::from(metadata_size_bytes_arr)),
                Arc::new(UInt64Array::from(hits_arr)),
                Arc::new(StringArray::from(extra_arr)),
            ],
        )?;

        let metadata_cache = MetadataCacheTable { schema, batch };
        Ok(Arc::new(metadata_cache))
    }
}