use std::fmt;
use std::fs::File;
use std::str::FromStr;
use std::sync::Arc;

use arrow::array::{
    DurationMillisecondArray, GenericListArray, Int64Array, StringArray, StructArray,
    TimestampMillisecondArray, UInt64Array,
};
use arrow::buffer::{Buffer, OffsetBuffer, ScalarBuffer};
use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef, TimeUnit};
use arrow::record_batch::RecordBatch;
use arrow::util::pretty::pretty_format_batches;
use datafusion::catalog::{Session, TableFunctionImpl};
use datafusion::common::{Column, plan_err};
use datafusion::datasource::TableProvider;
use datafusion::datasource::memory::MemorySourceConfig;
use datafusion::error::Result;
use datafusion::execution::cache::cache_manager::CacheManager;
use datafusion::logical_expr::Expr;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::scalar::ScalarValue;
use datafusion_common::instant::Instant;

use async_trait::async_trait;
use parquet::basic::ConvertedType;
use parquet::data_type::{ByteArray, FixedLenByteArray};
use parquet::file::reader::FileReader;
use parquet::file::serialized_reader::SerializedFileReader;
use parquet::file::statistics::Statistics;

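/// SQL commands for which the CLI can display built-in help text.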
#[derive(Debug)]
pub enum Function {
    Select,
    Explain,
    Show,
    CreateTable,
    CreateTableAs,
    Insert,
    DropTable,
}

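/// All help topics, in alphabetical order.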
const ALL_FUNCTIONS: [Function; 7] = [
    Function::CreateTable,
    Function::CreateTableAs,
    Function::DropTable,
    Function::Explain,
    Function::Insert,
    Function::Select,
    Function::Show,
];

impl Function {
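    /// Returns the help text (description and syntax) for this command,
    /// modeled on PostgreSQL's command help output.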
    pub fn function_details(&self) -> Result<&str> {
        let details = match self {
            Function::Select => {
                r#"
Command: SELECT
Description: retrieve rows from a table or view
Syntax:
SELECT [ ALL | DISTINCT [ ON ( expression [, ...] ) ] ]
    [ * | expression [ [ AS ] output_name ] [, ...] ]
    [ FROM from_item [, ...] ]
    [ WHERE condition ]
    [ GROUP BY [ ALL | DISTINCT ] grouping_element [, ...] ]
    [ HAVING condition ]
    [ WINDOW window_name AS ( window_definition ) [, ...] ]
    [ { UNION | INTERSECT | EXCEPT } [ ALL | DISTINCT ] select ]
    [ ORDER BY expression [ ASC | DESC | USING operator ] [ NULLS { FIRST | LAST } ] [, ...] ]
    [ LIMIT { count | ALL } ]
    [ OFFSET start [ ROW | ROWS ] ]

where from_item can be one of:

    [ ONLY ] table_name [ * ] [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
                [ TABLESAMPLE sampling_method ( argument [, ...] ) [ REPEATABLE ( seed ) ] ]
    [ LATERAL ] ( select ) [ AS ] alias [ ( column_alias [, ...] ) ]
    with_query_name [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
    [ LATERAL ] function_name ( [ argument [, ...] ] )
                [ WITH ORDINALITY ] [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
    [ LATERAL ] function_name ( [ argument [, ...] ] ) [ AS ] alias ( column_definition [, ...] )
    [ LATERAL ] function_name ( [ argument [, ...] ] ) AS ( column_definition [, ...] )
    [ LATERAL ] ROWS FROM( function_name ( [ argument [, ...] ] ) [ AS ( column_definition [, ...] ) ] [, ...] )
                [ WITH ORDINALITY ] [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
    from_item [ NATURAL ] join_type from_item [ ON join_condition | USING ( join_column [, ...] ) [ AS join_using_alias ] ]

and grouping_element can be one of:

    ( )
    expression
    ( expression [, ...] )

and with_query is:

    with_query_name [ ( column_name [, ...] ) ] AS [ [ NOT ] MATERIALIZED ] ( select | values | insert | update | delete )

TABLE [ ONLY ] table_name [ * ]"#
            }
            Function::Explain => {
                r#"
Command: EXPLAIN
Description: show the execution plan of a statement
Syntax:
EXPLAIN [ ANALYZE ] statement
"#
            }
            Function::Show => {
                r#"
Command: SHOW
Description: show the value of a run-time parameter
Syntax:
SHOW name
"#
            }
            Function::CreateTable => {
                r#"
Command: CREATE TABLE
Description: define a new table
Syntax:
CREATE [ EXTERNAL ] TABLE table_name ( [
  { column_name data_type }
    [, ... ]
] )
"#
            }
            Function::CreateTableAs => {
                r#"
Command: CREATE TABLE AS
Description: define a new table from the results of a query
Syntax:
CREATE TABLE table_name
    [ (column_name [, ...] ) ]
    AS query
    [ WITH [ NO ] DATA ]
"#
            }
            Function::Insert => {
                r#"
Command: INSERT
Description: create new rows in a table
Syntax:
INSERT INTO table_name [ ( column_name [, ...] ) ]
    { VALUES ( { expression } [, ...] ) [, ...] }
"#
            }
            Function::DropTable => {
                r#"
Command: DROP TABLE
Description: remove a table
Syntax:
DROP TABLE [ IF EXISTS ] name [, ...]
"#
            }
        };
        Ok(details)
    }
}

impl FromStr for Function {
    type Err = ();

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        Ok(match s.trim().to_uppercase().as_str() {
            "SELECT" => Self::Select,
            "EXPLAIN" => Self::Explain,
            "SHOW" => Self::Show,
            "CREATE TABLE" => Self::CreateTable,
            "CREATE TABLE AS" => Self::CreateTableAs,
            "INSERT" => Self::Insert,
            "DROP TABLE" => Self::DropTable,
            _ => return Err(()),
        })
    }
}

impl fmt::Display for Function {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match *self {
            Function::Select => write!(f, "SELECT"),
            Function::Explain => write!(f, "EXPLAIN"),
            Function::Show => write!(f, "SHOW"),
            Function::CreateTable => write!(f, "CREATE TABLE"),
            Function::CreateTableAs => write!(f, "CREATE TABLE AS"),
            Function::Insert => write!(f, "INSERT"),
            Function::DropTable => write!(f, "DROP TABLE"),
        }
    }
}

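/// Prints the list of commands for which help is available as a one-column table.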
pub fn display_all_functions() -> Result<()> {
    println!("Available help:");
    let array = StringArray::from(
        ALL_FUNCTIONS
            .iter()
            .map(|f| format!("{f}"))
            .collect::<Vec<String>>(),
    );
    let schema = Schema::new(vec![Field::new("Function", DataType::Utf8, false)]);
    let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)])?;
    println!("{}", pretty_format_batches(&[batch]).unwrap());
    Ok(())
}

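/// In-memory table produced by [`ParquetMetadataFunc`], holding one row per
/// column chunk of the inspected Parquet file.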
#[derive(Debug)]
struct ParquetMetadataTable {
    schema: SchemaRef,
    batch: RecordBatch,
}

#[async_trait]
impl TableProvider for ParquetMetadataTable {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn schema(&self) -> arrow::datatypes::SchemaRef {
        self.schema.clone()
    }

    fn table_type(&self) -> datafusion::logical_expr::TableType {
        datafusion::logical_expr::TableType::Base
    }

    async fn scan(
        &self,
        _state: &dyn Session,
        projection: Option<&Vec<usize>>,
        _filters: &[Expr],
        _limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(MemorySourceConfig::try_new_exec(
            &[vec![self.batch.clone()]],
            TableProvider::schema(self),
            projection.cloned(),
        )?)
    }
}

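/// Renders the min/max statistics of a column chunk as strings, decoding
/// byte arrays as UTF-8 when the converted type marks them as string data.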
fn convert_parquet_statistics(
    value: &Statistics,
    converted_type: ConvertedType,
) -> (Option<String>, Option<String>) {
    match (value, converted_type) {
        (Statistics::Boolean(val), _) => (
            val.min_opt().map(|v| v.to_string()),
            val.max_opt().map(|v| v.to_string()),
        ),
        (Statistics::Int32(val), _) => (
            val.min_opt().map(|v| v.to_string()),
            val.max_opt().map(|v| v.to_string()),
        ),
        (Statistics::Int64(val), _) => (
            val.min_opt().map(|v| v.to_string()),
            val.max_opt().map(|v| v.to_string()),
        ),
        (Statistics::Int96(val), _) => (
            val.min_opt().map(|v| v.to_string()),
            val.max_opt().map(|v| v.to_string()),
        ),
        (Statistics::Float(val), _) => (
            val.min_opt().map(|v| v.to_string()),
            val.max_opt().map(|v| v.to_string()),
        ),
        (Statistics::Double(val), _) => (
            val.min_opt().map(|v| v.to_string()),
            val.max_opt().map(|v| v.to_string()),
        ),
        (Statistics::ByteArray(val), ConvertedType::UTF8) => (
            byte_array_to_string(val.min_opt()),
            byte_array_to_string(val.max_opt()),
        ),
        (Statistics::ByteArray(val), _) => (
            val.min_opt().map(|v| v.to_string()),
            val.max_opt().map(|v| v.to_string()),
        ),
        (Statistics::FixedLenByteArray(val), ConvertedType::UTF8) => (
            fixed_len_byte_array_to_string(val.min_opt()),
            fixed_len_byte_array_to_string(val.max_opt()),
        ),
        (Statistics::FixedLenByteArray(val), _) => (
            val.min_opt().map(|v| v.to_string()),
            val.max_opt().map(|v| v.to_string()),
        ),
    }
}

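/// Converts a [`ByteArray`] to a UTF-8 string if possible, falling back to its
/// `Display` representation otherwise.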
fn byte_array_to_string(val: Option<&ByteArray>) -> Option<String> {
    val.map(|v| {
        v.as_utf8()
            .map(|s| s.to_string())
            .unwrap_or_else(|_e| v.to_string())
    })
}

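/// Same as [`byte_array_to_string`], but for [`FixedLenByteArray`] values.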
fn fixed_len_byte_array_to_string(val: Option<&FixedLenByteArray>) -> Option<String> {
    val.map(|v| {
        v.as_utf8()
            .map(|s| s.to_string())
            .unwrap_or_else(|_e| v.to_string())
    })
}

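/// Table function backing `parquet_metadata(<path>)`, which exposes the
/// row-group and column-chunk metadata of a Parquet file.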
#[derive(Debug)]
pub struct ParquetMetadataFunc {}

impl TableFunctionImpl for ParquetMetadataFunc {
    fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
        let filename = match exprs.first() {
            // a single-quoted argument parses as a string literal
            Some(Expr::Literal(ScalarValue::Utf8(Some(s)), _)) => s,
            // a double-quoted argument parses as an identifier, i.e. a column
            Some(Expr::Column(Column { name, .. })) => name,
            _ => {
                return plan_err!(
                    "parquet_metadata requires string argument as its input"
                );
            }
        };

        let file = File::open(filename.clone())?;
        let reader = SerializedFileReader::new(file)?;
        let metadata = reader.metadata();

        let schema = Arc::new(Schema::new(vec![
            Field::new("filename", DataType::Utf8, true),
            Field::new("row_group_id", DataType::Int64, true),
            Field::new("row_group_num_rows", DataType::Int64, true),
            Field::new("row_group_num_columns", DataType::Int64, true),
            Field::new("row_group_bytes", DataType::Int64, true),
            Field::new("column_id", DataType::Int64, true),
            Field::new("file_offset", DataType::Int64, true),
            Field::new("num_values", DataType::Int64, true),
            Field::new("path_in_schema", DataType::Utf8, true),
            Field::new("type", DataType::Utf8, true),
            Field::new("stats_min", DataType::Utf8, true),
            Field::new("stats_max", DataType::Utf8, true),
            Field::new("stats_null_count", DataType::Int64, true),
            Field::new("stats_distinct_count", DataType::Int64, true),
            Field::new("stats_min_value", DataType::Utf8, true),
            Field::new("stats_max_value", DataType::Utf8, true),
            Field::new("compression", DataType::Utf8, true),
            Field::new("encodings", DataType::Utf8, true),
            Field::new("index_page_offset", DataType::Int64, true),
            Field::new("dictionary_page_offset", DataType::Int64, true),
            Field::new("data_page_offset", DataType::Int64, true),
            Field::new("total_compressed_size", DataType::Int64, true),
            Field::new("total_uncompressed_size", DataType::Int64, true),
        ]));

        let mut filename_arr = vec![];
        let mut row_group_id_arr = vec![];
        let mut row_group_num_rows_arr = vec![];
        let mut row_group_num_columns_arr = vec![];
        let mut row_group_bytes_arr = vec![];
        let mut column_id_arr = vec![];
        let mut file_offset_arr = vec![];
        let mut num_values_arr = vec![];
        let mut path_in_schema_arr = vec![];
        let mut type_arr = vec![];
        let mut stats_min_arr = vec![];
        let mut stats_max_arr = vec![];
        let mut stats_null_count_arr = vec![];
        let mut stats_distinct_count_arr = vec![];
        let mut stats_min_value_arr = vec![];
        let mut stats_max_value_arr = vec![];
        let mut compression_arr = vec![];
        let mut encodings_arr = vec![];
        let mut index_page_offset_arr = vec![];
        let mut dictionary_page_offset_arr = vec![];
        let mut data_page_offset_arr = vec![];
        let mut total_compressed_size_arr = vec![];
        let mut total_uncompressed_size_arr = vec![];
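        // one output row per column chunk: iterate row groups, then their columns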
        for (rg_idx, row_group) in metadata.row_groups().iter().enumerate() {
            for (col_idx, column) in row_group.columns().iter().enumerate() {
                filename_arr.push(filename.clone());
                row_group_id_arr.push(rg_idx as i64);
                row_group_num_rows_arr.push(row_group.num_rows());
                row_group_num_columns_arr.push(row_group.num_columns() as i64);
                row_group_bytes_arr.push(row_group.total_byte_size());
                column_id_arr.push(col_idx as i64);
                file_offset_arr.push(column.file_offset());
                num_values_arr.push(column.num_values());
                path_in_schema_arr.push(column.column_path().to_string());
                type_arr.push(column.column_type().to_string());
                let converted_type = column.column_descr().converted_type();

                if let Some(s) = column.statistics() {
                    let (min_val, max_val) =
                        convert_parquet_statistics(s, converted_type);
                    stats_min_arr.push(min_val.clone());
                    stats_max_arr.push(max_val.clone());
                    stats_null_count_arr.push(s.null_count_opt().map(|c| c as i64));
                    stats_distinct_count_arr
                        .push(s.distinct_count_opt().map(|c| c as i64));
                    stats_min_value_arr.push(min_val);
                    stats_max_value_arr.push(max_val);
                } else {
                    stats_min_arr.push(None);
                    stats_max_arr.push(None);
                    stats_null_count_arr.push(None);
                    stats_distinct_count_arr.push(None);
                    stats_min_value_arr.push(None);
                    stats_max_value_arr.push(None);
                };
                compression_arr.push(format!("{:?}", column.compression()));
                let encodings: Vec<_> = column.encodings().collect();
                encodings_arr.push(format!("{:?}", encodings));
                index_page_offset_arr.push(column.index_page_offset());
                dictionary_page_offset_arr.push(column.dictionary_page_offset());
                data_page_offset_arr.push(column.data_page_offset());
                total_compressed_size_arr.push(column.compressed_size());
                total_uncompressed_size_arr.push(column.uncompressed_size());
            }
        }

        let rb = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(filename_arr)),
                Arc::new(Int64Array::from(row_group_id_arr)),
                Arc::new(Int64Array::from(row_group_num_rows_arr)),
                Arc::new(Int64Array::from(row_group_num_columns_arr)),
                Arc::new(Int64Array::from(row_group_bytes_arr)),
                Arc::new(Int64Array::from(column_id_arr)),
                Arc::new(Int64Array::from(file_offset_arr)),
                Arc::new(Int64Array::from(num_values_arr)),
                Arc::new(StringArray::from(path_in_schema_arr)),
                Arc::new(StringArray::from(type_arr)),
                Arc::new(StringArray::from(stats_min_arr)),
                Arc::new(StringArray::from(stats_max_arr)),
                Arc::new(Int64Array::from(stats_null_count_arr)),
                Arc::new(Int64Array::from(stats_distinct_count_arr)),
                Arc::new(StringArray::from(stats_min_value_arr)),
                Arc::new(StringArray::from(stats_max_value_arr)),
                Arc::new(StringArray::from(compression_arr)),
                Arc::new(StringArray::from(encodings_arr)),
                Arc::new(Int64Array::from(index_page_offset_arr)),
                Arc::new(Int64Array::from(dictionary_page_offset_arr)),
                Arc::new(Int64Array::from(data_page_offset_arr)),
                Arc::new(Int64Array::from(total_compressed_size_arr)),
                Arc::new(Int64Array::from(total_uncompressed_size_arr)),
            ],
        )?;

        let parquet_metadata = ParquetMetadataTable { schema, batch: rb };
        Ok(Arc::new(parquet_metadata))
    }
}

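/// In-memory table produced by [`MetadataCacheFunc`], holding one row per
/// entry in the file metadata cache.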
#[derive(Debug)]
struct MetadataCacheTable {
    schema: SchemaRef,
    batch: RecordBatch,
}

#[async_trait]
impl TableProvider for MetadataCacheTable {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn schema(&self) -> arrow::datatypes::SchemaRef {
        self.schema.clone()
    }

    fn table_type(&self) -> datafusion::logical_expr::TableType {
        datafusion::logical_expr::TableType::Base
    }

    async fn scan(
        &self,
        _state: &dyn Session,
        projection: Option<&Vec<usize>>,
        _filters: &[Expr],
        _limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(MemorySourceConfig::try_new_exec(
            &[vec![self.batch.clone()]],
            TableProvider::schema(self),
            projection.cloned(),
        )?)
    }
}

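/// Table function backing `metadata_cache()`, which lists the entries of the
/// file metadata cache; it takes no arguments.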
#[derive(Debug)]
pub struct MetadataCacheFunc {
    cache_manager: Arc<CacheManager>,
}

impl MetadataCacheFunc {
    pub fn new(cache_manager: Arc<CacheManager>) -> Self {
        Self { cache_manager }
    }
}

impl TableFunctionImpl for MetadataCacheFunc {
    fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
        if !exprs.is_empty() {
            return plan_err!("metadata_cache should have no arguments");
        }

        let schema = Arc::new(Schema::new(vec![
            Field::new("path", DataType::Utf8, false),
            Field::new(
                "file_modified",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("file_size_bytes", DataType::UInt64, false),
            Field::new("e_tag", DataType::Utf8, true),
            Field::new("version", DataType::Utf8, true),
            Field::new("metadata_size_bytes", DataType::UInt64, false),
            Field::new("hits", DataType::UInt64, false),
            Field::new("extra", DataType::Utf8, true),
        ]));

        let mut path_arr = vec![];
        let mut file_modified_arr = vec![];
        let mut file_size_bytes_arr = vec![];
        let mut e_tag_arr = vec![];
        let mut version_arr = vec![];
        let mut metadata_size_bytes = vec![];
        let mut hits_arr = vec![];
        let mut extra_arr = vec![];

        let cached_entries = self.cache_manager.get_file_metadata_cache().list_entries();

        for (path, entry) in cached_entries {
            path_arr.push(path.to_string());
            file_modified_arr
                .push(Some(entry.object_meta.last_modified.timestamp_millis()));
            file_size_bytes_arr.push(entry.object_meta.size);
            e_tag_arr.push(entry.object_meta.e_tag);
            version_arr.push(entry.object_meta.version);
            metadata_size_bytes.push(entry.size_bytes as u64);
            hits_arr.push(entry.hits as u64);

            let mut extra = entry
                .extra
                .iter()
                .map(|(k, v)| format!("{k}={v}"))
                .collect::<Vec<_>>();
            extra.sort();
            extra_arr.push(extra.join(" "));
        }

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(path_arr)),
                Arc::new(TimestampMillisecondArray::from(file_modified_arr)),
                Arc::new(UInt64Array::from(file_size_bytes_arr)),
                Arc::new(StringArray::from(e_tag_arr)),
                Arc::new(StringArray::from(version_arr)),
                Arc::new(UInt64Array::from(metadata_size_bytes)),
                Arc::new(UInt64Array::from(hits_arr)),
                Arc::new(StringArray::from(extra_arr)),
            ],
        )?;

        let metadata_cache = MetadataCacheTable { schema, batch };
        Ok(Arc::new(metadata_cache))
    }
}

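/// In-memory table produced by [`StatisticsCacheFunc`], holding one row per
/// entry in the file statistics cache.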
#[derive(Debug)]
struct StatisticsCacheTable {
    schema: SchemaRef,
    batch: RecordBatch,
}

#[async_trait]
impl TableProvider for StatisticsCacheTable {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn schema(&self) -> arrow::datatypes::SchemaRef {
        self.schema.clone()
    }

    fn table_type(&self) -> datafusion::logical_expr::TableType {
        datafusion::logical_expr::TableType::Base
    }

    async fn scan(
        &self,
        _state: &dyn Session,
        projection: Option<&Vec<usize>>,
        _filters: &[Expr],
        _limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(MemorySourceConfig::try_new_exec(
            &[vec![self.batch.clone()]],
            TableProvider::schema(self),
            projection.cloned(),
        )?)
    }
}

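/// Table function backing `statistics_cache()`, which lists the entries of the
/// file statistics cache; it takes no arguments.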
#[derive(Debug)]
pub struct StatisticsCacheFunc {
    cache_manager: Arc<CacheManager>,
}

impl StatisticsCacheFunc {
    pub fn new(cache_manager: Arc<CacheManager>) -> Self {
        Self { cache_manager }
    }
}

impl TableFunctionImpl for StatisticsCacheFunc {
    fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
        if !exprs.is_empty() {
            return plan_err!("statistics_cache should have no arguments");
        }

        let schema = Arc::new(Schema::new(vec![
            Field::new("path", DataType::Utf8, false),
            Field::new(
                "file_modified",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("file_size_bytes", DataType::UInt64, false),
            Field::new("e_tag", DataType::Utf8, true),
            Field::new("version", DataType::Utf8, true),
            Field::new("num_rows", DataType::Utf8, false),
            Field::new("num_columns", DataType::UInt64, false),
            Field::new("table_size_bytes", DataType::Utf8, false),
            Field::new("statistics_size_bytes", DataType::UInt64, false),
        ]));

        let mut path_arr = vec![];
        let mut file_modified_arr = vec![];
        let mut file_size_bytes_arr = vec![];
        let mut e_tag_arr = vec![];
        let mut version_arr = vec![];
        let mut num_rows_arr = vec![];
        let mut num_columns_arr = vec![];
        let mut table_size_bytes_arr = vec![];
        let mut statistics_size_bytes_arr = vec![];

        if let Some(file_statistics_cache) = self.cache_manager.get_file_statistic_cache()
        {
            for (path, entry) in file_statistics_cache.list_entries() {
                path_arr.push(path.to_string());
                file_modified_arr
                    .push(Some(entry.object_meta.last_modified.timestamp_millis()));
                file_size_bytes_arr.push(entry.object_meta.size);
                e_tag_arr.push(entry.object_meta.e_tag);
                version_arr.push(entry.object_meta.version);
                num_rows_arr.push(entry.num_rows.to_string());
                num_columns_arr.push(entry.num_columns as u64);
                table_size_bytes_arr.push(entry.table_size_bytes.to_string());
                statistics_size_bytes_arr.push(entry.statistics_size_bytes as u64);
            }
        }

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(path_arr)),
                Arc::new(TimestampMillisecondArray::from(file_modified_arr)),
                Arc::new(UInt64Array::from(file_size_bytes_arr)),
                Arc::new(StringArray::from(e_tag_arr)),
                Arc::new(StringArray::from(version_arr)),
                Arc::new(StringArray::from(num_rows_arr)),
                Arc::new(UInt64Array::from(num_columns_arr)),
                Arc::new(StringArray::from(table_size_bytes_arr)),
                Arc::new(UInt64Array::from(statistics_size_bytes_arr)),
            ],
        )?;

        let statistics_cache = StatisticsCacheTable { schema, batch };
        Ok(Arc::new(statistics_cache))
    }
}

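/// In-memory table produced by [`ListFilesCacheFunc`], holding one row per
/// entry in the list-files cache.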
#[derive(Debug)]
struct ListFilesCacheTable {
    schema: SchemaRef,
    batch: RecordBatch,
}

#[async_trait]
impl TableProvider for ListFilesCacheTable {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn schema(&self) -> arrow::datatypes::SchemaRef {
        self.schema.clone()
    }

    fn table_type(&self) -> datafusion::logical_expr::TableType {
        datafusion::logical_expr::TableType::Base
    }

    async fn scan(
        &self,
        _state: &dyn Session,
        projection: Option<&Vec<usize>>,
        _filters: &[Expr],
        _limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(MemorySourceConfig::try_new_exec(
            &[vec![self.batch.clone()]],
            TableProvider::schema(self),
            projection.cloned(),
        )?)
    }
}

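/// Table function backing `list_files_cache()`, which lists the entries of the
/// directory-listing cache; it takes no arguments.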
#[derive(Debug)]
pub struct ListFilesCacheFunc {
    cache_manager: Arc<CacheManager>,
}

impl ListFilesCacheFunc {
    pub fn new(cache_manager: Arc<CacheManager>) -> Self {
        Self { cache_manager }
    }
}

impl TableFunctionImpl for ListFilesCacheFunc {
    fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
        if !exprs.is_empty() {
            return plan_err!("list_files_cache should have no arguments");
        }

        let nested_fields = Fields::from(vec![
            Field::new("file_path", DataType::Utf8, false),
            Field::new(
                "file_modified",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("file_size_bytes", DataType::UInt64, false),
            Field::new("e_tag", DataType::Utf8, true),
            Field::new("version", DataType::Utf8, true),
        ]);

        let metadata_field =
            Field::new("metadata", DataType::Struct(nested_fields.clone()), true);

        let schema = Arc::new(Schema::new(vec![
            Field::new("table", DataType::Utf8, false),
            Field::new("path", DataType::Utf8, false),
            Field::new("metadata_size_bytes", DataType::UInt64, false),
            Field::new(
                "expires_in",
                DataType::Duration(TimeUnit::Millisecond),
                true,
            ),
            Field::new(
                "metadata_list",
                DataType::List(Arc::new(metadata_field.clone())),
                true,
            ),
        ]));

        let mut table_arr = vec![];
        let mut path_arr = vec![];
        let mut metadata_size_bytes_arr = vec![];
        let mut expires_arr = vec![];

        let mut file_path_arr = vec![];
        let mut file_modified_arr = vec![];
        let mut file_size_bytes_arr = vec![];
        let mut etag_arr = vec![];
        let mut version_arr = vec![];
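        // list offsets for the `metadata_list` column: entry i's files occupy
        // rows offsets[i]..offsets[i + 1] of the flattened struct array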
        let mut offsets: Vec<i32> = vec![0];

        if let Some(list_files_cache) = self.cache_manager.get_list_files_cache() {
            let now = Instant::now();
            let mut current_offset: i32 = 0;

            for (path, entry) in list_files_cache.list_entries() {
                table_arr.push(path.table.map_or("NULL".to_string(), |t| t.to_string()));
                path_arr.push(path.path.to_string());
                metadata_size_bytes_arr.push(entry.size_bytes as u64);
                expires_arr.push(
                    entry
                        .expires
                        .map(|t| t.duration_since(now).as_millis() as i64),
                );

                for meta in entry.metas.iter() {
                    file_path_arr.push(meta.location.to_string());
                    file_modified_arr.push(meta.last_modified.timestamp_millis());
                    file_size_bytes_arr.push(meta.size);
                    etag_arr.push(meta.e_tag.clone());
                    version_arr.push(meta.version.clone());
                }
                current_offset += entry.metas.len() as i32;
                offsets.push(current_offset);
            }
        }

        let struct_arr = StructArray::new(
            nested_fields,
            vec![
                Arc::new(StringArray::from(file_path_arr)),
                Arc::new(TimestampMillisecondArray::from(file_modified_arr)),
                Arc::new(UInt64Array::from(file_size_bytes_arr)),
                Arc::new(StringArray::from(etag_arr)),
                Arc::new(StringArray::from(version_arr)),
            ],
            None,
        );

        let offsets_buffer: OffsetBuffer<i32> =
            OffsetBuffer::new(ScalarBuffer::from(Buffer::from_vec(offsets)));

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(table_arr)),
                Arc::new(StringArray::from(path_arr)),
                Arc::new(UInt64Array::from(metadata_size_bytes_arr)),
                Arc::new(DurationMillisecondArray::from(expires_arr)),
                Arc::new(GenericListArray::new(
                    Arc::new(metadata_field),
                    offsets_buffer,
                    Arc::new(struct_arr),
                    None,
                )),
            ],
        )?;

        let list_files_cache = ListFilesCacheTable { schema, batch };
        Ok(Arc::new(list_files_cache))
    }
}