Skip to main content

grafeo_core/execution/spill/
manager.rs

//! Spill manager for file lifecycle management.

3use super::file::SpillFile;
4use parking_lot::Mutex;
5use std::path::{Path, PathBuf};
6use std::sync::atomic::{AtomicU64, Ordering};
7
8/// Manages spill file lifecycle for out-of-core processing.
9///
10/// The manager handles:
11/// - Creating unique spill files with prefixes
12/// - Tracking total bytes spilled to disk
13/// - Automatic cleanup of all spill files on drop
14pub struct SpillManager {
15    /// Directory for spill files.
16    spill_dir: PathBuf,
17    /// Counter for unique file IDs.
18    next_file_id: AtomicU64,
19    /// Active spill file paths for cleanup.
20    active_files: Mutex<Vec<PathBuf>>,
21    /// Total bytes currently spilled to disk.
22    total_spilled_bytes: AtomicU64,
23}
24
25impl SpillManager {
26    /// Creates a new spill manager with the given directory.
27    ///
28    /// Creates the directory if it doesn't exist.
29    ///
30    /// # Errors
31    ///
32    /// Returns an error if the directory cannot be created.
33    pub fn new(spill_dir: impl Into<PathBuf>) -> std::io::Result<Self> {
34        let spill_dir = spill_dir.into();
35        std::fs::create_dir_all(&spill_dir)?;
36
37        Ok(Self {
38            spill_dir,
39            next_file_id: AtomicU64::new(0),
40            active_files: Mutex::new(Vec::new()),
41            total_spilled_bytes: AtomicU64::new(0),
42        })
43    }
44
45    /// Creates a new spill manager using a system temp directory.
46    ///
47    /// # Errors
48    ///
49    /// Returns an error if the temp directory cannot be created.
50    pub fn with_temp_dir() -> std::io::Result<Self> {
51        let temp_dir = std::env::temp_dir().join("grafeo_spill");
52        Self::new(temp_dir)
53    }
54
55    /// Returns the spill directory path.
56    #[must_use]
57    pub fn spill_dir(&self) -> &Path {
58        &self.spill_dir
59    }
60
61    /// Creates a new spill file with the given prefix.
62    ///
63    /// The file name format is: `{prefix}_{file_id}.spill`
64    ///
65    /// # Errors
66    ///
67    /// Returns an error if the file cannot be created.
68    pub fn create_file(&self, prefix: &str) -> std::io::Result<SpillFile> {
69        let file_id = self.next_file_id.fetch_add(1, Ordering::Relaxed);
70        let file_name = format!("{prefix}_{file_id}.spill");
71        let file_path = self.spill_dir.join(file_name);
72
73        // Track the file for cleanup
74        self.active_files.lock().push(file_path.clone());
75
76        SpillFile::new(file_path)
77    }
78
79    /// Registers bytes spilled to disk.
80    ///
81    /// Called by SpillFile when writing completes.
82    pub fn register_spilled_bytes(&self, bytes: u64) {
83        self.total_spilled_bytes.fetch_add(bytes, Ordering::Relaxed);
84    }
85
86    /// Unregisters bytes when a spill file is deleted.
87    ///
88    /// Called by SpillFile on deletion.
89    pub fn unregister_spilled_bytes(&self, bytes: u64) {
90        self.total_spilled_bytes.fetch_sub(bytes, Ordering::Relaxed);
91    }
92
93    /// Removes a file path from tracking (called when file is deleted).
94    pub fn unregister_file(&self, path: &Path) {
95        let mut files = self.active_files.lock();
96        files.retain(|p| p != path);
97    }
98
99    /// Returns total bytes currently spilled to disk.
100    #[must_use]
101    pub fn spilled_bytes(&self) -> u64 {
102        self.total_spilled_bytes.load(Ordering::Relaxed)
103    }
104
105    /// Returns the number of active spill files.
106    #[must_use]
107    pub fn active_file_count(&self) -> usize {
108        self.active_files.lock().len()
109    }
110
111    /// Cleans up all spill files.
112    ///
113    /// This is called automatically on drop, but can be called manually.
114    ///
115    /// # Errors
116    ///
117    /// Returns an error if any file cannot be deleted (continues trying others).
118    pub fn cleanup(&self) -> std::io::Result<()> {
119        let files = std::mem::take(&mut *self.active_files.lock());
120        let mut last_error = None;
121
122        for path in files {
123            if let Err(e) = std::fs::remove_file(&path) {
124                // Ignore "not found" errors (file may have been deleted already)
125                if e.kind() != std::io::ErrorKind::NotFound {
126                    last_error = Some(e);
127                }
128            }
129        }
130
131        self.total_spilled_bytes.store(0, Ordering::Relaxed);
132
133        match last_error {
134            Some(e) => Err(e),
135            None => Ok(()),
136        }
137    }
138}
139
140impl Drop for SpillManager {
141    fn drop(&mut self) {
142        // Best-effort cleanup on drop
143        let _ = self.cleanup();
144    }
145}
146
147impl std::fmt::Debug for SpillManager {
148    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
149        f.debug_struct("SpillManager")
150            .field("spill_dir", &self.spill_dir)
151            .field("active_files", &self.active_file_count())
152            .field("spilled_bytes", &self.spilled_bytes())
153            .finish()
154    }
155}
156
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    #[test]
    fn test_manager_creation() {
        let temp_dir = TempDir::new().unwrap();
        let manager = SpillManager::new(temp_dir.path()).unwrap();

        assert_eq!(manager.spilled_bytes(), 0);
        assert_eq!(manager.active_file_count(), 0);
        assert_eq!(manager.spill_dir(), temp_dir.path());
    }

    #[test]
    fn test_create_spill_file() {
        let temp_dir = TempDir::new().unwrap();
        let manager = SpillManager::new(temp_dir.path()).unwrap();

        let file1 = manager.create_file("sort").unwrap();
        let file2 = manager.create_file("sort").unwrap();
        let file3 = manager.create_file("agg").unwrap();

        assert_eq!(manager.active_file_count(), 3);

        // Names follow `{prefix}_{file_id}.spill`. The id counter is shared
        // across prefixes, and every file lives directly in the spill directory.
        assert_ne!(file1.path(), file2.path());
        assert_eq!(file1.path(), temp_dir.path().join("sort_0.spill").as_path());
        assert_eq!(file2.path(), temp_dir.path().join("sort_1.spill").as_path());
        assert_eq!(file3.path(), temp_dir.path().join("agg_2.spill").as_path());
    }

    #[test]
    fn test_cleanup() {
        let temp_dir = TempDir::new().unwrap();
        let manager = SpillManager::new(temp_dir.path()).unwrap();

        // Create some files; keep them alive across the cleanup call.
        let file1 = manager.create_file("test").unwrap();
        let file2 = manager.create_file("test").unwrap();
        let paths = [file1.path().to_path_buf(), file2.path().to_path_buf()];
        manager.register_spilled_bytes(42);
        assert_eq!(manager.active_file_count(), 2);

        // Cleanup should untrack and delete every file and reset the counter.
        manager.cleanup().unwrap();
        assert_eq!(manager.active_file_count(), 0);
        assert_eq!(manager.spilled_bytes(), 0);
        for path in &paths {
            assert!(!path.exists(), "spill file {} survived cleanup", path.display());
        }
    }

    #[test]
    fn test_spilled_bytes_tracking() {
        let temp_dir = TempDir::new().unwrap();
        let manager = SpillManager::new(temp_dir.path()).unwrap();

        manager.register_spilled_bytes(1000);
        manager.register_spilled_bytes(500);
        assert_eq!(manager.spilled_bytes(), 1500);

        manager.unregister_spilled_bytes(300);
        assert_eq!(manager.spilled_bytes(), 1200);
    }

    #[test]
    fn test_cleanup_on_drop() {
        let temp_dir = TempDir::new().unwrap();
        let temp_path = temp_dir.path().to_path_buf();

        let file_path = {
            let manager = SpillManager::new(&temp_path).unwrap();
            let file = manager.create_file("test").unwrap();
            assert_eq!(manager.active_file_count(), 1);
            file.path().to_path_buf()
        };

        // After the manager is dropped, the file should be cleaned up.
        assert!(!file_path.exists());
    }
}