1use datafusion_common::{
21 DataFusionError, Result, config_err, resources_datafusion_err, resources_err,
22};
23use log::debug;
24use parking_lot::Mutex;
25use rand::{Rng, rng};
26use std::path::{Path, PathBuf};
27use std::sync::Arc;
28use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
29use tempfile::{Builder, NamedTempFile, TempDir};
30
31use datafusion_common::human_readable_size;
32
/// Default limit, in bytes, for the temporary-file space tracked by a
/// [`DiskManager`]: `100 * 1024 * 1024 * 1024` bytes (100 GiB).
pub const DEFAULT_MAX_TEMP_DIRECTORY_SIZE: u64 = 100 * 1024 * 1024 * 1024;

/// Builder used to configure and construct a [`DiskManager`].
///
/// The `Default` implementation selects [`DiskManagerMode::OsTmpDirectory`]
/// and a limit of [`DEFAULT_MAX_TEMP_DIRECTORY_SIZE`].
#[derive(Clone, Debug)]
pub struct DiskManagerBuilder {
    /// Where temporary files are created, or whether they are disabled.
    mode: DiskManagerMode,
    /// Limit, in bytes, handed to the built manager and compared against its
    /// tracked disk usage.
    max_temp_directory_size: u64,
}
44
impl Default for DiskManagerBuilder {
    /// Returns a builder in [`DiskManagerMode::OsTmpDirectory`] mode with a
    /// limit of [`DEFAULT_MAX_TEMP_DIRECTORY_SIZE`] bytes.
    fn default() -> Self {
        Self {
            mode: DiskManagerMode::OsTmpDirectory,
            max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
        }
    }
}
53
54impl DiskManagerBuilder {
55 pub fn set_mode(&mut self, mode: DiskManagerMode) {
56 self.mode = mode;
57 }
58
59 pub fn with_mode(mut self, mode: DiskManagerMode) -> Self {
60 self.set_mode(mode);
61 self
62 }
63
64 pub fn set_max_temp_directory_size(&mut self, value: u64) {
65 self.max_temp_directory_size = value;
66 }
67
68 pub fn with_max_temp_directory_size(mut self, value: u64) -> Self {
69 self.set_max_temp_directory_size(value);
70 self
71 }
72
73 pub fn build(self) -> Result<DiskManager> {
75 match self.mode {
76 DiskManagerMode::OsTmpDirectory => Ok(DiskManager {
77 local_dirs: Mutex::new(Some(vec![])),
78 max_temp_directory_size: self.max_temp_directory_size,
79 used_disk_space: Arc::new(AtomicU64::new(0)),
80 active_files_count: Arc::new(AtomicUsize::new(0)),
81 }),
82 DiskManagerMode::Directories(conf_dirs) => {
83 let local_dirs = create_local_dirs(&conf_dirs)?;
84 debug!(
85 "Created local dirs {local_dirs:?} as DataFusion working directory"
86 );
87 Ok(DiskManager {
88 local_dirs: Mutex::new(Some(local_dirs)),
89 max_temp_directory_size: self.max_temp_directory_size,
90 used_disk_space: Arc::new(AtomicU64::new(0)),
91 active_files_count: Arc::new(AtomicUsize::new(0)),
92 })
93 }
94 DiskManagerMode::Disabled => Ok(DiskManager {
95 local_dirs: Mutex::new(None),
96 max_temp_directory_size: self.max_temp_directory_size,
97 used_disk_space: Arc::new(AtomicU64::new(0)),
98 active_files_count: Arc::new(AtomicUsize::new(0)),
99 }),
100 }
101 }
102}
103
/// Selects where a [`DiskManager`] places its temporary files.
#[derive(Clone, Debug, Default)]
pub enum DiskManagerMode {
    /// Start with no directory; one is created with `tempfile::tempdir()` the
    /// first time a temporary file is requested. This is the default.
    #[default]
    OsTmpDirectory,

    /// Create a `datafusion-` prefixed temporary directory inside each of the
    /// given paths when the manager is built.
    Directories(Vec<PathBuf>),

    /// Temporary files are disabled; requests to create one return an error.
    Disabled,
}
119
/// Legacy configuration accepted by `DiskManager::try_new`.
///
/// Deprecated in favor of `DiskManagerBuilder`.
#[deprecated(since = "48.0.0", note = "Use DiskManagerBuilder instead")]
#[derive(Debug, Clone, Default)]
#[allow(clippy::allow_attributes)]
#[allow(deprecated)]
pub enum DiskManagerConfig {
    /// Reuse an already constructed manager.
    Existing(Arc<DiskManager>),

