1use datafusion_common::{
21 DataFusionError, Result, config_err, resources_datafusion_err, resources_err,
22};
23use log::debug;
24use parking_lot::Mutex;
25use rand::{Rng, rng};
26use std::path::{Path, PathBuf};
27use std::sync::Arc;
28use std::sync::atomic::{AtomicU64, Ordering};
29use tempfile::{Builder, NamedTempFile, TempDir};
30
31use datafusion_common::human_readable_size;
32
/// Default upper bound on the total size of temporary files tracked by a
/// [`DiskManager`]: 100 GiB (`100 * 1024^3` bytes).
pub const DEFAULT_MAX_TEMP_DIRECTORY_SIZE: u64 = 100 * 1024 * 1024 * 1024;

/// Builder for a [`DiskManager`].
///
/// [`Default`] yields [`DiskManagerMode::OsTmpDirectory`] and
/// [`DEFAULT_MAX_TEMP_DIRECTORY_SIZE`].
#[derive(Clone, Debug)]
pub struct DiskManagerBuilder {
    /// Where (and whether) the resulting manager creates temporary files.
    mode: DiskManagerMode,
    /// Maximum total size, in bytes, of the temporary files tracked by the
    /// resulting manager.
    max_temp_directory_size: u64,
}
44
45impl Default for DiskManagerBuilder {
46 fn default() -> Self {
47 Self {
48 mode: DiskManagerMode::OsTmpDirectory,
49 max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
50 }
51 }
52}
53
54impl DiskManagerBuilder {
55 pub fn set_mode(&mut self, mode: DiskManagerMode) {
56 self.mode = mode;
57 }
58
59 pub fn with_mode(mut self, mode: DiskManagerMode) -> Self {
60 self.set_mode(mode);
61 self
62 }
63
64 pub fn set_max_temp_directory_size(&mut self, value: u64) {
65 self.max_temp_directory_size = value;
66 }
67
68 pub fn with_max_temp_directory_size(mut self, value: u64) -> Self {
69 self.set_max_temp_directory_size(value);
70 self
71 }
72
73 pub fn build(self) -> Result<DiskManager> {
75 match self.mode {
76 DiskManagerMode::OsTmpDirectory => Ok(DiskManager {
77 local_dirs: Mutex::new(Some(vec![])),
78 max_temp_directory_size: self.max_temp_directory_size,
79 used_disk_space: Arc::new(AtomicU64::new(0)),
80 }),
81 DiskManagerMode::Directories(conf_dirs) => {
82 let local_dirs = create_local_dirs(&conf_dirs)?;
83 debug!(
84 "Created local dirs {local_dirs:?} as DataFusion working directory"
85 );
86 Ok(DiskManager {
87 local_dirs: Mutex::new(Some(local_dirs)),
88 max_temp_directory_size: self.max_temp_directory_size,
89 used_disk_space: Arc::new(AtomicU64::new(0)),
90 })
91 }
92 DiskManagerMode::Disabled => Ok(DiskManager {
93 local_dirs: Mutex::new(None),
94 max_temp_directory_size: self.max_temp_directory_size,
95 used_disk_space: Arc::new(AtomicU64::new(0)),
96 }),
97 }
98 }
99}
100
/// Where a [`DiskManager`] creates temporary files.
#[derive(Clone, Debug, Default)]
pub enum DiskManagerMode {
    /// Create a single temporary directory (via `tempfile::tempdir()`) lazily,
    /// the first time a temporary file is requested. This is the default.
    #[default]
    OsTmpDirectory,

    /// Create a `datafusion-*` temporary directory inside each of the given
    /// paths when the manager is built. Each new temporary file is placed in
    /// one of these directories, chosen at random.
    Directories(Vec<PathBuf>),

    /// Disable temporary files: every request to create one returns an error.
    Disabled,
}
116
/// Legacy configuration consumed by the deprecated `DiskManager::try_new`.
#[deprecated(since = "48.0.0", note = "Use DiskManagerBuilder instead")]
#[derive(Debug, Clone, Default)]
#[allow(clippy::allow_attributes)]
// NOTE(review): presumably silences deprecation warnings raised by the derived
// impls referring to this (deprecated) type itself; confirm.
#[allow(deprecated)]
pub enum DiskManagerConfig {
    /// Reuse an already constructed [`DiskManager`] unchanged.
    Existing(Arc<DiskManager>),

