Tower-style middleware around job execution.
A JobMiddleware wraps the worker.execute(&job, &ctx) call so that
cross-cutting concerns (tracing spans, metrics, auth, custom retry logic)
can layer on top of worker impls without every worker reimplementing them.
§Execution order
Middleware is registered on ServerConfig::middleware as a Vec
and runs in registration order: the first entry is the outermost layer
and the last entry is closest to the worker. The terminal
worker.execute(&job, &ctx) runs only once every middleware in the stack
has called next.run(job, ctx).await.
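The ordering rule can be illustrated with a small synchronous model. This is only a sketch of the layering idea, not the qml_rs API: the `Layer` trait, this `Next` struct, `Tag`, and `run_stack` are hypothetical names invented for the example, and the real trait is async and takes a Job and a WorkerContext.

```rust
// Simplified, synchronous model of a middleware stack.
// All names here (Layer, Next, Tag, run_stack) are hypothetical and
// exist only for illustration; they are not part of qml_rs.

/// A layer sees the job name, a shared log, and a handle to the rest of the stack.
trait Layer {
    fn call(&self, job: &str, log: &mut Vec<String>, next: Next<'_>) -> String;
}

/// Handle to the remaining layers plus the terminal worker function.
struct Next<'a> {
    rest: &'a [Box<dyn Layer>],
    worker: &'a dyn Fn(&str, &mut Vec<String>) -> String,
}

impl<'a> Next<'a> {
    /// Invoke the next layer, or the worker once no layers remain.
    fn run(self, job: &str, log: &mut Vec<String>) -> String {
        match self.rest.split_first() {
            Some((head, tail)) => {
                let next = Next { rest: tail, worker: self.worker };
                head.call(job, log, next)
            }
            None => (self.worker)(job, log),
        }
    }
}

/// A layer that records when it is entered and when it is left.
struct Tag(&'static str);

impl Layer for Tag {
    fn call(&self, job: &str, log: &mut Vec<String>, next: Next<'_>) -> String {
        log.push(format!("{} before", self.0));
        let result = next.run(job, log);
        log.push(format!("{} after", self.0));
        result
    }
}

/// Build a stack in registration order, run one job, and return the log.
fn run_stack(names: &[&'static str]) -> Vec<String> {
    let stack: Vec<Box<dyn Layer>> = names
        .iter()
        .map(|n| Box::new(Tag(*n)) as Box<dyn Layer>)
        .collect();
    let worker = |job: &str, log: &mut Vec<String>| {
        log.push(format!("worker {}", job));
        "done".to_string()
    };
    let mut log = Vec::new();
    let _ = Next { rest: &stack, worker: &worker }.run("send_email", &mut log);
    log
}

fn main() {
    for line in run_stack(&["outer", "inner"]) {
        println!("{}", line);
    }
}
```

In this model the first registered layer is entered first and left last, which is the sense in which it is the outermost layer.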
§Short-circuiting
A middleware can return without calling next.run(...) to skip the rest
of the stack and the worker entirely — useful for guards (authorization,
circuit breakers, dry-run mode). In that case the returned
WorkerResult is what the processor will apply to the job.
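A guard of this kind might look like the following sketch. It again uses a simplified synchronous model rather than the real qml_rs types: `Outcome` stands in for WorkerResult, and `Layer`, this `Next` struct, `DryRunGuard`, and `process` are hypothetical names chosen for illustration.

```rust
use std::cell::Cell;

// Simplified, synchronous model of short-circuiting.
// Outcome, Layer, Next, DryRunGuard, and process are hypothetical names;
// Outcome is only a stand-in for the crate's WorkerResult.

#[derive(Debug, PartialEq)]
enum Outcome {
    Success,
    Skipped(&'static str),
}

trait Layer {
    fn call(&self, job: &str, next: Next<'_>) -> Outcome;
}

/// Handle to the remaining layers plus the terminal worker function.
struct Next<'a> {
    rest: &'a [Box<dyn Layer>],
    worker: &'a dyn Fn(&str) -> Outcome,
}

impl<'a> Next<'a> {
    /// Invoke the next layer, or the worker once no layers remain.
    fn run(self, job: &str) -> Outcome {
        match self.rest.split_first() {
            Some((head, tail)) => head.call(job, Next { rest: tail, worker: self.worker }),
            None => (self.worker)(job),
        }
    }
}

/// Guard that returns early in dry-run mode instead of calling `next.run`.
struct DryRunGuard {
    enabled: bool,
}

impl Layer for DryRunGuard {
    fn call(&self, job: &str, next: Next<'_>) -> Outcome {
        if self.enabled {
            // Short-circuit: the inner layers and the worker never run.
            return Outcome::Skipped("dry-run");
        }
        next.run(job)
    }
}

/// Run one job through the guard; return the outcome and the worker call count.
fn process(dry_run: bool) -> (Outcome, u32) {
    let executions = Cell::new(0u32);
    let worker = |_job: &str| {
        executions.set(executions.get() + 1);
        Outcome::Success
    };
    let stack: Vec<Box<dyn Layer>> = vec![Box::new(DryRunGuard { enabled: dry_run })];
    let outcome = Next { rest: &stack, worker: &worker }.run("send_email");
    (outcome, executions.get())
}

fn main() {
    println!("{:?}", process(true));
    println!("{:?}", process(false));
}
```

The value the guard returns is the one the caller receives, mirroring how the returned WorkerResult is what the processor applies to the job.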
§Example
use async_trait::async_trait;
use qml_rs::processing::middleware::{JobMiddleware, Next};
use qml_rs::{Job, Result, WorkerContext, WorkerResult};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

// Counts job outcomes as they pass back up through the stack.
struct CountingMiddleware {
    successes: Arc<AtomicUsize>,
    failures: Arc<AtomicUsize>,
}

#[async_trait]
impl JobMiddleware for CountingMiddleware {
    async fn call<'a>(
        &'a self,
        job: &'a Job,
        ctx: &'a WorkerContext,
        next: Next<'a>,
    ) -> Result<WorkerResult> {
        // Run the rest of the stack (and eventually the worker) first.
        let result = next.run(job, ctx).await;
        match &result {
            Ok(WorkerResult::Success { .. }) => {
                self.successes.fetch_add(1, Ordering::Relaxed);
            }
            // Any other outcome, including an Err, counts as a failure.
            _ => {
                self.failures.fetch_add(1, Ordering::Relaxed);
            }
        }
        // Pass the result through unchanged.
        result
    }
}
}
Structs§
- Next
- Handle passed to each middleware that invokes the rest of the stack.
- TracingMiddleware
- Built-in middleware that wraps each job execution in a
tracing span carrying job.id, job.method, job.queue, and job.attempt.
Traits§
- JobMiddleware
- A layer that wraps worker.execute for a single job.