aide_de_camp/core/
job_processor.rs

1use crate::core::Xid;
2use async_trait::async_trait;
3use std::convert::Infallible;
4use thiserror::Error;
5use tokio_util::sync::CancellationToken;
6
7/// A job-handler interface. Your Payload should implement `bincode::{Decode, Encode}` if you're
8/// planning to use it with the runner and Queue from this crate.
9///
10/// ## Example
11/// ```rust
12/// use aide_de_camp::prelude::{JobProcessor, Encode, Decode, Xid, CancellationToken};
13/// use async_trait::async_trait;
14/// struct MyJob;
15///
16/// impl MyJob {
17///     async fn do_work(&self) -> anyhow::Result<()> {
18///         // ..do some work
19///         Ok(())
20///     }
21/// }
22///
23/// #[derive(Encode, Decode)]
24/// struct MyJobPayload(u8, String);
25///
26/// #[async_trait::async_trait]
27/// impl JobProcessor for MyJob {
28///     type Payload = MyJobPayload;
29///     type Error = anyhow::Error;
30///
31///     fn name() -> &'static str {
32///         "my_job"
33///     }
34///
35///     async fn handle(&self, jid: Xid, payload: Self::Payload, cancellation_token: CancellationToken) -> Result<(), Self::Error> {
36///         tokio::select! {
37///             result = self.do_work() => { result }
38///             _ = cancellation_token.cancelled() => { Ok(()) }
39///         }
40///     }
41/// }
42/// ```
43/// ## Services
44/// If your job processor requires external services (i.e. database client, REST client, etc.), add
45/// them directly as your struct fields.
46#[async_trait]
47pub trait JobProcessor: Send + Sync {
48    /// What is the input to this handler. If you want to use `RunnerRouter`, then this must implement `bincode::Decode` and `bincode::Encode`.
49    type Payload: Send;
50    /// What error is returned
51    type Error: Send;
52    /// Run the job, passing payload to it. Your payload should implement `bincode::Decode`.
53    /// You should listen for the `cancellation_token.cancelled()` event in order to handle shutdown requests gracefully.
54    async fn handle(
55        &self,
56        jid: Xid,
57        payload: Self::Payload,
58        cancellation_token: CancellationToken,
59    ) -> Result<(), Self::Error>;
60
61    /// How many times job should be retried before being moved to dead queue
62    fn max_retries(&self) -> u32 {
63        0
64    }
65
66    /// How long to wait before forcefully terminating the job when the server receives a shutdown request.
67    fn shutdown_timeout(&self) -> std::time::Duration {
68        std::time::Duration::from_secs(1)
69    }
70
71    /// Job type, used to differentiate between different jobs in the queue.
72    fn name() -> &'static str
73    where
74        Self: Sized;
75}
76
77/// Error types returned by job processor that wraps your job processor.
78#[derive(Error, Debug)]
79pub enum JobError {
80    /// Encountered an error when tried to deserialize Context.
81    #[error("Failed to deserialize job context")]
82    DecodeError {
83        #[from]
84        source: bincode::error::DecodeError,
85    },
86
87    #[error("Job failed to complete within the shutdown timeout of {0:#?}")]
88    ShutdownTimeout(std::time::Duration),
89
90    /// Error originated in inner-job implementation
91    #[error(transparent)]
92    Other(#[from] anyhow::Error),
93}
94
95impl From<Infallible> for JobError {
96    fn from(_: Infallible) -> Self {
97        unreachable!();
98    }
99}