tox_core/relay/server/
server.rs

1/*! The implementation of relay server
2*/
3
4use tox_crypto::*;
5use tox_packet::onion::InnerOnionResponse;
6use crate::relay::server::client::Client;
7use tox_packet::relay::connection_id::ConnectionId;
8use crate::relay::links::*;
9use tox_packet::relay::*;
10
11use std::io::{Error, ErrorKind};
12use std::collections::HashMap;
13use std::net::{IpAddr, SocketAddr};
14use std::sync::Arc;
15use std::time::Instant;
16
17use futures::SinkExt;
18use futures::channel::mpsc;
19use tokio::sync::RwLock;
20
21/** A `Server` is a structure that holds connected clients, manages their links and handles
22their responses. Notice that there is no actual network code here, the `Server` accepts packets
23by value from `Server::handle_packet`, sends packets back to clients via
24`futures::sync::mpsc::UnboundedSender<Packet>` channel, accepts onion responses from
25`Server::handle_udp_onion_response` and sends onion requests via
26`futures::sync::mpsc::UnboundedSender<OnionRequest>` channel. The outer code should manage how to
27handshake connections, get packets from clients, pass them into `Server::handle_packet`, get onion
28responses from UPD socket and send them to `Server::handle_udp_onion_response`, create `mpsc`
29channels, take packets from `futures::sync::mpsc::UnboundedReceiver<Packet>` send them back
30to clients via network.
31*/
32#[derive(Default, Clone)]
33pub struct Server {
34    state: Arc<RwLock<ServerState>>,
35    // None if the server is not responsible to handle OnionRequests
36    onion_sink: Option<mpsc::Sender<(OnionRequest, SocketAddr)>>,
37}
38
39#[derive(Default)]
40struct ServerState {
41    pub connected_clients: HashMap<PublicKey, Client>,
42    pub keys_by_addr: HashMap<(IpAddr, /*port*/ u16), PublicKey>,
43}
44
45
46impl Server {
47    /** Create a new `Server` without onion
48    */
49    pub fn new() -> Server {
50        Server::default()
51    }
52    /** Create a new `Server` with onion
53    */
54    pub fn set_udp_onion_sink(&mut self, onion_sink: mpsc::Sender<(OnionRequest, SocketAddr)>) {
55        self.onion_sink = Some(onion_sink)
56    }
57    /** Insert the client into `connected_clients`. If `connected_clients`
58    contains a client with the same pk it will be terminated.
59    */
60    pub async fn insert(&self, client: Client) -> Result<(), Error> {
61        let mut state = self.state.write().await;
62
63        if state.connected_clients.contains_key(&client.pk()) {
64            self.shutdown_client_inner(&client.pk(), &mut state).await?;
65        }
66
67        state.keys_by_addr
68            .insert((client.ip_addr(), client.port()), client.pk());
69        state.connected_clients
70            .insert(client.pk(), client);
71
72        Ok(())
73    }
74    /** The main processing function. Call in on each incoming packet from connected and
75    handshaked client.
76    */
77    pub async fn handle_packet(&self, pk: &PublicKey, packet: Packet) -> Result<(), Error> {
78        match packet {
79            Packet::RouteRequest(packet) => self.handle_route_request(pk, &packet).await,
80            Packet::RouteResponse(packet) => self.handle_route_response(pk, &packet).await,
81            Packet::ConnectNotification(packet) => self.handle_connect_notification(pk, &packet).await,
82            Packet::DisconnectNotification(packet) => self.handle_disconnect_notification(pk, &packet).await,
83            Packet::PingRequest(packet) => self.handle_ping_request(pk, &packet).await,
84            Packet::PongResponse(packet) => self.handle_pong_response(pk, &packet).await,
85            Packet::OobSend(packet) => self.handle_oob_send(pk, packet).await,
86            Packet::OobReceive(packet) => self.handle_oob_receive(pk, &packet).await,
87            Packet::OnionRequest(packet) => self.handle_onion_request(pk, packet).await,
88            Packet::OnionResponse(packet) => self.handle_onion_response(pk, &packet).await,
89            Packet::Data(packet) => self.handle_data(pk, packet).await,
90        }
91    }
92    /** Send `OnionResponse` packet to the client by it's `std::net::IpAddr`.
93    */
94    pub async fn handle_udp_onion_response(&self, ip_addr: IpAddr, port: u16, payload: InnerOnionResponse) -> Result<(), Error> {
95        let state = self.state.read().await;
96        if let Some(client) = state.keys_by_addr.get(&(ip_addr, port)).and_then(|pk| state.connected_clients.get(pk)) {
97            client.send_onion_response(payload).await
98        } else {
99            Err(Error::new(ErrorKind::Other, "Cannot find client by ip_addr to send onion response"))
100        }
101    }
102    /** Gracefully shutdown client by pk, IP address and port. IP address and
103    port are used to ensure that the right client will be removed. If the client
104    with passed pk has different IP address or port it means that it was
105    recently reconnected and it shouldn't be removed by the old connection
106    finalization step. If IP address with port are correct remove it from the
107    list of connected clients. If there are any clients mutually linked to
108    current client, we send them corresponding `DisconnectNotification`.
109    */
110    pub async fn shutdown_client(&self, pk: &PublicKey, ip_addr: IpAddr, port: u16) -> Result<(), Error> {
111        let mut state = self.state.write().await;
112
113        // check that the client's address isn't changed
114        if let Some(client) = state.connected_clients.get(pk) {
115            if client.ip_addr() != ip_addr || client.port() != port {
116                return Err(Error::new(ErrorKind::Other, "Client with pk has different address"))
117            }
118        } else {
119            return Err(Error::new(ErrorKind::Other, "Cannot find client by pk to shutdown it"))
120        }
121
122        self.shutdown_client_inner(pk, &mut state).await
123    }
124
125    /** Actual shutdown is done here.
126    */
127    async fn shutdown_client_inner(&self, pk: &PublicKey, state: &mut ServerState) -> Result<(), Error> {
128        // remove client by pk from connected_clients
129        let client_a = if let Some(client) = state.connected_clients.remove(pk) {
130            client
131        } else {
132            return Err(Error::new(
133                ErrorKind::Other,
134                "Cannot find client by pk to shutdown it"
135            ))
136        };
137
138        state.keys_by_addr.remove(&(client_a.ip_addr(), client_a.port()));
139        let links = client_a.links();
140        for link in links.iter_links() {
141            match link.status {
142                LinkStatus::Registered => {
143                    // Current client is not linked in client_b
144                },
145                LinkStatus::Online => {
146                    let client_b_pk = link.pk;
147                    if let Some(client_b) = state.connected_clients.get_mut(&client_b_pk) {
148                        if let Some(a_id_in_client_b) = client_b.links().id_by_pk(pk) {
149                            // they are linked, we should notify client_b
150                            // link from client_b.links should be downgraded
151                            client_b.links_mut().downgrade(a_id_in_client_b);
152
153                            client_b.send_disconnect_notification(
154                                ConnectionId::from_index(a_id_in_client_b)
155                            ).await;
156                        }
157                    }
158                }
159            }
160        }
161
162        Ok(())
163    }
164    // Here start the impl of `handle_***` methods
165
166    async fn handle_route_request(&self, pk: &PublicKey, packet: &RouteRequest) -> Result<(), Error> {
167        let mut state = self.state.write().await;
168
169        // get client_a
170        let client_a =
171            if let Some(client) = state.connected_clients.get_mut(pk) {
172                client
173            } else {
174                return Err(Error::new(ErrorKind::Other, "RouteRequest: no such PK"))
175            };
176
177        if pk == &packet.pk {
178            // send RouteResponse(0) if client requests its own pk
179            return client_a.send_route_response(pk, ConnectionId::zero()).await
180        }
181
182        // check if client_a is already linked
183        if let Some(index) = client_a.links().id_by_pk(&packet.pk) {
184            // send RouteResponse if client was already linked to pk
185            return client_a.send_route_response(&packet.pk, ConnectionId::from_index(index)).await
186        }
187
188        // try to insert a new link
189        let b_id_in_client_a = if let Some(index) = client_a.links_mut().insert(&packet.pk) {
190            index
191        } else {
192            // send RouteResponse(0) if no space to insert new link
193            return client_a.send_route_response(&packet.pk, ConnectionId::zero()).await
194        };
195
196        client_a.send_route_response(&packet.pk, ConnectionId::from_index(b_id_in_client_a)).await?;
197
198        // get client_b
199        let client_b = if let Some(client) = state.connected_clients.get(&packet.pk) {
200            client
201        } else {
202            return Ok(())
203        };
204
205        // check if current pk is linked inside other_client
206        let a_id_in_client_b = if let Some(index) = client_b.links().id_by_pk(pk) {
207            index
208        } else {
209            // they are not linked
210            return Ok(())
211        };
212
213        // they are both linked, send RouteResponse and
214        // send each other ConnectNotification
215        // we don't care if connect notifications fail
216        let client_a = state.connected_clients.get_mut(pk).unwrap();
217        client_a.links_mut().upgrade(b_id_in_client_a);
218        client_a.send_connect_notification(ConnectionId::from_index(b_id_in_client_a)).await;
219
220        let client_b = state.connected_clients.get_mut(&packet.pk).unwrap();
221        client_b.links_mut().upgrade(a_id_in_client_b);
222        client_b.send_connect_notification(ConnectionId::from_index(a_id_in_client_b)).await;
223
224        Ok(())
225    }
226
227    async fn handle_route_response(&self, _pk: &PublicKey, _packet: &RouteResponse) -> Result<(), Error> {
228        Err(Error::new(ErrorKind::Other, "Client must not send RouteResponse to server"))
229    }
230
231    async fn handle_connect_notification(&self, _pk: &PublicKey, _packet: &ConnectNotification) -> Result<(), Error> {
232        // Although normally a client should not send ConnectNotification to server
233        //  we ignore it for backward compatibility
234        Ok(())
235    }
236    async fn handle_disconnect_notification(&self, pk: &PublicKey, packet: &DisconnectNotification) -> Result<(), Error> {
237        let index = if let Some(index) = packet.connection_id.index() {
238            index
239        } else {
240            return Err(Error::new(ErrorKind::Other, "DisconnectNotification: connection id is zero"))
241        };
242
243        let mut state = self.state.write().await;
244
245        // get client_a
246        let a_link = if let Some(client_a) = state.connected_clients.get_mut(pk) {
247            // unlink the link from client.links if any
248            if let Some(link) = client_a.links_mut().take(index) {
249                link
250            } else {
251                trace!("DisconnectNotification.connection_id is not linked for the client {:?}", pk);
252                // There is possibility that the first client disconnected but the second client
253                // haven't received DisconnectNotification yet and have sent yet another packet.
254                // In this case we don't want to throw an error and force disconnect the second client.
255                // TODO: failure can be used to return an error and handle it inside ServerProcessor
256                return Ok(())
257            }
258        } else {
259            return Err(Error::new(ErrorKind::Other, "DisconnectNotification: no such PK"))
260        };
261
262        match a_link.status {
263            LinkStatus::Registered => {
264                // Do nothing because
265                // client_b has not sent RouteRequest yet to connect to client_a
266                Ok(())
267            },
268            LinkStatus::Online => {
269                let client_b_pk = a_link.pk;
270                // get client_b
271                let client_b = if let Some(client) = state.connected_clients.get_mut(&client_b_pk) {
272                    client
273                } else  {
274                    // client_b is not connected to the server
275                    // so ignore DisconnectNotification
276                    return Ok(())
277                };
278                let a_id_in_client_b = if let Some(id) = client_b.links().id_by_pk(pk) {
279                    id
280                } else {
281                    // No a_id_in_client_b
282                    return Ok(())
283                };
284                // it is linked, we should notify client_b
285                // link from client_b.links should be downgraded
286                client_b.links_mut().downgrade(a_id_in_client_b);
287                client_b.send_disconnect_notification(ConnectionId::from_index(a_id_in_client_b)).await;
288                Ok(())
289            }
290        }
291    }
292
293    async fn handle_ping_request(&self, pk: &PublicKey, packet: &PingRequest) -> Result<(), Error> {
294        if packet.ping_id == 0 {
295            return Err(Error::new(ErrorKind::Other, "PingRequest.ping_id == 0"))
296        }
297        let state = self.state.read().await;
298        if let Some(client_a) = state.connected_clients.get(pk) {
299            client_a.send_pong_response(packet.ping_id).await
300        } else {
301            Err(Error::new(ErrorKind::Other, "PingRequest: no such PK"))
302        }
303    }
304
305    async fn handle_pong_response(&self, pk: &PublicKey, packet: &PongResponse) -> Result<(), Error> {
306        if packet.ping_id == 0 {
307            return Err(
308                Error::new(ErrorKind::Other,
309                    "PongResponse.ping_id == 0"
310            ))
311        }
312        let mut state = self.state.write().await;
313        if let Some(client_a) = state.connected_clients.get_mut(pk) {
314            if packet.ping_id == client_a.ping_id() {
315                client_a.set_last_pong_resp(Instant::now());
316
317                Ok(())
318            } else {
319                Err(Error::new(ErrorKind::Other, "PongResponse.ping_id does not match"))
320            }
321        } else {
322            Err(Error::new(ErrorKind::Other, "PongResponse: no such PK"))
323        }
324    }
325
326    async fn handle_oob_send(&self, pk: &PublicKey, packet: OobSend) -> Result<(), Error> {
327        if packet.data.is_empty() || packet.data.len() > 1024 {
328            return Err(Error::new(ErrorKind::Other, "OobSend wrong data length"))
329        }
330        let state = self.state.read().await;
331        if let Some(client_b) = state.connected_clients.get(&packet.destination_pk) {
332            client_b.send_oob(pk, packet.data).await;
333        }
334
335        Ok(())
336    }
337
338    async fn handle_oob_receive(&self, _pk: &PublicKey, _packet: &OobReceive) -> Result<(), Error> {
339        Err(Error::new(ErrorKind::Other, "Client must not send OobReceive to server"))
340    }
341
342    async fn handle_onion_request(&self, pk: &PublicKey, packet: OnionRequest) -> Result<(), Error> {
343        if let Some(ref onion_sink) = self.onion_sink {
344            let state = self.state.read().await;
345            if let Some(client) = state.connected_clients.get(&pk) {
346                let saddr = SocketAddr::new(client.ip_addr(), client.port());
347                let mut tx = onion_sink.clone();
348                tx // clone sink for 1 send only
349                    .send((packet, saddr)).await
350                    .map_err(|_| {
351                        // This may only happen if sink is gone
352                        // So cast SendError<T> to a corresponding std::io::Error
353                        Error::from(ErrorKind::UnexpectedEof)
354                    })
355            } else {
356               Err(Error::new(ErrorKind::Other, "OnionRequest: no such PK"))
357            }
358        } else {
359            // Ignore OnionRequest as the server is not connected to onion subsystem
360            Ok(())
361        }
362    }
363
364    async fn handle_onion_response(&self, _pk: &PublicKey, _packet: &OnionResponse) -> Result<(), Error> {
365        Err(Error::new(ErrorKind::Other, "Client must not send OnionResponse to server"))
366    }
367
368    async fn handle_data(&self, pk: &PublicKey, packet: Data) -> Result<(), Error> {
369        let index = if let Some(index) = packet.connection_id.index() {
370            index
371        } else {
372            return Err(Error::new(ErrorKind::Other, "Data: connection id is zero"))
373        };
374
375        let state = self.state.read().await;
376
377        // get client_a
378        let client_a = if let Some(client) = state.connected_clients.get(pk) {
379            client
380        } else  {
381            return Err(Error::new(ErrorKind::Other, "Data: no such PK"));
382        };
383
384        // get the link from client.links if any
385        let a_link = if let Some(link) = client_a.links().by_id(index) {
386            *link
387        } else {
388            trace!("Data.connection_id is not linked for the client {:?}", pk);
389            // There is possibility that the first client disconnected but the second client
390            // haven't received DisconnectNotification yet and have sent yet another packet.
391            // In this case we don't want to throw an error and force disconnect the second client.
392            // TODO: failure can be used to return an error and handle it inside ServerProcessor
393            return Ok(())
394        };
395
396        match a_link.status {
397            LinkStatus::Registered => {
398                // Do nothing because
399                // client_b has not sent RouteRequest yet to connect to client_a
400                Ok(())
401            },
402            LinkStatus::Online => {
403                let client_b_pk = a_link.pk;
404                // get client_b
405                let client_b = if let Some(client) = state.connected_clients.get(&client_b_pk) {
406                    client
407                } else  {
408                    // Do nothing because client_b is not connected to server
409                    return Ok(())
410                };
411                let a_id_in_client_b = if let Some(id) = client_b.links().id_by_pk(pk) {
412                    id
413                } else {
414                    // No a_id_in_client_b
415                    return Ok(())
416                };
417                // it is linked, we should send data to client_b
418                client_b.send_data(ConnectionId::from_index(a_id_in_client_b), packet.data).await
419            }
420        }
421    }
422
423    /** Remove timedout connected clients
424    */
425    async fn remove_timedout_clients(&self, state: &mut ServerState) -> Result<(), Error> {
426        let keys = state.connected_clients.iter()
427            .filter(|(_key, client)| client.is_pong_timedout())
428            .map(|(key, _client)| *key)
429            .collect::<Vec<PublicKey>>();
430
431        for key in keys {
432            // failure in removing one client should not affect other clients
433            self.shutdown_client_inner(&key, state).await.ok();
434        }
435
436        Ok(())
437    }
438
439    /** Send pings to all connected clients and terminate all timed out clients.
440    */
441    pub async fn send_pings(&self) -> Result<(), Error> {
442        let mut state = self.state.write().await;
443
444        self.remove_timedout_clients(&mut state).await?;
445
446        for client in state.connected_clients.values_mut() {
447            if client.is_ping_interval_passed() {
448                // failure in ping sending for one client should not affect other clients
449                client.send_ping_request().await.ok();
450            }
451        }
452
453        Ok(())
454    }
455}
456
457#[cfg(test)]
458mod tests {
459    use super::*;
460
461    use tox_packet::dht::CryptoData;
462    use tox_packet::ip_port::*;
463    use tox_packet::onion::*;
464    use crate::relay::server::{Client, Server};
465    use crate::relay::server::client::*;
466
467    use futures::channel::mpsc;
468    use futures::StreamExt;
469    use std::net::{IpAddr, Ipv4Addr, SocketAddr};
470    use std::time::Duration;
471
472    use crate::time::*;
473
474    #[tokio::test]
475    async fn server_is_clonable() {
476        crypto_init().unwrap();
477        let server = Server::new();
478        let (client_1, _rx_1) = create_random_client("1.2.3.4:12345".parse().unwrap());
479        server.insert(client_1).await.unwrap();
480        let _cloned = server.clone();
481        // that's all.
482    }
483
484    /// A function that generates random keypair, random `std::net::IpAddr`,
485    /// random port, creates mpsc channel and returns created with them Client
486    fn create_random_client(saddr: SocketAddr) -> (Client, mpsc::Receiver<Packet>) {
487        crypto_init().unwrap();
488        let (client_pk, _) = gen_keypair();
489        let (tx, rx) = mpsc::channel(32);
490        let client = Client::new(tx, &client_pk, saddr.ip(), saddr.port());
491        (client, rx)
492    }
493
494    #[tokio::test]
495    async fn normal_communication_scenario() {
496        let server = Server::new();
497
498        let (client_1, rx_1) = create_random_client("1.2.3.4:12345".parse().unwrap());
499        let client_pk_1 = client_1.pk();
500        let client_ip_addr_1 = client_1.ip_addr();
501        let client_port_1 = client_1.port();
502
503        // client 1 connects to the server
504        server.insert(client_1).await.unwrap();
505
506        let (client_2, rx_2) = create_random_client("1.2.3.5:12345".parse().unwrap());
507        let client_pk_2 = client_2.pk();
508
509        // emulate send RouteRequest from client_1
510        server.handle_packet(&client_pk_1, Packet::RouteRequest(
511            RouteRequest { pk: client_pk_2 }
512        )).await.unwrap();
513
514        // the server should put RouteResponse into rx_1
515        let (packet, rx_1) = rx_1.into_future().await;
516        assert_eq!(packet.unwrap(), Packet::RouteResponse(
517            RouteResponse { pk: client_pk_2, connection_id: ConnectionId::from_index(0) }
518        ));
519
520        {
521            // check links
522            let state = server.state.read().await;
523
524            // check client_1.links[client_2] == Registered
525            let client_a = &state.connected_clients[&client_pk_1];
526            let link_id = client_a.links().id_by_pk(&client_pk_2).unwrap();
527            assert_eq!(client_a.links().by_id(link_id).unwrap().status, LinkStatus::Registered);
528        }
529
530        // client 2 connects to the server
531        server.insert(client_2).await.unwrap();
532
533        // emulate send RouteRequest from client_1 again
534        server.handle_packet(&client_pk_1, Packet::RouteRequest(
535            RouteRequest { pk: client_pk_2 }
536        )).await.unwrap();
537
538        // the server should put RouteResponse into rx_1
539        let (packet, rx_1) = rx_1.into_future().await;
540        assert_eq!(packet.unwrap(), Packet::RouteResponse(
541            RouteResponse { pk: client_pk_2, connection_id: ConnectionId::from_index(0) }
542        ));
543
544        {
545            // check links
546            let state = server.state.read().await;
547
548            // check client_2.links[client_1] == None
549            let client_b = &state.connected_clients[&client_pk_2];
550            assert!(client_b.links().id_by_pk(&client_pk_1).is_none());
551        }
552
553        // emulate send RouteRequest from client_2
554        server.handle_packet(&client_pk_2, Packet::RouteRequest(
555            RouteRequest { pk: client_pk_1 }
556        )).await.unwrap();
557
558        // the server should put RouteResponse into rx_2
559        let (packet, rx_2) = rx_2.into_future().await;
560        assert_eq!(packet.unwrap(), Packet::RouteResponse(
561            RouteResponse { pk: client_pk_1, connection_id: ConnectionId::from_index(0) }
562        ));
563        // AND
564        // the server should put ConnectNotification into rx_1
565        let (packet, _rx_1) = rx_1.into_future().await;
566        assert_eq!(packet.unwrap(), Packet::ConnectNotification(
567            ConnectNotification { connection_id: ConnectionId::from_index(0) }
568        ));
569        // AND
570        // the server should put ConnectNotification into rx_2
571        let (packet, rx_2) = rx_2.into_future().await;
572        assert_eq!(packet.unwrap(), Packet::ConnectNotification(
573            ConnectNotification { connection_id: ConnectionId::from_index(0) }
574        ));
575
576        {
577            // check links
578            let state = server.state.read().await;
579
580            // check client_1.links[client_2] == Online
581            let client_a = &state.connected_clients[&client_pk_1];
582            let link_id = client_a.links().id_by_pk(&client_pk_2).unwrap();
583            assert_eq!(client_a.links().by_id(link_id).unwrap().status, LinkStatus::Online);
584
585            // check client_2.links[client_1] == Online
586            let client_b = &state.connected_clients[&client_pk_2];
587            let link_id = client_b.links().id_by_pk(&client_pk_1).unwrap();
588            assert_eq!(client_a.links().by_id(link_id).unwrap().status, LinkStatus::Online);
589        }
590
591        // emulate send Data from client_1
592        server.handle_packet(&client_pk_1, Packet::Data(
593            Data {
594                connection_id: ConnectionId::from_index(0),
595                data: DataPayload::CryptoData(CryptoData {
596                    nonce_last_bytes: 42,
597                    payload: vec![42; 123],
598                }),
599            }
600        )).await.unwrap();
601
602        // the server should put Data into rx_2
603        let (packet, rx_2) = rx_2.into_future().await;
604        assert_eq!(packet.unwrap(), Packet::Data(
605            Data {
606                connection_id: ConnectionId::from_index(0),
607                data: DataPayload::CryptoData(CryptoData {
608                    nonce_last_bytes: 42,
609                    payload: vec![42; 123],
610                }),
611            }
612        ));
613
614        // emulate client_1 disconnected
615        server.shutdown_client(&client_pk_1, client_ip_addr_1, client_port_1).await.unwrap();
616        // the server should put DisconnectNotification into rx_2
617        let (packet, _rx_2) = rx_2.into_future().await;
618        assert_eq!(packet.unwrap(), Packet::DisconnectNotification(
619            DisconnectNotification { connection_id: ConnectionId::from_index(0) }
620        ));
621
622        // check client_2.links[client_1] == Registered
623        let state = server.state.read().await;
624        let client_b = &state.connected_clients[&client_pk_2];
625        assert_eq!(client_b.links().by_id(0).unwrap().status, LinkStatus::Registered);
626    }
627    #[tokio::test]
628    async fn handle_route_request() {
629        let server = Server::new();
630
631        let (client_1, rx_1) = create_random_client("1.2.3.4:12345".parse().unwrap());
632        let client_pk_1 = client_1.pk();
633        server.insert(client_1).await.unwrap();
634
635        let (client_2, _rx_2) = create_random_client("1.2.3.5:12345".parse().unwrap());
636        let client_pk_2 = client_2.pk();
637        server.insert(client_2).await.unwrap();
638
639        // emulate send RouteRequest from client_1
640        server.handle_packet(&client_pk_1, Packet::RouteRequest(
641            RouteRequest { pk: client_pk_2 }
642        )).await.unwrap();
643
644        // the server should put RouteResponse into rx_1
645        let (packet, _rx_1) = rx_1.into_future().await;
646        assert_eq!(packet.unwrap(), Packet::RouteResponse(
647            RouteResponse { pk: client_pk_2, connection_id: ConnectionId::from_index(0) }
648        ));
649
650        {
651            // check links
652            let state = server.state.read().await;
653
654            // check client_1.links[client_2] == Registered
655            let client_a = &state.connected_clients[&client_pk_1];
656            let link_id = client_a.links().id_by_pk(&client_pk_2).unwrap();
657            assert_eq!(client_a.links().by_id(link_id).unwrap().status, LinkStatus::Registered);
658
659            // check client_2.links[client_1] == None
660            let client_b = &state.connected_clients[&client_pk_2];
661            assert!(client_b.links().id_by_pk(&client_pk_1).is_none());
662        }
663    }
664    #[tokio::test]
665    async fn handle_route_request_to_itself() {
666        let server = Server::new();
667
668        let (client_1, rx_1) = create_random_client("1.2.3.4:12345".parse().unwrap());
669        let client_pk_1 = client_1.pk();
670        server.insert(client_1).await.unwrap();
671
672        // emulate send RouteRequest from client_1
673        server.handle_packet(&client_pk_1, Packet::RouteRequest(
674            RouteRequest { pk: client_pk_1 }
675        )).await.unwrap();
676
677        // the server should put RouteResponse into rx_1
678        let (packet, _rx_1) = rx_1.into_future().await;
679        assert_eq!(packet.unwrap(), Packet::RouteResponse(
680            RouteResponse { pk: client_pk_1, connection_id: ConnectionId::zero() }
681        ));
682    }
683    #[tokio::test]
684    async fn handle_route_request_too_many_connections() {
685        let server = Server::new();
686
687        let (client_1, mut rx_1) = create_random_client("1.2.3.4:12345".parse().unwrap());
688        let client_pk_1 = client_1.pk();
689        server.insert(client_1).await.unwrap();
690
691        // send 240 RouteRequest
692        for i in 0..240 {
693            let saddr = SocketAddr::new("1.2.3.4".parse().unwrap(), 12346 + u16::from(i));
694            let (other_client, _other_rx) = create_random_client(saddr);
695            let other_client_pk = other_client.pk();
696            server.insert(other_client).await.unwrap();
697
698            // emulate send RouteRequest from client_1
699            server.handle_packet(&client_pk_1, Packet::RouteRequest(
700                RouteRequest { pk: other_client_pk }
701            )).await.unwrap();
702
703            // the server should put RouteResponse into rx_1
704            let (packet, rx_1_nested) = rx_1.into_future().await;
705            assert_eq!(packet.unwrap(), Packet::RouteResponse(
706                RouteResponse { pk: other_client_pk, connection_id: ConnectionId::from_index(i) }
707            ));
708            rx_1 = rx_1_nested;
709        }
710        // and send one more again
711        let (other_client, _other_rx) = create_random_client("1.2.3.5:12345".parse().unwrap());
712        let other_client_pk = other_client.pk();
713        server.insert(other_client).await.unwrap();
714        // emulate send RouteRequest from client_1
715        server.handle_packet(&client_pk_1, Packet::RouteRequest(
716            RouteRequest { pk: other_client_pk }
717        )).await.unwrap();
718
719        // the server should put RouteResponse into rx_1
720        let (packet, _rx_1) = rx_1.into_future().await;
721        assert_eq!(packet.unwrap(), Packet::RouteResponse(
722            RouteResponse { pk: other_client_pk, connection_id: ConnectionId::zero() }
723        ));
724    }
725    #[tokio::test]
726    async fn handle_connect_notification() {
727        let server = Server::new();
728
729        let (client_1, _rx_1) = create_random_client("1.2.3.4:12345".parse().unwrap());
730        let client_pk_1 = client_1.pk();
731        server.insert(client_1).await.unwrap();
732
733        // emulate send ConnectNotification from client_1
734        let handle_res = server.handle_packet(&client_pk_1, Packet::ConnectNotification(
735            ConnectNotification { connection_id: ConnectionId::from_index(42) }
736        )).await;
737        assert!(handle_res.is_ok());
738    }
739    #[tokio::test]
740    async fn handle_disconnect_notification() {
741        let server = Server::new();
742
743        let (client_1, rx_1) = create_random_client("1.2.3.4:12345".parse().unwrap());
744        let client_pk_1 = client_1.pk();
745        server.insert(client_1).await.unwrap();
746
747        let (client_2, rx_2) = create_random_client("1.2.3.5:12345".parse().unwrap());
748        let client_pk_2 = client_2.pk();
749        server.insert(client_2).await.unwrap();
750
751        // emulate send RouteRequest from client_1
752        server.handle_packet(&client_pk_1, Packet::RouteRequest(
753            RouteRequest { pk: client_pk_2 }
754        )).await.unwrap();
755
756        // the server should put RouteResponse into rx_1
757        let (packet, rx_1) = rx_1.into_future().await;
758        assert_eq!(packet.unwrap(), Packet::RouteResponse(
759            RouteResponse { pk: client_pk_2, connection_id: ConnectionId::from_index(0) }
760        ));
761
762        // emulate send RouteRequest from client_2
763        server.handle_packet(&client_pk_2, Packet::RouteRequest(
764            RouteRequest { pk: client_pk_1 }
765        )).await.unwrap();
766
767        // the server should put RouteResponse into rx_2
768        let (packet, rx_2) = rx_2.into_future().await;
769        assert_eq!(packet.unwrap(), Packet::RouteResponse(
770            RouteResponse { pk: client_pk_1, connection_id: ConnectionId::from_index(0) }
771        ));
772        // AND
773        // the server should put ConnectNotification into rx_1
774        let (packet, rx_1) = rx_1.into_future().await;
775        assert_eq!(packet.unwrap(), Packet::ConnectNotification(
776            ConnectNotification { connection_id: ConnectionId::from_index(0) }
777        ));
778        // AND
779        // the server should put ConnectNotification into rx_2
780        let (packet, rx_2) = rx_2.into_future().await;
781        assert_eq!(packet.unwrap(), Packet::ConnectNotification(
782            ConnectNotification { connection_id: ConnectionId::from_index(0) }
783        ));
784
785        {
786            // check links
787            let state = server.state.read().await;
788
789            // check client_1.links[client_2] == Online
790            let client_a = &state.connected_clients[&client_pk_1];
791            let link_id = client_a.links().id_by_pk(&client_pk_2).unwrap();
792            assert_eq!(client_a.links().by_id(link_id).unwrap().status, LinkStatus::Online);
793
794            // check client_2.links[client_1] == Online
795            let client_b = &state.connected_clients[&client_pk_2];
796            let link_id = client_b.links().id_by_pk(&client_pk_1).unwrap();
797            assert_eq!(client_a.links().by_id(link_id).unwrap().status, LinkStatus::Online);
798        }
799
800        // emulate send DisconnectNotification from client_1
801        server.handle_packet(&client_pk_1, Packet::DisconnectNotification(
802            DisconnectNotification { connection_id: ConnectionId::from_index(0) }
803        )).await.unwrap();
804
805        // the server should put DisconnectNotification into rx_2
806        let (packet, _rx_2) = rx_2.into_future().await;
807        assert_eq!(packet.unwrap(), Packet::DisconnectNotification(
808            DisconnectNotification { connection_id: ConnectionId::from_index(0) }
809        ));
810
811        {
812            // check links
813            let state = server.state.read().await;
814
815            // check client_1.links[client_2] == None
816            let client_a = &state.connected_clients[&client_pk_1];
817            assert!(client_a.links().id_by_pk(&client_pk_2).is_none());
818
819            // check client_2.links[client_1] == Registered
820            let client_b = &state.connected_clients[&client_pk_2];
821            let link_id = client_b.links().id_by_pk(&client_pk_1).unwrap();
822            assert_eq!(client_b.links().by_id(link_id).unwrap().status, LinkStatus::Registered);
823        }
824
825        // emulate send DisconnectNotification from client_2
826        server.handle_packet(&client_pk_2, Packet::DisconnectNotification(
827            DisconnectNotification { connection_id: ConnectionId::from_index(0) }
828        )).await.unwrap();
829
830        {
831            // check links
832            let state = server.state.read().await;
833
834            // check client_2.links[client_1] == None
835            let client_b = &state.connected_clients[&client_pk_2];
836            assert!(client_b.links().id_by_pk(&client_pk_2).is_none());
837        }
838
839        // check that DisconnectNotification from client_2 did not put anything in client1.rx
840        // necessary to drop server so that rx.collect() can be finished
841        drop(server);
842        assert!(rx_1.collect::<Vec<_>>().await.is_empty());
843    }
844    #[tokio::test]
845    async fn handle_disconnect_notification_other_not_linked() {
846        let server = Server::new();
847
848        let (client_1, _rx_1) = create_random_client("1.2.3.4:12345".parse().unwrap());
849        let client_pk_1 = client_1.pk();
850        server.insert(client_1).await.unwrap();
851
852        let (client_2, rx_2) = create_random_client("1.2.3.5:12345".parse().unwrap());
853        let client_pk_2 = client_2.pk();
854        server.insert(client_2).await.unwrap();
855
856        // emulate send RouteRequest from client_1
857        server.handle_packet(&client_pk_1, Packet::RouteRequest(
858            RouteRequest { pk: client_pk_2 }
859        )).await.unwrap();
860
861        // emulate send DisconnectNotification from client_1
862        let handle_res = server.handle_packet(&client_pk_1, Packet::DisconnectNotification(
863            DisconnectNotification { connection_id: ConnectionId::from_index(0) }
864        )).await;
865        assert!(handle_res.is_ok());
866
867        // check that packets from client_1 did not put anything in client2.rx
868        // necessary to drop server so that rx.collect() can be finished
869        drop(server);
870        assert!(rx_2.collect::<Vec<_>>().await.is_empty());
871    }
872    #[tokio::test]
873    async fn handle_disconnect_notification_0() {
874        crypto_init().unwrap();
875        let server = Server::new();
876
877        let (client_pk, _) = gen_keypair();
878
879        let handle_res = server.handle_packet(&client_pk, Packet::DisconnectNotification(
880            DisconnectNotification { connection_id: ConnectionId::zero() }
881        )).await;
882        assert!(handle_res.is_err());
883    }
884    #[tokio::test]
885    async fn handle_ping_request() {
886        let server = Server::new();
887
888        let (client_1, rx_1) = create_random_client("1.2.3.4:12345".parse().unwrap());
889        let client_pk_1 = client_1.pk();
890        server.insert(client_1).await.unwrap();
891
892        // emulate send PingRequest from client_1
893        server.handle_packet(&client_pk_1, Packet::PingRequest(
894            PingRequest { ping_id: 42 }
895        )).await.unwrap();
896
897        // the server should put PongResponse into rx_1
898        let (packet, _rx_1) = rx_1.into_future().await;
899        assert_eq!(packet.unwrap(), Packet::PongResponse(
900            PongResponse { ping_id: 42 }
901        ));
902    }
903    #[tokio::test]
904    async fn handle_oob_send() {
905        let server = Server::new();
906
907        let (client_1, _rx_1) = create_random_client("1.2.3.4:12345".parse().unwrap());
908        let client_pk_1 = client_1.pk();
909        server.insert(client_1).await.unwrap();
910
911        let (client_2, rx_2) = create_random_client("1.2.3.5:12345".parse().unwrap());
912        let client_pk_2 = client_2.pk();
913        server.insert(client_2).await.unwrap();
914
915        // emulate send OobSend from client_1
916        server.handle_packet(&client_pk_1, Packet::OobSend(
917            OobSend { destination_pk: client_pk_2, data: vec![13; 1024] }
918        )).await.unwrap();
919
920        // the server should put OobReceive into rx_2
921        let (packet, _rx_2) = rx_2.into_future().await;
922        assert_eq!(packet.unwrap(), Packet::OobReceive(
923            OobReceive { sender_pk: client_pk_1, data: vec![13; 1024] }
924        ));
925    }
926    #[tokio::test]
927    async fn handle_onion_request() {
928        crypto_init().unwrap();
929        let (udp_onion_sink, udp_onion_stream) = mpsc::channel(1);
930        let mut server = Server::new();
931        server.set_udp_onion_sink(udp_onion_sink);
932
933        let (client_1, _rx_1) = create_random_client("1.2.3.4:12345".parse().unwrap());
934        let client_pk_1 = client_1.pk();
935        let client_addr_1 = client_1.ip_addr();
936        let client_port_1 = client_1.port();
937        server.insert(client_1).await.unwrap();
938
939        let request = OnionRequest {
940            nonce: gen_nonce(),
941            ip_port: IpPort {
942                protocol: ProtocolType::TCP,
943                ip_addr: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
944                port: 12345,
945            },
946            temporary_pk: gen_keypair().0,
947            payload: vec![13; 170]
948        };
949        let handle_res = server
950            .handle_packet(&client_pk_1, Packet::OnionRequest(request.clone()))
951            .await;
952        assert!(handle_res.is_ok());
953
954        let (packet, _) = udp_onion_stream.into_future().await;
955        let (packet, saddr) = packet.unwrap();
956
957        assert_eq!(saddr.ip(), client_addr_1);
958        assert_eq!(saddr.port(), client_port_1);
959        assert_eq!(packet, request);
960    }
961    #[tokio::test]
962    async fn handle_udp_onion_response() {
963        let server = Server::new();
964
965        let (client_1, rx_1) = create_random_client("1.2.3.4:12345".parse().unwrap());
966        let client_addr_1 = client_1.ip_addr();
967        let client_port_1 = client_1.port();
968        server.insert(client_1).await.unwrap();
969
970        let payload = InnerOnionResponse::OnionAnnounceResponse(OnionAnnounceResponse {
971            sendback_data: 12345,
972            nonce: gen_nonce(),
973            payload: vec![42; 123]
974        });
975        let handle_res = server
976            .handle_udp_onion_response(client_addr_1, client_port_1, payload.clone())
977            .await;
978        assert!(handle_res.is_ok());
979
980        let (packet, _) = rx_1.into_future().await;
981        assert_eq!(packet.unwrap(), Packet::OnionResponse(
982            OnionResponse { payload }
983        ));
984    }
985    #[tokio::test]
986    async fn insert_with_same_pk() {
987        let server = Server::new();
988
989        let (mut client_1, _rx_1) = create_random_client("1.2.3.4:12345".parse().unwrap());
990        let (mut client_2, rx_2) = create_random_client("1.2.3.4:12346".parse().unwrap());
991
992        // link client_1 with client_2
993        let index_1 = client_1.links_mut().insert(&client_2.pk()).unwrap();
994        assert!(client_1.links_mut().upgrade(index_1));
995        let index_2 = client_2.links_mut().insert(&client_1.pk()).unwrap();
996        assert!(client_2.links_mut().upgrade(index_2));
997
998        let client_pk_1 = client_1.pk();
999        let client_addr_3 = "1.2.3.4".parse().unwrap();
1000        let client_port_3 = 12347;
1001        let (tx_3, _rx_3) = mpsc::channel(32);
1002        let client_3 = Client::new(tx_3, &client_pk_1, client_addr_3, client_port_3);
1003
1004        server.insert(client_1).await.unwrap();
1005        server.insert(client_2).await.unwrap();
1006
1007        // replace client_1 with client_3
1008        server.insert(client_3).await.unwrap();
1009
1010        let (packet, _) = rx_2.into_future().await;
1011        assert_eq!(packet.unwrap(), Packet::DisconnectNotification(
1012            DisconnectNotification { connection_id: ConnectionId::from_index(index_2) }
1013        ));
1014
1015        let state = server.state.read().await;
1016        let client = &state.connected_clients[&client_pk_1];
1017
1018        assert_eq!(client.ip_addr(), client_addr_3);
1019        assert_eq!(client.port(), client_port_3);
1020    }
1021    #[tokio::test]
1022    async fn shutdown_other_not_linked() {
1023        let server = Server::new();
1024
1025        let (client_1, rx_1) = create_random_client("1.2.3.4:12345".parse().unwrap());
1026        let client_pk_1 = client_1.pk();
1027        let client_ip_addr_1 = client_1.ip_addr();
1028        let client_port_1 = client_1.port();
1029        server.insert(client_1).await.unwrap();
1030
1031        let (client_2, _rx_2) = create_random_client("1.2.3.5:12345".parse().unwrap());
1032        let client_pk_2 = client_2.pk();
1033        server.insert(client_2).await.unwrap();
1034
1035        // emulate send RouteRequest from client_1
1036        server.handle_packet(&client_pk_1, Packet::RouteRequest(
1037            RouteRequest { pk: client_pk_2 }
1038        )).await.unwrap();
1039
1040        // the server should put RouteResponse into rx_1
1041        let (packet, _rx_1) = rx_1.into_future().await;
1042        assert_eq!(packet.unwrap(), Packet::RouteResponse(
1043            RouteResponse { pk: client_pk_2, connection_id: ConnectionId::from_index(0) }
1044        ));
1045
1046        // emulate shutdown
1047        let handle_res = server.shutdown_client(&client_pk_1, client_ip_addr_1, client_port_1).await;
1048        assert!(handle_res.is_ok());
1049    }
1050    #[tokio::test]
1051    async fn handle_data_other_not_linked() {
1052        let server = Server::new();
1053
1054        let (client_1, rx_1) = create_random_client("1.2.3.4:12345".parse().unwrap());
1055        let client_pk_1 = client_1.pk();
1056        server.insert(client_1).await.unwrap();
1057
1058        let (client_2, _rx_2) = create_random_client("1.2.3.5:12345".parse().unwrap());
1059        let client_pk_2 = client_2.pk();
1060        server.insert(client_2).await.unwrap();
1061
1062        // emulate send RouteRequest from client_1
1063        server.handle_packet(&client_pk_1, Packet::RouteRequest(
1064            RouteRequest { pk: client_pk_2 }
1065        )).await.unwrap();
1066
1067        // the server should put RouteResponse into rx_1
1068        let (packet, _rx_1) = rx_1.into_future().await;
1069        assert_eq!(packet.unwrap(), Packet::RouteResponse(
1070            RouteResponse { pk: client_pk_2, connection_id: ConnectionId::from_index(0) }
1071        ));
1072
1073        // emulate send Data from client_1
1074        let handle_res = server.handle_packet(&client_pk_1, Packet::Data(
1075            Data {
1076                connection_id: ConnectionId::from_index(0),
1077                data: DataPayload::CryptoData(CryptoData {
1078                    nonce_last_bytes: 42,
1079                    payload: vec![42; 123],
1080                }),
1081            }
1082        )).await;
1083        assert!(handle_res.is_ok());
1084    }
1085    #[tokio::test]
1086    async fn handle_data_0() {
1087        crypto_init().unwrap();
1088        let server = Server::new();
1089
1090        let (client_pk, _) = gen_keypair();
1091
1092        let handle_res = server.handle_packet(&client_pk, Packet::Data(
1093            Data {
1094                connection_id: ConnectionId::zero(),
1095                data: DataPayload::CryptoData(CryptoData {
1096                    nonce_last_bytes: 42,
1097                    payload: vec![42; 123],
1098                }),
1099            }
1100        )).await;
1101        assert!(handle_res.is_err());
1102    }
1103
1104    ////////////////////////////////////////////////////////////////////////////////////////
1105    // Here be all handle_* tests with wrong args
1106    #[tokio::test]
1107    async fn handle_route_response() {
1108        let server = Server::new();
1109
1110        let (client_1, _rx_1) = create_random_client("1.2.3.4:12345".parse().unwrap());
1111        let client_pk_1 = client_1.pk();
1112        server.insert(client_1).await.unwrap();
1113
1114        // emulate send RouteResponse from client_1
1115        let handle_res = server.handle_packet(&client_pk_1, Packet::RouteResponse(
1116            RouteResponse { pk: client_pk_1, connection_id: ConnectionId::from_index(42) }
1117        )).await;
1118        assert!(handle_res.is_err());
1119    }
1120    #[tokio::test]
1121    async fn handle_disconnect_notification_not_linked() {
1122        let server = Server::new();
1123
1124        let (client_1, rx_1) = create_random_client("1.2.3.4:12345".parse().unwrap());
1125        let client_pk_1 = client_1.pk();
1126        server.insert(client_1).await.unwrap();
1127
1128        // emulate send DisconnectNotification from client_1
1129        let handle_res = server.handle_packet(&client_pk_1, Packet::DisconnectNotification(
1130            DisconnectNotification { connection_id: ConnectionId::from_index(0) }
1131        )).await;
1132        assert!(handle_res.is_ok());
1133
1134        // Necessary to drop tx so that rx.collect() can be finished
1135        drop(server);
1136
1137        assert!(rx_1.collect::<Vec<_>>().await.is_empty());
1138    }
1139    #[tokio::test]
1140    async fn handle_ping_request_0() {
1141        let server = Server::new();
1142
1143        let (client_1, _rx_1) = create_random_client("1.2.3.4:12345".parse().unwrap());
1144        let client_pk_1 = client_1.pk();
1145        server.insert(client_1).await.unwrap();
1146
1147        // emulate send PingRequest from client_1
1148        let handle_res = server.handle_packet(&client_pk_1, Packet::PingRequest(
1149            PingRequest { ping_id: 0 }
1150        )).await;
1151        assert!(handle_res.is_err());
1152    }
1153    #[tokio::test]
1154    async fn handle_pong_response_0() {
1155        let server = Server::new();
1156
1157        let (client_1, _rx_1) = create_random_client("1.2.3.4:12345".parse().unwrap());
1158        let client_pk_1 = client_1.pk();
1159        server.insert(client_1).await.unwrap();
1160
1161        // emulate send PongResponse from client_1
1162        let handle_res = server.handle_packet(&client_pk_1, Packet::PongResponse(
1163            PongResponse { ping_id: 0 }
1164        )).await;
1165        assert!(handle_res.is_err());
1166    }
1167    #[tokio::test]
1168    async fn handle_oob_send_empty_data() {
1169        let server = Server::new();
1170
1171        let (client_1, _rx_1) = create_random_client("1.2.3.4:12345".parse().unwrap());
1172        let client_pk_1 = client_1.pk();
1173        server.insert(client_1).await.unwrap();
1174
1175        let (client_2, _rx_2) = create_random_client("1.2.3.5:12345".parse().unwrap());
1176        let client_pk_2 = client_2.pk();
1177        server.insert(client_2).await.unwrap();
1178
1179        // emulate send OobSend from client_1
1180        let handle_res = server.handle_packet(&client_pk_1, Packet::OobSend(
1181            OobSend { destination_pk: client_pk_2, data: vec![] }
1182        )).await;
1183        assert!(handle_res.is_err());
1184    }
1185    #[tokio::test]
1186    async fn handle_data_self_not_linked() {
1187        let server = Server::new();
1188
1189        let (client_1, rx_1) = create_random_client("1.2.3.4:12345".parse().unwrap());
1190        let client_pk_1 = client_1.pk();
1191        server.insert(client_1).await.unwrap();
1192
1193        // emulate send Data from client_1
1194        let handle_res = server.handle_packet(&client_pk_1, Packet::Data(
1195            Data {
1196                connection_id: ConnectionId::from_index(0),
1197                data: DataPayload::CryptoData(CryptoData {
1198                    nonce_last_bytes: 42,
1199                    payload: vec![42; 123],
1200                }),
1201            }
1202        )).await;
1203        assert!(handle_res.is_ok());
1204
1205        // Necessary to drop tx so that rx.collect() can be finished
1206        drop(server);
1207
1208        assert!(rx_1.collect::<Vec<_>>().await.is_empty());
1209    }
1210    #[tokio::test]
1211    async fn handle_oob_send_to_loooong_data() {
1212        let server = Server::new();
1213
1214        let (client_1, _rx_1) = create_random_client("1.2.3.4:12345".parse().unwrap());
1215        let client_pk_1 = client_1.pk();
1216        server.insert(client_1).await.unwrap();
1217
1218        let (client_2, _rx_2) = create_random_client("1.2.3.5:12345".parse().unwrap());
1219        let client_pk_2 = client_2.pk();
1220        server.insert(client_2).await.unwrap();
1221
1222        // emulate send OobSend from client_1
1223        let handle_res = server.handle_packet(&client_pk_1, Packet::OobSend(
1224            OobSend { destination_pk: client_pk_2, data: vec![42; 1024 + 1] }
1225        )).await;
1226        assert!(handle_res.is_err());
1227    }
1228    #[tokio::test]
1229    async fn handle_oob_recv() {
1230        let server = Server::new();
1231
1232        let (client_1, _rx_1) = create_random_client("1.2.3.4:12345".parse().unwrap());
1233        let client_pk_1 = client_1.pk();
1234        server.insert(client_1).await.unwrap();
1235
1236        let (client_2, _rx_2) = create_random_client("1.2.3.5:12345".parse().unwrap());
1237        let client_pk_2 = client_2.pk();
1238        server.insert(client_2).await.unwrap();
1239
1240        // emulate send OobReceive from client_1
1241        let handle_res = server.handle_packet(&client_pk_1, Packet::OobReceive(
1242            OobReceive { sender_pk: client_pk_2, data: vec![42; 1024] }
1243        )).await;
1244        assert!(handle_res.is_err());
1245    }
1246    #[tokio::test]
1247    async fn handle_onion_request_disabled_onion_loooong_data() {
1248        let server = Server::new();
1249
1250        let (client_1, _rx_1) = create_random_client("1.2.3.4:12345".parse().unwrap());
1251        let client_pk_1 = client_1.pk();
1252        server.insert(client_1).await.unwrap();
1253
1254        let request = OnionRequest {
1255            nonce: gen_nonce(),
1256            ip_port: IpPort {
1257                protocol: ProtocolType::TCP,
1258                ip_addr: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
1259                port: 12345,
1260            },
1261            temporary_pk: gen_keypair().0,
1262            payload: vec![13; 1500]
1263        };
1264        let handle_res = server
1265            .handle_packet(&client_pk_1, Packet::OnionRequest(request))
1266            .await;
1267        assert!(handle_res.is_ok());
1268    }
1269    #[tokio::test]
1270    async fn handle_onion_response() {
1271        let server = Server::new();
1272
1273        let (client_1, _rx_1) = create_random_client("1.2.3.4:12345".parse().unwrap());
1274        let client_pk_1 = client_1.pk();
1275        server.insert(client_1).await.unwrap();
1276
1277        let payload = InnerOnionResponse::OnionAnnounceResponse(OnionAnnounceResponse {
1278            sendback_data: 12345,
1279            nonce: gen_nonce(),
1280            payload: vec![42; 123]
1281        });
1282        let handle_res = server.handle_packet(&client_pk_1, Packet::OnionResponse(
1283            OnionResponse { payload }
1284        )).await;
1285        assert!(handle_res.is_err());
1286    }
1287    #[tokio::test]
1288    async fn handle_udp_onion_response_for_unknown_client() {
1289        crypto_init().unwrap();
1290        let (udp_onion_sink, _) = mpsc::channel(1);
1291        let mut server = Server::new();
1292        server.set_udp_onion_sink(udp_onion_sink);
1293
1294        let client_addr_1 = IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4));
1295        let client_port_1 = 12345u16;
1296        let (client_pk_1, _) = gen_keypair();
1297        let (tx_1, _rx_1) = mpsc::channel(1);
1298        let client_1 = Client::new(tx_1, &client_pk_1, client_addr_1, client_port_1);
1299        server.insert(client_1).await.unwrap();
1300
1301        let client_addr_2 = IpAddr::V4(Ipv4Addr::new(5, 6, 7, 8));
1302        let client_port_2 = 54321u16;
1303
1304        let payload = InnerOnionResponse::OnionAnnounceResponse(OnionAnnounceResponse {
1305            sendback_data: 12345,
1306            nonce: gen_nonce(),
1307            payload: vec![42; 123]
1308        });
1309        let handle_res = server
1310            .handle_udp_onion_response(client_addr_2, client_port_2, payload)
1311            .await;
1312        assert!(handle_res.is_err());
1313    }
1314
1315    ////////////////////////////////////////////////////////////////////////////////////////
1316    // Here be all handle_* tests from PK or to PK not in connected clients list
1317    #[tokio::test]
1318    async fn handle_route_request_not_connected() {
1319        crypto_init().unwrap();
1320        let server = Server::new();
1321        let (client_pk_1, _) = gen_keypair();
1322        let (client_pk_2, _) = gen_keypair();
1323
1324        // emulate send RouteRequest from client_pk_1
1325        let handle_res = server.handle_packet(&client_pk_1, Packet::RouteRequest(
1326            RouteRequest { pk: client_pk_2 }
1327        )).await;
1328        assert!(handle_res.is_err());
1329    }
1330    #[tokio::test]
1331    async fn handle_disconnect_notification_not_connected() {
1332        crypto_init().unwrap();
1333        let server = Server::new();
1334        let (client_pk_1, _) = gen_keypair();
1335
1336        // emulate send DisconnectNotification from client_1
1337        let handle_res = server.handle_packet(&client_pk_1, Packet::DisconnectNotification(
1338            DisconnectNotification { connection_id: ConnectionId::from_index(42) }
1339        )).await;
1340        assert!(handle_res.is_err());
1341    }
1342    #[tokio::test]
1343    async fn handle_disconnect_notification_other_not_connected() {
1344        let server = Server::new();
1345
1346        let (client_1, _rx_1) = create_random_client("1.2.3.4:12345".parse().unwrap());
1347        let client_pk_1 = client_1.pk();
1348        server.insert(client_1).await.unwrap();
1349
1350        let (client_pk_2, _) = gen_keypair();
1351
1352        // emulate send RouteRequest from client_1
1353        server.handle_packet(&client_pk_1, Packet::RouteRequest(
1354            RouteRequest { pk: client_pk_2 }
1355        )).await.unwrap();
1356
1357        // emulate send DisconnectNotification from client_1
1358        let handle_res = server.handle_packet(&client_pk_1, Packet::DisconnectNotification(
1359            DisconnectNotification { connection_id: ConnectionId::from_index(0) }
1360        )).await;
1361        assert!(handle_res.is_ok());
1362    }
1363    #[tokio::test]
1364    async fn handle_ping_request_not_connected() {
1365        crypto_init().unwrap();
1366        let server = Server::new();
1367        let (client_pk_1, _) = gen_keypair();
1368
1369        // emulate send PingRequest from client_1
1370        let handle_res = server.handle_packet(&client_pk_1, Packet::PingRequest(
1371            PingRequest { ping_id: 42 }
1372        )).await;
1373        assert!(handle_res.is_err());
1374    }
1375    #[tokio::test]
1376    async fn handle_pong_response_not_connected() {
1377        crypto_init().unwrap();
1378        let server = Server::new();
1379        let (client_pk_1, _) = gen_keypair();
1380
1381        // emulate send PongResponse from client_1
1382        let handle_res = server.handle_packet(&client_pk_1, Packet::PongResponse(
1383            PongResponse { ping_id: 42 }
1384        )).await;
1385        assert!(handle_res.is_err());
1386    }
1387    #[tokio::test]
1388    async fn handle_oob_send_not_connected() {
1389        crypto_init().unwrap();
1390        let server = Server::new();
1391        let (client_pk_1, _) = gen_keypair();
1392        let (client_pk_2, _) = gen_keypair();
1393
1394        // emulate send OobSend from client_1
1395        let handle_res = server.handle_packet(&client_pk_1, Packet::OobSend(
1396            OobSend { destination_pk: client_pk_2, data: vec![42; 1024] }
1397        )).await;
1398        assert!(handle_res.is_ok());
1399    }
1400    #[tokio::test]
1401    async fn handle_data_not_connected() {
1402        crypto_init().unwrap();
1403        let server = Server::new();
1404        let (client_pk_1, _) = gen_keypair();
1405
1406        // emulate send Data from client_1
1407        let handle_res = server.handle_packet(&client_pk_1, Packet::Data(
1408            Data {
1409                connection_id: ConnectionId::from_index(0),
1410                data: DataPayload::CryptoData(CryptoData {
1411                    nonce_last_bytes: 42,
1412                    payload: vec![42; 123],
1413                }),
1414            }
1415        )).await;
1416        assert!(handle_res.is_err());
1417    }
1418    #[tokio::test]
1419    async fn handle_data_other_not_connected() {
1420        let server = Server::new();
1421
1422        let (client_1, rx_1) = create_random_client("1.2.3.4:12345".parse().unwrap());
1423        let client_pk_1 = client_1.pk();
1424        server.insert(client_1).await.unwrap();
1425
1426        let (client_pk_2, _) = gen_keypair();
1427
1428        // emulate send RouteRequest from client_1
1429        server.handle_packet(&client_pk_1, Packet::RouteRequest(
1430            RouteRequest { pk: client_pk_2 }
1431        )).await.unwrap();
1432
1433        // the server should put RouteResponse into rx_1
1434        let (packet, _rx_1) = rx_1.into_future().await;
1435        assert_eq!(packet.unwrap(), Packet::RouteResponse(
1436            RouteResponse { pk: client_pk_2, connection_id: ConnectionId::from_index(0) }
1437        ));
1438
1439        // emulate send Data from client_1
1440        let handle_res = server.handle_packet(&client_pk_1, Packet::Data(
1441            Data {
1442                connection_id: ConnectionId::from_index(0),
1443                data: DataPayload::CryptoData(CryptoData {
1444                    nonce_last_bytes: 42,
1445                    payload: vec![42; 123],
1446                }),
1447            }
1448        )).await;
1449        assert!(handle_res.is_ok());
1450    }
1451    #[tokio::test]
1452    async fn shutdown_different_addr() {
1453        let server = Server::new();
1454
1455        let (client, _rx) = create_random_client("1.2.3.4:12345".parse().unwrap());
1456        let client_pk = client.pk();
1457        server.insert(client).await.unwrap();
1458
1459        // emulate shutdown
1460        let handle_res = server.shutdown_client(&client_pk, "1.2.3.4".parse().unwrap(), 12346).await;
1461        assert!(handle_res.is_err());
1462
1463        let state = server.state.read().await;
1464
1465        assert!(state.connected_clients.contains_key(&client_pk));
1466    }
1467    #[tokio::test]
1468    async fn shutdown_not_connected() {
1469        crypto_init().unwrap();
1470        let server = Server::new();
1471        let (client_pk, _) = gen_keypair();
1472        let client_ip_addr = "1.2.3.4".parse().unwrap();
1473        let client_port = 12345;
1474
1475        // emulate shutdown
1476        let handle_res = server.shutdown_client(&client_pk, client_ip_addr, client_port).await;
1477        assert!(handle_res.is_err());
1478    }
1479    #[tokio::test]
1480    async fn shutdown_inner_not_connected() {
1481        crypto_init().unwrap();
1482        let server = Server::new();
1483        let (client_pk, _) = gen_keypair();
1484
1485        let mut state = server.state.write().await;
1486
1487        // emulate shutdown
1488        let handle_res = server.shutdown_client_inner(&client_pk, &mut state).await;
1489        assert!(handle_res.is_err());
1490    }
1491    #[tokio::test]
1492    async fn shutdown_other_not_connected() {
1493        let server = Server::new();
1494
1495        let (client_1, rx_1) = create_random_client("1.2.3.4:12345".parse().unwrap());
1496        let client_pk_1 = client_1.pk();
1497        let client_ip_addr_1 = client_1.ip_addr();
1498        let client_port_1 = client_1.port();
1499        server.insert(client_1).await.unwrap();
1500
1501        let (client_pk_2, _) = gen_keypair();
1502
1503        // emulate send RouteRequest from client_1
1504        server.handle_packet(&client_pk_1, Packet::RouteRequest(
1505            RouteRequest { pk: client_pk_2 }
1506        )).await.unwrap();
1507
1508        // the server should put RouteResponse into rx_1
1509        let (packet, _rx_1) = rx_1.into_future().await;
1510        assert_eq!(packet.unwrap(), Packet::RouteResponse(
1511            RouteResponse { pk: client_pk_2, connection_id: ConnectionId::from_index(0) }
1512        ));
1513
1514        // emulate shutdown
1515        let handle_res = server.shutdown_client(&client_pk_1, client_ip_addr_1, client_port_1).await;
1516        assert!(handle_res.is_ok());
1517    }
1518    #[tokio::test]
1519    async fn send_anything_to_dropped_client() {
1520        let server = Server::new();
1521
1522        let (client_1, rx_1) = create_random_client("1.2.3.4:12345".parse().unwrap());
1523        let client_pk_1 = client_1.pk();
1524        server.insert(client_1).await.unwrap();
1525
1526        let (client_2, _rx_2) = create_random_client("1.2.3.5:12345".parse().unwrap());
1527        let client_pk_2 = client_2.pk();
1528        server.insert(client_2).await.unwrap();
1529
1530        drop(rx_1);
1531
1532        // emulate send RouteRequest from client_1
1533        let handle_res = server.handle_packet(&client_pk_1, Packet::RouteRequest(
1534            RouteRequest { pk: client_pk_2 }
1535        )).await;
1536        assert!(handle_res.is_err())
1537    }
1538    #[tokio::test]
1539    async fn send_onion_request_to_dropped_stream() {
1540        crypto_init().unwrap();
1541        let (udp_onion_sink, udp_onion_stream) = mpsc::channel(1);
1542        let mut server = Server::new();
1543        server.set_udp_onion_sink(udp_onion_sink);
1544
1545        let (client_1, _rx_1) = create_random_client("1.2.3.4:12345".parse().unwrap());
1546        let client_pk_1 = client_1.pk();
1547        server.insert(client_1).await.unwrap();
1548
1549        drop(udp_onion_stream);
1550
1551        // emulate send OnionRequest from client_1
1552        let request = OnionRequest {
1553            nonce: gen_nonce(),
1554            ip_port: IpPort {
1555                protocol: ProtocolType::TCP,
1556                ip_addr: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
1557                port: 12345,
1558            },
1559            temporary_pk: gen_keypair().0,
1560            payload: vec![13; 170]
1561        };
1562        let handle_res = server
1563            .handle_packet(&client_pk_1, Packet::OnionRequest(request))
1564            .await;
1565        assert!(handle_res.is_err());
1566    }
1567    #[tokio::test]
1568    async fn tcp_send_pings_test() {
1569        let server = Server::new();
1570
1571        // client #1
1572        let (client_1, rx_1) = create_random_client("1.2.3.4:12345".parse().unwrap());
1573        let pk_1 = client_1.pk();
1574        server.insert(client_1).await.unwrap();
1575
1576        // client #2
1577        let (client_2, rx_2) = create_random_client("1.2.3.5:12345".parse().unwrap());
1578        let pk_2 = client_2.pk();
1579        server.insert(client_2).await.unwrap();
1580
1581        // client #3
1582        let (client_3, rx_3) = create_random_client("1.2.3.6:12345".parse().unwrap());
1583        let pk_3 = client_3.pk();
1584        server.insert(client_3).await.unwrap();
1585
1586
1587        tokio::time::pause();
1588        // time when all entries is needed to send PingRequest
1589        tokio::time::advance(TCP_PING_FREQUENCY + Duration::from_secs(1)).await;
1590
1591        let sender_res = server.send_pings().await;
1592        assert!(sender_res.is_ok());
1593
1594        let (packet, _rx_1) = rx_1.into_future().await;
1595        assert_eq!(packet.unwrap(), Packet::PingRequest(
1596            PingRequest { ping_id: server.state.read().await.connected_clients[&pk_1].ping_id() }
1597        ));
1598        let (packet, _rx_2) = rx_2.into_future().await;
1599        assert_eq!(packet.unwrap(), Packet::PingRequest(
1600            PingRequest { ping_id: server.state.read().await.connected_clients[&pk_2].ping_id() }
1601        ));
1602        let (packet, _rx_3) = rx_3.into_future().await;
1603        assert_eq!(packet.unwrap(), Packet::PingRequest(
1604            PingRequest { ping_id: server.state.read().await.connected_clients[&pk_3].ping_id() }
1605        ));
1606    }
1607    #[tokio::test]
1608    async fn tcp_send_remove_timedouts() {
1609        let server = Server::new();
1610
1611        // client #1
1612        let (client_1, _rx_1) = create_random_client("1.2.3.4:12345".parse().unwrap());
1613        let pk_1 = client_1.pk();
1614        server.insert(client_1).await.unwrap();
1615
1616        // client #2
1617        let (client_2, _rx_2) = create_random_client("1.2.3.5:12345".parse().unwrap());
1618        let pk_2 = client_2.pk();
1619        server.insert(client_2).await.unwrap();
1620
1621        // client #3
1622        let (mut client_3, _rx_3) = create_random_client("1.2.3.6:12345".parse().unwrap());
1623        let pk_3 = client_3.pk();
1624
1625        tokio::time::pause();
1626        // time when all entries is timedout and should be removed
1627        tokio::time::advance(TCP_PING_FREQUENCY + TCP_PING_TIMEOUT + Duration::from_secs(1)).await;
1628
1629        client_3.set_last_pong_resp(clock_now());
1630        server.insert(client_3).await.unwrap();
1631        let sender_res = server.send_pings().await;
1632        assert!(sender_res.is_ok());
1633
1634        assert!(!server.state.read().await.connected_clients.contains_key(&pk_1));
1635        assert!(!server.state.read().await.connected_clients.contains_key(&pk_2));
1636        assert!(server.state.read().await.connected_clients.contains_key(&pk_3));
1637    }
1638}