datafusion_catalog/
cte_worktable.rs1use std::any::Any;
21use std::borrow::Cow;
22use std::sync::Arc;
23
24use arrow::datatypes::SchemaRef;
25use async_trait::async_trait;
26use datafusion_common::error::Result;
27use datafusion_expr::{Expr, LogicalPlan, TableProviderFilterPushDown, TableType};
28use datafusion_physical_plan::ExecutionPlan;
29use datafusion_physical_plan::work_table::WorkTableExec;
30
31use crate::{ScanArgs, ScanResult, Session, TableProvider};
32
33#[derive(Debug)]
37pub struct CteWorkTable {
38 name: String,
40 table_schema: SchemaRef,
42}
43
44impl CteWorkTable {
45 pub fn new(name: &str, table_schema: SchemaRef) -> Self {
49 Self {
50 name: name.to_owned(),
51 table_schema,
52 }
53 }
54
55 pub fn name(&self) -> &str {
57 &self.name
58 }
59
60 pub fn schema(&self) -> SchemaRef {
62 Arc::clone(&self.table_schema)
63 }
64}
65
66#[async_trait]
67impl TableProvider for CteWorkTable {
68 fn as_any(&self) -> &dyn Any {
69 self
70 }
71
72 fn get_logical_plan(&'_ self) -> Option<Cow<'_, LogicalPlan>> {
73 None
74 }
75
76 fn schema(&self) -> SchemaRef {
77 Arc::clone(&self.table_schema)
78 }
79
80 fn table_type(&self) -> TableType {
81 TableType::Temporary
82 }
83
84 async fn scan(
85 &self,
86 state: &dyn Session,
87 projection: Option<&Vec<usize>>,
88 filters: &[Expr],
89 limit: Option<usize>,
90 ) -> Result<Arc<dyn ExecutionPlan>> {
91 let options = ScanArgs::default()
92 .with_projection(projection.map(|p| p.as_slice()))
93 .with_filters(Some(filters))
94 .with_limit(limit);
95 Ok(self.scan_with_args(state, options).await?.into_inner())
96 }
97
98 async fn scan_with_args<'a>(
99 &self,
100 _state: &dyn Session,
101 args: ScanArgs<'a>,
102 ) -> Result<ScanResult> {
103 Ok(ScanResult::new(Arc::new(WorkTableExec::new(
104 self.name.clone(),
105 Arc::clone(&self.table_schema),
106 args.projection().map(|p| p.to_vec()),
107 )?)))
108 }
109
110 fn supports_filters_pushdown(
111 &self,
112 filters: &[&Expr],
113 ) -> Result<Vec<TableProviderFilterPushDown>> {
114 Ok(vec![
116 TableProviderFilterPushDown::Unsupported;
117 filters.len()
118 ])
119 }
120}