use datafusion_common::{
DataFusionError, Result, config_err, resources_datafusion_err, resources_err,
};
use log::debug;
use parking_lot::Mutex;
use rand::{Rng, rng};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use tempfile::{Builder, NamedTempFile, TempDir};
use datafusion_common::human_readable_size;
/// Default limit, in bytes, on the total size of temporary (spill) files
/// tracked by a `DiskManager`: 100 GiB.
pub const DEFAULT_MAX_TEMP_DIRECTORY_SIZE: u64 = 100 * (1 << 30);
/// Builder for a [`DiskManager`].
///
/// `DiskManagerBuilder::default()` starts in [`DiskManagerMode::OsTmpDirectory`]
/// with a limit of [`DEFAULT_MAX_TEMP_DIRECTORY_SIZE`].
#[derive(Clone, Debug)]
pub struct DiskManagerBuilder {
    /// Where (or whether) temporary files are created.
    mode: DiskManagerMode,
    /// Limit, in bytes, copied into the built [`DiskManager`].
    max_temp_directory_size: u64,
}
impl Default for DiskManagerBuilder {
    /// OS temp-directory mode with [`DEFAULT_MAX_TEMP_DIRECTORY_SIZE`] as the limit.
    fn default() -> Self {
        let max_temp_directory_size = DEFAULT_MAX_TEMP_DIRECTORY_SIZE;
        Self {
            max_temp_directory_size,
            mode: DiskManagerMode::OsTmpDirectory,
        }
    }
}
impl DiskManagerBuilder {
    /// Sets where temporary files are placed, or disables them.
    pub fn set_mode(&mut self, mode: DiskManagerMode) {
        self.mode = mode;
    }

    /// Builder-style variant of [`Self::set_mode`].
    pub fn with_mode(self, mode: DiskManagerMode) -> Self {
        Self { mode, ..self }
    }

    /// Sets the limit, in bytes, handed to the built [`DiskManager`].
    pub fn set_max_temp_directory_size(&mut self, value: u64) {
        self.max_temp_directory_size = value;
    }

    /// Builder-style variant of [`Self::set_max_temp_directory_size`].
    pub fn with_max_temp_directory_size(self, value: u64) -> Self {
        Self {
            max_temp_directory_size: value,
            ..self
        }
    }

