Skip to main content

couchbase_core/
retry.rs

1/*
2 *
3 *  * Copyright (c) 2025 Couchbase, Inc.
4 *  *
5 *  * Licensed under the Apache License, Version 2.0 (the "License");
6 *  * you may not use this file except in compliance with the License.
7 *  * You may obtain a copy of the License at
8 *  *
9 *  *    http://www.apache.org/licenses/LICENSE-2.0
10 *  *
11 *  * Unless required by applicable law or agreed to in writing, software
12 *  * distributed under the License is distributed on an "AS IS" BASIS,
13 *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *  * See the License for the specific language governing permissions and
15 *  * limitations under the License.
16 *
17 */
18
19use std::collections::HashSet;
20use std::fmt::{Debug, Display};
21use std::future::Future;
22use std::sync::Arc;
23use std::time::Duration;
24
25use crate::errmapcomponent::ErrMapComponent;
26use crate::error::{Error, ErrorKind};
27use crate::memdx::error::ErrorKind::{Cancelled, Dispatch, Resource, Server};
28use crate::memdx::error::{CancellationErrorKind, ServerError, ServerErrorKind};
29use crate::retryfailfast::FailFastRetryStrategy;
30use crate::tracingcomponent::SPAN_ATTRIB_RETRIES;
31use crate::{analyticsx, error, httpx, mgmtx, queryx, searchx};
32use async_trait::async_trait;
33use tokio::time::sleep;
34use tracing::{debug, info};
35
36lazy_static! {
37    pub(crate) static ref DEFAULT_RETRY_STRATEGY: Arc<dyn RetryStrategy> =
38        Arc::new(FailFastRetryStrategy::default());
39}
40
/// Why an operation is being retried.
///
/// Every variant names one transient failure condition. The SDK hands the reason to
/// [`RetryStrategy::retry_after`] so the strategy can decide whether, and after how long,
/// the operation should be attempted again. The [`Display`] form (e.g. `KV_NOT_MY_VBUCKET`)
/// is the name used in retry log messages.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum RetryReason {
    /// The server reported that this node does not own the vBucket.
    KvNotMyVbucket,
    /// No valid vBucket map is available; it has to be refreshed.
    KvInvalidVbucketMap,
    /// The KV engine reported a temporary failure.
    KvTemporaryFailure,
    /// The collection or scope is unknown to the server; the collection ID must be re-resolved.
    KvCollectionOutdated,
    /// The status code is unknown to the SDK, but the server error map marks it as retryable.
    KvErrorMapRetryIndicated,
    /// The document is locked by another operation.
    KvLocked,
    /// A sync-write (durable operation) is already in progress on this key.
    KvSyncWriteInProgress,
    /// A sync-write recommit is in progress on this key.
    KvSyncWriteRecommitInProgress,
    /// The required service is temporarily unavailable.
    ServiceNotAvailable,
    /// The connection was closed while the request was in flight.
    SocketClosedWhileInFlight,
    /// No connection is currently available to dispatch the request on.
    SocketNotAvailable,
    /// A prepared statement for the query was invalidated.
    QueryPreparedStatementFailure,
    /// The query index was not found (it may still be building).
    QueryIndexNotFound,
    /// The query engine flagged the error as retryable.
    QueryErrorRetryable,
    /// The search service is rejecting requests due to rate limiting (HTTP 429).
    SearchTooManyRequests,
    /// An HTTP request failed to send.
    HttpSendRequestFailed,
    /// An HTTP connection could not be established.
    HttpConnectFailed,
    /// The SDK is not yet ready to perform the operation.
    NotReady,
}

impl RetryReason {
    /// Returns `true` if a non-idempotent operation may be retried for this reason.
    ///
    /// Every reason qualifies except [`RetryReason::SocketClosedWhileInFlight`]. In that case
    /// the request had already been sent, so the server may have applied it and a retry could
    /// apply it twice. The match is deliberately exhaustive so that every new variant has to
    /// be classified explicitly.
    pub fn allows_non_idempotent_retry(&self) -> bool {
        match self {
            RetryReason::SocketClosedWhileInFlight => false,
            RetryReason::KvNotMyVbucket
            | RetryReason::KvInvalidVbucketMap
            | RetryReason::KvTemporaryFailure
            | RetryReason::KvCollectionOutdated
            | RetryReason::KvErrorMapRetryIndicated
            | RetryReason::KvLocked
            | RetryReason::KvSyncWriteInProgress
            | RetryReason::KvSyncWriteRecommitInProgress
            | RetryReason::ServiceNotAvailable
            | RetryReason::SocketNotAvailable
            | RetryReason::QueryPreparedStatementFailure
            | RetryReason::QueryIndexNotFound
            | RetryReason::QueryErrorRetryable
            | RetryReason::SearchTooManyRequests
            | RetryReason::HttpSendRequestFailed
            | RetryReason::HttpConnectFailed
            | RetryReason::NotReady => true,
        }
    }

