qubit_cas/executor/cas_builder.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 * SPDX-License-Identifier: Apache-2.0
6 *
7 * Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10//! Builder for [`crate::CasExecutor`].
11
12use std::marker::PhantomData;
13use std::time::Duration;
14
15use qubit_error::BoxError;
16use qubit_retry::{RetryConfigError, RetryDelay, RetryJitter, RetryOptions};
17
18use crate::observability::{CasObservabilityConfig, ContentionThresholds, ListenerPanicPolicy};
19use crate::options::CasTimeoutPolicy;
20use crate::strategy::CasStrategy;
21
22use super::cas_executor::CasExecutor;
23
24/// Builder for [`CasExecutor`](crate::CasExecutor).
25pub struct CasBuilder<T, E = BoxError> {
26 /// Maximum attempts, including the initial attempt.
27 max_attempts: u32,
28 /// Maximum cumulative user operation time for the retry flow.
29 max_operation_elapsed: Option<Duration>,
30 /// Maximum monotonic elapsed time for the whole retry flow.
31 max_total_elapsed: Option<Duration>,
32 /// Base retry delay strategy.
33 delay: RetryDelay,
34 /// Jitter strategy applied to the base delay.
35 jitter: RetryJitter,
36 /// Optional async attempt timeout.
37 attempt_timeout: Option<Duration>,
38 /// Policy used when one attempt exceeds the timeout.
39 timeout_policy: CasTimeoutPolicy,
40 /// Observability settings.
41 observability: CasObservabilityConfig,
42 /// Stored validation error for zero max attempts.
43 max_attempts_error: Option<RetryConfigError>,
44 /// Marker preserving the executor type parameters.
45 marker: PhantomData<fn() -> (T, E)>,
46}
47
48impl<T, E> CasBuilder<T, E> {
49 /// Creates a builder with default retry options.
50 ///
51 /// # Returns
52 /// A [`CasBuilder`] using [`RetryOptions::default`].
53 pub fn new() -> Self {
54 let options = RetryOptions::default();
55 Self {
56 max_attempts: options.max_attempts(),
57 max_operation_elapsed: options.max_operation_elapsed(),
58 max_total_elapsed: options.max_total_elapsed(),
59 delay: options.delay().clone(),
60 jitter: options.jitter(),
61 attempt_timeout: None,
62 timeout_policy: CasTimeoutPolicy::Retry,
63 observability: CasObservabilityConfig::default(),
64 max_attempts_error: None,
65 marker: PhantomData,
66 }
67 }
68
69 /// Replaces builder settings from an existing retry option snapshot.
70 ///
71 /// # Parameters
72 /// - `options`: Retry option snapshot to install.
73 ///
74 /// # Returns
75 /// The updated builder.
76 pub fn options(mut self, options: RetryOptions) -> Self {
77 self.max_attempts = options.max_attempts();
78 self.max_operation_elapsed = options.max_operation_elapsed();
79 self.max_total_elapsed = options.max_total_elapsed();
80 self.delay = options.delay().clone();
81 self.jitter = options.jitter();
82 self.max_attempts_error = None;
83 self
84 }
85
86 /// Sets the maximum total attempts.
87 ///
88 /// # Parameters
89 /// - `max_attempts`: Maximum attempts, including the initial attempt.
90 ///
91 /// # Returns
92 /// The updated builder.
93 pub fn max_attempts(mut self, max_attempts: u32) -> Self {
94 if max_attempts == 0 {
95 self.max_attempts_error = Some(RetryConfigError::invalid_value(
96 qubit_retry::constants::KEY_MAX_ATTEMPTS,
97 "max_attempts must be greater than zero",
98 ));
99 } else {
100 self.max_attempts = max_attempts;
101 self.max_attempts_error = None;
102 }
103 self
104 }
105
106 /// Sets the maximum retries after the initial attempt.
107 ///
108 /// # Parameters
109 /// - `max_retries`: Maximum retries after the first attempt.
110 ///
111 /// # Returns
112 /// The updated builder.
113 #[inline]
114 pub fn max_retries(self, max_retries: u32) -> Self {
115 self.max_attempts(max_retries.saturating_add(1))
116 }
117
118 /// Sets the maximum cumulative user operation elapsed-time budget.
119 ///
120 /// # Parameters
121 /// - `max_operation_elapsed`: Optional cumulative user operation time budget.
122 ///
123 /// # Returns
124 /// The updated builder.
125 #[inline]
126 pub fn max_operation_elapsed(mut self, max_operation_elapsed: Option<Duration>) -> Self {
127 self.max_operation_elapsed = max_operation_elapsed;
128 self
129 }
130
131 /// Sets the maximum monotonic elapsed-time budget for the whole retry flow.
132 ///
133 /// # Parameters
134 /// - `max_total_elapsed`: Optional total retry-flow time budget.
135 ///
136 /// # Returns
137 /// The updated builder.
138 #[inline]
139 pub fn max_total_elapsed(mut self, max_total_elapsed: Option<Duration>) -> Self {
140 self.max_total_elapsed = max_total_elapsed;
141 self
142 }
143
144 /// Sets the base retry delay strategy.
145 ///
146 /// # Parameters
147 /// - `delay`: Delay strategy used between failed attempts.
148 ///
149 /// # Returns
150 /// The updated builder.
151 #[inline]
152 pub fn delay(mut self, delay: RetryDelay) -> Self {
153 self.delay = delay;
154 self
155 }
156
157 /// Uses immediate retries with no sleep.
158 ///
159 /// # Returns
160 /// The updated builder.
161 #[inline]
162 pub fn no_delay(self) -> Self {
163 self.delay(RetryDelay::none())
164 }
165
166 /// Uses one fixed retry delay.
167 ///
168 /// # Parameters
169 /// - `delay`: Delay slept before each retry.
170 ///
171 /// # Returns
172 /// The updated builder.
173 #[inline]
174 pub fn fixed_delay(self, delay: Duration) -> Self {
175 self.delay(RetryDelay::fixed(delay))
176 }
177
178 /// Uses one random retry delay range.
179 ///
180 /// # Parameters
181 /// - `min`: Inclusive minimum delay.
182 /// - `max`: Inclusive maximum delay.
183 ///
184 /// # Returns
185 /// The updated builder.
186 #[inline]
187 pub fn random_delay(self, min: Duration, max: Duration) -> Self {
188 self.delay(RetryDelay::random(min, max))
189 }
190
191 /// Uses exponential backoff with multiplier `2.0`.
192 ///
193 /// # Parameters
194 /// - `initial`: Initial retry delay.
195 /// - `max`: Maximum retry delay.
196 ///
197 /// # Returns
198 /// The updated builder.
199 #[inline]
200 pub fn exponential_backoff(self, initial: Duration, max: Duration) -> Self {
201 self.exponential_backoff_with_multiplier(initial, max, 2.0)
202 }
203
204 /// Uses exponential backoff with a custom multiplier.
205 ///
206 /// # Parameters
207 /// - `initial`: Initial retry delay.
208 /// - `max`: Maximum retry delay.
209 /// - `multiplier`: Multiplier applied after each failed attempt.
210 ///
211 /// # Returns
212 /// The updated builder.
213 #[inline]
214 pub fn exponential_backoff_with_multiplier(
215 self,
216 initial: Duration,
217 max: Duration,
218 multiplier: f64,
219 ) -> Self {
220 self.delay(RetryDelay::exponential(initial, max, multiplier))
221 }
222
223 /// Sets the jitter strategy.
224 ///
225 /// # Parameters
226 /// - `jitter`: Jitter strategy applied to retry delays.
227 ///
228 /// # Returns
229 /// The updated builder.
230 #[inline]
231 pub fn jitter(mut self, jitter: RetryJitter) -> Self {
232 self.jitter = jitter;
233 self
234 }
235
236 /// Sets relative jitter by factor.
237 ///
238 /// # Parameters
239 /// - `factor`: Relative jitter factor in `[0.0, 1.0]`.
240 ///
241 /// # Returns
242 /// The updated builder.
243 #[inline]
244 pub fn jitter_factor(self, factor: f64) -> Self {
245 self.jitter(RetryJitter::factor(factor))
246 }
247
248 /// Sets the async per-attempt timeout.
249 ///
250 /// # Parameters
251 /// - `attempt_timeout`: Timeout applied to each async CAS attempt.
252 ///
253 /// # Returns
254 /// The updated builder.
255 #[inline]
256 pub fn attempt_timeout(mut self, attempt_timeout: Option<Duration>) -> Self {
257 self.attempt_timeout = attempt_timeout;
258 self
259 }
260
261 /// Retries attempts that exceed the configured timeout.
262 ///
263 /// # Returns
264 /// The updated builder.
265 #[inline]
266 pub fn retry_on_timeout(mut self) -> Self {
267 self.timeout_policy = CasTimeoutPolicy::Retry;
268 self
269 }
270
271 /// Aborts the CAS flow when one attempt exceeds the timeout.
272 ///
273 /// # Returns
274 /// The updated builder.
275 #[inline]
276 pub fn abort_on_timeout(mut self) -> Self {
277 self.timeout_policy = CasTimeoutPolicy::Abort;
278 self
279 }
280
281 /// Applies a built-in CAS strategy to this builder.
282 ///
283 /// # Parameters
284 /// - `strategy`: Strategy profile to install.
285 ///
286 /// # Returns
287 /// The updated builder.
288 pub fn strategy(self, strategy: CasStrategy) -> Self {
289 let profile = strategy.profile();
290 let builder = self
291 .max_attempts(profile.max_attempts())
292 .max_operation_elapsed(Some(profile.max_operation_elapsed()))
293 .max_total_elapsed(profile.max_total_elapsed());
294 if let Some((initial, max, jitter)) = strategy.backoff() {
295 builder
296 .exponential_backoff(initial, max)
297 .jitter_factor(jitter)
298 } else {
299 builder.no_delay()
300 }
301 }
302
303 /// Installs observability configuration.
304 ///
305 /// # Parameters
306 /// - `observability`: Observability settings to use.
307 ///
308 /// # Returns
309 /// The updated builder.
310 #[inline]
311 pub fn observability(mut self, observability: CasObservabilityConfig) -> Self {
312 self.observability = observability;
313 self
314 }
315
316 /// Enables contention alerting with the supplied thresholds.
317 ///
318 /// # Parameters
319 /// - `thresholds`: Thresholds used to classify hot contention.
320 ///
321 /// # Returns
322 /// The updated builder.
323 #[inline]
324 pub fn alert_on_contention(mut self, thresholds: ContentionThresholds) -> Self {
325 self.observability = self.observability.with_contention_thresholds(thresholds);
326 self
327 }
328
329 /// Enables retry-layer listener panic isolation.
330 ///
331 /// # Returns
332 /// The updated builder.
333 #[inline]
334 pub fn isolate_listener_panics(mut self) -> Self {
335 self.observability = self
336 .observability
337 .with_listener_panic_policy(ListenerPanicPolicy::Isolate);
338 self
339 }
340
341 /// Builds one executor after validating the settings.
342 ///
343 /// # Returns
344 /// A validated [`CasExecutor`].
345 ///
346 /// # Errors
347 /// Returns [`RetryConfigError`] when the configured retry settings are
348 /// invalid.
349 pub fn build(self) -> Result<CasExecutor<T, E>, RetryConfigError> {
350 if let Some(error) = self.max_attempts_error {
351 return Err(error);
352 }
353 let options = RetryOptions::new(
354 self.max_attempts,
355 self.max_operation_elapsed,
356 self.max_total_elapsed,
357 self.delay,
358 self.jitter,
359 )?;
360 Ok(CasExecutor::new(
361 options,
362 self.attempt_timeout,
363 self.timeout_policy,
364 self.observability,
365 ))
366 }
367
368 /// Builds one executor with the contention-adaptive strategy.
369 ///
370 /// # Returns
371 /// A configured [`CasExecutor`] suitable for contended writers.
372 pub fn build_contention_adaptive(self) -> Result<CasExecutor<T, E>, RetryConfigError> {
373 self.strategy(CasStrategy::ContentionAdaptive).build()
374 }
375
376 /// Builds one executor with the latency-first strategy.
377 ///
378 /// # Returns
379 /// A configured [`CasExecutor`] optimized for low latency.
380 pub fn build_latency_first(self) -> Result<CasExecutor<T, E>, RetryConfigError> {
381 self.strategy(CasStrategy::LatencyFirst).build()
382 }
383
384 /// Builds one executor with the reliability-first strategy.
385 ///
386 /// # Returns
387 /// A configured [`CasExecutor`] optimized for long retry windows.
388 pub fn build_reliability_first(self) -> Result<CasExecutor<T, E>, RetryConfigError> {
389 self.strategy(CasStrategy::ReliabilityFirst).build()
390 }
391}
392
393impl<T, E> Default for CasBuilder<T, E> {
394 /// Creates a default CAS builder.
395 ///
396 /// # Returns
397 /// A builder equivalent to [`CasBuilder::new`].
398 #[inline]
399 fn default() -> Self {
400 Self::new()
401 }
402}