Skip to main content

graphos_core/execution/spill/
async_manager.rs

1//! Async spill manager for file lifecycle management using tokio.
2
3use super::async_file::AsyncSpillFile;
4use std::path::{Path, PathBuf};
5use std::sync::atomic::{AtomicU64, Ordering};
6use tokio::sync::Mutex;
7
8/// Async manager for spill file lifecycle for out-of-core processing.
9///
10/// The async manager handles:
11/// - Creating unique spill files with prefixes using async I/O
12/// - Tracking total bytes spilled to disk
13/// - Automatic cleanup of all spill files on drop
14pub struct AsyncSpillManager {
15    /// Directory for spill files.
16    spill_dir: PathBuf,
17    /// Counter for unique file IDs.
18    next_file_id: AtomicU64,
19    /// Active spill file paths for cleanup.
20    active_files: Mutex<Vec<PathBuf>>,
21    /// Total bytes currently spilled to disk.
22    total_spilled_bytes: AtomicU64,
23}
24
25impl AsyncSpillManager {
26    /// Creates a new async spill manager with the given directory.
27    ///
28    /// Creates the directory if it doesn't exist.
29    ///
30    /// # Errors
31    ///
32    /// Returns an error if the directory cannot be created.
33    pub async fn new(spill_dir: impl Into<PathBuf>) -> std::io::Result<Self> {
34        let spill_dir = spill_dir.into();
35        tokio::fs::create_dir_all(&spill_dir).await?;
36
37        Ok(Self {
38            spill_dir,
39            next_file_id: AtomicU64::new(0),
40            active_files: Mutex::new(Vec::new()),
41            total_spilled_bytes: AtomicU64::new(0),
42        })
43    }
44
45    /// Creates a new async spill manager using a system temp directory.
46    ///
47    /// # Errors
48    ///
49    /// Returns an error if the temp directory cannot be created.
50    pub async fn with_temp_dir() -> std::io::Result<Self> {
51        let temp_dir = std::env::temp_dir().join("graphos_spill");
52        Self::new(temp_dir).await
53    }
54
55    /// Returns the spill directory path.
56    #[must_use]
57    pub fn spill_dir(&self) -> &Path {
58        &self.spill_dir
59    }
60
61    /// Creates a new async spill file with the given prefix.
62    ///
63    /// The file name format is: `{prefix}_{file_id}.spill`
64    ///
65    /// # Errors
66    ///
67    /// Returns an error if the file cannot be created.
68    pub async fn create_file(&self, prefix: &str) -> std::io::Result<AsyncSpillFile> {
69        let file_id = self.next_file_id.fetch_add(1, Ordering::Relaxed);
70        let file_name = format!("{prefix}_{file_id}.spill");
71        let file_path = self.spill_dir.join(file_name);
72
73        // Track the file for cleanup
74        self.active_files.lock().await.push(file_path.clone());
75
76        AsyncSpillFile::new(file_path).await
77    }
78
79    /// Registers bytes spilled to disk.
80    ///
81    /// Called by AsyncSpillFile when writing completes.
82    pub fn register_spilled_bytes(&self, bytes: u64) {
83        self.total_spilled_bytes.fetch_add(bytes, Ordering::Relaxed);
84    }
85
86    /// Unregisters bytes when a spill file is deleted.
87    ///
88    /// Called by AsyncSpillFile on deletion.
89    pub fn unregister_spilled_bytes(&self, bytes: u64) {
90        self.total_spilled_bytes.fetch_sub(bytes, Ordering::Relaxed);
91    }
92
93    /// Removes a file path from tracking (called when file is deleted).
94    pub async fn unregister_file(&self, path: &Path) {
95        let mut files = self.active_files.lock().await;
96        files.retain(|p| p != path);
97    }
98
99    /// Returns total bytes currently spilled to disk.
100    #[must_use]
101    pub fn spilled_bytes(&self) -> u64 {
102        self.total_spilled_bytes.load(Ordering::Relaxed)
103    }
104
105    /// Returns the number of active spill files.
106    pub async fn active_file_count(&self) -> usize {
107        self.active_files.lock().await.len()
108    }
109
110    /// Cleans up all spill files asynchronously.
111    ///
112    /// This can be called manually for async cleanup.
113    ///
114    /// # Errors
115    ///
116    /// Returns an error if any file cannot be deleted (continues trying others).
117    pub async fn cleanup(&self) -> std::io::Result<()> {
118        let files = std::mem::take(&mut *self.active_files.lock().await);
119        let mut last_error = None;
120
121        for path in files {
122            if let Err(e) = tokio::fs::remove_file(&path).await {
123                // Ignore "not found" errors (file may have been deleted already)
124                if e.kind() != std::io::ErrorKind::NotFound {
125                    last_error = Some(e);
126                }
127            }
128        }
129
130        self.total_spilled_bytes.store(0, Ordering::Relaxed);
131
132        match last_error {
133            Some(e) => Err(e),
134            None => Ok(()),
135        }
136    }
137
138    /// Synchronous cleanup for use in Drop.
139    ///
140    /// This is a best-effort cleanup that blocks on file deletion.
141    fn cleanup_sync(&self) {
142        // Use std::fs for synchronous cleanup in Drop
143        if let Ok(mut files) = self.active_files.try_lock() {
144            let files_to_remove = std::mem::take(&mut *files);
145            for path in files_to_remove {
146                let _ = std::fs::remove_file(&path);
147            }
148        }
149    }
150}
151
152impl Drop for AsyncSpillManager {
153    fn drop(&mut self) {
154        // Best-effort synchronous cleanup on drop
155        self.cleanup_sync();
156    }
157}
158
159impl std::fmt::Debug for AsyncSpillManager {
160    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
161        f.debug_struct("AsyncSpillManager")
162            .field("spill_dir", &self.spill_dir)
163            .field("spilled_bytes", &self.spilled_bytes())
164            .finish()
165    }
166}
167
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    #[tokio::test]
    async fn test_async_manager_creation() {
        let temp_dir = TempDir::new().unwrap();
        let manager = AsyncSpillManager::new(temp_dir.path()).await.unwrap();

        assert_eq!(manager.spilled_bytes(), 0);
        assert_eq!(manager.active_file_count().await, 0);
        assert_eq!(manager.spill_dir(), temp_dir.path());
    }

    #[tokio::test]
    async fn test_async_create_spill_file() {
        let temp_dir = TempDir::new().unwrap();
        let manager = AsyncSpillManager::new(temp_dir.path()).await.unwrap();

        let file1 = manager.create_file("sort").await.unwrap();
        let file2 = manager.create_file("sort").await.unwrap();
        let file3 = manager.create_file("agg").await.unwrap();

        assert_eq!(manager.active_file_count().await, 3);

        // File names should be unique
        assert_ne!(file1.path(), file2.path());
        assert!(file1.path().to_str().unwrap().contains("sort_0"));
        assert!(file2.path().to_str().unwrap().contains("sort_1"));
        assert!(file3.path().to_str().unwrap().contains("agg_2"));
    }

    #[tokio::test]
    async fn test_async_cleanup() {
        let temp_dir = TempDir::new().unwrap();
        let manager = AsyncSpillManager::new(temp_dir.path()).await.unwrap();

        // Create some files and record some spilled bytes.
        let file1 = manager.create_file("test").await.unwrap();
        let file2 = manager.create_file("test").await.unwrap();
        let paths = [file1.path().to_path_buf(), file2.path().to_path_buf()];
        manager.register_spilled_bytes(4096);
        assert_eq!(manager.active_file_count().await, 2);

        // Cleanup should forget every file and reset the byte counter.
        manager.cleanup().await.unwrap();
        assert_eq!(manager.active_file_count().await, 0);
        assert_eq!(manager.spilled_bytes(), 0);

        // Release the handles before checking the filesystem, so platforms that
        // defer deleting open files still observe the removal.
        drop(file1);
        drop(file2);
        for path in &paths {
            assert!(!path.exists(), "{} should have been removed", path.display());
        }
    }

    #[tokio::test]
    async fn test_async_cleanup_with_no_files() {
        let temp_dir = TempDir::new().unwrap();
        let manager = AsyncSpillManager::new(temp_dir.path()).await.unwrap();

        manager.cleanup().await.unwrap();
        assert_eq!(manager.active_file_count().await, 0);
        assert_eq!(manager.spilled_bytes(), 0);
    }

    #[tokio::test]
    async fn test_async_spilled_bytes_tracking() {
        let temp_dir = TempDir::new().unwrap();
        let manager = AsyncSpillManager::new(temp_dir.path()).await.unwrap();

        manager.register_spilled_bytes(1000);
        manager.register_spilled_bytes(500);
        assert_eq!(manager.spilled_bytes(), 1500);

        manager.unregister_spilled_bytes(300);
        assert_eq!(manager.spilled_bytes(), 1200);
    }

    #[tokio::test]
    async fn test_async_cleanup_on_drop() {
        let temp_dir = TempDir::new().unwrap();
        let temp_path = temp_dir.path().to_path_buf();

        let file_path = {
            let manager = AsyncSpillManager::new(&temp_path).await.unwrap();
            let file = manager.create_file("test").await.unwrap();
            file.path().to_path_buf()
        };

        // After manager is dropped, the file should be cleaned up
        assert!(!file_path.exists());
    }
}
243}