    /// Create a new manager that spills to a temporary directory created on
    /// first use. This is the default.
    #[default]
    NewOs,

    /// Create a new manager that spills to temporary directories created
    /// inside the given paths.
    NewSpecified(Vec<PathBuf>),

    /// Create a new manager with temporary files disabled.
    Disabled,
}
138
// The impl necessarily names the deprecated `DiskManagerConfig` type.
#[expect(deprecated)]
impl DiskManagerConfig {
    /// Returns the default configuration (`NewOs`).
    pub fn new() -> Self {
        Self::default()
    }

    /// Wraps an existing [`DiskManager`] so that it is reused as-is.
    pub fn new_existing(existing: Arc<DiskManager>) -> Self {
        Self::Existing(existing)
    }

    /// Creates a configuration that places temporary files under `paths`.
    pub fn new_specified(paths: Vec<PathBuf>) -> Self {
        Self::NewSpecified(paths)
    }
}
156
/// Creates temporary files (e.g. for spilling) and tracks the total disk space
/// they use against a configurable limit.
#[derive(Debug)]
pub struct DiskManager {
    /// Directories in which temporary files are created.
    ///
    /// `None` means temporary files are disabled. An empty list means a single
    /// directory is created lazily on the first request. The `Mutex` allows
    /// that lazy push through a shared `&Arc<DiskManager>`.
    local_dirs: Mutex<Option<Vec<Arc<TempDir>>>>,
    /// Maximum total size, in bytes, of tracked temporary files.
    /// `RefCountedTempFile::update_disk_usage` errors once it is exceeded.
    max_temp_directory_size: u64,
    /// Sum, in bytes, of the sizes last reported by live temporary files via
    /// `RefCountedTempFile::update_disk_usage`.
    used_disk_space: Arc<AtomicU64>,
}
173
174impl DiskManager {
175 pub fn builder() -> DiskManagerBuilder {
177 DiskManagerBuilder::default()
178 }
179
180 #[expect(deprecated)]
182 #[deprecated(since = "48.0.0", note = "Use DiskManager::builder() instead")]
183 pub fn try_new(config: DiskManagerConfig) -> Result<Arc<Self>> {
184 match config {
185 DiskManagerConfig::Existing(manager) => Ok(manager),
186 DiskManagerConfig::NewOs => Ok(Arc::new(Self {
187 local_dirs: Mutex::new(Some(vec![])),
188 max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
189 used_disk_space: Arc::new(AtomicU64::new(0)),
190 })),
191 DiskManagerConfig::NewSpecified(conf_dirs) => {
192 let local_dirs = create_local_dirs(&conf_dirs)?;
193 debug!(
194 "Created local dirs {local_dirs:?} as DataFusion working directory"
195 );
196 Ok(Arc::new(Self {
197 local_dirs: Mutex::new(Some(local_dirs)),
198 max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
199 used_disk_space: Arc::new(AtomicU64::new(0)),
200 }))
201 }
202 DiskManagerConfig::Disabled => Ok(Arc::new(Self {
203 local_dirs: Mutex::new(None),
204 max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
205 used_disk_space: Arc::new(AtomicU64::new(0)),
206 })),
207 }
208 }
209
210 pub fn set_max_temp_directory_size(
211 &mut self,
212 max_temp_directory_size: u64,
213 ) -> Result<()> {
214 if self.local_dirs.lock().is_none() && max_temp_directory_size != 0 {
217 return config_err!(
218 "Cannot set max temp directory size for a disk manager that spilling is disabled"
219 );
220 }
221
222 self.max_temp_directory_size = max_temp_directory_size;
223 Ok(())
224 }
225
226 pub fn set_arc_max_temp_directory_size(
227 this: &mut Arc<Self>,
228 max_temp_directory_size: u64,
229 ) -> Result<()> {
230 if let Some(inner) = Arc::get_mut(this) {
231 inner.set_max_temp_directory_size(max_temp_directory_size)?;
232 Ok(())
233 } else {
234 config_err!("DiskManager should be a single instance")
235 }
236 }
237
238 pub fn with_max_temp_directory_size(
239 mut self,
240 max_temp_directory_size: u64,
241 ) -> Result<Self> {
242 self.set_max_temp_directory_size(max_temp_directory_size)?;
243 Ok(self)
244 }
245
246 pub fn used_disk_space(&self) -> u64 {
247 self.used_disk_space.load(Ordering::Relaxed)
248 }
249
250 pub fn max_temp_directory_size(&self) -> u64 {
252 self.max_temp_directory_size
253 }
254
255 pub fn temp_dir_paths(&self) -> Vec<PathBuf> {
257 self.local_dirs
258 .lock()
259 .as_ref()
260 .map(|dirs| {
261 dirs.iter()
262 .map(|temp_dir| temp_dir.path().to_path_buf())
263 .collect()
264 })
265 .unwrap_or_default()
266 }
267
268 pub fn tmp_files_enabled(&self) -> bool {
272 self.local_dirs.lock().is_some()
273 }
274
275 pub fn create_tmp_file(
280 self: &Arc<Self>,
281 request_description: &str,
282 ) -> Result<RefCountedTempFile> {
283 let mut guard = self.local_dirs.lock();
284 let local_dirs = guard.as_mut().ok_or_else(|| {
285 resources_datafusion_err!(
286 "Memory Exhausted while {request_description} (DiskManager is disabled)"
287 )
288 })?;
289
290 if local_dirs.is_empty() {
292 let tempdir = tempfile::tempdir().map_err(DataFusionError::IoError)?;
293
294 debug!(
295 "Created directory '{:?}' as DataFusion tempfile directory for {}",
296 tempdir.path().to_string_lossy(),
297 request_description,
298 );
299
300 local_dirs.push(Arc::new(tempdir));
301 }
302
303 let dir_index = rng().random_range(0..local_dirs.len());
304 Ok(RefCountedTempFile {
305 parent_temp_dir: Arc::clone(&local_dirs[dir_index]),
306 tempfile: Arc::new(
307 Builder::new()
308 .tempfile_in(local_dirs[dir_index].as_ref())
309 .map_err(DataFusionError::IoError)?,
310 ),
311 current_file_disk_usage: Arc::new(AtomicU64::new(0)),
312 disk_manager: Arc::clone(self),
313 })
314 }
315}
316
/// A handle to a temporary file created by a [`DiskManager`].
///
/// Clones are cheap and share the same underlying file and the same
/// disk-usage counter. The file is removed from disk once the last clone is
/// dropped.
#[derive(Debug)]
pub struct RefCountedTempFile {
    /// Holds a reference to the directory containing `tempfile`, keeping it
    /// alive while this handle exists.
    parent_temp_dir: Arc<TempDir>,
    /// The temporary file itself, shared by all clones.
    tempfile: Arc<NamedTempFile>,
    /// Size in bytes last recorded for this file by `update_disk_usage`,
    /// shared by all clones.
    current_file_disk_usage: Arc<AtomicU64>,
    /// Manager whose global `used_disk_space` counter this file contributes to.
    disk_manager: Arc<DiskManager>,
}
350
351impl Clone for RefCountedTempFile {
352 fn clone(&self) -> Self {
353 Self {
354 parent_temp_dir: Arc::clone(&self.parent_temp_dir),
355 tempfile: Arc::clone(&self.tempfile),
356 current_file_disk_usage: Arc::clone(&self.current_file_disk_usage),
357 disk_manager: Arc::clone(&self.disk_manager),
358 }
359 }
360}
361
362impl RefCountedTempFile {
363 pub fn path(&self) -> &Path {
364 self.tempfile.path()
365 }
366
367 pub fn inner(&self) -> &NamedTempFile {
368 self.tempfile.as_ref()
369 }
370
371 pub fn update_disk_usage(&mut self) -> Result<()> {
376 let metadata = self.tempfile.as_file().metadata()?;
378 let new_disk_usage = metadata.len();
379
380 let old_disk_usage = self.current_file_disk_usage.load(Ordering::Relaxed);
382
383 self.disk_manager
386 .used_disk_space
387 .fetch_sub(old_disk_usage, Ordering::Relaxed);
388 self.disk_manager
390 .used_disk_space
391 .fetch_add(new_disk_usage, Ordering::Relaxed);
392
393 let global_disk_usage = self.disk_manager.used_disk_space.load(Ordering::Relaxed);
395 if global_disk_usage > self.disk_manager.max_temp_directory_size {
396 return resources_err!(
397 "The used disk space during the spilling process has exceeded the allowable limit of {}. Try increasing the `max_temp_directory_size` in the disk manager configuration.",
398 human_readable_size(self.disk_manager.max_temp_directory_size as usize)
399 );
400 }
401
402 self.current_file_disk_usage
404 .store(new_disk_usage, Ordering::Relaxed);
405
406 Ok(())
407 }
408
409 pub fn current_disk_usage(&self) -> u64 {
410 self.current_file_disk_usage.load(Ordering::Relaxed)
411 }
412}
413
414impl Drop for RefCountedTempFile {
416 fn drop(&mut self) {
417 if Arc::strong_count(&self.tempfile) == 1 {
421 let current_usage = self.current_file_disk_usage.load(Ordering::Relaxed);
422 self.disk_manager
423 .used_disk_space
424 .fetch_sub(current_usage, Ordering::Relaxed);
425 }
426 }
427}
428
429fn create_local_dirs(local_dirs: &[PathBuf]) -> Result<Vec<Arc<TempDir>>> {
431 local_dirs
432 .iter()
433 .map(|root| {
434 if !Path::new(root).exists() {
435 std::fs::create_dir(root)?;
436 }
437 Builder::new()
438 .prefix("datafusion-")
439 .tempdir_in(root)
440 .map_err(DataFusionError::IoError)
441 })
442 .map(|result| result.map(Arc::new))
443 .collect()
444}
445
#[cfg(test)]
mod tests {
    use super::*;

