1use datafusion_common::{
21 config_err, resources_datafusion_err, resources_err, DataFusionError, Result,
22};
23use log::debug;
24use parking_lot::Mutex;
25use rand::{rng, Rng};
26use std::path::{Path, PathBuf};
27use std::sync::atomic::{AtomicU64, Ordering};
28use std::sync::Arc;
29use tempfile::{Builder, NamedTempFile, TempDir};
30
31use crate::memory_pool::human_readable_size;
32
/// Default limit on the total size of temporary files, in bytes (100 GiB).
const DEFAULT_MAX_TEMP_DIRECTORY_SIZE: u64 = 100 * 1024 * 1024 * 1024;

/// Builder for a [`DiskManager`].
///
/// Collects the [`DiskManagerMode`] and the temporary-file size limit, then
/// produces the manager through [`DiskManagerBuilder::build`].
#[derive(Clone, Debug)]
pub struct DiskManagerBuilder {
    /// Where temporary files are created, or whether they are disabled.
    mode: DiskManagerMode,
    /// Limit, in bytes, that the built manager's tracked disk usage is
    /// compared against.
    max_temp_directory_size: u64,
}
44
45impl Default for DiskManagerBuilder {
46 fn default() -> Self {
47 Self {
48 mode: DiskManagerMode::OsTmpDirectory,
49 max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
50 }
51 }
52}
53
54impl DiskManagerBuilder {
55 pub fn set_mode(&mut self, mode: DiskManagerMode) {
56 self.mode = mode;
57 }
58
59 pub fn with_mode(mut self, mode: DiskManagerMode) -> Self {
60 self.set_mode(mode);
61 self
62 }
63
64 pub fn set_max_temp_directory_size(&mut self, value: u64) {
65 self.max_temp_directory_size = value;
66 }
67
68 pub fn with_max_temp_directory_size(mut self, value: u64) -> Self {
69 self.set_max_temp_directory_size(value);
70 self
71 }
72
73 pub fn build(self) -> Result<DiskManager> {
75 match self.mode {
76 DiskManagerMode::OsTmpDirectory => Ok(DiskManager {
77 local_dirs: Mutex::new(Some(vec![])),
78 max_temp_directory_size: self.max_temp_directory_size,
79 used_disk_space: Arc::new(AtomicU64::new(0)),
80 }),
81 DiskManagerMode::Directories(conf_dirs) => {
82 let local_dirs = create_local_dirs(conf_dirs)?;
83 debug!(
84 "Created local dirs {local_dirs:?} as DataFusion working directory"
85 );
86 Ok(DiskManager {
87 local_dirs: Mutex::new(Some(local_dirs)),
88 max_temp_directory_size: self.max_temp_directory_size,
89 used_disk_space: Arc::new(AtomicU64::new(0)),
90 })
91 }
92 DiskManagerMode::Disabled => Ok(DiskManager {
93 local_dirs: Mutex::new(None),
94 max_temp_directory_size: self.max_temp_directory_size,
95 used_disk_space: Arc::new(AtomicU64::new(0)),
96 }),
97 }
98 }
99}
100
/// Selects where a [`DiskManager`] creates temporary files, if at all.
#[derive(Clone, Debug, Default)]
pub enum DiskManagerMode {
    /// Start with no directory; one is created with `tempfile::tempdir()` the
    /// first time a temporary file is requested. This is the default.
    #[default]
    OsTmpDirectory,

    /// When the manager is built, create a `datafusion-` prefixed temporary
    /// directory inside each of the given paths; each temporary file is then
    /// placed in one of them, picked at random.
    Directories(Vec<PathBuf>),

    /// Do not create temporary files; requesting one returns an error.
    Disabled,
}
116
/// Legacy configuration consumed by [`DiskManager::try_new`].
///
/// Deprecated in favour of [`DiskManagerBuilder`].
#[allow(deprecated)]
#[deprecated(since = "48.0.0", note = "Use DiskManagerBuilder instead")]
#[derive(Debug, Clone, Default)]
pub enum DiskManagerConfig {
    /// Use the supplied, already constructed manager as is.
    Existing(Arc<DiskManager>),

    /// Create a new manager whose directory is created with
    /// `tempfile::tempdir()` on first use. This is the default.
    #[default]
    NewOs,

    /// Create a new manager with a temporary directory inside each of the
    /// given paths.
    NewSpecified(Vec<PathBuf>),

