1use std::collections::HashSet;
20use std::fmt::{Debug, Display};
21use std::future::Future;
22use std::sync::Arc;
23use std::time::Duration;
24
25use crate::errmapcomponent::ErrMapComponent;
26use crate::error::{Error, ErrorKind};
27use crate::memdx::error::ErrorKind::{Cancelled, Dispatch, Resource, Server};
28use crate::memdx::error::{CancellationErrorKind, ServerError, ServerErrorKind};
29use crate::retryfailfast::FailFastRetryStrategy;
30use crate::tracingcomponent::SPAN_ATTRIB_RETRIES;
31use crate::{analyticsx, error, httpx, mgmtx, queryx, searchx};
32use async_trait::async_trait;
33use tokio::time::sleep;
34use tracing::{debug, info};
35
36lazy_static! {
37 pub(crate) static ref DEFAULT_RETRY_STRATEGY: Arc<dyn RetryStrategy> =
38 Arc::new(FailFastRetryStrategy::default());
39}
40
/// Why a failed operation is being considered for a retry.
///
/// Values are produced by `error_to_retry_reason` / `server_error_to_retry_reason`
/// from the operation's error and are fed to the retry manager and strategy.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum RetryReason {
    /// KV server answered "not my vbucket". Always retried, bypassing the strategy.
    KvNotMyVbucket,
    /// No vbucket map was available for routing (`ErrorKind::NoVbucketMap`). Always retried.
    KvInvalidVbucketMap,
    /// KV server reported a temporary failure (`TmpFail`).
    KvTemporaryFailure,
    /// Unknown collection id, collection name or scope name. Always retried,
    /// presumably so that the collection manifest can be refreshed (confirm).
    KvCollectionOutdated,
    /// A status unknown to the client that the server's error map marks as retryable.
    KvErrorMapRetryIndicated,
    /// The target document is locked.
    KvLocked,
    /// A synchronous (durable) write is in progress on the document.
    KvSyncWriteInProgress,
    /// A synchronous write re-commit is in progress on the document.
    KvSyncWriteRecommitInProgress,
    /// The required service was not available (`ErrorKind::ServiceNotAvailable`).
    ServiceNotAvailable,
    /// The connection closed while the request was in flight. This is the only reason
    /// that does not permit retrying non-idempotent operations.
    SocketClosedWhileInFlight,
    /// The request could not be dispatched to a connection (memdx `Dispatch` error).
    SocketNotAvailable,
    /// The query service reported a prepared-statement failure.
    QueryPreparedStatementFailure,
    /// The query service reported that an index was not found.
    QueryIndexNotFound,
    /// The search service responded with HTTP 429.
    SearchTooManyRequests,
    /// An HTTP-based service request failed while being sent.
    HttpSendRequestFailed,
    /// An HTTP-based service request failed to connect.
    HttpConnectFailed,
    /// NOTE(review): not produced by the mapping functions in this file; presumably
    /// raised by callers elsewhere when a component is not yet ready. Confirm.
    NotReady,
}
84
85impl RetryReason {
86 pub fn allows_non_idempotent_retry(&self) -> bool {
91 matches!(
92 self,
93 RetryReason::KvInvalidVbucketMap
94 | RetryReason::KvNotMyVbucket
95 | RetryReason::KvTemporaryFailure
96 | RetryReason::KvCollectionOutdated
97 | RetryReason::KvErrorMapRetryIndicated
98 | RetryReason::KvLocked
99 | RetryReason::ServiceNotAvailable
100 | RetryReason::SocketNotAvailable
101 | RetryReason::KvSyncWriteInProgress
102 | RetryReason::KvSyncWriteRecommitInProgress
103 | RetryReason::QueryPreparedStatementFailure
104 | RetryReason::QueryIndexNotFound
105 | RetryReason::SearchTooManyRequests
106 | RetryReason::HttpSendRequestFailed
107 | RetryReason::HttpConnectFailed
108 | RetryReason::NotReady
109 )
110 }
111
112 pub fn always_retry(&self) -> bool {
115 matches!(
116 self,
117 RetryReason::KvInvalidVbucketMap
118 | RetryReason::KvNotMyVbucket
119 | RetryReason::KvCollectionOutdated
120 )
121 }
122}
123
124impl Display for RetryReason {
125 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
126 match self {
127 RetryReason::KvNotMyVbucket => write!(f, "KV_NOT_MY_VBUCKET"),
128 RetryReason::KvInvalidVbucketMap => write!(f, "KV_INVALID_VBUCKET_MAP"),
129 RetryReason::KvTemporaryFailure => write!(f, "KV_TEMPORARY_FAILURE"),
130 RetryReason::KvCollectionOutdated => write!(f, "KV_COLLECTION_OUTDATED"),
131 RetryReason::KvErrorMapRetryIndicated => write!(f, "KV_ERROR_MAP_RETRY_INDICATED"),
132 RetryReason::KvLocked => write!(f, "KV_LOCKED"),
133 RetryReason::ServiceNotAvailable => write!(f, "SERVICE_NOT_AVAILABLE"),
134 RetryReason::SocketClosedWhileInFlight => write!(f, "SOCKET_CLOSED_WHILE_IN_FLIGHT"),
135 RetryReason::SocketNotAvailable => write!(f, "SOCKET_NOT_AVAILABLE"),
136 RetryReason::KvSyncWriteInProgress => write!(f, "KV_SYNC_WRITE_IN_PROGRESS"),
137 RetryReason::KvSyncWriteRecommitInProgress => {
138 write!(f, "KV_SYNC_WRITE_RECOMMIT_IN_PROGRESS")
139 }
140 RetryReason::QueryPreparedStatementFailure => {
141 write!(f, "QUERY_PREPARED_STATEMENT_FAILURE")
142 }
143 RetryReason::QueryIndexNotFound => write!(f, "QUERY_INDEX_NOT_FOUND"),
144 RetryReason::SearchTooManyRequests => write!(f, "SEARCH_TOO_MANY_REQUESTS"),
145 RetryReason::NotReady => write!(f, "NOT_READY"),
146 RetryReason::HttpSendRequestFailed => write!(f, "HTTP_SEND_REQUEST_FAILED"),
147 RetryReason::HttpConnectFailed => write!(f, "HTTP_CONNECT_FAILED"),
148 }
149 }
150}
151
/// A positive retry decision: wait `duration` before the next attempt.
#[derive(Clone, Debug)]
pub struct RetryAction {
    /// How long to wait before re-issuing the request.
    pub duration: Duration,
}

