datafusion_physical_plan/spill/spill_pool.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use futures::{Stream, StreamExt};
19use std::collections::VecDeque;
20use std::sync::Arc;
21use std::task::Waker;
22
23use parking_lot::Mutex;
24
25use arrow::datatypes::SchemaRef;
26use arrow::record_batch::RecordBatch;
27use datafusion_common::Result;
28use datafusion_execution::disk_manager::RefCountedTempFile;
29use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
30
31use super::in_progress_spill_file::InProgressSpillFile;
32use super::spill_manager::SpillManager;
33
/// Shared state between the writer(s) and the reader of a spill pool.
/// This contains the queue of files and coordination state.
///
/// # Locking Design
///
/// This struct uses **fine-grained locking** with nested `Arc<Mutex<>>`:
/// - `SpillPoolShared` is wrapped in `Arc<Mutex<>>` (outer lock)
/// - Each `ActiveSpillFileShared` is wrapped in `Arc<Mutex<>>` (inner lock)
///
/// This enables:
/// 1. **Short critical sections**: The outer lock is held only for queue operations
/// 2. **I/O outside the outer lock**: Disk I/O happens while holding only the file-specific lock
/// 3. **Concurrent operations**: The reader can access the queue while a writer does I/O
///
/// **Lock ordering discipline**: Never hold both locks simultaneously to prevent deadlock.
/// Always: acquire outer lock → release outer lock → acquire inner lock (if needed).
struct SpillPoolShared {
    /// Queue of ALL files (including the current write file if it exists).
    /// The reader always reads from the front of this queue (FIFO) and pops a
    /// file once it is finished and fully consumed.
    /// Each file has its own lock to enable concurrent reader/writer access.
    files: VecDeque<Arc<Mutex<ActiveSpillFileShared>>>,
    /// SpillManager for creating files and tracking metrics
    spill_manager: Arc<SpillManager>,
    /// Pool-level waker of the (single) reader. Woken when a new file is
    /// enqueued and when the last writer clone is dropped.
    waker: Option<Waker>,
    /// Whether the last writer clone has been dropped (no more files will be added)
    writer_dropped: bool,
    /// Writer's reference to the current file (shared by all cloned writers).
    /// Has its own lock to allow I/O without blocking queue access.
    current_write_file: Option<Arc<Mutex<ActiveSpillFileShared>>>,
    /// Number of active writer clones. Only when this reaches zero should
    /// `writer_dropped` be set to true. This prevents premature EOF signaling
    /// when one writer clone is dropped while others are still active.
    active_writer_count: usize,
}
69
70impl SpillPoolShared {
71 /// Creates a new shared pool state
72 fn new(spill_manager: Arc<SpillManager>) -> Self {
73 Self {
74 files: VecDeque::new(),
75 spill_manager,
76 waker: None,
77 writer_dropped: false,
78 current_write_file: None,
79 active_writer_count: 1,
80 }
81 }
82
83 /// Registers a waker to be notified when new data is available (pool-level)
84 fn register_waker(&mut self, waker: Waker) {
85 self.waker = Some(waker);
86 }
87
88 /// Wakes the pool-level reader
89 fn wake(&mut self) {
90 if let Some(waker) = self.waker.take() {
91 waker.wake();
92 }
93 }
94}
95
/// Writer for a spill pool. Provides coordinated write access with FIFO semantics.
///
/// Created by [`channel`]. See that function for architecture diagrams and usage examples.
///
/// The writer is `Clone`, allowing multiple writers to coordinate on the same pool.
/// All clones share the same current write file and coordinate file rotation.
/// The writer automatically manages file rotation based on the `max_file_size_bytes`
/// configured in [`channel`]. When the last writer clone is dropped, it finalizes the
/// current file so the reader can access all written data.
pub struct SpillPoolWriter {
    /// Maximum size in bytes before rotating to a new file.
    /// Typically set from configuration `datafusion.execution.max_spill_file_size_bytes`.
    max_file_size_bytes: usize,
    /// Shared state with the reader. It includes `current_write_file` and the
    /// live writer count maintained by `Clone`/`Drop`.
    shared: Arc<Mutex<SpillPoolShared>>,
}
112
113impl Clone for SpillPoolWriter {
114 fn clone(&self) -> Self {
115 // Increment the active writer count so that `writer_dropped` is only
116 // set to true when the *last* clone is dropped.
117 self.shared.lock().active_writer_count += 1;
118 Self {
119 max_file_size_bytes: self.max_file_size_bytes,
120 shared: Arc::clone(&self.shared),
121 }
122 }
123}
124
125impl SpillPoolWriter {
126 /// Spills a batch to the pool, rotating files when necessary.
127 ///
128 /// If the current file would exceed `max_file_size_bytes` after adding
129 /// this batch, the file is finalized and a new one is started.
130 ///
131 /// See [`channel`] for overall architecture and examples.
132 ///
133 /// # File Rotation Logic
134 ///
135 /// ```text
136 /// push_batch()
137 /// │
138 /// ▼
139 /// Current file exists?
140 /// │
141 /// ├─ No ──▶ Create new file ──▶ Add to shared queue
142 /// │ Wake readers
143 /// ▼
144 /// Write batch to current file
145 /// │
146 /// ▼
147 /// estimated_size > max_file_size_bytes?
148 /// │
149 /// ├─ No ──▶ Keep current file for next batch
150 /// │
151 /// ▼
152 /// Yes: finish() current file
153 /// Mark writer_finished = true
154 /// Wake readers
155 /// │
156 /// ▼
157 /// Next push_batch() creates new file
158 /// ```
159 ///
160 /// # Errors
161 ///
162 /// Returns an error if disk I/O fails or disk quota is exceeded.
163 pub fn push_batch(&self, batch: &RecordBatch) -> Result<()> {
164 if batch.num_rows() == 0 {
165 // Skip empty batches
166 return Ok(());
167 }
168
169 let batch_size = batch.get_array_memory_size();
170
171 // Fine-grained locking: Lock shared state briefly for queue access
172 let mut shared = self.shared.lock();
173
174 // Create new file if we don't have one yet
175 if shared.current_write_file.is_none() {
176 let spill_manager = Arc::clone(&shared.spill_manager);
177 // Release shared lock before disk I/O (fine-grained locking)
178 drop(shared);
179
180 let writer = spill_manager.create_in_progress_file("SpillPool")?;
181 // Clone the file so readers can access it immediately
182 let file = writer.file().expect("InProgressSpillFile should always have a file when it is first created").clone();
183
184 let file_shared = Arc::new(Mutex::new(ActiveSpillFileShared {
185 writer: Some(writer),
186 file: Some(file), // Set immediately so readers can access it
187 batches_written: 0,
188 estimated_size: 0,
189 writer_finished: false,
190 waker: None,
191 }));
192
193 // Re-acquire lock and push to shared queue
194 shared = self.shared.lock();
195 shared.files.push_back(Arc::clone(&file_shared));
196 shared.current_write_file = Some(file_shared);
197 shared.wake(); // Wake readers waiting for new files
198 }
199
200 let current_write_file = shared.current_write_file.take();
201 // Release shared lock before file I/O (fine-grained locking)
202 // This allows readers to access the queue while we do disk I/O
203 drop(shared);
204
205 // Write batch to current file - lock only the specific file
206 if let Some(current_file) = current_write_file {
207 // Now lock just this file for I/O (separate from shared lock)
208 let mut file_shared = current_file.lock();
209
210 // Append the batch
211 if let Some(ref mut writer) = file_shared.writer {
212 writer.append_batch(batch)?;
213 // make sure we flush the writer for readers
214 writer.flush()?;
215 file_shared.batches_written += 1;
216 file_shared.estimated_size += batch_size;
217 }
218
219 // Wake reader waiting on this specific file
220 file_shared.wake();
221
222 // Check if we need to rotate
223 let needs_rotation = file_shared.estimated_size > self.max_file_size_bytes;
224
225 if needs_rotation {
226 // Finish the IPC writer
227 if let Some(mut writer) = file_shared.writer.take() {
228 writer.finish()?;
229 }
230 // Mark as finished so readers know not to wait for more data
231 file_shared.writer_finished = true;
232 // Wake reader waiting on this file (it's now finished)
233 file_shared.wake();
234 // Don't put back current_write_file - let it rotate
235 } else {
236 // Release file lock
237 drop(file_shared);
238 // Put back the current file for further writing
239 let mut shared = self.shared.lock();
240 shared.current_write_file = Some(current_file);
241 }
242 }
243
244 Ok(())
245 }
246}
247
248impl Drop for SpillPoolWriter {
249 fn drop(&mut self) {
250 let mut shared = self.shared.lock();
251
252 shared.active_writer_count -= 1;
253 let is_last_writer = shared.active_writer_count == 0;
254
255 if !is_last_writer {
256 // Other writer clones are still active; do not finalize or
257 // signal EOF to readers.
258 return;
259 }
260
261 // Finalize the current file when the last writer is dropped
262 if let Some(current_file) = shared.current_write_file.take() {
263 // Release shared lock before locking file
264 drop(shared);
265
266 let mut file_shared = current_file.lock();
267
268 // Finish the current writer if it exists
269 if let Some(mut writer) = file_shared.writer.take() {
270 // Ignore errors on drop - we're in destructor
271 let _ = writer.finish();
272 }
273
274 // Mark as finished so readers know not to wait for more data
275 file_shared.writer_finished = true;
276
277 // Wake reader waiting on this file (it's now finished)
278 file_shared.wake();
279
280 drop(file_shared);
281 shared = self.shared.lock();
282 }
283
284 // Mark writer as dropped and wake pool-level readers
285 shared.writer_dropped = true;
286 shared.wake();
287 }
288}
289
290/// Creates a paired writer and reader for a spill pool with MPSC (multi-producer, single-consumer)
291/// semantics.
292///
293/// This is the recommended way to create a spill pool. The writer is `Clone`, allowing
294/// multiple producers to coordinate writes to the same pool. The reader can consume batches
295/// in FIFO order. The reader can start reading immediately after a writer appends a batch
296/// to the spill file, without waiting for the file to be sealed, while writers continue to
297/// write more data.
298///
299/// Internally this coordinates rotating spill files based on size limits, and
300/// handles asynchronous notification between the writer and reader using wakers.
301/// This ensures that we manage disk usage efficiently while allowing concurrent
302/// I/O between the writer and reader.
303///
304/// # Data Flow Overview
305///
306/// 1. Writer write batch `B0` to F1
307/// 2. Writer write batch `B1` to F1, notices the size limit exceeded, finishes F1.
308/// 3. Reader read `B0` from F1
309/// 4. Reader read `B1`, no more batch to read -> wait on the waker
310/// 5. Writer write batch `B2` to a new file `F2`, wake up the waiting reader.
311/// 6. Reader read `B2` from F2.
312/// 7. Repeat until writer is dropped.
313///
314/// # Architecture
315///
316/// ```text
317/// ┌─────────────────────────────────────────────────────────────────────────┐
318/// │ SpillPool │
319/// │ │
320/// │ Writer Side Shared State Reader Side │
321/// │ ─────────── ──────────── ─────────── │
322/// │ │
323/// │ SpillPoolWriter ┌────────────────────┐ SpillPoolReader │
324/// │ │ │ VecDeque<File> │ │ │
325/// │ │ │ ┌────┐┌────┐ │ │ │
326/// │ push_batch() │ │ F1 ││ F2 │ ... │ next().await │
327/// │ │ │ └────┘└────┘ │ │ │
328/// │ ▼ │ (FIFO order) │ ▼ │
329/// │ ┌─────────┐ │ │ ┌──────────┐ │
330/// │ │Current │───────▶│ Coordination: │◀───│ Current │ │
331/// │ │Write │ │ - Wakers │ │ Read │ │
332/// │ │File │ │ - Batch counts │ │ File │ │
333/// │ └─────────┘ │ - Writer status │ └──────────┘ │
334/// │ │ └────────────────────┘ │ │
335/// │ │ │ │
336/// │ Size > limit? Read all batches? │
337/// │ │ │ │
338/// │ ▼ ▼ │
339/// │ Rotate to new file Pop from queue │
340/// └─────────────────────────────────────────────────────────────────────────┘
341///
342/// Writer produces → Shared FIFO queue → Reader consumes
343/// ```
344///
345/// # File State Machine
346///
347/// Each file in the pool coordinates between writer and reader:
348///
349/// ```text
350/// Writer View Reader View
351/// ─────────── ───────────
352///
353/// Created writer: Some(..) batches_read: 0
354/// batches_written: 0 (waiting for data)
355/// │
356/// ▼
357/// Writing append_batch() Can read if:
358/// batches_written++ batches_read < batches_written
359/// wake readers
360/// │ │
361/// │ ▼
362/// ┌──────┴──────┐ poll_next() → batch
363/// │ │ batches_read++
364/// ▼ ▼
365/// Size > limit? More data?
366/// │ │
367/// │ └─▶ Yes ──▶ Continue writing
368/// ▼
369/// finish() Reader catches up:
370/// writer_finished = true batches_read == batches_written
371/// wake readers │
372/// │ ▼
373/// └─────────────────────▶ Returns Poll::Ready(None)
374/// File complete, pop from queue
375/// ```
376///
377/// # Arguments
378///
379/// * `max_file_size_bytes` - Maximum size per file before rotation. When a file
380/// exceeds this size, the writer automatically rotates to a new file.
381/// * `spill_manager` - Manager for file creation and metrics tracking
382///
383/// # Returns
384///
385/// A tuple of `(SpillPoolWriter, SendableRecordBatchStream)` that share the same
386/// underlying pool. The reader is returned as a stream for immediate use with
387/// async stream combinators.
388///
389/// # Example
390///
391/// ```
392/// use std::sync::Arc;
393/// use arrow::array::{ArrayRef, Int32Array};
394/// use arrow::datatypes::{DataType, Field, Schema};
395/// use arrow::record_batch::RecordBatch;
396/// use datafusion_execution::runtime_env::RuntimeEnv;
397/// use futures::StreamExt;
398///
399/// # use datafusion_physical_plan::spill::spill_pool;
400/// # use datafusion_physical_plan::spill::SpillManager; // Re-exported for doctests
401/// # use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, SpillMetrics};
402/// #
403/// # #[tokio::main]
404/// # async fn main() -> datafusion_common::Result<()> {
405/// # // Setup for the example (typically comes from TaskContext in production)
406/// # let env = Arc::new(RuntimeEnv::default());
407/// # let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
408/// # let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
409/// # let spill_manager = Arc::new(SpillManager::new(env, metrics, schema.clone()));
410/// #
411/// // Create channel with 1MB file size limit
412/// let (writer, mut reader) = spill_pool::channel(1024 * 1024, spill_manager);
413///
414/// // Spawn writer and reader concurrently; writer wakes reader via wakers
415/// let writer_task = tokio::spawn(async move {
416/// for i in 0..5 {
417/// let array: ArrayRef = Arc::new(Int32Array::from(vec![i; 100]));
418/// let batch = RecordBatch::try_new(schema.clone(), vec![array]).unwrap();
419/// writer.push_batch(&batch)?;
420/// }
421/// // Explicitly drop writer to finalize the spill file and wake the reader
422/// drop(writer);
423/// datafusion_common::Result::<()>::Ok(())
424/// });
425///
426/// let reader_task = tokio::spawn(async move {
427/// let mut batches_read = 0;
428/// while let Some(result) = reader.next().await {
429/// let _batch = result?;
430/// batches_read += 1;
431/// }
432/// datafusion_common::Result::<usize>::Ok(batches_read)
433/// });
434///
435/// let (writer_res, reader_res) = tokio::join!(writer_task, reader_task);
436/// writer_res
437/// .map_err(|e| datafusion_common::DataFusionError::Execution(e.to_string()))??;
438/// let batches_read = reader_res
439/// .map_err(|e| datafusion_common::DataFusionError::Execution(e.to_string()))??;
440///
441/// assert_eq!(batches_read, 5);
442/// # Ok(())
443/// # }
444/// ```
445///
446/// # Why rotate files?
447///
448/// File rotation ensures we don't end up with unreferenced disk usage.
449/// If we used a single file for all spilled data, we would end up with
450/// unreferenced data at the beginning of the file that has already been read
451/// by readers but we can't delete because you can't truncate from the start of a file.
452///
453/// Consider the case of a query like `SELECT * FROM large_table WHERE false`.
454/// Obviously this query produces no output rows, but if we had a spilling operator
455/// in the middle of this query between the scan and the filter it would see the entire
456/// `large_table` flow through it and thus would spill all of that data to disk.
457/// So we'd end up using up to `size(large_table)` bytes of disk space.
458/// If instead we use file rotation, and as long as the readers can keep up with the writer,
459/// then we can ensure that once a file is fully read by all readers it can be deleted,
460/// thus bounding the maximum disk usage to roughly `max_file_size_bytes`.
461pub fn channel(
462 max_file_size_bytes: usize,
463 spill_manager: Arc<SpillManager>,
464) -> (SpillPoolWriter, SendableRecordBatchStream) {
465 let schema = Arc::clone(spill_manager.schema());
466 let shared = Arc::new(Mutex::new(SpillPoolShared::new(spill_manager)));
467
468 let writer = SpillPoolWriter {
469 max_file_size_bytes,
470 shared: Arc::clone(&shared),
471 };
472
473 let reader = SpillPoolReader::new(shared, schema);
474
475 (writer, Box::pin(reader))
476}
477
478/// Shared state between writer and readers for an active spill file.
479/// Protected by a Mutex to coordinate between concurrent readers and the writer.
480struct ActiveSpillFileShared {
481 /// Writer handle - taken (set to None) when finish() is called
482 writer: Option<InProgressSpillFile>,
483 /// The spill file, set when the writer finishes.
484 /// Taken by the reader when creating a stream (the file stays open via file handles).
485 file: Option<RefCountedTempFile>,
486 /// Total number of batches written to this file
487 batches_written: usize,
488 /// Estimated size in bytes of data written to this file
489 estimated_size: usize,
490 /// Whether the writer has finished writing to this file
491 writer_finished: bool,
492 /// Waker for reader waiting on this specific file (SPSC: only one reader)
493 waker: Option<Waker>,
494}
495
496impl ActiveSpillFileShared {
497 /// Registers a waker to be notified when new data is written to this file
498 fn register_waker(&mut self, waker: Waker) {
499 self.waker = Some(waker);
500 }
501
502 /// Wakes the reader waiting on this file
503 fn wake(&mut self) {
504 if let Some(waker) = self.waker.take() {
505 waker.wake();
506 }
507 }
508}
509
/// Reader state for a SpillFile (owned by individual SpillFile instances).
/// This is kept separate from the shared state to avoid holding locks during I/O.
struct SpillFileReader {
    /// The actual stream reading from disk
    stream: SendableRecordBatchStream,
    /// Number of batches this reader has consumed; compared against the shared
    /// `batches_written` to decide whether another batch can be read
    batches_read: usize,
}

