datafusion_physical_plan/spill/spill_manager.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Define the `SpillManager` struct, which is responsible for reading and writing `RecordBatch`es to raw files based on the provided configurations.

use arrow::array::StringViewArray;
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion_common::{Result, config::SpillCompression};
use datafusion_execution::SendableRecordBatchStream;
use datafusion_execution::disk_manager::RefCountedTempFile;
use datafusion_execution::runtime_env::RuntimeEnv;
use std::sync::Arc;

use super::{SpillReaderStream, in_progress_spill_file::InProgressSpillFile};
use crate::coop::cooperative;
use crate::{common::spawn_buffered, metrics::SpillMetrics};

/// The `SpillManager` is responsible for the following tasks:
/// - Reading and writing `RecordBatch`es to raw files based on the provided configurations.
/// - Updating the associated metrics.
///
/// Note: The caller (external operators such as `SortExec`) is responsible for interpreting the spilled files.
/// For example, only the caller knows that all records within the same spill file follow a specific sort order.
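/// # Example (sketch)
///
/// A minimal, illustrative construction sketch (not a compiled doctest). The
/// `env`, `metrics`, `schema`, and `compression` values are assumed to be
/// supplied by the surrounding operator:
///
/// ```ignore
/// let spill_manager = SpillManager::new(env, metrics, Arc::clone(&schema))
///     // optionally compress spill files (a `SpillCompression` chosen from config)
///     .with_compression_type(compression)
///     // optionally tune how many batches are buffered during read-back (default: 2)
///     .with_batch_read_buffer_capacity(2);
/// ```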
#[derive(Debug, Clone)]
pub struct SpillManager {
    env: Arc<RuntimeEnv>,
    pub(crate) metrics: SpillMetrics,
    schema: SchemaRef,
    /// Number of batches to buffer in memory during disk reads
    batch_read_buffer_capacity: usize,
    /// general-purpose compression options
    pub(crate) compression: SpillCompression,
}

impl SpillManager {
    pub fn new(env: Arc<RuntimeEnv>, metrics: SpillMetrics, schema: SchemaRef) -> Self {
        Self {
            env,
            metrics,
            schema,
            batch_read_buffer_capacity: 2,
            compression: SpillCompression::default(),
        }
    }

    pub fn with_batch_read_buffer_capacity(
        mut self,
        batch_read_buffer_capacity: usize,
    ) -> Self {
        self.batch_read_buffer_capacity = batch_read_buffer_capacity;
        self
    }

    pub fn with_compression_type(mut self, spill_compression: SpillCompression) -> Self {
        self.compression = spill_compression;
        self
    }

    /// Returns the schema for batches managed by this SpillManager
    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    /// Creates a temporary file for in-progress operations, returning an error
    /// if file creation fails. The file can be used to append batches
    /// incrementally and then finish the file when done.
    pub fn create_in_progress_file(
        &self,
        request_msg: &str,
    ) -> Result<InProgressSpillFile> {
        let temp_file = self.env.disk_manager.create_tmp_file(request_msg)?;
        Ok(InProgressSpillFile::new(Arc::new(self.clone()), temp_file))
    }

    /// Spills the input `batches` into a single file in an atomic operation. To
    /// incrementally write in-memory batches into the same spill file, use
    /// [`Self::create_in_progress_file`] instead.
    /// `None` is returned if no batches are spilled.
    ///
    /// # Errors
    /// - Returns an error if spilling would exceed the disk usage limit configured
    ///   by `max_temp_directory_size` in `DiskManager`
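    ///
    /// # Example (sketch)
    ///
    /// An illustrative sketch (not a compiled doctest); `spill_manager` and
    /// `batches` are assumed to be held by the calling operator:
    ///
    /// ```ignore
    /// match spill_manager.spill_record_batch_and_finish(&batches, "Sort")? {
    ///     Some(spill_file) => {
    ///         // all batches were written to a single temporary spill file
    ///     }
    ///     None => {
    ///         // nothing was spilled (e.g. `batches` was empty)
    ///     }
    /// }
    /// ```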
    pub fn spill_record_batch_and_finish(
        &self,
        batches: &[RecordBatch],
        request_msg: &str,
    ) -> Result<Option<RefCountedTempFile>> {
        let mut in_progress_file = self.create_in_progress_file(request_msg)?;

        for batch in batches {
            in_progress_file.append_batch(batch)?;
        }

        in_progress_file.finish()
    }

    /// Refer to the documentation for [`Self::spill_record_batch_and_finish`]. This method
    /// additionally splits the `RecordBatch` into smaller batches of at most `row_limit` rows
    /// before spilling, and returns the in-memory size of the largest written batch.
    ///
    /// # Errors
    /// - Returns an error if spilling would exceed the disk usage limit configured
    ///   by `max_temp_directory_size` in `DiskManager`
    pub(crate) fn spill_record_batch_by_size_and_return_max_batch_memory(
        &self,
        batch: &RecordBatch,
        request_description: &str,
        row_limit: usize,
    ) -> Result<Option<(RefCountedTempFile, usize)>> {
        let total_rows = batch.num_rows();
        let mut batches = Vec::new();
        let mut offset = 0;

        // It's ok to calculate all slices first, because slicing is zero-copy.
        while offset < total_rows {
            let length = std::cmp::min(total_rows - offset, row_limit);
            let sliced_batch = batch.slice(offset, length);
            batches.push(sliced_batch);
            offset += length;
        }

        let mut in_progress_file = self.create_in_progress_file(request_description)?;

        let mut max_record_batch_size = 0;

        for batch in batches {
            in_progress_file.append_batch(&batch)?;

            max_record_batch_size = max_record_batch_size.max(batch.get_sliced_size()?);
        }

        let file = in_progress_file.finish()?;

        Ok(file.map(|f| (f, max_record_batch_size)))
    }

    /// Spills a stream of `RecordBatch`es to disk, returning the spill file and the
    /// in-memory size of the largest batch.
    pub(crate) async fn spill_record_batch_stream_and_return_max_batch_memory(
        &self,
        stream: &mut SendableRecordBatchStream,
        request_description: &str,
    ) -> Result<Option<(RefCountedTempFile, usize)>> {
        use futures::StreamExt;

        let mut in_progress_file = self.create_in_progress_file(request_description)?;

        let mut max_record_batch_size = 0;

        while let Some(batch) = stream.next().await {
            let batch = batch?;
            in_progress_file.append_batch(&batch)?;

            max_record_batch_size = max_record_batch_size.max(batch.get_sliced_size()?);
        }

        let file = in_progress_file.finish()?;

        Ok(file.map(|f| (f, max_record_batch_size)))
    }

    /// Reads a spill file as a stream. The file must have been created by this `SpillManager`.
    /// Output is generated in FIFO order: the batch appended first
    /// will be read first.
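    ///
    /// # Example (sketch)
    ///
    /// An illustrative sketch (not a compiled doctest) of consuming a spill file
    /// written earlier; `spill_manager` and `spill_file` are assumed to come from
    /// the spill methods above, inside an async context:
    ///
    /// ```ignore
    /// use futures::StreamExt;
    ///
    /// let mut stream = spill_manager.read_spill_as_stream(spill_file, None)?;
    /// while let Some(batch) = stream.next().await {
    ///     let batch = batch?;
    ///     // batches arrive in the same order they were appended
    /// }
    /// ```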
    pub fn read_spill_as_stream(
        &self,
        spill_file_path: RefCountedTempFile,
        max_record_batch_memory: Option<usize>,
    ) -> Result<SendableRecordBatchStream> {
        let stream = Box::pin(cooperative(SpillReaderStream::new(
            Arc::clone(&self.schema),
            spill_file_path,
            max_record_batch_memory,
        )));

        Ok(spawn_buffered(stream, self.batch_read_buffer_capacity))
    }
}

pub(crate) trait GetSlicedSize {
    /// Returns the in-memory size of the `RecordBatch` when sliced.
    /// Note: if multiple arrays, or even a single array, share the same data buffers, each buffer may be double counted.
    /// Therefore, make sure to call `gc()` or `organize_stringview_arrays()` before using this method.
    fn get_sliced_size(&self) -> Result<usize>;
}

impl GetSlicedSize for RecordBatch {
    fn get_sliced_size(&self) -> Result<usize> {
        let mut total = 0;
        for array in self.columns() {
            let data = array.to_data();
            total += data.get_slice_memory_size()?;

            // While a StringViewArray holds large data buffers for its non-inlined strings,
            // the Arrow layout (BufferSpec) does not include those data buffers. As a result,
            // ArrayData::get_slice_memory_size() currently under-counts the memory size: it
            // accounts only for the views buffer, although the data buffers are cloned during slice().
            //
            // Therefore, we manually add the capacities of the data buffers on top of the sliced
            // size of the views buffer. This approximates the intended semantics of
            // "bytes needed if we materialized exactly this slice into fresh buffers".
            // This is a workaround until https://github.com/apache/arrow-rs/issues/8230
            if let Some(sv) = array.as_any().downcast_ref::<StringViewArray>() {
                for buffer in sv.data_buffers() {
                    total += buffer.capacity();
                }
            }
        }
        Ok(total)
    }
}

#[cfg(test)]
mod tests {
    use crate::spill::{get_record_batch_memory_size, spill_manager::GetSlicedSize};
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::{
        array::{ArrayRef, StringViewArray},
        record_batch::RecordBatch,
    };
    use datafusion_common::Result;
    use std::sync::Arc;

    #[test]
    fn check_sliced_size_for_string_view_array() -> Result<()> {
        let array_length = 50;
        let short_len = 8;
        let long_len = 25;

        // Build a StringViewArray that includes both inlined (short) and non-inlined (long) strings
        let strings: Vec<String> = (0..array_length)
            .map(|i| {
                if i % 2 == 0 {
                    "a".repeat(short_len)
                } else {
                    "b".repeat(long_len)
                }
            })
            .collect();

        let string_array = StringViewArray::from(strings);
        let array_ref: ArrayRef = Arc::new(string_array);
        let batch = RecordBatch::try_new(
            Arc::new(Schema::new(vec![Field::new(
                "strings",
                DataType::Utf8View,
                false,
            )])),
            vec![array_ref],
        )
        .unwrap();

        // We did not slice the batch, so these two memory sizes should be equal
        assert_eq!(
            batch.get_sliced_size().unwrap(),
            get_record_batch_memory_size(&batch)
        );

        // Slice the batch in half
        let half_batch = batch.slice(0, array_length / 2);
        // Now the sliced size is smaller because the views buffer is sliced
        assert!(
            half_batch.get_sliced_size().unwrap()
                < get_record_batch_memory_size(&half_batch)
        );
        let data = arrow::array::Array::to_data(&half_batch.column(0));
        let views_sliced_size = data.get_slice_memory_size()?;
        // The sliced size should still be larger than the sliced views buffer size alone
        assert!(views_sliced_size < half_batch.get_sliced_size().unwrap());

        Ok(())
    }
}