async-icmp 0.2.1

Async ICMP library
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
use crate::message::IcmpV6MsgType;
#[cfg(doc)]
use crate::ping::PingMultiplexer;
use crate::{
    message::{
        decode::DecodedIcmpMsg,
        echo::{parse_echo_reply, EchoId, EchoSeq},
        IcmpV4MsgType,
    },
    platform,
    socket::{SocketConfig, SocketPair},
    Icmpv4, Icmpv6,
};
use hashbrown::hash_map::Entry;
use log::{debug, warn};
use std::{fmt, hash, io, net, sync, time};
use tokio::sync::{mpsc as tmpsc, mpsc::error::TrySendError, oneshot};

/// Multiplexer task that runs in the background.
///
/// All `recv` is done by this task so that session lookup can be lock and wait free.
///
/// It also handles session management (add and close) so that there's a single point of truth
/// for whether a session exists.
pub(crate) struct MultiplexTask {
    /// Receive buffer for the IPv4 socket. Starts empty and is grown by `add_session` so it can
    /// hold a reply for the largest session payload seen so far.
    v4_buf: Vec<u8>,
    /// Receive buffer for the IPv6 socket. Starts empty and is grown by `add_session` so it can
    /// hold a reply for the largest session payload seen so far.
    v6_buf: Vec<u8>,
    /// The IPv4 and IPv6 sockets this task receives on. A clone of the `Arc` is also returned
    /// from `new`.
    sockets: sync::Arc<SocketPair>,
    /// Session lookup for send path.
    ///
    /// `hashbrown` map so we can use `try_insert`.
    /// NOTE(review): `add_session` currently goes through the `entry` API rather than
    /// `try_insert` -- confirm whether this rationale is still current.
    ///
    /// Uses [`SessionHandle`] as a key since the send key will be sent across the command
    /// channel with every send, so the key should be small.
    ///
    /// Wrapped in `Arc<RwLock<..>>` because a clone of the `Arc` is returned from `new`; this
    /// task takes the write lock when adding, closing, or clearing sessions.
    send_session_states:
        sync::Arc<sync::RwLock<hashbrown::HashMap<SessionHandle, SendSessionState>>>,
    /// Session lookup for recv path.
    ///
    /// `hashbrown` map so we can use `Equivalent` and `try_insert`.
    /// Otherwise, we end up stuck trying to implement a self-referential Borrow to appease stdlib
    /// HashMap::get
    recv_session_states: hashbrown::HashMap<RecvHashKey, RecvSessionState>,
    /// Receiving half of the command channel; the sending half is returned from `new`.
    commands: tmpsc::Receiver<MultiplexerCommand>,
    /// If true, main loop should abort.
    shutdown: bool,
    /// The next handle id to attempt to use.
    next_handle_id: u64,
}

impl MultiplexTask {
    /// Build the multiplexer task state together with everything a caller needs to drive it.
    ///
    /// Opens the socket pair from the two provided configs; a failure to do so is returned as
    /// the `Err` value.
    ///
    /// On success the tuple holds, in order: the task itself, the IPv4 socket's local port, the
    /// IPv6 socket's local port, the shared socket pair, the sending half of the command
    /// channel, and the shared send-path session map.
    #[allow(clippy::type_complexity)]
    pub(crate) fn new(
        icmpv4_config: SocketConfig<Icmpv4>,
        icmpv6_config: SocketConfig<Icmpv6>,
    ) -> io::Result<(
        Self,
        u16,
        u16,
        sync::Arc<SocketPair>,
        tmpsc::Sender<MultiplexerCommand>,
        sync::Arc<sync::RwLock<hashbrown::HashMap<SessionHandle, SendSessionState>>>,
    )> {
        let (cmd_tx, cmd_rx) = tmpsc::channel(16);
        let socket_pair = SocketPair::new(icmpv4_config, icmpv6_config).map(sync::Arc::new)?;
        let ipv4_port = socket_pair.ipv4.local_port();
        let ipv6_port = socket_pair.ipv6.local_port();
        let shared_send_states = sync::Arc::new(sync::RwLock::new(hashbrown::HashMap::new()));

        let task = Self {
            // buffers start out empty; `add_session` grows them to fit each session's payload
            v4_buf: Vec::new(),
            v6_buf: Vec::new(),
            sockets: sync::Arc::clone(&socket_pair),
            send_session_states: sync::Arc::clone(&shared_send_states),
            recv_session_states: hashbrown::HashMap::new(),
            commands: cmd_rx,
            shutdown: false,
            next_handle_id: 0,
        };

        Ok((
            task,
            ipv4_port,
            ipv6_port,
            socket_pair,
            cmd_tx,
            shared_send_states,
        ))
    }

