Skip to main content

haz_exec/
output.rs

1//! [`RunObserver`] implementations for the two normative
2//! presentation modes of `EXEC-016`.
3//!
4//! Two distinct observer types live here. They share the
5//! `EXEC-016` vocabulary (mode-specific surfacing of a task's
6//! captured `stdout` and `stderr`) but compose differently with
7//! the scheduler:
8//!
9//! - [`LiveOutputObserver`] splits incoming byte chunks on `\n`
10//!   and writes each complete line to the configured `stdout` /
11//!   `stderr` sink prefixed with `[project:task] `. Multiple
12//!   in-flight tasks interleave at line granularity; never within
13//!   a line. Trailing partial bytes (a write with no terminating
14//!   newline) are buffered until the next `\n` or until the task
15//!   terminates, at which point they are flushed with an appended
16//!   `\n` so the line-atomicity invariant is preserved.
17//!
18//! - [`BufferedOutputObserver`] accumulates incoming byte chunks
19//!   per task into in-memory buffers and writes each task's full
20//!   `stdout` followed by its full `stderr` as two contiguous
21//!   blocks when the task terminates. No tag prefix; bytes are
22//!   verbatim. Output from different tasks never interleaves.
23//!
24//! Both observers are generic over the sink types `O: Write +
25//! Send` (stdout) and `E: Write + Send` (stderr). Production code
26//! passes [`std::io::Stdout`] / [`std::io::Stderr`] (or their
27//! `lock` guards). Tests pass `Vec<u8>` (which implements
28//! [`Write`]) and call [`LiveOutputObserver::into_inner`] /
29//! [`BufferedOutputObserver::into_inner`] after the run to extract
30//! the captured bytes.
31//!
32//! Interior mutability is a single [`std::sync::Mutex`] per
33//! observer guarding the sinks together with the per-task
34//! accumulator map. The kernel `write` syscall dominates lock-hold
35//! time; per-stream contention savings are theoretical. Holding a
36//! single mutex across the line-splitting and sink write is what
37//! enforces `EXEC-016`'s atomic-per-line-writes contract in live
38//! mode.
39//!
40//! Both observers implement the cache-hit contract of `EXEC-017`
41//! implicitly: [`crate::run_task::restore_from_hit`] feeds cached
42//! `stdout` / `stderr` bytes through the same
43//! [`RunObserver::on_stdout`] and [`RunObserver::on_stderr`] calls
44//! a fresh run uses. The observer cannot distinguish a hit from a
45//! fresh run; both surface byte-identically.
46//!
47//! Cache contents themselves are untouched by either observer.
48//! `EXEC-016`'s "the captured bytes that enter the cache per
49//! `CACHE-012` are the bytes the process wrote, EXACTLY, with no
50//! prefix and no transformation" is satisfied structurally:
51//! [`crate::run_task::run_fresh`] hashes and stores the raw byte
52//! buffers; the observer reads its own copy of the same bytes for
53//! presentation.
54
55use std::collections::BTreeMap;
56use std::io::{self, Write};
57use std::sync::{Arc, Mutex, PoisonError};
58use std::time::Instant;
59
60use haz_domain::task_id::TaskId;
61
62use crate::presenter::{PlainPresenter, TaskPresenter};
63use crate::run_task::{CancelledRecord, CompletedRecord, RunObserver, SkipRecord};
64
65/// Live-mode [`RunObserver`] per `EXEC-016`.
66///
67/// See the [module-level docs](self) for the presentation
68/// contract. Constructed with two sinks via [`Self::new`]; the
69/// inner sinks can be reclaimed with [`Self::into_inner`] after
70/// the run completes.
71///
72/// # Errors
73///
74/// Sink writes go through [`Write::write_all`]. The trait method
75/// signatures on [`RunObserver`] do NOT return errors; a write
76/// failure in the sink is therefore silently ignored at this
77/// layer. Production callers wire `stdout` and `stderr` to the
78/// parent process's terminal streams, where write failure is a
79/// terminal-condition concern out of scope here. Tests use
80/// `Vec<u8>` sinks, which cannot fail.
81pub struct LiveOutputObserver<O, E>
82where
83    O: Write + Send,
84    E: Write + Send,
85{
86    state: Mutex<LiveState<O, E>>,
87    presenter: Arc<dyn TaskPresenter>,
88}
89
/// Everything a [`LiveOutputObserver`] keeps behind its mutex:
/// the two output sinks and the per-task partial-line buffers.
struct LiveState<O, E> {
    /// Sink that receives the prefixed `stdout` lines.
    stdout: O,
    /// Sink that receives the prefixed `stderr` lines and any
    /// summary bytes the presenter produces.
    stderr: E,
    /// One entry per task, inserted by `on_task_started` and
    /// removed by `on_task_finished` or an in-flight cancellation.
    accumulators: BTreeMap<TaskId, LiveAccumulator>,
}
95
/// Per-task bookkeeping for live mode: the bytes received since
/// the last `\n` on each stream, plus the moment the accumulator
/// was created (used to compute the task's duration).
struct LiveAccumulator {
    stdout_partial: Vec<u8>,
    stderr_partial: Vec<u8>,
    started_at: Instant,
}

