1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
use super::Client;
use crate::client::{connections::QueryResult, errors::Error};
use crate::messaging::{
data::{DataQuery, ServiceMsg},
ServiceAuth, WireMsg,
};
use crate::types::{PublicKey, Signature};
use bytes::Bytes;
use rand::Rng;
use tracing::{debug, info_span};
/// Retry budget that `send_query` hands to the retrying query helper.
///
/// Stored as `f32` because the value takes part in float arithmetic
/// (`div_f32`) when the per-attempt timeout is derived from the client's
/// query timeout.
const MAX_RETRY_COUNT: f32 = 25.0;
impl Client {
#[instrument(skip(self), level = "debug")]
pub async fn send_query(&self, query: DataQuery) -> Result<QueryResult, Error> {
self.send_query_with_retry_count(query, MAX_RETRY_COUNT)
.await
}
#[instrument(skip(self), level = "debug")]
pub async fn send_query_without_retry(&self, query: DataQuery) -> Result<QueryResult, Error> {
self.send_query_with_retry_count(query, 1.0).await
}
#[instrument(skip(self), level = "debug")]
async fn send_query_with_retry_count(
&self,
query: DataQuery,
retry_count: f32,
) -> Result<QueryResult, Error> {
let client_pk = self.public_key();
let msg = ServiceMsg::Query(query.clone());
let serialised_query = WireMsg::serialize_msg_payload(&msg)?;
let signature = self.keypair.sign(&serialised_query);
let mut rng = rand::rngs::OsRng;
let jitter = rng.gen_range(1.0, 1.5);
let attempt_timeout = self.query_timeout.div_f32(retry_count + jitter);
trace!("Setting up query retry, interval is: {:?}", attempt_timeout);
let span = info_span!("Attempting a query");
let _ = span.enter();
let mut attempt = 1.0;
loop {
debug!(
"Attempting {:?} (attempt #{}) with a query timeout of {:?}",
query, attempt, attempt_timeout
);
let res = tokio::time::timeout(
attempt_timeout,
self.send_signed_query(
query.clone(),
client_pk,
serialised_query.clone(),
signature.clone(),
),
)
.await;
if let Ok(Ok(query_result)) = res {
break Ok(query_result);
} else if attempt > MAX_RETRY_COUNT {
debug!(
"Retries ({}) all failed returning no response for {:?}",
MAX_RETRY_COUNT, query
);
break Err(Error::NoResponse);
}
attempt += 1.0;
}
}
pub async fn send_signed_query(
&self,
query: DataQuery,
client_pk: PublicKey,
serialised_query: Bytes,
signature: Signature,
) -> Result<QueryResult, Error> {
debug!("Sending Query: {:?}", query);
let auth = ServiceAuth {
public_key: client_pk,
signature,
};
self.session.send_query(query, auth, serialised_query).await
}
}