pub mod cleanup;
pub mod config;
pub mod error;
pub mod factory;
pub mod global;
pub mod lock;
pub mod session_job_mapping;
pub mod types;
#[cfg(test)]
mod tests;
pub use cleanup::{
parse_duration, CleanupConfig, CleanupStats, StorageCleanupManager, StorageStats,
};
pub use config::{BackendConfig, BackendType, StorageConfig};
pub use error::{StorageError, StorageResult};
pub use factory::StorageFactory;
pub use global::GlobalStorage;
pub use lock::{StorageLock, StorageLockGuard};
pub use session_job_mapping::SessionJobMapping;
pub use types::{
CheckpointFilter, DLQFilter, EventFilter, EventStats, EventStream, EventSubscription,
HealthStatus, SessionFilter, SessionId, SessionState, WorkflowFilter,
};
use anyhow::{anyhow, Context, Result};
use std::path::{Path, PathBuf};
/// Initialize global storage using environment-based configuration.
///
/// Thin wrapper around [`StorageFactory::from_env`]; see that function for
/// which environment variables are consulted.
///
/// # Errors
/// Propagates any [`StorageError`] produced by the factory.
pub async fn init_from_env() -> StorageResult<GlobalStorage> {
StorageFactory::from_env().await
}
/// Derive a repository name from a filesystem path.
///
/// The path is canonicalized when possible (falling back to the path as
/// given, e.g. when it does not exist yet), and the final path component is
/// returned as the repository name.
///
/// # Errors
/// Returns an error when the path has no usable final component (for
/// example, a bare root path) or the component is not valid UTF-8.
pub fn extract_repo_name(repo_path: &Path) -> Result<String> {
    // Resolve symlinks/relative segments when we can; otherwise use the raw path.
    let resolved = match repo_path.canonicalize() {
        Ok(canonical) => canonical,
        Err(_) => repo_path.to_path_buf(),
    };
    match resolved.file_name().and_then(|name| name.to_str()) {
        Some(name) => Ok(name.to_string()),
        None => Err(anyhow!(
            "Could not determine repository name from path: {}",
            repo_path.display()
        )),
    }
}
pub fn get_default_storage_dir() -> Result<PathBuf> {
if let Ok(prodigy_home) = std::env::var("PRODIGY_HOME") {
let path = PathBuf::from(prodigy_home);
if !path.exists() {
std::fs::create_dir_all(&path).with_context(|| {
format!(
"Failed to create PRODIGY_HOME directory: {}",
path.display()
)
})?;
}
return Ok(path);
}
#[cfg(test)]
{
use std::sync::OnceLock;
static TEST_DIR: OnceLock<PathBuf> = OnceLock::new();
let test_dir = TEST_DIR.get_or_init(|| {
let temp_dir =
std::env::temp_dir().join(format!("prodigy-test-{}", std::process::id()));
std::fs::create_dir_all(&temp_dir).unwrap();
temp_dir
});
Ok(test_dir.clone())
}
#[cfg(not(test))]
{
directories::BaseDirs::new()
.ok_or_else(|| anyhow!("Could not determine base directories"))
.map(|dirs| dirs.home_dir().join(".prodigy"))
}
}
/// List all dead-letter-queue job ids recorded for the repository rooted at
/// `project_root`.
///
/// The repository name is derived from the path via [`extract_repo_name`]
/// and looked up in global storage.
///
/// # Errors
/// Fails when the repository name cannot be derived or global storage
/// cannot be accessed.
pub async fn discover_dlq_job_ids(project_root: &Path) -> Result<Vec<String>> {
    let repo_name = extract_repo_name(project_root)?;
    let storage = GlobalStorage::new()?;
    storage.list_dlq_job_ids(&repo_name).await
}
/// Build an `EventLogger` that writes JSONL events into the global storage
/// area for the given repository and job.
///
/// Each call targets a new `events-<UTC timestamp>.jsonl` file inside the
/// job's events directory.
///
/// # Errors
/// Fails when the repository name cannot be derived, global storage cannot
/// be initialized, or the JSONL writer cannot be created.
pub async fn create_global_event_logger(
    repo_path: &Path,
    job_id: &str,
) -> Result<crate::cook::execution::events::EventLogger> {
    use crate::cook::execution::events::{EventLogger, EventWriter, JsonlEventWriter};

    let repo_name = extract_repo_name(repo_path)?;
    let storage = GlobalStorage::new()?;
    let events_dir = storage.get_events_dir(&repo_name, job_id).await?;

    // Timestamped filename gives each invocation its own event file
    // (second-level granularity).
    let file_name = format!(
        "events-{}.jsonl",
        chrono::Utc::now().format("%Y%m%d_%H%M%S")
    );
    let jsonl_writer = JsonlEventWriter::new(events_dir.join(file_name))
        .await
        .context("Failed to create global event writer")?;
    let writers: Vec<Box<dyn EventWriter>> = vec![Box::new(jsonl_writer)];
    Ok(EventLogger::new(writers))
}
/// Construct a `DeadLetterQueue` rooted in the global storage area for the
/// given repository and job.
///
/// An optional shared `EventLogger` may be attached so DLQ activity is
/// recorded alongside other job events.
///
/// # Errors
/// Fails when the repository name cannot be derived, global storage cannot
/// be initialized, or the queue itself fails to construct.
pub async fn create_global_dlq(
    repo_path: &Path,
    job_id: &str,
    event_logger: Option<std::sync::Arc<crate::cook::execution::events::EventLogger>>,
) -> Result<crate::cook::execution::dlq::DeadLetterQueue> {
    use crate::cook::execution::dlq::DeadLetterQueue;

    let repo_name = extract_repo_name(repo_path)?;
    let storage = GlobalStorage::new()?;
    let dlq_dir = storage.get_dlq_dir(&repo_name, job_id).await?;

    DeadLetterQueue::new(
        job_id.to_string(),
        dlq_dir,
        // NOTE(review): 1000 and 30 are presumably a max-item capacity and a
        // retention period — confirm against DeadLetterQueue::new's docs.
        1000,
        30,
        event_logger,
    )
    .await
}
#[cfg(test)]
mod mod_tests {
    use super::*;
    use serde_json::json;
    use tempfile::TempDir;
    use tokio::fs;

