Skip to main content

datafusion_execution/
disk_manager.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`DiskManager`]: Manages files generated during query execution
19
20use datafusion_common::{
21    DataFusionError, Result, config_err, resources_datafusion_err, resources_err,
22};
23use log::debug;
24use parking_lot::Mutex;
25use rand::{Rng, rng};
26use std::path::{Path, PathBuf};
27use std::sync::Arc;
28use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
29use tempfile::{Builder, NamedTempFile, TempDir};
30
31use datafusion_common::human_readable_size;
32
/// Default upper bound, in bytes, on the total size of temporary (spill) data a
/// [`DiskManager`] allows before `update_disk_usage` reports an error:
/// `100 * 1024 * 1024 * 1024` bytes.
pub const DEFAULT_MAX_TEMP_DIRECTORY_SIZE: u64 = 100 * 1024 * 1024 * 1024; // 100GB
34
/// Builder pattern for the [DiskManager] structure
///
/// The default builder uses [`DiskManagerMode::OsTmpDirectory`] and a limit of
/// [`DEFAULT_MAX_TEMP_DIRECTORY_SIZE`] bytes. Call `build()` to create the
/// [DiskManager].
#[derive(Clone, Debug)]
pub struct DiskManagerBuilder {
    /// The storage mode of the disk manager
    mode: DiskManagerMode,
    /// The maximum amount of data (in bytes) stored inside the temporary directories.
    /// Defaults to [`DEFAULT_MAX_TEMP_DIRECTORY_SIZE`] (100GB).
    max_temp_directory_size: u64,
}
44
45impl Default for DiskManagerBuilder {
46    fn default() -> Self {
47        Self {
48            mode: DiskManagerMode::OsTmpDirectory,
49            max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
50        }
51    }
52}
53
54impl DiskManagerBuilder {
55    pub fn set_mode(&mut self, mode: DiskManagerMode) {
56        self.mode = mode;
57    }
58
59    pub fn with_mode(mut self, mode: DiskManagerMode) -> Self {
60        self.set_mode(mode);
61        self
62    }
63
64    pub fn set_max_temp_directory_size(&mut self, value: u64) {
65        self.max_temp_directory_size = value;
66    }
67
68    pub fn with_max_temp_directory_size(mut self, value: u64) -> Self {
69        self.set_max_temp_directory_size(value);
70        self
71    }
72
73    /// Create a DiskManager given the builder
74    pub fn build(self) -> Result<DiskManager> {
75        match self.mode {
76            DiskManagerMode::OsTmpDirectory => Ok(DiskManager {
77                local_dirs: Mutex::new(Some(vec![])),
78                max_temp_directory_size: self.max_temp_directory_size,
79                used_disk_space: Arc::new(AtomicU64::new(0)),
80                active_files_count: Arc::new(AtomicUsize::new(0)),
81            }),
82            DiskManagerMode::Directories(conf_dirs) => {
83                let local_dirs = create_local_dirs(&conf_dirs)?;
84                debug!(
85                    "Created local dirs {local_dirs:?} as DataFusion working directory"
86                );
87                Ok(DiskManager {
88                    local_dirs: Mutex::new(Some(local_dirs)),
89                    max_temp_directory_size: self.max_temp_directory_size,
90                    used_disk_space: Arc::new(AtomicU64::new(0)),
91                    active_files_count: Arc::new(AtomicUsize::new(0)),
92                })
93            }
94            DiskManagerMode::Disabled => Ok(DiskManager {
95                local_dirs: Mutex::new(None),
96                max_temp_directory_size: self.max_temp_directory_size,
97                used_disk_space: Arc::new(AtomicU64::new(0)),
98                active_files_count: Arc::new(AtomicUsize::new(0)),
99            }),
100        }
101    }
102}
103
/// Where a [DiskManager] places its temporary (spill) files.
///
/// Selected through `DiskManagerBuilder::with_mode`. The default is
/// [`DiskManagerMode::OsTmpDirectory`].
#[derive(Clone, Debug, Default)]
pub enum DiskManagerMode {
    /// Create a new [DiskManager] that creates temporary files within
    /// a temporary directory chosen by the OS
    ///
    /// The OS temporary directory is only created when the first temporary
    /// file is requested.
    #[default]
    OsTmpDirectory,

