Skip to main content

bubbletea/
program.rs

1//! Program lifecycle and event loop.
2//!
3//! The Program struct manages the entire TUI application lifecycle,
4//! including terminal setup, event handling, and rendering.
5
6use std::io::{self, Read, Write};
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::mpsc::{self, Receiver, RecvTimeoutError, Sender};
9use std::sync::{Arc, Mutex};
10use std::thread;
11use std::time::Duration;
12
13#[cfg(feature = "async")]
14use crate::command::CommandKind;
15
16#[cfg(feature = "async")]
17use tokio_util::sync::CancellationToken;
18
19#[cfg(feature = "async")]
20use tokio_util::task::TaskTracker;
21
22use tracing::debug;
23
/// Run a batch command closure off the calling thread.
///
/// This is the `thread-pool` build of the helper: the closure is handed to
/// rayon's work-stealing pool, which is bounded to `num_cpus` threads by
/// default. The pool size can be tuned with [`rayon::ThreadPoolBuilder`] or
/// the `RAYON_NUM_THREADS` environment variable.
///
/// Builds without the feature use a variant that spawns one OS thread per
/// command instead.
#[cfg(feature = "thread-pool")]
fn spawn_batch(f: impl FnOnce() + Send + 'static) {
    // Fire-and-forget: the pool owns the closure from here on.
    rayon::spawn(f)
}
36
#[cfg(not(feature = "thread-pool"))]
fn spawn_batch(f: impl FnOnce() + Send + 'static) {
    // Fallback without a pool: one OS thread per command. Dropping the join
    // handle detaches the thread so it runs to completion on its own.
    drop(thread::spawn(f));
}
41
42use crossterm::{
43    cursor::{Hide, MoveTo, Show},
44    event::{self, DisableMouseCapture, EnableMouseCapture, Event, KeyEventKind},
45    execute,
46    terminal::{
47        self, Clear, ClearType, EnterAlternateScreen, LeaveAlternateScreen, disable_raw_mode,
48        enable_raw_mode,
49    },
50};
51
52use crate::command::Cmd;
53use crate::key::{from_crossterm_key, is_sequence_prefix};
54use crate::message::{
55    BatchMsg, BlurMsg, FocusMsg, InterruptMsg, Message, PrintLineMsg, QuitMsg,
56    RequestWindowSizeMsg, SequenceMsg, SetWindowTitleMsg, WindowSizeMsg,
57};
58use crate::mouse::from_crossterm_mouse;
59use crate::screen::{ReleaseTerminalMsg, RestoreTerminalMsg};
60use crate::{KeyMsg, KeyType};
61
62/// Errors that can occur when running a bubbletea program.
63///
64/// This enum represents all possible error conditions when running
65/// a TUI application with bubbletea.
66///
67/// # Error Handling
68///
69/// Most errors from bubbletea are recoverable. The recommended pattern
70/// is to use the `?` operator for propagation:
71///
72/// ```rust,ignore
73/// use bubbletea::{Program, Result};
74///
75/// fn run_app() -> Result<()> {
76///     let program = Program::new(MyModel::default());
77///     program.run()?;
78///     Ok(())
79/// }
80/// ```
81///
82/// # Recovery Strategies
83///
84/// | Error Variant | Recovery Strategy |
85/// |--------------|-------------------|
86/// | [`Io`](Error::Io) | Check terminal availability, retry, or report to user |
87/// | [`RawModeFailure`](Error::RawModeFailure) | Check terminal compatibility |
88/// | [`AltScreenFailure`](Error::AltScreenFailure) | Disable alt screen option |
89/// | [`EventPoll`](Error::EventPoll) | Terminal may be disconnected |
90/// | [`Render`](Error::Render) | Check output stream, retry |
91///
92/// # Example: Graceful Error Handling
93///
94/// ```rust,ignore
95/// use bubbletea::{Program, Error};
96///
97/// match Program::new(my_model).run() {
98///     Ok(final_model) => {
99///         println!("Program completed successfully");
100///     }
101///     Err(Error::Io(e)) if e.kind() == std::io::ErrorKind::NotConnected => {
102///         eprintln!("Terminal disconnected, saving state...");
103///         // Save any important state before exiting
104///     }
105///     Err(Error::RawModeFailure { .. }) => {
106///         eprintln!("Terminal doesn't support raw mode. Try a different terminal.");
107///     }
108///     Err(e) => {
109///         eprintln!("Program error: {}", e);
110///         std::process::exit(1);
111///     }
112/// }
113/// ```
114#[derive(thiserror::Error, Debug)]
115pub enum Error {
116    /// I/O error during terminal operations.
117    ///
118    /// This typically occurs when:
119    /// - The terminal is not available (e.g., running in a pipe)
120    /// - The terminal was closed unexpectedly
121    /// - System I/O resources are exhausted
122    /// - Terminal control sequences failed
123    ///
124    /// # Recovery
125    ///
126    /// Check if stdin/stdout are TTYs before starting your program.
127    /// Consider using a fallback mode for non-interactive environments.
128    ///
129    /// # Underlying Error
130    ///
131    /// The underlying [`std::io::Error`] can be accessed to determine
132    /// the specific cause. Common error kinds include:
133    /// - `NotConnected`: Terminal was disconnected
134    /// - `BrokenPipe`: Output stream closed
135    /// - `Other`: Terminal control sequence errors
136    #[error("terminal io error: {0}")]
137    Io(#[from] io::Error),
138
139    /// Failed to enable or disable raw mode.
140    ///
141    /// Raw mode is required for TUI operation as it disables terminal
142    /// line buffering and echo. This error typically indicates the
143    /// terminal doesn't support raw mode or isn't a TTY.
144    ///
145    /// # Recovery
146    ///
147    /// Verify the program is running in an interactive terminal.
148    /// Some terminals (especially on Windows) may have limited support.
149    #[error("failed to {action} raw mode: {source}")]
150    RawModeFailure {
151        /// Whether we were trying to enable or disable raw mode.
152        action: &'static str,
153        /// The underlying I/O error.
154        #[source]
155        source: io::Error,
156    },
157
158    /// Failed to enter or exit alternate screen.
159    ///
160    /// Alternate screen provides a separate buffer that preserves
161    /// the user's terminal content. This error may indicate the
162    /// terminal doesn't support alternate screen mode.
163    ///
164    /// # Recovery
165    ///
166    /// Try running without `.with_alt_screen()`.
167    #[error("failed to {action} alternate screen: {source}")]
168    AltScreenFailure {
169        /// Whether we were trying to enter or exit alt screen.
170        action: &'static str,
171        /// The underlying I/O error.
172        #[source]
173        source: io::Error,
174    },
175
176    /// Failed to poll for terminal events.
177    ///
178    /// This error occurs when the event polling system fails,
179    /// typically because the terminal was disconnected or closed.
180    ///
181    /// # Recovery
182    ///
183    /// The terminal connection may be lost. Save state and exit.
184    #[error("failed to poll terminal events: {0}")]
185    EventPoll(io::Error),
186
187    /// Failed to render the view to the terminal.
188    ///
189    /// This error occurs when writing the view output fails,
190    /// typically due to a broken pipe or disconnected terminal.
191    ///
192    /// # Recovery
193    ///
194    /// The output stream may be closed. Save state and exit.
195    #[error("failed to render view: {0}")]
196    Render(io::Error),
197}
198
199/// A specialized [`Result`] type for bubbletea operations.
200///
201/// This type alias is used throughout the bubbletea crate for convenience.
202/// It defaults to [`Error`] as the error type.
203///
204/// # Example
205///
206/// ```rust,ignore
207/// use bubbletea::Result;
208///
209/// fn run_program() -> Result<()> {
210///     // ... implementation
211///     Ok(())
212/// }
213/// ```
214///
215/// # Converting to Other Error Types
216///
217/// When integrating with other crates like `anyhow`:
218///
219/// ```rust,ignore
220/// use anyhow::Context;
221///
222/// fn main() -> anyhow::Result<()> {
223///     let model = bubbletea::Program::new(my_model)
224///         .run()
225///         .context("failed to run TUI program")?;
226///     Ok(())
227/// }
228/// ```
229pub type Result<T> = std::result::Result<T, Error>;
230
231/// The Model trait for TUI applications.
232///
233/// Implement this trait to define your application's behavior.
234///
235/// # Example
236///
237/// ```rust
238/// use bubbletea::{Model, Message, Cmd};
239///
240/// struct Counter { count: i32 }
241///
242/// impl Model for Counter {
243///     fn init(&self) -> Option<Cmd> { None }
244///
245///     fn update(&mut self, msg: Message) -> Option<Cmd> {
246///         if msg.is::<i32>() {
247///             self.count += msg.downcast::<i32>().unwrap();
248///         }
249///         None
250///     }
251///
252///     fn view(&self) -> String {
253///         format!("Count: {}", self.count)
254///     }
255/// }
256/// ```
257pub trait Model: Send + 'static {
258    /// Initialize the model and return an optional startup command.
259    ///
260    /// This is called once when the program starts.
261    fn init(&self) -> Option<Cmd>;
262
263    /// Process a message and return a new command.
264    ///
265    /// This is the pure update function at the heart of the Elm Architecture.
266    fn update(&mut self, msg: Message) -> Option<Cmd>;
267
268    /// Render the model as a string for display.
269    ///
270    /// This should be a pure function with no side effects.
271    fn view(&self) -> String;
272}
273
/// Configuration switches consumed by [`Program`] when it runs.
#[derive(Debug, Clone)]
pub struct ProgramOptions {
    /// Render into the terminal's alternate screen buffer.
    pub alt_screen: bool,
    /// Turn on mouse capture (cell motion tracking).
    pub mouse_cell_motion: bool,
    /// Turn on mouse capture (all motion tracking).
    pub mouse_all_motion: bool,
    /// Turn on bracketed paste mode.
    pub bracketed_paste: bool,
    /// Turn on terminal focus-change reporting.
    pub report_focus: bool,
    /// Custom I/O mode: raw mode setup and terminal event polling are skipped.
    pub custom_io: bool,
    /// Target rendering rate in frames per second.
    pub fps: u32,
    /// Disable signal handling.
    pub without_signals: bool,
    /// Do not install the terminal-restoring panic hook.
    pub without_catch_panics: bool,
}

