1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
//! Resilient input handling with panic recovery
//!
//! This module provides a panic-safe input thread that can recover
//! from panics and continue processing terminal events.
use crossterm::event;
use log::{debug, error, info, warn};
use std::panic::{self, AssertUnwindSafe};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{mpsc, Arc};
use std::thread;
use std::time::Duration;
/// Counters describing the health of the resilient input thread.
///
/// A fresh, all-zero value is created via `Default` when the supervisor
/// thread starts, and the final value is returned through the
/// `JoinHandle` of `spawn_resilient_input_thread`.
#[derive(Debug, Clone, Default)]
pub struct InputThreadStats {
/// Total number of events successfully read by the input loop.
/// Incremented right after a successful read, before the event is
/// forwarded to the channel.
pub total_events: usize,
/// Number of panics caught by the supervisor around the input loop.
pub panic_count: usize,
/// Number of errors returned while polling for or reading events
/// (interrupted reads are counted here as well).
pub error_count: usize,
/// Restart counter maintained by the supervisor loop in
/// `spawn_resilient_input_thread`.
pub restart_count: usize,
}
/// Spawn a resilient input thread with automatic restart on panic
pub fn spawn_resilient_input_thread(
running: Arc<AtomicBool>,
force_quit: Arc<AtomicBool>,
crossterm_tx: mpsc::SyncSender<event::Event>,
) -> thread::JoinHandle<InputThreadStats> {
thread::spawn(move || {
let mut stats = InputThreadStats::default();
// Supervisor loop - restarts the input reader if it panics
while running.load(Ordering::SeqCst) && !force_quit.load(Ordering::SeqCst) {
info!(
"Starting input reader (attempt #{})",
stats.restart_count + 1
);
// Run the actual input reading in a panic-safe wrapper
let result = panic::catch_unwind(AssertUnwindSafe(|| {
run_input_loop(&running, &force_quit, &crossterm_tx, &mut stats)
}));
match result {
Ok(()) => {
debug!("Input loop ended normally");
break; // Normal exit
}
Err(panic_info) => {
stats.panic_count += 1;
stats.restart_count += 1;
// Log the panic
error!("Input thread panicked (panic #{}):", stats.panic_count);
if let Some(s) = panic_info.downcast_ref::<&str>() {
error!(" Panic message: {}", s);
} else if let Some(s) = panic_info.downcast_ref::<String>() {
error!(" Panic message: {}", s);
} else {
error!(" Unknown panic type");
}
// Check if we should give up
if stats.panic_count > 10 {
error!("Too many panics in input thread, giving up");
break;
}
// Brief pause before restart to avoid tight panic loops
thread::sleep(Duration::from_millis(100));
warn!("Restarting input thread after panic...");
}
}
}
info!("Input thread supervisor ending. Stats: {:?}", stats);
stats
})
}
/// The actual input reading loop with error handling
fn run_input_loop(
running: &Arc<AtomicBool>,
force_quit: &Arc<AtomicBool>,
crossterm_tx: &mpsc::SyncSender<event::Event>,
stats: &mut InputThreadStats,
) {
let mut consecutive_errors = 0;
loop {
if !running.load(Ordering::SeqCst) || force_quit.load(Ordering::SeqCst) {
debug!(
"Input loop stopping (running={}, force_quit={})",
running.load(Ordering::SeqCst),
force_quit.load(Ordering::SeqCst)
);
break;
}
// Poll for events with error handling
match event::poll(Duration::from_millis(100)) {
Ok(true) => {
// Event is available, try to read it
match event::read() {
Ok(evt) => {
consecutive_errors = 0; // Reset error counter on success
stats.total_events += 1;
// Try to send the event
if let Err(e) = crossterm_tx.send(evt) {
debug!("Failed to send event (receiver disconnected): {:?}", e);
break; // Channel closed, exit gracefully
}
}
Err(e) => {
consecutive_errors += 1;
stats.error_count += 1;
// Handle specific error types
use std::io::ErrorKind;
match e.kind() {
ErrorKind::Interrupted => {
// This is often harmless (e.g., from signals)
debug!("Input read interrupted, continuing...");
continue;
}
ErrorKind::WouldBlock => {
// This shouldn't happen after poll returned true
warn!("Unexpected WouldBlock after successful poll");
thread::sleep(Duration::from_millis(10));
}
_ => {
warn!("Error reading input: {}", e);
// If we get too many consecutive errors, bail out
if consecutive_errors > 10 {
error!(
"Too many consecutive input errors, stopping input thread"
);
break;
}
// Brief pause to avoid tight error loops
thread::sleep(Duration::from_millis(50));
}
}
}
}
}
Ok(false) => {
// No event available, this is normal
consecutive_errors = 0;
}
Err(e) => {
consecutive_errors += 1;
stats.error_count += 1;
warn!("Error polling for events: {}", e);
if consecutive_errors > 10 {
error!("Too many consecutive polling errors, stopping input thread");
break;
}
// Longer pause for polling errors
thread::sleep(Duration::from_millis(100));
}
}
}
}
/// Alternative: Create a simple resilient wrapper for existing input code
pub fn wrap_with_panic_recovery<F>(name: &str, mut f: F) -> thread::JoinHandle<()>
where
F: FnMut() -> bool + Send + 'static,
{
let thread_name = name.to_string();
thread::spawn(move || {
let mut attempt = 0;
loop {
attempt += 1;
info!("{}: Starting (attempt #{})", thread_name, attempt);
let result = panic::catch_unwind(AssertUnwindSafe(&mut f));
match result {
Ok(should_continue) => {
if !should_continue {
info!("{}: Ending normally", thread_name);
break;
}
}
Err(panic_info) => {
error!("{}: Panicked!", thread_name);
if let Some(s) = panic_info.downcast_ref::<&str>() {
error!(" Panic message: {}", s);
}
if attempt > 10 {
error!("{}: Too many panics, giving up", thread_name);
break;
}
thread::sleep(Duration::from_millis(100));
warn!("{}: Restarting after panic...", thread_name);
}
}
}
})
}