use anyhow::{Context, Result};
use chrono::{DateTime, Duration, Utc};
use std::collections::HashMap;
use std::path::Path;
use tokio::fs;
use super::GlobalStorage;
/// Options controlling a single cleanup pass.
#[derive(Debug, Clone, Default)]
pub struct CleanupConfig {
    /// Only remove entries last modified longer ago than this duration;
    /// `None` means age is not checked and every entry is eligible.
    pub older_than: Option<Duration>,
    /// When true, count and report what would be removed without deleting anything.
    pub dry_run: bool,
    /// Force flag. Not consulted anywhere in this module — presumably used by
    /// callers for confirmation prompts; TODO confirm at call sites.
    pub force: bool,
}
/// Outcome counters for a cleanup pass over one directory (or merged passes).
#[derive(Debug, Clone, Default)]
pub struct CleanupStats {
    /// Number of top-level entries examined.
    pub items_scanned: usize,
    /// Number of entries removed (or, in dry-run mode, that would be removed).
    pub items_removed: usize,
    /// Total size of removed entries, in bytes.
    pub bytes_reclaimed: u64,
    /// Human-readable messages for entries that could not be stat'ed or removed.
    pub errors: Vec<String>,
}
impl CleanupStats {
pub fn new() -> Self {
Self::default()
}
pub fn merge(&mut self, other: &CleanupStats) {
self.items_scanned += other.items_scanned;
self.items_removed += other.items_removed;
self.bytes_reclaimed += other.bytes_reclaimed;
self.errors.extend(other.errors.clone());
}
}
/// Per-category disk usage, in bytes, for one repository's global storage.
#[derive(Debug, Clone, Default)]
pub struct StorageStats {
    pub worktrees_bytes: u64,
    pub sessions_bytes: u64,
    pub logs_bytes: u64,
    pub state_bytes: u64,
    pub events_bytes: u64,
    pub dlq_bytes: u64,
    /// Sum of the category fields above; only valid after `calculate_total`.
    pub total_bytes: u64,
}
impl StorageStats {
pub fn new() -> Self {
Self::default()
}
pub fn calculate_total(&mut self) {
self.total_bytes = self.worktrees_bytes
+ self.sessions_bytes
+ self.logs_bytes
+ self.state_bytes
+ self.events_bytes
+ self.dlq_bytes;
}
pub fn format_bytes(bytes: u64) -> String {
if bytes < 1024 {
format!("{} B", bytes)
} else if bytes < 1024 * 1024 {
format!("{:.2} KB", bytes as f64 / 1024.0)
} else if bytes < 1024 * 1024 * 1024 {
format!("{:.2} MB", bytes as f64 / (1024.0 * 1024.0))
} else {
format!("{:.2} GB", bytes as f64 / (1024.0 * 1024.0 * 1024.0))
}
}
}
/// Coordinates size reporting and age-based cleanup across the storage
/// categories (worktrees, sessions, logs, state, events, DLQ) of one repository.
pub struct StorageCleanupManager {
    // Handle to the global storage layout (project type; resolves per-category directories).
    storage: GlobalStorage,
    // Repository name used to scope every directory lookup.
    repo_name: String,
}
impl StorageCleanupManager {
pub fn new(storage: GlobalStorage, repo_name: String) -> Self {
Self { storage, repo_name }
}
pub async fn get_storage_stats(&self) -> Result<StorageStats> {
let mut stats = StorageStats::new();
let worktrees_dir = self.storage.get_worktrees_dir(&self.repo_name).await?;
stats.worktrees_bytes = calculate_dir_size(&worktrees_dir).await?;
if let Ok(state_base_dir) = self.storage.get_state_base_dir(&self.repo_name).await {
stats.sessions_bytes = calculate_dir_size(&state_base_dir).await?;
}
if let Ok(logs_dir) = self.storage.get_logs_dir(&self.repo_name).await {
stats.logs_bytes = calculate_dir_size(&logs_dir).await?;
}
if let Ok(state_base_dir) = self.storage.get_state_base_dir(&self.repo_name).await {
let mapreduce_dir = state_base_dir.join("mapreduce");
if mapreduce_dir.exists() {
stats.state_bytes = calculate_dir_size(&mapreduce_dir).await?;
}
}
if let Ok(events_base_dir) = self.storage.get_events_base_dir(&self.repo_name).await {
stats.events_bytes = calculate_dir_size(&events_base_dir).await?;
}
if let Ok(dlq_base_dir) = self.storage.get_dlq_base_dir(&self.repo_name).await {
stats.dlq_bytes = calculate_dir_size(&dlq_base_dir).await?;
}
stats.calculate_total();
Ok(stats)
}
pub async fn clean_worktrees(&self, config: &CleanupConfig) -> Result<CleanupStats> {
let worktrees_dir = self.storage.get_worktrees_dir(&self.repo_name).await?;
clean_directory_by_age(&worktrees_dir, config).await
}
pub async fn clean_sessions(&self, config: &CleanupConfig) -> Result<CleanupStats> {
let state_dir = self.storage.get_state_base_dir(&self.repo_name).await?;
clean_directory_by_age(&state_dir, config).await
}
pub async fn clean_logs(&self, config: &CleanupConfig) -> Result<CleanupStats> {
let logs_dir = self.storage.get_logs_dir(&self.repo_name).await?;
clean_directory_by_age(&logs_dir, config).await
}
pub async fn clean_state(&self, config: &CleanupConfig) -> Result<CleanupStats> {
let state_base_dir = self.storage.get_state_base_dir(&self.repo_name).await?;
let mapreduce_dir = state_base_dir.join("mapreduce");
if !mapreduce_dir.exists() {
return Ok(CleanupStats::new());
}
clean_directory_by_age(&mapreduce_dir, config).await
}
pub async fn clean_events(&self, config: &CleanupConfig) -> Result<CleanupStats> {
let events_dir = self.storage.get_events_base_dir(&self.repo_name).await?;
clean_directory_by_age(&events_dir, config).await
}
pub async fn clean_dlq(&self, config: &CleanupConfig) -> Result<CleanupStats> {
let dlq_dir = self.storage.get_dlq_base_dir(&self.repo_name).await?;
clean_directory_by_age(&dlq_dir, config).await
}
pub async fn clean_all(&self, config: &CleanupConfig) -> Result<HashMap<String, CleanupStats>> {
let mut results = HashMap::new();
results.insert("worktrees".to_string(), self.clean_worktrees(config).await?);
results.insert("sessions".to_string(), self.clean_sessions(config).await?);
results.insert("logs".to_string(), self.clean_logs(config).await?);
results.insert("state".to_string(), self.clean_state(config).await?);
results.insert("events".to_string(), self.clean_events(config).await?);
results.insert("dlq".to_string(), self.clean_dlq(config).await?);
Ok(results)
}
}
/// Recursively sums the sizes of all files under `dir`, in bytes.
///
/// A missing directory contributes 0 rather than erroring. Subdirectories
/// that cannot be read and entries that cannot be stat'ed (permissions,
/// concurrent deletion) are silently skipped.
///
/// # Errors
/// Propagates errors from iterating a directory stream that was successfully
/// opened (`next_entry`).
async fn calculate_dir_size(dir: &Path) -> Result<u64> {
    if !dir.exists() {
        return Ok(0);
    }
    let mut total_size = 0u64;
    // Iterative DFS with an explicit stack: async fns cannot recurse without
    // boxing, and this also bounds stack depth on deep trees.
    let mut stack = vec![dir.to_path_buf()];
    while let Some(current) = stack.pop() {
        // BUG FIX: this argument was the mojibake `¤t` (an HTML-entity-mangled
        // `&current`), which did not compile.
        let mut entries = match fs::read_dir(&current).await {
            Ok(entries) => entries,
            Err(_) => continue, // unreadable directory: skip, best-effort total
        };
        while let Some(entry) = entries.next_entry().await? {
            let path = entry.path();
            let metadata = match fs::metadata(&path).await {
                Ok(m) => m,
                Err(_) => continue, // entry vanished or is unreadable: skip
            };
            if metadata.is_dir() {
                stack.push(path);
            } else {
                // Saturate instead of overflowing on pathological totals.
                total_size = total_size.saturating_add(metadata.len());
            }
        }
    }
    Ok(total_size)
}
/// Removes top-level entries of `dir` that are older than the configured
/// cutoff, returning counters describing what happened.
///
/// Only the immediate children of `dir` are age-checked; a matching
/// subdirectory is removed as a single item (its whole subtree counts toward
/// `bytes_reclaimed`). In dry-run mode nothing is deleted, but
/// `items_removed`/`bytes_reclaimed` still report what *would* be removed.
///
/// # Errors
/// Fails if `dir` cannot be opened or its entry stream errors; per-entry
/// stat/remove failures are recorded in `stats.errors` instead.
async fn clean_directory_by_age(dir: &Path, config: &CleanupConfig) -> Result<CleanupStats> {
    let mut stats = CleanupStats::new();
    if !dir.exists() {
        return Ok(stats);
    }
    // `None` cutoff means "no age filter": every entry is eligible.
    let cutoff_time = config.older_than.map(|duration| Utc::now() - duration);
    let mut entries = fs::read_dir(dir)
        .await
        .context("Failed to read directory")?;
    while let Some(entry) = entries.next_entry().await? {
        let path = entry.path();
        stats.items_scanned += 1;
        let metadata = match fs::metadata(&path).await {
            Ok(m) => m,
            Err(e) => {
                // Record and move on; one bad entry must not abort the pass.
                stats
                    .errors
                    .push(format!("Failed to stat {}: {}", path.display(), e));
                continue;
            }
        };
        if let Some(cutoff) = cutoff_time {
            if let Ok(modified) = metadata.modified() {
                let modified_time: DateTime<Utc> = modified.into();
                // Too recent: keep. Note that if mtime is unreadable the
                // entry falls through and is treated as eligible for removal.
                if modified_time >= cutoff {
                    continue;
                }
            }
        }
        // Size is computed before removal; directory sizes are best-effort (0 on error).
        let size = if metadata.is_dir() {
            calculate_dir_size(&path).await.unwrap_or(0)
        } else {
            metadata.len()
        };
        if !config.dry_run {
            if let Err(e) = remove_path(&path).await {
                stats
                    .errors
                    .push(format!("Failed to remove {}: {}", path.display(), e));
                continue;
            }
        }
        // Counted in dry-run mode too: these are "would be removed" figures.
        stats.items_removed += 1;
        stats.bytes_reclaimed += size;
    }
    Ok(stats)
}
/// Deletes `path` from disk — recursively when it is a directory,
/// as a single file otherwise.
async fn remove_path(path: &Path) -> Result<()> {
    let is_dir = fs::metadata(path).await?.is_dir();
    if is_dir {
        fs::remove_dir_all(path).await?;
        return Ok(());
    }
    fs::remove_file(path).await?;
    Ok(())
}
/// Parses a duration of the form `<integer><unit>`, e.g. `"30s"`, `"7d"`.
/// Recognized units: `s` (seconds), `m` (minutes), `h` (hours), `d` (days).
/// Surrounding whitespace is ignored.
///
/// # Errors
/// Returns an error when the numeric part is not a valid `i64` or the unit
/// suffix is not one of the four recognized letters.
pub fn parse_duration(s: &str) -> Result<Duration> {
    let s = s.trim();
    // Split off the final character as the unit. Using the last char's byte
    // index (instead of `s.len() - 1`) keeps this panic-free when the input
    // ends in a multi-byte UTF-8 character (e.g. "7µ" now errors cleanly
    // instead of panicking on a non-char-boundary split).
    let unit_start = s.char_indices().last().map_or(0, |(i, _)| i);
    let (num_str, unit) = s.split_at(unit_start);
    let num: i64 = num_str
        .parse()
        .with_context(|| format!("Invalid duration number: {}", num_str))?;
    match unit {
        "s" => Ok(Duration::seconds(num)),
        "m" => Ok(Duration::minutes(num)),
        "h" => Ok(Duration::hours(num)),
        "d" => Ok(Duration::days(num)),
        _ => Err(anyhow::anyhow!(
            "Invalid duration unit: {}. Use s, m, h, or d",
            unit
        )),
    }
}
// Unit tests. `tempfile` is presumably a dev-dependency; the async tests
// perform real filesystem I/O inside throwaway temp directories.
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    #[test]
    fn test_parse_duration() {
        // Each supported unit suffix maps to the matching chrono constructor.
        assert_eq!(parse_duration("30s").unwrap(), Duration::seconds(30));
        assert_eq!(parse_duration("15m").unwrap(), Duration::minutes(15));
        assert_eq!(parse_duration("24h").unwrap(), Duration::hours(24));
        assert_eq!(parse_duration("7d").unwrap(), Duration::days(7));
        // Non-numeric prefix and unknown unit suffix are both rejected.
        assert!(parse_duration("invalid").is_err());
        assert!(parse_duration("7x").is_err());
    }

    #[tokio::test]
    async fn test_calculate_dir_size() {
        let temp_dir = TempDir::new().unwrap();
        let dir = temp_dir.path();
        fs::write(dir.join("file1.txt"), "hello").await.unwrap();
        fs::write(dir.join("file2.txt"), "world").await.unwrap();
        let size = calculate_dir_size(dir).await.unwrap();
        // Two 5-byte files.
        assert_eq!(size, 10);
    }

    #[tokio::test]
    async fn test_clean_directory_by_age() {
        let temp_dir = TempDir::new().unwrap();
        let dir = temp_dir.path();
        fs::write(dir.join("old1.txt"), "old content")
            .await
            .unwrap();
        fs::write(dir.join("old2.txt"), "more old").await.unwrap();
        // A zero-length cutoff makes every file written before the call "old enough".
        let config = CleanupConfig {
            older_than: Some(Duration::seconds(0)),
            dry_run: false,
            force: true,
        };
        let stats = clean_directory_by_age(dir, &config).await.unwrap();
        assert_eq!(stats.items_scanned, 2);
        assert_eq!(stats.items_removed, 2);
    }

    #[tokio::test]
    async fn test_dry_run_doesnt_delete() {
        let temp_dir = TempDir::new().unwrap();
        let dir = temp_dir.path();
        let test_file = dir.join("test.txt");
        fs::write(&test_file, "test content").await.unwrap();
        let config = CleanupConfig {
            older_than: Some(Duration::seconds(0)),
            dry_run: true,
            force: true,
        };
        let stats = clean_directory_by_age(dir, &config).await.unwrap();
        assert_eq!(stats.items_scanned, 1);
        // Dry run still *counts* the item as removable ("would be removed")...
        assert_eq!(stats.items_removed, 1);
        // ...but the file itself must survive on disk.
        assert!(test_file.exists());
    }
}