makepad_hub/
hubrouter.rs

1use std::net::{TcpStream, Shutdown};
2use std::sync::{mpsc, Arc, Mutex};
3use crate::hubmsg::*;
4
5#[derive(PartialEq)]
6pub enum HubRouteType{
7    Unknown,
8    Builder(String),
9    Clone(String),
10    UI
11}
12
13// wraps a connection to the router either networked or direct.
14#[derive(Clone)]
15pub enum HubRouteSend {
16    Networked{
17        uid_alloc: Arc<Mutex<u64>>,
18        tx_write_arc: Arc<Mutex<Option<mpsc::Sender<ToHubMsg>>>>,
19        own_addr_arc: Arc<Mutex<Option<HubAddr>>>,
20    },
21    Direct{
22        uid_alloc: Arc<Mutex<u64>>,
23        tx_pump: mpsc::Sender<(HubAddr, ToHubMsg)>,
24        own_addr: HubAddr
25    }
26}
27
28// the connection send interface to the router
29impl HubRouteSend{
30
31    pub fn is_own_addr(&self, addr:&HubAddr)->bool{
32        match self{
33            HubRouteSend::Networked{own_addr_arc,..}=>{
34                if let Ok(own_addr) = own_addr_arc.lock(){
35                    if let Some(own_addr) = *own_addr{
36                        return own_addr == *addr
37                    }
38                }
39                //self.hub_log.log("HubUI - Warning, is_own_addr whilst disconnected from hub");
40                return false
41            },
42            HubRouteSend::Direct{own_addr,..}=>{
43                return *own_addr == *addr
44            }
45        }
46    }
47
48    pub fn alloc_uid(&mut self)->HubUid{
49        match self{
50            HubRouteSend::Networked{own_addr_arc,uid_alloc,..}=>{
51                let id = if let Ok(mut uid_alloc) = uid_alloc.lock(){
52                    *uid_alloc += 1;
53                    *uid_alloc
54                }
55                else{0};
56                if let Ok(own_addr) = own_addr_arc.lock(){
57                    if let Some(own_addr) = *own_addr{
58                        return HubUid{
59                            addr:own_addr,
60                            id: id
61                        }
62                    }
63                }
64            },
65            HubRouteSend::Direct{own_addr,uid_alloc,..}=>{
66                let id = if let Ok(mut uid_alloc) = uid_alloc.lock(){
67                    *uid_alloc += 1;
68                    *uid_alloc
69                }
70                else{0};
71                return HubUid{
72                    addr:own_addr.clone(),
73                    id: id
74                }
75            }
76        }
77        println!("HubUI - Warning, trying to alloc_uid whilst disconnected from hub");
78        return HubUid{
79            addr:HubAddr::None,
80            id: 0
81        }
82    }
83    
84     pub fn update_networked_in_place(&self, set_addr:Option<HubAddr>, tx_write:Option<mpsc::Sender<ToHubMsg>>){
85        match self{
86            HubRouteSend::Networked{own_addr_arc,tx_write_arc,..}=>{
87                if let Ok(mut own_addr) = own_addr_arc.lock(){
88                    *own_addr = set_addr
89                }
90                if let Ok(mut tx_write_arc) = tx_write_arc.lock(){
91                    *tx_write_arc = tx_write
92                }
93            },
94            HubRouteSend::Direct{..}=>{
95                panic!("update_inner_networked on direct route");
96            }
97        }
98    }
99    
100    
101    pub fn send(&self, msg:ToHubMsg){
102        match self{
103            HubRouteSend::Networked{tx_write_arc,..}=>{
104                if let Ok(tx_write) = tx_write_arc.lock(){
105                    if let Some(tx_write) = &*tx_write{
106                        tx_write.send(msg).expect("Cannot tx_write.send - unexpected");
107                    }//else{ // lets queue up
108                    //    self.hub_log.log("HubUI - Warning, trying to send messages whilst disconnected from hub");
109                   // }
110                }
111            },
112            HubRouteSend::Direct{tx_pump,own_addr,..}=>{
113                tx_pump.send((*own_addr, msg)).expect("Cannot tx_write.send - unexpected");
114            }
115        }
116    }
117}
118
119pub struct HubRoute {
120    pub peer_addr: HubAddr,
121    pub tx_write: mpsc::Sender<FromHubMsg>,
122    pub tcp_stream: Option<TcpStream>,
123    pub route_type: HubRouteType
124}
125
126pub struct HubRouter{
127    pub local_uid: u64,
128    pub tx_pump: mpsc::Sender<(HubAddr, ToHubMsg)>,
129    pub routes: Arc<Mutex<Vec<HubRoute>>>,
130    pub router_thread: Option<std::thread::JoinHandle<()>>,
131}
132
133impl HubRouter{
134    pub fn alloc_local_addr(&mut self)->HubAddr{
135        self.local_uid += 1;
136        return HubAddr::Local{uid:self.local_uid};
137    }
138
139    pub fn connect_direct(&mut self, route_type: HubRouteType, tx_write: mpsc::Sender<FromHubMsg>)->HubRouteSend{
140        let tx_pump = self.tx_pump.clone();
141        let own_addr = self.alloc_local_addr();
142        
143        if let Ok(mut routes) = self.routes.lock() {
144            routes.push(HubRoute {
145                route_type: route_type,
146                peer_addr: own_addr.clone(),
147                tcp_stream: None,
148                tx_write: tx_write
149            })
150        };
151        
152        HubRouteSend::Direct{
153            uid_alloc: Arc::new(Mutex::new(0)),
154            tx_pump: tx_pump,
155            own_addr: own_addr
156        }
157    }
158        
159    pub fn start_hub_router(hub_log:HubLog)->HubRouter{
160         let (tx_pump, rx_pump) = mpsc::channel::<(HubAddr, ToHubMsg)>();
161         let routes = Arc::new(Mutex::new(Vec::<HubRoute>::new()));
162         let router_thread = {
163            let hub_log = hub_log.clone();
164            let routes = Arc::clone(&routes);
165            std::thread::spawn(move || {
166                // ok we get inbound messages from the threads
167                while let Ok((from, cth_msg)) = rx_pump.recv() {
168                    let to = cth_msg.to;
169                    let htc_msg = FromHubMsg {
170                        from: from,
171                        msg: cth_msg.msg
172                    };
173                    // we got a message.. now lets route it elsewhere
174                    if let Ok(mut routes) = routes.lock() {
175                        hub_log.msg("HubServer sending", &htc_msg);
176                        
177                        if let Some(cid) = routes.iter().position( | c | c.peer_addr == htc_msg.from) {
178                            if routes[cid].route_type == HubRouteType::Unknown {
179                                match &htc_msg.msg {
180                                    HubMsg::ConnectBuilder(ws_name) => { // send it to all clients
181                                        let mut connection_refused = false;
182                                        for route in routes.iter() {
183                                            if let HubRouteType::Builder(existing_ws_name) = &route.route_type{
184                                                if *existing_ws_name == *ws_name{
185                                                    connection_refused = true;
186                                                    break;
187                                                }
188                                            }
189                                        }
190                                        if connection_refused{
191                                            println!("Already have a workspace by that name {}, disconnecting", ws_name);
192                                            if let Some(tcp_stream) = &mut routes[cid].tcp_stream{
193                                                let _ = tcp_stream.shutdown(Shutdown::Both);
194                                            }
195                                            routes.remove(cid);
196                                            continue;
197                                        }
198                                        routes[cid].route_type = HubRouteType::Builder(ws_name.to_string());
199                                    },
200                                    HubMsg::ConnectClone(ws_name)=>{
201                                        routes[cid].route_type = HubRouteType::Clone(ws_name.to_string());
202                                    },
203                                    HubMsg::ConnectUI => { // send it to all clients
204                                        routes[cid].route_type = HubRouteType::UI;
205                                    },
206                                    _ => {
207                                        println!("Router got message from unknown client {:?}, disconnecting", htc_msg.from);
208                                        if let Some(tcp_stream) = &mut routes[cid].tcp_stream{
209                                            let _ = tcp_stream.shutdown(Shutdown::Both);
210                                        }
211                                        routes.remove(cid);
212                                        continue;
213                                    }
214                                }
215                            }
216                        }
217                        
218                        match to {
219                            HubMsgTo::All => { // send it to all
220                                for route in routes.iter() {
221                                    if route.route_type != HubRouteType::Unknown {
222                                        route.tx_write.send(htc_msg.clone()).expect("Could not tx_write.send");
223                                    }
224                                }
225                            },
226                            HubMsgTo::Client(addr) => { // find our specific addr and send
227                                if let Some(route) = routes.iter().find( | c | c.peer_addr == addr) {
228                                    if route.route_type != HubRouteType::Unknown {
229                                        route.tx_write.send(htc_msg).expect("Could not tx_write.send");
230                                    }
231                                }
232                            },
233                            HubMsgTo::Builder(to_ws_name)=>{
234                                for route in routes.iter() {
235                                    match &route.route_type{
236                                        HubRouteType::Builder(ws_name)=>if to_ws_name == *ws_name{
237                                            route.tx_write.send(htc_msg.clone()).expect("Could not tx_write.send");
238                                        },
239                                        HubRouteType::Clone(ws_name)=>if to_ws_name == *ws_name{
240                                            route.tx_write.send(htc_msg.clone()).expect("Could not tx_write.send");
241                                        },
242                                        _=>()
243                                    }
244                                }
245                            },
246                            HubMsgTo::UI=>{
247                                for route in routes.iter() {
248                                    if route.route_type == HubRouteType::UI{
249                                        route.tx_write.send(htc_msg.clone()).expect("Could not tx_write.send");
250                                    }
251                                }
252                            },
253                            HubMsgTo::Hub => { // process queries on the hub
254                                match &htc_msg.msg {
255                                    HubMsg::ConnectionError(e) => {
256                                        // connection error, lets remove connection
257                                        if let Some(pos) = routes.iter().position( | c | c.peer_addr == htc_msg.from) {
258                                            hub_log.log(&format!("Server closing connection {:?} from error {:?}", htc_msg.from, e));
259                                            // let everyone know we lost something
260                                            let msg = FromHubMsg{
261                                                from:htc_msg.from,
262                                                msg: match &routes[pos].route_type{
263                                                    HubRouteType::Builder(ws_name)=>HubMsg::DisconnectBuilder(ws_name.clone()),
264                                                    HubRouteType::Clone(ws_name)=>HubMsg::DisconnectClone(ws_name.clone()),
265                                                    HubRouteType::UI=>HubMsg::DisconnectUI,
266                                                    HubRouteType::Unknown=>{
267                                                        continue
268                                                    }
269                                                }
270                                            };
271                                            routes.remove(pos);
272                                            for route in routes.iter() {
273                                                route.tx_write.send(msg.clone()).expect("Could not tx_write.send");
274                                            }
275                                        }
276                                    },
277                                    HubMsg::ListBuildersRequest{uid}=>{
278                                        let mut builders = Vec::new();
279                                        for route in routes.iter() {
280                                            match &route.route_type{
281                                                HubRouteType::Builder(ws_name)=>builders.push(ws_name.to_string()),
282                                                _=>()
283                                            }
284                                        }
285                                        // send it back to the caller
286                                        if let Some(route) = routes.iter().find( | c | c.peer_addr == htc_msg.from) {
287                                            route.tx_write.send(FromHubMsg{
288                                                from:htc_msg.from,
289                                                msg:HubMsg::ListBuildersResponse{
290                                                    uid:*uid,
291                                                    builders:builders
292                                                }
293                                            }).expect("Could not tx_write.send");
294                                        }
295                                    },
296                                    _ => ()
297                                }
298                                // return current connections
299                            }
300                        }
301                    }
302                }
303            })
304        };
305        
306        return HubRouter {
307            tx_pump: tx_pump,
308            router_thread: Some(router_thread),
309            local_uid: 1,
310            routes: routes
311        };
312    }
313}