1use std::collections::HashSet;
20use std::fmt::{Debug, Display};
21use std::future::Future;
22use std::sync::Arc;
23use std::time::Duration;
24
25use crate::errmapcomponent::ErrMapComponent;
26use crate::error::{Error, ErrorKind};
27use crate::memdx::error::ErrorKind::{Cancelled, Dispatch, Resource, Server};
28use crate::memdx::error::{CancellationErrorKind, ServerError, ServerErrorKind};
29use crate::retryfailfast::FailFastRetryStrategy;
30use crate::tracingcomponent::SPAN_ATTRIB_RETRIES;
31use crate::{analyticsx, error, httpx, mgmtx, queryx, searchx};
32use async_trait::async_trait;
33use tokio::time::sleep;
34use tracing::{debug, info};
35
lazy_static! {
    /// Crate-wide default retry strategy: a `FailFastRetryStrategy` built from its `Default`
    /// impl, shared behind an `Arc<dyn RetryStrategy>`.
    ///
    /// Note that reasons for which `RetryReason::always_retry()` is true are retried by
    /// `RetryManager::maybe_retry` without ever consulting the strategy, so this default only
    /// governs the remaining reasons.
    pub(crate) static ref DEFAULT_RETRY_STRATEGY: Arc<dyn RetryStrategy> =
        Arc::new(FailFastRetryStrategy::default());
}
40
/// Why a failed request is (or may be) retried.
///
/// Most variants are produced by `error_to_retry_reason` from a specific failure, as noted
/// on each variant. The distinct reasons a request was retried for are collected in
/// `RetryRequest::retry_reasons`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum RetryReason {
    /// The KV server answered `NotMyVbucket`.
    KvNotMyVbucket,
    /// No vbucket map was available (`ErrorKind::NoVbucketMap`).
    KvInvalidVbucketMap,
    /// The KV server answered `TmpFail`.
    KvTemporaryFailure,
    /// The KV server reported an unknown collection ID, collection name or scope name.
    KvCollectionOutdated,
    /// The KV server returned a status that the error map marks as retriable.
    KvErrorMapRetryIndicated,
    /// The KV server answered `Locked`.
    KvLocked,
    /// The KV server answered `SyncWriteInProgress`.
    KvSyncWriteInProgress,
    /// The KV server answered `SyncWriteRecommitInProgress`.
    KvSyncWriteRecommitInProgress,
    /// The required service was not available (`ErrorKind::ServiceNotAvailable`).
    ServiceNotAvailable,
    /// A KV request was cancelled with `CancellationErrorKind::ClosedInFlight`.
    SocketClosedWhileInFlight,
    /// A KV request failed to dispatch (memdx `Dispatch` error).
    SocketNotAvailable,
    /// A query failed with a prepared-statement failure.
    QueryPreparedStatementFailure,
    /// A query failed because an index was not found.
    QueryIndexNotFound,
    /// Any other query server error that the server flagged as retriable.
    QueryErrorRetryable,
    /// The search service responded with HTTP status 429.
    SearchTooManyRequests,
    /// An HTTP request could not be sent (`httpx` `SendRequest` error).
    HttpSendRequestFailed,
    /// An HTTP connection could not be established (`httpx` `Connect` error).
    HttpConnectFailed,
    /// Not produced by the error mapping in this module; presumably raised by callers when a
    /// component is not ready yet. TODO confirm.
    NotReady,
}

impl RetryReason {
    /// Whether a request that is not idempotent may still be retried for this reason.
    ///
    /// Every reason qualifies except `SocketClosedWhileInFlight`. Presumably this is because
    /// such a request may already have reached the server.
    pub fn allows_non_idempotent_retry(&self) -> bool {
        // Exhaustive on purpose: adding a variant forces a decision here.
        match self {
            Self::SocketClosedWhileInFlight => false,
            Self::KvNotMyVbucket
            | Self::KvInvalidVbucketMap
            | Self::KvTemporaryFailure
            | Self::KvCollectionOutdated
            | Self::KvErrorMapRetryIndicated
            | Self::KvLocked
            | Self::KvSyncWriteInProgress
            | Self::KvSyncWriteRecommitInProgress
            | Self::ServiceNotAvailable
            | Self::SocketNotAvailable
            | Self::QueryPreparedStatementFailure
            | Self::QueryIndexNotFound
            | Self::QueryErrorRetryable
            | Self::SearchTooManyRequests
            | Self::HttpSendRequestFailed
            | Self::HttpConnectFailed
            | Self::NotReady => true,
        }
    }

