datafusion_physical_plan/spill/
spill_manager.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Define the `SpillManager` struct, which is responsible for reading and writing `RecordBatch`es to raw files based on the provided configurations.
19
20use arrow::array::StringViewArray;
21use arrow::datatypes::SchemaRef;
22use arrow::record_batch::RecordBatch;
23use datafusion_execution::runtime_env::RuntimeEnv;
24use std::sync::Arc;
25
26use datafusion_common::{config::SpillCompression, Result};
27use datafusion_execution::disk_manager::RefCountedTempFile;
28use datafusion_execution::SendableRecordBatchStream;
29
30use super::{in_progress_spill_file::InProgressSpillFile, SpillReaderStream};
31use crate::coop::cooperative;
32use crate::{common::spawn_buffered, metrics::SpillMetrics};
33
34/// The `SpillManager` is responsible for the following tasks:
35/// - Reading and writing `RecordBatch`es to raw files based on the provided configurations.
36/// - Updating the associated metrics.
37///
38/// Note: The caller (external operators such as `SortExec`) is responsible for interpreting the spilled files.
39/// For example, all records within the same spill file are ordered according to a specific order.
40#[derive(Debug, Clone)]
41pub struct SpillManager {
42    env: Arc<RuntimeEnv>,
43    pub(crate) metrics: SpillMetrics,
44    schema: SchemaRef,
45    /// Number of batches to buffer in memory during disk reads
46    batch_read_buffer_capacity: usize,
47    /// general-purpose compression options
48    pub(crate) compression: SpillCompression,
49}
50
51impl SpillManager {
52    pub fn new(env: Arc<RuntimeEnv>, metrics: SpillMetrics, schema: SchemaRef) -> Self {
53        Self {
54            env,
55            metrics,
56            schema,
57            batch_read_buffer_capacity: 2,
58            compression: SpillCompression::default(),
59        }
60    }
61
62    pub fn with_batch_read_buffer_capacity(
63        mut self,
64        batch_read_buffer_capacity: usize,
65    ) -> Self {
66        self.batch_read_buffer_capacity = batch_read_buffer_capacity;
67        self
68    }
69
70    pub fn with_compression_type(mut self, spill_compression: SpillCompression) -> Self {
71        self.compression = spill_compression;
72        self
73    }
74
75    /// Returns the schema for batches managed by this SpillManager
76    pub fn schema(&self) -> &SchemaRef {
77        &self.schema
78    }
79
80    /// Creates a temporary file for in-progress operations, returning an error
81    /// message if file creation fails. The file can be used to append batches
82    /// incrementally and then finish the file when done.
83    pub fn create_in_progress_file(
84        &self,
85        request_msg: &str,
86    ) -> Result<InProgressSpillFile> {
87        let temp_file = self.env.disk_manager.create_tmp_file(request_msg)?;
88        Ok(InProgressSpillFile::new(Arc::new(self.clone()), temp_file))
89    }
90
91    /// Spill input `batches` into a single file in a atomic operation. If it is
92    /// intended to incrementally write in-memory batches into the same spill file,
93    /// use [`Self::create_in_progress_file`] instead.
94    /// None is returned if no batches are spilled.
95    ///
96    /// # Errors
97    /// - Returns an error if spilling would exceed the disk usage limit configured
98    ///   by `max_temp_directory_size` in `DiskManager`
99    pub fn spill_record_batch_and_finish(
100        &self,
101        batches: &[RecordBatch],
102        request_msg: &str,
103    ) -> Result<Option<RefCountedTempFile>> {
104        let mut in_progress_file = self.create_in_progress_file(request_msg)?;
105
106        for batch in batches {
107            in_progress_file.append_batch(batch)?;
108        }
109
110        in_progress_file.finish()
111    }
112
113    /// Refer to the documentation for [`Self::spill_record_batch_and_finish`]. This method
114    /// additionally spills the `RecordBatch` into smaller batches, divided by `row_limit`.
115    ///
116    /// # Errors
117    /// - Returns an error if spilling would exceed the disk usage limit configured
118    ///   by `max_temp_directory_size` in `DiskManager`
119    pub(crate) fn spill_record_batch_by_size_and_return_max_batch_memory(
120        &self,
121        batch: &RecordBatch,
122        request_description: &str,
123        row_limit: usize,
124    ) -> Result<Option<(RefCountedTempFile, usize)>> {
125        let total_rows = batch.num_rows();
126        let mut batches = Vec::new();
127        let mut offset = 0;
128
129        // It's ok to calculate all slices first, because slicing is zero-copy.
130        while offset < total_rows {
131            let length = std::cmp::min(total_rows - offset, row_limit);
132            let sliced_batch = batch.slice(offset, length);
133            batches.push(sliced_batch);
134            offset += length;
135        }
136
137        let mut in_progress_file = self.create_in_progress_file(request_description)?;
138
139        let mut max_record_batch_size = 0;
140
141        for batch in batches {
142            in_progress_file.append_batch(&batch)?;
143
144            max_record_batch_size = max_record_batch_size.max(batch.get_sliced_size()?);
145        }
146
147        let file = in_progress_file.finish()?;
148
149        Ok(file.map(|f| (f, max_record_batch_size)))
150    }
151
152    /// Spill a stream of `RecordBatch`es to disk and return the spill file and the size of the largest batch in memory
153    pub(crate) async fn spill_record_batch_stream_and_return_max_batch_memory(
154        &self,
155        stream: &mut SendableRecordBatchStream,
156        request_description: &str,
157    ) -> Result<Option<(RefCountedTempFile, usize)>> {
158        use futures::StreamExt;
159
160        let mut in_progress_file = self.create_in_progress_file(request_description)?;
161
162        let mut max_record_batch_size = 0;
163
164        while let Some(batch) = stream.next().await {
165            let batch = batch?;
166            in_progress_file.append_batch(&batch)?;
167
168            max_record_batch_size = max_record_batch_size.max(batch.get_sliced_size()?);
169        }
170
171        let file = in_progress_file.finish()?;
172
173        Ok(file.map(|f| (f, max_record_batch_size)))
174    }
175
176    /// Reads a spill file as a stream. The file must be created by the current `SpillManager`.
177    /// This method will generate output in FIFO order: the batch appended first
178    /// will be read first.
179    pub fn read_spill_as_stream(
180        &self,
181        spill_file_path: RefCountedTempFile,
182        max_record_batch_memory: Option<usize>,
183    ) -> Result<SendableRecordBatchStream> {
184        let stream = Box::pin(cooperative(SpillReaderStream::new(
185            Arc::clone(&self.schema),
186            spill_file_path,
187            max_record_batch_memory,
188        )));
189
190        Ok(spawn_buffered(stream, self.batch_read_buffer_capacity))
191    }
192}
193
194pub(crate) trait GetSlicedSize {
195    /// Returns the size of the `RecordBatch` when sliced.
196    /// Note: if multiple arrays or even a single array share the same data buffers, we may double count each buffer.
197    /// Therefore, make sure we call gc() or organize_stringview_arrays() before using this method.
198    fn get_sliced_size(&self) -> Result<usize>;
199}
200
201impl GetSlicedSize for RecordBatch {
202    fn get_sliced_size(&self) -> Result<usize> {
203        let mut total = 0;
204        for array in self.columns() {
205            let data = array.to_data();
206            total += data.get_slice_memory_size()?;
207
208            // While StringViewArray holds large data buffer for non inlined string, the Arrow layout (BufferSpec)
209            // does not include any data buffers. Currently, ArrayData::get_slice_memory_size()
210            // under-counts memory size by accounting only views buffer although data buffer is cloned during slice()
211            //
212            // Therefore, we manually add the sum of the lengths used by all non inlined views
213            // on top of the sliced size for views buffer. This matches the intended semantics of
214            // "bytes needed if we materialized exactly this slice into fresh buffers".
215            // This is a workaround until https://github.com/apache/arrow-rs/issues/8230
216            if let Some(sv) = array.as_any().downcast_ref::<StringViewArray>() {
217                for buffer in sv.data_buffers() {
218                    total += buffer.capacity();
219                }
220            }
221        }
222        Ok(total)
223    }
224}
225
#[cfg(test)]
mod tests {
    use crate::spill::{get_record_batch_memory_size, spill_manager::GetSlicedSize};
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::{
        array::{ArrayRef, StringViewArray},
        record_batch::RecordBatch,
    };
    use datafusion_common::Result;
    use std::sync::Arc;

