logo
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
// Copyright 2022 MaidSafe.net limited.
//
// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

use super::Client;
use crate::client::{connections::QueryResult, errors::Error};
use crate::messaging::{
    data::{DataQuery, ServiceMsg},
    ServiceAuth, WireMsg,
};
use crate::types::{PublicKey, Signature};
use bytes::Bytes;
use rand::Rng;
use tracing::{debug, info_span};

// We divide the total query timeout by this number.
// This also represents the max retries possible, while still staying within the max_timeout.
const MAX_RETRY_COUNT: f32 = 25.0;

impl Client {
    /// Send a Query to the network and await a response.
    /// Queries are automatically retried using exponential backoff if the timeout is hit.
    #[instrument(skip(self), level = "debug")]
    pub async fn send_query(&self, query: DataQuery) -> Result<QueryResult, Error> {
        self.send_query_with_retry_count(query, MAX_RETRY_COUNT)
            .await
    }

    /// Send a Query to the network and await a response.
    /// Queries are not retried if the timeout is hit.
    #[instrument(skip(self), level = "debug")]
    pub async fn send_query_without_retry(&self, query: DataQuery) -> Result<QueryResult, Error> {
        self.send_query_with_retry_count(query, 1.0).await
    }

    // Send a Query to the network and await a response.
    // Queries are automatically retried if the timeout is hit
    // This function is a private helper.
    #[instrument(skip(self), level = "debug")]
    async fn send_query_with_retry_count(
        &self,
        query: DataQuery,
        retry_count: f32,
    ) -> Result<QueryResult, Error> {
        let client_pk = self.public_key();
        let msg = ServiceMsg::Query(query.clone());
        let serialised_query = WireMsg::serialize_msg_payload(&msg)?;
        let signature = self.keypair.sign(&serialised_query);

        let mut rng = rand::rngs::OsRng;

        // Add jitter so not all clients retry at the same rate. This divider will knock on to the overall retry window
        // and should help prevent elders from being conseceutively overwhelmed
        let jitter = rng.gen_range(1.0, 1.5);
        let attempt_timeout = self.query_timeout.div_f32(retry_count + jitter);
        trace!("Setting up query retry, interval is: {:?}", attempt_timeout);

        let span = info_span!("Attempting a query");
        let _ = span.enter();
        let mut attempt = 1.0;
        loop {
            debug!(
                "Attempting {:?} (attempt #{}) with a query timeout of {:?}",
                query, attempt, attempt_timeout
            );

            let res = tokio::time::timeout(
                attempt_timeout,
                self.send_signed_query(
                    query.clone(),
                    client_pk,
                    serialised_query.clone(),
                    signature.clone(),
                ),
            )
            .await;

            if let Ok(Ok(query_result)) = res {
                break Ok(query_result);
            } else if attempt > MAX_RETRY_COUNT {
                debug!(
                    "Retries ({}) all failed returning no response for {:?}",
                    MAX_RETRY_COUNT, query
                );
                break Err(Error::NoResponse);
            }

            attempt += 1.0;
        }
    }

    /// Send a Query to the network and await a response.
    /// This is part of a public API, for the user to
    /// provide the serialised and already signed query.
    pub async fn send_signed_query(
        &self,
        query: DataQuery,
        client_pk: PublicKey,
        serialised_query: Bytes,
        signature: Signature,
    ) -> Result<QueryResult, Error> {
        debug!("Sending Query: {:?}", query);
        let auth = ServiceAuth {
            public_key: client_pk,
            signature,
        };

        self.session.send_query(query, auth, serialised_query).await
    }
}