    /// Drive the task: keep servicing socket receives and commands until `shutdown` is set.
    ///
    /// An error from a single iteration is logged as a warning and does not stop the loop.
    pub(crate) async fn run(&mut self) {
        while !self.shutdown {
            match self.recv_or_cmd().await {
                Ok(()) => {}
                Err(e) => {
                    warn!("Recv task error: {e}")
                }
            }
        }
    }

    /// Perform one iteration of the main loop: wait for an IPv4 datagram, an IPv6 datagram, or
    /// a command, and handle whichever `tokio::select!` yields.
    ///
    /// Return `Err` if it's worth logging. In the visible code that means an IO error from
    /// either socket's `recv`, or an error propagated from `handle_command`.
    async fn recv_or_cmd(&mut self) -> Result<(), RecvError> {
        // Borrow the two session maps as separate fields so the recv branches can use them
        // while the select futures borrow the sockets, buffers and command receiver.
        let send_states = &mut self.send_session_states;
        let recv_states = &mut self.recv_session_states;
        tokio::select! {
            v4_res = self.sockets.ipv4.recv(&mut self.v4_buf) => {
                let (msg, _range) = v4_res?;
                // only ICMPv4 Echo Reply messages are dispatched to sessions
                handle_recv(msg, IcmpV4MsgType::EchoReply as u8, send_states, recv_states)?;
            }
            v6_res = self.sockets.ipv6.recv(&mut self.v6_buf) => {
                let (msg, _range) = v6_res?;
                // only ICMPv6 Echo Reply messages are dispatched to sessions
                handle_recv(msg, IcmpV6MsgType::EchoReply as u8, send_states, recv_states)?;
            }
            cmd_opt = self.commands.recv() => {
                match cmd_opt {
                    None => {
                        // treat closing the cmd channel as shutdown
                        // (the receiver half of this throwaway oneshot is dropped immediately,
                        // so the shutdown reply has nowhere to go and is simply discarded)
                        self.handle_command(MultiplexerCommand::Shutdown(oneshot::channel().0)).await?
                    }
                    Some(cmd) => self.handle_command(cmd).await?
                }
            }
        }

        Ok(())
    }

    /// Execute a single command received over the command channel.
    ///
    /// Every arm finishes by sending a value to the command's reply channel, so the requester
    /// is never left without a response. Currently always returns `Ok(())`.
    async fn handle_command(&mut self, cmd: MultiplexerCommand) -> Result<(), RecvError> {
        /// Capacity of the reply-timestamp channel created for each new session.
        const SESSION_CHANNEL_BUF_SIZE: usize = 16;

        match cmd {
            MultiplexerCommand::Shutdown(done_tx) => {
                self.shutdown = true;
                // the main loop is about to exit anyway, but drop all session state eagerly
                self.send_session_states.write().unwrap().clear();
                self.recv_session_states.clear();
                self.commands.close();
                reply_if_possible(done_tx, ());
            }
            MultiplexerCommand::AddSession {
                ip,
                id,
                data,
                reply: result_tx,
            } => {
                let outcome = self.add_session(ip, id, data, SESSION_CHANNEL_BUF_SIZE);
                reply_if_possible(result_tx, outcome);
            }
            MultiplexerCommand::CloseSession {
                session_handle,
                reply: done_tx,
            } => {
                handle_close_session(
                    session_handle,
                    &mut self.send_session_states,
                    &mut self.recv_session_states,
                );
                reply_if_possible(done_tx, ());
            }
        }

        Ok(())
    }

