// oo_ide/task_executor.rs
1//! Async task executor with incremental output streaming.
2//!
3//! [`TaskExecutor`] bridges [`crate::task_registry::TaskRegistry`] and the OS
4//! process layer.  When the registry marks a task as running, the caller
5//! invokes [`TaskExecutor::spawn`] to start the actual process asynchronously.
6//!
7//! # Output fan-out
8//!
9//! Raw output bytes are broadcast on a [`tokio::sync::broadcast`] channel so
10//! that multiple independent consumers (future: log storage, diagnostic
11//! parser, UI render) can each subscribe without blocking one another.  Use
12//! [`TaskExecutor::subscribe`] to obtain a receiver.
13//!
14//! # Cancellation
15//!
16//! The `CancellationToken` stored in each `crate::task_registry::Task` is monitored inside the
17//! spawned async task. When it fires, the child process is killed and the
18//! task is reported back as `TaskStatus::Cancelled`.
19//!
20//! # Integration
21//!
22//! The executor sends [`Operation::TaskFinished`] (or [`Operation::TaskStarted`])
23//! back to the main event loop via the `op_tx` channel so that
24//! `apply_operation` can call [`crate::task_registry::TaskRegistry::mark_finished`] on the main
25//! thread and keep the registry consistent.
26
27use std::collections::HashMap;
28use std::sync::Arc;
29
30use tokio::io::AsyncReadExt as _;
31use tokio::process::Command;
32use tokio::sync::broadcast;
33use tokio::task::JoinHandle;
34use tokio_util::sync::CancellationToken;
35
36use crate::diagnostics_extractor::DiagnosticsExtractor;
37use crate::log_matcher::{MatcherEngine, MatcherRegistry};
38use crate::log_store::LogStore;
39use crate::operation::Operation;
40use crate::task_registry::{TaskId, TaskStatus};
41use crate::vt_parser::{StyledLine, TerminalParser};
42
43// ---------------------------------------------------------------------------
44// Public types
45// ---------------------------------------------------------------------------
46
/// Which stdio stream an output chunk originated from.
///
/// A plain C-like enum: `Copy` and `Hash` are derived so the variant can be
/// passed by value and used as a map/set key without cloning.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum OutputStream {
    /// The child process's standard-output pipe.
    Stdout,
    /// The child process's standard-error pipe.
    Stderr,
}
53
/// A raw byte chunk read from a task's stdout or stderr.
///
/// Retained for potential future consumers that need raw bytes (e.g. a
/// protocol bridge).  The primary pipeline uses [`ParsedOutputEvent`] instead.
#[derive(Clone, Debug)]
pub struct TaskOutputEvent {
    /// Registry ID of the task that produced this chunk.
    pub task_id: TaskId,
    /// Which stdio pipe the bytes came from.
    pub stream: OutputStream,
    /// Raw bytes — may contain partial UTF-8 sequences or ANSI escape codes.
    pub chunk: Vec<u8>,
}
65
/// A parsed, styled output event produced after running bytes through
/// [`TerminalParser`].
#[derive(Clone, Debug)]
pub struct ParsedOutputEvent {
    /// Registry ID of the task that produced the output.
    pub task_id: TaskId,
    /// Which stdio pipe the lines came from.
    pub stream: OutputStream,
    /// Completed styled lines decoded from the raw byte stream.
    pub lines: Vec<StyledLine>,
}
75
/// High-level event broadcast to all subscribers.
///
/// Each receiver gets its own clone of the event (broadcast-channel
/// semantics), so payloads are kept to IDs, statuses, and batched lines.
#[derive(Clone, Debug)]
pub enum TaskEvent {
    /// The executor started working on this task (process about to be spawned).
    Started(TaskId),
    /// One or more styled lines of output arrived from the running process.
    ParsedOutput(ParsedOutputEvent),
    /// The task reached a terminal state.
    Finished(TaskId, TaskStatus),
}
86
87// ---------------------------------------------------------------------------
88// TaskExecutor
89// ---------------------------------------------------------------------------
90
/// Spawns tasks as OS processes and streams their output over a broadcast
/// channel.
///
/// Lives on `AppState` on the main thread; all async work is delegated to
/// Tokio task handles stored internally.
pub struct TaskExecutor {
    /// Sender half of the broadcast channel; every [`subscribe`](Self::subscribe)
    /// call hands out a receiver attached to this sender.
    event_tx: broadcast::Sender<TaskEvent>,
    /// Active async handles — used to abort tasks on cancellation.
    handles: HashMap<TaskId, JoinHandle<()>>,
    /// Snapshot provider for matchers – owned Arc to the central registry.
    matcher_registry: std::sync::Arc<MatcherRegistry>,
}
103
impl TaskExecutor {
    /// Capacity of the broadcast ring buffer.  Lagging receivers will miss
    /// older output chunks but the executor itself never blocks.
    const BROADCAST_CAPACITY: usize = 256;

