nislib/
lib.rs

1use std::net::SocketAddr;
2use std::sync::{Arc, RwLock};
3use std::io::Write;
4use log::*;
5
6use palserializer::deserialize_be;
7use rsa::pkcs1::{DecodeRsaPublicKey, EncodeRsaPublicKey};
8use rsa::{RsaPrivateKey, RsaPublicKey};
9
10mod tcpserver;
11pub mod networking;
12pub mod srsa;
13
14use tcpserver::*;
15use networking::*;
16
17pub struct Node {
18    pub parent_public_key: Arc<RwLock<Option<RsaPublicKey>>>,
19    pub children: Arc<RwLock<Vec<RemoteNode>>>,
20    pub keypair: Arc<RwLock<(RsaPublicKey, RsaPrivateKey)>>,
21    pub server: Arc<RwLock<TcpServer>>,
22
23    pub client: Arc<RwLock<Option<TcpClient>>>,
24    fallback_address: Arc<RwLock<String>>,
25    next_origin: Arc<RwLock<Option<(RemoteNode, String)>>>,
26    listener_address: Arc<String>,
27    riddle_cache: Arc<RwLock<Vec<(SocketAddr, [u8; 512], RsaPublicKey)>>>,
28    events: Arc<RwLock<NodeEvents>>
29}
30
31#[derive(Clone)]
32pub struct RemoteNode {
33    pub public_key: RsaPublicKey,
34    pub address: Option<SocketAddr>,
35    pub listener_address: Option<String>
36}
37
38#[derive(Clone)]
39pub struct NodeEvents {
40    pub on_child_connect: Arc<dyn Fn(RemoteNode) + Send + Sync>,
41    pub on_child_disconnect: Arc<dyn Fn(RemoteNode) + Send + Sync>,
42    pub on_ready: Arc<dyn Fn() + Send + Sync>,
43    pub on_disconnect: Arc<dyn Fn() + Send + Sync>,
44    pub on_data: Arc<dyn Fn(RemoteNode, &[u8]) + Send + Sync>,
45}
46
47pub struct NodeOptions {
48    pub connect_to: Option<String>
49}
50
51impl Node {
52    pub fn new(listen_on: String, profile: (RsaPrivateKey, RsaPublicKey)) -> Node {
53
54        let (private_key, public_key) = profile;
55
56        debug!("Starting server on {}...", listen_on.clone());
57        let server = TcpServer::new(listen_on.clone()).unwrap();
58            
59        Node {
60            parent_public_key: Arc::new(RwLock::new(None)),
61            children: Arc::new(RwLock::new(vec![])),
62            keypair: Arc::new(RwLock::new((public_key, private_key))),
63            server: Arc::new(RwLock::new(server)),
64            client: Arc::new(RwLock::new(None)),
65            fallback_address: Arc::new(RwLock::new(String::new())),
66            listener_address: Arc::new(listen_on),
67            next_origin: Arc::new(RwLock::new(None)),
68            riddle_cache: Arc::new(RwLock::new(vec![])),
69            events: Arc::new(RwLock::new(NodeEvents {
70                on_child_connect: Arc::new(|_| {}),
71                on_child_disconnect: Arc::new(|_| {}),
72                on_ready: Arc::new(|| {}),
73                on_disconnect: Arc::new(|| {}),
74                on_data: Arc::new(|_, _| {}),
75            }))
76        }
77    }
78
79    pub fn connect(&self, events: NodeEvents, options: NodeOptions) {
80        *self.events.write().unwrap() = events;
81
82        if let Some(connect_to) = options.connect_to {
83            debug!("Connecting to parent {}...", connect_to.clone());
84            self.client.write().unwrap().replace(TcpClient::new(connect_to.clone().as_str()).unwrap());
85
86            let client = Arc::clone(&self.client);
87            let client2 = Arc::clone(&self.client);
88            let client3 = Arc::clone(&self.client);
89            let keypair: Arc<RwLock<(RsaPublicKey, RsaPrivateKey)>> = Arc::clone(&self.keypair);
90            let keypair2: Arc<RwLock<(RsaPublicKey, RsaPrivateKey)>> = Arc::clone(&self.keypair);
91            let parent_public_key = Arc::clone(&self.parent_public_key);
92            let server = Arc::clone(&self.server);
93            let server3 = Arc::clone(&self.server);
94
95            let events = Arc::clone(&self.events);
96            let events3 = Arc::clone(&self.events);
97
98            let fallback_address = Arc::clone(&self.fallback_address);
99            let fallback_address3 = Arc::clone(&self.fallback_address);
100            let next_origin = Arc::clone(&self.next_origin);
101            let children = Arc::clone(&self.children);
102
103            let listener_address = Arc::clone(&self.listener_address);
104            let listener_address3 = Arc::clone(&self.listener_address);
105
106            let client_events = ClientEvents {
107                data: Arc::new(move |data| {
108                    let client: Arc<RwLock<Option<TcpClient>>> = Arc::clone(&client);
109
110                    let packet = Packet::from_bytes(data);
111    
112                    match packet.packet_type {
113                        PacketType::RRequest => {
114                            client.write().unwrap().as_mut().unwrap().send(
115                                &Packet::new(PacketType::Error,
116                                    b"Riddle Request sent to a Client instead of Server".to_vec(),
117                                    vec![], vec![]
118                                ).to_bytes()
119                            ).unwrap();
120                            warn!("Parent {} sent Riddle Request to a Client instead of Server",
121                                Arc::clone(&client).read().unwrap().as_ref().unwrap().stream.peer_addr().unwrap());
122                        },
123                        PacketType::RResponse => {
124                            client.write().unwrap().as_mut().unwrap().send(
125                                &Packet::new(PacketType::CReport,
126                                    palserializer::serialize_be(&[
127                                        srsa::decrypt(keypair.read().unwrap().1.clone(), packet.payload.to_vec()).unwrap().as_slice(),
128                                        listener_address.clone().as_bytes()
129                                    ]).unwrap(),
130                                    vec![], vec![]
131                                ).to_bytes()
132                            ).unwrap();
133                            debug!("Got Riddle Response from Parent");
134                        },
135                        PacketType::CReport => {
136                            client.write().unwrap().as_mut().unwrap().send(
137                                &Packet::new(PacketType::Error,
138                                    b"Completion Report sent to a Client instead of Server".to_vec(),
139                                    vec![], vec![]
140                                ).to_bytes()
141                            ).unwrap();
142                            warn!("Parent {} sent Completion Report to a Client instead of Server",
143                                Arc::clone(&client).read().unwrap().as_ref().unwrap().stream.peer_addr().unwrap());
144                        },
145                        PacketType::Validation => {
146                            parent_public_key.write().as_mut().unwrap().replace(
147                                RsaPublicKey::from_pkcs1_der(&packet.payload).unwrap()
148                            );
149    
150                            debug!("Got Validation from Parent {}! Public Key: {}",
151                                client.read().unwrap().as_ref().unwrap().stream.peer_addr().unwrap(),
152                                srsa::bytes_to_hex(&packet.payload));
153
154                            (events.write().unwrap().on_ready)();
155
156                            // Get fallback address
157
158                            client.write().unwrap().as_mut().unwrap().send(
159                                &Packet::new(PacketType::GetFallback,
160                                    vec![], vec![], vec![]
161                                ).to_bytes()
162                            ).unwrap();
163                        },
164                        PacketType::Error => {
165                            error!("{} sent an Error packet: {}",
166                                client.read().unwrap().as_ref().unwrap().stream.peer_addr().unwrap(),
167                                String::from_utf8(packet.payload).unwrap());
168                        },
169                        PacketType::GetFallback => {
170
171                            if client.read().unwrap().is_none() {
172                                client.write().unwrap().as_mut().unwrap().send(
173                                    &Packet::new(PacketType::Error,
174                                        b"Validation not received".to_vec(),
175                                        vec![], vec![]
176                                    ).to_bytes()
177                                ).unwrap();
178    
179                                warn!("{} sent a GetFallback without a Validation",
180                                    client.read().unwrap().as_ref().unwrap().stream.peer_addr().unwrap());
181
182                                return;
183                            }
184
185                            *fallback_address.write().unwrap() = String::from_utf8(packet.payload).unwrap();
186
187                            debug!("Got Fallback Address from Parent: {}",
188                                fallback_address.read().unwrap());
189                        },
190                        PacketType::Message => {
191                            debug!("Got Message from Parent");
192
193                            if parent_public_key.read().unwrap().is_none() {
194                                client.write().unwrap().as_mut().unwrap().send(
195                                    &Packet::new(PacketType::Error,
196                                        b"Validation not received".to_vec(),
197                                        vec![], vec![]
198                                    ).to_bytes()
199                                ).unwrap();
200    
201                                warn!("{} sent a Message without a Validation",
202                                    client.read().unwrap().as_ref().unwrap().stream.peer_addr().unwrap());
203
204                                return;
205                            }
206
207                            let destination = RsaPublicKey::from_pkcs1_der(&packet.destination);
208                            let source = RsaPublicKey::from_pkcs1_der(&packet.source);
209
210                            if (destination.is_err() && &packet.destination != &vec![1, 1, 1, 1]) || source.is_err() {    
211                                warn!("{} sent a Message with a bad src/dst",
212                                    client.read().unwrap().as_ref().unwrap().stream.peer_addr().unwrap());
213
214                                return;
215                            }
216
217                            let source = source.unwrap();
218
219                            if packet.destination.clone() == vec![1, 1, 1, 1] || destination.clone().unwrap() == keypair.read().unwrap().0.clone() {
220
221                                debug!("Decrypting...");
222
223                                let packet = packet.clone().decrypt(keypair.read().unwrap().1.clone());
224
225                                (events.write().unwrap().on_data)(
226                                    RemoteNode {
227                                        address: None,
228                                        listener_address: None,
229                                        public_key: source.clone()
230                                    },
231                                    &packet.payload
232                                );
233
234                            } 
235                            if packet.destination.clone() == vec![1, 1, 1, 1] || destination.clone().unwrap() != keypair.read().unwrap().0.clone() {
236                                debug!("Forwarding...");
237
238                                server.write().unwrap().connections.write().unwrap().iter_mut().for_each(|x| {
239                                    x.write(&packet.to_bytes()).unwrap();
240                                });
241                            }
242                            
243                        }
244                    }
245                }),
246                connect: Arc::new(move || {
247                
248                    // Nislib connection protocol: Riddle Request
249    
250                    client2.write().unwrap().as_mut().unwrap().send(
251                        &Packet::new(PacketType::RRequest,
252                            keypair2.read().unwrap().0.clone().to_pkcs1_der().unwrap().to_vec(),
253                            vec![], vec![]
254                        ).to_bytes()
255                    ).unwrap();
256    
257                    debug!("Sent Riddle Request to Parent {}",
258                        client2.read().unwrap().as_ref().unwrap().stream.peer_addr().unwrap());
259                    
260                }),
261                close: Arc::new(move || {
262
263                    if fallback_address3.read().unwrap().as_bytes() == listener_address3.as_bytes() { // If you're next origin
264                        debug!("I'm origin now");
265                        *client3.write().unwrap() = Option::None;
266
267                        if children.read().unwrap().len() > 0 {
268                            next_origin.write().unwrap().replace((children.read().unwrap().first().unwrap().clone(), children.read().unwrap().first().unwrap().clone().listener_address.unwrap()));
269
270                            server3.read().unwrap().connections.write().unwrap().iter_mut().for_each(|x| {
271                                x.write(&Packet::new(PacketType::GetFallback,
272                                    children.read().unwrap().first().unwrap().clone().listener_address.unwrap().into_bytes(),
273                                    vec![], vec![]
274                                ).to_bytes()).unwrap();
275                            });
276                        }
277
278                        
279                    } else {
280                        match client3.write().unwrap().as_mut().unwrap().reconnect(fallback_address3.read().unwrap().as_str()) {
281                            Ok(_) => {
282                                info!("Reconnected to fallback address {}", fallback_address3.read().unwrap());
283                            },
284                            Err(_) => {
285                                error!("Failed to reconnect to fallback address {}", fallback_address3.read().unwrap());
286                            }
287                        }
288                    }
289    
290                    (events3.write().unwrap().on_disconnect)();
291                })
292            };
293
294            let client = Arc::clone(&self.client);
295
296            client.write().unwrap().as_mut().unwrap().set_events(client_events);
297            client.read().unwrap().as_ref().unwrap().listen();
298        }
299
300        let client = Arc::clone(&self.client);
301        let connections = Arc::clone(&self.server).read().unwrap().connections.clone();
302        let keypair: Arc<RwLock<(RsaPublicKey, RsaPrivateKey)>> = Arc::clone(&self.keypair);
303        let children = Arc::clone(&self.children);
304        let children3 = Arc::clone(&self.children);
305        let riddle_cache = Arc::clone(&self.riddle_cache);
306        let events = Arc::clone(&self.events);
307        let events3 = Arc::clone(&self.events);
308        let next_origin = Arc::clone(&self.next_origin);
309        let next_origin3 = Arc::clone(&self.next_origin);
310
311        Arc::clone(&self.server).write().unwrap().listen(ServerEvents {
312            data: Arc::new(move |data, mut stream| {
313                let packet = Packet::from_bytes(data);
314    
315                match packet.packet_type {
316                    PacketType::RRequest => {
317                        debug!("Got Riddle Request from {}",
318                                stream.peer_addr().unwrap());
319    
320                        if riddle_cache.write().unwrap().iter().any(|x| x.0 == stream.peer_addr().unwrap()) {
321    
322                            warn!("{} already has a Riddle Request in the cache",
323                                stream.peer_addr().unwrap());
324    
325                            stream.write(&Packet::new(PacketType::Error,
326                                b"Riddle already sent".to_vec(),
327                                vec![], vec![]
328                            ).to_bytes()).unwrap();
329    
330                            return;
331                        }
332    
333                        // Generate random riddle
334                        let mut riddle = [0; 512];
335                        for i in 0..512 {
336                            riddle[i] = rand::random::<u8>();
337                        }
338                        
339                        riddle_cache.write().unwrap().push((stream.peer_addr().unwrap(), riddle, RsaPublicKey::from_pkcs1_der(&packet.payload).unwrap()));
340    
341                        stream.write(&Packet::new(PacketType::RResponse,
342                            srsa::encrypt(RsaPublicKey::from_pkcs1_der(&packet.payload).unwrap(), riddle.to_vec()).unwrap().to_vec(),
343                            vec![], vec![]
344                        ).to_bytes()).unwrap();
345    
346                        debug!("Sent Riddle Response to {}",
347                                stream.peer_addr().unwrap());
348                    },
349                    PacketType::RResponse => {
350                        stream.write(&Packet::new(PacketType::Error,
351                            b"Riddle Response sent to a Server instead of Client".to_vec(),
352                            vec![], vec![]
353                        ).to_bytes()).unwrap();
354                        warn!("{} sent Riddle Response to a Server instead of Client",
355                                stream.peer_addr().unwrap());
356                    },
357                    PacketType::CReport => {
358                        if !riddle_cache.read().unwrap().iter().any(|x| x.0 == stream.peer_addr().unwrap()) {
359    
360                            warn!("{} sent a Completion Report without a Riddle Request",
361                                stream.peer_addr().unwrap());
362    
363                            stream.write(&Packet::new(PacketType::Error,
364                                b"Riddle Request not sent".to_vec(),
365                                vec![], vec![]
366                            ).to_bytes()).unwrap();
367    
368                            return;
369                        }
370    
371                        let cache_register = riddle_cache.read().unwrap().iter().find(|x| x.0 == stream.peer_addr().unwrap()).unwrap().clone();
372    
373                        // First: Riddle, Second: Server address
374                        let deserialized = deserialize_be(packet.payload.as_slice());
375
376                        if deserialized.is_err() {
377                            warn!("{} sent a Completion Report with a false Riddle Response",
378                                stream.peer_addr().unwrap());
379    
380                            stream.write(&Packet::new(PacketType::Error,
381                                b"Riddle Response false".to_vec(),
382                                vec![], vec![]
383                            ).to_bytes()).unwrap();
384    
385                            riddle_cache.write().unwrap().retain(|x| x.0 != stream.peer_addr().unwrap());
386    
387                            return;
388                        }
389
390                        if &cache_register.1 != deserialized.clone().unwrap()[0].as_slice() {
391    
392                            warn!("{} sent a Completion Report with an incorrect Riddle Response",
393                                stream.peer_addr().unwrap());
394    
395                            stream.write(&Packet::new(PacketType::Error,
396                                b"Riddle Response incorrect".to_vec(),
397                                vec![], vec![]
398                            ).to_bytes()).unwrap();
399    
400                            riddle_cache.write().unwrap().retain(|x| x.0 != stream.peer_addr().unwrap());
401    
402                            return;
403                        }
404    
405                        debug!("{} sent a Completion Report with a correct Riddle Response",
406                                stream.peer_addr().unwrap());
407    
408                        children.write().unwrap().push(RemoteNode {
409                            address: Some(stream.peer_addr().unwrap()),
410                            listener_address: Some(String::from_utf8(deserialized.clone().unwrap()[1].clone()).unwrap()),
411                            public_key: cache_register.2.clone()
412                        });
413
414                        debug!("-1");
415    
416                        riddle_cache.write().unwrap().retain(|x| x.0 != stream.peer_addr().unwrap());
417    
418                        debug!("0");
419
420                        (events.write().unwrap().on_child_connect)(RemoteNode {
421                            address: Some(stream.peer_addr().unwrap()),
422                            listener_address: Some(String::from_utf8(deserialized.clone().unwrap()[1].clone()).unwrap()),
423                            public_key: cache_register.2.clone()
424                        });
425
426                        debug!("1");
427
428                        if client.read().unwrap().is_none() && next_origin.read().unwrap().is_none() {
429                            debug!("{}/{} is future origin",
430                                stream.peer_addr().unwrap(), String::from_utf8(deserialized.clone().unwrap()[1].clone()).unwrap());
431                            next_origin.write().unwrap().replace((RemoteNode {
432                                address: None,
433                                listener_address: None,
434                                public_key: cache_register.2.clone()
435                            }, String::from_utf8(deserialized.unwrap()[1].clone()).unwrap()));
436                        }
437
438                        stream.write(&Packet::new(PacketType::Validation,
439                            keypair.read().unwrap().0.to_pkcs1_der().unwrap().to_vec(),
440                            vec![], vec![]
441                        ).to_bytes()).unwrap();
442                    },
443                    PacketType::Validation => {
444                        stream.write(&Packet::new(PacketType::Error,
445                            b"Validation sent to a Server instead of Client".to_vec(),
446                            vec![], vec![]
447                        ).to_bytes()).unwrap();
448                        warn!("{} sent Validation to a Server instead of Client",
449                                stream.peer_addr().unwrap());
450                    },
451                    PacketType::Error => {
452                        error!("{} sent an Error packet: {}",
453                                stream.peer_addr().unwrap(),
454                                String::from_utf8(packet.payload).unwrap());
455                    },
456                    PacketType::GetFallback => {
457                        if !children.read().unwrap().iter().any(|x| x.address == Some(stream.peer_addr().unwrap())) {
458    
459                            warn!("{} sent a Message without a Validation",
460                                stream.peer_addr().unwrap());
461    
462                            stream.write(&Packet::new(PacketType::Error,
463                                b"Validation not sent".to_vec(),
464                                vec![], vec![]
465                            ).to_bytes()).unwrap();
466    
467                            return;
468                        }
469
470
471                        // If you're not origin, respond with your parent
472                        if client.read().unwrap().is_some() {
473                            stream.write(&Packet::new(PacketType::GetFallback, 
474                                client.read().unwrap().as_ref().unwrap().stream.peer_addr().unwrap().to_string().into_bytes(),
475                                vec![], vec![]
476                            ).to_bytes()).unwrap();
477                        } else { // If you ARE origin, respond with next_origin
478                            stream.write(&Packet::new(PacketType::GetFallback,
479                                next_origin.read().unwrap().as_ref().unwrap().1.clone().into_bytes(),
480                                vec![], vec![]
481                            ).to_bytes()).unwrap();
482                        }
483
484                    },
485                    PacketType::Message => {
486                        debug!("Got Message from {}", stream.peer_addr().unwrap());
487
488                        if !children.read().unwrap().iter().any(|x| x.address == Some(stream.peer_addr().unwrap())) {
489    
490                            warn!("{} sent a Message without a Validation",
491                                stream.peer_addr().unwrap());
492    
493                            stream.write(&Packet::new(PacketType::Error,
494                                b"Validation not sent".to_vec(),
495                                vec![], vec![]
496                            ).to_bytes()).unwrap();
497    
498                            return;
499                        }
500
501                        let destination = RsaPublicKey::from_pkcs1_der(&packet.destination);
502                        let source = RsaPublicKey::from_pkcs1_der(&packet.source);
503
504                        if (destination.is_err() && &packet.destination != &vec![1, 1, 1, 1]) || source.is_err() {
505                            warn!("{} sent a Message with a bad src/dst",
506                                stream.peer_addr().unwrap());
507
508                            return;
509                        }
510    
511                        if &packet.destination == &vec![1, 1, 1, 1] || &destination.clone().unwrap() == &keypair.read().unwrap().0 {
512                            debug!("Decrypting...");
513
514                            let packet = packet.clone().decrypt(keypair.read().unwrap().1.clone());
515
516                            (events.write().unwrap().on_data)(
517                                children.read().unwrap().iter().find(|x| x.address == Some(stream.peer_addr().unwrap())).unwrap().clone(),
518                                &packet.payload
519                            );
520
521                        } 
522                        if &packet.destination == &vec![1, 1, 1, 1] || &destination.unwrap() != &keypair.read().unwrap().0 {
523                            debug!("Forwarding...");
524
525                            connections.write().unwrap().iter_mut().for_each(|x| {
526                                if x.peer_addr().unwrap() != stream.peer_addr().unwrap() {
527                                    x.write(&packet.to_bytes()).unwrap();
528                                }
529                            });
530
531                            if client.read().unwrap().is_some() {
532                                client.write().unwrap().as_mut().unwrap().send(&packet.to_bytes()).unwrap();
533                            }
534                        }
535                    }
536                }
537    
538            }),
539            connect: Arc::new(move |stream| {
540                debug!("Client connected: {}", stream.peer_addr().unwrap());
541            }),
542            close: Arc::new(move |stream| {
543                
544                let child = children3.read().unwrap().iter().find(|x| x.address == Some(stream.peer_addr().unwrap())).unwrap().clone();
545
546                children3.write().unwrap().retain(|x| x.address != Some(stream.peer_addr().unwrap()));
547
548                if &child.public_key == &next_origin3.read().unwrap().as_ref().unwrap().0.public_key {
549                    debug!("Indended next origin disconnected");
550                    
551                    *next_origin3.write().unwrap() = Option::None;
552                }
553
554                (events3.write().unwrap().on_child_disconnect)(child);
555            }),
556        });
557
558        if Arc::clone(&self.client).read().unwrap().is_none() {
559            (Arc::clone(&self.events).write().unwrap().on_ready)();
560        }
561    }
562
563    pub fn broadcast(&self, data: Vec<u8>) {
564        self.send(vec![1, 1, 1, 1], data);
565    }
566
567    pub fn send(&self, to: Vec<u8>, data: Vec<u8>) {
568        self.send_packet(Packet::new(PacketType::Message,
569            data.clone(),
570            self.keypair.read().unwrap().0.to_pkcs1_der().unwrap().to_vec(), to.clone()
571        ));
572    }
573
574    pub fn send_packet(&self, packet: Packet) {
575        self.server.write().unwrap().connections.write().unwrap().iter_mut().filter(|x| {
576            self.children.read().unwrap().iter().any(|y| Some(x.peer_addr().unwrap()) == y.address)
577        }).for_each(|x| {
578            x.write(&packet.to_bytes()).unwrap();
579        });
580
581        if self.client.read().unwrap().is_some() {
582            self.client.write().unwrap().as_mut().unwrap().send(
583                &packet.to_bytes()
584            ).unwrap();
585        }
586    }
587}