datafusion_execution/
disk_manager.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`DiskManager`]: Manages files generated during query execution
19
20use datafusion_common::{
21    config_err, resources_datafusion_err, resources_err, DataFusionError, Result,
22};
23use log::debug;
24use parking_lot::Mutex;
25use rand::{rng, Rng};
26use std::path::{Path, PathBuf};
27use std::sync::atomic::{AtomicU64, Ordering};
28use std::sync::Arc;
29use tempfile::{Builder, NamedTempFile, TempDir};
30
31use crate::memory_pool::human_readable_size;
32
33const DEFAULT_MAX_TEMP_DIRECTORY_SIZE: u64 = 100 * 1024 * 1024 * 1024; // 100GB
34
35/// Builder pattern for the [DiskManager] structure
36#[derive(Clone, Debug)]
37pub struct DiskManagerBuilder {
38    /// The storage mode of the disk manager
39    mode: DiskManagerMode,
40    /// The maximum amount of data (in bytes) stored inside the temporary directories.
41    /// Default to 100GB
42    max_temp_directory_size: u64,
43}
44
45impl Default for DiskManagerBuilder {
46    fn default() -> Self {
47        Self {
48            mode: DiskManagerMode::OsTmpDirectory,
49            max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
50        }
51    }
52}
53
54impl DiskManagerBuilder {
55    pub fn set_mode(&mut self, mode: DiskManagerMode) {
56        self.mode = mode;
57    }
58
59    pub fn with_mode(mut self, mode: DiskManagerMode) -> Self {
60        self.set_mode(mode);
61        self
62    }
63
64    pub fn set_max_temp_directory_size(&mut self, value: u64) {
65        self.max_temp_directory_size = value;
66    }
67
68    pub fn with_max_temp_directory_size(mut self, value: u64) -> Self {
69        self.set_max_temp_directory_size(value);
70        self
71    }
72
73    /// Create a DiskManager given the builder
74    pub fn build(self) -> Result<DiskManager> {
75        match self.mode {
76            DiskManagerMode::OsTmpDirectory => Ok(DiskManager {
77                local_dirs: Mutex::new(Some(vec![])),
78                max_temp_directory_size: self.max_temp_directory_size,
79                used_disk_space: Arc::new(AtomicU64::new(0)),
80            }),
81            DiskManagerMode::Directories(conf_dirs) => {
82                let local_dirs = create_local_dirs(conf_dirs)?;
83                debug!(
84                    "Created local dirs {local_dirs:?} as DataFusion working directory"
85                );
86                Ok(DiskManager {
87                    local_dirs: Mutex::new(Some(local_dirs)),
88                    max_temp_directory_size: self.max_temp_directory_size,
89                    used_disk_space: Arc::new(AtomicU64::new(0)),
90                })
91            }
92            DiskManagerMode::Disabled => Ok(DiskManager {
93                local_dirs: Mutex::new(None),
94                max_temp_directory_size: self.max_temp_directory_size,
95                used_disk_space: Arc::new(AtomicU64::new(0)),
96            }),
97        }
98    }
99}
100
101#[derive(Clone, Debug, Default)]
102pub enum DiskManagerMode {
103    /// Create a new [DiskManager] that creates temporary files within
104    /// a temporary directory chosen by the OS
105    #[default]
106    OsTmpDirectory,
107
108    /// Create a new [DiskManager] that creates temporary files within
109    /// the specified directories. One of the directories will be chosen
110    /// at random for each temporary file created.
111    Directories(Vec<PathBuf>),
112
113    /// Disable disk manager, attempts to create temporary files will error
114    Disabled,
115}
116
117/// Configuration for temporary disk access
118#[allow(deprecated)]
119#[deprecated(since = "48.0.0", note = "Use DiskManagerBuilder instead")]
120#[derive(Debug, Clone, Default)]
121pub enum DiskManagerConfig {
122    /// Use the provided [DiskManager] instance
123    Existing(Arc<DiskManager>),
124
125    /// Create a new [DiskManager] that creates temporary files within
126    /// a temporary directory chosen by the OS
127    #[default]
128    NewOs,
129
130    /// Create a new [DiskManager] that creates temporary files within
131    /// the specified directories
132    NewSpecified(Vec<PathBuf>),
133
134    /// Disable disk manager, attempts to create temporary files will error
135    Disabled,
136}
137
138#[allow(deprecated)]
139impl DiskManagerConfig {
140    /// Create temporary files in a temporary directory chosen by the OS
141    pub fn new() -> Self {
142        Self::default()
143    }
144
145    /// Create temporary files using the provided disk manager
146    pub fn new_existing(existing: Arc<DiskManager>) -> Self {
147        Self::Existing(existing)
148    }
149
150    /// Create temporary files in the specified directories
151    pub fn new_specified(paths: Vec<PathBuf>) -> Self {
152        Self::NewSpecified(paths)
153    }
154}
155
156/// Manages files generated during query execution, e.g. spill files generated
157/// while processing dataset larger than available memory.
158#[derive(Debug)]
159pub struct DiskManager {
160    /// TempDirs to put temporary files in.
161    ///
162    /// If `Some(vec![])` a new OS specified temporary directory will be created
163    /// If `None` an error will be returned (configured not to spill)
164    local_dirs: Mutex<Option<Vec<Arc<TempDir>>>>,
165    /// The maximum amount of data (in bytes) stored inside the temporary directories.
166    /// Default to 100GB
167    max_temp_directory_size: u64,
168    /// Used disk space in the temporary directories. Now only spilled data for
169    /// external executors are counted.
170    used_disk_space: Arc<AtomicU64>,
171}
172
173impl DiskManager {
174    /// Creates a builder for [DiskManager]
175    pub fn builder() -> DiskManagerBuilder {
176        DiskManagerBuilder::default()
177    }
178
179    /// Create a DiskManager given the configuration
180    #[allow(deprecated)]
181    #[deprecated(since = "48.0.0", note = "Use DiskManager::builder() instead")]
182    pub fn try_new(config: DiskManagerConfig) -> Result<Arc<Self>> {
183        match config {
184            DiskManagerConfig::Existing(manager) => Ok(manager),
185            DiskManagerConfig::NewOs => Ok(Arc::new(Self {
186                local_dirs: Mutex::new(Some(vec![])),
187                max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
188                used_disk_space: Arc::new(AtomicU64::new(0)),
189            })),
190            DiskManagerConfig::NewSpecified(conf_dirs) => {
191                let local_dirs = create_local_dirs(conf_dirs)?;
192                debug!(
193                    "Created local dirs {local_dirs:?} as DataFusion working directory"
194                );
195                Ok(Arc::new(Self {
196                    local_dirs: Mutex::new(Some(local_dirs)),
197                    max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
198                    used_disk_space: Arc::new(AtomicU64::new(0)),
199                }))
200            }
201            DiskManagerConfig::Disabled => Ok(Arc::new(Self {
202                local_dirs: Mutex::new(None),
203                max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
204                used_disk_space: Arc::new(AtomicU64::new(0)),
205            })),
206        }
207    }
208
209    pub fn set_max_temp_directory_size(
210        &mut self,
211        max_temp_directory_size: u64,
212    ) -> Result<()> {
213        // If the disk manager is disabled and `max_temp_directory_size` is not 0,
214        // this operation is not meaningful, fail early.
215        if self.local_dirs.lock().is_none() && max_temp_directory_size != 0 {
216            return config_err!(
217                "Cannot set max temp directory size for a disk manager that spilling is disabled"
218            );
219        }
220
221        self.max_temp_directory_size = max_temp_directory_size;
222        Ok(())
223    }
224
225    pub fn set_arc_max_temp_directory_size(
226        this: &mut Arc<Self>,
227        max_temp_directory_size: u64,
228    ) -> Result<()> {
229        if let Some(inner) = Arc::get_mut(this) {
230            inner.set_max_temp_directory_size(max_temp_directory_size)?;
231            Ok(())
232        } else {
233            config_err!("DiskManager should be a single instance")
234        }
235    }
236
237    pub fn with_max_temp_directory_size(
238        mut self,
239        max_temp_directory_size: u64,
240    ) -> Result<Self> {
241        self.set_max_temp_directory_size(max_temp_directory_size)?;
242        Ok(self)
243    }
244
245    pub fn used_disk_space(&self) -> u64 {
246        self.used_disk_space.load(Ordering::Relaxed)
247    }
248
249    /// Return true if this disk manager supports creating temporary
250    /// files. If this returns false, any call to `create_tmp_file`
251    /// will error.
252    pub fn tmp_files_enabled(&self) -> bool {
253        self.local_dirs.lock().is_some()
254    }
255
256    /// Return a temporary file from a randomized choice in the configured locations
257    ///
258    /// If the file can not be created for some reason, returns an
259    /// error message referencing the request description
260    pub fn create_tmp_file(
261        self: &Arc<Self>,
262        request_description: &str,
263    ) -> Result<RefCountedTempFile> {
264        let mut guard = self.local_dirs.lock();
265        let local_dirs = guard.as_mut().ok_or_else(|| {
266            resources_datafusion_err!(
267                "Memory Exhausted while {request_description} (DiskManager is disabled)"
268            )
269        })?;
270
271        // Create a temporary directory if needed
272        if local_dirs.is_empty() {
273            let tempdir = tempfile::tempdir().map_err(DataFusionError::IoError)?;
274
275            debug!(
276                "Created directory '{:?}' as DataFusion tempfile directory for {}",
277                tempdir.path().to_string_lossy(),
278                request_description,
279            );
280
281            local_dirs.push(Arc::new(tempdir));
282        }
283
284        let dir_index = rng().random_range(0..local_dirs.len());
285        Ok(RefCountedTempFile {
286            parent_temp_dir: Arc::clone(&local_dirs[dir_index]),
287            tempfile: Arc::new(
288                Builder::new()
289                    .tempfile_in(local_dirs[dir_index].as_ref())
290                    .map_err(DataFusionError::IoError)?,
291            ),
292            current_file_disk_usage: Arc::new(AtomicU64::new(0)),
293            disk_manager: Arc::clone(self),
294        })
295    }
296}
297
298/// A wrapper around a [`NamedTempFile`] that also contains
299/// a reference to its parent temporary directory.
300///
301/// # Note
302/// After any modification to the underlying file (e.g., writing data to it), the caller
303/// must invoke [`Self::update_disk_usage`] to update the global disk usage counter.
304/// This ensures the disk manager can properly enforce usage limits configured by
305/// [`DiskManager::with_max_temp_directory_size`].
306///
307/// This type is Clone-able, allowing multiple references to the same underlying file.
308/// The file is deleted only when the last reference is dropped.
309///
310/// The parent temporary directory is also kept alive as long as any reference to
311/// this file exists, preventing premature cleanup of the directory.
312///
313/// Once all references to this file are dropped, the file is deleted, and the
314/// disk usage is subtracted from the disk manager's total.
315#[derive(Debug)]
316pub struct RefCountedTempFile {
317    /// The reference to the directory in which temporary files are created to ensure
318    /// it is not cleaned up prior to the NamedTempFile
319    parent_temp_dir: Arc<TempDir>,
320    /// The underlying temporary file, wrapped in Arc to allow cloning
321    tempfile: Arc<NamedTempFile>,
322    /// Tracks the current disk usage of this temporary file. See
323    /// [`Self::update_disk_usage`] for more details.
324    ///
325    /// This is wrapped in `Arc<AtomicU64>` so that all clones share the same
326    /// disk usage tracking, preventing incorrect accounting when clones are dropped.
327    current_file_disk_usage: Arc<AtomicU64>,
328    /// The disk manager that created and manages this temporary file
329    disk_manager: Arc<DiskManager>,
330}
331
332impl Clone for RefCountedTempFile {
333    fn clone(&self) -> Self {
334        Self {
335            parent_temp_dir: Arc::clone(&self.parent_temp_dir),
336            tempfile: Arc::clone(&self.tempfile),
337            current_file_disk_usage: Arc::clone(&self.current_file_disk_usage),
338            disk_manager: Arc::clone(&self.disk_manager),
339        }
340    }
341}
342
343impl RefCountedTempFile {
344    pub fn path(&self) -> &Path {
345        self.tempfile.path()
346    }
347
348    pub fn inner(&self) -> &NamedTempFile {
349        self.tempfile.as_ref()
350    }
351
352    /// Updates the global disk usage counter after modifications to the underlying file.
353    ///
354    /// # Errors
355    /// - Returns an error if the global disk usage exceeds the configured limit.
356    pub fn update_disk_usage(&mut self) -> Result<()> {
357        // Get new file size from OS
358        let metadata = self.tempfile.as_file().metadata()?;
359        let new_disk_usage = metadata.len();
360
361        // Get the old disk usage
362        let old_disk_usage = self.current_file_disk_usage.load(Ordering::Relaxed);
363
364        // Update the global disk usage by:
365        // 1. Subtracting the old file size from the global counter
366        self.disk_manager
367            .used_disk_space
368            .fetch_sub(old_disk_usage, Ordering::Relaxed);
369        // 2. Adding the new file size to the global counter
370        self.disk_manager
371            .used_disk_space
372            .fetch_add(new_disk_usage, Ordering::Relaxed);
373
374        // 3. Check if the updated global disk usage exceeds the configured limit
375        let global_disk_usage = self.disk_manager.used_disk_space.load(Ordering::Relaxed);
376        if global_disk_usage > self.disk_manager.max_temp_directory_size {
377            return resources_err!(
378                "The used disk space during the spilling process has exceeded the allowable limit of {}. Try increasing the `max_temp_directory_size` in the disk manager configuration.",
379                human_readable_size(self.disk_manager.max_temp_directory_size as usize)
380            );
381        }
382
383        // 4. Update the local file size tracking
384        self.current_file_disk_usage
385            .store(new_disk_usage, Ordering::Relaxed);
386
387        Ok(())
388    }
389
390    pub fn current_disk_usage(&self) -> u64 {
391        self.current_file_disk_usage.load(Ordering::Relaxed)
392    }
393}
394
395/// When the temporary file is dropped, subtract its disk usage from the disk manager's total
396impl Drop for RefCountedTempFile {
397    fn drop(&mut self) {
398        // Only subtract disk usage when this is the last reference to the file
399        // Check if we're the last one by seeing if there's only one strong reference
400        // left to the underlying tempfile (the one we're holding)
401        if Arc::strong_count(&self.tempfile) == 1 {
402            let current_usage = self.current_file_disk_usage.load(Ordering::Relaxed);
403            self.disk_manager
404                .used_disk_space
405                .fetch_sub(current_usage, Ordering::Relaxed);
406        }
407    }
408}
409
410/// Setup local dirs by creating one new dir in each of the given dirs
411fn create_local_dirs(local_dirs: Vec<PathBuf>) -> Result<Vec<Arc<TempDir>>> {
412    local_dirs
413        .iter()
414        .map(|root| {
415            if !Path::new(root).exists() {
416                std::fs::create_dir(root)?;
417            }
418            Builder::new()
419                .prefix("datafusion-")
420                .tempdir_in(root)
421                .map_err(DataFusionError::IoError)
422        })
423        .map(|result| result.map(Arc::new))
424        .collect()
425}
426
427#[cfg(test)]
428mod tests {
429    use super::*;
430
431    #[test]
432    fn lazy_temp_dir_creation() -> Result<()> {
433        // A default configuration should not create temp files until requested
434        let dm = Arc::new(DiskManagerBuilder::default().build()?);
435
436        assert_eq!(0, local_dir_snapshot(&dm).len());
437
438        // can still create a tempfile however:
439        let actual = dm.create_tmp_file("Testing")?;
440
441        // Now the tempdir has been created on demand
442        assert_eq!(1, local_dir_snapshot(&dm).len());
443
444        // the returned tempfile file should be in the temp directory
445        let local_dirs = local_dir_snapshot(&dm);
446        assert_path_in_dirs(actual.path(), local_dirs.iter().map(|p| p.as_path()));
447
448        Ok(())
449    }
450
451    fn local_dir_snapshot(dm: &DiskManager) -> Vec<PathBuf> {
452        dm.local_dirs
453            .lock()
454            .iter()
455            .flatten()
456            .map(|p| p.path().into())
457            .collect()
458    }
459
460    #[test]
461    fn file_in_right_dir() -> Result<()> {
462        let local_dir1 = TempDir::new()?;
463        let local_dir2 = TempDir::new()?;
464        let local_dir3 = TempDir::new()?;
465        let local_dirs = vec![local_dir1.path(), local_dir2.path(), local_dir3.path()];
466        let dm = Arc::new(
467            DiskManagerBuilder::default()
468                .with_mode(DiskManagerMode::Directories(
469                    local_dirs.iter().map(|p| p.into()).collect(),
470                ))
471                .build()?,
472        );
473
474        assert!(dm.tmp_files_enabled());
475        let actual = dm.create_tmp_file("Testing")?;
476
477        // the file should be in one of the specified local directories
478        assert_path_in_dirs(actual.path(), local_dirs.into_iter());
479
480        Ok(())
481    }
482
483    #[test]
484    fn test_disabled_disk_manager() {
485        let manager = Arc::new(
486            DiskManagerBuilder::default()
487                .with_mode(DiskManagerMode::Disabled)
488                .build()
489                .unwrap(),
490        );
491        assert!(!manager.tmp_files_enabled());
492        assert_eq!(
493            manager.create_tmp_file("Testing").unwrap_err().strip_backtrace(),
494            "Resources exhausted: Memory Exhausted while Testing (DiskManager is disabled)",
495        )
496    }
497
498    #[test]
499    fn test_disk_manager_create_spill_folder() {
500        let dir = TempDir::new().unwrap();
501        DiskManagerBuilder::default()
502            .with_mode(DiskManagerMode::Directories(vec![dir.path().to_path_buf()]))
503            .build()
504            .unwrap();
505    }
506
507    /// Asserts that `file_path` is found anywhere in any of `dir` directories
508    fn assert_path_in_dirs<'a>(
509        file_path: &'a Path,
510        dirs: impl Iterator<Item = &'a Path>,
511    ) {
512        let dirs: Vec<&Path> = dirs.collect();
513
514        let found = dirs.iter().any(|dir_path| {
515            file_path
516                .ancestors()
517                .any(|candidate_path| *dir_path == candidate_path)
518        });
519
520        assert!(found, "Can't find {file_path:?} in dirs: {dirs:?}");
521    }
522
523    #[test]
524    fn test_temp_file_still_alive_after_disk_manager_dropped() -> Result<()> {
525        // Test for the case using OS arranged temporary directory
526        let dm = Arc::new(DiskManagerBuilder::default().build()?);
527        let temp_file = dm.create_tmp_file("Testing")?;
528        let temp_file_path = temp_file.path().to_owned();
529        assert!(temp_file_path.exists());
530
531        drop(dm);
532        assert!(temp_file_path.exists());
533
534        drop(temp_file);
535        assert!(!temp_file_path.exists());
536
537        // Test for the case using specified directories
538        let local_dir1 = TempDir::new()?;
539        let local_dir2 = TempDir::new()?;
540        let local_dir3 = TempDir::new()?;
541        let local_dirs = [local_dir1.path(), local_dir2.path(), local_dir3.path()];
542        let dm = Arc::new(
543            DiskManagerBuilder::default()
544                .with_mode(DiskManagerMode::Directories(
545                    local_dirs.iter().map(|p| p.into()).collect(),
546                ))
547                .build()?,
548        );
549        let temp_file = dm.create_tmp_file("Testing")?;
550        let temp_file_path = temp_file.path().to_owned();
551        assert!(temp_file_path.exists());
552
553        drop(dm);
554        assert!(temp_file_path.exists());
555
556        drop(temp_file);
557        assert!(!temp_file_path.exists());
558
559        Ok(())
560    }
561
562    #[test]
563    fn test_disk_usage_basic() -> Result<()> {
564        use std::io::Write;
565
566        let dm = Arc::new(DiskManagerBuilder::default().build()?);
567        let mut temp_file = dm.create_tmp_file("Testing")?;
568
569        // Initially, disk usage should be 0
570        assert_eq!(dm.used_disk_space(), 0);
571        assert_eq!(temp_file.current_disk_usage(), 0);
572
573        // Write some data to the file
574        temp_file.inner().as_file().write_all(b"hello world")?;
575        temp_file.update_disk_usage()?;
576
577        // Disk usage should now reflect the written data
578        let expected_usage = temp_file.current_disk_usage();
579        assert!(expected_usage > 0);
580        assert_eq!(dm.used_disk_space(), expected_usage);
581
582        // Write more data
583        temp_file.inner().as_file().write_all(b" more data")?;
584        temp_file.update_disk_usage()?;
585
586        // Disk usage should increase
587        let new_usage = temp_file.current_disk_usage();
588        assert!(new_usage > expected_usage);
589        assert_eq!(dm.used_disk_space(), new_usage);
590
591        // Drop the file
592        drop(temp_file);
593
594        // Disk usage should return to 0
595        assert_eq!(dm.used_disk_space(), 0);
596
597        Ok(())
598    }
599
600    #[test]
601    fn test_disk_usage_with_clones() -> Result<()> {
602        use std::io::Write;
603
604        let dm = Arc::new(DiskManagerBuilder::default().build()?);
605        let mut temp_file = dm.create_tmp_file("Testing")?;
606
607        // Write some data
608        temp_file.inner().as_file().write_all(b"test data")?;
609        temp_file.update_disk_usage()?;
610
611        let usage_after_write = temp_file.current_disk_usage();
612        assert!(usage_after_write > 0);
613        assert_eq!(dm.used_disk_space(), usage_after_write);
614
615        // Clone the file
616        let clone1 = temp_file.clone();
617        let clone2 = temp_file.clone();
618
619        // All clones should see the same disk usage
620        assert_eq!(clone1.current_disk_usage(), usage_after_write);
621        assert_eq!(clone2.current_disk_usage(), usage_after_write);
622
623        // Global disk usage should still be the same (not multiplied by number of clones)
624        assert_eq!(dm.used_disk_space(), usage_after_write);
625
626        // Write more data through one clone
627        clone1.inner().as_file().write_all(b" more data")?;
628        let mut mutable_clone1 = clone1;
629        mutable_clone1.update_disk_usage()?;
630
631        let new_usage = mutable_clone1.current_disk_usage();
632        assert!(new_usage > usage_after_write);
633
634        // All clones should see the updated disk usage
635        assert_eq!(temp_file.current_disk_usage(), new_usage);
636        assert_eq!(clone2.current_disk_usage(), new_usage);
637        assert_eq!(mutable_clone1.current_disk_usage(), new_usage);
638
639        // Global disk usage should reflect the new size (not multiplied)
640        assert_eq!(dm.used_disk_space(), new_usage);
641
642        // Drop one clone
643        drop(mutable_clone1);
644
645        // Disk usage should NOT change (other clones still exist)
646        assert_eq!(dm.used_disk_space(), new_usage);
647        assert_eq!(temp_file.current_disk_usage(), new_usage);
648        assert_eq!(clone2.current_disk_usage(), new_usage);
649
650        // Drop another clone
651        drop(clone2);
652
653        // Disk usage should still NOT change (original still exists)
654        assert_eq!(dm.used_disk_space(), new_usage);
655        assert_eq!(temp_file.current_disk_usage(), new_usage);
656
657        // Drop the original
658        drop(temp_file);
659
660        // Now disk usage should return to 0 (last reference dropped)
661        assert_eq!(dm.used_disk_space(), 0);
662
663        Ok(())
664    }
665
666    #[test]
667    fn test_disk_usage_clones_dropped_out_of_order() -> Result<()> {
668        use std::io::Write;
669
670        let dm = Arc::new(DiskManagerBuilder::default().build()?);
671        let mut temp_file = dm.create_tmp_file("Testing")?;
672
673        // Write data
674        temp_file.inner().as_file().write_all(b"test")?;
675        temp_file.update_disk_usage()?;
676
677        let usage = temp_file.current_disk_usage();
678        assert_eq!(dm.used_disk_space(), usage);
679
680        // Create multiple clones
681        let clone1 = temp_file.clone();
682        let clone2 = temp_file.clone();
683        let clone3 = temp_file.clone();
684
685        // Drop the original first (out of order)
686        drop(temp_file);
687
688        // Disk usage should still be tracked (clones exist)
689        assert_eq!(dm.used_disk_space(), usage);
690        assert_eq!(clone1.current_disk_usage(), usage);
691
692        // Drop clones in different order
693        drop(clone2);
694        assert_eq!(dm.used_disk_space(), usage);
695
696        drop(clone1);
697        assert_eq!(dm.used_disk_space(), usage);
698
699        // Drop the last clone
700        drop(clone3);
701
702        // Now disk usage should be 0
703        assert_eq!(dm.used_disk_space(), 0);
704
705        Ok(())
706    }
707
708    #[test]
709    fn test_disk_usage_multiple_files() -> Result<()> {
710        use std::io::Write;
711
712        let dm = Arc::new(DiskManagerBuilder::default().build()?);
713
714        // Create multiple temp files
715        let mut file1 = dm.create_tmp_file("Testing1")?;
716        let mut file2 = dm.create_tmp_file("Testing2")?;
717
718        // Write to first file
719        file1.inner().as_file().write_all(b"file1")?;
720        file1.update_disk_usage()?;
721        let usage1 = file1.current_disk_usage();
722
723        assert_eq!(dm.used_disk_space(), usage1);
724
725        // Write to second file
726        file2.inner().as_file().write_all(b"file2 data")?;
727        file2.update_disk_usage()?;
728        let usage2 = file2.current_disk_usage();
729
730        // Global usage should be sum of both files
731        assert_eq!(dm.used_disk_space(), usage1 + usage2);
732
733        // Drop first file
734        drop(file1);
735
736        // Usage should only reflect second file
737        assert_eq!(dm.used_disk_space(), usage2);
738
739        // Drop second file
740        drop(file2);
741
742        // Usage should be 0
743        assert_eq!(dm.used_disk_space(), 0);
744
745        Ok(())
746    }
747}