    /// Create an executor with its own fresh, empty [`MatcherRegistry`].
    pub fn new() -> Self {
        let (event_tx, _) = broadcast::channel(Self::BROADCAST_CAPACITY);
        Self {
            event_tx,
            handles: HashMap::new(),
            matcher_registry: std::sync::Arc::new(MatcherRegistry::new()),
        }
    }

    /// Create a TaskExecutor that uses the given MatcherRegistry snapshot
    /// provider.  This is used by AppState to ensure spawned tasks use the
    /// central ExtensionManager matcher set.
    pub fn with_matcher_registry(matcher_registry: std::sync::Arc<MatcherRegistry>) -> Self {
        let (event_tx, _) = broadcast::channel(Self::BROADCAST_CAPACITY);
        Self {
            event_tx,
            handles: HashMap::new(),
            matcher_registry,
        }
    }

    /// Subscribe to the live event stream.
    ///
    /// Receivers are independent; a slow receiver simply misses old messages
    /// once the ring buffer wraps (broadcast semantics).
    pub fn subscribe(&self) -> broadcast::Receiver<TaskEvent> {
        self.event_tx.subscribe()
    }

    /// Start executing `command` in a background Tokio task.
    ///
    /// * Broadcasts [`TaskEvent::Started`] immediately.
    /// * Streams stdout and stderr as [`TaskEvent::ParsedOutput`] events,
    ///   with raw bytes parsed through per-stream [`TerminalParser`] instances.
    /// * Runs a [`DiagnosticsExtractor`] (shared between both streams) and
    ///   sends [`Operation::AddIssue`] for each matched diagnostic line.
    /// * Runs a [`MatcherEngine`] (separate instance per stream) for each set
    ///   of extension-defined log matchers; emits [`Operation::AddIssue`] for
    ///   every completed block and on end-of-stream flush.
    /// * Watches `cancellation_token`; kills the process if it fires.
    /// * Sends [`Operation::TaskFinished`] (or Cancelled) back through `op_tx`
    ///   so the main loop can update the registry.
    ///
    /// `marker` is the issue registry marker used for all extracted issues
    /// (typically `"task:{queue}:{target}"`).  The caller is responsible for
    /// clearing stale issues with this marker before calling `spawn`.
    ///
    /// `matchers` is the set of compiled log matchers contributed by loaded
    /// extensions.  Pass an empty `Vec` when no extensions are active.
    ///
    /// If `log_store` is provided, a dedicated log-writing sub-task subscribes
    /// to the broadcast channel and writes each line to
    /// `.oo/cache/tasks/<task_id>.log`, then compresses the file on completion.
    #[allow(clippy::too_many_arguments)]
    pub fn spawn(
        &mut self,
        task_id: TaskId,
        command: String,
        cancellation_token: CancellationToken,
        marker: String,
        log_store: Option<LogStore>,
        op_tx: &tokio::sync::mpsc::UnboundedSender<Vec<Operation>>,
    ) {
        // Abort any existing handle for this task ID (defensive — should be
        // a no-op in normal operation because schedule_task cancelled it first).
        if let Some(old) = self.handles.remove(&task_id) {
            old.abort();
        }

        let event_tx = self.event_tx.clone();
        let op_tx = op_tx.clone();

        // Subscribe to the broadcast channel BEFORE spawning the async task so
        // that the log writer's receiver is set up before any events are sent.
        // Broadcast ring-buffers up to BROADCAST_CAPACITY messages for us.
        let log_rx = log_store.as_ref().map(|_| event_tx.subscribe());

        // Build a shared diagnostics extractor for this task execution.
        // The source tag is derived from the marker (best-effort):
        // "task:build:app" yields "build"; anything else falls back to "task".
        let source = marker
            .strip_prefix("task:")
            .and_then(|s| s.split(':').next())
            .unwrap_or("task")
            .to_string();
        let extractor = Arc::new(DiagnosticsExtractor::new(marker.clone(), source));

        // Obtain a snapshot of active matchers from the central registry and
        // build per-stream MatcherEngine instances.  Clone is O(1) per matcher
        // because regex fields are Arc-wrapped.
        let matchers = self.matcher_registry.iter_active();
        let stdout_engine = if matchers.is_empty() {
            None
        } else {
            Some(MatcherEngine::new(matchers.clone(), marker.clone()))
        };
        let stderr_engine = if matchers.is_empty() {
            None
        } else {
            Some(MatcherEngine::new(matchers, marker))
        };

        let handle = tokio::spawn(async move {
            // Spawn the log-writing task BEFORE broadcasting Started so it is
            // ready to receive every subsequent event.
            if let (Some(ls), Some(mut rx)) = (log_store, log_rx) {
                tokio::spawn(async move {
                    let mut writer = match ls.open_writer(task_id).await {
                        Ok(w) => w,
                        Err(e) => {
                            // Cannot log without a writer; bail out quietly —
                            // task execution itself is unaffected.
                            log::warn!(
                                "log_store: failed to open writer for task {}: {e}",
                                task_id.0
                            );
                            return;
                        }
                    };

                    // Tracks whether we saw Finished for this task (compress)
                    // versus losing the channel (keep partial log as-is).
                    let mut finished_normally = false;
                    loop {
                        match rx.recv().await {
                            Ok(TaskEvent::ParsedOutput(ev)) if ev.task_id == task_id => {
                                for line in &ev.lines {
                                    if let Err(e) = writer.append_line(&line.text).await {
                                        log::warn!("log_store: write error: {e}");
                                    }
                                }
                            }
                            Ok(TaskEvent::Finished(id, _)) if id == task_id => {
                                finished_normally = true;
                                break;
                            }
                            Err(broadcast::error::RecvError::Closed) => break,
                            Err(broadcast::error::RecvError::Lagged(n)) => {
                                // Ring buffer wrapped: those lines are lost from
                                // the log file; warn so the gap is explainable.
                                log::warn!("log_store: receiver lagged by {n} messages");
                            }
                            // Events for other tasks share the channel — skip.
                            _ => {}
                        }
                    }

                    if finished_normally {
                        if let Err(e) = writer.close_and_compress(&ls).await {
                            log::warn!(
                                "log_store: compress error for task {}: {e}",
                                task_id.0
                            );
                            // Attempt plain close as a fallback.
                            let _ = ls.log_path(task_id); // path still exists
                        }
                    } else {
                        // Cancelled or channel closed — keep partial log uncompressed.
                        if let Err(e) = writer.close().await {
                            log::warn!(
                                "log_store: close error for task {}: {e}",
                                task_id.0
                            );
                        }
                    }
                });
            }

            // Signal that we are starting.
            let _ = event_tx.send(TaskEvent::Started(task_id));
            let _ = op_tx.send(vec![Operation::TaskStarted(task_id)]);

            // Spawn the OS process.
            let child_result = build_command(&command)
                .stdout(std::process::Stdio::piped())
                .stderr(std::process::Stdio::piped())
                // Ensure the process is killed if this future is dropped or
                // aborted before `child.wait()` is reached.
                .kill_on_drop(true)
                .spawn();

            let mut child = match child_result {
                Ok(c) => c,
                Err(_) => {
                    // Spawn failed (e.g. shell binary missing) — report Error
                    // on both channels and stop here.
                    let _ = event_tx.send(TaskEvent::Finished(task_id, TaskStatus::Error));
                    let _ = op_tx.send(vec![Operation::TaskFinished {
                        id: task_id,
                        status: TaskStatus::Error,
                    }]);
                    return;
                }
            };

            // Take the stdio handles before `child` is borrowed by `wait`.
            let stdout = child.stdout.take().expect("stdout was piped");
            let stderr = child.stderr.take().expect("stderr was piped");

            // Each stream gets its own stateful VT100 parser and MatcherEngine;
            // they run concurrently in separate sub-tasks and flush on stream close.
            let stdout_tx = event_tx.clone();
            let stdout_op_tx = op_tx.clone();
            let stdout_extractor = Arc::clone(&extractor);
            let stdout_task = tokio::spawn(async move {
                stream_output_parsed(
                    task_id,
                    OutputStream::Stdout,
                    stdout,
                    TerminalParser::new(),
                    &stdout_extractor,
                    stdout_engine,
                    &stdout_tx,
                    &stdout_op_tx,
                )
                .await;
            });

            let stderr_tx = event_tx.clone();
            let stderr_op_tx = op_tx.clone();
            let stderr_extractor = Arc::clone(&extractor);
            let stderr_task = tokio::spawn(async move {
                stream_output_parsed(
                    task_id,
                    OutputStream::Stderr,
                    stderr,
                    TerminalParser::new(),
                    &stderr_extractor,
                    stderr_engine,
                    &stderr_tx,
                    &stderr_op_tx,
                )
                .await;
            });

            // Wait for either the process to exit or cancellation.
            let final_status = tokio::select! {
                _ = cancellation_token.cancelled() => {
                    // Kill the process and abort the streaming tasks immediately.
                    // On Windows, killing the shell (`cmd.exe`) may leave grandchild
                    // processes alive that still hold the pipe handles open; aborting
                    // the streaming tasks avoids an indefinite pipe-drain hang.
                    let _ = child.kill().await;
                    stdout_task.abort();
                    stderr_task.abort();
                    TaskStatus::Cancelled
                }
                result = child.wait() => {
                    // Process exited naturally — drain all remaining output first.
                    let _ = tokio::join!(stdout_task, stderr_task);
                    match result {
                        Ok(exit) if exit.success() => TaskStatus::Success,
                        _ => TaskStatus::Error,
                    }
                }
            };

            let _ = event_tx.send(TaskEvent::Finished(task_id, final_status.clone()));
            let _ = op_tx.send(vec![Operation::TaskFinished {
                id: task_id,
                status: final_status,
            }]);
        });

        self.handles.insert(task_id, handle);
    }

