Skip to main content

yarli_cli/yarli-queue/src/
memory.rs

1//! In-memory task queue for development and testing.
2//!
3//! Uses `RwLock<Vec<QueueEntry>>` with index maps for O(1) lookups,
4//! mirroring the InMemoryEventStore pattern from yarli-store.
5
6use std::collections::HashMap;
7use std::sync::RwLock;
8
9use chrono::{DateTime, Duration, Utc};
10use uuid::Uuid;
11
12use crate::yarli_core::domain::{CommandClass, RunId, TaskId};
13
14use crate::yarli_queue::error::QueueError;
15use crate::yarli_queue::queue::{
16    ClaimRequest, ConcurrencyConfig, QueueEntry, QueueStats, QueueStatus, TaskQueue,
17};
18
19const PRIORITY_AGING_WINDOW_SECONDS: i64 = 60;
20
21fn priority_for_claim(priority: u32, available_at: DateTime<Utc>, now: DateTime<Utc>) -> i64 {
22    let age_seconds = (now - available_at).num_seconds().max(0);
23    let age_boost = age_seconds / PRIORITY_AGING_WINDOW_SECONDS;
24    priority as i64 + age_boost
25}
26
27/// Thread-safe in-memory task queue.
28pub struct InMemoryTaskQueue {
29    inner: RwLock<QueueInner>,
30}
31
32struct QueueInner {
33    entries: Vec<QueueEntry>,
34    /// queue_id → index in entries vec.
35    id_index: HashMap<Uuid, usize>,
36    /// task_id → queue_id for active (pending/leased) entries.
37    active_tasks: HashMap<TaskId, Uuid>,
38}
39
40impl InMemoryTaskQueue {
41    pub fn new() -> Self {
42        Self {
43            inner: RwLock::new(QueueInner {
44                entries: Vec::new(),
45                id_index: HashMap::new(),
46                active_tasks: HashMap::new(),
47            }),
48        }
49    }
50}
51
52impl Default for InMemoryTaskQueue {
53    fn default() -> Self {
54        Self::new()
55    }
56}
57
58impl TaskQueue for InMemoryTaskQueue {
59    fn enqueue(
60        &self,
61        task_id: TaskId,
62        run_id: RunId,
63        priority: u32,
64        command_class: CommandClass,
65        available_at: Option<DateTime<Utc>>,
66    ) -> Result<Uuid, QueueError> {
67        let mut inner = self.inner.write().unwrap();
68
69        // Check for duplicate active task.
70        if inner.active_tasks.contains_key(&task_id) {
71            return Err(QueueError::DuplicateTask(task_id));
72        }
73
74        let now = Utc::now();
75        let queue_id = Uuid::now_v7();
76        let entry = QueueEntry {
77            queue_id,
78            task_id,
79            run_id,
80            priority,
81            available_at: available_at.unwrap_or(now),
82            attempt_no: 1,
83            command_class,
84            status: QueueStatus::Pending,
85            lease_owner: None,
86            lease_expires_at: None,
87            last_heartbeat: None,
88            rehydration_tokens: None,
89            created_at: now,
90            updated_at: now,
91        };
92
93        let idx = inner.entries.len();
94        inner.entries.push(entry);
95        inner.id_index.insert(queue_id, idx);
96        inner.active_tasks.insert(task_id, queue_id);
97
98        Ok(queue_id)
99    }
100
101    fn claim(
102        &self,
103        request: &ClaimRequest,
104        config: &ConcurrencyConfig,
105    ) -> Result<Vec<QueueEntry>, QueueError> {
106        let mut inner = self.inner.write().unwrap();
107        let now = Utc::now();
108
109        // Count current leased entries per run and per command class.
110        let mut leased_by_run: HashMap<RunId, usize> = HashMap::new();
111        let mut leased_by_class: HashMap<CommandClass, usize> = HashMap::new();
112        for entry in &inner.entries {
113            if entry.status == QueueStatus::Leased {
114                *leased_by_run.entry(entry.run_id).or_default() += 1;
115                *leased_by_class.entry(entry.command_class).or_default() += 1;
116            }
117        }
118
119        // Collect indices of claimable entries, sorted by priority DESC with aging.
120        let allowed = &request.allowed_run_ids;
121        let mut candidates: Vec<usize> = inner
122            .entries
123            .iter()
124            .enumerate()
125            .filter(|(_, e)| {
126                e.status == QueueStatus::Pending
127                    && e.available_at <= now
128                    && allowed.as_ref().map_or(true, |ids| ids.contains(&e.run_id))
129            })
130            .map(|(i, _)| i)
131            .collect();
132
133        candidates.sort_by(|&a, &b| {
134            let ea = &inner.entries[a];
135            let eb = &inner.entries[b];
136            priority_for_claim(eb.priority, eb.available_at, now)
137                .cmp(&priority_for_claim(ea.priority, ea.available_at, now))
138                .then(eb.priority.cmp(&ea.priority))
139                .then(ea.available_at.cmp(&eb.available_at))
140                .then(ea.queue_id.cmp(&eb.queue_id))
141        });
142
143        let mut claimed = Vec::new();
144        let lease_expires = now + request.lease_ttl;
145
146        for idx in candidates {
147            if claimed.len() >= request.limit {
148                break;
149            }
150
151            let entry = &inner.entries[idx];
152
153            // Check per-run cap.
154            let run_count = leased_by_run.get(&entry.run_id).copied().unwrap_or(0);
155            if run_count >= config.per_run_cap {
156                continue;
157            }
158
159            // Check per-class cap.
160            let class_count = leased_by_class
161                .get(&entry.command_class)
162                .copied()
163                .unwrap_or(0);
164            if class_count >= config.cap_for(entry.command_class) {
165                continue;
166            }
167
168            // Claim it.
169            let entry = &mut inner.entries[idx];
170            entry.status = QueueStatus::Leased;
171            entry.lease_owner = Some(request.worker_id.clone());
172            entry.lease_expires_at = Some(lease_expires);
173            entry.last_heartbeat = Some(now);
174            entry.updated_at = now;
175
176            *leased_by_run.entry(entry.run_id).or_default() += 1;
177            *leased_by_class.entry(entry.command_class).or_default() += 1;
178
179            claimed.push(entry.clone());
180        }
181
182        Ok(claimed)
183    }
184
185    fn heartbeat(
186        &self,
187        queue_id: Uuid,
188        worker_id: &str,
189        lease_ttl: Duration,
190    ) -> Result<(), QueueError> {
191        let mut inner = self.inner.write().unwrap();
192        let idx = *inner
193            .id_index
194            .get(&queue_id)
195            .ok_or(QueueError::NotFound(queue_id))?;
196
197        let entry = &mut inner.entries[idx];
198
199        if entry.status != QueueStatus::Leased {
200            return Err(QueueError::InvalidStatus {
201                entry_id: queue_id,
202                expected: "leased",
203                actual: format!("{:?}", entry.status),
204            });
205        }
206
207        let owner = entry.lease_owner.as_deref().unwrap_or("");
208        if owner != worker_id {
209            return Err(QueueError::LeaseOwnerMismatch {
210                entry_id: queue_id,
211                expected: worker_id.to_string(),
212                actual: owner.to_string(),
213            });
214        }
215
216        // Check if lease already expired.
217        let now = Utc::now();
218        if let Some(expires) = entry.lease_expires_at {
219            if now > expires {
220                return Err(QueueError::LeaseExpired(queue_id));
221            }
222        }
223
224        entry.lease_expires_at = Some(now + lease_ttl);
225        entry.last_heartbeat = Some(now);
226        entry.updated_at = now;
227
228        Ok(())
229    }
230
231    fn complete(&self, queue_id: Uuid, worker_id: &str) -> Result<(), QueueError> {
232        let mut inner = self.inner.write().unwrap();
233        let idx = *inner
234            .id_index
235            .get(&queue_id)
236            .ok_or(QueueError::NotFound(queue_id))?;
237
238        let entry = &inner.entries[idx];
239
240        if entry.status != QueueStatus::Leased {
241            return Err(QueueError::InvalidStatus {
242                entry_id: queue_id,
243                expected: "leased",
244                actual: format!("{:?}", entry.status),
245            });
246        }
247
248        let owner = entry.lease_owner.as_deref().unwrap_or("");
249        if owner != worker_id {
250            return Err(QueueError::LeaseOwnerMismatch {
251                entry_id: queue_id,
252                expected: worker_id.to_string(),
253                actual: owner.to_string(),
254            });
255        }
256
257        let task_id = entry.task_id;
258        let entry = &mut inner.entries[idx];
259        entry.status = QueueStatus::Completed;
260        entry.updated_at = Utc::now();
261
262        // Remove from active tasks.
263        inner.active_tasks.remove(&task_id);
264
265        Ok(())
266    }
267
268    fn fail(&self, queue_id: Uuid, worker_id: &str) -> Result<(), QueueError> {
269        let mut inner = self.inner.write().unwrap();
270        let idx = *inner
271            .id_index
272            .get(&queue_id)
273            .ok_or(QueueError::NotFound(queue_id))?;
274
275        let entry = &inner.entries[idx];
276
277        if entry.status != QueueStatus::Leased {
278            return Err(QueueError::InvalidStatus {
279                entry_id: queue_id,
280                expected: "leased",
281                actual: format!("{:?}", entry.status),
282            });
283        }
284
285        let owner = entry.lease_owner.as_deref().unwrap_or("");
286        if owner != worker_id {
287            return Err(QueueError::LeaseOwnerMismatch {
288                entry_id: queue_id,
289                expected: worker_id.to_string(),
290                actual: owner.to_string(),
291            });
292        }
293
294        let task_id = entry.task_id;
295        let entry = &mut inner.entries[idx];
296        entry.status = QueueStatus::Failed;
297        entry.updated_at = Utc::now();
298
299        // Remove from active tasks so the task can be re-enqueued for retry.
300        inner.active_tasks.remove(&task_id);
301
302        Ok(())
303    }
304
305    fn override_priority(&self, task_id: TaskId, priority: u32) -> Result<(), QueueError> {
306        let mut inner = self.inner.write().unwrap();
307        let now = Utc::now();
308        let mut found = false;
309
310        for entry in &mut inner.entries {
311            if entry.task_id == task_id {
312                entry.priority = priority;
313                entry.updated_at = now;
314                found = true;
315            }
316        }
317
318        if found {
319            Ok(())
320        } else {
321            Err(QueueError::NotFound(task_id))
322        }
323    }
324
325    fn cancel(&self, queue_id: Uuid) -> Result<(), QueueError> {
326        let mut inner = self.inner.write().unwrap();
327        let idx = *inner
328            .id_index
329            .get(&queue_id)
330            .ok_or(QueueError::NotFound(queue_id))?;
331
332        let status = inner.entries[idx].status;
333        let task_id = inner.entries[idx].task_id;
334
335        match status {
336            QueueStatus::Pending | QueueStatus::Leased => {
337                let entry = &mut inner.entries[idx];
338                entry.status = QueueStatus::Cancelled;
339                entry.updated_at = Utc::now();
340                inner.active_tasks.remove(&task_id);
341                Ok(())
342            }
343            _ => Err(QueueError::InvalidStatus {
344                entry_id: queue_id,
345                expected: "pending or leased",
346                actual: format!("{status:?}"),
347            }),
348        }
349    }
350
351    fn entries(&self) -> Vec<QueueEntry> {
352        let inner = self.inner.read().unwrap();
353        inner.entries.clone()
354    }
355
356    fn reclaim_stale(&self, grace_period: Duration) -> Result<usize, QueueError> {
357        let mut inner = self.inner.write().unwrap();
358        let now = Utc::now();
359        let mut reclaimed = 0;
360
361        for entry in &mut inner.entries {
362            if entry.status != QueueStatus::Leased {
363                continue;
364            }
365
366            let expires = match entry.lease_expires_at {
367                Some(t) => t,
368                None => continue,
369            };
370
371            // Stale if lease_expires_at + grace < now.
372            if expires + grace_period < now {
373                entry.status = QueueStatus::Pending;
374                entry.attempt_no += 1;
375                entry.lease_owner = None;
376                entry.lease_expires_at = None;
377                entry.last_heartbeat = None;
378                entry.updated_at = now;
379                reclaimed += 1;
380            }
381        }
382
383        Ok(reclaimed)
384    }
385
386    fn stats(&self) -> QueueStats {
387        let inner = self.inner.read().unwrap();
388        let mut stats = QueueStats::default();
389        for entry in &inner.entries {
390            match entry.status {
391                QueueStatus::Pending => stats.pending += 1,
392                QueueStatus::Leased => stats.leased += 1,
393                QueueStatus::Completed => stats.completed += 1,
394                QueueStatus::Failed => stats.failed += 1,
395                QueueStatus::Cancelled => stats.cancelled += 1,
396            }
397        }
398        stats
399    }
400
401    fn leased_count_for_run(&self, run_id: RunId) -> usize {
402        let inner = self.inner.read().unwrap();
403        inner
404            .entries
405            .iter()
406            .filter(|e| e.status == QueueStatus::Leased && e.run_id == run_id)
407            .count()
408    }
409
410    fn leased_count_for_class(&self, class: CommandClass) -> usize {
411        let inner = self.inner.read().unwrap();
412        inner
413            .entries
414            .iter()
415            .filter(|e| e.status == QueueStatus::Leased && e.command_class == class)
416            .count()
417    }
418
419    fn pending_count(&self) -> usize {
420        let inner = self.inner.read().unwrap();
421        inner
422            .entries
423            .iter()
424            .filter(|e| e.status == QueueStatus::Pending)
425            .count()
426    }
427
428    fn cancel_for_run(&self, run_id: RunId) -> Result<usize, QueueError> {
429        let mut inner = self.inner.write().unwrap();
430        let now = Utc::now();
431        let mut cancelled = 0;
432
433        // Collect task_ids of entries we're about to cancel.
434        let mut cancelled_task_ids = Vec::new();
435
436        for entry in &mut inner.entries {
437            if entry.run_id == run_id
438                && matches!(entry.status, QueueStatus::Pending | QueueStatus::Leased)
439            {
440                cancelled_task_ids.push(entry.task_id);
441                entry.status = QueueStatus::Cancelled;
442                entry.updated_at = now;
443                cancelled += 1;
444            }
445        }
446
447        // Remove cancelled entries from active_tasks map.
448        for task_id in &cancelled_task_ids {
449            inner.active_tasks.remove(task_id);
450        }
451
452        Ok(cancelled)
453    }
454
455    fn cancel_stale_runs(&self, active_run_ids: &[RunId]) -> Result<usize, QueueError> {
456        let mut inner = self.inner.write().unwrap();
457        let now = Utc::now();
458        let mut cancelled = 0;
459        let mut cancelled_task_ids = Vec::new();
460
461        for entry in &mut inner.entries {
462            if matches!(entry.status, QueueStatus::Pending | QueueStatus::Leased)
463                && !active_run_ids.contains(&entry.run_id)
464            {
465                cancelled_task_ids.push(entry.task_id);
466                entry.status = QueueStatus::Cancelled;
467                entry.updated_at = now;
468                cancelled += 1;
469            }
470        }
471
472        for task_id in &cancelled_task_ids {
473            inner.active_tasks.remove(task_id);
474        }
475
476        Ok(cancelled)
477    }
478}
479
480#[cfg(test)]
481mod tests;