    /// Builds the [`DiskManager`].
    ///
    /// In [`DiskManagerMode::Directories`] mode a temporary directory is
    /// created inside each configured path immediately. In OS mode the
    /// directory list starts empty; in disabled mode there is no list at all.
    ///
    /// # Errors
    ///
    /// Returns any I/O error raised while creating the configured directories.
    pub fn build(self) -> Result<DiskManager> {
        let Self {
            mode,
            max_temp_directory_size,
        } = self;

        let dirs = match mode {
            DiskManagerMode::OsTmpDirectory => Some(Vec::new()),
            DiskManagerMode::Directories(conf_dirs) => {
                let local_dirs = create_local_dirs(&conf_dirs)?;
                debug!(
                    "Created local dirs {local_dirs:?} as DataFusion working directory"
                );
                Some(local_dirs)
            }
            DiskManagerMode::Disabled => None,
        };

        Ok(DiskManager {
            local_dirs: Mutex::new(dirs),
            max_temp_directory_size,
            used_disk_space: Arc::new(AtomicU64::new(0)),
            active_files_count: Arc::new(AtomicUsize::new(0)),
        })
    }
}
/// How a [`DiskManager`] obtains directories for temporary (spill) files.
#[derive(Clone, Debug, Default)]
pub enum DiskManagerMode {
    /// Create a directory via `tempfile::tempdir()` lazily, on the first
    /// temp-file request. This is the default.
    #[default]
    OsTmpDirectory,
    /// Create a `datafusion-`-prefixed temporary directory inside each given
    /// path when the manager is built; missing paths are created first.
    Directories(Vec<PathBuf>),
    /// Temporary files are not allowed; requesting one returns an error.
    Disabled,
}
/// Legacy configuration for a [`DiskManager`]; superseded by
/// [`DiskManagerBuilder`].
#[deprecated(since = "48.0.0", note = "Use DiskManagerBuilder instead")]
#[derive(Debug, Clone, Default)]
// NOTE(review): presumably lets the derived impls reference this deprecated
// type without warnings — confirm.
#[allow(clippy::allow_attributes)]
#[allow(deprecated)]
pub enum DiskManagerConfig {
    /// Reuse an already constructed, shared [`DiskManager`].
    Existing(Arc<DiskManager>),
    /// Use a lazily created OS temporary directory. This is the default.
    #[default]
    NewOs,
    /// Create temporary directories inside the given paths.
    NewSpecified(Vec<PathBuf>),
    /// Disable temporary files.
    Disabled,
}
#[expect(deprecated)]
impl DiskManagerConfig {
pub fn new() -> Self {
Self::default()
}
pub fn new_existing(existing: Arc<DiskManager>) -> Self {
Self::Existing(existing)
}
pub fn new_specified(paths: Vec<PathBuf>) -> Self {
Self::NewSpecified(paths)
}
}
/// Creates temporary (spill) files and tracks how much disk space they use.
#[derive(Debug)]
pub struct DiskManager {
    /// Directories that hold temp files. `None` means temp files are disabled;
    /// an empty list means a directory is created on the first request.
    local_dirs: Mutex<Option<Vec<Arc<TempDir>>>>,
    /// Limit, in bytes, compared against `used_disk_space` whenever a temp
    /// file reports its size.
    max_temp_directory_size: u64,
    /// Sum of the sizes last reported by this manager's live temp files.
    used_disk_space: Arc<AtomicU64>,
    /// Number of temp files created by this manager that are still alive.
    active_files_count: Arc<AtomicUsize>,
}
/// Point-in-time snapshot of a [`DiskManager`]'s spill accounting.
#[derive(Debug, Clone, Copy)]
pub struct SpillingProgress {
    /// Bytes currently accounted to live temp files.
    pub current_bytes: u64,
    /// Number of temp files currently alive.
    pub active_files_count: usize,
}
impl DiskManager {
pub fn builder() -> DiskManagerBuilder {
DiskManagerBuilder::default()
}
#[expect(deprecated)]
#[deprecated(since = "48.0.0", note = "Use DiskManager::builder() instead")]
pub fn try_new(config: DiskManagerConfig) -> Result<Arc<Self>> {
match config {
DiskManagerConfig::Existing(manager) => Ok(manager),
DiskManagerConfig::NewOs => Ok(Arc::new(Self {
local_dirs: Mutex::new(Some(vec![])),
max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
used_disk_space: Arc::new(AtomicU64::new(0)),
active_files_count: Arc::new(AtomicUsize::new(0)),
})),
DiskManagerConfig::NewSpecified(conf_dirs) => {
let local_dirs = create_local_dirs(&conf_dirs)?;
debug!(
"Created local dirs {local_dirs:?} as DataFusion working directory"
);
Ok(Arc::new(Self {
local_dirs: Mutex::new(Some(local_dirs)),
max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
used_disk_space: Arc::new(AtomicU64::new(0)),
active_files_count: Arc::new(AtomicUsize::new(0)),
}))
}
DiskManagerConfig::Disabled => Ok(Arc::new(Self {
local_dirs: Mutex::new(None),
max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
used_disk_space: Arc::new(AtomicU64::new(0)),
active_files_count: Arc::new(AtomicUsize::new(0)),
})),
}
}
pub fn set_max_temp_directory_size(
&mut self,
max_temp_directory_size: u64,
) -> Result<()> {
if self.local_dirs.lock().is_none() && max_temp_directory_size != 0 {
return config_err!(
"Cannot set max temp directory size for a disk manager that spilling is disabled"
);
}
self.max_temp_directory_size = max_temp_directory_size;
Ok(())
}
pub fn set_arc_max_temp_directory_size(
this: &mut Arc<Self>,
max_temp_directory_size: u64,
) -> Result<()> {
if let Some(inner) = Arc::get_mut(this) {
inner.set_max_temp_directory_size(max_temp_directory_size)?;
Ok(())
} else {
config_err!("DiskManager should be a single instance")
}
}
pub fn with_max_temp_directory_size(
mut self,
max_temp_directory_size: u64,
) -> Result<Self> {
self.set_max_temp_directory_size(max_temp_directory_size)?;
Ok(self)
}
pub fn used_disk_space(&self) -> u64 {
self.used_disk_space.load(Ordering::Relaxed)
}
pub fn max_temp_directory_size(&self) -> u64 {
self.max_temp_directory_size
}
pub fn spilling_progress(&self) -> SpillingProgress {
SpillingProgress {
current_bytes: self.used_disk_space.load(Ordering::Relaxed),
active_files_count: self.active_files_count.load(Ordering::Relaxed),
}
}
pub fn temp_dir_paths(&self) -> Vec<PathBuf> {
self.local_dirs
.lock()
.as_ref()
.map(|dirs| {
dirs.iter()
.map(|temp_dir| temp_dir.path().to_path_buf())
.collect()
})
.unwrap_or_default()
}
pub fn tmp_files_enabled(&self) -> bool {
self.local_dirs.lock().is_some()
}
pub fn create_tmp_file(
self: &Arc<Self>,
request_description: &str,
) -> Result<RefCountedTempFile> {
let mut guard = self.local_dirs.lock();
let local_dirs = guard.as_mut().ok_or_else(|| {
resources_datafusion_err!(
"Memory Exhausted while {request_description} (DiskManager is disabled)"
)
})?;
if local_dirs.is_empty() {
let tempdir = tempfile::tempdir().map_err(DataFusionError::IoError)?;
debug!(
"Created directory '{:?}' as DataFusion tempfile directory for {}",
tempdir.path().to_string_lossy(),
request_description,
);
local_dirs.push(Arc::new(tempdir));
}
let dir_index = rng().random_range(0..local_dirs.len());
self.active_files_count.fetch_add(1, Ordering::Relaxed);
Ok(RefCountedTempFile {
parent_temp_dir: Arc::clone(&local_dirs[dir_index]),
tempfile: Arc::new(
Builder::new()
.tempfile_in(local_dirs[dir_index].as_ref())
.map_err(DataFusionError::IoError)?,
),
current_file_disk_usage: Arc::new(AtomicU64::new(0)),
disk_manager: Arc::clone(self),
})
}
}
/// Shared handle to a temp file created by [`DiskManager::create_tmp_file`].
///
/// Clones share the same file, size counter, and manager. The file is deleted
/// when the last `Arc<NamedTempFile>` is dropped.
#[derive(Debug)]
pub struct RefCountedTempFile {
    /// Reference to the directory holding the file, so the directory is not
    /// removed while this handle exists.
    parent_temp_dir: Arc<TempDir>,
    /// The file itself, shared between clones.
    tempfile: Arc<NamedTempFile>,
    /// This file's size, in bytes, as last added to the manager's total.
    /// Shared between clones.
    current_file_disk_usage: Arc<AtomicU64>,
    /// Manager whose counters this file contributes to.
    disk_manager: Arc<DiskManager>,
}
impl Clone for RefCountedTempFile {
    /// Returns another handle to the same file. Every field is reference-counted,
    /// so the size counter and manager are shared rather than copied.
    fn clone(&self) -> Self {
        let Self {
            parent_temp_dir,
            tempfile,
            current_file_disk_usage,
            disk_manager,
        } = self;
        Self {
            parent_temp_dir: Arc::clone(parent_temp_dir),
            tempfile: Arc::clone(tempfile),
            current_file_disk_usage: Arc::clone(current_file_disk_usage),
            disk_manager: Arc::clone(disk_manager),
        }
    }
}
impl RefCountedTempFile {
    /// Path of the underlying temp file.
    pub fn path(&self) -> &Path {
        self.tempfile.path()
    }

