folk-plugin-jobs 0.3.4

Queue consumer plugin for Folk — pulls jobs from memory or Redis and dispatches to PHP workers
Documentation
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::Duration;

use anyhow::Result;
use async_trait::async_trait;
use bytes::Bytes;
use folk_api::{Executor, PluginContext, ServerPlugin};
use folk_plugin_jobs::Driver;
use folk_plugin_jobs::MemoryDriver;
use folk_plugin_jobs::config::{BackoffStrategy, DriverKind, JobsConfig, QueueConfig};
use folk_plugin_jobs::plugin::JobsPlugin;

struct CountingExecutor {
    count: AtomicU32,
}

impl CountingExecutor {
    fn new() -> Arc<Self> {
        Arc::new(Self {
            count: AtomicU32::new(0),
        })
    }
}

#[async_trait]
impl Executor for CountingExecutor {
    async fn execute_method(&self, _method: &str, _payload: Bytes) -> Result<Bytes> {
        self.count.fetch_add(1, Ordering::SeqCst);
        Ok(Bytes::new())
    }
}

struct FailingExecutor {
    call_count: AtomicU32,
    fail_until: u32,
}

impl FailingExecutor {
    fn new(fail_until: u32) -> Arc<Self> {
        Arc::new(Self {
            call_count: AtomicU32::new(0),
            fail_until,
        })
    }
}

#[async_trait]
impl Executor for FailingExecutor {
    async fn execute_method(&self, _method: &str, _payload: Bytes) -> Result<Bytes> {
        let n = self.call_count.fetch_add(1, Ordering::SeqCst) + 1;
        if n <= self.fail_until {
            anyhow::bail!("simulated failure #{n}");
        }
        Ok(Bytes::new())
    }
}

fn make_ctx(
    executor: Arc<dyn Executor>,
    shutdown_rx: tokio::sync::watch::Receiver<bool>,
) -> PluginContext {
    PluginContext {
        executor,
        shutdown: shutdown_rx,
        rpc_registrar: None,
        health_registry: None,
        metrics_registry: None,
    }
}

fn default_queue() -> QueueConfig {
    QueueConfig {
        name: "default".into(),
        concurrency: 1,
        max_retries: 3,
        retry_delay: Duration::from_millis(10),
        retry_backoff: BackoffStrategy::Fixed,
        job_timeout: Duration::ZERO,
        dead_letter_queue: None,
        priority: 10,
    }
}

#[tokio::test]
async fn jobs_plugin_dispatches_via_executor() {
    let executor = CountingExecutor::new();
    let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
    let _ctx = make_ctx(executor.clone(), shutdown_rx);

    let plugin = JobsPlugin::new(JobsConfig {
        driver: DriverKind::Memory,
        queues: vec![default_queue()],
        ..Default::default()
    });

    let driver = MemoryDriver::new();
    driver
        .push("default", Bytes::from("test-job"))
        .await
        .unwrap();

    let d = driver.clone();
    let exec = executor.clone();
    let handle = tokio::spawn(async move {
        let queue = default_queue();
        let payload = d.pop(&queue.name).await.unwrap().unwrap();
        let json_payload = serde_json::json!({"payload": String::from_utf8_lossy(&payload)});
        let encoded = Bytes::from(serde_json::to_vec(&json_payload).unwrap());
        exec.execute_method("jobs.process", encoded).await.unwrap();
    });

    tokio::time::timeout(Duration::from_millis(500), handle)
        .await
        .expect("timed out")
        .expect("task panicked");

    assert_eq!(executor.count.load(Ordering::SeqCst), 1);
    shutdown_tx.send(true).ok();

    assert_eq!(plugin.name(), "jobs");
}

#[tokio::test]
async fn retry_exhaustion_discards_job() {
    let executor = FailingExecutor::new(100);
    let driver = MemoryDriver::new();
    driver
        .push("default", Bytes::from("doomed-job"))
        .await
        .unwrap();

    let d = driver.clone();
    let exec: Arc<dyn Executor> = executor.clone();
    let (shutdown_tx, _shutdown_rx) = tokio::sync::watch::channel(false);

    let handle = tokio::spawn(async move {
        let queue = QueueConfig {
            max_retries: 2,
            ..default_queue()
        };

        let payload = d.pop(&queue.name).await.unwrap().unwrap();
        let json_payload = serde_json::json!({"payload": String::from_utf8_lossy(&payload)});
        let encoded = Bytes::from(serde_json::to_vec(&json_payload).unwrap());

        let mut retries = 0u32;
        loop {
            match exec.execute_method("jobs.process", encoded.clone()).await {
                Ok(_) => break,
                Err(_) => {
                    retries += 1;
                    if retries >= queue.max_retries {
                        break;
                    }
                    tokio::time::sleep(Duration::from_millis(10)).await;
                }
            }
        }
    });

    tokio::time::timeout(Duration::from_secs(2), handle)
        .await
        .expect("timed out")
        .expect("task panicked");

    assert_eq!(executor.call_count.load(Ordering::SeqCst), 2);
    shutdown_tx.send(true).ok();
}