    /// The default (OS temp dir) mode creates no directory until the first
    /// temp file is requested, then exactly one.
    #[test]
    fn lazy_temp_dir_creation() -> Result<()> {
        let dm = Arc::new(DiskManagerBuilder::default().build()?);

        assert_eq!(0, local_dir_snapshot(&dm).len());

        let actual = dm.create_tmp_file("Testing")?;

        // The first request lazily created a single directory.
        assert_eq!(1, local_dir_snapshot(&dm).len());

        let local_dirs = local_dir_snapshot(&dm);
        assert_path_in_dirs(actual.path(), local_dirs.iter().map(|p| p.as_path()));

        Ok(())
    }

    /// Returns the paths of the directories currently held by `dm`; empty
    /// when temp files are disabled.
    fn local_dir_snapshot(dm: &DiskManager) -> Vec<PathBuf> {
        dm.local_dirs
            .lock()
            .iter()
            .flatten()
            .map(|p| p.path().into())
            .collect()
    }

    /// A file created in `Directories` mode lives under one of the given roots.
    #[test]
    fn file_in_right_dir() -> Result<()> {
        let local_dir1 = TempDir::new()?;
        let local_dir2 = TempDir::new()?;
        let local_dir3 = TempDir::new()?;
        let local_dirs = vec![local_dir1.path(), local_dir2.path(), local_dir3.path()];
        let dm = Arc::new(
            DiskManagerBuilder::default()
                .with_mode(DiskManagerMode::Directories(
                    local_dirs.iter().map(|p| p.into()).collect(),
                ))
                .build()?,
        );

        assert!(dm.tmp_files_enabled());
        let actual = dm.create_tmp_file("Testing")?;

        assert_path_in_dirs(actual.path(), local_dirs.into_iter());

        Ok(())
    }