    /// The underlying [`NamedTempFile`].
    pub fn inner(&self) -> &NamedTempFile {
        self.tempfile.as_ref()
    }

    /// Re-reads this file's size from the filesystem and updates both the
    /// per-file counter (shared by all clones) and the owning
    /// [`DiskManager`]'s total.
    ///
    /// # Errors
    ///
    /// Returns an error if the file metadata cannot be read. Returns a
    /// resources-exhausted error if the manager's total now exceeds its
    /// `max_temp_directory_size`.
    ///
    /// In the limit case the new size has still been recorded in both
    /// counters. Dropping the file later therefore releases exactly what
    /// was added.
    pub fn update_disk_usage(&mut self) -> Result<()> {
        let new_disk_usage = self.tempfile.as_file().metadata()?.len();

        // `swap` reads the previous size and publishes the new one atomically.
        // Clones updating concurrently therefore never subtract the same old
        // value twice.
        let old_disk_usage = self
            .current_file_disk_usage
            .swap(new_disk_usage, Ordering::Relaxed);

        // Apply only the delta to the global total. The total never dips
        // transiently below its true value, and it already includes
        // `old_disk_usage`, so the subtraction cannot wrap.
        let used = &self.disk_manager.used_disk_space;
        let global_disk_usage = if new_disk_usage >= old_disk_usage {
            let delta = new_disk_usage - old_disk_usage;
            used.fetch_add(delta, Ordering::Relaxed).saturating_add(delta)
        } else {
            let delta = old_disk_usage - new_disk_usage;
            used.fetch_sub(delta, Ordering::Relaxed).saturating_sub(delta)
        };

        let limit = self.disk_manager.max_temp_directory_size;
        if global_disk_usage > limit {
            return resources_err!(
                "The used disk space during the spilling process has exceeded the allowable limit of {}. Try increasing the `max_temp_directory_size` in the disk manager configuration.",
                // Avoid silent truncation of the u64 limit on 32-bit targets.
                human_readable_size(usize::try_from(limit).unwrap_or(usize::MAX))
            );
        }
        Ok(())
    }

