Skip to main content

pact_broker_cli/cli/pact_broker/
main.rs

1//! Structs and functions for interacting with a Pact Broker
2
3use std::collections::HashMap;
4use std::ops::Not;
5use std::panic::RefUnwindSafe;
6use std::str::from_utf8;
7
8use anyhow::anyhow;
9use futures::stream::*;
10
11use itertools::Itertools;
12use maplit::hashmap;
13
14use pact_models::http_utils;
15use pact_models::http_utils::HttpAuth;
16use pact_models::json_utils::json_to_string;
17
/// Extra HTTP headers supplied by the caller, keyed by header name.
#[derive(Debug, Clone)]
pub struct CustomHeaders {
    /// Header name mapped to the header value to send.
    pub headers: HashMap<String, String>,
}
22use pact_models::pact::{Pact, load_pact_from_json};
23use regex::{Captures, Regex};
24use reqwest::{Method, Url};
25use serde::{Deserialize, Serialize};
26use serde_json::{Value, json};
27use serde_with::skip_serializing_none;
28use tracing::{debug, error, info, trace, warn};
29pub mod branches;
30pub mod can_i_deploy;
31pub mod deployments;
32pub mod environments;
33pub mod pact_publish;
34pub mod pacticipants;
35pub mod pacts;
36pub mod provider_states;
37pub mod subcommands;
38pub mod tags;
39#[cfg(test)]
40pub mod test_utils;
41pub mod types;
42pub mod utils;
43pub mod verification;
44pub mod versions;
45pub mod webhooks;
46// for otel
47use crate::cli::utils::{CYAN, GREEN, RED, YELLOW};
48use http::Extensions;
49use opentelemetry::Context;
50use opentelemetry::global;
51use opentelemetry_http::HeaderInjector;
52use reqwest::Request;
53use reqwest::Response;
54use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
55use reqwest_middleware::{Middleware, Next};
56use reqwest_retry::{DefaultRetryableStrategy, Retryable, RetryableStrategy};
57use reqwest_tracing::TracingMiddleware;
58
59use crate::cli::pact_broker::main::types::SslOptions;
60
61pub fn process_notices(notices: &[Notice]) {
62    for notice in notices {
63        let notice_text = notice.text.to_string();
64        let formatted_text = notice_text
65            .split_whitespace()
66            .map(|word| {
67                if word.starts_with("https") || word.starts_with("http") {
68                    format!("{}", CYAN.apply_to(word))
69                } else {
70                    match notice.type_field.as_str() {
71                        "success" => format!("{}", GREEN.apply_to(word)),
72                        "warning" | "prompt" => format!("{}", YELLOW.apply_to(word)),
73                        "error" | "danger" => format!("{}", RED.apply_to(word)),
74                        _ => word.to_string(),
75                    }
76                }
77            })
78            .collect::<Vec<String>>()
79            .join(" ");
80        println!("{}", formatted_text);
81    }
82}
83
84#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
85#[serde(rename_all = "camelCase")]
86pub struct Notice {
87    pub text: String,
88    #[serde(rename = "type")]
89    pub type_field: String,
90}
91
92fn is_true(object: &serde_json::Map<String, Value>, field: &str) -> bool {
93    match object.get(field) {
94        Some(json) => match *json {
95            serde_json::Value::Bool(b) => b,
96            _ => false,
97        },
98        None => false,
99    }
100}
101
102fn as_string(json: &Value) -> String {
103    match *json {
104        serde_json::Value::String(ref s) => s.clone(),
105        _ => format!("{}", json),
106    }
107}
108
109fn content_type(response: &reqwest::Response) -> String {
110    match response.headers().get("content-type") {
111        Some(value) => value.to_str().unwrap_or("text/plain").into(),
112        None => "text/plain".to_string(),
113    }
114}
115
116fn json_content_type(response: &reqwest::Response) -> bool {
117    match content_type(response).parse::<mime::Mime>() {
118        Ok(mime) => {
119            match (
120                mime.type_().as_str(),
121                mime.subtype().as_str(),
122                mime.suffix(),
123            ) {
124                ("application", "json", None) => true,
125                ("application", "hal", Some(mime::JSON)) => true,
126                _ => false,
127            }
128        }
129        Err(_) => false,
130    }
131}
132
133fn find_entry(map: &serde_json::Map<String, Value>, key: &str) -> Option<(String, Value)> {
134    match map.keys().find(|k| k.to_lowercase() == key.to_lowercase()) {
135        Some(k) => map.get(k).map(|v| (key.to_string(), v.clone())),
136        None => None,
137    }
138}
139
140/// Errors that can occur with a Pact Broker
141#[derive(Debug, Clone, thiserror::Error)]
142pub enum PactBrokerError {
143    /// Error with a HAL link
144    #[error("Error with a HAL link - {0}")]
145    LinkError(String),
146    /// Error with the content of a HAL resource
147    #[error("Error with the content of a HAL resource - {0}")]
148    ContentError(String),
149    #[error("IO Error - {0}")]
150    /// IO Error
151    IoError(String),
152    /// Link/Resource was not found
153    #[error("Link/Resource was not found - {0}")]
154    NotFound(String),
155    /// Invalid URL
156    #[error("Invalid URL - {0}")]
157    UrlError(String),
158    /// Validation error
159    #[error("failed validation - {0:?}")]
160    ValidationError(Vec<String>),
161    /// Validation error with notices
162    #[error("failed validation - {0:?}")]
163    ValidationErrorWithNotices(Vec<String>, Vec<Notice>),
164}
165
166impl PartialEq<String> for PactBrokerError {
167    fn eq(&self, other: &String) -> bool {
168        let mut buffer = String::new();
169        match self {
170            PactBrokerError::LinkError(s) => buffer.push_str(s),
171            PactBrokerError::ContentError(s) => buffer.push_str(s),
172            PactBrokerError::IoError(s) => buffer.push_str(s),
173            PactBrokerError::NotFound(s) => buffer.push_str(s),
174            PactBrokerError::UrlError(s) => buffer.push_str(s),
175            PactBrokerError::ValidationError(errors) => {
176                buffer.push_str(errors.iter().join(", ").as_str())
177            }
178            PactBrokerError::ValidationErrorWithNotices(errors, _) => {
179                buffer.push_str(errors.iter().join(", ").as_str())
180            }
181        };
182        buffer == *other
183    }
184}
185
186impl<'a> PartialEq<&'a str> for PactBrokerError {
187    fn eq(&self, other: &&str) -> bool {
188        let message = match self {
189            PactBrokerError::LinkError(s) => s.clone(),
190            PactBrokerError::ContentError(s) => s.clone(),
191            PactBrokerError::IoError(s) => s.clone(),
192            PactBrokerError::NotFound(s) => s.clone(),
193            PactBrokerError::UrlError(s) => s.clone(),
194            PactBrokerError::ValidationError(errors) => errors.iter().join(", "),
195            PactBrokerError::ValidationErrorWithNotices(errors, _) => errors.iter().join(", "),
196        };
197        message.as_str() == *other
198    }
199}
200
201impl From<url::ParseError> for PactBrokerError {
202    fn from(err: url::ParseError) -> Self {
203        PactBrokerError::UrlError(format!("{}", err))
204    }
205}
206
207#[derive(Debug, Clone, Serialize, Deserialize)]
208#[serde(default)]
209/// Structure to represent a HAL link
210pub struct Link {
211    /// Link name
212    pub name: String,
213    /// Link HREF
214    pub href: Option<String>,
215    /// If the link is templated (has expressions in the HREF that need to be expanded)
216    pub templated: bool,
217    /// Link title
218    pub title: Option<String>,
219}
220
221impl Link {
222    /// Create a link from serde JSON data
223    pub fn from_json(link: &str, link_data: &serde_json::Map<String, serde_json::Value>) -> Link {
224        Link {
225            name: link.to_string(),
226            href: find_entry(link_data, &"href".to_string()).map(|(_, href)| as_string(&href)),
227            templated: is_true(link_data, "templated"),
228            title: link_data.get("title").map(|title| as_string(title)),
229        }
230    }
231
232    /// Converts the Link into a JSON representation
233    pub fn as_json(&self) -> serde_json::Value {
234        match (self.href.clone(), self.title.clone()) {
235            (Some(href), Some(title)) => json!({
236              "href": href,
237              "title": title,
238              "templated": self.templated
239            }),
240            (Some(href), None) => json!({
241              "href": href,
242              "templated": self.templated
243            }),
244            (None, Some(title)) => json!({
245              "title": title,
246              "templated": self.templated
247            }),
248            (None, None) => json!({
249              "templated": self.templated
250            }),
251        }
252    }
253}
254
255impl Default for Link {
256    fn default() -> Self {
257        Link {
258            name: "link".to_string(),
259            href: None,
260            templated: false,
261            title: None,
262        }
263    }
264}
265
266/// HAL aware HTTP client
267#[derive(Clone)]
268pub struct HALClient {
269    pub url: String,
270    pub client: ClientWithMiddleware,
271    path_info: Option<Value>,
272    auth: Option<HttpAuth>,
273    custom_headers: Option<CustomHeaders>,
274    ssl_options: SslOptions,
275    pub retries: u8,
276}
277
278struct OtelPropagatorMiddleware;
279
280#[async_trait::async_trait]
281impl Middleware for OtelPropagatorMiddleware {
282    async fn handle(
283        &self,
284        mut req: Request,
285        extensions: &mut Extensions,
286        next: Next<'_>,
287    ) -> reqwest_middleware::Result<Response> {
288        let cx = Context::current();
289        let mut headers = reqwest::header::HeaderMap::new();
290        global::get_text_map_propagator(|propagator| {
291            propagator.inject_context(&cx, &mut HeaderInjector(&mut headers))
292        });
293        headers.append(
294            "baggage",
295            reqwest::header::HeaderValue::from_static("is_synthetic=true"),
296        );
297
298        for (key, value) in headers.iter() {
299            req.headers_mut().append(key, value.clone());
300        }
301        let res = next.run(req, extensions).await;
302        res
303    }
304}
305
306/// Parses the value of a `Retry-After` response header into a [`Duration`].
307///
308/// The header may be either:
309/// - an integer number of seconds (`Retry-After: 120`), or
310/// - an HTTP-date (`Retry-After: Fri, 31 Dec 1999 23:59:59 GMT`).
311///
312/// For a date in the past the returned duration is [`Duration::ZERO`].
313/// Returns `None` if the header is absent or unparseable.
314///
315/// # Arguments
316///
317/// * `response` - The HTTP response to inspect.
318///
319/// # Returns
320///
321/// The wait duration indicated by the header, or `None` if absent/unparseable.
322fn parse_retry_after(response: &Response) -> Option<std::time::Duration> {
323    let header_value = response
324        .headers()
325        .get(reqwest::header::RETRY_AFTER)?
326        .to_str()
327        .ok()?;
328
329    // Try the decimal-seconds form first (e.g. "120").
330    if let Ok(secs) = header_value.trim().parse::<u64>() {
331        return Some(std::time::Duration::from_secs(secs));
332    }
333
334    // Fall back to the HTTP-date form (e.g. "Fri, 31 Dec 1999 23:59:59 GMT").
335    if let Ok(system_time) = httpdate::parse_http_date(header_value) {
336        let delay = system_time
337            .duration_since(std::time::SystemTime::now())
338            .unwrap_or_default();
339        return Some(delay);
340    }
341
342    None
343}
344
345/// Middleware that retries transient HTTP failures and honours `Retry-After` headers.
346///
347/// Uses [`DefaultRetryableStrategy`] to classify responses: 5xx, 408, and 429 are
348/// treated as transient and retried.
349///
350/// For `429 Too Many Requests` responses the `Retry-After` header is read when
351/// present; both the decimal-seconds form (`Retry-After: 120`) and the HTTP-date
352/// form (`Retry-After: Fri, 31 Dec 1999 23:59:59 GMT`) are supported.  The parsed
353/// delay is passed to [`utils::compute_retry_delay`], which adds a ≈20 % jitter
354/// (capped at 60 s) to spread simultaneous retries across the new rate-limit window.
355///
356/// All other transient failures use exponential back-off (`10^attempt` ms).
357///
358/// Requests with streaming bodies that cannot be cloned produce an error on the first
359/// transient failure without retrying.
360struct RetryMiddleware {
361    /// Maximum number of total attempts, including the initial send.  `0` means
362    /// one attempt with no retries (same as `1`).
363    max_attempts: u8,
364}
365
366#[async_trait::async_trait]
367impl Middleware for RetryMiddleware {
368    async fn handle(
369        &self,
370        req: Request,
371        extensions: &mut Extensions,
372        next: Next<'_>,
373    ) -> reqwest_middleware::Result<Response> {
374        let max_retries = self.max_attempts.saturating_sub(1) as u32;
375        let mut n_past_retries: u32 = 0;
376
377        loop {
378            // Clone the request so we still hold it for subsequent retry iterations.
379            let cloned = req.try_clone().ok_or_else(|| {
380                reqwest_middleware::Error::Middleware(anyhow::anyhow!(
381                    "Request object is not cloneable. Are you passing a streaming body?"
382                ))
383            })?;
384
385            let result = next.clone().run(cloned, extensions).await;
386
387            // Classify the response using reqwest-retry's built-in strategy.
388            if let Some(Retryable::Transient) = DefaultRetryableStrategy.handle(&result) {
389                if n_past_retries < max_retries {
390                    let delay = if let Ok(ref resp) = result {
391                        utils::compute_retry_delay(
392                            resp.status(),
393                            parse_retry_after(resp),
394                            n_past_retries + 1,
395                        )
396                    } else {
397                        utils::compute_retry_delay(
398                            reqwest::StatusCode::INTERNAL_SERVER_ERROR,
399                            None,
400                            n_past_retries + 1,
401                        )
402                    };
403                    trace!(
404                        attempt = n_past_retries + 1,
405                        max_attempts = self.max_attempts,
406                        delay_ms = delay.as_millis(),
407                        "retrying transient HTTP failure"
408                    );
409                    tokio::time::sleep(delay).await;
410                    n_past_retries += 1;
411                    continue;
412                }
413            }
414
415            break result;
416        }
417    }
418}
419
/// Extension trait for running a closure with the current tracing span entered.
pub trait WithCurrentSpan {
    /// Runs `f` and returns its result.
    ///
    /// The receiver itself is not passed to `f`.
    fn with_current_span<F, R>(&self, f: F) -> R
    where
        F: FnOnce() -> R;
}
425
426impl<T> WithCurrentSpan for T {
427    fn with_current_span<F, R>(&self, f: F) -> R
428    where
429        F: FnOnce() -> R,
430    {
431        let span = tracing::Span::current();
432        let _enter = span.enter();
433        f()
434    }
435}
436
437impl HALClient {
438    /// Helper method to apply custom headers to a request builder
439    fn apply_custom_headers(
440        &self,
441        mut builder: reqwest_middleware::RequestBuilder,
442    ) -> reqwest_middleware::RequestBuilder {
443        if let Some(ref custom_headers) = self.custom_headers {
444            for (name, value) in &custom_headers.headers {
445                builder = builder.header(name, value);
446            }
447        }
448        builder
449    }
450
451    /// Initialise a client with the URL and authentication
452    pub fn with_url(
453        url: &str,
454        auth: Option<HttpAuth>,
455        ssl_options: SslOptions,
456        custom_headers: Option<CustomHeaders>,
457    ) -> HALClient {
458        HALClient {
459            url: url.to_string(),
460            auth: auth.clone(),
461            custom_headers,
462            ssl_options: ssl_options.clone(),
463            ..HALClient::setup(url, auth, ssl_options)
464        }
465    }
466
467    fn update_path_info(self, path_info: serde_json::Value) -> HALClient {
468        HALClient {
469            client: self.client.clone(),
470            url: self.url.clone(),
471            path_info: Some(path_info),
472            auth: self.auth,
473            custom_headers: self.custom_headers,
474            retries: self.retries,
475            ssl_options: self.ssl_options,
476        }
477    }
478
479    /// Navigate to the resource from the link name
480    pub async fn navigate(
481        self,
482        link: &'static str,
483        template_values: &HashMap<String, String>,
484    ) -> Result<HALClient, PactBrokerError> {
485        trace!(
486            "navigate(link='{}', template_values={:?})",
487            link, template_values
488        );
489
490        let client = if self.path_info.is_none() {
491            let path_info = self.fetch("").await?;
492            self.update_path_info(path_info)
493        } else {
494            self
495        };
496
497        let path_info = client.clone().fetch_link(link, template_values).await?;
498        let client = client.update_path_info(path_info);
499
500        Ok(client)
501    }
502
503    fn find_link(&self, link: &'static str) -> Result<Link, PactBrokerError> {
504        match self.path_info {
505            None => Err(PactBrokerError::LinkError(format!("No previous resource has been fetched from the pact broker. URL: '{}', LINK: '{}'",
506                self.url, link))),
507            Some(ref json) => match json.get("_links") {
508                Some(json) => match json.get(link) {
509                    Some(link_data) => link_data.as_object()
510                        .map(|link_data| Link::from_json(&link.to_string(), &link_data))
511                        .ok_or_else(|| PactBrokerError::LinkError(format!("Link is malformed, expected an object but got {}. URL: '{}', LINK: '{}'",
512                            link_data, self.url, link))),
513                    None => Err(PactBrokerError::LinkError(format!("Link '{}' was not found in the response, only the following links where found: {:?}. URL: '{}', LINK: '{}'",
514                        link, json.as_object().unwrap_or(&json!({}).as_object().unwrap()).keys().join(", "), self.url, link)))
515                },
516                None => Err(PactBrokerError::LinkError(format!("Expected a HAL+JSON response from the pact broker, but got a response with no '_links'. URL: '{}', LINK: '{}'",
517                    self.url, link)))
518            }
519        }
520    }
521
522    async fn fetch_link(
523        self,
524        link: &'static str,
525        template_values: &HashMap<String, String>,
526    ) -> Result<Value, PactBrokerError> {
527        trace!(
528            "fetch_link(link='{}', template_values={:?})",
529            link, template_values
530        );
531
532        let link_data = self.find_link(link)?;
533
534        self.fetch_url(&link_data, template_values).await
535    }
536
537    /// Fetch the resource at the Link from the Pact broker
538    pub async fn fetch_url(
539        self,
540        link: &Link,
541        template_values: &HashMap<String, String>,
542    ) -> Result<Value, PactBrokerError> {
543        debug!(
544            "fetch_url(link={:?}, template_values={:?})",
545            link, template_values
546        );
547
548        let link_url = if link.templated {
549            debug!("Link URL is templated");
550            self.parse_link_url(&link, &template_values)
551        } else {
552            link.href.clone().ok_or_else(|| {
553                PactBrokerError::LinkError(format!(
554                    "Link is malformed, there is no href. URL: '{}', LINK: '{}'",
555                    self.url, link.name
556                ))
557            })
558        }?;
559
560        let base_url = self.url.parse::<Url>()?;
561        let joined_url = base_url.join(&link_url)?;
562        self.fetch(joined_url.path().into()).await
563    }
564    pub async fn delete_url(
565        self,
566        link: &Link,
567        template_values: &HashMap<String, String>,
568    ) -> Result<Value, PactBrokerError> {
569        debug!(
570            "fetch_url(link={:?}, template_values={:?})",
571            link, template_values
572        );
573
574        let link_url = if link.templated {
575            debug!("Link URL is templated");
576            self.parse_link_url(&link, &template_values)
577        } else {
578            link.href.clone().ok_or_else(|| {
579                PactBrokerError::LinkError(format!(
580                    "Link is malformed, there is no href. URL: '{}', LINK: '{}'",
581                    self.url, link.name
582                ))
583            })
584        }?;
585
586        let base_url = self.url.parse::<Url>()?;
587        debug!("base_url: {}", base_url);
588        debug!("link_url: {}", link_url);
589        let joined_url = base_url.join(&link_url)?;
590        debug!("joined_url: {}", joined_url);
591        self.delete(joined_url.path().into()).await
592    }
593
594    pub async fn fetch(&self, path: &str) -> Result<Value, PactBrokerError> {
595        info!("Fetching path '{}' from pact broker", path);
596        trace!(%path, broker_url = %self.url, ">> fetch");
597        let url = self.resolve_path(path)?;
598            debug!("Final broker URL: {}", url);
599
600        let mut request_builder = match self.auth {
601            Some(ref auth) => match auth {
602                HttpAuth::User(username, password) => {
603                    self.client.get(url).basic_auth(username, password.clone())
604                }
605                HttpAuth::Token(token) => self.client.get(url).bearer_auth(token),
606                _ => self.client.get(url),
607            },
608            None => self.client.get(url),
609        }
610        .header("accept", "application/hal+json, application/json");
611
612        // Apply custom headers if present
613        request_builder = self.apply_custom_headers(request_builder);
614
615        let response = request_builder.send().await.map_err(|err| {
616            PactBrokerError::IoError(format!(
617                "Failed to access pact broker path '{}' - {}. URL: '{}'",
618                &path, err, &self.url,
619            ))
620        })?;
621
622        self.parse_broker_response(path.to_string(), response).await
623    }
624
625      fn resolve_path(&self, path: &str) -> Result<Url, PactBrokerError> {
626        let broker_url = self.url.parse::<Url>()?;
627        let context_path = broker_url.path();
628        let url = if path.is_empty() {
629        broker_url
630        } else if !context_path.is_empty() && context_path != "/" {
631        if path.starts_with(context_path) {
632            let mut base_url = broker_url.clone();
633            base_url.set_path("/");
634            base_url.join(path)?
635        } else if path.starts_with("/") {
636            let mut base_url = broker_url.clone();
637            base_url.set_path(path);
638            base_url
639        } else {
640            let mut base_url = broker_url.clone();
641            let mut cp = context_path.to_string();
642            cp.push('/');
643            base_url.set_path(cp.as_str());
644            base_url.join(path)?
645        }
646        } else {
647        broker_url.join(path)?
648        };
649        Ok(url)
650    }
651
652    pub async fn delete(self, path: &str) -> Result<Value, PactBrokerError> {
653        info!("Deleting path '{}' from pact broker", path);
654
655        let broker_url = self.url.parse::<Url>()?;
656        let context_path = broker_url.path();
657        let url = if context_path.is_empty().not()
658            && context_path != "/"
659            && path.starts_with(context_path)
660        {
661            let mut base_url = broker_url.clone();
662            base_url.set_path("/");
663            base_url.join(path)?
664        } else {
665            broker_url.join(path)?
666        };
667
668        let request_builder = match self.auth {
669            Some(ref auth) => match auth {
670                HttpAuth::User(username, password) => self
671                    .client
672                    .delete(url)
673                    .basic_auth(username, password.clone()),
674                HttpAuth::Token(token) => self.client.delete(url).bearer_auth(token),
675                _ => self.client.delete(url),
676            },
677            None => self.client.delete(url),
678        }
679        .header("Accept", "application/hal+json");
680
681        let response = request_builder.send().await.map_err(|err| {
682            PactBrokerError::IoError(format!(
683                "Failed to delete pact broker path '{}' - {}. URL: '{}'",
684                &path, err, &self.url,
685            ))
686        })?;
687
688        self.parse_broker_response(path.to_string(), response).await
689    }
690
691    async fn parse_broker_response(
692        &self,
693        path: String,
694        response: reqwest::Response,
695    ) -> Result<Value, PactBrokerError> {
696        let is_json_content_type = json_content_type(&response);
697        let content_type = content_type(&response);
698        let status_code = response.status();
699
700        if status_code.is_success() {
701            if is_json_content_type {
702                response.json::<Value>()
703            .await
704            .map_err(|err| PactBrokerError::ContentError(
705              format!("Did not get a valid HAL response body from pact broker path '{}' - {}. URL: '{}'",
706                      path, err, self.url)
707            ))
708            } else if status_code.as_u16() == 204 {
709                Ok(json!({}))
710            } else {
711                debug!("Request from broker was a success, but the response body was not JSON");
712                Err(PactBrokerError::ContentError(format!(
713                    "Did not get a valid HAL response body from pact broker path '{}', content type is '{}'. URL: '{}'",
714                    path, content_type, self.url
715                )))
716            }
717        } else if status_code.as_u16() == 404 {
718            Err(PactBrokerError::NotFound(format!(
719                "Request to pact broker path '{}' failed: {}. URL: '{}'",
720                path, status_code, self.url
721            )))
722        } else {
723            // Handle any error status code (400, 422, 409, etc.)
724            let body = response.bytes().await.map_err(|_| {
725                PactBrokerError::IoError(format!(
726                    "Failed to download response body for path '{}'. URL: '{}'",
727                    &path, self.url
728                ))
729            })?;
730
731            if is_json_content_type {
732                match serde_json::from_slice::<Value>(&body) {
733                    Ok(json_body) => {
734                        if json_body.get("errors").is_some() || json_body.get("notices").is_some() {
735                            Err(handle_validation_errors(json_body))
736                        } else {
737                            Err(PactBrokerError::IoError(format!(
738                                "Request to pact broker path '{}' failed: {}. Response: {}. URL: '{}'",
739                                path, status_code, json_body, self.url
740                            )))
741                        }
742                    }
743                    Err(_) => {
744                        let body_text = from_utf8(&body)
745                            .map(|b| b.to_string())
746                            .unwrap_or_else(|err| format!("could not read body: {}", err));
747                        error!(
748                            "Request to pact broker path '{}' failed: {}",
749                            path, body_text
750                        );
751                        Err(PactBrokerError::IoError(format!(
752                            "Request to pact broker path '{}' failed: {}. URL: '{}'",
753                            path, status_code, self.url
754                        )))
755                    }
756                }
757            } else {
758                let body_text = from_utf8(&body)
759                    .map(|b| b.to_string())
760                    .unwrap_or_else(|err| format!("could not read body: {}", err));
761                error!(
762                    "Request to pact broker path '{}' failed: {}",
763                    path, body_text
764                );
765                Err(PactBrokerError::IoError(format!(
766                    "Request to pact broker path '{}' failed: {}. URL: '{}'",
767                    path, status_code, self.url
768                )))
769            }
770        }
771    }
772
773    fn parse_link_url(
774        &self,
775        link: &Link,
776        values: &HashMap<String, String>,
777    ) -> Result<String, PactBrokerError> {
778        match link.href {
779            Some(ref href) => {
780                debug!("templated URL = {}", href);
781                let re = Regex::new(r"\{(\w+)}").unwrap();
782                let final_url = re.replace_all(href, |caps: &Captures| {
783                    let lookup = caps.get(1).unwrap().as_str();
784                    trace!("Looking up value for key '{}'", lookup);
785                    match values.get(lookup) {
786                        Some(val) => urlencoding::encode(val.as_str()).to_string(),
787                        None => {
788                            warn!(
789                                "No value was found for key '{}', mapped values are {:?}",
790                                lookup, values
791                            );
792                            format!("{{{}}}", lookup)
793                        }
794                    }
795                });
796                debug!("final URL = {}", final_url);
797                Ok(final_url.to_string())
798            }
799            None => Err(PactBrokerError::LinkError(format!(
800                "Expected a HAL+JSON response from the pact broker, but got a link with no HREF. URL: '{}', LINK: '{}'",
801                self.url, link.name
802            ))),
803        }
804    }
805
806    /// Iterate over all the links by name
807    pub fn iter_links(&self, link: &str) -> Result<Vec<Link>, PactBrokerError> {
808        match self.path_info {
809      None => Err(PactBrokerError::LinkError(format!("No previous resource has been fetched from the pact broker. URL: '{}', LINK: '{}'",
810        self.url, link))),
811      Some(ref json) => match json.get("_links") {
812        Some(json) => match json.get(&link) {
813          Some(link_data) => link_data.as_array()
814              .map(|link_data| link_data.iter().map(|link_json| match link_json {
815                Value::Object(data) => Link::from_json(&link, data),
816                Value::String(s) => Link { name: link.to_string(), href: Some(s.clone()), templated: false, title: None },
817                _ => Link { name: link.to_string(), href: Some(link_json.to_string()), templated: false, title: None }
818              }).collect())
819              .ok_or_else(|| PactBrokerError::LinkError(format!("Link is malformed, expected an object but got {}. URL: '{}', LINK: '{}'",
820                  link_data, self.url, link))),
821          None => Err(PactBrokerError::LinkError(format!("Link '{}' was not found in the response, only the following links where found: {:?}. URL: '{}', LINK: '{}'",
822            link, json.as_object().unwrap_or(&json!({}).as_object().unwrap()).keys().join(", "), self.url, link)))
823        },
824        None => Err(PactBrokerError::LinkError(format!("Expected a HAL+JSON response from the pact broker, but got a response with no '_links'. URL: '{}', LINK: '{}'",
825          self.url, link)))
826      }
827    }
828    }
829
830    pub async fn post_json(
831        &self,
832        url: &str,
833        body: &str,
834        headers: Option<HashMap<String, String>>,
835    ) -> Result<serde_json::Value, PactBrokerError> {
836        trace!("post_json(url='{}', body='{}')", url, body);
837
838        self.send_document(url, body, Method::POST, headers).await
839    }
840
841    pub async fn put_json(
842        &self,
843        url: &str,
844        body: &str,
845        headers: Option<HashMap<String, String>>,
846    ) -> Result<serde_json::Value, PactBrokerError> {
847        trace!("put_json(url='{}', body='{}')", url, body);
848
849        self.send_document(url, body, Method::PUT, headers).await
850    }
851    pub async fn patch_json(
852        &self,
853        url: &str,
854        body: &str,
855        headers: Option<HashMap<String, String>>,
856    ) -> Result<serde_json::Value, PactBrokerError> {
857        trace!("put_json(url='{}', body='{}')", url, body);
858
859        self.send_document(url, body, Method::PATCH, headers).await
860    }
861
862    async fn send_document(
863        &self,
864        url: &str,
865        body: &str,
866        method: Method,
867        headers: Option<HashMap<String, String>>,
868    ) -> Result<Value, PactBrokerError> {
869        let method_type = method.clone();
870        debug!("Sending JSON to {} using {}: {}", url, method, body);
871
872        let base_url = &self.url.parse::<Url>()?;
873        let url = if url.starts_with("/") {
874            base_url.join(url)?
875        } else {
876            let url = url.parse::<Url>()?;
877            base_url.join(&url.path())?
878        };
879
880        let request_builder = match self.auth {
881            Some(ref auth) => match auth {
882                HttpAuth::User(username, password) => self
883                    .client
884                    .request(method, url.clone())
885                    .basic_auth(username, password.clone()),
886                HttpAuth::Token(token) => {
887                    self.client.request(method, url.clone()).bearer_auth(token)
888                }
889                _ => self.client.request(method, url.clone()),
890            },
891            None => self.client.request(method, url.clone()),
892        }
893        .header("Accept", "application/hal+json")
894        .body(body.to_string());
895
896        // Add any additional headers if provided
897
898        let request_builder = if let Some(ref headers) = headers {
899            headers
900                .iter()
901                .fold(request_builder, |builder, (key, value)| {
902                    builder.header(key.as_str(), value.as_str())
903                })
904        } else {
905            request_builder
906        };
907
908        let request_builder = if method_type == Method::PATCH {
909            request_builder.header("Content-Type", "application/merge-patch+json")
910        } else {
911            request_builder.header("Content-Type", "application/json")
912        };
913        match request_builder.send().await {
914            Ok(res) => {
915                self.parse_broker_response(url.path().to_string(), res)
916                    .await
917            }
918            Err(err) => Err(PactBrokerError::IoError(format!(
919                "Failed to send JSON to the pact broker URL '{}' - IoError {}",
920                url, err
921            ))),
922        }
923    }
924
925    fn with_doc_context(self, doc_attributes: &[Link]) -> Result<HALClient, PactBrokerError> {
926        let links: serde_json::Map<String, serde_json::Value> = doc_attributes
927            .iter()
928            .map(|link| (link.name.clone(), link.as_json()))
929            .collect();
930        let links_json = json!({
931          "_links": json!(links)
932        });
933        Ok(self.update_path_info(links_json))
934    }
935}
936
937fn handle_validation_errors(body: Value) -> PactBrokerError {
938    match &body {
939        Value::Object(attrs) => {
940            // Extract notices if present
941            let notices: Vec<Notice> = attrs
942                .get("notices")
943                .and_then(|n| n.as_array())
944                .map(|notices_array| {
945                    notices_array
946                        .iter()
947                        .filter_map(|notice| serde_json::from_value::<Notice>(notice.clone()).ok())
948                        .collect()
949                })
950                .unwrap_or_default();
951
952            if let Some(errors) = attrs.get("errors") {
953                let error_messages = match errors {
954                    Value::Array(values) => values.iter().map(|v| json_to_string(v)).collect(),
955                    Value::Object(errors) => errors
956                        .iter()
957                        .map(|(field, errors)| match errors {
958                            Value::String(error) => format!("{}: {}", field, error),
959                            Value::Array(errors) => format!(
960                                "{}: {}",
961                                field,
962                                errors.iter().map(|err| json_to_string(err)).join(", ")
963                            ),
964                            _ => format!("{}: {}", field, errors),
965                        })
966                        .collect(),
967                    Value::String(s) => vec![s.clone()],
968                    _ => vec![errors.to_string()],
969                };
970
971                if !notices.is_empty() {
972                    PactBrokerError::ValidationErrorWithNotices(error_messages, notices)
973                } else {
974                    PactBrokerError::ValidationError(error_messages)
975                }
976            } else if !notices.is_empty() {
977                // Even if there are no explicit errors, notices might contain error information
978                let notice_messages = notices.iter().map(|n| n.text.clone()).collect();
979                PactBrokerError::ValidationErrorWithNotices(notice_messages, notices)
980            } else {
981                PactBrokerError::ValidationError(vec![body.to_string()])
982            }
983        }
984        Value::String(s) => PactBrokerError::ValidationError(vec![s.clone()]),
985        _ => PactBrokerError::ValidationError(vec![body.to_string()]),
986    }
987}
988
989impl HALClient {
990    /// Builds the reqwest-middleware client stack for a given retry count and SSL
991    /// configuration.
992    ///
993    /// The middleware chain is (outermost → innermost):
994    /// 1. [`TracingMiddleware`] — adds OpenTelemetry trace context to every request.
995    /// 2. [`OtelPropagatorMiddleware`] — injects baggage / W3C trace propagation headers.
996    /// 3. [`RetryMiddleware`] — retries transient 5xx / 408 / 429 failures, honouring
997    ///    any `Retry-After` header present on the response.
998    ///
999    /// # Arguments
1000    ///
1001    /// * `retries` - Maximum number of total attempts (including the first send).
1002    /// * `ssl_options` - TLS configuration (custom CA cert, skip-verify, …).
1003    ///
1004    /// # Returns
1005    ///
1006    /// A fully configured [`ClientWithMiddleware`] ready for use.
1007    fn build_middleware_client(retries: u8, ssl_options: &SslOptions) -> ClientWithMiddleware {
1008        let mut builder = reqwest::Client::builder().user_agent(format!(
1009            "{}/{}",
1010            env!("CARGO_PKG_NAME"),
1011            env!("CARGO_PKG_VERSION")
1012        ));
1013
1014        debug!("Using ssl_options: {:?}", ssl_options);
1015        if let Some(ref path) = ssl_options.ssl_cert_path {
1016            if let Ok(cert_bytes) = std::fs::read(path) {
1017                match reqwest::Certificate::from_pem_bundle(&cert_bytes) {
1018                    Ok(certs) => {
1019                        debug!("Adding SSL certificate from path: {}", path);
1020                        if ssl_options.use_root_trust_store {
1021                            // Merge custom cert into the native root store.
1022                            builder = builder.tls_certs_merge(certs);
1023                        } else {
1024                            // Use ONLY the provided certificate; bypass all built-in roots.
1025                            debug!("Disabling root trust store for SSL — using only the provided certificate");
1026                            builder = builder.tls_certs_only(certs);
1027                        }
1028                    }
1029                    Err(err) => {
1030                        debug!(
1031                            "Could not parse SSL certificate from path {}: {}",
1032                            path, err
1033                        );
1034                    }
1035                }
1036            } else {
1037                debug!(
1038                    "Could not read SSL certificate from provided path: {}",
1039                    path
1040                );
1041            }
1042        } else if !ssl_options.use_root_trust_store {
1043            debug!("ssl-trust-store disabled but no custom certificate provided; proceeding with system roots");
1044        }
1045        if ssl_options.skip_ssl {
1046            builder = builder.danger_accept_invalid_certs(true);
1047            debug!("Skipping SSL certificate validation");
1048        }
1049
1050        let built_client = builder.build().expect("failed to build reqwest client");
1051        ClientBuilder::new(built_client)
1052            .with(TracingMiddleware::default())
1053            .with(OtelPropagatorMiddleware)
1054            .with(RetryMiddleware { max_attempts: retries })
1055            .build()
1056    }
1057
1058    pub fn setup(url: &str, auth: Option<HttpAuth>, ssl_options: SslOptions) -> HALClient {
1059        let retries = std::env::var("PACT_BROKER_HTTP_RETRIES")
1060            .ok()
1061            .and_then(|v| v.parse::<u8>().ok())
1062            .unwrap_or(8);
1063
1064        let client = Self::build_middleware_client(retries, &ssl_options);
1065
1066        HALClient {
1067            client,
1068            url: url.to_string(),
1069            path_info: None,
1070            auth,
1071            custom_headers: None,
1072            retries,
1073            ssl_options,
1074        }
1075    }
1076
1077    /// Sets the number of HTTP request retry attempts, overriding the default (3) or any
1078    /// value read from the `PACT_BROKER_HTTP_RETRIES` environment variable.
1079    ///
1080    /// CLI command handlers call this after construction to apply the `--retries` flag value.
1081    /// This rebuilds the internal HTTP client so the new retry count takes effect immediately.
1082    pub fn with_retry_count(mut self, retries: u8) -> Self {
1083        self.retries = retries;
1084        self.client = Self::build_middleware_client(retries, &self.ssl_options);
1085        self
1086    }
1087}
1088
1089pub fn links_from_json(json: &Value) -> Vec<Link> {
1090    match json.get("_links") {
1091        Some(json) => match json {
1092            Value::Object(v) => v
1093                .iter()
1094                .map(|(link, json)| match json {
1095                    Value::Object(attr) => Link::from_json(link, attr),
1096                    _ => Link {
1097                        name: link.clone(),
1098                        ..Link::default()
1099                    },
1100                })
1101                .collect(),
1102            _ => vec![],
1103        },
1104        None => vec![],
1105    }
1106}
1107
1108/// Fetches the pacts from the broker that match the provider name
1109pub async fn fetch_pacts_from_broker(
1110    broker_url: &str,
1111    provider_name: &str,
1112    auth: Option<HttpAuth>,
1113    ssl_options: SslOptions,
1114    custom_headers: Option<CustomHeaders>,
1115) -> anyhow::Result<
1116    Vec<
1117        anyhow::Result<(
1118            Box<dyn Pact + Send + Sync + RefUnwindSafe>,
1119            Option<PactVerificationContext>,
1120            Vec<Link>,
1121        )>,
1122    >,
1123> {
1124    trace!(
1125        "fetch_pacts_from_broker(broker_url='{}', provider_name='{}', auth={})",
1126        broker_url,
1127        provider_name,
1128        auth.clone().unwrap_or_default()
1129    );
1130
1131    let mut hal_client = HALClient::with_url(broker_url, auth, ssl_options, custom_headers);
1132    let template_values = hashmap! { "provider".to_string() => provider_name.to_string() };
1133
1134    hal_client = hal_client
1135        .navigate("pb:latest-provider-pacts", &template_values)
1136        .await
1137        .map_err(move |err| match err {
1138            PactBrokerError::NotFound(_) => PactBrokerError::NotFound(format!(
1139                "No pacts for provider '{}' where found in the pact broker. URL: '{}'",
1140                provider_name, broker_url
1141            )),
1142            _ => err,
1143        })?;
1144
1145    let pact_links = hal_client.clone().iter_links("pacts")?;
1146
1147    let results: Vec<_> = futures::stream::iter(pact_links)
1148        .map(|ref pact_link| {
1149          match pact_link.href {
1150            Some(_) => Ok((hal_client.clone(), pact_link.clone())),
1151            None => Err(
1152              PactBrokerError::LinkError(
1153                format!(
1154                  "Expected a HAL+JSON response from the pact broker, but got a link with no HREF. URL: '{}', LINK: '{:?}'",
1155                  &hal_client.url,
1156                  pact_link
1157                )
1158              )
1159            )
1160          }
1161        })
1162        .and_then(|(hal_client, pact_link)| async {
1163          let pact_json = hal_client.fetch_url(
1164            &pact_link.clone(),
1165            &template_values.clone()
1166          ).await?;
1167          Ok((pact_link, pact_json))
1168        })
1169        .map(|result| {
1170          match result {
1171            Ok((pact_link, pact_json)) => {
1172              let href = pact_link.href.unwrap_or_default();
1173              let links = links_from_json(&pact_json);
1174              load_pact_from_json(href.as_str(), &pact_json)
1175                .map(|pact| (pact, None, links))
1176            },
1177            Err(err) => Err(err.into())
1178          }
1179        })
1180        .into_stream()
1181        .collect()
1182        .await;
1183
1184    Ok(results)
1185}
1186
1187/// Fetch Pacts from the broker using the "provider-pacts-for-verification" endpoint
1188pub async fn fetch_pacts_dynamically_from_broker(
1189    broker_url: &str,
1190    provider_name: String,
1191    pending: bool,
1192    include_wip_pacts_since: Option<String>,
1193    provider_tags: Vec<String>,
1194    provider_branch: Option<String>,
1195    consumer_version_selectors: Vec<ConsumerVersionSelector>,
1196    auth: Option<HttpAuth>,
1197    ssl_options: SslOptions,
1198    headers: Option<HashMap<String, String>>,
1199    custom_headers: Option<CustomHeaders>,
1200) -> anyhow::Result<
1201    Vec<
1202        Result<
1203            (
1204                Box<dyn Pact + Send + Sync + RefUnwindSafe>,
1205                Option<PactVerificationContext>,
1206                Vec<Link>,
1207            ),
1208            PactBrokerError,
1209        >,
1210    >,
1211> {
1212    trace!(
1213        "fetch_pacts_dynamically_from_broker(broker_url='{}', provider_name='{}', pending={}, \
1214    include_wip_pacts_since={:?}, provider_tags: {:?}, consumer_version_selectors: {:?}, auth={})",
1215        broker_url,
1216        provider_name,
1217        pending,
1218        include_wip_pacts_since,
1219        provider_tags,
1220        consumer_version_selectors,
1221        auth.clone().unwrap_or_default()
1222    );
1223
1224    let mut hal_client = HALClient::with_url(broker_url, auth, ssl_options, custom_headers);
1225    let template_values = hashmap! { "provider".to_string() => provider_name.clone() };
1226
1227    hal_client = hal_client
1228        .navigate("pb:provider-pacts-for-verification", &template_values)
1229        .await
1230        .map_err(move |err| match err {
1231            PactBrokerError::NotFound(_) => PactBrokerError::NotFound(format!(
1232                "No pacts for provider '{}' were found in the pact broker. URL: '{}'",
1233                provider_name.clone(),
1234                broker_url
1235            )),
1236            _ => err,
1237        })?;
1238
1239    // Construct the Pacts for verification payload
1240    let pacts_for_verification = PactsForVerificationRequest {
1241        provider_version_tags: provider_tags,
1242        provider_version_branch: provider_branch,
1243        include_wip_pacts_since,
1244        consumer_version_selectors,
1245        include_pending_status: pending,
1246    };
1247    let request_body = serde_json::to_string(&pacts_for_verification).unwrap();
1248
1249    // Post the verification request
1250    let response = match hal_client.find_link("self") {
1251        Ok(link) => {
1252            let link = hal_client.clone().parse_link_url(&link, &hashmap! {})?;
1253            match hal_client
1254                .clone()
1255                .post_json(link.as_str(), request_body.as_str(), headers)
1256                .await
1257            {
1258                Ok(res) => Some(res),
1259                Err(err) => {
1260                    info!("error response for pacts for verification: {} ", err);
1261                    return Err(anyhow!(err));
1262                }
1263            }
1264        }
1265        Err(e) => return Err(anyhow!(e)),
1266    };
1267
1268    // Find all of the Pact links
1269    let pact_links = match response {
1270        Some(v) => {
1271            let pfv: PactsForVerificationResponse = serde_json::from_value(v)
1272                .map_err(|err| {
1273                    trace!(
1274                        "Failed to deserialise PactsForVerificationResponse: {}",
1275                        err
1276                    );
1277                    err
1278                })
1279                .unwrap_or(PactsForVerificationResponse {
1280                    embedded: PactsForVerificationBody { pacts: vec![] },
1281                });
1282            trace!(?pfv, "got pacts for verification response");
1283
1284            if pfv.embedded.pacts.len() == 0 {
1285                return Err(anyhow!(PactBrokerError::NotFound(format!(
1286                    "No pacts were found for this provider"
1287                ))));
1288            };
1289
1290            let links: Result<Vec<(Link, PactVerificationContext)>, PactBrokerError> = pfv.embedded.pacts.iter().map(| p| {
1291          match p.links.get("self") {
1292            Some(l) => Ok((l.clone(), p.into())),
1293            None => Err(
1294              PactBrokerError::LinkError(
1295                format!(
1296                  "Expected a HAL+JSON response from the pact broker, but got a link with no HREF. URL: '{}', PATH: '{:?}'",
1297                  &hal_client.url,
1298                  &p.links,
1299                )
1300              )
1301            )
1302          }
1303        }).collect();
1304
1305            links
1306        }
1307        None => Err(PactBrokerError::NotFound(format!(
1308            "No pacts were found for this provider"
1309        ))),
1310    }?;
1311
1312    let results: Vec<_> = futures::stream::iter(pact_links)
1313      .map(|(ref pact_link, ref context)| {
1314        match pact_link.href {
1315          Some(_) => Ok((hal_client.clone(), pact_link.clone(), context.clone())),
1316          None => Err(
1317            PactBrokerError::LinkError(
1318              format!(
1319                "Expected a HAL+JSON response from the pact broker, but got a link with no HREF. URL: '{}', LINK: '{:?}'",
1320                &hal_client.url,
1321                pact_link
1322              )
1323            )
1324          )
1325        }
1326      })
1327      .and_then(|(hal_client, pact_link, context)| async {
1328        let pact_json = hal_client.fetch_url(
1329          &pact_link.clone(),
1330          &template_values.clone()
1331        ).await?;
1332        Ok((pact_link, pact_json, context))
1333      })
1334      .map(|result| {
1335        match result {
1336          Ok((pact_link, pact_json, context)) => {
1337            let href = pact_link.href.unwrap_or_default();
1338            let links = links_from_json(&pact_json);
1339            load_pact_from_json(href.as_str(), &pact_json)
1340              .map(|pact| (pact, Some(context), links))
1341              .map_err(|err| PactBrokerError::ContentError(format!("{}", err)))
1342          },
1343          Err(err) => Err(err)
1344        }
1345      })
1346      .into_stream()
1347      .collect()
1348      .await;
1349
1350    Ok(results)
1351}
1352
1353/// Fetch the Pact from the given URL, using any required authentication. This will use a GET
1354/// request to the given URL and parse the result into a Pact model. It will also look for any HAL
1355/// links in the response, returning those if found.
1356pub async fn fetch_pact_from_url(
1357    url: &str,
1358    auth: &Option<HttpAuth>,
1359) -> anyhow::Result<(Box<dyn Pact + Send + Sync + RefUnwindSafe>, Vec<Link>)> {
1360    let url = url.to_string();
1361    let auth = auth.clone();
1362    let (url, pact_json) =
1363        tokio::task::spawn_blocking(move || http_utils::fetch_json_from_url(&url, &auth)).await??;
1364    let pact = load_pact_from_json(&url, &pact_json)?;
1365    let links = links_from_json(&pact_json);
1366    Ok((pact, links))
1367}
1368
1369async fn publish_provider_tags(
1370    hal_client: &HALClient,
1371    links: &[Link],
1372    provider_tags: Vec<String>,
1373    version: &str,
1374    headers: Option<HashMap<String, String>>,
1375) -> Result<(), PactBrokerError> {
1376    let hal_client = hal_client
1377        .clone()
1378        .with_doc_context(links)?
1379        .navigate("pb:provider", &hashmap! {})
1380        .await?;
1381    match hal_client.find_link("pb:version-tag") {
1382        Ok(link) => {
1383            for tag in &provider_tags {
1384                let template_values = hashmap! {
1385                  "version".to_string() => version.to_string(),
1386                  "tag".to_string() => tag.clone()
1387                };
1388                match hal_client
1389                    .clone()
1390                    .put_json(
1391                        hal_client
1392                            .clone()
1393                            .parse_link_url(&link, &template_values)?
1394                            .as_str(),
1395                        "{}",
1396                        headers.clone(),
1397                    )
1398                    .await
1399                {
1400                    Ok(_) => debug!("Pushed tag {} for provider version {}", tag, version),
1401                    Err(err) => {
1402                        error!(
1403                            "Failed to push tag {} for provider version {}",
1404                            tag, version
1405                        );
1406                        return Err(err);
1407                    }
1408                }
1409            }
1410            Ok(())
1411        }
1412        Err(_) => Err(PactBrokerError::LinkError(
1413            "Can't publish provider tags as there is no 'pb:version-tag' link".to_string(),
1414        )),
1415    }
1416}
1417
1418async fn publish_provider_branch(
1419    hal_client: &HALClient,
1420    links: &[Link],
1421    branch: &str,
1422    version: &str,
1423    headers: Option<HashMap<String, String>>,
1424) -> Result<(), PactBrokerError> {
1425    let hal_client = hal_client
1426        .clone()
1427        .with_doc_context(links)?
1428        .navigate("pb:provider", &hashmap! {})
1429        .await?;
1430
1431    match hal_client.find_link("pb:branch-version") {
1432    Ok(link) => {
1433      let template_values = hashmap! {
1434        "branch".to_string() => branch.to_string(),
1435        "version".to_string() => version.to_string(),
1436      };
1437      match hal_client.clone().put_json(hal_client.clone().parse_link_url(&link, &template_values)?.as_str(), "{}",headers).await {
1438        Ok(_) => debug!("Pushed branch {} for provider version {}", branch, version),
1439        Err(err) => {
1440          error!("Failed to push branch {} for provider version {}", branch, version);
1441          return Err(err);
1442        }
1443      }
1444      Ok(())
1445    },
1446    Err(_) => Err(PactBrokerError::LinkError("Can't publish provider branch as there is no 'pb:branch-version' link. Please ugrade to Pact Broker version 2.86.0 or later for branch support".to_string()))
1447  }
1448}
1449
#[skip_serializing_none]
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
/// Criteria used to select which consumer versions' pacts should be verified.
///
/// Sent to the broker as part of a [`PactsForVerificationRequest`]. Field names are
/// serialised in camelCase and fields that are `None` are omitted from the output.
pub struct ConsumerVersionSelector {
    /// Application name to filter the results on
    pub consumer: Option<String>,
    /// Tag
    pub tag: Option<String>,
    /// Fallback tag if Tag doesn't exist
    pub fallback_tag: Option<String>,
    /// Only select the latest (if false, this selects all pacts for a tag)
    pub latest: Option<bool>,
    /// Applications that have been deployed or released
    pub deployed_or_released: Option<bool>,
    /// Applications that have been deployed
    pub deployed: Option<bool>,
    /// Applications that have been released
    pub released: Option<bool>,
    /// Applications in a given environment
    pub environment: Option<String>,
    /// Applications with the default branch set in the broker
    pub main_branch: Option<bool>,
    /// Applications with the given branch
    pub branch: Option<String>,
    /// Applications that match the provider version branch sent during verification
    pub matching_branch: Option<bool>,
}
1478
/// Body of the broker's reply to a pacts-for-verification request.
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
struct PactsForVerificationResponse {
    /// Embedded resources; read from the `_embedded` key when deserialising.
    #[serde(rename(deserialize = "_embedded"))]
    pub embedded: PactsForVerificationBody,
}
1485
/// Embedded section of a [`PactsForVerificationResponse`].
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
struct PactsForVerificationBody {
    /// The pacts the broker selected for verification.
    pub pacts: Vec<PactForVerification>,
}
1491
/// A single pact entry in a [`PactsForVerificationBody`].
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
struct PactForVerification {
    /// Short description supplied with this pact entry.
    pub short_description: String,
    /// HAL links for this pact, keyed by relation name; read from the `_links` key
    /// when deserialising.
    #[serde(rename(deserialize = "_links"))]
    pub links: HashMap<String, Link>,
    /// Verification properties for this pact, if the response included them.
    pub verification_properties: Option<PactVerificationProperties>,
}
1500
#[skip_serializing_none]
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
/// Request to send to determine the pacts to verify
///
/// Field names are serialised in camelCase and fields that are `None` are omitted
/// from the output.
pub struct PactsForVerificationRequest {
    /// Provider tags to use for determining pending pacts (if enabled).
    /// Omitted from the serialised request when empty.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub provider_version_tags: Vec<String>,
    /// Enable pending pacts feature
    pub include_pending_status: bool,
    /// Find WIP pacts after given date
    pub include_wip_pacts_since: Option<String>,
    /// Detailed pact selection criteria, see https://docs.pact.io/pact_broker/advanced_topics/consumer_version_selectors/
    pub consumer_version_selectors: Vec<ConsumerVersionSelector>,
    /// Current provider version branch if used (instead of tags)
    pub provider_version_branch: Option<String>,
}
1518
#[skip_serializing_none]
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
/// Provides the context on why a Pact was included
///
/// Built from a `PactForVerification` entry via the `From` implementation in this
/// module.
pub struct PactVerificationContext {
    /// Description
    pub short_description: String,
    /// Properties
    pub verification_properties: PactVerificationProperties,
}
1529
1530impl From<&PactForVerification> for PactVerificationContext {
1531    fn from(value: &PactForVerification) -> Self {
1532        PactVerificationContext {
1533            short_description: value.short_description.clone(),
1534            verification_properties: value.verification_properties.clone().unwrap_or_default(),
1535        }
1536    }
1537}
1538
#[skip_serializing_none]
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
#[serde(rename_all = "camelCase")]
/// Properties associated with the verification context
pub struct PactVerificationProperties {
    #[serde(default)]
    /// If the Pact is pending. Defaults to `false` when absent from the input.
    pub pending: bool,
    /// Notices provided by the Pact Broker, each a map of string keys to string values.
    // NOTE(review): unlike `pending`, this field has no `#[serde(default)]`, so input
    // without a `notices` key presumably fails to deserialise — confirm this is intended.
    pub notices: Vec<HashMap<String, String>>,
}
1550
1551#[cfg(test)]
1552mod hal_client_custom_headers_tests {
1553    use super::*;
1554    use crate::cli::pact_broker::main::types::SslOptions;
1555    use std::collections::HashMap;
1556
1557    fn create_test_custom_headers() -> CustomHeaders {
1558        let mut headers = HashMap::new();
1559        headers.insert("Authorization".to_string(), "Bearer test-token".to_string());
1560        headers.insert("X-API-Key".to_string(), "secret-key".to_string());
1561        CustomHeaders { headers }
1562    }
1563
1564    fn create_cloudflare_custom_headers() -> CustomHeaders {
1565        let mut headers = HashMap::new();
1566        headers.insert(
1567            "CF-Access-Client-Id".to_string(),
1568            "client-id-123".to_string(),
1569        );
1570        headers.insert(
1571            "CF-Access-Client-Secret".to_string(),
1572            "secret-456".to_string(),
1573        );
1574        CustomHeaders { headers }
1575    }
1576
1577    #[test]
1578    fn test_hal_client_with_custom_headers() {
1579        let custom_headers = Some(create_test_custom_headers());
1580        let ssl_options = SslOptions::default();
1581
1582        let client = HALClient::with_url(
1583            "https://test.example.com",
1584            None,
1585            ssl_options,
1586            custom_headers.clone(),
1587        );
1588
1589        assert_eq!(client.url, "https://test.example.com");
1590        assert!(client.custom_headers.is_some());
1591
1592        let headers = client.custom_headers.unwrap();
1593        assert_eq!(headers.headers.len(), 2);
1594        assert_eq!(
1595            headers.headers.get("Authorization"),
1596            Some(&"Bearer test-token".to_string())
1597        );
1598        assert_eq!(
1599            headers.headers.get("X-API-Key"),
1600            Some(&"secret-key".to_string())
1601        );
1602    }
1603
1604    #[test]
1605    fn test_hal_client_with_cloudflare_headers() {
1606        let custom_headers = Some(create_cloudflare_custom_headers());
1607        let ssl_options = SslOptions::default();
1608
1609        let client = HALClient::with_url(
1610            "https://pact-broker.example.com",
1611            None,
1612            ssl_options,
1613            custom_headers.clone(),
1614        );
1615
1616        assert!(client.custom_headers.is_some());
1617
1618        let headers = client.custom_headers.unwrap();
1619        assert_eq!(headers.headers.len(), 2);
1620        assert_eq!(
1621            headers.headers.get("CF-Access-Client-Id"),
1622            Some(&"client-id-123".to_string())
1623        );
1624        assert_eq!(
1625            headers.headers.get("CF-Access-Client-Secret"),
1626            Some(&"secret-456".to_string())
1627        );
1628    }
1629
1630    #[test]
1631    fn test_hal_client_without_custom_headers() {
1632        let ssl_options = SslOptions::default();
1633
1634        let client = HALClient::with_url("https://test.example.com", None, ssl_options, None);
1635
1636        assert!(client.custom_headers.is_none());
1637    }
1638
1639    #[test]
1640    fn test_hal_client_with_auth_and_custom_headers() {
1641        let auth = Some(HttpAuth::Token("bearer-token".to_string()));
1642        let custom_headers = Some(create_test_custom_headers());
1643        let ssl_options = SslOptions::default();
1644
1645        let client = HALClient::with_url(
1646            "https://test.example.com",
1647            auth.clone(),
1648            ssl_options,
1649            custom_headers,
1650        );
1651
1652        assert!(client.auth.is_some());
1653        assert!(client.custom_headers.is_some());
1654
1655        if let Some(HttpAuth::Token(token)) = client.auth {
1656            assert_eq!(token, "bearer-token");
1657        }
1658    }
1659
1660    #[test]
1661    fn test_apply_custom_headers_with_mock_request() {
1662        use reqwest::Client;
1663        use reqwest_middleware::ClientBuilder;
1664
1665        let custom_headers = Some(create_test_custom_headers());
1666        let ssl_options = SslOptions::default();
1667
1668        let client = HALClient::with_url(
1669            "https://test.example.com",
1670            None,
1671            ssl_options,
1672            custom_headers,
1673        );
1674
1675        // Create a mock request builder to test header application
1676        let reqwest_client = Client::new();
1677        let middleware_client = ClientBuilder::new(reqwest_client).build();
1678        let request_builder = middleware_client.get("https://test.example.com/test");
1679
1680        // Apply custom headers
1681        let modified_builder = client.apply_custom_headers(request_builder);
1682
1683        // Build the request to inspect headers
1684        let request = modified_builder.build().unwrap();
1685
1686        // Check that custom headers were applied
1687        assert!(request.headers().contains_key("authorization"));
1688        assert!(request.headers().contains_key("x-api-key"));
1689
1690        assert_eq!(
1691            request
1692                .headers()
1693                .get("authorization")
1694                .unwrap()
1695                .to_str()
1696                .unwrap(),
1697            "Bearer test-token"
1698        );
1699        assert_eq!(
1700            request
1701                .headers()
1702                .get("x-api-key")
1703                .unwrap()
1704                .to_str()
1705                .unwrap(),
1706            "secret-key"
1707        );
1708    }
1709
1710    #[test]
1711    fn test_apply_custom_headers_without_headers() {
1712        use reqwest::Client;
1713        use reqwest_middleware::ClientBuilder;
1714
1715        let ssl_options = SslOptions::default();
1716
1717        let client = HALClient::with_url("https://test.example.com", None, ssl_options, None);
1718
1719        // Create a mock request builder
1720        let reqwest_client = Client::new();
1721        let middleware_client = ClientBuilder::new(reqwest_client).build();
1722        let request_builder = middleware_client.get("https://test.example.com/test");
1723
1724        // Apply custom headers (should be no-op)
1725        let modified_builder = client.apply_custom_headers(request_builder);
1726
1727        // Build the request to inspect headers
1728        let request = modified_builder.build().unwrap();
1729
1730        // Should not contain our test headers
1731        assert!(!request.headers().contains_key("authorization"));
1732        assert!(!request.headers().contains_key("x-api-key"));
1733    }
1734
1735    #[test]
1736    fn test_custom_headers_struct_creation() {
1737        let mut headers = HashMap::new();
1738        headers.insert("Test-Header".to_string(), "test-value".to_string());
1739
1740        let custom_headers = CustomHeaders { headers };
1741
1742        assert_eq!(custom_headers.headers.len(), 1);
1743        assert_eq!(
1744            custom_headers.headers.get("Test-Header"),
1745            Some(&"test-value".to_string())
1746        );
1747    }
1748
1749    #[test]
1750    fn test_custom_headers_empty() {
1751        let headers = HashMap::new();
1752        let custom_headers = CustomHeaders { headers };
1753
1754        assert_eq!(custom_headers.headers.len(), 0);
1755        assert!(custom_headers.headers.is_empty());
1756    }
1757
1758    #[test]
1759    fn test_custom_headers_case_sensitivity() {
1760        let mut headers = HashMap::new();
1761        headers.insert("content-type".to_string(), "application/json".to_string());
1762        headers.insert("Content-Type".to_string(), "text/plain".to_string());
1763
1764        let custom_headers = CustomHeaders { headers };
1765
1766        // Both should exist as separate entries (case sensitive keys)
1767        assert_eq!(custom_headers.headers.len(), 2);
1768        assert_eq!(
1769            custom_headers.headers.get("content-type"),
1770            Some(&"application/json".to_string())
1771        );
1772        assert_eq!(
1773            custom_headers.headers.get("Content-Type"),
1774            Some(&"text/plain".to_string())
1775        );
1776    }
1777}
1778
#[cfg(test)]
mod tests {
    use expectest::expect;
    use expectest::prelude::*;
    use pact_models::prelude::RequestResponsePact;
    use pact_models::sync_interaction::RequestResponseInteraction;
    use pact_models::{Consumer, PactSpecification, Provider};
    use pretty_assertions::assert_eq;

