1use std::any::Any;
21use std::fmt;
22use std::sync::Arc;
23use std::task::{Context, Poll};
24
25use crate::coop::cooperative;
26use crate::execution_plan::{Boundedness, EmissionType, SchedulingType};
27use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
28use crate::{
29 DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
30 RecordBatchStream, SendableRecordBatchStream, Statistics,
31};
32
33use arrow::array::RecordBatch;
34use arrow::datatypes::SchemaRef;
35use datafusion_common::{internal_err, Result};
36use datafusion_execution::memory_pool::MemoryReservation;
37use datafusion_execution::TaskContext;
38use datafusion_physical_expr::EquivalenceProperties;
39
40use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
41use futures::Stream;
42use parking_lot::RwLock;
43
44pub struct MemoryStream {
46 data: Vec<RecordBatch>,
48 reservation: Option<MemoryReservation>,
50 schema: SchemaRef,
52 projection: Option<Vec<usize>>,
54 index: usize,
56 fetch: Option<usize>,
58}
59
60impl MemoryStream {
61 pub fn try_new(
63 data: Vec<RecordBatch>,
64 schema: SchemaRef,
65 projection: Option<Vec<usize>>,
66 ) -> Result<Self> {
67 Ok(Self {
68 data,
69 reservation: None,
70 schema,
71 projection,
72 index: 0,
73 fetch: None,
74 })
75 }
76
77 pub fn with_reservation(mut self, reservation: MemoryReservation) -> Self {
79 self.reservation = Some(reservation);
80 self
81 }
82
83 pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
85 self.fetch = fetch;
86 self
87 }
88}
89
90impl Stream for MemoryStream {
91 type Item = Result<RecordBatch>;
92
93 fn poll_next(
94 mut self: std::pin::Pin<&mut Self>,
95 _: &mut Context<'_>,
96 ) -> Poll<Option<Self::Item>> {
97 if self.index >= self.data.len() {
98 return Poll::Ready(None);
99 }
100 self.index += 1;
101 let batch = &self.data[self.index - 1];
102 let batch = match self.projection.as_ref() {
104 Some(columns) => batch.project(columns)?,
105 None => batch.clone(),
106 };
107
108 let Some(&fetch) = self.fetch.as_ref() else {
109 return Poll::Ready(Some(Ok(batch)));
110 };
111 if fetch == 0 {
112 return Poll::Ready(None);
113 }
114
115 let batch = if batch.num_rows() > fetch {
116 batch.slice(0, fetch)
117 } else {
118 batch
119 };
120 self.fetch = Some(fetch - batch.num_rows());
121 Poll::Ready(Some(Ok(batch)))
122 }
123
124 fn size_hint(&self) -> (usize, Option<usize>) {
125 (self.data.len(), Some(self.data.len()))
126 }
127}
128
129impl RecordBatchStream for MemoryStream {
130 fn schema(&self) -> SchemaRef {
132 Arc::clone(&self.schema)
133 }
134}
135
136pub trait LazyBatchGenerator: Send + Sync + fmt::Debug + fmt::Display {
137 fn as_any(&self) -> &dyn Any;
140
141 fn boundedness(&self) -> Boundedness {
142 Boundedness::Bounded
143 }
144
145 fn generate_next_batch(&mut self) -> Result<Option<RecordBatch>>;
147}
148
149pub struct LazyMemoryExec {
154 schema: SchemaRef,
156 projection: Option<Vec<usize>>,
158 batch_generators: Vec<Arc<RwLock<dyn LazyBatchGenerator>>>,
160 cache: PlanProperties,
162 metrics: ExecutionPlanMetricsSet,
164}
165
166impl LazyMemoryExec {
167 pub fn try_new(
169 schema: SchemaRef,
170 generators: Vec<Arc<RwLock<dyn LazyBatchGenerator>>>,
171 ) -> Result<Self> {
172 let boundedness = generators
173 .iter()
174 .map(|g| g.read().boundedness())
175 .reduce(|acc, b| match acc {
176 Boundedness::Bounded => b,
177 Boundedness::Unbounded {
178 requires_infinite_memory,
179 } => {
180 let acc_infinite_memory = requires_infinite_memory;
181 match b {
182 Boundedness::Bounded => acc,
183 Boundedness::Unbounded {
184 requires_infinite_memory,
185 } => Boundedness::Unbounded {
186 requires_infinite_memory: requires_infinite_memory
187 || acc_infinite_memory,
188 },
189 }
190 }
191 })
192 .unwrap_or(Boundedness::Bounded);
193
194 let cache = PlanProperties::new(
195 EquivalenceProperties::new(Arc::clone(&schema)),
196 Partitioning::RoundRobinBatch(generators.len()),
197 EmissionType::Incremental,
198 boundedness,
199 )
200 .with_scheduling_type(SchedulingType::Cooperative);
201
202 Ok(Self {
203 schema,
204 projection: None,
205 batch_generators: generators,
206 cache,
207 metrics: ExecutionPlanMetricsSet::new(),
208 })
209 }
210
211 pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
212 match projection.as_ref() {
213 Some(columns) => {
214 let projected = Arc::new(self.schema.project(columns).unwrap());
215 self.cache = self.cache.with_eq_properties(EquivalenceProperties::new(
216 Arc::clone(&projected),
217 ));
218 self.schema = projected;
219 self.projection = projection;
220 self
221 }
222 _ => self,
223 }
224 }
225
226 pub fn try_set_partitioning(&mut self, partitioning: Partitioning) -> Result<()> {
227 if partitioning.partition_count() != self.batch_generators.len() {
228 internal_err!(
229 "Partition count must match generator count: {} != {}",
230 partitioning.partition_count(),
231 self.batch_generators.len()
232 )
233 } else {
234 self.cache.partitioning = partitioning;
235 Ok(())
236 }
237 }
238
239 pub fn add_ordering(&mut self, ordering: impl IntoIterator<Item = PhysicalSortExpr>) {
240 self.cache
241 .eq_properties
242 .add_orderings(std::iter::once(ordering));
243 }
244
245 pub fn generators(&self) -> &Vec<Arc<RwLock<dyn LazyBatchGenerator>>> {
247 &self.batch_generators
248 }
249}
250
251impl fmt::Debug for LazyMemoryExec {
252 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
253 f.debug_struct("LazyMemoryExec")
254 .field("schema", &self.schema)
255 .field("batch_generators", &self.batch_generators)
256 .finish()
257 }
258}
259
260impl DisplayAs for LazyMemoryExec {
261 fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
262 match t {
263 DisplayFormatType::Default | DisplayFormatType::Verbose => {
264 write!(
265 f,
266 "LazyMemoryExec: partitions={}, batch_generators=[{}]",
267 self.batch_generators.len(),
268 self.batch_generators
269 .iter()
270 .map(|g| g.read().to_string())
271 .collect::<Vec<_>>()
272 .join(", ")
273 )
274 }
275 DisplayFormatType::TreeRender => {
276 writeln!(
278 f,
279 "batch_generators={}",
280 self.batch_generators
281 .iter()
282 .map(|g| g.read().to_string())
283 .collect::<Vec<String>>()
284 .join(", ")
285 )?;
286 Ok(())
287 }
288 }
289 }
290}
291
292impl ExecutionPlan for LazyMemoryExec {
293 fn name(&self) -> &'static str {
294 "LazyMemoryExec"
295 }
296
297 fn as_any(&self) -> &dyn Any {
298 self
299 }
300
301 fn schema(&self) -> SchemaRef {
302 Arc::clone(&self.schema)
303 }
304
305 fn properties(&self) -> &PlanProperties {
306 &self.cache
307 }
308
309 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
310 vec![]
311 }
312
313 fn with_new_children(
314 self: Arc<Self>,
315 children: Vec<Arc<dyn ExecutionPlan>>,
316 ) -> Result<Arc<dyn ExecutionPlan>> {
317 if children.is_empty() {
318 Ok(self)
319 } else {
320 internal_err!("Children cannot be replaced in LazyMemoryExec")
321 }
322 }
323
324 fn execute(
325 &self,
326 partition: usize,
327 _context: Arc<TaskContext>,
328 ) -> Result<SendableRecordBatchStream> {
329 if partition >= self.batch_generators.len() {
330 return internal_err!(
331 "Invalid partition {} for LazyMemoryExec with {} partitions",
332 partition,
333 self.batch_generators.len()
334 );
335 }
336
337 let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
338
339 let stream = LazyMemoryStream {
340 schema: Arc::clone(&self.schema),
341 projection: self.projection.clone(),
342 generator: Arc::clone(&self.batch_generators[partition]),
343 baseline_metrics,
344 };
345 Ok(Box::pin(cooperative(stream)))
346 }
347
348 fn metrics(&self) -> Option<MetricsSet> {
349 Some(self.metrics.clone_inner())
350 }
351
352 fn statistics(&self) -> Result<Statistics> {
353 Ok(Statistics::new_unknown(&self.schema))
354 }
355}
356
357pub struct LazyMemoryStream {
359 schema: SchemaRef,
360 projection: Option<Vec<usize>>,
362 generator: Arc<RwLock<dyn LazyBatchGenerator>>,
370 baseline_metrics: BaselineMetrics,
372}
373
374impl Stream for LazyMemoryStream {
375 type Item = Result<RecordBatch>;
376
377 fn poll_next(
378 self: std::pin::Pin<&mut Self>,
379 _: &mut Context<'_>,
380 ) -> Poll<Option<Self::Item>> {
381 let _timer_guard = self.baseline_metrics.elapsed_compute().timer();
382 let batch = self.generator.write().generate_next_batch();
383
384 let poll = match batch {
385 Ok(Some(batch)) => {
386 let batch = match self.projection.as_ref() {
388 Some(columns) => batch.project(columns)?,
389 None => batch,
390 };
391 Poll::Ready(Some(Ok(batch)))
392 }
393 Ok(None) => Poll::Ready(None),
394 Err(e) => Poll::Ready(Some(Err(e))),
395 };
396
397 self.baseline_metrics.record_poll(poll)
398 }
399}
400
401impl RecordBatchStream for LazyMemoryStream {
402 fn schema(&self) -> SchemaRef {
403 Arc::clone(&self.schema)
404 }
405}
406
#[cfg(test)]
mod lazy_memory_tests {
    use super::*;
    use crate::common::collect;
    use arrow::array::Int64Array;
    use arrow::datatypes::{DataType, Field, Schema};
    use futures::StreamExt;

