oo_ide/task_registry.rs
1//! Queue-based task scheduler for the IDE.
2//!
3//! [`TaskRegistry`] is the single source of truth for all scheduled, running,
4//! and recently finished tasks. It enforces the following scheduling rules:
5//!
6//! 1. **One running task per queue** — a queue may have at most one task with
7//! status [`TaskStatus::Running`] at any time.
8//! 2. **Same `(queue, target)` cancels previous** — scheduling a task whose
9//! [`TaskKey`] matches a running or queued task cancels the earlier one first.
10//! 3. **Different targets are queued** — if another task is already running on
11//! the same queue but with a different target, the new task is enqueued.
12//! 4. **Queue compaction** — if the same [`TaskKey`] already sits in the pending
13//! queue, it is replaced in-place rather than appended a second time.
14//! 5. **FIFO execution** — tasks start in insertion order after deduplication.
15//!
16//! # Design constraints
17//!
18//! * **No disk I/O** — purely in-memory.
19//! * **No async** — deterministic and synchronous. Callers are responsible for
20//! spawning async work and calling [`TaskRegistry::mark_running`] /
21//! [`TaskRegistry::mark_finished`] on completion.
22//! * **Single-threaded** — lives on the main thread as part of
23//! [`crate::app_state::AppState`].
24
25use std::collections::{HashMap, VecDeque};
26use std::time::Instant;
27
28use tokio_util::sync::CancellationToken;
29
30// ---------------------------------------------------------------------------
31// Public types
32// ---------------------------------------------------------------------------
33
34/// Opaque, monotonically-increasing task identifier. Never reused within a
35/// single registry lifetime.
36#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
37pub struct TaskId(pub u64);
38
39/// Identifies a named scheduling queue (e.g. `"build"`, `"lint"`, `"test"`).
40#[derive(Clone, Debug, PartialEq, Eq, Hash)]
41pub struct TaskQueueId(pub String);
42
43/// The compound identity of a task for deduplication and cancellation purposes.
44///
45/// Two tasks with equal `TaskKey` values are considered interchangeable; the
46/// registry ensures that at most one such task is pending or running at any
47/// time.
48#[derive(Clone, Debug, PartialEq, Eq, Hash)]
49pub struct TaskKey {
50 pub queue: TaskQueueId,
51 /// Arbitrary string that scopes the task within its queue — e.g. a crate
52 /// path, a file path, or `"*"` for queue-wide singletons.
53 pub target: String,
54}
55
56/// Lifecycle state of a task.
57#[derive(Clone, Debug, PartialEq, Eq)]
58pub enum TaskStatus {
59 /// Created and waiting for its turn in the queue.
60 Pending,
61 /// Currently executing.
62 Running,
63 /// Finished successfully.
64 Success,
65 /// Finished with warnings but no hard errors.
66 Warning,
67 /// Finished with at least one error.
68 Error,
69 /// Cancelled before or during execution.
70 Cancelled,
71}
72
73/// What initiated the task.
74#[derive(Clone, Debug)]
75pub enum TaskTrigger {
76 Manual,
77 OnSave,
78 OnFileChange,
79 Extension(String),
80}
81
82/// A single scheduled, running, or finished task.
83pub struct Task {
84 pub id: TaskId,
85 pub key: TaskKey,
86 pub status: TaskStatus,
87 /// The shell command string to execute (e.g. `"cargo build -p my-crate"`).
88 pub command: String,
89 pub created_at: Instant,
90 pub started_at: Option<Instant>,
91 pub finished_at: Option<Instant>,
92 /// Signal used to stop the async work for this task. Callers poll
93 /// `cancellation_token.is_cancelled()` (or `await` it) in their async
94 /// work loop.
95 pub cancellation_token: CancellationToken,
96 pub trigger: TaskTrigger,
97}
98
99// ---------------------------------------------------------------------------
100// Registry
101// ---------------------------------------------------------------------------
102
103/// Central scheduler and store for all IDE tasks.
104pub struct TaskRegistry {
105 /// All tasks ever created in this session, keyed by [`TaskId`].
106 tasks: HashMap<TaskId, Task>,
107 /// The task currently running on each queue.
108 running: HashMap<TaskQueueId, TaskId>,
109 /// FIFO pending queue per [`TaskQueueId`].
110 queues: HashMap<TaskQueueId, VecDeque<TaskId>>,
111 next_id: u64,
112 /// Ring buffer of recently finished task IDs (max 5), newest at back.
113 recent_finished: VecDeque<TaskId>,
114}
115
116impl TaskRegistry {
117 pub fn new() -> Self {
118 Self {
119 tasks: HashMap::new(),
120 running: HashMap::new(),
121 queues: HashMap::new(),
122 next_id: 1,
123 recent_finished: VecDeque::with_capacity(5),
124 }
125 }
126
127 // -----------------------------------------------------------------------
128 // Public API
129 // -----------------------------------------------------------------------
130
131 /// Schedule a new task on `key.queue`.
132 ///
133 /// Any existing running or queued task with the **same** [`TaskKey`] is
134 /// cancelled first (Rule 2 / Rule 4). If no task is currently running on
135 /// the queue the new task starts immediately (status → `Running`); otherwise
136 /// it is appended to the FIFO queue (status → `Pending`).
137 ///
138 /// Returns the [`TaskId`] of the newly created task.
139 pub fn schedule_task(&mut self, key: TaskKey, trigger: TaskTrigger, command: String) -> TaskId {
140 // Cancel any existing task with the same key (running or queued).
141 self.cancel_by_key(&key);
142
143 let id = self.alloc_id();
144 let task = Task {
145 id,
146 key: key.clone(),
147 status: TaskStatus::Pending,
148 command,
149 created_at: Instant::now(),
150 started_at: None,
151 finished_at: None,
152 cancellation_token: CancellationToken::new(),
153 trigger,
154 };
155 self.tasks.insert(id, task);
156
157 let queue_id = &key.queue;
158 if self.running.contains_key(queue_id) {
159 // Another task is running on this queue — enqueue.
160 self.queues.entry(queue_id.clone()).or_default().push_back(id);
161 } else {
162 // Queue is idle — start immediately.
163 self.mark_running(id);
164 }
165
166 id
167 }
168
169 /// Cancel the task identified by `task_id`.
170 ///
171 /// * If the task is **Running** its [`CancellationToken`] is triggered and
172 /// `status` is set to [`TaskStatus::Cancelled`]. The `running` slot is
173 /// cleared and `start_next` is called for the queue.
174 /// * If the task is **Pending** it is removed from the queue and its status
175 /// is set to [`TaskStatus::Cancelled`].
176 /// * If the task is already finished or cancelled this is a no-op.
177 ///
178 /// Returns the [`TaskId`] of the next task that was started as a side-effect
179 /// of releasing the running slot (only possible when cancelling a Running
180 /// task that had a non-empty pending queue).
181 pub fn cancel(&mut self, task_id: TaskId) -> Option<TaskId> {
182 let (status, queue_id) = match self.tasks.get(&task_id) {
183 Some(t) => (t.status.clone(), t.key.queue.clone()),
184 None => return None,
185 };
186
187 match status {
188 TaskStatus::Running => {
189 if let Some(t) = self.tasks.get_mut(&task_id) {
190 t.cancellation_token.cancel();
191 t.status = TaskStatus::Cancelled;
192 t.finished_at = Some(Instant::now());
193 }
194 self.running.remove(&queue_id);
195 self.start_next(&queue_id)
196 }
197 TaskStatus::Pending => {
198 if let Some(queue) = self.queues.get_mut(&queue_id) {
199 queue.retain(|&id| id != task_id);
200 }
201 if let Some(t) = self.tasks.get_mut(&task_id) {
202 t.status = TaskStatus::Cancelled;
203 }
204 None
205 }
206 // Already in a terminal state — nothing to do.
207 _ => None,
208 }
209 }
210
211 /// Cancel all running and queued tasks whose [`TaskKey`] equals `key`.
212 ///
213 /// Returns the [`TaskId`] of any task that started as a side-effect of
214 /// releasing a running slot.
215 pub fn cancel_by_key(&mut self, key: &TaskKey) -> Option<TaskId> {
216 let ids_to_cancel: Vec<TaskId> = self
217 .tasks
218 .values()
219 .filter(|t| {
220 &t.key == key
221 && matches!(t.status, TaskStatus::Pending | TaskStatus::Running)
222 })
223 .map(|t| t.id)
224 .collect();
225
226 let mut started = None;
227 for id in ids_to_cancel {
228 if let Some(next) = self.cancel(id) {
229 started = Some(next);
230 }
231 }
232 started
233 }
234
235 /// Transition a task from `Pending` to `Running` and record `started_at`.
236 ///
237 /// Also registers the task in the `running` map for its queue.
238 /// Panics in debug builds if the task is not in `Pending` state.
239 pub fn mark_running(&mut self, task_id: TaskId) {
240 let queue_id = match self.tasks.get_mut(&task_id) {
241 Some(t) => {
242 debug_assert_eq!(
243 t.status,
244 TaskStatus::Pending,
245 "mark_running called on task {:?} with status {:?}",
246 task_id,
247 t.status
248 );
249 t.status = TaskStatus::Running;
250 t.started_at = Some(Instant::now());
251 t.key.queue.clone()
252 }
253 None => return,
254 };
255 self.running.insert(queue_id, task_id);
256 }
257
258 /// Record a terminal status for a finished task and start the next one.
259 ///
260 /// If the task was already `Cancelled` the status is **not** overwritten —
261 /// a cancelled task stays cancelled regardless of the final process outcome.
262 ///
263 /// Returns the [`TaskId`] of the next task that was started as a
264 /// side-effect, if the queue had a pending task waiting.
265 pub fn mark_finished(&mut self, task_id: TaskId, status: TaskStatus) -> Option<TaskId> {
266 let queue_id = match self.tasks.get_mut(&task_id) {
267 Some(t) => {
268 // A cancelled task must not be re-labelled.
269 if t.status != TaskStatus::Cancelled {
270 t.status = status;
271 t.finished_at = Some(Instant::now());
272 }
273 t.key.queue.clone()
274 }
275 None => return None,
276 };
277
278 // Add to recently-finished ring (cap at 5, discard oldest).
279 if self.recent_finished.len() >= 5 {
280 self.recent_finished.pop_front();
281 }
282 self.recent_finished.push_back(task_id);
283
284 // Clear the running slot only if this task still owns it.
285 if self.running.get(&queue_id) == Some(&task_id) {
286 self.running.remove(&queue_id);
287 self.start_next(&queue_id)
288 } else {
289 None
290 }
291 }
292
293 // -----------------------------------------------------------------------
294 // Read accessors
295 // -----------------------------------------------------------------------
296
297 /// Returns an iterator over all tasks ever created (in arbitrary order).
298 pub fn all_tasks(&self) -> impl Iterator<Item = &Task> {
299 self.tasks.values()
300 }
301
302 /// Returns the total number of tasks ever created (running, pending, or finished).
303 pub fn task_count(&self) -> usize {
304 self.tasks.len()
305 }
306
307 /// Returns a reference to a task by ID, or `None` if it does not exist.
308 pub fn get(&self, id: TaskId) -> Option<&Task> {
309 self.tasks.get(&id)
310 }
311
312 /// Returns the [`TaskId`] of the currently running task on `queue`, if any.
313 pub fn running_task(&self, queue: &TaskQueueId) -> Option<TaskId> {
314 self.running.get(queue).copied()
315 }
316
317 /// Returns an iterator over all currently running tasks `(queue_id, task_id)`.
318 pub fn running_tasks(&self) -> impl Iterator<Item = (&TaskQueueId, &TaskId)> {
319 self.running.iter()
320 }
321
322 /// Returns an iterator over recently finished task IDs (newest last, max 5).
323 ///
324 /// Only tasks with a terminal status (Success / Warning / Error / Cancelled)
325 /// are included. The caller can use [`TaskRegistry::get`] to read the full task record.
326 pub fn recently_finished_tasks(&self) -> impl Iterator<Item = TaskId> + '_ {
327 // Iterate newest-first so the most recent result appears closest to the
328 // running task on the right side of the status bar.
329 self.recent_finished.iter().rev().copied()
330 }
331
332 /// Returns the ordered pending task IDs for `queue` (front = next to run).
333 pub fn pending_tasks(&self, queue: &TaskQueueId) -> &[TaskId] {
334 self.queues
335 .get(queue)
336 .map(|q| q.as_slices().0)
337 .unwrap_or(&[])
338 }
339
340 // -----------------------------------------------------------------------
341 // Private helpers
342 // -----------------------------------------------------------------------
343
344 fn alloc_id(&mut self) -> TaskId {
345 let id = TaskId(self.next_id);
346 self.next_id += 1;
347 id
348 }
349
350 /// Dequeue the next pending task for `queue` and mark it as running.
351 /// Returns `Some(TaskId)` if a task was started, `None` if the queue is
352 /// empty.
353 fn start_next(&mut self, queue: &TaskQueueId) -> Option<TaskId> {
354 let next_id = self.queues.get_mut(queue)?.pop_front()?;
355 self.mark_running(next_id);
356 Some(next_id)
357 }
358}
359
360impl Default for TaskRegistry {
361 fn default() -> Self {
362 Self::new()
363 }
364}
365
366// ---------------------------------------------------------------------------
367// Tests
368// ---------------------------------------------------------------------------
369
370#[cfg(test)]
371mod tests {
372 use super::*;
373
374 fn key(queue: &str, target: &str) -> TaskKey {
375 TaskKey {
376 queue: TaskQueueId(queue.into()),
377 target: target.into(),
378 }
379 }
380
381 fn sched(reg: &mut TaskRegistry, queue: &str, target: &str) -> TaskId {
382 reg.schedule_task(key(queue, target), TaskTrigger::Manual, format!("echo {target}"))
383 }
384
385 // 1. Schedule same (queue, target) twice → first cancelled, second is active
386 #[test]
387 fn same_key_cancels_previous() {
388 let mut reg = TaskRegistry::new();
389 let id1 = sched(&mut reg, "build", "crate:a");
390 let id2 = sched(&mut reg, "build", "crate:a");
391
392 assert_eq!(reg.get(id1).unwrap().status, TaskStatus::Cancelled);
393 assert!(reg.get(id1).unwrap().cancellation_token.is_cancelled());
394 assert_eq!(reg.get(id2).unwrap().status, TaskStatus::Running);
395 assert_eq!(reg.running_task(&TaskQueueId("build".into())), Some(id2));
396 }
397
398 // 2. Schedule A then B → A running, B queued
399 #[test]
400 fn different_targets_are_queued() {
401 let mut reg = TaskRegistry::new();
402 let id_a = sched(&mut reg, "build", "crate:a");
403 let id_b = sched(&mut reg, "build", "crate:b");
404
405 assert_eq!(reg.get(id_a).unwrap().status, TaskStatus::Running);
406 assert_eq!(reg.get(id_b).unwrap().status, TaskStatus::Pending);
407 assert_eq!(reg.running_task(&TaskQueueId("build".into())), Some(id_a));
408 assert_eq!(reg.pending_tasks(&TaskQueueId("build".into())), &[id_b]);
409 }
410
411 // 3. Schedule A, B, A again → deduplicated: only latest A remains in queue
412 #[test]
413 fn queue_compaction_deduplicates_key() {
414 let mut reg = TaskRegistry::new();
415 let id_a1 = sched(&mut reg, "build", "crate:a");
416 let id_b = sched(&mut reg, "build", "crate:b");
417 // Re-schedule A — must replace queued A, not append.
418 let id_a2 = sched(&mut reg, "build", "crate:a");
419
420 // A1 was running → cancelled; A2 queued (not running, because B is running).
421 // Actually: A1 started immediately. Then B queued. Then A2 is scheduled:
422 // cancel_by_key(A) hits A1 (Running) → cancelled + start_next → B starts.
423 // new A2 is created: B is now running → A2 enqueued.
424 assert_eq!(reg.get(id_a1).unwrap().status, TaskStatus::Cancelled);
425 assert_eq!(reg.get(id_b).unwrap().status, TaskStatus::Running);
426 assert_eq!(reg.get(id_a2).unwrap().status, TaskStatus::Pending);
427
428 // Queue must contain exactly A2 (not id_b which is running, not id_a1).
429 let pending = reg.pending_tasks(&TaskQueueId("build".into()));
430 assert_eq!(pending, &[id_a2]);
431 }
432
433 // 4. Cancel a queued task → removed from queue, status Cancelled
434 #[test]
435 fn cancel_queued_task_removes_from_queue() {
436 let mut reg = TaskRegistry::new();
437 let _id_a = sched(&mut reg, "build", "crate:a");
438 let id_b = sched(&mut reg, "build", "crate:b");
439
440 reg.cancel(id_b);
441
442 assert_eq!(reg.get(id_b).unwrap().status, TaskStatus::Cancelled);
443 assert!(reg.pending_tasks(&TaskQueueId("build".into())).is_empty());
444 }
445
446 // 5. Cancel a running task → token triggered, status Cancelled
447 #[test]
448 fn cancel_running_task_triggers_token() {
449 let mut reg = TaskRegistry::new();
450 let id = sched(&mut reg, "build", "crate:a");
451 let token = reg.get(id).unwrap().cancellation_token.clone();
452
453 reg.cancel(id);
454
455 assert_eq!(reg.get(id).unwrap().status, TaskStatus::Cancelled);
456 assert!(token.is_cancelled());
457 assert_eq!(reg.running_task(&TaskQueueId("build".into())), None);
458 }
459
460 // 6. mark_finished → next queued task starts automatically; returns next id
461 #[test]
462 fn finish_starts_next_queued_task() {
463 let mut reg = TaskRegistry::new();
464 let id_a = sched(&mut reg, "build", "crate:a");
465 let id_b = sched(&mut reg, "build", "crate:b");
466
467 let next = reg.mark_finished(id_a, TaskStatus::Success);
468
469 assert_eq!(next, Some(id_b));
470 assert_eq!(reg.get(id_a).unwrap().status, TaskStatus::Success);
471 assert_eq!(reg.get(id_b).unwrap().status, TaskStatus::Running);
472 assert_eq!(reg.running_task(&TaskQueueId("build".into())), Some(id_b));
473 }
474
475 // 7. Multiple queues operate independently
476 #[test]
477 fn independent_queues_do_not_interfere() {
478 let mut reg = TaskRegistry::new();
479 let build_id = sched(&mut reg, "build", "crate:a");
480 let lint_id = sched(&mut reg, "lint", "crate:a");
481
482 assert_eq!(reg.get(build_id).unwrap().status, TaskStatus::Running);
483 assert_eq!(reg.get(lint_id).unwrap().status, TaskStatus::Running);
484
485 reg.mark_finished(build_id, TaskStatus::Success);
486 // Lint queue should be unaffected.
487 assert_eq!(reg.get(lint_id).unwrap().status, TaskStatus::Running);
488 }
489
490 // 8. A finished cancelled task must remain Cancelled
491 #[test]
492 fn cancelled_task_stays_cancelled_on_finish() {
493 let mut reg = TaskRegistry::new();
494 let id = sched(&mut reg, "build", "crate:a");
495 reg.cancel(id);
496
497 // Simulate the async worker not noticing cancellation in time and
498 // reporting success anyway.
499 reg.mark_finished(id, TaskStatus::Success);
500
501 assert_eq!(reg.get(id).unwrap().status, TaskStatus::Cancelled);
502 }
503
504 // 9. start_next on empty queue → no-op, returns None
505 #[test]
506 fn finish_on_empty_queue_is_noop() {
507 let mut reg = TaskRegistry::new();
508 let id = sched(&mut reg, "build", "crate:a");
509 let next = reg.mark_finished(id, TaskStatus::Success);
510
511 assert_eq!(next, None);
512 assert_eq!(reg.running_task(&TaskQueueId("build".into())), None);
513 assert!(reg.pending_tasks(&TaskQueueId("build".into())).is_empty());
514 }
515
516 // Bonus: rapid rescheduling leaves no duplicate key in queue
517 #[test]
518 fn rapid_reschedule_no_duplicate_key_in_queue() {
519 let mut reg = TaskRegistry::new();
520 // Occupy the queue with a long-running anchor task.
521 let _anchor = sched(&mut reg, "build", "anchor");
522
523 // Schedule target A three times rapidly.
524 let id1 = sched(&mut reg, "build", "crate:a");
525 let id2 = sched(&mut reg, "build", "crate:a");
526 let id3 = sched(&mut reg, "build", "crate:a");
527
528 assert_eq!(reg.get(id1).unwrap().status, TaskStatus::Cancelled);
529 assert_eq!(reg.get(id2).unwrap().status, TaskStatus::Cancelled);
530 assert_eq!(reg.get(id3).unwrap().status, TaskStatus::Pending);
531
532 let pending = reg.pending_tasks(&TaskQueueId("build".into()));
533 assert_eq!(pending.len(), 1);
534 assert_eq!(pending[0], id3);
535 }
536
537 // cancel() on a running task returns the next started task
538 #[test]
539 fn cancel_running_returns_next_started() {
540 let mut reg = TaskRegistry::new();
541 let id_a = sched(&mut reg, "build", "crate:a");
542 let id_b = sched(&mut reg, "build", "crate:b");
543
544 let next = reg.cancel(id_a);
545
546 assert_eq!(next, Some(id_b));
547 assert_eq!(reg.get(id_b).unwrap().status, TaskStatus::Running);
548 }
549}