datafusion_execution/
disk_manager.rs1use datafusion_common::{resources_datafusion_err, DataFusionError, Result};
21use log::debug;
22use parking_lot::Mutex;
23use rand::{thread_rng, Rng};
24use std::path::{Path, PathBuf};
25use std::sync::Arc;
26use tempfile::{Builder, NamedTempFile, TempDir};
27
28#[derive(Debug, Clone)]
30pub enum DiskManagerConfig {
31 Existing(Arc<DiskManager>),
33
34 NewOs,
37
38 NewSpecified(Vec<PathBuf>),
41
42 Disabled,
44}
45
46impl Default for DiskManagerConfig {
47 fn default() -> Self {
48 Self::NewOs
49 }
50}
51
52impl DiskManagerConfig {
53 pub fn new() -> Self {
55 Self::default()
56 }
57
58 pub fn new_existing(existing: Arc<DiskManager>) -> Self {
60 Self::Existing(existing)
61 }
62
63 pub fn new_specified(paths: Vec<PathBuf>) -> Self {
65 Self::NewSpecified(paths)
66 }
67}
68
69#[derive(Debug)]
72pub struct DiskManager {
73 local_dirs: Mutex<Option<Vec<Arc<TempDir>>>>,
78}
79
80impl DiskManager {
81 pub fn try_new(config: DiskManagerConfig) -> Result<Arc<Self>> {
83 match config {
84 DiskManagerConfig::Existing(manager) => Ok(manager),
85 DiskManagerConfig::NewOs => Ok(Arc::new(Self {
86 local_dirs: Mutex::new(Some(vec![])),
87 })),
88 DiskManagerConfig::NewSpecified(conf_dirs) => {
89 let local_dirs = create_local_dirs(conf_dirs)?;
90 debug!(
91 "Created local dirs {:?} as DataFusion working directory",
92 local_dirs
93 );
94 Ok(Arc::new(Self {
95 local_dirs: Mutex::new(Some(local_dirs)),
96 }))
97 }
98 DiskManagerConfig::Disabled => Ok(Arc::new(Self {
99 local_dirs: Mutex::new(None),
100 })),
101 }
102 }
103
104 pub fn tmp_files_enabled(&self) -> bool {
108 self.local_dirs.lock().is_some()
109 }
110
111 pub fn create_tmp_file(
116 &self,
117 request_description: &str,
118 ) -> Result<RefCountedTempFile> {
119 let mut guard = self.local_dirs.lock();
120 let local_dirs = guard.as_mut().ok_or_else(|| {
121 resources_datafusion_err!(
122 "Memory Exhausted while {request_description} (DiskManager is disabled)"
123 )
124 })?;
125
126 if local_dirs.is_empty() {
128 let tempdir = tempfile::tempdir().map_err(DataFusionError::IoError)?;
129
130 debug!(
131 "Created directory '{:?}' as DataFusion tempfile directory for {}",
132 tempdir.path().to_string_lossy(),
133 request_description,
134 );
135
136 local_dirs.push(Arc::new(tempdir));
137 }
138
139 let dir_index = thread_rng().gen_range(0..local_dirs.len());
140 Ok(RefCountedTempFile {
141 _parent_temp_dir: Arc::clone(&local_dirs[dir_index]),
142 tempfile: Builder::new()
143 .tempfile_in(local_dirs[dir_index].as_ref())
144 .map_err(DataFusionError::IoError)?,
145 })
146 }
147}
148
149#[derive(Debug)]
152pub struct RefCountedTempFile {
153 _parent_temp_dir: Arc<TempDir>,
156 tempfile: NamedTempFile,
157}
158
159impl RefCountedTempFile {
160 pub fn path(&self) -> &Path {
161 self.tempfile.path()
162 }
163
164 pub fn inner(&self) -> &NamedTempFile {
165 &self.tempfile
166 }
167}
168
169fn create_local_dirs(local_dirs: Vec<PathBuf>) -> Result<Vec<Arc<TempDir>>> {
171 local_dirs
172 .iter()
173 .map(|root| {
174 if !Path::new(root).exists() {
175 std::fs::create_dir(root)?;
176 }
177 Builder::new()
178 .prefix("datafusion-")
179 .tempdir_in(root)
180 .map_err(DataFusionError::IoError)
181 })
182 .map(|result| result.map(Arc::new))
183 .collect()
184}
185
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that at least one of `dirs` is an ancestor of `file_path`.
    fn assert_path_in_dirs<'a>(
        file_path: &'a Path,
        dirs: impl Iterator<Item = &'a Path>,
    ) {
        let dirs: Vec<&Path> = dirs.collect();

        let found = file_path
            .ancestors()
            .any(|candidate_path| dirs.iter().any(|dir_path| *dir_path == candidate_path));

        assert!(found, "Can't find {file_path:?} in dirs: {dirs:?}");
    }

    /// Returns the paths of the directories currently held by `dm`
    /// (empty when the manager is disabled).
    fn local_dir_snapshot(dm: &DiskManager) -> Vec<PathBuf> {
        let guard = dm.local_dirs.lock();
        guard
            .iter()
            .flat_map(|dirs| dirs.iter())
            .map(|dir| dir.path().to_path_buf())
            .collect()
    }

    /// Creates a temp file through `dm` and checks that the file survives
    /// dropping the manager and disappears once the file handle is dropped.
    fn assert_temp_file_outlives_manager(dm: Arc<DiskManager>) -> Result<()> {
        let temp_file = dm.create_tmp_file("Testing")?;
        let temp_file_path = temp_file.path().to_owned();
        assert!(temp_file_path.exists());

        drop(dm);
        assert!(temp_file_path.exists());

        drop(temp_file);
        assert!(!temp_file_path.exists());

        Ok(())
    }

    #[test]
    fn lazy_temp_dir_creation() -> Result<()> {
        // A default-configured manager starts without any directory.
        let dm = DiskManager::try_new(DiskManagerConfig::new())?;
        assert!(local_dir_snapshot(&dm).is_empty());

        let actual = dm.create_tmp_file("Testing")?;

        // The first request creates exactly one directory...
        let dirs_after = local_dir_snapshot(&dm);
        assert_eq!(dirs_after.len(), 1);

        // ...and the file lives inside it.
        assert_path_in_dirs(actual.path(), dirs_after.iter().map(|p| p.as_path()));

        Ok(())
    }

    #[test]
    fn file_in_right_dir() -> Result<()> {
        let roots = [TempDir::new()?, TempDir::new()?, TempDir::new()?];
        let root_paths: Vec<&Path> = roots.iter().map(|dir| dir.path()).collect();
        let config = DiskManagerConfig::new_specified(
            root_paths.iter().map(|p| p.to_path_buf()).collect(),
        );

        let dm = DiskManager::try_new(config)?;
        assert!(dm.tmp_files_enabled());
        let actual = dm.create_tmp_file("Testing")?;

        // The file must be below one of the configured roots.
        assert_path_in_dirs(actual.path(), root_paths.into_iter());

        Ok(())
    }

    #[test]
    fn test_disabled_disk_manager() {
        let manager = DiskManager::try_new(DiskManagerConfig::Disabled).unwrap();
        assert!(!manager.tmp_files_enabled());

        let err = manager.create_tmp_file("Testing").unwrap_err();
        assert_eq!(
            err.strip_backtrace(),
            "Resources exhausted: Memory Exhausted while Testing (DiskManager is disabled)",
        );
    }

    #[test]
    fn test_disk_manager_create_spill_folder() {
        let dir = TempDir::new().unwrap();
        let config = DiskManagerConfig::new_specified(vec![dir.path().to_path_buf()]);

        let dm = DiskManager::try_new(config).unwrap();
        dm.create_tmp_file("Testing").unwrap();
    }

    #[test]
    fn test_temp_file_still_alive_after_disk_manager_dropped() -> Result<()> {
        // Scenario 1: directory created lazily by the manager.
        let dm = DiskManager::try_new(DiskManagerConfig::new())?;
        assert_temp_file_outlives_manager(dm)?;

        // Scenario 2: directories created under explicitly specified roots.
        let local_dir1 = TempDir::new()?;
        let local_dir2 = TempDir::new()?;
        let local_dir3 = TempDir::new()?;
        let config = DiskManagerConfig::new_specified(
            [&local_dir1, &local_dir2, &local_dir3]
                .iter()
                .map(|dir| dir.path().to_path_buf())
                .collect(),
        );
        let dm = DiskManager::try_new(config)?;
        assert_temp_file_outlives_manager(dm)?;

        Ok(())
    }
}