    /// Generator emitting `max_batches` batches of `batch_size` consecutive
    /// `Int64` values, continuing the sequence from one batch to the next.
    #[derive(Debug, Clone)]
    struct TestGenerator {
        counter: i64,
        max_batches: i64,
        batch_size: usize,
        schema: SchemaRef,
    }

    impl fmt::Display for TestGenerator {
        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
            write!(
                f,
                "TestGenerator: counter={}, max_batches={}, batch_size={}",
                self.counter, self.max_batches, self.batch_size
            )
        }
    }

    impl LazyBatchGenerator for TestGenerator {
        fn as_any(&self) -> &dyn Any {
            self
        }

        fn generate_next_batch(&mut self) -> Result<Option<RecordBatch>> {
            if self.counter >= self.max_batches {
                return Ok(None);
            }

            let width = self.batch_size as i64;
            let start = self.counter * width;
            let values = Int64Array::from_iter_values(start..start + width);
            self.counter += 1;

            let batch =
                RecordBatch::try_new(Arc::clone(&self.schema), vec![Arc::new(values)])?;
            Ok(Some(batch))
        }
    }

    /// Schema with a single non-nullable `Int64` column named `a`.
    fn single_column_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]))
    }

    /// Builds a single-partition `LazyMemoryExec` over a fresh `TestGenerator`.
    fn exec_with(
        schema: SchemaRef,
        max_batches: i64,
        batch_size: usize,
    ) -> Result<LazyMemoryExec> {
        let generator = TestGenerator {
            counter: 0,
            max_batches,
            batch_size,
            schema: Arc::clone(&schema),
        };
        LazyMemoryExec::try_new(schema, vec![Arc::new(RwLock::new(generator))])
    }

    #[tokio::test]
    async fn test_lazy_memory_exec() -> Result<()> {
        let exec = exec_with(single_column_schema(), 3, 2)?;

        assert_eq!(exec.schema().fields().len(), 1);
        assert_eq!(exec.schema().field(0).name(), "a");

        let stream = exec.execute(0, Arc::new(TaskContext::default()))?;
        let batches: Vec<_> = stream.collect::<Vec<_>>().await;
        assert_eq!(batches.len(), 3);

        let expected: [[i64; 2]; 3] = [[0, 1], [2, 3], [4, 5]];
        for (batch, want) in batches.iter().zip(expected.iter()) {
            let column = batch
                .as_ref()
                .unwrap()
                .column(0)
                .as_any()
                .downcast_ref::<Int64Array>()
                .unwrap();
            assert_eq!(column.values(), want);
        }

        Ok(())
    }

    #[tokio::test]
    async fn test_lazy_memory_exec_invalid_partition() -> Result<()> {
        let exec = exec_with(single_column_schema(), 1, 1)?;

        let result = exec.execute(1, Arc::new(TaskContext::default()));
        let Err(err) = result else {
            panic!("executing a nonexistent partition must fail");
        };
        assert!(err
            .to_string()
            .contains("Invalid partition 1 for LazyMemoryExec with 1 partitions"));

        Ok(())
    }

    #[tokio::test]
    async fn test_generate_series_metrics_integration() -> Result<()> {
        // (total_rows, batch_size, expected_rows)
        let cases: [(i64, i64, usize); 3] = [(10, 2, 10), (100, 10, 100), (5, 1, 5)];

        for (total_rows, batch_size, expected_rows) in cases {
            // Ceiling division: number of batches needed to cover `total_rows`.
            let max_batches = (total_rows + batch_size - 1) / batch_size;
            let exec =
                exec_with(single_column_schema(), max_batches, batch_size as usize)?;

            let stream = exec.execute(0, Arc::new(TaskContext::default()))?;
            let batches = collect(stream).await?;
            let metrics = exec.metrics().unwrap();

            let actual_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
            assert_eq!(actual_rows, expected_rows);
            assert_eq!(metrics.output_rows().unwrap(), expected_rows);
            assert!(metrics.elapsed_compute().unwrap() > 0);
        }

        Ok(())
    }
}