datafusion_catalog/
view.rs1use std::{borrow::Cow, sync::Arc};
21
22use crate::Session;
23use crate::TableProvider;
24
25use arrow::datatypes::SchemaRef;
26use async_trait::async_trait;
27use datafusion_common::Column;
28use datafusion_common::error::Result;
29use datafusion_expr::TableType;
30use datafusion_expr::{Expr, LogicalPlan};
31use datafusion_expr::{LogicalPlanBuilder, TableProviderFilterPushDown};
32use datafusion_physical_plan::ExecutionPlan;
33
34#[derive(Debug)]
36pub struct ViewTable {
37 logical_plan: LogicalPlan,
39 table_schema: SchemaRef,
41 definition: Option<String>,
43}
44
45impl ViewTable {
46 pub fn new(logical_plan: LogicalPlan, definition: Option<String>) -> Self {
54 let table_schema = Arc::clone(logical_plan.schema().inner());
55 Self {
56 logical_plan,
57 table_schema,
58 definition,
59 }
60 }
61
62 #[deprecated(
63 since = "47.0.0",
64 note = "Use `ViewTable::new` instead and apply TypeCoercion to the logical plan if needed"
65 )]
66 pub fn try_new(
67 logical_plan: LogicalPlan,
68 definition: Option<String>,
69 ) -> Result<Self> {
70 Ok(Self::new(logical_plan, definition))
71 }
72
73 pub fn definition(&self) -> Option<&String> {
75 self.definition.as_ref()
76 }
77
78 pub fn logical_plan(&self) -> &LogicalPlan {
80 &self.logical_plan
81 }
82}
83
84#[async_trait]
85impl TableProvider for ViewTable {
86 fn get_logical_plan(&'_ self) -> Option<Cow<'_, LogicalPlan>> {
87 Some(Cow::Borrowed(&self.logical_plan))
88 }
89
90 fn schema(&self) -> SchemaRef {
91 Arc::clone(&self.table_schema)
92 }
93
94 fn table_type(&self) -> TableType {
95 TableType::View
96 }
97
98 fn get_table_definition(&self) -> Option<&str> {
99 self.definition.as_deref()
100 }
101 fn supports_filters_pushdown(
102 &self,
103 filters: &[&Expr],
104 ) -> Result<Vec<TableProviderFilterPushDown>> {
105 Ok(vec![TableProviderFilterPushDown::Exact; filters.len()])
107 }
108
109 async fn scan(
110 &self,
111 state: &dyn Session,
112 projection: Option<&Vec<usize>>,
113 filters: &[Expr],
114 limit: Option<usize>,
115 ) -> Result<Arc<dyn ExecutionPlan>> {
116 let filter = filters.iter().cloned().reduce(|acc, new| acc.and(new));
117 let plan = self.logical_plan().clone();
118 let mut plan = LogicalPlanBuilder::from(plan);
119
120 if let Some(filter) = filter {
121 plan = plan.filter(filter)?;
122 }
123
124 let mut plan = if let Some(projection) = projection {
125 let current_projection =
127 (0..plan.schema().fields().len()).collect::<Vec<usize>>();
128 if projection == ¤t_projection {
129 plan
130 } else {
131 let fields: Vec<Expr> = projection
132 .iter()
133 .map(|i| {
134 Expr::Column(Column::from(
135 self.logical_plan.schema().qualified_field(*i),
136 ))
137 })
138 .collect();
139 plan.project(fields)?
140 }
141 } else {
142 plan
143 };
144
145 if let Some(limit) = limit {
146 plan = plan.limit(0, Some(limit))?;
147 }
148
149 state.create_physical_plan(&plan.build()?).await
150 }
151}