1use std::io::{self, Read, Write};
7use std::sync::mpsc::{self, Receiver, Sender};
8use std::sync::{Arc, Mutex};
9use std::thread;
10use std::time::Duration;
11
12#[cfg(feature = "async")]
13use crate::command::CommandKind;
14
15#[cfg(feature = "async")]
16use tokio_util::sync::CancellationToken;
17
18#[cfg(feature = "async")]
19use tokio_util::task::TaskTracker;
20
21use tracing::debug;
22
/// Runs one command of a batch by handing the closure to `rayon::spawn`.
///
/// Compiled only when the `thread-pool` feature is enabled; the fallback
/// variant with the same name spawns a dedicated OS thread instead.
#[cfg(feature = "thread-pool")]
fn spawn_batch<F>(f: F)
where
    F: FnOnce() + Send + 'static,
{
    rayon::spawn(f);
}
35
/// Runs one command of a batch on its own freshly spawned OS thread.
///
/// Used when the `thread-pool` feature is disabled. The join handle is
/// dropped immediately, so the thread is detached and never joined.
#[cfg(not(feature = "thread-pool"))]
fn spawn_batch<F>(f: F)
where
    F: FnOnce() + Send + 'static,
{
    drop(thread::spawn(f));
}
40
41use crossterm::{
42 cursor::{Hide, MoveTo, Show},
43 event::{self, DisableMouseCapture, EnableMouseCapture, Event, KeyEventKind},
44 execute,
45 terminal::{
46 self, Clear, ClearType, EnterAlternateScreen, LeaveAlternateScreen, disable_raw_mode,
47 enable_raw_mode,
48 },
49};
50
51use crate::command::Cmd;
52use crate::key::{from_crossterm_key, is_sequence_prefix};
53use crate::message::{
54 BatchMsg, BlurMsg, FocusMsg, InterruptMsg, Message, PrintLineMsg, QuitMsg,
55 RequestWindowSizeMsg, SequenceMsg, SetWindowTitleMsg, WindowSizeMsg,
56};
57use crate::mouse::from_crossterm_mouse;
58use crate::screen::{ReleaseTerminalMsg, RestoreTerminalMsg};
59use crate::{KeyMsg, KeyType};
60
/// Errors that can be returned while running a program.
#[derive(thiserror::Error, Debug)]
pub enum Error {
    /// A generic terminal I/O failure.
    ///
    /// Because of `#[from]`, any `io::Error` propagated with `?` in a function
    /// returning [`Result`] is converted into this variant.
    #[error("terminal io error: {0}")]
    Io(#[from] io::Error),

    /// Switching raw mode failed; `action` is interpolated into the message
    /// as "failed to {action} raw mode".
    #[error("failed to {action} raw mode: {source}")]
    RawModeFailure {
        /// Verb describing the attempted operation.
        action: &'static str,
        /// The underlying I/O error.
        #[source]
        source: io::Error,
    },

    /// Switching the alternate screen failed; `action` is interpolated into
    /// the message as "failed to {action} alternate screen".
    #[error("failed to {action} alternate screen: {source}")]
    AltScreenFailure {
        /// Verb describing the attempted operation.
        action: &'static str,
        /// The underlying I/O error.
        #[source]
        source: io::Error,
    },

    /// Polling the terminal for events failed.
    #[error("failed to poll terminal events: {0}")]
    EventPoll(io::Error),

    /// Writing the rendered view failed.
    #[error("failed to render view: {0}")]
    Render(io::Error),
}

/// Convenience alias for results whose error type is [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
229
/// Application state driven by a [`Program`].
///
/// The `Send + 'static` bounds are required because `Program::start` moves
/// the whole program, model included, onto a background thread.
pub trait Model: Send + 'static {
    /// Called once by the event loop before the first render.
    ///
    /// A returned command is executed in the background and its resulting
    /// message, if any, is fed back into [`Model::update`].
    fn init(&self) -> Option<Cmd>;

    /// Handles one message that the event loop did not consume itself and
    /// optionally returns a follow-up command to execute.
    fn update(&mut self, msg: Message) -> Option<Cmd>;

    /// Produces the full text of the current view.
    ///
    /// The renderer compares it with the previously written view and skips
    /// the write when both are equal.
    fn view(&self) -> String;
}
272
/// Switches that control how a program sets up and drives the terminal.
#[derive(Debug, Clone)]
pub struct ProgramOptions {
    /// Enter the alternate screen on start and leave it on exit.
    pub alt_screen: bool,
    /// Enable mouse capture (cell-motion flavour).
    pub mouse_cell_motion: bool,
    /// Enable mouse capture (all-motion flavour); checked before `mouse_cell_motion`.
    pub mouse_all_motion: bool,
    /// Enable bracketed paste so pasted text arrives as a single paste event.
    pub bracketed_paste: bool,
    /// Enable focus-change reporting (focus / blur messages).
    pub report_focus: bool,
    /// Custom I/O mode: raw mode and crossterm event polling are skipped.
    pub custom_io: bool,
    /// Target frames per second; determines the loop's frame duration.
    pub fps: u32,
    /// Set by `without_signal_handler`.
    /// NOTE(review): not read anywhere in this part of the file — confirm where it is consumed.
    pub without_signals: bool,
    /// When `true`, no terminal-restoring panic hook is installed.
    pub without_catch_panics: bool,
}

