use std::sync::Arc;
use async_trait::async_trait;
use tokio::sync::broadcast;
use super::config::{JobConfig, ConfigError, StorageBackend};
use super::types::{Job, JobId, JobStatus, JobProgress, JobStats};
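/// Errors that can be returned by a job storage backend or by the queue built on top of it.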
#[derive(Debug, thiserror::Error)]
pub enum StorageError {
#[error("Connection error: {0}")]
Connection(String),
#[error("Job not found: {0}")]
NotFound(JobId),
#[error("Serialization error: {0}")]
Serialization(String),
#[error("Database error: {0}")]
Database(String),
#[error("Backend not available: {0}")]
BackendNotAvailable(String),
#[error("Configuration error: {0}")]
Config(#[from] ConfigError),
#[error("Operation failed: {0}")]
Operation(String),
}
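/// Convenience alias for results of storage operations.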
pub type StorageResult<T> = Result<T, StorageError>;
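/// Backend-agnostic persistence interface for jobs.
///
/// Implementations must be `Send + Sync + 'static` so a single instance can be
/// shared behind an `Arc` by the queue and by concurrent workers.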
#[async_trait]
pub trait JobStorage: Send + Sync + 'static {
fn backend_name(&self) -> &'static str;
async fn setup(&self) -> StorageResult<()>;
async fn enqueue(&self, job: Job) -> StorageResult<JobId>;
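/// Claim the next available job for `worker_id`, or `None` if no job is ready.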
async fn dequeue(&self, worker_id: &str) -> StorageResult<Option<Job>>;
async fn get(&self, job_id: JobId) -> StorageResult<Option<Job>>;
async fn update(&self, job: &Job) -> StorageResult<()>;
async fn complete(&self, job_id: JobId, result: Option<serde_json::Value>) -> StorageResult<()>;
async fn fail(&self, job_id: JobId, error: &str) -> StorageResult<()>;
async fn cancel(&self, job_id: JobId) -> StorageResult<()>;
async fn retry(&self, job_id: JobId) -> StorageResult<()>;
async fn list(&self, filter: JobFilter) -> StorageResult<Vec<Job>>;
async fn stats(&self) -> StorageResult<JobStats>;
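/// Record a liveness heartbeat from `worker_id` for the job it is currently processing.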
async fn heartbeat(&self, worker_id: &str, job_id: JobId) -> StorageResult<()>;
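/// Requeue jobs whose worker has not heartbeated within `timeout_secs`, returning how many were recovered.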
async fn requeue_orphaned(&self, timeout_secs: u64) -> StorageResult<usize>;
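/// Delete job records older than `older_than_secs`, returning the number removed.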
async fn cleanup(&self, older_than_secs: u64) -> StorageResult<usize>;
async fn close(&self) -> StorageResult<()>;
}
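/// Criteria for `JobStorage::list`. Fields left unset match all jobs.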
#[derive(Debug, Clone, Default)]
pub struct JobFilter {
pub status: Option<JobStatus>,
pub job_type: Option<String>,
pub skill_id: Option<String>,
pub worker_id: Option<String>,
pub limit: Option<usize>,
pub offset: Option<usize>,
pub order_by: Option<String>,
pub descending: bool,
}
impl JobFilter {
pub fn new() -> Self {
Self::default()
}
pub fn with_status(mut self, status: JobStatus) -> Self {
self.status = Some(status);
self
}
pub fn with_skill_id(mut self, skill_id: impl Into<String>) -> Self {
self.skill_id = Some(skill_id.into());
self
}
pub fn with_limit(mut self, limit: usize) -> Self {
self.limit = Some(limit);
self
}
pub fn with_offset(mut self, offset: usize) -> Self {
self.offset = Some(offset);
self
}
pub fn order_by(mut self, field: impl Into<String>, descending: bool) -> Self {
self.order_by = Some(field.into());
self.descending = descending;
self
}
}
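/// High-level handle that pairs a storage backend with its configuration and a
/// broadcast channel for job progress updates.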
pub struct JobQueue {
storage: Arc<dyn JobStorage>,
config: JobConfig,
progress_tx: broadcast::Sender<JobProgress>,
}
impl JobQueue {
pub fn new(storage: Arc<dyn JobStorage>, config: JobConfig) -> Self {
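// Progress events are broadcast on a bounded channel; slow subscribers skip
// missed events rather than blocking the sender.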
let (progress_tx, _) = broadcast::channel(100);
Self {
storage,
config,
progress_tx,
}
}
pub fn storage(&self) -> &Arc<dyn JobStorage> {
&self.storage
}
pub fn config(&self) -> &JobConfig {
&self.config
}
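/// Subscribe to progress updates. Each call returns an independent receiver that
/// only sees events sent after it was created.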
pub fn subscribe_progress(&self) -> broadcast::Receiver<JobProgress> {
self.progress_tx.subscribe()
}
pub async fn enqueue(&self, job: Job) -> StorageResult<JobId> {
self.storage.enqueue(job).await
}
pub async fn get(&self, job_id: JobId) -> StorageResult<Option<Job>> {
self.storage.get(job_id).await
}
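/// Broadcast a progress update. Send errors (no active subscribers) are ignored.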
pub fn report_progress(&self, progress: JobProgress) {
let _ = self.progress_tx.send(progress);
}
pub async fn pending_jobs(&self) -> StorageResult<Vec<Job>> {
self.storage.list(JobFilter::new().with_status(JobStatus::Pending)).await
}
pub async fn running_jobs(&self) -> StorageResult<Vec<Job>> {
self.storage.list(JobFilter::new().with_status(JobStatus::Running)).await
}
pub async fn stats(&self) -> StorageResult<JobStats> {
self.storage.stats().await
}
pub async fn cleanup(&self) -> StorageResult<usize> {
let older_than = self.config.cleanup_after.as_secs();
self.storage.cleanup(older_than).await
}
pub async fn recover_orphans(&self) -> StorageResult<usize> {
let timeout = self.config.job_timeout.as_secs();
self.storage.requeue_orphaned(timeout).await
}
}
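/// Build and initialise the storage backend selected by `config.backend`.
///
/// Each backend is feature-gated; selecting a backend whose feature is not
/// compiled in yields `StorageError::BackendNotAvailable`.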
pub async fn create_storage(config: &JobConfig) -> StorageResult<Arc<dyn JobStorage>> {
config.validate()?;
match config.backend {
StorageBackend::Memory => {
#[cfg(feature = "sqlite-storage")]
{
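// The in-memory backend reuses the SQLite implementation with a ":memory:" database.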
let memory_config = JobConfig {
connection: ":memory:".to_string(),
..config.clone()
};
let storage = super::sqlite::SqliteJobStorage::new(&memory_config).await?;
storage.setup().await?;
Ok(Arc::new(storage))
}
#[cfg(not(feature = "sqlite-storage"))]
{
Err(StorageError::BackendNotAvailable(
"Memory backend requires 'sqlite-storage' feature".to_string()
))
}
}
StorageBackend::Sqlite => {
#[cfg(feature = "sqlite-storage")]
{
let storage = super::sqlite::SqliteJobStorage::new(config).await?;
storage.setup().await?;
Ok(Arc::new(storage))
}
#[cfg(not(feature = "sqlite-storage"))]
{
Err(StorageError::BackendNotAvailable(
"Enable 'sqlite-storage' feature for SQLite support".to_string()
))
}
}
StorageBackend::Postgres => {
#[cfg(feature = "postgres-storage")]
{
let storage = super::postgres::PostgresJobStorage::new(config).await?;
storage.setup().await?;
Ok(Arc::new(storage))
}
#[cfg(not(feature = "postgres-storage"))]
{
Err(StorageError::BackendNotAvailable(
"Enable 'postgres-storage' feature for PostgreSQL support".to_string()
))
}
}
StorageBackend::Redis => {
#[cfg(feature = "redis-storage")]
{
let storage = super::redis_backend::RedisJobStorage::new(config).await?;
// Run one-time initialisation, matching the behaviour of the other backends.
storage.setup().await?;
Ok(Arc::new(storage))
}
#[cfg(not(feature = "redis-storage"))]
{
Err(StorageError::BackendNotAvailable(
"Enable 'redis-storage' feature for Redis support".to_string()
))
}
}
}
}
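/// Construct the configured storage backend and wrap it in a `JobQueue`.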
pub async fn create_job_queue(config: JobConfig) -> StorageResult<JobQueue> {
let storage = create_storage(&config).await?;
Ok(JobQueue::new(storage, config))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_job_filter() {
let filter = JobFilter::new()
.with_status(JobStatus::Pending)
.with_skill_id("kubernetes")
.with_limit(10)
.order_by("created_at", true);
assert_eq!(filter.status, Some(JobStatus::Pending));
assert_eq!(filter.skill_id, Some("kubernetes".to_string()));
assert_eq!(filter.limit, Some(10));
assert!(filter.descending);
}
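// Additional check: a freshly constructed filter applies no constraints.
#[test]
fn test_job_filter_default_is_unconstrained() {
let filter = JobFilter::new();
assert!(filter.status.is_none());
assert!(filter.job_type.is_none());
assert!(filter.limit.is_none());
assert!(!filter.descending);
}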
}