1use std::io::{self, Write};
8
9use chrono::{DateTime, Utc};
10use tokio::sync::mpsc;
11use tracing::{debug, info, warn};
12
13use super::events::StreamEvent;
14use crate::yarli_core::domain::CancellationProvenance;
15use crate::yarli_core::entities::continuation::TaskHealthAction;
16use crate::yarli_core::explain::DeteriorationTrend;
17use crate::yarli_core::fsm::run::RunState;
18use crate::yarli_core::fsm::task::TaskState;
19
20#[derive(Default)]
22struct RunSummary {
23 run_id: Option<uuid::Uuid>,
24 run_state: Option<RunState>,
25 tasks_complete: u32,
26 tasks_failed: u32,
27 tasks_cancelled: u32,
28 transitions: u32,
29}
30
31#[derive(Default)]
36pub struct HeadlessRenderer {
37 summary: RunSummary,
38 last_transient_status_emit_at: Option<DateTime<Utc>>,
39}
40
41impl HeadlessRenderer {
42 pub fn new() -> Self {
43 Self::default()
44 }
45
46 pub fn run(mut self, mut rx: mpsc::UnboundedReceiver<StreamEvent>) {
49 while let Some(event) = rx.blocking_recv() {
50 self.handle_event(event);
51 }
52 self.print_summary();
53 }
54
55 fn handle_event(&mut self, event: StreamEvent) {
56 match event {
57 StreamEvent::TaskDiscovered {
58 task_id: _,
59 task_name: _,
60 depends_on: _,
61 } => {
62 }
64 StreamEvent::TaskTransition {
65 task_id,
66 task_name,
67 from,
68 to,
69 elapsed,
70 exit_code,
71 detail,
72 at,
73 } => {
74 self.summary.transitions += 1;
75 match to {
76 TaskState::TaskComplete => self.summary.tasks_complete += 1,
77 TaskState::TaskFailed => self.summary.tasks_failed += 1,
78 TaskState::TaskCancelled => self.summary.tasks_cancelled += 1,
79 _ => {}
80 }
81
82 let elapsed_str = elapsed
83 .map(|d| format!(" ({:.1}s)", d.as_secs_f64()))
84 .unwrap_or_default();
85 let exit_str = exit_code.map(|c| format!(", exit {c}")).unwrap_or_default();
86 let detail_str = detail.map(|d| format!(", {d}")).unwrap_or_default();
87 let time_str = at.format("%H:%M:%S");
88
89 let line = format!(
90 "{time_str} task/{task_name} {from:?} -> {to:?}{elapsed_str}{exit_str}{detail_str} [{task_id}]"
91 );
92
93 if to == TaskState::TaskFailed {
94 warn!("{}", line);
95 } else {
96 info!("{}", line);
97 }
98 let _ = writeln!(io::stderr(), "{line}");
99 }
100 StreamEvent::RunTransition {
101 run_id,
102 from,
103 to,
104 reason,
105 at,
106 } => {
107 let time_str = at.format("%H:%M:%S");
108 let reason_str = reason.map(|r| format!(" ({r})")).unwrap_or_default();
109 let line = format!(
110 "{time_str} run/{} {from:?} -> {to:?}{reason_str}",
111 display_run_id(run_id)
112 );
113 if to.is_terminal() {
114 self.summary.run_state = Some(to);
115 }
116 info!("{}", line);
117 let _ = writeln!(io::stderr(), "{line}");
118 }
119 StreamEvent::RunStarted {
120 run_id,
121 objective,
122 at,
123 } => {
124 self.summary.run_id = Some(run_id);
125 let time_str = at.format("%H:%M:%S");
126 let line = format!(
127 "{time_str} run/{} started: {objective}",
128 display_run_id(run_id)
129 );
130 info!("{}", line);
131 let _ = writeln!(io::stderr(), "{line}");
132 }
133 StreamEvent::CommandOutput {
134 task_id: _,
135 task_name,
136 line,
137 } => {
138 let _ = writeln!(io::stderr(), " [{task_name}] {line}");
139 }
140 StreamEvent::ExplainUpdate { summary } => {
141 info!(summary = %summary, "explain update");
142 let _ = writeln!(io::stderr(), " WHY: {summary}");
143 }
144 StreamEvent::TransientStatus { message } => {
145 debug!(message = %message, "transient status");
146 let now = Utc::now();
147 let should_emit = message.starts_with("operator ")
148 || self
149 .last_transient_status_emit_at
150 .map(|last| now.signed_duration_since(last).num_seconds() >= 30)
151 .unwrap_or(true);
152 if should_emit {
153 self.last_transient_status_emit_at = Some(now);
154 let time_str = now.format("%H:%M:%S");
155 let line = format!("{time_str} status {message}");
156 let _ = writeln!(io::stderr(), "{line}");
157 }
158 }
159 StreamEvent::TaskWorker {
160 task_id: _,
161 worker_id: _,
162 } => {
163 }
165 StreamEvent::RunExited { payload } => {
166 self.summary.run_id = Some(payload.run_id);
167 self.summary.run_state = Some(payload.exit_state);
168 self.summary.tasks_complete = payload.summary.completed;
169 self.summary.tasks_failed = payload.summary.failed;
170 self.summary.tasks_cancelled = payload.summary.cancelled;
171 if payload.exit_state == RunState::RunCancelled
172 || payload.cancellation_provenance.is_some()
173 {
174 let summary =
175 format_cancel_provenance_summary(payload.cancellation_provenance.as_ref());
176 let _ = writeln!(io::stderr(), " Cancel provenance: {summary}");
177 }
178
179 if let Some(quality_gate) = payload.quality_gate.as_ref() {
180 if matches!(
181 quality_gate.task_health_action,
182 TaskHealthAction::ForcePivot
183 ) {
184 if let Some(guidance) = force_pivot_guidance(quality_gate.trend.as_ref()) {
185 let _ = writeln!(io::stderr(), "{guidance}");
186 }
187 }
188 if matches!(
189 quality_gate.task_health_action,
190 TaskHealthAction::StopAndSummarize
191 ) {
192 let guidance =
193 format!(" Stop-and-summarize guidance: {}", quality_gate.reason);
194 let _ = writeln!(io::stderr(), "{guidance}");
195 }
196 if matches!(
197 quality_gate.task_health_action,
198 TaskHealthAction::CheckpointNow
199 ) {
200 let guidance =
201 format!(" Checkpoint-now guidance: {}", quality_gate.reason);
202 let _ = writeln!(io::stderr(), "{guidance}");
203 }
204 }
205
206 if let Ok(json) = serde_json::to_string(&payload) {
208 let _ = writeln!(io::stdout(), "{json}");
209 }
210 }
211 StreamEvent::Tick => {
212 }
214 }
215 }
216
217 fn print_summary(&self) {
218 let s = &self.summary;
219 let status = match s.run_state {
220 Some(RunState::RunCompleted) => "OK",
221 Some(RunState::RunFailed | RunState::RunBlocked) => "FAILED",
222 Some(RunState::RunCancelled) => "CANCELLED",
223 Some(RunState::RunDrained) => "DRAINED",
224 Some(_) => "DONE",
225 None => {
226 if s.tasks_failed > 0 {
227 "FAILED"
228 } else {
229 "OK"
230 }
231 }
232 };
233 let run_label = s
234 .run_id
235 .map(|id| format!(" [{}]", display_run_id(id)))
236 .unwrap_or_default();
237 let line = format!(
238 "--- Run {status}{run_label}: {} complete, {} failed, {} cancelled ({} transitions) ---",
239 s.tasks_complete, s.tasks_failed, s.tasks_cancelled, s.transitions
240 );
241 info!("{}", line);
242 let _ = writeln!(io::stderr(), "{line}");
243 }
244}
245
246fn display_run_id(run_id: uuid::Uuid) -> String {
247 const RUN_ID_DISPLAY_LEN: usize = 12;
248 let compact = run_id.simple().to_string();
249 compact[..RUN_ID_DISPLAY_LEN.min(compact.len())].to_string()
250}
251
252fn format_cancel_provenance_summary(provenance: Option<&CancellationProvenance>) -> String {
253 let signal = provenance
254 .and_then(|p| p.signal_name.as_deref())
255 .unwrap_or("unknown");
256 let sender = provenance
257 .and_then(|p| p.sender_pid)
258 .map(|pid| pid.to_string())
259 .unwrap_or_else(|| "unknown".to_string());
260 let receiver = provenance
261 .and_then(|p| p.receiver_pid)
262 .map(|pid| format!("yarli({pid})"))
263 .unwrap_or_else(|| "unknown".to_string());
264 let actor = provenance
265 .and_then(|p| p.actor_kind)
266 .map(|kind| kind.to_string())
267 .unwrap_or_else(|| "unknown".to_string());
268 let stage = provenance
269 .and_then(|p| p.stage)
270 .map(|stage| stage.to_string())
271 .unwrap_or_else(|| "unknown".to_string());
272 format!("signal={signal} sender={sender} receiver={receiver} actor={actor} stage={stage}")
273}
274
275fn force_pivot_guidance(trend: Option<&DeteriorationTrend>) -> Option<String> {
276 if matches!(trend, Some(DeteriorationTrend::Deteriorating)) {
277 Some(
278 " Force-pivot guidance: sequence quality is deteriorating; narrow scope and shift task focus before continuing."
279 .to_string(),
280 )
281 } else {
282 None
283 }
284}
285
#[cfg(test)]
mod tests {
    use super::*;
    use crate::yarli_core::fsm::run::RunState;
    use chrono::Utc;
    use std::time::Duration;
    use uuid::Uuid;

