datafusion_execution/
disk_manager.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`DiskManager`]: Manages files generated during query execution
19
20use datafusion_common::{
21    config_err, resources_datafusion_err, resources_err, DataFusionError, Result,
22};
23use log::debug;
24use parking_lot::Mutex;
25use rand::{thread_rng, Rng};
26use std::path::{Path, PathBuf};
27use std::sync::atomic::{AtomicU64, Ordering};
28use std::sync::Arc;
29use tempfile::{Builder, NamedTempFile, TempDir};
30
31use crate::memory_pool::human_readable_size;
32
/// Default value for the `max_temp_directory_size` limit of a `DiskManager`,
/// in bytes: 100 * 1024^3, i.e. 100 GiB.
const DEFAULT_MAX_TEMP_DIRECTORY_SIZE: u64 = 100 * 1024 * 1024 * 1024; // 100 GiB
34
/// Configuration for temporary disk access.
///
/// Each variant selects how the disk manager that is built from this
/// configuration obtains the directories that hold its temporary files.
#[derive(Debug, Clone)]
pub enum DiskManagerConfig {
    /// Use the provided [`DiskManager`] instance as is
    Existing(Arc<DiskManager>),

    /// Create a new [`DiskManager`] that creates temporary files within
    /// a temporary directory chosen by the OS
    NewOs,

    /// Create a new [`DiskManager`] that creates temporary files within
    /// the specified directories
    NewSpecified(Vec<PathBuf>),

