1use std::io::{self, IsTerminal, Write};
2use std::num::NonZeroU16;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::time::Duration;
6
7use thiserror::Error;
8
9use crate::app::{self, AppConfig, AppError, AppEvent, AppReport};
10use crate::config::Config;
11use crate::engine::{EngineTime, PingEngine, PingEngineError, PingEvent, TimedEvent};
12use crate::render::RenderConfig;
13use crate::render::plain::PlainRenderer;
14use crate::render::terminal::TerminalRenderer;
15
16const INTERRUPT_POLL_SLICE: Duration = Duration::from_millis(50);
17
18#[derive(Debug, Error)]
19pub enum RuntimeError {
20 #[error("failed to install signal handlers: {message}")]
21 SignalInstall { message: String },
22 #[error("failed to flush output: {message}")]
23 Output { message: String },
24 #[error(transparent)]
25 App(#[from] AppError),
26}
27
28pub fn run_with_runtime<E>(
29 engine: &mut E,
30 app_config: &AppConfig,
31 config: &Config,
32) -> Result<AppReport, RuntimeError>
33where
34 E: PingEngine,
35{
36 let signals = RuntimeSignals::install()?;
37 let stdout_is_terminal = io::stdout().is_terminal();
38
39 #[cfg(target_os = "windows")]
40 {
41 let use_terminal = config.terminal.unwrap_or(stdout_is_terminal);
42 if use_terminal {
45 let _ = crate::windows_console::enable_virtual_terminal_processing();
46 }
47 }
48
49 let mut driver = RenderDriver::new(config, stdout_is_terminal);
50
51 let stdout = io::stdout();
52 let mut writer = stdout.lock();
53
54 let mut interrupting_engine =
55 InterruptingEngine::new(engine, signals.interrupt_flag(), INTERRUPT_POLL_SLICE);
56
57 let run_result =
59 app::run_streaming_with_observer(&mut interrupting_engine, app_config, |event| {
60 let resize_requested = signals.take_resize_requested();
61 driver
62 .observe_event(event, resize_requested, &mut writer)
63 .map_err(|err| AppError::Observer {
64 message: err.to_string(),
65 })
66 });
67
68 let finish_result = driver.finish(&mut writer);
69
70 let report = run_result.map_err(RuntimeError::from)?;
71
72 if let Err(err) = finish_result {
73 return Err(RuntimeError::Output {
74 message: err.to_string(),
75 });
76 }
77
78 Ok(report)
79}
80
81pub struct InterruptingEngine<'a, E> {
82 inner: &'a mut E,
83 interrupt_requested: &'a AtomicBool,
84 poll_slice: Duration,
85}
86
87impl<'a, E> InterruptingEngine<'a, E>
88where
89 E: PingEngine,
90{
91 #[must_use]
92 pub fn new(
93 inner: &'a mut E,
94 interrupt_requested: &'a AtomicBool,
95 poll_slice: Duration,
96 ) -> Self {
97 Self {
98 inner,
99 interrupt_requested,
100 poll_slice,
101 }
102 }
103
104 fn interrupt_is_requested(&self) -> bool {
105 self.interrupt_requested.load(Ordering::SeqCst)
106 }
107
108 fn interrupt_event(&self, at: EngineTime) -> TimedEvent {
109 TimedEvent {
110 at,
111 event: PingEvent::Interrupt,
112 }
113 }
114}
115
116impl<E> PingEngine for InterruptingEngine<'_, E>
117where
118 E: PingEngine,
119{
120 fn now(&self) -> EngineTime {
121 self.inner.now()
122 }
123
124 fn send_probe(&mut self, request: crate::engine::ProbeRequest) -> Result<(), PingEngineError> {
125 self.inner.send_probe(request)
126 }
127
128 fn poll_until(&mut self, deadline: EngineTime) -> Result<Vec<TimedEvent>, PingEngineError> {
129 if deadline < self.inner.now() {
130 return Err(PingEngineError::NonMonotonicPoll);
131 }
132
133 loop {
134 let now = self.inner.now();
135
136 if self.interrupt_is_requested() {
137 let immediate_events = self.inner.poll_until(now)?;
138 if !immediate_events.is_empty() {
139 return Ok(immediate_events);
140 }
141
142 return Ok(vec![self.interrupt_event(now)]);
143 }
144
145 if now >= deadline {
146 return Ok(Vec::new());
147 }
148
149 let sliced_deadline = now
150 .checked_add(self.poll_slice)
151 .map_or(deadline, |at| at.min(deadline));
152
153 let events = self.inner.poll_until(sliced_deadline)?;
154 if !events.is_empty() {
155 return Ok(events);
156 }
157
158 if self.interrupt_is_requested() {
159 return Ok(vec![self.interrupt_event(self.inner.now())]);
160 }
161
162 if self.inner.now() >= deadline {
163 return Ok(Vec::new());
164 }
165 }
166 }
167}
168
169#[derive(Debug)]
170pub struct RenderDriver {
171 renderer: Renderer,
172 terminal_columns_locked: bool,
173 terminal_lines_locked: bool,
174}
175
176impl RenderDriver {
177 #[must_use]
178 pub fn new(config: &Config, stdout_is_terminal: bool) -> Self {
179 let use_terminal = config.terminal.unwrap_or(stdout_is_terminal);
180 let mut render_config = RenderConfig::from(config);
181 let terminal_columns_locked = render_config.columns.is_some();
182 let terminal_lines_locked = render_config.lines.is_some();
183
184 if use_terminal {
185 if let Some((columns, lines)) = terminal_dimensions() {
186 if render_config.columns.is_none() {
187 render_config.columns = Some(columns);
188 }
189 if render_config.lines.is_none() {
190 render_config.lines = Some(lines);
191 }
192 }
193
194 return Self {
195 renderer: Renderer::Terminal(TerminalRenderer::new(render_config)),
196 terminal_columns_locked,
197 terminal_lines_locked,
198 };
199 }
200
201 Self {
202 renderer: Renderer::Plain(PlainRenderer::new(render_config)),
203 terminal_columns_locked,
204 terminal_lines_locked,
205 }
206 }
207
208 pub fn observe_event(
209 &mut self,
210 event: &AppEvent,
211 resize_requested: bool,
212 writer: &mut impl Write,
213 ) -> io::Result<()> {
214 if resize_requested {
215 self.update_terminal_size();
216 }
217
218 self.renderer.render_event(event);
219 self.flush_incremental_output(writer)
220 }
221
222 pub fn finish(&mut self, writer: &mut impl Write) -> io::Result<()> {
223 self.renderer.finish();
224 self.flush_incremental_output(writer)?;
225
226 if self.renderer.is_terminal() {
227 writer.write_all(b"\x1b[0m\n")?;
228 }
229
230 writer.flush()
231 }
232
233 fn update_terminal_size(&mut self) {
234 self.update_terminal_size_with(terminal_dimensions());
235 }
236
237 fn update_terminal_size_with(&mut self, dimensions: Option<(u16, u16)>) {
238 let Some((columns, lines)) = dimensions else {
239 return;
240 };
241
242 let effective_columns = if self.terminal_columns_locked {
243 None
244 } else {
245 NonZeroU16::new(columns).map(NonZeroU16::get)
246 };
247
248 let effective_lines = if self.terminal_lines_locked {
249 None
250 } else {
251 NonZeroU16::new(lines).map(NonZeroU16::get)
252 };
253
254 self.renderer
255 .update_size(effective_columns, effective_lines);
256 }
257
258 fn flush_incremental_output(&mut self, writer: &mut impl Write) -> io::Result<()> {
259 let output = self.renderer.output_mut();
260 if !output.is_empty() {
261 writer.write_all(output.as_bytes())?;
262 output.clear();
263 }
264
265 writer.flush()
266 }
267
268 #[cfg(test)]
269 fn flush_incremental_output_for_test(&mut self, writer: &mut impl Write) -> io::Result<()> {
270 self.flush_incremental_output(writer)
271 }
272}
273
274#[derive(Debug)]
275enum Renderer {
276 Plain(PlainRenderer),
277 Terminal(TerminalRenderer),
278}
279
280impl Renderer {
281 fn render_event(&mut self, event: &AppEvent) {
282 match self {
283 Self::Plain(renderer) => renderer.render_event(event),
284 Self::Terminal(renderer) => renderer.render_event(event),
285 }
286 }
287
288 fn finish(&mut self) {
289 match self {
290 Self::Plain(renderer) => renderer.finish(),
291 Self::Terminal(renderer) => renderer.finish(),
292 }
293 }
294
295 fn output_mut(&mut self) -> &mut String {
296 match self {
297 Self::Plain(renderer) => renderer.output_mut(),
298 Self::Terminal(renderer) => renderer.output_mut(),
299 }
300 }
301
302 fn update_size(&mut self, columns: Option<u16>, lines: Option<u16>) {
303 if let Self::Terminal(renderer) = self {
304 renderer.update_size(columns, lines);
305 }
306 }
307
308 fn is_terminal(&self) -> bool {
309 matches!(self, Self::Terminal(_))
310 }
311}
312
313fn terminal_dimensions() -> Option<(u16, u16)> {
314 let (terminal_size::Width(columns), terminal_size::Height(lines)) =
315 terminal_size::terminal_size()?;
316 Some((columns, lines))
317}
318
319pub struct RuntimeSignals {
320 interrupt_requested: Arc<AtomicBool>,
321 resize_requested: Arc<AtomicBool>,
322 _hooks: SignalHooks,
323}
324
325impl RuntimeSignals {
326 fn install() -> Result<Self, RuntimeError> {
327 let interrupt_requested = Arc::new(AtomicBool::new(false));
328 let resize_requested = Arc::new(AtomicBool::new(false));
329
330 let hooks = SignalHooks::install(
331 Arc::clone(&interrupt_requested),
332 Arc::clone(&resize_requested),
333 )
334 .map_err(|err| RuntimeError::SignalInstall {
335 message: err.to_string(),
336 })?;
337
338 Ok(Self {
339 interrupt_requested,
340 resize_requested,
341 _hooks: hooks,
342 })
343 }
344
345 fn interrupt_flag(&self) -> &AtomicBool {
346 &self.interrupt_requested
347 }
348
349 fn take_resize_requested(&self) -> bool {
350 self.resize_requested.swap(false, Ordering::SeqCst)
351 }
352}
353
354#[cfg(any(target_os = "linux", target_os = "macos"))]
355struct SignalHooks {
356 ids: Vec<signal_hook::SigId>,
357}
358
359#[cfg(any(target_os = "linux", target_os = "macos"))]
360impl SignalHooks {
361 fn install(interrupt: Arc<AtomicBool>, resize: Arc<AtomicBool>) -> io::Result<Self> {
362 let ids = vec![
363 signal_hook::flag::register(signal_hook::consts::SIGINT, Arc::clone(&interrupt))?,
364 signal_hook::flag::register(signal_hook::consts::SIGTERM, interrupt)?,
365 signal_hook::flag::register(signal_hook::consts::SIGWINCH, resize)?,
366 ];
367
368 Ok(Self { ids })
369 }
370}
371
372#[cfg(any(target_os = "linux", target_os = "macos"))]
373impl Drop for SignalHooks {
374 fn drop(&mut self) {
375 for id in self.ids.drain(..) {
376 signal_hook::low_level::unregister(id);
377 }
378 }
379}
380
381#[cfg(target_os = "windows")]
382struct SignalHooks;
383
384#[cfg(target_os = "windows")]
385impl SignalHooks {
386 fn install(interrupt: Arc<AtomicBool>, _resize: Arc<AtomicBool>) -> io::Result<Self> {
387 ctrlc::set_handler(move || {
388 interrupt.store(true, Ordering::SeqCst);
389 })
390 .map_err(|err| io::Error::other(err.to_string()))?;
391
392 Ok(Self)
393 }
394}
395
396#[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "windows")))]
397struct SignalHooks;
398
399#[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "windows")))]
400impl SignalHooks {
401 fn install(_interrupt: Arc<AtomicBool>, _resize: Arc<AtomicBool>) -> io::Result<Self> {
402 Ok(Self)
403 }
404}
405
406#[cfg(test)]
407mod tests {
408 use std::io;
409 use std::net::{IpAddr, Ipv4Addr};
410 use std::sync::atomic::AtomicBool;
411 use std::time::Duration;
412
413 use crate::app::{AppConfig, AppEvent, run_with_observer};
414 use crate::engine::mock::MockEngine;
415 use crate::engine::{PingEngine, PingEvent, PingReply, TimedEvent};
416
417 use super::{InterruptingEngine, RenderDriver};
418 use crate::config::{AddressFamily, Config};
419
420 fn ms(value: u64) -> Duration {
421 Duration::from_millis(value)
422 }
423
424 fn base_app_config(count: Option<u64>) -> AppConfig {
425 AppConfig {
426 target: IpAddr::V4(Ipv4Addr::LOCALHOST),
427 interval: ms(1_000),
428 timeout: ms(2_000),
429 count,
430 payload_size: 56,
431 ttl: None,
432 }
433 }
434
435 fn base_render_config() -> Config {
436 Config {
437 host: "127.0.0.1".to_string(),
438 color: false,
439 multicolor: false,
440 unicode: false,
441 legend: true,
442 globalstats: true,
443 recentstats: true,
444 terminal: Some(false),
445 last: 10,
446 columns: Some(80),
447 lines: Some(24),
448 rttmin: None,
449 rttmax: None,
450 family: AddressFamily::Any,
451 count: Some(10),
452 interval_secs: Some(1.0),
453 timeout_secs: Some(1.0),
454 packet_size: Some(56),
455 ttl: Some(64),
456 }
457 }
458
459 #[test]
460 fn interrupting_engine_stops_run_promptly_when_interrupt_flag_is_raised() {
461 let interrupt_requested = AtomicBool::new(false);
462 let mut engine = MockEngine::with_now(Duration::ZERO);
463
464 engine.queue_event(TimedEvent {
465 at: ms(200),
466 event: PingEvent::Reply(PingReply::for_seq(1)),
467 });
468
469 let mut interrupting_engine =
470 InterruptingEngine::new(&mut engine, &interrupt_requested, ms(50));
471
472 let mut driver = RenderDriver::new(&base_render_config(), false);
473 let mut output = Vec::new();
474
475 let report = run_with_observer(&mut interrupting_engine, &base_app_config(None), |event| {
476 driver
477 .observe_event(event, false, &mut output)
478 .map_err(|err| crate::app::AppError::Observer {
479 message: err.to_string(),
480 })?;
481
482 if matches!(event, AppEvent::ProbeReply { .. }) {
483 interrupt_requested.store(true, std::sync::atomic::Ordering::SeqCst);
484 }
485
486 Ok(())
487 })
488 .expect("runtime should exit on synthetic interrupt");
489
490 driver
491 .finish(&mut output)
492 .expect("finish should flush output");
493
494 assert!(report.interrupted);
495 assert!(
496 report
497 .events
498 .iter()
499 .any(|event| matches!(event, AppEvent::Interrupted { .. }))
500 );
501
502 let sent_sequences: Vec<u64> = engine
503 .sent_requests()
504 .iter()
505 .map(|request| request.seq)
506 .collect();
507 assert_eq!(sent_sequences, vec![1]);
508 }
509
510 #[test]
511 fn terminal_finish_always_appends_reset_and_newline() {
512 let mut config = base_render_config();
513 config.terminal = Some(true);
514
515 let mut driver = RenderDriver::new(&config, true);
516 let mut output = Vec::new();
517
518 driver
519 .observe_event(
520 &AppEvent::ProbeReply {
521 seq: 1,
522 sent_at: Duration::ZERO,
523 received_at: ms(10),
524 rtt_ms: 10,
525 duplicate: false,
526 late: false,
527 },
528 false,
529 &mut output,
530 )
531 .expect("event rendering should succeed");
532
533 driver.finish(&mut output).expect("finish should succeed");
534
535 let text = String::from_utf8(output).expect("output should be utf8");
536 assert!(text.ends_with("\x1b[0m\n"));
537 assert!(
538 text.ends_with("\n\n\n\x1b[0m\n"),
539 "terminal finish should move below reserved stats lines before final reset"
540 );
541 }
542
543 #[test]
544 fn interrupting_engine_defers_interrupt_when_ready_events_exist() {
545 let interrupt_requested = AtomicBool::new(true);
546 let mut engine = MockEngine::with_now(Duration::ZERO);
547 engine.queue_event(TimedEvent {
548 at: Duration::ZERO,
549 event: PingEvent::Reply(PingReply::for_seq(7)),
550 });
551
552 let mut interrupting_engine =
553 InterruptingEngine::new(&mut engine, &interrupt_requested, ms(50));
554
555 let events = interrupting_engine
556 .poll_until(ms(100))
557 .expect("poll should return immediate reply");
558
559 assert_eq!(events.len(), 1);
560 assert!(matches!(
561 events[0].event,
562 PingEvent::Reply(PingReply { seq: 7, .. })
563 ));
564 }
565
566 #[test]
567 fn render_driver_handles_resize_hint_without_terminal_dimensions() {
568 let mut config = base_render_config();
569 config.terminal = Some(true);
570
571 let mut driver = RenderDriver::new(&config, true);
572 let mut output = io::sink();
573
574 driver
575 .observe_event(
576 &AppEvent::ProbeTimeout {
577 seq: 1,
578 sent_at: Duration::ZERO,
579 deadline: ms(900),
580 },
581 true,
582 &mut output,
583 )
584 .expect("resize hint should not fail");
585 }
586
587 #[test]
588 fn render_driver_clears_renderer_buffer_after_flush() {
589 let config = base_render_config();
590 let mut driver = RenderDriver::new(&config, false);
591 let mut output = Vec::new();
592
593 driver
594 .observe_event(
595 &AppEvent::ProbeTimeout {
596 seq: 1,
597 sent_at: Duration::ZERO,
598 deadline: ms(900),
599 },
600 false,
601 &mut output,
602 )
603 .expect("render should succeed");
604
605 let first_len = output.len();
607 driver
608 .flush_incremental_output_for_test(&mut output)
609 .expect("flush should succeed");
610 assert_eq!(output.len(), first_len);
611 }
612
613 #[test]
614 fn render_driver_resize_does_not_override_manual_columns_setting() {
615 let mut config = base_render_config();
616 config.terminal = Some(true);
617 config.columns = Some(80);
618 config.lines = None;
619
620 let mut driver = RenderDriver::new(&config, true);
621
622 let mut output = Vec::new();
624 for seq in 1..=90 {
625 driver
626 .observe_event(
627 &AppEvent::ProbeTimeout {
628 seq,
629 sent_at: Duration::ZERO,
630 deadline: ms(900),
631 },
632 false,
633 &mut output,
634 )
635 .expect("render should succeed");
636 }
637
638 driver.update_terminal_size_with(Some((120, 40)));
640
641 let before = String::from_utf8_lossy(&output).to_string();
643 let before_max_line = before.lines().map(|l| l.len()).max().unwrap_or(0);
644
645 output.clear();
646 for seq in 91..=180 {
647 driver
648 .observe_event(
649 &AppEvent::ProbeTimeout {
650 seq,
651 sent_at: Duration::ZERO,
652 deadline: ms(900),
653 },
654 false,
655 &mut output,
656 )
657 .expect("render should succeed");
658 }
659
660 let after = String::from_utf8_lossy(&output).to_string();
661 let after_max_line = after.lines().map(|l| l.len()).max().unwrap_or(0);
662
663 assert!(before_max_line <= 200);
665 assert!(after_max_line <= 200);
666 assert!(after_max_line < 300);
667
668 assert!(after_max_line <= before_max_line.saturating_add(40));
670 }
671}