    /// `extract_repo_name` returns the final path component; a trailing
    /// slash makes no difference.
    #[test]
    fn test_extract_repo_name() {
        let path = Path::new("/home/user/projects/my-repo");
        assert_eq!(extract_repo_name(path).unwrap(), "my-repo");
        let path = Path::new("/home/user/projects/another-repo/");
        assert_eq!(extract_repo_name(path).unwrap(), "another-repo");
    }

    /// Each storage category (events/dlq/state) resolves to its own
    /// `<category>/<repo>/<job>` directory, created on first access.
    #[tokio::test]
    async fn test_global_storage_paths() {
        let temp_dir = TempDir::new().unwrap();
        let repo_path = temp_dir.path().join("test-repo");
        std::fs::create_dir(&repo_path).unwrap();
        let storage = GlobalStorage::new().unwrap();
        let repo_name = "test-repo";

        let events_dir = storage.get_events_dir(repo_name, "job-123").await.unwrap();
        assert!(events_dir.exists());
        assert!(events_dir.ends_with("events/test-repo/job-123"));

        let dlq_dir = storage.get_dlq_dir(repo_name, "job-123").await.unwrap();
        assert!(dlq_dir.exists());
        assert!(dlq_dir.ends_with("dlq/test-repo/job-123"));

        let state_dir = storage.get_state_dir(repo_name, "job-123").await.unwrap();
        assert!(state_dir.exists());
        assert!(state_dir.ends_with("state/test-repo/job-123"));
    }

    /// Independent `GlobalStorage` handles (as used by separate worktrees)
    /// must resolve the same repo/job to the same events directory, so event
    /// files from different worktrees aggregate side by side.
    #[tokio::test]
    async fn test_cross_worktree_event_aggregation() {
        // Fixed: dropped the duplicate inner `use serde_json::json;` that
        // shadowed the module-level import and left it flagged as unused.
        let temp_dir = TempDir::new().unwrap();
        let repo_path = temp_dir.path().join("test-repo");
        std::fs::create_dir(&repo_path).unwrap();
        let job_id = "shared-job-123";
        let repo_name = "test-repo";

        let storage1 = GlobalStorage::new().unwrap();
        let storage2 = GlobalStorage::new().unwrap();
        let events_dir1 = storage1.get_events_dir(repo_name, job_id).await.unwrap();
        let events_dir2 = storage2.get_events_dir(repo_name, job_id).await.unwrap();
        assert_eq!(events_dir1, events_dir2);
        assert!(events_dir1.ends_with("events/test-repo/shared-job-123"));

        let event_file1 = events_dir1.join("events-wt1.jsonl");
        let event_file2 = events_dir2.join("events-wt2.jsonl");
        fs::write(
            &event_file1,
            json!({"worktree": 1, "event": "test"}).to_string() + "\n",
        )
        .await
        .unwrap();
        fs::write(
            &event_file2,
            json!({"worktree": 2, "event": "test"}).to_string() + "\n",
        )
        .await
        .unwrap();
        assert!(event_file1.exists());
        assert!(event_file2.exists());

        // Both worktrees' files are discoverable in the shared directory.
        let mut entries = fs::read_dir(&events_dir1).await.unwrap();
        let mut event_files = Vec::new();
        while let Some(entry) = entries.next_entry().await.unwrap() {
            if entry.path().extension().and_then(|s| s.to_str()) == Some("jsonl") {
                event_files.push(entry.path());
            }
        }
        assert_eq!(event_files.len(), 2);
    }

