1use std::net::SocketAddr;
2use std::sync::{Arc, RwLock};
3use std::io::Write;
4use log::*;
5
6use palserializer::deserialize_be;
7use rsa::pkcs1::{DecodeRsaPublicKey, EncodeRsaPublicKey};
8use rsa::{RsaPrivateKey, RsaPublicKey};
9
10mod tcpserver;
11pub mod networking;
12pub mod srsa;
13
14use tcpserver::*;
15use networking::*;
16
17pub struct Node {
18 pub parent_public_key: Arc<RwLock<Option<RsaPublicKey>>>,
19 pub children: Arc<RwLock<Vec<RemoteNode>>>,
20 pub keypair: Arc<RwLock<(RsaPublicKey, RsaPrivateKey)>>,
21 pub server: Arc<RwLock<TcpServer>>,
22
23 pub client: Arc<RwLock<Option<TcpClient>>>,
24 fallback_address: Arc<RwLock<String>>,
25 next_origin: Arc<RwLock<Option<(RemoteNode, String)>>>,
26 listener_address: Arc<String>,
27 riddle_cache: Arc<RwLock<Vec<(SocketAddr, [u8; 512], RsaPublicKey)>>>,
28 events: Arc<RwLock<NodeEvents>>
29}
30
31#[derive(Clone)]
32pub struct RemoteNode {
33 pub public_key: RsaPublicKey,
34 pub address: Option<SocketAddr>,
35 pub listener_address: Option<String>
36}
37
38#[derive(Clone)]
39pub struct NodeEvents {
40 pub on_child_connect: Arc<dyn Fn(RemoteNode) + Send + Sync>,
41 pub on_child_disconnect: Arc<dyn Fn(RemoteNode) + Send + Sync>,
42 pub on_ready: Arc<dyn Fn() + Send + Sync>,
43 pub on_disconnect: Arc<dyn Fn() + Send + Sync>,
44 pub on_data: Arc<dyn Fn(RemoteNode, &[u8]) + Send + Sync>,
45}
46
47pub struct NodeOptions {
48 pub connect_to: Option<String>
49}
50
51impl Node {
52 pub fn new(listen_on: String, profile: (RsaPrivateKey, RsaPublicKey)) -> Node {
53
54 let (private_key, public_key) = profile;
55
56 debug!("Starting server on {}...", listen_on.clone());
57 let server = TcpServer::new(listen_on.clone()).unwrap();
58
59 Node {
60 parent_public_key: Arc::new(RwLock::new(None)),
61 children: Arc::new(RwLock::new(vec![])),
62 keypair: Arc::new(RwLock::new((public_key, private_key))),
63 server: Arc::new(RwLock::new(server)),
64 client: Arc::new(RwLock::new(None)),
65 fallback_address: Arc::new(RwLock::new(String::new())),
66 listener_address: Arc::new(listen_on),
67 next_origin: Arc::new(RwLock::new(None)),
68 riddle_cache: Arc::new(RwLock::new(vec![])),
69 events: Arc::new(RwLock::new(NodeEvents {
70 on_child_connect: Arc::new(|_| {}),
71 on_child_disconnect: Arc::new(|_| {}),
72 on_ready: Arc::new(|| {}),
73 on_disconnect: Arc::new(|| {}),
74 on_data: Arc::new(|_, _| {}),
75 }))
76 }
77 }
78
79 pub fn connect(&self, events: NodeEvents, options: NodeOptions) {
80 *self.events.write().unwrap() = events;
81
82 if let Some(connect_to) = options.connect_to {
83 debug!("Connecting to parent {}...", connect_to.clone());
84 self.client.write().unwrap().replace(TcpClient::new(connect_to.clone().as_str()).unwrap());
85
86 let client = Arc::clone(&self.client);
87 let client2 = Arc::clone(&self.client);
88 let client3 = Arc::clone(&self.client);
89 let keypair: Arc<RwLock<(RsaPublicKey, RsaPrivateKey)>> = Arc::clone(&self.keypair);
90 let keypair2: Arc<RwLock<(RsaPublicKey, RsaPrivateKey)>> = Arc::clone(&self.keypair);
91 let parent_public_key = Arc::clone(&self.parent_public_key);
92 let server = Arc::clone(&self.server);
93 let server3 = Arc::clone(&self.server);
94
95 let events = Arc::clone(&self.events);
96 let events3 = Arc::clone(&self.events);
97
98 let fallback_address = Arc::clone(&self.fallback_address);
99 let fallback_address3 = Arc::clone(&self.fallback_address);
100 let next_origin = Arc::clone(&self.next_origin);
101 let children = Arc::clone(&self.children);
102
103 let listener_address = Arc::clone(&self.listener_address);
104 let listener_address3 = Arc::clone(&self.listener_address);
105
106 let client_events = ClientEvents {
107 data: Arc::new(move |data| {
108 let client: Arc<RwLock<Option<TcpClient>>> = Arc::clone(&client);
109
110 let packet = Packet::from_bytes(data);
111
112 match packet.packet_type {
113 PacketType::RRequest => {
114 client.write().unwrap().as_mut().unwrap().send(
115 &Packet::new(PacketType::Error,
116 b"Riddle Request sent to a Client instead of Server".to_vec(),
117 vec![], vec![]
118 ).to_bytes()
119 ).unwrap();
120 warn!("Parent {} sent Riddle Request to a Client instead of Server",
121 Arc::clone(&client).read().unwrap().as_ref().unwrap().stream.peer_addr().unwrap());
122 },
123 PacketType::RResponse => {
124 client.write().unwrap().as_mut().unwrap().send(
125 &Packet::new(PacketType::CReport,
126 palserializer::serialize_be(&[
127 srsa::decrypt(keypair.read().unwrap().1.clone(), packet.payload.to_vec()).unwrap().as_slice(),
128 listener_address.clone().as_bytes()
129 ]).unwrap(),
130 vec![], vec![]
131 ).to_bytes()
132 ).unwrap();
133 debug!("Got Riddle Response from Parent");
134 },
135 PacketType::CReport => {
136 client.write().unwrap().as_mut().unwrap().send(
137 &Packet::new(PacketType::Error,
138 b"Completion Report sent to a Client instead of Server".to_vec(),
139 vec![], vec![]
140 ).to_bytes()
141 ).unwrap();
142 warn!("Parent {} sent Completion Report to a Client instead of Server",
143 Arc::clone(&client).read().unwrap().as_ref().unwrap().stream.peer_addr().unwrap());
144 },
145 PacketType::Validation => {
146 parent_public_key.write().as_mut().unwrap().replace(
147 RsaPublicKey::from_pkcs1_der(&packet.payload).unwrap()
148 );
149
150 debug!("Got Validation from Parent {}! Public Key: {}",
151 client.read().unwrap().as_ref().unwrap().stream.peer_addr().unwrap(),
152 srsa::bytes_to_hex(&packet.payload));
153
154 (events.write().unwrap().on_ready)();
155
156 client.write().unwrap().as_mut().unwrap().send(
159 &Packet::new(PacketType::GetFallback,
160 vec![], vec![], vec![]
161 ).to_bytes()
162 ).unwrap();
163 },
164 PacketType::Error => {
165 error!("{} sent an Error packet: {}",
166 client.read().unwrap().as_ref().unwrap().stream.peer_addr().unwrap(),
167 String::from_utf8(packet.payload).unwrap());
168 },
169 PacketType::GetFallback => {
170
171 if client.read().unwrap().is_none() {
172 client.write().unwrap().as_mut().unwrap().send(
173 &Packet::new(PacketType::Error,
174 b"Validation not received".to_vec(),
175 vec![], vec![]
176 ).to_bytes()
177 ).unwrap();
178
179 warn!("{} sent a GetFallback without a Validation",
180 client.read().unwrap().as_ref().unwrap().stream.peer_addr().unwrap());
181
182 return;
183 }
184
185 *fallback_address.write().unwrap() = String::from_utf8(packet.payload).unwrap();
186
187 debug!("Got Fallback Address from Parent: {}",
188 fallback_address.read().unwrap());
189 },
190 PacketType::Message => {
191 debug!("Got Message from Parent");
192
193 if parent_public_key.read().unwrap().is_none() {
194 client.write().unwrap().as_mut().unwrap().send(
195 &Packet::new(PacketType::Error,
196 b"Validation not received".to_vec(),
197 vec![], vec![]
198 ).to_bytes()
199 ).unwrap();
200
201 warn!("{} sent a Message without a Validation",
202 client.read().unwrap().as_ref().unwrap().stream.peer_addr().unwrap());
203
204 return;
205 }
206
207 let destination = RsaPublicKey::from_pkcs1_der(&packet.destination);
208 let source = RsaPublicKey::from_pkcs1_der(&packet.source);
209
210 if (destination.is_err() && &packet.destination != &vec![1, 1, 1, 1]) || source.is_err() {
211 warn!("{} sent a Message with a bad src/dst",
212 client.read().unwrap().as_ref().unwrap().stream.peer_addr().unwrap());
213
214 return;
215 }
216
217 let source = source.unwrap();
218
219 if packet.destination.clone() == vec![1, 1, 1, 1] || destination.clone().unwrap() == keypair.read().unwrap().0.clone() {
220
221 debug!("Decrypting...");
222
223 let packet = packet.clone().decrypt(keypair.read().unwrap().1.clone());
224
225 (events.write().unwrap().on_data)(
226 RemoteNode {
227 address: None,
228 listener_address: None,
229 public_key: source.clone()
230 },
231 &packet.payload
232 );
233
234 }
235 if packet.destination.clone() == vec![1, 1, 1, 1] || destination.clone().unwrap() != keypair.read().unwrap().0.clone() {
236 debug!("Forwarding...");
237
238 server.write().unwrap().connections.write().unwrap().iter_mut().for_each(|x| {
239 x.write(&packet.to_bytes()).unwrap();
240 });
241 }
242
243 }
244 }
245 }),
246 connect: Arc::new(move || {
247
248 client2.write().unwrap().as_mut().unwrap().send(
251 &Packet::new(PacketType::RRequest,
252 keypair2.read().unwrap().0.clone().to_pkcs1_der().unwrap().to_vec(),
253 vec![], vec![]
254 ).to_bytes()
255 ).unwrap();
256
257 debug!("Sent Riddle Request to Parent {}",
258 client2.read().unwrap().as_ref().unwrap().stream.peer_addr().unwrap());
259
260 }),
261 close: Arc::new(move || {
262
263 if fallback_address3.read().unwrap().as_bytes() == listener_address3.as_bytes() { debug!("I'm origin now");
265 *client3.write().unwrap() = Option::None;
266
267 if children.read().unwrap().len() > 0 {
268 next_origin.write().unwrap().replace((children.read().unwrap().first().unwrap().clone(), children.read().unwrap().first().unwrap().clone().listener_address.unwrap()));
269
270 server3.read().unwrap().connections.write().unwrap().iter_mut().for_each(|x| {
271 x.write(&Packet::new(PacketType::GetFallback,
272 children.read().unwrap().first().unwrap().clone().listener_address.unwrap().into_bytes(),
273 vec![], vec![]
274 ).to_bytes()).unwrap();
275 });
276 }
277
278
279 } else {
280 match client3.write().unwrap().as_mut().unwrap().reconnect(fallback_address3.read().unwrap().as_str()) {
281 Ok(_) => {
282 info!("Reconnected to fallback address {}", fallback_address3.read().unwrap());
283 },
284 Err(_) => {
285 error!("Failed to reconnect to fallback address {}", fallback_address3.read().unwrap());
286 }
287 }
288 }
289
290 (events3.write().unwrap().on_disconnect)();
291 })
292 };
293
294 let client = Arc::clone(&self.client);
295
296 client.write().unwrap().as_mut().unwrap().set_events(client_events);
297 client.read().unwrap().as_ref().unwrap().listen();
298 }
299
300 let client = Arc::clone(&self.client);
301 let connections = Arc::clone(&self.server).read().unwrap().connections.clone();
302 let keypair: Arc<RwLock<(RsaPublicKey, RsaPrivateKey)>> = Arc::clone(&self.keypair);
303 let children = Arc::clone(&self.children);
304 let children3 = Arc::clone(&self.children);
305 let riddle_cache = Arc::clone(&self.riddle_cache);
306 let events = Arc::clone(&self.events);
307 let events3 = Arc::clone(&self.events);
308 let next_origin = Arc::clone(&self.next_origin);
309 let next_origin3 = Arc::clone(&self.next_origin);
310
311 Arc::clone(&self.server).write().unwrap().listen(ServerEvents {
312 data: Arc::new(move |data, mut stream| {
313 let packet = Packet::from_bytes(data);
314
315 match packet.packet_type {
316 PacketType::RRequest => {
317 debug!("Got Riddle Request from {}",
318 stream.peer_addr().unwrap());
319
320 if riddle_cache.write().unwrap().iter().any(|x| x.0 == stream.peer_addr().unwrap()) {
321
322 warn!("{} already has a Riddle Request in the cache",
323 stream.peer_addr().unwrap());
324
325 stream.write(&Packet::new(PacketType::Error,
326 b"Riddle already sent".to_vec(),
327 vec![], vec![]
328 ).to_bytes()).unwrap();
329
330 return;
331 }
332
333 let mut riddle = [0; 512];
335 for i in 0..512 {
336 riddle[i] = rand::random::<u8>();
337 }
338
339 riddle_cache.write().unwrap().push((stream.peer_addr().unwrap(), riddle, RsaPublicKey::from_pkcs1_der(&packet.payload).unwrap()));
340
341 stream.write(&Packet::new(PacketType::RResponse,
342 srsa::encrypt(RsaPublicKey::from_pkcs1_der(&packet.payload).unwrap(), riddle.to_vec()).unwrap().to_vec(),
343 vec![], vec![]
344 ).to_bytes()).unwrap();
345
346 debug!("Sent Riddle Response to {}",
347 stream.peer_addr().unwrap());
348 },
349 PacketType::RResponse => {
350 stream.write(&Packet::new(PacketType::Error,
351 b"Riddle Response sent to a Server instead of Client".to_vec(),
352 vec![], vec![]
353 ).to_bytes()).unwrap();
354 warn!("{} sent Riddle Response to a Server instead of Client",
355 stream.peer_addr().unwrap());
356 },
357 PacketType::CReport => {
358 if !riddle_cache.read().unwrap().iter().any(|x| x.0 == stream.peer_addr().unwrap()) {
359
360 warn!("{} sent a Completion Report without a Riddle Request",
361 stream.peer_addr().unwrap());
362
363 stream.write(&Packet::new(PacketType::Error,
364 b"Riddle Request not sent".to_vec(),
365 vec![], vec![]
366 ).to_bytes()).unwrap();
367
368 return;
369 }
370
371 let cache_register = riddle_cache.read().unwrap().iter().find(|x| x.0 == stream.peer_addr().unwrap()).unwrap().clone();
372
373 let deserialized = deserialize_be(packet.payload.as_slice());
375
376 if deserialized.is_err() {
377 warn!("{} sent a Completion Report with a false Riddle Response",
378 stream.peer_addr().unwrap());
379
380 stream.write(&Packet::new(PacketType::Error,
381 b"Riddle Response false".to_vec(),
382 vec![], vec![]
383 ).to_bytes()).unwrap();
384
385 riddle_cache.write().unwrap().retain(|x| x.0 != stream.peer_addr().unwrap());
386
387 return;
388 }
389
390 if &cache_register.1 != deserialized.clone().unwrap()[0].as_slice() {
391
392 warn!("{} sent a Completion Report with an incorrect Riddle Response",
393 stream.peer_addr().unwrap());
394
395 stream.write(&Packet::new(PacketType::Error,
396 b"Riddle Response incorrect".to_vec(),
397 vec![], vec![]
398 ).to_bytes()).unwrap();
399
400 riddle_cache.write().unwrap().retain(|x| x.0 != stream.peer_addr().unwrap());
401
402 return;
403 }
404
405 debug!("{} sent a Completion Report with a correct Riddle Response",
406 stream.peer_addr().unwrap());
407
408 children.write().unwrap().push(RemoteNode {
409 address: Some(stream.peer_addr().unwrap()),
410 listener_address: Some(String::from_utf8(deserialized.clone().unwrap()[1].clone()).unwrap()),
411 public_key: cache_register.2.clone()
412 });
413
414 debug!("-1");
415
416 riddle_cache.write().unwrap().retain(|x| x.0 != stream.peer_addr().unwrap());
417
418 debug!("0");
419
420 (events.write().unwrap().on_child_connect)(RemoteNode {
421 address: Some(stream.peer_addr().unwrap()),
422 listener_address: Some(String::from_utf8(deserialized.clone().unwrap()[1].clone()).unwrap()),
423 public_key: cache_register.2.clone()
424 });
425
426 debug!("1");
427
428 if client.read().unwrap().is_none() && next_origin.read().unwrap().is_none() {
429 debug!("{}/{} is future origin",
430 stream.peer_addr().unwrap(), String::from_utf8(deserialized.clone().unwrap()[1].clone()).unwrap());
431 next_origin.write().unwrap().replace((RemoteNode {
432 address: None,
433 listener_address: None,
434 public_key: cache_register.2.clone()
435 }, String::from_utf8(deserialized.unwrap()[1].clone()).unwrap()));
436 }
437
438 stream.write(&Packet::new(PacketType::Validation,
439 keypair.read().unwrap().0.to_pkcs1_der().unwrap().to_vec(),
440 vec![], vec![]
441 ).to_bytes()).unwrap();
442 },
443 PacketType::Validation => {
444 stream.write(&Packet::new(PacketType::Error,
445 b"Validation sent to a Server instead of Client".to_vec(),
446 vec![], vec![]
447 ).to_bytes()).unwrap();
448 warn!("{} sent Validation to a Server instead of Client",
449 stream.peer_addr().unwrap());
450 },
451 PacketType::Error => {
452 error!("{} sent an Error packet: {}",
453 stream.peer_addr().unwrap(),
454 String::from_utf8(packet.payload).unwrap());
455 },
456 PacketType::GetFallback => {
457 if !children.read().unwrap().iter().any(|x| x.address == Some(stream.peer_addr().unwrap())) {
458
459 warn!("{} sent a Message without a Validation",
460 stream.peer_addr().unwrap());
461
462 stream.write(&Packet::new(PacketType::Error,
463 b"Validation not sent".to_vec(),
464 vec![], vec![]
465 ).to_bytes()).unwrap();
466
467 return;
468 }
469
470
471 if client.read().unwrap().is_some() {
473 stream.write(&Packet::new(PacketType::GetFallback,
474 client.read().unwrap().as_ref().unwrap().stream.peer_addr().unwrap().to_string().into_bytes(),
475 vec![], vec![]
476 ).to_bytes()).unwrap();
477 } else { stream.write(&Packet::new(PacketType::GetFallback,
479 next_origin.read().unwrap().as_ref().unwrap().1.clone().into_bytes(),
480 vec![], vec![]
481 ).to_bytes()).unwrap();
482 }
483
484 },
485 PacketType::Message => {
486 debug!("Got Message from {}", stream.peer_addr().unwrap());
487
488 if !children.read().unwrap().iter().any(|x| x.address == Some(stream.peer_addr().unwrap())) {
489
490 warn!("{} sent a Message without a Validation",
491 stream.peer_addr().unwrap());
492
493 stream.write(&Packet::new(PacketType::Error,
494 b"Validation not sent".to_vec(),
495 vec![], vec![]
496 ).to_bytes()).unwrap();
497
498 return;
499 }
500
501 let destination = RsaPublicKey::from_pkcs1_der(&packet.destination);
502 let source = RsaPublicKey::from_pkcs1_der(&packet.source);
503
504 if (destination.is_err() && &packet.destination != &vec![1, 1, 1, 1]) || source.is_err() {
505 warn!("{} sent a Message with a bad src/dst",
506 stream.peer_addr().unwrap());
507
508 return;
509 }
510
511 if &packet.destination == &vec![1, 1, 1, 1] || &destination.clone().unwrap() == &keypair.read().unwrap().0 {
512 debug!("Decrypting...");
513
514 let packet = packet.clone().decrypt(keypair.read().unwrap().1.clone());
515
516 (events.write().unwrap().on_data)(
517 children.read().unwrap().iter().find(|x| x.address == Some(stream.peer_addr().unwrap())).unwrap().clone(),
518 &packet.payload
519 );
520
521 }
522 if &packet.destination == &vec![1, 1, 1, 1] || &destination.unwrap() != &keypair.read().unwrap().0 {
523 debug!("Forwarding...");
524
525 connections.write().unwrap().iter_mut().for_each(|x| {
526 if x.peer_addr().unwrap() != stream.peer_addr().unwrap() {
527 x.write(&packet.to_bytes()).unwrap();
528 }
529 });
530
531 if client.read().unwrap().is_some() {
532 client.write().unwrap().as_mut().unwrap().send(&packet.to_bytes()).unwrap();
533 }
534 }
535 }
536 }
537
538 }),
539 connect: Arc::new(move |stream| {
540 debug!("Client connected: {}", stream.peer_addr().unwrap());
541 }),
542 close: Arc::new(move |stream| {
543
544 let child = children3.read().unwrap().iter().find(|x| x.address == Some(stream.peer_addr().unwrap())).unwrap().clone();
545
546 children3.write().unwrap().retain(|x| x.address != Some(stream.peer_addr().unwrap()));
547
548 if &child.public_key == &next_origin3.read().unwrap().as_ref().unwrap().0.public_key {
549 debug!("Indended next origin disconnected");
550
551 *next_origin3.write().unwrap() = Option::None;
552 }
553
554 (events3.write().unwrap().on_child_disconnect)(child);
555 }),
556 });
557
558 if Arc::clone(&self.client).read().unwrap().is_none() {
559 (Arc::clone(&self.events).write().unwrap().on_ready)();
560 }
561 }
562
563 pub fn broadcast(&self, data: Vec<u8>) {
564 self.send(vec![1, 1, 1, 1], data);
565 }
566
567 pub fn send(&self, to: Vec<u8>, data: Vec<u8>) {
568 self.send_packet(Packet::new(PacketType::Message,
569 data.clone(),
570 self.keypair.read().unwrap().0.to_pkcs1_der().unwrap().to_vec(), to.clone()
571 ));
572 }
573
574 pub fn send_packet(&self, packet: Packet) {
575 self.server.write().unwrap().connections.write().unwrap().iter_mut().filter(|x| {
576 self.children.read().unwrap().iter().any(|y| Some(x.peer_addr().unwrap()) == y.address)
577 }).for_each(|x| {
578 x.write(&packet.to_bytes()).unwrap();
579 });
580
581 if self.client.read().unwrap().is_some() {
582 self.client.write().unwrap().as_mut().unwrap().send(
583 &packet.to_bytes()
584 ).unwrap();
585 }
586 }
587}