    /// Abort the async task for `task_id`, if one is running.
    ///
    /// This does *not* update the registry — the caller is responsible for
    /// calling [`crate::task_registry::TaskRegistry::cancel`] before or after this.
    pub fn abort(&mut self, task_id: TaskId) {
        if let Some(handle) = self.handles.remove(&task_id) {
            handle.abort();
        }
    }
}
376
377impl Default for TaskExecutor {
378    fn default() -> Self {
379        Self::new()
380    }
381}
382
383// ---------------------------------------------------------------------------
384// Helpers
385// ---------------------------------------------------------------------------
386
/// Read bytes from `reader`, parse them through `parser`, and broadcast
/// completed [`StyledLine`]s as [`TaskEvent::ParsedOutput`] events until the
/// stream closes.
///
/// Each completed line is also run through `extractor` and (if provided)
/// `engine`; any resulting [`crate::issue_registry::NewIssue`]s are sent back
/// via `op_tx` as [`Operation::AddIssue`] operations so the registry is
/// updated on the main thread.
///
/// When the stream reaches EOF:
/// * The parser is flushed so that any partial line without a trailing `\n`
///   is also broadcast and extracted.
/// * The engine (if any) is flushed so that any pending multi-line block is
///   emitted as an issue.
///
/// On cancellation the parent task aborts this sub-task; no flush occurs,
/// which is intentional — partial output from a cancelled task is discarded.
#[allow(clippy::too_many_arguments)]
async fn stream_output_parsed(
    task_id: TaskId,
    stream: OutputStream,
    mut reader: impl tokio::io::AsyncRead + Unpin,
    mut parser: TerminalParser,
    extractor: &DiagnosticsExtractor,
    mut engine: Option<MatcherEngine>,
    tx: &broadcast::Sender<TaskEvent>,
    op_tx: &tokio::sync::mpsc::UnboundedSender<Vec<Operation>>,
) {
    let mut buf = vec![0u8; 4096];
    loop {
        match reader.read(&mut buf).await {
            // `Ok(0)` is EOF; a read error also ends streaming for this pipe.
            Ok(0) | Err(_) => break,
            Ok(n) => {
                // Parser is stateful: it buffers partial lines/escape
                // sequences across reads and returns only completed lines.
                let lines = parser.push(&buf[..n]);
                broadcast_and_extract(task_id, &stream, &lines, extractor, engine.as_mut(), tx, op_tx);
            }
        }
    }
    // Flush any partial line that had no trailing newline.
    if let Some(line) = parser.flush() {
        broadcast_and_extract(task_id, &stream, &[line], extractor, engine.as_mut(), tx, op_tx);
    }
    // Flush any pending engine block (end-of-stream).
    if let Some(eng) = engine.as_mut() {
        let issues = eng.flush();
        send_issues(issues, op_tx);
    }
}
435
436/// Broadcast `lines` as a `ParsedOutput` event, run the extractor and engine
437/// on each line, sending any resulting `AddIssue` operations through `op_tx`.
438fn broadcast_and_extract(
439    task_id: TaskId,
440    stream: &OutputStream,
441    lines: &[StyledLine],
442    extractor: &DiagnosticsExtractor,
443    engine: Option<&mut MatcherEngine>,
444    tx: &broadcast::Sender<TaskEvent>,
445    op_tx: &tokio::sync::mpsc::UnboundedSender<Vec<Operation>>,
446) {
447    if lines.is_empty() {
448        return;
449    }
450
451    // Broadcast the styled lines to all subscribers.
452    let _ = tx.send(TaskEvent::ParsedOutput(ParsedOutputEvent {
453        task_id,
454        stream: stream.clone(),
455        lines: lines.to_vec(),
456    }));
457
458    // Extract diagnostics and send AddIssue ops for each match.
459    for line in lines {
460        let issues = extractor.extract_from_line(line);
461        if !issues.is_empty() {
462            let ops: Vec<Operation> =
463                issues.into_iter().map(|i| Operation::AddIssue { issue: i }).collect();
464            let _ = op_tx.send(ops);
465        }
466    }
467
468    // Run log matcher engine on each line.
469    if let Some(eng) = engine {
470        for line in lines {
471            let issues = eng.process_line(&line.text);
472            send_issues(issues, op_tx);
473        }
474    }
475}
476
477/// Send a batch of issues via `op_tx` (no-op if the vec is empty).
478fn send_issues(
479    issues: Vec<crate::issue_registry::NewIssue>,
480    op_tx: &tokio::sync::mpsc::UnboundedSender<Vec<Operation>>,
481) {
482    if !issues.is_empty() {
483        let ops: Vec<Operation> =
484            issues.into_iter().map(|i| Operation::AddIssue { issue: i }).collect();
485        let _ = op_tx.send(ops);
486    }
487}
488
489/// Build a platform-appropriate [`Command`] that runs `command` through the
490/// system shell.
491fn build_command(command: &str) -> Command {
492    #[cfg(unix)]
493    {
494        let mut cmd = Command::new("sh");
495        cmd.arg("-c").arg(command);
496        cmd
497    }
498    #[cfg(windows)]
499    {
500        let mut cmd = Command::new("cmd");
501        cmd.arg("/C").arg(command);
502        cmd
503    }
504}
505
506// ---------------------------------------------------------------------------
507// Tests
508// ---------------------------------------------------------------------------
509
#[cfg(test)]
mod tests {
    use super::*;
    use crate::task_registry::{TaskKey, TaskQueueId, TaskTrigger, TaskRegistry};
    use tokio::sync::mpsc::unbounded_channel;
    use std::time::Duration;

