Skip to main content

robinpath_modules/modules/
queue_mod.rs

1use robinpath::{RobinPath, Value};
2use std::sync::{Arc, Mutex};
3use std::collections::HashMap;
4
5struct Queue {
6    name: String,
7    jobs: HashMap<String, Job>,
8    dead_letter: Vec<Job>,
9    max_dead_letter: usize,
10    paused: bool,
11    processed: u64,
12    failed: u64,
13    next_id: u64,
14}
15
16#[derive(Clone)]
17struct Job {
18    id: String,
19    data: Value,
20    priority: i64,
21    status: String,
22    attempts: u32,
23    max_attempts: u32,
24    created_at: u64,
25    scheduled_for: u64,
26    started_at: Option<u64>,
27    completed_at: Option<u64>,
28    failed_at: Option<u64>,
29    error: Option<String>,
30    result: Option<Value>,
31}
32
33fn now_ms() -> u64 {
34    std::time::SystemTime::now()
35        .duration_since(std::time::UNIX_EPOCH)
36        .unwrap_or_default()
37        .as_millis() as u64
38}
39
40pub fn register(rp: &mut RobinPath) {
41    let state = Arc::new(Mutex::new(HashMap::<String, Queue>::new()));
42
43    // queue.create name options? → {name, paused, maxDeadLetter}
44    let s = state.clone();
45    rp.register_builtin("queue.create", move |args, _| {
46        let name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
47        let opts = args.get(1).cloned().unwrap_or(Value::Null);
48        let max_dl = if let Value::Object(obj) = &opts {
49            obj.get("maxDeadLetter").map(|v| v.to_number() as usize).unwrap_or(1000)
50        } else {
51            1000
52        };
53        let mut queues = s.lock().unwrap();
54        let q = queues.entry(name.clone()).or_insert_with(|| Queue {
55            name: name.clone(),
56            jobs: HashMap::new(),
57            dead_letter: Vec::new(),
58            max_dead_letter: max_dl,
59            paused: false,
60            processed: 0,
61            failed: 0,
62            next_id: 1,
63        });
64        q.max_dead_letter = max_dl;
65        let mut obj = indexmap::IndexMap::new();
66        obj.insert("name".to_string(), Value::String(q.name.clone()));
67        obj.insert("paused".to_string(), Value::Bool(q.paused));
68        obj.insert("maxDeadLetter".to_string(), Value::Number(q.max_dead_letter as f64));
69        Ok(Value::Object(obj))
70    });
71
72    // queue.push queueName data options? → {id, status, priority}
73    let s = state.clone();
74    rp.register_builtin("queue.push", move |args, _| {
75        let queue_name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
76        let data = args.get(1).cloned().unwrap_or(Value::Null);
77        let opts = args.get(2).cloned().unwrap_or(Value::Null);
78        let (priority, delay_ms, max_attempts) = if let Value::Object(obj) = &opts {
79            (
80                obj.get("priority").map(|v| v.to_number() as i64).unwrap_or(0),
81                obj.get("delay").map(|v| v.to_number() as u64).unwrap_or(0),
82                obj.get("maxAttempts").map(|v| v.to_number() as u32).unwrap_or(3),
83            )
84        } else {
85            (0, 0, 3)
86        };
87        let now = now_ms();
88        let mut queues = s.lock().unwrap();
89        let q = queues.entry(queue_name).or_insert_with(|| Queue {
90            name: "default".to_string(),
91            jobs: HashMap::new(),
92            dead_letter: Vec::new(),
93            max_dead_letter: 1000,
94            paused: false,
95            processed: 0,
96            failed: 0,
97            next_id: 1,
98        });
99        let id = format!("job_{}", q.next_id);
100        q.next_id += 1;
101        let status = if delay_ms > 0 { "delayed" } else { "pending" };
102        let job = Job {
103            id: id.clone(),
104            data,
105            priority,
106            status: status.to_string(),
107            attempts: 0,
108            max_attempts,
109            created_at: now,
110            scheduled_for: now + delay_ms,
111            started_at: None,
112            completed_at: None,
113            failed_at: None,
114            error: None,
115            result: None,
116        };
117        q.jobs.insert(id.clone(), job);
118        let mut obj = indexmap::IndexMap::new();
119        obj.insert("id".to_string(), Value::String(id));
120        obj.insert("status".to_string(), Value::String(status.to_string()));
121        obj.insert("priority".to_string(), Value::Number(priority as f64));
122        Ok(Value::Object(obj))
123    });
124
125    // queue.pop queueName → job or null
126    let s = state.clone();
127    rp.register_builtin("queue.pop", move |args, _| {
128        let queue_name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
129        let mut queues = s.lock().unwrap();
130        let q = match queues.get_mut(&queue_name) {
131            Some(q) => q,
132            None => return Ok(Value::Null),
133        };
134        if q.paused {
135            return Ok(Value::Null);
136        }
137        let now = now_ms();
138        // Promote delayed jobs
139        for job in q.jobs.values_mut() {
140            if job.status == "delayed" && job.scheduled_for <= now {
141                job.status = "pending".to_string();
142            }
143        }
144        // Find best pending job (highest priority, oldest first)
145        let best_id = q.jobs.values()
146            .filter(|j| j.status == "pending")
147            .max_by(|a, b| {
148                a.priority.cmp(&b.priority)
149                    .then_with(|| b.created_at.cmp(&a.created_at))
150            })
151            .map(|j| j.id.clone());
152
153        if let Some(id) = best_id {
154            let job = q.jobs.get_mut(&id).unwrap();
155            job.status = "active".to_string();
156            job.attempts += 1;
157            job.started_at = Some(now);
158            let mut obj = indexmap::IndexMap::new();
159            obj.insert("id".to_string(), Value::String(job.id.clone()));
160            obj.insert("data".to_string(), job.data.clone());
161            obj.insert("priority".to_string(), Value::Number(job.priority as f64));
162            obj.insert("attempts".to_string(), Value::Number(job.attempts as f64));
163            obj.insert("maxAttempts".to_string(), Value::Number(job.max_attempts as f64));
164            Ok(Value::Object(obj))
165        } else {
166            Ok(Value::Null)
167        }
168    });
169
170    // queue.complete queueName jobId result? → {id, status, duration}
171    let s = state.clone();
172    rp.register_builtin("queue.complete", move |args, _| {
173        let queue_name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
174        let job_id = args.get(1).map(|v| v.to_display_string()).unwrap_or_default();
175        let result = args.get(2).cloned();
176        let mut queues = s.lock().unwrap();
177        let q = queues.get_mut(&queue_name)
178            .ok_or_else(|| format!("Queue \"{}\" not found", queue_name))?;
179        let job = q.jobs.get_mut(&job_id)
180            .ok_or_else(|| format!("Job \"{}\" not found in queue \"{}\"", job_id, queue_name))?;
181        let now = now_ms();
182        job.status = "completed".to_string();
183        job.completed_at = Some(now);
184        job.result = result;
185        q.processed += 1;
186        let duration = now - job.started_at.unwrap_or(job.created_at);
187        let mut obj = indexmap::IndexMap::new();
188        obj.insert("id".to_string(), Value::String(job_id));
189        obj.insert("status".to_string(), Value::String("completed".to_string()));
190        obj.insert("duration".to_string(), Value::Number(duration as f64));
191        Ok(Value::Object(obj))
192    });
193
194    // queue.fail queueName jobId error → {id, status, attempts}
195    let s = state.clone();
196    rp.register_builtin("queue.fail", move |args, _| {
197        let queue_name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
198        let job_id = args.get(1).map(|v| v.to_display_string()).unwrap_or_default();
199        let error = args.get(2).map(|v| v.to_display_string()).unwrap_or_else(|| "Unknown error".to_string());
200        let mut queues = s.lock().unwrap();
201        let q = queues.get_mut(&queue_name)
202            .ok_or_else(|| format!("Queue \"{}\" not found", queue_name))?;
203        let job = q.jobs.get_mut(&job_id)
204            .ok_or_else(|| format!("Job \"{}\" not found in queue \"{}\"", job_id, queue_name))?;
205        job.error = Some(error.clone());
206        job.failed_at = Some(now_ms());
207        if job.attempts < job.max_attempts {
208            job.status = "pending".to_string();
209            let mut obj = indexmap::IndexMap::new();
210            obj.insert("id".to_string(), Value::String(job_id));
211            obj.insert("status".to_string(), Value::String("retry".to_string()));
212            obj.insert("attempts".to_string(), Value::Number(job.attempts as f64));
213            obj.insert("maxAttempts".to_string(), Value::Number(job.max_attempts as f64));
214            Ok(Value::Object(obj))
215        } else {
216            let mut dead_job = job.clone();
217            dead_job.status = "failed".to_string();
218            q.failed += 1;
219            q.dead_letter.push(dead_job);
220            q.jobs.remove(&job_id);
221            if q.dead_letter.len() > q.max_dead_letter {
222                q.dead_letter.remove(0);
223            }
224            let mut obj = indexmap::IndexMap::new();
225            obj.insert("id".to_string(), Value::String(job_id));
226            obj.insert("status".to_string(), Value::String("dead-letter".to_string()));
227            obj.insert("attempts".to_string(), Value::Number(q.dead_letter.last().map(|j| j.attempts).unwrap_or(0) as f64));
228            obj.insert("error".to_string(), Value::String(error));
229            Ok(Value::Object(obj))
230        }
231    });
232
233    // queue.retry queueName jobId → {id, status, source}
234    let s = state.clone();
235    rp.register_builtin("queue.retry", move |args, _| {
236        let queue_name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
237        let job_id = args.get(1).map(|v| v.to_display_string()).unwrap_or_default();
238        let mut queues = s.lock().unwrap();
239        let q = queues.get_mut(&queue_name)
240            .ok_or_else(|| format!("Queue \"{}\" not found", queue_name))?;
241        // Check dead letter first
242        if let Some(idx) = q.dead_letter.iter().position(|j| j.id == job_id) {
243            let mut job = q.dead_letter.remove(idx);
244            job.status = "pending".to_string();
245            job.attempts = 0;
246            job.error = None;
247            job.failed_at = None;
248            q.jobs.insert(job.id.clone(), job);
249            let mut obj = indexmap::IndexMap::new();
250            obj.insert("id".to_string(), Value::String(job_id));
251            obj.insert("status".to_string(), Value::String("pending".to_string()));
252            obj.insert("source".to_string(), Value::String("dead-letter".to_string()));
253            return Ok(Value::Object(obj));
254        }
255        let job = q.jobs.get_mut(&job_id)
256            .ok_or_else(|| format!("Job \"{}\" not found", job_id))?;
257        job.status = "pending".to_string();
258        job.error = None;
259        let mut obj = indexmap::IndexMap::new();
260        obj.insert("id".to_string(), Value::String(job_id));
261        obj.insert("status".to_string(), Value::String("pending".to_string()));
262        obj.insert("source".to_string(), Value::String("queue".to_string()));
263        Ok(Value::Object(obj))
264    });
265
266    // queue.remove queueName jobId → bool
267    let s = state.clone();
268    rp.register_builtin("queue.remove", move |args, _| {
269        let queue_name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
270        let job_id = args.get(1).map(|v| v.to_display_string()).unwrap_or_default();
271        let mut queues = s.lock().unwrap();
272        if let Some(q) = queues.get_mut(&queue_name) {
273            Ok(Value::Bool(q.jobs.remove(&job_id).is_some()))
274        } else {
275            Ok(Value::Bool(false))
276        }
277    });
278
279    // queue.size queueName status? → number
280    let s = state.clone();
281    rp.register_builtin("queue.size", move |args, _| {
282        let queue_name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
283        let status_filter = args.get(1).map(|v| v.to_display_string());
284        let mut queues = s.lock().unwrap();
285        if let Some(q) = queues.get_mut(&queue_name) {
286            let now = now_ms();
287            for job in q.jobs.values_mut() {
288                if job.status == "delayed" && job.scheduled_for <= now {
289                    job.status = "pending".to_string();
290                }
291            }
292            let count = if let Some(status) = status_filter {
293                q.jobs.values().filter(|j| j.status == status).count()
294            } else {
295                q.jobs.len()
296            };
297            Ok(Value::Number(count as f64))
298        } else {
299            Ok(Value::Number(0.0))
300        }
301    });
302
303    // queue.status queueName → {name, paused, total, pending, active, ...}
304    let s = state.clone();
305    rp.register_builtin("queue.status", move |args, _| {
306        let queue_name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
307        let mut queues = s.lock().unwrap();
308        if let Some(q) = queues.get_mut(&queue_name) {
309            let now = now_ms();
310            let mut counts: HashMap<&str, u64> = HashMap::new();
311            for job in q.jobs.values_mut() {
312                if job.status == "delayed" && job.scheduled_for <= now {
313                    job.status = "pending".to_string();
314                }
315                *counts.entry(match job.status.as_str() {
316                    "pending" => "pending",
317                    "active" => "active",
318                    "completed" => "completed",
319                    "failed" => "failed",
320                    "delayed" => "delayed",
321                    _ => "pending",
322                }).or_insert(0) += 1;
323            }
324            let mut obj = indexmap::IndexMap::new();
325            obj.insert("name".to_string(), Value::String(q.name.clone()));
326            obj.insert("paused".to_string(), Value::Bool(q.paused));
327            obj.insert("total".to_string(), Value::Number(q.jobs.len() as f64));
328            obj.insert("pending".to_string(), Value::Number(*counts.get("pending").unwrap_or(&0) as f64));
329            obj.insert("active".to_string(), Value::Number(*counts.get("active").unwrap_or(&0) as f64));
330            obj.insert("completed".to_string(), Value::Number(*counts.get("completed").unwrap_or(&0) as f64));
331            obj.insert("failed".to_string(), Value::Number(*counts.get("failed").unwrap_or(&0) as f64));
332            obj.insert("delayed".to_string(), Value::Number(*counts.get("delayed").unwrap_or(&0) as f64));
333            obj.insert("deadLetter".to_string(), Value::Number(q.dead_letter.len() as f64));
334            obj.insert("totalProcessed".to_string(), Value::Number(q.processed as f64));
335            obj.insert("totalFailed".to_string(), Value::Number(q.failed as f64));
336            Ok(Value::Object(obj))
337        } else {
338            let mut obj = indexmap::IndexMap::new();
339            obj.insert("name".to_string(), Value::String(queue_name));
340            obj.insert("total".to_string(), Value::Number(0.0));
341            Ok(Value::Object(obj))
342        }
343    });
344
345    // queue.pause queueName → {name, paused: true}
346    let s = state.clone();
347    rp.register_builtin("queue.pause", move |args, _| {
348        let queue_name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
349        let mut queues = s.lock().unwrap();
350        if let Some(q) = queues.get_mut(&queue_name) {
351            q.paused = true;
352        }
353        let mut obj = indexmap::IndexMap::new();
354        obj.insert("name".to_string(), Value::String(queue_name));
355        obj.insert("paused".to_string(), Value::Bool(true));
356        Ok(Value::Object(obj))
357    });
358
359    // queue.resume queueName → {name, paused: false}
360    let s = state.clone();
361    rp.register_builtin("queue.resume", move |args, _| {
362        let queue_name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
363        let mut queues = s.lock().unwrap();
364        if let Some(q) = queues.get_mut(&queue_name) {
365            q.paused = false;
366        }
367        let mut obj = indexmap::IndexMap::new();
368        obj.insert("name".to_string(), Value::String(queue_name));
369        obj.insert("paused".to_string(), Value::Bool(false));
370        Ok(Value::Object(obj))
371    });
372
373    // queue.clear queueName → {cleared: number}
374    let s = state.clone();
375    rp.register_builtin("queue.clear", move |args, _| {
376        let queue_name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
377        let mut queues = s.lock().unwrap();
378        let count = if let Some(q) = queues.get_mut(&queue_name) {
379            let c = q.jobs.len();
380            q.jobs.clear();
381            c
382        } else {
383            0
384        };
385        let mut obj = indexmap::IndexMap::new();
386        obj.insert("cleared".to_string(), Value::Number(count as f64));
387        Ok(Value::Object(obj))
388    });
389
390    // queue.deadLetter queueName limit? → array of failed jobs
391    let s = state.clone();
392    rp.register_builtin("queue.deadLetter", move |args, _| {
393        let queue_name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
394        let limit = args.get(1).map(|v| v.to_number() as usize).unwrap_or(50);
395        let queues = s.lock().unwrap();
396        if let Some(q) = queues.get(&queue_name) {
397            let start = if q.dead_letter.len() > limit { q.dead_letter.len() - limit } else { 0 };
398            let items: Vec<Value> = q.dead_letter[start..].iter().map(|j| {
399                let mut obj = indexmap::IndexMap::new();
400                obj.insert("id".to_string(), Value::String(j.id.clone()));
401                obj.insert("data".to_string(), j.data.clone());
402                obj.insert("error".to_string(), Value::String(j.error.clone().unwrap_or_default()));
403                obj.insert("attempts".to_string(), Value::Number(j.attempts as f64));
404                Ok(Value::Object(obj))
405            }).collect::<Result<Vec<_>, String>>()?;
406            Ok(Value::Array(items))
407        } else {
408            Ok(Value::Array(Vec::new()))
409        }
410    });
411
412    // queue.getJob queueName jobId → job object or null
413    let s = state.clone();
414    rp.register_builtin("queue.getJob", move |args, _| {
415        let queue_name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
416        let job_id = args.get(1).map(|v| v.to_display_string()).unwrap_or_default();
417        let queues = s.lock().unwrap();
418        if let Some(q) = queues.get(&queue_name) {
419            if let Some(job) = q.jobs.get(&job_id) {
420                let mut obj = indexmap::IndexMap::new();
421                obj.insert("id".to_string(), Value::String(job.id.clone()));
422                obj.insert("data".to_string(), job.data.clone());
423                obj.insert("status".to_string(), Value::String(job.status.clone()));
424                obj.insert("priority".to_string(), Value::Number(job.priority as f64));
425                obj.insert("attempts".to_string(), Value::Number(job.attempts as f64));
426                obj.insert("maxAttempts".to_string(), Value::Number(job.max_attempts as f64));
427                if let Some(err) = &job.error {
428                    obj.insert("error".to_string(), Value::String(err.clone()));
429                }
430                return Ok(Value::Object(obj));
431            }
432        }
433        Ok(Value::Null)
434    });
435
436    // queue.destroy queueName → bool
437    let s = state.clone();
438    rp.register_builtin("queue.destroy", move |args, _| {
439        let queue_name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
440        let mut queues = s.lock().unwrap();
441        Ok(Value::Bool(queues.remove(&queue_name).is_some()))
442    });
443}