    /// Compares `get_sliced_size()` with `get_record_batch_memory_size()` for a
    /// `StringViewArray` column, both before and after slicing the batch.
    #[test]
    fn check_sliced_size_for_string_view_array() -> Result<()> {
        const ROW_COUNT: usize = 50;
        const SHORT_LEN: usize = 8;
        const LONG_LEN: usize = 25;

        // Alternate short and long values so that the StringViewArray holds both
        // inline strings and non inlined strings
        let values: Vec<String> = (0..ROW_COUNT)
            .map(|row| match row % 2 {
                0 => "a".repeat(SHORT_LEN),
                _ => "b".repeat(LONG_LEN),
            })
            .collect();

        let column: ArrayRef = Arc::new(StringViewArray::from(values));
        let field = Field::new("strings", DataType::Utf8View, false);
        let schema = Arc::new(Schema::new(vec![field]));
        let batch = RecordBatch::try_new(schema, vec![column]).unwrap();

        // The batch has not been sliced yet, so both measures must agree
        let full_sliced_size = batch.get_sliced_size().unwrap();
        assert_eq!(full_sliced_size, get_record_batch_memory_size(&batch));

        // Keep only the first half of the rows
        let half_batch = batch.slice(0, ROW_COUNT / 2);
        let half_sliced_size = half_batch.get_sliced_size().unwrap();

        // The sliced size shrinks because the views buffer is sliced
        assert!(half_sliced_size < get_record_batch_memory_size(&half_batch));

        // It must still exceed the size of the sliced views buffer alone
        let half_data = arrow::array::Array::to_data(&half_batch.column(0));
        let views_only_size = half_data.get_slice_memory_size()?;
        assert!(views_only_size < half_sliced_size);

        Ok(())
    }
}