    /// The same job id under different repositories must map to distinct
    /// storage directories.
    #[tokio::test]
    async fn test_global_storage_isolation_between_repos() {
        // Fixed: removed an unused `_temp_dir` fixture that played no part
        // in this test.
        let storage1 = GlobalStorage::new().unwrap();
        let storage2 = GlobalStorage::new().unwrap();
        let job_id = "same-job-id";
        let events_dir1 = storage1.get_events_dir("repo-one", job_id).await.unwrap();
        let events_dir2 = storage2.get_events_dir("repo-two", job_id).await.unwrap();
        assert_ne!(events_dir1, events_dir2);
        assert!(events_dir1.ends_with("events/repo-one/same-job-id"));
        assert!(events_dir2.ends_with("events/repo-two/same-job-id"));
    }

    /// Jobs with at least one DLQ item are listed; jobs with an empty items
    /// directory are not.
    #[tokio::test]
    async fn test_list_dlq_job_ids() {
        let temp_dir = TempDir::new().unwrap();
        let storage = GlobalStorage::new_with_root(temp_dir.path().to_path_buf()).unwrap();
        let repo_name = "test-repo";

        // Nothing recorded yet.
        let job_ids = storage.list_dlq_job_ids(repo_name).await.unwrap();
        assert!(job_ids.is_empty());

        // Seed three jobs with one item each.
        for i in 1..=3 {
            let job_id = format!("job-{}", i);
            let dlq_dir = storage.get_dlq_dir(repo_name, &job_id).await.unwrap();
            let items_dir = dlq_dir.join("items");
            fs::create_dir_all(&items_dir).await.unwrap();
            fs::write(items_dir.join("item-1.json"), "{}").await.unwrap();
        }

        // A job with an empty items directory must be excluded.
        let empty_job_dir = storage.get_dlq_dir(repo_name, "empty-job").await.unwrap();
        fs::create_dir_all(&empty_job_dir.join("items"))
            .await
            .unwrap();

        let job_ids = storage.list_dlq_job_ids(repo_name).await.unwrap();
        assert_eq!(job_ids.len(), 3);
        // Observed order is job-3 first — presumably newest-first; this
        // assertion pins that ordering contract.
        assert_eq!(job_ids, vec!["job-3", "job-2", "job-1"]);
    }

    /// `discover_dlq_job_ids` derives the repo name from the project path
    /// and reports jobs that have DLQ items.
    #[tokio::test]
    async fn test_discover_dlq_job_ids() {
        let temp_dir = TempDir::new().unwrap();
        let project_path = temp_dir.path().join("test-project");
        std::fs::create_dir(&project_path).unwrap();

        // Nothing stored for this project yet.
        let job_ids = discover_dlq_job_ids(&project_path).await.unwrap();
        assert!(job_ids.is_empty());

        // Seed one DLQ item under the repo derived from the project path.
        let storage = GlobalStorage::new().unwrap();
        let repo_name = extract_repo_name(&project_path).unwrap();
        let job_dir = storage.get_dlq_dir(&repo_name, "test-job").await.unwrap();
        let items_dir = job_dir.join("items");
        fs::create_dir_all(&items_dir).await.unwrap();
        fs::write(items_dir.join("item-1.json"), "{}").await.unwrap();

        // Fixed: re-exercise the public function under test rather than
        // asserting via the lower-level `storage.list_dlq_job_ids` call.
        let job_ids = discover_dlq_job_ids(&project_path).await.unwrap();
        assert_eq!(job_ids.len(), 1);
        assert_eq!(job_ids[0], "test-job");
    }
}