datafusion_physical_plan/
work_table.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Defines the work table query plan
19
20use std::any::Any;
21use std::sync::{Arc, Mutex};
22
23use crate::coop::cooperative;
24use crate::execution_plan::{Boundedness, EmissionType, SchedulingType};
25use crate::memory::MemoryStream;
26use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet};
27use crate::{
28    DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
29    SendableRecordBatchStream, Statistics,
30};
31
32use arrow::datatypes::SchemaRef;
33use arrow::record_batch::RecordBatch;
34use datafusion_common::{internal_datafusion_err, internal_err, Result};
35use datafusion_execution::memory_pool::MemoryReservation;
36use datafusion_execution::TaskContext;
37use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
38
39/// A vector of record batches with a memory reservation.
40#[derive(Debug)]
41pub(super) struct ReservedBatches {
42    batches: Vec<RecordBatch>,
43    #[allow(dead_code)]
44    reservation: MemoryReservation,
45}
46
47impl ReservedBatches {
48    pub(super) fn new(batches: Vec<RecordBatch>, reservation: MemoryReservation) -> Self {
49        ReservedBatches {
50            batches,
51            reservation,
52        }
53    }
54}
55
56/// The name is from PostgreSQL's terminology.
57/// See <https://wiki.postgresql.org/wiki/CTEReadme#How_Recursion_Works>
58/// This table serves as a mirror or buffer between each iteration of a recursive query.
59#[derive(Debug)]
60pub struct WorkTable {
61    batches: Mutex<Option<ReservedBatches>>,
62}
63
64impl WorkTable {
65    /// Create a new work table.
66    pub(super) fn new() -> Self {
67        Self {
68            batches: Mutex::new(None),
69        }
70    }
71
72    /// Take the previously written batches from the work table.
73    /// This will be called by the [`WorkTableExec`] when it is executed.
74    fn take(&self) -> Result<ReservedBatches> {
75        self.batches
76            .lock()
77            .unwrap()
78            .take()
79            .ok_or_else(|| internal_datafusion_err!("Unexpected empty work table"))
80    }
81
82    /// Update the results of a recursive query iteration to the work table.
83    pub(super) fn update(&self, batches: ReservedBatches) {
84        self.batches.lock().unwrap().replace(batches);
85    }
86}
87
88/// A temporary "working table" operation where the input data will be
89/// taken from the named handle during the execution and will be re-published
90/// as is (kind of like a mirror).
91///
92/// Most notably used in the implementation of recursive queries where the
93/// underlying relation does not exist yet but the data will come as the previous
94/// term is evaluated. This table will be used such that the recursive plan
95/// will register a receiver in the task context and this plan will use that
96/// receiver to get the data and stream it back up so that the batches are available
97/// in the next iteration.
98#[derive(Clone, Debug)]
99pub struct WorkTableExec {
100    /// Name of the relation handler
101    name: String,
102    /// The schema of the stream
103    schema: SchemaRef,
104    /// The work table
105    work_table: Arc<WorkTable>,
106    /// Execution metrics
107    metrics: ExecutionPlanMetricsSet,
108    /// Cache holding plan properties like equivalences, output partitioning etc.
109    cache: PlanProperties,
110}
111
112impl WorkTableExec {
113    /// Create a new execution plan for a worktable exec.
114    pub fn new(name: String, schema: SchemaRef) -> Self {
115        let cache = Self::compute_properties(Arc::clone(&schema));
116        Self {
117            name,
118            schema,
119            metrics: ExecutionPlanMetricsSet::new(),
120            work_table: Arc::new(WorkTable::new()),
121            cache,
122        }
123    }
124
125    /// Ref to name
126    pub fn name(&self) -> &str {
127        &self.name
128    }
129
130    /// Arc clone of ref to schema
131    pub fn schema(&self) -> SchemaRef {
132        Arc::clone(&self.schema)
133    }
134
135    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
136    fn compute_properties(schema: SchemaRef) -> PlanProperties {
137        PlanProperties::new(
138            EquivalenceProperties::new(schema),
139            Partitioning::UnknownPartitioning(1),
140            EmissionType::Incremental,
141            Boundedness::Bounded,
142        )
143        .with_scheduling_type(SchedulingType::Cooperative)
144    }
145}
146
147impl DisplayAs for WorkTableExec {
148    fn fmt_as(
149        &self,
150        t: DisplayFormatType,
151        f: &mut std::fmt::Formatter,
152    ) -> std::fmt::Result {
153        match t {
154            DisplayFormatType::Default | DisplayFormatType::Verbose => {
155                write!(f, "WorkTableExec: name={}", self.name)
156            }
157            DisplayFormatType::TreeRender => {
158                write!(f, "name={}", self.name)
159            }
160        }
161    }
162}
163
164impl ExecutionPlan for WorkTableExec {
165    fn name(&self) -> &'static str {
166        "WorkTableExec"
167    }
168
169    fn as_any(&self) -> &dyn Any {
170        self
171    }
172
173    fn properties(&self) -> &PlanProperties {
174        &self.cache
175    }
176
177    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
178        vec![]
179    }
180
181    fn with_new_children(
182        self: Arc<Self>,
183        _: Vec<Arc<dyn ExecutionPlan>>,
184    ) -> Result<Arc<dyn ExecutionPlan>> {
185        Ok(Arc::clone(&self) as Arc<dyn ExecutionPlan>)
186    }
187
188    /// Stream the batches that were written to the work table.
189    fn execute(
190        &self,
191        partition: usize,
192        _context: Arc<TaskContext>,
193    ) -> Result<SendableRecordBatchStream> {
194        // WorkTable streams must be the plan base.
195        if partition != 0 {
196            return internal_err!(
197                "WorkTableExec got an invalid partition {partition} (expected 0)"
198            );
199        }
200        let batch = self.work_table.take()?;
201
202        let stream =
203            MemoryStream::try_new(batch.batches, Arc::clone(&self.schema), None)?
204                .with_reservation(batch.reservation);
205        Ok(Box::pin(cooperative(stream)))
206    }
207
208    fn metrics(&self) -> Option<MetricsSet> {
209        Some(self.metrics.clone_inner())
210    }
211
212    fn statistics(&self) -> Result<Statistics> {
213        Ok(Statistics::new_unknown(&self.schema()))
214    }
215
216    fn partition_statistics(&self, _partition: Option<usize>) -> Result<Statistics> {
217        Ok(Statistics::new_unknown(&self.schema()))
218    }
219
220    /// Injects run-time state into this `WorkTableExec`.
221    ///
222    /// The only state this node currently understands is an [`Arc<WorkTable>`].
223    /// If `state` can be down-cast to that type, a new `WorkTableExec` backed
224    /// by the provided work table is returned.  Otherwise `None` is returned
225    /// so that callers can attempt to propagate the state further down the
226    /// execution plan tree.
227    fn with_new_state(
228        &self,
229        state: Arc<dyn Any + Send + Sync>,
230    ) -> Option<Arc<dyn ExecutionPlan>> {
231        // Down-cast to the expected state type; propagate `None` on failure
232        let work_table = state.downcast::<WorkTable>().ok()?;
233
234        Some(Arc::new(Self {
235            name: self.name.clone(),
236            schema: Arc::clone(&self.schema),
237            metrics: ExecutionPlanMetricsSet::new(),
238            work_table,
239            cache: self.cache.clone(),
240        }))
241    }
242}
243
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{ArrayRef, Int32Array};
    use datafusion_execution::memory_pool::{MemoryConsumer, UnboundedMemoryPool};

    #[test]
    fn test_work_table() {
        let work_table = WorkTable::new();
        // Can't take from empty work_table
        assert!(work_table.take().is_err());

        let pool = Arc::new(UnboundedMemoryPool::default()) as _;
        let mut reservation = MemoryConsumer::new("test_work_table").register(&pool);

        // Update batch to work_table
        let array: ArrayRef = Arc::new((0..5).collect::<Int32Array>());
        let batch = RecordBatch::try_from_iter(vec![("col", array)]).unwrap();
        reservation.try_grow(100).unwrap();
        work_table.update(ReservedBatches::new(vec![batch.clone()], reservation));
        // Take from work_table
        let reserved_batches = work_table.take().unwrap();
        assert_eq!(reserved_batches.batches, vec![batch.clone()]);

        // `take` drains the work table, so a second take must fail
        assert!(work_table.take().is_err());

        // Consume the batch by the MemoryStream
        let memory_stream =
            MemoryStream::try_new(reserved_batches.batches, batch.schema(), None)
                .unwrap()
                .with_reservation(reserved_batches.reservation);

        // Should still be reserved
        assert_eq!(pool.reserved(), 100);

        // The reservation should be freed after drop the memory_stream
        drop(memory_stream);
        assert_eq!(pool.reserved(), 0);
    }

    #[test]
    fn test_work_table_update_replaces_untaken_batches() {
        let work_table = WorkTable::new();
        let pool = Arc::new(UnboundedMemoryPool::default()) as _;

        let mut first = MemoryConsumer::new("first").register(&pool);
        first.try_grow(10).unwrap();
        work_table.update(ReservedBatches::new(vec![], first));

        let mut second = MemoryConsumer::new("second").register(&pool);
        second.try_grow(20).unwrap();
        work_table.update(ReservedBatches::new(vec![], second));

        // The first (never taken) batches are dropped on replacement, which
        // releases their reservation
        assert_eq!(pool.reserved(), 20);

        let reserved_batches = work_table.take().unwrap();
        assert!(reserved_batches.batches.is_empty());
        drop(reserved_batches);
        assert_eq!(pool.reserved(), 0);
    }
}