    /// Create a new [DiskManager] that creates temporary files within
    /// the specified directories. One of the directories will be chosen
    /// at random for each temporary file created.
    ///
    /// When the manager is built, a new `datafusion-`-prefixed subdirectory is
    /// created inside each given path. If the list is empty, the manager falls
    /// back to lazily creating an OS temporary directory, like `OsTmpDirectory`.
    Directories(Vec<PathBuf>),

    /// Disable disk manager, attempts to create temporary files will error
    Disabled,
}
119
/// Configuration for temporary disk access
///
/// Deprecated: use [`DiskManagerBuilder`] together with [`DiskManagerMode`]
/// instead. Managers created from this configuration always use
/// [`DEFAULT_MAX_TEMP_DIRECTORY_SIZE`] as their limit.
#[deprecated(since = "48.0.0", note = "Use DiskManagerBuilder instead")]
#[derive(Debug, Clone, Default)]
#[allow(clippy::allow_attributes)]
#[allow(deprecated)]
pub enum DiskManagerConfig {
    /// Use the provided [DiskManager] instance
    Existing(Arc<DiskManager>),

    /// Create a new [DiskManager] that creates temporary files within
    /// a temporary directory chosen by the OS
    #[default]
    NewOs,

    /// Create a new [DiskManager] that creates temporary files within
    /// the specified directories
    NewSpecified(Vec<PathBuf>),

