1use std::net::{TcpStream, Shutdown};
2use std::sync::{mpsc, Arc, Mutex};
3use crate::hubmsg::*;
4
5#[derive(PartialEq)]
6pub enum HubRouteType{
7 Unknown,
8 Builder(String),
9 Clone(String),
10 UI
11}
12
13#[derive(Clone)]
15pub enum HubRouteSend {
16 Networked{
17 uid_alloc: Arc<Mutex<u64>>,
18 tx_write_arc: Arc<Mutex<Option<mpsc::Sender<ToHubMsg>>>>,
19 own_addr_arc: Arc<Mutex<Option<HubAddr>>>,
20 },
21 Direct{
22 uid_alloc: Arc<Mutex<u64>>,
23 tx_pump: mpsc::Sender<(HubAddr, ToHubMsg)>,
24 own_addr: HubAddr
25 }
26}
27
28impl HubRouteSend{
30
31 pub fn is_own_addr(&self, addr:&HubAddr)->bool{
32 match self{
33 HubRouteSend::Networked{own_addr_arc,..}=>{
34 if let Ok(own_addr) = own_addr_arc.lock(){
35 if let Some(own_addr) = *own_addr{
36 return own_addr == *addr
37 }
38 }
39 return false
41 },
42 HubRouteSend::Direct{own_addr,..}=>{
43 return *own_addr == *addr
44 }
45 }
46 }
47
48 pub fn alloc_uid(&mut self)->HubUid{
49 match self{
50 HubRouteSend::Networked{own_addr_arc,uid_alloc,..}=>{
51 let id = if let Ok(mut uid_alloc) = uid_alloc.lock(){
52 *uid_alloc += 1;
53 *uid_alloc
54 }
55 else{0};
56 if let Ok(own_addr) = own_addr_arc.lock(){
57 if let Some(own_addr) = *own_addr{
58 return HubUid{
59 addr:own_addr,
60 id: id
61 }
62 }
63 }
64 },
65 HubRouteSend::Direct{own_addr,uid_alloc,..}=>{
66 let id = if let Ok(mut uid_alloc) = uid_alloc.lock(){
67 *uid_alloc += 1;
68 *uid_alloc
69 }
70 else{0};
71 return HubUid{
72 addr:own_addr.clone(),
73 id: id
74 }
75 }
76 }
77 println!("HubUI - Warning, trying to alloc_uid whilst disconnected from hub");
78 return HubUid{
79 addr:HubAddr::None,
80 id: 0
81 }
82 }
83
84 pub fn update_networked_in_place(&self, set_addr:Option<HubAddr>, tx_write:Option<mpsc::Sender<ToHubMsg>>){
85 match self{
86 HubRouteSend::Networked{own_addr_arc,tx_write_arc,..}=>{
87 if let Ok(mut own_addr) = own_addr_arc.lock(){
88 *own_addr = set_addr
89 }
90 if let Ok(mut tx_write_arc) = tx_write_arc.lock(){
91 *tx_write_arc = tx_write
92 }
93 },
94 HubRouteSend::Direct{..}=>{
95 panic!("update_inner_networked on direct route");
96 }
97 }
98 }
99
100
101 pub fn send(&self, msg:ToHubMsg){
102 match self{
103 HubRouteSend::Networked{tx_write_arc,..}=>{
104 if let Ok(tx_write) = tx_write_arc.lock(){
105 if let Some(tx_write) = &*tx_write{
106 tx_write.send(msg).expect("Cannot tx_write.send - unexpected");
107 }}
111 },
112 HubRouteSend::Direct{tx_pump,own_addr,..}=>{
113 tx_pump.send((*own_addr, msg)).expect("Cannot tx_write.send - unexpected");
114 }
115 }
116 }
117}
118
119pub struct HubRoute {
120 pub peer_addr: HubAddr,
121 pub tx_write: mpsc::Sender<FromHubMsg>,
122 pub tcp_stream: Option<TcpStream>,
123 pub route_type: HubRouteType
124}
125
126pub struct HubRouter{
127 pub local_uid: u64,
128 pub tx_pump: mpsc::Sender<(HubAddr, ToHubMsg)>,
129 pub routes: Arc<Mutex<Vec<HubRoute>>>,
130 pub router_thread: Option<std::thread::JoinHandle<()>>,
131}
132
133impl HubRouter{
134 pub fn alloc_local_addr(&mut self)->HubAddr{
135 self.local_uid += 1;
136 return HubAddr::Local{uid:self.local_uid};
137 }
138
139 pub fn connect_direct(&mut self, route_type: HubRouteType, tx_write: mpsc::Sender<FromHubMsg>)->HubRouteSend{
140 let tx_pump = self.tx_pump.clone();
141 let own_addr = self.alloc_local_addr();
142
143 if let Ok(mut routes) = self.routes.lock() {
144 routes.push(HubRoute {
145 route_type: route_type,
146 peer_addr: own_addr.clone(),
147 tcp_stream: None,
148 tx_write: tx_write
149 })
150 };
151
152 HubRouteSend::Direct{
153 uid_alloc: Arc::new(Mutex::new(0)),
154 tx_pump: tx_pump,
155 own_addr: own_addr
156 }
157 }
158
159 pub fn start_hub_router(hub_log:HubLog)->HubRouter{
160 let (tx_pump, rx_pump) = mpsc::channel::<(HubAddr, ToHubMsg)>();
161 let routes = Arc::new(Mutex::new(Vec::<HubRoute>::new()));
162 let router_thread = {
163 let hub_log = hub_log.clone();
164 let routes = Arc::clone(&routes);
165 std::thread::spawn(move || {
166 while let Ok((from, cth_msg)) = rx_pump.recv() {
168 let to = cth_msg.to;
169 let htc_msg = FromHubMsg {
170 from: from,
171 msg: cth_msg.msg
172 };
173 if let Ok(mut routes) = routes.lock() {
175 hub_log.msg("HubServer sending", &htc_msg);
176
177 if let Some(cid) = routes.iter().position( | c | c.peer_addr == htc_msg.from) {
178 if routes[cid].route_type == HubRouteType::Unknown {
179 match &htc_msg.msg {
180 HubMsg::ConnectBuilder(ws_name) => { let mut connection_refused = false;
182 for route in routes.iter() {
183 if let HubRouteType::Builder(existing_ws_name) = &route.route_type{
184 if *existing_ws_name == *ws_name{
185 connection_refused = true;
186 break;
187 }
188 }
189 }
190 if connection_refused{
191 println!("Already have a workspace by that name {}, disconnecting", ws_name);
192 if let Some(tcp_stream) = &mut routes[cid].tcp_stream{
193 let _ = tcp_stream.shutdown(Shutdown::Both);
194 }
195 routes.remove(cid);
196 continue;
197 }
198 routes[cid].route_type = HubRouteType::Builder(ws_name.to_string());
199 },
200 HubMsg::ConnectClone(ws_name)=>{
201 routes[cid].route_type = HubRouteType::Clone(ws_name.to_string());
202 },
203 HubMsg::ConnectUI => { routes[cid].route_type = HubRouteType::UI;
205 },
206 _ => {
207 println!("Router got message from unknown client {:?}, disconnecting", htc_msg.from);
208 if let Some(tcp_stream) = &mut routes[cid].tcp_stream{
209 let _ = tcp_stream.shutdown(Shutdown::Both);
210 }
211 routes.remove(cid);
212 continue;
213 }
214 }
215 }
216 }
217
218 match to {
219 HubMsgTo::All => { for route in routes.iter() {
221 if route.route_type != HubRouteType::Unknown {
222 route.tx_write.send(htc_msg.clone()).expect("Could not tx_write.send");
223 }
224 }
225 },
226 HubMsgTo::Client(addr) => { if let Some(route) = routes.iter().find( | c | c.peer_addr == addr) {
228 if route.route_type != HubRouteType::Unknown {
229 route.tx_write.send(htc_msg).expect("Could not tx_write.send");
230 }
231 }
232 },
233 HubMsgTo::Builder(to_ws_name)=>{
234 for route in routes.iter() {
235 match &route.route_type{
236 HubRouteType::Builder(ws_name)=>if to_ws_name == *ws_name{
237 route.tx_write.send(htc_msg.clone()).expect("Could not tx_write.send");
238 },
239 HubRouteType::Clone(ws_name)=>if to_ws_name == *ws_name{
240 route.tx_write.send(htc_msg.clone()).expect("Could not tx_write.send");
241 },
242 _=>()
243 }
244 }
245 },
246 HubMsgTo::UI=>{
247 for route in routes.iter() {
248 if route.route_type == HubRouteType::UI{
249 route.tx_write.send(htc_msg.clone()).expect("Could not tx_write.send");
250 }
251 }
252 },
253 HubMsgTo::Hub => { match &htc_msg.msg {
255 HubMsg::ConnectionError(e) => {
256 if let Some(pos) = routes.iter().position( | c | c.peer_addr == htc_msg.from) {
258 hub_log.log(&format!("Server closing connection {:?} from error {:?}", htc_msg.from, e));
259 let msg = FromHubMsg{
261 from:htc_msg.from,
262 msg: match &routes[pos].route_type{
263 HubRouteType::Builder(ws_name)=>HubMsg::DisconnectBuilder(ws_name.clone()),
264 HubRouteType::Clone(ws_name)=>HubMsg::DisconnectClone(ws_name.clone()),
265 HubRouteType::UI=>HubMsg::DisconnectUI,
266 HubRouteType::Unknown=>{
267 continue
268 }
269 }
270 };
271 routes.remove(pos);
272 for route in routes.iter() {
273 route.tx_write.send(msg.clone()).expect("Could not tx_write.send");
274 }
275 }
276 },
277 HubMsg::ListBuildersRequest{uid}=>{
278 let mut builders = Vec::new();
279 for route in routes.iter() {
280 match &route.route_type{
281 HubRouteType::Builder(ws_name)=>builders.push(ws_name.to_string()),
282 _=>()
283 }
284 }
285 if let Some(route) = routes.iter().find( | c | c.peer_addr == htc_msg.from) {
287 route.tx_write.send(FromHubMsg{
288 from:htc_msg.from,
289 msg:HubMsg::ListBuildersResponse{
290 uid:*uid,
291 builders:builders
292 }
293 }).expect("Could not tx_write.send");
294 }
295 },
296 _ => ()
297 }
298 }
300 }
301 }
302 }
303 })
304 };
305
306 return HubRouter {
307 tx_pump: tx_pump,
308 router_thread: Some(router_thread),
309 local_uid: 1,
310 routes: routes
311 };
312 }
313}