1use std::collections::BTreeMap;
56use std::io::{self, Write};
57use std::sync::{Arc, Mutex, PoisonError};
58use std::time::Instant;
59
60use haz_domain::task_id::TaskId;
61
62use crate::presenter::{PlainPresenter, TaskPresenter};
63use crate::run_task::{CancelledRecord, CompletedRecord, RunObserver, SkipRecord};
64
65pub struct LiveOutputObserver<O, E>
82where
83 O: Write + Send,
84 E: Write + Send,
85{
86 state: Mutex<LiveState<O, E>>,
87 presenter: Arc<dyn TaskPresenter>,
88}
89
90struct LiveState<O, E> {
91 stdout: O,
92 stderr: E,
93 accumulators: BTreeMap<TaskId, LiveAccumulator>,
94}
95
96struct LiveAccumulator {
97 stdout_partial: Vec<u8>,
98 stderr_partial: Vec<u8>,
99 started_at: Instant,
100}
101
102impl LiveAccumulator {
103 fn new() -> Self {
104 Self {
105 stdout_partial: Vec::new(),
106 stderr_partial: Vec::new(),
107 started_at: Instant::now(),
108 }
109 }
110}
111
112impl<O, E> LiveOutputObserver<O, E>
113where
114 O: Write + Send,
115 E: Write + Send,
116{
117 pub fn new(stdout: O, stderr: E) -> Self {
123 Self::with_presenter(stdout, stderr, Arc::new(PlainPresenter))
124 }
125
126 pub fn with_presenter(stdout: O, stderr: E, presenter: Arc<dyn TaskPresenter>) -> Self {
133 Self {
134 state: Mutex::new(LiveState {
135 stdout,
136 stderr,
137 accumulators: BTreeMap::new(),
138 }),
139 presenter,
140 }
141 }
142
143 pub fn into_inner(self) -> (O, E) {
147 let state = self
148 .state
149 .into_inner()
150 .unwrap_or_else(PoisonError::into_inner);
151 (state.stdout, state.stderr)
152 }
153}
154
155impl<O, E> RunObserver for LiveOutputObserver<O, E>
156where
157 O: Write + Send,
158 E: Write + Send,
159{
160 fn on_task_started(&self, task: &TaskId) {
161 let mut state = self.state.lock().unwrap_or_else(PoisonError::into_inner);
162 state
163 .accumulators
164 .insert(task.clone(), LiveAccumulator::new());
165 }
166
167 fn on_stdout(&self, task: &TaskId, bytes: &[u8]) {
168 if bytes.is_empty() {
169 return;
170 }
171 let mut state = self.state.lock().unwrap_or_else(PoisonError::into_inner);
172 let Some(accumulator) = state.accumulators.get_mut(task) else {
173 return;
174 };
175 let mut partial = std::mem::take(&mut accumulator.stdout_partial);
176 partial.extend_from_slice(bytes);
177 let (complete, remainder) = split_lines(&partial);
178 let _ = emit_lines(&mut state.stdout, self.presenter.as_ref(), task, complete);
179 if let Some(accumulator) = state.accumulators.get_mut(task) {
180 accumulator.stdout_partial = remainder;
181 }
182 }
183
184 fn on_stderr(&self, task: &TaskId, bytes: &[u8]) {
185 if bytes.is_empty() {
186 return;
187 }
188 let mut state = self.state.lock().unwrap_or_else(PoisonError::into_inner);
189 let Some(accumulator) = state.accumulators.get_mut(task) else {
190 return;
191 };
192 let mut partial = std::mem::take(&mut accumulator.stderr_partial);
193 partial.extend_from_slice(bytes);
194 let (complete, remainder) = split_lines(&partial);
195 let _ = emit_lines(&mut state.stderr, self.presenter.as_ref(), task, complete);
196 if let Some(accumulator) = state.accumulators.get_mut(task) {
197 accumulator.stderr_partial = remainder;
198 }
199 }
200
201 fn on_task_finished(&self, task: &TaskId, record: &CompletedRecord) {
202 let mut state = self.state.lock().unwrap_or_else(PoisonError::into_inner);
203 let Some(accumulator) = state.accumulators.remove(task) else {
204 return;
205 };
206 flush_live_partials(&mut state, self.presenter.as_ref(), task, &accumulator);
207 let duration = accumulator.started_at.elapsed();
208 if let Some(bytes) = self.presenter.summary_completed(task, record, duration) {
209 let _ = state.stderr.write_all(&bytes);
210 }
211 }
212
213 fn on_task_skipped(&self, task: &TaskId, record: &SkipRecord) {
214 let mut state = self.state.lock().unwrap_or_else(PoisonError::into_inner);
217 if let Some(bytes) = self.presenter.summary_skipped(task, record) {
218 let _ = state.stderr.write_all(&bytes);
219 }
220 }
221
222 fn on_task_cancelled(&self, task: &TaskId, record: &CancelledRecord) {
223 let mut state = self.state.lock().unwrap_or_else(PoisonError::into_inner);
224 let duration = match record {
225 CancelledRecord::SignaledInFlight { .. } => {
226 state.accumulators.remove(task).map(|acc| {
227 flush_live_partials(&mut state, self.presenter.as_ref(), task, &acc);
228 acc.started_at.elapsed()
229 })
230 }
231 CancelledRecord::UpstreamCancelled { .. } | CancelledRecord::RunCancelled { .. } => {
232 None
235 }
236 };
237 if let Some(bytes) = self.presenter.summary_cancelled(task, record, duration) {
238 let _ = state.stderr.write_all(&bytes);
239 }
240 }
241}
242
243fn flush_live_partials<O, E>(
253 state: &mut LiveState<O, E>,
254 presenter: &dyn TaskPresenter,
255 task: &TaskId,
256 accumulator: &LiveAccumulator,
257) where
258 O: Write + Send,
259 E: Write + Send,
260{
261 if !accumulator.stdout_partial.is_empty() {
262 let _ = emit_lines(
263 &mut state.stdout,
264 presenter,
265 task,
266 vec![accumulator.stdout_partial.clone()],
267 );
268 }
269 if !accumulator.stderr_partial.is_empty() {
270 let _ = emit_lines(
271 &mut state.stderr,
272 presenter,
273 task,
274 vec![accumulator.stderr_partial.clone()],
275 );
276 }
277}
278
279pub struct BufferedOutputObserver<O, E>
290where
291 O: Write + Send,
292 E: Write + Send,
293{
294 state: Mutex<BufferedState<O, E>>,
295 presenter: Arc<dyn TaskPresenter>,
296}
297
298struct BufferedState<O, E> {
299 stdout: O,
300 stderr: E,
301 accumulators: BTreeMap<TaskId, BufferedAccumulator>,
302}
303
304struct BufferedAccumulator {
305 stdout: Vec<u8>,
306 stderr: Vec<u8>,
307 started_at: Instant,
308}
309
310impl BufferedAccumulator {
311 fn new() -> Self {
312 Self {
313 stdout: Vec::new(),
314 stderr: Vec::new(),
315 started_at: Instant::now(),
316 }
317 }
318}
319
320impl<O, E> BufferedOutputObserver<O, E>
321where
322 O: Write + Send,
323 E: Write + Send,
324{
325 pub fn new(stdout: O, stderr: E) -> Self {
331 Self::with_presenter(stdout, stderr, Arc::new(PlainPresenter))
332 }
333
334 pub fn with_presenter(stdout: O, stderr: E, presenter: Arc<dyn TaskPresenter>) -> Self {
340 Self {
341 state: Mutex::new(BufferedState {
342 stdout,
343 stderr,
344 accumulators: BTreeMap::new(),
345 }),
346 presenter,
347 }
348 }
349
350 pub fn into_inner(self) -> (O, E) {
353 let state = self
354 .state
355 .into_inner()
356 .unwrap_or_else(PoisonError::into_inner);
357 (state.stdout, state.stderr)
358 }
359}
360
361impl<O, E> RunObserver for BufferedOutputObserver<O, E>
362where
363 O: Write + Send,
364 E: Write + Send,
365{
366 fn on_task_started(&self, task: &TaskId) {
367 let mut state = self.state.lock().unwrap_or_else(PoisonError::into_inner);
368 state
369 .accumulators
370 .insert(task.clone(), BufferedAccumulator::new());
371 }
372
373 fn on_stdout(&self, task: &TaskId, bytes: &[u8]) {
374 if bytes.is_empty() {
375 return;
376 }
377 let mut state = self.state.lock().unwrap_or_else(PoisonError::into_inner);
378 if let Some(accumulator) = state.accumulators.get_mut(task) {
379 accumulator.stdout.extend_from_slice(bytes);
380 }
381 }
382
383 fn on_stderr(&self, task: &TaskId, bytes: &[u8]) {
384 if bytes.is_empty() {
385 return;
386 }
387 let mut state = self.state.lock().unwrap_or_else(PoisonError::into_inner);
388 if let Some(accumulator) = state.accumulators.get_mut(task) {
389 accumulator.stderr.extend_from_slice(bytes);
390 }
391 }
392
393 fn on_task_finished(&self, task: &TaskId, record: &CompletedRecord) {
394 let mut state = self.state.lock().unwrap_or_else(PoisonError::into_inner);
395 let Some(accumulator) = state.accumulators.remove(task) else {
396 return;
397 };
398 flush_buffered_accumulator(&mut state, &accumulator);
399 let duration = accumulator.started_at.elapsed();
400 if let Some(bytes) = self.presenter.summary_completed(task, record, duration) {
401 let _ = state.stderr.write_all(&bytes);
402 }
403 }
404
405 fn on_task_skipped(&self, task: &TaskId, record: &SkipRecord) {
406 let mut state = self.state.lock().unwrap_or_else(PoisonError::into_inner);
409 if let Some(bytes) = self.presenter.summary_skipped(task, record) {
410 let _ = state.stderr.write_all(&bytes);
411 }
412 }
413
414 fn on_task_cancelled(&self, task: &TaskId, record: &CancelledRecord) {
415 let mut state = self.state.lock().unwrap_or_else(PoisonError::into_inner);
416 let duration = match record {
417 CancelledRecord::SignaledInFlight { .. } => {
418 state.accumulators.remove(task).map(|acc| {
419 flush_buffered_accumulator(&mut state, &acc);
420 acc.started_at.elapsed()
421 })
422 }
423 CancelledRecord::UpstreamCancelled { .. } | CancelledRecord::RunCancelled { .. } => {
424 None
425 }
426 };
427 if let Some(bytes) = self.presenter.summary_cancelled(task, record, duration) {
428 let _ = state.stderr.write_all(&bytes);
429 }
430 }
431}
432
433fn flush_buffered_accumulator<O, E>(
437 state: &mut BufferedState<O, E>,
438 accumulator: &BufferedAccumulator,
439) where
440 O: Write + Send,
441 E: Write + Send,
442{
443 if !accumulator.stdout.is_empty() {
444 let _ = state.stdout.write_all(&accumulator.stdout);
445 }
446 if !accumulator.stderr.is_empty() {
447 let _ = state.stderr.write_all(&accumulator.stderr);
448 }
449}
450
451fn split_lines(bytes: &[u8]) -> (Vec<Vec<u8>>, Vec<u8>) {
456 let mut lines = Vec::new();
457 let mut start = 0usize;
458 for (index, byte) in bytes.iter().enumerate() {
459 if *byte == b'\n' {
460 lines.push(bytes[start..index].to_vec());
461 start = index + 1;
462 }
463 }
464 let remainder = bytes[start..].to_vec();
465 (lines, remainder)
466}
467
468fn emit_lines<W: Write>(
473 sink: &mut W,
474 presenter: &dyn TaskPresenter,
475 task: &TaskId,
476 lines: Vec<Vec<u8>>,
477) -> io::Result<()> {
478 let prefix = presenter.prefix(task);
479 for line in lines {
480 let mut buf = Vec::with_capacity(prefix.len() + line.len() + 1);
481 buf.extend_from_slice(&prefix);
482 buf.extend_from_slice(&line);
483 buf.push(b'\n');
484 sink.write_all(&buf)?;
485 }
486 Ok(())
487}
488
489#[cfg(test)]
490mod tests {
491 use std::str::FromStr;
492
493 use haz_domain::name::{ProjectName, TaskName};
494
495 use super::*;
496 use crate::run_task::{RunSource, RunState};
497
498 fn task_id_for(project: &str, task: &str) -> TaskId {
499 TaskId {
500 project: ProjectName::from_str(project).unwrap(),
501 task: TaskName::from_str(task).unwrap(),
502 }
503 }
504
505 fn completed_for(task: &TaskId) -> CompletedRecord {
506 CompletedRecord {
507 task: task.clone(),
508 source: RunSource::FreshRun,
509 state: RunState::Succeeded,
510 exit_status: None,
511 stdout_hash: [0u8; 32],
512 stderr_hash: [0u8; 32],
513 materialised_outputs: Vec::new(),
514 }
515 }
516
517 fn signaled_in_flight_for(task: &TaskId) -> CancelledRecord {
518 let status: std::process::ExitStatus = std::os::unix::process::ExitStatusExt::from_raw(0);
519 CancelledRecord::SignaledInFlight {
520 task: task.clone(),
521 exit_status: status,
522 stdout_hash: [0u8; 32],
523 stderr_hash: [0u8; 32],
524 }
525 }
526
527 fn upstream_cancelled_for(task: &TaskId, upstream: &TaskId) -> CancelledRecord {
528 CancelledRecord::UpstreamCancelled {
529 task: task.clone(),
530 upstream: upstream.clone(),
531 }
532 }
533
534 fn run_cancelled_for(task: &TaskId) -> CancelledRecord {
535 CancelledRecord::RunCancelled { task: task.clone() }
536 }
537
538 fn upstream_failed_for(task: &TaskId, upstream: &TaskId) -> SkipRecord {
539 SkipRecord {
540 task: task.clone(),
541 cause: crate::run_task::SkipCause::UpstreamFailed {
542 upstream: upstream.clone(),
543 },
544 }
545 }
546
547 #[test]
550 fn live_emits_complete_line_with_tag_prefix() {
551 let observer = LiveOutputObserver::new(Vec::<u8>::new(), Vec::<u8>::new());
552 let task = task_id_for("lib", "build");
553 observer.on_task_started(&task);
554 observer.on_stdout(&task, b"hello world\n");
555 observer.on_task_finished(&task, &completed_for(&task));
556 let (stdout, stderr) = observer.into_inner();
557 assert_eq!(stdout, b"[lib:build] hello world\n");
558 assert!(stderr.is_empty());
559 }
560
561 #[test]
562 fn live_splits_multi_line_chunk_on_newlines() {
563 let observer = LiveOutputObserver::new(Vec::<u8>::new(), Vec::<u8>::new());
564 let task = task_id_for("lib", "build");
565 observer.on_task_started(&task);
566 observer.on_stdout(&task, b"one\ntwo\nthree\n");
567 observer.on_task_finished(&task, &completed_for(&task));
568 let (stdout, _) = observer.into_inner();
569 assert_eq!(
570 stdout,
571 b"[lib:build] one\n[lib:build] two\n[lib:build] three\n"
572 );
573 }
574
575 #[test]
576 fn live_joins_line_across_chunks() {
577 let observer = LiveOutputObserver::new(Vec::<u8>::new(), Vec::<u8>::new());
578 let task = task_id_for("lib", "build");
579 observer.on_task_started(&task);
580 observer.on_stdout(&task, b"hel");
581 observer.on_stdout(&task, b"lo wor");
582 observer.on_stdout(&task, b"ld\n");
583 observer.on_task_finished(&task, &completed_for(&task));
584 let (stdout, _) = observer.into_inner();
585 assert_eq!(stdout, b"[lib:build] hello world\n");
586 }
587
588 #[test]
589 fn live_flushes_partial_line_with_newline_on_finish() {
590 let observer = LiveOutputObserver::new(Vec::<u8>::new(), Vec::<u8>::new());
591 let task = task_id_for("lib", "build");
592 observer.on_task_started(&task);
593 observer.on_stdout(&task, b"complete\npartial");
594 observer.on_task_finished(&task, &completed_for(&task));
595 let (stdout, _) = observer.into_inner();
596 assert_eq!(stdout, b"[lib:build] complete\n[lib:build] partial\n");
597 }
598
599 #[test]
600 fn live_flushes_partial_line_on_cancel_signaled_in_flight() {
601 let observer = LiveOutputObserver::new(Vec::<u8>::new(), Vec::<u8>::new());
602 let task = task_id_for("lib", "build");
603 observer.on_task_started(&task);
604 observer.on_stdout(&task, b"error: panic at");
605 observer.on_task_cancelled(&task, &signaled_in_flight_for(&task));
606 let (stdout, _) = observer.into_inner();
607 assert_eq!(stdout, b"[lib:build] error: panic at\n");
608 }
609
610 #[test]
611 fn live_stderr_routed_to_stderr_sink_with_prefix() {
612 let observer = LiveOutputObserver::new(Vec::<u8>::new(), Vec::<u8>::new());
613 let task = task_id_for("lib", "build");
614 observer.on_task_started(&task);
615 observer.on_stderr(&task, b"warning: deprecated\n");
616 observer.on_task_finished(&task, &completed_for(&task));
617 let (stdout, stderr) = observer.into_inner();
618 assert!(stdout.is_empty());
619 assert_eq!(stderr, b"[lib:build] warning: deprecated\n");
620 }
621
622 #[test]
623 fn live_empty_chunk_is_noop() {
624 let observer = LiveOutputObserver::new(Vec::<u8>::new(), Vec::<u8>::new());
625 let task = task_id_for("lib", "build");
626 observer.on_task_started(&task);
627 observer.on_stdout(&task, b"");
628 observer.on_stderr(&task, b"");
629 observer.on_task_finished(&task, &completed_for(&task));
630 let (stdout, stderr) = observer.into_inner();
631 assert!(stdout.is_empty());
632 assert!(stderr.is_empty());
633 }
634
635 #[test]
636 fn live_bare_newline_byte_emits_empty_tagged_line() {
637 let observer = LiveOutputObserver::new(Vec::<u8>::new(), Vec::<u8>::new());
638 let task = task_id_for("lib", "build");
639 observer.on_task_started(&task);
640 observer.on_stdout(&task, b"\n");
641 observer.on_task_finished(&task, &completed_for(&task));
642 let (stdout, _) = observer.into_inner();
643 assert_eq!(stdout, b"[lib:build] \n");
644 }
645
646 #[test]
649 fn live_cancel_upstream_is_noop_when_no_accumulator() {
650 let observer = LiveOutputObserver::new(Vec::<u8>::new(), Vec::<u8>::new());
651 let task = task_id_for("lib", "downstream");
652 let upstream = task_id_for("lib", "root");
653 observer.on_task_cancelled(&task, &upstream_cancelled_for(&task, &upstream));
654 let (stdout, stderr) = observer.into_inner();
655 assert!(stdout.is_empty());
656 assert!(stderr.is_empty());
657 }
658
659 #[test]
660 fn live_cancel_run_cancelled_is_noop_when_no_accumulator() {
661 let observer = LiveOutputObserver::new(Vec::<u8>::new(), Vec::<u8>::new());
662 let task = task_id_for("lib", "drained");
663 observer.on_task_cancelled(&task, &run_cancelled_for(&task));
664 let (stdout, stderr) = observer.into_inner();
665 assert!(stdout.is_empty());
666 assert!(stderr.is_empty());
667 }
668
669 #[test]
670 fn live_skipped_is_noop_when_no_accumulator() {
671 let observer = LiveOutputObserver::new(Vec::<u8>::new(), Vec::<u8>::new());
672 let task = task_id_for("lib", "downstream");
673 let upstream = task_id_for("lib", "root");
674 observer.on_task_skipped(&task, &upstream_failed_for(&task, &upstream));
675 let (stdout, stderr) = observer.into_inner();
676 assert!(stdout.is_empty());
677 assert!(stderr.is_empty());
678 }
679
680 #[test]
683 fn live_two_tasks_lines_interleave_at_line_granularity() {
684 let observer = LiveOutputObserver::new(Vec::<u8>::new(), Vec::<u8>::new());
685 let a = task_id_for("lib", "a");
686 let b = task_id_for("lib", "b");
687 observer.on_task_started(&a);
688 observer.on_task_started(&b);
689 observer.on_stdout(&a, b"alpha\n");
690 observer.on_stdout(&b, b"beta\n");
691 observer.on_stdout(&a, b"gamma\n");
692 observer.on_task_finished(&a, &completed_for(&a));
693 observer.on_task_finished(&b, &completed_for(&b));
694 let (stdout, _) = observer.into_inner();
695 assert_eq!(stdout, b"[lib:a] alpha\n[lib:b] beta\n[lib:a] gamma\n");
696 }
697
698 #[test]
701 fn buffered_emits_block_on_finish_no_prefix() {
702 let observer = BufferedOutputObserver::new(Vec::<u8>::new(), Vec::<u8>::new());
703 let task = task_id_for("lib", "build");
704 observer.on_task_started(&task);
705 observer.on_stdout(&task, b"hello world\nno-newline-tail");
706 observer.on_task_finished(&task, &completed_for(&task));
707 let (stdout, _) = observer.into_inner();
708 assert_eq!(stdout, b"hello world\nno-newline-tail");
709 }
710
711 #[test]
712 fn buffered_accumulates_across_chunks() {
713 let observer = BufferedOutputObserver::new(Vec::<u8>::new(), Vec::<u8>::new());
714 let task = task_id_for("lib", "build");
715 observer.on_task_started(&task);
716 observer.on_stdout(&task, b"abc");
717 observer.on_stdout(&task, b"def");
718 observer.on_stdout(&task, b"ghi");
719 observer.on_task_finished(&task, &completed_for(&task));
720 let (stdout, _) = observer.into_inner();
721 assert_eq!(stdout, b"abcdefghi");
722 }
723
724 #[test]
725 fn buffered_emits_stdout_block_before_stderr_block() {
726 let observer = BufferedOutputObserver::new(Vec::<u8>::new(), Vec::<u8>::new());
727 let task = task_id_for("lib", "build");
728 observer.on_task_started(&task);
729 observer.on_stdout(&task, b"out\n");
730 observer.on_stderr(&task, b"err\n");
731 observer.on_task_finished(&task, &completed_for(&task));
732 let (stdout, stderr) = observer.into_inner();
733 assert_eq!(stdout, b"out\n");
734 assert_eq!(stderr, b"err\n");
735 }
736
737 #[test]
738 fn buffered_cancel_signaled_in_flight_flushes_blocks() {
739 let observer = BufferedOutputObserver::new(Vec::<u8>::new(), Vec::<u8>::new());
740 let task = task_id_for("lib", "build");
741 observer.on_task_started(&task);
742 observer.on_stdout(&task, b"out-bytes");
743 observer.on_stderr(&task, b"err-bytes");
744 observer.on_task_cancelled(&task, &signaled_in_flight_for(&task));
745 let (stdout, stderr) = observer.into_inner();
746 assert_eq!(stdout, b"out-bytes");
747 assert_eq!(stderr, b"err-bytes");
748 }
749
750 #[test]
751 fn buffered_cancel_upstream_is_noop_when_no_accumulator() {
752 let observer = BufferedOutputObserver::new(Vec::<u8>::new(), Vec::<u8>::new());
753 let task = task_id_for("lib", "downstream");
754 let upstream = task_id_for("lib", "root");
755 observer.on_task_cancelled(&task, &upstream_cancelled_for(&task, &upstream));
756 let (stdout, stderr) = observer.into_inner();
757 assert!(stdout.is_empty());
758 assert!(stderr.is_empty());
759 }
760
761 #[test]
762 fn buffered_cancel_run_cancelled_is_noop_when_no_accumulator() {
763 let observer = BufferedOutputObserver::new(Vec::<u8>::new(), Vec::<u8>::new());
764 let task = task_id_for("lib", "drained");
765 observer.on_task_cancelled(&task, &run_cancelled_for(&task));
766 let (stdout, stderr) = observer.into_inner();
767 assert!(stdout.is_empty());
768 assert!(stderr.is_empty());
769 }
770
771 #[test]
772 fn buffered_skipped_is_noop_when_no_accumulator() {
773 let observer = BufferedOutputObserver::new(Vec::<u8>::new(), Vec::<u8>::new());
774 let task = task_id_for("lib", "downstream");
775 let upstream = task_id_for("lib", "root");
776 observer.on_task_skipped(&task, &upstream_failed_for(&task, &upstream));
777 let (stdout, stderr) = observer.into_inner();
778 assert!(stdout.is_empty());
779 assert!(stderr.is_empty());
780 }
781
782 #[test]
783 fn buffered_empty_streams_emit_nothing_no_panic() {
784 let observer = BufferedOutputObserver::new(Vec::<u8>::new(), Vec::<u8>::new());
785 let task = task_id_for("lib", "build");
786 observer.on_task_started(&task);
787 observer.on_task_finished(&task, &completed_for(&task));
788 let (stdout, stderr) = observer.into_inner();
789 assert!(stdout.is_empty());
790 assert!(stderr.is_empty());
791 }
792
793 #[test]
794 fn buffered_tasks_never_interleave_under_sequential_calls() {
795 let observer = BufferedOutputObserver::new(Vec::<u8>::new(), Vec::<u8>::new());
799 let a = task_id_for("lib", "a");
800 let b = task_id_for("lib", "b");
801 observer.on_task_started(&a);
802 observer.on_task_started(&b);
803 observer.on_stdout(&a, b"AAA");
804 observer.on_stdout(&b, b"BBB");
805 observer.on_stdout(&a, b"AAA");
806 observer.on_task_finished(&a, &completed_for(&a));
807 observer.on_task_finished(&b, &completed_for(&b));
808 let (stdout, _) = observer.into_inner();
809 assert_eq!(stdout, b"AAAAAABBB");
813 }
814
815 #[test]
818 fn split_lines_yields_complete_lines_and_trailing_remainder() {
819 let (lines, remainder) = split_lines(b"alpha\nbeta\ngamma");
820 assert_eq!(lines, vec![b"alpha".to_vec(), b"beta".to_vec()]);
821 assert_eq!(remainder, b"gamma".to_vec());
822 }
823
824 #[test]
825 fn split_lines_terminator_only_input_yields_empty_remainder() {
826 let (lines, remainder) = split_lines(b"alpha\n");
827 assert_eq!(lines, vec![b"alpha".to_vec()]);
828 assert!(remainder.is_empty());
829 }
830
831 #[test]
832 fn split_lines_empty_input_yields_nothing() {
833 let (lines, remainder) = split_lines(b"");
834 assert!(lines.is_empty());
835 assert!(remainder.is_empty());
836 }
837
838 struct RecordingPresenter;
844
845 impl TaskPresenter for RecordingPresenter {
846 fn prefix(&self, task: &TaskId) -> Vec<u8> {
847 format!("<{task}> ").into_bytes()
848 }
849
850 fn summary_completed(
851 &self,
852 task: &TaskId,
853 record: &CompletedRecord,
854 duration: std::time::Duration,
855 ) -> Option<Vec<u8>> {
856 let kind = match record.source {
857 crate::run_task::RunSource::CacheHit => "hit",
858 crate::run_task::RunSource::FreshRun => "fresh",
859 };
860 let nonzero = if duration > std::time::Duration::ZERO {
861 "+"
862 } else {
863 "0"
864 };
865 Some(format!("[{task}] completed:{kind}:{nonzero}\n").into_bytes())
866 }
867
868 fn summary_skipped(&self, task: &TaskId, _record: &SkipRecord) -> Option<Vec<u8>> {
869 Some(format!("[{task}] skipped\n").into_bytes())
870 }
871
872 fn summary_cancelled(
873 &self,
874 task: &TaskId,
875 _record: &CancelledRecord,
876 duration: Option<std::time::Duration>,
877 ) -> Option<Vec<u8>> {
878 let dur = if duration.is_some() { "some" } else { "none" };
879 Some(format!("[{task}] cancelled:{dur}\n").into_bytes())
880 }
881 }
882
883 #[test]
884 fn live_with_presenter_uses_custom_prefix() {
885 let observer = LiveOutputObserver::with_presenter(
886 Vec::<u8>::new(),
887 Vec::<u8>::new(),
888 Arc::new(RecordingPresenter),
889 );
890 let task = task_id_for("lib", "build");
891 observer.on_task_started(&task);
892 observer.on_stdout(&task, b"hello\n");
893 observer.on_task_finished(&task, &completed_for(&task));
894 let (stdout, stderr) = observer.into_inner();
895 assert_eq!(stdout, b"<lib:build> hello\n");
896 assert_eq!(stderr, b"[lib:build] completed:fresh:+\n");
899 }
900
901 #[test]
902 fn live_completed_summary_routed_to_stderr_after_partial_flush() {
903 let observer = LiveOutputObserver::with_presenter(
904 Vec::<u8>::new(),
905 Vec::<u8>::new(),
906 Arc::new(RecordingPresenter),
907 );
908 let task = task_id_for("lib", "build");
909 observer.on_task_started(&task);
910 observer.on_stdout(&task, b"trailing-no-newline");
911 observer.on_task_finished(&task, &completed_for(&task));
912 let (stdout, stderr) = observer.into_inner();
913 assert_eq!(stdout, b"<lib:build> trailing-no-newline\n");
915 assert_eq!(stderr, b"[lib:build] completed:fresh:+\n");
917 }
918
919 #[test]
920 fn live_skip_emits_summary_line_without_accumulator() {
921 let observer = LiveOutputObserver::with_presenter(
922 Vec::<u8>::new(),
923 Vec::<u8>::new(),
924 Arc::new(RecordingPresenter),
925 );
926 let task = task_id_for("lib", "down");
927 let upstream = task_id_for("lib", "root");
928 observer.on_task_skipped(&task, &upstream_failed_for(&task, &upstream));
929 let (stdout, stderr) = observer.into_inner();
930 assert!(stdout.is_empty());
931 assert_eq!(stderr, b"[lib:down] skipped\n");
932 }
933
934 #[test]
935 fn live_cancel_signaled_in_flight_carries_duration() {
936 let observer = LiveOutputObserver::with_presenter(
937 Vec::<u8>::new(),
938 Vec::<u8>::new(),
939 Arc::new(RecordingPresenter),
940 );
941 let task = task_id_for("lib", "build");
942 observer.on_task_started(&task);
943 observer.on_task_cancelled(&task, &signaled_in_flight_for(&task));
944 let (_, stderr) = observer.into_inner();
945 assert_eq!(stderr, b"[lib:build] cancelled:some\n");
946 }
947
948 #[test]
949 fn live_cancel_run_cancelled_carries_no_duration() {
950 let observer = LiveOutputObserver::with_presenter(
951 Vec::<u8>::new(),
952 Vec::<u8>::new(),
953 Arc::new(RecordingPresenter),
954 );
955 let task = task_id_for("lib", "drained");
956 observer.on_task_cancelled(&task, &run_cancelled_for(&task));
957 let (_, stderr) = observer.into_inner();
958 assert_eq!(stderr, b"[lib:drained] cancelled:none\n");
959 }
960
961 #[test]
962 fn buffered_with_presenter_emits_summary_after_blocks() {
963 let observer = BufferedOutputObserver::with_presenter(
964 Vec::<u8>::new(),
965 Vec::<u8>::new(),
966 Arc::new(RecordingPresenter),
967 );
968 let task = task_id_for("lib", "build");
969 observer.on_task_started(&task);
970 observer.on_stdout(&task, b"out");
971 observer.on_stderr(&task, b"err");
972 observer.on_task_finished(&task, &completed_for(&task));
973 let (stdout, stderr) = observer.into_inner();
974 assert_eq!(stdout, b"out");
976 assert_eq!(stderr, b"err[lib:build] completed:fresh:+\n");
979 }
980
981 #[test]
982 fn buffered_skip_emits_summary_line() {
983 let observer = BufferedOutputObserver::with_presenter(
984 Vec::<u8>::new(),
985 Vec::<u8>::new(),
986 Arc::new(RecordingPresenter),
987 );
988 let task = task_id_for("lib", "down");
989 let upstream = task_id_for("lib", "root");
990 observer.on_task_skipped(&task, &upstream_failed_for(&task, &upstream));
991 let (_, stderr) = observer.into_inner();
992 assert_eq!(stderr, b"[lib:down] skipped\n");
993 }
994
995 #[test]
996 fn buffered_cancel_signaled_carries_duration() {
997 let observer = BufferedOutputObserver::with_presenter(
998 Vec::<u8>::new(),
999 Vec::<u8>::new(),
1000 Arc::new(RecordingPresenter),
1001 );
1002 let task = task_id_for("lib", "build");
1003 observer.on_task_started(&task);
1004 observer.on_task_cancelled(&task, &signaled_in_flight_for(&task));
1005 let (_, stderr) = observer.into_inner();
1006 assert_eq!(stderr, b"[lib:build] cancelled:some\n");
1007 }
1008}