    /// Whether this reason is retried unconditionally, bypassing the retry strategy.
    pub fn always_retry(&self) -> bool {
        matches!(
            self,
            Self::KvNotMyVbucket | Self::KvInvalidVbucketMap | Self::KvCollectionOutdated
        )
    }
}

impl Display for RetryReason {
    /// Writes the reason's stable SCREAMING_SNAKE_CASE name.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = match self {
            Self::KvNotMyVbucket => "KV_NOT_MY_VBUCKET",
            Self::KvInvalidVbucketMap => "KV_INVALID_VBUCKET_MAP",
            Self::KvTemporaryFailure => "KV_TEMPORARY_FAILURE",
            Self::KvCollectionOutdated => "KV_COLLECTION_OUTDATED",
            Self::KvErrorMapRetryIndicated => "KV_ERROR_MAP_RETRY_INDICATED",
            Self::KvLocked => "KV_LOCKED",
            Self::ServiceNotAvailable => "SERVICE_NOT_AVAILABLE",
            Self::SocketClosedWhileInFlight => "SOCKET_CLOSED_WHILE_IN_FLIGHT",
            Self::SocketNotAvailable => "SOCKET_NOT_AVAILABLE",
            Self::KvSyncWriteInProgress => "KV_SYNC_WRITE_IN_PROGRESS",
            Self::KvSyncWriteRecommitInProgress => "KV_SYNC_WRITE_RECOMMIT_IN_PROGRESS",
            Self::QueryPreparedStatementFailure => "QUERY_PREPARED_STATEMENT_FAILURE",
            Self::QueryIndexNotFound => "QUERY_INDEX_NOT_FOUND",
            Self::QueryErrorRetryable => "QUERY_ERROR_RETRYABLE",
            Self::SearchTooManyRequests => "SEARCH_TOO_MANY_REQUESTS",
            Self::NotReady => "NOT_READY",
            Self::HttpSendRequestFailed => "HTTP_SEND_REQUEST_FAILED",
            Self::HttpConnectFailed => "HTTP_CONNECT_FAILED",
        };
        f.write_str(name)
    }
}
155
/// A retry strategy's decision to retry a request after waiting `duration`.
#[derive(Clone, Debug)]
pub struct RetryAction {
    /// How long to wait before the next attempt.
    pub duration: Duration,
}

