Skip to main content

datafusion_physical_plan/spill/
spill_manager.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Define the `SpillManager` struct, which is responsible for reading and writing `RecordBatch`es to raw files based on the provided configurations.
19
20use arrow::array::StringViewArray;
21use arrow::datatypes::SchemaRef;
22use arrow::record_batch::RecordBatch;
23use datafusion_common::{Result, config::SpillCompression};
24use datafusion_execution::SendableRecordBatchStream;
25use datafusion_execution::disk_manager::RefCountedTempFile;
26use datafusion_execution::runtime_env::RuntimeEnv;
27use std::sync::Arc;
28
29use super::{SpillReaderStream, in_progress_spill_file::InProgressSpillFile};
30use crate::coop::cooperative;
31use crate::{common::spawn_buffered, metrics::SpillMetrics};
32
/// The `SpillManager` is responsible for the following tasks:
/// - Reading and writing `RecordBatch`es to raw files based on the provided configurations.
/// - Updating the associated metrics.
///
/// Note: The caller (external operators such as `SortExec`) is responsible for interpreting the spilled files.
/// For example, all records within the same spill file are ordered according to a specific order.
///
/// The type is `Clone`; `create_in_progress_file` clones the manager into an
/// `Arc` that is handed to each in-progress spill file.
#[derive(Debug, Clone)]
pub struct SpillManager {
    /// Runtime environment; its `disk_manager` is used to create the temporary
    /// files that back spill files.
    env: Arc<RuntimeEnv>,
    /// Spill metrics associated with this manager.
    pub(crate) metrics: SpillMetrics,
    /// Schema of the batches handled by this manager; passed to the reader when
    /// a spill file is read back as a stream.
    schema: SchemaRef,
    /// Number of batches to buffer in memory during disk reads
    batch_read_buffer_capacity: usize,
    /// general-purpose compression options
    pub(crate) compression: SpillCompression,
}
49
50impl SpillManager {
51    pub fn new(env: Arc<RuntimeEnv>, metrics: SpillMetrics, schema: SchemaRef) -> Self {
52        Self {
53            env,
54            metrics,
55            schema,
56            batch_read_buffer_capacity: 2,
57            compression: SpillCompression::default(),
58        }
59    }
60
61    pub fn with_batch_read_buffer_capacity(
62        mut self,
63        batch_read_buffer_capacity: usize,
64    ) -> Self {
65        self.batch_read_buffer_capacity = batch_read_buffer_capacity;
66        self
67    }
68
69    pub fn with_compression_type(mut self, spill_compression: SpillCompression) -> Self {
70        self.compression = spill_compression;
71        self
72    }
73
    /// Returns the schema for batches managed by this SpillManager.
    ///
    /// This is the schema supplied to [`Self::new`]; the same schema is handed
    /// to the reader when spill files are read back as streams.
    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }
