couchbase_core/
retry.rs

1/*
2 *
3 *  * Copyright (c) 2025 Couchbase, Inc.
4 *  *
5 *  * Licensed under the Apache License, Version 2.0 (the "License");
6 *  * you may not use this file except in compliance with the License.
7 *  * You may obtain a copy of the License at
8 *  *
9 *  *    http://www.apache.org/licenses/LICENSE-2.0
10 *  *
11 *  * Unless required by applicable law or agreed to in writing, software
12 *  * distributed under the License is distributed on an "AS IS" BASIS,
13 *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *  * See the License for the specific language governing permissions and
15 *  * limitations under the License.
16 *
17 */
18
19use std::collections::HashSet;
20use std::fmt::{Debug, Display};
21use std::future::Future;
22use std::sync::Arc;
23use std::time::Duration;
24
25use crate::errmapcomponent::ErrMapComponent;
26use crate::error::{Error, ErrorKind};
27use crate::memdx::error::ErrorKind::{Cancelled, Dispatch, Resource, Server};
28use crate::memdx::error::{CancellationErrorKind, ServerError, ServerErrorKind};
29use crate::retrybesteffort::controlled_backoff;
30use crate::retryfailfast::FailFastRetryStrategy;
31use crate::{error, queryx, searchx};
32use async_trait::async_trait;
33use log::{debug, info};
34use tokio::time::sleep;
35
36lazy_static! {
37    pub(crate) static ref DEFAULT_RETRY_STRATEGY: Arc<dyn RetryStrategy> =
38        Arc::new(FailFastRetryStrategy::default());
39}
40
/// Why a failed operation is being considered for a retry.
///
/// Most variants are produced by `error_to_retry_reason` in this module when an
/// operation fails; `RetryManager::maybe_retry` then decides whether, and after
/// how long, to try again.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum RetryReason {
    /// The KV server reported `NotMyVbucket`.
    KvNotMyVbucket,
    /// No vbucket map was available (`ErrorKind::NoVbucketMap`).
    KvInvalidVbucketMap,
    /// The KV server reported a temporary failure (`TmpFail`).
    KvTemporaryFailure,
    /// The KV server did not recognise the collection ID or collection name.
    KvCollectionOutdated,
    /// An unrecognised KV status that the error map marks as retryable.
    KvErrorMapRetryIndicated,
    /// The KV server reported `Locked`.
    KvLocked,
    /// The KV server reported `SyncWriteInProgress`.
    KvSyncWriteInProgress,
    /// The KV server reported `SyncWriteRecommitInProgress`.
    KvSyncWriteRecommitInProgress,
    /// The target service was not available (`ErrorKind::ServiceNotAvailable`).
    ServiceNotAvailable,
    /// The connection was closed while the request was in flight.
    ///
    /// This is the only reason that does not allow retrying a non-idempotent
    /// operation.
    SocketClosedWhileInFlight,
    /// The request could not be dispatched (memdx `Dispatch` error).
    SocketNotAvailable,
    /// The query service reported a prepared-statement failure.
    QueryPreparedStatementFailure,
    /// The query service reported that an index was not found.
    QueryIndexNotFound,
    /// The search service answered with HTTP status 429.
    SearchTooManyRequests,
    /// Not produced by the error mapping in this module.
    ///
    /// NOTE(review): presumably signalled while some component is not yet
    /// ready — confirm at the call sites.
    NotReady,
}

impl RetryReason {
    /// Whether an operation that is *not* idempotent may still be retried for
    /// this reason.
    ///
    /// Every reason except `SocketClosedWhileInFlight` allows it.
    pub fn allows_non_idempotent_retry(&self) -> bool {
        match self {
            Self::SocketClosedWhileInFlight => false,
            Self::KvNotMyVbucket
            | Self::KvInvalidVbucketMap
            | Self::KvTemporaryFailure
            | Self::KvCollectionOutdated
            | Self::KvErrorMapRetryIndicated
            | Self::KvLocked
            | Self::KvSyncWriteInProgress
            | Self::KvSyncWriteRecommitInProgress
            | Self::ServiceNotAvailable
            | Self::SocketNotAvailable
            | Self::QueryPreparedStatementFailure
            | Self::QueryIndexNotFound
            | Self::SearchTooManyRequests
            | Self::NotReady => true,
        }
    }

    /// Whether this reason is retried unconditionally, without consulting the
    /// request's retry strategy.
    ///
    /// Only the two vbucket-related reasons and `KvCollectionOutdated` qualify.
    pub fn always_retry(&self) -> bool {
        match self {
            Self::KvNotMyVbucket | Self::KvInvalidVbucketMap | Self::KvCollectionOutdated => true,
            Self::KvTemporaryFailure
            | Self::KvErrorMapRetryIndicated
            | Self::KvLocked
            | Self::KvSyncWriteInProgress
            | Self::KvSyncWriteRecommitInProgress
            | Self::ServiceNotAvailable
            | Self::SocketClosedWhileInFlight
            | Self::SocketNotAvailable
            | Self::QueryPreparedStatementFailure
            | Self::QueryIndexNotFound
            | Self::SearchTooManyRequests
            | Self::NotReady => false,
        }
    }

