Skip to main content

prettyping_rs/
runtime.rs

1use std::io::{self, IsTerminal, Write};
2use std::num::NonZeroU16;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::time::Duration;
6
7use thiserror::Error;
8
9use crate::app::{self, AppConfig, AppError, AppEvent, AppReport};
10use crate::config::Config;
11use crate::engine::{EngineTime, PingEngine, PingEngineError, PingEvent, TimedEvent};
12use crate::render::RenderConfig;
13use crate::render::plain::PlainRenderer;
14use crate::render::terminal::TerminalRenderer;
15
/// Longest stretch the runtime lets the wrapped engine block in a single inner poll.
///
/// `run_with_runtime` passes this to `InterruptingEngine::new` as `poll_slice`, which caps
/// each inner `poll_until` call so the interrupt flag is re-checked between slices.
const INTERRUPT_POLL_SLICE: Duration = Duration::from_millis(50);
17
/// Failures that `run_with_runtime` can report to its caller.
#[derive(Debug, Error)]
pub enum RuntimeError {
    /// Installing the platform signal hooks failed; `message` is the underlying error text.
    #[error("failed to install signal handlers: {message}")]
    SignalInstall { message: String },
    /// The renderer's final flush failed; `message` is the I/O error text.
    #[error("failed to flush output: {message}")]
    Output { message: String },
    /// An error from the application run loop, displayed exactly as the inner error.
    ///
    /// Write failures while rendering individual events also arrive here, because the
    /// observer callback reports them as `AppError::Observer`.
    #[error(transparent)]
    App(#[from] AppError),
}
27
28pub fn run_with_runtime<E>(
29    engine: &mut E,
30    app_config: &AppConfig,
31    config: &Config,
32) -> Result<AppReport, RuntimeError>
33where
34    E: PingEngine,
35{
36    let signals = RuntimeSignals::install()?;
37    let stdout_is_terminal = io::stdout().is_terminal();
38
39    #[cfg(target_os = "windows")]
40    {
41        let use_terminal = config.terminal.unwrap_or(stdout_is_terminal);
42        // Best-effort enabling of ANSI/VT escape processing for classic Windows console hosts.
43        // If this fails (e.g., redirected output), we continue and the user may see raw escapes.
44        if use_terminal {
45            let _ = crate::windows_console::enable_virtual_terminal_processing();
46        }
47    }
48
49    let mut driver = RenderDriver::new(config, stdout_is_terminal);
50
51    let stdout = io::stdout();
52    let mut writer = stdout.lock();
53
54    let mut interrupting_engine =
55        InterruptingEngine::new(engine, signals.interrupt_flag(), INTERRUPT_POLL_SLICE);
56
57    // Streaming mode: do not retain an unbounded event log in memory.
58    let run_result =
59        app::run_streaming_with_observer(&mut interrupting_engine, app_config, |event| {
60            let resize_requested = signals.take_resize_requested();
61            driver
62                .observe_event(event, resize_requested, &mut writer)
63                .map_err(|err| AppError::Observer {
64                    message: err.to_string(),
65                })
66        });
67
68    let finish_result = driver.finish(&mut writer);
69
70    let report = run_result.map_err(RuntimeError::from)?;
71
72    if let Err(err) = finish_result {
73        return Err(RuntimeError::Output {
74            message: err.to_string(),
75        });
76    }
77
78    Ok(report)
79}
80
/// Engine wrapper that lets an externally owned flag cut polling short.
///
/// Both the wrapped engine and the flag are borrowed, so neither is owned here.
pub struct InterruptingEngine<'a, E> {
    /// Engine that actually sends probes and produces events.
    inner: &'a mut E,
    /// Flag this wrapper only reads; `true` means an interrupt has been requested.
    interrupt_requested: &'a AtomicBool,
    /// Upper bound on how far past the engine's current time a single inner poll may wait.
    poll_slice: Duration,
}
86
87impl<'a, E> InterruptingEngine<'a, E>
88where
89    E: PingEngine,
90{
91    #[must_use]
92    pub fn new(
93        inner: &'a mut E,
94        interrupt_requested: &'a AtomicBool,
95        poll_slice: Duration,
96    ) -> Self {
97        Self {
98            inner,
99            interrupt_requested,
100            poll_slice,
101        }
102    }
103
104    fn interrupt_is_requested(&self) -> bool {
105        self.interrupt_requested.load(Ordering::SeqCst)
106    }
107
108    fn interrupt_event(&self, at: EngineTime) -> TimedEvent {
109        TimedEvent {
110            at,
111            event: PingEvent::Interrupt,
112        }
113    }
114}
115
116impl<E> PingEngine for InterruptingEngine<'_, E>
117where
118    E: PingEngine,
119{
120    fn now(&self) -> EngineTime {
121        self.inner.now()
122    }
123
124    fn send_probe(&mut self, request: crate::engine::ProbeRequest) -> Result<(), PingEngineError> {
125        self.inner.send_probe(request)
126    }
127
128    fn poll_until(&mut self, deadline: EngineTime) -> Result<Vec<TimedEvent>, PingEngineError> {
129        if deadline < self.inner.now() {
130            return Err(PingEngineError::NonMonotonicPoll);
131        }
132
133        loop {
134            let now = self.inner.now();
135
136            if self.interrupt_is_requested() {
137                let immediate_events = self.inner.poll_until(now)?;
138                if !immediate_events.is_empty() {
139                    return Ok(immediate_events);
140                }
141
142                return Ok(vec![self.interrupt_event(now)]);
143            }
144
145            if now >= deadline {
146                return Ok(Vec::new());
147            }
148
149            let sliced_deadline = now
150                .checked_add(self.poll_slice)
151                .map_or(deadline, |at| at.min(deadline));
152
153            let events = self.inner.poll_until(sliced_deadline)?;
154            if !events.is_empty() {
155                return Ok(events);
156            }
157
158            if self.interrupt_is_requested() {
159                return Ok(vec![self.interrupt_event(self.inner.now())]);
160            }
161
162            if self.inner.now() >= deadline {
163                return Ok(Vec::new());
164            }
165        }
166    }
167}
168
/// Owns the active renderer and forwards whatever it produces to a writer.
#[derive(Debug)]
pub struct RenderDriver {
    /// Plain or terminal renderer, chosen when the driver is constructed.
    renderer: Renderer,
    /// `true` when the render configuration already carried a column count at construction;
    /// resize updates then leave the width untouched.
    terminal_columns_locked: bool,
    /// `true` when the render configuration already carried a line count at construction;
    /// resize updates then leave the height untouched.
    terminal_lines_locked: bool,
}
175
176impl RenderDriver {
177    #[must_use]
178    pub fn new(config: &Config, stdout_is_terminal: bool) -> Self {
179        let use_terminal = config.terminal.unwrap_or(stdout_is_terminal);
180        let mut render_config = RenderConfig::from(config);
181        let terminal_columns_locked = render_config.columns.is_some();
182        let terminal_lines_locked = render_config.lines.is_some();
183
184        if use_terminal {
185            if let Some((columns, lines)) = terminal_dimensions() {
186                if render_config.columns.is_none() {
187                    render_config.columns = Some(columns);
188                }
189                if render_config.lines.is_none() {
190                    render_config.lines = Some(lines);
191                }
192            }
193
194            return Self {
195                renderer: Renderer::Terminal(TerminalRenderer::new(render_config)),
196                terminal_columns_locked,
197                terminal_lines_locked,
198            };
199        }
200
201        Self {
202            renderer: Renderer::Plain(PlainRenderer::new(render_config)),
203            terminal_columns_locked,
204            terminal_lines_locked,
205        }
206    }
207
208    pub fn observe_event(
209        &mut self,
210        event: &AppEvent,
211        resize_requested: bool,
212        writer: &mut impl Write,
213    ) -> io::Result<()> {
214        if resize_requested {
215            self.update_terminal_size();
216        }
217
218        self.renderer.render_event(event);
219        self.flush_incremental_output(writer)
220    }
221
222    pub fn finish(&mut self, writer: &mut impl Write) -> io::Result<()> {
223        self.renderer.finish();
224        self.flush_incremental_output(writer)?;
225
226        if self.renderer.is_terminal() {
227            writer.write_all(b"\x1b[0m\n")?;
228        }
229
230        writer.flush()
231    }
232
233    fn update_terminal_size(&mut self) {
234        self.update_terminal_size_with(terminal_dimensions());
235    }
236
237    fn update_terminal_size_with(&mut self, dimensions: Option<(u16, u16)>) {
238        let Some((columns, lines)) = dimensions else {
239            return;
240        };
241
242        let effective_columns = if self.terminal_columns_locked {
243            None
244        } else {
245            NonZeroU16::new(columns).map(NonZeroU16::get)
246        };
247
248        let effective_lines = if self.terminal_lines_locked {
249            None
250        } else {
251            NonZeroU16::new(lines).map(NonZeroU16::get)
252        };
253
254        self.renderer
255            .update_size(effective_columns, effective_lines);
256    }
257
258    fn flush_incremental_output(&mut self, writer: &mut impl Write) -> io::Result<()> {
259        let output = self.renderer.output_mut();
260        if !output.is_empty() {
261            writer.write_all(output.as_bytes())?;
262            output.clear();
263        }
264
265        writer.flush()
266    }
267
268    #[cfg(test)]
269    fn flush_incremental_output_for_test(&mut self, writer: &mut impl Write) -> io::Result<()> {
270        self.flush_incremental_output(writer)
271    }
272}
273
/// The two output back ends the driver can delegate to.
#[derive(Debug)]
enum Renderer {
    /// Back end used when terminal rendering is off; size updates are ignored for it.
    Plain(PlainRenderer),
    /// Back end used when terminal rendering is on; the only variant that receives size updates.
    Terminal(TerminalRenderer),
}
279
280impl Renderer {
281    fn render_event(&mut self, event: &AppEvent) {
282        match self {
283            Self::Plain(renderer) => renderer.render_event(event),
284            Self::Terminal(renderer) => renderer.render_event(event),
285        }
286    }
287
288    fn finish(&mut self) {
289        match self {
290            Self::Plain(renderer) => renderer.finish(),
291            Self::Terminal(renderer) => renderer.finish(),
292        }
293    }
294
295    fn output_mut(&mut self) -> &mut String {
296        match self {
297            Self::Plain(renderer) => renderer.output_mut(),
298            Self::Terminal(renderer) => renderer.output_mut(),
299        }
300    }
301
302    fn update_size(&mut self, columns: Option<u16>, lines: Option<u16>) {
303        if let Self::Terminal(renderer) = self {
304            renderer.update_size(columns, lines);
305        }
306    }
307
308    fn is_terminal(&self) -> bool {
309        matches!(self, Self::Terminal(_))
310    }
311}
312
313fn terminal_dimensions() -> Option<(u16, u16)> {
314    let (terminal_size::Width(columns), terminal_size::Height(lines)) =
315        terminal_size::terminal_size()?;
316    Some((columns, lines))
317}
318
/// Signal-driven flags for one runtime session, together with the hooks that feed them.
pub struct RuntimeSignals {
    /// Shared with the installed hooks, which set it when an interrupt is requested.
    interrupt_requested: Arc<AtomicBool>,
    /// Shared with the installed hooks; read and reset through `take_resize_requested`.
    resize_requested: Arc<AtomicBool>,
    /// Never read; kept so the platform hook state is dropped together with this value.
    _hooks: SignalHooks,
}
324
325impl RuntimeSignals {
326    fn install() -> Result<Self, RuntimeError> {
327        let interrupt_requested = Arc::new(AtomicBool::new(false));
328        let resize_requested = Arc::new(AtomicBool::new(false));
329
330        let hooks = SignalHooks::install(
331            Arc::clone(&interrupt_requested),
332            Arc::clone(&resize_requested),
333        )
334        .map_err(|err| RuntimeError::SignalInstall {
335            message: err.to_string(),
336        })?;
337
338        Ok(Self {
339            interrupt_requested,
340            resize_requested,
341            _hooks: hooks,
342        })
343    }
344
345    fn interrupt_flag(&self) -> &AtomicBool {
346        &self.interrupt_requested
347    }
348
349    fn take_resize_requested(&self) -> bool {
350        self.resize_requested.swap(false, Ordering::SeqCst)
351    }
352}
353
354#[cfg(any(target_os = "linux", target_os = "macos"))]
355struct SignalHooks {
356    ids: Vec<signal_hook::SigId>,
357}
358
359#[cfg(any(target_os = "linux", target_os = "macos"))]
360impl SignalHooks {
361    fn install(interrupt: Arc<AtomicBool>, resize: Arc<AtomicBool>) -> io::Result<Self> {
362        let ids = vec![
363            signal_hook::flag::register(signal_hook::consts::SIGINT, Arc::clone(&interrupt))?,
364            signal_hook::flag::register(signal_hook::consts::SIGTERM, interrupt)?,
365            signal_hook::flag::register(signal_hook::consts::SIGWINCH, resize)?,
366        ];
367
368        Ok(Self { ids })
369    }
370}
371
372#[cfg(any(target_os = "linux", target_os = "macos"))]
373impl Drop for SignalHooks {
374    fn drop(&mut self) {
375        for id in self.ids.drain(..) {
376            signal_hook::low_level::unregister(id);
377        }
378    }
379}
380
/// Windows hook marker; the handler itself is owned by the `ctrlc` crate.
#[cfg(target_os = "windows")]
struct SignalHooks;

