use super::error::StorageResult;
use super::types::HealthStatus;
use anyhow::{Context, Result};
use std::path::{Path, PathBuf};
use tokio::fs;
/// Handle to file storage rooted at a single base directory.
///
/// Every path produced by this type is built by joining components onto
/// `base_dir`. Cloning only copies that path.
#[derive(Debug, Clone)]
pub struct GlobalStorage {
    /// Root directory under which all storage subdirectories are created.
    base_dir: PathBuf,
}
impl GlobalStorage {
/// Creates storage rooted at the directory returned by
/// `super::get_default_storage_dir`.
///
/// # Errors
///
/// Propagates any error reported by `super::get_default_storage_dir`.
pub fn new() -> StorageResult<Self> {
    Ok(Self {
        base_dir: super::get_default_storage_dir()?,
    })
}
/// Creates storage rooted at an explicit `base_dir` (test builds only).
///
/// The directory is stored as given; nothing is created or checked here.
#[cfg(test)]
pub fn new_with_root(base_dir: PathBuf) -> StorageResult<Self> {
    let storage = Self { base_dir };
    Ok(storage)
}
/// Returns the root directory this storage was constructed with.
pub fn base_dir(&self) -> &Path {
    self.base_dir.as_path()
}
/// Returns `<base_dir>/events/<repo_name>/<job_id>`, creating the directory
/// and any missing parents first.
///
/// # Errors
///
/// Fails if the directory cannot be created.
pub async fn get_events_dir(&self, repo_name: &str, job_id: &str) -> Result<PathBuf> {
    let mut events_dir = self.base_dir.join("events");
    events_dir.push(repo_name);
    events_dir.push(job_id);

    let created = fs::create_dir_all(&events_dir).await;
    created.context("Failed to create global events directory")?;

    Ok(events_dir)
}
/// Returns `<base_dir>/dlq/<repo_name>/<job_id>`, creating the directory
/// and any missing parents first.
///
/// # Errors
///
/// Fails if the directory cannot be created.
pub async fn get_dlq_dir(&self, repo_name: &str, job_id: &str) -> Result<PathBuf> {
    let mut dlq_dir = self.base_dir.join("dlq");
    dlq_dir.push(repo_name);
    dlq_dir.push(job_id);

    let created = fs::create_dir_all(&dlq_dir).await;
    created.context("Failed to create global DLQ directory")?;

    Ok(dlq_dir)
}
/// Returns `<base_dir>/state/<repo_name>/<job_id>`, creating the directory
/// and any missing parents first.
///
/// # Errors
///
/// Fails if the directory cannot be created.
pub async fn get_state_dir(&self, repo_name: &str, job_id: &str) -> Result<PathBuf> {
    let mut state_dir = self.base_dir.join("state");
    state_dir.push(repo_name);
    state_dir.push(job_id);

    let created = fs::create_dir_all(&state_dir).await;
    created.context("Failed to create global state directory")?;

    Ok(state_dir)
}
/// Returns `<base_dir>/state/<repo_name>/checkpoints`, creating the
/// directory and any missing parents first.
///
/// # Errors
///
/// Fails if the directory cannot be created.
pub async fn get_checkpoints_dir(&self, repo_name: &str) -> Result<PathBuf> {
    let mut checkpoints_dir = self.base_dir.join("state");
    checkpoints_dir.push(repo_name);
    checkpoints_dir.push("checkpoints");

    let created = fs::create_dir_all(&checkpoints_dir).await;
    created.context("Failed to create global checkpoints directory")?;

    Ok(checkpoints_dir)
}
pub async fn list_dlq_job_ids(&self, repo_name: &str) -> Result<Vec<String>> {
let dlq_repo_dir = self.base_dir.join("dlq").join(repo_name);
if !dlq_repo_dir.exists() {
return Ok(vec![]);
}
let mut job_ids = Vec::new();
let mut entries = fs::read_dir(dlq_repo_dir).await?;
while let Some(entry) = entries.next_entry().await? {
if entry.file_type().await?.is_dir() {
if let Some(job_id) = entry.file_name().to_str() {
let items_dir = entry.path().join("items");
if items_dir.exists() {
let mut items_entries = fs::read_dir(&items_dir).await?;
if items_entries.next_entry().await?.is_some() {
job_ids.push(job_id.to_string());
}
}
}
}
}
job_ids.sort();
job_ids.reverse(); Ok(job_ids)
}
/// Reports the status of this storage backend.
///
/// The returned status is constant: it is always marked healthy with
/// backend type `"global"`, and no filesystem access is performed.
pub async fn health_check(&self) -> StorageResult<HealthStatus> {
    let backend_type = String::from("global");
    let message = Some(String::from("Global file storage is operational"));

    Ok(HealthStatus {
        healthy: true,
        backend_type,
        message,
        details: None,
    })
}
/// Returns `<base_dir>/worktrees/<repo_name>`, creating the directory and
/// any missing parents first.
///
/// # Errors
///
/// Fails if the directory cannot be created.
pub async fn get_worktrees_dir(&self, repo_name: &str) -> Result<PathBuf> {
    let mut worktrees_dir = self.base_dir.join("worktrees");
    worktrees_dir.push(repo_name);

    let created = fs::create_dir_all(&worktrees_dir).await;
    created.context("Failed to create worktrees directory")?;

    Ok(worktrees_dir)
}
/// Returns `<base_dir>/state/<repo_name>`, creating the directory and any
/// missing parents first.
///
/// # Errors
///
/// Fails if the directory cannot be created.
pub async fn get_state_base_dir(&self, repo_name: &str) -> Result<PathBuf> {
    let mut state_base = self.base_dir.join("state");
    state_base.push(repo_name);

    let created = fs::create_dir_all(&state_base).await;
    created.context("Failed to create state directory")?;

    Ok(state_base)
}
/// Returns `<base_dir>/logs/<repo_name>`, creating the directory and any
/// missing parents first.
///
/// # Errors
///
/// Fails if the directory cannot be created.
pub async fn get_logs_dir(&self, repo_name: &str) -> Result<PathBuf> {
    let mut logs_dir = self.base_dir.join("logs");
    logs_dir.push(repo_name);

    let created = fs::create_dir_all(&logs_dir).await;
    created.context("Failed to create logs directory")?;

    Ok(logs_dir)
}
/// Returns `<base_dir>/events/<repo_name>`, creating the directory and any
/// missing parents first.
///
/// # Errors
///
/// Fails if the directory cannot be created.
pub async fn get_events_base_dir(&self, repo_name: &str) -> Result<PathBuf> {
    let mut events_base = self.base_dir.join("events");
    events_base.push(repo_name);

    let created = fs::create_dir_all(&events_base).await;
    created.context("Failed to create events directory")?;

    Ok(events_base)
}
/// Returns `<base_dir>/dlq/<repo_name>`, creating the directory and any
/// missing parents first.
///
/// # Errors
///
/// Fails if the directory cannot be created.
pub async fn get_dlq_base_dir(&self, repo_name: &str) -> Result<PathBuf> {
    let mut dlq_base = self.base_dir.join("dlq");
    dlq_base.push(repo_name);

    let created = fs::create_dir_all(&dlq_base).await;
    created.context("Failed to create DLQ directory")?;

    Ok(dlq_base)
}
/// Lists every repository name that has a directory under any of the
/// `worktrees`, `events`, `dlq`, `state` or `logs` roots in `base_dir`.
///
/// Names are de-duplicated across the roots and returned in ascending
/// order. Entries that are not directories, or whose names are not valid
/// UTF-8, are ignored. A root that does not exist contributes nothing.
///
/// # Errors
///
/// Fails on any I/O error other than "not found" while reading a root or
/// inspecting its entries.
pub async fn list_all_repositories(&self) -> Result<Vec<String>> {
    // An ordered set yields unique names already sorted, so no separate
    // collect-and-sort step is needed.
    let mut repos = std::collections::BTreeSet::new();

    for storage_type in &["worktrees", "events", "dlq", "state", "logs"] {
        let type_dir = self.base_dir.join(storage_type);

        // Let `read_dir` report a missing root instead of probing with the
        // blocking `Path::exists`, which could also race with a concurrent
        // removal of the directory.
        let mut entries = match fs::read_dir(&type_dir).await {
            Ok(entries) => entries,
            Err(err) if err.kind() == std::io::ErrorKind::NotFound => continue,
            Err(err) => {
                return Err(err).with_context(|| {
                    format!("Failed to read storage directory {}", type_dir.display())
                });
            }
        };

        while let Some(entry) = entries.next_entry().await? {
            if !entry.file_type().await?.is_dir() {
                continue;
            }
            if let Some(repo_name) = entry.file_name().to_str() {
                repos.insert(repo_name.to_string());
            }
        }
    }

    Ok(repos.into_iter().collect())
}
}