use crate::core::JobParameter;
use crate::models::{AmqpCommand, Job, RecurringJob};
use crate::models::{DelayedStage, EnqueuedStage, JobConfig, Stage, WaitingStage};
use crate::mq::MqClient;
use crate::persist::Persist;
use crate::stats::{Event, EventsHandler, NoOpStats, Stats};
use crate::storage::Storage;
use crate::{encoder, RecurringJobId};
use crate::{metrics, BackgroundJobServerPublisher, JobId, UtcDateTime};
use anyhow::Context;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
impl BackgroundJobServerPublisher {
pub async fn new(
id: String,
mq_client: Arc<Box<dyn MqClient>>,
storage: Box<dyn Storage>,
) -> anyhow::Result<Self> {
let routing_key = format!("later-{}", id);
let publisher = mq_client.new_publisher(&routing_key).await?;
let persist = Arc::new(Persist::new(storage, routing_key.clone()));
let stats: Box<dyn EventsHandler> = {
if cfg!(feature = "dashboard") {
tracing::info!("Enabling Dashboard Stats Collector");
Box::new(
Stats::new(&routing_key, persist.clone(), &mq_client.clone())
.await
.expect("configure stats"),
)
} else {
Box::new(NoOpStats {})
}
};
Ok(Self {
storage: persist.clone(),
stats,
publisher,
routing_key,
})
}
pub async fn ensure_worker_ready(&self) -> anyhow::Result<()> {
Ok(self.publisher.ensure_consumer().await?)
}
pub fn get_metrics(&self) -> anyhow::Result<String> {
metrics::COUNTER.output()
}
pub async fn enqueue_continue(
&self,
parent_job_id: JobId,
message: impl JobParameter,
) -> anyhow::Result<JobId> {
self.enqueue_internal(message, Some(parent_job_id), None, None)
.await
}
pub async fn enqueue_delayed(
&self,
message: impl JobParameter,
delay: std::time::Duration,
) -> anyhow::Result<JobId> {
let enqueue_time = chrono::Utc::now()
.checked_add_signed(chrono::Duration::from_std(delay)?)
.ok_or(anyhow::anyhow!("Error calculating enqueue time"))?;
self.enqueue_delayed_at(message, enqueue_time).await
}
pub async fn enqueue_delayed_at(
&self,
message: impl JobParameter,
time: chrono::DateTime<chrono::Utc>,
) -> anyhow::Result<JobId> {
if time <= chrono::Utc::now() {
return Err(anyhow::anyhow!("Time must be in the future"));
}
self.enqueue_internal(message, None, Some(time), None).await
}
pub async fn enqueue(&self, message: impl JobParameter) -> anyhow::Result<JobId> {
self.enqueue_internal(message, None, None, None).await
}
#[tracing::instrument(skip(self, message), fields(ptype = message.get_ptype(), job_id), name = "init_create_job")]
async fn enqueue_internal(
&self,
message: impl JobParameter,
parent_job_id: Option<JobId>,
delay_until: Option<UtcDateTime>,
recurring_job_id: Option<RecurringJobId>,
) -> anyhow::Result<JobId> {
let job = create_job(message, parent_job_id, delay_until, recurring_job_id)?;
tracing::Span::current().record("job_id", job.id.to_string());
Ok(self.enqueue_internal_job(job).await?)
}
pub(crate) async fn enqueue_internal_job(&self, job: Job) -> Result<JobId, anyhow::Error> {
let id = job.id.clone();
self.save(&job).await?;
Event::NewJob((&job).into()).publish(&self).await;
self.handle_job_enqueue_initial(job).await?;
Ok(id)
}
pub async fn enqueue_recurring(
&self,
identifier: String,
message: impl JobParameter,
cron: String,
) -> anyhow::Result<JobId> {
let _ = cron::Schedule::from_str(&cron).context("error parsing cron expression")?;
let recurring_job = RecurringJob {
id: RecurringJobId(identifier),
payload_type: message.get_ptype(),
payload: message
.to_bytes()
.context("Unable to serialize the message to bytes")?,
cron_schedule: cron,
date_added: chrono::Utc::now(),
config: JobConfig::default(),
};
self.storage.save_recurring_job(&recurring_job).await?;
let first_job = recurring_job.try_into()?;
let id = self.enqueue_internal_job(first_job).await?;
Ok(id)
}
pub(crate) async fn save(&self, job: &Job) -> anyhow::Result<()> {
if job.stage.is_polling_required() {
self.storage.save_job_id(&job.id, &job.stage).await?;
}
if let Stage::Waiting(w) = &job.stage {
self.storage
.save_continuation(&job.id, w.parent_id.clone())
.await?;
}
self.storage.save_job(job).await
}
pub(crate) async fn expire(&self, job: &Job, _duration: Duration) -> anyhow::Result<()> {
self.storage.expire(job.id.clone()).await
}
#[async_recursion::async_recursion]
pub(crate) async fn handle_job_enqueue_initial(&self, job: Job) -> anyhow::Result<()> {
tracing::debug!(
"handle_job_enqueue_initial: Id: {}, Stage: {:?}",
&job.id,
&job.stage
);
match &job.stage {
Stage::Delayed(delayed) => {
if delayed.is_time() {
let job = job.transition(); self.save(&job).await?;
self.handle_job_enqueue_initial(job).await?;
}
}
Stage::Waiting(waiting) => {
if let Some(parent_job) = self.storage.get_job(waiting.parent_id.clone()).await {
if !parent_job.stage.is_success() {
return Ok(());
}
tracing::info!(
"Parent job {} is already completed, enqueuing this job immediately",
parent_job.id
);
}
let job = job.transition();
self.save(&job).await?;
self.handle_job_enqueue_initial(job).await?;
}
Stage::Enqueued(_) => {
tracing::debug!("Enqueue job {}", job.id);
self.publish_amqp_command(AmqpCommand::ExecuteJob(job.into()))
.await?
}
Stage::Running(_) | Stage::Requeued(_) | Stage::Success(_) | Stage::Failed(_) => {
tracing::warn!("Invalid job here {}, Stage {:?}", job.id, &job.stage);
}
}
Ok(())
}
pub(crate) async fn publish_amqp_command(&self, cmd: AmqpCommand) -> anyhow::Result<()> {
let message_bytes = encoder::encode(&cmd)?;
self.publisher.publish(&message_bytes).await?;
Ok(())
}
}
fn create_job(
message: impl JobParameter,
parent_job_id: Option<JobId>,
delay_until: Option<chrono::DateTime<chrono::Utc>>,
recurring_job_id: Option<RecurringJobId>,
) -> anyhow::Result<Job> {
let id = crate::generate_id();
let job = Job {
id: JobId(id.clone()),
payload_type: message.get_ptype(),
payload: message
.to_bytes()
.context("Unable to serialize the message to bytes")?,
stage: {
if let Some(parent_job_id) = parent_job_id {
Stage::Waiting(WaitingStage {
date: chrono::Utc::now(),
parent_id: parent_job_id,
})
} else if let Some(delay_until) = delay_until {
Stage::Delayed(DelayedStage {
date: chrono::Utc::now(),
not_before: delay_until,
})
} else {
Stage::Enqueued(EnqueuedStage {
date: chrono::Utc::now(),
})
}
},
previous_stages: Vec::default(),
config: JobConfig::default(),
recurring_job_id: recurring_job_id,
};
Ok(job)
}