    /// Register a new session keyed by its echo `id` and `data`.
    ///
    /// The receive buffer for `ip`'s address family is grown, if necessary, so that a reply
    /// carrying `data` fits. `ip` itself is not part of the recv lookup key: a second session
    /// with the same `id` and `data` is rejected with [`AddSessionError::Duplicate`] regardless
    /// of its address.
    ///
    /// `channel_buf_size` need not be very large (e.g. 8) unless the receiver will only be
    /// read occasionally. Normally a task is constantly waiting on all such receivers, and
    /// pings are not normally sent or received very fast, so a small channel will do.
    ///
    /// On success, returns the new session's handle and the receiver on which reply timestamps
    /// are delivered. If that receiver is found to be dropped when a reply is being delivered,
    /// the session is closed.
    fn add_session(
        &mut self,
        ip: net::IpAddr,
        id: EchoId,
        data: Vec<u8>,
        channel_buf_size: usize,
    ) -> Result<(SessionHandle, tmpsc::Receiver<ReplyTimestamp>), AddSessionError> {
        // 4 bytes of ICMP header + 4 bytes of Echo Reply header + the payload
        let icmp_len = 4 + 4 + data.len();
        let (recv_buf, required_len) = match ip {
            net::IpAddr::V4(_) => {
                // Some platforms hand us the IPv4 header too. It is normally 20 bytes, but
                // options can extend it, so reserve the 60 byte maximum.
                let ip_header_len = if platform::ipv4_recv_prefix_ipv4_header() {
                    60
                } else {
                    0
                };
                (&mut self.v4_buf, ip_header_len + icmp_len)
            }
            net::IpAddr::V6(_) => (&mut self.v6_buf, icmp_len),
        };
        // buffers only ever grow
        if recv_buf.len() < required_len {
            recv_buf.resize(required_len, 0);
        }

        let echo_data = sync::Arc::new(SessionEchoData { id, data });
        let recv_key = RecvHashKey {
            echo_data: sync::Arc::clone(&echo_data),
        };
        let (reply_tx, reply_rx) = tmpsc::channel(channel_buf_size);

        let recv_state = match self.recv_session_states.entry(recv_key) {
            Entry::Vacant(slot) => slot.insert(RecvSessionState {
                // placeholder until a free handle id is found below
                session_handle: SessionHandle { id: u64::MAX },
                tx: reply_tx,
            }),
            Entry::Occupied(_) => return Err(AddSessionError::Duplicate),
        };

        // (id, data) is known to be unique at this point; next, claim a unique handle id for
        // the send-path map

        let send_state = SendSessionState {
            ip,
            echo_data: sync::Arc::clone(&echo_data),
        };

        // a u64 counter wrapping around is very unlikely, but skip ids still in use just in case
        loop {
            let handle = SessionHandle {
                id: self.next_handle_id,
            };
            // advance the counter for the next iteration or the next added session
            self.next_handle_id = self.next_handle_id.wrapping_add(1);

            let mut send_states = self.send_session_states.write().unwrap();
            if let Entry::Vacant(slot) = send_states.entry(handle) {
                slot.insert(send_state);
                recv_state.session_handle = handle;
                debug!(
                    "Added session: handle = {handle:?}, id = {id:?}, data = {}",
                    hex::encode(&echo_data.data)
                );
                return Ok((handle, reply_rx));
            }
        }
    }
}

