hojicha_runtime/
resilient_input.rs1use crossterm::event;
7use log::{debug, error, info, warn};
8use std::panic::{self, AssertUnwindSafe};
9use std::sync::atomic::{AtomicBool, Ordering};
10use std::sync::{mpsc, Arc};
11use std::thread;
12use std::time::Duration;
13
/// Counters describing what happened over the lifetime of the input thread.
///
/// A value of this type is produced by [`spawn_resilient_input_thread`] and
/// handed back through its join handle once the supervisor loop exits.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct InputThreadStats {
    /// Number of terminal events read by the input loop.
    pub total_events: usize,
    /// Number of panics caught while running the input loop.
    pub panic_count: usize,
    /// Number of poll/read errors reported by the terminal backend.
    pub error_count: usize,
    /// Number of input-loop restarts recorded after panics.
    pub restart_count: usize,
}
22
23pub fn spawn_resilient_input_thread(
25 running: Arc<AtomicBool>,
26 force_quit: Arc<AtomicBool>,
27 crossterm_tx: mpsc::SyncSender<event::Event>,
28) -> thread::JoinHandle<InputThreadStats> {
29 thread::spawn(move || {
30 let mut stats = InputThreadStats::default();
31
32 while running.load(Ordering::SeqCst) && !force_quit.load(Ordering::SeqCst) {
34 info!(
35 "Starting input reader (attempt #{})",
36 stats.restart_count + 1
37 );
38
39 let result = panic::catch_unwind(AssertUnwindSafe(|| {
41 run_input_loop(&running, &force_quit, &crossterm_tx, &mut stats)
42 }));
43
44 match result {
45 Ok(()) => {
46 debug!("Input loop ended normally");
47 break; }
49 Err(panic_info) => {
50 stats.panic_count += 1;
51 stats.restart_count += 1;
52
53 error!("Input thread panicked (panic #{}):", stats.panic_count);
55 if let Some(s) = panic_info.downcast_ref::<&str>() {
56 error!(" Panic message: {}", s);
57 } else if let Some(s) = panic_info.downcast_ref::<String>() {
58 error!(" Panic message: {}", s);
59 } else {
60 error!(" Unknown panic type");
61 }
62
63 if stats.panic_count > 10 {
65 error!("Too many panics in input thread, giving up");
66 break;
67 }
68
69 thread::sleep(Duration::from_millis(100));
71 warn!("Restarting input thread after panic...");
72 }
73 }
74 }
75
76 info!("Input thread supervisor ending. Stats: {:?}", stats);
77 stats
78 })
79}
80
81fn run_input_loop(
83 running: &Arc<AtomicBool>,
84 force_quit: &Arc<AtomicBool>,
85 crossterm_tx: &mpsc::SyncSender<event::Event>,
86 stats: &mut InputThreadStats,
87) {
88 let mut consecutive_errors = 0;
89
90 loop {
91 if !running.load(Ordering::SeqCst) || force_quit.load(Ordering::SeqCst) {
92 debug!(
93 "Input loop stopping (running={}, force_quit={})",
94 running.load(Ordering::SeqCst),
95 force_quit.load(Ordering::SeqCst)
96 );
97 break;
98 }
99
100 match event::poll(Duration::from_millis(100)) {
102 Ok(true) => {
103 match event::read() {
105 Ok(evt) => {
106 consecutive_errors = 0; stats.total_events += 1;
108
109 if let Err(e) = crossterm_tx.send(evt) {
111 debug!("Failed to send event (receiver disconnected): {:?}", e);
112 break; }
114 }
115 Err(e) => {
116 consecutive_errors += 1;
117 stats.error_count += 1;
118
119 use std::io::ErrorKind;
121 match e.kind() {
122 ErrorKind::Interrupted => {
123 debug!("Input read interrupted, continuing...");
125 continue;
126 }
127 ErrorKind::WouldBlock => {
128 warn!("Unexpected WouldBlock after successful poll");
130 thread::sleep(Duration::from_millis(10));
131 }
132 _ => {
133 warn!("Error reading input: {}", e);
134
135 if consecutive_errors > 10 {
137 error!(
138 "Too many consecutive input errors, stopping input thread"
139 );
140 break;
141 }
142
143 thread::sleep(Duration::from_millis(50));
145 }
146 }
147 }
148 }
149 }
150 Ok(false) => {
151 consecutive_errors = 0;
153 }
154 Err(e) => {
155 consecutive_errors += 1;
156 stats.error_count += 1;
157
158 warn!("Error polling for events: {}", e);
159
160 if consecutive_errors > 10 {
161 error!("Too many consecutive polling errors, stopping input thread");
162 break;
163 }
164
165 thread::sleep(Duration::from_millis(100));
167 }
168 }
169 }
170}
171
172pub fn wrap_with_panic_recovery<F>(name: &str, mut f: F) -> thread::JoinHandle<()>
174where
175 F: FnMut() -> bool + Send + 'static,
176{
177 let thread_name = name.to_string();
178
179 thread::spawn(move || {
180 let mut attempt = 0;
181
182 loop {
183 attempt += 1;
184 info!("{}: Starting (attempt #{})", thread_name, attempt);
185
186 let result = panic::catch_unwind(AssertUnwindSafe(|| f()));
187
188 match result {
189 Ok(should_continue) => {
190 if !should_continue {
191 info!("{}: Ending normally", thread_name);
192 break;
193 }
194 }
195 Err(panic_info) => {
196 error!("{}: Panicked!", thread_name);
197 if let Some(s) = panic_info.downcast_ref::<&str>() {
198 error!(" Panic message: {}", s);
199 }
200
201 if attempt > 10 {
202 error!("{}: Too many panics, giving up", thread_name);
203 break;
204 }
205
206 thread::sleep(Duration::from_millis(100));
207 warn!("{}: Restarting after panic...", thread_name);
208 }
209 }
210 }
211 })
212}