async_icmp/ping/
mod.rs

1//! High level "ping" (ICMP Echo / Echo Reply) API, built on top of the rest of this library.
2//!
3//! [`PingMultiplexer`] supports pinging multiple hosts concurrently, both IPv4 and IPv6, with flexible Echo contents.
4
5use log::debug;
6use std::{fmt, io, net, sync, time};
7use tokio::sync::{mpsc as tmpsc, oneshot};
8
9mod multiplexer_task;
10#[cfg(test)]
11mod tests;
12
13use crate::{
14    message::echo::{EchoId, EchoSeq, IcmpEchoRequest},
15    ping::multiplexer_task::{MultiplexTask, MultiplexerCommand, SendSessionState},
16    platform,
17    socket::{SocketConfig, SocketPair},
18    Icmpv4, Icmpv6, IpVersion,
19};
20pub use multiplexer_task::{
21    AddSessionError, LifecycleError, ReplyTimestamp, SendPingError, SessionHandle,
22};
23
24/// A high-level ping (ICMP echo / echo reply) API.
25///
26/// This addresses the common case of handling one or more ping "sessions": for a given
27/// IP address, ICMP Echo ID, and ICMP Echo Data, you send one or more pings (ICMP Echo).
28///
29/// Each session has its own channel for responses (ICMP Echo Reply).
30///
31/// Because this is designed to be used from many tasks simultaneously, it is safe to `clone()`
32/// as it uses `Arc` internally.
33///
34/// # Examples
35///
36/// Create two ping sessions on one socket, and send one ping with each.
37///
38/// ```
39/// use std::net;
40/// use async_icmp::{
41///     message::echo::EchoSeq,
42///     IpVersion,
43///     ping::PingMultiplexer,
44///     socket::SocketConfig
45/// };
46///
47/// async fn ping_demo() -> anyhow::Result<()> {
48///     let multiplexer = PingMultiplexer::new(SocketConfig::default(), SocketConfig::default())?;
49///
50///     // Two sessions with distinct `data` (and possibly id, platform permitting)
51///     let (handle1, mut rx1) = multiplexer
52///        .add_session(
53///            net::Ipv4Addr::LOCALHOST.into(),
54///            multiplexer.platform_echo_id(IpVersion::V4).unwrap_or_else(rand::random),
55///            rand::random::<[u8; 32]>().to_vec(),
56///        )
57///        .await?;
58///
59///     let (handle2, mut rx2) = multiplexer
60///        .add_session(
61///            net::Ipv4Addr::LOCALHOST.into(),
62///            multiplexer.platform_echo_id(IpVersion::V4).unwrap_or_else(rand::random),
63///            rand::random::<[u8; 32]>().to_vec(),
64///        )
65///        .await?;
66///
67///     // Using distinct `seq` just to show that sessions are disambiguated.
68///     // Typically you would start at 0 and increment for multiple pings.
69///     let seq1 = EchoSeq::from_be(3);
70///     let seq2 = EchoSeq::from_be(7);
71///
72///     // Receiver task waiting in the background
73///     let receiver = tokio::spawn(async move {
74///         assert_eq!(seq1, rx1.recv().await.unwrap().seq);
75///         assert_eq!(seq2, rx2.recv().await.unwrap().seq);
76///     });
77///
78///     multiplexer.send_ping(handle1, seq1).await?;
79///     multiplexer.send_ping(handle2, seq2).await?;
80///
81///     // Make sure receiver got expected results
82///     receiver.await?;
83///
84///     Ok(())
85/// }
86///
87/// # tokio_test::block_on(ping_demo()).unwrap();
88/// ```
89#[derive(Clone)]
90pub struct PingMultiplexer {
91    state: sync::Arc<MultiplexerClientState>,
92}
93
94impl PingMultiplexer {
95    /// Create a new multiplexer with the provided socket configs.
96    pub fn new(
97        icmpv4_config: SocketConfig<Icmpv4>,
98        icmpv6_config: SocketConfig<Icmpv6>,
99    ) -> io::Result<Self> {
100        let (mut inner, ipv4_local_port, ipv6_local_port, sockets, tx, send_state) =
101            MultiplexTask::new(icmpv4_config, icmpv6_config)?;
102
103        let handle = tokio::spawn(async move {
104            inner.run().await;
105        });
106
107        Ok(Self {
108            state: sync::Arc::new(MultiplexerClientState {
109                commands: tx,
110                ipv4_local_port,
111                ipv6_local_port,
112                sockets,
113                task_handle: Some(handle).into(),
114                send_sessions: send_state,
115                req_pool: opool::Pool::new(4, ReqAllocator),
116            }),
117        })
118    }
119
120    /// Add a session to the multiplexer.
121    ///
122    /// Echo Reply messages with the provided `id` and `data` will cause a [`ReplyTimestamp`] to be
123    /// sent to the returned channel receiver. Timestamps will be sent as they are received from the
124    /// socket, including duplicates, etc.
125    ///
126    /// If the receiver is not drained fast enough, timestamps will be dropped.
127    ///
128    /// If the receiver is dropped, the session may be closed at some point in the future, but to
129    /// reliably release resources, call [`Self::close_session`].
130    ///
131    /// On some platforms (for which [`platform::icmp_send_overwrite_echo_id_with_local_port`] returns true),
132    /// the provided `id` will be overwritten in the kernel by the local port. On such platforms,
133    /// `id` must be the local port for the relevant socket. If a different id is used, echo reply
134    /// messages won't be matched to this session.
135    ///
136    /// See [`Self::platform_echo_id`], [`Self::ipv4_local_port`], and [`Self::ipv6_local_port`].
137    pub async fn add_session(
138        &self,
139        ip: net::IpAddr,
140        id: EchoId,
141        data: Vec<u8>,
142    ) -> Result<(SessionHandle, tmpsc::Receiver<ReplyTimestamp>), AddSessionError> {
143        let (tx, rx) = oneshot::channel();
144        self.send_cmd(
145            MultiplexerCommand::AddSession {
146                ip,
147                id,
148                data,
149                reply: tx,
150            },
151            rx,
152        )
153        .await?
154    }
155
156    /// Send a ping to the IP address specified when the session was created.
157    ///
158    /// Returns the timestamp at which the ICMP message was passed to the socket.
159    pub async fn send_ping(
160        &self,
161        session_handle: SessionHandle,
162        seq: EchoSeq,
163    ) -> Result<time::Instant, SendPingError> {
164        {
165            let (mut req, ip) = {
166                if let Some(session_send_state) = self
167                    .state
168                    .send_sessions
169                    .read()
170                    .unwrap()
171                    .get(&session_handle)
172                {
173                    let mut req = self.state.req_pool.get();
174                    req.set_id(session_send_state.echo_data.id);
175                    req.set_seq(seq);
176                    req.set_data(&session_send_state.echo_data.data);
177                    (req, session_send_state.ip)
178                } else {
179                    return Err(SendPingError::InvalidSessionHandle);
180                }
181            };
182
183            self.state.sockets.send_to_either(&mut *req, ip).await?;
184            debug!("Sent {session_handle:?} seq {seq:?}");
185            Ok(time::Instant::now())
186        }
187    }
188
189    /// Close a session, releasing any resources associated with it.
190    ///
191    /// If the session is open, it will be closed. It is not an error to attempt to close an already
192    /// closed session.
193    pub async fn close_session(&self, session_handle: SessionHandle) -> Result<(), LifecycleError> {
194        let (tx, rx) = oneshot::channel();
195        self.send_cmd(
196            MultiplexerCommand::CloseSession {
197                session_handle,
198                reply: tx,
199            },
200            rx,
201        )
202        .await
203    }
204
205    /// Shutdown the multiplexer.
206    ///
207    /// While dropping this value will eventually shut down the background task, if you
208    /// need to wait until shutdown is complete, this method provides that.
209    ///
210    /// It is not an error to call this multiple times.
211    ///
212    /// Attempts to use the multiplexer (send, etc) after shutdown will result in an error.
213    pub async fn shutdown(&self) {
214        let (tx, rx) = oneshot::channel();
215        if let Err(e) = self.send_cmd(MultiplexerCommand::Shutdown(tx), rx).await {
216            match e {
217                LifecycleError::Shutdown => {
218                    // we're already shutting down, so this is fine
219                }
220            }
221        }
222
223        // holding the lock only momentarily, not across an await point
224        let handle = match self.state.task_handle.lock().unwrap().take() {
225            Some(h) => h,
226            None => return,
227        };
228
229        if let Err(e) = handle.await {
230            debug!("Inner task exited with error: {}", e);
231        };
232    }
233
234    /// Returns the local port used by the IPv4 listen socket.
235    ///
236    /// See [`Self::ipv6_local_port`] for the IPv6 equivalent, and [`Self::platform_echo_id`]
237    /// for use as an echo id.
238    pub fn ipv4_local_port(&self) -> u16 {
239        self.state.ipv4_local_port
240    }
241
242    /// Returns the local port used by the IPv6 listen socket.
243    ///
244    /// See [`Self::ipv4_local_port`] for the IPv4 equivalent, and [`Self::platform_echo_id`]
245    /// for use as an echo id.
246    pub fn ipv6_local_port(&self) -> u16 {
247        self.state.ipv6_local_port
248    }
249
250    /// Returns the local port used by the IPv4 or IPv6 listen socket (per `ip_version` ) as an
251    /// [`EchoId`], if it required to be used as the `id` in ICMP Echo Request messages by the local
252    /// platform -- that is, when [`platform::icmp_send_overwrite_echo_id_with_local_port`]
253    /// returns true.
254    ///
255    /// This is just a possibly more convenient way to turn [`Self::ipv4_local_port`] or
256    /// [`Self::ipv6_local_port`] into an [`EchoId`].
257    ///
258    /// # Examples
259    ///
260    /// Get the IPv4 local port as an EchoId, if required by the platform, otherwise use a random
261    /// one.
262    ///
263    /// ```
264    /// use async_icmp::{message::echo::EchoId, ping::PingMultiplexer, platform, IpVersion};
265    ///
266    /// fn get_ipv4_echo_id(multiplexer: &PingMultiplexer) -> EchoId {
267    ///     multiplexer.platform_echo_id(IpVersion::V4).unwrap_or_else(rand::random)
268    /// }
269    /// ```
270    pub fn platform_echo_id(&self, ip_version: IpVersion) -> Option<EchoId> {
271        if platform::icmp_send_overwrite_echo_id_with_local_port() {
272            let port = match ip_version {
273                IpVersion::V4 => self.ipv4_local_port(),
274                IpVersion::V6 => self.ipv6_local_port(),
275            };
276
277            Some(EchoId::from_be(port))
278        } else {
279            None
280        }
281    }
282
283    async fn send_cmd<T>(
284        &self,
285        cmd: MultiplexerCommand,
286        rx: oneshot::Receiver<T>,
287    ) -> Result<T, LifecycleError> {
288        // If the cmd receiver or reply sender are closed or dropped, treat as shutdown
289        self.state
290            .commands
291            .send(cmd)
292            .await
293            .map_err(|_| LifecycleError::Shutdown)?;
294        rx.await.map_err(|_| LifecycleError::Shutdown)
295    }
296}
297
298impl fmt::Debug for PingMultiplexer {
299    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
300        write!(f, "PingMultiplexer")
301    }
302}
303
304/// State for being the client end of working with [`MultiplexerTask`].
305struct MultiplexerClientState {
306    /// Tx side to communicate with recv task
307    commands: tmpsc::Sender<MultiplexerCommand>,
308    /// Used for sending only.
309    ///
310    /// Recv task does all the receiving, with a clone of the same Arc.
311    sockets: sync::Arc<SocketPair>,
312    ipv4_local_port: u16,
313    ipv6_local_port: u16,
314    /// Handle for the recv task.
315    ///
316    /// `Some` if not shut down yet.
317    ///
318    /// Behind a lock to allow shutdown to only take &self.
319    task_handle: sync::Mutex<Option<tokio::task::JoinHandle<()>>>,
320    /// Contents maintained by the background task.
321    ///
322    /// We only use read locks to get session data when sending.
323    send_sessions: sync::Arc<sync::RwLock<hashbrown::HashMap<SessionHandle, SendSessionState>>>,
324    /// Object pool for ICMP requests to lower steady state allocation
325    req_pool: opool::Pool<ReqAllocator, IcmpEchoRequest>,
326}
327
328struct ReqAllocator;
329
330impl opool::PoolAllocator<IcmpEchoRequest> for ReqAllocator {
331    fn allocate(&self) -> IcmpEchoRequest {
332        IcmpEchoRequest::new()
333    }
334}