datafusion_physical_plan/spill/spill_pool.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use futures::{Stream, StreamExt};
19use std::collections::VecDeque;
20use std::sync::Arc;
21use std::task::Waker;
22
23use parking_lot::Mutex;
24
25use arrow::datatypes::SchemaRef;
26use arrow::record_batch::RecordBatch;
27use datafusion_common::Result;
28use datafusion_execution::disk_manager::RefCountedTempFile;
29use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
30
31use super::in_progress_spill_file::InProgressSpillFile;
32use super::spill_manager::SpillManager;
33
/// Shared state between the writer and readers of a spill pool.
/// This contains the queue of files and coordination state.
///
/// # Locking Design
///
/// This struct uses **fine-grained locking** with nested `Arc<Mutex<>>`:
/// - `SpillPoolShared` is wrapped in `Arc<Mutex<>>` (outer lock)
/// - Each `ActiveSpillFileShared` is wrapped in `Arc<Mutex<>>` (inner lock)
///
/// This enables:
/// 1. **Short critical sections**: The outer lock is held only for queue operations
/// 2. **I/O outside locks**: Disk I/O happens while holding only the file-specific lock
/// 3. **Concurrent operations**: Reader can access the queue while writer does I/O
///
/// **Lock ordering discipline**: Never hold both locks simultaneously to prevent deadlock.
/// Always: acquire outer lock → release outer lock → acquire inner lock (if needed).
struct SpillPoolShared {
    /// Queue of ALL files (including the current write file if it exists).
    /// Readers always read from the front of this queue (FIFO).
    /// Each file has its own lock to enable concurrent reader/writer access.
    files: VecDeque<Arc<Mutex<ActiveSpillFileShared>>>,
    /// SpillManager for creating files and tracking metrics
    spill_manager: Arc<SpillManager>,
    /// Pool-level waker to notify when new files are available (single reader).
    /// Taken (cleared) each time it is woken; the reader re-registers on every
    /// pending poll.
    waker: Option<Waker>,
    /// Whether the writer has been dropped (no more files will be added).
    /// Once true and `files` is empty, the reader returns EOF.
    writer_dropped: bool,
    /// Writer's reference to the current file (shared by all cloned writers).
    /// Has its own lock to allow I/O without blocking queue access.
    /// Invariant: when `Some`, this Arc also sits at the back of `files`
    /// (the writer pushes it there at creation time).
    current_write_file: Option<Arc<Mutex<ActiveSpillFileShared>>>,
}
65
66impl SpillPoolShared {
67 /// Creates a new shared pool state
68 fn new(spill_manager: Arc<SpillManager>) -> Self {
69 Self {
70 files: VecDeque::new(),
71 spill_manager,
72 waker: None,
73 writer_dropped: false,
74 current_write_file: None,
75 }
76 }
77
78 /// Registers a waker to be notified when new data is available (pool-level)
79 fn register_waker(&mut self, waker: Waker) {
80 self.waker = Some(waker);
81 }
82
83 /// Wakes the pool-level reader
84 fn wake(&mut self) {
85 if let Some(waker) = self.waker.take() {
86 waker.wake();
87 }
88 }
89}
90
/// Writer for a spill pool. Provides coordinated write access with FIFO semantics.
///
/// Created by [`channel`]. See that function for architecture diagrams and usage examples.
///
/// The writer is `Clone`, allowing multiple writers to coordinate on the same pool.
/// All clones share the same current write file and coordinate file rotation.
/// The writer automatically manages file rotation based on the `max_file_size_bytes`
/// configured in [`channel`]. When the last writer clone is dropped, it finalizes the
/// current file so readers can access all written data.
///
/// NOTE(review): the "last writer clone" wording above describes intent, but the
/// current `Drop` impl runs its finalization for *every* dropped clone (the derived
/// `Clone` only clones the inner `Arc`, so nothing tracks a writer count). With
/// multiple live clones, the first drop already marks the pool as closed — verify
/// and consider a writer refcount in `SpillPoolShared`.
#[derive(Clone)]
pub struct SpillPoolWriter {
    /// Maximum size in bytes before rotating to a new file.
    /// Typically set from configuration `datafusion.execution.max_spill_file_size_bytes`.
    max_file_size_bytes: usize,
    /// Shared state with readers (includes current_write_file for coordination)
    shared: Arc<Mutex<SpillPoolShared>>,
}
108
109impl SpillPoolWriter {
110 /// Spills a batch to the pool, rotating files when necessary.
111 ///
112 /// If the current file would exceed `max_file_size_bytes` after adding
113 /// this batch, the file is finalized and a new one is started.
114 ///
115 /// See [`channel`] for overall architecture and examples.
116 ///
117 /// # File Rotation Logic
118 ///
119 /// ```text
120 /// push_batch()
121 /// │
122 /// ▼
123 /// Current file exists?
124 /// │
125 /// ├─ No ──▶ Create new file ──▶ Add to shared queue
126 /// │ Wake readers
127 /// ▼
128 /// Write batch to current file
129 /// │
130 /// ▼
131 /// estimated_size > max_file_size_bytes?
132 /// │
133 /// ├─ No ──▶ Keep current file for next batch
134 /// │
135 /// ▼
136 /// Yes: finish() current file
137 /// Mark writer_finished = true
138 /// Wake readers
139 /// │
140 /// ▼
141 /// Next push_batch() creates new file
142 /// ```
143 ///
144 /// # Errors
145 ///
146 /// Returns an error if disk I/O fails or disk quota is exceeded.
147 pub fn push_batch(&self, batch: &RecordBatch) -> Result<()> {
148 if batch.num_rows() == 0 {
149 // Skip empty batches
150 return Ok(());
151 }
152
153 let batch_size = batch.get_array_memory_size();
154
155 // Fine-grained locking: Lock shared state briefly for queue access
156 let mut shared = self.shared.lock();
157
158 // Create new file if we don't have one yet
159 if shared.current_write_file.is_none() {
160 let spill_manager = Arc::clone(&shared.spill_manager);
161 // Release shared lock before disk I/O (fine-grained locking)
162 drop(shared);
163
164 let writer = spill_manager.create_in_progress_file("SpillPool")?;
165 // Clone the file so readers can access it immediately
166 let file = writer.file().expect("InProgressSpillFile should always have a file when it is first created").clone();
167
168 let file_shared = Arc::new(Mutex::new(ActiveSpillFileShared {
169 writer: Some(writer),
170 file: Some(file), // Set immediately so readers can access it
171 batches_written: 0,
172 estimated_size: 0,
173 writer_finished: false,
174 waker: None,
175 }));
176
177 // Re-acquire lock and push to shared queue
178 shared = self.shared.lock();
179 shared.files.push_back(Arc::clone(&file_shared));
180 shared.current_write_file = Some(file_shared);
181 shared.wake(); // Wake readers waiting for new files
182 }
183
184 let current_write_file = shared.current_write_file.take();
185 // Release shared lock before file I/O (fine-grained locking)
186 // This allows readers to access the queue while we do disk I/O
187 drop(shared);
188
189 // Write batch to current file - lock only the specific file
190 if let Some(current_file) = current_write_file {
191 // Now lock just this file for I/O (separate from shared lock)
192 let mut file_shared = current_file.lock();
193
194 // Append the batch
195 if let Some(ref mut writer) = file_shared.writer {
196 writer.append_batch(batch)?;
197 file_shared.batches_written += 1;
198 file_shared.estimated_size += batch_size;
199 }
200
201 // Wake reader waiting on this specific file
202 file_shared.wake();
203
204 // Check if we need to rotate
205 let needs_rotation = file_shared.estimated_size > self.max_file_size_bytes;
206
207 if needs_rotation {
208 // Finish the IPC writer
209 if let Some(mut writer) = file_shared.writer.take() {
210 writer.finish()?;
211 }
212 // Mark as finished so readers know not to wait for more data
213 file_shared.writer_finished = true;
214 // Wake reader waiting on this file (it's now finished)
215 file_shared.wake();
216 // Don't put back current_write_file - let it rotate
217 } else {
218 // Release file lock
219 drop(file_shared);
220 // Put back the current file for further writing
221 let mut shared = self.shared.lock();
222 shared.current_write_file = Some(current_file);
223 }
224 }
225
226 Ok(())
227 }
228}
229
impl Drop for SpillPoolWriter {
    /// Finalizes the in-progress spill file and marks the pool as closed so
    /// a reader that drains the queue observes EOF instead of waiting.
    ///
    /// NOTE(review): this destructor runs for *every* dropped clone of
    /// `SpillPoolWriter`, not only the last one — `#[derive(Clone)]` merely
    /// clones the inner `Arc` and nothing counts live writers. If several
    /// clones exist, the first one dropped already seals the current file and
    /// sets `writer_dropped`, so batches pushed afterwards by surviving
    /// clones go into new files while a reader with an empty queue may have
    /// already seen EOF. Consider tracking a writer count in
    /// `SpillPoolShared` (manual `Clone`/`Drop`) and finalizing only when it
    /// reaches zero.
    fn drop(&mut self) {
        let mut shared = self.shared.lock();

        // Finalize the current file, if any (see NOTE above: this happens on
        // every clone's drop, not just the last writer's)
        if let Some(current_file) = shared.current_write_file.take() {
            // Release shared lock before locking file (lock-ordering rule:
            // never hold the pool lock and a file lock at the same time)
            drop(shared);

            let mut file_shared = current_file.lock();

            // Finish the current writer if it exists
            if let Some(mut writer) = file_shared.writer.take() {
                // Ignore errors on drop - we're in destructor
                let _ = writer.finish();
            }

            // Mark as finished so readers know not to wait for more data
            file_shared.writer_finished = true;

            // Wake reader waiting on this file (it's now finished)
            file_shared.wake();

            drop(file_shared);
            shared = self.shared.lock();
        }

        // Mark writer as dropped and wake pool-level readers
        shared.writer_dropped = true;
        shared.wake();
    }
}
262
263/// Creates a paired writer and reader for a spill pool with MPSC (multi-producer, single-consumer)
264/// semantics.
265///
266/// This is the recommended way to create a spill pool. The writer is `Clone`, allowing
267/// multiple producers to coordinate writes to the same pool. The reader can consume batches
268/// in FIFO order. The reader can start reading immediately after a writer appends a batch
269/// to the spill file, without waiting for the file to be sealed, while writers continue to
270/// write more data.
271///
272/// Internally this coordinates rotating spill files based on size limits, and
273/// handles asynchronous notification between the writer and reader using wakers.
274/// This ensures that we manage disk usage efficiently while allowing concurrent
275/// I/O between the writer and reader.
276///
277/// # Data Flow Overview
278///
279/// 1. Writer write batch `B0` to F1
280/// 2. Writer write batch `B1` to F1, notices the size limit exceeded, finishes F1.
281/// 3. Reader read `B0` from F1
282/// 4. Reader read `B1`, no more batch to read -> wait on the waker
283/// 5. Writer write batch `B2` to a new file `F2`, wake up the waiting reader.
284/// 6. Reader read `B2` from F2.
285/// 7. Repeat until writer is dropped.
286///
287/// # Architecture
288///
289/// ```text
290/// ┌─────────────────────────────────────────────────────────────────────────┐
291/// │ SpillPool │
292/// │ │
293/// │ Writer Side Shared State Reader Side │
294/// │ ─────────── ──────────── ─────────── │
295/// │ │
296/// │ SpillPoolWriter ┌────────────────────┐ SpillPoolReader │
297/// │ │ │ VecDeque<File> │ │ │
298/// │ │ │ ┌────┐┌────┐ │ │ │
299/// │ push_batch() │ │ F1 ││ F2 │ ... │ next().await │
300/// │ │ │ └────┘└────┘ │ │ │
301/// │ ▼ │ (FIFO order) │ ▼ │
302/// │ ┌─────────┐ │ │ ┌──────────┐ │
303/// │ │Current │───────▶│ Coordination: │◀───│ Current │ │
304/// │ │Write │ │ - Wakers │ │ Read │ │
305/// │ │File │ │ - Batch counts │ │ File │ │
306/// │ └─────────┘ │ - Writer status │ └──────────┘ │
307/// │ │ └────────────────────┘ │ │
308/// │ │ │ │
309/// │ Size > limit? Read all batches? │
310/// │ │ │ │
311/// │ ▼ ▼ │
312/// │ Rotate to new file Pop from queue │
313/// └─────────────────────────────────────────────────────────────────────────┘
314///
315/// Writer produces → Shared FIFO queue → Reader consumes
316/// ```
317///
318/// # File State Machine
319///
320/// Each file in the pool coordinates between writer and reader:
321///
322/// ```text
323/// Writer View Reader View
324/// ─────────── ───────────
325///
326/// Created writer: Some(..) batches_read: 0
327/// batches_written: 0 (waiting for data)
328/// │
329/// ▼
330/// Writing append_batch() Can read if:
331/// batches_written++ batches_read < batches_written
332/// wake readers
333/// │ │
334/// │ ▼
335/// ┌──────┴──────┐ poll_next() → batch
336/// │ │ batches_read++
337/// ▼ ▼
338/// Size > limit? More data?
339/// │ │
340/// │ └─▶ Yes ──▶ Continue writing
341/// ▼
342/// finish() Reader catches up:
343/// writer_finished = true batches_read == batches_written
344/// wake readers │
345/// │ ▼
346/// └─────────────────────▶ Returns Poll::Ready(None)
347/// File complete, pop from queue
348/// ```
349///
350/// # Arguments
351///
352/// * `max_file_size_bytes` - Maximum size per file before rotation. When a file
353/// exceeds this size, the writer automatically rotates to a new file.
354/// * `spill_manager` - Manager for file creation and metrics tracking
355///
356/// # Returns
357///
358/// A tuple of `(SpillPoolWriter, SendableRecordBatchStream)` that share the same
359/// underlying pool. The reader is returned as a stream for immediate use with
360/// async stream combinators.
361///
362/// # Example
363///
364/// ```
365/// use std::sync::Arc;
366/// use arrow::array::{ArrayRef, Int32Array};
367/// use arrow::datatypes::{DataType, Field, Schema};
368/// use arrow::record_batch::RecordBatch;
369/// use datafusion_execution::runtime_env::RuntimeEnv;
370/// use futures::StreamExt;
371///
372/// # use datafusion_physical_plan::spill::spill_pool;
373/// # use datafusion_physical_plan::spill::SpillManager; // Re-exported for doctests
374/// # use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, SpillMetrics};
375/// #
376/// # #[tokio::main]
377/// # async fn main() -> datafusion_common::Result<()> {
378/// # // Setup for the example (typically comes from TaskContext in production)
379/// # let env = Arc::new(RuntimeEnv::default());
380/// # let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
381/// # let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
382/// # let spill_manager = Arc::new(SpillManager::new(env, metrics, schema.clone()));
383/// #
384/// // Create channel with 1MB file size limit
385/// let (writer, mut reader) = spill_pool::channel(1024 * 1024, spill_manager);
386///
387/// // Spawn writer and reader concurrently; writer wakes reader via wakers
388/// let writer_task = tokio::spawn(async move {
389/// for i in 0..5 {
390/// let array: ArrayRef = Arc::new(Int32Array::from(vec![i; 100]));
391/// let batch = RecordBatch::try_new(schema.clone(), vec![array]).unwrap();
392/// writer.push_batch(&batch)?;
393/// }
394/// // Explicitly drop writer to finalize the spill file and wake the reader
395/// drop(writer);
396/// datafusion_common::Result::<()>::Ok(())
397/// });
398///
399/// let reader_task = tokio::spawn(async move {
400/// let mut batches_read = 0;
401/// while let Some(result) = reader.next().await {
402/// let _batch = result?;
403/// batches_read += 1;
404/// }
405/// datafusion_common::Result::<usize>::Ok(batches_read)
406/// });
407///
408/// let (writer_res, reader_res) = tokio::join!(writer_task, reader_task);
409/// writer_res
410/// .map_err(|e| datafusion_common::DataFusionError::Execution(e.to_string()))??;
411/// let batches_read = reader_res
412/// .map_err(|e| datafusion_common::DataFusionError::Execution(e.to_string()))??;
413///
414/// assert_eq!(batches_read, 5);
415/// # Ok(())
416/// # }
417/// ```
418///
419/// # Why rotate files?
420///
421/// File rotation ensures we don't end up with unreferenced disk usage.
422/// If we used a single file for all spilled data, we would end up with
423/// unreferenced data at the beginning of the file that has already been read
424/// by readers but we can't delete because you can't truncate from the start of a file.
425///
426/// Consider the case of a query like `SELECT * FROM large_table WHERE false`.
427/// Obviously this query produces no output rows, but if we had a spilling operator
428/// in the middle of this query between the scan and the filter it would see the entire
429/// `large_table` flow through it and thus would spill all of that data to disk.
430/// So we'd end up using up to `size(large_table)` bytes of disk space.
431/// If instead we use file rotation, and as long as the readers can keep up with the writer,
432/// then we can ensure that once a file is fully read by all readers it can be deleted,
433/// thus bounding the maximum disk usage to roughly `max_file_size_bytes`.
434pub fn channel(
435 max_file_size_bytes: usize,
436 spill_manager: Arc<SpillManager>,
437) -> (SpillPoolWriter, SendableRecordBatchStream) {
438 let schema = Arc::clone(spill_manager.schema());
439 let shared = Arc::new(Mutex::new(SpillPoolShared::new(spill_manager)));
440
441 let writer = SpillPoolWriter {
442 max_file_size_bytes,
443 shared: Arc::clone(&shared),
444 };
445
446 let reader = SpillPoolReader::new(shared, schema);
447
448 (writer, Box::pin(reader))
449}
450
/// Shared state between writer and readers for an active spill file.
/// Protected by a Mutex to coordinate between concurrent readers and the writer.
struct ActiveSpillFileShared {
    /// Writer handle - taken (set to None) when finish() is called
    writer: Option<InProgressSpillFile>,
    /// The spill file handle. Set as soon as the file is created (not when
    /// the writer finishes) so a reader can open it while writes continue.
    /// Taken by the reader when creating a stream (the file stays open via
    /// file handles).
    file: Option<RefCountedTempFile>,
    /// Total number of batches written to this file; the reader compares its
    /// own `batches_read` against this to know whether data is available
    batches_written: usize,
    /// Estimated size in bytes of data written to this file (sum of the
    /// in-memory sizes of appended batches, used for rotation decisions)
    estimated_size: usize,
    /// Whether the writer has finished writing to this file
    writer_finished: bool,
    /// Waker for reader waiting on this specific file (SPSC: only one reader)
    waker: Option<Waker>,
}
468
469impl ActiveSpillFileShared {
470 /// Registers a waker to be notified when new data is written to this file
471 fn register_waker(&mut self, waker: Waker) {
472 self.waker = Some(waker);
473 }
474
475 /// Wakes the reader waiting on this file
476 fn wake(&mut self) {
477 if let Some(waker) = self.waker.take() {
478 waker.wake();
479 }
480 }
481}
482
/// Reader state for a SpillFile (owned by individual SpillFile instances).
/// This is kept separate from the shared state to avoid holding locks during I/O.
struct SpillFileReader {
    /// The actual stream reading from disk
    stream: SendableRecordBatchStream,
    /// Number of batches this reader has consumed; compared against
    /// `ActiveSpillFileShared::batches_written` to decide whether another
    /// batch can be polled without blocking on the writer
    batches_read: usize,
}
491
/// One spill file being consumed by the pool reader: pairs the file's shared
/// coordination state with a lazily created disk stream.
struct SpillFile {
    /// Shared coordination state (contains writer and batch counts)
    shared: Arc<Mutex<ActiveSpillFileShared>>,
    /// Reader state (lazy-initialized on the first poll that finds data,
    /// owned by this SpillFile)
    reader: Option<SpillFileReader>,
    /// Spill manager for creating readers
    spill_manager: Arc<SpillManager>,
}
500
impl Stream for SpillFile {
    type Item = Result<RecordBatch>;