    /// Disable the disk manager: attempts to create temporary files will error
    Disabled,
}
52
53impl Default for DiskManagerConfig {
54    fn default() -> Self {
55        Self::NewOs
56    }
57}
58
59impl DiskManagerConfig {
60    /// Create temporary files in a temporary directory chosen by the OS
61    pub fn new() -> Self {
62        Self::default()
63    }
64
65    /// Create temporary files using the provided disk manager
66    pub fn new_existing(existing: Arc<DiskManager>) -> Self {
67        Self::Existing(existing)
68    }
69
70    /// Create temporary files in the specified directories
71    pub fn new_specified(paths: Vec<PathBuf>) -> Self {
72        Self::NewSpecified(paths)
73    }
74}
75
/// Manages files generated during query execution, e.g. spill files generated
/// while processing datasets larger than available memory.
#[derive(Debug)]
pub struct DiskManager {
    /// TempDirs to put temporary files in.
    ///
    /// - `Some(vec![])`: no directory exists yet; a new OS specified temporary
    ///   directory will be created when a temporary file is requested.
    /// - `Some(dirs)`: temporary files are placed in one of `dirs`.
    /// - `None`: an error will be returned (configured not to spill).
    local_dirs: Mutex<Option<Vec<Arc<TempDir>>>>,
    /// The maximum amount of data (in bytes) stored inside the temporary directories.
    /// Defaults to `DEFAULT_MAX_TEMP_DIRECTORY_SIZE`.
    max_temp_directory_size: u64,
    /// Used disk space (in bytes) in the temporary directories. Now only spilled
    /// data for external executors are counted.
    used_disk_space: Arc<AtomicU64>,
}
92
93impl DiskManager {
94    /// Create a DiskManager given the configuration
95    pub fn try_new(config: DiskManagerConfig) -> Result<Arc<Self>> {
96        match config {
97            DiskManagerConfig::Existing(manager) => Ok(manager),
98            DiskManagerConfig::NewOs => Ok(Arc::new(Self {
99                local_dirs: Mutex::new(Some(vec![])),
100                max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
101                used_disk_space: Arc::new(AtomicU64::new(0)),
102            })),
103            DiskManagerConfig::NewSpecified(conf_dirs) => {
104                let local_dirs = create_local_dirs(conf_dirs)?;
105                debug!(
106                    "Created local dirs {:?} as DataFusion working directory",
107                    local_dirs
108                );
109                Ok(Arc::new(Self {
110                    local_dirs: Mutex::new(Some(local_dirs)),
111                    max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
112                    used_disk_space: Arc::new(AtomicU64::new(0)),
113                }))
114            }
115            DiskManagerConfig::Disabled => Ok(Arc::new(Self {
116                local_dirs: Mutex::new(None),
117                max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
118                used_disk_space: Arc::new(AtomicU64::new(0)),
119            })),
120        }
121    }
122
123    pub fn with_max_temp_directory_size(
124        mut self,
125        max_temp_directory_size: u64,
126    ) -> Result<Self> {
127        // If the disk manager is disabled and `max_temp_directory_size` is not 0,
128        // this operation is not meaningful, fail early.
129        if self.local_dirs.lock().is_none() && max_temp_directory_size != 0 {
130            return config_err!(
131                "Cannot set max temp directory size for a disk manager that spilling is disabled"
132            );
133        }
134
135        self.max_temp_directory_size = max_temp_directory_size;
136        Ok(self)
137    }
138
139    pub fn used_disk_space(&self) -> u64 {
140        self.used_disk_space.load(Ordering::Relaxed)
141    }
142
143    /// Return true if this disk manager supports creating temporary
144    /// files. If this returns false, any call to `create_tmp_file`
145    /// will error.
146    pub fn tmp_files_enabled(&self) -> bool {
147        self.local_dirs.lock().is_some()
148    }
149
150    /// Return a temporary file from a randomized choice in the configured locations
151    ///
152    /// If the file can not be created for some reason, returns an
153    /// error message referencing the request description
154    pub fn create_tmp_file(
155        self: &Arc<Self>,
156        request_description: &str,
157    ) -> Result<RefCountedTempFile> {
158        let mut guard = self.local_dirs.lock();
159        let local_dirs = guard.as_mut().ok_or_else(|| {
160            resources_datafusion_err!(
161                "Memory Exhausted while {request_description} (DiskManager is disabled)"
162            )
163        })?;
164
165        // Create a temporary directory if needed
166        if local_dirs.is_empty() {
167            let tempdir = tempfile::tempdir().map_err(DataFusionError::IoError)?;
168
169            debug!(
170                "Created directory '{:?}' as DataFusion tempfile directory for {}",
171                tempdir.path().to_string_lossy(),
172                request_description,
173            );
174
175            local_dirs.push(Arc::new(tempdir));
176        }
177
178        let dir_index = thread_rng().gen_range(0..local_dirs.len());
179        Ok(RefCountedTempFile {
180            _parent_temp_dir: Arc::clone(&local_dirs[dir_index]),
181            tempfile: Builder::new()
182                .tempfile_in(local_dirs[dir_index].as_ref())
183                .map_err(DataFusionError::IoError)?,
184            current_file_disk_usage: 0,
185            disk_manager: Arc::clone(self),
186        })
187    }
188}
189
/// A wrapper around a [`NamedTempFile`] that also contains
/// a reference to its parent temporary directory and to the
/// [`DiskManager`] that created it.
///
/// # Note
/// After any modification to the underlying file (e.g., writing data to it), the caller
/// must invoke [`Self::update_disk_usage`] to update the global disk usage counter.
/// This ensures the disk manager can properly enforce usage limits configured by
/// [`DiskManager::with_max_temp_directory_size`].
#[derive(Debug)]
pub struct RefCountedTempFile {
    /// The reference to the directory in which temporary files are created to ensure
    /// it is not cleaned up prior to the NamedTempFile
    _parent_temp_dir: Arc<TempDir>,
    /// The underlying temporary file
    tempfile: NamedTempFile,
    /// Tracks the current disk usage (in bytes) of this temporary file, i.e. the
    /// share of the global counter that belongs to this file. See
    /// [`Self::update_disk_usage`] for more details.
    current_file_disk_usage: u64,
    /// The disk manager that created and manages this temporary file
    disk_manager: Arc<DiskManager>,
}
210
211impl RefCountedTempFile {
212    pub fn path(&self) -> &Path {
213        self.tempfile.path()
214    }
215
216    pub fn inner(&self) -> &NamedTempFile {
217        &self.tempfile
218    }
219
220    /// Updates the global disk usage counter after modifications to the underlying file.
221    ///
222    /// # Errors
223    /// - Returns an error if the global disk usage exceeds the configured limit.
224    pub fn update_disk_usage(&mut self) -> Result<()> {
225        // Get new file size from OS
226        let metadata = self.tempfile.as_file().metadata()?;
227        let new_disk_usage = metadata.len();
228
229        // Update the global disk usage by:
230        // 1. Subtracting the old file size from the global counter
231        self.disk_manager
232            .used_disk_space
233            .fetch_sub(self.current_file_disk_usage, Ordering::Relaxed);
234        // 2. Adding the new file size to the global counter
235        self.disk_manager
236            .used_disk_space
237            .fetch_add(new_disk_usage, Ordering::Relaxed);
238
239        // 3. Check if the updated global disk usage exceeds the configured limit
240        let global_disk_usage = self.disk_manager.used_disk_space.load(Ordering::Relaxed);
241        if global_disk_usage > self.disk_manager.max_temp_directory_size {
242            return resources_err!(
243                "The used disk space during the spilling process has exceeded the allowable limit of {}. Try increasing the `max_temp_directory_size` in the disk manager configuration.",
244                human_readable_size(self.disk_manager.max_temp_directory_size as usize)
245            );
246        }
247
248        // 4. Update the local file size tracking
249        self.current_file_disk_usage = new_disk_usage;
250
251        Ok(())
252    }
253}
254
255/// When the temporary file is dropped, subtract its disk usage from the disk manager's total
256impl Drop for RefCountedTempFile {
257    fn drop(&mut self) {
258        // Subtract the current file's disk usage from the global counter
259        self.disk_manager
260            .used_disk_space
261            .fetch_sub(self.current_file_disk_usage, Ordering::Relaxed);
262    }
263}
264
265/// Setup local dirs by creating one new dir in each of the given dirs
266fn create_local_dirs(local_dirs: Vec<PathBuf>) -> Result<Vec<Arc<TempDir>>> {
267    local_dirs
268        .iter()
269        .map(|root| {
270            if !Path::new(root).exists() {
271                std::fs::create_dir(root)?;
272            }
273            Builder::new()
274                .prefix("datafusion-")
275                .tempdir_in(root)
276                .map_err(DataFusionError::IoError)
277        })
278        .map(|result| result.map(Arc::new))
279        .collect()
280}
281
/// Unit tests for [`DiskManager`] construction and temporary file creation.
#[cfg(test)]
mod tests {
    use super::*;

