1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
use futures::{
future::FutureExt,
stream::{FuturesUnordered, StreamExt},
};
use std::{future::Future, sync::Arc, time::Duration};
use tracing::warn;
use super::{errors::QueryError, metrics::Metrics};
pub struct Context {
pub metrics: Arc<Metrics>,
}
pub trait SpeculativeExecutionPolicy: Send + Sync {
fn max_retry_count(&self, context: &Context) -> usize;
fn retry_interval(&self, context: &Context) -> Duration;
}
#[derive(Debug, Clone)]
pub struct SimpleSpeculativeExecutionPolicy {
pub max_retry_count: usize,
pub retry_interval: Duration,
}
#[derive(Debug, Clone)]
pub struct PercentileSpeculativeExecutionPolicy {
pub max_retry_count: usize,
pub percentile: f64,
}
impl SpeculativeExecutionPolicy for SimpleSpeculativeExecutionPolicy {
fn max_retry_count(&self, _: &Context) -> usize {
self.max_retry_count
}
fn retry_interval(&self, _: &Context) -> Duration {
self.retry_interval
}
}
impl SpeculativeExecutionPolicy for PercentileSpeculativeExecutionPolicy {
fn max_retry_count(&self, _: &Context) -> usize {
self.max_retry_count
}
fn retry_interval(&self, context: &Context) -> Duration {
let interval = context.metrics.get_latency_percentile_ms(self.percentile);
let ms = match interval {
Ok(d) => d,
Err(e) => {
warn!(
"Failed to get latency percentile ({}), defaulting to 100 ms",
e
);
100
}
};
Duration::from_millis(ms)
}
}
fn can_be_ignored<ResT>(result: &Result<ResT, QueryError>) -> bool {
match result {
Ok(_) => false,
Err(QueryError::IoError(_)) => true,
Err(QueryError::TimeoutError) => true,
_ => false,
}
}
const EMPTY_PLAN_ERROR: QueryError = QueryError::ProtocolError("Empty query plan - driver bug!");
pub async fn execute<QueryFut, ResT>(
policy: &dyn SpeculativeExecutionPolicy,
context: &Context,
query_runner_generator: impl Fn() -> QueryFut,
) -> Result<ResT, QueryError>
where
QueryFut: Future<Output = Option<Result<ResT, QueryError>>>,
{
let mut retries_remaining = policy.max_retry_count(context);
let retry_interval = policy.retry_interval(context);
let mut async_tasks = FuturesUnordered::new();
async_tasks.push(query_runner_generator());
let sleep = tokio::time::sleep(retry_interval).fuse();
tokio::pin!(sleep);
let mut last_error = None;
loop {
futures::select! {
_ = &mut sleep => {
if retries_remaining > 0 {
async_tasks.push(query_runner_generator());
retries_remaining -= 1;
sleep.set(tokio::time::sleep(retry_interval).fuse());
}
}
res = async_tasks.select_next_some() => {
match res {
Some(r) => {
if !can_be_ignored(&r) {
return r;
} else {
last_error = Some(r)
}
},
None => {
if async_tasks.is_empty() && retries_remaining == 0 {
return last_error.unwrap_or({
Err(EMPTY_PLAN_ERROR)
});
}
},
}
}
}
}
}