    use pact_consumer::prelude::*;
    use pact_consumer::*;

    use super::*;
    use super::{content_type, json_content_type};

    /// Asserts that every `(path, expected URL)` pair in `cases` resolves successfully
    /// against `client` to exactly the expected URL.
    fn expect_resolved(client: &HALClient, cases: &[(&str, &str)]) {
        for &(path, expected) in cases {
            expect!(client.resolve_path(path)).to(be_ok().value(Url::parse(expected).unwrap()));
        }
    }

    /// Exercises `resolve_path` for an invalid base URL, a bare host and a host with a base path.
    #[test]
    fn resolve_path_test() {
        // A base that is not a URL cannot resolve anything.
        let invalid = HALClient::with_url("not a URL", None, SslOptions::default(), None);
        expect!(invalid.resolve_path("/any")).to(be_err());

        // Base URL without a path component.
        let bare_host = HALClient::with_url(
            "http://localhost-ip4:1234",
            None,
            SslOptions::default(),
            None,
        );
        expect_resolved(
            &bare_host,
            &[
                ("", "http://localhost-ip4:1234"),
                ("/", "http://localhost-ip4:1234"),
                ("/any", "http://localhost-ip4:1234/any"),
                ("any", "http://localhost-ip4:1234/any"),
                ("any/sub-path", "http://localhost-ip4:1234/any/sub-path"),
                ("/base-path", "http://localhost-ip4:1234/base-path"),
                ("/base-path/", "http://localhost-ip4:1234/base-path/"),
                (
                    "/base-path/sub-path",
                    "http://localhost-ip4:1234/base-path/sub-path",
                ),
            ],
        );

        // Base URL with a path: relative paths are appended, absolute paths replace it.
        let with_base_path = HALClient::with_url(
            "http://localhost-ip4:1234/base-path",
            None,
            SslOptions::default(),
            None,
        );
        expect_resolved(
            &with_base_path,
            &[
                ("", "http://localhost-ip4:1234/base-path"),
                ("/", "http://localhost-ip4:1234"),
                ("/any", "http://localhost-ip4:1234/any"),
                ("any", "http://localhost-ip4:1234/base-path/any"),
                (
                    "any/sub-path",
                    "http://localhost-ip4:1234/base-path/any/sub-path",
                ),
                ("/base-path", "http://localhost-ip4:1234/base-path"),
                ("/base-path/", "http://localhost-ip4:1234/base-path/"),
                (
                    "/base-path/sub-path",
                    "http://localhost-ip4:1234/base-path/sub-path",
                ),
            ],
        );
    }

