tox_core/onion/client/
mod.rs

1//! Onion client implementation.
2
3mod errors;
4mod nodes_pool;
5mod onion_path;
6mod paths_pool;
7
8use std::collections::HashMap;
9use std::net::SocketAddr;
10use std::sync::Arc;
11use std::time::{Duration, Instant};
12
13use failure::Fail;
14use futures::{StreamExt, SinkExt};
15use futures::channel::mpsc;
16use futures::lock::Mutex;
17
18use tox_crypto::*;
19use crate::dht::ip_port::IsGlobal;
20use tox_packet::dht::packed_node::PackedNode;
21use tox_packet::dht::*;
22use crate::dht::request_queue::RequestQueue;
23use crate::dht::server::Server as DhtServer;
24use crate::dht::kbucket::*;
25use tox_packet::ip_port::*;
26use crate::onion::client::errors::*;
27use crate::onion::client::onion_path::*;
28use crate::onion::client::paths_pool::*;
29use crate::onion::onion_announce::initial_ping_id;
30use tox_packet::onion::*;
31use tox_packet::packed_node::*;
32use crate::relay::client::Connections as TcpConnections;
33use crate::time::*;
34use crate::io_tokio::*;
35
/// Shorthand for the transmit half of the message channel used to report a
/// friend's DHT `PublicKey` once it becomes known. The first key of the tuple
/// is the friend's long term key, the second key is its DHT key.
type DhtPkTx = mpsc::UnboundedSender<(PublicKey, PublicKey)>;

/// Shorthand for the transmit half of the message channel used to report
/// received friend requests. The key of the tuple is the sender's long term
/// key.
type FriendRequestTx = mpsc::UnboundedSender<(PublicKey, FriendRequest)>;
44
/// Number of friend's close nodes to store.
const MAX_ONION_FRIEND_NODES: u8 = 8;

/// Number of nodes to announce ourselves to.
const MAX_ONION_ANNOUNCE_NODES: u8 = 12;

/// Timeout for onion announce packets.
const ANNOUNCE_TIMEOUT: Duration = Duration::from_secs(10);

/// How many attempts to reach a node we should make.
const ONION_NODE_MAX_PINGS: u32 = 3;

/// How often to ping a node (announce or friend searching).
const ONION_NODE_PING_INTERVAL: Duration = Duration::from_secs(15);

/// How often we should announce ourselves to a node we are not announced to.
const ANNOUNCE_INTERVAL_NOT_ANNOUNCED: Duration = Duration::from_secs(3);

/// How often we should announce ourselves to a node we are announced to.
const ANNOUNCE_INTERVAL_ANNOUNCED: Duration = ONION_NODE_PING_INTERVAL;

/// How often we should announce ourselves to a node we are announced to when
/// it's considered stable (8 ping intervals).
const ANNOUNCE_INTERVAL_STABLE: Duration = Duration::from_secs(ONION_NODE_PING_INTERVAL.as_secs() * 8);

/// How often we should search for a friend (6 ping intervals).
const ANNOUNCE_FRIEND: Duration = Duration::from_secs(ONION_NODE_PING_INTERVAL.as_secs() * 6);

/// How often we should search for a friend right after it was added to the
/// friends list.
const ANNOUNCE_FRIEND_BEGINNING: Duration = Duration::from_secs(3);

/// After this number of searches we switch from the
/// `ANNOUNCE_FRIEND_BEGINNING` interval to the `ANNOUNCE_FRIEND` interval.
const SEARCH_COUNT_FRIEND_ANNOUNCE_BEGINNING: u32 = 17;

/// The longer we haven't seen a friend, the less often we look for them. This
/// constant defines the proportion between the search interval and the time
/// since the friend was last seen.
const ONION_FRIEND_BACKOFF_FACTOR: u32 = 4;

/// Maximum interval for friends searching
/// (5 minutes per stored friend close node).
const ONION_FRIEND_MAX_PING_INTERVAL: Duration =
    Duration::from_secs(MAX_ONION_FRIEND_NODES as u64 * 60 * 5);

/// After this time since the last unsuccessful ping (and when ping attempts
/// are exhausted) a node is considered timed out.
const ONION_NODE_TIMEOUT: Duration = ONION_NODE_PING_INTERVAL;

/// After this interval since it was added a node is considered stable
/// (6 ping intervals).
pub(crate) const TIME_TO_STABLE: Duration = Duration::from_secs(ONION_NODE_PING_INTERVAL.as_secs() * 6);

/// The interval of time at which to tell our friends our DHT `PublicKey`
/// via onion.
const ONION_DHTPK_SEND_INTERVAL: Duration = Duration::from_secs(30);

/// The interval of time at which to tell our friends our DHT `PublicKey`
/// via DHT request.
const DHT_DHTPK_SEND_INTERVAL: Duration = Duration::from_secs(20);