    /// Builds a transition out of `TaskExecuting` into `to`, with no elapsed
    /// time or detail. The exit code follows the target state: `0` for
    /// complete, `1` for failed, none otherwise.
    fn task_transition(task_name: &'static str, to: TaskState) -> StreamEvent {
        let exit_code = match to {
            TaskState::TaskComplete => Some(0),
            TaskState::TaskFailed => Some(1),
            _ => None,
        };
        StreamEvent::TaskTransition {
            task_id: Uuid::new_v4(),
            task_name: task_name.into(),
            from: TaskState::TaskExecuting,
            to,
            elapsed: None,
            exit_code,
            detail: None,
            at: Utc::now(),
        }
    }

    #[test]
    fn headless_counts_complete_tasks() {
        let mut renderer = HeadlessRenderer::new();
        // Built inline so the elapsed-time formatting path is exercised too.
        renderer.handle_event(StreamEvent::TaskTransition {
            task_id: Uuid::new_v4(),
            task_name: "build".into(),
            from: TaskState::TaskExecuting,
            to: TaskState::TaskComplete,
            elapsed: Some(Duration::from_secs(5)),
            exit_code: Some(0),
            detail: None,
            at: Utc::now(),
        });
        assert_eq!(renderer.summary.tasks_complete, 1);
        assert_eq!(renderer.summary.tasks_failed, 0);
        assert_eq!(renderer.summary.transitions, 1);
    }

