netmod_udp/
lib.rs

1//! netmod-udp is a UDP overlay for Ratman
2#![allow(warnings)]
3
4#[macro_use]
5extern crate tracing;
6
7mod addrs;
8pub(crate) use addrs::{AddrTable, Peer};
9
10mod socket;
11pub(crate) use socket::Socket;
12
13mod framing;
14pub(crate) use framing::{Envelope, FrameExt};
15
16use async_std::{sync::Arc, task};
17use async_trait::async_trait;
18use netmod::{Endpoint as EndpointExt, Frame, Recipient, Result, Target};
19use std::net::ToSocketAddrs;
20
21#[derive(Clone)]
22pub struct Endpoint {
23    socket: Arc<Socket>,
24    addrs: Arc<AddrTable>,
25}
26
27impl Endpoint {
28    /// Create a new endpoint and spawn a dispatch task
29    pub fn spawn(port: u16) -> Arc<Self> {
30        task::block_on(async move {
31            let addrs = Arc::new(AddrTable::new());
32            Arc::new(Self {
33                socket: Socket::with_port(port, Arc::clone(&addrs)).await,
34                addrs,
35            })
36        })
37    }
38
39    /// Manually introduce this endpoint to other endpoints
40    pub async fn introduce<A: ToSocketAddrs>(&self, addr: A) -> std::io::Result<()> {
41        for addr in addr.to_socket_addrs()? {
42            self.addrs.set(addr).await;
43        }
44        Ok(())
45    }
46
47    #[cfg(test)]
48    pub async fn peers(&self) -> usize {
49        self.addrs.all().await.len()
50    }
51}
52
53#[async_trait]
54impl EndpointExt for Endpoint {
55    fn size_hint(&self) -> usize {
56        0
57    }
58
59    async fn send(&self, frame: Frame, target: Target) -> Result<()> {
60        match target {
61            /// Sending to a user,
62            Target::Single(ref id) => {
63                self.socket
64                    .send(&frame, self.addrs.ip(*id).await.unwrap())
65                    .await
66            }
67            Target::Flood => {
68                let addrs = self.addrs.all().await;
69                self.socket.send_many(&frame, addrs).await;
70            }
71        }
72
73        Ok(())
74    }
75
76    async fn next(&self) -> Result<(Frame, Target)> {
77        let fe = self.socket.next().await;
78        Ok((fe.0, fe.1))
79    }
80}
81
/// A test that makes two instances on the same device see each other
///
/// In theory this test is a good idea, but in practice it doesn't
/// work.  The multicast protocol doesn't filter by port, but the
/// implementation on Linux does.  This means that unless all
/// participants on the same device are on the same multicast port, we
/// can't get the multicast messages from each other because they're
/// being filtered by the kernel.
///
/// We still want to keep this test around just in case we can run on a
/// platform that doesn't do this, or when we can set the
/// non-exclusive port option.
#[test]
#[ignore]
fn discover() {
    task::block_on(async {
        use async_std::net::{IpAddr, Ipv4Addr};

        let p1 = Peer {
            ip: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
            port: 11000,
        };
        let p2 = Peer {
            ip: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
            port: 11001,
        };

        let e1 = Endpoint::spawn(p1.port);
        // Never queried, but it has to stay alive until the assertion so
        // that there is a second endpoint for `e1` to find.
        let _e2 = Endpoint::spawn(p2.port);

        // Wait with the async timer instead of the deprecated, blocking
        // `std::thread::sleep_ms`, so this task yields while the endpoints
        // do their work.
        task::sleep(std::time::Duration::from_secs(5)).await;

        // We are already inside `block_on`, so await directly rather than
        // nesting another `block_on`.
        assert_eq!(e1.peers().await, 1);
    })
}