    /// Polls the next batch from this spill file in three stages that keep
    /// locks and disk I/O separate:
    /// 1. Under the file lock: compare `batches_read` with `batches_written`
    ///    to decide whether data is available; otherwise EOF (writer done)
    ///    or register the file-level waker and pend.
    /// 2. Lock released: lazily open the on-disk stream on the first poll
    ///    that found data.
    /// 3. Poll the underlying stream with no lock held.
    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        use std::task::Poll;

        // Step 1: Lock shared state and check coordination
        let (should_read, file) = {
            let mut shared = self.shared.lock();

            // Determine if we can read
            let batches_read = self.reader.as_ref().map_or(0, |r| r.batches_read);

            if batches_read < shared.batches_written {
                // More data available to read - take the file if we don't have a reader yet
                let file = if self.reader.is_none() {
                    shared.file.take()
                } else {
                    None
                };
                (true, file)
            } else if shared.writer_finished {
                // No more data and writer is done - EOF
                return Poll::Ready(None);
            } else {
                // Caught up to writer, but writer still active - register waker and wait
                shared.register_waker(cx.waker().clone());
                return Poll::Pending;
            }
        }; // Lock released here

        // Step 2: Lazy-create reader stream if needed
        if self.reader.is_none() && should_read {
            if let Some(file) = file {
                match self.spill_manager.read_spill_as_stream(file, None) {
                    Ok(stream) => {
                        self.reader = Some(SpillFileReader {
                            stream,
                            batches_read: 0,
                        });
                    }
                    // NOTE(review): the file handle was already taken in
                    // step 1, so if opening the stream fails here, later
                    // polls hit the `else` branch below and pend forever.
                    // Acceptable only if callers stop polling after an
                    // error — verify.
                    Err(e) => return Poll::Ready(Some(Err(e))),
                }
            } else {
                // File not available yet (writer hasn't finished or already taken)
                // Register waker and wait for file to be ready
                let mut shared = self.shared.lock();
                shared.register_waker(cx.waker().clone());
                return Poll::Pending;
            }
        }

