1use std::collections::{BTreeMap, BTreeSet};
2use std::net::IpAddr;
3use std::time::Duration;
4
5use thiserror::Error;
6
7use crate::engine::{
8 EngineTime, PingEngine, PingEngineError, PingEvent, ProbeRequest, SequenceNumber,
9};
10
/// Settings for a single ping run.
///
/// `interval`, `timeout` and `count` are checked by `validate_config` before
/// any probe is sent.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AppConfig {
    /// Address every probe is sent to.
    pub target: IpAddr,
    /// Spacing between consecutive probe send times; must be non-zero.
    pub interval: Duration,
    /// How long after sending a probe is considered timed out; must be non-zero.
    pub timeout: Duration,
    /// Maximum number of probes to send. `None` means no limit; `Some(0)` is rejected.
    pub count: Option<u64>,
    /// Payload size forwarded unchanged to each `ProbeRequest`.
    pub payload_size: usize,
    /// Optional TTL forwarded unchanged to each `ProbeRequest`.
    pub ttl: Option<u8>,
}
20
21#[derive(Debug, Clone, PartialEq, Eq)]
22pub enum AppEvent {
23 ProbeSent {
24 seq: SequenceNumber,
25 at: Duration,
26 },
27 ProbeReply {
28 seq: SequenceNumber,
29 sent_at: Duration,
30 received_at: Duration,
31 rtt_ms: u64,
32 duplicate: bool,
33 late: bool,
34 },
35 ProbeTimeout {
36 seq: SequenceNumber,
37 sent_at: Duration,
38 deadline: Duration,
39 },
40 Interrupted {
41 at: Duration,
42 },
43}
44
45#[derive(Debug, Clone, PartialEq, Eq, Default)]
46pub struct AppReport {
47 pub events: Vec<AppEvent>,
48 pub sent: u64,
49 pub replies: u64,
50 pub timeouts: u64,
51 pub duplicate_replies: u64,
52 pub late_replies: u64,
53 pub interrupted: bool,
54}
55
56#[derive(Debug, Error, Clone, PartialEq, Eq)]
57pub enum AppError {
58 #[error("interval must be greater than 0")]
59 InvalidInterval,
60 #[error("timeout must be greater than 0")]
61 InvalidTimeout,
62 #[error("count must be greater than 0 when provided")]
63 InvalidCount,
64 #[error("duration overflow while scheduling probes")]
65 ClockOverflow,
66 #[error("observer failed: {message}")]
67 Observer { message: String },
68 #[error(transparent)]
69 Engine(#[from] PingEngineError),
70}
71
/// Bookkeeping for a probe that has been sent and is still awaiting a reply or timeout.
#[derive(Debug, Clone, PartialEq, Eq)]
struct InFlight {
    /// Engine time at which the probe was sent.
    sent_at: Duration,
    /// Engine time at or after which the probe counts as timed out.
    deadline: Duration,
}
77
/// Send time remembered for a probe after it has left the in-flight set, so
/// that late or duplicate replies can still be matched and timed.
#[derive(Debug, Clone, PartialEq, Eq)]
struct SentMeta {
    /// Engine time at which the probe was sent.
    sent_at: Duration,
}
82
83pub fn run<E>(engine: &mut E, config: &AppConfig) -> Result<AppReport, AppError>
84where
85 E: PingEngine,
86{
87 run_with_observer(engine, config, |_| Ok(()))
88}
89
90pub fn run_with_observer<E, F>(
91 engine: &mut E,
92 config: &AppConfig,
93 mut observer: F,
94) -> Result<AppReport, AppError>
95where
96 E: PingEngine,
97 F: FnMut(&AppEvent) -> Result<(), AppError>,
98{
99 run_with_observer_internal(engine, config, &mut observer, true)
100}
101
102pub fn run_streaming_with_observer<E, F>(
107 engine: &mut E,
108 config: &AppConfig,
109 mut observer: F,
110) -> Result<AppReport, AppError>
111where
112 E: PingEngine,
113 F: FnMut(&AppEvent) -> Result<(), AppError>,
114{
115 run_with_observer_internal(engine, config, &mut observer, false)
116}
117
118fn run_with_observer_internal<E>(
119 engine: &mut E,
120 config: &AppConfig,
121 observer: &mut impl FnMut(&AppEvent) -> Result<(), AppError>,
122 record_events: bool,
123) -> Result<AppReport, AppError>
124where
125 E: PingEngine,
126{
127 validate_config(config)?;
128
129 let mut report = AppReport::default();
130 let mut sent_total: u64 = 0;
131 let mut next_seq: SequenceNumber = 1;
132 let mut next_send_at = engine.now();
133
134 let mut in_flight: BTreeMap<SequenceNumber, InFlight> = BTreeMap::new();
135 let mut sent_meta: BTreeMap<SequenceNumber, SentMeta> = BTreeMap::new();
136 let mut replied: BTreeSet<SequenceNumber> = BTreeSet::new();
137 let mut timed_out: BTreeSet<SequenceNumber> = BTreeSet::new();
138
139 schedule_due(
140 engine,
141 config,
142 observer,
143 &mut report,
144 record_events,
145 &mut sent_total,
146 &mut next_seq,
147 &mut next_send_at,
148 &mut in_flight,
149 &mut sent_meta,
150 &mut replied,
151 &mut timed_out,
152 )?;
153
154 loop {
155 if report.interrupted || is_finished(config.count, sent_total, &in_flight) {
156 return Ok(report);
157 }
158
159 let deadline = match next_deadline(config.count, sent_total, next_send_at, &in_flight) {
160 Some(deadline) => deadline,
161 None => return Ok(report),
162 };
163
164 let events = engine.poll_until(deadline)?;
165 for timed_event in events {
166 match timed_event.event {
167 PingEvent::Reply(reply) => {
168 let seq = reply.seq;
169 if let Some(inflight) = in_flight.remove(&seq) {
170 let did_insert = replied.insert(seq);
171 if did_insert {
172 report.replies = report.replies.saturating_add(1);
173 record_event(
174 &mut report,
175 AppEvent::ProbeReply {
176 seq,
177 sent_at: inflight.sent_at,
178 received_at: timed_event.at,
179 rtt_ms: duration_to_ms(
180 timed_event.at.saturating_sub(inflight.sent_at),
181 ),
182 duplicate: false,
183 late: false,
184 },
185 observer,
186 record_events,
187 )?;
188 }
189 } else if let Some(meta) = sent_meta.get(&seq) {
190 let duplicate = replied.contains(&seq);
191 let late = timed_out.contains(&seq);
192 if duplicate {
193 report.duplicate_replies = report.duplicate_replies.saturating_add(1);
194 } else {
195 let _ = replied.insert(seq);
196 report.replies = report.replies.saturating_add(1);
197 }
198 if late {
199 report.late_replies = report.late_replies.saturating_add(1);
200 }
201
202 record_event(
203 &mut report,
204 AppEvent::ProbeReply {
205 seq,
206 sent_at: meta.sent_at,
207 received_at: timed_event.at,
208 rtt_ms: duration_to_ms(timed_event.at.saturating_sub(meta.sent_at)),
209 duplicate,
210 late,
211 },
212 observer,
213 record_events,
214 )?;
215 }
216 }
217 PingEvent::Interrupt => {
218 report.interrupted = true;
219 record_event(
220 &mut report,
221 AppEvent::Interrupted { at: timed_event.at },
222 observer,
223 record_events,
224 )?;
225 break;
226 }
227 }
228 }
229
230 if report.interrupted {
231 return Ok(report);
232 }
233
234 schedule_due(
235 engine,
236 config,
237 observer,
238 &mut report,
239 record_events,
240 &mut sent_total,
241 &mut next_seq,
242 &mut next_send_at,
243 &mut in_flight,
244 &mut sent_meta,
245 &mut replied,
246 &mut timed_out,
247 )?;
248 }
249}
250
251fn validate_config(config: &AppConfig) -> Result<(), AppError> {
252 if config.interval.is_zero() {
253 return Err(AppError::InvalidInterval);
254 }
255 if config.timeout.is_zero() {
256 return Err(AppError::InvalidTimeout);
257 }
258 if matches!(config.count, Some(0)) {
259 return Err(AppError::InvalidCount);
260 }
261 Ok(())
262}
263
264fn next_deadline(
265 count: Option<u64>,
266 sent_total: u64,
267 next_send_at: Duration,
268 in_flight: &BTreeMap<SequenceNumber, InFlight>,
269) -> Option<Duration> {
270 let send_deadline = if should_send_more(count, sent_total) {
271 Some(next_send_at)
272 } else {
273 None
274 };
275
276 let timeout_deadline = in_flight.values().map(|entry| entry.deadline).min();
277
278 match (send_deadline, timeout_deadline) {
279 (Some(a), Some(b)) => Some(a.min(b)),
280 (Some(a), None) => Some(a),
281 (None, Some(b)) => Some(b),
282 (None, None) => None,
283 }
284}
285
286#[allow(clippy::too_many_arguments)]
287fn schedule_due<E>(
288 engine: &mut E,
289 config: &AppConfig,
290 observer: &mut impl FnMut(&AppEvent) -> Result<(), AppError>,
291 report: &mut AppReport,
292 record_events: bool,
293 sent_total: &mut u64,
294 next_seq: &mut SequenceNumber,
295 next_send_at: &mut Duration,
296 in_flight: &mut BTreeMap<SequenceNumber, InFlight>,
297 sent_meta: &mut BTreeMap<SequenceNumber, SentMeta>,
298 replied: &mut BTreeSet<SequenceNumber>,
299 timed_out: &mut BTreeSet<SequenceNumber>,
300) -> Result<(), AppError>
301where
302 E: PingEngine,
303{
304 let now = engine.now();
305
306 if *next_send_at < now {
309 *next_send_at = now;
310 }
311
312 let expired: Vec<SequenceNumber> = in_flight
313 .iter()
314 .filter_map(|(seq, inflight)| (inflight.deadline <= now).then_some(*seq))
315 .collect();
316
317 for seq in expired {
318 if let Some(inflight) = in_flight.remove(&seq) {
319 let _ = timed_out.insert(seq);
320 report.timeouts = report.timeouts.saturating_add(1);
321 record_event(
322 report,
323 AppEvent::ProbeTimeout {
324 seq,
325 sent_at: inflight.sent_at,
326 deadline: inflight.deadline,
327 },
328 observer,
329 record_events,
330 )?;
331 }
332 }
333
334 prune_tracking_state(*next_seq, sent_meta, replied, timed_out);
335
336 while should_send_more(config.count, *sent_total) && *next_send_at <= now {
337 let seq = *next_seq;
338 let sent_at: EngineTime = now;
339
340 let request = ProbeRequest {
341 seq,
342 target: config.target,
343 sent_at,
344 payload_size: config.payload_size,
345 ttl: config.ttl,
346 };
347
348 engine.send_probe(request)?;
349
350 let deadline = add_duration(sent_at, config.timeout)?;
351
352 in_flight.insert(seq, InFlight { sent_at, deadline });
353 sent_meta.insert(seq, SentMeta { sent_at });
354 report.sent = report.sent.saturating_add(1);
355 record_event(
356 report,
357 AppEvent::ProbeSent { seq, at: sent_at },
358 observer,
359 record_events,
360 )?;
361
362 *sent_total = sent_total.saturating_add(1);
363 *next_seq = next_seq.saturating_add(1);
364 *next_send_at = add_duration(*next_send_at, config.interval)?;
365 }
366
367 Ok(())
368}
369
370fn record_event(
371 report: &mut AppReport,
372 event: AppEvent,
373 observer: &mut impl FnMut(&AppEvent) -> Result<(), AppError>,
374 record_events: bool,
375) -> Result<(), AppError> {
376 observer(&event)?;
377 if record_events {
378 report.events.push(event);
379 }
380 Ok(())
381}
382
/// Reports whether another probe should be sent.
///
/// Always `true` when `count` is `None`; otherwise `true` only while
/// `sent_total` is below the limit.
fn should_send_more(count: Option<u64>, sent_total: u64) -> bool {
    count.map_or(true, |limit| sent_total < limit)
}
389
390fn is_finished(
391 count: Option<u64>,
392 sent_total: u64,
393 in_flight: &BTreeMap<SequenceNumber, InFlight>,
394) -> bool {
395 matches!(count, Some(limit) if sent_total >= limit) && in_flight.is_empty()
396}
397
398fn add_duration(base: Duration, delta: Duration) -> Result<Duration, AppError> {
399 base.checked_add(delta).ok_or(AppError::ClockOverflow)
400}
401
/// Converts `duration` to whole milliseconds (fractions truncated), clamping
/// to `u64::MAX` when the value does not fit in a `u64`.
fn duration_to_ms(duration: Duration) -> u64 {
    let millis: u128 = duration.as_millis();
    match u64::try_from(millis) {
        Ok(fits) => fits,
        Err(_) => u64::MAX,
    }
}
405
406fn prune_tracking_state(
407 next_seq: SequenceNumber,
408 sent_meta: &mut BTreeMap<SequenceNumber, SentMeta>,
409 replied: &mut BTreeSet<SequenceNumber>,
410 timed_out: &mut BTreeSet<SequenceNumber>,
411) {
412 const KEEP_WINDOW: SequenceNumber = 10_000;
415
416 let keep_from = next_seq.saturating_sub(KEEP_WINDOW);
417
418 let kept_sent_meta = sent_meta.split_off(&keep_from);
419 *sent_meta = kept_sent_meta;
420
421 let kept_replied = replied.split_off(&keep_from);
422 *replied = kept_replied;
423
424 let kept_timed_out = timed_out.split_off(&keep_from);
425 *timed_out = kept_timed_out;
426}