// datafusion_physical_plan/spill/spill_manager.rs
use super::{SpillReaderStream, in_progress_spill_file::InProgressSpillFile};
use crate::coop::cooperative;
use crate::{common::spawn_buffered, metrics::SpillMetrics};
use arrow::array::StringViewArray;
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion_common::utils::memory::get_record_batch_memory_size;
use datafusion_common::{DataFusionError, Result, config::SpillCompression};
use datafusion_execution::SendableRecordBatchStream;
use datafusion_execution::disk_manager::RefCountedTempFile;
use datafusion_execution::runtime_env::RuntimeEnv;
use std::borrow::Borrow;
use std::sync::Arc;
33
34#[derive(Debug, Clone)]
41pub struct SpillManager {
42 env: Arc<RuntimeEnv>,
43 pub(crate) metrics: SpillMetrics,
44 schema: SchemaRef,
45 batch_read_buffer_capacity: usize,
47 pub(crate) compression: SpillCompression,
49}
50
51impl SpillManager {
52 pub fn new(env: Arc<RuntimeEnv>, metrics: SpillMetrics, schema: SchemaRef) -> Self {
53 Self {
54 env,
55 metrics,
56 schema,
57 batch_read_buffer_capacity: 2,
58 compression: SpillCompression::default(),
59 }
60 }
61
62 pub fn with_batch_read_buffer_capacity(
63 mut self,
64 batch_read_buffer_capacity: usize,
65 ) -> Self {
66 self.batch_read_buffer_capacity = batch_read_buffer_capacity;
67 self
68 }
69
70 pub fn with_compression_type(mut self, spill_compression: SpillCompression) -> Self {
71 self.compression = spill_compression;
72 self
73 }
74
75 pub fn schema(&self) -> &SchemaRef {
77 &self.schema
78 }
79
80 pub fn create_in_progress_file(
84 &self,
85 request_msg: &str,
86 ) -> Result<InProgressSpillFile> {
87 let temp_file = self.env.disk_manager.create_tmp_file(request_msg)?;
88 Ok(InProgressSpillFile::new(Arc::new(self.clone()), temp_file))
89 }
90
91 pub fn spill_record_batch_and_finish(
100 &self,
101 batches: &[RecordBatch],
102 request_msg: &str,
103 ) -> Result<Option<RefCountedTempFile>> {
104 let mut in_progress_file = self.create_in_progress_file(request_msg)?;
105
106 for batch in batches {
107 in_progress_file.append_batch(batch)?;
108 }
109
110 in_progress_file.finish()
111 }
112
113 pub(crate) fn spill_record_batch_iter_and_return_max_batch_memory(
116 &self,
117 mut iter: impl Iterator<Item = Result<impl Borrow<RecordBatch>>>,
118 request_description: &str,
119 ) -> Result<Option<(RefCountedTempFile, usize)>> {
120 let mut in_progress_file = self.create_in_progress_file(request_description)?;
121
122 let mut max_record_batch_size = 0;
123
124 iter.try_for_each(|batch| {
125 let batch = batch?;
126 let borrowed = batch.borrow();
127 if borrowed.num_rows() == 0 {
128 return Ok(());
129 }
130 in_progress_file.append_batch(borrowed)?;
131
132 max_record_batch_size =
133 max_record_batch_size.max(get_record_batch_memory_size(borrowed));
134 Result::<_, DataFusionError>::Ok(())
135 })?;
136
137 let file = in_progress_file.finish()?;
138
139 Ok(file.map(|f| (f, max_record_batch_size)))
140 }
141
142 pub(crate) async fn spill_record_batch_stream_and_return_max_batch_memory(
144 &self,
145 stream: &mut SendableRecordBatchStream,
146 request_description: &str,
147 ) -> Result<Option<(RefCountedTempFile, usize)>> {
148 use futures::StreamExt;
149
150 let mut in_progress_file = self.create_in_progress_file(request_description)?;
151
152 let mut max_record_batch_size = 0;
153
154 while let Some(batch) = stream.next().await {
155 let batch = batch?;
156 in_progress_file.append_batch(&batch)?;
157
158 max_record_batch_size = max_record_batch_size.max(batch.get_sliced_size()?);
159 }
160
161 let file = in_progress_file.finish()?;
162
163 Ok(file.map(|f| (f, max_record_batch_size)))
164 }
165
166 pub fn read_spill_as_stream(
170 &self,
171 spill_file_path: RefCountedTempFile,
172 max_record_batch_memory: Option<usize>,
173 ) -> Result<SendableRecordBatchStream> {
174 let stream = Box::pin(cooperative(SpillReaderStream::new(
175 Arc::clone(&self.schema),
176 spill_file_path,
177 max_record_batch_memory,
178 )));
179
180 Ok(spawn_buffered(stream, self.batch_read_buffer_capacity))
181 }
182
183 pub fn read_spill_as_stream_unbuffered(
185 &self,
186 spill_file_path: RefCountedTempFile,
187 max_record_batch_memory: Option<usize>,
188 ) -> Result<SendableRecordBatchStream> {
189 Ok(Box::pin(cooperative(SpillReaderStream::new(
190 Arc::clone(&self.schema),
191 spill_file_path,
192 max_record_batch_memory,
193 ))))
194 }
195}
196
197pub(crate) trait GetSlicedSize {
198 fn get_sliced_size(&self) -> Result<usize>;
202}
203
204impl GetSlicedSize for RecordBatch {
205 fn get_sliced_size(&self) -> Result<usize> {
206 let mut total = 0;
207 for array in self.columns() {
208 let data = array.to_data();
209 total += data.get_slice_memory_size()?;
210
211 if let Some(sv) = array.as_any().downcast_ref::<StringViewArray>() {
220 for buffer in sv.data_buffers() {
221 total += buffer.capacity();
222 }
223 }
224 }
225 Ok(total)
226 }
227}
228
#[cfg(test)]
mod tests {
    use crate::spill::{get_record_batch_memory_size, spill_manager::GetSlicedSize};
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::{
        array::{ArrayRef, StringViewArray},
        record_batch::RecordBatch,
    };
    use datafusion_common::Result;
    use std::sync::Arc;