impl RetryAction {
    /// Builds an action that retries after `duration`.
    pub fn new(duration: Duration) -> RetryAction {
        RetryAction { duration }
    }
}
171
/// Policy deciding whether, and after how long, a failed request is retried.
///
/// `RetryManager::maybe_retry` consults it only for reasons whose
/// `RetryReason::always_retry()` is false. Reasons that always retry bypass the strategy.
/// Implementors must be `Debug + Send + Sync`; in this module they are held as
/// `Arc<dyn RetryStrategy>`.
pub trait RetryStrategy: Debug + Send + Sync {
    /// Decides what to do with `request`, which just failed for `reason`.
    ///
    /// Returning `Some(action)` schedules a retry after `action.duration`. Returning `None`
    /// gives up, and `orchestrate_retries` then returns the error to its caller. When this is
    /// called, `request` does not yet count the current failure; the attempt is recorded only
    /// after a retry has been granted.
    fn retry_after(&self, request: &RetryRequest, reason: &RetryReason) -> Option<RetryAction>;
}
210
211#[derive(Clone, Debug)]
213pub struct RetryRequest {
214 pub(crate) operation: &'static str,
215 pub is_idempotent: bool,
217 pub retry_attempts: u32,
219 pub retry_reasons: HashSet<RetryReason>,
221 pub(crate) unique_id: Option<String>,
222}
223
224impl RetryRequest {
225 pub(crate) fn new(operation: &'static str, is_idempotent: bool) -> Self {
226 Self {
227 operation,
228 is_idempotent,
229 retry_attempts: 0,
230 retry_reasons: Default::default(),
231 unique_id: None,
232 }
233 }
234
235 pub(crate) fn add_retry_attempt(&mut self, reason: RetryReason) {
236 self.retry_attempts += 1;
237 tracing::Span::current().record(SPAN_ATTRIB_RETRIES, self.retry_attempts);
238 self.retry_reasons.insert(reason);
239 }
240
241 pub fn is_idempotent(&self) -> bool {
242 self.is_idempotent
243 }
244
245 pub fn retry_attempts(&self) -> u32 {
246 self.retry_attempts
247 }
248
249 pub fn retry_reasons(&self) -> &HashSet<RetryReason> {
250 &self.retry_reasons
251 }
252}
253
254impl Display for RetryRequest {
255 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
256 write!(
257 f,
258 "{{ operation: {}, id: {}, is_idempotent: {}, retry_attempts: {}, retry_reasons: {} }}",
259 self.operation,
260 self.unique_id.as_ref().unwrap_or(&"".to_string()),
261 self.is_idempotent,
262 self.retry_attempts,
263 self.retry_reasons
264 .iter()
265 .map(|r| r.to_string())
266 .collect::<Vec<_>>()
267 .join(", ")
268 )
269 }
270}
271
272pub struct RetryManager {
273 err_map_component: Arc<ErrMapComponent>,
274}
275
276impl RetryManager {
277 pub fn new(err_map_component: Arc<ErrMapComponent>) -> Self {
278 Self { err_map_component }
279 }
280
281 pub async fn maybe_retry(
282 &self,
283 strategy: Arc<dyn RetryStrategy>,
284 request: &mut RetryRequest,
285 reason: RetryReason,
286 ) -> Option<Duration> {
287 if reason.always_retry() {
288 request.add_retry_attempt(reason);
289 let backoff = controlled_backoff(request.retry_attempts);
290
291 return Some(backoff);
292 }
293
294 let action = strategy.retry_after(request, &reason);
295
296 if let Some(a) = action {
297 request.add_retry_attempt(reason);
298
299 return Some(a.duration);
300 }
301
302 None
303 }
304}
305
306pub(crate) async fn orchestrate_retries<Fut, Resp>(
307 rs: Arc<RetryManager>,
308 strategy: Arc<dyn RetryStrategy>,
309 mut retry_info: RetryRequest,
310 operation: impl Fn() -> Fut + Send + Sync,
311) -> error::Result<Resp>
312where
313 Fut: Future<Output = error::Result<Resp>> + Send,
314 Resp: Send,
315{
316 loop {
317 let mut err = match operation().await {
318 Ok(r) => {
319 return Ok(r);
320 }
321 Err(e) => e,
322 };
323
324 if let Some(reason) = error_to_retry_reason(&rs, &mut retry_info, &err) {
325 if let Some(duration) = rs
326 .maybe_retry(strategy.clone(), &mut retry_info, reason)
327 .await
328 {
329 debug!(
330 "Retrying {} after {:?} due to {}",
331 &retry_info, duration, reason
332 );
333 sleep(duration).await;
334 continue;
335 }
336 }
337
338 if retry_info.retry_attempts > 0 {
339 err.set_retry_info(retry_info);
341 }
342
343 return Err(err);
344 }
345}
346
347pub(crate) fn error_to_retry_reason(
348 rs: &Arc<RetryManager>,
349 retry_info: &mut RetryRequest,
350 err: &Error,
351) -> Option<RetryReason> {
352 match err.kind() {
353 ErrorKind::Memdx(err) => {
354 retry_info.unique_id = err.has_opaque().map(|o| o.to_string());
355
356 match err.kind() {
357 Server(e) => return server_error_to_retry_reason(rs, e),
358 Resource(e) => return server_error_to_retry_reason(rs, e.cause()),
359 Cancelled(e) => {
360 if e == &CancellationErrorKind::ClosedInFlight {
361 return Some(RetryReason::SocketClosedWhileInFlight);
362 }
363 }
364 Dispatch { .. } => return Some(RetryReason::SocketNotAvailable),
365 _ => {}
366 }
367 }
368 ErrorKind::NoVbucketMap => {
369 return Some(RetryReason::KvInvalidVbucketMap);
370 }
371 ErrorKind::ServiceNotAvailable { .. } => {
372 return Some(RetryReason::ServiceNotAvailable);
373 }
374 ErrorKind::Query(e) => match e.kind() {
375 queryx::error::ErrorKind::Server(e) => match e.kind() {
376 queryx::error::ServerErrorKind::PreparedStatementFailure => {
377 return Some(RetryReason::QueryPreparedStatementFailure);
378 }
379 queryx::error::ServerErrorKind::IndexNotFound => {
380 return Some(RetryReason::QueryIndexNotFound);
381 }
382 _ => {
383 if e.retry() {
384 return Some(RetryReason::QueryErrorRetryable);
385 }
386 }
387 },
388 queryx::error::ErrorKind::Http { error, .. } => match error.kind() {
389 httpx::error::ErrorKind::SendRequest(_) => {
390 return Some(RetryReason::HttpSendRequestFailed);
391 }
392 httpx::error::ErrorKind::Connect { .. } => {
393 return Some(RetryReason::HttpConnectFailed);
394 }
395 _ => {}
396 },
397 _ => {}
398 },
399 ErrorKind::Search(e) => match e.kind() {
400 searchx::error::ErrorKind::Server(e) => {
401 if e.status_code() == 429 {
402 return Some(RetryReason::SearchTooManyRequests);
403 }
404 }
405 searchx::error::ErrorKind::Http { error, .. } => match error.kind() {
406 httpx::error::ErrorKind::SendRequest(_) => {
407 return Some(RetryReason::HttpSendRequestFailed);
408 }
409 httpx::error::ErrorKind::Connect { .. } => {
410 return Some(RetryReason::HttpConnectFailed);
411 }
412 _ => {}
413 },
414 _ => {}
415 },
416 ErrorKind::Analytics(e) => {
417 if let analyticsx::error::ErrorKind::Http { error, .. } = e.kind() {
418 match error.kind() {
419 httpx::error::ErrorKind::SendRequest(_) => {
420 return Some(RetryReason::HttpSendRequestFailed);
421 }
422 httpx::error::ErrorKind::Connect { .. } => {
423 return Some(RetryReason::HttpConnectFailed);
424 }
425 _ => {}
426 }
427 }
428 }
429 ErrorKind::Mgmt(e) => {
430 if let mgmtx::error::ErrorKind::Http(error) = e.kind() {
431 match error.kind() {
432 httpx::error::ErrorKind::SendRequest(_) => {
433 return Some(RetryReason::HttpSendRequestFailed);
434 }
435 httpx::error::ErrorKind::Connect { .. } => {
436 return Some(RetryReason::HttpConnectFailed);
437 }
438 _ => {}
439 }
440 }
441 }
442 _ => {}
443 }
444
445 None
446}
447
448fn server_error_to_retry_reason(rs: &Arc<RetryManager>, e: &ServerError) -> Option<RetryReason> {
449 match e.kind() {
450 ServerErrorKind::NotMyVbucket => {
451 return Some(RetryReason::KvNotMyVbucket);
452 }
453 ServerErrorKind::TmpFail => {
454 return Some(RetryReason::KvTemporaryFailure);
455 }
456 ServerErrorKind::UnknownCollectionID => {
457 return Some(RetryReason::KvCollectionOutdated);
458 }
459 ServerErrorKind::UnknownCollectionName => {
460 return Some(RetryReason::KvCollectionOutdated);
461 }
462 ServerErrorKind::UnknownScopeName => {
463 return Some(RetryReason::KvCollectionOutdated);
464 }
465 ServerErrorKind::Locked => {
466 return Some(RetryReason::KvLocked);
467 }
468 ServerErrorKind::SyncWriteInProgress => {
469 return Some(RetryReason::KvSyncWriteInProgress);
470 }
471 ServerErrorKind::SyncWriteRecommitInProgress => {
472 return Some(RetryReason::KvSyncWriteRecommitInProgress);
473 }
474 ServerErrorKind::UnknownStatus { status } => {
475 if rs.err_map_component.should_retry(status) {
476 return Some(RetryReason::KvErrorMapRetryIndicated);
477 }
478 }
479 _ => {}
480 }
481
482 None
483}
484
/// Fixed backoff schedule keyed by retry attempt number.
///
/// Attempts 0 through 4 wait 1, 10, 50, 100 and 500 ms respectively. Every later attempt is
/// capped at 1000 ms. `RetryManager::maybe_retry` uses it for reasons that always retry.
pub(crate) fn controlled_backoff(retry_attempts: u32) -> Duration {
    const SCHEDULE_MS: [u64; 5] = [1, 10, 50, 100, 500];
    const CAP_MS: u64 = 1000;

    let millis = usize::try_from(retry_attempts)
        .ok()
        .and_then(|idx| SCHEDULE_MS.get(idx).copied())
        .unwrap_or(CAP_MS);
    Duration::from_millis(millis)
}
495
#[cfg(test)]
mod tests {
    use super::*;
    use crate::queryx;
    use http::StatusCode;

