use std::sync::Arc;
use std::time::Duration;
use anyhow::Result;
use async_trait::async_trait;
use bytes::Bytes;
use folk_api::metrics::{CounterVec, GaugeVec, HistogramVec};
use folk_api::{BoxFuture, Executor, PluginContext, RpcMethodDef, ServerPlugin};
use tracing::{error, info, warn};
use serde::Deserialize;
use crate::config::{DriverKind, JobsConfig, QueueConfig};
use crate::driver::Driver;
/// Request body of the `jobs.push` RPC method, decoded from JSON by the handler.
#[derive(Deserialize)]
struct PushRequest {
/// Name of the queue the job is pushed onto; also used as the `queue` metric label.
queue: String,
/// Job payload; the handler converts it to bytes before handing it to the driver.
payload: String,
/// Delay before the job becomes available. Defaults to `0` when omitted, in which case
/// the job is pushed immediately; any positive value goes through `push_delayed`.
/// NOTE(review): the unit is whatever `Driver::push_delayed` expects — confirm.
#[serde(default)]
delay: u64,
}
/// Handles to the metric families the jobs plugin reports, obtained from a metrics registry.
struct JobsMetrics {
/// `folk_jobs_pushed_total`: jobs pushed, labelled by `queue`.
pushed_total: Arc<dyn CounterVec>,
/// `folk_jobs_processed_total`: jobs processed, labelled by `queue` and `status`
/// (the consumer reports `"ok"` or `"failed"`).
processed_total: Arc<dyn CounterVec>,
/// `folk_jobs_processing_duration_seconds`: job processing duration, labelled by `queue`.
processing_duration: Arc<dyn HistogramVec>,
/// `folk_jobs_retries_total`: job retries, labelled by `queue`.
retries_total: Arc<dyn CounterVec>,
/// `folk_jobs_dead_letter_total`: jobs sent to a dead letter queue, labelled by the source `queue`.
dead_letter_total: Arc<dyn CounterVec>,
/// `folk_jobs_active`: jobs currently being processed, labelled by `queue`.
active: Arc<dyn GaugeVec>,
}
impl JobsMetrics {
    /// Registers every jobs metric family on `reg` and returns the resulting handles.
    ///
    /// Families are registered in the same order as the struct fields. All of them carry a
    /// `queue` label; the processed counter additionally carries a `status` label.
    fn from_registry(reg: &dyn folk_api::metrics::MetricsRegistry) -> Self {
        let pushed_total =
            reg.counter_vec("folk_jobs_pushed_total", "Number of jobs pushed", &["queue"]);

        let processed_total = reg.counter_vec(
            "folk_jobs_processed_total",
            "Number of jobs processed",
            &["queue", "status"],
        );

        let processing_duration = reg.histogram_vec(
            "folk_jobs_processing_duration_seconds",
            "Job processing duration",
            &["queue"],
        );

        let retries_total =
            reg.counter_vec("folk_jobs_retries_total", "Number of job retries", &["queue"]);

        let dead_letter_total = reg.counter_vec(
            "folk_jobs_dead_letter_total",
            "Number of jobs sent to dead letter queue",
            &["queue"],
        );

        let active =
            reg.gauge_vec("folk_jobs_active", "Jobs currently being processed", &["queue"]);

        Self {
            pushed_total,
            processed_total,
            processing_duration,
            retries_total,
            dead_letter_total,
            active,
        }
    }
}
/// Server plugin that runs the job queues described by a [`JobsConfig`].
pub struct JobsPlugin {
/// Driver selection and per-queue settings read when the plugin runs.
config: JobsConfig,
}
impl JobsPlugin {
pub fn new(config: JobsConfig) -> Self {
Self { config }
}
}
#[async_trait]
impl ServerPlugin for JobsPlugin {
fn name(&self) -> &'static str {
"jobs"
}
async fn run(&self, ctx: PluginContext) -> Result<()> {
let driver: Arc<dyn Driver> = match self.config.driver {
DriverKind::Memory => crate::driver::MemoryDriver::new(),
DriverKind::Redis => {
crate::redis_driver::RedisDriver::new(&self.config.redis_url()).await?
}
};
let metrics = ctx
.metrics_registry
.as_ref()
.map(|reg| Arc::new(JobsMetrics::from_registry(reg.as_ref())));
let mut delayed_tasks = Vec::new();
for q in &self.config.queues {
let d = driver.clone();
let q_name = q.name.clone();
let mut sd = ctx.shutdown.clone();
delayed_tasks.push(tokio::spawn(async move {
loop {
tokio::select! {
_ = tokio::time::sleep(Duration::from_secs(1)) => {
let _ = d.promote_delayed(&q_name).await;
}
_ = sd.changed() => {
if *sd.borrow() { return; }
}
}
}
}));
}
let mut tasks = Vec::new();
for q in &self.config.queues {
for _ in 0..q.concurrency {
let d = driver.clone();
let exec = ctx.executor.clone();
let q_cfg = q.clone();
let sd = ctx.shutdown.clone();
let m = metrics.clone();
tasks.push(tokio::spawn(async move {
consume_loop(d, exec, q_cfg, sd, m).await;
}));
}
}
info!(
queues = self.config.queues.len(),
driver = ?self.config.driver,
"jobs plugin started"
);
if let Some(reg) = &ctx.rpc_registrar {
let d = driver.clone();
let m = metrics.clone();
reg.register_raw(
"jobs.push".into(),
Arc::new(move |payload: Bytes| -> BoxFuture<'static, Result<Bytes>> {
let d = d.clone();
let m = m.clone();
Box::pin(async move {
let req: PushRequest = serde_json::from_slice(&payload)?;
if req.delay > 0 {
d.push_delayed(&req.queue, Bytes::from(req.payload), req.delay)
.await?;
} else {
d.push(&req.queue, Bytes::from(req.payload)).await?;
}
if let Some(m) = &m {
m.pushed_total.with_labels(&[&req.queue]).inc();
}
Ok(Bytes::from(serde_json::to_vec(&"ok")?))
})
}),
)
.await;
let d = driver.clone();
let queues: Vec<String> = self.config.queues.iter().map(|q| q.name.clone()).collect();
reg.register_raw(
"jobs.stats".into(),
Arc::new(move |_: Bytes| -> BoxFuture<'static, Result<Bytes>> {
let d = d.clone();
let queues = queues.clone();
Box::pin(async move {
let mut stats = vec![];
for q in &queues {
let depth = d.depth(q).await.unwrap_or(0);
stats.push(format!("{q}: depth={depth}"));
}
Ok(Bytes::from(serde_json::to_vec(&stats)?))
})
}),
)
.await;
}
let mut sd = ctx.shutdown.clone();
if let Err(e) = sd.changed().await {
tracing::error!(error = %e, "shutdown sender dropped unexpectedly");
}
for t in tasks {
let _ = t.await;
}
for t in delayed_tasks {
t.abort();
}
Ok(())
}
fn rpc_methods(&self) -> Vec<RpcMethodDef> {
vec![
RpcMethodDef::new("jobs.push", "push a job to a queue (supports delay)"),
RpcMethodDef::new("jobs.stats", "queue depth and processing counters"),
]
}
}
async fn consume_loop(
driver: Arc<dyn Driver>,
executor: Arc<dyn Executor>,
queue: QueueConfig,
mut shutdown: tokio::sync::watch::Receiver<bool>,
metrics: Option<Arc<JobsMetrics>>,
) {
loop {
let job = tokio::select! {
job = driver.pop(&queue.name) => job,
_ = shutdown.changed() => {
if *shutdown.borrow() { return; }
continue;
}
};
let payload = match job {
Ok(Some(p)) => p,
Ok(None) => continue,
Err(e) => {
error!(error = ?e, queue = %queue.name, "driver pop error");
continue;
}
};
let payload_str = String::from_utf8_lossy(&payload).into_owned();
let job_value = serde_json::json!({ "payload": payload_str });
if let Some(m) = &metrics {
m.active.with_labels(&[&queue.name]).inc();
}
let mut retries = 0u32;
let start = std::time::Instant::now();
loop {
let result = if queue.job_timeout.is_zero() {
executor
.execute_value("jobs.process", job_value.clone())
.await
.map(|_| ())
} else {
match tokio::time::timeout(
queue.job_timeout,
executor.execute_value("jobs.process", job_value.clone()),
)
.await
{
Ok(r) => r.map(|_| ()),
Err(_) => Err(anyhow::anyhow!(
"job timed out after {:?}",
queue.job_timeout
)),
}
};
match result {
Ok(_) => {
if let Some(m) = &metrics {
m.processed_total.with_labels(&[&queue.name, "ok"]).inc();
m.processing_duration
.with_labels(&[&queue.name])
.observe(start.elapsed().as_secs_f64());
}
break;
}
Err(e) => {
retries += 1;
if retries >= queue.max_retries {
if let Some(dlq) = &queue.dead_letter_queue {
if let Err(dlq_err) = driver.push(dlq, payload.clone()).await {
error!(
queue = %queue.name,
dlq = %dlq,
error = ?dlq_err,
"failed to push to dead letter queue"
);
}
if let Some(m) = &metrics {
m.dead_letter_total.with_labels(&[&queue.name]).inc();
}
warn!(
queue = %queue.name,
retries,
dlq = %dlq,
"job failed after max retries; sent to DLQ"
);
} else {
error!(
queue = %queue.name,
retries,
"job failed after max retries; discarding"
);
}
if let Some(m) = &metrics {
m.processed_total
.with_labels(&[&queue.name, "failed"])
.inc();
m.processing_duration
.with_labels(&[&queue.name])
.observe(start.elapsed().as_secs_f64());
}
break;
}
if let Some(m) = &metrics {
m.retries_total.with_labels(&[&queue.name]).inc();
}
let delay = queue.retry_backoff.delay(queue.retry_delay, retries);
warn!(
queue = %queue.name,
retries,
delay_ms = delay.as_millis() as u64,
error = ?e,
"job failed; retrying"
);
tokio::time::sleep(delay).await;
}
}
}
if let Some(m) = &metrics {
m.active.with_labels(&[&queue.name]).dec();
}
}
}