    /// With no path info set, `navigate` must fetch the root resource before following `next`.
    #[test_log::test(tokio::test)]
    async fn navigate_first_retrieves_the_root_resource() {
        let pact_broker = PactBuilderAsync::new("RustPactVerifier", "PactBrokerStub")
            .interaction(
                "a request to a hal resource",
                "",
                |mut interaction| async move {
                    interaction.request.path("/");
                    interaction
                        .response
                        .header("Content-Type", "application/hal+json")
                        .body("{\"_links\":{\"next\":{\"href\":\"/abc\"},\"prev\":{\"href\":\"/def\"}}}");
                    interaction
                },
            )
            .await
            .interaction("a request to next", "", |mut interaction| async move {
                interaction.request.path("/abc");
                interaction
                    .response
                    .header("Content-Type", "application/json")
                    .json_body(json_pattern!("Yay! You found your way here"));
                interaction
            })
            .await
            .start_mock_server(None, Some(MockServerConfig::default()));

        let client = HALClient::with_url(
            pact_broker.url().as_str(),
            None,
            SslOptions::default(),
            None,
        );
        let navigated = client.navigate("next", &hashmap! {}).await.unwrap();

        expect!(navigated.path_info).to(be_some().value(json!("Yay! You found your way here")));
    }

    /// When path info is already set, `navigate` follows its link directly; the mock server
    /// only defines the `/abc` interaction.
    #[test_log::test(tokio::test)]
    async fn navigate_will_not_retrieve_the_root_resource_if_a_path_is_already_set() {
        let pact_broker = PactBuilderAsync::new("RustPactVerifier", "PactBrokerStub")
            .interaction("a request to next", "", |mut interaction| async move {
                interaction.request.path("/abc");
                interaction
                    .response
                    .header("Content-Type", "application/json")
                    .json_body(json_pattern!("Yay! You found your way here"));
                interaction
            })
            .await
            .start_mock_server(None, Some(MockServerConfig::default()));

        let mut client = HALClient::with_url(
            pact_broker.url().as_str(),
            None,
            SslOptions::default(),
            None,
        );
        // Pre-populate the links so that no root request is needed.
        client.path_info = Some(json!({
            "_links": {
                "next": { "href": "/abc" },
                "prev": { "href": "/def" }
            }
        }));
        let navigated = client.navigate("next", &hashmap! {}).await.unwrap();

        expect!(navigated.path_info).to(be_some().value(json!("Yay! You found your way here")));
    }

