Skip to main content

qubit_cas/executor/
cas_builder.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026.
4 *    Haixing Hu, Qubit Co. Ltd.
5 *
6 *    All rights reserved.
7 *
8 ******************************************************************************/
9//! Builder for [`crate::CasExecutor`].
10
11use std::marker::PhantomData;
12use std::time::Duration;
13
14use qubit_common::BoxError;
15use qubit_retry::{RetryConfigError, RetryDelay, RetryJitter, RetryOptions};
16
17use crate::observability::{CasObservabilityConfig, ContentionThresholds, ListenerPanicPolicy};
18use crate::options::CasTimeoutPolicy;
19use crate::strategy::CasStrategy;
20
21use super::cas_executor::CasExecutor;
22
23/// Builder for [`CasExecutor`](crate::CasExecutor).
24pub struct CasBuilder<T, E = BoxError> {
25    /// Maximum attempts, including the initial attempt.
26    max_attempts: u32,
27    /// Maximum cumulative user operation time for the retry flow.
28    max_operation_elapsed: Option<Duration>,
29    /// Maximum monotonic elapsed time for the whole retry flow.
30    max_total_elapsed: Option<Duration>,
31    /// Base retry delay strategy.
32    delay: RetryDelay,
33    /// Jitter strategy applied to the base delay.
34    jitter: RetryJitter,
35    /// Optional async attempt timeout.
36    attempt_timeout: Option<Duration>,
37    /// Policy used when one attempt exceeds the timeout.
38    timeout_policy: CasTimeoutPolicy,
39    /// Observability settings.
40    observability: CasObservabilityConfig,
41    /// Stored validation error for zero max attempts.
42    max_attempts_error: Option<RetryConfigError>,
43    /// Marker preserving the executor type parameters.
44    marker: PhantomData<fn() -> (T, E)>,
45}
46
47impl<T, E> CasBuilder<T, E> {
48    /// Creates a builder with default retry options.
49    ///
50    /// # Returns
51    /// A [`CasBuilder`] using [`RetryOptions::default`].
52    pub fn new() -> Self {
53        let options = RetryOptions::default();
54        Self {
55            max_attempts: options.max_attempts(),
56            max_operation_elapsed: options.max_operation_elapsed(),
57            max_total_elapsed: options.max_total_elapsed(),
58            delay: options.delay().clone(),
59            jitter: options.jitter(),
60            attempt_timeout: None,
61            timeout_policy: CasTimeoutPolicy::Retry,
62            observability: CasObservabilityConfig::default(),
63            max_attempts_error: None,
64            marker: PhantomData,
65        }
66    }
67
68    /// Replaces builder settings from an existing retry option snapshot.
69    ///
70    /// # Parameters
71    /// - `options`: Retry option snapshot to install.
72    ///
73    /// # Returns
74    /// The updated builder.
75    pub fn options(mut self, options: RetryOptions) -> Self {
76        self.max_attempts = options.max_attempts();
77        self.max_operation_elapsed = options.max_operation_elapsed();
78        self.max_total_elapsed = options.max_total_elapsed();
79        self.delay = options.delay().clone();
80        self.jitter = options.jitter();
81        self.max_attempts_error = None;
82        self
83    }
84
85    /// Sets the maximum total attempts.
86    ///
87    /// # Parameters
88    /// - `max_attempts`: Maximum attempts, including the initial attempt.
89    ///
90    /// # Returns
91    /// The updated builder.
92    pub fn max_attempts(mut self, max_attempts: u32) -> Self {
93        if max_attempts == 0 {
94            self.max_attempts_error = Some(RetryConfigError::invalid_value(
95                qubit_retry::constants::KEY_MAX_ATTEMPTS,
96                "max_attempts must be greater than zero",
97            ));
98        } else {
99            self.max_attempts = max_attempts;
100            self.max_attempts_error = None;
101        }
102        self
103    }
104
105    /// Sets the maximum retries after the initial attempt.
106    ///
107    /// # Parameters
108    /// - `max_retries`: Maximum retries after the first attempt.
109    ///
110    /// # Returns
111    /// The updated builder.
112    #[inline]
113    pub fn max_retries(self, max_retries: u32) -> Self {
114        self.max_attempts(max_retries.saturating_add(1))
115    }
116
117    /// Sets the maximum cumulative user operation elapsed-time budget.
118    ///
119    /// # Parameters
120    /// - `max_operation_elapsed`: Optional cumulative user operation time budget.
121    ///
122    /// # Returns
123    /// The updated builder.
124    #[inline]
125    pub fn max_operation_elapsed(mut self, max_operation_elapsed: Option<Duration>) -> Self {
126        self.max_operation_elapsed = max_operation_elapsed;
127        self
128    }
129
130    /// Sets the maximum monotonic elapsed-time budget for the whole retry flow.
131    ///
132    /// # Parameters
133    /// - `max_total_elapsed`: Optional total retry-flow time budget.
134    ///
135    /// # Returns
136    /// The updated builder.
137    #[inline]
138    pub fn max_total_elapsed(mut self, max_total_elapsed: Option<Duration>) -> Self {
139        self.max_total_elapsed = max_total_elapsed;
140        self
141    }
142
143    /// Sets the base retry delay strategy.
144    ///
145    /// # Parameters
146    /// - `delay`: Delay strategy used between failed attempts.
147    ///
148    /// # Returns
149    /// The updated builder.
150    #[inline]
151    pub fn delay(mut self, delay: RetryDelay) -> Self {
152        self.delay = delay;
153        self
154    }
155
156    /// Uses immediate retries with no sleep.
157    ///
158    /// # Returns
159    /// The updated builder.
160    #[inline]
161    pub fn no_delay(self) -> Self {
162        self.delay(RetryDelay::none())
163    }
164
165    /// Uses one fixed retry delay.
166    ///
167    /// # Parameters
168    /// - `delay`: Delay slept before each retry.
169    ///
170    /// # Returns
171    /// The updated builder.
172    #[inline]
173    pub fn fixed_delay(self, delay: Duration) -> Self {
174        self.delay(RetryDelay::fixed(delay))
175    }
176
177    /// Uses one random retry delay range.
178    ///
179    /// # Parameters
180    /// - `min`: Inclusive minimum delay.
181    /// - `max`: Inclusive maximum delay.
182    ///
183    /// # Returns
184    /// The updated builder.
185    #[inline]
186    pub fn random_delay(self, min: Duration, max: Duration) -> Self {
187        self.delay(RetryDelay::random(min, max))
188    }
189
190    /// Uses exponential backoff with multiplier `2.0`.
191    ///
192    /// # Parameters
193    /// - `initial`: Initial retry delay.
194    /// - `max`: Maximum retry delay.
195    ///
196    /// # Returns
197    /// The updated builder.
198    #[inline]
199    pub fn exponential_backoff(self, initial: Duration, max: Duration) -> Self {
200        self.exponential_backoff_with_multiplier(initial, max, 2.0)
201    }
202
203    /// Uses exponential backoff with a custom multiplier.
204    ///
205    /// # Parameters
206    /// - `initial`: Initial retry delay.
207    /// - `max`: Maximum retry delay.
208    /// - `multiplier`: Multiplier applied after each failed attempt.
209    ///
210    /// # Returns
211    /// The updated builder.
212    #[inline]
213    pub fn exponential_backoff_with_multiplier(
214        self,
215        initial: Duration,
216        max: Duration,
217        multiplier: f64,
218    ) -> Self {
219        self.delay(RetryDelay::exponential(initial, max, multiplier))
220    }
221
222    /// Sets the jitter strategy.
223    ///
224    /// # Parameters
225    /// - `jitter`: Jitter strategy applied to retry delays.
226    ///
227    /// # Returns
228    /// The updated builder.
229    #[inline]
230    pub fn jitter(mut self, jitter: RetryJitter) -> Self {
231        self.jitter = jitter;
232        self
233    }
234
235    /// Sets relative jitter by factor.
236    ///
237    /// # Parameters
238    /// - `factor`: Relative jitter factor in `[0.0, 1.0]`.
239    ///
240    /// # Returns
241    /// The updated builder.
242    #[inline]
243    pub fn jitter_factor(self, factor: f64) -> Self {
244        self.jitter(RetryJitter::factor(factor))
245    }
246
247    /// Sets the async per-attempt timeout.
248    ///
249    /// # Parameters
250    /// - `attempt_timeout`: Timeout applied to each async CAS attempt.
251    ///
252    /// # Returns
253    /// The updated builder.
254    #[inline]
255    pub fn attempt_timeout(mut self, attempt_timeout: Option<Duration>) -> Self {
256        self.attempt_timeout = attempt_timeout;
257        self
258    }
259
260    /// Retries attempts that exceed the configured timeout.
261    ///
262    /// # Returns
263    /// The updated builder.
264    #[inline]
265    pub fn retry_on_timeout(mut self) -> Self {
266        self.timeout_policy = CasTimeoutPolicy::Retry;
267        self
268    }
269
270    /// Aborts the CAS flow when one attempt exceeds the timeout.
271    ///
272    /// # Returns
273    /// The updated builder.
274    #[inline]
275    pub fn abort_on_timeout(mut self) -> Self {
276        self.timeout_policy = CasTimeoutPolicy::Abort;
277        self
278    }
279
280    /// Applies a built-in CAS strategy to this builder.
281    ///
282    /// # Parameters
283    /// - `strategy`: Strategy profile to install.
284    ///
285    /// # Returns
286    /// The updated builder.
287    pub fn strategy(self, strategy: CasStrategy) -> Self {
288        let profile = strategy.profile();
289        let builder = self
290            .max_attempts(profile.max_attempts())
291            .max_operation_elapsed(Some(profile.max_operation_elapsed()))
292            .max_total_elapsed(profile.max_total_elapsed());
293        if let Some((initial, max, jitter)) = strategy.backoff() {
294            builder
295                .exponential_backoff(initial, max)
296                .jitter_factor(jitter)
297        } else {
298            builder.no_delay()
299        }
300    }
301
302    /// Installs observability configuration.
303    ///
304    /// # Parameters
305    /// - `observability`: Observability settings to use.
306    ///
307    /// # Returns
308    /// The updated builder.
309    #[inline]
310    pub fn observability(mut self, observability: CasObservabilityConfig) -> Self {
311        self.observability = observability;
312        self
313    }
314
315    /// Enables contention alerting with the supplied thresholds.
316    ///
317    /// # Parameters
318    /// - `thresholds`: Thresholds used to classify hot contention.
319    ///
320    /// # Returns
321    /// The updated builder.
322    #[inline]
323    pub fn alert_on_contention(mut self, thresholds: ContentionThresholds) -> Self {
324        self.observability = self.observability.with_contention_thresholds(thresholds);
325        self
326    }
327
328    /// Enables retry-layer listener panic isolation.
329    ///
330    /// # Returns
331    /// The updated builder.
332    #[inline]
333    pub fn isolate_listener_panics(mut self) -> Self {
334        self.observability = self
335            .observability
336            .with_listener_panic_policy(ListenerPanicPolicy::Isolate);
337        self
338    }
339
340    /// Builds one executor after validating the settings.
341    ///
342    /// # Returns
343    /// A validated [`CasExecutor`].
344    ///
345    /// # Errors
346    /// Returns [`RetryConfigError`] when the configured retry settings are
347    /// invalid.
348    pub fn build(self) -> Result<CasExecutor<T, E>, RetryConfigError> {
349        if let Some(error) = self.max_attempts_error {
350            return Err(error);
351        }
352        let options = RetryOptions::new(
353            self.max_attempts,
354            self.max_operation_elapsed,
355            self.max_total_elapsed,
356            self.delay,
357            self.jitter,
358        )?;
359        Ok(CasExecutor::new(
360            options,
361            self.attempt_timeout,
362            self.timeout_policy,
363            self.observability,
364        ))
365    }
366
367    /// Builds one executor with the contention-adaptive strategy.
368    ///
369    /// # Returns
370    /// A configured [`CasExecutor`] suitable for contended writers.
371    pub fn build_contention_adaptive(self) -> Result<CasExecutor<T, E>, RetryConfigError> {
372        self.strategy(CasStrategy::ContentionAdaptive).build()
373    }
374
375    /// Builds one executor with the latency-first strategy.
376    ///
377    /// # Returns
378    /// A configured [`CasExecutor`] optimized for low latency.
379    pub fn build_latency_first(self) -> Result<CasExecutor<T, E>, RetryConfigError> {
380        self.strategy(CasStrategy::LatencyFirst).build()
381    }
382
383    /// Builds one executor with the reliability-first strategy.
384    ///
385    /// # Returns
386    /// A configured [`CasExecutor`] optimized for long retry windows.
387    pub fn build_reliability_first(self) -> Result<CasExecutor<T, E>, RetryConfigError> {
388        self.strategy(CasStrategy::ReliabilityFirst).build()
389    }
390}
391
392impl<T, E> Default for CasBuilder<T, E> {
393    /// Creates a default CAS builder.
394    ///
395    /// # Returns
396    /// A builder equivalent to [`CasBuilder::new`].
397    #[inline]
398    fn default() -> Self {
399        Self::new()
400    }
401}