    fn make_retry_manager() -> Arc<RetryManager> {
        Arc::new(RetryManager::new(Arc::new(ErrMapComponent::default())))
    }

    /// Builds an SDK error wrapping a query server error of `kind` with the given retry flag.
    fn make_query_server_error(kind: queryx::error::ServerErrorKind, retry: bool) -> Error {
        queryx::error::Error::new_server_error(queryx::error::ServerError::new(
            kind,
            "localhost:8093",
            StatusCode::INTERNAL_SERVER_ERROR,
            12345,
            retry,
            "test error",
        ))
        .into()
    }

    /// Maps a freshly built query server error through `error_to_retry_reason`.
    fn query_reason(kind: queryx::error::ServerErrorKind, retry: bool) -> Option<RetryReason> {
        let rs = make_retry_manager();
        let mut retry_info = RetryRequest::new("query", false);
        let err = make_query_server_error(kind, retry);
        error_to_retry_reason(&rs, &mut retry_info, &err)
    }

    #[test]
    fn test_query_error_retryable_when_retry_true() {
        let reason = query_reason(queryx::error::ServerErrorKind::Unknown, true);
        assert_eq!(reason, Some(RetryReason::QueryErrorRetryable));
    }

    #[test]
    fn test_query_error_not_retryable_when_retry_false() {
        let reason = query_reason(queryx::error::ServerErrorKind::Unknown, false);
        assert_eq!(reason, None);
    }

    #[test]
    fn test_query_prepared_statement_failure_ignores_retry_flag() {
        let reason = query_reason(
            queryx::error::ServerErrorKind::PreparedStatementFailure,
            false,
        );
        assert_eq!(reason, Some(RetryReason::QueryPreparedStatementFailure));
    }

    #[test]
    fn test_query_index_not_found_ignores_retry_flag() {
        let reason = query_reason(queryx::error::ServerErrorKind::IndexNotFound, false);
        assert_eq!(reason, Some(RetryReason::QueryIndexNotFound));
    }

    #[test]
    fn test_query_error_retryable_allows_non_idempotent_retry() {
        let reason = RetryReason::QueryErrorRetryable;
        assert!(reason.allows_non_idempotent_retry());
    }

    #[test]
    fn test_query_error_retryable_does_not_always_retry() {
        let reason = RetryReason::QueryErrorRetryable;
        assert!(!reason.always_retry());
    }
}