// datafusion_physical_plan/spill/spill_pool.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use futures::{Stream, StreamExt};
19use std::collections::VecDeque;
20use std::sync::Arc;
21use std::task::Waker;
22
23use parking_lot::Mutex;
24
25use arrow::datatypes::SchemaRef;
26use arrow::record_batch::RecordBatch;
27use datafusion_common::Result;
28use datafusion_execution::disk_manager::RefCountedTempFile;
29use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
30
31use super::in_progress_spill_file::InProgressSpillFile;
32use super::spill_manager::SpillManager;
33
/// Shared state between the writer and readers of a spill pool.
/// This contains the queue of files and coordination state.
///
/// # Locking Design
///
/// This struct uses **fine-grained locking** with nested `Arc<Mutex<>>`:
/// - `SpillPoolShared` is wrapped in `Arc<Mutex<>>` (outer lock)
/// - Each `ActiveSpillFileShared` is wrapped in `Arc<Mutex<>>` (inner lock)
///
/// This enables:
/// 1. **Short critical sections**: The outer lock is held only for queue operations
/// 2. **I/O outside locks**: Disk I/O happens while holding only the file-specific lock
/// 3. **Concurrent operations**: Reader can access the queue while writer does I/O
///
/// **Lock ordering discipline**: Never hold both locks simultaneously to prevent deadlock.
/// Always: acquire outer lock → release outer lock → acquire inner lock (if needed).
struct SpillPoolShared {
    /// Queue of ALL files (including the current write file if it exists).
    /// Readers always read from the front of this queue (FIFO).
    /// Each file has its own lock to enable concurrent reader/writer access.
    files: VecDeque<Arc<Mutex<ActiveSpillFileShared>>>,
    /// SpillManager for creating files and tracking metrics
    spill_manager: Arc<SpillManager>,
    /// Pool-level waker to notify when new files are available (single reader).
    /// Taken (consumed) on each wake; the reader re-registers on every `Pending`.
    waker: Option<Waker>,
    /// Whether the writer has been dropped (no more files will be added).
    /// Once true and `files` is empty, the reader returns end-of-stream.
    writer_dropped: bool,
    /// Writer's reference to the current file (shared by all cloned writers).
    /// Has its own lock to allow I/O without blocking queue access.
    /// `None` between rotations (a new file is created on the next `push_batch`).
    current_write_file: Option<Arc<Mutex<ActiveSpillFileShared>>>,
}
65
66impl SpillPoolShared {
67    /// Creates a new shared pool state
68    fn new(spill_manager: Arc<SpillManager>) -> Self {
69        Self {
70            files: VecDeque::new(),
71            spill_manager,
72            waker: None,
73            writer_dropped: false,
74            current_write_file: None,
75        }
76    }
77
78    /// Registers a waker to be notified when new data is available (pool-level)
79    fn register_waker(&mut self, waker: Waker) {
80        self.waker = Some(waker);
81    }
82
83    /// Wakes the pool-level reader
84    fn wake(&mut self) {
85        if let Some(waker) = self.waker.take() {
86            waker.wake();
87        }
88    }
89}
90
/// Writer for a spill pool. Provides coordinated write access with FIFO semantics.
///
/// Created by [`channel`]. See that function for architecture diagrams and usage examples.
///
/// The writer is `Clone`, allowing multiple writers to coordinate on the same pool.
/// All clones share the same current write file and coordinate file rotation.
/// The writer automatically manages file rotation based on the `max_file_size_bytes`
/// configured in [`channel`]. When the last writer clone is dropped, it finalizes the
/// current file so readers can access all written data.
///
/// NOTE(review): the `Drop` impl runs for *every* clone, not only the last one,
/// so dropping any clone finalizes the current file and marks the pool as
/// writer-dropped — confirm whether the "last writer clone" claim above matches
/// the implementation, or whether a writer-count / token `Arc` is needed.
#[derive(Clone)]
pub struct SpillPoolWriter {
    /// Maximum size in bytes before rotating to a new file.
    /// Typically set from configuration `datafusion.execution.max_spill_file_size_bytes`.
    max_file_size_bytes: usize,
    /// Shared state with readers (includes current_write_file for coordination)
    shared: Arc<Mutex<SpillPoolShared>>,
}
108
impl SpillPoolWriter {
    /// Spills a batch to the pool, rotating files when necessary.
    ///
    /// If the current file would exceed `max_file_size_bytes` after adding
    /// this batch, the file is finalized and a new one is started.
    ///
    /// See [`channel`] for overall architecture and examples.
    ///
    /// # File Rotation Logic
    ///
    /// ```text
    /// push_batch()
    ///      │
    ///      ▼
    /// Current file exists?
    ///      │
    ///      ├─ No ──▶ Create new file ──▶ Add to shared queue
    ///      │                               Wake readers
    ///      ▼
    /// Write batch to current file
    ///      │
    ///      ▼
    /// estimated_size > max_file_size_bytes?
    ///      │
    ///      ├─ No ──▶ Keep current file for next batch
    ///      │
    ///      ▼
    /// Yes: finish() current file
    ///      Mark writer_finished = true
    ///      Wake readers
    ///      │
    ///      ▼
    /// Next push_batch() creates new file
    /// ```
    ///
    /// # Errors
    ///
    /// Returns an error if disk I/O fails or disk quota is exceeded.
    pub fn push_batch(&self, batch: &RecordBatch) -> Result<()> {
        if batch.num_rows() == 0 {
            // Skip empty batches
            return Ok(());
        }

        // In-memory size estimate; used for the rotation threshold below.
        let batch_size = batch.get_array_memory_size();

        // Fine-grained locking: Lock shared state briefly for queue access
        let mut shared = self.shared.lock();

        // Create new file if we don't have one yet
        if shared.current_write_file.is_none() {
            let spill_manager = Arc::clone(&shared.spill_manager);
            // Release shared lock before disk I/O (fine-grained locking)
            drop(shared);

            // NOTE(review): between the `drop(shared)` above and the re-lock
            // below, another writer clone may also observe `current_write_file
            // == None` and create its own file; this path then unconditionally
            // overwrites `current_write_file`, leaving the other clone's file
            // queued with a writer that is never finished (reader could wait on
            // it forever). Confirm whether concurrent `push_batch` calls from
            // multiple clones are expected, or re-check under the lock.
            let writer = spill_manager.create_in_progress_file("SpillPool")?;
            // Clone the file so readers can access it immediately
            let file = writer.file().expect("InProgressSpillFile should always have a file when it is first created").clone();

            let file_shared = Arc::new(Mutex::new(ActiveSpillFileShared {
                writer: Some(writer),
                file: Some(file), // Set immediately so readers can access it
                batches_written: 0,
                estimated_size: 0,
                writer_finished: false,
                waker: None,
            }));

            // Re-acquire lock and push to shared queue
            shared = self.shared.lock();
            shared.files.push_back(Arc::clone(&file_shared));
            shared.current_write_file = Some(file_shared);
            shared.wake(); // Wake readers waiting for new files
        }

        // Take the current file out of the pool while we do I/O.
        // NOTE(review): while it is taken, a concurrent `push_batch` from
        // another clone sees `None` and opens an extra file — see note above.
        let current_write_file = shared.current_write_file.take();
        // Release shared lock before file I/O (fine-grained locking)
        // This allows readers to access the queue while we do disk I/O
        drop(shared);

        // Write batch to current file - lock only the specific file
        if let Some(current_file) = current_write_file {
            // Now lock just this file for I/O (separate from shared lock)
            let mut file_shared = current_file.lock();

            // Append the batch (no-op if the writer was already finished elsewhere)
            if let Some(ref mut writer) = file_shared.writer {
                writer.append_batch(batch)?;
                file_shared.batches_written += 1;
                file_shared.estimated_size += batch_size;
            }

            // Wake reader waiting on this specific file
            file_shared.wake();

            // Check if we need to rotate (threshold is on the in-memory
            // estimate, not the on-disk size)
            let needs_rotation = file_shared.estimated_size > self.max_file_size_bytes;

            if needs_rotation {
                // Finish the IPC writer
                if let Some(mut writer) = file_shared.writer.take() {
                    writer.finish()?;
                }
                // Mark as finished so readers know not to wait for more data
                file_shared.writer_finished = true;
                // Wake reader waiting on this file (it's now finished)
                file_shared.wake();
                // Don't put back current_write_file - let it rotate
            } else {
                // Release file lock (lock-ordering discipline: never hold the
                // file lock while taking the pool lock)
                drop(file_shared);
                // Put back the current file for further writing
                let mut shared = self.shared.lock();
                shared.current_write_file = Some(current_file);
            }
        }

        Ok(())
    }
}
229
impl Drop for SpillPoolWriter {
    /// Finalizes the in-progress spill file and signals the reader that no
    /// more data is coming.
    ///
    /// NOTE(review): `SpillPoolWriter` derives `Clone`, but this destructor
    /// runs for *every* clone, not only the last one. Dropping any clone
    /// finalizes the current file and sets `writer_dropped = true`, which can
    /// end the reader stream while other clones are still writing. Confirm the
    /// intended multi-clone semantics (e.g. track a writer count in
    /// `SpillPoolShared`, or hold a dedicated token `Arc` and finalize only
    /// when its strong count reaches one).
    fn drop(&mut self) {
        let mut shared = self.shared.lock();

        // Finalize the current file when the last writer is dropped
        if let Some(current_file) = shared.current_write_file.take() {
            // Release shared lock before locking file (lock-ordering discipline)
            drop(shared);

            let mut file_shared = current_file.lock();

            // Finish the current writer if it exists
            if let Some(mut writer) = file_shared.writer.take() {
                // Ignore errors on drop - we're in destructor
                let _ = writer.finish();
            }

            // Mark as finished so readers know not to wait for more data
            file_shared.writer_finished = true;

            // Wake reader waiting on this file (it's now finished)
            file_shared.wake();

            drop(file_shared);
            shared = self.shared.lock();
        }

        // Mark writer as dropped and wake pool-level readers
        shared.writer_dropped = true;
        shared.wake();
    }
}
262
263/// Creates a paired writer and reader for a spill pool with MPSC (multi-producer, single-consumer)
264/// semantics.
265///
266/// This is the recommended way to create a spill pool. The writer is `Clone`, allowing
267/// multiple producers to coordinate writes to the same pool. The reader can consume batches
268/// in FIFO order. The reader can start reading immediately after a writer appends a batch
269/// to the spill file, without waiting for the file to be sealed, while writers continue to
270/// write more data.
271///
272/// Internally this coordinates rotating spill files based on size limits, and
273/// handles asynchronous notification between the writer and reader using wakers.
274/// This ensures that we manage disk usage efficiently while allowing concurrent
275/// I/O between the writer and reader.
276///
277/// # Data Flow Overview
278///
279/// 1. Writer write batch `B0` to F1
280/// 2. Writer write batch `B1` to F1, notices the size limit exceeded, finishes F1.
281/// 3. Reader read `B0` from F1
282/// 4. Reader read `B1`, no more batch to read -> wait on the waker
283/// 5. Writer write batch `B2` to a new file `F2`, wake up the waiting reader.
284/// 6. Reader read `B2` from F2.
285/// 7. Repeat until writer is dropped.
286///
287/// # Architecture
288///
289/// ```text
290/// ┌─────────────────────────────────────────────────────────────────────────┐
291/// │                            SpillPool                                    │
292/// │                                                                         │
293/// │  Writer Side              Shared State              Reader Side         │
294/// │  ───────────              ────────────              ───────────         │
295/// │                                                                         │
296/// │  SpillPoolWriter    ┌────────────────────┐    SpillPoolReader           │
297/// │       │             │  VecDeque<File>    │          │                   │
298/// │       │             │  ┌────┐┌────┐      │          │                   │
299/// │  push_batch()       │  │ F1 ││ F2 │ ...  │      next().await            │
300/// │       │             │  └────┘└────┘      │          │                   │
301/// │       ▼             │   (FIFO order)     │          ▼                   │
302/// │  ┌─────────┐        │                    │    ┌──────────┐              │
303/// │  │Current  │───────▶│ Coordination:      │◀───│ Current  │              │
304/// │  │Write    │        │ - Wakers           │    │ Read     │              │
305/// │  │File     │        │ - Batch counts     │    │ File     │              │
306/// │  └─────────┘        │ - Writer status    │    └──────────┘              │
307/// │       │             └────────────────────┘          │                   │
308/// │       │                                              │                  │
309/// │  Size > limit?                                Read all batches?         │
310/// │       │                                              │                  │
311/// │       ▼                                              ▼                  │
312/// │  Rotate to new file                            Pop from queue           │
313/// └─────────────────────────────────────────────────────────────────────────┘
314///
315/// Writer produces → Shared FIFO queue → Reader consumes
316/// ```
317///
318/// # File State Machine
319///
320/// Each file in the pool coordinates between writer and reader:
321///
322/// ```text
323///                Writer View              Reader View
324///                ───────────              ───────────
325///
326/// Created        writer: Some(..)         batches_read: 0
327///                batches_written: 0       (waiting for data)
328///                       │
329///                       ▼
330/// Writing        append_batch()           Can read if:
331///                batches_written++        batches_read < batches_written
332///                wake readers
333///                       │                        │
334///                       │                        ▼
335///                ┌──────┴──────┐          poll_next() → batch
336///                │             │          batches_read++
337///                ▼             ▼
338///          Size > limit?  More data?
339///                │             │
340///                │             └─▶ Yes ──▶ Continue writing
341///                ▼
342///          finish()                   Reader catches up:
343///          writer_finished = true     batches_read == batches_written
344///          wake readers                       │
345///                │                            ▼
346///                └─────────────────────▶ Returns Poll::Ready(None)
347///                                       File complete, pop from queue
348/// ```
349///
350/// # Arguments
351///
352/// * `max_file_size_bytes` - Maximum size per file before rotation. When a file
353///   exceeds this size, the writer automatically rotates to a new file.
354/// * `spill_manager` - Manager for file creation and metrics tracking
355///
356/// # Returns
357///
358/// A tuple of `(SpillPoolWriter, SendableRecordBatchStream)` that share the same
359/// underlying pool. The reader is returned as a stream for immediate use with
360/// async stream combinators.
361///
362/// # Example
363///
364/// ```
365/// use std::sync::Arc;
366/// use arrow::array::{ArrayRef, Int32Array};
367/// use arrow::datatypes::{DataType, Field, Schema};
368/// use arrow::record_batch::RecordBatch;
369/// use datafusion_execution::runtime_env::RuntimeEnv;
370/// use futures::StreamExt;
371///
372/// # use datafusion_physical_plan::spill::spill_pool;
373/// # use datafusion_physical_plan::spill::SpillManager; // Re-exported for doctests
374/// # use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, SpillMetrics};
375/// #
376/// # #[tokio::main]
377/// # async fn main() -> datafusion_common::Result<()> {
378/// # // Setup for the example (typically comes from TaskContext in production)
379/// # let env = Arc::new(RuntimeEnv::default());
380/// # let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
381/// # let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
382/// # let spill_manager = Arc::new(SpillManager::new(env, metrics, schema.clone()));
383/// #
384/// // Create channel with 1MB file size limit
385/// let (writer, mut reader) = spill_pool::channel(1024 * 1024, spill_manager);
386///
387/// // Spawn writer and reader concurrently; writer wakes reader via wakers
388/// let writer_task = tokio::spawn(async move {
389///     for i in 0..5 {
390///         let array: ArrayRef = Arc::new(Int32Array::from(vec![i; 100]));
391///         let batch = RecordBatch::try_new(schema.clone(), vec![array]).unwrap();
392///         writer.push_batch(&batch)?;
393///     }
394///     // Explicitly drop writer to finalize the spill file and wake the reader
395///     drop(writer);
396///     datafusion_common::Result::<()>::Ok(())
397/// });
398///
399/// let reader_task = tokio::spawn(async move {
400///     let mut batches_read = 0;
401///     while let Some(result) = reader.next().await {
402///         let _batch = result?;
403///         batches_read += 1;
404///     }
405///     datafusion_common::Result::<usize>::Ok(batches_read)
406/// });
407///
408/// let (writer_res, reader_res) = tokio::join!(writer_task, reader_task);
409/// writer_res
410///     .map_err(|e| datafusion_common::DataFusionError::Execution(e.to_string()))??;
411/// let batches_read = reader_res
412///     .map_err(|e| datafusion_common::DataFusionError::Execution(e.to_string()))??;
413///
414/// assert_eq!(batches_read, 5);
415/// # Ok(())
416/// # }
417/// ```
418///
419/// # Why rotate files?
420///
421/// File rotation ensures we don't end up with unreferenced disk usage.
422/// If we used a single file for all spilled data, we would end up with
423/// unreferenced data at the beginning of the file that has already been read
424/// by readers but we can't delete because you can't truncate from the start of a file.
425///
426/// Consider the case of a query like `SELECT * FROM large_table WHERE false`.
427/// Obviously this query produces no output rows, but if we had a spilling operator
428/// in the middle of this query between the scan and the filter it would see the entire
429/// `large_table` flow through it and thus would spill all of that data to disk.
430/// So we'd end up using up to `size(large_table)` bytes of disk space.
431/// If instead we use file rotation, and as long as the readers can keep up with the writer,
432/// then we can ensure that once a file is fully read by all readers it can be deleted,
433/// thus bounding the maximum disk usage to roughly `max_file_size_bytes`.
434pub fn channel(
435    max_file_size_bytes: usize,
436    spill_manager: Arc<SpillManager>,
437) -> (SpillPoolWriter, SendableRecordBatchStream) {
438    let schema = Arc::clone(spill_manager.schema());
439    let shared = Arc::new(Mutex::new(SpillPoolShared::new(spill_manager)));
440
441    let writer = SpillPoolWriter {
442        max_file_size_bytes,
443        shared: Arc::clone(&shared),
444    };
445
446    let reader = SpillPoolReader::new(shared, schema);
447
448    (writer, Box::pin(reader))
449}
450
/// Shared state between writer and readers for an active spill file.
/// Protected by a Mutex to coordinate between concurrent readers and the writer.
struct ActiveSpillFileShared {
    /// Writer handle - taken (set to None) when finish() is called
    writer: Option<InProgressSpillFile>,
    /// The spill file handle. Populated as soon as the file is created (see
    /// `SpillPoolWriter::push_batch`, which clones it from the in-progress
    /// writer immediately) so a reader can open it while writes continue.
    /// Taken by the reader when creating a stream (the file stays open via file handles).
    file: Option<RefCountedTempFile>,
    /// Total number of batches written to this file
    batches_written: usize,
    /// Estimated size in bytes of data written to this file
    /// (in-memory estimate used for the rotation threshold, not on-disk size)
    estimated_size: usize,
    /// Whether the writer has finished writing to this file
    writer_finished: bool,
    /// Waker for reader waiting on this specific file (SPSC: only one reader)
    waker: Option<Waker>,
}
468
469impl ActiveSpillFileShared {
470    /// Registers a waker to be notified when new data is written to this file
471    fn register_waker(&mut self, waker: Waker) {
472        self.waker = Some(waker);
473    }
474
475    /// Wakes the reader waiting on this file
476    fn wake(&mut self) {
477        if let Some(waker) = self.waker.take() {
478            waker.wake();
479        }
480    }
481}
482
/// Reader state for a SpillFile (owned by individual SpillFile instances).
/// This is kept separate from the shared state to avoid holding locks during I/O.
struct SpillFileReader {
    /// The actual stream reading batches back from disk
    stream: SendableRecordBatchStream,
    /// Number of batches this reader has consumed. Compared against
    /// `ActiveSpillFileShared::batches_written` to decide whether more data
    /// is available to read.
    batches_read: usize,
}
491
/// A stream over a single spill file in the pool. Coordinates with the writer
/// through the file's shared state and lazily opens the on-disk stream on the
/// first poll that observes written data.
struct SpillFile {
    /// Shared coordination state (contains writer and batch counts)
    shared: Arc<Mutex<ActiveSpillFileShared>>,
    /// Reader state (lazy-initialized, owned by this SpillFile)
    reader: Option<SpillFileReader>,
    /// Spill manager for creating readers
    spill_manager: Arc<SpillManager>,
}
500
impl Stream for SpillFile {
    type Item = Result<RecordBatch>;

