Skip to main content

folk_plugin_jobs/
plugin.rs

1use std::sync::Arc;
2use std::time::Duration;
3
4use anyhow::Result;
5use async_trait::async_trait;
6use bytes::Bytes;
7use folk_api::metrics::{CounterVec, GaugeVec, HistogramVec};
8use folk_api::{BoxFuture, Executor, PluginContext, RpcMethodDef, ServerPlugin};
9use tracing::{error, info, warn};
10
11use serde::Deserialize;
12
13use crate::config::{DriverKind, JobsConfig, QueueConfig};
14use crate::driver::Driver;
15
16#[derive(Deserialize)]
17struct PushRequest {
18    queue: String,
19    payload: String,
20    #[serde(default)]
21    delay: u64,
22}
23
24struct JobsMetrics {
25    pushed_total: Arc<dyn CounterVec>,
26    processed_total: Arc<dyn CounterVec>,
27    processing_duration: Arc<dyn HistogramVec>,
28    retries_total: Arc<dyn CounterVec>,
29    dead_letter_total: Arc<dyn CounterVec>,
30    active: Arc<dyn GaugeVec>,
31}
32
33impl JobsMetrics {
34    fn from_registry(reg: &dyn folk_api::metrics::MetricsRegistry) -> Self {
35        Self {
36            pushed_total: reg.counter_vec(
37                "folk_jobs_pushed_total",
38                "Number of jobs pushed",
39                &["queue"],
40            ),
41            processed_total: reg.counter_vec(
42                "folk_jobs_processed_total",
43                "Number of jobs processed",
44                &["queue", "status"],
45            ),
46            processing_duration: reg.histogram_vec(
47                "folk_jobs_processing_duration_seconds",
48                "Job processing duration",
49                &["queue"],
50            ),
51            retries_total: reg.counter_vec(
52                "folk_jobs_retries_total",
53                "Number of job retries",
54                &["queue"],
55            ),
56            dead_letter_total: reg.counter_vec(
57                "folk_jobs_dead_letter_total",
58                "Number of jobs sent to dead letter queue",
59                &["queue"],
60            ),
61            active: reg.gauge_vec(
62                "folk_jobs_active",
63                "Jobs currently being processed",
64                &["queue"],
65            ),
66        }
67    }
68}
69
70pub struct JobsPlugin {
71    config: JobsConfig,
72}
73
74impl JobsPlugin {
75    pub fn new(config: JobsConfig) -> Self {
76        Self { config }
77    }
78}
79
80#[async_trait]
81impl ServerPlugin for JobsPlugin {
82    fn name(&self) -> &'static str {
83        "jobs"
84    }
85
86    async fn run(&self, ctx: PluginContext) -> Result<()> {
87        let driver: Arc<dyn Driver> = match self.config.driver {
88            DriverKind::Memory => crate::driver::MemoryDriver::new(),
89            DriverKind::Redis => {
90                crate::redis_driver::RedisDriver::new(&self.config.redis_url()).await?
91            }
92        };
93
94        let metrics = ctx
95            .metrics_registry
96            .as_ref()
97            .map(|reg| Arc::new(JobsMetrics::from_registry(reg.as_ref())));
98
99        // Delayed job promoter — polls every second
100        let mut delayed_tasks = Vec::new();
101        for q in &self.config.queues {
102            let d = driver.clone();
103            let q_name = q.name.clone();
104            let mut sd = ctx.shutdown.clone();
105            delayed_tasks.push(tokio::spawn(async move {
106                loop {
107                    tokio::select! {
108                        _ = tokio::time::sleep(Duration::from_secs(1)) => {
109                            let _ = d.promote_delayed(&q_name).await;
110                        }
111                        _ = sd.changed() => {
112                            if *sd.borrow() { return; }
113                        }
114                    }
115                }
116            }));
117        }
118
119        // One consumer task per queue × concurrency
120        let mut tasks = Vec::new();
121        for q in &self.config.queues {
122            for _ in 0..q.concurrency {
123                let d = driver.clone();
124                let exec = ctx.executor.clone();
125                let q_cfg = q.clone();
126                let sd = ctx.shutdown.clone();
127                let m = metrics.clone();
128                tasks.push(tokio::spawn(async move {
129                    consume_loop(d, exec, q_cfg, sd, m).await;
130                }));
131            }
132        }
133
134        info!(
135            queues = self.config.queues.len(),
136            driver = ?self.config.driver,
137            "jobs plugin started"
138        );
139
140        // Register RPC methods
141        if let Some(reg) = &ctx.rpc_registrar {
142            // jobs.push — push a job to a queue
143            let d = driver.clone();
144            let m = metrics.clone();
145            reg.register_raw(
146                "jobs.push".into(),
147                Arc::new(move |payload: Bytes| -> BoxFuture<'static, Result<Bytes>> {
148                    let d = d.clone();
149                    let m = m.clone();
150                    Box::pin(async move {
151                        let req: PushRequest = serde_json::from_slice(&payload)?;
152                        if req.delay > 0 {
153                            d.push_delayed(&req.queue, Bytes::from(req.payload), req.delay)
154                                .await?;
155                        } else {
156                            d.push(&req.queue, Bytes::from(req.payload)).await?;
157                        }
158                        if let Some(m) = &m {
159                            m.pushed_total.with_labels(&[&req.queue]).inc();
160                        }
161                        Ok(Bytes::from(serde_json::to_vec(&"ok")?))
162                    })
163                }),
164            )
165            .await;
166
167            // jobs.stats — queue depth counters
168            let d = driver.clone();
169            let queues: Vec<String> = self.config.queues.iter().map(|q| q.name.clone()).collect();
170            reg.register_raw(
171                "jobs.stats".into(),
172                Arc::new(move |_: Bytes| -> BoxFuture<'static, Result<Bytes>> {
173                    let d = d.clone();
174                    let queues = queues.clone();
175                    Box::pin(async move {
176                        let mut stats = vec![];
177                        for q in &queues {
178                            let depth = d.depth(q).await.unwrap_or(0);
179                            stats.push(format!("{q}: depth={depth}"));
180                        }
181                        Ok(Bytes::from(serde_json::to_vec(&stats)?))
182                    })
183                }),
184            )
185            .await;
186        }
187
188        let mut sd = ctx.shutdown.clone();
189        if let Err(e) = sd.changed().await {
190            tracing::error!(error = %e, "shutdown sender dropped unexpectedly");
191        }
192        for t in tasks {
193            let _ = t.await;
194        }
195        for t in delayed_tasks {
196            t.abort();
197        }
198        Ok(())
199    }
200
201    fn rpc_methods(&self) -> Vec<RpcMethodDef> {
202        vec![
203            RpcMethodDef::new("jobs.push", "push a job to a queue (supports delay)"),
204            RpcMethodDef::new("jobs.stats", "queue depth and processing counters"),
205        ]
206    }
207}
208
209async fn consume_loop(
210    driver: Arc<dyn Driver>,
211    executor: Arc<dyn Executor>,
212    queue: QueueConfig,
213    mut shutdown: tokio::sync::watch::Receiver<bool>,
214    metrics: Option<Arc<JobsMetrics>>,
215) {
216    loop {
217        let job = tokio::select! {
218            job = driver.pop(&queue.name) => job,
219            _ = shutdown.changed() => {
220                if *shutdown.borrow() { return; }
221                continue;
222            }
223        };
224
225        let payload = match job {
226            Ok(Some(p)) => p,
227            Ok(None) => continue,
228            Err(e) => {
229                error!(error = ?e, queue = %queue.name, "driver pop error");
230                continue;
231            }
232        };
233
234        // PHP __folk_dispatch expects array params, wrap payload string in an object
235        let payload_str = String::from_utf8_lossy(&payload).into_owned();
236        let job_value = serde_json::json!({ "payload": payload_str });
237
238        if let Some(m) = &metrics {
239            m.active.with_labels(&[&queue.name]).inc();
240        }
241
242        let mut retries = 0u32;
243        let start = std::time::Instant::now();
244        loop {
245            let result = if queue.job_timeout.is_zero() {
246                executor
247                    .execute_value("jobs.process", job_value.clone())
248                    .await
249                    .map(|_| ())
250            } else {
251                match tokio::time::timeout(
252                    queue.job_timeout,
253                    executor.execute_value("jobs.process", job_value.clone()),
254                )
255                .await
256                {
257                    Ok(r) => r.map(|_| ()),
258                    Err(_) => Err(anyhow::anyhow!(
259                        "job timed out after {:?}",
260                        queue.job_timeout
261                    )),
262                }
263            };
264
265            match result {
266                Ok(_) => {
267                    if let Some(m) = &metrics {
268                        m.processed_total.with_labels(&[&queue.name, "ok"]).inc();
269                        m.processing_duration
270                            .with_labels(&[&queue.name])
271                            .observe(start.elapsed().as_secs_f64());
272                    }
273                    break;
274                }
275                Err(e) => {
276                    retries += 1;
277                    if retries >= queue.max_retries {
278                        // Dead letter queue or discard
279                        if let Some(dlq) = &queue.dead_letter_queue {
280                            if let Err(dlq_err) = driver.push(dlq, payload.clone()).await {
281                                error!(
282                                    queue = %queue.name,
283                                    dlq = %dlq,
284                                    error = ?dlq_err,
285                                    "failed to push to dead letter queue"
286                                );
287                            }
288                            if let Some(m) = &metrics {
289                                m.dead_letter_total.with_labels(&[&queue.name]).inc();
290                            }
291                            warn!(
292                                queue = %queue.name,
293                                retries,
294                                dlq = %dlq,
295                                "job failed after max retries; sent to DLQ"
296                            );
297                        } else {
298                            error!(
299                                queue = %queue.name,
300                                retries,
301                                "job failed after max retries; discarding"
302                            );
303                        }
304                        if let Some(m) = &metrics {
305                            m.processed_total
306                                .with_labels(&[&queue.name, "failed"])
307                                .inc();
308                            m.processing_duration
309                                .with_labels(&[&queue.name])
310                                .observe(start.elapsed().as_secs_f64());
311                        }
312                        break;
313                    }
314                    if let Some(m) = &metrics {
315                        m.retries_total.with_labels(&[&queue.name]).inc();
316                    }
317                    let delay = queue.retry_backoff.delay(queue.retry_delay, retries);
318                    warn!(
319                        queue = %queue.name,
320                        retries,
321                        delay_ms = delay.as_millis() as u64,
322                        error = ?e,
323                        "job failed; retrying"
324                    );
325                    tokio::time::sleep(delay).await;
326                }
327            }
328        }
329
330        if let Some(m) = &metrics {
331            m.active.with_labels(&[&queue.name]).dec();
332        }
333    }
334}