1use std::sync::Arc;
2use std::time::Duration;
3
4use anyhow::Result;
5use async_trait::async_trait;
6use bytes::Bytes;
7use folk_api::metrics::{CounterVec, GaugeVec, HistogramVec};
8use folk_api::{BoxFuture, Executor, PluginContext, RpcMethodDef, ServerPlugin};
9use tracing::{error, info, warn};
10
11use serde::Deserialize;
12
13use crate::config::{DriverKind, JobsConfig, QueueConfig};
14use crate::driver::Driver;
15
16#[derive(Deserialize)]
17struct PushRequest {
18 queue: String,
19 payload: String,
20 #[serde(default)]
21 delay: u64,
22}
23
24struct JobsMetrics {
25 pushed_total: Arc<dyn CounterVec>,
26 processed_total: Arc<dyn CounterVec>,
27 processing_duration: Arc<dyn HistogramVec>,
28 retries_total: Arc<dyn CounterVec>,
29 dead_letter_total: Arc<dyn CounterVec>,
30 active: Arc<dyn GaugeVec>,
31}
32
33impl JobsMetrics {
34 fn from_registry(reg: &dyn folk_api::metrics::MetricsRegistry) -> Self {
35 Self {
36 pushed_total: reg.counter_vec(
37 "folk_jobs_pushed_total",
38 "Number of jobs pushed",
39 &["queue"],
40 ),
41 processed_total: reg.counter_vec(
42 "folk_jobs_processed_total",
43 "Number of jobs processed",
44 &["queue", "status"],
45 ),
46 processing_duration: reg.histogram_vec(
47 "folk_jobs_processing_duration_seconds",
48 "Job processing duration",
49 &["queue"],
50 ),
51 retries_total: reg.counter_vec(
52 "folk_jobs_retries_total",
53 "Number of job retries",
54 &["queue"],
55 ),
56 dead_letter_total: reg.counter_vec(
57 "folk_jobs_dead_letter_total",
58 "Number of jobs sent to dead letter queue",
59 &["queue"],
60 ),
61 active: reg.gauge_vec(
62 "folk_jobs_active",
63 "Jobs currently being processed",
64 &["queue"],
65 ),
66 }
67 }
68}
69
70pub struct JobsPlugin {
71 config: JobsConfig,
72}
73
74impl JobsPlugin {
75 pub fn new(config: JobsConfig) -> Self {
76 Self { config }
77 }
78}
79
80#[async_trait]
81impl ServerPlugin for JobsPlugin {
82 fn name(&self) -> &'static str {
83 "jobs"
84 }
85
86 async fn run(&self, ctx: PluginContext) -> Result<()> {
87 let driver: Arc<dyn Driver> = match self.config.driver {
88 DriverKind::Memory => crate::driver::MemoryDriver::new(),
89 DriverKind::Redis => {
90 crate::redis_driver::RedisDriver::new(&self.config.redis_url()).await?
91 }
92 };
93
94 let metrics = ctx
95 .metrics_registry
96 .as_ref()
97 .map(|reg| Arc::new(JobsMetrics::from_registry(reg.as_ref())));
98
99 let mut delayed_tasks = Vec::new();
101 for q in &self.config.queues {
102 let d = driver.clone();
103 let q_name = q.name.clone();
104 let mut sd = ctx.shutdown.clone();
105 delayed_tasks.push(tokio::spawn(async move {
106 loop {
107 tokio::select! {
108 _ = tokio::time::sleep(Duration::from_secs(1)) => {
109 let _ = d.promote_delayed(&q_name).await;
110 }
111 _ = sd.changed() => {
112 if *sd.borrow() { return; }
113 }
114 }
115 }
116 }));
117 }
118
119 let mut tasks = Vec::new();
121 for q in &self.config.queues {
122 for _ in 0..q.concurrency {
123 let d = driver.clone();
124 let exec = ctx.executor.clone();
125 let q_cfg = q.clone();
126 let sd = ctx.shutdown.clone();
127 let m = metrics.clone();
128 tasks.push(tokio::spawn(async move {
129 consume_loop(d, exec, q_cfg, sd, m).await;
130 }));
131 }
132 }
133
134 info!(
135 queues = self.config.queues.len(),
136 driver = ?self.config.driver,
137 "jobs plugin started"
138 );
139
140 if let Some(reg) = &ctx.rpc_registrar {
142 let d = driver.clone();
144 let m = metrics.clone();
145 reg.register_raw(
146 "jobs.push".into(),
147 Arc::new(move |payload: Bytes| -> BoxFuture<'static, Result<Bytes>> {
148 let d = d.clone();
149 let m = m.clone();
150 Box::pin(async move {
151 let req: PushRequest = serde_json::from_slice(&payload)?;
152 if req.delay > 0 {
153 d.push_delayed(&req.queue, Bytes::from(req.payload), req.delay)
154 .await?;
155 } else {
156 d.push(&req.queue, Bytes::from(req.payload)).await?;
157 }
158 if let Some(m) = &m {
159 m.pushed_total.with_labels(&[&req.queue]).inc();
160 }
161 Ok(Bytes::from(serde_json::to_vec(&"ok")?))
162 })
163 }),
164 )
165 .await;
166
167 let d = driver.clone();
169 let queues: Vec<String> = self.config.queues.iter().map(|q| q.name.clone()).collect();
170 reg.register_raw(
171 "jobs.stats".into(),
172 Arc::new(move |_: Bytes| -> BoxFuture<'static, Result<Bytes>> {
173 let d = d.clone();
174 let queues = queues.clone();
175 Box::pin(async move {
176 let mut stats = vec![];
177 for q in &queues {
178 let depth = d.depth(q).await.unwrap_or(0);
179 stats.push(format!("{q}: depth={depth}"));
180 }
181 Ok(Bytes::from(serde_json::to_vec(&stats)?))
182 })
183 }),
184 )
185 .await;
186 }
187
188 let mut sd = ctx.shutdown.clone();
189 if let Err(e) = sd.changed().await {
190 tracing::error!(error = %e, "shutdown sender dropped unexpectedly");
191 }
192 for t in tasks {
193 let _ = t.await;
194 }
195 for t in delayed_tasks {
196 t.abort();
197 }
198 Ok(())
199 }
200
201 fn rpc_methods(&self) -> Vec<RpcMethodDef> {
202 vec![
203 RpcMethodDef::new("jobs.push", "push a job to a queue (supports delay)"),
204 RpcMethodDef::new("jobs.stats", "queue depth and processing counters"),
205 ]
206 }
207}
208
209async fn consume_loop(
210 driver: Arc<dyn Driver>,
211 executor: Arc<dyn Executor>,
212 queue: QueueConfig,
213 mut shutdown: tokio::sync::watch::Receiver<bool>,
214 metrics: Option<Arc<JobsMetrics>>,
215) {
216 loop {
217 let job = tokio::select! {
218 job = driver.pop(&queue.name) => job,
219 _ = shutdown.changed() => {
220 if *shutdown.borrow() { return; }
221 continue;
222 }
223 };
224
225 let payload = match job {
226 Ok(Some(p)) => p,
227 Ok(None) => continue,
228 Err(e) => {
229 error!(error = ?e, queue = %queue.name, "driver pop error");
230 continue;
231 }
232 };
233
234 let payload_str = String::from_utf8_lossy(&payload).into_owned();
236 let job_value = serde_json::json!({ "payload": payload_str });
237
238 if let Some(m) = &metrics {
239 m.active.with_labels(&[&queue.name]).inc();
240 }
241
242 let mut retries = 0u32;
243 let start = std::time::Instant::now();
244 loop {
245 let result = if queue.job_timeout.is_zero() {
246 executor
247 .execute_value("jobs.process", job_value.clone())
248 .await
249 .map(|_| ())
250 } else {
251 match tokio::time::timeout(
252 queue.job_timeout,
253 executor.execute_value("jobs.process", job_value.clone()),
254 )
255 .await
256 {
257 Ok(r) => r.map(|_| ()),
258 Err(_) => Err(anyhow::anyhow!(
259 "job timed out after {:?}",
260 queue.job_timeout
261 )),
262 }
263 };
264
265 match result {
266 Ok(_) => {
267 if let Some(m) = &metrics {
268 m.processed_total.with_labels(&[&queue.name, "ok"]).inc();
269 m.processing_duration
270 .with_labels(&[&queue.name])
271 .observe(start.elapsed().as_secs_f64());
272 }
273 break;
274 }
275 Err(e) => {
276 retries += 1;
277 if retries >= queue.max_retries {
278 if let Some(dlq) = &queue.dead_letter_queue {
280 if let Err(dlq_err) = driver.push(dlq, payload.clone()).await {
281 error!(
282 queue = %queue.name,
283 dlq = %dlq,
284 error = ?dlq_err,
285 "failed to push to dead letter queue"
286 );
287 }
288 if let Some(m) = &metrics {
289 m.dead_letter_total.with_labels(&[&queue.name]).inc();
290 }
291 warn!(
292 queue = %queue.name,
293 retries,
294 dlq = %dlq,
295 "job failed after max retries; sent to DLQ"
296 );
297 } else {
298 error!(
299 queue = %queue.name,
300 retries,
301 "job failed after max retries; discarding"
302 );
303 }
304 if let Some(m) = &metrics {
305 m.processed_total
306 .with_labels(&[&queue.name, "failed"])
307 .inc();
308 m.processing_duration
309 .with_labels(&[&queue.name])
310 .observe(start.elapsed().as_secs_f64());
311 }
312 break;
313 }
314 if let Some(m) = &metrics {
315 m.retries_total.with_labels(&[&queue.name]).inc();
316 }
317 let delay = queue.retry_backoff.delay(queue.retry_delay, retries);
318 warn!(
319 queue = %queue.name,
320 retries,
321 delay_ms = delay.as_millis() as u64,
322 error = ?e,
323 "job failed; retrying"
324 );
325 tokio::time::sleep(delay).await;
326 }
327 }
328 }
329
330 if let Some(m) = &metrics {
331 m.active.with_labels(&[&queue.name]).dec();
332 }
333 }
334}