use crate::core::job_processor::{JobError, JobProcessor};
use crate::core::Xid;
use async_trait::async_trait;
use bincode::{config::Configuration, Decode, Encode};
use bytes::Bytes;
use tracing::instrument;
pub type BoxedJobHandler = Box<dyn JobProcessor<Payload = Bytes, Error = JobError>>;
pub struct WrappedJobHandler<T: JobProcessor> {
job: T,
config: Configuration,
}
impl<J> WrappedJobHandler<J>
where
J: JobProcessor + 'static,
J::Payload: Decode + Encode,
J::Error: Into<JobError>,
{
pub fn new(job: J) -> Self {
let config = bincode::config::standard();
Self { job, config }
}
pub fn boxed(self) -> BoxedJobHandler {
Box::new(self) as BoxedJobHandler
}
}
#[async_trait]
impl<J> JobProcessor for WrappedJobHandler<J>
where
J: JobProcessor + 'static,
J::Payload: Decode + Encode,
J::Error: Into<JobError>,
{
type Payload = Bytes;
type Error = JobError;
#[instrument(skip_all, err, fields(jid = %jid.to_string(), job_type = %Self::name()))]
async fn handle(&self, jid: Xid, payload: Self::Payload) -> Result<(), Self::Error> {
let (payload, _) = bincode::decode_from_slice(payload.as_ref(), self.config)?;
self.job.handle(jid, payload).await.map_err(Into::into)
}
fn name() -> &'static str {
J::name()
}
}
impl<J> From<J> for WrappedJobHandler<J>
where
J: JobProcessor + 'static,
J::Payload: Decode + Encode,
J::Error: Into<JobError>,
{
fn from(job: J) -> Self {
Self::new(job)
}
}