use super::file::SpillFile;
use parking_lot::Mutex;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
pub struct SpillManager {
spill_dir: PathBuf,
next_file_id: AtomicU64,
active_files: Mutex<Vec<PathBuf>>,
total_spilled_bytes: AtomicU64,
}
impl SpillManager {
pub fn new(spill_dir: impl Into<PathBuf>) -> std::io::Result<Self> {
let spill_dir = spill_dir.into();
std::fs::create_dir_all(&spill_dir)?;
Ok(Self {
spill_dir,
next_file_id: AtomicU64::new(0),
active_files: Mutex::new(Vec::new()),
total_spilled_bytes: AtomicU64::new(0),
})
}
pub fn with_temp_dir() -> std::io::Result<Self> {
let temp_dir = std::env::temp_dir().join("grafeo_spill");
Self::new(temp_dir)
}
#[must_use]
pub fn spill_dir(&self) -> &Path {
&self.spill_dir
}
pub fn create_file(&self, prefix: &str) -> std::io::Result<SpillFile> {
let file_id = self.next_file_id.fetch_add(1, Ordering::Relaxed);
let file_name = format!("{prefix}_{file_id}.spill");
let file_path = self.spill_dir.join(file_name);
self.active_files.lock().push(file_path.clone());
SpillFile::new(file_path)
}
pub fn register_spilled_bytes(&self, bytes: u64) {
self.total_spilled_bytes.fetch_add(bytes, Ordering::Relaxed);
}
pub fn unregister_spilled_bytes(&self, bytes: u64) {
self.total_spilled_bytes.fetch_sub(bytes, Ordering::Relaxed);
}
pub fn unregister_file(&self, path: &Path) {
let mut files = self.active_files.lock();
files.retain(|p| p != path);
}
#[must_use]
pub fn spilled_bytes(&self) -> u64 {
self.total_spilled_bytes.load(Ordering::Relaxed)
}
#[must_use]
pub fn active_file_count(&self) -> usize {
self.active_files.lock().len()
}
pub fn cleanup(&self) -> std::io::Result<()> {
let files = std::mem::take(&mut *self.active_files.lock());
let mut last_error = None;
for path in files {
if let Err(e) = std::fs::remove_file(&path) {
if e.kind() != std::io::ErrorKind::NotFound {
last_error = Some(e);
}
}
}
self.total_spilled_bytes.store(0, Ordering::Relaxed);
match last_error {
Some(e) => Err(e),
None => Ok(()),
}
}
}
impl Drop for SpillManager {
fn drop(&mut self) {
let _ = self.cleanup();
}
}
impl std::fmt::Debug for SpillManager {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SpillManager")
.field("spill_dir", &self.spill_dir)
.field("active_files", &self.active_file_count())
.field("spilled_bytes", &self.spilled_bytes())
.finish()
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
#[test]
fn test_manager_creation() {
let temp_dir = TempDir::new().unwrap();
let manager = SpillManager::new(temp_dir.path()).unwrap();
assert_eq!(manager.spilled_bytes(), 0);
assert_eq!(manager.active_file_count(), 0);
assert_eq!(manager.spill_dir(), temp_dir.path());
}
#[test]
fn test_create_spill_file() {
let temp_dir = TempDir::new().unwrap();
let manager = SpillManager::new(temp_dir.path()).unwrap();
let file1 = manager.create_file("sort").unwrap();
let file2 = manager.create_file("sort").unwrap();
let file3 = manager.create_file("agg").unwrap();
assert_eq!(manager.active_file_count(), 3);
assert_ne!(file1.path(), file2.path());
assert!(file1.path().to_str().unwrap().contains("sort_0"));
assert!(file2.path().to_str().unwrap().contains("sort_1"));
assert!(file3.path().to_str().unwrap().contains("agg_2"));
}
#[test]
fn test_cleanup() {
let temp_dir = TempDir::new().unwrap();
let manager = SpillManager::new(temp_dir.path()).unwrap();
let _file1 = manager.create_file("test").unwrap();
let _file2 = manager.create_file("test").unwrap();
assert_eq!(manager.active_file_count(), 2);
manager.cleanup().unwrap();
assert_eq!(manager.active_file_count(), 0);
}
#[test]
fn test_spilled_bytes_tracking() {
let temp_dir = TempDir::new().unwrap();
let manager = SpillManager::new(temp_dir.path()).unwrap();
manager.register_spilled_bytes(1000);
manager.register_spilled_bytes(500);
assert_eq!(manager.spilled_bytes(), 1500);
manager.unregister_spilled_bytes(300);
assert_eq!(manager.spilled_bytes(), 1200);
}
#[test]
fn test_cleanup_on_drop() {
let temp_dir = TempDir::new().unwrap();
let temp_path = temp_dir.path().to_path_buf();
let file_path = {
let manager = SpillManager::new(&temp_path).unwrap();
let file = manager.create_file("test").unwrap();
file.path().to_path_buf()
};
assert!(!file_path.exists());
}
}