    /// Returns `true` if the SDK retries for this reason without consulting the retry
    /// strategy: stale routing (vBucket) or collection information.
    pub fn always_retry(&self) -> bool {
        matches!(
            *self,
            RetryReason::KvNotMyVbucket
                | RetryReason::KvInvalidVbucketMap
                | RetryReason::KvCollectionOutdated
        )
    }
}

impl Display for RetryReason {
    /// Writes the upper-snake-case name of the reason.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = match self {
            RetryReason::KvNotMyVbucket => "KV_NOT_MY_VBUCKET",
            RetryReason::KvInvalidVbucketMap => "KV_INVALID_VBUCKET_MAP",
            RetryReason::KvTemporaryFailure => "KV_TEMPORARY_FAILURE",
            RetryReason::KvCollectionOutdated => "KV_COLLECTION_OUTDATED",
            RetryReason::KvErrorMapRetryIndicated => "KV_ERROR_MAP_RETRY_INDICATED",
            RetryReason::KvLocked => "KV_LOCKED",
            RetryReason::ServiceNotAvailable => "SERVICE_NOT_AVAILABLE",
            RetryReason::SocketClosedWhileInFlight => "SOCKET_CLOSED_WHILE_IN_FLIGHT",
            RetryReason::SocketNotAvailable => "SOCKET_NOT_AVAILABLE",
            RetryReason::KvSyncWriteInProgress => "KV_SYNC_WRITE_IN_PROGRESS",
            RetryReason::KvSyncWriteRecommitInProgress => "KV_SYNC_WRITE_RECOMMIT_IN_PROGRESS",
            RetryReason::QueryPreparedStatementFailure => "QUERY_PREPARED_STATEMENT_FAILURE",
            RetryReason::QueryIndexNotFound => "QUERY_INDEX_NOT_FOUND",
            RetryReason::QueryErrorRetryable => "QUERY_ERROR_RETRYABLE",
            RetryReason::SearchTooManyRequests => "SEARCH_TOO_MANY_REQUESTS",
            RetryReason::NotReady => "NOT_READY",
            RetryReason::HttpSendRequestFailed => "HTTP_SEND_REQUEST_FAILED",
            RetryReason::HttpConnectFailed => "HTTP_CONNECT_FAILED",
        };
        f.write_str(name)
    }
}
155
/// The decision a [`RetryStrategy`] returns when it wants an operation retried.
///
/// Wraps the [`Duration`] to wait before the next attempt. Like `Duration` itself this is a
/// small plain value, so it is `Copy` and supports equality and hashing. This lets strategies
/// and tests compare actions directly.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct RetryAction {
    /// How long to wait before retrying.
    pub duration: Duration,
}