    /// Upper-snake-case label written by the `Display` impl.
    fn display_name(&self) -> &'static str {
        match self {
            Self::KvNotMyVbucket => "KV_NOT_MY_VBUCKET",
            Self::KvInvalidVbucketMap => "KV_INVALID_VBUCKET_MAP",
            Self::KvTemporaryFailure => "KV_TEMPORARY_FAILURE",
            Self::KvCollectionOutdated => "KV_COLLECTION_OUTDATED",
            Self::KvErrorMapRetryIndicated => "KV_ERROR_MAP_RETRY_INDICATED",
            Self::KvLocked => "KV_LOCKED",
            Self::ServiceNotAvailable => "SERVICE_NOT_AVAILABLE",
            Self::SocketClosedWhileInFlight => "SOCKET_CLOSED_WHILE_IN_FLIGHT",
            Self::SocketNotAvailable => "SOCKET_NOT_AVAILABLE",
            Self::KvSyncWriteInProgress => "KV_SYNC_WRITE_IN_PROGRESS",
            Self::KvSyncWriteRecommitInProgress => "KV_SYNC_WRITE_RECOMMIT_IN_PROGRESS",
            Self::QueryPreparedStatementFailure => "QUERY_PREPARED_STATEMENT_FAILURE",
            Self::QueryIndexNotFound => "QUERY_INDEX_NOT_FOUND",
            Self::SearchTooManyRequests => "SEARCH_TOO_MANY_REQUESTS",
            Self::NotReady => "NOT_READY",
        }
    }
}

impl Display for RetryReason {
    /// Writes the reason's upper-snake-case label (formatter width/fill are ignored).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.display_name())
    }
}
121
122pub struct RetryManager {
123    err_map_component: Arc<ErrMapComponent>,
124}
125
126impl RetryManager {
127    pub fn new(err_map_component: Arc<ErrMapComponent>) -> Self {
128        Self { err_map_component }
129    }
130
131    pub async fn maybe_retry(
132        &self,
133        request: &mut RetryInfo,
134        reason: RetryReason,
135    ) -> Option<Duration> {
136        if reason.always_retry() {
137            request.add_retry_attempt(reason);
138            let backoff = controlled_backoff(request.retry_attempts);
139
140            return Some(backoff);
141        }
142
143        let strategy = request.retry_strategy();
144        let action = strategy.retry_after(request, &reason).await;
145
146        if let Some(a) = action {
147            request.add_retry_attempt(reason);
148
149            return Some(a.duration);
150        }
151
152        None
153    }
154}
155
/// A positive retry decision returned by a retry strategy: wait `duration`,
/// then try again.
#[derive(Clone, Debug)]
pub struct RetryAction {
    /// How long to wait before the next attempt.
    pub duration: Duration,
}

