datafusion_physical_plan/spill/spill_manager.rs

use arrow::array::StringViewArray;
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion_common::{Result, config::SpillCompression};
use datafusion_execution::SendableRecordBatchStream;
use datafusion_execution::disk_manager::RefCountedTempFile;
use datafusion_execution::runtime_env::RuntimeEnv;
use std::sync::Arc;

use super::{SpillReaderStream, in_progress_spill_file::InProgressSpillFile};
use crate::coop::cooperative;
use crate::{common::spawn_buffered, metrics::SpillMetrics};

/// Manages spilling `RecordBatch`es to temporary files on disk and reading
/// them back as record batch streams, tracking activity in `SpillMetrics`.
#[derive(Debug, Clone)]
pub struct SpillManager {
    env: Arc<RuntimeEnv>,
    pub(crate) metrics: SpillMetrics,
    schema: SchemaRef,
    /// Number of batches to buffer in memory during disk reads
    batch_read_buffer_capacity: usize,
    /// Compression codec applied when writing spilled batches to disk
    pub(crate) compression: SpillCompression,
}

impl SpillManager {
    pub fn new(env: Arc<RuntimeEnv>, metrics: SpillMetrics, schema: SchemaRef) -> Self {
        Self {
            env,
            metrics,
            schema,
            batch_read_buffer_capacity: 2,
            compression: SpillCompression::default(),
        }
    }
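
    // A minimal usage sketch (hypothetical `env`, `metrics`, and `schema`
    // values taken from the caller's execution context; the builder calls
    // below are optional):
    //
    //     let manager = SpillManager::new(env, metrics, schema)
    //         .with_batch_read_buffer_capacity(4)
    //         .with_compression_type(SpillCompression::default());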

    pub fn with_batch_read_buffer_capacity(
        mut self,
        batch_read_buffer_capacity: usize,
    ) -> Self {
        self.batch_read_buffer_capacity = batch_read_buffer_capacity;
        self
    }

    pub fn with_compression_type(mut self, spill_compression: SpillCompression) -> Self {
        self.compression = spill_compression;
        self
    }

    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    /// Creates a temporary spill file via the `DiskManager`, wrapped in an
    /// [`InProgressSpillFile`] that batches can be appended to incrementally.
    pub fn create_in_progress_file(
        &self,
        request_msg: &str,
    ) -> Result<InProgressSpillFile> {
        let temp_file = self.env.disk_manager.create_tmp_file(request_msg)?;
        Ok(InProgressSpillFile::new(Arc::new(self.clone()), temp_file))
    }

    /// Spills `batches` into a single temporary file in one call. To write
    /// batches incrementally instead, use [`Self::create_in_progress_file`].
    pub fn spill_record_batch_and_finish(
        &self,
        batches: &[RecordBatch],
        request_msg: &str,
    ) -> Result<Option<RefCountedTempFile>> {
        let mut in_progress_file = self.create_in_progress_file(request_msg)?;

        for batch in batches {
            in_progress_file.append_batch(batch)?;
        }

        in_progress_file.finish()
    }
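
    // Sketch of a spill round-trip (hypothetical `manager` and `batch`
    // values):
    //
    //     if let Some(file) =
    //         manager.spill_record_batch_and_finish(&[batch], "sort spill")?
    //     {
    //         let stream = manager.read_spill_as_stream(file, None)?;
    //         // ... drain `stream` to read the batches back
    //     }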

    /// Slices `batch` into sub-batches of at most `row_limit` rows, spills
    /// them to a single file, and returns the file together with the memory
    /// size of the largest sliced batch.
    pub(crate) fn spill_record_batch_by_size_and_return_max_batch_memory(
        &self,
        batch: &RecordBatch,
        request_description: &str,
        row_limit: usize,
    ) -> Result<Option<(RefCountedTempFile, usize)>> {
        let total_rows = batch.num_rows();
        let mut batches = Vec::new();
        let mut offset = 0;

        // Slice the batch into smaller batches of at most `row_limit` rows
        while offset < total_rows {
            let length = std::cmp::min(total_rows - offset, row_limit);
            let sliced_batch = batch.slice(offset, length);
            batches.push(sliced_batch);
            offset += length;
        }

        let mut in_progress_file = self.create_in_progress_file(request_description)?;

        let mut max_record_batch_size = 0;

        for batch in batches {
            in_progress_file.append_batch(&batch)?;

            max_record_batch_size = max_record_batch_size.max(batch.get_sliced_size()?);
        }

        let file = in_progress_file.finish()?;

        Ok(file.map(|f| (f, max_record_batch_size)))
    }

    /// Drains `stream`, spilling every batch to a single file, and returns
    /// the file together with the memory size of the largest batch.
    pub(crate) async fn spill_record_batch_stream_and_return_max_batch_memory(
        &self,
        stream: &mut SendableRecordBatchStream,
        request_description: &str,
    ) -> Result<Option<(RefCountedTempFile, usize)>> {
        use futures::StreamExt;

        let mut in_progress_file = self.create_in_progress_file(request_description)?;

        let mut max_record_batch_size = 0;

        while let Some(batch) = stream.next().await {
            let batch = batch?;
            in_progress_file.append_batch(&batch)?;

            max_record_batch_size = max_record_batch_size.max(batch.get_sliced_size()?);
        }

        let file = in_progress_file.finish()?;

        Ok(file.map(|f| (f, max_record_batch_size)))
    }
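
    // Sketch (hypothetical async context with an `input` stream): spill an
    // entire input stream, then reopen it with read buffers sized by the
    // largest spilled batch:
    //
    //     if let Some((file, max_mem)) = manager
    //         .spill_record_batch_stream_and_return_max_batch_memory(&mut input, "agg spill")
    //         .await?
    //     {
    //         let stream = manager.read_spill_as_stream(file, Some(max_mem))?;
    //     }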

    /// Reads a spill file back as a `SendableRecordBatchStream`. Reading is
    /// offloaded to a separate task, buffering up to
    /// `batch_read_buffer_capacity` batches ahead of the consumer.
    pub fn read_spill_as_stream(
        &self,
        spill_file_path: RefCountedTempFile,
        max_record_batch_memory: Option<usize>,
    ) -> Result<SendableRecordBatchStream> {
        let stream = Box::pin(cooperative(SpillReaderStream::new(
            Arc::clone(&self.schema),
            spill_file_path,
            max_record_batch_memory,
        )));

        Ok(spawn_buffered(stream, self.batch_read_buffer_capacity))
    }
}

pub(crate) trait GetSlicedSize {
    /// Returns the memory size of `self` when sliced, counting only the
    /// sliced portion of each buffer where possible
    fn get_sliced_size(&self) -> Result<usize>;
}

impl GetSlicedSize for RecordBatch {
    fn get_sliced_size(&self) -> Result<usize> {
        let mut total = 0;
        for array in self.columns() {
            let data = array.to_data();
            total += data.get_slice_memory_size()?;

            // For a `StringViewArray`, `get_slice_memory_size` accounts for
            // the views buffer only; the variadic data buffers the views
            // point into are shared across slices, so add their full
            // capacity on top.
            if let Some(sv) = array.as_any().downcast_ref::<StringViewArray>() {
                for buffer in sv.data_buffers() {
                    total += buffer.capacity();
                }
            }
        }
        Ok(total)
    }
}
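
// Rough illustration of the distinction (hypothetical sizes): for a
// StringView column backed by a 1,000-byte data buffer, a 25-row slice has a
// sliced size of 25 * 16 + 1,000 bytes, since each 16-byte view is counted
// per sliced row while the shared data buffer is counted at full capacity.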

#[cfg(test)]
mod tests {
    use crate::spill::{get_record_batch_memory_size, spill_manager::GetSlicedSize};
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::{
        array::{ArrayRef, StringViewArray},
        record_batch::RecordBatch,
    };
    use datafusion_common::Result;
    use std::sync::Arc;

    #[test]
    fn check_sliced_size_for_string_view_array() -> Result<()> {
        let array_length = 50;
        let short_len = 8;
        let long_len = 25;

        // Alternate short strings (inlined in the 16-byte views) with long
        // strings (stored in the shared data buffers)
        let strings: Vec<String> = (0..array_length)
            .map(|i| {
                if i % 2 == 0 {
                    "a".repeat(short_len)
                } else {
                    "b".repeat(long_len)
                }
            })
            .collect();

        let string_array = StringViewArray::from(strings);
        let array_ref: ArrayRef = Arc::new(string_array);
        let batch = RecordBatch::try_new(
            Arc::new(Schema::new(vec![Field::new(
                "strings",
                DataType::Utf8View,
                false,
            )])),
            vec![array_ref],
        )
        .unwrap();

        // For the full (unsliced) batch, both measures agree
        assert_eq!(
            batch.get_sliced_size().unwrap(),
            get_record_batch_memory_size(&batch)
        );

        // A slice counts only its portion of the views buffer, so its sliced
        // size is smaller than the total memory of the backing buffers
        let half_batch = batch.slice(0, array_length / 2);
        assert!(
            half_batch.get_sliced_size().unwrap()
                < get_record_batch_memory_size(&half_batch)
        );
        // The sliced size still exceeds the sliced views alone, because the
        // shared StringView data buffers are added on top
        let data = arrow::array::Array::to_data(&half_batch.column(0));
        let views_sliced_size = data.get_slice_memory_size()?;
        assert!(views_sliced_size < half_batch.get_sliced_size().unwrap());

        Ok(())
    }
}