makepad_hub/
hubserver.rs

1use std::net::{TcpListener, TcpStream, UdpSocket, SocketAddr, Shutdown};
2use std::sync::{mpsc, Arc, Mutex};
3use std::{time, thread};
4use crate::hubmsg::*;
5use crate::hubclient::*;
6use crate::hubrouter::*;
7use serde::{Serialize, Deserialize};
8
9#[derive(Debug, Clone, Serialize,PartialEq, Deserialize)]
10pub enum HubServerConfig {
11    Offline, // no network connectivity
12    Announced, // 0.0.0.0:0
13    Network(u16), // 0.0.0.0:port
14    Localhost(u16), // 127.0.0.1:port
15    InterfaceV4((u16, [u8; 4]))
16}
17
18pub struct HubServerShared {
19    pub terminate:bool,
20    pub connections: Vec<(HubAddr, TcpStream)>
21}
22
23pub struct HubServer {
24    pub shared: Arc<Mutex<HubServerShared>>,
25    pub listen_address: Option<SocketAddr>,
26    pub listen_thread: Option<std::thread::JoinHandle<()>>,
27    pub announce_thread: Option<std::thread::JoinHandle<()>>,
28}
29
30impl HubServer {
31    pub fn start_hub_server(digest: Digest, config: &HubServerConfig, hub_router:&HubRouter) -> Option<HubServer> {
32        
33        let (listen_address, announce) = match config {
34            HubServerConfig::Offline => return None,
35            HubServerConfig::Announced=>(SocketAddr::from(([0, 0, 0, 0], 0)),true),
36            HubServerConfig::Localhost(port) => (SocketAddr::from(([127, 0, 0, 1], *port)),false),
37            HubServerConfig::Network(port) => (SocketAddr::from(([0, 0, 0, 0], *port)),false),
38            HubServerConfig::InterfaceV4((port, ip)) => (SocketAddr::from((*ip, *port)),false),
39        };
40
41        let listener = if let Ok(listener) = TcpListener::bind(listen_address){listener}else{println!("start_hub_server bind server address");return None};
42        let listen_address = listener.local_addr().expect("Cannot get local address");
43        
44        let tx_pump = hub_router.tx_pump.clone();
45        let routes = Arc::clone(&hub_router.routes);//Arc::new(Mutex::new(Vec::<HubServerConnection>::new()));
46        let shared = Arc::new(Mutex::new(HubServerShared{
47            connections:Vec::new(),
48            terminate:false
49        }));
50        
51        let listen_thread = {
52            //let hub_log = hub_log.clone();
53            let routes = Arc::clone(&routes);
54            let shared = Arc::clone(&shared);
55            let digest = digest.clone();
56            std::thread::spawn(move || {
57                for tcp_stream in listener.incoming() {
58                    let tcp_stream = tcp_stream.expect("Incoming stream failure");
59                    let peer_addr = HubAddr::from_socket_addr(tcp_stream.peer_addr().expect("No peer address"));
60
61                    if let Ok(mut shared) = shared.lock(){
62                        if shared.terminate{
63                            for (_, tcp_stream) in &mut shared.connections{
64                                let _ = tcp_stream.shutdown(Shutdown::Both);
65                            }
66                            // lets disconnect all our connections
67                            return
68                        }
69                        let tcp_stream = tcp_stream.try_clone().expect("Cannot clone tcp stream");
70                        shared.connections.push((peer_addr, tcp_stream));
71                    }
72                    
73                    let (tx_write, rx_write) = mpsc::channel::<FromHubMsg>();
74                    let tx_write_copy = tx_write.clone();
75                    // clone our transmit-to-pump
76                    let _read_thread = {
77                        let tx_pump = tx_pump.clone();
78                        let digest = digest.clone();
79                        let peer_addr = peer_addr.clone();
80                        let mut tcp_stream = tcp_stream.try_clone().expect("Cannot clone tcp stream");
81                        //let hub_log = hub_log.clone();
82                        std::thread::spawn(move || {
83                            loop {
84                                match read_block_from_tcp_stream(&mut tcp_stream, digest.clone()) {
85                                    Ok(msg_buf) => {
86                                        let cth_msg: ToHubMsg = bincode::deserialize(&msg_buf).expect("read_thread hub message deserialize fail - should never happen");
87                                        tx_pump.send((peer_addr.clone(), cth_msg)).expect("tx_pump.send fails - should never happen");
88                                    }
89                                    Err(e) => {
90                                        let _ = tcp_stream.shutdown(Shutdown::Both);
91                                        let _ = tx_pump.send((peer_addr.clone(), ToHubMsg {
92                                            to: HubMsgTo::Hub,
93                                            msg: HubMsg::ConnectionError(e.clone())
94                                        })).expect("tx_pump.send fails - should never happen");
95                                        // lets break rx write
96                                        let _ = tx_write_copy.send(FromHubMsg{
97                                            from:peer_addr.clone(),
98                                            msg:HubMsg::ConnectionError(e)
99                                        });
100                                        return
101                                    }
102                                }
103                            }
104                        })
105                    };
106                    let _write_thread = {
107                        let digest = digest.clone();
108                        let peer_addr = peer_addr.clone();
109                        let tx_pump = tx_pump.clone();
110                        let shared = Arc::clone(&shared);
111                        let mut tcp_stream = tcp_stream.try_clone().expect("Cannot clone tcp stream");
112                        //let hub_log = hub_log.clone();
113                        std::thread::spawn(move || {
114                            while let Ok(htc_msg) = rx_write.recv() {
115                                 match &htc_msg.msg{
116                                    HubMsg::ConnectionError(_)=>{ // we are closed by the read loop
117                                        let _ = tcp_stream.shutdown(Shutdown::Both);
118                                        break
119                                    },
120                                    _=>()
121                                }
122                                let msg_buf = bincode::serialize(&htc_msg).expect("write_thread hub message serialize fail");
123                                if let Err(e) = write_block_to_tcp_stream(&mut tcp_stream, &msg_buf, digest.clone()) {
124                                    // disconnect the socket and send shutdown
125                                    let _ = tcp_stream.shutdown(Shutdown::Both);
126                                    tx_pump.send((peer_addr.clone(), ToHubMsg {
127                                        to: HubMsgTo::Hub,
128                                        msg: HubMsg::ConnectionError(e)
129                                    })).expect("tx_pump.send fails - should never happen");
130                                }
131                            }
132                            // remove tx_write from our shared pool
133                            if let Ok(mut shared) = shared.lock(){
134                                while let Some(position) = shared.connections.iter().position(|(addr,_)| *addr == peer_addr){
135                                    shared.connections.remove(position);
136                                }
137                            }
138                        })
139                    };
140                    
141                    if let Ok(mut routes) = routes.lock() {
142                        routes.push(HubRoute {
143                            route_type: HubRouteType::Unknown,
144                            peer_addr: peer_addr.clone(),
145                            tcp_stream: Some(tcp_stream),
146                            tx_write: tx_write
147                        })
148                    };
149                }
150            })
151        };
152        
153        let mut hub_server = HubServer {
154            shared: shared,
155            listen_address: Some(listen_address),
156            listen_thread: Some(listen_thread),
157            //router_thread: Some(router_thread),
158            announce_thread: None
159        };
160        
161        if announce{
162            hub_server.start_announce_server_default(digest.clone());
163        }
164        
165        return Some(hub_server);
166    }
167
168    pub fn start_announce_server_default(&mut self, digest: Digest) {
169        self.start_announce_server(
170            digest,
171            SocketAddr::from(([0, 0, 0, 0], 0)),
172            SocketAddr::from(([255, 255, 255, 255], HUB_ANNOUNCE_PORT)),
173            SocketAddr::from(([127, 0, 0, 1], HUB_ANNOUNCE_PORT)),
174        )
175    }
176    
177    pub fn start_announce_server(&mut self, digest: Digest, announce_bind: SocketAddr, announce_send: SocketAddr, announce_backup: SocketAddr) {
178        let listen_port = if let Some(listen_address) = self.listen_address{
179            listen_address.port()
180        }
181        else{
182            panic!("No port to announce")
183        };
184        
185        let mut dwd = DigestWithData{
186            digest:digest,
187            data:listen_port as u64
188        };
189        dwd.digest.buf[0] ^= listen_port as u64;
190        dwd.digest.digest_cycle();
191        
192        let digest_u8 = unsafe {std::mem::transmute::<DigestWithData, [u8; 26 * 8]>(dwd)};
193
194        let shared = Arc::clone(&self.shared);
195
196        let announce_thread = std::thread::spawn(move || {
197            
198            let socket = UdpSocket::bind(announce_bind).expect("Server: Cannot bind announce port");
199            socket.set_broadcast(true).expect("Server: cannot set broadcast on announce ip");
200            
201            let thread_sleep_time = time::Duration::from_millis(100);
202            loop {
203                if let Ok(shared) = shared.lock() {
204                    if shared.terminate{
205                        return
206                    }
207                }
208                if let Err(_) = socket.send_to(&digest_u8, announce_send){
209                    if let Err(_) = socket.send_to(&digest_u8, announce_backup){
210                        println!("Cannot send to announce port");
211                        return
212                    }
213                }
214                thread::sleep(thread_sleep_time.clone());
215            }
216        });
217        self.announce_thread = Some(announce_thread);
218    }
219    
220    pub fn terminate(&mut self){
221        if let Ok(mut shared) = self.shared.lock() {
222            shared.terminate = true;
223        }
224        if let Some(listen_address) = self.listen_address {
225            self.listen_address = None;
226            // just do a single connection to the listen address to break the wait.
227            if let Ok(_) = TcpStream::connect(listen_address) {
228                self.listen_thread.take().expect("cant take listen thread").join().expect("cant join listen thread");
229            }
230        }
231        //self.router_thread.take().expect("cant take router thread").join().expect("cant join router thread");
232        if self.announce_thread.is_some() {
233            self.announce_thread.take().expect("cant take announce thread").join().expect("cant join announce_thread thread");
234        }
235    }
236}