graphos_core/execution/spill/async_manager.rs

use super::async_file::AsyncSpillFile;

use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use tokio::sync::Mutex;

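/// Manages spill files that operators write to disk when working data does
/// not fit in memory. The manager hands out uniquely named [`AsyncSpillFile`]s
/// inside a single spill directory, tracks which paths are still live, and
/// keeps a running total of spilled bytes. Files are removed by `cleanup`, or
/// on a best-effort basis when the manager is dropped.
///
/// A minimal usage sketch (the crate-level import path is assumed, and the
/// calls must run inside a Tokio runtime):
///
/// ```ignore
/// use graphos_core::execution::spill::AsyncSpillManager;
///
/// let manager = AsyncSpillManager::with_temp_dir().await?;
/// let spill_file = manager.create_file("sort").await?;
/// manager.register_spilled_bytes(4096);
/// // ... write to and read back from `spill_file` ...
/// manager.cleanup().await?;
/// ```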
pub struct AsyncSpillManager {
    /// Directory in which spill files are created.
    spill_dir: PathBuf,
    /// Monotonically increasing id used to build unique file names.
    next_file_id: AtomicU64,
    /// Paths of spill files that have not yet been cleaned up.
    active_files: Mutex<Vec<PathBuf>>,
    /// Running total of bytes spilled to disk.
    total_spilled_bytes: AtomicU64,
}

impl AsyncSpillManager {
    /// Creates a spill manager rooted at `spill_dir`, creating the directory
    /// if it does not already exist.
    pub async fn new(spill_dir: impl Into<PathBuf>) -> std::io::Result<Self> {
        let spill_dir = spill_dir.into();
        tokio::fs::create_dir_all(&spill_dir).await?;

        Ok(Self {
            spill_dir,
            next_file_id: AtomicU64::new(0),
            active_files: Mutex::new(Vec::new()),
            total_spilled_bytes: AtomicU64::new(0),
        })
    }

    /// Creates a spill manager under `graphos_spill` in the system temp
    /// directory.
    pub async fn with_temp_dir() -> std::io::Result<Self> {
        let temp_dir = std::env::temp_dir().join("graphos_spill");
        Self::new(temp_dir).await
    }

    /// Returns the directory in which spill files are created.
    #[must_use]
    pub fn spill_dir(&self) -> &Path {
        &self.spill_dir
    }

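    /// Creates a new spill file in the spill directory and records its path so
    /// that `cleanup` can remove it later.
    ///
    /// File names follow the pattern `{prefix}_{id}.spill`, where `id` comes
    /// from a per-manager counter, so concurrent callers never collide. A
    /// sketch of the resulting names (directory elided for illustration):
    ///
    /// ```ignore
    /// let a = manager.create_file("sort").await?; // <spill_dir>/sort_0.spill
    /// let b = manager.create_file("agg").await?;  // <spill_dir>/agg_1.spill
    /// ```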
    pub async fn create_file(&self, prefix: &str) -> std::io::Result<AsyncSpillFile> {
        let file_id = self.next_file_id.fetch_add(1, Ordering::Relaxed);
        let file_name = format!("{prefix}_{file_id}.spill");
        let file_path = self.spill_dir.join(file_name);

        // Track the path so that cleanup (or Drop) can remove the file later.
        self.active_files.lock().await.push(file_path.clone());

        AsyncSpillFile::new(file_path).await
    }

    /// Adds `bytes` to the running total of spilled data.
    pub fn register_spilled_bytes(&self, bytes: u64) {
        self.total_spilled_bytes.fetch_add(bytes, Ordering::Relaxed);
    }

    /// Subtracts `bytes` from the running total. Callers should not unregister
    /// more than they previously registered; the counter wraps on underflow.
    pub fn unregister_spilled_bytes(&self, bytes: u64) {
        self.total_spilled_bytes.fetch_sub(bytes, Ordering::Relaxed);
    }

    /// Stops tracking `path` so that `cleanup` no longer tries to remove it.
    pub async fn unregister_file(&self, path: &Path) {
        let mut files = self.active_files.lock().await;
        files.retain(|p| p != path);
    }

    /// Returns the total number of bytes currently registered as spilled.
    #[must_use]
    pub fn spilled_bytes(&self) -> u64 {
        self.total_spilled_bytes.load(Ordering::Relaxed)
    }

    /// Returns the number of spill files currently being tracked.
    pub async fn active_file_count(&self) -> usize {
        self.active_files.lock().await.len()
    }

    /// Removes all tracked spill files and resets the spilled-byte counter.
    ///
    /// `NotFound` errors are ignored (the file may already have been removed);
    /// any other removal error is remembered and the last one is returned
    /// after every file has been attempted.
    pub async fn cleanup(&self) -> std::io::Result<()> {
        let files = std::mem::take(&mut *self.active_files.lock().await);
        let mut last_error = None;

        for path in files {
            if let Err(e) = tokio::fs::remove_file(&path).await {
                if e.kind() != std::io::ErrorKind::NotFound {
                    // Keep going so the remaining files are still removed.
                    last_error = Some(e);
                }
            }
        }

        self.total_spilled_bytes.store(0, Ordering::Relaxed);

        match last_error {
            Some(e) => Err(e),
            None => Ok(()),
        }
    }

    /// Best-effort synchronous cleanup used by `Drop`.
    ///
    /// If the lock is contended the files are left behind, and removal errors
    /// are ignored.
    fn cleanup_sync(&self) {
        if let Ok(mut files) = self.active_files.try_lock() {
            let files_to_remove = std::mem::take(&mut *files);
            for path in files_to_remove {
                let _ = std::fs::remove_file(&path);
            }
        }
    }
}

impl Drop for AsyncSpillManager {
    fn drop(&mut self) {
        // Best-effort removal of any spill files that were never cleaned up.
        self.cleanup_sync();
    }
}

impl std::fmt::Debug for AsyncSpillManager {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("AsyncSpillManager")
            .field("spill_dir", &self.spill_dir)
            .field("spilled_bytes", &self.spilled_bytes())
            .finish()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    #[tokio::test]
    async fn test_async_manager_creation() {
        let temp_dir = TempDir::new().unwrap();
        let manager = AsyncSpillManager::new(temp_dir.path()).await.unwrap();

        assert_eq!(manager.spilled_bytes(), 0);
        assert_eq!(manager.active_file_count().await, 0);
        assert_eq!(manager.spill_dir(), temp_dir.path());
    }

    #[tokio::test]
    async fn test_async_create_spill_file() {
        let temp_dir = TempDir::new().unwrap();
        let manager = AsyncSpillManager::new(temp_dir.path()).await.unwrap();

        let file1 = manager.create_file("sort").await.unwrap();
        let file2 = manager.create_file("sort").await.unwrap();
        let file3 = manager.create_file("agg").await.unwrap();

        assert_eq!(manager.active_file_count().await, 3);

        assert_ne!(file1.path(), file2.path());
        assert!(file1.path().to_str().unwrap().contains("sort_0"));
        assert!(file2.path().to_str().unwrap().contains("sort_1"));
        assert!(file3.path().to_str().unwrap().contains("agg_2"));
    }

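    // Sketch: `with_temp_dir` roots the spill directory under the system temp
    // dir; only the resulting path is checked here, since that directory is
    // shared global state.
    #[tokio::test]
    async fn test_async_with_temp_dir_path() {
        let manager = AsyncSpillManager::with_temp_dir().await.unwrap();

        assert!(manager.spill_dir().starts_with(std::env::temp_dir()));
        assert!(manager.spill_dir().ends_with("graphos_spill"));
    }
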
    #[tokio::test]
    async fn test_async_cleanup() {
        let temp_dir = TempDir::new().unwrap();
        let manager = AsyncSpillManager::new(temp_dir.path()).await.unwrap();

        let _file1 = manager.create_file("test").await.unwrap();
        let _file2 = manager.create_file("test").await.unwrap();
        assert_eq!(manager.active_file_count().await, 2);

        manager.cleanup().await.unwrap();
        assert_eq!(manager.active_file_count().await, 0);
    }

    #[tokio::test]
    async fn test_async_spilled_bytes_tracking() {
        let temp_dir = TempDir::new().unwrap();
        let manager = AsyncSpillManager::new(temp_dir.path()).await.unwrap();

        manager.register_spilled_bytes(1000);
        manager.register_spilled_bytes(500);
        assert_eq!(manager.spilled_bytes(), 1500);

        manager.unregister_spilled_bytes(300);
        assert_eq!(manager.spilled_bytes(), 1200);
    }

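    // Sketch: `unregister_file` only drops the path from active tracking; it
    // does not delete the file itself.
    #[tokio::test]
    async fn test_async_unregister_file() {
        let temp_dir = TempDir::new().unwrap();
        let manager = AsyncSpillManager::new(temp_dir.path()).await.unwrap();

        let file = manager.create_file("test").await.unwrap();
        assert_eq!(manager.active_file_count().await, 1);

        manager.unregister_file(file.path()).await;
        assert_eq!(manager.active_file_count().await, 0);
    }
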
    #[tokio::test]
    async fn test_async_cleanup_on_drop() {
        let temp_dir = TempDir::new().unwrap();
        let temp_path = temp_dir.path().to_path_buf();

        let file_path = {
            let manager = AsyncSpillManager::new(&temp_path).await.unwrap();
            let file = manager.create_file("test").await.unwrap();
            file.path().to_path_buf()
        };

        // Dropping the manager runs the best-effort sync cleanup, which
        // removes the spill file.
        assert!(!file_path.exists());
    }
}