1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
// NOTE(review): this blanket `allow` silences every compiler warning in the
// crate (unused code, deprecations, ...). The reason is not recorded here —
// consider narrowing it to specific lints or documenting why it is needed.
#![allow(warnings)]
// `#[macro_use]` makes the `tracing` macros available crate-wide.
#[macro_use]
extern crate tracing;
// Internal submodules; a few of their items are re-exported for use across
// the crate (`pub(crate)`), but none of them are part of the public API here.
mod addrs;
pub(crate) use addrs::{AddrTable, Peer};
mod socket;
pub(crate) use socket::Socket;
mod framing;
pub(crate) use framing::{Envelope, FrameExt};
use async_std::{sync::Arc, task};
use async_trait::async_trait;
// The `netmod` trait is imported under the alias `EndpointExt` so that it
// does not clash with the `Endpoint` struct defined in this file.
use netmod::{Endpoint as EndpointExt, Frame, Recipient, Result, Target};
use std::net::ToSocketAddrs;
/// A network endpoint made of a socket and a table of peer addresses.
///
/// Both fields are reference-counted, so cloning an `Endpoint` yields
/// another handle to the same socket and address table.
#[derive(Clone)]
pub struct Endpoint {
    /// Shared handle to the socket used for sending and receiving frames.
    socket: Arc<Socket>,
    /// Shared address table consulted when sending and updated by
    /// [`Endpoint::introduce`].
    addrs: Arc<AddrTable>,
}
impl Endpoint {
    /// Build a new endpoint whose socket is created with
    /// `Socket::with_port(port, ..)`.
    ///
    /// This call blocks the current thread (via `task::block_on`) until the
    /// socket has been set up. The socket receives its own handle to the
    /// same address table the endpoint keeps.
    pub fn spawn(port: u16) -> Arc<Self> {
        let setup = async move {
            let table = Arc::new(AddrTable::new());
            let socket = Socket::with_port(port, Arc::clone(&table)).await;
            Arc::new(Self {
                socket,
                addrs: table,
            })
        };
        task::block_on(setup)
    }

    /// Register every socket address that `addr` resolves to in the
    /// address table.
    ///
    /// # Errors
    ///
    /// Returns the I/O error produced by `to_socket_addrs` if resolution
    /// fails; in that case nothing is registered.
    pub async fn introduce<A: ToSocketAddrs>(&self, addr: A) -> std::io::Result<()> {
        let resolved = addr.to_socket_addrs()?;
        for peer_addr in resolved {
            self.addrs.set(peer_addr).await;
        }
        Ok(())
    }

    /// Number of entries currently returned by the address table
    /// (test builds only).
    #[cfg(test)]
    pub async fn peers(&self) -> usize {
        let known = self.addrs.all().await;
        known.len()
    }
}
#[async_trait]
impl EndpointExt for Endpoint {
    /// Always reports `0`.
    fn size_hint(&self) -> usize {
        0
    }

    /// Send `frame` to a single peer or to every known peer.
    ///
    /// * `Target::Single(id)` — looks up the address for `id` in the
    ///   address table and sends the frame there.
    /// * `Target::Flood` — sends the frame to every address currently in
    ///   the table.
    ///
    /// As written, this method always returns `Ok(())`.
    ///
    /// # Panics
    ///
    /// Panics if the address lookup for a `Target::Single` id does not
    /// yield a value (the lookup result is `unwrap`ped).
    async fn send(&self, frame: Frame, target: Target) -> Result<()> {
        match target {
            Target::Single(id) => {
                // Resolve the destination first, then hand it to the socket.
                let destination = self.addrs.ip(id).await.unwrap();
                self.socket.send(&frame, destination).await;
            }
            Target::Flood => {
                let everyone = self.addrs.all().await;
                self.socket.send_many(&frame, everyone).await;
            }
        }
        Ok(())
    }

    /// Wait for the next item from the socket and return its two fields as
    /// a `(Frame, Target)` pair.
    async fn next(&self) -> Result<(Frame, Target)> {
        let incoming = self.socket.next().await;
        let frame = incoming.0;
        let origin = incoming.1;
        Ok((frame, origin))
    }
}
/// Spawns two endpoints on local ports 11000 and 11001, waits five seconds,
/// and then expects the first endpoint to report exactly one known peer.
///
/// Marked `#[ignore]`, so it only runs when ignored tests are requested.
/// NOTE(review): how the endpoints find each other is not visible in this
/// file — presumably the socket layer announces itself; confirm in `socket`.
#[test]
#[ignore]
fn discover() {
    task::block_on(async {
        use async_std::net::{IpAddr, Ipv4Addr};

        let p1 = Peer {
            ip: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
            port: 11000,
        };
        let p2 = Peer {
            ip: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
            port: 11001,
        };

        let e1 = Endpoint::spawn(p1.port);
        // Kept alive until the end of the test so the second endpoint stays
        // up while the first one is being checked.
        let _e2 = Endpoint::spawn(p2.port);

        // Give the endpoints time to see each other. An async sleep is used
        // instead of the deprecated, thread-blocking `std::thread::sleep_ms`.
        task::sleep(std::time::Duration::from_secs(5)).await;

        // Already inside an async context: await directly rather than
        // nesting another `block_on`.
        assert_eq!(e1.peers().await, 1);
    })
}