    /// Polls for the next batch from this spill file.
    ///
    /// Coordination protocol: a batch may be read only while
    /// `batches_read < batches_written`; when caught up, either the file is
    /// finished (EOF) or we register the file-level waker and wait for the
    /// writer.
    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        use std::task::Poll;

        // Step 1: Lock shared state and check coordination
        let (should_read, file) = {
            let mut shared = self.shared.lock();

            // Determine if we can read (0 batches read if no reader yet)
            let batches_read = self.reader.as_ref().map_or(0, |r| r.batches_read);

            if batches_read < shared.batches_written {
                // More data available to read - take the file if we don't have a reader yet
                let file = if self.reader.is_none() {
                    shared.file.take()
                } else {
                    None
                };
                (true, file)
            } else if shared.writer_finished {
                // No more data and writer is done - EOF
                return Poll::Ready(None);
            } else {
                // Caught up to writer, but writer still active - register waker and wait
                shared.register_waker(cx.waker().clone());
                return Poll::Pending;
            }
        }; // Lock released here

        // Step 2: Lazy-create reader stream if needed
        // NOTE(review): this opens the file while the writer may still be
        // appending to it — assumes `read_spill_as_stream` (IPC stream reader)
        // tolerates reading a partially written file up to the last flushed
        // batch. Confirm the writer flushes each appended batch.
        if self.reader.is_none() && should_read {
            if let Some(file) = file {
                match self.spill_manager.read_spill_as_stream(file, None) {
                    Ok(stream) => {
                        self.reader = Some(SpillFileReader {
                            stream,
                            batches_read: 0,
                        });
                    }
                    Err(e) => return Poll::Ready(Some(Err(e))),
                }
            } else {
                // File not available yet (writer hasn't finished or already taken)
                // Register waker and wait for file to be ready
                let mut shared = self.shared.lock();
                shared.register_waker(cx.waker().clone());
                return Poll::Pending;
            }
        }