impl RetryAction {
    /// Creates a new `RetryAction` that retries after waiting `duration`.
    pub fn new(duration: Duration) -> Self {
        Self { duration }
    }
}
171
/// A strategy that decides whether, and when, a failed operation is retried.
///
/// Implement this trait to provide custom retry behavior. Each time an operation fails for a
/// retryable [`RetryReason`], the SDK calls [`retry_after`](RetryStrategy::retry_after) with
/// the request metadata and that reason. Reasons for which [`RetryReason::always_retry`]
/// returns `true` are retried by the SDK without consulting the strategy at all.
///
/// Return `Some(RetryAction)` to retry after the specified duration,
/// or `None` to stop retrying and propagate the error.
///
/// A strategy should normally refuse to retry a non-idempotent request unless
/// [`RetryReason::allows_non_idempotent_retry`] says the reason permits it. Otherwise the
/// operation could be applied twice. The example below includes that check.
///
/// # Example
///
/// ```rust
/// use couchbase_core::retry::{RetryStrategy, RetryAction, RetryRequest, RetryReason};
/// use std::time::Duration;
///
/// #[derive(Debug)]
/// struct FixedDelayRetry(Duration);
///
/// impl RetryStrategy for FixedDelayRetry {
///     fn retry_after(&self, request: &RetryRequest, reason: &RetryReason) -> Option<RetryAction> {
///         // Never risk applying a non-idempotent operation twice.
///         if !request.is_idempotent() && !reason.allows_non_idempotent_retry() {
///             return None;
///         }
///
///         if request.retry_attempts() < 3 {
///             Some(RetryAction::new(self.0))
///         } else {
///             None // give up after 3 retries
///         }
///     }
/// }
/// ```
pub trait RetryStrategy: Debug + Send + Sync {
    /// Decides whether to retry an operation and how long to wait first.
    ///
    /// * `request` — Metadata about the in-flight request: whether it is idempotent, how many
    ///   retries have already been made (not counting the one being decided), and for which
    ///   reasons.
    /// * `reason` — Why the latest attempt failed.
    ///
    /// Return `Some(RetryAction)` to retry, or `None` to stop.
    fn retry_after(&self, request: &RetryRequest, reason: &RetryReason) -> Option<RetryAction>;
}
210
/// Metadata about a request that is being considered for retry.
///
/// Passed to [`RetryStrategy::retry_after`]. The SDK updates it every time a retry is granted.
#[derive(Clone, Debug)]
pub struct RetryRequest {
    /// Name of the operation, shown as `operation` when this request is formatted.
    pub(crate) operation: &'static str,
    /// Whether the operation is idempotent (safe to retry without side effects).
    pub is_idempotent: bool,
    /// The number of retry attempts that have already been made.
    pub retry_attempts: u32,
    /// The distinct reasons this request has been retried for so far. Each reason appears
    /// once, however many times it occurred.
    pub retry_reasons: HashSet<RetryReason>,
    /// Identifier of the most recent failed attempt, shown as `id` when this request is
    /// formatted. For KV operations this is the request opaque, when one is known.
    pub(crate) unique_id: Option<String>,
}
223
224impl RetryRequest {
225    pub(crate) fn new(operation: &'static str, is_idempotent: bool) -> Self {
226        Self {
227            operation,
228            is_idempotent,
229            retry_attempts: 0,
230            retry_reasons: Default::default(),
231            unique_id: None,
232        }
233    }
234
235    pub(crate) fn add_retry_attempt(&mut self, reason: RetryReason) {
236        self.retry_attempts += 1;
237        tracing::Span::current().record(SPAN_ATTRIB_RETRIES, self.retry_attempts);
238        self.retry_reasons.insert(reason);
239    }
240
241    pub fn is_idempotent(&self) -> bool {
242        self.is_idempotent
243    }
244
245    pub fn retry_attempts(&self) -> u32 {
246        self.retry_attempts
247    }
248
249    pub fn retry_reasons(&self) -> &HashSet<RetryReason> {
250        &self.retry_reasons
251    }
252}
253
254impl Display for RetryRequest {
255    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
256        write!(
257            f,
258            "{{ operation: {}, id: {}, is_idempotent: {}, retry_attempts: {}, retry_reasons: {} }}",
259            self.operation,
260            self.unique_id.as_ref().unwrap_or(&"".to_string()),
261            self.is_idempotent,
262            self.retry_attempts,
263            self.retry_reasons
264                .iter()
265                .map(|r| r.to_string())
266                .collect::<Vec<_>>()
267                .join(", ")
268        )
269    }
270}
271
272pub struct RetryManager {
273    err_map_component: Arc<ErrMapComponent>,
274}
275
276impl RetryManager {
277    pub fn new(err_map_component: Arc<ErrMapComponent>) -> Self {
278        Self { err_map_component }
279    }
280
281    pub async fn maybe_retry(
282        &self,
283        strategy: Arc<dyn RetryStrategy>,
284        request: &mut RetryRequest,
285        reason: RetryReason,
286    ) -> Option<Duration> {
287        if reason.always_retry() {
288            request.add_retry_attempt(reason);
289            let backoff = controlled_backoff(request.retry_attempts);
290
291            return Some(backoff);
292        }
293
294        let action = strategy.retry_after(request, &reason);
295
296        if let Some(a) = action {
297            request.add_retry_attempt(reason);
298
299            return Some(a.duration);
300        }
301
302        None
303    }
304}
305
306pub(crate) async fn orchestrate_retries<Fut, Resp>(
307    rs: Arc<RetryManager>,
308    strategy: Arc<dyn RetryStrategy>,
309    mut retry_info: RetryRequest,
310    operation: impl Fn() -> Fut + Send + Sync,
311) -> error::Result<Resp>
312where
313    Fut: Future<Output = error::Result<Resp>> + Send,
314    Resp: Send,
315{
316    loop {
317        let mut err = match operation().await {
318            Ok(r) => {
319                return Ok(r);
320            }
321            Err(e) => e,
322        };
323
324        if let Some(reason) = error_to_retry_reason(&rs, &mut retry_info, &err) {
325            if let Some(duration) = rs
326                .maybe_retry(strategy.clone(), &mut retry_info, reason)
327                .await
328            {
329                debug!(
330                    "Retrying {} after {:?} due to {}",
331                    &retry_info, duration, reason
332                );
333                sleep(duration).await;
334                continue;
335            }
336        }
337
338        if retry_info.retry_attempts > 0 {
339            // If we aren't retrying then attach any retry info that we have.
340            err.set_retry_info(retry_info);
341        }
342
343        return Err(err);
344    }
345}
346
347pub(crate) fn error_to_retry_reason(
348    rs: &Arc<RetryManager>,
349    retry_info: &mut RetryRequest,
350    err: &Error,
351) -> Option<RetryReason> {
352    match err.kind() {
353        ErrorKind::Memdx(err) => {
354            retry_info.unique_id = err.has_opaque().map(|o| o.to_string());
355
356            match err.kind() {
357                Server(e) => return server_error_to_retry_reason(rs, e),
358                Resource(e) => return server_error_to_retry_reason(rs, e.cause()),
359                Cancelled(e) => {
360                    if e == &CancellationErrorKind::ClosedInFlight {
361                        return Some(RetryReason::SocketClosedWhileInFlight);
362                    }
363                }
364                Dispatch { .. } => return Some(RetryReason::SocketNotAvailable),
365                _ => {}
366            }
367        }
368        ErrorKind::NoVbucketMap => {
369            return Some(RetryReason::KvInvalidVbucketMap);
370        }
371        ErrorKind::ServiceNotAvailable { .. } => {
372            return Some(RetryReason::ServiceNotAvailable);
373        }
374        ErrorKind::Query(e) => match e.kind() {
375            queryx::error::ErrorKind::Server(e) => match e.kind() {
376                queryx::error::ServerErrorKind::PreparedStatementFailure => {
377                    return Some(RetryReason::QueryPreparedStatementFailure);
378                }
379                queryx::error::ServerErrorKind::IndexNotFound => {
380                    return Some(RetryReason::QueryIndexNotFound);
381                }
382                _ => {
383                    if e.retry() {
384                        return Some(RetryReason::QueryErrorRetryable);
385                    }
386                }
387            },
388            queryx::error::ErrorKind::Http { error, .. } => match error.kind() {
389                httpx::error::ErrorKind::SendRequest(_) => {
390                    return Some(RetryReason::HttpSendRequestFailed);
391                }
392                httpx::error::ErrorKind::Connect { .. } => {
393                    return Some(RetryReason::HttpConnectFailed);
394                }
395                _ => {}
396            },
397            _ => {}
398        },
399        ErrorKind::Search(e) => match e.kind() {
400            searchx::error::ErrorKind::Server(e) => {
401                if e.status_code() == 429 {
402                    return Some(RetryReason::SearchTooManyRequests);
403                }
404            }
405            searchx::error::ErrorKind::Http { error, .. } => match error.kind() {
406                httpx::error::ErrorKind::SendRequest(_) => {
407                    return Some(RetryReason::HttpSendRequestFailed);
408                }
409                httpx::error::ErrorKind::Connect { .. } => {
410                    return Some(RetryReason::HttpConnectFailed);
411                }
412                _ => {}
413            },
414            _ => {}
415        },
416        ErrorKind::Analytics(e) => {
417            if let analyticsx::error::ErrorKind::Http { error, .. } = e.kind() {
418                match error.kind() {
419                    httpx::error::ErrorKind::SendRequest(_) => {
420                        return Some(RetryReason::HttpSendRequestFailed);
421                    }
422                    httpx::error::ErrorKind::Connect { .. } => {
423                        return Some(RetryReason::HttpConnectFailed);
424                    }
425                    _ => {}
426                }
427            }
428        }
429        ErrorKind::Mgmt(e) => {
430            if let mgmtx::error::ErrorKind::Http(error) = e.kind() {
431                match error.kind() {
432                    httpx::error::ErrorKind::SendRequest(_) => {
433                        return Some(RetryReason::HttpSendRequestFailed);
434                    }
435                    httpx::error::ErrorKind::Connect { .. } => {
436                        return Some(RetryReason::HttpConnectFailed);
437                    }
438                    _ => {}
439                }
440            }
441        }
442        _ => {}
443    }
444
445    None
446}
447
448fn server_error_to_retry_reason(rs: &Arc<RetryManager>, e: &ServerError) -> Option<RetryReason> {
449    match e.kind() {
450        ServerErrorKind::NotMyVbucket => {
451            return Some(RetryReason::KvNotMyVbucket);
452        }
453        ServerErrorKind::TmpFail => {
454            return Some(RetryReason::KvTemporaryFailure);
455        }
456        ServerErrorKind::UnknownCollectionID => {
457            return Some(RetryReason::KvCollectionOutdated);
458        }
459        ServerErrorKind::UnknownCollectionName => {
460            return Some(RetryReason::KvCollectionOutdated);
461        }
462        ServerErrorKind::UnknownScopeName => {
463            return Some(RetryReason::KvCollectionOutdated);
464        }
465        ServerErrorKind::Locked => {
466            return Some(RetryReason::KvLocked);
467        }
468        ServerErrorKind::SyncWriteInProgress => {
469            return Some(RetryReason::KvSyncWriteInProgress);
470        }
471        ServerErrorKind::SyncWriteRecommitInProgress => {
472            return Some(RetryReason::KvSyncWriteRecommitInProgress);
473        }
474        ServerErrorKind::UnknownStatus { status } => {
475            if rs.err_map_component.should_retry(status) {
476                return Some(RetryReason::KvErrorMapRetryIndicated);
477            }
478        }
479        _ => {}
480    }
481
482    None
483}
484
/// Fixed backoff schedule, indexed by the number of retry attempts.
///
/// The delay is 1 ms, 10 ms, 50 ms, 100 ms and 500 ms for attempts 0 through 4. From
/// attempt 5 onwards it is capped at 1 s.
pub(crate) fn controlled_backoff(retry_attempts: u32) -> Duration {
    // Delays in milliseconds; the position in the table is the attempt count.
    const STEPS_MS: [u64; 5] = [1, 10, 50, 100, 500];
    const CAP_MS: u64 = 1000;

    // u32 -> usize is lossless on every target with `std` (usize is at least 32 bits there).
    let millis = STEPS_MS
        .get(retry_attempts as usize)
        .copied()
        .unwrap_or(CAP_MS);

    Duration::from_millis(millis)
}
495
#[cfg(test)]
mod tests {
    use super::*;
    use crate::queryx;
    use http::StatusCode;

