aide_de_camp/runner/
job_router.rs

1use super::wrapped_job::{BoxedJobHandler, WrappedJobHandler};
2use crate::core::job_handle::JobHandle;
3use crate::core::job_processor::{JobError, JobProcessor};
4use crate::core::queue::{Queue, QueueError};
5use bincode::{self, Decode, Encode};
6use chrono::Duration;
7use std::collections::HashMap;
8use thiserror::Error;
9use tokio_util::sync::CancellationToken;
10use tracing::instrument;
11
12/// A job processor router. Matches job type to job processor implementation.
13/// This type requires that your jobs implement `Encode` + `Decode` from bincode trait. Those traits are re-exported in prelude.
14///
15/// ## Example
16/// ```rust
17/// use aide_de_camp::prelude::{JobProcessor, RunnerRouter, Encode, Decode, Xid, CancellationToken};
18/// use async_trait::async_trait;
19/// struct MyJob;
20///
21/// impl MyJob {
22///     async fn do_work(&self) -> anyhow::Result<()> {
23///         // ..do some work
24///         Ok(())
25///     }
26/// }
27///
28/// #[derive(Encode, Decode)]
29/// struct MyJobPayload(u8, String);
30///
31/// #[async_trait::async_trait]
32/// impl JobProcessor for MyJob {
33///     type Payload = MyJobPayload;
34///     type Error = anyhow::Error;
35///
36///     fn name() -> &'static str {
37///         "my_job"
38///     }
39///
40///     async fn handle(&self, jid: Xid, payload: Self::Payload, cancellation_token: CancellationToken) -> Result<(), Self::Error> {
41///         tokio::select! {
42///             result = self.do_work() => { result }
43///             _ = cancellation_token.cancelled() => { Ok(()) }
44///         }
45///     }
46/// }
47///
48/// let router = {
49///     let mut r = RunnerRouter::default();
50///     r.add_job_handler(MyJob);
51///     r
52/// };
53///
54///```
55#[derive(Default)]
56pub struct RunnerRouter {
57    jobs: HashMap<&'static str, BoxedJobHandler>,
58}
59
60impl RunnerRouter {
61    /// Register a job handler with the router. If job by that name already present, it will get replaced.
62    pub fn add_job_handler<J>(&mut self, job: J)
63    where
64        J: JobProcessor + 'static,
65        J::Payload: Decode + Encode,
66        J::Error: Into<JobError>,
67    {
68        let name = J::name();
69        let boxed = WrappedJobHandler::new(job).boxed();
70        self.jobs.entry(name).or_insert(boxed);
71    }
72
73    pub fn types(&self) -> Vec<&'static str> {
74        self.jobs.keys().copied().collect()
75    }
76
77    /// Process job handle. This function reposible for job lifecycle. If you're implementing your
78    /// own job runner, then this is what you should use to process job that is already pulled
79    /// from the queue. In all other cases, you shouldn't use this function directly.
80    #[instrument(skip_all, err, fields(job_type = %job_handle.job_type(), jid = %job_handle.id().to_string(), retries = job_handle.retries()))]
81    pub async fn process<H: JobHandle>(
82        &self,
83        job_handle: H,
84        cancellation_token: CancellationToken,
85    ) -> Result<(), RunnerError> {
86        if let Some(r) = self.jobs.get(job_handle.job_type()) {
87            let job_shutdown_timeout = r.shutdown_timeout();
88
89            let job_result = tokio::select! {
90                job_result = r.handle(job_handle.id(), job_handle.payload(), cancellation_token.child_token()) => {
91                    job_result
92                }
93                cancellation_result = cancellation_handler(job_shutdown_timeout, cancellation_token.child_token()) => {
94                    cancellation_result
95                }
96            };
97            handle_job_result(
98                job_result,
99                job_handle,
100                r.max_retries(),
101                cancellation_token.child_token(),
102            )
103            .await
104        } else {
105            Err(RunnerError::UnknownJobType(
106                job_handle.job_type().to_string(),
107            ))
108        }
109    }
110
111    /// In a loop, poll the queue with interval (passes interval to `Queue::next`) and process
112    /// incoming jobs. Function process jobs one-by-one without job-level concurrency. If you need
113    /// concurrency, look at the `JobRunner` instead.
114    pub async fn listen<Q, QR>(
115        &self,
116        queue: Q,
117        poll_interval: Duration,
118        cancellation_token: CancellationToken,
119    ) where
120        Q: AsRef<QR>,
121        QR: Queue,
122    {
123        let job_types = self.types();
124        loop {
125            tokio::select! {
126                next = queue.as_ref().next(&job_types, poll_interval) => {
127                    let cancellation_token = cancellation_token.child_token();
128                    self.handle_next_job::<QR>(next, cancellation_token).await;
129                }
130                _ = cancellation_token.cancelled() => {
131                    // If cancellation is requested while a job is processing, this block will execute on the next iteration.
132                    tracing::debug!("Shutdown request received, stopping listener");
133                    return
134                }
135            }
136        }
137    }
138
139    async fn handle_next_job<QR>(
140        &self,
141        next: Result<QR::JobHandle, QueueError>,
142        cancellation_token: CancellationToken,
143    ) where
144        QR: Queue,
145    {
146        match next {
147            Ok(handle) => {
148                match self.process(handle, cancellation_token.child_token()).await {
149                    Ok(_) => {}
150                    Err(RunnerError::QueueError(e)) => handle_queue_error(e).await,
151                    Err(RunnerError::UnknownJobType(name)) => {
152                        tracing::error!("Unknown job type: {}", name)
153                    }
154                };
155            }
156            Err(e) => {
157                handle_queue_error(e).await;
158            }
159        }
160    }
161}
162
163/// Errors returned by the router.
164#[derive(Error, Debug)]
165pub enum RunnerError {
166    #[error("Runner is not configured to run this job type: {0}")]
167    UnknownJobType(String),
168    #[error(transparent)]
169    QueueError(#[from] QueueError),
170}
171
172async fn cancellation_handler(
173    job_shutdown_timeout: std::time::Duration,
174    cancellation_token: CancellationToken,
175) -> Result<(), JobError> {
176    cancellation_token.cancelled().await;
177    // Wait for the duration of the shutdown timeout specified by the job configuration.
178    // The job should complete within the timeout period, preventing this method from completing.
179    tokio::time::sleep(job_shutdown_timeout).await;
180    Err(JobError::ShutdownTimeout(job_shutdown_timeout))
181}
182
183async fn handle_job_result<H: JobHandle>(
184    job_result: Result<(), JobError>,
185    job_handle: H,
186    max_retries: u32,
187    cancellation_token: CancellationToken,
188) -> Result<(), RunnerError> {
189    if cancellation_token.is_cancelled() {
190        tracing::info!("Cancellation was requested during job processing");
191    }
192    match job_result.map_err(JobError::from) {
193        Ok(_) => {
194            job_handle.complete().await?;
195            Ok(())
196        }
197        Err(e) => {
198            tracing::error!("Error during job processing: {}", e);
199            if job_handle.retries() >= max_retries {
200                tracing::warn!("Moving job {} to dead queue", job_handle.id().to_string());
201                job_handle.dead_queue().await?;
202                Ok(())
203            } else {
204                job_handle.fail().await?;
205                Ok(())
206            }
207        }
208    }
209}
210
211async fn handle_queue_error(error: QueueError) {
212    tracing::error!("Encountered QueueError: {}", error);
213    tracing::warn!("Suspending worker for 5 seconds");
214    tokio::time::sleep(std::time::Duration::from_secs(5)).await;
215}
216
#[cfg(test)]
mod test {
    use super::*;
    use crate::core::Xid;
    use bincode::config::standard;
    use std::convert::Infallible;

    /// A `JobProcessor` must be usable as a trait object, both directly and when wrapped in
    /// `WrappedJobHandler` (which is handed a bincode-encoded payload here).
    #[tokio::test]
    async fn it_is_object_safe_and_wrappable() {
        struct Example;

        #[async_trait::async_trait]
        impl JobProcessor for Example {
            type Payload = Vec<i32>;
            type Error = Infallible;

            fn name() -> &'static str {
                "example"
            }

            async fn handle(
                &self,
                _jid: Xid,
                _payload: Self::Payload,
                _cancellation_token: CancellationToken,
            ) -> Result<(), Infallible> {
                dbg!("we did it patrick");
                Ok(())
            }
        }

        let numbers = vec![1, 2, 3];

        // Call the processor directly through a trait object, with a typed payload.
        let direct: Box<dyn JobProcessor<Payload = _, Error = _>> = Box::new(Example);
        let direct_outcome = direct
            .handle(xid::new(), numbers.clone(), CancellationToken::new())
            .await;
        direct_outcome.unwrap();

        // Call it again through the wrapper, this time with the encoded payload.
        let wrapped: Box<dyn JobProcessor<Payload = _, Error = JobError>> =
            Box::new(WrappedJobHandler::new(Example));
        let encoded = bincode::encode_to_vec(&numbers, standard()).unwrap();
        let wrapped_outcome = wrapped
            .handle(xid::new(), encoded.into(), CancellationToken::new())
            .await;
        wrapped_outcome.unwrap();
    }
}