    /// With the default configuration no directory exists up front; the
    /// first `create_tmp_file` call creates it.
    #[test]
    fn lazy_temp_dir_creation() -> Result<()> {
        // A default configuration should not create temp files until requested
        let config = DiskManagerConfig::new();
        let dm = DiskManager::try_new(config)?;

        assert_eq!(0, local_dir_snapshot(&dm).len());

        // can still create a tempfile however:
        let actual = dm.create_tmp_file("Testing")?;

        // Now the tempdir has been created on demand
        assert_eq!(1, local_dir_snapshot(&dm).len());

        // the returned tempfile file should be in the temp directory
        let local_dirs = local_dir_snapshot(&dm);
        assert_path_in_dirs(actual.path(), local_dirs.iter().map(|p| p.as_path()));

        Ok(())
    }

    /// Returns the paths of the directories currently held by `dm`
    /// (empty when the manager is disabled).
    fn local_dir_snapshot(dm: &DiskManager) -> Vec<PathBuf> {
        dm.local_dirs
            .lock()
            .iter()
            .flatten()
            .map(|p| p.path().into())
            .collect()
    }

    /// A file created by a manager with specified directories lives below
    /// one of those directories.
    #[test]
    fn file_in_right_dir() -> Result<()> {
        let local_dir1 = TempDir::new()?;
        let local_dir2 = TempDir::new()?;
        let local_dir3 = TempDir::new()?;
        let local_dirs = vec![local_dir1.path(), local_dir2.path(), local_dir3.path()];
        let config = DiskManagerConfig::new_specified(
            local_dirs.iter().map(|p| p.into()).collect(),
        );

        let dm = DiskManager::try_new(config)?;
        assert!(dm.tmp_files_enabled());
        let actual = dm.create_tmp_file("Testing")?;

        // the file should be in one of the specified local directories
        assert_path_in_dirs(actual.path(), local_dirs.into_iter());

        Ok(())
    }

