use crate::config::Config;
use crate::error::OxanaError;
use crate::{JobContext, JobId, Queue};
use crate::{context::ContextValue, job_state::JobState};
enum ProcessJobResult {
Success,
Failed,
Missing,
}
#[derive(Default, Debug)]
pub struct DrainStats {
pub processed: u64,
pub succeeded: u64,
pub failed: u64,
pub missing: u64,
}
pub async fn drain<DT, ET>(
config: &Config<DT, ET>,
ctx: ContextValue<DT>,
queue: impl Queue,
) -> Result<DrainStats, OxanaError>
where
DT: Send + Sync + Clone + 'static,
ET: std::error::Error + Send + Sync + 'static,
{
let queue_key = queue.key();
let mut stats = DrainStats::default();
while let Some(job_id) = config.storage.internal.dequeue(&queue_key).await? {
let result = process_job(config, ctx.clone(), job_id).await?;
match result {
ProcessJobResult::Success => stats.succeeded += 1,
ProcessJobResult::Failed => stats.failed += 1,
ProcessJobResult::Missing => stats.missing += 1,
}
stats.processed += 1;
}
Ok(stats)
}
async fn process_job<DT, ET>(
config: &Config<DT, ET>,
ctx: ContextValue<DT>,
job_id: JobId,
) -> Result<ProcessJobResult, OxanaError>
where
DT: Send + Sync + Clone + 'static,
ET: std::error::Error + Send + Sync + 'static,
{
let envelope = match config.storage.internal.get_job(&job_id).await? {
Some(envelope) => envelope,
None => return Ok(ProcessJobResult::Missing),
};
let job = config
.registry
.build(&envelope.job.name, envelope.job.args.clone(), &ctx.0)?;
let job_ctx = JobContext {
meta: envelope.meta.clone(),
state: JobState::new(config.storage.clone(), job_id, envelope.meta.state.clone()),
};
let job_result = job.process(vec![job_ctx]).await;
match job_result {
Ok(()) => {
config
.storage
.internal
.finish_with_success(&envelope)
.await?;
Ok(ProcessJobResult::Success)
}
Err(e) => {
tracing::error!("Job failed: {}", e);
config
.storage
.internal
.finish_with_failure(&envelope)
.await?;
config
.storage
.internal
.kill(&envelope, e.to_string())
.await?;
Ok(ProcessJobResult::Failed)
}
}
}