Skip to main content

qubit_cas/executor/
cas_builder.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10//! Builder for [`crate::CasExecutor`].
11
12use std::marker::PhantomData;
13use std::time::Duration;
14
15use qubit_error::BoxError;
16use qubit_retry::{RetryConfigError, RetryDelay, RetryJitter, RetryOptions};
17
18use crate::observability::{CasObservabilityConfig, ContentionThresholds, ListenerPanicPolicy};
19use crate::options::CasTimeoutPolicy;
20use crate::strategy::CasStrategy;
21
22use super::cas_executor::CasExecutor;
23
24/// Builder for [`CasExecutor`](crate::CasExecutor).
25pub struct CasBuilder<T, E = BoxError> {
26    /// Maximum attempts, including the initial attempt.
27    max_attempts: u32,
28    /// Maximum cumulative user operation time for the retry flow.
29    max_operation_elapsed: Option<Duration>,
30    /// Maximum monotonic elapsed time for the whole retry flow.
31    max_total_elapsed: Option<Duration>,
32    /// Base retry delay strategy.
33    delay: RetryDelay,
34    /// Jitter strategy applied to the base delay.
35    jitter: RetryJitter,
36    /// Optional async attempt timeout.
37    attempt_timeout: Option<Duration>,
38    /// Policy used when one attempt exceeds the timeout.
39    timeout_policy: CasTimeoutPolicy,
40    /// Observability settings.
41    observability: CasObservabilityConfig,
42    /// Stored validation error for zero max attempts.
43    max_attempts_error: Option<RetryConfigError>,
44    /// Marker preserving the executor type parameters.
45    marker: PhantomData<fn() -> (T, E)>,
46}
47
48impl<T, E> CasBuilder<T, E> {
49    /// Creates a builder with default retry options.
50    ///
51    /// # Returns
52    /// A [`CasBuilder`] using [`RetryOptions::default`].
53    pub fn new() -> Self {
54        let options = RetryOptions::default();
55        Self {
56            max_attempts: options.max_attempts(),
57            max_operation_elapsed: options.max_operation_elapsed(),
58            max_total_elapsed: options.max_total_elapsed(),
59            delay: options.delay().clone(),
60            jitter: options.jitter(),
61            attempt_timeout: None,
62            timeout_policy: CasTimeoutPolicy::Retry,
63            observability: CasObservabilityConfig::default(),
64            max_attempts_error: None,
65            marker: PhantomData,
66        }
67    }
68
69    /// Replaces builder settings from an existing retry option snapshot.
70    ///
71    /// # Parameters
72    /// - `options`: Retry option snapshot to install.
73    ///
74    /// # Returns
75    /// The updated builder.
76    pub fn options(mut self, options: RetryOptions) -> Self {
77        self.max_attempts = options.max_attempts();
78        self.max_operation_elapsed = options.max_operation_elapsed();
79        self.max_total_elapsed = options.max_total_elapsed();
80        self.delay = options.delay().clone();
81        self.jitter = options.jitter();
82        self.max_attempts_error = None;
83        self
84    }
85
86    /// Sets the maximum total attempts.
87    ///
88    /// # Parameters
89    /// - `max_attempts`: Maximum attempts, including the initial attempt.
90    ///
91    /// # Returns
92    /// The updated builder.
93    pub fn max_attempts(mut self, max_attempts: u32) -> Self {
94        if max_attempts == 0 {
95            self.max_attempts_error = Some(RetryConfigError::invalid_value(
96                qubit_retry::constants::KEY_MAX_ATTEMPTS,
97                "max_attempts must be greater than zero",
98            ));
99        } else {
100            self.max_attempts = max_attempts;
101            self.max_attempts_error = None;
102        }
103        self
104    }
105
106    /// Sets the maximum retries after the initial attempt.
107    ///
108    /// # Parameters
109    /// - `max_retries`: Maximum retries after the first attempt.
110    ///
111    /// # Returns
112    /// The updated builder.
113    #[inline]
114    pub fn max_retries(self, max_retries: u32) -> Self {
115        self.max_attempts(max_retries.saturating_add(1))
116    }
117
118    /// Sets the maximum cumulative user operation elapsed-time budget.
119    ///
120    /// # Parameters
121    /// - `max_operation_elapsed`: Optional cumulative user operation time budget.
122    ///
123    /// # Returns
124    /// The updated builder.
125    #[inline]
126    pub fn max_operation_elapsed(mut self, max_operation_elapsed: Option<Duration>) -> Self {
127        self.max_operation_elapsed = max_operation_elapsed;
128        self
129    }
130
131    /// Sets the maximum monotonic elapsed-time budget for the whole retry flow.
132    ///
133    /// # Parameters
134    /// - `max_total_elapsed`: Optional total retry-flow time budget.
135    ///
136    /// # Returns
137    /// The updated builder.
138    #[inline]
139    pub fn max_total_elapsed(mut self, max_total_elapsed: Option<Duration>) -> Self {
140        self.max_total_elapsed = max_total_elapsed;
141        self
142    }
143
144    /// Sets the base retry delay strategy.
145    ///
146    /// # Parameters
147    /// - `delay`: Delay strategy used between failed attempts.
148    ///
149    /// # Returns
150    /// The updated builder.
151    #[inline]
152    pub fn delay(mut self, delay: RetryDelay) -> Self {
153        self.delay = delay;
154        self
155    }
156
157    /// Uses immediate retries with no sleep.
158    ///
159    /// # Returns
160    /// The updated builder.
161    #[inline]
162    pub fn no_delay(self) -> Self {
163        self.delay(RetryDelay::none())
164    }
165
166    /// Uses one fixed retry delay.
167    ///
168    /// # Parameters
169    /// - `delay`: Delay slept before each retry.
170    ///
171    /// # Returns
172    /// The updated builder.
173    #[inline]
174    pub fn fixed_delay(self, delay: Duration) -> Self {
175        self.delay(RetryDelay::fixed(delay))
176    }
177
178    /// Uses one random retry delay range.
179    ///
180    /// # Parameters
181    /// - `min`: Inclusive minimum delay.
182    /// - `max`: Inclusive maximum delay.
183    ///
184    /// # Returns
185    /// The updated builder.
186    #[inline]
187    pub fn random_delay(self, min: Duration, max: Duration) -> Self {
188        self.delay(RetryDelay::random(min, max))
189    }
190
191    /// Uses exponential backoff with multiplier `2.0`.
192    ///
193    /// # Parameters
194    /// - `initial`: Initial retry delay.
195    /// - `max`: Maximum retry delay.
196    ///
197    /// # Returns
198    /// The updated builder.
199    #[inline]
200    pub fn exponential_backoff(self, initial: Duration, max: Duration) -> Self {
201        self.exponential_backoff_with_multiplier(initial, max, 2.0)
202    }
203
204    /// Uses exponential backoff with a custom multiplier.
205    ///
206    /// # Parameters
207    /// - `initial`: Initial retry delay.
208    /// - `max`: Maximum retry delay.
209    /// - `multiplier`: Multiplier applied after each failed attempt.
210    ///
211    /// # Returns
212    /// The updated builder.
213    #[inline]
214    pub fn exponential_backoff_with_multiplier(
215        self,
216        initial: Duration,
217        max: Duration,
218        multiplier: f64,
219    ) -> Self {
220        self.delay(RetryDelay::exponential(initial, max, multiplier))
221    }
222
223    /// Sets the jitter strategy.
224    ///
225    /// # Parameters
226    /// - `jitter`: Jitter strategy applied to retry delays.
227    ///
228    /// # Returns
229    /// The updated builder.
230    #[inline]
231    pub fn jitter(mut self, jitter: RetryJitter) -> Self {
232        self.jitter = jitter;
233        self
234    }
235
236    /// Sets relative jitter by factor.
237    ///
238    /// # Parameters
239    /// - `factor`: Relative jitter factor in `[0.0, 1.0]`.
240    ///
241    /// # Returns
242    /// The updated builder.
243    #[inline]
244    pub fn jitter_factor(self, factor: f64) -> Self {
245        self.jitter(RetryJitter::factor(factor))
246    }
247
248    /// Sets the async per-attempt timeout.
249    ///
250    /// # Parameters
251    /// - `attempt_timeout`: Timeout applied to each async CAS attempt.
252    ///
253    /// # Returns
254    /// The updated builder.
255    #[inline]
256    pub fn attempt_timeout(mut self, attempt_timeout: Option<Duration>) -> Self {
257        self.attempt_timeout = attempt_timeout;
258        self
259    }
260
261    /// Retries attempts that exceed the configured timeout.
262    ///
263    /// # Returns
264    /// The updated builder.
265    #[inline]
266    pub fn retry_on_timeout(mut self) -> Self {
267        self.timeout_policy = CasTimeoutPolicy::Retry;
268        self
269    }
270
271    /// Aborts the CAS flow when one attempt exceeds the timeout.
272    ///
273    /// # Returns
274    /// The updated builder.
275    #[inline]
276    pub fn abort_on_timeout(mut self) -> Self {
277        self.timeout_policy = CasTimeoutPolicy::Abort;
278        self
279    }
280
281    /// Applies a built-in CAS strategy to this builder.
282    ///
283    /// # Parameters
284    /// - `strategy`: Strategy profile to install.
285    ///
286    /// # Returns
287    /// The updated builder.
288    pub fn strategy(self, strategy: CasStrategy) -> Self {
289        let profile = strategy.profile();
290        let builder = self
291            .max_attempts(profile.max_attempts())
292            .max_operation_elapsed(Some(profile.max_operation_elapsed()))
293            .max_total_elapsed(profile.max_total_elapsed());
294        if let Some((initial, max, jitter)) = strategy.backoff() {
295            builder
296                .exponential_backoff(initial, max)
297                .jitter_factor(jitter)
298        } else {
299            builder.no_delay()
300        }
301    }
302
303    /// Installs observability configuration.
304    ///
305    /// # Parameters
306    /// - `observability`: Observability settings to use.
307    ///
308    /// # Returns
309    /// The updated builder.
310    #[inline]
311    pub fn observability(mut self, observability: CasObservabilityConfig) -> Self {
312        self.observability = observability;
313        self
314    }
315
316    /// Enables contention alerting with the supplied thresholds.
317    ///
318    /// # Parameters
319    /// - `thresholds`: Thresholds used to classify hot contention.
320    ///
321    /// # Returns
322    /// The updated builder.
323    #[inline]
324    pub fn alert_on_contention(mut self, thresholds: ContentionThresholds) -> Self {
325        self.observability = self.observability.with_contention_thresholds(thresholds);
326        self
327    }
328
329    /// Enables retry-layer listener panic isolation.
330    ///
331    /// # Returns
332    /// The updated builder.
333    #[inline]
334    pub fn isolate_listener_panics(mut self) -> Self {
335        self.observability = self
336            .observability
337            .with_listener_panic_policy(ListenerPanicPolicy::Isolate);
338        self
339    }
340
341    /// Builds one executor after validating the settings.
342    ///
343    /// # Returns
344    /// A validated [`CasExecutor`].
345    ///
346    /// # Errors
347    /// Returns [`RetryConfigError`] when the configured retry settings are
348    /// invalid.
349    pub fn build(self) -> Result<CasExecutor<T, E>, RetryConfigError> {
350        if let Some(error) = self.max_attempts_error {
351            return Err(error);
352        }
353        let options = RetryOptions::new(
354            self.max_attempts,
355            self.max_operation_elapsed,
356            self.max_total_elapsed,
357            self.delay,
358            self.jitter,
359        )?;
360        Ok(CasExecutor::new(
361            options,
362            self.attempt_timeout,
363            self.timeout_policy,
364            self.observability,
365        ))
366    }
367
368    /// Builds one executor with the contention-adaptive strategy.
369    ///
370    /// # Returns
371    /// A configured [`CasExecutor`] suitable for contended writers.
372    pub fn build_contention_adaptive(self) -> Result<CasExecutor<T, E>, RetryConfigError> {
373        self.strategy(CasStrategy::ContentionAdaptive).build()
374    }
375
376    /// Builds one executor with the latency-first strategy.
377    ///
378    /// # Returns
379    /// A configured [`CasExecutor`] optimized for low latency.
380    pub fn build_latency_first(self) -> Result<CasExecutor<T, E>, RetryConfigError> {
381        self.strategy(CasStrategy::LatencyFirst).build()
382    }
383
384    /// Builds one executor with the reliability-first strategy.
385    ///
386    /// # Returns
387    /// A configured [`CasExecutor`] optimized for long retry windows.
388    pub fn build_reliability_first(self) -> Result<CasExecutor<T, E>, RetryConfigError> {
389        self.strategy(CasStrategy::ReliabilityFirst).build()
390    }
391}
392
393impl<T, E> Default for CasBuilder<T, E> {
394    /// Creates a default CAS builder.
395    ///
396    /// # Returns
397    /// A builder equivalent to [`CasBuilder::new`].
398    #[inline]
399    fn default() -> Self {
400        Self::new()
401    }
402}