/// Reader-side view of one file in the pool queue, polled as a stream of batches.
struct SpillFile {
    /// Shared coordination state (contains writer and batch counts)
    shared: Arc<Mutex<ActiveSpillFileShared>>,
    /// Reader state (lazily initialized on the first available batch, owned by this SpillFile)
    reader: Option<SpillFileReader>,
    /// Spill manager for creating readers
    spill_manager: Arc<SpillManager>,
}
527
528impl Stream for SpillFile {
529 type Item = Result<RecordBatch>;
530
531 fn poll_next(
532 mut self: std::pin::Pin<&mut Self>,
533 cx: &mut std::task::Context<'_>,
534 ) -> std::task::Poll<Option<Self::Item>> {
535 use std::task::Poll;
536
537 // Step 1: Lock shared state and check coordination
538 let (should_read, file) = {
539 let mut shared = self.shared.lock();
540
541 // Determine if we can read
542 let batches_read = self.reader.as_ref().map_or(0, |r| r.batches_read);
543
544 if batches_read < shared.batches_written {
545 // More data available to read - take the file if we don't have a reader yet
546 let file = if self.reader.is_none() {
547 shared.file.take()
548 } else {
549 None
550 };
551 (true, file)
552 } else if shared.writer_finished {
553 // No more data and writer is done - EOF
554 return Poll::Ready(None);
555 } else {
556 // Caught up to writer, but writer still active - register waker and wait
557 shared.register_waker(cx.waker().clone());
558 return Poll::Pending;
559 }
560 }; // Lock released here
561
562 // Step 2: Lazy-create reader stream if needed
563 if self.reader.is_none() && should_read {
564 if let Some(file) = file {
565 // we want this unbuffered because files are actively being written to
566 match self
567 .spill_manager
568 .read_spill_as_stream_unbuffered(file, None)
569 {
570 Ok(stream) => {
571 self.reader = Some(SpillFileReader {
572 stream,
573 batches_read: 0,
574 });
575 }
576 Err(e) => return Poll::Ready(Some(Err(e))),
577 }
578 } else {
579 // File not available yet (writer hasn't finished or already taken)
580 // Register waker and wait for file to be ready
581 let mut shared = self.shared.lock();
582 shared.register_waker(cx.waker().clone());
583 return Poll::Pending;
584 }
585 }
586
587 // Step 3: Poll the reader stream (no lock held)
588 if let Some(reader) = &mut self.reader {
589 match reader.stream.poll_next_unpin(cx) {
590 Poll::Ready(Some(Ok(batch))) => {
591 // Successfully read a batch - increment counter
592 reader.batches_read += 1;
593 Poll::Ready(Some(Ok(batch)))
594 }
595 Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
596 Poll::Ready(None) => {
597 // Stream exhausted unexpectedly
598 // This shouldn't happen if coordination is correct, but handle gracefully
599 Poll::Ready(None)
600 }
601 Poll::Pending => Poll::Pending,
602 }
603 } else {
604 // Should not reach here, but handle gracefully
605 Poll::Ready(None)
606 }
607 }
608}
609
/// A stream that reads from a SpillPool in FIFO order.
///
/// Created by [`channel`]. See that function for architecture diagrams and usage examples.
///
/// The stream automatically handles file rotation. It reads from completed files as
/// well as from the file that is still being written. When no data is available, it
/// returns `Poll::Pending` and registers a waker to be notified when the writer
/// produces more data.
///
/// # Stream Termination
///
/// Catching up with the writer does not end the stream. While any writer clone is
/// still alive, the stream waits for more data instead of returning `None`
/// (`Poll::Ready(None)`). It returns `None` once the last writer clone has been
/// dropped AND all queued data has been consumed. The one exception is when a file's
/// underlying stream ends before its writer finished, which should not happen with
/// correct coordination; that case also ends this stream.
///
/// This makes it suitable for continuous streaming scenarios where the writer may
/// produce data intermittently.
pub struct SpillPoolReader {
    /// Shared reference to the spill pool
    shared: Arc<Mutex<SpillPoolShared>>,
    /// Current SpillFile we're reading from (the front of the pool queue)
    current_file: Option<SpillFile>,
    /// Schema of the spilled data
    schema: SchemaRef,
}
635
636impl SpillPoolReader {
637 /// Creates a new reader from shared pool state.
638 ///
639 /// This is private - use the `channel()` function to create a reader/writer pair.
640 ///
641 /// # Arguments
642 ///
643 /// * `shared` - Shared reference to the pool state
644 fn new(shared: Arc<Mutex<SpillPoolShared>>, schema: SchemaRef) -> Self {
645 Self {
646 shared,
647 current_file: None,
648 schema,
649 }
650 }
651}
652
653impl Stream for SpillPoolReader {
654 type Item = Result<RecordBatch>;
655
656 fn poll_next(
657 mut self: std::pin::Pin<&mut Self>,
658 cx: &mut std::task::Context<'_>,
659 ) -> std::task::Poll<Option<Self::Item>> {
660 use std::task::Poll;
661
662 loop {
663 // If we have a current file, try to read from it
664 if let Some(ref mut file) = self.current_file {
665 match file.poll_next_unpin(cx) {
666 Poll::Ready(Some(Ok(batch))) => {
667 // Got a batch, return it
668 return Poll::Ready(Some(Ok(batch)));
669 }
670 Poll::Ready(Some(Err(e))) => {
671 // Error reading batch
672 return Poll::Ready(Some(Err(e)));
673 }
674 Poll::Ready(None) => {
675 // Current file stream exhausted
676 // Check if this file is marked as writer_finished
677 let writer_finished = { file.shared.lock().writer_finished };
678
679 if writer_finished {
680 // File is complete, pop it from the queue and move to next
681 let mut shared = self.shared.lock();
682 shared.files.pop_front();
683 drop(shared); // Release lock
684
685 // Clear current file and continue loop to get next file
686 self.current_file = None;
687 continue;
688 } else {
689 // Stream exhausted but writer not finished - unexpected
690 // This shouldn't happen with proper coordination
691 return Poll::Ready(None);
692 }
693 }
694 Poll::Pending => {
695 // File not ready yet (waiting for writer)
696 // Register waker so we get notified when writer adds more batches
697 let mut shared = self.shared.lock();
698 shared.register_waker(cx.waker().clone());
699 return Poll::Pending;
700 }
701 }
702 }
703
704 // No current file, need to get the next one
705 let mut shared = self.shared.lock();
706
707 // Peek at the front of the queue (don't pop yet)
708 if let Some(file_shared) = shared.files.front() {
709 // Create a SpillFile from the shared state
710 let spill_manager = Arc::clone(&shared.spill_manager);
711 let file_shared = Arc::clone(file_shared);
712 drop(shared); // Release lock before creating SpillFile
713
714 self.current_file = Some(SpillFile {
715 shared: file_shared,
716 reader: None,
717 spill_manager,
718 });
719
720 // Continue loop to poll the new file
721 continue;
722 }
723
724 // No files in queue - check if writer is done
725 if shared.writer_dropped {
726 // Writer is done and no more files will be added - EOF
727 return Poll::Ready(None);
728 }
729
730 // Writer still active, register waker that will get notified when new files are added
731 shared.register_waker(cx.waker().clone());
732 return Poll::Pending;
733 }
734 }
735}
736
737impl RecordBatchStream for SpillPoolReader {
738 fn schema(&self) -> SchemaRef {
739 Arc::clone(&self.schema)
740 }
741}
742
743#[cfg(test)]
744mod tests {
745 use super::*;
746 use crate::metrics::{ExecutionPlanMetricsSet, SpillMetrics};
747 use arrow::array::{ArrayRef, Int32Array};
748 use arrow::datatypes::{DataType, Field, Schema};
749 use datafusion_common_runtime::SpawnedTask;
750 use datafusion_execution::runtime_env::RuntimeEnv;
751
752 fn create_test_schema() -> SchemaRef {
753 Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]))
754 }
755
756 fn create_test_batch(start: i32, count: usize) -> RecordBatch {
757 let schema = create_test_schema();
758 let a: ArrayRef = Arc::new(Int32Array::from(
759 (start..start + count as i32).collect::<Vec<_>>(),
760 ));
761 RecordBatch::try_new(schema, vec![a]).unwrap()
762 }
763
764 fn create_spill_channel(
765 max_file_size: usize,
766 ) -> (SpillPoolWriter, SendableRecordBatchStream) {
767 let env = Arc::new(RuntimeEnv::default());
768 let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
769 let schema = create_test_schema();
770 let spill_manager = Arc::new(SpillManager::new(env, metrics, schema));
771
772 channel(max_file_size, spill_manager)
773 }
774
775 fn create_spill_channel_with_metrics(
776 max_file_size: usize,
777 ) -> (SpillPoolWriter, SendableRecordBatchStream, SpillMetrics) {
778 let env = Arc::new(RuntimeEnv::default());
779 let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
780 let schema = create_test_schema();
781 let spill_manager = Arc::new(SpillManager::new(env, metrics.clone(), schema));
782
783 let (writer, reader) = channel(max_file_size, spill_manager);
784 (writer, reader, metrics)
785 }
786
787 #[tokio::test]
788 async fn test_basic_write_and_read() -> Result<()> {
789 let (writer, mut reader) = create_spill_channel(1024 * 1024);
790
791 // Write one batch
792 let batch1 = create_test_batch(0, 10);
793 writer.push_batch(&batch1)?;
794
795 // Read the batch
796 let result = reader.next().await.unwrap()?;
797 assert_eq!(result.num_rows(), 10);
798
799 // Write another batch
800 let batch2 = create_test_batch(10, 5);
801 writer.push_batch(&batch2)?;
802 // Read the second batch
803 let result = reader.next().await.unwrap()?;
804 assert_eq!(result.num_rows(), 5);
805
806 Ok(())
807 }
808
809 #[tokio::test]
810 async fn test_single_batch_write_read() -> Result<()> {
811 let (writer, mut reader) = create_spill_channel(1024 * 1024);
812
813 // Write one batch
814 let batch = create_test_batch(0, 5);
815 writer.push_batch(&batch)?;
816
817 // Read it back
818 let result = reader.next().await.unwrap()?;
819 assert_eq!(result.num_rows(), 5);
820
821 // Verify the actual data
822 let col = result
823 .column(0)
824 .as_any()
825 .downcast_ref::<Int32Array>()
826 .unwrap();
827 assert_eq!(col.value(0), 0);
828 assert_eq!(col.value(4), 4);
829
830 Ok(())
831 }
832
833 #[tokio::test]
834 async fn test_multiple_batches_sequential() -> Result<()> {
835 let (writer, mut reader) = create_spill_channel(1024 * 1024);
836
837 // Write multiple batches
838 for i in 0..5 {
839 let batch = create_test_batch(i * 10, 10);
840 writer.push_batch(&batch)?;
841 }
842
843 // Read all batches and verify FIFO order
844 for i in 0..5 {
845 let result = reader.next().await.unwrap()?;
846 assert_eq!(result.num_rows(), 10);
847
848 let col = result
849 .column(0)
850 .as_any()
851 .downcast_ref::<Int32Array>()
852 .unwrap();
853 assert_eq!(col.value(0), i * 10, "Batch {i} not in FIFO order");
854 }
855
856 Ok(())
857 }
858
859 #[tokio::test]
860 async fn test_empty_writer() -> Result<()> {
861 let (_writer, reader) = create_spill_channel(1024 * 1024);
862
863 // Reader should pend since no batches were written
864 let mut reader = reader;
865 let result =
866 tokio::time::timeout(std::time::Duration::from_millis(100), reader.next())
867 .await;
868
869 assert!(result.is_err(), "Reader should timeout on empty writer");
870
871 Ok(())
872 }
873
874 #[tokio::test]
875 async fn test_empty_batch_skipping() -> Result<()> {
876 let (writer, mut reader) = create_spill_channel(1024 * 1024);
877
878 // Write empty batch
879 let empty_batch = create_test_batch(0, 0);
880 writer.push_batch(&empty_batch)?;
881
882 // Write non-empty batch
883 let batch = create_test_batch(0, 5);
884 writer.push_batch(&batch)?;
885
886 // Should only read the non-empty batch
887 let result = reader.next().await.unwrap()?;
888 assert_eq!(result.num_rows(), 5);
889
890 Ok(())
891 }
892
893 #[tokio::test]
894 async fn test_rotation_triggered_by_size() -> Result<()> {
895 // Set a small max_file_size to trigger rotation after one batch
896 let batch1 = create_test_batch(0, 10);
897 let batch_size = batch1.get_array_memory_size() + 1;
898
899 let (writer, mut reader, metrics) = create_spill_channel_with_metrics(batch_size);
900
901 // Write first batch (should fit in first file)
902 writer.push_batch(&batch1)?;
903
904 // Check metrics after first batch - file created but not finalized yet
905 assert_eq!(
906 metrics.spill_file_count.value(),
907 1,
908 "Should have created 1 file after first batch"
909 );
910 assert_eq!(
911 metrics.spilled_bytes.value(),
912 320,
913 "Spilled bytes should reflect data written (header + 1 batch)"
914 );
915 assert_eq!(
916 metrics.spilled_rows.value(),
917 10,
918 "Should have spilled 10 rows from first batch"
919 );
920
921 // Write second batch (should trigger rotation - finalize first file)
922 let batch2 = create_test_batch(10, 10);
923 assert!(
924 batch2.get_array_memory_size() <= batch_size,
925 "batch2 size {} exceeds limit {batch_size}",
926 batch2.get_array_memory_size(),
927 );
928 assert!(
929 batch1.get_array_memory_size() + batch2.get_array_memory_size() > batch_size,
930 "Combined size {} does not exceed limit to trigger rotation",
931 batch1.get_array_memory_size() + batch2.get_array_memory_size()
932 );
933 writer.push_batch(&batch2)?;
934
935 // Check metrics after rotation - first file finalized, but second file not created yet
936 // (new file created lazily on next push_batch call)
937 assert_eq!(
938 metrics.spill_file_count.value(),
939 1,
940 "Should still have 1 file (second file not created until next write)"
941 );
942 assert!(
943 metrics.spilled_bytes.value() > 0,
944 "Spilled bytes should be > 0 after first file finalized (got {})",
945 metrics.spilled_bytes.value()
946 );
947 assert_eq!(
948 metrics.spilled_rows.value(),
949 20,
950 "Should have spilled 20 total rows (10 + 10)"
951 );
952
953 // Write a third batch to confirm rotation occurred (creates second file)
954 let batch3 = create_test_batch(20, 5);
955 writer.push_batch(&batch3)?;
956
957 // Now check that second file was created
958 assert_eq!(
959 metrics.spill_file_count.value(),
960 2,
961 "Should have created 2 files after writing to new file"
962 );
963 assert_eq!(
964 metrics.spilled_rows.value(),
965 25,
966 "Should have spilled 25 total rows (10 + 10 + 5)"
967 );
968
969 // Read all three batches
970 let result1 = reader.next().await.unwrap()?;
971 assert_eq!(result1.num_rows(), 10);
972
973 let result2 = reader.next().await.unwrap()?;
974 assert_eq!(result2.num_rows(), 10);
975
976 let result3 = reader.next().await.unwrap()?;
977 assert_eq!(result3.num_rows(), 5);
978
979 Ok(())
980 }
981
982 #[tokio::test]
983 async fn test_multiple_rotations() -> Result<()> {
984 let batches = (0..10)
985 .map(|i| create_test_batch(i * 10, 10))
986 .collect::<Vec<_>>();
987
988 let batch_size = batches[0].get_array_memory_size() * 2 + 1;
989
990 // Very small max_file_size to force frequent rotations
991 let (writer, mut reader, metrics) = create_spill_channel_with_metrics(batch_size);
992
993 // Write many batches to cause multiple rotations
994 for i in 0..10 {
995 let batch = create_test_batch(i * 10, 10);
996 writer.push_batch(&batch)?;
997 }
998
999 // Check metrics after all writes - should have multiple files due to rotations
1000 // With batch_size = 2 * one_batch + 1, each file fits ~2 batches before rotating
1001 // 10 batches should create multiple files (exact count depends on rotation timing)
1002 let file_count = metrics.spill_file_count.value();
1003 assert!(
1004 file_count >= 4,
1005 "Should have created at least 4 files with multiple rotations (got {file_count})"
1006 );
1007 assert!(
1008 metrics.spilled_bytes.value() > 0,
1009 "Spilled bytes should be > 0 after rotations (got {})",
1010 metrics.spilled_bytes.value()
1011 );
1012 assert_eq!(
1013 metrics.spilled_rows.value(),
1014 100,
1015 "Should have spilled 100 total rows (10 batches * 10 rows)"
1016 );
1017
1018 // Read all batches and verify order
1019 for i in 0..10 {
1020 let result = reader.next().await.unwrap()?;
1021 assert_eq!(result.num_rows(), 10);
1022
1023 let col = result
1024 .column(0)
1025 .as_any()
1026 .downcast_ref::<Int32Array>()
1027 .unwrap();
1028 assert_eq!(
1029 col.value(0),
1030 i * 10,
1031 "Batch {i} not in correct order after rotations"
1032 );
1033 }
1034
1035 Ok(())
1036 }
1037
1038 #[tokio::test]
1039 async fn test_single_batch_larger_than_limit() -> Result<()> {
1040 // Very small limit
1041 let (writer, mut reader, metrics) = create_spill_channel_with_metrics(100);
1042
1043 // Write a batch that exceeds the limit
1044 let large_batch = create_test_batch(0, 100);
1045 writer.push_batch(&large_batch)?;
1046
1047 // Check metrics after large batch - should trigger rotation immediately
1048 assert_eq!(
1049 metrics.spill_file_count.value(),
1050 1,
1051 "Should have created 1 file for large batch"
1052 );
1053 assert_eq!(
1054 metrics.spilled_rows.value(),
1055 100,
1056 "Should have spilled 100 rows from large batch"
1057 );
1058
1059 // Should still write and read successfully
1060 let result = reader.next().await.unwrap()?;
1061 assert_eq!(result.num_rows(), 100);
1062
1063 // Next batch should go to a new file
1064 let batch2 = create_test_batch(100, 10);
1065 writer.push_batch(&batch2)?;
1066
1067 // Check metrics after second batch - should have rotated to a new file
1068 assert_eq!(
1069 metrics.spill_file_count.value(),
1070 2,
1071 "Should have created 2 files after rotation"
1072 );
1073 assert_eq!(
1074 metrics.spilled_rows.value(),
1075 110,
1076 "Should have spilled 110 total rows (100 + 10)"
1077 );
1078
1079 let result2 = reader.next().await.unwrap()?;
1080 assert_eq!(result2.num_rows(), 10);
1081
1082 Ok(())
1083 }
1084
1085 #[tokio::test]
1086 async fn test_very_small_max_file_size() -> Result<()> {
1087 // Test with just 1 byte max (extreme case)
1088 let (writer, mut reader) = create_spill_channel(1);
1089
1090 // Any batch will exceed this limit
1091 let batch = create_test_batch(0, 5);
1092 writer.push_batch(&batch)?;
1093
1094 // Should still work
1095 let result = reader.next().await.unwrap()?;
1096 assert_eq!(result.num_rows(), 5);
1097
1098 Ok(())
1099 }
1100
1101 #[tokio::test]
1102 async fn test_exact_size_boundary() -> Result<()> {
1103 // Create a batch and measure its approximate size
1104 let batch = create_test_batch(0, 10);
1105 let batch_size = batch.get_array_memory_size();
1106
1107 // Set max_file_size to exactly the batch size
1108 let (writer, mut reader, metrics) = create_spill_channel_with_metrics(batch_size);
1109
1110 // Write first batch (exactly at the size limit)
1111 writer.push_batch(&batch)?;
1112
1113 // Check metrics after first batch - should NOT rotate yet (size == limit, not >)
1114 assert_eq!(
1115 metrics.spill_file_count.value(),
1116 1,
1117 "Should have created 1 file after first batch at exact boundary"
1118 );
1119 assert_eq!(
1120 metrics.spilled_rows.value(),
1121 10,
1122 "Should have spilled 10 rows from first batch"
1123 );
1124
1125 // Write second batch (exceeds the limit, should trigger rotation)
1126 let batch2 = create_test_batch(10, 10);
1127 writer.push_batch(&batch2)?;
1128
1129 // Check metrics after second batch - rotation triggered, first file finalized
1130 // Note: second file not created yet (lazy creation on next write)
1131 assert_eq!(
1132 metrics.spill_file_count.value(),
1133 1,
1134 "Should still have 1 file after rotation (second file created lazily)"
1135 );
1136 assert_eq!(
1137 metrics.spilled_rows.value(),
1138 20,
1139 "Should have spilled 20 total rows (10 + 10)"
1140 );
1141 // Verify first file was finalized by checking spilled_bytes
1142 assert!(
1143 metrics.spilled_bytes.value() > 0,
1144 "Spilled bytes should be > 0 after file finalization (got {})",
1145 metrics.spilled_bytes.value()
1146 );
1147
1148 // Both should be readable
1149 let result1 = reader.next().await.unwrap()?;
1150 assert_eq!(result1.num_rows(), 10);
1151
1152 let result2 = reader.next().await.unwrap()?;
1153 assert_eq!(result2.num_rows(), 10);
1154
1155 // Spill another batch, now we should see the second file created
1156 let batch3 = create_test_batch(20, 5);
1157 writer.push_batch(&batch3)?;
1158 assert_eq!(
1159 metrics.spill_file_count.value(),
1160 2,
1161 "Should have created 2 files after writing to new file"
1162 );
1163
1164 Ok(())
1165 }
1166
1167 #[tokio::test]
1168 async fn test_concurrent_reader_writer() -> Result<()> {
1169 let (writer, mut reader) = create_spill_channel(1024 * 1024);
1170
1171 // Spawn writer task
1172 let writer_handle = SpawnedTask::spawn(async move {
1173 for i in 0..10 {
1174 let batch = create_test_batch(i * 10, 10);
1175 writer.push_batch(&batch).unwrap();
1176 // Small delay to simulate real concurrent work
1177 tokio::time::sleep(std::time::Duration::from_millis(5)).await;
1178 }
1179 });
1180
1181 // Reader task (runs concurrently)
1182 let reader_handle = SpawnedTask::spawn(async move {
1183 let mut count = 0;
1184 for i in 0..10 {
1185 let result = reader.next().await.unwrap().unwrap();
1186 assert_eq!(result.num_rows(), 10);
1187
1188 let col = result
1189 .column(0)
1190 .as_any()
1191 .downcast_ref::<Int32Array>()
1192 .unwrap();
1193 assert_eq!(col.value(0), i * 10);
1194 count += 1;
1195 }
1196 count
1197 });
1198
1199 // Wait for both to complete
1200 writer_handle.await.unwrap();
1201 let batches_read = reader_handle.await.unwrap();
1202 assert_eq!(batches_read, 10);
1203
1204 Ok(())
1205 }
1206
    /// Checks that a reader which starts polling before any data exists is
    /// woken by each subsequent write, and that reads and writes interleave
    /// in the expected order.
    ///
    /// Two oneshot channels sequence the tasks, and a shared event log records
    /// what happened. The test then asserts the exact order:
    /// `ReadStart, Write(5), Read(5), Write(10), Read(10)`.
    #[tokio::test]
    async fn test_reader_catches_up_to_writer() -> Result<()> {
        let (writer, mut reader) = create_spill_channel(1024 * 1024);

        // reader -> test: "I am about to wait for the first batch".
        let (reader_waiting_tx, reader_waiting_rx) = tokio::sync::oneshot::channel();
        // reader -> test: "I have consumed the first batch".
        let (first_read_done_tx, first_read_done_rx) = tokio::sync::oneshot::channel();

        /// Entries in the shared event log used to check interleaving.
        #[derive(Clone, Copy, Debug, PartialEq, Eq)]
        enum ReadWriteEvent {
            ReadStart,
            /// A batch with this many rows was read.
            Read(usize),
            /// A batch with this many rows is about to be written.
            Write(usize),
        }

        let events = Arc::new(Mutex::new(vec![]));
        // Start reader first (will pend)
        let reader_events = Arc::clone(&events);
        let reader_handle = SpawnedTask::spawn(async move {
            reader_events.lock().push(ReadWriteEvent::ReadStart);
            // NOTE: the signal is sent just before `reader.next()` is first
            // polled, not after the poll has returned Pending.
            reader_waiting_tx
                .send(())
                .expect("reader_waiting channel closed unexpectedly");
            let result = reader.next().await.unwrap().unwrap();
            reader_events
                .lock()
                .push(ReadWriteEvent::Read(result.num_rows()));
            first_read_done_tx
                .send(())
                .expect("first_read_done channel closed unexpectedly");
            let result = reader.next().await.unwrap().unwrap();
            reader_events
                .lock()
                .push(ReadWriteEvent::Read(result.num_rows()));
        });

        // Wait until the reader is pending on the first batch
        reader_waiting_rx
            .await
            .expect("reader should signal when waiting");

        // Now write a batch (should wake the reader). The Write event is
        // logged *before* pushing so it precedes the Read it enables.
        let batch = create_test_batch(0, 5);
        events.lock().push(ReadWriteEvent::Write(batch.num_rows()));
        writer.push_batch(&batch)?;

        // Wait for the reader to finish the first read before allowing the
        // second write. This ensures deterministic ordering of events:
        // 1. The reader starts and pends on the first `next()`
        // 2. The first write wakes the reader
        // 3. The reader processes the first batch and signals completion
        // 4. The second write is issued, ensuring consistent event ordering
        first_read_done_rx
            .await
            .expect("reader should signal when first read completes");

        // Write another batch (again logged before the push).
        let batch = create_test_batch(5, 10);
        events.lock().push(ReadWriteEvent::Write(batch.num_rows()));
        writer.push_batch(&batch)?;

        // Reader should complete; then snapshot the log and check its order.
        reader_handle.await.unwrap();
        let events = events.lock().clone();
        assert_eq!(
            events,
            vec![
                ReadWriteEvent::ReadStart,
                ReadWriteEvent::Write(5),
                ReadWriteEvent::Read(5),
                ReadWriteEvent::Write(10),
                ReadWriteEvent::Read(10)
            ]
        );

        Ok(())
    }
