// datafusion_physical_plan/spill/spill_manager.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Define the `SpillManager` struct, which is responsible for reading and writing `RecordBatch`es to raw files based on the provided configurations.

20use super::{SpillReaderStream, in_progress_spill_file::InProgressSpillFile};
21use crate::coop::cooperative;
22use crate::{common::spawn_buffered, metrics::SpillMetrics};
23use arrow::array::StringViewArray;
24use arrow::datatypes::SchemaRef;
25use arrow::record_batch::RecordBatch;
26use datafusion_common::utils::memory::get_record_batch_memory_size;
27use datafusion_common::{DataFusionError, Result, config::SpillCompression};
28use datafusion_execution::SendableRecordBatchStream;
29use datafusion_execution::disk_manager::RefCountedTempFile;
30use datafusion_execution::runtime_env::RuntimeEnv;
31use std::borrow::Borrow;
32use std::sync::Arc;
33
/// The `SpillManager` is responsible for the following tasks:
/// - Reading and writing `RecordBatch`es to raw files based on the provided configurations.
/// - Updating the associated metrics.
///
/// Note: The caller (external operators such as `SortExec`) is responsible for interpreting the spilled files.
/// For example, all records within the same spill file are ordered according to a specific order.
#[derive(Debug, Clone)]
pub struct SpillManager {
    /// Runtime environment providing the `DiskManager` used to create temp spill files
    env: Arc<RuntimeEnv>,
    /// Spill-related metrics updated as batches are written to / read from disk
    pub(crate) metrics: SpillMetrics,
    /// Schema of every `RecordBatch` written to and read back from spill files
    schema: SchemaRef,
    /// Number of batches to buffer in memory during disk reads
    batch_read_buffer_capacity: usize,
    /// general-purpose compression options
    pub(crate) compression: SpillCompression,
}
50
51impl SpillManager {
52    pub fn new(env: Arc<RuntimeEnv>, metrics: SpillMetrics, schema: SchemaRef) -> Self {
53        Self {
54            env,
55            metrics,
56            schema,
57            batch_read_buffer_capacity: 2,
58            compression: SpillCompression::default(),
59        }
60    }
61
62    pub fn with_batch_read_buffer_capacity(
63        mut self,
64        batch_read_buffer_capacity: usize,
65    ) -> Self {
66        self.batch_read_buffer_capacity = batch_read_buffer_capacity;
67        self
68    }
69
70    pub fn with_compression_type(mut self, spill_compression: SpillCompression) -> Self {
71        self.compression = spill_compression;
72        self
73    }
74
75    /// Returns the schema for batches managed by this SpillManager
76    pub fn schema(&self) -> &SchemaRef {
77        &self.schema
78    }
79
80    /// Creates a temporary file for in-progress operations, returning an error
81    /// message if file creation fails. The file can be used to append batches
82    /// incrementally and then finish the file when done.
83    pub fn create_in_progress_file(
84        &self,
85        request_msg: &str,
86    ) -> Result<InProgressSpillFile> {
87        let temp_file = self.env.disk_manager.create_tmp_file(request_msg)?;
88        Ok(InProgressSpillFile::new(Arc::new(self.clone()), temp_file))
89    }
90
91    /// Spill input `batches` into a single file in a atomic operation. If it is
92    /// intended to incrementally write in-memory batches into the same spill file,
93    /// use [`Self::create_in_progress_file`] instead.
94    /// None is returned if no batches are spilled.
95    ///
96    /// # Errors
97    /// - Returns an error if spilling would exceed the disk usage limit configured
98    ///   by `max_temp_directory_size` in `DiskManager`
99    pub fn spill_record_batch_and_finish(
100        &self,
101        batches: &[RecordBatch],
102        request_msg: &str,
103    ) -> Result<Option<RefCountedTempFile>> {
104        let mut in_progress_file = self.create_in_progress_file(request_msg)?;
105
106        for batch in batches {
107            in_progress_file.append_batch(batch)?;
108        }
109
110        in_progress_file.finish()
111    }
112
113    /// Spill an iterator of `RecordBatch`es to disk and return the spill file and the size of the largest batch in memory
114    /// Note that this expects the caller to provide *non-sliced* batches, so the memory calculation of each batch is accurate.
115    pub(crate) fn spill_record_batch_iter_and_return_max_batch_memory(
116        &self,
117        mut iter: impl Iterator<Item = Result<impl Borrow<RecordBatch>>>,
118        request_description: &str,
119    ) -> Result<Option<(RefCountedTempFile, usize)>> {
120        let mut in_progress_file = self.create_in_progress_file(request_description)?;
121
122        let mut max_record_batch_size = 0;
123
124        iter.try_for_each(|batch| {
125            let batch = batch?;
126            let borrowed = batch.borrow();
127            if borrowed.num_rows() == 0 {
128                return Ok(());
129            }
130            in_progress_file.append_batch(borrowed)?;
131
132            max_record_batch_size =
133                max_record_batch_size.max(get_record_batch_memory_size(borrowed));
134            Result::<_, DataFusionError>::Ok(())
135        })?;
136
137        let file = in_progress_file.finish()?;
138
139        Ok(file.map(|f| (f, max_record_batch_size)))
140    }
141
142    /// Spill a stream of `RecordBatch`es to disk and return the spill file and the size of the largest batch in memory
143    pub(crate) async fn spill_record_batch_stream_and_return_max_batch_memory(
144        &self,
145        stream: &mut SendableRecordBatchStream,
146        request_description: &str,
147    ) -> Result<Option<(RefCountedTempFile, usize)>> {
148        use futures::StreamExt;
149
150        let mut in_progress_file = self.create_in_progress_file(request_description)?;
151
152        let mut max_record_batch_size = 0;
153
154        while let Some(batch) = stream.next().await {
155            let batch = batch?;
156            in_progress_file.append_batch(&batch)?;
157
158            max_record_batch_size = max_record_batch_size.max(batch.get_sliced_size()?);
159        }
160
161        let file = in_progress_file.finish()?;
162
163        Ok(file.map(|f| (f, max_record_batch_size)))
164    }
165
166    /// Reads a spill file as a stream. The file must be created by the current `SpillManager`.
167    /// This method will generate output in FIFO order: the batch appended first
168    /// will be read first.
169    pub fn read_spill_as_stream(
170        &self,
171        spill_file_path: RefCountedTempFile,
172        max_record_batch_memory: Option<usize>,
173    ) -> Result<SendableRecordBatchStream> {
174        let stream = Box::pin(cooperative(SpillReaderStream::new(
175            Arc::clone(&self.schema),
176            spill_file_path,
177            max_record_batch_memory,
178        )));
179
180        Ok(spawn_buffered(stream, self.batch_read_buffer_capacity))
181    }
182
183    /// Same as `read_spill_as_stream`, but without buffering.
184    pub fn read_spill_as_stream_unbuffered(
185        &self,
186        spill_file_path: RefCountedTempFile,
187        max_record_batch_memory: Option<usize>,
188    ) -> Result<SendableRecordBatchStream> {
189        Ok(Box::pin(cooperative(SpillReaderStream::new(
190            Arc::clone(&self.schema),
191            spill_file_path,
192            max_record_batch_memory,
193        ))))
194    }
195}
196
/// Helper trait to estimate the bytes a `RecordBatch` would need if exactly
/// its (possibly sliced) contents were materialized into fresh buffers.
pub(crate) trait GetSlicedSize {
    /// Returns the size of the `RecordBatch` when sliced.
    /// Note: if multiple arrays or even a single array share the same data buffers, we may double count each buffer.
    /// Therefore, make sure we call gc() or organize_stringview_arrays() before using this method.
    fn get_sliced_size(&self) -> Result<usize>;
}
203
204impl GetSlicedSize for RecordBatch {
205    fn get_sliced_size(&self) -> Result<usize> {
206        let mut total = 0;
207        for array in self.columns() {
208            let data = array.to_data();
209            total += data.get_slice_memory_size()?;
210
211            // While StringViewArray holds large data buffer for non inlined string, the Arrow layout (BufferSpec)
212            // does not include any data buffers. Currently, ArrayData::get_slice_memory_size()
213            // under-counts memory size by accounting only views buffer although data buffer is cloned during slice()
214            //
215            // Therefore, we manually add the sum of the lengths used by all non inlined views
216            // on top of the sliced size for views buffer. This matches the intended semantics of
217            // "bytes needed if we materialized exactly this slice into fresh buffers".
218            // This is a workaround until https://github.com/apache/arrow-rs/issues/8230
219            if let Some(sv) = array.as_any().downcast_ref::<StringViewArray>() {
220                for buffer in sv.data_buffers() {
221                    total += buffer.capacity();
222                }
223            }
224        }
225        Ok(total)
226    }
227}
228
#[cfg(test)]
mod tests {
    use crate::spill::{get_record_batch_memory_size, spill_manager::GetSlicedSize};
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::{
        array::{ArrayRef, StringViewArray},
        record_batch::RecordBatch,
    };
    use datafusion_common::Result;
    use std::sync::Arc;