    /// A client whose base URL carries a context path must request the root under that path.
    #[test_log::test(tokio::test)]
    async fn navigate_takes_context_paths_into_account() {
        let pact_broker = PactBuilderAsync::new("RustPactVerifier", "PactBrokerStub")
            .interaction(
                "a request to a hal resource with base path",
                "",
                |mut interaction| async move {
                    interaction.request.path("/base-path");
                    interaction
                        .response
                        .header("Content-Type", "application/hal+json")
                        .body("{\"_links\":{\"next\":{\"href\":\"/base-path/abc\"},\"prev\":{\"href\":\"/base-path/def\"}}}");
                    interaction
                },
            )
            .await
            .interaction(
                "a request to next with a base path",
                "",
                |mut interaction| async move {
                    interaction.request.path("/base-path/abc");
                    interaction
                        .response
                        .header("Content-Type", "application/json")
                        .json_body(json_pattern!("Yay! You found your way here"));
                    interaction
                },
            )
            .await
            .start_mock_server(None, Some(MockServerConfig::default()));

        let base_url = pact_broker.url().join("/base-path").unwrap();
        let client = HALClient::with_url(base_url.as_str(), None, SslOptions::default(), None);
        let navigated = client.navigate("next", &hashmap! {}).await.unwrap();

        expect!(navigated.path_info).to(be_some().value(json!("Yay! You found your way here")));
    }
}
1897
1898// MARK: RetryMiddleware integration tests
1899
#[cfg(test)]
mod retry_middleware_tests {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    use axum::{Router, body::Body, http::StatusCode, response::Response, routing::get};
    use tokio::net::TcpListener;

