qubit_cas/executor/cas_builder.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 * SPDX-License-Identifier: Apache-2.0
6 *
7 * Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10//! Builder for [`crate::CasExecutor`].
11
12use std::marker::PhantomData;
13use std::time::Duration;
14
15use qubit_error::BoxError;
16use qubit_retry::{
17 RetryConfigError,
18 RetryDelay,
19 RetryJitter,
20 RetryOptions,
21};
22
23use crate::observability::{
24 CasObservabilityConfig,
25 ContentionThresholds,
26 ListenerPanicPolicy,
27};
28use crate::options::CasTimeoutPolicy;
29use crate::strategy::CasStrategy;
30
31use super::cas_executor::CasExecutor;
32
33/// Builder for [`CasExecutor`](crate::CasExecutor).
34pub struct CasBuilder<T, E = BoxError> {
35 /// Maximum attempts, including the initial attempt.
36 max_attempts: u32,
37 /// Maximum cumulative user operation time for the retry flow.
38 max_operation_elapsed: Option<Duration>,
39 /// Maximum monotonic elapsed time for the whole retry flow.
40 max_total_elapsed: Option<Duration>,
41 /// Base retry delay strategy.
42 delay: RetryDelay,
43 /// Jitter strategy applied to the base delay.
44 jitter: RetryJitter,
45 /// Optional async attempt timeout.
46 attempt_timeout: Option<Duration>,
47 /// Policy used when one attempt exceeds the timeout.
48 timeout_policy: CasTimeoutPolicy,
49 /// Observability settings.
50 observability: CasObservabilityConfig,
51 /// Stored validation error for zero max attempts.
52 max_attempts_error: Option<RetryConfigError>,
53 /// Marker preserving the executor type parameters.
54 marker: PhantomData<fn() -> (T, E)>,
55}
56
57impl<T, E> CasBuilder<T, E> {
58 /// Creates a builder with default retry options.
59 ///
60 /// # Returns
61 /// A [`CasBuilder`] using [`RetryOptions::default`].
62 pub fn new() -> Self {
63 let options = RetryOptions::default();
64 Self {
65 max_attempts: options.max_attempts(),
66 max_operation_elapsed: options.max_operation_elapsed(),
67 max_total_elapsed: options.max_total_elapsed(),
68 delay: options.delay().clone(),
69 jitter: options.jitter(),
70 attempt_timeout: None,
71 timeout_policy: CasTimeoutPolicy::Retry,
72 observability: CasObservabilityConfig::default(),
73 max_attempts_error: None,
74 marker: PhantomData,
75 }
76 }
77
78 /// Replaces builder settings from an existing retry option snapshot.
79 ///
80 /// # Parameters
81 /// - `options`: Retry option snapshot to install.
82 ///
83 /// # Returns
84 /// The updated builder.
85 pub fn options(mut self, options: RetryOptions) -> Self {
86 self.max_attempts = options.max_attempts();
87 self.max_operation_elapsed = options.max_operation_elapsed();
88 self.max_total_elapsed = options.max_total_elapsed();
89 self.delay = options.delay().clone();
90 self.jitter = options.jitter();
91 self.max_attempts_error = None;
92 self
93 }
94
95 /// Sets the maximum total attempts.
96 ///
97 /// # Parameters
98 /// - `max_attempts`: Maximum attempts, including the initial attempt.
99 ///
100 /// # Returns
101 /// The updated builder.
102 pub fn max_attempts(mut self, max_attempts: u32) -> Self {
103 if max_attempts == 0 {
104 self.max_attempts_error = Some(RetryConfigError::invalid_value(
105 qubit_retry::constants::KEY_MAX_ATTEMPTS,
106 "max_attempts must be greater than zero",
107 ));
108 } else {
109 self.max_attempts = max_attempts;
110 self.max_attempts_error = None;
111 }
112 self
113 }
114
115 /// Sets the maximum retries after the initial attempt.
116 ///
117 /// # Parameters
118 /// - `max_retries`: Maximum retries after the first attempt.
119 ///
120 /// # Returns
121 /// The updated builder.
122 #[inline]
123 pub fn max_retries(self, max_retries: u32) -> Self {
124 self.max_attempts(max_retries.saturating_add(1))
125 }
126
127 /// Sets the maximum cumulative user operation elapsed-time budget.
128 ///
129 /// # Parameters
130 /// - `max_operation_elapsed`: Optional cumulative user operation time budget.
131 ///
132 /// # Returns
133 /// The updated builder.
134 #[inline]
135 pub fn max_operation_elapsed(mut self, max_operation_elapsed: Option<Duration>) -> Self {
136 self.max_operation_elapsed = max_operation_elapsed;
137 self
138 }
139
140 /// Sets the maximum monotonic elapsed-time budget for the whole retry flow.
141 ///
142 /// # Parameters
143 /// - `max_total_elapsed`: Optional total retry-flow time budget.
144 ///
145 /// # Returns
146 /// The updated builder.
147 #[inline]
148 pub fn max_total_elapsed(mut self, max_total_elapsed: Option<Duration>) -> Self {
149 self.max_total_elapsed = max_total_elapsed;
150 self
151 }
152
153 /// Sets the base retry delay strategy.
154 ///
155 /// # Parameters
156 /// - `delay`: Delay strategy used between failed attempts.
157 ///
158 /// # Returns
159 /// The updated builder.
160 #[inline]
161 pub fn delay(mut self, delay: RetryDelay) -> Self {
162 self.delay = delay;
163 self
164 }
165
166 /// Uses immediate retries with no sleep.
167 ///
168 /// # Returns
169 /// The updated builder.
170 #[inline]
171 pub fn no_delay(self) -> Self {
172 self.delay(RetryDelay::none())
173 }
174
175 /// Uses one fixed retry delay.
176 ///
177 /// # Parameters
178 /// - `delay`: Delay slept before each retry.
179 ///
180 /// # Returns
181 /// The updated builder.
182 #[inline]
183 pub fn fixed_delay(self, delay: Duration) -> Self {
184 self.delay(RetryDelay::fixed(delay))
185 }
186
187 /// Uses one random retry delay range.
188 ///
189 /// # Parameters
190 /// - `min`: Inclusive minimum delay.
191 /// - `max`: Inclusive maximum delay.
192 ///
193 /// # Returns
194 /// The updated builder.
195 #[inline]
196 pub fn random_delay(self, min: Duration, max: Duration) -> Self {
197 self.delay(RetryDelay::random(min, max))
198 }
199
200 /// Uses exponential backoff with multiplier `2.0`.
201 ///
202 /// # Parameters
203 /// - `initial`: Initial retry delay.
204 /// - `max`: Maximum retry delay.
205 ///
206 /// # Returns
207 /// The updated builder.
208 #[inline]
209 pub fn exponential_backoff(self, initial: Duration, max: Duration) -> Self {
210 self.exponential_backoff_with_multiplier(initial, max, 2.0)
211 }
212
213 /// Uses exponential backoff with a custom multiplier.
214 ///
215 /// # Parameters
216 /// - `initial`: Initial retry delay.
217 /// - `max`: Maximum retry delay.
218 /// - `multiplier`: Multiplier applied after each failed attempt.
219 ///
220 /// # Returns
221 /// The updated builder.
222 #[inline]
223 pub fn exponential_backoff_with_multiplier(
224 self,
225 initial: Duration,
226 max: Duration,
227 multiplier: f64,
228 ) -> Self {
229 self.delay(RetryDelay::exponential(initial, max, multiplier))
230 }
231
232 /// Sets the jitter strategy.
233 ///
234 /// # Parameters
235 /// - `jitter`: Jitter strategy applied to retry delays.
236 ///
237 /// # Returns
238 /// The updated builder.
239 #[inline]
240 pub fn jitter(mut self, jitter: RetryJitter) -> Self {
241 self.jitter = jitter;
242 self
243 }
244
245 /// Sets relative jitter by factor.
246 ///
247 /// # Parameters
248 /// - `factor`: Relative jitter factor in `[0.0, 1.0]`.
249 ///
250 /// # Returns
251 /// The updated builder.
252 #[inline]
253 pub fn jitter_factor(self, factor: f64) -> Self {
254 self.jitter(RetryJitter::factor(factor))
255 }
256
257 /// Sets the async per-attempt timeout.
258 ///
259 /// # Parameters
260 /// - `attempt_timeout`: Timeout applied to each async CAS attempt.
261 ///
262 /// # Returns
263 /// The updated builder.
264 #[inline]
265 pub fn attempt_timeout(mut self, attempt_timeout: Option<Duration>) -> Self {
266 self.attempt_timeout = attempt_timeout;
267 self
268 }
269
270 /// Retries attempts that exceed the configured timeout.
271 ///
272 /// # Returns
273 /// The updated builder.
274 #[inline]
275 pub fn retry_on_timeout(mut self) -> Self {
276 self.timeout_policy = CasTimeoutPolicy::Retry;
277 self
278 }
279
280 /// Aborts the CAS flow when one attempt exceeds the timeout.
281 ///
282 /// # Returns
283 /// The updated builder.
284 #[inline]
285 pub fn abort_on_timeout(mut self) -> Self {
286 self.timeout_policy = CasTimeoutPolicy::Abort;
287 self
288 }
289
290 /// Applies a built-in CAS strategy to this builder.
291 ///
292 /// # Parameters
293 /// - `strategy`: Strategy profile to install.
294 ///
295 /// # Returns
296 /// The updated builder.
297 pub fn strategy(self, strategy: CasStrategy) -> Self {
298 let profile = strategy.profile();
299 let builder = self
300 .max_attempts(profile.max_attempts())
301 .max_operation_elapsed(Some(profile.max_operation_elapsed()))
302 .max_total_elapsed(profile.max_total_elapsed());
303 if let Some((initial, max, jitter)) = strategy.backoff() {
304 builder
305 .exponential_backoff(initial, max)
306 .jitter_factor(jitter)
307 } else {
308 builder.no_delay()
309 }
310 }
311
312 /// Installs observability configuration.
313 ///
314 /// # Parameters
315 /// - `observability`: Observability settings to use.
316 ///
317 /// # Returns
318 /// The updated builder.
319 #[inline]
320 pub fn observability(mut self, observability: CasObservabilityConfig) -> Self {
321 self.observability = observability;
322 self
323 }
324
325 /// Enables contention alerting with the supplied thresholds.
326 ///
327 /// # Parameters
328 /// - `thresholds`: Thresholds used to classify hot contention.
329 ///
330 /// # Returns
331 /// The updated builder.
332 #[inline]
333 pub fn alert_on_contention(mut self, thresholds: ContentionThresholds) -> Self {
334 self.observability = self.observability.with_contention_thresholds(thresholds);
335 self
336 }
337
338 /// Enables retry-layer listener panic isolation.
339 ///
340 /// # Returns
341 /// The updated builder.
342 #[inline]
343 pub fn isolate_listener_panics(mut self) -> Self {
344 self.observability = self
345 .observability
346 .with_listener_panic_policy(ListenerPanicPolicy::Isolate);
347 self
348 }
349
350 /// Builds one executor after validating the settings.
351 ///
352 /// # Returns
353 /// A validated [`CasExecutor`].
354 ///
355 /// # Errors
356 /// Returns [`RetryConfigError`] when the configured retry settings are
357 /// invalid.
358 pub fn build(self) -> Result<CasExecutor<T, E>, RetryConfigError> {
359 if let Some(error) = self.max_attempts_error {
360 return Err(error);
361 }
362 let options = RetryOptions::new(
363 self.max_attempts,
364 self.max_operation_elapsed,
365 self.max_total_elapsed,
366 self.delay,
367 self.jitter,
368 )?;
369 Ok(CasExecutor::new(
370 options,
371 self.attempt_timeout,
372 self.timeout_policy,
373 self.observability,
374 ))
375 }
376
377 /// Builds one executor with the contention-adaptive strategy.
378 ///
379 /// # Returns
380 /// A configured [`CasExecutor`] suitable for contended writers.
381 pub fn build_contention_adaptive(self) -> Result<CasExecutor<T, E>, RetryConfigError> {
382 self.strategy(CasStrategy::ContentionAdaptive).build()
383 }
384
385 /// Builds one executor with the latency-first strategy.
386 ///
387 /// # Returns
388 /// A configured [`CasExecutor`] optimized for low latency.
389 pub fn build_latency_first(self) -> Result<CasExecutor<T, E>, RetryConfigError> {
390 self.strategy(CasStrategy::LatencyFirst).build()
391 }
392
393 /// Builds one executor with the reliability-first strategy.
394 ///
395 /// # Returns
396 /// A configured [`CasExecutor`] optimized for long retry windows.
397 pub fn build_reliability_first(self) -> Result<CasExecutor<T, E>, RetryConfigError> {
398 self.strategy(CasStrategy::ReliabilityFirst).build()
399 }
400}
401
402impl<T, E> Default for CasBuilder<T, E> {
403 /// Creates a default CAS builder.
404 ///
405 /// # Returns
406 /// A builder equivalent to [`CasBuilder::new`].
407 #[inline]
408 fn default() -> Self {
409 Self::new()
410 }
411}