Skip to main content

couchbase_core/
retry.rs

1/*
2 *
3 *  * Copyright (c) 2025 Couchbase, Inc.
4 *  *
5 *  * Licensed under the Apache License, Version 2.0 (the "License");
6 *  * you may not use this file except in compliance with the License.
7 *  * You may obtain a copy of the License at
8 *  *
9 *  *    http://www.apache.org/licenses/LICENSE-2.0
10 *  *
11 *  * Unless required by applicable law or agreed to in writing, software
12 *  * distributed under the License is distributed on an "AS IS" BASIS,
13 *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *  * See the License for the specific language governing permissions and
15 *  * limitations under the License.
16 *
17 */
18
19use std::collections::HashSet;
20use std::fmt::{Debug, Display};
21use std::future::Future;
22use std::sync::Arc;
23use std::time::Duration;
24
25use crate::errmapcomponent::ErrMapComponent;
26use crate::error::{Error, ErrorKind};
27use crate::memdx::error::ErrorKind::{Cancelled, Dispatch, Resource, Server};
28use crate::memdx::error::{CancellationErrorKind, ServerError, ServerErrorKind};
29use crate::retryfailfast::FailFastRetryStrategy;
30use crate::tracingcomponent::SPAN_ATTRIB_RETRIES;
31use crate::{analyticsx, error, httpx, mgmtx, queryx, searchx};
32use async_trait::async_trait;
33use tokio::time::sleep;
34use tracing::{debug, info};
35
36lazy_static! {
37    pub(crate) static ref DEFAULT_RETRY_STRATEGY: Arc<dyn RetryStrategy> =
38        Arc::new(FailFastRetryStrategy::default());
39}
40
/// Why the SDK wants to retry an operation.
///
/// Every variant names one transient failure condition. The value is handed
/// to [`RetryStrategy::retry_after`], which uses it to decide whether another
/// attempt should be made and how long to wait first.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum RetryReason {
    /// The node that received the request does not own the vBucket.
    KvNotMyVbucket,
    /// The vBucket map is not valid and has to be refreshed.
    KvInvalidVbucketMap,
    /// The KV engine reported a temporary failure.
    KvTemporaryFailure,
    /// The collection ID is stale and has to be resolved again.
    KvCollectionOutdated,
    /// The server's error map marked the returned status as retryable.
    KvErrorMapRetryIndicated,
    /// Another operation currently holds a lock on the document.
    KvLocked,
    /// A sync-write (durable operation) is already running against the key.
    KvSyncWriteInProgress,
    /// A sync-write recommit is currently running against the key.
    KvSyncWriteRecommitInProgress,
    /// The service needed for the operation is temporarily unavailable.
    ServiceNotAvailable,
    /// The connection closed while the request was still in flight.
    SocketClosedWhileInFlight,
    /// There is no connection available right now.
    SocketNotAvailable,
    /// The prepared statement used by the query was invalidated.
    QueryPreparedStatementFailure,
    /// The query index could not be found (it may still be building).
    QueryIndexNotFound,
    /// The search service is rate limiting and rejected the request.
    SearchTooManyRequests,
    /// Sending an HTTP request failed.
    HttpSendRequestFailed,
    /// Establishing an HTTP connection failed.
    HttpConnectFailed,
    /// The SDK is not ready to run the operation yet.
    NotReady,
}
84
85impl RetryReason {
86    /// Returns `true` if this reason allows retrying non-idempotent operations.
87    ///
88    /// Most retry reasons are safe for non-idempotent retries because the
89    /// server never processed the original request.
90    pub fn allows_non_idempotent_retry(&self) -> bool {
91        matches!(
92            self,
93            RetryReason::KvInvalidVbucketMap
94                | RetryReason::KvNotMyVbucket
95                | RetryReason::KvTemporaryFailure
96                | RetryReason::KvCollectionOutdated
97                | RetryReason::KvErrorMapRetryIndicated
98                | RetryReason::KvLocked
99                | RetryReason::ServiceNotAvailable
100                | RetryReason::SocketNotAvailable
101                | RetryReason::KvSyncWriteInProgress
102                | RetryReason::KvSyncWriteRecommitInProgress
103                | RetryReason::QueryPreparedStatementFailure
104                | RetryReason::QueryIndexNotFound
105                | RetryReason::SearchTooManyRequests
106                | RetryReason::HttpSendRequestFailed
107                | RetryReason::HttpConnectFailed
108                | RetryReason::NotReady
109        )
110    }
111
112    /// Returns `true` if the SDK should always retry for this reason,
113    /// regardless of the retry strategy's decision.
114    pub fn always_retry(&self) -> bool {
115        matches!(
116            self,
117            RetryReason::KvInvalidVbucketMap
118                | RetryReason::KvNotMyVbucket
119                | RetryReason::KvCollectionOutdated
120        )
121    }
122}
123
124impl Display for RetryReason {
125    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
126        match self {
127            RetryReason::KvNotMyVbucket => write!(f, "KV_NOT_MY_VBUCKET"),
128            RetryReason::KvInvalidVbucketMap => write!(f, "KV_INVALID_VBUCKET_MAP"),
129            RetryReason::KvTemporaryFailure => write!(f, "KV_TEMPORARY_FAILURE"),
130            RetryReason::KvCollectionOutdated => write!(f, "KV_COLLECTION_OUTDATED"),
131            RetryReason::KvErrorMapRetryIndicated => write!(f, "KV_ERROR_MAP_RETRY_INDICATED"),
132            RetryReason::KvLocked => write!(f, "KV_LOCKED"),
133            RetryReason::ServiceNotAvailable => write!(f, "SERVICE_NOT_AVAILABLE"),
134            RetryReason::SocketClosedWhileInFlight => write!(f, "SOCKET_CLOSED_WHILE_IN_FLIGHT"),
135            RetryReason::SocketNotAvailable => write!(f, "SOCKET_NOT_AVAILABLE"),
136            RetryReason::KvSyncWriteInProgress => write!(f, "KV_SYNC_WRITE_IN_PROGRESS"),
137            RetryReason::KvSyncWriteRecommitInProgress => {
138                write!(f, "KV_SYNC_WRITE_RECOMMIT_IN_PROGRESS")
139            }
140            RetryReason::QueryPreparedStatementFailure => {
141                write!(f, "QUERY_PREPARED_STATEMENT_FAILURE")
142            }
143            RetryReason::QueryIndexNotFound => write!(f, "QUERY_INDEX_NOT_FOUND"),
144            RetryReason::SearchTooManyRequests => write!(f, "SEARCH_TOO_MANY_REQUESTS"),
145            RetryReason::NotReady => write!(f, "NOT_READY"),
146            RetryReason::HttpSendRequestFailed => write!(f, "HTTP_SEND_REQUEST_FAILED"),
147            RetryReason::HttpConnectFailed => write!(f, "HTTP_CONNECT_FAILED"),
148        }
149    }
150}
151
/// What a [`RetryStrategy`] hands back when it wants the operation retried.
///
/// It carries only the [`Duration`] to wait before the next attempt.
#[derive(Clone, Debug)]
pub struct RetryAction {
    /// Delay to observe before the next attempt.
    pub duration: Duration,
}

