Skip to main content

atomcode_tuix/render/
worker.rs

// crates/atomcode-tuix/src/render/worker.rs
//
// Render worker — moves terminal I/O off the main event loop.
//
// ## Why
//
// Mac Terminal.app takes 30-60ms to process a full footer ANSI payload.
// When the event loop calls `renderer.render()` directly, that 30-60ms
// blocks the select! loop, which means:
//   - the spinner tick task can't deliver (drops),
//   - the next keystroke can't be read,
//   - agent events queue up behind the render.
//
// `InputThrottle` (see throttle.rs) mitigates the storm by coalescing
// InputPrompt/StreamingBox paints. This worker eliminates the blocking
// at the architectural level: the event loop sends `UiLine`s and
// lifecycle commands into a channel, and a dedicated OS thread owns the
// inner renderer and drains the channel. Slow terminal ≠ stalled event
// loop.
//
// ## Sync vs. async lifecycle
//
// Most render calls are fire-and-forget: `render(UiLine)` just enqueues.
// Lifecycle methods that must complete before the caller proceeds —
// `reset`, `clear_screen`, `suspend_for_external`, `resume_from_external`,
// `shutdown` — send a command carrying a one-shot ACK channel and block
// until the worker reports done (or a 10s timeout elapses). The `/login`
// OAuth flow for example can't tolerate "renderer hasn't flipped raw
// mode yet" when the child process opens the browser.
//
// `copy_selection` also blocks: it needs the worker's `bool` answer, so
// it waits on a reply channel for up to 5s and reports `false` if no
// answer arrives.
//
// `flush` and `flush_deferred` are fire-and-forget (no ACK) — order is
// preserved because all commands travel the same channel.
//
// ## Shutdown
//
// `Drop` sends `Shutdown` and joins the thread, guaranteeing the final
// terminal-reset bytes land before `run()` returns. Dropping the sender
// alone would also let the worker exit on the next recv error, but an
// explicit Shutdown gives clean "process the last queued line + flush"
// semantics rather than "drop whatever is still in flight".
41
42use std::sync::mpsc;
43use std::thread;
44use std::time::Duration;
45
46use super::{Renderer, UiLine};
47
48/// Commands sent to the render worker thread.
49enum RenderCmd {
50    Line(UiLine),
51    Flush,
52    FlushDeferred,
53    /// Terminal resize — fire-and-forget, the worker updates its
54    /// internal DECSTBM region and repaints the footer.
55    Resize(u16, u16),
56    /// Remove the tail ApprovalPrompt body row (fire-and-forget).
57    PopApprovalPrompt,
58    /// Scroll the body viewport by `delta` rows. Negative = up,
59    /// positive = down. AltScreenRenderer mutates viewport_top;
60    /// other renderers default to no-op.
61    ScrollBody(i32),
62    /// Jump body viewport to absolute top / bottom of scrollback.
63    ScrollBodyToTop,
64    ScrollBodyToBottom,
65    /// Mouse-drag selection lifecycle. Forwarded to the inner renderer;
66    /// only AltScreenRenderer acts on these. `(col, row)` are 0-indexed
67    /// terminal cells.
68    BeginSelection(u16, u16),
69    UpdateSelection(u16, u16),
70    EndSelection,
71    /// Copy the current selection to the system clipboard (arboard).
72    /// Returns `true` via the ACK channel if a non-empty selection was
73    /// copied. Used by Ctrl+C to copy selected text on Windows where
74    /// OSC 52 is not supported.
75    CopySelection(mpsc::Sender<bool>),
76    /// Lifecycle operation requiring an ACK — the worker performs the
77    /// op then sends `()` back so the caller can proceed.
78    Ack {
79        op: AckOp,
80        ack: mpsc::Sender<()>,
81    },
82}
83
/// Lifecycle operations carried by `RenderCmd::Ack`. The worker runs the
/// matching method on the inner renderer and then acknowledges.
#[derive(Clone, Copy, Debug)]
enum AckOp {
    /// Run the inner renderer's `reset()`.
    Reset,
    /// Run the inner renderer's `clear_screen()`.
    ClearScreen,
    /// Run the inner renderer's `suspend_for_external()`.
    SuspendForExternal,
    /// Run the inner renderer's `resume_from_external()`.
    ResumeFromExternal,
    /// Run the inner renderer's `shutdown()`; the worker thread exits
    /// right after acknowledging.
    Shutdown,
}
92
93/// Renderer facade that forwards every call to a background OS thread.
94/// Implements the `Renderer` trait so the event loop can use it as a
95/// drop-in replacement for `AnsiRenderer` / `PlainRenderer` — the wire
96/// protocol is the same `UiLine` enum.
97pub struct TaskRenderer {
98    cmd_tx: mpsc::Sender<RenderCmd>,
99    /// Join handle for the worker thread; `Some` until `Drop` takes it
100    /// to `join()`.
101    worker: Option<thread::JoinHandle<()>>,
102}
103
104impl TaskRenderer {
105    /// Spawn the worker thread, handing it ownership of the inner
106    /// renderer. After this returns the caller interacts with the inner
107    /// renderer only via the returned facade.
108    pub fn new(inner: Box<dyn Renderer>) -> Self {
109        let (cmd_tx, cmd_rx) = mpsc::channel::<RenderCmd>();
110        let worker = thread::Builder::new()
111            .name("tuix-render".to_string())
112            .spawn(move || run_worker(inner, cmd_rx))
113            .expect("spawn render worker thread");
114        Self {
115            cmd_tx,
116            worker: Some(worker),
117        }
118    }
119
120    /// Send an ACK op and block until the worker reports done. 10s
121    /// bound keeps us from hanging forever if the worker ever wedges,
122    /// while giving slow CI machines / thermal-throttled laptops /
123    /// debug builds enough headroom that routine lifecycle ops don't
124    /// spuriously timeout.
125    ///
126    /// 2s was the original budget — a worker processing `Shutdown`
127    /// normally takes < 1ms, so 2s felt like plenty. But on a loaded
128    /// CI runner mid-cargo-test, a few tests would sporadically fail
129    /// on the timeout line because the OS hadn't scheduled the worker
130    /// thread fast enough. CC-style TUI harnesses use ~10s for the
131    /// same reason.
132    fn ack(&self, op: AckOp) {
133        let (ack_tx, ack_rx) = mpsc::channel();
134        if self
135            .cmd_tx
136            .send(RenderCmd::Ack { op, ack: ack_tx })
137            .is_err()
138        {
139            // Worker is gone (already shut down) — nothing to do.
140            return;
141        }
142        let _ = ack_rx.recv_timeout(Duration::from_secs(10));
143    }
144}
145
146impl Renderer for TaskRenderer {
147    fn render(&mut self, line: UiLine) {
148        let _ = self.cmd_tx.send(RenderCmd::Line(line));
149    }
150
151    fn flush(&mut self) {
152        let _ = self.cmd_tx.send(RenderCmd::Flush);
153    }
154
155    fn shutdown(&mut self) {
156        self.ack(AckOp::Shutdown);
157    }
158
159    fn reset(&mut self) {
160        self.ack(AckOp::Reset);
161    }
162
163    fn clear_screen(&mut self) {
164        self.ack(AckOp::ClearScreen);
165    }
166
167    fn suspend_for_external(&mut self) {
168        self.ack(AckOp::SuspendForExternal);
169    }
170
171    fn resume_from_external(&mut self) {
172        self.ack(AckOp::ResumeFromExternal);
173    }
174
175    fn flush_deferred(&mut self) {
176        let _ = self.cmd_tx.send(RenderCmd::FlushDeferred);
177    }
178
179    fn on_resize(&mut self, cols: u16, rows: u16) {
180        let _ = self.cmd_tx.send(RenderCmd::Resize(cols, rows));
181    }
182
183    fn pop_approval_prompt(&mut self) {
184        let _ = self.cmd_tx.send(RenderCmd::PopApprovalPrompt);
185    }
186
187    fn scroll_body(&mut self, delta: i32) {
188        let _ = self.cmd_tx.send(RenderCmd::ScrollBody(delta));
189    }
190
191    fn scroll_body_to_top(&mut self) {
192        let _ = self.cmd_tx.send(RenderCmd::ScrollBodyToTop);
193    }
194
195    fn scroll_body_to_bottom(&mut self) {
196        let _ = self.cmd_tx.send(RenderCmd::ScrollBodyToBottom);
197    }
198
199    fn begin_selection(&mut self, col: u16, row: u16) {
200        let _ = self.cmd_tx.send(RenderCmd::BeginSelection(col, row));
201    }
202
203    fn update_selection(&mut self, col: u16, row: u16) {
204        let _ = self.cmd_tx.send(RenderCmd::UpdateSelection(col, row));
205    }
206
207    fn end_selection(&mut self) {
208        let _ = self.cmd_tx.send(RenderCmd::EndSelection);
209    }
210
211    fn copy_selection(&mut self) -> bool {
212        let (ack_tx, ack_rx) = mpsc::channel();
213        if self.cmd_tx.send(RenderCmd::CopySelection(ack_tx)).is_err() {
214            return false;
215        }
216        ack_rx.recv_timeout(Duration::from_secs(5)).unwrap_or(false)
217    }
218}
219
220impl Drop for TaskRenderer {
221    fn drop(&mut self) {
222        // Idempotent shutdown — `Renderer::shutdown` may have already
223        // run, in which case the worker is already gone and this call
224        // is a no-op (ack() swallows the send error).
225        self.ack(AckOp::Shutdown);
226        if let Some(handle) = self.worker.take() {
227            let _ = handle.join();
228        }
229    }
230}
231
232fn run_worker(mut inner: Box<dyn Renderer>, cmd_rx: mpsc::Receiver<RenderCmd>) {
233    use std::time::Instant;
234    while let Ok(cmd) = cmd_rx.recv() {
235        // Measure the wall-clock time each terminal I/O takes so the log
236        // shows where Mac Terminal.app / iTerm2 / etc. actually spend time.
237        // Big `flush` durations = kernel pipe backpressure from a slow
238        // terminal emulator; big `render` durations = our own bytes taking
239        // forever to serialize or intermediate `write_all` blocking.
240        match cmd {
241            RenderCmd::Line(line) => {
242                let tag = ui_line_tag(&line);
243                let t0 = Instant::now();
244                inner.render(line);
245                crate::tuix_trace!("REN", "Line {} render={}µs", tag, t0.elapsed().as_micros());
246            }
247            RenderCmd::Flush => {
248                let t0 = Instant::now();
249                inner.flush();
250                crate::tuix_trace!("REN", "Flush flush={}µs", t0.elapsed().as_micros());
251            }
252            RenderCmd::FlushDeferred => {
253                // Skip logging when it's a true no-op (no pending payload
254                // and window not elapsed). throttle.rs already logs when
255                // this path actually paints.
256                let t0 = Instant::now();
257                inner.flush_deferred();
258                let d = t0.elapsed();
259                if d.as_micros() > 100 {
260                    crate::tuix_trace!("REN", "FlushDeferred deferred={}µs", d.as_micros());
261                }
262            }
263            RenderCmd::Resize(cols, rows) => {
264                let t0 = Instant::now();
265                inner.on_resize(cols, rows);
266                crate::tuix_trace!(
267                    "REN",
268                    "Resize {}x{} dur={}µs",
269                    cols,
270                    rows,
271                    t0.elapsed().as_micros()
272                );
273            }
274            RenderCmd::PopApprovalPrompt => {
275                inner.pop_approval_prompt();
276            }
277            RenderCmd::ScrollBody(delta) => {
278                inner.scroll_body(delta);
279            }
280            RenderCmd::ScrollBodyToTop => {
281                inner.scroll_body_to_top();
282            }
283            RenderCmd::ScrollBodyToBottom => {
284                inner.scroll_body_to_bottom();
285            }
286            RenderCmd::BeginSelection(col, row) => {
287                inner.begin_selection(col, row);
288            }
289            RenderCmd::UpdateSelection(col, row) => {
290                inner.update_selection(col, row);
291            }
292            RenderCmd::EndSelection => {
293                inner.end_selection();
294            }
295            RenderCmd::CopySelection(ack) => {
296                let result = inner.copy_selection();
297                let _ = ack.send(result);
298            }
299            RenderCmd::Ack { op, ack } => {
300                let t0 = Instant::now();
301                match op {
302                    AckOp::Reset => inner.reset(),
303                    AckOp::ClearScreen => inner.clear_screen(),
304                    AckOp::SuspendForExternal => inner.suspend_for_external(),
305                    AckOp::ResumeFromExternal => inner.resume_from_external(),
306                    AckOp::Shutdown => {
307                        inner.shutdown();
308                        crate::tuix_trace!(
309                            "REN",
310                            "Ack Shutdown dur={}µs",
311                            t0.elapsed().as_micros()
312                        );
313                        let _ = ack.send(());
314                        // Exit the loop — drop `inner` + `cmd_rx`.
315                        // Any queued commands after this point are
316                        // discarded (the sender's next send errors,
317                        // which callers treat as "worker gone").
318                        return;
319                    }
320                }
321                crate::tuix_trace!("REN", "Ack {:?} dur={}µs", op, t0.elapsed().as_micros());
322                let _ = ack.send(());
323            }
324        }
325    }
326    // Sender dropped without explicit Shutdown — still run shutdown so
327    // the terminal isn't left in raw mode on abrupt exit paths.
328    inner.shutdown();
329}
330
331/// Short tag for logging which UiLine variant the worker is processing.
332/// Keeps trace lines column-aligned so `grep Line` output is readable.
333fn ui_line_tag(l: &UiLine) -> &'static str {
334    match l {
335        UiLine::Welcome { .. } => "Welcome",
336        UiLine::User(_) => "User",
337            UiLine::AssistantText(_) => "AssistantText",
338            UiLine::ReasoningText(_) => "ReasoningText",
339            UiLine::AssistantLineBreak => "AssistantLineBreak",
340        UiLine::ToolCall { .. } => "ToolCall",
341            UiLine::ToolCallInFlight { .. } => "ToolCallInFlight",
342            UiLine::ToolCallCommit { .. } => "ToolCallCommit",
343            UiLine::ToolGroupRender { .. } => "ToolGroupRender",
344            UiLine::ToolGroupChildUpdate { .. } => "ToolGroupChildUpdate",
345            UiLine::ToolGroupSummary { .. } => "ToolGroupSummary",
346        UiLine::ToolResult { .. } => "ToolResult",
347        UiLine::DiffLine { .. } => "DiffLine",
348        UiLine::DiffBlock(_) => "DiffBlock",
349        UiLine::ApprovalPrompt { .. } => "ApprovalPrompt",
350        UiLine::Error(_) => "Error",
351        UiLine::Warning(_) => "Warning",
352        UiLine::TurnCancelled => "TurnCancelled",
353        UiLine::TurnComplete => "TurnComplete",
354        UiLine::Spinner { .. } => "Spinner",
355        UiLine::StreamingBox { .. } => "StreamingBox",
356        UiLine::ClearTransient => "ClearTransient",
357        UiLine::InputPrompt { .. } => "InputPrompt",
358        UiLine::InputCommit => "InputCommit",
359        UiLine::CommandOutput(_) => "CommandOutput",
360        UiLine::ImageAttachment(_) => "ImageAttachment",
361        UiLine::VisionPreprocessSuccess { .. } => "VisionPreprocessSuccess",
362        UiLine::TurnSeparator { .. } => "TurnSeparator",
363    }
364}
365
#[cfg(test)]
mod tests {
    use super::*;
    use crate::render::Renderer;
    use std::sync::{Arc, Mutex};

