Skip to main content

bubbletea/
program.rs

1//! Program lifecycle and event loop.
2//!
3//! The Program struct manages the entire TUI application lifecycle,
4//! including terminal setup, event handling, and rendering.
5
6use std::io::{self, Read, Write};
7use std::sync::mpsc::{self, Receiver, Sender};
8use std::sync::{Arc, Mutex};
9use std::thread;
10use std::time::Duration;
11
12#[cfg(feature = "async")]
13use crate::command::CommandKind;
14
15#[cfg(feature = "async")]
16use tokio_util::sync::CancellationToken;
17
18#[cfg(feature = "async")]
19use tokio_util::task::TaskTracker;
20
21use tracing::debug;
22
/// Run a batch command closure in the background.
///
/// This variant is compiled when the `thread-pool` feature is enabled and
/// hands the closure to rayon's global pool via [`rayon::spawn`]. The pool
/// size can be tuned with [`rayon::ThreadPoolBuilder`] or the
/// `RAYON_NUM_THREADS` environment variable.
///
/// When the feature is disabled, a sibling definition spawns one OS thread
/// per command instead.
#[cfg(feature = "thread-pool")]
fn spawn_batch<F>(f: F)
where
    F: FnOnce() + Send + 'static,
{
    rayon::spawn(f);
}
35
/// Fallback used when the `thread-pool` feature is disabled: every batch
/// command gets its own OS thread, which is detached immediately.
#[cfg(not(feature = "thread-pool"))]
fn spawn_batch<F>(f: F)
where
    F: FnOnce() + Send + 'static,
{
    // Dropping the join handle detaches the thread; completion is not awaited.
    let _detached = thread::spawn(f);
}
40
41use crossterm::{
42    cursor::{Hide, MoveTo, Show},
43    event::{self, DisableMouseCapture, EnableMouseCapture, Event, KeyEventKind},
44    execute,
45    terminal::{
46        self, Clear, ClearType, EnterAlternateScreen, LeaveAlternateScreen, disable_raw_mode,
47        enable_raw_mode,
48    },
49};
50
51use crate::command::Cmd;
52use crate::key::{from_crossterm_key, is_sequence_prefix};
53use crate::message::{
54    BatchMsg, BlurMsg, FocusMsg, InterruptMsg, Message, PrintLineMsg, QuitMsg,
55    RequestWindowSizeMsg, SequenceMsg, SetWindowTitleMsg, WindowSizeMsg,
56};
57use crate::mouse::from_crossterm_mouse;
58use crate::screen::{ReleaseTerminalMsg, RestoreTerminalMsg};
59use crate::{KeyMsg, KeyType};
60
61/// Errors that can occur when running a bubbletea program.
62///
63/// This enum represents all possible error conditions when running
64/// a TUI application with bubbletea.
65///
66/// # Error Handling
67///
68/// Most errors from bubbletea are recoverable. The recommended pattern
69/// is to use the `?` operator for propagation:
70///
71/// ```rust,ignore
72/// use bubbletea::{Program, Result};
73///
74/// fn run_app() -> Result<()> {
75///     let program = Program::new(MyModel::default());
76///     program.run()?;
77///     Ok(())
78/// }
79/// ```
80///
81/// # Recovery Strategies
82///
83/// | Error Variant | Recovery Strategy |
84/// |--------------|-------------------|
85/// | [`Io`](Error::Io) | Check terminal availability, retry, or report to user |
86/// | [`RawModeFailure`](Error::RawModeFailure) | Check terminal compatibility |
87/// | [`AltScreenFailure`](Error::AltScreenFailure) | Disable alt screen option |
88/// | [`EventPoll`](Error::EventPoll) | Terminal may be disconnected |
89/// | [`Render`](Error::Render) | Check output stream, retry |
90///
91/// # Example: Graceful Error Handling
92///
93/// ```rust,ignore
94/// use bubbletea::{Program, Error};
95///
96/// match Program::new(my_model).run() {
97///     Ok(final_model) => {
98///         println!("Program completed successfully");
99///     }
100///     Err(Error::Io(e)) if e.kind() == std::io::ErrorKind::NotConnected => {
101///         eprintln!("Terminal disconnected, saving state...");
102///         // Save any important state before exiting
103///     }
104///     Err(Error::RawModeFailure { .. }) => {
105///         eprintln!("Terminal doesn't support raw mode. Try a different terminal.");
106///     }
107///     Err(e) => {
108///         eprintln!("Program error: {}", e);
109///         std::process::exit(1);
110///     }
111/// }
112/// ```
#[derive(thiserror::Error, Debug)]
pub enum Error {
    /// I/O error during terminal operations.
    ///
    /// This typically occurs when:
    /// - The terminal is not available (e.g., running in a pipe)
    /// - The terminal was closed unexpectedly
    /// - System I/O resources are exhausted
    /// - Terminal control sequences failed
    ///
    /// Because the field is marked `#[from]`, any [`std::io::Error`]
    /// propagated with `?` inside a function returning [`Result`] is
    /// converted into this variant automatically.
    ///
    /// # Recovery
    ///
    /// Check if stdin/stdout are TTYs before starting your program.
    /// Consider using a fallback mode for non-interactive environments.
    ///
    /// # Underlying Error
    ///
    /// The underlying [`std::io::Error`] can be accessed to determine
    /// the specific cause. Common error kinds include:
    /// - `NotConnected`: Terminal was disconnected
    /// - `BrokenPipe`: Output stream closed
    /// - `Other`: Terminal control sequence errors
    #[error("terminal io error: {0}")]
    Io(#[from] io::Error),

    /// Failed to enable or disable raw mode.
    ///
    /// Raw mode is required for TUI operation as it disables terminal
    /// line buffering and echo. This error typically indicates the
    /// terminal doesn't support raw mode or isn't a TTY.
    ///
    /// The `action` string is interpolated into the message
    /// (`failed to {action} raw mode`), so it should read as a verb.
    ///
    /// # Recovery
    ///
    /// Verify the program is running in an interactive terminal.
    /// Some terminals (especially on Windows) may have limited support.
    #[error("failed to {action} raw mode: {source}")]
    RawModeFailure {
        /// Whether we were trying to enable or disable raw mode.
        action: &'static str,
        /// The underlying I/O error.
        #[source]
        source: io::Error,
    },

    /// Failed to enter or exit alternate screen.
    ///
    /// Alternate screen provides a separate buffer that preserves
    /// the user's terminal content. This error may indicate the
    /// terminal doesn't support alternate screen mode.
    ///
    /// The `action` string is interpolated into the message
    /// (`failed to {action} alternate screen`), so it should read as a verb.
    ///
    /// # Recovery
    ///
    /// Try running without `.with_alt_screen()`.
    #[error("failed to {action} alternate screen: {source}")]
    AltScreenFailure {
        /// Whether we were trying to enter or exit alt screen.
        action: &'static str,
        /// The underlying I/O error.
        #[source]
        source: io::Error,
    },

    /// Failed to poll for terminal events.
    ///
    /// This error occurs when the event polling system fails,
    /// typically because the terminal was disconnected or closed.
    ///
    /// The wrapped [`std::io::Error`] is included in the display message.
    /// The field carries neither `#[from]` nor `#[source]`, so this variant
    /// must be constructed explicitly.
    ///
    /// # Recovery
    ///
    /// The terminal connection may be lost. Save state and exit.
    #[error("failed to poll terminal events: {0}")]
    EventPoll(io::Error),

    /// Failed to render the view to the terminal.
    ///
    /// This error occurs when writing the view output fails,
    /// typically due to a broken pipe or disconnected terminal.
    ///
    /// The wrapped [`std::io::Error`] is included in the display message.
    /// The field carries neither `#[from]` nor `#[source]`, so this variant
    /// must be constructed explicitly.
    ///
    /// # Recovery
    ///
    /// The output stream may be closed. Save state and exit.
    #[error("failed to render view: {0}")]
    Render(io::Error),
}
197
/// A specialized [`Result`] type for bubbletea operations.
///
/// This type alias is used throughout the bubbletea crate for convenience.
/// It fixes the error type to [`Error`], so only the success type `T` has
/// to be spelled out.
///
/// # Example
///
/// ```rust,ignore
/// use bubbletea::Result;
///
/// fn run_program() -> Result<()> {
///     // ... implementation
///     Ok(())
/// }
/// ```
///
/// # Converting to Other Error Types
///
/// When integrating with other crates like `anyhow`:
///
/// ```rust,ignore
/// use anyhow::Context;
///
/// fn main() -> anyhow::Result<()> {
///     let model = bubbletea::Program::new(my_model)
///         .run()
///         .context("failed to run TUI program")?;
///     Ok(())
/// }
/// ```
pub type Result<T> = std::result::Result<T, Error>;
229
230/// The Model trait for TUI applications.
231///
232/// Implement this trait to define your application's behavior.
233///
234/// # Example
235///
236/// ```rust
237/// use bubbletea::{Model, Message, Cmd};
238///
239/// struct Counter { count: i32 }
240///
241/// impl Model for Counter {
242///     fn init(&self) -> Option<Cmd> { None }
243///
244///     fn update(&mut self, msg: Message) -> Option<Cmd> {
245///         if msg.is::<i32>() {
246///             self.count += msg.downcast::<i32>().unwrap();
247///         }
248///         None
249///     }
250///
251///     fn view(&self) -> String {
252///         format!("Count: {}", self.count)
253///     }
254/// }
255/// ```
pub trait Model: Send + 'static {
    /// Initialize the model and return an optional startup command.
    ///
    /// This is called once when the program starts, before the first render.
    /// It receives `&self`, so it cannot modify the model; return `None`
    /// when no startup work is needed.
    fn init(&self) -> Option<Cmd>;

    /// Process a message and optionally return a follow-up command.
    ///
    /// This is the update step of the Elm Architecture. Unlike Elm, the
    /// model is mutated in place through `&mut self` rather than returned
    /// as a new value. Return `None` when the message triggers no command.
    fn update(&mut self, msg: Message) -> Option<Cmd>;

