async_icmp/ping/mod.rs
//! High-level "ping" (ICMP Echo / Echo Reply) API, built on top of the rest of this library.
//!
//! [`PingMultiplexer`] supports pinging multiple hosts concurrently, over both IPv4 and IPv6,
//! with flexible Echo contents.

use log::debug;
use std::{fmt, io, net, sync, time};
use tokio::sync::{mpsc as tmpsc, oneshot};

mod multiplexer_task;
#[cfg(test)]
mod tests;

use crate::{
    message::echo::{EchoId, EchoSeq, IcmpEchoRequest},
    ping::multiplexer_task::{MultiplexTask, MultiplexerCommand, SendSessionState},
    platform,
    socket::{SocketConfig, SocketPair},
    Icmpv4, Icmpv6, IpVersion,
};
pub use multiplexer_task::{
    AddSessionError, LifecycleError, ReplyTimestamp, SendPingError, SessionHandle,
};

/// A high-level ping (ICMP Echo / Echo Reply) API.
///
/// This addresses the common case of handling one or more ping "sessions": for a given
/// IP address, ICMP Echo ID, and ICMP Echo Data, you send one or more pings (ICMP Echo).
///
/// Each session has its own channel for responses (ICMP Echo Reply).
///
/// Because this is designed to be used from many tasks simultaneously, it is safe and cheap to
/// `clone()`: it uses an `Arc` internally, so clones share the same underlying state.
///
/// # Examples
///
/// Create two ping sessions on one socket, and send one ping with each.
///
/// ```
/// use std::net;
/// use async_icmp::{
///     message::echo::EchoSeq,
///     IpVersion,
///     ping::PingMultiplexer,
///     socket::SocketConfig,
/// };
///
/// async fn ping_demo() -> anyhow::Result<()> {
///     let multiplexer = PingMultiplexer::new(SocketConfig::default(), SocketConfig::default())?;
///
///     // Two sessions with distinct `data` (and possibly distinct ids, platform permitting)
///     let (handle1, mut rx1) = multiplexer
///         .add_session(
///             net::Ipv4Addr::LOCALHOST.into(),
///             multiplexer.platform_echo_id(IpVersion::V4).unwrap_or_else(rand::random),
///             rand::random::<[u8; 32]>().to_vec(),
///         )
///         .await?;
///
///     let (handle2, mut rx2) = multiplexer
///         .add_session(
///             net::Ipv4Addr::LOCALHOST.into(),
///             multiplexer.platform_echo_id(IpVersion::V4).unwrap_or_else(rand::random),
///             rand::random::<[u8; 32]>().to_vec(),
///         )
///         .await?;
///
///     // Using distinct `seq` values just to show that sessions are disambiguated.
///     // Typically you would start at 0 and increment for each subsequent ping.
///     let seq1 = EchoSeq::from_be(3);
///     let seq2 = EchoSeq::from_be(7);
///
///     // Receiver task waiting in the background
///     let receiver = tokio::spawn(async move {
///         assert_eq!(seq1, rx1.recv().await.unwrap().seq);
///         assert_eq!(seq2, rx2.recv().await.unwrap().seq);
///     });
///
///     multiplexer.send_ping(handle1, seq1).await?;
///     multiplexer.send_ping(handle2, seq2).await?;
///
///     // Make sure the receiver got the expected results
///     receiver.await?;
///
///     Ok(())
/// }
///
/// # tokio_test::block_on(ping_demo()).unwrap();
/// ```
#[derive(Clone)]
pub struct PingMultiplexer {
    state: sync::Arc<MultiplexerClientState>,
}

impl PingMultiplexer {
    /// Create a new multiplexer with the provided socket configs.
    pub fn new(
        icmpv4_config: SocketConfig<Icmpv4>,
        icmpv6_config: SocketConfig<Icmpv6>,
    ) -> io::Result<Self> {
        let (mut inner, ipv4_local_port, ipv6_local_port, sockets, tx, send_state) =
            MultiplexTask::new(icmpv4_config, icmpv6_config)?;

        let handle = tokio::spawn(async move {
            inner.run().await;
        });

        Ok(Self {
            state: sync::Arc::new(MultiplexerClientState {
                commands: tx,
                ipv4_local_port,
                ipv6_local_port,
                sockets,
                task_handle: Some(handle).into(),
                send_sessions: send_state,
                req_pool: opool::Pool::new(4, ReqAllocator),
            }),
        })
    }

    /// Add a session to the multiplexer.
    ///
    /// Echo Reply messages with the provided `id` and `data` will cause a [`ReplyTimestamp`] to
    /// be sent to the returned channel receiver. Timestamps are sent as they are received from
    /// the socket, including any duplicates.
    ///
    /// If the receiver is not drained fast enough, timestamps will be dropped.
    ///
    /// If the receiver is dropped, the session may be closed at some point in the future, but to
    /// reliably release resources, call [`Self::close_session`].
    ///
    /// On some platforms (those for which [`platform::icmp_send_overwrite_echo_id_with_local_port`]
    /// returns true), the provided `id` will be overwritten in the kernel by the local port. On
    /// such platforms, `id` must be the local port for the relevant socket; if a different id is
    /// used, Echo Reply messages won't be matched to this session.
    ///
    /// See [`Self::platform_echo_id`], [`Self::ipv4_local_port`], and [`Self::ipv6_local_port`].
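    ///
    /// # Examples
    ///
    /// A minimal sketch of creating one session end to end; the localhost target and the
    /// 32-byte random `data` are illustrative, not required:
    ///
    /// ```no_run
    /// use std::net;
    /// use async_icmp::{message::echo::EchoSeq, ping::PingMultiplexer, socket::SocketConfig, IpVersion};
    ///
    /// async fn one_session() -> anyhow::Result<()> {
    ///     let multiplexer = PingMultiplexer::new(SocketConfig::default(), SocketConfig::default())?;
    ///     // Use the platform-mandated echo id if there is one, otherwise pick one at random
    ///     let id = multiplexer.platform_echo_id(IpVersion::V4).unwrap_or_else(rand::random);
    ///     let (handle, mut replies) = multiplexer
    ///         .add_session(net::Ipv4Addr::LOCALHOST.into(), id, rand::random::<[u8; 32]>().to_vec())
    ///         .await?;
    ///     multiplexer.send_ping(handle, EchoSeq::from_be(0)).await?;
    ///     if let Some(reply) = replies.recv().await {
    ///         println!("got echo reply, seq {:?}", reply.seq);
    ///     }
    ///     Ok(())
    /// }
    /// ```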
    pub async fn add_session(
        &self,
        ip: net::IpAddr,
        id: EchoId,
        data: Vec<u8>,
    ) -> Result<(SessionHandle, tmpsc::Receiver<ReplyTimestamp>), AddSessionError> {
        let (tx, rx) = oneshot::channel();
        self.send_cmd(
            MultiplexerCommand::AddSession {
                ip,
                id,
                data,
                reply: tx,
            },
            rx,
        )
        .await?
    }

    /// Send a ping to the IP address specified when the session was created.
    ///
    /// Returns the timestamp at which the ICMP message was passed to the socket.
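    ///
    /// # Examples
    ///
    /// A sketch of using the returned instant for a rough round-trip time, assuming `handle`
    /// and `replies` came from [`Self::add_session`]:
    ///
    /// ```no_run
    /// use async_icmp::{
    ///     message::echo::EchoSeq,
    ///     ping::{PingMultiplexer, ReplyTimestamp, SendPingError, SessionHandle},
    /// };
    /// use tokio::sync::mpsc;
    ///
    /// async fn rough_rtt(
    ///     multiplexer: &PingMultiplexer,
    ///     handle: SessionHandle,
    ///     replies: &mut mpsc::Receiver<ReplyTimestamp>,
    /// ) -> Result<(), SendPingError> {
    ///     let sent_at = multiplexer.send_ping(handle, EchoSeq::from_be(0)).await?;
    ///     if replies.recv().await.is_some() {
    ///         println!("round trip took roughly {:?}", sent_at.elapsed());
    ///     }
    ///     Ok(())
    /// }
    /// ```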
    pub async fn send_ping(
        &self,
        session_handle: SessionHandle,
        seq: EchoSeq,
    ) -> Result<time::Instant, SendPingError> {
        // Scope the read guard in a block so it is not held across the send await below
        let (mut req, ip) = {
            if let Some(session_send_state) = self
                .state
                .send_sessions
                .read()
                .unwrap()
                .get(&session_handle)
            {
                let mut req = self.state.req_pool.get();
                req.set_id(session_send_state.echo_data.id);
                req.set_seq(seq);
                req.set_data(&session_send_state.echo_data.data);
                (req, session_send_state.ip)
            } else {
                return Err(SendPingError::InvalidSessionHandle);
            }
        };

        self.state.sockets.send_to_either(&mut *req, ip).await?;
        debug!("Sent {session_handle:?} seq {seq:?}");
        Ok(time::Instant::now())
    }

    /// Close a session, releasing any resources associated with it.
    ///
    /// If the session is open, it will be closed. It is not an error to attempt to close an
    /// already closed session.
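    ///
    /// # Examples
    ///
    /// A sketch, assuming `handle` came from [`Self::add_session`]:
    ///
    /// ```no_run
    /// use async_icmp::ping::{LifecycleError, PingMultiplexer, SessionHandle};
    ///
    /// async fn done_pinging(
    ///     multiplexer: &PingMultiplexer,
    ///     handle: SessionHandle,
    /// ) -> Result<(), LifecycleError> {
    ///     // Releases the session's resources; closing it again would also return Ok(())
    ///     multiplexer.close_session(handle).await
    /// }
    /// ```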
    pub async fn close_session(&self, session_handle: SessionHandle) -> Result<(), LifecycleError> {
        let (tx, rx) = oneshot::channel();
        self.send_cmd(
            MultiplexerCommand::CloseSession {
                session_handle,
                reply: tx,
            },
            rx,
        )
        .await
    }

    /// Shut down the multiplexer.
    ///
    /// Dropping this value will eventually shut down the background task; if you need to wait
    /// until shutdown is complete, this method provides that.
    ///
    /// It is not an error to call this multiple times.
    ///
    /// Attempts to use the multiplexer (send, etc.) after shutdown will result in an error.
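    ///
    /// # Examples
    ///
    /// A sketch of a graceful teardown:
    ///
    /// ```no_run
    /// use async_icmp::ping::PingMultiplexer;
    ///
    /// async fn teardown(multiplexer: PingMultiplexer) {
    ///     // Waits until the background task has fully exited; calling this again is harmless
    ///     multiplexer.shutdown().await;
    /// }
    /// ```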
    pub async fn shutdown(&self) {
        let (tx, rx) = oneshot::channel();
        if let Err(e) = self.send_cmd(MultiplexerCommand::Shutdown(tx), rx).await {
            match e {
                LifecycleError::Shutdown => {
                    // we're already shutting down, so this is fine
                }
            }
        }

        // hold the lock only momentarily, not across an await point
        let handle = match self.state.task_handle.lock().unwrap().take() {
            Some(h) => h,
            None => return,
        };

        if let Err(e) = handle.await {
            debug!("Inner task exited with error: {e}");
        }
    }

    /// Returns the local port used by the IPv4 listen socket.
    ///
    /// See [`Self::ipv6_local_port`] for the IPv6 equivalent, and [`Self::platform_echo_id`]
    /// for use as an echo id.
    pub fn ipv4_local_port(&self) -> u16 {
        self.state.ipv4_local_port
    }

    /// Returns the local port used by the IPv6 listen socket.
    ///
    /// See [`Self::ipv4_local_port`] for the IPv4 equivalent, and [`Self::platform_echo_id`]
    /// for use as an echo id.
    pub fn ipv6_local_port(&self) -> u16 {
        self.state.ipv6_local_port
    }

    /// Returns the local port used by the IPv4 or IPv6 listen socket (per `ip_version`) as an
    /// [`EchoId`], if it is required to be used as the `id` in ICMP Echo Request messages by the
    /// local platform -- that is, when [`platform::icmp_send_overwrite_echo_id_with_local_port`]
    /// returns true.
    ///
    /// This is just a possibly more convenient way to turn [`Self::ipv4_local_port`] or
    /// [`Self::ipv6_local_port`] into an [`EchoId`].
    ///
    /// # Examples
    ///
    /// Get the IPv4 local port as an [`EchoId`] if the platform requires it, otherwise use a
    /// random one.
    ///
    /// ```
    /// use async_icmp::{message::echo::EchoId, ping::PingMultiplexer, IpVersion};
    ///
    /// fn get_ipv4_echo_id(multiplexer: &PingMultiplexer) -> EchoId {
    ///     multiplexer.platform_echo_id(IpVersion::V4).unwrap_or_else(rand::random)
    /// }
    /// ```
    pub fn platform_echo_id(&self, ip_version: IpVersion) -> Option<EchoId> {
        if platform::icmp_send_overwrite_echo_id_with_local_port() {
            let port = match ip_version {
                IpVersion::V4 => self.ipv4_local_port(),
                IpVersion::V6 => self.ipv6_local_port(),
            };

            Some(EchoId::from_be(port))
        } else {
            None
        }
    }

    async fn send_cmd<T>(
        &self,
        cmd: MultiplexerCommand,
        rx: oneshot::Receiver<T>,
    ) -> Result<T, LifecycleError> {
        // If the cmd receiver or the reply sender is closed or dropped, treat it as shutdown
        self.state
            .commands
            .send(cmd)
            .await
            .map_err(|_| LifecycleError::Shutdown)?;
        rx.await.map_err(|_| LifecycleError::Shutdown)
    }
}

impl fmt::Debug for PingMultiplexer {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "PingMultiplexer")
    }
}

/// Client-side state for working with a [`MultiplexTask`].
struct MultiplexerClientState {
    /// Tx side used to communicate with the recv task
    commands: tmpsc::Sender<MultiplexerCommand>,
    /// Used for sending only.
    ///
    /// The recv task does all the receiving, via a clone of the same `Arc`.
    sockets: sync::Arc<SocketPair>,
    ipv4_local_port: u16,
    ipv6_local_port: u16,
    /// Handle for the recv task.
    ///
    /// `Some` if not shut down yet.
    ///
    /// Behind a lock so that shutdown only needs `&self`.
    task_handle: sync::Mutex<Option<tokio::task::JoinHandle<()>>>,
    /// Contents maintained by the background task.
    ///
    /// We only take read locks, to get session data when sending.
    send_sessions: sync::Arc<sync::RwLock<hashbrown::HashMap<SessionHandle, SendSessionState>>>,
    /// Object pool for ICMP requests, to lower steady-state allocation
    req_pool: opool::Pool<ReqAllocator, IcmpEchoRequest>,
}

/// Allocator for the [`IcmpEchoRequest`] object pool.
struct ReqAllocator;

impl opool::PoolAllocator<IcmpEchoRequest> for ReqAllocator {
    fn allocate(&self) -> IcmpEchoRequest {
        IcmpEchoRequest::new()
    }
}