use crate::{
Result,
job::{Job, JobId},
rate_limit::ThrottleConfig,
stats::{DeadJobSummary, QueueStats},
};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use sqlx::{Database, Pool};
use std::{collections::HashMap, marker::PhantomData, sync::Arc};
use tokio::sync::RwLock;
#[cfg(feature = "postgres")]
pub mod postgres;
#[cfg(feature = "mysql")]
pub mod mysql;
#[cfg(feature = "test")]
pub mod test;
/// Metadata describing a paused queue.
///
/// Returned by [`DatabaseQueue::get_queue_pause_info`] and
/// [`DatabaseQueue::get_paused_queues`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueuePauseInfo {
    /// Name of the paused queue.
    pub queue_name: String,
    /// When the queue was paused (UTC).
    pub paused_at: DateTime<Utc>,
    /// Who/what paused the queue, if the backend recorded it.
    pub paused_by: Option<String>,
    /// Optional human-readable reason for the pause.
    pub reason: Option<String>,
}
/// Backend-agnostic contract for a database-backed job queue.
///
/// Implementations (see the feature-gated `postgres` / `mysql` modules)
/// persist jobs in a relational database and expose the full lifecycle:
/// enqueue/dequeue, retries and dead jobs, batches, statistics, cron and
/// recurring jobs, throttling, result storage, workflows, archival, and
/// queue pausing.
#[async_trait]
pub trait DatabaseQueue: Send + Sync {
    /// The sqlx database driver this implementation targets.
    type Database: Database;

    // --- Core job lifecycle ---

    /// Persists `job` and returns its assigned [`JobId`].
    async fn enqueue(&self, job: Job) -> Result<JobId>;
    /// Fetches the next job ready to run on `queue_name`, or `Ok(None)` if
    /// none is available. Selection order is backend-defined.
    async fn dequeue(&self, queue_name: &str) -> Result<Option<Job>>;
    /// Like [`Self::dequeue`], but biases selection by the given priority
    /// weights.
    async fn dequeue_with_priority_weights(
        &self,
        queue_name: &str,
        weights: &crate::priority::PriorityWeights,
    ) -> Result<Option<Job>>;
    /// Marks `job_id` as successfully completed.
    async fn complete_job(&self, job_id: JobId) -> Result<()>;
    /// Records a failure for `job_id` with the given error message.
    async fn fail_job(&self, job_id: JobId, error_message: &str) -> Result<()>;
    /// Schedules `job_id` to be retried at `retry_at`.
    async fn retry_job(&self, job_id: JobId, retry_at: DateTime<Utc>) -> Result<()>;
    /// Looks up a job by id; `Ok(None)` if it does not exist.
    async fn get_job(&self, job_id: JobId) -> Result<Option<Job>>;
    /// Permanently removes `job_id` from the queue.
    async fn delete_job(&self, job_id: JobId) -> Result<()>;

    // --- Batches ---

    /// Enqueues an entire batch of jobs and returns the batch id.
    async fn enqueue_batch(&self, batch: crate::batch::JobBatch) -> Result<crate::batch::BatchId>;
    /// Returns the current aggregate status/result of a batch.
    async fn get_batch_status(
        &self,
        batch_id: crate::batch::BatchId,
    ) -> Result<crate::batch::BatchResult>;
    /// Lists the jobs belonging to `batch_id`.
    async fn get_batch_jobs(&self, batch_id: crate::batch::BatchId) -> Result<Vec<Job>>;
    /// Deletes a batch (scope of deletion — metadata vs. jobs — is
    /// backend-defined).
    async fn delete_batch(&self, batch_id: crate::batch::BatchId) -> Result<()>;

    // --- Dead jobs ---

    /// Moves `job_id` to the dead state with the given error message.
    async fn mark_job_dead(&self, job_id: JobId, error_message: &str) -> Result<()>;
    /// Marks `job_id` as having exceeded its timeout, recording the message.
    async fn mark_job_timed_out(&self, job_id: JobId, error_message: &str) -> Result<()>;
    /// Lists dead jobs across all queues with optional pagination.
    async fn get_dead_jobs(&self, limit: Option<u32>, offset: Option<u32>) -> Result<Vec<Job>>;
    /// Lists dead jobs for a single queue with optional pagination.
    async fn get_dead_jobs_by_queue(
        &self,
        queue_name: &str,
        limit: Option<u32>,
        offset: Option<u32>,
    ) -> Result<Vec<Job>>;
    /// Re-enqueues a dead job for another attempt.
    async fn retry_dead_job(&self, job_id: JobId) -> Result<()>;
    /// Deletes dead jobs older than `older_than`; returns the number purged.
    async fn purge_dead_jobs(&self, older_than: DateTime<Utc>) -> Result<u64>;
    /// Returns an aggregate summary of dead jobs.
    async fn get_dead_job_summary(&self) -> Result<DeadJobSummary>;

