Skip to main content

yarli_cli/stream/
headless.rs

1//! Headless renderer — structured output to stderr when no TTY is available.
2//!
3//! Replaces the silent drain loop for non-TTY environments (CI, pipes,
4//! redirected output). Logs task state transitions, forwards command output,
5//! and prints a final run summary with pass/fail counts.
6
7use std::io::{self, Write};
8
9use chrono::{DateTime, Utc};
10use tokio::sync::mpsc;
11use tracing::{debug, info, warn};
12
13use super::events::StreamEvent;
14use crate::yarli_core::domain::CancellationProvenance;
15use crate::yarli_core::entities::continuation::TaskHealthAction;
16use crate::yarli_core::explain::DeteriorationTrend;
17use crate::yarli_core::fsm::run::RunState;
18use crate::yarli_core::fsm::task::TaskState;
19
20/// Counts for the final run summary.
21#[derive(Default)]
22struct RunSummary {
23    run_id: Option<uuid::Uuid>,
24    run_state: Option<RunState>,
25    tasks_complete: u32,
26    tasks_failed: u32,
27    tasks_cancelled: u32,
28    transitions: u32,
29}
30
31/// A renderer that writes structured log lines to stderr.
32///
33/// Used when neither stream nor dashboard mode can initialize
34/// (no TTY, too small terminal, etc.).
35#[derive(Default)]
36pub struct HeadlessRenderer {
37    summary: RunSummary,
38    last_transient_status_emit_at: Option<DateTime<Utc>>,
39}
40
41impl HeadlessRenderer {
42    pub fn new() -> Self {
43        Self::default()
44    }
45
46    /// Consume events from the channel until it closes, writing structured
47    /// output to stderr.
48    pub fn run(mut self, mut rx: mpsc::UnboundedReceiver<StreamEvent>) {
49        while let Some(event) = rx.blocking_recv() {
50            self.handle_event(event);
51        }
52        self.print_summary();
53    }
54
55    fn handle_event(&mut self, event: StreamEvent) {
56        match event {
57            StreamEvent::TaskDiscovered {
58                task_id: _,
59                task_name: _,
60                depends_on: _,
61            } => {
62                // Catalog/discovery event only; no terminal transition yet.
63            }
64            StreamEvent::TaskTransition {
65                task_id,
66                task_name,
67                from,
68                to,
69                elapsed,
70                exit_code,
71                detail,
72                at,
73            } => {
74                self.summary.transitions += 1;
75                match to {
76                    TaskState::TaskComplete => self.summary.tasks_complete += 1,
77                    TaskState::TaskFailed => self.summary.tasks_failed += 1,
78                    TaskState::TaskCancelled => self.summary.tasks_cancelled += 1,
79                    _ => {}
80                }
81
82                let elapsed_str = elapsed
83                    .map(|d| format!(" ({:.1}s)", d.as_secs_f64()))
84                    .unwrap_or_default();
85                let exit_str = exit_code.map(|c| format!(", exit {c}")).unwrap_or_default();
86                let detail_str = detail.map(|d| format!(", {d}")).unwrap_or_default();
87                let time_str = at.format("%H:%M:%S");
88
89                let line = format!(
90                    "{time_str} task/{task_name} {from:?} -> {to:?}{elapsed_str}{exit_str}{detail_str} [{task_id}]"
91                );
92
93                if to == TaskState::TaskFailed {
94                    warn!("{}", line);
95                } else {
96                    info!("{}", line);
97                }
98                let _ = writeln!(io::stderr(), "{line}");
99            }
100            StreamEvent::RunTransition {
101                run_id,
102                from,
103                to,
104                reason,
105                at,
106            } => {
107                let time_str = at.format("%H:%M:%S");
108                let reason_str = reason.map(|r| format!(" ({r})")).unwrap_or_default();
109                let line = format!(
110                    "{time_str} run/{} {from:?} -> {to:?}{reason_str}",
111                    display_run_id(run_id)
112                );
113                if to.is_terminal() {
114                    self.summary.run_state = Some(to);
115                }
116                info!("{}", line);
117                let _ = writeln!(io::stderr(), "{line}");
118            }
119            StreamEvent::RunStarted {
120                run_id,
121                objective,
122                at,
123            } => {
124                self.summary.run_id = Some(run_id);
125                let time_str = at.format("%H:%M:%S");
126                let line = format!(
127                    "{time_str} run/{} started: {objective}",
128                    display_run_id(run_id)
129                );
130                info!("{}", line);
131                let _ = writeln!(io::stderr(), "{line}");
132            }
133            StreamEvent::CommandOutput {
134                task_id: _,
135                task_name,
136                line,
137            } => {
138                let _ = writeln!(io::stderr(), "  [{task_name}] {line}");
139            }
140            StreamEvent::ExplainUpdate { summary } => {
141                info!(summary = %summary, "explain update");
142                let _ = writeln!(io::stderr(), "  WHY: {summary}");
143            }
144            StreamEvent::TransientStatus { message } => {
145                debug!(message = %message, "transient status");
146                let now = Utc::now();
147                let should_emit = message.starts_with("operator ")
148                    || self
149                        .last_transient_status_emit_at
150                        .map(|last| now.signed_duration_since(last).num_seconds() >= 30)
151                        .unwrap_or(true);
152                if should_emit {
153                    self.last_transient_status_emit_at = Some(now);
154                    let time_str = now.format("%H:%M:%S");
155                    let line = format!("{time_str} status {message}");
156                    let _ = writeln!(io::stderr(), "{line}");
157                }
158            }
159            StreamEvent::TaskWorker {
160                task_id: _,
161                worker_id: _,
162            } => {
163                // No output needed for headless mode.
164            }
165            StreamEvent::RunExited { payload } => {
166                self.summary.run_id = Some(payload.run_id);
167                self.summary.run_state = Some(payload.exit_state);
168                self.summary.tasks_complete = payload.summary.completed;
169                self.summary.tasks_failed = payload.summary.failed;
170                self.summary.tasks_cancelled = payload.summary.cancelled;
171                if payload.exit_state == RunState::RunCancelled
172                    || payload.cancellation_provenance.is_some()
173                {
174                    let summary =
175                        format_cancel_provenance_summary(payload.cancellation_provenance.as_ref());
176                    let _ = writeln!(io::stderr(), "  Cancel provenance: {summary}");
177                }
178
179                if let Some(quality_gate) = payload.quality_gate.as_ref() {
180                    if matches!(
181                        quality_gate.task_health_action,
182                        TaskHealthAction::ForcePivot
183                    ) {
184                        if let Some(guidance) = force_pivot_guidance(quality_gate.trend.as_ref()) {
185                            let _ = writeln!(io::stderr(), "{guidance}");
186                        }
187                    }
188                    if matches!(
189                        quality_gate.task_health_action,
190                        TaskHealthAction::StopAndSummarize
191                    ) {
192                        let guidance =
193                            format!("  Stop-and-summarize guidance: {}", quality_gate.reason);
194                        let _ = writeln!(io::stderr(), "{guidance}");
195                    }
196                    if matches!(
197                        quality_gate.task_health_action,
198                        TaskHealthAction::CheckpointNow
199                    ) {
200                        let guidance =
201                            format!("  Checkpoint-now guidance: {}", quality_gate.reason);
202                        let _ = writeln!(io::stderr(), "{guidance}");
203                    }
204                }
205
206                // Print machine-readable JSON to stdout (not stderr).
207                if let Ok(json) = serde_json::to_string(&payload) {
208                    let _ = writeln!(io::stdout(), "{json}");
209                }
210            }
211            StreamEvent::Tick => {
212                // No spinners in headless mode.
213            }
214        }
215    }
216
217    fn print_summary(&self) {
218        let s = &self.summary;
219        let status = match s.run_state {
220            Some(RunState::RunCompleted) => "OK",
221            Some(RunState::RunFailed | RunState::RunBlocked) => "FAILED",
222            Some(RunState::RunCancelled) => "CANCELLED",
223            Some(RunState::RunDrained) => "DRAINED",
224            Some(_) => "DONE",
225            None => {
226                if s.tasks_failed > 0 {
227                    "FAILED"
228                } else {
229                    "OK"
230                }
231            }
232        };
233        let run_label = s
234            .run_id
235            .map(|id| format!(" [{}]", display_run_id(id)))
236            .unwrap_or_default();
237        let line = format!(
238            "--- Run {status}{run_label}: {} complete, {} failed, {} cancelled ({} transitions) ---",
239            s.tasks_complete, s.tasks_failed, s.tasks_cancelled, s.transitions
240        );
241        info!("{}", line);
242        let _ = writeln!(io::stderr(), "{line}");
243    }
244}
245
246fn display_run_id(run_id: uuid::Uuid) -> String {
247    const RUN_ID_DISPLAY_LEN: usize = 12;
248    let compact = run_id.simple().to_string();
249    compact[..RUN_ID_DISPLAY_LEN.min(compact.len())].to_string()
250}
251
252fn format_cancel_provenance_summary(provenance: Option<&CancellationProvenance>) -> String {
253    let signal = provenance
254        .and_then(|p| p.signal_name.as_deref())
255        .unwrap_or("unknown");
256    let sender = provenance
257        .and_then(|p| p.sender_pid)
258        .map(|pid| pid.to_string())
259        .unwrap_or_else(|| "unknown".to_string());
260    let receiver = provenance
261        .and_then(|p| p.receiver_pid)
262        .map(|pid| format!("yarli({pid})"))
263        .unwrap_or_else(|| "unknown".to_string());
264    let actor = provenance
265        .and_then(|p| p.actor_kind)
266        .map(|kind| kind.to_string())
267        .unwrap_or_else(|| "unknown".to_string());
268    let stage = provenance
269        .and_then(|p| p.stage)
270        .map(|stage| stage.to_string())
271        .unwrap_or_else(|| "unknown".to_string());
272    format!("signal={signal} sender={sender} receiver={receiver} actor={actor} stage={stage}")
273}
274
275fn force_pivot_guidance(trend: Option<&DeteriorationTrend>) -> Option<String> {
276    if matches!(trend, Some(DeteriorationTrend::Deteriorating)) {
277        Some(
278            "  Force-pivot guidance: sequence quality is deteriorating; narrow scope and shift task focus before continuing."
279                .to_string(),
280        )
281    } else {
282        None
283    }
284}
285
286#[cfg(test)]
287mod tests {
288    use super::*;
289    use crate::yarli_core::fsm::run::RunState;
290    use chrono::Utc;
291    use std::time::Duration;
292    use uuid::Uuid;
293
294    #[test]
295    fn headless_counts_complete_tasks() {
296        let mut renderer = HeadlessRenderer::new();
297        renderer.handle_event(StreamEvent::TaskTransition {
298            task_id: Uuid::new_v4(),
299            task_name: "build".into(),
300            from: TaskState::TaskExecuting,
301            to: TaskState::TaskComplete,
302            elapsed: Some(Duration::from_secs(5)),
303            exit_code: Some(0),
304            detail: None,
305            at: Utc::now(),
306        });
307        assert_eq!(renderer.summary.tasks_complete, 1);
308        assert_eq!(renderer.summary.tasks_failed, 0);
309        assert_eq!(renderer.summary.transitions, 1);
310    }
311
312    #[test]
313    fn headless_counts_failed_tasks() {
314        let mut renderer = HeadlessRenderer::new();
315        renderer.handle_event(StreamEvent::TaskTransition {
316            task_id: Uuid::new_v4(),
317            task_name: "test".into(),
318            from: TaskState::TaskExecuting,
319            to: TaskState::TaskFailed,
320            elapsed: Some(Duration::from_secs(2)),
321            exit_code: Some(1),
322            detail: Some("nonzero exit".into()),
323            at: Utc::now(),
324        });
325        assert_eq!(renderer.summary.tasks_failed, 1);
326        assert_eq!(renderer.summary.tasks_complete, 0);
327    }
328
329    #[test]
330    fn headless_forwards_command_output() {
331        // Just verify it doesn't panic.
332        let mut renderer = HeadlessRenderer::new();
333        renderer.handle_event(StreamEvent::CommandOutput {
334            task_id: Uuid::new_v4(),
335            task_name: "build".into(),
336            line: "Compiling yarli v0.1.0".into(),
337        });
338    }
339
340    #[test]
341    fn headless_records_run_id_from_run_started() {
342        let mut renderer = HeadlessRenderer::new();
343        let run_id = Uuid::new_v4();
344        renderer.handle_event(StreamEvent::RunStarted {
345            run_id,
346            objective: "build everything".into(),
347            at: Utc::now(),
348        });
349        assert_eq!(renderer.summary.run_id, Some(run_id));
350        // RunStarted doesn't affect task counts.
351        assert_eq!(renderer.summary.transitions, 0);
352    }
353
354    #[test]
355    fn headless_summary_includes_run_id() {
356        let mut renderer = HeadlessRenderer::new();
357        let run_id = Uuid::new_v4();
358        renderer.handle_event(StreamEvent::RunStarted {
359            run_id,
360            objective: "test".into(),
361            at: Utc::now(),
362        });
363        renderer.handle_event(StreamEvent::TaskTransition {
364            task_id: Uuid::new_v4(),
365            task_name: "build".into(),
366            from: TaskState::TaskExecuting,
367            to: TaskState::TaskComplete,
368            elapsed: None,
369            exit_code: Some(0),
370            detail: None,
371            at: Utc::now(),
372        });
373        // Verify run_id is captured for summary.
374        assert_eq!(renderer.summary.run_id, Some(run_id));
375        assert_eq!(renderer.summary.tasks_complete, 1);
376    }
377
378    #[test]
379    fn headless_handles_run_transition() {
380        let mut renderer = HeadlessRenderer::new();
381        renderer.handle_event(StreamEvent::RunTransition {
382            run_id: Uuid::new_v4(),
383            from: RunState::RunOpen,
384            to: RunState::RunActive,
385            reason: Some("started".into()),
386            at: Utc::now(),
387        });
388        // Run transitions don't affect task counts.
389        assert_eq!(renderer.summary.transitions, 0);
390    }
391
392    #[test]
393    fn headless_summary_reflects_all_events() {
394        let mut renderer = HeadlessRenderer::new();
395        for _ in 0..3 {
396            renderer.handle_event(StreamEvent::TaskTransition {
397                task_id: Uuid::new_v4(),
398                task_name: "task".into(),
399                from: TaskState::TaskExecuting,
400                to: TaskState::TaskComplete,
401                elapsed: None,
402                exit_code: Some(0),
403                detail: None,
404                at: Utc::now(),
405            });
406        }
407        renderer.handle_event(StreamEvent::TaskTransition {
408            task_id: Uuid::new_v4(),
409            task_name: "fail".into(),
410            from: TaskState::TaskExecuting,
411            to: TaskState::TaskFailed,
412            elapsed: None,
413            exit_code: Some(1),
414            detail: None,
415            at: Utc::now(),
416        });
417        renderer.handle_event(StreamEvent::TaskTransition {
418            task_id: Uuid::new_v4(),
419            task_name: "cancel".into(),
420            from: TaskState::TaskExecuting,
421            to: TaskState::TaskCancelled,
422            elapsed: None,
423            exit_code: None,
424            detail: None,
425            at: Utc::now(),
426        });
427
428        assert_eq!(renderer.summary.tasks_complete, 3);
429        assert_eq!(renderer.summary.tasks_failed, 1);
430        assert_eq!(renderer.summary.tasks_cancelled, 1);
431        assert_eq!(renderer.summary.transitions, 5);
432    }
433
434    #[test]
435    fn headless_handles_run_exited_without_panic() {
436        use crate::yarli_core::domain::{CommandClass, SafeMode};
437        use crate::yarli_core::entities::continuation::ContinuationPayload;
438        use crate::yarli_core::entities::run::Run;
439        use crate::yarli_core::entities::task::Task;
440
441        let run = Run::new("test", SafeMode::Execute);
442        let mut t = Task::new(
443            run.id,
444            "build",
445            "do build",
446            CommandClass::Io,
447            run.correlation_id,
448        );
449        t.state = TaskState::TaskComplete;
450        let payload = ContinuationPayload::build(&run, &[&t]);
451
452        let mut renderer = HeadlessRenderer::new();
453        renderer.handle_event(StreamEvent::RunExited { payload });
454        // No panic — success.
455    }
456
457    #[test]
458    fn run_exited_summary_overrides_transition_failure_counts() {
459        use crate::yarli_core::entities::continuation::{
460            ContinuationPayload, RunSummary, TrancheSpec,
461        };
462
463        let mut renderer = HeadlessRenderer::new();
464        let run_id = Uuid::new_v4();
465        renderer.handle_event(StreamEvent::TaskTransition {
466            task_id: Uuid::new_v4(),
467            task_name: "retryable".into(),
468            from: TaskState::TaskExecuting,
469            to: TaskState::TaskFailed,
470            elapsed: None,
471            exit_code: Some(1),
472            detail: None,
473            at: Utc::now(),
474        });
475
476        renderer.handle_event(StreamEvent::RunExited {
477            payload: ContinuationPayload {
478                run_id,
479                objective: "retry flow".into(),
480                exit_state: RunState::RunCompleted,
481                exit_reason: None,
482                cancellation_source: None,
483                cancellation_provenance: None,
484                completed_at: Utc::now(),
485                tasks: Vec::new(),
486                summary: RunSummary {
487                    total: 1,
488                    completed: 1,
489                    failed: 0,
490                    cancelled: 0,
491                    pending: 0,
492                },
493                next_tranche: Some(TrancheSpec {
494                    suggested_objective: "next".into(),
495                    kind: crate::yarli_core::entities::continuation::TrancheKind::PlannedNext,
496                    retry_task_keys: Vec::new(),
497                    unfinished_task_keys: Vec::new(),
498                    planned_task_keys: vec!["t2".into()],
499                    planned_tranche_key: Some("t2".into()),
500                    cursor: None,
501                    config_snapshot: serde_json::json!({}),
502                    interventions: Vec::new(),
503                }),
504                quality_gate: None,
505                retry_recommendation: None,
506            },
507        });
508
509        assert_eq!(renderer.summary.run_state, Some(RunState::RunCompleted));
510        assert_eq!(renderer.summary.tasks_failed, 0);
511    }
512}