datafusion_catalog/memory/
table.rs1use std::any::Any;
21use std::collections::HashMap;
22use std::fmt::Debug;
23use std::sync::Arc;
24
25use crate::TableProvider;
26use datafusion_common::error::Result;
27use datafusion_expr::Expr;
28use datafusion_expr::TableType;
29use datafusion_physical_expr::create_physical_sort_exprs;
30use datafusion_physical_plan::repartition::RepartitionExec;
31use datafusion_physical_plan::{
32 common, ExecutionPlan, ExecutionPlanProperties, Partitioning,
33};
34
35use arrow::datatypes::SchemaRef;
36use arrow::record_batch::RecordBatch;
37use datafusion_common::{not_impl_err, plan_err, Constraints, DFSchema, SchemaExt};
38use datafusion_common_runtime::JoinSet;
39use datafusion_datasource::memory::MemSink;
40use datafusion_datasource::memory::MemorySourceConfig;
41use datafusion_datasource::sink::DataSinkExec;
42use datafusion_datasource::source::DataSourceExec;
43use datafusion_expr::dml::InsertOp;
44use datafusion_expr::SortExpr;
45use datafusion_session::Session;
46
47use async_trait::async_trait;
48use futures::StreamExt;
49use log::debug;
50use parking_lot::Mutex;
51use tokio::sync::RwLock;
52
53pub use datafusion_datasource::memory::PartitionData;
55
56#[derive(Debug)]
61pub struct MemTable {
62 schema: SchemaRef,
63 pub batches: Vec<PartitionData>,
65 constraints: Constraints,
66 column_defaults: HashMap<String, Expr>,
67 pub sort_order: Arc<Mutex<Vec<Vec<SortExpr>>>>,
70}
71
72impl MemTable {
73 pub fn try_new(schema: SchemaRef, partitions: Vec<Vec<RecordBatch>>) -> Result<Self> {
75 for batches in partitions.iter().flatten() {
76 let batches_schema = batches.schema();
77 if !schema.contains(&batches_schema) {
78 debug!(
79 "mem table schema does not contain batches schema. \
80 Target_schema: {schema:?}. Batches Schema: {batches_schema:?}"
81 );
82 return plan_err!("Mismatch between schema and batches");
83 }
84 }
85
86 Ok(Self {
87 schema,
88 batches: partitions
89 .into_iter()
90 .map(|e| Arc::new(RwLock::new(e)))
91 .collect::<Vec<_>>(),
92 constraints: Constraints::empty(),
93 column_defaults: HashMap::new(),
94 sort_order: Arc::new(Mutex::new(vec![])),
95 })
96 }
97
98 pub fn with_constraints(mut self, constraints: Constraints) -> Self {
100 self.constraints = constraints;
101 self
102 }
103
104 pub fn with_column_defaults(
106 mut self,
107 column_defaults: HashMap<String, Expr>,
108 ) -> Self {
109 self.column_defaults = column_defaults;
110 self
111 }
112
113 pub fn with_sort_order(self, mut sort_order: Vec<Vec<SortExpr>>) -> Self {
124 std::mem::swap(self.sort_order.lock().as_mut(), &mut sort_order);
125 self
126 }
127
128 pub async fn load(
130 t: Arc<dyn TableProvider>,
131 output_partitions: Option<usize>,
132 state: &dyn Session,
133 ) -> Result<Self> {
134 let schema = t.schema();
135 let constraints = t.constraints();
136 let exec = t.scan(state, None, &[], None).await?;
137 let partition_count = exec.output_partitioning().partition_count();
138
139 let mut join_set = JoinSet::new();
140
141 for part_idx in 0..partition_count {
142 let task = state.task_ctx();
143 let exec = Arc::clone(&exec);
144 join_set.spawn(async move {
145 let stream = exec.execute(part_idx, task)?;
146 common::collect(stream).await
147 });
148 }
149
150 let mut data: Vec<Vec<RecordBatch>> =
151 Vec::with_capacity(exec.output_partitioning().partition_count());
152
153 while let Some(result) = join_set.join_next().await {
154 match result {
155 Ok(res) => data.push(res?),
156 Err(e) => {
157 if e.is_panic() {
158 std::panic::resume_unwind(e.into_panic());
159 } else {
160 unreachable!();
161 }
162 }
163 }
164 }
165
166 let mut exec = DataSourceExec::new(Arc::new(MemorySourceConfig::try_new(
167 &data,
168 Arc::clone(&schema),
169 None,
170 )?));
171 if let Some(cons) = constraints {
172 exec = exec.with_constraints(cons.clone());
173 }
174
175 if let Some(num_partitions) = output_partitions {
176 let exec = RepartitionExec::try_new(
177 Arc::new(exec),
178 Partitioning::RoundRobinBatch(num_partitions),
179 )?;
180
181 let mut output_partitions = vec![];
183 for i in 0..exec.properties().output_partitioning().partition_count() {
184 let task_ctx = state.task_ctx();
186 let mut stream = exec.execute(i, task_ctx)?;
187 let mut batches = vec![];
188 while let Some(result) = stream.next().await {
189 batches.push(result?);
190 }
191 output_partitions.push(batches);
192 }
193
194 return MemTable::try_new(Arc::clone(&schema), output_partitions);
195 }
196 MemTable::try_new(Arc::clone(&schema), data)
197 }
198}
199
200#[async_trait]
201impl TableProvider for MemTable {
202 fn as_any(&self) -> &dyn Any {
203 self
204 }
205
206 fn schema(&self) -> SchemaRef {
207 Arc::clone(&self.schema)
208 }
209
210 fn constraints(&self) -> Option<&Constraints> {
211 Some(&self.constraints)
212 }
213
214 fn table_type(&self) -> TableType {
215 TableType::Base
216 }
217
218 async fn scan(
219 &self,
220 state: &dyn Session,
221 projection: Option<&Vec<usize>>,
222 _filters: &[Expr],
223 _limit: Option<usize>,
224 ) -> Result<Arc<dyn ExecutionPlan>> {
225 let mut partitions = vec![];
226 for arc_inner_vec in self.batches.iter() {
227 let inner_vec = arc_inner_vec.read().await;
228 partitions.push(inner_vec.clone())
229 }
230
231 let mut source =
232 MemorySourceConfig::try_new(&partitions, self.schema(), projection.cloned())?;
233
234 let show_sizes = state.config_options().explain.show_sizes;
235 source = source.with_show_sizes(show_sizes);
236
237 let sort_order = self.sort_order.lock();
239 if !sort_order.is_empty() {
240 let df_schema = DFSchema::try_from(self.schema.as_ref().clone())?;
241
242 let file_sort_order = sort_order
243 .iter()
244 .map(|sort_exprs| {
245 create_physical_sort_exprs(
246 sort_exprs,
247 &df_schema,
248 state.execution_props(),
249 )
250 })
251 .collect::<Result<Vec<_>>>()?;
252 source = source.try_with_sort_information(file_sort_order)?;
253 }
254
255 Ok(DataSourceExec::from_data_source(source))
256 }
257
258 async fn insert_into(
273 &self,
274 _state: &dyn Session,
275 input: Arc<dyn ExecutionPlan>,
276 insert_op: InsertOp,
277 ) -> Result<Arc<dyn ExecutionPlan>> {
278 *self.sort_order.lock() = vec![];
280
281 self.schema()
284 .logically_equivalent_names_and_types(&input.schema())?;
285
286 if insert_op != InsertOp::Append {
287 return not_impl_err!("{insert_op} not implemented for MemoryTable yet");
288 }
289 let sink = MemSink::try_new(self.batches.clone(), Arc::clone(&self.schema))?;
290 Ok(Arc::new(DataSinkExec::new(input, Arc::new(sink), None)))
291 }
292
293 fn get_column_default(&self, column: &str) -> Option<&Expr> {
294 self.column_defaults.get(column)
295 }
296}