    // --- Statistics ---

    /// Returns statistics for a single queue.
    async fn get_queue_stats(&self, queue_name: &str) -> Result<QueueStats>;
    /// Returns statistics for every known queue.
    async fn get_all_queue_stats(&self) -> Result<Vec<QueueStats>>;
    /// Returns a status-name → count map for `queue_name`.
    async fn get_job_counts_by_status(
        &self,
        queue_name: &str,
    ) -> Result<std::collections::HashMap<String, u64>>;
    /// Returns per-priority statistics for `queue_name`.
    async fn get_priority_stats(&self, queue_name: &str) -> Result<crate::priority::PriorityStats>;
    /// Returns processing durations for jobs on `queue_name` since `since`.
    /// NOTE(review): unit of the `i64` values (ms vs. s) is backend-defined —
    /// confirm against the implementations.
    async fn get_processing_times(
        &self,
        queue_name: &str,
        since: DateTime<Utc>,
    ) -> Result<Vec<i64>>;
    /// Returns error-message → occurrence-count since `since`, optionally
    /// restricted to one queue.
    async fn get_error_frequencies(
        &self,
        queue_name: Option<&str>,
        since: DateTime<Utc>,
    ) -> Result<std::collections::HashMap<String, u64>>;
    /// Returns jobs completed within `[start_time, end_time]`, optionally
    /// filtered by queue and capped at `limit`.
    async fn get_jobs_completed_in_range(
        &self,
        queue_name: Option<&str>,
        start_time: DateTime<Utc>,
        end_time: DateTime<Utc>,
        limit: Option<u32>,
    ) -> Result<Vec<Job>>;

    // --- Cron / recurring jobs ---

    /// Enqueues a cron-scheduled job and returns its id.
    async fn enqueue_cron_job(&self, job: Job) -> Result<JobId>;
    /// Returns cron jobs that are due to run, optionally for one queue.
    async fn get_due_cron_jobs(&self, queue_name: Option<&str>) -> Result<Vec<Job>>;
    /// Sets the next run time for a cron job.
    async fn reschedule_cron_job(&self, job_id: JobId, next_run_at: DateTime<Utc>) -> Result<()>;
    /// Lists recurring jobs registered on `queue_name`.
    async fn get_recurring_jobs(&self, queue_name: &str) -> Result<Vec<Job>>;
    /// Disables a recurring job without deleting it.
    async fn disable_recurring_job(&self, job_id: JobId) -> Result<()>;
    /// Re-enables a previously disabled recurring job.
    async fn enable_recurring_job(&self, job_id: JobId) -> Result<()>;

    // --- Throttling ---

    /// Stores (or replaces) the throttle configuration for `queue_name`.
    async fn set_throttle_config(&self, queue_name: &str, config: ThrottleConfig) -> Result<()>;
    /// Fetches the throttle configuration for `queue_name`, if any.
    async fn get_throttle_config(&self, queue_name: &str) -> Result<Option<ThrottleConfig>>;
    /// Removes the throttle configuration for `queue_name`.
    async fn remove_throttle_config(&self, queue_name: &str) -> Result<()>;
    /// Returns all throttle configurations keyed by queue name.
    async fn get_all_throttle_configs(&self) -> Result<HashMap<String, ThrottleConfig>>;
    /// Returns the number of jobs currently in `queue_name`.
    async fn get_queue_depth(&self, queue_name: &str) -> Result<u64>;

    // --- Job results ---

    /// Stores `result_data` for `job_id`, optionally expiring at `expires_at`.
    async fn store_job_result(
        &self,
        job_id: JobId,
        result_data: serde_json::Value,
        expires_at: Option<DateTime<Utc>>,
    ) -> Result<()>;
    /// Fetches a stored job result; `Ok(None)` if absent or expired.
    async fn get_job_result(&self, job_id: JobId) -> Result<Option<serde_json::Value>>;
    /// Deletes the stored result for `job_id`.
    async fn delete_job_result(&self, job_id: JobId) -> Result<()>;
    /// Removes expired job results; returns the number cleaned up.
    async fn cleanup_expired_results(&self) -> Result<u64>;

    // --- Workflows / dependencies ---