    /// Per-method call tallies, so tests can assert that the worker
    /// forwarded each call to the inner renderer.
    #[derive(Default)]
    struct Counts {
        renders: usize,
        flushes: usize,
        shutdowns: usize,
        resets: usize,
        clear_screens: usize,
        suspends: usize,
        resumes: usize,
        deferred: usize,
    }

    /// Tallies shared between the test body and the worker thread.
    type SharedCounts = Arc<Mutex<Counts>>;

    /// Inner renderer that only counts the calls it receives.
    struct TestRenderer {
        counts: SharedCounts,
    }

    impl TestRenderer {
        /// Lock the tallies and apply `bump` to them.
        fn tally(&self, bump: impl FnOnce(&mut Counts)) {
            let mut guard = self.counts.lock().unwrap();
            bump(&mut *guard);
        }
    }

    impl Renderer for TestRenderer {
        fn render(&mut self, _line: UiLine) {
            self.tally(|c| c.renders += 1);
        }
        fn flush(&mut self) {
            self.tally(|c| c.flushes += 1);
        }
        fn shutdown(&mut self) {
            self.tally(|c| c.shutdowns += 1);
        }
        fn reset(&mut self) {
            self.tally(|c| c.resets += 1);
        }
        fn clear_screen(&mut self) {
            self.tally(|c| c.clear_screens += 1);
        }
        fn suspend_for_external(&mut self) {
            self.tally(|c| c.suspends += 1);
        }
        fn resume_from_external(&mut self) {
            self.tally(|c| c.resumes += 1);
        }
        fn flush_deferred(&mut self) {
            self.tally(|c| c.deferred += 1);
        }
    }

