Skip to main content

datafusion_execution/
disk_manager.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`DiskManager`]: Manages files generated during query execution
19
20use datafusion_common::{
21    DataFusionError, Result, config_err, resources_datafusion_err, resources_err,
22};
23use log::debug;
24use parking_lot::Mutex;
25use rand::{Rng, rng};
26use std::path::{Path, PathBuf};
27use std::sync::Arc;
28use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
29use tempfile::{Builder, NamedTempFile, TempDir};
30
31use datafusion_common::human_readable_size;
32
/// Default limit, in bytes, on the data kept in the temporary directories
/// (100GB, computed as `100 * 1024^3`).
pub const DEFAULT_MAX_TEMP_DIRECTORY_SIZE: u64 = 100 * 1024_u64.pow(3);
34
35/// Builder pattern for the [DiskManager] structure
36#[derive(Clone, Debug)]
37pub struct DiskManagerBuilder {
38    /// The storage mode of the disk manager
39    mode: DiskManagerMode,
40    /// The maximum amount of data (in bytes) stored inside the temporary directories.
41    /// Default to 100GB
42    max_temp_directory_size: u64,
43}
44
45impl Default for DiskManagerBuilder {
46    fn default() -> Self {
47        Self {
48            mode: DiskManagerMode::OsTmpDirectory,
49            max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
50        }
51    }
52}
53
54impl DiskManagerBuilder {
55    pub fn set_mode(&mut self, mode: DiskManagerMode) {
56        self.mode = mode;
57    }
58
59    pub fn with_mode(mut self, mode: DiskManagerMode) -> Self {
60        self.set_mode(mode);
61        self
62    }
63
64    pub fn set_max_temp_directory_size(&mut self, value: u64) {
65        self.max_temp_directory_size = value;
66    }
67
68    pub fn with_max_temp_directory_size(mut self, value: u64) -> Self {
69        self.set_max_temp_directory_size(value);
70        self
71    }
72
73    /// Create a DiskManager given the builder
74    pub fn build(self) -> Result<DiskManager> {
75        match self.mode {
76            DiskManagerMode::OsTmpDirectory => Ok(DiskManager {
77                local_dirs: Mutex::new(Some(vec![])),
78                max_temp_directory_size: self.max_temp_directory_size,
79                used_disk_space: Arc::new(AtomicU64::new(0)),
80                active_files_count: Arc::new(AtomicUsize::new(0)),
81            }),
82            DiskManagerMode::Directories(conf_dirs) => {
83                let local_dirs = create_local_dirs(&conf_dirs)?;
84                debug!(
85                    "Created local dirs {local_dirs:?} as DataFusion working directory"
86                );
87                Ok(DiskManager {
88                    local_dirs: Mutex::new(Some(local_dirs)),
89                    max_temp_directory_size: self.max_temp_directory_size,
90                    used_disk_space: Arc::new(AtomicU64::new(0)),
91                    active_files_count: Arc::new(AtomicUsize::new(0)),
92                })
93            }
94            DiskManagerMode::Disabled => Ok(DiskManager {
95                local_dirs: Mutex::new(None),
96                max_temp_directory_size: self.max_temp_directory_size,
97                used_disk_space: Arc::new(AtomicU64::new(0)),
98                active_files_count: Arc::new(AtomicUsize::new(0)),
99            }),
100        }
101    }
102}
103
/// Selects where a [DiskManager] places its temporary files, if anywhere.
#[derive(Debug, Clone, Default)]
pub enum DiskManagerMode {
    /// Temporary files live in a temporary directory chosen by the OS.
    /// This is the default mode.
    #[default]
    OsTmpDirectory,

    /// Temporary files live in the listed directories; each new temporary
    /// file goes to one of them, picked at random.
    Directories(Vec<PathBuf>),