    /// Create a new manager with temporary files disabled.
    Disabled,
}
137
138#[allow(deprecated)]
139impl DiskManagerConfig {
140 pub fn new() -> Self {
142 Self::default()
143 }
144
145 pub fn new_existing(existing: Arc<DiskManager>) -> Self {
147 Self::Existing(existing)
148 }
149
150 pub fn new_specified(paths: Vec<PathBuf>) -> Self {
152 Self::NewSpecified(paths)
153 }
154}
155
/// Creates temporary files and tracks how much disk space they use.
#[derive(Debug)]
pub struct DiskManager {
    /// Directories in which temporary files are created.
    ///
    /// `None` means temporary files are disabled. An empty list means no
    /// directory has been created yet; `create_tmp_file` creates one on
    /// first use.
    local_dirs: Mutex<Option<Vec<Arc<TempDir>>>>,
    /// Limit, in bytes, that `used_disk_space` is compared against when a
    /// file's usage is refreshed.
    max_temp_directory_size: u64,
    /// Sum, in bytes, of the sizes most recently recorded for the live
    /// temporary files created by this manager.
    used_disk_space: Arc<AtomicU64>,
}
172
173impl DiskManager {
174 pub fn builder() -> DiskManagerBuilder {
176 DiskManagerBuilder::default()
177 }
178
179 #[allow(deprecated)]
181 #[deprecated(since = "48.0.0", note = "Use DiskManager::builder() instead")]
182 pub fn try_new(config: DiskManagerConfig) -> Result<Arc<Self>> {
183 match config {
184 DiskManagerConfig::Existing(manager) => Ok(manager),
185 DiskManagerConfig::NewOs => Ok(Arc::new(Self {
186 local_dirs: Mutex::new(Some(vec![])),
187 max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
188 used_disk_space: Arc::new(AtomicU64::new(0)),
189 })),
190 DiskManagerConfig::NewSpecified(conf_dirs) => {
191 let local_dirs = create_local_dirs(conf_dirs)?;
192 debug!(
193 "Created local dirs {local_dirs:?} as DataFusion working directory"
194 );
195 Ok(Arc::new(Self {
196 local_dirs: Mutex::new(Some(local_dirs)),
197 max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
198 used_disk_space: Arc::new(AtomicU64::new(0)),
199 }))
200 }
201 DiskManagerConfig::Disabled => Ok(Arc::new(Self {
202 local_dirs: Mutex::new(None),
203 max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
204 used_disk_space: Arc::new(AtomicU64::new(0)),
205 })),
206 }
207 }
208
209 pub fn set_max_temp_directory_size(
210 &mut self,
211 max_temp_directory_size: u64,
212 ) -> Result<()> {
213 if self.local_dirs.lock().is_none() && max_temp_directory_size != 0 {
216 return config_err!(
217 "Cannot set max temp directory size for a disk manager that spilling is disabled"
218 );
219 }
220
221 self.max_temp_directory_size = max_temp_directory_size;
222 Ok(())
223 }
224
225 pub fn set_arc_max_temp_directory_size(
226 this: &mut Arc<Self>,
227 max_temp_directory_size: u64,
228 ) -> Result<()> {
229 if let Some(inner) = Arc::get_mut(this) {
230 inner.set_max_temp_directory_size(max_temp_directory_size)?;
231 Ok(())
232 } else {
233 config_err!("DiskManager should be a single instance")
234 }
235 }
236
237 pub fn with_max_temp_directory_size(
238 mut self,
239 max_temp_directory_size: u64,
240 ) -> Result<Self> {
241 self.set_max_temp_directory_size(max_temp_directory_size)?;
242 Ok(self)
243 }
244
245 pub fn used_disk_space(&self) -> u64 {
246 self.used_disk_space.load(Ordering::Relaxed)
247 }
248
249 pub fn tmp_files_enabled(&self) -> bool {
253 self.local_dirs.lock().is_some()
254 }
255
256 pub fn create_tmp_file(
261 self: &Arc<Self>,
262 request_description: &str,
263 ) -> Result<RefCountedTempFile> {
264 let mut guard = self.local_dirs.lock();
265 let local_dirs = guard.as_mut().ok_or_else(|| {
266 resources_datafusion_err!(
267 "Memory Exhausted while {request_description} (DiskManager is disabled)"
268 )
269 })?;
270
271 if local_dirs.is_empty() {
273 let tempdir = tempfile::tempdir().map_err(DataFusionError::IoError)?;
274
275 debug!(
276 "Created directory '{:?}' as DataFusion tempfile directory for {}",
277 tempdir.path().to_string_lossy(),
278 request_description,
279 );
280
281 local_dirs.push(Arc::new(tempdir));
282 }
283
284 let dir_index = rng().random_range(0..local_dirs.len());
285 Ok(RefCountedTempFile {
286 parent_temp_dir: Arc::clone(&local_dirs[dir_index]),
287 tempfile: Arc::new(
288 Builder::new()
289 .tempfile_in(local_dirs[dir_index].as_ref())
290 .map_err(DataFusionError::IoError)?,
291 ),
292 current_file_disk_usage: Arc::new(AtomicU64::new(0)),
293 disk_manager: Arc::clone(self),
294 })
295 }
296}
297
/// Cloneable handle to a temporary file created by
/// [`DiskManager::create_tmp_file`].
///
/// Clones share the same underlying file and the same recorded size.
#[derive(Debug)]
pub struct RefCountedTempFile {
    /// Shared handle to the temporary directory the file was created in.
    parent_temp_dir: Arc<TempDir>,
    /// The underlying temporary file, shared between all clones.
    tempfile: Arc<NamedTempFile>,
    /// Size in bytes last recorded by `update_disk_usage`, shared between all
    /// clones; starts at zero.
    current_file_disk_usage: Arc<AtomicU64>,
    /// The manager whose tracked disk usage this file counts towards.
    disk_manager: Arc<DiskManager>,
}
331
332impl Clone for RefCountedTempFile {
333 fn clone(&self) -> Self {
334 Self {
335 parent_temp_dir: Arc::clone(&self.parent_temp_dir),
336 tempfile: Arc::clone(&self.tempfile),
337 current_file_disk_usage: Arc::clone(&self.current_file_disk_usage),
338 disk_manager: Arc::clone(&self.disk_manager),
339 }
340 }
341}
342
343impl RefCountedTempFile {
344 pub fn path(&self) -> &Path {
345 self.tempfile.path()
346 }
347
348 pub fn inner(&self) -> &NamedTempFile {
349 self.tempfile.as_ref()
350 }
351
352 pub fn update_disk_usage(&mut self) -> Result<()> {
357 let metadata = self.tempfile.as_file().metadata()?;
359 let new_disk_usage = metadata.len();
360
361 let old_disk_usage = self.current_file_disk_usage.load(Ordering::Relaxed);
363
364 self.disk_manager
367 .used_disk_space
368 .fetch_sub(old_disk_usage, Ordering::Relaxed);
369 self.disk_manager
371 .used_disk_space
372 .fetch_add(new_disk_usage, Ordering::Relaxed);
373
374 let global_disk_usage = self.disk_manager.used_disk_space.load(Ordering::Relaxed);
376 if global_disk_usage > self.disk_manager.max_temp_directory_size {
377 return resources_err!(
378 "The used disk space during the spilling process has exceeded the allowable limit of {}. Try increasing the `max_temp_directory_size` in the disk manager configuration.",
379 human_readable_size(self.disk_manager.max_temp_directory_size as usize)
380 );
381 }
382
383 self.current_file_disk_usage
385 .store(new_disk_usage, Ordering::Relaxed);
386
387 Ok(())
388 }
389
390 pub fn current_disk_usage(&self) -> u64 {
391 self.current_file_disk_usage.load(Ordering::Relaxed)
392 }
393}
394
395impl Drop for RefCountedTempFile {
397 fn drop(&mut self) {
398 if Arc::strong_count(&self.tempfile) == 1 {
402 let current_usage = self.current_file_disk_usage.load(Ordering::Relaxed);
403 self.disk_manager
404 .used_disk_space
405 .fetch_sub(current_usage, Ordering::Relaxed);
406 }
407 }
408}
409
410fn create_local_dirs(local_dirs: Vec<PathBuf>) -> Result<Vec<Arc<TempDir>>> {
412 local_dirs
413 .iter()
414 .map(|root| {
415 if !Path::new(root).exists() {
416 std::fs::create_dir(root)?;
417 }
418 Builder::new()
419 .prefix("datafusion-")
420 .tempdir_in(root)
421 .map_err(DataFusionError::IoError)
422 })
423 .map(|result| result.map(Arc::new))
424 .collect()
425}
426
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;

    /// Appends `bytes` to `file`, refreshes its disk-usage accounting and
    /// returns the usage recorded for the file afterwards.
    fn append_and_track(file: &mut RefCountedTempFile, bytes: &[u8]) -> Result<u64> {
        file.inner().as_file().write_all(bytes)?;
        file.update_disk_usage()?;
        Ok(file.current_disk_usage())
    }

    /// Returns the paths of the directories currently held by `dm`.
    fn local_dir_snapshot(dm: &DiskManager) -> Vec<PathBuf> {
        let guard = dm.local_dirs.lock();
        match &*guard {
            Some(dirs) => dirs.iter().map(|dir| dir.path().to_path_buf()).collect(),
            None => Vec::new(),
        }
    }

    /// Asserts that `file_path` is located below one of `dirs`.
    fn assert_path_in_dirs<'a>(
        file_path: &'a Path,
        dirs: impl Iterator<Item = &'a Path>,
    ) {
        let dirs: Vec<&Path> = dirs.collect();

        let found = file_path
            .ancestors()
            .any(|ancestor| dirs.contains(&ancestor));

        assert!(found, "Can't find {file_path:?} in dirs: {dirs:?}");
    }

    /// Creates a file through `dm` and checks that it outlives the manager
    /// handle and is removed once the file handle itself is dropped.
    fn assert_file_outlives_manager(dm: Arc<DiskManager>) -> Result<()> {
        let temp_file = dm.create_tmp_file("Testing")?;
        let temp_file_path = temp_file.path().to_owned();
        assert!(temp_file_path.exists());

        drop(dm);
        assert!(temp_file_path.exists());

        drop(temp_file);
        assert!(!temp_file_path.exists());

        Ok(())
    }

    #[test]
    fn lazy_temp_dir_creation() -> Result<()> {
        let manager = Arc::new(DiskManager::builder().build()?);

        // Nothing is created until a file is requested.
        assert_eq!(0, local_dir_snapshot(&manager).len());

        let created = manager.create_tmp_file("Testing")?;

        // The first request creates exactly one directory holding the file.
        let dirs = local_dir_snapshot(&manager);
        assert_eq!(1, dirs.len());
        assert_path_in_dirs(created.path(), dirs.iter().map(PathBuf::as_path));

        Ok(())
    }

    #[test]
    fn file_in_right_dir() -> Result<()> {
        let local_dir1 = TempDir::new()?;
        let local_dir2 = TempDir::new()?;
        let local_dir3 = TempDir::new()?;
        let local_dirs = vec![local_dir1.path(), local_dir2.path(), local_dir3.path()];
        let configured: Vec<PathBuf> =
            local_dirs.iter().map(|dir| dir.to_path_buf()).collect();
        let dm = Arc::new(
            DiskManager::builder()
                .with_mode(DiskManagerMode::Directories(configured))
                .build()?,
        );

        assert!(dm.tmp_files_enabled());
        let actual = dm.create_tmp_file("Testing")?;

        assert_path_in_dirs(actual.path(), local_dirs.into_iter());

        Ok(())
    }

    #[test]
    fn test_disabled_disk_manager() {
        let manager = Arc::new(
            DiskManager::builder()
                .with_mode(DiskManagerMode::Disabled)
                .build()
                .unwrap(),
        );
        assert!(!manager.tmp_files_enabled());

        let err = manager.create_tmp_file("Testing").unwrap_err();
        assert_eq!(
            err.strip_backtrace(),
            "Resources exhausted: Memory Exhausted while Testing (DiskManager is disabled)",
        )
    }

    #[test]
    fn test_disk_manager_create_spill_folder() {
        let dir = TempDir::new().unwrap();
        let roots = vec![dir.path().to_path_buf()];
        DiskManager::builder()
            .with_mode(DiskManagerMode::Directories(roots))
            .build()
            .unwrap();
    }

    #[test]
    fn test_temp_file_still_alive_after_disk_manager_dropped() -> Result<()> {
        // Manager backed by the OS temporary directory.
        assert_file_outlives_manager(Arc::new(DiskManager::builder().build()?))?;

        // Manager backed by explicitly configured directories.
        let local_dir1 = TempDir::new()?;
        let local_dir2 = TempDir::new()?;
        let local_dir3 = TempDir::new()?;
        let local_dirs = [local_dir1.path(), local_dir2.path(), local_dir3.path()];
        let configured: Vec<PathBuf> =
            local_dirs.iter().map(|dir| dir.to_path_buf()).collect();
        let dm = Arc::new(
            DiskManager::builder()
                .with_mode(DiskManagerMode::Directories(configured))
                .build()?,
        );
        assert_file_outlives_manager(dm)?;

        Ok(())
    }

    #[test]
    fn test_disk_usage_basic() -> Result<()> {
        let dm = Arc::new(DiskManager::builder().build()?);
        let mut temp_file = dm.create_tmp_file("Testing")?;

        // A fresh file has no recorded usage.
        assert_eq!(dm.used_disk_space(), 0);
        assert_eq!(temp_file.current_disk_usage(), 0);

        let expected_usage = append_and_track(&mut temp_file, b"hello world")?;
        assert!(expected_usage > 0);
        assert_eq!(dm.used_disk_space(), expected_usage);

        let new_usage = append_and_track(&mut temp_file, b" more data")?;
        assert!(new_usage > expected_usage);
        assert_eq!(dm.used_disk_space(), new_usage);

        // Dropping the only handle releases the tracked usage.
        drop(temp_file);
        assert_eq!(dm.used_disk_space(), 0);

        Ok(())
    }

    #[test]
    fn test_disk_usage_with_clones() -> Result<()> {
        let dm = Arc::new(DiskManager::builder().build()?);
        let mut temp_file = dm.create_tmp_file("Testing")?;

        let usage_after_write = append_and_track(&mut temp_file, b"test data")?;
        assert!(usage_after_write > 0);
        assert_eq!(dm.used_disk_space(), usage_after_write);

        let clone1 = temp_file.clone();
        let clone2 = temp_file.clone();

        // Clones report the same usage and do not count twice.
        for handle in [&clone1, &clone2] {
            assert_eq!(handle.current_disk_usage(), usage_after_write);
        }
        assert_eq!(dm.used_disk_space(), usage_after_write);

        // Writing through one clone is visible through every handle.
        let mut mutable_clone1 = clone1;
        let new_usage = append_and_track(&mut mutable_clone1, b" more data")?;
        assert!(new_usage > usage_after_write);

        for handle in [&temp_file, &clone2, &mutable_clone1] {
            assert_eq!(handle.current_disk_usage(), new_usage);
        }
        assert_eq!(dm.used_disk_space(), new_usage);

        // Dropping clones keeps the usage while a handle remains.
        drop(mutable_clone1);
        assert_eq!(dm.used_disk_space(), new_usage);
        assert_eq!(temp_file.current_disk_usage(), new_usage);
        assert_eq!(clone2.current_disk_usage(), new_usage);

        drop(clone2);
        assert_eq!(dm.used_disk_space(), new_usage);
        assert_eq!(temp_file.current_disk_usage(), new_usage);

        // The last handle releases it.
        drop(temp_file);
        assert_eq!(dm.used_disk_space(), 0);

        Ok(())
    }

    #[test]
    fn test_disk_usage_clones_dropped_out_of_order() -> Result<()> {
        let dm = Arc::new(DiskManager::builder().build()?);
        let mut temp_file = dm.create_tmp_file("Testing")?;

        let usage = append_and_track(&mut temp_file, b"test")?;
        assert_eq!(dm.used_disk_space(), usage);

        let clone1 = temp_file.clone();
        let clone2 = temp_file.clone();
        let clone3 = temp_file.clone();

        // Dropping the original first must not release the usage.
        drop(temp_file);
        assert_eq!(dm.used_disk_space(), usage);
        assert_eq!(clone1.current_disk_usage(), usage);

        drop(clone2);
        assert_eq!(dm.used_disk_space(), usage);

        drop(clone1);
        assert_eq!(dm.used_disk_space(), usage);

        // Only the last remaining handle releases it.
        drop(clone3);
        assert_eq!(dm.used_disk_space(), 0);

        Ok(())
    }

    #[test]
    fn test_disk_usage_multiple_files() -> Result<()> {
        let dm = Arc::new(DiskManager::builder().build()?);

        let mut file1 = dm.create_tmp_file("Testing1")?;
        let mut file2 = dm.create_tmp_file("Testing2")?;

        let usage1 = append_and_track(&mut file1, b"file1")?;
        assert_eq!(dm.used_disk_space(), usage1);

        let usage2 = append_and_track(&mut file2, b"file2 data")?;
        assert_eq!(dm.used_disk_space(), usage1 + usage2);

        // Each file releases only its own share.
        drop(file1);
        assert_eq!(dm.used_disk_space(), usage2);

        drop(file2);
        assert_eq!(dm.used_disk_space(), 0);

        Ok(())
    }
}