    /// Build a `TaskRenderer` around a fresh `TestRenderer` and hand
    /// back the shared tallies alongside it.
    fn setup() -> (TaskRenderer, SharedCounts) {
        let counts = SharedCounts::default();
        let inner = TestRenderer {
            counts: Arc::clone(&counts),
        };
        (TaskRenderer::new(Box::new(inner)), counts)
    }

    #[test]
    fn render_and_flush_forward_to_inner() {
        let (mut r, counts) = setup();
        r.render(UiLine::User("hi".into()));
        r.render(UiLine::User("there".into()));
        r.flush();
        // `reset` is an ACK op, so it acts as a fence: once it returns,
        // the worker has already handled the two renders and the flush
        // queued ahead of it.
        r.reset();
        let (renders, flushes, resets) = {
            let c = counts.lock().unwrap();
            (c.renders, c.flushes, c.resets)
        };
        assert_eq!(renders, 2);
        assert_eq!(flushes, 1);
        assert_eq!(resets, 1);
    }

    #[test]
    fn lifecycle_ack_blocks_until_worker_done() {
        let (mut r, counts) = setup();
        // Each lifecycle call must be finished on the worker by the
        // time it returns, so its tally is visible immediately.
        r.clear_screen();
        assert_eq!(counts.lock().unwrap().clear_screens, 1);
        r.suspend_for_external();
        assert_eq!(counts.lock().unwrap().suspends, 1);
        r.resume_from_external();
        assert_eq!(counts.lock().unwrap().resumes, 1);
    }

