Skip to main content

anodizer_core/
retry.rs

1//! Uniform retry-with-exponential-backoff primitives.
2//!
3//! Replaces six open-coded retry loops in `stage-docker` (3×) and
4//! `stage-release` (3×) that had diverged on backoff formulas —
5//! `2^(n-2)`, `2^(n-1)`, and `500 << (attempt-1)` all coexisted.
6//!
7//! The canonical policy is exponential backoff with multiplier 2 starting at
8//! `base_delay` and capped at `max_delay`:
9//!
10//! ```text
11//! attempt 1:  f() executes immediately
12//! attempt 2:  sleep base_delay
13//! attempt 3:  sleep base_delay * 2
14//! attempt N:  sleep min(base_delay * 2^(N-2), max_delay)
15//! ```
16//!
17//! `ControlFlow<Break, Continue>` lets the operation decide retry policy per
18//! failure (e.g. 4xx → Break, 5xx → Continue) without the helper encoding
19//! protocol-specific predicates.
20//!
21//! Both a sync (`retry_sync`) and async (`retry_async`) variant are provided so
22//! that sites can adopt without crossing a sync/async boundary.
23
24use std::error::Error as StdError;
25use std::fmt;
26use std::io;
27use std::ops::ControlFlow;
28use std::time::Duration;
29
/// Retry policy used by `retry_sync` / `retry_async`.
#[derive(Debug, Clone, Copy)]
pub struct RetryPolicy {
    /// Total attempts, including the first.
    ///
    /// Invariant: must be `>= 1`. The clamp is enforced at two layers so
    /// every construction path is safe:
    ///
    /// 1. [`crate::config::RetryConfig::to_policy`] clamps user YAML
    ///    (`attempts: 0` -> `1`) at the config-surface boundary.
    /// 2. [`retry_sync`] / [`retry_async`] clamp again at the loop boundary
    ///    to protect direct `RetryPolicy { max_attempts: 0, .. }`
    ///    constructions (e.g. test fixtures).
    ///
    /// Callers therefore do NOT need to clamp `max_attempts` again at the
    /// call site.
    pub max_attempts: u32,
    /// Delay before the second attempt (no wait before the first).
    pub base_delay: Duration,
    /// Upper bound on any individual sleep between attempts.
    pub max_delay: Duration,
}

impl RetryPolicy {
    /// Canonical policy matching GoReleaser upload defaults: 10 attempts, 50ms
    /// base, 30s cap.
    pub const UPLOAD: RetryPolicy = RetryPolicy {
        max_attempts: 10,
        base_delay: Duration::from_millis(50),
        max_delay: Duration::from_secs(30),
    };