    use super::{HALClient, SslOptions};

    /// Minimal HAL document served by the stub endpoints.
    const EMPTY_HAL_BODY: &str = "{\"_links\":{}}";

    /// Binds `router` to `127.0.0.1` on port 0, serves it on a spawned task and returns
    /// the `http://` base URL built from the address the listener actually bound to.
    async fn spawn_test_server(router: Router) -> String {
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let base_url = format!("http://{}", listener.local_addr().unwrap());
        tokio::spawn(async move {
            axum::serve(listener, router).await.unwrap();
        });
        base_url
    }

    /// Builds an unauthenticated `HALClient` for `base_url` with the given retry count.
    fn hal_client(base_url: &str, retries: u8) -> HALClient {
        let client = HALClient::with_url(base_url, None, SslOptions::default(), None);
        client.with_retry_count(retries)
    }

    /// Builds a `200 OK` response carrying the empty HAL document.
    fn hal_ok() -> Response {
        Response::builder()
            .status(StatusCode::OK)
            .header("content-type", "application/hal+json")
            .body(Body::from(EMPTY_HAL_BODY))
            .unwrap()
    }

    /// Builds a `429 Too Many Requests` response with `body` and, when given,
    /// a `Retry-After` header set to `retry_after`.
    fn too_many_requests(retry_after: Option<&str>, body: Body) -> Response {
        let mut builder = Response::builder().status(StatusCode::TOO_MANY_REQUESTS);
        if let Some(value) = retry_after {
            builder = builder.header("Retry-After", value);
        }
        builder.body(body).unwrap()
    }