impl Default for ProgramOptions {
    /// Everything is off except bracketed paste; the frame rate is 60 fps.
    fn default() -> Self {
        const DEFAULT_FPS: u32 = 60;

        Self {
            // The only feature that is on by default.
            bracketed_paste: true,
            fps: DEFAULT_FPS,
            alt_screen: false,
            mouse_cell_motion: false,
            mouse_all_motion: false,
            report_focus: false,
            custom_io: false,
            without_signals: false,
            without_catch_panics: false,
        }
    }
}
311
312pub struct ProgramHandle<M: Model> {
334 tx: Sender<Message>,
335 handle: Option<thread::JoinHandle<Result<M>>>,
336}
337
338impl<M: Model> ProgramHandle<M> {
339 pub fn send<T: Into<Message>>(&self, msg: T) -> bool {
345 self.tx.send(msg.into()).is_ok()
346 }
347
348 pub fn quit(&self) {
352 let _ = self.tx.send(Message::new(QuitMsg));
353 }
354
355 pub fn wait(mut self) -> Result<M> {
359 if let Some(handle) = self.handle.take() {
360 handle
361 .join()
362 .map_err(|_| Error::Io(io::Error::other("program thread panicked")))?
363 } else {
364 Err(Error::Io(io::Error::other("program already joined")))
365 }
366 }
367
368 pub fn is_running(&self) -> bool {
370 self.handle.as_ref().is_some_and(|h| !h.is_finished())
371 }
372}
373
/// A terminal program that drives a [`Model`] through an event loop.
pub struct Program<M: Model> {
    /// Application state; handed back to the caller when the loop ends.
    model: M,
    /// Terminal and loop configuration.
    options: ProgramOptions,
    /// Optional channel of externally injected messages, forwarded into the loop.
    external_rx: Option<Receiver<Message>>,
    /// Optional custom input source, parsed on a background thread.
    input: Option<Box<dyn Read + Send>>,
    /// Optional custom output sink used by `run`/`start` instead of stdout.
    output: Option<Box<dyn Write + Send>>,
}
398
399impl<M: Model> Program<M> {
400 pub fn new(model: M) -> Self {
402 Self {
403 model,
404 options: ProgramOptions::default(),
405 external_rx: None,
406 input: None,
407 output: None,
408 }
409 }
410
411 pub fn with_input_receiver(mut self, rx: Receiver<Message>) -> Self {
416 self.external_rx = Some(rx);
417 self
418 }
419
420 pub fn with_input<R: Read + Send + 'static>(mut self, input: R) -> Self {
425 self.input = Some(Box::new(input));
426 self.options.custom_io = true;
427 self
428 }
429
430 pub fn with_output<W: Write + Send + 'static>(mut self, output: W) -> Self {
434 self.output = Some(Box::new(output));
435 self.options.custom_io = true;
436 self
437 }
438
439 pub fn with_alt_screen(mut self) -> Self {
441 self.options.alt_screen = true;
442 self
443 }
444
445 pub fn with_mouse_cell_motion(mut self) -> Self {
449 self.options.mouse_cell_motion = true;
450 self
451 }
452
453 pub fn with_mouse_all_motion(mut self) -> Self {
457 self.options.mouse_all_motion = true;
458 self
459 }
460
461 pub fn with_fps(mut self, fps: u32) -> Self {
465 self.options.fps = fps.clamp(1, 120);
466 self
467 }
468
469 pub fn with_report_focus(mut self) -> Self {
473 self.options.report_focus = true;
474 self
475 }
476
477 pub fn without_bracketed_paste(mut self) -> Self {
479 self.options.bracketed_paste = false;
480 self
481 }
482
483 pub fn without_signal_handler(mut self) -> Self {
485 self.options.without_signals = true;
486 self
487 }
488
489 pub fn without_catch_panics(mut self) -> Self {
491 self.options.without_catch_panics = true;
492 self
493 }
494
495 pub fn with_custom_io(mut self) -> Self {
500 self.options.custom_io = true;
501 self
502 }
503
504 pub fn run_with_writer<W: Write + Send + 'static>(self, mut writer: W) -> Result<M> {
506 let options = self.options.clone();
508
509 if !options.custom_io {
511 enable_raw_mode()?;
512 }
513
514 if options.alt_screen {
515 execute!(writer, EnterAlternateScreen)?;
516 }
517
518 execute!(writer, Hide)?;
519
520 if options.mouse_all_motion {
521 execute!(writer, EnableMouseCapture)?;
522 } else if options.mouse_cell_motion {
523 execute!(writer, EnableMouseCapture)?;
524 }
525
526 if options.report_focus {
527 execute!(writer, event::EnableFocusChange)?;
528 }
529
530 if options.bracketed_paste {
531 execute!(writer, event::EnableBracketedPaste)?;
532 }
533
534 let prev_hook = if !options.without_catch_panics {
538 let cleanup_opts = options.clone();
539 let prev = std::panic::take_hook();
540 std::panic::set_hook(Box::new(move |info| {
541 let mut stderr = io::stderr();
544 if cleanup_opts.bracketed_paste {
545 let _ = execute!(stderr, event::DisableBracketedPaste);
546 }
547 if cleanup_opts.report_focus {
548 let _ = execute!(stderr, event::DisableFocusChange);
549 }
550 if cleanup_opts.mouse_all_motion || cleanup_opts.mouse_cell_motion {
551 let _ = execute!(stderr, DisableMouseCapture);
552 }
553 let _ = execute!(stderr, Show);
554 if cleanup_opts.alt_screen {
555 let _ = execute!(stderr, LeaveAlternateScreen);
556 }
557 if !cleanup_opts.custom_io {
558 let _ = disable_raw_mode();
559 }
560 prev(info);
562 }));
563 true
564 } else {
565 false
566 };
567
568 let result = self.event_loop(&mut writer);
570
571 if prev_hook {
574 let _ = std::panic::take_hook();
575 }
578
579 if options.bracketed_paste {
581 let _ = execute!(writer, event::DisableBracketedPaste);
582 }
583
584 if options.report_focus {
585 let _ = execute!(writer, event::DisableFocusChange);
586 }
587
588 if options.mouse_all_motion || options.mouse_cell_motion {
589 let _ = execute!(writer, DisableMouseCapture);
590 }
591
592 let _ = execute!(writer, Show);
593
594 if options.alt_screen {
595 let _ = execute!(writer, LeaveAlternateScreen);
596 }
597
598 if !options.custom_io {
599 let _ = disable_raw_mode();
600 }
601
602 result
603 }
604
605 pub fn run(mut self) -> Result<M> {
607 if let Some(output) = self.output.take() {
608 return self.run_with_writer(output);
609 }
610
611 let stdout = io::stdout();
612 self.run_with_writer(stdout)
613 }
614
615 pub fn start(mut self) -> ProgramHandle<M> {
639 let (tx, rx) = mpsc::channel();
641
642 self.external_rx = Some(rx);
644
645 let output = self.output.take();
647
648 let handle = thread::spawn(move || {
650 if let Some(output) = output {
651 self.run_with_writer(output)
652 } else {
653 let stdout = io::stdout();
654 self.run_with_writer(stdout)
655 }
656 });
657
658 ProgramHandle {
659 tx,
660 handle: Some(handle),
661 }
662 }
663
664 fn event_loop<W: Write>(mut self, writer: &mut W) -> Result<M> {
665 let (tx, rx): (Sender<Message>, Receiver<Message>) = mpsc::channel();
667
668 let mut external_forwarder_handle: Option<thread::JoinHandle<()>> = None;
670 let mut input_parser_handle: Option<thread::JoinHandle<()>> = None;
671
672 let command_threads: Arc<Mutex<Vec<thread::JoinHandle<()>>>> =
674 Arc::new(Mutex::new(Vec::new()));
675
676 if let Some(ext_rx) = self.external_rx.take() {
678 let tx_clone = tx.clone();
679 debug!(target: "bubbletea::thread", "Spawning external forwarder thread");
680 external_forwarder_handle = Some(thread::spawn(move || {
681 while let Ok(msg) = ext_rx.recv() {
682 if tx_clone.send(msg).is_err() {
683 debug!(target: "bubbletea::event", "external message dropped — receiver disconnected");
684 break;
685 }
686 }
687 }));
688 }
689
690 if let Some(mut input) = self.input.take() {
692 let tx_clone = tx.clone();
693 debug!(target: "bubbletea::thread", "Spawning input parser thread");
694 input_parser_handle = Some(thread::spawn(move || {
695 let mut parser = InputParser::new();
696 let mut buf = [0u8; 256];
697 loop {
698 match input.read(&mut buf) {
699 Ok(0) => break,
700 Ok(n) => {
701 let can_have_more_data = true;
703 for msg in parser.push_bytes(&buf[..n], can_have_more_data) {
704 if tx_clone.send(msg).is_err() {
705 debug!(target: "bubbletea::input", "input message dropped — receiver disconnected");
706 return;
707 }
708 }
709 }
710 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {
711 thread::yield_now();
712 }
713 Err(_) => break,
714 }
715 }
716
717 for msg in parser.flush() {
718 if tx_clone.send(msg).is_err() {
719 debug!(target: "bubbletea::input", "flush message dropped — receiver disconnected");
720 break;
721 }
722 }
723 }));
724 }
725
726 if !self.options.custom_io
728 && let Ok((width, height)) = terminal::size()
729 && tx
730 .send(Message::new(WindowSizeMsg { width, height }))
731 .is_err()
732 {
733 debug!(target: "bubbletea::event", "initial window size dropped — receiver disconnected");
734 }
735
736 if let Some(cmd) = self.model.init() {
738 self.handle_command(cmd, tx.clone(), Arc::clone(&command_threads));
739 }
740
741 let mut last_view = String::new();
743 self.render(writer, &mut last_view)?;
744
745 let frame_duration = Duration::from_secs_f64(1.0 / self.options.fps as f64);
747
748 loop {
750 if !self.options.custom_io && event::poll(frame_duration)? {
754 match event::read()? {
755 Event::Key(key_event) => {
756 if key_event.kind != KeyEventKind::Press {
758 continue;
759 }
760
761 let key_msg = from_crossterm_key(key_event.code, key_event.modifiers);
762
763 if key_msg.key_type == crate::KeyType::CtrlC {
765 if tx.send(Message::new(InterruptMsg)).is_err() {
766 debug!(target: "bubbletea::event", "interrupt message dropped — receiver disconnected");
767 }
768 } else if tx.send(Message::new(key_msg)).is_err() {
769 debug!(target: "bubbletea::event", "key message dropped — receiver disconnected");
770 }
771 }
772 Event::Mouse(mouse_event) => {
773 let mouse_msg = from_crossterm_mouse(mouse_event);
774 if tx.send(Message::new(mouse_msg)).is_err() {
775 debug!(target: "bubbletea::event", "mouse message dropped — receiver disconnected");
776 }
777 }
778 Event::Resize(width, height) => {
779 if tx
780 .send(Message::new(WindowSizeMsg { width, height }))
781 .is_err()
782 {
783 debug!(target: "bubbletea::event", "resize message dropped — receiver disconnected");
784 }
785 }
786 Event::FocusGained => {
787 if tx.send(Message::new(FocusMsg)).is_err() {
788 debug!(target: "bubbletea::event", "focus message dropped — receiver disconnected");
789 }
790 }
791 Event::FocusLost => {
792 if tx.send(Message::new(BlurMsg)).is_err() {
793 debug!(target: "bubbletea::event", "blur message dropped — receiver disconnected");
794 }
795 }
796 Event::Paste(text) => {
797 let key_msg = KeyMsg {
799 key_type: crate::KeyType::Runes,
800 runes: text.chars().collect(),
801 alt: false,
802 paste: true,
803 };
804 if tx.send(Message::new(key_msg)).is_err() {
805 debug!(target: "bubbletea::event", "paste message dropped — receiver disconnected");
806 }
807 }
808 }
809 }
810
811 let mut needs_render = false;
813 let mut should_quit = false;
814 while let Ok(msg) = rx.try_recv() {
815 if msg.is::<QuitMsg>() {
817 should_quit = true;
818 break;
819 }
820
821 if msg.is::<InterruptMsg>() {
823 should_quit = true;
824 break;
825 }
826
827 if msg.is::<BatchMsg>() {
829 continue;
830 }
831
832 if let Some(title_msg) = msg.downcast_ref::<SetWindowTitleMsg>() {
834 execute!(writer, terminal::SetTitle(&title_msg.0))?;
835 continue;
836 }
837
838 if msg.is::<RequestWindowSizeMsg>() {
840 if !self.options.custom_io
841 && let Ok((width, height)) = terminal::size()
842 && tx
843 .send(Message::new(WindowSizeMsg { width, height }))
844 .is_err()
845 {
846 debug!(target: "bubbletea::event", "window size response dropped — receiver disconnected");
847 }
848 continue;
849 }
850
851 if let Some(print_msg) = msg.downcast_ref::<PrintLineMsg>() {
853 if !self.options.alt_screen {
854 for line in print_msg.0.lines() {
856 let _ = writeln!(writer, "{}", line);
857 }
858 let _ = writer.flush();
859 last_view.clear();
861 needs_render = true;
862 }
863 continue;
864 }
865
866 if msg.is::<ReleaseTerminalMsg>() {
868 if !self.options.custom_io {
869 if self.options.bracketed_paste {
871 let _ = execute!(writer, event::DisableBracketedPaste);
872 }
873 if self.options.report_focus {
874 let _ = execute!(writer, event::DisableFocusChange);
875 }
876 if self.options.mouse_all_motion || self.options.mouse_cell_motion {
877 let _ = execute!(writer, DisableMouseCapture);
878 }
879 let _ = execute!(writer, Show);
880 if self.options.alt_screen {
881 let _ = execute!(writer, LeaveAlternateScreen);
882 }
883 let _ = disable_raw_mode();
884 }
885 continue;
886 }
887
888 if msg.is::<RestoreTerminalMsg>() {
890 if !self.options.custom_io {
891 let _ = enable_raw_mode();
893 if self.options.alt_screen {
894 let _ = execute!(writer, EnterAlternateScreen);
895 }
896 let _ = execute!(writer, Hide);
897 if self.options.mouse_all_motion {
898 let _ = execute!(writer, EnableMouseCapture);
899 } else if self.options.mouse_cell_motion {
900 let _ = execute!(writer, EnableMouseCapture);
901 }
902 if self.options.report_focus {
903 let _ = execute!(writer, event::EnableFocusChange);
904 }
905 if self.options.bracketed_paste {
906 let _ = execute!(writer, event::EnableBracketedPaste);
907 }
908 last_view.clear();
910 }
911 needs_render = true;
912 continue;
913 }
914
915 if let Some(cmd) = self.model.update(msg) {
917 self.handle_command(cmd, tx.clone(), Arc::clone(&command_threads));
918 }
919 needs_render = true;
920 }
921
922 if should_quit {
924 break;
925 }
926
927 if needs_render {
929 self.render(writer, &mut last_view)?;
930 }
931
932 if self.options.custom_io {
934 thread::sleep(frame_duration);
935 }
936 }
937
938 drop(tx);
940 debug!(target: "bubbletea::thread", "Sender dropped, waiting for threads to exit");
941
942 if let Some(handle) = external_forwarder_handle {
943 match handle.join() {
944 Ok(()) => {
945 debug!(target: "bubbletea::thread", "External forwarder thread joined successfully")
946 }
947 Err(e) => {
948 tracing::warn!(target: "bubbletea::thread", "External forwarder thread panicked: {:?}", e)
949 }
950 }
951 }
952
953 if let Some(handle) = input_parser_handle {
954 match handle.join() {
955 Ok(()) => {
956 debug!(target: "bubbletea::thread", "Input parser thread joined successfully")
957 }
958 Err(e) => {
959 tracing::warn!(target: "bubbletea::thread", "Input parser thread panicked: {:?}", e)
960 }
961 }
962 }
963
964 const COMMAND_THREAD_TIMEOUT: Duration = Duration::from_secs(5);
967 let join_deadline = std::time::Instant::now() + COMMAND_THREAD_TIMEOUT;
968
969 if let Ok(mut threads) = command_threads.lock() {
970 let thread_count = threads.len();
971 if thread_count > 0 {
972 debug!(target: "bubbletea::thread", "Waiting for {} command thread(s) to complete", thread_count);
973 }
974
975 for handle in threads.drain(..) {
977 if handle.is_finished() {
978 let _ = handle.join();
980 } else {
981 let remaining =
983 join_deadline.saturating_duration_since(std::time::Instant::now());
984 if remaining.is_zero() {
985 debug!(target: "bubbletea::thread", "Timeout waiting for command threads, abandoning remaining");
986 break;
987 }
988
989 let poll_interval = Duration::from_millis(10);
991 let start = std::time::Instant::now();
992 while !handle.is_finished() && start.elapsed() < remaining {
993 thread::sleep(poll_interval);
994 }
995
996 if handle.is_finished() {
997 match handle.join() {
998 Ok(()) => {
999 debug!(target: "bubbletea::thread", "Command thread joined successfully")
1000 }
1001 Err(e) => {
1002 tracing::warn!(target: "bubbletea::thread", "Command thread panicked: {:?}", e)
1003 }
1004 }
1005 } else {
1006 debug!(target: "bubbletea::thread", "Command thread did not finish in time, abandoning");
1007 }
1008 }
1009 }
1010 } else {
1011 tracing::warn!(target: "bubbletea::thread", "Failed to join command threads: mutex poisoned");
1012 }
1013
1014 Ok(self.model)
1015 }
1016
1017 fn handle_command(
1018 &self,
1019 cmd: Cmd,
1020 tx: Sender<Message>,
1021 command_threads: Arc<Mutex<Vec<thread::JoinHandle<()>>>>,
1022 ) {
1023 let handle = thread::spawn(move || {
1025 if let Some(msg) = cmd.execute() {
1026 if msg.is::<BatchMsg>() {
1028 if let Some(batch) = msg.downcast::<BatchMsg>() {
1029 for cmd in batch.0 {
1030 let tx_clone = tx.clone();
1031 spawn_batch(move || {
1032 if let Some(msg) = cmd.execute()
1033 && tx_clone.send(msg).is_err()
1034 {
1035 debug!(target: "bubbletea::command", "batch command result dropped — receiver disconnected");
1036 }
1037 });
1038 }
1039 }
1040 } else if msg.is::<SequenceMsg>() {
1041 if let Some(seq) = msg.downcast::<SequenceMsg>() {
1042 for cmd in seq.0 {
1043 if let Some(msg) = cmd.execute()
1044 && tx.send(msg).is_err()
1045 {
1046 debug!(target: "bubbletea::command", "sequence command result dropped — receiver disconnected");
1047 break;
1048 }
1049 }
1050 }
1051 } else if tx.send(msg).is_err() {
1052 debug!(target: "bubbletea::command", "command result dropped — receiver disconnected");
1053 }
1054 }
1055 });
1056
1057 if let Ok(mut threads) = command_threads.lock() {
1059 threads.retain(|h| !h.is_finished());
1061 threads.push(handle);
1062 } else {
1063 debug!(target: "bubbletea::thread", "Failed to track command thread: mutex poisoned");
1066 }
1067 }
1068
1069 fn render<W: Write>(&self, writer: &mut W, last_view: &mut String) -> Result<()> {
1070 let view = self.model.view();
1071
1072 if view == *last_view {
1074 return Ok(());
1075 }
1076
1077 execute!(writer, MoveTo(0, 0), Clear(ClearType::All))?;
1079 write!(writer, "{}", view)?;
1080 writer.flush()?;
1081
1082 *last_view = view;
1083 Ok(())
1084 }
1085}
1086
1087#[cfg(feature = "async")]
1092impl<M: Model> Program<M> {
1093 pub async fn run_async(mut self) -> Result<M> {
1114 if let Some(output) = self.output.take() {
1115 return self.run_async_with_writer(output).await;
1116 }
1117
1118 let stdout = io::stdout();
1119 self.run_async_with_writer(stdout).await
1120 }
1121
1122 pub async fn run_async_with_writer<W: Write + Send + 'static>(
1124 self,
1125 mut writer: W,
1126 ) -> Result<M> {
1127 let options = self.options.clone();
1129
1130 if !options.custom_io {
1132 enable_raw_mode()?;
1133 }
1134
1135 if options.alt_screen {
1136 execute!(writer, EnterAlternateScreen)?;
1137 }
1138
1139 execute!(writer, Hide)?;
1140
1141 if options.mouse_all_motion {
1142 execute!(writer, EnableMouseCapture)?;
1143 } else if options.mouse_cell_motion {
1144 execute!(writer, EnableMouseCapture)?;
1145 }
1146
1147 if options.report_focus {
1148 execute!(writer, event::EnableFocusChange)?;
1149 }
1150
1151 if options.bracketed_paste {
1152 execute!(writer, event::EnableBracketedPaste)?;
1153 }
1154
1155 let prev_hook = if !options.without_catch_panics {
1157 let cleanup_opts = options.clone();
1158 let prev = std::panic::take_hook();
1159 std::panic::set_hook(Box::new(move |info| {
1160 let mut stderr = io::stderr();
1161 if cleanup_opts.bracketed_paste {
1162 let _ = execute!(stderr, event::DisableBracketedPaste);
1163 }
1164 if cleanup_opts.report_focus {
1165 let _ = execute!(stderr, event::DisableFocusChange);
1166 }
1167 if cleanup_opts.mouse_all_motion || cleanup_opts.mouse_cell_motion {
1168 let _ = execute!(stderr, DisableMouseCapture);
1169 }
1170 let _ = execute!(stderr, Show);
1171 if cleanup_opts.alt_screen {
1172 let _ = execute!(stderr, LeaveAlternateScreen);
1173 }
1174 if !cleanup_opts.custom_io {
1175 let _ = disable_raw_mode();
1176 }
1177 prev(info);
1178 }));
1179 true
1180 } else {
1181 false
1182 };
1183
1184 let result = self.event_loop_async(&mut writer).await;
1186
1187 if prev_hook {
1189 let _ = std::panic::take_hook();
1190 }
1191
1192 if options.bracketed_paste {
1194 let _ = execute!(writer, event::DisableBracketedPaste);
1195 }
1196
1197 if options.report_focus {
1198 let _ = execute!(writer, event::DisableFocusChange);
1199 }
1200
1201 if options.mouse_all_motion || options.mouse_cell_motion {
1202 let _ = execute!(writer, DisableMouseCapture);
1203 }
1204
1205 let _ = execute!(writer, Show);
1206
1207 if options.alt_screen {
1208 let _ = execute!(writer, LeaveAlternateScreen);
1209 }
1210
1211 if !options.custom_io {
1212 let _ = disable_raw_mode();
1213 }
1214
1215 result
1216 }
1217
1218 async fn event_loop_async<W: Write>(mut self, stdout: &mut W) -> Result<M> {
1219 let (tx, mut rx) = tokio::sync::mpsc::channel::<Message>(256);
1221
1222 let cancel_token = CancellationToken::new();
1224 let task_tracker = TaskTracker::new();
1225
1226 if let Some(ext_rx) = self.external_rx.take() {
1229 let tx_clone = tx.clone();
1230 let cancel_clone = cancel_token.clone();
1231 task_tracker.spawn_blocking(move || {
1232 let timeout = Duration::from_millis(100);
1234 loop {
1235 if cancel_clone.is_cancelled() {
1236 break;
1237 }
1238 match ext_rx.recv_timeout(timeout) {
1239 Ok(msg) => {
1240 if tx_clone.blocking_send(msg).is_err() {
1241 debug!(target: "bubbletea::event", "async external message dropped — receiver disconnected");
1242 break;
1243 }
1244 }
1245 Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
1246 }
1248 Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
1249 break;
1251 }
1252 }
1253 }
1254 });
1255 }
1256
1257 if let Some(mut input) = self.input.take() {
1259 let tx_clone = tx.clone();
1260 let cancel_clone = cancel_token.clone();
1261 task_tracker.spawn_blocking(move || {
1262 let mut parser = InputParser::new();
1263 let mut buf = [0u8; 256];
1264 loop {
1265 if cancel_clone.is_cancelled() {
1266 break;
1267 }
1268 match input.read(&mut buf) {
1269 Ok(0) => break,
1270 Ok(n) => {
1271 let can_have_more_data = true;
1273 for msg in parser.push_bytes(&buf[..n], can_have_more_data) {
1274 if tx_clone.blocking_send(msg).is_err() {
1275 return;
1276 }
1277 }
1278 }
1279 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {
1280 std::thread::yield_now();
1281 }
1282 Err(_) => break,
1283 }
1284 }
1285
1286 for msg in parser.flush() {
1287 if tx_clone.blocking_send(msg).is_err() {
1288 break;
1289 }
1290 }
1291 });
1292 }
1293
1294 let (event_tx, mut event_rx) = tokio::sync::mpsc::channel::<Event>(100);
1296 let event_cancel = cancel_token.clone();
1297
1298 if !self.options.custom_io {
1299 task_tracker.spawn_blocking(move || {
1300 loop {
1301 if event_cancel.is_cancelled() {
1302 break;
1303 }
1304 match event::poll(Duration::from_millis(100)) {
1306 Ok(true) => {
1307 if let Ok(evt) = event::read()
1308 && event_tx.blocking_send(evt).is_err()
1309 {
1310 break;
1311 }
1312 }
1313 Ok(false) => {} Err(_) => {
1315 break;
1316 } }
1318 }
1319 });
1320 }
1321
1322 if !self.options.custom_io {
1324 let (width, height) = terminal::size()?;
1325 if tx
1326 .send(Message::new(WindowSizeMsg { width, height }))
1327 .await
1328 .is_err()
1329 {
1330 debug!(target: "bubbletea::event", "async initial window size dropped — receiver disconnected");
1331 }
1332 }
1333
1334 if let Some(cmd) = self.model.init() {
1336 Self::handle_command_tracked(
1337 cmd.into(),
1338 tx.clone(),
1339 &task_tracker,
1340 cancel_token.clone(),
1341 );
1342 }
1343
1344 let mut last_view = String::new();
1346 self.render(stdout, &mut last_view)?;
1347
1348 let frame_duration = Duration::from_secs_f64(1.0 / self.options.fps as f64);
1350 let mut frame_interval = tokio::time::interval(frame_duration);
1351
1352 loop {
1354 tokio::select! {
1355 Some(event) = event_rx.recv(), if !self.options.custom_io => {
1357 match event {
1358 Event::Key(key_event) => {
1359 if key_event.kind != KeyEventKind::Press {
1361 continue;
1362 }
1363
1364 let key_msg = from_crossterm_key(key_event.code, key_event.modifiers);
1365
1366 if key_msg.key_type == crate::KeyType::CtrlC {
1368 if tx.send(Message::new(InterruptMsg)).await.is_err() {
1369 debug!(target: "bubbletea::event", "async interrupt message dropped — receiver disconnected");
1370 }
1371 } else if tx.send(Message::new(key_msg)).await.is_err() {
1372 debug!(target: "bubbletea::event", "async key message dropped — receiver disconnected");
1373 }
1374 }
1375 Event::Mouse(mouse_event) => {
1376 let mouse_msg = from_crossterm_mouse(mouse_event);
1377 if tx.send(Message::new(mouse_msg)).await.is_err() {
1378 debug!(target: "bubbletea::event", "async mouse message dropped — receiver disconnected");
1379 }
1380 }
1381 Event::Resize(width, height) => {
1382 if tx.send(Message::new(WindowSizeMsg { width, height })).await.is_err() {
1383 debug!(target: "bubbletea::event", "async resize message dropped — receiver disconnected");
1384 }
1385 }
1386 Event::FocusGained => {
1387 if tx.send(Message::new(FocusMsg)).await.is_err() {
1388 debug!(target: "bubbletea::event", "async focus message dropped — receiver disconnected");
1389 }
1390 }
1391 Event::FocusLost => {
1392 if tx.send(Message::new(BlurMsg)).await.is_err() {
1393 debug!(target: "bubbletea::event", "async blur message dropped — receiver disconnected");
1394 }
1395 }
1396 Event::Paste(text) => {
1397 let key_msg = KeyMsg {
1399 key_type: crate::KeyType::Runes,
1400 runes: text.chars().collect(),
1401 alt: false,
1402 paste: true,
1403 };
1404 if tx.send(Message::new(key_msg)).await.is_err() {
1405 debug!(target: "bubbletea::event", "async paste message dropped — receiver disconnected");
1406 }
1407 }
1408 }
1409 }
1410
1411 Some(msg) = rx.recv() => {
1413 if msg.is::<QuitMsg>() {
1415 Self::graceful_shutdown(&cancel_token, &task_tracker).await;
1416 return Ok(self.model);
1417 }
1418
1419 if msg.is::<InterruptMsg>() {
1421 Self::graceful_shutdown(&cancel_token, &task_tracker).await;
1422 return Ok(self.model);
1423 }
1424
1425 if msg.is::<BatchMsg>() {
1427 continue;
1428 }
1429
1430 if let Some(title_msg) = msg.downcast_ref::<SetWindowTitleMsg>() {
1432 execute!(stdout, terminal::SetTitle(&title_msg.0))?;
1433 continue;
1434 }
1435
1436 if msg.is::<RequestWindowSizeMsg>() {
1438 if !self.options.custom_io {
1439 let (width, height) = terminal::size()?;
1440 if tx.send(Message::new(WindowSizeMsg { width, height })).await.is_err() {
1441 debug!(target: "bubbletea::event", "async window size response dropped — receiver disconnected");
1442 }
1443 }
1444 continue;
1445 }
1446
1447 if let Some(print_msg) = msg.downcast_ref::<PrintLineMsg>() {
1449 if !self.options.alt_screen {
1450 for line in print_msg.0.lines() {
1452 let _ = writeln!(stdout, "{}", line);
1453 }
1454 let _ = stdout.flush();
1455 last_view.clear();
1457 }
1458 self.render(stdout, &mut last_view)?;
1459 continue;
1460 }
1461
1462 if msg.is::<ReleaseTerminalMsg>() {
1464 if !self.options.custom_io {
1465 if self.options.bracketed_paste {
1467 let _ = execute!(stdout, event::DisableBracketedPaste);
1468 }
1469 if self.options.report_focus {
1470 let _ = execute!(stdout, event::DisableFocusChange);
1471 }
1472 if self.options.mouse_all_motion || self.options.mouse_cell_motion {
1473 let _ = execute!(stdout, DisableMouseCapture);
1474 }
1475 let _ = execute!(stdout, Show);
1476 if self.options.alt_screen {
1477 let _ = execute!(stdout, LeaveAlternateScreen);
1478 }
1479 let _ = disable_raw_mode();
1480 }
1481 continue;
1482 }
1483
1484 if msg.is::<RestoreTerminalMsg>() {
1486 if !self.options.custom_io {
1487 let _ = enable_raw_mode();
1489 if self.options.alt_screen {
1490 let _ = execute!(stdout, EnterAlternateScreen);
1491 }
1492 let _ = execute!(stdout, Hide);
1493 if self.options.mouse_all_motion {
1494 let _ = execute!(stdout, EnableMouseCapture);
1495 } else if self.options.mouse_cell_motion {
1496 let _ = execute!(stdout, EnableMouseCapture);
1497 }
1498 if self.options.report_focus {
1499 let _ = execute!(stdout, event::EnableFocusChange);
1500 }
1501 if self.options.bracketed_paste {
1502 let _ = execute!(stdout, event::EnableBracketedPaste);
1503 }
1504 last_view.clear();
1506 }
1507 self.render(stdout, &mut last_view)?;
1508 continue;
1509 }
1510
1511 if let Some(cmd) = self.model.update(msg) {
1513 Self::handle_command_tracked(
1514 cmd.into(),
1515 tx.clone(),
1516 &task_tracker,
1517 cancel_token.clone(),
1518 );
1519 }
1520
1521 self.render(stdout, &mut last_view)?;
1523 }
1524
1525 _ = frame_interval.tick() => {
1527 }
1529 }
1530 }
1531 }
1532
1533 async fn graceful_shutdown(cancel_token: &CancellationToken, task_tracker: &TaskTracker) {
1535 cancel_token.cancel();
1537
1538 task_tracker.close();
1540
1541 let shutdown_timeout = Duration::from_secs(5);
1543 let _ = tokio::time::timeout(shutdown_timeout, task_tracker.wait()).await;
1544 }
1545
1546 fn handle_command_tracked(
1548 cmd: CommandKind,
1549 tx: tokio::sync::mpsc::Sender<Message>,
1550 tracker: &TaskTracker,
1551 cancel_token: CancellationToken,
1552 ) {
1553 let batch_tracker = tracker.clone();
1556 let batch_cancel = cancel_token.clone();
1557 tracker.spawn(async move {
1558 tokio::select! {
1559 result = cmd.execute() => {
1561 if let Some(msg) = result {
1562 if msg.is::<BatchMsg>() {
1564 if let Some(batch) = msg.downcast::<BatchMsg>() {
1565 for cmd in batch.0 {
1566 let tx_clone = tx.clone();
1567 let cancel = batch_cancel.clone();
1568 batch_tracker.spawn(async move {
1571 tokio::select! {
1572 result = async {
1573 let cmd_kind: CommandKind = cmd.into();
1574 cmd_kind.execute().await
1575 } => {
1576 if let Some(msg) = result {
1577 if tx_clone.send(msg).await.is_err() {
1578 debug!(target: "bubbletea::command", "async batch command result dropped — receiver disconnected");
1579 }
1580 }
1581 }
1582 _ = cancel.cancelled() => {
1583 debug!(target: "bubbletea::command", "async batch command cancelled during shutdown");
1584 }
1585 }
1586 });
1587 }
1588 }
1589 } else if msg.is::<SequenceMsg>() {
1590 if let Some(seq) = msg.downcast::<SequenceMsg>() {
1591 for cmd in seq.0 {
1592 let cmd_kind: CommandKind = cmd.into();
1593 if let Some(msg) = cmd_kind.execute().await {
1594 if tx.send(msg).await.is_err() {
1595 debug!(target: "bubbletea::command", "async sequence command result dropped — receiver disconnected");
1596 break;
1597 }
1598 }
1599 }
1600 }
1601 } else if tx.send(msg).await.is_err() {
1602 debug!(target: "bubbletea::command", "async command result dropped — receiver disconnected");
1603 }
1604 }
1605 }
1606 _ = cancel_token.cancelled() => {
1608 }
1610 }
1611 });
1612 }
1613
1614 #[allow(dead_code)]
1616 fn handle_command_async(&self, cmd: CommandKind, tx: tokio::sync::mpsc::Sender<Message>) {
1617 tokio::spawn(async move {
1618 if let Some(msg) = cmd.execute().await {
1619 if msg.is::<BatchMsg>() {
1621 if let Some(batch) = msg.downcast::<BatchMsg>() {
1622 for cmd in batch.0 {
1623 let tx_clone = tx.clone();
1624 tokio::spawn(async move {
1625 let cmd_kind: CommandKind = cmd.into();
1626 if let Some(msg) = cmd_kind.execute().await {
1627 if tx_clone.send(msg).await.is_err() {
1628 debug!(target: "bubbletea::command", "legacy async batch command result dropped — receiver disconnected");
1629 }
1630 }
1631 });
1632 }
1633 }
1634 } else if msg.is::<SequenceMsg>() {
1635 if let Some(seq) = msg.downcast::<SequenceMsg>() {
1636 for cmd in seq.0 {
1637 let cmd_kind: CommandKind = cmd.into();
1638 if let Some(msg) = cmd_kind.execute().await {
1639 if tx.send(msg).await.is_err() {
1640 debug!(target: "bubbletea::command", "legacy async sequence command result dropped — receiver disconnected");
1641 break;
1642 }
1643 }
1644 }
1645 }
1646 } else if tx.send(msg).await.is_err() {
1647 debug!(target: "bubbletea::command", "legacy async command result dropped — receiver disconnected");
1648 }
1649 }
1650 });
1651 }
1652}
1653
/// Incremental parser that turns raw terminal input bytes into [`Message`]s.
///
/// Bytes that do not yet form a complete message are kept in `buffer` until
/// more input is pushed or the parser is flushed.
struct InputParser {
    /// Bytes received so far that have not been consumed by a parsed message.
    buffer: Vec<u8>,
}
1661
1662impl InputParser {
1663 fn new() -> Self {
1664 Self { buffer: Vec::new() }
1665 }
1666
1667 const MAX_BUFFER: usize = 1024 * 1024;
1670
1671 fn push_bytes(&mut self, bytes: &[u8], can_have_more_data: bool) -> Vec<Message> {
1672 if !bytes.is_empty() {
1673 if self.buffer.len() + bytes.len() > Self::MAX_BUFFER {
1674 debug!(
1675 target: "bubbletea::input",
1676 "Input buffer exceeded 1MB limit, draining"
1677 );
1678 self.buffer.clear();
1679 }
1680 self.buffer.extend_from_slice(bytes);
1681 }
1682
1683 let mut messages = Vec::new();
1684 loop {
1685 if self.buffer.is_empty() {
1686 break;
1687 }
1688
1689 match parse_one_message(&self.buffer, can_have_more_data) {
1690 ParseOutcome::NeedMore => break,
1691 ParseOutcome::Parsed(consumed, msg) => {
1692 self.buffer.drain(0..consumed);
1693 if let Some(msg) = msg {
1694 messages.push(msg);
1695 }
1696 }
1697 }
1698 }
1699
1700 messages
1701 }
1702
1703 fn flush(&mut self) -> Vec<Message> {
1704 let mut messages = Vec::new();
1705 loop {
1706 if self.buffer.is_empty() {
1707 break;
1708 }
1709
1710 match parse_one_message(&self.buffer, false) {
1711 ParseOutcome::NeedMore => break,
1712 ParseOutcome::Parsed(consumed, msg) => {
1713 self.buffer.drain(0..consumed);
1714 if let Some(msg) = msg {
1715 messages.push(msg);
1716 }
1717 }
1718 }
1719 }
1720 messages
1721 }
1722}
1723
/// Result of trying to parse one message from the front of a byte buffer.
enum ParseOutcome {
    /// The buffered bytes are not enough to decide; nothing was consumed.
    NeedMore,
    /// The given number of bytes was consumed from the front of the buffer,
    /// optionally producing a message.
    Parsed(usize, Option<Message>),
}
1728
1729fn parse_one_message(buf: &[u8], can_have_more_data: bool) -> ParseOutcome {
1730 if buf.is_empty() {
1731 return ParseOutcome::NeedMore;
1732 }
1733
1734 if let Some(outcome) = parse_mouse_event(buf, can_have_more_data) {
1735 return outcome;
1736 }
1737
1738 if let Some(outcome) = parse_focus_event(buf, can_have_more_data) {
1739 return outcome;
1740 }
1741
1742 if let Some(outcome) = parse_bracketed_paste(buf, can_have_more_data) {
1743 return outcome;
1744 }
1745
1746 if let Some(outcome) = parse_key_sequence(buf, can_have_more_data) {
1747 return outcome;
1748 }
1749
1750 parse_runes_or_control(buf, can_have_more_data)
1751}
1752
1753fn parse_mouse_event(buf: &[u8], can_have_more_data: bool) -> Option<ParseOutcome> {
1754 if buf.starts_with(b"\x1b[M") {
1755 if buf.len() < 6 {
1756 return Some(if can_have_more_data {
1757 ParseOutcome::NeedMore
1758 } else {
1759 ParseOutcome::Parsed(1, Some(replacement_message()))
1760 });
1761 }
1762 let seq = &buf[..6];
1763 return Some(match crate::mouse::parse_mouse_event_sequence(seq) {
1764 Ok(msg) => ParseOutcome::Parsed(6, Some(Message::new(msg))),
1765 Err(_) => ParseOutcome::Parsed(1, Some(replacement_message())),
1766 });
1767 }
1768
1769 if buf.starts_with(b"\x1b[<") {
1770 if let Some(end_idx) = buf.iter().position(|b| *b == b'M' || *b == b'm') {
1771 let seq = &buf[..=end_idx];
1772 return Some(match crate::mouse::parse_mouse_event_sequence(seq) {
1773 Ok(msg) => ParseOutcome::Parsed(seq.len(), Some(Message::new(msg))),
1774 Err(_) => ParseOutcome::Parsed(1, Some(replacement_message())),
1775 });
1776 }
1777 return Some(if can_have_more_data {
1778 ParseOutcome::NeedMore
1779 } else {
1780 ParseOutcome::Parsed(1, Some(replacement_message()))
1781 });
1782 }
1783
1784 None
1785}
1786
1787fn parse_focus_event(buf: &[u8], can_have_more_data: bool) -> Option<ParseOutcome> {
1788 if buf.len() < 3 && buf.starts_with(b"\x1b[") && can_have_more_data {
1789 return Some(ParseOutcome::NeedMore);
1790 }
1791
1792 if buf.starts_with(b"\x1b[I") {
1793 return Some(ParseOutcome::Parsed(3, Some(Message::new(FocusMsg))));
1794 }
1795
1796 if buf.starts_with(b"\x1b[O") {
1797 return Some(ParseOutcome::Parsed(3, Some(Message::new(BlurMsg))));
1798 }
1799
1800 None
1801}
1802
1803fn parse_bracketed_paste(buf: &[u8], can_have_more_data: bool) -> Option<ParseOutcome> {
1804 const BP_START: &[u8] = b"\x1b[200~";
1805 const BP_END: &[u8] = b"\x1b[201~";
1806
1807 if !buf.starts_with(BP_START) {
1808 return None;
1809 }
1810
1811 if let Some(idx) = buf.windows(BP_END.len()).position(|w| w == BP_END) {
1812 let content = &buf[BP_START.len()..idx];
1813 let text = String::from_utf8_lossy(content);
1814 let runes = text.chars().collect::<Vec<char>>();
1815 let key = KeyMsg::from_runes(runes).with_paste();
1816 let total_len = idx + BP_END.len();
1817 return Some(ParseOutcome::Parsed(total_len, Some(message_from_key(key))));
1818 }
1819
1820 Some(if can_have_more_data {
1821 ParseOutcome::NeedMore
1822 } else {
1823 let content = &buf[BP_START.len()..];
1824 let text = String::from_utf8_lossy(content);
1825 let runes = text.chars().collect::<Vec<char>>();
1826 let key = KeyMsg::from_runes(runes).with_paste();
1827 ParseOutcome::Parsed(buf.len(), Some(message_from_key(key)))
1828 })
1829}
1830
1831fn parse_key_sequence(buf: &[u8], can_have_more_data: bool) -> Option<ParseOutcome> {
1832 if let Some((key, len)) = crate::key::parse_sequence_prefix(buf) {
1833 return Some(ParseOutcome::Parsed(len, Some(message_from_key(key))));
1834 }
1835
1836 if can_have_more_data && is_sequence_prefix(buf) {
1838 return Some(ParseOutcome::NeedMore);
1839 }
1840
1841 if buf.starts_with(b"\x1b")
1842 && let Some((mut key, len)) = crate::key::parse_sequence_prefix(&buf[1..])
1843 {
1844 if !key.alt {
1845 key = key.with_alt();
1846 }
1847 return Some(ParseOutcome::Parsed(len + 1, Some(message_from_key(key))));
1848 }
1849
1850 None
1851}
1852
1853fn parse_runes_or_control(buf: &[u8], can_have_more_data: bool) -> ParseOutcome {
1854 let mut alt = false;
1855 let mut idx = 0;
1856
1857 if buf[0] == 0x1b {
1858 if buf.len() == 1 {
1859 return if can_have_more_data {
1860 ParseOutcome::NeedMore
1861 } else {
1862 ParseOutcome::Parsed(1, Some(message_from_key(KeyMsg::from_type(KeyType::Esc))))
1863 };
1864 }
1865 alt = true;
1866 idx = 1;
1867 }
1868
1869 if idx >= buf.len() {
1870 return ParseOutcome::NeedMore;
1871 }
1872
1873 if let Some(key_type) = control_key_type(buf[idx]) {
1874 let mut key = KeyMsg::from_type(key_type);
1875 if alt {
1876 key = key.with_alt();
1877 }
1878 return ParseOutcome::Parsed(idx + 1, Some(message_from_key(key)));
1879 }
1880
1881 let mut runes = Vec::new();
1882 let mut i = idx;
1883 while i < buf.len() {
1884 let b = buf[i];
1885 if is_control_or_space(b) {
1886 break;
1887 }
1888
1889 let (ch, width, valid) = match decode_char(&buf[i..], can_have_more_data) {
1890 DecodeOutcome::NeedMore => return ParseOutcome::NeedMore,
1891 DecodeOutcome::Decoded(ch, width, valid) => (ch, width, valid),
1892 };
1893
1894 if !valid {
1895 runes.push(std::char::REPLACEMENT_CHARACTER);
1896 i += 1;
1897 } else {
1898 runes.push(ch);
1899 i += width;
1900 }
1901
1902 if alt {
1903 break;
1904 }
1905 }
1906
1907 if !runes.is_empty() {
1908 let mut key = KeyMsg::from_runes(runes);
1909 if alt {
1910 key = key.with_alt();
1911 }
1912 return ParseOutcome::Parsed(i, Some(message_from_key(key)));
1913 }
1914
1915 ParseOutcome::Parsed(1, Some(replacement_message()))
1916}
1917
1918fn control_key_type(byte: u8) -> Option<KeyType> {
1919 match byte {
1920 0x00 => Some(KeyType::Null),
1921 0x01 => Some(KeyType::CtrlA),
1922 0x02 => Some(KeyType::CtrlB),
1923 0x03 => Some(KeyType::CtrlC),
1924 0x04 => Some(KeyType::CtrlD),
1925 0x05 => Some(KeyType::CtrlE),
1926 0x06 => Some(KeyType::CtrlF),
1927 0x07 => Some(KeyType::CtrlG),
1928 0x08 => Some(KeyType::CtrlH),
1929 0x09 => Some(KeyType::Tab),
1930 0x0A => Some(KeyType::CtrlJ),
1931 0x0B => Some(KeyType::CtrlK),
1932 0x0C => Some(KeyType::CtrlL),
1933 0x0D => Some(KeyType::Enter),
1934 0x0E => Some(KeyType::CtrlN),
1935 0x0F => Some(KeyType::CtrlO),
1936 0x10 => Some(KeyType::CtrlP),
1937 0x11 => Some(KeyType::CtrlQ),
1938 0x12 => Some(KeyType::CtrlR),
1939 0x13 => Some(KeyType::CtrlS),
1940 0x14 => Some(KeyType::CtrlT),
1941 0x15 => Some(KeyType::CtrlU),
1942 0x16 => Some(KeyType::CtrlV),
1943 0x17 => Some(KeyType::CtrlW),
1944 0x18 => Some(KeyType::CtrlX),
1945 0x19 => Some(KeyType::CtrlY),
1946 0x1A => Some(KeyType::CtrlZ),
1947 0x1B => Some(KeyType::Esc),
1948 0x1C => Some(KeyType::CtrlBackslash),
1949 0x1D => Some(KeyType::CtrlCloseBracket),
1950 0x1E => Some(KeyType::CtrlCaret),
1951 0x1F => Some(KeyType::CtrlUnderscore),
1952 0x20 => Some(KeyType::Space),
1953 0x7F => Some(KeyType::Backspace),
1954 _ => None,
1955 }
1956}
1957
/// Returns `true` for C0 control bytes (`0x00..=0x1F`), DEL (`0x7F`) and the
/// ASCII space.
fn is_control_or_space(byte: u8) -> bool {
    matches!(byte, 0x00..=0x1F | 0x7F | b' ')
}
1961
/// Result of trying to decode one UTF-8 character from the front of a byte
/// slice.
enum DecodeOutcome {
    /// The sequence is cut off and later input may still complete it.
    NeedMore,
    /// `(character, bytes to consume, valid)`.
    ///
    /// When `valid` is `false` the character is U+FFFD and exactly one byte
    /// should be consumed.
    Decoded(char, usize, bool),
}

