1mod errors;
4mod nodes_pool;
5mod onion_path;
6mod paths_pool;
7
8use std::collections::HashMap;
9use std::net::SocketAddr;
10use std::sync::Arc;
11use std::time::{Duration, Instant};
12
13use failure::Fail;
14use futures::{StreamExt, SinkExt};
15use futures::channel::mpsc;
16use futures::lock::Mutex;
17
18use tox_crypto::*;
19use crate::dht::ip_port::IsGlobal;
20use tox_packet::dht::packed_node::PackedNode;
21use tox_packet::dht::*;
22use crate::dht::request_queue::RequestQueue;
23use crate::dht::server::Server as DhtServer;
24use crate::dht::kbucket::*;
25use tox_packet::ip_port::*;
26use crate::onion::client::errors::*;
27use crate::onion::client::onion_path::*;
28use crate::onion::client::paths_pool::*;
29use crate::onion::onion_announce::initial_ping_id;
30use tox_packet::onion::*;
31use tox_packet::packed_node::*;
32use crate::relay::client::Connections as TcpConnections;
33use crate::time::*;
34use crate::io_tokio::*;
35
/// Sender half of an unbounded channel of `(friend real PK, friend DHT PK)` pairs.
///
/// `OnionClient::handle_dht_pk_announce` pushes a pair into it every time a
/// friend's DHT public key announcement is accepted.
type DhtPkTx = mpsc::UnboundedSender<(PublicKey, PublicKey)>;

/// Sender half of an unbounded channel of received friend requests, each paired
/// with the real public key taken from the decrypted data response payload.
type FriendRequestTx = mpsc::UnboundedSender<(PublicKey, FriendRequest)>;
44
/// Capacity of the `Kbucket` of close nodes kept for every friend. It also
/// scales `ONION_FRIEND_MAX_PING_INTERVAL`.
const MAX_ONION_FRIEND_NODES: u8 = 8;

/// Capacity of the `Kbucket` of nodes we announce ourselves to, and the number
/// of random friend nodes requested from the DHT when refilling path nodes.
const MAX_ONION_ANNOUNCE_NODES: u8 = 12;

/// Timeout handed to the `RequestQueue` that tracks outstanding announce requests.
const ANNOUNCE_TIMEOUT: Duration = Duration::from_secs(10);

/// Number of unanswered requests after which no more requests are sent to an
/// onion node (see `OnionNode::is_ping_attempts_exhausted`).
const ONION_NODE_MAX_PINGS: u32 = 3;

/// Base ping interval for onion nodes; several other intervals are derived from it.
const ONION_NODE_PING_INTERVAL: Duration = Duration::from_secs(15);

/// Interval between requests to a node of our own announce list that is not
/// in the `Announced` state, or whose path is no longer stored.
const ANNOUNCE_INTERVAL_NOT_ANNOUNCED: Duration = Duration::from_secs(3);

/// Interval between requests to an `Announced` node when the node or its stored
/// path is not stable yet.
const ANNOUNCE_INTERVAL_ANNOUNCED: Duration = ONION_NODE_PING_INTERVAL;

/// Interval between requests to an `Announced` node when both the node and its
/// stored path are stable.
const ANNOUNCE_INTERVAL_STABLE: Duration = Duration::from_secs(ONION_NODE_PING_INTERVAL.as_secs() * 8);

/// Lower bound of the friend search interval once the initial fast phase is over.
const ANNOUNCE_FRIEND: Duration = Duration::from_secs(ONION_NODE_PING_INTERVAL.as_secs() * 6);

/// Friend search interval used while the friend's `search_count` is below
/// `SEARCH_COUNT_FRIEND_ANNOUNCE_BEGINNING`.
const ANNOUNCE_FRIEND_BEGINNING: Duration = Duration::from_secs(3);

/// Number of search rounds (rounds in which packets were sent) that use the
/// fast `ANNOUNCE_FRIEND_BEGINNING` interval.
const SEARCH_COUNT_FRIEND_ANNOUNCE_BEGINNING: u32 = 17;

/// Divisor applied to the time elapsed since a friend was last seen to obtain
/// the backoff search interval.
const ONION_FRIEND_BACKOFF_FACTOR: u32 = 4;

/// Upper bound of the friend search interval; also used as the backoff interval
/// for a friend that has never been seen.
const ONION_FRIEND_MAX_PING_INTERVAL: Duration =
    Duration::from_secs(MAX_ONION_FRIEND_NODES as u64 * 60 * 5);

/// Time since the last request after which a node with exhausted ping attempts
/// is considered timed out.
const ONION_NODE_TIMEOUT: Duration = ONION_NODE_PING_INTERVAL;

/// Time since a node was added after which it can be considered stable
/// (see `OnionNode::is_stable`).
pub(crate) const TIME_TO_STABLE: Duration = Duration::from_secs(ONION_NODE_PING_INTERVAL.as_secs() * 6);

/// Minimum time between two sends of our DHT public key to a friend through
/// onion data requests.
const ONION_DHTPK_SEND_INTERVAL: Duration = Duration::from_secs(30);

/// Minimum time between two sends of our DHT public key to a friend through
/// DHT requests.
const DHT_DHTPK_SEND_INTERVAL: Duration = Duration::from_secs(20);

