use super::wrapped_job::{BoxedJobHandler, WrappedJobHandler};
use crate::core::job_handle::JobHandle;
use crate::core::job_processor::{JobError, JobProcessor};
use crate::core::queue::{Queue, QueueError};
use bincode::{self, Decode, Encode};
use chrono::Duration;
use std::collections::HashMap;
use thiserror::Error;
use tracing::instrument;
#[derive(Default)]
pub struct RunnerRouter {
jobs: HashMap<&'static str, BoxedJobHandler>,
}
impl RunnerRouter {
pub fn add_job_handler<J>(&mut self, job: J)
where
J: JobProcessor + 'static,
J::Payload: Decode + Encode,
J::Error: Into<JobError>,
{
let name = J::name();
let boxed = WrappedJobHandler::new(job).boxed();
self.jobs.entry(name).or_insert(boxed);
}
pub fn types(&self) -> Vec<&'static str> {
self.jobs.keys().copied().collect()
}
#[instrument(skip_all, err, fields(job_type = %job_handle.job_type(), jid = %job_handle.id().to_string(), retries = job_handle.retries()))]
pub async fn process<H: JobHandle>(&self, job_handle: H) -> Result<(), RunnerError> {
if let Some(r) = self.jobs.get(job_handle.job_type()) {
match r
.handle(job_handle.id(), job_handle.payload())
.await
.map_err(JobError::from)
{
Ok(_) => {
job_handle.complete().await?;
Ok(())
}
Err(e) => {
tracing::error!("Error during job processing: {}", e);
if job_handle.retries() >= r.max_retries() {
tracing::warn!("Moving job {} to dead queue", job_handle.id().to_string());
job_handle.dead_queue().await?;
Ok(())
} else {
job_handle.fail().await?;
Ok(())
}
}
}
} else {
Err(RunnerError::UnknownJobType(
job_handle.job_type().to_string(),
))
}
}
pub async fn listen<Q, QR>(&self, queue: Q, poll_interval: Duration)
where
Q: AsRef<QR>,
QR: Queue,
{
let job_types = self.types();
loop {
match queue.as_ref().next(&job_types, poll_interval).await {
Ok(handle) => match self.process(handle).await {
Ok(_) => {}
Err(RunnerError::QueueError(e)) => handle_queue_error(e).await,
Err(RunnerError::UnknownJobType(name)) => {
tracing::error!("Unknown job type: {}", name)
}
},
Err(e) => {
handle_queue_error(e).await;
}
}
}
}
}
#[derive(Error, Debug)]
pub enum RunnerError {
#[error("Runner is not configured to run this job type: {0}")]
UnknownJobType(String),
#[error(transparent)]
QueueError(#[from] QueueError),
}
async fn handle_queue_error(error: QueueError) {
tracing::error!("Encountered QueueError: {}", error);
tracing::warn!("Suspending worker for 5 seconds");
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
#[cfg(test)]
mod test {
use super::*;
use crate::core::Xid;
use bincode::config::standard;
use std::convert::Infallible;
#[tokio::test]
async fn it_is_object_safe_and_wrappable() {
struct Example;
#[async_trait::async_trait]
impl JobProcessor for Example {
type Payload = Vec<i32>;
type Error = Infallible;
async fn handle(&self, _jid: Xid, _payload: Self::Payload) -> Result<(), Infallible> {
dbg!("we did it patrick");
Ok(())
}
fn name() -> &'static str {
"example"
}
}
let payload = vec![1, 2, 3];
let job: Box<dyn JobProcessor<Payload = _, Error = _>> = Box::new(Example);
job.handle(xid::new(), payload.clone()).await.unwrap();
let wrapped: Box<dyn JobProcessor<Payload = _, Error = JobError>> =
Box::new(WrappedJobHandler::new(Example));
let payload = bincode::encode_to_vec(&payload, standard()).unwrap();
wrapped.handle(xid::new(), payload.into()).await.unwrap();
}
}