1use std::net::{TcpListener, TcpStream, UdpSocket, SocketAddr, Shutdown};
2use std::sync::{mpsc, Arc, Mutex};
3use std::{time, thread};
4use crate::hubmsg::*;
5use crate::hubclient::*;
6use crate::hubrouter::*;
7use serde::{Serialize, Deserialize};
8
9#[derive(Debug, Clone, Serialize,PartialEq, Deserialize)]
10pub enum HubServerConfig {
11 Offline, Announced, Network(u16), Localhost(u16), InterfaceV4((u16, [u8; 4]))
16}
17
18pub struct HubServerShared {
19 pub terminate:bool,
20 pub connections: Vec<(HubAddr, TcpStream)>
21}
22
23pub struct HubServer {
24 pub shared: Arc<Mutex<HubServerShared>>,
25 pub listen_address: Option<SocketAddr>,
26 pub listen_thread: Option<std::thread::JoinHandle<()>>,
27 pub announce_thread: Option<std::thread::JoinHandle<()>>,
28}
29
30impl HubServer {
31 pub fn start_hub_server(digest: Digest, config: &HubServerConfig, hub_router:&HubRouter) -> Option<HubServer> {
32
33 let (listen_address, announce) = match config {
34 HubServerConfig::Offline => return None,
35 HubServerConfig::Announced=>(SocketAddr::from(([0, 0, 0, 0], 0)),true),
36 HubServerConfig::Localhost(port) => (SocketAddr::from(([127, 0, 0, 1], *port)),false),
37 HubServerConfig::Network(port) => (SocketAddr::from(([0, 0, 0, 0], *port)),false),
38 HubServerConfig::InterfaceV4((port, ip)) => (SocketAddr::from((*ip, *port)),false),
39 };
40
41 let listener = if let Ok(listener) = TcpListener::bind(listen_address){listener}else{println!("start_hub_server bind server address");return None};
42 let listen_address = listener.local_addr().expect("Cannot get local address");
43
44 let tx_pump = hub_router.tx_pump.clone();
45 let routes = Arc::clone(&hub_router.routes);let shared = Arc::new(Mutex::new(HubServerShared{
47 connections:Vec::new(),
48 terminate:false
49 }));
50
51 let listen_thread = {
52 let routes = Arc::clone(&routes);
54 let shared = Arc::clone(&shared);
55 let digest = digest.clone();
56 std::thread::spawn(move || {
57 for tcp_stream in listener.incoming() {
58 let tcp_stream = tcp_stream.expect("Incoming stream failure");
59 let peer_addr = HubAddr::from_socket_addr(tcp_stream.peer_addr().expect("No peer address"));
60
61 if let Ok(mut shared) = shared.lock(){
62 if shared.terminate{
63 for (_, tcp_stream) in &mut shared.connections{
64 let _ = tcp_stream.shutdown(Shutdown::Both);
65 }
66 return
68 }
69 let tcp_stream = tcp_stream.try_clone().expect("Cannot clone tcp stream");
70 shared.connections.push((peer_addr, tcp_stream));
71 }
72
73 let (tx_write, rx_write) = mpsc::channel::<FromHubMsg>();
74 let tx_write_copy = tx_write.clone();
75 let _read_thread = {
77 let tx_pump = tx_pump.clone();
78 let digest = digest.clone();
79 let peer_addr = peer_addr.clone();
80 let mut tcp_stream = tcp_stream.try_clone().expect("Cannot clone tcp stream");
81 std::thread::spawn(move || {
83 loop {
84 match read_block_from_tcp_stream(&mut tcp_stream, digest.clone()) {
85 Ok(msg_buf) => {
86 let cth_msg: ToHubMsg = bincode::deserialize(&msg_buf).expect("read_thread hub message deserialize fail - should never happen");
87 tx_pump.send((peer_addr.clone(), cth_msg)).expect("tx_pump.send fails - should never happen");
88 }
89 Err(e) => {
90 let _ = tcp_stream.shutdown(Shutdown::Both);
91 let _ = tx_pump.send((peer_addr.clone(), ToHubMsg {
92 to: HubMsgTo::Hub,
93 msg: HubMsg::ConnectionError(e.clone())
94 })).expect("tx_pump.send fails - should never happen");
95 let _ = tx_write_copy.send(FromHubMsg{
97 from:peer_addr.clone(),
98 msg:HubMsg::ConnectionError(e)
99 });
100 return
101 }
102 }
103 }
104 })
105 };
106 let _write_thread = {
107 let digest = digest.clone();
108 let peer_addr = peer_addr.clone();
109 let tx_pump = tx_pump.clone();
110 let shared = Arc::clone(&shared);
111 let mut tcp_stream = tcp_stream.try_clone().expect("Cannot clone tcp stream");
112 std::thread::spawn(move || {
114 while let Ok(htc_msg) = rx_write.recv() {
115 match &htc_msg.msg{
116 HubMsg::ConnectionError(_)=>{ let _ = tcp_stream.shutdown(Shutdown::Both);
118 break
119 },
120 _=>()
121 }
122 let msg_buf = bincode::serialize(&htc_msg).expect("write_thread hub message serialize fail");
123 if let Err(e) = write_block_to_tcp_stream(&mut tcp_stream, &msg_buf, digest.clone()) {
124 let _ = tcp_stream.shutdown(Shutdown::Both);
126 tx_pump.send((peer_addr.clone(), ToHubMsg {
127 to: HubMsgTo::Hub,
128 msg: HubMsg::ConnectionError(e)
129 })).expect("tx_pump.send fails - should never happen");
130 }
131 }
132 if let Ok(mut shared) = shared.lock(){
134 while let Some(position) = shared.connections.iter().position(|(addr,_)| *addr == peer_addr){
135 shared.connections.remove(position);
136 }
137 }
138 })
139 };
140
141 if let Ok(mut routes) = routes.lock() {
142 routes.push(HubRoute {
143 route_type: HubRouteType::Unknown,
144 peer_addr: peer_addr.clone(),
145 tcp_stream: Some(tcp_stream),
146 tx_write: tx_write
147 })
148 };
149 }
150 })
151 };
152
153 let mut hub_server = HubServer {
154 shared: shared,
155 listen_address: Some(listen_address),
156 listen_thread: Some(listen_thread),
157 announce_thread: None
159 };
160
161 if announce{
162 hub_server.start_announce_server_default(digest.clone());
163 }
164
165 return Some(hub_server);
166 }
167
168 pub fn start_announce_server_default(&mut self, digest: Digest) {
169 self.start_announce_server(
170 digest,
171 SocketAddr::from(([0, 0, 0, 0], 0)),
172 SocketAddr::from(([255, 255, 255, 255], HUB_ANNOUNCE_PORT)),
173 SocketAddr::from(([127, 0, 0, 1], HUB_ANNOUNCE_PORT)),
174 )
175 }
176
177 pub fn start_announce_server(&mut self, digest: Digest, announce_bind: SocketAddr, announce_send: SocketAddr, announce_backup: SocketAddr) {
178 let listen_port = if let Some(listen_address) = self.listen_address{
179 listen_address.port()
180 }
181 else{
182 panic!("No port to announce")
183 };
184
185 let mut dwd = DigestWithData{
186 digest:digest,
187 data:listen_port as u64
188 };
189 dwd.digest.buf[0] ^= listen_port as u64;
190 dwd.digest.digest_cycle();
191
192 let digest_u8 = unsafe {std::mem::transmute::<DigestWithData, [u8; 26 * 8]>(dwd)};
193
194 let shared = Arc::clone(&self.shared);
195
196 let announce_thread = std::thread::spawn(move || {
197
198 let socket = UdpSocket::bind(announce_bind).expect("Server: Cannot bind announce port");
199 socket.set_broadcast(true).expect("Server: cannot set broadcast on announce ip");
200
201 let thread_sleep_time = time::Duration::from_millis(100);
202 loop {
203 if let Ok(shared) = shared.lock() {
204 if shared.terminate{
205 return
206 }
207 }
208 if let Err(_) = socket.send_to(&digest_u8, announce_send){
209 if let Err(_) = socket.send_to(&digest_u8, announce_backup){
210 println!("Cannot send to announce port");
211 return
212 }
213 }
214 thread::sleep(thread_sleep_time.clone());
215 }
216 });
217 self.announce_thread = Some(announce_thread);
218 }
219
220 pub fn terminate(&mut self){
221 if let Ok(mut shared) = self.shared.lock() {
222 shared.terminate = true;
223 }
224 if let Some(listen_address) = self.listen_address {
225 self.listen_address = None;
226 if let Ok(_) = TcpStream::connect(listen_address) {
228 self.listen_thread.take().expect("cant take listen thread").join().expect("cant join listen thread");
229 }
230 }
231 if self.announce_thread.is_some() {
233 self.announce_thread.take().expect("cant take announce thread").join().expect("cant join announce_thread thread");
234 }
235 }
236}