        // Step 3: Poll the reader stream (no lock held)
        if let Some(reader) = &mut self.reader {
            match reader.stream.poll_next_unpin(cx) {
                Poll::Ready(Some(Ok(batch))) => {
                    // Successfully read a batch - increment counter
                    reader.batches_read += 1;
                    Poll::Ready(Some(Ok(batch)))
                }
                Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
                Poll::Ready(None) => {
                    // Stream exhausted unexpectedly (coordination said more
                    // batches exist). Handled gracefully, but note the outer
                    // SpillPoolReader treats this case as unexpected too.
                    Poll::Ready(None)
                }
                Poll::Pending => Poll::Pending,
            }
        } else {
            // Should not reach here (step 2 either created the reader or
            // returned), but handle gracefully
            Poll::Ready(None)
        }
    }
}
578
/// A stream that reads from a SpillPool in FIFO order.
///
/// Created by [`channel`]. See that function for architecture diagrams and usage examples.
///
/// The stream automatically handles file rotation and reads from completed files.
/// When no data is available, it returns `Poll::Pending` and registers a waker to
/// be notified when the writer produces more data.
///
/// # Infinite Stream Semantics
///
/// This stream never returns `None` (`Poll::Ready(None)`) on its own - it will keep
/// waiting for the writer to produce more data. The stream ends only when:
/// - The reader is dropped
/// - The writer is dropped AND all queued data has been consumed
///
/// This makes it suitable for continuous streaming scenarios where the writer may
/// produce data intermittently.
pub struct SpillPoolReader {
    /// Shared reference to the spill pool
    shared: Arc<Mutex<SpillPoolShared>>,
    /// Current SpillFile we're reading from (None until the first file is
    /// taken from the queue, and between files)
    current_file: Option<SpillFile>,
    /// Schema of the spilled data, reported via `RecordBatchStream::schema`
    schema: SchemaRef,
}
604
605impl SpillPoolReader {
606    /// Creates a new reader from shared pool state.
607    ///
608    /// This is private - use the `channel()` function to create a reader/writer pair.
609    ///
610    /// # Arguments
611    ///
612    /// * `shared` - Shared reference to the pool state
613    fn new(shared: Arc<Mutex<SpillPoolShared>>, schema: SchemaRef) -> Self {
614        Self {
615            shared,
616            current_file: None,
617            schema,
618        }
619    }
620}
621
impl Stream for SpillPoolReader {
    type Item = Result<RecordBatch>;