    #[test]
    fn headless_counts_failed_tasks() {
        let mut renderer = HeadlessRenderer::new();
        // Built inline so the detail formatting path is exercised too.
        renderer.handle_event(StreamEvent::TaskTransition {
            task_id: Uuid::new_v4(),
            task_name: "test".into(),
            from: TaskState::TaskExecuting,
            to: TaskState::TaskFailed,
            elapsed: Some(Duration::from_secs(2)),
            exit_code: Some(1),
            detail: Some("nonzero exit".into()),
            at: Utc::now(),
        });
        assert_eq!(renderer.summary.tasks_failed, 1);
        assert_eq!(renderer.summary.tasks_complete, 0);
    }

    #[test]
    fn headless_forwards_command_output() {
        let mut renderer = HeadlessRenderer::new();
        // Smoke test: output goes to stderr, so only the absence of a panic is checked.
        renderer.handle_event(StreamEvent::CommandOutput {
            task_id: Uuid::new_v4(),
            task_name: "build".into(),
            line: "Compiling yarli v0.1.0".into(),
        });
        assert_eq!(renderer.summary.transitions, 0);
    }

    #[test]
    fn headless_records_run_id_from_run_started() {
        let mut renderer = HeadlessRenderer::new();
        let run_id = Uuid::new_v4();
        renderer.handle_event(StreamEvent::RunStarted {
            run_id,
            objective: "build everything".into(),
            at: Utc::now(),
        });
        assert_eq!(renderer.summary.run_id, Some(run_id));
        assert_eq!(renderer.summary.transitions, 0);
    }

    #[test]
    fn headless_summary_includes_run_id() {
        let mut renderer = HeadlessRenderer::new();
        let run_id = Uuid::new_v4();
        renderer.handle_event(StreamEvent::RunStarted {
            run_id,
            objective: "test".into(),
            at: Utc::now(),
        });
        renderer.handle_event(task_transition("build", TaskState::TaskComplete));
        assert_eq!(renderer.summary.run_id, Some(run_id));
        assert_eq!(renderer.summary.tasks_complete, 1);
        // The summary line is written to stderr; make sure producing it does not panic.
        renderer.print_summary();
    }

    #[test]
    fn headless_handles_run_transition() {
        let mut renderer = HeadlessRenderer::new();
        renderer.handle_event(StreamEvent::RunTransition {
            run_id: Uuid::new_v4(),
            from: RunState::RunOpen,
            to: RunState::RunActive,
            reason: Some("started".into()),
            at: Utc::now(),
        });
        // Run transitions are not counted as task transitions.
        assert_eq!(renderer.summary.transitions, 0);
    }