    /// Safety timeout for all async tests — prevents CI hangs on Windows.
    const TEST_TIMEOUT: Duration = Duration::from_secs(15);

    /// A fixed task key for tests that only need a single task.
    fn dummy_key() -> TaskKey {
        TaskKey {
            queue: TaskQueueId("test".into()),
            target: "t".into(),
        }
    }

    /// A numbered task key for tests that run several distinct tasks.
    fn key_n(n: u64) -> TaskKey {
        TaskKey {
            queue: TaskQueueId("test".into()),
            target: format!("t{n}"),
        }
    }

    /// Schedule `command` in the registry and return everything
    /// [`TaskExecutor::spawn`] needs: the id, its cancellation token, and
    /// the command string.
    fn schedule(reg: &mut TaskRegistry, key: TaskKey, command: &str) -> (TaskId, CancellationToken, String) {
        let cmd = command.to_string();
        let id = reg.schedule_task(key, TaskTrigger::Manual, cmd.clone());
        let token = reg.get(id).unwrap().cancellation_token.clone();
        (id, token, cmd)
    }

    // Platform-specific test commands.
    // On Windows we use `cmd /C <command>` via build_command.
    // echo exits quickly; sleep_long is a long-running placeholder that
    // we cancel before it ends.

    #[cfg(unix)]
    mod cmds {
        pub const ECHO_STDOUT: &str = "echo hello";
        pub const ECHO_STDERR: &str = "echo err >&2";
        pub const SLEEP_LONG: &str = "sleep 60";
        pub const NO_SUCH: &str = "/no/such/binary/xyz_oo_test";
    }
    #[cfg(windows)]
    mod cmds {
        // On Windows `cmd /C echo hello` writes to stdout.
        pub const ECHO_STDOUT: &str = "echo hello";
        // Redirect echo output to stderr via cmd.
        pub const ECHO_STDERR: &str = "echo err>&2";
        // `ping` with a very large count acts as a long-running sleep.
        pub const SLEEP_LONG: &str = "ping -n 120 127.0.0.1";
        // A command that cmd.exe cannot find → exits with non-zero.
        pub const NO_SUCH: &str = "no_such_binary_xyz_oo_test";
    }

