use async_trait::async_trait;
use serde::de::DeserializeOwned;
use std::collections::HashMap;
use std::marker::PhantomData;
use crate::core::Job;
use crate::error::{QmlError, Result};
// Submodules of this module. `metrics` is compiled only when the `metrics`
// Cargo feature is enabled.
pub mod cleanup;
pub mod heartbeat;
#[cfg(feature = "metrics")]
pub mod metrics;
pub mod middleware;
pub mod processor;
pub mod recurring;
pub mod retry;
pub mod scheduler;
pub mod server;
pub mod worker;
// Re-export selected items from the submodules so callers can import them
// from this module directly. The `metrics` re-exports are gated on the same
// `metrics` feature as the `metrics` module itself.
pub use cleanup::{
CleanupWorker, DEFAULT_CLEANUP_INTERVAL, DEFAULT_FAILED_TTL, DEFAULT_SUCCEEDED_TTL,
};
pub use heartbeat::{DEFAULT_DEAD_SERVER_TIMEOUT, DEFAULT_HEARTBEAT_INTERVAL, HeartbeatWorker};
#[cfg(feature = "metrics")]
pub use metrics::{DEFAULT_JOB_DURATION_BUCKETS, PrometheusMetrics, PrometheusMiddleware};
pub use middleware::{JobMiddleware, Next, TracingMiddleware};
pub use processor::{JobProcessor, StateChangeHook};
pub use recurring::{DEFAULT_RECURRING_BATCH_SIZE, RecurringJobPoller};
pub use retry::{RetryPolicy, RetryStrategy};
pub use scheduler::JobScheduler;
pub use server::{BackgroundJobServer, ServerConfig};
pub use worker::{WorkerConfig, WorkerContext, WorkerResult};
/// A unit of job-execution logic that is addressed by a method name.
///
/// Implementors must be `Send + Sync`. [`WorkerRegistry`] stores workers as
/// `Box<dyn Worker>` keyed by the value returned from [`Worker::method_name`].
#[async_trait]
pub trait Worker: Send + Sync {
    /// Executes `job` using the supplied `context` and reports the outcome.
    ///
    /// # Errors
    ///
    /// Returns whatever error the implementation produces.
    async fn execute(&self, job: &Job, context: &WorkerContext) -> Result<WorkerResult>;

    /// Returns the method name this worker is identified by.
    fn method_name(&self) -> &str;

    /// Reports whether this worker handles `method`.
    ///
    /// The default implementation is an exact string comparison against
    /// [`Worker::method_name`]; implementors may override it.
    fn can_handle(&self, method: &str) -> bool {
        method == self.method_name()
    }
}
/// A worker that receives its arguments as a concrete, deserializable type
/// instead of a raw job.
///
/// `TypedWorkerAdapter` wraps an implementor so it can be used as a `Worker`:
/// the adapter deserializes the job payload into `Args` and then calls
/// `execute`.
#[async_trait]
pub trait TypedWorker: Send + Sync {
/// Argument type passed to `execute`; it must be deserializable without
/// borrowing from the input (`DeserializeOwned`) and be `Send + Sync`.
type Args: DeserializeOwned + Send + Sync;
/// Runs the worker with already-deserialized `args` and the given `context`.
async fn execute(&self, args: Self::Args, context: &WorkerContext) -> Result<WorkerResult>;
/// Returns the method name this worker is identified by.
fn method_name(&self) -> &str;
}
/// Wrapper that holds a [`TypedWorker`] so it can be exposed through the
/// untyped `Worker` interface.
pub struct TypedWorkerAdapter<W>
where
    W: TypedWorker,
{
    /// The wrapped typed worker that all calls are forwarded to.
    inner: W,
    /// Zero-sized marker naming the worker's argument type; no `W::Args`
    /// value is stored in the adapter.
    _args: PhantomData<fn() -> W::Args>,
}

impl<W> TypedWorkerAdapter<W>
where
    W: TypedWorker,
{
    /// Wraps `inner` in a new adapter.
    pub fn new(inner: W) -> Self {
        let _args = PhantomData;
        Self { inner, _args }
    }
}
/// Exposes the wrapped [`TypedWorker`] through the untyped [`Worker`] trait.
#[async_trait]
impl<W> Worker for TypedWorkerAdapter<W>
where
    W: TypedWorker + 'static,
{
    /// Deserializes a clone of `job.payload` into `W::Args` and forwards the
    /// call to the wrapped worker.
    ///
    /// # Errors
    ///
    /// Returns `QmlError::WorkerError` when the payload cannot be
    /// deserialized into `W::Args`; otherwise returns whatever the wrapped
    /// worker's `execute` returns.
    async fn execute(&self, job: &Job, context: &WorkerContext) -> Result<WorkerResult> {
        let decoded = serde_json::from_value::<W::Args>(job.payload.clone());
        let args = decoded.map_err(|err| {
            let message = format!(
                "Failed to deserialize typed payload for method {}: {}",
                self.inner.method_name(),
                err
            );
            QmlError::WorkerError { message }
        })?;
        self.inner.execute(args, context).await
    }

    /// Returns the wrapped worker's method name unchanged.
    fn method_name(&self) -> &str {
        let wrapped = &self.inner;
        wrapped.method_name()
    }
}
/// Collection of [`Worker`] implementations looked up by method name.
#[derive(Default)]
pub struct WorkerRegistry {
    /// Registered workers, keyed by the method name each one reported when it
    /// was registered.
    workers: HashMap<String, Box<dyn Worker>>,
}

impl WorkerRegistry {
    /// Creates a registry with no workers.
    pub fn new() -> Self {
        Self::default()
    }

    /// Registers `worker` under the name returned by its `method_name`.
    ///
    /// A worker previously registered under the same name is replaced.
    pub fn register<W>(&mut self, worker: W)
    where
        W: Worker + 'static,
    {
        // Capture the key before the worker is moved into its box.
        let key = String::from(worker.method_name());
        let boxed: Box<dyn Worker> = Box::new(worker);
        self.workers.insert(key, boxed);
    }

    /// Wraps `worker` in a [`TypedWorkerAdapter`] and registers the adapter.
    pub fn register_typed<W>(&mut self, worker: W)
    where
        W: TypedWorker + 'static,
    {
        let adapter = TypedWorkerAdapter::new(worker);
        self.register(adapter);
    }

    /// Returns the worker registered under exactly `method`, if any.
    pub fn get_worker(&self, method: &str) -> Option<&dyn Worker> {
        let boxed = self.workers.get(method)?;
        Some(&**boxed)
    }

    /// Returns the registered method names, in the map's iteration order
    /// (which is unspecified).
    pub fn get_methods(&self) -> Vec<&str> {
        let mut methods = Vec::with_capacity(self.workers.len());
        for name in self.workers.keys() {
            methods.push(name.as_str());
        }
        methods
    }

    /// Returns `true` when a worker is registered under exactly `method`.
    pub fn has_worker(&self, method: &str) -> bool {
        self.get_worker(method).is_some()
    }

    /// Returns the number of registered workers.
    pub fn len(&self) -> usize {
        self.workers.len()
    }

    /// Returns `true` when no workers are registered.
    pub fn is_empty(&self) -> bool {
        self.workers.is_empty()
    }
}