use std::time::Duration;
use async_trait::async_trait;
use sqlx_postgres::PgPool;
use tokio_util::sync::CancellationToken;
use crate::domain::{Job, JobKind};
use crate::queue;
/// Handles an [`Executor`] receives alongside the job it is asked to run.
#[derive(Debug, Clone)]
pub struct ExecutionContext {
    /// Postgres connection pool. In this file it is used to re-read the job
    /// row when checking whether cancellation has been requested.
    pub pool: PgPool,
    /// Cancellation token carried with the context.
    // NOTE(review): presumably signals worker shutdown; nothing in this file
    // reads it — confirm against the worker loop.
    pub shutdown: CancellationToken,
    /// Identifier of the worker running the job (not read in this file).
    pub worker_id: String,
}
/// Terminal result of a single [`Executor::execute`] call.
///
/// Derives `Clone`, `PartialEq` and `Eq` in addition to `Debug` so outcomes
/// can be compared directly (e.g. `assert_eq!(outcome, ExecutionOutcome::Succeeded)`)
/// and duplicated for logging or metrics without manual matching.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExecutionOutcome {
    /// The job ran to completion.
    Succeeded,
    /// The job failed; the payload is a human-readable description of why.
    Failed(String),
    /// The job stopped because cancellation had been requested for it.
    Cancelled,
}
/// Strategy for running one job.
///
/// Implementors must be `Send + Sync + 'static`, as required by the trait's
/// supertrait bounds.
#[async_trait]
pub trait Executor: Send + Sync + 'static {
    /// Runs `job` using the handles in `ctx` and reports how the run ended.
    ///
    /// The method returns an [`ExecutionOutcome`] rather than a `Result`;
    /// failures are reported through [`ExecutionOutcome::Failed`].
    async fn execute(&self, ctx: &ExecutionContext, job: &Job) -> ExecutionOutcome;
}
/// Stateless executor that simulates work: it sleeps through a fixed plan of
/// steps per job kind and fails some of them based on a deterministic draw.
#[derive(Debug, Default, Clone, Copy)]
pub struct SimulatedExecutor;
/// One simulated unit of work inside a job's plan.
#[derive(Clone, Copy)]
struct Step {
    /// How long the step sleeps, in milliseconds.
    sleep_ms: u64,
    /// Threshold the step's draw is compared against; the step fails when the
    /// draw is strictly below this value.
    fail_prob: f64,
}
/// Returns the fixed, ordered plan of simulated steps for `kind`.
///
/// Each plan is a compile-time table, so the returned slice is `'static` and
/// no allocation happens per call.
fn steps(kind: JobKind) -> &'static [Step] {
    // Three short steps; only the middle one can fail.
    const SEND_EMAIL_PLAN: &[Step] = &[
        Step { sleep_ms: 50, fail_prob: 0.0 },
        Step { sleep_ms: 50, fail_prob: 0.45 },
        Step { sleep_ms: 50, fail_prob: 0.0 },
    ];
    // A failure-prone first step followed by a longer step that never fails.
    const RESIZE_IMAGE_PLAN: &[Step] = &[
        Step { sleep_ms: 100, fail_prob: 0.25 },
        Step { sleep_ms: 200, fail_prob: 0.0 },
    ];
    // A single step.
    const SUMMARIZE_TEXT_PLAN: &[Step] = &[Step { sleep_ms: 150, fail_prob: 0.15 }];
    // Two equally long steps, the first more failure-prone than the second.
    const WEBHOOK_DELIVERY_PLAN: &[Step] = &[
        Step { sleep_ms: 80, fail_prob: 0.35 },
        Step { sleep_ms: 80, fail_prob: 0.15 },
    ];

    // Deliberately no catch-all arm: a new `JobKind` variant must get a plan.
    match kind {
        JobKind::SendEmail => SEND_EMAIL_PLAN,
        JobKind::ResizeImage => RESIZE_IMAGE_PLAN,
        JobKind::SummarizeText => SUMMARIZE_TEXT_PLAN,
        JobKind::WebhookDelivery => WEBHOOK_DELIVERY_PLAN,
    }
}
/// Maps `(job_id, attempt, step_idx)` to a value in `[0, 1)`.
///
/// The three inputs are fed, in that order, into a freshly constructed
/// `DefaultHasher`; the top 53 bits of the 64-bit digest are then divided by
/// 2^53. Equal inputs therefore give equal draws, so a given attempt of a
/// given job behaves the same every time it is replayed.
///
/// Caveat: the standard library does not specify `DefaultHasher`'s algorithm
/// and may change it between Rust releases, so draws are only guaranteed to be
/// stable for a given toolchain.
fn draw(job_id: uuid::Uuid, attempt: i32, step_idx: usize) -> f64 {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    // 2^53: the number of distinct values the retained 53 bits can take.
    const SCALE: f64 = (1u64 << 53) as f64;

    let mut hasher = DefaultHasher::new();
    job_id.hash(&mut hasher);
    attempt.hash(&mut hasher);
    step_idx.hash(&mut hasher);

    // Drop the low 11 bits so the remainder fits an f64 mantissa exactly,
    // which keeps the quotient strictly below 1.0.
    let top_bits = hasher.finish() >> 11;
    (top_bits as f64) / SCALE
}
#[async_trait]
impl Executor for SimulatedExecutor {
    /// Walks the step plan for `job.kind`, one step at a time.
    ///
    /// For every step, in order:
    /// 1. the job row is re-read through `queue::get`; if it reports
    ///    `cancel_requested` the run ends with [`ExecutionOutcome::Cancelled`],
    ///    and a database error ends it with [`ExecutionOutcome::Failed`];
    /// 2. the task sleeps for the step's `sleep_ms`;
    /// 3. a deterministic draw for `(job id, attempts, step index)` is compared
    ///    with the step's failure threshold, and a draw below it ends the run
    ///    with [`ExecutionOutcome::Failed`].
    ///
    /// When `job.attempts` is 2 or more the threshold is forced to zero, so no
    /// simulated failure can occur. If every step passes, the result is
    /// [`ExecutionOutcome::Succeeded`].
    // NOTE(review): `ctx.shutdown` is never consulted here, so the sleeps are
    // not interrupted by it — confirm this is intended.
    async fn execute(&self, ctx: &ExecutionContext, job: &Job) -> ExecutionOutcome {
        let plan = steps(job.kind);
        let total_steps = plan.len();
        // Simulated failures only apply while the attempt counter is below 2.
        let failures_enabled = job.attempts < 2;

        for (idx, step) in plan.iter().enumerate() {
            // Reduce the lookup to a bool right away so the fetched row is not
            // kept alive across the sleep below. A missing row is treated the
            // same as "no cancellation requested".
            let cancel_requested = match queue::get(&ctx.pool, job.id).await {
                Ok(row) => row.map_or(false, |current| current.cancel_requested),
                Err(e) => {
                    return ExecutionOutcome::Failed(format!(
                        "executor: db error checking cancel flag: {e}"
                    ));
                }
            };
            if cancel_requested {
                return ExecutionOutcome::Cancelled;
            }

            tokio::time::sleep(Duration::from_millis(step.sleep_ms)).await;

            let threshold = if failures_enabled { step.fail_prob } else { 0.0 };
            let roll = draw(job.id.as_uuid(), job.attempts, idx);
            if roll < threshold {
                return ExecutionOutcome::Failed(format!(
                    "simulated failure at step {}/{} for kind={}",
                    idx + 1,
                    total_steps,
                    job.kind.as_str()
                ));
            }
        }

        ExecutionOutcome::Succeeded
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use uuid::Uuid;

    /// Calling `draw` twice with the same inputs yields the same value.
    #[test]
    fn draw_is_deterministic() {
        let job_id = Uuid::now_v7();
        let first = draw(job_id, 1, 0);
        let second = draw(job_id, 1, 0);
        assert_eq!(first, second);
    }

    /// Every draw over a small attempt/step grid lies in `[0, 1)`.
    #[test]
    fn draw_is_in_unit_interval() {
        let job_id = Uuid::now_v7();
        let grid = (0..5_i32).flat_map(|attempt| (0..5_usize).map(move |step| (attempt, step)));
        for (attempt, step) in grid {
            let v = draw(job_id, attempt, step);
            assert!((0.0..1.0).contains(&v), "got {v}");
        }
    }

    /// Changing only the attempt number changes the draw.
    #[test]
    fn draw_varies_with_attempt() {
        let job_id = Uuid::now_v7();
        let (attempt_one, attempt_two) = (draw(job_id, 1, 0), draw(job_id, 2, 0));
        assert_ne!(
            attempt_one, attempt_two,
            "expected different attempts to produce different draws"
        );
    }
}