    // 1. Task runs and emits stdout
    #[tokio::test]
    async fn task_emits_stdout() {
        tokio::time::timeout(TEST_TIMEOUT, async {
            let mut reg = TaskRegistry::new();
            let mut exec = TaskExecutor::new();
            let mut rx = exec.subscribe();
            let (op_tx, mut op_rx) = unbounded_channel::<Vec<Operation>>();

            let (id, token, cmd) = schedule(&mut reg, dummy_key(), cmds::ECHO_STDOUT);
            exec.spawn(id, cmd, token, "task:test:t".into(), None, &op_tx);

            let mut got_output = false;
            loop {
                match rx.recv().await.unwrap() {
                    TaskEvent::ParsedOutput(ev)
                        if ev.task_id == id && ev.stream == OutputStream::Stdout =>
                    {
                        got_output = true;
                    }
                    TaskEvent::Finished(tid, status) if tid == id => {
                        assert_eq!(status, TaskStatus::Success);
                        break;
                    }
                    _ => {}
                }
            }
            assert!(got_output, "expected at least one stdout output event");

            // op_tx must deliver TaskStarted followed by TaskFinished(Success).
            // try_recv is safe here: TaskStarted is sent before the process is
            // spawned, so it is queued well before Finished was observed above.
            let ops: Vec<Operation> = op_rx.try_recv().unwrap();
            assert!(ops.iter().any(|o| matches!(o, Operation::TaskStarted(i) if *i == id)));
            let ops2: Vec<Operation> = op_rx.recv().await.unwrap();
            assert!(ops2.iter().any(|o| matches!(
                o,
                Operation::TaskFinished { id: i, status: TaskStatus::Success } if *i == id
            )));
        })
        .await
        .expect("test timed out");
    }