impl LiveAccumulator {
    /// Create an accumulator with both partial-line buffers empty
    /// and the start time set to "now".
    fn new() -> Self {
        let (stdout_partial, stderr_partial) = (Vec::new(), Vec::new());
        Self {
            stdout_partial,
            stderr_partial,
            started_at: Instant::now(),
        }
    }
}
111
112impl<O, E> LiveOutputObserver<O, E>
113where
114    O: Write + Send,
115    E: Write + Send,
116{
117    /// Build an observer that writes complete lines to `stdout`
118    /// and `stderr` with per-task tag prefixes from
119    /// [`PlainPresenter`] (the historical `[project:task] `
120    /// format, no per-task summary lines). Equivalent to
121    /// [`Self::with_presenter`] called with `Arc::new(PlainPresenter)`.
122    pub fn new(stdout: O, stderr: E) -> Self {
123        Self::with_presenter(stdout, stderr, Arc::new(PlainPresenter))
124    }
125
126    /// Build an observer that delegates prefix and summary-line
127    /// rendering to `presenter`.
128    ///
129    /// Callers that want color, glyph status markers, or per-task
130    /// duration summaries inject a colour-aware presenter; library
131    /// tests use the default [`PlainPresenter`] via [`Self::new`].
132    pub fn with_presenter(stdout: O, stderr: E, presenter: Arc<dyn TaskPresenter>) -> Self {
133        Self {
134            state: Mutex::new(LiveState {
135                stdout,
136                stderr,
137                accumulators: BTreeMap::new(),
138            }),
139            presenter,
140        }
141    }
142
143    /// Reclaim the two sinks after the run has finished and the
144    /// observer is no longer borrowed by the scheduler. The
145    /// returned tuple is `(stdout, stderr)`.
146    pub fn into_inner(self) -> (O, E) {
147        let state = self
148            .state
149            .into_inner()
150            .unwrap_or_else(PoisonError::into_inner);
151        (state.stdout, state.stderr)
152    }
153}
154
155impl<O, E> RunObserver for LiveOutputObserver<O, E>
156where
157    O: Write + Send,
158    E: Write + Send,
159{
160    fn on_task_started(&self, task: &TaskId) {
161        let mut state = self.state.lock().unwrap_or_else(PoisonError::into_inner);
162        state
163            .accumulators
164            .insert(task.clone(), LiveAccumulator::new());
165    }
166
167    fn on_stdout(&self, task: &TaskId, bytes: &[u8]) {
168        if bytes.is_empty() {
169            return;
170        }
171        let mut state = self.state.lock().unwrap_or_else(PoisonError::into_inner);
172        let Some(accumulator) = state.accumulators.get_mut(task) else {
173            return;
174        };
175        let mut partial = std::mem::take(&mut accumulator.stdout_partial);
176        partial.extend_from_slice(bytes);
177        let (complete, remainder) = split_lines(&partial);
178        let _ = emit_lines(&mut state.stdout, self.presenter.as_ref(), task, complete);
179        if let Some(accumulator) = state.accumulators.get_mut(task) {
180            accumulator.stdout_partial = remainder;
181        }
182    }
183
184    fn on_stderr(&self, task: &TaskId, bytes: &[u8]) {
185        if bytes.is_empty() {
186            return;
187        }
188        let mut state = self.state.lock().unwrap_or_else(PoisonError::into_inner);
189        let Some(accumulator) = state.accumulators.get_mut(task) else {
190            return;
191        };
192        let mut partial = std::mem::take(&mut accumulator.stderr_partial);
193        partial.extend_from_slice(bytes);
194        let (complete, remainder) = split_lines(&partial);
195        let _ = emit_lines(&mut state.stderr, self.presenter.as_ref(), task, complete);
196        if let Some(accumulator) = state.accumulators.get_mut(task) {
197            accumulator.stderr_partial = remainder;
198        }
199    }
200
201    fn on_task_finished(&self, task: &TaskId, record: &CompletedRecord) {
202        let mut state = self.state.lock().unwrap_or_else(PoisonError::into_inner);
203        let Some(accumulator) = state.accumulators.remove(task) else {
204            return;
205        };
206        flush_live_partials(&mut state, self.presenter.as_ref(), task, &accumulator);
207        let duration = accumulator.started_at.elapsed();
208        if let Some(bytes) = self.presenter.summary_completed(task, record, duration) {
209            let _ = state.stderr.write_all(&bytes);
210        }
211    }
212
213    fn on_task_skipped(&self, task: &TaskId, record: &SkipRecord) {
214        // No accumulator: cascade-skipped tasks never had
215        // `on_task_started` fire.
216        let mut state = self.state.lock().unwrap_or_else(PoisonError::into_inner);
217        if let Some(bytes) = self.presenter.summary_skipped(task, record) {
218            let _ = state.stderr.write_all(&bytes);
219        }
220    }
221
222    fn on_task_cancelled(&self, task: &TaskId, record: &CancelledRecord) {
223        let mut state = self.state.lock().unwrap_or_else(PoisonError::into_inner);
224        let duration = match record {
225            CancelledRecord::SignaledInFlight { .. } => {
226                state.accumulators.remove(task).map(|acc| {
227                    flush_live_partials(&mut state, self.presenter.as_ref(), task, &acc);
228                    acc.started_at.elapsed()
229                })
230            }
231            CancelledRecord::UpstreamCancelled { .. } | CancelledRecord::RunCancelled { .. } => {
232                // No accumulator: cascade-cancelled / drained tasks
233                // never had `on_task_started` fire.
234                None
235            }
236        };
237        if let Some(bytes) = self.presenter.summary_cancelled(task, record, duration) {
238            let _ = state.stderr.write_all(&bytes);
239        }
240    }
241}
242
243/// Flush an accumulator's trailing partial-line bytes to the
244/// matching sink, each appended with a `\n` so the
245/// line-atomicity invariant holds even when the task ended on a
246/// no-newline write.
247///
248/// The accumulator is passed by reference (already removed by the
249/// caller). The borrow lifetime here is the existing lock the
250/// caller holds: this helper writes through the live `LiveState`
251/// reference without re-locking.
252fn flush_live_partials<O, E>(
253    state: &mut LiveState<O, E>,
254    presenter: &dyn TaskPresenter,
255    task: &TaskId,
256    accumulator: &LiveAccumulator,
257) where
258    O: Write + Send,
259    E: Write + Send,
260{
261    if !accumulator.stdout_partial.is_empty() {
262        let _ = emit_lines(
263            &mut state.stdout,
264            presenter,
265            task,
266            vec![accumulator.stdout_partial.clone()],
267        );
268    }
269    if !accumulator.stderr_partial.is_empty() {
270        let _ = emit_lines(
271            &mut state.stderr,
272            presenter,
273            task,
274            vec![accumulator.stderr_partial.clone()],
275        );
276    }
277}
278
279/// Buffered-mode [`RunObserver`] per `EXEC-016`.
280///
281/// See the [module-level docs](self) for the presentation
282/// contract. Constructed with two sinks via [`Self::new`]; the
283/// inner sinks can be reclaimed with [`Self::into_inner`] after
284/// the run completes.
285///
286/// # Errors
287///
288/// Same write-failure contract as [`LiveOutputObserver`].
289pub struct BufferedOutputObserver<O, E>
290where
291    O: Write + Send,
292    E: Write + Send,
293{
294    state: Mutex<BufferedState<O, E>>,
295    presenter: Arc<dyn TaskPresenter>,
296}
297
/// Everything a [`BufferedOutputObserver`] keeps behind its
/// mutex: the two output sinks and the per-task byte buffers.
struct BufferedState<O, E> {
    /// Sink that receives each task's `stdout` block.
    stdout: O,
    /// Sink that receives each task's `stderr` block and any
    /// summary bytes the presenter produces.
    stderr: E,
    /// One entry per task, inserted by `on_task_started` and
    /// removed by `on_task_finished` or an in-flight cancellation.
    accumulators: BTreeMap<TaskId, BufferedAccumulator>,
}
303
/// Per-task bookkeeping for buffered mode: every byte received
/// on each stream so far, plus the moment the accumulator was
/// created (used to compute the task's duration).
struct BufferedAccumulator {
    stdout: Vec<u8>,
    stderr: Vec<u8>,
    started_at: Instant,
}

