aide_de_camp/runner/
job_router.rs1use super::wrapped_job::{BoxedJobHandler, WrappedJobHandler};
2use crate::core::job_handle::JobHandle;
3use crate::core::job_processor::{JobError, JobProcessor};
4use crate::core::queue::{Queue, QueueError};
5use bincode::{self, Decode, Encode};
6use chrono::Duration;
7use std::collections::HashMap;
8use thiserror::Error;
9use tokio_util::sync::CancellationToken;
10use tracing::instrument;
11
12#[derive(Default)]
56pub struct RunnerRouter {
57 jobs: HashMap<&'static str, BoxedJobHandler>,
58}
59
60impl RunnerRouter {
61 pub fn add_job_handler<J>(&mut self, job: J)
63 where
64 J: JobProcessor + 'static,
65 J::Payload: Decode + Encode,
66 J::Error: Into<JobError>,
67 {
68 let name = J::name();
69 let boxed = WrappedJobHandler::new(job).boxed();
70 self.jobs.entry(name).or_insert(boxed);
71 }
72
73 pub fn types(&self) -> Vec<&'static str> {
74 self.jobs.keys().copied().collect()
75 }
76
77 #[instrument(skip_all, err, fields(job_type = %job_handle.job_type(), jid = %job_handle.id().to_string(), retries = job_handle.retries()))]
81 pub async fn process<H: JobHandle>(
82 &self,
83 job_handle: H,
84 cancellation_token: CancellationToken,
85 ) -> Result<(), RunnerError> {
86 if let Some(r) = self.jobs.get(job_handle.job_type()) {
87 let job_shutdown_timeout = r.shutdown_timeout();
88
89 let job_result = tokio::select! {
90 job_result = r.handle(job_handle.id(), job_handle.payload(), cancellation_token.child_token()) => {
91 job_result
92 }
93 cancellation_result = cancellation_handler(job_shutdown_timeout, cancellation_token.child_token()) => {
94 cancellation_result
95 }
96 };
97 handle_job_result(
98 job_result,
99 job_handle,
100 r.max_retries(),
101 cancellation_token.child_token(),
102 )
103 .await
104 } else {
105 Err(RunnerError::UnknownJobType(
106 job_handle.job_type().to_string(),
107 ))
108 }
109 }
110
111 pub async fn listen<Q, QR>(
115 &self,
116 queue: Q,
117 poll_interval: Duration,
118 cancellation_token: CancellationToken,
119 ) where
120 Q: AsRef<QR>,
121 QR: Queue,
122 {
123 let job_types = self.types();
124 loop {
125 tokio::select! {
126 next = queue.as_ref().next(&job_types, poll_interval) => {
127 let cancellation_token = cancellation_token.child_token();
128 self.handle_next_job::<QR>(next, cancellation_token).await;
129 }
130 _ = cancellation_token.cancelled() => {
131 tracing::debug!("Shutdown request received, stopping listener");
133 return
134 }
135 }
136 }
137 }
138
139 async fn handle_next_job<QR>(
140 &self,
141 next: Result<QR::JobHandle, QueueError>,
142 cancellation_token: CancellationToken,
143 ) where
144 QR: Queue,
145 {
146 match next {
147 Ok(handle) => {
148 match self.process(handle, cancellation_token.child_token()).await {
149 Ok(_) => {}
150 Err(RunnerError::QueueError(e)) => handle_queue_error(e).await,
151 Err(RunnerError::UnknownJobType(name)) => {
152 tracing::error!("Unknown job type: {}", name)
153 }
154 };
155 }
156 Err(e) => {
157 handle_queue_error(e).await;
158 }
159 }
160 }
161}
162
163#[derive(Error, Debug)]
165pub enum RunnerError {
166 #[error("Runner is not configured to run this job type: {0}")]
167 UnknownJobType(String),
168 #[error(transparent)]
169 QueueError(#[from] QueueError),
170}
171
172async fn cancellation_handler(
173 job_shutdown_timeout: std::time::Duration,
174 cancellation_token: CancellationToken,
175) -> Result<(), JobError> {
176 cancellation_token.cancelled().await;
177 tokio::time::sleep(job_shutdown_timeout).await;
180 Err(JobError::ShutdownTimeout(job_shutdown_timeout))
181}
182
183async fn handle_job_result<H: JobHandle>(
184 job_result: Result<(), JobError>,
185 job_handle: H,
186 max_retries: u32,
187 cancellation_token: CancellationToken,
188) -> Result<(), RunnerError> {
189 if cancellation_token.is_cancelled() {
190 tracing::info!("Cancellation was requested during job processing");
191 }
192 match job_result.map_err(JobError::from) {
193 Ok(_) => {
194 job_handle.complete().await?;
195 Ok(())
196 }
197 Err(e) => {
198 tracing::error!("Error during job processing: {}", e);
199 if job_handle.retries() >= max_retries {
200 tracing::warn!("Moving job {} to dead queue", job_handle.id().to_string());
201 job_handle.dead_queue().await?;
202 Ok(())
203 } else {
204 job_handle.fail().await?;
205 Ok(())
206 }
207 }
208 }
209}
210
211async fn handle_queue_error(error: QueueError) {
212 tracing::error!("Encountered QueueError: {}", error);
213 tracing::warn!("Suspending worker for 5 seconds");
214 tokio::time::sleep(std::time::Duration::from_secs(5)).await;
215}
216
#[cfg(test)]
mod test {
    use super::*;
    use crate::core::Xid;
    use bincode::config::standard;
    use std::convert::Infallible;

    /// Checks that a `JobProcessor` can be used as a trait object, both
    /// directly and after being wrapped in a `WrappedJobHandler`.
    #[tokio::test]
    async fn it_is_object_safe_and_wrappable() {
        struct Example;

        #[async_trait::async_trait]
        impl JobProcessor for Example {
            type Payload = Vec<i32>;
            type Error = Infallible;

            async fn handle(
                &self,
                _jid: Xid,
                _payload: Self::Payload,
                _cancellation_token: CancellationToken,
            ) -> Result<(), Infallible> {
                dbg!("we did it patrick");
                Ok(())
            }

            fn name() -> &'static str {
                "example"
            }
        }

        let numbers = vec![1, 2, 3];

        // Plain processor behind a trait object, called with the typed payload.
        let plain: Box<dyn JobProcessor<Payload = _, Error = _>> = Box::new(Example);
        plain
            .handle(xid::new(), numbers.clone(), CancellationToken::new())
            .await
            .unwrap();

        // Wrapped processor behind a trait object, called with the
        // bincode-encoded payload.
        let wrapped: Box<dyn JobProcessor<Payload = _, Error = JobError>> =
            Box::new(WrappedJobHandler::new(Example));
        let encoded = bincode::encode_to_vec(&numbers, standard()).unwrap();
        wrapped
            .handle(xid::new(), encoded.into(), CancellationToken::new())
            .await
            .unwrap();
    }
}