1use datafusion_common::{
21 DataFusionError, Result, config_err, resources_datafusion_err, resources_err,
22};
23use log::debug;
24use parking_lot::Mutex;
25use rand::{Rng, rng};
26use std::path::{Path, PathBuf};
27use std::sync::Arc;
28use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
29use tempfile::{Builder, NamedTempFile, TempDir};
30
31use datafusion_common::human_readable_size;
32
/// Default limit on the total size of temporary (spill) files: 100 GiB,
/// expressed in bytes.
pub const DEFAULT_MAX_TEMP_DIRECTORY_SIZE: u64 = 100 * 1024 * 1024 * 1024;

/// Builder used to configure and create a [`DiskManager`].
#[derive(Clone, Debug)]
pub struct DiskManagerBuilder {
    /// Where (and whether) temporary files may be created.
    mode: DiskManagerMode,
    /// Limit, in bytes, handed to the built manager as its
    /// `max_temp_directory_size`.
    max_temp_directory_size: u64,
}
44
45impl Default for DiskManagerBuilder {
46 fn default() -> Self {
47 Self {
48 mode: DiskManagerMode::OsTmpDirectory,
49 max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
50 }
51 }
52}
53
54impl DiskManagerBuilder {
55 pub fn set_mode(&mut self, mode: DiskManagerMode) {
56 self.mode = mode;
57 }
58
59 pub fn with_mode(mut self, mode: DiskManagerMode) -> Self {
60 self.set_mode(mode);
61 self
62 }
63
64 pub fn set_max_temp_directory_size(&mut self, value: u64) {
65 self.max_temp_directory_size = value;
66 }
67
68 pub fn with_max_temp_directory_size(mut self, value: u64) -> Self {
69 self.set_max_temp_directory_size(value);
70 self
71 }
72
73 pub fn build(self) -> Result<DiskManager> {
75 match self.mode {
76 DiskManagerMode::OsTmpDirectory => Ok(DiskManager {
77 local_dirs: Mutex::new(Some(vec![])),
78 max_temp_directory_size: self.max_temp_directory_size,
79 used_disk_space: Arc::new(AtomicU64::new(0)),
80 active_files_count: Arc::new(AtomicUsize::new(0)),
81 }),
82 DiskManagerMode::Directories(conf_dirs) => {
83 let local_dirs = create_local_dirs(&conf_dirs)?;
84 debug!(
85 "Created local dirs {local_dirs:?} as DataFusion working directory"
86 );
87 Ok(DiskManager {
88 local_dirs: Mutex::new(Some(local_dirs)),
89 max_temp_directory_size: self.max_temp_directory_size,
90 used_disk_space: Arc::new(AtomicU64::new(0)),
91 active_files_count: Arc::new(AtomicUsize::new(0)),
92 })
93 }
94 DiskManagerMode::Disabled => Ok(DiskManager {
95 local_dirs: Mutex::new(None),
96 max_temp_directory_size: self.max_temp_directory_size,
97 used_disk_space: Arc::new(AtomicU64::new(0)),
98 active_files_count: Arc::new(AtomicUsize::new(0)),
99 }),
100 }
101 }
102}
103
/// Where (and whether) a [`DiskManager`] may create temporary files.
#[derive(Clone, Debug, Default)]
pub enum DiskManagerMode {
    /// Default. No directory is created when the manager is built; the first
    /// temporary-file request creates one via `tempfile::tempdir()`.
    #[default]
    OsTmpDirectory,

    /// Create a `datafusion-` prefixed temporary directory inside each of the
    /// given paths when the manager is built, and place temporary files in
    /// those directories.
    Directories(Vec<PathBuf>),

    /// Temporary files are disabled; `create_tmp_file` returns an error.
    Disabled,
}
119
/// Deprecated way of configuring a [`DiskManager`]; consumed by
/// `DiskManager::try_new`.
#[deprecated(since = "48.0.0", note = "Use DiskManagerBuilder instead")]
#[derive(Debug, Clone, Default)]
#[allow(clippy::allow_attributes)]
#[allow(deprecated)]
pub enum DiskManagerConfig {
    /// Reuse an already constructed manager as-is.
    Existing(Arc<DiskManager>),

    /// Default. Start with no directories; one is created on first use.
    #[default]
    NewOs,

