graphos_core/execution/spill/
manager.rs1use super::file::SpillFile;
4use parking_lot::Mutex;
5use std::path::{Path, PathBuf};
6use std::sync::atomic::{AtomicU64, Ordering};
7
8pub struct SpillManager {
15 spill_dir: PathBuf,
17 next_file_id: AtomicU64,
19 active_files: Mutex<Vec<PathBuf>>,
21 total_spilled_bytes: AtomicU64,
23}
24
25impl SpillManager {
26 pub fn new(spill_dir: impl Into<PathBuf>) -> std::io::Result<Self> {
34 let spill_dir = spill_dir.into();
35 std::fs::create_dir_all(&spill_dir)?;
36
37 Ok(Self {
38 spill_dir,
39 next_file_id: AtomicU64::new(0),
40 active_files: Mutex::new(Vec::new()),
41 total_spilled_bytes: AtomicU64::new(0),
42 })
43 }
44
45 pub fn with_temp_dir() -> std::io::Result<Self> {
51 let temp_dir = std::env::temp_dir().join("graphos_spill");
52 Self::new(temp_dir)
53 }
54
55 #[must_use]
57 pub fn spill_dir(&self) -> &Path {
58 &self.spill_dir
59 }
60
61 pub fn create_file(&self, prefix: &str) -> std::io::Result<SpillFile> {
69 let file_id = self.next_file_id.fetch_add(1, Ordering::Relaxed);
70 let file_name = format!("{prefix}_{file_id}.spill");
71 let file_path = self.spill_dir.join(file_name);
72
73 self.active_files.lock().push(file_path.clone());
75
76 SpillFile::new(file_path)
77 }
78
79 pub fn register_spilled_bytes(&self, bytes: u64) {
83 self.total_spilled_bytes.fetch_add(bytes, Ordering::Relaxed);
84 }
85
86 pub fn unregister_spilled_bytes(&self, bytes: u64) {
90 self.total_spilled_bytes.fetch_sub(bytes, Ordering::Relaxed);
91 }
92
93 pub fn unregister_file(&self, path: &Path) {
95 let mut files = self.active_files.lock();
96 files.retain(|p| p != path);
97 }
98
99 #[must_use]
101 pub fn spilled_bytes(&self) -> u64 {
102 self.total_spilled_bytes.load(Ordering::Relaxed)
103 }
104
105 #[must_use]
107 pub fn active_file_count(&self) -> usize {
108 self.active_files.lock().len()
109 }
110
111 pub fn cleanup(&self) -> std::io::Result<()> {
119 let files = std::mem::take(&mut *self.active_files.lock());
120 let mut last_error = None;
121
122 for path in files {
123 if let Err(e) = std::fs::remove_file(&path) {
124 if e.kind() != std::io::ErrorKind::NotFound {
126 last_error = Some(e);
127 }
128 }
129 }
130
131 self.total_spilled_bytes.store(0, Ordering::Relaxed);
132
133 match last_error {
134 Some(e) => Err(e),
135 None => Ok(()),
136 }
137 }
138}
139
140impl Drop for SpillManager {
141 fn drop(&mut self) {
142 let _ = self.cleanup();
144 }
145}
146
147impl std::fmt::Debug for SpillManager {
148 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
149 f.debug_struct("SpillManager")
150 .field("spill_dir", &self.spill_dir)
151 .field("active_files", &self.active_file_count())
152 .field("spilled_bytes", &self.spilled_bytes())
153 .finish()
154 }
155}
156
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds a manager rooted in a fresh temporary directory.
    ///
    /// The `TempDir` guard is returned as well so the directory stays alive
    /// for as long as the test needs it.
    fn manager_in_temp_dir() -> (TempDir, SpillManager) {
        let dir = TempDir::new().unwrap();
        let manager = SpillManager::new(dir.path()).unwrap();
        (dir, manager)
    }

    /// A new manager tracks nothing and reports the directory it was given.
    #[test]
    fn test_manager_creation() {
        let (dir, manager) = manager_in_temp_dir();

        assert_eq!(manager.spilled_bytes(), 0);
        assert_eq!(manager.active_file_count(), 0);
        assert_eq!(manager.spill_dir(), dir.path());
    }

    /// Every created file is tracked and gets the next id, whatever its prefix.
    #[test]
    fn test_create_spill_file() {
        let (_dir, manager) = manager_in_temp_dir();

        let sort_a = manager.create_file("sort").unwrap();
        let sort_b = manager.create_file("sort").unwrap();
        let agg = manager.create_file("agg").unwrap();

        assert_eq!(manager.active_file_count(), 3);
        assert_ne!(sort_a.path(), sort_b.path());

        for (file, expected) in [(&sort_a, "sort_0"), (&sort_b, "sort_1"), (&agg, "agg_2")] {
            assert!(file.path().to_str().unwrap().contains(expected));
        }
    }

    /// `cleanup` empties the list of tracked files.
    #[test]
    fn test_cleanup() {
        let (_dir, manager) = manager_in_temp_dir();

        let _first = manager.create_file("test").unwrap();
        let _second = manager.create_file("test").unwrap();
        assert_eq!(manager.active_file_count(), 2);

        manager.cleanup().unwrap();
        assert_eq!(manager.active_file_count(), 0);
    }

    /// Registered bytes accumulate and unregistered bytes are subtracted.
    #[test]
    fn test_spilled_bytes_tracking() {
        let (_dir, manager) = manager_in_temp_dir();

        manager.register_spilled_bytes(1000);
        manager.register_spilled_bytes(500);
        assert_eq!(manager.spilled_bytes(), 1500);

        manager.unregister_spilled_bytes(300);
        assert_eq!(manager.spilled_bytes(), 1200);
    }

    /// Dropping the manager leaves no spill file behind.
    #[test]
    fn test_cleanup_on_drop() {
        let dir = TempDir::new().unwrap();
        let root = dir.path().to_path_buf();

        // The manager is dropped at the end of this scope.
        let spilled_path = {
            let manager = SpillManager::new(&root).unwrap();
            let file = manager.create_file("test").unwrap();
            file.path().to_path_buf()
        };

        assert!(!spilled_path.exists());
    }
}