    /// Disable disk manager, attempts to create temporary files will error
    Disabled,
}
141
142#[expect(deprecated)]
143impl DiskManagerConfig {
144    /// Create temporary files in a temporary directory chosen by the OS
145    pub fn new() -> Self {
146        Self::default()
147    }
148
149    /// Create temporary files using the provided disk manager
150    pub fn new_existing(existing: Arc<DiskManager>) -> Self {
151        Self::Existing(existing)
152    }
153
154    /// Create temporary files in the specified directories
155    pub fn new_specified(paths: Vec<PathBuf>) -> Self {
156        Self::NewSpecified(paths)
157    }
158}
159
/// Manages files generated during query execution, e.g. spill files generated
/// while processing dataset larger than available memory.
///
/// Construct one with `DiskManager::builder()`. Temporary files are handed out
/// as [`RefCountedTempFile`]s that keep a reference back to this manager for
/// disk-usage accounting.
#[derive(Debug)]
pub struct DiskManager {
    /// TempDirs to put temporary files in.
    ///
    /// If `Some(vec![])` a new OS specified temporary directory will be created
    /// If `None` an error will be returned (configured not to spill)
    local_dirs: Mutex<Option<Vec<Arc<TempDir>>>>,
    /// The maximum amount of data (in bytes) stored inside the temporary directories.
    /// Default to 100GB
    max_temp_directory_size: u64,
    /// Used disk space in the temporary directories. Now only spilled data for
    /// external executors are counted.
    ///
    /// Updated by `RefCountedTempFile::update_disk_usage` and by its `Drop`.
    used_disk_space: Arc<AtomicU64>,
    /// Number of active temporary files created by this disk manager
    active_files_count: Arc<AtomicUsize>,
}
178
/// Information about the current disk usage for spilling
///
/// Returned by `DiskManager::spilling_progress`. The two fields are read with
/// separate relaxed atomic loads, so under concurrent spilling they are a
/// best-effort snapshot rather than a mutually consistent pair.
#[derive(Debug, Clone, Copy)]
pub struct SpillingProgress {
    /// Total bytes currently used on disk for spilling
    pub current_bytes: u64,
    /// Total number of active spill files
    pub active_files_count: usize,
}
187
188impl DiskManager {
189    /// Creates a builder for [DiskManager]
190    pub fn builder() -> DiskManagerBuilder {
191        DiskManagerBuilder::default()
192    }
193
194    /// Create a DiskManager given the configuration
195    #[expect(deprecated)]
196    #[deprecated(since = "48.0.0", note = "Use DiskManager::builder() instead")]
197    pub fn try_new(config: DiskManagerConfig) -> Result<Arc<Self>> {
198        match config {
199            DiskManagerConfig::Existing(manager) => Ok(manager),
200            DiskManagerConfig::NewOs => Ok(Arc::new(Self {
201                local_dirs: Mutex::new(Some(vec![])),
202                max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
203                used_disk_space: Arc::new(AtomicU64::new(0)),
204                active_files_count: Arc::new(AtomicUsize::new(0)),
205            })),
206            DiskManagerConfig::NewSpecified(conf_dirs) => {
207                let local_dirs = create_local_dirs(&conf_dirs)?;
208                debug!(
209                    "Created local dirs {local_dirs:?} as DataFusion working directory"
210                );
211                Ok(Arc::new(Self {
212                    local_dirs: Mutex::new(Some(local_dirs)),
213                    max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
214                    used_disk_space: Arc::new(AtomicU64::new(0)),
215                    active_files_count: Arc::new(AtomicUsize::new(0)),
216                }))
217            }
218            DiskManagerConfig::Disabled => Ok(Arc::new(Self {
219                local_dirs: Mutex::new(None),
220                max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
221                used_disk_space: Arc::new(AtomicU64::new(0)),
222                active_files_count: Arc::new(AtomicUsize::new(0)),
223            })),
224        }
225    }
226
227    pub fn set_max_temp_directory_size(
228        &mut self,
229        max_temp_directory_size: u64,
230    ) -> Result<()> {
231        // If the disk manager is disabled and `max_temp_directory_size` is not 0,
232        // this operation is not meaningful, fail early.
233        if self.local_dirs.lock().is_none() && max_temp_directory_size != 0 {
234            return config_err!(
235                "Cannot set max temp directory size for a disk manager that spilling is disabled"
236            );
237        }
238
239        self.max_temp_directory_size = max_temp_directory_size;
240        Ok(())
241    }
242
243    pub fn set_arc_max_temp_directory_size(
244        this: &mut Arc<Self>,
245        max_temp_directory_size: u64,
246    ) -> Result<()> {
247        if let Some(inner) = Arc::get_mut(this) {
248            inner.set_max_temp_directory_size(max_temp_directory_size)?;
249            Ok(())
250        } else {
251            config_err!("DiskManager should be a single instance")
252        }
253    }
254
255    pub fn with_max_temp_directory_size(
256        mut self,
257        max_temp_directory_size: u64,
258    ) -> Result<Self> {
259        self.set_max_temp_directory_size(max_temp_directory_size)?;
260        Ok(self)
261    }
262
263    pub fn used_disk_space(&self) -> u64 {
264        self.used_disk_space.load(Ordering::Relaxed)
265    }
266
267    /// Returns the maximum temporary directory size in bytes
268    pub fn max_temp_directory_size(&self) -> u64 {
269        self.max_temp_directory_size
270    }
271
272    /// Returns the current spilling progress
273    pub fn spilling_progress(&self) -> SpillingProgress {
274        SpillingProgress {
275            current_bytes: self.used_disk_space.load(Ordering::Relaxed),
276            active_files_count: self.active_files_count.load(Ordering::Relaxed),
277        }
278    }
279
280    /// Returns the temporary directory paths
281    pub fn temp_dir_paths(&self) -> Vec<PathBuf> {
282        self.local_dirs
283            .lock()
284            .as_ref()
285            .map(|dirs| {
286                dirs.iter()
287                    .map(|temp_dir| temp_dir.path().to_path_buf())
288                    .collect()
289            })
290            .unwrap_or_default()
291    }
292
293    /// Return true if this disk manager supports creating temporary
294    /// files. If this returns false, any call to `create_tmp_file`
295    /// will error.
296    pub fn tmp_files_enabled(&self) -> bool {
297        self.local_dirs.lock().is_some()
298    }
299
300    /// Return a temporary file from a randomized choice in the configured locations
301    ///
302    /// If the file can not be created for some reason, returns an
303    /// error message referencing the request description
304    pub fn create_tmp_file(
305        self: &Arc<Self>,
306        request_description: &str,
307    ) -> Result<RefCountedTempFile> {
308        let mut guard = self.local_dirs.lock();
309        let local_dirs = guard.as_mut().ok_or_else(|| {
310            resources_datafusion_err!(
311                "Memory Exhausted while {request_description} (DiskManager is disabled)"
312            )
313        })?;
314
315        // Create a temporary directory if needed
316        if local_dirs.is_empty() {
317            let tempdir = tempfile::tempdir().map_err(DataFusionError::IoError)?;
318
319            debug!(
320                "Created directory '{:?}' as DataFusion tempfile directory for {}",
321                tempdir.path().to_string_lossy(),
322                request_description,
323            );
324
325            local_dirs.push(Arc::new(tempdir));
326        }
327
328        let dir_index = rng().random_range(0..local_dirs.len());
329        self.active_files_count.fetch_add(1, Ordering::Relaxed);
330        Ok(RefCountedTempFile {
331            parent_temp_dir: Arc::clone(&local_dirs[dir_index]),
332            tempfile: Arc::new(
333                Builder::new()
334                    .tempfile_in(local_dirs[dir_index].as_ref())
335                    .map_err(DataFusionError::IoError)?,
336            ),
337            current_file_disk_usage: Arc::new(AtomicU64::new(0)),
338            disk_manager: Arc::clone(self),
339        })
340    }
341}
342
/// A wrapper around a [`NamedTempFile`] that also contains
/// a reference to its parent temporary directory.
///
/// Instances are created by `DiskManager::create_tmp_file`.
///
/// # Note
/// After any modification to the underlying file (e.g., writing data to it), the caller
/// must invoke [`Self::update_disk_usage`] to update the global disk usage counter.
/// This ensures the disk manager can properly enforce usage limits configured by
/// [`DiskManager::with_max_temp_directory_size`].
///
/// This type is Clone-able, allowing multiple references to the same underlying file.
/// The file is deleted only when the last reference is dropped.
///
/// The parent temporary directory is also kept alive as long as any reference to
/// this file exists, preventing premature cleanup of the directory.
///
/// Once all references to this file are dropped, the file is deleted, and the
/// disk usage is subtracted from the disk manager's total.
#[derive(Debug)]
pub struct RefCountedTempFile {
    /// The reference to the directory in which temporary files are created to ensure
    /// it is not cleaned up prior to the NamedTempFile
    parent_temp_dir: Arc<TempDir>,
    /// The underlying temporary file, wrapped in Arc to allow cloning
    tempfile: Arc<NamedTempFile>,
    /// Tracks the current disk usage of this temporary file. See
    /// [`Self::update_disk_usage`] for more details.
    ///
    /// This is wrapped in `Arc<AtomicU64>` so that all clones share the same
    /// disk usage tracking, preventing incorrect accounting when clones are dropped.
    current_file_disk_usage: Arc<AtomicU64>,
    /// The disk manager that created and manages this temporary file
    /// (its global counters are adjusted by this handle).
    disk_manager: Arc<DiskManager>,
}
376
377impl Clone for RefCountedTempFile {
378    fn clone(&self) -> Self {
379        Self {
380            parent_temp_dir: Arc::clone(&self.parent_temp_dir),
381            tempfile: Arc::clone(&self.tempfile),
382            current_file_disk_usage: Arc::clone(&self.current_file_disk_usage),
383            disk_manager: Arc::clone(&self.disk_manager),
384        }
385    }
386}
387
388impl RefCountedTempFile {
389    pub fn path(&self) -> &Path {
390        self.tempfile.path()
391    }
392
393    pub fn inner(&self) -> &NamedTempFile {
394        self.tempfile.as_ref()
395    }
396
397    /// Updates the global disk usage counter after modifications to the underlying file.
398    ///
399    /// # Errors
400    /// - Returns an error if the global disk usage exceeds the configured limit.
401    pub fn update_disk_usage(&mut self) -> Result<()> {
402        // Get new file size from OS
403        let metadata = self.tempfile.as_file().metadata()?;
404        let new_disk_usage = metadata.len();
405
406        // Get the old disk usage
407        let old_disk_usage = self.current_file_disk_usage.load(Ordering::Relaxed);
408
409        // Update the global disk usage by:
410        // 1. Subtracting the old file size from the global counter
411        self.disk_manager
412            .used_disk_space
413            .fetch_sub(old_disk_usage, Ordering::Relaxed);
414        // 2. Adding the new file size to the global counter
415        self.disk_manager
416            .used_disk_space
417            .fetch_add(new_disk_usage, Ordering::Relaxed);
418
419        // 3. Check if the updated global disk usage exceeds the configured limit
420        let global_disk_usage = self.disk_manager.used_disk_space.load(Ordering::Relaxed);
421        if global_disk_usage > self.disk_manager.max_temp_directory_size {
422            return resources_err!(
423                "The used disk space during the spilling process has exceeded the allowable limit of {}. \
424                Please try increasing the config: `datafusion.runtime.max_temp_directory_size`.",
425                human_readable_size(self.disk_manager.max_temp_directory_size as usize)
426            );
427        }
428
429        // 4. Update the local file size tracking
430        self.current_file_disk_usage
431            .store(new_disk_usage, Ordering::Relaxed);
432
433        Ok(())
434    }
435
436    pub fn current_disk_usage(&self) -> u64 {
437        self.current_file_disk_usage.load(Ordering::Relaxed)
438    }
439}
440
441/// When the temporary file is dropped, subtract its disk usage from the disk manager's total
442impl Drop for RefCountedTempFile {
443    fn drop(&mut self) {
444        // Only subtract disk usage when this is the last reference to the file
445        // Check if we're the last one by seeing if there's only one strong reference
446        // left to the underlying tempfile (the one we're holding)
447        if Arc::strong_count(&self.tempfile) == 1 {
448            let current_usage = self.current_file_disk_usage.load(Ordering::Relaxed);
449            self.disk_manager
450                .used_disk_space
451                .fetch_sub(current_usage, Ordering::Relaxed);
452            self.disk_manager
453                .active_files_count
454                .fetch_sub(1, Ordering::Relaxed);
455        }
456    }
457}
458
459/// Setup local dirs by creating one new dir in each of the given dirs
460fn create_local_dirs(local_dirs: &[PathBuf]) -> Result<Vec<Arc<TempDir>>> {
461    local_dirs
462        .iter()
463        .map(|root| {
464            if !Path::new(root).exists() {
465                std::fs::create_dir(root)?;
466            }
467            Builder::new()
468                .prefix("datafusion-")
469                .tempdir_in(root)
470                .map_err(DataFusionError::IoError)
471        })
472        .map(|result| result.map(Arc::new))
473        .collect()
474}
475
#[cfg(test)]
mod tests {
    //! Unit tests for [`DiskManager`] and [`RefCountedTempFile`] accounting.
    use super::*;

