datafusion_execution/
disk_manager.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`DiskManager`]: Manages files generated during query execution
19
20use datafusion_common::{
21    DataFusionError, Result, config_err, resources_datafusion_err, resources_err,
22};
23use log::debug;
24use parking_lot::Mutex;
25use rand::{Rng, rng};
26use std::path::{Path, PathBuf};
27use std::sync::Arc;
28use std::sync::atomic::{AtomicU64, Ordering};
29use tempfile::{Builder, NamedTempFile, TempDir};
30
31use datafusion_common::human_readable_size;
32
33pub const DEFAULT_MAX_TEMP_DIRECTORY_SIZE: u64 = 100 * 1024 * 1024 * 1024; // 100GB
34
35/// Builder pattern for the [DiskManager] structure
36#[derive(Clone, Debug)]
37pub struct DiskManagerBuilder {
38    /// The storage mode of the disk manager
39    mode: DiskManagerMode,
40    /// The maximum amount of data (in bytes) stored inside the temporary directories.
41    /// Default to 100GB
42    max_temp_directory_size: u64,
43}
44
45impl Default for DiskManagerBuilder {
46    fn default() -> Self {
47        Self {
48            mode: DiskManagerMode::OsTmpDirectory,
49            max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
50        }
51    }
52}
53
54impl DiskManagerBuilder {
55    pub fn set_mode(&mut self, mode: DiskManagerMode) {
56        self.mode = mode;
57    }
58
59    pub fn with_mode(mut self, mode: DiskManagerMode) -> Self {
60        self.set_mode(mode);
61        self
62    }
63
64    pub fn set_max_temp_directory_size(&mut self, value: u64) {
65        self.max_temp_directory_size = value;
66    }
67
68    pub fn with_max_temp_directory_size(mut self, value: u64) -> Self {
69        self.set_max_temp_directory_size(value);
70        self
71    }
72
73    /// Create a DiskManager given the builder
74    pub fn build(self) -> Result<DiskManager> {
75        match self.mode {
76            DiskManagerMode::OsTmpDirectory => Ok(DiskManager {
77                local_dirs: Mutex::new(Some(vec![])),
78                max_temp_directory_size: self.max_temp_directory_size,
79                used_disk_space: Arc::new(AtomicU64::new(0)),
80            }),
81            DiskManagerMode::Directories(conf_dirs) => {
82                let local_dirs = create_local_dirs(&conf_dirs)?;
83                debug!(
84                    "Created local dirs {local_dirs:?} as DataFusion working directory"
85                );
86                Ok(DiskManager {
87                    local_dirs: Mutex::new(Some(local_dirs)),
88                    max_temp_directory_size: self.max_temp_directory_size,
89                    used_disk_space: Arc::new(AtomicU64::new(0)),
90                })
91            }
92            DiskManagerMode::Disabled => Ok(DiskManager {
93                local_dirs: Mutex::new(None),
94                max_temp_directory_size: self.max_temp_directory_size,
95                used_disk_space: Arc::new(AtomicU64::new(0)),
96            }),
97        }
98    }
99}
100
101#[derive(Clone, Debug, Default)]
102pub enum DiskManagerMode {
103    /// Create a new [DiskManager] that creates temporary files within
104    /// a temporary directory chosen by the OS
105    #[default]
106    OsTmpDirectory,
107
108    /// Create a new [DiskManager] that creates temporary files within
109    /// the specified directories. One of the directories will be chosen
110    /// at random for each temporary file created.
111    Directories(Vec<PathBuf>),
112
113    /// Disable disk manager, attempts to create temporary files will error
114    Disabled,
115}
116
117/// Configuration for temporary disk access
118#[deprecated(since = "48.0.0", note = "Use DiskManagerBuilder instead")]
119#[derive(Debug, Clone, Default)]
120#[allow(clippy::allow_attributes)]
121#[allow(deprecated)]
122pub enum DiskManagerConfig {
123    /// Use the provided [DiskManager] instance
124    Existing(Arc<DiskManager>),
125
126    /// Create a new [DiskManager] that creates temporary files within
127    /// a temporary directory chosen by the OS
128    #[default]
129    NewOs,
130
131    /// Create a new [DiskManager] that creates temporary files within
132    /// the specified directories
133    NewSpecified(Vec<PathBuf>),
134
135    /// Disable disk manager, attempts to create temporary files will error
136    Disabled,
137}
138
139#[expect(deprecated)]
140impl DiskManagerConfig {
141    /// Create temporary files in a temporary directory chosen by the OS
142    pub fn new() -> Self {
143        Self::default()
144    }
145
146    /// Create temporary files using the provided disk manager
147    pub fn new_existing(existing: Arc<DiskManager>) -> Self {
148        Self::Existing(existing)
149    }
150
151    /// Create temporary files in the specified directories
152    pub fn new_specified(paths: Vec<PathBuf>) -> Self {
153        Self::NewSpecified(paths)
154    }
155}
156
157/// Manages files generated during query execution, e.g. spill files generated
158/// while processing dataset larger than available memory.
159#[derive(Debug)]
160pub struct DiskManager {
161    /// TempDirs to put temporary files in.
162    ///
163    /// If `Some(vec![])` a new OS specified temporary directory will be created
164    /// If `None` an error will be returned (configured not to spill)
165    local_dirs: Mutex<Option<Vec<Arc<TempDir>>>>,
166    /// The maximum amount of data (in bytes) stored inside the temporary directories.
167    /// Default to 100GB
168    max_temp_directory_size: u64,
169    /// Used disk space in the temporary directories. Now only spilled data for
170    /// external executors are counted.
171    used_disk_space: Arc<AtomicU64>,
172}
173
174impl DiskManager {
175    /// Creates a builder for [DiskManager]
176    pub fn builder() -> DiskManagerBuilder {
177        DiskManagerBuilder::default()
178    }
179
180    /// Create a DiskManager given the configuration
181    #[expect(deprecated)]
182    #[deprecated(since = "48.0.0", note = "Use DiskManager::builder() instead")]
183    pub fn try_new(config: DiskManagerConfig) -> Result<Arc<Self>> {
184        match config {
185            DiskManagerConfig::Existing(manager) => Ok(manager),
186            DiskManagerConfig::NewOs => Ok(Arc::new(Self {
187                local_dirs: Mutex::new(Some(vec![])),
188                max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
189                used_disk_space: Arc::new(AtomicU64::new(0)),
190            })),
191            DiskManagerConfig::NewSpecified(conf_dirs) => {
192                let local_dirs = create_local_dirs(&conf_dirs)?;
193                debug!(
194                    "Created local dirs {local_dirs:?} as DataFusion working directory"
195                );
196                Ok(Arc::new(Self {
197                    local_dirs: Mutex::new(Some(local_dirs)),
198                    max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
199                    used_disk_space: Arc::new(AtomicU64::new(0)),
200                }))
201            }
202            DiskManagerConfig::Disabled => Ok(Arc::new(Self {
203                local_dirs: Mutex::new(None),
204                max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
205                used_disk_space: Arc::new(AtomicU64::new(0)),
206            })),
207        }
208    }
209
210    pub fn set_max_temp_directory_size(
211        &mut self,
212        max_temp_directory_size: u64,
213    ) -> Result<()> {
214        // If the disk manager is disabled and `max_temp_directory_size` is not 0,
215        // this operation is not meaningful, fail early.
216        if self.local_dirs.lock().is_none() && max_temp_directory_size != 0 {
217            return config_err!(
218                "Cannot set max temp directory size for a disk manager that spilling is disabled"
219            );
220        }
221
222        self.max_temp_directory_size = max_temp_directory_size;
223        Ok(())
224    }
225
226    pub fn set_arc_max_temp_directory_size(
227        this: &mut Arc<Self>,
228        max_temp_directory_size: u64,
229    ) -> Result<()> {
230        if let Some(inner) = Arc::get_mut(this) {
231            inner.set_max_temp_directory_size(max_temp_directory_size)?;
232            Ok(())
233        } else {
234            config_err!("DiskManager should be a single instance")
235        }
236    }
237
238    pub fn with_max_temp_directory_size(
239        mut self,
240        max_temp_directory_size: u64,
241    ) -> Result<Self> {
242        self.set_max_temp_directory_size(max_temp_directory_size)?;
243        Ok(self)
244    }
245
246    pub fn used_disk_space(&self) -> u64 {
247        self.used_disk_space.load(Ordering::Relaxed)
248    }
249
250    /// Returns the maximum temporary directory size in bytes
251    pub fn max_temp_directory_size(&self) -> u64 {
252        self.max_temp_directory_size
253    }
254
255    /// Returns the temporary directory paths
256    pub fn temp_dir_paths(&self) -> Vec<PathBuf> {
257        self.local_dirs
258            .lock()
259            .as_ref()
260            .map(|dirs| {
261                dirs.iter()
262                    .map(|temp_dir| temp_dir.path().to_path_buf())
263                    .collect()
264            })
265            .unwrap_or_default()
266    }
267
268    /// Return true if this disk manager supports creating temporary
269    /// files. If this returns false, any call to `create_tmp_file`
270    /// will error.
271    pub fn tmp_files_enabled(&self) -> bool {
272        self.local_dirs.lock().is_some()
273    }
274
275    /// Return a temporary file from a randomized choice in the configured locations
276    ///
277    /// If the file can not be created for some reason, returns an
278    /// error message referencing the request description
279    pub fn create_tmp_file(
280        self: &Arc<Self>,
281        request_description: &str,
282    ) -> Result<RefCountedTempFile> {
283        let mut guard = self.local_dirs.lock();
284        let local_dirs = guard.as_mut().ok_or_else(|| {
285            resources_datafusion_err!(
286                "Memory Exhausted while {request_description} (DiskManager is disabled)"
287            )
288        })?;
289
290        // Create a temporary directory if needed
291        if local_dirs.is_empty() {
292            let tempdir = tempfile::tempdir().map_err(DataFusionError::IoError)?;
293
294            debug!(
295                "Created directory '{:?}' as DataFusion tempfile directory for {}",
296                tempdir.path().to_string_lossy(),
297                request_description,
298            );
299
300            local_dirs.push(Arc::new(tempdir));
301        }
302
303        let dir_index = rng().random_range(0..local_dirs.len());
304        Ok(RefCountedTempFile {
305            parent_temp_dir: Arc::clone(&local_dirs[dir_index]),
306            tempfile: Arc::new(
307                Builder::new()
308                    .tempfile_in(local_dirs[dir_index].as_ref())
309                    .map_err(DataFusionError::IoError)?,
310            ),
311            current_file_disk_usage: Arc::new(AtomicU64::new(0)),
312            disk_manager: Arc::clone(self),
313        })
314    }
315}
316
317/// A wrapper around a [`NamedTempFile`] that also contains
318/// a reference to its parent temporary directory.
319///
320/// # Note
321/// After any modification to the underlying file (e.g., writing data to it), the caller
322/// must invoke [`Self::update_disk_usage`] to update the global disk usage counter.
323/// This ensures the disk manager can properly enforce usage limits configured by
324/// [`DiskManager::with_max_temp_directory_size`].
325///
326/// This type is Clone-able, allowing multiple references to the same underlying file.
327/// The file is deleted only when the last reference is dropped.
328///
329/// The parent temporary directory is also kept alive as long as any reference to
330/// this file exists, preventing premature cleanup of the directory.
331///
332/// Once all references to this file are dropped, the file is deleted, and the
333/// disk usage is subtracted from the disk manager's total.
334#[derive(Debug)]
335pub struct RefCountedTempFile {
336    /// The reference to the directory in which temporary files are created to ensure
337    /// it is not cleaned up prior to the NamedTempFile
338    parent_temp_dir: Arc<TempDir>,
339    /// The underlying temporary file, wrapped in Arc to allow cloning
340    tempfile: Arc<NamedTempFile>,
341    /// Tracks the current disk usage of this temporary file. See
342    /// [`Self::update_disk_usage`] for more details.
343    ///
344    /// This is wrapped in `Arc<AtomicU64>` so that all clones share the same
345    /// disk usage tracking, preventing incorrect accounting when clones are dropped.
346    current_file_disk_usage: Arc<AtomicU64>,
347    /// The disk manager that created and manages this temporary file
348    disk_manager: Arc<DiskManager>,
349}
350
351impl Clone for RefCountedTempFile {
352    fn clone(&self) -> Self {
353        Self {
354            parent_temp_dir: Arc::clone(&self.parent_temp_dir),
355            tempfile: Arc::clone(&self.tempfile),
356            current_file_disk_usage: Arc::clone(&self.current_file_disk_usage),
357            disk_manager: Arc::clone(&self.disk_manager),
358        }
359    }
360}
361
362impl RefCountedTempFile {
363    pub fn path(&self) -> &Path {
364        self.tempfile.path()
365    }
366
367    pub fn inner(&self) -> &NamedTempFile {
368        self.tempfile.as_ref()
369    }
370
371    /// Updates the global disk usage counter after modifications to the underlying file.
372    ///
373    /// # Errors
374    /// - Returns an error if the global disk usage exceeds the configured limit.
375    pub fn update_disk_usage(&mut self) -> Result<()> {
376        // Get new file size from OS
377        let metadata = self.tempfile.as_file().metadata()?;
378        let new_disk_usage = metadata.len();
379
380        // Get the old disk usage
381        let old_disk_usage = self.current_file_disk_usage.load(Ordering::Relaxed);
382
383        // Update the global disk usage by:
384        // 1. Subtracting the old file size from the global counter
385        self.disk_manager
386            .used_disk_space
387            .fetch_sub(old_disk_usage, Ordering::Relaxed);
388        // 2. Adding the new file size to the global counter
389        self.disk_manager
390            .used_disk_space
391            .fetch_add(new_disk_usage, Ordering::Relaxed);
392
393        // 3. Check if the updated global disk usage exceeds the configured limit
394        let global_disk_usage = self.disk_manager.used_disk_space.load(Ordering::Relaxed);
395        if global_disk_usage > self.disk_manager.max_temp_directory_size {
396            return resources_err!(
397                "The used disk space during the spilling process has exceeded the allowable limit of {}. Try increasing the `max_temp_directory_size` in the disk manager configuration.",
398                human_readable_size(self.disk_manager.max_temp_directory_size as usize)
399            );
400        }
401
402        // 4. Update the local file size tracking
403        self.current_file_disk_usage
404            .store(new_disk_usage, Ordering::Relaxed);
405
406        Ok(())
407    }
408
409    pub fn current_disk_usage(&self) -> u64 {
410        self.current_file_disk_usage.load(Ordering::Relaxed)
411    }
412}
413
414/// When the temporary file is dropped, subtract its disk usage from the disk manager's total
415impl Drop for RefCountedTempFile {
416    fn drop(&mut self) {
417        // Only subtract disk usage when this is the last reference to the file
418        // Check if we're the last one by seeing if there's only one strong reference
419        // left to the underlying tempfile (the one we're holding)
420        if Arc::strong_count(&self.tempfile) == 1 {
421            let current_usage = self.current_file_disk_usage.load(Ordering::Relaxed);
422            self.disk_manager
423                .used_disk_space
424                .fetch_sub(current_usage, Ordering::Relaxed);
425        }
426    }
427}
428
429/// Setup local dirs by creating one new dir in each of the given dirs
430fn create_local_dirs(local_dirs: &[PathBuf]) -> Result<Vec<Arc<TempDir>>> {
431    local_dirs
432        .iter()
433        .map(|root| {
434            if !Path::new(root).exists() {
435                std::fs::create_dir(root)?;
436            }
437            Builder::new()
438                .prefix("datafusion-")
439                .tempdir_in(root)
440                .map_err(DataFusionError::IoError)
441        })
442        .map(|result| result.map(Arc::new))
443        .collect()
444}
445
446#[cfg(test)]
447mod tests {
448    use super::*;
449
450    #[test]
451    fn lazy_temp_dir_creation() -> Result<()> {
452        // A default configuration should not create temp files until requested
453        let dm = Arc::new(DiskManagerBuilder::default().build()?);
454
455        assert_eq!(0, local_dir_snapshot(&dm).len());
456
457        // can still create a tempfile however:
458        let actual = dm.create_tmp_file("Testing")?;
459
460        // Now the tempdir has been created on demand
461        assert_eq!(1, local_dir_snapshot(&dm).len());
462
463        // the returned tempfile file should be in the temp directory
464        let local_dirs = local_dir_snapshot(&dm);
465        assert_path_in_dirs(actual.path(), local_dirs.iter().map(|p| p.as_path()));
466
467        Ok(())
468    }
469
470    fn local_dir_snapshot(dm: &DiskManager) -> Vec<PathBuf> {
471        dm.local_dirs
472            .lock()
473            .iter()
474            .flatten()
475            .map(|p| p.path().into())
476            .collect()
477    }
478
479    #[test]
480    fn file_in_right_dir() -> Result<()> {
481        let local_dir1 = TempDir::new()?;
482        let local_dir2 = TempDir::new()?;
483        let local_dir3 = TempDir::new()?;
484        let local_dirs = vec![local_dir1.path(), local_dir2.path(), local_dir3.path()];
485        let dm = Arc::new(
486            DiskManagerBuilder::default()
487                .with_mode(DiskManagerMode::Directories(
488                    local_dirs.iter().map(|p| p.into()).collect(),
489                ))
490                .build()?,
491        );
492
493        assert!(dm.tmp_files_enabled());
494        let actual = dm.create_tmp_file("Testing")?;
495
496        // the file should be in one of the specified local directories
497        assert_path_in_dirs(actual.path(), local_dirs.into_iter());
498
499        Ok(())
500    }
501
502    #[test]
503    fn test_disabled_disk_manager() {
504        let manager = Arc::new(
505            DiskManagerBuilder::default()
506                .with_mode(DiskManagerMode::Disabled)
507                .build()
508                .unwrap(),
509        );
510        assert!(!manager.tmp_files_enabled());
511        assert_eq!(
512            manager
513                .create_tmp_file("Testing")
514                .unwrap_err()
515                .strip_backtrace(),
516            "Resources exhausted: Memory Exhausted while Testing (DiskManager is disabled)",
517        )
518    }
519
520    #[test]
521    fn test_disk_manager_create_spill_folder() {
522        let dir = TempDir::new().unwrap();
523        DiskManagerBuilder::default()
524            .with_mode(DiskManagerMode::Directories(vec![dir.path().to_path_buf()]))
525            .build()
526            .unwrap();
527    }
528
529    /// Asserts that `file_path` is found anywhere in any of `dir` directories
530    fn assert_path_in_dirs<'a>(
531        file_path: &'a Path,
532        dirs: impl Iterator<Item = &'a Path>,
533    ) {
534        let dirs: Vec<&Path> = dirs.collect();
535
536        let found = dirs.iter().any(|dir_path| {
537            file_path
538                .ancestors()
539                .any(|candidate_path| *dir_path == candidate_path)
540        });
541
542        assert!(found, "Can't find {file_path:?} in dirs: {dirs:?}");
543    }
544
545    #[test]
546    fn test_temp_file_still_alive_after_disk_manager_dropped() -> Result<()> {
547        // Test for the case using OS arranged temporary directory
548        let dm = Arc::new(DiskManagerBuilder::default().build()?);
549        let temp_file = dm.create_tmp_file("Testing")?;
550        let temp_file_path = temp_file.path().to_owned();
551        assert!(temp_file_path.exists());
552
553        drop(dm);
554        assert!(temp_file_path.exists());
555
556        drop(temp_file);
557        assert!(!temp_file_path.exists());
558
559        // Test for the case using specified directories
560        let local_dir1 = TempDir::new()?;
561        let local_dir2 = TempDir::new()?;
562        let local_dir3 = TempDir::new()?;
563        let local_dirs = [local_dir1.path(), local_dir2.path(), local_dir3.path()];
564        let dm = Arc::new(
565            DiskManagerBuilder::default()
566                .with_mode(DiskManagerMode::Directories(
567                    local_dirs.iter().map(|p| p.into()).collect(),
568                ))
569                .build()?,
570        );
571        let temp_file = dm.create_tmp_file("Testing")?;
572        let temp_file_path = temp_file.path().to_owned();
573        assert!(temp_file_path.exists());
574
575        drop(dm);
576        assert!(temp_file_path.exists());
577
578        drop(temp_file);
579        assert!(!temp_file_path.exists());
580
581        Ok(())
582    }
583
584    #[test]
585    fn test_disk_usage_basic() -> Result<()> {
586        use std::io::Write;
587
588        let dm = Arc::new(DiskManagerBuilder::default().build()?);
589        let mut temp_file = dm.create_tmp_file("Testing")?;
590
591        // Initially, disk usage should be 0
592        assert_eq!(dm.used_disk_space(), 0);
593        assert_eq!(temp_file.current_disk_usage(), 0);
594
595        // Write some data to the file
596        temp_file.inner().as_file().write_all(b"hello world")?;
597        temp_file.update_disk_usage()?;
598
599        // Disk usage should now reflect the written data
600        let expected_usage = temp_file.current_disk_usage();
601        assert!(expected_usage > 0);
602        assert_eq!(dm.used_disk_space(), expected_usage);
603
604        // Write more data
605        temp_file.inner().as_file().write_all(b" more data")?;
606        temp_file.update_disk_usage()?;
607
608        // Disk usage should increase
609        let new_usage = temp_file.current_disk_usage();
610        assert!(new_usage > expected_usage);
611        assert_eq!(dm.used_disk_space(), new_usage);
612
613        // Drop the file
614        drop(temp_file);
615
616        // Disk usage should return to 0
617        assert_eq!(dm.used_disk_space(), 0);
618
619        Ok(())
620    }
621
622    #[test]
623    fn test_disk_usage_with_clones() -> Result<()> {
624        use std::io::Write;
625
626        let dm = Arc::new(DiskManagerBuilder::default().build()?);
627        let mut temp_file = dm.create_tmp_file("Testing")?;
628
629        // Write some data
630        temp_file.inner().as_file().write_all(b"test data")?;
631        temp_file.update_disk_usage()?;
632
633        let usage_after_write = temp_file.current_disk_usage();
634        assert!(usage_after_write > 0);
635        assert_eq!(dm.used_disk_space(), usage_after_write);
636
637        // Clone the file
638        let clone1 = temp_file.clone();
639        let clone2 = temp_file.clone();
640
641        // All clones should see the same disk usage
642        assert_eq!(clone1.current_disk_usage(), usage_after_write);
643        assert_eq!(clone2.current_disk_usage(), usage_after_write);
644
645        // Global disk usage should still be the same (not multiplied by number of clones)
646        assert_eq!(dm.used_disk_space(), usage_after_write);
647
648        // Write more data through one clone
649        clone1.inner().as_file().write_all(b" more data")?;
650        let mut mutable_clone1 = clone1;
651        mutable_clone1.update_disk_usage()?;
652
653        let new_usage = mutable_clone1.current_disk_usage();
654        assert!(new_usage > usage_after_write);
655
656        // All clones should see the updated disk usage
657        assert_eq!(temp_file.current_disk_usage(), new_usage);
658        assert_eq!(clone2.current_disk_usage(), new_usage);
659        assert_eq!(mutable_clone1.current_disk_usage(), new_usage);
660
661        // Global disk usage should reflect the new size (not multiplied)
662        assert_eq!(dm.used_disk_space(), new_usage);
663
664        // Drop one clone
665        drop(mutable_clone1);
666
667        // Disk usage should NOT change (other clones still exist)
668        assert_eq!(dm.used_disk_space(), new_usage);
669        assert_eq!(temp_file.current_disk_usage(), new_usage);
670        assert_eq!(clone2.current_disk_usage(), new_usage);
671
672        // Drop another clone
673        drop(clone2);
674
675        // Disk usage should still NOT change (original still exists)
676        assert_eq!(dm.used_disk_space(), new_usage);
677        assert_eq!(temp_file.current_disk_usage(), new_usage);
678
679        // Drop the original
680        drop(temp_file);
681
682        // Now disk usage should return to 0 (last reference dropped)
683        assert_eq!(dm.used_disk_space(), 0);
684
685        Ok(())
686    }
687
688    #[test]
689    fn test_disk_usage_clones_dropped_out_of_order() -> Result<()> {
690        use std::io::Write;
691
692        let dm = Arc::new(DiskManagerBuilder::default().build()?);
693        let mut temp_file = dm.create_tmp_file("Testing")?;
694
695        // Write data
696        temp_file.inner().as_file().write_all(b"test")?;
697        temp_file.update_disk_usage()?;
698
699        let usage = temp_file.current_disk_usage();
700        assert_eq!(dm.used_disk_space(), usage);
701
702        // Create multiple clones
703        let clone1 = temp_file.clone();
704        let clone2 = temp_file.clone();
705        let clone3 = temp_file.clone();
706
707        // Drop the original first (out of order)
708        drop(temp_file);
709
710        // Disk usage should still be tracked (clones exist)
711        assert_eq!(dm.used_disk_space(), usage);
712        assert_eq!(clone1.current_disk_usage(), usage);
713
714        // Drop clones in different order
715        drop(clone2);
716        assert_eq!(dm.used_disk_space(), usage);
717
718        drop(clone1);
719        assert_eq!(dm.used_disk_space(), usage);
720
721        // Drop the last clone
722        drop(clone3);
723
724        // Now disk usage should be 0
725        assert_eq!(dm.used_disk_space(), 0);
726
727        Ok(())
728    }
729
730    #[test]
731    fn test_disk_usage_multiple_files() -> Result<()> {
732        use std::io::Write;
733
734        let dm = Arc::new(DiskManagerBuilder::default().build()?);
735
736        // Create multiple temp files
737        let mut file1 = dm.create_tmp_file("Testing1")?;
738        let mut file2 = dm.create_tmp_file("Testing2")?;
739
740        // Write to first file
741        file1.inner().as_file().write_all(b"file1")?;
742        file1.update_disk_usage()?;
743        let usage1 = file1.current_disk_usage();
744
745        assert_eq!(dm.used_disk_space(), usage1);
746
747        // Write to second file
748        file2.inner().as_file().write_all(b"file2 data")?;
749        file2.update_disk_usage()?;
750        let usage2 = file2.current_disk_usage();
751
752        // Global usage should be sum of both files
753        assert_eq!(dm.used_disk_space(), usage1 + usage2);
754
755        // Drop first file
756        drop(file1);
757
758        // Usage should only reflect second file
759        assert_eq!(dm.used_disk_space(), usage2);
760
761        // Drop second file
762        drop(file2);
763
764        // Usage should be 0
765        assert_eq!(dm.used_disk_space(), 0);
766
767        Ok(())
768    }
769}