/// Minimum interval between requests to the same node when that node was
/// received in an `OnionAnnounceResponse` packet.
const MIN_NODE_PING_TIME: Duration = Duration::from_secs(10);
108
/// Friend related data stored in the onion client.
#[derive(Clone, Debug)]
struct OnionFriend {
    /// Friend's long term `PublicKey`.
    real_pk: PublicKey,
    /// Friend's DHT `PublicKey` if it's known.
    dht_pk: Option<PublicKey>,
    /// Temporary `PublicKey` that should be used to encrypt search requests for
    /// this friend.
    temporary_pk: PublicKey,
    /// Temporary `SecretKey` that should be used to encrypt search requests for
    /// this friend.
    temporary_sk: SecretKey,
    /// List of nodes close to friend's long term `PublicKey`.
    close_nodes: Kbucket<OnionNode>,
    /// `no_reply` from the last accepted DHT `PublicKey` announce packet. An
    /// announce whose `no_reply` is not greater than this value is rejected,
    /// which prevents replay attacks.
    last_no_reply: u64,
    /// Time when our DHT `PublicKey` was sent to this friend via onion last
    /// time.
    last_dht_pk_onion_sent: Option<Instant>,
    /// Time when our DHT `PublicKey` was sent to this friend via DHT request
    /// last time.
    last_dht_pk_dht_sent: Option<Instant>,
    /// How many times we sent search requests to friend's close nodes.
    search_count: u32,
    /// Time when this friend was seen online last time.
    last_seen: Option<Instant>,
    /// Whether we are connected to this friend.
    connected: bool,
}
140
141impl OnionFriend {
142    /// Create new `OnionFriend`.
143    pub fn new(real_pk: PublicKey) -> Self {
144        let (temporary_pk, temporary_sk) = gen_keypair();
145        OnionFriend {
146            real_pk,
147            dht_pk: None,
148            temporary_pk,
149            temporary_sk,
150            close_nodes: Kbucket::new(MAX_ONION_FRIEND_NODES),
151            last_no_reply: 0,
152            last_dht_pk_onion_sent: None,
153            last_dht_pk_dht_sent: None,
154            search_count: 0,
155            last_seen: None,
156            connected: false,
157        }
158    }
159}
160
/// Type for onion close nodes. Used both for the nodes we announce ourselves
/// to and for the nodes close to a friend's long term `PublicKey`.
#[derive(Clone, Debug)]
struct OnionNode {
    /// Node's `PublicKey`.
    pk: PublicKey,
    /// Node's IP address.
    saddr: SocketAddr,
    /// Path used to send packets to this node.
    path_id: OnionPathId,
    /// Ping id that should be used to announce to this node.
    ping_id: Option<sha256::Digest>,
    /// Data `PublicKey` that should be used to send data packets to our friend
    /// through this node.
    data_pk: Option<PublicKey>,
    /// Number of announce requests sent to this node without any response.
    /// Resets to 0 after receiving a response.
    unsuccessful_pings: u32,
    /// Time when this node was added to close nodes list.
    added_time: Instant,
    /// Time when the last announce packet was sent to this node.
    ping_time: Instant,
    /// Time when we received the last response from this node.
    response_time: Instant,
    /// Announce status from last response from this node.
    announce_status: AnnounceStatus,
}
187
188impl HasPK for OnionNode {
189    fn pk(&self) -> PublicKey {
190        self.pk
191    }
192}
193
194impl KbucketNode for OnionNode {
195    type NewNode = OnionNode;
196    type CheckNode = PackedNode;
197
198    fn is_outdated(&self, other: &PackedNode) -> bool {
199        assert_eq!(self.pk, other.pk);
200        self.saddr != other.saddr
201    }
202    fn update(&mut self, other: &OnionNode) {
203        assert_eq!(self.pk, other.pk);
204        self.saddr = other.saddr;
205        self.path_id = other.path_id;
206        self.ping_id = other.ping_id.or(self.ping_id);
207        self.data_pk = other.data_pk.or(self.data_pk);
208        self.unsuccessful_pings = 0;
209        self.response_time = clock_now();
210        self.announce_status = other.announce_status;
211    }
212    fn is_evictable(&self) -> bool {
213        self.is_timed_out()
214    }
215}
216
217impl OnionNode {
218    /// Check if the next ping attempt is the last one.
219    pub fn is_last_ping_attempt(&self) -> bool {
220        self.unsuccessful_pings == ONION_NODE_MAX_PINGS - 1
221    }
222
223    /// Check if ping attempts to this node are exhausted.
224    pub fn is_ping_attempts_exhausted(&self) -> bool {
225        self.unsuccessful_pings >= ONION_NODE_MAX_PINGS
226    }
227
228    /// Check if this node is timed out.
229    pub fn is_timed_out(&self) -> bool {
230        self.is_ping_attempts_exhausted() && clock_elapsed(self.ping_time) >= ONION_NODE_TIMEOUT
231    }
232
233    /// Node is considered stable after `TIME_TO_STABLE` since it was
234    /// added to a close list if it responses to our requests.
235    pub fn is_stable(&self) -> bool {
236        clock_elapsed(self.added_time) >= TIME_TO_STABLE &&
237            (self.unsuccessful_pings == 0 || clock_elapsed(self.ping_time) < ONION_NODE_TIMEOUT)
238    }
239}
240
/// Data remembered for every sent announce or search request. It is stored
/// in the requests queue and retrieved when a response with the matching
/// request id arrives.
#[derive(Clone, Debug, Eq, PartialEq)]
struct AnnounceRequestData {
    /// `PublicKey` of the node to which we sent a packet.
    pk: PublicKey,
    /// IP address of the node to which we sent a packet.
    saddr: SocketAddr,
    /// Path used to send announce request packet.
    path_id: OnionPathId,
    /// Friend's long term `PublicKey` if the announce request was a search
    /// request, `None` if we were announcing ourselves.
    friend_pk: Option<PublicKey>,
}
253
/// Announce packet data that doesn't depend on destination node.
#[derive(Clone, Debug, Eq, PartialEq)]
struct AnnouncePacketData<'a> {
    /// `SecretKey` used to encrypt and decrypt announce packets.
    packet_sk: &'a SecretKey,
    /// `PublicKey` used to encrypt and decrypt announce packets.
    packet_pk: PublicKey,
    /// Key that should be used to search close nodes.
    search_pk: PublicKey,
    /// `PublicKey` that should be used to send data packets to us. When it is
    /// `None` an all-zero key is put into the request.
    data_pk: Option<PublicKey>,
}
266
267impl<'a> AnnouncePacketData<'a> {
268    /// Create `InnerOnionAnnounceRequest`. The request is a search request if
269    /// pind_id is 0 and an announce request otherwise.
270    fn request(&self, node_pk: &PublicKey, ping_id: Option<sha256::Digest>, request_id: u64) -> InnerOnionAnnounceRequest {
271        let payload = OnionAnnounceRequestPayload {
272            ping_id: ping_id.unwrap_or_else(initial_ping_id),
273            search_pk: self.search_pk,
274            data_pk: self.data_pk.unwrap_or(PublicKey([0; 32])),
275            sendback_data: request_id,
276        };
277        InnerOnionAnnounceRequest::new(
278            &precompute(node_pk, &self.packet_sk),
279            &self.packet_pk,
280            &payload
281        )
282    }
283    /// Create `InnerOnionAnnounceRequest` for a search request.
284    pub fn search_request(&self, node_pk: &PublicKey, request_id: u64) -> InnerOnionAnnounceRequest {
285        self.request(node_pk, None, request_id)
286    }
287    /// Create `InnerOnionAnnounceRequest` for an announce request.
288    pub fn announce_request(&self, node_pk: &PublicKey, ping_id: sha256::Digest, request_id: u64) -> InnerOnionAnnounceRequest {
289        self.request(node_pk, Some(ping_id), request_id)
290    }
291}
292
/// Mutable onion client state. `OnionClient` keeps it behind a mutex.
#[derive(Clone, Debug)]
struct OnionClientState {
    /// Pool of random onion paths.
    paths_pool: PathsPool,
    /// List of nodes we announce ourselves to.
    announce_list: Kbucket<OnionNode>,
    /// Struct that stores and manages requests IDs and timeouts.
    announce_requests: RequestQueue<AnnounceRequestData>,
    /// List of friends we are looking for, keyed by their long term
    /// `PublicKey`.
    friends: HashMap<PublicKey, OnionFriend>,
    /// Sink to send DHT `PublicKey` when it gets known. The first key is a long
    /// term key, the second key is a DHT key. `None` until a sink is set.
    dht_pk_tx: Option<DhtPkTx>,
    /// Sink to send friend requests when we get them. The key is a long term
    /// key. `None` until a sink is set.
    friend_request_tx: Option<FriendRequestTx>,
}
311
312impl OnionClientState {
313    pub fn new() -> Self {
314        OnionClientState {
315            paths_pool: PathsPool::new(),
316            announce_list: Kbucket::new(MAX_ONION_ANNOUNCE_NODES),
317            announce_requests: RequestQueue::new(ANNOUNCE_TIMEOUT),
318            friends: HashMap::new(),
319            dht_pk_tx: None,
320            friend_request_tx: None,
321        }
322    }
323}
324
/// Onion client that is responsible for announcing our DHT `PublicKey` to our
/// friends and looking for their DHT `PublicKey`s.
///
/// The mutable state is held in an `Arc<Mutex<_>>`, so clones of the client
/// operate on the same state.
#[derive(Clone)]
pub struct OnionClient {
    /// DHT server instance.
    dht: DhtServer,
    /// TCP connections instance.
    tcp_connections: TcpConnections,
    /// Our long term `SecretKey`.
    real_sk: SecretKey,
    /// Our long term `PublicKey`.
    real_pk: PublicKey,
    /// `SecretKey` for data packets that we can accept.
    data_sk: SecretKey,
    /// `PublicKey` that should be used to encrypt data packets that we can
    /// accept.
    data_pk: PublicKey,
    /// Onion client state.
    state: Arc<Mutex<OnionClientState>>,
}
345
346impl OnionClient {
347    /// Create new `OnionClient`.
348    pub fn new(
349        dht: DhtServer,
350        tcp_connections: TcpConnections,
351        real_sk: SecretKey,
352        real_pk: PublicKey
353    ) -> Self {
354        let (data_pk, data_sk) = gen_keypair();
355        OnionClient {
356            dht,
357            tcp_connections,
358            real_sk,
359            real_pk,
360            data_sk,
361            data_pk,
362            state: Arc::new(Mutex::new(OnionClientState::new())),
363        }
364    }
365
366    /// Set sink to send DHT `PublicKey` when it gets known.
367    pub async fn set_dht_pk_sink(&self, dht_pk_tx: DhtPkTx) {
368        self.state.lock().await.dht_pk_tx = Some(dht_pk_tx);
369    }
370
371    /// Set sink to receive `FriendRequest`s.
372    pub async fn set_friend_request_sink(&self, friend_request_sink: FriendRequestTx) {
373        self.state.lock().await.friend_request_tx = Some(friend_request_sink)
374    }
375
376    /// Check if a node was pinged recently.
377    fn is_pinged_recently(&self, pk: PublicKey, search_pk: PublicKey, request_queue: &RequestQueue<AnnounceRequestData>) -> bool {
378        let check_pks = |data: &AnnounceRequestData| -> bool {
379            let request_search_pk = if let Some(friend_pk) = data.friend_pk {
380                friend_pk
381            } else {
382                self.real_pk
383            };
384            data.pk == pk && search_pk == request_search_pk
385        };
386        request_queue.get_values()
387            .any(|(ping_time, request_data)| check_pks(request_data) &&
388                clock_elapsed(ping_time) < MIN_NODE_PING_TIME)
389    }
390
391    /// Send onion request via TCP or UDP depending on path.
392    async fn send_onion_request(&self, path: OnionPath, inner_onion_request: InnerOnionRequest, saddr: SocketAddr)
393        -> Result<(), mpsc::SendError> {
394        match path.path_type {
395            OnionPathType::TCP => {
396                let onion_request = path.create_tcp_onion_request(saddr, inner_onion_request);
397                // TODO: can we handle errors better? Right now we can try send a
398                // request to a non-existent or suspended node which returns an error
399                self.tcp_connections.send_onion(path.nodes[0].public_key, onion_request).await.ok();
400                Ok(())
401            },
402            OnionPathType::UDP => {
403                let onion_request =
404                    path.create_udp_onion_request(saddr, inner_onion_request);
405                let mut tx = self.dht.tx.clone();
406
407                tx.send((
408                    Packet::OnionRequest0(onion_request),
409                    path.nodes[0].saddr
410                )).await
411            },
412        }
413    }
414
415    /// Handle `OnionAnnounceResponse` packet.
416    pub async fn handle_announce_response(&self, packet: &OnionAnnounceResponse, is_global: bool) -> Result<(), HandleAnnounceResponseError> {
417        let state = &mut *self.state.lock().await;
418
419        let announce_data = if let Some(announce_data) = state.announce_requests.check_ping_id(packet.sendback_data, |_| true) {
420            announce_data
421        } else {
422            return Err(HandleAnnounceResponseErrorKind::InvalidRequestId.into());
423        };
424
425        // Assign variables depending on response type (was it announcing or searching request)
426        let (nodes_list, last_seen, announce_packet_data) = if let Some(ref friend_pk) = announce_data.friend_pk {
427            if let Some(friend) = state.friends.get_mut(friend_pk) {
428                let announce_packet_data = AnnouncePacketData {
429                    packet_sk: &friend.temporary_sk,
430                    packet_pk: friend.temporary_pk,
431                    search_pk: friend.real_pk,
432                    data_pk: None,
433                };
434                (&mut friend.close_nodes, Some(&mut friend.last_seen), announce_packet_data)
435            } else {
436                return Err(HandleAnnounceResponseErrorKind::NoFriendWithPk.into());
437            }
438        } else {
439            let announce_packet_data = AnnouncePacketData {
440                packet_sk: &self.real_sk,
441                packet_pk: self.real_pk,
442                search_pk: self.real_pk,
443                data_pk: Some(self.data_pk),
444            };
445            (&mut state.announce_list, None, announce_packet_data)
446        };
447
448        let payload = match packet.get_payload(&precompute(&announce_data.pk, &announce_packet_data.packet_sk)) {
449            Ok(payload) => payload,
450            Err(e) => return Err(e.context(HandleAnnounceResponseErrorKind::InvalidPayload).into())
451        };
452
453        trace!("OnionAnnounceResponse status: {:?}, data: {:?}", payload.announce_status, announce_data);
454
455        if announce_data.friend_pk.is_some() && payload.announce_status == AnnounceStatus::Announced ||
456            announce_data.friend_pk.is_none() && payload.announce_status == AnnounceStatus::Found {
457            return Err(HandleAnnounceResponseErrorKind::InvalidAnnounceStatus.into());
458        }
459
460        state.paths_pool.set_timeouts(announce_data.path_id, announce_data.friend_pk.is_some());
461
462        if payload.announce_status == AnnounceStatus::Found {
463            if let Some(last_seen) = last_seen {
464                *last_seen = Some(clock_now());
465            }
466        }
467
468        let (ping_id, data_pk) = if payload.announce_status == AnnounceStatus::Found {
469            (None, Some(digest_as_pk(payload.ping_id_or_pk)))
470        } else {
471            (Some(payload.ping_id_or_pk), None)
472        };
473
474        let now = clock_now();
475        nodes_list.try_add(&announce_packet_data.search_pk, OnionNode {
476            pk: announce_data.pk,
477            saddr: announce_data.saddr,
478            path_id: announce_data.path_id,
479            ping_id,
480            data_pk,
481            unsuccessful_pings: 0,
482            added_time: now,
483            ping_time: now,
484            response_time: now,
485            announce_status: payload.announce_status,
486        }, /* evict */ true);
487
488        state.paths_pool.path_nodes.put(PackedNode::new(announce_data.saddr, &announce_data.pk));
489
490        for node in &payload.nodes {
491            // skip LAN nodes if the packet wasn't received from LAN
492            if !IsGlobal::is_global(&node.ip()) && is_global {
493                continue;
494            }
495
496            // do not ping nodes that was pinged recently
497            if self.is_pinged_recently(node.pk, announce_packet_data.search_pk, &state.announce_requests) {
498                continue;
499            }
500
501            if !nodes_list.can_add(&announce_packet_data.search_pk, &node, /* evict */ true) {
502                continue;
503            }
504
505            let path = if let Some(path) = state.paths_pool.random_path(&self.dht, &self.tcp_connections, announce_data.friend_pk.is_some()).await {
506                path
507            } else {
508                continue
509            };
510
511            let request_id = state.announce_requests.new_ping_id(AnnounceRequestData {
512                pk: node.pk,
513                saddr: node.saddr,
514                path_id: path.id(),
515                friend_pk: announce_data.friend_pk,
516            });
517
518            let inner_announce_request = announce_packet_data.search_request(&node.pk, request_id);
519            self.send_onion_request(path, InnerOnionRequest::InnerOnionAnnounceRequest(inner_announce_request), node.saddr)
520                .await
521                .map_err(|e| e.context(HandleAnnounceResponseErrorKind::SendTo))?;
522        }
523
524        Ok(())
525    }
526
527    /// Handle DHT `PublicKey` announce from both onion and DHT.
528    pub async fn handle_dht_pk_announce(&self, friend_pk: PublicKey, dht_pk_announce: DhtPkAnnouncePayload) -> Result<(), HandleDhtPkAnnounceError> {
529        let mut state = self.state.lock().await;
530
531        let friend = match state.friends.get_mut(&friend_pk) {
532            Some(friend) => friend,
533            None => return Err(HandleDhtPkAnnounceErrorKind::NoFriendWithPk.into())
534        };
535
536        if dht_pk_announce.no_reply <= friend.last_no_reply {
537            return Err(HandleDhtPkAnnounceErrorKind::InvalidNoReply.into())
538        }
539
540        friend.last_no_reply = dht_pk_announce.no_reply;
541        friend.dht_pk = Some(dht_pk_announce.dht_pk);
542        friend.last_seen = Some(clock_now());
543
544        let tx = state.dht_pk_tx.clone();
545        let dht_pk = dht_pk_announce.dht_pk;
546        maybe_send_unbounded(tx, (friend_pk, dht_pk)).await
547            .map_err(|e| e.context(HandleDhtPkAnnounceErrorKind::SendTo))?;
548
549        for node in dht_pk_announce.nodes.into_iter() {
550            match node.ip_port.protocol {
551                ProtocolType::UDP => {
552                    let packed_node = PackedNode::new(node.ip_port.to_saddr(), &node.pk);
553                    self.dht.ping_node(&packed_node).await
554                        .map_err(|e| e.context(HandleDhtPkAnnounceErrorKind::PingNode))?;
555                },
556                ProtocolType::TCP => {
557                    self.tcp_connections.add_relay_connection(node.ip_port.to_saddr(), node.pk, dht_pk_announce.dht_pk).await
558                        .map_err(|e| e.context(HandleDhtPkAnnounceErrorKind::AddRelay))?;
559                }
560            }
561        }
562
563        Ok(())
564    }
565
566    /// Handle `OnionDataResponse` packet.
567    pub async fn handle_data_response(&self, packet: &OnionDataResponse) -> Result<(), HandleDataResponseError> {
568        let payload = match packet.get_payload(&precompute(&packet.temporary_pk, &self.data_sk)) {
569            Ok(payload) => payload,
570            Err(e) => return Err(e.context(HandleDataResponseErrorKind::InvalidPayload).into())
571        };
572        let iner_payload = match payload.get_payload(&packet.nonce, &precompute(&payload.real_pk, &self.real_sk)) {
573            Ok(payload) => payload,
574            Err(e) => return Err(e.context(HandleDataResponseErrorKind::InvalidInnerPayload).into())
575        };
576        match iner_payload {
577            OnionDataResponseInnerPayload::DhtPkAnnounce(dht_pk_announce) =>
578                self.handle_dht_pk_announce(payload.real_pk, dht_pk_announce).await
579                    .map_err(|e| e.context(HandleDataResponseErrorKind::DhtPkAnnounce).into()),
580            OnionDataResponseInnerPayload::FriendRequest(friend_request) => {
581                let tx = self.state.lock().await.friend_request_tx.clone();
582                maybe_send_unbounded(tx, (payload.real_pk, friend_request)).await
583                    .map_err(|e| e.context(HandleDataResponseErrorKind::FriendRequest).into())
584            }
585        }
586    }
587
588    /// Add new node to random nodes pool to use them to build random paths.
589    pub async fn add_path_node(&self, node: PackedNode) {
590        let mut state = self.state.lock().await;
591
592        state.paths_pool.path_nodes.put(node);
593    }
594
595    /// Add a friend to start looking for its DHT `PublicKey`.
596    pub async fn add_friend(&self, real_pk: PublicKey) {
597        let mut state = self.state.lock().await;
598
599        state.friends.insert(real_pk, OnionFriend::new(real_pk));
600    }
601
602    /// Remove a friend and stop looking for him.
603    pub async fn remove_friend(&self, real_pk: PublicKey) {
604        let mut state = self.state.lock().await;
605
606        state.friends.remove(&real_pk);
607    }
608
609    /// Set connection status of a friend. If he's connected we can stop looking
610    /// for his DHT `PublicKey`.
611    pub async fn set_friend_connected(&self, real_pk: PublicKey, connected: bool) {
612        let mut state = self.state.lock().await;
613
614        if let Some(friend) = state.friends.get_mut(&real_pk) {
615            if friend.connected && !connected {
616                friend.last_seen = Some(clock_now());
617                friend.search_count = 0;
618                // reset no_reply for the case when a friend will try to connect
619                // from a different device with different clock
620                friend.last_no_reply = 0;
621            }
622            friend.connected = connected;
623        }
624    }
625
626    /// Set friend's DHT `PublicKey` when it gets known somewhere else.
627    pub async fn set_friend_dht_pk(&self, real_pk: PublicKey, dht_pk: PublicKey) {
628        let mut state = self.state.lock().await;
629
630        if let Some(friend) = state.friends.get_mut(&real_pk) {
631            friend.dht_pk = Some(dht_pk);
632        }
633    }
634
635    /// Generic function for sending search and announce requests to close nodes.
636    async fn ping_close_nodes<'a>(
637        &self,
638        close_nodes: &mut Kbucket<OnionNode>,
639        paths_pool: &mut PathsPool,
640        announce_requests: &mut RequestQueue<AnnounceRequestData>,
641        announce_packet_data: AnnouncePacketData<'a>,
642        friend_pk: Option<PublicKey>,
643        interval: Option<Duration>
644    ) -> Result<bool, mpsc::SendError> {
645        let capacity = close_nodes.capacity();
646        let ping_random = close_nodes.iter().all(|node|
647            clock_elapsed(node.ping_time) >= ONION_NODE_PING_INTERVAL &&
648                // ensure we get a response from some node roughly once per interval / capacity
649                interval.map_or(true, |interval| clock_elapsed(node.response_time) >= interval / capacity as u32)
650        );
651        let mut packets_sent = false;
652        let mut good_nodes_count = 0;
653        for node in close_nodes.iter_mut() {
654            if !node.is_timed_out() {
655                good_nodes_count += 1;
656            }
657
658            if node.is_ping_attempts_exhausted() {
659                continue;
660            }
661
662            let interval = if let Some(interval) = interval {
663                interval
664            } else if node.announce_status == AnnounceStatus::Announced {
665                if let Some(stored_path) = paths_pool.get_stored_path(node.path_id, friend_pk.is_some()) {
666                    if node.is_stable() && stored_path.is_stable() {
667                        ANNOUNCE_INTERVAL_STABLE
668                    } else {
669                        ANNOUNCE_INTERVAL_ANNOUNCED
670                    }
671                } else {
672                    ANNOUNCE_INTERVAL_NOT_ANNOUNCED
673                }
674            } else {
675                ANNOUNCE_INTERVAL_NOT_ANNOUNCED
676            };
677
678            if clock_elapsed(node.ping_time) >= interval || ping_random && random_limit_usize(capacity) == 0 {
679                // Last chance for a long-lived node
680                let path = if node.is_last_ping_attempt() && node.is_stable() {
681                    paths_pool.random_path(&self.dht, &self.tcp_connections, friend_pk.is_some()).await
682                } else {
683                    paths_pool.get_or_random_path(&self.dht, &self.tcp_connections, node.path_id, friend_pk.is_some()).await
684                };
685
686                let path = if let Some(path) = path {
687                    path
688                } else {
689                    continue
690                };
691
692                node.unsuccessful_pings += 1;
693                node.ping_time = clock_now();
694
695                let request_id = announce_requests.new_ping_id(AnnounceRequestData {
696                    pk: node.pk,
697                    saddr: node.saddr,
698                    path_id: path.id(),
699                    friend_pk,
700                });
701
702                let inner_announce_request = match node.ping_id {
703                    Some(ping_id) if friend_pk.is_none() => announce_packet_data.announce_request(&node.pk, ping_id, request_id),
704                    _ => announce_packet_data.search_request(&node.pk, request_id)
705                };
706                self.send_onion_request(path, InnerOnionRequest::InnerOnionAnnounceRequest(inner_announce_request), node.saddr).await?;
707                packets_sent = true;
708            }
709        }
710
711        if good_nodes_count <= random_limit_usize(close_nodes.capacity()) {
712            for _ in 0 .. close_nodes.capacity() / 2 {
713                let node = if let Some(node) = paths_pool.path_nodes.rand() {
714                    node
715                } else {
716                    break
717                };
718
719                let path = if let Some(path) = paths_pool.random_path(&self.dht, &self.tcp_connections, friend_pk.is_some()).await {
720                    path
721                } else {
722                    break
723                };
724
725                let request_id = announce_requests.new_ping_id(AnnounceRequestData {
726                    pk: node.pk,
727                    saddr: node.saddr,
728                    path_id: path.id(),
729                    friend_pk,
730                });
731
732                let inner_announce_request = announce_packet_data.request(&node.pk, None, request_id);
733                self.send_onion_request(path, InnerOnionRequest::InnerOnionAnnounceRequest(inner_announce_request), node.saddr).await?;
734                packets_sent = true;
735            }
736        }
737
738        Ok(packets_sent)
739    }
740
741    /// Announce ourselves periodically.
742    async fn announce_loop(&self, state: &mut OnionClientState) -> Result<(), RunError> {
743        let announce_packet_data = AnnouncePacketData {
744            packet_sk: &self.real_sk,
745            packet_pk: self.real_pk,
746            search_pk: self.real_pk,
747            data_pk: Some(self.data_pk),
748        };
749
750        self.ping_close_nodes(
751            &mut state.announce_list,
752            &mut state.paths_pool,
753            &mut state.announce_requests,
754            announce_packet_data,
755            None,
756            None,
757        ).await.map(drop).map_err(|e| e.context(RunErrorKind::SendTo).into())
758    }
759
760    /// Get nodes to include to DHT `PublicKey` announcement packet.
761    async fn dht_pk_nodes(&self) -> Vec<TcpUdpPackedNode> {
762        let relays = self.tcp_connections.get_random_relays(2).await;
763        let close_nodes: Vec<PackedNode> = self.dht.get_closest(&self.dht.pk, 4 - relays.len() as u8, false).await.into();
764        relays.into_iter().map(|node| TcpUdpPackedNode {
765            pk: node.pk,
766            ip_port: IpPort::from_tcp_saddr(node.saddr),
767        }).chain(close_nodes.into_iter().map(|node| TcpUdpPackedNode {
768            pk: node.pk,
769            ip_port: IpPort::from_udp_saddr(node.saddr),
770        })).collect()
771    }
772
773    /// Announce our DHT `PublicKey` to a friend via onion.
774    async fn send_dht_pk_onion(&self, friend: &mut OnionFriend, paths_pool: &mut PathsPool) -> Result<(), mpsc::SendError> {
775        let dht_pk_announce = DhtPkAnnouncePayload::new(self.dht.pk, self.dht_pk_nodes().await);
776        let inner_payload = OnionDataResponseInnerPayload::DhtPkAnnounce(dht_pk_announce);
777        let nonce = gen_nonce();
778        let payload = OnionDataResponsePayload::new(&precompute(&friend.real_pk, &self.real_sk), self.real_pk, &nonce, &inner_payload);
779
780        let mut packets_sent = false;
781
782        for node in friend.close_nodes.iter() {
783            if node.is_timed_out() {
784                continue;
785            }
786
787            let data_pk = if let Some(data_pk) = node.data_pk {
788                data_pk
789            } else {
790                continue
791            };
792
793            let path = if let Some(path) = paths_pool.get_or_random_path(&self.dht, &self.tcp_connections, node.path_id, true).await {
794                path
795            } else {
796                continue
797            };
798
799            let (temporary_pk, temporary_sk) = gen_keypair();
800            let inner_data_request = InnerOnionDataRequest::new(&precompute(&data_pk, &temporary_sk), friend.real_pk, temporary_pk, nonce, &payload);
801
802            self.send_onion_request(path, InnerOnionRequest::InnerOnionDataRequest(inner_data_request), node.saddr).await?;
803            packets_sent = true;
804        }
805
806        if packets_sent {
807            friend.last_dht_pk_onion_sent = Some(clock_now());
808        }
809
810        Ok(())
811    }
812
813    /// Announce our DHT `PublicKey` to a friend via `DhtRequest`.
814    async fn send_dht_pk_dht_request(&self, friend: &mut OnionFriend) -> Result<(), mpsc::SendError> {
815        let friend_dht_pk = if let Some(friend_dht_pk) = friend.dht_pk {
816            friend_dht_pk
817        } else {
818            return Ok(());
819        };
820
821        let dht_pk_announce_payload = DhtPkAnnouncePayload::new(self.dht.pk, self.dht_pk_nodes().await);
822        let dht_pk_announce = DhtPkAnnounce::new(
823            &precompute(&friend.real_pk, &self.real_sk),
824            self.real_pk,
825            &dht_pk_announce_payload
826        );
827        let payload = DhtRequestPayload::DhtPkAnnounce(dht_pk_announce);
828        let packet = DhtRequest::new(
829            &precompute(&friend_dht_pk, &self.dht.sk),
830            &friend_dht_pk,
831            &self.dht.pk,
832            &payload
833        );
834        let packet = Packet::DhtRequest(packet);
835
836        let nodes = self.dht.get_closest(&friend_dht_pk, 8, false).await;
837
838        if !nodes.is_empty() {
839            friend.last_dht_pk_dht_sent = Some(clock_now());
840        }
841
842        let packets = nodes.iter()
843            .map(|node| (packet.clone(), node.saddr))
844            .collect::<Vec<_>>();
845
846        let mut tx = self.dht.tx.clone();
847        let mut stream = futures::stream::iter(packets).map(Ok);
848        tx.send_all(&mut stream).await
849    }
850
851    /// Search friends periodically.
852    async fn friends_loop(&self, state: &mut OnionClientState) -> Result<(), RunError> {
853        for friend in state.friends.values_mut() {
854            if friend.connected {
855                continue;
856            }
857
858            let announce_packet_data = AnnouncePacketData {
859                packet_sk: &friend.temporary_sk,
860                packet_pk: friend.temporary_pk,
861                search_pk: friend.real_pk,
862                data_pk: None,
863            };
864
865            let interval = if friend.search_count < SEARCH_COUNT_FRIEND_ANNOUNCE_BEGINNING {
866                ANNOUNCE_FRIEND_BEGINNING
867            } else {
868                let backoff_interval = friend.last_seen.map_or_else(
869                    || ONION_FRIEND_MAX_PING_INTERVAL,
870                    |last_seen| clock_elapsed(last_seen) / ONION_FRIEND_BACKOFF_FACTOR
871                );
872                backoff_interval
873                    .min(ONION_FRIEND_MAX_PING_INTERVAL)
874                    .max(ANNOUNCE_FRIEND)
875            };
876
877            let packets_sent = self.ping_close_nodes(
878                &mut friend.close_nodes,
879                &mut state.paths_pool,
880                &mut state.announce_requests,
881                announce_packet_data,
882                Some(friend.real_pk),
883                Some(interval),
884            ).await.map_err(|e| e.context(RunErrorKind::SendTo))?;
885
886            if packets_sent {
887                friend.search_count = friend.search_count.saturating_add(1);
888            }
889
890            if friend.last_dht_pk_onion_sent.map_or(true, |time| clock_elapsed(time) > ONION_DHTPK_SEND_INTERVAL) {
891                self.send_dht_pk_onion(friend, &mut state.paths_pool).await.map_err(|e| e.context(RunErrorKind::SendTo))?;
892            }
893
894            if friend.last_dht_pk_dht_sent.map_or(true, |time| clock_elapsed(time) > DHT_DHTPK_SEND_INTERVAL) {
895                self.send_dht_pk_dht_request(friend).await.map_err(|e| e.context(RunErrorKind::SendTo))?;
896            }
897        }
898
899        Ok(())
900    }
901
902    /// Populate nodes pool from DHT for building random paths.
903    async fn populate_path_nodes(&self, state: &mut OnionClientState) {
904        for node in self.dht.random_friend_nodes(MAX_ONION_ANNOUNCE_NODES).await {
905            state.paths_pool.path_nodes.put(node);
906        }
907    }
908
909    /// Run periodical announcements and friends searching.
910    pub async fn run(&self) -> Result<(), RunError> {
911        let interval = Duration::from_secs(1);
912        let mut wakeups = tokio::time::interval(interval);
913
914        while wakeups.next().await.is_some() {
915            trace!("Onion client sender wake up");
916
917            let mut state = self.state.lock().await;
918            self.populate_path_nodes(&mut state).await;
919
920            self.announce_loop(&mut state).await?;
921            self.friends_loop(&mut state).await?;
922        }
923
924        Ok(())
925    }
926}
927
928#[cfg(test)]
929mod tests {
930    use super::*;
931    use tox_binary_io::*;
932
933    impl OnionClient {
934        pub async fn has_friend(&self, pk: &PublicKey) -> bool {
935            self.state.lock().await.friends.contains_key(pk)
936        }
937
938        pub async fn friend_dht_pk(&self, pk: &PublicKey) -> Option<PublicKey> {
939            self.state.lock().await.friends.get(pk).and_then(|friend| friend.dht_pk)
940        }
941
942        pub async fn is_friend_connected(&self, pk: &PublicKey) -> bool {
943            self.state.lock().await.friends.get(pk).map_or(false, |friend| friend.connected)
944        }
945    }
946
947    fn unpack_onion_packet(packet: OnionRequest0, saddr: SocketAddr, key_by_addr: &HashMap<SocketAddr, SecretKey>) -> OnionRequest2Payload {
948        let payload = packet.get_payload(&precompute(&packet.temporary_pk, &key_by_addr[&saddr])).unwrap();
949        let packet = OnionRequest1 {
950            nonce: packet.nonce,
951            temporary_pk: payload.temporary_pk,
952            payload: payload.inner,
953            onion_return: OnionReturn {
954                nonce: secretbox::gen_nonce(),
955                payload: vec![42; 123],
956            }
957        };
958        let payload = packet.get_payload(&precompute(&packet.temporary_pk, &key_by_addr[&payload.ip_port.to_saddr()])).unwrap();
959        let packet = OnionRequest2 {
960            nonce: packet.nonce,
961            temporary_pk: payload.temporary_pk,
962            payload: payload.inner,
963            onion_return: OnionReturn {
964                nonce: secretbox::gen_nonce(),
965                payload: vec![42; 123],
966            }
967        };
968        packet.get_payload(&precompute(&packet.temporary_pk, &key_by_addr[&payload.ip_port.to_saddr()])).unwrap()
969    }
970
971    #[test]
972    fn onion_node_is_outdated() {
973        let now = Instant::now();
974        let (pk, _sk) = gen_keypair();
975        let saddr = "127.0.0.1:12345".parse().unwrap();
976        let onion_node = OnionNode {
977            pk,
978            saddr,
979            path_id: OnionPathId {
980                keys: [gen_keypair().0, gen_keypair().0, gen_keypair().0],
981                path_type: OnionPathType::UDP,
982            },
983            ping_id: None,
984            data_pk: None,
985            unsuccessful_pings: 0,
986            added_time: now,
987            ping_time: now,
988            response_time: now,
989            announce_status: AnnounceStatus::Announced,
990        };
991
992        assert!(!onion_node.is_outdated(&PackedNode::new(saddr, &pk)));
993        let other_saddr = "127.0.0.1:12346".parse().unwrap();
994        assert!(onion_node.is_outdated(&PackedNode::new(other_saddr, &pk)))
995    }
996
997    #[tokio::test]
998    async fn onion_node_update() {
999        tokio::time::pause();
1000        let now = clock_now();
1001        let (pk, _sk) = gen_keypair();
1002        let mut onion_node = OnionNode {
1003            pk,
1004            saddr: "127.0.0.1:12345".parse().unwrap(),
1005            path_id: OnionPathId {
1006                keys: [gen_keypair().0, gen_keypair().0, gen_keypair().0],
1007                path_type: OnionPathType::UDP,
1008            },
1009            ping_id: None,
1010            data_pk: None,
1011            unsuccessful_pings: 1,
1012            added_time: now,
1013            ping_time: now,
1014            response_time: now,
1015            announce_status: AnnounceStatus::Failed,
1016        };
1017
1018        let saddr = "127.0.0.1:12346".parse().unwrap();
1019        let path_id = OnionPathId {
1020            keys: [gen_keypair().0, gen_keypair().0, gen_keypair().0],
1021            path_type: OnionPathType::UDP,
1022        };
1023        let ping_id = sha256::hash(&[1, 2, 3]);
1024        let data_pk = gen_keypair().0;
1025        let new_now = now + Duration::from_secs(1);
1026        let other_onion_node = OnionNode {
1027            pk,
1028            saddr,
1029            path_id,
1030            ping_id: Some(ping_id),
1031            data_pk: Some(data_pk),
1032            unsuccessful_pings: 0,
1033            added_time: now,
1034            ping_time: now,
1035            response_time: now,
1036            announce_status: AnnounceStatus::Announced,
1037        };
1038
1039        tokio::time::advance(Duration::from_secs(1)).await;
1040
1041        onion_node.update(&other_onion_node);
1042
1043        assert_eq!(onion_node.saddr, saddr);
1044        assert_eq!(onion_node.path_id, path_id);
1045        assert_eq!(onion_node.ping_id, Some(ping_id));
1046        assert_eq!(onion_node.data_pk, Some(data_pk));
1047        assert_eq!(onion_node.unsuccessful_pings, 0);
1048        assert_eq!(onion_node.added_time, now);
1049        assert_eq!(onion_node.ping_time, now);
1050        assert_eq!(onion_node.response_time, new_now);
1051        assert_eq!(onion_node.announce_status, AnnounceStatus::Announced);
1052    }
1053
1054    #[tokio::test]
1055    async fn onion_node_is_evictable() {
1056        let now = Instant::now();
1057        let mut onion_node = OnionNode {
1058            pk: gen_keypair().0,
1059            saddr: "127.0.0.1:12345".parse().unwrap(),
1060            path_id: OnionPathId {
1061                keys: [gen_keypair().0, gen_keypair().0, gen_keypair().0],
1062                path_type: OnionPathType::UDP,
1063            },
1064            ping_id: None,
1065            data_pk: None,
1066            unsuccessful_pings: 0,
1067            added_time: now,
1068            ping_time: now,
1069            response_time: now,
1070            announce_status: AnnounceStatus::Announced,
1071        };
1072
1073        assert!(!onion_node.is_evictable());
1074
1075        onion_node.unsuccessful_pings = ONION_NODE_MAX_PINGS;
1076
1077        tokio::time::pause();
1078        // time when node is timed out
1079        tokio::time::advance(ONION_NODE_TIMEOUT + Duration::from_secs(1)).await;
1080
1081        assert!(onion_node.is_evictable());
1082    }
1083
1084    #[tokio::test]
1085    async fn add_path_node() {
1086        let (dht_pk, dht_sk) = gen_keypair();
1087        let (real_pk, real_sk) = gen_keypair();
1088        let (udp_tx, _udp_rx) = mpsc::channel(1);
1089        let (tcp_incoming_tx, _tcp_incoming_rx) = mpsc::unbounded();
1090        let dht = DhtServer::new(udp_tx, dht_pk, dht_sk.clone());
1091        let tcp_connections = TcpConnections::new(dht_pk, dht_sk, tcp_incoming_tx);
1092        let onion_client = OnionClient::new(dht, tcp_connections, real_sk, real_pk);
1093
1094        let node = PackedNode::new("127.0.0.1:12345".parse().unwrap(), &gen_keypair().0);
1095        onion_client.add_path_node(node).await;
1096
1097        let state = onion_client.state.lock().await;
1098        assert_eq!(state.paths_pool.path_nodes.rand(), Some(node));
1099    }
1100
1101    #[tokio::test]
1102    async fn add_remove_friend() {
1103        let (dht_pk, dht_sk) = gen_keypair();
1104        let (real_pk, real_sk) = gen_keypair();
1105        let (udp_tx, _udp_rx) = mpsc::channel(1);
1106        let (tcp_incoming_tx, _tcp_incoming_rx) = mpsc::unbounded();
1107        let dht = DhtServer::new(udp_tx, dht_pk, dht_sk.clone());
1108        let tcp_connections = TcpConnections::new(dht_pk, dht_sk, tcp_incoming_tx);
1109        let onion_client = OnionClient::new(dht, tcp_connections, real_sk, real_pk);
1110
1111        let (friend_pk, _friend_sk) = gen_keypair();
1112        onion_client.add_friend(friend_pk).await;
1113
1114        assert_eq!(onion_client.state.lock().await.friends[&friend_pk].real_pk, friend_pk);
1115
1116        onion_client.remove_friend(friend_pk).await;
1117
1118        assert!(!onion_client.state.lock().await.friends.contains_key(&friend_pk));
1119    }
1120
1121    #[tokio::test]
1122    async fn set_friend_connected() {
1123        let (dht_pk, dht_sk) = gen_keypair();
1124        let (real_pk, real_sk) = gen_keypair();
1125        let (udp_tx, _udp_rx) = mpsc::channel(1);
1126        let (tcp_incoming_tx, _tcp_incoming_rx) = mpsc::unbounded();
1127        let dht = DhtServer::new(udp_tx, dht_pk, dht_sk.clone());
1128        let tcp_connections = TcpConnections::new(dht_pk, dht_sk, tcp_incoming_tx);
1129        let onion_client = OnionClient::new(dht, tcp_connections, real_sk, real_pk);
1130
1131        let (friend_pk, _friend_sk) = gen_keypair();
1132        onion_client.add_friend(friend_pk).await;
1133
1134        onion_client.set_friend_connected(friend_pk, true).await;
1135
1136        let state = onion_client.state.lock().await;
1137        assert!(state.friends[&friend_pk].connected);
1138        drop(state);
1139
1140        tokio::time::pause();
1141        let now = clock_now();
1142
1143        onion_client.set_friend_connected(friend_pk, false).await;
1144
1145        let state = onion_client.state.lock().await;
1146        let friend = &state.friends[&friend_pk];
1147        assert!(!friend.connected);
1148        assert_eq!(friend.last_seen, Some(now));
1149    }
1150
1151    #[tokio::test]
1152    async fn set_friend_dht_pk() {
1153        let (dht_pk, dht_sk) = gen_keypair();
1154        let (real_pk, real_sk) = gen_keypair();
1155        let (udp_tx, _udp_rx) = mpsc::channel(1);
1156        let (tcp_incoming_tx, _tcp_incoming_rx) = mpsc::unbounded();
1157        let dht = DhtServer::new(udp_tx, dht_pk, dht_sk.clone());
1158        let tcp_connections = TcpConnections::new(dht_pk, dht_sk, tcp_incoming_tx);
1159        let onion_client = OnionClient::new(dht, tcp_connections, real_sk, real_pk);
1160
1161        let (friend_pk, _friend_sk) = gen_keypair();
1162        onion_client.add_friend(friend_pk).await;
1163
1164        let (friend_dht_pk, _friend_dht_sk) = gen_keypair();
1165        onion_client.set_friend_dht_pk(friend_pk, friend_dht_pk).await;
1166
1167        let state = onion_client.state.lock().await;
1168        assert_eq!(state.friends[&friend_pk].dht_pk, Some(friend_dht_pk));
1169    }
1170
1171    #[tokio::test]
1172    async fn handle_announce_response_announced() {
1173        let (dht_pk, dht_sk) = gen_keypair();
1174        let (real_pk, real_sk) = gen_keypair();
1175        let (udp_tx, udp_rx) = mpsc::channel(1);
1176        let (tcp_incoming_tx, _tcp_incoming_rx) = mpsc::unbounded();
1177        let dht = DhtServer::new(udp_tx, dht_pk, dht_sk.clone());
1178        // make DHT connected so that we will build UDP onion paths
1179        dht.add_node(PackedNode::new("127.0.0.1:12345".parse().unwrap(), &gen_keypair().0)).await;
1180        let tcp_connections = TcpConnections::new(dht_pk, dht_sk, tcp_incoming_tx);
1181        let onion_client = OnionClient::new(dht, tcp_connections, real_sk, real_pk);
1182
1183        let mut state = onion_client.state.lock().await;
1184
1185        // map needed to decrypt onion packets later
1186        let mut key_by_addr = HashMap::new();
1187        let addr = "127.0.0.1".parse().unwrap();
1188        for i in 0 .. 3 {
1189            let saddr = SocketAddr::new(addr, 12346 + i);
1190            let (pk, sk) = gen_keypair();
1191            key_by_addr.insert(saddr, sk);
1192            let node = PackedNode::new(saddr, &pk);
1193            state.paths_pool.path_nodes.put(node);
1194        }
1195        let path = state.paths_pool.path_nodes.udp_path().unwrap();
1196
1197        let (sender_pk, sender_sk) = gen_keypair();
1198        let saddr = "127.0.0.1:12345".parse().unwrap();
1199
1200        // the sender will be added to the nodes pool so add it to the map
1201        key_by_addr.insert(saddr, sender_sk.clone());
1202
1203        let request_data = AnnounceRequestData {
1204            pk: sender_pk,
1205            saddr,
1206            path_id: path.id(),
1207            friend_pk: None,
1208        };
1209        let request_id = state.announce_requests.new_ping_id(request_data);
1210
1211        drop(state);
1212
1213        let ping_id = sha256::hash(&[1, 2, 3]);
1214        let (node_pk, node_sk) = gen_keypair();
1215        let node = PackedNode::new(SocketAddr::V4("5.6.7.8:12345".parse().unwrap()), &node_pk);
1216        let payload = OnionAnnounceResponsePayload {
1217            announce_status: AnnounceStatus::Announced,
1218            ping_id_or_pk: ping_id,
1219            nodes: vec![node]
1220        };
1221        let packet = OnionAnnounceResponse::new(&precompute(&real_pk, &sender_sk), request_id, &payload);
1222
1223        onion_client.handle_announce_response(&packet, true).await.unwrap();
1224
1225        let state = onion_client.state.lock().await;
1226
1227        // The sender should be added to close nodes
1228        let onion_node = state.announce_list.get_node(&real_pk, &sender_pk).unwrap();
1229        assert_eq!(onion_node.path_id, path.id());
1230        assert_eq!(onion_node.ping_id, Some(ping_id));
1231        assert_eq!(onion_node.data_pk, None);
1232        assert_eq!(onion_node.announce_status, AnnounceStatus::Announced);
1233
1234        // Node from the packet should be pinged
1235        let (received, _udp_rx) = udp_rx.into_future().await;
1236        let (packet, addr_to_send) = received.unwrap();
1237
1238        let packet = unpack!(packet, Packet::OnionRequest0);
1239        let payload = unpack_onion_packet(packet, addr_to_send, &key_by_addr);
1240        let packet = unpack!(payload.inner, InnerOnionRequest::InnerOnionAnnounceRequest);
1241        let payload = packet.get_payload(&precompute(&real_pk, &node_sk)).unwrap();
1242        assert_eq!(payload.ping_id, initial_ping_id());
1243        assert_eq!(payload.search_pk, real_pk);
1244        assert_eq!(payload.data_pk, onion_client.data_pk);
1245    }
1246
1247    #[tokio::test]
1248    async fn handle_announce_response_announced_invalid_status() {
1249        let (dht_pk, dht_sk) = gen_keypair();
1250        let (real_pk, real_sk) = gen_keypair();
1251        let (udp_tx, _udp_rx) = mpsc::channel(1);
1252        let (tcp_incoming_tx, _tcp_incoming_rx) = mpsc::unbounded();
1253        let dht = DhtServer::new(udp_tx, dht_pk, dht_sk.clone());
1254        let tcp_connections = TcpConnections::new(dht_pk, dht_sk, tcp_incoming_tx);
1255        let onion_client = OnionClient::new(dht, tcp_connections, real_sk, real_pk);
1256
1257        let mut state = onion_client.state.lock().await;
1258
1259        let (friend_pk, _friend_sk) = gen_keypair();
1260        let friend = OnionFriend::new(friend_pk);
1261        let friend_temporary_pk = friend.temporary_pk;
1262        state.friends.insert(friend_pk, friend);
1263
1264        let addr = "127.0.0.1".parse().unwrap();
1265        for i in 0 .. 3 {
1266            let saddr = SocketAddr::new(addr, 12346 + i);
1267            let (pk, _sk) = gen_keypair();
1268            let node = PackedNode::new(saddr, &pk);
1269            state.paths_pool.path_nodes.put(node);
1270        }
1271        let path = state.paths_pool.path_nodes.udp_path().unwrap();
1272
1273        let (sender_pk, sender_sk) = gen_keypair();
1274        let saddr = "127.0.0.1:12345".parse().unwrap();
1275
1276        let request_data = AnnounceRequestData {
1277            pk: sender_pk,
1278            saddr,
1279            path_id: path.id(),
1280            friend_pk: Some(friend_pk),
1281        };
1282        let request_id = state.announce_requests.new_ping_id(request_data);
1283
1284        drop(state);
1285
1286        let ping_id = sha256::hash(&[1, 2, 3]);
1287        let (node_pk, _node_sk) = gen_keypair();
1288        let node = PackedNode::new(SocketAddr::V4("5.6.7.8:12345".parse().unwrap()), &node_pk);
1289        let payload = OnionAnnounceResponsePayload {
1290            announce_status: AnnounceStatus::Announced,
1291            ping_id_or_pk: ping_id,
1292            nodes: vec![node]
1293        };
1294        let packet = OnionAnnounceResponse::new(&precompute(&friend_temporary_pk, &sender_sk), request_id, &payload);
1295
1296        let error = onion_client.handle_announce_response(&packet, true).await.err().unwrap();
1297        assert_eq!(error.kind(), &HandleAnnounceResponseErrorKind::InvalidAnnounceStatus);
1298    }
1299
1300    #[tokio::test]
1301    async fn handle_announce_response_announced_pinged_recently() {
1302        let (dht_pk, dht_sk) = gen_keypair();
1303        let (real_pk, real_sk) = gen_keypair();
1304        let (udp_tx, udp_rx) = mpsc::channel(1);
1305        let (tcp_incoming_tx, _tcp_incoming_rx) = mpsc::unbounded();
1306        let dht = DhtServer::new(udp_tx, dht_pk, dht_sk.clone());
1307        let tcp_connections = TcpConnections::new(dht_pk, dht_sk, tcp_incoming_tx);
1308        let onion_client = OnionClient::new(dht, tcp_connections, real_sk.clone(), real_pk);
1309
1310        let mut state = onion_client.state.lock().await;
1311
1312        let addr = "127.0.0.1".parse().unwrap();
1313        for i in 0 .. 3 {
1314            let saddr = SocketAddr::new(addr, 12346 + i);
1315            let (pk, _sk) = gen_keypair();
1316            let node = PackedNode::new(saddr, &pk);
1317            state.paths_pool.path_nodes.put(node);
1318        }
1319        let path = state.paths_pool.path_nodes.udp_path().unwrap();
1320
1321        let (sender_pk, sender_sk) = gen_keypair();
1322        let saddr = "127.0.0.1:12345".parse().unwrap();
1323
1324        let request_data = AnnounceRequestData {
1325            pk: sender_pk,
1326            saddr,
1327            path_id: path.id(),
1328            friend_pk: None,
1329        };
1330        let request_id = state.announce_requests.new_ping_id(request_data);
1331
1332        // insert request to a node to announce_requests so that it won't be pinged again
1333        let (node_pk, _node_sk) = gen_keypair();
1334        let node = PackedNode::new(SocketAddr::V4("5.6.7.8:12345".parse().unwrap()), &node_pk);
1335        let node_request_data = AnnounceRequestData {
1336            pk: node_pk,
1337            saddr: node.saddr,
1338            path_id: path.id(),
1339            friend_pk: None,
1340        };
1341        let _node_request_id = state.announce_requests.new_ping_id(node_request_data);
1342
1343        drop(state);
1344
1345        let payload = OnionAnnounceResponsePayload {
1346            announce_status: AnnounceStatus::Announced,
1347            ping_id_or_pk: sha256::hash(&[1, 2, 3]),
1348            nodes: vec![node]
1349        };
1350        let packet = OnionAnnounceResponse::new(&precompute(&real_pk, &sender_sk), request_id, &payload);
1351
1352        onion_client.handle_announce_response(&packet, true).await.unwrap();
1353
1354        // Necessary to drop tx so that rx.collect() can be finished
1355        drop(onion_client);
1356
1357        assert!(udp_rx.collect::<Vec<_>>().await.is_empty());
1358    }
1359
1360    #[tokio::test]
1361    async fn handle_announce_response_found() {
1362        let (dht_pk, dht_sk) = gen_keypair();
1363        let (real_pk, real_sk) = gen_keypair();
1364        let (udp_tx, udp_rx) = mpsc::channel(1);
1365        let (tcp_incoming_tx, _tcp_incoming_rx) = mpsc::unbounded();
1366        let dht = DhtServer::new(udp_tx, dht_pk, dht_sk.clone());
1367        // make DHT connected so that we will build UDP onion paths
1368        dht.add_node(PackedNode::new("127.0.0.1:12345".parse().unwrap(), &gen_keypair().0)).await;
1369        let tcp_connections = TcpConnections::new(dht_pk, dht_sk, tcp_incoming_tx);
1370        let onion_client = OnionClient::new(dht, tcp_connections, real_sk.clone(), real_pk);
1371
1372        let mut state = onion_client.state.lock().await;
1373
1374        let (friend_pk, _friend_sk) = gen_keypair();
1375        let friend = OnionFriend::new(friend_pk);
1376        let friend_temporary_pk = friend.temporary_pk;
1377        state.friends.insert(friend_pk, friend);
1378
1379        // map needed to decrypt onion packets later
1380        let mut key_by_addr = HashMap::new();
1381        let addr = "127.0.0.1".parse().unwrap();
1382        for i in 0 .. 3 {
1383            let saddr = SocketAddr::new(addr, 12346 + i);
1384            let (pk, sk) = gen_keypair();
1385            key_by_addr.insert(saddr, sk);
1386            let node = PackedNode::new(saddr, &pk);
1387            state.paths_pool.path_nodes.put(node);
1388        }
1389        let path = state.paths_pool.path_nodes.udp_path().unwrap();
1390
1391        let (sender_pk, sender_sk) = gen_keypair();
1392        let saddr = "127.0.0.1:12345".parse().unwrap();
1393
1394        // the sender will be added to the nodes pool so add it to the map
1395        key_by_addr.insert(saddr, sender_sk.clone());
1396
1397        let request_data = AnnounceRequestData {
1398            pk: sender_pk,
1399            saddr,
1400            path_id: path.id(),
1401            friend_pk: Some(friend_pk),
1402        };
1403        let request_id = state.announce_requests.new_ping_id(request_data);
1404
1405        drop(state);
1406
1407        let (friend_data_pk, _friend_data_sk) = gen_keypair();
1408        let (node_pk, node_sk) = gen_keypair();
1409        let node = PackedNode::new(SocketAddr::V4("5.6.7.8:12345".parse().unwrap()), &node_pk);
1410        let payload = OnionAnnounceResponsePayload {
1411            announce_status: AnnounceStatus::Found,
1412            ping_id_or_pk: pk_as_digest(friend_data_pk),
1413            nodes: vec![node]
1414        };
1415        let packet = OnionAnnounceResponse::new(&precompute(&friend_temporary_pk, &sender_sk), request_id, &payload);
1416
1417        onion_client.handle_announce_response(&packet, true).await.unwrap();
1418
1419        let state = onion_client.state.lock().await;
1420
1421        // The sender should be added to close nodes
1422        let onion_node = state.friends[&friend_pk].close_nodes.get_node(&real_pk, &sender_pk).unwrap();
1423        assert_eq!(onion_node.path_id, path.id());
1424        assert_eq!(onion_node.ping_id, None);
1425        assert_eq!(onion_node.data_pk, Some(friend_data_pk));
1426        assert_eq!(onion_node.announce_status, AnnounceStatus::Found);
1427
1428        // Node from the packet should be pinged
1429        let (received, _udp_rx) = udp_rx.into_future().await;
1430        let (packet, addr_to_send) = received.unwrap();
1431
1432        let packet = unpack!(packet, Packet::OnionRequest0);
1433        let payload = unpack_onion_packet(packet, addr_to_send, &key_by_addr);
1434        let packet = unpack!(payload.inner, InnerOnionRequest::InnerOnionAnnounceRequest);
1435        let payload = packet.get_payload(&precompute(&friend_temporary_pk, &node_sk)).unwrap();
1436        assert_eq!(payload.ping_id, initial_ping_id());
1437        assert_eq!(payload.search_pk, friend_pk);
1438        assert_eq!(payload.data_pk, PublicKey([0; 32]));
1439    }
1440
1441    #[tokio::test]
1442    async fn handle_announce_response_found_invalid_status() {
1443        let (dht_pk, dht_sk) = gen_keypair();
1444        let (real_pk, real_sk) = gen_keypair();
1445        let (udp_tx, _udp_rx) = mpsc::channel(1);
1446        let (tcp_incoming_tx, _tcp_incoming_rx) = mpsc::unbounded();
1447        let dht = DhtServer::new(udp_tx, dht_pk, dht_sk.clone());
1448        let tcp_connections = TcpConnections::new(dht_pk, dht_sk, tcp_incoming_tx);
1449        let onion_client = OnionClient::new(dht, tcp_connections, real_sk.clone(), real_pk);
1450
1451        let mut state = onion_client.state.lock().await;
1452
1453        let addr = "127.0.0.1".parse().unwrap();
1454        for i in 0 .. 3 {
1455            let saddr = SocketAddr::new(addr, 12346 + i);
1456            let (pk, _sk) = gen_keypair();
1457            let node = PackedNode::new(saddr, &pk);
1458            state.paths_pool.path_nodes.put(node);
1459        }
1460        let path = state.paths_pool.path_nodes.udp_path().unwrap();
1461
1462        let (sender_pk, sender_sk) = gen_keypair();
1463        let saddr = "127.0.0.1:12345".parse().unwrap();
1464
1465        let request_data = AnnounceRequestData {
1466            pk: sender_pk,
1467            saddr,
1468            path_id: path.id(),
1469            friend_pk: None,
1470        };
1471        let request_id = state.announce_requests.new_ping_id(request_data);
1472
1473        drop(state);
1474
1475        let (friend_data_pk, _friend_data_sk) = gen_keypair();
1476        let (node_pk, _node_sk) = gen_keypair();
1477        let node = PackedNode::new(SocketAddr::V4("5.6.7.8:12345".parse().unwrap()), &node_pk);
1478        let payload = OnionAnnounceResponsePayload {
1479            announce_status: AnnounceStatus::Found,
1480            ping_id_or_pk: pk_as_digest(friend_data_pk),
1481            nodes: vec![node]
1482        };
1483        let packet = OnionAnnounceResponse::new(&precompute(&real_pk, &sender_sk), request_id, &payload);
1484
1485        let error = onion_client.handle_announce_response(&packet, true).await.err().unwrap();
1486        assert_eq!(error.kind(), &HandleAnnounceResponseErrorKind::InvalidAnnounceStatus);
1487    }
1488
1489    #[tokio::test]
1490    async fn handle_announce_response_no_friend_with_pk() {
1491        let (dht_pk, dht_sk) = gen_keypair();
1492        let (real_pk, real_sk) = gen_keypair();
1493        let (udp_tx, _udp_rx) = mpsc::channel(1);
1494        let (tcp_incoming_tx, _tcp_incoming_rx) = mpsc::unbounded();
1495        let dht = DhtServer::new(udp_tx, dht_pk, dht_sk.clone());
1496        let tcp_connections = TcpConnections::new(dht_pk, dht_sk, tcp_incoming_tx);
1497        let onion_client = OnionClient::new(dht, tcp_connections, real_sk.clone(), real_pk);
1498
1499        let mut state = onion_client.state.lock().await;
1500
1501        let (friend_pk, _friend_sk) = gen_keypair();
1502        let (friend_temporary_pk, _friend_temporary_sk) = gen_keypair();
1503        let (sender_pk, sender_sk) = gen_keypair();
1504        let saddr = "127.0.0.1:12345".parse().unwrap();
1505
1506        let request_data = AnnounceRequestData {
1507            pk: sender_pk,
1508            saddr,
1509            path_id: OnionPathId {
1510                keys: [gen_keypair().0, gen_keypair().0, gen_keypair().0],
1511                path_type: OnionPathType::UDP,
1512            },
1513            friend_pk: Some(friend_pk),
1514        };
1515        let request_id = state.announce_requests.new_ping_id(request_data);
1516
1517        drop(state);
1518
1519        let (friend_data_pk, _friend_data_sk) = gen_keypair();
1520        let payload = OnionAnnounceResponsePayload {
1521            announce_status: AnnounceStatus::Found,
1522            ping_id_or_pk: pk_as_digest(friend_data_pk),
1523            nodes: vec![]
1524        };
1525        let packet = OnionAnnounceResponse::new(&precompute(&friend_temporary_pk, &sender_sk), request_id, &payload);
1526
1527        let error = onion_client.handle_announce_response(&packet, true).await.err().unwrap();
1528        assert_eq!(error.kind(), &HandleAnnounceResponseErrorKind::NoFriendWithPk);
1529    }
1530
1531    #[tokio::test]
1532    async fn handle_announce_response_invalid_payload() {
1533        let (dht_pk, dht_sk) = gen_keypair();
1534        let (real_pk, real_sk) = gen_keypair();
1535        let (udp_tx, _udp_rx) = mpsc::channel(1);
1536        let (tcp_incoming_tx, _tcp_incoming_rx) = mpsc::unbounded();
1537        let dht = DhtServer::new(udp_tx, dht_pk, dht_sk.clone());
1538        let tcp_connections = TcpConnections::new(dht_pk, dht_sk, tcp_incoming_tx);
1539        let onion_client = OnionClient::new(dht, tcp_connections, real_sk.clone(), real_pk);
1540
1541        let mut state = onion_client.state.lock().await;
1542
1543        let (friend_pk, _friend_sk) = gen_keypair();
1544        let friend = OnionFriend::new(friend_pk);
1545        state.friends.insert(friend_pk, friend);
1546
1547        let (sender_pk, _sender_sk) = gen_keypair();
1548        let saddr = "127.0.0.1:12345".parse().unwrap();
1549
1550        let request_data = AnnounceRequestData {
1551            pk: sender_pk,
1552            saddr,
1553            path_id: OnionPathId {
1554                keys: [gen_keypair().0, gen_keypair().0, gen_keypair().0],
1555                path_type: OnionPathType::UDP,
1556            },
1557            friend_pk: Some(friend_pk),
1558        };
1559        let request_id = state.announce_requests.new_ping_id(request_data);
1560
1561        drop(state);
1562
1563        let packet = OnionAnnounceResponse {
1564            sendback_data: request_id,
1565            nonce: gen_nonce(),
1566            payload: vec![42; 123],
1567        };
1568
1569        let error = onion_client.handle_announce_response(&packet, true).await.err().unwrap();
1570        assert_eq!(error.kind(), &HandleAnnounceResponseErrorKind::InvalidPayload);
1571    }
1572
1573    #[tokio::test]
1574    async fn handle_announce_response_found_pinged_recently() {
1575        let (dht_pk, dht_sk) = gen_keypair();
1576        let (real_pk, real_sk) = gen_keypair();
1577        let (udp_tx, udp_rx) = mpsc::channel(1);
1578        let (tcp_incoming_tx, _tcp_incoming_rx) = mpsc::unbounded();
1579        let dht = DhtServer::new(udp_tx, dht_pk, dht_sk.clone());
1580        let tcp_connections = TcpConnections::new(dht_pk, dht_sk, tcp_incoming_tx);
1581        let onion_client = OnionClient::new(dht, tcp_connections, real_sk.clone(), real_pk);
1582
1583        let mut state = onion_client.state.lock().await;
1584
1585        let (friend_pk, _friend_sk) = gen_keypair();
1586        let friend = OnionFriend::new(friend_pk);
1587        let friend_temporary_pk = friend.temporary_pk;
1588        state.friends.insert(friend_pk, friend);
1589
1590        let addr = "127.0.0.1".parse().unwrap();
1591        for i in 0 .. 3 {
1592            let saddr = SocketAddr::new(addr, 12346 + i);
1593            let (pk, _sk) = gen_keypair();
1594            let node = PackedNode::new(saddr, &pk);
1595            state.paths_pool.path_nodes.put(node);
1596        }
1597        let path = state.paths_pool.path_nodes.udp_path().unwrap();
1598
1599        let (sender_pk, sender_sk) = gen_keypair();
1600        let saddr = "127.0.0.1:12345".parse().unwrap();
1601
1602        let request_data = AnnounceRequestData {
1603            pk: sender_pk,
1604            saddr,
1605            path_id: path.id(),
1606            friend_pk: Some(friend_pk),
1607        };
1608        let request_id = state.announce_requests.new_ping_id(request_data);
1609
1610        // insert request to a node to announce_requests so that it won't be pinged again
1611        let (node_pk, _node_sk) = gen_keypair();
1612        let node = PackedNode::new(SocketAddr::V4("5.6.7.8:12345".parse().unwrap()), &node_pk);
1613        let node_request_data = AnnounceRequestData {
1614            pk: node_pk,
1615            saddr: node.saddr,
1616            path_id: path.id(),
1617            friend_pk: Some(friend_pk),
1618        };
1619        let _node_request_id = state.announce_requests.new_ping_id(node_request_data);
1620
1621        drop(state);
1622
1623        let (friend_data_pk, _friend_data_sk) = gen_keypair();
1624        let node = PackedNode::new(SocketAddr::V4("5.6.7.8:12345".parse().unwrap()), &node_pk);
1625        let payload = OnionAnnounceResponsePayload {
1626            announce_status: AnnounceStatus::Found,
1627            ping_id_or_pk: pk_as_digest(friend_data_pk),
1628            nodes: vec![node]
1629        };
1630        let packet = OnionAnnounceResponse::new(&precompute(&friend_temporary_pk, &sender_sk), request_id, &payload);
1631
1632        onion_client.handle_announce_response(&packet, true).await.unwrap();
1633
1634        // Necessary to drop tx so that rx.collect() can be finished
1635        drop(onion_client);
1636
1637        assert!(udp_rx.collect::<Vec<_>>().await.is_empty());
1638    }
1639
1640    #[tokio::test]
1641    async fn handle_data_response_dht_pk_announce_udp_node() {
1642        let (dht_pk, dht_sk) = gen_keypair();
1643        let (real_pk, real_sk) = gen_keypair();
1644        let (udp_tx, udp_rx) = mpsc::channel(1);
1645        let (tcp_incoming_tx, _tcp_incoming_rx) = mpsc::unbounded();
1646        let (dht_pk_tx, dht_pk_rx) = mpsc::unbounded();
1647        let dht = DhtServer::new(udp_tx, dht_pk, dht_sk.clone());
1648        let tcp_connections = TcpConnections::new(dht_pk, dht_sk, tcp_incoming_tx);
1649        let onion_client = OnionClient::new(dht, tcp_connections, real_sk.clone(), real_pk);
1650
1651        onion_client.set_dht_pk_sink(dht_pk_tx).await;
1652
1653        let (friend_dht_pk, _friend_dht_sk) = gen_keypair();
1654        let (friend_real_pk, friend_real_sk) = gen_keypair();
1655
1656        onion_client.add_friend(friend_real_pk).await;
1657
1658        let saddr: SocketAddr = "127.0.0.1:12345".parse().unwrap();
1659        let (node_pk, node_sk) = gen_keypair();
1660        let dht_pk_announce_payload = DhtPkAnnouncePayload::new(friend_dht_pk, vec![
1661            TcpUdpPackedNode {
1662                ip_port: IpPort {
1663                    protocol: ProtocolType::UDP,
1664                    ip_addr: saddr.ip(),
1665                    port: saddr.port(),
1666                },
1667                pk: node_pk,
1668            },
1669        ]);
1670        let no_reply = dht_pk_announce_payload.no_reply;
1671        let onion_data_response_inner_payload = OnionDataResponseInnerPayload::DhtPkAnnounce(dht_pk_announce_payload);
1672        let nonce = gen_nonce();
1673        let onion_data_response_payload = OnionDataResponsePayload::new(&precompute(&real_pk, &friend_real_sk), friend_real_pk, &nonce, &onion_data_response_inner_payload);
1674        let (temporary_pk, temporary_sk) = gen_keypair();
1675        let onion_data_response = OnionDataResponse::new(&precompute(&onion_client.data_pk, &temporary_sk), temporary_pk, nonce, &onion_data_response_payload);
1676
1677        onion_client.handle_data_response(&onion_data_response).await.unwrap();
1678
1679        let state = onion_client.state.lock().await;
1680
1681        // friend should have updated data
1682        let friend = &state.friends[&friend_real_pk];
1683        assert_eq!(friend.last_no_reply, no_reply);
1684        assert_eq!(friend.dht_pk, Some(friend_dht_pk));
1685
1686        // friend's DHT key should be sent to dht_pk_tx
1687        let (received, _dht_pk_rx) = dht_pk_rx.into_future().await;
1688        let (received_real_pk, received_dht_pk) = received.unwrap();
1689        assert_eq!(received_real_pk, friend_real_pk);
1690        assert_eq!(received_dht_pk, friend_dht_pk);
1691
1692        // the node from announce packet should be pinged
1693        let (received, _udp_rx) = udp_rx.into_future().await;
1694        let (packet, addr_to_send) = received.unwrap();
1695
1696        assert_eq!(addr_to_send, saddr);
1697        let packet = unpack!(packet, Packet::NodesRequest);
1698        let payload = packet.get_payload(&precompute(&dht_pk, &node_sk)).unwrap();
1699
1700        assert_eq!(payload.pk, dht_pk);
1701    }
1702
1703    #[tokio::test]
1704    async fn handle_data_response_dht_pk_announce_tcp_node() {
1705        let (dht_pk, dht_sk) = gen_keypair();
1706        let (real_pk, real_sk) = gen_keypair();
1707        let (udp_tx, _udp_rx) = mpsc::channel(1);
1708        let (tcp_incoming_tx, _tcp_incoming_rx) = mpsc::unbounded();
1709        let (dht_pk_tx, _dht_pk_rx) = mpsc::unbounded();
1710        let dht = DhtServer::new(udp_tx, dht_pk, dht_sk.clone());
1711        let tcp_connections = TcpConnections::new(dht_pk, dht_sk, tcp_incoming_tx);
1712        let onion_client = OnionClient::new(dht, tcp_connections, real_sk, real_pk);
1713
1714        onion_client.set_dht_pk_sink(dht_pk_tx).await;
1715
1716        let (friend_dht_pk, _friend_dht_sk) = gen_keypair();
1717        let (friend_real_pk, friend_real_sk) = gen_keypair();
1718
1719        onion_client.add_friend(friend_real_pk).await;
1720
1721        let (node_pk, _node_sk) = gen_keypair();
1722        let dht_pk_announce_payload = DhtPkAnnouncePayload::new(friend_dht_pk, vec![
1723            TcpUdpPackedNode {
1724                ip_port: IpPort {
1725                    protocol: ProtocolType::TCP,
1726                    ip_addr: "127.0.0.2".parse().unwrap(),
1727                    port: 12346,
1728                },
1729                pk: node_pk,
1730            },
1731        ]);
1732        let no_reply = dht_pk_announce_payload.no_reply;
1733        let onion_data_response_inner_payload = OnionDataResponseInnerPayload::DhtPkAnnounce(dht_pk_announce_payload);
1734        let nonce = gen_nonce();
1735        let onion_data_response_payload = OnionDataResponsePayload::new(&precompute(&real_pk, &friend_real_sk), friend_real_pk, &nonce, &onion_data_response_inner_payload);
1736        let (temporary_pk, temporary_sk) = gen_keypair();
1737        let onion_data_response = OnionDataResponse::new(&precompute(&onion_client.data_pk, &temporary_sk), temporary_pk, nonce, &onion_data_response_payload);
1738
1739        onion_client.handle_data_response(&onion_data_response).await.unwrap();
1740
1741        let state = onion_client.state.lock().await;
1742
1743        // friend should have updated data
1744        let friend = &state.friends[&friend_real_pk];
1745        assert_eq!(friend.last_no_reply, no_reply);
1746        assert_eq!(friend.dht_pk, Some(friend_dht_pk));
1747
1748        assert!(onion_client.tcp_connections.has_relay(&node_pk).await);
1749        assert!(onion_client.tcp_connections.has_connection(&friend_dht_pk).await);
1750    }
1751
1752    #[tokio::test]
1753    async fn handle_data_response_dht_pk_announce_no_friend_with_pk() {
1754        let (dht_pk, dht_sk) = gen_keypair();
1755        let (real_pk, real_sk) = gen_keypair();
1756        let (udp_tx, _udp_rx) = mpsc::channel(1);
1757        let (tcp_incoming_tx, _tcp_incoming_rx) = mpsc::unbounded();
1758        let dht = DhtServer::new(udp_tx, dht_pk, dht_sk.clone());
1759        let tcp_connections = TcpConnections::new(dht_pk, dht_sk, tcp_incoming_tx);
1760        let onion_client = OnionClient::new(dht, tcp_connections, real_sk.clone(), real_pk);
1761
1762        let (friend_dht_pk, _friend_dht_sk) = gen_keypair();
1763        let (friend_real_pk, friend_real_sk) = gen_keypair();
1764
1765        let dht_pk_announce_payload = DhtPkAnnouncePayload::new(friend_dht_pk, vec![]);
1766        let onion_data_response_inner_payload = OnionDataResponseInnerPayload::DhtPkAnnounce(dht_pk_announce_payload);
1767        let nonce = gen_nonce();
1768        let onion_data_response_payload = OnionDataResponsePayload::new(&precompute(&real_pk, &friend_real_sk), friend_real_pk, &nonce, &onion_data_response_inner_payload);
1769        let (temporary_pk, temporary_sk) = gen_keypair();
1770        let onion_data_response = OnionDataResponse::new(&precompute(&onion_client.data_pk, &temporary_sk), temporary_pk, nonce, &onion_data_response_payload);
1771
1772        let error = onion_client.handle_data_response(&onion_data_response).await.err().unwrap();
1773        assert_eq!(error.kind(), &HandleDataResponseErrorKind::DhtPkAnnounce);
1774        let cause = error.cause().unwrap().downcast_ref::<HandleDhtPkAnnounceError>().unwrap();
1775        assert_eq!(cause.kind(), &HandleDhtPkAnnounceErrorKind::NoFriendWithPk);
1776    }
1777
1778    #[tokio::test]
1779    async fn handle_data_response_dht_pk_announce_invalid_no_reply() {
1780        let (dht_pk, dht_sk) = gen_keypair();
1781        let (real_pk, real_sk) = gen_keypair();
1782        let (udp_tx, _udp_rx) = mpsc::channel(1);
1783        let (tcp_incoming_tx, _tcp_incoming_rx) = mpsc::unbounded();
1784        let dht = DhtServer::new(udp_tx, dht_pk, dht_sk.clone());
1785        let tcp_connections = TcpConnections::new(dht_pk, dht_sk, tcp_incoming_tx);
1786        let onion_client = OnionClient::new(dht, tcp_connections, real_sk.clone(), real_pk);
1787
1788        let (friend_dht_pk, _friend_dht_sk) = gen_keypair();
1789        let (friend_real_pk, friend_real_sk) = gen_keypair();
1790
1791        onion_client.add_friend(friend_real_pk).await;
1792
1793        let dht_pk_announce_payload = DhtPkAnnouncePayload::new(friend_dht_pk, vec![]);
1794        let onion_data_response_inner_payload = OnionDataResponseInnerPayload::DhtPkAnnounce(dht_pk_announce_payload);
1795        let nonce = gen_nonce();
1796        let onion_data_response_payload = OnionDataResponsePayload::new(&precompute(&real_pk, &friend_real_sk), friend_real_pk, &nonce, &onion_data_response_inner_payload);
1797        let (temporary_pk, temporary_sk) = gen_keypair();
1798        let onion_data_response = OnionDataResponse::new(&precompute(&onion_client.data_pk, &temporary_sk), temporary_pk, nonce, &onion_data_response_payload);
1799
1800        onion_client.handle_data_response(&onion_data_response).await.unwrap();
1801
1802        // second announce with the same no_reply should be rejected
1803        let error = onion_client.handle_data_response(&onion_data_response).await.err().unwrap();
1804        assert_eq!(error.kind(), &HandleDataResponseErrorKind::DhtPkAnnounce);
1805        let cause = error.cause().unwrap().downcast_ref::<HandleDhtPkAnnounceError>().unwrap();
1806        assert_eq!(cause.kind(), &HandleDhtPkAnnounceErrorKind::InvalidNoReply);
1807    }
1808
1809    #[tokio::test]
1810    async fn handle_data_response_invalid_payload() {
1811        let (dht_pk, dht_sk) = gen_keypair();
1812        let (real_pk, real_sk) = gen_keypair();
1813        let (udp_tx, _udp_rx) = mpsc::channel(1);
1814        let (tcp_incoming_tx, _tcp_incoming_rx) = mpsc::unbounded();
1815        let dht = DhtServer::new(udp_tx, dht_pk, dht_sk.clone());
1816        let tcp_connections = TcpConnections::new(dht_pk, dht_sk, tcp_incoming_tx);
1817        let onion_client = OnionClient::new(dht, tcp_connections, real_sk.clone(), real_pk);
1818
1819        let onion_data_response = OnionDataResponse {
1820            nonce: gen_nonce(),
1821            temporary_pk: gen_keypair().0,
1822            payload: vec![42; 123],
1823        };
1824
1825        let error = onion_client.handle_data_response(&onion_data_response).await.err().unwrap();
1826        assert_eq!(error.kind(), &HandleDataResponseErrorKind::InvalidPayload);
1827    }
1828
1829    #[tokio::test]
1830    async fn handle_data_response_invalid_inner_payload() {
1831        let (dht_pk, dht_sk) = gen_keypair();
1832        let (real_pk, real_sk) = gen_keypair();
1833        let (udp_tx, _udp_rx) = mpsc::channel(1);
1834        let (tcp_incoming_tx, _tcp_incoming_rx) = mpsc::unbounded();
1835        let dht = DhtServer::new(udp_tx, dht_pk, dht_sk.clone());
1836        let tcp_connections = TcpConnections::new(dht_pk, dht_sk, tcp_incoming_tx);
1837        let onion_client = OnionClient::new(dht, tcp_connections, real_sk.clone(), real_pk);
1838
1839        let onion_data_response_payload = OnionDataResponsePayload {
1840            real_pk: gen_keypair().0,
1841            payload: vec![42; 123],
1842        };
1843        let (temporary_pk, temporary_sk) = gen_keypair();
1844        let onion_data_response = OnionDataResponse::new(&precompute(&onion_client.data_pk, &temporary_sk), temporary_pk, gen_nonce(), &onion_data_response_payload);
1845
1846        let error = onion_client.handle_data_response(&onion_data_response).await.err().unwrap();
1847        assert_eq!(error.kind(), &HandleDataResponseErrorKind::InvalidInnerPayload);
1848    }
1849
1850    #[tokio::test]
1851    async fn announce_loop_empty() {
1852        let (dht_pk, dht_sk) = gen_keypair();
1853        let (real_pk, real_sk) = gen_keypair();
1854        let (udp_tx, udp_rx) = mpsc::channel(MAX_ONION_ANNOUNCE_NODES as usize / 2);
1855        let (tcp_incoming_tx, _tcp_incoming_rx) = mpsc::unbounded();
1856        let dht = DhtServer::new(udp_tx, dht_pk, dht_sk.clone());
1857        // make DHT connected so that we will build UDP onion paths
1858        dht.add_node(PackedNode::new("127.0.0.1:12345".parse().unwrap(), &gen_keypair().0)).await;
1859        let tcp_connections = TcpConnections::new(dht_pk, dht_sk, tcp_incoming_tx);
1860        let onion_client = OnionClient::new(dht, tcp_connections, real_sk.clone(), real_pk);
1861
1862        let mut state = onion_client.state.lock().await;
1863
1864        // map needed to decrypt onion packets later
1865        let mut key_by_addr = HashMap::new();
1866        let addr = "127.0.0.1".parse().unwrap();
1867        for i in 0 .. 3 {
1868            let saddr = SocketAddr::new(addr, 12346 + i);
1869            let (pk, sk) = gen_keypair();
1870            key_by_addr.insert(saddr, sk);
1871            let node = PackedNode::new(saddr, &pk);
1872            state.paths_pool.path_nodes.put(node);
1873        }
1874
1875        onion_client.announce_loop(&mut state).await.unwrap();
1876
1877        let data_pk = onion_client.data_pk;
1878
1879        // Necessary to drop tx so that rx.collect() can be finished
1880        drop(state);
1881        drop(onion_client);
1882
1883        let packets = udp_rx.collect::<Vec<_>>().await;
1884
1885        assert!(!packets.is_empty());
1886
1887        for (packet, addr_to_send) in packets {
1888            let packet = unpack!(packet, Packet::OnionRequest0);
1889            let payload = unpack_onion_packet(packet, addr_to_send, &key_by_addr);
1890            let packet = unpack!(payload.inner, InnerOnionRequest::InnerOnionAnnounceRequest);
1891            let payload = packet.get_payload(&precompute(&real_pk, &key_by_addr[&payload.ip_port.to_saddr()])).unwrap();
1892            assert_eq!(payload.ping_id, initial_ping_id());
1893            assert_eq!(payload.search_pk, real_pk);
1894            assert_eq!(payload.data_pk, data_pk);
1895        }
1896    }
1897
1898    #[tokio::test]
1899    async fn announce_loop() {
1900        let (dht_pk, dht_sk) = gen_keypair();
1901        let (real_pk, real_sk) = gen_keypair();
1902        let (udp_tx, udp_rx) = mpsc::channel(MAX_ONION_ANNOUNCE_NODES as usize);
1903        let (tcp_incoming_tx, _tcp_incoming_rx) = mpsc::unbounded();
1904        let dht = DhtServer::new(udp_tx, dht_pk, dht_sk.clone());
1905        // make DHT connected so that we will build UDP onion paths
1906        dht.add_node(PackedNode::new("127.0.0.1:12345".parse().unwrap(), &gen_keypair().0)).await;
1907        let tcp_connections = TcpConnections::new(dht_pk, dht_sk, tcp_incoming_tx);
1908        let onion_client = OnionClient::new(dht, tcp_connections, real_sk.clone(), real_pk);
1909
1910        let mut state = onion_client.state.lock().await;
1911
1912        // map needed to decrypt onion packets later
1913        let mut key_by_addr = HashMap::new();
1914        let addr = "127.0.0.1".parse().unwrap();
1915        for i in 0 .. 3 {
1916            let saddr = SocketAddr::new(addr, 12346 + i);
1917            let (pk, sk) = gen_keypair();
1918            key_by_addr.insert(saddr, sk);
1919            let node = PackedNode::new(saddr, &pk);
1920            state.paths_pool.path_nodes.put(node);
1921        }
1922
1923        let ping_id = sha256::hash(&[1, 2, 3]);
1924        let now = Instant::now();
1925
1926        let mut nodes_key_by_addr = HashMap::new();
1927        for i in 0 .. MAX_ONION_ANNOUNCE_NODES {
1928            let saddr = SocketAddr::new(addr, 23456 + u16::from(i));
1929            let path = state.paths_pool.path_nodes.udp_path().unwrap();
1930            let (node_pk, node_sk) = gen_keypair();
1931            nodes_key_by_addr.insert(saddr, node_sk);
1932            let node = OnionNode {
1933                pk: node_pk,
1934                saddr,
1935                path_id: path.id(),
1936                ping_id: Some(ping_id),
1937                data_pk: None,
1938                unsuccessful_pings: 0,
1939                added_time: now,
1940                ping_time: now,
1941                response_time: now,
1942                announce_status: AnnounceStatus::Failed,
1943            };
1944            assert!(state.announce_list.try_add(&real_pk, node, true));
1945        }
1946
1947        tokio::time::pause();
1948        // time when entry is timed out
1949        tokio::time::advance(ANNOUNCE_INTERVAL_NOT_ANNOUNCED + Duration::from_secs(1)).await;
1950
1951        onion_client.announce_loop(&mut state).await.unwrap();
1952
1953        let data_pk = onion_client.data_pk;
1954
1955        // Necessary to drop tx so that rx.collect() can be finished
1956        drop(state);
1957        drop(onion_client);
1958
1959        let packets = udp_rx.collect::<Vec<_>>().await;
1960
1961        assert_eq!(packets.len(), MAX_ONION_ANNOUNCE_NODES as usize);
1962
1963        for (packet, addr_to_send) in packets {
1964            let packet = unpack!(packet, Packet::OnionRequest0);
1965            let payload = unpack_onion_packet(packet, addr_to_send, &key_by_addr);
1966            let packet = unpack!(payload.inner, InnerOnionRequest::InnerOnionAnnounceRequest);
1967            let payload = packet.get_payload(&precompute(&real_pk, &nodes_key_by_addr[&payload.ip_port.to_saddr()])).unwrap();
1968            assert_eq!(payload.ping_id, ping_id);
1969            assert_eq!(payload.search_pk, real_pk);
1970            assert_eq!(payload.data_pk, data_pk);
1971        }
1972    }
1973
1974    #[tokio::test]
1975    async fn friends_loop_empty() {
1976        let (dht_pk, dht_sk) = gen_keypair();
1977        let (real_pk, real_sk) = gen_keypair();
1978        let (udp_tx, udp_rx) = mpsc::channel(MAX_ONION_ANNOUNCE_NODES as usize / 2);
1979        let (tcp_incoming_tx, _tcp_incoming_rx) = mpsc::unbounded();
1980        let dht = DhtServer::new(udp_tx, dht_pk, dht_sk.clone());
1981        // make DHT connected so that we will build UDP onion paths
1982        dht.add_node(PackedNode::new("127.0.0.1:12345".parse().unwrap(), &gen_keypair().0)).await;
1983        let tcp_connections = TcpConnections::new(dht_pk, dht_sk, tcp_incoming_tx);
1984        let onion_client = OnionClient::new(dht, tcp_connections, real_sk.clone(), real_pk);
1985
1986        let mut state = onion_client.state.lock().await;
1987
1988        let (friend_pk, _friend_sk) = gen_keypair();
1989        let friend = OnionFriend::new(friend_pk);
1990        let friend_temporary_pk = friend.temporary_pk;
1991        state.friends.insert(friend_pk, friend);
1992
1993        // map needed to decrypt onion packets later
1994        let mut key_by_addr = HashMap::new();
1995        let addr = "127.0.0.1".parse().unwrap();
1996        for i in 0 .. 3 {
1997            let saddr = SocketAddr::new(addr, 12346 + i);
1998            let (pk, sk) = gen_keypair();
1999            key_by_addr.insert(saddr, sk);
2000            let node = PackedNode::new(saddr, &pk);
2001            state.paths_pool.path_nodes.put(node);
2002        }
2003
2004        onion_client.friends_loop(&mut state).await.unwrap();
2005
2006        // Necessary to drop tx so that rx.collect() can be finished
2007        drop(state);
2008        drop(onion_client);
2009
2010        let packets = udp_rx.collect::<Vec<_>>().await;
2011
2012        assert!(!packets.is_empty());
2013
2014        for (packet, addr_to_send) in packets {
2015            let packet = unpack!(packet, Packet::OnionRequest0);
2016            let payload = unpack_onion_packet(packet, addr_to_send, &key_by_addr);
2017            let packet = unpack!(payload.inner, InnerOnionRequest::InnerOnionAnnounceRequest);
2018            let payload = packet.get_payload(&precompute(&friend_temporary_pk, &key_by_addr[&payload.ip_port.to_saddr()])).unwrap();
2019            assert_eq!(payload.ping_id, initial_ping_id());
2020            assert_eq!(payload.search_pk, friend_pk);
2021            assert_eq!(payload.data_pk, PublicKey([0; 32]));
2022        }
2023    }
2024
2025    #[tokio::test]
2026    async fn friends_loop() {
2027        let (dht_pk, dht_sk) = gen_keypair();
2028        let (real_pk, real_sk) = gen_keypair();
2029        let (udp_tx, udp_rx) = mpsc::channel(MAX_ONION_FRIEND_NODES as usize);
2030        let (tcp_incoming_tx, _tcp_incoming_rx) = mpsc::unbounded();
2031        let dht = DhtServer::new(udp_tx, dht_pk, dht_sk.clone());
2032        // make DHT connected so that we will build UDP onion paths
2033        dht.add_node(PackedNode::new("127.0.0.1:12345".parse().unwrap(), &gen_keypair().0)).await;
2034        let tcp_connections = TcpConnections::new(dht_pk, dht_sk, tcp_incoming_tx);
2035        let onion_client = OnionClient::new(dht, tcp_connections, real_sk.clone(), real_pk);
2036
2037        let mut state = onion_client.state.lock().await;
2038
2039        let (friend_pk, _friend_sk) = gen_keypair();
2040        let mut friend = OnionFriend::new(friend_pk);
2041        let friend_temporary_pk = friend.temporary_pk;
2042
2043        // map needed to decrypt onion packets later
2044        let mut key_by_addr = HashMap::new();
2045        let addr = "127.0.0.1".parse().unwrap();
2046        for i in 0 .. 3 {
2047            let saddr = SocketAddr::new(addr, 12346 + i);
2048            let (pk, sk) = gen_keypair();
2049            key_by_addr.insert(saddr, sk);
2050            let node = PackedNode::new(saddr, &pk);
2051            state.paths_pool.path_nodes.put(node);
2052        }
2053
2054        let now = Instant::now();
2055
2056        let mut nodes_key_by_addr = HashMap::new();
2057        for i in 0 .. MAX_ONION_FRIEND_NODES {
2058            let saddr = SocketAddr::new(addr, 23456 + u16::from(i));
2059            let path = state.paths_pool.path_nodes.udp_path().unwrap();
2060            let (node_pk, node_sk) = gen_keypair();
2061            nodes_key_by_addr.insert(saddr, node_sk);
2062            let node = OnionNode {
2063                pk: node_pk,
2064                saddr,
2065                path_id: path.id(),
2066                // regardless of this ping_id search requests should contain 0
2067                ping_id: Some(sha256::hash(&[1, 2, 3])),
2068                data_pk: None,
2069                unsuccessful_pings: 0,
2070                added_time: now,
2071                ping_time: now,
2072                response_time: now,
2073                announce_status: AnnounceStatus::Failed,
2074            };
2075            assert!(friend.close_nodes.try_add(&real_pk, node, true));
2076        }
2077
2078        state.friends.insert(friend_pk, friend);
2079
2080        tokio::time::pause();
2081        // time when announce packet should be sent
2082        tokio::time::advance(ANNOUNCE_INTERVAL_NOT_ANNOUNCED + Duration::from_secs(1)).await;
2083
2084        onion_client.friends_loop(&mut state).await.unwrap();
2085
2086        // Necessary to drop tx so that rx.collect() can be finished
2087        drop(state);
2088        drop(onion_client);
2089
2090        let packets = udp_rx.collect::<Vec<_>>().await;
2091
2092        assert_eq!(packets.len(), MAX_ONION_FRIEND_NODES as usize);
2093
2094        for (packet, addr_to_send) in packets {
2095            let packet = unpack!(packet, Packet::OnionRequest0);
2096            let payload = unpack_onion_packet(packet, addr_to_send, &key_by_addr);
2097            let packet = unpack!(payload.inner, InnerOnionRequest::InnerOnionAnnounceRequest);
2098            let payload = packet.get_payload(&precompute(&friend_temporary_pk, &nodes_key_by_addr[&payload.ip_port.to_saddr()])).unwrap();
2099            assert_eq!(payload.ping_id, initial_ping_id());
2100            assert_eq!(payload.search_pk, friend_pk);
2101            assert_eq!(payload.data_pk, PublicKey([0; 32]));
2102        }
2103    }
2104
2105    #[tokio::test]
2106    async fn friends_loop_ignore_online() {
2107        let (dht_pk, dht_sk) = gen_keypair();
2108        let (real_pk, real_sk) = gen_keypair();
2109        let (udp_tx, udp_rx) = mpsc::channel(MAX_ONION_FRIEND_NODES as usize);
2110        let (tcp_incoming_tx, _tcp_incoming_rx) = mpsc::unbounded();
2111        let dht = DhtServer::new(udp_tx, dht_pk, dht_sk.clone());
2112        let tcp_connections = TcpConnections::new(dht_pk, dht_sk, tcp_incoming_tx);
2113        let onion_client = OnionClient::new(dht, tcp_connections, real_sk.clone(), real_pk);
2114
2115        let mut state = onion_client.state.lock().await;
2116
2117        let (friend_pk, _friend_sk) = gen_keypair();
2118        let mut friend = OnionFriend::new(friend_pk);
2119        friend.connected = true;
2120
2121        let addr = "127.0.0.1".parse().unwrap();
2122        for i in 0 .. 3 {
2123            let saddr = SocketAddr::new(addr, 12346 + i);
2124            let (pk, _sk) = gen_keypair();
2125            let node = PackedNode::new(saddr, &pk);
2126            state.paths_pool.path_nodes.put(node);
2127        }
2128
2129        let now = Instant::now();
2130
2131        for i in 0 .. MAX_ONION_FRIEND_NODES {
2132            let saddr = SocketAddr::new(addr, 23456 + u16::from(i));
2133            let path = state.paths_pool.path_nodes.udp_path().unwrap();
2134            let (node_pk, _node_sk) = gen_keypair();
2135            let node = OnionNode {
2136                pk: node_pk,
2137                saddr,
2138                path_id: path.id(),
2139                ping_id: None,
2140                data_pk: None,
2141                unsuccessful_pings: 0,
2142                added_time: now,
2143                ping_time: now,
2144                response_time: now,
2145                announce_status: AnnounceStatus::Failed,
2146            };
2147            assert!(friend.close_nodes.try_add(&real_pk, node, true));
2148        }
2149
2150        state.friends.insert(friend_pk, friend);
2151
2152        tokio::time::pause();
2153        // time when announce packet should be sent
2154        tokio::time::advance(ANNOUNCE_INTERVAL_NOT_ANNOUNCED + Duration::from_secs(1)).await;
2155
2156        onion_client.friends_loop(&mut state).await.unwrap();
2157
2158        // Necessary to drop tx so that rx.collect() can be finished
2159        drop(state);
2160        drop(onion_client);
2161
2162        assert!(udp_rx.collect::<Vec<_>>().await.is_empty());
2163    }
2164
2165    #[tokio::test]
2166    async fn send_dht_pk_onion() {
2167        let (dht_pk, dht_sk) = gen_keypair();
2168        let (real_pk, real_sk) = gen_keypair();
2169        let (udp_tx, udp_rx) = mpsc::channel(MAX_ONION_FRIEND_NODES as usize);
2170        let (tcp_incoming_tx, _tcp_incoming_rx) = mpsc::unbounded();
2171        let dht = DhtServer::new(udp_tx, dht_pk, dht_sk.clone());
2172        let tcp_connections = TcpConnections::new(dht_pk, dht_sk, tcp_incoming_tx);
2173        let onion_client = OnionClient::new(dht, tcp_connections, real_sk.clone(), real_pk);
2174
2175        let mut state = onion_client.state.lock().await;
2176
2177        let (friend_pk, friend_sk) = gen_keypair();
2178        let mut friend = OnionFriend::new(friend_pk);
2179
2180        // map needed to decrypt onion packets later
2181        let mut key_by_addr = HashMap::new();
2182        let addr = "127.0.0.1".parse().unwrap();
2183        for i in 0 .. 3 {
2184            let saddr = SocketAddr::new(addr, 12346 + i);
2185            let (pk, sk) = gen_keypair();
2186            key_by_addr.insert(saddr, sk);
2187            let node = PackedNode::new(saddr, &pk);
2188            state.paths_pool.path_nodes.put(node);
2189        }
2190
2191        let now = Instant::now();
2192
2193        let (data_pk, data_sk) = gen_keypair();
2194        let mut nodes_key_by_addr = HashMap::new();
2195        for i in 0 .. MAX_ONION_FRIEND_NODES {
2196            let saddr = SocketAddr::new(addr, 23456 + u16::from(i));
2197            let path = state.paths_pool.path_nodes.udp_path().unwrap();
2198            let (node_pk, node_sk) = gen_keypair();
2199            nodes_key_by_addr.insert(saddr, node_sk);
2200            let node = OnionNode {
2201                pk: node_pk,
2202                saddr,
2203                path_id: path.id(),
2204                ping_id: None,
2205                data_pk: Some(data_pk),
2206                unsuccessful_pings: 0,
2207                added_time: now,
2208                ping_time: now,
2209                response_time: now,
2210                announce_status: AnnounceStatus::Failed,
2211            };
2212            assert!(friend.close_nodes.try_add(&real_pk, node, true));
2213        }
2214
2215        state.friends.insert(friend_pk, friend);
2216
2217        let mut dht_close_nodes = onion_client.dht.close_nodes.write().await;
2218        for i in 0 .. 4 {
2219            let saddr = SocketAddr::new(addr, 23456 + i);
2220            let (node_pk, _node_sk) = gen_keypair();
2221            let node = PackedNode::new(saddr, &node_pk);
2222            assert!(dht_close_nodes.try_add(node));
2223        }
2224        drop(dht_close_nodes);
2225
2226        onion_client.friends_loop(&mut state).await.unwrap();
2227
2228        // Necessary to drop tx so that rx.collect() can be finished
2229        drop(state);
2230        drop(onion_client);
2231
2232        let packets = udp_rx.collect::<Vec<_>>().await;
2233
2234        assert_eq!(packets.len(), MAX_ONION_FRIEND_NODES as usize);
2235
2236        for (packet, addr_to_send) in packets {
2237            let packet = unpack!(packet, Packet::OnionRequest0);
2238            let payload = unpack_onion_packet(packet, addr_to_send, &key_by_addr);
2239            let packet = unpack!(payload.inner, InnerOnionRequest::InnerOnionDataRequest);
2240            assert_eq!(packet.destination_pk, friend_pk);
2241            let payload = packet.get_payload(&precompute(&packet.temporary_pk, &data_sk)).unwrap();
2242            assert_eq!(payload.real_pk, real_pk);
2243            let payload = payload.get_payload(&packet.nonce, &precompute(&real_pk, &friend_sk)).unwrap();
2244            let payload = unpack!(payload, OnionDataResponseInnerPayload::DhtPkAnnounce);
2245            assert_eq!(payload.dht_pk, dht_pk);
2246            assert_eq!(payload.nodes.len(), 4);
2247        }
2248    }
2249
2250    #[tokio::test]
2251    async fn send_dht_pk_dht_request() {
2252        let (dht_pk, dht_sk) = gen_keypair();
2253        let (real_pk, real_sk) = gen_keypair();
2254        let (udp_tx, udp_rx) = mpsc::channel(MAX_ONION_FRIEND_NODES as usize);
2255        let (tcp_incoming_tx, _tcp_incoming_rx) = mpsc::unbounded();
2256        let dht = DhtServer::new(udp_tx, dht_pk, dht_sk.clone());
2257        let tcp_connections = TcpConnections::new(dht_pk, dht_sk, tcp_incoming_tx);
2258        let onion_client = OnionClient::new(dht, tcp_connections, real_sk.clone(), real_pk);
2259
2260        let mut state = onion_client.state.lock().await;
2261
2262        let (friend_pk, friend_sk) = gen_keypair();
2263        let (friend_dht_pk, friend_dht_sk) = gen_keypair();
2264        let mut friend = OnionFriend::new(friend_pk);
2265        friend.dht_pk = Some(friend_dht_pk);
2266        state.friends.insert(friend_pk, friend);
2267
2268        let addr = "127.0.0.1".parse().unwrap();
2269        let mut dht_close_nodes = onion_client.dht.close_nodes.write().await;
2270        for i in 0 .. 8 {
2271            let saddr = SocketAddr::new(addr, 23456 + i);
2272            let (node_pk, _node_sk) = gen_keypair();
2273            let node = PackedNode::new(saddr, &node_pk);
2274            assert!(dht_close_nodes.try_add(node));
2275        }
2276        drop(dht_close_nodes);
2277
2278        onion_client.friends_loop(&mut state).await.unwrap();
2279
2280        // Necessary to drop tx so that rx.collect() can be finished
2281        drop(state);
2282        drop(onion_client);
2283
2284        let packets = udp_rx.collect::<Vec<_>>().await;
2285
2286        assert_eq!(packets.len(), 8);
2287
2288        for (packet, _addr_to_send) in packets {
2289            let packet = unpack!(packet, Packet::DhtRequest);
2290            assert_eq!(packet.rpk, friend_dht_pk);
2291            assert_eq!(packet.spk, dht_pk);
2292            let payload = packet.get_payload(&precompute(&dht_pk, &friend_dht_sk)).unwrap();
2293            let packet = unpack!(payload, DhtRequestPayload::DhtPkAnnounce);
2294            assert_eq!(packet.real_pk, real_pk);
2295            let payload = packet.get_payload(&precompute(&real_pk, &friend_sk)).unwrap();
2296            assert_eq!(payload.dht_pk, dht_pk);
2297        }
2298    }
2299
2300    #[tokio::test]
2301    async fn populate_path_nodes() {
2302        let (dht_pk, dht_sk) = gen_keypair();
2303        let (real_pk, real_sk) = gen_keypair();
2304        let (udp_tx, udp_rx) = mpsc::channel(1);
2305        let (tcp_incoming_tx, _tcp_incoming_rx) = mpsc::unbounded();
2306        let dht = DhtServer::new(udp_tx, dht_pk, dht_sk.clone());
2307        let tcp_connections = TcpConnections::new(dht_pk, dht_sk, tcp_incoming_tx);
2308        let onion_client = OnionClient::new(dht, tcp_connections, real_sk, real_pk);
2309
2310        let (node_pk, node_sk) = gen_keypair();
2311        let node = PackedNode::new("127.0.0.1:12345".parse().unwrap(), &node_pk);
2312
2313        // add node to DHT server via responding to NodesRequest
2314        onion_client.dht.ping_node(&node).await.unwrap();
2315        let (received, _udp_rx) = udp_rx.into_future().await;
2316        let (packet, addr_to_send) = received.unwrap();
2317        assert_eq!(addr_to_send, node.saddr);
2318        let shared_secret = precompute(&dht_pk, &node_sk);
2319        let request_packet = unpack!(packet, Packet::NodesRequest);
2320        let request_payload = request_packet.get_payload(&shared_secret).unwrap();
2321        let response_payload = NodesResponsePayload {
2322            nodes: vec![],
2323            id: request_payload.id,
2324        };
2325        let response_packet = NodesResponse::new(&shared_secret, &node_pk, &response_payload);
2326        onion_client.dht.handle_packet(Packet::NodesResponse(response_packet), node.saddr).await.unwrap();
2327
2328        let mut state = onion_client.state.lock().await;
2329
2330        onion_client.populate_path_nodes(&mut state).await;
2331
2332        assert!(state.paths_pool.path_nodes.rand().is_some());
2333    }
2334
2335    #[tokio::test]
2336    async fn send_onion_request_udp() {
2337        let (dht_pk, dht_sk) = gen_keypair();
2338        let (real_pk, real_sk) = gen_keypair();
2339        let (udp_tx, udp_rx) = mpsc::channel(1);
2340        let (tcp_incoming_tx, _tcp_incoming_rx) = mpsc::unbounded();
2341        let dht = DhtServer::new(udp_tx, dht_pk, dht_sk.clone());
2342        let tcp_connections = TcpConnections::new(dht_pk, dht_sk, tcp_incoming_tx);
2343        let onion_client = OnionClient::new(dht, tcp_connections, real_sk, real_pk);
2344
2345        let inner_onion_request = InnerOnionRequest::InnerOnionAnnounceRequest(InnerOnionAnnounceRequest {
2346            nonce: gen_nonce(),
2347            pk: gen_keypair().0,
2348            payload: vec![42; 123],
2349        });
2350
2351        let path = OnionPath::new([
2352            PackedNode::new("127.0.0.1:12346".parse().unwrap(), &gen_keypair().0),
2353            PackedNode::new("127.0.0.1:12347".parse().unwrap(), &gen_keypair().0),
2354            PackedNode::new("127.0.0.1:12348".parse().unwrap(), &gen_keypair().0),
2355        ], OnionPathType::UDP);
2356        let saddr = "127.0.0.1:12345".parse().unwrap();
2357        onion_client.send_onion_request(path, inner_onion_request, saddr).await.unwrap();
2358
2359        let (_received, _udp_rx) = udp_rx.into_future().await;
2360    }
2361
2362    #[tokio::test]
2363    async fn send_onion_request_tcp() {
2364        let (dht_pk, dht_sk) = gen_keypair();
2365        let (real_pk, real_sk) = gen_keypair();
2366        let (udp_tx, udp_rx) = mpsc::channel(1);
2367        let (tcp_incoming_tx, _tcp_incoming_rx) = mpsc::unbounded();
2368        let dht = DhtServer::new(udp_tx, dht_pk, dht_sk.clone());
2369        let tcp_connections = TcpConnections::new(dht_pk, dht_sk, tcp_incoming_tx);
2370        let (_relay_incoming_rx, relay_outgoing_rx, relay_pk) = tcp_connections.add_client().await;
2371        let onion_client = OnionClient::new(dht, tcp_connections, real_sk, real_pk);
2372
2373        let inner_onion_request = InnerOnionRequest::InnerOnionAnnounceRequest(InnerOnionAnnounceRequest {
2374            nonce: gen_nonce(),
2375            pk: gen_keypair().0,
2376            payload: vec![42; 123],
2377        });
2378
2379        let path = OnionPath::new([
2380            PackedNode::new("127.0.0.1:12346".parse().unwrap(), &relay_pk),
2381            PackedNode::new("127.0.0.1:12347".parse().unwrap(), &gen_keypair().0),
2382            PackedNode::new("127.0.0.1:12348".parse().unwrap(), &gen_keypair().0),
2383        ], OnionPathType::TCP);
2384        let saddr = "127.0.0.1:12345".parse().unwrap();
2385        onion_client.send_onion_request(path, inner_onion_request, saddr).await.unwrap();
2386
2387        let (_received, _relay_outgoing_rx) = relay_outgoing_rx.into_future().await;
2388
2389        // Necessary to drop tx so that rx.collect() can be finished
2390        drop(onion_client);
2391
2392        assert!(udp_rx.collect::<Vec<_>>().await.is_empty());
2393    }
2394}