/// Decodes the UTF-8 character that starts at `input[0]`.
///
/// The expected length is derived from the lead byte. Outcomes:
///
/// * a well-formed character: `Decoded(ch, width, true)`;
/// * an impossible lead byte or a malformed sequence:
///   `Decoded(U+FFFD, 1, false)`;
/// * a sequence truncated by the end of `input`: `NeedMore` when
///   `can_have_more_data` is set **and** the bytes present so far could still
///   be completed into a valid character; otherwise `Decoded(U+FFFD, 1, false)`.
///
/// A truncated sequence whose available bytes already prove it invalid (for
/// example `0xE4` followed by an ASCII byte) is reported as invalid right
/// away instead of waiting for input that cannot repair it.
///
/// # Panics
///
/// Panics if `input` is empty.
fn decode_char(input: &[u8], can_have_more_data: bool) -> DecodeOutcome {
    let invalid = || DecodeOutcome::Decoded(std::char::REPLACEMENT_CHARACTER, 1, false);

    let width = match input[0] {
        0x00..=0x7F => 1,
        lead if lead & 0xE0 == 0xC0 => 2,
        lead if lead & 0xF0 == 0xE0 => 3,
        lead if lead & 0xF8 == 0xF0 => 4,
        // Stray continuation byte or a byte that never starts a character.
        _ => return invalid(),
    };

    let Some(candidate) = input.get(..width) else {
        // Fewer bytes than the lead byte announced. `error_len()` is `Some`
        // when the validator hit a definitely-invalid byte and `None` when it
        // merely ran out of input, which is the only case worth waiting on.
        let provably_invalid = matches!(
            std::str::from_utf8(input),
            Err(err) if err.error_len().is_some()
        );
        return if can_have_more_data && !provably_invalid {
            DecodeOutcome::NeedMore
        } else {
            invalid()
        };
    };

    match std::str::from_utf8(candidate) {
        Ok(text) => {
            let ch = text
                .chars()
                .next()
                .unwrap_or(std::char::REPLACEMENT_CHARACTER);
            DecodeOutcome::Decoded(ch, width, true)
        }
        Err(_) => invalid(),
    }
}
1997
1998fn message_from_key(key: KeyMsg) -> Message {
1999 if key.key_type == KeyType::CtrlC {
2000 Message::new(InterruptMsg)
2001 } else {
2002 Message::new(key)
2003 }
2004}
2005
2006fn replacement_message() -> Message {
2007 Message::new(KeyMsg::from_char(std::char::REPLACEMENT_CHARACTER))
2008}
2009
// Unit tests for program options, bracketed-paste parsing, batch command
// dispatch and the thread / task lifecycle patterns used for shutdown.
#[cfg(test)]
mod tests {
    use super::*;
    use tokio_util::sync::CancellationToken;
    use tokio_util::task::TaskTracker;

