1use robinpath::{RobinPath, Value};
2use std::sync::{Arc, Mutex};
3use std::collections::HashMap;
4
5struct Queue {
6 name: String,
7 jobs: HashMap<String, Job>,
8 dead_letter: Vec<Job>,
9 max_dead_letter: usize,
10 paused: bool,
11 processed: u64,
12 failed: u64,
13 next_id: u64,
14}
15
16#[derive(Clone)]
17struct Job {
18 id: String,
19 data: Value,
20 priority: i64,
21 status: String,
22 attempts: u32,
23 max_attempts: u32,
24 created_at: u64,
25 scheduled_for: u64,
26 started_at: Option<u64>,
27 completed_at: Option<u64>,
28 failed_at: Option<u64>,
29 error: Option<String>,
30 result: Option<Value>,
31}
32
33fn now_ms() -> u64 {
34 std::time::SystemTime::now()
35 .duration_since(std::time::UNIX_EPOCH)
36 .unwrap_or_default()
37 .as_millis() as u64
38}
39
40pub fn register(rp: &mut RobinPath) {
41 let state = Arc::new(Mutex::new(HashMap::<String, Queue>::new()));
42
43 let s = state.clone();
45 rp.register_builtin("queue.create", move |args, _| {
46 let name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
47 let opts = args.get(1).cloned().unwrap_or(Value::Null);
48 let max_dl = if let Value::Object(obj) = &opts {
49 obj.get("maxDeadLetter").map(|v| v.to_number() as usize).unwrap_or(1000)
50 } else {
51 1000
52 };
53 let mut queues = s.lock().unwrap();
54 let q = queues.entry(name.clone()).or_insert_with(|| Queue {
55 name: name.clone(),
56 jobs: HashMap::new(),
57 dead_letter: Vec::new(),
58 max_dead_letter: max_dl,
59 paused: false,
60 processed: 0,
61 failed: 0,
62 next_id: 1,
63 });
64 q.max_dead_letter = max_dl;
65 let mut obj = indexmap::IndexMap::new();
66 obj.insert("name".to_string(), Value::String(q.name.clone()));
67 obj.insert("paused".to_string(), Value::Bool(q.paused));
68 obj.insert("maxDeadLetter".to_string(), Value::Number(q.max_dead_letter as f64));
69 Ok(Value::Object(obj))
70 });
71
72 let s = state.clone();
74 rp.register_builtin("queue.push", move |args, _| {
75 let queue_name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
76 let data = args.get(1).cloned().unwrap_or(Value::Null);
77 let opts = args.get(2).cloned().unwrap_or(Value::Null);
78 let (priority, delay_ms, max_attempts) = if let Value::Object(obj) = &opts {
79 (
80 obj.get("priority").map(|v| v.to_number() as i64).unwrap_or(0),
81 obj.get("delay").map(|v| v.to_number() as u64).unwrap_or(0),
82 obj.get("maxAttempts").map(|v| v.to_number() as u32).unwrap_or(3),
83 )
84 } else {
85 (0, 0, 3)
86 };
87 let now = now_ms();
88 let mut queues = s.lock().unwrap();
89 let q = queues.entry(queue_name).or_insert_with(|| Queue {
90 name: "default".to_string(),
91 jobs: HashMap::new(),
92 dead_letter: Vec::new(),
93 max_dead_letter: 1000,
94 paused: false,
95 processed: 0,
96 failed: 0,
97 next_id: 1,
98 });
99 let id = format!("job_{}", q.next_id);
100 q.next_id += 1;
101 let status = if delay_ms > 0 { "delayed" } else { "pending" };
102 let job = Job {
103 id: id.clone(),
104 data,
105 priority,
106 status: status.to_string(),
107 attempts: 0,
108 max_attempts,
109 created_at: now,
110 scheduled_for: now + delay_ms,
111 started_at: None,
112 completed_at: None,
113 failed_at: None,
114 error: None,
115 result: None,
116 };
117 q.jobs.insert(id.clone(), job);
118 let mut obj = indexmap::IndexMap::new();
119 obj.insert("id".to_string(), Value::String(id));
120 obj.insert("status".to_string(), Value::String(status.to_string()));
121 obj.insert("priority".to_string(), Value::Number(priority as f64));
122 Ok(Value::Object(obj))
123 });
124
125 let s = state.clone();
127 rp.register_builtin("queue.pop", move |args, _| {
128 let queue_name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
129 let mut queues = s.lock().unwrap();
130 let q = match queues.get_mut(&queue_name) {
131 Some(q) => q,
132 None => return Ok(Value::Null),
133 };
134 if q.paused {
135 return Ok(Value::Null);
136 }
137 let now = now_ms();
138 for job in q.jobs.values_mut() {
140 if job.status == "delayed" && job.scheduled_for <= now {
141 job.status = "pending".to_string();
142 }
143 }
144 let best_id = q.jobs.values()
146 .filter(|j| j.status == "pending")
147 .max_by(|a, b| {
148 a.priority.cmp(&b.priority)
149 .then_with(|| b.created_at.cmp(&a.created_at))
150 })
151 .map(|j| j.id.clone());
152
153 if let Some(id) = best_id {
154 let job = q.jobs.get_mut(&id).unwrap();
155 job.status = "active".to_string();
156 job.attempts += 1;
157 job.started_at = Some(now);
158 let mut obj = indexmap::IndexMap::new();
159 obj.insert("id".to_string(), Value::String(job.id.clone()));
160 obj.insert("data".to_string(), job.data.clone());
161 obj.insert("priority".to_string(), Value::Number(job.priority as f64));
162 obj.insert("attempts".to_string(), Value::Number(job.attempts as f64));
163 obj.insert("maxAttempts".to_string(), Value::Number(job.max_attempts as f64));
164 Ok(Value::Object(obj))
165 } else {
166 Ok(Value::Null)
167 }
168 });
169
170 let s = state.clone();
172 rp.register_builtin("queue.complete", move |args, _| {
173 let queue_name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
174 let job_id = args.get(1).map(|v| v.to_display_string()).unwrap_or_default();
175 let result = args.get(2).cloned();
176 let mut queues = s.lock().unwrap();
177 let q = queues.get_mut(&queue_name)
178 .ok_or_else(|| format!("Queue \"{}\" not found", queue_name))?;
179 let job = q.jobs.get_mut(&job_id)
180 .ok_or_else(|| format!("Job \"{}\" not found in queue \"{}\"", job_id, queue_name))?;
181 let now = now_ms();
182 job.status = "completed".to_string();
183 job.completed_at = Some(now);
184 job.result = result;
185 q.processed += 1;
186 let duration = now - job.started_at.unwrap_or(job.created_at);
187 let mut obj = indexmap::IndexMap::new();
188 obj.insert("id".to_string(), Value::String(job_id));
189 obj.insert("status".to_string(), Value::String("completed".to_string()));
190 obj.insert("duration".to_string(), Value::Number(duration as f64));
191 Ok(Value::Object(obj))
192 });
193
194 let s = state.clone();
196 rp.register_builtin("queue.fail", move |args, _| {
197 let queue_name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
198 let job_id = args.get(1).map(|v| v.to_display_string()).unwrap_or_default();
199 let error = args.get(2).map(|v| v.to_display_string()).unwrap_or_else(|| "Unknown error".to_string());
200 let mut queues = s.lock().unwrap();
201 let q = queues.get_mut(&queue_name)
202 .ok_or_else(|| format!("Queue \"{}\" not found", queue_name))?;
203 let job = q.jobs.get_mut(&job_id)
204 .ok_or_else(|| format!("Job \"{}\" not found in queue \"{}\"", job_id, queue_name))?;
205 job.error = Some(error.clone());
206 job.failed_at = Some(now_ms());
207 if job.attempts < job.max_attempts {
208 job.status = "pending".to_string();
209 let mut obj = indexmap::IndexMap::new();
210 obj.insert("id".to_string(), Value::String(job_id));
211 obj.insert("status".to_string(), Value::String("retry".to_string()));
212 obj.insert("attempts".to_string(), Value::Number(job.attempts as f64));
213 obj.insert("maxAttempts".to_string(), Value::Number(job.max_attempts as f64));
214 Ok(Value::Object(obj))
215 } else {
216 let mut dead_job = job.clone();
217 dead_job.status = "failed".to_string();
218 q.failed += 1;
219 q.dead_letter.push(dead_job);
220 q.jobs.remove(&job_id);
221 if q.dead_letter.len() > q.max_dead_letter {
222 q.dead_letter.remove(0);
223 }
224 let mut obj = indexmap::IndexMap::new();
225 obj.insert("id".to_string(), Value::String(job_id));
226 obj.insert("status".to_string(), Value::String("dead-letter".to_string()));
227 obj.insert("attempts".to_string(), Value::Number(q.dead_letter.last().map(|j| j.attempts).unwrap_or(0) as f64));
228 obj.insert("error".to_string(), Value::String(error));
229 Ok(Value::Object(obj))
230 }
231 });
232
233 let s = state.clone();
235 rp.register_builtin("queue.retry", move |args, _| {
236 let queue_name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
237 let job_id = args.get(1).map(|v| v.to_display_string()).unwrap_or_default();
238 let mut queues = s.lock().unwrap();
239 let q = queues.get_mut(&queue_name)
240 .ok_or_else(|| format!("Queue \"{}\" not found", queue_name))?;
241 if let Some(idx) = q.dead_letter.iter().position(|j| j.id == job_id) {
243 let mut job = q.dead_letter.remove(idx);
244 job.status = "pending".to_string();
245 job.attempts = 0;
246 job.error = None;
247 job.failed_at = None;
248 q.jobs.insert(job.id.clone(), job);
249 let mut obj = indexmap::IndexMap::new();
250 obj.insert("id".to_string(), Value::String(job_id));
251 obj.insert("status".to_string(), Value::String("pending".to_string()));
252 obj.insert("source".to_string(), Value::String("dead-letter".to_string()));
253 return Ok(Value::Object(obj));
254 }
255 let job = q.jobs.get_mut(&job_id)
256 .ok_or_else(|| format!("Job \"{}\" not found", job_id))?;
257 job.status = "pending".to_string();
258 job.error = None;
259 let mut obj = indexmap::IndexMap::new();
260 obj.insert("id".to_string(), Value::String(job_id));
261 obj.insert("status".to_string(), Value::String("pending".to_string()));
262 obj.insert("source".to_string(), Value::String("queue".to_string()));
263 Ok(Value::Object(obj))
264 });
265
266 let s = state.clone();
268 rp.register_builtin("queue.remove", move |args, _| {
269 let queue_name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
270 let job_id = args.get(1).map(|v| v.to_display_string()).unwrap_or_default();
271 let mut queues = s.lock().unwrap();
272 if let Some(q) = queues.get_mut(&queue_name) {
273 Ok(Value::Bool(q.jobs.remove(&job_id).is_some()))
274 } else {
275 Ok(Value::Bool(false))
276 }
277 });
278
279 let s = state.clone();
281 rp.register_builtin("queue.size", move |args, _| {
282 let queue_name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
283 let status_filter = args.get(1).map(|v| v.to_display_string());
284 let mut queues = s.lock().unwrap();
285 if let Some(q) = queues.get_mut(&queue_name) {
286 let now = now_ms();
287 for job in q.jobs.values_mut() {
288 if job.status == "delayed" && job.scheduled_for <= now {
289 job.status = "pending".to_string();
290 }
291 }
292 let count = if let Some(status) = status_filter {
293 q.jobs.values().filter(|j| j.status == status).count()
294 } else {
295 q.jobs.len()
296 };
297 Ok(Value::Number(count as f64))
298 } else {
299 Ok(Value::Number(0.0))
300 }
301 });
302
303 let s = state.clone();
305 rp.register_builtin("queue.status", move |args, _| {
306 let queue_name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
307 let mut queues = s.lock().unwrap();
308 if let Some(q) = queues.get_mut(&queue_name) {
309 let now = now_ms();
310 let mut counts: HashMap<&str, u64> = HashMap::new();
311 for job in q.jobs.values_mut() {
312 if job.status == "delayed" && job.scheduled_for <= now {
313 job.status = "pending".to_string();
314 }
315 *counts.entry(match job.status.as_str() {
316 "pending" => "pending",
317 "active" => "active",
318 "completed" => "completed",
319 "failed" => "failed",
320 "delayed" => "delayed",
321 _ => "pending",
322 }).or_insert(0) += 1;
323 }
324 let mut obj = indexmap::IndexMap::new();
325 obj.insert("name".to_string(), Value::String(q.name.clone()));
326 obj.insert("paused".to_string(), Value::Bool(q.paused));
327 obj.insert("total".to_string(), Value::Number(q.jobs.len() as f64));
328 obj.insert("pending".to_string(), Value::Number(*counts.get("pending").unwrap_or(&0) as f64));
329 obj.insert("active".to_string(), Value::Number(*counts.get("active").unwrap_or(&0) as f64));
330 obj.insert("completed".to_string(), Value::Number(*counts.get("completed").unwrap_or(&0) as f64));
331 obj.insert("failed".to_string(), Value::Number(*counts.get("failed").unwrap_or(&0) as f64));
332 obj.insert("delayed".to_string(), Value::Number(*counts.get("delayed").unwrap_or(&0) as f64));
333 obj.insert("deadLetter".to_string(), Value::Number(q.dead_letter.len() as f64));
334 obj.insert("totalProcessed".to_string(), Value::Number(q.processed as f64));
335 obj.insert("totalFailed".to_string(), Value::Number(q.failed as f64));
336 Ok(Value::Object(obj))
337 } else {
338 let mut obj = indexmap::IndexMap::new();
339 obj.insert("name".to_string(), Value::String(queue_name));
340 obj.insert("total".to_string(), Value::Number(0.0));
341 Ok(Value::Object(obj))
342 }
343 });
344
345 let s = state.clone();
347 rp.register_builtin("queue.pause", move |args, _| {
348 let queue_name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
349 let mut queues = s.lock().unwrap();
350 if let Some(q) = queues.get_mut(&queue_name) {
351 q.paused = true;
352 }
353 let mut obj = indexmap::IndexMap::new();
354 obj.insert("name".to_string(), Value::String(queue_name));
355 obj.insert("paused".to_string(), Value::Bool(true));
356 Ok(Value::Object(obj))
357 });
358
359 let s = state.clone();
361 rp.register_builtin("queue.resume", move |args, _| {
362 let queue_name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
363 let mut queues = s.lock().unwrap();
364 if let Some(q) = queues.get_mut(&queue_name) {
365 q.paused = false;
366 }
367 let mut obj = indexmap::IndexMap::new();
368 obj.insert("name".to_string(), Value::String(queue_name));
369 obj.insert("paused".to_string(), Value::Bool(false));
370 Ok(Value::Object(obj))
371 });
372
373 let s = state.clone();
375 rp.register_builtin("queue.clear", move |args, _| {
376 let queue_name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
377 let mut queues = s.lock().unwrap();
378 let count = if let Some(q) = queues.get_mut(&queue_name) {
379 let c = q.jobs.len();
380 q.jobs.clear();
381 c
382 } else {
383 0
384 };
385 let mut obj = indexmap::IndexMap::new();
386 obj.insert("cleared".to_string(), Value::Number(count as f64));
387 Ok(Value::Object(obj))
388 });
389
390 let s = state.clone();
392 rp.register_builtin("queue.deadLetter", move |args, _| {
393 let queue_name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
394 let limit = args.get(1).map(|v| v.to_number() as usize).unwrap_or(50);
395 let queues = s.lock().unwrap();
396 if let Some(q) = queues.get(&queue_name) {
397 let start = if q.dead_letter.len() > limit { q.dead_letter.len() - limit } else { 0 };
398 let items: Vec<Value> = q.dead_letter[start..].iter().map(|j| {
399 let mut obj = indexmap::IndexMap::new();
400 obj.insert("id".to_string(), Value::String(j.id.clone()));
401 obj.insert("data".to_string(), j.data.clone());
402 obj.insert("error".to_string(), Value::String(j.error.clone().unwrap_or_default()));
403 obj.insert("attempts".to_string(), Value::Number(j.attempts as f64));
404 Ok(Value::Object(obj))
405 }).collect::<Result<Vec<_>, String>>()?;
406 Ok(Value::Array(items))
407 } else {
408 Ok(Value::Array(Vec::new()))
409 }
410 });
411
412 let s = state.clone();
414 rp.register_builtin("queue.getJob", move |args, _| {
415 let queue_name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
416 let job_id = args.get(1).map(|v| v.to_display_string()).unwrap_or_default();
417 let queues = s.lock().unwrap();
418 if let Some(q) = queues.get(&queue_name) {
419 if let Some(job) = q.jobs.get(&job_id) {
420 let mut obj = indexmap::IndexMap::new();
421 obj.insert("id".to_string(), Value::String(job.id.clone()));
422 obj.insert("data".to_string(), job.data.clone());
423 obj.insert("status".to_string(), Value::String(job.status.clone()));
424 obj.insert("priority".to_string(), Value::Number(job.priority as f64));
425 obj.insert("attempts".to_string(), Value::Number(job.attempts as f64));
426 obj.insert("maxAttempts".to_string(), Value::Number(job.max_attempts as f64));
427 if let Some(err) = &job.error {
428 obj.insert("error".to_string(), Value::String(err.clone()));
429 }
430 return Ok(Value::Object(obj));
431 }
432 }
433 Ok(Value::Null)
434 });
435
436 let s = state.clone();
438 rp.register_builtin("queue.destroy", move |args, _| {
439 let queue_name = args.first().map(|v| v.to_display_string()).unwrap_or_else(|| "default".to_string());
440 let mut queues = s.lock().unwrap();
441 Ok(Value::Bool(queues.remove(&queue_name).is_some()))
442 });
443}