    /// Two throttled responses followed by a success should take three requests in total.
    #[tokio::test]
    async fn retries_on_429_too_many_requests() {
        let hits = Arc::new(AtomicUsize::new(0));
        let handler_hits = Arc::clone(&hits);

        let router = Router::new().route(
            "/",
            get(move || {
                let hits = Arc::clone(&handler_hits);
                async move {
                    match hits.fetch_add(1, Ordering::SeqCst) {
                        // The first two requests are throttled.
                        0 | 1 => too_many_requests(None, Body::from(EMPTY_HAL_BODY)),
                        _ => hal_ok(),
                    }
                }
            }),
        );

        let base_url = spawn_test_server(router).await;
        let client = hal_client(&base_url, 3);
        let result = client.fetch("").await;

        assert!(result.is_ok(), "expected OK but got: {:?}", result.err());
        assert_eq!(hits.load(Ordering::SeqCst), 3);
    }

    /// A 404 response must result in exactly one request.
    #[tokio::test]
    async fn does_not_retry_404_not_found() {
        let hits = Arc::new(AtomicUsize::new(0));
        let handler_hits = Arc::clone(&hits);

        let router = Router::new().route(
            "/",
            get(move || {
                let hits = Arc::clone(&handler_hits);
                async move {
                    hits.fetch_add(1, Ordering::SeqCst);
                    StatusCode::NOT_FOUND
                }
            }),
        );

        let base_url = spawn_test_server(router).await;
        let client = hal_client(&base_url, 3);
        // Only the request count matters here; the fetch outcome is ignored.
        let _ = client.fetch("").await;

        assert_eq!(
            hits.load(Ordering::SeqCst),
            1,
            "404 should not be retried"
        );
    }