    // Minimal model: adds every `i32` message it receives to `count`.
    struct TestModel {
        count: i32,
    }

    impl Model for TestModel {
        fn init(&self) -> Option<Cmd> {
            None
        }

        fn update(&mut self, msg: Message) -> Option<Cmd> {
            if let Some(n) = msg.downcast::<i32>() {
                self.count += n;
            }
            None
        }

        fn view(&self) -> String {
            format!("Count: {}", self.count)
        }
    }

    // Default options: no alt screen, no mouse cell motion, bracketed paste
    // enabled, 60 fps.
    #[test]
    fn test_program_options_default() {
        let opts = ProgramOptions::default();
        assert!(!opts.alt_screen);
        assert!(!opts.mouse_cell_motion);
        assert!(opts.bracketed_paste);
        assert_eq!(opts.fps, 60);
    }

    // Builder methods must be reflected in the stored options.
    #[test]
    fn test_program_builder() {
        let model = TestModel { count: 0 };
        let program = Program::new(model)
            .with_alt_screen()
            .with_mouse_cell_motion()
            .with_fps(30);

        assert!(program.options.alt_screen);
        assert!(program.options.mouse_cell_motion);
        assert_eq!(program.options.fps, 30);
    }

    // Requesting 200 fps is expected to be stored as 120.
    #[test]
    fn test_program_fps_max() {
        let model = TestModel { count: 0 };
        let program = Program::new(model).with_fps(200);
        assert_eq!(program.options.fps, 120); }

