1use robinpath::{RobinPath, Value};
2use std::sync::{LazyLock, Mutex};
3use std::collections::HashMap;
4
5struct Queue {
6 name: String,
7 jobs: HashMap<String, Job>,
8 dead_letter: Vec<Job>,
9 max_dead_letter: usize,
10 paused: bool,
11 processed: u64,
12 failed: u64,
13 next_id: u64,
14}
15
16#[derive(Clone)]
17struct Job {
18 id: String,
19 data: Value,
20 priority: i64,
21 status: String,
22 attempts: u32,
23 max_attempts: u32,
24 created_at: u64,
25 scheduled_for: u64,
26 started_at: Option<u64>,
27 completed_at: Option<u64>,
28 failed_at: Option<u64>,
29 error: Option<String>,
30 result: Option<Value>,
31}
32
33static QUEUES: LazyLock<Mutex<HashMap<String, Queue>>> =
34 LazyLock::new(|| Mutex::new(HashMap::new()));
35
/// Returns the current wall-clock time as whole milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reads earlier than the epoch.
fn now_ms() -> u64 {
    use std::time::{Duration, SystemTime, UNIX_EPOCH};

    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        // Clock is set before 1970: fall back to a zero duration.
        Err(_) => Duration::default(),
    };
    since_epoch.as_millis() as u64
}
42
43pub fn register(rp: &mut RobinPath) {
44 rp.register_builtin("queue.create", |args, _| {
46 let name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
47 let opts = args.get(1).cloned().unwrap_or(Value::Null);
48 let max_dl = if let Value::Object(obj) = &opts {
49 obj.get("maxDeadLetter").map(|v| v.to_number() as usize).unwrap_or(1000)
50 } else {
51 1000
52 };
53 let mut queues = QUEUES.lock().unwrap();
54 let q = queues.entry(name.clone()).or_insert_with(|| Queue {
55 name: name.clone(),
56 jobs: HashMap::new(),
57 dead_letter: Vec::new(),
58 max_dead_letter: max_dl,
59 paused: false,
60 processed: 0,
61 failed: 0,
62 next_id: 1,
63 });
64 q.max_dead_letter = max_dl;
65 let mut obj = indexmap::IndexMap::new();
66 obj.insert("name".to_string(), Value::String(q.name.clone()));
67 obj.insert("paused".to_string(), Value::Bool(q.paused));
68 obj.insert("maxDeadLetter".to_string(), Value::Number(q.max_dead_letter as f64));
69 Ok(Value::Object(obj))
70 });
71
72 rp.register_builtin("queue.push", |args, _| {
74 let queue_name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
75 let data = args.get(1).cloned().unwrap_or(Value::Null);
76 let opts = args.get(2).cloned().unwrap_or(Value::Null);
77 let (priority, delay_ms, max_attempts) = if let Value::Object(obj) = &opts {
78 (
79 obj.get("priority").map(|v| v.to_number() as i64).unwrap_or(0),
80 obj.get("delay").map(|v| v.to_number() as u64).unwrap_or(0),
81 obj.get("maxAttempts").map(|v| v.to_number() as u32).unwrap_or(3),
82 )
83 } else {
84 (0, 0, 3)
85 };
86 let now = now_ms();
87 let mut queues = QUEUES.lock().unwrap();
88 let q = queues.entry(queue_name).or_insert_with(|| Queue {
89 name: "default".to_string(),
90 jobs: HashMap::new(),
91 dead_letter: Vec::new(),
92 max_dead_letter: 1000,
93 paused: false,
94 processed: 0,
95 failed: 0,
96 next_id: 1,
97 });
98 let id = format!("job_{}", q.next_id);
99 q.next_id += 1;
100 let status = if delay_ms > 0 { "delayed" } else { "pending" };
101 let job = Job {
102 id: id.clone(),
103 data,
104 priority,
105 status: status.to_string(),
106 attempts: 0,
107 max_attempts,
108 created_at: now,
109 scheduled_for: now + delay_ms,
110 started_at: None,
111 completed_at: None,
112 failed_at: None,
113 error: None,
114 result: None,
115 };
116 q.jobs.insert(id.clone(), job);
117 let mut obj = indexmap::IndexMap::new();
118 obj.insert("id".to_string(), Value::String(id));
119 obj.insert("status".to_string(), Value::String(status.to_string()));
120 obj.insert("priority".to_string(), Value::Number(priority as f64));
121 Ok(Value::Object(obj))
122 });
123
124 rp.register_builtin("queue.pop", |args, _| {
126 let queue_name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
127 let mut queues = QUEUES.lock().unwrap();
128 let q = match queues.get_mut(&queue_name) {
129 Some(q) => q,
130 None => return Ok(Value::Null),
131 };
132 if q.paused {
133 return Ok(Value::Null);
134 }
135 let now = now_ms();
136 for job in q.jobs.values_mut() {
138 if job.status == "delayed" && job.scheduled_for <= now {
139 job.status = "pending".to_string();
140 }
141 }
142 let best_id = q.jobs.values()
144 .filter(|j| j.status == "pending")
145 .max_by(|a, b| {
146 a.priority.cmp(&b.priority)
147 .then_with(|| b.created_at.cmp(&a.created_at))
148 })
149 .map(|j| j.id.clone());
150
151 if let Some(id) = best_id {
152 let job = q.jobs.get_mut(&id).unwrap();
153 job.status = "active".to_string();
154 job.attempts += 1;
155 job.started_at = Some(now);
156 let mut obj = indexmap::IndexMap::new();
157 obj.insert("id".to_string(), Value::String(job.id.clone()));
158 obj.insert("data".to_string(), job.data.clone());
159 obj.insert("priority".to_string(), Value::Number(job.priority as f64));
160 obj.insert("attempts".to_string(), Value::Number(job.attempts as f64));
161 obj.insert("maxAttempts".to_string(), Value::Number(job.max_attempts as f64));
162 Ok(Value::Object(obj))
163 } else {
164 Ok(Value::Null)
165 }
166 });
167
168 rp.register_builtin("queue.complete", |args, _| {
170 let queue_name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
171 let job_id = args.get(1).map(|v| v.to_display_string()).unwrap_or_default();
172 let result = args.get(2).cloned();
173 let mut queues = QUEUES.lock().unwrap();
174 let q = queues.get_mut(&queue_name)
175 .ok_or_else(|| format!("Queue \"{}\" not found", queue_name))?;
176 let job = q.jobs.get_mut(&job_id)
177 .ok_or_else(|| format!("Job \"{}\" not found in queue \"{}\"", job_id, queue_name))?;
178 let now = now_ms();
179 job.status = "completed".to_string();
180 job.completed_at = Some(now);
181 job.result = result;
182 q.processed += 1;
183 let duration = now - job.started_at.unwrap_or(job.created_at);
184 let mut obj = indexmap::IndexMap::new();
185 obj.insert("id".to_string(), Value::String(job_id));
186 obj.insert("status".to_string(), Value::String("completed".to_string()));
187 obj.insert("duration".to_string(), Value::Number(duration as f64));
188 Ok(Value::Object(obj))
189 });
190
191 rp.register_builtin("queue.fail", |args, _| {
193 let queue_name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
194 let job_id = args.get(1).map(|v| v.to_display_string()).unwrap_or_default();
195 let error = args.get(2).map(|v| v.to_display_string()).unwrap_or_else(|| "Unknown error".to_string());
196 let mut queues = QUEUES.lock().unwrap();
197 let q = queues.get_mut(&queue_name)
198 .ok_or_else(|| format!("Queue \"{}\" not found", queue_name))?;
199 let job = q.jobs.get_mut(&job_id)
200 .ok_or_else(|| format!("Job \"{}\" not found in queue \"{}\"", job_id, queue_name))?;
201 job.error = Some(error.clone());
202 job.failed_at = Some(now_ms());
203 if job.attempts < job.max_attempts {
204 job.status = "pending".to_string();
205 let mut obj = indexmap::IndexMap::new();
206 obj.insert("id".to_string(), Value::String(job_id));
207 obj.insert("status".to_string(), Value::String("retry".to_string()));
208 obj.insert("attempts".to_string(), Value::Number(job.attempts as f64));
209 obj.insert("maxAttempts".to_string(), Value::Number(job.max_attempts as f64));
210 Ok(Value::Object(obj))
211 } else {
212 let mut dead_job = job.clone();
213 dead_job.status = "failed".to_string();
214 q.failed += 1;
215 q.dead_letter.push(dead_job);
216 q.jobs.remove(&job_id);
217 if q.dead_letter.len() > q.max_dead_letter {
218 q.dead_letter.remove(0);
219 }
220 let mut obj = indexmap::IndexMap::new();
221 obj.insert("id".to_string(), Value::String(job_id));
222 obj.insert("status".to_string(), Value::String("dead-letter".to_string()));
223 obj.insert("attempts".to_string(), Value::Number(q.dead_letter.last().map(|j| j.attempts).unwrap_or(0) as f64));
224 obj.insert("error".to_string(), Value::String(error));
225 Ok(Value::Object(obj))
226 }
227 });
228
229 rp.register_builtin("queue.retry", |args, _| {
231 let queue_name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
232 let job_id = args.get(1).map(|v| v.to_display_string()).unwrap_or_default();
233 let mut queues = QUEUES.lock().unwrap();
234 let q = queues.get_mut(&queue_name)
235 .ok_or_else(|| format!("Queue \"{}\" not found", queue_name))?;
236 if let Some(idx) = q.dead_letter.iter().position(|j| j.id == job_id) {
238 let mut job = q.dead_letter.remove(idx);
239 job.status = "pending".to_string();
240 job.attempts = 0;
241 job.error = None;
242 job.failed_at = None;
243 q.jobs.insert(job.id.clone(), job);
244 let mut obj = indexmap::IndexMap::new();
245 obj.insert("id".to_string(), Value::String(job_id));
246 obj.insert("status".to_string(), Value::String("pending".to_string()));
247 obj.insert("source".to_string(), Value::String("dead-letter".to_string()));
248 return Ok(Value::Object(obj));
249 }
250 let job = q.jobs.get_mut(&job_id)
251 .ok_or_else(|| format!("Job \"{}\" not found", job_id))?;
252 job.status = "pending".to_string();
253 job.error = None;
254 let mut obj = indexmap::IndexMap::new();
255 obj.insert("id".to_string(), Value::String(job_id));
256 obj.insert("status".to_string(), Value::String("pending".to_string()));
257 obj.insert("source".to_string(), Value::String("queue".to_string()));
258 Ok(Value::Object(obj))
259 });
260
261 rp.register_builtin("queue.remove", |args, _| {
263 let queue_name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
264 let job_id = args.get(1).map(|v| v.to_display_string()).unwrap_or_default();
265 let mut queues = QUEUES.lock().unwrap();
266 if let Some(q) = queues.get_mut(&queue_name) {
267 Ok(Value::Bool(q.jobs.remove(&job_id).is_some()))
268 } else {
269 Ok(Value::Bool(false))
270 }
271 });
272
273 rp.register_builtin("queue.size", |args, _| {
275 let queue_name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
276 let status_filter = args.get(1).map(|v| v.to_display_string());
277 let mut queues = QUEUES.lock().unwrap();
278 if let Some(q) = queues.get_mut(&queue_name) {
279 let now = now_ms();
280 for job in q.jobs.values_mut() {
281 if job.status == "delayed" && job.scheduled_for <= now {
282 job.status = "pending".to_string();
283 }
284 }
285 let count = if let Some(status) = status_filter {
286 q.jobs.values().filter(|j| j.status == status).count()
287 } else {
288 q.jobs.len()
289 };
290 Ok(Value::Number(count as f64))
291 } else {
292 Ok(Value::Number(0.0))
293 }
294 });
295
296 rp.register_builtin("queue.status", |args, _| {
298 let queue_name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
299 let mut queues = QUEUES.lock().unwrap();
300 if let Some(q) = queues.get_mut(&queue_name) {
301 let now = now_ms();
302 let mut counts: HashMap<&str, u64> = HashMap::new();
303 for job in q.jobs.values_mut() {
304 if job.status == "delayed" && job.scheduled_for <= now {
305 job.status = "pending".to_string();
306 }
307 *counts.entry(match job.status.as_str() {
308 "pending" => "pending",
309 "active" => "active",
310 "completed" => "completed",
311 "failed" => "failed",
312 "delayed" => "delayed",
313 _ => "pending",
314 }).or_insert(0) += 1;
315 }
316 let mut obj = indexmap::IndexMap::new();
317 obj.insert("name".to_string(), Value::String(q.name.clone()));
318 obj.insert("paused".to_string(), Value::Bool(q.paused));
319 obj.insert("total".to_string(), Value::Number(q.jobs.len() as f64));
320 obj.insert("pending".to_string(), Value::Number(*counts.get("pending").unwrap_or(&0) as f64));
321 obj.insert("active".to_string(), Value::Number(*counts.get("active").unwrap_or(&0) as f64));
322 obj.insert("completed".to_string(), Value::Number(*counts.get("completed").unwrap_or(&0) as f64));
323 obj.insert("failed".to_string(), Value::Number(*counts.get("failed").unwrap_or(&0) as f64));
324 obj.insert("delayed".to_string(), Value::Number(*counts.get("delayed").unwrap_or(&0) as f64));
325 obj.insert("deadLetter".to_string(), Value::Number(q.dead_letter.len() as f64));
326 obj.insert("totalProcessed".to_string(), Value::Number(q.processed as f64));
327 obj.insert("totalFailed".to_string(), Value::Number(q.failed as f64));
328 Ok(Value::Object(obj))
329 } else {
330 let mut obj = indexmap::IndexMap::new();
331 obj.insert("name".to_string(), Value::String(queue_name));
332 obj.insert("total".to_string(), Value::Number(0.0));
333 Ok(Value::Object(obj))
334 }
335 });
336
337 rp.register_builtin("queue.pause", |args, _| {
339 let queue_name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
340 let mut queues = QUEUES.lock().unwrap();
341 if let Some(q) = queues.get_mut(&queue_name) {
342 q.paused = true;
343 }
344 let mut obj = indexmap::IndexMap::new();
345 obj.insert("name".to_string(), Value::String(queue_name));
346 obj.insert("paused".to_string(), Value::Bool(true));
347 Ok(Value::Object(obj))
348 });
349
350 rp.register_builtin("queue.resume", |args, _| {
352 let queue_name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
353 let mut queues = QUEUES.lock().unwrap();
354 if let Some(q) = queues.get_mut(&queue_name) {
355 q.paused = false;
356 }
357 let mut obj = indexmap::IndexMap::new();
358 obj.insert("name".to_string(), Value::String(queue_name));
359 obj.insert("paused".to_string(), Value::Bool(false));
360 Ok(Value::Object(obj))
361 });
362
363 rp.register_builtin("queue.clear", |args, _| {
365 let queue_name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
366 let mut queues = QUEUES.lock().unwrap();
367 let count = if let Some(q) = queues.get_mut(&queue_name) {
368 let c = q.jobs.len();
369 q.jobs.clear();
370 c
371 } else {
372 0
373 };
374 let mut obj = indexmap::IndexMap::new();
375 obj.insert("cleared".to_string(), Value::Number(count as f64));
376 Ok(Value::Object(obj))
377 });
378
379 rp.register_builtin("queue.deadLetter", |args, _| {
381 let queue_name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
382 let limit = args.get(1).map(|v| v.to_number() as usize).unwrap_or(50);
383 let queues = QUEUES.lock().unwrap();
384 if let Some(q) = queues.get(&queue_name) {
385 let start = if q.dead_letter.len() > limit { q.dead_letter.len() - limit } else { 0 };
386 let items: Vec<Value> = q.dead_letter[start..].iter().map(|j| {
387 let mut obj = indexmap::IndexMap::new();
388 obj.insert("id".to_string(), Value::String(j.id.clone()));
389 obj.insert("data".to_string(), j.data.clone());
390 obj.insert("error".to_string(), Value::String(j.error.clone().unwrap_or_default()));
391 obj.insert("attempts".to_string(), Value::Number(j.attempts as f64));
392 Ok(Value::Object(obj))
393 }).collect::<Result<Vec<_>, String>>()?;
394 Ok(Value::Array(items))
395 } else {
396 Ok(Value::Array(Vec::new()))
397 }
398 });
399
400 rp.register_builtin("queue.getJob", |args, _| {
402 let queue_name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
403 let job_id = args.get(1).map(|v| v.to_display_string()).unwrap_or_default();
404 let queues = QUEUES.lock().unwrap();
405 if let Some(q) = queues.get(&queue_name) {
406 if let Some(job) = q.jobs.get(&job_id) {
407 let mut obj = indexmap::IndexMap::new();
408 obj.insert("id".to_string(), Value::String(job.id.clone()));
409 obj.insert("data".to_string(), job.data.clone());
410 obj.insert("status".to_string(), Value::String(job.status.clone()));
411 obj.insert("priority".to_string(), Value::Number(job.priority as f64));
412 obj.insert("attempts".to_string(), Value::Number(job.attempts as f64));
413 obj.insert("maxAttempts".to_string(), Value::Number(job.max_attempts as f64));
414 if let Some(err) = &job.error {
415 obj.insert("error".to_string(), Value::String(err.clone()));
416 }
417 return Ok(Value::Object(obj));
418 }
419 }
420 Ok(Value::Null)
421 });
422
423 rp.register_builtin("queue.destroy", |args, _| {
425 let queue_name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
426 let mut queues = QUEUES.lock().unwrap();
427 Ok(Value::Bool(queues.remove(&queue_name).is_some()))
428 });
429}