Skip to main content

robinpath_modules/modules/
queue_mod.rs

1use robinpath::{RobinPath, Value};
2use std::sync::{LazyLock, Mutex};
3use std::collections::HashMap;
4
5struct Queue {
6    name: String,
7    jobs: HashMap<String, Job>,
8    dead_letter: Vec<Job>,
9    max_dead_letter: usize,
10    paused: bool,
11    processed: u64,
12    failed: u64,
13    next_id: u64,
14}
15
16#[derive(Clone)]
17struct Job {
18    id: String,
19    data: Value,
20    priority: i64,
21    status: String,
22    attempts: u32,
23    max_attempts: u32,
24    created_at: u64,
25    scheduled_for: u64,
26    started_at: Option<u64>,
27    completed_at: Option<u64>,
28    failed_at: Option<u64>,
29    error: Option<String>,
30    result: Option<Value>,
31}
32
33static QUEUES: LazyLock<Mutex<HashMap<String, Queue>>> =
34    LazyLock::new(|| Mutex::new(HashMap::new()));
35
36fn now_ms() -> u64 {
37    std::time::SystemTime::now()
38        .duration_since(std::time::UNIX_EPOCH)
39        .unwrap_or_default()
40        .as_millis() as u64
41}
42
43pub fn register(rp: &mut RobinPath) {
44    // queue.create name options? → {name, paused, maxDeadLetter}
45    rp.register_builtin("queue.create", |args, _| {
46        let name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
47        let opts = args.get(1).cloned().unwrap_or(Value::Null);
48        let max_dl = if let Value::Object(obj) = &opts {
49            obj.get("maxDeadLetter").map(|v| v.to_number() as usize).unwrap_or(1000)
50        } else {
51            1000
52        };
53        let mut queues = QUEUES.lock().unwrap();
54        let q = queues.entry(name.clone()).or_insert_with(|| Queue {
55            name: name.clone(),
56            jobs: HashMap::new(),
57            dead_letter: Vec::new(),
58            max_dead_letter: max_dl,
59            paused: false,
60            processed: 0,
61            failed: 0,
62            next_id: 1,
63        });
64        q.max_dead_letter = max_dl;
65        let mut obj = indexmap::IndexMap::new();
66        obj.insert("name".to_string(), Value::String(q.name.clone()));
67        obj.insert("paused".to_string(), Value::Bool(q.paused));
68        obj.insert("maxDeadLetter".to_string(), Value::Number(q.max_dead_letter as f64));
69        Ok(Value::Object(obj))
70    });
71
72    // queue.push queueName data options? → {id, status, priority}
73    rp.register_builtin("queue.push", |args, _| {
74        let queue_name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
75        let data = args.get(1).cloned().unwrap_or(Value::Null);
76        let opts = args.get(2).cloned().unwrap_or(Value::Null);
77        let (priority, delay_ms, max_attempts) = if let Value::Object(obj) = &opts {
78            (
79                obj.get("priority").map(|v| v.to_number() as i64).unwrap_or(0),
80                obj.get("delay").map(|v| v.to_number() as u64).unwrap_or(0),
81                obj.get("maxAttempts").map(|v| v.to_number() as u32).unwrap_or(3),
82            )
83        } else {
84            (0, 0, 3)
85        };
86        let now = now_ms();
87        let mut queues = QUEUES.lock().unwrap();
88        let q = queues.entry(queue_name).or_insert_with(|| Queue {
89            name: "default".to_string(),
90            jobs: HashMap::new(),
91            dead_letter: Vec::new(),
92            max_dead_letter: 1000,
93            paused: false,
94            processed: 0,
95            failed: 0,
96            next_id: 1,
97        });
98        let id = format!("job_{}", q.next_id);
99        q.next_id += 1;
100        let status = if delay_ms > 0 { "delayed" } else { "pending" };
101        let job = Job {
102            id: id.clone(),
103            data,
104            priority,
105            status: status.to_string(),
106            attempts: 0,
107            max_attempts,
108            created_at: now,
109            scheduled_for: now + delay_ms,
110            started_at: None,
111            completed_at: None,
112            failed_at: None,
113            error: None,
114            result: None,
115        };
116        q.jobs.insert(id.clone(), job);
117        let mut obj = indexmap::IndexMap::new();
118        obj.insert("id".to_string(), Value::String(id));
119        obj.insert("status".to_string(), Value::String(status.to_string()));
120        obj.insert("priority".to_string(), Value::Number(priority as f64));
121        Ok(Value::Object(obj))
122    });
123
124    // queue.pop queueName → job or null
125    rp.register_builtin("queue.pop", |args, _| {
126        let queue_name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
127        let mut queues = QUEUES.lock().unwrap();
128        let q = match queues.get_mut(&queue_name) {
129            Some(q) => q,
130            None => return Ok(Value::Null),
131        };
132        if q.paused {
133            return Ok(Value::Null);
134        }
135        let now = now_ms();
136        // Promote delayed jobs
137        for job in q.jobs.values_mut() {
138            if job.status == "delayed" && job.scheduled_for <= now {
139                job.status = "pending".to_string();
140            }
141        }
142        // Find best pending job (highest priority, oldest first)
143        let best_id = q.jobs.values()
144            .filter(|j| j.status == "pending")
145            .max_by(|a, b| {
146                a.priority.cmp(&b.priority)
147                    .then_with(|| b.created_at.cmp(&a.created_at))
148            })
149            .map(|j| j.id.clone());
150
151        if let Some(id) = best_id {
152            let job = q.jobs.get_mut(&id).unwrap();
153            job.status = "active".to_string();
154            job.attempts += 1;
155            job.started_at = Some(now);
156            let mut obj = indexmap::IndexMap::new();
157            obj.insert("id".to_string(), Value::String(job.id.clone()));
158            obj.insert("data".to_string(), job.data.clone());
159            obj.insert("priority".to_string(), Value::Number(job.priority as f64));
160            obj.insert("attempts".to_string(), Value::Number(job.attempts as f64));
161            obj.insert("maxAttempts".to_string(), Value::Number(job.max_attempts as f64));
162            Ok(Value::Object(obj))
163        } else {
164            Ok(Value::Null)
165        }
166    });
167
168    // queue.complete queueName jobId result? → {id, status, duration}
169    rp.register_builtin("queue.complete", |args, _| {
170        let queue_name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
171        let job_id = args.get(1).map(|v| v.to_display_string()).unwrap_or_default();
172        let result = args.get(2).cloned();
173        let mut queues = QUEUES.lock().unwrap();
174        let q = queues.get_mut(&queue_name)
175            .ok_or_else(|| format!("Queue \"{}\" not found", queue_name))?;
176        let job = q.jobs.get_mut(&job_id)
177            .ok_or_else(|| format!("Job \"{}\" not found in queue \"{}\"", job_id, queue_name))?;
178        let now = now_ms();
179        job.status = "completed".to_string();
180        job.completed_at = Some(now);
181        job.result = result;
182        q.processed += 1;
183        let duration = now - job.started_at.unwrap_or(job.created_at);
184        let mut obj = indexmap::IndexMap::new();
185        obj.insert("id".to_string(), Value::String(job_id));
186        obj.insert("status".to_string(), Value::String("completed".to_string()));
187        obj.insert("duration".to_string(), Value::Number(duration as f64));
188        Ok(Value::Object(obj))
189    });
190
191    // queue.fail queueName jobId error → {id, status, attempts}
192    rp.register_builtin("queue.fail", |args, _| {
193        let queue_name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
194        let job_id = args.get(1).map(|v| v.to_display_string()).unwrap_or_default();
195        let error = args.get(2).map(|v| v.to_display_string()).unwrap_or_else(|| "Unknown error".to_string());
196        let mut queues = QUEUES.lock().unwrap();
197        let q = queues.get_mut(&queue_name)
198            .ok_or_else(|| format!("Queue \"{}\" not found", queue_name))?;
199        let job = q.jobs.get_mut(&job_id)
200            .ok_or_else(|| format!("Job \"{}\" not found in queue \"{}\"", job_id, queue_name))?;
201        job.error = Some(error.clone());
202        job.failed_at = Some(now_ms());
203        if job.attempts < job.max_attempts {
204            job.status = "pending".to_string();
205            let mut obj = indexmap::IndexMap::new();
206            obj.insert("id".to_string(), Value::String(job_id));
207            obj.insert("status".to_string(), Value::String("retry".to_string()));
208            obj.insert("attempts".to_string(), Value::Number(job.attempts as f64));
209            obj.insert("maxAttempts".to_string(), Value::Number(job.max_attempts as f64));
210            Ok(Value::Object(obj))
211        } else {
212            let mut dead_job = job.clone();
213            dead_job.status = "failed".to_string();
214            q.failed += 1;
215            q.dead_letter.push(dead_job);
216            q.jobs.remove(&job_id);
217            if q.dead_letter.len() > q.max_dead_letter {
218                q.dead_letter.remove(0);
219            }
220            let mut obj = indexmap::IndexMap::new();
221            obj.insert("id".to_string(), Value::String(job_id));
222            obj.insert("status".to_string(), Value::String("dead-letter".to_string()));
223            obj.insert("attempts".to_string(), Value::Number(q.dead_letter.last().map(|j| j.attempts).unwrap_or(0) as f64));
224            obj.insert("error".to_string(), Value::String(error));
225            Ok(Value::Object(obj))
226        }
227    });
228
229    // queue.retry queueName jobId → {id, status, source}
230    rp.register_builtin("queue.retry", |args, _| {
231        let queue_name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
232        let job_id = args.get(1).map(|v| v.to_display_string()).unwrap_or_default();
233        let mut queues = QUEUES.lock().unwrap();
234        let q = queues.get_mut(&queue_name)
235            .ok_or_else(|| format!("Queue \"{}\" not found", queue_name))?;
236        // Check dead letter first
237        if let Some(idx) = q.dead_letter.iter().position(|j| j.id == job_id) {
238            let mut job = q.dead_letter.remove(idx);
239            job.status = "pending".to_string();
240            job.attempts = 0;
241            job.error = None;
242            job.failed_at = None;
243            q.jobs.insert(job.id.clone(), job);
244            let mut obj = indexmap::IndexMap::new();
245            obj.insert("id".to_string(), Value::String(job_id));
246            obj.insert("status".to_string(), Value::String("pending".to_string()));
247            obj.insert("source".to_string(), Value::String("dead-letter".to_string()));
248            return Ok(Value::Object(obj));
249        }
250        let job = q.jobs.get_mut(&job_id)
251            .ok_or_else(|| format!("Job \"{}\" not found", job_id))?;
252        job.status = "pending".to_string();
253        job.error = None;
254        let mut obj = indexmap::IndexMap::new();
255        obj.insert("id".to_string(), Value::String(job_id));
256        obj.insert("status".to_string(), Value::String("pending".to_string()));
257        obj.insert("source".to_string(), Value::String("queue".to_string()));
258        Ok(Value::Object(obj))
259    });
260
261    // queue.remove queueName jobId → bool
262    rp.register_builtin("queue.remove", |args, _| {
263        let queue_name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
264        let job_id = args.get(1).map(|v| v.to_display_string()).unwrap_or_default();
265        let mut queues = QUEUES.lock().unwrap();
266        if let Some(q) = queues.get_mut(&queue_name) {
267            Ok(Value::Bool(q.jobs.remove(&job_id).is_some()))
268        } else {
269            Ok(Value::Bool(false))
270        }
271    });
272
273    // queue.size queueName status? → number
274    rp.register_builtin("queue.size", |args, _| {
275        let queue_name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
276        let status_filter = args.get(1).map(|v| v.to_display_string());
277        let mut queues = QUEUES.lock().unwrap();
278        if let Some(q) = queues.get_mut(&queue_name) {
279            let now = now_ms();
280            for job in q.jobs.values_mut() {
281                if job.status == "delayed" && job.scheduled_for <= now {
282                    job.status = "pending".to_string();
283                }
284            }
285            let count = if let Some(status) = status_filter {
286                q.jobs.values().filter(|j| j.status == status).count()
287            } else {
288                q.jobs.len()
289            };
290            Ok(Value::Number(count as f64))
291        } else {
292            Ok(Value::Number(0.0))
293        }
294    });
295
296    // queue.status queueName → {name, paused, total, pending, active, ...}
297    rp.register_builtin("queue.status", |args, _| {
298        let queue_name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
299        let mut queues = QUEUES.lock().unwrap();
300        if let Some(q) = queues.get_mut(&queue_name) {
301            let now = now_ms();
302            let mut counts: HashMap<&str, u64> = HashMap::new();
303            for job in q.jobs.values_mut() {
304                if job.status == "delayed" && job.scheduled_for <= now {
305                    job.status = "pending".to_string();
306                }
307                *counts.entry(match job.status.as_str() {
308                    "pending" => "pending",
309                    "active" => "active",
310                    "completed" => "completed",
311                    "failed" => "failed",
312                    "delayed" => "delayed",
313                    _ => "pending",
314                }).or_insert(0) += 1;
315            }
316            let mut obj = indexmap::IndexMap::new();
317            obj.insert("name".to_string(), Value::String(q.name.clone()));
318            obj.insert("paused".to_string(), Value::Bool(q.paused));
319            obj.insert("total".to_string(), Value::Number(q.jobs.len() as f64));
320            obj.insert("pending".to_string(), Value::Number(*counts.get("pending").unwrap_or(&0) as f64));
321            obj.insert("active".to_string(), Value::Number(*counts.get("active").unwrap_or(&0) as f64));
322            obj.insert("completed".to_string(), Value::Number(*counts.get("completed").unwrap_or(&0) as f64));
323            obj.insert("failed".to_string(), Value::Number(*counts.get("failed").unwrap_or(&0) as f64));
324            obj.insert("delayed".to_string(), Value::Number(*counts.get("delayed").unwrap_or(&0) as f64));
325            obj.insert("deadLetter".to_string(), Value::Number(q.dead_letter.len() as f64));
326            obj.insert("totalProcessed".to_string(), Value::Number(q.processed as f64));
327            obj.insert("totalFailed".to_string(), Value::Number(q.failed as f64));
328            Ok(Value::Object(obj))
329        } else {
330            let mut obj = indexmap::IndexMap::new();
331            obj.insert("name".to_string(), Value::String(queue_name));
332            obj.insert("total".to_string(), Value::Number(0.0));
333            Ok(Value::Object(obj))
334        }
335    });
336
337    // queue.pause queueName → {name, paused: true}
338    rp.register_builtin("queue.pause", |args, _| {
339        let queue_name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
340        let mut queues = QUEUES.lock().unwrap();
341        if let Some(q) = queues.get_mut(&queue_name) {
342            q.paused = true;
343        }
344        let mut obj = indexmap::IndexMap::new();
345        obj.insert("name".to_string(), Value::String(queue_name));
346        obj.insert("paused".to_string(), Value::Bool(true));
347        Ok(Value::Object(obj))
348    });
349
350    // queue.resume queueName → {name, paused: false}
351    rp.register_builtin("queue.resume", |args, _| {
352        let queue_name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
353        let mut queues = QUEUES.lock().unwrap();
354        if let Some(q) = queues.get_mut(&queue_name) {
355            q.paused = false;
356        }
357        let mut obj = indexmap::IndexMap::new();
358        obj.insert("name".to_string(), Value::String(queue_name));
359        obj.insert("paused".to_string(), Value::Bool(false));
360        Ok(Value::Object(obj))
361    });
362
363    // queue.clear queueName → {cleared: number}
364    rp.register_builtin("queue.clear", |args, _| {
365        let queue_name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
366        let mut queues = QUEUES.lock().unwrap();
367        let count = if let Some(q) = queues.get_mut(&queue_name) {
368            let c = q.jobs.len();
369            q.jobs.clear();
370            c
371        } else {
372            0
373        };
374        let mut obj = indexmap::IndexMap::new();
375        obj.insert("cleared".to_string(), Value::Number(count as f64));
376        Ok(Value::Object(obj))
377    });
378
379    // queue.deadLetter queueName limit? → array of failed jobs
380    rp.register_builtin("queue.deadLetter", |args, _| {
381        let queue_name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
382        let limit = args.get(1).map(|v| v.to_number() as usize).unwrap_or(50);
383        let queues = QUEUES.lock().unwrap();
384        if let Some(q) = queues.get(&queue_name) {
385            let start = if q.dead_letter.len() > limit { q.dead_letter.len() - limit } else { 0 };
386            let items: Vec<Value> = q.dead_letter[start..].iter().map(|j| {
387                let mut obj = indexmap::IndexMap::new();
388                obj.insert("id".to_string(), Value::String(j.id.clone()));
389                obj.insert("data".to_string(), j.data.clone());
390                obj.insert("error".to_string(), Value::String(j.error.clone().unwrap_or_default()));
391                obj.insert("attempts".to_string(), Value::Number(j.attempts as f64));
392                Ok(Value::Object(obj))
393            }).collect::<Result<Vec<_>, String>>()?;
394            Ok(Value::Array(items))
395        } else {
396            Ok(Value::Array(Vec::new()))
397        }
398    });
399
400    // queue.getJob queueName jobId → job object or null
401    rp.register_builtin("queue.getJob", |args, _| {
402        let queue_name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
403        let job_id = args.get(1).map(|v| v.to_display_string()).unwrap_or_default();
404        let queues = QUEUES.lock().unwrap();
405        if let Some(q) = queues.get(&queue_name) {
406            if let Some(job) = q.jobs.get(&job_id) {
407                let mut obj = indexmap::IndexMap::new();
408                obj.insert("id".to_string(), Value::String(job.id.clone()));
409                obj.insert("data".to_string(), job.data.clone());
410                obj.insert("status".to_string(), Value::String(job.status.clone()));
411                obj.insert("priority".to_string(), Value::Number(job.priority as f64));
412                obj.insert("attempts".to_string(), Value::Number(job.attempts as f64));
413                obj.insert("maxAttempts".to_string(), Value::Number(job.max_attempts as f64));
414                if let Some(err) = &job.error {
415                    obj.insert("error".to_string(), Value::String(err.clone()));
416                }
417                return Ok(Value::Object(obj));
418            }
419        }
420        Ok(Value::Null)
421    });
422
423    // queue.destroy queueName → bool
424    rp.register_builtin("queue.destroy", |args, _| {
425        let queue_name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
426        let mut queues = QUEUES.lock().unwrap();
427        Ok(Value::Bool(queues.remove(&queue_name).is_some()))
428    });
429}