1282 }
1283
1284 #[tokio::test]
1285 async fn test_reader_starts_after_writer_finishes() -> Result<()> {
1286 let (writer, reader) = create_spill_channel(128);
1287
1288 // Writer writes all data
1289 for i in 0..5 {
1290 let batch = create_test_batch(i * 10, 10);
1291 writer.push_batch(&batch)?;
1292 }
1293
1294 drop(writer);
1295
1296 // Now start reader
1297 let mut reader = reader;
1298 let mut count = 0;
1299 for i in 0..5 {
1300 let result = reader.next().await.unwrap()?;
1301 assert_eq!(result.num_rows(), 10);
1302
1303 let col = result
1304 .column(0)
1305 .as_any()
1306 .downcast_ref::<Int32Array>()
1307 .unwrap();
1308 assert_eq!(col.value(0), i * 10);
1309 count += 1;
1310 }
1311
1312 assert_eq!(count, 5, "Should read all batches after writer finishes");
1313
1314 Ok(())
1315 }
1316
1317 #[tokio::test]
1318 async fn test_writer_drop_finalizes_file() -> Result<()> {
1319 let env = Arc::new(RuntimeEnv::default());
1320 let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
1321 let schema = create_test_schema();
1322 let spill_manager =
1323 Arc::new(SpillManager::new(Arc::clone(&env), metrics.clone(), schema));
1324
1325 let (writer, mut reader) = channel(1024 * 1024, spill_manager);
1326
1327 // Write some batches
1328 for i in 0..5 {
1329 let batch = create_test_batch(i * 10, 10);
1330 writer.push_batch(&batch)?;
1331 }
1332
1333 // Check metrics before drop - spilled_bytes already reflects written data
1334 let spilled_bytes_before = metrics.spilled_bytes.value();
1335 assert_eq!(
1336 spilled_bytes_before, 1088,
1337 "Spilled bytes should reflect data written (header + 5 batches)"
1338 );
1339
1340 // Explicitly drop the writer - this should finalize the current file
1341 drop(writer);
1342
1343 // Check metrics after drop - spilled_bytes should be > 0 now
1344 let spilled_bytes_after = metrics.spilled_bytes.value();
1345 assert!(
1346 spilled_bytes_after > 0,
1347 "Spilled bytes should be > 0 after writer is dropped (got {spilled_bytes_after})"
1348 );
1349
1350 // Verify reader can still read all batches
1351 let mut count = 0;
1352 for i in 0..5 {
1353 let result = reader.next().await.unwrap()?;
1354 assert_eq!(result.num_rows(), 10);
1355
1356 let col = result
1357 .column(0)
1358 .as_any()
1359 .downcast_ref::<Int32Array>()
1360 .unwrap();
1361 assert_eq!(col.value(0), i * 10);
1362 count += 1;
1363 }
1364
1365 assert_eq!(count, 5, "Should read all batches after writer is dropped");
1366
1367 Ok(())
1368 }
1369
1370 /// Verifies that the reader stays alive as long as any writer clone exists.
1371 ///
1372 /// `SpillPoolWriter` is `Clone`, and in non-preserve-order repartitioning
1373 /// mode multiple input partition tasks share clones of the same writer.
1374 /// The reader must not see EOF until **all** clones have been dropped,
1375 /// even if the queue is temporarily empty between writes from different
1376 /// clones.
1377 ///
1378 /// The test sequence is:
1379 ///
1380 /// 1. writer1 writes a batch, then is dropped.
1381 /// 2. The reader consumes that batch (queue is now empty).
1382 /// 3. writer2 (still alive) writes a batch.
1383 /// 4. The reader must see that batch.
1384 /// 5. EOF is only signalled after writer2 is also dropped.
1385 #[tokio::test]
1386 async fn test_clone_drop_does_not_signal_eof_prematurely() -> Result<()> {
1387 let (writer1, mut reader) = create_spill_channel(1024 * 1024);
1388 let writer2 = writer1.clone();
1389
1390 // Synchronization: tell writer2 when it may proceed.
1391 let (proceed_tx, proceed_rx) = tokio::sync::oneshot::channel::<()>();
1392
1393 // Spawn writer2 — it waits for the signal before writing.
1394 let writer2_handle = SpawnedTask::spawn(async move {
1395 proceed_rx.await.unwrap();
1396 writer2.push_batch(&create_test_batch(10, 10)).unwrap();
1397 // writer2 is dropped here (last clone → true EOF)
1398 });
1399
1400 // Writer1 writes one batch, then drops.
1401 writer1.push_batch(&create_test_batch(0, 10))?;
1402 drop(writer1);
1403
1404 // Read writer1's batch.
1405 let batch1 = reader.next().await.unwrap()?;
1406 assert_eq!(batch1.num_rows(), 10);
1407 let col = batch1
1408 .column(0)
1409 .as_any()
1410 .downcast_ref::<Int32Array>()
1411 .unwrap();
1412 assert_eq!(col.value(0), 0);
1413
1414 // Signal writer2 to write its batch. It will execute when the
1415 // current task yields (i.e. when reader.next() returns Pending).
1416 proceed_tx.send(()).unwrap();
1417
1418 // The reader should wait (Pending) for writer2's data, not EOF.
1419 let batch2 =
1420 tokio::time::timeout(std::time::Duration::from_secs(5), reader.next())
1421 .await
1422 .expect("Reader timed out — should not hang");
1423
1424 assert!(
1425 batch2.is_some(),
1426 "Reader must not return EOF while a writer clone is still alive"
1427 );
1428 let batch2 = batch2.unwrap()?;
1429 assert_eq!(batch2.num_rows(), 10);
1430 let col = batch2
1431 .column(0)
1432 .as_any()
1433 .downcast_ref::<Int32Array>()
1434 .unwrap();
1435 assert_eq!(col.value(0), 10);
1436
1437 writer2_handle.await.unwrap();
1438
1439 // All writers dropped — reader should see real EOF now.
1440 assert!(reader.next().await.is_none());
1441
1442 Ok(())
1443 }
1444
1445 #[tokio::test]
1446 async fn test_disk_usage_decreases_as_files_consumed() -> Result<()> {
1447 use datafusion_execution::runtime_env::RuntimeEnvBuilder;
1448
1449 // Test configuration
1450 const NUM_BATCHES: usize = 3;
1451 const ROWS_PER_BATCH: usize = 100;
1452
1453 // Step 1: Create a test batch and measure its size
1454 let batch = create_test_batch(0, ROWS_PER_BATCH);
1455 let batch_size = batch.get_array_memory_size();
1456
1457 // Step 2: Configure file rotation to approximately 1 batch per file
1458 // Create a custom RuntimeEnv so we can access the DiskManager
1459 let runtime = Arc::new(RuntimeEnvBuilder::default().build()?);
1460 let disk_manager = Arc::clone(&runtime.disk_manager);
1461
1462 let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
1463 let schema = create_test_schema();
1464 let spill_manager = Arc::new(SpillManager::new(runtime, metrics.clone(), schema));
1465
1466 let (writer, mut reader) = channel(batch_size, spill_manager);
1467
1468 // Step 3: Write NUM_BATCHES batches to create approximately NUM_BATCHES files
1469 for i in 0..NUM_BATCHES {
1470 let start = (i * ROWS_PER_BATCH) as i32;
1471 writer.push_batch(&create_test_batch(start, ROWS_PER_BATCH))?;
1472 }
1473
1474 // Check how many files were created (should be at least a few due to file rotation)
1475 let file_count = metrics.spill_file_count.value();
1476 assert_eq!(
1477 file_count,
1478 NUM_BATCHES - 1,
1479 "Expected at {} files with rotation, got {file_count}",
1480 NUM_BATCHES - 1
1481 );
1482
1483 // Step 4: Verify initial disk usage reflects all files
1484 let initial_disk_usage = disk_manager.used_disk_space();
1485 assert!(
1486 initial_disk_usage > 0,
1487 "Expected disk usage > 0 after writing batches, got {initial_disk_usage}"
1488 );
1489
1490 // Step 5: Read NUM_BATCHES - 1 batches (all but 1)
1491 // As each file is fully consumed, it should be dropped and disk usage should decrease
1492 for i in 0..(NUM_BATCHES - 1) {
1493 let result = reader.next().await.unwrap()?;
1494 assert_eq!(result.num_rows(), ROWS_PER_BATCH);
1495
1496 let col = result
1497 .column(0)
1498 .as_any()
1499 .downcast_ref::<Int32Array>()
1500 .unwrap();
1501 assert_eq!(col.value(0), (i * ROWS_PER_BATCH) as i32);
1502 }
1503
1504 // Step 6: Verify disk usage decreased but is not zero (at least 1 batch remains)
1505 let partial_disk_usage = disk_manager.used_disk_space();
1506 assert!(
1507 partial_disk_usage > 0
1508 && partial_disk_usage < (batch_size * NUM_BATCHES * 2) as u64,
1509 "Disk usage should be > 0 with remaining batches"
1510 );
1511 assert!(
1512 partial_disk_usage < initial_disk_usage,
1513 "Disk usage should have decreased after reading most batches: initial={initial_disk_usage}, partial={partial_disk_usage}"
1514 );
1515
1516 // Step 7: Read the final batch
1517 let result = reader.next().await.unwrap()?;
1518 assert_eq!(result.num_rows(), ROWS_PER_BATCH);
1519
1520 // Step 8: Drop writer first to signal no more data will be written
1521 // The reader has infinite stream semantics and will wait for the writer
1522 // to be dropped before returning None
1523 drop(writer);
1524
1525 // Verify we've read all batches - now the reader should return None
1526 assert!(
1527 reader.next().await.is_none(),
1528 "Should have no more batches to read"
1529 );
1530
1531 // Step 9: Drop reader to release all references
1532 drop(reader);
1533
1534 // Step 10: Verify complete cleanup - disk usage should be 0
1535 let final_disk_usage = disk_manager.used_disk_space();
1536 assert_eq!(
1537 final_disk_usage, 0,
1538 "Disk usage should be 0 after all files dropped, got {final_disk_usage}"
1539 );
1540
1541 Ok(())
1542 }
1543}