pact_broker_cli/cli/pact_broker/
main.rs

1//! Structs and functions for interacting with a Pact Broker
2
3use std::collections::HashMap;
4use std::ops::Not;
5use std::panic::RefUnwindSafe;
6use std::str::from_utf8;
7
8use anyhow::anyhow;
9use futures::stream::*;
10
11use itertools::Itertools;
12use maplit::hashmap;
13
14use pact_models::http_utils;
15use pact_models::http_utils::HttpAuth;
16use pact_models::json_utils::json_to_string;
17use pact_models::pact::{Pact, load_pact_from_json};
18use regex::{Captures, Regex};
19use reqwest::{Method, Url};
20use serde::{Deserialize, Serialize};
21use serde_json::{Value, json};
22use serde_with::skip_serializing_none;
23use tracing::{debug, error, info, trace, warn};
24pub mod branches;
25pub mod can_i_deploy;
26pub mod deployments;
27pub mod environments;
28pub mod pact_publish;
29pub mod pacticipants;
30pub mod pacts;
31pub mod subcommands;
32pub mod tags;
33#[cfg(test)]
34pub mod test_utils;
35pub mod types;
36pub mod utils;
37pub mod verification;
38pub mod versions;
39pub mod webhooks;
40use utils::with_retries;
41// for otel
42use http::Extensions;
43use opentelemetry::Context;
44use opentelemetry::global;
45use opentelemetry_http::HeaderInjector;
46use reqwest::Request;
47use reqwest::Response;
48use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
49use reqwest_middleware::{Middleware, Next};
50use reqwest_tracing::TracingMiddleware;
51use crate::cli::utils::{CYAN, GREEN, RED, YELLOW};
52
53use crate::cli::pact_broker::main::types::SslOptions;
54
55pub fn process_notices(notices: &[Notice]) {
56    for notice in notices {
57        let notice_text = notice.text.to_string();
58        let formatted_text = notice_text
59            .split_whitespace()
60            .map(|word| {
61                if word.starts_with("https") || word.starts_with("http") {
62                    format!("{}", CYAN.apply_to(word))
63                } else {
64                    match notice.type_field.as_str() {
65                        "success" => format!("{}", GREEN.apply_to(word)),
66                        "warning" | "prompt" => format!("{}", YELLOW.apply_to(word)),
67                        "error" | "danger" => format!("{}", RED.apply_to(word)),
68                        _ => word.to_string(),
69                    }
70                }
71            })
72            .collect::<Vec<String>>()
73            .join(" ");
74        println!("{}", formatted_text);
75    }
76}
77
78#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
79#[serde(rename_all = "camelCase")]
80pub struct Notice {
81    pub text: String,
82    #[serde(rename = "type")]
83    pub type_field: String,
84}
85
86fn is_true(object: &serde_json::Map<String, Value>, field: &str) -> bool {
87    match object.get(field) {
88        Some(json) => match *json {
89            serde_json::Value::Bool(b) => b,
90            _ => false,
91        },
92        None => false,
93    }
94}
95
96fn as_string(json: &Value) -> String {
97    match *json {
98        serde_json::Value::String(ref s) => s.clone(),
99        _ => format!("{}", json),
100    }
101}
102
103fn content_type(response: &reqwest::Response) -> String {
104    match response.headers().get("content-type") {
105        Some(value) => value.to_str().unwrap_or("text/plain").into(),
106        None => "text/plain".to_string(),
107    }
108}
109
110fn json_content_type(response: &reqwest::Response) -> bool {
111    match content_type(response).parse::<mime::Mime>() {
112        Ok(mime) => {
113            match (
114                mime.type_().as_str(),
115                mime.subtype().as_str(),
116                mime.suffix(),
117            ) {
118                ("application", "json", None) => true,
119                ("application", "hal", Some(mime::JSON)) => true,
120                _ => false,
121            }
122        }
123        Err(_) => false,
124    }
125}
126
127fn find_entry(map: &serde_json::Map<String, Value>, key: &str) -> Option<(String, Value)> {
128    match map.keys().find(|k| k.to_lowercase() == key.to_lowercase()) {
129        Some(k) => map.get(k).map(|v| (key.to_string(), v.clone())),
130        None => None,
131    }
132}
133
134/// Errors that can occur with a Pact Broker
135#[derive(Debug, Clone, thiserror::Error)]
136pub enum PactBrokerError {
137    /// Error with a HAL link
138    #[error("Error with a HAL link - {0}")]
139    LinkError(String),
140    /// Error with the content of a HAL resource
141    #[error("Error with the content of a HAL resource - {0}")]
142    ContentError(String),
143    #[error("IO Error - {0}")]
144    /// IO Error
145    IoError(String),
146    /// Link/Resource was not found
147    #[error("Link/Resource was not found - {0}")]
148    NotFound(String),
149    /// Invalid URL
150    #[error("Invalid URL - {0}")]
151    UrlError(String),
152    /// Validation error
153    #[error("failed validation - {0:?}")]
154    ValidationError(Vec<String>),
155    /// Validation error with notices
156    #[error("failed validation - {0:?}")]
157    ValidationErrorWithNotices(Vec<String>, Vec<Notice>),
158}
159
160impl PartialEq<String> for PactBrokerError {
161    fn eq(&self, other: &String) -> bool {
162        let mut buffer = String::new();
163        match self {
164            PactBrokerError::LinkError(s) => buffer.push_str(s),
165            PactBrokerError::ContentError(s) => buffer.push_str(s),
166            PactBrokerError::IoError(s) => buffer.push_str(s),
167            PactBrokerError::NotFound(s) => buffer.push_str(s),
168            PactBrokerError::UrlError(s) => buffer.push_str(s),
169            PactBrokerError::ValidationError(errors) => {
170                buffer.push_str(errors.iter().join(", ").as_str())
171            },
172            PactBrokerError::ValidationErrorWithNotices(errors, _) => {
173                buffer.push_str(errors.iter().join(", ").as_str())
174            }
175        };
176        buffer == *other
177    }
178}
179
180impl<'a> PartialEq<&'a str> for PactBrokerError {
181    fn eq(&self, other: &&str) -> bool {
182        let message = match self {
183            PactBrokerError::LinkError(s) => s.clone(),
184            PactBrokerError::ContentError(s) => s.clone(),
185            PactBrokerError::IoError(s) => s.clone(),
186            PactBrokerError::NotFound(s) => s.clone(),
187            PactBrokerError::UrlError(s) => s.clone(),
188            PactBrokerError::ValidationError(errors) => errors.iter().join(", "),
189            PactBrokerError::ValidationErrorWithNotices(errors, _) => errors.iter().join(", "),
190        };
191        message.as_str() == *other
192    }
193}
194
195impl From<url::ParseError> for PactBrokerError {
196    fn from(err: url::ParseError) -> Self {
197        PactBrokerError::UrlError(format!("{}", err))
198    }
199}
200
201#[derive(Debug, Clone, Serialize, Deserialize)]
202#[serde(default)]
203/// Structure to represent a HAL link
204pub struct Link {
205    /// Link name
206    pub name: String,
207    /// Link HREF
208    pub href: Option<String>,
209    /// If the link is templated (has expressions in the HREF that need to be expanded)
210    pub templated: bool,
211    /// Link title
212    pub title: Option<String>,
213}
214
215impl Link {
216    /// Create a link from serde JSON data
217    pub fn from_json(link: &str, link_data: &serde_json::Map<String, serde_json::Value>) -> Link {
218        Link {
219            name: link.to_string(),
220            href: find_entry(link_data, &"href".to_string()).map(|(_, href)| as_string(&href)),
221            templated: is_true(link_data, "templated"),
222            title: link_data.get("title").map(|title| as_string(title)),
223        }
224    }
225
226    /// Converts the Link into a JSON representation
227    pub fn as_json(&self) -> serde_json::Value {
228        match (self.href.clone(), self.title.clone()) {
229            (Some(href), Some(title)) => json!({
230              "href": href,
231              "title": title,
232              "templated": self.templated
233            }),
234            (Some(href), None) => json!({
235              "href": href,
236              "templated": self.templated
237            }),
238            (None, Some(title)) => json!({
239              "title": title,
240              "templated": self.templated
241            }),
242            (None, None) => json!({
243              "templated": self.templated
244            }),
245        }
246    }
247}
248
249impl Default for Link {
250    fn default() -> Self {
251        Link {
252            name: "link".to_string(),
253            href: None,
254            templated: false,
255            title: None,
256        }
257    }
258}
259
260/// HAL aware HTTP client
261#[derive(Clone)]
262pub struct HALClient {
263    client: ClientWithMiddleware,
264    url: String,
265    path_info: Option<Value>,
266    auth: Option<HttpAuth>,
267    ssl_options: SslOptions,
268    retries: u8,
269}
270
271struct OtelPropagatorMiddleware;
272
273#[async_trait::async_trait]
274impl Middleware for OtelPropagatorMiddleware {
275    async fn handle(
276        &self,
277        mut req: Request,
278        extensions: &mut Extensions,
279        next: Next<'_>,
280    ) -> reqwest_middleware::Result<Response> {
281        let cx = Context::current();
282        let mut headers = reqwest::header::HeaderMap::new();
283        global::get_text_map_propagator(|propagator| {
284            propagator.inject_context(&cx, &mut HeaderInjector(&mut headers))
285        });
286        headers.append(
287            "baggage",
288            reqwest::header::HeaderValue::from_static("is_synthetic=true"),
289        );
290
291        for (key, value) in headers.iter() {
292            req.headers_mut().append(key, value.clone());
293        }
294        let res = next.run(req, extensions).await;
295        res
296    }
297}
298
299pub trait WithCurrentSpan {
300    fn with_current_span<F, R>(&self, f: F) -> R
301    where
302        F: FnOnce() -> R;
303}
304
305impl<T> WithCurrentSpan for T {
306    fn with_current_span<F, R>(&self, f: F) -> R
307    where
308        F: FnOnce() -> R,
309    {
310        let span = tracing::Span::current();
311        let _enter = span.enter();
312        f()
313    }
314}
315
316impl HALClient {
317    /// Initialise a client with the URL and authentication
318    pub fn with_url(url: &str, auth: Option<HttpAuth>, ssl_options: SslOptions) -> HALClient {
319        HALClient {
320            url: url.to_string(),
321            auth: auth.clone(),
322            ssl_options: ssl_options.clone(),
323            ..HALClient::setup(url, auth, ssl_options)
324        }
325    }
326
327    fn update_path_info(self, path_info: serde_json::Value) -> HALClient {
328        HALClient {
329            client: self.client.clone(),
330            url: self.url.clone(),
331            path_info: Some(path_info),
332            auth: self.auth,
333            retries: self.retries,
334            ssl_options: self.ssl_options,
335        }
336    }
337
338    /// Navigate to the resource from the link name
339    pub async fn navigate(
340        self,
341        link: &'static str,
342        template_values: &HashMap<String, String>,
343    ) -> Result<HALClient, PactBrokerError> {
344        trace!(
345            "navigate(link='{}', template_values={:?})",
346            link, template_values
347        );
348
349        let client = if self.path_info.is_none() {
350            let path_info = self.clone().fetch("/".into()).await?;
351            self.update_path_info(path_info)
352        } else {
353            self
354        };
355
356        let path_info = client.clone().fetch_link(link, template_values).await?;
357        let client = client.update_path_info(path_info);
358
359        Ok(client)
360    }
361
362    fn find_link(&self, link: &'static str) -> Result<Link, PactBrokerError> {
363        match self.path_info {
364            None => Err(PactBrokerError::LinkError(format!("No previous resource has been fetched from the pact broker. URL: '{}', LINK: '{}'",
365                self.url, link))),
366            Some(ref json) => match json.get("_links") {
367                Some(json) => match json.get(link) {
368                    Some(link_data) => link_data.as_object()
369                        .map(|link_data| Link::from_json(&link.to_string(), &link_data))
370                        .ok_or_else(|| PactBrokerError::LinkError(format!("Link is malformed, expected an object but got {}. URL: '{}', LINK: '{}'",
371                            link_data, self.url, link))),
372                    None => Err(PactBrokerError::LinkError(format!("Link '{}' was not found in the response, only the following links where found: {:?}. URL: '{}', LINK: '{}'",
373                        link, json.as_object().unwrap_or(&json!({}).as_object().unwrap()).keys().join(", "), self.url, link)))
374                },
375                None => Err(PactBrokerError::LinkError(format!("Expected a HAL+JSON response from the pact broker, but got a response with no '_links'. URL: '{}', LINK: '{}'",
376                    self.url, link)))
377            }
378        }
379    }
380
381    async fn fetch_link(
382        self,
383        link: &'static str,
384        template_values: &HashMap<String, String>,
385    ) -> Result<Value, PactBrokerError> {
386        trace!(
387            "fetch_link(link='{}', template_values={:?})",
388            link, template_values
389        );
390
391        let link_data = self.find_link(link)?;
392
393        self.fetch_url(&link_data, template_values).await
394    }
395
396    /// Fetch the resource at the Link from the Pact broker
397    pub async fn fetch_url(
398        self,
399        link: &Link,
400        template_values: &HashMap<String, String>,
401    ) -> Result<Value, PactBrokerError> {
402        debug!(
403            "fetch_url(link={:?}, template_values={:?})",
404            link, template_values
405        );
406
407        let link_url = if link.templated {
408            debug!("Link URL is templated");
409            self.clone().parse_link_url(&link, &template_values)
410        } else {
411            link.href.clone().ok_or_else(|| {
412                PactBrokerError::LinkError(format!(
413                    "Link is malformed, there is no href. URL: '{}', LINK: '{}'",
414                    self.url, link.name
415                ))
416            })
417        }?;
418
419        let base_url = self.url.parse::<Url>()?;
420        let joined_url = base_url.join(&link_url)?;
421        self.fetch(joined_url.path().into()).await
422    }
423    pub async fn delete_url(
424        self,
425        link: &Link,
426        template_values: &HashMap<String, String>,
427    ) -> Result<Value, PactBrokerError> {
428        debug!(
429            "fetch_url(link={:?}, template_values={:?})",
430            link, template_values
431        );
432
433        let link_url = if link.templated {
434            debug!("Link URL is templated");
435            self.clone().parse_link_url(&link, &template_values)
436        } else {
437            link.href.clone().ok_or_else(|| {
438                PactBrokerError::LinkError(format!(
439                    "Link is malformed, there is no href. URL: '{}', LINK: '{}'",
440                    self.url, link.name
441                ))
442            })
443        }?;
444
445        let base_url = self.url.parse::<Url>()?;
446        debug!("base_url: {}", base_url);
447        debug!("link_url: {}", link_url);
448        let joined_url = base_url.join(&link_url)?;
449        debug!("joined_url: {}", joined_url);
450        self.delete(joined_url.path().into()).await
451    }
452
453    pub async fn fetch(self, path: &str) -> Result<Value, PactBrokerError> {
454        info!("Fetching path '{}' from pact broker", path);
455
456        let broker_url = self.url.parse::<Url>()?;
457        let context_path = broker_url.path();
458        let url = if context_path.is_empty().not()
459            && context_path != "/"
460            && path.starts_with(context_path)
461        {
462            let mut base_url = broker_url.clone();
463            base_url.set_path("/");
464            base_url.join(path)?
465        } else {
466            broker_url.join(path)?
467        };
468
469        let request_builder = match self.auth {
470            Some(ref auth) => match auth {
471                HttpAuth::User(username, password) => {
472                    self.client.get(url).basic_auth(username, password.clone())
473                }
474                HttpAuth::Token(token) => self.client.get(url).bearer_auth(token),
475                _ => self.client.get(url),
476            },
477            None => self.client.get(url),
478        }
479        .header("accept", "application/hal+json, application/json");
480
481        let response = utils::with_retries(self.retries, request_builder)
482            .await
483            .map_err(|err| {
484                PactBrokerError::IoError(format!(
485                    "Failed to access pact broker path '{}' - {}. URL: '{}'",
486                    &path, err, &self.url,
487                ))
488            })?;
489
490        self.parse_broker_response(path.to_string(), response).await
491    }
492
493    pub async fn delete(self, path: &str) -> Result<Value, PactBrokerError> {
494        info!("Deleting path '{}' from pact broker", path);
495
496        let broker_url = self.url.parse::<Url>()?;
497        let context_path = broker_url.path();
498        let url = if context_path.is_empty().not()
499            && context_path != "/"
500            && path.starts_with(context_path)
501        {
502            let mut base_url = broker_url.clone();
503            base_url.set_path("/");
504            base_url.join(path)?
505        } else {
506            broker_url.join(path)?
507        };
508
509        let request_builder = match self.auth {
510            Some(ref auth) => match auth {
511                HttpAuth::User(username, password) => self
512                    .client
513                    .delete(url)
514                    .basic_auth(username, password.clone()),
515                HttpAuth::Token(token) => self.client.delete(url).bearer_auth(token),
516                _ => self.client.delete(url),
517            },
518            None => self.client.delete(url),
519        };
520
521        let response = utils::with_retries(self.retries, request_builder)
522            .await
523            .map_err(|err| {
524                PactBrokerError::IoError(format!(
525                    "Failed to delete pact broker path '{}' - {}. URL: '{}'",
526                    &path, err, &self.url,
527                ))
528            })?;
529
530        self.parse_broker_response(path.to_string(), response).await
531    }
532
533    async fn parse_broker_response(
534        &self,
535        path: String,
536        response: reqwest::Response,
537    ) -> Result<Value, PactBrokerError> {
538        let is_json_content_type = json_content_type(&response);
539        let content_type = content_type(&response);
540        let status_code = response.status();
541
542        if status_code.is_success() {
543            if is_json_content_type {
544                response.json::<Value>()
545            .await
546            .map_err(|err| PactBrokerError::ContentError(
547              format!("Did not get a valid HAL response body from pact broker path '{}' - {}. URL: '{}'",
548                      path, err, self.url)
549            ))
550            } else if status_code.as_u16() == 204 {
551                Ok(json!({}))
552            } else {
553                debug!("Request from broker was a success, but the response body was not JSON");
554                Err(PactBrokerError::ContentError(format!(
555                    "Did not get a valid HAL response body from pact broker path '{}', content type is '{}'. URL: '{}'",
556                    path, content_type, self.url
557                )))
558            }
559        } else if status_code.as_u16() == 404 {
560            Err(PactBrokerError::NotFound(format!(
561                "Request to pact broker path '{}' failed: {}. URL: '{}'",
562                path, status_code, self.url
563            )))
564        } else {
565            // Handle any error status code (400, 422, 409, etc.)
566            let body = response.bytes().await.map_err(|_| {
567                PactBrokerError::IoError(format!(
568                    "Failed to download response body for path '{}'. URL: '{}'",
569                    &path, self.url
570                ))
571            })?;
572
573            if is_json_content_type {
574                match serde_json::from_slice::<Value>(&body) {
575                    Ok(json_body) => {
576                        if json_body.get("errors").is_some() || json_body.get("notices").is_some() {
577                            Err(handle_validation_errors(json_body))
578                        } else {
579                            Err(PactBrokerError::IoError(format!(
580                                "Request to pact broker path '{}' failed: {}. Response: {}. URL: '{}'",
581                                path, status_code, json_body, self.url
582                            )))
583                        }
584                    }
585                    Err(_) => {
586                        let body_text = from_utf8(&body)
587                            .map(|b| b.to_string())
588                            .unwrap_or_else(|err| format!("could not read body: {}", err));
589                        error!("Request to pact broker path '{}' failed: {}", path, body_text);
590                        Err(PactBrokerError::IoError(format!(
591                            "Request to pact broker path '{}' failed: {}. URL: '{}'",
592                            path, status_code, self.url
593                        )))
594                    }
595                }
596            } else {
597                let body_text = from_utf8(&body)
598                    .map(|b| b.to_string())
599                    .unwrap_or_else(|err| format!("could not read body: {}", err));
600                error!("Request to pact broker path '{}' failed: {}", path, body_text);
601                Err(PactBrokerError::IoError(format!(
602                    "Request to pact broker path '{}' failed: {}. URL: '{}'",
603                    path, status_code, self.url
604                )))
605            }
606        }
607    }
608
609    fn parse_link_url(
610        &self,
611        link: &Link,
612        values: &HashMap<String, String>,
613    ) -> Result<String, PactBrokerError> {
614        match link.href {
615            Some(ref href) => {
616                debug!("templated URL = {}", href);
617                let re = Regex::new(r"\{(\w+)}").unwrap();
618                let final_url = re.replace_all(href, |caps: &Captures| {
619                    let lookup = caps.get(1).unwrap().as_str();
620                    trace!("Looking up value for key '{}'", lookup);
621                    match values.get(lookup) {
622                        Some(val) => urlencoding::encode(val.as_str()).to_string(),
623                        None => {
624                            warn!(
625                                "No value was found for key '{}', mapped values are {:?}",
626                                lookup, values
627                            );
628                            format!("{{{}}}", lookup)
629                        }
630                    }
631                });
632                debug!("final URL = {}", final_url);
633                Ok(final_url.to_string())
634            }
635            None => Err(PactBrokerError::LinkError(format!(
636                "Expected a HAL+JSON response from the pact broker, but got a link with no HREF. URL: '{}', LINK: '{}'",
637                self.url, link.name
638            ))),
639        }
640    }
641
642    /// Iterate over all the links by name
643    pub fn iter_links(&self, link: &str) -> Result<Vec<Link>, PactBrokerError> {
644        match self.path_info {
645      None => Err(PactBrokerError::LinkError(format!("No previous resource has been fetched from the pact broker. URL: '{}', LINK: '{}'",
646        self.url, link))),
647      Some(ref json) => match json.get("_links") {
648        Some(json) => match json.get(&link) {
649          Some(link_data) => link_data.as_array()
650              .map(|link_data| link_data.iter().map(|link_json| match link_json {
651                Value::Object(data) => Link::from_json(&link, data),
652                Value::String(s) => Link { name: link.to_string(), href: Some(s.clone()), templated: false, title: None },
653                _ => Link { name: link.to_string(), href: Some(link_json.to_string()), templated: false, title: None }
654              }).collect())
655              .ok_or_else(|| PactBrokerError::LinkError(format!("Link is malformed, expected an object but got {}. URL: '{}', LINK: '{}'",
656                  link_data, self.url, link))),
657          None => Err(PactBrokerError::LinkError(format!("Link '{}' was not found in the response, only the following links where found: {:?}. URL: '{}', LINK: '{}'",
658            link, json.as_object().unwrap_or(&json!({}).as_object().unwrap()).keys().join(", "), self.url, link)))
659        },
660        None => Err(PactBrokerError::LinkError(format!("Expected a HAL+JSON response from the pact broker, but got a response with no '_links'. URL: '{}', LINK: '{}'",
661          self.url, link)))
662      }
663    }
664    }
665
666    pub async fn post_json(
667        &self,
668        url: &str,
669        body: &str,
670        headers: Option<HashMap<String, String>>,
671    ) -> Result<serde_json::Value, PactBrokerError> {
672        trace!("post_json(url='{}', body='{}')", url, body);
673
674        self.send_document(url, body, Method::POST, headers).await
675    }
676
677    pub async fn put_json(
678        &self,
679        url: &str,
680        body: &str,
681        headers: Option<HashMap<String, String>>,
682    ) -> Result<serde_json::Value, PactBrokerError> {
683        trace!("put_json(url='{}', body='{}')", url, body);
684
685        self.send_document(url, body, Method::PUT, headers).await
686    }
687    pub async fn patch_json(
688        &self,
689        url: &str,
690        body: &str,
691        headers: Option<HashMap<String, String>>,
692    ) -> Result<serde_json::Value, PactBrokerError> {
693        trace!("put_json(url='{}', body='{}')", url, body);
694
695        self.send_document(url, body, Method::PATCH, headers).await
696    }
697
698    async fn send_document(
699        &self,
700        url: &str,
701        body: &str,
702        method: Method,
703        headers: Option<HashMap<String, String>>,
704    ) -> Result<Value, PactBrokerError> {
705        let method_type = method.clone();
706        debug!("Sending JSON to {} using {}: {}", url, method, body);
707
708        let base_url = &self.url.parse::<Url>()?;
709        let url = if url.starts_with("/") {
710            base_url.join(url)?
711        } else {
712            let url = url.parse::<Url>()?;
713            base_url.join(&url.path())?
714        };
715
716        let request_builder = match self.auth {
717            Some(ref auth) => match auth {
718                HttpAuth::User(username, password) => self
719                    .client
720                    .request(method, url.clone())
721                    .basic_auth(username, password.clone()),
722                HttpAuth::Token(token) => {
723                    self.client.request(method, url.clone()).bearer_auth(token)
724                }
725                _ => self.client.request(method, url.clone()),
726            },
727            None => self.client.request(method, url.clone()),
728        }
729        .header("Accept", "application/hal+json")
730        .body(body.to_string());
731
732        // Add any additional headers if provided
733
734        let request_builder = if let Some(ref headers) = headers {
735            headers
736                .iter()
737                .fold(request_builder, |builder, (key, value)| {
738                    builder.header(key.as_str(), value.as_str())
739                })
740        } else {
741            request_builder
742        };
743
744        let request_builder = if method_type == Method::PATCH {
745            request_builder.header("Content-Type", "application/merge-patch+json")
746        } else {
747            request_builder.header("Content-Type", "application/json")
748        };
749        let response = with_retries(self.retries, request_builder).await;
750        match response {
751            Ok(res) => {
752                self.parse_broker_response(url.path().to_string(), res)
753                    .await
754            }
755            Err(err) => Err(PactBrokerError::IoError(format!(
756                "Failed to send JSON to the pact broker URL '{}' - IoError {}",
757                url, err
758            ))),
759        }
760    }
761
762    fn with_doc_context(self, doc_attributes: &[Link]) -> Result<HALClient, PactBrokerError> {
763        let links: serde_json::Map<String, serde_json::Value> = doc_attributes
764            .iter()
765            .map(|link| (link.name.clone(), link.as_json()))
766            .collect();
767        let links_json = json!({
768          "_links": json!(links)
769        });
770        Ok(self.update_path_info(links_json))
771    }
772}
773
774fn handle_validation_errors(body: Value) -> PactBrokerError {
775    match &body {
776        Value::Object(attrs) => {
777            // Extract notices if present
778            let notices: Vec<Notice> = attrs.get("notices")
779                .and_then(|n| n.as_array())
780                .map(|notices_array| {
781                    notices_array.iter()
782                        .filter_map(|notice| serde_json::from_value::<Notice>(notice.clone()).ok())
783                        .collect()
784                })
785                .unwrap_or_default();
786
787            if let Some(errors) = attrs.get("errors") {
788                let error_messages = match errors {
789                    Value::Array(values) => values.iter().map(|v| json_to_string(v)).collect(),
790                    Value::Object(errors) => errors
791                        .iter()
792                        .map(|(field, errors)| match errors {
793                            Value::String(error) => format!("{}: {}", field, error),
794                            Value::Array(errors) => format!(
795                                "{}: {}",
796                                field,
797                                errors.iter().map(|err| json_to_string(err)).join(", ")
798                            ),
799                            _ => format!("{}: {}", field, errors),
800                        })
801                        .collect(),
802                    Value::String(s) => vec![s.clone()],
803                    _ => vec![errors.to_string()],
804                };
805                
806                if !notices.is_empty() {
807                    PactBrokerError::ValidationErrorWithNotices(error_messages, notices)
808                } else {
809                    PactBrokerError::ValidationError(error_messages)
810                }
811            } else if !notices.is_empty() {
812                // Even if there are no explicit errors, notices might contain error information
813                let notice_messages = notices.iter().map(|n| n.text.clone()).collect();
814                PactBrokerError::ValidationErrorWithNotices(notice_messages, notices)
815            } else {
816                PactBrokerError::ValidationError(vec![body.to_string()])
817            }
818        }
819        Value::String(s) => PactBrokerError::ValidationError(vec![s.clone()]),
820        _ => PactBrokerError::ValidationError(vec![body.to_string()]),
821    }
822}
823
824impl HALClient {
825    pub fn setup(url: &str, auth: Option<HttpAuth>, ssl_options: SslOptions) -> HALClient {
826        let mut builder = reqwest::Client::builder().user_agent(format!(
827            "{}/{}",
828            env!("CARGO_PKG_NAME"),
829            env!("CARGO_PKG_VERSION")
830        ));
831
832        debug!("Using ssl_options: {:?}", ssl_options);
833        if let Some(ref path) = ssl_options.ssl_cert_path {
834            if let Ok(cert_bytes) = std::fs::read(path) {
835                if let Ok(cert) = reqwest::Certificate::from_pem_bundle(&cert_bytes) {
836                    debug!("Adding SSL certificate from path: {}", path);
837                    for c in cert {
838                        builder = builder.add_root_certificate(c.clone());
839                    }
840                }
841            } else {
842                debug!(
843                    "Could not read SSL certificate from provided path: {}",
844                    path
845                );
846            }
847        }
848        if ssl_options.skip_ssl {
849            builder = builder.danger_accept_invalid_certs(true);
850            debug!("Skipping SSL certificate validation");
851        }
852        if !ssl_options.use_root_trust_store {
853            builder = builder.tls_built_in_root_certs(false);
854            debug!("Disabling root trust store for SSL");
855        }
856
857        let built_client = builder.build().unwrap();
858        let client = ClientBuilder::new(built_client)
859            .with(TracingMiddleware::default())
860            .with(OtelPropagatorMiddleware)
861            .build();
862
863        HALClient {
864            client,
865            url: url.to_string(),
866            path_info: None,
867            auth,
868            retries: 3,
869            ssl_options,
870        }
871    }
872}
873
874pub fn links_from_json(json: &Value) -> Vec<Link> {
875    match json.get("_links") {
876        Some(json) => match json {
877            Value::Object(v) => v
878                .iter()
879                .map(|(link, json)| match json {
880                    Value::Object(attr) => Link::from_json(link, attr),
881                    _ => Link {
882                        name: link.clone(),
883                        ..Link::default()
884                    },
885                })
886                .collect(),
887            _ => vec![],
888        },
889        None => vec![],
890    }
891}
892
893/// Fetches the pacts from the broker that match the provider name
894pub async fn fetch_pacts_from_broker(
895    broker_url: &str,
896    provider_name: &str,
897    auth: Option<HttpAuth>,
898    ssl_options: SslOptions,
899) -> anyhow::Result<
900    Vec<
901        anyhow::Result<(
902            Box<dyn Pact + Send + Sync + RefUnwindSafe>,
903            Option<PactVerificationContext>,
904            Vec<Link>,
905        )>,
906    >,
907> {
908    trace!(
909        "fetch_pacts_from_broker(broker_url='{}', provider_name='{}', auth={})",
910        broker_url,
911        provider_name,
912        auth.clone().unwrap_or_default()
913    );
914
915    let mut hal_client = HALClient::with_url(broker_url, auth, ssl_options);
916    let template_values = hashmap! { "provider".to_string() => provider_name.to_string() };
917
918    hal_client = hal_client
919        .navigate("pb:latest-provider-pacts", &template_values)
920        .await
921        .map_err(move |err| match err {
922            PactBrokerError::NotFound(_) => PactBrokerError::NotFound(format!(
923                "No pacts for provider '{}' where found in the pact broker. URL: '{}'",
924                provider_name, broker_url
925            )),
926            _ => err,
927        })?;
928
929    let pact_links = hal_client.clone().iter_links("pacts")?;
930
931    let results: Vec<_> = futures::stream::iter(pact_links)
932        .map(|ref pact_link| {
933          match pact_link.href {
934            Some(_) => Ok((hal_client.clone(), pact_link.clone())),
935            None => Err(
936              PactBrokerError::LinkError(
937                format!(
938                  "Expected a HAL+JSON response from the pact broker, but got a link with no HREF. URL: '{}', LINK: '{:?}'",
939                  &hal_client.url,
940                  pact_link
941                )
942              )
943            )
944          }
945        })
946        .and_then(|(hal_client, pact_link)| async {
947          let pact_json = hal_client.fetch_url(
948            &pact_link.clone(),
949            &template_values.clone()
950          ).await?;
951          Ok((pact_link, pact_json))
952        })
953        .map(|result| {
954          match result {
955            Ok((pact_link, pact_json)) => {
956              let href = pact_link.href.unwrap_or_default();
957              let links = links_from_json(&pact_json);
958              load_pact_from_json(href.as_str(), &pact_json)
959                .map(|pact| (pact, None, links))
960            },
961            Err(err) => Err(err.into())
962          }
963        })
964        .into_stream()
965        .collect()
966        .await;
967
968    Ok(results)
969}
970
971/// Fetch Pacts from the broker using the "provider-pacts-for-verification" endpoint
972pub async fn fetch_pacts_dynamically_from_broker(
973    broker_url: &str,
974    provider_name: String,
975    pending: bool,
976    include_wip_pacts_since: Option<String>,
977    provider_tags: Vec<String>,
978    provider_branch: Option<String>,
979    consumer_version_selectors: Vec<ConsumerVersionSelector>,
980    auth: Option<HttpAuth>,
981    ssl_options: SslOptions,
982    headers: Option<HashMap<String, String>>,
983) -> anyhow::Result<
984    Vec<
985        Result<
986            (
987                Box<dyn Pact + Send + Sync + RefUnwindSafe>,
988                Option<PactVerificationContext>,
989                Vec<Link>,
990            ),
991            PactBrokerError,
992        >,
993    >,
994> {
995    trace!(
996        "fetch_pacts_dynamically_from_broker(broker_url='{}', provider_name='{}', pending={}, \
997    include_wip_pacts_since={:?}, provider_tags: {:?}, consumer_version_selectors: {:?}, auth={})",
998        broker_url,
999        provider_name,
1000        pending,
1001        include_wip_pacts_since,
1002        provider_tags,
1003        consumer_version_selectors,
1004        auth.clone().unwrap_or_default()
1005    );
1006
1007    let mut hal_client = HALClient::with_url(broker_url, auth, ssl_options);
1008    let template_values = hashmap! { "provider".to_string() => provider_name.clone() };
1009
1010    hal_client = hal_client
1011        .navigate("pb:provider-pacts-for-verification", &template_values)
1012        .await
1013        .map_err(move |err| match err {
1014            PactBrokerError::NotFound(_) => PactBrokerError::NotFound(format!(
1015                "No pacts for provider '{}' were found in the pact broker. URL: '{}'",
1016                provider_name.clone(),
1017                broker_url
1018            )),
1019            _ => err,
1020        })?;
1021
1022    // Construct the Pacts for verification payload
1023    let pacts_for_verification = PactsForVerificationRequest {
1024        provider_version_tags: provider_tags,
1025        provider_version_branch: provider_branch,
1026        include_wip_pacts_since,
1027        consumer_version_selectors,
1028        include_pending_status: pending,
1029    };
1030    let request_body = serde_json::to_string(&pacts_for_verification).unwrap();
1031
1032    // Post the verification request
1033    let response = match hal_client.find_link("self") {
1034        Ok(link) => {
1035            let link = hal_client.clone().parse_link_url(&link, &hashmap! {})?;
1036            match hal_client
1037                .clone()
1038                .post_json(link.as_str(), request_body.as_str(), headers)
1039                .await
1040            {
1041                Ok(res) => Some(res),
1042                Err(err) => {
1043                    info!("error response for pacts for verification: {} ", err);
1044                    return Err(anyhow!(err));
1045                }
1046            }
1047        }
1048        Err(e) => return Err(anyhow!(e)),
1049    };
1050
1051    // Find all of the Pact links
1052    let pact_links = match response {
1053        Some(v) => {
1054            let pfv: PactsForVerificationResponse = serde_json::from_value(v)
1055                .map_err(|err| {
1056                    trace!(
1057                        "Failed to deserialise PactsForVerificationResponse: {}",
1058                        err
1059                    );
1060                    err
1061                })
1062                .unwrap_or(PactsForVerificationResponse {
1063                    embedded: PactsForVerificationBody { pacts: vec![] },
1064                });
1065            trace!(?pfv, "got pacts for verification response");
1066
1067            if pfv.embedded.pacts.len() == 0 {
1068                return Err(anyhow!(PactBrokerError::NotFound(format!(
1069                    "No pacts were found for this provider"
1070                ))));
1071            };
1072
1073            let links: Result<Vec<(Link, PactVerificationContext)>, PactBrokerError> = pfv.embedded.pacts.iter().map(| p| {
1074          match p.links.get("self") {
1075            Some(l) => Ok((l.clone(), p.into())),
1076            None => Err(
1077              PactBrokerError::LinkError(
1078                format!(
1079                  "Expected a HAL+JSON response from the pact broker, but got a link with no HREF. URL: '{}', PATH: '{:?}'",
1080                  &hal_client.url,
1081                  &p.links,
1082                )
1083              )
1084            )
1085          }
1086        }).collect();
1087
1088            links
1089        }
1090        None => Err(PactBrokerError::NotFound(format!(
1091            "No pacts were found for this provider"
1092        ))),
1093    }?;
1094
1095    let results: Vec<_> = futures::stream::iter(pact_links)
1096      .map(|(ref pact_link, ref context)| {
1097        match pact_link.href {
1098          Some(_) => Ok((hal_client.clone(), pact_link.clone(), context.clone())),
1099          None => Err(
1100            PactBrokerError::LinkError(
1101              format!(
1102                "Expected a HAL+JSON response from the pact broker, but got a link with no HREF. URL: '{}', LINK: '{:?}'",
1103                &hal_client.url,
1104                pact_link
1105              )
1106            )
1107          )
1108        }
1109      })
1110      .and_then(|(hal_client, pact_link, context)| async {
1111        let pact_json = hal_client.fetch_url(
1112          &pact_link.clone(),
1113          &template_values.clone()
1114        ).await?;
1115        Ok((pact_link, pact_json, context))
1116      })
1117      .map(|result| {
1118        match result {
1119          Ok((pact_link, pact_json, context)) => {
1120            let href = pact_link.href.unwrap_or_default();
1121            let links = links_from_json(&pact_json);
1122            load_pact_from_json(href.as_str(), &pact_json)
1123              .map(|pact| (pact, Some(context), links))
1124              .map_err(|err| PactBrokerError::ContentError(format!("{}", err)))
1125          },
1126          Err(err) => Err(err)
1127        }
1128      })
1129      .into_stream()
1130      .collect()
1131      .await;
1132
1133    Ok(results)
1134}
1135
1136/// Fetch the Pact from the given URL, using any required authentication. This will use a GET
1137/// request to the given URL and parse the result into a Pact model. It will also look for any HAL
1138/// links in the response, returning those if found.
1139pub async fn fetch_pact_from_url(
1140    url: &str,
1141    auth: &Option<HttpAuth>,
1142) -> anyhow::Result<(Box<dyn Pact + Send + Sync + RefUnwindSafe>, Vec<Link>)> {
1143    let url = url.to_string();
1144    let auth = auth.clone();
1145    let (url, pact_json) =
1146        tokio::task::spawn_blocking(move || http_utils::fetch_json_from_url(&url, &auth)).await??;
1147    let pact = load_pact_from_json(&url, &pact_json)?;
1148    let links = links_from_json(&pact_json);
1149    Ok((pact, links))
1150}
1151
1152async fn publish_provider_tags(
1153    hal_client: &HALClient,
1154    links: &[Link],
1155    provider_tags: Vec<String>,
1156    version: &str,
1157    headers: Option<HashMap<String, String>>,
1158) -> Result<(), PactBrokerError> {
1159    let hal_client = hal_client
1160        .clone()
1161        .with_doc_context(links)?
1162        .navigate("pb:provider", &hashmap! {})
1163        .await?;
1164    match hal_client.find_link("pb:version-tag") {
1165        Ok(link) => {
1166            for tag in &provider_tags {
1167                let template_values = hashmap! {
1168                  "version".to_string() => version.to_string(),
1169                  "tag".to_string() => tag.clone()
1170                };
1171                match hal_client
1172                    .clone()
1173                    .put_json(
1174                        hal_client
1175                            .clone()
1176                            .parse_link_url(&link, &template_values)?
1177                            .as_str(),
1178                        "{}",
1179                        headers.clone(),
1180                    )
1181                    .await
1182                {
1183                    Ok(_) => debug!("Pushed tag {} for provider version {}", tag, version),
1184                    Err(err) => {
1185                        error!(
1186                            "Failed to push tag {} for provider version {}",
1187                            tag, version
1188                        );
1189                        return Err(err);
1190                    }
1191                }
1192            }
1193            Ok(())
1194        }
1195        Err(_) => Err(PactBrokerError::LinkError(
1196            "Can't publish provider tags as there is no 'pb:version-tag' link".to_string(),
1197        )),
1198    }
1199}
1200
1201async fn publish_provider_branch(
1202    hal_client: &HALClient,
1203    links: &[Link],
1204    branch: &str,
1205    version: &str,
1206    headers: Option<HashMap<String, String>>,
1207) -> Result<(), PactBrokerError> {
1208    let hal_client = hal_client
1209        .clone()
1210        .with_doc_context(links)?
1211        .navigate("pb:provider", &hashmap! {})
1212        .await?;
1213
1214    match hal_client.find_link("pb:branch-version") {
1215    Ok(link) => {
1216      let template_values = hashmap! {
1217        "branch".to_string() => branch.to_string(),
1218        "version".to_string() => version.to_string(),
1219      };
1220      match hal_client.clone().put_json(hal_client.clone().parse_link_url(&link, &template_values)?.as_str(), "{}",headers).await {
1221        Ok(_) => debug!("Pushed branch {} for provider version {}", branch, version),
1222        Err(err) => {
1223          error!("Failed to push branch {} for provider version {}", branch, version);
1224          return Err(err);
1225        }
1226      }
1227      Ok(())
1228    },
1229    Err(_) => Err(PactBrokerError::LinkError("Can't publish provider branch as there is no 'pb:branch-version' link. Please ugrade to Pact Broker version 2.86.0 or later for branch support".to_string()))
1230  }
1231}
1232
#[skip_serializing_none]
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
/// Criteria for selecting which consumer versions' pacts to verify. A list of these is sent
/// as `consumer_version_selectors` in a [`PactsForVerificationRequest`].
///
/// Every field is optional; fields that are `None` are left out of the serialised JSON, and
/// field names are serialised in camelCase.
pub struct ConsumerVersionSelector {
    /// Application name to filter the results on
    pub consumer: Option<String>,
    /// Tag
    pub tag: Option<String>,
    /// Fallback tag if Tag doesn't exist
    pub fallback_tag: Option<String>,
    /// Only select the latest (if false, this selects all pacts for a tag)
    pub latest: Option<bool>,
    /// Applications that have been deployed or released
    pub deployed_or_released: Option<bool>,
    /// Applications that have been deployed
    pub deployed: Option<bool>,
    /// Applications that have been released
    pub released: Option<bool>,
    /// Applications in a given environment
    pub environment: Option<String>,
    /// Applications with the default branch set in the broker
    pub main_branch: Option<bool>,
    /// Applications with the given branch
    pub branch: Option<String>,
    /// Applications that match the provider version branch sent during verification
    pub matching_branch: Option<bool>,
}
1261
/// Top-level shape of the response to a pacts-for-verification request.
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
struct PactsForVerificationResponse {
    /// Embedded resources; read from the `_embedded` attribute when deserialising (the
    /// rename applies to deserialisation only).
    #[serde(rename(deserialize = "_embedded"))]
    pub embedded: PactsForVerificationBody,
}
1268
/// Embedded section of a [`PactsForVerificationResponse`].
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
struct PactsForVerificationBody {
    /// The pacts selected for verification
    pub pacts: Vec<PactForVerification>,
}
1274
/// A single pact entry in a [`PactsForVerificationBody`].
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
struct PactForVerification {
    /// Short description of the pact entry
    pub short_description: String,
    /// HAL links keyed by relation name; read from the `_links` attribute when
    /// deserialising. The `self` entry is used to fetch the pact itself.
    #[serde(rename(deserialize = "_links"))]
    pub links: HashMap<String, Link>,
    /// Verification properties for this pact, if the response includes them
    pub verification_properties: Option<PactVerificationProperties>,
}
1283
#[skip_serializing_none]
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
/// Request to send to determine the pacts to verify.
///
/// Serialised with camelCase field names. `None` values are omitted from the output, and
/// `provider_version_tags` is omitted when it is empty.
pub struct PactsForVerificationRequest {
    /// Provider tags to use for determining pending pacts (if enabled)
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub provider_version_tags: Vec<String>,
    /// Enable pending pacts feature
    pub include_pending_status: bool,
    /// Find WIP pacts after given date
    pub include_wip_pacts_since: Option<String>,
    /// Detailed pact selection criteria, see <https://docs.pact.io/pact_broker/advanced_topics/consumer_version_selectors/>
    pub consumer_version_selectors: Vec<ConsumerVersionSelector>,
    /// Current provider version branch if used (instead of tags)
    pub provider_version_branch: Option<String>,
}
1301
#[skip_serializing_none]
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
/// Provides the context on why a Pact was included.
///
/// Built from a `PactForVerification` entry of the pacts-for-verification response and
/// returned next to each pact that was fetched through that endpoint.
pub struct PactVerificationContext {
    /// Short description copied from the pact entry
    pub short_description: String,
    /// Verification properties of the pact entry (defaults are used when the entry has none)
    pub verification_properties: PactVerificationProperties,
}
1312
1313impl From<&PactForVerification> for PactVerificationContext {
1314    fn from(value: &PactForVerification) -> Self {
1315        PactVerificationContext {
1316            short_description: value.short_description.clone(),
1317            verification_properties: value.verification_properties.clone().unwrap_or_default(),
1318        }
1319    }
1320}
1321
1322#[skip_serializing_none]
1323#[derive(Serialize, Deserialize, Debug, Clone, Default)]
1324#[serde(rename_all = "camelCase")]
1325/// Properties associated with the verification context
1326pub struct PactVerificationProperties {
1327    #[serde(default)]
1328    /// If the Pact is pending
1329    pub pending: bool,
1330    /// Notices provided by the Pact Broker
1331    pub notices: Vec<HashMap<String, String>>,
1332}