    /// Sleep to insert before running attempt `next_attempt` (1-based).
    ///
    /// Returns `min(base_delay * 2^(next_attempt - 2), max_delay)`. Values of
    /// `next_attempt` below 2 are treated as 2 and therefore yield
    /// `base_delay` (still capped by `max_delay`).
    ///
    /// The product is computed in nanoseconds with saturating arithmetic, so
    /// a sub-millisecond `base_delay` keeps its precision (it is not
    /// truncated to zero), and arbitrarily large attempt numbers simply
    /// settle on `max_delay`.
    pub fn delay_for(&self, next_attempt: u32) -> Duration {
        const NANOS_PER_SEC: u128 = 1_000_000_000;

        // `next_attempt` is the attempt we're about to run (≥2). The wait
        // before attempt 2 uses base_delay; before attempt 3 uses base_delay*2;
        // i.e. multiplier = 2^(next_attempt - 2).
        let exp = next_attempt.saturating_sub(2);
        // `checked_shl` fails once exp >= 128; saturate the multiplier.
        let mult = 1u128.checked_shl(exp).unwrap_or(u128::MAX);
        // Nanosecond (not millisecond) arithmetic: `as_millis()` would drop
        // any sub-millisecond part of `base_delay`.
        let nanos = self.base_delay.as_nanos().saturating_mul(mult);
        if nanos >= self.max_delay.as_nanos() {
            return self.max_delay;
        }
        // nanos < max_delay.as_nanos() <= Duration::MAX, so the whole-second
        // part fits in u64 and the remainder is < 1e9.
        Duration::new(
            (nanos / NANOS_PER_SEC) as u64,
            (nanos % NANOS_PER_SEC) as u32,
        )
    }
}
72
73/// Retry a synchronous operation according to `policy`.
74///
75/// `op` returns:
76/// - `Ok(T)` on success (no retry).
77/// - `Err(ControlFlow::Continue(e))` to retry if attempts remain.
78/// - `Err(ControlFlow::Break(e))` to stop immediately (4xx-style fast-fail).
79///
80/// Returns the last error if all attempts are exhausted.
81pub fn retry_sync<T, E, F>(policy: &RetryPolicy, mut op: F) -> Result<T, E>
82where
83    F: FnMut(u32) -> Result<T, ControlFlow<E, E>>,
84{
85    let max = policy.max_attempts.max(1);
86    let mut attempt: u32 = 1;
87    loop {
88        if attempt > 1 {
89            std::thread::sleep(policy.delay_for(attempt));
90        }
91        match op(attempt) {
92            Ok(v) => return Ok(v),
93            Err(ControlFlow::Break(e)) => return Err(e),
94            Err(ControlFlow::Continue(e)) => {
95                if attempt >= max {
96                    return Err(e);
97                }
98            }
99        }
100        attempt += 1;
101    }
102}
103
104/// Retry an asynchronous operation according to `policy`.
105///
106/// Same semantics as `retry_sync` but awaits `op` and uses `tokio::time::sleep`.
107pub async fn retry_async<T, E, F, Fut>(policy: &RetryPolicy, mut op: F) -> Result<T, E>
108where
109    F: FnMut(u32) -> Fut,
110    Fut: std::future::Future<Output = Result<T, ControlFlow<E, E>>>,
111{
112    let max = policy.max_attempts.max(1);
113    let mut attempt: u32 = 1;
114    loop {
115        if attempt > 1 {
116            tokio::time::sleep(policy.delay_for(attempt)).await;
117        }
118        match op(attempt).await {
119            Ok(v) => return Ok(v),
120            Err(ControlFlow::Break(e)) => return Err(e),
121            Err(ControlFlow::Continue(e)) => {
122                if attempt >= max {
123                    return Err(e);
124                }
125            }
126        }
127        attempt += 1;
128    }
129}
130
/// Which status classes the HTTP retry helpers accept as success.
///
/// Most upload-style publishers accept 3xx too, because the underlying
/// client follows redirects itself; some callers insist on 2xx only.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SuccessClass {
    /// Only 2xx counts as success. A 3xx is a non-success status and goes
    /// through the usual retry / fast-fail classification (`is_retriable`).
    Strict,
    /// 2xx and 3xx both count as success. For upload publishers whose
    /// servers may answer 301/302/307 on the happy path (artifactory does
    /// this for some virtual repo configurations).
    AllowRedirects,
}
144
145/// Drive a single HTTP call to completion, retrying transient failures via
146/// the shared [`retry_sync`] machinery.
147///
148/// On every attempt, `send` is invoked to construct + dispatch a fresh
149/// request. The closure must rebuild the request from scratch (multipart
150/// `Form`, streamed body, etc. are move-only). The helper:
151///
152/// 1. On `Err` (transport-level): wrap in [`HttpError::from_response`] +
153///    a `<label>: <stage> transport error` context, classify with
154///    [`is_retriable`] (so EOF / connection-reset retry, plain "dial
155///    failed" fast-fails), and dispatch `Continue`/`Break`.
156/// 2. On non-success status: drain the body, format the outer message via
157///    `error_msg`, wrap in [`HttpError::new`] with the upstream status, and
158///    classify (5xx/429 → `Continue`, 4xx → `Break`).
159/// 3. On success status: return `(status, body)`.
160///
161/// The `error_msg` closure receives the response status and body so callers
162/// can format publisher-specific envelopes (e.g. artifactory's
163/// `{"errors":[...]}` JSON).
164///
165/// Replaces three nearly-identical retry loops:
166/// - `stage-publish/cloudsmith.rs::retry_request`
167/// - `stage-publish/artifactory.rs::upload_single_artifact` (inline)
168/// - `stage-announce/helpers.rs::retry_http` (now wraps this helper; see
169///   announce/helpers.rs for the thin adapter that returns the body string
170///   instead of `(StatusCode, String)`).
171pub fn retry_http_blocking<F, M>(
172    label: &str,
173    policy: &RetryPolicy,
174    success_class: SuccessClass,
175    mut send: F,
176    error_msg: M,
177) -> anyhow::Result<(reqwest::StatusCode, String)>
178where
179    F: FnMut(u32) -> Result<reqwest::blocking::Response, reqwest::Error>,
180    M: Fn(reqwest::StatusCode, &str) -> String,
181{
182    use anyhow::Context as _;
183    retry_sync(policy, |attempt| {
184        match send(attempt) {
185            Ok(resp) => {
186                let status = resp.status();
187                let succeeded = match success_class {
188                    SuccessClass::Strict => status.is_success(),
189                    SuccessClass::AllowRedirects => status.is_success() || status.is_redirection(),
190                };
191                let body = resp
192                    .text()
193                    .unwrap_or_else(|e| format!("<failed to read body: {e}>"));
194                if succeeded {
195                    Ok((status, body))
196                } else {
197                    let msg = error_msg(status, &body);
198                    let inner = anyhow::anyhow!("{msg}");
199                    let wrapped = anyhow::Error::new(HttpError::new(
200                        std::io::Error::other(inner.to_string()),
201                        status.as_u16(),
202                    ))
203                    .context(inner);
204                    // `as_ref()` is the head of the chain; `is_retriable` walks
205                    // `.source()` to reach `HttpError`. `root_cause()` would
206                    // unwrap past `HttpError` to the io::Error leaf and miss
207                    // the status. Pinned by
208                    // `classifier_5xx_via_anyhow_chain_uses_as_ref`.
209                    if is_retriable(wrapped.as_ref()) {
210                        Err(ControlFlow::Continue(wrapped))
211                    } else {
212                        Err(ControlFlow::Break(wrapped))
213                    }
214                }
215            }
216            Err(e) => {
217                // Transport-layer failure: always wrap in HttpError(status=0)
218                // so the chain-walking classifier can see network-error
219                // substrings via the inner io::Error message.
220                let err = anyhow::Error::new(HttpError::from_response(e, None))
221                    .context(format!("{label}: HTTP transport error"));
222                if is_retriable(err.as_ref()) {
223                    Err(ControlFlow::Continue(err))
224                } else {
225                    Err(ControlFlow::Break(err))
226                }
227            }
228        }
229    })
230    .with_context(|| format!("{label}: exhausted retry attempts"))
231}
232
233/// Async sibling of [`retry_http_blocking`] for `reqwest::Client` (non-blocking)
234/// call sites such as the GitLab and Gitea release publishers.
235///
236/// Each attempt invokes `send` (a fresh future) and:
237///
238/// 1. On `Err` (transport-level): wraps in [`HttpError::from_response`] +
239///    a `<label>: HTTP transport error` context, classifies via
240///    [`is_retriable`] (network-substring + EOF chain match), and dispatches
241///    `Continue`/`Break`.
242/// 2. On non-success status: drains the body via `Response::text().await`,
243///    formats the outer message via `error_msg`, wraps in [`HttpError::new`]
244///    with the upstream status, and classifies (5xx/429 → `Continue`, 4xx →
245///    `Break`).
246/// 3. On success status: returns the raw [`reqwest::Response`] for the
247///    caller to consume (e.g. `.json()`, `.text()`, header inspection).
248///
249/// `success_class` mirrors the blocking variant: `Strict` rejects 3xx,
250/// `AllowRedirects` accepts them. Most async API clients want `Strict`
251/// (their reqwest::Client follows redirects by default, so a surfaced 3xx
252/// is itself an error).
253pub async fn retry_http_async<F, Fut, M>(
254    label: &str,
255    policy: &RetryPolicy,
256    success_class: SuccessClass,
257    mut send: F,
258    error_msg: M,
259) -> anyhow::Result<reqwest::Response>
260where
261    F: FnMut(u32) -> Fut,
262    Fut: std::future::Future<Output = Result<reqwest::Response, reqwest::Error>>,
263    M: Fn(reqwest::StatusCode, &str) -> String,
264{
265    use anyhow::Context as _;
266    retry_async(policy, |attempt| {
267        let fut = send(attempt);
268        let error_msg = &error_msg;
269        async move {
270            match fut.await {
271                Ok(resp) => {
272                    let status = resp.status();
273                    let succeeded = match success_class {
274                        SuccessClass::Strict => status.is_success(),
275                        SuccessClass::AllowRedirects => {
276                            status.is_success() || status.is_redirection()
277                        }
278                    };
279                    if succeeded {
280                        Ok(resp)
281                    } else {
282                        let body = resp
283                            .text()
284                            .await
285                            .unwrap_or_else(|e| format!("<failed to read body: {e}>"));
286                        let msg = error_msg(status, &body);
287                        let inner = anyhow::anyhow!("{msg}");
288                        let wrapped = anyhow::Error::new(HttpError::new(
289                            std::io::Error::other(inner.to_string()),
290                            status.as_u16(),
291                        ))
292                        .context(inner);
293                        // `as_ref()` is the head of the chain; `is_retriable`
294                        // walks `.source()` to reach `HttpError`. `root_cause()`
295                        // would unwrap past `HttpError` to the io::Error leaf
296                        // and miss the status. Pinned by
297                        // `classifier_5xx_via_anyhow_chain_uses_as_ref`.
298                        if is_retriable(wrapped.as_ref()) {
299                            Err(ControlFlow::Continue(wrapped))
300                        } else {
301                            Err(ControlFlow::Break(wrapped))
302                        }
303                    }
304                }
305                Err(e) => {
306                    // Transport-layer failure: wrap in HttpError(status=0) so
307                    // the chain-walking classifier can see network-error
308                    // substrings via the inner io::Error message.
309                    let err = anyhow::Error::new(HttpError::from_response(e, None))
310                        .context(format!("{label}: HTTP transport error"));
311                    if is_retriable(err.as_ref()) {
312                        Err(ControlFlow::Continue(err))
313                    } else {
314                        Err(ControlFlow::Break(err))
315                    }
316                }
317            }
318        }
319    })
320    .await
321    .with_context(|| format!("{label}: exhausted retry attempts"))
322}
323
324/// Classify a `reqwest::Result<reqwest::blocking::Response>` into the
325/// `ControlFlow` shape expected by `retry_sync` for a typical HTTP call:
326/// 5xx + transport errors retry, 4xx fast-fails, 2xx/3xx returns Ok. The
327/// returned response (Ok branch) is the caller's to consume.
328///
329/// This is the convention shared by every HTTP-uploading publisher; see audit
330/// A7 dedup S5.
331pub fn classify_http_sync(
332    result: reqwest::Result<reqwest::blocking::Response>,
333) -> Result<reqwest::blocking::Response, ControlFlow<anyhow::Error, anyhow::Error>> {
334    use anyhow::anyhow;
335    match result {
336        Ok(resp) => {
337            let status = resp.status();
338            if status.is_success() || status.is_redirection() {
339                Ok(resp)
340            } else if status.is_server_error() {
341                Err(ControlFlow::Continue(anyhow!(
342                    "HTTP {} {}",
343                    status.as_u16(),
344                    status.canonical_reason().unwrap_or("server error")
345                )))
346            } else {
347                // 4xx (and any other non-success/redirect/5xx): fast-fail
348                Err(ControlFlow::Break(anyhow!(
349                    "HTTP {} {}",
350                    status.as_u16(),
351                    status.canonical_reason().unwrap_or("client error")
352                )))
353            }
354        }
355        // Transport-layer failure (DNS, connect, TLS, timeout): retry.
356        Err(e) => Err(ControlFlow::Continue(anyhow!(e))),
357    }
358}
359
360// ---------------------------------------------------------------------------
361// Retriable-error classification (mirrors GoReleaser internal/retryx)
362// ---------------------------------------------------------------------------
363
364/// Carries an HTTP status code alongside the original error so
365/// [`is_retriable`] can route 5xx / 429 to retry and 4xx to fast-fail.
366///
367/// Mirrors GoReleaser `retryx.HTTPError`. Construct via [`HttpError::new`]
368/// (status-only) or wrap an existing `reqwest::Response` via
369/// [`HttpError::from_response`].
370///
371/// A `status` of `0` denotes a network-level failure where no response was
372/// ever received (matches GR's `nil resp` branch). Network-level failures
373/// are still classified via the inner error's message, so wrapping them in
374/// `HttpError { status: 0, .. }` does not lose retriability information.
375#[derive(Debug)]
376pub struct HttpError {
377    /// The wrapped error (transport, decode, or status-derived message).
378    /// Reachable via the [`StdError::source`] trait method (not directly).
379    source: Box<dyn StdError + Send + Sync + 'static>,
380    /// HTTP status code; `0` for transport-level failures.
381    pub status: u16,
382}
383
384impl HttpError {
385    /// Wrap an error with a status code. `0` denotes a network-level failure
386    /// (no response received).
387    pub fn new<E>(source: E, status: u16) -> Self
388    where
389        E: StdError + Send + Sync + 'static,
390    {
391        Self {
392            source: Box::new(source),
393            status,
394        }
395    }
396
397    /// Wrap a transport-layer error with the status code from the (possibly
398    /// missing) response. Mirrors GoReleaser `retryx.HTTP(err, resp)`.
399    /// `None` resp yields status `0` (network-level failure).
400    pub fn from_response<E>(err: E, resp: Option<&reqwest::Response>) -> Self
401    where
402        E: StdError + Send + Sync + 'static,
403    {
404        Self::new(err, resp.map(|r| r.status().as_u16()).unwrap_or(0))
405    }
406}
407
408impl fmt::Display for HttpError {
409    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
410        // Defer to the inner error so messages stay focused on the cause.
411        // Mirrors GR `(e HTTPError) Error() string { return e.Err.Error() }`.
412        fmt::Display::fmt(&self.source, f)
413    }
414}
415
416impl StdError for HttpError {
417    fn source(&self) -> Option<&(dyn StdError + 'static)> {
418        Some(&*self.source)
419    }
420}
421
/// Marker that forces [`is_retriable`] to answer `true` for the wrapped
/// error, whatever its class. Mirrors GoReleaser `retryx.Retriable`.
///
/// Useful when the caller knows a failure is transient even though its
/// status says otherwise — e.g. an idempotent registry write answering 422
/// after losing a transient race — and wants the retry loop to skip the
/// usual 4xx fast-fail.
#[derive(Debug)]
pub struct Retriable(Box<dyn StdError + Send + Sync + 'static>);

impl Retriable {
    /// Mark `source` as retriable regardless of its class.
    ///
    /// Intended for the case where a caller knows a 4xx is transient (e.g.
    /// a 422 from an idempotent registry write losing a race) and wants to
    /// override the usual fast-fail. This constructor always takes a value;
    /// for `Option<E>` inputs, see [`is_retriable_opt`].
    pub fn new<E>(source: E) -> Self
    where
        E: StdError + Send + Sync + 'static,
    {
        let boxed: Box<dyn StdError + Send + Sync + 'static> = Box::new(source);
        Retriable(boxed)
    }
}

impl fmt::Display for Retriable {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Transparent: the marker adds no text of its own.
        let Self(inner) = self;
        fmt::Display::fmt(inner, f)
    }
}

