// datafusion_physical_plan/spill/spill_manager.rs

use arrow::array::StringViewArray;
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion_execution::runtime_env::RuntimeEnv;
use std::sync::Arc;

use datafusion_common::{config::SpillCompression, Result};
use datafusion_execution::disk_manager::RefCountedTempFile;
use datafusion_execution::SendableRecordBatchStream;

use super::{in_progress_spill_file::InProgressSpillFile, SpillReaderStream};
use crate::coop::cooperative;
use crate::{common::spawn_buffered, metrics::SpillMetrics};

/// The `SpillManager` is responsible for reading and writing `RecordBatch`es
/// to and from raw spill files, and for updating the associated [`SpillMetrics`].
///
/// Note: the caller (e.g. an operator such as a sort) is responsible for
/// interpreting the contents of the spilled files, such as guaranteeing a
/// specific ordering of the records within each file.
#[derive(Debug, Clone)]
pub struct SpillManager {
    env: Arc<RuntimeEnv>,
    pub(crate) metrics: SpillMetrics,
    schema: SchemaRef,
    /// Number of batches to buffer in memory during disk reads
    batch_read_buffer_capacity: usize,
    /// Compression codec applied when writing spill files
    pub(crate) compression: SpillCompression,
}

impl SpillManager {
    pub fn new(env: Arc<RuntimeEnv>, metrics: SpillMetrics, schema: SchemaRef) -> Self {
        Self {
            env,
            metrics,
            schema,
            batch_read_buffer_capacity: 2,
            compression: SpillCompression::default(),
        }
    }

    /// Sets the number of batches buffered in memory when reading back spills
    /// (builder style).
    pub fn with_batch_read_buffer_capacity(
        mut self,
        batch_read_buffer_capacity: usize,
    ) -> Self {
        self.batch_read_buffer_capacity = batch_read_buffer_capacity;
        self
    }

    /// Sets the compression codec used when writing spill files (builder style).
    pub fn with_compression_type(mut self, spill_compression: SpillCompression) -> Self {
        self.compression = spill_compression;
        self
    }
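
    // A minimal usage sketch of the builder API above; the `env`, `metrics`,
    // and `schema` bindings here are hypothetical, not defined in this file:
    //
    //     let spill_manager = SpillManager::new(env, metrics, schema)
    //         .with_batch_read_buffer_capacity(4)
    //         .with_compression_type(SpillCompression::default());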

    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    /// Creates a temporary file for in-progress spill operations, returning an
    /// error if file creation fails. Batches can be appended to the returned
    /// file incrementally, then the file is finished when done.
    pub fn create_in_progress_file(
        &self,
        request_msg: &str,
    ) -> Result<InProgressSpillFile> {
        let temp_file = self.env.disk_manager.create_tmp_file(request_msg)?;
        Ok(InProgressSpillFile::new(Arc::new(self.clone()), temp_file))
    }

    /// Spills `batches` to a single temporary file in one call. Returns
    /// `None` if no batch was written to disk.
    pub fn spill_record_batch_and_finish(
        &self,
        batches: &[RecordBatch],
        request_msg: &str,
    ) -> Result<Option<RefCountedTempFile>> {
        let mut in_progress_file = self.create_in_progress_file(request_msg)?;

        for batch in batches {
            in_progress_file.append_batch(batch)?;
        }

        in_progress_file.finish()
    }
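
    // Sketch of a spill round trip using `spill_record_batch_and_finish`
    // together with `read_spill_as_stream` (defined below); `batches` and
    // `spill_manager` are hypothetical bindings:
    //
    //     if let Some(file) =
    //         spill_manager.spill_record_batch_and_finish(&batches, "sort spill")?
    //     {
    //         let mut stream = spill_manager.read_spill_as_stream(file, None)?;
    //         // consume `stream` like any other SendableRecordBatchStream
    //     }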

    /// Like [`Self::spill_record_batch_and_finish`], but first slices `batch`
    /// into smaller batches of at most `row_limit` rows each, and additionally
    /// returns the maximum in-memory size among the written batches.
    pub(crate) fn spill_record_batch_by_size_and_return_max_batch_memory(
        &self,
        batch: &RecordBatch,
        request_description: &str,
        row_limit: usize,
    ) -> Result<Option<(RefCountedTempFile, usize)>> {
        let total_rows = batch.num_rows();
        let mut batches = Vec::new();
        let mut offset = 0;

        // Slice the batch into chunks of at most `row_limit` rows each
        while offset < total_rows {
            let length = std::cmp::min(total_rows - offset, row_limit);
            let sliced_batch = batch.slice(offset, length);
            batches.push(sliced_batch);
            offset += length;
        }

        let mut in_progress_file = self.create_in_progress_file(request_description)?;

        let mut max_record_batch_size = 0;

        for batch in batches {
            in_progress_file.append_batch(&batch)?;

            max_record_batch_size = max_record_batch_size.max(batch.get_sliced_size()?);
        }

        let file = in_progress_file.finish()?;

        Ok(file.map(|f| (f, max_record_batch_size)))
    }
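
    // For example (hypothetical numbers): a 10_000-row batch with
    // `row_limit = 4096` is written as three slices of 4096, 4096, and 1808
    // rows, and the returned size is the largest `get_sliced_size()` among
    // those three slices.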

    /// Drains `stream`, writing every batch to a single temporary file, and
    /// returns the file together with the maximum in-memory size among the
    /// written batches (or `None` if nothing was written).
    pub(crate) async fn spill_record_batch_stream_and_return_max_batch_memory(
        &self,
        stream: &mut SendableRecordBatchStream,
        request_description: &str,
    ) -> Result<Option<(RefCountedTempFile, usize)>> {
        use futures::StreamExt;

        let mut in_progress_file = self.create_in_progress_file(request_description)?;

        let mut max_record_batch_size = 0;

        while let Some(batch) = stream.next().await {
            let batch = batch?;
            in_progress_file.append_batch(&batch)?;

            max_record_batch_size = max_record_batch_size.max(batch.get_sliced_size()?);
        }

        let file = in_progress_file.finish()?;

        Ok(file.map(|f| (f, max_record_batch_size)))
    }
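
    // The returned maximum batch size is a natural fit for the
    // `max_record_batch_memory` hint of `read_spill_as_stream` below
    // (an observation from this API's shape, not a documented contract);
    // `input_stream` and `spill_manager` are hypothetical bindings:
    //
    //     if let Some((file, max_size)) = spill_manager
    //         .spill_record_batch_stream_and_return_max_batch_memory(
    //             &mut input_stream,
    //             "aggregate spill",
    //         )
    //         .await?
    //     {
    //         let stream = spill_manager.read_spill_as_stream(file, Some(max_size))?;
    //     }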

    /// Reads a spill file back as a stream. The file must have been written by
    /// this `SpillManager`. Output is produced in FIFO order: the batch
    /// appended first is read first.
    pub fn read_spill_as_stream(
        &self,
        spill_file_path: RefCountedTempFile,
        max_record_batch_memory: Option<usize>,
    ) -> Result<SendableRecordBatchStream> {
        let stream = Box::pin(cooperative(SpillReaderStream::new(
            Arc::clone(&self.schema),
            spill_file_path,
            max_record_batch_memory,
        )));

        Ok(spawn_buffered(stream, self.batch_read_buffer_capacity))
    }
}

/// Estimates the in-memory size of a (possibly sliced) `RecordBatch`, counting
/// only the bytes the current slice actually occupies rather than the full
/// capacity of the underlying buffers.
pub(crate) trait GetSlicedSize {
    /// Returns the size of the `RecordBatch` when sliced.
    fn get_sliced_size(&self) -> Result<usize>;
}

impl GetSlicedSize for RecordBatch {
    fn get_sliced_size(&self) -> Result<usize> {
        let mut total = 0;
        for array in self.columns() {
            let data = array.to_data();
            total += data.get_slice_memory_size()?;

            // For a `StringViewArray`, `get_slice_memory_size()` accounts for
            // the views but not for the variadic data buffers the views point
            // into, so add the capacity of those buffers explicitly.
            if let Some(sv) = array.as_any().downcast_ref::<StringViewArray>() {
                for buffer in sv.data_buffers() {
                    total += buffer.capacity();
                }
            }
        }
        Ok(total)
    }
}

#[cfg(test)]
mod tests {
    use crate::spill::{get_record_batch_memory_size, spill_manager::GetSlicedSize};
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::{
        array::{ArrayRef, StringViewArray},
        record_batch::RecordBatch,
    };
    use datafusion_common::Result;
    use std::sync::Arc;

    #[test]
    fn check_sliced_size_for_string_view_array() -> Result<()> {
        let array_length = 50;
        let short_len = 8;
        let long_len = 25;

        // Alternate short strings (inlined into the 16-byte views) with long
        // strings (stored in the variadic data buffers)
        let strings: Vec<String> = (0..array_length)
            .map(|i| {
                if i % 2 == 0 {
                    "a".repeat(short_len)
                } else {
                    "b".repeat(long_len)
                }
            })
            .collect();

        let string_array = StringViewArray::from(strings);
        let array_ref: ArrayRef = Arc::new(string_array);
        let batch = RecordBatch::try_new(
            Arc::new(Schema::new(vec![Field::new(
                "strings",
                DataType::Utf8View,
                false,
            )])),
            vec![array_ref],
        )
        .unwrap();

        // For the unsliced batch, the sliced size equals the full memory size
        assert_eq!(
            batch.get_sliced_size().unwrap(),
            get_record_batch_memory_size(&batch)
        );

        // Slicing halves the views but keeps the data buffers whole, so the
        // sliced size must be strictly smaller than the full memory size ...
        let half_batch = batch.slice(0, array_length / 2);
        assert!(
            half_batch.get_sliced_size().unwrap()
                < get_record_batch_memory_size(&half_batch)
        );
        // ... while remaining larger than the views alone, since the data
        // buffers' capacity is included
        let data = arrow::array::Array::to_data(&half_batch.column(0));
        let views_sliced_size = data.get_slice_memory_size()?;
        assert!(views_sliced_size < half_batch.get_sliced_size().unwrap());

        Ok(())
    }
}