massping 0.6.0

ICMP ping library for sending batches of ICMP echo request packets and measuring the roundtrip time
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
#[cfg(feature = "stream")]
use std::pin::Pin;
use std::{
    collections::HashMap,
    future::poll_fn,
    io,
    iter::Peekable,
    net::{Ipv4Addr, Ipv6Addr},
    sync::{
        Arc,
        atomic::{AtomicU64, Ordering},
    },
    task::{Context, Poll},
    time::Duration,
};

use bytes::{Bytes, BytesMut};
#[cfg(feature = "stream")]
use futures_core::Stream;
use tokio::{
    sync::mpsc::{self, error::TryRecvError},
    time::Instant,
};

use crate::{IpVersion, packet::EchoRequestPacket, raw_pinger::RawPinger};

/// A pinger for IPv4 addresses
pub type V4Pinger = Pinger<Ipv4Addr>;
/// A pinger for IPv6 addresses
pub type V6Pinger = Pinger<Ipv6Addr>;

/// A pinger for [`IpVersion`] (either [`Ipv4Addr`] or [`Ipv6Addr`]).
///
/// Cloning is cheap: clones share the same socket and background
/// receive task, which shut down when the last clone is dropped.
pub struct Pinger<V: IpVersion> {
    inner: Arc<InnerPinger<V>>,
    // Kept out of `InnerPinger` (which the background receive task holds)
    // so that dropping the last `Pinger` clone disconnects the channel,
    // telling the background task to shut down and release the socket.
    round_sender: mpsc::UnboundedSender<RoundMessage<V>>,
}

impl<V: IpVersion> Clone for Pinger<V> {
    fn clone(&self) -> Self {
        Self {
            inner: Arc::clone(&self.inner),
            round_sender: self.round_sender.clone(),
        }
    }
}

struct InnerPinger<V: IpVersion> {
    raw: RawPinger<V>,
    next_round_id: AtomicU64,
}

// Each `measure_many` round gets a unique `u64` id; the wire sequence
// number is its lower 16 bits. The full id lets the receive task tell
// rounds apart after the sequence number wraps around.
enum RoundMessage<V: IpVersion> {
    Subscribe {
        round_id: u64,
        expected_payload: Bytes,
        sender: mpsc::UnboundedSender<(V, Instant)>,
    },
    Unsubscribe {
        round_id: u64,
    },
}

struct Subscriber<V: IpVersion> {
    round_id: u64,
    expected_payload: Bytes,
    sender: mpsc::UnboundedSender<(V, Instant)>,
}

enum PollResult<V: IpVersion> {
    Subscription(RoundMessage<V>),
    Packet(crate::packet::EchoReplyPacket<V>),
}

