folk-plugin-jobs 0.3.3

Queue consumer plugin for Folk — pulls jobs from memory or Redis and dispatches to PHP workers
Documentation
use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
use async_trait::async_trait;
use bytes::Bytes;
use folk_api::metrics::{CounterVec, GaugeVec, HistogramVec};
use folk_api::{BoxFuture, Executor, PluginContext, RpcMethodDef, ServerPlugin};
use tracing::{error, info, warn};

use serde::Deserialize;

use crate::config::{DriverKind, JobsConfig, QueueConfig};
use crate::driver::Driver;

/// JSON body accepted by the `jobs.push` RPC method.
#[derive(Deserialize)]
struct PushRequest {
    /// Name of the queue the job is pushed onto.
    queue: String,
    /// Job payload; stored as raw bytes by the driver.
    payload: String,
    /// Delay forwarded to `Driver::push_delayed`. When the field is omitted it
    /// defaults to `0`, which pushes the job immediately instead.
    #[serde(default)]
    delay: u64,
}

/// Metric handles used by the jobs plugin, created by `JobsMetrics::from_registry`.
struct JobsMetrics {
    /// `folk_jobs_pushed_total{queue}` — jobs accepted through `jobs.push`.
    pushed_total: Arc<dyn CounterVec>,
    /// `folk_jobs_processed_total{queue,status}` — finished jobs, `status` is "ok" or "failed".
    processed_total: Arc<dyn CounterVec>,
    /// `folk_jobs_processing_duration_seconds{queue}` — wall time from first attempt to outcome.
    processing_duration: Arc<dyn HistogramVec>,
    /// `folk_jobs_retries_total{queue}` — attempts that were followed by a retry.
    retries_total: Arc<dyn CounterVec>,
    /// `folk_jobs_dead_letter_total{queue}` — jobs moved to a dead letter queue.
    dead_letter_total: Arc<dyn CounterVec>,
    /// `folk_jobs_active{queue}` — jobs currently being processed.
    active: Arc<dyn GaugeVec>,
}

impl JobsMetrics {
    /// Registers every `folk_jobs_*` metric family on `reg` and keeps the handles.
    ///
    /// Families are registered in field order. All of them carry a `queue` label, and
    /// `folk_jobs_processed_total` additionally carries a `status` label.
    fn from_registry(reg: &dyn folk_api::metrics::MetricsRegistry) -> Self {
        let pushed_total =
            reg.counter_vec("folk_jobs_pushed_total", "Number of jobs pushed", &["queue"]);
        let processed_total = reg.counter_vec(
            "folk_jobs_processed_total",
            "Number of jobs processed",
            &["queue", "status"],
        );
        let processing_duration = reg.histogram_vec(
            "folk_jobs_processing_duration_seconds",
            "Job processing duration",
            &["queue"],
        );
        let retries_total =
            reg.counter_vec("folk_jobs_retries_total", "Number of job retries", &["queue"]);
        let dead_letter_total = reg.counter_vec(
            "folk_jobs_dead_letter_total",
            "Number of jobs sent to dead letter queue",
            &["queue"],
        );
        let active = reg.gauge_vec(
            "folk_jobs_active",
            "Jobs currently being processed",
            &["queue"],
        );

        Self {
            pushed_total,
            processed_total,
            processing_duration,
            retries_total,
            dead_letter_total,
            active,
        }
    }
}

/// Folk server plugin that consumes job queues and dispatches each job to PHP workers.
pub struct JobsPlugin {
    /// Driver selection plus the list of queues and their per-queue settings.
    config: JobsConfig,
}

impl JobsPlugin {
    /// Creates the plugin from its configuration.
    ///
    /// Only stores `config`; the driver is created and tasks are spawned in `run`.
    pub fn new(config: JobsConfig) -> Self {
        JobsPlugin { config }
    }
}