    #[test]
    fn lazy_temp_dir_creation() -> Result<()> {
        // A default configuration should not create temp files until requested
        let dm = Arc::new(DiskManagerBuilder::default().build()?);

        assert_eq!(0, local_dir_snapshot(&dm).len());

        // can still create a tempfile however:
        let actual = dm.create_tmp_file("Testing")?;

        // Now the tempdir has been created on demand
        assert_eq!(1, local_dir_snapshot(&dm).len());

        // the returned tempfile file should be in the temp directory
        let local_dirs = local_dir_snapshot(&dm);
        assert_path_in_dirs(actual.path(), local_dirs.iter().map(|p| p.as_path()));

        Ok(())
    }

    /// Returns the paths of the manager's current temp dirs (empty if disabled).
    fn local_dir_snapshot(dm: &DiskManager) -> Vec<PathBuf> {
        dm.local_dirs
            .lock()
            .iter()
            .flatten()
            .map(|p| p.path().into())
            .collect()
    }

    #[test]
    fn file_in_right_dir() -> Result<()> {
        let local_dir1 = TempDir::new()?;
        let local_dir2 = TempDir::new()?;
        let local_dir3 = TempDir::new()?;
        let local_dirs = vec![local_dir1.path(), local_dir2.path(), local_dir3.path()];
        let dm = Arc::new(
            DiskManagerBuilder::default()
                .with_mode(DiskManagerMode::Directories(
                    local_dirs.iter().map(|p| p.into()).collect(),
                ))
                .build()?,
        );

        assert!(dm.tmp_files_enabled());
        let actual = dm.create_tmp_file("Testing")?;

        // the file should be in one of the specified local directories
        assert_path_in_dirs(actual.path(), local_dirs.into_iter());

        Ok(())
    }

