qubit_cas/executor/cas_builder.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026.
4 * Haixing Hu, Qubit Co. Ltd.
5 *
6 * All rights reserved.
7 *
8 ******************************************************************************/
9//! Builder for [`crate::CasExecutor`].
10
11use std::marker::PhantomData;
12use std::time::Duration;
13
14use qubit_common::BoxError;
15use qubit_retry::{RetryConfigError, RetryDelay, RetryJitter, RetryOptions};
16
17use crate::observability::{CasObservabilityConfig, ContentionThresholds, ListenerPanicPolicy};
18use crate::options::CasTimeoutPolicy;
19use crate::strategy::CasStrategy;
20
21use super::cas_executor::CasExecutor;
22
23/// Builder for [`CasExecutor`](crate::CasExecutor).
24pub struct CasBuilder<T, E = BoxError> {
25 /// Maximum attempts, including the initial attempt.
26 max_attempts: u32,
27 /// Maximum cumulative user operation time for the retry flow.
28 max_operation_elapsed: Option<Duration>,
29 /// Maximum monotonic elapsed time for the whole retry flow.
30 max_total_elapsed: Option<Duration>,
31 /// Base retry delay strategy.
32 delay: RetryDelay,
33 /// Jitter strategy applied to the base delay.
34 jitter: RetryJitter,
35 /// Optional async attempt timeout.
36 attempt_timeout: Option<Duration>,
37 /// Policy used when one attempt exceeds the timeout.
38 timeout_policy: CasTimeoutPolicy,
39 /// Observability settings.
40 observability: CasObservabilityConfig,
41 /// Stored validation error for zero max attempts.
42 max_attempts_error: Option<RetryConfigError>,
43 /// Marker preserving the executor type parameters.
44 marker: PhantomData<fn() -> (T, E)>,
45}
46
47impl<T, E> CasBuilder<T, E> {
48 /// Creates a builder with default retry options.
49 ///
50 /// # Returns
51 /// A [`CasBuilder`] using [`RetryOptions::default`].
52 pub fn new() -> Self {
53 let options = RetryOptions::default();
54 Self {
55 max_attempts: options.max_attempts(),
56 max_operation_elapsed: options.max_operation_elapsed(),
57 max_total_elapsed: options.max_total_elapsed(),
58 delay: options.delay().clone(),
59 jitter: options.jitter(),
60 attempt_timeout: None,
61 timeout_policy: CasTimeoutPolicy::Retry,
62 observability: CasObservabilityConfig::default(),
63 max_attempts_error: None,
64 marker: PhantomData,
65 }
66 }
67
68 /// Replaces builder settings from an existing retry option snapshot.
69 ///
70 /// # Parameters
71 /// - `options`: Retry option snapshot to install.
72 ///
73 /// # Returns
74 /// The updated builder.
75 pub fn options(mut self, options: RetryOptions) -> Self {
76 self.max_attempts = options.max_attempts();
77 self.max_operation_elapsed = options.max_operation_elapsed();
78 self.max_total_elapsed = options.max_total_elapsed();
79 self.delay = options.delay().clone();
80 self.jitter = options.jitter();
81 self.max_attempts_error = None;
82 self
83 }
84
85 /// Sets the maximum total attempts.
86 ///
87 /// # Parameters
88 /// - `max_attempts`: Maximum attempts, including the initial attempt.
89 ///
90 /// # Returns
91 /// The updated builder.
92 pub fn max_attempts(mut self, max_attempts: u32) -> Self {
93 if max_attempts == 0 {
94 self.max_attempts_error = Some(RetryConfigError::invalid_value(
95 qubit_retry::constants::KEY_MAX_ATTEMPTS,
96 "max_attempts must be greater than zero",
97 ));
98 } else {
99 self.max_attempts = max_attempts;
100 self.max_attempts_error = None;
101 }
102 self
103 }
104
105 /// Sets the maximum retries after the initial attempt.
106 ///
107 /// # Parameters
108 /// - `max_retries`: Maximum retries after the first attempt.
109 ///
110 /// # Returns
111 /// The updated builder.
112 #[inline]
113 pub fn max_retries(self, max_retries: u32) -> Self {
114 self.max_attempts(max_retries.saturating_add(1))
115 }
116
117 /// Sets the maximum cumulative user operation elapsed-time budget.
118 ///
119 /// # Parameters
120 /// - `max_operation_elapsed`: Optional cumulative user operation time budget.
121 ///
122 /// # Returns
123 /// The updated builder.
124 #[inline]
125 pub fn max_operation_elapsed(mut self, max_operation_elapsed: Option<Duration>) -> Self {
126 self.max_operation_elapsed = max_operation_elapsed;
127 self
128 }
129
130 /// Sets the maximum monotonic elapsed-time budget for the whole retry flow.
131 ///
132 /// # Parameters
133 /// - `max_total_elapsed`: Optional total retry-flow time budget.
134 ///
135 /// # Returns
136 /// The updated builder.
137 #[inline]
138 pub fn max_total_elapsed(mut self, max_total_elapsed: Option<Duration>) -> Self {
139 self.max_total_elapsed = max_total_elapsed;
140 self
141 }
142
143 /// Sets the base retry delay strategy.
144 ///
145 /// # Parameters
146 /// - `delay`: Delay strategy used between failed attempts.
147 ///
148 /// # Returns
149 /// The updated builder.
150 #[inline]
151 pub fn delay(mut self, delay: RetryDelay) -> Self {
152 self.delay = delay;
153 self
154 }
155
156 /// Uses immediate retries with no sleep.
157 ///
158 /// # Returns
159 /// The updated builder.
160 #[inline]
161 pub fn no_delay(self) -> Self {
162 self.delay(RetryDelay::none())
163 }
164
165 /// Uses one fixed retry delay.
166 ///
167 /// # Parameters
168 /// - `delay`: Delay slept before each retry.
169 ///
170 /// # Returns
171 /// The updated builder.
172 #[inline]
173 pub fn fixed_delay(self, delay: Duration) -> Self {
174 self.delay(RetryDelay::fixed(delay))
175 }
176
177 /// Uses one random retry delay range.
178 ///
179 /// # Parameters
180 /// - `min`: Inclusive minimum delay.
181 /// - `max`: Inclusive maximum delay.
182 ///
183 /// # Returns
184 /// The updated builder.
185 #[inline]
186 pub fn random_delay(self, min: Duration, max: Duration) -> Self {
187 self.delay(RetryDelay::random(min, max))
188 }
189
190 /// Uses exponential backoff with multiplier `2.0`.
191 ///
192 /// # Parameters
193 /// - `initial`: Initial retry delay.
194 /// - `max`: Maximum retry delay.
195 ///
196 /// # Returns
197 /// The updated builder.
198 #[inline]
199 pub fn exponential_backoff(self, initial: Duration, max: Duration) -> Self {
200 self.exponential_backoff_with_multiplier(initial, max, 2.0)
201 }
202
203 /// Uses exponential backoff with a custom multiplier.
204 ///
205 /// # Parameters
206 /// - `initial`: Initial retry delay.
207 /// - `max`: Maximum retry delay.
208 /// - `multiplier`: Multiplier applied after each failed attempt.
209 ///
210 /// # Returns
211 /// The updated builder.
212 #[inline]
213 pub fn exponential_backoff_with_multiplier(
214 self,
215 initial: Duration,
216 max: Duration,
217 multiplier: f64,
218 ) -> Self {
219 self.delay(RetryDelay::exponential(initial, max, multiplier))
220 }
221
222 /// Sets the jitter strategy.
223 ///
224 /// # Parameters
225 /// - `jitter`: Jitter strategy applied to retry delays.
226 ///
227 /// # Returns
228 /// The updated builder.
229 #[inline]
230 pub fn jitter(mut self, jitter: RetryJitter) -> Self {
231 self.jitter = jitter;
232 self
233 }
234
235 /// Sets relative jitter by factor.
236 ///
237 /// # Parameters
238 /// - `factor`: Relative jitter factor in `[0.0, 1.0]`.
239 ///
240 /// # Returns
241 /// The updated builder.
242 #[inline]
243 pub fn jitter_factor(self, factor: f64) -> Self {
244 self.jitter(RetryJitter::factor(factor))
245 }
246
247 /// Sets the async per-attempt timeout.
248 ///
249 /// # Parameters
250 /// - `attempt_timeout`: Timeout applied to each async CAS attempt.
251 ///
252 /// # Returns
253 /// The updated builder.
254 #[inline]
255 pub fn attempt_timeout(mut self, attempt_timeout: Option<Duration>) -> Self {
256 self.attempt_timeout = attempt_timeout;
257 self
258 }
259
260 /// Retries attempts that exceed the configured timeout.
261 ///
262 /// # Returns
263 /// The updated builder.
264 #[inline]
265 pub fn retry_on_timeout(mut self) -> Self {
266 self.timeout_policy = CasTimeoutPolicy::Retry;
267 self
268 }
269
270 /// Aborts the CAS flow when one attempt exceeds the timeout.
271 ///
272 /// # Returns
273 /// The updated builder.
274 #[inline]
275 pub fn abort_on_timeout(mut self) -> Self {
276 self.timeout_policy = CasTimeoutPolicy::Abort;
277 self
278 }
279
280 /// Applies a built-in CAS strategy to this builder.
281 ///
282 /// # Parameters
283 /// - `strategy`: Strategy profile to install.
284 ///
285 /// # Returns
286 /// The updated builder.
287 pub fn strategy(self, strategy: CasStrategy) -> Self {
288 let profile = strategy.profile();
289 let builder = self
290 .max_attempts(profile.max_attempts())
291 .max_operation_elapsed(Some(profile.max_operation_elapsed()))
292 .max_total_elapsed(profile.max_total_elapsed());
293 if let Some((initial, max, jitter)) = strategy.backoff() {
294 builder
295 .exponential_backoff(initial, max)
296 .jitter_factor(jitter)
297 } else {
298 builder.no_delay()
299 }
300 }
301
302 /// Installs observability configuration.
303 ///
304 /// # Parameters
305 /// - `observability`: Observability settings to use.
306 ///
307 /// # Returns
308 /// The updated builder.
309 #[inline]
310 pub fn observability(mut self, observability: CasObservabilityConfig) -> Self {
311 self.observability = observability;
312 self
313 }
314
315 /// Enables contention alerting with the supplied thresholds.
316 ///
317 /// # Parameters
318 /// - `thresholds`: Thresholds used to classify hot contention.
319 ///
320 /// # Returns
321 /// The updated builder.
322 #[inline]
323 pub fn alert_on_contention(mut self, thresholds: ContentionThresholds) -> Self {
324 self.observability = self.observability.with_contention_thresholds(thresholds);
325 self
326 }
327
328 /// Enables retry-layer listener panic isolation.
329 ///
330 /// # Returns
331 /// The updated builder.
332 #[inline]
333 pub fn isolate_listener_panics(mut self) -> Self {
334 self.observability = self
335 .observability
336 .with_listener_panic_policy(ListenerPanicPolicy::Isolate);
337 self
338 }
339
340 /// Builds one executor after validating the settings.
341 ///
342 /// # Returns
343 /// A validated [`CasExecutor`].
344 ///
345 /// # Errors
346 /// Returns [`RetryConfigError`] when the configured retry settings are
347 /// invalid.
348 pub fn build(self) -> Result<CasExecutor<T, E>, RetryConfigError> {
349 if let Some(error) = self.max_attempts_error {
350 return Err(error);
351 }
352 let options = RetryOptions::new(
353 self.max_attempts,
354 self.max_operation_elapsed,
355 self.max_total_elapsed,
356 self.delay,
357 self.jitter,
358 )?;
359 Ok(CasExecutor::new(
360 options,
361 self.attempt_timeout,
362 self.timeout_policy,
363 self.observability,
364 ))
365 }
366
367 /// Builds one executor with the contention-adaptive strategy.
368 ///
369 /// # Returns
370 /// A configured [`CasExecutor`] suitable for contended writers.
371 pub fn build_contention_adaptive(self) -> Result<CasExecutor<T, E>, RetryConfigError> {
372 self.strategy(CasStrategy::ContentionAdaptive).build()
373 }
374
375 /// Builds one executor with the latency-first strategy.
376 ///
377 /// # Returns
378 /// A configured [`CasExecutor`] optimized for low latency.
379 pub fn build_latency_first(self) -> Result<CasExecutor<T, E>, RetryConfigError> {
380 self.strategy(CasStrategy::LatencyFirst).build()
381 }
382
383 /// Builds one executor with the reliability-first strategy.
384 ///
385 /// # Returns
386 /// A configured [`CasExecutor`] optimized for long retry windows.
387 pub fn build_reliability_first(self) -> Result<CasExecutor<T, E>, RetryConfigError> {
388 self.strategy(CasStrategy::ReliabilityFirst).build()
389 }
390}
391
392impl<T, E> Default for CasBuilder<T, E> {
393 /// Creates a default CAS builder.
394 ///
395 /// # Returns
396 /// A builder equivalent to [`CasBuilder::new`].
397 #[inline]
398 fn default() -> Self {
399 Self::new()
400 }
401}