1use datafusion_common::{
21 config_err, resources_datafusion_err, resources_err, DataFusionError, Result,
22};
23use log::debug;
24use parking_lot::Mutex;
25use rand::{rng, Rng};
26use std::path::{Path, PathBuf};
27use std::sync::atomic::{AtomicU64, Ordering};
28use std::sync::Arc;
29use tempfile::{Builder, NamedTempFile, TempDir};
30
31use crate::memory_pool::human_readable_size;
32
/// Default upper bound, in bytes, on the combined size of the temporary files a
/// [`DiskManager`] tracks: 100 GiB.
const DEFAULT_MAX_TEMP_DIRECTORY_SIZE: u64 = 100 * 1024 * 1024 * 1024;

/// Builder for [`DiskManager`].
///
/// Obtain one via [`DiskManager::builder`] (or `DiskManagerBuilder::default()`),
/// adjust it with the `set_*` / `with_*` methods, and finish with `build`.
#[derive(Clone, Debug)]
pub struct DiskManagerBuilder {
    /// Where (or whether) temporary files may be created.
    mode: DiskManagerMode,
    /// Limit, in bytes, on the combined size of tracked temporary files.
    max_temp_directory_size: u64,
}
44
45impl Default for DiskManagerBuilder {
46 fn default() -> Self {
47 Self {
48 mode: DiskManagerMode::OsTmpDirectory,
49 max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
50 }
51 }
52}
53
54impl DiskManagerBuilder {
55 pub fn set_mode(&mut self, mode: DiskManagerMode) {
56 self.mode = mode;
57 }
58
59 pub fn with_mode(mut self, mode: DiskManagerMode) -> Self {
60 self.set_mode(mode);
61 self
62 }
63
64 pub fn set_max_temp_directory_size(&mut self, value: u64) {
65 self.max_temp_directory_size = value;
66 }
67
68 pub fn with_max_temp_directory_size(mut self, value: u64) -> Self {
69 self.set_max_temp_directory_size(value);
70 self
71 }
72
73 pub fn build(self) -> Result<DiskManager> {
75 match self.mode {
76 DiskManagerMode::OsTmpDirectory => Ok(DiskManager {
77 local_dirs: Mutex::new(Some(vec![])),
78 max_temp_directory_size: self.max_temp_directory_size,
79 used_disk_space: Arc::new(AtomicU64::new(0)),
80 }),
81 DiskManagerMode::Directories(conf_dirs) => {
82 let local_dirs = create_local_dirs(conf_dirs)?;
83 debug!(
84 "Created local dirs {local_dirs:?} as DataFusion working directory"
85 );
86 Ok(DiskManager {
87 local_dirs: Mutex::new(Some(local_dirs)),
88 max_temp_directory_size: self.max_temp_directory_size,
89 used_disk_space: Arc::new(AtomicU64::new(0)),
90 })
91 }
92 DiskManagerMode::Disabled => Ok(DiskManager {
93 local_dirs: Mutex::new(None),
94 max_temp_directory_size: self.max_temp_directory_size,
95 used_disk_space: Arc::new(AtomicU64::new(0)),
96 }),
97 }
98 }
99}
100
/// Selects where a [`DiskManager`] places its temporary files.
#[derive(Clone, Debug)]
pub enum DiskManagerMode {
    /// Create a single directory via `tempfile::tempdir()` the first time a
    /// temporary file is requested, and place all files there.
    OsTmpDirectory,

    /// Create a `datafusion-`-prefixed temporary directory inside each listed
    /// path (creating the path if it is missing) when the manager is built.
    /// Each new file goes into one of them, chosen at random.
    Directories(Vec<PathBuf>),

    /// Temporary files are not allowed; `create_tmp_file` returns a
    /// resources-exhausted error.
    Disabled,
}
115
116impl Default for DiskManagerMode {
117 fn default() -> Self {
118 Self::OsTmpDirectory
119 }
120}
121
/// Legacy configuration consumed by the deprecated `DiskManager::try_new`.
/// New code should use [`DiskManagerBuilder`] instead.
#[deprecated(since = "48.0.0", note = "Use DiskManagerBuilder instead")]
#[derive(Debug, Clone)]
pub enum DiskManagerConfig {
    /// Reuse an existing, already constructed `DiskManager` unchanged.
    Existing(Arc<DiskManager>),