impl StdError for Retriable {
    fn source(&self) -> Option<&(dyn StdError + 'static)> {
        let inner: &(dyn StdError + 'static) = &*self.0;
        Some(inner)
    }
}
455
/// Returns `true` if the message looks like a transient network-layer failure.
///
/// Mirrors GoReleaser `retryx.IsNetworkError` and extends it for Rust /
/// Windows. Each link in the error chain is checked two ways:
///
/// 1a. **Structural [`io::ErrorKind`] check** via `downcast_ref::<io::Error>()`.
///     Treats `UnexpectedEof`, `TimedOut`, `ConnectionRefused`,
///     `ConnectionReset`, `ConnectionAborted`, and `BrokenPipe` as transient.
///     The OS-classified `ErrorKind` is robust where Display text is not:
///     Linux's connect-refused says `"Connection refused"` but Windows
///     surfaces a transient connect failure as
///     `io::Error { kind: TimedOut, message: "operation timed out" }`, and
///     a Windows-reset reads `"An existing connection was forcibly closed"`.
///     Matching `kind()` catches all of them regardless of phrasing. Also
///     recognises any `io::Error` whose Display form is `"EOF"` /
///     `"unexpected eof"` (rustls / hyper convention; Rust has no
///     equivalent of Go's `io.EOF` sentinel).
///
/// 1b. **Substring match on the lowercased Display form** against
///     [`NETWORK_ERROR_NEEDLES`]. Covers the GR-parity surface plus the
///     Windows / Rust-stdlib phrasings that bypass the kind check when an
///     error has been wrapped (e.g. reqwest coercing the inner kind to
///     `Other` while preserving the OS message text).
///
/// Walks `.source()` for both branches — Rust's `Display` impls do NOT
/// inherit the wrapped error's text the way Go's `err.Error()` does, so a
/// reqwest "Connection refused" message buried under an anyhow context would
/// otherwise be invisible to the head-only string.
pub fn is_network_error(err: &(dyn StdError + 'static)) -> bool {
    let mut cur: Option<&(dyn StdError + 'static)> = Some(err);
    while let Some(e) = cur {
        let io_err = e.downcast_ref::<io::Error>();

        // 1a. Structural ErrorKind check — robust to platform Display drift
        //     (Windows's "operation timed out" vs Linux's "Connection refused").
        if let Some(io_err) = io_err {
            if matches!(
                io_err.kind(),
                io::ErrorKind::UnexpectedEof
                    | io::ErrorKind::TimedOut
                    | io::ErrorKind::ConnectionRefused
                    | io::ErrorKind::ConnectionReset
                    | io::ErrorKind::ConnectionAborted
                    | io::ErrorKind::BrokenPipe
            ) {
                return true;
            }
        }

        // Each link's own Display (NOT the full chain "{e:#}" form, which
        // would double-count the same text on deeper links), lowercased once
        // and shared by the EOF check and the needle scan below.
        let text = e.to_string().to_lowercase();

        // 1a (cont.). io::Error spelled "EOF" / "unexpected eof".
        if io_err.is_some() && (text == "eof" || text == "unexpected eof") {
            return true;
        }

        // 1b. Substring match against the needle list.
        if NETWORK_ERROR_NEEDLES.iter().any(|n| text.contains(n)) {
            return true;
        }

        cur = e.source();
    }
    false
}

/// The set of substrings classified as transient.
///
/// The first nine entries mirror GoReleaser's `retryx.IsNetworkError`
/// (matching is case-insensitive). The remaining entries cover Windows and
/// Rust-stdlib phrasings of transient transport failures that surface when
/// an `io::Error` has been wrapped by a higher layer (reqwest, hyper,
/// anyhow), losing the original `ErrorKind` classification but preserving
/// the OS message text. Without these, every publisher running on Windows
/// fast-failed on the first transient connect blip instead of retrying.
const NETWORK_ERROR_NEEDLES: &[&str] = &[
    "connection reset",
    "network is unreachable",
    "connection closed",
    "connection refused",
    "tls handshake timeout",
    "i/o timeout",
    "broken pipe",
    "timeout awaiting response headers",
    "context deadline exceeded",
    // Windows + macOS phrasing of ErrorKind::TimedOut after wrapping.
    "operation timed out",
    // Windows ErrorKind::ConnectionAborted phrasing.
    "the network connection was aborted",
    // Windows ErrorKind::ConnectionReset phrasing.
    "an existing connection was forcibly closed",
    // Windows ErrorKind::ConnectionRefused phrasing (WSAECONNREFUSED, 10061):
    // "No connection could be made because the target machine actively
    // refused it." It contains none of the GR-parity needles above.
    "target machine actively refused",
    // Windows network-unreachable phrasing (WSAENETUNREACH, 10051):
    // "A socket operation was attempted to an unreachable network." The
    // Windows counterpart of "network is unreachable" above.
    "attempted to an unreachable network",
    // hyper-util / reqwest DNS-resolution failures wrapped through the
    // connector. Surfaces as `client error (Connect): dns error: ...` with
    // a platform-specific resolver tail ("Name or service not known" on
    // Linux/glibc, "nodename nor servname provided, or not known" on macOS,
    // "No such host is known" on Windows). The leading "dns error" prefix
    // is the cross-platform constant.
    "dns error",
    // GAI (getaddrinfo) wording across resolvers; covers the Linux
    // resolver tail above and BSD/macOS phrasing.
    "failed to lookup address",
    // Windows resolver tail when DNS-resolution fails.
    "no such host is known",
];
556
557/// Classify an error as retriable (mirrors GoReleaser `retryx.IsRetriable`).
558///
559/// Returns `true` for:
560/// - any [`is_network_error`] match (substring + EOF / UnexpectedEof in the
561///   `source()` chain)
562/// - any error whose chain contains a [`Retriable`] wrapper
563/// - any error whose chain contains an [`HttpError`] with status `>= 500`
564///   or status `429` (Too Many Requests)
565///
566/// Returns `false` for plain errors and 4xx HTTP errors (other than 429) —
567/// those are fast-failed by the retry loop.
568pub fn is_retriable(err: &(dyn StdError + 'static)) -> bool {
569    // 1. Any link in the chain is an explicit Retriable marker.
570    let mut cur: Option<&(dyn StdError + 'static)> = Some(err);
571    while let Some(e) = cur {
572        if e.is::<Retriable>() {
573            return true;
574        }
575        if let Some(http) = e.downcast_ref::<HttpError>()
576            && (http.status >= 500 || http.status == 429)
577        {
578            return true;
579        }
580        cur = e.source();
581    }
582
583    // 2. Network-error substring / EOF chain match.
584    is_network_error(err)
585}
586
587/// Convenience: `None` passes through as `false`. Mirrors GoReleaser's
588/// `IsRetriable(nil) -> false` semantics.
589pub fn is_retriable_opt(err: Option<&(dyn StdError + 'static)>) -> bool {
590    err.is_some_and(is_retriable)
591}
592
593/// Apply ±20 % pseudo-jitter to `base` using a cheap subsecond-nanos modulo.
594///
595/// Returns a value in `[base * 0.8, base * 1.2)`. No `rand` crate dependency:
596/// `SystemTime::now().subsec_nanos()` provides ~nanosecond entropy that is
597/// sufficient for retry jitter (the goal is spreading out concurrent retriers,
598/// not cryptographic unpredictability).
599///
600/// The ±20 % window is a widely-adopted convention (AWS SDK, GCP client libs).
601/// Jitter only ever widens the sleep by up to 20 %; it never shortens it below
602/// 80 % of the nominal delay, so `Retry-After` honoring is conservative.
603pub fn jitter_duration(base: Duration) -> Duration {
604    let nanos = base.as_nanos() as u64;
605    // 20 % of the nominal duration.
606    let window = nanos / 5;
607    if window == 0 {
608        return base;
609    }
610    // Cheap pseudo-random offset in [0, window * 2) centred on window,
611    // giving a net range of [base - window, base + window). Routed
612    // through `sde::resolve_now()` so jitter collapses to a constant
613    // under `SOURCE_DATE_EPOCH` (no subsec precision) — required for
614    // determinism-harness byte-equality; real jitter is preserved in
615    // prod where the helper falls back to `Utc::now()`.
616    let seed = crate::sde::resolve_now().timestamp_subsec_nanos() as u64;
617    let offset = seed % (window * 2);
618    // Saturating arithmetic so we never panic on extreme values.
619    let jittered = nanos.saturating_sub(window).saturating_add(offset);
620    Duration::from_nanos(jittered)
621}
622
623#[cfg(test)]
624mod tests {
625    use super::*;
626    use std::sync::atomic::{AtomicU32, Ordering};
627
628    fn fast_policy() -> RetryPolicy {
629        RetryPolicy {
630            max_attempts: 4,
631            base_delay: Duration::from_millis(1),
632            max_delay: Duration::from_millis(5),
633        }
634    }
635
636    #[test]
637    fn delay_progression_caps_at_max() {
638        let p = RetryPolicy {
639            max_attempts: 10,
640            base_delay: Duration::from_millis(100),
641            max_delay: Duration::from_millis(500),
642        };
643        assert_eq!(p.delay_for(2), Duration::from_millis(100));
644        assert_eq!(p.delay_for(3), Duration::from_millis(200));
645        assert_eq!(p.delay_for(4), Duration::from_millis(400));
646        assert_eq!(p.delay_for(5), Duration::from_millis(500)); // capped
647        assert_eq!(p.delay_for(8), Duration::from_millis(500)); // capped
648    }
649
650    #[test]
651    fn sync_succeeds_on_first_attempt() {
652        let calls = AtomicU32::new(0);
653        let result: Result<&str, ()> = retry_sync(&fast_policy(), |_| {
654            calls.fetch_add(1, Ordering::SeqCst);
655            Ok("ok")
656        });
657        assert_eq!(result, Ok("ok"));
658        assert_eq!(calls.load(Ordering::SeqCst), 1);
659    }
660
661    #[test]
662    fn sync_retries_until_success() {
663        let calls = AtomicU32::new(0);
664        let result: Result<u32, &str> = retry_sync(&fast_policy(), |attempt| {
665            calls.fetch_add(1, Ordering::SeqCst);
666            if attempt < 3 {
667                Err(ControlFlow::Continue("transient"))
668            } else {
669                Ok(attempt)
670            }
671        });
672        assert_eq!(result, Ok(3));
673        assert_eq!(calls.load(Ordering::SeqCst), 3);
674    }
675
676    #[test]
677    fn sync_break_stops_immediately() {
678        let calls = AtomicU32::new(0);
679        let result: Result<(), &str> = retry_sync(&fast_policy(), |_| {
680            calls.fetch_add(1, Ordering::SeqCst);
681            Err(ControlFlow::Break("fatal"))
682        });
683        assert_eq!(result, Err("fatal"));
684        assert_eq!(calls.load(Ordering::SeqCst), 1);
685    }
686
687    #[test]
688    fn sync_returns_last_error_after_exhaustion() {
689        let calls = AtomicU32::new(0);
690        let result: Result<(), String> = retry_sync(&fast_policy(), |attempt| {
691            calls.fetch_add(1, Ordering::SeqCst);
692            Err(ControlFlow::Continue(format!("fail {attempt}")))
693        });
694        assert_eq!(result, Err("fail 4".to_string()));
695        assert_eq!(calls.load(Ordering::SeqCst), 4);
696    }
697
698    #[tokio::test]
699    async fn async_retries_until_success() {
700        let calls = std::sync::Arc::new(AtomicU32::new(0));
701        let calls_inner = calls.clone();
702        let result: Result<u32, &str> = retry_async(&fast_policy(), move |attempt| {
703            let c = calls_inner.clone();
704            async move {
705                c.fetch_add(1, Ordering::SeqCst);
706                if attempt < 2 {
707                    Err(ControlFlow::Continue("transient"))
708                } else {
709                    Ok(attempt)
710                }
711            }
712        })
713        .await;
714        assert_eq!(result, Ok(2));
715        assert_eq!(calls.load(Ordering::SeqCst), 2);
716    }
717
718    // -----------------------------------------------------------------------
719    // is_network_error / is_retriable / HttpError / Retriable
720    //
721    // Mirrors GoReleaser internal/retryx/retryx_test.go test cases.
722    // -----------------------------------------------------------------------
723
724    /// Plain string error wrapper used in classification tests.
725    #[derive(Debug)]
726    struct StrErr(&'static str);
727    impl fmt::Display for StrErr {
728        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
729            f.write_str(self.0)
730        }
731    }
732    impl StdError for StrErr {}
733
734    #[derive(Debug)]
735    struct OwnedErr(String);
736    impl fmt::Display for OwnedErr {
737        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
738            f.write_str(&self.0)
739        }
740    }
741    impl StdError for OwnedErr {}
742
743    #[test]
744    fn network_error_substrings_match() {
745        for s in [
746            "connection reset by peer",
747            "network is unreachable",
748            "connection closed unexpectedly",
749            "connection refused",
750            "tls handshake timeout",
751            "i/o timeout",
752            "CONNECTION RESET",
753            "TLS Handshake Timeout",
754            "write: broken pipe",
755            "net/http: timeout awaiting response headers",
756            "context deadline exceeded",
757            // DNS-resolution failures across platforms (hyper-util connector
758            // surfaces these via reqwest as `client error (Connect): dns
759            // error: <platform tail>`). Pin every tail we know about so a
760            // cross-platform CI failure cannot reintroduce the gap.
761            "client error (Connect): dns error: failed to lookup address information: Name or service not known",
762            "dns error: nodename nor servname provided, or not known",
763            "dns error: No such host is known. (os error 11001)",
764        ] {
765            let e = OwnedErr(s.to_string());
766            assert!(is_network_error(&e), "expected network error: {s:?}");
767        }
768    }
769
770    #[test]
771    fn network_error_io_eof_kinds() {
772        let e = io::Error::from(io::ErrorKind::UnexpectedEof);
773        assert!(is_network_error(&e));
774
775        // A custom-kind io::Error whose Display is "EOF" (rustls / hyper convention).
776        let e2 = io::Error::other("EOF");
777        assert!(is_network_error(&e2));
778    }
779
780    // Windows-CI regression: connect() on Windows surfaces transient failures
781    // as io::Error { kind: TimedOut, message: "operation timed out" }, neither
782    // of which matched the original EOF-only kind check or the GR-parity
783    // needle list. Same shape for the connection-* kinds across platforms —
784    // pin each branch.
785
786    #[test]
787    fn is_network_error_classifies_io_timedout() {
788        let e = io::Error::from(io::ErrorKind::TimedOut);
789        assert!(is_network_error(&e));
790        assert!(is_retriable(&e));
791    }
792
793    #[test]
794    fn is_network_error_classifies_io_connection_refused() {
795        let e = io::Error::from(io::ErrorKind::ConnectionRefused);
796        assert!(is_network_error(&e));
797        assert!(is_retriable(&e));
798    }
799
800    #[test]
801    fn is_network_error_classifies_io_connection_reset() {
802        let e = io::Error::from(io::ErrorKind::ConnectionReset);
803        assert!(is_network_error(&e));
804        assert!(is_retriable(&e));
805    }
806
807    #[test]
808    fn is_network_error_classifies_io_connection_aborted() {
809        let e = io::Error::from(io::ErrorKind::ConnectionAborted);
810        assert!(is_network_error(&e));
811        assert!(is_retriable(&e));
812    }
813
814    #[test]
815    fn is_network_error_classifies_io_broken_pipe() {
816        let e = io::Error::from(io::ErrorKind::BrokenPipe);
817        assert!(is_network_error(&e));
818        assert!(is_retriable(&e));
819    }
820
821    #[test]
822    fn is_network_error_classifies_operation_timed_out_substring() {
823        // Simulate a reqwest- or hyper-wrapped error whose io::ErrorKind has
824        // been coerced to Other but whose Display still carries the Windows /
825        // macOS TimedOut phrasing. Both the substring path and the
826        // ErrorKind path must classify this independently.
827        let other_kind = io::Error::other("operation timed out");
828        assert!(is_network_error(&other_kind));
829        assert!(is_retriable(&other_kind));
830
831        let kind_only = io::Error::from(io::ErrorKind::TimedOut);
832        assert!(is_network_error(&kind_only));
833        assert!(is_retriable(&kind_only));
834    }
835
836    #[test]
837    fn network_error_wrapped_unexpected_eof() {
838        // Wrap an UnexpectedEof in an outer error so chain-walking is exercised.
839        #[derive(Debug)]
840        struct Wrap(io::Error);
841        impl fmt::Display for Wrap {
842            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
843                write!(f, "read failed")
844            }
845        }
846        impl StdError for Wrap {
847            fn source(&self) -> Option<&(dyn StdError + 'static)> {
848                Some(&self.0)
849            }
850        }
851        let inner = io::Error::from(io::ErrorKind::UnexpectedEof);
852        let outer = Wrap(inner);
853        assert!(is_network_error(&outer));
854    }
855
856    #[test]
857    fn network_error_non_network_strings_reject() {
858        for s in [
859            "file not found",
860            "permission denied",
861            "dial tcp: lookup example.com: no such host",
862            "",
863        ] {
864            let e = OwnedErr(s.to_string());
865            assert!(!is_network_error(&e), "expected NOT network error: {s:?}");
866        }
867    }
868
869    #[test]
870    fn retriable_opt_nil_passthrough() {
871        assert!(!is_retriable_opt(None));
872    }
873
874    #[test]
875    fn http_error_500_retriable() {
876        let e = HttpError::new(StrErr("internal server error"), 500);
877        assert!(is_retriable(&e));
878    }
879
880    #[test]
881    fn http_error_502_503_retriable() {
882        for s in [502u16, 503] {
883            let e = HttpError::new(StrErr("bad gateway"), s);
884            assert!(is_retriable(&e), "status {s} should be retriable");
885        }
886    }
887
888    #[test]
889    fn http_error_429_retriable() {
890        let e = HttpError::new(StrErr("rate limited"), 429);
891        assert!(is_retriable(&e));
892    }
893
894    #[test]
895    fn http_error_4xx_not_retriable() {
896        for s in [400u16, 401, 403, 404, 422] {
897            let e = HttpError::new(StrErr("client err"), s);
898            assert!(!is_retriable(&e), "status {s} should NOT be retriable");
899        }
900    }
901
902    #[test]
903    fn http_error_zero_status_routes_via_message() {
904        // Status 0 == network-level failure with no response. Retriability
905        // falls back to the network-error substring matcher on the inner.
906        let net = HttpError::new(StrErr("connection reset"), 0);
907        assert!(is_retriable(&net));
908
909        let non_net = HttpError::new(StrErr("dial failed"), 0);
910        assert!(!is_retriable(&non_net));
911    }
912
913    #[test]
914    fn http_error_unwrap_chain_visible() {
915        let inner = StrErr("inner");
916        let e = HttpError::new(inner, 503);
917        assert!(e.source().is_some());
918    }
919
920    #[test]
921    fn from_response_nil_resp_yields_status_zero() {
922        // Mirrors GR `retryx.HTTP(err, nil)` — no response means status 0.
923        // Use a concrete `io::Error` since `reqwest::Error` cannot be
924        // synthesised in tests; the API accepts any `E: StdError + Send + Sync`.
925        let inner = io::Error::other("connect: dial tcp");
926        let e = HttpError::from_response(inner, None);
927        assert_eq!(e.status, 0);
928    }
929
930    #[test]
931    fn from_response_unwrap_chain_visible() {
932        // The inner error must remain reachable via the StdError chain so
933        // is_retriable's network-error matcher can still see the cause.
934        let inner = io::Error::other("connection reset by peer");
935        let e = HttpError::from_response(inner, None);
936        assert!(
937            e.source().is_some(),
938            "inner error must be reachable via source()"
939        );
940        // And classification must walk through to the network-error matcher.
941        assert!(is_retriable(&e));
942    }
943
944    #[test]
945    fn retriable_wrapper_is_retriable() {
946        let e = Retriable::new(StrErr("retry me"));
947        assert!(is_retriable(&e));
948    }
949
950    #[test]
951    fn retriable_wrapper_overrides_4xx() {
952        // GR test: a 422 wrapped in Retriable is still retriable.
953        let inner = HttpError::new(StrErr("exists"), 422);
954        let outer = Retriable::new(inner);
955        assert!(is_retriable(&outer));
956    }
957
958    #[test]
959    fn retriable_wrapper_unwrap_chain_visible() {
960        let inner = StrErr("inner");
961        let e = Retriable::new(inner);
962        assert!(e.source().is_some());
963    }
964
965    #[test]
966    fn plain_error_not_retriable() {
967        let e = StrErr("something");
968        assert!(!is_retriable(&e));
969    }
970
971    #[test]
972    fn anyhow_error_threadable() {
973        // Ensure is_retriable works through anyhow::Error's deref-to-dyn path
974        // (which is the canonical caller form across the codebase).
975        let e: anyhow::Error = anyhow::anyhow!("connection refused");
976        assert!(is_retriable(e.as_ref()));
977
978        let e2: anyhow::Error = anyhow::anyhow!("permission denied");
979        assert!(!is_retriable(e2.as_ref()));
980    }
981
982    #[test]
983    fn is_retriable_chain_walks_to_http_error() {
984        // An anyhow::Error wrapping a concrete HttpError must be classified
985        // by walking source(), not by Display alone — the message "outer"
986        // gives no hint, the 503 status does.
987        let inner = HttpError::new(StrErr("bad gateway"), 503);
988        let wrapped: anyhow::Error = anyhow::Error::new(inner).context("publish failed");
989        assert!(is_retriable(wrapped.as_ref()));
990    }
991
992    // ----- as_ref vs root_cause drift guard ---------------------------------
993    //
994    // Every consumer of `retry_http_blocking` (artifactory, cloudsmith, the
995    // future stage-blob upload paths) classifies via `is_retriable(err.as_ref())`.
996    // A subtle but catastrophic regression is to "simplify" that to
997    // `is_retriable(err.root_cause())`, which walks past the HttpError wrapper
998    // to the leaf io::Error — at which point 5xx misclassifies as fast-fail
999    // (the leaf has no status code), and the entire retry policy becomes a
1000    // no-op. These tests pin the distinction once at the helper's home.
1001
1002    #[test]
1003    fn classifier_5xx_via_anyhow_chain_uses_as_ref() {
1004        let wrapped: anyhow::Error =
1005            anyhow::Error::new(HttpError::new(std::io::Error::other("503"), 503))
1006                .context("publish");
1007        assert!(
1008            is_retriable(wrapped.as_ref()),
1009            "5xx HttpError reached via as_ref() must classify retriable"
1010        );
1011    }
1012
1013    #[test]
1014    fn classifier_root_cause_walks_past_http_error_drift_guard() {
1015        // Drift guard: root_cause() unwraps to the leaf io::Error, which
1016        // has no status. If a future caller ever swaps as_ref → root_cause
1017        // they'll regress 5xx retry handling. This assertion locks the
1018        // distinction.
1019        let wrapped: anyhow::Error =
1020            anyhow::Error::new(HttpError::new(std::io::Error::other("503"), 503))
1021                .context("publish");
1022        assert!(
1023            !is_retriable(wrapped.root_cause()),
1024            "root_cause() walks past HttpError; 5xx must NOT be detected via the leaf"
1025        );
1026    }
1027
1028    #[test]
1029    fn classifier_429_via_anyhow_chain_uses_as_ref() {
1030        // Symmetry with the 5xx case: 429 is the other retriable status
1031        // class and must also stay reachable via as_ref().
1032        let wrapped: anyhow::Error =
1033            anyhow::Error::new(HttpError::new(std::io::Error::other("429"), 429))
1034                .context("publish");
1035        assert!(is_retriable(wrapped.as_ref()));
1036        assert!(!is_retriable(wrapped.root_cause()));
1037    }
1038
1039    // ----- retry_http_blocking behavioural tests ---------------------------
1040    //
1041    // `reqwest::Error` has no public constructor, so the transport-error
1042    // branch is exercised indirectly via per-publisher integration tests
1043    // (which mock at the network layer). The unit tests here drive a tiny
1044    // hand-rolled TCP server so we can exercise the success / non-success
1045    // status branches with a real reqwest::blocking::Client end-to-end.
1046
1047    use crate::test_helpers::responder::spawn_oneshot_http_responder;
1048
1049    #[test]
1050    fn retry_http_blocking_success_returns_first_attempt() {
1051        let (addr, calls) =
1052            spawn_oneshot_http_responder(vec!["HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nok"]);
1053        let client = reqwest::blocking::Client::builder()
1054            .timeout(Duration::from_secs(2))
1055            .build()
1056            .expect("client");
1057        let policy = RetryPolicy {
1058            max_attempts: 3,
1059            base_delay: Duration::from_millis(1),
1060            max_delay: Duration::from_millis(2),
1061        };
1062        let result = retry_http_blocking(
1063            "test",
1064            &policy,
1065            SuccessClass::Strict,
1066            |_| client.get(format!("http://{addr}/")).send(),
1067            |_, _| String::from("should not be called on success"),
1068        );
1069        let (status, body) = result.expect("success");
1070        assert_eq!(status.as_u16(), 200);
1071        assert_eq!(body, "ok");
1072        assert_eq!(calls.load(Ordering::SeqCst), 1, "single attempt");
1073    }
1074
1075    #[test]
1076    fn retry_http_blocking_retries_5xx_then_succeeds() {
1077        let (addr, calls) = spawn_oneshot_http_responder(vec![
1078            "HTTP/1.1 503 Service Unavailable\r\nContent-Length: 0\r\n\r\n",
1079            "HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nok",
1080        ]);
1081        let client = reqwest::blocking::Client::builder()
1082            .timeout(Duration::from_secs(2))
1083            .build()
1084            .expect("client");
1085        let policy = RetryPolicy {
1086            max_attempts: 3,
1087            base_delay: Duration::from_millis(1),
1088            max_delay: Duration::from_millis(2),
1089        };
1090        let result = retry_http_blocking(
1091            "test",
1092            &policy,
1093            SuccessClass::Strict,
1094            |_| client.get(format!("http://{addr}/")).send(),
1095            |status, body| format!("{status}: {body}"),
1096        );
1097        let (status, _) = result.expect("eventually succeeds");
1098        assert_eq!(status.as_u16(), 200);
1099        assert_eq!(calls.load(Ordering::SeqCst), 2, "one retry then success");
1100    }
1101
1102    #[test]
1103    fn retry_http_blocking_4xx_fast_fails_no_retry() {
1104        let (addr, calls) = spawn_oneshot_http_responder(vec![
1105            "HTTP/1.1 404 Not Found\r\nContent-Length: 9\r\n\r\nnot found",
1106        ]);
1107        let client = reqwest::blocking::Client::builder()
1108            .timeout(Duration::from_secs(2))
1109            .build()
1110            .expect("client");
1111        let policy = RetryPolicy {
1112            max_attempts: 5,
1113            base_delay: Duration::from_millis(1),
1114            max_delay: Duration::from_millis(2),
1115        };
1116        let result = retry_http_blocking(
1117            "myscope",
1118            &policy,
1119            SuccessClass::Strict,
1120            |_| client.get(format!("http://{addr}/")).send(),
1121            |status, body| format!("custom error: {status} body={body}"),
1122        );
1123        let err = result.expect_err("4xx must fast-fail");
1124        let chain = format!("{err:#}");
1125        assert!(
1126            chain.contains("custom error"),
1127            "error formatter must be invoked on non-success; got: {chain}"
1128        );
1129        assert!(chain.contains("404"), "status must be in chain: {chain}");
1130        assert_eq!(
1131            calls.load(Ordering::SeqCst),
1132            1,
1133            "4xx must NOT retry (only one connection accepted)"
1134        );
1135    }
1136
1137    #[test]
1138    fn retry_http_blocking_redirect_class_alters_success_predicate() {
1139        let (addr, _calls) = spawn_oneshot_http_responder(vec![
1140            "HTTP/1.1 307 Temporary Redirect\r\nLocation: /next\r\nContent-Length: 0\r\n\r\n",
1141        ]);
1142        let client = reqwest::blocking::Client::builder()
1143            .timeout(Duration::from_secs(2))
1144            // Disable redirect-following so the 307 surfaces to our helper.
1145            .redirect(reqwest::redirect::Policy::none())
1146            .build()
1147            .expect("client");
1148        let policy = RetryPolicy {
1149            max_attempts: 3,
1150            base_delay: Duration::from_millis(1),
1151            max_delay: Duration::from_millis(2),
1152        };
1153        let result = retry_http_blocking(
1154            "test",
1155            &policy,
1156            SuccessClass::AllowRedirects,
1157            |_| client.get(format!("http://{addr}/")).send(),
1158            |_, _| String::from("should not be called on 3xx with AllowRedirects"),
1159        );
1160        let (status, _) = result.expect("3xx is success under AllowRedirects");
1161        assert_eq!(status.as_u16(), 307);
1162    }
1163
1164    // ----- retry_http_async behavioural tests ------------------------------
1165    //
1166    // Mirrors the blocking suite but drives an async reqwest::Client against
1167    // the same hand-rolled TCP responder (running on a worker thread, so the
1168    // tokio reactor is free to drive the client futures). The transport-error
1169    // arm (Err(reqwest::Error)) is exercised by
1170    // `retry_http_{async,blocking}_transport_error_retries_then_fails` below,
1171    // which bind an ephemeral port, drop the listener, then point the client
1172    // at the now-defunct address.
1173
1174    #[tokio::test]
1175    async fn retry_http_async_success_returns_first_attempt() {
1176        let (addr, calls) =
1177            spawn_oneshot_http_responder(vec!["HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nok"]);
1178        let client = reqwest::Client::builder()
1179            .timeout(Duration::from_secs(2))
1180            .build()
1181            .expect("client");
1182        let policy = RetryPolicy {
1183            max_attempts: 3,
1184            base_delay: Duration::from_millis(1),
1185            max_delay: Duration::from_millis(2),
1186        };
1187        let result = retry_http_async(
1188            "test",
1189            &policy,
1190            SuccessClass::Strict,
1191            |_| client.get(format!("http://{addr}/")).send(),
1192            |_, _| String::from("should not be called on success"),
1193        )
1194        .await;
1195        let resp = result.expect("success");
1196        assert_eq!(resp.status().as_u16(), 200);
1197        let body = resp.text().await.expect("body");
1198        assert_eq!(body, "ok");
1199        assert_eq!(calls.load(Ordering::SeqCst), 1, "single attempt");
1200    }
1201
1202    #[tokio::test]
1203    async fn retry_http_async_retries_5xx_then_succeeds() {
1204        let (addr, calls) = spawn_oneshot_http_responder(vec![
1205            "HTTP/1.1 503 Service Unavailable\r\nContent-Length: 0\r\n\r\n",
1206            "HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nok",
1207        ]);
1208        let client = reqwest::Client::builder()
1209            .timeout(Duration::from_secs(2))
1210            .build()
1211            .expect("client");
1212        let policy = RetryPolicy {
1213            max_attempts: 3,
1214            base_delay: Duration::from_millis(1),
1215            max_delay: Duration::from_millis(2),
1216        };
1217        let result = retry_http_async(
1218            "test",
1219            &policy,
1220            SuccessClass::Strict,
1221            |_| client.get(format!("http://{addr}/")).send(),
1222            |status, body| format!("{status}: {body}"),
1223        )
1224        .await;
1225        let resp = result.expect("eventually succeeds");
1226        assert_eq!(resp.status().as_u16(), 200);
1227        assert_eq!(calls.load(Ordering::SeqCst), 2, "one retry then success");
1228    }
1229
1230    #[tokio::test]
1231    async fn retry_http_async_4xx_fast_fails_no_retry() {
1232        let (addr, calls) = spawn_oneshot_http_responder(vec![
1233            "HTTP/1.1 404 Not Found\r\nContent-Length: 9\r\n\r\nnot found",
1234        ]);
1235        let client = reqwest::Client::builder()
1236            .timeout(Duration::from_secs(2))
1237            .build()
1238            .expect("client");
1239        let policy = RetryPolicy {
1240            max_attempts: 5,
1241            base_delay: Duration::from_millis(1),
1242            max_delay: Duration::from_millis(2),
1243        };
1244        let result = retry_http_async(
1245            "myscope",
1246            &policy,
1247            SuccessClass::Strict,
1248            |_| client.get(format!("http://{addr}/")).send(),
1249            |status, body| format!("custom error: {status} body={body}"),
1250        )
1251        .await;
1252        let err = result.expect_err("4xx must fast-fail");
1253        let chain = format!("{err:#}");
1254        assert!(
1255            chain.contains("custom error"),
1256            "error formatter must be invoked on non-success; got: {chain}"
1257        );
1258        assert!(chain.contains("404"), "status must be in chain: {chain}");
1259        assert_eq!(
1260            calls.load(Ordering::SeqCst),
1261            1,
1262            "4xx must NOT retry (only one connection accepted)"
1263        );
1264    }
1265
1266    #[tokio::test]
1267    async fn retry_http_async_429_retries_then_succeeds() {
1268        // 429 (Too Many Requests) is the second retriable class alongside
1269        // 5xx. Ensures the helper doesn't accidentally fast-fail on rate
1270        // limits — a regression here would defeat the whole point of
1271        // wiring retry into release publishers.
1272        let (addr, calls) = spawn_oneshot_http_responder(vec![
1273            "HTTP/1.1 429 Too Many Requests\r\nContent-Length: 0\r\n\r\n",
1274            "HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nok",
1275        ]);
1276        let client = reqwest::Client::builder()
1277            .timeout(Duration::from_secs(2))
1278            .build()
1279            .expect("client");
1280        let policy = RetryPolicy {
1281            max_attempts: 3,
1282            base_delay: Duration::from_millis(1),
1283            max_delay: Duration::from_millis(2),
1284        };
1285        let result = retry_http_async(
1286            "test",
1287            &policy,
1288            SuccessClass::Strict,
1289            |_| client.get(format!("http://{addr}/")).send(),
1290            |status, body| format!("{status}: {body}"),
1291        )
1292        .await;
1293        let resp = result.expect("429 retried then success");
1294        assert_eq!(resp.status().as_u16(), 200);
1295        assert_eq!(calls.load(Ordering::SeqCst), 2);
1296    }
1297
1298    // ----- transport-error behavioural tests -------------------------------
1299    //
1300    // The transport-error arm (Err(reqwest::Error): DNS failure, connection
1301    // refused, EOF, TLS handshake failure, etc.) is the single most
1302    // reviewer-load-bearing path: it is the one the helper claims to retry
1303    // and that publishers rely on for resilience against transient network
1304    // blips. The pattern below dials the RFC 2606-reserved `.invalid` TLD,
1305    // which is guaranteed never to resolve, so every attempt fails at the
1306    // DNS-resolution stage in a few milliseconds on Linux, macOS, and
1307    // Windows alike.
1308    //
1309    // We verify:
1310    //   1. the helper retries (attempt counter > 1)
1311    //   2. eventually surfaces an Err with the configured label in the chain
1312    // The outer attempt counter is incremented inside the closure, so it
1313    // sees one bump per attempt regardless of the underlying transport
1314    // outcome.
1315    //
1316    // RFC 2606 (https://datatracker.ietf.org/doc/html/rfc2606) reserves the
1317    // `.invalid` TLD precisely for this purpose; using it removes any
1318    // dependence on OS-level TCP semantics (Windows' kernel can retransmit
1319    // SYN against an unbound loopback port until the connect timeout fires
1320    // rather than refusing synchronously like Linux + macOS do).
    /// URL whose host sits under the RFC 2606-reserved `.invalid` TLD, which
    /// never resolves. Requests to it fail in the transport layer before any
    /// HTTP status is produced, which is the `Err(reqwest::Error)` path the
    /// transport tests target.
    const TRANSPORT_FAIL_URL: &str = "http://nonexistent.invalid/";
1322
1323    #[test]
1324    fn retry_http_blocking_transport_error_retries_then_fails() {
1325        let attempts = std::sync::Arc::new(AtomicU32::new(0));
1326        let attempts_inner = attempts.clone();
1327        let client = reqwest::blocking::Client::builder()
1328            .timeout(Duration::from_millis(500))
1329            .build()
1330            .expect("client");
1331        let policy = RetryPolicy {
1332            max_attempts: 3,
1333            base_delay: Duration::from_millis(1),
1334            max_delay: Duration::from_millis(2),
1335        };
1336        let result = retry_http_blocking(
1337            "test-transport",
1338            &policy,
1339            SuccessClass::Strict,
1340            |_| {
1341                attempts_inner.fetch_add(1, Ordering::SeqCst);
1342                client.get(TRANSPORT_FAIL_URL).send()
1343            },
1344            |_, _| String::from("non-success branch should not be reached"),
1345        );
1346        let err = result.expect_err("transport error must surface as Err");
1347        let chain = format!("{err:#}");
1348        assert!(
1349            attempts.load(Ordering::SeqCst) > 1,
1350            "transport error must be retried; got {} attempts; chain={chain}",
1351            attempts.load(Ordering::SeqCst)
1352        );
1353        assert!(
1354            chain.contains("test-transport"),
1355            "label must surface in error chain; got: {chain}"
1356        );
1357    }
1358
1359    #[tokio::test]
1360    async fn retry_http_async_transport_error_retries_then_fails() {
1361        let attempts = std::sync::Arc::new(AtomicU32::new(0));
1362        let attempts_inner = attempts.clone();
1363        let client = reqwest::Client::builder()
1364            .timeout(Duration::from_millis(500))
1365            .build()
1366            .expect("client");
1367        let policy = RetryPolicy {
1368            max_attempts: 3,
1369            base_delay: Duration::from_millis(1),
1370            max_delay: Duration::from_millis(2),
1371        };
1372        let result = retry_http_async(
1373            "test-transport-async",
1374            &policy,
1375            SuccessClass::Strict,
1376            |_| {
1377                attempts_inner.fetch_add(1, Ordering::SeqCst);
1378                client.get(TRANSPORT_FAIL_URL).send()
1379            },
1380            |_, _| String::from("non-success branch should not be reached"),
1381        )
1382        .await;
1383        let err = result.expect_err("transport error must surface as Err");
1384        assert!(
1385            attempts.load(Ordering::SeqCst) > 1,
1386            "transport error must be retried; got {} attempts",
1387            attempts.load(Ordering::SeqCst)
1388        );
1389        let chain = format!("{err:#}");
1390        assert!(
1391            chain.contains("test-transport-async"),
1392            "label must surface in error chain; got: {chain}"
1393        );
1394    }
1395}