        // Step 3: Poll the reader stream (no lock held)
        if let Some(reader) = &mut self.reader {
            match reader.stream.poll_next_unpin(cx) {
                Poll::Ready(Some(Ok(batch))) => {
                    // Successfully read a batch - increment counter
                    reader.batches_read += 1;
                    Poll::Ready(Some(Ok(batch)))
                }
                Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
                Poll::Ready(None) => {
                    // Stream exhausted unexpectedly
                    // This shouldn't happen if coordination is correct, but handle gracefully
                    Poll::Ready(None)
                }
                Poll::Pending => Poll::Pending,
            }
        } else {
            // Should not reach here, but handle gracefully
            Poll::Ready(None)
        }
    }
}
578
/// A stream that reads from a SpillPool in FIFO order.
///
/// Created by [`channel`]. See that function for architecture diagrams and usage examples.
///
/// The stream automatically handles file rotation and reads from completed files.
/// When no data is available, it returns `Poll::Pending` and registers a waker to
/// be notified when the writer produces more data.
///
/// # Infinite Stream Semantics
///
/// This stream never returns `None` (`Poll::Ready(None)`) on its own - it will keep
/// waiting for the writer to produce more data. The stream ends only when:
/// - The reader is dropped
/// - The writer is dropped AND all queued data has been consumed
///
/// This makes it suitable for continuous streaming scenarios where the writer may
/// produce data intermittently.
pub struct SpillPoolReader {
    /// Shared reference to the spill pool
    shared: Arc<Mutex<SpillPoolShared>>,
    /// Current SpillFile we're reading from (wraps the front of the pool's
    /// queue; popped from the queue once fully consumed)
    current_file: Option<SpillFile>,
    /// Schema of the spilled data
    schema: SchemaRef,
}
604
605impl SpillPoolReader {
606 /// Creates a new reader from shared pool state.
607 ///
608 /// This is private - use the `channel()` function to create a reader/writer pair.
609 ///
610 /// # Arguments
611 ///
612 /// * `shared` - Shared reference to the pool state
613 fn new(shared: Arc<Mutex<SpillPoolShared>>, schema: SchemaRef) -> Self {
614 Self {
615 shared,
616 current_file: None,
617 schema,
618 }
619 }
620}
621
impl Stream for SpillPoolReader {
    type Item = Result<RecordBatch>;