    // Requesting 0 fps is expected to be stored as 1.
    #[test]
    fn test_program_fps_min() {
        let model = TestModel { count: 0 };
        let program = Program::new(model).with_fps(0);
        assert_eq!(program.options.fps, 1); }

    // A complete paste yields one paste-flagged key with the enclosed text
    // and consumes both markers.
    #[test]
    fn test_parse_bracketed_paste_basic() {
        let input = b"\x1b[200~hello world\x1b[201~";
        let result = parse_bracketed_paste(input, false);

        assert!(result.is_some());
        if let Some(ParseOutcome::Parsed(len, Some(msg))) = result {
            assert_eq!(len, input.len());
            let key = msg.downcast_ref::<KeyMsg>().unwrap();
            assert!(key.paste, "Key should have paste flag set");
            assert_eq!(
                key.runes,
                vec!['h', 'e', 'l', 'l', 'o', ' ', 'w', 'o', 'r', 'l', 'd']
            );
        } else {
            panic!("Expected Parsed outcome");
        }
    }

    // Start marker immediately followed by end marker: empty paste.
    #[test]
    fn test_parse_bracketed_paste_empty() {
        let input = b"\x1b[200~\x1b[201~";
        let result = parse_bracketed_paste(input, false);

        assert!(result.is_some());
        if let Some(ParseOutcome::Parsed(len, Some(msg))) = result {
            assert_eq!(len, input.len());
            let key = msg.downcast_ref::<KeyMsg>().unwrap();
            assert!(key.paste);
            assert!(key.runes.is_empty());
        } else {
            panic!("Expected Parsed outcome");
        }
    }