    // 2. Task emits stderr
    #[tokio::test]
    async fn task_emits_stderr() {
        tokio::time::timeout(TEST_TIMEOUT, async {
            let mut reg = TaskRegistry::new();
            let mut exec = TaskExecutor::new();
            let mut rx = exec.subscribe();
            let (op_tx, _op_rx) = unbounded_channel::<Vec<Operation>>();

            let (id, token, cmd) = schedule(&mut reg, dummy_key(), cmds::ECHO_STDERR);
            exec.spawn(id, cmd, token, "task:test:t".into(), None, &op_tx);

            let mut got_stderr = false;
            loop {
                match rx.recv().await.unwrap() {
                    TaskEvent::ParsedOutput(ev)
                        if ev.task_id == id && ev.stream == OutputStream::Stderr =>
                    {
                        got_stderr = true;
                    }
                    TaskEvent::Finished(tid, _) if tid == id => break,
                    _ => {}
                }
            }
            assert!(got_stderr, "expected at least one stderr output event");
        })
        .await
        .expect("test timed out");
    }

    // 3. Cancellation stops execution
    #[tokio::test]
    async fn cancellation_stops_task() {
        tokio::time::timeout(TEST_TIMEOUT, async {
            let mut reg = TaskRegistry::new();
            let mut exec = TaskExecutor::new();
            let mut rx = exec.subscribe();
            let (op_tx, mut op_rx) = unbounded_channel::<Vec<Operation>>();

            let (id, token, cmd) = schedule(&mut reg, dummy_key(), cmds::SLEEP_LONG);
            exec.spawn(id, cmd, token.clone(), "task:test:t".into(), None, &op_tx);

            // Wait for Started, then cancel.
            loop {
                if let Ok(TaskEvent::Started(tid)) = rx.recv().await
                    && tid == id { break; }
            }
            token.cancel();

            // Must receive Finished(Cancelled) on the broadcast channel.
            loop {
                if let Ok(TaskEvent::Finished(tid, status)) = rx.recv().await
                    && tid == id {
                        assert_eq!(status, TaskStatus::Cancelled);
                        break;
                    }
            }

            // op_tx must also deliver the Cancelled finish.
            let mut found = false;
            while let Some(ops) = op_rx.recv().await {
                if ops.iter().any(|o| matches!(
                    o,
                    Operation::TaskFinished { id: i, status: TaskStatus::Cancelled }
                    if *i == id
                )) {
                    found = true;
                    break;
                }
            }
            assert!(found, "expected TaskFinished(Cancelled) in op_tx");
        })
        .await
        .expect("test timed out");
    }