    /// Drives the FIFO read loop: poll the current file; when it is fully
    /// consumed and sealed, pop it from the queue and move to the next; when
    /// the queue is empty, return EOF if the writer was dropped, otherwise
    /// register the pool-level waker and pend.
    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        use std::task::Poll;

        loop {
            // If we have a current file, try to read from it
            if let Some(ref mut file) = self.current_file {
                match file.poll_next_unpin(cx) {
                    Poll::Ready(Some(Ok(batch))) => {
                        // Got a batch, return it
                        return Poll::Ready(Some(Ok(batch)));
                    }
                    Poll::Ready(Some(Err(e))) => {
                        // Error reading batch
                        return Poll::Ready(Some(Err(e)));
                    }
                    Poll::Ready(None) => {
                        // Current file stream exhausted
                        // Check if this file is marked as writer_finished
                        let writer_finished = { file.shared.lock().writer_finished };

                        if writer_finished {
                            // File is complete, pop it from the queue and move to next
                            // (dropping the last file reference deletes the temp file)
                            let mut shared = self.shared.lock();
                            shared.files.pop_front();
                            drop(shared); // Release lock

                            // Clear current file and continue loop to get next file
                            self.current_file = None;
                            continue;
                        } else {
                            // Stream exhausted but writer not finished - unexpected
                            // This shouldn't happen with proper coordination
                            return Poll::Ready(None);
                        }
                    }
                    Poll::Pending => {
                        // File not ready yet (waiting for writer). The file
                        // poll already registered the file-level waker; also
                        // register the pool-level waker so new-file creation
                        // wakes us too.
                        let mut shared = self.shared.lock();
                        shared.register_waker(cx.waker().clone());
                        return Poll::Pending;
                    }
                }
            }

            // No current file, need to get the next one
            let mut shared = self.shared.lock();

            // Peek at the front of the queue (don't pop yet)
            if let Some(file_shared) = shared.files.front() {
                // Create a SpillFile from the shared state
                let spill_manager = Arc::clone(&shared.spill_manager);
                let file_shared = Arc::clone(file_shared);
                drop(shared); // Release lock before creating SpillFile

                self.current_file = Some(SpillFile {
                    shared: file_shared,
                    reader: None,
                    spill_manager,
                });

                // Continue loop to poll the new file
                continue;
            }

            // No files in queue - check if writer is done
            if shared.writer_dropped {
                // Writer is done and no more files will be added - EOF
                return Poll::Ready(None);
            }

            // Writer still active, register waker that will get notified when new files are added
            shared.register_waker(cx.waker().clone());
            return Poll::Pending;
        }
    }
}
705
706impl RecordBatchStream for SpillPoolReader {
707 fn schema(&self) -> SchemaRef {
708 Arc::clone(&self.schema)
709 }
710}
711
712#[cfg(test)]
713mod tests {
714 use super::*;
715 use crate::metrics::{ExecutionPlanMetricsSet, SpillMetrics};
716 use arrow::array::{ArrayRef, Int32Array};
717 use arrow::datatypes::{DataType, Field, Schema};
718 use datafusion_common_runtime::SpawnedTask;
719 use datafusion_execution::runtime_env::RuntimeEnv;
720 use futures::StreamExt;
721
722 fn create_test_schema() -> SchemaRef {
723 Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]))
724 }
725
726 fn create_test_batch(start: i32, count: usize) -> RecordBatch {
727 let schema = create_test_schema();
728 let a: ArrayRef = Arc::new(Int32Array::from(
729 (start..start + count as i32).collect::<Vec<_>>(),
730 ));
731 RecordBatch::try_new(schema, vec![a]).unwrap()
732 }
733
734 fn create_spill_channel(
735 max_file_size: usize,
736 ) -> (SpillPoolWriter, SendableRecordBatchStream) {
737 let env = Arc::new(RuntimeEnv::default());
738 let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
739 let schema = create_test_schema();
740 let spill_manager = Arc::new(SpillManager::new(env, metrics, schema));
741
742 channel(max_file_size, spill_manager)
743 }
744
745 fn create_spill_channel_with_metrics(
746 max_file_size: usize,
747 ) -> (SpillPoolWriter, SendableRecordBatchStream, SpillMetrics) {
748 let env = Arc::new(RuntimeEnv::default());
749 let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
750 let schema = create_test_schema();
751 let spill_manager = Arc::new(SpillManager::new(env, metrics.clone(), schema));
752
753 let (writer, reader) = channel(max_file_size, spill_manager);
754 (writer, reader, metrics)
755 }
756
757 #[tokio::test]
758 async fn test_basic_write_and_read() -> Result<()> {
759 let (writer, mut reader) = create_spill_channel(1024 * 1024);
760
761 // Write one batch
762 let batch1 = create_test_batch(0, 10);
763 writer.push_batch(&batch1)?;
764
765 // Read the batch
766 let result = reader.next().await.unwrap()?;
767 assert_eq!(result.num_rows(), 10);
768
769 // Write another batch
770 let batch2 = create_test_batch(10, 5);
771 writer.push_batch(&batch2)?;
772 // Read the second batch
773 let result = reader.next().await.unwrap()?;
774 assert_eq!(result.num_rows(), 5);
775
776 Ok(())
777 }
778
779 #[tokio::test]
780 async fn test_single_batch_write_read() -> Result<()> {
781 let (writer, mut reader) = create_spill_channel(1024 * 1024);
782
783 // Write one batch
784 let batch = create_test_batch(0, 5);
785 writer.push_batch(&batch)?;
786
787 // Read it back
788 let result = reader.next().await.unwrap()?;
789 assert_eq!(result.num_rows(), 5);
790
791 // Verify the actual data
792 let col = result
793 .column(0)
794 .as_any()
795 .downcast_ref::<Int32Array>()
796 .unwrap();
797 assert_eq!(col.value(0), 0);
798 assert_eq!(col.value(4), 4);
799
800 Ok(())
801 }
802
803 #[tokio::test]
804 async fn test_multiple_batches_sequential() -> Result<()> {
805 let (writer, mut reader) = create_spill_channel(1024 * 1024);
806
807 // Write multiple batches
808 for i in 0..5 {
809 let batch = create_test_batch(i * 10, 10);
810 writer.push_batch(&batch)?;
811 }
812
813 // Read all batches and verify FIFO order
814 for i in 0..5 {
815 let result = reader.next().await.unwrap()?;
816 assert_eq!(result.num_rows(), 10);
817
818 let col = result
819 .column(0)
820 .as_any()
821 .downcast_ref::<Int32Array>()
822 .unwrap();
823 assert_eq!(col.value(0), i * 10, "Batch {i} not in FIFO order");
824 }
825
826 Ok(())
827 }
828
829 #[tokio::test]
830 async fn test_empty_writer() -> Result<()> {
831 let (_writer, reader) = create_spill_channel(1024 * 1024);
832
833 // Reader should pend since no batches were written
834 let mut reader = reader;
835 let result =
836 tokio::time::timeout(std::time::Duration::from_millis(100), reader.next())
837 .await;
838
839 assert!(result.is_err(), "Reader should timeout on empty writer");
840
841 Ok(())
842 }
843
844 #[tokio::test]
845 async fn test_empty_batch_skipping() -> Result<()> {
846 let (writer, mut reader) = create_spill_channel(1024 * 1024);
847
848 // Write empty batch
849 let empty_batch = create_test_batch(0, 0);
850 writer.push_batch(&empty_batch)?;
851
852 // Write non-empty batch
853 let batch = create_test_batch(0, 5);
854 writer.push_batch(&batch)?;
855
856 // Should only read the non-empty batch
857 let result = reader.next().await.unwrap()?;
858 assert_eq!(result.num_rows(), 5);
859
860 Ok(())
861 }
862
    /// Verifies size-based rotation: the limit fits exactly one batch, so
    /// the second push seals file 1 and the third push lazily opens file 2.
    /// Metrics are checked at each step, then all batches are read back.
    #[tokio::test]
    async fn test_rotation_triggered_by_size() -> Result<()> {
        // Set a small max_file_size to trigger rotation after one batch
        let batch1 = create_test_batch(0, 10);
        let batch_size = batch1.get_array_memory_size() + 1;

        let (writer, mut reader, metrics) = create_spill_channel_with_metrics(batch_size);

        // Write first batch (should fit in first file)
        writer.push_batch(&batch1)?;

        // Check metrics after first batch - file created but not finalized yet
        assert_eq!(
            metrics.spill_file_count.value(),
            1,
            "Should have created 1 file after first batch"
        );
        assert_eq!(
            metrics.spilled_bytes.value(),
            0,
            "Spilled bytes should be 0 before file finalization"
        );
        assert_eq!(
            metrics.spilled_rows.value(),
            10,
            "Should have spilled 10 rows from first batch"
        );

        // Write second batch (should trigger rotation - finalize first file)
        let batch2 = create_test_batch(10, 10);
        // Sanity-check the size arithmetic the rotation assertion depends on
        assert!(
            batch2.get_array_memory_size() <= batch_size,
            "batch2 size {} exceeds limit {batch_size}",
            batch2.get_array_memory_size(),
        );
        assert!(
            batch1.get_array_memory_size() + batch2.get_array_memory_size() > batch_size,
            "Combined size {} does not exceed limit to trigger rotation",
            batch1.get_array_memory_size() + batch2.get_array_memory_size()
        );
        writer.push_batch(&batch2)?;

        // Check metrics after rotation - first file finalized, but second file not created yet
        // (new file created lazily on next push_batch call)
        assert_eq!(
            metrics.spill_file_count.value(),
            1,
            "Should still have 1 file (second file not created until next write)"
        );
        assert!(
            metrics.spilled_bytes.value() > 0,
            "Spilled bytes should be > 0 after first file finalized (got {})",
            metrics.spilled_bytes.value()
        );
        assert_eq!(
            metrics.spilled_rows.value(),
            20,
            "Should have spilled 20 total rows (10 + 10)"
        );

        // Write a third batch to confirm rotation occurred (creates second file)
        let batch3 = create_test_batch(20, 5);
        writer.push_batch(&batch3)?;

        // Now check that second file was created
        assert_eq!(
            metrics.spill_file_count.value(),
            2,
            "Should have created 2 files after writing to new file"
        );
        assert_eq!(
            metrics.spilled_rows.value(),
            25,
            "Should have spilled 25 total rows (10 + 10 + 5)"
        );

        // Read all three batches
        let result1 = reader.next().await.unwrap()?;
        assert_eq!(result1.num_rows(), 10);

        let result2 = reader.next().await.unwrap()?;
        assert_eq!(result2.num_rows(), 10);

        let result3 = reader.next().await.unwrap()?;
        assert_eq!(result3.num_rows(), 5);

        Ok(())
    }