#[cfg(target_os = "windows")]
impl SignalHooks {
    /// Installs a Ctrl-C handler that sets `interrupt`; the resize flag is not used here.
    ///
    /// # Errors
    ///
    /// Returns an `io::Error` carrying the `ctrlc` error text when the handler cannot be set.
    fn install(interrupt: Arc<AtomicBool>, _resize: Arc<AtomicBool>) -> io::Result<Self> {
        let raise_interrupt = move || interrupt.store(true, Ordering::SeqCst);

        match ctrlc::set_handler(raise_interrupt) {
            Ok(()) => Ok(Self),
            Err(err) => Err(io::Error::other(err.to_string())),
        }
    }
}
395
/// Placeholder hooks for targets other than Linux, macOS and Windows.
#[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "windows")))]
struct SignalHooks;

#[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "windows")))]
impl SignalHooks {
    /// Installs nothing and always succeeds; both flag handles are dropped unused, so this
    /// value never sets either flag.
    fn install(_interrupt: Arc<AtomicBool>, _resize: Arc<AtomicBool>) -> io::Result<Self> {
        Ok(Self)
    }
}
405
406#[cfg(test)]
407mod tests {
408    use std::io;
409    use std::net::{IpAddr, Ipv4Addr};
410    use std::sync::atomic::AtomicBool;
411    use std::time::Duration;
412
413    use crate::app::{AppConfig, AppEvent, run_with_observer};
414    use crate::engine::mock::MockEngine;
415    use crate::engine::{PingEngine, PingEvent, PingReply, TimedEvent};
416
417    use super::{InterruptingEngine, RenderDriver};
418    use crate::config::{AddressFamily, Config};
419
    /// Shorthand for a `Duration` of `value` milliseconds.
    fn ms(value: u64) -> Duration {
        Duration::from_millis(value)
    }
423
424    fn base_app_config(count: Option<u64>) -> AppConfig {
425        AppConfig {
426            target: IpAddr::V4(Ipv4Addr::LOCALHOST),
427            interval: ms(1_000),
428            timeout: ms(2_000),
429            count,
430            payload_size: 56,
431            ttl: None,
432        }
433    }
434
    /// Rendering configuration shared by the tests.
    ///
    /// Terminal mode is explicitly off and the size is pinned to 80x24; individual tests
    /// override the fields they care about.
    fn base_render_config() -> Config {
        Config {
            host: "127.0.0.1".to_string(),
            // Colour and unicode are off; legend and both stats options are on.
            color: false,
            multicolor: false,
            unicode: false,
            legend: true,
            globalstats: true,
            recentstats: true,
            // Explicit `Some(false)` takes precedence over terminal auto-detection.
            terminal: Some(false),
            last: 10,
            // Explicit dimensions, which the driver treats as locked against resizes.
            columns: Some(80),
            lines: Some(24),
            rttmin: None,
            rttmax: None,
            family: AddressFamily::Any,
            count: Some(10),
            interval_secs: Some(1.0),
            timeout_secs: Some(1.0),
            packet_size: Some(56),
            ttl: Some(64),
        }
    }
458
459    #[test]
460    fn interrupting_engine_stops_run_promptly_when_interrupt_flag_is_raised() {
461        let interrupt_requested = AtomicBool::new(false);
462        let mut engine = MockEngine::with_now(Duration::ZERO);
463
464        engine.queue_event(TimedEvent {
465            at: ms(200),
466            event: PingEvent::Reply(PingReply::for_seq(1)),
467        });
468
469        let mut interrupting_engine =
470            InterruptingEngine::new(&mut engine, &interrupt_requested, ms(50));
471
472        let mut driver = RenderDriver::new(&base_render_config(), false);
473        let mut output = Vec::new();
474
475        let report = run_with_observer(&mut interrupting_engine, &base_app_config(None), |event| {
476            driver
477                .observe_event(event, false, &mut output)
478                .map_err(|err| crate::app::AppError::Observer {
479                    message: err.to_string(),
480                })?;
481
482            if matches!(event, AppEvent::ProbeReply { .. }) {
483                interrupt_requested.store(true, std::sync::atomic::Ordering::SeqCst);
484            }
485
486            Ok(())
487        })
488        .expect("runtime should exit on synthetic interrupt");
489
490        driver
491            .finish(&mut output)
492            .expect("finish should flush output");
493
494        assert!(report.interrupted);
495        assert!(
496            report
497                .events
498                .iter()
499                .any(|event| matches!(event, AppEvent::Interrupted { .. }))
500        );
501
502        let sent_sequences: Vec<u64> = engine
503            .sent_requests()
504            .iter()
505            .map(|request| request.seq)
506            .collect();
507        assert_eq!(sent_sequences, vec![1]);
508    }
509
510    #[test]
511    fn terminal_finish_always_appends_reset_and_newline() {
512        let mut config = base_render_config();
513        config.terminal = Some(true);
514
515        let mut driver = RenderDriver::new(&config, true);
516        let mut output = Vec::new();
517
518        driver
519            .observe_event(
520                &AppEvent::ProbeReply {
521                    seq: 1,
522                    sent_at: Duration::ZERO,
523                    received_at: ms(10),
524                    rtt_ms: 10,
525                    duplicate: false,
526                    late: false,
527                },
528                false,
529                &mut output,
530            )
531            .expect("event rendering should succeed");
532
533        driver.finish(&mut output).expect("finish should succeed");
534
535        let text = String::from_utf8(output).expect("output should be utf8");
536        assert!(text.ends_with("\x1b[0m\n"));
537        assert!(
538            text.ends_with("\n\n\n\x1b[0m\n"),
539            "terminal finish should move below reserved stats lines before final reset"
540        );
541    }
542
543    #[test]
544    fn interrupting_engine_defers_interrupt_when_ready_events_exist() {
545        let interrupt_requested = AtomicBool::new(true);
546        let mut engine = MockEngine::with_now(Duration::ZERO);
547        engine.queue_event(TimedEvent {
548            at: Duration::ZERO,
549            event: PingEvent::Reply(PingReply::for_seq(7)),
550        });
551
552        let mut interrupting_engine =
553            InterruptingEngine::new(&mut engine, &interrupt_requested, ms(50));
554
555        let events = interrupting_engine
556            .poll_until(ms(100))
557            .expect("poll should return immediate reply");
558
559        assert_eq!(events.len(), 1);
560        assert!(matches!(
561            events[0].event,
562            PingEvent::Reply(PingReply { seq: 7, .. })
563        ));
564    }
565
566    #[test]
567    fn render_driver_handles_resize_hint_without_terminal_dimensions() {
568        let mut config = base_render_config();
569        config.terminal = Some(true);
570
571        let mut driver = RenderDriver::new(&config, true);
572        let mut output = io::sink();
573
574        driver
575            .observe_event(
576                &AppEvent::ProbeTimeout {
577                    seq: 1,
578                    sent_at: Duration::ZERO,
579                    deadline: ms(900),
580                },
581                true,
582                &mut output,
583            )
584            .expect("resize hint should not fail");
585    }
586
587    #[test]
588    fn render_driver_clears_renderer_buffer_after_flush() {
589        let config = base_render_config();
590        let mut driver = RenderDriver::new(&config, false);
591        let mut output = Vec::new();
592
593        driver
594            .observe_event(
595                &AppEvent::ProbeTimeout {
596                    seq: 1,
597                    sent_at: Duration::ZERO,
598                    deadline: ms(900),
599                },
600                false,
601                &mut output,
602            )
603            .expect("render should succeed");
604
605        // If the buffer is not cleared, a second flush would write the same bytes again.
606        let first_len = output.len();
607        driver
608            .flush_incremental_output_for_test(&mut output)
609            .expect("flush should succeed");
610        assert_eq!(output.len(), first_len);
611    }
612
613    #[test]
614    fn render_driver_resize_does_not_override_manual_columns_setting() {
615        let mut config = base_render_config();
616        config.terminal = Some(true);
617        config.columns = Some(80);
618        config.lines = None;
619
620        let mut driver = RenderDriver::new(&config, true);
621
622        // Render a few events to produce output at the configured width.
623        let mut output = Vec::new();
624        for seq in 1..=90 {
625            driver
626                .observe_event(
627                    &AppEvent::ProbeTimeout {
628                        seq,
629                        sent_at: Duration::ZERO,
630                        deadline: ms(900),
631                    },
632                    false,
633                    &mut output,
634                )
635                .expect("render should succeed");
636        }
637
638        // Resize should NOT change columns when user set --columns.
639        driver.update_terminal_size_with(Some((120, 40)));
640
641        // After resizing, rendering should still wrap at around 80 columns, not 120.
642        let before = String::from_utf8_lossy(&output).to_string();
643        let before_max_line = before.lines().map(|l| l.len()).max().unwrap_or(0);
644
645        output.clear();
646        for seq in 91..=180 {
647            driver
648                .observe_event(
649                    &AppEvent::ProbeTimeout {
650                        seq,
651                        sent_at: Duration::ZERO,
652                        deadline: ms(900),
653                    },
654                    false,
655                    &mut output,
656                )
657                .expect("render should succeed");
658        }
659
660        let after = String::from_utf8_lossy(&output).to_string();
661        let after_max_line = after.lines().map(|l| l.len()).max().unwrap_or(0);
662
663        // Allow some slack for terminal control sequences and legend/stats lines.
664        assert!(before_max_line <= 200);
665        assert!(after_max_line <= 200);
666        assert!(after_max_line < 300);
667
668        // The key property: it should not become substantially wider after resize.
669        assert!(after_max_line <= before_max_line.saturating_add(40));
670    }
671}