use async_trait::async_trait;
use es_entity::clock::ClockHandle;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
pub use job::Jobs;
use job::{
CurrentJob, Job, JobCompletion, JobInitializer, JobRunner, JobSpawner, JobType, RetrySettings,
};
use super::{InboxEvent, InboxEventId, InboxEventStatus};
use crate::tables::MailboxTables;
pub enum InboxResult {
Complete,
ReprocessNow,
ReprocessIn(std::time::Duration),
}
pub trait InboxHandler: Send + Sync + 'static {
fn handle(
&self,
event: &InboxEvent,
) -> impl std::future::Future<
Output = Result<InboxResult, Box<dyn std::error::Error + Send + Sync>>,
> + Send;
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InboxJobData<Tables> {
pub inbox_event_id: InboxEventId,
#[serde(skip)]
pub(super) _phantom: std::marker::PhantomData<Tables>,
}
pub(super) struct InboxJobInitializer<H, Tables>
where
H: InboxHandler,
Tables: MailboxTables,
{
pool: sqlx::PgPool,
handler: Arc<H>,
job_type: JobType,
retry_settings: RetrySettings,
clock: ClockHandle,
_phantom: std::marker::PhantomData<Tables>,
}
impl<H, Tables> InboxJobInitializer<H, Tables>
where
H: InboxHandler,
Tables: MailboxTables,
{
pub fn new(
pool: &sqlx::PgPool,
handler: H,
job_type: JobType,
retry_settings: RetrySettings,
clock: ClockHandle,
) -> Self {
Self {
pool: pool.clone(),
handler: Arc::new(handler),
job_type,
retry_settings,
clock,
_phantom: std::marker::PhantomData,
}
}
}
impl<H, Tables> JobInitializer for InboxJobInitializer<H, Tables>
where
H: InboxHandler,
Tables: MailboxTables,
{
type Config = InboxJobData<Tables>;
fn job_type(&self) -> JobType {
self.job_type.clone()
}
fn retry_on_error_settings(&self) -> RetrySettings {
self.retry_settings.clone()
}
fn init(
&self,
job: &Job,
_: JobSpawner<Self::Config>,
) -> Result<Box<dyn JobRunner>, Box<dyn std::error::Error>> {
let config: InboxJobData<Tables> = job.config()?;
Ok(Box::new(InboxJobRunner::<H, Tables> {
pool: self.pool.clone(),
handler: self.handler.clone(),
inbox_event_id: config.inbox_event_id,
clock: self.clock.clone(),
_phantom: std::marker::PhantomData,
}))
}
}
struct InboxJobRunner<H, Tables>
where
H: InboxHandler,
Tables: MailboxTables,
{
pool: sqlx::PgPool,
handler: Arc<H>,
inbox_event_id: InboxEventId,
clock: ClockHandle,
_phantom: std::marker::PhantomData<Tables>,
}
#[async_trait]
impl<H, Tables> JobRunner for InboxJobRunner<H, Tables>
where
H: InboxHandler,
Tables: MailboxTables,
{
async fn run(
&self,
mut current_job: CurrentJob,
) -> Result<JobCompletion, Box<dyn std::error::Error>> {
if current_job.is_shutdown_requested() {
return Ok(JobCompletion::RescheduleNow);
}
let now = self.clock.manual_now();
Tables::update_inbox_event_status(
&self.pool,
now,
self.inbox_event_id,
InboxEventStatus::Processing,
None,
)
.await
.map_err(|e| Box::new(e) as Box<dyn std::error::Error>)?;
let event = Tables::find_inbox_event_by_id(&self.pool, self.inbox_event_id)
.await
.map_err(|e| Box::new(e) as Box<dyn std::error::Error>)?;
let result = self.handler.handle(&event).await;
match result {
Ok(InboxResult::Complete) => {
Tables::update_inbox_event_status(
&self.pool,
now,
self.inbox_event_id,
InboxEventStatus::Completed,
None,
)
.await
.map_err(|e| Box::new(e) as Box<dyn std::error::Error>)?;
Ok(JobCompletion::Complete)
}
Ok(InboxResult::ReprocessNow) => {
Tables::update_inbox_event_status(
&self.pool,
now,
self.inbox_event_id,
InboxEventStatus::Pending,
None,
)
.await
.map_err(|e| Box::new(e) as Box<dyn std::error::Error>)?;
Ok(JobCompletion::RescheduleNow)
}
Ok(InboxResult::ReprocessIn(duration)) => {
Tables::update_inbox_event_status(
&self.pool,
now,
self.inbox_event_id,
InboxEventStatus::Pending,
None,
)
.await
.map_err(|e| Box::new(e) as Box<dyn std::error::Error>)?;
Ok(JobCompletion::RescheduleIn(duration))
}
Err(e) => {
Tables::update_inbox_event_status(
&self.pool,
now,
self.inbox_event_id,
InboxEventStatus::Failed,
Some(&e.to_string()),
)
.await
.map_err(|e| Box::new(e) as Box<dyn std::error::Error>)?;
Err(e)
}
}
}
}
pub(super) type InboxJobSpawner<Tables> = JobSpawner<InboxJobData<Tables>>;