aide_de_camp/core/job_processor.rs
1use crate::core::Xid;
2use async_trait::async_trait;
3use std::convert::Infallible;
4use thiserror::Error;
5use tokio_util::sync::CancellationToken;
6
7/// A job-handler interface. Your Payload should implement `bincode::{Decode, Encode}` if you're
8/// planning to use it with the runner and Queue from this crate.
9///
10/// ## Example
11/// ```rust
12/// use aide_de_camp::prelude::{JobProcessor, Encode, Decode, Xid, CancellationToken};
13/// use async_trait::async_trait;
14/// struct MyJob;
15///
16/// impl MyJob {
17/// async fn do_work(&self) -> anyhow::Result<()> {
18/// // ..do some work
19/// Ok(())
20/// }
21/// }
22///
23/// #[derive(Encode, Decode)]
24/// struct MyJobPayload(u8, String);
25///
26/// #[async_trait::async_trait]
27/// impl JobProcessor for MyJob {
28/// type Payload = MyJobPayload;
29/// type Error = anyhow::Error;
30///
31/// fn name() -> &'static str {
32/// "my_job"
33/// }
34///
35/// async fn handle(&self, jid: Xid, payload: Self::Payload, cancellation_token: CancellationToken) -> Result<(), Self::Error> {
36/// tokio::select! {
37/// result = self.do_work() => { result }
38/// _ = cancellation_token.cancelled() => { Ok(()) }
39/// }
40/// }
41/// }
42/// ```
43/// ## Services
44/// If your job processor requires external services (e.g. a database client or a REST client), add
45/// them directly as fields of your struct.
46#[async_trait]
47pub trait JobProcessor: Send + Sync {
48 /// What is the input to this handler. If you want to use `RunnerRouter`, then this must implement `bincode::Decode` and `bincode::Encode`.
49 type Payload: Send;
50 /// What error is returned
51 type Error: Send;
52 /// Run the job, passing payload to it. Your payload should implement `bincode::Decode`.
53 /// You should listen for the `cancellation_token.cancelled()` event in order to handle shutdown requests gracefully.
54 async fn handle(
55 &self,
56 jid: Xid,
57 payload: Self::Payload,
58 cancellation_token: CancellationToken,
59 ) -> Result<(), Self::Error>;
60
61 /// How many times job should be retried before being moved to dead queue
62 fn max_retries(&self) -> u32 {
63 0
64 }
65
66 /// How long to wait before forcefully terminating the job when the server receives a shutdown request.
67 fn shutdown_timeout(&self) -> std::time::Duration {
68 std::time::Duration::from_secs(1)
69 }
70
71 /// Job type, used to differentiate between different jobs in the queue.
72 fn name() -> &'static str
73 where
74 Self: Sized;
75}
76
77/// Error types returned by job processor that wraps your job processor.
78#[derive(Error, Debug)]
79pub enum JobError {
80 /// Encountered an error when tried to deserialize Context.
81 #[error("Failed to deserialize job context")]
82 DecodeError {
83 #[from]
84 source: bincode::error::DecodeError,
85 },
86
87 #[error("Job failed to complete within the shutdown timeout of {0:#?}")]
88 ShutdownTimeout(std::time::Duration),
89
90 /// Error originated in inner-job implementation
91 #[error(transparent)]
92 Other(#[from] anyhow::Error),
93}
94
95impl From<Infallible> for JobError {
96 fn from(_: Infallible) -> Self {
97 unreachable!();
98 }
99}