    // Newlines inside a paste are kept as part of the pasted text.
    #[test]
    fn test_parse_bracketed_paste_multiline() {
        let input = b"\x1b[200~line1\nline2\nline3\x1b[201~";
        let result = parse_bracketed_paste(input, false);

        assert!(result.is_some());
        if let Some(ParseOutcome::Parsed(len, Some(msg))) = result {
            assert_eq!(len, input.len());
            let key = msg.downcast_ref::<KeyMsg>().unwrap();
            assert!(key.paste);
            let text: String = key.runes.iter().collect();
            assert_eq!(text, "line1\nline2\nline3");
        } else {
            panic!("Expected Parsed outcome");
        }
    }

    // Multi-byte UTF-8 content survives the paste round trip.
    #[test]
    fn test_parse_bracketed_paste_unicode() {
        let input = "\x1b[200~hello 世界 🌍\x1b[201~".as_bytes();
        let result = parse_bracketed_paste(input, false);

        assert!(result.is_some());
        if let Some(ParseOutcome::Parsed(_, Some(msg))) = result {
            let key = msg.downcast_ref::<KeyMsg>().unwrap();
            assert!(key.paste);
            let text: String = key.runes.iter().collect();
            assert_eq!(text, "hello 世界 🌍");
        } else {
            panic!("Expected Parsed outcome");
        }
    }

