1use std::io::{self, Read, Write};
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::mpsc::{self, Receiver, RecvTimeoutError, Sender};
9use std::sync::{Arc, Mutex};
10use std::thread;
11use std::time::Duration;
12
13#[cfg(feature = "async")]
14use crate::command::CommandKind;
15
16#[cfg(feature = "async")]
17use tokio_util::sync::CancellationToken;
18
19#[cfg(feature = "async")]
20use tokio_util::task::TaskTracker;
21
22use tracing::debug;
23
/// Runs one batch sub-command closure on rayon's thread pool.
///
/// Only compiled when the `thread-pool` feature is enabled; the closure is
/// fire-and-forget and is not tracked or joined by the caller.
#[cfg(feature = "thread-pool")]
fn spawn_batch<F>(f: F)
where
    F: FnOnce() + Send + 'static,
{
    rayon::spawn(f);
}
36
/// Runs one batch sub-command closure on a freshly spawned OS thread.
///
/// Used when the `thread-pool` feature is disabled. The join handle is
/// dropped immediately, so the thread is detached.
#[cfg(not(feature = "thread-pool"))]
fn spawn_batch<F>(f: F)
where
    F: FnOnce() + Send + 'static,
{
    // Dropping the handle detaches the thread; nobody waits for it.
    drop(thread::spawn(f));
}
41
42use crossterm::{
43 cursor::{Hide, MoveTo, Show},
44 event::{self, DisableMouseCapture, EnableMouseCapture, Event, KeyEventKind},
45 execute,
46 terminal::{
47 self, Clear, ClearType, EnterAlternateScreen, LeaveAlternateScreen, disable_raw_mode,
48 enable_raw_mode,
49 },
50};
51
52use crate::command::Cmd;
53use crate::key::{from_crossterm_key, is_sequence_prefix};
54use crate::message::{
55 BatchMsg, BlurMsg, FocusMsg, InterruptMsg, Message, PrintLineMsg, QuitMsg,
56 RequestWindowSizeMsg, SequenceMsg, SetWindowTitleMsg, WindowSizeMsg,
57};
58use crate::mouse::from_crossterm_mouse;
59use crate::screen::{ReleaseTerminalMsg, RestoreTerminalMsg};
60use crate::{KeyMsg, KeyType};
61
62#[derive(thiserror::Error, Debug)]
115pub enum Error {
116 #[error("terminal io error: {0}")]
137 Io(#[from] io::Error),
138
139 #[error("failed to {action} raw mode: {source}")]
150 RawModeFailure {
151 action: &'static str,
153 #[source]
155 source: io::Error,
156 },
157
158 #[error("failed to {action} alternate screen: {source}")]
168 AltScreenFailure {
169 action: &'static str,
171 #[source]
173 source: io::Error,
174 },
175
176 #[error("failed to poll terminal events: {0}")]
185 EventPoll(io::Error),
186
187 #[error("failed to render view: {0}")]
196 Render(io::Error),
197}
198
/// Result alias used throughout this module, with [`Error`] as the error type.
pub type Result<T> = std::result::Result<T, Error>;
230
/// Application state driven by a [`Program`].
///
/// The `Send + 'static` bounds allow a model to be moved onto the thread
/// spawned by `Program::start`.
pub trait Model: Send + 'static {
    /// Called once by the event loop before the first frame is rendered.
    /// A returned command is executed by the runtime.
    fn init(&self) -> Option<Cmd>;

    /// Handles one message that the runtime did not consume itself and
    /// optionally returns a follow-up command to execute.
    fn update(&mut self, msg: Message) -> Option<Cmd>;

    /// Produces the text of the current frame. The runtime skips the redraw
    /// when the text equals the previously drawn frame.
    fn view(&self) -> String;
}
273
/// Switches that control how a program sets up the terminal and runs.
#[derive(Debug, Clone)]
pub struct ProgramOptions {
    /// Enter the alternate screen on start and leave it on exit.
    pub alt_screen: bool,
    /// Enable mouse capture (the visible runtime treats this the same as
    /// `mouse_all_motion`).
    pub mouse_cell_motion: bool,
    /// Enable mouse capture (the visible runtime treats this the same as
    /// `mouse_cell_motion`).
    pub mouse_all_motion: bool,
    /// Enable bracketed paste on start and disable it on exit.
    pub bracketed_paste: bool,
    /// Enable terminal focus change reporting.
    pub report_focus: bool,
    /// Skip raw mode and crossterm event polling; input and output are
    /// supplied by the caller instead.
    pub custom_io: bool,
    /// Frames per second; the event loop derives its frame duration from it.
    pub fps: u32,
    /// NOTE(review): not consulted by the runtime code in this part of the
    /// file — presumably disables signal handling elsewhere; confirm.
    pub without_signals: bool,
    /// Do not install the terminal-restoring panic hook.
    pub without_catch_panics: bool,
}

