use std::collections::BTreeMap;
use std::io::{self, Write};
use std::sync::{Arc, Mutex, PoisonError};
use std::time::Instant;
use haz_domain::task_id::TaskId;
use crate::presenter::{PlainPresenter, TaskPresenter};
use crate::run_task::{CancelledRecord, CompletedRecord, RunObserver, SkipRecord};
/// Run observer that forwards task output while the task is still running.
///
/// Output is written one complete line at a time, each line preceded by the
/// presenter's prefix for the originating task. Bytes that do not yet end in
/// `\n` are held back per task until the line completes or the task ends.
pub struct LiveOutputObserver<O: Write + Send, E: Write + Send> {
    /// Both sinks and the per-task partial-line buffers, guarded by a single lock.
    state: Mutex<LiveState<O, E>>,
    /// Supplies the line prefix and the optional per-task summary lines.
    presenter: Arc<dyn TaskPresenter>,
}
/// Mutable state of a `LiveOutputObserver`, kept behind its mutex.
struct LiveState<O, E> {
/// Sink that receives the prefixed stdout lines.
stdout: O,
/// Sink that receives the prefixed stderr lines and the presenter's summary lines.
stderr: E,
/// Per-task partial-line buffers and start times. An entry is inserted by
/// `on_task_started` and removed when the task finishes or is cancelled in flight.
accumulators: BTreeMap<TaskId, LiveAccumulator>,
}
/// Per-task bookkeeping for the live observer.
struct LiveAccumulator {
/// Stdout bytes received after the most recent `\n`, i.e. the not-yet-emitted line tail.
stdout_partial: Vec<u8>,
/// Stderr bytes received after the most recent `\n`, i.e. the not-yet-emitted line tail.
stderr_partial: Vec<u8>,
/// Moment the accumulator was created; used to compute the duration passed to the presenter.
started_at: Instant,
}
impl LiveAccumulator {
    /// Creates an accumulator with empty partial-line buffers, timestamped at
    /// the moment of the call.
    fn new() -> Self {
        let started_at = Instant::now();
        let (stdout_partial, stderr_partial) = (Vec::new(), Vec::new());
        Self {
            stdout_partial,
            stderr_partial,
            started_at,
        }
    }
}
impl<O, E> LiveOutputObserver<O, E>
where
    O: Write + Send,
    E: Write + Send,
{
    /// Builds a live observer over the two sinks using [`PlainPresenter`].
    pub fn new(stdout: O, stderr: E) -> Self {
        let presenter: Arc<dyn TaskPresenter> = Arc::new(PlainPresenter);
        Self::with_presenter(stdout, stderr, presenter)
    }

    /// Builds a live observer over the two sinks with a caller-supplied
    /// presenter. No task is tracked until `on_task_started` is called.
    pub fn with_presenter(stdout: O, stderr: E, presenter: Arc<dyn TaskPresenter>) -> Self {
        let initial = LiveState {
            stdout,
            stderr,
            accumulators: BTreeMap::new(),
        };
        Self {
            state: Mutex::new(initial),
            presenter,
        }
    }

    /// Consumes the observer and hands back `(stdout, stderr)`.
    ///
    /// A poisoned lock is recovered from rather than propagated. Any partial
    /// lines still buffered for unfinished tasks are dropped, not written.
    pub fn into_inner(self) -> (O, E) {
        let LiveState { stdout, stderr, .. } = match self.state.into_inner() {
            Ok(state) => state,
            Err(poisoned) => poisoned.into_inner(),
        };
        (stdout, stderr)
    }
}
impl<O, E> RunObserver for LiveOutputObserver<O, E>
where
O: Write + Send,
E: Write + Send,
{
fn on_task_started(&self, task: &TaskId) {
let mut state = self.state.lock().unwrap_or_else(PoisonError::into_inner);
state
.accumulators
.insert(task.clone(), LiveAccumulator::new());
}
fn on_stdout(&self, task: &TaskId, bytes: &[u8]) {
if bytes.is_empty() {
return;
}
let mut state = self.state.lock().unwrap_or_else(PoisonError::into_inner);
let Some(accumulator) = state.accumulators.get_mut(task) else {
return;
};
let mut partial = std::mem::take(&mut accumulator.stdout_partial);
partial.extend_from_slice(bytes);
let (complete, remainder) = split_lines(&partial);
let _ = emit_lines(&mut state.stdout, self.presenter.as_ref(), task, complete);
if let Some(accumulator) = state.accumulators.get_mut(task) {
accumulator.stdout_partial = remainder;
}
}
fn on_stderr(&self, task: &TaskId, bytes: &[u8]) {
if bytes.is_empty() {
return;
}
let mut state = self.state.lock().unwrap_or_else(PoisonError::into_inner);
let Some(accumulator) = state.accumulators.get_mut(task) else {
return;
};
let mut partial = std::mem::take(&mut accumulator.stderr_partial);
partial.extend_from_slice(bytes);
let (complete, remainder) = split_lines(&partial);
let _ = emit_lines(&mut state.stderr, self.presenter.as_ref(), task, complete);
if let Some(accumulator) = state.accumulators.get_mut(task) {
accumulator.stderr_partial = remainder;
}
}
fn on_task_finished(&self, task: &TaskId, record: &CompletedRecord) {
let mut state = self.state.lock().unwrap_or_else(PoisonError::into_inner);
let Some(accumulator) = state.accumulators.remove(task) else {
return;
};
flush_live_partials(&mut state, self.presenter.as_ref(), task, &accumulator);
let duration = accumulator.started_at.elapsed();
if let Some(bytes) = self.presenter.summary_completed(task, record, duration) {
let _ = state.stderr.write_all(&bytes);
}
}
fn on_task_skipped(&self, task: &TaskId, record: &SkipRecord) {
let mut state = self.state.lock().unwrap_or_else(PoisonError::into_inner);
if let Some(bytes) = self.presenter.summary_skipped(task, record) {
let _ = state.stderr.write_all(&bytes);
}
}
fn on_task_cancelled(&self, task: &TaskId, record: &CancelledRecord) {
let mut state = self.state.lock().unwrap_or_else(PoisonError::into_inner);
let duration = match record {
CancelledRecord::SignaledInFlight { .. } => {
state.accumulators.remove(task).map(|acc| {
flush_live_partials(&mut state, self.presenter.as_ref(), task, &acc);
acc.started_at.elapsed()
})
}
CancelledRecord::UpstreamCancelled { .. } | CancelledRecord::RunCancelled { .. } => {
None
}
};
if let Some(bytes) = self.presenter.summary_cancelled(task, record, duration) {
let _ = state.stderr.write_all(&bytes);
}
}
}
fn flush_live_partials<O, E>(
state: &mut LiveState<O, E>,
presenter: &dyn TaskPresenter,
task: &TaskId,
accumulator: &LiveAccumulator,
) where
O: Write + Send,
E: Write + Send,
{
if !accumulator.stdout_partial.is_empty() {
let _ = emit_lines(
&mut state.stdout,
presenter,
task,
vec![accumulator.stdout_partial.clone()],
);
}
if !accumulator.stderr_partial.is_empty() {
let _ = emit_lines(
&mut state.stderr,
presenter,
task,
vec![accumulator.stderr_partial.clone()],
);
}
}
/// Run observer that holds back each task's output until the task ends.
///
/// Stdout and stderr bytes are accumulated per task and written verbatim
/// (no prefix, no added newline) as one block per stream when the task
/// finishes or is cancelled in flight.
pub struct BufferedOutputObserver<O: Write + Send, E: Write + Send> {
    /// Both sinks and the per-task byte buffers, guarded by a single lock.
    state: Mutex<BufferedState<O, E>>,
    /// Supplies the optional per-task summary lines.
    presenter: Arc<dyn TaskPresenter>,
}
/// Mutable state of a `BufferedOutputObserver`, kept behind its mutex.
struct BufferedState<O, E> {
/// Sink that receives each task's buffered stdout block.
stdout: O,
/// Sink that receives each task's buffered stderr block and the presenter's summary lines.
stderr: E,
/// Per-task output buffers and start times. An entry is inserted by
/// `on_task_started` and removed when the task finishes or is cancelled in flight.
accumulators: BTreeMap<TaskId, BufferedAccumulator>,
}
/// Per-task bookkeeping for the buffered observer.
struct BufferedAccumulator {
/// Every stdout byte received for the task so far, in arrival order.
stdout: Vec<u8>,
/// Every stderr byte received for the task so far, in arrival order.
stderr: Vec<u8>,
/// Moment the accumulator was created; used to compute the duration passed to the presenter.
started_at: Instant,
}
impl BufferedAccumulator {
    /// Creates an accumulator with empty stream buffers, timestamped at the
    /// moment of the call.
    fn new() -> Self {
        let started_at = Instant::now();
        let (stdout, stderr) = (Vec::new(), Vec::new());
        Self {
            stdout,
            stderr,
            started_at,
        }
    }
}
impl<O, E> BufferedOutputObserver<O, E>
where
    O: Write + Send,
    E: Write + Send,
{
    /// Builds a buffered observer over the two sinks using [`PlainPresenter`].
    pub fn new(stdout: O, stderr: E) -> Self {
        let presenter: Arc<dyn TaskPresenter> = Arc::new(PlainPresenter);
        Self::with_presenter(stdout, stderr, presenter)
    }

    /// Builds a buffered observer over the two sinks with a caller-supplied
    /// presenter. No task is tracked until `on_task_started` is called.
    pub fn with_presenter(stdout: O, stderr: E, presenter: Arc<dyn TaskPresenter>) -> Self {
        let initial = BufferedState {
            stdout,
            stderr,
            accumulators: BTreeMap::new(),
        };
        Self {
            state: Mutex::new(initial),
            presenter,
        }
    }

    /// Consumes the observer and hands back `(stdout, stderr)`.
    ///
    /// A poisoned lock is recovered from rather than propagated. Output still
    /// buffered for unfinished tasks is dropped, not written.
    pub fn into_inner(self) -> (O, E) {
        let BufferedState { stdout, stderr, .. } = match self.state.into_inner() {
            Ok(state) => state,
            Err(poisoned) => poisoned.into_inner(),
        };
        (stdout, stderr)
    }
}
/// Buffered behaviour: output is collected per task and only written when the
/// task ends. Write errors on the sinks are ignored, and a poisoned lock is
/// recovered from instead of panicking.
impl<O, E> RunObserver for BufferedOutputObserver<O, E>
where
    O: Write + Send,
    E: Write + Send,
{
    /// Starts tracking `task`, replacing any accumulator already registered
    /// under the same id.
    fn on_task_started(&self, task: &TaskId) {
        let mut guard = self.state.lock().unwrap_or_else(PoisonError::into_inner);
        let key = task.clone();
        guard.accumulators.insert(key, BufferedAccumulator::new());
    }

    /// Appends a stdout chunk to the task's buffer; ignored when the chunk is
    /// empty or the task is not being tracked.
    fn on_stdout(&self, task: &TaskId, bytes: &[u8]) {
        if !bytes.is_empty() {
            let mut guard = self.state.lock().unwrap_or_else(PoisonError::into_inner);
            let Some(accumulator) = guard.accumulators.get_mut(task) else {
                return;
            };
            accumulator.stdout.extend_from_slice(bytes);
        }
    }

    /// Appends a stderr chunk to the task's buffer; ignored when the chunk is
    /// empty or the task is not being tracked.
    fn on_stderr(&self, task: &TaskId, bytes: &[u8]) {
        if !bytes.is_empty() {
            let mut guard = self.state.lock().unwrap_or_else(PoisonError::into_inner);
            let Some(accumulator) = guard.accumulators.get_mut(task) else {
                return;
            };
            accumulator.stderr.extend_from_slice(bytes);
        }
    }

    /// Stops tracking `task`: writes its buffered blocks, then the presenter's
    /// completion summary (if any) to the stderr sink. Does nothing for a task
    /// that was never started.
    fn on_task_finished(&self, task: &TaskId, record: &CompletedRecord) {
        let mut guard = self.state.lock().unwrap_or_else(PoisonError::into_inner);
        if let Some(accumulator) = guard.accumulators.remove(task) {
            flush_buffered_accumulator(&mut guard, &accumulator);
            let elapsed = accumulator.started_at.elapsed();
            if let Some(summary) = self.presenter.summary_completed(task, record, elapsed) {
                let _ = guard.stderr.write_all(&summary);
            }
        }
    }

    /// Writes the presenter's skip summary (if any) to the stderr sink.
    fn on_task_skipped(&self, task: &TaskId, record: &SkipRecord) {
        let mut guard = self.state.lock().unwrap_or_else(PoisonError::into_inner);
        let Some(summary) = self.presenter.summary_skipped(task, record) else {
            return;
        };
        let _ = guard.stderr.write_all(&summary);
    }

    /// Writes the presenter's cancellation summary (if any) to the stderr sink.
    ///
    /// Only `SignaledInFlight` touches the accumulator: it is removed, its
    /// buffered blocks are written, and the elapsed time since start is passed
    /// to the presenter. The other variants pass `None` as the duration.
    fn on_task_cancelled(&self, task: &TaskId, record: &CancelledRecord) {
        let mut guard = self.state.lock().unwrap_or_else(PoisonError::into_inner);
        let duration = match record {
            CancelledRecord::UpstreamCancelled { .. } | CancelledRecord::RunCancelled { .. } => {
                None
            }
            CancelledRecord::SignaledInFlight { .. } => {
                if let Some(accumulator) = guard.accumulators.remove(task) {
                    flush_buffered_accumulator(&mut guard, &accumulator);
                    Some(accumulator.started_at.elapsed())
                } else {
                    None
                }
            }
        };
        let Some(summary) = self.presenter.summary_cancelled(task, record, duration) else {
            return;
        };
        let _ = guard.stderr.write_all(&summary);
    }
}
fn flush_buffered_accumulator<O, E>(
state: &mut BufferedState<O, E>,
accumulator: &BufferedAccumulator,
) where
O: Write + Send,
E: Write + Send,
{
if !accumulator.stdout.is_empty() {
let _ = state.stdout.write_all(&accumulator.stdout);
}
if !accumulator.stderr.is_empty() {
let _ = state.stderr.write_all(&accumulator.stderr);
}
}
/// Splits `bytes` at every `\n` into complete lines plus the unterminated tail.
///
/// Returns `(lines, remainder)`. `lines` holds each terminated line without
/// its `\n` (a bare `\n` yields an empty line). `remainder` is whatever
/// follows the last `\n`, and is empty when the input ends with a terminator
/// or is itself empty. `\r` is not treated specially.
fn split_lines(bytes: &[u8]) -> (Vec<Vec<u8>>, Vec<u8>) {
    // Splitting on the terminator always yields at least one segment, and the
    // final segment is exactly the bytes after the last `\n`.
    let mut segments: Vec<Vec<u8>> = bytes
        .split(|&byte| byte == b'\n')
        .map(<[u8]>::to_vec)
        .collect();
    let remainder = segments.pop().unwrap_or_default();
    (segments, remainder)
}
/// Writes each entry of `lines` to `sink` as `prefix + line + "\n"`, where the
/// prefix is obtained once from `presenter` for `task`.
///
/// Every line is assembled in memory and handed to the sink in a single
/// `write_all` call. Stops at, and returns, the first write error; lines
/// after the failing one are not attempted.
fn emit_lines<W: Write>(
    sink: &mut W,
    presenter: &dyn TaskPresenter,
    task: &TaskId,
    lines: Vec<Vec<u8>>,
) -> io::Result<()> {
    let tag = presenter.prefix(task);
    lines.into_iter().try_for_each(|body| {
        let framed = [&tag[..], &body[..], &b"\n"[..]].concat();
        sink.write_all(&framed)
    })
}
// Unit tests for both observers and for `split_lines`. Every test uses
// in-memory `Vec<u8>` sinks and inspects them through `into_inner`.
#[cfg(test)]
mod tests {
use std::str::FromStr;
use haz_domain::name::{ProjectName, TaskName};
use super::*;
use crate::run_task::{RunSource, RunState};
// Builds a `TaskId` from string names; panics if either name fails to parse.
fn task_id_for(project: &str, task: &str) -> TaskId {
TaskId {
project: ProjectName::from_str(project).unwrap(),
task: TaskName::from_str(task).unwrap(),
}
}
// A successful fresh-run record for `task` with placeholder hashes and no outputs.
fn completed_for(task: &TaskId) -> CompletedRecord {
CompletedRecord {
task: task.clone(),
source: RunSource::FreshRun,
state: RunState::Succeeded,
exit_status: None,
stdout_hash: [0u8; 32],
stderr_hash: [0u8; 32],
materialised_outputs: Vec::new(),
}
}
// A cancellation record for a task that was signalled while running.
// NOTE(review): `std::os::unix` only exists on Unix targets, so this helper
// and the tests that use it will not compile elsewhere - confirm the crate
// is Unix-only or gate these tests.
fn signaled_in_flight_for(task: &TaskId) -> CancelledRecord {
let status: std::process::ExitStatus = std::os::unix::process::ExitStatusExt::from_raw(0);
CancelledRecord::SignaledInFlight {
task: task.clone(),
exit_status: status,
stdout_hash: [0u8; 32],
stderr_hash: [0u8; 32],
}
}
// A cancellation record for a task cancelled because `upstream` was cancelled.
fn upstream_cancelled_for(task: &TaskId, upstream: &TaskId) -> CancelledRecord {
CancelledRecord::UpstreamCancelled {
task: task.clone(),
upstream: upstream.clone(),
}
}
// A cancellation record for a task cancelled together with the whole run.
fn run_cancelled_for(task: &TaskId) -> CancelledRecord {
CancelledRecord::RunCancelled { task: task.clone() }
}
// A skip record for a task skipped because `upstream` failed.
fn upstream_failed_for(task: &TaskId, upstream: &TaskId) -> SkipRecord {
SkipRecord {
task: task.clone(),
cause: crate::run_task::SkipCause::UpstreamFailed {
upstream: upstream.clone(),
},
}
}
// ---- Live observer with the default presenter ----
// These tests expect the default presenter to prefix lines with
// `[project:task] ` and to emit no summary lines.
#[test]
fn live_emits_complete_line_with_tag_prefix() {
let observer = LiveOutputObserver::new(Vec::<u8>::new(), Vec::<u8>::new());
let task = task_id_for("lib", "build");
observer.on_task_started(&task);
observer.on_stdout(&task, b"hello world\n");
observer.on_task_finished(&task, &completed_for(&task));
let (stdout, stderr) = observer.into_inner();
assert_eq!(stdout, b"[lib:build] hello world\n");
assert!(stderr.is_empty());
}
#[test]
fn live_splits_multi_line_chunk_on_newlines() {
let observer = LiveOutputObserver::new(Vec::<u8>::new(), Vec::<u8>::new());
let task = task_id_for("lib", "build");
observer.on_task_started(&task);
observer.on_stdout(&task, b"one\ntwo\nthree\n");
observer.on_task_finished(&task, &completed_for(&task));
let (stdout, _) = observer.into_inner();
assert_eq!(
stdout,
b"[lib:build] one\n[lib:build] two\n[lib:build] three\n"
);
}
// A line delivered in several chunks must come out as one prefixed line.
#[test]
fn live_joins_line_across_chunks() {
let observer = LiveOutputObserver::new(Vec::<u8>::new(), Vec::<u8>::new());
let task = task_id_for("lib", "build");
observer.on_task_started(&task);
observer.on_stdout(&task, b"hel");
observer.on_stdout(&task, b"lo wor");
observer.on_stdout(&task, b"ld\n");
observer.on_task_finished(&task, &completed_for(&task));
let (stdout, _) = observer.into_inner();
assert_eq!(stdout, b"[lib:build] hello world\n");
}
// An unterminated tail is flushed on finish with a newline appended.
#[test]
fn live_flushes_partial_line_with_newline_on_finish() {
let observer = LiveOutputObserver::new(Vec::<u8>::new(), Vec::<u8>::new());
let task = task_id_for("lib", "build");
observer.on_task_started(&task);
observer.on_stdout(&task, b"complete\npartial");
observer.on_task_finished(&task, &completed_for(&task));
let (stdout, _) = observer.into_inner();
assert_eq!(stdout, b"[lib:build] complete\n[lib:build] partial\n");
}
#[test]
fn live_flushes_partial_line_on_cancel_signaled_in_flight() {
let observer = LiveOutputObserver::new(Vec::<u8>::new(), Vec::<u8>::new());
let task = task_id_for("lib", "build");
observer.on_task_started(&task);
observer.on_stdout(&task, b"error: panic at");
observer.on_task_cancelled(&task, &signaled_in_flight_for(&task));
let (stdout, _) = observer.into_inner();
assert_eq!(stdout, b"[lib:build] error: panic at\n");
}
#[test]
fn live_stderr_routed_to_stderr_sink_with_prefix() {
let observer = LiveOutputObserver::new(Vec::<u8>::new(), Vec::<u8>::new());
let task = task_id_for("lib", "build");
observer.on_task_started(&task);
observer.on_stderr(&task, b"warning: deprecated\n");
observer.on_task_finished(&task, &completed_for(&task));
let (stdout, stderr) = observer.into_inner();
assert!(stdout.is_empty());
assert_eq!(stderr, b"[lib:build] warning: deprecated\n");
}
#[test]
fn live_empty_chunk_is_noop() {
let observer = LiveOutputObserver::new(Vec::<u8>::new(), Vec::<u8>::new());
let task = task_id_for("lib", "build");
observer.on_task_started(&task);
observer.on_stdout(&task, b"");
observer.on_stderr(&task, b"");
observer.on_task_finished(&task, &completed_for(&task));
let (stdout, stderr) = observer.into_inner();
assert!(stdout.is_empty());
assert!(stderr.is_empty());
}
// A lone `\n` is a complete (empty) line and still gets the prefix.
#[test]
fn live_bare_newline_byte_emits_empty_tagged_line() {
let observer = LiveOutputObserver::new(Vec::<u8>::new(), Vec::<u8>::new());
let task = task_id_for("lib", "build");
observer.on_task_started(&task);
observer.on_stdout(&task, b"\n");
observer.on_task_finished(&task, &completed_for(&task));
let (stdout, _) = observer.into_inner();
assert_eq!(stdout, b"[lib:build] \n");
}
// The next three tests deliver a terminal event for a task that was never
// started; with the default presenter nothing may be written.
#[test]
fn live_cancel_upstream_is_noop_when_no_accumulator() {
let observer = LiveOutputObserver::new(Vec::<u8>::new(), Vec::<u8>::new());
let task = task_id_for("lib", "downstream");
let upstream = task_id_for("lib", "root");
observer.on_task_cancelled(&task, &upstream_cancelled_for(&task, &upstream));
let (stdout, stderr) = observer.into_inner();
assert!(stdout.is_empty());
assert!(stderr.is_empty());
}
#[test]
fn live_cancel_run_cancelled_is_noop_when_no_accumulator() {
let observer = LiveOutputObserver::new(Vec::<u8>::new(), Vec::<u8>::new());
let task = task_id_for("lib", "drained");
observer.on_task_cancelled(&task, &run_cancelled_for(&task));
let (stdout, stderr) = observer.into_inner();
assert!(stdout.is_empty());
assert!(stderr.is_empty());
}
#[test]
fn live_skipped_is_noop_when_no_accumulator() {
let observer = LiveOutputObserver::new(Vec::<u8>::new(), Vec::<u8>::new());
let task = task_id_for("lib", "downstream");
let upstream = task_id_for("lib", "root");
observer.on_task_skipped(&task, &upstream_failed_for(&task, &upstream));
let (stdout, stderr) = observer.into_inner();
assert!(stdout.is_empty());
assert!(stderr.is_empty());
}
// Lines from concurrently tracked tasks appear in arrival order, whole line by whole line.
#[test]
fn live_two_tasks_lines_interleave_at_line_granularity() {
let observer = LiveOutputObserver::new(Vec::<u8>::new(), Vec::<u8>::new());
let a = task_id_for("lib", "a");
let b = task_id_for("lib", "b");
observer.on_task_started(&a);
observer.on_task_started(&b);
observer.on_stdout(&a, b"alpha\n");
observer.on_stdout(&b, b"beta\n");
observer.on_stdout(&a, b"gamma\n");
observer.on_task_finished(&a, &completed_for(&a));
observer.on_task_finished(&b, &completed_for(&b));
let (stdout, _) = observer.into_inner();
assert_eq!(stdout, b"[lib:a] alpha\n[lib:b] beta\n[lib:a] gamma\n");
}
// ---- Buffered observer with the default presenter ----
// Buffered output is written verbatim: no prefix and no newline is added.
#[test]
fn buffered_emits_block_on_finish_no_prefix() {
let observer = BufferedOutputObserver::new(Vec::<u8>::new(), Vec::<u8>::new());
let task = task_id_for("lib", "build");
observer.on_task_started(&task);
observer.on_stdout(&task, b"hello world\nno-newline-tail");
observer.on_task_finished(&task, &completed_for(&task));
let (stdout, _) = observer.into_inner();
assert_eq!(stdout, b"hello world\nno-newline-tail");
}
#[test]
fn buffered_accumulates_across_chunks() {
let observer = BufferedOutputObserver::new(Vec::<u8>::new(), Vec::<u8>::new());
let task = task_id_for("lib", "build");
observer.on_task_started(&task);
observer.on_stdout(&task, b"abc");
observer.on_stdout(&task, b"def");
observer.on_stdout(&task, b"ghi");
observer.on_task_finished(&task, &completed_for(&task));
let (stdout, _) = observer.into_inner();
assert_eq!(stdout, b"abcdefghi");
}
// NOTE(review): with two separate sinks this checks routing of each stream
// to its own sink; the relative order named in the test title is not
// observable here.
#[test]
fn buffered_emits_stdout_block_before_stderr_block() {
let observer = BufferedOutputObserver::new(Vec::<u8>::new(), Vec::<u8>::new());
let task = task_id_for("lib", "build");
observer.on_task_started(&task);
observer.on_stdout(&task, b"out\n");
observer.on_stderr(&task, b"err\n");
observer.on_task_finished(&task, &completed_for(&task));
let (stdout, stderr) = observer.into_inner();
assert_eq!(stdout, b"out\n");
assert_eq!(stderr, b"err\n");
}
#[test]
fn buffered_cancel_signaled_in_flight_flushes_blocks() {
let observer = BufferedOutputObserver::new(Vec::<u8>::new(), Vec::<u8>::new());
let task = task_id_for("lib", "build");
observer.on_task_started(&task);
observer.on_stdout(&task, b"out-bytes");
observer.on_stderr(&task, b"err-bytes");
observer.on_task_cancelled(&task, &signaled_in_flight_for(&task));
let (stdout, stderr) = observer.into_inner();
assert_eq!(stdout, b"out-bytes");
assert_eq!(stderr, b"err-bytes");
}
#[test]
fn buffered_cancel_upstream_is_noop_when_no_accumulator() {
let observer = BufferedOutputObserver::new(Vec::<u8>::new(), Vec::<u8>::new());
let task = task_id_for("lib", "downstream");
let upstream = task_id_for("lib", "root");
observer.on_task_cancelled(&task, &upstream_cancelled_for(&task, &upstream));
let (stdout, stderr) = observer.into_inner();
assert!(stdout.is_empty());
assert!(stderr.is_empty());
}
#[test]
fn buffered_cancel_run_cancelled_is_noop_when_no_accumulator() {
let observer = BufferedOutputObserver::new(Vec::<u8>::new(), Vec::<u8>::new());
let task = task_id_for("lib", "drained");
observer.on_task_cancelled(&task, &run_cancelled_for(&task));
let (stdout, stderr) = observer.into_inner();
assert!(stdout.is_empty());
assert!(stderr.is_empty());
}
#[test]
fn buffered_skipped_is_noop_when_no_accumulator() {
let observer = BufferedOutputObserver::new(Vec::<u8>::new(), Vec::<u8>::new());
let task = task_id_for("lib", "downstream");
let upstream = task_id_for("lib", "root");
observer.on_task_skipped(&task, &upstream_failed_for(&task, &upstream));
let (stdout, stderr) = observer.into_inner();
assert!(stdout.is_empty());
assert!(stderr.is_empty());
}
#[test]
fn buffered_empty_streams_emit_nothing_no_panic() {
let observer = BufferedOutputObserver::new(Vec::<u8>::new(), Vec::<u8>::new());
let task = task_id_for("lib", "build");
observer.on_task_started(&task);
observer.on_task_finished(&task, &completed_for(&task));
let (stdout, stderr) = observer.into_inner();
assert!(stdout.is_empty());
assert!(stderr.is_empty());
}
// Each task's block is written whole, in finish order, regardless of how
// the chunks were interleaved on arrival.
#[test]
fn buffered_tasks_never_interleave_under_sequential_calls() {
let observer = BufferedOutputObserver::new(Vec::<u8>::new(), Vec::<u8>::new());
let a = task_id_for("lib", "a");
let b = task_id_for("lib", "b");
observer.on_task_started(&a);
observer.on_task_started(&b);
observer.on_stdout(&a, b"AAA");
observer.on_stdout(&b, b"BBB");
observer.on_stdout(&a, b"AAA");
observer.on_task_finished(&a, &completed_for(&a));
observer.on_task_finished(&b, &completed_for(&b));
let (stdout, _) = observer.into_inner();
assert_eq!(stdout, b"AAAAAABBB");
}
// ---- `split_lines` ----
#[test]
fn split_lines_yields_complete_lines_and_trailing_remainder() {
let (lines, remainder) = split_lines(b"alpha\nbeta\ngamma");
assert_eq!(lines, vec![b"alpha".to_vec(), b"beta".to_vec()]);
assert_eq!(remainder, b"gamma".to_vec());
}
#[test]
fn split_lines_terminator_only_input_yields_empty_remainder() {
let (lines, remainder) = split_lines(b"alpha\n");
assert_eq!(lines, vec![b"alpha".to_vec()]);
assert!(remainder.is_empty());
}
#[test]
fn split_lines_empty_input_yields_nothing() {
let (lines, remainder) = split_lines(b"");
assert!(lines.is_empty());
assert!(remainder.is_empty());
}
// ---- Observers with a custom presenter ----
// Test presenter that always returns a summary, so the tests can see which
// summary hook was called and what duration argument it received.
struct RecordingPresenter;
impl TaskPresenter for RecordingPresenter {
// Uses `<task> ` instead of the `[task] ` form the default-presenter tests expect.
fn prefix(&self, task: &TaskId) -> Vec<u8> {
format!("<{task}> ").into_bytes()
}
// Encodes the run source and whether the duration was non-zero ("+") or zero ("0").
fn summary_completed(
&self,
task: &TaskId,
record: &CompletedRecord,
duration: std::time::Duration,
) -> Option<Vec<u8>> {
let kind = match record.source {
crate::run_task::RunSource::CacheHit => "hit",
crate::run_task::RunSource::FreshRun => "fresh",
};
let nonzero = if duration > std::time::Duration::ZERO {
"+"
} else {
"0"
};
Some(format!("[{task}] completed:{kind}:{nonzero}\n").into_bytes())
}
fn summary_skipped(&self, task: &TaskId, _record: &SkipRecord) -> Option<Vec<u8>> {
Some(format!("[{task}] skipped\n").into_bytes())
}
// Encodes only whether a duration was supplied ("some") or not ("none").
fn summary_cancelled(
&self,
task: &TaskId,
_record: &CancelledRecord,
duration: Option<std::time::Duration>,
) -> Option<Vec<u8>> {
let dur = if duration.is_some() { "some" } else { "none" };
Some(format!("[{task}] cancelled:{dur}\n").into_bytes())
}
}
// NOTE(review): the `completed:fresh:+` expectations below require a
// strictly positive elapsed time between start and finish - confirm this
// cannot read as zero on platforms with a coarse monotonic clock.
#[test]
fn live_with_presenter_uses_custom_prefix() {
let observer = LiveOutputObserver::with_presenter(
Vec::<u8>::new(),
Vec::<u8>::new(),
Arc::new(RecordingPresenter),
);
let task = task_id_for("lib", "build");
observer.on_task_started(&task);
observer.on_stdout(&task, b"hello\n");
observer.on_task_finished(&task, &completed_for(&task));
let (stdout, stderr) = observer.into_inner();
assert_eq!(stdout, b"<lib:build> hello\n");
assert_eq!(stderr, b"[lib:build] completed:fresh:+\n");
}
#[test]
fn live_completed_summary_routed_to_stderr_after_partial_flush() {
let observer = LiveOutputObserver::with_presenter(
Vec::<u8>::new(),
Vec::<u8>::new(),
Arc::new(RecordingPresenter),
);
let task = task_id_for("lib", "build");
observer.on_task_started(&task);
observer.on_stdout(&task, b"trailing-no-newline");
observer.on_task_finished(&task, &completed_for(&task));
let (stdout, stderr) = observer.into_inner();
assert_eq!(stdout, b"<lib:build> trailing-no-newline\n");
assert_eq!(stderr, b"[lib:build] completed:fresh:+\n");
}
// The skip summary is written even though the task was never started.
#[test]
fn live_skip_emits_summary_line_without_accumulator() {
let observer = LiveOutputObserver::with_presenter(
Vec::<u8>::new(),
Vec::<u8>::new(),
Arc::new(RecordingPresenter),
);
let task = task_id_for("lib", "down");
let upstream = task_id_for("lib", "root");
observer.on_task_skipped(&task, &upstream_failed_for(&task, &upstream));
let (stdout, stderr) = observer.into_inner();
assert!(stdout.is_empty());
assert_eq!(stderr, b"[lib:down] skipped\n");
}
#[test]
fn live_cancel_signaled_in_flight_carries_duration() {
let observer = LiveOutputObserver::with_presenter(
Vec::<u8>::new(),
Vec::<u8>::new(),
Arc::new(RecordingPresenter),
);
let task = task_id_for("lib", "build");
observer.on_task_started(&task);
observer.on_task_cancelled(&task, &signaled_in_flight_for(&task));
let (_, stderr) = observer.into_inner();
assert_eq!(stderr, b"[lib:build] cancelled:some\n");
}
#[test]
fn live_cancel_run_cancelled_carries_no_duration() {
let observer = LiveOutputObserver::with_presenter(
Vec::<u8>::new(),
Vec::<u8>::new(),
Arc::new(RecordingPresenter),
);
let task = task_id_for("lib", "drained");
observer.on_task_cancelled(&task, &run_cancelled_for(&task));
let (_, stderr) = observer.into_inner();
assert_eq!(stderr, b"[lib:drained] cancelled:none\n");
}
// The summary follows the task's stderr block directly; since the block has
// no trailing newline the two run together in the sink.
#[test]
fn buffered_with_presenter_emits_summary_after_blocks() {
let observer = BufferedOutputObserver::with_presenter(
Vec::<u8>::new(),
Vec::<u8>::new(),
Arc::new(RecordingPresenter),
);
let task = task_id_for("lib", "build");
observer.on_task_started(&task);
observer.on_stdout(&task, b"out");
observer.on_stderr(&task, b"err");
observer.on_task_finished(&task, &completed_for(&task));
let (stdout, stderr) = observer.into_inner();
assert_eq!(stdout, b"out");
assert_eq!(stderr, b"err[lib:build] completed:fresh:+\n");
}
#[test]
fn buffered_skip_emits_summary_line() {
let observer = BufferedOutputObserver::with_presenter(
Vec::<u8>::new(),
Vec::<u8>::new(),
Arc::new(RecordingPresenter),
);
let task = task_id_for("lib", "down");
let upstream = task_id_for("lib", "root");
observer.on_task_skipped(&task, &upstream_failed_for(&task, &upstream));
let (_, stderr) = observer.into_inner();
assert_eq!(stderr, b"[lib:down] skipped\n");
}
#[test]
fn buffered_cancel_signaled_carries_duration() {
let observer = BufferedOutputObserver::with_presenter(
Vec::<u8>::new(),
Vec::<u8>::new(),
Arc::new(RecordingPresenter),
);
let task = task_id_for("lib", "build");
observer.on_task_started(&task);
observer.on_task_cancelled(&task, &signaled_in_flight_for(&task));
let (_, stderr) = observer.into_inner();
assert_eq!(stderr, b"[lib:build] cancelled:some\n");
}
}