    /// Build a new manager that starts with no directory and creates one on
    /// first use. This is the default.
    #[default]
    NewOs,

    /// Build a new manager with a temporary directory inside each given path.
    NewSpecified(Vec<PathBuf>),

    /// Build a new manager with temporary files disabled.
    Disabled,
}
141
#[expect(deprecated)]
impl DiskManagerConfig {
    /// Returns the default configuration (`NewOs`).
    pub fn new() -> Self {
        Self::default()
    }

    /// Wraps an existing manager in the `Existing` variant.
    pub fn new_existing(existing: Arc<DiskManager>) -> Self {
        Self::Existing(existing)
    }

    /// Wraps the given paths in the `NewSpecified` variant.
    pub fn new_specified(paths: Vec<PathBuf>) -> Self {
        Self::NewSpecified(paths)
    }
}
159
/// Creates temporary files and tracks how much disk space they use.
#[derive(Debug)]
pub struct DiskManager {
    /// Directories in which temporary files are created.
    ///
    /// * `None`: temporary files are disabled.
    /// * `Some(vec![])`: no directory yet; one is created by the first call
    ///   to `create_tmp_file`.
    local_dirs: Mutex<Option<Vec<Arc<TempDir>>>>,
    /// Limit, in bytes, that `used_disk_space` is compared against when a
    /// file's usage is refreshed.
    max_temp_directory_size: u64,
    /// Sum of the sizes last recorded for the live temporary files.
    used_disk_space: Arc<AtomicU64>,
    /// Number of temporary files created by this manager that are still alive.
    active_files_count: Arc<AtomicUsize>,
}
178
/// Snapshot of a [`DiskManager`]'s usage counters, as returned by
/// `DiskManager::spilling_progress`.
#[derive(Debug, Clone, Copy)]
pub struct SpillingProgress {
    /// Value of the manager's used-disk-space counter, in bytes.
    pub current_bytes: u64,
    /// Value of the manager's active-files counter.
    pub active_files_count: usize,
}
187
188impl DiskManager {
189 pub fn builder() -> DiskManagerBuilder {
191 DiskManagerBuilder::default()
192 }
193
194 #[expect(deprecated)]
196 #[deprecated(since = "48.0.0", note = "Use DiskManager::builder() instead")]
197 pub fn try_new(config: DiskManagerConfig) -> Result<Arc<Self>> {
198 match config {
199 DiskManagerConfig::Existing(manager) => Ok(manager),
200 DiskManagerConfig::NewOs => Ok(Arc::new(Self {
201 local_dirs: Mutex::new(Some(vec![])),
202 max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
203 used_disk_space: Arc::new(AtomicU64::new(0)),
204 active_files_count: Arc::new(AtomicUsize::new(0)),
205 })),
206 DiskManagerConfig::NewSpecified(conf_dirs) => {
207 let local_dirs = create_local_dirs(&conf_dirs)?;
208 debug!(
209 "Created local dirs {local_dirs:?} as DataFusion working directory"
210 );
211 Ok(Arc::new(Self {
212 local_dirs: Mutex::new(Some(local_dirs)),
213 max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
214 used_disk_space: Arc::new(AtomicU64::new(0)),
215 active_files_count: Arc::new(AtomicUsize::new(0)),
216 }))
217 }
218 DiskManagerConfig::Disabled => Ok(Arc::new(Self {
219 local_dirs: Mutex::new(None),
220 max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
221 used_disk_space: Arc::new(AtomicU64::new(0)),
222 active_files_count: Arc::new(AtomicUsize::new(0)),
223 })),
224 }
225 }
226
227 pub fn set_max_temp_directory_size(
228 &mut self,
229 max_temp_directory_size: u64,
230 ) -> Result<()> {
231 if self.local_dirs.lock().is_none() && max_temp_directory_size != 0 {
234 return config_err!(
235 "Cannot set max temp directory size for a disk manager that spilling is disabled"
236 );
237 }
238
239 self.max_temp_directory_size = max_temp_directory_size;
240 Ok(())
241 }
242
243 pub fn set_arc_max_temp_directory_size(
244 this: &mut Arc<Self>,
245 max_temp_directory_size: u64,
246 ) -> Result<()> {
247 if let Some(inner) = Arc::get_mut(this) {
248 inner.set_max_temp_directory_size(max_temp_directory_size)?;
249 Ok(())
250 } else {
251 config_err!("DiskManager should be a single instance")
252 }
253 }
254
255 pub fn with_max_temp_directory_size(
256 mut self,
257 max_temp_directory_size: u64,
258 ) -> Result<Self> {
259 self.set_max_temp_directory_size(max_temp_directory_size)?;
260 Ok(self)
261 }
262
263 pub fn used_disk_space(&self) -> u64 {
264 self.used_disk_space.load(Ordering::Relaxed)
265 }
266
267 pub fn max_temp_directory_size(&self) -> u64 {
269 self.max_temp_directory_size
270 }
271
272 pub fn spilling_progress(&self) -> SpillingProgress {
274 SpillingProgress {
275 current_bytes: self.used_disk_space.load(Ordering::Relaxed),
276 active_files_count: self.active_files_count.load(Ordering::Relaxed),
277 }
278 }
279
280 pub fn temp_dir_paths(&self) -> Vec<PathBuf> {
282 self.local_dirs
283 .lock()
284 .as_ref()
285 .map(|dirs| {
286 dirs.iter()
287 .map(|temp_dir| temp_dir.path().to_path_buf())
288 .collect()
289 })
290 .unwrap_or_default()
291 }
292
293 pub fn tmp_files_enabled(&self) -> bool {
297 self.local_dirs.lock().is_some()
298 }
299
300 pub fn create_tmp_file(
305 self: &Arc<Self>,
306 request_description: &str,
307 ) -> Result<RefCountedTempFile> {
308 let mut guard = self.local_dirs.lock();
309 let local_dirs = guard.as_mut().ok_or_else(|| {
310 resources_datafusion_err!(
311 "Memory Exhausted while {request_description} (DiskManager is disabled)"
312 )
313 })?;
314
315 if local_dirs.is_empty() {
317 let tempdir = tempfile::tempdir().map_err(DataFusionError::IoError)?;
318
319 debug!(
320 "Created directory '{:?}' as DataFusion tempfile directory for {}",
321 tempdir.path().to_string_lossy(),
322 request_description,
323 );
324
325 local_dirs.push(Arc::new(tempdir));
326 }
327
328 let dir_index = rng().random_range(0..local_dirs.len());
329 self.active_files_count.fetch_add(1, Ordering::Relaxed);
330 Ok(RefCountedTempFile {
331 parent_temp_dir: Arc::clone(&local_dirs[dir_index]),
332 tempfile: Arc::new(
333 Builder::new()
334 .tempfile_in(local_dirs[dir_index].as_ref())
335 .map_err(DataFusionError::IoError)?,
336 ),
337 current_file_disk_usage: Arc::new(AtomicU64::new(0)),
338 disk_manager: Arc::clone(self),
339 })
340 }
341}
342
/// Cloneable handle to a temporary file created by a [`DiskManager`].
///
/// All clones share the same file and the same usage counter. When the last
/// clone is dropped, the file's recorded size is subtracted from the manager's
/// used-disk-space counter and the active-files counter is decremented.
#[derive(Debug)]
pub struct RefCountedTempFile {
    /// Shared handle to the directory the file was created in.
    // NOTE(review): presumably held so the directory outlives the manager;
    // the tests show the file surviving `drop(dm)` — confirm.
    parent_temp_dir: Arc<TempDir>,
    /// The temporary file, shared between clones.
    tempfile: Arc<NamedTempFile>,
    /// Size, in bytes, last recorded for this file; shared between clones.
    current_file_disk_usage: Arc<AtomicU64>,
    /// Manager whose counters this file updates.
    disk_manager: Arc<DiskManager>,
}
376
377impl Clone for RefCountedTempFile {
378 fn clone(&self) -> Self {
379 Self {
380 parent_temp_dir: Arc::clone(&self.parent_temp_dir),
381 tempfile: Arc::clone(&self.tempfile),
382 current_file_disk_usage: Arc::clone(&self.current_file_disk_usage),
383 disk_manager: Arc::clone(&self.disk_manager),
384 }
385 }
386}
387
388impl RefCountedTempFile {
389 pub fn path(&self) -> &Path {
390 self.tempfile.path()
391 }
392
393 pub fn inner(&self) -> &NamedTempFile {
394 self.tempfile.as_ref()
395 }
396
397 pub fn update_disk_usage(&mut self) -> Result<()> {
402 let metadata = self.tempfile.as_file().metadata()?;
404 let new_disk_usage = metadata.len();
405
406 let old_disk_usage = self.current_file_disk_usage.load(Ordering::Relaxed);
408
409 self.disk_manager
412 .used_disk_space
413 .fetch_sub(old_disk_usage, Ordering::Relaxed);
414 self.disk_manager
416 .used_disk_space
417 .fetch_add(new_disk_usage, Ordering::Relaxed);
418
419 let global_disk_usage = self.disk_manager.used_disk_space.load(Ordering::Relaxed);
421 if global_disk_usage > self.disk_manager.max_temp_directory_size {
422 return resources_err!(
423 "The used disk space during the spilling process has exceeded the allowable limit of {}. \
424 Please try increasing the config: `datafusion.runtime.max_temp_directory_size`.",
425 human_readable_size(self.disk_manager.max_temp_directory_size as usize)
426 );
427 }
428
429 self.current_file_disk_usage
431 .store(new_disk_usage, Ordering::Relaxed);
432
433 Ok(())
434 }
435
436 pub fn current_disk_usage(&self) -> u64 {
437 self.current_file_disk_usage.load(Ordering::Relaxed)
438 }
439}
440
441impl Drop for RefCountedTempFile {
443 fn drop(&mut self) {
444 if Arc::strong_count(&self.tempfile) == 1 {
448 let current_usage = self.current_file_disk_usage.load(Ordering::Relaxed);
449 self.disk_manager
450 .used_disk_space
451 .fetch_sub(current_usage, Ordering::Relaxed);
452 self.disk_manager
453 .active_files_count
454 .fetch_sub(1, Ordering::Relaxed);
455 }
456 }
457}
458
459fn create_local_dirs(local_dirs: &[PathBuf]) -> Result<Vec<Arc<TempDir>>> {
461 local_dirs
462 .iter()
463 .map(|root| {
464 if !Path::new(root).exists() {
465 std::fs::create_dir(root)?;
466 }
467 Builder::new()
468 .prefix("datafusion-")
469 .tempdir_in(root)
470 .map_err(DataFusionError::IoError)
471 })
472 .map(|result| result.map(Arc::new))
473 .collect()
474}
475
#[cfg(test)]
mod tests {
    use super::*;

