1use std::collections::HashSet;
20use std::fmt::{Debug, Display};
21use std::future::Future;
22use std::sync::Arc;
23use std::time::Duration;
24
25use crate::errmapcomponent::ErrMapComponent;
26use crate::error::{Error, ErrorKind};
27use crate::memdx::error::ErrorKind::{Cancelled, Dispatch, Resource, Server};
28use crate::memdx::error::{CancellationErrorKind, ServerError, ServerErrorKind};
29use crate::retrybesteffort::controlled_backoff;
30use crate::retryfailfast::FailFastRetryStrategy;
31use crate::{error, queryx, searchx};
32use async_trait::async_trait;
33use log::{debug, info};
34use tokio::time::sleep;
35
36lazy_static! {
37 pub(crate) static ref DEFAULT_RETRY_STRATEGY: Arc<dyn RetryStrategy> =
38 Arc::new(FailFastRetryStrategy::default());
39}
40
/// Why a failed operation may be worth retrying.
///
/// A reason is derived from an operation error by `error_to_retry_reason` and
/// then passed to `RetryManager::maybe_retry`, which either retries
/// unconditionally (see [`RetryReason::always_retry`]) or asks the request's
/// `RetryStrategy`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum RetryReason {
    KvNotMyVbucket,
    KvInvalidVbucketMap,
    KvTemporaryFailure,
    KvCollectionOutdated,
    KvErrorMapRetryIndicated,
    KvLocked,
    KvSyncWriteInProgress,
    KvSyncWriteRecommitInProgress,
    ServiceNotAvailable,
    SocketClosedWhileInFlight,
    SocketNotAvailable,
    QueryPreparedStatementFailure,
    QueryIndexNotFound,
    SearchTooManyRequests,
    NotReady,
}

impl RetryReason {
    /// Reports whether this reason permits retrying a non-idempotent operation.
    ///
    /// Every current reason does, except `SocketClosedWhileInFlight`. The match
    /// is deliberately exhaustive so that adding a variant forces an explicit
    /// decision here.
    pub fn allows_non_idempotent_retry(&self) -> bool {
        match self {
            RetryReason::SocketClosedWhileInFlight => false,
            RetryReason::KvNotMyVbucket
            | RetryReason::KvInvalidVbucketMap
            | RetryReason::KvTemporaryFailure
            | RetryReason::KvCollectionOutdated
            | RetryReason::KvErrorMapRetryIndicated
            | RetryReason::KvLocked
            | RetryReason::KvSyncWriteInProgress
            | RetryReason::KvSyncWriteRecommitInProgress
            | RetryReason::ServiceNotAvailable
            | RetryReason::SocketNotAvailable
            | RetryReason::QueryPreparedStatementFailure
            | RetryReason::QueryIndexNotFound
            | RetryReason::SearchTooManyRequests
            | RetryReason::NotReady => true,
        }
    }

