1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
use crate::MAX_FRAME_SIZE;
use crate::frame;
use super::endpoint;
use super::peer;
use super::udp_frame_sink::UdpFrameSink;
use std::cell::RefCell;
use std::collections::HashMap;
use std::net;
use std::rc::Rc;
pub struct Server {
socket: net::UdpSocket,
max_connections: usize,
incoming_config: endpoint::Config,
endpoints: HashMap<net::SocketAddr, Rc<RefCell<endpoint::Endpoint>>>,
incoming_peers: Vec<peer::Peer>,
}
impl Server {
pub fn bind<A: net::ToSocketAddrs>(addr: A,
max_connections: usize,
incoming_config: endpoint::Config) -> Result<Self, std::io::Error> {
assert!(incoming_config.is_valid(), "invalid endpoint config");
let socket = net::UdpSocket::bind(addr)?;
socket.set_nonblocking(true)?;
Ok(Self {
socket,
endpoints: HashMap::new(),
incoming_peers: Vec::new(),
max_connections,
incoming_config,
})
}
pub fn bind_any_ipv4(max_connections: usize, incoming_config: endpoint::Config) -> Result<Self, std::io::Error> {
Self::bind((net::Ipv4Addr::UNSPECIFIED, 0), max_connections, incoming_config)
}
pub fn bind_any_ipv6(max_connections: usize, incoming_config: endpoint::Config) -> Result<Self, std::io::Error> {
Self::bind((net::Ipv6Addr::UNSPECIFIED, 0), max_connections, incoming_config)
}
pub fn incoming(&mut self) -> impl Iterator<Item = peer::Peer> {
std::mem::take(&mut self.incoming_peers).into_iter()
}
pub fn step(&mut self) {
let mut frame_data_buf = [0; MAX_FRAME_SIZE];
while let Ok((frame_size, address)) = self.socket.recv_from(&mut frame_data_buf) {
use frame::serial::Serialize;
if let Some(frame) = frame::Frame::read(&frame_data_buf[ .. frame_size]) {
self.handle_frame(address, frame);
}
}
self.flush();
self.endpoints.retain(|_, endpoint| !endpoint.borrow().is_zombie());
self.incoming_peers.retain(|client| !client.is_zombie());
}
pub fn flush(&mut self) {
for (&address, endpoint) in self.endpoints.iter_mut() {
let ref mut data_sink = UdpFrameSink::new(&self.socket, address);
endpoint.borrow_mut().flush(data_sink);
}
}
pub fn address(&self) -> net::SocketAddr {
self.socket.local_addr().unwrap()
}
fn handle_frame(&mut self, address: net::SocketAddr, frame: frame::Frame) {
if let Some(endpoint) = self.endpoints.get_mut(&address) {
let ref mut data_sink = UdpFrameSink::new(&self.socket, address);
endpoint.borrow_mut().handle_frame(frame, data_sink);
} else {
if self.endpoints.len() < self.max_connections as usize {
let ref mut data_sink = UdpFrameSink::new(&self.socket, address);
let mut endpoint = endpoint::Endpoint::new(self.incoming_config.clone());
endpoint.handle_frame(frame, data_sink);
let endpoint_ref = Rc::new(RefCell::new(endpoint));
self.endpoints.insert(address, Rc::clone(&endpoint_ref));
self.incoming_peers.push(peer::Peer::new(address, endpoint_ref));
}
}
}
}