Skip to main content

qubit_cas/executor/
cas_builder.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10//! Builder for [`crate::CasExecutor`].
11
12use std::marker::PhantomData;
13use std::time::Duration;
14
15use qubit_error::BoxError;
16use qubit_retry::{
17    RetryConfigError,
18    RetryDelay,
19    RetryJitter,
20    RetryOptions,
21};
22
23use crate::observability::{
24    CasObservabilityConfig,
25    ContentionThresholds,
26    ListenerPanicPolicy,
27};
28use crate::options::CasTimeoutPolicy;
29use crate::strategy::CasStrategy;
30
31use super::cas_executor::CasExecutor;
32
33/// Builder for [`CasExecutor`](crate::CasExecutor).
34pub struct CasBuilder<T, E = BoxError> {
35    /// Maximum attempts, including the initial attempt.
36    max_attempts: u32,
37    /// Maximum cumulative user operation time for the retry flow.
38    max_operation_elapsed: Option<Duration>,
39    /// Maximum monotonic elapsed time for the whole retry flow.
40    max_total_elapsed: Option<Duration>,
41    /// Base retry delay strategy.
42    delay: RetryDelay,
43    /// Jitter strategy applied to the base delay.
44    jitter: RetryJitter,
45    /// Optional async attempt timeout.
46    attempt_timeout: Option<Duration>,
47    /// Policy used when one attempt exceeds the timeout.
48    timeout_policy: CasTimeoutPolicy,
49    /// Observability settings.
50    observability: CasObservabilityConfig,
51    /// Stored validation error for zero max attempts.
52    max_attempts_error: Option<RetryConfigError>,
53    /// Marker preserving the executor type parameters.
54    marker: PhantomData<fn() -> (T, E)>,
55}
56
57impl<T, E> CasBuilder<T, E> {
58    /// Creates a builder with default retry options.
59    ///
60    /// # Returns
61    /// A [`CasBuilder`] using [`RetryOptions::default`].
62    pub fn new() -> Self {
63        let options = RetryOptions::default();
64        Self {
65            max_attempts: options.max_attempts(),
66            max_operation_elapsed: options.max_operation_elapsed(),
67            max_total_elapsed: options.max_total_elapsed(),
68            delay: options.delay().clone(),
69            jitter: options.jitter(),
70            attempt_timeout: None,
71            timeout_policy: CasTimeoutPolicy::Retry,
72            observability: CasObservabilityConfig::default(),
73            max_attempts_error: None,
74            marker: PhantomData,
75        }
76    }
77
78    /// Replaces builder settings from an existing retry option snapshot.
79    ///
80    /// # Parameters
81    /// - `options`: Retry option snapshot to install.
82    ///
83    /// # Returns
84    /// The updated builder.
85    pub fn options(mut self, options: RetryOptions) -> Self {
86        self.max_attempts = options.max_attempts();
87        self.max_operation_elapsed = options.max_operation_elapsed();
88        self.max_total_elapsed = options.max_total_elapsed();
89        self.delay = options.delay().clone();
90        self.jitter = options.jitter();
91        self.max_attempts_error = None;
92        self
93    }
94
95    /// Sets the maximum total attempts.
96    ///
97    /// # Parameters
98    /// - `max_attempts`: Maximum attempts, including the initial attempt.
99    ///
100    /// # Returns
101    /// The updated builder.
102    pub fn max_attempts(mut self, max_attempts: u32) -> Self {
103        if max_attempts == 0 {
104            self.max_attempts_error = Some(RetryConfigError::invalid_value(
105                qubit_retry::constants::KEY_MAX_ATTEMPTS,
106                "max_attempts must be greater than zero",
107            ));
108        } else {
109            self.max_attempts = max_attempts;
110            self.max_attempts_error = None;
111        }
112        self
113    }
114
115    /// Sets the maximum retries after the initial attempt.
116    ///
117    /// # Parameters
118    /// - `max_retries`: Maximum retries after the first attempt.
119    ///
120    /// # Returns
121    /// The updated builder.
122    #[inline]
123    pub fn max_retries(self, max_retries: u32) -> Self {
124        self.max_attempts(max_retries.saturating_add(1))
125    }
126
127    /// Sets the maximum cumulative user operation elapsed-time budget.
128    ///
129    /// # Parameters
130    /// - `max_operation_elapsed`: Optional cumulative user operation time budget.
131    ///
132    /// # Returns
133    /// The updated builder.
134    #[inline]
135    pub fn max_operation_elapsed(mut self, max_operation_elapsed: Option<Duration>) -> Self {
136        self.max_operation_elapsed = max_operation_elapsed;
137        self
138    }
139
140    /// Sets the maximum monotonic elapsed-time budget for the whole retry flow.
141    ///
142    /// # Parameters
143    /// - `max_total_elapsed`: Optional total retry-flow time budget.
144    ///
145    /// # Returns
146    /// The updated builder.
147    #[inline]
148    pub fn max_total_elapsed(mut self, max_total_elapsed: Option<Duration>) -> Self {
149        self.max_total_elapsed = max_total_elapsed;
150        self
151    }
152
153    /// Sets the base retry delay strategy.
154    ///
155    /// # Parameters
156    /// - `delay`: Delay strategy used between failed attempts.
157    ///
158    /// # Returns
159    /// The updated builder.
160    #[inline]
161    pub fn delay(mut self, delay: RetryDelay) -> Self {
162        self.delay = delay;
163        self
164    }
165
166    /// Uses immediate retries with no sleep.
167    ///
168    /// # Returns
169    /// The updated builder.
170    #[inline]
171    pub fn no_delay(self) -> Self {
172        self.delay(RetryDelay::none())
173    }
174
175    /// Uses one fixed retry delay.
176    ///
177    /// # Parameters
178    /// - `delay`: Delay slept before each retry.
179    ///
180    /// # Returns
181    /// The updated builder.
182    #[inline]
183    pub fn fixed_delay(self, delay: Duration) -> Self {
184        self.delay(RetryDelay::fixed(delay))
185    }
186
187    /// Uses one random retry delay range.
188    ///
189    /// # Parameters
190    /// - `min`: Inclusive minimum delay.
191    /// - `max`: Inclusive maximum delay.
192    ///
193    /// # Returns
194    /// The updated builder.
195    #[inline]
196    pub fn random_delay(self, min: Duration, max: Duration) -> Self {
197        self.delay(RetryDelay::random(min, max))
198    }
199
200    /// Uses exponential backoff with multiplier `2.0`.
201    ///
202    /// # Parameters
203    /// - `initial`: Initial retry delay.
204    /// - `max`: Maximum retry delay.
205    ///
206    /// # Returns
207    /// The updated builder.
208    #[inline]
209    pub fn exponential_backoff(self, initial: Duration, max: Duration) -> Self {
210        self.exponential_backoff_with_multiplier(initial, max, 2.0)
211    }
212
213    /// Uses exponential backoff with a custom multiplier.
214    ///
215    /// # Parameters
216    /// - `initial`: Initial retry delay.
217    /// - `max`: Maximum retry delay.
218    /// - `multiplier`: Multiplier applied after each failed attempt.
219    ///
220    /// # Returns
221    /// The updated builder.
222    #[inline]
223    pub fn exponential_backoff_with_multiplier(
224        self,
225        initial: Duration,
226        max: Duration,
227        multiplier: f64,
228    ) -> Self {
229        self.delay(RetryDelay::exponential(initial, max, multiplier))
230    }
231
232    /// Sets the jitter strategy.
233    ///
234    /// # Parameters
235    /// - `jitter`: Jitter strategy applied to retry delays.
236    ///
237    /// # Returns
238    /// The updated builder.
239    #[inline]
240    pub fn jitter(mut self, jitter: RetryJitter) -> Self {
241        self.jitter = jitter;
242        self
243    }
244
245    /// Sets relative jitter by factor.
246    ///
247    /// # Parameters
248    /// - `factor`: Relative jitter factor in `[0.0, 1.0]`.
249    ///
250    /// # Returns
251    /// The updated builder.
252    #[inline]
253    pub fn jitter_factor(self, factor: f64) -> Self {
254        self.jitter(RetryJitter::factor(factor))
255    }
256
257    /// Sets the async per-attempt timeout.
258    ///
259    /// # Parameters
260    /// - `attempt_timeout`: Timeout applied to each async CAS attempt.
261    ///
262    /// # Returns
263    /// The updated builder.
264    #[inline]
265    pub fn attempt_timeout(mut self, attempt_timeout: Option<Duration>) -> Self {
266        self.attempt_timeout = attempt_timeout;
267        self
268    }
269
270    /// Retries attempts that exceed the configured timeout.
271    ///
272    /// # Returns
273    /// The updated builder.
274    #[inline]
275    pub fn retry_on_timeout(mut self) -> Self {
276        self.timeout_policy = CasTimeoutPolicy::Retry;
277        self
278    }
279
280    /// Aborts the CAS flow when one attempt exceeds the timeout.
281    ///
282    /// # Returns
283    /// The updated builder.
284    #[inline]
285    pub fn abort_on_timeout(mut self) -> Self {
286        self.timeout_policy = CasTimeoutPolicy::Abort;
287        self
288    }
289
290    /// Applies a built-in CAS strategy to this builder.
291    ///
292    /// # Parameters
293    /// - `strategy`: Strategy profile to install.
294    ///
295    /// # Returns
296    /// The updated builder.
297    pub fn strategy(self, strategy: CasStrategy) -> Self {
298        let profile = strategy.profile();
299        let builder = self
300            .max_attempts(profile.max_attempts())
301            .max_operation_elapsed(Some(profile.max_operation_elapsed()))
302            .max_total_elapsed(profile.max_total_elapsed());
303        if let Some((initial, max, jitter)) = strategy.backoff() {
304            builder
305                .exponential_backoff(initial, max)
306                .jitter_factor(jitter)
307        } else {
308            builder.no_delay()
309        }
310    }
311
312    /// Installs observability configuration.
313    ///
314    /// # Parameters
315    /// - `observability`: Observability settings to use.
316    ///
317    /// # Returns
318    /// The updated builder.
319    #[inline]
320    pub fn observability(mut self, observability: CasObservabilityConfig) -> Self {
321        self.observability = observability;
322        self
323    }
324
325    /// Enables contention alerting with the supplied thresholds.
326    ///
327    /// # Parameters
328    /// - `thresholds`: Thresholds used to classify hot contention.
329    ///
330    /// # Returns
331    /// The updated builder.
332    #[inline]
333    pub fn alert_on_contention(mut self, thresholds: ContentionThresholds) -> Self {
334        self.observability = self.observability.with_contention_thresholds(thresholds);
335        self
336    }
337
338    /// Enables retry-layer listener panic isolation.
339    ///
340    /// # Returns
341    /// The updated builder.
342    #[inline]
343    pub fn isolate_listener_panics(mut self) -> Self {
344        self.observability = self
345            .observability
346            .with_listener_panic_policy(ListenerPanicPolicy::Isolate);
347        self
348    }
349
350    /// Builds one executor after validating the settings.
351    ///
352    /// # Returns
353    /// A validated [`CasExecutor`].
354    ///
355    /// # Errors
356    /// Returns [`RetryConfigError`] when the configured retry settings are
357    /// invalid.
358    pub fn build(self) -> Result<CasExecutor<T, E>, RetryConfigError> {
359        if let Some(error) = self.max_attempts_error {
360            return Err(error);
361        }
362        let options = RetryOptions::new(
363            self.max_attempts,
364            self.max_operation_elapsed,
365            self.max_total_elapsed,
366            self.delay,
367            self.jitter,
368        )?;
369        Ok(CasExecutor::new(
370            options,
371            self.attempt_timeout,
372            self.timeout_policy,
373            self.observability,
374        ))
375    }
376
377    /// Builds one executor with the contention-adaptive strategy.
378    ///
379    /// # Returns
380    /// A configured [`CasExecutor`] suitable for contended writers.
381    pub fn build_contention_adaptive(self) -> Result<CasExecutor<T, E>, RetryConfigError> {
382        self.strategy(CasStrategy::ContentionAdaptive).build()
383    }
384
385    /// Builds one executor with the latency-first strategy.
386    ///
387    /// # Returns
388    /// A configured [`CasExecutor`] optimized for low latency.
389    pub fn build_latency_first(self) -> Result<CasExecutor<T, E>, RetryConfigError> {
390        self.strategy(CasStrategy::LatencyFirst).build()
391    }
392
393    /// Builds one executor with the reliability-first strategy.
394    ///
395    /// # Returns
396    /// A configured [`CasExecutor`] optimized for long retry windows.
397    pub fn build_reliability_first(self) -> Result<CasExecutor<T, E>, RetryConfigError> {
398        self.strategy(CasStrategy::ReliabilityFirst).build()
399    }
400}
401
402impl<T, E> Default for CasBuilder<T, E> {
403    /// Creates a default CAS builder.
404    ///
405    /// # Returns
406    /// A builder equivalent to [`CasBuilder::new`].
407    #[inline]
408    fn default() -> Self {
409        Self::new()
410    }
411}