    /// A single 500 followed by a 200 should take two requests in total.
    #[tokio::test]
    async fn retries_on_500_internal_server_error() {
        let hits = Arc::new(AtomicUsize::new(0));
        let handler_hits = Arc::clone(&hits);

        let router = Router::new().route(
            "/",
            get(move || {
                let hits = Arc::clone(&handler_hits);
                async move {
                    match hits.fetch_add(1, Ordering::SeqCst) {
                        0 => StatusCode::INTERNAL_SERVER_ERROR,
                        _ => StatusCode::OK,
                    }
                }
            }),
        );

        let base_url = spawn_test_server(router).await;
        let client = hal_client(&base_url, 3);
        // Only the request count matters here; the fetch outcome is ignored.
        let _ = client.fetch("").await;

        assert_eq!(
            hits.load(Ordering::SeqCst),
            2,
            "500 should be retried once"
        );
    }

    /// A 429 with an integer `Retry-After` should be followed by one retry.
    #[tokio::test]
    async fn honours_integer_retry_after_header() {
        // A value of 0 exercises the header path without introducing a real delay.
        let hits = Arc::new(AtomicUsize::new(0));
        let handler_hits = Arc::clone(&hits);

        let router = Router::new().route(
            "/",
            get(move || {
                let hits = Arc::clone(&handler_hits);
                async move {
                    match hits.fetch_add(1, Ordering::SeqCst) {
                        0 => too_many_requests(Some("0"), Body::from(EMPTY_HAL_BODY)),
                        _ => hal_ok(),
                    }
                }
            }),
        );

        let base_url = spawn_test_server(router).await;
        let client = hal_client(&base_url, 3);
        let result = client.fetch("").await;

        assert!(result.is_ok());
        assert_eq!(hits.load(Ordering::SeqCst), 2);
    }

