seq_runtime/
diagnostics.rs1use crate::memory_stats::memory_registry;
25use crate::scheduler::{
26 ACTIVE_STRANDS, PEAK_STRANDS, TOTAL_COMPLETED, TOTAL_SPAWNED, strand_registry,
27};
28use std::sync::Once;
29use std::sync::atomic::Ordering;
30
/// One-time guard used by `install_signal_handler`: the installation closure
/// passed to `call_once` runs at most once, however many times the function
/// is called.
31static SIGNAL_HANDLER_INIT: Once = Once::new();
32
/// Maximum number of strands that `dump_diagnostics` lists individually; any
/// remainder is summarised in a single "... and N more strands" line.
33const STRAND_DISPLAY_LIMIT: usize = 20;
36
37pub fn install_signal_handler() {
48 SIGNAL_HANDLER_INIT.call_once(|| {
49 #[cfg(unix)]
50 {
51 use signal_hook::consts::SIGQUIT;
52 use signal_hook::iterator::Signals;
53
54 let mut signals = match Signals::new([SIGQUIT]) {
56 Ok(s) => s,
57 Err(_) => return, };
59
60 std::thread::Builder::new()
63 .name("seq-diagnostics".to_string())
64 .spawn(move || {
65 for sig in signals.forever() {
66 if sig == SIGQUIT {
67 dump_diagnostics();
68 }
69 }
70 })
71 .ok(); }
73
74 #[cfg(not(unix))]
75 {
76 }
79 });
80}
81
82pub fn dump_diagnostics() {
87 use std::io::Write;
88
89 let mut out = std::io::stderr().lock();
90
91 let _ = writeln!(out, "\n=== Seq Runtime Diagnostics ===");
92 let _ = writeln!(out, "Timestamp: {:?}", std::time::SystemTime::now());
93
94 let active = ACTIVE_STRANDS.load(Ordering::Relaxed);
96 let total_spawned = TOTAL_SPAWNED.load(Ordering::Relaxed);
97 let total_completed = TOTAL_COMPLETED.load(Ordering::Relaxed);
98 let peak = PEAK_STRANDS.load(Ordering::Relaxed);
99
100 let _ = writeln!(out, "\n[Strands]");
101 let _ = writeln!(out, " Active: {}", active);
102 let _ = writeln!(out, " Spawned: {} (total)", total_spawned);
103 let _ = writeln!(out, " Completed: {} (total)", total_completed);
104 let _ = writeln!(out, " Peak: {} (high-water mark)", peak);
105
106 let expected_completed = total_spawned.saturating_sub(active as u64);
109 if total_completed < expected_completed {
110 let lost = expected_completed - total_completed;
111 let _ = writeln!(
112 out,
113 " WARNING: {} strands may have been lost (panic/abort)",
114 lost
115 );
116 }
117
118 let registry = strand_registry();
120 let overflow = registry.overflow_count.load(Ordering::Relaxed);
121
122 let _ = writeln!(out, "\n[Active Strand Details]");
123 let _ = writeln!(out, " Registry capacity: {} slots", registry.capacity());
124 if overflow > 0 {
125 let _ = writeln!(
126 out,
127 " WARNING: {} strands exceeded registry capacity (not tracked)",
128 overflow
129 );
130 }
131
132 let now = std::time::SystemTime::now()
134 .duration_since(std::time::UNIX_EPOCH)
135 .map(|d| d.as_secs())
136 .unwrap_or(0);
137
138 let mut strands: Vec<_> = registry.active_strands().collect();
140 strands.sort_by_key(|(_, spawn_time)| *spawn_time);
141
142 if strands.is_empty() {
143 let _ = writeln!(out, " (no active strands in registry)");
144 } else {
145 let _ = writeln!(out, " {} strand(s) tracked:", strands.len());
146 for (idx, (strand_id, spawn_time)) in strands.iter().take(STRAND_DISPLAY_LIMIT).enumerate()
147 {
148 let duration = now.saturating_sub(*spawn_time);
149 let _ = writeln!(
150 out,
151 " [{:2}] Strand #{:<8} running for {}s",
152 idx + 1,
153 strand_id,
154 duration
155 );
156 }
157 if strands.len() > STRAND_DISPLAY_LIMIT {
158 let _ = writeln!(
159 out,
160 " ... and {} more strands",
161 strands.len() - STRAND_DISPLAY_LIMIT
162 );
163 }
164 }
165
166 let _ = writeln!(out, "\n[Memory]");
168 let mem_stats = memory_registry().aggregate_stats();
169 let _ = writeln!(out, " Tracked threads: {}", mem_stats.active_threads);
170 let _ = writeln!(
171 out,
172 " Arena bytes: {} (across all threads)",
173 format_bytes(mem_stats.total_arena_bytes)
174 );
175 let _ = writeln!(
176 out,
177 " Pool nodes: {} free / {} capacity",
178 mem_stats.total_pool_free, mem_stats.total_pool_capacity
179 );
180 let _ = writeln!(
181 out,
182 " Pool allocs: {} (lifetime total)",
183 mem_stats.total_pool_allocations
184 );
185 if mem_stats.overflow_count > 0 {
186 let _ = writeln!(
187 out,
188 " WARNING: {} threads exceeded registry capacity (memory not tracked)",
189 mem_stats.overflow_count
190 );
191 let _ = writeln!(
192 out,
193 " Consider increasing MAX_THREADS in memory_stats.rs (currently 64)"
194 );
195 }
196
197 let _ = writeln!(out, "\n[Channels]");
199 match get_channel_stats() {
200 Some(stats) => {
201 let _ = writeln!(out, " Open channels: {}", stats.len());
202 if !stats.is_empty() {
203 let _ = writeln!(out);
204 let _ = writeln!(
205 out,
206 " {:>6} {:>8} {:>8} {:>8}",
207 "ID", "Depth", "Sends", "Recvs"
208 );
209 let _ = writeln!(out, " {:->6} {:->8} {:->8} {:->8}", "", "", "", "");
210
211 let mut sorted_stats = stats;
213 sorted_stats.sort_by_key(|s| s.id);
214
215 for stat in sorted_stats {
216 let backpressure_warning = if stat.queue_depth > 100 {
217 " ⚠️"
218 } else {
219 ""
220 };
221 let _ = writeln!(
222 out,
223 " {:>6} {:>8} {:>8} {:>8}{}",
224 format!("#{}", stat.id),
225 stat.queue_depth,
226 stat.send_count,
227 stat.receive_count,
228 backpressure_warning
229 );
230 }
231 }
232 }
233 None => {
234 let _ = writeln!(out, " Open channels: (unavailable - registry locked)");
235 }
236 }
237
238 let _ = writeln!(out, "\n=== End Diagnostics ===\n");
239}
240
241#[allow(dead_code)]
244fn get_channel_count() -> Option<usize> {
245 use crate::channel::channel_count;
246 channel_count()
247}
248
249fn get_channel_stats() -> Option<Vec<crate::channel::ChannelStats>> {
252 use crate::channel::channel_stats;
253 channel_stats()
254}
255
/// Renders a byte count as a short human-readable string.
///
/// Uses 1024-based units. Values below 1024 are printed as a whole number of
/// bytes (`"512 B"`); larger values are printed with two decimal places in
/// the largest unit that fits, up to `GB` (`"1.50 KB"`, `"2.00 MB"`,
/// `"3.25 GB"`).
fn format_bytes(bytes: u64) -> String {
    const KB: u64 = 1024;
    const MB: u64 = 1024 * KB;
    const GB: u64 = 1024 * MB;

    // Express the count as a fractional multiple of the given unit.
    let in_units = |unit: u64| bytes as f64 / unit as f64;

    match bytes {
        b if b >= GB => format!("{:.2} GB", in_units(GB)),
        b if b >= MB => format!("{:.2} MB", in_units(MB)),
        b if b >= KB => format!("{:.2} KB", in_units(KB)),
        b => format!("{} B", b),
    }
}
268
#[cfg(test)]
mod tests {
    use super::*;

    /// Smoke test: producing a dump must complete without panicking.
    #[test]
    fn test_dump_diagnostics_runs() {
        dump_diagnostics();
    }

    /// Installing the handler several times in a row must be harmless.
    #[test]
    fn test_install_signal_handler_idempotent() {
        for _ in 0..3 {
            install_signal_handler();
        }
    }
}