datafusion_execution/
disk_manager.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`DiskManager`]: Manages files generated during query execution
19
20use datafusion_common::{resources_datafusion_err, DataFusionError, Result};
21use log::debug;
22use parking_lot::Mutex;
23use rand::{thread_rng, Rng};
24use std::path::{Path, PathBuf};
25use std::sync::Arc;
26use tempfile::{Builder, NamedTempFile, TempDir};
27
28/// Configuration for temporary disk access
29#[derive(Debug, Clone)]
30pub enum DiskManagerConfig {
31    /// Use the provided [DiskManager] instance
32    Existing(Arc<DiskManager>),
33
34    /// Create a new [DiskManager] that creates temporary files within
35    /// a temporary directory chosen by the OS
36    NewOs,
37
38    /// Create a new [DiskManager] that creates temporary files within
39    /// the specified directories
40    NewSpecified(Vec<PathBuf>),
41
42    /// Disable disk manager, attempts to create temporary files will error
43    Disabled,
44}
45
46impl Default for DiskManagerConfig {
47    fn default() -> Self {
48        Self::NewOs
49    }
50}
51
52impl DiskManagerConfig {
53    /// Create temporary files in a temporary directory chosen by the OS
54    pub fn new() -> Self {
55        Self::default()
56    }
57
58    /// Create temporary files using the provided disk manager
59    pub fn new_existing(existing: Arc<DiskManager>) -> Self {
60        Self::Existing(existing)
61    }
62
63    /// Create temporary files in the specified directories
64    pub fn new_specified(paths: Vec<PathBuf>) -> Self {
65        Self::NewSpecified(paths)
66    }
67}
68
/// Manages files generated during query execution, e.g. spill files generated
/// while processing dataset larger than available memory.
///
/// Instances are built with [`DiskManager::try_new`] from a
/// [`DiskManagerConfig`] and hand out temporary files through
/// [`DiskManager::create_tmp_file`].
#[derive(Debug)]
pub struct DiskManager {
    /// TempDirs to put temporary files in.
    ///
    /// * `Some(vec![])`: no directory exists yet; a new OS specified temporary
    ///   directory is created on the first request for a temporary file.
    /// * `Some(dirs)`: temporary files are placed in one of `dirs`.
    /// * `None`: the manager is disabled (configured not to spill) and
    ///   requests for temporary files return an error.
    ///
    /// Each directory is held in an `Arc` so that files created in it can
    /// keep their parent directory alive.
    local_dirs: Mutex<Option<Vec<Arc<TempDir>>>>,
}
79
80impl DiskManager {
81    /// Create a DiskManager given the configuration
82    pub fn try_new(config: DiskManagerConfig) -> Result<Arc<Self>> {
83        match config {
84            DiskManagerConfig::Existing(manager) => Ok(manager),
85            DiskManagerConfig::NewOs => Ok(Arc::new(Self {
86                local_dirs: Mutex::new(Some(vec![])),
87            })),
88            DiskManagerConfig::NewSpecified(conf_dirs) => {
89                let local_dirs = create_local_dirs(conf_dirs)?;
90                debug!(
91                    "Created local dirs {:?} as DataFusion working directory",
92                    local_dirs
93                );
94                Ok(Arc::new(Self {
95                    local_dirs: Mutex::new(Some(local_dirs)),
96                }))
97            }
98            DiskManagerConfig::Disabled => Ok(Arc::new(Self {
99                local_dirs: Mutex::new(None),
100            })),
101        }
102    }
103
104    /// Return true if this disk manager supports creating temporary
105    /// files. If this returns false, any call to `create_tmp_file`
106    /// will error.
107    pub fn tmp_files_enabled(&self) -> bool {
108        self.local_dirs.lock().is_some()
109    }
110
111    /// Return a temporary file from a randomized choice in the configured locations
112    ///
113    /// If the file can not be created for some reason, returns an
114    /// error message referencing the request description
115    pub fn create_tmp_file(
116        &self,
117        request_description: &str,
118    ) -> Result<RefCountedTempFile> {
119        let mut guard = self.local_dirs.lock();
120        let local_dirs = guard.as_mut().ok_or_else(|| {
121            resources_datafusion_err!(
122                "Memory Exhausted while {request_description} (DiskManager is disabled)"
123            )
124        })?;
125
126        // Create a temporary directory if needed
127        if local_dirs.is_empty() {
128            let tempdir = tempfile::tempdir().map_err(DataFusionError::IoError)?;
129
130            debug!(
131                "Created directory '{:?}' as DataFusion tempfile directory for {}",
132                tempdir.path().to_string_lossy(),
133                request_description,
134            );
135
136            local_dirs.push(Arc::new(tempdir));
137        }
138
139        let dir_index = thread_rng().gen_range(0..local_dirs.len());
140        Ok(RefCountedTempFile {
141            _parent_temp_dir: Arc::clone(&local_dirs[dir_index]),
142            tempfile: Builder::new()
143                .tempfile_in(local_dirs[dir_index].as_ref())
144                .map_err(DataFusionError::IoError)?,
145        })
146    }
147}
148
/// A wrapper around a [`NamedTempFile`] that also contains
/// a reference to its parent temporary directory
#[derive(Debug)]
pub struct RefCountedTempFile {
    /// The reference to the directory in which temporary files are created to ensure
    /// it is not cleaned up prior to the NamedTempFile
    _parent_temp_dir: Arc<TempDir>,
    /// The temporary file itself, located inside `_parent_temp_dir`
    tempfile: NamedTempFile,
}
158
159impl RefCountedTempFile {
160    pub fn path(&self) -> &Path {
161        self.tempfile.path()
162    }
163
164    pub fn inner(&self) -> &NamedTempFile {
165        &self.tempfile
166    }
167}
168
169/// Setup local dirs by creating one new dir in each of the given dirs
170fn create_local_dirs(local_dirs: Vec<PathBuf>) -> Result<Vec<Arc<TempDir>>> {
171    local_dirs
172        .iter()
173        .map(|root| {
174            if !Path::new(root).exists() {
175                std::fs::create_dir(root)?;
176            }
177            Builder::new()
178                .prefix("datafusion-")
179                .tempdir_in(root)
180                .map_err(DataFusionError::IoError)
181        })
182        .map(|result| result.map(Arc::new))
183        .collect()
184}
185
#[cfg(test)]
mod tests {
    use super::*;

