qubit_cas/executor/cas_builder.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026.
4 * Haixing Hu, Qubit Co. Ltd.
5 *
6 * All rights reserved.
7 *
8 ******************************************************************************/
9//! Builder for [`crate::CasExecutor`].
10
11use std::marker::PhantomData;
12use std::time::Duration;
13
14use qubit_common::BoxError;
15use qubit_retry::{RetryConfigError, RetryDelay, RetryJitter, RetryOptions};
16
17use crate::observability::{CasObservabilityConfig, ContentionThresholds, ListenerPanicPolicy};
18use crate::options::CasTimeoutPolicy;
19use crate::strategy::CasStrategy;
20
21use super::cas_executor::CasExecutor;
22
/// Builder for [`CasExecutor`](crate::CasExecutor).
///
/// Collects retry limits, delay and jitter strategies, the per-attempt
/// timeout behaviour, and observability settings. Nothing is validated until
/// the builder is consumed by `build`; an invalid `max_attempts` value is
/// remembered in `max_attempts_error` and reported at that point.
///
/// The error type parameter `E` defaults to [`BoxError`].
pub struct CasBuilder<T, E = BoxError> {
    /// Maximum attempts, including the initial attempt.
    max_attempts: u32,
    /// Maximum total elapsed time; `None` means no budget is configured here.
    max_elapsed: Option<Duration>,
    /// Base retry delay strategy.
    delay: RetryDelay,
    /// Jitter strategy applied to the base delay.
    jitter: RetryJitter,
    /// Optional async attempt timeout; `None` leaves attempts without one.
    attempt_timeout: Option<Duration>,
    /// Policy used when one attempt exceeds the timeout.
    timeout_policy: CasTimeoutPolicy,
    /// Observability settings.
    observability: CasObservabilityConfig,
    /// Stored validation error for zero max attempts.
    ///
    /// Set when `max_attempts` is called with `0`, cleared by any later valid
    /// `max_attempts` call or by `options`, and returned from `build`.
    max_attempts_error: Option<RetryConfigError>,
    /// Marker preserving the executor type parameters.
    ///
    /// `fn() -> (T, E)` is used so the builder names `T` and `E` without
    /// storing or owning values of either type.
    marker: PhantomData<fn() -> (T, E)>,
}
44
45impl<T, E> CasBuilder<T, E> {
46 /// Creates a builder with default retry options.
47 ///
48 /// # Returns
49 /// A [`CasBuilder`] using [`RetryOptions::default`].
50 pub fn new() -> Self {
51 let options = RetryOptions::default();
52 Self {
53 max_attempts: options.max_attempts(),
54 max_elapsed: options.max_elapsed(),
55 delay: options.delay().clone(),
56 jitter: options.jitter(),
57 attempt_timeout: None,
58 timeout_policy: CasTimeoutPolicy::Retry,
59 observability: CasObservabilityConfig::default(),
60 max_attempts_error: None,
61 marker: PhantomData,
62 }
63 }
64
65 /// Replaces builder settings from an existing retry option snapshot.
66 ///
67 /// # Parameters
68 /// - `options`: Retry option snapshot to install.
69 ///
70 /// # Returns
71 /// The updated builder.
72 pub fn options(mut self, options: RetryOptions) -> Self {
73 self.max_attempts = options.max_attempts();
74 self.max_elapsed = options.max_elapsed();
75 self.delay = options.delay().clone();
76 self.jitter = options.jitter();
77 self.max_attempts_error = None;
78 self
79 }
80
81 /// Sets the maximum total attempts.
82 ///
83 /// # Parameters
84 /// - `max_attempts`: Maximum attempts, including the initial attempt.
85 ///
86 /// # Returns
87 /// The updated builder.
88 pub fn max_attempts(mut self, max_attempts: u32) -> Self {
89 if max_attempts == 0 {
90 self.max_attempts_error = Some(RetryConfigError::invalid_value(
91 qubit_retry::constants::KEY_MAX_ATTEMPTS,
92 "max_attempts must be greater than zero",
93 ));
94 } else {
95 self.max_attempts = max_attempts;
96 self.max_attempts_error = None;
97 }
98 self
99 }
100
101 /// Sets the maximum retries after the initial attempt.
102 ///
103 /// # Parameters
104 /// - `max_retries`: Maximum retries after the first attempt.
105 ///
106 /// # Returns
107 /// The updated builder.
108 #[inline]
109 pub fn max_retries(self, max_retries: u32) -> Self {
110 self.max_attempts(max_retries.saturating_add(1))
111 }
112
113 /// Sets the maximum elapsed-time budget.
114 ///
115 /// # Parameters
116 /// - `max_elapsed`: Optional total elapsed-time budget.
117 ///
118 /// # Returns
119 /// The updated builder.
120 #[inline]
121 pub fn max_elapsed(mut self, max_elapsed: Option<Duration>) -> Self {
122 self.max_elapsed = max_elapsed;
123 self
124 }
125
126 /// Sets the base retry delay strategy.
127 ///
128 /// # Parameters
129 /// - `delay`: Delay strategy used between failed attempts.
130 ///
131 /// # Returns
132 /// The updated builder.
133 #[inline]
134 pub fn delay(mut self, delay: RetryDelay) -> Self {
135 self.delay = delay;
136 self
137 }
138
139 /// Uses immediate retries with no sleep.
140 ///
141 /// # Returns
142 /// The updated builder.
143 #[inline]
144 pub fn no_delay(self) -> Self {
145 self.delay(RetryDelay::none())
146 }
147
148 /// Uses one fixed retry delay.
149 ///
150 /// # Parameters
151 /// - `delay`: Delay slept before each retry.
152 ///
153 /// # Returns
154 /// The updated builder.
155 #[inline]
156 pub fn fixed_delay(self, delay: Duration) -> Self {
157 self.delay(RetryDelay::fixed(delay))
158 }
159
160 /// Uses one random retry delay range.
161 ///
162 /// # Parameters
163 /// - `min`: Inclusive minimum delay.
164 /// - `max`: Inclusive maximum delay.
165 ///
166 /// # Returns
167 /// The updated builder.
168 #[inline]
169 pub fn random_delay(self, min: Duration, max: Duration) -> Self {
170 self.delay(RetryDelay::random(min, max))
171 }
172
173 /// Uses exponential backoff with multiplier `2.0`.
174 ///
175 /// # Parameters
176 /// - `initial`: Initial retry delay.
177 /// - `max`: Maximum retry delay.
178 ///
179 /// # Returns
180 /// The updated builder.
181 #[inline]
182 pub fn exponential_backoff(self, initial: Duration, max: Duration) -> Self {
183 self.exponential_backoff_with_multiplier(initial, max, 2.0)
184 }
185
186 /// Uses exponential backoff with a custom multiplier.
187 ///
188 /// # Parameters
189 /// - `initial`: Initial retry delay.
190 /// - `max`: Maximum retry delay.
191 /// - `multiplier`: Multiplier applied after each failed attempt.
192 ///
193 /// # Returns
194 /// The updated builder.
195 #[inline]
196 pub fn exponential_backoff_with_multiplier(
197 self,
198 initial: Duration,
199 max: Duration,
200 multiplier: f64,
201 ) -> Self {
202 self.delay(RetryDelay::exponential(initial, max, multiplier))
203 }
204
205 /// Sets the jitter strategy.
206 ///
207 /// # Parameters
208 /// - `jitter`: Jitter strategy applied to retry delays.
209 ///
210 /// # Returns
211 /// The updated builder.
212 #[inline]
213 pub fn jitter(mut self, jitter: RetryJitter) -> Self {
214 self.jitter = jitter;
215 self
216 }
217
218 /// Sets relative jitter by factor.
219 ///
220 /// # Parameters
221 /// - `factor`: Relative jitter factor in `[0.0, 1.0]`.
222 ///
223 /// # Returns
224 /// The updated builder.
225 #[inline]
226 pub fn jitter_factor(self, factor: f64) -> Self {
227 self.jitter(RetryJitter::factor(factor))
228 }
229
230 /// Sets the async per-attempt timeout.
231 ///
232 /// # Parameters
233 /// - `attempt_timeout`: Timeout applied to each async CAS attempt.
234 ///
235 /// # Returns
236 /// The updated builder.
237 #[inline]
238 pub fn attempt_timeout(mut self, attempt_timeout: Option<Duration>) -> Self {
239 self.attempt_timeout = attempt_timeout;
240 self
241 }
242
243 /// Retries attempts that exceed the configured timeout.
244 ///
245 /// # Returns
246 /// The updated builder.
247 #[inline]
248 pub fn retry_on_timeout(mut self) -> Self {
249 self.timeout_policy = CasTimeoutPolicy::Retry;
250 self
251 }
252
253 /// Aborts the CAS flow when one attempt exceeds the timeout.
254 ///
255 /// # Returns
256 /// The updated builder.
257 #[inline]
258 pub fn abort_on_timeout(mut self) -> Self {
259 self.timeout_policy = CasTimeoutPolicy::Abort;
260 self
261 }
262
263 /// Applies a built-in CAS strategy to this builder.
264 ///
265 /// # Parameters
266 /// - `strategy`: Strategy profile to install.
267 ///
268 /// # Returns
269 /// The updated builder.
270 pub fn strategy(self, strategy: CasStrategy) -> Self {
271 let profile = strategy.profile();
272 let builder = self
273 .max_attempts(profile.max_attempts())
274 .max_elapsed(Some(profile.max_elapsed()));
275 if let Some((initial, max, jitter)) = strategy.backoff() {
276 builder
277 .exponential_backoff(initial, max)
278 .jitter_factor(jitter)
279 } else {
280 builder.no_delay()
281 }
282 }
283
284 /// Installs observability configuration.
285 ///
286 /// # Parameters
287 /// - `observability`: Observability settings to use.
288 ///
289 /// # Returns
290 /// The updated builder.
291 #[inline]
292 pub fn observability(mut self, observability: CasObservabilityConfig) -> Self {
293 self.observability = observability;
294 self
295 }
296
297 /// Enables contention alerting with the supplied thresholds.
298 ///
299 /// # Parameters
300 /// - `thresholds`: Thresholds used to classify hot contention.
301 ///
302 /// # Returns
303 /// The updated builder.
304 #[inline]
305 pub fn alert_on_contention(mut self, thresholds: ContentionThresholds) -> Self {
306 self.observability = self.observability.with_contention_thresholds(thresholds);
307 self
308 }
309
310 /// Enables retry-layer listener panic isolation.
311 ///
312 /// # Returns
313 /// The updated builder.
314 #[inline]
315 pub fn isolate_listener_panics(mut self) -> Self {
316 self.observability = self
317 .observability
318 .with_listener_panic_policy(ListenerPanicPolicy::Isolate);
319 self
320 }
321
322 /// Builds one executor after validating the settings.
323 ///
324 /// # Returns
325 /// A validated [`CasExecutor`].
326 ///
327 /// # Errors
328 /// Returns [`RetryConfigError`] when the configured retry settings are
329 /// invalid.
330 pub fn build(self) -> Result<CasExecutor<T, E>, RetryConfigError> {
331 if let Some(error) = self.max_attempts_error {
332 return Err(error);
333 }
334 let options =
335 RetryOptions::new(self.max_attempts, self.max_elapsed, self.delay, self.jitter)?;
336 Ok(CasExecutor::new(
337 options,
338 self.attempt_timeout,
339 self.timeout_policy,
340 self.observability,
341 ))
342 }
343
344 /// Builds one executor with the contention-adaptive strategy.
345 ///
346 /// # Returns
347 /// A configured [`CasExecutor`] suitable for contended writers.
348 pub fn build_contention_adaptive(self) -> Result<CasExecutor<T, E>, RetryConfigError> {
349 self.strategy(CasStrategy::ContentionAdaptive).build()
350 }
351
352 /// Builds one executor with the latency-first strategy.
353 ///
354 /// # Returns
355 /// A configured [`CasExecutor`] optimized for low latency.
356 pub fn build_latency_first(self) -> Result<CasExecutor<T, E>, RetryConfigError> {
357 self.strategy(CasStrategy::LatencyFirst).build()
358 }
359
360 /// Builds one executor with the reliability-first strategy.
361 ///
362 /// # Returns
363 /// A configured [`CasExecutor`] optimized for long retry windows.
364 pub fn build_reliability_first(self) -> Result<CasExecutor<T, E>, RetryConfigError> {
365 self.strategy(CasStrategy::ReliabilityFirst).build()
366 }
367}
368
369impl<T, E> Default for CasBuilder<T, E> {
370 /// Creates a default CAS builder.
371 ///
372 /// # Returns
373 /// A builder equivalent to [`CasBuilder::new`].
374 #[inline]
375 fn default() -> Self {
376 Self::new()
377 }
378}