    /// A disabled manager reports itself disabled and refuses to create files.
    #[test]
    fn test_disabled_disk_manager() {
        let manager = Arc::new(
            DiskManagerBuilder::default()
                .with_mode(DiskManagerMode::Disabled)
                .build()
                .unwrap(),
        );
        assert!(!manager.tmp_files_enabled());
        assert_eq!(
            manager
                .create_tmp_file("Testing")
                .unwrap_err()
                .strip_backtrace(),
            "Resources exhausted: Memory Exhausted while Testing (DiskManager is disabled)",
        )
    }

    /// Building in `Directories` mode with an existing directory succeeds.
    #[test]
    fn test_disk_manager_create_spill_folder() {
        let dir = TempDir::new().unwrap();
        DiskManagerBuilder::default()
            .with_mode(DiskManagerMode::Directories(vec![dir.path().to_path_buf()]))
            .build()
            .unwrap();
    }

    /// Asserts that `file_path` has one of `dirs` among its ancestors.
    fn assert_path_in_dirs<'a>(
        file_path: &'a Path,
        dirs: impl Iterator<Item = &'a Path>,
    ) {
        let dirs: Vec<&Path> = dirs.collect();

        let found = dirs.iter().any(|dir_path| {
            file_path
                .ancestors()
                .any(|candidate_path| *dir_path == candidate_path)
        });

        assert!(found, "Can't find {file_path:?} in dirs: {dirs:?}");
    }

    /// A temp file outlives the caller's `DiskManager` handle and is removed
    /// only when the file handle itself is dropped. Checked for both modes.
    #[test]
    fn test_temp_file_still_alive_after_disk_manager_dropped() -> Result<()> {
        let dm = Arc::new(DiskManagerBuilder::default().build()?);
        let temp_file = dm.create_tmp_file("Testing")?;
        let temp_file_path = temp_file.path().to_owned();
        assert!(temp_file_path.exists());

        drop(dm);
        assert!(temp_file_path.exists());

        drop(temp_file);
        assert!(!temp_file_path.exists());

        // Same check in `Directories` mode.
        let local_dir1 = TempDir::new()?;
        let local_dir2 = TempDir::new()?;
        let local_dir3 = TempDir::new()?;
        let local_dirs = [local_dir1.path(), local_dir2.path(), local_dir3.path()];
        let dm = Arc::new(
            DiskManagerBuilder::default()
                .with_mode(DiskManagerMode::Directories(
                    local_dirs.iter().map(|p| p.into()).collect(),
                ))
                .build()?,
        );
        let temp_file = dm.create_tmp_file("Testing")?;
        let temp_file_path = temp_file.path().to_owned();
        assert!(temp_file_path.exists());

        drop(dm);
        assert!(temp_file_path.exists());

        drop(temp_file);
        assert!(!temp_file_path.exists());

        Ok(())
    }

    /// Global usage follows a single file through writes and returns to zero
    /// on drop.
    #[test]
    fn test_disk_usage_basic() -> Result<()> {
        use std::io::Write;

        let dm = Arc::new(DiskManagerBuilder::default().build()?);
        let mut temp_file = dm.create_tmp_file("Testing")?;

        // Nothing is charged until `update_disk_usage` is called.
        assert_eq!(dm.used_disk_space(), 0);
        assert_eq!(temp_file.current_disk_usage(), 0);

        temp_file.inner().as_file().write_all(b"hello world")?;
        temp_file.update_disk_usage()?;

        let expected_usage = temp_file.current_disk_usage();
        assert!(expected_usage > 0);
        assert_eq!(dm.used_disk_space(), expected_usage);

        temp_file.inner().as_file().write_all(b" more data")?;
        temp_file.update_disk_usage()?;

        // The global counter reflects the new size, not old + new.
        let new_usage = temp_file.current_disk_usage();
        assert!(new_usage > expected_usage);
        assert_eq!(dm.used_disk_space(), new_usage);

        drop(temp_file);

        assert_eq!(dm.used_disk_space(), 0);

        Ok(())
    }

    /// Clones share one usage counter. The global usage is released only when
    /// the last clone is dropped.
    #[test]
    fn test_disk_usage_with_clones() -> Result<()> {
        use std::io::Write;

        let dm = Arc::new(DiskManagerBuilder::default().build()?);
        let mut temp_file = dm.create_tmp_file("Testing")?;

        temp_file.inner().as_file().write_all(b"test data")?;
        temp_file.update_disk_usage()?;

        let usage_after_write = temp_file.current_disk_usage();
        assert!(usage_after_write > 0);
        assert_eq!(dm.used_disk_space(), usage_after_write);

        let clone1 = temp_file.clone();
        let clone2 = temp_file.clone();

        assert_eq!(clone1.current_disk_usage(), usage_after_write);
        assert_eq!(clone2.current_disk_usage(), usage_after_write);

        // Cloning must not double-count.
        assert_eq!(dm.used_disk_space(), usage_after_write);

        clone1.inner().as_file().write_all(b" more data")?;
        let mut mutable_clone1 = clone1;
        mutable_clone1.update_disk_usage()?;

        let new_usage = mutable_clone1.current_disk_usage();
        assert!(new_usage > usage_after_write);

        // An update through one clone is visible through all of them.
        assert_eq!(temp_file.current_disk_usage(), new_usage);
        assert_eq!(clone2.current_disk_usage(), new_usage);
        assert_eq!(mutable_clone1.current_disk_usage(), new_usage);

        assert_eq!(dm.used_disk_space(), new_usage);

        drop(mutable_clone1);

        // Other clones are still alive, so nothing is released yet.
        assert_eq!(dm.used_disk_space(), new_usage);
        assert_eq!(temp_file.current_disk_usage(), new_usage);
        assert_eq!(clone2.current_disk_usage(), new_usage);

        drop(clone2);

        assert_eq!(dm.used_disk_space(), new_usage);
        assert_eq!(temp_file.current_disk_usage(), new_usage);

        drop(temp_file);

        // Last handle gone: usage released.
        assert_eq!(dm.used_disk_space(), 0);

        Ok(())
    }

    /// The release happens on the last drop regardless of which handle
    /// (original or clone) it is.
    #[test]
    fn test_disk_usage_clones_dropped_out_of_order() -> Result<()> {
        use std::io::Write;

        let dm = Arc::new(DiskManagerBuilder::default().build()?);
        let mut temp_file = dm.create_tmp_file("Testing")?;

        temp_file.inner().as_file().write_all(b"test")?;
        temp_file.update_disk_usage()?;

        let usage = temp_file.current_disk_usage();
        assert_eq!(dm.used_disk_space(), usage);

        let clone1 = temp_file.clone();
        let clone2 = temp_file.clone();
        let clone3 = temp_file.clone();

        // Dropping the original first must not release anything.
        drop(temp_file);

        assert_eq!(dm.used_disk_space(), usage);
        assert_eq!(clone1.current_disk_usage(), usage);

        drop(clone2);
        assert_eq!(dm.used_disk_space(), usage);

        drop(clone1);
        assert_eq!(dm.used_disk_space(), usage);

        drop(clone3);

        assert_eq!(dm.used_disk_space(), 0);

        Ok(())
    }

    /// Usage from independent files adds up and is released per file.
    #[test]
    fn test_disk_usage_multiple_files() -> Result<()> {
        use std::io::Write;

        let dm = Arc::new(DiskManagerBuilder::default().build()?);

        let mut file1 = dm.create_tmp_file("Testing1")?;
        let mut file2 = dm.create_tmp_file("Testing2")?;

        file1.inner().as_file().write_all(b"file1")?;
        file1.update_disk_usage()?;
        let usage1 = file1.current_disk_usage();

        assert_eq!(dm.used_disk_space(), usage1);

        file2.inner().as_file().write_all(b"file2 data")?;
        file2.update_disk_usage()?;
        let usage2 = file2.current_disk_usage();

        assert_eq!(dm.used_disk_space(), usage1 + usage2);

        drop(file1);

        // Only file1's share is released.
        assert_eq!(dm.used_disk_space(), usage2);

        drop(file2);

        assert_eq!(dm.used_disk_space(), 0);

        Ok(())
    }
}