folk-plugin-jobs 0.3.3

Queue consumer plugin for Folk — pulls jobs from memory or Redis and dispatches to PHP workers
Documentation
use anyhow::{Context, Result};
use async_trait::async_trait;
use bytes::Bytes;
use redis::AsyncCommands;
use redis::aio::MultiplexedConnection;

use crate::driver::Driver;

/// Queue driver backed by Redis.
///
/// Holds one multiplexed async connection; the driver methods clone this
/// handle for every command they issue.
pub struct RedisDriver {
    /// Connection handle opened and PING-checked by [`RedisDriver::new`].
    conn: MultiplexedConnection,
}

impl RedisDriver {
    /// Builds a driver for the Redis server at `url`.
    ///
    /// Opens a multiplexed async connection and sends a `PING` over it before
    /// handing the driver back, so an unusable server is reported here rather
    /// than on the first queue operation.
    ///
    /// # Errors
    ///
    /// Fails when the client cannot be created from `url`, when the
    /// connection cannot be established, or when the `PING` does not succeed.
    pub async fn new(url: &str) -> Result<std::sync::Arc<Self>> {
        let mut conn = redis::Client::open(url)
            .context("redis connect")?
            .get_multiplexed_async_connection()
            .await
            .context("redis connection")?;

        // The reply text is irrelevant; only a successful round trip matters.
        let _pong: String = redis::cmd("PING")
            .query_async(&mut conn)
            .await
            .context("redis PING")?;

        let driver = Self { conn };
        Ok(std::sync::Arc::new(driver))
    }
}

#[async_trait]
impl Driver for RedisDriver {
    async fn pop(&self, queue: &str) -> Result<Option<Bytes>> {
        let mut conn = self.conn.clone();
        let result: Option<(String, Vec<u8>)> = conn.blpop(queue, 0.0).await.context("BLPOP")?;
        Ok(result.map(|(_, v)| Bytes::from(v)))
    }

    async fn push(&self, queue: &str, payload: Bytes) -> Result<()> {
        let mut conn = self.conn.clone();
        conn.rpush::<_, _, ()>(queue, payload.to_vec()).await?;
        Ok(())
    }

    async fn push_delayed(&self, queue: &str, payload: Bytes, delay_secs: u64) -> Result<()> {
        if delay_secs == 0 {
            return self.push(queue, payload).await;
        }
        let mut conn = self.conn.clone();
        let score = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_secs()
            + delay_secs;
        let delayed_key = format!("{queue}:delayed");
        conn.zadd::<_, _, _, ()>(&delayed_key, payload.to_vec(), score as f64)
            .await?;
        Ok(())
    }

    async fn promote_delayed(&self, queue: &str) -> Result<u64> {
        let mut conn = self.conn.clone();
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_secs() as f64;
        let delayed_key = format!("{queue}:delayed");

        // Atomic promote via Lua: ZRANGE BYSCORE → RPUSH → ZREM in one call.
        let count: u64 = redis::Script::new(
            r"local items = redis.call('ZRANGEBYSCORE', KEYS[1], '0', ARGV[1])
              for _, v in ipairs(items) do
                  redis.call('RPUSH', KEYS[2], v)
                  redis.call('ZREM', KEYS[1], v)
              end
              return #items",
        )
        .key(&delayed_key)
        .key(queue)
        .arg(now)
        .invoke_async(&mut conn)
        .await
        .context("promote_delayed lua script")?;

        Ok(count)
    }

    async fn depth(&self, queue: &str) -> Result<u64> {
        let mut conn = self.conn.clone();
        Ok(conn.llen::<_, u64>(queue).await.unwrap_or(0))
    }
}