    #[test]
    fn test_disabled_disk_manager() {
        let manager = Arc::new(
            DiskManagerBuilder::default()
                .with_mode(DiskManagerMode::Disabled)
                .build()
                .unwrap(),
        );
        assert!(!manager.tmp_files_enabled());
        assert_eq!(
            manager
                .create_tmp_file("Testing")
                .unwrap_err()
                .strip_backtrace(),
            "Resources exhausted: Memory Exhausted while Testing (DiskManager is disabled)",
        )
    }

    #[test]
    fn test_disk_manager_create_spill_folder() -> Result<()> {
        let dir = TempDir::new()?;
        // A spill root that does not exist yet must be created by the builder
        let spill_root = dir.path().join("spill");
        assert!(!spill_root.exists());

        let dm = Arc::new(
            DiskManagerBuilder::default()
                .with_mode(DiskManagerMode::Directories(vec![spill_root.clone()]))
                .build()?,
        );
        assert!(spill_root.is_dir());

        // Temporary files must land inside the newly created root
        let temp_file = dm.create_tmp_file("Testing")?;
        assert_path_in_dirs(temp_file.path(), std::iter::once(spill_root.as_path()));

        Ok(())
    }

    /// Asserts that `file_path` is found anywhere in any of `dir` directories
    fn assert_path_in_dirs<'a>(
        file_path: &'a Path,
        dirs: impl Iterator<Item = &'a Path>,
    ) {
        let dirs: Vec<&Path> = dirs.collect();

        let found = dirs.iter().any(|dir_path| {
            file_path
                .ancestors()
                .any(|candidate_path| *dir_path == candidate_path)
        });

        assert!(found, "Can't find {file_path:?} in dirs: {dirs:?}");
    }

    #[test]
    fn test_temp_file_still_alive_after_disk_manager_dropped() -> Result<()> {
        // Test for the case using OS arranged temporary directory
        let dm = Arc::new(DiskManagerBuilder::default().build()?);
        let temp_file = dm.create_tmp_file("Testing")?;
        let temp_file_path = temp_file.path().to_owned();
        assert!(temp_file_path.exists());

        drop(dm);
        assert!(temp_file_path.exists());

        drop(temp_file);
        assert!(!temp_file_path.exists());

        // Test for the case using specified directories
        let local_dir1 = TempDir::new()?;
        let local_dir2 = TempDir::new()?;
        let local_dir3 = TempDir::new()?;
        let local_dirs = [local_dir1.path(), local_dir2.path(), local_dir3.path()];
        let dm = Arc::new(
            DiskManagerBuilder::default()
                .with_mode(DiskManagerMode::Directories(
                    local_dirs.iter().map(|p| p.into()).collect(),
                ))
                .build()?,
        );
        let temp_file = dm.create_tmp_file("Testing")?;
        let temp_file_path = temp_file.path().to_owned();
        assert!(temp_file_path.exists());

        drop(dm);
        assert!(temp_file_path.exists());

        drop(temp_file);
        assert!(!temp_file_path.exists());

        Ok(())
    }

    #[test]
    fn test_disk_usage_basic() -> Result<()> {
        use std::io::Write;

        let dm = Arc::new(DiskManagerBuilder::default().build()?);
        let mut temp_file = dm.create_tmp_file("Testing")?;

        // Initially, disk usage should be 0
        assert_eq!(dm.used_disk_space(), 0);
        assert_eq!(temp_file.current_disk_usage(), 0);

        // Write some data to the file
        temp_file.inner().as_file().write_all(b"hello world")?;
        temp_file.update_disk_usage()?;

        // Disk usage should now reflect the written data
        let expected_usage = temp_file.current_disk_usage();
        assert!(expected_usage > 0);
        assert_eq!(dm.used_disk_space(), expected_usage);

        // Write more data
        temp_file.inner().as_file().write_all(b" more data")?;
        temp_file.update_disk_usage()?;

        // Disk usage should increase
        let new_usage = temp_file.current_disk_usage();
        assert!(new_usage > expected_usage);
        assert_eq!(dm.used_disk_space(), new_usage);

        // Drop the file
        drop(temp_file);

        // Disk usage should return to 0
        assert_eq!(dm.used_disk_space(), 0);

        Ok(())
    }

    #[test]
    fn test_disk_usage_with_clones() -> Result<()> {
        use std::io::Write;

        let dm = Arc::new(DiskManagerBuilder::default().build()?);
        let mut temp_file = dm.create_tmp_file("Testing")?;

        // Write some data
        temp_file.inner().as_file().write_all(b"test data")?;
        temp_file.update_disk_usage()?;

        let usage_after_write = temp_file.current_disk_usage();
        assert!(usage_after_write > 0);
        assert_eq!(dm.used_disk_space(), usage_after_write);

        // Clone the file
        let clone1 = temp_file.clone();
        let clone2 = temp_file.clone();

        // All clones should see the same disk usage
        assert_eq!(clone1.current_disk_usage(), usage_after_write);
        assert_eq!(clone2.current_disk_usage(), usage_after_write);

        // Global disk usage should still be the same (not multiplied by number of clones)
        assert_eq!(dm.used_disk_space(), usage_after_write);

        // Write more data through one clone
        clone1.inner().as_file().write_all(b" more data")?;
        let mut mutable_clone1 = clone1;
        mutable_clone1.update_disk_usage()?;

        let new_usage = mutable_clone1.current_disk_usage();
        assert!(new_usage > usage_after_write);

        // All clones should see the updated disk usage
        assert_eq!(temp_file.current_disk_usage(), new_usage);
        assert_eq!(clone2.current_disk_usage(), new_usage);
        assert_eq!(mutable_clone1.current_disk_usage(), new_usage);

        // Global disk usage should reflect the new size (not multiplied)
        assert_eq!(dm.used_disk_space(), new_usage);

        // Drop one clone
        drop(mutable_clone1);

        // Disk usage should NOT change (other clones still exist)
        assert_eq!(dm.used_disk_space(), new_usage);
        assert_eq!(temp_file.current_disk_usage(), new_usage);
        assert_eq!(clone2.current_disk_usage(), new_usage);

        // Drop another clone
        drop(clone2);

        // Disk usage should still NOT change (original still exists)
        assert_eq!(dm.used_disk_space(), new_usage);
        assert_eq!(temp_file.current_disk_usage(), new_usage);

        // Drop the original
        drop(temp_file);

        // Now disk usage should return to 0 (last reference dropped)
        assert_eq!(dm.used_disk_space(), 0);

        Ok(())
    }

    #[test]
    fn test_disk_usage_clones_dropped_out_of_order() -> Result<()> {
        use std::io::Write;

        let dm = Arc::new(DiskManagerBuilder::default().build()?);
        let mut temp_file = dm.create_tmp_file("Testing")?;

        // Write data
        temp_file.inner().as_file().write_all(b"test")?;
        temp_file.update_disk_usage()?;

        let usage = temp_file.current_disk_usage();
        assert_eq!(dm.used_disk_space(), usage);

        // Create multiple clones
        let clone1 = temp_file.clone();
        let clone2 = temp_file.clone();
        let clone3 = temp_file.clone();

        // Drop the original first (out of order)
        drop(temp_file);

        // Disk usage should still be tracked (clones exist)
        assert_eq!(dm.used_disk_space(), usage);
        assert_eq!(clone1.current_disk_usage(), usage);

        // Drop clones in different order
        drop(clone2);
        assert_eq!(dm.used_disk_space(), usage);

        drop(clone1);
        assert_eq!(dm.used_disk_space(), usage);

        // Drop the last clone
        drop(clone3);

        // Now disk usage should be 0
        assert_eq!(dm.used_disk_space(), 0);

        Ok(())
    }

    #[test]
    fn test_disk_usage_multiple_files() -> Result<()> {
        use std::io::Write;

        let dm = Arc::new(DiskManagerBuilder::default().build()?);

        // Create multiple temp files
        let mut file1 = dm.create_tmp_file("Testing1")?;
        let mut file2 = dm.create_tmp_file("Testing2")?;

        // Write to first file
        file1.inner().as_file().write_all(b"file1")?;
        file1.update_disk_usage()?;
        let usage1 = file1.current_disk_usage();

        assert_eq!(dm.used_disk_space(), usage1);

        // Write to second file
        file2.inner().as_file().write_all(b"file2 data")?;
        file2.update_disk_usage()?;
        let usage2 = file2.current_disk_usage();

        // Global usage should be sum of both files
        assert_eq!(dm.used_disk_space(), usage1 + usage2);

        // Drop first file
        drop(file1);

        // Usage should only reflect second file
        assert_eq!(dm.used_disk_space(), usage2);

        // Drop second file
        drop(file2);

        // Usage should be 0
        assert_eq!(dm.used_disk_space(), 0);

        Ok(())
    }

    #[test]
    fn test_spilling_progress_tracks_active_files() -> Result<()> {
        let dm = Arc::new(DiskManagerBuilder::default().build()?);
        assert_eq!(dm.spilling_progress().active_files_count, 0);

        let file1 = dm.create_tmp_file("Testing1")?;
        let file2 = dm.create_tmp_file("Testing2")?;
        assert_eq!(dm.spilling_progress().active_files_count, 2);

        // A clone refers to the same file and must not be counted again
        let file1_clone = file1.clone();
        assert_eq!(dm.spilling_progress().active_files_count, 2);

        // The file stays active until its last reference is dropped
        drop(file1);
        assert_eq!(dm.spilling_progress().active_files_count, 2);
        drop(file1_clone);
        assert_eq!(dm.spilling_progress().active_files_count, 1);

        drop(file2);
        assert_eq!(dm.spilling_progress().active_files_count, 0);

        Ok(())
    }

    #[test]
    fn test_set_max_temp_directory_size_on_disabled_manager() -> Result<()> {
        let disabled = || {
            DiskManagerBuilder::default()
                .with_mode(DiskManagerMode::Disabled)
                .build()
        };

        // A zero limit is consistent with spilling being disabled
        assert_eq!(
            disabled()?
                .with_max_temp_directory_size(0)?
                .max_temp_directory_size(),
            0
        );

        // Any non-zero limit is meaningless for a disabled manager
        assert!(disabled()?.with_max_temp_directory_size(1).is_err());

        Ok(())
    }

    #[test]
    fn test_disk_usage_limit_exceeded() -> Result<()> {
        use std::io::Write;

        let dm = Arc::new(
            DiskManagerBuilder::default()
                .with_max_temp_directory_size(4)
                .build()?,
        );
        let mut temp_file = dm.create_tmp_file("Testing")?;

        // 11 bytes exceed the 4 byte limit
        temp_file.inner().as_file().write_all(b"hello world")?;
        let err = temp_file.update_disk_usage().unwrap_err();
        assert!(
            err.to_string()
                .contains("datafusion.runtime.max_temp_directory_size"),
            "unexpected error: {err}"
        );

        Ok(())
    }
}