    /// A disabled manager reports that temp files are unavailable and
    /// fails `create_tmp_file` with the expected message.
    #[test]
    fn test_disabled_disk_manager() {
        let config = DiskManagerConfig::Disabled;
        let manager = DiskManager::try_new(config).unwrap();
        assert!(!manager.tmp_files_enabled());
        assert_eq!(
            manager.create_tmp_file("Testing").unwrap_err().strip_backtrace(),
            "Resources exhausted: Memory Exhausted while Testing (DiskManager is disabled)",
        )
    }

    /// Creating a manager over a single specified directory and requesting
    /// a temp file from it succeeds.
    #[test]
    fn test_disk_manager_create_spill_folder() {
        let dir = TempDir::new().unwrap();
        let config = DiskManagerConfig::new_specified(vec![dir.path().to_owned()]);

        DiskManager::try_new(config)
            .unwrap()
            .create_tmp_file("Testing")
            .unwrap();
    }

    /// Asserts that `file_path` is found anywhere in any of `dir` directories
    fn assert_path_in_dirs<'a>(
        file_path: &'a Path,
        dirs: impl Iterator<Item = &'a Path>,
    ) {
        let dirs: Vec<&Path> = dirs.collect();

        // A match is any ancestor of `file_path` that equals one of `dirs`
        let found = dirs.iter().any(|dir_path| {
            file_path
                .ancestors()
                .any(|candidate_path| *dir_path == candidate_path)
        });

        assert!(found, "Can't find {file_path:?} in dirs: {dirs:?}");
    }

    /// The temp file must stay on disk after the test's handle to the
    /// manager is dropped, and disappear once the file itself is dropped.
    /// The order of the `drop` calls below is what is being tested.
    #[test]
    fn test_temp_file_still_alive_after_disk_manager_dropped() -> Result<()> {
        // Test for the case using OS arranged temporary directory
        let config = DiskManagerConfig::new();
        let dm = DiskManager::try_new(config)?;
        let temp_file = dm.create_tmp_file("Testing")?;
        let temp_file_path = temp_file.path().to_owned();
        assert!(temp_file_path.exists());

        drop(dm);
        assert!(temp_file_path.exists());

        drop(temp_file);
        assert!(!temp_file_path.exists());

        // Test for the case using specified directories
        let local_dir1 = TempDir::new()?;
        let local_dir2 = TempDir::new()?;
        let local_dir3 = TempDir::new()?;
        let local_dirs = [local_dir1.path(), local_dir2.path(), local_dir3.path()];
        let config = DiskManagerConfig::new_specified(
            local_dirs.iter().map(|p| p.into()).collect(),
        );
        let dm = DiskManager::try_new(config)?;
        let temp_file = dm.create_tmp_file("Testing")?;
        let temp_file_path = temp_file.path().to_owned();
        assert!(temp_file_path.exists());

        drop(dm);
        assert!(temp_file_path.exists());

        drop(temp_file);
        assert!(!temp_file_path.exists());

        Ok(())
    }
}