Skip to main content

oo_ide/
task_registry.rs

1//! Queue-based task scheduler for the IDE.
2//!
3//! [`TaskRegistry`] is the single source of truth for all scheduled, running,
4//! and recently finished tasks.  It enforces the following scheduling rules:
5//!
6//! 1. **One running task per queue** — a queue may have at most one task with
7//!    status [`TaskStatus::Running`] at any time.
8//! 2. **Same `(queue, target)` cancels previous** — scheduling a task whose
9//!    [`TaskKey`] matches a running or queued task cancels the earlier one first.
10//! 3. **Different targets are queued** — if another task is already running on
11//!    the same queue but with a different target, the new task is enqueued.
12//! 4. **Queue compaction** — if the same [`TaskKey`] already sits in the pending
13//!    queue, it is replaced in-place rather than appended a second time.
14//! 5. **FIFO execution** — tasks start in insertion order after deduplication.
15//!
16//! # Design constraints
17//!
18//! * **No disk I/O** — purely in-memory.
19//! * **No async** — deterministic and synchronous.  Callers are responsible for
20//!   spawning async work and calling [`TaskRegistry::mark_running`] /
21//!   [`TaskRegistry::mark_finished`] on completion.
22//! * **Single-threaded** — lives on the main thread as part of
23//!   [`crate::app_state::AppState`].
24
25use std::collections::{HashMap, VecDeque};
26use std::time::Instant;
27
28use tokio_util::sync::CancellationToken;
29
30// ---------------------------------------------------------------------------
31// Public types
32// ---------------------------------------------------------------------------
33
34/// Opaque, monotonically-increasing task identifier.  Never reused within a
35/// single registry lifetime.
36#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
37pub struct TaskId(pub u64);
38
39/// Identifies a named scheduling queue (e.g. `"build"`, `"lint"`, `"test"`).
40#[derive(Clone, Debug, PartialEq, Eq, Hash)]
41pub struct TaskQueueId(pub String);
42
43/// The compound identity of a task for deduplication and cancellation purposes.
44///
45/// Two tasks with equal `TaskKey` values are considered interchangeable; the
46/// registry ensures that at most one such task is pending or running at any
47/// time.
48#[derive(Clone, Debug, PartialEq, Eq, Hash)]
49pub struct TaskKey {
50    pub queue: TaskQueueId,
51    /// Arbitrary string that scopes the task within its queue — e.g. a crate
52    /// path, a file path, or `"*"` for queue-wide singletons.
53    pub target: String,
54}
55
56/// Lifecycle state of a task.
57#[derive(Clone, Debug, PartialEq, Eq)]
58pub enum TaskStatus {
59    /// Created and waiting for its turn in the queue.
60    Pending,
61    /// Currently executing.
62    Running,
63    /// Finished successfully.
64    Success,
65    /// Finished with warnings but no hard errors.
66    Warning,
67    /// Finished with at least one error.
68    Error,
69    /// Cancelled before or during execution.
70    Cancelled,
71}
72
73/// What initiated the task.
74#[derive(Clone, Debug)]
75pub enum TaskTrigger {
76    Manual,
77    OnSave,
78    OnFileChange,
79    Extension(String),
80}
81
82/// A single scheduled, running, or finished task.
83pub struct Task {
84    pub id: TaskId,
85    pub key: TaskKey,
86    pub status: TaskStatus,
87    /// The shell command string to execute (e.g. `"cargo build -p my-crate"`).
88    pub command: String,
89    pub created_at: Instant,
90    pub started_at: Option<Instant>,
91    pub finished_at: Option<Instant>,
92    /// Signal used to stop the async work for this task.  Callers poll
93    /// `cancellation_token.is_cancelled()` (or `await` it) in their async
94    /// work loop.
95    pub cancellation_token: CancellationToken,
96    pub trigger: TaskTrigger,
97}
98
99// ---------------------------------------------------------------------------
100// Registry
101// ---------------------------------------------------------------------------
102
103/// Central scheduler and store for all IDE tasks.
104pub struct TaskRegistry {
105    /// All tasks ever created in this session, keyed by [`TaskId`].
106    tasks: HashMap<TaskId, Task>,
107    /// The task currently running on each queue.
108    running: HashMap<TaskQueueId, TaskId>,
109    /// FIFO pending queue per [`TaskQueueId`].
110    queues: HashMap<TaskQueueId, VecDeque<TaskId>>,
111    next_id: u64,
112    /// Ring buffer of recently finished task IDs (max 5), newest at back.
113    recent_finished: VecDeque<TaskId>,
114}
115
116impl TaskRegistry {
117    pub fn new() -> Self {
118        Self {
119            tasks: HashMap::new(),
120            running: HashMap::new(),
121            queues: HashMap::new(),
122            next_id: 1,
123            recent_finished: VecDeque::with_capacity(5),
124        }
125    }
126
127    // -----------------------------------------------------------------------
128    // Public API
129    // -----------------------------------------------------------------------
130
131    /// Schedule a new task on `key.queue`.
132    ///
133    /// Any existing running or queued task with the **same** [`TaskKey`] is
134    /// cancelled first (Rule 2 / Rule 4).  If no task is currently running on
135    /// the queue the new task starts immediately (status → `Running`); otherwise
136    /// it is appended to the FIFO queue (status → `Pending`).
137    ///
138    /// Returns the [`TaskId`] of the newly created task.
139    pub fn schedule_task(&mut self, key: TaskKey, trigger: TaskTrigger, command: String) -> TaskId {
140        // Cancel any existing task with the same key (running or queued).
141        self.cancel_by_key(&key);
142
143        let id = self.alloc_id();
144        let task = Task {
145            id,
146            key: key.clone(),
147            status: TaskStatus::Pending,
148            command,
149            created_at: Instant::now(),
150            started_at: None,
151            finished_at: None,
152            cancellation_token: CancellationToken::new(),
153            trigger,
154        };
155        self.tasks.insert(id, task);
156
157        let queue_id = &key.queue;
158        if self.running.contains_key(queue_id) {
159            // Another task is running on this queue — enqueue.
160            self.queues.entry(queue_id.clone()).or_default().push_back(id);
161        } else {
162            // Queue is idle — start immediately.
163            self.mark_running(id);
164        }
165
166        id
167    }
168
169    /// Cancel the task identified by `task_id`.
170    ///
171    /// * If the task is **Running** its [`CancellationToken`] is triggered and
172    ///   `status` is set to [`TaskStatus::Cancelled`].  The `running` slot is
173    ///   cleared and `start_next` is called for the queue.
174    /// * If the task is **Pending** it is removed from the queue and its status
175    ///   is set to [`TaskStatus::Cancelled`].
176    /// * If the task is already finished or cancelled this is a no-op.
177    ///
178    /// Returns the [`TaskId`] of the next task that was started as a side-effect
179    /// of releasing the running slot (only possible when cancelling a Running
180    /// task that had a non-empty pending queue).
181    pub fn cancel(&mut self, task_id: TaskId) -> Option<TaskId> {
182        let (status, queue_id) = match self.tasks.get(&task_id) {
183            Some(t) => (t.status.clone(), t.key.queue.clone()),
184            None => return None,
185        };
186
187        match status {
188            TaskStatus::Running => {
189                if let Some(t) = self.tasks.get_mut(&task_id) {
190                    t.cancellation_token.cancel();
191                    t.status = TaskStatus::Cancelled;
192                    t.finished_at = Some(Instant::now());
193                }
194                self.running.remove(&queue_id);
195                self.start_next(&queue_id)
196            }
197            TaskStatus::Pending => {
198                if let Some(queue) = self.queues.get_mut(&queue_id) {
199                    queue.retain(|&id| id != task_id);
200                }
201                if let Some(t) = self.tasks.get_mut(&task_id) {
202                    t.status = TaskStatus::Cancelled;
203                }
204                None
205            }
206            // Already in a terminal state — nothing to do.
207            _ => None,
208        }
209    }
210
211    /// Cancel all running and queued tasks whose [`TaskKey`] equals `key`.
212    ///
213    /// Returns the [`TaskId`] of any task that started as a side-effect of
214    /// releasing a running slot.
215    pub fn cancel_by_key(&mut self, key: &TaskKey) -> Option<TaskId> {
216        let ids_to_cancel: Vec<TaskId> = self
217            .tasks
218            .values()
219            .filter(|t| {
220                &t.key == key
221                    && matches!(t.status, TaskStatus::Pending | TaskStatus::Running)
222            })
223            .map(|t| t.id)
224            .collect();
225
226        let mut started = None;
227        for id in ids_to_cancel {
228            if let Some(next) = self.cancel(id) {
229                started = Some(next);
230            }
231        }
232        started
233    }
234
235    /// Transition a task from `Pending` to `Running` and record `started_at`.
236    ///
237    /// Also registers the task in the `running` map for its queue.
238    /// Panics in debug builds if the task is not in `Pending` state.
239    pub fn mark_running(&mut self, task_id: TaskId) {
240        let queue_id = match self.tasks.get_mut(&task_id) {
241            Some(t) => {
242                debug_assert_eq!(
243                    t.status,
244                    TaskStatus::Pending,
245                    "mark_running called on task {:?} with status {:?}",
246                    task_id,
247                    t.status
248                );
249                t.status = TaskStatus::Running;
250                t.started_at = Some(Instant::now());
251                t.key.queue.clone()
252            }
253            None => return,
254        };
255        self.running.insert(queue_id, task_id);
256    }
257
258    /// Record a terminal status for a finished task and start the next one.
259    ///
260    /// If the task was already `Cancelled` the status is **not** overwritten —
261    /// a cancelled task stays cancelled regardless of the final process outcome.
262    ///
263    /// Returns the [`TaskId`] of the next task that was started as a
264    /// side-effect, if the queue had a pending task waiting.
265    pub fn mark_finished(&mut self, task_id: TaskId, status: TaskStatus) -> Option<TaskId> {
266        let queue_id = match self.tasks.get_mut(&task_id) {
267            Some(t) => {
268                // A cancelled task must not be re-labelled.
269                if t.status != TaskStatus::Cancelled {
270                    t.status = status;
271                    t.finished_at = Some(Instant::now());
272                }
273                t.key.queue.clone()
274            }
275            None => return None,
276        };
277
278        // Add to recently-finished ring (cap at 5, discard oldest).
279        if self.recent_finished.len() >= 5 {
280            self.recent_finished.pop_front();
281        }
282        self.recent_finished.push_back(task_id);
283
284        // Clear the running slot only if this task still owns it.
285        if self.running.get(&queue_id) == Some(&task_id) {
286            self.running.remove(&queue_id);
287            self.start_next(&queue_id)
288        } else {
289            None
290        }
291    }
292
293    // -----------------------------------------------------------------------
294    // Read accessors
295    // -----------------------------------------------------------------------
296
297    /// Returns an iterator over all tasks ever created (in arbitrary order).
298    pub fn all_tasks(&self) -> impl Iterator<Item = &Task> {
299        self.tasks.values()
300    }
301
302    /// Returns the total number of tasks ever created (running, pending, or finished).
303    pub fn task_count(&self) -> usize {
304        self.tasks.len()
305    }
306
307    /// Returns a reference to a task by ID, or `None` if it does not exist.
308    pub fn get(&self, id: TaskId) -> Option<&Task> {
309        self.tasks.get(&id)
310    }
311
312    /// Returns the [`TaskId`] of the currently running task on `queue`, if any.
313    pub fn running_task(&self, queue: &TaskQueueId) -> Option<TaskId> {
314        self.running.get(queue).copied()
315    }
316
317    /// Returns an iterator over all currently running tasks `(queue_id, task_id)`.
318    pub fn running_tasks(&self) -> impl Iterator<Item = (&TaskQueueId, &TaskId)> {
319        self.running.iter()
320    }
321
322    /// Returns an iterator over recently finished task IDs (newest last, max 5).
323    ///
324    /// Only tasks with a terminal status (Success / Warning / Error / Cancelled)
325    /// are included.  The caller can use [`TaskRegistry::get`] to read the full task record.
326    pub fn recently_finished_tasks(&self) -> impl Iterator<Item = TaskId> + '_ {
327        // Iterate newest-first so the most recent result appears closest to the
328        // running task on the right side of the status bar.
329        self.recent_finished.iter().rev().copied()
330    }
331
332    /// Returns the ordered pending task IDs for `queue` (front = next to run).
333    pub fn pending_tasks(&self, queue: &TaskQueueId) -> &[TaskId] {
334        self.queues
335            .get(queue)
336            .map(|q| q.as_slices().0)
337            .unwrap_or(&[])
338    }
339
340    // -----------------------------------------------------------------------
341    // Private helpers
342    // -----------------------------------------------------------------------
343
344    fn alloc_id(&mut self) -> TaskId {
345        let id = TaskId(self.next_id);
346        self.next_id += 1;
347        id
348    }
349
350    /// Dequeue the next pending task for `queue` and mark it as running.
351    /// Returns `Some(TaskId)` if a task was started, `None` if the queue is
352    /// empty.
353    fn start_next(&mut self, queue: &TaskQueueId) -> Option<TaskId> {
354        let next_id = self.queues.get_mut(queue)?.pop_front()?;
355        self.mark_running(next_id);
356        Some(next_id)
357    }
358}
359
360impl Default for TaskRegistry {
361    fn default() -> Self {
362        Self::new()
363    }
364}
365
366// ---------------------------------------------------------------------------
367// Tests
368// ---------------------------------------------------------------------------
369
370#[cfg(test)]
371mod tests {
372    use super::*;
373
374    fn key(queue: &str, target: &str) -> TaskKey {
375        TaskKey {
376            queue: TaskQueueId(queue.into()),
377            target: target.into(),
378        }
379    }
380
381    fn sched(reg: &mut TaskRegistry, queue: &str, target: &str) -> TaskId {
382        reg.schedule_task(key(queue, target), TaskTrigger::Manual, format!("echo {target}"))
383    }
384
385    // 1. Schedule same (queue, target) twice → first cancelled, second is active
386    #[test]
387    fn same_key_cancels_previous() {
388        let mut reg = TaskRegistry::new();
389        let id1 = sched(&mut reg, "build", "crate:a");
390        let id2 = sched(&mut reg, "build", "crate:a");
391
392        assert_eq!(reg.get(id1).unwrap().status, TaskStatus::Cancelled);
393        assert!(reg.get(id1).unwrap().cancellation_token.is_cancelled());
394        assert_eq!(reg.get(id2).unwrap().status, TaskStatus::Running);
395        assert_eq!(reg.running_task(&TaskQueueId("build".into())), Some(id2));
396    }
397
398    // 2. Schedule A then B → A running, B queued
399    #[test]
400    fn different_targets_are_queued() {
401        let mut reg = TaskRegistry::new();
402        let id_a = sched(&mut reg, "build", "crate:a");
403        let id_b = sched(&mut reg, "build", "crate:b");
404
405        assert_eq!(reg.get(id_a).unwrap().status, TaskStatus::Running);
406        assert_eq!(reg.get(id_b).unwrap().status, TaskStatus::Pending);
407        assert_eq!(reg.running_task(&TaskQueueId("build".into())), Some(id_a));
408        assert_eq!(reg.pending_tasks(&TaskQueueId("build".into())), &[id_b]);
409    }
410
411    // 3. Schedule A, B, A again → deduplicated: only latest A remains in queue
412    #[test]
413    fn queue_compaction_deduplicates_key() {
414        let mut reg = TaskRegistry::new();
415        let id_a1 = sched(&mut reg, "build", "crate:a");
416        let id_b = sched(&mut reg, "build", "crate:b");
417        // Re-schedule A — must replace queued A, not append.
418        let id_a2 = sched(&mut reg, "build", "crate:a");
419
420        // A1 was running → cancelled; A2 queued (not running, because B is running).
421        // Actually: A1 started immediately. Then B queued. Then A2 is scheduled:
422        //   cancel_by_key(A) hits A1 (Running) → cancelled + start_next → B starts.
423        //   new A2 is created: B is now running → A2 enqueued.
424        assert_eq!(reg.get(id_a1).unwrap().status, TaskStatus::Cancelled);
425        assert_eq!(reg.get(id_b).unwrap().status, TaskStatus::Running);
426        assert_eq!(reg.get(id_a2).unwrap().status, TaskStatus::Pending);
427
428        // Queue must contain exactly A2 (not id_b which is running, not id_a1).
429        let pending = reg.pending_tasks(&TaskQueueId("build".into()));
430        assert_eq!(pending, &[id_a2]);
431    }
432
433    // 4. Cancel a queued task → removed from queue, status Cancelled
434    #[test]
435    fn cancel_queued_task_removes_from_queue() {
436        let mut reg = TaskRegistry::new();
437        let _id_a = sched(&mut reg, "build", "crate:a");
438        let id_b = sched(&mut reg, "build", "crate:b");
439
440        reg.cancel(id_b);
441
442        assert_eq!(reg.get(id_b).unwrap().status, TaskStatus::Cancelled);
443        assert!(reg.pending_tasks(&TaskQueueId("build".into())).is_empty());
444    }
445
446    // 5. Cancel a running task → token triggered, status Cancelled
447    #[test]
448    fn cancel_running_task_triggers_token() {
449        let mut reg = TaskRegistry::new();
450        let id = sched(&mut reg, "build", "crate:a");
451        let token = reg.get(id).unwrap().cancellation_token.clone();
452
453        reg.cancel(id);
454
455        assert_eq!(reg.get(id).unwrap().status, TaskStatus::Cancelled);
456        assert!(token.is_cancelled());
457        assert_eq!(reg.running_task(&TaskQueueId("build".into())), None);
458    }
459
460    // 6. mark_finished → next queued task starts automatically; returns next id
461    #[test]
462    fn finish_starts_next_queued_task() {
463        let mut reg = TaskRegistry::new();
464        let id_a = sched(&mut reg, "build", "crate:a");
465        let id_b = sched(&mut reg, "build", "crate:b");
466
467        let next = reg.mark_finished(id_a, TaskStatus::Success);
468
469        assert_eq!(next, Some(id_b));
470        assert_eq!(reg.get(id_a).unwrap().status, TaskStatus::Success);
471        assert_eq!(reg.get(id_b).unwrap().status, TaskStatus::Running);
472        assert_eq!(reg.running_task(&TaskQueueId("build".into())), Some(id_b));
473    }
474
475    // 7. Multiple queues operate independently
476    #[test]
477    fn independent_queues_do_not_interfere() {
478        let mut reg = TaskRegistry::new();
479        let build_id = sched(&mut reg, "build", "crate:a");
480        let lint_id = sched(&mut reg, "lint", "crate:a");
481
482        assert_eq!(reg.get(build_id).unwrap().status, TaskStatus::Running);
483        assert_eq!(reg.get(lint_id).unwrap().status, TaskStatus::Running);
484
485        reg.mark_finished(build_id, TaskStatus::Success);
486        // Lint queue should be unaffected.
487        assert_eq!(reg.get(lint_id).unwrap().status, TaskStatus::Running);
488    }
489
490    // 8. A finished cancelled task must remain Cancelled
491    #[test]
492    fn cancelled_task_stays_cancelled_on_finish() {
493        let mut reg = TaskRegistry::new();
494        let id = sched(&mut reg, "build", "crate:a");
495        reg.cancel(id);
496
497        // Simulate the async worker not noticing cancellation in time and
498        // reporting success anyway.
499        reg.mark_finished(id, TaskStatus::Success);
500
501        assert_eq!(reg.get(id).unwrap().status, TaskStatus::Cancelled);
502    }
503
504    // 9. start_next on empty queue → no-op, returns None
505    #[test]
506    fn finish_on_empty_queue_is_noop() {
507        let mut reg = TaskRegistry::new();
508        let id = sched(&mut reg, "build", "crate:a");
509        let next = reg.mark_finished(id, TaskStatus::Success);
510
511        assert_eq!(next, None);
512        assert_eq!(reg.running_task(&TaskQueueId("build".into())), None);
513        assert!(reg.pending_tasks(&TaskQueueId("build".into())).is_empty());
514    }
515
516    // Bonus: rapid rescheduling leaves no duplicate key in queue
517    #[test]
518    fn rapid_reschedule_no_duplicate_key_in_queue() {
519        let mut reg = TaskRegistry::new();
520        // Occupy the queue with a long-running anchor task.
521        let _anchor = sched(&mut reg, "build", "anchor");
522
523        // Schedule target A three times rapidly.
524        let id1 = sched(&mut reg, "build", "crate:a");
525        let id2 = sched(&mut reg, "build", "crate:a");
526        let id3 = sched(&mut reg, "build", "crate:a");
527
528        assert_eq!(reg.get(id1).unwrap().status, TaskStatus::Cancelled);
529        assert_eq!(reg.get(id2).unwrap().status, TaskStatus::Cancelled);
530        assert_eq!(reg.get(id3).unwrap().status, TaskStatus::Pending);
531
532        let pending = reg.pending_tasks(&TaskQueueId("build".into()));
533        assert_eq!(pending.len(), 1);
534        assert_eq!(pending[0], id3);
535    }
536
537    // cancel() on a running task returns the next started task
538    #[test]
539    fn cancel_running_returns_next_started() {
540        let mut reg = TaskRegistry::new();
541        let id_a = sched(&mut reg, "build", "crate:a");
542        let id_b = sched(&mut reg, "build", "crate:b");
543
544        let next = reg.cancel(id_a);
545
546        assert_eq!(next, Some(id_b));
547        assert_eq!(reg.get(id_b).unwrap().status, TaskStatus::Running);
548    }
549}