impl RetryAction {
    /// Wraps `duration` as a retry decision.
    pub fn new(duration: Duration) -> Self {
        RetryAction { duration }
    }
}
167
/// Policy deciding whether, and after what delay, a failed request is retried.
///
/// Implementations must be `Send + Sync` so they can be shared as `Arc<dyn RetryStrategy>`.
/// `RetryManager::maybe_retry` only consults the strategy for reasons whose
/// `RetryReason::always_retry()` is false.
pub trait RetryStrategy: Debug + Send + Sync {
    /// Returns `Some(action)` to retry after `action.duration`, or `None` to stop retrying.
    ///
    /// `request` carries the attempts and reasons recorded so far. When this returns
    /// `Some`, the caller records the new attempt only after the call.
    fn retry_after(&self, request: &RetryRequest, reason: &RetryReason) -> Option<RetryAction>;
}
206
/// Per-operation retry bookkeeping handed to a [`RetryStrategy`].
#[derive(Clone, Debug)]
pub struct RetryRequest {
    /// Operation name; shown in the `Display` output.
    pub(crate) operation: &'static str,
    /// Whether the operation may safely be repeated. Strategies can weigh this against
    /// `RetryReason::allows_non_idempotent_retry`.
    pub is_idempotent: bool,
    /// Number of retries recorded so far via `add_retry_attempt`.
    pub retry_attempts: u32,
    /// Distinct reasons that have triggered retries. This is a set, so there are no counts and no order.
    pub retry_reasons: HashSet<RetryReason>,
    /// Optional identifier shown in `Display`. For memdx errors it is set from the
    /// error's opaque by `error_to_retry_reason`.
    pub(crate) unique_id: Option<String>,
}
219
220impl RetryRequest {
221 pub(crate) fn new(operation: &'static str, is_idempotent: bool) -> Self {
222 Self {
223 operation,
224 is_idempotent,
225 retry_attempts: 0,
226 retry_reasons: Default::default(),
227 unique_id: None,
228 }
229 }
230
231 pub(crate) fn add_retry_attempt(&mut self, reason: RetryReason) {
232 self.retry_attempts += 1;
233 tracing::Span::current().record(SPAN_ATTRIB_RETRIES, self.retry_attempts);
234 self.retry_reasons.insert(reason);
235 }
236
237 pub fn is_idempotent(&self) -> bool {
238 self.is_idempotent
239 }
240
241 pub fn retry_attempts(&self) -> u32 {
242 self.retry_attempts
243 }
244
245 pub fn retry_reasons(&self) -> &HashSet<RetryReason> {
246 &self.retry_reasons
247 }
248}
249
250impl Display for RetryRequest {
251 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
252 write!(
253 f,
254 "{{ operation: {}, id: {}, is_idempotent: {}, retry_attempts: {}, retry_reasons: {} }}",
255 self.operation,
256 self.unique_id.as_ref().unwrap_or(&"".to_string()),
257 self.is_idempotent,
258 self.retry_attempts,
259 self.retry_reasons
260 .iter()
261 .map(|r| r.to_string())
262 .collect::<Vec<_>>()
263 .join(", ")
264 )
265 }
266}
267
268pub struct RetryManager {
269 err_map_component: Arc<ErrMapComponent>,
270}
271
272impl RetryManager {
273 pub fn new(err_map_component: Arc<ErrMapComponent>) -> Self {
274 Self { err_map_component }
275 }
276
277 pub async fn maybe_retry(
278 &self,
279 strategy: Arc<dyn RetryStrategy>,
280 request: &mut RetryRequest,
281 reason: RetryReason,
282 ) -> Option<Duration> {
283 if reason.always_retry() {
284 request.add_retry_attempt(reason);
285 let backoff = controlled_backoff(request.retry_attempts);
286
287 return Some(backoff);
288 }
289
290 let action = strategy.retry_after(request, &reason);
291
292 if let Some(a) = action {
293 request.add_retry_attempt(reason);
294
295 return Some(a.duration);
296 }
297
298 None
299 }
300}
301
302pub(crate) async fn orchestrate_retries<Fut, Resp>(
303 rs: Arc<RetryManager>,
304 strategy: Arc<dyn RetryStrategy>,
305 mut retry_info: RetryRequest,
306 operation: impl Fn() -> Fut + Send + Sync,
307) -> error::Result<Resp>
308where
309 Fut: Future<Output = error::Result<Resp>> + Send,
310 Resp: Send,
311{
312 loop {
313 let mut err = match operation().await {
314 Ok(r) => {
315 return Ok(r);
316 }
317 Err(e) => e,
318 };
319
320 if let Some(reason) = error_to_retry_reason(&rs, &mut retry_info, &err) {
321 if let Some(duration) = rs
322 .maybe_retry(strategy.clone(), &mut retry_info, reason)
323 .await
324 {
325 debug!(
326 "Retrying {} after {:?} due to {}",
327 &retry_info, duration, reason
328 );
329 sleep(duration).await;
330 continue;
331 }
332 }
333
334 if retry_info.retry_attempts > 0 {
335 err.set_retry_info(retry_info);
337 }
338
339 return Err(err);
340 }
341}
342
343pub(crate) fn error_to_retry_reason(
344 rs: &Arc<RetryManager>,
345 retry_info: &mut RetryRequest,
346 err: &Error,
347) -> Option<RetryReason> {
348 match err.kind() {
349 ErrorKind::Memdx(err) => {
350 retry_info.unique_id = err.has_opaque().map(|o| o.to_string());
351
352 match err.kind() {
353 Server(e) => return server_error_to_retry_reason(rs, e),
354 Resource(e) => return server_error_to_retry_reason(rs, e.cause()),
355 Cancelled(e) => {
356 if e == &CancellationErrorKind::ClosedInFlight {
357 return Some(RetryReason::SocketClosedWhileInFlight);
358 }
359 }
360 Dispatch { .. } => return Some(RetryReason::SocketNotAvailable),
361 _ => {}
362 }
363 }
364 ErrorKind::NoVbucketMap => {
365 return Some(RetryReason::KvInvalidVbucketMap);
366 }
367 ErrorKind::ServiceNotAvailable { .. } => {
368 return Some(RetryReason::ServiceNotAvailable);
369 }
370 ErrorKind::Query(e) => match e.kind() {
371 queryx::error::ErrorKind::Server(e) => match e.kind() {
372 queryx::error::ServerErrorKind::PreparedStatementFailure => {
373 return Some(RetryReason::QueryPreparedStatementFailure);
374 }
375 queryx::error::ServerErrorKind::IndexNotFound => {
376 return Some(RetryReason::QueryIndexNotFound);
377 }
378 _ => {}
379 },
380 queryx::error::ErrorKind::Http { error, .. } => match error.kind() {
381 httpx::error::ErrorKind::SendRequest(_) => {
382 return Some(RetryReason::HttpSendRequestFailed);
383 }
384 httpx::error::ErrorKind::Connect { .. } => {
385 return Some(RetryReason::HttpConnectFailed);
386 }
387 _ => {}
388 },
389 _ => {}
390 },
391 ErrorKind::Search(e) => match e.kind() {
392 searchx::error::ErrorKind::Server(e) => {
393 if e.status_code() == 429 {
394 return Some(RetryReason::SearchTooManyRequests);
395 }
396 }
397 searchx::error::ErrorKind::Http { error, .. } => match error.kind() {
398 httpx::error::ErrorKind::SendRequest(_) => {
399 return Some(RetryReason::HttpSendRequestFailed);
400 }
401 httpx::error::ErrorKind::Connect { .. } => {
402 return Some(RetryReason::HttpConnectFailed);
403 }
404 _ => {}
405 },
406 _ => {}
407 },
408 ErrorKind::Analytics(e) => {
409 if let analyticsx::error::ErrorKind::Http { error, .. } = e.kind() {
410 match error.kind() {
411 httpx::error::ErrorKind::SendRequest(_) => {
412 return Some(RetryReason::HttpSendRequestFailed);
413 }
414 httpx::error::ErrorKind::Connect { .. } => {
415 return Some(RetryReason::HttpConnectFailed);
416 }
417 _ => {}
418 }
419 }
420 }
421 ErrorKind::Mgmt(e) => {
422 if let mgmtx::error::ErrorKind::Http(error) = e.kind() {
423 match error.kind() {
424 httpx::error::ErrorKind::SendRequest(_) => {
425 return Some(RetryReason::HttpSendRequestFailed);
426 }
427 httpx::error::ErrorKind::Connect { .. } => {
428 return Some(RetryReason::HttpConnectFailed);
429 }
430 _ => {}
431 }
432 }
433 }
434 _ => {}
435 }
436
437 None
438}
439
440fn server_error_to_retry_reason(rs: &Arc<RetryManager>, e: &ServerError) -> Option<RetryReason> {
441 match e.kind() {
442 ServerErrorKind::NotMyVbucket => {
443 return Some(RetryReason::KvNotMyVbucket);
444 }
445 ServerErrorKind::TmpFail => {
446 return Some(RetryReason::KvTemporaryFailure);
447 }
448 ServerErrorKind::UnknownCollectionID => {
449 return Some(RetryReason::KvCollectionOutdated);
450 }
451 ServerErrorKind::UnknownCollectionName => {
452 return Some(RetryReason::KvCollectionOutdated);
453 }
454 ServerErrorKind::UnknownScopeName => {
455 return Some(RetryReason::KvCollectionOutdated);
456 }
457 ServerErrorKind::Locked => {
458 return Some(RetryReason::KvLocked);
459 }
460 ServerErrorKind::SyncWriteInProgress => {
461 return Some(RetryReason::KvSyncWriteInProgress);
462 }
463 ServerErrorKind::SyncWriteRecommitInProgress => {
464 return Some(RetryReason::KvSyncWriteRecommitInProgress);
465 }
466 ServerErrorKind::UnknownStatus { status } => {
467 if rs.err_map_component.should_retry(status) {
468 return Some(RetryReason::KvErrorMapRetryIndicated);
469 }
470 }
471 _ => {}
472 }
473
474 None
475}
476
/// Fixed backoff schedule used for reasons that are always retried.
///
/// Attempts 0 through 4 map to 1, 10, 50, 100 and 500 ms; any later attempt waits 1000 ms.
pub(crate) fn controlled_backoff(retry_attempts: u32) -> Duration {
    const SCHEDULE_MS: [u64; 5] = [1, 10, 50, 100, 500];
    const CEILING_MS: u64 = 1000;

    let millis = usize::try_from(retry_attempts)
        .ok()
        .and_then(|idx| SCHEDULE_MS.get(idx))
        .copied()
        .unwrap_or(CEILING_MS);

    Duration::from_millis(millis)
}