/// Process one received ICMP message and, if it is an Echo Reply for a known session, deliver
/// a [`ReplyTimestamp`] to that session's channel.
///
/// Messages that fail to decode, are not of type `echo_reply_type` with code 0, cannot be
/// parsed as an Echo Reply, or match no session are logged at debug level and otherwise
/// ignored. A full session channel drops the reply with a warning; a closed one closes the
/// session.
///
/// Kept as a free function (rather than a method) to avoid lifetime wrangling.
fn handle_recv(
    msg: &[u8],
    echo_reply_type: u8,
    send_states: &mut sync::Arc<sync::RwLock<hashbrown::HashMap<SessionHandle, SendSessionState>>>,
    recv_states: &mut hashbrown::HashMap<RecvHashKey, RecvSessionState>,
) -> Result<(), RecvError> {
    let decoded = match DecodedIcmpMsg::decode(msg) {
        Ok(parsed) => parsed,
        Err(_) => {
            debug!("ICMP message parse failed");
            return Ok(());
        }
    };

    let is_echo_reply = decoded.msg_type() == echo_reply_type && decoded.msg_code() == 0;
    if !is_echo_reply {
        debug!(
            "Skipping irrelevant ICMP message type {} code {}",
            decoded.msg_type(),
            decoded.msg_code()
        );
        return Ok(());
    }

    let (seq, key) = match parse_echo_reply(decoded.body()) {
        Some((id, seq, data)) => (seq, RefHashKey { id, data }),
        None => {
            debug!("Couldn't parse body as Echo Reply");
            return Ok(());
        }
    };

    let recv_state = match recv_states.get(&key) {
        Some(state) => state,
        None => {
            debug!("Couldn't find session for {key:?}");
            return Ok(());
        }
    };

    debug!("Reply for {:?}: seq {:?}", recv_state.session_handle, seq,);

    let timestamp = ReplyTimestamp {
        seq,
        received_at: time::Instant::now(),
    };
    match recv_state.tx.try_send(timestamp) {
        Ok(()) => {}
        Err(TrySendError::Full(_)) => {
            warn!("Session channel overflow");
        }
        Err(TrySendError::Closed(_)) => {
            debug!("Session channel closed; closing session");
            // the receiver is gone, so nobody can observe replies for this session anymore
            handle_close_session(recv_state.session_handle, send_states, recv_states)
        }
    }

    Ok(())
}

/// Close the session identified by `session_handle`, if it is still open.
///
/// The send-path entry is removed first; the echo data it held is then used as the key to
/// remove the matching recv-path entry. An unknown handle is a no-op.
fn handle_close_session(
    session_handle: SessionHandle,
    send_session_states: &mut sync::Arc<
        sync::RwLock<hashbrown::HashMap<SessionHandle, SendSessionState>>,
    >,
    recv_session_states: &mut hashbrown::HashMap<RecvHashKey, RecvSessionState>,
) {
    // the write guard stays alive until the end of the function, covering both removals
    let mut send_map = send_session_states.write().unwrap();
    let closed_echo_data = send_map
        .remove(&session_handle)
        .map(|send_state| send_state.echo_data);

    // only a handle that was actually open has a recv-side entry to drop
    if let Some(echo_data) = closed_echo_data {
        recv_session_states.remove(&RecvHashKey { echo_data });
    }
}

/// Send `val` over the oneshot `reply` channel.
///
/// A failed send (the receiving side is gone) is not an error; it is only logged at debug
/// level.
fn reply_if_possible<T>(reply: oneshot::Sender<T>, val: T) {
    let delivered = reply.send(val).is_ok();
    if !delivered {
        debug!("Could not reply - channel closed");
    }
}

/// Commands sent to the multiplexer task over its command channel.
///
/// All commands should have a "reply" Sender that the recv task will use to respond.
///
/// Since each oneshot requires a heap allocation, commands should only be used for relatively low
/// frequency communication.
pub(crate) enum MultiplexerCommand {
    /// Slightly richer than simply closing the channel, as it allows us to wait until
    /// the task reports shutdown is complete.
    ///
    /// The task clears all session state and closes the command channel before replying.
    Shutdown(oneshot::Sender<()>),
    /// Register a new session for echo replies carrying `id` and `data`.
    AddSession {
        /// Address stored in the session's send state; its address family also decides which
        /// receive buffer is sized for the session.
        ip: net::IpAddr,
        /// Echo identifier that replies for this session carry.
        id: EchoId,
        /// Echo payload that replies for this session carry.
        data: Vec<u8>,
        /// Receives the new session's handle and reply-timestamp receiver, or the reason the
        /// session could not be added.
        reply: oneshot::Sender<
            Result<(SessionHandle, tmpsc::Receiver<ReplyTimestamp>), AddSessionError>,
        >,
    },
    /// Close a session. Closing a handle that is not (or no longer) open is a no-op.
    CloseSession {
        /// The session to close.
        session_handle: SessionHandle,
        /// Signalled once the close has been processed.
        reply: oneshot::Sender<()>,
    },
}

