// datafusion_physical_plan/work_table.rs

use std::any::Any;
use std::sync::{Arc, Mutex};

use crate::coop::cooperative;
use crate::execution_plan::{Boundedness, EmissionType, SchedulingType};
use crate::memory::MemoryStream;
use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::{
    DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
    SendableRecordBatchStream, Statistics,
};

use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion_common::{internal_datafusion_err, internal_err, Result};
use datafusion_execution::memory_pool::MemoryReservation;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{EquivalenceProperties, Partitioning};

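/// A vector of record batches together with the memory reservation that
/// accounts for them.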
#[derive(Debug)]
pub(super) struct ReservedBatches {
    batches: Vec<RecordBatch>,
    #[allow(dead_code)]
    reservation: MemoryReservation,
}

impl ReservedBatches {
    pub(super) fn new(batches: Vec<RecordBatch>, reservation: MemoryReservation) -> Self {
        ReservedBatches {
            batches,
            reservation,
        }
    }
}

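/// The temporary working table of a recursive query.
///
/// It buffers the batches produced by one iteration of the query so that the
/// next iteration can read them back: the driving operator stores results
/// with [`WorkTable::update`], and [`WorkTableExec`] consumes them with
/// [`WorkTable::take`].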
#[derive(Debug)]
pub struct WorkTable {
    batches: Mutex<Option<ReservedBatches>>,
}

impl WorkTable {
    pub(super) fn new() -> Self {
        Self {
            batches: Mutex::new(None),
        }
    }

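    /// Take the previously written batches out of the work table.
    /// Returns an error if the work table is currently empty.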
    fn take(&self) -> Result<ReservedBatches> {
        self.batches
            .lock()
            .unwrap()
            .take()
            .ok_or_else(|| internal_datafusion_err!("Unexpected empty work table"))
    }

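    /// Store the batches produced by the latest iteration in the work table,
    /// replacing any previous contents.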
    pub(super) fn update(&self, batches: ReservedBatches) {
        self.batches.lock().unwrap().replace(batches);
    }
}

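/// A temporary "working table" operation: during execution its input data is
/// taken from the shared [`WorkTable`] and re-published as a stream.
///
/// Most notably used in the implementation of recursive queries, where the
/// underlying relation does not exist yet and its data is produced as the
/// previous iteration is evaluated.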
#[derive(Clone, Debug)]
pub struct WorkTableExec {
    /// Name of the relation handle
    name: String,
    /// The schema of the stream
    schema: SchemaRef,
    /// The work table holding the batches for the current iteration
    work_table: Arc<WorkTable>,
    /// Execution metrics
    metrics: ExecutionPlanMetricsSet,
    /// Cached plan properties: equivalences, partitioning, boundedness, etc.
    cache: PlanProperties,
}

impl WorkTableExec {
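    /// Create a new `WorkTableExec` with the given handle name and schema.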
    pub fn new(name: String, schema: SchemaRef) -> Self {
        let cache = Self::compute_properties(Arc::clone(&schema));
        Self {
            name,
            schema,
            metrics: ExecutionPlanMetricsSet::new(),
            work_table: Arc::new(WorkTable::new()),
            cache,
        }
    }

    pub fn name(&self) -> &str {
        &self.name
    }

    pub fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

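    /// Compute the cached [`PlanProperties`]: a single (unknown) partition,
    /// incremental emission, bounded output, and cooperative scheduling.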
    fn compute_properties(schema: SchemaRef) -> PlanProperties {
        PlanProperties::new(
            EquivalenceProperties::new(schema),
            Partitioning::UnknownPartitioning(1),
            EmissionType::Incremental,
            Boundedness::Bounded,
        )
        .with_scheduling_type(SchedulingType::Cooperative)
    }
}

impl DisplayAs for WorkTableExec {
    fn fmt_as(
        &self,
        t: DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(f, "WorkTableExec: name={}", self.name)
            }
            DisplayFormatType::TreeRender => {
                write!(f, "name={}", self.name)
            }
        }
    }
}

impl ExecutionPlan for WorkTableExec {
    fn name(&self) -> &'static str {
        "WorkTableExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        _: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::clone(&self) as Arc<dyn ExecutionPlan>)
    }

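    /// Stream the batches that were written to the work table by the previous
    /// iteration. Only partition 0 is valid, and since the batches are removed
    /// from the work table, they can only be streamed once per iteration.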
    fn execute(
        &self,
        partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        if partition != 0 {
            return internal_err!(
                "WorkTableExec got an invalid partition {partition} (expected 0)"
            );
        }
        let batch = self.work_table.take()?;

        let stream =
            MemoryStream::try_new(batch.batches, Arc::clone(&self.schema), None)?
                .with_reservation(batch.reservation);
        Ok(Box::pin(cooperative(stream)))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }

    fn statistics(&self) -> Result<Statistics> {
        Ok(Statistics::new_unknown(&self.schema()))
    }

    fn partition_statistics(&self, _partition: Option<usize>) -> Result<Statistics> {
        Ok(Statistics::new_unknown(&self.schema()))
    }

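    /// Injects run-time state into this `WorkTableExec`.
    ///
    /// The only state this node understands is an `Arc<WorkTable>`. If `state`
    /// is of that type, a copy of this operator sharing that work table is
    /// returned; otherwise `None` is returned and the plan is left unchanged.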
    fn with_new_state(
        &self,
        state: Arc<dyn Any + Send + Sync>,
    ) -> Option<Arc<dyn ExecutionPlan>> {
        let work_table = state.downcast::<WorkTable>().ok()?;

        Some(Arc::new(Self {
            name: self.name.clone(),
            schema: Arc::clone(&self.schema),
            metrics: ExecutionPlanMetricsSet::new(),
            work_table,
            cache: self.cache.clone(),
        }))
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{ArrayRef, Int32Array};
    use datafusion_execution::memory_pool::{MemoryConsumer, UnboundedMemoryPool};

    #[test]
    fn test_work_table() {
        let work_table = WorkTable::new();
        // Taking from an empty work table is an error
        assert!(work_table.take().is_err());

        let pool = Arc::new(UnboundedMemoryPool::default()) as _;
        let mut reservation = MemoryConsumer::new("test_work_table").register(&pool);

        // Write a batch (with a 100-byte reservation) into the work table
        let array: ArrayRef = Arc::new((0..5).collect::<Int32Array>());
        let batch = RecordBatch::try_from_iter(vec![("col", array)]).unwrap();
        reservation.try_grow(100).unwrap();
        work_table.update(ReservedBatches::new(vec![batch.clone()], reservation));
        // Take the batches back out of the work table
        let reserved_batches = work_table.take().unwrap();
        assert_eq!(reserved_batches.batches, vec![batch.clone()]);

        // Move the batches and the reservation into a MemoryStream
        let memory_stream =
            MemoryStream::try_new(reserved_batches.batches, batch.schema(), None)
                .unwrap()
                .with_reservation(reserved_batches.reservation);

        // The memory stays reserved while the stream owns the reservation
        assert_eq!(pool.reserved(), 100);

        // Dropping the stream releases the reservation
        drop(memory_stream);
        assert_eq!(pool.reserved(), 0);
    }
}