impl Default for ProgramOptions {
    /// Defaults: bracketed paste on, 60 FPS, every other switch off.
    fn default() -> Self {
        Self {
            // Enabled out of the box.
            bracketed_paste: true,
            fps: 60,
            // Everything else is opt-in.
            alt_screen: false,
            mouse_cell_motion: false,
            mouse_all_motion: false,
            report_focus: false,
            custom_io: false,
            without_signals: false,
            without_catch_panics: false,
        }
    }
}
312
313/// Handle to a running program.
314///
315/// Returned by [`Program::start()`] to allow external interaction with the
316/// running TUI program. This is particularly useful for SSH applications
317/// where events need to be injected from outside the program.
318///
319/// # Example
320///
321/// ```rust,ignore
322/// use bubbletea::{Program, Message};
323///
324/// let handle = Program::new(MyModel::default())
325///     .with_custom_io()
326///     .start();
327///
328/// // Send a message to the running program
329/// handle.send(MyMessage::DoSomething);
330///
331/// // Wait for the program to finish
332/// let final_model = handle.wait()?;
333/// ```
334pub struct ProgramHandle<M: Model> {
335    tx: Sender<Message>,
336    handle: Option<thread::JoinHandle<Result<M>>>,
337}
338
339impl<M: Model> ProgramHandle<M> {
340    /// Send a message to the running program.
341    ///
342    /// This queues the message for processing in the program's event loop.
343    /// Returns `true` if the message was sent successfully, `false` if the
344    /// program has already exited.
345    pub fn send<T: Into<Message>>(&self, msg: T) -> bool {
346        self.tx.send(msg.into()).is_ok()
347    }
348
349    /// Request the program to quit.
350    ///
351    /// This sends a `QuitMsg` to the program's event loop.
352    pub fn quit(&self) {
353        let _ = self.tx.send(Message::new(QuitMsg));
354    }
355
356    /// Wait for the program to finish and return the final model state.
357    ///
358    /// This blocks until the program exits.
359    pub fn wait(mut self) -> Result<M> {
360        if let Some(handle) = self.handle.take() {
361            handle
362                .join()
363                .map_err(|_| Error::Io(io::Error::other("program thread panicked")))?
364        } else {
365            Err(Error::Io(io::Error::other("program already joined")))
366        }
367    }
368
369    /// Check if the program is still running.
370    pub fn is_running(&self) -> bool {
371        self.handle.as_ref().is_some_and(|h| !h.is_finished())
372    }
373}
374
375/// The main program runner.
376///
377/// Program manages the entire lifecycle of a TUI application:
378/// - Terminal setup and teardown
379/// - Event polling and message dispatching
380/// - Frame-rate limited rendering
381///
382/// # Example
383///
384/// ```rust,ignore
385/// use bubbletea::Program;
386///
387/// let model = MyModel::new();
388/// let final_model = Program::new(model)
389///     .with_alt_screen()
390///     .run()?;
391/// ```
392pub struct Program<M: Model> {
393    model: M,
394    options: ProgramOptions,
395    external_rx: Option<Receiver<Message>>,
396    input: Option<Box<dyn Read + Send>>,
397    output: Option<Box<dyn Write + Send>>,
398}
399
400impl<M: Model> Program<M> {
401    /// Create a new program with the given model.
402    pub fn new(model: M) -> Self {
403        Self {
404            model,
405            options: ProgramOptions::default(),
406            external_rx: None,
407            input: None,
408            output: None,
409        }
410    }
411
412    /// Provide an external message receiver.
413    ///
414    /// Messages received on this channel will be forwarded to the program's event loop.
415    /// This is useful for injecting events from external sources (e.g. SSH).
416    pub fn with_input_receiver(mut self, rx: Receiver<Message>) -> Self {
417        self.external_rx = Some(rx);
418        self
419    }
420
421    /// Provide a custom input reader.
422    ///
423    /// This enables custom I/O mode and reads raw bytes from the given reader,
424    /// translating them into Bubbletea messages.
425    pub fn with_input<R: Read + Send + 'static>(mut self, input: R) -> Self {
426        self.input = Some(Box::new(input));
427        self.options.custom_io = true;
428        self
429    }
430
431    /// Provide a custom output writer.
432    ///
433    /// This enables custom I/O mode and writes render output to the given writer.
434    pub fn with_output<W: Write + Send + 'static>(mut self, output: W) -> Self {
435        self.output = Some(Box::new(output));
436        self.options.custom_io = true;
437        self
438    }
439
440    /// Use alternate screen buffer (full-screen mode).
441    pub fn with_alt_screen(mut self) -> Self {
442        self.options.alt_screen = true;
443        self
444    }
445
446    /// Enable mouse cell motion tracking.
447    ///
448    /// Reports mouse clicks and drags.
449    pub fn with_mouse_cell_motion(mut self) -> Self {
450        self.options.mouse_cell_motion = true;
451        self
452    }
453
454    /// Enable mouse all motion tracking.
455    ///
456    /// Reports all mouse movement, even without button presses.
457    pub fn with_mouse_all_motion(mut self) -> Self {
458        self.options.mouse_all_motion = true;
459        self
460    }
461
462    /// Set the target frames per second.
463    ///
464    /// Default is 60 FPS. Valid range is 1-120 FPS.
465    pub fn with_fps(mut self, fps: u32) -> Self {
466        self.options.fps = fps.clamp(1, 120);
467        self
468    }
469
470    /// Enable focus reporting.
471    ///
472    /// Sends FocusMsg and BlurMsg when terminal gains/loses focus.
473    pub fn with_report_focus(mut self) -> Self {
474        self.options.report_focus = true;
475        self
476    }
477
478    /// Disable bracketed paste mode.
479    pub fn without_bracketed_paste(mut self) -> Self {
480        self.options.bracketed_paste = false;
481        self
482    }
483
484    /// Disable signal handling.
485    pub fn without_signal_handler(mut self) -> Self {
486        self.options.without_signals = true;
487        self
488    }
489
490    /// Don't catch panics.
491    pub fn without_catch_panics(mut self) -> Self {
492        self.options.without_catch_panics = true;
493        self
494    }
495
496    /// Enable custom I/O mode (skip terminal setup and crossterm polling).
497    ///
498    /// This is useful when embedding bubbletea in environments that manage
499    /// terminal state externally or when events are injected manually.
500    pub fn with_custom_io(mut self) -> Self {
501        self.options.custom_io = true;
502        self
503    }
504
505    /// Run the program with a custom writer.
506    pub fn run_with_writer<W: Write + Send + 'static>(self, mut writer: W) -> Result<M> {
507        // Save options for cleanup (since self will be moved)
508        let options = self.options.clone();
509
510        // Setup terminal (skip for custom IO)
511        if !options.custom_io {
512            enable_raw_mode()?;
513        }
514
515        if options.alt_screen {
516            execute!(writer, EnterAlternateScreen)?;
517        }
518
519        execute!(writer, Hide)?;
520
521        if options.mouse_all_motion {
522            execute!(writer, EnableMouseCapture)?;
523        } else if options.mouse_cell_motion {
524            execute!(writer, EnableMouseCapture)?;
525        }
526
527        if options.report_focus {
528            execute!(writer, event::EnableFocusChange)?;
529        }
530
531        if options.bracketed_paste {
532            execute!(writer, event::EnableBracketedPaste)?;
533        }
534
535        // Install a panic hook that restores the terminal before printing the
536        // panic message. Without this, a panic in update()/view() leaves the
537        // terminal in raw mode with hidden cursor and alternate screen active.
538        let prev_hook = if !options.without_catch_panics {
539            let cleanup_opts = options.clone();
540            let prev = std::panic::take_hook();
541            std::panic::set_hook(Box::new(move |info| {
542                // Best-effort terminal restoration — ignore errors since we're
543                // already in a panic context.
544                let mut stderr = io::stderr();
545                if cleanup_opts.bracketed_paste {
546                    let _ = execute!(stderr, event::DisableBracketedPaste);
547                }
548                if cleanup_opts.report_focus {
549                    let _ = execute!(stderr, event::DisableFocusChange);
550                }
551                if cleanup_opts.mouse_all_motion || cleanup_opts.mouse_cell_motion {
552                    let _ = execute!(stderr, DisableMouseCapture);
553                }
554                let _ = execute!(stderr, Show);
555                if cleanup_opts.alt_screen {
556                    let _ = execute!(stderr, LeaveAlternateScreen);
557                }
558                if !cleanup_opts.custom_io {
559                    let _ = disable_raw_mode();
560                }
561                // Call the previous hook so the user still sees the panic message
562                prev(info);
563            }));
564            true
565        } else {
566            false
567        };
568
569        // Run the event loop
570        let result = self.event_loop(&mut writer);
571
572        // Restore the previous panic hook before cleanup so a late panic in the
573        // cleanup code itself doesn't recurse.
574        if prev_hook {
575            let _ = std::panic::take_hook();
576            // Note: we can't easily restore the *original* hook since set_hook
577            // moved it into the closure. The default hook is fine for post-run.
578        }
579
580        // Cleanup terminal
581        if options.bracketed_paste {
582            let _ = execute!(writer, event::DisableBracketedPaste);
583        }
584
585        if options.report_focus {
586            let _ = execute!(writer, event::DisableFocusChange);
587        }
588
589        if options.mouse_all_motion || options.mouse_cell_motion {
590            let _ = execute!(writer, DisableMouseCapture);
591        }
592
593        let _ = execute!(writer, Show);
594
595        if options.alt_screen {
596            let _ = execute!(writer, LeaveAlternateScreen);
597        }
598
599        if !options.custom_io {
600            let _ = disable_raw_mode();
601        }
602
603        result
604    }
605
606    /// Run the program and return the final model state.
607    pub fn run(mut self) -> Result<M> {
608        if let Some(output) = self.output.take() {
609            return self.run_with_writer(output);
610        }
611
612        let stdout = io::stdout();
613        self.run_with_writer(stdout)
614    }
615
616    /// Start the program in a background thread and return a handle for interaction.
617    ///
618    /// This is useful for SSH applications and other scenarios where you need to
619    /// inject events from external sources after the program has started.
620    ///
621    /// # Example
622    ///
623    /// ```rust,ignore
624    /// use bubbletea::{Program, Message};
625    ///
626    /// let handle = Program::new(MyModel::default())
627    ///     .with_custom_io()
628    ///     .start();
629    ///
630    /// // Inject a key event
631    /// handle.send(KeyMsg::from_char('a'));
632    ///
633    /// // Later, quit the program
634    /// handle.quit();
635    ///
636    /// // Wait for completion
637    /// let final_model = handle.wait()?;
638    /// ```
639    pub fn start(mut self) -> ProgramHandle<M> {
640        // Create channel for external message injection
641        let (tx, rx) = mpsc::channel();
642
643        // Set up external receiver (will be forwarded in event_loop)
644        self.external_rx = Some(rx);
645
646        // Take ownership of custom output if provided
647        let output = self.output.take();
648
649        // Spawn program in background thread
650        let handle = thread::spawn(move || {
651            if let Some(output) = output {
652                self.run_with_writer(output)
653            } else {
654                let stdout = io::stdout();
655                self.run_with_writer(stdout)
656            }
657        });
658
659        ProgramHandle {
660            tx,
661            handle: Some(handle),
662        }
663    }
664
665    fn event_loop<W: Write>(mut self, writer: &mut W) -> Result<M> {
666        // Create message channel
667        let (tx, rx): (Sender<Message>, Receiver<Message>) = mpsc::channel();
668
669        // Thread handles for clean shutdown (bd-3dmx, bd-3azk)
670        let mut external_forwarder_handle: Option<thread::JoinHandle<()>> = None;
671        let mut input_parser_handle: Option<thread::JoinHandle<()>> = None;
672        let external_shutdown = Arc::new(AtomicBool::new(false));
673
674        // Track command execution threads for graceful shutdown (bd-zyb8)
675        let command_threads: Arc<Mutex<Vec<thread::JoinHandle<()>>>> =
676            Arc::new(Mutex::new(Vec::new()));
677
678        // Forward external messages
679        if let Some(ext_rx) = self.external_rx.take() {
680            let tx_clone = tx.clone();
681            let shutdown_clone = Arc::clone(&external_shutdown);
682            debug!(target: "bubbletea::thread", "Spawning external forwarder thread");
683            external_forwarder_handle = Some(thread::spawn(move || {
684                const POLL_INTERVAL: Duration = Duration::from_millis(50);
685                loop {
686                    if shutdown_clone.load(Ordering::Relaxed) {
687                        break;
688                    }
689
690                    match ext_rx.recv_timeout(POLL_INTERVAL) {
691                        Ok(msg) => {
692                            if tx_clone.send(msg).is_err() {
693                                debug!(target: "bubbletea::event", "external message dropped — receiver disconnected");
694                                break;
695                            }
696                        }
697                        Err(RecvTimeoutError::Timeout) => {}
698                        Err(RecvTimeoutError::Disconnected) => break,
699                    }
700                }
701            }));
702        }
703
704        // Read custom input stream and inject messages.
705        if let Some(mut input) = self.input.take() {
706            let tx_clone = tx.clone();
707            debug!(target: "bubbletea::thread", "Spawning input parser thread");
708            input_parser_handle = Some(thread::spawn(move || {
709                let mut parser = InputParser::new();
710                let mut buf = [0u8; 256];
711                loop {
712                    match input.read(&mut buf) {
713                        Ok(0) => break,
714                        Ok(n) => {
715                            // We always assume there could be more data unless we hit EOF (Ok(0))
716                            let can_have_more_data = true;
717                            for msg in parser.push_bytes(&buf[..n], can_have_more_data) {
718                                if tx_clone.send(msg).is_err() {
719                                    debug!(target: "bubbletea::input", "input message dropped — receiver disconnected");
720                                    return;
721                                }
722                            }
723                        }
724                        Err(err) if err.kind() == io::ErrorKind::WouldBlock => {
725                            thread::yield_now();
726                        }
727                        Err(_) => break,
728                    }
729                }
730
731                for msg in parser.flush() {
732                    if tx_clone.send(msg).is_err() {
733                        debug!(target: "bubbletea::input", "flush message dropped — receiver disconnected");
734                        break;
735                    }
736                }
737            }));
738        }
739
740        // Get initial window size (only if not custom IO, otherwise trust init msg)
741        if !self.options.custom_io
742            && let Ok((width, height)) = terminal::size()
743            && tx
744                .send(Message::new(WindowSizeMsg { width, height }))
745                .is_err()
746        {
747            debug!(target: "bubbletea::event", "initial window size dropped — receiver disconnected");
748        }
749
750        // Call init and handle initial command
751        if let Some(cmd) = self.model.init() {
752            self.handle_command(cmd, tx.clone(), Arc::clone(&command_threads));
753        }
754
755        // Render initial view
756        let mut last_view = String::new();
757        self.render(writer, &mut last_view)?;
758
759        // Frame timing
760        let frame_duration = Duration::from_secs_f64(1.0 / self.options.fps as f64);
761
762        // Event loop
763        loop {
764            // Poll for events with frame-rate limiting (skip poll if custom IO)
765            // In custom IO mode, events are injected via `with_input_receiver()` or `with_input()`.
766            // Crossterm polling is skipped since input comes from external sources.
767            if !self.options.custom_io && event::poll(frame_duration)? {
768                match event::read()? {
769                    Event::Key(key_event) => {
770                        // Only handle key press events, not release
771                        if key_event.kind != KeyEventKind::Press {
772                            continue;
773                        }
774
775                        let key_msg = from_crossterm_key(key_event.code, key_event.modifiers);
776
777                        // Handle Ctrl+C specially
778                        if key_msg.key_type == crate::KeyType::CtrlC {
779                            if tx.send(Message::new(InterruptMsg)).is_err() {
780                                debug!(target: "bubbletea::event", "interrupt message dropped — receiver disconnected");
781                            }
782                        } else if tx.send(Message::new(key_msg)).is_err() {
783                            debug!(target: "bubbletea::event", "key message dropped — receiver disconnected");
784                        }
785                    }
786                    Event::Mouse(mouse_event) => {
787                        let mouse_msg = from_crossterm_mouse(mouse_event);
788                        if tx.send(Message::new(mouse_msg)).is_err() {
789                            debug!(target: "bubbletea::event", "mouse message dropped — receiver disconnected");
790                        }
791                    }
792                    Event::Resize(width, height) => {
793                        if tx
794                            .send(Message::new(WindowSizeMsg { width, height }))
795                            .is_err()
796                        {
797                            debug!(target: "bubbletea::event", "resize message dropped — receiver disconnected");
798                        }
799                    }
800                    Event::FocusGained => {
801                        if tx.send(Message::new(FocusMsg)).is_err() {
802                            debug!(target: "bubbletea::event", "focus message dropped — receiver disconnected");
803                        }
804                    }
805                    Event::FocusLost => {
806                        if tx.send(Message::new(BlurMsg)).is_err() {
807                            debug!(target: "bubbletea::event", "blur message dropped — receiver disconnected");
808                        }
809                    }
810                    Event::Paste(text) => {
811                        // Send as a key message with paste flag
812                        let key_msg = KeyMsg {
813                            key_type: crate::KeyType::Runes,
814                            runes: text.chars().collect(),
815                            alt: false,
816                            paste: true,
817                        };
818                        if tx.send(Message::new(key_msg)).is_err() {
819                            debug!(target: "bubbletea::event", "paste message dropped — receiver disconnected");
820                        }
821                    }
822                }
823            }
824
825            // Process all pending messages
826            let mut needs_render = false;
827            let mut should_quit = false;
828            while let Ok(msg) = rx.try_recv() {
829                // Check for quit message
830                if msg.is::<QuitMsg>() {
831                    should_quit = true;
832                    break;
833                }
834
835                // Check for interrupt message (Ctrl+C)
836                if msg.is::<InterruptMsg>() {
837                    should_quit = true;
838                    break;
839                }
840
841                // Handle batch message (already handled in handle_command)
842                if msg.is::<BatchMsg>() {
843                    continue;
844                }
845
846                // Handle window title
847                if let Some(title_msg) = msg.downcast_ref::<SetWindowTitleMsg>() {
848                    execute!(writer, terminal::SetTitle(&title_msg.0))?;
849                    continue;
850                }
851
852                // Handle window size request
853                if msg.is::<RequestWindowSizeMsg>() {
854                    if !self.options.custom_io
855                        && let Ok((width, height)) = terminal::size()
856                        && tx
857                            .send(Message::new(WindowSizeMsg { width, height }))
858                            .is_err()
859                    {
860                        debug!(target: "bubbletea::event", "window size response dropped — receiver disconnected");
861                    }
862                    continue;
863                }
864
865                // Handle print line message (only when not in alt screen)
866                if let Some(print_msg) = msg.downcast_ref::<PrintLineMsg>() {
867                    if !self.options.alt_screen {
868                        // Print each line above the TUI
869                        for line in print_msg.0.lines() {
870                            let _ = writeln!(writer, "{}", line);
871                        }
872                        let _ = writer.flush();
873                        // Force a full re-render since we printed above
874                        last_view.clear();
875                        needs_render = true;
876                    }
877                    continue;
878                }
879
880                // Handle release terminal
881                if msg.is::<ReleaseTerminalMsg>() {
882                    if !self.options.custom_io {
883                        // Disable features in reverse order
884                        if self.options.bracketed_paste {
885                            let _ = execute!(writer, event::DisableBracketedPaste);
886                        }
887                        if self.options.report_focus {
888                            let _ = execute!(writer, event::DisableFocusChange);
889                        }
890                        if self.options.mouse_all_motion || self.options.mouse_cell_motion {
891                            let _ = execute!(writer, DisableMouseCapture);
892                        }
893                        let _ = execute!(writer, Show);
894                        if self.options.alt_screen {
895                            let _ = execute!(writer, LeaveAlternateScreen);
896                        }
897                        let _ = disable_raw_mode();
898                    }
899                    continue;
900                }
901
902                // Handle restore terminal
903                if msg.is::<RestoreTerminalMsg>() {
904                    if !self.options.custom_io {
905                        // Re-enable features in original order
906                        let _ = enable_raw_mode();
907                        if self.options.alt_screen {
908                            let _ = execute!(writer, EnterAlternateScreen);
909                        }
910                        let _ = execute!(writer, Hide);
911                        if self.options.mouse_all_motion {
912                            let _ = execute!(writer, EnableMouseCapture);
913                        } else if self.options.mouse_cell_motion {
914                            let _ = execute!(writer, EnableMouseCapture);
915                        }
916                        if self.options.report_focus {
917                            let _ = execute!(writer, event::EnableFocusChange);
918                        }
919                        if self.options.bracketed_paste {
920                            let _ = execute!(writer, event::EnableBracketedPaste);
921                        }
922                        // Force a full re-render
923                        last_view.clear();
924                    }
925                    needs_render = true;
926                    continue;
927                }
928
929                // Update model
930                if let Some(cmd) = self.model.update(msg) {
931                    self.handle_command(cmd, tx.clone(), Arc::clone(&command_threads));
932                }
933                needs_render = true;
934            }
935
936            // Exit main loop if quit was requested
937            if should_quit {
938                break;
939            }
940
941            // Render if needed
942            if needs_render {
943                self.render(writer, &mut last_view)?;
944            }
945
946            // Sleep a bit if loop is tight (only needed if poll didn't sleep)
947            if self.options.custom_io {
948                thread::sleep(frame_duration);
949            }
950        }
951
952        // Clean shutdown: drop sender to signal threads, then join them (bd-3azk)
953        external_shutdown.store(true, Ordering::Relaxed);
954        drop(tx);
955        debug!(target: "bubbletea::thread", "Sender dropped, waiting for threads to exit");
956
957        if let Some(handle) = external_forwarder_handle {
958            match handle.join() {
959                Ok(()) => {
960                    debug!(target: "bubbletea::thread", "External forwarder thread joined successfully")
961                }
962                Err(e) => {
963                    tracing::warn!(target: "bubbletea::thread", "External forwarder thread panicked: {:?}", e)
964                }
965            }
966        }
967
968        if let Some(handle) = input_parser_handle {
969            match handle.join() {
970                Ok(()) => {
971                    debug!(target: "bubbletea::thread", "Input parser thread joined successfully")
972                }
973                Err(e) => {
974                    tracing::warn!(target: "bubbletea::thread", "Input parser thread panicked: {:?}", e)
975                }
976            }
977        }
978
979        // Join command execution threads with timeout (bd-zyb8)
980        // Give commands a reasonable time to complete, but don't block forever.
981        const COMMAND_THREAD_TIMEOUT: Duration = Duration::from_secs(5);
982        let join_deadline = std::time::Instant::now() + COMMAND_THREAD_TIMEOUT;
983
984        if let Ok(mut threads) = command_threads.lock() {
985            let thread_count = threads.len();
986            if thread_count > 0 {
987                debug!(target: "bubbletea::thread", "Waiting for {} command thread(s) to complete", thread_count);
988            }
989
990            // Drain and join all threads
991            for handle in threads.drain(..) {
992                if handle.is_finished() {
993                    // Thread already done, just join to clean up
994                    let _ = handle.join();
995                } else {
996                    // Thread still running - wait with timeout
997                    let remaining =
998                        join_deadline.saturating_duration_since(std::time::Instant::now());
999                    if remaining.is_zero() {
1000                        debug!(target: "bubbletea::thread", "Timeout waiting for command threads, abandoning remaining");
1001                        break;
1002                    }
1003
1004                    // Spin-wait with small sleeps until thread finishes or timeout
1005                    let poll_interval = Duration::from_millis(10);
1006                    let start = std::time::Instant::now();
1007                    while !handle.is_finished() && start.elapsed() < remaining {
1008                        thread::sleep(poll_interval);
1009                    }
1010
1011                    if handle.is_finished() {
1012                        match handle.join() {
1013                            Ok(()) => {
1014                                debug!(target: "bubbletea::thread", "Command thread joined successfully")
1015                            }
1016                            Err(e) => {
1017                                tracing::warn!(target: "bubbletea::thread", "Command thread panicked: {:?}", e)
1018                            }
1019                        }
1020                    } else {
1021                        debug!(target: "bubbletea::thread", "Command thread did not finish in time, abandoning");
1022                    }
1023                }
1024            }
1025        } else {
1026            tracing::warn!(target: "bubbletea::thread", "Failed to join command threads: mutex poisoned");
1027        }
1028
1029        Ok(self.model)
1030    }
1031
1032    fn handle_command(
1033        &self,
1034        cmd: Cmd,
1035        tx: Sender<Message>,
1036        command_threads: Arc<Mutex<Vec<thread::JoinHandle<()>>>>,
1037    ) {
1038        // Spawn thread and track its handle for graceful shutdown (bd-zyb8)
1039        let handle = thread::spawn(move || {
1040            if let Some(msg) = cmd.execute() {
1041                // Handle batch and sequence messages specially
1042                if msg.is::<BatchMsg>() {
1043                    if let Some(batch) = msg.downcast::<BatchMsg>() {
1044                        for cmd in batch.0 {
1045                            let tx_clone = tx.clone();
1046                            spawn_batch(move || {
1047                                if let Some(msg) = cmd.execute()
1048                                    && tx_clone.send(msg).is_err()
1049                                {
1050                                    debug!(target: "bubbletea::command", "batch command result dropped — receiver disconnected");
1051                                }
1052                            });
1053                        }
1054                    }
1055                } else if msg.is::<SequenceMsg>() {
1056                    if let Some(seq) = msg.downcast::<SequenceMsg>() {
1057                        for cmd in seq.0 {
1058                            if let Some(msg) = cmd.execute()
1059                                && tx.send(msg).is_err()
1060                            {
1061                                debug!(target: "bubbletea::command", "sequence command result dropped — receiver disconnected");
1062                                break;
1063                            }
1064                        }
1065                    }
1066                } else if tx.send(msg).is_err() {
1067                    debug!(target: "bubbletea::command", "command result dropped — receiver disconnected");
1068                }
1069            }
1070        });
1071
1072        // Track the handle for shutdown (lock is brief, just a Vec push)
1073        if let Ok(mut threads) = command_threads.lock() {
1074            // Prune finished threads to prevent unbounded growth
1075            threads.retain(|h| !h.is_finished());
1076            threads.push(handle);
1077        } else {
1078            // Mutex was poisoned (a thread panicked while holding it).
1079            // Log and continue - the handle will be orphaned but program can proceed.
1080            debug!(target: "bubbletea::thread", "Failed to track command thread: mutex poisoned");
1081        }
1082    }
1083
1084    fn render<W: Write>(&self, writer: &mut W, last_view: &mut String) -> Result<()> {
1085        let view = self.model.view();
1086
1087        // Skip if view hasn't changed
1088        if view == *last_view {
1089            return Ok(());
1090        }
1091
1092        // Clear and render
1093        execute!(writer, MoveTo(0, 0), Clear(ClearType::All))?;
1094        write!(writer, "{}", view)?;
1095        writer.flush()?;
1096
1097        *last_view = view;
1098        Ok(())
1099    }
1100}
1101
1102// =============================================================================
1103// Async Program Implementation (requires "async" feature)
1104// =============================================================================
1105
1106#[cfg(feature = "async")]
1107impl<M: Model> Program<M> {
1108    /// Run the program using the tokio async runtime.
1109    ///
1110    /// This is the async version of `run()`. It uses tokio for command execution
1111    /// and event handling, which is more efficient for I/O-bound operations.
1112    ///
1113    /// # Example
1114    ///
1115    /// ```rust,ignore
1116    /// use bubbletea::Program;
1117    ///
1118    /// #[tokio::main]
1119    /// async fn main() -> Result<(), bubbletea::Error> {
1120    ///     let model = MyModel::new();
1121    ///     let final_model = Program::new(model)
1122    ///         .with_alt_screen()
1123    ///         .run_async()
1124    ///         .await?;
1125    ///     Ok(())
1126    /// }
1127    /// ```
1128    pub async fn run_async(mut self) -> Result<M> {
1129        if let Some(output) = self.output.take() {
1130            return self.run_async_with_writer(output).await;
1131        }
1132
1133        let stdout = io::stdout();
1134        self.run_async_with_writer(stdout).await
1135    }
1136
1137    /// Run the program using the tokio async runtime with a custom writer.
1138    pub async fn run_async_with_writer<W: Write + Send + 'static>(
1139        self,
1140        mut writer: W,
1141    ) -> Result<M> {
1142        // Save options for cleanup (since self will be moved)
1143        let options = self.options.clone();
1144
1145        // Setup terminal (skip for custom I/O)
1146        if !options.custom_io {
1147            enable_raw_mode()?;
1148        }
1149
1150        if options.alt_screen {
1151            execute!(writer, EnterAlternateScreen)?;
1152        }
1153
1154        execute!(writer, Hide)?;
1155
1156        if options.mouse_all_motion {
1157            execute!(writer, EnableMouseCapture)?;
1158        } else if options.mouse_cell_motion {
1159            execute!(writer, EnableMouseCapture)?;
1160        }
1161
1162        if options.report_focus {
1163            execute!(writer, event::EnableFocusChange)?;
1164        }
1165
1166        if options.bracketed_paste {
1167            execute!(writer, event::EnableBracketedPaste)?;
1168        }
1169
1170        // Install panic hook to restore terminal on panic (mirrors sync path)
1171        let prev_hook = if !options.without_catch_panics {
1172            let cleanup_opts = options.clone();
1173            let prev = std::panic::take_hook();
1174            std::panic::set_hook(Box::new(move |info| {
1175                let mut stderr = io::stderr();
1176                if cleanup_opts.bracketed_paste {
1177                    let _ = execute!(stderr, event::DisableBracketedPaste);
1178                }
1179                if cleanup_opts.report_focus {
1180                    let _ = execute!(stderr, event::DisableFocusChange);
1181                }
1182                if cleanup_opts.mouse_all_motion || cleanup_opts.mouse_cell_motion {
1183                    let _ = execute!(stderr, DisableMouseCapture);
1184                }
1185                let _ = execute!(stderr, Show);
1186                if cleanup_opts.alt_screen {
1187                    let _ = execute!(stderr, LeaveAlternateScreen);
1188                }
1189                if !cleanup_opts.custom_io {
1190                    let _ = disable_raw_mode();
1191                }
1192                prev(info);
1193            }));
1194            true
1195        } else {
1196            false
1197        };
1198
1199        // Run the async event loop
1200        let result = self.event_loop_async(&mut writer).await;
1201
1202        // Restore panic hook
1203        if prev_hook {
1204            let _ = std::panic::take_hook();
1205        }
1206
1207        // Cleanup terminal
1208        if options.bracketed_paste {
1209            let _ = execute!(writer, event::DisableBracketedPaste);
1210        }
1211
1212        if options.report_focus {
1213            let _ = execute!(writer, event::DisableFocusChange);
1214        }
1215
1216        if options.mouse_all_motion || options.mouse_cell_motion {
1217            let _ = execute!(writer, DisableMouseCapture);
1218        }
1219
1220        let _ = execute!(writer, Show);
1221
1222        if options.alt_screen {
1223            let _ = execute!(writer, LeaveAlternateScreen);
1224        }
1225
1226        if !options.custom_io {
1227            let _ = disable_raw_mode();
1228        }
1229
1230        result
1231    }
1232
1233    async fn event_loop_async<W: Write>(mut self, stdout: &mut W) -> Result<M> {
1234        // Create async message channel
1235        let (tx, mut rx) = tokio::sync::mpsc::channel::<Message>(256);
1236
1237        // Create cancellation token and task tracker for graceful shutdown
1238        let cancel_token = CancellationToken::new();
1239        let task_tracker = TaskTracker::new();
1240
1241        // Forward external messages using tokio's blocking thread pool
1242        // This is tracked for graceful shutdown and respects cancellation
1243        if let Some(ext_rx) = self.external_rx.take() {
1244            let tx_clone = tx.clone();
1245            let cancel_clone = cancel_token.clone();
1246            task_tracker.spawn_blocking(move || {
1247                // Use recv_timeout to periodically check for cancellation
1248                let timeout = Duration::from_millis(100);
1249                loop {
1250                    if cancel_clone.is_cancelled() {
1251                        break;
1252                    }
1253                    match ext_rx.recv_timeout(timeout) {
1254                        Ok(msg) => {
1255                            if tx_clone.blocking_send(msg).is_err() {
1256                                debug!(target: "bubbletea::event", "async external message dropped — receiver disconnected");
1257                                break;
1258                            }
1259                        }
1260                        Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
1261                            // Continue loop to check cancellation
1262                        }
1263                        Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
1264                            // Channel closed, exit
1265                            break;
1266                        }
1267                    }
1268                }
1269            });
1270        }
1271
1272        // Read custom input stream and inject messages.
1273        if let Some(mut input) = self.input.take() {
1274            let tx_clone = tx.clone();
1275            let cancel_clone = cancel_token.clone();
1276            task_tracker.spawn_blocking(move || {
1277                let mut parser = InputParser::new();
1278                let mut buf = [0u8; 256];
1279                loop {
1280                    if cancel_clone.is_cancelled() {
1281                        break;
1282                    }
1283                    match input.read(&mut buf) {
1284                        Ok(0) => break,
1285                        Ok(n) => {
1286                            // We always assume there could be more data unless we hit EOF (Ok(0))
1287                            let can_have_more_data = true;
1288                            for msg in parser.push_bytes(&buf[..n], can_have_more_data) {
1289                                if tx_clone.blocking_send(msg).is_err() {
1290                                    return;
1291                                }
1292                            }
1293                        }
1294                        Err(err) if err.kind() == io::ErrorKind::WouldBlock => {
1295                            std::thread::yield_now();
1296                        }
1297                        Err(_) => break,
1298                    }
1299                }
1300
1301                for msg in parser.flush() {
1302                    if tx_clone.blocking_send(msg).is_err() {
1303                        break;
1304                    }
1305                }
1306            });
1307        }
1308
1309        // Spawn event listener thread (bd-2353: use task_tracker for graceful shutdown)
1310        let (event_tx, mut event_rx) = tokio::sync::mpsc::channel::<Event>(100);
1311        let event_cancel = cancel_token.clone();
1312
1313        if !self.options.custom_io {
1314            task_tracker.spawn_blocking(move || {
1315                loop {
1316                    if event_cancel.is_cancelled() {
1317                        break;
1318                    }
1319                    // Poll with timeout to check cancellation
1320                    match event::poll(Duration::from_millis(100)) {
1321                        Ok(true) => {
1322                            if let Ok(evt) = event::read()
1323                                && event_tx.blocking_send(evt).is_err()
1324                            {
1325                                break;
1326                            }
1327                        }
1328                        Ok(false) => {} // timeout
1329                        Err(_) => {
1330                            break;
1331                        } // error
1332                    }
1333                }
1334            });
1335        }
1336
1337        // Get initial window size
1338        if !self.options.custom_io {
1339            let (width, height) = terminal::size()?;
1340            if tx
1341                .send(Message::new(WindowSizeMsg { width, height }))
1342                .await
1343                .is_err()
1344            {
1345                debug!(target: "bubbletea::event", "async initial window size dropped — receiver disconnected");
1346            }
1347        }
1348
1349        // Call init and handle initial command
1350        if let Some(cmd) = self.model.init() {
1351            Self::handle_command_tracked(
1352                cmd.into(),
1353                tx.clone(),
1354                &task_tracker,
1355                cancel_token.clone(),
1356            );
1357        }
1358
1359        // Render initial view
1360        let mut last_view = String::new();
1361        self.render(stdout, &mut last_view)?;
1362
1363        // Frame timing
1364        let frame_duration = Duration::from_secs_f64(1.0 / self.options.fps as f64);
1365        let mut frame_interval = tokio::time::interval(frame_duration);
1366
1367        // Event loop
1368        loop {
1369            tokio::select! {
1370                // Check for terminal events via channel
1371                Some(event) = event_rx.recv(), if !self.options.custom_io => {
1372                    match event {
1373                        Event::Key(key_event) => {
1374                            // Only handle key press events, not release
1375                            if key_event.kind != KeyEventKind::Press {
1376                                continue;
1377                            }
1378
1379                            let key_msg = from_crossterm_key(key_event.code, key_event.modifiers);
1380
1381                            // Handle Ctrl+C specially
1382                            if key_msg.key_type == crate::KeyType::CtrlC {
1383                                if tx.send(Message::new(InterruptMsg)).await.is_err() {
1384                                    debug!(target: "bubbletea::event", "async interrupt message dropped — receiver disconnected");
1385                                }
1386                            } else if tx.send(Message::new(key_msg)).await.is_err() {
1387                                debug!(target: "bubbletea::event", "async key message dropped — receiver disconnected");
1388                            }
1389                        }
1390                        Event::Mouse(mouse_event) => {
1391                            let mouse_msg = from_crossterm_mouse(mouse_event);
1392                            if tx.send(Message::new(mouse_msg)).await.is_err() {
1393                                debug!(target: "bubbletea::event", "async mouse message dropped — receiver disconnected");
1394                            }
1395                        }
1396                        Event::Resize(width, height) => {
1397                            if tx.send(Message::new(WindowSizeMsg { width, height })).await.is_err() {
1398                                debug!(target: "bubbletea::event", "async resize message dropped — receiver disconnected");
1399                            }
1400                        }
1401                        Event::FocusGained => {
1402                            if tx.send(Message::new(FocusMsg)).await.is_err() {
1403                                debug!(target: "bubbletea::event", "async focus message dropped — receiver disconnected");
1404                            }
1405                        }
1406                        Event::FocusLost => {
1407                            if tx.send(Message::new(BlurMsg)).await.is_err() {
1408                                debug!(target: "bubbletea::event", "async blur message dropped — receiver disconnected");
1409                            }
1410                        }
1411                        Event::Paste(text) => {
1412                            // Send as a key message with paste flag
1413                            let key_msg = KeyMsg {
1414                                key_type: crate::KeyType::Runes,
1415                                runes: text.chars().collect(),
1416                                alt: false,
1417                                paste: true,
1418                            };
1419                            if tx.send(Message::new(key_msg)).await.is_err() {
1420                                debug!(target: "bubbletea::event", "async paste message dropped — receiver disconnected");
1421                            }
1422                        }
1423                    }
1424                }
1425
1426                // Process incoming messages
1427                Some(msg) = rx.recv() => {
1428                    // Check for quit message - initiate graceful shutdown
1429                    if msg.is::<QuitMsg>() {
1430                        Self::graceful_shutdown(&cancel_token, &task_tracker).await;
1431                        return Ok(self.model);
1432                    }
1433
1434                    // Check for interrupt message (Ctrl+C) - initiate graceful shutdown
1435                    if msg.is::<InterruptMsg>() {
1436                        Self::graceful_shutdown(&cancel_token, &task_tracker).await;
1437                        return Ok(self.model);
1438                    }
1439
1440                    // Handle batch message (already handled in handle_command_tracked)
1441                    if msg.is::<BatchMsg>() {
1442                        continue;
1443                    }
1444
1445                    // Handle window title
1446                    if let Some(title_msg) = msg.downcast_ref::<SetWindowTitleMsg>() {
1447                        execute!(stdout, terminal::SetTitle(&title_msg.0))?;
1448                        continue;
1449                    }
1450
1451                    // Handle window size request
1452                    if msg.is::<RequestWindowSizeMsg>() {
1453                        if !self.options.custom_io {
1454                            let (width, height) = terminal::size()?;
1455                            if tx.send(Message::new(WindowSizeMsg { width, height })).await.is_err() {
1456                                debug!(target: "bubbletea::event", "async window size response dropped — receiver disconnected");
1457                            }
1458                        }
1459                        continue;
1460                    }
1461
1462                    // Handle print line message (only when not in alt screen)
1463                    if let Some(print_msg) = msg.downcast_ref::<PrintLineMsg>() {
1464                        if !self.options.alt_screen {
1465                            // Print each line above the TUI
1466                            for line in print_msg.0.lines() {
1467                                let _ = writeln!(stdout, "{}", line);
1468                            }
1469                            let _ = stdout.flush();
1470                            // Force a full re-render since we printed above
1471                            last_view.clear();
1472                        }
1473                        self.render(stdout, &mut last_view)?;
1474                        continue;
1475                    }
1476
1477                    // Handle release terminal
1478                    if msg.is::<ReleaseTerminalMsg>() {
1479                        if !self.options.custom_io {
1480                            // Disable features in reverse order
1481                            if self.options.bracketed_paste {
1482                                let _ = execute!(stdout, event::DisableBracketedPaste);
1483                            }
1484                            if self.options.report_focus {
1485                                let _ = execute!(stdout, event::DisableFocusChange);
1486                            }
1487                            if self.options.mouse_all_motion || self.options.mouse_cell_motion {
1488                                let _ = execute!(stdout, DisableMouseCapture);
1489                            }
1490                            let _ = execute!(stdout, Show);
1491                            if self.options.alt_screen {
1492                                let _ = execute!(stdout, LeaveAlternateScreen);
1493                            }
1494                            let _ = disable_raw_mode();
1495                        }
1496                        continue;
1497                    }
1498
1499                    // Handle restore terminal
1500                    if msg.is::<RestoreTerminalMsg>() {
1501                        if !self.options.custom_io {
1502                            // Re-enable features in original order
1503                            let _ = enable_raw_mode();
1504                            if self.options.alt_screen {
1505                                let _ = execute!(stdout, EnterAlternateScreen);
1506                            }
1507                            let _ = execute!(stdout, Hide);
1508                            if self.options.mouse_all_motion {
1509                                let _ = execute!(stdout, EnableMouseCapture);
1510                            } else if self.options.mouse_cell_motion {
1511                                let _ = execute!(stdout, EnableMouseCapture);
1512                            }
1513                            if self.options.report_focus {
1514                                let _ = execute!(stdout, event::EnableFocusChange);
1515                            }
1516                            if self.options.bracketed_paste {
1517                                let _ = execute!(stdout, event::EnableBracketedPaste);
1518                            }
1519                            // Force a full re-render
1520                            last_view.clear();
1521                        }
1522                        self.render(stdout, &mut last_view)?;
1523                        continue;
1524                    }
1525
1526                    // Update model
1527                    if let Some(cmd) = self.model.update(msg) {
1528                        Self::handle_command_tracked(
1529                            cmd.into(),
1530                            tx.clone(),
1531                            &task_tracker,
1532                            cancel_token.clone(),
1533                        );
1534                    }
1535
1536                    // Render after processing message
1537                    self.render(stdout, &mut last_view)?;
1538                }
1539
1540                // Frame tick for rendering
1541                _ = frame_interval.tick() => {
1542                    // Periodic render check (in case we missed something)
1543                }
1544            }
1545        }
1546    }
1547
1548    /// Perform graceful shutdown: cancel all tasks and wait for them to complete.
1549    async fn graceful_shutdown(cancel_token: &CancellationToken, task_tracker: &TaskTracker) {
1550        // Signal all tasks to cancel
1551        cancel_token.cancel();
1552
1553        // Close the tracker to prevent new tasks
1554        task_tracker.close();
1555
1556        // Wait for all tasks with a timeout (5 seconds)
1557        let shutdown_timeout = Duration::from_secs(5);
1558        let _ = tokio::time::timeout(shutdown_timeout, task_tracker.wait()).await;
1559    }
1560
1561    /// Handle a command with task tracking and cancellation support.
1562    fn handle_command_tracked(
1563        cmd: CommandKind,
1564        tx: tokio::sync::mpsc::Sender<Message>,
1565        tracker: &TaskTracker,
1566        cancel_token: CancellationToken,
1567    ) {
1568        // Clone tracker and cancel token so batch sub-commands can also be
1569        // tracked and cancelled during graceful shutdown.
1570        let batch_tracker = tracker.clone();
1571        let batch_cancel = cancel_token.clone();
1572        tracker.spawn(async move {
1573            tokio::select! {
1574                // Execute the command
1575                result = cmd.execute() => {
1576                    if let Some(msg) = result {
1577                        // Handle batch and sequence messages specially
1578                        if msg.is::<BatchMsg>() {
1579                            if let Some(batch) = msg.downcast::<BatchMsg>() {
1580                                for cmd in batch.0 {
1581                                    let tx_clone = tx.clone();
1582                                    let cancel = batch_cancel.clone();
1583                                    // Batch commands are now tracked via TaskTracker
1584                                    // and respect the cancellation token for clean shutdown.
1585                                    batch_tracker.spawn(async move {
1586                                        tokio::select! {
1587                                            result = async {
1588                                                let cmd_kind: CommandKind = cmd.into();
1589                                                cmd_kind.execute().await
1590                                            } => {
1591                                                if let Some(msg) = result {
1592                                                    if tx_clone.send(msg).await.is_err() {
1593                                                        debug!(target: "bubbletea::command", "async batch command result dropped — receiver disconnected");
1594                                                    }
1595                                                }
1596                                            }
1597                                            _ = cancel.cancelled() => {
1598                                                debug!(target: "bubbletea::command", "async batch command cancelled during shutdown");
1599                                            }
1600                                        }
1601                                    });
1602                                }
1603                            }
1604                        } else if msg.is::<SequenceMsg>() {
1605                            if let Some(seq) = msg.downcast::<SequenceMsg>() {
1606                                for cmd in seq.0 {
1607                                    let cmd_kind: CommandKind = cmd.into();
1608                                    if let Some(msg) = cmd_kind.execute().await {
1609                                        if tx.send(msg).await.is_err() {
1610                                            debug!(target: "bubbletea::command", "async sequence command result dropped — receiver disconnected");
1611                                            break;
1612                                        }
1613                                    }
1614                                }
1615                            }
1616                        } else if tx.send(msg).await.is_err() {
1617                            debug!(target: "bubbletea::command", "async command result dropped — receiver disconnected");
1618                        }
1619                    }
1620                }
1621                // Cancellation requested - exit cleanly
1622                _ = cancel_token.cancelled() => {
1623                    // Command cancelled, cleanup happens automatically
1624                }
1625            }
1626        });
1627    }
1628
1629    /// Handle a command asynchronously using tokio::spawn (legacy, without tracking).
1630    #[allow(dead_code)]
1631    fn handle_command_async(&self, cmd: CommandKind, tx: tokio::sync::mpsc::Sender<Message>) {
1632        tokio::spawn(async move {
1633            if let Some(msg) = cmd.execute().await {
1634                // Handle batch and sequence messages specially
1635                if msg.is::<BatchMsg>() {
1636                    if let Some(batch) = msg.downcast::<BatchMsg>() {
1637                        for cmd in batch.0 {
1638                            let tx_clone = tx.clone();
1639                            tokio::spawn(async move {
1640                                let cmd_kind: CommandKind = cmd.into();
1641                                if let Some(msg) = cmd_kind.execute().await {
1642                                    if tx_clone.send(msg).await.is_err() {
1643                                        debug!(target: "bubbletea::command", "legacy async batch command result dropped — receiver disconnected");
1644                                    }
1645                                }
1646                            });
1647                        }
1648                    }
1649                } else if msg.is::<SequenceMsg>() {
1650                    if let Some(seq) = msg.downcast::<SequenceMsg>() {
1651                        for cmd in seq.0 {
1652                            let cmd_kind: CommandKind = cmd.into();
1653                            if let Some(msg) = cmd_kind.execute().await {
1654                                if tx.send(msg).await.is_err() {
1655                                    debug!(target: "bubbletea::command", "legacy async sequence command result dropped — receiver disconnected");
1656                                    break;
1657                                }
1658                            }
1659                        }
1660                    }
1661                } else if tx.send(msg).await.is_err() {
1662                    debug!(target: "bubbletea::command", "legacy async command result dropped — receiver disconnected");
1663                }
1664            }
1665        });
1666    }
1667}
1668
1669// =============================================================================
1670// Custom Input Parsing (for custom I/O mode)
1671// =============================================================================
1672
1673struct InputParser {
1674    buffer: Vec<u8>,
1675}
1676
1677impl InputParser {
1678    fn new() -> Self {
1679        Self { buffer: Vec::new() }
1680    }
1681
1682    /// Maximum input buffer size (1 MB). Prevents memory exhaustion from
1683    /// malformed escape sequences or malicious input streams.
1684    const MAX_BUFFER: usize = 1024 * 1024;
1685
1686    fn push_bytes(&mut self, bytes: &[u8], can_have_more_data: bool) -> Vec<Message> {
1687        if !bytes.is_empty() {
1688            if self.buffer.len() + bytes.len() > Self::MAX_BUFFER {
1689                debug!(
1690                    target: "bubbletea::input",
1691                    "Input buffer exceeded 1MB limit, draining"
1692                );
1693                self.buffer.clear();
1694            }
1695            self.buffer.extend_from_slice(bytes);
1696        }
1697
1698        let mut messages = Vec::new();
1699        loop {
1700            if self.buffer.is_empty() {
1701                break;
1702            }
1703
1704            match parse_one_message(&self.buffer, can_have_more_data) {
1705                ParseOutcome::NeedMore => break,
1706                ParseOutcome::Parsed(consumed, msg) => {
1707                    self.buffer.drain(0..consumed);
1708                    if let Some(msg) = msg {
1709                        messages.push(msg);
1710                    }
1711                }
1712            }
1713        }
1714
1715        messages
1716    }
1717
1718    fn flush(&mut self) -> Vec<Message> {
1719        let mut messages = Vec::new();
1720        loop {
1721            if self.buffer.is_empty() {
1722                break;
1723            }
1724
1725            match parse_one_message(&self.buffer, false) {
1726                ParseOutcome::NeedMore => break,
1727                ParseOutcome::Parsed(consumed, msg) => {
1728                    self.buffer.drain(0..consumed);
1729                    if let Some(msg) = msg {
1730                        messages.push(msg);
1731                    }
1732                }
1733            }
1734        }
1735        messages
1736    }
1737}
1738
/// Result of trying to parse one message from the front of the input buffer.
enum ParseOutcome {
    /// The buffered bytes are not enough to decide yet; the caller stops
    /// parsing and leaves the buffer as it is.
    NeedMore,
    /// A parse step finished: the number of bytes to remove from the front of
    /// the buffer, and the message produced by those bytes, if any.
    Parsed(usize, Option<Message>),
}
1743
1744fn parse_one_message(buf: &[u8], can_have_more_data: bool) -> ParseOutcome {
1745    if buf.is_empty() {
1746        return ParseOutcome::NeedMore;
1747    }
1748
1749    if let Some(outcome) = parse_mouse_event(buf, can_have_more_data) {
1750        return outcome;
1751    }
1752
1753    if let Some(outcome) = parse_focus_event(buf, can_have_more_data) {
1754        return outcome;
1755    }
1756
1757    if let Some(outcome) = parse_bracketed_paste(buf, can_have_more_data) {
1758        return outcome;
1759    }
1760
1761    if let Some(outcome) = parse_key_sequence(buf, can_have_more_data) {
1762        return outcome;
1763    }
1764
1765    parse_runes_or_control(buf, can_have_more_data)
1766}
1767
1768fn parse_mouse_event(buf: &[u8], can_have_more_data: bool) -> Option<ParseOutcome> {
1769    if buf.starts_with(b"\x1b[M") {
1770        if buf.len() < 6 {
1771            return Some(if can_have_more_data {
1772                ParseOutcome::NeedMore
1773            } else {
1774                ParseOutcome::Parsed(1, Some(replacement_message()))
1775            });
1776        }
1777        let seq = &buf[..6];
1778        return Some(match crate::mouse::parse_mouse_event_sequence(seq) {
1779            Ok(msg) => ParseOutcome::Parsed(6, Some(Message::new(msg))),
1780            Err(_) => ParseOutcome::Parsed(1, Some(replacement_message())),
1781        });
1782    }
1783
1784    if buf.starts_with(b"\x1b[<") {
1785        if let Some(end_idx) = buf.iter().position(|b| *b == b'M' || *b == b'm') {
1786            let seq = &buf[..=end_idx];
1787            return Some(match crate::mouse::parse_mouse_event_sequence(seq) {
1788                Ok(msg) => ParseOutcome::Parsed(seq.len(), Some(Message::new(msg))),
1789                Err(_) => ParseOutcome::Parsed(1, Some(replacement_message())),
1790            });
1791        }
1792        return Some(if can_have_more_data {
1793            ParseOutcome::NeedMore
1794        } else {
1795            ParseOutcome::Parsed(1, Some(replacement_message()))
1796        });
1797    }
1798
1799    None
1800}
1801
1802fn parse_focus_event(buf: &[u8], can_have_more_data: bool) -> Option<ParseOutcome> {
1803    if buf.len() < 3 && buf.starts_with(b"\x1b[") && can_have_more_data {
1804        return Some(ParseOutcome::NeedMore);
1805    }
1806
1807    if buf.starts_with(b"\x1b[I") {
1808        return Some(ParseOutcome::Parsed(3, Some(Message::new(FocusMsg))));
1809    }
1810
1811    if buf.starts_with(b"\x1b[O") {
1812        return Some(ParseOutcome::Parsed(3, Some(Message::new(BlurMsg))));
1813    }
1814
1815    None
1816}
1817
1818fn parse_bracketed_paste(buf: &[u8], can_have_more_data: bool) -> Option<ParseOutcome> {
1819    const BP_START: &[u8] = b"\x1b[200~";
1820    const BP_END: &[u8] = b"\x1b[201~";
1821
1822    if !buf.starts_with(BP_START) {
1823        return None;
1824    }
1825
1826    if let Some(idx) = buf.windows(BP_END.len()).position(|w| w == BP_END) {
1827        let content = &buf[BP_START.len()..idx];
1828        let text = String::from_utf8_lossy(content);
1829        let runes = text.chars().collect::<Vec<char>>();
1830        let key = KeyMsg::from_runes(runes).with_paste();
1831        let total_len = idx + BP_END.len();
1832        return Some(ParseOutcome::Parsed(total_len, Some(message_from_key(key))));
1833    }
1834
1835    Some(if can_have_more_data {
1836        ParseOutcome::NeedMore
1837    } else {
1838        let content = &buf[BP_START.len()..];
1839        let text = String::from_utf8_lossy(content);
1840        let runes = text.chars().collect::<Vec<char>>();
1841        let key = KeyMsg::from_runes(runes).with_paste();
1842        ParseOutcome::Parsed(buf.len(), Some(message_from_key(key)))
1843    })
1844}
1845
1846fn parse_key_sequence(buf: &[u8], can_have_more_data: bool) -> Option<ParseOutcome> {
1847    if let Some((key, len)) = crate::key::parse_sequence_prefix(buf) {
1848        return Some(ParseOutcome::Parsed(len, Some(message_from_key(key))));
1849    }
1850
1851    // Check if it's a prefix of a known sequence
1852    if can_have_more_data && is_sequence_prefix(buf) {
1853        return Some(ParseOutcome::NeedMore);
1854    }
1855
1856    if buf.starts_with(b"\x1b")
1857        && let Some((mut key, len)) = crate::key::parse_sequence_prefix(&buf[1..])
1858    {
1859        if !key.alt {
1860            key = key.with_alt();
1861        }
1862        return Some(ParseOutcome::Parsed(len + 1, Some(message_from_key(key))));
1863    }
1864
1865    None
1866}
1867
1868fn parse_runes_or_control(buf: &[u8], can_have_more_data: bool) -> ParseOutcome {
1869    let mut alt = false;
1870    let mut idx = 0;
1871
1872    if buf[0] == 0x1b {
1873        if buf.len() == 1 {
1874            return if can_have_more_data {
1875                ParseOutcome::NeedMore
1876            } else {
1877                ParseOutcome::Parsed(1, Some(message_from_key(KeyMsg::from_type(KeyType::Esc))))
1878            };
1879        }
1880        alt = true;
1881        idx = 1;
1882    }
1883
1884    if idx >= buf.len() {
1885        return ParseOutcome::NeedMore;
1886    }
1887
1888    if let Some(key_type) = control_key_type(buf[idx]) {
1889        let mut key = KeyMsg::from_type(key_type);
1890        if alt {
1891            key = key.with_alt();
1892        }
1893        return ParseOutcome::Parsed(idx + 1, Some(message_from_key(key)));
1894    }
1895
1896    let mut runes = Vec::new();
1897    let mut i = idx;
1898    while i < buf.len() {
1899        let b = buf[i];
1900        if is_control_or_space(b) {
1901            break;
1902        }
1903
1904        let (ch, width, valid) = match decode_char(&buf[i..], can_have_more_data) {
1905            DecodeOutcome::NeedMore => return ParseOutcome::NeedMore,
1906            DecodeOutcome::Decoded(ch, width, valid) => (ch, width, valid),
1907        };
1908
1909        if !valid {
1910            runes.push(std::char::REPLACEMENT_CHARACTER);
1911            i += 1;
1912        } else {
1913            runes.push(ch);
1914            i += width;
1915        }
1916
1917        if alt {
1918            break;
1919        }
1920    }
1921
1922    if !runes.is_empty() {
1923        let mut key = KeyMsg::from_runes(runes);
1924        if alt {
1925            key = key.with_alt();
1926        }
1927        return ParseOutcome::Parsed(i, Some(message_from_key(key)));
1928    }
1929
1930    ParseOutcome::Parsed(1, Some(replacement_message()))
1931}
1932
1933fn control_key_type(byte: u8) -> Option<KeyType> {
1934    match byte {
1935        0x00 => Some(KeyType::Null),
1936        0x01 => Some(KeyType::CtrlA),
1937        0x02 => Some(KeyType::CtrlB),
1938        0x03 => Some(KeyType::CtrlC),
1939        0x04 => Some(KeyType::CtrlD),
1940        0x05 => Some(KeyType::CtrlE),
1941        0x06 => Some(KeyType::CtrlF),
1942        0x07 => Some(KeyType::CtrlG),
1943        0x08 => Some(KeyType::CtrlH),
1944        0x09 => Some(KeyType::Tab),
1945        0x0A => Some(KeyType::CtrlJ),
1946        0x0B => Some(KeyType::CtrlK),
1947        0x0C => Some(KeyType::CtrlL),
1948        0x0D => Some(KeyType::Enter),
1949        0x0E => Some(KeyType::CtrlN),
1950        0x0F => Some(KeyType::CtrlO),
1951        0x10 => Some(KeyType::CtrlP),
1952        0x11 => Some(KeyType::CtrlQ),
1953        0x12 => Some(KeyType::CtrlR),
1954        0x13 => Some(KeyType::CtrlS),
1955        0x14 => Some(KeyType::CtrlT),
1956        0x15 => Some(KeyType::CtrlU),
1957        0x16 => Some(KeyType::CtrlV),
1958        0x17 => Some(KeyType::CtrlW),
1959        0x18 => Some(KeyType::CtrlX),
1960        0x19 => Some(KeyType::CtrlY),
1961        0x1A => Some(KeyType::CtrlZ),
1962        0x1B => Some(KeyType::Esc),
1963        0x1C => Some(KeyType::CtrlBackslash),
1964        0x1D => Some(KeyType::CtrlCloseBracket),
1965        0x1E => Some(KeyType::CtrlCaret),
1966        0x1F => Some(KeyType::CtrlUnderscore),
1967        0x20 => Some(KeyType::Space),
1968        0x7F => Some(KeyType::Backspace),
1969        _ => None,
1970    }
1971}
1972
/// Whether `byte` is an ASCII control byte (0x00-0x1F), DEL (0x7F) or a space.
fn is_control_or_space(byte: u8) -> bool {
    matches!(byte, 0x00..=0x1F | b' ' | 0x7F)
}
1976
/// Result of decoding one UTF-8 character from the front of a byte slice.
enum DecodeOutcome {
    /// The bytes seen so far are a valid but incomplete sequence and more
    /// input may still arrive.
    NeedMore,
    /// `(character, bytes consumed, valid)`. When `valid` is `false` the
    /// character is U+FFFD and exactly one byte is consumed.
    Decoded(char, usize, bool),
}

