datafusion_catalog/memory/table.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`MemTable`] for querying `Vec<RecordBatch>` by DataFusion.

use std::any::Any;
use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::Arc;

use crate::TableProvider;
use datafusion_common::error::Result;
use datafusion_expr::Expr;
use datafusion_expr::TableType;
use datafusion_physical_expr::create_physical_sort_exprs;
use datafusion_physical_plan::repartition::RepartitionExec;
use datafusion_physical_plan::{
    common, ExecutionPlan, ExecutionPlanProperties, Partitioning,
};

use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion_common::{not_impl_err, plan_err, Constraints, DFSchema, SchemaExt};
use datafusion_common_runtime::JoinSet;
use datafusion_datasource::memory::MemSink;
use datafusion_datasource::memory::MemorySourceConfig;
use datafusion_datasource::sink::DataSinkExec;
use datafusion_datasource::source::DataSourceExec;
use datafusion_expr::dml::InsertOp;
use datafusion_expr::SortExpr;
use datafusion_session::Session;

use async_trait::async_trait;
use futures::StreamExt;
use log::debug;
use parking_lot::Mutex;
use tokio::sync::RwLock;

// backward compatibility
pub use datafusion_datasource::memory::PartitionData;

/// In-memory data source for presenting a `Vec<RecordBatch>` as a
/// data source that can be queried by DataFusion. This allows data to
/// be pre-loaded into memory and then repeatedly queried without
/// incurring additional file I/O overhead.
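///
/// # Example
///
/// A minimal sketch of wrapping a single `RecordBatch` in a `MemTable`
/// (the column name and values are illustrative):
///
/// ```ignore
/// use std::sync::Arc;
///
/// use arrow::array::Int32Array;
/// use arrow::datatypes::{DataType, Field, Schema};
/// use arrow::record_batch::RecordBatch;
///
/// let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
/// let batch = RecordBatch::try_new(
///     Arc::clone(&schema),
///     vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
/// )?;
/// // Each inner `Vec<RecordBatch>` becomes one partition of the table
/// let table = MemTable::try_new(schema, vec![vec![batch]])?;
/// ```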
#[derive(Debug)]
pub struct MemTable {
    schema: SchemaRef,
    // batches used to be pub(crate), but it needs to be public for the tests
    pub batches: Vec<PartitionData>,
    constraints: Constraints,
    column_defaults: HashMap<String, Expr>,
    /// Optional pre-known sort order(s). Must be `SortExpr`s.
    /// Inserting data into this table removes the order.
    pub sort_order: Arc<Mutex<Vec<Vec<SortExpr>>>>,
}

impl MemTable {
    /// Create a new in-memory table from the provided schema and record batches
    pub fn try_new(schema: SchemaRef, partitions: Vec<Vec<RecordBatch>>) -> Result<Self> {
        for batches in partitions.iter().flatten() {
            let batches_schema = batches.schema();
            if !schema.contains(&batches_schema) {
                debug!(
                    "mem table schema does not contain batches schema. \
                        Target_schema: {schema:?}. Batches Schema: {batches_schema:?}"
                );
                return plan_err!("Mismatch between schema and batches");
            }
        }

        Ok(Self {
            schema,
            batches: partitions
                .into_iter()
                .map(|e| Arc::new(RwLock::new(e)))
                .collect::<Vec<_>>(),
            constraints: Constraints::empty(),
            column_defaults: HashMap::new(),
            sort_order: Arc::new(Mutex::new(vec![])),
        })
    }

    /// Assign constraints
    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
        self.constraints = constraints;
        self
    }

    /// Assign column defaults
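    ///
    /// For example, a brief sketch (the column name `id` and its default are
    /// illustrative):
    ///
    /// ```ignore
    /// use std::collections::HashMap;
    ///
    /// use datafusion_expr::lit;
    ///
    /// // Rows inserted without an explicit `id` value default to 0
    /// let defaults = HashMap::from([("id".to_string(), lit(0))]);
    /// let table = table.with_column_defaults(defaults);
    /// ```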
    pub fn with_column_defaults(
        mut self,
        column_defaults: HashMap<String, Expr>,
    ) -> Self {
        self.column_defaults = column_defaults;
        self
    }

    /// Specify optional pre-known sort order(s). Must be `SortExpr`s.
    ///
    /// If the data is not sorted by this order, DataFusion may produce
    /// incorrect results.
    ///
    /// DataFusion may take advantage of this ordering to omit sorts
    /// or use more efficient algorithms.
    ///
    /// Note that multiple sort orders are supported when several orderings
    /// are known to be equivalent.
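    ///
    /// For example, a sketch assuming the batches really are sorted by a column
    /// named `ts` in ascending order (the column name is illustrative):
    ///
    /// ```ignore
    /// use datafusion_expr::col;
    ///
    /// // `Expr::sort(asc, nulls_first)` builds a `SortExpr`
    /// let table = table.with_sort_order(vec![vec![col("ts").sort(true, true)]]);
    /// ```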
    pub fn with_sort_order(self, mut sort_order: Vec<Vec<SortExpr>>) -> Self {
        std::mem::swap(self.sort_order.lock().as_mut(), &mut sort_order);
        self
    }

    /// Create a `MemTable` by reading from another data source
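    ///
    /// For example, a sketch (here `SessionContext` comes from the `datafusion`
    /// crate and `provider` is some existing `TableProvider`):
    ///
    /// ```ignore
    /// let ctx = SessionContext::new();
    /// // Materialize `provider` into memory, repartitioned into 4 partitions
    /// let mem_table = MemTable::load(provider, Some(4), &ctx.state()).await?;
    /// ```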
    pub async fn load(
        t: Arc<dyn TableProvider>,
        output_partitions: Option<usize>,
        state: &dyn Session,
    ) -> Result<Self> {
        let schema = t.schema();
        let constraints = t.constraints();
        let exec = t.scan(state, None, &[], None).await?;
        let partition_count = exec.output_partitioning().partition_count();

        let mut join_set = JoinSet::new();

        for part_idx in 0..partition_count {
            let task = state.task_ctx();
            let exec = Arc::clone(&exec);
            join_set.spawn(async move {
                let stream = exec.execute(part_idx, task)?;
                common::collect(stream).await
            });
        }

        let mut data: Vec<Vec<RecordBatch>> =
            Vec::with_capacity(exec.output_partitioning().partition_count());

        while let Some(result) = join_set.join_next().await {
            match result {
                Ok(res) => data.push(res?),
                Err(e) => {
                    if e.is_panic() {
                        std::panic::resume_unwind(e.into_panic());
                    } else {
                        unreachable!();
                    }
                }
            }
        }

        let mut exec = DataSourceExec::new(Arc::new(MemorySourceConfig::try_new(
            &data,
            Arc::clone(&schema),
            None,
        )?));
        if let Some(cons) = constraints {
            exec = exec.with_constraints(cons.clone());
        }

        if let Some(num_partitions) = output_partitions {
            let exec = RepartitionExec::try_new(
                Arc::new(exec),
                Partitioning::RoundRobinBatch(num_partitions),
            )?;

            // execute and collect results
            let mut output_partitions = vec![];
            for i in 0..exec.properties().output_partitioning().partition_count() {
                // execute this *output* partition and collect all batches
                let task_ctx = state.task_ctx();
                let mut stream = exec.execute(i, task_ctx)?;
                let mut batches = vec![];
                while let Some(result) = stream.next().await {
                    batches.push(result?);
                }
                output_partitions.push(batches);
            }

            return MemTable::try_new(Arc::clone(&schema), output_partitions);
        }
        MemTable::try_new(Arc::clone(&schema), data)
    }
}

#[async_trait]
impl TableProvider for MemTable {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    fn constraints(&self) -> Option<&Constraints> {
        Some(&self.constraints)
    }

    fn table_type(&self) -> TableType {
        TableType::Base
    }

    async fn scan(
        &self,
        state: &dyn Session,
        projection: Option<&Vec<usize>>,
        _filters: &[Expr],
        _limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let mut partitions = vec![];
        for arc_inner_vec in self.batches.iter() {
            let inner_vec = arc_inner_vec.read().await;
            partitions.push(inner_vec.clone())
        }

        let mut source =
            MemorySourceConfig::try_new(&partitions, self.schema(), projection.cloned())?;

        let show_sizes = state.config_options().explain.show_sizes;
        source = source.with_show_sizes(show_sizes);

        // add sort information if present
        let sort_order = self.sort_order.lock();
        if !sort_order.is_empty() {
            let df_schema = DFSchema::try_from(self.schema.as_ref().clone())?;

            let file_sort_order = sort_order
                .iter()
                .map(|sort_exprs| {
                    create_physical_sort_exprs(
                        sort_exprs,
                        &df_schema,
                        state.execution_props(),
                    )
                })
                .collect::<Result<Vec<_>>>()?;
            source = source.try_with_sort_information(file_sort_order)?;
        }

        Ok(DataSourceExec::from_data_source(source))
    }

    /// Returns an ExecutionPlan that inserts the execution results of a given [`ExecutionPlan`] into this [`MemTable`].
    ///
    /// The [`ExecutionPlan`] must have the same schema as this [`MemTable`].
    ///
    /// # Arguments
    ///
    /// * `state` - The [`SessionState`] containing the context for executing the plan.
    /// * `input` - The [`ExecutionPlan`] to execute and insert.
    ///
    /// # Returns
    ///
    /// * A plan that returns the number of rows written.
    ///
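    /// This is normally invoked by the query engine rather than called directly;
    /// a sketch at the SQL level (using a `SessionContext` from the `datafusion`
    /// crate, with this table registered as `t`; the table name and values are
    /// illustrative):
    ///
    /// ```ignore
    /// ctx.register_table("t", Arc::new(mem_table))?;
    /// // Planning and executing the INSERT exercises `insert_into`
    /// ctx.sql("INSERT INTO t VALUES (1), (2)").await?.collect().await?;
    /// ```
    ///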
    /// [`SessionState`]: https://docs.rs/datafusion/latest/datafusion/execution/session_state/struct.SessionState.html
    async fn insert_into(
        &self,
        _state: &dyn Session,
        input: Arc<dyn ExecutionPlan>,
        insert_op: InsertOp,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // Inserting into the table may invalidate any pre-known sort order, so reset it here
        *self.sort_order.lock() = vec![];

        // Check that the schema of the input plan matches the schema of this table.
        self.schema()
            .logically_equivalent_names_and_types(&input.schema())?;

        if insert_op != InsertOp::Append {
            return not_impl_err!("{insert_op} not implemented for MemoryTable yet");
        }
        let sink = MemSink::try_new(self.batches.clone(), Arc::clone(&self.schema))?;
        Ok(Arc::new(DataSinkExec::new(input, Arc::new(sink), None)))
    }

    fn get_column_default(&self, column: &str) -> Option<&Expr> {
        self.column_defaults.get(column)
    }
}