datafusion_execution/
disk_manager.rs1use datafusion_common::{
21 config_err, resources_datafusion_err, resources_err, DataFusionError, Result,
22};
23use log::debug;
24use parking_lot::Mutex;
25use rand::{thread_rng, Rng};
26use std::path::{Path, PathBuf};
27use std::sync::atomic::{AtomicU64, Ordering};
28use std::sync::Arc;
29use tempfile::{Builder, NamedTempFile, TempDir};
30
31use crate::memory_pool::human_readable_size;
32
33const DEFAULT_MAX_TEMP_DIRECTORY_SIZE: u64 = 100 * 1024 * 1024 * 1024; #[derive(Debug, Clone)]
37pub enum DiskManagerConfig {
38 Existing(Arc<DiskManager>),
40
41 NewOs,
44
45 NewSpecified(Vec<PathBuf>),
48
49 Disabled,
51}
52
53impl Default for DiskManagerConfig {
54 fn default() -> Self {
55 Self::NewOs
56 }
57}
58
59impl DiskManagerConfig {
60 pub fn new() -> Self {
62 Self::default()
63 }
64
65 pub fn new_existing(existing: Arc<DiskManager>) -> Self {
67 Self::Existing(existing)
68 }
69
70 pub fn new_specified(paths: Vec<PathBuf>) -> Self {
72 Self::NewSpecified(paths)
73 }
74}
75
76#[derive(Debug)]
79pub struct DiskManager {
80 local_dirs: Mutex<Option<Vec<Arc<TempDir>>>>,
85 max_temp_directory_size: u64,
88 used_disk_space: Arc<AtomicU64>,
91}
92
93impl DiskManager {
94 pub fn try_new(config: DiskManagerConfig) -> Result<Arc<Self>> {
96 match config {
97 DiskManagerConfig::Existing(manager) => Ok(manager),
98 DiskManagerConfig::NewOs => Ok(Arc::new(Self {
99 local_dirs: Mutex::new(Some(vec![])),
100 max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
101 used_disk_space: Arc::new(AtomicU64::new(0)),
102 })),
103 DiskManagerConfig::NewSpecified(conf_dirs) => {
104 let local_dirs = create_local_dirs(conf_dirs)?;
105 debug!(
106 "Created local dirs {:?} as DataFusion working directory",
107 local_dirs
108 );
109 Ok(Arc::new(Self {
110 local_dirs: Mutex::new(Some(local_dirs)),
111 max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
112 used_disk_space: Arc::new(AtomicU64::new(0)),
113 }))
114 }
115 DiskManagerConfig::Disabled => Ok(Arc::new(Self {
116 local_dirs: Mutex::new(None),
117 max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
118 used_disk_space: Arc::new(AtomicU64::new(0)),
119 })),
120 }
121 }
122
123 pub fn with_max_temp_directory_size(
124 mut self,
125 max_temp_directory_size: u64,
126 ) -> Result<Self> {
127 if self.local_dirs.lock().is_none() && max_temp_directory_size != 0 {
130 return config_err!(
131 "Cannot set max temp directory size for a disk manager that spilling is disabled"
132 );
133 }
134
135 self.max_temp_directory_size = max_temp_directory_size;
136 Ok(self)
137 }
138
139 pub fn used_disk_space(&self) -> u64 {
140 self.used_disk_space.load(Ordering::Relaxed)
141 }
142
143 pub fn tmp_files_enabled(&self) -> bool {
147 self.local_dirs.lock().is_some()
148 }
149
150 pub fn create_tmp_file(
155 self: &Arc<Self>,
156 request_description: &str,
157 ) -> Result<RefCountedTempFile> {
158 let mut guard = self.local_dirs.lock();
159 let local_dirs = guard.as_mut().ok_or_else(|| {
160 resources_datafusion_err!(
161 "Memory Exhausted while {request_description} (DiskManager is disabled)"
162 )
163 })?;
164
165 if local_dirs.is_empty() {
167 let tempdir = tempfile::tempdir().map_err(DataFusionError::IoError)?;
168
169 debug!(
170 "Created directory '{:?}' as DataFusion tempfile directory for {}",
171 tempdir.path().to_string_lossy(),
172 request_description,
173 );
174
175 local_dirs.push(Arc::new(tempdir));
176 }
177
178 let dir_index = thread_rng().gen_range(0..local_dirs.len());
179 Ok(RefCountedTempFile {
180 _parent_temp_dir: Arc::clone(&local_dirs[dir_index]),
181 tempfile: Builder::new()
182 .tempfile_in(local_dirs[dir_index].as_ref())
183 .map_err(DataFusionError::IoError)?,
184 current_file_disk_usage: 0,
185 disk_manager: Arc::clone(self),
186 })
187 }
188}
189
190#[derive(Debug)]
199pub struct RefCountedTempFile {
200 _parent_temp_dir: Arc<TempDir>,
203 tempfile: NamedTempFile,
204 current_file_disk_usage: u64,
207 disk_manager: Arc<DiskManager>,
209}
210
211impl RefCountedTempFile {
212 pub fn path(&self) -> &Path {
213 self.tempfile.path()
214 }
215
216 pub fn inner(&self) -> &NamedTempFile {
217 &self.tempfile
218 }
219
220 pub fn update_disk_usage(&mut self) -> Result<()> {
225 let metadata = self.tempfile.as_file().metadata()?;
227 let new_disk_usage = metadata.len();
228
229 self.disk_manager
232 .used_disk_space
233 .fetch_sub(self.current_file_disk_usage, Ordering::Relaxed);
234 self.disk_manager
236 .used_disk_space
237 .fetch_add(new_disk_usage, Ordering::Relaxed);
238
239 let global_disk_usage = self.disk_manager.used_disk_space.load(Ordering::Relaxed);
241 if global_disk_usage > self.disk_manager.max_temp_directory_size {
242 return resources_err!(
243 "The used disk space during the spilling process has exceeded the allowable limit of {}. Try increasing the `max_temp_directory_size` in the disk manager configuration.",
244 human_readable_size(self.disk_manager.max_temp_directory_size as usize)
245 );
246 }
247
248 self.current_file_disk_usage = new_disk_usage;
250
251 Ok(())
252 }
253}
254
255impl Drop for RefCountedTempFile {
257 fn drop(&mut self) {
258 self.disk_manager
260 .used_disk_space
261 .fetch_sub(self.current_file_disk_usage, Ordering::Relaxed);
262 }
263}
264
265fn create_local_dirs(local_dirs: Vec<PathBuf>) -> Result<Vec<Arc<TempDir>>> {
267 local_dirs
268 .iter()
269 .map(|root| {
270 if !Path::new(root).exists() {
271 std::fs::create_dir(root)?;
272 }
273 Builder::new()
274 .prefix("datafusion-")
275 .tempdir_in(root)
276 .map_err(DataFusionError::IoError)
277 })
278 .map(|result| result.map(Arc::new))
279 .collect()
280}
281
#[cfg(test)]
mod tests {
    use super::*;