    /// This file's size, in bytes, as of the last successful size read.
    pub fn current_disk_usage(&self) -> u64 {
        self.current_file_disk_usage.load(Ordering::Relaxed)
    }
}
impl Drop for RefCountedTempFile {
fn drop(&mut self) {
if Arc::strong_count(&self.tempfile) == 1 {
let current_usage = self.current_file_disk_usage.load(Ordering::Relaxed);
self.disk_manager
.used_disk_space
.fetch_sub(current_usage, Ordering::Relaxed);
self.disk_manager
.active_files_count
.fetch_sub(1, Ordering::Relaxed);
}
}
}
/// Creates one `datafusion-`-prefixed temporary directory inside each of
/// `local_dirs`.
///
/// Any root that does not exist yet is created first, including missing
/// parent directories.
///
/// # Errors
///
/// Returns the first I/O error encountered.
fn create_local_dirs(local_dirs: &[PathBuf]) -> Result<Vec<Arc<TempDir>>> {
    local_dirs
        .iter()
        .map(|root| {
            // `create_dir_all` succeeds when `root` already exists, including
            // when another process creates it concurrently. It also creates
            // missing parents. A separate `exists()` check followed by
            // `create_dir` is racy and fails on missing parents.
            std::fs::create_dir_all(root)?;
            Builder::new()
                .prefix("datafusion-")
                .tempdir_in(root)
                .map(Arc::new)
                .map_err(DataFusionError::IoError)
        })
        .collect()
}
#[cfg(test)]
mod tests {
    use super::*;

    /// In the default (OS temp directory) mode no directory exists until the
    /// first temp file is requested, and that file is placed inside it.
    #[test]
    fn lazy_temp_dir_creation() -> Result<()> {
        let dm = Arc::new(DiskManagerBuilder::default().build()?);
        assert_eq!(0, local_dir_snapshot(&dm).len());
        let actual = dm.create_tmp_file("Testing")?;
        assert_eq!(1, local_dir_snapshot(&dm).len());
        let local_dirs = local_dir_snapshot(&dm);
        assert_path_in_dirs(actual.path(), local_dirs.iter().map(|p| p.as_path()));
        Ok(())
    }

    /// Paths of the manager's current temp directories (empty when disabled).
    fn local_dir_snapshot(dm: &DiskManager) -> Vec<PathBuf> {
        dm.local_dirs
            .lock()
            .iter()
            .flatten()
            .map(|p| p.path().into())
            .collect()
    }

    /// With explicit directories, a new temp file lands under one of them.
    #[test]
    fn file_in_right_dir() -> Result<()> {
        let local_dir1 = TempDir::new()?;
        let local_dir2 = TempDir::new()?;
        let local_dir3 = TempDir::new()?;
        let local_dirs = vec![local_dir1.path(), local_dir2.path(), local_dir3.path()];
        let dm = Arc::new(
            DiskManagerBuilder::default()
                .with_mode(DiskManagerMode::Directories(
                    local_dirs.iter().map(|p| p.into()).collect(),
                ))
                .build()?,
        );
        assert!(dm.tmp_files_enabled());
        let actual = dm.create_tmp_file("Testing")?;
        assert_path_in_dirs(actual.path(), local_dirs.into_iter());
        Ok(())
    }