    // Missing end marker while more data may arrive: wait for more input.
    #[test]
    fn test_parse_bracketed_paste_incomplete() {
        let input = b"\x1b[200~hello";
        let result = parse_bracketed_paste(input, true);

        assert!(matches!(result, Some(ParseOutcome::NeedMore)));
    }

    // Missing end marker and no more data: deliver what was received.
    #[test]
    fn test_parse_bracketed_paste_incomplete_no_more_data() {
        let input = b"\x1b[200~hello";
        let result = parse_bracketed_paste(input, false);

        assert!(result.is_some());
        if let Some(ParseOutcome::Parsed(len, Some(msg))) = result {
            assert_eq!(len, input.len());
            let key = msg.downcast_ref::<KeyMsg>().unwrap();
            assert!(key.paste);
            let text: String = key.runes.iter().collect();
            assert_eq!(text, "hello");
        } else {
            panic!("Expected Parsed outcome");
        }
    }

    // Input without the start marker is not claimed by the paste parser.
    #[test]
    fn test_parse_bracketed_paste_not_bracketed() {
        let input = b"hello";
        let result = parse_bracketed_paste(input, false);
        assert!(result.is_none(), "Non-paste input should return None");
    }

    // A 10 000-character paste is delivered as a single key message.
    #[test]
    fn test_parse_bracketed_paste_large() {
        let content = "a".repeat(10000);
        let input = format!("\x1b[200~{}\x1b[201~", content);
        let result = parse_bracketed_paste(input.as_bytes(), false);

        assert!(result.is_some());
        if let Some(ParseOutcome::Parsed(len, Some(msg))) = result {
            assert_eq!(len, input.len());
            let key = msg.downcast_ref::<KeyMsg>().unwrap();
            assert!(key.paste);
            assert_eq!(key.runes.len(), 10000);
        } else {
            panic!("Expected Parsed outcome");
        }
    }

    // `spawn_batch` must actually run the closure it is given.
    #[test]
    fn spawn_batch_executes_closure() {
        use std::sync::Arc;
        use std::sync::atomic::{AtomicBool, Ordering};

        let executed = Arc::new(AtomicBool::new(false));
        let clone = executed.clone();

        spawn_batch(move || {
            clone.store(true, Ordering::SeqCst);
        });

        // Fixed grace period for the spawned work to run before checking.
        thread::sleep(Duration::from_millis(200));
        assert!(
            executed.load(Ordering::SeqCst),
            "spawn_batch should execute the closure"
        );
    }

    // 200 closures submitted back to back must all run within five seconds.
    #[test]
    fn spawn_batch_handles_many_concurrent_tasks() {
        use std::sync::Arc;
        use std::sync::atomic::{AtomicUsize, Ordering};

        let counter = Arc::new(AtomicUsize::new(0));
        let task_count = 200;

        for _ in 0..task_count {
            let c = counter.clone();
            spawn_batch(move || {
                c.fetch_add(1, Ordering::SeqCst);
            });
        }

        // Poll until every task has reported in or the deadline passes.
        let deadline = std::time::Instant::now() + Duration::from_secs(5);
        while counter.load(Ordering::SeqCst) < task_count && std::time::Instant::now() < deadline {
            thread::sleep(Duration::from_millis(50));
        }

        assert_eq!(
            counter.load(Ordering::SeqCst),
            task_count,
            "All {task_count} tasks should complete"
        );
    }

    // A batch of 50 commands must deliver each command's value exactly once.
    #[test]
    fn handle_command_batch_executes_all_subcommands() {
        let model = TestModel { count: 0 };
        let program = Program::new(model);
        let (tx, rx) = mpsc::channel();
        let command_threads = Arc::new(Mutex::new(Vec::new()));

        let cmds: Vec<Option<Cmd>> = (0..50)
            .map(|i| Some(Cmd::new(move || Message::new(i))))
            .collect();
        let batch_cmd = crate::batch(cmds).unwrap();

        program.handle_command(batch_cmd, tx, Arc::clone(&command_threads));

        let mut results = Vec::new();
        // Collect results until all 50 arrived or five seconds elapsed.
        let deadline = std::time::Instant::now() + Duration::from_secs(5);
        while results.len() < 50 && std::time::Instant::now() < deadline {
            if let Ok(msg) = rx.recv_timeout(Duration::from_millis(100)) {
                results.push(msg.downcast::<i32>().unwrap());
            }
        }

        assert_eq!(
            results.len(),
            50,
            "All 50 batch sub-commands should produce results"
        );
        // Arrival order is not asserted; compare as a sorted list.
        results.sort();
        let expected: Vec<i32> = (0..50).collect();
        assert_eq!(
            results, expected,
            "Each sub-command value should be received exactly once"
        );
    }

    // With the thread-pool feature, the number of batch commands observed
    // running at the same time must stay close to the available parallelism.
    #[cfg(feature = "thread-pool")]
    #[test]
    fn handle_command_batch_bounded_parallelism() {
        use std::sync::atomic::{AtomicUsize, Ordering};

        let model = TestModel { count: 0 };
        let program = Program::new(model);
        let (tx, rx) = mpsc::channel();
        let command_threads = Arc::new(Mutex::new(Vec::new()));

        // `active` counts commands currently running; `max_active` records
        // the highest value `active` ever reached.
        let active = Arc::new(AtomicUsize::new(0));
        let max_active = Arc::new(AtomicUsize::new(0));

        let task_count: usize = 100;
        let cmds: Vec<Option<Cmd>> = (0..task_count)
            .map(|_| {
                let a = active.clone();
                let m = max_active.clone();
                Some(Cmd::new(move || {
                    let current = a.fetch_add(1, Ordering::SeqCst) + 1;
                    m.fetch_max(current, Ordering::SeqCst);
                    thread::sleep(Duration::from_millis(5));
                    a.fetch_sub(1, Ordering::SeqCst);
                    Message::new(1i32)
                }))
            })
            .collect();
        let batch_cmd = crate::batch(cmds).unwrap();

        program.handle_command(batch_cmd, tx, Arc::clone(&command_threads));

        let mut count = 0usize;
        // Wait up to 15 seconds for every command to report back.
        let deadline = std::time::Instant::now() + Duration::from_secs(15);
        while count < task_count && std::time::Instant::now() < deadline {
            if let Ok(_msg) = rx.recv_timeout(Duration::from_millis(100)) {
                count += 1;
            }
        }

        assert_eq!(count, task_count, "All batch commands should complete");

        let observed_max = max_active.load(Ordering::SeqCst);
        let num_cpus = thread::available_parallelism()
            .map(|n| n.get())
            .unwrap_or(4);

        // A slack of two on top of the CPU count is tolerated.
        assert!(
            observed_max <= num_cpus + 2,
            "Expected bounded parallelism near {num_cpus}, but observed {observed_max}. \
             Without thread-pool feature, this would be near {task_count}."
        );
    }

    // A batch of 500 commands must complete and deliver 500 results.
    #[test]
    fn handle_command_large_batch_no_panic() {
        let model = TestModel { count: 0 };
        let program = Program::new(model);
        let (tx, rx) = mpsc::channel();
        let command_threads = Arc::new(Mutex::new(Vec::new()));

        let cmds: Vec<Option<Cmd>> = (0..500)
            .map(|i| Some(Cmd::new(move || Message::new(i))))
            .collect();
        let batch_cmd = crate::batch(cmds).unwrap();

        program.handle_command(batch_cmd, tx, Arc::clone(&command_threads));

        let mut count = 0usize;
        // Wait up to ten seconds for all results.
        let deadline = std::time::Instant::now() + Duration::from_secs(10);
        while count < 500 && std::time::Instant::now() < deadline {
            if let Ok(_msg) = rx.recv_timeout(Duration::from_millis(50)) {
                count += 1;
            }
        }

        assert_eq!(count, 500, "Large batch should complete without panic");
    }

    // A join handle stored in an `Option` can be taken out and joined.
    #[test]
    fn test_thread_handles_captured() {
        let handle: Option<thread::JoinHandle<()>> = Some(thread::spawn(|| {
        }));

        assert!(handle.is_some(), "Handle should be captured");

        if let Some(h) = handle {
            h.join().expect("Thread should join successfully");
        }
    }

    // A thread blocked in `recv()` must leave its loop once the sender is
    // dropped.
    #[test]
    fn test_threads_exit_on_channel_drop() {
        use std::sync::Arc;
        use std::sync::atomic::{AtomicBool, Ordering};

        let thread_exited = Arc::new(AtomicBool::new(false));
        let thread_exited_clone = Arc::clone(&thread_exited);

        let (tx, rx) = mpsc::channel::<i32>();

        let handle = thread::spawn(move || {
            while rx.recv().is_ok() {}
            thread_exited_clone.store(true, Ordering::SeqCst);
        });

        assert!(!thread_exited.load(Ordering::SeqCst));

        // Dropping the only sender makes `recv()` return an error.
        drop(tx);

        handle
            .join()
            .expect("Thread should join after channel drop");

        assert!(
            thread_exited.load(Ordering::SeqCst),
            "Thread should have exited after channel drop"
        );
    }