impl BufferedAccumulator {
    /// Create an accumulator with both stream buffers empty and
    /// the start time set to "now".
    fn new() -> Self {
        let (stdout, stderr) = (Vec::new(), Vec::new());
        Self {
            stdout,
            stderr,
            started_at: Instant::now(),
        }
    }
}
319
320impl<O, E> BufferedOutputObserver<O, E>
321where
322    O: Write + Send,
323    E: Write + Send,
324{
325    /// Build an observer that accumulates per-task bytes and
326    /// emits them as two contiguous blocks (`stdout` then
327    /// `stderr`) on task completion, with prefix and summary-line
328    /// rendering delegated to [`PlainPresenter`]. Equivalent to
329    /// [`Self::with_presenter`] called with `Arc::new(PlainPresenter)`.
330    pub fn new(stdout: O, stderr: E) -> Self {
331        Self::with_presenter(stdout, stderr, Arc::new(PlainPresenter))
332    }
333
334    /// Build an observer that delegates prefix and summary-line
335    /// rendering to `presenter`. Buffered mode ignores the prefix
336    /// (captured bytes flush verbatim), but the presenter's
337    /// `summary_*` methods are still consulted on terminal
338    /// callbacks; a `None` return suppresses the summary line.
339    pub fn with_presenter(stdout: O, stderr: E, presenter: Arc<dyn TaskPresenter>) -> Self {
340        Self {
341            state: Mutex::new(BufferedState {
342                stdout,
343                stderr,
344                accumulators: BTreeMap::new(),
345            }),
346            presenter,
347        }
348    }
349
350    /// Reclaim the two sinks after the run has finished. Returns
351    /// `(stdout, stderr)`.
352    pub fn into_inner(self) -> (O, E) {
353        let state = self
354            .state
355            .into_inner()
356            .unwrap_or_else(PoisonError::into_inner);
357        (state.stdout, state.stderr)
358    }
359}
360
361impl<O, E> RunObserver for BufferedOutputObserver<O, E>
362where
363    O: Write + Send,
364    E: Write + Send,
365{
366    fn on_task_started(&self, task: &TaskId) {
367        let mut state = self.state.lock().unwrap_or_else(PoisonError::into_inner);
368        state
369            .accumulators
370            .insert(task.clone(), BufferedAccumulator::new());
371    }
372
373    fn on_stdout(&self, task: &TaskId, bytes: &[u8]) {
374        if bytes.is_empty() {
375            return;
376        }
377        let mut state = self.state.lock().unwrap_or_else(PoisonError::into_inner);
378        if let Some(accumulator) = state.accumulators.get_mut(task) {
379            accumulator.stdout.extend_from_slice(bytes);
380        }
381    }
382
383    fn on_stderr(&self, task: &TaskId, bytes: &[u8]) {
384        if bytes.is_empty() {
385            return;
386        }
387        let mut state = self.state.lock().unwrap_or_else(PoisonError::into_inner);
388        if let Some(accumulator) = state.accumulators.get_mut(task) {
389            accumulator.stderr.extend_from_slice(bytes);
390        }
391    }
392
393    fn on_task_finished(&self, task: &TaskId, record: &CompletedRecord) {
394        let mut state = self.state.lock().unwrap_or_else(PoisonError::into_inner);
395        let Some(accumulator) = state.accumulators.remove(task) else {
396            return;
397        };
398        flush_buffered_accumulator(&mut state, &accumulator);
399        let duration = accumulator.started_at.elapsed();
400        if let Some(bytes) = self.presenter.summary_completed(task, record, duration) {
401            let _ = state.stderr.write_all(&bytes);
402        }
403    }
404
405    fn on_task_skipped(&self, task: &TaskId, record: &SkipRecord) {
406        // No accumulator: cascade-skipped tasks never had
407        // `on_task_started` fire.
408        let mut state = self.state.lock().unwrap_or_else(PoisonError::into_inner);
409        if let Some(bytes) = self.presenter.summary_skipped(task, record) {
410            let _ = state.stderr.write_all(&bytes);
411        }
412    }
413
414    fn on_task_cancelled(&self, task: &TaskId, record: &CancelledRecord) {
415        let mut state = self.state.lock().unwrap_or_else(PoisonError::into_inner);
416        let duration = match record {
417            CancelledRecord::SignaledInFlight { .. } => {
418                state.accumulators.remove(task).map(|acc| {
419                    flush_buffered_accumulator(&mut state, &acc);
420                    acc.started_at.elapsed()
421                })
422            }
423            CancelledRecord::UpstreamCancelled { .. } | CancelledRecord::RunCancelled { .. } => {
424                None
425            }
426        };
427        if let Some(bytes) = self.presenter.summary_cancelled(task, record, duration) {
428            let _ = state.stderr.write_all(&bytes);
429        }
430    }
431}
432
433/// Write the accumulator's full stdout and stderr blocks to the
434/// matching sinks. Stdout flushes first, then stderr; this is the
435/// `EXEC-016` buffered-mode contract. Empty blocks emit nothing.
436fn flush_buffered_accumulator<O, E>(
437    state: &mut BufferedState<O, E>,
438    accumulator: &BufferedAccumulator,
439) where
440    O: Write + Send,
441    E: Write + Send,
442{
443    if !accumulator.stdout.is_empty() {
444        let _ = state.stdout.write_all(&accumulator.stdout);
445    }
446    if !accumulator.stderr.is_empty() {
447        let _ = state.stderr.write_all(&accumulator.stderr);
448    }
449}
450
/// Break `bytes` apart on `\n`.
///
/// Returns the contents of every newline-terminated line (with
/// the `\n` itself removed) together with whatever follows the
/// final `\n`. That trailing remainder is empty when `bytes` is
/// empty or ends in `\n`.
fn split_lines(bytes: &[u8]) -> (Vec<Vec<u8>>, Vec<u8>) {
    // `split` always yields one more segment than there are
    // separators, so the last segment is exactly the remainder.
    let mut segments: Vec<Vec<u8>> = bytes
        .split(|&byte| byte == b'\n')
        .map(<[u8]>::to_vec)
        .collect();
    let remainder = segments.pop().unwrap_or_default();
    (segments, remainder)
}
467
468/// Write each line in `lines` to `sink` under one `write_all`
469/// call per line, prefixed with the bytes the presenter produces
470/// for `task`. The mutex held by the caller guarantees no other
471/// observer call interleaves between or within these writes.
472fn emit_lines<W: Write>(
473    sink: &mut W,
474    presenter: &dyn TaskPresenter,
475    task: &TaskId,
476    lines: Vec<Vec<u8>>,
477) -> io::Result<()> {
478    let prefix = presenter.prefix(task);
479    for line in lines {
480        let mut buf = Vec::with_capacity(prefix.len() + line.len() + 1);
481        buf.extend_from_slice(&prefix);
482        buf.extend_from_slice(&line);
483        buf.push(b'\n');
484        sink.write_all(&buf)?;
485    }
486    Ok(())
487}
488
489#[cfg(test)]
490mod tests {
491    use std::str::FromStr;
492
493    use haz_domain::name::{ProjectName, TaskName};
494
495    use super::*;
496    use crate::run_task::{RunSource, RunState};
497
498    fn task_id_for(project: &str, task: &str) -> TaskId {
499        TaskId {
500            project: ProjectName::from_str(project).unwrap(),
501            task: TaskName::from_str(task).unwrap(),
502        }
503    }
504
505    fn completed_for(task: &TaskId) -> CompletedRecord {
506        CompletedRecord {
507            task: task.clone(),
508            source: RunSource::FreshRun,
509            state: RunState::Succeeded,
510            exit_status: None,
511            stdout_hash: [0u8; 32],
512            stderr_hash: [0u8; 32],
513            materialised_outputs: Vec::new(),
514        }
515    }
516
517    fn signaled_in_flight_for(task: &TaskId) -> CancelledRecord {
518        let status: std::process::ExitStatus = std::os::unix::process::ExitStatusExt::from_raw(0);
519        CancelledRecord::SignaledInFlight {
520            task: task.clone(),
521            exit_status: status,
522            stdout_hash: [0u8; 32],
523            stderr_hash: [0u8; 32],
524        }
525    }
526
527    fn upstream_cancelled_for(task: &TaskId, upstream: &TaskId) -> CancelledRecord {
528        CancelledRecord::UpstreamCancelled {
529            task: task.clone(),
530            upstream: upstream.clone(),
531        }
532    }
533
534    fn run_cancelled_for(task: &TaskId) -> CancelledRecord {
535        CancelledRecord::RunCancelled { task: task.clone() }
536    }
537
538    fn upstream_failed_for(task: &TaskId, upstream: &TaskId) -> SkipRecord {
539        SkipRecord {
540            task: task.clone(),
541            cause: crate::run_task::SkipCause::UpstreamFailed {
542                upstream: upstream.clone(),
543            },
544        }
545    }
546
547    // -------------------- Live: line shaping --------------------
548
549    #[test]
550    fn live_emits_complete_line_with_tag_prefix() {
551        let observer = LiveOutputObserver::new(Vec::<u8>::new(), Vec::<u8>::new());
552        let task = task_id_for("lib", "build");
553        observer.on_task_started(&task);
554        observer.on_stdout(&task, b"hello world\n");
555        observer.on_task_finished(&task, &completed_for(&task));
556        let (stdout, stderr) = observer.into_inner();
557        assert_eq!(stdout, b"[lib:build] hello world\n");
558        assert!(stderr.is_empty());
559    }
560
561    #[test]
562    fn live_splits_multi_line_chunk_on_newlines() {
563        let observer = LiveOutputObserver::new(Vec::<u8>::new(), Vec::<u8>::new());
564        let task = task_id_for("lib", "build");
565        observer.on_task_started(&task);
566        observer.on_stdout(&task, b"one\ntwo\nthree\n");
567        observer.on_task_finished(&task, &completed_for(&task));
568        let (stdout, _) = observer.into_inner();
569        assert_eq!(
570            stdout,
571            b"[lib:build] one\n[lib:build] two\n[lib:build] three\n"
572        );
573    }
574
575    #[test]
576    fn live_joins_line_across_chunks() {
577        let observer = LiveOutputObserver::new(Vec::<u8>::new(), Vec::<u8>::new());
578        let task = task_id_for("lib", "build");
579        observer.on_task_started(&task);
580        observer.on_stdout(&task, b"hel");
581        observer.on_stdout(&task, b"lo wor");
582        observer.on_stdout(&task, b"ld\n");
583        observer.on_task_finished(&task, &completed_for(&task));
584        let (stdout, _) = observer.into_inner();
585        assert_eq!(stdout, b"[lib:build] hello world\n");
586    }
587
588    #[test]
589    fn live_flushes_partial_line_with_newline_on_finish() {
590        let observer = LiveOutputObserver::new(Vec::<u8>::new(), Vec::<u8>::new());
591        let task = task_id_for("lib", "build");
592        observer.on_task_started(&task);
593        observer.on_stdout(&task, b"complete\npartial");
594        observer.on_task_finished(&task, &completed_for(&task));
595        let (stdout, _) = observer.into_inner();
596        assert_eq!(stdout, b"[lib:build] complete\n[lib:build] partial\n");
597    }
598
599    #[test]
600    fn live_flushes_partial_line_on_cancel_signaled_in_flight() {
601        let observer = LiveOutputObserver::new(Vec::<u8>::new(), Vec::<u8>::new());
602        let task = task_id_for("lib", "build");
603        observer.on_task_started(&task);
604        observer.on_stdout(&task, b"error: panic at");
605        observer.on_task_cancelled(&task, &signaled_in_flight_for(&task));
606        let (stdout, _) = observer.into_inner();
607        assert_eq!(stdout, b"[lib:build] error: panic at\n");
608    }
609
610    #[test]
611    fn live_stderr_routed_to_stderr_sink_with_prefix() {
612        let observer = LiveOutputObserver::new(Vec::<u8>::new(), Vec::<u8>::new());
613        let task = task_id_for("lib", "build");
614        observer.on_task_started(&task);
615        observer.on_stderr(&task, b"warning: deprecated\n");
616        observer.on_task_finished(&task, &completed_for(&task));
617        let (stdout, stderr) = observer.into_inner();
618        assert!(stdout.is_empty());
619        assert_eq!(stderr, b"[lib:build] warning: deprecated\n");
620    }
621
622    #[test]
623    fn live_empty_chunk_is_noop() {
624        let observer = LiveOutputObserver::new(Vec::<u8>::new(), Vec::<u8>::new());
625        let task = task_id_for("lib", "build");
626        observer.on_task_started(&task);
627        observer.on_stdout(&task, b"");
628        observer.on_stderr(&task, b"");
629        observer.on_task_finished(&task, &completed_for(&task));
630        let (stdout, stderr) = observer.into_inner();
631        assert!(stdout.is_empty());
632        assert!(stderr.is_empty());
633    }
634
635    #[test]
636    fn live_bare_newline_byte_emits_empty_tagged_line() {
637        let observer = LiveOutputObserver::new(Vec::<u8>::new(), Vec::<u8>::new());
638        let task = task_id_for("lib", "build");
639        observer.on_task_started(&task);
640        observer.on_stdout(&task, b"\n");
641        observer.on_task_finished(&task, &completed_for(&task));
642        let (stdout, _) = observer.into_inner();
643        assert_eq!(stdout, b"[lib:build] \n");
644    }
645
646    // -------------------- Live: lifecycle defensiveness --------------------
647
648    #[test]
649    fn live_cancel_upstream_is_noop_when_no_accumulator() {
650        let observer = LiveOutputObserver::new(Vec::<u8>::new(), Vec::<u8>::new());
651        let task = task_id_for("lib", "downstream");
652        let upstream = task_id_for("lib", "root");
653        observer.on_task_cancelled(&task, &upstream_cancelled_for(&task, &upstream));
654        let (stdout, stderr) = observer.into_inner();
655        assert!(stdout.is_empty());
656        assert!(stderr.is_empty());
657    }
658
659    #[test]
660    fn live_cancel_run_cancelled_is_noop_when_no_accumulator() {
661        let observer = LiveOutputObserver::new(Vec::<u8>::new(), Vec::<u8>::new());
662        let task = task_id_for("lib", "drained");
663        observer.on_task_cancelled(&task, &run_cancelled_for(&task));
664        let (stdout, stderr) = observer.into_inner();
665        assert!(stdout.is_empty());
666        assert!(stderr.is_empty());
667    }
668
669    #[test]
670    fn live_skipped_is_noop_when_no_accumulator() {
671        let observer = LiveOutputObserver::new(Vec::<u8>::new(), Vec::<u8>::new());
672        let task = task_id_for("lib", "downstream");
673        let upstream = task_id_for("lib", "root");
674        observer.on_task_skipped(&task, &upstream_failed_for(&task, &upstream));
675        let (stdout, stderr) = observer.into_inner();
676        assert!(stdout.is_empty());
677        assert!(stderr.is_empty());
678    }
679
680    // -------------------- Live: interleaving --------------------
681
682    #[test]
683    fn live_two_tasks_lines_interleave_at_line_granularity() {
684        let observer = LiveOutputObserver::new(Vec::<u8>::new(), Vec::<u8>::new());
685        let a = task_id_for("lib", "a");
686        let b = task_id_for("lib", "b");
687        observer.on_task_started(&a);
688        observer.on_task_started(&b);
689        observer.on_stdout(&a, b"alpha\n");
690        observer.on_stdout(&b, b"beta\n");
691        observer.on_stdout(&a, b"gamma\n");
692        observer.on_task_finished(&a, &completed_for(&a));
693        observer.on_task_finished(&b, &completed_for(&b));
694        let (stdout, _) = observer.into_inner();
695        assert_eq!(stdout, b"[lib:a] alpha\n[lib:b] beta\n[lib:a] gamma\n");
696    }
697
698    // -------------------- Buffered --------------------
699
700    #[test]
701    fn buffered_emits_block_on_finish_no_prefix() {
702        let observer = BufferedOutputObserver::new(Vec::<u8>::new(), Vec::<u8>::new());
703        let task = task_id_for("lib", "build");
704        observer.on_task_started(&task);
705        observer.on_stdout(&task, b"hello world\nno-newline-tail");
706        observer.on_task_finished(&task, &completed_for(&task));
707        let (stdout, _) = observer.into_inner();
708        assert_eq!(stdout, b"hello world\nno-newline-tail");
709    }
710
711    #[test]
712    fn buffered_accumulates_across_chunks() {
713        let observer = BufferedOutputObserver::new(Vec::<u8>::new(), Vec::<u8>::new());
714        let task = task_id_for("lib", "build");
715        observer.on_task_started(&task);
716        observer.on_stdout(&task, b"abc");
717        observer.on_stdout(&task, b"def");
718        observer.on_stdout(&task, b"ghi");
719        observer.on_task_finished(&task, &completed_for(&task));
720        let (stdout, _) = observer.into_inner();
721        assert_eq!(stdout, b"abcdefghi");
722    }
723
724    #[test]
725    fn buffered_emits_stdout_block_before_stderr_block() {
726        let observer = BufferedOutputObserver::new(Vec::<u8>::new(), Vec::<u8>::new());
727        let task = task_id_for("lib", "build");
728        observer.on_task_started(&task);
729        observer.on_stdout(&task, b"out\n");
730        observer.on_stderr(&task, b"err\n");
731        observer.on_task_finished(&task, &completed_for(&task));
732        let (stdout, stderr) = observer.into_inner();
733        assert_eq!(stdout, b"out\n");
734        assert_eq!(stderr, b"err\n");
735    }
736
737    #[test]
738    fn buffered_cancel_signaled_in_flight_flushes_blocks() {
739        let observer = BufferedOutputObserver::new(Vec::<u8>::new(), Vec::<u8>::new());
740        let task = task_id_for("lib", "build");
741        observer.on_task_started(&task);
742        observer.on_stdout(&task, b"out-bytes");
743        observer.on_stderr(&task, b"err-bytes");
744        observer.on_task_cancelled(&task, &signaled_in_flight_for(&task));
745        let (stdout, stderr) = observer.into_inner();
746        assert_eq!(stdout, b"out-bytes");
747        assert_eq!(stderr, b"err-bytes");
748    }
749
750    #[test]
751    fn buffered_cancel_upstream_is_noop_when_no_accumulator() {
752        let observer = BufferedOutputObserver::new(Vec::<u8>::new(), Vec::<u8>::new());
753        let task = task_id_for("lib", "downstream");
754        let upstream = task_id_for("lib", "root");
755        observer.on_task_cancelled(&task, &upstream_cancelled_for(&task, &upstream));
756        let (stdout, stderr) = observer.into_inner();
757        assert!(stdout.is_empty());
758        assert!(stderr.is_empty());
759    }
760
761    #[test]
762    fn buffered_cancel_run_cancelled_is_noop_when_no_accumulator() {
763        let observer = BufferedOutputObserver::new(Vec::<u8>::new(), Vec::<u8>::new());
764        let task = task_id_for("lib", "drained");
765        observer.on_task_cancelled(&task, &run_cancelled_for(&task));
766        let (stdout, stderr) = observer.into_inner();
767        assert!(stdout.is_empty());
768        assert!(stderr.is_empty());
769    }
770
771    #[test]
772    fn buffered_skipped_is_noop_when_no_accumulator() {
773        let observer = BufferedOutputObserver::new(Vec::<u8>::new(), Vec::<u8>::new());
774        let task = task_id_for("lib", "downstream");
775        let upstream = task_id_for("lib", "root");
776        observer.on_task_skipped(&task, &upstream_failed_for(&task, &upstream));
777        let (stdout, stderr) = observer.into_inner();
778        assert!(stdout.is_empty());
779        assert!(stderr.is_empty());
780    }
781
782    #[test]
783    fn buffered_empty_streams_emit_nothing_no_panic() {
784        let observer = BufferedOutputObserver::new(Vec::<u8>::new(), Vec::<u8>::new());
785        let task = task_id_for("lib", "build");
786        observer.on_task_started(&task);
787        observer.on_task_finished(&task, &completed_for(&task));
788        let (stdout, stderr) = observer.into_inner();
789        assert!(stdout.is_empty());
790        assert!(stderr.is_empty());
791    }
792
793    #[test]
794    fn buffered_tasks_never_interleave_under_sequential_calls() {
795        // The single-mutex observer serialises calls. Verify that
796        // bytes from task `a` and task `b` arrive contiguously
797        // per task in the output, never interleaved.
798        let observer = BufferedOutputObserver::new(Vec::<u8>::new(), Vec::<u8>::new());
799        let a = task_id_for("lib", "a");
800        let b = task_id_for("lib", "b");
801        observer.on_task_started(&a);
802        observer.on_task_started(&b);
803        observer.on_stdout(&a, b"AAA");
804        observer.on_stdout(&b, b"BBB");
805        observer.on_stdout(&a, b"AAA");
806        observer.on_task_finished(&a, &completed_for(&a));
807        observer.on_task_finished(&b, &completed_for(&b));
808        let (stdout, _) = observer.into_inner();
809        // `a` finishes first: its full accumulated `AAAAAA`
810        // (no `B` bytes) is emitted as a contiguous block,
811        // then `b`'s `BBB` follows.
812        assert_eq!(stdout, b"AAAAAABBB");
813    }
814
815    // -------------------- split_lines helper --------------------
816
817    #[test]
818    fn split_lines_yields_complete_lines_and_trailing_remainder() {
819        let (lines, remainder) = split_lines(b"alpha\nbeta\ngamma");
820        assert_eq!(lines, vec![b"alpha".to_vec(), b"beta".to_vec()]);
821        assert_eq!(remainder, b"gamma".to_vec());
822    }
823
824    #[test]
825    fn split_lines_terminator_only_input_yields_empty_remainder() {
826        let (lines, remainder) = split_lines(b"alpha\n");
827        assert_eq!(lines, vec![b"alpha".to_vec()]);
828        assert!(remainder.is_empty());
829    }
830
831    #[test]
832    fn split_lines_empty_input_yields_nothing() {
833        let (lines, remainder) = split_lines(b"");
834        assert!(lines.is_empty());
835        assert!(remainder.is_empty());
836    }
837
838    // -------------------- Presenter wiring --------------------
839
840    /// Recording presenter for the wiring tests: emits a synthetic
841    /// prefix and summary line so the observer's behaviour around
842    /// the [`TaskPresenter`] seam can be asserted byte-for-byte.
843    struct RecordingPresenter;
844
845    impl TaskPresenter for RecordingPresenter {
846        fn prefix(&self, task: &TaskId) -> Vec<u8> {
847            format!("<{task}> ").into_bytes()
848        }
849
850        fn summary_completed(
851            &self,
852            task: &TaskId,
853            record: &CompletedRecord,
854            duration: std::time::Duration,
855        ) -> Option<Vec<u8>> {
856            let kind = match record.source {
857                crate::run_task::RunSource::CacheHit => "hit",
858                crate::run_task::RunSource::FreshRun => "fresh",
859            };
860            let nonzero = if duration > std::time::Duration::ZERO {
861                "+"
862            } else {
863                "0"
864            };
865            Some(format!("[{task}] completed:{kind}:{nonzero}\n").into_bytes())
866        }
867
868        fn summary_skipped(&self, task: &TaskId, _record: &SkipRecord) -> Option<Vec<u8>> {
869            Some(format!("[{task}] skipped\n").into_bytes())
870        }
871
872        fn summary_cancelled(
873            &self,
874            task: &TaskId,
875            _record: &CancelledRecord,
876            duration: Option<std::time::Duration>,
877        ) -> Option<Vec<u8>> {
878            let dur = if duration.is_some() { "some" } else { "none" };
879            Some(format!("[{task}] cancelled:{dur}\n").into_bytes())
880        }
881    }
882
883    #[test]
884    fn live_with_presenter_uses_custom_prefix() {
885        let observer = LiveOutputObserver::with_presenter(
886            Vec::<u8>::new(),
887            Vec::<u8>::new(),
888            Arc::new(RecordingPresenter),
889        );
890        let task = task_id_for("lib", "build");
891        observer.on_task_started(&task);
892        observer.on_stdout(&task, b"hello\n");
893        observer.on_task_finished(&task, &completed_for(&task));
894        let (stdout, stderr) = observer.into_inner();
895        assert_eq!(stdout, b"<lib:build> hello\n");
896        // Summary line goes to stderr, not stdout; non-zero
897        // duration was observed by the presenter.
898        assert_eq!(stderr, b"[lib:build] completed:fresh:+\n");
899    }
900
901    #[test]
902    fn live_completed_summary_routed_to_stderr_after_partial_flush() {
903        let observer = LiveOutputObserver::with_presenter(
904            Vec::<u8>::new(),
905            Vec::<u8>::new(),
906            Arc::new(RecordingPresenter),
907        );
908        let task = task_id_for("lib", "build");
909        observer.on_task_started(&task);
910        observer.on_stdout(&task, b"trailing-no-newline");
911        observer.on_task_finished(&task, &completed_for(&task));
912        let (stdout, stderr) = observer.into_inner();
913        // Partial line flushed to stdout with appended `\n`...
914        assert_eq!(stdout, b"<lib:build> trailing-no-newline\n");
915        // ...and the summary line lands on stderr.
916        assert_eq!(stderr, b"[lib:build] completed:fresh:+\n");
917    }
918
919    #[test]
920    fn live_skip_emits_summary_line_without_accumulator() {
921        let observer = LiveOutputObserver::with_presenter(
922            Vec::<u8>::new(),
923            Vec::<u8>::new(),
924            Arc::new(RecordingPresenter),
925        );
926        let task = task_id_for("lib", "down");
927        let upstream = task_id_for("lib", "root");
928        observer.on_task_skipped(&task, &upstream_failed_for(&task, &upstream));
929        let (stdout, stderr) = observer.into_inner();
930        assert!(stdout.is_empty());
931        assert_eq!(stderr, b"[lib:down] skipped\n");
932    }
933
934    #[test]
935    fn live_cancel_signaled_in_flight_carries_duration() {
936        let observer = LiveOutputObserver::with_presenter(
937            Vec::<u8>::new(),
938            Vec::<u8>::new(),
939            Arc::new(RecordingPresenter),
940        );
941        let task = task_id_for("lib", "build");
942        observer.on_task_started(&task);
943        observer.on_task_cancelled(&task, &signaled_in_flight_for(&task));
944        let (_, stderr) = observer.into_inner();
945        assert_eq!(stderr, b"[lib:build] cancelled:some\n");
946    }
947
948    #[test]
949    fn live_cancel_run_cancelled_carries_no_duration() {
950        let observer = LiveOutputObserver::with_presenter(
951            Vec::<u8>::new(),
952            Vec::<u8>::new(),
953            Arc::new(RecordingPresenter),
954        );
955        let task = task_id_for("lib", "drained");
956        observer.on_task_cancelled(&task, &run_cancelled_for(&task));
957        let (_, stderr) = observer.into_inner();
958        assert_eq!(stderr, b"[lib:drained] cancelled:none\n");
959    }
960
961    #[test]
962    fn buffered_with_presenter_emits_summary_after_blocks() {
963        let observer = BufferedOutputObserver::with_presenter(
964            Vec::<u8>::new(),
965            Vec::<u8>::new(),
966            Arc::new(RecordingPresenter),
967        );
968        let task = task_id_for("lib", "build");
969        observer.on_task_started(&task);
970        observer.on_stdout(&task, b"out");
971        observer.on_stderr(&task, b"err");
972        observer.on_task_finished(&task, &completed_for(&task));
973        let (stdout, stderr) = observer.into_inner();
974        // Buffered mode emits stdout verbatim with NO prefix...
975        assert_eq!(stdout, b"out");
976        // ...then writes stderr block FOLLOWED BY the summary line
977        // to the same stderr sink.
978        assert_eq!(stderr, b"err[lib:build] completed:fresh:+\n");
979    }
980
981    #[test]
982    fn buffered_skip_emits_summary_line() {
983        let observer = BufferedOutputObserver::with_presenter(
984            Vec::<u8>::new(),
985            Vec::<u8>::new(),
986            Arc::new(RecordingPresenter),
987        );
988        let task = task_id_for("lib", "down");
989        let upstream = task_id_for("lib", "root");
990        observer.on_task_skipped(&task, &upstream_failed_for(&task, &upstream));
991        let (_, stderr) = observer.into_inner();
992        assert_eq!(stderr, b"[lib:down] skipped\n");
993    }
994
995    #[test]
996    fn buffered_cancel_signaled_carries_duration() {
997        let observer = BufferedOutputObserver::with_presenter(
998            Vec::<u8>::new(),
999            Vec::<u8>::new(),
1000            Arc::new(RecordingPresenter),
1001        );
1002        let task = task_id_for("lib", "build");
1003        observer.on_task_started(&task);
1004        observer.on_task_cancelled(&task, &signaled_in_flight_for(&task));
1005        let (_, stderr) = observer.into_inner();
1006        assert_eq!(stderr, b"[lib:build] cancelled:some\n");
1007    }
1008}