/// Decode a single UTF-8 character from the start of `input`.
///
/// * A complete, well-formed sequence yields `Decoded(ch, width, true)`.
/// * A sequence that is cut short but well-formed so far yields `NeedMore`
///   when `can_have_more_data` is set, otherwise an invalid result.
/// * Anything already known to be malformed (bad lead byte, bad continuation
///   byte, overlong or out-of-range encoding) yields
///   `Decoded(U+FFFD, 1, false)` immediately, even if fewer bytes than the
///   lead byte announces are available. Waiting for more input could never
///   make such a sequence valid and would only stall the parser.
///
/// # Panics
///
/// Panics if `input` is empty.
fn decode_char(input: &[u8], can_have_more_data: bool) -> DecodeOutcome {
    const INVALID: DecodeOutcome =
        DecodeOutcome::Decoded(std::char::REPLACEMENT_CHARACTER, 1, false);

    let first = input[0];
    // Sequence length announced by the lead byte.
    let width = if first < 0x80 {
        1
    } else if (first & 0xE0) == 0xC0 {
        2
    } else if (first & 0xF0) == 0xE0 {
        3
    } else if (first & 0xF8) == 0xF0 {
        4
    } else {
        return INVALID;
    };

    // Validate whatever part of the sequence is present.
    let available = width.min(input.len());
    match std::str::from_utf8(&input[..available]) {
        Ok(s) => {
            let ch = s.chars().next().unwrap_or(std::char::REPLACEMENT_CHARACTER);
            DecodeOutcome::Decoded(ch, width, true)
        }
        // `error_len() == None` means the input ended mid-sequence without
        // containing anything invalid, so more bytes could still complete it.
        Err(err) if err.error_len().is_none() && can_have_more_data => DecodeOutcome::NeedMore,
        Err(_) => INVALID,
    }
}
2012
2013fn message_from_key(key: KeyMsg) -> Message {
2014    if key.key_type == KeyType::CtrlC {
2015        Message::new(InterruptMsg)
2016    } else {
2017        Message::new(key)
2018    }
2019}
2020
2021fn replacement_message() -> Message {
2022    Message::new(KeyMsg::from_char(std::char::REPLACEMENT_CHARACTER))
2023}
2024
2025#[cfg(test)]
2026mod tests {
2027    use super::*;
2028    use tokio_util::sync::CancellationToken;
2029    use tokio_util::task::TaskTracker;
2030
2031    struct TestModel {
2032        count: i32,
2033    }
2034
2035    impl Model for TestModel {
2036        fn init(&self) -> Option<Cmd> {
2037            None
2038        }
2039
2040        fn update(&mut self, msg: Message) -> Option<Cmd> {
2041            if let Some(n) = msg.downcast::<i32>() {
2042                self.count += n;
2043            }
2044            None
2045        }
2046
2047        fn view(&self) -> String {
2048            format!("Count: {}", self.count)
2049        }
2050    }
2051
2052    #[test]
2053    fn test_program_options_default() {
2054        let opts = ProgramOptions::default();
2055        assert!(!opts.alt_screen);
2056        assert!(!opts.mouse_cell_motion);
2057        assert!(opts.bracketed_paste);
2058        assert_eq!(opts.fps, 60);
2059    }
2060
2061    #[test]
2062    fn test_program_builder() {
2063        let model = TestModel { count: 0 };
2064        let program = Program::new(model)
2065            .with_alt_screen()
2066            .with_mouse_cell_motion()
2067            .with_fps(30);
2068
2069        assert!(program.options.alt_screen);
2070        assert!(program.options.mouse_cell_motion);
2071        assert_eq!(program.options.fps, 30);
2072    }
2073
2074    #[test]
2075    fn test_program_fps_max() {
2076        let model = TestModel { count: 0 };
2077        let program = Program::new(model).with_fps(200);
2078        assert_eq!(program.options.fps, 120); // Capped at 120
2079    }
2080
2081    #[test]
2082    fn test_program_fps_min() {
2083        let model = TestModel { count: 0 };
2084        let program = Program::new(model).with_fps(0);
2085        assert_eq!(program.options.fps, 1); // Clamped to minimum of 1 to avoid division by zero
2086    }
2087
2088    // === Bracketed Paste Parsing Tests ===
2089
2090    #[test]
2091    fn test_parse_bracketed_paste_basic() {
2092        // Bracketed paste sequence: ESC[200~ ... ESC[201~
2093        let input = b"\x1b[200~hello world\x1b[201~";
2094        let result = parse_bracketed_paste(input, false);
2095
2096        assert!(result.is_some());
2097        if let Some(ParseOutcome::Parsed(len, Some(msg))) = result {
2098            assert_eq!(len, input.len());
2099            let key = msg.downcast_ref::<KeyMsg>().unwrap();
2100            assert!(key.paste, "Key should have paste flag set");
2101            assert_eq!(
2102                key.runes,
2103                vec!['h', 'e', 'l', 'l', 'o', ' ', 'w', 'o', 'r', 'l', 'd']
2104            );
2105        } else {
2106            panic!("Expected Parsed outcome");
2107        }
2108    }
2109
2110    #[test]
2111    fn test_parse_bracketed_paste_empty() {
2112        let input = b"\x1b[200~\x1b[201~";
2113        let result = parse_bracketed_paste(input, false);
2114
2115        assert!(result.is_some());
2116        if let Some(ParseOutcome::Parsed(len, Some(msg))) = result {
2117            assert_eq!(len, input.len());
2118            let key = msg.downcast_ref::<KeyMsg>().unwrap();
2119            assert!(key.paste);
2120            assert!(key.runes.is_empty());
2121        } else {
2122            panic!("Expected Parsed outcome");
2123        }
2124    }
2125
2126    #[test]
2127    fn test_parse_bracketed_paste_multiline() {
2128        let input = b"\x1b[200~line1\nline2\nline3\x1b[201~";
2129        let result = parse_bracketed_paste(input, false);
2130
2131        assert!(result.is_some());
2132        if let Some(ParseOutcome::Parsed(len, Some(msg))) = result {
2133            assert_eq!(len, input.len());
2134            let key = msg.downcast_ref::<KeyMsg>().unwrap();
2135            assert!(key.paste);
2136            let text: String = key.runes.iter().collect();
2137            assert_eq!(text, "line1\nline2\nline3");
2138        } else {
2139            panic!("Expected Parsed outcome");
2140        }
2141    }
2142
2143    #[test]
2144    fn test_parse_bracketed_paste_unicode() {
2145        let input = "\x1b[200~hello 世界 🌍\x1b[201~".as_bytes();
2146        let result = parse_bracketed_paste(input, false);
2147
2148        assert!(result.is_some());
2149        if let Some(ParseOutcome::Parsed(_, Some(msg))) = result {
2150            let key = msg.downcast_ref::<KeyMsg>().unwrap();
2151            assert!(key.paste);
2152            let text: String = key.runes.iter().collect();
2153            assert_eq!(text, "hello 世界 🌍");
2154        } else {
2155            panic!("Expected Parsed outcome");
2156        }
2157    }
2158
2159    #[test]
2160    fn test_parse_bracketed_paste_incomplete() {
2161        // Missing end sequence, with more data expected
2162        let input = b"\x1b[200~hello";
2163        let result = parse_bracketed_paste(input, true);
2164
2165        assert!(matches!(result, Some(ParseOutcome::NeedMore)));
2166    }
2167
2168    #[test]
2169    fn test_parse_bracketed_paste_incomplete_no_more_data() {
2170        // Missing end sequence, no more data expected - should parse what we have
2171        let input = b"\x1b[200~hello";
2172        let result = parse_bracketed_paste(input, false);
2173
2174        assert!(result.is_some());
2175        if let Some(ParseOutcome::Parsed(len, Some(msg))) = result {
2176            assert_eq!(len, input.len());
2177            let key = msg.downcast_ref::<KeyMsg>().unwrap();
2178            assert!(key.paste);
2179            let text: String = key.runes.iter().collect();
2180            assert_eq!(text, "hello");
2181        } else {
2182            panic!("Expected Parsed outcome");
2183        }
2184    }
2185
2186    #[test]
2187    fn test_parse_bracketed_paste_not_bracketed() {
2188        // Regular input, not bracketed paste
2189        let input = b"hello";
2190        let result = parse_bracketed_paste(input, false);
2191        assert!(result.is_none(), "Non-paste input should return None");
2192    }
2193
2194    #[test]
2195    fn test_parse_bracketed_paste_large() {
2196        // Large paste (simulating a big paste operation)
2197        let content = "a".repeat(10000);
2198        let input = format!("\x1b[200~{}\x1b[201~", content);
2199        let result = parse_bracketed_paste(input.as_bytes(), false);
2200
2201        assert!(result.is_some());
2202        if let Some(ParseOutcome::Parsed(len, Some(msg))) = result {
2203            assert_eq!(len, input.len());
2204            let key = msg.downcast_ref::<KeyMsg>().unwrap();
2205            assert!(key.paste);
2206            assert_eq!(key.runes.len(), 10000);
2207        } else {
2208            panic!("Expected Parsed outcome");
2209        }
2210    }
2211
2212    // === Thread Pool / spawn_batch Tests (bd-3ut7) ===
2213
2214    #[test]
2215    fn spawn_batch_executes_closure() {
2216        use std::sync::Arc;
2217        use std::sync::atomic::{AtomicBool, Ordering};
2218
2219        let executed = Arc::new(AtomicBool::new(false));
2220        let clone = executed.clone();
2221
2222        spawn_batch(move || {
2223            clone.store(true, Ordering::SeqCst);
2224        });
2225
2226        // Allow time for the task to run
2227        thread::sleep(Duration::from_millis(200));
2228        assert!(
2229            executed.load(Ordering::SeqCst),
2230            "spawn_batch should execute the closure"
2231        );
2232    }
2233
2234    #[test]
2235    fn spawn_batch_handles_many_concurrent_tasks() {
2236        use std::sync::Arc;
2237        use std::sync::atomic::{AtomicUsize, Ordering};
2238
2239        let counter = Arc::new(AtomicUsize::new(0));
2240        let task_count = 200;
2241
2242        for _ in 0..task_count {
2243            let c = counter.clone();
2244            spawn_batch(move || {
2245                c.fetch_add(1, Ordering::SeqCst);
2246            });
2247        }
2248
2249        // Wait for all tasks with generous timeout
2250        let deadline = std::time::Instant::now() + Duration::from_secs(5);
2251        while counter.load(Ordering::SeqCst) < task_count && std::time::Instant::now() < deadline {
2252            thread::sleep(Duration::from_millis(50));
2253        }
2254
2255        assert_eq!(
2256            counter.load(Ordering::SeqCst),
2257            task_count,
2258            "All {task_count} tasks should complete"
2259        );
2260    }
2261
2262    #[test]
2263    fn handle_command_batch_executes_all_subcommands() {
2264        let model = TestModel { count: 0 };
2265        let program = Program::new(model);
2266        let (tx, rx) = mpsc::channel();
2267        let command_threads = Arc::new(Mutex::new(Vec::new()));
2268
2269        // Build a batch of 50 sub-commands, each returning a unique i32
2270        let cmds: Vec<Option<Cmd>> = (0..50)
2271            .map(|i| Some(Cmd::new(move || Message::new(i))))
2272            .collect();
2273        let batch_cmd = crate::batch(cmds).unwrap();
2274
2275        program.handle_command(batch_cmd, tx, Arc::clone(&command_threads));
2276
2277        // Collect all 50 results
2278        let mut results = Vec::new();
2279        let deadline = std::time::Instant::now() + Duration::from_secs(5);
2280        while results.len() < 50 && std::time::Instant::now() < deadline {
2281            if let Ok(msg) = rx.recv_timeout(Duration::from_millis(100)) {
2282                results.push(msg.downcast::<i32>().unwrap());
2283            }
2284        }
2285
2286        assert_eq!(
2287            results.len(),
2288            50,
2289            "All 50 batch sub-commands should produce results"
2290        );
2291        results.sort();
2292        let expected: Vec<i32> = (0..50).collect();
2293        assert_eq!(
2294            results, expected,
2295            "Each sub-command value should be received exactly once"
2296        );
2297    }
2298
2299    #[cfg(feature = "thread-pool")]
2300    #[test]
2301    fn handle_command_batch_bounded_parallelism() {
2302        use std::sync::atomic::{AtomicUsize, Ordering};
2303
2304        let model = TestModel { count: 0 };
2305        let program = Program::new(model);
2306        let (tx, rx) = mpsc::channel();
2307        let command_threads = Arc::new(Mutex::new(Vec::new()));
2308
2309        let active = Arc::new(AtomicUsize::new(0));
2310        let max_active = Arc::new(AtomicUsize::new(0));
2311
2312        let task_count: usize = 100;
2313        let cmds: Vec<Option<Cmd>> = (0..task_count)
2314            .map(|_| {
2315                let a = active.clone();
2316                let m = max_active.clone();
2317                Some(Cmd::new(move || {
2318                    let current = a.fetch_add(1, Ordering::SeqCst) + 1;
2319                    m.fetch_max(current, Ordering::SeqCst);
2320                    thread::sleep(Duration::from_millis(5));
2321                    a.fetch_sub(1, Ordering::SeqCst);
2322                    Message::new(1i32)
2323                }))
2324            })
2325            .collect();
2326        let batch_cmd = crate::batch(cmds).unwrap();
2327
2328        program.handle_command(batch_cmd, tx, Arc::clone(&command_threads));
2329
2330        // Wait for all results
2331        let mut count = 0usize;
2332        let deadline = std::time::Instant::now() + Duration::from_secs(15);
2333        while count < task_count && std::time::Instant::now() < deadline {
2334            if let Ok(_msg) = rx.recv_timeout(Duration::from_millis(100)) {
2335                count += 1;
2336            }
2337        }
2338
2339        assert_eq!(count, task_count, "All batch commands should complete");
2340
2341        let observed_max = max_active.load(Ordering::SeqCst);
2342        let num_cpus = thread::available_parallelism()
2343            .map(|n| n.get())
2344            .unwrap_or(4);
2345
2346        // With rayon, max concurrent tasks should be bounded by the pool size.
2347        // Allow num_cpus + 2 to account for the outer thread::spawn and scheduling jitter.
2348        assert!(
2349            observed_max <= num_cpus + 2,
2350            "Expected bounded parallelism near {num_cpus}, but observed {observed_max}. \
2351             Without thread-pool feature, this would be near {task_count}."
2352        );
2353    }
2354
2355    #[test]
2356    fn handle_command_large_batch_no_panic() {
2357        let model = TestModel { count: 0 };
2358        let program = Program::new(model);
2359        let (tx, rx) = mpsc::channel();
2360        let command_threads = Arc::new(Mutex::new(Vec::new()));
2361
2362        // Create a large batch of 500 lightweight commands
2363        let cmds: Vec<Option<Cmd>> = (0..500)
2364            .map(|i| Some(Cmd::new(move || Message::new(i))))
2365            .collect();
2366        let batch_cmd = crate::batch(cmds).unwrap();
2367
2368        program.handle_command(batch_cmd, tx, Arc::clone(&command_threads));
2369
2370        // Collect results with timeout - don't need all, just verify no panic
2371        let mut count = 0usize;
2372        let deadline = std::time::Instant::now() + Duration::from_secs(10);
2373        while count < 500 && std::time::Instant::now() < deadline {
2374            if let Ok(_msg) = rx.recv_timeout(Duration::from_millis(50)) {
2375                count += 1;
2376            }
2377        }
2378
2379        assert_eq!(count, 500, "Large batch should complete without panic");
2380    }
2381
2382    // === Thread Lifecycle Tests (bd-1327) ===
2383
2384    #[test]
2385    fn test_thread_handles_captured() {
2386        // Verify that JoinHandle types can be stored in Option variables.
2387        // This is a compile-time check that the types work as expected.
2388        let handle: Option<thread::JoinHandle<()>> = Some(thread::spawn(|| {
2389            // Thread does nothing
2390        }));
2391
2392        assert!(handle.is_some(), "Handle should be captured");
2393
2394        // Join to clean up
2395        if let Some(h) = handle {
2396            h.join().expect("Thread should join successfully");
2397        }
2398    }
2399
2400    #[test]
2401    fn test_threads_exit_on_channel_drop() {
2402        use std::sync::Arc;
2403        use std::sync::atomic::{AtomicBool, Ordering};
2404
2405        // Create a flag to track if the thread has exited
2406        let thread_exited = Arc::new(AtomicBool::new(false));
2407        let thread_exited_clone = Arc::clone(&thread_exited);
2408
2409        // Create a channel like the event_loop does
2410        let (tx, rx) = mpsc::channel::<i32>();
2411
2412        // Spawn a thread that blocks on recv() like the external forwarder
2413        let handle = thread::spawn(move || {
2414            // This loop will exit when tx is dropped (recv() returns Err)
2415            while rx.recv().is_ok() {}
2416            thread_exited_clone.store(true, Ordering::SeqCst);
2417        });
2418
2419        // Thread should be running
2420        assert!(!thread_exited.load(Ordering::SeqCst));
2421
2422        // Drop the sender - this should cause recv() to return Err
2423        drop(tx);
2424
2425        // Join the thread - it should exit promptly
2426        handle
2427            .join()
2428            .expect("Thread should join after channel drop");
2429
2430        // Verify the thread actually ran its exit code
2431        assert!(
2432            thread_exited.load(Ordering::SeqCst),
2433            "Thread should have exited after channel drop"
2434        );
2435    }
2436
2437    #[test]
2438    fn test_shutdown_joins_all_threads() {
2439        use std::sync::Arc;
2440        use std::sync::atomic::{AtomicUsize, Ordering};
2441
2442        // Track how many threads have been joined
2443        let join_count = Arc::new(AtomicUsize::new(0));
2444
2445        // Create multiple thread handles like event_loop does
2446        let mut handles: Vec<thread::JoinHandle<()>> = Vec::new();
2447
2448        for i in 0..3 {
2449            let join_count_clone = Arc::clone(&join_count);
2450            handles.push(thread::spawn(move || {
2451                // Simulate some work
2452                thread::sleep(Duration::from_millis(10 * (i as u64 + 1)));
2453                join_count_clone.fetch_add(1, Ordering::SeqCst);
2454            }));
2455        }
2456
2457        // Join all threads (simulating shutdown behavior)
2458        for handle in handles {
2459            match handle.join() {
2460                Ok(()) => {} // Thread joined successfully
2461                Err(e) => panic!("Thread panicked during join: {:?}", e),
2462            }
2463        }
2464
2465        // All threads should have completed
2466        assert_eq!(
2467            join_count.load(Ordering::SeqCst),
2468            3,
2469            "All threads should have completed and been joined"
2470        );
2471    }
2472
2473    #[test]
2474    fn test_thread_panic_handled_gracefully() {
2475        // Spawn a thread that will panic
2476        let handle = thread::spawn(|| {
2477            panic!("Intentional panic for testing");
2478        });
2479
2480        // The join should return Err, not propagate the panic
2481        let result = handle.join();
2482        // Verify we can inspect the panic payload
2483        let e = result.expect_err("Join should return Err when thread panics");
2484        // The panic payload is available for logging
2485        let _panic_info = format!("{:?}", e);
2486        // In production code, this would be logged with tracing::warn!
2487    }
2488
2489    #[test]
2490    fn test_external_forwarder_pattern() {
2491        // Test the exact pattern used in event_loop for external message forwarding
2492        let (external_tx, external_rx) = mpsc::channel::<Message>();
2493        let (event_tx, event_rx) = mpsc::channel::<Message>();
2494
2495        // Spawn forwarder thread like event_loop does
2496        let tx_clone = event_tx.clone();
2497        let handle = thread::spawn(move || {
2498            while let Ok(msg) = external_rx.recv() {
2499                if tx_clone.send(msg).is_err() {
2500                    break;
2501                }
2502            }
2503        });
2504
2505        // Send some messages through the forwarder
2506        external_tx.send(Message::new(1i32)).unwrap();
2507        external_tx.send(Message::new(2i32)).unwrap();
2508        external_tx.send(Message::new(3i32)).unwrap();
2509
2510        // Drop the external sender to signal the forwarder to exit
2511        drop(external_tx);
2512
2513        // Join should complete because the thread exits when external_rx.recv() returns Err
2514        let join_result = handle.join();
2515        assert!(
2516            join_result.is_ok(),
2517            "Forwarder thread should exit cleanly when sender is dropped"
2518        );
2519
2520        // Verify messages were forwarded
2521        let mut received = Vec::new();
2522        while let Ok(msg) = event_rx.try_recv() {
2523            if let Some(&n) = msg.downcast_ref::<i32>() {
2524                received.push(n);
2525            }
2526        }
2527        assert_eq!(received, vec![1, 2, 3], "All messages should be forwarded");
2528    }
2529
2530    // === Async TaskTracker Integration Tests (bd-2i18) ===
2531
2532    #[test]
2533    fn test_task_tracker_spawn_blocking_tracks_thread() {
2534        // Verify that TaskTracker.spawn_blocking() actually tracks the spawned thread.
2535        // The key behavior: task_tracker.wait() should block until the spawned thread completes.
2536        use std::sync::Arc;
2537        use std::sync::atomic::{AtomicBool, Ordering};
2538
2539        let rt = tokio::runtime::Builder::new_current_thread()
2540            .enable_all()
2541            .build()
2542            .expect("Failed to create runtime");
2543
2544        let thread_completed = Arc::new(AtomicBool::new(false));
2545        let thread_completed_clone = Arc::clone(&thread_completed);
2546
2547        rt.block_on(async {
2548            let task_tracker = TaskTracker::new();
2549
2550            // Spawn a blocking task that takes some time
2551            task_tracker.spawn_blocking(move || {
2552                thread::sleep(Duration::from_millis(50));
2553                thread_completed_clone.store(true, Ordering::SeqCst);
2554            });
2555
2556            // Close the tracker (prevents new tasks)
2557            task_tracker.close();
2558
2559            // Wait for all tracked tasks - should block until thread completes
2560            task_tracker.wait().await;
2561
2562            // After wait() returns, the thread should have completed
2563            assert!(
2564                thread_completed.load(Ordering::SeqCst),
2565                "spawn_blocking task should complete before wait() returns"
2566            );
2567        });
2568    }
2569
2570    #[test]
2571    fn test_cancellation_token_stops_blocking_task() {
2572        // Verify that CancellationToken properly signals spawn_blocking threads to exit.
2573        use std::sync::Arc;
2574        use std::sync::atomic::{AtomicBool, Ordering};
2575
2576        let rt = tokio::runtime::Builder::new_current_thread()
2577            .enable_all()
2578            .build()
2579            .expect("Failed to create runtime");
2580
2581        let task_exited = Arc::new(AtomicBool::new(false));
2582        let task_exited_clone = Arc::clone(&task_exited);
2583
2584        rt.block_on(async {
2585            let cancel_token = CancellationToken::new();
2586            let task_tracker = TaskTracker::new();
2587
2588            let cancel_clone = cancel_token.clone();
2589
2590            // Spawn a task that polls the cancellation token
2591            task_tracker.spawn_blocking(move || {
2592                // Loop until cancelled (mimics event thread pattern)
2593                loop {
2594                    if cancel_clone.is_cancelled() {
2595                        task_exited_clone.store(true, Ordering::SeqCst);
2596                        break;
2597                    }
2598                    thread::sleep(Duration::from_millis(10));
2599                }
2600            });
2601
2602            // Task should be running
2603            thread::sleep(Duration::from_millis(30));
2604            assert!(
2605                !task_exited.load(Ordering::SeqCst),
2606                "Task should still be running before cancellation"
2607            );
2608
2609            // Cancel and wait
2610            cancel_token.cancel();
2611            task_tracker.close();
2612            task_tracker.wait().await;
2613
2614            assert!(
2615                task_exited.load(Ordering::SeqCst),
2616                "Task should exit after cancellation"
2617            );
2618        });
2619    }
2620
2621    #[test]
2622    fn test_graceful_shutdown_waits_for_all_blocking_tasks() {
2623        // Verify that graceful shutdown waits for ALL spawn_blocking tasks.
2624        use std::sync::Arc;
2625        use std::sync::atomic::{AtomicUsize, Ordering};
2626
2627        let rt = tokio::runtime::Builder::new_current_thread()
2628            .enable_all()
2629            .build()
2630            .expect("Failed to create runtime");
2631
2632        let completed_count = Arc::new(AtomicUsize::new(0));
2633
2634        rt.block_on(async {
2635            let cancel_token = CancellationToken::new();
2636            let task_tracker = TaskTracker::new();
2637
2638            // Spawn multiple blocking tasks with different durations
2639            for i in 0..3 {
2640                let count_clone = Arc::clone(&completed_count);
2641                let cancel_clone = cancel_token.clone();
2642                task_tracker.spawn_blocking(move || {
2643                    // Wait for cancellation or work
2644                    loop {
2645                        if cancel_clone.is_cancelled() {
2646                            break;
2647                        }
2648                        thread::sleep(Duration::from_millis(10));
2649                    }
2650                    // Simulate different completion times
2651                    thread::sleep(Duration::from_millis(10 * (i as u64 + 1)));
2652                    count_clone.fetch_add(1, Ordering::SeqCst);
2653                });
2654            }
2655
2656            // All tasks should be running
2657            thread::sleep(Duration::from_millis(30));
2658            assert_eq!(
2659                completed_count.load(Ordering::SeqCst),
2660                0,
2661                "No tasks should complete before shutdown"
2662            );
2663
2664            // Initiate graceful shutdown (mimics Program::graceful_shutdown)
2665            cancel_token.cancel();
2666            task_tracker.close();
2667
2668            // Use timeout similar to graceful_shutdown
2669            let timeout_result: std::result::Result<(), tokio::time::error::Elapsed> =
2670                tokio::time::timeout(Duration::from_secs(2), task_tracker.wait()).await;
2671
2672            assert!(
2673                timeout_result.is_ok(),
2674                "All tasks should complete within timeout"
2675            );
2676
2677            assert_eq!(
2678                completed_count.load(Ordering::SeqCst),
2679                3,
2680                "All 3 tasks should complete during graceful shutdown"
2681            );
2682        });
2683    }
2684
2685    #[test]
2686    fn test_spawn_blocking_vs_spawn_difference() {
2687        // Document why spawn_blocking is needed vs std::thread::spawn.
2688        // With std::thread::spawn, TaskTracker.wait() doesn't wait for the thread.
2689        // With spawn_blocking, it does.
2690        use std::sync::Arc;
2691        use std::sync::atomic::{AtomicBool, Ordering};
2692
2693        let rt = tokio::runtime::Builder::new_current_thread()
2694            .enable_all()
2695            .build()
2696            .expect("Failed to create runtime");
2697
2698        // Test 1: std::thread::spawn is NOT tracked
2699        let untracked_done = Arc::new(AtomicBool::new(false));
2700        let untracked_done_clone = Arc::clone(&untracked_done);
2701
2702        rt.block_on(async {
2703            let task_tracker = TaskTracker::new();
2704
2705            // This thread is NOT spawned via task_tracker
2706            let _handle = thread::spawn(move || {
2707                thread::sleep(Duration::from_millis(100));
2708                untracked_done_clone.store(true, Ordering::SeqCst);
2709            });
2710
2711            task_tracker.close();
2712            task_tracker.wait().await;
2713
2714            // wait() returns immediately because no tasks were tracked!
2715            // The untracked thread may still be running
2716            // (We don't assert here because timing is unpredictable)
2717        });
2718
2719        // Test 2: spawn_blocking IS tracked
2720        let tracked_done = Arc::new(AtomicBool::new(false));
2721        let tracked_done_clone = Arc::clone(&tracked_done);
2722
2723        rt.block_on(async {
2724            let task_tracker = TaskTracker::new();
2725
2726            // This thread IS tracked
2727            task_tracker.spawn_blocking(move || {
2728                thread::sleep(Duration::from_millis(50));
2729                tracked_done_clone.store(true, Ordering::SeqCst);
2730            });
2731
2732            task_tracker.close();
2733            task_tracker.wait().await;
2734
2735            // wait() blocks until the tracked task completes
2736            assert!(
2737                tracked_done.load(Ordering::SeqCst),
2738                "spawn_blocking task should complete before wait() returns"
2739            );
2740        });
2741    }
2742
2743    #[test]
2744    fn test_event_thread_pattern_with_poll_timeout() {
2745        // Test the exact pattern used for the event thread:
2746        // - spawn_blocking with CancellationToken
2747        // - poll with timeout to check cancellation
2748        use std::sync::Arc;
2749        use std::sync::atomic::{AtomicUsize, Ordering};
2750
2751        let rt = tokio::runtime::Builder::new_current_thread()
2752            .enable_all()
2753            .build()
2754            .expect("Failed to create runtime");
2755
2756        let poll_count = Arc::new(AtomicUsize::new(0));
2757        let poll_count_clone = Arc::clone(&poll_count);
2758
2759        rt.block_on(async {
2760            let cancel_token = CancellationToken::new();
2761            let task_tracker = TaskTracker::new();
2762
2763            let cancel_clone = cancel_token.clone();
2764
2765            // Spawn event thread pattern
2766            task_tracker.spawn_blocking(move || {
2767                loop {
2768                    if cancel_clone.is_cancelled() {
2769                        break;
2770                    }
2771                    // Simulate poll with timeout (like event::poll(Duration::from_millis(100)))
2772                    thread::sleep(Duration::from_millis(25));
2773                    poll_count_clone.fetch_add(1, Ordering::SeqCst);
2774                }
2775            });
2776
2777            // Let the thread poll a few times
2778            thread::sleep(Duration::from_millis(100));
2779            let count_before_cancel = poll_count.load(Ordering::SeqCst);
2780            assert!(
2781                count_before_cancel >= 2,
2782                "Thread should have polled multiple times: {}",
2783                count_before_cancel
2784            );
2785
2786            // Cancel and wait
2787            cancel_token.cancel();
2788            task_tracker.close();
2789            task_tracker.wait().await;
2790
2791            // Thread should have exited
2792            let final_count = poll_count.load(Ordering::SeqCst);
2793            assert!(
2794                final_count >= count_before_cancel,
2795                "Poll count should not decrease"
2796            );
2797        });
2798    }
2799}