Skip to main content

datafusion_physical_plan/spill/
spill_pool.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use futures::{Stream, StreamExt};
19use std::collections::VecDeque;
20use std::sync::Arc;
21use std::task::Waker;
22
23use parking_lot::Mutex;
24
25use arrow::datatypes::SchemaRef;
26use arrow::record_batch::RecordBatch;
27use datafusion_common::Result;
28use datafusion_execution::disk_manager::RefCountedTempFile;
29use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
30
31use super::in_progress_spill_file::InProgressSpillFile;
32use super::spill_manager::SpillManager;
33
/// Shared state between the writer and readers of a spill pool.
/// This contains the queue of files and coordination state.
///
/// # Locking Design
///
/// This struct uses **fine-grained locking** with nested `Arc<Mutex<>>`:
/// - `SpillPoolShared` is wrapped in `Arc<Mutex<>>` (outer lock)
/// - Each `ActiveSpillFileShared` is wrapped in `Arc<Mutex<>>` (inner lock)
///
/// This enables:
/// 1. **Short critical sections**: The outer lock is held only for queue operations
/// 2. **I/O outside locks**: Disk I/O happens while holding only the file-specific lock
/// 3. **Concurrent operations**: Reader can access the queue while writer does I/O
///
/// **Lock ordering discipline**: Never hold both locks simultaneously to prevent deadlock.
/// Always: acquire outer lock → release outer lock → acquire inner lock (if needed).
struct SpillPoolShared {
    /// Queue of ALL files (including the current write file if it exists).
    /// Readers always read from the front of this queue (FIFO).
    /// Each file has its own lock to enable concurrent reader/writer access.
    files: VecDeque<Arc<Mutex<ActiveSpillFileShared>>>,
    /// SpillManager for creating files and tracking metrics
    spill_manager: Arc<SpillManager>,
    /// Pool-level waker to notify when new files are available (single reader).
    /// Consumed (reset to `None`) each time the reader is woken.
    waker: Option<Waker>,
    /// Whether the writer has been dropped (no more files will be added).
    /// Set by the `Drop` impl of the last remaining writer clone.
    writer_dropped: bool,
    /// Writer's reference to the current file (shared by all cloned writers).
    /// Has its own lock to allow I/O without blocking queue access.
    /// `None` before the first write, while a writer has taken the file out
    /// to append to it, and after the file has been rotated.
    current_write_file: Option<Arc<Mutex<ActiveSpillFileShared>>>,
    /// Number of active writer clones. Only when this reaches zero should
    /// `writer_dropped` be set to true. This prevents premature EOF signaling
    /// when one writer clone is dropped while others are still active.
    /// Starts at 1 for the writer handed out together with the reader.
    active_writer_count: usize,
}
69
70impl SpillPoolShared {
71    /// Creates a new shared pool state
72    fn new(spill_manager: Arc<SpillManager>) -> Self {
73        Self {
74            files: VecDeque::new(),
75            spill_manager,
76            waker: None,
77            writer_dropped: false,
78            current_write_file: None,
79            active_writer_count: 1,
80        }
81    }
82
83    /// Registers a waker to be notified when new data is available (pool-level)
84    fn register_waker(&mut self, waker: Waker) {
85        self.waker = Some(waker);
86    }
87
88    /// Wakes the pool-level reader
89    fn wake(&mut self) {
90        if let Some(waker) = self.waker.take() {
91            waker.wake();
92        }
93    }
94}
95
/// Writer for a spill pool. Provides coordinated write access with FIFO semantics.
///
/// Created by [`channel`]. See that function for architecture diagrams and usage examples.
///
/// The writer is `Clone`, allowing multiple writers to coordinate on the same pool.
/// All clones share the same current write file and coordinate file rotation.
/// The writer automatically manages file rotation based on the `max_file_size_bytes`
/// configured in [`channel`]. When the last writer clone is dropped, it finalizes the
/// current file so readers can access all written data.
pub struct SpillPoolWriter {
    /// Maximum size in bytes before rotating to a new file.
    /// Typically set from configuration `datafusion.execution.max_spill_file_size_bytes`.
    /// Compared against the per-file running total of batch memory sizes.
    max_file_size_bytes: usize,
    /// Shared state with readers (includes current_write_file for coordination)
    shared: Arc<Mutex<SpillPoolShared>>,
}
112
113impl Clone for SpillPoolWriter {
114    fn clone(&self) -> Self {
115        // Increment the active writer count so that `writer_dropped` is only
116        // set to true when the *last* clone is dropped.
117        self.shared.lock().active_writer_count += 1;
118        Self {
119            max_file_size_bytes: self.max_file_size_bytes,
120            shared: Arc::clone(&self.shared),
121        }
122    }
123}
124
125impl SpillPoolWriter {
126    /// Spills a batch to the pool, rotating files when necessary.
127    ///
128    /// If the current file would exceed `max_file_size_bytes` after adding
129    /// this batch, the file is finalized and a new one is started.
130    ///
131    /// See [`channel`] for overall architecture and examples.
132    ///
133    /// # File Rotation Logic
134    ///
135    /// ```text
136    /// push_batch()
137    ///      │
138    ///      ▼
139    /// Current file exists?
140    ///      │
141    ///      ├─ No ──▶ Create new file ──▶ Add to shared queue
142    ///      │                               Wake readers
143    ///      ▼
144    /// Write batch to current file
145    ///      │
146    ///      ▼
147    /// estimated_size > max_file_size_bytes?
148    ///      │
149    ///      ├─ No ──▶ Keep current file for next batch
150    ///      │
151    ///      ▼
152    /// Yes: finish() current file
153    ///      Mark writer_finished = true
154    ///      Wake readers
155    ///      │
156    ///      ▼
157    /// Next push_batch() creates new file
158    /// ```
159    ///
160    /// # Errors
161    ///
162    /// Returns an error if disk I/O fails or disk quota is exceeded.
163    pub fn push_batch(&self, batch: &RecordBatch) -> Result<()> {
164        if batch.num_rows() == 0 {
165            // Skip empty batches
166            return Ok(());
167        }
168
169        let batch_size = batch.get_array_memory_size();
170
171        // Fine-grained locking: Lock shared state briefly for queue access
172        let mut shared = self.shared.lock();
173
174        // Create new file if we don't have one yet
175        if shared.current_write_file.is_none() {
176            let spill_manager = Arc::clone(&shared.spill_manager);
177            // Release shared lock before disk I/O (fine-grained locking)
178            drop(shared);
179
180            let writer = spill_manager.create_in_progress_file("SpillPool")?;
181            // Clone the file so readers can access it immediately
182            let file = writer.file().expect("InProgressSpillFile should always have a file when it is first created").clone();
183
184            let file_shared = Arc::new(Mutex::new(ActiveSpillFileShared {
185                writer: Some(writer),
186                file: Some(file), // Set immediately so readers can access it
187                batches_written: 0,
188                estimated_size: 0,
189                writer_finished: false,
190                waker: None,
191            }));
192
193            // Re-acquire lock and push to shared queue
194            shared = self.shared.lock();
195            shared.files.push_back(Arc::clone(&file_shared));
196            shared.current_write_file = Some(file_shared);
197            shared.wake(); // Wake readers waiting for new files
198        }
199
200        let current_write_file = shared.current_write_file.take();
201        // Release shared lock before file I/O (fine-grained locking)
202        // This allows readers to access the queue while we do disk I/O
203        drop(shared);
204
205        // Write batch to current file - lock only the specific file
206        if let Some(current_file) = current_write_file {
207            // Now lock just this file for I/O (separate from shared lock)
208            let mut file_shared = current_file.lock();
209
210            // Append the batch
211            if let Some(ref mut writer) = file_shared.writer {
212                writer.append_batch(batch)?;
213                // make sure we flush the writer for readers
214                writer.flush()?;
215                file_shared.batches_written += 1;
216                file_shared.estimated_size += batch_size;
217            }
218
219            // Wake reader waiting on this specific file
220            file_shared.wake();
221
222            // Check if we need to rotate
223            let needs_rotation = file_shared.estimated_size > self.max_file_size_bytes;
224
225            if needs_rotation {
226                // Finish the IPC writer
227                if let Some(mut writer) = file_shared.writer.take() {
228                    writer.finish()?;
229                }
230                // Mark as finished so readers know not to wait for more data
231                file_shared.writer_finished = true;
232                // Wake reader waiting on this file (it's now finished)
233                file_shared.wake();
234                // Don't put back current_write_file - let it rotate
235            } else {
236                // Release file lock
237                drop(file_shared);
238                // Put back the current file for further writing
239                let mut shared = self.shared.lock();
240                shared.current_write_file = Some(current_file);
241            }
242        }
243
244        Ok(())
245    }
246}
247
248impl Drop for SpillPoolWriter {
249    fn drop(&mut self) {
250        let mut shared = self.shared.lock();
251
252        shared.active_writer_count -= 1;
253        let is_last_writer = shared.active_writer_count == 0;
254
255        if !is_last_writer {
256            // Other writer clones are still active; do not finalize or
257            // signal EOF to readers.
258            return;
259        }
260
261        // Finalize the current file when the last writer is dropped
262        if let Some(current_file) = shared.current_write_file.take() {
263            // Release shared lock before locking file
264            drop(shared);
265
266            let mut file_shared = current_file.lock();
267
268            // Finish the current writer if it exists
269            if let Some(mut writer) = file_shared.writer.take() {
270                // Ignore errors on drop - we're in destructor
271                let _ = writer.finish();
272            }
273
274            // Mark as finished so readers know not to wait for more data
275            file_shared.writer_finished = true;
276
277            // Wake reader waiting on this file (it's now finished)
278            file_shared.wake();
279
280            drop(file_shared);
281            shared = self.shared.lock();
282        }
283
284        // Mark writer as dropped and wake pool-level readers
285        shared.writer_dropped = true;
286        shared.wake();
287    }
288}
289
290/// Creates a paired writer and reader for a spill pool with MPSC (multi-producer, single-consumer)
291/// semantics.
292///
293/// This is the recommended way to create a spill pool. The writer is `Clone`, allowing
294/// multiple producers to coordinate writes to the same pool. The reader can consume batches
295/// in FIFO order. The reader can start reading immediately after a writer appends a batch
296/// to the spill file, without waiting for the file to be sealed, while writers continue to
297/// write more data.
298///
299/// Internally this coordinates rotating spill files based on size limits, and
300/// handles asynchronous notification between the writer and reader using wakers.
301/// This ensures that we manage disk usage efficiently while allowing concurrent
302/// I/O between the writer and reader.
303///
304/// # Data Flow Overview
305///
/// 1. The writer writes batch `B0` to F1.
/// 2. The writer writes batch `B1` to F1, notices the size limit is exceeded, and finishes F1.
/// 3. The reader reads `B0` from F1.
/// 4. The reader reads `B1`; no more batches are available, so it waits on the waker.
/// 5. The writer writes batch `B2` to a new file `F2` and wakes up the waiting reader.
/// 6. The reader reads `B2` from F2.
/// 7. Repeat until the writer is dropped.
313///
314/// # Architecture
315///
316/// ```text
317/// ┌─────────────────────────────────────────────────────────────────────────┐
318/// │                            SpillPool                                    │
319/// │                                                                         │
320/// │  Writer Side              Shared State              Reader Side         │
321/// │  ───────────              ────────────              ───────────         │
322/// │                                                                         │
323/// │  SpillPoolWriter    ┌────────────────────┐    SpillPoolReader           │
324/// │       │             │  VecDeque<File>    │          │                   │
325/// │       │             │  ┌────┐┌────┐      │          │                   │
326/// │  push_batch()       │  │ F1 ││ F2 │ ...  │      next().await            │
327/// │       │             │  └────┘└────┘      │          │                   │
328/// │       ▼             │   (FIFO order)     │          ▼                   │
329/// │  ┌─────────┐        │                    │    ┌──────────┐              │
330/// │  │Current  │───────▶│ Coordination:      │◀───│ Current  │              │
331/// │  │Write    │        │ - Wakers           │    │ Read     │              │
332/// │  │File     │        │ - Batch counts     │    │ File     │              │
333/// │  └─────────┘        │ - Writer status    │    └──────────┘              │
334/// │       │             └────────────────────┘          │                   │
335/// │       │                                              │                  │
336/// │  Size > limit?                                Read all batches?         │
337/// │       │                                              │                  │
338/// │       ▼                                              ▼                  │
339/// │  Rotate to new file                            Pop from queue           │
340/// └─────────────────────────────────────────────────────────────────────────┘
341///
342/// Writer produces → Shared FIFO queue → Reader consumes
343/// ```
344///
345/// # File State Machine
346///
347/// Each file in the pool coordinates between writer and reader:
348///
349/// ```text
350///                Writer View              Reader View
351///                ───────────              ───────────
352///
353/// Created        writer: Some(..)         batches_read: 0
354///                batches_written: 0       (waiting for data)
355///                       │
356///                       ▼
357/// Writing        append_batch()           Can read if:
358///                batches_written++        batches_read < batches_written
359///                wake readers
360///                       │                        │
361///                       │                        ▼
362///                ┌──────┴──────┐          poll_next() → batch
363///                │             │          batches_read++
364///                ▼             ▼
365///          Size > limit?  More data?
366///                │             │
367///                │             └─▶ Yes ──▶ Continue writing
368///                ▼
369///          finish()                   Reader catches up:
370///          writer_finished = true     batches_read == batches_written
371///          wake readers                       │
372///                │                            ▼
373///                └─────────────────────▶ Returns Poll::Ready(None)
374///                                       File complete, pop from queue
375/// ```
376///
377/// # Arguments
378///
379/// * `max_file_size_bytes` - Maximum size per file before rotation. When a file
380///   exceeds this size, the writer automatically rotates to a new file.
381/// * `spill_manager` - Manager for file creation and metrics tracking
382///
383/// # Returns
384///
385/// A tuple of `(SpillPoolWriter, SendableRecordBatchStream)` that share the same
386/// underlying pool. The reader is returned as a stream for immediate use with
387/// async stream combinators.
388///
389/// # Example
390///
391/// ```
392/// use std::sync::Arc;
393/// use arrow::array::{ArrayRef, Int32Array};
394/// use arrow::datatypes::{DataType, Field, Schema};
395/// use arrow::record_batch::RecordBatch;
396/// use datafusion_execution::runtime_env::RuntimeEnv;
397/// use futures::StreamExt;
398///
399/// # use datafusion_physical_plan::spill::spill_pool;
400/// # use datafusion_physical_plan::spill::SpillManager; // Re-exported for doctests
401/// # use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, SpillMetrics};
402/// #
403/// # #[tokio::main]
404/// # async fn main() -> datafusion_common::Result<()> {
405/// # // Setup for the example (typically comes from TaskContext in production)
406/// # let env = Arc::new(RuntimeEnv::default());
407/// # let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
408/// # let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
409/// # let spill_manager = Arc::new(SpillManager::new(env, metrics, schema.clone()));
410/// #
411/// // Create channel with 1MB file size limit
412/// let (writer, mut reader) = spill_pool::channel(1024 * 1024, spill_manager);
413///
414/// // Spawn writer and reader concurrently; writer wakes reader via wakers
415/// let writer_task = tokio::spawn(async move {
416///     for i in 0..5 {
417///         let array: ArrayRef = Arc::new(Int32Array::from(vec![i; 100]));
418///         let batch = RecordBatch::try_new(schema.clone(), vec![array]).unwrap();
419///         writer.push_batch(&batch)?;
420///     }
421///     // Explicitly drop writer to finalize the spill file and wake the reader
422///     drop(writer);
423///     datafusion_common::Result::<()>::Ok(())
424/// });
425///
426/// let reader_task = tokio::spawn(async move {
427///     let mut batches_read = 0;
428///     while let Some(result) = reader.next().await {
429///         let _batch = result?;
430///         batches_read += 1;
431///     }
432///     datafusion_common::Result::<usize>::Ok(batches_read)
433/// });
434///
435/// let (writer_res, reader_res) = tokio::join!(writer_task, reader_task);
436/// writer_res
437///     .map_err(|e| datafusion_common::DataFusionError::Execution(e.to_string()))??;
438/// let batches_read = reader_res
439///     .map_err(|e| datafusion_common::DataFusionError::Execution(e.to_string()))??;
440///
441/// assert_eq!(batches_read, 5);
442/// # Ok(())
443/// # }
444/// ```
445///
446/// # Why rotate files?
447///
448/// File rotation ensures we don't end up with unreferenced disk usage.
449/// If we used a single file for all spilled data, we would end up with
450/// unreferenced data at the beginning of the file that has already been read
451/// by readers but we can't delete because you can't truncate from the start of a file.
452///
453/// Consider the case of a query like `SELECT * FROM large_table WHERE false`.
454/// Obviously this query produces no output rows, but if we had a spilling operator
455/// in the middle of this query between the scan and the filter it would see the entire
456/// `large_table` flow through it and thus would spill all of that data to disk.
457/// So we'd end up using up to `size(large_table)` bytes of disk space.
/// If instead we use file rotation, and as long as the reader can keep up with the writer,
/// then we can ensure that once a file is fully read by the reader it can be deleted,
/// thus bounding the maximum disk usage to roughly `max_file_size_bytes`.
461pub fn channel(
462    max_file_size_bytes: usize,
463    spill_manager: Arc<SpillManager>,
464) -> (SpillPoolWriter, SendableRecordBatchStream) {
465    let schema = Arc::clone(spill_manager.schema());
466    let shared = Arc::new(Mutex::new(SpillPoolShared::new(spill_manager)));
467
468    let writer = SpillPoolWriter {
469        max_file_size_bytes,
470        shared: Arc::clone(&shared),
471    };
472
473    let reader = SpillPoolReader::new(shared, schema);
474
475    (writer, Box::pin(reader))
476}
477
/// Shared state between writer and readers for an active spill file.
/// Protected by a Mutex to coordinate between concurrent readers and the writer.
struct ActiveSpillFileShared {
    /// Writer handle - taken (set to None) when finish() is called
    writer: Option<InProgressSpillFile>,
    /// Handle to the spill file. Populated by the writer as soon as the file
    /// is created, so the reader can open it while it is still being written.
    /// Taken by the reader when creating a stream (the file stays open via file handles).
    file: Option<RefCountedTempFile>,
    /// Total number of batches written to this file.
    /// The reader only polls its stream while it has consumed fewer batches than this.
    batches_written: usize,
    /// Estimated size in bytes of data written to this file
    /// (sum of the in-memory array sizes of the appended batches).
    estimated_size: usize,
    /// Whether the writer has finished writing to this file
    writer_finished: bool,
    /// Waker for reader waiting on this specific file (SPSC: only one reader)
    waker: Option<Waker>,
}
495
496impl ActiveSpillFileShared {
497    /// Registers a waker to be notified when new data is written to this file
498    fn register_waker(&mut self, waker: Waker) {
499        self.waker = Some(waker);
500    }
501
502    /// Wakes the reader waiting on this file
503    fn wake(&mut self) {
504        if let Some(waker) = self.waker.take() {
505            waker.wake();
506        }
507    }
508}
509
/// Reader state for a SpillFile (owned by individual SpillFile instances).
/// This is kept separate from the shared state to avoid holding locks during I/O.
struct SpillFileReader {
    /// The actual stream reading from disk
    stream: SendableRecordBatchStream,
    /// Number of batches this reader has consumed.
    /// Compared against the shared `batches_written` counter to decide
    /// whether the stream may be polled.
    batches_read: usize,
}
518
/// Reader-side view of a single spill file in the pool.
///
/// Pairs the per-file shared coordination state with a lazily created read
/// stream. Its `Stream` implementation yields batches only once the writer
/// has recorded them in the shared state.
struct SpillFile {
    /// Shared coordination state (contains writer and batch counts)
    shared: Arc<Mutex<ActiveSpillFileShared>>,
    /// Reader state (lazy-initialized, owned by this SpillFile)
    reader: Option<SpillFileReader>,
    /// Spill manager for creating readers
    spill_manager: Arc<SpillManager>,
}
527
528impl Stream for SpillFile {
529    type Item = Result<RecordBatch>;
530
531    fn poll_next(
532        mut self: std::pin::Pin<&mut Self>,
533        cx: &mut std::task::Context<'_>,
534    ) -> std::task::Poll<Option<Self::Item>> {
535        use std::task::Poll;
536
537        // Step 1: Lock shared state and check coordination
538        let (should_read, file) = {
539            let mut shared = self.shared.lock();
540
541            // Determine if we can read
542            let batches_read = self.reader.as_ref().map_or(0, |r| r.batches_read);
543
544            if batches_read < shared.batches_written {
545                // More data available to read - take the file if we don't have a reader yet
546                let file = if self.reader.is_none() {
547                    shared.file.take()
548                } else {
549                    None
550                };
551                (true, file)
552            } else if shared.writer_finished {
553                // No more data and writer is done - EOF
554                return Poll::Ready(None);
555            } else {
556                // Caught up to writer, but writer still active - register waker and wait
557                shared.register_waker(cx.waker().clone());
558                return Poll::Pending;
559            }
560        }; // Lock released here
561
562        // Step 2: Lazy-create reader stream if needed
563        if self.reader.is_none() && should_read {
564            if let Some(file) = file {
565                // we want this unbuffered because files are actively being written to
566                match self
567                    .spill_manager
568                    .read_spill_as_stream_unbuffered(file, None)
569                {
570                    Ok(stream) => {
571                        self.reader = Some(SpillFileReader {
572                            stream,
573                            batches_read: 0,
574                        });
575                    }
576                    Err(e) => return Poll::Ready(Some(Err(e))),
577                }
578            } else {
579                // File not available yet (writer hasn't finished or already taken)
580                // Register waker and wait for file to be ready
581                let mut shared = self.shared.lock();
582                shared.register_waker(cx.waker().clone());
583                return Poll::Pending;
584            }
585        }
586
587        // Step 3: Poll the reader stream (no lock held)
588        if let Some(reader) = &mut self.reader {
589            match reader.stream.poll_next_unpin(cx) {
590                Poll::Ready(Some(Ok(batch))) => {
591                    // Successfully read a batch - increment counter
592                    reader.batches_read += 1;
593                    Poll::Ready(Some(Ok(batch)))
594                }
595                Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
596                Poll::Ready(None) => {
597                    // Stream exhausted unexpectedly
598                    // This shouldn't happen if coordination is correct, but handle gracefully
599                    Poll::Ready(None)
600                }
601                Poll::Pending => Poll::Pending,
602            }
603        } else {
604            // Should not reach here, but handle gracefully
605            Poll::Ready(None)
606        }
607    }
608}
609
/// A stream that reads from a SpillPool in FIFO order.
///
/// Created by [`channel`]. See that function for architecture diagrams and usage examples.
///
/// The stream automatically handles file rotation. It reads from the file at
/// the front of the queue, whether that file is already finished or still
/// being written to.
/// When no data is available, it returns `Poll::Pending` and registers a waker to
/// be notified when the writer produces more data.
///
/// # Infinite Stream Semantics
///
/// This stream never returns `None` (`Poll::Ready(None)`) on its own - it will keep
/// waiting for the writer to produce more data. The stream ends only when:
/// - The reader is dropped
/// - The writer is dropped AND all queued data has been consumed
///
/// This makes it suitable for continuous streaming scenarios where the writer may
/// produce data intermittently.
pub struct SpillPoolReader {
    /// Shared reference to the spill pool
    shared: Arc<Mutex<SpillPoolShared>>,
    /// Current SpillFile we're reading from.
    /// `None` until the first file is picked up and between files.
    current_file: Option<SpillFile>,
    /// Schema of the spilled data
    schema: SchemaRef,
}
635
636impl SpillPoolReader {
637    /// Creates a new reader from shared pool state.
638    ///
639    /// This is private - use the `channel()` function to create a reader/writer pair.
640    ///
641    /// # Arguments
642    ///
643    /// * `shared` - Shared reference to the pool state
644    fn new(shared: Arc<Mutex<SpillPoolShared>>, schema: SchemaRef) -> Self {
645        Self {
646            shared,
647            current_file: None,
648            schema,
649        }
650    }
651}
652
653impl Stream for SpillPoolReader {
654    type Item = Result<RecordBatch>;
655
656    fn poll_next(
657        mut self: std::pin::Pin<&mut Self>,
658        cx: &mut std::task::Context<'_>,
659    ) -> std::task::Poll<Option<Self::Item>> {
660        use std::task::Poll;
661
662        loop {
663            // If we have a current file, try to read from it
664            if let Some(ref mut file) = self.current_file {
665                match file.poll_next_unpin(cx) {
666                    Poll::Ready(Some(Ok(batch))) => {
667                        // Got a batch, return it
668                        return Poll::Ready(Some(Ok(batch)));
669                    }
670                    Poll::Ready(Some(Err(e))) => {
671                        // Error reading batch
672                        return Poll::Ready(Some(Err(e)));
673                    }
674                    Poll::Ready(None) => {
675                        // Current file stream exhausted
676                        // Check if this file is marked as writer_finished
677                        let writer_finished = { file.shared.lock().writer_finished };
678
679                        if writer_finished {
680                            // File is complete, pop it from the queue and move to next
681                            let mut shared = self.shared.lock();
682                            shared.files.pop_front();
683                            drop(shared); // Release lock
684
685                            // Clear current file and continue loop to get next file
686                            self.current_file = None;
687                            continue;
688                        } else {
689                            // Stream exhausted but writer not finished - unexpected
690                            // This shouldn't happen with proper coordination
691                            return Poll::Ready(None);
692                        }
693                    }
694                    Poll::Pending => {
695                        // File not ready yet (waiting for writer)
696                        // Register waker so we get notified when writer adds more batches
697                        let mut shared = self.shared.lock();
698                        shared.register_waker(cx.waker().clone());
699                        return Poll::Pending;
700                    }
701                }
702            }
703
704            // No current file, need to get the next one
705            let mut shared = self.shared.lock();
706
707            // Peek at the front of the queue (don't pop yet)
708            if let Some(file_shared) = shared.files.front() {
709                // Create a SpillFile from the shared state
710                let spill_manager = Arc::clone(&shared.spill_manager);
711                let file_shared = Arc::clone(file_shared);
712                drop(shared); // Release lock before creating SpillFile
713
714                self.current_file = Some(SpillFile {
715                    shared: file_shared,
716                    reader: None,
717                    spill_manager,
718                });
719
720                // Continue loop to poll the new file
721                continue;
722            }
723
724            // No files in queue - check if writer is done
725            if shared.writer_dropped {
726                // Writer is done and no more files will be added - EOF
727                return Poll::Ready(None);
728            }
729
730            // Writer still active, register waker that will get notified when new files are added
731            shared.register_waker(cx.waker().clone());
732            return Poll::Pending;
733        }
734    }
735}
736
737impl RecordBatchStream for SpillPoolReader {
738    fn schema(&self) -> SchemaRef {
739        Arc::clone(&self.schema)
740    }
741}
742
743#[cfg(test)]
744mod tests {
745    use super::*;
746    use crate::metrics::{ExecutionPlanMetricsSet, SpillMetrics};
747    use arrow::array::{ArrayRef, Int32Array};
748    use arrow::datatypes::{DataType, Field, Schema};
749    use datafusion_common_runtime::SpawnedTask;
750    use datafusion_execution::runtime_env::RuntimeEnv;
751
752    fn create_test_schema() -> SchemaRef {
753        Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]))
754    }
755
756    fn create_test_batch(start: i32, count: usize) -> RecordBatch {
757        let schema = create_test_schema();
758        let a: ArrayRef = Arc::new(Int32Array::from(
759            (start..start + count as i32).collect::<Vec<_>>(),
760        ));
761        RecordBatch::try_new(schema, vec![a]).unwrap()
762    }
763
764    fn create_spill_channel(
765        max_file_size: usize,
766    ) -> (SpillPoolWriter, SendableRecordBatchStream) {
767        let env = Arc::new(RuntimeEnv::default());
768        let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
769        let schema = create_test_schema();
770        let spill_manager = Arc::new(SpillManager::new(env, metrics, schema));
771
772        channel(max_file_size, spill_manager)
773    }
774
775    fn create_spill_channel_with_metrics(
776        max_file_size: usize,
777    ) -> (SpillPoolWriter, SendableRecordBatchStream, SpillMetrics) {
778        let env = Arc::new(RuntimeEnv::default());
779        let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
780        let schema = create_test_schema();
781        let spill_manager = Arc::new(SpillManager::new(env, metrics.clone(), schema));
782
783        let (writer, reader) = channel(max_file_size, spill_manager);
784        (writer, reader, metrics)
785    }
786
787    #[tokio::test]
788    async fn test_basic_write_and_read() -> Result<()> {
789        let (writer, mut reader) = create_spill_channel(1024 * 1024);
790
791        // Write one batch
792        let batch1 = create_test_batch(0, 10);
793        writer.push_batch(&batch1)?;
794
795        // Read the batch
796        let result = reader.next().await.unwrap()?;
797        assert_eq!(result.num_rows(), 10);
798
799        // Write another batch
800        let batch2 = create_test_batch(10, 5);
801        writer.push_batch(&batch2)?;
802        // Read the second batch
803        let result = reader.next().await.unwrap()?;
804        assert_eq!(result.num_rows(), 5);
805
806        Ok(())
807    }
808
809    #[tokio::test]
810    async fn test_single_batch_write_read() -> Result<()> {
811        let (writer, mut reader) = create_spill_channel(1024 * 1024);
812
813        // Write one batch
814        let batch = create_test_batch(0, 5);
815        writer.push_batch(&batch)?;
816
817        // Read it back
818        let result = reader.next().await.unwrap()?;
819        assert_eq!(result.num_rows(), 5);
820
821        // Verify the actual data
822        let col = result
823            .column(0)
824            .as_any()
825            .downcast_ref::<Int32Array>()
826            .unwrap();
827        assert_eq!(col.value(0), 0);
828        assert_eq!(col.value(4), 4);
829
830        Ok(())
831    }
832
833    #[tokio::test]
834    async fn test_multiple_batches_sequential() -> Result<()> {
835        let (writer, mut reader) = create_spill_channel(1024 * 1024);
836
837        // Write multiple batches
838        for i in 0..5 {
839            let batch = create_test_batch(i * 10, 10);
840            writer.push_batch(&batch)?;
841        }
842
843        // Read all batches and verify FIFO order
844        for i in 0..5 {
845            let result = reader.next().await.unwrap()?;
846            assert_eq!(result.num_rows(), 10);
847
848            let col = result
849                .column(0)
850                .as_any()
851                .downcast_ref::<Int32Array>()
852                .unwrap();
853            assert_eq!(col.value(0), i * 10, "Batch {i} not in FIFO order");
854        }
855
856        Ok(())
857    }
858
859    #[tokio::test]
860    async fn test_empty_writer() -> Result<()> {
861        let (_writer, reader) = create_spill_channel(1024 * 1024);
862
863        // Reader should pend since no batches were written
864        let mut reader = reader;
865        let result =
866            tokio::time::timeout(std::time::Duration::from_millis(100), reader.next())
867                .await;
868
869        assert!(result.is_err(), "Reader should timeout on empty writer");
870
871        Ok(())
872    }
873
874    #[tokio::test]
875    async fn test_empty_batch_skipping() -> Result<()> {
876        let (writer, mut reader) = create_spill_channel(1024 * 1024);
877
878        // Write empty batch
879        let empty_batch = create_test_batch(0, 0);
880        writer.push_batch(&empty_batch)?;
881
882        // Write non-empty batch
883        let batch = create_test_batch(0, 5);
884        writer.push_batch(&batch)?;
885
886        // Should only read the non-empty batch
887        let result = reader.next().await.unwrap()?;
888        assert_eq!(result.num_rows(), 5);
889
890        Ok(())
891    }
892
893    #[tokio::test]
894    async fn test_rotation_triggered_by_size() -> Result<()> {
895        // Set a small max_file_size to trigger rotation after one batch
896        let batch1 = create_test_batch(0, 10);
897        let batch_size = batch1.get_array_memory_size() + 1;
898
899        let (writer, mut reader, metrics) = create_spill_channel_with_metrics(batch_size);
900
901        // Write first batch (should fit in first file)
902        writer.push_batch(&batch1)?;
903
904        // Check metrics after first batch - file created but not finalized yet
905        assert_eq!(
906            metrics.spill_file_count.value(),
907            1,
908            "Should have created 1 file after first batch"
909        );
910        assert_eq!(
911            metrics.spilled_bytes.value(),
912            320,
913            "Spilled bytes should reflect data written (header + 1 batch)"
914        );
915        assert_eq!(
916            metrics.spilled_rows.value(),
917            10,
918            "Should have spilled 10 rows from first batch"
919        );
920
921        // Write second batch (should trigger rotation - finalize first file)
922        let batch2 = create_test_batch(10, 10);
923        assert!(
924            batch2.get_array_memory_size() <= batch_size,
925            "batch2 size {} exceeds limit {batch_size}",
926            batch2.get_array_memory_size(),
927        );
928        assert!(
929            batch1.get_array_memory_size() + batch2.get_array_memory_size() > batch_size,
930            "Combined size {} does not exceed limit to trigger rotation",
931            batch1.get_array_memory_size() + batch2.get_array_memory_size()
932        );
933        writer.push_batch(&batch2)?;
934
935        // Check metrics after rotation - first file finalized, but second file not created yet
936        // (new file created lazily on next push_batch call)
937        assert_eq!(
938            metrics.spill_file_count.value(),
939            1,
940            "Should still have 1 file (second file not created until next write)"
941        );
942        assert!(
943            metrics.spilled_bytes.value() > 0,
944            "Spilled bytes should be > 0 after first file finalized (got {})",
945            metrics.spilled_bytes.value()
946        );
947        assert_eq!(
948            metrics.spilled_rows.value(),
949            20,
950            "Should have spilled 20 total rows (10 + 10)"
951        );
952
953        // Write a third batch to confirm rotation occurred (creates second file)
954        let batch3 = create_test_batch(20, 5);
955        writer.push_batch(&batch3)?;
956
957        // Now check that second file was created
958        assert_eq!(
959            metrics.spill_file_count.value(),
960            2,
961            "Should have created 2 files after writing to new file"
962        );
963        assert_eq!(
964            metrics.spilled_rows.value(),
965            25,
966            "Should have spilled 25 total rows (10 + 10 + 5)"
967        );
968
969        // Read all three batches
970        let result1 = reader.next().await.unwrap()?;
971        assert_eq!(result1.num_rows(), 10);
972
973        let result2 = reader.next().await.unwrap()?;
974        assert_eq!(result2.num_rows(), 10);
975
976        let result3 = reader.next().await.unwrap()?;
977        assert_eq!(result3.num_rows(), 5);
978
979        Ok(())
980    }
981
982    #[tokio::test]
983    async fn test_multiple_rotations() -> Result<()> {
984        let batches = (0..10)
985            .map(|i| create_test_batch(i * 10, 10))
986            .collect::<Vec<_>>();
987
988        let batch_size = batches[0].get_array_memory_size() * 2 + 1;
989
990        // Very small max_file_size to force frequent rotations
991        let (writer, mut reader, metrics) = create_spill_channel_with_metrics(batch_size);
992
993        // Write many batches to cause multiple rotations
994        for i in 0..10 {
995            let batch = create_test_batch(i * 10, 10);
996            writer.push_batch(&batch)?;
997        }
998
999        // Check metrics after all writes - should have multiple files due to rotations
1000        // With batch_size = 2 * one_batch + 1, each file fits ~2 batches before rotating
1001        // 10 batches should create multiple files (exact count depends on rotation timing)
1002        let file_count = metrics.spill_file_count.value();
1003        assert!(
1004            file_count >= 4,
1005            "Should have created at least 4 files with multiple rotations (got {file_count})"
1006        );
1007        assert!(
1008            metrics.spilled_bytes.value() > 0,
1009            "Spilled bytes should be > 0 after rotations (got {})",
1010            metrics.spilled_bytes.value()
1011        );
1012        assert_eq!(
1013            metrics.spilled_rows.value(),
1014            100,
1015            "Should have spilled 100 total rows (10 batches * 10 rows)"
1016        );
1017
1018        // Read all batches and verify order
1019        for i in 0..10 {
1020            let result = reader.next().await.unwrap()?;
1021            assert_eq!(result.num_rows(), 10);
1022
1023            let col = result
1024                .column(0)
1025                .as_any()
1026                .downcast_ref::<Int32Array>()
1027                .unwrap();
1028            assert_eq!(
1029                col.value(0),
1030                i * 10,
1031                "Batch {i} not in correct order after rotations"
1032            );
1033        }
1034
1035        Ok(())
1036    }
1037
1038    #[tokio::test]
1039    async fn test_single_batch_larger_than_limit() -> Result<()> {
1040        // Very small limit
1041        let (writer, mut reader, metrics) = create_spill_channel_with_metrics(100);
1042
1043        // Write a batch that exceeds the limit
1044        let large_batch = create_test_batch(0, 100);
1045        writer.push_batch(&large_batch)?;
1046
1047        // Check metrics after large batch - should trigger rotation immediately
1048        assert_eq!(
1049            metrics.spill_file_count.value(),
1050            1,
1051            "Should have created 1 file for large batch"
1052        );
1053        assert_eq!(
1054            metrics.spilled_rows.value(),
1055            100,
1056            "Should have spilled 100 rows from large batch"
1057        );
1058
1059        // Should still write and read successfully
1060        let result = reader.next().await.unwrap()?;
1061        assert_eq!(result.num_rows(), 100);
1062
1063        // Next batch should go to a new file
1064        let batch2 = create_test_batch(100, 10);
1065        writer.push_batch(&batch2)?;
1066
1067        // Check metrics after second batch - should have rotated to a new file
1068        assert_eq!(
1069            metrics.spill_file_count.value(),
1070            2,
1071            "Should have created 2 files after rotation"
1072        );
1073        assert_eq!(
1074            metrics.spilled_rows.value(),
1075            110,
1076            "Should have spilled 110 total rows (100 + 10)"
1077        );
1078
1079        let result2 = reader.next().await.unwrap()?;
1080        assert_eq!(result2.num_rows(), 10);
1081
1082        Ok(())
1083    }
1084
1085    #[tokio::test]
1086    async fn test_very_small_max_file_size() -> Result<()> {
1087        // Test with just 1 byte max (extreme case)
1088        let (writer, mut reader) = create_spill_channel(1);
1089
1090        // Any batch will exceed this limit
1091        let batch = create_test_batch(0, 5);
1092        writer.push_batch(&batch)?;
1093
1094        // Should still work
1095        let result = reader.next().await.unwrap()?;
1096        assert_eq!(result.num_rows(), 5);
1097
1098        Ok(())
1099    }
1100
1101    #[tokio::test]
1102    async fn test_exact_size_boundary() -> Result<()> {
1103        // Create a batch and measure its approximate size
1104        let batch = create_test_batch(0, 10);
1105        let batch_size = batch.get_array_memory_size();
1106
1107        // Set max_file_size to exactly the batch size
1108        let (writer, mut reader, metrics) = create_spill_channel_with_metrics(batch_size);
1109
1110        // Write first batch (exactly at the size limit)
1111        writer.push_batch(&batch)?;
1112
1113        // Check metrics after first batch - should NOT rotate yet (size == limit, not >)
1114        assert_eq!(
1115            metrics.spill_file_count.value(),
1116            1,
1117            "Should have created 1 file after first batch at exact boundary"
1118        );
1119        assert_eq!(
1120            metrics.spilled_rows.value(),
1121            10,
1122            "Should have spilled 10 rows from first batch"
1123        );
1124
1125        // Write second batch (exceeds the limit, should trigger rotation)
1126        let batch2 = create_test_batch(10, 10);
1127        writer.push_batch(&batch2)?;
1128
1129        // Check metrics after second batch - rotation triggered, first file finalized
1130        // Note: second file not created yet (lazy creation on next write)
1131        assert_eq!(
1132            metrics.spill_file_count.value(),
1133            1,
1134            "Should still have 1 file after rotation (second file created lazily)"
1135        );
1136        assert_eq!(
1137            metrics.spilled_rows.value(),
1138            20,
1139            "Should have spilled 20 total rows (10 + 10)"
1140        );
1141        // Verify first file was finalized by checking spilled_bytes
1142        assert!(
1143            metrics.spilled_bytes.value() > 0,
1144            "Spilled bytes should be > 0 after file finalization (got {})",
1145            metrics.spilled_bytes.value()
1146        );
1147
1148        // Both should be readable
1149        let result1 = reader.next().await.unwrap()?;
1150        assert_eq!(result1.num_rows(), 10);
1151
1152        let result2 = reader.next().await.unwrap()?;
1153        assert_eq!(result2.num_rows(), 10);
1154
1155        // Spill another batch, now we should see the second file created
1156        let batch3 = create_test_batch(20, 5);
1157        writer.push_batch(&batch3)?;
1158        assert_eq!(
1159            metrics.spill_file_count.value(),
1160            2,
1161            "Should have created 2 files after writing to new file"
1162        );
1163
1164        Ok(())
1165    }
1166
1167    #[tokio::test]
1168    async fn test_concurrent_reader_writer() -> Result<()> {
1169        let (writer, mut reader) = create_spill_channel(1024 * 1024);
1170
1171        // Spawn writer task
1172        let writer_handle = SpawnedTask::spawn(async move {
1173            for i in 0..10 {
1174                let batch = create_test_batch(i * 10, 10);
1175                writer.push_batch(&batch).unwrap();
1176                // Small delay to simulate real concurrent work
1177                tokio::time::sleep(std::time::Duration::from_millis(5)).await;
1178            }
1179        });
1180
1181        // Reader task (runs concurrently)
1182        let reader_handle = SpawnedTask::spawn(async move {
1183            let mut count = 0;
1184            for i in 0..10 {
1185                let result = reader.next().await.unwrap().unwrap();
1186                assert_eq!(result.num_rows(), 10);
1187
1188                let col = result
1189                    .column(0)
1190                    .as_any()
1191                    .downcast_ref::<Int32Array>()
1192                    .unwrap();
1193                assert_eq!(col.value(0), i * 10);
1194                count += 1;
1195            }
1196            count
1197        });
1198
1199        // Wait for both to complete
1200        writer_handle.await.unwrap();
1201        let batches_read = reader_handle.await.unwrap();
1202        assert_eq!(batches_read, 10);
1203
1204        Ok(())
1205    }
1206
1207    #[tokio::test]
1208    async fn test_reader_catches_up_to_writer() -> Result<()> {
1209        let (writer, mut reader) = create_spill_channel(1024 * 1024);
1210
1211        let (reader_waiting_tx, reader_waiting_rx) = tokio::sync::oneshot::channel();
1212        let (first_read_done_tx, first_read_done_rx) = tokio::sync::oneshot::channel();
1213
1214        #[derive(Clone, Copy, Debug, PartialEq, Eq)]
1215        enum ReadWriteEvent {
1216            ReadStart,
1217            Read(usize),
1218            Write(usize),
1219        }
1220
1221        let events = Arc::new(Mutex::new(vec![]));
1222        // Start reader first (will pend)
1223        let reader_events = Arc::clone(&events);
1224        let reader_handle = SpawnedTask::spawn(async move {
1225            reader_events.lock().push(ReadWriteEvent::ReadStart);
1226            reader_waiting_tx
1227                .send(())
1228                .expect("reader_waiting channel closed unexpectedly");
1229            let result = reader.next().await.unwrap().unwrap();
1230            reader_events
1231                .lock()
1232                .push(ReadWriteEvent::Read(result.num_rows()));
1233            first_read_done_tx
1234                .send(())
1235                .expect("first_read_done channel closed unexpectedly");
1236            let result = reader.next().await.unwrap().unwrap();
1237            reader_events
1238                .lock()
1239                .push(ReadWriteEvent::Read(result.num_rows()));
1240        });
1241
1242        // Wait until the reader is pending on the first batch
1243        reader_waiting_rx
1244            .await
1245            .expect("reader should signal when waiting");
1246
1247        // Now write a batch (should wake the reader)
1248        let batch = create_test_batch(0, 5);
1249        events.lock().push(ReadWriteEvent::Write(batch.num_rows()));
1250        writer.push_batch(&batch)?;
1251
1252        // Wait for the reader to finish the first read before allowing the
1253        // second write. This ensures deterministic ordering of events:
1254        // 1. The reader starts and pends on the first `next()`
1255        // 2. The first write wakes the reader
1256        // 3. The reader processes the first batch and signals completion
1257        // 4. The second write is issued, ensuring consistent event ordering
1258        first_read_done_rx
1259            .await
1260            .expect("reader should signal when first read completes");
1261
1262        // Write another batch
1263        let batch = create_test_batch(5, 10);
1264        events.lock().push(ReadWriteEvent::Write(batch.num_rows()));
1265        writer.push_batch(&batch)?;
1266
1267        // Reader should complete
1268        reader_handle.await.unwrap();
1269        let events = events.lock().clone();
1270        assert_eq!(
1271            events,
1272            vec![
1273                ReadWriteEvent::ReadStart,
1274                ReadWriteEvent::Write(5),
1275                ReadWriteEvent::Read(5),
1276                ReadWriteEvent::Write(10),
1277                ReadWriteEvent::Read(10)
1278            ]
1279        );
1280
1281        Ok(())
1282    }
1283
1284    #[tokio::test]
1285    async fn test_reader_starts_after_writer_finishes() -> Result<()> {
1286        let (writer, reader) = create_spill_channel(128);
1287
1288        // Writer writes all data
1289        for i in 0..5 {
1290            let batch = create_test_batch(i * 10, 10);
1291            writer.push_batch(&batch)?;
1292        }
1293
1294        drop(writer);
1295
1296        // Now start reader
1297        let mut reader = reader;
1298        let mut count = 0;
1299        for i in 0..5 {
1300            let result = reader.next().await.unwrap()?;
1301            assert_eq!(result.num_rows(), 10);
1302
1303            let col = result
1304                .column(0)
1305                .as_any()
1306                .downcast_ref::<Int32Array>()
1307                .unwrap();
1308            assert_eq!(col.value(0), i * 10);
1309            count += 1;
1310        }
1311
1312        assert_eq!(count, 5, "Should read all batches after writer finishes");
1313
1314        Ok(())
1315    }
1316
1317    #[tokio::test]
1318    async fn test_writer_drop_finalizes_file() -> Result<()> {
1319        let env = Arc::new(RuntimeEnv::default());
1320        let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
1321        let schema = create_test_schema();
1322        let spill_manager =
1323            Arc::new(SpillManager::new(Arc::clone(&env), metrics.clone(), schema));
1324
1325        let (writer, mut reader) = channel(1024 * 1024, spill_manager);
1326
1327        // Write some batches
1328        for i in 0..5 {
1329            let batch = create_test_batch(i * 10, 10);
1330            writer.push_batch(&batch)?;
1331        }
1332
1333        // Check metrics before drop - spilled_bytes already reflects written data
1334        let spilled_bytes_before = metrics.spilled_bytes.value();
1335        assert_eq!(
1336            spilled_bytes_before, 1088,
1337            "Spilled bytes should reflect data written (header + 5 batches)"
1338        );
1339
1340        // Explicitly drop the writer - this should finalize the current file
1341        drop(writer);
1342
1343        // Check metrics after drop - spilled_bytes should be > 0 now
1344        let spilled_bytes_after = metrics.spilled_bytes.value();
1345        assert!(
1346            spilled_bytes_after > 0,
1347            "Spilled bytes should be > 0 after writer is dropped (got {spilled_bytes_after})"
1348        );
1349
1350        // Verify reader can still read all batches
1351        let mut count = 0;
1352        for i in 0..5 {
1353            let result = reader.next().await.unwrap()?;
1354            assert_eq!(result.num_rows(), 10);
1355
1356            let col = result
1357                .column(0)
1358                .as_any()
1359                .downcast_ref::<Int32Array>()
1360                .unwrap();
1361            assert_eq!(col.value(0), i * 10);
1362            count += 1;
1363        }
1364
1365        assert_eq!(count, 5, "Should read all batches after writer is dropped");
1366
1367        Ok(())
1368    }
1369
1370    /// Verifies that the reader stays alive as long as any writer clone exists.
1371    ///
1372    /// `SpillPoolWriter` is `Clone`, and in non-preserve-order repartitioning
1373    /// mode multiple input partition tasks share clones of the same writer.
1374    /// The reader must not see EOF until **all** clones have been dropped,
1375    /// even if the queue is temporarily empty between writes from different
1376    /// clones.
1377    ///
1378    /// The test sequence is:
1379    ///
1380    /// 1. writer1 writes a batch, then is dropped.
1381    /// 2. The reader consumes that batch (queue is now empty).
1382    /// 3. writer2 (still alive) writes a batch.
1383    /// 4. The reader must see that batch.
1384    /// 5. EOF is only signalled after writer2 is also dropped.
1385    #[tokio::test]
1386    async fn test_clone_drop_does_not_signal_eof_prematurely() -> Result<()> {
1387        let (writer1, mut reader) = create_spill_channel(1024 * 1024);
1388        let writer2 = writer1.clone();
1389
1390        // Synchronization: tell writer2 when it may proceed.
1391        let (proceed_tx, proceed_rx) = tokio::sync::oneshot::channel::<()>();
1392
1393        // Spawn writer2 — it waits for the signal before writing.
1394        let writer2_handle = SpawnedTask::spawn(async move {
1395            proceed_rx.await.unwrap();
1396            writer2.push_batch(&create_test_batch(10, 10)).unwrap();
1397            // writer2 is dropped here (last clone → true EOF)
1398        });
1399
1400        // Writer1 writes one batch, then drops.
1401        writer1.push_batch(&create_test_batch(0, 10))?;
1402        drop(writer1);
1403
1404        // Read writer1's batch.
1405        let batch1 = reader.next().await.unwrap()?;
1406        assert_eq!(batch1.num_rows(), 10);
1407        let col = batch1
1408            .column(0)
1409            .as_any()
1410            .downcast_ref::<Int32Array>()
1411            .unwrap();
1412        assert_eq!(col.value(0), 0);
1413
1414        // Signal writer2 to write its batch. It will execute when the
1415        // current task yields (i.e. when reader.next() returns Pending).
1416        proceed_tx.send(()).unwrap();
1417
1418        // The reader should wait (Pending) for writer2's data, not EOF.
1419        let batch2 =
1420            tokio::time::timeout(std::time::Duration::from_secs(5), reader.next())
1421                .await
1422                .expect("Reader timed out — should not hang");
1423
1424        assert!(
1425            batch2.is_some(),
1426            "Reader must not return EOF while a writer clone is still alive"
1427        );
1428        let batch2 = batch2.unwrap()?;
1429        assert_eq!(batch2.num_rows(), 10);
1430        let col = batch2
1431            .column(0)
1432            .as_any()
1433            .downcast_ref::<Int32Array>()
1434            .unwrap();
1435        assert_eq!(col.value(0), 10);
1436
1437        writer2_handle.await.unwrap();
1438
1439        // All writers dropped — reader should see real EOF now.
1440        assert!(reader.next().await.is_none());
1441
1442        Ok(())
1443    }
1444
1445    #[tokio::test]
1446    async fn test_disk_usage_decreases_as_files_consumed() -> Result<()> {
1447        use datafusion_execution::runtime_env::RuntimeEnvBuilder;
1448
1449        // Test configuration
1450        const NUM_BATCHES: usize = 3;
1451        const ROWS_PER_BATCH: usize = 100;
1452
1453        // Step 1: Create a test batch and measure its size
1454        let batch = create_test_batch(0, ROWS_PER_BATCH);
1455        let batch_size = batch.get_array_memory_size();
1456
1457        // Step 2: Configure file rotation to approximately 1 batch per file
1458        // Create a custom RuntimeEnv so we can access the DiskManager
1459        let runtime = Arc::new(RuntimeEnvBuilder::default().build()?);
1460        let disk_manager = Arc::clone(&runtime.disk_manager);
1461
1462        let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
1463        let schema = create_test_schema();
1464        let spill_manager = Arc::new(SpillManager::new(runtime, metrics.clone(), schema));
1465
1466        let (writer, mut reader) = channel(batch_size, spill_manager);
1467
1468        // Step 3: Write NUM_BATCHES batches to create approximately NUM_BATCHES files
1469        for i in 0..NUM_BATCHES {
1470            let start = (i * ROWS_PER_BATCH) as i32;
1471            writer.push_batch(&create_test_batch(start, ROWS_PER_BATCH))?;
1472        }
1473
1474        // Check how many files were created (should be at least a few due to file rotation)
1475        let file_count = metrics.spill_file_count.value();
1476        assert_eq!(
1477            file_count,
1478            NUM_BATCHES - 1,
1479            "Expected at {} files with rotation, got {file_count}",
1480            NUM_BATCHES - 1
1481        );
1482
1483        // Step 4: Verify initial disk usage reflects all files
1484        let initial_disk_usage = disk_manager.used_disk_space();
1485        assert!(
1486            initial_disk_usage > 0,
1487            "Expected disk usage > 0 after writing batches, got {initial_disk_usage}"
1488        );
1489
1490        // Step 5: Read NUM_BATCHES - 1 batches (all but 1)
1491        // As each file is fully consumed, it should be dropped and disk usage should decrease
1492        for i in 0..(NUM_BATCHES - 1) {
1493            let result = reader.next().await.unwrap()?;
1494            assert_eq!(result.num_rows(), ROWS_PER_BATCH);
1495
1496            let col = result
1497                .column(0)
1498                .as_any()
1499                .downcast_ref::<Int32Array>()
1500                .unwrap();
1501            assert_eq!(col.value(0), (i * ROWS_PER_BATCH) as i32);
1502        }
1503
1504        // Step 6: Verify disk usage decreased but is not zero (at least 1 batch remains)
1505        let partial_disk_usage = disk_manager.used_disk_space();
1506        assert!(
1507            partial_disk_usage > 0
1508                && partial_disk_usage < (batch_size * NUM_BATCHES * 2) as u64,
1509            "Disk usage should be > 0 with remaining batches"
1510        );
1511        assert!(
1512            partial_disk_usage < initial_disk_usage,
1513            "Disk usage should have decreased after reading most batches: initial={initial_disk_usage}, partial={partial_disk_usage}"
1514        );
1515
1516        // Step 7: Read the final batch
1517        let result = reader.next().await.unwrap()?;
1518        assert_eq!(result.num_rows(), ROWS_PER_BATCH);
1519
1520        // Step 8: Drop writer first to signal no more data will be written
1521        // The reader has infinite stream semantics and will wait for the writer
1522        // to be dropped before returning None
1523        drop(writer);
1524
1525        // Verify we've read all batches - now the reader should return None
1526        assert!(
1527            reader.next().await.is_none(),
1528            "Should have no more batches to read"
1529        );
1530
1531        // Step 9: Drop reader to release all references
1532        drop(reader);
1533
1534        // Step 10: Verify complete cleanup - disk usage should be 0
1535        let final_disk_usage = disk_manager.used_disk_space();
1536        assert_eq!(
1537            final_disk_usage, 0,
1538            "Disk usage should be 0 after all files dropped, got {final_disk_usage}"
1539        );
1540
1541        Ok(())
1542    }
1543}