1#![allow(warnings)]
3
4#[macro_use]
5extern crate tracing;
6
7mod addrs;
8pub(crate) use addrs::{AddrTable, Peer};
9
10mod socket;
11pub(crate) use socket::Socket;
12
13mod framing;
14pub(crate) use framing::{Envelope, FrameExt};
15
16use async_std::{sync::Arc, task};
17use async_trait::async_trait;
18use netmod::{Endpoint as EndpointExt, Frame, Recipient, Result, Target};
19use std::net::ToSocketAddrs;
20
21#[derive(Clone)]
22pub struct Endpoint {
23 socket: Arc<Socket>,
24 addrs: Arc<AddrTable>,
25}
26
27impl Endpoint {
28 pub fn spawn(port: u16) -> Arc<Self> {
30 task::block_on(async move {
31 let addrs = Arc::new(AddrTable::new());
32 Arc::new(Self {
33 socket: Socket::with_port(port, Arc::clone(&addrs)).await,
34 addrs,
35 })
36 })
37 }
38
39 pub async fn introduce<A: ToSocketAddrs>(&self, addr: A) -> std::io::Result<()> {
41 for addr in addr.to_socket_addrs()? {
42 self.addrs.set(addr).await;
43 }
44 Ok(())
45 }
46
47 #[cfg(test)]
48 pub async fn peers(&self) -> usize {
49 self.addrs.all().await.len()
50 }
51}
52
53#[async_trait]
54impl EndpointExt for Endpoint {
55 fn size_hint(&self) -> usize {
56 0
57 }
58
59 async fn send(&self, frame: Frame, target: Target) -> Result<()> {
60 match target {
61 Target::Single(ref id) => {
63 self.socket
64 .send(&frame, self.addrs.ip(*id).await.unwrap())
65 .await
66 }
67 Target::Flood => {
68 let addrs = self.addrs.all().await;
69 self.socket.send_many(&frame, addrs).await;
70 }
71 }
72
73 Ok(())
74 }
75
76 async fn next(&self) -> Result<(Frame, Target)> {
77 let fe = self.socket.next().await;
78 Ok((fe.0, fe.1))
79 }
80}
81
/// Starts two endpoints on localhost and checks that, after a grace period,
/// the first one knows about exactly one peer.
///
/// Ignored by default.
// NOTE(review): presumably ignored because it binds fixed local ports and
// waits five seconds; the discovery mechanism itself is not visible in this
// file — confirm.
#[test]
#[ignore]
fn discover() {
    use async_std::net::{IpAddr, Ipv4Addr};
    use std::time::Duration;

    let p1 = Peer {
        ip: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
        port: 11000,
    };
    let p2 = Peer {
        ip: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
        port: 11001,
    };

    // `Endpoint::spawn` blocks internally, so it is called from plain
    // synchronous code rather than from inside another `block_on`.
    let e1 = Endpoint::spawn(p1.port);
    // Never queried, but it has to stay alive for the whole wait so that
    // `e1` has something to find.
    let _e2 = Endpoint::spawn(p2.port);

    let known = task::block_on(async {
        // Yield to the executor while waiting instead of parking one of its
        // threads with the deprecated `std::thread::sleep_ms`.
        task::sleep(Duration::from_secs(5)).await;
        e1.peers().await
    });

    assert_eq!(known, 1);
}