hojicha_runtime/
resilient_input.rs1use crossterm::event;
7use log::{debug, error, info, warn};
8use std::panic::{self, AssertUnwindSafe};
9use std::sync::atomic::{AtomicBool, Ordering};
10use std::sync::{mpsc, Arc};
11use std::thread;
12use std::time::Duration;
13
/// Counters collected by the resilient input thread over its lifetime.
///
/// A value of this type is produced by joining the handle returned from
/// [`spawn_resilient_input_thread`]. All counters start at zero
/// (see the derived `Default`).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct InputThreadStats {
    /// Number of events successfully read from the terminal. An event is
    /// counted as soon as it has been read, before it is sent on the channel.
    pub total_events: usize,
    /// Number of panics caught by the supervising loop.
    pub panic_count: usize,
    /// Number of errors returned while polling for or reading events.
    pub error_count: usize,
    /// Incremented together with `panic_count` each time a panic is caught;
    /// also used to number the "attempt" in the start-up log message.
    pub restart_count: usize,
}
26
27pub fn spawn_resilient_input_thread(
29 running: Arc<AtomicBool>,
30 force_quit: Arc<AtomicBool>,
31 crossterm_tx: mpsc::SyncSender<event::Event>,
32) -> thread::JoinHandle<InputThreadStats> {
33 thread::spawn(move || {
34 let mut stats = InputThreadStats::default();
35
36 while running.load(Ordering::SeqCst) && !force_quit.load(Ordering::SeqCst) {
38 info!(
39 "Starting input reader (attempt #{})",
40 stats.restart_count + 1
41 );
42
43 let result = panic::catch_unwind(AssertUnwindSafe(|| {
45 run_input_loop(&running, &force_quit, &crossterm_tx, &mut stats)
46 }));
47
48 match result {
49 Ok(()) => {
50 debug!("Input loop ended normally");
51 break; }
53 Err(panic_info) => {
54 stats.panic_count += 1;
55 stats.restart_count += 1;
56
57 error!("Input thread panicked (panic #{}):", stats.panic_count);
59 if let Some(s) = panic_info.downcast_ref::<&str>() {
60 error!(" Panic message: {}", s);
61 } else if let Some(s) = panic_info.downcast_ref::<String>() {
62 error!(" Panic message: {}", s);
63 } else {
64 error!(" Unknown panic type");
65 }
66
67 if stats.panic_count > 10 {
69 error!("Too many panics in input thread, giving up");
70 break;
71 }
72
73 thread::sleep(Duration::from_millis(100));
75 warn!("Restarting input thread after panic...");
76 }
77 }
78 }
79
80 info!("Input thread supervisor ending. Stats: {:?}", stats);
81 stats
82 })
83}
84
85fn run_input_loop(
87 running: &Arc<AtomicBool>,
88 force_quit: &Arc<AtomicBool>,
89 crossterm_tx: &mpsc::SyncSender<event::Event>,
90 stats: &mut InputThreadStats,
91) {
92 let mut consecutive_errors = 0;
93
94 loop {
95 if !running.load(Ordering::SeqCst) || force_quit.load(Ordering::SeqCst) {
96 debug!(
97 "Input loop stopping (running={}, force_quit={})",
98 running.load(Ordering::SeqCst),
99 force_quit.load(Ordering::SeqCst)
100 );
101 break;
102 }
103
104 match event::poll(Duration::from_millis(100)) {
106 Ok(true) => {
107 match event::read() {
109 Ok(evt) => {
110 consecutive_errors = 0; stats.total_events += 1;
112
113 if let Err(e) = crossterm_tx.send(evt) {
115 debug!("Failed to send event (receiver disconnected): {:?}", e);
116 break; }
118 }
119 Err(e) => {
120 consecutive_errors += 1;
121 stats.error_count += 1;
122
123 use std::io::ErrorKind;
125 match e.kind() {
126 ErrorKind::Interrupted => {
127 debug!("Input read interrupted, continuing...");
129 continue;
130 }
131 ErrorKind::WouldBlock => {
132 warn!("Unexpected WouldBlock after successful poll");
134 thread::sleep(Duration::from_millis(10));
135 }
136 _ => {
137 warn!("Error reading input: {}", e);
138
139 if consecutive_errors > 10 {
141 error!(
142 "Too many consecutive input errors, stopping input thread"
143 );
144 break;
145 }
146
147 thread::sleep(Duration::from_millis(50));
149 }
150 }
151 }
152 }
153 }
154 Ok(false) => {
155 consecutive_errors = 0;
157 }
158 Err(e) => {
159 consecutive_errors += 1;
160 stats.error_count += 1;
161
162 warn!("Error polling for events: {}", e);
163
164 if consecutive_errors > 10 {
165 error!("Too many consecutive polling errors, stopping input thread");
166 break;
167 }
168
169 thread::sleep(Duration::from_millis(100));
171 }
172 }
173 }
174}
175
176pub fn wrap_with_panic_recovery<F>(name: &str, mut f: F) -> thread::JoinHandle<()>
178where
179 F: FnMut() -> bool + Send + 'static,
180{
181 let thread_name = name.to_string();
182
183 thread::spawn(move || {
184 let mut attempt = 0;
185
186 loop {
187 attempt += 1;
188 info!("{}: Starting (attempt #{})", thread_name, attempt);
189
190 let result = panic::catch_unwind(AssertUnwindSafe(&mut f));
191
192 match result {
193 Ok(should_continue) => {
194 if !should_continue {
195 info!("{}: Ending normally", thread_name);
196 break;
197 }
198 }
199 Err(panic_info) => {
200 error!("{}: Panicked!", thread_name);
201 if let Some(s) = panic_info.downcast_ref::<&str>() {
202 error!(" Panic message: {}", s);
203 }
204
205 if attempt > 10 {
206 error!("{}: Too many panics, giving up", thread_name);
207 break;
208 }
209
210 thread::sleep(Duration::from_millis(100));
211 warn!("{}: Restarting after panic...", thread_name);
212 }
213 }
214 }
215 })
216}