//! folk-plugin-jobs 0.3.4
//!
//! Queue consumer plugin for Folk — pulls jobs from memory or Redis and dispatches them to PHP workers.
//! Documentation
use std::collections::{BTreeMap, VecDeque};
use std::time::Instant;

use anyhow::Result;
use async_trait::async_trait;
use bytes::Bytes;
use tokio::sync::Mutex;

/// A queue driver: push jobs in, pop them out.
///
/// Every method is async and addressed by queue name; payloads are opaque
/// [`Bytes`] that the driver stores and hands back unchanged. Implementors must
/// be `Send + Sync + 'static` so a driver can be shared between tasks.
#[async_trait]
pub trait Driver: Send + Sync + 'static {
    /// Block-pop: wait for a job. Returns `None` if shutdown.
    ///
    /// NOTE(review): the in-memory implementation in this file has no shutdown
    /// signal and therefore never yields `Ok(None)`.
    async fn pop(&self, queue: &str) -> Result<Option<Bytes>>;
    /// Push a job (for retry or testing).
    ///
    /// The job becomes available to `pop` on the same `queue` immediately.
    async fn push(&self, queue: &str, payload: Bytes) -> Result<()>;
    /// Push a job with a delay in seconds (0 = immediate).
    ///
    /// A delayed job is not visible to `pop` until `promote_delayed` has moved it
    /// into the main queue.
    async fn push_delayed(&self, queue: &str, payload: Bytes, delay_secs: u64) -> Result<()>;
    /// Move ready delayed jobs into the main queue. Returns count promoted.
    ///
    /// Presumably called periodically by the consumer loop — confirm against the caller.
    async fn promote_delayed(&self, queue: &str) -> Result<u64>;
    /// Current queue depth (for metrics).
    async fn depth(&self, queue: &str) -> Result<u64>;
}

/// A job parked by `push_delayed` until its delay has elapsed.
struct DelayedJob {
    /// Earliest instant at which `promote_delayed` may move this job to the main queue.
    ready_at: Instant,
    /// Opaque job payload, handed back unchanged on promotion.
    payload: Bytes,
}

/// In-memory driver (testing, no persistence).
///
/// All state lives behind async mutexes inside this struct, so jobs are lost
/// when the value is dropped.
pub struct MemoryDriver {
    /// Ready jobs per queue name, consumed front-to-back (FIFO).
    queues: Mutex<std::collections::HashMap<String, VecDeque<Bytes>>>,
    /// Delayed jobs per queue name, keyed by the id taken from `next_id`.
    delayed: Mutex<std::collections::HashMap<String, BTreeMap<u64, Vec<DelayedJob>>>>,
    /// Counter used to key delayed jobs; incremented before each use.
    next_id: Mutex<u64>,
    /// Signalled when jobs are pushed or promoted so that `pop` waiters re-check the queue.
    notify: tokio::sync::Notify,
}

impl MemoryDriver {
    /// Builds an empty driver — no queues, no delayed jobs — and returns it
    /// wrapped in an `Arc` so it can be shared between tasks straight away.
    pub fn new() -> std::sync::Arc<Self> {
        let driver = Self {
            queues: Mutex::new(std::collections::HashMap::new()),
            delayed: Mutex::new(std::collections::HashMap::new()),
            // The counter is bumped before use, so the first delayed job gets id 1.
            next_id: Mutex::new(0),
            notify: tokio::sync::Notify::new(),
        };
        std::sync::Arc::new(driver)
    }
}