    #[test]
    fn headless_summary_reflects_all_events() {
        let mut renderer = HeadlessRenderer::new();
        for _ in 0..3 {
            renderer.handle_event(task_transition("task", TaskState::TaskComplete));
        }
        renderer.handle_event(task_transition("fail", TaskState::TaskFailed));
        renderer.handle_event(task_transition("cancel", TaskState::TaskCancelled));

        assert_eq!(renderer.summary.tasks_complete, 3);
        assert_eq!(renderer.summary.tasks_failed, 1);
        assert_eq!(renderer.summary.tasks_cancelled, 1);
        assert_eq!(renderer.summary.transitions, 5);
    }

    #[test]
    fn headless_handles_run_exited_without_panic() {
        use crate::yarli_core::domain::{CommandClass, SafeMode};
        use crate::yarli_core::entities::continuation::ContinuationPayload;
        use crate::yarli_core::entities::run::Run;
        use crate::yarli_core::entities::task::Task;

        let run = Run::new("test", SafeMode::Execute);
        let mut t = Task::new(
            run.id,
            "build",
            "do build",
            CommandClass::Io,
            run.correlation_id,
        );
        t.state = TaskState::TaskComplete;
        let payload = ContinuationPayload::build(&run, &[&t]);

        let mut renderer = HeadlessRenderer::new();
        renderer.handle_event(StreamEvent::RunExited { payload });
        renderer.print_summary();
    }

    #[test]
    fn run_exited_summary_overrides_transition_failure_counts() {
        use crate::yarli_core::entities::continuation::{
            ContinuationPayload, RunSummary, TrancheSpec,
        };

        let mut renderer = HeadlessRenderer::new();
        let run_id = Uuid::new_v4();
        renderer.handle_event(task_transition("retryable", TaskState::TaskFailed));

        renderer.handle_event(StreamEvent::RunExited {
            payload: ContinuationPayload {
                run_id,
                objective: "retry flow".into(),
                exit_state: RunState::RunCompleted,
                exit_reason: None,
                cancellation_source: None,
                cancellation_provenance: None,
                completed_at: Utc::now(),
                tasks: Vec::new(),
                summary: RunSummary {
                    total: 1,
                    completed: 1,
                    failed: 0,
                    cancelled: 0,
                    pending: 0,
                },
                next_tranche: Some(TrancheSpec {
                    suggested_objective: "next".into(),
                    kind: crate::yarli_core::entities::continuation::TrancheKind::PlannedNext,
                    retry_task_keys: Vec::new(),
                    unfinished_task_keys: Vec::new(),
                    planned_task_keys: vec!["t2".into()],
                    planned_tranche_key: Some("t2".into()),
                    cursor: None,
                    config_snapshot: serde_json::json!({}),
                    interventions: Vec::new(),
                }),
                quality_gate: None,
                retry_recommendation: None,
            },
        });

        assert_eq!(renderer.summary.run_state, Some(RunState::RunCompleted));
        assert_eq!(renderer.summary.tasks_failed, 0);
    }

    #[test]
    fn transient_status_is_throttled() {
        let mut renderer = HeadlessRenderer::new();
        renderer.handle_event(StreamEvent::TransientStatus {
            message: "waiting".into(),
        });
        let first_emit = renderer.last_transient_status_emit_at;
        assert!(first_emit.is_some());

        // A second non-operator message inside the throttle window is not
        // emitted, so the recorded emit time must stay the same.
        renderer.handle_event(StreamEvent::TransientStatus {
            message: "still waiting".into(),
        });
        assert_eq!(renderer.last_transient_status_emit_at, first_emit);
    }

    #[test]
    fn display_run_id_is_twelve_char_prefix() {
        assert_eq!(display_run_id(Uuid::nil()), "000000000000");

        let run_id = Uuid::new_v4();
        let shown = display_run_id(run_id);
        assert_eq!(shown.len(), 12);
        assert!(run_id.simple().to_string().starts_with(shown.as_str()));
    }

    #[test]
    fn cancel_provenance_summary_defaults_to_unknown() {
        assert_eq!(
            format_cancel_provenance_summary(None),
            "signal=unknown sender=unknown receiver=unknown actor=unknown stage=unknown"
        );
    }

    #[test]
    fn force_pivot_guidance_requires_deteriorating_trend() {
        assert!(force_pivot_guidance(None).is_none());

        let guidance = force_pivot_guidance(Some(&DeteriorationTrend::Deteriorating))
            .expect("a deteriorating trend must produce guidance");
        assert!(guidance.contains("Force-pivot guidance"));
    }
}