    // Joining every stored handle guarantees all thread bodies have run.
    #[test]
    fn test_shutdown_joins_all_threads() {
        use std::sync::Arc;
        use std::sync::atomic::{AtomicUsize, Ordering};

        let join_count = Arc::new(AtomicUsize::new(0));

        let mut handles: Vec<thread::JoinHandle<()>> = Vec::new();

        // Three threads that finish after 10, 20 and 30 ms respectively.
        for i in 0..3 {
            let join_count_clone = Arc::clone(&join_count);
            handles.push(thread::spawn(move || {
                thread::sleep(Duration::from_millis(10 * (i as u64 + 1)));
                join_count_clone.fetch_add(1, Ordering::SeqCst);
            }));
        }

        for handle in handles {
            match handle.join() {
                Ok(()) => {} Err(e) => panic!("Thread panicked during join: {:?}", e),
            }
        }

        assert_eq!(
            join_count.load(Ordering::SeqCst),
            3,
            "All threads should have completed and been joined"
        );
    }

    // Joining a panicked thread returns `Err` instead of propagating the
    // panic to the joining thread.
    #[test]
    fn test_thread_panic_handled_gracefully() {
        let handle = thread::spawn(|| {
            panic!("Intentional panic for testing");
        });

        let result = handle.join();
        let e = result.expect_err("Join should return Err when thread panics");
        // The panic payload can be formatted for logging.
        let _panic_info = format!("{:?}", e);
    }

    // A forwarder thread relays messages from one channel to another, in
    // order, and exits once the source sender is dropped.
    #[test]
    fn test_external_forwarder_pattern() {
        let (external_tx, external_rx) = mpsc::channel::<Message>();
        let (event_tx, event_rx) = mpsc::channel::<Message>();

        let tx_clone = event_tx.clone();
        let handle = thread::spawn(move || {
            while let Ok(msg) = external_rx.recv() {
                if tx_clone.send(msg).is_err() {
                    break;
                }
            }
        });

        external_tx.send(Message::new(1i32)).unwrap();
        external_tx.send(Message::new(2i32)).unwrap();
        external_tx.send(Message::new(3i32)).unwrap();

        // Closing the source ends the forwarder's receive loop.
        drop(external_tx);

        let join_result = handle.join();
        assert!(
            join_result.is_ok(),
            "Forwarder thread should exit cleanly when sender is dropped"
        );

        let mut received = Vec::new();
        // The forwarder has been joined, so everything it relayed is already
        // queued; a non-blocking drain is enough.
        while let Ok(msg) = event_rx.try_recv() {
            if let Some(&n) = msg.downcast_ref::<i32>() {
                received.push(n);
            }
        }
        assert_eq!(received, vec![1, 2, 3], "All messages should be forwarded");
    }

    // `TaskTracker::wait()` must not return before a `spawn_blocking` task
    // has finished.
    #[test]
    fn test_task_tracker_spawn_blocking_tracks_thread() {
        use std::sync::Arc;
        use std::sync::atomic::{AtomicBool, Ordering};

        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .expect("Failed to create runtime");

        let thread_completed = Arc::new(AtomicBool::new(false));
        let thread_completed_clone = Arc::clone(&thread_completed);

        rt.block_on(async {
            let task_tracker = TaskTracker::new();

            task_tracker.spawn_blocking(move || {
                thread::sleep(Duration::from_millis(50));
                thread_completed_clone.store(true, Ordering::SeqCst);
            });

            // The tracker is closed before waiting on it.
            task_tracker.close();

            task_tracker.wait().await;

            assert!(
                thread_completed.load(Ordering::SeqCst),
                "spawn_blocking task should complete before wait() returns"
            );
        });
    }

    // A blocking task that polls the cancellation token keeps running until
    // the token is cancelled and exits afterwards.
    #[test]
    fn test_cancellation_token_stops_blocking_task() {
        use std::sync::Arc;
        use std::sync::atomic::{AtomicBool, Ordering};

        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .expect("Failed to create runtime");

        let task_exited = Arc::new(AtomicBool::new(false));
        let task_exited_clone = Arc::clone(&task_exited);

        rt.block_on(async {
            let cancel_token = CancellationToken::new();
            let task_tracker = TaskTracker::new();

            let cancel_clone = cancel_token.clone();

            task_tracker.spawn_blocking(move || {
                loop {
                    if cancel_clone.is_cancelled() {
                        task_exited_clone.store(true, Ordering::SeqCst);
                        break;
                    }
                    thread::sleep(Duration::from_millis(10));
                }
            });

            // Give the task time to go through a few polling iterations.
            thread::sleep(Duration::from_millis(30));
            assert!(
                !task_exited.load(Ordering::SeqCst),
                "Task should still be running before cancellation"
            );

            cancel_token.cancel();
            task_tracker.close();
            task_tracker.wait().await;

            assert!(
                task_exited.load(Ordering::SeqCst),
                "Task should exit after cancellation"
            );
        });
    }

    // After cancellation, waiting on the tracker must cover all three
    // blocking tasks, each of which needs a different amount of time to
    // finish.
    #[test]
    fn test_graceful_shutdown_waits_for_all_blocking_tasks() {
        use std::sync::Arc;
        use std::sync::atomic::{AtomicUsize, Ordering};

        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .expect("Failed to create runtime");

        let completed_count = Arc::new(AtomicUsize::new(0));

        rt.block_on(async {
            let cancel_token = CancellationToken::new();
            let task_tracker = TaskTracker::new();

            for i in 0..3 {
                let count_clone = Arc::clone(&completed_count);
                let cancel_clone = cancel_token.clone();
                task_tracker.spawn_blocking(move || {
                    loop {
                        if cancel_clone.is_cancelled() {
                            break;
                        }
                        thread::sleep(Duration::from_millis(10));
                    }
                    // Extra work after cancellation: 10, 20 or 30 ms.
                    thread::sleep(Duration::from_millis(10 * (i as u64 + 1)));
                    count_clone.fetch_add(1, Ordering::SeqCst);
                });
            }

            thread::sleep(Duration::from_millis(30));
            assert_eq!(
                completed_count.load(Ordering::SeqCst),
                0,
                "No tasks should complete before shutdown"
            );

            cancel_token.cancel();
            task_tracker.close();

            // Bound the wait so a stuck task fails the test instead of
            // hanging it.
            let timeout_result: std::result::Result<(), tokio::time::error::Elapsed> =
                tokio::time::timeout(Duration::from_secs(2), task_tracker.wait()).await;

            assert!(
                timeout_result.is_ok(),
                "All tasks should complete within timeout"
            );

            assert_eq!(
                completed_count.load(Ordering::SeqCst),
                3,
                "All 3 tasks should complete during graceful shutdown"
            );
        });
    }

    // Contrasts a plain `thread::spawn` (never registered with the tracker)
    // with `TaskTracker::spawn_blocking` (which the tracker waits for).
    #[test]
    fn test_spawn_blocking_vs_spawn_difference() {
        use std::sync::Arc;
        use std::sync::atomic::{AtomicBool, Ordering};

        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .expect("Failed to create runtime");

        let untracked_done = Arc::new(AtomicBool::new(false));
        let untracked_done_clone = Arc::clone(&untracked_done);

        rt.block_on(async {
            let task_tracker = TaskTracker::new();

            // This thread is not spawned through the tracker; its handle is
            // not joined and nothing below asserts on `untracked_done`.
            let _handle = thread::spawn(move || {
                thread::sleep(Duration::from_millis(100));
                untracked_done_clone.store(true, Ordering::SeqCst);
            });

            task_tracker.close();
            task_tracker.wait().await;

        });

        let tracked_done = Arc::new(AtomicBool::new(false));
        let tracked_done_clone = Arc::clone(&tracked_done);

        rt.block_on(async {
            let task_tracker = TaskTracker::new();

            task_tracker.spawn_blocking(move || {
                thread::sleep(Duration::from_millis(50));
                tracked_done_clone.store(true, Ordering::SeqCst);
            });

            task_tracker.close();
            task_tracker.wait().await;

            assert!(
                tracked_done.load(Ordering::SeqCst),
                "spawn_blocking task should complete before wait() returns"
            );
        });
    }

    // A blocking loop that checks the cancellation token between 25 ms
    // sleeps keeps iterating until cancelled, then terminates.
    // NOTE(review): the sleep presumably stands in for a poll-with-timeout on
    // terminal events — confirm against the event thread implementation.
    #[test]
    fn test_event_thread_pattern_with_poll_timeout() {
        use std::sync::Arc;
        use std::sync::atomic::{AtomicUsize, Ordering};

        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .expect("Failed to create runtime");

        let poll_count = Arc::new(AtomicUsize::new(0));
        let poll_count_clone = Arc::clone(&poll_count);

        rt.block_on(async {
            let cancel_token = CancellationToken::new();
            let task_tracker = TaskTracker::new();

            let cancel_clone = cancel_token.clone();

            task_tracker.spawn_blocking(move || {
                loop {
                    if cancel_clone.is_cancelled() {
                        break;
                    }
                    thread::sleep(Duration::from_millis(25));
                    poll_count_clone.fetch_add(1, Ordering::SeqCst);
                }
            });

            // 100 ms leaves room for several 25 ms iterations.
            thread::sleep(Duration::from_millis(100));
            let count_before_cancel = poll_count.load(Ordering::SeqCst);
            assert!(
                count_before_cancel >= 2,
                "Thread should have polled multiple times: {}",
                count_before_cancel
            );

            cancel_token.cancel();
            task_tracker.close();
            task_tracker.wait().await;

            let final_count = poll_count.load(Ordering::SeqCst);
            assert!(
                final_count >= count_before_cancel,
                "Poll count should not decrease"
            );
        });
    }
}