#[tokio::test]
async fn dead_letter_queue_receives_failed_jobs() {
    let executor = FailingExecutor::new(100);
    let driver = MemoryDriver::new();
    driver
        .push("default", Bytes::from("dlq-job"))
        .await
        .unwrap();

    let d = driver.clone();
    let exec: Arc<dyn Executor> = executor.clone();

    let handle = tokio::spawn(async move {
        let queue = QueueConfig {
            max_retries: 2,
            dead_letter_queue: Some("failed".into()),
            ..default_queue()
        };

        let payload = d.pop(&queue.name).await.unwrap().unwrap();
        let json_payload = serde_json::json!({"payload": String::from_utf8_lossy(&payload)});
        let encoded = Bytes::from(serde_json::to_vec(&json_payload).unwrap());

        let mut retries = 0u32;
        loop {
            match exec.execute_method("jobs.process", encoded.clone()).await {
                Ok(_) => break,
                Err(_) => {
                    retries += 1;
                    if retries >= queue.max_retries {
                        d.push(queue.dead_letter_queue.as_ref().unwrap(), payload.clone())
                            .await
                            .unwrap();
                        break;
                    }
                    tokio::time::sleep(Duration::from_millis(10)).await;
                }
            }
        }
    });

    tokio::time::timeout(Duration::from_secs(2), handle)
        .await
        .expect("timed out")
        .expect("task panicked");

    assert_eq!(driver.depth("failed").await.unwrap(), 1);
    let dlq_job = tokio::time::timeout(Duration::from_millis(100), driver.pop("failed"))
        .await
        .unwrap()
        .unwrap()
        .unwrap();
    assert_eq!(dlq_job, Bytes::from("dlq-job"));
}

#[test]
fn backoff_strategy_exponential() {
    let base = Duration::from_millis(100);
    assert_eq!(
        BackoffStrategy::Exponential.delay(base, 0),
        Duration::from_millis(100)
    );
    assert_eq!(
        BackoffStrategy::Exponential.delay(base, 1),
        Duration::from_millis(200)
    );
    assert_eq!(
        BackoffStrategy::Exponential.delay(base, 2),
        Duration::from_millis(400)
    );
    assert_eq!(
        BackoffStrategy::Exponential.delay(base, 3),
        Duration::from_millis(800)
    );
}

#[test]
fn backoff_strategy_linear() {
    let base = Duration::from_millis(100);
    assert_eq!(
        BackoffStrategy::Linear.delay(base, 0),
        Duration::from_millis(100)
    );
    assert_eq!(
        BackoffStrategy::Linear.delay(base, 1),
        Duration::from_millis(200)
    );
    assert_eq!(
        BackoffStrategy::Linear.delay(base, 2),
        Duration::from_millis(300)
    );
}

#[test]
fn backoff_strategy_fixed() {
    let base = Duration::from_millis(100);
    assert_eq!(
        BackoffStrategy::Fixed.delay(base, 0),
        Duration::from_millis(100)
    );
    assert_eq!(
        BackoffStrategy::Fixed.delay(base, 5),
        Duration::from_millis(100)
    );
}

#[tokio::test]
async fn config_redis_url_generation() {
    let config = JobsConfig {
        driver: DriverKind::Redis,
        host: "myhost".into(),
        port: 6380,
        password: "secret".into(),
        db: 2,
        queues: vec![],
    };
    assert_eq!(config.redis_url(), "redis://:secret@myhost:6380/2");

    let config_no_pass = JobsConfig {
        driver: DriverKind::Redis,
        host: "localhost".into(),
        port: 6379,
        password: String::new(),
        db: 0,
        queues: vec![],
    };
    assert_eq!(config_no_pass.redis_url(), "redis://localhost:6379/0");
}

#[tokio::test]
async fn config_deserialization_with_new_fields() {
    let json = serde_json::json!({
        "driver": "memory",
        "queues": [{
            "name": "emails",
            "concurrency": 2,
            "max_retries": 5,
            "retry_delay": "2s",
            "retry_backoff": "linear",
            "job_timeout": "30s",
            "dead_letter_queue": "failed-emails",
            "priority": 1
        }]
    });

    let config: JobsConfig = serde_json::from_value(json).unwrap();
    assert_eq!(config.queues.len(), 1);
    let q = &config.queues[0];
    assert_eq!(q.name, "emails");
    assert_eq!(q.concurrency, 2);
    assert_eq!(q.max_retries, 5);
    assert_eq!(q.retry_delay, Duration::from_secs(2));
    assert_eq!(q.retry_backoff, BackoffStrategy::Linear);
    assert_eq!(q.job_timeout, Duration::from_secs(30));
    assert_eq!(q.dead_letter_queue.as_deref(), Some("failed-emails"));
    assert_eq!(q.priority, 1);
}

#[tokio::test]
async fn config_defaults_are_sensible() {
    let json = serde_json::json!({});
    let config: JobsConfig = serde_json::from_value(json).unwrap();
    assert_eq!(config.driver, DriverKind::Memory);
    assert_eq!(config.host, "127.0.0.1");
    assert_eq!(config.port, 6379);
    assert_eq!(config.queues.len(), 1);
    let q = &config.queues[0];
    assert_eq!(q.name, "default");
    assert_eq!(q.retry_delay, Duration::from_secs(1));
    assert_eq!(q.retry_backoff, BackoffStrategy::Exponential);
    assert_eq!(q.job_timeout, Duration::from_secs(60));
    assert!(q.dead_letter_queue.is_none());
}

#[test]
fn factory_accepts_empty_config() {
    use folk_plugin_jobs::folk_plugin_factory;
    assert!(
        folk_plugin_factory().create(serde_json::json!({})).is_ok(),
        "absent [jobs] section must not crash the server"
    );
}