impl Default for ProgramOptions {
    /// Returns the baseline configuration: every switch off except
    /// bracketed paste, at 60 frames per second.
    fn default() -> Self {
        const DEFAULT_FPS: u32 = 60;

        Self {
            // The only non-trivial defaults.
            fps: DEFAULT_FPS,
            bracketed_paste: true,
            // Everything else is opt-in.
            alt_screen: false,
            mouse_cell_motion: false,
            mouse_all_motion: false,
            report_focus: false,
            custom_io: false,
            without_signals: false,
            without_catch_panics: false,
        }
    }
}
312
/// Handle to a program running on its own thread, as returned by
/// `Program::start`.
pub struct ProgramHandle<M: Model> {
    /// Sender whose receiving end is installed as the program's external
    /// message source.
    tx: Sender<Message>,
    /// Join handle of the program thread; taken when the handle is waited on.
    handle: Option<thread::JoinHandle<Result<M>>>,
}
338
339impl<M: Model> ProgramHandle<M> {
340 pub fn send<T: Into<Message>>(&self, msg: T) -> bool {
346 self.tx.send(msg.into()).is_ok()
347 }
348
349 pub fn quit(&self) {
353 let _ = self.tx.send(Message::new(QuitMsg));
354 }
355
356 pub fn wait(mut self) -> Result<M> {
360 if let Some(handle) = self.handle.take() {
361 handle
362 .join()
363 .map_err(|_| Error::Io(io::Error::other("program thread panicked")))?
364 } else {
365 Err(Error::Io(io::Error::other("program already joined")))
366 }
367 }
368
369 pub fn is_running(&self) -> bool {
371 self.handle.as_ref().is_some_and(|h| !h.is_finished())
372 }
373}
374
/// A runnable terminal program: a [`Model`] plus its runtime configuration.
pub struct Program<M: Model> {
    /// The application model; returned to the caller when the run ends.
    model: M,
    /// Terminal and runtime switches.
    options: ProgramOptions,
    /// Optional channel of externally injected messages, forwarded into the
    /// event loop.
    external_rx: Option<Receiver<Message>>,
    /// Optional custom input; its bytes are parsed into messages on a
    /// dedicated thread/task.
    input: Option<Box<dyn Read + Send>>,
    /// Optional custom output used instead of stdout.
    output: Option<Box<dyn Write + Send>>,
}
399
400impl<M: Model> Program<M> {
401 pub fn new(model: M) -> Self {
403 Self {
404 model,
405 options: ProgramOptions::default(),
406 external_rx: None,
407 input: None,
408 output: None,
409 }
410 }
411
412 pub fn with_input_receiver(mut self, rx: Receiver<Message>) -> Self {
417 self.external_rx = Some(rx);
418 self
419 }
420
421 pub fn with_input<R: Read + Send + 'static>(mut self, input: R) -> Self {
426 self.input = Some(Box::new(input));
427 self.options.custom_io = true;
428 self
429 }
430
431 pub fn with_output<W: Write + Send + 'static>(mut self, output: W) -> Self {
435 self.output = Some(Box::new(output));
436 self.options.custom_io = true;
437 self
438 }
439
440 pub fn with_alt_screen(mut self) -> Self {
442 self.options.alt_screen = true;
443 self
444 }
445
446 pub fn with_mouse_cell_motion(mut self) -> Self {
450 self.options.mouse_cell_motion = true;
451 self
452 }
453
454 pub fn with_mouse_all_motion(mut self) -> Self {
458 self.options.mouse_all_motion = true;
459 self
460 }
461
462 pub fn with_fps(mut self, fps: u32) -> Self {
466 self.options.fps = fps.clamp(1, 120);
467 self
468 }
469
470 pub fn with_report_focus(mut self) -> Self {
474 self.options.report_focus = true;
475 self
476 }
477
478 pub fn without_bracketed_paste(mut self) -> Self {
480 self.options.bracketed_paste = false;
481 self
482 }
483
484 pub fn without_signal_handler(mut self) -> Self {
486 self.options.without_signals = true;
487 self
488 }
489
490 pub fn without_catch_panics(mut self) -> Self {
492 self.options.without_catch_panics = true;
493 self
494 }
495
496 pub fn with_custom_io(mut self) -> Self {
501 self.options.custom_io = true;
502 self
503 }
504
505 pub fn run_with_writer<W: Write + Send + 'static>(self, mut writer: W) -> Result<M> {
507 let options = self.options.clone();
509
510 if !options.custom_io {
512 enable_raw_mode()?;
513 }
514
515 if options.alt_screen {
516 execute!(writer, EnterAlternateScreen)?;
517 }
518
519 execute!(writer, Hide)?;
520
521 if options.mouse_all_motion {
522 execute!(writer, EnableMouseCapture)?;
523 } else if options.mouse_cell_motion {
524 execute!(writer, EnableMouseCapture)?;
525 }
526
527 if options.report_focus {
528 execute!(writer, event::EnableFocusChange)?;
529 }
530
531 if options.bracketed_paste {
532 execute!(writer, event::EnableBracketedPaste)?;
533 }
534
535 let prev_hook = if !options.without_catch_panics {
539 let cleanup_opts = options.clone();
540 let prev = std::panic::take_hook();
541 std::panic::set_hook(Box::new(move |info| {
542 let mut stderr = io::stderr();
545 if cleanup_opts.bracketed_paste {
546 let _ = execute!(stderr, event::DisableBracketedPaste);
547 }
548 if cleanup_opts.report_focus {
549 let _ = execute!(stderr, event::DisableFocusChange);
550 }
551 if cleanup_opts.mouse_all_motion || cleanup_opts.mouse_cell_motion {
552 let _ = execute!(stderr, DisableMouseCapture);
553 }
554 let _ = execute!(stderr, Show);
555 if cleanup_opts.alt_screen {
556 let _ = execute!(stderr, LeaveAlternateScreen);
557 }
558 if !cleanup_opts.custom_io {
559 let _ = disable_raw_mode();
560 }
561 prev(info);
563 }));
564 true
565 } else {
566 false
567 };
568
569 let result = self.event_loop(&mut writer);
571
572 if prev_hook {
575 let _ = std::panic::take_hook();
576 }
579
580 if options.bracketed_paste {
582 let _ = execute!(writer, event::DisableBracketedPaste);
583 }
584
585 if options.report_focus {
586 let _ = execute!(writer, event::DisableFocusChange);
587 }
588
589 if options.mouse_all_motion || options.mouse_cell_motion {
590 let _ = execute!(writer, DisableMouseCapture);
591 }
592
593 let _ = execute!(writer, Show);
594
595 if options.alt_screen {
596 let _ = execute!(writer, LeaveAlternateScreen);
597 }
598
599 if !options.custom_io {
600 let _ = disable_raw_mode();
601 }
602
603 result
604 }
605
606 pub fn run(mut self) -> Result<M> {
608 if let Some(output) = self.output.take() {
609 return self.run_with_writer(output);
610 }
611
612 let stdout = io::stdout();
613 self.run_with_writer(stdout)
614 }
615
616 pub fn start(mut self) -> ProgramHandle<M> {
640 let (tx, rx) = mpsc::channel();
642
643 self.external_rx = Some(rx);
645
646 let output = self.output.take();
648
649 let handle = thread::spawn(move || {
651 if let Some(output) = output {
652 self.run_with_writer(output)
653 } else {
654 let stdout = io::stdout();
655 self.run_with_writer(stdout)
656 }
657 });
658
659 ProgramHandle {
660 tx,
661 handle: Some(handle),
662 }
663 }
664
665 fn event_loop<W: Write>(mut self, writer: &mut W) -> Result<M> {
666 let (tx, rx): (Sender<Message>, Receiver<Message>) = mpsc::channel();
668
669 let mut external_forwarder_handle: Option<thread::JoinHandle<()>> = None;
671 let mut input_parser_handle: Option<thread::JoinHandle<()>> = None;
672 let external_shutdown = Arc::new(AtomicBool::new(false));
673
674 let command_threads: Arc<Mutex<Vec<thread::JoinHandle<()>>>> =
676 Arc::new(Mutex::new(Vec::new()));
677
678 if let Some(ext_rx) = self.external_rx.take() {
680 let tx_clone = tx.clone();
681 let shutdown_clone = Arc::clone(&external_shutdown);
682 debug!(target: "bubbletea::thread", "Spawning external forwarder thread");
683 external_forwarder_handle = Some(thread::spawn(move || {
684 const POLL_INTERVAL: Duration = Duration::from_millis(50);
685 loop {
686 if shutdown_clone.load(Ordering::Relaxed) {
687 break;
688 }
689
690 match ext_rx.recv_timeout(POLL_INTERVAL) {
691 Ok(msg) => {
692 if tx_clone.send(msg).is_err() {
693 debug!(target: "bubbletea::event", "external message dropped — receiver disconnected");
694 break;
695 }
696 }
697 Err(RecvTimeoutError::Timeout) => {}
698 Err(RecvTimeoutError::Disconnected) => break,
699 }
700 }
701 }));
702 }
703
704 if let Some(mut input) = self.input.take() {
706 let tx_clone = tx.clone();
707 debug!(target: "bubbletea::thread", "Spawning input parser thread");
708 input_parser_handle = Some(thread::spawn(move || {
709 let mut parser = InputParser::new();
710 let mut buf = [0u8; 256];
711 loop {
712 match input.read(&mut buf) {
713 Ok(0) => break,
714 Ok(n) => {
715 let can_have_more_data = true;
717 for msg in parser.push_bytes(&buf[..n], can_have_more_data) {
718 if tx_clone.send(msg).is_err() {
719 debug!(target: "bubbletea::input", "input message dropped — receiver disconnected");
720 return;
721 }
722 }
723 }
724 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {
725 thread::yield_now();
726 }
727 Err(_) => break,
728 }
729 }
730
731 for msg in parser.flush() {
732 if tx_clone.send(msg).is_err() {
733 debug!(target: "bubbletea::input", "flush message dropped — receiver disconnected");
734 break;
735 }
736 }
737 }));
738 }
739
740 if !self.options.custom_io
742 && let Ok((width, height)) = terminal::size()
743 && tx
744 .send(Message::new(WindowSizeMsg { width, height }))
745 .is_err()
746 {
747 debug!(target: "bubbletea::event", "initial window size dropped — receiver disconnected");
748 }
749
750 if let Some(cmd) = self.model.init() {
752 self.handle_command(cmd, tx.clone(), Arc::clone(&command_threads));
753 }
754
755 let mut last_view = String::new();
757 self.render(writer, &mut last_view)?;
758
759 let frame_duration = Duration::from_secs_f64(1.0 / self.options.fps as f64);
761
762 loop {
764 if !self.options.custom_io && event::poll(frame_duration)? {
768 match event::read()? {
769 Event::Key(key_event) => {
770 if key_event.kind != KeyEventKind::Press {
772 continue;
773 }
774
775 let key_msg = from_crossterm_key(key_event.code, key_event.modifiers);
776
777 if key_msg.key_type == crate::KeyType::CtrlC {
779 if tx.send(Message::new(InterruptMsg)).is_err() {
780 debug!(target: "bubbletea::event", "interrupt message dropped — receiver disconnected");
781 }
782 } else if tx.send(Message::new(key_msg)).is_err() {
783 debug!(target: "bubbletea::event", "key message dropped — receiver disconnected");
784 }
785 }
786 Event::Mouse(mouse_event) => {
787 let mouse_msg = from_crossterm_mouse(mouse_event);
788 if tx.send(Message::new(mouse_msg)).is_err() {
789 debug!(target: "bubbletea::event", "mouse message dropped — receiver disconnected");
790 }
791 }
792 Event::Resize(width, height) => {
793 if tx
794 .send(Message::new(WindowSizeMsg { width, height }))
795 .is_err()
796 {
797 debug!(target: "bubbletea::event", "resize message dropped — receiver disconnected");
798 }
799 }
800 Event::FocusGained => {
801 if tx.send(Message::new(FocusMsg)).is_err() {
802 debug!(target: "bubbletea::event", "focus message dropped — receiver disconnected");
803 }
804 }
805 Event::FocusLost => {
806 if tx.send(Message::new(BlurMsg)).is_err() {
807 debug!(target: "bubbletea::event", "blur message dropped — receiver disconnected");
808 }
809 }
810 Event::Paste(text) => {
811 let key_msg = KeyMsg {
813 key_type: crate::KeyType::Runes,
814 runes: text.chars().collect(),
815 alt: false,
816 paste: true,
817 };
818 if tx.send(Message::new(key_msg)).is_err() {
819 debug!(target: "bubbletea::event", "paste message dropped — receiver disconnected");
820 }
821 }
822 }
823 }
824
825 let mut needs_render = false;
827 let mut should_quit = false;
828 while let Ok(msg) = rx.try_recv() {
829 if msg.is::<QuitMsg>() {
831 should_quit = true;
832 break;
833 }
834
835 if msg.is::<InterruptMsg>() {
837 should_quit = true;
838 break;
839 }
840
841 if msg.is::<BatchMsg>() {
843 continue;
844 }
845
846 if let Some(title_msg) = msg.downcast_ref::<SetWindowTitleMsg>() {
848 execute!(writer, terminal::SetTitle(&title_msg.0))?;
849 continue;
850 }
851
852 if msg.is::<RequestWindowSizeMsg>() {
854 if !self.options.custom_io
855 && let Ok((width, height)) = terminal::size()
856 && tx
857 .send(Message::new(WindowSizeMsg { width, height }))
858 .is_err()
859 {
860 debug!(target: "bubbletea::event", "window size response dropped — receiver disconnected");
861 }
862 continue;
863 }
864
865 if let Some(print_msg) = msg.downcast_ref::<PrintLineMsg>() {
867 if !self.options.alt_screen {
868 for line in print_msg.0.lines() {
870 let _ = writeln!(writer, "{}", line);
871 }
872 let _ = writer.flush();
873 last_view.clear();
875 needs_render = true;
876 }
877 continue;
878 }
879
880 if msg.is::<ReleaseTerminalMsg>() {
882 if !self.options.custom_io {
883 if self.options.bracketed_paste {
885 let _ = execute!(writer, event::DisableBracketedPaste);
886 }
887 if self.options.report_focus {
888 let _ = execute!(writer, event::DisableFocusChange);
889 }
890 if self.options.mouse_all_motion || self.options.mouse_cell_motion {
891 let _ = execute!(writer, DisableMouseCapture);
892 }
893 let _ = execute!(writer, Show);
894 if self.options.alt_screen {
895 let _ = execute!(writer, LeaveAlternateScreen);
896 }
897 let _ = disable_raw_mode();
898 }
899 continue;
900 }
901
902 if msg.is::<RestoreTerminalMsg>() {
904 if !self.options.custom_io {
905 let _ = enable_raw_mode();
907 if self.options.alt_screen {
908 let _ = execute!(writer, EnterAlternateScreen);
909 }
910 let _ = execute!(writer, Hide);
911 if self.options.mouse_all_motion {
912 let _ = execute!(writer, EnableMouseCapture);
913 } else if self.options.mouse_cell_motion {
914 let _ = execute!(writer, EnableMouseCapture);
915 }
916 if self.options.report_focus {
917 let _ = execute!(writer, event::EnableFocusChange);
918 }
919 if self.options.bracketed_paste {
920 let _ = execute!(writer, event::EnableBracketedPaste);
921 }
922 last_view.clear();
924 }
925 needs_render = true;
926 continue;
927 }
928
929 if let Some(cmd) = self.model.update(msg) {
931 self.handle_command(cmd, tx.clone(), Arc::clone(&command_threads));
932 }
933 needs_render = true;
934 }
935
936 if should_quit {
938 break;
939 }
940
941 if needs_render {
943 self.render(writer, &mut last_view)?;
944 }
945
946 if self.options.custom_io {
948 thread::sleep(frame_duration);
949 }
950 }
951
952 external_shutdown.store(true, Ordering::Relaxed);
954 drop(tx);
955 debug!(target: "bubbletea::thread", "Sender dropped, waiting for threads to exit");
956
957 if let Some(handle) = external_forwarder_handle {
958 match handle.join() {
959 Ok(()) => {
960 debug!(target: "bubbletea::thread", "External forwarder thread joined successfully")
961 }
962 Err(e) => {
963 tracing::warn!(target: "bubbletea::thread", "External forwarder thread panicked: {:?}", e)
964 }
965 }
966 }
967
968 if let Some(handle) = input_parser_handle {
969 match handle.join() {
970 Ok(()) => {
971 debug!(target: "bubbletea::thread", "Input parser thread joined successfully")
972 }
973 Err(e) => {
974 tracing::warn!(target: "bubbletea::thread", "Input parser thread panicked: {:?}", e)
975 }
976 }
977 }
978
979 const COMMAND_THREAD_TIMEOUT: Duration = Duration::from_secs(5);
982 let join_deadline = std::time::Instant::now() + COMMAND_THREAD_TIMEOUT;
983
984 if let Ok(mut threads) = command_threads.lock() {
985 let thread_count = threads.len();
986 if thread_count > 0 {
987 debug!(target: "bubbletea::thread", "Waiting for {} command thread(s) to complete", thread_count);
988 }
989
990 for handle in threads.drain(..) {
992 if handle.is_finished() {
993 let _ = handle.join();
995 } else {
996 let remaining =
998 join_deadline.saturating_duration_since(std::time::Instant::now());
999 if remaining.is_zero() {
1000 debug!(target: "bubbletea::thread", "Timeout waiting for command threads, abandoning remaining");
1001 break;
1002 }
1003
1004 let poll_interval = Duration::from_millis(10);
1006 let start = std::time::Instant::now();
1007 while !handle.is_finished() && start.elapsed() < remaining {
1008 thread::sleep(poll_interval);
1009 }
1010
1011 if handle.is_finished() {
1012 match handle.join() {
1013 Ok(()) => {
1014 debug!(target: "bubbletea::thread", "Command thread joined successfully")
1015 }
1016 Err(e) => {
1017 tracing::warn!(target: "bubbletea::thread", "Command thread panicked: {:?}", e)
1018 }
1019 }
1020 } else {
1021 debug!(target: "bubbletea::thread", "Command thread did not finish in time, abandoning");
1022 }
1023 }
1024 }
1025 } else {
1026 tracing::warn!(target: "bubbletea::thread", "Failed to join command threads: mutex poisoned");
1027 }
1028
1029 Ok(self.model)
1030 }
1031
1032 fn handle_command(
1033 &self,
1034 cmd: Cmd,
1035 tx: Sender<Message>,
1036 command_threads: Arc<Mutex<Vec<thread::JoinHandle<()>>>>,
1037 ) {
1038 let handle = thread::spawn(move || {
1040 if let Some(msg) = cmd.execute() {
1041 if msg.is::<BatchMsg>() {
1043 if let Some(batch) = msg.downcast::<BatchMsg>() {
1044 for cmd in batch.0 {
1045 let tx_clone = tx.clone();
1046 spawn_batch(move || {
1047 if let Some(msg) = cmd.execute()
1048 && tx_clone.send(msg).is_err()
1049 {
1050 debug!(target: "bubbletea::command", "batch command result dropped — receiver disconnected");
1051 }
1052 });
1053 }
1054 }
1055 } else if msg.is::<SequenceMsg>() {
1056 if let Some(seq) = msg.downcast::<SequenceMsg>() {
1057 for cmd in seq.0 {
1058 if let Some(msg) = cmd.execute()
1059 && tx.send(msg).is_err()
1060 {
1061 debug!(target: "bubbletea::command", "sequence command result dropped — receiver disconnected");
1062 break;
1063 }
1064 }
1065 }
1066 } else if tx.send(msg).is_err() {
1067 debug!(target: "bubbletea::command", "command result dropped — receiver disconnected");
1068 }
1069 }
1070 });
1071
1072 if let Ok(mut threads) = command_threads.lock() {
1074 threads.retain(|h| !h.is_finished());
1076 threads.push(handle);
1077 } else {
1078 debug!(target: "bubbletea::thread", "Failed to track command thread: mutex poisoned");
1081 }
1082 }
1083
1084 fn render<W: Write>(&self, writer: &mut W, last_view: &mut String) -> Result<()> {
1085 let view = self.model.view();
1086
1087 if view == *last_view {
1089 return Ok(());
1090 }
1091
1092 execute!(writer, MoveTo(0, 0), Clear(ClearType::All))?;
1094 write!(writer, "{}", view)?;
1095 writer.flush()?;
1096
1097 *last_view = view;
1098 Ok(())
1099 }
1100}
1101
1102#[cfg(feature = "async")]
1107impl<M: Model> Program<M> {
1108 pub async fn run_async(mut self) -> Result<M> {
1129 if let Some(output) = self.output.take() {
1130 return self.run_async_with_writer(output).await;
1131 }
1132
1133 let stdout = io::stdout();
1134 self.run_async_with_writer(stdout).await
1135 }
1136
1137 pub async fn run_async_with_writer<W: Write + Send + 'static>(
1139 self,
1140 mut writer: W,
1141 ) -> Result<M> {
1142 let options = self.options.clone();
1144
1145 if !options.custom_io {
1147 enable_raw_mode()?;
1148 }
1149
1150 if options.alt_screen {
1151 execute!(writer, EnterAlternateScreen)?;
1152 }
1153
1154 execute!(writer, Hide)?;
1155
1156 if options.mouse_all_motion {
1157 execute!(writer, EnableMouseCapture)?;
1158 } else if options.mouse_cell_motion {
1159 execute!(writer, EnableMouseCapture)?;
1160 }
1161
1162 if options.report_focus {
1163 execute!(writer, event::EnableFocusChange)?;
1164 }
1165
1166 if options.bracketed_paste {
1167 execute!(writer, event::EnableBracketedPaste)?;
1168 }
1169
1170 let prev_hook = if !options.without_catch_panics {
1172 let cleanup_opts = options.clone();
1173 let prev = std::panic::take_hook();
1174 std::panic::set_hook(Box::new(move |info| {
1175 let mut stderr = io::stderr();
1176 if cleanup_opts.bracketed_paste {
1177 let _ = execute!(stderr, event::DisableBracketedPaste);
1178 }
1179 if cleanup_opts.report_focus {
1180 let _ = execute!(stderr, event::DisableFocusChange);
1181 }
1182 if cleanup_opts.mouse_all_motion || cleanup_opts.mouse_cell_motion {
1183 let _ = execute!(stderr, DisableMouseCapture);
1184 }
1185 let _ = execute!(stderr, Show);
1186 if cleanup_opts.alt_screen {
1187 let _ = execute!(stderr, LeaveAlternateScreen);
1188 }
1189 if !cleanup_opts.custom_io {
1190 let _ = disable_raw_mode();
1191 }
1192 prev(info);
1193 }));
1194 true
1195 } else {
1196 false
1197 };
1198
1199 let result = self.event_loop_async(&mut writer).await;
1201
1202 if prev_hook {
1204 let _ = std::panic::take_hook();
1205 }
1206
1207 if options.bracketed_paste {
1209 let _ = execute!(writer, event::DisableBracketedPaste);
1210 }
1211
1212 if options.report_focus {
1213 let _ = execute!(writer, event::DisableFocusChange);
1214 }
1215
1216 if options.mouse_all_motion || options.mouse_cell_motion {
1217 let _ = execute!(writer, DisableMouseCapture);
1218 }
1219
1220 let _ = execute!(writer, Show);
1221
1222 if options.alt_screen {
1223 let _ = execute!(writer, LeaveAlternateScreen);
1224 }
1225
1226 if !options.custom_io {
1227 let _ = disable_raw_mode();
1228 }
1229
1230 result
1231 }
1232
1233 async fn event_loop_async<W: Write>(mut self, stdout: &mut W) -> Result<M> {
1234 let (tx, mut rx) = tokio::sync::mpsc::channel::<Message>(256);
1236
1237 let cancel_token = CancellationToken::new();
1239 let task_tracker = TaskTracker::new();
1240
1241 if let Some(ext_rx) = self.external_rx.take() {
1244 let tx_clone = tx.clone();
1245 let cancel_clone = cancel_token.clone();
1246 task_tracker.spawn_blocking(move || {
1247 let timeout = Duration::from_millis(100);
1249 loop {
1250 if cancel_clone.is_cancelled() {
1251 break;
1252 }
1253 match ext_rx.recv_timeout(timeout) {
1254 Ok(msg) => {
1255 if tx_clone.blocking_send(msg).is_err() {
1256 debug!(target: "bubbletea::event", "async external message dropped — receiver disconnected");
1257 break;
1258 }
1259 }
1260 Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
1261 }
1263 Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
1264 break;
1266 }
1267 }
1268 }
1269 });
1270 }
1271
1272 if let Some(mut input) = self.input.take() {
1274 let tx_clone = tx.clone();
1275 let cancel_clone = cancel_token.clone();
1276 task_tracker.spawn_blocking(move || {
1277 let mut parser = InputParser::new();
1278 let mut buf = [0u8; 256];
1279 loop {
1280 if cancel_clone.is_cancelled() {
1281 break;
1282 }
1283 match input.read(&mut buf) {
1284 Ok(0) => break,
1285 Ok(n) => {
1286 let can_have_more_data = true;
1288 for msg in parser.push_bytes(&buf[..n], can_have_more_data) {
1289 if tx_clone.blocking_send(msg).is_err() {
1290 return;
1291 }
1292 }
1293 }
1294 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {
1295 std::thread::yield_now();
1296 }
1297 Err(_) => break,
1298 }
1299 }
1300
1301 for msg in parser.flush() {
1302 if tx_clone.blocking_send(msg).is_err() {
1303 break;
1304 }
1305 }
1306 });
1307 }
1308
1309 let (event_tx, mut event_rx) = tokio::sync::mpsc::channel::<Event>(100);
1311 let event_cancel = cancel_token.clone();
1312
1313 if !self.options.custom_io {
1314 task_tracker.spawn_blocking(move || {
1315 loop {
1316 if event_cancel.is_cancelled() {
1317 break;
1318 }
1319 match event::poll(Duration::from_millis(100)) {
1321 Ok(true) => {
1322 if let Ok(evt) = event::read()
1323 && event_tx.blocking_send(evt).is_err()
1324 {
1325 break;
1326 }
1327 }
1328 Ok(false) => {} Err(_) => {
1330 break;
1331 } }
1333 }
1334 });
1335 }
1336
1337 if !self.options.custom_io {
1339 let (width, height) = terminal::size()?;
1340 if tx
1341 .send(Message::new(WindowSizeMsg { width, height }))
1342 .await
1343 .is_err()
1344 {
1345 debug!(target: "bubbletea::event", "async initial window size dropped — receiver disconnected");
1346 }
1347 }
1348
1349 if let Some(cmd) = self.model.init() {
1351 Self::handle_command_tracked(
1352 cmd.into(),
1353 tx.clone(),
1354 &task_tracker,
1355 cancel_token.clone(),
1356 );
1357 }
1358
1359 let mut last_view = String::new();
1361 self.render(stdout, &mut last_view)?;
1362
1363 let frame_duration = Duration::from_secs_f64(1.0 / self.options.fps as f64);
1365 let mut frame_interval = tokio::time::interval(frame_duration);
1366
1367 loop {
1369 tokio::select! {
1370 Some(event) = event_rx.recv(), if !self.options.custom_io => {
1372 match event {
1373 Event::Key(key_event) => {
1374 if key_event.kind != KeyEventKind::Press {
1376 continue;
1377 }
1378
1379 let key_msg = from_crossterm_key(key_event.code, key_event.modifiers);
1380
1381 if key_msg.key_type == crate::KeyType::CtrlC {
1383 if tx.send(Message::new(InterruptMsg)).await.is_err() {
1384 debug!(target: "bubbletea::event", "async interrupt message dropped — receiver disconnected");
1385 }
1386 } else if tx.send(Message::new(key_msg)).await.is_err() {
1387 debug!(target: "bubbletea::event", "async key message dropped — receiver disconnected");
1388 }
1389 }
1390 Event::Mouse(mouse_event) => {
1391 let mouse_msg = from_crossterm_mouse(mouse_event);
1392 if tx.send(Message::new(mouse_msg)).await.is_err() {
1393 debug!(target: "bubbletea::event", "async mouse message dropped — receiver disconnected");
1394 }
1395 }
1396 Event::Resize(width, height) => {
1397 if tx.send(Message::new(WindowSizeMsg { width, height })).await.is_err() {
1398 debug!(target: "bubbletea::event", "async resize message dropped — receiver disconnected");
1399 }
1400 }
1401 Event::FocusGained => {
1402 if tx.send(Message::new(FocusMsg)).await.is_err() {
1403 debug!(target: "bubbletea::event", "async focus message dropped — receiver disconnected");
1404 }
1405 }
1406 Event::FocusLost => {
1407 if tx.send(Message::new(BlurMsg)).await.is_err() {
1408 debug!(target: "bubbletea::event", "async blur message dropped — receiver disconnected");
1409 }
1410 }
1411 Event::Paste(text) => {
1412 let key_msg = KeyMsg {
1414 key_type: crate::KeyType::Runes,
1415 runes: text.chars().collect(),
1416 alt: false,
1417 paste: true,
1418 };
1419 if tx.send(Message::new(key_msg)).await.is_err() {
1420 debug!(target: "bubbletea::event", "async paste message dropped — receiver disconnected");
1421 }
1422 }
1423 }
1424 }
1425
1426 Some(msg) = rx.recv() => {
1428 if msg.is::<QuitMsg>() {
1430 Self::graceful_shutdown(&cancel_token, &task_tracker).await;
1431 return Ok(self.model);
1432 }
1433
1434 if msg.is::<InterruptMsg>() {
1436 Self::graceful_shutdown(&cancel_token, &task_tracker).await;
1437 return Ok(self.model);
1438 }
1439
1440 if msg.is::<BatchMsg>() {
1442 continue;
1443 }
1444
1445 if let Some(title_msg) = msg.downcast_ref::<SetWindowTitleMsg>() {
1447 execute!(stdout, terminal::SetTitle(&title_msg.0))?;
1448 continue;
1449 }
1450
1451 if msg.is::<RequestWindowSizeMsg>() {
1453 if !self.options.custom_io {
1454 let (width, height) = terminal::size()?;
1455 if tx.send(Message::new(WindowSizeMsg { width, height })).await.is_err() {
1456 debug!(target: "bubbletea::event", "async window size response dropped — receiver disconnected");
1457 }
1458 }
1459 continue;
1460 }
1461
1462 if let Some(print_msg) = msg.downcast_ref::<PrintLineMsg>() {
1464 if !self.options.alt_screen {
1465 for line in print_msg.0.lines() {
1467 let _ = writeln!(stdout, "{}", line);
1468 }
1469 let _ = stdout.flush();
1470 last_view.clear();
1472 }
1473 self.render(stdout, &mut last_view)?;
1474 continue;
1475 }
1476
1477 if msg.is::<ReleaseTerminalMsg>() {
1479 if !self.options.custom_io {
1480 if self.options.bracketed_paste {
1482 let _ = execute!(stdout, event::DisableBracketedPaste);
1483 }
1484 if self.options.report_focus {
1485 let _ = execute!(stdout, event::DisableFocusChange);
1486 }
1487 if self.options.mouse_all_motion || self.options.mouse_cell_motion {
1488 let _ = execute!(stdout, DisableMouseCapture);
1489 }
1490 let _ = execute!(stdout, Show);
1491 if self.options.alt_screen {
1492 let _ = execute!(stdout, LeaveAlternateScreen);
1493 }
1494 let _ = disable_raw_mode();
1495 }
1496 continue;
1497 }
1498
1499 if msg.is::<RestoreTerminalMsg>() {
1501 if !self.options.custom_io {
1502 let _ = enable_raw_mode();
1504 if self.options.alt_screen {
1505 let _ = execute!(stdout, EnterAlternateScreen);
1506 }
1507 let _ = execute!(stdout, Hide);
1508 if self.options.mouse_all_motion {
1509 let _ = execute!(stdout, EnableMouseCapture);
1510 } else if self.options.mouse_cell_motion {
1511 let _ = execute!(stdout, EnableMouseCapture);
1512 }
1513 if self.options.report_focus {
1514 let _ = execute!(stdout, event::EnableFocusChange);
1515 }
1516 if self.options.bracketed_paste {
1517 let _ = execute!(stdout, event::EnableBracketedPaste);
1518 }
1519 last_view.clear();
1521 }
1522 self.render(stdout, &mut last_view)?;
1523 continue;
1524 }
1525
1526 if let Some(cmd) = self.model.update(msg) {
1528 Self::handle_command_tracked(
1529 cmd.into(),
1530 tx.clone(),
1531 &task_tracker,
1532 cancel_token.clone(),
1533 );
1534 }
1535
1536 self.render(stdout, &mut last_view)?;
1538 }
1539
1540 _ = frame_interval.tick() => {
1542 }
1544 }
1545 }
1546 }
1547
1548 async fn graceful_shutdown(cancel_token: &CancellationToken, task_tracker: &TaskTracker) {
1550 cancel_token.cancel();
1552
1553 task_tracker.close();
1555
1556 let shutdown_timeout = Duration::from_secs(5);
1558 let _ = tokio::time::timeout(shutdown_timeout, task_tracker.wait()).await;
1559 }
1560
1561 fn handle_command_tracked(
1563 cmd: CommandKind,
1564 tx: tokio::sync::mpsc::Sender<Message>,
1565 tracker: &TaskTracker,
1566 cancel_token: CancellationToken,
1567 ) {
1568 let batch_tracker = tracker.clone();
1571 let batch_cancel = cancel_token.clone();
1572 tracker.spawn(async move {
1573 tokio::select! {
1574 result = cmd.execute() => {
1576 if let Some(msg) = result {
1577 if msg.is::<BatchMsg>() {
1579 if let Some(batch) = msg.downcast::<BatchMsg>() {
1580 for cmd in batch.0 {
1581 let tx_clone = tx.clone();
1582 let cancel = batch_cancel.clone();
1583 batch_tracker.spawn(async move {
1586 tokio::select! {
1587 result = async {
1588 let cmd_kind: CommandKind = cmd.into();
1589 cmd_kind.execute().await
1590 } => {
1591 if let Some(msg) = result {
1592 if tx_clone.send(msg).await.is_err() {
1593 debug!(target: "bubbletea::command", "async batch command result dropped — receiver disconnected");
1594 }
1595 }
1596 }
1597 _ = cancel.cancelled() => {
1598 debug!(target: "bubbletea::command", "async batch command cancelled during shutdown");
1599 }
1600 }
1601 });
1602 }
1603 }
1604 } else if msg.is::<SequenceMsg>() {
1605 if let Some(seq) = msg.downcast::<SequenceMsg>() {
1606 for cmd in seq.0 {
1607 let cmd_kind: CommandKind = cmd.into();
1608 if let Some(msg) = cmd_kind.execute().await {
1609 if tx.send(msg).await.is_err() {
1610 debug!(target: "bubbletea::command", "async sequence command result dropped — receiver disconnected");
1611 break;
1612 }
1613 }
1614 }
1615 }
1616 } else if tx.send(msg).await.is_err() {
1617 debug!(target: "bubbletea::command", "async command result dropped — receiver disconnected");
1618 }
1619 }
1620 }
1621 _ = cancel_token.cancelled() => {
1623 }
1625 }
1626 });
1627 }
1628
1629 #[allow(dead_code)]
1631 fn handle_command_async(&self, cmd: CommandKind, tx: tokio::sync::mpsc::Sender<Message>) {
1632 tokio::spawn(async move {
1633 if let Some(msg) = cmd.execute().await {
1634 if msg.is::<BatchMsg>() {
1636 if let Some(batch) = msg.downcast::<BatchMsg>() {
1637 for cmd in batch.0 {
1638 let tx_clone = tx.clone();
1639 tokio::spawn(async move {
1640 let cmd_kind: CommandKind = cmd.into();
1641 if let Some(msg) = cmd_kind.execute().await {
1642 if tx_clone.send(msg).await.is_err() {
1643 debug!(target: "bubbletea::command", "legacy async batch command result dropped — receiver disconnected");
1644 }
1645 }
1646 });
1647 }
1648 }
1649 } else if msg.is::<SequenceMsg>() {
1650 if let Some(seq) = msg.downcast::<SequenceMsg>() {
1651 for cmd in seq.0 {
1652 let cmd_kind: CommandKind = cmd.into();
1653 if let Some(msg) = cmd_kind.execute().await {
1654 if tx.send(msg).await.is_err() {
1655 debug!(target: "bubbletea::command", "legacy async sequence command result dropped — receiver disconnected");
1656 break;
1657 }
1658 }
1659 }
1660 }
1661 } else if tx.send(msg).await.is_err() {
1662 debug!(target: "bubbletea::command", "legacy async command result dropped — receiver disconnected");
1663 }
1664 }
1665 });
1666 }
1667}
1668
/// Incremental parser state for raw terminal input.
///
/// Holds the bytes that have been received but not yet turned into messages
/// (for example the beginning of an escape sequence that is still incomplete).
#[derive(Debug, Default)]
struct InputParser {
    /// Bytes received so far that have not been consumed by a parsed message.
    buffer: Vec<u8>,
}
1676
1677impl InputParser {
1678 fn new() -> Self {
1679 Self { buffer: Vec::new() }
1680 }
1681
1682 const MAX_BUFFER: usize = 1024 * 1024;
1685
1686 fn push_bytes(&mut self, bytes: &[u8], can_have_more_data: bool) -> Vec<Message> {
1687 if !bytes.is_empty() {
1688 if self.buffer.len() + bytes.len() > Self::MAX_BUFFER {
1689 debug!(
1690 target: "bubbletea::input",
1691 "Input buffer exceeded 1MB limit, draining"
1692 );
1693 self.buffer.clear();
1694 }
1695 self.buffer.extend_from_slice(bytes);
1696 }
1697
1698 let mut messages = Vec::new();
1699 loop {
1700 if self.buffer.is_empty() {
1701 break;
1702 }
1703
1704 match parse_one_message(&self.buffer, can_have_more_data) {
1705 ParseOutcome::NeedMore => break,
1706 ParseOutcome::Parsed(consumed, msg) => {
1707 self.buffer.drain(0..consumed);
1708 if let Some(msg) = msg {
1709 messages.push(msg);
1710 }
1711 }
1712 }
1713 }
1714
1715 messages
1716 }
1717
1718 fn flush(&mut self) -> Vec<Message> {
1719 let mut messages = Vec::new();
1720 loop {
1721 if self.buffer.is_empty() {
1722 break;
1723 }
1724
1725 match parse_one_message(&self.buffer, false) {
1726 ParseOutcome::NeedMore => break,
1727 ParseOutcome::Parsed(consumed, msg) => {
1728 self.buffer.drain(0..consumed);
1729 if let Some(msg) = msg {
1730 messages.push(msg);
1731 }
1732 }
1733 }
1734 }
1735 messages
1736 }
1737}
1738
/// Result of trying to parse one message from the front of a byte buffer.
enum ParseOutcome {
    /// Nothing was consumed; parsing should be retried once more bytes are available.
    NeedMore,
    /// `(consumed, message)`: the number of bytes to remove from the front of the
    /// buffer and the message decoded from them, if any.
    Parsed(usize, Option<Message>),
}
1743
1744fn parse_one_message(buf: &[u8], can_have_more_data: bool) -> ParseOutcome {
1745 if buf.is_empty() {
1746 return ParseOutcome::NeedMore;
1747 }
1748
1749 if let Some(outcome) = parse_mouse_event(buf, can_have_more_data) {
1750 return outcome;
1751 }
1752
1753 if let Some(outcome) = parse_focus_event(buf, can_have_more_data) {
1754 return outcome;
1755 }
1756
1757 if let Some(outcome) = parse_bracketed_paste(buf, can_have_more_data) {
1758 return outcome;
1759 }
1760
1761 if let Some(outcome) = parse_key_sequence(buf, can_have_more_data) {
1762 return outcome;
1763 }
1764
1765 parse_runes_or_control(buf, can_have_more_data)
1766}
1767
1768fn parse_mouse_event(buf: &[u8], can_have_more_data: bool) -> Option<ParseOutcome> {
1769 if buf.starts_with(b"\x1b[M") {
1770 if buf.len() < 6 {
1771 return Some(if can_have_more_data {
1772 ParseOutcome::NeedMore
1773 } else {
1774 ParseOutcome::Parsed(1, Some(replacement_message()))
1775 });
1776 }
1777 let seq = &buf[..6];
1778 return Some(match crate::mouse::parse_mouse_event_sequence(seq) {
1779 Ok(msg) => ParseOutcome::Parsed(6, Some(Message::new(msg))),
1780 Err(_) => ParseOutcome::Parsed(1, Some(replacement_message())),
1781 });
1782 }
1783
1784 if buf.starts_with(b"\x1b[<") {
1785 if let Some(end_idx) = buf.iter().position(|b| *b == b'M' || *b == b'm') {
1786 let seq = &buf[..=end_idx];
1787 return Some(match crate::mouse::parse_mouse_event_sequence(seq) {
1788 Ok(msg) => ParseOutcome::Parsed(seq.len(), Some(Message::new(msg))),
1789 Err(_) => ParseOutcome::Parsed(1, Some(replacement_message())),
1790 });
1791 }
1792 return Some(if can_have_more_data {
1793 ParseOutcome::NeedMore
1794 } else {
1795 ParseOutcome::Parsed(1, Some(replacement_message()))
1796 });
1797 }
1798
1799 None
1800}
1801
1802fn parse_focus_event(buf: &[u8], can_have_more_data: bool) -> Option<ParseOutcome> {
1803 if buf.len() < 3 && buf.starts_with(b"\x1b[") && can_have_more_data {
1804 return Some(ParseOutcome::NeedMore);
1805 }
1806
1807 if buf.starts_with(b"\x1b[I") {
1808 return Some(ParseOutcome::Parsed(3, Some(Message::new(FocusMsg))));
1809 }
1810
1811 if buf.starts_with(b"\x1b[O") {
1812 return Some(ParseOutcome::Parsed(3, Some(Message::new(BlurMsg))));
1813 }
1814
1815 None
1816}
1817
1818fn parse_bracketed_paste(buf: &[u8], can_have_more_data: bool) -> Option<ParseOutcome> {
1819 const BP_START: &[u8] = b"\x1b[200~";
1820 const BP_END: &[u8] = b"\x1b[201~";
1821
1822 if !buf.starts_with(BP_START) {
1823 return None;
1824 }
1825
1826 if let Some(idx) = buf.windows(BP_END.len()).position(|w| w == BP_END) {
1827 let content = &buf[BP_START.len()..idx];
1828 let text = String::from_utf8_lossy(content);
1829 let runes = text.chars().collect::<Vec<char>>();
1830 let key = KeyMsg::from_runes(runes).with_paste();
1831 let total_len = idx + BP_END.len();
1832 return Some(ParseOutcome::Parsed(total_len, Some(message_from_key(key))));
1833 }
1834
1835 Some(if can_have_more_data {
1836 ParseOutcome::NeedMore
1837 } else {
1838 let content = &buf[BP_START.len()..];
1839 let text = String::from_utf8_lossy(content);
1840 let runes = text.chars().collect::<Vec<char>>();
1841 let key = KeyMsg::from_runes(runes).with_paste();
1842 ParseOutcome::Parsed(buf.len(), Some(message_from_key(key)))
1843 })
1844}
1845
1846fn parse_key_sequence(buf: &[u8], can_have_more_data: bool) -> Option<ParseOutcome> {
1847 if let Some((key, len)) = crate::key::parse_sequence_prefix(buf) {
1848 return Some(ParseOutcome::Parsed(len, Some(message_from_key(key))));
1849 }
1850
1851 if can_have_more_data && is_sequence_prefix(buf) {
1853 return Some(ParseOutcome::NeedMore);
1854 }
1855
1856 if buf.starts_with(b"\x1b")
1857 && let Some((mut key, len)) = crate::key::parse_sequence_prefix(&buf[1..])
1858 {
1859 if !key.alt {
1860 key = key.with_alt();
1861 }
1862 return Some(ParseOutcome::Parsed(len + 1, Some(message_from_key(key))));
1863 }
1864
1865 None
1866}
1867
1868fn parse_runes_or_control(buf: &[u8], can_have_more_data: bool) -> ParseOutcome {
1869 let mut alt = false;
1870 let mut idx = 0;
1871
1872 if buf[0] == 0x1b {
1873 if buf.len() == 1 {
1874 return if can_have_more_data {
1875 ParseOutcome::NeedMore
1876 } else {
1877 ParseOutcome::Parsed(1, Some(message_from_key(KeyMsg::from_type(KeyType::Esc))))
1878 };
1879 }
1880 alt = true;
1881 idx = 1;
1882 }
1883
1884 if idx >= buf.len() {
1885 return ParseOutcome::NeedMore;
1886 }
1887
1888 if let Some(key_type) = control_key_type(buf[idx]) {
1889 let mut key = KeyMsg::from_type(key_type);
1890 if alt {
1891 key = key.with_alt();
1892 }
1893 return ParseOutcome::Parsed(idx + 1, Some(message_from_key(key)));
1894 }
1895
1896 let mut runes = Vec::new();
1897 let mut i = idx;
1898 while i < buf.len() {
1899 let b = buf[i];
1900 if is_control_or_space(b) {
1901 break;
1902 }
1903
1904 let (ch, width, valid) = match decode_char(&buf[i..], can_have_more_data) {
1905 DecodeOutcome::NeedMore => return ParseOutcome::NeedMore,
1906 DecodeOutcome::Decoded(ch, width, valid) => (ch, width, valid),
1907 };
1908
1909 if !valid {
1910 runes.push(std::char::REPLACEMENT_CHARACTER);
1911 i += 1;
1912 } else {
1913 runes.push(ch);
1914 i += width;
1915 }
1916
1917 if alt {
1918 break;
1919 }
1920 }
1921
1922 if !runes.is_empty() {
1923 let mut key = KeyMsg::from_runes(runes);
1924 if alt {
1925 key = key.with_alt();
1926 }
1927 return ParseOutcome::Parsed(i, Some(message_from_key(key)));
1928 }
1929
1930 ParseOutcome::Parsed(1, Some(replacement_message()))
1931}
1932
1933fn control_key_type(byte: u8) -> Option<KeyType> {
1934 match byte {
1935 0x00 => Some(KeyType::Null),
1936 0x01 => Some(KeyType::CtrlA),
1937 0x02 => Some(KeyType::CtrlB),
1938 0x03 => Some(KeyType::CtrlC),
1939 0x04 => Some(KeyType::CtrlD),
1940 0x05 => Some(KeyType::CtrlE),
1941 0x06 => Some(KeyType::CtrlF),
1942 0x07 => Some(KeyType::CtrlG),
1943 0x08 => Some(KeyType::CtrlH),
1944 0x09 => Some(KeyType::Tab),
1945 0x0A => Some(KeyType::CtrlJ),
1946 0x0B => Some(KeyType::CtrlK),
1947 0x0C => Some(KeyType::CtrlL),
1948 0x0D => Some(KeyType::Enter),
1949 0x0E => Some(KeyType::CtrlN),
1950 0x0F => Some(KeyType::CtrlO),
1951 0x10 => Some(KeyType::CtrlP),
1952 0x11 => Some(KeyType::CtrlQ),
1953 0x12 => Some(KeyType::CtrlR),
1954 0x13 => Some(KeyType::CtrlS),
1955 0x14 => Some(KeyType::CtrlT),
1956 0x15 => Some(KeyType::CtrlU),
1957 0x16 => Some(KeyType::CtrlV),
1958 0x17 => Some(KeyType::CtrlW),
1959 0x18 => Some(KeyType::CtrlX),
1960 0x19 => Some(KeyType::CtrlY),
1961 0x1A => Some(KeyType::CtrlZ),
1962 0x1B => Some(KeyType::Esc),
1963 0x1C => Some(KeyType::CtrlBackslash),
1964 0x1D => Some(KeyType::CtrlCloseBracket),
1965 0x1E => Some(KeyType::CtrlCaret),
1966 0x1F => Some(KeyType::CtrlUnderscore),
1967 0x20 => Some(KeyType::Space),
1968 0x7F => Some(KeyType::Backspace),
1969 _ => None,
1970 }
1971}
1972
/// Returns `true` for ASCII control bytes (`0x00..=0x1F`), space (`0x20`) and DEL (`0x7F`).
fn is_control_or_space(byte: u8) -> bool {
    matches!(byte, 0x00..=0x20 | 0x7F)
}
1976
/// Result of decoding one UTF-8 character from the front of a byte slice.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DecodeOutcome {
    /// The sequence is incomplete and more input may still arrive.
    NeedMore,
    /// `(character, width_in_bytes, valid)`. When `valid` is `false` the character
    /// is U+FFFD and the width is 1.
    Decoded(char, usize, bool),
}
1981
1982fn decode_char(input: &[u8], can_have_more_data: bool) -> DecodeOutcome {
1983 let first = input[0];
1984 let width = if first < 0x80 {
1985 1
1986 } else if (first & 0xE0) == 0xC0 {
1987 2
1988 } else if (first & 0xF0) == 0xE0 {
1989 3
1990 } else if (first & 0xF8) == 0xF0 {
1991 4
1992 } else {
1993 return DecodeOutcome::Decoded(std::char::REPLACEMENT_CHARACTER, 1, false);
1994 };
1995
1996 if input.len() < width {
1997 return if can_have_more_data {
1998 DecodeOutcome::NeedMore
1999 } else {
2000 DecodeOutcome::Decoded(std::char::REPLACEMENT_CHARACTER, 1, false)
2001 };
2002 }
2003
2004 match std::str::from_utf8(&input[..width]) {
2005 Ok(s) => {
2006 let ch = s.chars().next().unwrap_or(std::char::REPLACEMENT_CHARACTER);
2007 DecodeOutcome::Decoded(ch, width, true)
2008 }
2009 Err(_) => DecodeOutcome::Decoded(std::char::REPLACEMENT_CHARACTER, 1, false),
2010 }
2011}
2012
2013fn message_from_key(key: KeyMsg) -> Message {
2014 if key.key_type == KeyType::CtrlC {
2015 Message::new(InterruptMsg)
2016 } else {
2017 Message::new(key)
2018 }
2019}
2020
2021fn replacement_message() -> Message {
2022 Message::new(KeyMsg::from_char(std::char::REPLACEMENT_CHARACTER))
2023}
2024
2025#[cfg(test)]
2026mod tests {
2027 use super::*;
2028 use tokio_util::sync::CancellationToken;
2029 use tokio_util::task::TaskTracker;
2030
/// Minimal model used by the tests in this module.
struct TestModel {
    /// Running total; `update` adds each `i32` message to it.
    count: i32,
}
2034
2035 impl Model for TestModel {
2036 fn init(&self) -> Option<Cmd> {
2037 None
2038 }
2039
2040 fn update(&mut self, msg: Message) -> Option<Cmd> {
2041 if let Some(n) = msg.downcast::<i32>() {
2042 self.count += n;
2043 }
2044 None
2045 }
2046
2047 fn view(&self) -> String {
2048 format!("Count: {}", self.count)
2049 }
2050 }
2051
/// Default options: no alt screen, no mouse cell motion, bracketed paste on, 60 fps.
#[test]
fn test_program_options_default() {
    let defaults = ProgramOptions::default();

    // Features that are off by default.
    assert!(!defaults.alt_screen);
    assert!(!defaults.mouse_cell_motion);

    // Features that are on by default.
    assert!(defaults.bracketed_paste);
    assert_eq!(defaults.fps, 60);
}
2060
/// Builder methods set the corresponding option fields.
#[test]
fn test_program_builder() {
    let program = Program::new(TestModel { count: 0 })
        .with_alt_screen()
        .with_mouse_cell_motion()
        .with_fps(30);

    let configured = &program.options;
    assert!(configured.alt_screen);
    assert!(configured.mouse_cell_motion);
    assert_eq!(configured.fps, 30);
}
2073
/// A requested frame rate above the maximum is capped at 120.
#[test]
fn test_program_fps_max() {
    let program = Program::new(TestModel { count: 0 }).with_fps(200);
    let effective_fps = program.options.fps;
    assert_eq!(effective_fps, 120);
}
2080
/// A requested frame rate of zero is raised to 1.
#[test]
fn test_program_fps_min() {
    let program = Program::new(TestModel { count: 0 }).with_fps(0);
    let effective_fps = program.options.fps;
    assert_eq!(effective_fps, 1);
}
2087
/// A complete paste yields one paste-flagged key containing the pasted text.
#[test]
fn test_parse_bracketed_paste_basic() {
    let input = b"\x1b[200~hello world\x1b[201~";
    let result = parse_bracketed_paste(input, false);

    assert!(result.is_some());
    let Some(ParseOutcome::Parsed(len, Some(msg))) = result else {
        panic!("Expected Parsed outcome");
    };

    // Both markers and the content are consumed.
    assert_eq!(len, input.len());
    let key = msg.downcast_ref::<KeyMsg>().unwrap();
    assert!(key.paste, "Key should have paste flag set");
    let expected: Vec<char> = "hello world".chars().collect();
    assert_eq!(key.runes, expected);
}
2109
/// An empty paste still produces a paste-flagged key, with no runes.
#[test]
fn test_parse_bracketed_paste_empty() {
    let input = b"\x1b[200~\x1b[201~";
    let result = parse_bracketed_paste(input, false);

    assert!(result.is_some());
    let Some(ParseOutcome::Parsed(len, Some(msg))) = result else {
        panic!("Expected Parsed outcome");
    };

    assert_eq!(len, input.len());
    let key = msg.downcast_ref::<KeyMsg>().unwrap();
    assert!(key.paste);
    assert!(key.runes.is_empty());
}
2125
/// Newlines inside a paste are preserved as part of the pasted text.
#[test]
fn test_parse_bracketed_paste_multiline() {
    let input = b"\x1b[200~line1\nline2\nline3\x1b[201~";
    let result = parse_bracketed_paste(input, false);

    assert!(result.is_some());
    let Some(ParseOutcome::Parsed(len, Some(msg))) = result else {
        panic!("Expected Parsed outcome");
    };

    assert_eq!(len, input.len());
    let key = msg.downcast_ref::<KeyMsg>().unwrap();
    assert!(key.paste);
    let pasted = key.runes.iter().collect::<String>();
    assert_eq!(pasted, "line1\nline2\nline3");
}
2142
/// Multi-byte UTF-8 content survives the paste round trip.
#[test]
fn test_parse_bracketed_paste_unicode() {
    let input = "\x1b[200~hello 世界 🌍\x1b[201~".as_bytes();
    let result = parse_bracketed_paste(input, false);

    assert!(result.is_some());
    let Some(ParseOutcome::Parsed(_, Some(msg))) = result else {
        panic!("Expected Parsed outcome");
    };

    let key = msg.downcast_ref::<KeyMsg>().unwrap();
    assert!(key.paste);
    let pasted = key.runes.iter().collect::<String>();
    assert_eq!(pasted, "hello 世界 🌍");
}
2158
/// Without the end marker, and with more data possible, the parser waits.
#[test]
fn test_parse_bracketed_paste_incomplete() {
    let unterminated = b"\x1b[200~hello";
    let outcome = parse_bracketed_paste(unterminated, true);

    assert!(matches!(outcome, Some(ParseOutcome::NeedMore)));
}
2167
/// Without the end marker and with no more data coming, the remainder is the paste.
#[test]
fn test_parse_bracketed_paste_incomplete_no_more_data() {
    let input = b"\x1b[200~hello";
    let result = parse_bracketed_paste(input, false);

    assert!(result.is_some());
    let Some(ParseOutcome::Parsed(len, Some(msg))) = result else {
        panic!("Expected Parsed outcome");
    };

    assert_eq!(len, input.len());
    let key = msg.downcast_ref::<KeyMsg>().unwrap();
    assert!(key.paste);
    let pasted = key.runes.iter().collect::<String>();
    assert_eq!(pasted, "hello");
}
2185
/// Input that does not start with the paste marker is left to other parsers.
#[test]
fn test_parse_bracketed_paste_not_bracketed() {
    let plain = b"hello";
    let outcome = parse_bracketed_paste(plain, false);
    assert!(outcome.is_none(), "Non-paste input should return None");
}
2193
/// A 10,000-character paste is parsed in one piece.
#[test]
fn test_parse_bracketed_paste_large() {
    let content = "a".repeat(10000);
    let input = format!("\x1b[200~{}\x1b[201~", content);
    let result = parse_bracketed_paste(input.as_bytes(), false);

    assert!(result.is_some());
    let Some(ParseOutcome::Parsed(len, Some(msg))) = result else {
        panic!("Expected Parsed outcome");
    };

    assert_eq!(len, input.len());
    let key = msg.downcast_ref::<KeyMsg>().unwrap();
    assert!(key.paste);
    assert_eq!(key.runes.len(), 10000);
}
2211
/// `spawn_batch` must run the closure it is given.
///
/// Completion is signalled over a channel rather than by sleeping for a fixed time:
/// the test finishes as soon as the closure has run, does not depend on scheduler
/// timing, and fails immediately (disconnected channel) if the closure is dropped
/// without being executed.
#[test]
fn spawn_batch_executes_closure() {
    let (done_tx, done_rx) = mpsc::channel::<()>();

    spawn_batch(move || {
        // The receiver outlives this send unless the test has already failed.
        let _ = done_tx.send(());
    });

    assert!(
        done_rx.recv_timeout(Duration::from_secs(5)).is_ok(),
        "spawn_batch should execute the closure"
    );
}
2233
/// Two hundred closures handed to `spawn_batch` must all run within five seconds.
#[test]
fn spawn_batch_handles_many_concurrent_tasks() {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::Instant;

    let task_count = 200;
    let counter = Arc::new(AtomicUsize::new(0));

    (0..task_count).for_each(|_| {
        let c = Arc::clone(&counter);
        spawn_batch(move || {
            c.fetch_add(1, Ordering::SeqCst);
        });
    });

    // Poll until every task has reported in or the deadline passes.
    let deadline = Instant::now() + Duration::from_secs(5);
    loop {
        let finished = counter.load(Ordering::SeqCst) >= task_count;
        if finished || Instant::now() >= deadline {
            break;
        }
        thread::sleep(Duration::from_millis(50));
    }

    assert_eq!(
        counter.load(Ordering::SeqCst),
        task_count,
        "All {task_count} tasks should complete"
    );
}
2261
/// A batch of 50 commands must deliver each of its 50 results exactly once.
#[test]
fn handle_command_batch_executes_all_subcommands() {
    let program = Program::new(TestModel { count: 0 });
    let (tx, rx) = mpsc::channel();
    let command_threads = Arc::new(Mutex::new(Vec::new()));

    let mut cmds: Vec<Option<Cmd>> = Vec::with_capacity(50);
    for i in 0..50 {
        cmds.push(Some(Cmd::new(move || Message::new(i))));
    }
    let batch_cmd = crate::batch(cmds).unwrap();

    program.handle_command(batch_cmd, tx, Arc::clone(&command_threads));

    // Collect results until all have arrived or the deadline passes.
    let mut results = Vec::new();
    let deadline = std::time::Instant::now() + Duration::from_secs(5);
    loop {
        if results.len() >= 50 || std::time::Instant::now() >= deadline {
            break;
        }
        if let Ok(msg) = rx.recv_timeout(Duration::from_millis(100)) {
            results.push(msg.downcast::<i32>().unwrap());
        }
    }

    assert_eq!(
        results.len(),
        50,
        "All 50 batch sub-commands should produce results"
    );

    // Arrival order is not specified, so compare as a sorted list.
    results.sort();
    let expected: Vec<i32> = (0..50).collect();
    assert_eq!(
        results, expected,
        "Each sub-command value should be received exactly once"
    );
}
2298
/// With the `thread-pool` feature, batch sub-commands must not all run at once:
/// the peak number of simultaneously running commands stays near the CPU count.
#[cfg(feature = "thread-pool")]
#[test]
fn handle_command_batch_bounded_parallelism() {
    use std::sync::atomic::{AtomicUsize, Ordering};

    let program = Program::new(TestModel { count: 0 });
    let (tx, rx) = mpsc::channel();
    let command_threads = Arc::new(Mutex::new(Vec::new()));

    // `active` counts commands currently running; `max_active` records the peak.
    let active = Arc::new(AtomicUsize::new(0));
    let max_active = Arc::new(AtomicUsize::new(0));

    let task_count: usize = 100;
    let mut cmds: Vec<Option<Cmd>> = Vec::with_capacity(task_count);
    for _ in 0..task_count {
        let running = Arc::clone(&active);
        let peak = Arc::clone(&max_active);
        cmds.push(Some(Cmd::new(move || {
            let current = running.fetch_add(1, Ordering::SeqCst) + 1;
            peak.fetch_max(current, Ordering::SeqCst);
            thread::sleep(Duration::from_millis(5));
            running.fetch_sub(1, Ordering::SeqCst);
            Message::new(1i32)
        })));
    }
    let batch_cmd = crate::batch(cmds).unwrap();

    program.handle_command(batch_cmd, tx, Arc::clone(&command_threads));

    let mut count = 0usize;
    let deadline = std::time::Instant::now() + Duration::from_secs(15);
    loop {
        if count >= task_count || std::time::Instant::now() >= deadline {
            break;
        }
        if rx.recv_timeout(Duration::from_millis(100)).is_ok() {
            count += 1;
        }
    }

    assert_eq!(count, task_count, "All batch commands should complete");

    let observed_max = max_active.load(Ordering::SeqCst);
    let num_cpus = match thread::available_parallelism() {
        Ok(n) => n.get(),
        Err(_) => 4,
    };

    assert!(
        observed_max <= num_cpus + 2,
        "Expected bounded parallelism near {num_cpus}, but observed {observed_max}. \
         Without thread-pool feature, this would be near {task_count}."
    );
}
2354
/// A batch of 500 commands must complete and deliver all 500 results.
#[test]
fn handle_command_large_batch_no_panic() {
    let program = Program::new(TestModel { count: 0 });
    let (tx, rx) = mpsc::channel();
    let command_threads = Arc::new(Mutex::new(Vec::new()));

    let mut cmds: Vec<Option<Cmd>> = Vec::with_capacity(500);
    for i in 0..500 {
        cmds.push(Some(Cmd::new(move || Message::new(i))));
    }
    let batch_cmd = crate::batch(cmds).unwrap();

    program.handle_command(batch_cmd, tx, Arc::clone(&command_threads));

    let mut count = 0usize;
    let deadline = std::time::Instant::now() + Duration::from_secs(10);
    loop {
        if count >= 500 || std::time::Instant::now() >= deadline {
            break;
        }
        if rx.recv_timeout(Duration::from_millis(50)).is_ok() {
            count += 1;
        }
    }

    assert_eq!(count, 500, "Large batch should complete without panic");
}
2381
/// A spawned thread's join handle can be stored in an `Option` and joined later.
#[test]
fn test_thread_handles_captured() {
    let worker = thread::spawn(|| {});
    let handle: Option<thread::JoinHandle<()>> = Some(worker);

    assert!(handle.is_some(), "Handle should be captured");

    if let Some(joinable) = handle {
        joinable.join().expect("Thread should join successfully");
    }
}
2399
/// A thread blocked in `recv()` exits once the sending side is dropped.
#[test]
fn test_threads_exit_on_channel_drop() {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, Ordering};

    let (tx, rx) = mpsc::channel::<i32>();
    let thread_exited = Arc::new(AtomicBool::new(false));

    let exit_flag = Arc::clone(&thread_exited);
    let handle = thread::spawn(move || {
        // Loops until `recv` reports that every sender is gone.
        loop {
            if rx.recv().is_err() {
                break;
            }
        }
        exit_flag.store(true, Ordering::SeqCst);
    });

    // The sender is still alive, so the thread cannot have finished yet.
    assert!(!thread_exited.load(Ordering::SeqCst));

    drop(tx);

    handle
        .join()
        .expect("Thread should join after channel drop");

    assert!(
        thread_exited.load(Ordering::SeqCst),
        "Thread should have exited after channel drop"
    );
}
2436
/// Joining every stored handle waits for all worker threads to finish.
#[test]
fn test_shutdown_joins_all_threads() {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    let join_count = Arc::new(AtomicUsize::new(0));

    // Three workers that finish after 10, 20 and 30 ms respectively.
    let handles: Vec<thread::JoinHandle<()>> = (1..=3u64)
        .map(|step| {
            let finished = Arc::clone(&join_count);
            thread::spawn(move || {
                thread::sleep(Duration::from_millis(10 * step));
                finished.fetch_add(1, Ordering::SeqCst);
            })
        })
        .collect();

    for handle in handles {
        if let Err(e) = handle.join() {
            panic!("Thread panicked during join: {:?}", e);
        }
    }

    assert_eq!(
        join_count.load(Ordering::SeqCst),
        3,
        "All threads should have completed and been joined"
    );
}
2472
/// Joining a thread that panicked returns `Err` instead of propagating the panic.
#[test]
fn test_thread_panic_handled_gracefully() {
    let outcome = thread::spawn(|| {
        panic!("Intentional panic for testing");
    })
    .join();

    let payload = outcome.expect_err("Join should return Err when thread panics");
    // The payload can be formatted for diagnostics.
    let _panic_info = format!("{:?}", payload);
}
2488
/// A forwarder thread relays every message from one channel to another, in order,
/// and exits cleanly once the upstream sender is dropped.
#[test]
fn test_external_forwarder_pattern() {
    let (external_tx, external_rx) = mpsc::channel::<Message>();
    let (event_tx, event_rx) = mpsc::channel::<Message>();

    let forward_tx = event_tx.clone();
    let handle = thread::spawn(move || {
        // Iteration ends when `external_tx` is dropped.
        for msg in external_rx {
            if forward_tx.send(msg).is_err() {
                break;
            }
        }
    });

    for value in [1i32, 2i32, 3i32] {
        external_tx.send(Message::new(value)).unwrap();
    }

    drop(external_tx);

    let join_result = handle.join();
    assert!(
        join_result.is_ok(),
        "Forwarder thread should exit cleanly when sender is dropped"
    );

    let received: Vec<i32> = event_rx
        .try_iter()
        .filter_map(|msg| msg.downcast_ref::<i32>().copied())
        .collect();
    assert_eq!(received, vec![1, 2, 3], "All messages should be forwarded");
}
2529
/// `TaskTracker::wait` does not return before a `spawn_blocking` task has finished.
#[test]
fn test_task_tracker_spawn_blocking_tracks_thread() {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, Ordering};

    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .expect("Failed to create runtime");

    let thread_completed = Arc::new(AtomicBool::new(false));
    let completion_flag = Arc::clone(&thread_completed);

    runtime.block_on(async {
        let tracker = TaskTracker::new();

        tracker.spawn_blocking(move || {
            thread::sleep(Duration::from_millis(50));
            completion_flag.store(true, Ordering::SeqCst);
        });

        tracker.close();
        tracker.wait().await;

        assert!(
            thread_completed.load(Ordering::SeqCst),
            "spawn_blocking task should complete before wait() returns"
        );
    });
}
2569
/// A blocking task that polls the cancellation token exits once the token is cancelled.
///
/// The pause inside the async block uses `tokio::time::sleep` rather than
/// `thread::sleep`, so the runtime thread is not blocked from within async code.
/// The `thread::sleep` inside the `spawn_blocking` closure is intentional: that
/// closure runs on a blocking thread.
#[test]
fn test_cancellation_token_stops_blocking_task() {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, Ordering};

    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .expect("Failed to create runtime");

    let task_exited = Arc::new(AtomicBool::new(false));
    let task_exited_clone = Arc::clone(&task_exited);

    rt.block_on(async {
        let cancel_token = CancellationToken::new();
        let task_tracker = TaskTracker::new();

        let cancel_clone = cancel_token.clone();

        task_tracker.spawn_blocking(move || {
            // Poll the token every 10 ms until cancellation is observed.
            loop {
                if cancel_clone.is_cancelled() {
                    task_exited_clone.store(true, Ordering::SeqCst);
                    break;
                }
                thread::sleep(Duration::from_millis(10));
            }
        });

        // Give the task time to start; it must keep running until cancelled.
        tokio::time::sleep(Duration::from_millis(30)).await;
        assert!(
            !task_exited.load(Ordering::SeqCst),
            "Task should still be running before cancellation"
        );

        cancel_token.cancel();
        task_tracker.close();
        task_tracker.wait().await;

        assert!(
            task_exited.load(Ordering::SeqCst),
            "Task should exit after cancellation"
        );
    });
}
2620
/// After cancellation, waiting on the tracker lets all three blocking tasks finish
/// their clean-up work within the timeout.
///
/// The pause inside the async block uses `tokio::time::sleep` rather than
/// `thread::sleep`, so the runtime thread is not blocked from within async code.
#[test]
fn test_graceful_shutdown_waits_for_all_blocking_tasks() {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .expect("Failed to create runtime");

    let completed_count = Arc::new(AtomicUsize::new(0));

    rt.block_on(async {
        let cancel_token = CancellationToken::new();
        let task_tracker = TaskTracker::new();

        for i in 0..3 {
            let count_clone = Arc::clone(&completed_count);
            let cancel_clone = cancel_token.clone();
            task_tracker.spawn_blocking(move || {
                // Run until cancelled...
                loop {
                    if cancel_clone.is_cancelled() {
                        break;
                    }
                    thread::sleep(Duration::from_millis(10));
                }
                // ...then do 10/20/30 ms of work before reporting completion.
                thread::sleep(Duration::from_millis(10 * (i as u64 + 1)));
                count_clone.fetch_add(1, Ordering::SeqCst);
            });
        }

        tokio::time::sleep(Duration::from_millis(30)).await;
        assert_eq!(
            completed_count.load(Ordering::SeqCst),
            0,
            "No tasks should complete before shutdown"
        );

        cancel_token.cancel();
        task_tracker.close();

        let timeout_result: std::result::Result<(), tokio::time::error::Elapsed> =
            tokio::time::timeout(Duration::from_secs(2), task_tracker.wait()).await;

        assert!(
            timeout_result.is_ok(),
            "All tasks should complete within timeout"
        );

        assert_eq!(
            completed_count.load(Ordering::SeqCst),
            3,
            "All 3 tasks should complete during graceful shutdown"
        );
    });
}
2684
/// Contrasts a plain `thread::spawn` (not registered with the tracker) with
/// `TaskTracker::spawn_blocking` (registered, so `wait()` covers it).
///
/// Only the tracked half carries an assertion; the untracked thread is started and
/// left to finish on its own.
#[test]
fn test_spawn_blocking_vs_spawn_difference() {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, Ordering};

    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .expect("Failed to create runtime");

    // Part 1: a thread the tracker is not told about.
    let untracked_done = Arc::new(AtomicBool::new(false));
    let untracked_flag = Arc::clone(&untracked_done);

    runtime.block_on(async {
        let tracker = TaskTracker::new();

        let _handle = thread::spawn(move || {
            thread::sleep(Duration::from_millis(100));
            untracked_flag.store(true, Ordering::SeqCst);
        });

        tracker.close();
        tracker.wait().await;
    });

    // Part 2: the same kind of work registered through `spawn_blocking`.
    let tracked_done = Arc::new(AtomicBool::new(false));
    let tracked_flag = Arc::clone(&tracked_done);

    runtime.block_on(async {
        let tracker = TaskTracker::new();

        tracker.spawn_blocking(move || {
            thread::sleep(Duration::from_millis(50));
            tracked_flag.store(true, Ordering::SeqCst);
        });

        tracker.close();
        tracker.wait().await;

        assert!(
            tracked_done.load(Ordering::SeqCst),
            "spawn_blocking task should complete before wait() returns"
        );
    });
}
2742
/// A blocking task that alternates between checking the cancellation token and a
/// short wait keeps iterating until cancelled, then stops.
///
/// The pause inside the async block uses `tokio::time::sleep` rather than
/// `thread::sleep`, so the runtime thread is not blocked from within async code.
#[test]
fn test_event_thread_pattern_with_poll_timeout() {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .expect("Failed to create runtime");

    let poll_count = Arc::new(AtomicUsize::new(0));
    let poll_count_clone = Arc::clone(&poll_count);

    rt.block_on(async {
        let cancel_token = CancellationToken::new();
        let task_tracker = TaskTracker::new();

        let cancel_clone = cancel_token.clone();

        task_tracker.spawn_blocking(move || {
            loop {
                if cancel_clone.is_cancelled() {
                    break;
                }
                // Stands in for a poll with a 25 ms timeout.
                thread::sleep(Duration::from_millis(25));
                poll_count_clone.fetch_add(1, Ordering::SeqCst);
            }
        });

        tokio::time::sleep(Duration::from_millis(100)).await;
        let count_before_cancel = poll_count.load(Ordering::SeqCst);
        assert!(
            count_before_cancel >= 2,
            "Thread should have polled multiple times: {}",
            count_before_cancel
        );

        cancel_token.cancel();
        task_tracker.close();
        task_tracker.wait().await;

        let final_count = poll_count.load(Ordering::SeqCst);
        assert!(
            final_count >= count_before_cancel,
            "Poll count should not decrease"
        );
    });
}
2799}