Skip to main content

prettyping_rs/
app.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::net::IpAddr;
3use std::time::Duration;
4
5use thiserror::Error;
6
7use crate::engine::{
8    EngineTime, PingEngine, PingEngineError, PingEvent, ProbeRequest, SequenceNumber,
9};
10
11#[derive(Debug, Clone, PartialEq, Eq)]
12pub struct AppConfig {
13    pub target: IpAddr,
14    pub interval: Duration,
15    pub timeout: Duration,
16    pub count: Option<u64>,
17    pub payload_size: usize,
18    pub ttl: Option<u8>,
19}
20
21#[derive(Debug, Clone, PartialEq, Eq)]
22pub enum AppEvent {
23    ProbeSent {
24        seq: SequenceNumber,
25        at: Duration,
26    },
27    ProbeReply {
28        seq: SequenceNumber,
29        sent_at: Duration,
30        received_at: Duration,
31        rtt_ms: u64,
32        duplicate: bool,
33        late: bool,
34    },
35    ProbeTimeout {
36        seq: SequenceNumber,
37        sent_at: Duration,
38        deadline: Duration,
39    },
40    Interrupted {
41        at: Duration,
42    },
43}
44
45#[derive(Debug, Clone, PartialEq, Eq, Default)]
46pub struct AppReport {
47    pub events: Vec<AppEvent>,
48    pub sent: u64,
49    pub replies: u64,
50    pub timeouts: u64,
51    pub duplicate_replies: u64,
52    pub late_replies: u64,
53    pub interrupted: bool,
54}
55
56#[derive(Debug, Error, Clone, PartialEq, Eq)]
57pub enum AppError {
58    #[error("interval must be greater than 0")]
59    InvalidInterval,
60    #[error("timeout must be greater than 0")]
61    InvalidTimeout,
62    #[error("count must be greater than 0 when provided")]
63    InvalidCount,
64    #[error("duration overflow while scheduling probes")]
65    ClockOverflow,
66    #[error("observer failed: {message}")]
67    Observer { message: String },
68    #[error(transparent)]
69    Engine(#[from] PingEngineError),
70}
71
72#[derive(Debug, Clone, PartialEq, Eq)]
73struct InFlight {
74    sent_at: Duration,
75    deadline: Duration,
76}
77
78#[derive(Debug, Clone, PartialEq, Eq)]
79struct SentMeta {
80    sent_at: Duration,
81}
82
83pub fn run<E>(engine: &mut E, config: &AppConfig) -> Result<AppReport, AppError>
84where
85    E: PingEngine,
86{
87    run_with_observer(engine, config, |_| Ok(()))
88}
89
90pub fn run_with_observer<E, F>(
91    engine: &mut E,
92    config: &AppConfig,
93    mut observer: F,
94) -> Result<AppReport, AppError>
95where
96    E: PingEngine,
97    F: FnMut(&AppEvent) -> Result<(), AppError>,
98{
99    run_with_observer_internal(engine, config, &mut observer, true)
100}
101
102/// Like [`run_with_observer`], but does not retain an unbounded in-memory event log.
103///
104/// This is intended for long-running interactive sessions (weeks) where we only need
105/// live rendering via the observer callback and bounded counters in the returned report.
106pub fn run_streaming_with_observer<E, F>(
107    engine: &mut E,
108    config: &AppConfig,
109    mut observer: F,
110) -> Result<AppReport, AppError>
111where
112    E: PingEngine,
113    F: FnMut(&AppEvent) -> Result<(), AppError>,
114{
115    run_with_observer_internal(engine, config, &mut observer, false)
116}
117
118fn run_with_observer_internal<E>(
119    engine: &mut E,
120    config: &AppConfig,
121    observer: &mut impl FnMut(&AppEvent) -> Result<(), AppError>,
122    record_events: bool,
123) -> Result<AppReport, AppError>
124where
125    E: PingEngine,
126{
127    validate_config(config)?;
128
129    let mut report = AppReport::default();
130    let mut sent_total: u64 = 0;
131    let mut next_seq: SequenceNumber = 1;
132    let mut next_send_at = engine.now();
133
134    let mut in_flight: BTreeMap<SequenceNumber, InFlight> = BTreeMap::new();
135    let mut sent_meta: BTreeMap<SequenceNumber, SentMeta> = BTreeMap::new();
136    let mut replied: BTreeSet<SequenceNumber> = BTreeSet::new();
137    let mut timed_out: BTreeSet<SequenceNumber> = BTreeSet::new();
138
139    schedule_due(
140        engine,
141        config,
142        observer,
143        &mut report,
144        record_events,
145        &mut sent_total,
146        &mut next_seq,
147        &mut next_send_at,
148        &mut in_flight,
149        &mut sent_meta,
150        &mut replied,
151        &mut timed_out,
152    )?;
153
154    loop {
155        if report.interrupted || is_finished(config.count, sent_total, &in_flight) {
156            return Ok(report);
157        }
158
159        let deadline = match next_deadline(config.count, sent_total, next_send_at, &in_flight) {
160            Some(deadline) => deadline,
161            None => return Ok(report),
162        };
163
164        let events = engine.poll_until(deadline)?;
165        for timed_event in events {
166            match timed_event.event {
167                PingEvent::Reply(reply) => {
168                    let seq = reply.seq;
169                    if let Some(inflight) = in_flight.remove(&seq) {
170                        let did_insert = replied.insert(seq);
171                        if did_insert {
172                            report.replies = report.replies.saturating_add(1);
173                            record_event(
174                                &mut report,
175                                AppEvent::ProbeReply {
176                                    seq,
177                                    sent_at: inflight.sent_at,
178                                    received_at: timed_event.at,
179                                    rtt_ms: duration_to_ms(
180                                        timed_event.at.saturating_sub(inflight.sent_at),
181                                    ),
182                                    duplicate: false,
183                                    late: false,
184                                },
185                                observer,
186                                record_events,
187                            )?;
188                        }
189                    } else if let Some(meta) = sent_meta.get(&seq) {
190                        let duplicate = replied.contains(&seq);
191                        let late = timed_out.contains(&seq);
192                        if duplicate {
193                            report.duplicate_replies = report.duplicate_replies.saturating_add(1);
194                        } else {
195                            let _ = replied.insert(seq);
196                            report.replies = report.replies.saturating_add(1);
197                        }
198                        if late {
199                            report.late_replies = report.late_replies.saturating_add(1);
200                        }
201
202                        record_event(
203                            &mut report,
204                            AppEvent::ProbeReply {
205                                seq,
206                                sent_at: meta.sent_at,
207                                received_at: timed_event.at,
208                                rtt_ms: duration_to_ms(timed_event.at.saturating_sub(meta.sent_at)),
209                                duplicate,
210                                late,
211                            },
212                            observer,
213                            record_events,
214                        )?;
215                    }
216                }
217                PingEvent::Interrupt => {
218                    report.interrupted = true;
219                    record_event(
220                        &mut report,
221                        AppEvent::Interrupted { at: timed_event.at },
222                        observer,
223                        record_events,
224                    )?;
225                    break;
226                }
227            }
228        }
229
230        if report.interrupted {
231            return Ok(report);
232        }
233
234        schedule_due(
235            engine,
236            config,
237            observer,
238            &mut report,
239            record_events,
240            &mut sent_total,
241            &mut next_seq,
242            &mut next_send_at,
243            &mut in_flight,
244            &mut sent_meta,
245            &mut replied,
246            &mut timed_out,
247        )?;
248    }
249}
250
251fn validate_config(config: &AppConfig) -> Result<(), AppError> {
252    if config.interval.is_zero() {
253        return Err(AppError::InvalidInterval);
254    }
255    if config.timeout.is_zero() {
256        return Err(AppError::InvalidTimeout);
257    }
258    if matches!(config.count, Some(0)) {
259        return Err(AppError::InvalidCount);
260    }
261    Ok(())
262}
263
264fn next_deadline(
265    count: Option<u64>,
266    sent_total: u64,
267    next_send_at: Duration,
268    in_flight: &BTreeMap<SequenceNumber, InFlight>,
269) -> Option<Duration> {
270    let send_deadline = if should_send_more(count, sent_total) {
271        Some(next_send_at)
272    } else {
273        None
274    };
275
276    let timeout_deadline = in_flight.values().map(|entry| entry.deadline).min();
277
278    match (send_deadline, timeout_deadline) {
279        (Some(a), Some(b)) => Some(a.min(b)),
280        (Some(a), None) => Some(a),
281        (None, Some(b)) => Some(b),
282        (None, None) => None,
283    }
284}
285
286#[allow(clippy::too_many_arguments)]
287fn schedule_due<E>(
288    engine: &mut E,
289    config: &AppConfig,
290    observer: &mut impl FnMut(&AppEvent) -> Result<(), AppError>,
291    report: &mut AppReport,
292    record_events: bool,
293    sent_total: &mut u64,
294    next_seq: &mut SequenceNumber,
295    next_send_at: &mut Duration,
296    in_flight: &mut BTreeMap<SequenceNumber, InFlight>,
297    sent_meta: &mut BTreeMap<SequenceNumber, SentMeta>,
298    replied: &mut BTreeSet<SequenceNumber>,
299    timed_out: &mut BTreeSet<SequenceNumber>,
300) -> Result<(), AppError>
301where
302    E: PingEngine,
303{
304    let now = engine.now();
305
306    // Avoid burst sending after long pauses (sleep, debugger stop, heavy scheduling delays).
307    // We intentionally do not attempt to "catch up" missed intervals.
308    if *next_send_at < now {
309        *next_send_at = now;
310    }
311
312    let expired: Vec<SequenceNumber> = in_flight
313        .iter()
314        .filter_map(|(seq, inflight)| (inflight.deadline <= now).then_some(*seq))
315        .collect();
316
317    for seq in expired {
318        if let Some(inflight) = in_flight.remove(&seq) {
319            let _ = timed_out.insert(seq);
320            report.timeouts = report.timeouts.saturating_add(1);
321            record_event(
322                report,
323                AppEvent::ProbeTimeout {
324                    seq,
325                    sent_at: inflight.sent_at,
326                    deadline: inflight.deadline,
327                },
328                observer,
329                record_events,
330            )?;
331        }
332    }
333
334    prune_tracking_state(*next_seq, sent_meta, replied, timed_out);
335
336    while should_send_more(config.count, *sent_total) && *next_send_at <= now {
337        let seq = *next_seq;
338        let sent_at: EngineTime = now;
339
340        let request = ProbeRequest {
341            seq,
342            target: config.target,
343            sent_at,
344            payload_size: config.payload_size,
345            ttl: config.ttl,
346        };
347
348        engine.send_probe(request)?;
349
350        let deadline = add_duration(sent_at, config.timeout)?;
351
352        in_flight.insert(seq, InFlight { sent_at, deadline });
353        sent_meta.insert(seq, SentMeta { sent_at });
354        report.sent = report.sent.saturating_add(1);
355        record_event(
356            report,
357            AppEvent::ProbeSent { seq, at: sent_at },
358            observer,
359            record_events,
360        )?;
361
362        *sent_total = sent_total.saturating_add(1);
363        *next_seq = next_seq.saturating_add(1);
364        *next_send_at = add_duration(*next_send_at, config.interval)?;
365    }
366
367    Ok(())
368}
369
370fn record_event(
371    report: &mut AppReport,
372    event: AppEvent,
373    observer: &mut impl FnMut(&AppEvent) -> Result<(), AppError>,
374    record_events: bool,
375) -> Result<(), AppError> {
376    observer(&event)?;
377    if record_events {
378        report.events.push(event);
379    }
380    Ok(())
381}
382
383fn should_send_more(count: Option<u64>, sent_total: u64) -> bool {
384    match count {
385        Some(limit) => sent_total < limit,
386        None => true,
387    }
388}
389
390fn is_finished(
391    count: Option<u64>,
392    sent_total: u64,
393    in_flight: &BTreeMap<SequenceNumber, InFlight>,
394) -> bool {
395    matches!(count, Some(limit) if sent_total >= limit) && in_flight.is_empty()
396}
397
398fn add_duration(base: Duration, delta: Duration) -> Result<Duration, AppError> {
399    base.checked_add(delta).ok_or(AppError::ClockOverflow)
400}
401
402fn duration_to_ms(duration: Duration) -> u64 {
403    u64::try_from(duration.as_millis()).unwrap_or(u64::MAX)
404}
405
406fn prune_tracking_state(
407    next_seq: SequenceNumber,
408    sent_meta: &mut BTreeMap<SequenceNumber, SentMeta>,
409    replied: &mut BTreeSet<SequenceNumber>,
410    timed_out: &mut BTreeSet<SequenceNumber>,
411) {
412    // Keep a bounded window so the default infinite ping mode does not grow memory.
413    // This window only needs to be large enough to classify late/duplicate replies.
414    const KEEP_WINDOW: SequenceNumber = 10_000;
415
416    let keep_from = next_seq.saturating_sub(KEEP_WINDOW);
417
418    let kept_sent_meta = sent_meta.split_off(&keep_from);
419    *sent_meta = kept_sent_meta;
420
421    let kept_replied = replied.split_off(&keep_from);
422    *replied = kept_replied;
423
424    let kept_timed_out = timed_out.split_off(&keep_from);
425    *timed_out = kept_timed_out;
426}