951
952 #[tokio::test]
953 async fn test_multiple_rotations() -> Result<()> {
954 let batches = (0..10)
955 .map(|i| create_test_batch(i * 10, 10))
956 .collect::<Vec<_>>();
957
958 let batch_size = batches[0].get_array_memory_size() * 2 + 1;
959
960 // Very small max_file_size to force frequent rotations
961 let (writer, mut reader, metrics) = create_spill_channel_with_metrics(batch_size);
962
963 // Write many batches to cause multiple rotations
964 for i in 0..10 {
965 let batch = create_test_batch(i * 10, 10);
966 writer.push_batch(&batch)?;
967 }
968
969 // Check metrics after all writes - should have multiple files due to rotations
970 // With batch_size = 2 * one_batch + 1, each file fits ~2 batches before rotating
971 // 10 batches should create multiple files (exact count depends on rotation timing)
972 let file_count = metrics.spill_file_count.value();
973 assert!(
974 file_count >= 4,
975 "Should have created at least 4 files with multiple rotations (got {file_count})"
976 );
977 assert!(
978 metrics.spilled_bytes.value() > 0,
979 "Spilled bytes should be > 0 after rotations (got {})",
980 metrics.spilled_bytes.value()
981 );
982 assert_eq!(
983 metrics.spilled_rows.value(),
984 100,
985 "Should have spilled 100 total rows (10 batches * 10 rows)"
986 );
987
988 // Read all batches and verify order
989 for i in 0..10 {
990 let result = reader.next().await.unwrap()?;
991 assert_eq!(result.num_rows(), 10);
992
993 let col = result
994 .column(0)
995 .as_any()
996 .downcast_ref::<Int32Array>()
997 .unwrap();
998 assert_eq!(
999 col.value(0),
1000 i * 10,
1001 "Batch {i} not in correct order after rotations"
1002 );
1003 }
1004
1005 Ok(())
1006 }
1007
1008 #[tokio::test]
1009 async fn test_single_batch_larger_than_limit() -> Result<()> {
1010 // Very small limit
1011 let (writer, mut reader, metrics) = create_spill_channel_with_metrics(100);
1012
1013 // Write a batch that exceeds the limit
1014 let large_batch = create_test_batch(0, 100);
1015 writer.push_batch(&large_batch)?;
1016
1017 // Check metrics after large batch - should trigger rotation immediately
1018 assert_eq!(
1019 metrics.spill_file_count.value(),
1020 1,
1021 "Should have created 1 file for large batch"
1022 );
1023 assert_eq!(
1024 metrics.spilled_rows.value(),
1025 100,
1026 "Should have spilled 100 rows from large batch"
1027 );
1028
1029 // Should still write and read successfully
1030 let result = reader.next().await.unwrap()?;
1031 assert_eq!(result.num_rows(), 100);
1032
1033 // Next batch should go to a new file
1034 let batch2 = create_test_batch(100, 10);
1035 writer.push_batch(&batch2)?;
1036
1037 // Check metrics after second batch - should have rotated to a new file
1038 assert_eq!(
1039 metrics.spill_file_count.value(),
1040 2,
1041 "Should have created 2 files after rotation"
1042 );
1043 assert_eq!(
1044 metrics.spilled_rows.value(),
1045 110,
1046 "Should have spilled 110 total rows (100 + 10)"
1047 );
1048
1049 let result2 = reader.next().await.unwrap()?;
1050 assert_eq!(result2.num_rows(), 10);
1051
1052 Ok(())
1053 }
1054
1055 #[tokio::test]
1056 async fn test_very_small_max_file_size() -> Result<()> {
1057 // Test with just 1 byte max (extreme case)
1058 let (writer, mut reader) = create_spill_channel(1);
1059
1060 // Any batch will exceed this limit
1061 let batch = create_test_batch(0, 5);
1062 writer.push_batch(&batch)?;
1063
1064 // Should still work
1065 let result = reader.next().await.unwrap()?;
1066 assert_eq!(result.num_rows(), 5);
1067
1068 Ok(())
1069 }
1070
1071 #[tokio::test]
1072 async fn test_exact_size_boundary() -> Result<()> {
1073 // Create a batch and measure its approximate size
1074 let batch = create_test_batch(0, 10);
1075 let batch_size = batch.get_array_memory_size();
1076
1077 // Set max_file_size to exactly the batch size
1078 let (writer, mut reader, metrics) = create_spill_channel_with_metrics(batch_size);
1079
1080 // Write first batch (exactly at the size limit)
1081 writer.push_batch(&batch)?;
1082
1083 // Check metrics after first batch - should NOT rotate yet (size == limit, not >)
1084 assert_eq!(
1085 metrics.spill_file_count.value(),
1086 1,
1087 "Should have created 1 file after first batch at exact boundary"
1088 );
1089 assert_eq!(
1090 metrics.spilled_rows.value(),
1091 10,
1092 "Should have spilled 10 rows from first batch"
1093 );
1094
1095 // Write second batch (exceeds the limit, should trigger rotation)
1096 let batch2 = create_test_batch(10, 10);
1097 writer.push_batch(&batch2)?;
1098
1099 // Check metrics after second batch - rotation triggered, first file finalized
1100 // Note: second file not created yet (lazy creation on next write)
1101 assert_eq!(
1102 metrics.spill_file_count.value(),
1103 1,
1104 "Should still have 1 file after rotation (second file created lazily)"
1105 );
1106 assert_eq!(
1107 metrics.spilled_rows.value(),
1108 20,
1109 "Should have spilled 20 total rows (10 + 10)"
1110 );
1111 // Verify first file was finalized by checking spilled_bytes
1112 assert!(
1113 metrics.spilled_bytes.value() > 0,
1114 "Spilled bytes should be > 0 after file finalization (got {})",
1115 metrics.spilled_bytes.value()
1116 );
1117
1118 // Both should be readable
1119 let result1 = reader.next().await.unwrap()?;
1120 assert_eq!(result1.num_rows(), 10);
1121
1122 let result2 = reader.next().await.unwrap()?;
1123 assert_eq!(result2.num_rows(), 10);
1124
1125 // Spill another batch, now we should see the second file created
1126 let batch3 = create_test_batch(20, 5);
1127 writer.push_batch(&batch3)?;
1128 assert_eq!(
1129 metrics.spill_file_count.value(),
1130 2,
1131 "Should have created 2 files after writing to new file"
1132 );
1133
1134 Ok(())
1135 }
1136
1137 #[tokio::test]
1138 async fn test_concurrent_reader_writer() -> Result<()> {
1139 let (writer, mut reader) = create_spill_channel(1024 * 1024);
1140
1141 // Spawn writer task
1142 let writer_handle = SpawnedTask::spawn(async move {
1143 for i in 0..10 {
1144 let batch = create_test_batch(i * 10, 10);
1145 writer.push_batch(&batch).unwrap();
1146 // Small delay to simulate real concurrent work
1147 tokio::time::sleep(std::time::Duration::from_millis(5)).await;
1148 }
1149 });
1150
1151 // Reader task (runs concurrently)
1152 let reader_handle = SpawnedTask::spawn(async move {
1153 let mut count = 0;
1154 for i in 0..10 {
1155 let result = reader.next().await.unwrap().unwrap();
1156 assert_eq!(result.num_rows(), 10);
1157
1158 let col = result
1159 .column(0)
1160 .as_any()
1161 .downcast_ref::<Int32Array>()
1162 .unwrap();
1163 assert_eq!(col.value(0), i * 10);
1164 count += 1;
1165 }
1166 count
1167 });
1168
1169 // Wait for both to complete
1170 writer_handle.await.unwrap();
1171 let batches_read = reader_handle.await.unwrap();
1172 assert_eq!(batches_read, 10);
1173
1174 Ok(())
1175 }
1176
    /// Verifies writer-side wakeups: a reader that polls before any data
    /// exists must pend, then be woken by each subsequent write. The global
    /// order of events is recorded and asserted at the end.
    #[tokio::test]
    async fn test_reader_catches_up_to_writer() -> Result<()> {
        let (writer, mut reader) = create_spill_channel(1024 * 1024);

        // Oneshot channels used purely for test sequencing, so the
        // interleaving of reader and writer is deterministic.
        let (reader_waiting_tx, reader_waiting_rx) = tokio::sync::oneshot::channel();
        let (first_read_done_tx, first_read_done_rx) = tokio::sync::oneshot::channel();

        // Event log entries recorded by both tasks; the payload is the
        // number of rows read or written.
        #[derive(Clone, Copy, Debug, PartialEq, Eq)]
        enum ReadWriteEvent {
            ReadStart,
            Read(usize),
            Write(usize),
        }

        let events = Arc::new(Mutex::new(vec![]));
        // Start reader first (will pend)
        let reader_events = Arc::clone(&events);
        let reader_handle = SpawnedTask::spawn(async move {
            reader_events.lock().push(ReadWriteEvent::ReadStart);
            // Signal the main task just before blocking on the first read.
            // NOTE(review): this fires before `next()` is polled, not after
            // the task is parked — presumably close enough since no data
            // exists yet, so the first `next()` must pend either way.
            reader_waiting_tx
                .send(())
                .expect("reader_waiting channel closed unexpectedly");
            let result = reader.next().await.unwrap().unwrap();
            reader_events
                .lock()
                .push(ReadWriteEvent::Read(result.num_rows()));
            // Tell the main task the first read completed so it can issue
            // the second write with a known ordering.
            first_read_done_tx
                .send(())
                .expect("first_read_done channel closed unexpectedly");
            // Second read pends again until the main task writes batch #2.
            let result = reader.next().await.unwrap().unwrap();
            reader_events
                .lock()
                .push(ReadWriteEvent::Read(result.num_rows()));
        });

        // Wait until the reader is pending on the first batch
        reader_waiting_rx
            .await
            .expect("reader should signal when waiting");

        // Now write a batch (should wake the reader)
        let batch = create_test_batch(0, 5);
        events.lock().push(ReadWriteEvent::Write(batch.num_rows()));
        writer.push_batch(&batch)?;

        // Wait for the reader to finish the first read before allowing the
        // second write. This ensures deterministic ordering of events:
        // 1. The reader starts and pends on the first `next()`
        // 2. The first write wakes the reader
        // 3. The reader processes the first batch and signals completion
        // 4. The second write is issued, ensuring consistent event ordering
        first_read_done_rx
            .await
            .expect("reader should signal when first read completes");

        // Write another batch
        let batch = create_test_batch(5, 10);
        events.lock().push(ReadWriteEvent::Write(batch.num_rows()));
        writer.push_batch(&batch)?;

        // Reader should complete
        reader_handle.await.unwrap();
        // Snapshot the log and verify the strict write/read alternation.
        let events = events.lock().clone();
        assert_eq!(
            events,
            vec![
                ReadWriteEvent::ReadStart,
                ReadWriteEvent::Write(5),
                ReadWriteEvent::Read(5),
                ReadWriteEvent::Write(10),
                ReadWriteEvent::Read(10)
            ]
        );

        Ok(())
    }