    /// Create temporary directories inside the given paths.
    NewSpecified(Vec<PathBuf>),

    /// Do not allow temporary files at all.
    Disabled,
}
141
142#[expect(deprecated)]
143impl DiskManagerConfig {
144 pub fn new() -> Self {
146 Self::default()
147 }
148
149 pub fn new_existing(existing: Arc<DiskManager>) -> Self {
151 Self::Existing(existing)
152 }
153
154 pub fn new_specified(paths: Vec<PathBuf>) -> Self {
156 Self::NewSpecified(paths)
157 }
158}
159
/// Creates temporary files and tracks how much disk space they use.
#[derive(Debug)]
pub struct DiskManager {
    /// Directories in which temporary files are created.
    ///
    /// * `Some(dirs)` - temporary files are enabled; an empty list means the
    ///   directory has not been created yet (it is added on first use).
    /// * `None` - temporary files are disabled.
    local_dirs: Mutex<Option<Vec<Arc<TempDir>>>>,
    /// Limit, in bytes, that `used_disk_space` is checked against when a
    /// temporary file reports its size.
    max_temp_directory_size: u64,
    /// Sum of the sizes, in bytes, last reported by the live temporary files.
    used_disk_space: Arc<AtomicU64>,
    /// Number of temporary files created by this manager and not yet released.
    active_files_count: Arc<AtomicUsize>,
}
178
/// Snapshot of a [`DiskManager`]'s counters, as returned by
/// `DiskManager::spilling_progress`.
#[derive(Debug, Clone, Copy)]
pub struct SpillingProgress {
    /// Value of the manager's used-disk-space counter, in bytes.
    pub current_bytes: u64,
    /// Value of the manager's active-temporary-file counter.
    pub active_files_count: usize,
}
187
188impl DiskManager {
189 pub fn builder() -> DiskManagerBuilder {
191 DiskManagerBuilder::default()
192 }
193
194 #[expect(deprecated)]
196 #[deprecated(since = "48.0.0", note = "Use DiskManager::builder() instead")]
197 pub fn try_new(config: DiskManagerConfig) -> Result<Arc<Self>> {
198 match config {
199 DiskManagerConfig::Existing(manager) => Ok(manager),
200 DiskManagerConfig::NewOs => Ok(Arc::new(Self {
201 local_dirs: Mutex::new(Some(vec![])),
202 max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
203 used_disk_space: Arc::new(AtomicU64::new(0)),
204 active_files_count: Arc::new(AtomicUsize::new(0)),
205 })),
206 DiskManagerConfig::NewSpecified(conf_dirs) => {
207 let local_dirs = create_local_dirs(&conf_dirs)?;
208 debug!(
209 "Created local dirs {local_dirs:?} as DataFusion working directory"
210 );
211 Ok(Arc::new(Self {
212 local_dirs: Mutex::new(Some(local_dirs)),
213 max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
214 used_disk_space: Arc::new(AtomicU64::new(0)),
215 active_files_count: Arc::new(AtomicUsize::new(0)),
216 }))
217 }
218 DiskManagerConfig::Disabled => Ok(Arc::new(Self {
219 local_dirs: Mutex::new(None),
220 max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
221 used_disk_space: Arc::new(AtomicU64::new(0)),
222 active_files_count: Arc::new(AtomicUsize::new(0)),
223 })),
224 }
225 }
226
227 pub fn set_max_temp_directory_size(
228 &mut self,
229 max_temp_directory_size: u64,
230 ) -> Result<()> {
231 if self.local_dirs.lock().is_none() && max_temp_directory_size != 0 {
234 return config_err!(
235 "Cannot set max temp directory size for a disk manager that spilling is disabled"
236 );
237 }
238
239 self.max_temp_directory_size = max_temp_directory_size;
240 Ok(())
241 }
242
243 pub fn set_arc_max_temp_directory_size(
244 this: &mut Arc<Self>,
245 max_temp_directory_size: u64,
246 ) -> Result<()> {
247 if let Some(inner) = Arc::get_mut(this) {
248 inner.set_max_temp_directory_size(max_temp_directory_size)?;
249 Ok(())
250 } else {
251 config_err!("DiskManager should be a single instance")
252 }
253 }
254
255 pub fn with_max_temp_directory_size(
256 mut self,
257 max_temp_directory_size: u64,
258 ) -> Result<Self> {
259 self.set_max_temp_directory_size(max_temp_directory_size)?;
260 Ok(self)
261 }
262
263 pub fn used_disk_space(&self) -> u64 {
264 self.used_disk_space.load(Ordering::Relaxed)
265 }
266
267 pub fn max_temp_directory_size(&self) -> u64 {
269 self.max_temp_directory_size
270 }
271
272 pub fn spilling_progress(&self) -> SpillingProgress {
274 SpillingProgress {
275 current_bytes: self.used_disk_space.load(Ordering::Relaxed),
276 active_files_count: self.active_files_count.load(Ordering::Relaxed),
277 }
278 }
279
280 pub fn temp_dir_paths(&self) -> Vec<PathBuf> {
282 self.local_dirs
283 .lock()
284 .as_ref()
285 .map(|dirs| {
286 dirs.iter()
287 .map(|temp_dir| temp_dir.path().to_path_buf())
288 .collect()
289 })
290 .unwrap_or_default()
291 }
292
293 pub fn tmp_files_enabled(&self) -> bool {
297 self.local_dirs.lock().is_some()
298 }
299
300 pub fn create_tmp_file(
305 self: &Arc<Self>,
306 request_description: &str,
307 ) -> Result<RefCountedTempFile> {
308 let mut guard = self.local_dirs.lock();
309 let local_dirs = guard.as_mut().ok_or_else(|| {
310 resources_datafusion_err!(
311 "Memory Exhausted while {request_description} (DiskManager is disabled)"
312 )
313 })?;
314
315 if local_dirs.is_empty() {
317 let tempdir = tempfile::tempdir().map_err(DataFusionError::IoError)?;
318
319 debug!(
320 "Created directory '{:?}' as DataFusion tempfile directory for {}",
321 tempdir.path().to_string_lossy(),
322 request_description,
323 );
324
325 local_dirs.push(Arc::new(tempdir));
326 }
327
328 let dir_index = rng().random_range(0..local_dirs.len());
329 self.active_files_count.fetch_add(1, Ordering::Relaxed);
330 Ok(RefCountedTempFile {
331 parent_temp_dir: Arc::clone(&local_dirs[dir_index]),
332 tempfile: Arc::new(
333 Builder::new()
334 .tempfile_in(local_dirs[dir_index].as_ref())
335 .map_err(DataFusionError::IoError)?,
336 ),
337 current_file_disk_usage: Arc::new(AtomicU64::new(0)),
338 disk_manager: Arc::clone(self),
339 })
340 }
341}
342
/// Handle to a temporary file created by `DiskManager::create_tmp_file`.
///
/// Clones share the same underlying file and the same size counter.
#[derive(Debug)]
pub struct RefCountedTempFile {
    /// Directory the file was created in; holding the `Arc` keeps the
    /// `TempDir` value alive as long as this handle exists.
    parent_temp_dir: Arc<TempDir>,
    /// The temporary file itself, shared between clones.
    tempfile: Arc<NamedTempFile>,
    /// Size of the file, in bytes, as last recorded by `update_disk_usage`;
    /// shared between clones.
    current_file_disk_usage: Arc<AtomicU64>,
    /// Manager whose counters this file is accounted against.
    disk_manager: Arc<DiskManager>,
}
376
377impl Clone for RefCountedTempFile {
378 fn clone(&self) -> Self {
379 Self {
380 parent_temp_dir: Arc::clone(&self.parent_temp_dir),
381 tempfile: Arc::clone(&self.tempfile),
382 current_file_disk_usage: Arc::clone(&self.current_file_disk_usage),
383 disk_manager: Arc::clone(&self.disk_manager),
384 }
385 }
386}
387
388impl RefCountedTempFile {
389 pub fn path(&self) -> &Path {
390 self.tempfile.path()
391 }
392
393 pub fn inner(&self) -> &NamedTempFile {
394 self.tempfile.as_ref()
395 }
396
397 pub fn update_disk_usage(&mut self) -> Result<()> {
402 let metadata = self.tempfile.as_file().metadata()?;
404 let new_disk_usage = metadata.len();
405
406 let old_disk_usage = self.current_file_disk_usage.load(Ordering::Relaxed);
408
409 self.disk_manager
412 .used_disk_space
413 .fetch_sub(old_disk_usage, Ordering::Relaxed);
414 self.disk_manager
416 .used_disk_space
417 .fetch_add(new_disk_usage, Ordering::Relaxed);
418
419 let global_disk_usage = self.disk_manager.used_disk_space.load(Ordering::Relaxed);
421 if global_disk_usage > self.disk_manager.max_temp_directory_size {
422 return resources_err!(
423 "The used disk space during the spilling process has exceeded the allowable limit of {}. Try increasing the `max_temp_directory_size` in the disk manager configuration.",
424 human_readable_size(self.disk_manager.max_temp_directory_size as usize)
425 );
426 }
427
428 self.current_file_disk_usage
430 .store(new_disk_usage, Ordering::Relaxed);
431
432 Ok(())
433 }
434
435 pub fn current_disk_usage(&self) -> u64 {
436 self.current_file_disk_usage.load(Ordering::Relaxed)
437 }
438}
439
440impl Drop for RefCountedTempFile {
442 fn drop(&mut self) {
443 if Arc::strong_count(&self.tempfile) == 1 {
447 let current_usage = self.current_file_disk_usage.load(Ordering::Relaxed);
448 self.disk_manager
449 .used_disk_space
450 .fetch_sub(current_usage, Ordering::Relaxed);
451 self.disk_manager
452 .active_files_count
453 .fetch_sub(1, Ordering::Relaxed);
454 }
455 }
456}
457
458fn create_local_dirs(local_dirs: &[PathBuf]) -> Result<Vec<Arc<TempDir>>> {
460 local_dirs
461 .iter()
462 .map(|root| {
463 if !Path::new(root).exists() {
464 std::fs::create_dir(root)?;
465 }
466 Builder::new()
467 .prefix("datafusion-")
468 .tempdir_in(root)
469 .map_err(DataFusionError::IoError)
470 })
471 .map(|result| result.map(Arc::new))
472 .collect()
473}
474
475#[cfg(test)]
476mod tests {
477 use super::*;
478
/// In the default mode no directory exists until the first temporary file is
/// requested.
#[test]
fn lazy_temp_dir_creation() -> Result<()> {
    // A freshly built default manager has an empty directory list.
    let dm = Arc::new(DiskManagerBuilder::default().build()?);

    assert_eq!(0, local_dir_snapshot(&dm).len());

    // The first request creates exactly one directory.
    let actual = dm.create_tmp_file("Testing")?;

    assert_eq!(1, local_dir_snapshot(&dm).len());

    // The new file must be located under that directory.
    let local_dirs = local_dir_snapshot(&dm);
    assert_path_in_dirs(actual.path(), local_dirs.iter().map(|p| p.as_path()));

    Ok(())
}
498
499 fn local_dir_snapshot(dm: &DiskManager) -> Vec<PathBuf> {
500 dm.local_dirs
501 .lock()
502 .iter()
503 .flatten()
504 .map(|p| p.path().into())
505 .collect()
506 }
507
/// In `Directories` mode a temporary file is created under one of the
/// configured roots.
#[test]
fn file_in_right_dir() -> Result<()> {
    let local_dir1 = TempDir::new()?;
    let local_dir2 = TempDir::new()?;
    let local_dir3 = TempDir::new()?;
    let local_dirs = vec![local_dir1.path(), local_dir2.path(), local_dir3.path()];
    let dm = Arc::new(
        DiskManagerBuilder::default()
            .with_mode(DiskManagerMode::Directories(
                local_dirs.iter().map(|p| p.into()).collect(),
            ))
            .build()?,
    );

    assert!(dm.tmp_files_enabled());
    let actual = dm.create_tmp_file("Testing")?;

    // The file sits in a generated sub-directory, so check ancestors rather
    // than the direct parent.
    assert_path_in_dirs(actual.path(), local_dirs.into_iter());

    Ok(())
}
530
/// A manager built in `Disabled` mode reports temporary files as disabled and
/// rejects file creation with a resources error.
#[test]
fn test_disabled_disk_manager() {
    let manager = Arc::new(
        DiskManagerBuilder::default()
            .with_mode(DiskManagerMode::Disabled)
            .build()
            .unwrap(),
    );
    assert!(!manager.tmp_files_enabled());
    // The request description ("Testing") is embedded in the error message.
    assert_eq!(
        manager
            .create_tmp_file("Testing")
            .unwrap_err()
            .strip_backtrace(),
        "Resources exhausted: Memory Exhausted while Testing (DiskManager is disabled)",
    )
}
548
/// Building a manager in `Directories` mode on top of an existing root
/// directory succeeds.
#[test]
fn test_disk_manager_create_spill_folder() {
    let dir = TempDir::new().unwrap();
    DiskManagerBuilder::default()
        .with_mode(DiskManagerMode::Directories(vec![dir.path().to_path_buf()]))
        .build()
        .unwrap();
}
557
/// Test helper: panics unless `file_path`, or one of its ancestors, is equal
/// to one of `dirs`.
fn assert_path_in_dirs<'a>(
    file_path: &'a Path,
    dirs: impl Iterator<Item = &'a Path>,
) {
    let dirs: Vec<&Path> = dirs.collect();

    // Walk up from the file itself and look each ancestor up in the list.
    let found = file_path
        .ancestors()
        .any(|candidate_path| dirs.contains(&candidate_path));

    assert!(found, "Can't find {file_path:?} in dirs: {dirs:?}");
}
573
/// A temporary file outlives the caller's handle to its manager and is
/// removed only when the file handle itself is dropped.
#[test]
fn test_temp_file_still_alive_after_disk_manager_dropped() -> Result<()> {
    // Case 1: default mode (directory created on first use).
    let dm = Arc::new(DiskManagerBuilder::default().build()?);
    let temp_file = dm.create_tmp_file("Testing")?;
    let temp_file_path = temp_file.path().to_owned();
    assert!(temp_file_path.exists());

    // Dropping the caller's manager handle must not remove the file.
    drop(dm);
    assert!(temp_file_path.exists());

    // Dropping the file handle removes the file.
    drop(temp_file);
    assert!(!temp_file_path.exists());

    // Case 2: explicitly configured directories.
    let local_dir1 = TempDir::new()?;
    let local_dir2 = TempDir::new()?;
    let local_dir3 = TempDir::new()?;
    let local_dirs = [local_dir1.path(), local_dir2.path(), local_dir3.path()];
    let dm = Arc::new(
        DiskManagerBuilder::default()
            .with_mode(DiskManagerMode::Directories(
                local_dirs.iter().map(|p| p.into()).collect(),
            ))
            .build()?,
    );
    let temp_file = dm.create_tmp_file("Testing")?;
    let temp_file_path = temp_file.path().to_owned();
    assert!(temp_file_path.exists());

    drop(dm);
    assert!(temp_file_path.exists());

    drop(temp_file);
    assert!(!temp_file_path.exists());

    Ok(())
}
612
/// The global counter follows a single file's recorded size and returns to
/// zero once the file is dropped.
#[test]
fn test_disk_usage_basic() -> Result<()> {
    use std::io::Write;

    let dm = Arc::new(DiskManagerBuilder::default().build()?);
    let mut temp_file = dm.create_tmp_file("Testing")?;

    // Nothing has been recorded yet.
    assert_eq!(dm.used_disk_space(), 0);
    assert_eq!(temp_file.current_disk_usage(), 0);

    // Write, then record the new size.
    temp_file.inner().as_file().write_all(b"hello world")?;
    temp_file.update_disk_usage()?;

    // Per-file and global counters agree.
    let expected_usage = temp_file.current_disk_usage();
    assert!(expected_usage > 0);
    assert_eq!(dm.used_disk_space(), expected_usage);

    // Growing the file increases both counters.
    temp_file.inner().as_file().write_all(b" more data")?;
    temp_file.update_disk_usage()?;

    let new_usage = temp_file.current_disk_usage();
    assert!(new_usage > expected_usage);
    assert_eq!(dm.used_disk_space(), new_usage);

    // Dropping the only handle releases the accounted space.
    drop(temp_file);

    assert_eq!(dm.used_disk_space(), 0);

    Ok(())
}
650
/// Clones share one size counter, and the accounted space is released only
/// when the last clone is dropped.
#[test]
fn test_disk_usage_with_clones() -> Result<()> {
    use std::io::Write;

    let dm = Arc::new(DiskManagerBuilder::default().build()?);
    let mut temp_file = dm.create_tmp_file("Testing")?;

    // Write, then record the size.
    temp_file.inner().as_file().write_all(b"test data")?;
    temp_file.update_disk_usage()?;

    let usage_after_write = temp_file.current_disk_usage();
    assert!(usage_after_write > 0);
    assert_eq!(dm.used_disk_space(), usage_after_write);

    // Create two more handles to the same file.
    let clone1 = temp_file.clone();
    let clone2 = temp_file.clone();

    // Every handle reports the same size.
    assert_eq!(clone1.current_disk_usage(), usage_after_write);
    assert_eq!(clone2.current_disk_usage(), usage_after_write);

    // Cloning does not count the file again.
    assert_eq!(dm.used_disk_space(), usage_after_write);

    // Write and record through one of the clones.
    clone1.inner().as_file().write_all(b" more data")?;
    let mut mutable_clone1 = clone1;
    mutable_clone1.update_disk_usage()?;

    let new_usage = mutable_clone1.current_disk_usage();
    assert!(new_usage > usage_after_write);

    // The update is visible through every handle.
    assert_eq!(temp_file.current_disk_usage(), new_usage);
    assert_eq!(clone2.current_disk_usage(), new_usage);
    assert_eq!(mutable_clone1.current_disk_usage(), new_usage);

    // The global counter matches the new size.
    assert_eq!(dm.used_disk_space(), new_usage);

    // Dropping one clone releases nothing while others remain.
    drop(mutable_clone1);

    assert_eq!(dm.used_disk_space(), new_usage);
    assert_eq!(temp_file.current_disk_usage(), new_usage);
    assert_eq!(clone2.current_disk_usage(), new_usage);

    // Same for the second clone.
    drop(clone2);

    assert_eq!(dm.used_disk_space(), new_usage);
    assert_eq!(temp_file.current_disk_usage(), new_usage);

    // Dropping the last handle releases the space.
    drop(temp_file);

    assert_eq!(dm.used_disk_space(), 0);

    Ok(())
}
716
/// The accounted space is released by whichever handle is dropped last, even
/// when the original handle goes away first.
#[test]
fn test_disk_usage_clones_dropped_out_of_order() -> Result<()> {
    use std::io::Write;

    let dm = Arc::new(DiskManagerBuilder::default().build()?);
    let mut temp_file = dm.create_tmp_file("Testing")?;

    // Write, then record the size.
    temp_file.inner().as_file().write_all(b"test")?;
    temp_file.update_disk_usage()?;

    let usage = temp_file.current_disk_usage();
    assert_eq!(dm.used_disk_space(), usage);

    // Create several handles to the same file.
    let clone1 = temp_file.clone();
    let clone2 = temp_file.clone();
    let clone3 = temp_file.clone();

    // Drop the original handle first.
    drop(temp_file);

    // Clones still exist, so the space stays accounted.
    assert_eq!(dm.used_disk_space(), usage);
    assert_eq!(clone1.current_disk_usage(), usage);

    // Drop the remaining clones in a different order than they were created.
    drop(clone2);
    assert_eq!(dm.used_disk_space(), usage);

    drop(clone1);
    assert_eq!(dm.used_disk_space(), usage);

    // The last handle releases the space.
    drop(clone3);

    assert_eq!(dm.used_disk_space(), 0);

    Ok(())
}
758
/// The global counter is the sum of the recorded sizes of all live files.
#[test]
fn test_disk_usage_multiple_files() -> Result<()> {
    use std::io::Write;

    let dm = Arc::new(DiskManagerBuilder::default().build()?);

    // Two independent temporary files from the same manager.
    let mut file1 = dm.create_tmp_file("Testing1")?;
    let mut file2 = dm.create_tmp_file("Testing2")?;

    // Write to and record the first file.
    file1.inner().as_file().write_all(b"file1")?;
    file1.update_disk_usage()?;
    let usage1 = file1.current_disk_usage();

    assert_eq!(dm.used_disk_space(), usage1);

    // Write to and record the second file.
    file2.inner().as_file().write_all(b"file2 data")?;
    file2.update_disk_usage()?;
    let usage2 = file2.current_disk_usage();

    // The global counter now covers both files.
    assert_eq!(dm.used_disk_space(), usage1 + usage2);

    // Dropping the first file releases only its share.
    drop(file1);

    assert_eq!(dm.used_disk_space(), usage2);

    // Dropping the second file releases the rest.
    drop(file2);

    assert_eq!(dm.used_disk_space(), 0);

    Ok(())
}
798}