Skip to main content

qml_rs/processing/
mod.rs

1//! Job Processing Engine
2//!
3//! This module contains the job processing engine that handles job execution,
4//! worker management, and background job processing.
5
6use async_trait::async_trait;
7use serde::de::DeserializeOwned;
8use std::collections::HashMap;
9use std::marker::PhantomData;
10
11use crate::core::Job;
12use crate::error::{QmlError, Result};
13
14pub mod cleanup;
15pub mod heartbeat;
16#[cfg(feature = "metrics")]
17pub mod metrics;
18pub mod middleware;
19pub mod processor;
20pub mod recurring;
21pub mod retry;
22pub mod scheduler;
23pub mod server;
24pub mod worker;
25
26pub use cleanup::{
27    CleanupWorker, DEFAULT_CLEANUP_INTERVAL, DEFAULT_FAILED_TTL, DEFAULT_SUCCEEDED_TTL,
28};
29pub use heartbeat::{DEFAULT_DEAD_SERVER_TIMEOUT, DEFAULT_HEARTBEAT_INTERVAL, HeartbeatWorker};
30#[cfg(feature = "metrics")]
31pub use metrics::{DEFAULT_JOB_DURATION_BUCKETS, PrometheusMetrics, PrometheusMiddleware};
32pub use middleware::{JobMiddleware, Next, TracingMiddleware};
33pub use processor::{JobProcessor, StateChangeHook};
34pub use recurring::{DEFAULT_RECURRING_BATCH_SIZE, RecurringJobPoller};
35pub use retry::{RetryPolicy, RetryStrategy};
36pub use scheduler::JobScheduler;
37pub use server::{BackgroundJobServer, ServerConfig};
38pub use worker::{WorkerConfig, WorkerContext, WorkerResult};
39
40/// Untyped worker trait — receives the raw [`Job`] and its JSON payload.
41///
42/// Most users should prefer [`TypedWorker`], which deserializes the payload
43/// into a strongly-typed argument struct before dispatch. Implement `Worker`
44/// directly only when you need the full job metadata or when the payload
45/// shape is dynamic.
46#[async_trait]
47pub trait Worker: Send + Sync {
48    /// Execute a job.
49    async fn execute(&self, job: &Job, context: &WorkerContext) -> Result<WorkerResult>;
50
51    /// Get the method name this worker handles
52    fn method_name(&self) -> &str;
53
54    /// Check if this worker can handle the given job method
55    fn can_handle(&self, method: &str) -> bool {
56        self.method_name() == method
57    }
58}
59
60/// Typed worker trait — receives a deserialized `Args` value instead of the
61/// raw JSON payload.
62///
63/// ```no_run
64/// use async_trait::async_trait;
65/// use qml_rs::{TypedWorker, WorkerContext, WorkerResult, Result};
66/// use serde::{Deserialize, Serialize};
67///
68/// #[derive(Serialize, Deserialize)]
69/// struct SendEmailArgs { to: String, subject: String }
70///
71/// struct SendEmailWorker;
72///
73/// #[async_trait]
74/// impl TypedWorker for SendEmailWorker {
75///     type Args = SendEmailArgs;
76///
77///     async fn execute(&self, args: Self::Args, _ctx: &WorkerContext) -> Result<WorkerResult> {
78///         println!("sending {} to {}", args.subject, args.to);
79///         Ok(WorkerResult::success(None, 0))
80///     }
81///
82///     fn method_name(&self) -> &str { "send_email" }
83/// }
84/// ```
85///
86/// Register a `TypedWorker` with [`WorkerRegistry::register_typed`], which
87/// wraps it in a [`TypedWorkerAdapter`] and stores it as a regular
88/// [`Worker`].
89#[async_trait]
90pub trait TypedWorker: Send + Sync {
91    /// Strongly-typed argument payload. Deserialized from `job.payload`
92    /// before [`TypedWorker::execute`] is invoked.
93    type Args: DeserializeOwned + Send + Sync;
94
95    /// Execute a job with its deserialized arguments.
96    async fn execute(&self, args: Self::Args, context: &WorkerContext) -> Result<WorkerResult>;
97
98    /// Get the method name this worker handles.
99    fn method_name(&self) -> &str;
100}
101
102/// Adapter that wraps a [`TypedWorker`] and implements the untyped
103/// [`Worker`] trait, deserializing `job.payload` into `W::Args` before
104/// dispatching.
105pub struct TypedWorkerAdapter<W: TypedWorker> {
106    inner: W,
107    _args: PhantomData<fn() -> W::Args>,
108}
109
110impl<W: TypedWorker> TypedWorkerAdapter<W> {
111    pub fn new(inner: W) -> Self {
112        Self {
113            inner,
114            _args: PhantomData,
115        }
116    }
117}
118
119#[async_trait]
120impl<W> Worker for TypedWorkerAdapter<W>
121where
122    W: TypedWorker + 'static,
123{
124    async fn execute(&self, job: &Job, context: &WorkerContext) -> Result<WorkerResult> {
125        let args: W::Args =
126            serde_json::from_value(job.payload.clone()).map_err(|e| QmlError::WorkerError {
127                message: format!(
128                    "Failed to deserialize typed payload for method {}: {}",
129                    self.inner.method_name(),
130                    e
131                ),
132            })?;
133        self.inner.execute(args, context).await
134    }
135
136    fn method_name(&self) -> &str {
137        self.inner.method_name()
138    }
139}
140
141/// Registry for job workers
142///
143/// This registry maps job method names to their corresponding worker implementations.
144/// It's used by the job processor to find the appropriate worker for each job.
145#[derive(Default)]
146pub struct WorkerRegistry {
147    workers: HashMap<String, Box<dyn Worker>>,
148}
149
150impl WorkerRegistry {
151    /// Create a new worker registry
152    pub fn new() -> Self {
153        Self {
154            workers: HashMap::new(),
155        }
156    }
157
158    /// Register a worker for a specific method
159    pub fn register<W>(&mut self, worker: W)
160    where
161        W: Worker + 'static,
162    {
163        let method_name = worker.method_name().to_string();
164        self.workers.insert(method_name, Box::new(worker));
165    }
166
167    /// Register a [`TypedWorker`] for a specific method.
168    ///
169    /// Wraps the worker in a [`TypedWorkerAdapter`] that deserializes
170    /// `job.payload` into `W::Args` before dispatching.
171    pub fn register_typed<W>(&mut self, worker: W)
172    where
173        W: TypedWorker + 'static,
174    {
175        self.register(TypedWorkerAdapter::new(worker));
176    }
177
178    /// Get a worker for the given method name
179    pub fn get_worker(&self, method: &str) -> Option<&dyn Worker> {
180        self.workers.get(method).map(|w| w.as_ref())
181    }
182
183    /// Get all registered method names
184    pub fn get_methods(&self) -> Vec<&str> {
185        self.workers.keys().map(|s| s.as_str()).collect()
186    }
187
188    /// Check if a method is registered
189    pub fn has_worker(&self, method: &str) -> bool {
190        self.workers.contains_key(method)
191    }
192
193    /// Get the number of registered workers
194    pub fn len(&self) -> usize {
195        self.workers.len()
196    }
197
198    /// Check if the registry is empty
199    pub fn is_empty(&self) -> bool {
200        self.workers.is_empty()
201    }
202}