/// A node is treated as "pinged recently" while a matching request younger than
/// this is still present in the request queue.
const MIN_NODE_PING_TIME: Duration = Duration::from_secs(10);
108
/// Per-friend state of the onion client.
#[derive(Clone, Debug)]
struct OnionFriend {
    /// Real public key of the friend; also the key under which the friend is
    /// stored in `OnionClientState::friends`.
    real_pk: PublicKey,
    /// DHT public key of the friend, if known. Set from accepted DHT PK
    /// announcements or via `OnionClient::set_friend_dht_pk`.
    dht_pk: Option<PublicKey>,
    /// Public half of the keypair generated in `OnionFriend::new`; sent as the
    /// packet key of search requests for this friend.
    temporary_pk: PublicKey,
    /// Secret half of the keypair generated in `OnionFriend::new`; used to
    /// encrypt search requests and decrypt the matching responses.
    temporary_sk: SecretKey,
    /// Onion nodes collected while searching for this friend.
    close_nodes: Kbucket<OnionNode>,
    /// `no_reply` value of the last accepted DHT PK announcement; announcements
    /// whose `no_reply` is not strictly greater are rejected.
    last_no_reply: u64,
    /// When our DHT PK was last sent to this friend through onion data requests.
    last_dht_pk_onion_sent: Option<Instant>,
    /// When our DHT PK was last sent to this friend through DHT requests.
    last_dht_pk_dht_sent: Option<Instant>,
    /// Number of search rounds in which at least one packet was sent; reset
    /// when the friend goes from connected to disconnected.
    search_count: u32,
    /// Last time the friend was seen: a `Found` announce response, an accepted
    /// DHT PK announcement, or the moment the friend was marked disconnected.
    last_seen: Option<Instant>,
    /// Whether the friend is currently marked as connected; connected friends
    /// are skipped by the friends loop.
    connected: bool,
}
140
141impl OnionFriend {
142 pub fn new(real_pk: PublicKey) -> Self {
144 let (temporary_pk, temporary_sk) = gen_keypair();
145 OnionFriend {
146 real_pk,
147 dht_pk: None,
148 temporary_pk,
149 temporary_sk,
150 close_nodes: Kbucket::new(MAX_ONION_FRIEND_NODES),
151 last_no_reply: 0,
152 last_dht_pk_onion_sent: None,
153 last_dht_pk_dht_sent: None,
154 search_count: 0,
155 last_seen: None,
156 connected: false,
157 }
158 }
159}
160
/// Onion node stored in an announce list or in a friend's list of close nodes.
#[derive(Clone, Debug)]
struct OnionNode {
    /// Public key of the node.
    pk: PublicKey,
    /// Socket address of the node.
    saddr: SocketAddr,
    /// Id of the onion path that was used for the request the node answered.
    path_id: OnionPathId,
    /// Ping id received from the node in a response whose status was not
    /// `Found`; sent back in announce requests for ourselves.
    ping_id: Option<sha256::Digest>,
    /// Data public key received from the node in a `Found` response; used to
    /// encrypt onion data requests sent through this node.
    data_pk: Option<PublicKey>,
    /// Number of requests sent to the node since its last update; reset to 0
    /// by `KbucketNode::update`.
    unsuccessful_pings: u32,
    /// When the node was created from an announce response.
    added_time: Instant,
    /// When a request was last sent to the node.
    ping_time: Instant,
    /// When the node was created or last updated.
    response_time: Instant,
    /// Announce status reported by the node in its latest response.
    announce_status: AnnounceStatus,
}
187
188impl HasPK for OnionNode {
189 fn pk(&self) -> PublicKey {
190 self.pk
191 }
192}
193
194impl KbucketNode for OnionNode {
195 type NewNode = OnionNode;
196 type CheckNode = PackedNode;
197
198 fn is_outdated(&self, other: &PackedNode) -> bool {
199 assert_eq!(self.pk, other.pk);
200 self.saddr != other.saddr
201 }
202 fn update(&mut self, other: &OnionNode) {
203 assert_eq!(self.pk, other.pk);
204 self.saddr = other.saddr;
205 self.path_id = other.path_id;
206 self.ping_id = other.ping_id.or(self.ping_id);
207 self.data_pk = other.data_pk.or(self.data_pk);
208 self.unsuccessful_pings = 0;
209 self.response_time = clock_now();
210 self.announce_status = other.announce_status;
211 }
212 fn is_evictable(&self) -> bool {
213 self.is_timed_out()
214 }
215}
216
217impl OnionNode {
218 pub fn is_last_ping_attempt(&self) -> bool {
220 self.unsuccessful_pings == ONION_NODE_MAX_PINGS - 1
221 }
222
223 pub fn is_ping_attempts_exhausted(&self) -> bool {
225 self.unsuccessful_pings >= ONION_NODE_MAX_PINGS
226 }
227
228 pub fn is_timed_out(&self) -> bool {
230 self.is_ping_attempts_exhausted() && clock_elapsed(self.ping_time) >= ONION_NODE_TIMEOUT
231 }
232
233 pub fn is_stable(&self) -> bool {
236 clock_elapsed(self.added_time) >= TIME_TO_STABLE &&
237 (self.unsuccessful_pings == 0 || clock_elapsed(self.ping_time) < ONION_NODE_TIMEOUT)
238 }
239}
240
/// Data remembered for every announce/search request that is in flight, so the
/// response can be matched to the node and path it was sent to.
#[derive(Clone, Debug, Eq, PartialEq)]
struct AnnounceRequestData {
    /// Public key of the node the request was sent to.
    pk: PublicKey,
    /// Socket address of the node the request was sent to.
    saddr: SocketAddr,
    /// Id of the onion path the request was sent through.
    path_id: OnionPathId,
    /// Real public key of the friend being searched for, or `None` when the
    /// request announces ourselves.
    friend_pk: Option<PublicKey>,
}
253
/// Key material and search target shared by all announce/search requests of
/// one kind (our own announce, or the search for one particular friend).
#[derive(Clone, Debug, Eq, PartialEq)]
struct AnnouncePacketData<'a> {
    /// Secret key used to encrypt requests and decrypt their responses.
    packet_sk: &'a SecretKey,
    /// Public key placed into the requests.
    packet_pk: PublicKey,
    /// Public key being searched for (ours when announcing, the friend's real
    /// key when searching).
    search_pk: PublicKey,
    /// Data public key to announce; `None` for friend searches, in which case
    /// an all-zero key is sent.
    data_pk: Option<PublicKey>,
}
266
267impl<'a> AnnouncePacketData<'a> {
268 fn request(&self, node_pk: &PublicKey, ping_id: Option<sha256::Digest>, request_id: u64) -> InnerOnionAnnounceRequest {
271 let payload = OnionAnnounceRequestPayload {
272 ping_id: ping_id.unwrap_or_else(initial_ping_id),
273 search_pk: self.search_pk,
274 data_pk: self.data_pk.unwrap_or(PublicKey([0; 32])),
275 sendback_data: request_id,
276 };
277 InnerOnionAnnounceRequest::new(
278 &precompute(node_pk, &self.packet_sk),
279 &self.packet_pk,
280 &payload
281 )
282 }
283 pub fn search_request(&self, node_pk: &PublicKey, request_id: u64) -> InnerOnionAnnounceRequest {
285 self.request(node_pk, None, request_id)
286 }
287 pub fn announce_request(&self, node_pk: &PublicKey, ping_id: sha256::Digest, request_id: u64) -> InnerOnionAnnounceRequest {
289 self.request(node_pk, Some(ping_id), request_id)
290 }
291}
292
/// Mutable state of the onion client, kept behind a single mutex.
#[derive(Clone, Debug)]
struct OnionClientState {
    /// Pool of onion paths and of the nodes paths are built from.
    paths_pool: PathsPool,
    /// Nodes we announce ourselves to.
    announce_list: Kbucket<OnionNode>,
    /// Announce/search requests that are still waiting for a response.
    announce_requests: RequestQueue<AnnounceRequestData>,
    /// Friends keyed by their real public key.
    friends: HashMap<PublicKey, OnionFriend>,
    /// Optional sink that receives `(friend real PK, friend DHT PK)` pairs
    /// from accepted DHT PK announcements.
    dht_pk_tx: Option<DhtPkTx>,
    /// Optional sink that receives incoming friend requests together with the
    /// sender's real public key.
    friend_request_tx: Option<FriendRequestTx>,
}
311
312impl OnionClientState {
313 pub fn new() -> Self {
314 OnionClientState {
315 paths_pool: PathsPool::new(),
316 announce_list: Kbucket::new(MAX_ONION_ANNOUNCE_NODES),
317 announce_requests: RequestQueue::new(ANNOUNCE_TIMEOUT),
318 friends: HashMap::new(),
319 dht_pk_tx: None,
320 friend_request_tx: None,
321 }
322 }
323}
324
/// Onion client: announces our own keys through onion paths and searches for
/// friends' announcements. Clones share the same `state`.
#[derive(Clone)]
pub struct OnionClient {
    /// DHT server, used for sending UDP packets, building paths, pinging nodes
    /// and looking up close nodes.
    dht: DhtServer,
    /// TCP relay connections, used for TCP onion paths and relay bookkeeping.
    tcp_connections: TcpConnections,
    /// Our real (long-term) secret key.
    real_sk: SecretKey,
    /// Our real (long-term) public key.
    real_pk: PublicKey,
    /// Secret half of the data keypair generated in `OnionClient::new`; used
    /// to decrypt the outer payload of incoming onion data responses.
    data_sk: SecretKey,
    /// Public half of the data keypair; sent in our own announce requests.
    data_pk: PublicKey,
    /// Shared mutable state.
    state: Arc<Mutex<OnionClientState>>,
}
345
346impl OnionClient {
347 pub fn new(
349 dht: DhtServer,
350 tcp_connections: TcpConnections,
351 real_sk: SecretKey,
352 real_pk: PublicKey
353 ) -> Self {
354 let (data_pk, data_sk) = gen_keypair();
355 OnionClient {
356 dht,
357 tcp_connections,
358 real_sk,
359 real_pk,
360 data_sk,
361 data_pk,
362 state: Arc::new(Mutex::new(OnionClientState::new())),
363 }
364 }
365
366 pub async fn set_dht_pk_sink(&self, dht_pk_tx: DhtPkTx) {
368 self.state.lock().await.dht_pk_tx = Some(dht_pk_tx);
369 }
370
371 pub async fn set_friend_request_sink(&self, friend_request_sink: FriendRequestTx) {
373 self.state.lock().await.friend_request_tx = Some(friend_request_sink)
374 }
375
376 fn is_pinged_recently(&self, pk: PublicKey, search_pk: PublicKey, request_queue: &RequestQueue<AnnounceRequestData>) -> bool {
378 let check_pks = |data: &AnnounceRequestData| -> bool {
379 let request_search_pk = if let Some(friend_pk) = data.friend_pk {
380 friend_pk
381 } else {
382 self.real_pk
383 };
384 data.pk == pk && search_pk == request_search_pk
385 };
386 request_queue.get_values()
387 .any(|(ping_time, request_data)| check_pks(request_data) &&
388 clock_elapsed(ping_time) < MIN_NODE_PING_TIME)
389 }
390
391 async fn send_onion_request(&self, path: OnionPath, inner_onion_request: InnerOnionRequest, saddr: SocketAddr)
393 -> Result<(), mpsc::SendError> {
394 match path.path_type {
395 OnionPathType::TCP => {
396 let onion_request = path.create_tcp_onion_request(saddr, inner_onion_request);
397 self.tcp_connections.send_onion(path.nodes[0].public_key, onion_request).await.ok();
400 Ok(())
401 },
402 OnionPathType::UDP => {
403 let onion_request =
404 path.create_udp_onion_request(saddr, inner_onion_request);
405 let mut tx = self.dht.tx.clone();
406
407 tx.send((
408 Packet::OnionRequest0(onion_request),
409 path.nodes[0].saddr
410 )).await
411 },
412 }
413 }
414
/// Handle an `OnionAnnounceResponse` to one of our announce/search requests.
///
/// The responding node is added to the relevant node list (our announce
/// list, or the close nodes of the friend the request searched for), and
/// search requests are sent to the nodes listed in the response.
///
/// `is_global` - when `true`, nodes from the response whose IP address is
/// not global are skipped.
///
/// # Errors
///
/// * `InvalidRequestId` - the sendback data matches no pending request;
/// * `NoFriendWithPk` - the request searched for a friend we no longer have;
/// * `InvalidPayload` - the payload can't be decrypted/parsed;
/// * `InvalidAnnounceStatus` - the status does not fit the request kind;
/// * `SendTo` - a follow-up request could not be sent.
pub async fn handle_announce_response(&self, packet: &OnionAnnounceResponse, is_global: bool) -> Result<(), HandleAnnounceResponseError> {
    // The state stays locked for the whole function, including the sends below.
    let state = &mut *self.state.lock().await;

    // Match the response to a pending request through its sendback data.
    let announce_data = if let Some(announce_data) = state.announce_requests.check_ping_id(packet.sendback_data, |_| true) {
        announce_data
    } else {
        return Err(HandleAnnounceResponseErrorKind::InvalidRequestId.into());
    };

    // Pick the node list and the keys according to the request kind:
    // a friend search uses the friend's temporary keys and close nodes,
    // our own announce uses our real keys and the announce list.
    let (nodes_list, last_seen, announce_packet_data) = if let Some(ref friend_pk) = announce_data.friend_pk {
        if let Some(friend) = state.friends.get_mut(friend_pk) {
            let announce_packet_data = AnnouncePacketData {
                packet_sk: &friend.temporary_sk,
                packet_pk: friend.temporary_pk,
                search_pk: friend.real_pk,
                data_pk: None,
            };
            (&mut friend.close_nodes, Some(&mut friend.last_seen), announce_packet_data)
        } else {
            return Err(HandleAnnounceResponseErrorKind::NoFriendWithPk.into());
        }
    } else {
        let announce_packet_data = AnnouncePacketData {
            packet_sk: &self.real_sk,
            packet_pk: self.real_pk,
            search_pk: self.real_pk,
            data_pk: Some(self.data_pk),
        };
        (&mut state.announce_list, None, announce_packet_data)
    };

    let payload = match packet.get_payload(&precompute(&announce_data.pk, &announce_packet_data.packet_sk)) {
        Ok(payload) => payload,
        Err(e) => return Err(e.context(HandleAnnounceResponseErrorKind::InvalidPayload).into())
    };

    trace!("OnionAnnounceResponse status: {:?}, data: {:?}", payload.announce_status, announce_data);

    // `Announced` is rejected for friend searches and `Found` is rejected
    // for our own announce.
    if announce_data.friend_pk.is_some() && payload.announce_status == AnnounceStatus::Announced ||
        announce_data.friend_pk.is_none() && payload.announce_status == AnnounceStatus::Found {
        return Err(HandleAnnounceResponseErrorKind::InvalidAnnounceStatus.into());
    }

    state.paths_pool.set_timeouts(announce_data.path_id, announce_data.friend_pk.is_some());

    // A `Found` response for a friend search counts as seeing the friend.
    if payload.announce_status == AnnounceStatus::Found {
        if let Some(last_seen) = last_seen {
            *last_seen = Some(clock_now());
        }
    }

    // The same payload field carries the data public key for `Found`
    // and the ping id for every other status.
    let (ping_id, data_pk) = if payload.announce_status == AnnounceStatus::Found {
        (None, Some(digest_as_pk(payload.ping_id_or_pk)))
    } else {
        (Some(payload.ping_id_or_pk), None)
    };

    let now = clock_now();
    nodes_list.try_add(&announce_packet_data.search_pk, OnionNode {
        pk: announce_data.pk,
        saddr: announce_data.saddr,
        path_id: announce_data.path_id,
        ping_id,
        data_pk,
        unsuccessful_pings: 0,
        added_time: now,
        ping_time: now,
        response_time: now,
        announce_status: payload.announce_status,
    }, true);

    // The responding node is also remembered as a candidate for building paths.
    state.paths_pool.path_nodes.put(PackedNode::new(announce_data.saddr, &announce_data.pk));

    for node in &payload.nodes {
        // Skip non-global addresses when `is_global` is set.
        if !IsGlobal::is_global(&node.ip()) && is_global {
            continue;
        }

        // Skip nodes that already have a fresh pending request for this search key.
        if self.is_pinged_recently(node.pk, announce_packet_data.search_pk, &state.announce_requests) {
            continue;
        }

        // Skip nodes the list would not accept anyway.
        if !nodes_list.can_add(&announce_packet_data.search_pk, &node, true) {
            continue;
        }

        let path = if let Some(path) = state.paths_pool.random_path(&self.dht, &self.tcp_connections, announce_data.friend_pk.is_some()).await {
            path
        } else {
            continue
        };

        let request_id = state.announce_requests.new_ping_id(AnnounceRequestData {
            pk: node.pk,
            saddr: node.saddr,
            path_id: path.id(),
            friend_pk: announce_data.friend_pk,
        });

        let inner_announce_request = announce_packet_data.search_request(&node.pk, request_id);
        self.send_onion_request(path, InnerOnionRequest::InnerOnionAnnounceRequest(inner_announce_request), node.saddr)
            .await
            .map_err(|e| e.context(HandleAnnounceResponseErrorKind::SendTo))?;
    }

    Ok(())
}
526
527 pub async fn handle_dht_pk_announce(&self, friend_pk: PublicKey, dht_pk_announce: DhtPkAnnouncePayload) -> Result<(), HandleDhtPkAnnounceError> {
529 let mut state = self.state.lock().await;
530
531 let friend = match state.friends.get_mut(&friend_pk) {
532 Some(friend) => friend,
533 None => return Err(HandleDhtPkAnnounceErrorKind::NoFriendWithPk.into())
534 };
535
536 if dht_pk_announce.no_reply <= friend.last_no_reply {
537 return Err(HandleDhtPkAnnounceErrorKind::InvalidNoReply.into())
538 }
539
540 friend.last_no_reply = dht_pk_announce.no_reply;
541 friend.dht_pk = Some(dht_pk_announce.dht_pk);
542 friend.last_seen = Some(clock_now());
543
544 let tx = state.dht_pk_tx.clone();
545 let dht_pk = dht_pk_announce.dht_pk;
546 maybe_send_unbounded(tx, (friend_pk, dht_pk)).await
547 .map_err(|e| e.context(HandleDhtPkAnnounceErrorKind::SendTo))?;
548
549 for node in dht_pk_announce.nodes.into_iter() {
550 match node.ip_port.protocol {
551 ProtocolType::UDP => {
552 let packed_node = PackedNode::new(node.ip_port.to_saddr(), &node.pk);
553 self.dht.ping_node(&packed_node).await
554 .map_err(|e| e.context(HandleDhtPkAnnounceErrorKind::PingNode))?;
555 },
556 ProtocolType::TCP => {
557 self.tcp_connections.add_relay_connection(node.ip_port.to_saddr(), node.pk, dht_pk_announce.dht_pk).await
558 .map_err(|e| e.context(HandleDhtPkAnnounceErrorKind::AddRelay))?;
559 }
560 }
561 }
562
563 Ok(())
564 }
565
566 pub async fn handle_data_response(&self, packet: &OnionDataResponse) -> Result<(), HandleDataResponseError> {
568 let payload = match packet.get_payload(&precompute(&packet.temporary_pk, &self.data_sk)) {
569 Ok(payload) => payload,
570 Err(e) => return Err(e.context(HandleDataResponseErrorKind::InvalidPayload).into())
571 };
572 let iner_payload = match payload.get_payload(&packet.nonce, &precompute(&payload.real_pk, &self.real_sk)) {
573 Ok(payload) => payload,
574 Err(e) => return Err(e.context(HandleDataResponseErrorKind::InvalidInnerPayload).into())
575 };
576 match iner_payload {
577 OnionDataResponseInnerPayload::DhtPkAnnounce(dht_pk_announce) =>
578 self.handle_dht_pk_announce(payload.real_pk, dht_pk_announce).await
579 .map_err(|e| e.context(HandleDataResponseErrorKind::DhtPkAnnounce).into()),
580 OnionDataResponseInnerPayload::FriendRequest(friend_request) => {
581 let tx = self.state.lock().await.friend_request_tx.clone();
582 maybe_send_unbounded(tx, (payload.real_pk, friend_request)).await
583 .map_err(|e| e.context(HandleDataResponseErrorKind::FriendRequest).into())
584 }
585 }
586 }
587
588 pub async fn add_path_node(&self, node: PackedNode) {
590 let mut state = self.state.lock().await;
591
592 state.paths_pool.path_nodes.put(node);
593 }
594
595 pub async fn add_friend(&self, real_pk: PublicKey) {
597 let mut state = self.state.lock().await;
598
599 state.friends.insert(real_pk, OnionFriend::new(real_pk));
600 }
601
602 pub async fn remove_friend(&self, real_pk: PublicKey) {
604 let mut state = self.state.lock().await;
605
606 state.friends.remove(&real_pk);
607 }
608
609 pub async fn set_friend_connected(&self, real_pk: PublicKey, connected: bool) {
612 let mut state = self.state.lock().await;
613
614 if let Some(friend) = state.friends.get_mut(&real_pk) {
615 if friend.connected && !connected {
616 friend.last_seen = Some(clock_now());
617 friend.search_count = 0;
618 friend.last_no_reply = 0;
621 }
622 friend.connected = connected;
623 }
624 }
625
626 pub async fn set_friend_dht_pk(&self, real_pk: PublicKey, dht_pk: PublicKey) {
628 let mut state = self.state.lock().await;
629
630 if let Some(friend) = state.friends.get_mut(&real_pk) {
631 friend.dht_pk = Some(dht_pk);
632 }
633 }
634
/// Send announce/search requests to the nodes of `close_nodes` that are due,
/// and to random path nodes when the list holds too few good nodes.
///
/// * `announce_packet_data` - keys and search target for the requests;
/// * `friend_pk` - `Some` for a friend search, `None` for our own announce;
/// * `interval` - fixed interval between requests to one node; when `None`
///   the interval is chosen per node from its announce status and stability.
///
/// Returns `Ok(true)` when at least one request was issued and propagates
/// the first send error.
async fn ping_close_nodes<'a>(
    &self,
    close_nodes: &mut Kbucket<OnionNode>,
    paths_pool: &mut PathsPool,
    announce_requests: &mut RequestQueue<AnnounceRequestData>,
    announce_packet_data: AnnouncePacketData<'a>,
    friend_pk: Option<PublicKey>,
    interval: Option<Duration>
) -> Result<bool, mpsc::SendError> {
    let capacity = close_nodes.capacity();
    // Random extra pings are allowed only when every node was pinged at
    // least `ONION_NODE_PING_INTERVAL` ago and, if a fixed interval is
    // given, last responded at least `interval / capacity` ago.
    let ping_random = close_nodes.iter().all(|node|
        clock_elapsed(node.ping_time) >= ONION_NODE_PING_INTERVAL &&
            interval.map_or(true, |interval| clock_elapsed(node.response_time) >= interval / capacity as u32)
    );
    let mut packets_sent = false;
    // Number of nodes that are not timed out.
    let mut good_nodes_count = 0;
    for node in close_nodes.iter_mut() {
        if !node.is_timed_out() {
            good_nodes_count += 1;
        }

        // No more requests for nodes that used up all their attempts.
        if node.is_ping_attempts_exhausted() {
            continue;
        }

        // Without a fixed interval: `Announced` nodes whose path is still
        // stored are pinged rarely (even more rarely when node and path are
        // stable); all other nodes use the short not-announced interval.
        let interval = if let Some(interval) = interval {
            interval
        } else if node.announce_status == AnnounceStatus::Announced {
            if let Some(stored_path) = paths_pool.get_stored_path(node.path_id, friend_pk.is_some()) {
                if node.is_stable() && stored_path.is_stable() {
                    ANNOUNCE_INTERVAL_STABLE
                } else {
                    ANNOUNCE_INTERVAL_ANNOUNCED
                }
            } else {
                ANNOUNCE_INTERVAL_NOT_ANNOUNCED
            }
        } else {
            ANNOUNCE_INTERVAL_NOT_ANNOUNCED
        };

        // `&&` binds tighter than `||`: ping when the interval has elapsed,
        // or when random pings are allowed and the random draw is 0
        // (presumably a 1-in-`capacity` chance - confirm against `random_limit_usize`).
        if clock_elapsed(node.ping_time) >= interval || ping_random && random_limit_usize(capacity) == 0 {
            // The last attempt to a stable node goes through a random path
            // instead of the path stored for the node.
            let path = if node.is_last_ping_attempt() && node.is_stable() {
                paths_pool.random_path(&self.dht, &self.tcp_connections, friend_pk.is_some()).await
            } else {
                paths_pool.get_or_random_path(&self.dht, &self.tcp_connections, node.path_id, friend_pk.is_some()).await
            };

            let path = if let Some(path) = path {
                path
            } else {
                continue
            };

            // Counted as unsuccessful until `KbucketNode::update` resets the counter.
            node.unsuccessful_pings += 1;
            node.ping_time = clock_now();

            let request_id = announce_requests.new_ping_id(AnnounceRequestData {
                pk: node.pk,
                saddr: node.saddr,
                path_id: path.id(),
                friend_pk,
            });

            // Only our own announce sends the stored ping id back; friend
            // searches and nodes without a ping id get a plain search request.
            let inner_announce_request = match node.ping_id {
                Some(ping_id) if friend_pk.is_none() => announce_packet_data.announce_request(&node.pk, ping_id, request_id),
                _ => announce_packet_data.search_request(&node.pk, request_id)
            };
            self.send_onion_request(path, InnerOnionRequest::InnerOnionAnnounceRequest(inner_announce_request), node.saddr).await?;
            packets_sent = true;
        }
    }

    // When the number of good nodes does not exceed a random draw bounded by
    // the capacity, query up to `capacity / 2` random path nodes as well.
    if good_nodes_count <= random_limit_usize(close_nodes.capacity()) {
        for _ in 0 .. close_nodes.capacity() / 2 {
            let node = if let Some(node) = paths_pool.path_nodes.rand() {
                node
            } else {
                break
            };

            let path = if let Some(path) = paths_pool.random_path(&self.dht, &self.tcp_connections, friend_pk.is_some()).await {
                path
            } else {
                break
            };

            let request_id = announce_requests.new_ping_id(AnnounceRequestData {
                pk: node.pk,
                saddr: node.saddr,
                path_id: path.id(),
                friend_pk,
            });

            let inner_announce_request = announce_packet_data.request(&node.pk, None, request_id);
            self.send_onion_request(path, InnerOnionRequest::InnerOnionAnnounceRequest(inner_announce_request), node.saddr).await?;
            packets_sent = true;
        }
    }

    Ok(packets_sent)
}
740
741 async fn announce_loop(&self, state: &mut OnionClientState) -> Result<(), RunError> {
743 let announce_packet_data = AnnouncePacketData {
744 packet_sk: &self.real_sk,
745 packet_pk: self.real_pk,
746 search_pk: self.real_pk,
747 data_pk: Some(self.data_pk),
748 };
749
750 self.ping_close_nodes(
751 &mut state.announce_list,
752 &mut state.paths_pool,
753 &mut state.announce_requests,
754 announce_packet_data,
755 None,
756 None,
757 ).await.map(drop).map_err(|e| e.context(RunErrorKind::SendTo).into())
758 }
759
760 async fn dht_pk_nodes(&self) -> Vec<TcpUdpPackedNode> {
762 let relays = self.tcp_connections.get_random_relays(2).await;
763 let close_nodes: Vec<PackedNode> = self.dht.get_closest(&self.dht.pk, 4 - relays.len() as u8, false).await.into();
764 relays.into_iter().map(|node| TcpUdpPackedNode {
765 pk: node.pk,
766 ip_port: IpPort::from_tcp_saddr(node.saddr),
767 }).chain(close_nodes.into_iter().map(|node| TcpUdpPackedNode {
768 pk: node.pk,
769 ip_port: IpPort::from_udp_saddr(node.saddr),
770 })).collect()
771 }
772
773 async fn send_dht_pk_onion(&self, friend: &mut OnionFriend, paths_pool: &mut PathsPool) -> Result<(), mpsc::SendError> {
775 let dht_pk_announce = DhtPkAnnouncePayload::new(self.dht.pk, self.dht_pk_nodes().await);
776 let inner_payload = OnionDataResponseInnerPayload::DhtPkAnnounce(dht_pk_announce);
777 let nonce = gen_nonce();
778 let payload = OnionDataResponsePayload::new(&precompute(&friend.real_pk, &self.real_sk), self.real_pk, &nonce, &inner_payload);
779
780 let mut packets_sent = false;
781
782 for node in friend.close_nodes.iter() {
783 if node.is_timed_out() {
784 continue;
785 }
786
787 let data_pk = if let Some(data_pk) = node.data_pk {
788 data_pk
789 } else {
790 continue
791 };
792
793 let path = if let Some(path) = paths_pool.get_or_random_path(&self.dht, &self.tcp_connections, node.path_id, true).await {
794 path
795 } else {
796 continue
797 };
798
799 let (temporary_pk, temporary_sk) = gen_keypair();
800 let inner_data_request = InnerOnionDataRequest::new(&precompute(&data_pk, &temporary_sk), friend.real_pk, temporary_pk, nonce, &payload);
801
802 self.send_onion_request(path, InnerOnionRequest::InnerOnionDataRequest(inner_data_request), node.saddr).await?;
803 packets_sent = true;
804 }
805
806 if packets_sent {
807 friend.last_dht_pk_onion_sent = Some(clock_now());
808 }
809
810 Ok(())
811 }
812
813 async fn send_dht_pk_dht_request(&self, friend: &mut OnionFriend) -> Result<(), mpsc::SendError> {
815 let friend_dht_pk = if let Some(friend_dht_pk) = friend.dht_pk {
816 friend_dht_pk
817 } else {
818 return Ok(());
819 };
820
821 let dht_pk_announce_payload = DhtPkAnnouncePayload::new(self.dht.pk, self.dht_pk_nodes().await);
822 let dht_pk_announce = DhtPkAnnounce::new(
823 &precompute(&friend.real_pk, &self.real_sk),
824 self.real_pk,
825 &dht_pk_announce_payload
826 );
827 let payload = DhtRequestPayload::DhtPkAnnounce(dht_pk_announce);
828 let packet = DhtRequest::new(
829 &precompute(&friend_dht_pk, &self.dht.sk),
830 &friend_dht_pk,
831 &self.dht.pk,
832 &payload
833 );
834 let packet = Packet::DhtRequest(packet);
835
836 let nodes = self.dht.get_closest(&friend_dht_pk, 8, false).await;
837
838 if !nodes.is_empty() {
839 friend.last_dht_pk_dht_sent = Some(clock_now());
840 }
841
842 let packets = nodes.iter()
843 .map(|node| (packet.clone(), node.saddr))
844 .collect::<Vec<_>>();
845
846 let mut tx = self.dht.tx.clone();
847 let mut stream = futures::stream::iter(packets).map(Ok);
848 tx.send_all(&mut stream).await
849 }
850
851 async fn friends_loop(&self, state: &mut OnionClientState) -> Result<(), RunError> {
853 for friend in state.friends.values_mut() {
854 if friend.connected {
855 continue;
856 }
857
858 let announce_packet_data = AnnouncePacketData {
859 packet_sk: &friend.temporary_sk,
860 packet_pk: friend.temporary_pk,
861 search_pk: friend.real_pk,
862 data_pk: None,
863 };
864
865 let interval = if friend.search_count < SEARCH_COUNT_FRIEND_ANNOUNCE_BEGINNING {
866 ANNOUNCE_FRIEND_BEGINNING
867 } else {
868 let backoff_interval = friend.last_seen.map_or_else(
869 || ONION_FRIEND_MAX_PING_INTERVAL,
870 |last_seen| clock_elapsed(last_seen) / ONION_FRIEND_BACKOFF_FACTOR
871 );
872 backoff_interval
873 .min(ONION_FRIEND_MAX_PING_INTERVAL)
874 .max(ANNOUNCE_FRIEND)
875 };
876
877 let packets_sent = self.ping_close_nodes(
878 &mut friend.close_nodes,
879 &mut state.paths_pool,
880 &mut state.announce_requests,
881 announce_packet_data,
882 Some(friend.real_pk),
883 Some(interval),
884 ).await.map_err(|e| e.context(RunErrorKind::SendTo))?;
885
886 if packets_sent {
887 friend.search_count = friend.search_count.saturating_add(1);
888 }
889
890 if friend.last_dht_pk_onion_sent.map_or(true, |time| clock_elapsed(time) > ONION_DHTPK_SEND_INTERVAL) {
891 self.send_dht_pk_onion(friend, &mut state.paths_pool).await.map_err(|e| e.context(RunErrorKind::SendTo))?;
892 }
893
894 if friend.last_dht_pk_dht_sent.map_or(true, |time| clock_elapsed(time) > DHT_DHTPK_SEND_INTERVAL) {
895 self.send_dht_pk_dht_request(friend).await.map_err(|e| e.context(RunErrorKind::SendTo))?;
896 }
897 }
898
899 Ok(())
900 }
901
902 async fn populate_path_nodes(&self, state: &mut OnionClientState) {
904 for node in self.dht.random_friend_nodes(MAX_ONION_ANNOUNCE_NODES).await {
905 state.paths_pool.path_nodes.put(node);
906 }
907 }
908
909 pub async fn run(&self) -> Result<(), RunError> {
911 let interval = Duration::from_secs(1);
912 let mut wakeups = tokio::time::interval(interval);
913
914 while wakeups.next().await.is_some() {
915 trace!("Onion client sender wake up");
916
917 let mut state = self.state.lock().await;
918 self.populate_path_nodes(&mut state).await;
919
920 self.announce_loop(&mut state).await?;
921 self.friends_loop(&mut state).await?;
922 }
923
924 Ok(())
925 }
926}
927
928#[cfg(test)]
929mod tests {
930 use super::*;
931 use tox_binary_io::*;
932
933 impl OnionClient {
934 pub async fn has_friend(&self, pk: &PublicKey) -> bool {
935 self.state.lock().await.friends.contains_key(pk)
936 }
937
938 pub async fn friend_dht_pk(&self, pk: &PublicKey) -> Option<PublicKey> {
939 self.state.lock().await.friends.get(pk).and_then(|friend| friend.dht_pk)
940 }
941
942 pub async fn is_friend_connected(&self, pk: &PublicKey) -> bool {
943 self.state.lock().await.friends.get(pk).map_or(false, |friend| friend.connected)
944 }
945 }
946
947 fn unpack_onion_packet(packet: OnionRequest0, saddr: SocketAddr, key_by_addr: &HashMap<SocketAddr, SecretKey>) -> OnionRequest2Payload {
948 let payload = packet.get_payload(&precompute(&packet.temporary_pk, &key_by_addr[&saddr])).unwrap();
949 let packet = OnionRequest1 {
950 nonce: packet.nonce,
951 temporary_pk: payload.temporary_pk,
952 payload: payload.inner,
953 onion_return: OnionReturn {
954 nonce: secretbox::gen_nonce(),
955 payload: vec![42; 123],
956 }
957 };
958 let payload = packet.get_payload(&precompute(&packet.temporary_pk, &key_by_addr[&payload.ip_port.to_saddr()])).unwrap();
959 let packet = OnionRequest2 {
960 nonce: packet.nonce,
961 temporary_pk: payload.temporary_pk,
962 payload: payload.inner,
963 onion_return: OnionReturn {
964 nonce: secretbox::gen_nonce(),
965 payload: vec![42; 123],
966 }
967 };
968 packet.get_payload(&precompute(&packet.temporary_pk, &key_by_addr[&payload.ip_port.to_saddr()])).unwrap()
969 }
970
/// A node is outdated only with respect to a packed node that has the same
/// key but a different address.
#[test]
fn onion_node_is_outdated() {
    let created_at = Instant::now();
    let (pk, _sk) = gen_keypair();
    let saddr = "127.0.0.1:12345".parse().unwrap();
    let path_id = OnionPathId {
        keys: [gen_keypair().0, gen_keypair().0, gen_keypair().0],
        path_type: OnionPathType::UDP,
    };
    let onion_node = OnionNode {
        pk,
        saddr,
        path_id,
        ping_id: None,
        data_pk: None,
        unsuccessful_pings: 0,
        added_time: created_at,
        ping_time: created_at,
        response_time: created_at,
        announce_status: AnnounceStatus::Announced,
    };

    let same_address = PackedNode::new(saddr, &pk);
    assert!(!onion_node.is_outdated(&same_address));

    let other_saddr = "127.0.0.1:12346".parse().unwrap();
    let other_address = PackedNode::new(other_saddr, &pk);
    assert!(onion_node.is_outdated(&other_address));
}
996
997 #[tokio::test]
998 async fn onion_node_update() {
999 tokio::time::pause();
1000 let now = clock_now();
1001 let (pk, _sk) = gen_keypair();
1002 let mut onion_node = OnionNode {
1003 pk,
1004 saddr: "127.0.0.1:12345".parse().unwrap(),
1005 path_id: OnionPathId {
1006 keys: [gen_keypair().0, gen_keypair().0, gen_keypair().0],
1007 path_type: OnionPathType::UDP,
1008 },
1009 ping_id: None,
1010 data_pk: None,
1011 unsuccessful_pings: 1,
1012 added_time: now,
1013 ping_time: now,
1014 response_time: now,
1015 announce_status: AnnounceStatus::Failed,
1016 };
1017
1018 let saddr = "127.0.0.1:12346".parse().unwrap();
1019 let path_id = OnionPathId {
1020 keys: [gen_keypair().0, gen_keypair().0, gen_keypair().0],
1021 path_type: OnionPathType::UDP,
1022 };
1023 let ping_id = sha256::hash(&[1, 2, 3]);
1024 let data_pk = gen_keypair().0;
1025 let new_now = now + Duration::from_secs(1);
1026 let other_onion_node = OnionNode {
1027 pk,
1028 saddr,
1029 path_id,
1030 ping_id: Some(ping_id),
1031 data_pk: Some(data_pk),
1032 unsuccessful_pings: 0,
1033 added_time: now,
1034 ping_time: now,
1035 response_time: now,
1036 announce_status: AnnounceStatus::Announced,
1037 };
1038
1039 tokio::time::advance(Duration::from_secs(1)).await;
1040
1041 onion_node.update(&other_onion_node);
1042
1043 assert_eq!(onion_node.saddr, saddr);
1044 assert_eq!(onion_node.path_id, path_id);
1045 assert_eq!(onion_node.ping_id, Some(ping_id));
1046 assert_eq!(onion_node.data_pk, Some(data_pk));
1047 assert_eq!(onion_node.unsuccessful_pings, 0);
1048 assert_eq!(onion_node.added_time, now);
1049 assert_eq!(onion_node.ping_time, now);
1050 assert_eq!(onion_node.response_time, new_now);
1051 assert_eq!(onion_node.announce_status, AnnounceStatus::Announced);
1052 }
1053
1054 #[tokio::test]
1055 async fn onion_node_is_evictable() {
1056 let now = Instant::now();
1057 let mut onion_node = OnionNode {
1058 pk: gen_keypair().0,
1059 saddr: "127.0.0.1:12345".parse().unwrap(),
1060 path_id: OnionPathId {
1061 keys: [gen_keypair().0, gen_keypair().0, gen_keypair().0],
1062 path_type: OnionPathType::UDP,
1063 },
1064 ping_id: None,
1065 data_pk: None,
1066 unsuccessful_pings: 0,
1067 added_time: now,
1068 ping_time: now,
1069 response_time: now,
1070 announce_status: AnnounceStatus::Announced,
1071 };
1072
1073 assert!(!onion_node.is_evictable());
1074
1075 onion_node.unsuccessful_pings = ONION_NODE_MAX_PINGS;
1076
1077 tokio::time::pause();
1078 tokio::time::advance(ONION_NODE_TIMEOUT + Duration::from_secs(1)).await;
1080
1081 assert!(onion_node.is_evictable());
1082 }
1083
1084 #[tokio::test]
1085 async fn add_path_node() {
1086 let (dht_pk, dht_sk) = gen_keypair();
1087 let (real_pk, real_sk) = gen_keypair();
1088 let (udp_tx, _udp_rx) = mpsc::channel(1);
1089 let (tcp_incoming_tx, _tcp_incoming_rx) = mpsc::unbounded();
1090 let dht = DhtServer::new(udp_tx, dht_pk, dht_sk.clone());
1091 let tcp_connections = TcpConnections::new(dht_pk, dht_sk, tcp_incoming_tx);
1092 let onion_client = OnionClient::new(dht, tcp_connections, real_sk, real_pk);
1093
1094 let node = PackedNode::new("127.0.0.1:12345".parse().unwrap(), &gen_keypair().0);
1095 onion_client.add_path_node(node).await;
1096
1097 let state = onion_client.state.lock().await;
1098 assert_eq!(state.paths_pool.path_nodes.rand(), Some(node));
1099 }
1100
1101 #[tokio::test]
1102 async fn add_remove_friend() {
1103 let (dht_pk, dht_sk) = gen_keypair();
1104 let (real_pk, real_sk) = gen_keypair();
1105 let (udp_tx, _udp_rx) = mpsc::channel(1);
1106 let (tcp_incoming_tx, _tcp_incoming_rx) = mpsc::unbounded();
1107 let dht = DhtServer::new(udp_tx, dht_pk, dht_sk.clone());
1108 let tcp_connections = TcpConnections::new(dht_pk, dht_sk, tcp_incoming_tx);
1109 let onion_client = OnionClient::new(dht, tcp_connections, real_sk, real_pk);
1110
1111 let (friend_pk, _friend_sk) = gen_keypair();
1112 onion_client.add_friend(friend_pk).await;
1113
1114 assert_eq!(onion_client.state.lock().await.friends[&friend_pk].real_pk, friend_pk);
1115
1116 onion_client.remove_friend(friend_pk).await;
1117
1118 assert!(!onion_client.state.lock().await.friends.contains_key(&friend_pk));
1119 }
1120
1121 #[tokio::test]
1122 async fn set_friend_connected() {
1123 let (dht_pk, dht_sk) = gen_keypair();
1124 let (real_pk, real_sk) = gen_keypair();
1125 let (udp_tx, _udp_rx) = mpsc::channel(1);
1126 let (tcp_incoming_tx, _tcp_incoming_rx) = mpsc::unbounded();
1127 let dht = DhtServer::new(udp_tx, dht_pk, dht_sk.clone());
1128 let tcp_connections = TcpConnections::new(dht_pk, dht_sk, tcp_incoming_tx);
1129 let onion_client = OnionClient::new(dht, tcp_connections, real_sk, real_pk);
1130
1131 let (friend_pk, _friend_sk) = gen_keypair();
1132 onion_client.add_friend(friend_pk).await;
1133
1134 onion_client.set_friend_connected(friend_pk, true).await;
1135
1136 let state = onion_client.state.lock().await;
1137 assert!(state.friends[&friend_pk].connected);
1138 drop(state);
1139
1140 tokio::time::pause();
1141 let now = clock_now();
1142
1143 onion_client.set_friend_connected(friend_pk, false).await;
1144
1145 let state = onion_client.state.lock().await;
1146 let friend = &state.friends[&friend_pk];
1147 assert!(!friend.connected);
1148 assert_eq!(friend.last_seen, Some(now));
1149 }
1150
1151 #[tokio::test]
1152 async fn set_friend_dht_pk() {
1153 let (dht_pk, dht_sk) = gen_keypair();
1154 let (real_pk, real_sk) = gen_keypair();
1155 let (udp_tx, _udp_rx) = mpsc::channel(1);
1156 let (tcp_incoming_tx, _tcp_incoming_rx) = mpsc::unbounded();
1157 let dht = DhtServer::new(udp_tx, dht_pk, dht_sk.clone());
1158 let tcp_connections = TcpConnections::new(dht_pk, dht_sk, tcp_incoming_tx);
1159 let onion_client = OnionClient::new(dht, tcp_connections, real_sk, real_pk);
1160
1161 let (friend_pk, _friend_sk) = gen_keypair();
1162 onion_client.add_friend(friend_pk).await;
1163
1164 let (friend_dht_pk, _friend_dht_sk) = gen_keypair();
1165 onion_client.set_friend_dht_pk(friend_pk, friend_dht_pk).await;
1166
1167 let state = onion_client.state.lock().await;
1168 assert_eq!(state.friends[&friend_pk].dht_pk, Some(friend_dht_pk));
1169 }
1170
1171 #[tokio::test]
1172 async fn handle_announce_response_announced() {
1173 let (dht_pk, dht_sk) = gen_keypair();
1174 let (real_pk, real_sk) = gen_keypair();
1175 let (udp_tx, udp_rx) = mpsc::channel(1);
1176 let (tcp_incoming_tx, _tcp_incoming_rx) = mpsc::unbounded();
1177 let dht = DhtServer::new(udp_tx, dht_pk, dht_sk.clone());
1178 dht.add_node(PackedNode::new("127.0.0.1:12345".parse().unwrap(), &gen_keypair().0)).await;
1180 let tcp_connections = TcpConnections::new(dht_pk, dht_sk, tcp_incoming_tx);
1181 let onion_client = OnionClient::new(dht, tcp_connections, real_sk, real_pk);
1182
1183 let mut state = onion_client.state.lock().await;
1184
1185 let mut key_by_addr = HashMap::new();
1187 let addr = "127.0.0.1".parse().unwrap();
1188 for i in 0 .. 3 {
1189 let saddr = SocketAddr::new(addr, 12346 + i);
1190 let (pk, sk) = gen_keypair();
1191 key_by_addr.insert(saddr, sk);
1192 let node = PackedNode::new(saddr, &pk);
1193 state.paths_pool.path_nodes.put(node);
1194 }
1195 let path = state.paths_pool.path_nodes.udp_path().unwrap();
1196
1197 let (sender_pk, sender_sk) = gen_keypair();
1198 let saddr = "127.0.0.1:12345".parse().unwrap();
1199
1200 key_by_addr.insert(saddr, sender_sk.clone());
1202
1203 let request_data = AnnounceRequestData {
1204 pk: sender_pk,
1205 saddr,
1206 path_id: path.id(),
1207 friend_pk: None,
1208 };
1209 let request_id = state.announce_requests.new_ping_id(request_data);
1210
1211 drop(state);
1212
1213 let ping_id = sha256::hash(&[1, 2, 3]);
1214 let (node_pk, node_sk) = gen_keypair();
1215 let node = PackedNode::new(SocketAddr::V4("5.6.7.8:12345".parse().unwrap()), &node_pk);
1216 let payload = OnionAnnounceResponsePayload {
1217 announce_status: AnnounceStatus::Announced,
1218 ping_id_or_pk: ping_id,
1219 nodes: vec![node]
1220 };
1221 let packet = OnionAnnounceResponse::new(&precompute(&real_pk, &sender_sk), request_id, &payload);
1222
1223 onion_client.handle_announce_response(&packet, true).await.unwrap();
1224
1225 let state = onion_client.state.lock().await;
1226
1227 let onion_node = state.announce_list.get_node(&real_pk, &sender_pk).unwrap();
1229 assert_eq!(onion_node.path_id, path.id());
1230 assert_eq!(onion_node.ping_id, Some(ping_id));
1231 assert_eq!(onion_node.data_pk, None);
1232 assert_eq!(onion_node.announce_status, AnnounceStatus::Announced);
1233
1234 let (received, _udp_rx) = udp_rx.into_future().await;
1236 let (packet, addr_to_send) = received.unwrap();
1237
1238 let packet = unpack!(packet, Packet::OnionRequest0);
1239 let payload = unpack_onion_packet(packet, addr_to_send, &key_by_addr);
1240 let packet = unpack!(payload.inner, InnerOnionRequest::InnerOnionAnnounceRequest);
1241 let payload = packet.get_payload(&precompute(&real_pk, &node_sk)).unwrap();
1242 assert_eq!(payload.ping_id, initial_ping_id());
1243 assert_eq!(payload.search_pk, real_pk);
1244 assert_eq!(payload.data_pk, onion_client.data_pk);
1245 }
1246
1247 #[tokio::test]
1248 async fn handle_announce_response_announced_invalid_status() {
1249 let (dht_pk, dht_sk) = gen_keypair();
1250 let (real_pk, real_sk) = gen_keypair();
1251 let (udp_tx, _udp_rx) = mpsc::channel(1);
1252 let (tcp_incoming_tx, _tcp_incoming_rx) = mpsc::unbounded();
1253 let dht = DhtServer::new(udp_tx, dht_pk, dht_sk.clone());
1254 let tcp_connections = TcpConnections::new(dht_pk, dht_sk, tcp_incoming_tx);
1255 let onion_client = OnionClient::new(dht, tcp_connections, real_sk, real_pk);
1256
1257 let mut state = onion_client.state.lock().await;
1258
1259 let (friend_pk, _friend_sk) = gen_keypair();
1260 let friend = OnionFriend::new(friend_pk);
1261 let friend_temporary_pk = friend.temporary_pk;
1262 state.friends.insert(friend_pk, friend);
1263
1264 let addr = "127.0.0.1".parse().unwrap();
1265 for i in 0 .. 3 {
1266 let saddr = SocketAddr::new(addr, 12346 + i);
1267 let (pk, _sk) = gen_keypair();
1268 let node = PackedNode::new(saddr, &pk);
1269 state.paths_pool.path_nodes.put(node);
1270 }
1271 let path = state.paths_pool.path_nodes.udp_path().unwrap();
1272
1273 let (sender_pk, sender_sk) = gen_keypair();
1274 let saddr = "127.0.0.1:12345".parse().unwrap();
1275
1276 let request_data = AnnounceRequestData {
1277 pk: sender_pk,
1278 saddr,
1279 path_id: path.id(),
1280 friend_pk: Some(friend_pk),
1281 };
1282 let request_id = state.announce_requests.new_ping_id(request_data);
1283
1284 drop(state);
1285
1286 let ping_id = sha256::hash(&[1, 2, 3]);
1287 let (node_pk, _node_sk) = gen_keypair();
1288 let node = PackedNode::new(SocketAddr::V4("5.6.7.8:12345".parse().unwrap()), &node_pk);
1289 let payload = OnionAnnounceResponsePayload {
1290 announce_status: AnnounceStatus::Announced,
1291 ping_id_or_pk: ping_id,
1292 nodes: vec![node]
1293 };
1294 let packet = OnionAnnounceResponse::new(&precompute(&friend_temporary_pk, &sender_sk), request_id, &payload);
1295
1296 let error = onion_client.handle_announce_response(&packet, true).await.err().unwrap();
1297 assert_eq!(error.kind(), &HandleAnnounceResponseErrorKind::InvalidAnnounceStatus);
1298 }
1299
1300 #[tokio::test]
1301 async fn handle_announce_response_announced_pinged_recently() {
1302 let (dht_pk, dht_sk) = gen_keypair();
1303 let (real_pk, real_sk) = gen_keypair();
1304 let (udp_tx, udp_rx) = mpsc::channel(1);
1305 let (tcp_incoming_tx, _tcp_incoming_rx) = mpsc::unbounded();
1306 let dht = DhtServer::new(udp_tx, dht_pk, dht_sk.clone());
1307 let tcp_connections = TcpConnections::new(dht_pk, dht_sk, tcp_incoming_tx);
1308 let onion_client = OnionClient::new(dht, tcp_connections, real_sk.clone(), real_pk);
1309
1310 let mut state = onion_client.state.lock().await;
1311
1312 let addr = "127.0.0.1".parse().unwrap();
1313 for i in 0 .. 3 {
1314 let saddr = SocketAddr::new(addr, 12346 + i);
1315 let (pk, _sk) = gen_keypair();
1316 let node = PackedNode::new(saddr, &pk);
1317 state.paths_pool.path_nodes.put(node);
1318 }
1319 let path = state.paths_pool.path_nodes.udp_path().unwrap();
1320
1321 let (sender_pk, sender_sk) = gen_keypair();
1322 let saddr = "127.0.0.1:12345".parse().unwrap();
1323
1324 let request_data = AnnounceRequestData {
1325 pk: sender_pk,
1326 saddr,
1327 path_id: path.id(),
1328 friend_pk: None,
1329 };
1330 let request_id = state.announce_requests.new_ping_id(request_data);
1331
1332 let (node_pk, _node_sk) = gen_keypair();
1334 let node = PackedNode::new(SocketAddr::V4("5.6.7.8:12345".parse().unwrap()), &node_pk);
1335 let node_request_data = AnnounceRequestData {
1336 pk: node_pk,
1337 saddr: node.saddr,
1338 path_id: path.id(),
1339 friend_pk: None,
1340 };
1341 let _node_request_id = state.announce_requests.new_ping_id(node_request_data);
1342
1343 drop(state);
1344
1345 let payload = OnionAnnounceResponsePayload {
1346 announce_status: AnnounceStatus::Announced,
1347 ping_id_or_pk: sha256::hash(&[1, 2, 3]),
1348 nodes: vec![node]
1349 };
1350 let packet = OnionAnnounceResponse::new(&precompute(&real_pk, &sender_sk), request_id, &payload);
1351
1352 onion_client.handle_announce_response(&packet, true).await.unwrap();
1353
1354 drop(onion_client);
1356
1357 assert!(udp_rx.collect::<Vec<_>>().await.is_empty());
1358 }
1359
1360 #[tokio::test]
1361 async fn handle_announce_response_found() {
1362 let (dht_pk, dht_sk) = gen_keypair();
1363 let (real_pk, real_sk) = gen_keypair();
1364 let (udp_tx, udp_rx) = mpsc::channel(1);
1365 let (tcp_incoming_tx, _tcp_incoming_rx) = mpsc::unbounded();
1366 let dht = DhtServer::new(udp_tx, dht_pk, dht_sk.clone());
1367 dht.add_node(PackedNode::new("127.0.0.1:12345".parse().unwrap(), &gen_keypair().0)).await;
1369 let tcp_connections = TcpConnections::new(dht_pk, dht_sk, tcp_incoming_tx);
1370 let onion_client = OnionClient::new(dht, tcp_connections, real_sk.clone(), real_pk);
1371
1372 let mut state = onion_client.state.lock().await;
1373
1374 let (friend_pk, _friend_sk) = gen_keypair();
1375 let friend = OnionFriend::new(friend_pk);
1376 let friend_temporary_pk = friend.temporary_pk;
1377 state.friends.insert(friend_pk, friend);
1378
1379 let mut key_by_addr = HashMap::new();
1381 let addr = "127.0.0.1".parse().unwrap();
1382 for i in 0 .. 3 {
1383 let saddr = SocketAddr::new(addr, 12346 + i);
1384 let (pk, sk) = gen_keypair();
1385 key_by_addr.insert(saddr, sk);
1386 let node = PackedNode::new(saddr, &pk);
1387 state.paths_pool.path_nodes.put(node);
1388 }
1389 let path = state.paths_pool.path_nodes.udp_path().unwrap();
1390
1391 let (sender_pk, sender_sk) = gen_keypair();
1392 let saddr = "127.0.0.1:12345".parse().unwrap();
1393
1394 key_by_addr.insert(saddr, sender_sk.clone());
1396
1397 let request_data = AnnounceRequestData {
1398 pk: sender_pk,
1399 saddr,
1400 path_id: path.id(),
1401 friend_pk: Some(friend_pk),
1402 };
1403 let request_id = state.announce_requests.new_ping_id(request_data);
1404
1405 drop(state);
1406
1407 let (friend_data_pk, _friend_data_sk) = gen_keypair();
1408 let (node_pk, node_sk) = gen_keypair();
1409 let node = PackedNode::new(SocketAddr::V4("5.6.7.8:12345".parse().unwrap()), &node_pk);
1410 let payload = OnionAnnounceResponsePayload {
1411 announce_status: AnnounceStatus::Found,
1412 ping_id_or_pk: pk_as_digest(friend_data_pk),
1413 nodes: vec![node]
1414 };
1415 let packet = OnionAnnounceResponse::new(&precompute(&friend_temporary_pk, &sender_sk), request_id, &payload);
1416
1417 onion_client.handle_announce_response(&packet, true).await.unwrap();
1418
1419 let state = onion_client.state.lock().await;
1420
1421 let onion_node = state.friends[&friend_pk].close_nodes.get_node(&real_pk, &sender_pk).unwrap();
1423 assert_eq!(onion_node.path_id, path.id());
1424 assert_eq!(onion_node.ping_id, None);
1425 assert_eq!(onion_node.data_pk, Some(friend_data_pk));
1426 assert_eq!(onion_node.announce_status, AnnounceStatus::Found);
1427
1428 let (received, _udp_rx) = udp_rx.into_future().await;
1430 let (packet, addr_to_send) = received.unwrap();
1431
1432 let packet = unpack!(packet, Packet::OnionRequest0);
1433 let payload = unpack_onion_packet(packet, addr_to_send, &key_by_addr);
1434 let packet = unpack!(payload.inner, InnerOnionRequest::InnerOnionAnnounceRequest);
1435 let payload = packet.get_payload(&precompute(&friend_temporary_pk, &node_sk)).unwrap();
1436 assert_eq!(payload.ping_id, initial_ping_id());
1437 assert_eq!(payload.search_pk, friend_pk);
1438 assert_eq!(payload.data_pk, PublicKey([0; 32]));
1439 }
1440
1441 #[tokio::test]
1442 async fn handle_announce_response_found_invalid_status() {
1443 let (dht_pk, dht_sk) = gen_keypair();
1444 let (real_pk, real_sk) = gen_keypair();
1445 let (udp_tx, _udp_rx) = mpsc::channel(1);
1446 let (tcp_incoming_tx, _tcp_incoming_rx) = mpsc::unbounded();
1447 let dht = DhtServer::new(udp_tx, dht_pk, dht_sk.clone());
1448 let tcp_connections = TcpConnections::new(dht_pk, dht_sk, tcp_incoming_tx);
1449 let onion_client = OnionClient::new(dht, tcp_connections, real_sk.clone(), real_pk);
1450
1451 let mut state = onion_client.state.lock().await;
1452
1453 let addr = "127.0.0.1".parse().unwrap();
1454 for i in 0 .. 3 {
1455 let saddr = SocketAddr::new(addr, 12346 + i);
1456 let (pk, _sk) = gen_keypair();
1457 let node = PackedNode::new(saddr, &pk);
1458 state.paths_pool.path_nodes.put(node);
1459 }
1460 let path = state.paths_pool.path_nodes.udp_path().unwrap();
1461
1462 let (sender_pk, sender_sk) = gen_keypair();
1463 let saddr = "127.0.0.1:12345".parse().unwrap();
1464
1465 let request_data = AnnounceRequestData {
1466 pk: sender_pk,
1467 saddr,
1468 path_id: path.id(),
1469 friend_pk: None,
1470 };
1471 let request_id = state.announce_requests.new_ping_id(request_data);
1472
1473 drop(state);
1474
1475 let (friend_data_pk, _friend_data_sk) = gen_keypair();
1476 let (node_pk, _node_sk) = gen_keypair();
1477 let node = PackedNode::new(SocketAddr::V4("5.6.7.8:12345".parse().unwrap()), &node_pk);
1478 let payload = OnionAnnounceResponsePayload {
1479 announce_status: AnnounceStatus::Found,
1480 ping_id_or_pk: pk_as_digest(friend_data_pk),
1481 nodes: vec![node]
1482 };
1483 let packet = OnionAnnounceResponse::new(&precompute(&real_pk, &sender_sk), request_id, &payload);
1484
1485 let error = onion_client.handle_announce_response(&packet, true).await.err().unwrap();
1486 assert_eq!(error.kind(), &HandleAnnounceResponseErrorKind::InvalidAnnounceStatus);
1487 }
1488
1489 #[tokio::test]
1490 async fn handle_announce_response_no_friend_with_pk() {
1491 let (dht_pk, dht_sk) = gen_keypair();
1492 let (real_pk, real_sk) = gen_keypair();
1493 let (udp_tx, _udp_rx) = mpsc::channel(1);
1494 let (tcp_incoming_tx, _tcp_incoming_rx) = mpsc::unbounded();
1495 let dht = DhtServer::new(udp_tx, dht_pk, dht_sk.clone());
1496 let tcp_connections = TcpConnections::new(dht_pk, dht_sk, tcp_incoming_tx);
1497 let onion_client = OnionClient::new(dht, tcp_connections, real_sk.clone(), real_pk);
1498
1499 let mut state = onion_client.state.lock().await;
1500
1501 let (friend_pk, _friend_sk) = gen_keypair();
1502 let (friend_temporary_pk, _friend_temporary_sk) = gen_keypair();
1503 let (sender_pk, sender_sk) = gen_keypair();
1504 let saddr = "127.0.0.1:12345".parse().unwrap();
1505
1506 let request_data = AnnounceRequestData {
1507 pk: sender_pk,
1508 saddr,
1509 path_id: OnionPathId {
1510 keys: [gen_keypair().0, gen_keypair().0, gen_keypair().0],
1511 path_type: OnionPathType::UDP,
1512 },
1513 friend_pk: Some(friend_pk),
1514 };
1515 let request_id = state.announce_requests.new_ping_id(request_data);
1516
1517 drop(state);
1518
1519 let (friend_data_pk, _friend_data_sk) = gen_keypair();
1520 let payload = OnionAnnounceResponsePayload {
1521 announce_status: AnnounceStatus::Found,
1522 ping_id_or_pk: pk_as_digest(friend_data_pk),
1523 nodes: vec![]
1524 };
1525 let packet = OnionAnnounceResponse::new(&precompute(&friend_temporary_pk, &sender_sk), request_id, &payload);
1526
1527 let error = onion_client.handle_announce_response(&packet, true).await.err().unwrap();
1528 assert_eq!(error.kind(), &HandleAnnounceResponseErrorKind::NoFriendWithPk);
1529 }
1530
1531 #[tokio::test]
1532 async fn handle_announce_response_invalid_payload() {
1533 let (dht_pk, dht_sk) = gen_keypair();
1534 let (real_pk, real_sk) = gen_keypair();
1535 let (udp_tx, _udp_rx) = mpsc::channel(1);
1536 let (tcp_incoming_tx, _tcp_incoming_rx) = mpsc::unbounded();
1537 let dht = DhtServer::new(udp_tx, dht_pk, dht_sk.clone());
1538 let tcp_connections = TcpConnections::new(dht_pk, dht_sk, tcp_incoming_tx);
1539 let onion_client = OnionClient::new(dht, tcp_connections, real_sk.clone(), real_pk);
1540
1541 let mut state = onion_client.state.lock().await;
1542
1543 let (friend_pk, _friend_sk) = gen_keypair();
1544 let friend = OnionFriend::new(friend_pk);
1545 state.friends.insert(friend_pk, friend);
1546
1547 let (sender_pk, _sender_sk) = gen_keypair();
1548 let saddr = "127.0.0.1:12345".parse().unwrap();
1549
1550 let request_data = AnnounceRequestData {
1551 pk: sender_pk,
1552 saddr,
1553 path_id: OnionPathId {
1554 keys: [gen_keypair().0, gen_keypair().0, gen_keypair().0],
1555 path_type: OnionPathType::UDP,
1556 },
1557 friend_pk: Some(friend_pk),
1558 };
1559 let request_id = state.announce_requests.new_ping_id(request_data);
1560
1561 drop(state);
1562
1563 let packet = OnionAnnounceResponse {
1564 sendback_data: request_id,
1565 nonce: gen_nonce(),
1566 payload: vec![42; 123],
1567 };
1568
1569 let error = onion_client.handle_announce_response(&packet, true).await.err().unwrap();
1570 assert_eq!(error.kind(), &HandleAnnounceResponseErrorKind::InvalidPayload);
1571 }
1572
1573 #[tokio::test]
1574 async fn handle_announce_response_found_pinged_recently() {
1575 let (dht_pk, dht_sk) = gen_keypair();
1576 let (real_pk, real_sk) = gen_keypair();
1577 let (udp_tx, udp_rx) = mpsc::channel(1);
1578 let (tcp_incoming_tx, _tcp_incoming_rx) = mpsc::unbounded();
1579 let dht = DhtServer::new(udp_tx, dht_pk, dht_sk.clone());
1580 let tcp_connections = TcpConnections::new(dht_pk, dht_sk, tcp_incoming_tx);
1581 let onion_client = OnionClient::new(dht, tcp_connections, real_sk.clone(), real_pk);
1582
1583 let mut state = onion_client.state.lock().await;
1584
1585 let (friend_pk, _friend_sk) = gen_keypair();
1586 let friend = OnionFriend::new(friend_pk);
1587 let friend_temporary_pk = friend.temporary_pk;
1588 state.friends.insert(friend_pk, friend);
1589
1590 let addr = "127.0.0.1".parse().unwrap();
1591 for i in 0 .. 3 {
1592 let saddr = SocketAddr::new(addr, 12346 + i);
1593 let (pk, _sk) = gen_keypair();
1594 let node = PackedNode::new(saddr, &pk);
1595 state.paths_pool.path_nodes.put(node);
1596 }
1597 let path = state.paths_pool.path_nodes.udp_path().unwrap();
1598
1599 let (sender_pk, sender_sk) = gen_keypair();
1600 let saddr = "127.0.0.1:12345".parse().unwrap();
1601
1602 let request_data = AnnounceRequestData {
1603 pk: sender_pk,
1604 saddr,
1605 path_id: path.id(),
1606 friend_pk: Some(friend_pk),
1607 };
1608 let request_id = state.announce_requests.new_ping_id(request_data);
1609
1610 let (node_pk, _node_sk) = gen_keypair();
1612 let node = PackedNode::new(SocketAddr::V4("5.6.7.8:12345".parse().unwrap()), &node_pk);
1613 let node_request_data = AnnounceRequestData {
1614 pk: node_pk,
1615 saddr: node.saddr,
1616 path_id: path.id(),
1617 friend_pk: Some(friend_pk),
1618 };
1619 let _node_request_id = state.announce_requests.new_ping_id(node_request_data);
1620
1621 drop(state);
1622
1623 let (friend_data_pk, _friend_data_sk) = gen_keypair();
1624 let node = PackedNode::new(SocketAddr::V4("5.6.7.8:12345".parse().unwrap()), &node_pk);
1625 let payload = OnionAnnounceResponsePayload {
1626 announce_status: AnnounceStatus::Found,
1627 ping_id_or_pk: pk_as_digest(friend_data_pk),
1628 nodes: vec![node]
1629 };
1630 let packet = OnionAnnounceResponse::new(&precompute(&friend_temporary_pk, &sender_sk), request_id, &payload);
1631
1632 onion_client.handle_announce_response(&packet, true).await.unwrap();
1633
1634 drop(onion_client);
1636
1637 assert!(udp_rx.collect::<Vec<_>>().await.is_empty());
1638 }
1639
1640 #[tokio::test]
1641 async fn handle_data_response_dht_pk_announce_udp_node() {
1642 let (dht_pk, dht_sk) = gen_keypair();
1643 let (real_pk, real_sk) = gen_keypair();
1644 let (udp_tx, udp_rx) = mpsc::channel(1);
1645 let (tcp_incoming_tx, _tcp_incoming_rx) = mpsc::unbounded();
1646 let (dht_pk_tx, dht_pk_rx) = mpsc::unbounded();
1647 let dht = DhtServer::new(udp_tx, dht_pk, dht_sk.clone());
1648 let tcp_connections = TcpConnections::new(dht_pk, dht_sk, tcp_incoming_tx);
1649 let onion_client = OnionClient::new(dht, tcp_connections, real_sk.clone(), real_pk);
1650
1651 onion_client.set_dht_pk_sink(dht_pk_tx).await;
1652
1653 let (friend_dht_pk, _friend_dht_sk) = gen_keypair();
1654 let (friend_real_pk, friend_real_sk) = gen_keypair();
1655
1656 onion_client.add_friend(friend_real_pk).await;
1657
1658 let saddr: SocketAddr = "127.0.0.1:12345".parse().unwrap();
1659 let (node_pk, node_sk) = gen_keypair();
1660 let dht_pk_announce_payload = DhtPkAnnouncePayload::new(friend_dht_pk, vec![
1661 TcpUdpPackedNode {
1662 ip_port: IpPort {
1663 protocol: ProtocolType::UDP,
1664 ip_addr: saddr.ip(),
1665 port: saddr.port(),
1666 },
1667 pk: node_pk,
1668 },
1669 ]);
1670 let no_reply = dht_pk_announce_payload.no_reply;
1671 let onion_data_response_inner_payload = OnionDataResponseInnerPayload::DhtPkAnnounce(dht_pk_announce_payload);
1672 let nonce = gen_nonce();
1673 let onion_data_response_payload = OnionDataResponsePayload::new(&precompute(&real_pk, &friend_real_sk), friend_real_pk, &nonce, &onion_data_response_inner_payload);
1674 let (temporary_pk, temporary_sk) = gen_keypair();
1675 let onion_data_response = OnionDataResponse::new(&precompute(&onion_client.data_pk, &temporary_sk), temporary_pk, nonce, &onion_data_response_payload);
1676
1677 onion_client.handle_data_response(&onion_data_response).await.unwrap();
1678
1679 let state = onion_client.state.lock().await;
1680
1681 let friend = &state.friends[&friend_real_pk];
1683 assert_eq!(friend.last_no_reply, no_reply);
1684 assert_eq!(friend.dht_pk, Some(friend_dht_pk));
1685
1686 let (received, _dht_pk_rx) = dht_pk_rx.into_future().await;
1688 let (received_real_pk, received_dht_pk) = received.unwrap();
1689 assert_eq!(received_real_pk, friend_real_pk);
1690 assert_eq!(received_dht_pk, friend_dht_pk);
1691
1692 let (received, _udp_rx) = udp_rx.into_future().await;
1694 let (packet, addr_to_send) = received.unwrap();
1695
1696 assert_eq!(addr_to_send, saddr);
1697 let packet = unpack!(packet, Packet::NodesRequest);
1698 let payload = packet.get_payload(&precompute(&dht_pk, &node_sk)).unwrap();
1699
1700 assert_eq!(payload.pk, dht_pk);
1701 }
1702
1703 #[tokio::test]
1704 async fn handle_data_response_dht_pk_announce_tcp_node() {
1705 let (dht_pk, dht_sk) = gen_keypair();
1706 let (real_pk, real_sk) = gen_keypair();
1707 let (udp_tx, _udp_rx) = mpsc::channel(1);
1708 let (tcp_incoming_tx, _tcp_incoming_rx) = mpsc::unbounded();
1709 let (dht_pk_tx, _dht_pk_rx) = mpsc::unbounded();
1710 let dht = DhtServer::new(udp_tx, dht_pk, dht_sk.clone());
1711 let tcp_connections = TcpConnections::new(dht_pk, dht_sk, tcp_incoming_tx);
1712 let onion_client = OnionClient::new(dht, tcp_connections, real_sk, real_pk);
1713
1714 onion_client.set_dht_pk_sink(dht_pk_tx).await;
1715
1716 let (friend_dht_pk, _friend_dht_sk) = gen_keypair();
1717 let (friend_real_pk, friend_real_sk) = gen_keypair();
1718
1719 onion_client.add_friend(friend_real_pk).await;
1720
1721 let (node_pk, _node_sk) = gen_keypair();
1722 let dht_pk_announce_payload = DhtPkAnnouncePayload::new(friend_dht_pk, vec![
1723 TcpUdpPackedNode {
1724 ip_port: IpPort {
1725 protocol: ProtocolType::TCP,
1726 ip_addr: "127.0.0.2".parse().unwrap(),
1727 port: 12346,
1728 },
1729 pk: node_pk,
1730 },
1731 ]);
1732 let no_reply = dht_pk_announce_payload.no_reply;
1733 let onion_data_response_inner_payload = OnionDataResponseInnerPayload::DhtPkAnnounce(dht_pk_announce_payload);
1734 let nonce = gen_nonce();
1735 let onion_data_response_payload = OnionDataResponsePayload::new(&precompute(&real_pk, &friend_real_sk), friend_real_pk, &nonce, &onion_data_response_inner_payload);
1736 let (temporary_pk, temporary_sk) = gen_keypair();
1737 let onion_data_response = OnionDataResponse::new(&precompute(&onion_client.data_pk, &temporary_sk), temporary_pk, nonce, &onion_data_response_payload);
1738
1739 onion_client.handle_data_response(&onion_data_response).await.unwrap();
1740
1741 let state = onion_client.state.lock().await;
1742
1743 let friend = &state.friends[&friend_real_pk];
1745 assert_eq!(friend.last_no_reply, no_reply);
1746 assert_eq!(friend.dht_pk, Some(friend_dht_pk));
1747
1748 assert!(onion_client.tcp_connections.has_relay(&node_pk).await);
1749 assert!(onion_client.tcp_connections.has_connection(&friend_dht_pk).await);
1750 }
1751
1752 #[tokio::test]
1753 async fn handle_data_response_dht_pk_announce_no_friend_with_pk() {
1754 let (dht_pk, dht_sk) = gen_keypair();
1755 let (real_pk, real_sk) = gen_keypair();
1756 let (udp_tx, _udp_rx) = mpsc::channel(1);
1757 let (tcp_incoming_tx, _tcp_incoming_rx) = mpsc::unbounded();
1758 let dht = DhtServer::new(udp_tx, dht_pk, dht_sk.clone());
1759 let tcp_connections = TcpConnections::new(dht_pk, dht_sk, tcp_incoming_tx);
1760 let onion_client = OnionClient::new(dht, tcp_connections, real_sk.clone(), real_pk);
1761
1762 let (friend_dht_pk, _friend_dht_sk) = gen_keypair();
1763 let (friend_real_pk, friend_real_sk) = gen_keypair();
1764
1765 let dht_pk_announce_payload = DhtPkAnnouncePayload::new(friend_dht_pk, vec![]);
1766 let onion_data_response_inner_payload = OnionDataResponseInnerPayload::DhtPkAnnounce(dht_pk_announce_payload);
1767 let nonce = gen_nonce();
1768 let onion_data_response_payload = OnionDataResponsePayload::new(&precompute(&real_pk, &friend_real_sk), friend_real_pk, &nonce, &onion_data_response_inner_payload);
1769 let (temporary_pk, temporary_sk) = gen_keypair();
1770 let onion_data_response = OnionDataResponse::new(&precompute(&onion_client.data_pk, &temporary_sk), temporary_pk, nonce, &onion_data_response_payload);
1771
1772 let error = onion_client.handle_data_response(&onion_data_response).await.err().unwrap();
1773 assert_eq!(error.kind(), &HandleDataResponseErrorKind::DhtPkAnnounce);
1774 let cause = error.cause().unwrap().downcast_ref::<HandleDhtPkAnnounceError>().unwrap();
1775 assert_eq!(cause.kind(), &HandleDhtPkAnnounceErrorKind::NoFriendWithPk);
1776 }
1777
1778 #[tokio::test]
1779 async fn handle_data_response_dht_pk_announce_invalid_no_reply() {
1780 let (dht_pk, dht_sk) = gen_keypair();
1781 let (real_pk, real_sk) = gen_keypair();
1782 let (udp_tx, _udp_rx) = mpsc::channel(1);
1783 let (tcp_incoming_tx, _tcp_incoming_rx) = mpsc::unbounded();
1784 let dht = DhtServer::new(udp_tx, dht_pk, dht_sk.clone());
1785 let tcp_connections = TcpConnections::new(dht_pk, dht_sk, tcp_incoming_tx);
1786 let onion_client = OnionClient::new(dht, tcp_connections, real_sk.clone(), real_pk);
1787
1788 let (friend_dht_pk, _friend_dht_sk) = gen_keypair();
1789 let (friend_real_pk, friend_real_sk) = gen_keypair();
1790
1791 onion_client.add_friend(friend_real_pk).await;
1792
1793 let dht_pk_announce_payload = DhtPkAnnouncePayload::new(friend_dht_pk, vec![]);
1794 let onion_data_response_inner_payload = OnionDataResponseInnerPayload::DhtPkAnnounce(dht_pk_announce_payload);
1795 let nonce = gen_nonce();
1796 let onion_data_response_payload = OnionDataResponsePayload::new(&precompute(&real_pk, &friend_real_sk), friend_real_pk, &nonce, &onion_data_response_inner_payload);
1797 let (temporary_pk, temporary_sk) = gen_keypair();
1798 let onion_data_response = OnionDataResponse::new(&precompute(&onion_client.data_pk, &temporary_sk), temporary_pk, nonce, &onion_data_response_payload);
1799
1800 onion_client.handle_data_response(&onion_data_response).await.unwrap();
1801
1802 let error = onion_client.handle_data_response(&onion_data_response).await.err().unwrap();
1804 assert_eq!(error.kind(), &HandleDataResponseErrorKind::DhtPkAnnounce);
1805 let cause = error.cause().unwrap().downcast_ref::<HandleDhtPkAnnounceError>().unwrap();
1806 assert_eq!(cause.kind(), &HandleDhtPkAnnounceErrorKind::InvalidNoReply);
1807 }
1808
1809 #[tokio::test]
1810 async fn handle_data_response_invalid_payload() {
1811 let (dht_pk, dht_sk) = gen_keypair();
1812 let (real_pk, real_sk) = gen_keypair();
1813 let (udp_tx, _udp_rx) = mpsc::channel(1);
1814 let (tcp_incoming_tx, _tcp_incoming_rx) = mpsc::unbounded();
1815 let dht = DhtServer::new(udp_tx, dht_pk, dht_sk.clone());
1816 let tcp_connections = TcpConnections::new(dht_pk, dht_sk, tcp_incoming_tx);
1817 let onion_client = OnionClient::new(dht, tcp_connections, real_sk.clone(), real_pk);
1818
1819 let onion_data_response = OnionDataResponse {
1820 nonce: gen_nonce(),
1821 temporary_pk: gen_keypair().0,
1822 payload: vec![42; 123],
1823 };
1824
1825 let error = onion_client.handle_data_response(&onion_data_response).await.err().unwrap();
1826 assert_eq!(error.kind(), &HandleDataResponseErrorKind::InvalidPayload);
1827 }
1828
1829 #[tokio::test]
1830 async fn handle_data_response_invalid_inner_payload() {
1831 let (dht_pk, dht_sk) = gen_keypair();
1832 let (real_pk, real_sk) = gen_keypair();
1833 let (udp_tx, _udp_rx) = mpsc::channel(1);
1834 let (tcp_incoming_tx, _tcp_incoming_rx) = mpsc::unbounded();
1835 let dht = DhtServer::new(udp_tx, dht_pk, dht_sk.clone());
1836 let tcp_connections = TcpConnections::new(dht_pk, dht_sk, tcp_incoming_tx);
1837 let onion_client = OnionClient::new(dht, tcp_connections, real_sk.clone(), real_pk);
1838
1839 let onion_data_response_payload = OnionDataResponsePayload {
1840 real_pk: gen_keypair().0,
1841 payload: vec![42; 123],
1842 };
1843 let (temporary_pk, temporary_sk) = gen_keypair();
1844 let onion_data_response = OnionDataResponse::new(&precompute(&onion_client.data_pk, &temporary_sk), temporary_pk, gen_nonce(), &onion_data_response_payload);
1845
1846 let error = onion_client.handle_data_response(&onion_data_response).await.err().unwrap();
1847 assert_eq!(error.kind(), &HandleDataResponseErrorKind::InvalidInnerPayload);
1848 }
1849
1850 #[tokio::test]
1851 async fn announce_loop_empty() {
1852 let (dht_pk, dht_sk) = gen_keypair();
1853 let (real_pk, real_sk) = gen_keypair();
1854 let (udp_tx, udp_rx) = mpsc::channel(MAX_ONION_ANNOUNCE_NODES as usize / 2);
1855 let (tcp_incoming_tx, _tcp_incoming_rx) = mpsc::unbounded();
1856 let dht = DhtServer::new(udp_tx, dht_pk, dht_sk.clone());
1857 dht.add_node(PackedNode::new("127.0.0.1:12345".parse().unwrap(), &gen_keypair().0)).await;
1859 let tcp_connections = TcpConnections::new(dht_pk, dht_sk, tcp_incoming_tx);
1860 let onion_client = OnionClient::new(dht, tcp_connections, real_sk.clone(), real_pk);
1861
1862 let mut state = onion_client.state.lock().await;
1863
1864 let mut key_by_addr = HashMap::new();
1866 let addr = "127.0.0.1".parse().unwrap();
1867 for i in 0 .. 3 {
1868 let saddr = SocketAddr::new(addr, 12346 + i);
1869 let (pk, sk) = gen_keypair();
1870 key_by_addr.insert(saddr, sk);
1871 let node = PackedNode::new(saddr, &pk);
1872 state.paths_pool.path_nodes.put(node);
1873 }
1874
1875 onion_client.announce_loop(&mut state).await.unwrap();
1876
1877 let data_pk = onion_client.data_pk;
1878
1879 drop(state);
1881 drop(onion_client);
1882
1883 let packets = udp_rx.collect::<Vec<_>>().await;
1884
1885 assert!(!packets.is_empty());
1886
1887 for (packet, addr_to_send) in packets {
1888 let packet = unpack!(packet, Packet::OnionRequest0);
1889 let payload = unpack_onion_packet(packet, addr_to_send, &key_by_addr);
1890 let packet = unpack!(payload.inner, InnerOnionRequest::InnerOnionAnnounceRequest);
1891 let payload = packet.get_payload(&precompute(&real_pk, &key_by_addr[&payload.ip_port.to_saddr()])).unwrap();
1892 assert_eq!(payload.ping_id, initial_ping_id());
1893 assert_eq!(payload.search_pk, real_pk);
1894 assert_eq!(payload.data_pk, data_pk);
1895 }
1896 }
1897
1898 #[tokio::test]
1899 async fn announce_loop() {
1900 let (dht_pk, dht_sk) = gen_keypair();
1901 let (real_pk, real_sk) = gen_keypair();
1902 let (udp_tx, udp_rx) = mpsc::channel(MAX_ONION_ANNOUNCE_NODES as usize);
1903 let (tcp_incoming_tx, _tcp_incoming_rx) = mpsc::unbounded();
1904 let dht = DhtServer::new(udp_tx, dht_pk, dht_sk.clone());
1905 dht.add_node(PackedNode::new("127.0.0.1:12345".parse().unwrap(), &gen_keypair().0)).await;
1907 let tcp_connections = TcpConnections::new(dht_pk, dht_sk, tcp_incoming_tx);
1908 let onion_client = OnionClient::new(dht, tcp_connections, real_sk.clone(), real_pk);
1909
1910 let mut state = onion_client.state.lock().await;
1911
1912 let mut key_by_addr = HashMap::new();
1914 let addr = "127.0.0.1".parse().unwrap();
1915 for i in 0 .. 3 {
1916 let saddr = SocketAddr::new(addr, 12346 + i);
1917 let (pk, sk) = gen_keypair();
1918 key_by_addr.insert(saddr, sk);
1919 let node = PackedNode::new(saddr, &pk);
1920 state.paths_pool.path_nodes.put(node);
1921 }
1922
1923 let ping_id = sha256::hash(&[1, 2, 3]);
1924 let now = Instant::now();
1925
1926 let mut nodes_key_by_addr = HashMap::new();
1927 for i in 0 .. MAX_ONION_ANNOUNCE_NODES {
1928 let saddr = SocketAddr::new(addr, 23456 + u16::from(i));
1929 let path = state.paths_pool.path_nodes.udp_path().unwrap();
1930 let (node_pk, node_sk) = gen_keypair();
1931 nodes_key_by_addr.insert(saddr, node_sk);
1932 let node = OnionNode {
1933 pk: node_pk,
1934 saddr,
1935 path_id: path.id(),
1936 ping_id: Some(ping_id),
1937 data_pk: None,
1938 unsuccessful_pings: 0,
1939 added_time: now,
1940 ping_time: now,
1941 response_time: now,
1942 announce_status: AnnounceStatus::Failed,
1943 };
1944 assert!(state.announce_list.try_add(&real_pk, node, true));
1945 }
1946
1947 tokio::time::pause();
1948 tokio::time::advance(ANNOUNCE_INTERVAL_NOT_ANNOUNCED + Duration::from_secs(1)).await;
1950
1951 onion_client.announce_loop(&mut state).await.unwrap();
1952
1953 let data_pk = onion_client.data_pk;
1954
1955 drop(state);
1957 drop(onion_client);
1958
1959 let packets = udp_rx.collect::<Vec<_>>().await;
1960
1961 assert_eq!(packets.len(), MAX_ONION_ANNOUNCE_NODES as usize);
1962
1963 for (packet, addr_to_send) in packets {
1964 let packet = unpack!(packet, Packet::OnionRequest0);
1965 let payload = unpack_onion_packet(packet, addr_to_send, &key_by_addr);
1966 let packet = unpack!(payload.inner, InnerOnionRequest::InnerOnionAnnounceRequest);
1967 let payload = packet.get_payload(&precompute(&real_pk, &nodes_key_by_addr[&payload.ip_port.to_saddr()])).unwrap();
1968 assert_eq!(payload.ping_id, ping_id);
1969 assert_eq!(payload.search_pk, real_pk);
1970 assert_eq!(payload.data_pk, data_pk);
1971 }
1972 }
1973
1974 #[tokio::test]
1975 async fn friends_loop_empty() {
1976 let (dht_pk, dht_sk) = gen_keypair();
1977 let (real_pk, real_sk) = gen_keypair();
1978 let (udp_tx, udp_rx) = mpsc::channel(MAX_ONION_ANNOUNCE_NODES as usize / 2);
1979 let (tcp_incoming_tx, _tcp_incoming_rx) = mpsc::unbounded();
1980 let dht = DhtServer::new(udp_tx, dht_pk, dht_sk.clone());
1981 dht.add_node(PackedNode::new("127.0.0.1:12345".parse().unwrap(), &gen_keypair().0)).await;
1983 let tcp_connections = TcpConnections::new(dht_pk, dht_sk, tcp_incoming_tx);
1984 let onion_client = OnionClient::new(dht, tcp_connections, real_sk.clone(), real_pk);
1985
1986 let mut state = onion_client.state.lock().await;
1987
1988 let (friend_pk, _friend_sk) = gen_keypair();
1989 let friend = OnionFriend::new(friend_pk);
1990 let friend_temporary_pk = friend.temporary_pk;
1991 state.friends.insert(friend_pk, friend);
1992
1993 let mut key_by_addr = HashMap::new();
1995 let addr = "127.0.0.1".parse().unwrap();
1996 for i in 0 .. 3 {
1997 let saddr = SocketAddr::new(addr, 12346 + i);
1998 let (pk, sk) = gen_keypair();
1999 key_by_addr.insert(saddr, sk);
2000 let node = PackedNode::new(saddr, &pk);
2001 state.paths_pool.path_nodes.put(node);
2002 }
2003
2004 onion_client.friends_loop(&mut state).await.unwrap();
2005
2006 drop(state);
2008 drop(onion_client);
2009
2010 let packets = udp_rx.collect::<Vec<_>>().await;
2011
2012 assert!(!packets.is_empty());
2013
2014 for (packet, addr_to_send) in packets {
2015 let packet = unpack!(packet, Packet::OnionRequest0);
2016 let payload = unpack_onion_packet(packet, addr_to_send, &key_by_addr);
2017 let packet = unpack!(payload.inner, InnerOnionRequest::InnerOnionAnnounceRequest);
2018 let payload = packet.get_payload(&precompute(&friend_temporary_pk, &key_by_addr[&payload.ip_port.to_saddr()])).unwrap();
2019 assert_eq!(payload.ping_id, initial_ping_id());
2020 assert_eq!(payload.search_pk, friend_pk);
2021 assert_eq!(payload.data_pk, PublicKey([0; 32]));
2022 }
2023 }
2024
2025 #[tokio::test]
2026 async fn friends_loop() {
2027 let (dht_pk, dht_sk) = gen_keypair();
2028 let (real_pk, real_sk) = gen_keypair();
2029 let (udp_tx, udp_rx) = mpsc::channel(MAX_ONION_FRIEND_NODES as usize);
2030 let (tcp_incoming_tx, _tcp_incoming_rx) = mpsc::unbounded();
2031 let dht = DhtServer::new(udp_tx, dht_pk, dht_sk.clone());
2032 dht.add_node(PackedNode::new("127.0.0.1:12345".parse().unwrap(), &gen_keypair().0)).await;
2034 let tcp_connections = TcpConnections::new(dht_pk, dht_sk, tcp_incoming_tx);
2035 let onion_client = OnionClient::new(dht, tcp_connections, real_sk.clone(), real_pk);
2036
2037 let mut state = onion_client.state.lock().await;
2038
2039 let (friend_pk, _friend_sk) = gen_keypair();
2040 let mut friend = OnionFriend::new(friend_pk);
2041 let friend_temporary_pk = friend.temporary_pk;
2042
2043 let mut key_by_addr = HashMap::new();
2045 let addr = "127.0.0.1".parse().unwrap();
2046 for i in 0 .. 3 {
2047 let saddr = SocketAddr::new(addr, 12346 + i);
2048 let (pk, sk) = gen_keypair();
2049 key_by_addr.insert(saddr, sk);
2050 let node = PackedNode::new(saddr, &pk);
2051 state.paths_pool.path_nodes.put(node);
2052 }
2053
2054 let now = Instant::now();
2055
2056 let mut nodes_key_by_addr = HashMap::new();
2057 for i in 0 .. MAX_ONION_FRIEND_NODES {
2058 let saddr = SocketAddr::new(addr, 23456 + u16::from(i));
2059 let path = state.paths_pool.path_nodes.udp_path().unwrap();
2060 let (node_pk, node_sk) = gen_keypair();
2061 nodes_key_by_addr.insert(saddr, node_sk);
2062 let node = OnionNode {
2063 pk: node_pk,
2064 saddr,
2065 path_id: path.id(),
2066 ping_id: Some(sha256::hash(&[1, 2, 3])),
2068 data_pk: None,
2069 unsuccessful_pings: 0,
2070 added_time: now,
2071 ping_time: now,
2072 response_time: now,
2073 announce_status: AnnounceStatus::Failed,
2074 };
2075 assert!(friend.close_nodes.try_add(&real_pk, node, true));
2076 }
2077
2078 state.friends.insert(friend_pk, friend);
2079
2080 tokio::time::pause();
2081 tokio::time::advance(ANNOUNCE_INTERVAL_NOT_ANNOUNCED + Duration::from_secs(1)).await;
2083
2084 onion_client.friends_loop(&mut state).await.unwrap();
2085
2086 drop(state);
2088 drop(onion_client);
2089
2090 let packets = udp_rx.collect::<Vec<_>>().await;
2091
2092 assert_eq!(packets.len(), MAX_ONION_FRIEND_NODES as usize);
2093
2094 for (packet, addr_to_send) in packets {
2095 let packet = unpack!(packet, Packet::OnionRequest0);
2096 let payload = unpack_onion_packet(packet, addr_to_send, &key_by_addr);
2097 let packet = unpack!(payload.inner, InnerOnionRequest::InnerOnionAnnounceRequest);
2098 let payload = packet.get_payload(&precompute(&friend_temporary_pk, &nodes_key_by_addr[&payload.ip_port.to_saddr()])).unwrap();
2099 assert_eq!(payload.ping_id, initial_ping_id());
2100 assert_eq!(payload.search_pk, friend_pk);
2101 assert_eq!(payload.data_pk, PublicKey([0; 32]));
2102 }
2103 }
2104
2105 #[tokio::test]
2106 async fn friends_loop_ignore_online() {
2107 let (dht_pk, dht_sk) = gen_keypair();
2108 let (real_pk, real_sk) = gen_keypair();
2109 let (udp_tx, udp_rx) = mpsc::channel(MAX_ONION_FRIEND_NODES as usize);
2110 let (tcp_incoming_tx, _tcp_incoming_rx) = mpsc::unbounded();
2111 let dht = DhtServer::new(udp_tx, dht_pk, dht_sk.clone());
2112 let tcp_connections = TcpConnections::new(dht_pk, dht_sk, tcp_incoming_tx);
2113 let onion_client = OnionClient::new(dht, tcp_connections, real_sk.clone(), real_pk);
2114
2115 let mut state = onion_client.state.lock().await;
2116
2117 let (friend_pk, _friend_sk) = gen_keypair();
2118 let mut friend = OnionFriend::new(friend_pk);
2119 friend.connected = true;
2120
2121 let addr = "127.0.0.1".parse().unwrap();
2122 for i in 0 .. 3 {
2123 let saddr = SocketAddr::new(addr, 12346 + i);
2124 let (pk, _sk) = gen_keypair();
2125 let node = PackedNode::new(saddr, &pk);
2126 state.paths_pool.path_nodes.put(node);
2127 }
2128
2129 let now = Instant::now();
2130
2131 for i in 0 .. MAX_ONION_FRIEND_NODES {
2132 let saddr = SocketAddr::new(addr, 23456 + u16::from(i));
2133 let path = state.paths_pool.path_nodes.udp_path().unwrap();
2134 let (node_pk, _node_sk) = gen_keypair();
2135 let node = OnionNode {
2136 pk: node_pk,
2137 saddr,
2138 path_id: path.id(),
2139 ping_id: None,
2140 data_pk: None,
2141 unsuccessful_pings: 0,
2142 added_time: now,
2143 ping_time: now,
2144 response_time: now,
2145 announce_status: AnnounceStatus::Failed,
2146 };
2147 assert!(friend.close_nodes.try_add(&real_pk, node, true));
2148 }
2149
2150 state.friends.insert(friend_pk, friend);
2151
2152 tokio::time::pause();
2153 tokio::time::advance(ANNOUNCE_INTERVAL_NOT_ANNOUNCED + Duration::from_secs(1)).await;
2155
2156 onion_client.friends_loop(&mut state).await.unwrap();
2157
2158 drop(state);
2160 drop(onion_client);
2161
2162 assert!(udp_rx.collect::<Vec<_>>().await.is_empty());
2163 }
2164
2165 #[tokio::test]
2166 async fn send_dht_pk_onion() {
2167 let (dht_pk, dht_sk) = gen_keypair();
2168 let (real_pk, real_sk) = gen_keypair();
2169 let (udp_tx, udp_rx) = mpsc::channel(MAX_ONION_FRIEND_NODES as usize);
2170 let (tcp_incoming_tx, _tcp_incoming_rx) = mpsc::unbounded();
2171 let dht = DhtServer::new(udp_tx, dht_pk, dht_sk.clone());
2172 let tcp_connections = TcpConnections::new(dht_pk, dht_sk, tcp_incoming_tx);
2173 let onion_client = OnionClient::new(dht, tcp_connections, real_sk.clone(), real_pk);
2174
2175 let mut state = onion_client.state.lock().await;
2176
2177 let (friend_pk, friend_sk) = gen_keypair();
2178 let mut friend = OnionFriend::new(friend_pk);
2179
2180 let mut key_by_addr = HashMap::new();
2182 let addr = "127.0.0.1".parse().unwrap();
2183 for i in 0 .. 3 {
2184 let saddr = SocketAddr::new(addr, 12346 + i);
2185 let (pk, sk) = gen_keypair();
2186 key_by_addr.insert(saddr, sk);
2187 let node = PackedNode::new(saddr, &pk);
2188 state.paths_pool.path_nodes.put(node);
2189 }
2190
2191 let now = Instant::now();
2192
2193 let (data_pk, data_sk) = gen_keypair();
2194 let mut nodes_key_by_addr = HashMap::new();
2195 for i in 0 .. MAX_ONION_FRIEND_NODES {
2196 let saddr = SocketAddr::new(addr, 23456 + u16::from(i));
2197 let path = state.paths_pool.path_nodes.udp_path().unwrap();
2198 let (node_pk, node_sk) = gen_keypair();
2199 nodes_key_by_addr.insert(saddr, node_sk);
2200 let node = OnionNode {
2201 pk: node_pk,
2202 saddr,
2203 path_id: path.id(),
2204 ping_id: None,
2205 data_pk: Some(data_pk),
2206 unsuccessful_pings: 0,
2207 added_time: now,
2208 ping_time: now,
2209 response_time: now,
2210 announce_status: AnnounceStatus::Failed,
2211 };
2212 assert!(friend.close_nodes.try_add(&real_pk, node, true));
2213 }
2214
2215 state.friends.insert(friend_pk, friend);
2216
2217 let mut dht_close_nodes = onion_client.dht.close_nodes.write().await;
2218 for i in 0 .. 4 {
2219 let saddr = SocketAddr::new(addr, 23456 + i);
2220 let (node_pk, _node_sk) = gen_keypair();
2221 let node = PackedNode::new(saddr, &node_pk);
2222 assert!(dht_close_nodes.try_add(node));
2223 }
2224 drop(dht_close_nodes);
2225
2226 onion_client.friends_loop(&mut state).await.unwrap();
2227
2228 drop(state);
2230 drop(onion_client);
2231
2232 let packets = udp_rx.collect::<Vec<_>>().await;
2233
2234 assert_eq!(packets.len(), MAX_ONION_FRIEND_NODES as usize);
2235
2236 for (packet, addr_to_send) in packets {
2237 let packet = unpack!(packet, Packet::OnionRequest0);
2238 let payload = unpack_onion_packet(packet, addr_to_send, &key_by_addr);
2239 let packet = unpack!(payload.inner, InnerOnionRequest::InnerOnionDataRequest);
2240 assert_eq!(packet.destination_pk, friend_pk);
2241 let payload = packet.get_payload(&precompute(&packet.temporary_pk, &data_sk)).unwrap();
2242 assert_eq!(payload.real_pk, real_pk);
2243 let payload = payload.get_payload(&packet.nonce, &precompute(&real_pk, &friend_sk)).unwrap();
2244 let payload = unpack!(payload, OnionDataResponseInnerPayload::DhtPkAnnounce);
2245 assert_eq!(payload.dht_pk, dht_pk);
2246 assert_eq!(payload.nodes.len(), 4);
2247 }
2248 }
2249
2250 #[tokio::test]
2251 async fn send_dht_pk_dht_request() {
2252 let (dht_pk, dht_sk) = gen_keypair();
2253 let (real_pk, real_sk) = gen_keypair();
2254 let (udp_tx, udp_rx) = mpsc::channel(MAX_ONION_FRIEND_NODES as usize);
2255 let (tcp_incoming_tx, _tcp_incoming_rx) = mpsc::unbounded();
2256 let dht = DhtServer::new(udp_tx, dht_pk, dht_sk.clone());
2257 let tcp_connections = TcpConnections::new(dht_pk, dht_sk, tcp_incoming_tx);
2258 let onion_client = OnionClient::new(dht, tcp_connections, real_sk.clone(), real_pk);
2259
2260 let mut state = onion_client.state.lock().await;
2261
2262 let (friend_pk, friend_sk) = gen_keypair();
2263 let (friend_dht_pk, friend_dht_sk) = gen_keypair();
2264 let mut friend = OnionFriend::new(friend_pk);
2265 friend.dht_pk = Some(friend_dht_pk);
2266 state.friends.insert(friend_pk, friend);
2267
2268 let addr = "127.0.0.1".parse().unwrap();
2269 let mut dht_close_nodes = onion_client.dht.close_nodes.write().await;
2270 for i in 0 .. 8 {
2271 let saddr = SocketAddr::new(addr, 23456 + i);
2272 let (node_pk, _node_sk) = gen_keypair();
2273 let node = PackedNode::new(saddr, &node_pk);
2274 assert!(dht_close_nodes.try_add(node));
2275 }
2276 drop(dht_close_nodes);
2277
2278 onion_client.friends_loop(&mut state).await.unwrap();
2279
2280 drop(state);
2282 drop(onion_client);
2283
2284 let packets = udp_rx.collect::<Vec<_>>().await;
2285
2286 assert_eq!(packets.len(), 8);
2287
2288 for (packet, _addr_to_send) in packets {
2289 let packet = unpack!(packet, Packet::DhtRequest);
2290 assert_eq!(packet.rpk, friend_dht_pk);
2291 assert_eq!(packet.spk, dht_pk);
2292 let payload = packet.get_payload(&precompute(&dht_pk, &friend_dht_sk)).unwrap();
2293 let packet = unpack!(payload, DhtRequestPayload::DhtPkAnnounce);
2294 assert_eq!(packet.real_pk, real_pk);
2295 let payload = packet.get_payload(&precompute(&real_pk, &friend_sk)).unwrap();
2296 assert_eq!(payload.dht_pk, dht_pk);
2297 }
2298 }
2299
2300 #[tokio::test]
2301 async fn populate_path_nodes() {
2302 let (dht_pk, dht_sk) = gen_keypair();
2303 let (real_pk, real_sk) = gen_keypair();
2304 let (udp_tx, udp_rx) = mpsc::channel(1);
2305 let (tcp_incoming_tx, _tcp_incoming_rx) = mpsc::unbounded();
2306 let dht = DhtServer::new(udp_tx, dht_pk, dht_sk.clone());
2307 let tcp_connections = TcpConnections::new(dht_pk, dht_sk, tcp_incoming_tx);
2308 let onion_client = OnionClient::new(dht, tcp_connections, real_sk, real_pk);
2309
2310 let (node_pk, node_sk) = gen_keypair();
2311 let node = PackedNode::new("127.0.0.1:12345".parse().unwrap(), &node_pk);
2312
2313 onion_client.dht.ping_node(&node).await.unwrap();
2315 let (received, _udp_rx) = udp_rx.into_future().await;
2316 let (packet, addr_to_send) = received.unwrap();
2317 assert_eq!(addr_to_send, node.saddr);
2318 let shared_secret = precompute(&dht_pk, &node_sk);
2319 let request_packet = unpack!(packet, Packet::NodesRequest);
2320 let request_payload = request_packet.get_payload(&shared_secret).unwrap();
2321 let response_payload = NodesResponsePayload {
2322 nodes: vec![],
2323 id: request_payload.id,
2324 };
2325 let response_packet = NodesResponse::new(&shared_secret, &node_pk, &response_payload);
2326 onion_client.dht.handle_packet(Packet::NodesResponse(response_packet), node.saddr).await.unwrap();
2327
2328 let mut state = onion_client.state.lock().await;
2329
2330 onion_client.populate_path_nodes(&mut state).await;
2331
2332 assert!(state.paths_pool.path_nodes.rand().is_some());
2333 }
2334
2335 #[tokio::test]
2336 async fn send_onion_request_udp() {
2337 let (dht_pk, dht_sk) = gen_keypair();
2338 let (real_pk, real_sk) = gen_keypair();
2339 let (udp_tx, udp_rx) = mpsc::channel(1);
2340 let (tcp_incoming_tx, _tcp_incoming_rx) = mpsc::unbounded();
2341 let dht = DhtServer::new(udp_tx, dht_pk, dht_sk.clone());
2342 let tcp_connections = TcpConnections::new(dht_pk, dht_sk, tcp_incoming_tx);
2343 let onion_client = OnionClient::new(dht, tcp_connections, real_sk, real_pk);
2344
2345 let inner_onion_request = InnerOnionRequest::InnerOnionAnnounceRequest(InnerOnionAnnounceRequest {
2346 nonce: gen_nonce(),
2347 pk: gen_keypair().0,
2348 payload: vec![42; 123],
2349 });
2350
2351 let path = OnionPath::new([
2352 PackedNode::new("127.0.0.1:12346".parse().unwrap(), &gen_keypair().0),
2353 PackedNode::new("127.0.0.1:12347".parse().unwrap(), &gen_keypair().0),
2354 PackedNode::new("127.0.0.1:12348".parse().unwrap(), &gen_keypair().0),
2355 ], OnionPathType::UDP);
2356 let saddr = "127.0.0.1:12345".parse().unwrap();
2357 onion_client.send_onion_request(path, inner_onion_request, saddr).await.unwrap();
2358
2359 let (_received, _udp_rx) = udp_rx.into_future().await;
2360 }
2361
2362 #[tokio::test]
2363 async fn send_onion_request_tcp() {
2364 let (dht_pk, dht_sk) = gen_keypair();
2365 let (real_pk, real_sk) = gen_keypair();
2366 let (udp_tx, udp_rx) = mpsc::channel(1);
2367 let (tcp_incoming_tx, _tcp_incoming_rx) = mpsc::unbounded();
2368 let dht = DhtServer::new(udp_tx, dht_pk, dht_sk.clone());
2369 let tcp_connections = TcpConnections::new(dht_pk, dht_sk, tcp_incoming_tx);
2370 let (_relay_incoming_rx, relay_outgoing_rx, relay_pk) = tcp_connections.add_client().await;
2371 let onion_client = OnionClient::new(dht, tcp_connections, real_sk, real_pk);
2372
2373 let inner_onion_request = InnerOnionRequest::InnerOnionAnnounceRequest(InnerOnionAnnounceRequest {
2374 nonce: gen_nonce(),
2375 pk: gen_keypair().0,
2376 payload: vec![42; 123],
2377 });
2378
2379 let path = OnionPath::new([
2380 PackedNode::new("127.0.0.1:12346".parse().unwrap(), &relay_pk),
2381 PackedNode::new("127.0.0.1:12347".parse().unwrap(), &gen_keypair().0),
2382 PackedNode::new("127.0.0.1:12348".parse().unwrap(), &gen_keypair().0),
2383 ], OnionPathType::TCP);
2384 let saddr = "127.0.0.1:12345".parse().unwrap();
2385 onion_client.send_onion_request(path, inner_onion_request, saddr).await.unwrap();
2386
2387 let (_received, _relay_outgoing_rx) = relay_outgoing_rx.into_future().await;
2388
2389 drop(onion_client);
2391
2392 assert!(udp_rx.collect::<Vec<_>>().await.is_empty());
2393 }
2394}