impl RetryAction {
    /// Builds a `RetryAction` that waits `duration` before retrying.
    pub fn new(duration: Duration) -> Self {
        RetryAction { duration }
    }
}
167
168/// A strategy that decides whether and when to retry a failed operation.
169///
170/// Implement this trait to provide custom retry behavior. The SDK calls
171/// [`retry_after`](RetryStrategy::retry_after) each time a retryable failure
172/// occurs, passing the request metadata and the reason for the failure.
173///
174/// Return `Some(RetryAction)` to retry after the specified duration,
175/// or `None` to stop retrying and propagate the error.
176///
177/// # Example
178///
179/// ```rust
180/// use couchbase_core::retry::{RetryStrategy, RetryAction, RetryRequest, RetryReason};
181/// use std::fmt::Debug;
182/// use std::time::Duration;
183///
184/// #[derive(Debug)]
185/// struct FixedDelayRetry(Duration);
186///
187/// impl RetryStrategy for FixedDelayRetry {
188///     fn retry_after(&self, request: &RetryRequest, reason: &RetryReason) -> Option<RetryAction> {
189///         if request.retry_attempts < 3 {
190///             Some(RetryAction::new(self.0))
191///         } else {
192///             None // give up after 3 attempts
193///         }
194///     }
195/// }
196/// ```
197pub trait RetryStrategy: Debug + Send + Sync {
198    /// Decides whether to retry an operation and how long to wait.
199    ///
200    /// * `request` — Metadata about the in-flight request (attempt count, idempotency, etc.).
201    /// * `reason` — Why the operation failed.
202    ///
203    /// Return `Some(RetryAction)` to retry, or `None` to stop.
204    fn retry_after(&self, request: &RetryRequest, reason: &RetryReason) -> Option<RetryAction>;
205}
206
207/// Metadata about a request that is being considered for retry.
208#[derive(Clone, Debug)]
209pub struct RetryRequest {
210    pub(crate) operation: &'static str,
211    /// Whether the operation is idempotent (safe to retry without side effects).
212    pub is_idempotent: bool,
213    /// The number of retry attempts that have already been made.
214    pub retry_attempts: u32,
215    /// The set of reasons this request has been retried so far.
216    pub retry_reasons: HashSet<RetryReason>,
217    pub(crate) unique_id: Option<String>,
218}
219
220impl RetryRequest {
221    pub(crate) fn new(operation: &'static str, is_idempotent: bool) -> Self {
222        Self {
223            operation,
224            is_idempotent,
225            retry_attempts: 0,
226            retry_reasons: Default::default(),
227            unique_id: None,
228        }
229    }
230
231    pub(crate) fn add_retry_attempt(&mut self, reason: RetryReason) {
232        self.retry_attempts += 1;
233        tracing::Span::current().record(SPAN_ATTRIB_RETRIES, self.retry_attempts);
234        self.retry_reasons.insert(reason);
235    }
236
237    pub fn is_idempotent(&self) -> bool {
238        self.is_idempotent
239    }
240
241    pub fn retry_attempts(&self) -> u32 {
242        self.retry_attempts
243    }
244
245    pub fn retry_reasons(&self) -> &HashSet<RetryReason> {
246        &self.retry_reasons
247    }
248}
249
250impl Display for RetryRequest {
251    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
252        write!(
253            f,
254            "{{ operation: {}, id: {}, is_idempotent: {}, retry_attempts: {}, retry_reasons: {} }}",
255            self.operation,
256            self.unique_id.as_ref().unwrap_or(&"".to_string()),
257            self.is_idempotent,
258            self.retry_attempts,
259            self.retry_reasons
260                .iter()
261                .map(|r| r.to_string())
262                .collect::<Vec<_>>()
263                .join(", ")
264        )
265    }
266}
267
268pub struct RetryManager {
269    err_map_component: Arc<ErrMapComponent>,
270}
271
272impl RetryManager {
273    pub fn new(err_map_component: Arc<ErrMapComponent>) -> Self {
274        Self { err_map_component }
275    }
276
277    pub async fn maybe_retry(
278        &self,
279        strategy: Arc<dyn RetryStrategy>,
280        request: &mut RetryRequest,
281        reason: RetryReason,
282    ) -> Option<Duration> {
283        if reason.always_retry() {
284            request.add_retry_attempt(reason);
285            let backoff = controlled_backoff(request.retry_attempts);
286
287            return Some(backoff);
288        }
289
290        let action = strategy.retry_after(request, &reason);
291
292        if let Some(a) = action {
293            request.add_retry_attempt(reason);
294
295            return Some(a.duration);
296        }
297
298        None
299    }
300}
301
302pub(crate) async fn orchestrate_retries<Fut, Resp>(
303    rs: Arc<RetryManager>,
304    strategy: Arc<dyn RetryStrategy>,
305    mut retry_info: RetryRequest,
306    operation: impl Fn() -> Fut + Send + Sync,
307) -> error::Result<Resp>
308where
309    Fut: Future<Output = error::Result<Resp>> + Send,
310    Resp: Send,
311{
312    loop {
313        let mut err = match operation().await {
314            Ok(r) => {
315                return Ok(r);
316            }
317            Err(e) => e,
318        };
319
320        if let Some(reason) = error_to_retry_reason(&rs, &mut retry_info, &err) {
321            if let Some(duration) = rs
322                .maybe_retry(strategy.clone(), &mut retry_info, reason)
323                .await
324            {
325                debug!(
326                    "Retrying {} after {:?} due to {}",
327                    &retry_info, duration, reason
328                );
329                sleep(duration).await;
330                continue;
331            }
332        }
333
334        if retry_info.retry_attempts > 0 {
335            // If we aren't retrying then attach any retry info that we have.
336            err.set_retry_info(retry_info);
337        }
338
339        return Err(err);
340    }
341}
342
343pub(crate) fn error_to_retry_reason(
344    rs: &Arc<RetryManager>,
345    retry_info: &mut RetryRequest,
346    err: &Error,
347) -> Option<RetryReason> {
348    match err.kind() {
349        ErrorKind::Memdx(err) => {
350            retry_info.unique_id = err.has_opaque().map(|o| o.to_string());
351
352            match err.kind() {
353                Server(e) => return server_error_to_retry_reason(rs, e),
354                Resource(e) => return server_error_to_retry_reason(rs, e.cause()),
355                Cancelled(e) => {
356                    if e == &CancellationErrorKind::ClosedInFlight {
357                        return Some(RetryReason::SocketClosedWhileInFlight);
358                    }
359                }
360                Dispatch { .. } => return Some(RetryReason::SocketNotAvailable),
361                _ => {}
362            }
363        }
364        ErrorKind::NoVbucketMap => {
365            return Some(RetryReason::KvInvalidVbucketMap);
366        }
367        ErrorKind::ServiceNotAvailable { .. } => {
368            return Some(RetryReason::ServiceNotAvailable);
369        }
370        ErrorKind::Query(e) => match e.kind() {
371            queryx::error::ErrorKind::Server(e) => match e.kind() {
372                queryx::error::ServerErrorKind::PreparedStatementFailure => {
373                    return Some(RetryReason::QueryPreparedStatementFailure);
374                }
375                queryx::error::ServerErrorKind::IndexNotFound => {
376                    return Some(RetryReason::QueryIndexNotFound);
377                }
378                _ => {}
379            },
380            queryx::error::ErrorKind::Http { error, .. } => match error.kind() {
381                httpx::error::ErrorKind::SendRequest(_) => {
382                    return Some(RetryReason::HttpSendRequestFailed);
383                }
384                httpx::error::ErrorKind::Connect { .. } => {
385                    return Some(RetryReason::HttpConnectFailed);
386                }
387                _ => {}
388            },
389            _ => {}
390        },
391        ErrorKind::Search(e) => match e.kind() {
392            searchx::error::ErrorKind::Server(e) => {
393                if e.status_code() == 429 {
394                    return Some(RetryReason::SearchTooManyRequests);
395                }
396            }
397            searchx::error::ErrorKind::Http { error, .. } => match error.kind() {
398                httpx::error::ErrorKind::SendRequest(_) => {
399                    return Some(RetryReason::HttpSendRequestFailed);
400                }
401                httpx::error::ErrorKind::Connect { .. } => {
402                    return Some(RetryReason::HttpConnectFailed);
403                }
404                _ => {}
405            },
406            _ => {}
407        },
408        ErrorKind::Analytics(e) => {
409            if let analyticsx::error::ErrorKind::Http { error, .. } = e.kind() {
410                match error.kind() {
411                    httpx::error::ErrorKind::SendRequest(_) => {
412                        return Some(RetryReason::HttpSendRequestFailed);
413                    }
414                    httpx::error::ErrorKind::Connect { .. } => {
415                        return Some(RetryReason::HttpConnectFailed);
416                    }
417                    _ => {}
418                }
419            }
420        }
421        ErrorKind::Mgmt(e) => {
422            if let mgmtx::error::ErrorKind::Http(error) = e.kind() {
423                match error.kind() {
424                    httpx::error::ErrorKind::SendRequest(_) => {
425                        return Some(RetryReason::HttpSendRequestFailed);
426                    }
427                    httpx::error::ErrorKind::Connect { .. } => {
428                        return Some(RetryReason::HttpConnectFailed);
429                    }
430                    _ => {}
431                }
432            }
433        }
434        _ => {}
435    }
436
437    None
438}
439
440fn server_error_to_retry_reason(rs: &Arc<RetryManager>, e: &ServerError) -> Option<RetryReason> {
441    match e.kind() {
442        ServerErrorKind::NotMyVbucket => {
443            return Some(RetryReason::KvNotMyVbucket);
444        }
445        ServerErrorKind::TmpFail => {
446            return Some(RetryReason::KvTemporaryFailure);
447        }
448        ServerErrorKind::UnknownCollectionID => {
449            return Some(RetryReason::KvCollectionOutdated);
450        }
451        ServerErrorKind::UnknownCollectionName => {
452            return Some(RetryReason::KvCollectionOutdated);
453        }
454        ServerErrorKind::UnknownScopeName => {
455            return Some(RetryReason::KvCollectionOutdated);
456        }
457        ServerErrorKind::Locked => {
458            return Some(RetryReason::KvLocked);
459        }
460        ServerErrorKind::SyncWriteInProgress => {
461            return Some(RetryReason::KvSyncWriteInProgress);
462        }
463        ServerErrorKind::SyncWriteRecommitInProgress => {
464            return Some(RetryReason::KvSyncWriteRecommitInProgress);
465        }
466        ServerErrorKind::UnknownStatus { status } => {
467            if rs.err_map_component.should_retry(status) {
468                return Some(RetryReason::KvErrorMapRetryIndicated);
469            }
470        }
471        _ => {}
472    }
473
474    None
475}
476
/// Returns the fixed backoff delay for the given number of retry attempts.
///
/// The schedule is 1, 10, 50, 100 and 500 ms for attempts 0 through 4, and
/// 1000 ms for every attempt count from 5 upwards.
pub(crate) fn controlled_backoff(retry_attempts: u32) -> Duration {
    const BACKOFF_MS: [u64; 6] = [1, 10, 50, 100, 500, 1000];
    const LAST_STEP: u32 = 5;

    // Capped at 5, so the cast is lossless and the index is always in bounds.
    let step = retry_attempts.min(LAST_STEP) as usize;
    Duration::from_millis(BACKOFF_MS[step])
}