#[async_trait]
impl Driver for MemoryDriver {
    /// Waits until a job is available on `queue` and returns it.
    ///
    /// This implementation has no shutdown signal, so it never yields `Ok(None)`;
    /// the future completes only once a payload could be popped.
    ///
    /// One `Notify` is shared by all queues, so every producer wakes *all*
    /// waiters and each waiter re-checks its own queue. That costs some spurious
    /// wake-ups but guarantees a consumer is never left parked while its queue
    /// holds a job (a push to another queue, or several jobs arriving at once,
    /// can no longer swallow the only wake-up).
    async fn pop(&self, queue: &str) -> Result<Option<Bytes>> {
        loop {
            // Register interest *before* inspecting the queue: a `Notified` future
            // receives `notify_waiters()` wake-ups from the moment it is created,
            // so a push landing between the check below and the `.await` is not lost.
            let notified = self.notify.notified();
            {
                let mut queues = self.queues.lock().await;
                // `get_mut` avoids allocating a key and inserting an empty queue
                // on every poll of a queue that was never pushed to.
                if let Some(item) = queues.get_mut(queue).and_then(|q| q.pop_front()) {
                    return Ok(Some(item));
                }
            } // queue lock released before parking
            notified.await;
        }
    }

    /// Appends `payload` to the back of `queue` and wakes every waiting `pop`.
    async fn push(&self, queue: &str, payload: Bytes) -> Result<()> {
        self.queues
            .lock()
            .await
            .entry(queue.to_string())
            .or_default()
            .push_back(payload);
        // Wake all waiters: the notifier is not per-queue, so waking just one
        // could pick a consumer that is waiting on a different queue.
        self.notify.notify_waiters();
        Ok(())
    }

    /// Parks `payload` until `delay_secs` seconds have passed.
    ///
    /// A delay of `0` is an immediate `push`. Otherwise the job stays invisible
    /// to `pop` until `promote_delayed` is called after its ready time.
    async fn push_delayed(&self, queue: &str, payload: Bytes, delay_secs: u64) -> Result<()> {
        if delay_secs == 0 {
            return self.push(queue, payload).await;
        }
        let ready_at = Instant::now() + std::time::Duration::from_secs(delay_secs);
        // Lock order: `delayed` first, then `next_id` (the only place both are held).
        let mut delayed = self.delayed.lock().await;
        let id = {
            let mut next_id = self.next_id.lock().await;
            *next_id += 1;
            *next_id
        };
        delayed
            .entry(queue.to_string())
            .or_default()
            .entry(id)
            .or_default()
            .push(DelayedJob { ready_at, payload });
        Ok(())
    }

    /// Moves every delayed job of `queue` whose ready time has passed into the
    /// main queue, in ascending id (insertion) order, and returns how many moved.
    async fn promote_delayed(&self, queue: &str) -> Result<u64> {
        let now = Instant::now();

        // Collect ready payloads under the `delayed` lock, then release it so the
        // `delayed` and `queues` locks are never held at the same time.
        let ready: Vec<Bytes> = {
            let mut delayed = self.delayed.lock().await;
            let mut ready = Vec::new();
            if let Some(jobs) = delayed.get_mut(queue) {
                // `retain` visits keys in ascending order and drops emptied buckets.
                jobs.retain(|_, bucket| {
                    let (due, pending): (Vec<_>, Vec<_>) = std::mem::take(bucket)
                        .into_iter()
                        .partition(|job| job.ready_at <= now);
                    ready.extend(due.into_iter().map(|job| job.payload));
                    *bucket = pending;
                    !bucket.is_empty()
                });
            }
            ready
        }; // delayed lock released

        if ready.is_empty() {
            return Ok(0);
        }

        let count = ready.len() as u64;
        self.queues
            .lock()
            .await
            .entry(queue.to_string())
            .or_default()
            .extend(ready);
        // Several jobs may have become available at once; wake every waiter so
        // each of them gets a chance to take one.
        self.notify.notify_waiters();
        Ok(count)
    }

    /// Number of ready jobs in `queue`; delayed jobs are not counted.
    /// An unknown queue reports `0`.
    async fn depth(&self, queue: &str) -> Result<u64> {
        let queues = self.queues.lock().await;
        Ok(queues.get(queue).map_or(0, |q| q.len() as u64))
    }
}