datafusion_execution/
disk_manager.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`DiskManager`]: Manages files generated during query execution
19
20use datafusion_common::{
21    config_err, resources_datafusion_err, resources_err, DataFusionError, Result,
22};
23use log::debug;
24use parking_lot::Mutex;
25use rand::{rng, Rng};
26use std::path::{Path, PathBuf};
27use std::sync::atomic::{AtomicU64, Ordering};
28use std::sync::Arc;
29use tempfile::{Builder, NamedTempFile, TempDir};
30
31use crate::memory_pool::human_readable_size;
32
/// Default cap on the total size of data in the temporary directories:
/// 100 GiB (100 * 2^30 bytes).
const DEFAULT_MAX_TEMP_DIRECTORY_SIZE: u64 = 100 * (1 << 30);
34
35/// Builder pattern for the [DiskManager] structure
36#[derive(Clone, Debug)]
37pub struct DiskManagerBuilder {
38    /// The storage mode of the disk manager
39    mode: DiskManagerMode,
40    /// The maximum amount of data (in bytes) stored inside the temporary directories.
41    /// Default to 100GB
42    max_temp_directory_size: u64,
43}
44
45impl Default for DiskManagerBuilder {
46    fn default() -> Self {
47        Self {
48            mode: DiskManagerMode::OsTmpDirectory,
49            max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
50        }
51    }
52}
53
54impl DiskManagerBuilder {
55    pub fn set_mode(&mut self, mode: DiskManagerMode) {
56        self.mode = mode;
57    }
58
59    pub fn with_mode(mut self, mode: DiskManagerMode) -> Self {
60        self.set_mode(mode);
61        self
62    }
63
64    pub fn set_max_temp_directory_size(&mut self, value: u64) {
65        self.max_temp_directory_size = value;
66    }
67
68    pub fn with_max_temp_directory_size(mut self, value: u64) -> Self {
69        self.set_max_temp_directory_size(value);
70        self
71    }
72
73    /// Create a DiskManager given the builder
74    pub fn build(self) -> Result<DiskManager> {
75        match self.mode {
76            DiskManagerMode::OsTmpDirectory => Ok(DiskManager {
77                local_dirs: Mutex::new(Some(vec![])),
78                max_temp_directory_size: self.max_temp_directory_size,
79                used_disk_space: Arc::new(AtomicU64::new(0)),
80            }),
81            DiskManagerMode::Directories(conf_dirs) => {
82                let local_dirs = create_local_dirs(conf_dirs)?;
83                debug!(
84                    "Created local dirs {local_dirs:?} as DataFusion working directory"
85                );
86                Ok(DiskManager {
87                    local_dirs: Mutex::new(Some(local_dirs)),
88                    max_temp_directory_size: self.max_temp_directory_size,
89                    used_disk_space: Arc::new(AtomicU64::new(0)),
90                })
91            }
92            DiskManagerMode::Disabled => Ok(DiskManager {
93                local_dirs: Mutex::new(None),
94                max_temp_directory_size: self.max_temp_directory_size,
95                used_disk_space: Arc::new(AtomicU64::new(0)),
96            }),
97        }
98    }
99}
100
/// Where a [DiskManager] places the temporary files it creates.
///
/// Defaults to [`DiskManagerMode::OsTmpDirectory`].
#[derive(Clone, Debug, Default)]
pub enum DiskManagerMode {
    /// Create a new [DiskManager] that creates temporary files within
    /// a temporary directory chosen by the OS. This is the default.
    #[default]
    OsTmpDirectory,

    /// Create a new [DiskManager] that creates temporary files within
    /// the specified directories. One of the directories will be chosen
    /// at random for each temporary file created.
    Directories(Vec<PathBuf>),

