1use async_trait::async_trait;
7use serde::de::DeserializeOwned;
8use std::collections::HashMap;
9use std::marker::PhantomData;
10
11use crate::core::Job;
12use crate::error::{QmlError, Result};
13
14pub mod cleanup;
15pub mod heartbeat;
16#[cfg(feature = "metrics")]
17pub mod metrics;
18pub mod middleware;
19pub mod processor;
20pub mod recurring;
21pub mod retry;
22pub mod scheduler;
23pub mod server;
24pub mod worker;
25
26pub use cleanup::{
27 CleanupWorker, DEFAULT_CLEANUP_INTERVAL, DEFAULT_FAILED_TTL, DEFAULT_SUCCEEDED_TTL,
28};
29pub use heartbeat::{DEFAULT_DEAD_SERVER_TIMEOUT, DEFAULT_HEARTBEAT_INTERVAL, HeartbeatWorker};
30#[cfg(feature = "metrics")]
31pub use metrics::{DEFAULT_JOB_DURATION_BUCKETS, PrometheusMetrics, PrometheusMiddleware};
32pub use middleware::{JobMiddleware, Next, TracingMiddleware};
33pub use processor::{JobProcessor, StateChangeHook};
34pub use recurring::{DEFAULT_RECURRING_BATCH_SIZE, RecurringJobPoller};
35pub use retry::{RetryPolicy, RetryStrategy};
36pub use scheduler::JobScheduler;
37pub use server::{BackgroundJobServer, ServerConfig};
38pub use worker::{WorkerConfig, WorkerContext, WorkerResult};
39
40#[async_trait]
47pub trait Worker: Send + Sync {
48 async fn execute(&self, job: &Job, context: &WorkerContext) -> Result<WorkerResult>;
50
51 fn method_name(&self) -> &str;
53
54 fn can_handle(&self, method: &str) -> bool {
56 self.method_name() == method
57 }
58}
59
60#[async_trait]
90pub trait TypedWorker: Send + Sync {
91 type Args: DeserializeOwned + Send + Sync;
94
95 async fn execute(&self, args: Self::Args, context: &WorkerContext) -> Result<WorkerResult>;
97
98 fn method_name(&self) -> &str;
100}
101
102pub struct TypedWorkerAdapter<W: TypedWorker> {
106 inner: W,
107 _args: PhantomData<fn() -> W::Args>,
108}
109
110impl<W: TypedWorker> TypedWorkerAdapter<W> {
111 pub fn new(inner: W) -> Self {
112 Self {
113 inner,
114 _args: PhantomData,
115 }
116 }
117}
118
119#[async_trait]
120impl<W> Worker for TypedWorkerAdapter<W>
121where
122 W: TypedWorker + 'static,
123{
124 async fn execute(&self, job: &Job, context: &WorkerContext) -> Result<WorkerResult> {
125 let args: W::Args =
126 serde_json::from_value(job.payload.clone()).map_err(|e| QmlError::WorkerError {
127 message: format!(
128 "Failed to deserialize typed payload for method {}: {}",
129 self.inner.method_name(),
130 e
131 ),
132 })?;
133 self.inner.execute(args, context).await
134 }
135
136 fn method_name(&self) -> &str {
137 self.inner.method_name()
138 }
139}
140
141#[derive(Default)]
146pub struct WorkerRegistry {
147 workers: HashMap<String, Box<dyn Worker>>,
148}
149
150impl WorkerRegistry {
151 pub fn new() -> Self {
153 Self {
154 workers: HashMap::new(),
155 }
156 }
157
158 pub fn register<W>(&mut self, worker: W)
160 where
161 W: Worker + 'static,
162 {
163 let method_name = worker.method_name().to_string();
164 self.workers.insert(method_name, Box::new(worker));
165 }
166
167 pub fn register_typed<W>(&mut self, worker: W)
172 where
173 W: TypedWorker + 'static,
174 {
175 self.register(TypedWorkerAdapter::new(worker));
176 }
177
178 pub fn get_worker(&self, method: &str) -> Option<&dyn Worker> {
180 self.workers.get(method).map(|w| w.as_ref())
181 }
182
183 pub fn get_methods(&self) -> Vec<&str> {
185 self.workers.keys().map(|s| s.as_str()).collect()
186 }
187
188 pub fn has_worker(&self, method: &str) -> bool {
190 self.workers.contains_key(method)
191 }
192
193 pub fn len(&self) -> usize {
195 self.workers.len()
196 }
197
198 pub fn is_empty(&self) -> bool {
200 self.workers.is_empty()
201 }
202}