    /// Polls the pool for the next batch in FIFO order, transparently
    /// advancing across finished files and waiting (via wakers) when the
    /// writer has not yet produced more data.
    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        use std::task::Poll;

        loop {
            // If we have a current file, try to read from it
            if let Some(ref mut file) = self.current_file {
                match file.poll_next_unpin(cx) {
                    Poll::Ready(Some(Ok(batch))) => {
                        // Got a batch, return it
                        return Poll::Ready(Some(Ok(batch)));
                    }
                    Poll::Ready(Some(Err(e))) => {
                        // Error reading batch
                        return Poll::Ready(Some(Err(e)));
                    }
                    Poll::Ready(None) => {
                        // Current file stream exhausted
                        // Check if this file is marked as writer_finished
                        // (scoped so the file lock is dropped before taking the pool lock)
                        let writer_finished = { file.shared.lock().writer_finished };

                        if writer_finished {
                            // File is complete, pop it from the queue and move to next.
                            // The front of the queue is always the file we were reading.
                            let mut shared = self.shared.lock();
                            shared.files.pop_front();
                            drop(shared); // Release lock

                            // Clear current file and continue loop to get next file
                            self.current_file = None;
                            continue;
                        } else {
                            // Stream exhausted but writer not finished - unexpected.
                            // NOTE(review): this permanently ends the stream even
                            // though the writer is still active, which contradicts
                            // the "infinite stream" semantics documented on
                            // `SpillPoolReader` — confirm this branch is truly
                            // unreachable under correct coordination.
                            return Poll::Ready(None);
                        }
                    }
                    Poll::Pending => {
                        // File not ready yet (waiting for writer). The inner
                        // SpillFile already registered the file-level waker;
                        // additionally register the pool-level waker so
                        // new-file events also wake us.
                        let mut shared = self.shared.lock();
                        shared.register_waker(cx.waker().clone());
                        return Poll::Pending;
                    }
                }
            }

            // No current file, need to get the next one
            let mut shared = self.shared.lock();

            // Peek at the front of the queue (don't pop yet - the file is
            // only popped once fully read and finished)
            if let Some(file_shared) = shared.files.front() {
                // Create a SpillFile from the shared state
                let spill_manager = Arc::clone(&shared.spill_manager);
                let file_shared = Arc::clone(file_shared);
                drop(shared); // Release lock before creating SpillFile

                self.current_file = Some(SpillFile {
                    shared: file_shared,
                    reader: None,
                    spill_manager,
                });

                // Continue loop to poll the new file
                continue;
            }

            // No files in queue - check if writer is done
            if shared.writer_dropped {
                // Writer is done and no more files will be added - EOF
                return Poll::Ready(None);
            }

            // Writer still active, register waker that will get notified when new files are added
            shared.register_waker(cx.waker().clone());
            return Poll::Pending;
        }
    }
}
705
impl RecordBatchStream for SpillPoolReader {
    /// Returns the schema shared by every batch this stream yields.
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }
}
711
712#[cfg(test)]
713mod tests {
714    use super::*;
715    use crate::metrics::{ExecutionPlanMetricsSet, SpillMetrics};
716    use arrow::array::{ArrayRef, Int32Array};
717    use arrow::datatypes::{DataType, Field, Schema};
718    use datafusion_common_runtime::SpawnedTask;
719    use datafusion_execution::runtime_env::RuntimeEnv;
720    use futures::StreamExt;
721
722    fn create_test_schema() -> SchemaRef {
723        Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]))
724    }
725
726    fn create_test_batch(start: i32, count: usize) -> RecordBatch {
727        let schema = create_test_schema();
728        let a: ArrayRef = Arc::new(Int32Array::from(
729            (start..start + count as i32).collect::<Vec<_>>(),
730        ));
731        RecordBatch::try_new(schema, vec![a]).unwrap()
732    }
733
734    fn create_spill_channel(
735        max_file_size: usize,
736    ) -> (SpillPoolWriter, SendableRecordBatchStream) {
737        let env = Arc::new(RuntimeEnv::default());
738        let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
739        let schema = create_test_schema();
740        let spill_manager = Arc::new(SpillManager::new(env, metrics, schema));
741
742        channel(max_file_size, spill_manager)
743    }
744
745    fn create_spill_channel_with_metrics(
746        max_file_size: usize,
747    ) -> (SpillPoolWriter, SendableRecordBatchStream, SpillMetrics) {
748        let env = Arc::new(RuntimeEnv::default());
749        let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
750        let schema = create_test_schema();
751        let spill_manager = Arc::new(SpillManager::new(env, metrics.clone(), schema));
752
753        let (writer, reader) = channel(max_file_size, spill_manager);
754        (writer, reader, metrics)
755    }
756
757    #[tokio::test]
758    async fn test_basic_write_and_read() -> Result<()> {
759        let (writer, mut reader) = create_spill_channel(1024 * 1024);
760
761        // Write one batch
762        let batch1 = create_test_batch(0, 10);
763        writer.push_batch(&batch1)?;
764
765        // Read the batch
766        let result = reader.next().await.unwrap()?;
767        assert_eq!(result.num_rows(), 10);
768
769        // Write another batch
770        let batch2 = create_test_batch(10, 5);
771        writer.push_batch(&batch2)?;
772        // Read the second batch
773        let result = reader.next().await.unwrap()?;
774        assert_eq!(result.num_rows(), 5);
775
776        Ok(())
777    }
778
779    #[tokio::test]
780    async fn test_single_batch_write_read() -> Result<()> {
781        let (writer, mut reader) = create_spill_channel(1024 * 1024);
782
783        // Write one batch
784        let batch = create_test_batch(0, 5);
785        writer.push_batch(&batch)?;
786
787        // Read it back
788        let result = reader.next().await.unwrap()?;
789        assert_eq!(result.num_rows(), 5);
790
791        // Verify the actual data
792        let col = result
793            .column(0)
794            .as_any()
795            .downcast_ref::<Int32Array>()
796            .unwrap();
797        assert_eq!(col.value(0), 0);
798        assert_eq!(col.value(4), 4);
799
800        Ok(())
801    }
802
803    #[tokio::test]
804    async fn test_multiple_batches_sequential() -> Result<()> {
805        let (writer, mut reader) = create_spill_channel(1024 * 1024);
806
807        // Write multiple batches
808        for i in 0..5 {
809            let batch = create_test_batch(i * 10, 10);
810            writer.push_batch(&batch)?;
811        }
812
813        // Read all batches and verify FIFO order
814        for i in 0..5 {
815            let result = reader.next().await.unwrap()?;
816            assert_eq!(result.num_rows(), 10);
817
818            let col = result
819                .column(0)
820                .as_any()
821                .downcast_ref::<Int32Array>()
822                .unwrap();
823            assert_eq!(col.value(0), i * 10, "Batch {i} not in FIFO order");
824        }
825
826        Ok(())
827    }
828
829    #[tokio::test]
830    async fn test_empty_writer() -> Result<()> {
831        let (_writer, reader) = create_spill_channel(1024 * 1024);
832
833        // Reader should pend since no batches were written
834        let mut reader = reader;
835        let result =
836            tokio::time::timeout(std::time::Duration::from_millis(100), reader.next())
837                .await;
838
839        assert!(result.is_err(), "Reader should timeout on empty writer");
840
841        Ok(())
842    }
843
844    #[tokio::test]
845    async fn test_empty_batch_skipping() -> Result<()> {
846        let (writer, mut reader) = create_spill_channel(1024 * 1024);
847
848        // Write empty batch
849        let empty_batch = create_test_batch(0, 0);
850        writer.push_batch(&empty_batch)?;
851
852        // Write non-empty batch
853        let batch = create_test_batch(0, 5);
854        writer.push_batch(&batch)?;
855
856        // Should only read the non-empty batch
857        let result = reader.next().await.unwrap()?;
858        assert_eq!(result.num_rows(), 5);
859
860        Ok(())
861    }
862
863    #[tokio::test]
864    async fn test_rotation_triggered_by_size() -> Result<()> {
865        // Set a small max_file_size to trigger rotation after one batch
866        let batch1 = create_test_batch(0, 10);
867        let batch_size = batch1.get_array_memory_size() + 1;
868
869        let (writer, mut reader, metrics) = create_spill_channel_with_metrics(batch_size);
870
871        // Write first batch (should fit in first file)
872        writer.push_batch(&batch1)?;
873
874        // Check metrics after first batch - file created but not finalized yet
875        assert_eq!(
876            metrics.spill_file_count.value(),
877            1,
878            "Should have created 1 file after first batch"
879        );
880        assert_eq!(
881            metrics.spilled_bytes.value(),
882            0,
883            "Spilled bytes should be 0 before file finalization"
884        );
885        assert_eq!(
886            metrics.spilled_rows.value(),
887            10,
888            "Should have spilled 10 rows from first batch"
889        );
890
891        // Write second batch (should trigger rotation - finalize first file)
892        let batch2 = create_test_batch(10, 10);
893        assert!(
894            batch2.get_array_memory_size() <= batch_size,
895            "batch2 size {} exceeds limit {batch_size}",
896            batch2.get_array_memory_size(),
897        );
898        assert!(
899            batch1.get_array_memory_size() + batch2.get_array_memory_size() > batch_size,
900            "Combined size {} does not exceed limit to trigger rotation",
901            batch1.get_array_memory_size() + batch2.get_array_memory_size()
902        );
903        writer.push_batch(&batch2)?;
904
905        // Check metrics after rotation - first file finalized, but second file not created yet
906        // (new file created lazily on next push_batch call)
907        assert_eq!(
908            metrics.spill_file_count.value(),
909            1,
910            "Should still have 1 file (second file not created until next write)"
911        );
912        assert!(
913            metrics.spilled_bytes.value() > 0,
914            "Spilled bytes should be > 0 after first file finalized (got {})",
915            metrics.spilled_bytes.value()
916        );
917        assert_eq!(
918            metrics.spilled_rows.value(),
919            20,
920            "Should have spilled 20 total rows (10 + 10)"
921        );
922
923        // Write a third batch to confirm rotation occurred (creates second file)
924        let batch3 = create_test_batch(20, 5);
925        writer.push_batch(&batch3)?;
926
927        // Now check that second file was created
928        assert_eq!(
929            metrics.spill_file_count.value(),
930            2,
931            "Should have created 2 files after writing to new file"
932        );
933        assert_eq!(
934            metrics.spilled_rows.value(),
935            25,
936            "Should have spilled 25 total rows (10 + 10 + 5)"
937        );
938
939        // Read all three batches
940        let result1 = reader.next().await.unwrap()?;
941        assert_eq!(result1.num_rows(), 10);
942
943        let result2 = reader.next().await.unwrap()?;
944        assert_eq!(result2.num_rows(), 10);
945
946        let result3 = reader.next().await.unwrap()?;
947        assert_eq!(result3.num_rows(), 5);
948
949        Ok(())
950    }
951
952    #[tokio::test]
953    async fn test_multiple_rotations() -> Result<()> {
954        let batches = (0..10)
955            .map(|i| create_test_batch(i * 10, 10))
956            .collect::<Vec<_>>();
957
958        let batch_size = batches[0].get_array_memory_size() * 2 + 1;
959
960        // Very small max_file_size to force frequent rotations
961        let (writer, mut reader, metrics) = create_spill_channel_with_metrics(batch_size);
962
963        // Write many batches to cause multiple rotations
964        for i in 0..10 {
965            let batch = create_test_batch(i * 10, 10);
966            writer.push_batch(&batch)?;
967        }
968
969        // Check metrics after all writes - should have multiple files due to rotations
970        // With batch_size = 2 * one_batch + 1, each file fits ~2 batches before rotating
971        // 10 batches should create multiple files (exact count depends on rotation timing)
972        let file_count = metrics.spill_file_count.value();
973        assert!(
974            file_count >= 4,
975            "Should have created at least 4 files with multiple rotations (got {file_count})"
976        );
977        assert!(
978            metrics.spilled_bytes.value() > 0,
979            "Spilled bytes should be > 0 after rotations (got {})",
980            metrics.spilled_bytes.value()
981        );
982        assert_eq!(
983            metrics.spilled_rows.value(),
984            100,
985            "Should have spilled 100 total rows (10 batches * 10 rows)"
986        );
987
988        // Read all batches and verify order
989        for i in 0..10 {
990            let result = reader.next().await.unwrap()?;
991            assert_eq!(result.num_rows(), 10);
992
993            let col = result
994                .column(0)
995                .as_any()
996                .downcast_ref::<Int32Array>()
997                .unwrap();
998            assert_eq!(
999                col.value(0),
1000                i * 10,
1001                "Batch {i} not in correct order after rotations"
1002            );
1003        }
1004
1005        Ok(())
1006    }
1007
1008    #[tokio::test]
1009    async fn test_single_batch_larger_than_limit() -> Result<()> {
1010        // Very small limit
1011        let (writer, mut reader, metrics) = create_spill_channel_with_metrics(100);
1012
1013        // Write a batch that exceeds the limit
1014        let large_batch = create_test_batch(0, 100);
1015        writer.push_batch(&large_batch)?;
1016
1017        // Check metrics after large batch - should trigger rotation immediately
1018        assert_eq!(
1019            metrics.spill_file_count.value(),
1020            1,
1021            "Should have created 1 file for large batch"
1022        );
1023        assert_eq!(
1024            metrics.spilled_rows.value(),
1025            100,
1026            "Should have spilled 100 rows from large batch"
1027        );
1028
1029        // Should still write and read successfully
1030        let result = reader.next().await.unwrap()?;
1031        assert_eq!(result.num_rows(), 100);
1032
1033        // Next batch should go to a new file
1034        let batch2 = create_test_batch(100, 10);
1035        writer.push_batch(&batch2)?;
1036
1037        // Check metrics after second batch - should have rotated to a new file
1038        assert_eq!(
1039            metrics.spill_file_count.value(),
1040            2,
1041            "Should have created 2 files after rotation"
1042        );
1043        assert_eq!(
1044            metrics.spilled_rows.value(),
1045            110,
1046            "Should have spilled 110 total rows (100 + 10)"
1047        );
1048
1049        let result2 = reader.next().await.unwrap()?;
1050        assert_eq!(result2.num_rows(), 10);
1051
1052        Ok(())
1053    }
1054
1055    #[tokio::test]
1056    async fn test_very_small_max_file_size() -> Result<()> {
1057        // Test with just 1 byte max (extreme case)
1058        let (writer, mut reader) = create_spill_channel(1);
1059
1060        // Any batch will exceed this limit
1061        let batch = create_test_batch(0, 5);
1062        writer.push_batch(&batch)?;
1063
1064        // Should still work
1065        let result = reader.next().await.unwrap()?;
1066        assert_eq!(result.num_rows(), 5);
1067
1068        Ok(())
1069    }
1070
1071    #[tokio::test]
1072    async fn test_exact_size_boundary() -> Result<()> {
1073        // Create a batch and measure its approximate size
1074        let batch = create_test_batch(0, 10);
1075        let batch_size = batch.get_array_memory_size();
1076
1077        // Set max_file_size to exactly the batch size
1078        let (writer, mut reader, metrics) = create_spill_channel_with_metrics(batch_size);
1079
1080        // Write first batch (exactly at the size limit)
1081        writer.push_batch(&batch)?;
1082
1083        // Check metrics after first batch - should NOT rotate yet (size == limit, not >)
1084        assert_eq!(
1085            metrics.spill_file_count.value(),
1086            1,
1087            "Should have created 1 file after first batch at exact boundary"
1088        );
1089        assert_eq!(
1090            metrics.spilled_rows.value(),
1091            10,
1092            "Should have spilled 10 rows from first batch"
1093        );
1094
1095        // Write second batch (exceeds the limit, should trigger rotation)
1096        let batch2 = create_test_batch(10, 10);
1097        writer.push_batch(&batch2)?;
1098
1099        // Check metrics after second batch - rotation triggered, first file finalized
1100        // Note: second file not created yet (lazy creation on next write)
1101        assert_eq!(
1102            metrics.spill_file_count.value(),
1103            1,
1104            "Should still have 1 file after rotation (second file created lazily)"
1105        );
1106        assert_eq!(
1107            metrics.spilled_rows.value(),
1108            20,
1109            "Should have spilled 20 total rows (10 + 10)"
1110        );
1111        // Verify first file was finalized by checking spilled_bytes
1112        assert!(
1113            metrics.spilled_bytes.value() > 0,
1114            "Spilled bytes should be > 0 after file finalization (got {})",
1115            metrics.spilled_bytes.value()
1116        );
1117
1118        // Both should be readable
1119        let result1 = reader.next().await.unwrap()?;
1120        assert_eq!(result1.num_rows(), 10);
1121
1122        let result2 = reader.next().await.unwrap()?;
1123        assert_eq!(result2.num_rows(), 10);
1124
1125        // Spill another batch, now we should see the second file created
1126        let batch3 = create_test_batch(20, 5);
1127        writer.push_batch(&batch3)?;
1128        assert_eq!(
1129            metrics.spill_file_count.value(),
1130            2,
1131            "Should have created 2 files after writing to new file"
1132        );
1133
1134        Ok(())
1135    }
1136
1137    #[tokio::test]
1138    async fn test_concurrent_reader_writer() -> Result<()> {
1139        let (writer, mut reader) = create_spill_channel(1024 * 1024);
1140
1141        // Spawn writer task
1142        let writer_handle = SpawnedTask::spawn(async move {
1143            for i in 0..10 {
1144                let batch = create_test_batch(i * 10, 10);
1145                writer.push_batch(&batch).unwrap();
1146                // Small delay to simulate real concurrent work
1147                tokio::time::sleep(std::time::Duration::from_millis(5)).await;
1148            }
1149        });
1150
1151        // Reader task (runs concurrently)
1152        let reader_handle = SpawnedTask::spawn(async move {
1153            let mut count = 0;
1154            for i in 0..10 {
1155                let result = reader.next().await.unwrap().unwrap();
1156                assert_eq!(result.num_rows(), 10);
1157
1158                let col = result
1159                    .column(0)
1160                    .as_any()
1161                    .downcast_ref::<Int32Array>()
1162                    .unwrap();
1163                assert_eq!(col.value(0), i * 10);
1164                count += 1;
1165            }
1166            count
1167        });
1168
1169        // Wait for both to complete
1170        writer_handle.await.unwrap();
1171        let batches_read = reader_handle.await.unwrap();
1172        assert_eq!(batches_read, 10);
1173
1174        Ok(())
1175    }
1176
    /// Ordering test: a reader that polls before any data exists must pend,
    /// be woken by the first write, then consume each batch as produced.
    /// Oneshot channels force a deterministic interleaving so the recorded
    /// event sequence can be asserted exactly.
    #[tokio::test]
    async fn test_reader_catches_up_to_writer() -> Result<()> {
        let (writer, mut reader) = create_spill_channel(1024 * 1024);

        // Signals: reader -> test when it is about to pend on the first read,
        // and reader -> test when the first read has completed.
        let (reader_waiting_tx, reader_waiting_rx) = tokio::sync::oneshot::channel();
        let (first_read_done_tx, first_read_done_rx) = tokio::sync::oneshot::channel();

        // Event-log entries; the row counts distinguish the two batches.
        #[derive(Clone, Copy, Debug, PartialEq, Eq)]
        enum ReadWriteEvent {
            ReadStart,
            Read(usize),
            Write(usize),
        }

        // Shared event log (parking_lot::Mutex, so `lock()` returns the
        // guard directly with no `unwrap()`).
        let events = Arc::new(Mutex::new(vec![]));
        // Start reader first (will pend)
        let reader_events = Arc::clone(&events);
        let reader_handle = SpawnedTask::spawn(async move {
            reader_events.lock().push(ReadWriteEvent::ReadStart);
            reader_waiting_tx
                .send(())
                .expect("reader_waiting channel closed unexpectedly");
            // This first `next()` pends until the writer pushes a batch.
            let result = reader.next().await.unwrap().unwrap();
            reader_events
                .lock()
                .push(ReadWriteEvent::Read(result.num_rows()));
            first_read_done_tx
                .send(())
                .expect("first_read_done channel closed unexpectedly");
            let result = reader.next().await.unwrap().unwrap();
            reader_events
                .lock()
                .push(ReadWriteEvent::Read(result.num_rows()));
        });

        // Wait until the reader is pending on the first batch
        reader_waiting_rx
            .await
            .expect("reader should signal when waiting");

        // Now write a batch (should wake the reader)
        let batch = create_test_batch(0, 5);
        events.lock().push(ReadWriteEvent::Write(batch.num_rows()));
        writer.push_batch(&batch)?;

        // Wait for the reader to finish the first read before allowing the
        // second write. This ensures deterministic ordering of events:
        // 1. The reader starts and pends on the first `next()`
        // 2. The first write wakes the reader
        // 3. The reader processes the first batch and signals completion
        // 4. The second write is issued, ensuring consistent event ordering
        first_read_done_rx
            .await
            .expect("reader should signal when first read completes");

        // Write another batch
        let batch = create_test_batch(5, 10);
        events.lock().push(ReadWriteEvent::Write(batch.num_rows()));
        writer.push_batch(&batch)?;

        // Reader should complete
        reader_handle.await.unwrap();
        // Expected interleaving: start, write(5), read(5), write(10), read(10).
        let events = events.lock().clone();
        assert_eq!(
            events,
            vec![
                ReadWriteEvent::ReadStart,
                ReadWriteEvent::Write(5),
                ReadWriteEvent::Read(5),
                ReadWriteEvent::Write(10),
                ReadWriteEvent::Read(10)
            ]
        );

        Ok(())
    }