    /// Reports whether this reason is retried unconditionally, i.e. without
    /// consulting the request's `RetryStrategy`.
    ///
    /// Only `KvInvalidVbucketMap`, `KvNotMyVbucket` and `KvCollectionOutdated`
    /// qualify; every other reason yields `false`.
    pub fn always_retry(&self) -> bool {
        matches!(
            self,
            RetryReason::KvInvalidVbucketMap
                | RetryReason::KvNotMyVbucket
                | RetryReason::KvCollectionOutdated
        )
    }
}
95
96impl Display for RetryReason {
97 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
98 match self {
99 RetryReason::KvNotMyVbucket => write!(f, "KV_NOT_MY_VBUCKET"),
100 RetryReason::KvInvalidVbucketMap => write!(f, "KV_INVALID_VBUCKET_MAP"),
101 RetryReason::KvTemporaryFailure => write!(f, "KV_TEMPORARY_FAILURE"),
102 RetryReason::KvCollectionOutdated => write!(f, "KV_COLLECTION_OUTDATED"),
103 RetryReason::KvErrorMapRetryIndicated => write!(f, "KV_ERROR_MAP_RETRY_INDICATED"),
104 RetryReason::KvLocked => write!(f, "KV_LOCKED"),
105 RetryReason::ServiceNotAvailable => write!(f, "SERVICE_NOT_AVAILABLE"),
106 RetryReason::SocketClosedWhileInFlight => write!(f, "SOCKET_CLOSED_WHILE_IN_FLIGHT"),
107 RetryReason::SocketNotAvailable => write!(f, "SOCKET_NOT_AVAILABLE"),
108 RetryReason::KvSyncWriteInProgress => write!(f, "KV_SYNC_WRITE_IN_PROGRESS"),
109 RetryReason::KvSyncWriteRecommitInProgress => {
110 write!(f, "KV_SYNC_WRITE_RECOMMIT_IN_PROGRESS")
111 }
112 RetryReason::QueryPreparedStatementFailure => {
113 write!(f, "QUERY_PREPARED_STATEMENT_FAILURE")
114 }
115 RetryReason::QueryIndexNotFound => write!(f, "QUERY_INDEX_NOT_FOUND"),
116 RetryReason::SearchTooManyRequests => write!(f, "SEARCH_TOO_MANY_REQUESTS"),
117 RetryReason::NotReady => write!(f, "NOT_READY"),
118 }
119 }
120}
121
122pub struct RetryManager {
123 err_map_component: Arc<ErrMapComponent>,
124}
125
126impl RetryManager {
127 pub fn new(err_map_component: Arc<ErrMapComponent>) -> Self {
128 Self { err_map_component }
129 }
130
131 pub async fn maybe_retry(
132 &self,
133 request: &mut RetryInfo,
134 reason: RetryReason,
135 ) -> Option<Duration> {
136 if reason.always_retry() {
137 request.add_retry_attempt(reason);
138 let backoff = controlled_backoff(request.retry_attempts);
139
140 return Some(backoff);
141 }
142
143 let strategy = request.retry_strategy();
144 let action = strategy.retry_after(request, &reason).await;
145
146 if let Some(a) = action {
147 request.add_retry_attempt(reason);
148
149 return Some(a.duration);
150 }
151
152 None
153 }
154}
155
/// Outcome of a `RetryStrategy` that chose to retry: how long to wait first.
#[derive(Clone, Debug)]
pub struct RetryAction {
    /// Delay to wait before the next attempt.
    pub duration: Duration,
}