impl<V: IpVersion> Pinger<V> {
    /// Construct a new `Pinger`.
    ///
    /// For maximum efficiency the same instance of `Pinger` should
    /// be used for as long as possible, although it might also
    /// be beneficial to `Drop` the `Pinger` and recreate it if
    /// you are not going to be sending pings for a long period of time.
    ///
    /// # Panics
    ///
    /// Panics if called from outside a tokio runtime, as it spawns a
    /// background receive task.
    pub fn new() -> io::Result<Self> {
        let raw = RawPinger::new()?;

        let (sender, mut receiver) = mpsc::unbounded_channel();

        let inner = Arc::new(InnerPinger {
            raw,
            next_round_id: AtomicU64::new(0),
        });

        // Spawn async receive task using the same socket.
        // It runs until `receiver` disconnects, which happens when the
        // `Pinger` holding the only sender is dropped.
        let inner_recv = Arc::clone(&inner);
        tokio::spawn(async move {
            let mut subscribers: HashMap<u16, Subscriber<V>> = HashMap::new();
            // Buffer kept outside poll_fn so it persists across polls.
            let mut recv_buf = BytesMut::new();

            loop {
                // Poll both subscription channel and socket in the same waker context.
                // This ensures we wake on either event, which is required for
                // single-threaded runtimes where we can't rely on concurrent execution.
                //
                // Note: We use try_recv() before poll_recv() as a fast path optimization.
                // Benchmarks show this is ~2x faster when messages are already queued
                // (~15ns vs ~25ns per iteration).
                let result = poll_fn(|cx| {
                    // Fast path: check for subscription changes (non-blocking, no waker)
                    match receiver.try_recv() {
                        Ok(msg) => return Poll::Ready(Some(PollResult::Subscription(msg))),
                        Err(TryRecvError::Empty) => {
                            // Continue - poll_recv() below will register the waker for this channel
                        }
                        Err(TryRecvError::Disconnected) => return Poll::Ready(None),
                    }

                    // Try to receive an ICMP packet
                    match inner_recv.raw.poll_recv(&mut recv_buf, cx) {
                        Poll::Ready(Ok(packet)) => {
                            return Poll::Ready(Some(PollResult::Packet(packet)));
                        }
                        Poll::Ready(Err(_)) => {
                            // Receiving failed (typically a transient kernel
                            // resource error). The socket readiness was
                            // consumed without registering a waker, so ask to
                            // be polled again right away; parking here would
                            // suspend reply processing until an unrelated
                            // subscription message wakes the task.
                            cx.waker().wake_by_ref();
                        }
                        Poll::Pending => {}
                    }

                    // Register waker for subscription channel
                    // We need to wake up when new subscriptions arrive
                    match receiver.poll_recv(cx) {
                        Poll::Ready(Some(msg)) => {
                            return Poll::Ready(Some(PollResult::Subscription(msg)));
                        }
                        Poll::Ready(None) => return Poll::Ready(None),
                        Poll::Pending => {}
                    }

                    Poll::Pending
                })
                .await;

                match result {
                    Some(PollResult::Subscription(RoundMessage::Subscribe {
                        round_id,
                        expected_payload,
                        sender,
                    })) => {
                        // A new round may displace a still-subscribed round
                        // whose sequence number collided after wraparound;
                        // the displaced round could not be served anyway as
                        // replies can only be told apart by sequence number.
                        subscribers.insert(
                            round_id as u16,
                            Subscriber {
                                round_id,
                                expected_payload,
                                sender,
                            },
                        );
                    }
                    Some(PollResult::Subscription(RoundMessage::Unsubscribe { round_id })) => {
                        let sequence_number = round_id as u16;
                        // Only unsubscribe if the slot still belongs to this
                        // round: after sequence number wraparound it may have
                        // been taken over by a newer round, which must keep
                        // receiving replies.
                        if subscribers
                            .get(&sequence_number)
                            .is_some_and(|subscriber| subscriber.round_id == round_id)
                        {
                            subscribers.remove(&sequence_number);
                        }
                    }
                    Some(PollResult::Packet(packet)) => {
                        let recv_instant = Instant::now();

                        let packet_source = packet.source();
                        let packet_sequence_number = packet.sequence_number();

                        if let Some(subscriber) = subscribers.get(&packet_sequence_number) {
                            // An echo reply mirrors the request's payload, so
                            // a mismatch means the reply wasn't produced by
                            // this round (e.g. a reply to an older round whose
                            // sequence number collided after wraparound, or
                            // blindly spoofed cross-traffic). Discard it.
                            let payload_matches =
                                packet.payload() == &subscriber.expected_payload[..];

                            if payload_matches
                                && subscriber
                                    .sender
                                    .send((packet_source, recv_instant))
                                    .is_err()
                            {
                                subscribers.remove(&packet_sequence_number);
                            }
                        }
                    }
                    None => return, // Channel closed
                }
            }
        });

        Ok(Self {
            inner,
            round_sender: sender,
        })
    }

    /// Ping `addresses`
    ///
    /// Creates [`MeasureManyStream`] which **lazily** sends ping
    /// requests and [`Stream`]s the responses as they arrive.
    ///
    /// Replies are matched by source address, so an address that appears
    /// multiple times is only pinged once per round and yields a single
    /// measurement.
    ///
    /// # Panics
    ///
    /// Panics if the background receive task has terminated, which only
    /// happens when the runtime the `Pinger` was created on has been
    /// shut down.
    ///
    /// [`Stream`]: futures_core::Stream
    pub fn measure_many<I>(&self, addresses: I) -> MeasureManyStream<'_, V, I>
    where
        I: Iterator<Item = V>,
    {
        let (size_hint, _) = addresses.size_hint();
        let send_queue = addresses.into_iter().peekable();
        let (sender, receiver) = mpsc::unbounded_channel();

        // Relaxed is enough: the counter is a pure id allocator, no other
        // memory is synchronized through it.
        let round_id = self.inner.next_round_id.fetch_add(1, Ordering::Relaxed);

        // The same packet is reused for every address of the round. Its
        // random payload lets the receive task discard replies that don't
        // belong to this round.
        //
        // The identifier is irrelevant: the kernel overwrites it with the
        // socket's own identifier, which it also uses to route echo replies
        // back to this socket.
        let payload = rand::random::<[u8; 64]>();
        let packet = EchoRequestPacket::new(0, round_id as u16, &payload);

        if self
            .round_sender
            .send(RoundMessage::Subscribe {
                round_id,
                expected_payload: packet.payload(),
                sender,
            })
            .is_err()
        {
            panic!("Receiver closed");
        }

        MeasureManyStream {
            pinger: self,
            packet,
            send_queue,
            in_flight: HashMap::with_capacity(size_hint),
            receiver,
            round_id,
        }
    }
}