    #[test]
    fn shutdown_drops_worker_and_later_sends_are_noops() {
        let (mut r, counts) = setup();
        r.render(UiLine::User("before".into()));
        r.shutdown();
        assert_eq!(counts.lock().unwrap().shutdowns, 1);
        // Nobody is draining the channel any more; these calls must
        // still return without panicking.
        r.render(UiLine::User("after".into()));
        r.flush();
        // Shutting down twice is harmless.
        r.shutdown();
    }

    #[test]
    fn drop_triggers_shutdown_when_not_called_explicitly() {
        let (mut r, counts) = setup();
        r.render(UiLine::User("one".into()));
        // No explicit shutdown: Drop has to stop the worker and join it.
        drop(r);
        // Drop has returned, so the worker is finished and both the
        // render and exactly one shutdown have been tallied.
        let c = counts.lock().unwrap();
        assert_eq!(c.renders, 1);
        assert_eq!(c.shutdowns, 1);
    }

    #[test]
    fn flush_deferred_fire_and_forget() {
        let (mut r, counts) = setup();
        r.flush_deferred();
        // `flush_deferred` has no ACK of its own, so fence with `reset`
        // to observe the call deterministically.
        r.reset();
        assert_eq!(counts.lock().unwrap().deferred, 1);
    }
}