pact_broker_cli/cli/pact_broker/
main.rs

1//! Structs and functions for interacting with a Pact Broker
2
3use std::collections::HashMap;
4use std::ops::Not;
5use std::panic::RefUnwindSafe;
6use std::str::from_utf8;
7
8use anyhow::anyhow;
9use futures::stream::*;
10
11use itertools::Itertools;
12use maplit::hashmap;
13
14use pact_models::http_utils;
15use pact_models::http_utils::HttpAuth;
16use pact_models::json_utils::json_to_string;
17use pact_models::pact::{Pact, load_pact_from_json};
18use regex::{Captures, Regex};
19use reqwest::{Method, Url};
20use serde::{Deserialize, Serialize};
21use serde_json::{Value, json};
22use serde_with::skip_serializing_none;
23use tracing::{debug, error, info, trace, warn};
24pub mod branches;
25pub mod can_i_deploy;
26pub mod deployments;
27pub mod environments;
28pub mod pact_publish;
29pub mod pacticipants;
30pub mod pacts;
31pub mod subcommands;
32pub mod tags;
33#[cfg(test)]
34pub mod test_utils;
35pub mod types;
36pub mod utils;
37pub mod verification;
38pub mod versions;
39pub mod webhooks;
40use utils::with_retries;
41// for otel
42use http::Extensions;
43use opentelemetry::Context;
44use opentelemetry::global;
45use opentelemetry_http::HeaderInjector;
46use reqwest::Request;
47use reqwest::Response;
48use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
49use reqwest_middleware::{Middleware, Next};
50use reqwest_tracing::TracingMiddleware;
51
52use crate::cli::pact_broker::main::types::SslOptions;
53
54fn is_true(object: &serde_json::Map<String, Value>, field: &str) -> bool {
55    match object.get(field) {
56        Some(json) => match *json {
57            serde_json::Value::Bool(b) => b,
58            _ => false,
59        },
60        None => false,
61    }
62}
63
64fn as_string(json: &Value) -> String {
65    match *json {
66        serde_json::Value::String(ref s) => s.clone(),
67        _ => format!("{}", json),
68    }
69}
70
71fn content_type(response: &reqwest::Response) -> String {
72    match response.headers().get("content-type") {
73        Some(value) => value.to_str().unwrap_or("text/plain").into(),
74        None => "text/plain".to_string(),
75    }
76}
77
78fn json_content_type(response: &reqwest::Response) -> bool {
79    match content_type(response).parse::<mime::Mime>() {
80        Ok(mime) => {
81            match (
82                mime.type_().as_str(),
83                mime.subtype().as_str(),
84                mime.suffix(),
85            ) {
86                ("application", "json", None) => true,
87                ("application", "hal", Some(mime::JSON)) => true,
88                _ => false,
89            }
90        }
91        Err(_) => false,
92    }
93}
94
95fn find_entry(map: &serde_json::Map<String, Value>, key: &str) -> Option<(String, Value)> {
96    match map.keys().find(|k| k.to_lowercase() == key.to_lowercase()) {
97        Some(k) => map.get(k).map(|v| (key.to_string(), v.clone())),
98        None => None,
99    }
100}
101
102/// Errors that can occur with a Pact Broker
103#[derive(Debug, Clone, thiserror::Error)]
104pub enum PactBrokerError {
105    /// Error with a HAL link
106    #[error("Error with a HAL link - {0}")]
107    LinkError(String),
108    /// Error with the content of a HAL resource
109    #[error("Error with the content of a HAL resource - {0}")]
110    ContentError(String),
111    #[error("IO Error - {0}")]
112    /// IO Error
113    IoError(String),
114    /// Link/Resource was not found
115    #[error("Link/Resource was not found - {0}")]
116    NotFound(String),
117    /// Invalid URL
118    #[error("Invalid URL - {0}")]
119    UrlError(String),
120    /// Validation error
121    #[error("failed validation - {0:?}")]
122    ValidationError(Vec<String>),
123}
124
125impl PartialEq<String> for PactBrokerError {
126    fn eq(&self, other: &String) -> bool {
127        let mut buffer = String::new();
128        match self {
129            PactBrokerError::LinkError(s) => buffer.push_str(s),
130            PactBrokerError::ContentError(s) => buffer.push_str(s),
131            PactBrokerError::IoError(s) => buffer.push_str(s),
132            PactBrokerError::NotFound(s) => buffer.push_str(s),
133            PactBrokerError::UrlError(s) => buffer.push_str(s),
134            PactBrokerError::ValidationError(errors) => {
135                buffer.push_str(errors.iter().join(", ").as_str())
136            }
137        };
138        buffer == *other
139    }
140}
141
142impl<'a> PartialEq<&'a str> for PactBrokerError {
143    fn eq(&self, other: &&str) -> bool {
144        let message = match self {
145            PactBrokerError::LinkError(s) => s.clone(),
146            PactBrokerError::ContentError(s) => s.clone(),
147            PactBrokerError::IoError(s) => s.clone(),
148            PactBrokerError::NotFound(s) => s.clone(),
149            PactBrokerError::UrlError(s) => s.clone(),
150            PactBrokerError::ValidationError(errors) => errors.iter().join(", "),
151        };
152        message.as_str() == *other
153    }
154}
155
156impl From<url::ParseError> for PactBrokerError {
157    fn from(err: url::ParseError) -> Self {
158        PactBrokerError::UrlError(format!("{}", err))
159    }
160}
161
162#[derive(Debug, Clone, Serialize, Deserialize)]
163#[serde(default)]
164/// Structure to represent a HAL link
165pub struct Link {
166    /// Link name
167    pub name: String,
168    /// Link HREF
169    pub href: Option<String>,
170    /// If the link is templated (has expressions in the HREF that need to be expanded)
171    pub templated: bool,
172    /// Link title
173    pub title: Option<String>,
174}
175
176impl Link {
177    /// Create a link from serde JSON data
178    pub fn from_json(link: &str, link_data: &serde_json::Map<String, serde_json::Value>) -> Link {
179        Link {
180            name: link.to_string(),
181            href: find_entry(link_data, &"href".to_string()).map(|(_, href)| as_string(&href)),
182            templated: is_true(link_data, "templated"),
183            title: link_data.get("title").map(|title| as_string(title)),
184        }
185    }
186
187    /// Converts the Link into a JSON representation
188    pub fn as_json(&self) -> serde_json::Value {
189        match (self.href.clone(), self.title.clone()) {
190            (Some(href), Some(title)) => json!({
191              "href": href,
192              "title": title,
193              "templated": self.templated
194            }),
195            (Some(href), None) => json!({
196              "href": href,
197              "templated": self.templated
198            }),
199            (None, Some(title)) => json!({
200              "title": title,
201              "templated": self.templated
202            }),
203            (None, None) => json!({
204              "templated": self.templated
205            }),
206        }
207    }
208}
209
210impl Default for Link {
211    fn default() -> Self {
212        Link {
213            name: "link".to_string(),
214            href: None,
215            templated: false,
216            title: None,
217        }
218    }
219}
220
221/// HAL aware HTTP client
222#[derive(Clone)]
223pub struct HALClient {
224    client: ClientWithMiddleware,
225    url: String,
226    path_info: Option<Value>,
227    auth: Option<HttpAuth>,
228    ssl_options: SslOptions,
229    retries: u8,
230}
231
232struct OtelPropagatorMiddleware;
233
234#[async_trait::async_trait]
235impl Middleware for OtelPropagatorMiddleware {
236    async fn handle(
237        &self,
238        mut req: Request,
239        extensions: &mut Extensions,
240        next: Next<'_>,
241    ) -> reqwest_middleware::Result<Response> {
242        let cx = Context::current();
243        let mut headers = reqwest::header::HeaderMap::new();
244        global::get_text_map_propagator(|propagator| {
245            propagator.inject_context(&cx, &mut HeaderInjector(&mut headers))
246        });
247        headers.append(
248            "baggage",
249            reqwest::header::HeaderValue::from_static("is_synthetic=true"),
250        );
251
252        for (key, value) in headers.iter() {
253            req.headers_mut().append(key, value.clone());
254        }
255        let res = next.run(req, extensions).await;
256        res
257    }
258}
259
/// Extension trait for running a closure with the current tracing span entered.
pub trait WithCurrentSpan {
    /// Runs `f` and returns whatever it returns.
    fn with_current_span<F, R>(&self, f: F) -> R
    where
        F: FnOnce() -> R;
}
265
266impl<T> WithCurrentSpan for T {
267    fn with_current_span<F, R>(&self, f: F) -> R
268    where
269        F: FnOnce() -> R,
270    {
271        let span = tracing::Span::current();
272        let _enter = span.enter();
273        f()
274    }
275}
276
277impl HALClient {
278    /// Initialise a client with the URL and authentication
279    pub fn with_url(url: &str, auth: Option<HttpAuth>, ssl_options: SslOptions) -> HALClient {
280        HALClient {
281            url: url.to_string(),
282            auth: auth.clone(),
283            ssl_options: ssl_options.clone(),
284            ..HALClient::setup(url, auth, ssl_options)
285        }
286    }
287
288    fn update_path_info(self, path_info: serde_json::Value) -> HALClient {
289        HALClient {
290            client: self.client.clone(),
291            url: self.url.clone(),
292            path_info: Some(path_info),
293            auth: self.auth,
294            retries: self.retries,
295            ssl_options: self.ssl_options,
296        }
297    }
298
299    /// Navigate to the resource from the link name
300    pub async fn navigate(
301        self,
302        link: &'static str,
303        template_values: &HashMap<String, String>,
304    ) -> Result<HALClient, PactBrokerError> {
305        trace!(
306            "navigate(link='{}', template_values={:?})",
307            link, template_values
308        );
309
310        let client = if self.path_info.is_none() {
311            let path_info = self.clone().fetch("/".into()).await?;
312            self.update_path_info(path_info)
313        } else {
314            self
315        };
316
317        let path_info = client.clone().fetch_link(link, template_values).await?;
318        let client = client.update_path_info(path_info);
319
320        Ok(client)
321    }
322
323    fn find_link(&self, link: &'static str) -> Result<Link, PactBrokerError> {
324        match self.path_info {
325            None => Err(PactBrokerError::LinkError(format!("No previous resource has been fetched from the pact broker. URL: '{}', LINK: '{}'",
326                self.url, link))),
327            Some(ref json) => match json.get("_links") {
328                Some(json) => match json.get(link) {
329                    Some(link_data) => link_data.as_object()
330                        .map(|link_data| Link::from_json(&link.to_string(), &link_data))
331                        .ok_or_else(|| PactBrokerError::LinkError(format!("Link is malformed, expected an object but got {}. URL: '{}', LINK: '{}'",
332                            link_data, self.url, link))),
333                    None => Err(PactBrokerError::LinkError(format!("Link '{}' was not found in the response, only the following links where found: {:?}. URL: '{}', LINK: '{}'",
334                        link, json.as_object().unwrap_or(&json!({}).as_object().unwrap()).keys().join(", "), self.url, link)))
335                },
336                None => Err(PactBrokerError::LinkError(format!("Expected a HAL+JSON response from the pact broker, but got a response with no '_links'. URL: '{}', LINK: '{}'",
337                    self.url, link)))
338            }
339        }
340    }
341
342    async fn fetch_link(
343        self,
344        link: &'static str,
345        template_values: &HashMap<String, String>,
346    ) -> Result<Value, PactBrokerError> {
347        trace!(
348            "fetch_link(link='{}', template_values={:?})",
349            link, template_values
350        );
351
352        let link_data = self.find_link(link)?;
353
354        self.fetch_url(&link_data, template_values).await
355    }
356
357    /// Fetch the resource at the Link from the Pact broker
358    pub async fn fetch_url(
359        self,
360        link: &Link,
361        template_values: &HashMap<String, String>,
362    ) -> Result<Value, PactBrokerError> {
363        debug!(
364            "fetch_url(link={:?}, template_values={:?})",
365            link, template_values
366        );
367
368        let link_url = if link.templated {
369            debug!("Link URL is templated");
370            self.clone().parse_link_url(&link, &template_values)
371        } else {
372            link.href.clone().ok_or_else(|| {
373                PactBrokerError::LinkError(format!(
374                    "Link is malformed, there is no href. URL: '{}', LINK: '{}'",
375                    self.url, link.name
376                ))
377            })
378        }?;
379
380        let base_url = self.url.parse::<Url>()?;
381        let joined_url = base_url.join(&link_url)?;
382        self.fetch(joined_url.path().into()).await
383    }
384    pub async fn delete_url(
385        self,
386        link: &Link,
387        template_values: &HashMap<String, String>,
388    ) -> Result<Value, PactBrokerError> {
389        debug!(
390            "fetch_url(link={:?}, template_values={:?})",
391            link, template_values
392        );
393
394        let link_url = if link.templated {
395            debug!("Link URL is templated");
396            self.clone().parse_link_url(&link, &template_values)
397        } else {
398            link.href.clone().ok_or_else(|| {
399                PactBrokerError::LinkError(format!(
400                    "Link is malformed, there is no href. URL: '{}', LINK: '{}'",
401                    self.url, link.name
402                ))
403            })
404        }?;
405
406        let base_url = self.url.parse::<Url>()?;
407        debug!("base_url: {}", base_url);
408        debug!("link_url: {}", link_url);
409        let joined_url = base_url.join(&link_url)?;
410        debug!("joined_url: {}", joined_url);
411        self.delete(joined_url.path().into()).await
412    }
413
414    pub async fn fetch(self, path: &str) -> Result<Value, PactBrokerError> {
415        info!("Fetching path '{}' from pact broker", path);
416
417        let broker_url = self.url.parse::<Url>()?;
418        let context_path = broker_url.path();
419        let url = if context_path.is_empty().not()
420            && context_path != "/"
421            && path.starts_with(context_path)
422        {
423            let mut base_url = broker_url.clone();
424            base_url.set_path("/");
425            base_url.join(path)?
426        } else {
427            broker_url.join(path)?
428        };
429
430        let request_builder = match self.auth {
431            Some(ref auth) => match auth {
432                HttpAuth::User(username, password) => {
433                    self.client.get(url).basic_auth(username, password.clone())
434                }
435                HttpAuth::Token(token) => self.client.get(url).bearer_auth(token),
436                _ => self.client.get(url),
437            },
438            None => self.client.get(url),
439        }
440        .header("accept", "application/hal+json, application/json");
441
442        let response = utils::with_retries(self.retries, request_builder)
443            .await
444            .map_err(|err| {
445                PactBrokerError::IoError(format!(
446                    "Failed to access pact broker path '{}' - {}. URL: '{}'",
447                    &path, err, &self.url,
448                ))
449            })?;
450
451        self.parse_broker_response(path.to_string(), response).await
452    }
453
454    pub async fn delete(self, path: &str) -> Result<Value, PactBrokerError> {
455        info!("Deleting path '{}' from pact broker", path);
456
457        let broker_url = self.url.parse::<Url>()?;
458        let context_path = broker_url.path();
459        let url = if context_path.is_empty().not()
460            && context_path != "/"
461            && path.starts_with(context_path)
462        {
463            let mut base_url = broker_url.clone();
464            base_url.set_path("/");
465            base_url.join(path)?
466        } else {
467            broker_url.join(path)?
468        };
469
470        let request_builder = match self.auth {
471            Some(ref auth) => match auth {
472                HttpAuth::User(username, password) => self
473                    .client
474                    .delete(url)
475                    .basic_auth(username, password.clone()),
476                HttpAuth::Token(token) => self.client.delete(url).bearer_auth(token),
477                _ => self.client.delete(url),
478            },
479            None => self.client.delete(url),
480        };
481
482        let response = utils::with_retries(self.retries, request_builder)
483            .await
484            .map_err(|err| {
485                PactBrokerError::IoError(format!(
486                    "Failed to delete pact broker path '{}' - {}. URL: '{}'",
487                    &path, err, &self.url,
488                ))
489            })?;
490
491        self.parse_broker_response(path.to_string(), response).await
492    }
493
494    async fn parse_broker_response(
495        &self,
496        path: String,
497        response: reqwest::Response,
498    ) -> Result<Value, PactBrokerError> {
499        let is_json_content_type = json_content_type(&response);
500        let content_type = content_type(&response);
501        let status_code = response.status();
502
503        if status_code.is_success() {
504            if is_json_content_type {
505                response.json::<Value>()
506            .await
507            .map_err(|err| PactBrokerError::ContentError(
508              format!("Did not get a valid HAL response body from pact broker path '{}' - {}. URL: '{}'",
509                      path, err, self.url)
510            ))
511            } else if status_code.as_u16() == 204 {
512                Ok(json!({}))
513            } else {
514                debug!("Request from broker was a success, but the response body was not JSON");
515                Err(PactBrokerError::ContentError(format!(
516                    "Did not get a valid HAL response body from pact broker path '{}', content type is '{}'. URL: '{}'",
517                    path, content_type, self.url
518                )))
519            }
520        } else if status_code.as_u16() == 404 {
521            Err(PactBrokerError::NotFound(format!(
522                "Request to pact broker path '{}' failed: {}. URL: '{}'",
523                path, status_code, self.url
524            )))
525        } else if status_code.as_u16() == 400 {
526            let body = response.bytes().await.map_err(|_| {
527                PactBrokerError::IoError(format!(
528                    "Failed to download response body for path '{}'. URL: '{}'",
529                    &path, self.url
530                ))
531            })?;
532
533            if is_json_content_type {
534                let errors = serde_json::from_slice(&body)
535            .map_err(|err| PactBrokerError::ContentError(
536              format!("Did not get a valid HAL response body from pact broker path '{}' - {}. URL: '{}'",
537                      path, err, self.url)
538            ))?;
539                Err(handle_validation_errors(errors))
540            } else {
541                let body = from_utf8(&body)
542                    .map(|b| b.to_string())
543                    .unwrap_or_else(|err| format!("could not read body: {}", err));
544                error!("Request to pact broker path '{}' failed: {}", path, body);
545                Err(PactBrokerError::IoError(format!(
546                    "Request to pact broker path '{}' failed: {}. URL: '{}'",
547                    path, status_code, self.url
548                )))
549            }
550        } else {
551            Err(PactBrokerError::IoError(format!(
552                "Request to pact broker path '{}' failed: {}. URL: '{}'",
553                path, status_code, self.url
554            )))
555        }
556    }
557
558    fn parse_link_url(
559        &self,
560        link: &Link,
561        values: &HashMap<String, String>,
562    ) -> Result<String, PactBrokerError> {
563        match link.href {
564            Some(ref href) => {
565                debug!("templated URL = {}", href);
566                let re = Regex::new(r"\{(\w+)}").unwrap();
567                let final_url = re.replace_all(href, |caps: &Captures| {
568                    let lookup = caps.get(1).unwrap().as_str();
569                    trace!("Looking up value for key '{}'", lookup);
570                    match values.get(lookup) {
571                        Some(val) => urlencoding::encode(val.as_str()).to_string(),
572                        None => {
573                            warn!(
574                                "No value was found for key '{}', mapped values are {:?}",
575                                lookup, values
576                            );
577                            format!("{{{}}}", lookup)
578                        }
579                    }
580                });
581                debug!("final URL = {}", final_url);
582                Ok(final_url.to_string())
583            }
584            None => Err(PactBrokerError::LinkError(format!(
585                "Expected a HAL+JSON response from the pact broker, but got a link with no HREF. URL: '{}', LINK: '{}'",
586                self.url, link.name
587            ))),
588        }
589    }
590
591    /// Iterate over all the links by name
592    pub fn iter_links(&self, link: &str) -> Result<Vec<Link>, PactBrokerError> {
593        match self.path_info {
594      None => Err(PactBrokerError::LinkError(format!("No previous resource has been fetched from the pact broker. URL: '{}', LINK: '{}'",
595        self.url, link))),
596      Some(ref json) => match json.get("_links") {
597        Some(json) => match json.get(&link) {
598          Some(link_data) => link_data.as_array()
599              .map(|link_data| link_data.iter().map(|link_json| match link_json {
600                Value::Object(data) => Link::from_json(&link, data),
601                Value::String(s) => Link { name: link.to_string(), href: Some(s.clone()), templated: false, title: None },
602                _ => Link { name: link.to_string(), href: Some(link_json.to_string()), templated: false, title: None }
603              }).collect())
604              .ok_or_else(|| PactBrokerError::LinkError(format!("Link is malformed, expected an object but got {}. URL: '{}', LINK: '{}'",
605                  link_data, self.url, link))),
606          None => Err(PactBrokerError::LinkError(format!("Link '{}' was not found in the response, only the following links where found: {:?}. URL: '{}', LINK: '{}'",
607            link, json.as_object().unwrap_or(&json!({}).as_object().unwrap()).keys().join(", "), self.url, link)))
608        },
609        None => Err(PactBrokerError::LinkError(format!("Expected a HAL+JSON response from the pact broker, but got a response with no '_links'. URL: '{}', LINK: '{}'",
610          self.url, link)))
611      }
612    }
613    }
614
615    pub async fn post_json(
616        &self,
617        url: &str,
618        body: &str,
619        headers: Option<HashMap<String, String>>,
620    ) -> Result<serde_json::Value, PactBrokerError> {
621        trace!("post_json(url='{}', body='{}')", url, body);
622
623        self.send_document(url, body, Method::POST, headers).await
624    }
625
626    pub async fn put_json(
627        &self,
628        url: &str,
629        body: &str,
630        headers: Option<HashMap<String, String>>,
631    ) -> Result<serde_json::Value, PactBrokerError> {
632        trace!("put_json(url='{}', body='{}')", url, body);
633
634        self.send_document(url, body, Method::PUT, headers).await
635    }
636    pub async fn patch_json(
637        &self,
638        url: &str,
639        body: &str,
640        headers: Option<HashMap<String, String>>,
641    ) -> Result<serde_json::Value, PactBrokerError> {
642        trace!("put_json(url='{}', body='{}')", url, body);
643
644        self.send_document(url, body, Method::PATCH, headers).await
645    }
646
647    async fn send_document(
648        &self,
649        url: &str,
650        body: &str,
651        method: Method,
652        headers: Option<HashMap<String, String>>,
653    ) -> Result<Value, PactBrokerError> {
654        let method_type = method.clone();
655        debug!("Sending JSON to {} using {}: {}", url, method, body);
656
657        let base_url = &self.url.parse::<Url>()?;
658        let url = if url.starts_with("/") {
659            base_url.join(url)?
660        } else {
661            let url = url.parse::<Url>()?;
662            base_url.join(&url.path())?
663        };
664
665        let request_builder = match self.auth {
666            Some(ref auth) => match auth {
667                HttpAuth::User(username, password) => self
668                    .client
669                    .request(method, url.clone())
670                    .basic_auth(username, password.clone()),
671                HttpAuth::Token(token) => {
672                    self.client.request(method, url.clone()).bearer_auth(token)
673                }
674                _ => self.client.request(method, url.clone()),
675            },
676            None => self.client.request(method, url.clone()),
677        }
678        .header("Accept", "application/hal+json")
679        .body(body.to_string());
680
681        // Add any additional headers if provided
682
683        let request_builder = if let Some(ref headers) = headers {
684            headers
685                .iter()
686                .fold(request_builder, |builder, (key, value)| {
687                    builder.header(key.as_str(), value.as_str())
688                })
689        } else {
690            request_builder
691        };
692
693        let request_builder = if method_type == Method::PATCH {
694            request_builder.header("Content-Type", "application/merge-patch+json")
695        } else {
696            request_builder.header("Content-Type", "application/json")
697        };
698        let response = with_retries(self.retries, request_builder).await;
699        match response {
700            Ok(res) => {
701                self.parse_broker_response(url.path().to_string(), res)
702                    .await
703            }
704            Err(err) => Err(PactBrokerError::IoError(format!(
705                "Failed to send JSON to the pact broker URL '{}' - IoError {}",
706                url, err
707            ))),
708        }
709    }
710
711    fn with_doc_context(self, doc_attributes: &[Link]) -> Result<HALClient, PactBrokerError> {
712        let links: serde_json::Map<String, serde_json::Value> = doc_attributes
713            .iter()
714            .map(|link| (link.name.clone(), link.as_json()))
715            .collect();
716        let links_json = json!({
717          "_links": json!(links)
718        });
719        Ok(self.update_path_info(links_json))
720    }
721}
722
723fn handle_validation_errors(body: Value) -> PactBrokerError {
724    match &body {
725        Value::Object(attrs) => {
726            if let Some(errors) = attrs.get("errors") {
727                match errors {
728                    Value::Array(values) => PactBrokerError::ValidationError(
729                        values.iter().map(|v| json_to_string(v)).collect(),
730                    ),
731                    Value::Object(errors) => PactBrokerError::ValidationError(
732                        errors
733                            .iter()
734                            .map(|(field, errors)| match errors {
735                                Value::String(error) => format!("{}: {}", field, error),
736                                Value::Array(errors) => format!(
737                                    "{}: {}",
738                                    field,
739                                    errors.iter().map(|err| json_to_string(err)).join(", ")
740                                ),
741                                _ => format!("{}: {}", field, errors),
742                            })
743                            .collect(),
744                    ),
745                    Value::String(s) => PactBrokerError::ValidationError(vec![s.clone()]),
746                    _ => PactBrokerError::ValidationError(vec![errors.to_string()]),
747                }
748            } else {
749                PactBrokerError::ValidationError(vec![body.to_string()])
750            }
751        }
752        Value::String(s) => PactBrokerError::ValidationError(vec![s.clone()]),
753        _ => PactBrokerError::ValidationError(vec![body.to_string()]),
754    }
755}
756
757impl HALClient {
758    pub fn setup(url: &str, auth: Option<HttpAuth>, ssl_options: SslOptions) -> HALClient {
759        let mut builder = reqwest::Client::builder().user_agent(format!(
760            "{}/{}",
761            env!("CARGO_PKG_NAME"),
762            env!("CARGO_PKG_VERSION")
763        ));
764
765        debug!("Using ssl_options: {:?}", ssl_options);
766        if let Some(ref path) = ssl_options.ssl_cert_path {
767            if let Ok(cert_bytes) = std::fs::read(path) {
768                if let Ok(cert) = reqwest::Certificate::from_pem_bundle(&cert_bytes) {
769                    debug!("Adding SSL certificate from path: {}", path);
770                    for c in cert {
771                        builder = builder.add_root_certificate(c.clone());
772                    }
773                }
774            } else {
775                debug!(
776                    "Could not read SSL certificate from provided path: {}",
777                    path
778                );
779            }
780        }
781        if ssl_options.skip_ssl {
782            builder = builder.danger_accept_invalid_certs(true);
783            debug!("Skipping SSL certificate validation");
784        }
785        if !ssl_options.use_root_trust_store {
786            builder = builder.tls_built_in_root_certs(false);
787            debug!("Disabling root trust store for SSL");
788        }
789
790        let built_client = builder.build().unwrap();
791        let client = ClientBuilder::new(built_client)
792            .with(TracingMiddleware::default())
793            .with(OtelPropagatorMiddleware)
794            .build();
795
796        HALClient {
797            client,
798            url: url.to_string(),
799            path_info: None,
800            auth,
801            retries: 3,
802            ssl_options,
803        }
804    }
805}
806
807pub fn links_from_json(json: &Value) -> Vec<Link> {
808    match json.get("_links") {
809        Some(json) => match json {
810            Value::Object(v) => v
811                .iter()
812                .map(|(link, json)| match json {
813                    Value::Object(attr) => Link::from_json(link, attr),
814                    _ => Link {
815                        name: link.clone(),
816                        ..Link::default()
817                    },
818                })
819                .collect(),
820            _ => vec![],
821        },
822        None => vec![],
823    }
824}
825
826/// Fetches the pacts from the broker that match the provider name
827pub async fn fetch_pacts_from_broker(
828    broker_url: &str,
829    provider_name: &str,
830    auth: Option<HttpAuth>,
831    ssl_options: SslOptions,
832) -> anyhow::Result<
833    Vec<
834        anyhow::Result<(
835            Box<dyn Pact + Send + Sync + RefUnwindSafe>,
836            Option<PactVerificationContext>,
837            Vec<Link>,
838        )>,
839    >,
840> {
841    trace!(
842        "fetch_pacts_from_broker(broker_url='{}', provider_name='{}', auth={})",
843        broker_url,
844        provider_name,
845        auth.clone().unwrap_or_default()
846    );
847
848    let mut hal_client = HALClient::with_url(broker_url, auth, ssl_options);
849    let template_values = hashmap! { "provider".to_string() => provider_name.to_string() };
850
851    hal_client = hal_client
852        .navigate("pb:latest-provider-pacts", &template_values)
853        .await
854        .map_err(move |err| match err {
855            PactBrokerError::NotFound(_) => PactBrokerError::NotFound(format!(
856                "No pacts for provider '{}' where found in the pact broker. URL: '{}'",
857                provider_name, broker_url
858            )),
859            _ => err,
860        })?;
861
862    let pact_links = hal_client.clone().iter_links("pacts")?;
863
864    let results: Vec<_> = futures::stream::iter(pact_links)
865        .map(|ref pact_link| {
866          match pact_link.href {
867            Some(_) => Ok((hal_client.clone(), pact_link.clone())),
868            None => Err(
869              PactBrokerError::LinkError(
870                format!(
871                  "Expected a HAL+JSON response from the pact broker, but got a link with no HREF. URL: '{}', LINK: '{:?}'",
872                  &hal_client.url,
873                  pact_link
874                )
875              )
876            )
877          }
878        })
879        .and_then(|(hal_client, pact_link)| async {
880          let pact_json = hal_client.fetch_url(
881            &pact_link.clone(),
882            &template_values.clone()
883          ).await?;
884          Ok((pact_link, pact_json))
885        })
886        .map(|result| {
887          match result {
888            Ok((pact_link, pact_json)) => {
889              let href = pact_link.href.unwrap_or_default();
890              let links = links_from_json(&pact_json);
891              load_pact_from_json(href.as_str(), &pact_json)
892                .map(|pact| (pact, None, links))
893            },
894            Err(err) => Err(err.into())
895          }
896        })
897        .into_stream()
898        .collect()
899        .await;
900
901    Ok(results)
902}
903
904/// Fetch Pacts from the broker using the "provider-pacts-for-verification" endpoint
905pub async fn fetch_pacts_dynamically_from_broker(
906    broker_url: &str,
907    provider_name: String,
908    pending: bool,
909    include_wip_pacts_since: Option<String>,
910    provider_tags: Vec<String>,
911    provider_branch: Option<String>,
912    consumer_version_selectors: Vec<ConsumerVersionSelector>,
913    auth: Option<HttpAuth>,
914    ssl_options: SslOptions,
915    headers: Option<HashMap<String, String>>,
916) -> anyhow::Result<
917    Vec<
918        Result<
919            (
920                Box<dyn Pact + Send + Sync + RefUnwindSafe>,
921                Option<PactVerificationContext>,
922                Vec<Link>,
923            ),
924            PactBrokerError,
925        >,
926    >,
927> {
928    trace!(
929        "fetch_pacts_dynamically_from_broker(broker_url='{}', provider_name='{}', pending={}, \
930    include_wip_pacts_since={:?}, provider_tags: {:?}, consumer_version_selectors: {:?}, auth={})",
931        broker_url,
932        provider_name,
933        pending,
934        include_wip_pacts_since,
935        provider_tags,
936        consumer_version_selectors,
937        auth.clone().unwrap_or_default()
938    );
939
940    let mut hal_client = HALClient::with_url(broker_url, auth, ssl_options);
941    let template_values = hashmap! { "provider".to_string() => provider_name.clone() };
942
943    hal_client = hal_client
944        .navigate("pb:provider-pacts-for-verification", &template_values)
945        .await
946        .map_err(move |err| match err {
947            PactBrokerError::NotFound(_) => PactBrokerError::NotFound(format!(
948                "No pacts for provider '{}' were found in the pact broker. URL: '{}'",
949                provider_name.clone(),
950                broker_url
951            )),
952            _ => err,
953        })?;
954
955    // Construct the Pacts for verification payload
956    let pacts_for_verification = PactsForVerificationRequest {
957        provider_version_tags: provider_tags,
958        provider_version_branch: provider_branch,
959        include_wip_pacts_since,
960        consumer_version_selectors,
961        include_pending_status: pending,
962    };
963    let request_body = serde_json::to_string(&pacts_for_verification).unwrap();
964
965    // Post the verification request
966    let response = match hal_client.find_link("self") {
967        Ok(link) => {
968            let link = hal_client.clone().parse_link_url(&link, &hashmap! {})?;
969            match hal_client
970                .clone()
971                .post_json(link.as_str(), request_body.as_str(), headers)
972                .await
973            {
974                Ok(res) => Some(res),
975                Err(err) => {
976                    info!("error response for pacts for verification: {} ", err);
977                    return Err(anyhow!(err));
978                }
979            }
980        }
981        Err(e) => return Err(anyhow!(e)),
982    };
983
984    // Find all of the Pact links
985    let pact_links = match response {
986        Some(v) => {
987            let pfv: PactsForVerificationResponse = serde_json::from_value(v)
988                .map_err(|err| {
989                    trace!(
990                        "Failed to deserialise PactsForVerificationResponse: {}",
991                        err
992                    );
993                    err
994                })
995                .unwrap_or(PactsForVerificationResponse {
996                    embedded: PactsForVerificationBody { pacts: vec![] },
997                });
998            trace!(?pfv, "got pacts for verification response");
999
1000            if pfv.embedded.pacts.len() == 0 {
1001                return Err(anyhow!(PactBrokerError::NotFound(format!(
1002                    "No pacts were found for this provider"
1003                ))));
1004            };
1005
1006            let links: Result<Vec<(Link, PactVerificationContext)>, PactBrokerError> = pfv.embedded.pacts.iter().map(| p| {
1007          match p.links.get("self") {
1008            Some(l) => Ok((l.clone(), p.into())),
1009            None => Err(
1010              PactBrokerError::LinkError(
1011                format!(
1012                  "Expected a HAL+JSON response from the pact broker, but got a link with no HREF. URL: '{}', PATH: '{:?}'",
1013                  &hal_client.url,
1014                  &p.links,
1015                )
1016              )
1017            )
1018          }
1019        }).collect();
1020
1021            links
1022        }
1023        None => Err(PactBrokerError::NotFound(format!(
1024            "No pacts were found for this provider"
1025        ))),
1026    }?;
1027
1028    let results: Vec<_> = futures::stream::iter(pact_links)
1029      .map(|(ref pact_link, ref context)| {
1030        match pact_link.href {
1031          Some(_) => Ok((hal_client.clone(), pact_link.clone(), context.clone())),
1032          None => Err(
1033            PactBrokerError::LinkError(
1034              format!(
1035                "Expected a HAL+JSON response from the pact broker, but got a link with no HREF. URL: '{}', LINK: '{:?}'",
1036                &hal_client.url,
1037                pact_link
1038              )
1039            )
1040          )
1041        }
1042      })
1043      .and_then(|(hal_client, pact_link, context)| async {
1044        let pact_json = hal_client.fetch_url(
1045          &pact_link.clone(),
1046          &template_values.clone()
1047        ).await?;
1048        Ok((pact_link, pact_json, context))
1049      })
1050      .map(|result| {
1051        match result {
1052          Ok((pact_link, pact_json, context)) => {
1053            let href = pact_link.href.unwrap_or_default();
1054            let links = links_from_json(&pact_json);
1055            load_pact_from_json(href.as_str(), &pact_json)
1056              .map(|pact| (pact, Some(context), links))
1057              .map_err(|err| PactBrokerError::ContentError(format!("{}", err)))
1058          },
1059          Err(err) => Err(err)
1060        }
1061      })
1062      .into_stream()
1063      .collect()
1064      .await;
1065
1066    Ok(results)
1067}
1068
1069/// Fetch the Pact from the given URL, using any required authentication. This will use a GET
1070/// request to the given URL and parse the result into a Pact model. It will also look for any HAL
1071/// links in the response, returning those if found.
1072pub async fn fetch_pact_from_url(
1073    url: &str,
1074    auth: &Option<HttpAuth>,
1075) -> anyhow::Result<(Box<dyn Pact + Send + Sync + RefUnwindSafe>, Vec<Link>)> {
1076    let url = url.to_string();
1077    let auth = auth.clone();
1078    let (url, pact_json) =
1079        tokio::task::spawn_blocking(move || http_utils::fetch_json_from_url(&url, &auth)).await??;
1080    let pact = load_pact_from_json(&url, &pact_json)?;
1081    let links = links_from_json(&pact_json);
1082    Ok((pact, links))
1083}
1084
1085async fn publish_provider_tags(
1086    hal_client: &HALClient,
1087    links: &[Link],
1088    provider_tags: Vec<String>,
1089    version: &str,
1090    headers: Option<HashMap<String, String>>,
1091) -> Result<(), PactBrokerError> {
1092    let hal_client = hal_client
1093        .clone()
1094        .with_doc_context(links)?
1095        .navigate("pb:provider", &hashmap! {})
1096        .await?;
1097    match hal_client.find_link("pb:version-tag") {
1098        Ok(link) => {
1099            for tag in &provider_tags {
1100                let template_values = hashmap! {
1101                  "version".to_string() => version.to_string(),
1102                  "tag".to_string() => tag.clone()
1103                };
1104                match hal_client
1105                    .clone()
1106                    .put_json(
1107                        hal_client
1108                            .clone()
1109                            .parse_link_url(&link, &template_values)?
1110                            .as_str(),
1111                        "{}",
1112                        headers.clone(),
1113                    )
1114                    .await
1115                {
1116                    Ok(_) => debug!("Pushed tag {} for provider version {}", tag, version),
1117                    Err(err) => {
1118                        error!(
1119                            "Failed to push tag {} for provider version {}",
1120                            tag, version
1121                        );
1122                        return Err(err);
1123                    }
1124                }
1125            }
1126            Ok(())
1127        }
1128        Err(_) => Err(PactBrokerError::LinkError(
1129            "Can't publish provider tags as there is no 'pb:version-tag' link".to_string(),
1130        )),
1131    }
1132}
1133
1134async fn publish_provider_branch(
1135    hal_client: &HALClient,
1136    links: &[Link],
1137    branch: &str,
1138    version: &str,
1139    headers: Option<HashMap<String, String>>,
1140) -> Result<(), PactBrokerError> {
1141    let hal_client = hal_client
1142        .clone()
1143        .with_doc_context(links)?
1144        .navigate("pb:provider", &hashmap! {})
1145        .await?;
1146
1147    match hal_client.find_link("pb:branch-version") {
1148    Ok(link) => {
1149      let template_values = hashmap! {
1150        "branch".to_string() => branch.to_string(),
1151        "version".to_string() => version.to_string(),
1152      };
1153      match hal_client.clone().put_json(hal_client.clone().parse_link_url(&link, &template_values)?.as_str(), "{}",headers).await {
1154        Ok(_) => debug!("Pushed branch {} for provider version {}", branch, version),
1155        Err(err) => {
1156          error!("Failed to push branch {} for provider version {}", branch, version);
1157          return Err(err);
1158        }
1159      }
1160      Ok(())
1161    },
1162    Err(_) => Err(PactBrokerError::LinkError("Can't publish provider branch as there is no 'pb:branch-version' link. Please ugrade to Pact Broker version 2.86.0 or later for branch support".to_string()))
1163  }
1164}
1165
#[skip_serializing_none]
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
/// Selection criteria for the consumer versions whose pacts should be verified.
///
/// A list of these is sent to the broker as the `consumer_version_selectors` of a
/// [`PactsForVerificationRequest`]. Fields are serialised with camelCase names, and fields that
/// are `None` are left out of the serialised form (`skip_serializing_none`).
pub struct ConsumerVersionSelector {
    /// Application name to filter the results on
    pub consumer: Option<String>,
    /// Tag
    pub tag: Option<String>,
    /// Fallback tag if Tag doesn't exist
    pub fallback_tag: Option<String>,
    /// Only select the latest (if false, this selects all pacts for a tag)
    pub latest: Option<bool>,
    /// Applications that have been deployed or released
    pub deployed_or_released: Option<bool>,
    /// Applications that have been deployed
    pub deployed: Option<bool>,
    /// Applications that have been released
    pub released: Option<bool>,
    /// Applications in a given environment
    pub environment: Option<String>,
    /// Applications with the default branch set in the broker
    pub main_branch: Option<bool>,
    /// Applications with the given branch
    pub branch: Option<String>,
    /// Applications that match the provider version branch sent during verification
    pub matching_branch: Option<bool>,
}
1194
/// Response body of the pacts for verification request.
///
/// Only the `_embedded` attribute of the response is read.
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
struct PactsForVerificationResponse {
    /// Embedded resources; read from the `_embedded` attribute when deserialising
    #[serde(rename(deserialize = "_embedded"))]
    pub embedded: PactsForVerificationBody,
}
1201
/// Embedded section of a [`PactsForVerificationResponse`], holding the list of pacts.
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
struct PactsForVerificationBody {
    /// Pacts that the broker returned for verification
    pub pacts: Vec<PactForVerification>,
}
1207
/// A single pact entry in the pacts for verification response.
///
/// Converted into a [`PactVerificationContext`] once its `self` link has been located.
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
struct PactForVerification {
    /// Short description of the pact entry (`shortDescription` in the JSON)
    pub short_description: String,
    /// Links of the entry keyed by relation name; read from the `_links` attribute when
    /// deserialising. The `self` link is the one used to fetch the pact.
    #[serde(rename(deserialize = "_links"))]
    pub links: HashMap<String, Link>,
    /// Verification properties for the pact, if the response included any
    pub verification_properties: Option<PactVerificationProperties>,
}
1216
#[skip_serializing_none]
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
/// Request to send to determine the pacts to verify
///
/// Fields are serialised with camelCase names. Fields that are `None` are left out of the
/// serialised form (`skip_serializing_none`), as is `provider_version_tags` when it is empty.
pub struct PactsForVerificationRequest {
    /// Provider tags to use for determining pending pacts (if enabled)
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub provider_version_tags: Vec<String>,
    /// Enable pending pacts feature
    pub include_pending_status: bool,
    /// Find WIP pacts after given date
    pub include_wip_pacts_since: Option<String>,
    /// Detailed pact selection criteria, see
    /// <https://docs.pact.io/pact_broker/advanced_topics/consumer_version_selectors/>
    pub consumer_version_selectors: Vec<ConsumerVersionSelector>,
    /// Current provider version branch if used (instead of tags)
    pub provider_version_branch: Option<String>,
}
1234
#[skip_serializing_none]
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
/// Provides the context on why a Pact was included
///
/// Built from a `PactForVerification` entry of the pacts for verification response and returned
/// alongside each pact fetched by `fetch_pacts_dynamically_from_broker`.
pub struct PactVerificationContext {
    /// Description
    pub short_description: String,
    /// Properties
    pub verification_properties: PactVerificationProperties,
}
1245
1246impl From<&PactForVerification> for PactVerificationContext {
1247    fn from(value: &PactForVerification) -> Self {
1248        PactVerificationContext {
1249            short_description: value.short_description.clone(),
1250            verification_properties: value.verification_properties.clone().unwrap_or_default(),
1251        }
1252    }
1253}
1254
1255#[skip_serializing_none]
1256#[derive(Serialize, Deserialize, Debug, Clone, Default)]
1257#[serde(rename_all = "camelCase")]
1258/// Properties associated with the verification context
1259pub struct PactVerificationProperties {
1260    #[serde(default)]
1261    /// If the Pact is pending
1262    pub pending: bool,
1263    /// Notices provided by the Pact Broker
1264    pub notices: Vec<HashMap<String, String>>,
1265}