    /// No temporary files: every attempt to create one returns an error.
    Disabled,
}
119
120/// Configuration for temporary disk access
121#[deprecated(since = "48.0.0", note = "Use DiskManagerBuilder instead")]
122#[derive(Debug, Clone, Default)]
123#[allow(clippy::allow_attributes)]
124#[allow(deprecated)]
125pub enum DiskManagerConfig {
126    /// Use the provided [DiskManager] instance
127    Existing(Arc<DiskManager>),
128
129    /// Create a new [DiskManager] that creates temporary files within
130    /// a temporary directory chosen by the OS
131    #[default]
132    NewOs,
133
134    /// Create a new [DiskManager] that creates temporary files within
135    /// the specified directories
136    NewSpecified(Vec<PathBuf>),
137
138    /// Disable disk manager, attempts to create temporary files will error
139    Disabled,
140}
141
142#[expect(deprecated)]
143impl DiskManagerConfig {
144    /// Create temporary files in a temporary directory chosen by the OS
145    pub fn new() -> Self {
146        Self::default()
147    }
148
149    /// Create temporary files using the provided disk manager
150    pub fn new_existing(existing: Arc<DiskManager>) -> Self {
151        Self::Existing(existing)
152    }
153
154    /// Create temporary files in the specified directories
155    pub fn new_specified(paths: Vec<PathBuf>) -> Self {
156        Self::NewSpecified(paths)
157    }
158}
159
160/// Manages files generated during query execution, e.g. spill files generated
161/// while processing dataset larger than available memory.
162#[derive(Debug)]
163pub struct DiskManager {
164    /// TempDirs to put temporary files in.
165    ///
166    /// If `Some(vec![])` a new OS specified temporary directory will be created
167    /// If `None` an error will be returned (configured not to spill)
168    local_dirs: Mutex<Option<Vec<Arc<TempDir>>>>,
169    /// The maximum amount of data (in bytes) stored inside the temporary directories.
170    /// Default to 100GB
171    max_temp_directory_size: u64,
172    /// Used disk space in the temporary directories. Now only spilled data for
173    /// external executors are counted.
174    used_disk_space: Arc<AtomicU64>,
175    /// Number of active temporary files created by this disk manager
176    active_files_count: Arc<AtomicUsize>,
177}
178
/// Snapshot of how much disk the spilling machinery is using right now
#[derive(Clone, Copy, Debug)]
pub struct SpillingProgress {
    /// Bytes currently occupied on disk by spilled data
    pub current_bytes: u64,
    /// Count of spill files that are still alive
    pub active_files_count: usize,
}
187
188impl DiskManager {
189    /// Creates a builder for [DiskManager]
190    pub fn builder() -> DiskManagerBuilder {
191        DiskManagerBuilder::default()
192    }
193
194    /// Create a DiskManager given the configuration
195    #[expect(deprecated)]
196    #[deprecated(since = "48.0.0", note = "Use DiskManager::builder() instead")]
197    pub fn try_new(config: DiskManagerConfig) -> Result<Arc<Self>> {
198        match config {
199            DiskManagerConfig::Existing(manager) => Ok(manager),
200            DiskManagerConfig::NewOs => Ok(Arc::new(Self {
201                local_dirs: Mutex::new(Some(vec![])),
202                max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
203                used_disk_space: Arc::new(AtomicU64::new(0)),
204                active_files_count: Arc::new(AtomicUsize::new(0)),
205            })),
206            DiskManagerConfig::NewSpecified(conf_dirs) => {
207                let local_dirs = create_local_dirs(&conf_dirs)?;
208                debug!(
209                    "Created local dirs {local_dirs:?} as DataFusion working directory"
210                );
211                Ok(Arc::new(Self {
212                    local_dirs: Mutex::new(Some(local_dirs)),
213                    max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
214                    used_disk_space: Arc::new(AtomicU64::new(0)),
215                    active_files_count: Arc::new(AtomicUsize::new(0)),
216                }))
217            }
218            DiskManagerConfig::Disabled => Ok(Arc::new(Self {
219                local_dirs: Mutex::new(None),
220                max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
221                used_disk_space: Arc::new(AtomicU64::new(0)),
222                active_files_count: Arc::new(AtomicUsize::new(0)),
223            })),
224        }
225    }
226
227    pub fn set_max_temp_directory_size(
228        &mut self,
229        max_temp_directory_size: u64,
230    ) -> Result<()> {
231        // If the disk manager is disabled and `max_temp_directory_size` is not 0,
232        // this operation is not meaningful, fail early.
233        if self.local_dirs.lock().is_none() && max_temp_directory_size != 0 {
234            return config_err!(
235                "Cannot set max temp directory size for a disk manager that spilling is disabled"
236            );
237        }
238
239        self.max_temp_directory_size = max_temp_directory_size;
240        Ok(())
241    }
242
243    pub fn set_arc_max_temp_directory_size(
244        this: &mut Arc<Self>,
245        max_temp_directory_size: u64,
246    ) -> Result<()> {
247        if let Some(inner) = Arc::get_mut(this) {
248            inner.set_max_temp_directory_size(max_temp_directory_size)?;
249            Ok(())
250        } else {
251            config_err!("DiskManager should be a single instance")
252        }
253    }
254
255    pub fn with_max_temp_directory_size(
256        mut self,
257        max_temp_directory_size: u64,
258    ) -> Result<Self> {
259        self.set_max_temp_directory_size(max_temp_directory_size)?;
260        Ok(self)
261    }
262
263    pub fn used_disk_space(&self) -> u64 {
264        self.used_disk_space.load(Ordering::Relaxed)
265    }
266
267    /// Returns the maximum temporary directory size in bytes
268    pub fn max_temp_directory_size(&self) -> u64 {
269        self.max_temp_directory_size
270    }
271
272    /// Returns the current spilling progress
273    pub fn spilling_progress(&self) -> SpillingProgress {
274        SpillingProgress {
275            current_bytes: self.used_disk_space.load(Ordering::Relaxed),
276            active_files_count: self.active_files_count.load(Ordering::Relaxed),
277        }
278    }
279
280    /// Returns the temporary directory paths
281    pub fn temp_dir_paths(&self) -> Vec<PathBuf> {
282        self.local_dirs
283            .lock()
284            .as_ref()
285            .map(|dirs| {
286                dirs.iter()
287                    .map(|temp_dir| temp_dir.path().to_path_buf())
288                    .collect()
289            })
290            .unwrap_or_default()
291    }
292
293    /// Return true if this disk manager supports creating temporary
294    /// files. If this returns false, any call to `create_tmp_file`
295    /// will error.
296    pub fn tmp_files_enabled(&self) -> bool {
297        self.local_dirs.lock().is_some()
298    }
299
300    /// Return a temporary file from a randomized choice in the configured locations
301    ///
302    /// If the file can not be created for some reason, returns an
303    /// error message referencing the request description
304    pub fn create_tmp_file(
305        self: &Arc<Self>,
306        request_description: &str,
307    ) -> Result<RefCountedTempFile> {
308        let mut guard = self.local_dirs.lock();
309        let local_dirs = guard.as_mut().ok_or_else(|| {
310            resources_datafusion_err!(
311                "Memory Exhausted while {request_description} (DiskManager is disabled)"
312            )
313        })?;
314
315        // Create a temporary directory if needed
316        if local_dirs.is_empty() {
317            let tempdir = tempfile::tempdir().map_err(DataFusionError::IoError)?;
318
319            debug!(
320                "Created directory '{:?}' as DataFusion tempfile directory for {}",
321                tempdir.path().to_string_lossy(),
322                request_description,
323            );
324
325            local_dirs.push(Arc::new(tempdir));
326        }
327
328        let dir_index = rng().random_range(0..local_dirs.len());
329        self.active_files_count.fetch_add(1, Ordering::Relaxed);
330        Ok(RefCountedTempFile {
331            parent_temp_dir: Arc::clone(&local_dirs[dir_index]),
332            tempfile: Arc::new(
333                Builder::new()
334                    .tempfile_in(local_dirs[dir_index].as_ref())
335                    .map_err(DataFusionError::IoError)?,
336            ),
337            current_file_disk_usage: Arc::new(AtomicU64::new(0)),
338            disk_manager: Arc::clone(self),
339        })
340    }
341}
342
343/// A wrapper around a [`NamedTempFile`] that also contains
344/// a reference to its parent temporary directory.
345///
346/// # Note
347/// After any modification to the underlying file (e.g., writing data to it), the caller
348/// must invoke [`Self::update_disk_usage`] to update the global disk usage counter.
349/// This ensures the disk manager can properly enforce usage limits configured by
350/// [`DiskManager::with_max_temp_directory_size`].
351///
352/// This type is Clone-able, allowing multiple references to the same underlying file.
353/// The file is deleted only when the last reference is dropped.
354///
355/// The parent temporary directory is also kept alive as long as any reference to
356/// this file exists, preventing premature cleanup of the directory.
357///
358/// Once all references to this file are dropped, the file is deleted, and the
359/// disk usage is subtracted from the disk manager's total.
360#[derive(Debug)]
361pub struct RefCountedTempFile {
362    /// The reference to the directory in which temporary files are created to ensure
363    /// it is not cleaned up prior to the NamedTempFile
364    parent_temp_dir: Arc<TempDir>,
365    /// The underlying temporary file, wrapped in Arc to allow cloning
366    tempfile: Arc<NamedTempFile>,
367    /// Tracks the current disk usage of this temporary file. See
368    /// [`Self::update_disk_usage`] for more details.
369    ///
370    /// This is wrapped in `Arc<AtomicU64>` so that all clones share the same
371    /// disk usage tracking, preventing incorrect accounting when clones are dropped.
372    current_file_disk_usage: Arc<AtomicU64>,
373    /// The disk manager that created and manages this temporary file
374    disk_manager: Arc<DiskManager>,
375}
376
377impl Clone for RefCountedTempFile {
378    fn clone(&self) -> Self {
379        Self {
380            parent_temp_dir: Arc::clone(&self.parent_temp_dir),
381            tempfile: Arc::clone(&self.tempfile),
382            current_file_disk_usage: Arc::clone(&self.current_file_disk_usage),
383            disk_manager: Arc::clone(&self.disk_manager),
384        }
385    }
386}
387
388impl RefCountedTempFile {
389    pub fn path(&self) -> &Path {
390        self.tempfile.path()
391    }
392
393    pub fn inner(&self) -> &NamedTempFile {
394        self.tempfile.as_ref()
395    }
396
397    /// Updates the global disk usage counter after modifications to the underlying file.
398    ///
399    /// # Errors
400    /// - Returns an error if the global disk usage exceeds the configured limit.
401    pub fn update_disk_usage(&mut self) -> Result<()> {
402        // Get new file size from OS
403        let metadata = self.tempfile.as_file().metadata()?;
404        let new_disk_usage = metadata.len();
405
406        // Get the old disk usage
407        let old_disk_usage = self.current_file_disk_usage.load(Ordering::Relaxed);
408
409        // Update the global disk usage by:
410        // 1. Subtracting the old file size from the global counter
411        self.disk_manager
412            .used_disk_space
413            .fetch_sub(old_disk_usage, Ordering::Relaxed);
414        // 2. Adding the new file size to the global counter
415        self.disk_manager
416            .used_disk_space
417            .fetch_add(new_disk_usage, Ordering::Relaxed);
418
419        // 3. Check if the updated global disk usage exceeds the configured limit
420        let global_disk_usage = self.disk_manager.used_disk_space.load(Ordering::Relaxed);
421        if global_disk_usage > self.disk_manager.max_temp_directory_size {
422            return resources_err!(
423                "The used disk space during the spilling process has exceeded the allowable limit of {}. Try increasing the `max_temp_directory_size` in the disk manager configuration.",
424                human_readable_size(self.disk_manager.max_temp_directory_size as usize)
425            );
426        }
427
428        // 4. Update the local file size tracking
429        self.current_file_disk_usage
430            .store(new_disk_usage, Ordering::Relaxed);
431
432        Ok(())
433    }
434
435    pub fn current_disk_usage(&self) -> u64 {
436        self.current_file_disk_usage.load(Ordering::Relaxed)
437    }
438}
439
440/// When the temporary file is dropped, subtract its disk usage from the disk manager's total
441impl Drop for RefCountedTempFile {
442    fn drop(&mut self) {
443        // Only subtract disk usage when this is the last reference to the file
444        // Check if we're the last one by seeing if there's only one strong reference
445        // left to the underlying tempfile (the one we're holding)
446        if Arc::strong_count(&self.tempfile) == 1 {
447            let current_usage = self.current_file_disk_usage.load(Ordering::Relaxed);
448            self.disk_manager
449                .used_disk_space
450                .fetch_sub(current_usage, Ordering::Relaxed);
451            self.disk_manager
452                .active_files_count
453                .fetch_sub(1, Ordering::Relaxed);
454        }
455    }
456}
457
458/// Setup local dirs by creating one new dir in each of the given dirs
459fn create_local_dirs(local_dirs: &[PathBuf]) -> Result<Vec<Arc<TempDir>>> {
460    local_dirs
461        .iter()
462        .map(|root| {
463            if !Path::new(root).exists() {
464                std::fs::create_dir(root)?;
465            }
466            Builder::new()
467                .prefix("datafusion-")
468                .tempdir_in(root)
469                .map_err(DataFusionError::IoError)
470        })
471        .map(|result| result.map(Arc::new))
472        .collect()
473}
474
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;

    /// Builds a disk manager with the default (OS temporary directory) mode
    fn os_tmp_manager() -> Result<Arc<DiskManager>> {
        let manager = DiskManagerBuilder::default().build()?;
        Ok(Arc::new(manager))
    }

    /// Builds a disk manager that places temporary files under `dirs`
    fn manager_for_dirs(dirs: &[&Path]) -> Result<Arc<DiskManager>> {
        let roots = dirs.iter().map(|dir| dir.to_path_buf()).collect();
        let manager = DiskManagerBuilder::default()
            .with_mode(DiskManagerMode::Directories(roots))
            .build()?;
        Ok(Arc::new(manager))
    }

    /// Appends `bytes` to `file` and refreshes its disk usage accounting
    fn append(file: &mut RefCountedTempFile, bytes: &[u8]) -> Result<()> {
        file.inner().as_file().write_all(bytes)?;
        file.update_disk_usage()
    }

    /// Returns the paths of the directories `dm` currently holds
    fn local_dir_snapshot(dm: &DiskManager) -> Vec<PathBuf> {
        let guard = dm.local_dirs.lock();
        match guard.as_ref() {
            Some(dirs) => dirs.iter().map(|dir| dir.path().to_path_buf()).collect(),
            None => Vec::new(),
        }
    }

    /// Asserts that `file_path` is found anywhere in any of `dir` directories
    fn assert_path_in_dirs<'a>(
        file_path: &'a Path,
        dirs: impl Iterator<Item = &'a Path>,
    ) {
        let dirs: Vec<&Path> = dirs.collect();

        let found = file_path
            .ancestors()
            .any(|candidate_path| dirs.contains(&candidate_path));

        assert!(found, "Can't find {file_path:?} in dirs: {dirs:?}");
    }

    /// Creates a file through `dm`, then checks that the file survives
    /// dropping `dm` and disappears once the file handle itself is dropped
    fn assert_file_outlives_manager(dm: Arc<DiskManager>) -> Result<()> {
        let temp_file = dm.create_tmp_file("Testing")?;
        let temp_file_path = temp_file.path().to_owned();
        assert!(temp_file_path.exists());

        drop(dm);
        assert!(temp_file_path.exists());

        drop(temp_file);
        assert!(!temp_file_path.exists());

        Ok(())
    }

    #[test]
    fn lazy_temp_dir_creation() -> Result<()> {
        // A default configuration should not create temp files until requested
        let dm = os_tmp_manager()?;
        assert_eq!(0, local_dir_snapshot(&dm).len());

        // can still create a tempfile however:
        let actual = dm.create_tmp_file("Testing")?;

        // Now the tempdir has been created on demand
        let local_dirs = local_dir_snapshot(&dm);
        assert_eq!(1, local_dirs.len());

        // the returned tempfile file should be in the temp directory
        assert_path_in_dirs(actual.path(), local_dirs.iter().map(PathBuf::as_path));

        Ok(())
    }

    #[test]
    fn file_in_right_dir() -> Result<()> {
        let local_dir1 = TempDir::new()?;
        let local_dir2 = TempDir::new()?;
        let local_dir3 = TempDir::new()?;
        let local_dirs = vec![local_dir1.path(), local_dir2.path(), local_dir3.path()];
        let dm = manager_for_dirs(&local_dirs)?;

        assert!(dm.tmp_files_enabled());
        let actual = dm.create_tmp_file("Testing")?;

        // the file should be in one of the specified local directories
        assert_path_in_dirs(actual.path(), local_dirs.into_iter());

        Ok(())
    }

    #[test]
    fn test_disabled_disk_manager() {
        let manager = DiskManagerBuilder::default()
            .with_mode(DiskManagerMode::Disabled)
            .build()
            .map(Arc::new)
            .unwrap();
        assert!(!manager.tmp_files_enabled());

        let err = manager.create_tmp_file("Testing").unwrap_err();
        assert_eq!(
            err.strip_backtrace(),
            "Resources exhausted: Memory Exhausted while Testing (DiskManager is disabled)",
        )
    }

    #[test]
    fn test_disk_manager_create_spill_folder() {
        let dir = TempDir::new().unwrap();
        let mode = DiskManagerMode::Directories(vec![dir.path().to_path_buf()]);
        DiskManagerBuilder::default()
            .with_mode(mode)
            .build()
            .unwrap();
    }

    #[test]
    fn test_temp_file_still_alive_after_disk_manager_dropped() -> Result<()> {
        // Test for the case using OS arranged temporary directory
        assert_file_outlives_manager(os_tmp_manager()?)?;

        // Test for the case using specified directories
        let local_dir1 = TempDir::new()?;
        let local_dir2 = TempDir::new()?;
        let local_dir3 = TempDir::new()?;
        let local_dirs = [local_dir1.path(), local_dir2.path(), local_dir3.path()];
        assert_file_outlives_manager(manager_for_dirs(&local_dirs)?)?;

        Ok(())
    }

    #[test]
    fn test_disk_usage_basic() -> Result<()> {
        let dm = os_tmp_manager()?;
        let mut temp_file = dm.create_tmp_file("Testing")?;

        // Initially, disk usage should be 0
        assert_eq!(dm.used_disk_space(), 0);
        assert_eq!(temp_file.current_disk_usage(), 0);

        // Write some data to the file
        append(&mut temp_file, b"hello world")?;

        // Disk usage should now reflect the written data
        let expected_usage = temp_file.current_disk_usage();
        assert!(expected_usage > 0);
        assert_eq!(dm.used_disk_space(), expected_usage);

        // Write more data
        append(&mut temp_file, b" more data")?;

        // Disk usage should increase
        let new_usage = temp_file.current_disk_usage();
        assert!(new_usage > expected_usage);
        assert_eq!(dm.used_disk_space(), new_usage);

        // Dropping the file should bring disk usage back to 0
        drop(temp_file);
        assert_eq!(dm.used_disk_space(), 0);

        Ok(())
    }

    #[test]
    fn test_disk_usage_with_clones() -> Result<()> {
        let dm = os_tmp_manager()?;
        let mut temp_file = dm.create_tmp_file("Testing")?;

        // Write some data
        append(&mut temp_file, b"test data")?;

        let usage_after_write = temp_file.current_disk_usage();
        assert!(usage_after_write > 0);
        assert_eq!(dm.used_disk_space(), usage_after_write);

        // Clone the file
        let mut clone1 = temp_file.clone();
        let clone2 = temp_file.clone();

        // All clones should see the same disk usage
        assert_eq!(clone1.current_disk_usage(), usage_after_write);
        assert_eq!(clone2.current_disk_usage(), usage_after_write);

        // Global disk usage should still be the same (not multiplied by number of clones)
        assert_eq!(dm.used_disk_space(), usage_after_write);

        // Write more data through one clone
        append(&mut clone1, b" more data")?;

        let new_usage = clone1.current_disk_usage();
        assert!(new_usage > usage_after_write);

        // All clones should see the updated disk usage
        assert_eq!(temp_file.current_disk_usage(), new_usage);
        assert_eq!(clone2.current_disk_usage(), new_usage);
        assert_eq!(clone1.current_disk_usage(), new_usage);

        // Global disk usage should reflect the new size (not multiplied)
        assert_eq!(dm.used_disk_space(), new_usage);

        // Drop one clone: disk usage should NOT change (other clones still exist)
        drop(clone1);
        assert_eq!(dm.used_disk_space(), new_usage);
        assert_eq!(temp_file.current_disk_usage(), new_usage);
        assert_eq!(clone2.current_disk_usage(), new_usage);

        // Drop another clone: still no change (original still exists)
        drop(clone2);
        assert_eq!(dm.used_disk_space(), new_usage);
        assert_eq!(temp_file.current_disk_usage(), new_usage);

        // Drop the original: last reference gone, disk usage returns to 0
        drop(temp_file);
        assert_eq!(dm.used_disk_space(), 0);

        Ok(())
    }

    #[test]
    fn test_disk_usage_clones_dropped_out_of_order() -> Result<()> {
        let dm = os_tmp_manager()?;
        let mut temp_file = dm.create_tmp_file("Testing")?;

        // Write data
        append(&mut temp_file, b"test")?;

        let usage = temp_file.current_disk_usage();
        assert_eq!(dm.used_disk_space(), usage);

        // Create multiple clones
        let clone1 = temp_file.clone();
        let clone2 = temp_file.clone();
        let clone3 = temp_file.clone();

        // Drop the original first (out of order)
        drop(temp_file);

        // Disk usage should still be tracked (clones exist)
        assert_eq!(dm.used_disk_space(), usage);
        assert_eq!(clone1.current_disk_usage(), usage);

        // Drop clones in different order
        drop(clone2);
        assert_eq!(dm.used_disk_space(), usage);

        drop(clone1);
        assert_eq!(dm.used_disk_space(), usage);

        // Dropping the last clone should bring disk usage to 0
        drop(clone3);
        assert_eq!(dm.used_disk_space(), 0);

        Ok(())
    }

    #[test]
    fn test_disk_usage_multiple_files() -> Result<()> {
        let dm = os_tmp_manager()?;

        // Create multiple temp files
        let mut file1 = dm.create_tmp_file("Testing1")?;
        let mut file2 = dm.create_tmp_file("Testing2")?;

        // Write to first file
        append(&mut file1, b"file1")?;
        let usage1 = file1.current_disk_usage();
        assert_eq!(dm.used_disk_space(), usage1);

        // Write to second file
        append(&mut file2, b"file2 data")?;
        let usage2 = file2.current_disk_usage();

        // Global usage should be sum of both files
        assert_eq!(dm.used_disk_space(), usage1 + usage2);

        // Drop first file: usage should only reflect second file
        drop(file1);
        assert_eq!(dm.used_disk_space(), usage2);

        // Drop second file: usage should be 0
        drop(file2);
        assert_eq!(dm.used_disk_space(), 0);

        Ok(())
    }
}