// tokio_multicast/diagnostics.rs

1use std::io;
2use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket};
3use std::time::Duration;
4
5use socket2::{Domain, Protocol, Socket, Type};
6
7use crate::{raw, sys, Interface, Membership};
8
/// Settings that control the multicast loopback probes.
///
/// Each probe joins a group, sends a small datagram to it and waits for the
/// datagram to come back on a receiver socket.
#[derive(Clone, Debug)]
pub struct MulticastDiagnosticConfig {
    /// Group address joined and targeted by the IPv4 probe.
    pub ipv4_group: Ipv4Addr,
    /// Group address joined and targeted by the IPv6 probe.
    pub ipv6_group: Ipv6Addr,
    /// Interface index used by the IPv6 probe for joining and sending.
    /// When `None`, the IPv6 probe is reported as unavailable without
    /// touching any socket.
    pub ipv6_interface: Option<u32>,
    /// Read timeout applied to the receiver socket while waiting for the
    /// looped-back datagram.
    pub timeout: Duration,
}
16
17impl Default for MulticastDiagnosticConfig {
18    fn default() -> Self {
19        Self {
20            ipv4_group: Ipv4Addr::new(239, 1, 1, 250),
21            ipv6_group: Ipv6Addr::new(0xff02, 0, 0, 0, 0, 0, 0, 0x114),
22            ipv6_interface: sys::loopback_interface_v6(),
23            timeout: Duration::from_millis(500),
24        }
25    }
26}
27
/// Records how far a probe progressed; every flag starts out `false`.
#[derive(Clone, Debug, Default)]
pub struct ProbeStages {
    /// The receiver socket was created.
    pub socket_created: bool,
    /// The receiver socket was bound to a local address.
    pub bound: bool,
    /// The receiver joined the multicast group.
    pub joined: bool,
    /// The test datagram was sent to the group.
    pub loopback_sent: bool,
    /// The test datagram was received back with the expected payload.
    pub loopback_received: bool,
}
36
/// Coarse classification of the step at which a probe failed.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ProbeErrorKind {
    /// The probe could not be attempted (e.g. no IPv6 interface index).
    Unavailable,
    /// Creating a receiver or sender socket failed.
    SocketCreate,
    /// Binding a receiver or sender socket failed.
    Bind,
    /// Joining the multicast group failed.
    Join,
    /// Sending the test datagram failed.
    Send,
    /// Receiving the test datagram failed (including read timeouts).
    Receive,
    /// A datagram arrived but its payload was not the expected one.
    InvalidData,
    /// Any other step failed (socket options, querying the local address).
    Other,
}
48
49#[derive(Debug, Clone)]
50pub struct ProbeResult {
51    pub label: &'static str,
52    pub supported: bool,
53    pub stages: ProbeStages,
54    pub error_kind: Option<ProbeErrorKind>,
55    pub details: Option<String>,
56    pub error: Option<String>,
57}
58
59impl ProbeResult {
60    fn success(label: &'static str, stages: ProbeStages, details: impl Into<String>) -> Self {
61        Self {
62            label,
63            supported: true,
64            stages,
65            error_kind: None,
66            details: Some(details.into()),
67            error: None,
68        }
69    }
70
71    fn failure(
72        label: &'static str,
73        stages: ProbeStages,
74        error_kind: ProbeErrorKind,
75        error: impl Into<String>,
76        details: Option<String>,
77    ) -> Self {
78        Self {
79            label,
80            supported: false,
81            stages,
82            error_kind: Some(error_kind),
83            details,
84            error: Some(error.into()),
85        }
86    }
87}
88
89#[derive(Debug, Clone)]
90pub struct MulticastDiagnostics {
91    pub ipv4: ProbeResult,
92    pub ipv6: ProbeResult,
93}
94
95impl MulticastDiagnostics {
96    pub fn supported(&self) -> bool {
97        self.ipv4.supported || self.ipv6.supported
98    }
99}
100
101pub fn diagnose_multicast() -> MulticastDiagnostics {
102    diagnose_multicast_with_config(&MulticastDiagnosticConfig::default())
103}
104
105pub fn diagnose_multicast_with_config(config: &MulticastDiagnosticConfig) -> MulticastDiagnostics {
106    MulticastDiagnostics {
107        ipv4: diagnose_ipv4(config),
108        ipv6: diagnose_ipv6(config),
109    }
110}
111
112fn diagnose_ipv4(config: &MulticastDiagnosticConfig) -> ProbeResult {
113    let label = "ipv4";
114    let group = config.ipv4_group;
115
116    match try_ipv4_probe(group, config.timeout) {
117        Ok((receiver_addr, sender_addr)) => ProbeResult::success(
118            label,
119            ProbeStages {
120                socket_created: true,
121                bound: true,
122                joined: true,
123                loopback_sent: true,
124                loopback_received: true,
125            },
126            format!(
127                "bound receiver {receiver_addr}, sent from {sender_addr}, joined {group}, timeout {:?}",
128                config.timeout
129            ),
130        ),
131        Err((stages, kind, err, details)) => {
132            ProbeResult::failure(label, stages, kind, err.to_string(), details)
133        }
134    }
135}
136
137fn diagnose_ipv6(config: &MulticastDiagnosticConfig) -> ProbeResult {
138    let label = "ipv6";
139    let group = config.ipv6_group;
140    let Some(ifindex) = config.ipv6_interface else {
141        return ProbeResult::failure(
142            label,
143            ProbeStages::default(),
144            ProbeErrorKind::Unavailable,
145            "no loopback IPv6 interface index available",
146            None,
147        );
148    };
149
150    match try_ipv6_probe(group, ifindex, config.timeout) {
151        Ok((receiver_addr, sender_addr)) => ProbeResult::success(
152            label,
153            ProbeStages {
154                socket_created: true,
155                bound: true,
156                joined: true,
157                loopback_sent: true,
158                loopback_received: true,
159            },
160            format!(
161                "bound receiver {receiver_addr}, sent from {sender_addr}, joined {group}%{ifindex}, timeout {:?}",
162                config.timeout
163            ),
164        ),
165        Err((stages, kind, err, details)) => {
166            ProbeResult::failure(label, stages, kind, err.to_string(), details)
167        }
168    }
169}
170
171fn try_ipv4_probe(
172    group: Ipv4Addr,
173    timeout: Duration,
174) -> std::result::Result<
175    (SocketAddr, SocketAddr),
176    (ProbeStages, ProbeErrorKind, io::Error, Option<String>),
177> {
178    let mut stages = ProbeStages::default();
179
180    let receiver = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))
181        .map_err(|err| (stages.clone(), ProbeErrorKind::SocketCreate, err, None))?;
182    stages.socket_created = true;
183    receiver
184        .set_reuse_address(true)
185        .map_err(|err| (stages.clone(), ProbeErrorKind::Other, err, None))?;
186    receiver
187        .bind(&SocketAddr::from((Ipv4Addr::UNSPECIFIED, 0)).into())
188        .map_err(|err| (stages.clone(), ProbeErrorKind::Bind, err, None))?;
189    stages.bound = true;
190    receiver
191        .join_multicast_v4(&group, &Ipv4Addr::UNSPECIFIED)
192        .map_err(|err| (stages.clone(), ProbeErrorKind::Join, err, None))?;
193    stages.joined = true;
194
195    let receiver: UdpSocket = receiver.into();
196    receiver
197        .set_read_timeout(Some(timeout))
198        .map_err(|err| (stages.clone(), ProbeErrorKind::Other, err, None))?;
199    let receiver_addr = receiver
200        .local_addr()
201        .map_err(|err| (stages.clone(), ProbeErrorKind::Other, err, None))?;
202
203    let sender = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))
204        .map_err(|err| {
205            (
206                stages.clone(),
207                ProbeErrorKind::SocketCreate,
208                err,
209                Some(format!("receiver bound at {receiver_addr}")),
210            )
211        })?;
212    sender
213        .bind(&SocketAddr::from((Ipv4Addr::UNSPECIFIED, 0)).into())
214        .map_err(|err| {
215            (
216                stages.clone(),
217                ProbeErrorKind::Bind,
218                err,
219                Some(format!("receiver bound at {receiver_addr}")),
220            )
221        })?;
222    sender
223        .set_multicast_loop_v4(true)
224        .map_err(|err| {
225            (
226                stages.clone(),
227                ProbeErrorKind::Other,
228                err,
229                Some(format!("receiver bound at {receiver_addr}")),
230            )
231        })?;
232
233    let sender: UdpSocket = sender.into();
234    let sender_addr = sender
235        .local_addr()
236        .map_err(|err| {
237            (
238                stages.clone(),
239                ProbeErrorKind::Other,
240                err,
241                Some(format!("receiver bound at {receiver_addr}")),
242            )
243        })?;
244    sender
245        .send_to(b"diag", SocketAddr::from((group, receiver_addr.port())))
246        .map_err(|err| {
247            (
248                stages.clone(),
249                ProbeErrorKind::Send,
250                err,
251                Some(format!("receiver bound at {receiver_addr}")),
252            )
253        })?;
254    stages.loopback_sent = true;
255
256    let mut buf = [0_u8; 16];
257    let (n, _) = receiver.recv_from(&mut buf).map_err(|err| {
258        (
259            stages.clone(),
260            ProbeErrorKind::Receive,
261            err,
262            Some(format!(
263                "receiver bound at {receiver_addr}, sender bound at {sender_addr}"
264            )),
265        )
266    })?;
267    if &buf[..n] != b"diag" {
268        return Err((
269            stages,
270            ProbeErrorKind::InvalidData,
271            io::Error::new(io::ErrorKind::InvalidData, "unexpected loopback payload"),
272            Some(format!(
273                "receiver bound at {receiver_addr}, sender bound at {sender_addr}"
274            )),
275        ));
276    }
277
278    Ok((
279        receiver_addr,
280        sender_addr,
281    ))
282}
283
284fn try_ipv6_probe(
285    group: Ipv6Addr,
286    ifindex: u32,
287    timeout: Duration,
288) -> std::result::Result<
289    (SocketAddr, SocketAddr),
290    (ProbeStages, ProbeErrorKind, io::Error, Option<String>),
291> {
292    let mut stages = ProbeStages::default();
293
294    let receiver = Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP))
295        .map_err(|err| (stages.clone(), ProbeErrorKind::SocketCreate, err, None))?;
296    stages.socket_created = true;
297    receiver
298        .set_reuse_address(true)
299        .map_err(|err| (stages.clone(), ProbeErrorKind::Other, err, None))?;
300    receiver
301        .bind(&raw::group_as_v6_socket(Ipv6Addr::UNSPECIFIED, 0, 0).into())
302        .map_err(|err| (stages.clone(), ProbeErrorKind::Bind, err, None))?;
303    stages.bound = true;
304    sys::join_membership(
305        &receiver,
306        &Membership::any_source(group.into()),
307        Some(&Interface::V6(ifindex)),
308    )
309    .map_err(|err| (stages.clone(), ProbeErrorKind::Join, map_multicast_error(err), None))?;
310    stages.joined = true;
311
312    let receiver: UdpSocket = receiver.into();
313    receiver
314        .set_read_timeout(Some(timeout))
315        .map_err(|err| (stages.clone(), ProbeErrorKind::Other, err, None))?;
316    let receiver_addr = receiver
317        .local_addr()
318        .map_err(|err| (stages.clone(), ProbeErrorKind::Other, err, None))?;
319
320    let sender = Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP))
321        .map_err(|err| {
322            (
323                stages.clone(),
324                ProbeErrorKind::SocketCreate,
325                err,
326                Some(format!("receiver bound at {receiver_addr}")),
327            )
328        })?;
329    sender
330        .bind(&raw::group_as_v6_socket(Ipv6Addr::UNSPECIFIED, 0, 0).into())
331        .map_err(|err| {
332            (
333                stages.clone(),
334                ProbeErrorKind::Bind,
335                err,
336                Some(format!("receiver bound at {receiver_addr}")),
337            )
338        })?;
339    sender
340        .set_multicast_if_v6(ifindex)
341        .map_err(|err| {
342            (
343                stages.clone(),
344                ProbeErrorKind::Other,
345                err,
346                Some(format!("receiver bound at {receiver_addr}")),
347            )
348        })?;
349    sender
350        .set_multicast_loop_v6(true)
351        .map_err(|err| {
352            (
353                stages.clone(),
354                ProbeErrorKind::Other,
355                err,
356                Some(format!("receiver bound at {receiver_addr}")),
357            )
358        })?;
359
360    let sender: UdpSocket = sender.into();
361    let sender_addr = sender
362        .local_addr()
363        .map_err(|err| {
364            (
365                stages.clone(),
366                ProbeErrorKind::Other,
367                err,
368                Some(format!("receiver bound at {receiver_addr}")),
369            )
370        })?;
371    sender
372        .send_to(b"diag", raw::group_as_v6_socket(group, receiver_addr.port(), ifindex))
373        .map_err(|err| {
374            (
375                stages.clone(),
376                ProbeErrorKind::Send,
377                err,
378                Some(format!(
379                    "receiver bound at {receiver_addr}, sender bound at {sender_addr}"
380                )),
381            )
382        })?;
383    stages.loopback_sent = true;
384
385    let mut buf = [0_u8; 16];
386    let (n, _) = receiver.recv_from(&mut buf).map_err(|err| {
387        (
388            stages.clone(),
389            ProbeErrorKind::Receive,
390            err,
391            Some(format!(
392                "receiver bound at {receiver_addr}, sender bound at {sender_addr}, ifindex {ifindex}"
393            )),
394        )
395    })?;
396    if &buf[..n] != b"diag" {
397        return Err((
398            stages,
399            ProbeErrorKind::InvalidData,
400            io::Error::new(io::ErrorKind::InvalidData, "unexpected loopback payload"),
401            Some(format!(
402                "receiver bound at {receiver_addr}, sender bound at {sender_addr}, ifindex {ifindex}"
403            )),
404        ));
405    }
406
407    Ok((
408        receiver_addr,
409        sender_addr,
410    ))
411}
412
413fn map_multicast_error(err: crate::MulticastError) -> io::Error {
414    match err {
415        crate::MulticastError::Io(source) => source,
416        crate::MulticastError::BindFailed { source, .. } => source,
417        other => io::Error::other(other.to_string()),
418    }
419}