datafusion_physical_plan/
memory.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Execution plan for reading in-memory batches of data
19
20use std::any::Any;
21use std::fmt;
22use std::sync::Arc;
23use std::task::{Context, Poll};
24
25use crate::coop::cooperative;
26use crate::execution_plan::{Boundedness, EmissionType, SchedulingType};
27use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
28use crate::{
29    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
30    RecordBatchStream, SendableRecordBatchStream, Statistics,
31};
32
33use arrow::array::RecordBatch;
34use arrow::datatypes::SchemaRef;
35use datafusion_common::{internal_err, Result};
36use datafusion_execution::memory_pool::MemoryReservation;
37use datafusion_execution::TaskContext;
38use datafusion_physical_expr::EquivalenceProperties;
39
40use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
41use futures::Stream;
42use parking_lot::RwLock;
43
44/// Iterator over batches
45pub struct MemoryStream {
46    /// Vector of record batches
47    data: Vec<RecordBatch>,
48    /// Optional memory reservation bound to the data, freed on drop
49    reservation: Option<MemoryReservation>,
50    /// Schema representing the data
51    schema: SchemaRef,
52    /// Optional projection for which columns to load
53    projection: Option<Vec<usize>>,
54    /// Index into the data
55    index: usize,
56    /// The remaining number of rows to return. If None, all rows are returned
57    fetch: Option<usize>,
58}
59
60impl MemoryStream {
61    /// Create an iterator for a vector of record batches
62    pub fn try_new(
63        data: Vec<RecordBatch>,
64        schema: SchemaRef,
65        projection: Option<Vec<usize>>,
66    ) -> Result<Self> {
67        Ok(Self {
68            data,
69            reservation: None,
70            schema,
71            projection,
72            index: 0,
73            fetch: None,
74        })
75    }
76
77    /// Set the memory reservation for the data
78    pub fn with_reservation(mut self, reservation: MemoryReservation) -> Self {
79        self.reservation = Some(reservation);
80        self
81    }
82
83    /// Set the number of rows to produce
84    pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
85        self.fetch = fetch;
86        self
87    }
88}
89
90impl Stream for MemoryStream {
91    type Item = Result<RecordBatch>;
92
93    fn poll_next(
94        mut self: std::pin::Pin<&mut Self>,
95        _: &mut Context<'_>,
96    ) -> Poll<Option<Self::Item>> {
97        if self.index >= self.data.len() {
98            return Poll::Ready(None);
99        }
100        self.index += 1;
101        let batch = &self.data[self.index - 1];
102        // return just the columns requested
103        let batch = match self.projection.as_ref() {
104            Some(columns) => batch.project(columns)?,
105            None => batch.clone(),
106        };
107
108        let Some(&fetch) = self.fetch.as_ref() else {
109            return Poll::Ready(Some(Ok(batch)));
110        };
111        if fetch == 0 {
112            return Poll::Ready(None);
113        }
114
115        let batch = if batch.num_rows() > fetch {
116            batch.slice(0, fetch)
117        } else {
118            batch
119        };
120        self.fetch = Some(fetch - batch.num_rows());
121        Poll::Ready(Some(Ok(batch)))
122    }
123
124    fn size_hint(&self) -> (usize, Option<usize>) {
125        (self.data.len(), Some(self.data.len()))
126    }
127}
128
129impl RecordBatchStream for MemoryStream {
130    /// Get the schema
131    fn schema(&self) -> SchemaRef {
132        Arc::clone(&self.schema)
133    }
134}
135
136pub trait LazyBatchGenerator: Send + Sync + fmt::Debug + fmt::Display {
137    /// Returns the generator as [`Any`] so that it can be
138    /// downcast to a specific implementation.
139    fn as_any(&self) -> &dyn Any;
140
141    fn boundedness(&self) -> Boundedness {
142        Boundedness::Bounded
143    }
144
145    /// Generate the next batch, return `None` when no more batches are available
146    fn generate_next_batch(&mut self) -> Result<Option<RecordBatch>>;
147}
148
149/// Execution plan for lazy in-memory batches of data
150///
151/// This plan generates output batches lazily, it doesn't have to buffer all batches
152/// in memory up front (compared to `MemorySourceConfig`), thus consuming constant memory.
153pub struct LazyMemoryExec {
154    /// Schema representing the data
155    schema: SchemaRef,
156    /// Optional projection for which columns to load
157    projection: Option<Vec<usize>>,
158    /// Functions to generate batches for each partition
159    batch_generators: Vec<Arc<RwLock<dyn LazyBatchGenerator>>>,
160    /// Plan properties cache storing equivalence properties, partitioning, and execution mode
161    cache: PlanProperties,
162    /// Execution metrics
163    metrics: ExecutionPlanMetricsSet,
164}
165
166impl LazyMemoryExec {
167    /// Create a new lazy memory execution plan
168    pub fn try_new(
169        schema: SchemaRef,
170        generators: Vec<Arc<RwLock<dyn LazyBatchGenerator>>>,
171    ) -> Result<Self> {
172        let boundedness = generators
173            .iter()
174            .map(|g| g.read().boundedness())
175            .reduce(|acc, b| match acc {
176                Boundedness::Bounded => b,
177                Boundedness::Unbounded {
178                    requires_infinite_memory,
179                } => {
180                    let acc_infinite_memory = requires_infinite_memory;
181                    match b {
182                        Boundedness::Bounded => acc,
183                        Boundedness::Unbounded {
184                            requires_infinite_memory,
185                        } => Boundedness::Unbounded {
186                            requires_infinite_memory: requires_infinite_memory
187                                || acc_infinite_memory,
188                        },
189                    }
190                }
191            })
192            .unwrap_or(Boundedness::Bounded);
193
194        let cache = PlanProperties::new(
195            EquivalenceProperties::new(Arc::clone(&schema)),
196            Partitioning::RoundRobinBatch(generators.len()),
197            EmissionType::Incremental,
198            boundedness,
199        )
200        .with_scheduling_type(SchedulingType::Cooperative);
201
202        Ok(Self {
203            schema,
204            projection: None,
205            batch_generators: generators,
206            cache,
207            metrics: ExecutionPlanMetricsSet::new(),
208        })
209    }
210
211    pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
212        match projection.as_ref() {
213            Some(columns) => {
214                let projected = Arc::new(self.schema.project(columns).unwrap());
215                self.cache = self.cache.with_eq_properties(EquivalenceProperties::new(
216                    Arc::clone(&projected),
217                ));
218                self.schema = projected;
219                self.projection = projection;
220                self
221            }
222            _ => self,
223        }
224    }
225
226    pub fn try_set_partitioning(&mut self, partitioning: Partitioning) -> Result<()> {
227        if partitioning.partition_count() != self.batch_generators.len() {
228            internal_err!(
229                "Partition count must match generator count: {} != {}",
230                partitioning.partition_count(),
231                self.batch_generators.len()
232            )
233        } else {
234            self.cache.partitioning = partitioning;
235            Ok(())
236        }
237    }
238
239    pub fn add_ordering(&mut self, ordering: impl IntoIterator<Item = PhysicalSortExpr>) {
240        self.cache
241            .eq_properties
242            .add_orderings(std::iter::once(ordering));
243    }
244
245    /// Get the batch generators
246    pub fn generators(&self) -> &Vec<Arc<RwLock<dyn LazyBatchGenerator>>> {
247        &self.batch_generators
248    }
249}
250
251impl fmt::Debug for LazyMemoryExec {
252    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
253        f.debug_struct("LazyMemoryExec")
254            .field("schema", &self.schema)
255            .field("batch_generators", &self.batch_generators)
256            .finish()
257    }
258}
259
260impl DisplayAs for LazyMemoryExec {
261    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
262        match t {
263            DisplayFormatType::Default | DisplayFormatType::Verbose => {
264                write!(
265                    f,
266                    "LazyMemoryExec: partitions={}, batch_generators=[{}]",
267                    self.batch_generators.len(),
268                    self.batch_generators
269                        .iter()
270                        .map(|g| g.read().to_string())
271                        .collect::<Vec<_>>()
272                        .join(", ")
273                )
274            }
275            DisplayFormatType::TreeRender => {
276                //TODO: remove batch_size, add one line per generator
277                writeln!(
278                    f,
279                    "batch_generators={}",
280                    self.batch_generators
281                        .iter()
282                        .map(|g| g.read().to_string())
283                        .collect::<Vec<String>>()
284                        .join(", ")
285                )?;
286                Ok(())
287            }
288        }
289    }
290}
291
292impl ExecutionPlan for LazyMemoryExec {
293    fn name(&self) -> &'static str {
294        "LazyMemoryExec"
295    }
296
297    fn as_any(&self) -> &dyn Any {
298        self
299    }
300
301    fn schema(&self) -> SchemaRef {
302        Arc::clone(&self.schema)
303    }
304
305    fn properties(&self) -> &PlanProperties {
306        &self.cache
307    }
308
309    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
310        vec![]
311    }
312
313    fn with_new_children(
314        self: Arc<Self>,
315        children: Vec<Arc<dyn ExecutionPlan>>,
316    ) -> Result<Arc<dyn ExecutionPlan>> {
317        if children.is_empty() {
318            Ok(self)
319        } else {
320            internal_err!("Children cannot be replaced in LazyMemoryExec")
321        }
322    }
323
324    fn execute(
325        &self,
326        partition: usize,
327        _context: Arc<TaskContext>,
328    ) -> Result<SendableRecordBatchStream> {
329        if partition >= self.batch_generators.len() {
330            return internal_err!(
331                "Invalid partition {} for LazyMemoryExec with {} partitions",
332                partition,
333                self.batch_generators.len()
334            );
335        }
336
337        let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
338
339        let stream = LazyMemoryStream {
340            schema: Arc::clone(&self.schema),
341            projection: self.projection.clone(),
342            generator: Arc::clone(&self.batch_generators[partition]),
343            baseline_metrics,
344        };
345        Ok(Box::pin(cooperative(stream)))
346    }
347
348    fn metrics(&self) -> Option<MetricsSet> {
349        Some(self.metrics.clone_inner())
350    }
351
352    fn statistics(&self) -> Result<Statistics> {
353        Ok(Statistics::new_unknown(&self.schema))
354    }
355}
356
357/// Stream that generates record batches on demand
358pub struct LazyMemoryStream {
359    schema: SchemaRef,
360    /// Optional projection for which columns to load
361    projection: Option<Vec<usize>>,
362    /// Generator to produce batches
363    ///
364    /// Note: Idiomatically, DataFusion uses plan-time parallelism - each stream
365    /// should have a unique `LazyBatchGenerator`. Use RepartitionExec or
366    /// construct multiple `LazyMemoryStream`s during planning to enable
367    /// parallel execution.
368    /// Sharing generators between streams should be used with caution.
369    generator: Arc<RwLock<dyn LazyBatchGenerator>>,
370    /// Execution metrics
371    baseline_metrics: BaselineMetrics,
372}
373
374impl Stream for LazyMemoryStream {
375    type Item = Result<RecordBatch>;
376
377    fn poll_next(
378        self: std::pin::Pin<&mut Self>,
379        _: &mut Context<'_>,
380    ) -> Poll<Option<Self::Item>> {
381        let _timer_guard = self.baseline_metrics.elapsed_compute().timer();
382        let batch = self.generator.write().generate_next_batch();
383
384        let poll = match batch {
385            Ok(Some(batch)) => {
386                // return just the columns requested
387                let batch = match self.projection.as_ref() {
388                    Some(columns) => batch.project(columns)?,
389                    None => batch,
390                };
391                Poll::Ready(Some(Ok(batch)))
392            }
393            Ok(None) => Poll::Ready(None),
394            Err(e) => Poll::Ready(Some(Err(e))),
395        };
396
397        self.baseline_metrics.record_poll(poll)
398    }
399}
400
401impl RecordBatchStream for LazyMemoryStream {
402    fn schema(&self) -> SchemaRef {
403        Arc::clone(&self.schema)
404    }
405}
406
#[cfg(test)]
mod lazy_memory_tests {
    use super::*;
    use crate::common::collect;
    use arrow::array::Int64Array;
    use arrow::datatypes::{DataType, Field, Schema};
    use futures::StreamExt;