    /// In the default mode no directory exists until the first file is
    /// requested.
    #[test]
    fn lazy_temp_dir_creation() -> Result<()> {
        let dm = Arc::new(DiskManagerBuilder::default().build()?);

        // Nothing has been created yet.
        assert_eq!(0, local_dir_snapshot(&dm).len());

        let actual = dm.create_tmp_file("Testing")?;

        // The first request creates exactly one directory.
        assert_eq!(1, local_dir_snapshot(&dm).len());

        // The file lives inside that directory.
        let local_dirs = local_dir_snapshot(&dm);
        assert_path_in_dirs(actual.path(), local_dirs.iter().map(|p| p.as_path()));

        Ok(())
    }

    /// Copies the paths of the manager's current directories (empty when
    /// temporary files are disabled).
    fn local_dir_snapshot(dm: &DiskManager) -> Vec<PathBuf> {
        dm.local_dirs
            .lock()
            .iter()
            .flatten()
            .map(|p| p.path().into())
            .collect()
    }

    /// A file created in `Directories` mode lands under one of the configured
    /// roots.
    #[test]
    fn file_in_right_dir() -> Result<()> {
        let local_dir1 = TempDir::new()?;
        let local_dir2 = TempDir::new()?;
        let local_dir3 = TempDir::new()?;
        let local_dirs = vec![local_dir1.path(), local_dir2.path(), local_dir3.path()];
        let dm = Arc::new(
            DiskManagerBuilder::default()
                .with_mode(DiskManagerMode::Directories(
                    local_dirs.iter().map(|p| p.into()).collect(),
                ))
                .build()?,
        );

        assert!(dm.tmp_files_enabled());
        let actual = dm.create_tmp_file("Testing")?;

        // The file should be located in one of the specified directories.
        assert_path_in_dirs(actual.path(), local_dirs.into_iter());

        Ok(())
    }