    // 4. Multiple subscribers receive the same output
    #[tokio::test]
    async fn multiple_subscribers_receive_same_output() {
        tokio::time::timeout(TEST_TIMEOUT, async {
            let mut reg = TaskRegistry::new();
            let mut exec = TaskExecutor::new();
            let rx1 = exec.subscribe();
            let rx2 = exec.subscribe();
            let (op_tx, _op_rx) = unbounded_channel::<Vec<Operation>>();

            let (id, token, cmd) = schedule(&mut reg, dummy_key(), cmds::ECHO_STDOUT);
            exec.spawn(id, cmd, token, "task:test:t".into(), None, &op_tx);

            /// Count ParsedOutput events until this task's Finished arrives.
            async fn collect(mut rx: broadcast::Receiver<TaskEvent>, id: TaskId) -> usize {
                let mut chunks = 0usize;
                loop {
                    match rx.recv().await.unwrap() {
                        TaskEvent::ParsedOutput(_) => chunks += 1,
                        TaskEvent::Finished(tid, _) if tid == id => return chunks,
                        _ => {}
                    }
                }
            }

            let (c1, c2) = tokio::join!(collect(rx1, id), collect(rx2, id));
            assert_eq!(c1, c2, "both subscribers should see the same chunk count");
            assert!(c1 > 0, "expected at least one output chunk");
        })
        .await
        .expect("test timed out");
    }

