graphile_worker_task_handler/handler.rs
1use graphile_worker_ctx::WorkerContext;
2use serde::Deserialize;
3use serde::Serialize;
4use serde_json::Value;
5use std::fmt::Debug;
6use std::future::Future;
7use std::pin::Pin;
8use std::sync::Arc;
9
10/// Type-erased function used by workers to run a task from a [`WorkerContext`].
11pub type TaskHandlerFn = Arc<
12 dyn Fn(WorkerContext) -> Pin<Box<dyn Future<Output = TaskHandlerOutcome> + Send>> + Send + Sync,
13>;
14
15/// Outcome returned by type-erased task handlers.
16///
17/// Normal task handlers only produce complete/failed outcomes. Batch handlers
18/// may also include a replacement payload so only failed batch items are retried.
19#[derive(Debug, Clone, PartialEq)]
20pub enum TaskHandlerOutcome {
21 Complete,
22 Failed {
23 error: String,
24 replacement_payload: Option<Value>,
25 },
26}
27
28impl TaskHandlerOutcome {
29 pub fn failed(error: impl Into<String>) -> Self {
30 Self::Failed {
31 error: error.into(),
32 replacement_payload: None,
33 }
34 }
35
36 pub fn failed_with_replacement(
37 error: impl Into<String>,
38 replacement_payload: impl Into<Value>,
39 ) -> Self {
40 Self::Failed {
41 error: error.into(),
42 replacement_payload: Some(replacement_payload.into()),
43 }
44 }
45}
46
47/// Trait for converting task handler return types into a standardized Result.
48///
49/// This trait allows task handlers to return different types while maintaining
50/// a consistent interface. It enables task handlers to return:
51/// - `()` (unit type) for successful execution with no return value
52/// - `Result<(), E>` for success/failure with error type E
53///
54/// The trait converts these different return types into a standardized `Result<(), impl Debug>`.
55pub trait IntoTaskHandlerResult {
56 /// Converts the implementing type into a task handler result.
57 ///
58 /// # Returns
59 /// A Result<(), impl Debug> where:
60 /// - Ok(()) represents successful task execution
61 /// - Err(e) represents task failure with a debug-printable error
62 fn into_task_handler_result(self) -> Result<(), impl Debug>;
63}
64
65/// Reusable registration value for a [`TaskHandler`].
66///
67/// `JobDefinition` lets crates and modules expose the jobs they provide as
68/// values, so applications can register a collection of jobs in one call.
69#[derive(Clone)]
70pub struct JobDefinition {
71 identifier: &'static str,
72 handler: TaskHandlerFn,
73}
74
75impl JobDefinition {
76 /// Creates a job definition for a task handler type.
77 pub fn of<T: TaskHandler>() -> Self {
78 let handler = move |ctx: WorkerContext| {
79 let ctx = ctx.clone();
80 Box::pin(run_task_from_worker_ctx_outcome::<T>(ctx))
81 as Pin<Box<dyn Future<Output = TaskHandlerOutcome> + Send>>
82 };
83
84 Self {
85 identifier: T::IDENTIFIER,
86 handler: Arc::new(handler),
87 }
88 }
89
90 /// Creates a job definition for a batch task handler type.
91 pub fn of_batch<T: BatchTaskHandler>() -> Self {
92 let handler = move |ctx: WorkerContext| {
93 let ctx = ctx.clone();
94 Box::pin(run_batch_task_from_worker_ctx::<T>(ctx))
95 as Pin<Box<dyn Future<Output = TaskHandlerOutcome> + Send>>
96 };
97
98 Self {
99 identifier: T::IDENTIFIER,
100 handler: Arc::new(handler),
101 }
102 }
103
104 /// The identifier handled by this definition.
105 pub fn identifier(&self) -> &'static str {
106 self.identifier
107 }
108
109 /// The type-erased task handler function.
110 pub fn handler(&self) -> TaskHandlerFn {
111 self.handler.clone()
112 }
113
114 /// Splits this definition into the identifier and handler function.
115 pub fn into_parts(self) -> (&'static str, TaskHandlerFn) {
116 (self.identifier, self.handler)
117 }
118}
119
120/// Implementation for the unit type, allowing tasks to simply return `()`.
121impl IntoTaskHandlerResult for () {
122 fn into_task_handler_result(self) -> Result<(), impl Debug> {
123 Ok::<_, ()>(())
124 }
125}
126
127/// Implementation for Result types, allowing tasks to return errors directly.
128impl<D: Debug> IntoTaskHandlerResult for Result<(), D> {
129 fn into_task_handler_result(self) -> Result<(), impl Debug> {
130 self
131 }
132}
133
134/// Core trait for defining task handlers in Graphile Worker.
135///
136/// A TaskHandler represents a specific job type that can be processed by Graphile Worker.
137/// It defines:
138/// - A unique identifier for the task type
139/// - The payload structure (via the implementing type's fields)
140/// - The execution logic in the `run` method
141///
142/// # Type Requirements
143/// - `Serialize`: The task must be serializable to JSON for storage in the database
144/// - `Deserialize`: The task must be deserializable from JSON when retrieved from the database
145/// - `Send + Sync + 'static`: The task must be safe to send between threads
146///
147/// # Example
148/// ```
149/// use graphile_worker_task_handler::{TaskHandler, IntoTaskHandlerResult};
150/// use graphile_worker_ctx::WorkerContext;
151/// use serde::{Deserialize, Serialize};
152///
153/// #[derive(Deserialize, Serialize)]
154/// struct SendEmail {
155/// to: String,
156/// subject: String,
157/// body: String,
158/// }
159///
160/// impl TaskHandler for SendEmail {
161/// const IDENTIFIER: &'static str = "send_email";
162///
163/// async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {
164/// println!("Sending email to {} with subject '{}'", self.to, self.subject);
165/// // Actual email sending logic would go here
166/// Ok::<(), String>(())
167/// }
168/// }
169/// ```
170pub trait TaskHandler: Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static {
171 /// Unique identifier for this task type.
172 ///
173 /// This identifier must be unique across all task types in your application.
174 /// It is used to match incoming jobs to the correct handler.
175 ///
176 /// # Best Practices
177 /// - Use lowercase snake_case names
178 /// - Make names descriptive but concise
179 /// - Ensure they are globally unique across your application
180 const IDENTIFIER: &'static str;
181
182 /// Returns a reusable registration value for this task handler.
183 ///
184 /// This is equivalent to [`JobDefinition::of`] and is useful when
185 /// a module wants to expose all of its jobs as a collection.
186 fn definition() -> JobDefinition
187 where
188 Self: Sized,
189 {
190 JobDefinition::of::<Self>()
191 }
192
193 /// Execute the task logic.
194 ///
195 /// This method is called when a job of this task type is processed by the worker.
196 /// It contains the actual business logic for processing the job.
197 ///
198 /// # Arguments
199 /// * `self` - The task payload, deserialized from the job's JSON payload
200 /// * `ctx` - Worker context providing access to job metadata and extensions
201 ///
202 /// # Returns
203 /// An async result that converts to a TaskHandlerResult, indicating success or failure.
204 /// - Return `Ok(())` or just `()` for success
205 /// - Return `Err(error)` for failure, which will trigger retries
206 fn run(
207 self,
208 ctx: WorkerContext,
209 ) -> impl Future<Output = impl IntoTaskHandlerResult> + Send + 'static;
210}
211
212/// Result returned by a batch task handler.
213///
214/// `ItemResults` must have the same length and order as the input batch. Failed
215/// item positions are retried with the corresponding original payload values;
216/// successful item positions are removed from the retried job payload.
217#[derive(Debug, Clone, PartialEq)]
218pub enum BatchTaskResult<E> {
219 Complete,
220 FailAll(E),
221 ItemResults(Vec<Result<(), E>>),
222}
223
224/// Trait for converting batch task return types into a standardized result.
225pub trait IntoBatchTaskHandlerResult {
226 fn into_batch_task_handler_result(self) -> BatchTaskResult<impl Debug>;
227}
228
229impl IntoBatchTaskHandlerResult for () {
230 fn into_batch_task_handler_result(self) -> BatchTaskResult<impl Debug> {
231 BatchTaskResult::<()>::Complete
232 }
233}
234
235impl<D: Debug> IntoBatchTaskHandlerResult for Result<(), D> {
236 fn into_batch_task_handler_result(self) -> BatchTaskResult<impl Debug> {
237 match self {
238 Ok(()) => BatchTaskResult::Complete,
239 Err(error) => BatchTaskResult::FailAll(error),
240 }
241 }
242}
243
244impl<D: Debug> IntoBatchTaskHandlerResult for Vec<Result<(), D>> {
245 fn into_batch_task_handler_result(self) -> BatchTaskResult<impl Debug> {
246 BatchTaskResult::ItemResults(self)
247 }
248}
249
250impl<D: Debug> IntoBatchTaskHandlerResult for BatchTaskResult<D> {
251 fn into_batch_task_handler_result(self) -> BatchTaskResult<impl Debug> {
252 self
253 }
254}
255
256/// Core trait for defining batch task handlers.
257///
258/// Implement this when a single database job should contain an array of item
259/// payloads and the worker should retry only the failed items after partial
260/// success. `Self` is the item payload type, not `Vec<Self>`.
261pub trait BatchTaskHandler: Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static {
262 /// Unique identifier for this batch task type.
263 const IDENTIFIER: &'static str;
264
265 /// Returns a reusable registration value for this batch task handler.
266 fn definition() -> JobDefinition
267 where
268 Self: Sized,
269 {
270 JobDefinition::of_batch::<Self>()
271 }
272
273 /// Execute a batch of item payloads.
274 fn run_batch(
275 items: Vec<Self>,
276 ctx: WorkerContext,
277 ) -> impl Future<Output = impl IntoBatchTaskHandlerResult> + Send + 'static;
278}
279
280async fn run_task_from_worker_ctx_outcome<T: TaskHandler>(
281 worker_context: WorkerContext,
282) -> TaskHandlerOutcome {
283 match run_task_from_worker_ctx::<T>(worker_context).await {
284 Ok(()) => TaskHandlerOutcome::Complete,
285 Err(error) => TaskHandlerOutcome::failed(error),
286 }
287}
288
289pub async fn run_batch_task_from_worker_ctx<T: BatchTaskHandler>(
290 worker_context: WorkerContext,
291) -> TaskHandlerOutcome {
292 let original_payload = worker_context.payload().clone();
293 let item_payloads = match original_payload.as_array() {
294 Some(items) => items.clone(),
295 None => {
296 return TaskHandlerOutcome::failed("batch job payload must be a JSON array");
297 }
298 };
299
300 let items = match Vec::<T>::deserialize(&original_payload) {
301 Ok(items) => items,
302 Err(error) => return TaskHandlerOutcome::failed(format!("{error:?}")),
303 };
304
305 let item_count = items.len();
306 let result = T::run_batch(items, worker_context)
307 .await
308 .into_batch_task_handler_result();
309
310 batch_result_to_task_outcome(result, item_count, item_payloads)
311}
312
313fn batch_result_to_task_outcome<E: Debug>(
314 result: BatchTaskResult<E>,
315 item_count: usize,
316 item_payloads: Vec<Value>,
317) -> TaskHandlerOutcome {
318 match result {
319 BatchTaskResult::Complete => TaskHandlerOutcome::Complete,
320 BatchTaskResult::FailAll(error) => TaskHandlerOutcome::failed(format!("{error:?}")),
321 BatchTaskResult::ItemResults(results) => {
322 if results.len() != item_count {
323 return TaskHandlerOutcome::failed(format!(
324 "batch handler returned {} results for {item_count} payload items",
325 results.len()
326 ));
327 }
328
329 let mut failed_items = Vec::new();
330 let mut errors = Vec::new();
331
332 for (index, result) in results.into_iter().enumerate() {
333 let Err(error) = result else {
334 continue;
335 };
336
337 failed_items.push(item_payloads[index].clone());
338 errors.push(format!("{index}: {error:?}"));
339 }
340
341 if failed_items.is_empty() {
342 return TaskHandlerOutcome::Complete;
343 }
344
345 TaskHandlerOutcome::failed_with_replacement(
346 format!(
347 "{} batch item(s) failed: {}",
348 failed_items.len(),
349 errors.join(", ")
350 ),
351 Value::Array(failed_items),
352 )
353 }
354 }
355}
356
357/// Internal function to execute a task handler from a worker context.
358///
359/// This function:
360/// 1. Deserializes the job payload into the specified task handler type
361/// 2. Calls the task handler's run method
362/// 3. Processes the result
363///
364/// # Arguments
365/// * `worker_context` - The context containing job information and payload
366///
367/// # Returns
368/// * `Result<(), String>` - Ok(()) on success, Err with error message on failure
369///
370/// # Type Parameters
371/// * `T` - The task handler type to deserialize and execute
372pub async fn run_task_from_worker_ctx<T: TaskHandler>(
373 worker_context: WorkerContext,
374) -> Result<(), String> {
375 // Deserialize the job payload into the task handler type
376 let job = T::deserialize(worker_context.payload());
377 let Ok(job) = job else {
378 let e = job.err().unwrap();
379 return Err(format!("{e:?}"));
380 };
381
382 // Execute the task and convert the result
383 job.run(worker_context)
384 .await
385 .into_task_handler_result()
386 .map_err(|e| format!("{e:?}"))
387}