    #[test]
    fn check_sliced_size_for_string_view_array() -> Result<()> {
        let total_rows = 50;
        let short_len = 8;
        let long_len = 25;

        // Alternate short (inline) and long (non-inlined) strings so the
        // StringViewArray carries both view-only values and values backed by
        // separate data buffers.
        let values: Vec<String> = (0..total_rows)
            .map(|i| {
                if i % 2 == 0 {
                    "a".repeat(short_len)
                } else {
                    "b".repeat(long_len)
                }
            })
            .collect();

        let column: ArrayRef = Arc::new(StringViewArray::from(values));
        let schema = Arc::new(Schema::new(vec![Field::new(
            "strings",
            DataType::Utf8View,
            false,
        )]));
        let batch = RecordBatch::try_new(schema, vec![column]).unwrap();

        // We did not slice the batch, so these two memory size should be equal
        assert_eq!(
            batch.get_sliced_size().unwrap(),
            get_record_batch_memory_size(&batch)
        );

        // Slice the batch into half: sliced_size becomes smaller because the
        // views buffer is sliced, while the full memory size still counts the
        // whole buffers.
        let half_batch = batch.slice(0, total_rows / 2);
        assert!(
            half_batch.get_sliced_size().unwrap()
                < get_record_batch_memory_size(&half_batch)
        );

        // The sliced size should still be larger than the sliced views buffer
        // alone, since non-inlined string payloads are added on top.
        let data = arrow::array::Array::to_data(&half_batch.column(0));
        let views_sliced_size = data.get_slice_memory_size()?;
        assert!(views_sliced_size < half_batch.get_sliced_size().unwrap());

        Ok(())
    }
}
289}