yarli_cli/yarli-queue/src/
memory.rs1use std::collections::HashMap;
7use std::sync::RwLock;
8
9use chrono::{DateTime, Duration, Utc};
10use uuid::Uuid;
11
12use crate::yarli_core::domain::{CommandClass, RunId, TaskId};
13
14use crate::yarli_queue::error::QueueError;
15use crate::yarli_queue::queue::{
16 ClaimRequest, ConcurrencyConfig, QueueEntry, QueueStats, QueueStatus, TaskQueue,
17};
18
19const PRIORITY_AGING_WINDOW_SECONDS: i64 = 60;
20
21fn priority_for_claim(priority: u32, available_at: DateTime<Utc>, now: DateTime<Utc>) -> i64 {
22 let age_seconds = (now - available_at).num_seconds().max(0);
23 let age_boost = age_seconds / PRIORITY_AGING_WINDOW_SECONDS;
24 priority as i64 + age_boost
25}
26
27pub struct InMemoryTaskQueue {
29 inner: RwLock<QueueInner>,
30}
31
32struct QueueInner {
33 entries: Vec<QueueEntry>,
34 id_index: HashMap<Uuid, usize>,
36 active_tasks: HashMap<TaskId, Uuid>,
38}
39
40impl InMemoryTaskQueue {
41 pub fn new() -> Self {
42 Self {
43 inner: RwLock::new(QueueInner {
44 entries: Vec::new(),
45 id_index: HashMap::new(),
46 active_tasks: HashMap::new(),
47 }),
48 }
49 }
50}
51
52impl Default for InMemoryTaskQueue {
53 fn default() -> Self {
54 Self::new()
55 }
56}
57
58impl TaskQueue for InMemoryTaskQueue {
59 fn enqueue(
60 &self,
61 task_id: TaskId,
62 run_id: RunId,
63 priority: u32,
64 command_class: CommandClass,
65 available_at: Option<DateTime<Utc>>,
66 ) -> Result<Uuid, QueueError> {
67 let mut inner = self.inner.write().unwrap();
68
69 if inner.active_tasks.contains_key(&task_id) {
71 return Err(QueueError::DuplicateTask(task_id));
72 }
73
74 let now = Utc::now();
75 let queue_id = Uuid::now_v7();
76 let entry = QueueEntry {
77 queue_id,
78 task_id,
79 run_id,
80 priority,
81 available_at: available_at.unwrap_or(now),
82 attempt_no: 1,
83 command_class,
84 status: QueueStatus::Pending,
85 lease_owner: None,
86 lease_expires_at: None,
87 last_heartbeat: None,
88 rehydration_tokens: None,
89 created_at: now,
90 updated_at: now,
91 };
92
93 let idx = inner.entries.len();
94 inner.entries.push(entry);
95 inner.id_index.insert(queue_id, idx);
96 inner.active_tasks.insert(task_id, queue_id);
97
98 Ok(queue_id)
99 }
100
101 fn claim(
102 &self,
103 request: &ClaimRequest,
104 config: &ConcurrencyConfig,
105 ) -> Result<Vec<QueueEntry>, QueueError> {
106 let mut inner = self.inner.write().unwrap();
107 let now = Utc::now();
108
109 let mut leased_by_run: HashMap<RunId, usize> = HashMap::new();
111 let mut leased_by_class: HashMap<CommandClass, usize> = HashMap::new();
112 for entry in &inner.entries {
113 if entry.status == QueueStatus::Leased {
114 *leased_by_run.entry(entry.run_id).or_default() += 1;
115 *leased_by_class.entry(entry.command_class).or_default() += 1;
116 }
117 }
118
119 let allowed = &request.allowed_run_ids;
121 let mut candidates: Vec<usize> = inner
122 .entries
123 .iter()
124 .enumerate()
125 .filter(|(_, e)| {
126 e.status == QueueStatus::Pending
127 && e.available_at <= now
128 && allowed.as_ref().map_or(true, |ids| ids.contains(&e.run_id))
129 })
130 .map(|(i, _)| i)
131 .collect();
132
133 candidates.sort_by(|&a, &b| {
134 let ea = &inner.entries[a];
135 let eb = &inner.entries[b];
136 priority_for_claim(eb.priority, eb.available_at, now)
137 .cmp(&priority_for_claim(ea.priority, ea.available_at, now))
138 .then(eb.priority.cmp(&ea.priority))
139 .then(ea.available_at.cmp(&eb.available_at))
140 .then(ea.queue_id.cmp(&eb.queue_id))
141 });
142
143 let mut claimed = Vec::new();
144 let lease_expires = now + request.lease_ttl;
145
146 for idx in candidates {
147 if claimed.len() >= request.limit {
148 break;
149 }
150
151 let entry = &inner.entries[idx];
152
153 let run_count = leased_by_run.get(&entry.run_id).copied().unwrap_or(0);
155 if run_count >= config.per_run_cap {
156 continue;
157 }
158
159 let class_count = leased_by_class
161 .get(&entry.command_class)
162 .copied()
163 .unwrap_or(0);
164 if class_count >= config.cap_for(entry.command_class) {
165 continue;
166 }
167
168 let entry = &mut inner.entries[idx];
170 entry.status = QueueStatus::Leased;
171 entry.lease_owner = Some(request.worker_id.clone());
172 entry.lease_expires_at = Some(lease_expires);
173 entry.last_heartbeat = Some(now);
174 entry.updated_at = now;
175
176 *leased_by_run.entry(entry.run_id).or_default() += 1;
177 *leased_by_class.entry(entry.command_class).or_default() += 1;
178
179 claimed.push(entry.clone());
180 }
181
182 Ok(claimed)
183 }
184
185 fn heartbeat(
186 &self,
187 queue_id: Uuid,
188 worker_id: &str,
189 lease_ttl: Duration,
190 ) -> Result<(), QueueError> {
191 let mut inner = self.inner.write().unwrap();
192 let idx = *inner
193 .id_index
194 .get(&queue_id)
195 .ok_or(QueueError::NotFound(queue_id))?;
196
197 let entry = &mut inner.entries[idx];
198
199 if entry.status != QueueStatus::Leased {
200 return Err(QueueError::InvalidStatus {
201 entry_id: queue_id,
202 expected: "leased",
203 actual: format!("{:?}", entry.status),
204 });
205 }
206
207 let owner = entry.lease_owner.as_deref().unwrap_or("");
208 if owner != worker_id {
209 return Err(QueueError::LeaseOwnerMismatch {
210 entry_id: queue_id,
211 expected: worker_id.to_string(),
212 actual: owner.to_string(),
213 });
214 }
215
216 let now = Utc::now();
218 if let Some(expires) = entry.lease_expires_at {
219 if now > expires {
220 return Err(QueueError::LeaseExpired(queue_id));
221 }
222 }
223
224 entry.lease_expires_at = Some(now + lease_ttl);
225 entry.last_heartbeat = Some(now);
226 entry.updated_at = now;
227
228 Ok(())
229 }
230
231 fn complete(&self, queue_id: Uuid, worker_id: &str) -> Result<(), QueueError> {
232 let mut inner = self.inner.write().unwrap();
233 let idx = *inner
234 .id_index
235 .get(&queue_id)
236 .ok_or(QueueError::NotFound(queue_id))?;
237
238 let entry = &inner.entries[idx];
239
240 if entry.status != QueueStatus::Leased {
241 return Err(QueueError::InvalidStatus {
242 entry_id: queue_id,
243 expected: "leased",
244 actual: format!("{:?}", entry.status),
245 });
246 }
247
248 let owner = entry.lease_owner.as_deref().unwrap_or("");
249 if owner != worker_id {
250 return Err(QueueError::LeaseOwnerMismatch {
251 entry_id: queue_id,
252 expected: worker_id.to_string(),
253 actual: owner.to_string(),
254 });
255 }
256
257 let task_id = entry.task_id;
258 let entry = &mut inner.entries[idx];
259 entry.status = QueueStatus::Completed;
260 entry.updated_at = Utc::now();
261
262 inner.active_tasks.remove(&task_id);
264
265 Ok(())
266 }
267
268 fn fail(&self, queue_id: Uuid, worker_id: &str) -> Result<(), QueueError> {
269 let mut inner = self.inner.write().unwrap();
270 let idx = *inner
271 .id_index
272 .get(&queue_id)
273 .ok_or(QueueError::NotFound(queue_id))?;
274
275 let entry = &inner.entries[idx];
276
277 if entry.status != QueueStatus::Leased {
278 return Err(QueueError::InvalidStatus {
279 entry_id: queue_id,
280 expected: "leased",
281 actual: format!("{:?}", entry.status),
282 });
283 }
284
285 let owner = entry.lease_owner.as_deref().unwrap_or("");
286 if owner != worker_id {
287 return Err(QueueError::LeaseOwnerMismatch {
288 entry_id: queue_id,
289 expected: worker_id.to_string(),
290 actual: owner.to_string(),
291 });
292 }
293
294 let task_id = entry.task_id;
295 let entry = &mut inner.entries[idx];
296 entry.status = QueueStatus::Failed;
297 entry.updated_at = Utc::now();
298
299 inner.active_tasks.remove(&task_id);
301
302 Ok(())
303 }
304
305 fn override_priority(&self, task_id: TaskId, priority: u32) -> Result<(), QueueError> {
306 let mut inner = self.inner.write().unwrap();
307 let now = Utc::now();
308 let mut found = false;
309
310 for entry in &mut inner.entries {
311 if entry.task_id == task_id {
312 entry.priority = priority;
313 entry.updated_at = now;
314 found = true;
315 }
316 }
317
318 if found {
319 Ok(())
320 } else {
321 Err(QueueError::NotFound(task_id))
322 }
323 }
324
325 fn cancel(&self, queue_id: Uuid) -> Result<(), QueueError> {
326 let mut inner = self.inner.write().unwrap();
327 let idx = *inner
328 .id_index
329 .get(&queue_id)
330 .ok_or(QueueError::NotFound(queue_id))?;
331
332 let status = inner.entries[idx].status;
333 let task_id = inner.entries[idx].task_id;
334
335 match status {
336 QueueStatus::Pending | QueueStatus::Leased => {
337 let entry = &mut inner.entries[idx];
338 entry.status = QueueStatus::Cancelled;
339 entry.updated_at = Utc::now();
340 inner.active_tasks.remove(&task_id);
341 Ok(())
342 }
343 _ => Err(QueueError::InvalidStatus {
344 entry_id: queue_id,
345 expected: "pending or leased",
346 actual: format!("{status:?}"),
347 }),
348 }
349 }
350
351 fn entries(&self) -> Vec<QueueEntry> {
352 let inner = self.inner.read().unwrap();
353 inner.entries.clone()
354 }
355
356 fn reclaim_stale(&self, grace_period: Duration) -> Result<usize, QueueError> {
357 let mut inner = self.inner.write().unwrap();
358 let now = Utc::now();
359 let mut reclaimed = 0;
360
361 for entry in &mut inner.entries {
362 if entry.status != QueueStatus::Leased {
363 continue;
364 }
365
366 let expires = match entry.lease_expires_at {
367 Some(t) => t,
368 None => continue,
369 };
370
371 if expires + grace_period < now {
373 entry.status = QueueStatus::Pending;
374 entry.attempt_no += 1;
375 entry.lease_owner = None;
376 entry.lease_expires_at = None;
377 entry.last_heartbeat = None;
378 entry.updated_at = now;
379 reclaimed += 1;
380 }
381 }
382
383 Ok(reclaimed)
384 }
385
386 fn stats(&self) -> QueueStats {
387 let inner = self.inner.read().unwrap();
388 let mut stats = QueueStats::default();
389 for entry in &inner.entries {
390 match entry.status {
391 QueueStatus::Pending => stats.pending += 1,
392 QueueStatus::Leased => stats.leased += 1,
393 QueueStatus::Completed => stats.completed += 1,
394 QueueStatus::Failed => stats.failed += 1,
395 QueueStatus::Cancelled => stats.cancelled += 1,
396 }
397 }
398 stats
399 }
400
401 fn leased_count_for_run(&self, run_id: RunId) -> usize {
402 let inner = self.inner.read().unwrap();
403 inner
404 .entries
405 .iter()
406 .filter(|e| e.status == QueueStatus::Leased && e.run_id == run_id)
407 .count()
408 }
409
410 fn leased_count_for_class(&self, class: CommandClass) -> usize {
411 let inner = self.inner.read().unwrap();
412 inner
413 .entries
414 .iter()
415 .filter(|e| e.status == QueueStatus::Leased && e.command_class == class)
416 .count()
417 }
418
419 fn pending_count(&self) -> usize {
420 let inner = self.inner.read().unwrap();
421 inner
422 .entries
423 .iter()
424 .filter(|e| e.status == QueueStatus::Pending)
425 .count()
426 }
427
428 fn cancel_for_run(&self, run_id: RunId) -> Result<usize, QueueError> {
429 let mut inner = self.inner.write().unwrap();
430 let now = Utc::now();
431 let mut cancelled = 0;
432
433 let mut cancelled_task_ids = Vec::new();
435
436 for entry in &mut inner.entries {
437 if entry.run_id == run_id
438 && matches!(entry.status, QueueStatus::Pending | QueueStatus::Leased)
439 {
440 cancelled_task_ids.push(entry.task_id);
441 entry.status = QueueStatus::Cancelled;
442 entry.updated_at = now;
443 cancelled += 1;
444 }
445 }
446
447 for task_id in &cancelled_task_ids {
449 inner.active_tasks.remove(task_id);
450 }
451
452 Ok(cancelled)
453 }
454
455 fn cancel_stale_runs(&self, active_run_ids: &[RunId]) -> Result<usize, QueueError> {
456 let mut inner = self.inner.write().unwrap();
457 let now = Utc::now();
458 let mut cancelled = 0;
459 let mut cancelled_task_ids = Vec::new();
460
461 for entry in &mut inner.entries {
462 if matches!(entry.status, QueueStatus::Pending | QueueStatus::Leased)
463 && !active_run_ids.contains(&entry.run_id)
464 {
465 cancelled_task_ids.push(entry.task_id);
466 entry.status = QueueStatus::Cancelled;
467 entry.updated_at = now;
468 cancelled += 1;
469 }
470 }
471
472 for task_id in &cancelled_task_ids {
473 inner.active_tasks.remove(task_id);
474 }
475
476 Ok(cancelled)
477 }
478}
479
480#[cfg(test)]
481mod tests;