    // 5. Process spawn/exit failure → Error status
    #[tokio::test]
    async fn spawn_failure_reports_error() {
        tokio::time::timeout(TEST_TIMEOUT, async {
            let mut reg = TaskRegistry::new();
            let mut exec = TaskExecutor::new();
            let mut rx = exec.subscribe();
            let (op_tx, _op_rx) = unbounded_channel::<Vec<Operation>>();

            let (id, token, _) = schedule(&mut reg, dummy_key(), cmds::NO_SUCH);
            exec.spawn(id, cmds::NO_SUCH.to_string(), token, "task:test:t".into(), None, &op_tx);

            let final_status;
            loop {
                match rx.recv().await.unwrap() {
                    TaskEvent::Finished(tid, status) if tid == id => {
                        final_status = status;
                        break;
                    }
                    _ => {}
                }
            }
            // Whether spawn itself fails or the shell exits non-zero, both → Error.
            assert!(
                matches!(final_status, TaskStatus::Error),
                "expected Error, got {final_status:?}"
            );
        })
        .await
        .expect("test timed out");
    }

    // 6. Rapid cancel + restart: old process is killed, new one succeeds
    #[tokio::test]
    async fn rapid_cancel_restart() {
        tokio::time::timeout(TEST_TIMEOUT, async {
            let mut reg = TaskRegistry::new();
            let mut exec = TaskExecutor::new();
            let mut rx = exec.subscribe();
            let (op_tx, _op_rx) = unbounded_channel::<Vec<Operation>>();

            let (id1, token1, cmd1) = schedule(&mut reg, key_n(1), cmds::SLEEP_LONG);
            exec.spawn(id1, cmd1, token1.clone(), "task:test:t1".into(), None, &op_tx);

            // Wait for the first task to start, then cancel it.
            loop {
                if let Ok(TaskEvent::Started(tid)) = rx.recv().await
                    && tid == id1 { break; }
            }
            token1.cancel();

            // Schedule a second task (different key so no registry interaction needed).
            let (id2, token2, cmd2) = schedule(&mut reg, key_n(2), cmds::ECHO_STDOUT);
            exec.spawn(id2, cmd2, token2, "task:test:t2".into(), None, &op_tx);

            // Expect: id1 → Cancelled, id2 → Success (order may interleave).
            let mut saw_cancel = false;
            let mut saw_success = false;
            loop {
                match rx.recv().await.unwrap() {
                    TaskEvent::Finished(tid, TaskStatus::Cancelled) if tid == id1 => {
                        saw_cancel = true;
                    }
                    TaskEvent::Finished(tid, TaskStatus::Success) if tid == id2 => {
                        saw_success = true;
                    }
                    _ => {}
                }
                if saw_cancel && saw_success { break; }
            }
            assert!(saw_cancel, "first task should be Cancelled");
            assert!(saw_success, "second task should succeed");
        })
        .await
        .expect("test timed out");
    }
}
792