#[async_trait]
impl ServerPlugin for JobsPlugin {
    fn name(&self) -> &'static str {
        "jobs"
    }

    async fn run(&self, ctx: PluginContext) -> Result<()> {
        let driver: Arc<dyn Driver> = match self.config.driver {
            DriverKind::Memory => crate::driver::MemoryDriver::new(),
            DriverKind::Redis => {
                crate::redis_driver::RedisDriver::new(&self.config.redis_url()).await?
            }
        };

        let metrics = ctx
            .metrics_registry
            .as_ref()
            .map(|reg| Arc::new(JobsMetrics::from_registry(reg.as_ref())));

        // Delayed job promoter — polls every second
        let mut delayed_tasks = Vec::new();
        for q in &self.config.queues {
            let d = driver.clone();
            let q_name = q.name.clone();
            let mut sd = ctx.shutdown.clone();
            delayed_tasks.push(tokio::spawn(async move {
                loop {
                    tokio::select! {
                        _ = tokio::time::sleep(Duration::from_secs(1)) => {
                            let _ = d.promote_delayed(&q_name).await;
                        }
                        _ = sd.changed() => {
                            if *sd.borrow() { return; }
                        }
                    }
                }
            }));
        }

        // One consumer task per queue × concurrency
        let mut tasks = Vec::new();
        for q in &self.config.queues {
            for _ in 0..q.concurrency {
                let d = driver.clone();
                let exec = ctx.executor.clone();
                let q_cfg = q.clone();
                let sd = ctx.shutdown.clone();
                let m = metrics.clone();
                tasks.push(tokio::spawn(async move {
                    consume_loop(d, exec, q_cfg, sd, m).await;
                }));
            }
        }

        info!(
            queues = self.config.queues.len(),
            driver = ?self.config.driver,
            "jobs plugin started"
        );

        // Register RPC methods
        if let Some(reg) = &ctx.rpc_registrar {
            // jobs.push — push a job to a queue
            let d = driver.clone();
            let m = metrics.clone();
            reg.register_raw(
                "jobs.push".into(),
                Arc::new(move |payload: Bytes| -> BoxFuture<'static, Result<Bytes>> {
                    let d = d.clone();
                    let m = m.clone();
                    Box::pin(async move {
                        let req: PushRequest = serde_json::from_slice(&payload)?;
                        if req.delay > 0 {
                            d.push_delayed(&req.queue, Bytes::from(req.payload), req.delay)
                                .await?;
                        } else {
                            d.push(&req.queue, Bytes::from(req.payload)).await?;
                        }
                        if let Some(m) = &m {
                            m.pushed_total.with_labels(&[&req.queue]).inc();
                        }
                        Ok(Bytes::from(serde_json::to_vec(&"ok")?))
                    })
                }),
            )
            .await;

            // jobs.stats — queue depth counters
            let d = driver.clone();
            let queues: Vec<String> = self.config.queues.iter().map(|q| q.name.clone()).collect();
            reg.register_raw(
                "jobs.stats".into(),
                Arc::new(move |_: Bytes| -> BoxFuture<'static, Result<Bytes>> {
                    let d = d.clone();
                    let queues = queues.clone();
                    Box::pin(async move {
                        let mut stats = vec![];
                        for q in &queues {
                            let depth = d.depth(q).await.unwrap_or(0);
                            stats.push(format!("{q}: depth={depth}"));
                        }
                        Ok(Bytes::from(serde_json::to_vec(&stats)?))
                    })
                }),
            )
            .await;
        }

        let mut sd = ctx.shutdown.clone();
        if let Err(e) = sd.changed().await {
            tracing::error!(error = %e, "shutdown sender dropped unexpectedly");
        }
        for t in tasks {
            let _ = t.await;
        }
        for t in delayed_tasks {
            t.abort();
        }
        Ok(())
    }

    fn rpc_methods(&self) -> Vec<RpcMethodDef> {
        vec![
            RpcMethodDef::new("jobs.push", "push a job to a queue (supports delay)"),
            RpcMethodDef::new("jobs.stats", "queue depth and processing counters"),
        ]
    }
}

async fn consume_loop(
    driver: Arc<dyn Driver>,
    executor: Arc<dyn Executor>,
    queue: QueueConfig,
    mut shutdown: tokio::sync::watch::Receiver<bool>,
    metrics: Option<Arc<JobsMetrics>>,
) {
    loop {
        let job = tokio::select! {
            job = driver.pop(&queue.name) => job,
            _ = shutdown.changed() => {
                if *shutdown.borrow() { return; }
                continue;
            }
        };

        let payload = match job {
            Ok(Some(p)) => p,
            Ok(None) => continue,
            Err(e) => {
                error!(error = ?e, queue = %queue.name, "driver pop error");
                continue;
            }
        };

        // PHP __folk_dispatch expects array params, wrap payload string in an object
        let payload_str = String::from_utf8_lossy(&payload).into_owned();
        let job_value = serde_json::json!({ "payload": payload_str });

        if let Some(m) = &metrics {
            m.active.with_labels(&[&queue.name]).inc();
        }

        let mut retries = 0u32;
        let start = std::time::Instant::now();
        loop {
            let result = if queue.job_timeout.is_zero() {
                executor
                    .execute_value("jobs.process", job_value.clone())
                    .await
                    .map(|_| ())
            } else {
                match tokio::time::timeout(
                    queue.job_timeout,
                    executor.execute_value("jobs.process", job_value.clone()),
                )
                .await
                {
                    Ok(r) => r.map(|_| ()),
                    Err(_) => Err(anyhow::anyhow!(
                        "job timed out after {:?}",
                        queue.job_timeout
                    )),
                }
            };

            match result {
                Ok(_) => {
                    if let Some(m) = &metrics {
                        m.processed_total.with_labels(&[&queue.name, "ok"]).inc();
                        m.processing_duration
                            .with_labels(&[&queue.name])
                            .observe(start.elapsed().as_secs_f64());
                    }
                    break;
                }
                Err(e) => {
                    retries += 1;
                    if retries >= queue.max_retries {
                        // Dead letter queue or discard
                        if let Some(dlq) = &queue.dead_letter_queue {
                            if let Err(dlq_err) = driver.push(dlq, payload.clone()).await {
                                error!(
                                    queue = %queue.name,
                                    dlq = %dlq,
                                    error = ?dlq_err,
                                    "failed to push to dead letter queue"
                                );
                            }
                            if let Some(m) = &metrics {
                                m.dead_letter_total.with_labels(&[&queue.name]).inc();
                            }
                            warn!(
                                queue = %queue.name,
                                retries,
                                dlq = %dlq,
                                "job failed after max retries; sent to DLQ"
                            );
                        } else {
                            error!(
                                queue = %queue.name,
                                retries,
                                "job failed after max retries; discarding"
                            );
                        }
                        if let Some(m) = &metrics {
                            m.processed_total
                                .with_labels(&[&queue.name, "failed"])
                                .inc();
                            m.processing_duration
                                .with_labels(&[&queue.name])
                                .observe(start.elapsed().as_secs_f64());
                        }
                        break;
                    }
                    if let Some(m) = &metrics {
                        m.retries_total.with_labels(&[&queue.name]).inc();
                    }
                    let delay = queue.retry_backoff.delay(queue.retry_delay, retries);
                    warn!(
                        queue = %queue.name,
                        retries,
                        delay_ms = delay.as_millis() as u64,
                        error = ?e,
                        "job failed; retrying"
                    );
                    tokio::time::sleep(delay).await;
                }
            }
        }

        if let Some(m) = &metrics {
            m.active.with_labels(&[&queue.name]).dec();
        }
    }
}