    /// The OS temporary directory is only created once a file is requested.
    #[test]
    fn lazy_temp_dir_creation() -> Result<()> {
        // A default configuration should not create temp files until requested
        let config = DiskManagerConfig::new();
        let dm = DiskManager::try_new(config)?;

        assert_eq!(0, local_dir_snapshot(&dm).len());

        // can still create a tempfile however:
        let actual = dm.create_tmp_file("Testing")?;

        // Now the tempdir has been created on demand
        assert_eq!(1, local_dir_snapshot(&dm).len());

        // the returned tempfile file should be in the temp directory
        let local_dirs = local_dir_snapshot(&dm);
        assert_path_in_dirs(actual.path(), local_dirs.iter().map(|p| p.as_path()));

        Ok(())
    }

    /// Returns the paths of the directories currently held by `dm`
    /// (empty when the manager is disabled or has not created any yet).
    fn local_dir_snapshot(dm: &DiskManager) -> Vec<PathBuf> {
        dm.local_dirs
            .lock()
            .iter()
            .flatten()
            .map(|p| p.path().into())
            .collect()
    }

    /// Files are created below one of the explicitly configured directories.
    #[test]
    fn file_in_right_dir() -> Result<()> {
        let local_dir1 = TempDir::new()?;
        let local_dir2 = TempDir::new()?;
        let local_dir3 = TempDir::new()?;
        let local_dirs = vec![local_dir1.path(), local_dir2.path(), local_dir3.path()];
        let config = DiskManagerConfig::new_specified(
            local_dirs.iter().map(|p| p.into()).collect(),
        );

        let dm = DiskManager::try_new(config)?;
        assert!(dm.tmp_files_enabled());
        let actual = dm.create_tmp_file("Testing")?;

        // the file should be in one of the specified local directories
        assert_path_in_dirs(actual.path(), local_dirs.into_iter());

        Ok(())
    }

    /// A disabled manager reports itself as disabled and refuses to create files.
    #[test]
    fn test_disabled_disk_manager() {
        let config = DiskManagerConfig::Disabled;
        let manager = DiskManager::try_new(config).unwrap();
        assert!(!manager.tmp_files_enabled());
        assert_eq!(
            manager.create_tmp_file("Testing").unwrap_err().strip_backtrace(),
            "Resources exhausted: Memory Exhausted while Testing (DiskManager is disabled)",
        )
    }

    /// A configured spill folder that does not exist yet is created.
    #[test]
    fn test_disk_manager_create_spill_folder() {
        let dir = TempDir::new().unwrap();

        // Use a sub-folder that does not exist yet, so that the folder
        // creation path is actually exercised. It lives inside `dir` and is
        // therefore cleaned up together with it.
        let spill_dir = dir.path().join("spill");
        assert!(!spill_dir.exists());
        let config = DiskManagerConfig::new_specified(vec![spill_dir.clone()]);

        let dm = DiskManager::try_new(config).unwrap();
        assert!(spill_dir.is_dir());

        let tmp_file = dm.create_tmp_file("Testing").unwrap();
        assert_path_in_dirs(tmp_file.path(), std::iter::once(spill_dir.as_path()));
    }

    /// Asserts that `file_path` is found anywhere in any of `dir` directories
    fn assert_path_in_dirs<'a>(
        file_path: &'a Path,
        dirs: impl Iterator<Item = &'a Path>,
    ) {
        let dirs: Vec<&Path> = dirs.collect();

        let found = dirs.iter().any(|dir_path| {
            file_path
                .ancestors()
                .any(|candidate_path| *dir_path == candidate_path)
        });

        assert!(found, "Can't find {file_path:?} in dirs: {dirs:?}");
    }

    /// A temporary file outlives the manager that created it and is removed
    /// only once the file handle itself is dropped.
    #[test]
    fn test_temp_file_still_alive_after_disk_manager_dropped() -> Result<()> {
        // Test for the case using OS arranged temporary directory
        let config = DiskManagerConfig::new();
        let dm = DiskManager::try_new(config)?;
        let temp_file = dm.create_tmp_file("Testing")?;
        let temp_file_path = temp_file.path().to_owned();
        assert!(temp_file_path.exists());

        drop(dm);
        assert!(temp_file_path.exists());

        drop(temp_file);
        assert!(!temp_file_path.exists());

        // Test for the case using specified directories
        let local_dir1 = TempDir::new()?;
        let local_dir2 = TempDir::new()?;
        let local_dir3 = TempDir::new()?;
        let local_dirs = [local_dir1.path(), local_dir2.path(), local_dir3.path()];
        let config = DiskManagerConfig::new_specified(
            local_dirs.iter().map(|p| p.into()).collect(),
        );
        let dm = DiskManager::try_new(config)?;
        let temp_file = dm.create_tmp_file("Testing")?;
        let temp_file_path = temp_file.path().to_owned();
        assert!(temp_file_path.exists());

        drop(dm);
        assert!(temp_file_path.exists());

        drop(temp_file);
        assert!(!temp_file_path.exists());

        Ok(())
    }
}