/// A handle to a session.
///
/// Created by [`PingMultiplexer::add_session`].
///
/// Small and `Copy`; it is the key of the send-path session map.
#[derive(Clone, Copy, Hash, Debug, PartialEq, Eq)]
pub struct SessionHandle {
    /// Id assigned by the multiplexer task from a wrapping counter, skipping ids that are
    /// still in use by an open session.
    id: u64,
}

/// A record of receiving an ICMP Echo Reply for `seq` at `received_at`.
#[derive(Debug, PartialEq, Eq)]
pub struct ReplyTimestamp {
    /// The reply sequence number
    pub seq: EchoSeq,
    /// The timestamp when the reply was received.
    ///
    /// Taken with `Instant::now()` by the multiplexer task when it matches the reply to a
    /// session, just before the record is sent to the session's channel.
    pub received_at: time::Instant,
}

/// Task lifecycle errors applicable for any command.
///
/// Wrapped by [`AddSessionError::Lifecycle`] and [`SendPingError::Lifecycle`].
#[derive(Debug, thiserror::Error)]
pub enum LifecycleError {
    /// The multiplexer has shut down, so it cannot respond to further commands
    #[error("Multiplexer has shut down")]
    Shutdown,
}

/// Errors that can occur when adding a session
#[derive(Debug, thiserror::Error)]
pub enum AddSessionError {
    /// The provided session metadata (id, data) is already in use.
    ///
    /// The destination IP is not part of the comparison: two sessions may not share the same
    /// id and data even if they target different addresses.
    #[error("Duplicate session metadata")]
    Duplicate,
    /// Lifecycle error, e.g. the multiplexer task has shut down.
    #[error("Lifecycle error: {0}")]
    Lifecycle(#[from] LifecycleError),
}

/// Errors surfaced by one iteration of the multiplexer task's main loop.
///
/// These are only logged (as warnings) by the loop; they do not stop the task.
#[derive(Debug, thiserror::Error)]
enum RecvError {
    /// An IO error, e.g. from receiving on one of the sockets.
    #[error("IO error: {0}")]
    Io(#[from] io::Error),
}

/// Errors that can occur when sending a ping
#[derive(Debug, thiserror::Error)]
pub enum SendPingError {
    /// Invalid session handle
    // NOTE(review): presumably returned when the handle is not present in the send session
    // map (e.g. the session was closed) -- the send path is not in this file section; confirm.
    #[error("Invalid session handle")]
    InvalidSessionHandle,
    /// IO error
    #[error("IO error: {0}")]
    Io(#[from] io::Error),
    /// Multiplexer task lifecycle error, e.g. the task has shut down
    #[error("Task error: {0}")]
    Lifecycle(#[from] LifecycleError),
}

/// State needed when sending an echo request for a session
#[derive(Debug)]
pub(crate) struct SendSessionState {
    /// IP to send to
    pub(crate) ip: net::IpAddr,

    /// Key needed to clear the session from the recv state map.
    ///
    /// Shares the session's id and data with the recv map key through the same `Arc`.
    /// Arc overhead is negligible because it is only cloned while the session is being added,
    /// and only destroyed during session close.
    pub(crate) echo_data: sync::Arc<SessionEchoData>,
}

/// Recv path state kept by the recv task for each open session
#[derive(Debug)]
struct RecvSessionState {
    /// Used when closing a session via detecting a closed channel
    session_handle: SessionHandle,
    /// Where timestamps for replies matching the session are sent.
    ///
    /// Sends are non-blocking: if the channel is full the reply is dropped with a warning.
    tx: tokio::sync::mpsc::Sender<ReplyTimestamp>,
}

/// Owned key for recv session map.
///
/// Contains the data that would be provided by parsing an ICMP Echo Reply.
///
/// `Hash` is implemented by hand (rather than derived) so that it stays in step with
/// [`RefHashKey`], the borrowed form used for lookups.
#[derive(Debug, PartialEq, Eq)]
struct RecvHashKey {
    /// The session's echo id and payload, shared with the session's send state.
    echo_data: sync::Arc<SessionEchoData>,
}

/// Data needed to look up a session with reasonable confidence, and therefore also the data needed
/// to send a request for that session.
///
/// `Debug` is implemented by hand so that `data` is printed as hex.
#[derive(PartialEq, Eq)]
pub(crate) struct SessionEchoData {
    /// Echo identifier for the session.
    pub(crate) id: EchoId,
    /// Echo payload bytes for the session.
    pub(crate) data: Vec<u8>,
}

/// Debug output shows `id` as-is and `data` as a hex string rather than a byte list.
impl fmt::Debug for SessionEchoData {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let data_hex = hex::encode(&self.data);
        let mut out = f.debug_struct("SessionEchoData");
        out.field("id", &self.id);
        out.field("data", &data_hex);
        out.finish()
    }
}

/// Must match Hash impl for [`RefHashKey`]
impl hash::Hash for RecvHashKey {
    fn hash<H: hash::Hasher>(&self, state: &mut H) {
        // prefix free: id is always two bytes
        self.echo_data.id.hash(state);
        self.echo_data.data.hash(state);
    }
}

/// Reference form of [`RecvHashKey`] for map queries via [`hashbrown::Equivalent`]
/// without having to own a copy of `data`
#[derive(PartialEq, Eq)]
struct RefHashKey<'a> {
    /// Echo identifier parsed from a received Echo Reply.
    id: EchoId,
    /// Echo payload, borrowed from the received message.
    data: &'a [u8],
}

/// Hashes the echo id followed by the echo data.
///
/// Must produce the same hash as the impl for [`RecvHashKey`], otherwise map lookups with this
/// type would miss.
#[allow(clippy::needless_lifetimes)] // not on 1.74
impl<'a> hash::Hash for RefHashKey<'a> {
    fn hash<H: hash::Hasher>(&self, state: &mut H) {
        let Self { id, data } = self;
        id.hash(state);
        data.hash(state);
    }
}

/// A borrowed key matches an owned key when both the echo id and the echo data are equal.
#[allow(clippy::needless_lifetimes)] // not on 1.74
impl<'a> hashbrown::Equivalent<RecvHashKey> for RefHashKey<'a> {
    fn equivalent(&self, key: &RecvHashKey) -> bool {
        let stored = &*key.echo_data;
        self.id == stored.id && self.data == stored.data.as_slice()
    }
}

/// Debug output shows `id` as-is and `data` as a hex string rather than a byte list.
impl fmt::Debug for RefHashKey<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let data_hex = hex::encode(self.data);
        let mut out = f.debug_struct("RefHashKey");
        out.field("id", &self.id);
        out.field("data", &data_hex);
        out.finish()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use hashbrown::Equivalent;
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    /// Hash `value` with a freshly constructed [`DefaultHasher`], so that two calls hash with
    /// identical hasher state and their results can be compared.
    fn hash_of<T: Hash>(value: &T) -> u64 {
        let mut hasher = DefaultHasher::new();
        value.hash(&mut hasher);
        hasher.finish()
    }

    /// The owned and borrowed key forms must agree on both equivalence and hash, otherwise
    /// map lookups with [`RefHashKey`] would not find entries stored under [`RecvHashKey`].
    #[test]
    fn hash_key_hash_equivalent_to_ref_hash_key() {
        let key = RecvHashKey {
            echo_data: SessionEchoData {
                id: EchoId::from_be(1234),
                data: vec![5, 6, 7, 8],
            }
            .into(),
        };

        let mut ref_key = RefHashKey {
            id: key.echo_data.id,
            data: &key.echo_data.data,
        };

        assert!(ref_key.equivalent(&key));
        // equivalent keys must also hash identically, or the lookup probes the wrong bucket
        assert_eq!(hash_of(&key), hash_of(&ref_key));

        // a different id is not equivalent
        ref_key.id = [42_u8; 2].into();
        assert!(!ref_key.equivalent(&key));

        // the same id with different data is not equivalent either
        ref_key.id = key.echo_data.id;
        ref_key.data = &key.echo_data.data[..3];
        assert!(!ref_key.equivalent(&key));
    }
}