    /// Verifies `GetSlicedSize` for `StringViewArray` columns: equal to the
    /// full memory size when unsliced, smaller once sliced, but still larger
    /// than the views buffer alone (data buffers must be counted).
    #[test]
    fn check_sliced_size_for_string_view_array() -> Result<()> {
        const ARRAY_LENGTH: usize = 50;
        const SHORT_LEN: usize = 8;
        const LONG_LEN: usize = 25;

        // Alternate short and long strings.
        let values: Vec<String> = (0..ARRAY_LENGTH)
            .map(|i| {
                if i % 2 == 0 {
                    "a".repeat(SHORT_LEN)
                } else {
                    "b".repeat(LONG_LEN)
                }
            })
            .collect();

        let column: ArrayRef = Arc::new(StringViewArray::from(values));
        let schema = Arc::new(Schema::new(vec![Field::new(
            "strings",
            DataType::Utf8View,
            false,
        )]));
        let batch = RecordBatch::try_new(schema, vec![column]).unwrap();

        // Unsliced: sliced size and full memory size agree.
        assert_eq!(
            batch.get_sliced_size().unwrap(),
            get_record_batch_memory_size(&batch)
        );

        // Sliced: the sliced size must be strictly smaller than the full size.
        let half_batch = batch.slice(0, ARRAY_LENGTH / 2);
        assert!(
            half_batch.get_sliced_size().unwrap()
                < get_record_batch_memory_size(&half_batch)
        );

        // But strictly larger than the views buffer alone, since the string
        // data buffers are also accounted for.
        let data = arrow::array::Array::to_data(&half_batch.column(0));
        let views_sliced_size = data.get_slice_memory_size()?;
        assert!(views_sliced_size < half_batch.get_sliced_size().unwrap());

        Ok(())
    }
}