Skip to main content

grafeo_core/execution/spill/
manager.rs

1//! Spill manager for file lifecycle management.
2
3use super::file::SpillFile;
4use parking_lot::Mutex;
5use std::path::{Path, PathBuf};
6use std::sync::atomic::{AtomicU64, Ordering};
7
8/// Manages spill file lifecycle for out-of-core processing.
9///
10/// The manager handles:
11/// - Creating unique spill files with prefixes
12/// - Tracking total bytes spilled to disk
13/// - Automatic cleanup of all spill files on drop
14///
15/// By default the spill directory itself outlives the manager (the caller
16/// owns it). Per-query callers that pass a unique throwaway directory should
17/// chain [`with_owned_dir`](Self::with_owned_dir) so `Drop` also removes the
18/// directory once its files are gone.
19pub struct SpillManager {
20    /// Directory for spill files.
21    spill_dir: PathBuf,
22    /// Counter for unique file IDs.
23    next_file_id: AtomicU64,
24    /// Active spill file paths for cleanup.
25    active_files: Mutex<Vec<PathBuf>>,
26    /// Total bytes currently spilled to disk.
27    total_spilled_bytes: AtomicU64,
28    /// Whether `Drop` should remove `spill_dir` itself (non-recursive).
29    owns_dir: bool,
30}
31
32impl SpillManager {
33    /// Creates a new spill manager with the given directory.
34    ///
35    /// Creates the directory if it doesn't exist. The directory is *not*
36    /// removed on drop unless [`with_owned_dir`](Self::with_owned_dir) is
37    /// chained on the result.
38    ///
39    /// # Errors
40    ///
41    /// Returns an error if the directory cannot be created.
42    pub fn new(spill_dir: impl Into<PathBuf>) -> std::io::Result<Self> {
43        let spill_dir = spill_dir.into();
44        std::fs::create_dir_all(&spill_dir)?;
45
46        Ok(Self {
47            spill_dir,
48            next_file_id: AtomicU64::new(0),
49            active_files: Mutex::new(Vec::new()),
50            total_spilled_bytes: AtomicU64::new(0),
51            owns_dir: false,
52        })
53    }
54
55    /// Marks the spill directory as owned by this manager so that `Drop`
56    /// removes it (non-recursive) after spill files are cleaned up.
57    ///
58    /// Use for per-query spill subdirectories (e.g. `<base>/query_<id>/`)
59    /// where leaving the empty directory behind would accumulate over time.
60    /// The removal is best-effort: if anything unexpected is left in the
61    /// directory, `remove_dir` fails and the directory is preserved.
62    #[must_use]
63    pub fn with_owned_dir(mut self) -> Self {
64        self.owns_dir = true;
65        self
66    }
67
68    /// Creates a new spill manager using a system temp directory.
69    ///
70    /// # Errors
71    ///
72    /// Returns an error if the temp directory cannot be created.
73    pub fn with_temp_dir() -> std::io::Result<Self> {
74        let temp_dir = std::env::temp_dir().join("grafeo_spill");
75        Self::new(temp_dir)
76    }
77
78    /// Returns the spill directory path.
79    #[must_use]
80    pub fn spill_dir(&self) -> &Path {
81        &self.spill_dir
82    }
83
84    /// Creates a new spill file with the given prefix.
85    ///
86    /// The file name format is: `{prefix}_{file_id}.spill`
87    ///
88    /// # Errors
89    ///
90    /// Returns an error if the file cannot be created.
91    pub fn create_file(&self, prefix: &str) -> std::io::Result<SpillFile> {
92        let file_id = self.next_file_id.fetch_add(1, Ordering::Relaxed);
93        let file_name = format!("{prefix}_{file_id}.spill");
94        let file_path = self.spill_dir.join(file_name);
95
96        // Track the file for cleanup
97        self.active_files.lock().push(file_path.clone());
98
99        SpillFile::new(file_path)
100    }
101
102    /// Registers bytes spilled to disk.
103    ///
104    /// Called by SpillFile when writing completes.
105    pub fn register_spilled_bytes(&self, bytes: u64) {
106        self.total_spilled_bytes.fetch_add(bytes, Ordering::Relaxed);
107    }
108
109    /// Unregisters bytes when a spill file is deleted.
110    ///
111    /// Called by SpillFile on deletion.
112    pub fn unregister_spilled_bytes(&self, bytes: u64) {
113        self.total_spilled_bytes.fetch_sub(bytes, Ordering::Relaxed);
114    }
115
116    /// Removes a file path from tracking (called when file is deleted).
117    pub fn unregister_file(&self, path: &Path) {
118        let mut files = self.active_files.lock();
119        files.retain(|p| p != path);
120    }
121
122    /// Returns total bytes currently spilled to disk.
123    #[must_use]
124    pub fn spilled_bytes(&self) -> u64 {
125        self.total_spilled_bytes.load(Ordering::Relaxed)
126    }
127
128    /// Returns the number of active spill files.
129    #[must_use]
130    pub fn active_file_count(&self) -> usize {
131        self.active_files.lock().len()
132    }
133
134    /// Cleans up all spill files.
135    ///
136    /// This is called automatically on drop, but can be called manually.
137    ///
138    /// # Errors
139    ///
140    /// Returns an error if any file cannot be deleted (continues trying others).
141    pub fn cleanup(&self) -> std::io::Result<()> {
142        let files = std::mem::take(&mut *self.active_files.lock());
143        let mut last_error = None;
144
145        for path in files {
146            if let Err(e) = std::fs::remove_file(&path) {
147                // Ignore "not found" errors (file may have been deleted already)
148                if e.kind() != std::io::ErrorKind::NotFound {
149                    last_error = Some(e);
150                }
151            }
152        }
153
154        self.total_spilled_bytes.store(0, Ordering::Relaxed);
155
156        match last_error {
157            Some(e) => Err(e),
158            None => Ok(()),
159        }
160    }
161}
162
163impl Drop for SpillManager {
164    fn drop(&mut self) {
165        // Best-effort cleanup on drop
166        let _ = self.cleanup();
167        if self.owns_dir {
168            // Non-recursive remove_dir: succeeds only if cleanup left the
169            // directory empty. If something else (a stray file, a subdir we
170            // didn't track) is in there, the directory is preserved.
171            let _ = std::fs::remove_dir(&self.spill_dir);
172        }
173    }
174}
175
176impl std::fmt::Debug for SpillManager {
177    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
178        f.debug_struct("SpillManager")
179            .field("spill_dir", &self.spill_dir)
180            .field("active_files", &self.active_file_count())
181            .field("spilled_bytes", &self.spilled_bytes())
182            .finish()
183    }
184}
185
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    #[test]
    fn test_manager_creation() {
        let temp_dir = TempDir::new().unwrap();
        let manager = SpillManager::new(temp_dir.path()).unwrap();

        // A fresh manager tracks nothing and reports the directory it was given.
        assert_eq!(manager.spilled_bytes(), 0);
        assert_eq!(manager.active_file_count(), 0);
        assert_eq!(manager.spill_dir(), temp_dir.path());
    }

    #[test]
    fn test_create_spill_file() {
        let temp_dir = TempDir::new().unwrap();
        let manager = SpillManager::new(temp_dir.path()).unwrap();

        let file1 = manager.create_file("sort").unwrap();
        let file2 = manager.create_file("sort").unwrap();
        let file3 = manager.create_file("agg").unwrap();

        assert_eq!(manager.active_file_count(), 3);

        // File names should be unique; the ID counter is shared across prefixes.
        assert_ne!(file1.path(), file2.path());
        assert!(file1.path().to_str().unwrap().contains("sort_0"));
        assert!(file2.path().to_str().unwrap().contains("sort_1"));
        assert!(file3.path().to_str().unwrap().contains("agg_2"));
    }

    #[test]
    fn test_cleanup() {
        let temp_dir = TempDir::new().unwrap();
        let manager = SpillManager::new(temp_dir.path()).unwrap();

        // Create some files and pretend something was written to them.
        let _file1 = manager.create_file("test").unwrap();
        let _file2 = manager.create_file("test").unwrap();
        manager.register_spilled_bytes(64);
        assert_eq!(manager.active_file_count(), 2);

        // Cleanup should forget all files, reset the byte counter ...
        manager.cleanup().unwrap();
        assert_eq!(manager.active_file_count(), 0);
        assert_eq!(manager.spilled_bytes(), 0);

        // ... and leave nothing behind at the paths the manager generated.
        assert!(!manager.spill_dir().join("test_0.spill").exists());
        assert!(!manager.spill_dir().join("test_1.spill").exists());
    }

    #[test]
    fn test_spilled_bytes_tracking() {
        let temp_dir = TempDir::new().unwrap();
        let manager = SpillManager::new(temp_dir.path()).unwrap();

        manager.register_spilled_bytes(1000);
        manager.register_spilled_bytes(500);
        assert_eq!(manager.spilled_bytes(), 1500);

        manager.unregister_spilled_bytes(300);
        assert_eq!(manager.spilled_bytes(), 1200);
    }

    #[test]
    fn unregister_file_stops_tracking() {
        let temp_dir = TempDir::new().unwrap();
        let manager = SpillManager::new(temp_dir.path()).unwrap();

        let _first = manager.create_file("sort").unwrap();
        let _second = manager.create_file("sort").unwrap();
        assert_eq!(manager.active_file_count(), 2);

        // Paths are `<spill_dir>/{prefix}_{id}.spill`, with IDs starting at 0.
        manager.unregister_file(&manager.spill_dir().join("sort_0.spill"));
        assert_eq!(manager.active_file_count(), 1);

        // Unregistering a path that was never tracked is a no-op.
        manager.unregister_file(&manager.spill_dir().join("never_created.spill"));
        assert_eq!(manager.active_file_count(), 1);
    }

    #[test]
    fn test_cleanup_on_drop() {
        let temp_dir = TempDir::new().unwrap();
        let temp_path = temp_dir.path().to_path_buf();

        let file_path = {
            let manager = SpillManager::new(&temp_path).unwrap();
            let file = manager.create_file("test").unwrap();
            file.path().to_path_buf()
        };

        // After manager is dropped, the file should be cleaned up
        assert!(!file_path.exists());
    }

    #[test]
    fn unowned_dir_survives_drop() {
        // Default behavior: caller owns the directory, manager leaves it
        // alone on drop. Per-query subdirs should opt into ownership; shared
        // / caller-managed dirs should not.
        let temp_dir = TempDir::new().unwrap();
        let dir_path = temp_dir.path().join("shared_spill");

        {
            let _manager = SpillManager::new(&dir_path).unwrap();
        }

        assert!(
            dir_path.exists(),
            "default SpillManager must not remove its directory on drop"
        );
    }

    #[test]
    fn owned_dir_removed_after_files_cleaned_on_drop() {
        // Regression test for the per-query spill leak (#323 follow-up):
        // session-created `<base>/query_<id>/` subdirs accumulated empty
        // because Drop only removed files, not the directory itself.
        let temp_dir = TempDir::new().unwrap();
        let query_dir = temp_dir.path().join("query_42");

        {
            let manager = SpillManager::new(&query_dir).unwrap().with_owned_dir();
            let _file = manager.create_file("sort").unwrap();
            assert!(query_dir.exists());
        }

        assert!(
            !query_dir.exists(),
            "with_owned_dir manager must remove its empty directory on drop"
        );
    }

    #[test]
    fn owned_dir_preserved_when_unexpected_contents_remain() {
        // remove_dir is non-recursive on purpose: if something the manager
        // did not track is sitting in the directory, the directory survives
        // rather than being silently deleted.
        let temp_dir = TempDir::new().unwrap();
        let query_dir = temp_dir.path().join("query_with_extra");

        let stray_file = {
            let manager = SpillManager::new(&query_dir).unwrap().with_owned_dir();
            let _file = manager.create_file("sort").unwrap();
            let stray = query_dir.join("not_tracked.dat");
            std::fs::write(&stray, b"keep me").unwrap();
            stray
        };

        assert!(
            query_dir.exists(),
            "directory with untracked content must not be removed"
        );
        assert!(
            stray_file.exists(),
            "untracked file must not be touched by SpillManager Drop"
        );
    }
}