    /// Create a new manager that lazily creates a directory under the OS temp
    /// location.
    NewOs,

    /// Create a new manager that uses temporary directories inside the given
    /// paths.
    NewSpecified(Vec<PathBuf>),

    /// Create a new manager with temporary files disabled.
    Disabled,
}
140
141#[allow(deprecated)]
142impl Default for DiskManagerConfig {
143 fn default() -> Self {
144 Self::NewOs
145 }
146}
147
148#[allow(deprecated)]
149impl DiskManagerConfig {
150 pub fn new() -> Self {
152 Self::default()
153 }
154
155 pub fn new_existing(existing: Arc<DiskManager>) -> Self {
157 Self::Existing(existing)
158 }
159
160 pub fn new_specified(paths: Vec<PathBuf>) -> Self {
162 Self::NewSpecified(paths)
163 }
164}
165
/// Creates temporary (spill) files in its managed directories and tracks their
/// combined size against a configurable limit.
#[derive(Debug)]
pub struct DiskManager {
    /// `None` when temporary files are disabled. In OS-temp mode this starts as
    /// `Some(vec![])` and a directory is pushed lazily by `create_tmp_file`.
    /// Otherwise it holds the directories created at build time. Each `TempDir`
    /// is shared (via `Arc`) with the files created inside it.
    local_dirs: Mutex<Option<Vec<Arc<TempDir>>>>,
    /// Maximum allowed value, in bytes, of `used_disk_space`. Exceeding it makes
    /// `RefCountedTempFile::update_disk_usage` return an error.
    max_temp_directory_size: u64,
    /// Sum, in bytes, of the sizes last recorded by live `RefCountedTempFile`s.
    used_disk_space: Arc<AtomicU64>,
}
182
183impl DiskManager {
184 pub fn builder() -> DiskManagerBuilder {
186 DiskManagerBuilder::default()
187 }
188
189 #[allow(deprecated)]
191 #[deprecated(since = "48.0.0", note = "Use DiskManager::builder() instead")]
192 pub fn try_new(config: DiskManagerConfig) -> Result<Arc<Self>> {
193 match config {
194 DiskManagerConfig::Existing(manager) => Ok(manager),
195 DiskManagerConfig::NewOs => Ok(Arc::new(Self {
196 local_dirs: Mutex::new(Some(vec![])),
197 max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
198 used_disk_space: Arc::new(AtomicU64::new(0)),
199 })),
200 DiskManagerConfig::NewSpecified(conf_dirs) => {
201 let local_dirs = create_local_dirs(conf_dirs)?;
202 debug!(
203 "Created local dirs {local_dirs:?} as DataFusion working directory"
204 );
205 Ok(Arc::new(Self {
206 local_dirs: Mutex::new(Some(local_dirs)),
207 max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
208 used_disk_space: Arc::new(AtomicU64::new(0)),
209 }))
210 }
211 DiskManagerConfig::Disabled => Ok(Arc::new(Self {
212 local_dirs: Mutex::new(None),
213 max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
214 used_disk_space: Arc::new(AtomicU64::new(0)),
215 })),
216 }
217 }
218
219 pub fn set_max_temp_directory_size(
220 &mut self,
221 max_temp_directory_size: u64,
222 ) -> Result<()> {
223 if self.local_dirs.lock().is_none() && max_temp_directory_size != 0 {
226 return config_err!(
227 "Cannot set max temp directory size for a disk manager that spilling is disabled"
228 );
229 }
230
231 self.max_temp_directory_size = max_temp_directory_size;
232 Ok(())
233 }
234
235 pub fn set_arc_max_temp_directory_size(
236 this: &mut Arc<Self>,
237 max_temp_directory_size: u64,
238 ) -> Result<()> {
239 if let Some(inner) = Arc::get_mut(this) {
240 inner.set_max_temp_directory_size(max_temp_directory_size)?;
241 Ok(())
242 } else {
243 config_err!("DiskManager should be a single instance")
244 }
245 }
246
247 pub fn with_max_temp_directory_size(
248 mut self,
249 max_temp_directory_size: u64,
250 ) -> Result<Self> {
251 self.set_max_temp_directory_size(max_temp_directory_size)?;
252 Ok(self)
253 }
254
255 pub fn used_disk_space(&self) -> u64 {
256 self.used_disk_space.load(Ordering::Relaxed)
257 }
258
259 pub fn tmp_files_enabled(&self) -> bool {
263 self.local_dirs.lock().is_some()
264 }
265
266 pub fn create_tmp_file(
271 self: &Arc<Self>,
272 request_description: &str,
273 ) -> Result<RefCountedTempFile> {
274 let mut guard = self.local_dirs.lock();
275 let local_dirs = guard.as_mut().ok_or_else(|| {
276 resources_datafusion_err!(
277 "Memory Exhausted while {request_description} (DiskManager is disabled)"
278 )
279 })?;
280
281 if local_dirs.is_empty() {
283 let tempdir = tempfile::tempdir().map_err(DataFusionError::IoError)?;
284
285 debug!(
286 "Created directory '{:?}' as DataFusion tempfile directory for {}",
287 tempdir.path().to_string_lossy(),
288 request_description,
289 );
290
291 local_dirs.push(Arc::new(tempdir));
292 }
293
294 let dir_index = rng().random_range(0..local_dirs.len());
295 Ok(RefCountedTempFile {
296 _parent_temp_dir: Arc::clone(&local_dirs[dir_index]),
297 tempfile: Builder::new()
298 .tempfile_in(local_dirs[dir_index].as_ref())
299 .map_err(DataFusionError::IoError)?,
300 current_file_disk_usage: 0,
301 disk_manager: Arc::clone(self),
302 })
303 }
304}
305
306#[derive(Debug)]
315pub struct RefCountedTempFile {
316 _parent_temp_dir: Arc<TempDir>,
319 tempfile: NamedTempFile,
320 current_file_disk_usage: u64,
323 disk_manager: Arc<DiskManager>,
325}
326
327impl RefCountedTempFile {
328 pub fn path(&self) -> &Path {
329 self.tempfile.path()
330 }
331
332 pub fn inner(&self) -> &NamedTempFile {
333 &self.tempfile
334 }
335
336 pub fn update_disk_usage(&mut self) -> Result<()> {
341 let metadata = self.tempfile.as_file().metadata()?;
343 let new_disk_usage = metadata.len();
344
345 self.disk_manager
348 .used_disk_space
349 .fetch_sub(self.current_file_disk_usage, Ordering::Relaxed);
350 self.disk_manager
352 .used_disk_space
353 .fetch_add(new_disk_usage, Ordering::Relaxed);
354
355 let global_disk_usage = self.disk_manager.used_disk_space.load(Ordering::Relaxed);
357 if global_disk_usage > self.disk_manager.max_temp_directory_size {
358 return resources_err!(
359 "The used disk space during the spilling process has exceeded the allowable limit of {}. Try increasing the `max_temp_directory_size` in the disk manager configuration.",
360 human_readable_size(self.disk_manager.max_temp_directory_size as usize)
361 );
362 }
363
364 self.current_file_disk_usage = new_disk_usage;
366
367 Ok(())
368 }
369
370 pub fn current_disk_usage(&self) -> u64 {
371 self.current_file_disk_usage
372 }
373}
374
375impl Drop for RefCountedTempFile {
377 fn drop(&mut self) {
378 self.disk_manager
380 .used_disk_space
381 .fetch_sub(self.current_file_disk_usage, Ordering::Relaxed);
382 }
383}
384
385fn create_local_dirs(local_dirs: Vec<PathBuf>) -> Result<Vec<Arc<TempDir>>> {
387 local_dirs
388 .iter()
389 .map(|root| {
390 if !Path::new(root).exists() {
391 std::fs::create_dir(root)?;
392 }
393 Builder::new()
394 .prefix("datafusion-")
395 .tempdir_in(root)
396 .map_err(DataFusionError::IoError)
397 })
398 .map(|result| result.map(Arc::new))
399 .collect()
400}
401
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn lazy_temp_dir_creation() -> Result<()> {
        let dm = Arc::new(DiskManagerBuilder::default().build()?);

        // No directory exists until the first temp file is requested.
        assert!(local_dir_snapshot(&dm).is_empty());

        let actual = dm.create_tmp_file("Testing")?;

        let local_dirs = local_dir_snapshot(&dm);
        assert_eq!(1, local_dirs.len());
        assert_path_in_dirs(actual.path(), local_dirs.iter().map(PathBuf::as_path));

        Ok(())
    }

    /// Returns the paths of every directory the manager currently owns
    /// (empty when temp files are disabled).
    fn local_dir_snapshot(dm: &DiskManager) -> Vec<PathBuf> {
        let guard = dm.local_dirs.lock();
        match &*guard {
            Some(dirs) => dirs.iter().map(|dir| dir.path().to_path_buf()).collect(),
            None => Vec::new(),
        }
    }

    #[test]
    fn file_in_right_dir() -> Result<()> {
        let roots = [TempDir::new()?, TempDir::new()?, TempDir::new()?];
        let local_dirs: Vec<&Path> = roots.iter().map(TempDir::path).collect();

        let mode = DiskManagerMode::Directories(
            local_dirs.iter().map(|dir| dir.to_path_buf()).collect(),
        );
        let dm = Arc::new(DiskManagerBuilder::default().with_mode(mode).build()?);

        assert!(dm.tmp_files_enabled());
        let actual = dm.create_tmp_file("Testing")?;

        assert_path_in_dirs(actual.path(), local_dirs.into_iter());

        Ok(())
    }

    #[test]
    fn test_disabled_disk_manager() {
        let manager = DiskManagerBuilder::default()
            .with_mode(DiskManagerMode::Disabled)
            .build()
            .map(Arc::new)
            .unwrap();
        assert!(!manager.tmp_files_enabled());

        let err = manager.create_tmp_file("Testing").unwrap_err();
        assert_eq!(
            err.strip_backtrace(),
            "Resources exhausted: Memory Exhausted while Testing (DiskManager is disabled)",
        )
    }

    #[test]
    fn test_disk_manager_create_spill_folder() {
        let dir = TempDir::new().unwrap();
        let mode = DiskManagerMode::Directories(vec![dir.path().to_path_buf()]);
        let _manager = DiskManagerBuilder::default()
            .with_mode(mode)
            .build()
            .unwrap();
    }

    /// Panics unless `file_path` lies somewhere beneath one of `dirs`.
    fn assert_path_in_dirs<'a>(
        file_path: &'a Path,
        dirs: impl Iterator<Item = &'a Path>,
    ) {
        let dirs: Vec<&Path> = dirs.collect();

        let found = file_path
            .ancestors()
            .any(|ancestor| dirs.contains(&ancestor));

        assert!(found, "Can't find {file_path:?} in dirs: {dirs:?}");
    }

    #[test]
    fn test_temp_file_still_alive_after_disk_manager_dropped() -> Result<()> {
        // Scenario 1: lazily created OS temp directory.
        let dm = Arc::new(DiskManagerBuilder::default().build()?);
        assert_temp_file_outlives_manager(dm)?;

        // Scenario 2: explicitly configured directories.
        let roots = [TempDir::new()?, TempDir::new()?, TempDir::new()?];
        let dirs: Vec<PathBuf> = roots.iter().map(|dir| dir.path().to_path_buf()).collect();
        let dm = Arc::new(
            DiskManagerBuilder::default()
                .with_mode(DiskManagerMode::Directories(dirs))
                .build()?,
        );
        assert_temp_file_outlives_manager(dm)
    }

    /// Creates a temp file, drops the caller's handle to `dm`, and checks that the
    /// file survives until it is dropped itself.
    fn assert_temp_file_outlives_manager(dm: Arc<DiskManager>) -> Result<()> {
        let temp_file = dm.create_tmp_file("Testing")?;
        let temp_file_path = temp_file.path().to_owned();
        assert!(temp_file_path.exists());

        drop(dm);
        assert!(temp_file_path.exists());

        drop(temp_file);
        assert!(!temp_file_path.exists());

        Ok(())
    }
}