    /// Render the model as a string for display.
    ///
    /// This should be a pure function with no side effects.
    fn view(&self) -> String;
}
272
/// Program options.
///
/// Normally configured through the builder methods on [`Program`]; the
/// values produced by [`Default`] are noted on each field.
#[derive(Debug, Clone)]
pub struct ProgramOptions {
    /// Use alternate screen buffer. Default: `false`.
    pub alt_screen: bool,
    /// Enable mouse cell motion tracking. Default: `false`.
    pub mouse_cell_motion: bool,
    /// Enable mouse all motion tracking. Default: `false`.
    pub mouse_all_motion: bool,
    /// Enable bracketed paste mode. Default: `true`.
    pub bracketed_paste: bool,
    /// Enable focus reporting. Default: `false`.
    pub report_focus: bool,
    /// Use custom I/O (skip terminal setup and event polling).
    /// Default: `false`.
    pub custom_io: bool,
    /// Target frames per second for rendering. Default: `60`.
    ///
    /// NOTE: `Program::with_fps` clamps to 1-120, but writing this field
    /// directly bypasses that clamp. Avoid `0`: the event loop derives its
    /// frame duration by dividing by this value.
    pub fps: u32,
    /// Disable signal handling. Default: `false`.
    pub without_signals: bool,
    /// Don't catch panics. Default: `false`, in which case the program
    /// installs a panic hook that restores the terminal before the panic
    /// message is printed.
    pub without_catch_panics: bool,
}
295
296impl Default for ProgramOptions {
297    fn default() -> Self {
298        Self {
299            alt_screen: false,
300            mouse_cell_motion: false,
301            mouse_all_motion: false,
302            bracketed_paste: true,
303            report_focus: false,
304            custom_io: false,
305            fps: 60,
306            without_signals: false,
307            without_catch_panics: false,
308        }
309    }
310}
311
312/// Handle to a running program.
313///
314/// Returned by [`Program::start()`] to allow external interaction with the
315/// running TUI program. This is particularly useful for SSH applications
316/// where events need to be injected from outside the program.
317///
318/// # Example
319///
320/// ```rust,ignore
321/// use bubbletea::{Program, Message};
322///
323/// let handle = Program::new(MyModel::default())
324///     .with_custom_io()
325///     .start();
326///
327/// // Send a message to the running program
328/// handle.send(MyMessage::DoSomething);
329///
330/// // Wait for the program to finish
331/// let final_model = handle.wait()?;
332/// ```
pub struct ProgramHandle<M: Model> {
    // Sending half of the channel whose receiver is installed as the
    // program's external message source by `Program::start`.
    tx: Sender<Message>,
    // Join handle of the background thread running the program. Wrapped in
    // `Option` so `wait` can take ownership of it.
    handle: Option<thread::JoinHandle<Result<M>>>,
}
337
338impl<M: Model> ProgramHandle<M> {
339    /// Send a message to the running program.
340    ///
341    /// This queues the message for processing in the program's event loop.
342    /// Returns `true` if the message was sent successfully, `false` if the
343    /// program has already exited.
344    pub fn send<T: Into<Message>>(&self, msg: T) -> bool {
345        self.tx.send(msg.into()).is_ok()
346    }
347
348    /// Request the program to quit.
349    ///
350    /// This sends a `QuitMsg` to the program's event loop.
351    pub fn quit(&self) {
352        let _ = self.tx.send(Message::new(QuitMsg));
353    }
354
355    /// Wait for the program to finish and return the final model state.
356    ///
357    /// This blocks until the program exits.
358    pub fn wait(mut self) -> Result<M> {
359        if let Some(handle) = self.handle.take() {
360            handle
361                .join()
362                .map_err(|_| Error::Io(io::Error::other("program thread panicked")))?
363        } else {
364            Err(Error::Io(io::Error::other("program already joined")))
365        }
366    }
367
368    /// Check if the program is still running.
369    pub fn is_running(&self) -> bool {
370        self.handle.as_ref().is_some_and(|h| !h.is_finished())
371    }
372}
373
374/// The main program runner.
375///
376/// Program manages the entire lifecycle of a TUI application:
377/// - Terminal setup and teardown
378/// - Event polling and message dispatching
379/// - Frame-rate limited rendering
380///
381/// # Example
382///
383/// ```rust,ignore
384/// use bubbletea::Program;
385///
386/// let model = MyModel::new();
387/// let final_model = Program::new(model)
388///     .with_alt_screen()
389///     .run()?;
390/// ```
pub struct Program<M: Model> {
    // The application state driven by the event loop.
    model: M,
    // Terminal and runtime configuration, set through the builder methods.
    options: ProgramOptions,
    // Optional channel of externally injected messages; the event loop
    // forwards everything received here into its own queue.
    external_rx: Option<Receiver<Message>>,
    // Optional custom byte source, parsed into messages on a helper thread.
    input: Option<Box<dyn Read + Send>>,
    // Optional custom render target; stdout is used when absent.
    output: Option<Box<dyn Write + Send>>,
}
398
399impl<M: Model> Program<M> {
400    /// Create a new program with the given model.
401    pub fn new(model: M) -> Self {
402        Self {
403            model,
404            options: ProgramOptions::default(),
405            external_rx: None,
406            input: None,
407            output: None,
408        }
409    }
410
411    /// Provide an external message receiver.
412    ///
413    /// Messages received on this channel will be forwarded to the program's event loop.
414    /// This is useful for injecting events from external sources (e.g. SSH).
415    pub fn with_input_receiver(mut self, rx: Receiver<Message>) -> Self {
416        self.external_rx = Some(rx);
417        self
418    }
419
420    /// Provide a custom input reader.
421    ///
422    /// This enables custom I/O mode and reads raw bytes from the given reader,
423    /// translating them into Bubbletea messages.
424    pub fn with_input<R: Read + Send + 'static>(mut self, input: R) -> Self {
425        self.input = Some(Box::new(input));
426        self.options.custom_io = true;
427        self
428    }
429
430    /// Provide a custom output writer.
431    ///
432    /// This enables custom I/O mode and writes render output to the given writer.
433    pub fn with_output<W: Write + Send + 'static>(mut self, output: W) -> Self {
434        self.output = Some(Box::new(output));
435        self.options.custom_io = true;
436        self
437    }
438
439    /// Use alternate screen buffer (full-screen mode).
440    pub fn with_alt_screen(mut self) -> Self {
441        self.options.alt_screen = true;
442        self
443    }
444
445    /// Enable mouse cell motion tracking.
446    ///
447    /// Reports mouse clicks and drags.
448    pub fn with_mouse_cell_motion(mut self) -> Self {
449        self.options.mouse_cell_motion = true;
450        self
451    }
452
453    /// Enable mouse all motion tracking.
454    ///
455    /// Reports all mouse movement, even without button presses.
456    pub fn with_mouse_all_motion(mut self) -> Self {
457        self.options.mouse_all_motion = true;
458        self
459    }
460
461    /// Set the target frames per second.
462    ///
463    /// Default is 60 FPS. Valid range is 1-120 FPS.
464    pub fn with_fps(mut self, fps: u32) -> Self {
465        self.options.fps = fps.clamp(1, 120);
466        self
467    }
468
469    /// Enable focus reporting.
470    ///
471    /// Sends FocusMsg and BlurMsg when terminal gains/loses focus.
472    pub fn with_report_focus(mut self) -> Self {
473        self.options.report_focus = true;
474        self
475    }
476
477    /// Disable bracketed paste mode.
478    pub fn without_bracketed_paste(mut self) -> Self {
479        self.options.bracketed_paste = false;
480        self
481    }
482
483    /// Disable signal handling.
484    pub fn without_signal_handler(mut self) -> Self {
485        self.options.without_signals = true;
486        self
487    }
488
489    /// Don't catch panics.
490    pub fn without_catch_panics(mut self) -> Self {
491        self.options.without_catch_panics = true;
492        self
493    }
494
495    /// Enable custom I/O mode (skip terminal setup and crossterm polling).
496    ///
497    /// This is useful when embedding bubbletea in environments that manage
498    /// terminal state externally or when events are injected manually.
499    pub fn with_custom_io(mut self) -> Self {
500        self.options.custom_io = true;
501        self
502    }
503
504    /// Run the program with a custom writer.
505    pub fn run_with_writer<W: Write + Send + 'static>(self, mut writer: W) -> Result<M> {
506        // Save options for cleanup (since self will be moved)
507        let options = self.options.clone();
508
509        // Setup terminal (skip for custom IO)
510        if !options.custom_io {
511            enable_raw_mode()?;
512        }
513
514        if options.alt_screen {
515            execute!(writer, EnterAlternateScreen)?;
516        }
517
518        execute!(writer, Hide)?;
519
520        if options.mouse_all_motion {
521            execute!(writer, EnableMouseCapture)?;
522        } else if options.mouse_cell_motion {
523            execute!(writer, EnableMouseCapture)?;
524        }
525
526        if options.report_focus {
527            execute!(writer, event::EnableFocusChange)?;
528        }
529
530        if options.bracketed_paste {
531            execute!(writer, event::EnableBracketedPaste)?;
532        }
533
534        // Install a panic hook that restores the terminal before printing the
535        // panic message. Without this, a panic in update()/view() leaves the
536        // terminal in raw mode with hidden cursor and alternate screen active.
537        let prev_hook = if !options.without_catch_panics {
538            let cleanup_opts = options.clone();
539            let prev = std::panic::take_hook();
540            std::panic::set_hook(Box::new(move |info| {
541                // Best-effort terminal restoration — ignore errors since we're
542                // already in a panic context.
543                let mut stderr = io::stderr();
544                if cleanup_opts.bracketed_paste {
545                    let _ = execute!(stderr, event::DisableBracketedPaste);
546                }
547                if cleanup_opts.report_focus {
548                    let _ = execute!(stderr, event::DisableFocusChange);
549                }
550                if cleanup_opts.mouse_all_motion || cleanup_opts.mouse_cell_motion {
551                    let _ = execute!(stderr, DisableMouseCapture);
552                }
553                let _ = execute!(stderr, Show);
554                if cleanup_opts.alt_screen {
555                    let _ = execute!(stderr, LeaveAlternateScreen);
556                }
557                if !cleanup_opts.custom_io {
558                    let _ = disable_raw_mode();
559                }
560                // Call the previous hook so the user still sees the panic message
561                prev(info);
562            }));
563            true
564        } else {
565            false
566        };
567
568        // Run the event loop
569        let result = self.event_loop(&mut writer);
570
571        // Restore the previous panic hook before cleanup so a late panic in the
572        // cleanup code itself doesn't recurse.
573        if prev_hook {
574            let _ = std::panic::take_hook();
575            // Note: we can't easily restore the *original* hook since set_hook
576            // moved it into the closure. The default hook is fine for post-run.
577        }
578
579        // Cleanup terminal
580        if options.bracketed_paste {
581            let _ = execute!(writer, event::DisableBracketedPaste);
582        }
583
584        if options.report_focus {
585            let _ = execute!(writer, event::DisableFocusChange);
586        }
587
588        if options.mouse_all_motion || options.mouse_cell_motion {
589            let _ = execute!(writer, DisableMouseCapture);
590        }
591
592        let _ = execute!(writer, Show);
593
594        if options.alt_screen {
595            let _ = execute!(writer, LeaveAlternateScreen);
596        }
597
598        if !options.custom_io {
599            let _ = disable_raw_mode();
600        }
601
602        result
603    }
604
605    /// Run the program and return the final model state.
606    pub fn run(mut self) -> Result<M> {
607        if let Some(output) = self.output.take() {
608            return self.run_with_writer(output);
609        }
610
611        let stdout = io::stdout();
612        self.run_with_writer(stdout)
613    }
614
615    /// Start the program in a background thread and return a handle for interaction.
616    ///
617    /// This is useful for SSH applications and other scenarios where you need to
618    /// inject events from external sources after the program has started.
619    ///
620    /// # Example
621    ///
622    /// ```rust,ignore
623    /// use bubbletea::{Program, Message};
624    ///
625    /// let handle = Program::new(MyModel::default())
626    ///     .with_custom_io()
627    ///     .start();
628    ///
629    /// // Inject a key event
630    /// handle.send(KeyMsg::from_char('a'));
631    ///
632    /// // Later, quit the program
633    /// handle.quit();
634    ///
635    /// // Wait for completion
636    /// let final_model = handle.wait()?;
637    /// ```
638    pub fn start(mut self) -> ProgramHandle<M> {
639        // Create channel for external message injection
640        let (tx, rx) = mpsc::channel();
641
642        // Set up external receiver (will be forwarded in event_loop)
643        self.external_rx = Some(rx);
644
645        // Take ownership of custom output if provided
646        let output = self.output.take();
647
648        // Spawn program in background thread
649        let handle = thread::spawn(move || {
650            if let Some(output) = output {
651                self.run_with_writer(output)
652            } else {
653                let stdout = io::stdout();
654                self.run_with_writer(stdout)
655            }
656        });
657
658        ProgramHandle {
659            tx,
660            handle: Some(handle),
661        }
662    }
663
664    fn event_loop<W: Write>(mut self, writer: &mut W) -> Result<M> {
665        // Create message channel
666        let (tx, rx): (Sender<Message>, Receiver<Message>) = mpsc::channel();
667
668        // Thread handles for clean shutdown (bd-3dmx, bd-3azk)
669        let mut external_forwarder_handle: Option<thread::JoinHandle<()>> = None;
670        let mut input_parser_handle: Option<thread::JoinHandle<()>> = None;
671
672        // Track command execution threads for graceful shutdown (bd-zyb8)
673        let command_threads: Arc<Mutex<Vec<thread::JoinHandle<()>>>> =
674            Arc::new(Mutex::new(Vec::new()));
675
676        // Forward external messages
677        if let Some(ext_rx) = self.external_rx.take() {
678            let tx_clone = tx.clone();
679            debug!(target: "bubbletea::thread", "Spawning external forwarder thread");
680            external_forwarder_handle = Some(thread::spawn(move || {
681                while let Ok(msg) = ext_rx.recv() {
682                    if tx_clone.send(msg).is_err() {
683                        debug!(target: "bubbletea::event", "external message dropped — receiver disconnected");
684                        break;
685                    }
686                }
687            }));
688        }
689
690        // Read custom input stream and inject messages.
691        if let Some(mut input) = self.input.take() {
692            let tx_clone = tx.clone();
693            debug!(target: "bubbletea::thread", "Spawning input parser thread");
694            input_parser_handle = Some(thread::spawn(move || {
695                let mut parser = InputParser::new();
696                let mut buf = [0u8; 256];
697                loop {
698                    match input.read(&mut buf) {
699                        Ok(0) => break,
700                        Ok(n) => {
701                            // We always assume there could be more data unless we hit EOF (Ok(0))
702                            let can_have_more_data = true;
703                            for msg in parser.push_bytes(&buf[..n], can_have_more_data) {
704                                if tx_clone.send(msg).is_err() {
705                                    debug!(target: "bubbletea::input", "input message dropped — receiver disconnected");
706                                    return;
707                                }
708                            }
709                        }
710                        Err(err) if err.kind() == io::ErrorKind::WouldBlock => {
711                            thread::yield_now();
712                        }
713                        Err(_) => break,
714                    }
715                }
716
717                for msg in parser.flush() {
718                    if tx_clone.send(msg).is_err() {
719                        debug!(target: "bubbletea::input", "flush message dropped — receiver disconnected");
720                        break;
721                    }
722                }
723            }));
724        }
725
726        // Get initial window size (only if not custom IO, otherwise trust init msg)
727        if !self.options.custom_io
728            && let Ok((width, height)) = terminal::size()
729            && tx
730                .send(Message::new(WindowSizeMsg { width, height }))
731                .is_err()
732        {
733            debug!(target: "bubbletea::event", "initial window size dropped — receiver disconnected");
734        }
735
736        // Call init and handle initial command
737        if let Some(cmd) = self.model.init() {
738            self.handle_command(cmd, tx.clone(), Arc::clone(&command_threads));
739        }
740
741        // Render initial view
742        let mut last_view = String::new();
743        self.render(writer, &mut last_view)?;
744
745        // Frame timing
746        let frame_duration = Duration::from_secs_f64(1.0 / self.options.fps as f64);
747
748        // Event loop
749        loop {
750            // Poll for events with frame-rate limiting (skip poll if custom IO)
751            // In custom IO mode, events are injected via `with_input_receiver()` or `with_input()`.
752            // Crossterm polling is skipped since input comes from external sources.
753            if !self.options.custom_io && event::poll(frame_duration)? {
754                match event::read()? {
755                    Event::Key(key_event) => {
756                        // Only handle key press events, not release
757                        if key_event.kind != KeyEventKind::Press {
758                            continue;
759                        }
760
761                        let key_msg = from_crossterm_key(key_event.code, key_event.modifiers);
762
763                        // Handle Ctrl+C specially
764                        if key_msg.key_type == crate::KeyType::CtrlC {
765                            if tx.send(Message::new(InterruptMsg)).is_err() {
766                                debug!(target: "bubbletea::event", "interrupt message dropped — receiver disconnected");
767                            }
768                        } else if tx.send(Message::new(key_msg)).is_err() {
769                            debug!(target: "bubbletea::event", "key message dropped — receiver disconnected");
770                        }
771                    }
772                    Event::Mouse(mouse_event) => {
773                        let mouse_msg = from_crossterm_mouse(mouse_event);
774                        if tx.send(Message::new(mouse_msg)).is_err() {
775                            debug!(target: "bubbletea::event", "mouse message dropped — receiver disconnected");
776                        }
777                    }
778                    Event::Resize(width, height) => {
779                        if tx
780                            .send(Message::new(WindowSizeMsg { width, height }))
781                            .is_err()
782                        {
783                            debug!(target: "bubbletea::event", "resize message dropped — receiver disconnected");
784                        }
785                    }
786                    Event::FocusGained => {
787                        if tx.send(Message::new(FocusMsg)).is_err() {
788                            debug!(target: "bubbletea::event", "focus message dropped — receiver disconnected");
789                        }
790                    }
791                    Event::FocusLost => {
792                        if tx.send(Message::new(BlurMsg)).is_err() {
793                            debug!(target: "bubbletea::event", "blur message dropped — receiver disconnected");
794                        }
795                    }
796                    Event::Paste(text) => {
797                        // Send as a key message with paste flag
798                        let key_msg = KeyMsg {
799                            key_type: crate::KeyType::Runes,
800                            runes: text.chars().collect(),
801                            alt: false,
802                            paste: true,
803                        };
804                        if tx.send(Message::new(key_msg)).is_err() {
805                            debug!(target: "bubbletea::event", "paste message dropped — receiver disconnected");
806                        }
807                    }
808                }
809            }
810
811            // Process all pending messages
812            let mut needs_render = false;
813            let mut should_quit = false;
814            while let Ok(msg) = rx.try_recv() {
815                // Check for quit message
816                if msg.is::<QuitMsg>() {
817                    should_quit = true;
818                    break;
819                }
820
821                // Check for interrupt message (Ctrl+C)
822                if msg.is::<InterruptMsg>() {
823                    should_quit = true;
824                    break;
825                }
826
827                // Handle batch message (already handled in handle_command)
828                if msg.is::<BatchMsg>() {
829                    continue;
830                }
831
832                // Handle window title
833                if let Some(title_msg) = msg.downcast_ref::<SetWindowTitleMsg>() {
834                    execute!(writer, terminal::SetTitle(&title_msg.0))?;
835                    continue;
836                }
837
838                // Handle window size request
839                if msg.is::<RequestWindowSizeMsg>() {
840                    if !self.options.custom_io
841                        && let Ok((width, height)) = terminal::size()
842                        && tx
843                            .send(Message::new(WindowSizeMsg { width, height }))
844                            .is_err()
845                    {
846                        debug!(target: "bubbletea::event", "window size response dropped — receiver disconnected");
847                    }
848                    continue;
849                }
850
851                // Handle print line message (only when not in alt screen)
852                if let Some(print_msg) = msg.downcast_ref::<PrintLineMsg>() {
853                    if !self.options.alt_screen {
854                        // Print each line above the TUI
855                        for line in print_msg.0.lines() {
856                            let _ = writeln!(writer, "{}", line);
857                        }
858                        let _ = writer.flush();
859                        // Force a full re-render since we printed above
860                        last_view.clear();
861                        needs_render = true;
862                    }
863                    continue;
864                }
865
866                // Handle release terminal
867                if msg.is::<ReleaseTerminalMsg>() {
868                    if !self.options.custom_io {
869                        // Disable features in reverse order
870                        if self.options.bracketed_paste {
871                            let _ = execute!(writer, event::DisableBracketedPaste);
872                        }
873                        if self.options.report_focus {
874                            let _ = execute!(writer, event::DisableFocusChange);
875                        }
876                        if self.options.mouse_all_motion || self.options.mouse_cell_motion {
877                            let _ = execute!(writer, DisableMouseCapture);
878                        }
879                        let _ = execute!(writer, Show);
880                        if self.options.alt_screen {
881                            let _ = execute!(writer, LeaveAlternateScreen);
882                        }
883                        let _ = disable_raw_mode();
884                    }
885                    continue;
886                }
887
888                // Handle restore terminal
889                if msg.is::<RestoreTerminalMsg>() {
890                    if !self.options.custom_io {
891                        // Re-enable features in original order
892                        let _ = enable_raw_mode();
893                        if self.options.alt_screen {
894                            let _ = execute!(writer, EnterAlternateScreen);
895                        }
896                        let _ = execute!(writer, Hide);
897                        if self.options.mouse_all_motion {
898                            let _ = execute!(writer, EnableMouseCapture);
899                        } else if self.options.mouse_cell_motion {
900                            let _ = execute!(writer, EnableMouseCapture);
901                        }
902                        if self.options.report_focus {
903                            let _ = execute!(writer, event::EnableFocusChange);
904                        }
905                        if self.options.bracketed_paste {
906                            let _ = execute!(writer, event::EnableBracketedPaste);
907                        }
908                        // Force a full re-render
909                        last_view.clear();
910                    }
911                    needs_render = true;
912                    continue;
913                }
914
915                // Update model
916                if let Some(cmd) = self.model.update(msg) {
917                    self.handle_command(cmd, tx.clone(), Arc::clone(&command_threads));
918                }
919                needs_render = true;
920            }
921
922            // Exit main loop if quit was requested
923            if should_quit {
924                break;
925            }
926
927            // Render if needed
928            if needs_render {
929                self.render(writer, &mut last_view)?;
930            }
931
932            // Sleep a bit if loop is tight (only needed if poll didn't sleep)
933            if self.options.custom_io {
934                thread::sleep(frame_duration);
935            }
936        }
937
938        // Clean shutdown: drop sender to signal threads, then join them (bd-3azk)
939        drop(tx);
940        debug!(target: "bubbletea::thread", "Sender dropped, waiting for threads to exit");
941
942        if let Some(handle) = external_forwarder_handle {
943            match handle.join() {
944                Ok(()) => {
945                    debug!(target: "bubbletea::thread", "External forwarder thread joined successfully")
946                }
947                Err(e) => {
948                    tracing::warn!(target: "bubbletea::thread", "External forwarder thread panicked: {:?}", e)
949                }
950            }
951        }
952
953        if let Some(handle) = input_parser_handle {
954            match handle.join() {
955                Ok(()) => {
956                    debug!(target: "bubbletea::thread", "Input parser thread joined successfully")
957                }
958                Err(e) => {
959                    tracing::warn!(target: "bubbletea::thread", "Input parser thread panicked: {:?}", e)
960                }
961            }
962        }
963
964        // Join command execution threads with timeout (bd-zyb8)
965        // Give commands a reasonable time to complete, but don't block forever.
966        const COMMAND_THREAD_TIMEOUT: Duration = Duration::from_secs(5);
967        let join_deadline = std::time::Instant::now() + COMMAND_THREAD_TIMEOUT;
968
969        if let Ok(mut threads) = command_threads.lock() {
970            let thread_count = threads.len();
971            if thread_count > 0 {
972                debug!(target: "bubbletea::thread", "Waiting for {} command thread(s) to complete", thread_count);
973            }
974
975            // Drain and join all threads
976            for handle in threads.drain(..) {
977                if handle.is_finished() {
978                    // Thread already done, just join to clean up
979                    let _ = handle.join();
980                } else {
981                    // Thread still running - wait with timeout
982                    let remaining =
983                        join_deadline.saturating_duration_since(std::time::Instant::now());
984                    if remaining.is_zero() {
985                        debug!(target: "bubbletea::thread", "Timeout waiting for command threads, abandoning remaining");
986                        break;
987                    }
988
989                    // Spin-wait with small sleeps until thread finishes or timeout
990                    let poll_interval = Duration::from_millis(10);
991                    let start = std::time::Instant::now();
992                    while !handle.is_finished() && start.elapsed() < remaining {
993                        thread::sleep(poll_interval);
994                    }
995
996                    if handle.is_finished() {
997                        match handle.join() {
998                            Ok(()) => {
999                                debug!(target: "bubbletea::thread", "Command thread joined successfully")
1000                            }
1001                            Err(e) => {
1002                                tracing::warn!(target: "bubbletea::thread", "Command thread panicked: {:?}", e)
1003                            }
1004                        }
1005                    } else {
1006                        debug!(target: "bubbletea::thread", "Command thread did not finish in time, abandoning");
1007                    }
1008                }
1009            }
1010        } else {
1011            tracing::warn!(target: "bubbletea::thread", "Failed to join command threads: mutex poisoned");
1012        }
1013
1014        Ok(self.model)
1015    }
1016
1017    fn handle_command(
1018        &self,
1019        cmd: Cmd,
1020        tx: Sender<Message>,
1021        command_threads: Arc<Mutex<Vec<thread::JoinHandle<()>>>>,
1022    ) {
1023        // Spawn thread and track its handle for graceful shutdown (bd-zyb8)
1024        let handle = thread::spawn(move || {
1025            if let Some(msg) = cmd.execute() {
1026                // Handle batch and sequence messages specially
1027                if msg.is::<BatchMsg>() {
1028                    if let Some(batch) = msg.downcast::<BatchMsg>() {
1029                        for cmd in batch.0 {
1030                            let tx_clone = tx.clone();
1031                            spawn_batch(move || {
1032                                if let Some(msg) = cmd.execute()
1033                                    && tx_clone.send(msg).is_err()
1034                                {
1035                                    debug!(target: "bubbletea::command", "batch command result dropped — receiver disconnected");
1036                                }
1037                            });
1038                        }
1039                    }
1040                } else if msg.is::<SequenceMsg>() {
1041                    if let Some(seq) = msg.downcast::<SequenceMsg>() {
1042                        for cmd in seq.0 {
1043                            if let Some(msg) = cmd.execute()
1044                                && tx.send(msg).is_err()
1045                            {
1046                                debug!(target: "bubbletea::command", "sequence command result dropped — receiver disconnected");
1047                                break;
1048                            }
1049                        }
1050                    }
1051                } else if tx.send(msg).is_err() {
1052                    debug!(target: "bubbletea::command", "command result dropped — receiver disconnected");
1053                }
1054            }
1055        });
1056
1057        // Track the handle for shutdown (lock is brief, just a Vec push)
1058        if let Ok(mut threads) = command_threads.lock() {
1059            // Prune finished threads to prevent unbounded growth
1060            threads.retain(|h| !h.is_finished());
1061            threads.push(handle);
1062        } else {
1063            // Mutex was poisoned (a thread panicked while holding it).
1064            // Log and continue - the handle will be orphaned but program can proceed.
1065            debug!(target: "bubbletea::thread", "Failed to track command thread: mutex poisoned");
1066        }
1067    }
1068
1069    fn render<W: Write>(&self, writer: &mut W, last_view: &mut String) -> Result<()> {
1070        let view = self.model.view();
1071
1072        // Skip if view hasn't changed
1073        if view == *last_view {
1074            return Ok(());
1075        }
1076
1077        // Clear and render
1078        execute!(writer, MoveTo(0, 0), Clear(ClearType::All))?;
1079        write!(writer, "{}", view)?;
1080        writer.flush()?;
1081
1082        *last_view = view;
1083        Ok(())
1084    }
1085}
1086
1087// =============================================================================
1088// Async Program Implementation (requires "async" feature)
1089// =============================================================================
1090
1091#[cfg(feature = "async")]
1092impl<M: Model> Program<M> {
1093    /// Run the program using the tokio async runtime.
1094    ///
1095    /// This is the async version of `run()`. It uses tokio for command execution
1096    /// and event handling, which is more efficient for I/O-bound operations.
1097    ///
1098    /// # Example
1099    ///
1100    /// ```rust,ignore
1101    /// use bubbletea::Program;
1102    ///
1103    /// #[tokio::main]
1104    /// async fn main() -> Result<(), bubbletea::Error> {
1105    ///     let model = MyModel::new();
1106    ///     let final_model = Program::new(model)
1107    ///         .with_alt_screen()
1108    ///         .run_async()
1109    ///         .await?;
1110    ///     Ok(())
1111    /// }
1112    /// ```
1113    pub async fn run_async(mut self) -> Result<M> {
1114        if let Some(output) = self.output.take() {
1115            return self.run_async_with_writer(output).await;
1116        }
1117
1118        let stdout = io::stdout();
1119        self.run_async_with_writer(stdout).await
1120    }
1121
1122    /// Run the program using the tokio async runtime with a custom writer.
1123    pub async fn run_async_with_writer<W: Write + Send + 'static>(
1124        self,
1125        mut writer: W,
1126    ) -> Result<M> {
1127        // Save options for cleanup (since self will be moved)
1128        let options = self.options.clone();
1129
1130        // Setup terminal (skip for custom I/O)
1131        if !options.custom_io {
1132            enable_raw_mode()?;
1133        }
1134
1135        if options.alt_screen {
1136            execute!(writer, EnterAlternateScreen)?;
1137        }
1138
1139        execute!(writer, Hide)?;
1140
1141        if options.mouse_all_motion {
1142            execute!(writer, EnableMouseCapture)?;
1143        } else if options.mouse_cell_motion {
1144            execute!(writer, EnableMouseCapture)?;
1145        }
1146
1147        if options.report_focus {
1148            execute!(writer, event::EnableFocusChange)?;
1149        }
1150
1151        if options.bracketed_paste {
1152            execute!(writer, event::EnableBracketedPaste)?;
1153        }
1154
1155        // Install panic hook to restore terminal on panic (mirrors sync path)
1156        let prev_hook = if !options.without_catch_panics {
1157            let cleanup_opts = options.clone();
1158            let prev = std::panic::take_hook();
1159            std::panic::set_hook(Box::new(move |info| {
1160                let mut stderr = io::stderr();
1161                if cleanup_opts.bracketed_paste {
1162                    let _ = execute!(stderr, event::DisableBracketedPaste);
1163                }
1164                if cleanup_opts.report_focus {
1165                    let _ = execute!(stderr, event::DisableFocusChange);
1166                }
1167                if cleanup_opts.mouse_all_motion || cleanup_opts.mouse_cell_motion {
1168                    let _ = execute!(stderr, DisableMouseCapture);
1169                }
1170                let _ = execute!(stderr, Show);
1171                if cleanup_opts.alt_screen {
1172                    let _ = execute!(stderr, LeaveAlternateScreen);
1173                }
1174                if !cleanup_opts.custom_io {
1175                    let _ = disable_raw_mode();
1176                }
1177                prev(info);
1178            }));
1179            true
1180        } else {
1181            false
1182        };
1183
1184        // Run the async event loop
1185        let result = self.event_loop_async(&mut writer).await;
1186
1187        // Restore panic hook
1188        if prev_hook {
1189            let _ = std::panic::take_hook();
1190        }
1191
1192        // Cleanup terminal
1193        if options.bracketed_paste {
1194            let _ = execute!(writer, event::DisableBracketedPaste);
1195        }
1196
1197        if options.report_focus {
1198            let _ = execute!(writer, event::DisableFocusChange);
1199        }
1200
1201        if options.mouse_all_motion || options.mouse_cell_motion {
1202            let _ = execute!(writer, DisableMouseCapture);
1203        }
1204
1205        let _ = execute!(writer, Show);
1206
1207        if options.alt_screen {
1208            let _ = execute!(writer, LeaveAlternateScreen);
1209        }
1210
1211        if !options.custom_io {
1212            let _ = disable_raw_mode();
1213        }
1214
1215        result
1216    }
1217
1218    async fn event_loop_async<W: Write>(mut self, stdout: &mut W) -> Result<M> {
1219        // Create async message channel
1220        let (tx, mut rx) = tokio::sync::mpsc::channel::<Message>(256);
1221
1222        // Create cancellation token and task tracker for graceful shutdown
1223        let cancel_token = CancellationToken::new();
1224        let task_tracker = TaskTracker::new();
1225
1226        // Forward external messages using tokio's blocking thread pool
1227        // This is tracked for graceful shutdown and respects cancellation
1228        if let Some(ext_rx) = self.external_rx.take() {
1229            let tx_clone = tx.clone();
1230            let cancel_clone = cancel_token.clone();
1231            task_tracker.spawn_blocking(move || {
1232                // Use recv_timeout to periodically check for cancellation
1233                let timeout = Duration::from_millis(100);
1234                loop {
1235                    if cancel_clone.is_cancelled() {
1236                        break;
1237                    }
1238                    match ext_rx.recv_timeout(timeout) {
1239                        Ok(msg) => {
1240                            if tx_clone.blocking_send(msg).is_err() {
1241                                debug!(target: "bubbletea::event", "async external message dropped — receiver disconnected");
1242                                break;
1243                            }
1244                        }
1245                        Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
1246                            // Continue loop to check cancellation
1247                        }
1248                        Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
1249                            // Channel closed, exit
1250                            break;
1251                        }
1252                    }
1253                }
1254            });
1255        }
1256
1257        // Read custom input stream and inject messages.
1258        if let Some(mut input) = self.input.take() {
1259            let tx_clone = tx.clone();
1260            let cancel_clone = cancel_token.clone();
1261            task_tracker.spawn_blocking(move || {
1262                let mut parser = InputParser::new();
1263                let mut buf = [0u8; 256];
1264                loop {
1265                    if cancel_clone.is_cancelled() {
1266                        break;
1267                    }
1268                    match input.read(&mut buf) {
1269                        Ok(0) => break,
1270                        Ok(n) => {
1271                            // We always assume there could be more data unless we hit EOF (Ok(0))
1272                            let can_have_more_data = true;
1273                            for msg in parser.push_bytes(&buf[..n], can_have_more_data) {
1274                                if tx_clone.blocking_send(msg).is_err() {
1275                                    return;
1276                                }
1277                            }
1278                        }
1279                        Err(err) if err.kind() == io::ErrorKind::WouldBlock => {
1280                            std::thread::yield_now();
1281                        }
1282                        Err(_) => break,
1283                    }
1284                }
1285
1286                for msg in parser.flush() {
1287                    if tx_clone.blocking_send(msg).is_err() {
1288                        break;
1289                    }
1290                }
1291            });
1292        }
1293
1294        // Spawn event listener thread (bd-2353: use task_tracker for graceful shutdown)
1295        let (event_tx, mut event_rx) = tokio::sync::mpsc::channel::<Event>(100);
1296        let event_cancel = cancel_token.clone();
1297
1298        if !self.options.custom_io {
1299            task_tracker.spawn_blocking(move || {
1300                loop {
1301                    if event_cancel.is_cancelled() {
1302                        break;
1303                    }
1304                    // Poll with timeout to check cancellation
1305                    match event::poll(Duration::from_millis(100)) {
1306                        Ok(true) => {
1307                            if let Ok(evt) = event::read()
1308                                && event_tx.blocking_send(evt).is_err()
1309                            {
1310                                break;
1311                            }
1312                        }
1313                        Ok(false) => {} // timeout
1314                        Err(_) => {
1315                            break;
1316                        } // error
1317                    }
1318                }
1319            });
1320        }
1321
1322        // Get initial window size
1323        if !self.options.custom_io {
1324            let (width, height) = terminal::size()?;
1325            if tx
1326                .send(Message::new(WindowSizeMsg { width, height }))
1327                .await
1328                .is_err()
1329            {
1330                debug!(target: "bubbletea::event", "async initial window size dropped — receiver disconnected");
1331            }
1332        }
1333
1334        // Call init and handle initial command
1335        if let Some(cmd) = self.model.init() {
1336            Self::handle_command_tracked(
1337                cmd.into(),
1338                tx.clone(),
1339                &task_tracker,
1340                cancel_token.clone(),
1341            );
1342        }
1343
1344        // Render initial view
1345        let mut last_view = String::new();
1346        self.render(stdout, &mut last_view)?;
1347
1348        // Frame timing
1349        let frame_duration = Duration::from_secs_f64(1.0 / self.options.fps as f64);
1350        let mut frame_interval = tokio::time::interval(frame_duration);
1351
1352        // Event loop
1353        loop {
1354            tokio::select! {
1355                // Check for terminal events via channel
1356                Some(event) = event_rx.recv(), if !self.options.custom_io => {
1357                    match event {
1358                        Event::Key(key_event) => {
1359                            // Only handle key press events, not release
1360                            if key_event.kind != KeyEventKind::Press {
1361                                continue;
1362                            }
1363
1364                            let key_msg = from_crossterm_key(key_event.code, key_event.modifiers);
1365
1366                            // Handle Ctrl+C specially
1367                            if key_msg.key_type == crate::KeyType::CtrlC {
1368                                if tx.send(Message::new(InterruptMsg)).await.is_err() {
1369                                    debug!(target: "bubbletea::event", "async interrupt message dropped — receiver disconnected");
1370                                }
1371                            } else if tx.send(Message::new(key_msg)).await.is_err() {
1372                                debug!(target: "bubbletea::event", "async key message dropped — receiver disconnected");
1373                            }
1374                        }
1375                        Event::Mouse(mouse_event) => {
1376                            let mouse_msg = from_crossterm_mouse(mouse_event);
1377                            if tx.send(Message::new(mouse_msg)).await.is_err() {
1378                                debug!(target: "bubbletea::event", "async mouse message dropped — receiver disconnected");
1379                            }
1380                        }
1381                        Event::Resize(width, height) => {
1382                            if tx.send(Message::new(WindowSizeMsg { width, height })).await.is_err() {
1383                                debug!(target: "bubbletea::event", "async resize message dropped — receiver disconnected");
1384                            }
1385                        }
1386                        Event::FocusGained => {
1387                            if tx.send(Message::new(FocusMsg)).await.is_err() {
1388                                debug!(target: "bubbletea::event", "async focus message dropped — receiver disconnected");
1389                            }
1390                        }
1391                        Event::FocusLost => {
1392                            if tx.send(Message::new(BlurMsg)).await.is_err() {
1393                                debug!(target: "bubbletea::event", "async blur message dropped — receiver disconnected");
1394                            }
1395                        }
1396                        Event::Paste(text) => {
1397                            // Send as a key message with paste flag
1398                            let key_msg = KeyMsg {
1399                                key_type: crate::KeyType::Runes,
1400                                runes: text.chars().collect(),
1401                                alt: false,
1402                                paste: true,
1403                            };
1404                            if tx.send(Message::new(key_msg)).await.is_err() {
1405                                debug!(target: "bubbletea::event", "async paste message dropped — receiver disconnected");
1406                            }
1407                        }
1408                    }
1409                }
1410
1411                // Process incoming messages
1412                Some(msg) = rx.recv() => {
1413                    // Check for quit message - initiate graceful shutdown
1414                    if msg.is::<QuitMsg>() {
1415                        Self::graceful_shutdown(&cancel_token, &task_tracker).await;
1416                        return Ok(self.model);
1417                    }
1418
1419                    // Check for interrupt message (Ctrl+C) - initiate graceful shutdown
1420                    if msg.is::<InterruptMsg>() {
1421                        Self::graceful_shutdown(&cancel_token, &task_tracker).await;
1422                        return Ok(self.model);
1423                    }
1424
1425                    // Handle batch message (already handled in handle_command_tracked)
1426                    if msg.is::<BatchMsg>() {
1427                        continue;
1428                    }
1429
1430                    // Handle window title
1431                    if let Some(title_msg) = msg.downcast_ref::<SetWindowTitleMsg>() {
1432                        execute!(stdout, terminal::SetTitle(&title_msg.0))?;
1433                        continue;
1434                    }
1435
1436                    // Handle window size request
1437                    if msg.is::<RequestWindowSizeMsg>() {
1438                        if !self.options.custom_io {
1439                            let (width, height) = terminal::size()?;
1440                            if tx.send(Message::new(WindowSizeMsg { width, height })).await.is_err() {
1441                                debug!(target: "bubbletea::event", "async window size response dropped — receiver disconnected");
1442                            }
1443                        }
1444                        continue;
1445                    }
1446
1447                    // Handle print line message (only when not in alt screen)
1448                    if let Some(print_msg) = msg.downcast_ref::<PrintLineMsg>() {
1449                        if !self.options.alt_screen {
1450                            // Print each line above the TUI
1451                            for line in print_msg.0.lines() {
1452                                let _ = writeln!(stdout, "{}", line);
1453                            }
1454                            let _ = stdout.flush();
1455                            // Force a full re-render since we printed above
1456                            last_view.clear();
1457                        }
1458                        self.render(stdout, &mut last_view)?;
1459                        continue;
1460                    }
1461
1462                    // Handle release terminal
1463                    if msg.is::<ReleaseTerminalMsg>() {
1464                        if !self.options.custom_io {
1465                            // Disable features in reverse order
1466                            if self.options.bracketed_paste {
1467                                let _ = execute!(stdout, event::DisableBracketedPaste);
1468                            }
1469                            if self.options.report_focus {
1470                                let _ = execute!(stdout, event::DisableFocusChange);
1471                            }
1472                            if self.options.mouse_all_motion || self.options.mouse_cell_motion {
1473                                let _ = execute!(stdout, DisableMouseCapture);
1474                            }
1475                            let _ = execute!(stdout, Show);
1476                            if self.options.alt_screen {
1477                                let _ = execute!(stdout, LeaveAlternateScreen);
1478                            }
1479                            let _ = disable_raw_mode();
1480                        }
1481                        continue;
1482                    }
1483
1484                    // Handle restore terminal
1485                    if msg.is::<RestoreTerminalMsg>() {
1486                        if !self.options.custom_io {
1487                            // Re-enable features in original order
1488                            let _ = enable_raw_mode();
1489                            if self.options.alt_screen {
1490                                let _ = execute!(stdout, EnterAlternateScreen);
1491                            }
1492                            let _ = execute!(stdout, Hide);
1493                            if self.options.mouse_all_motion {
1494                                let _ = execute!(stdout, EnableMouseCapture);
1495                            } else if self.options.mouse_cell_motion {
1496                                let _ = execute!(stdout, EnableMouseCapture);
1497                            }
1498                            if self.options.report_focus {
1499                                let _ = execute!(stdout, event::EnableFocusChange);
1500                            }
1501                            if self.options.bracketed_paste {
1502                                let _ = execute!(stdout, event::EnableBracketedPaste);
1503                            }
1504                            // Force a full re-render
1505                            last_view.clear();
1506                        }
1507                        self.render(stdout, &mut last_view)?;
1508                        continue;
1509                    }
1510
1511                    // Update model
1512                    if let Some(cmd) = self.model.update(msg) {
1513                        Self::handle_command_tracked(
1514                            cmd.into(),
1515                            tx.clone(),
1516                            &task_tracker,
1517                            cancel_token.clone(),
1518                        );
1519                    }
1520
1521                    // Render after processing message
1522                    self.render(stdout, &mut last_view)?;
1523                }
1524
1525                // Frame tick for rendering
1526                _ = frame_interval.tick() => {
1527                    // Periodic render check (in case we missed something)
1528                }
1529            }
1530        }
1531    }
1532
1533    /// Perform graceful shutdown: cancel all tasks and wait for them to complete.
1534    async fn graceful_shutdown(cancel_token: &CancellationToken, task_tracker: &TaskTracker) {
1535        // Signal all tasks to cancel
1536        cancel_token.cancel();
1537
1538        // Close the tracker to prevent new tasks
1539        task_tracker.close();
1540
1541        // Wait for all tasks with a timeout (5 seconds)
1542        let shutdown_timeout = Duration::from_secs(5);
1543        let _ = tokio::time::timeout(shutdown_timeout, task_tracker.wait()).await;
1544    }
1545
1546    /// Handle a command with task tracking and cancellation support.
1547    fn handle_command_tracked(
1548        cmd: CommandKind,
1549        tx: tokio::sync::mpsc::Sender<Message>,
1550        tracker: &TaskTracker,
1551        cancel_token: CancellationToken,
1552    ) {
1553        // Clone tracker and cancel token so batch sub-commands can also be
1554        // tracked and cancelled during graceful shutdown.
1555        let batch_tracker = tracker.clone();
1556        let batch_cancel = cancel_token.clone();
1557        tracker.spawn(async move {
1558            tokio::select! {
1559                // Execute the command
1560                result = cmd.execute() => {
1561                    if let Some(msg) = result {
1562                        // Handle batch and sequence messages specially
1563                        if msg.is::<BatchMsg>() {
1564                            if let Some(batch) = msg.downcast::<BatchMsg>() {
1565                                for cmd in batch.0 {
1566                                    let tx_clone = tx.clone();
1567                                    let cancel = batch_cancel.clone();
1568                                    // Batch commands are now tracked via TaskTracker
1569                                    // and respect the cancellation token for clean shutdown.
1570                                    batch_tracker.spawn(async move {
1571                                        tokio::select! {
1572                                            result = async {
1573                                                let cmd_kind: CommandKind = cmd.into();
1574                                                cmd_kind.execute().await
1575                                            } => {
1576                                                if let Some(msg) = result {
1577                                                    if tx_clone.send(msg).await.is_err() {
1578                                                        debug!(target: "bubbletea::command", "async batch command result dropped — receiver disconnected");
1579                                                    }
1580                                                }
1581                                            }
1582                                            _ = cancel.cancelled() => {
1583                                                debug!(target: "bubbletea::command", "async batch command cancelled during shutdown");
1584                                            }
1585                                        }
1586                                    });
1587                                }
1588                            }
1589                        } else if msg.is::<SequenceMsg>() {
1590                            if let Some(seq) = msg.downcast::<SequenceMsg>() {
1591                                for cmd in seq.0 {
1592                                    let cmd_kind: CommandKind = cmd.into();
1593                                    if let Some(msg) = cmd_kind.execute().await {
1594                                        if tx.send(msg).await.is_err() {
1595                                            debug!(target: "bubbletea::command", "async sequence command result dropped — receiver disconnected");
1596                                            break;
1597                                        }
1598                                    }
1599                                }
1600                            }
1601                        } else if tx.send(msg).await.is_err() {
1602                            debug!(target: "bubbletea::command", "async command result dropped — receiver disconnected");
1603                        }
1604                    }
1605                }
1606                // Cancellation requested - exit cleanly
1607                _ = cancel_token.cancelled() => {
1608                    // Command cancelled, cleanup happens automatically
1609                }
1610            }
1611        });
1612    }
1613
1614    /// Handle a command asynchronously using tokio::spawn (legacy, without tracking).
1615    #[allow(dead_code)]
1616    fn handle_command_async(&self, cmd: CommandKind, tx: tokio::sync::mpsc::Sender<Message>) {
1617        tokio::spawn(async move {
1618            if let Some(msg) = cmd.execute().await {
1619                // Handle batch and sequence messages specially
1620                if msg.is::<BatchMsg>() {
1621                    if let Some(batch) = msg.downcast::<BatchMsg>() {
1622                        for cmd in batch.0 {
1623                            let tx_clone = tx.clone();
1624                            tokio::spawn(async move {
1625                                let cmd_kind: CommandKind = cmd.into();
1626                                if let Some(msg) = cmd_kind.execute().await {
1627                                    if tx_clone.send(msg).await.is_err() {
1628                                        debug!(target: "bubbletea::command", "legacy async batch command result dropped — receiver disconnected");
1629                                    }
1630                                }
1631                            });
1632                        }
1633                    }
1634                } else if msg.is::<SequenceMsg>() {
1635                    if let Some(seq) = msg.downcast::<SequenceMsg>() {
1636                        for cmd in seq.0 {
1637                            let cmd_kind: CommandKind = cmd.into();
1638                            if let Some(msg) = cmd_kind.execute().await {
1639                                if tx.send(msg).await.is_err() {
1640                                    debug!(target: "bubbletea::command", "legacy async sequence command result dropped — receiver disconnected");
1641                                    break;
1642                                }
1643                            }
1644                        }
1645                    }
1646                } else if tx.send(msg).await.is_err() {
1647                    debug!(target: "bubbletea::command", "legacy async command result dropped — receiver disconnected");
1648                }
1649            }
1650        });
1651    }
1652}
1653
1654// =============================================================================
1655// Custom Input Parsing (for custom I/O mode)
1656// =============================================================================
1657
1658struct InputParser {
1659    buffer: Vec<u8>,
1660}
1661
1662impl InputParser {
1663    fn new() -> Self {
1664        Self { buffer: Vec::new() }
1665    }
1666
1667    /// Maximum input buffer size (1 MB). Prevents memory exhaustion from
1668    /// malformed escape sequences or malicious input streams.
1669    const MAX_BUFFER: usize = 1024 * 1024;
1670
1671    fn push_bytes(&mut self, bytes: &[u8], can_have_more_data: bool) -> Vec<Message> {
1672        if !bytes.is_empty() {
1673            if self.buffer.len() + bytes.len() > Self::MAX_BUFFER {
1674                debug!(
1675                    target: "bubbletea::input",
1676                    "Input buffer exceeded 1MB limit, draining"
1677                );
1678                self.buffer.clear();
1679            }
1680            self.buffer.extend_from_slice(bytes);
1681        }
1682
1683        let mut messages = Vec::new();
1684        loop {
1685            if self.buffer.is_empty() {
1686                break;
1687            }
1688
1689            match parse_one_message(&self.buffer, can_have_more_data) {
1690                ParseOutcome::NeedMore => break,
1691                ParseOutcome::Parsed(consumed, msg) => {
1692                    self.buffer.drain(0..consumed);
1693                    if let Some(msg) = msg {
1694                        messages.push(msg);
1695                    }
1696                }
1697            }
1698        }
1699
1700        messages
1701    }
1702
1703    fn flush(&mut self) -> Vec<Message> {
1704        let mut messages = Vec::new();
1705        loop {
1706            if self.buffer.is_empty() {
1707                break;
1708            }
1709
1710            match parse_one_message(&self.buffer, false) {
1711                ParseOutcome::NeedMore => break,
1712                ParseOutcome::Parsed(consumed, msg) => {
1713                    self.buffer.drain(0..consumed);
1714                    if let Some(msg) = msg {
1715                        messages.push(msg);
1716                    }
1717                }
1718            }
1719        }
1720        messages
1721    }
1722}
1723
/// Result of trying to parse one message from the front of the input buffer.
enum ParseOutcome {
    /// The buffered bytes are not enough to decide yet; the caller stops
    /// parsing and leaves the bytes in place.
    NeedMore,
    /// A parse step finished: the number of bytes consumed from the front of
    /// the buffer, and the message produced (if any).
    Parsed(usize, Option<Message>),
}
1728
1729fn parse_one_message(buf: &[u8], can_have_more_data: bool) -> ParseOutcome {
1730    if buf.is_empty() {
1731        return ParseOutcome::NeedMore;
1732    }
1733
1734    if let Some(outcome) = parse_mouse_event(buf, can_have_more_data) {
1735        return outcome;
1736    }
1737
1738    if let Some(outcome) = parse_focus_event(buf, can_have_more_data) {
1739        return outcome;
1740    }
1741
1742    if let Some(outcome) = parse_bracketed_paste(buf, can_have_more_data) {
1743        return outcome;
1744    }
1745
1746    if let Some(outcome) = parse_key_sequence(buf, can_have_more_data) {
1747        return outcome;
1748    }
1749
1750    parse_runes_or_control(buf, can_have_more_data)
1751}
1752
1753fn parse_mouse_event(buf: &[u8], can_have_more_data: bool) -> Option<ParseOutcome> {
1754    if buf.starts_with(b"\x1b[M") {
1755        if buf.len() < 6 {
1756            return Some(if can_have_more_data {
1757                ParseOutcome::NeedMore
1758            } else {
1759                ParseOutcome::Parsed(1, Some(replacement_message()))
1760            });
1761        }
1762        let seq = &buf[..6];
1763        return Some(match crate::mouse::parse_mouse_event_sequence(seq) {
1764            Ok(msg) => ParseOutcome::Parsed(6, Some(Message::new(msg))),
1765            Err(_) => ParseOutcome::Parsed(1, Some(replacement_message())),
1766        });
1767    }
1768
1769    if buf.starts_with(b"\x1b[<") {
1770        if let Some(end_idx) = buf.iter().position(|b| *b == b'M' || *b == b'm') {
1771            let seq = &buf[..=end_idx];
1772            return Some(match crate::mouse::parse_mouse_event_sequence(seq) {
1773                Ok(msg) => ParseOutcome::Parsed(seq.len(), Some(Message::new(msg))),
1774                Err(_) => ParseOutcome::Parsed(1, Some(replacement_message())),
1775            });
1776        }
1777        return Some(if can_have_more_data {
1778            ParseOutcome::NeedMore
1779        } else {
1780            ParseOutcome::Parsed(1, Some(replacement_message()))
1781        });
1782    }
1783
1784    None
1785}
1786
1787fn parse_focus_event(buf: &[u8], can_have_more_data: bool) -> Option<ParseOutcome> {
1788    if buf.len() < 3 && buf.starts_with(b"\x1b[") && can_have_more_data {
1789        return Some(ParseOutcome::NeedMore);
1790    }
1791
1792    if buf.starts_with(b"\x1b[I") {
1793        return Some(ParseOutcome::Parsed(3, Some(Message::new(FocusMsg))));
1794    }
1795
1796    if buf.starts_with(b"\x1b[O") {
1797        return Some(ParseOutcome::Parsed(3, Some(Message::new(BlurMsg))));
1798    }
1799
1800    None
1801}
1802
1803fn parse_bracketed_paste(buf: &[u8], can_have_more_data: bool) -> Option<ParseOutcome> {
1804    const BP_START: &[u8] = b"\x1b[200~";
1805    const BP_END: &[u8] = b"\x1b[201~";
1806
1807    if !buf.starts_with(BP_START) {
1808        return None;
1809    }
1810
1811    if let Some(idx) = buf.windows(BP_END.len()).position(|w| w == BP_END) {
1812        let content = &buf[BP_START.len()..idx];
1813        let text = String::from_utf8_lossy(content);
1814        let runes = text.chars().collect::<Vec<char>>();
1815        let key = KeyMsg::from_runes(runes).with_paste();
1816        let total_len = idx + BP_END.len();
1817        return Some(ParseOutcome::Parsed(total_len, Some(message_from_key(key))));
1818    }
1819
1820    Some(if can_have_more_data {
1821        ParseOutcome::NeedMore
1822    } else {
1823        let content = &buf[BP_START.len()..];
1824        let text = String::from_utf8_lossy(content);
1825        let runes = text.chars().collect::<Vec<char>>();
1826        let key = KeyMsg::from_runes(runes).with_paste();
1827        ParseOutcome::Parsed(buf.len(), Some(message_from_key(key)))
1828    })
1829}
1830
1831fn parse_key_sequence(buf: &[u8], can_have_more_data: bool) -> Option<ParseOutcome> {
1832    if let Some((key, len)) = crate::key::parse_sequence_prefix(buf) {
1833        return Some(ParseOutcome::Parsed(len, Some(message_from_key(key))));
1834    }
1835
1836    // Check if it's a prefix of a known sequence
1837    if can_have_more_data && is_sequence_prefix(buf) {
1838        return Some(ParseOutcome::NeedMore);
1839    }
1840
1841    if buf.starts_with(b"\x1b")
1842        && let Some((mut key, len)) = crate::key::parse_sequence_prefix(&buf[1..])
1843    {
1844        if !key.alt {
1845            key = key.with_alt();
1846        }
1847        return Some(ParseOutcome::Parsed(len + 1, Some(message_from_key(key))));
1848    }
1849
1850    None
1851}
1852
1853fn parse_runes_or_control(buf: &[u8], can_have_more_data: bool) -> ParseOutcome {
1854    let mut alt = false;
1855    let mut idx = 0;
1856
1857    if buf[0] == 0x1b {
1858        if buf.len() == 1 {
1859            return if can_have_more_data {
1860                ParseOutcome::NeedMore
1861            } else {
1862                ParseOutcome::Parsed(1, Some(message_from_key(KeyMsg::from_type(KeyType::Esc))))
1863            };
1864        }
1865        alt = true;
1866        idx = 1;
1867    }
1868
1869    if idx >= buf.len() {
1870        return ParseOutcome::NeedMore;
1871    }
1872
1873    if let Some(key_type) = control_key_type(buf[idx]) {
1874        let mut key = KeyMsg::from_type(key_type);
1875        if alt {
1876            key = key.with_alt();
1877        }
1878        return ParseOutcome::Parsed(idx + 1, Some(message_from_key(key)));
1879    }
1880
1881    let mut runes = Vec::new();
1882    let mut i = idx;
1883    while i < buf.len() {
1884        let b = buf[i];
1885        if is_control_or_space(b) {
1886            break;
1887        }
1888
1889        let (ch, width, valid) = match decode_char(&buf[i..], can_have_more_data) {
1890            DecodeOutcome::NeedMore => return ParseOutcome::NeedMore,
1891            DecodeOutcome::Decoded(ch, width, valid) => (ch, width, valid),
1892        };
1893
1894        if !valid {
1895            runes.push(std::char::REPLACEMENT_CHARACTER);
1896            i += 1;
1897        } else {
1898            runes.push(ch);
1899            i += width;
1900        }
1901
1902        if alt {
1903            break;
1904        }
1905    }
1906
1907    if !runes.is_empty() {
1908        let mut key = KeyMsg::from_runes(runes);
1909        if alt {
1910            key = key.with_alt();
1911        }
1912        return ParseOutcome::Parsed(i, Some(message_from_key(key)));
1913    }
1914
1915    ParseOutcome::Parsed(1, Some(replacement_message()))
1916}
1917
1918fn control_key_type(byte: u8) -> Option<KeyType> {
1919    match byte {
1920        0x00 => Some(KeyType::Null),
1921        0x01 => Some(KeyType::CtrlA),
1922        0x02 => Some(KeyType::CtrlB),
1923        0x03 => Some(KeyType::CtrlC),
1924        0x04 => Some(KeyType::CtrlD),
1925        0x05 => Some(KeyType::CtrlE),
1926        0x06 => Some(KeyType::CtrlF),
1927        0x07 => Some(KeyType::CtrlG),
1928        0x08 => Some(KeyType::CtrlH),
1929        0x09 => Some(KeyType::Tab),
1930        0x0A => Some(KeyType::CtrlJ),
1931        0x0B => Some(KeyType::CtrlK),
1932        0x0C => Some(KeyType::CtrlL),
1933        0x0D => Some(KeyType::Enter),
1934        0x0E => Some(KeyType::CtrlN),
1935        0x0F => Some(KeyType::CtrlO),
1936        0x10 => Some(KeyType::CtrlP),
1937        0x11 => Some(KeyType::CtrlQ),
1938        0x12 => Some(KeyType::CtrlR),
1939        0x13 => Some(KeyType::CtrlS),
1940        0x14 => Some(KeyType::CtrlT),
1941        0x15 => Some(KeyType::CtrlU),
1942        0x16 => Some(KeyType::CtrlV),
1943        0x17 => Some(KeyType::CtrlW),
1944        0x18 => Some(KeyType::CtrlX),
1945        0x19 => Some(KeyType::CtrlY),
1946        0x1A => Some(KeyType::CtrlZ),
1947        0x1B => Some(KeyType::Esc),
1948        0x1C => Some(KeyType::CtrlBackslash),
1949        0x1D => Some(KeyType::CtrlCloseBracket),
1950        0x1E => Some(KeyType::CtrlCaret),
1951        0x1F => Some(KeyType::CtrlUnderscore),
1952        0x20 => Some(KeyType::Space),
1953        0x7F => Some(KeyType::Backspace),
1954        _ => None,
1955    }
1956}
1957
/// Report whether `byte` is a control byte (`0x00..=0x1F`), DEL (`0x7F`), or
/// an ASCII space.
fn is_control_or_space(byte: u8) -> bool {
    matches!(byte, 0x00..=0x1F | 0x7F | b' ')
}
1961
/// Result of decoding one UTF-8 character from the front of a byte slice.
enum DecodeOutcome {
    /// The input ends in the middle of a character and more bytes may still
    /// arrive.
    NeedMore,
    /// The decoded character, the number of bytes to advance, and whether the
    /// bytes were valid UTF-8. Invalid input is reported as U+FFFD with a
    /// width of 1 and `false`.
    Decoded(char, usize, bool),
}
1966
1967fn decode_char(input: &[u8], can_have_more_data: bool) -> DecodeOutcome {
1968    let first = input[0];
1969    let width = if first < 0x80 {
1970        1
1971    } else if (first & 0xE0) == 0xC0 {
1972        2
1973    } else if (first & 0xF0) == 0xE0 {
1974        3
1975    } else if (first & 0xF8) == 0xF0 {
1976        4
1977    } else {
1978        return DecodeOutcome::Decoded(std::char::REPLACEMENT_CHARACTER, 1, false);
1979    };
1980
1981    if input.len() < width {
1982        return if can_have_more_data {
1983            DecodeOutcome::NeedMore
1984        } else {
1985            DecodeOutcome::Decoded(std::char::REPLACEMENT_CHARACTER, 1, false)
1986        };
1987    }
1988
1989    match std::str::from_utf8(&input[..width]) {
1990        Ok(s) => {
1991            let ch = s.chars().next().unwrap_or(std::char::REPLACEMENT_CHARACTER);
1992            DecodeOutcome::Decoded(ch, width, true)
1993        }
1994        Err(_) => DecodeOutcome::Decoded(std::char::REPLACEMENT_CHARACTER, 1, false),
1995    }
1996}
1997
1998fn message_from_key(key: KeyMsg) -> Message {
1999    if key.key_type == KeyType::CtrlC {
2000        Message::new(InterruptMsg)
2001    } else {
2002        Message::new(key)
2003    }
2004}
2005
2006fn replacement_message() -> Message {
2007    Message::new(KeyMsg::from_char(std::char::REPLACEMENT_CHARACTER))
2008}
2009
2010#[cfg(test)]
2011mod tests {
2012    use super::*;
2013    use tokio_util::sync::CancellationToken;
2014    use tokio_util::task::TaskTracker;
2015
2016    struct TestModel {
2017        count: i32,
2018    }
2019
2020    impl Model for TestModel {
2021        fn init(&self) -> Option<Cmd> {
2022            None
2023        }
2024
2025        fn update(&mut self, msg: Message) -> Option<Cmd> {
2026            if let Some(n) = msg.downcast::<i32>() {
2027                self.count += n;
2028            }
2029            None
2030        }
2031
2032        fn view(&self) -> String {
2033            format!("Count: {}", self.count)
2034        }
2035    }
2036
2037    #[test]
2038    fn test_program_options_default() {
2039        let opts = ProgramOptions::default();
2040        assert!(!opts.alt_screen);
2041        assert!(!opts.mouse_cell_motion);
2042        assert!(opts.bracketed_paste);
2043        assert_eq!(opts.fps, 60);
2044    }
2045
2046    #[test]
2047    fn test_program_builder() {
2048        let model = TestModel { count: 0 };
2049        let program = Program::new(model)
2050            .with_alt_screen()
2051            .with_mouse_cell_motion()
2052            .with_fps(30);
2053
2054        assert!(program.options.alt_screen);
2055        assert!(program.options.mouse_cell_motion);
2056        assert_eq!(program.options.fps, 30);
2057    }
2058
2059    #[test]
2060    fn test_program_fps_max() {
2061        let model = TestModel { count: 0 };
2062        let program = Program::new(model).with_fps(200);
2063        assert_eq!(program.options.fps, 120); // Capped at 120
2064    }
2065
2066    #[test]
2067    fn test_program_fps_min() {
2068        let model = TestModel { count: 0 };
2069        let program = Program::new(model).with_fps(0);
2070        assert_eq!(program.options.fps, 1); // Clamped to minimum of 1 to avoid division by zero
2071    }
2072
2073    // === Bracketed Paste Parsing Tests ===
2074
2075    #[test]
2076    fn test_parse_bracketed_paste_basic() {
2077        // Bracketed paste sequence: ESC[200~ ... ESC[201~
2078        let input = b"\x1b[200~hello world\x1b[201~";
2079        let result = parse_bracketed_paste(input, false);
2080
2081        assert!(result.is_some());
2082        if let Some(ParseOutcome::Parsed(len, Some(msg))) = result {
2083            assert_eq!(len, input.len());
2084            let key = msg.downcast_ref::<KeyMsg>().unwrap();
2085            assert!(key.paste, "Key should have paste flag set");
2086            assert_eq!(
2087                key.runes,
2088                vec!['h', 'e', 'l', 'l', 'o', ' ', 'w', 'o', 'r', 'l', 'd']
2089            );
2090        } else {
2091            panic!("Expected Parsed outcome");
2092        }
2093    }
2094
2095    #[test]
2096    fn test_parse_bracketed_paste_empty() {
2097        let input = b"\x1b[200~\x1b[201~";
2098        let result = parse_bracketed_paste(input, false);
2099
2100        assert!(result.is_some());
2101        if let Some(ParseOutcome::Parsed(len, Some(msg))) = result {
2102            assert_eq!(len, input.len());
2103            let key = msg.downcast_ref::<KeyMsg>().unwrap();
2104            assert!(key.paste);
2105            assert!(key.runes.is_empty());
2106        } else {
2107            panic!("Expected Parsed outcome");
2108        }
2109    }
2110
2111    #[test]
2112    fn test_parse_bracketed_paste_multiline() {
2113        let input = b"\x1b[200~line1\nline2\nline3\x1b[201~";
2114        let result = parse_bracketed_paste(input, false);
2115
2116        assert!(result.is_some());
2117        if let Some(ParseOutcome::Parsed(len, Some(msg))) = result {
2118            assert_eq!(len, input.len());
2119            let key = msg.downcast_ref::<KeyMsg>().unwrap();
2120            assert!(key.paste);
2121            let text: String = key.runes.iter().collect();
2122            assert_eq!(text, "line1\nline2\nline3");
2123        } else {
2124            panic!("Expected Parsed outcome");
2125        }
2126    }
2127
2128    #[test]
2129    fn test_parse_bracketed_paste_unicode() {
2130        let input = "\x1b[200~hello 世界 🌍\x1b[201~".as_bytes();
2131        let result = parse_bracketed_paste(input, false);
2132
2133        assert!(result.is_some());
2134        if let Some(ParseOutcome::Parsed(_, Some(msg))) = result {
2135            let key = msg.downcast_ref::<KeyMsg>().unwrap();
2136            assert!(key.paste);
2137            let text: String = key.runes.iter().collect();
2138            assert_eq!(text, "hello 世界 🌍");
2139        } else {
2140            panic!("Expected Parsed outcome");
2141        }
2142    }
2143
2144    #[test]
2145    fn test_parse_bracketed_paste_incomplete() {
2146        // Missing end sequence, with more data expected
2147        let input = b"\x1b[200~hello";
2148        let result = parse_bracketed_paste(input, true);
2149
2150        assert!(matches!(result, Some(ParseOutcome::NeedMore)));
2151    }
2152
2153    #[test]
2154    fn test_parse_bracketed_paste_incomplete_no_more_data() {
2155        // Missing end sequence, no more data expected - should parse what we have
2156        let input = b"\x1b[200~hello";
2157        let result = parse_bracketed_paste(input, false);
2158
2159        assert!(result.is_some());
2160        if let Some(ParseOutcome::Parsed(len, Some(msg))) = result {
2161            assert_eq!(len, input.len());
2162            let key = msg.downcast_ref::<KeyMsg>().unwrap();
2163            assert!(key.paste);
2164            let text: String = key.runes.iter().collect();
2165            assert_eq!(text, "hello");
2166        } else {
2167            panic!("Expected Parsed outcome");
2168        }
2169    }
2170
2171    #[test]
2172    fn test_parse_bracketed_paste_not_bracketed() {
2173        // Regular input, not bracketed paste
2174        let input = b"hello";
2175        let result = parse_bracketed_paste(input, false);
2176        assert!(result.is_none(), "Non-paste input should return None");
2177    }
2178
2179    #[test]
2180    fn test_parse_bracketed_paste_large() {
2181        // Large paste (simulating a big paste operation)
2182        let content = "a".repeat(10000);
2183        let input = format!("\x1b[200~{}\x1b[201~", content);
2184        let result = parse_bracketed_paste(input.as_bytes(), false);
2185
2186        assert!(result.is_some());
2187        if let Some(ParseOutcome::Parsed(len, Some(msg))) = result {
2188            assert_eq!(len, input.len());
2189            let key = msg.downcast_ref::<KeyMsg>().unwrap();
2190            assert!(key.paste);
2191            assert_eq!(key.runes.len(), 10000);
2192        } else {
2193            panic!("Expected Parsed outcome");
2194        }
2195    }
2196
2197    // === Thread Pool / spawn_batch Tests (bd-3ut7) ===
2198
2199    #[test]
2200    fn spawn_batch_executes_closure() {
2201        use std::sync::Arc;
2202        use std::sync::atomic::{AtomicBool, Ordering};
2203
2204        let executed = Arc::new(AtomicBool::new(false));
2205        let clone = executed.clone();
2206
2207        spawn_batch(move || {
2208            clone.store(true, Ordering::SeqCst);
2209        });
2210
2211        // Allow time for the task to run
2212        thread::sleep(Duration::from_millis(200));
2213        assert!(
2214            executed.load(Ordering::SeqCst),
2215            "spawn_batch should execute the closure"
2216        );
2217    }
2218
2219    #[test]
2220    fn spawn_batch_handles_many_concurrent_tasks() {
2221        use std::sync::Arc;
2222        use std::sync::atomic::{AtomicUsize, Ordering};
2223
2224        let counter = Arc::new(AtomicUsize::new(0));
2225        let task_count = 200;
2226
2227        for _ in 0..task_count {
2228            let c = counter.clone();
2229            spawn_batch(move || {
2230                c.fetch_add(1, Ordering::SeqCst);
2231            });
2232        }
2233
2234        // Wait for all tasks with generous timeout
2235        let deadline = std::time::Instant::now() + Duration::from_secs(5);
2236        while counter.load(Ordering::SeqCst) < task_count && std::time::Instant::now() < deadline {
2237            thread::sleep(Duration::from_millis(50));
2238        }
2239
2240        assert_eq!(
2241            counter.load(Ordering::SeqCst),
2242            task_count,
2243            "All {task_count} tasks should complete"
2244        );
2245    }
2246
2247    #[test]
2248    fn handle_command_batch_executes_all_subcommands() {
2249        let model = TestModel { count: 0 };
2250        let program = Program::new(model);
2251        let (tx, rx) = mpsc::channel();
2252        let command_threads = Arc::new(Mutex::new(Vec::new()));
2253
2254        // Build a batch of 50 sub-commands, each returning a unique i32
2255        let cmds: Vec<Option<Cmd>> = (0..50)
2256            .map(|i| Some(Cmd::new(move || Message::new(i))))
2257            .collect();
2258        let batch_cmd = crate::batch(cmds).unwrap();
2259
2260        program.handle_command(batch_cmd, tx, Arc::clone(&command_threads));
2261
2262        // Collect all 50 results
2263        let mut results = Vec::new();
2264        let deadline = std::time::Instant::now() + Duration::from_secs(5);
2265        while results.len() < 50 && std::time::Instant::now() < deadline {
2266            if let Ok(msg) = rx.recv_timeout(Duration::from_millis(100)) {
2267                results.push(msg.downcast::<i32>().unwrap());
2268            }
2269        }
2270
2271        assert_eq!(
2272            results.len(),
2273            50,
2274            "All 50 batch sub-commands should produce results"
2275        );
2276        results.sort();
2277        let expected: Vec<i32> = (0..50).collect();
2278        assert_eq!(
2279            results, expected,
2280            "Each sub-command value should be received exactly once"
2281        );
2282    }
2283
2284    #[cfg(feature = "thread-pool")]
2285    #[test]
2286    fn handle_command_batch_bounded_parallelism() {
2287        use std::sync::atomic::{AtomicUsize, Ordering};
2288
2289        let model = TestModel { count: 0 };
2290        let program = Program::new(model);
2291        let (tx, rx) = mpsc::channel();
2292        let command_threads = Arc::new(Mutex::new(Vec::new()));
2293
2294        let active = Arc::new(AtomicUsize::new(0));
2295        let max_active = Arc::new(AtomicUsize::new(0));
2296
2297        let task_count: usize = 100;
2298        let cmds: Vec<Option<Cmd>> = (0..task_count)
2299            .map(|_| {
2300                let a = active.clone();
2301                let m = max_active.clone();
2302                Some(Cmd::new(move || {
2303                    let current = a.fetch_add(1, Ordering::SeqCst) + 1;
2304                    m.fetch_max(current, Ordering::SeqCst);
2305                    thread::sleep(Duration::from_millis(5));
2306                    a.fetch_sub(1, Ordering::SeqCst);
2307                    Message::new(1i32)
2308                }))
2309            })
2310            .collect();
2311        let batch_cmd = crate::batch(cmds).unwrap();
2312
2313        program.handle_command(batch_cmd, tx, Arc::clone(&command_threads));
2314
2315        // Wait for all results
2316        let mut count = 0usize;
2317        let deadline = std::time::Instant::now() + Duration::from_secs(15);
2318        while count < task_count && std::time::Instant::now() < deadline {
2319            if let Ok(_msg) = rx.recv_timeout(Duration::from_millis(100)) {
2320                count += 1;
2321            }
2322        }
2323
2324        assert_eq!(count, task_count, "All batch commands should complete");
2325
2326        let observed_max = max_active.load(Ordering::SeqCst);
2327        let num_cpus = thread::available_parallelism()
2328            .map(|n| n.get())
2329            .unwrap_or(4);
2330
2331        // With rayon, max concurrent tasks should be bounded by the pool size.
2332        // Allow num_cpus + 2 to account for the outer thread::spawn and scheduling jitter.
2333        assert!(
2334            observed_max <= num_cpus + 2,
2335            "Expected bounded parallelism near {num_cpus}, but observed {observed_max}. \
2336             Without thread-pool feature, this would be near {task_count}."
2337        );
2338    }
2339
2340    #[test]
2341    fn handle_command_large_batch_no_panic() {
2342        let model = TestModel { count: 0 };
2343        let program = Program::new(model);
2344        let (tx, rx) = mpsc::channel();
2345        let command_threads = Arc::new(Mutex::new(Vec::new()));
2346
2347        // Create a large batch of 500 lightweight commands
2348        let cmds: Vec<Option<Cmd>> = (0..500)
2349            .map(|i| Some(Cmd::new(move || Message::new(i))))
2350            .collect();
2351        let batch_cmd = crate::batch(cmds).unwrap();
2352
2353        program.handle_command(batch_cmd, tx, Arc::clone(&command_threads));
2354
2355        // Collect results with timeout - don't need all, just verify no panic
2356        let mut count = 0usize;
2357        let deadline = std::time::Instant::now() + Duration::from_secs(10);
2358        while count < 500 && std::time::Instant::now() < deadline {
2359            if let Ok(_msg) = rx.recv_timeout(Duration::from_millis(50)) {
2360                count += 1;
2361            }
2362        }
2363
2364        assert_eq!(count, 500, "Large batch should complete without panic");
2365    }
2366
2367    // === Thread Lifecycle Tests (bd-1327) ===
2368
2369    #[test]
2370    fn test_thread_handles_captured() {
2371        // Verify that JoinHandle types can be stored in Option variables.
2372        // This is a compile-time check that the types work as expected.
2373        let handle: Option<thread::JoinHandle<()>> = Some(thread::spawn(|| {
2374            // Thread does nothing
2375        }));
2376
2377        assert!(handle.is_some(), "Handle should be captured");
2378
2379        // Join to clean up
2380        if let Some(h) = handle {
2381            h.join().expect("Thread should join successfully");
2382        }
2383    }
2384
2385    #[test]
2386    fn test_threads_exit_on_channel_drop() {
2387        use std::sync::Arc;
2388        use std::sync::atomic::{AtomicBool, Ordering};
2389
2390        // Create a flag to track if the thread has exited
2391        let thread_exited = Arc::new(AtomicBool::new(false));
2392        let thread_exited_clone = Arc::clone(&thread_exited);
2393
2394        // Create a channel like the event_loop does
2395        let (tx, rx) = mpsc::channel::<i32>();
2396
2397        // Spawn a thread that blocks on recv() like the external forwarder
2398        let handle = thread::spawn(move || {
2399            // This loop will exit when tx is dropped (recv() returns Err)
2400            while rx.recv().is_ok() {}
2401            thread_exited_clone.store(true, Ordering::SeqCst);
2402        });
2403
2404        // Thread should be running
2405        assert!(!thread_exited.load(Ordering::SeqCst));
2406
2407        // Drop the sender - this should cause recv() to return Err
2408        drop(tx);
2409
2410        // Join the thread - it should exit promptly
2411        handle
2412            .join()
2413            .expect("Thread should join after channel drop");
2414
2415        // Verify the thread actually ran its exit code
2416        assert!(
2417            thread_exited.load(Ordering::SeqCst),
2418            "Thread should have exited after channel drop"
2419        );
2420    }
2421
2422    #[test]
2423    fn test_shutdown_joins_all_threads() {
2424        use std::sync::Arc;
2425        use std::sync::atomic::{AtomicUsize, Ordering};
2426
2427        // Track how many threads have been joined
2428        let join_count = Arc::new(AtomicUsize::new(0));
2429
2430        // Create multiple thread handles like event_loop does
2431        let mut handles: Vec<thread::JoinHandle<()>> = Vec::new();
2432
2433        for i in 0..3 {
2434            let join_count_clone = Arc::clone(&join_count);
2435            handles.push(thread::spawn(move || {
2436                // Simulate some work
2437                thread::sleep(Duration::from_millis(10 * (i as u64 + 1)));
2438                join_count_clone.fetch_add(1, Ordering::SeqCst);
2439            }));
2440        }
2441
2442        // Join all threads (simulating shutdown behavior)
2443        for handle in handles {
2444            match handle.join() {
2445                Ok(()) => {} // Thread joined successfully
2446                Err(e) => panic!("Thread panicked during join: {:?}", e),
2447            }
2448        }
2449
2450        // All threads should have completed
2451        assert_eq!(
2452            join_count.load(Ordering::SeqCst),
2453            3,
2454            "All threads should have completed and been joined"
2455        );
2456    }
2457
2458    #[test]
2459    fn test_thread_panic_handled_gracefully() {
2460        // Spawn a thread that will panic
2461        let handle = thread::spawn(|| {
2462            panic!("Intentional panic for testing");
2463        });
2464
2465        // The join should return Err, not propagate the panic
2466        let result = handle.join();
2467        // Verify we can inspect the panic payload
2468        let e = result.expect_err("Join should return Err when thread panics");
2469        // The panic payload is available for logging
2470        let _panic_info = format!("{:?}", e);
2471        // In production code, this would be logged with tracing::warn!
2472    }
2473
2474    #[test]
2475    fn test_external_forwarder_pattern() {
2476        // Test the exact pattern used in event_loop for external message forwarding
2477        let (external_tx, external_rx) = mpsc::channel::<Message>();
2478        let (event_tx, event_rx) = mpsc::channel::<Message>();
2479
2480        // Spawn forwarder thread like event_loop does
2481        let tx_clone = event_tx.clone();
2482        let handle = thread::spawn(move || {
2483            while let Ok(msg) = external_rx.recv() {
2484                if tx_clone.send(msg).is_err() {
2485                    break;
2486                }
2487            }
2488        });
2489
2490        // Send some messages through the forwarder
2491        external_tx.send(Message::new(1i32)).unwrap();
2492        external_tx.send(Message::new(2i32)).unwrap();
2493        external_tx.send(Message::new(3i32)).unwrap();
2494
2495        // Drop the external sender to signal the forwarder to exit
2496        drop(external_tx);
2497
2498        // Join should complete because the thread exits when external_rx.recv() returns Err
2499        let join_result = handle.join();
2500        assert!(
2501            join_result.is_ok(),
2502            "Forwarder thread should exit cleanly when sender is dropped"
2503        );
2504
2505        // Verify messages were forwarded
2506        let mut received = Vec::new();
2507        while let Ok(msg) = event_rx.try_recv() {
2508            if let Some(&n) = msg.downcast_ref::<i32>() {
2509                received.push(n);
2510            }
2511        }
2512        assert_eq!(received, vec![1, 2, 3], "All messages should be forwarded");
2513    }
2514
2515    // === Async TaskTracker Integration Tests (bd-2i18) ===
2516
2517    #[test]
2518    fn test_task_tracker_spawn_blocking_tracks_thread() {
2519        // Verify that TaskTracker.spawn_blocking() actually tracks the spawned thread.
2520        // The key behavior: task_tracker.wait() should block until the spawned thread completes.
2521        use std::sync::Arc;
2522        use std::sync::atomic::{AtomicBool, Ordering};
2523
2524        let rt = tokio::runtime::Builder::new_current_thread()
2525            .enable_all()
2526            .build()
2527            .expect("Failed to create runtime");
2528
2529        let thread_completed = Arc::new(AtomicBool::new(false));
2530        let thread_completed_clone = Arc::clone(&thread_completed);
2531
2532        rt.block_on(async {
2533            let task_tracker = TaskTracker::new();
2534
2535            // Spawn a blocking task that takes some time
2536            task_tracker.spawn_blocking(move || {
2537                thread::sleep(Duration::from_millis(50));
2538                thread_completed_clone.store(true, Ordering::SeqCst);
2539            });
2540
2541            // Close the tracker (prevents new tasks)
2542            task_tracker.close();
2543
2544            // Wait for all tracked tasks - should block until thread completes
2545            task_tracker.wait().await;
2546
2547            // After wait() returns, the thread should have completed
2548            assert!(
2549                thread_completed.load(Ordering::SeqCst),
2550                "spawn_blocking task should complete before wait() returns"
2551            );
2552        });
2553    }
2554
2555    #[test]
2556    fn test_cancellation_token_stops_blocking_task() {
2557        // Verify that CancellationToken properly signals spawn_blocking threads to exit.
2558        use std::sync::Arc;
2559        use std::sync::atomic::{AtomicBool, Ordering};
2560
2561        let rt = tokio::runtime::Builder::new_current_thread()
2562            .enable_all()
2563            .build()
2564            .expect("Failed to create runtime");
2565
2566        let task_exited = Arc::new(AtomicBool::new(false));
2567        let task_exited_clone = Arc::clone(&task_exited);
2568
2569        rt.block_on(async {
2570            let cancel_token = CancellationToken::new();
2571            let task_tracker = TaskTracker::new();
2572
2573            let cancel_clone = cancel_token.clone();
2574
2575            // Spawn a task that polls the cancellation token
2576            task_tracker.spawn_blocking(move || {
2577                // Loop until cancelled (mimics event thread pattern)
2578                loop {
2579                    if cancel_clone.is_cancelled() {
2580                        task_exited_clone.store(true, Ordering::SeqCst);
2581                        break;
2582                    }
2583                    thread::sleep(Duration::from_millis(10));
2584                }
2585            });
2586
2587            // Task should be running
2588            thread::sleep(Duration::from_millis(30));
2589            assert!(
2590                !task_exited.load(Ordering::SeqCst),
2591                "Task should still be running before cancellation"
2592            );
2593
2594            // Cancel and wait
2595            cancel_token.cancel();
2596            task_tracker.close();
2597            task_tracker.wait().await;
2598
2599            assert!(
2600                task_exited.load(Ordering::SeqCst),
2601                "Task should exit after cancellation"
2602            );
2603        });
2604    }
2605
2606    #[test]
2607    fn test_graceful_shutdown_waits_for_all_blocking_tasks() {
2608        // Verify that graceful shutdown waits for ALL spawn_blocking tasks.
2609        use std::sync::Arc;
2610        use std::sync::atomic::{AtomicUsize, Ordering};
2611
2612        let rt = tokio::runtime::Builder::new_current_thread()
2613            .enable_all()
2614            .build()
2615            .expect("Failed to create runtime");
2616
2617        let completed_count = Arc::new(AtomicUsize::new(0));
2618
2619        rt.block_on(async {
2620            let cancel_token = CancellationToken::new();
2621            let task_tracker = TaskTracker::new();
2622
2623            // Spawn multiple blocking tasks with different durations
2624            for i in 0..3 {
2625                let count_clone = Arc::clone(&completed_count);
2626                let cancel_clone = cancel_token.clone();
2627                task_tracker.spawn_blocking(move || {
2628                    // Wait for cancellation or work
2629                    loop {
2630                        if cancel_clone.is_cancelled() {
2631                            break;
2632                        }
2633                        thread::sleep(Duration::from_millis(10));
2634                    }
2635                    // Simulate different completion times
2636                    thread::sleep(Duration::from_millis(10 * (i as u64 + 1)));
2637                    count_clone.fetch_add(1, Ordering::SeqCst);
2638                });
2639            }
2640
2641            // All tasks should be running
2642            thread::sleep(Duration::from_millis(30));
2643            assert_eq!(
2644                completed_count.load(Ordering::SeqCst),
2645                0,
2646                "No tasks should complete before shutdown"
2647            );
2648
2649            // Initiate graceful shutdown (mimics Program::graceful_shutdown)
2650            cancel_token.cancel();
2651            task_tracker.close();
2652
2653            // Use timeout similar to graceful_shutdown
2654            let timeout_result: std::result::Result<(), tokio::time::error::Elapsed> =
2655                tokio::time::timeout(Duration::from_secs(2), task_tracker.wait()).await;
2656
2657            assert!(
2658                timeout_result.is_ok(),
2659                "All tasks should complete within timeout"
2660            );
2661
2662            assert_eq!(
2663                completed_count.load(Ordering::SeqCst),
2664                3,
2665                "All 3 tasks should complete during graceful shutdown"
2666            );
2667        });
2668    }
2669
2670    #[test]
2671    fn test_spawn_blocking_vs_spawn_difference() {
2672        // Document why spawn_blocking is needed vs std::thread::spawn.
2673        // With std::thread::spawn, TaskTracker.wait() doesn't wait for the thread.
2674        // With spawn_blocking, it does.
2675        use std::sync::Arc;
2676        use std::sync::atomic::{AtomicBool, Ordering};
2677
2678        let rt = tokio::runtime::Builder::new_current_thread()
2679            .enable_all()
2680            .build()
2681            .expect("Failed to create runtime");
2682
2683        // Test 1: std::thread::spawn is NOT tracked
2684        let untracked_done = Arc::new(AtomicBool::new(false));
2685        let untracked_done_clone = Arc::clone(&untracked_done);
2686
2687        rt.block_on(async {
2688            let task_tracker = TaskTracker::new();
2689
2690            // This thread is NOT spawned via task_tracker
2691            let _handle = thread::spawn(move || {
2692                thread::sleep(Duration::from_millis(100));
2693                untracked_done_clone.store(true, Ordering::SeqCst);
2694            });
2695
2696            task_tracker.close();
2697            task_tracker.wait().await;
2698
2699            // wait() returns immediately because no tasks were tracked!
2700            // The untracked thread may still be running
2701            // (We don't assert here because timing is unpredictable)
2702        });
2703
2704        // Test 2: spawn_blocking IS tracked
2705        let tracked_done = Arc::new(AtomicBool::new(false));
2706        let tracked_done_clone = Arc::clone(&tracked_done);
2707
2708        rt.block_on(async {
2709            let task_tracker = TaskTracker::new();
2710
2711            // This thread IS tracked
2712            task_tracker.spawn_blocking(move || {
2713                thread::sleep(Duration::from_millis(50));
2714                tracked_done_clone.store(true, Ordering::SeqCst);
2715            });
2716
2717            task_tracker.close();
2718            task_tracker.wait().await;
2719
2720            // wait() blocks until the tracked task completes
2721            assert!(
2722                tracked_done.load(Ordering::SeqCst),
2723                "spawn_blocking task should complete before wait() returns"
2724            );
2725        });
2726    }
2727
2728    #[test]
2729    fn test_event_thread_pattern_with_poll_timeout() {
2730        // Test the exact pattern used for the event thread:
2731        // - spawn_blocking with CancellationToken
2732        // - poll with timeout to check cancellation
2733        use std::sync::Arc;
2734        use std::sync::atomic::{AtomicUsize, Ordering};
2735
2736        let rt = tokio::runtime::Builder::new_current_thread()
2737            .enable_all()
2738            .build()
2739            .expect("Failed to create runtime");
2740
2741        let poll_count = Arc::new(AtomicUsize::new(0));
2742        let poll_count_clone = Arc::clone(&poll_count);
2743
2744        rt.block_on(async {
2745            let cancel_token = CancellationToken::new();
2746            let task_tracker = TaskTracker::new();
2747
2748            let cancel_clone = cancel_token.clone();
2749
2750            // Spawn event thread pattern
2751            task_tracker.spawn_blocking(move || {
2752                loop {
2753                    if cancel_clone.is_cancelled() {
2754                        break;
2755                    }
2756                    // Simulate poll with timeout (like event::poll(Duration::from_millis(100)))
2757                    thread::sleep(Duration::from_millis(25));
2758                    poll_count_clone.fetch_add(1, Ordering::SeqCst);
2759                }
2760            });
2761
2762            // Let the thread poll a few times
2763            thread::sleep(Duration::from_millis(100));
2764            let count_before_cancel = poll_count.load(Ordering::SeqCst);
2765            assert!(
2766                count_before_cancel >= 2,
2767                "Thread should have polled multiple times: {}",
2768                count_before_cancel
2769            );
2770
2771            // Cancel and wait
2772            cancel_token.cancel();
2773            task_tracker.close();
2774            task_tracker.wait().await;
2775
2776            // Thread should have exited
2777            let final_count = poll_count.load(Ordering::SeqCst);
2778            assert!(
2779                final_count >= count_before_cancel,
2780                "Poll count should not decrease"
2781            );
2782        });
2783    }
2784}