/// A [`Stream`] of ping responses.
///
/// No kind of `rtt` timeout is implemented, so an external mechanism
/// like [`tokio::time::timeout`] should be used to prevent the program
/// from hanging indefinitely.
///
/// Leaking this stream may create a memory leak that lasts until the
/// [`Pinger`] is dropped.
///
/// [`Stream`]: futures_core::Stream
/// [`tokio::time::timeout`]: tokio::time::timeout
#[must_use = "streams do nothing unless polled"]
pub struct MeasureManyStream<'a, V: IpVersion, I: Iterator<Item = V>> {
    pinger: &'a Pinger<V>,
    packet: EchoRequestPacket<V>,
    send_queue: Peekable<I>,
    in_flight: HashMap<V, Instant>,
    receiver: mpsc::UnboundedReceiver<(V, Instant)>,
    round_id: u64,
}

impl<V: IpVersion, I: Iterator<Item = V>> MeasureManyStream<'_, V, I> {
    pub fn poll_next_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Option<(V, Duration)>> {
        // Try to receive a response (may be from a different round)
        if let Poll::Ready(maybe_reply) = self.poll_next_from_different_round(cx) {
            return Poll::Ready(maybe_reply);
        }

        // Try to send ICMP echo requests
        self.poll_next_icmp_replies(cx);

        // Check if we're done: no more addresses to send AND no responses pending
        if self.send_queue.peek().is_none() && self.in_flight.is_empty() {
            return Poll::Ready(None);
        }

        Poll::Pending
    }

    fn poll_next_icmp_replies(&mut self, cx: &mut Context<'_>) {
        while let Some(&addr) = self.send_queue.peek() {
            // Replies are matched by source address within a round, so a
            // second ping to an address that is still awaiting its reply
            // could never produce a second measurement; it would only
            // clobber the first ping's start time. Skip the duplicate.
            if self.in_flight.contains_key(&addr) {
                self.send_queue.next();
                continue;
            }

            match self.pinger.inner.raw.poll_send_to(cx, addr, &self.packet) {
                Poll::Ready(result) => {
                    let sent_at = Instant::now();

                    let taken_addr = self.send_queue.next();
                    debug_assert!(taken_addr.is_some());

                    // If the send failed (e.g. no route to host) no reply
                    // can ever arrive, so don't track the address as
                    // in-flight or the stream would never terminate.
                    if result.is_ok() {
                        self.in_flight.insert(addr, sent_at);
                    }
                }
                Poll::Pending => {
                    // The socket only remembers the most recent waker per
                    // direction (`AsyncFd` semantics), so with multiple
                    // streams sharing the socket another stream could
                    // overwrite ours and we'd never be woken again. Sends
                    // only return `Pending` while the send buffer is full,
                    // which clears up quickly, so schedule an immediate
                    // re-poll instead of parking.
                    cx.waker().wake_by_ref();
                    break;
                }
            }
        }
    }

    fn poll_next_from_different_round(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<Option<(V, Duration)>> {
        loop {
            match self.receiver.poll_recv(cx) {
                Poll::Pending => return Poll::Pending,
                Poll::Ready(Some((addr, recv_instant))) => {
                    if let Some(send_instant) = self.in_flight.remove(&addr) {
                        let rtt = recv_instant - send_instant;
                        return Poll::Ready(Some((addr, rtt)));
                    }
                }
                Poll::Ready(None) => return Poll::Ready(None),
            }
        }
    }
}

#[cfg(feature = "stream")]
impl<V: IpVersion, I: Iterator<Item = V> + Unpin> Stream for MeasureManyStream<'_, V, I> {
    type Item = (V, Duration);

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.as_mut().poll_next_unpin(cx)
    }
}

impl<V: IpVersion, I: Iterator<Item = V>> Drop for MeasureManyStream<'_, V, I> {
    fn drop(&mut self) {
        let _ = self.pinger.round_sender.send(RoundMessage::Unsubscribe {
            round_id: self.round_id,
        });
    }
}