1use tox_crypto::*;
5use tox_packet::onion::InnerOnionResponse;
6use crate::relay::server::client::Client;
7use tox_packet::relay::connection_id::ConnectionId;
8use crate::relay::links::*;
9use tox_packet::relay::*;
10
11use std::io::{Error, ErrorKind};
12use std::collections::HashMap;
13use std::net::{IpAddr, SocketAddr};
14use std::sync::Arc;
15use std::time::Instant;
16
17use futures::SinkExt;
18use futures::channel::mpsc;
19use tokio::sync::RwLock;
20
21#[derive(Default, Clone)]
33pub struct Server {
34 state: Arc<RwLock<ServerState>>,
35 onion_sink: Option<mpsc::Sender<(OnionRequest, SocketAddr)>>,
37}
38
39#[derive(Default)]
40struct ServerState {
41 pub connected_clients: HashMap<PublicKey, Client>,
42 pub keys_by_addr: HashMap<(IpAddr, u16), PublicKey>,
43}
44
45
46impl Server {
47 pub fn new() -> Server {
50 Server::default()
51 }
52 pub fn set_udp_onion_sink(&mut self, onion_sink: mpsc::Sender<(OnionRequest, SocketAddr)>) {
55 self.onion_sink = Some(onion_sink)
56 }
57 pub async fn insert(&self, client: Client) -> Result<(), Error> {
61 let mut state = self.state.write().await;
62
63 if state.connected_clients.contains_key(&client.pk()) {
64 self.shutdown_client_inner(&client.pk(), &mut state).await?;
65 }
66
67 state.keys_by_addr
68 .insert((client.ip_addr(), client.port()), client.pk());
69 state.connected_clients
70 .insert(client.pk(), client);
71
72 Ok(())
73 }
74 pub async fn handle_packet(&self, pk: &PublicKey, packet: Packet) -> Result<(), Error> {
78 match packet {
79 Packet::RouteRequest(packet) => self.handle_route_request(pk, &packet).await,
80 Packet::RouteResponse(packet) => self.handle_route_response(pk, &packet).await,
81 Packet::ConnectNotification(packet) => self.handle_connect_notification(pk, &packet).await,
82 Packet::DisconnectNotification(packet) => self.handle_disconnect_notification(pk, &packet).await,
83 Packet::PingRequest(packet) => self.handle_ping_request(pk, &packet).await,
84 Packet::PongResponse(packet) => self.handle_pong_response(pk, &packet).await,
85 Packet::OobSend(packet) => self.handle_oob_send(pk, packet).await,
86 Packet::OobReceive(packet) => self.handle_oob_receive(pk, &packet).await,
87 Packet::OnionRequest(packet) => self.handle_onion_request(pk, packet).await,
88 Packet::OnionResponse(packet) => self.handle_onion_response(pk, &packet).await,
89 Packet::Data(packet) => self.handle_data(pk, packet).await,
90 }
91 }
92 pub async fn handle_udp_onion_response(&self, ip_addr: IpAddr, port: u16, payload: InnerOnionResponse) -> Result<(), Error> {
95 let state = self.state.read().await;
96 if let Some(client) = state.keys_by_addr.get(&(ip_addr, port)).and_then(|pk| state.connected_clients.get(pk)) {
97 client.send_onion_response(payload).await
98 } else {
99 Err(Error::new(ErrorKind::Other, "Cannot find client by ip_addr to send onion response"))
100 }
101 }
102 pub async fn shutdown_client(&self, pk: &PublicKey, ip_addr: IpAddr, port: u16) -> Result<(), Error> {
111 let mut state = self.state.write().await;
112
113 if let Some(client) = state.connected_clients.get(pk) {
115 if client.ip_addr() != ip_addr || client.port() != port {
116 return Err(Error::new(ErrorKind::Other, "Client with pk has different address"))
117 }
118 } else {
119 return Err(Error::new(ErrorKind::Other, "Cannot find client by pk to shutdown it"))
120 }
121
122 self.shutdown_client_inner(pk, &mut state).await
123 }
124
125 async fn shutdown_client_inner(&self, pk: &PublicKey, state: &mut ServerState) -> Result<(), Error> {
128 let client_a = if let Some(client) = state.connected_clients.remove(pk) {
130 client
131 } else {
132 return Err(Error::new(
133 ErrorKind::Other,
134 "Cannot find client by pk to shutdown it"
135 ))
136 };
137
138 state.keys_by_addr.remove(&(client_a.ip_addr(), client_a.port()));
139 let links = client_a.links();
140 for link in links.iter_links() {
141 match link.status {
142 LinkStatus::Registered => {
143 },
145 LinkStatus::Online => {
146 let client_b_pk = link.pk;
147 if let Some(client_b) = state.connected_clients.get_mut(&client_b_pk) {
148 if let Some(a_id_in_client_b) = client_b.links().id_by_pk(pk) {
149 client_b.links_mut().downgrade(a_id_in_client_b);
152
153 client_b.send_disconnect_notification(
154 ConnectionId::from_index(a_id_in_client_b)
155 ).await;
156 }
157 }
158 }
159 }
160 }
161
162 Ok(())
163 }
164 async fn handle_route_request(&self, pk: &PublicKey, packet: &RouteRequest) -> Result<(), Error> {
167 let mut state = self.state.write().await;
168
169 let client_a =
171 if let Some(client) = state.connected_clients.get_mut(pk) {
172 client
173 } else {
174 return Err(Error::new(ErrorKind::Other, "RouteRequest: no such PK"))
175 };
176
177 if pk == &packet.pk {
178 return client_a.send_route_response(pk, ConnectionId::zero()).await
180 }
181
182 if let Some(index) = client_a.links().id_by_pk(&packet.pk) {
184 return client_a.send_route_response(&packet.pk, ConnectionId::from_index(index)).await
186 }
187
188 let b_id_in_client_a = if let Some(index) = client_a.links_mut().insert(&packet.pk) {
190 index
191 } else {
192 return client_a.send_route_response(&packet.pk, ConnectionId::zero()).await
194 };
195
196 client_a.send_route_response(&packet.pk, ConnectionId::from_index(b_id_in_client_a)).await?;
197
198 let client_b = if let Some(client) = state.connected_clients.get(&packet.pk) {
200 client
201 } else {
202 return Ok(())
203 };
204
205 let a_id_in_client_b = if let Some(index) = client_b.links().id_by_pk(pk) {
207 index
208 } else {
209 return Ok(())
211 };
212
213 let client_a = state.connected_clients.get_mut(pk).unwrap();
217 client_a.links_mut().upgrade(b_id_in_client_a);
218 client_a.send_connect_notification(ConnectionId::from_index(b_id_in_client_a)).await;
219
220 let client_b = state.connected_clients.get_mut(&packet.pk).unwrap();
221 client_b.links_mut().upgrade(a_id_in_client_b);
222 client_b.send_connect_notification(ConnectionId::from_index(a_id_in_client_b)).await;
223
224 Ok(())
225 }
226
227 async fn handle_route_response(&self, _pk: &PublicKey, _packet: &RouteResponse) -> Result<(), Error> {
228 Err(Error::new(ErrorKind::Other, "Client must not send RouteResponse to server"))
229 }
230
231 async fn handle_connect_notification(&self, _pk: &PublicKey, _packet: &ConnectNotification) -> Result<(), Error> {
232 Ok(())
235 }
236 async fn handle_disconnect_notification(&self, pk: &PublicKey, packet: &DisconnectNotification) -> Result<(), Error> {
237 let index = if let Some(index) = packet.connection_id.index() {
238 index
239 } else {
240 return Err(Error::new(ErrorKind::Other, "DisconnectNotification: connection id is zero"))
241 };
242
243 let mut state = self.state.write().await;
244
245 let a_link = if let Some(client_a) = state.connected_clients.get_mut(pk) {
247 if let Some(link) = client_a.links_mut().take(index) {
249 link
250 } else {
251 trace!("DisconnectNotification.connection_id is not linked for the client {:?}", pk);
252 return Ok(())
257 }
258 } else {
259 return Err(Error::new(ErrorKind::Other, "DisconnectNotification: no such PK"))
260 };
261
262 match a_link.status {
263 LinkStatus::Registered => {
264 Ok(())
267 },
268 LinkStatus::Online => {
269 let client_b_pk = a_link.pk;
270 let client_b = if let Some(client) = state.connected_clients.get_mut(&client_b_pk) {
272 client
273 } else {
274 return Ok(())
277 };
278 let a_id_in_client_b = if let Some(id) = client_b.links().id_by_pk(pk) {
279 id
280 } else {
281 return Ok(())
283 };
284 client_b.links_mut().downgrade(a_id_in_client_b);
287 client_b.send_disconnect_notification(ConnectionId::from_index(a_id_in_client_b)).await;
288 Ok(())
289 }
290 }
291 }
292
293 async fn handle_ping_request(&self, pk: &PublicKey, packet: &PingRequest) -> Result<(), Error> {
294 if packet.ping_id == 0 {
295 return Err(Error::new(ErrorKind::Other, "PingRequest.ping_id == 0"))
296 }
297 let state = self.state.read().await;
298 if let Some(client_a) = state.connected_clients.get(pk) {
299 client_a.send_pong_response(packet.ping_id).await
300 } else {
301 Err(Error::new(ErrorKind::Other, "PingRequest: no such PK"))
302 }
303 }
304
305 async fn handle_pong_response(&self, pk: &PublicKey, packet: &PongResponse) -> Result<(), Error> {
306 if packet.ping_id == 0 {
307 return Err(
308 Error::new(ErrorKind::Other,
309 "PongResponse.ping_id == 0"
310 ))
311 }
312 let mut state = self.state.write().await;
313 if let Some(client_a) = state.connected_clients.get_mut(pk) {
314 if packet.ping_id == client_a.ping_id() {
315 client_a.set_last_pong_resp(Instant::now());
316
317 Ok(())
318 } else {
319 Err(Error::new(ErrorKind::Other, "PongResponse.ping_id does not match"))
320 }
321 } else {
322 Err(Error::new(ErrorKind::Other, "PongResponse: no such PK"))
323 }
324 }
325
326 async fn handle_oob_send(&self, pk: &PublicKey, packet: OobSend) -> Result<(), Error> {
327 if packet.data.is_empty() || packet.data.len() > 1024 {
328 return Err(Error::new(ErrorKind::Other, "OobSend wrong data length"))
329 }
330 let state = self.state.read().await;
331 if let Some(client_b) = state.connected_clients.get(&packet.destination_pk) {
332 client_b.send_oob(pk, packet.data).await;
333 }
334
335 Ok(())
336 }
337
338 async fn handle_oob_receive(&self, _pk: &PublicKey, _packet: &OobReceive) -> Result<(), Error> {
339 Err(Error::new(ErrorKind::Other, "Client must not send OobReceive to server"))
340 }
341
342 async fn handle_onion_request(&self, pk: &PublicKey, packet: OnionRequest) -> Result<(), Error> {
343 if let Some(ref onion_sink) = self.onion_sink {
344 let state = self.state.read().await;
345 if let Some(client) = state.connected_clients.get(&pk) {
346 let saddr = SocketAddr::new(client.ip_addr(), client.port());
347 let mut tx = onion_sink.clone();
348 tx .send((packet, saddr)).await
350 .map_err(|_| {
351 Error::from(ErrorKind::UnexpectedEof)
354 })
355 } else {
356 Err(Error::new(ErrorKind::Other, "OnionRequest: no such PK"))
357 }
358 } else {
359 Ok(())
361 }
362 }
363
364 async fn handle_onion_response(&self, _pk: &PublicKey, _packet: &OnionResponse) -> Result<(), Error> {
365 Err(Error::new(ErrorKind::Other, "Client must not send OnionResponse to server"))
366 }
367
368 async fn handle_data(&self, pk: &PublicKey, packet: Data) -> Result<(), Error> {
369 let index = if let Some(index) = packet.connection_id.index() {
370 index
371 } else {
372 return Err(Error::new(ErrorKind::Other, "Data: connection id is zero"))
373 };
374
375 let state = self.state.read().await;
376
377 let client_a = if let Some(client) = state.connected_clients.get(pk) {
379 client
380 } else {
381 return Err(Error::new(ErrorKind::Other, "Data: no such PK"));
382 };
383
384 let a_link = if let Some(link) = client_a.links().by_id(index) {
386 *link
387 } else {
388 trace!("Data.connection_id is not linked for the client {:?}", pk);
389 return Ok(())
394 };
395
396 match a_link.status {
397 LinkStatus::Registered => {
398 Ok(())
401 },
402 LinkStatus::Online => {
403 let client_b_pk = a_link.pk;
404 let client_b = if let Some(client) = state.connected_clients.get(&client_b_pk) {
406 client
407 } else {
408 return Ok(())
410 };
411 let a_id_in_client_b = if let Some(id) = client_b.links().id_by_pk(pk) {
412 id
413 } else {
414 return Ok(())
416 };
417 client_b.send_data(ConnectionId::from_index(a_id_in_client_b), packet.data).await
419 }
420 }
421 }
422
423 async fn remove_timedout_clients(&self, state: &mut ServerState) -> Result<(), Error> {
426 let keys = state.connected_clients.iter()
427 .filter(|(_key, client)| client.is_pong_timedout())
428 .map(|(key, _client)| *key)
429 .collect::<Vec<PublicKey>>();
430
431 for key in keys {
432 self.shutdown_client_inner(&key, state).await.ok();
434 }
435
436 Ok(())
437 }
438
439 pub async fn send_pings(&self) -> Result<(), Error> {
442 let mut state = self.state.write().await;
443
444 self.remove_timedout_clients(&mut state).await?;
445
446 for client in state.connected_clients.values_mut() {
447 if client.is_ping_interval_passed() {
448 client.send_ping_request().await.ok();
450 }
451 }
452
453 Ok(())
454 }
455}
456
457#[cfg(test)]
458mod tests {
459 use super::*;
460
461 use tox_packet::dht::CryptoData;
462 use tox_packet::ip_port::*;
463 use tox_packet::onion::*;
464 use crate::relay::server::{Client, Server};
465 use crate::relay::server::client::*;
466
467 use futures::channel::mpsc;
468 use futures::StreamExt;
469 use std::net::{IpAddr, Ipv4Addr, SocketAddr};
470 use std::time::Duration;
471
472 use crate::time::*;
473
474 #[tokio::test]
475 async fn server_is_clonable() {
476 crypto_init().unwrap();
477 let server = Server::new();
478 let (client_1, _rx_1) = create_random_client("1.2.3.4:12345".parse().unwrap());
479 server.insert(client_1).await.unwrap();
480 let _cloned = server.clone();
481 }
483
484 fn create_random_client(saddr: SocketAddr) -> (Client, mpsc::Receiver<Packet>) {
487 crypto_init().unwrap();
488 let (client_pk, _) = gen_keypair();
489 let (tx, rx) = mpsc::channel(32);
490 let client = Client::new(tx, &client_pk, saddr.ip(), saddr.port());
491 (client, rx)
492 }
493
494 #[tokio::test]
495 async fn normal_communication_scenario() {
496 let server = Server::new();
497
498 let (client_1, rx_1) = create_random_client("1.2.3.4:12345".parse().unwrap());
499 let client_pk_1 = client_1.pk();
500 let client_ip_addr_1 = client_1.ip_addr();
501 let client_port_1 = client_1.port();
502
503 server.insert(client_1).await.unwrap();
505
506 let (client_2, rx_2) = create_random_client("1.2.3.5:12345".parse().unwrap());
507 let client_pk_2 = client_2.pk();
508
509 server.handle_packet(&client_pk_1, Packet::RouteRequest(
511 RouteRequest { pk: client_pk_2 }
512 )).await.unwrap();
513
514 let (packet, rx_1) = rx_1.into_future().await;
516 assert_eq!(packet.unwrap(), Packet::RouteResponse(
517 RouteResponse { pk: client_pk_2, connection_id: ConnectionId::from_index(0) }
518 ));
519
520 {
521 let state = server.state.read().await;
523
524 let client_a = &state.connected_clients[&client_pk_1];
526 let link_id = client_a.links().id_by_pk(&client_pk_2).unwrap();
527 assert_eq!(client_a.links().by_id(link_id).unwrap().status, LinkStatus::Registered);
528 }
529
530 server.insert(client_2).await.unwrap();
532
533 server.handle_packet(&client_pk_1, Packet::RouteRequest(
535 RouteRequest { pk: client_pk_2 }
536 )).await.unwrap();
537
538 let (packet, rx_1) = rx_1.into_future().await;
540 assert_eq!(packet.unwrap(), Packet::RouteResponse(
541 RouteResponse { pk: client_pk_2, connection_id: ConnectionId::from_index(0) }
542 ));
543
544 {
545 let state = server.state.read().await;
547
548 let client_b = &state.connected_clients[&client_pk_2];
550 assert!(client_b.links().id_by_pk(&client_pk_1).is_none());
551 }
552
553 server.handle_packet(&client_pk_2, Packet::RouteRequest(
555 RouteRequest { pk: client_pk_1 }
556 )).await.unwrap();
557
558 let (packet, rx_2) = rx_2.into_future().await;
560 assert_eq!(packet.unwrap(), Packet::RouteResponse(
561 RouteResponse { pk: client_pk_1, connection_id: ConnectionId::from_index(0) }
562 ));
563 let (packet, _rx_1) = rx_1.into_future().await;
566 assert_eq!(packet.unwrap(), Packet::ConnectNotification(
567 ConnectNotification { connection_id: ConnectionId::from_index(0) }
568 ));
569 let (packet, rx_2) = rx_2.into_future().await;
572 assert_eq!(packet.unwrap(), Packet::ConnectNotification(
573 ConnectNotification { connection_id: ConnectionId::from_index(0) }
574 ));
575
576 {
577 let state = server.state.read().await;
579
580 let client_a = &state.connected_clients[&client_pk_1];
582 let link_id = client_a.links().id_by_pk(&client_pk_2).unwrap();
583 assert_eq!(client_a.links().by_id(link_id).unwrap().status, LinkStatus::Online);
584
585 let client_b = &state.connected_clients[&client_pk_2];
587 let link_id = client_b.links().id_by_pk(&client_pk_1).unwrap();
588 assert_eq!(client_a.links().by_id(link_id).unwrap().status, LinkStatus::Online);
589 }
590
591 server.handle_packet(&client_pk_1, Packet::Data(
593 Data {
594 connection_id: ConnectionId::from_index(0),
595 data: DataPayload::CryptoData(CryptoData {
596 nonce_last_bytes: 42,
597 payload: vec![42; 123],
598 }),
599 }
600 )).await.unwrap();
601
602 let (packet, rx_2) = rx_2.into_future().await;
604 assert_eq!(packet.unwrap(), Packet::Data(
605 Data {
606 connection_id: ConnectionId::from_index(0),
607 data: DataPayload::CryptoData(CryptoData {
608 nonce_last_bytes: 42,
609 payload: vec![42; 123],
610 }),
611 }
612 ));
613
614 server.shutdown_client(&client_pk_1, client_ip_addr_1, client_port_1).await.unwrap();
616 let (packet, _rx_2) = rx_2.into_future().await;
618 assert_eq!(packet.unwrap(), Packet::DisconnectNotification(
619 DisconnectNotification { connection_id: ConnectionId::from_index(0) }
620 ));
621
622 let state = server.state.read().await;
624 let client_b = &state.connected_clients[&client_pk_2];
625 assert_eq!(client_b.links().by_id(0).unwrap().status, LinkStatus::Registered);
626 }
627 #[tokio::test]
628 async fn handle_route_request() {
629 let server = Server::new();
630
631 let (client_1, rx_1) = create_random_client("1.2.3.4:12345".parse().unwrap());
632 let client_pk_1 = client_1.pk();
633 server.insert(client_1).await.unwrap();
634
635 let (client_2, _rx_2) = create_random_client("1.2.3.5:12345".parse().unwrap());
636 let client_pk_2 = client_2.pk();
637 server.insert(client_2).await.unwrap();
638
639 server.handle_packet(&client_pk_1, Packet::RouteRequest(
641 RouteRequest { pk: client_pk_2 }
642 )).await.unwrap();
643
644 let (packet, _rx_1) = rx_1.into_future().await;
646 assert_eq!(packet.unwrap(), Packet::RouteResponse(
647 RouteResponse { pk: client_pk_2, connection_id: ConnectionId::from_index(0) }
648 ));
649
650 {
651 let state = server.state.read().await;
653
654 let client_a = &state.connected_clients[&client_pk_1];
656 let link_id = client_a.links().id_by_pk(&client_pk_2).unwrap();
657 assert_eq!(client_a.links().by_id(link_id).unwrap().status, LinkStatus::Registered);
658
659 let client_b = &state.connected_clients[&client_pk_2];
661 assert!(client_b.links().id_by_pk(&client_pk_1).is_none());
662 }
663 }
664 #[tokio::test]
665 async fn handle_route_request_to_itself() {
666 let server = Server::new();
667
668 let (client_1, rx_1) = create_random_client("1.2.3.4:12345".parse().unwrap());
669 let client_pk_1 = client_1.pk();
670 server.insert(client_1).await.unwrap();
671
672 server.handle_packet(&client_pk_1, Packet::RouteRequest(
674 RouteRequest { pk: client_pk_1 }
675 )).await.unwrap();
676
677 let (packet, _rx_1) = rx_1.into_future().await;
679 assert_eq!(packet.unwrap(), Packet::RouteResponse(
680 RouteResponse { pk: client_pk_1, connection_id: ConnectionId::zero() }
681 ));
682 }
683 #[tokio::test]
684 async fn handle_route_request_too_many_connections() {
685 let server = Server::new();
686
687 let (client_1, mut rx_1) = create_random_client("1.2.3.4:12345".parse().unwrap());
688 let client_pk_1 = client_1.pk();
689 server.insert(client_1).await.unwrap();
690
691 for i in 0..240 {
693 let saddr = SocketAddr::new("1.2.3.4".parse().unwrap(), 12346 + u16::from(i));
694 let (other_client, _other_rx) = create_random_client(saddr);
695 let other_client_pk = other_client.pk();
696 server.insert(other_client).await.unwrap();
697
698 server.handle_packet(&client_pk_1, Packet::RouteRequest(
700 RouteRequest { pk: other_client_pk }
701 )).await.unwrap();
702
703 let (packet, rx_1_nested) = rx_1.into_future().await;
705 assert_eq!(packet.unwrap(), Packet::RouteResponse(
706 RouteResponse { pk: other_client_pk, connection_id: ConnectionId::from_index(i) }
707 ));
708 rx_1 = rx_1_nested;
709 }
710 let (other_client, _other_rx) = create_random_client("1.2.3.5:12345".parse().unwrap());
712 let other_client_pk = other_client.pk();
713 server.insert(other_client).await.unwrap();
714 server.handle_packet(&client_pk_1, Packet::RouteRequest(
716 RouteRequest { pk: other_client_pk }
717 )).await.unwrap();
718
719 let (packet, _rx_1) = rx_1.into_future().await;
721 assert_eq!(packet.unwrap(), Packet::RouteResponse(
722 RouteResponse { pk: other_client_pk, connection_id: ConnectionId::zero() }
723 ));
724 }
725 #[tokio::test]
726 async fn handle_connect_notification() {
727 let server = Server::new();
728
729 let (client_1, _rx_1) = create_random_client("1.2.3.4:12345".parse().unwrap());
730 let client_pk_1 = client_1.pk();
731 server.insert(client_1).await.unwrap();
732
733 let handle_res = server.handle_packet(&client_pk_1, Packet::ConnectNotification(
735 ConnectNotification { connection_id: ConnectionId::from_index(42) }
736 )).await;
737 assert!(handle_res.is_ok());
738 }
739 #[tokio::test]
740 async fn handle_disconnect_notification() {
741 let server = Server::new();
742
743 let (client_1, rx_1) = create_random_client("1.2.3.4:12345".parse().unwrap());
744 let client_pk_1 = client_1.pk();
745 server.insert(client_1).await.unwrap();
746
747 let (client_2, rx_2) = create_random_client("1.2.3.5:12345".parse().unwrap());
748 let client_pk_2 = client_2.pk();
749 server.insert(client_2).await.unwrap();
750
751 server.handle_packet(&client_pk_1, Packet::RouteRequest(
753 RouteRequest { pk: client_pk_2 }
754 )).await.unwrap();
755
756 let (packet, rx_1) = rx_1.into_future().await;
758 assert_eq!(packet.unwrap(), Packet::RouteResponse(
759 RouteResponse { pk: client_pk_2, connection_id: ConnectionId::from_index(0) }
760 ));
761
762 server.handle_packet(&client_pk_2, Packet::RouteRequest(
764 RouteRequest { pk: client_pk_1 }
765 )).await.unwrap();
766
767 let (packet, rx_2) = rx_2.into_future().await;
769 assert_eq!(packet.unwrap(), Packet::RouteResponse(
770 RouteResponse { pk: client_pk_1, connection_id: ConnectionId::from_index(0) }
771 ));
772 let (packet, rx_1) = rx_1.into_future().await;
775 assert_eq!(packet.unwrap(), Packet::ConnectNotification(
776 ConnectNotification { connection_id: ConnectionId::from_index(0) }
777 ));
778 let (packet, rx_2) = rx_2.into_future().await;
781 assert_eq!(packet.unwrap(), Packet::ConnectNotification(
782 ConnectNotification { connection_id: ConnectionId::from_index(0) }
783 ));
784
785 {
786 let state = server.state.read().await;
788
789 let client_a = &state.connected_clients[&client_pk_1];
791 let link_id = client_a.links().id_by_pk(&client_pk_2).unwrap();
792 assert_eq!(client_a.links().by_id(link_id).unwrap().status, LinkStatus::Online);
793
794 let client_b = &state.connected_clients[&client_pk_2];
796 let link_id = client_b.links().id_by_pk(&client_pk_1).unwrap();
797 assert_eq!(client_a.links().by_id(link_id).unwrap().status, LinkStatus::Online);
798 }
799
800 server.handle_packet(&client_pk_1, Packet::DisconnectNotification(
802 DisconnectNotification { connection_id: ConnectionId::from_index(0) }
803 )).await.unwrap();
804
805 let (packet, _rx_2) = rx_2.into_future().await;
807 assert_eq!(packet.unwrap(), Packet::DisconnectNotification(
808 DisconnectNotification { connection_id: ConnectionId::from_index(0) }
809 ));
810
811 {
812 let state = server.state.read().await;
814
815 let client_a = &state.connected_clients[&client_pk_1];
817 assert!(client_a.links().id_by_pk(&client_pk_2).is_none());
818
819 let client_b = &state.connected_clients[&client_pk_2];
821 let link_id = client_b.links().id_by_pk(&client_pk_1).unwrap();
822 assert_eq!(client_b.links().by_id(link_id).unwrap().status, LinkStatus::Registered);
823 }
824
825 server.handle_packet(&client_pk_2, Packet::DisconnectNotification(
827 DisconnectNotification { connection_id: ConnectionId::from_index(0) }
828 )).await.unwrap();
829
830 {
831 let state = server.state.read().await;
833
834 let client_b = &state.connected_clients[&client_pk_2];
836 assert!(client_b.links().id_by_pk(&client_pk_2).is_none());
837 }
838
839 drop(server);
842 assert!(rx_1.collect::<Vec<_>>().await.is_empty());
843 }
844 #[tokio::test]
845 async fn handle_disconnect_notification_other_not_linked() {
846 let server = Server::new();
847
848 let (client_1, _rx_1) = create_random_client("1.2.3.4:12345".parse().unwrap());
849 let client_pk_1 = client_1.pk();
850 server.insert(client_1).await.unwrap();
851
852 let (client_2, rx_2) = create_random_client("1.2.3.5:12345".parse().unwrap());
853 let client_pk_2 = client_2.pk();
854 server.insert(client_2).await.unwrap();
855
856 server.handle_packet(&client_pk_1, Packet::RouteRequest(
858 RouteRequest { pk: client_pk_2 }
859 )).await.unwrap();
860
861 let handle_res = server.handle_packet(&client_pk_1, Packet::DisconnectNotification(
863 DisconnectNotification { connection_id: ConnectionId::from_index(0) }
864 )).await;
865 assert!(handle_res.is_ok());
866
867 drop(server);
870 assert!(rx_2.collect::<Vec<_>>().await.is_empty());
871 }
872 #[tokio::test]
873 async fn handle_disconnect_notification_0() {
874 crypto_init().unwrap();
875 let server = Server::new();
876
877 let (client_pk, _) = gen_keypair();
878
879 let handle_res = server.handle_packet(&client_pk, Packet::DisconnectNotification(
880 DisconnectNotification { connection_id: ConnectionId::zero() }
881 )).await;
882 assert!(handle_res.is_err());
883 }
884 #[tokio::test]
885 async fn handle_ping_request() {
886 let server = Server::new();
887
888 let (client_1, rx_1) = create_random_client("1.2.3.4:12345".parse().unwrap());
889 let client_pk_1 = client_1.pk();
890 server.insert(client_1).await.unwrap();
891
892 server.handle_packet(&client_pk_1, Packet::PingRequest(
894 PingRequest { ping_id: 42 }
895 )).await.unwrap();
896
897 let (packet, _rx_1) = rx_1.into_future().await;
899 assert_eq!(packet.unwrap(), Packet::PongResponse(
900 PongResponse { ping_id: 42 }
901 ));
902 }
903 #[tokio::test]
904 async fn handle_oob_send() {
905 let server = Server::new();
906
907 let (client_1, _rx_1) = create_random_client("1.2.3.4:12345".parse().unwrap());
908 let client_pk_1 = client_1.pk();
909 server.insert(client_1).await.unwrap();
910
911 let (client_2, rx_2) = create_random_client("1.2.3.5:12345".parse().unwrap());
912 let client_pk_2 = client_2.pk();
913 server.insert(client_2).await.unwrap();
914
915 server.handle_packet(&client_pk_1, Packet::OobSend(
917 OobSend { destination_pk: client_pk_2, data: vec![13; 1024] }
918 )).await.unwrap();
919
920 let (packet, _rx_2) = rx_2.into_future().await;
922 assert_eq!(packet.unwrap(), Packet::OobReceive(
923 OobReceive { sender_pk: client_pk_1, data: vec![13; 1024] }
924 ));
925 }
926 #[tokio::test]
927 async fn handle_onion_request() {
928 crypto_init().unwrap();
929 let (udp_onion_sink, udp_onion_stream) = mpsc::channel(1);
930 let mut server = Server::new();
931 server.set_udp_onion_sink(udp_onion_sink);
932
933 let (client_1, _rx_1) = create_random_client("1.2.3.4:12345".parse().unwrap());
934 let client_pk_1 = client_1.pk();
935 let client_addr_1 = client_1.ip_addr();
936 let client_port_1 = client_1.port();
937 server.insert(client_1).await.unwrap();
938
939 let request = OnionRequest {
940 nonce: gen_nonce(),
941 ip_port: IpPort {
942 protocol: ProtocolType::TCP,
943 ip_addr: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
944 port: 12345,
945 },
946 temporary_pk: gen_keypair().0,
947 payload: vec![13; 170]
948 };
949 let handle_res = server
950 .handle_packet(&client_pk_1, Packet::OnionRequest(request.clone()))
951 .await;
952 assert!(handle_res.is_ok());
953
954 let (packet, _) = udp_onion_stream.into_future().await;
955 let (packet, saddr) = packet.unwrap();
956
957 assert_eq!(saddr.ip(), client_addr_1);
958 assert_eq!(saddr.port(), client_port_1);
959 assert_eq!(packet, request);
960 }
961 #[tokio::test]
962 async fn handle_udp_onion_response() {
963 let server = Server::new();
964
965 let (client_1, rx_1) = create_random_client("1.2.3.4:12345".parse().unwrap());
966 let client_addr_1 = client_1.ip_addr();
967 let client_port_1 = client_1.port();
968 server.insert(client_1).await.unwrap();
969
970 let payload = InnerOnionResponse::OnionAnnounceResponse(OnionAnnounceResponse {
971 sendback_data: 12345,
972 nonce: gen_nonce(),
973 payload: vec![42; 123]
974 });
975 let handle_res = server
976 .handle_udp_onion_response(client_addr_1, client_port_1, payload.clone())
977 .await;
978 assert!(handle_res.is_ok());
979
980 let (packet, _) = rx_1.into_future().await;
981 assert_eq!(packet.unwrap(), Packet::OnionResponse(
982 OnionResponse { payload }
983 ));
984 }
985 #[tokio::test]
986 async fn insert_with_same_pk() {
987 let server = Server::new();
988
989 let (mut client_1, _rx_1) = create_random_client("1.2.3.4:12345".parse().unwrap());
990 let (mut client_2, rx_2) = create_random_client("1.2.3.4:12346".parse().unwrap());
991
992 let index_1 = client_1.links_mut().insert(&client_2.pk()).unwrap();
994 assert!(client_1.links_mut().upgrade(index_1));
995 let index_2 = client_2.links_mut().insert(&client_1.pk()).unwrap();
996 assert!(client_2.links_mut().upgrade(index_2));
997
998 let client_pk_1 = client_1.pk();
999 let client_addr_3 = "1.2.3.4".parse().unwrap();
1000 let client_port_3 = 12347;
1001 let (tx_3, _rx_3) = mpsc::channel(32);
1002 let client_3 = Client::new(tx_3, &client_pk_1, client_addr_3, client_port_3);
1003
1004 server.insert(client_1).await.unwrap();
1005 server.insert(client_2).await.unwrap();
1006
1007 server.insert(client_3).await.unwrap();
1009
1010 let (packet, _) = rx_2.into_future().await;
1011 assert_eq!(packet.unwrap(), Packet::DisconnectNotification(
1012 DisconnectNotification { connection_id: ConnectionId::from_index(index_2) }
1013 ));
1014
1015 let state = server.state.read().await;
1016 let client = &state.connected_clients[&client_pk_1];
1017
1018 assert_eq!(client.ip_addr(), client_addr_3);
1019 assert_eq!(client.port(), client_port_3);
1020 }
1021 #[tokio::test]
1022 async fn shutdown_other_not_linked() {
1023 let server = Server::new();
1024
1025 let (client_1, rx_1) = create_random_client("1.2.3.4:12345".parse().unwrap());
1026 let client_pk_1 = client_1.pk();
1027 let client_ip_addr_1 = client_1.ip_addr();
1028 let client_port_1 = client_1.port();
1029 server.insert(client_1).await.unwrap();
1030
1031 let (client_2, _rx_2) = create_random_client("1.2.3.5:12345".parse().unwrap());
1032 let client_pk_2 = client_2.pk();
1033 server.insert(client_2).await.unwrap();
1034
1035 server.handle_packet(&client_pk_1, Packet::RouteRequest(
1037 RouteRequest { pk: client_pk_2 }
1038 )).await.unwrap();
1039
1040 let (packet, _rx_1) = rx_1.into_future().await;
1042 assert_eq!(packet.unwrap(), Packet::RouteResponse(
1043 RouteResponse { pk: client_pk_2, connection_id: ConnectionId::from_index(0) }
1044 ));
1045
1046 let handle_res = server.shutdown_client(&client_pk_1, client_ip_addr_1, client_port_1).await;
1048 assert!(handle_res.is_ok());
1049 }
1050 #[tokio::test]
1051 async fn handle_data_other_not_linked() {
1052 let server = Server::new();
1053
1054 let (client_1, rx_1) = create_random_client("1.2.3.4:12345".parse().unwrap());
1055 let client_pk_1 = client_1.pk();
1056 server.insert(client_1).await.unwrap();
1057
1058 let (client_2, _rx_2) = create_random_client("1.2.3.5:12345".parse().unwrap());
1059 let client_pk_2 = client_2.pk();
1060 server.insert(client_2).await.unwrap();
1061
1062 server.handle_packet(&client_pk_1, Packet::RouteRequest(
1064 RouteRequest { pk: client_pk_2 }
1065 )).await.unwrap();
1066
1067 let (packet, _rx_1) = rx_1.into_future().await;
1069 assert_eq!(packet.unwrap(), Packet::RouteResponse(
1070 RouteResponse { pk: client_pk_2, connection_id: ConnectionId::from_index(0) }
1071 ));
1072
1073 let handle_res = server.handle_packet(&client_pk_1, Packet::Data(
1075 Data {
1076 connection_id: ConnectionId::from_index(0),
1077 data: DataPayload::CryptoData(CryptoData {
1078 nonce_last_bytes: 42,
1079 payload: vec![42; 123],
1080 }),
1081 }
1082 )).await;
1083 assert!(handle_res.is_ok());
1084 }
1085 #[tokio::test]
1086 async fn handle_data_0() {
1087 crypto_init().unwrap();
1088 let server = Server::new();
1089
1090 let (client_pk, _) = gen_keypair();
1091
1092 let handle_res = server.handle_packet(&client_pk, Packet::Data(
1093 Data {
1094 connection_id: ConnectionId::zero(),
1095 data: DataPayload::CryptoData(CryptoData {
1096 nonce_last_bytes: 42,
1097 payload: vec![42; 123],
1098 }),
1099 }
1100 )).await;
1101 assert!(handle_res.is_err());
1102 }
1103
1104 #[tokio::test]
1107 async fn handle_route_response() {
1108 let server = Server::new();
1109
1110 let (client_1, _rx_1) = create_random_client("1.2.3.4:12345".parse().unwrap());
1111 let client_pk_1 = client_1.pk();
1112 server.insert(client_1).await.unwrap();
1113
1114 let handle_res = server.handle_packet(&client_pk_1, Packet::RouteResponse(
1116 RouteResponse { pk: client_pk_1, connection_id: ConnectionId::from_index(42) }
1117 )).await;
1118 assert!(handle_res.is_err());
1119 }
1120 #[tokio::test]
1121 async fn handle_disconnect_notification_not_linked() {
1122 let server = Server::new();
1123
1124 let (client_1, rx_1) = create_random_client("1.2.3.4:12345".parse().unwrap());
1125 let client_pk_1 = client_1.pk();
1126 server.insert(client_1).await.unwrap();
1127
1128 let handle_res = server.handle_packet(&client_pk_1, Packet::DisconnectNotification(
1130 DisconnectNotification { connection_id: ConnectionId::from_index(0) }
1131 )).await;
1132 assert!(handle_res.is_ok());
1133
1134 drop(server);
1136
1137 assert!(rx_1.collect::<Vec<_>>().await.is_empty());
1138 }
1139 #[tokio::test]
1140 async fn handle_ping_request_0() {
1141 let server = Server::new();
1142
1143 let (client_1, _rx_1) = create_random_client("1.2.3.4:12345".parse().unwrap());
1144 let client_pk_1 = client_1.pk();
1145 server.insert(client_1).await.unwrap();
1146
1147 let handle_res = server.handle_packet(&client_pk_1, Packet::PingRequest(
1149 PingRequest { ping_id: 0 }
1150 )).await;
1151 assert!(handle_res.is_err());
1152 }
1153 #[tokio::test]
1154 async fn handle_pong_response_0() {
1155 let server = Server::new();
1156
1157 let (client_1, _rx_1) = create_random_client("1.2.3.4:12345".parse().unwrap());
1158 let client_pk_1 = client_1.pk();
1159 server.insert(client_1).await.unwrap();
1160
1161 let handle_res = server.handle_packet(&client_pk_1, Packet::PongResponse(
1163 PongResponse { ping_id: 0 }
1164 )).await;
1165 assert!(handle_res.is_err());
1166 }
1167 #[tokio::test]
1168 async fn handle_oob_send_empty_data() {
1169 let server = Server::new();
1170
1171 let (client_1, _rx_1) = create_random_client("1.2.3.4:12345".parse().unwrap());
1172 let client_pk_1 = client_1.pk();
1173 server.insert(client_1).await.unwrap();
1174
1175 let (client_2, _rx_2) = create_random_client("1.2.3.5:12345".parse().unwrap());
1176 let client_pk_2 = client_2.pk();
1177 server.insert(client_2).await.unwrap();
1178
1179 let handle_res = server.handle_packet(&client_pk_1, Packet::OobSend(
1181 OobSend { destination_pk: client_pk_2, data: vec![] }
1182 )).await;
1183 assert!(handle_res.is_err());
1184 }
1185 #[tokio::test]
1186 async fn handle_data_self_not_linked() {
1187 let server = Server::new();
1188
1189 let (client_1, rx_1) = create_random_client("1.2.3.4:12345".parse().unwrap());
1190 let client_pk_1 = client_1.pk();
1191 server.insert(client_1).await.unwrap();
1192
1193 let handle_res = server.handle_packet(&client_pk_1, Packet::Data(
1195 Data {
1196 connection_id: ConnectionId::from_index(0),
1197 data: DataPayload::CryptoData(CryptoData {
1198 nonce_last_bytes: 42,
1199 payload: vec![42; 123],
1200 }),
1201 }
1202 )).await;
1203 assert!(handle_res.is_ok());
1204
1205 drop(server);
1207
1208 assert!(rx_1.collect::<Vec<_>>().await.is_empty());
1209 }
1210 #[tokio::test]
1211 async fn handle_oob_send_to_loooong_data() {
1212 let server = Server::new();
1213
1214 let (client_1, _rx_1) = create_random_client("1.2.3.4:12345".parse().unwrap());
1215 let client_pk_1 = client_1.pk();
1216 server.insert(client_1).await.unwrap();
1217
1218 let (client_2, _rx_2) = create_random_client("1.2.3.5:12345".parse().unwrap());
1219 let client_pk_2 = client_2.pk();
1220 server.insert(client_2).await.unwrap();
1221
1222 let handle_res = server.handle_packet(&client_pk_1, Packet::OobSend(
1224 OobSend { destination_pk: client_pk_2, data: vec![42; 1024 + 1] }
1225 )).await;
1226 assert!(handle_res.is_err());
1227 }
1228 #[tokio::test]
1229 async fn handle_oob_recv() {
1230 let server = Server::new();
1231
1232 let (client_1, _rx_1) = create_random_client("1.2.3.4:12345".parse().unwrap());
1233 let client_pk_1 = client_1.pk();
1234 server.insert(client_1).await.unwrap();
1235
1236 let (client_2, _rx_2) = create_random_client("1.2.3.5:12345".parse().unwrap());
1237 let client_pk_2 = client_2.pk();
1238 server.insert(client_2).await.unwrap();
1239
1240 let handle_res = server.handle_packet(&client_pk_1, Packet::OobReceive(
1242 OobReceive { sender_pk: client_pk_2, data: vec![42; 1024] }
1243 )).await;
1244 assert!(handle_res.is_err());
1245 }
1246 #[tokio::test]
1247 async fn handle_onion_request_disabled_onion_loooong_data() {
1248 let server = Server::new();
1249
1250 let (client_1, _rx_1) = create_random_client("1.2.3.4:12345".parse().unwrap());
1251 let client_pk_1 = client_1.pk();
1252 server.insert(client_1).await.unwrap();
1253
1254 let request = OnionRequest {
1255 nonce: gen_nonce(),
1256 ip_port: IpPort {
1257 protocol: ProtocolType::TCP,
1258 ip_addr: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
1259 port: 12345,
1260 },
1261 temporary_pk: gen_keypair().0,
1262 payload: vec![13; 1500]
1263 };
1264 let handle_res = server
1265 .handle_packet(&client_pk_1, Packet::OnionRequest(request))
1266 .await;
1267 assert!(handle_res.is_ok());
1268 }
1269 #[tokio::test]
1270 async fn handle_onion_response() {
1271 let server = Server::new();
1272
1273 let (client_1, _rx_1) = create_random_client("1.2.3.4:12345".parse().unwrap());
1274 let client_pk_1 = client_1.pk();
1275 server.insert(client_1).await.unwrap();
1276
1277 let payload = InnerOnionResponse::OnionAnnounceResponse(OnionAnnounceResponse {
1278 sendback_data: 12345,
1279 nonce: gen_nonce(),
1280 payload: vec![42; 123]
1281 });
1282 let handle_res = server.handle_packet(&client_pk_1, Packet::OnionResponse(
1283 OnionResponse { payload }
1284 )).await;
1285 assert!(handle_res.is_err());
1286 }
1287 #[tokio::test]
1288 async fn handle_udp_onion_response_for_unknown_client() {
1289 crypto_init().unwrap();
1290 let (udp_onion_sink, _) = mpsc::channel(1);
1291 let mut server = Server::new();
1292 server.set_udp_onion_sink(udp_onion_sink);
1293
1294 let client_addr_1 = IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4));
1295 let client_port_1 = 12345u16;
1296 let (client_pk_1, _) = gen_keypair();
1297 let (tx_1, _rx_1) = mpsc::channel(1);
1298 let client_1 = Client::new(tx_1, &client_pk_1, client_addr_1, client_port_1);
1299 server.insert(client_1).await.unwrap();
1300
1301 let client_addr_2 = IpAddr::V4(Ipv4Addr::new(5, 6, 7, 8));
1302 let client_port_2 = 54321u16;
1303
1304 let payload = InnerOnionResponse::OnionAnnounceResponse(OnionAnnounceResponse {
1305 sendback_data: 12345,
1306 nonce: gen_nonce(),
1307 payload: vec![42; 123]
1308 });
1309 let handle_res = server
1310 .handle_udp_onion_response(client_addr_2, client_port_2, payload)
1311 .await;
1312 assert!(handle_res.is_err());
1313 }
1314
1315 #[tokio::test]
1318 async fn handle_route_request_not_connected() {
1319 crypto_init().unwrap();
1320 let server = Server::new();
1321 let (client_pk_1, _) = gen_keypair();
1322 let (client_pk_2, _) = gen_keypair();
1323
1324 let handle_res = server.handle_packet(&client_pk_1, Packet::RouteRequest(
1326 RouteRequest { pk: client_pk_2 }
1327 )).await;
1328 assert!(handle_res.is_err());
1329 }
1330 #[tokio::test]
1331 async fn handle_disconnect_notification_not_connected() {
1332 crypto_init().unwrap();
1333 let server = Server::new();
1334 let (client_pk_1, _) = gen_keypair();
1335
1336 let handle_res = server.handle_packet(&client_pk_1, Packet::DisconnectNotification(
1338 DisconnectNotification { connection_id: ConnectionId::from_index(42) }
1339 )).await;
1340 assert!(handle_res.is_err());
1341 }
1342 #[tokio::test]
1343 async fn handle_disconnect_notification_other_not_connected() {
1344 let server = Server::new();
1345
1346 let (client_1, _rx_1) = create_random_client("1.2.3.4:12345".parse().unwrap());
1347 let client_pk_1 = client_1.pk();
1348 server.insert(client_1).await.unwrap();
1349
1350 let (client_pk_2, _) = gen_keypair();
1351
1352 server.handle_packet(&client_pk_1, Packet::RouteRequest(
1354 RouteRequest { pk: client_pk_2 }
1355 )).await.unwrap();
1356
1357 let handle_res = server.handle_packet(&client_pk_1, Packet::DisconnectNotification(
1359 DisconnectNotification { connection_id: ConnectionId::from_index(0) }
1360 )).await;
1361 assert!(handle_res.is_ok());
1362 }
1363 #[tokio::test]
1364 async fn handle_ping_request_not_connected() {
1365 crypto_init().unwrap();
1366 let server = Server::new();
1367 let (client_pk_1, _) = gen_keypair();
1368
1369 let handle_res = server.handle_packet(&client_pk_1, Packet::PingRequest(
1371 PingRequest { ping_id: 42 }
1372 )).await;
1373 assert!(handle_res.is_err());
1374 }
1375 #[tokio::test]
1376 async fn handle_pong_response_not_connected() {
1377 crypto_init().unwrap();
1378 let server = Server::new();
1379 let (client_pk_1, _) = gen_keypair();
1380
1381 let handle_res = server.handle_packet(&client_pk_1, Packet::PongResponse(
1383 PongResponse { ping_id: 42 }
1384 )).await;
1385 assert!(handle_res.is_err());
1386 }
1387 #[tokio::test]
1388 async fn handle_oob_send_not_connected() {
1389 crypto_init().unwrap();
1390 let server = Server::new();
1391 let (client_pk_1, _) = gen_keypair();
1392 let (client_pk_2, _) = gen_keypair();
1393
1394 let handle_res = server.handle_packet(&client_pk_1, Packet::OobSend(
1396 OobSend { destination_pk: client_pk_2, data: vec![42; 1024] }
1397 )).await;
1398 assert!(handle_res.is_ok());
1399 }
1400 #[tokio::test]
1401 async fn handle_data_not_connected() {
1402 crypto_init().unwrap();
1403 let server = Server::new();
1404 let (client_pk_1, _) = gen_keypair();
1405
1406 let handle_res = server.handle_packet(&client_pk_1, Packet::Data(
1408 Data {
1409 connection_id: ConnectionId::from_index(0),
1410 data: DataPayload::CryptoData(CryptoData {
1411 nonce_last_bytes: 42,
1412 payload: vec![42; 123],
1413 }),
1414 }
1415 )).await;
1416 assert!(handle_res.is_err());
1417 }
1418 #[tokio::test]
1419 async fn handle_data_other_not_connected() {
1420 let server = Server::new();
1421
1422 let (client_1, rx_1) = create_random_client("1.2.3.4:12345".parse().unwrap());
1423 let client_pk_1 = client_1.pk();
1424 server.insert(client_1).await.unwrap();
1425
1426 let (client_pk_2, _) = gen_keypair();
1427
1428 server.handle_packet(&client_pk_1, Packet::RouteRequest(
1430 RouteRequest { pk: client_pk_2 }
1431 )).await.unwrap();
1432
1433 let (packet, _rx_1) = rx_1.into_future().await;
1435 assert_eq!(packet.unwrap(), Packet::RouteResponse(
1436 RouteResponse { pk: client_pk_2, connection_id: ConnectionId::from_index(0) }
1437 ));
1438
1439 let handle_res = server.handle_packet(&client_pk_1, Packet::Data(
1441 Data {
1442 connection_id: ConnectionId::from_index(0),
1443 data: DataPayload::CryptoData(CryptoData {
1444 nonce_last_bytes: 42,
1445 payload: vec![42; 123],
1446 }),
1447 }
1448 )).await;
1449 assert!(handle_res.is_ok());
1450 }
1451 #[tokio::test]
1452 async fn shutdown_different_addr() {
1453 let server = Server::new();
1454
1455 let (client, _rx) = create_random_client("1.2.3.4:12345".parse().unwrap());
1456 let client_pk = client.pk();
1457 server.insert(client).await.unwrap();
1458
1459 let handle_res = server.shutdown_client(&client_pk, "1.2.3.4".parse().unwrap(), 12346).await;
1461 assert!(handle_res.is_err());
1462
1463 let state = server.state.read().await;
1464
1465 assert!(state.connected_clients.contains_key(&client_pk));
1466 }
1467 #[tokio::test]
1468 async fn shutdown_not_connected() {
1469 crypto_init().unwrap();
1470 let server = Server::new();
1471 let (client_pk, _) = gen_keypair();
1472 let client_ip_addr = "1.2.3.4".parse().unwrap();
1473 let client_port = 12345;
1474
1475 let handle_res = server.shutdown_client(&client_pk, client_ip_addr, client_port).await;
1477 assert!(handle_res.is_err());
1478 }
1479 #[tokio::test]
1480 async fn shutdown_inner_not_connected() {
1481 crypto_init().unwrap();
1482 let server = Server::new();
1483 let (client_pk, _) = gen_keypair();
1484
1485 let mut state = server.state.write().await;
1486
1487 let handle_res = server.shutdown_client_inner(&client_pk, &mut state).await;
1489 assert!(handle_res.is_err());
1490 }
1491 #[tokio::test]
1492 async fn shutdown_other_not_connected() {
1493 let server = Server::new();
1494
1495 let (client_1, rx_1) = create_random_client("1.2.3.4:12345".parse().unwrap());
1496 let client_pk_1 = client_1.pk();
1497 let client_ip_addr_1 = client_1.ip_addr();
1498 let client_port_1 = client_1.port();
1499 server.insert(client_1).await.unwrap();
1500
1501 let (client_pk_2, _) = gen_keypair();
1502
1503 server.handle_packet(&client_pk_1, Packet::RouteRequest(
1505 RouteRequest { pk: client_pk_2 }
1506 )).await.unwrap();
1507
1508 let (packet, _rx_1) = rx_1.into_future().await;
1510 assert_eq!(packet.unwrap(), Packet::RouteResponse(
1511 RouteResponse { pk: client_pk_2, connection_id: ConnectionId::from_index(0) }
1512 ));
1513
1514 let handle_res = server.shutdown_client(&client_pk_1, client_ip_addr_1, client_port_1).await;
1516 assert!(handle_res.is_ok());
1517 }
1518 #[tokio::test]
1519 async fn send_anything_to_dropped_client() {
1520 let server = Server::new();
1521
1522 let (client_1, rx_1) = create_random_client("1.2.3.4:12345".parse().unwrap());
1523 let client_pk_1 = client_1.pk();
1524 server.insert(client_1).await.unwrap();
1525
1526 let (client_2, _rx_2) = create_random_client("1.2.3.5:12345".parse().unwrap());
1527 let client_pk_2 = client_2.pk();
1528 server.insert(client_2).await.unwrap();
1529
1530 drop(rx_1);
1531
1532 let handle_res = server.handle_packet(&client_pk_1, Packet::RouteRequest(
1534 RouteRequest { pk: client_pk_2 }
1535 )).await;
1536 assert!(handle_res.is_err())
1537 }
1538 #[tokio::test]
1539 async fn send_onion_request_to_dropped_stream() {
1540 crypto_init().unwrap();
1541 let (udp_onion_sink, udp_onion_stream) = mpsc::channel(1);
1542 let mut server = Server::new();
1543 server.set_udp_onion_sink(udp_onion_sink);
1544
1545 let (client_1, _rx_1) = create_random_client("1.2.3.4:12345".parse().unwrap());
1546 let client_pk_1 = client_1.pk();
1547 server.insert(client_1).await.unwrap();
1548
1549 drop(udp_onion_stream);
1550
1551 let request = OnionRequest {
1553 nonce: gen_nonce(),
1554 ip_port: IpPort {
1555 protocol: ProtocolType::TCP,
1556 ip_addr: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
1557 port: 12345,
1558 },
1559 temporary_pk: gen_keypair().0,
1560 payload: vec![13; 170]
1561 };
1562 let handle_res = server
1563 .handle_packet(&client_pk_1, Packet::OnionRequest(request))
1564 .await;
1565 assert!(handle_res.is_err());
1566 }
1567 #[tokio::test]
1568 async fn tcp_send_pings_test() {
1569 let server = Server::new();
1570
1571 let (client_1, rx_1) = create_random_client("1.2.3.4:12345".parse().unwrap());
1573 let pk_1 = client_1.pk();
1574 server.insert(client_1).await.unwrap();
1575
1576 let (client_2, rx_2) = create_random_client("1.2.3.5:12345".parse().unwrap());
1578 let pk_2 = client_2.pk();
1579 server.insert(client_2).await.unwrap();
1580
1581 let (client_3, rx_3) = create_random_client("1.2.3.6:12345".parse().unwrap());
1583 let pk_3 = client_3.pk();
1584 server.insert(client_3).await.unwrap();
1585
1586
1587 tokio::time::pause();
1588 tokio::time::advance(TCP_PING_FREQUENCY + Duration::from_secs(1)).await;
1590
1591 let sender_res = server.send_pings().await;
1592 assert!(sender_res.is_ok());
1593
1594 let (packet, _rx_1) = rx_1.into_future().await;
1595 assert_eq!(packet.unwrap(), Packet::PingRequest(
1596 PingRequest { ping_id: server.state.read().await.connected_clients[&pk_1].ping_id() }
1597 ));
1598 let (packet, _rx_2) = rx_2.into_future().await;
1599 assert_eq!(packet.unwrap(), Packet::PingRequest(
1600 PingRequest { ping_id: server.state.read().await.connected_clients[&pk_2].ping_id() }
1601 ));
1602 let (packet, _rx_3) = rx_3.into_future().await;
1603 assert_eq!(packet.unwrap(), Packet::PingRequest(
1604 PingRequest { ping_id: server.state.read().await.connected_clients[&pk_3].ping_id() }
1605 ));
1606 }
1607 #[tokio::test]
1608 async fn tcp_send_remove_timedouts() {
1609 let server = Server::new();
1610
1611 let (client_1, _rx_1) = create_random_client("1.2.3.4:12345".parse().unwrap());
1613 let pk_1 = client_1.pk();
1614 server.insert(client_1).await.unwrap();
1615
1616 let (client_2, _rx_2) = create_random_client("1.2.3.5:12345".parse().unwrap());
1618 let pk_2 = client_2.pk();
1619 server.insert(client_2).await.unwrap();
1620
1621 let (mut client_3, _rx_3) = create_random_client("1.2.3.6:12345".parse().unwrap());
1623 let pk_3 = client_3.pk();
1624
1625 tokio::time::pause();
1626 tokio::time::advance(TCP_PING_FREQUENCY + TCP_PING_TIMEOUT + Duration::from_secs(1)).await;
1628
1629 client_3.set_last_pong_resp(clock_now());
1630 server.insert(client_3).await.unwrap();
1631 let sender_res = server.send_pings().await;
1632 assert!(sender_res.is_ok());
1633
1634 assert!(!server.state.read().await.connected_clients.contains_key(&pk_1));
1635 assert!(!server.state.read().await.connected_clients.contains_key(&pk_2));
1636 assert!(server.state.read().await.connected_clients.contains_key(&pk_3));
1637 }
1638}