impl RetryAction {
    /// Creates an action that retries after `duration`.
    pub fn new(duration: Duration) -> Self {
        RetryAction { duration }
    }
}
166
167#[async_trait]
168pub trait RetryStrategy: Debug + Send + Sync {
169 async fn retry_after(&self, request: &RetryInfo, reason: &RetryReason) -> Option<RetryAction>;
170}
171
172#[derive(Clone, Debug)]
173pub struct RetryInfo {
174 pub(crate) operation: &'static str,
175 pub(crate) is_idempotent: bool,
176 pub(crate) retry_strategy: Arc<dyn RetryStrategy>,
177 pub(crate) retry_attempts: u32,
178 pub(crate) retry_reasons: HashSet<RetryReason>,
179 pub(crate) unique_id: Option<String>,
180}
181
182impl RetryInfo {
183 pub(crate) fn new(
184 operation: &'static str,
185 is_idempotent: bool,
186 retry_strategy: Arc<dyn RetryStrategy>,
187 ) -> Self {
188 Self {
189 operation,
190 is_idempotent,
191 retry_strategy,
192 retry_attempts: 0,
193 retry_reasons: Default::default(),
194 unique_id: None,
195 }
196 }
197
198 pub(crate) fn add_retry_attempt(&mut self, reason: RetryReason) {
199 self.retry_attempts += 1;
200 self.retry_reasons.insert(reason);
201 }
202
203 pub fn is_idempotent(&self) -> bool {
204 self.is_idempotent
205 }
206
207 pub fn retry_strategy(&self) -> &Arc<dyn RetryStrategy> {
208 &self.retry_strategy
209 }
210
211 pub fn retry_attempts(&self) -> u32 {
212 self.retry_attempts
213 }
214
215 pub fn retry_reasons(&self) -> &HashSet<RetryReason> {
216 &self.retry_reasons
217 }
218}
219
220impl Display for RetryInfo {
221 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
222 write!(
223 f,
224 "{{ operation: {}, id: {}, is_idempotent: {}, retry_attempts: {}, retry_reasons: {} }}",
225 self.operation,
226 self.unique_id.as_ref().unwrap_or(&"".to_string()),
227 self.is_idempotent,
228 self.retry_attempts,
229 self.retry_reasons
230 .iter()
231 .map(|r| r.to_string())
232 .collect::<Vec<_>>()
233 .join(", ")
234 )
235 }
236}
237
238pub(crate) async fn orchestrate_retries<Fut, Resp>(
239 rs: Arc<RetryManager>,
240 mut retry_info: RetryInfo,
241 operation: impl Fn() -> Fut + Send + Sync,
242) -> error::Result<Resp>
243where
244 Fut: Future<Output = error::Result<Resp>> + Send,
245 Resp: Send,
246{
247 loop {
248 let mut err = match operation().await {
249 Ok(r) => {
250 return Ok(r);
251 }
252 Err(e) => e,
253 };
254
255 if let Some(reason) = error_to_retry_reason(&rs, &mut retry_info, &err) {
256 if let Some(duration) = rs.maybe_retry(&mut retry_info, reason).await {
257 debug!(
258 "Retrying {} after {:?} due to {}",
259 &retry_info, duration, reason
260 );
261 sleep(duration).await;
262 continue;
263 }
264 }
265
266 if retry_info.retry_attempts > 0 {
267 err.set_retry_info(retry_info);
269 }
270
271 return Err(err);
272 }
273}
274
275pub(crate) fn error_to_retry_reason(
276 rs: &Arc<RetryManager>,
277 retry_info: &mut RetryInfo,
278 err: &Error,
279) -> Option<RetryReason> {
280 match err.kind() {
281 ErrorKind::Memdx(err) => {
282 retry_info.unique_id = err.has_opaque().map(|o| o.to_string());
283
284 match err.kind() {
285 Server(e) => return server_error_to_retry_reason(rs, e),
286 Resource(e) => return server_error_to_retry_reason(rs, e.cause()),
287 Cancelled(e) => {
288 if e == &CancellationErrorKind::ClosedInFlight {
289 return Some(RetryReason::SocketClosedWhileInFlight);
290 }
291 }
292 Dispatch { .. } => return Some(RetryReason::SocketNotAvailable),
293 _ => {}
294 }
295 }
296 ErrorKind::NoVbucketMap => {
297 return Some(RetryReason::KvInvalidVbucketMap);
298 }
299 ErrorKind::ServiceNotAvailable { .. } => {
300 return Some(RetryReason::ServiceNotAvailable);
301 }
302 ErrorKind::Query(e) => {
303 if let queryx::error::ErrorKind::Server(e) = e.kind() {
304 match e.kind() {
305 queryx::error::ServerErrorKind::PreparedStatementFailure => {
306 return Some(RetryReason::QueryPreparedStatementFailure);
307 }
308 queryx::error::ServerErrorKind::IndexNotFound => {
309 return Some(RetryReason::QueryIndexNotFound);
310 }
311 _ => {}
312 }
313 }
314 }
315 ErrorKind::Search(e) => {
316 if let searchx::error::ErrorKind::Server(e) = e.kind() {
317 if e.status_code() == 429 {
318 return Some(RetryReason::SearchTooManyRequests);
319 }
320 }
321 }
322 _ => {}
323 }
324
325 None
326}
327
328fn server_error_to_retry_reason(rs: &Arc<RetryManager>, e: &ServerError) -> Option<RetryReason> {
329 match e.kind() {
330 ServerErrorKind::NotMyVbucket => {
331 return Some(RetryReason::KvNotMyVbucket);
332 }
333 ServerErrorKind::TmpFail => {
334 return Some(RetryReason::KvTemporaryFailure);
335 }
336 ServerErrorKind::UnknownCollectionID => {
337 return Some(RetryReason::KvCollectionOutdated);
338 }
339 ServerErrorKind::UnknownCollectionName => {
340 return Some(RetryReason::KvCollectionOutdated);
341 }
342 ServerErrorKind::Locked => {
343 return Some(RetryReason::KvLocked);
344 }
345 ServerErrorKind::SyncWriteInProgress => {
346 return Some(RetryReason::KvSyncWriteInProgress);
347 }
348 ServerErrorKind::SyncWriteRecommitInProgress => {
349 return Some(RetryReason::KvSyncWriteRecommitInProgress);
350 }
351 ServerErrorKind::UnknownStatus { status } => {
352 if rs.err_map_component.should_retry(status) {
353 return Some(RetryReason::KvErrorMapRetryIndicated);
354 }
355 }
356 _ => {}
357 }
358
359 None
360}