impl RetryAction {
    /// Builds an action that retries after `duration`.
    pub fn new(duration: Duration) -> Self {
        RetryAction { duration }
    }
}
166
/// Policy consulted by `RetryManager::maybe_retry` for every retry reason that
/// is not retried unconditionally (see `RetryReason::always_retry`).
///
/// Return `Some(RetryAction)` to retry after the action's duration, or `None`
/// to give up. The manager records the attempt on the request only when a
/// retry is granted. The manager does not check idempotency for these reasons.
/// A strategy that must not replay non-idempotent operations should inspect
/// `request.is_idempotent()` and `reason.allows_non_idempotent_retry()` itself.
#[async_trait]
pub trait RetryStrategy: Debug + Send + Sync {
    /// Decides whether (and after how long) `request` should be retried for `reason`.
    async fn retry_after(&self, request: &RetryInfo, reason: &RetryReason) -> Option<RetryAction>;
}
171
172#[derive(Clone, Debug)]
173pub struct RetryInfo {
174    pub(crate) operation: &'static str,
175    pub(crate) is_idempotent: bool,
176    pub(crate) retry_strategy: Arc<dyn RetryStrategy>,
177    pub(crate) retry_attempts: u32,
178    pub(crate) retry_reasons: HashSet<RetryReason>,
179    pub(crate) unique_id: Option<String>,
180}
181
182impl RetryInfo {
183    pub(crate) fn new(
184        operation: &'static str,
185        is_idempotent: bool,
186        retry_strategy: Arc<dyn RetryStrategy>,
187    ) -> Self {
188        Self {
189            operation,
190            is_idempotent,
191            retry_strategy,
192            retry_attempts: 0,
193            retry_reasons: Default::default(),
194            unique_id: None,
195        }
196    }
197
198    pub(crate) fn add_retry_attempt(&mut self, reason: RetryReason) {
199        self.retry_attempts += 1;
200        self.retry_reasons.insert(reason);
201    }
202
203    pub fn is_idempotent(&self) -> bool {
204        self.is_idempotent
205    }
206
207    pub fn retry_strategy(&self) -> &Arc<dyn RetryStrategy> {
208        &self.retry_strategy
209    }
210
211    pub fn retry_attempts(&self) -> u32 {
212        self.retry_attempts
213    }
214
215    pub fn retry_reasons(&self) -> &HashSet<RetryReason> {
216        &self.retry_reasons
217    }
218}
219
220impl Display for RetryInfo {
221    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
222        write!(
223            f,
224            "{{ operation: {}, id: {}, is_idempotent: {}, retry_attempts: {}, retry_reasons: {} }}",
225            self.operation,
226            self.unique_id.as_ref().unwrap_or(&"".to_string()),
227            self.is_idempotent,
228            self.retry_attempts,
229            self.retry_reasons
230                .iter()
231                .map(|r| r.to_string())
232                .collect::<Vec<_>>()
233                .join(", ")
234        )
235    }
236}
237
238pub(crate) async fn orchestrate_retries<Fut, Resp>(
239    rs: Arc<RetryManager>,
240    mut retry_info: RetryInfo,
241    operation: impl Fn() -> Fut + Send + Sync,
242) -> error::Result<Resp>
243where
244    Fut: Future<Output = error::Result<Resp>> + Send,
245    Resp: Send,
246{
247    loop {
248        let mut err = match operation().await {
249            Ok(r) => {
250                return Ok(r);
251            }
252            Err(e) => e,
253        };
254
255        if let Some(reason) = error_to_retry_reason(&rs, &mut retry_info, &err) {
256            if let Some(duration) = rs.maybe_retry(&mut retry_info, reason).await {
257                debug!(
258                    "Retrying {} after {:?} due to {}",
259                    &retry_info, duration, reason
260                );
261                sleep(duration).await;
262                continue;
263            }
264        }
265
266        if retry_info.retry_attempts > 0 {
267            // If we aren't retrying then attach any retry info that we have.
268            err.set_retry_info(retry_info);
269        }
270
271        return Err(err);
272    }
273}
274
275pub(crate) fn error_to_retry_reason(
276    rs: &Arc<RetryManager>,
277    retry_info: &mut RetryInfo,
278    err: &Error,
279) -> Option<RetryReason> {
280    match err.kind() {
281        ErrorKind::Memdx(err) => {
282            retry_info.unique_id = err.has_opaque().map(|o| o.to_string());
283
284            match err.kind() {
285                Server(e) => return server_error_to_retry_reason(rs, e),
286                Resource(e) => return server_error_to_retry_reason(rs, e.cause()),
287                Cancelled(e) => {
288                    if e == &CancellationErrorKind::ClosedInFlight {
289                        return Some(RetryReason::SocketClosedWhileInFlight);
290                    }
291                }
292                Dispatch { .. } => return Some(RetryReason::SocketNotAvailable),
293                _ => {}
294            }
295        }
296        ErrorKind::NoVbucketMap => {
297            return Some(RetryReason::KvInvalidVbucketMap);
298        }
299        ErrorKind::ServiceNotAvailable { .. } => {
300            return Some(RetryReason::ServiceNotAvailable);
301        }
302        ErrorKind::Query(e) => {
303            if let queryx::error::ErrorKind::Server(e) = e.kind() {
304                match e.kind() {
305                    queryx::error::ServerErrorKind::PreparedStatementFailure => {
306                        return Some(RetryReason::QueryPreparedStatementFailure);
307                    }
308                    queryx::error::ServerErrorKind::IndexNotFound => {
309                        return Some(RetryReason::QueryIndexNotFound);
310                    }
311                    _ => {}
312                }
313            }
314        }
315        ErrorKind::Search(e) => {
316            if let searchx::error::ErrorKind::Server(e) = e.kind() {
317                if e.status_code() == 429 {
318                    return Some(RetryReason::SearchTooManyRequests);
319                }
320            }
321        }
322        _ => {}
323    }
324
325    None
326}
327
328fn server_error_to_retry_reason(rs: &Arc<RetryManager>, e: &ServerError) -> Option<RetryReason> {
329    match e.kind() {
330        ServerErrorKind::NotMyVbucket => {
331            return Some(RetryReason::KvNotMyVbucket);
332        }
333        ServerErrorKind::TmpFail => {
334            return Some(RetryReason::KvTemporaryFailure);
335        }
336        ServerErrorKind::UnknownCollectionID => {
337            return Some(RetryReason::KvCollectionOutdated);
338        }
339        ServerErrorKind::UnknownCollectionName => {
340            return Some(RetryReason::KvCollectionOutdated);
341        }
342        ServerErrorKind::Locked => {
343            return Some(RetryReason::KvLocked);
344        }
345        ServerErrorKind::SyncWriteInProgress => {
346            return Some(RetryReason::KvSyncWriteInProgress);
347        }
348        ServerErrorKind::SyncWriteRecommitInProgress => {
349            return Some(RetryReason::KvSyncWriteRecommitInProgress);
350        }
351        ServerErrorKind::UnknownStatus { status } => {
352            if rs.err_map_component.should_retry(status) {
353                return Some(RetryReason::KvErrorMapRetryIndicated);
354            }
355        }
356        _ => {}
357    }
358
359    None
360}