    /// Disabled mode reports temp files as off and rejects requests with a
    /// resources-exhausted error.
    #[test]
    fn test_disabled_disk_manager() {
        let manager = Arc::new(
            DiskManagerBuilder::default()
                .with_mode(DiskManagerMode::Disabled)
                .build()
                .unwrap(),
        );
        assert!(!manager.tmp_files_enabled());
        assert_eq!(
            manager
                .create_tmp_file("Testing")
                .unwrap_err()
                .strip_backtrace(),
            "Resources exhausted: Memory Exhausted while Testing (DiskManager is disabled)",
        )
    }

    /// Building in `Directories` mode with an existing directory succeeds.
    // NOTE(review): despite the name, the path already exists, so this does
    // not exercise creation of a missing spill folder.
    #[test]
    fn test_disk_manager_create_spill_folder() {
        let dir = TempDir::new().unwrap();
        DiskManagerBuilder::default()
            .with_mode(DiskManagerMode::Directories(vec![dir.path().to_path_buf()]))
            .build()
            .unwrap();
    }

    /// Asserts that `file_path` equals or lies under one of `dirs`.
    fn assert_path_in_dirs<'a>(
        file_path: &'a Path,
        dirs: impl Iterator<Item = &'a Path>,
    ) {
        let dirs: Vec<&Path> = dirs.collect();
        let found = dirs.iter().any(|dir_path| {
            file_path
                .ancestors()
                .any(|candidate_path| *dir_path == candidate_path)
        });
        assert!(found, "Can't find {file_path:?} in dirs: {dirs:?}");
    }

    /// A temp file survives dropping the caller's manager handle and is deleted
    /// once the file handle is dropped. Checked for both OS and
    /// explicit-directory modes.
    #[test]
    fn test_temp_file_still_alive_after_disk_manager_dropped() -> Result<()> {
        // OS temp directory mode.
        let dm = Arc::new(DiskManagerBuilder::default().build()?);
        let temp_file = dm.create_tmp_file("Testing")?;
        let temp_file_path = temp_file.path().to_owned();
        assert!(temp_file_path.exists());
        drop(dm);
        assert!(temp_file_path.exists());
        drop(temp_file);
        assert!(!temp_file_path.exists());

        // Explicit-directory mode.
        let local_dir1 = TempDir::new()?;
        let local_dir2 = TempDir::new()?;
        let local_dir3 = TempDir::new()?;
        let local_dirs = [local_dir1.path(), local_dir2.path(), local_dir3.path()];
        let dm = Arc::new(
            DiskManagerBuilder::default()
                .with_mode(DiskManagerMode::Directories(
                    local_dirs.iter().map(|p| p.into()).collect(),
                ))
                .build()?,
        );
        let temp_file = dm.create_tmp_file("Testing")?;
        let temp_file_path = temp_file.path().to_owned();
        assert!(temp_file_path.exists());
        drop(dm);
        assert!(temp_file_path.exists());
        drop(temp_file);
        assert!(!temp_file_path.exists());
        Ok(())
    }

    /// `update_disk_usage` tracks file growth in both the per-file and global
    /// counters; dropping the file returns the global counter to 0.
    #[test]
    fn test_disk_usage_basic() -> Result<()> {
        use std::io::Write;
        let dm = Arc::new(DiskManagerBuilder::default().build()?);
        let mut temp_file = dm.create_tmp_file("Testing")?;
        assert_eq!(dm.used_disk_space(), 0);
        assert_eq!(temp_file.current_disk_usage(), 0);
        temp_file.inner().as_file().write_all(b"hello world")?;
        temp_file.update_disk_usage()?;
        let expected_usage = temp_file.current_disk_usage();
        assert!(expected_usage > 0);
        assert_eq!(dm.used_disk_space(), expected_usage);
        temp_file.inner().as_file().write_all(b" more data")?;
        temp_file.update_disk_usage()?;
        let new_usage = temp_file.current_disk_usage();
        assert!(new_usage > expected_usage);
        assert_eq!(dm.used_disk_space(), new_usage);
        drop(temp_file);
        assert_eq!(dm.used_disk_space(), 0);
        Ok(())
    }

    /// Clones share one size counter. An update through any clone is visible
    /// to all of them, and global usage is released only when the last clone
    /// is dropped.
    #[test]
    fn test_disk_usage_with_clones() -> Result<()> {
        use std::io::Write;
        let dm = Arc::new(DiskManagerBuilder::default().build()?);
        let mut temp_file = dm.create_tmp_file("Testing")?;
        temp_file.inner().as_file().write_all(b"test data")?;
        temp_file.update_disk_usage()?;
        let usage_after_write = temp_file.current_disk_usage();
        assert!(usage_after_write > 0);
        assert_eq!(dm.used_disk_space(), usage_after_write);
        let clone1 = temp_file.clone();
        let clone2 = temp_file.clone();
        assert_eq!(clone1.current_disk_usage(), usage_after_write);
        assert_eq!(clone2.current_disk_usage(), usage_after_write);
        assert_eq!(dm.used_disk_space(), usage_after_write);
        clone1.inner().as_file().write_all(b" more data")?;
        let mut mutable_clone1 = clone1;
        mutable_clone1.update_disk_usage()?;
        let new_usage = mutable_clone1.current_disk_usage();
        assert!(new_usage > usage_after_write);
        assert_eq!(temp_file.current_disk_usage(), new_usage);
        assert_eq!(clone2.current_disk_usage(), new_usage);
        assert_eq!(mutable_clone1.current_disk_usage(), new_usage);
        assert_eq!(dm.used_disk_space(), new_usage);
        drop(mutable_clone1);
        assert_eq!(dm.used_disk_space(), new_usage);
        assert_eq!(temp_file.current_disk_usage(), new_usage);
        assert_eq!(clone2.current_disk_usage(), new_usage);
        drop(clone2);
        assert_eq!(dm.used_disk_space(), new_usage);
        assert_eq!(temp_file.current_disk_usage(), new_usage);
        drop(temp_file);
        assert_eq!(dm.used_disk_space(), 0);
        Ok(())
    }

    /// Global usage is released only by the last handle, regardless of the
    /// order in which the original and its clones are dropped.
    #[test]
    fn test_disk_usage_clones_dropped_out_of_order() -> Result<()> {
        use std::io::Write;
        let dm = Arc::new(DiskManagerBuilder::default().build()?);
        let mut temp_file = dm.create_tmp_file("Testing")?;
        temp_file.inner().as_file().write_all(b"test")?;
        temp_file.update_disk_usage()?;
        let usage = temp_file.current_disk_usage();
        assert_eq!(dm.used_disk_space(), usage);
        let clone1 = temp_file.clone();
        let clone2 = temp_file.clone();
        let clone3 = temp_file.clone();
        drop(temp_file);
        assert_eq!(dm.used_disk_space(), usage);
        assert_eq!(clone1.current_disk_usage(), usage);
        drop(clone2);
        assert_eq!(dm.used_disk_space(), usage);
        drop(clone1);
        assert_eq!(dm.used_disk_space(), usage);
        drop(clone3);
        assert_eq!(dm.used_disk_space(), 0);
        Ok(())
    }

    /// Global usage is the sum over independent files and shrinks as each
    /// file is dropped.
    #[test]
    fn test_disk_usage_multiple_files() -> Result<()> {
        use std::io::Write;
        let dm = Arc::new(DiskManagerBuilder::default().build()?);
        let mut file1 = dm.create_tmp_file("Testing1")?;
        let mut file2 = dm.create_tmp_file("Testing2")?;
        file1.inner().as_file().write_all(b"file1")?;
        file1.update_disk_usage()?;
        let usage1 = file1.current_disk_usage();
        assert_eq!(dm.used_disk_space(), usage1);
        file2.inner().as_file().write_all(b"file2 data")?;
        file2.update_disk_usage()?;
        let usage2 = file2.current_disk_usage();
        assert_eq!(dm.used_disk_space(), usage1 + usage2);
        drop(file1);
        assert_eq!(dm.used_disk_space(), usage2);
        drop(file2);
        assert_eq!(dm.used_disk_space(), 0);
        Ok(())
    }
}