    /// Generator that emits `max_batches` batches of `batch_size` consecutive
    /// `i64` values each, continuing the sequence from one batch to the next.
    #[derive(Debug, Clone)]
    struct TestGenerator {
        counter: i64,
        max_batches: i64,
        batch_size: usize,
        schema: SchemaRef,
    }

    impl fmt::Display for TestGenerator {
        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
            write!(
                f,
                "TestGenerator: counter={}, max_batches={}, batch_size={}",
                self.counter, self.max_batches, self.batch_size
            )
        }
    }

    impl LazyBatchGenerator for TestGenerator {
        fn as_any(&self) -> &dyn Any {
            self
        }

        fn generate_next_batch(&mut self) -> Result<Option<RecordBatch>> {
            if self.counter >= self.max_batches {
                return Ok(None);
            }

            let rows_per_batch = self.batch_size as i64;
            let first_value = self.counter * rows_per_batch;
            let array =
                Int64Array::from_iter_values(first_value..(first_value + rows_per_batch));
            self.counter += 1;

            let batch =
                RecordBatch::try_new(Arc::clone(&self.schema), vec![Arc::new(array)])?;
            Ok(Some(batch))
        }
    }

    /// Schema with a single non-nullable `Int64` column named `a`
    fn test_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]))
    }

    #[tokio::test]
    async fn test_lazy_memory_exec() -> Result<()> {
        let schema = test_schema();
        let generator = TestGenerator {
            counter: 0,
            max_batches: 3,
            batch_size: 2,
            schema: Arc::clone(&schema),
        };

        let exec =
            LazyMemoryExec::try_new(schema, vec![Arc::new(RwLock::new(generator))])?;

        // Test schema
        let output_schema = exec.schema();
        assert_eq!(output_schema.fields().len(), 1);
        assert_eq!(output_schema.field(0).name(), "a");

        // Test execution
        let stream = exec.execute(0, Arc::new(TaskContext::default()))?;
        let batches: Vec<_> = stream.collect::<Vec<_>>().await;

        assert_eq!(batches.len(), 3);

        // Verify batch contents: three batches of two consecutive values
        let expected: [[i64; 2]; 3] = [[0, 1], [2, 3], [4, 5]];
        for (batch, values) in batches.iter().zip(&expected) {
            let batch = batch.as_ref().unwrap();
            let array = batch
                .column(0)
                .as_any()
                .downcast_ref::<Int64Array>()
                .unwrap();
            assert_eq!(array.values(), values);
        }

        Ok(())
    }

    #[tokio::test]
    async fn test_lazy_memory_exec_invalid_partition() -> Result<()> {
        let schema = test_schema();
        let generator = TestGenerator {
            counter: 0,
            max_batches: 1,
            batch_size: 1,
            schema: Arc::clone(&schema),
        };

        let exec =
            LazyMemoryExec::try_new(schema, vec![Arc::new(RwLock::new(generator))])?;

        // partition is 0-indexed, so with one generator only partition 0 exists
        let result = exec.execute(1, Arc::new(TaskContext::default()));

        assert!(matches!(
            result,
            Err(e) if e.to_string().contains("Invalid partition 1 for LazyMemoryExec with 1 partitions")
        ));

        Ok(())
    }

    #[tokio::test]
    async fn test_generate_series_metrics_integration() -> Result<()> {
        // Test LazyMemoryExec metrics with different configurations:
        // (total rows, batch size, expected output rows)
        let test_cases = [(10, 2, 10), (100, 10, 100), (5, 1, 5)];

        for (total_rows, batch_size, expected_rows) in test_cases {
            let schema = test_schema();
            let max_batches = (total_rows + batch_size - 1) / batch_size; // ceiling division
            let generator = TestGenerator {
                counter: 0,
                max_batches,
                batch_size: batch_size as usize,
                schema: Arc::clone(&schema),
            };

            let exec =
                LazyMemoryExec::try_new(schema, vec![Arc::new(RwLock::new(generator))])?;

            let stream = exec.execute(0, Arc::new(TaskContext::default()))?;
            let batches = collect(stream).await?;

            // Read the metrics only after the stream has been fully drained
            let metrics = exec.metrics().unwrap();

            // Count actual rows returned
            let actual_rows: usize = batches.iter().map(|batch| batch.num_rows()).sum();
            assert_eq!(actual_rows, expected_rows);

            // Verify metrics match actual output
            assert_eq!(metrics.output_rows().unwrap(), expected_rows);
            assert!(metrics.elapsed_compute().unwrap() > 0);
        }

        Ok(())
    }
}