grafeo_core/execution/spill/
manager.rs1use super::file::SpillFile;
4use parking_lot::Mutex;
5use std::path::{Path, PathBuf};
6use std::sync::atomic::{AtomicU64, Ordering};
7
8pub struct SpillManager {
20 spill_dir: PathBuf,
22 next_file_id: AtomicU64,
24 active_files: Mutex<Vec<PathBuf>>,
26 total_spilled_bytes: AtomicU64,
28 owns_dir: bool,
30}
31
32impl SpillManager {
33 pub fn new(spill_dir: impl Into<PathBuf>) -> std::io::Result<Self> {
43 let spill_dir = spill_dir.into();
44 std::fs::create_dir_all(&spill_dir)?;
45
46 Ok(Self {
47 spill_dir,
48 next_file_id: AtomicU64::new(0),
49 active_files: Mutex::new(Vec::new()),
50 total_spilled_bytes: AtomicU64::new(0),
51 owns_dir: false,
52 })
53 }
54
55 #[must_use]
63 pub fn with_owned_dir(mut self) -> Self {
64 self.owns_dir = true;
65 self
66 }
67
68 pub fn with_temp_dir() -> std::io::Result<Self> {
74 let temp_dir = std::env::temp_dir().join("grafeo_spill");
75 Self::new(temp_dir)
76 }
77
78 #[must_use]
80 pub fn spill_dir(&self) -> &Path {
81 &self.spill_dir
82 }
83
84 pub fn create_file(&self, prefix: &str) -> std::io::Result<SpillFile> {
92 let file_id = self.next_file_id.fetch_add(1, Ordering::Relaxed);
93 let file_name = format!("{prefix}_{file_id}.spill");
94 let file_path = self.spill_dir.join(file_name);
95
96 self.active_files.lock().push(file_path.clone());
98
99 SpillFile::new(file_path)
100 }
101
102 pub fn register_spilled_bytes(&self, bytes: u64) {
106 self.total_spilled_bytes.fetch_add(bytes, Ordering::Relaxed);
107 }
108
109 pub fn unregister_spilled_bytes(&self, bytes: u64) {
113 self.total_spilled_bytes.fetch_sub(bytes, Ordering::Relaxed);
114 }
115
116 pub fn unregister_file(&self, path: &Path) {
118 let mut files = self.active_files.lock();
119 files.retain(|p| p != path);
120 }
121
122 #[must_use]
124 pub fn spilled_bytes(&self) -> u64 {
125 self.total_spilled_bytes.load(Ordering::Relaxed)
126 }
127
128 #[must_use]
130 pub fn active_file_count(&self) -> usize {
131 self.active_files.lock().len()
132 }
133
134 pub fn cleanup(&self) -> std::io::Result<()> {
142 let files = std::mem::take(&mut *self.active_files.lock());
143 let mut last_error = None;
144
145 for path in files {
146 if let Err(e) = std::fs::remove_file(&path) {
147 if e.kind() != std::io::ErrorKind::NotFound {
149 last_error = Some(e);
150 }
151 }
152 }
153
154 self.total_spilled_bytes.store(0, Ordering::Relaxed);
155
156 match last_error {
157 Some(e) => Err(e),
158 None => Ok(()),
159 }
160 }
161}
162
163impl Drop for SpillManager {
164 fn drop(&mut self) {
165 let _ = self.cleanup();
167 if self.owns_dir {
168 let _ = std::fs::remove_dir(&self.spill_dir);
172 }
173 }
174}
175
176impl std::fmt::Debug for SpillManager {
177 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
178 f.debug_struct("SpillManager")
179 .field("spill_dir", &self.spill_dir)
180 .field("active_files", &self.active_file_count())
181 .field("spilled_bytes", &self.spilled_bytes())
182 .finish()
183 }
184}
185
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Returns a fresh scratch directory together with a manager rooted in it.
    ///
    /// The `TempDir` must be kept alive for as long as the manager is used.
    fn scratch_manager() -> (TempDir, SpillManager) {
        let scratch = TempDir::new().unwrap();
        let manager = SpillManager::new(scratch.path()).unwrap();
        (scratch, manager)
    }

    #[test]
    fn test_manager_creation() {
        let (scratch, manager) = scratch_manager();

        assert_eq!(manager.spilled_bytes(), 0);
        assert_eq!(manager.active_file_count(), 0);
        assert_eq!(manager.spill_dir(), scratch.path());
    }

    #[test]
    fn test_create_spill_file() {
        let (_scratch, manager) = scratch_manager();

        let first_sort = manager.create_file("sort").unwrap();
        let second_sort = manager.create_file("sort").unwrap();
        let agg = manager.create_file("agg").unwrap();

        assert_eq!(manager.active_file_count(), 3);

        // Same prefix must still yield distinct paths.
        assert_ne!(first_sort.path(), second_sort.path());

        // The id counter is shared across prefixes.
        let expectations = [
            (&first_sort, "sort_0"),
            (&second_sort, "sort_1"),
            (&agg, "agg_2"),
        ];
        for (file, fragment) in expectations {
            assert!(file.path().to_str().unwrap().contains(fragment));
        }
    }

    #[test]
    fn test_cleanup() {
        let (_scratch, manager) = scratch_manager();

        let _first = manager.create_file("test").unwrap();
        let _second = manager.create_file("test").unwrap();
        assert_eq!(manager.active_file_count(), 2);

        manager.cleanup().unwrap();
        assert_eq!(manager.active_file_count(), 0);
    }

    #[test]
    fn test_spilled_bytes_tracking() {
        let (_scratch, manager) = scratch_manager();

        for amount in [1000, 500] {
            manager.register_spilled_bytes(amount);
        }
        assert_eq!(manager.spilled_bytes(), 1500);

        manager.unregister_spilled_bytes(300);
        assert_eq!(manager.spilled_bytes(), 1200);
    }

    #[test]
    fn test_cleanup_on_drop() {
        let scratch = TempDir::new().unwrap();
        let root = scratch.path().to_path_buf();

        // The manager goes out of scope at the end of this block.
        let spilled_path = {
            let manager = SpillManager::new(&root).unwrap();
            let file = manager.create_file("test").unwrap();
            file.path().to_path_buf()
        };

        assert!(!spilled_path.exists());
    }

    #[test]
    fn unowned_dir_survives_drop() {
        let scratch = TempDir::new().unwrap();
        let dir_path = scratch.path().join("shared_spill");

        drop(SpillManager::new(&dir_path).unwrap());

        assert!(
            dir_path.exists(),
            "default SpillManager must not remove its directory on drop"
        );
    }

    #[test]
    fn owned_dir_removed_after_files_cleaned_on_drop() {
        let scratch = TempDir::new().unwrap();
        let query_dir = scratch.path().join("query_42");

        {
            let manager = SpillManager::new(&query_dir).unwrap().with_owned_dir();
            let _file = manager.create_file("sort").unwrap();
            assert!(query_dir.exists());
        }

        assert!(
            !query_dir.exists(),
            "with_owned_dir manager must remove its empty directory on drop"
        );
    }

    #[test]
    fn owned_dir_preserved_when_unexpected_contents_remain() {
        let scratch = TempDir::new().unwrap();
        let query_dir = scratch.path().join("query_with_extra");
        let stray_file = query_dir.join("not_tracked.dat");

        {
            let manager = SpillManager::new(&query_dir).unwrap().with_owned_dir();
            let _file = manager.create_file("sort").unwrap();
            // Written behind the manager's back, so it is never tracked.
            std::fs::write(&stray_file, b"keep me").unwrap();
        }

        assert!(
            query_dir.exists(),
            "directory with untracked content must not be removed"
        );
        assert!(
            stray_file.exists(),
            "untracked file must not be touched by SpillManager Drop"
        );
    }
}