Skip to main content

datafusion_catalog/
view.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! View data source which uses a `LogicalPlan` as its input.
19
20use std::{borrow::Cow, sync::Arc};
21
22use crate::Session;
23use crate::TableProvider;
24
25use arrow::datatypes::SchemaRef;
26use async_trait::async_trait;
27use datafusion_common::Column;
28use datafusion_common::error::Result;
29use datafusion_expr::TableType;
30use datafusion_expr::{Expr, LogicalPlan};
31use datafusion_expr::{LogicalPlanBuilder, TableProviderFilterPushDown};
32use datafusion_physical_plan::ExecutionPlan;
33
/// An implementation of `TableProvider` that uses another logical plan.
///
/// The wrapped plan is stored together with the Arrow schema derived from it
/// and, optionally, the SQL text that was used to define the view.
#[derive(Debug)]
pub struct ViewTable {
    /// LogicalPlan of the view
    logical_plan: LogicalPlan,
    /// Arrow schema of the view, taken from `logical_plan`'s schema when the
    /// view is constructed
    table_schema: SchemaRef,
    /// SQL used to create the view, if available
    definition: Option<String>,
}
44
45impl ViewTable {
46    /// Create new view that is executed at query runtime.
47    ///
48    /// Takes a `LogicalPlan` and optionally the SQL text of the `CREATE`
49    /// statement.
50    ///
51    /// Notes: the `LogicalPlan` is not validated or type coerced. If this is
52    /// needed it should be done after calling this function.
53    pub fn new(logical_plan: LogicalPlan, definition: Option<String>) -> Self {
54        let table_schema = Arc::clone(logical_plan.schema().inner());
55        Self {
56            logical_plan,
57            table_schema,
58            definition,
59        }
60    }
61
62    #[deprecated(
63        since = "47.0.0",
64        note = "Use `ViewTable::new` instead and apply TypeCoercion to the logical plan if needed"
65    )]
66    pub fn try_new(
67        logical_plan: LogicalPlan,
68        definition: Option<String>,
69    ) -> Result<Self> {
70        Ok(Self::new(logical_plan, definition))
71    }
72
73    /// Get definition ref
74    pub fn definition(&self) -> Option<&String> {
75        self.definition.as_ref()
76    }
77
78    /// Get logical_plan ref
79    pub fn logical_plan(&self) -> &LogicalPlan {
80        &self.logical_plan
81    }
82}
83
84#[async_trait]
85impl TableProvider for ViewTable {
86    fn get_logical_plan(&'_ self) -> Option<Cow<'_, LogicalPlan>> {
87        Some(Cow::Borrowed(&self.logical_plan))
88    }
89
90    fn schema(&self) -> SchemaRef {
91        Arc::clone(&self.table_schema)
92    }
93
94    fn table_type(&self) -> TableType {
95        TableType::View
96    }
97
98    fn get_table_definition(&self) -> Option<&str> {
99        self.definition.as_deref()
100    }
101    fn supports_filters_pushdown(
102        &self,
103        filters: &[&Expr],
104    ) -> Result<Vec<TableProviderFilterPushDown>> {
105        // A filter is added on the View when given
106        Ok(vec![TableProviderFilterPushDown::Exact; filters.len()])
107    }
108
109    async fn scan(
110        &self,
111        state: &dyn Session,
112        projection: Option<&Vec<usize>>,
113        filters: &[Expr],
114        limit: Option<usize>,
115    ) -> Result<Arc<dyn ExecutionPlan>> {
116        let filter = filters.iter().cloned().reduce(|acc, new| acc.and(new));
117        let plan = self.logical_plan().clone();
118        let mut plan = LogicalPlanBuilder::from(plan);
119
120        if let Some(filter) = filter {
121            plan = plan.filter(filter)?;
122        }
123
124        let mut plan = if let Some(projection) = projection {
125            // avoiding adding a redundant projection (e.g. SELECT * FROM view)
126            let current_projection =
127                (0..plan.schema().fields().len()).collect::<Vec<usize>>();
128            if projection == &current_projection {
129                plan
130            } else {
131                let fields: Vec<Expr> = projection
132                    .iter()
133                    .map(|i| {
134                        Expr::Column(Column::from(
135                            self.logical_plan.schema().qualified_field(*i),
136                        ))
137                    })
138                    .collect();
139                plan.project(fields)?
140            }
141        } else {
142            plan
143        };
144
145        if let Some(limit) = limit {
146            plan = plan.limit(0, Some(limit))?;
147        }
148
149        state.create_physical_plan(&plan.build()?).await
150    }
151}