    /// Enqueues a workflow (job group) and returns its id.
    async fn enqueue_workflow(
        &self,
        workflow: crate::workflow::JobGroup,
    ) -> Result<crate::workflow::WorkflowId>;
    /// Fetches the current state of a workflow; `Ok(None)` if unknown.
    async fn get_workflow_status(
        &self,
        workflow_id: crate::workflow::WorkflowId,
    ) -> Result<Option<crate::workflow::JobGroup>>;
    /// After `completed_job_id` finishes, returns the ids of dependent jobs
    /// whose dependencies are now satisfied.
    async fn resolve_job_dependencies(&self, completed_job_id: JobId) -> Result<Vec<JobId>>;
    /// Returns up to `limit` jobs on `queue_name` whose dependencies are met.
    async fn get_ready_jobs(&self, queue_name: &str, limit: u32) -> Result<Vec<Job>>;
    /// Propagates a failure of `failed_job_id` to its dependents; returns the
    /// ids of the jobs affected.
    async fn fail_job_dependencies(&self, failed_job_id: JobId) -> Result<Vec<JobId>>;
    /// Lists all jobs belonging to a workflow.
    async fn get_workflow_jobs(&self, workflow_id: crate::workflow::WorkflowId)
    -> Result<Vec<Job>>;
    /// Cancels a workflow and its outstanding jobs.
    async fn cancel_workflow(&self, workflow_id: crate::workflow::WorkflowId) -> Result<()>;

    // --- Archival ---

    /// Archives jobs matching `policy`/`config` (optionally for one queue),
    /// tagging them with `reason` and the actor; returns archival statistics.
    async fn archive_jobs(
        &self,
        queue_name: Option<&str>,
        policy: &crate::archive::ArchivalPolicy,
        config: &crate::archive::ArchivalConfig,
        reason: crate::archive::ArchivalReason,
        archived_by: Option<&str>,
    ) -> Result<crate::archive::ArchivalStats>;
    /// Restores an archived job back into the live queue and returns it.
    async fn restore_archived_job(&self, job_id: JobId) -> Result<Job>;
    /// Lists archived jobs with optional queue filter and pagination.
    async fn list_archived_jobs(
        &self,
        queue_name: Option<&str>,
        limit: Option<u32>,
        offset: Option<u32>,
    ) -> Result<Vec<crate::archive::ArchivedJob>>;
    /// Permanently deletes archived jobs older than `older_than`; returns the
    /// number purged.
    async fn purge_archived_jobs(&self, older_than: DateTime<Utc>) -> Result<u64>;
    /// Returns archival statistics, optionally for one queue.
    async fn get_archival_stats(
        &self,
        queue_name: Option<&str>,
    ) -> Result<crate::archive::ArchivalStats>;

    // --- Queue pause/resume ---

    /// Pauses `queue_name`, recording who paused it if provided.
    async fn pause_queue(&self, queue_name: &str, paused_by: Option<&str>) -> Result<()>;
    /// Resumes a paused queue, recording who resumed it if provided.
    async fn resume_queue(&self, queue_name: &str, resumed_by: Option<&str>) -> Result<()>;
    /// Returns whether `queue_name` is currently paused.
    async fn is_queue_paused(&self, queue_name: &str) -> Result<bool>;
    /// Returns pause metadata for `queue_name`, or `Ok(None)` if not paused.
    async fn get_queue_pause_info(&self, queue_name: &str) -> Result<Option<QueuePauseInfo>>;
    /// Lists pause metadata for every paused queue.
    async fn get_paused_queues(&self) -> Result<Vec<QueuePauseInfo>>;
}
/// A database-backed job queue generic over a sqlx [`Database`] driver.
pub struct JobQueue<DB: Database> {
    /// Shared connection pool. Public so callers (e.g. archivers — see the
    /// tests below) can reuse the same pool; `dead_code` is allowed because
    /// not every feature combination reads it directly.
    #[allow(dead_code)] pub pool: Pool<DB>,
    /// Ties the struct to its driver type without storing driver data.
    pub(crate) _phantom: PhantomData<DB>,
    /// In-memory per-queue throttle configs, shared across clones.
    pub(crate) throttle_configs: Arc<RwLock<HashMap<String, ThrottleConfig>>>,
}
impl<DB: Database> Clone for JobQueue<DB> {
fn clone(&self) -> Self {
Self {
pool: self.pool.clone(),
_phantom: PhantomData,
throttle_configs: self.throttle_configs.clone(),
}
}
}
impl<DB: Database> JobQueue<DB> {
    /// Creates a queue backed by the given connection pool, with an empty
    /// set of in-memory throttle configurations.
    pub fn new(pool: Pool<DB>) -> Self {
        let throttle_configs = Arc::new(RwLock::new(HashMap::new()));
        Self {
            pool,
            _phantom: PhantomData,
            throttle_configs,
        }
    }

    /// Inserts or replaces the in-memory throttle config for `queue_name`.
    /// Always succeeds; the `Result` keeps the signature uniform with the
    /// fallible backend API.
    pub async fn set_throttle(&self, queue_name: &str, config: ThrottleConfig) -> Result<()> {
        self.throttle_configs
            .write()
            .await
            .insert(queue_name.to_string(), config);
        Ok(())
    }