78
79    /// Creates a temporary file for in-progress operations, returning an error
80    /// message if file creation fails. The file can be used to append batches
81    /// incrementally and then finish the file when done.
82    pub fn create_in_progress_file(
83        &self,
84        request_msg: &str,
85    ) -> Result<InProgressSpillFile> {
86        let temp_file = self.env.disk_manager.create_tmp_file(request_msg)?;
87        Ok(InProgressSpillFile::new(Arc::new(self.clone()), temp_file))
88    }
89
90    /// Spill input `batches` into a single file in a atomic operation. If it is
91    /// intended to incrementally write in-memory batches into the same spill file,
92    /// use [`Self::create_in_progress_file`] instead.
93    /// None is returned if no batches are spilled.
94    ///
95    /// # Errors
96    /// - Returns an error if spilling would exceed the disk usage limit configured
97    ///   by `max_temp_directory_size` in `DiskManager`
98    pub fn spill_record_batch_and_finish(
99        &self,
100        batches: &[RecordBatch],
101        request_msg: &str,
102    ) -> Result<Option<RefCountedTempFile>> {
103        let mut in_progress_file = self.create_in_progress_file(request_msg)?;
104
105        for batch in batches {
106            in_progress_file.append_batch(batch)?;
107        }
108
109        in_progress_file.finish()
110    }
111
112    /// Refer to the documentation for [`Self::spill_record_batch_and_finish`]. This method
113    /// additionally spills the `RecordBatch` into smaller batches, divided by `row_limit`.
114    ///
115    /// # Errors
116    /// - Returns an error if spilling would exceed the disk usage limit configured
117    ///   by `max_temp_directory_size` in `DiskManager`
118    pub(crate) fn spill_record_batch_by_size_and_return_max_batch_memory(
119        &self,
120        batch: &RecordBatch,
121        request_description: &str,
122        row_limit: usize,
123    ) -> Result<Option<(RefCountedTempFile, usize)>> {
124        let total_rows = batch.num_rows();
125        let mut batches = Vec::new();
126        let mut offset = 0;
127
128        // It's ok to calculate all slices first, because slicing is zero-copy.
129        while offset < total_rows {
130            let length = std::cmp::min(total_rows - offset, row_limit);
131            let sliced_batch = batch.slice(offset, length);
132            batches.push(sliced_batch);
133            offset += length;
134        }
135
136        let mut in_progress_file = self.create_in_progress_file(request_description)?;
137
138        let mut max_record_batch_size = 0;
139
140        for batch in batches {
141            in_progress_file.append_batch(&batch)?;
142
143            max_record_batch_size = max_record_batch_size.max(batch.get_sliced_size()?);
144        }
145
146        let file = in_progress_file.finish()?;
147
148        Ok(file.map(|f| (f, max_record_batch_size)))
149    }
150
151    /// Spill a stream of `RecordBatch`es to disk and return the spill file and the size of the largest batch in memory
152    pub(crate) async fn spill_record_batch_stream_and_return_max_batch_memory(
153        &self,
154        stream: &mut SendableRecordBatchStream,
155        request_description: &str,
156    ) -> Result<Option<(RefCountedTempFile, usize)>> {
157        use futures::StreamExt;
158
159        let mut in_progress_file = self.create_in_progress_file(request_description)?;
160
161        let mut max_record_batch_size = 0;
162
163        while let Some(batch) = stream.next().await {
164            let batch = batch?;
165            in_progress_file.append_batch(&batch)?;
166
167            max_record_batch_size = max_record_batch_size.max(batch.get_sliced_size()?);
168        }
169
170        let file = in_progress_file.finish()?;
171
172        Ok(file.map(|f| (f, max_record_batch_size)))
173    }
174
175    /// Reads a spill file as a stream. The file must be created by the current `SpillManager`.
176    /// This method will generate output in FIFO order: the batch appended first
177    /// will be read first.
178    pub fn read_spill_as_stream(
179        &self,
180        spill_file_path: RefCountedTempFile,
181        max_record_batch_memory: Option<usize>,
182    ) -> Result<SendableRecordBatchStream> {
183        let stream = Box::pin(cooperative(SpillReaderStream::new(
184            Arc::clone(&self.schema),
185            spill_file_path,
186            max_record_batch_memory,
187        )));
188
189        Ok(spawn_buffered(stream, self.batch_read_buffer_capacity))
190    }
191
192    /// Same as `read_spill_as_stream`, but without buffering.
193    pub fn read_spill_as_stream_unbuffered(
194        &self,
195        spill_file_path: RefCountedTempFile,
196        max_record_batch_memory: Option<usize>,
197    ) -> Result<SendableRecordBatchStream> {
198        Ok(Box::pin(cooperative(SpillReaderStream::new(
199            Arc::clone(&self.schema),
200            spill_file_path,
201            max_record_batch_memory,
202        ))))
203    }
204}
205
/// Size estimation that accounts for slicing. Implemented in this file for
/// `RecordBatch`.
pub(crate) trait GetSlicedSize {
    /// Returns the size of the `RecordBatch` when sliced.
    /// Note: if multiple arrays or even a single array share the same data buffers, we may double count each buffer.
    /// Therefore, make sure we call gc() or organize_stringview_arrays() before using this method.
    ///
    /// # Errors
    /// Returns an error if the size of any part cannot be computed.
    fn get_sliced_size(&self) -> Result<usize>;
}
212
213impl GetSlicedSize for RecordBatch {
214    fn get_sliced_size(&self) -> Result<usize> {
215        let mut total = 0;
216        for array in self.columns() {
217            let data = array.to_data();
218            total += data.get_slice_memory_size()?;
219
220            // While StringViewArray holds large data buffer for non inlined string, the Arrow layout (BufferSpec)
221            // does not include any data buffers. Currently, ArrayData::get_slice_memory_size()
222            // under-counts memory size by accounting only views buffer although data buffer is cloned during slice()
223            //
224            // Therefore, we manually add the sum of the lengths used by all non inlined views
225            // on top of the sliced size for views buffer. This matches the intended semantics of
226            // "bytes needed if we materialized exactly this slice into fresh buffers".
227            // This is a workaround until https://github.com/apache/arrow-rs/issues/8230
228            if let Some(sv) = array.as_any().downcast_ref::<StringViewArray>() {
229                for buffer in sv.data_buffers() {
230                    total += buffer.capacity();
231                }
232            }
233        }
234        Ok(total)
235    }
236}
237
#[cfg(test)]
mod tests {
    use crate::spill::{get_record_batch_memory_size, spill_manager::GetSlicedSize};
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::{
        array::{ArrayRef, StringViewArray},
        record_batch::RecordBatch,
    };
    use datafusion_common::Result;
    use std::sync::Arc;

    /// Checks `get_sliced_size` on a `StringViewArray` column that mixes short
    /// (8 byte) and long (25 byte) strings, both unsliced and sliced in half.
    #[test]
    fn check_sliced_size_for_string_view_array() -> Result<()> {
        const ROW_COUNT: usize = 50;
        const SHORT_LEN: usize = 8;
        const LONG_LEN: usize = 25;

        // Build StringViewArray that includes both inline strings and non inlined strings:
        // even rows get the short string, odd rows get the long one.
        let mut strings: Vec<String> = Vec::with_capacity(ROW_COUNT);
        for row in 0..ROW_COUNT {
            let value = if row % 2 == 0 {
                "a".repeat(SHORT_LEN)
            } else {
                "b".repeat(LONG_LEN)
            };
            strings.push(value);
        }

        let schema = Arc::new(Schema::new(vec![Field::new(
            "strings",
            DataType::Utf8View,
            false,
        )]));
        let column: ArrayRef = Arc::new(StringViewArray::from(strings));
        let batch = RecordBatch::try_new(schema, vec![column])?;

        // We did not slice the batch, so these two memory size should be equal
        assert_eq!(
            batch.get_sliced_size()?,
            get_record_batch_memory_size(&batch)
        );

        // Slice the batch into half
        let half_batch = batch.slice(0, ROW_COUNT / 2);
        let half_sliced_size = half_batch.get_sliced_size()?;

        // Now sliced_size is smaller because the views buffer is sliced
        assert!(half_sliced_size < get_record_batch_memory_size(&half_batch));

        // The sliced size should be larger than sliced views buffer size
        let views_sliced_size = half_batch.column(0).to_data().get_slice_memory_size()?;
        assert!(views_sliced_size < half_sliced_size);

        Ok(())
    }
}