    /// A 429 with an HTTP-date `Retry-After` should be followed by one retry.
    #[tokio::test]
    async fn honours_http_date_retry_after_header() {
        // The date lies far in the past, so the intended delay is zero; a successful
        // retry shows that the date form is parsed rather than silently ignored.
        let hits = Arc::new(AtomicUsize::new(0));
        let handler_hits = Arc::clone(&hits);

        let router = Router::new().route(
            "/",
            get(move || {
                let hits = Arc::clone(&handler_hits);
                async move {
                    match hits.fetch_add(1, Ordering::SeqCst) {
                        0 => too_many_requests(
                            Some("Thu, 01 Jan 1970 00:00:00 GMT"),
                            Body::from(EMPTY_HAL_BODY),
                        ),
                        _ => hal_ok(),
                    }
                }
            }),
        );

        let base_url = spawn_test_server(router).await;
        let client = hal_client(&base_url, 3);
        let result = client.fetch("").await;

        assert!(result.is_ok());
        assert_eq!(
            hits.load(Ordering::SeqCst),
            2,
            "HTTP-date Retry-After should be parsed and retry should happen"
        );
    }

    /// With a retry count of zero, a throttled endpoint must be hit exactly once.
    #[tokio::test]
    async fn sends_one_request_when_retries_is_zero() {
        let hits = Arc::new(AtomicUsize::new(0));
        let handler_hits = Arc::clone(&hits);

        let router = Router::new().route(
            "/",
            get(move || {
                let hits = Arc::clone(&handler_hits);
                async move {
                    hits.fetch_add(1, Ordering::SeqCst);
                    too_many_requests(None, Body::empty())
                }
            }),
        );

        let base_url = spawn_test_server(router).await;
        let client = hal_client(&base_url, 0);
        // Only the request count matters here; the fetch outcome is ignored.
        let _ = client.fetch("").await;

        assert_eq!(
            hits.load(Ordering::SeqCst),
            1,
            "retries=0 should send exactly one request"
        );
    }

    /// An endpoint that always answers 429 must make `fetch` return an error.
    #[tokio::test]
    async fn returns_last_failure_when_all_retries_exhausted() {
        let router = Router::new().route(
            "/",
            get(|| async { too_many_requests(None, Body::empty()) }),
        );

        let base_url = spawn_test_server(router).await;
        let client = hal_client(&base_url, 2);
        let result = client.fetch("").await;

        assert!(
            result.is_err(),
            "all retries exhausted should return error, got: {:?}",
            result
        );
    }
}