    /// Paths of all temporary directories currently held by `dm`; empty when
    /// the manager has none or is disabled.
    fn local_dir_snapshot(dm: &DiskManager) -> Vec<PathBuf> {
        match dm.local_dirs.lock().as_ref() {
            Some(dirs) => dirs.iter().map(|dir| dir.path().to_path_buf()).collect(),
            None => Vec::new(),
        }
    }

    /// Asserts that `file_path` lives at or below one of `dirs`.
    fn assert_path_in_dirs<'a>(
        file_path: &'a Path,
        dirs: impl Iterator<Item = &'a Path>,
    ) {
        let dirs: Vec<&Path> = dirs.collect();

        let found = file_path
            .ancestors()
            .any(|candidate_path| dirs.contains(&candidate_path));

        assert!(found, "Can't find {file_path:?} in dirs: {dirs:?}");
    }

    /// Creates a temp file through `dm` and checks that it survives the
    /// caller's handle to the manager being dropped, and is removed once the
    /// file handle itself is dropped.
    fn assert_temp_file_outlives_manager(dm: Arc<DiskManager>) -> Result<()> {
        let temp_file = dm.create_tmp_file("Testing")?;
        let temp_file_path = temp_file.path().to_owned();
        assert!(temp_file_path.exists());

        drop(dm);
        assert!(temp_file_path.exists());

        drop(temp_file);
        assert!(!temp_file_path.exists());

        Ok(())
    }

    #[test]
    fn lazy_temp_dir_creation() -> Result<()> {
        // A default manager starts without any directory.
        let dm = DiskManager::try_new(DiskManagerConfig::new())?;
        assert_eq!(0, local_dir_snapshot(&dm).len());

        // The first temp file triggers creation of exactly one directory.
        let actual = dm.create_tmp_file("Testing")?;
        let dirs_after = local_dir_snapshot(&dm);
        assert_eq!(1, dirs_after.len());

        // The file must live inside that directory.
        assert_path_in_dirs(actual.path(), dirs_after.iter().map(|dir| dir.as_path()));

        Ok(())
    }

    #[test]
    fn file_in_right_dir() -> Result<()> {
        let first_root = TempDir::new()?;
        let second_root = TempDir::new()?;
        let third_root = TempDir::new()?;
        let roots = vec![first_root.path(), second_root.path(), third_root.path()];

        let configured: Vec<PathBuf> =
            roots.iter().map(|root| root.to_path_buf()).collect();
        let dm = DiskManager::try_new(DiskManagerConfig::new_specified(configured))?;
        assert!(dm.tmp_files_enabled());

        let actual = dm.create_tmp_file("Testing")?;

        // The file must be somewhere below one of the configured roots.
        assert_path_in_dirs(actual.path(), roots.into_iter());

        Ok(())
    }

    #[test]
    fn test_disabled_disk_manager() {
        let manager = DiskManager::try_new(DiskManagerConfig::Disabled).unwrap();
        assert!(!manager.tmp_files_enabled());

        let error = manager.create_tmp_file("Testing").unwrap_err();
        assert_eq!(
            error.strip_backtrace(),
            "Resources exhausted: Memory Exhausted while Testing (DiskManager is disabled)",
        )
    }

    #[test]
    fn test_disk_manager_create_spill_folder() {
        let dir = TempDir::new().unwrap();
        let config = DiskManagerConfig::new_specified(vec![dir.path().to_owned()]);

        let manager = DiskManager::try_new(config).unwrap();
        manager.create_tmp_file("Testing").unwrap();
    }

    #[test]
    fn test_temp_file_still_alive_after_disk_manager_dropped() -> Result<()> {
        // Scenario 1: directory created lazily by the manager.
        let dm = DiskManager::try_new(DiskManagerConfig::new())?;
        assert_temp_file_outlives_manager(dm)?;

        // Scenario 2: directories supplied by the caller.
        let first_root = TempDir::new()?;
        let second_root = TempDir::new()?;
        let third_root = TempDir::new()?;
        let roots = [first_root.path(), second_root.path(), third_root.path()];
        let config = DiskManagerConfig::new_specified(
            roots.iter().map(|root| root.to_path_buf()).collect(),
        );
        let dm = DiskManager::try_new(config)?;
        assert_temp_file_outlives_manager(dm)?;

        Ok(())
    }
}