    /// Returns a clone of the throttle config for `queue_name`, if one is set.
    pub async fn get_throttle(&self, queue_name: &str) -> Option<ThrottleConfig> {
        self.throttle_configs.read().await.get(queue_name).cloned()
    }

    /// Removes the throttle config for `queue_name`; a no-op if none is set.
    pub async fn remove_throttle(&self, queue_name: &str) -> Result<()> {
        self.throttle_configs.write().await.remove(queue_name);
        Ok(())
    }

    /// Returns a point-in-time snapshot of every throttle config, keyed by
    /// queue name.
    pub async fn get_all_throttles(&self) -> HashMap<String, ThrottleConfig> {
        self.throttle_configs.read().await.clone()
    }

    /// Borrows the underlying connection pool.
    pub fn get_pool(&self) -> &Pool<DB> {
        &self.pool
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The `pool` field is public: exercise direct access, cloning, and an
    /// identity comparison against the source pool. Needs a live Postgres,
    /// hence `#[ignore]`.
    #[cfg(feature = "postgres")]
    #[tokio::test]
    #[ignore]
    async fn test_public_pool_access() {
        let url = std::env::var("DATABASE_URL").unwrap_or_else(|_| {
            "postgres://postgres:hammerwork@localhost:5433/hammerwork".to_string()
        });
        let pg_pool = sqlx::PgPool::connect(&url).await.unwrap();
        let queue = JobQueue::new(pg_pool.clone());
        // Field access, clone, and pointer comparison must all compile and run.
        let _pool_ref = &queue.pool;
        let _pool_clone = queue.pool.clone();
        let _same_pool = std::ptr::eq(&queue.pool, &pg_pool);
    }

    /// Smoke test: constructing a queue via the in-memory test backend.
    #[test]
    fn test_queue_creation_with_pool() {
        #[cfg(feature = "test")]
        {
            use crate::queue::test::TestQueue;
            let q = TestQueue::new();
            let _ = q;
        }
    }

    /// Compile-time proof that `pool` is reachable from outside the impl.
    #[test]
    fn test_pool_field_visibility() {
        #[allow(dead_code)]
        fn _check_pool_access<DB: sqlx::Database>(queue: &JobQueue<DB>) -> &sqlx::Pool<DB> {
            &queue.pool
        }
    }

    /// Placeholder documenting that `throttle_configs` stays `pub(crate)`;
    /// there is no positive compile-time check for privacy from inside the
    /// crate, so this only pins the intent.
    #[test]
    fn test_throttle_configs_still_private() {
        fn _ensure_throttle_configs_private<DB: sqlx::Database>(_queue: &JobQueue<DB>) {}
    }

    /// Keeps the doc-style example (sharing the pool with an archiver)
    /// compiling; never executed.
    #[test]
    fn test_pool_documentation_examples() {
        #[cfg(feature = "postgres")]
        #[allow(dead_code)]
        async fn _example_archive_integration()
        -> std::result::Result<(), Box<dyn std::error::Error>> {
            let pool = sqlx::PgPool::connect("postgresql://localhost/hammerwork").await?;
            let queue = std::sync::Arc::new(JobQueue::new(pool.clone()));
            let _archiver = crate::archive::JobArchiver::new(queue.pool.clone());
            Ok(())
        }
    }

    /// Builder-style construction and defaults of `ThrottleConfig`.
    #[test]
    fn test_throttle_config_creation() {
        use crate::rate_limit::ThrottleConfig;

        let configured = ThrottleConfig::new()
            .max_concurrent(10)
            .rate_per_minute(60)
            .enabled(true);
        assert_eq!(configured.max_concurrent, Some(10));
        assert_eq!(configured.rate_per_minute, Some(60));
        assert!(configured.enabled);

        let defaults = ThrottleConfig::new();
        assert_eq!(defaults.max_concurrent, None);
        assert_eq!(defaults.rate_per_minute, None);
        assert!(defaults.enabled);
    }

    /// `JobQueue` is `Clone` (manual impl, no `DB: Clone` bound required).
    #[test]
    fn test_job_queue_clone() {
        #[cfg(feature = "test")]
        {
            use crate::queue::test::TestQueue;
            let original = TestQueue::new();
            let duplicate = original.clone();
            let _ = original;
            let _ = duplicate;
        }
        #[allow(dead_code)]
        fn _test_job_queue_clone_trait<DB: sqlx::Database>()
        -> impl Fn(&JobQueue<DB>) -> JobQueue<DB> {
            |queue: &JobQueue<DB>| queue.clone()
        }
    }
}