1253
1254 #[tokio::test]
1255 async fn test_reader_starts_after_writer_finishes() -> Result<()> {
1256 let (writer, reader) = create_spill_channel(128);
1257
1258 // Writer writes all data
1259 for i in 0..5 {
1260 let batch = create_test_batch(i * 10, 10);
1261 writer.push_batch(&batch)?;
1262 }
1263
1264 drop(writer);
1265
1266 // Now start reader
1267 let mut reader = reader;
1268 let mut count = 0;
1269 for i in 0..5 {
1270 let result = reader.next().await.unwrap()?;
1271 assert_eq!(result.num_rows(), 10);
1272
1273 let col = result
1274 .column(0)
1275 .as_any()
1276 .downcast_ref::<Int32Array>()
1277 .unwrap();
1278 assert_eq!(col.value(0), i * 10);
1279 count += 1;
1280 }
1281
1282 assert_eq!(count, 5, "Should read all batches after writer finishes");
1283
1284 Ok(())
1285 }
1286
1287 #[tokio::test]
1288 async fn test_writer_drop_finalizes_file() -> Result<()> {
1289 let env = Arc::new(RuntimeEnv::default());
1290 let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
1291 let schema = create_test_schema();
1292 let spill_manager =
1293 Arc::new(SpillManager::new(Arc::clone(&env), metrics.clone(), schema));
1294
1295 let (writer, mut reader) = channel(1024 * 1024, spill_manager);
1296
1297 // Write some batches
1298 for i in 0..5 {
1299 let batch = create_test_batch(i * 10, 10);
1300 writer.push_batch(&batch)?;
1301 }
1302
1303 // Check metrics before drop - spilled_bytes should be 0 since file isn't finalized yet
1304 let spilled_bytes_before = metrics.spilled_bytes.value();
1305 assert_eq!(
1306 spilled_bytes_before, 0,
1307 "Spilled bytes should be 0 before writer is dropped"
1308 );
1309
1310 // Explicitly drop the writer - this should finalize the current file
1311 drop(writer);
1312
1313 // Check metrics after drop - spilled_bytes should be > 0 now
1314 let spilled_bytes_after = metrics.spilled_bytes.value();
1315 assert!(
1316 spilled_bytes_after > 0,
1317 "Spilled bytes should be > 0 after writer is dropped (got {spilled_bytes_after})"
1318 );
1319
1320 // Verify reader can still read all batches
1321 let mut count = 0;
1322 for i in 0..5 {
1323 let result = reader.next().await.unwrap()?;
1324 assert_eq!(result.num_rows(), 10);
1325
1326 let col = result
1327 .column(0)
1328 .as_any()
1329 .downcast_ref::<Int32Array>()
1330 .unwrap();
1331 assert_eq!(col.value(0), i * 10);
1332 count += 1;
1333 }
1334
1335 assert_eq!(count, 5, "Should read all batches after writer is dropped");
1336
1337 Ok(())
1338 }
1339
1340 #[tokio::test]
1341 async fn test_disk_usage_decreases_as_files_consumed() -> Result<()> {
1342 use datafusion_execution::runtime_env::RuntimeEnvBuilder;
1343
1344 // Test configuration
1345 const NUM_BATCHES: usize = 3;
1346 const ROWS_PER_BATCH: usize = 100;
1347
1348 // Step 1: Create a test batch and measure its size
1349 let batch = create_test_batch(0, ROWS_PER_BATCH);
1350 let batch_size = batch.get_array_memory_size();
1351
1352 // Step 2: Configure file rotation to approximately 1 batch per file
1353 // Create a custom RuntimeEnv so we can access the DiskManager
1354 let runtime = Arc::new(RuntimeEnvBuilder::default().build()?);
1355 let disk_manager = Arc::clone(&runtime.disk_manager);
1356
1357 let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
1358 let schema = create_test_schema();
1359 let spill_manager = Arc::new(SpillManager::new(runtime, metrics.clone(), schema));
1360
1361 let (writer, mut reader) = channel(batch_size, spill_manager);
1362
1363 // Step 3: Write NUM_BATCHES batches to create approximately NUM_BATCHES files
1364 for i in 0..NUM_BATCHES {
1365 let start = (i * ROWS_PER_BATCH) as i32;
1366 writer.push_batch(&create_test_batch(start, ROWS_PER_BATCH))?;
1367 }
1368
1369 // Check how many files were created (should be at least a few due to file rotation)
1370 let file_count = metrics.spill_file_count.value();
1371 assert_eq!(
1372 file_count,
1373 NUM_BATCHES - 1,
1374 "Expected at {} files with rotation, got {file_count}",
1375 NUM_BATCHES - 1
1376 );
1377
1378 // Step 4: Verify initial disk usage reflects all files
1379 let initial_disk_usage = disk_manager.used_disk_space();
1380 assert!(
1381 initial_disk_usage > 0,
1382 "Expected disk usage > 0 after writing batches, got {initial_disk_usage}"
1383 );
1384
1385 // Step 5: Read NUM_BATCHES - 1 batches (all but 1)
1386 // As each file is fully consumed, it should be dropped and disk usage should decrease
1387 for i in 0..(NUM_BATCHES - 1) {
1388 let result = reader.next().await.unwrap()?;
1389 assert_eq!(result.num_rows(), ROWS_PER_BATCH);
1390
1391 let col = result
1392 .column(0)
1393 .as_any()
1394 .downcast_ref::<Int32Array>()
1395 .unwrap();
1396 assert_eq!(col.value(0), (i * ROWS_PER_BATCH) as i32);
1397 }
1398
1399 // Step 6: Verify disk usage decreased but is not zero (at least 1 batch remains)
1400 let partial_disk_usage = disk_manager.used_disk_space();
1401 assert!(
1402 partial_disk_usage > 0
1403 && partial_disk_usage < (batch_size * NUM_BATCHES * 2) as u64,
1404 "Disk usage should be > 0 with remaining batches"
1405 );
1406 assert!(
1407 partial_disk_usage < initial_disk_usage,
1408 "Disk usage should have decreased after reading most batches: initial={initial_disk_usage}, partial={partial_disk_usage}"
1409 );
1410
1411 // Step 7: Read the final batch
1412 let result = reader.next().await.unwrap()?;
1413 assert_eq!(result.num_rows(), ROWS_PER_BATCH);
1414
1415 // Step 8: Drop writer first to signal no more data will be written
1416 // The reader has infinite stream semantics and will wait for the writer
1417 // to be dropped before returning None
1418 drop(writer);
1419
1420 // Verify we've read all batches - now the reader should return None
1421 assert!(
1422 reader.next().await.is_none(),
1423 "Should have no more batches to read"
1424 );
1425
1426 // Step 9: Drop reader to release all references
1427 drop(reader);
1428
1429 // Step 10: Verify complete cleanup - disk usage should be 0
1430 let final_disk_usage = disk_manager.used_disk_space();
1431 assert_eq!(
1432 final_disk_usage, 0,
1433 "Disk usage should be 0 after all files dropped, got {final_disk_usage}"
1434 );
1435
1436 Ok(())
1437 }
1438}