    /// A disabled manager reports temp files as unavailable and refuses to
    /// create one.
    #[test]
    fn test_disabled_disk_manager() {
        let manager = Arc::new(
            DiskManagerBuilder::default()
                .with_mode(DiskManagerMode::Disabled)
                .build()
                .unwrap(),
        );
        assert!(!manager.tmp_files_enabled());
        assert_eq!(
            manager
                .create_tmp_file("Testing")
                .unwrap_err()
                .strip_backtrace(),
            "Resources exhausted: Memory Exhausted while Testing (DiskManager is disabled)",
        )
    }

    /// Building in `Directories` mode creates a configured root that does not
    /// exist yet.
    #[test]
    fn test_disk_manager_create_spill_folder() {
        let dir = TempDir::new().unwrap();
        // Point at a child that does not exist yet; passing `dir.path()`
        // itself would never exercise the directory-creation path.
        let spill_root = dir.path().join("spill");
        assert!(!spill_root.exists());

        DiskManagerBuilder::default()
            .with_mode(DiskManagerMode::Directories(vec![spill_root.clone()]))
            .build()
            .unwrap();

        assert!(spill_root.is_dir());
    }

    /// Asserts that `file_path` has at least one of `dirs` as an ancestor.
    fn assert_path_in_dirs<'a>(
        file_path: &'a Path,
        dirs: impl Iterator<Item = &'a Path>,
    ) {
        let dirs: Vec<&Path> = dirs.collect();

        let found = dirs.iter().any(|dir_path| {
            file_path
                .ancestors()
                .any(|candidate_path| *dir_path == candidate_path)
        });

        assert!(found, "Can't find {file_path:?} in dirs: {dirs:?}");
    }

    /// A temp file outlives the manager's own handle and is removed only when
    /// the file handle itself is dropped.
    #[test]
    fn test_temp_file_still_alive_after_disk_manager_dropped() -> Result<()> {
        // Case 1: default mode (directory created on first use).
        let dm = Arc::new(DiskManagerBuilder::default().build()?);
        let temp_file = dm.create_tmp_file("Testing")?;
        let temp_file_path = temp_file.path().to_owned();
        assert!(temp_file_path.exists());

        drop(dm);
        assert!(temp_file_path.exists());

        drop(temp_file);
        assert!(!temp_file_path.exists());

        // Case 2: explicitly configured directories.
        let local_dir1 = TempDir::new()?;
        let local_dir2 = TempDir::new()?;
        let local_dir3 = TempDir::new()?;
        let local_dirs = [local_dir1.path(), local_dir2.path(), local_dir3.path()];
        let dm = Arc::new(
            DiskManagerBuilder::default()
                .with_mode(DiskManagerMode::Directories(
                    local_dirs.iter().map(|p| p.into()).collect(),
                ))
                .build()?,
        );
        let temp_file = dm.create_tmp_file("Testing")?;
        let temp_file_path = temp_file.path().to_owned();
        assert!(temp_file_path.exists());

        drop(dm);
        assert!(temp_file_path.exists());

        drop(temp_file);
        assert!(!temp_file_path.exists());

        Ok(())
    }

    /// Usage follows writes to a single file and returns to zero on drop.
    #[test]
    fn test_disk_usage_basic() -> Result<()> {
        use std::io::Write;

        let dm = Arc::new(DiskManagerBuilder::default().build()?);
        let mut temp_file = dm.create_tmp_file("Testing")?;

        // Initially nothing is recorded.
        assert_eq!(dm.used_disk_space(), 0);
        assert_eq!(temp_file.current_disk_usage(), 0);

        // Write some data and refresh the recorded usage.
        temp_file.inner().as_file().write_all(b"hello world")?;
        temp_file.update_disk_usage()?;

        // The manager-wide counter matches the single file.
        let expected_usage = temp_file.current_disk_usage();
        assert!(expected_usage > 0);
        assert_eq!(dm.used_disk_space(), expected_usage);

        // Write more data and refresh again.
        temp_file.inner().as_file().write_all(b" more data")?;
        temp_file.update_disk_usage()?;

        let new_usage = temp_file.current_disk_usage();
        assert!(new_usage > expected_usage);
        assert_eq!(dm.used_disk_space(), new_usage);

        // Dropping the only handle releases its usage.
        drop(temp_file);

        assert_eq!(dm.used_disk_space(), 0);

        Ok(())
    }

    /// Clones share one usage value, and only the last drop releases it.
    #[test]
    fn test_disk_usage_with_clones() -> Result<()> {
        use std::io::Write;

        let dm = Arc::new(DiskManagerBuilder::default().build()?);
        let mut temp_file = dm.create_tmp_file("Testing")?;

        // Write some data and record it.
        temp_file.inner().as_file().write_all(b"test data")?;
        temp_file.update_disk_usage()?;

        let usage_after_write = temp_file.current_disk_usage();
        assert!(usage_after_write > 0);
        assert_eq!(dm.used_disk_space(), usage_after_write);

        // Create clones of the handle.
        let clone1 = temp_file.clone();
        let clone2 = temp_file.clone();

        // Every clone sees the same recorded usage.
        assert_eq!(clone1.current_disk_usage(), usage_after_write);
        assert_eq!(clone2.current_disk_usage(), usage_after_write);

        // Cloning must not count the file twice.
        assert_eq!(dm.used_disk_space(), usage_after_write);

        // Write through a clone and refresh through it.
        clone1.inner().as_file().write_all(b" more data")?;
        let mut mutable_clone1 = clone1;
        mutable_clone1.update_disk_usage()?;

        let new_usage = mutable_clone1.current_disk_usage();
        assert!(new_usage > usage_after_write);

        // The update is visible through every handle.
        assert_eq!(temp_file.current_disk_usage(), new_usage);
        assert_eq!(clone2.current_disk_usage(), new_usage);
        assert_eq!(mutable_clone1.current_disk_usage(), new_usage);

        assert_eq!(dm.used_disk_space(), new_usage);

        // Dropping one clone leaves the usage in place.
        drop(mutable_clone1);

        assert_eq!(dm.used_disk_space(), new_usage);
        assert_eq!(temp_file.current_disk_usage(), new_usage);
        assert_eq!(clone2.current_disk_usage(), new_usage);

        // Dropping another clone still leaves it in place.
        drop(clone2);

        assert_eq!(dm.used_disk_space(), new_usage);
        assert_eq!(temp_file.current_disk_usage(), new_usage);

        // Dropping the last handle releases the usage.
        drop(temp_file);

        assert_eq!(dm.used_disk_space(), 0);

        Ok(())
    }

    /// The usage is released by whichever handle is dropped last, not by the
    /// original one.
    #[test]
    fn test_disk_usage_clones_dropped_out_of_order() -> Result<()> {
        use std::io::Write;

        let dm = Arc::new(DiskManagerBuilder::default().build()?);
        let mut temp_file = dm.create_tmp_file("Testing")?;

        // Write data and record it.
        temp_file.inner().as_file().write_all(b"test")?;
        temp_file.update_disk_usage()?;

        let usage = temp_file.current_disk_usage();
        assert_eq!(dm.used_disk_space(), usage);

        // Create several clones.
        let clone1 = temp_file.clone();
        let clone2 = temp_file.clone();
        let clone3 = temp_file.clone();

        // Drop the original handle first.
        drop(temp_file);

        // Usage stays recorded while clones remain.
        assert_eq!(dm.used_disk_space(), usage);
        assert_eq!(clone1.current_disk_usage(), usage);

        // Drop the remaining clones in a different order.
        drop(clone2);
        assert_eq!(dm.used_disk_space(), usage);

        drop(clone1);
        assert_eq!(dm.used_disk_space(), usage);

        // The last handle releases the usage.
        drop(clone3);

        assert_eq!(dm.used_disk_space(), 0);

        Ok(())
    }

    /// The manager-wide counter is the sum over independent files.
    #[test]
    fn test_disk_usage_multiple_files() -> Result<()> {
        use std::io::Write;

        let dm = Arc::new(DiskManagerBuilder::default().build()?);

        // Create two independent files.
        let mut file1 = dm.create_tmp_file("Testing1")?;
        let mut file2 = dm.create_tmp_file("Testing2")?;

        // Write to the first file.
        file1.inner().as_file().write_all(b"file1")?;
        file1.update_disk_usage()?;
        let usage1 = file1.current_disk_usage();

        assert_eq!(dm.used_disk_space(), usage1);

        // Write to the second file.
        file2.inner().as_file().write_all(b"file2 data")?;
        file2.update_disk_usage()?;
        let usage2 = file2.current_disk_usage();

        // The counter holds the sum of both files.
        assert_eq!(dm.used_disk_space(), usage1 + usage2);

        // Dropping the first file leaves only the second file's usage.
        drop(file1);

        assert_eq!(dm.used_disk_space(), usage2);

        // Dropping the second file brings the counter back to zero.
        drop(file2);

        assert_eq!(dm.used_disk_space(), 0);

        Ok(())
    }
}