1use std::collections::HashMap;
8use std::io::{self, Stdout, Write};
9use std::time::Duration;
10
11use chrono::{DateTime, Utc};
12use crossterm::terminal::{disable_raw_mode, enable_raw_mode};
13use ratatui::backend::{Backend, ClearType, CrosstermBackend, WindowSize};
14use ratatui::layout::{Position, Size};
15use ratatui::style::Style;
16use ratatui::text::{Line, Span};
17use ratatui::widgets::{Paragraph, Widget};
18use ratatui::{Terminal, TerminalOptions, Viewport};
19
20use crate::yarli_core::domain::CancellationProvenance;
21use crate::yarli_core::domain::TaskId;
22use crate::yarli_core::entities::continuation::TaskHealthAction;
23use crate::yarli_core::explain::DeteriorationTrend;
24use crate::yarli_core::fsm::task::TaskState;
25
26use super::events::{StreamEvent, TaskView};
27use super::spinner::{Spinner, GLYPH_BLOCKED, GLYPH_COMPLETE, GLYPH_FAILED, GLYPH_PENDING};
28use super::style::Tier;
29
/// Viewport height, in rows, used by `StreamConfig::default`.
const DEFAULT_VIEWPORT_HEIGHT: u16 = 8;
/// Maximum number of characters of a run id kept by `display_run_id`.
const RUN_ID_DISPLAY_LEN: usize = 12;
/// Minimum number of seconds between scrollback lines for throttled transient
/// status messages (see `should_emit_transient_status_line`).
const TRANSIENT_STATUS_EMIT_SECS: i64 = 30;
/// Concrete terminal type used by the renderer: a crossterm backend on stdout
/// wrapped in `CursorProbeBackend`.
type StreamTerminal = Terminal<CursorProbeBackend<CrosstermBackend<Stdout>>>;
35
/// How `CursorProbeBackend` answers cursor-position queries.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CursorProbeStrategy {
    /// Ask the wrapped backend for the cursor position; if that fails the
    /// wrapper switches itself to `AnchorBottomRow`.
    // Not constructed by the non-test code visible in this file, hence the
    // dead-code allowance.
    #[allow(dead_code)]
    QueryTerminal,
    /// Never query; move the cursor to column 0 of the last row and report that.
    AnchorBottomRow,
}
42
/// Backend wrapper that controls how cursor-position queries are answered.
///
/// All other backend operations are forwarded to `inner`.
#[derive(Debug)]
struct CursorProbeBackend<B> {
    /// The wrapped backend that performs the actual terminal operations.
    inner: B,
    /// Current strategy used by `get_cursor_position`.
    strategy: CursorProbeStrategy,
    /// Most recent cursor position reported or set through this wrapper.
    last_known_cursor: Option<Position>,
}
49
50impl<B> CursorProbeBackend<B> {
51 fn new(inner: B, strategy: CursorProbeStrategy) -> Self {
52 Self {
53 inner,
54 strategy,
55 last_known_cursor: None,
56 }
57 }
58}
59
60impl<B> CursorProbeBackend<B>
61where
62 B: Backend,
63{
64 fn anchor_bottom_row(&mut self) -> io::Result<Position> {
65 let position = fallback_cursor_position(self.inner.size()?);
66 self.inner.set_cursor_position(position)?;
67 self.last_known_cursor = Some(position);
68 Ok(position)
69 }
70}
71
72impl<B> Backend for CursorProbeBackend<B>
73where
74 B: Backend,
75{
76 fn draw<'a, I>(&mut self, content: I) -> io::Result<()>
77 where
78 I: Iterator<Item = (u16, u16, &'a ratatui::buffer::Cell)>,
79 {
80 self.inner.draw(content)
81 }
82
83 fn append_lines(&mut self, n: u16) -> io::Result<()> {
84 self.inner.append_lines(n)
85 }
86
87 fn hide_cursor(&mut self) -> io::Result<()> {
88 self.inner.hide_cursor()
89 }
90
91 fn show_cursor(&mut self) -> io::Result<()> {
92 self.inner.show_cursor()
93 }
94
95 fn get_cursor_position(&mut self) -> io::Result<Position> {
96 let position = match self.strategy {
97 CursorProbeStrategy::QueryTerminal => match self.inner.get_cursor_position() {
98 Ok(position) => position,
99 Err(_) => {
100 self.strategy = CursorProbeStrategy::AnchorBottomRow;
101 self.anchor_bottom_row()?
102 }
103 },
104 CursorProbeStrategy::AnchorBottomRow => self.anchor_bottom_row()?,
105 };
106 self.last_known_cursor = Some(position);
107 Ok(position)
108 }
109
110 fn set_cursor_position<P: Into<Position>>(&mut self, position: P) -> io::Result<()> {
111 let position = position.into();
112 self.last_known_cursor = Some(position);
113 self.inner.set_cursor_position(position)
114 }
115
116 fn clear(&mut self) -> io::Result<()> {
117 self.inner.clear()
118 }
119
120 fn clear_region(&mut self, clear_type: ClearType) -> io::Result<()> {
121 self.inner.clear_region(clear_type)
122 }
123
124 fn size(&self) -> io::Result<Size> {
125 self.inner.size()
126 }
127
128 fn window_size(&mut self) -> io::Result<WindowSize> {
129 self.inner.window_size()
130 }
131
132 fn flush(&mut self) -> io::Result<()> {
133 self.inner.flush()
134 }
135}
136
/// Configuration for `StreamRenderer`.
#[derive(Debug, Clone)]
pub struct StreamConfig {
    /// Height, in rows, of the inline viewport.
    pub viewport_height: u16,
    /// When true, each command output line is also written above the viewport.
    pub verbose_output: bool,
}
145
146impl Default for StreamConfig {
147 fn default() -> Self {
148 Self {
149 viewport_height: DEFAULT_VIEWPORT_HEIGHT,
150 verbose_output: false,
151 }
152 }
153}
154
/// Inline terminal renderer: transition lines are inserted above a small
/// viewport, and the viewport itself shows the currently tracked tasks.
pub struct StreamRenderer {
    /// Terminal with an inline viewport that all output goes through.
    terminal: StreamTerminal,
    /// Renderer configuration supplied at construction.
    config: StreamConfig,
    /// Views of the tasks currently shown in the viewport (terminal tasks are removed).
    tasks: HashMap<TaskId, TaskView>,
    /// Last known state of every task seen so far; the source for progress counts.
    task_states: HashMap<TaskId, TaskState>,
    /// Task ids in the order they were first tracked; determines viewport row order.
    task_order: Vec<TaskId>,
    /// Spinners for tasks that have entered `TaskExecuting`, advanced on `Tick`.
    spinners: HashMap<TaskId, Spinner>,
    /// Latest explain summary, rendered as the "WHY:" viewport line.
    explain_summary: Option<String>,
    /// Pending transient status message; consumed by the next viewport draw.
    transient_status: Option<String>,
    /// When a transient status line was last written above the viewport (used for throttling).
    last_transient_status_emit_at: Option<DateTime<Utc>>,
}
178
179impl StreamRenderer {
180 pub fn new(config: StreamConfig) -> io::Result<Self> {
182 enable_raw_mode()?;
183 let backend = CursorProbeBackend::new(
186 CrosstermBackend::new(io::stdout()),
187 CursorProbeStrategy::AnchorBottomRow,
188 );
189 let terminal = match Terminal::with_options(
190 backend,
191 TerminalOptions {
192 viewport: Viewport::Inline(config.viewport_height),
193 },
194 ) {
195 Ok(terminal) => terminal,
196 Err(error) => {
197 let _ = disable_raw_mode();
198 return Err(error);
199 }
200 };
201
202 Ok(Self {
203 terminal,
204 config,
205 tasks: HashMap::new(),
206 task_states: HashMap::new(),
207 task_order: Vec::new(),
208 spinners: HashMap::new(),
209 explain_summary: None,
210 transient_status: None,
211 last_transient_status_emit_at: None,
212 })
213 }
214
215 pub fn handle_event(&mut self, event: StreamEvent) -> io::Result<()> {
217 match event {
218 StreamEvent::TaskDiscovered {
219 task_id,
220 task_name,
221 depends_on,
222 } => {
223 let blocked_by = if depends_on.is_empty() {
224 None
225 } else {
226 Some(depends_on.join(", "))
227 };
228 if let std::collections::hash_map::Entry::Vacant(e) = self.tasks.entry(task_id) {
229 self.task_order.push(task_id);
230 e.insert(TaskView {
231 task_id,
232 name: task_name,
233 state: TaskState::TaskOpen,
234 elapsed: None,
235 last_output_line: None,
236 blocked_by,
237 worker_id: None,
238 });
239 }
240 self.task_states
241 .entry(task_id)
242 .or_insert(TaskState::TaskOpen);
243 }
244 StreamEvent::TaskTransition {
245 task_id,
246 task_name,
247 from,
248 to,
249 elapsed,
250 exit_code,
251 detail,
252 at,
253 } => {
254 self.handle_task_transition(
255 task_id,
256 &task_name,
257 from,
258 to,
259 elapsed,
260 exit_code,
261 detail.as_deref(),
262 at,
263 )?;
264 }
265 StreamEvent::RunTransition {
266 run_id,
267 from,
268 to,
269 reason,
270 at,
271 } => {
272 let progress = self.progress_snapshot();
273 self.push_run_transition(run_id, from, to, reason.as_deref(), at, progress)?;
274 }
275 StreamEvent::RunStarted {
276 run_id,
277 objective,
278 at,
279 } => {
280 self.push_run_started(run_id, &objective, at)?;
281 }
282 StreamEvent::CommandOutput {
283 task_id,
284 task_name,
285 line,
286 } => {
287 if self.config.verbose_output {
288 let output_line = Line::from(vec![
289 Span::styled(format!(" [{task_name}] "), Tier::Background.style()),
290 Span::styled(line.clone(), Tier::Contextual.style()),
291 ]);
292 self.terminal.insert_before(1, |buf| {
293 Paragraph::new(output_line).render(buf.area, buf);
294 })?;
295 }
296 if let Some(view) = self.tasks.get_mut(&task_id) {
297 view.last_output_line = Some(line);
298 }
299 }
300 StreamEvent::TransientStatus { message } => {
301 if should_emit_transient_status_line(
302 self.last_transient_status_emit_at,
303 &message,
304 Utc::now(),
305 ) {
306 let progress = self.progress_snapshot();
307 self.push_transient_status_line(&message, Utc::now(), progress)?;
308 }
309 self.transient_status = Some(message);
310 }
311 StreamEvent::ExplainUpdate { summary } => {
312 self.explain_summary = Some(summary);
313 }
314 StreamEvent::TaskWorker { task_id, worker_id } => {
315 if let Some(view) = self.tasks.get_mut(&task_id) {
316 view.worker_id = Some(worker_id);
317 }
318 }
319 StreamEvent::RunExited { payload } => {
320 self.push_continuation_summary(&payload)?;
321 }
322 StreamEvent::Tick => {
323 for spinner in self.spinners.values_mut() {
324 spinner.tick();
325 }
326 }
327 }
328
329 self.draw_viewport()?;
330 io::stdout().flush()?;
332 Ok(())
333 }
334
335 #[allow(clippy::too_many_arguments)]
337 fn handle_task_transition(
338 &mut self,
339 task_id: TaskId,
340 task_name: &str,
341 from: TaskState,
342 to: TaskState,
343 elapsed: Option<Duration>,
344 exit_code: Option<i32>,
345 detail: Option<&str>,
346 at: DateTime<Utc>,
347 ) -> io::Result<()> {
348 self.task_states.insert(task_id, to);
349 let progress = self.progress_snapshot();
350
351 self.push_task_transition(
353 task_id, task_name, from, to, elapsed, exit_code, detail, at, progress,
354 )?;
355
356 if to.is_terminal() {
357 self.tasks.remove(&task_id);
359 self.task_order.retain(|id| *id != task_id);
360 self.spinners.remove(&task_id);
361 } else {
362 if let std::collections::hash_map::Entry::Vacant(e) = self.tasks.entry(task_id) {
364 self.task_order.push(task_id);
365 e.insert(TaskView {
366 task_id,
367 name: task_name.to_string(),
368 state: to,
369 elapsed,
370 last_output_line: None,
371 blocked_by: None,
372 worker_id: None,
373 });
374 } else {
375 let view = self.tasks.get_mut(&task_id).unwrap();
376 view.state = to;
377 view.elapsed = elapsed;
378 }
379
380 if to == TaskState::TaskExecuting {
381 self.spinners.entry(task_id).or_default();
382 }
383 }
384
385 Ok(())
386 }
387
388 #[allow(clippy::too_many_arguments)]
393 fn push_task_transition(
394 &mut self,
395 _task_id: TaskId,
396 task_name: &str,
397 from: TaskState,
398 to: TaskState,
399 elapsed: Option<Duration>,
400 exit_code: Option<i32>,
401 detail: Option<&str>,
402 at: DateTime<Utc>,
403 progress: ProgressSnapshot,
404 ) -> io::Result<()> {
405 let tier = tier_for_task_state(to);
406 let time_str = at.format("%H:%M:%S").to_string();
407
408 let mut spans = vec![
409 Span::styled(time_str, Tier::Contextual.style()),
410 Span::styled(" ▸ ", Tier::Background.style()),
411 Span::styled(format!("task/{:<16}", task_name), tier.style()),
412 Span::styled(format!("{:?}", from), Tier::Contextual.style()),
413 Span::styled(" → ", Tier::Background.style()),
414 Span::styled(format!("{:?}", to), tier.style()),
415 ];
416
417 let mut meta_parts = Vec::new();
419 if let Some(d) = elapsed {
420 meta_parts.push(format_duration(d));
421 }
422 if let Some(code) = exit_code {
423 meta_parts.push(format!("exit {code}"));
424 }
425 if let Some(d) = detail {
426 meta_parts.push(d.to_string());
427 }
428 if !meta_parts.is_empty() {
429 spans.push(Span::styled(
430 format!(" ({})", meta_parts.join(", ")),
431 Tier::Contextual.style(),
432 ));
433 }
434 spans.push(Span::styled(
435 format!(" progress {}", format_ascii_progress(progress, 20)),
436 Tier::Contextual.style(),
437 ));
438
439 let line = Line::from(spans);
440
441 self.terminal.insert_before(1, |buf| {
442 Paragraph::new(line).render(buf.area, buf);
443 })?;
444
445 Ok(())
446 }
447
448 fn push_run_started(
450 &mut self,
451 run_id: uuid::Uuid,
452 objective: &str,
453 at: DateTime<Utc>,
454 ) -> io::Result<()> {
455 let time_str = at.format("%H:%M:%S").to_string();
456 let display_id = display_run_id(run_id);
457
458 let spans = vec![
459 Span::styled(time_str, Tier::Contextual.style()),
460 Span::styled(" ▸ ", Tier::Background.style()),
461 Span::styled(format!("run/{display_id}"), Tier::Active.style()),
462 Span::styled(format!(" started: {objective}"), Tier::Contextual.style()),
463 ];
464
465 let line = Line::from(spans);
466
467 self.terminal.insert_before(1, |buf| {
468 Paragraph::new(line).render(buf.area, buf);
469 })?;
470
471 Ok(())
472 }
473
474 fn push_run_transition(
476 &mut self,
477 run_id: uuid::Uuid,
478 from: crate::yarli_core::fsm::run::RunState,
479 to: crate::yarli_core::fsm::run::RunState,
480 reason: Option<&str>,
481 at: DateTime<Utc>,
482 progress: ProgressSnapshot,
483 ) -> io::Result<()> {
484 let tier = tier_for_run_state(to);
485 let time_str = at.format("%H:%M:%S").to_string();
486 let display_id = display_run_id(run_id);
487
488 let mut spans = vec![
489 Span::styled(time_str, Tier::Contextual.style()),
490 Span::styled(" ▸ ", Tier::Background.style()),
491 Span::styled(format!("run/{:<20}", display_id), tier.style()),
492 Span::styled(format!("{:?}", from), Tier::Contextual.style()),
493 Span::styled(" → ", Tier::Background.style()),
494 Span::styled(format!("{:?}", to), tier.style()),
495 ];
496
497 if let Some(r) = reason {
498 spans.push(Span::styled(
499 format!(" (reason: {r})"),
500 Tier::Contextual.style(),
501 ));
502 }
503 spans.push(Span::styled(
504 format!(" progress {}", format_ascii_progress(progress, 20)),
505 Tier::Contextual.style(),
506 ));
507
508 let line = Line::from(spans);
509
510 self.terminal.insert_before(1, |buf| {
511 Paragraph::new(line).render(buf.area, buf);
512 })?;
513
514 Ok(())
515 }
516
517 fn push_continuation_summary(
519 &mut self,
520 payload: &crate::yarli_core::entities::ContinuationPayload,
521 ) -> io::Result<()> {
522 let s = &payload.summary;
523
524 let header = Line::from(vec![Span::styled(
526 "── Continuation ──",
527 Tier::Active.style(),
528 )]);
529 self.terminal.insert_before(1, |buf| {
530 Paragraph::new(header).render(buf.area, buf);
531 })?;
532
533 let counts = format!(
535 " {} completed, {} failed, {} pending",
536 s.completed, s.failed, s.pending
537 );
538 let counts_line = Line::from(vec![Span::styled(counts, Tier::Contextual.style())]);
539 self.terminal.insert_before(1, |buf| {
540 Paragraph::new(counts_line).render(buf.area, buf);
541 })?;
542
543 if let Some(reason) = payload.exit_reason {
544 let reason_line = Line::from(vec![Span::styled(
545 format!(" Exit reason: {reason}"),
546 Tier::Contextual.style(),
547 )]);
548 self.terminal.insert_before(1, |buf| {
549 Paragraph::new(reason_line).render(buf.area, buf);
550 })?;
551 }
552
553 let cancelled = payload.exit_state == crate::yarli_core::fsm::run::RunState::RunCancelled;
554 if cancelled || payload.cancellation_source.is_some() {
555 let source = payload
556 .cancellation_source
557 .map(|value| value.to_string())
558 .unwrap_or_else(|| "unknown".to_string());
559 let source_line = Line::from(vec![Span::styled(
560 format!(" Cancel source: {source}"),
561 Tier::Contextual.style(),
562 )]);
563 self.terminal.insert_before(1, |buf| {
564 Paragraph::new(source_line).render(buf.area, buf);
565 })?;
566 }
567
568 if cancelled || payload.cancellation_provenance.is_some() {
569 let summary =
570 format_cancel_provenance_summary(payload.cancellation_provenance.as_ref());
571 let provenance_line = Line::from(vec![Span::styled(
572 format!(" Cancel provenance: {summary}"),
573 Tier::Contextual.style(),
574 )]);
575 self.terminal.insert_before(1, |buf| {
576 Paragraph::new(provenance_line).render(buf.area, buf);
577 })?;
578 }
579
580 if let Some(tranche) = &payload.next_tranche {
582 if !tranche.retry_task_keys.is_empty() {
583 let retry = format!(" Retry: [{}]", tranche.retry_task_keys.join(", "));
584 let retry_line = Line::from(vec![Span::styled(retry, Tier::Urgent.style())]);
585 self.terminal.insert_before(1, |buf| {
586 Paragraph::new(retry_line).render(buf.area, buf);
587 })?;
588 }
589 if !tranche.unfinished_task_keys.is_empty() {
590 let unfinished = format!(
591 " Unfinished: [{}]",
592 tranche.unfinished_task_keys.join(", ")
593 );
594 let unfinished_line =
595 Line::from(vec![Span::styled(unfinished, Tier::Contextual.style())]);
596 self.terminal.insert_before(1, |buf| {
597 Paragraph::new(unfinished_line).render(buf.area, buf);
598 })?;
599 }
600 let next = format!(" Next: \"{}\"", tranche.suggested_objective);
601 let next_line = Line::from(vec![Span::styled(next, Tier::Active.style())]);
602 self.terminal.insert_before(1, |buf| {
603 Paragraph::new(next_line).render(buf.area, buf);
604 })?;
605 }
606
607 if let Some(quality_gate) = payload.quality_gate.as_ref() {
608 if matches!(
609 quality_gate.task_health_action,
610 TaskHealthAction::ForcePivot
611 ) {
612 if let Some(guidance) = Self::force_pivot_guidance(quality_gate.trend.as_ref()) {
613 let guidance_line =
614 Line::from(vec![Span::styled(guidance, Tier::Urgent.style())]);
615 self.terminal.insert_before(1, |buf| {
616 Paragraph::new(guidance_line).render(buf.area, buf);
617 })?;
618 }
619 }
620 if matches!(
621 quality_gate.task_health_action,
622 TaskHealthAction::StopAndSummarize
623 ) {
624 let guidance = format!(" Stop-and-summarize guidance: {}", quality_gate.reason);
625 let guidance_line = Line::from(vec![Span::styled(guidance, Tier::Urgent.style())]);
626 self.terminal.insert_before(1, |buf| {
627 Paragraph::new(guidance_line).render(buf.area, buf);
628 })?;
629 }
630 if matches!(
631 quality_gate.task_health_action,
632 TaskHealthAction::CheckpointNow
633 ) {
634 let guidance = format!(" Checkpoint-now guidance: {}", quality_gate.reason);
635 let guidance_line = Line::from(vec![Span::styled(guidance, Tier::Urgent.style())]);
636 self.terminal.insert_before(1, |buf| {
637 Paragraph::new(guidance_line).render(buf.area, buf);
638 })?;
639 }
640 }
641
642 Ok(())
643 }
644
645 fn force_pivot_guidance(trend: Option<&DeteriorationTrend>) -> Option<String> {
646 if matches!(trend, Some(DeteriorationTrend::Deteriorating)) {
647 Some(
648 " Force-pivot guidance: sequence quality is deteriorating; narrow scope and shift task focus before continuing."
649 .to_string(),
650 )
651 } else {
652 None
653 }
654 }
655
656 fn draw_viewport(&mut self) -> io::Result<()> {
658 let tasks: Vec<_> = self
659 .task_order
660 .iter()
661 .filter_map(|id| self.tasks.get(id))
662 .cloned()
663 .collect();
664 let spinners = &self.spinners;
665 let transient = self.transient_status.take();
666 let explain = self.explain_summary.clone();
667
668 self.terminal.draw(|frame| {
669 let area = frame.area();
670 let mut lines = Vec::new();
671
672 for task in &tasks {
674 let (glyph, tier) = match task.state {
675 TaskState::TaskExecuting => {
676 let sp = spinners
677 .get(&task.task_id)
678 .map(|s| s.frame())
679 .unwrap_or('⠋');
680 (sp, Tier::Active)
681 }
682 TaskState::TaskWaiting => ('⠿', Tier::Active),
683 TaskState::TaskBlocked => (GLYPH_BLOCKED, Tier::Contextual),
684 TaskState::TaskReady => (GLYPH_PENDING, Tier::Contextual),
685 TaskState::TaskOpen => (GLYPH_PENDING, Tier::Contextual),
686 TaskState::TaskComplete => (GLYPH_COMPLETE, Tier::Contextual),
687 TaskState::TaskFailed => (GLYPH_FAILED, Tier::Urgent),
688 TaskState::TaskCancelled => (GLYPH_BLOCKED, Tier::Contextual),
689 TaskState::TaskVerifying => (GLYPH_PENDING, Tier::Active),
690 };
691
692 let elapsed_str = task
693 .elapsed
694 .map(|d| format!("{}s", d.as_secs()))
695 .unwrap_or_default();
696
697 let mut spans = vec![
698 Span::styled(" ", Style::default()),
699 Span::styled(format!("{glyph} "), tier.style()),
700 Span::styled(format!("task/{:<14}", task.name), tier.style()),
701 Span::styled(format!("{:<8}", elapsed_str), Tier::Contextual.style()),
702 ];
703
704 if let Some(ref output) = task.last_output_line {
706 let max_len = area.width.saturating_sub(40) as usize;
707 let truncated = if output.len() > max_len {
708 let mut end = max_len;
710 while end > 0 && !output.is_char_boundary(end) {
711 end -= 1;
712 }
713 &output[..end]
714 } else {
715 output.as_str()
716 };
717 spans.push(Span::styled(truncated.to_string(), tier.accent()));
718 } else if task.state == TaskState::TaskBlocked {
719 if let Some(ref by) = task.blocked_by {
720 spans.push(Span::styled(
721 format!("blocked-by: {by}"),
722 Tier::Contextual.accent(),
723 ));
724 }
725 } else if task.state == TaskState::TaskReady || task.state == TaskState::TaskOpen {
726 spans.push(Span::styled("waiting", Tier::Contextual.accent()));
727 }
728
729 lines.push(Line::from(spans));
730 }
731
732 if let Some(msg) = transient {
734 lines.push(Line::from(vec![Span::styled(
735 format!(" {msg}"),
736 Tier::Background.style(),
737 )]));
738 }
739
740 if let Some(ref summary) = explain {
742 lines.push(Line::from(vec![
743 Span::styled(" WHY: ", Tier::Urgent.accent()),
744 Span::styled(summary.clone(), Tier::Urgent.style()),
745 ]));
746 }
747
748 let paragraph = Paragraph::new(lines);
749 frame.render_widget(paragraph, area);
750 })?;
751
752 Ok(())
753 }
754
755 pub fn restore(&mut self) -> io::Result<()> {
760 self.terminal.clear()?;
762 disable_raw_mode()?;
763 io::stdout().flush()?;
764 Ok(())
765 }
766 fn push_transient_status_line(
767 &mut self,
768 message: &str,
769 at: DateTime<Utc>,
770 progress: ProgressSnapshot,
771 ) -> io::Result<()> {
772 self.last_transient_status_emit_at = Some(at);
773 let spans = vec![
774 Span::styled(at.format("%H:%M:%S").to_string(), Tier::Contextual.style()),
775 Span::styled(" ▸ ", Tier::Background.style()),
776 Span::styled("status", Tier::Active.style()),
777 Span::styled(
778 format!(
779 " {message} progress {}",
780 format_ascii_progress(progress, 20)
781 ),
782 Tier::Contextual.style(),
783 ),
784 ];
785 let line = Line::from(spans);
786 self.terminal.insert_before(1, |buf| {
787 Paragraph::new(line).render(buf.area, buf);
788 })?;
789 Ok(())
790 }
791
792 fn progress_snapshot(&self) -> ProgressSnapshot {
793 let mut snapshot = ProgressSnapshot {
794 total: self.task_states.len() as u32,
795 ..ProgressSnapshot::default()
796 };
797 for state in self.task_states.values() {
798 match state {
799 TaskState::TaskComplete => snapshot.completed += 1,
800 TaskState::TaskFailed => snapshot.failed += 1,
801 TaskState::TaskCancelled => snapshot.cancelled += 1,
802 _ => {}
803 }
804 }
805 snapshot
806 }
807}
808
809impl Drop for StreamRenderer {
810 fn drop(&mut self) {
811 let _ = self.restore();
812 }
813}
814
/// Task counts used to render the progress bar.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
struct ProgressSnapshot {
    /// Number of tasks being tracked.
    total: u32,
    /// Tasks that completed.
    completed: u32,
    /// Tasks that failed.
    failed: u32,
    /// Tasks that were cancelled.
    cancelled: u32,
}

impl ProgressSnapshot {
    /// Number of tasks in any counted end state (completed, failed or cancelled).
    fn terminal_count(self) -> u32 {
        self.completed + self.failed + self.cancelled
    }
}

/// Formats `snapshot` as `[####....] done/total (percent%)` with a bar of
/// `width` cells.
///
/// Bar fill and percentage are rounded to the nearest value, but while
/// `done < total` they are capped just below complete (at most `width - 1`
/// filled cells and 99%), so a full bar or `100%` is shown only when every
/// task has finished. An empty snapshot renders an empty bar at `0%`.
fn format_ascii_progress(snapshot: ProgressSnapshot, width: usize) -> String {
    let done = snapshot.terminal_count();
    let total = snapshot.total;
    let (filled, percent) = if total == 0 {
        (0usize, 0u32)
    } else {
        let ratio = f64::from(done) / f64::from(total);
        let mut filled = ((ratio * width as f64).round() as usize).min(width);
        let mut percent = ((ratio * 100.0).round() as u32).min(100);
        if done < total {
            // Rounding up must not claim completion while tasks remain.
            filled = filled.min(width.saturating_sub(1));
            percent = percent.min(99);
        }
        (filled, percent)
    };
    let bar = format!("{}{}", "#".repeat(filled), ".".repeat(width - filled));
    format!("[{bar}] {done}/{total} ({percent}%)")
}
842
843fn should_emit_transient_status_line(
844 last_emit_at: Option<DateTime<Utc>>,
845 message: &str,
846 now: DateTime<Utc>,
847) -> bool {
848 if message.starts_with("operator ") {
849 return true;
850 }
851 let Some(last) = last_emit_at else {
852 return true;
853 };
854 now.signed_duration_since(last).num_seconds() >= TRANSIENT_STATUS_EMIT_SECS
855}
856
857fn format_cancel_provenance_summary(provenance: Option<&CancellationProvenance>) -> String {
858 let signal = provenance
859 .and_then(|p| p.signal_name.as_deref())
860 .unwrap_or("unknown");
861 let sender = provenance
862 .and_then(|p| p.sender_pid)
863 .map(|pid| pid.to_string())
864 .unwrap_or_else(|| "unknown".to_string());
865 let receiver = provenance
866 .and_then(|p| p.receiver_pid)
867 .map(|pid| format!("yarli({pid})"))
868 .unwrap_or_else(|| "unknown".to_string());
869 let actor = provenance
870 .and_then(|p| p.actor_kind)
871 .map(|kind| kind.to_string())
872 .unwrap_or_else(|| "unknown".to_string());
873 let stage = provenance
874 .and_then(|p| p.stage)
875 .map(|stage| stage.to_string())
876 .unwrap_or_else(|| "unknown".to_string());
877 format!("signal={signal} sender={sender} receiver={receiver} actor={actor} stage={stage}")
878}
879
880fn tier_for_task_state(state: TaskState) -> Tier {
882 match state {
883 TaskState::TaskFailed => Tier::Urgent,
884 TaskState::TaskBlocked => Tier::Contextual,
885 TaskState::TaskExecuting | TaskState::TaskWaiting => Tier::Active,
886 TaskState::TaskComplete => Tier::Contextual,
887 _ => Tier::Contextual,
888 }
889}
890
891fn display_run_id(run_id: uuid::Uuid) -> String {
892 let compact = run_id.simple().to_string();
893 compact[..RUN_ID_DISPLAY_LEN.min(compact.len())].to_string()
894}
895
896fn fallback_cursor_position(size: Size) -> Position {
897 Position {
898 x: 0,
899 y: size.height.saturating_sub(1),
900 }
901}
902
903fn tier_for_run_state(state: crate::yarli_core::fsm::run::RunState) -> Tier {
905 use crate::yarli_core::fsm::run::RunState;
906 match state {
907 RunState::RunFailed | RunState::RunBlocked => Tier::Urgent,
908 RunState::RunActive | RunState::RunVerifying => Tier::Active,
909 RunState::RunCompleted => Tier::Contextual,
910 RunState::RunDrained => Tier::Contextual,
911 _ => Tier::Contextual,
912 }
913}
914
/// Formats a duration compactly for transition lines.
///
/// * under 10 s: seconds with one (truncated) decimal, e.g. `3.2s`
/// * 10 s up to 59 s: whole seconds, e.g. `34s`
/// * 60 s and above: minutes and seconds, e.g. `1m 4s`
fn format_duration(d: Duration) -> String {
    let whole_secs = d.as_secs();
    match whole_secs {
        0..=9 => {
            let tenths = d.subsec_millis() / 100;
            format!("{whole_secs}.{tenths}s")
        }
        10..=59 => format!("{whole_secs}s"),
        _ => format!("{}m {}s", whole_secs / 60, whole_secs % 60),
    }
}
927
#[cfg(test)]
mod tests {
    use super::*;
    use ratatui::backend::WindowSize;

    /// Minimal in-memory backend: fixed size, remembered cursor, and an
    /// optional forced failure of the cursor-position query.
    #[derive(Debug, Clone, Copy)]
    struct StubBackend {
        size: Size,
        cursor: Position,
        fail_cursor_probe: bool,
    }

    impl StubBackend {
        /// Backend whose cursor query succeeds and starts at the origin.
        fn new(size: Size) -> Self {
            Self {
                size,
                cursor: Position::ORIGIN,
                fail_cursor_probe: false,
            }
        }

        /// Backend whose cursor query always fails.
        fn failing_probe(size: Size) -> Self {
            let mut stub = Self::new(size);
            stub.fail_cursor_probe = true;
            stub
        }
    }

    impl Backend for StubBackend {
        fn draw<'a, I>(&mut self, _content: I) -> io::Result<()>
        where
            I: Iterator<Item = (u16, u16, &'a ratatui::buffer::Cell)>,
        {
            Ok(())
        }

        fn hide_cursor(&mut self) -> io::Result<()> {
            Ok(())
        }

        fn show_cursor(&mut self) -> io::Result<()> {
            Ok(())
        }

        fn get_cursor_position(&mut self) -> io::Result<Position> {
            if self.fail_cursor_probe {
                return Err(io::Error::other("cursor probe failed"));
            }
            Ok(self.cursor)
        }

        fn set_cursor_position<P: Into<Position>>(&mut self, position: P) -> io::Result<()> {
            self.cursor = position.into();
            Ok(())
        }

        fn clear(&mut self) -> io::Result<()> {
            Ok(())
        }

        fn size(&self) -> io::Result<Size> {
            Ok(self.size)
        }

        fn window_size(&mut self) -> io::Result<WindowSize> {
            Ok(WindowSize {
                columns_rows: self.size,
                pixels: Size::default(),
            })
        }

        fn flush(&mut self) -> io::Result<()> {
            Ok(())
        }
    }

    // --- cursor probing -------------------------------------------------

    #[test]
    fn fallback_cursor_position_anchors_to_last_row() {
        let cases = [
            (Size::new(120, 40), Position::new(0, 39)),
            (Size::new(120, 0), Position::ORIGIN),
        ];
        for (size, expected) in cases {
            assert_eq!(fallback_cursor_position(size), expected);
        }
    }

    #[test]
    fn cursor_probe_backend_uses_backend_probe_when_available() {
        let stub = StubBackend::new(Size::new(80, 24));
        let mut backend = CursorProbeBackend::new(stub, CursorProbeStrategy::QueryTerminal);

        let reported = backend.get_cursor_position().expect("probe should succeed");
        assert_eq!(reported, Position::ORIGIN);
    }

    #[test]
    fn cursor_probe_backend_falls_back_without_input_tty() {
        let stub = StubBackend::failing_probe(Size::new(80, 24));
        let mut backend = CursorProbeBackend::new(stub, CursorProbeStrategy::AnchorBottomRow);

        let reported = backend
            .get_cursor_position()
            .expect("fallback should succeed");
        assert_eq!(reported, Position::new(0, 23));
    }

    #[test]
    fn cursor_probe_backend_degrades_after_probe_failure() {
        let stub = StubBackend::failing_probe(Size::new(90, 30));
        let mut backend = CursorProbeBackend::new(stub, CursorProbeStrategy::QueryTerminal);
        let bottom_left = Position::new(0, 29);

        let first = backend
            .get_cursor_position()
            .expect("fallback should succeed");
        assert_eq!(first, bottom_left);

        let second = backend
            .get_cursor_position()
            .expect("fallback should stay active");
        assert_eq!(second, bottom_left);
    }

    // --- duration formatting --------------------------------------------

    #[test]
    fn format_duration_short() {
        let rendered = format_duration(Duration::from_millis(3200));
        assert_eq!(rendered, "3.2s");
    }

    #[test]
    fn format_duration_medium() {
        let rendered = format_duration(Duration::from_secs(34));
        assert_eq!(rendered, "34s");
    }

    #[test]
    fn format_duration_long() {
        let rendered = format_duration(Duration::from_secs(64));
        assert_eq!(rendered, "1m 4s");
    }

    #[test]
    fn format_duration_zero() {
        let rendered = format_duration(Duration::ZERO);
        assert_eq!(rendered, "0.0s");
    }

    // --- tier mapping ---------------------------------------------------

    #[test]
    fn tier_for_failed_task() {
        let tier = tier_for_task_state(TaskState::TaskFailed);
        assert_eq!(tier, Tier::Urgent);
    }

    #[test]
    fn tier_for_executing_task() {
        let tier = tier_for_task_state(TaskState::TaskExecuting);
        assert_eq!(tier, Tier::Active);
    }

    #[test]
    fn tier_for_complete_task() {
        let tier = tier_for_task_state(TaskState::TaskComplete);
        assert_eq!(tier, Tier::Contextual);
    }

    #[test]
    fn tier_for_blocked_run() {
        use crate::yarli_core::fsm::run::RunState;
        let tier = tier_for_run_state(RunState::RunBlocked);
        assert_eq!(tier, Tier::Urgent);
    }

    #[test]
    fn tier_for_active_run() {
        use crate::yarli_core::fsm::run::RunState;
        let tier = tier_for_run_state(RunState::RunActive);
        assert_eq!(tier, Tier::Active);
    }

    #[test]
    fn tier_for_completed_run() {
        use crate::yarli_core::fsm::run::RunState;
        let tier = tier_for_run_state(RunState::RunCompleted);
        assert_eq!(tier, Tier::Contextual);
    }

    // --- run id display -------------------------------------------------

    #[test]
    fn display_run_id_uses_compact_prefix() {
        let run_id =
            uuid::Uuid::parse_str("019c4f51-5f70-7d84-a0c8-2f5c6bb8ef12").expect("valid uuid");
        let shown = display_run_id(run_id);
        assert_eq!(shown, "019c4f515f70");
    }

    // --- transient status throttling ------------------------------------

    #[test]
    fn transient_status_line_throttles_regular_heartbeats() {
        let last_emit = Utc::now();
        let heartbeat = "heartbeat: pending=1 leased=1";

        let too_soon = last_emit + chrono::Duration::seconds(10);
        assert!(!should_emit_transient_status_line(
            Some(last_emit),
            heartbeat,
            too_soon,
        ));

        let late_enough = last_emit + chrono::Duration::seconds(31);
        assert!(should_emit_transient_status_line(
            Some(last_emit),
            heartbeat,
            late_enough,
        ));
    }

    #[test]
    fn transient_status_line_always_emits_operator_messages() {
        let last_emit = Utc::now();
        let shortly_after = last_emit + chrono::Duration::seconds(1);
        assert!(should_emit_transient_status_line(
            Some(last_emit),
            "operator pause: maintenance",
            shortly_after,
        ));
    }

    // --- progress bar ---------------------------------------------------

    #[test]
    fn ascii_progress_formats_empty_snapshot() {
        let rendered = format_ascii_progress(ProgressSnapshot::default(), 10);
        assert_eq!(rendered, "[..........] 0/0 (0%)");
    }

    #[test]
    fn ascii_progress_formats_partial_completion() {
        let snapshot = ProgressSnapshot {
            total: 5,
            completed: 2,
            failed: 0,
            cancelled: 0,
        };
        let rendered = format_ascii_progress(snapshot, 10);
        assert_eq!(rendered, "[####......] 2/5 (40%)");
    }

    #[test]
    fn ascii_progress_counts_all_terminal_states() {
        let snapshot = ProgressSnapshot {
            total: 4,
            completed: 1,
            failed: 1,
            cancelled: 1,
        };
        let rendered = format_ascii_progress(snapshot, 10);
        assert_eq!(rendered, "[########..] 3/4 (75%)");
    }
}