    /// Disable disk manager, attempts to create temporary files will error
    Disabled,
}
121
122/// Configuration for temporary disk access
123#[deprecated(since = "48.0.0", note = "Use DiskManagerBuilder instead")]
124#[derive(Debug, Clone)]
125pub enum DiskManagerConfig {
126    /// Use the provided [DiskManager] instance
127    Existing(Arc<DiskManager>),
128
129    /// Create a new [DiskManager] that creates temporary files within
130    /// a temporary directory chosen by the OS
131    NewOs,
132
133    /// Create a new [DiskManager] that creates temporary files within
134    /// the specified directories
135    NewSpecified(Vec<PathBuf>),
136
137    /// Disable disk manager, attempts to create temporary files will error
138    Disabled,
139}
140
141#[allow(deprecated)]
142impl Default for DiskManagerConfig {
143    fn default() -> Self {
144        Self::NewOs
145    }
146}
147
148#[allow(deprecated)]
149impl DiskManagerConfig {
150    /// Create temporary files in a temporary directory chosen by the OS
151    pub fn new() -> Self {
152        Self::default()
153    }
154
155    /// Create temporary files using the provided disk manager
156    pub fn new_existing(existing: Arc<DiskManager>) -> Self {
157        Self::Existing(existing)
158    }
159
160    /// Create temporary files in the specified directories
161    pub fn new_specified(paths: Vec<PathBuf>) -> Self {
162        Self::NewSpecified(paths)
163    }
164}
165
166/// Manages files generated during query execution, e.g. spill files generated
167/// while processing dataset larger than available memory.
168#[derive(Debug)]
169pub struct DiskManager {
170    /// TempDirs to put temporary files in.
171    ///
172    /// If `Some(vec![])` a new OS specified temporary directory will be created
173    /// If `None` an error will be returned (configured not to spill)
174    local_dirs: Mutex<Option<Vec<Arc<TempDir>>>>,
175    /// The maximum amount of data (in bytes) stored inside the temporary directories.
176    /// Default to 100GB
177    max_temp_directory_size: u64,
178    /// Used disk space in the temporary directories. Now only spilled data for
179    /// external executors are counted.
180    used_disk_space: Arc<AtomicU64>,
181}
182
183impl DiskManager {
184    /// Creates a builder for [DiskManager]
185    pub fn builder() -> DiskManagerBuilder {
186        DiskManagerBuilder::default()
187    }
188
189    /// Create a DiskManager given the configuration
190    #[allow(deprecated)]
191    #[deprecated(since = "48.0.0", note = "Use DiskManager::builder() instead")]
192    pub fn try_new(config: DiskManagerConfig) -> Result<Arc<Self>> {
193        match config {
194            DiskManagerConfig::Existing(manager) => Ok(manager),
195            DiskManagerConfig::NewOs => Ok(Arc::new(Self {
196                local_dirs: Mutex::new(Some(vec![])),
197                max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
198                used_disk_space: Arc::new(AtomicU64::new(0)),
199            })),
200            DiskManagerConfig::NewSpecified(conf_dirs) => {
201                let local_dirs = create_local_dirs(conf_dirs)?;
202                debug!(
203                    "Created local dirs {local_dirs:?} as DataFusion working directory"
204                );
205                Ok(Arc::new(Self {
206                    local_dirs: Mutex::new(Some(local_dirs)),
207                    max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
208                    used_disk_space: Arc::new(AtomicU64::new(0)),
209                }))
210            }
211            DiskManagerConfig::Disabled => Ok(Arc::new(Self {
212                local_dirs: Mutex::new(None),
213                max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
214                used_disk_space: Arc::new(AtomicU64::new(0)),
215            })),
216        }
217    }
218
219    pub fn set_max_temp_directory_size(
220        &mut self,
221        max_temp_directory_size: u64,
222    ) -> Result<()> {
223        // If the disk manager is disabled and `max_temp_directory_size` is not 0,
224        // this operation is not meaningful, fail early.
225        if self.local_dirs.lock().is_none() && max_temp_directory_size != 0 {
226            return config_err!(
227                "Cannot set max temp directory size for a disk manager that spilling is disabled"
228            );
229        }
230
231        self.max_temp_directory_size = max_temp_directory_size;
232        Ok(())
233    }
234
235    pub fn set_arc_max_temp_directory_size(
236        this: &mut Arc<Self>,
237        max_temp_directory_size: u64,
238    ) -> Result<()> {
239        if let Some(inner) = Arc::get_mut(this) {
240            inner.set_max_temp_directory_size(max_temp_directory_size)?;
241            Ok(())
242        } else {
243            config_err!("DiskManager should be a single instance")
244        }
245    }
246
247    pub fn with_max_temp_directory_size(
248        mut self,
249        max_temp_directory_size: u64,
250    ) -> Result<Self> {
251        self.set_max_temp_directory_size(max_temp_directory_size)?;
252        Ok(self)
253    }
254
255    pub fn used_disk_space(&self) -> u64 {
256        self.used_disk_space.load(Ordering::Relaxed)
257    }
258
259    /// Return true if this disk manager supports creating temporary
260    /// files. If this returns false, any call to `create_tmp_file`
261    /// will error.
262    pub fn tmp_files_enabled(&self) -> bool {
263        self.local_dirs.lock().is_some()
264    }
265
266    /// Return a temporary file from a randomized choice in the configured locations
267    ///
268    /// If the file can not be created for some reason, returns an
269    /// error message referencing the request description
270    pub fn create_tmp_file(
271        self: &Arc<Self>,
272        request_description: &str,
273    ) -> Result<RefCountedTempFile> {
274        let mut guard = self.local_dirs.lock();
275        let local_dirs = guard.as_mut().ok_or_else(|| {
276            resources_datafusion_err!(
277                "Memory Exhausted while {request_description} (DiskManager is disabled)"
278            )
279        })?;
280
281        // Create a temporary directory if needed
282        if local_dirs.is_empty() {
283            let tempdir = tempfile::tempdir().map_err(DataFusionError::IoError)?;
284
285            debug!(
286                "Created directory '{:?}' as DataFusion tempfile directory for {}",
287                tempdir.path().to_string_lossy(),
288                request_description,
289            );
290
291            local_dirs.push(Arc::new(tempdir));
292        }
293
294        let dir_index = rng().random_range(0..local_dirs.len());
295        Ok(RefCountedTempFile {
296            _parent_temp_dir: Arc::clone(&local_dirs[dir_index]),
297            tempfile: Builder::new()
298                .tempfile_in(local_dirs[dir_index].as_ref())
299                .map_err(DataFusionError::IoError)?,
300            current_file_disk_usage: 0,
301            disk_manager: Arc::clone(self),
302        })
303    }
304}
305
306/// A wrapper around a [`NamedTempFile`] that also contains
307/// a reference to its parent temporary directory.
308///
309/// # Note
310/// After any modification to the underlying file (e.g., writing data to it), the caller
311/// must invoke [`Self::update_disk_usage`] to update the global disk usage counter.
312/// This ensures the disk manager can properly enforce usage limits configured by
313/// [`DiskManager::with_max_temp_directory_size`].
314#[derive(Debug)]
315pub struct RefCountedTempFile {
316    /// The reference to the directory in which temporary files are created to ensure
317    /// it is not cleaned up prior to the NamedTempFile
318    _parent_temp_dir: Arc<TempDir>,
319    tempfile: NamedTempFile,
320    /// Tracks the current disk usage of this temporary file. See
321    /// [`Self::update_disk_usage`] for more details.
322    current_file_disk_usage: u64,
323    /// The disk manager that created and manages this temporary file
324    disk_manager: Arc<DiskManager>,
325}
326
327impl RefCountedTempFile {
328    pub fn path(&self) -> &Path {
329        self.tempfile.path()
330    }
331
332    pub fn inner(&self) -> &NamedTempFile {
333        &self.tempfile
334    }
335
336    /// Updates the global disk usage counter after modifications to the underlying file.
337    ///
338    /// # Errors
339    /// - Returns an error if the global disk usage exceeds the configured limit.
340    pub fn update_disk_usage(&mut self) -> Result<()> {
341        // Get new file size from OS
342        let metadata = self.tempfile.as_file().metadata()?;
343        let new_disk_usage = metadata.len();
344
345        // Update the global disk usage by:
346        // 1. Subtracting the old file size from the global counter
347        self.disk_manager
348            .used_disk_space
349            .fetch_sub(self.current_file_disk_usage, Ordering::Relaxed);
350        // 2. Adding the new file size to the global counter
351        self.disk_manager
352            .used_disk_space
353            .fetch_add(new_disk_usage, Ordering::Relaxed);
354
355        // 3. Check if the updated global disk usage exceeds the configured limit
356        let global_disk_usage = self.disk_manager.used_disk_space.load(Ordering::Relaxed);
357        if global_disk_usage > self.disk_manager.max_temp_directory_size {
358            return resources_err!(
359                "The used disk space during the spilling process has exceeded the allowable limit of {}. Try increasing the `max_temp_directory_size` in the disk manager configuration.",
360                human_readable_size(self.disk_manager.max_temp_directory_size as usize)
361            );
362        }
363
364        // 4. Update the local file size tracking
365        self.current_file_disk_usage = new_disk_usage;
366
367        Ok(())
368    }
369
370    pub fn current_disk_usage(&self) -> u64 {
371        self.current_file_disk_usage
372    }
373}
374
375/// When the temporary file is dropped, subtract its disk usage from the disk manager's total
376impl Drop for RefCountedTempFile {
377    fn drop(&mut self) {
378        // Subtract the current file's disk usage from the global counter
379        self.disk_manager
380            .used_disk_space
381            .fetch_sub(self.current_file_disk_usage, Ordering::Relaxed);
382    }
383}
384
385/// Setup local dirs by creating one new dir in each of the given dirs
386fn create_local_dirs(local_dirs: Vec<PathBuf>) -> Result<Vec<Arc<TempDir>>> {
387    local_dirs
388        .iter()
389        .map(|root| {
390            if !Path::new(root).exists() {
391                std::fs::create_dir(root)?;
392            }
393            Builder::new()
394                .prefix("datafusion-")
395                .tempdir_in(root)
396                .map_err(DataFusionError::IoError)
397        })
398        .map(|result| result.map(Arc::new))
399        .collect()
400}
401
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;

    #[test]
    fn lazy_temp_dir_creation() -> Result<()> {
        // A default configuration should not create temp files until requested
        let dm = Arc::new(DiskManagerBuilder::default().build()?);

        assert_eq!(0, local_dir_snapshot(&dm).len());

        // can still create a tempfile however:
        let actual = dm.create_tmp_file("Testing")?;

        // Now the tempdir has been created on demand
        let local_dirs = local_dir_snapshot(&dm);
        assert_eq!(1, local_dirs.len());

        // the returned tempfile file should be in the temp directory
        assert_path_in_dirs(actual.path(), local_dirs.iter().map(|p| p.as_path()));

        Ok(())
    }

    fn local_dir_snapshot(dm: &DiskManager) -> Vec<PathBuf> {
        dm.local_dirs
            .lock()
            .iter()
            .flatten()
            .map(|p| p.path().into())
            .collect()
    }

    #[test]
    fn file_in_right_dir() -> Result<()> {
        let local_dir1 = TempDir::new()?;
        let local_dir2 = TempDir::new()?;
        let local_dir3 = TempDir::new()?;
        let local_dirs = vec![local_dir1.path(), local_dir2.path(), local_dir3.path()];
        let dm = Arc::new(
            DiskManagerBuilder::default()
                .with_mode(DiskManagerMode::Directories(
                    local_dirs.iter().map(|p| p.into()).collect(),
                ))
                .build()?,
        );

        assert!(dm.tmp_files_enabled());
        let actual = dm.create_tmp_file("Testing")?;

        // the file should be in one of the specified local directories
        assert_path_in_dirs(actual.path(), local_dirs.into_iter());

        Ok(())
    }

    #[test]
    fn test_disabled_disk_manager() {
        let manager = Arc::new(
            DiskManagerBuilder::default()
                .with_mode(DiskManagerMode::Disabled)
                .build()
                .unwrap(),
        );
        assert!(!manager.tmp_files_enabled());
        assert_eq!(
            manager.create_tmp_file("Testing").unwrap_err().strip_backtrace(),
            "Resources exhausted: Memory Exhausted while Testing (DiskManager is disabled)",
        )
    }

    #[test]
    fn test_disk_manager_create_spill_folder() -> Result<()> {
        let parent = TempDir::new()?;
        // Point the disk manager at a spill folder that does not exist yet
        let spill_root = parent.path().join("spill");
        assert!(!spill_root.exists());

        let dm = Arc::new(
            DiskManagerBuilder::default()
                .with_mode(DiskManagerMode::Directories(vec![spill_root.clone()]))
                .build()?,
        );
        assert!(spill_root.is_dir());

        // Temporary files are placed under the newly created folder
        let temp_file = dm.create_tmp_file("Testing")?;
        assert_path_in_dirs(temp_file.path(), std::iter::once(spill_root.as_path()));

        Ok(())
    }

    #[test]
    fn test_disk_usage_accounting() -> Result<()> {
        let dm = Arc::new(DiskManagerBuilder::default().build()?);
        let mut temp_file = dm.create_tmp_file("Testing")?;
        assert_eq!(0, dm.used_disk_space());

        temp_file.inner().as_file().write_all(&[0u8; 100])?;
        temp_file.update_disk_usage()?;
        assert_eq!(100, temp_file.current_disk_usage());
        assert_eq!(100, dm.used_disk_space());

        // Dropping the file releases its usage from the global counter
        drop(temp_file);
        assert_eq!(0, dm.used_disk_space());

        Ok(())
    }

    #[test]
    fn test_disk_usage_limit_exceeded() -> Result<()> {
        let dm = Arc::new(
            DiskManagerBuilder::default()
                .with_max_temp_directory_size(10)
                .build()?,
        );
        let mut temp_file = dm.create_tmp_file("Testing")?;
        temp_file.inner().as_file().write_all(&[0u8; 100])?;

        let err = temp_file.update_disk_usage().unwrap_err();
        assert!(err
            .strip_backtrace()
            .contains("has exceeded the allowable limit"));

        Ok(())
    }

    #[test]
    fn test_max_temp_directory_size_on_disabled_manager() -> Result<()> {
        let disabled = || {
            DiskManagerBuilder::default()
                .with_mode(DiskManagerMode::Disabled)
                .build()
        };
        // A non-zero limit is meaningless when spilling is disabled
        assert!(disabled()?.with_max_temp_directory_size(1024).is_err());
        // ...while a zero limit is accepted
        assert!(disabled()?.with_max_temp_directory_size(0).is_ok());

        Ok(())
    }

    /// Asserts that `file_path` is found anywhere in any of `dir` directories
    fn assert_path_in_dirs<'a>(
        file_path: &'a Path,
        dirs: impl Iterator<Item = &'a Path>,
    ) {
        let dirs: Vec<&Path> = dirs.collect();

        let found = dirs.iter().any(|dir_path| {
            file_path
                .ancestors()
                .any(|candidate_path| *dir_path == candidate_path)
        });

        assert!(found, "Can't find {file_path:?} in dirs: {dirs:?}");
    }

    #[test]
    fn test_temp_file_still_alive_after_disk_manager_dropped() -> Result<()> {
        // Test for the case using OS arranged temporary directory
        let dm = Arc::new(DiskManagerBuilder::default().build()?);
        let temp_file = dm.create_tmp_file("Testing")?;
        let temp_file_path = temp_file.path().to_owned();
        assert!(temp_file_path.exists());

        drop(dm);
        assert!(temp_file_path.exists());

        drop(temp_file);
        assert!(!temp_file_path.exists());

        // Test for the case using specified directories
        let local_dir1 = TempDir::new()?;
        let local_dir2 = TempDir::new()?;
        let local_dir3 = TempDir::new()?;
        let local_dirs = [local_dir1.path(), local_dir2.path(), local_dir3.path()];
        let dm = Arc::new(
            DiskManagerBuilder::default()
                .with_mode(DiskManagerMode::Directories(
                    local_dirs.iter().map(|p| p.into()).collect(),
                ))
                .build()?,
        );
        let temp_file = dm.create_tmp_file("Testing")?;
        let temp_file_path = temp_file.path().to_owned();
        assert!(temp_file_path.exists());

        drop(dm);
        assert!(temp_file_path.exists());

        drop(temp_file);
        assert!(!temp_file_path.exists());

        Ok(())
    }
}