1253
1254    #[tokio::test]
1255    async fn test_reader_starts_after_writer_finishes() -> Result<()> {
1256        let (writer, reader) = create_spill_channel(128);
1257
1258        // Writer writes all data
1259        for i in 0..5 {
1260            let batch = create_test_batch(i * 10, 10);
1261            writer.push_batch(&batch)?;
1262        }
1263
1264        drop(writer);
1265
1266        // Now start reader
1267        let mut reader = reader;
1268        let mut count = 0;
1269        for i in 0..5 {
1270            let result = reader.next().await.unwrap()?;
1271            assert_eq!(result.num_rows(), 10);
1272
1273            let col = result
1274                .column(0)
1275                .as_any()
1276                .downcast_ref::<Int32Array>()
1277                .unwrap();
1278            assert_eq!(col.value(0), i * 10);
1279            count += 1;
1280        }
1281
1282        assert_eq!(count, 5, "Should read all batches after writer finishes");
1283
1284        Ok(())
1285    }
1286
1287    #[tokio::test]
1288    async fn test_writer_drop_finalizes_file() -> Result<()> {
1289        let env = Arc::new(RuntimeEnv::default());
1290        let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
1291        let schema = create_test_schema();
1292        let spill_manager =
1293            Arc::new(SpillManager::new(Arc::clone(&env), metrics.clone(), schema));
1294
1295        let (writer, mut reader) = channel(1024 * 1024, spill_manager);
1296
1297        // Write some batches
1298        for i in 0..5 {
1299            let batch = create_test_batch(i * 10, 10);
1300            writer.push_batch(&batch)?;
1301        }
1302
1303        // Check metrics before drop - spilled_bytes should be 0 since file isn't finalized yet
1304        let spilled_bytes_before = metrics.spilled_bytes.value();
1305        assert_eq!(
1306            spilled_bytes_before, 0,
1307            "Spilled bytes should be 0 before writer is dropped"
1308        );
1309
1310        // Explicitly drop the writer - this should finalize the current file
1311        drop(writer);
1312
1313        // Check metrics after drop - spilled_bytes should be > 0 now
1314        let spilled_bytes_after = metrics.spilled_bytes.value();
1315        assert!(
1316            spilled_bytes_after > 0,
1317            "Spilled bytes should be > 0 after writer is dropped (got {spilled_bytes_after})"
1318        );
1319
1320        // Verify reader can still read all batches
1321        let mut count = 0;
1322        for i in 0..5 {
1323            let result = reader.next().await.unwrap()?;
1324            assert_eq!(result.num_rows(), 10);
1325
1326            let col = result
1327                .column(0)
1328                .as_any()
1329                .downcast_ref::<Int32Array>()
1330                .unwrap();
1331            assert_eq!(col.value(0), i * 10);
1332            count += 1;
1333        }
1334
1335        assert_eq!(count, 5, "Should read all batches after writer is dropped");
1336
1337        Ok(())
1338    }
1339
1340    #[tokio::test]
1341    async fn test_disk_usage_decreases_as_files_consumed() -> Result<()> {
1342        use datafusion_execution::runtime_env::RuntimeEnvBuilder;
1343
1344        // Test configuration
1345        const NUM_BATCHES: usize = 3;
1346        const ROWS_PER_BATCH: usize = 100;
1347
1348        // Step 1: Create a test batch and measure its size
1349        let batch = create_test_batch(0, ROWS_PER_BATCH);
1350        let batch_size = batch.get_array_memory_size();
1351
1352        // Step 2: Configure file rotation to approximately 1 batch per file
1353        // Create a custom RuntimeEnv so we can access the DiskManager
1354        let runtime = Arc::new(RuntimeEnvBuilder::default().build()?);
1355        let disk_manager = Arc::clone(&runtime.disk_manager);
1356
1357        let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
1358        let schema = create_test_schema();
1359        let spill_manager = Arc::new(SpillManager::new(runtime, metrics.clone(), schema));
1360
1361        let (writer, mut reader) = channel(batch_size, spill_manager);
1362
1363        // Step 3: Write NUM_BATCHES batches to create approximately NUM_BATCHES files
1364        for i in 0..NUM_BATCHES {
1365            let start = (i * ROWS_PER_BATCH) as i32;
1366            writer.push_batch(&create_test_batch(start, ROWS_PER_BATCH))?;
1367        }
1368
1369        // Check how many files were created (should be at least a few due to file rotation)
1370        let file_count = metrics.spill_file_count.value();
1371        assert_eq!(
1372            file_count,
1373            NUM_BATCHES - 1,
1374            "Expected at {} files with rotation, got {file_count}",
1375            NUM_BATCHES - 1
1376        );
1377
1378        // Step 4: Verify initial disk usage reflects all files
1379        let initial_disk_usage = disk_manager.used_disk_space();
1380        assert!(
1381            initial_disk_usage > 0,
1382            "Expected disk usage > 0 after writing batches, got {initial_disk_usage}"
1383        );
1384
1385        // Step 5: Read NUM_BATCHES - 1 batches (all but 1)
1386        // As each file is fully consumed, it should be dropped and disk usage should decrease
1387        for i in 0..(NUM_BATCHES - 1) {
1388            let result = reader.next().await.unwrap()?;
1389            assert_eq!(result.num_rows(), ROWS_PER_BATCH);
1390
1391            let col = result
1392                .column(0)
1393                .as_any()
1394                .downcast_ref::<Int32Array>()
1395                .unwrap();
1396            assert_eq!(col.value(0), (i * ROWS_PER_BATCH) as i32);
1397        }
1398
1399        // Step 6: Verify disk usage decreased but is not zero (at least 1 batch remains)
1400        let partial_disk_usage = disk_manager.used_disk_space();
1401        assert!(
1402            partial_disk_usage > 0
1403                && partial_disk_usage < (batch_size * NUM_BATCHES * 2) as u64,
1404            "Disk usage should be > 0 with remaining batches"
1405        );
1406        assert!(
1407            partial_disk_usage < initial_disk_usage,
1408            "Disk usage should have decreased after reading most batches: initial={initial_disk_usage}, partial={partial_disk_usage}"
1409        );
1410
1411        // Step 7: Read the final batch
1412        let result = reader.next().await.unwrap()?;
1413        assert_eq!(result.num_rows(), ROWS_PER_BATCH);
1414
1415        // Step 8: Drop writer first to signal no more data will be written
1416        // The reader has infinite stream semantics and will wait for the writer
1417        // to be dropped before returning None
1418        drop(writer);
1419
1420        // Verify we've read all batches - now the reader should return None
1421        assert!(
1422            reader.next().await.is_none(),
1423            "Should have no more batches to read"
1424        );
1425
1426        // Step 9: Drop reader to release all references
1427        drop(reader);
1428
1429        // Step 10: Verify complete cleanup - disk usage should be 0
1430        let final_disk_usage = disk_manager.used_disk_space();
1431        assert_eq!(
1432            final_disk_usage, 0,
1433            "Disk usage should be 0 after all files dropped, got {final_disk_usage}"
1434        );
1435
1436        Ok(())
1437    }
1438}