    /// Builds a retry manager backed by a default (empty) error map.
    fn make_retry_manager() -> Arc<RetryManager> {
        Arc::new(RetryManager::new(Arc::new(ErrMapComponent::default())))
    }

    /// Builds a query server error of `kind` whose server-provided retry flag is `retry`.
    fn make_query_server_error(kind: queryx::error::ServerErrorKind, retry: bool) -> Error {
        let server_error = queryx::error::ServerError::new(
            kind,
            "localhost:8093",
            StatusCode::INTERNAL_SERVER_ERROR,
            12345,
            retry,
            "test error",
        );
        queryx::error::Error::new_server_error(server_error).into()
    }

    /// Maps a query server error through `error_to_retry_reason`. Uses a fresh
    /// non-idempotent "query" request and returns the resulting reason.
    fn query_retry_reason(
        kind: queryx::error::ServerErrorKind,
        retry: bool,
    ) -> Option<RetryReason> {
        let rs = make_retry_manager();
        let mut retry_info = RetryRequest::new("query", false);
        let err = make_query_server_error(kind, retry);
        error_to_retry_reason(&rs, &mut retry_info, &err)
    }

    #[test]
    fn test_query_error_retryable_when_retry_true() {
        assert_eq!(
            query_retry_reason(queryx::error::ServerErrorKind::Unknown, true),
            Some(RetryReason::QueryErrorRetryable)
        );
    }

    #[test]
    fn test_query_error_not_retryable_when_retry_false() {
        assert_eq!(
            query_retry_reason(queryx::error::ServerErrorKind::Unknown, false),
            None
        );
    }

    #[test]
    fn test_query_prepared_statement_failure_ignores_retry_flag() {
        assert_eq!(
            query_retry_reason(
                queryx::error::ServerErrorKind::PreparedStatementFailure,
                false
            ),
            Some(RetryReason::QueryPreparedStatementFailure)
        );
    }

    #[test]
    fn test_query_index_not_found_ignores_retry_flag() {
        assert_eq!(
            query_retry_reason(queryx::error::ServerErrorKind::IndexNotFound, false),
            Some(RetryReason::QueryIndexNotFound)
        );
    }

    #[test]
    fn test_query_error_retryable_allows_non_idempotent_retry() {
        let reason = RetryReason::QueryErrorRetryable;
        assert!(reason.allows_non_idempotent_retry());
    }

    #[test]
    fn test_query_error_retryable_does_not_always_retry() {
        let reason = RetryReason::QueryErrorRetryable;
        assert!(!reason.always_retry());
    }
}