1use std::collections::HashMap;
4use std::ops::Not;
5use std::panic::RefUnwindSafe;
6use std::str::from_utf8;
7
8use anyhow::anyhow;
9use futures::stream::*;
10
11use itertools::Itertools;
12use maplit::hashmap;
13
14use pact_models::http_utils;
15use pact_models::http_utils::HttpAuth;
16use pact_models::json_utils::json_to_string;
17use pact_models::pact::{Pact, load_pact_from_json};
18use regex::{Captures, Regex};
19use reqwest::{Method, Url};
20use serde::{Deserialize, Serialize};
21use serde_json::{Value, json};
22use serde_with::skip_serializing_none;
23use tracing::{debug, error, info, trace, warn};
24pub mod branches;
25pub mod can_i_deploy;
26pub mod deployments;
27pub mod environments;
28pub mod pact_publish;
29pub mod pacticipants;
30pub mod pacts;
31pub mod subcommands;
32pub mod tags;
33#[cfg(test)]
34pub mod test_utils;
35pub mod types;
36pub mod utils;
37pub mod verification;
38pub mod versions;
39pub mod webhooks;
40use utils::with_retries;
41use http::Extensions;
43use opentelemetry::Context;
44use opentelemetry::global;
45use opentelemetry_http::HeaderInjector;
46use reqwest::Request;
47use reqwest::Response;
48use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
49use reqwest_middleware::{Middleware, Next};
50use reqwest_tracing::TracingMiddleware;
51
52use crate::cli::pact_broker::main::types::SslOptions;
53
54fn is_true(object: &serde_json::Map<String, Value>, field: &str) -> bool {
55 match object.get(field) {
56 Some(json) => match *json {
57 serde_json::Value::Bool(b) => b,
58 _ => false,
59 },
60 None => false,
61 }
62}
63
64fn as_string(json: &Value) -> String {
65 match *json {
66 serde_json::Value::String(ref s) => s.clone(),
67 _ => format!("{}", json),
68 }
69}
70
71fn content_type(response: &reqwest::Response) -> String {
72 match response.headers().get("content-type") {
73 Some(value) => value.to_str().unwrap_or("text/plain").into(),
74 None => "text/plain".to_string(),
75 }
76}
77
78fn json_content_type(response: &reqwest::Response) -> bool {
79 match content_type(response).parse::<mime::Mime>() {
80 Ok(mime) => {
81 match (
82 mime.type_().as_str(),
83 mime.subtype().as_str(),
84 mime.suffix(),
85 ) {
86 ("application", "json", None) => true,
87 ("application", "hal", Some(mime::JSON)) => true,
88 _ => false,
89 }
90 }
91 Err(_) => false,
92 }
93}
94
95fn find_entry(map: &serde_json::Map<String, Value>, key: &str) -> Option<(String, Value)> {
96 match map.keys().find(|k| k.to_lowercase() == key.to_lowercase()) {
97 Some(k) => map.get(k).map(|v| (key.to_string(), v.clone())),
98 None => None,
99 }
100}
101
102#[derive(Debug, Clone, thiserror::Error)]
104pub enum PactBrokerError {
105 #[error("Error with a HAL link - {0}")]
107 LinkError(String),
108 #[error("Error with the content of a HAL resource - {0}")]
110 ContentError(String),
111 #[error("IO Error - {0}")]
112 IoError(String),
114 #[error("Link/Resource was not found - {0}")]
116 NotFound(String),
117 #[error("Invalid URL - {0}")]
119 UrlError(String),
120 #[error("failed validation - {0:?}")]
122 ValidationError(Vec<String>),
123}
124
125impl PartialEq<String> for PactBrokerError {
126 fn eq(&self, other: &String) -> bool {
127 let mut buffer = String::new();
128 match self {
129 PactBrokerError::LinkError(s) => buffer.push_str(s),
130 PactBrokerError::ContentError(s) => buffer.push_str(s),
131 PactBrokerError::IoError(s) => buffer.push_str(s),
132 PactBrokerError::NotFound(s) => buffer.push_str(s),
133 PactBrokerError::UrlError(s) => buffer.push_str(s),
134 PactBrokerError::ValidationError(errors) => {
135 buffer.push_str(errors.iter().join(", ").as_str())
136 }
137 };
138 buffer == *other
139 }
140}
141
142impl<'a> PartialEq<&'a str> for PactBrokerError {
143 fn eq(&self, other: &&str) -> bool {
144 let message = match self {
145 PactBrokerError::LinkError(s) => s.clone(),
146 PactBrokerError::ContentError(s) => s.clone(),
147 PactBrokerError::IoError(s) => s.clone(),
148 PactBrokerError::NotFound(s) => s.clone(),
149 PactBrokerError::UrlError(s) => s.clone(),
150 PactBrokerError::ValidationError(errors) => errors.iter().join(", "),
151 };
152 message.as_str() == *other
153 }
154}
155
156impl From<url::ParseError> for PactBrokerError {
157 fn from(err: url::ParseError) -> Self {
158 PactBrokerError::UrlError(format!("{}", err))
159 }
160}
161
162#[derive(Debug, Clone, Serialize, Deserialize)]
163#[serde(default)]
164pub struct Link {
166 pub name: String,
168 pub href: Option<String>,
170 pub templated: bool,
172 pub title: Option<String>,
174}
175
176impl Link {
177 pub fn from_json(link: &str, link_data: &serde_json::Map<String, serde_json::Value>) -> Link {
179 Link {
180 name: link.to_string(),
181 href: find_entry(link_data, &"href".to_string()).map(|(_, href)| as_string(&href)),
182 templated: is_true(link_data, "templated"),
183 title: link_data.get("title").map(|title| as_string(title)),
184 }
185 }
186
187 pub fn as_json(&self) -> serde_json::Value {
189 match (self.href.clone(), self.title.clone()) {
190 (Some(href), Some(title)) => json!({
191 "href": href,
192 "title": title,
193 "templated": self.templated
194 }),
195 (Some(href), None) => json!({
196 "href": href,
197 "templated": self.templated
198 }),
199 (None, Some(title)) => json!({
200 "title": title,
201 "templated": self.templated
202 }),
203 (None, None) => json!({
204 "templated": self.templated
205 }),
206 }
207 }
208}
209
210impl Default for Link {
211 fn default() -> Self {
212 Link {
213 name: "link".to_string(),
214 href: None,
215 templated: false,
216 title: None,
217 }
218 }
219}
220
221#[derive(Clone)]
223pub struct HALClient {
224 client: ClientWithMiddleware,
225 url: String,
226 path_info: Option<Value>,
227 auth: Option<HttpAuth>,
228 ssl_options: SslOptions,
229 retries: u8,
230}
231
232struct OtelPropagatorMiddleware;
233
234#[async_trait::async_trait]
235impl Middleware for OtelPropagatorMiddleware {
236 async fn handle(
237 &self,
238 mut req: Request,
239 extensions: &mut Extensions,
240 next: Next<'_>,
241 ) -> reqwest_middleware::Result<Response> {
242 let cx = Context::current();
243 let mut headers = reqwest::header::HeaderMap::new();
244 global::get_text_map_propagator(|propagator| {
245 propagator.inject_context(&cx, &mut HeaderInjector(&mut headers))
246 });
247 headers.append(
248 "baggage",
249 reqwest::header::HeaderValue::from_static("is_synthetic=true"),
250 );
251
252 for (key, value) in headers.iter() {
253 req.headers_mut().append(key, value.clone());
254 }
255 let res = next.run(req, extensions).await;
256 res
257 }
258}
259
260pub trait WithCurrentSpan {
261 fn with_current_span<F, R>(&self, f: F) -> R
262 where
263 F: FnOnce() -> R;
264}
265
266impl<T> WithCurrentSpan for T {
267 fn with_current_span<F, R>(&self, f: F) -> R
268 where
269 F: FnOnce() -> R,
270 {
271 let span = tracing::Span::current();
272 let _enter = span.enter();
273 f()
274 }
275}
276
277impl HALClient {
278 pub fn with_url(url: &str, auth: Option<HttpAuth>, ssl_options: SslOptions) -> HALClient {
280 HALClient {
281 url: url.to_string(),
282 auth: auth.clone(),
283 ssl_options: ssl_options.clone(),
284 ..HALClient::setup(url, auth, ssl_options)
285 }
286 }
287
288 fn update_path_info(self, path_info: serde_json::Value) -> HALClient {
289 HALClient {
290 client: self.client.clone(),
291 url: self.url.clone(),
292 path_info: Some(path_info),
293 auth: self.auth,
294 retries: self.retries,
295 ssl_options: self.ssl_options,
296 }
297 }
298
299 pub async fn navigate(
301 self,
302 link: &'static str,
303 template_values: &HashMap<String, String>,
304 ) -> Result<HALClient, PactBrokerError> {
305 trace!(
306 "navigate(link='{}', template_values={:?})",
307 link, template_values
308 );
309
310 let client = if self.path_info.is_none() {
311 let path_info = self.clone().fetch("/".into()).await?;
312 self.update_path_info(path_info)
313 } else {
314 self
315 };
316
317 let path_info = client.clone().fetch_link(link, template_values).await?;
318 let client = client.update_path_info(path_info);
319
320 Ok(client)
321 }
322
323 fn find_link(&self, link: &'static str) -> Result<Link, PactBrokerError> {
324 match self.path_info {
325 None => Err(PactBrokerError::LinkError(format!("No previous resource has been fetched from the pact broker. URL: '{}', LINK: '{}'",
326 self.url, link))),
327 Some(ref json) => match json.get("_links") {
328 Some(json) => match json.get(link) {
329 Some(link_data) => link_data.as_object()
330 .map(|link_data| Link::from_json(&link.to_string(), &link_data))
331 .ok_or_else(|| PactBrokerError::LinkError(format!("Link is malformed, expected an object but got {}. URL: '{}', LINK: '{}'",
332 link_data, self.url, link))),
333 None => Err(PactBrokerError::LinkError(format!("Link '{}' was not found in the response, only the following links where found: {:?}. URL: '{}', LINK: '{}'",
334 link, json.as_object().unwrap_or(&json!({}).as_object().unwrap()).keys().join(", "), self.url, link)))
335 },
336 None => Err(PactBrokerError::LinkError(format!("Expected a HAL+JSON response from the pact broker, but got a response with no '_links'. URL: '{}', LINK: '{}'",
337 self.url, link)))
338 }
339 }
340 }
341
342 async fn fetch_link(
343 self,
344 link: &'static str,
345 template_values: &HashMap<String, String>,
346 ) -> Result<Value, PactBrokerError> {
347 trace!(
348 "fetch_link(link='{}', template_values={:?})",
349 link, template_values
350 );
351
352 let link_data = self.find_link(link)?;
353
354 self.fetch_url(&link_data, template_values).await
355 }
356
357 pub async fn fetch_url(
359 self,
360 link: &Link,
361 template_values: &HashMap<String, String>,
362 ) -> Result<Value, PactBrokerError> {
363 debug!(
364 "fetch_url(link={:?}, template_values={:?})",
365 link, template_values
366 );
367
368 let link_url = if link.templated {
369 debug!("Link URL is templated");
370 self.clone().parse_link_url(&link, &template_values)
371 } else {
372 link.href.clone().ok_or_else(|| {
373 PactBrokerError::LinkError(format!(
374 "Link is malformed, there is no href. URL: '{}', LINK: '{}'",
375 self.url, link.name
376 ))
377 })
378 }?;
379
380 let base_url = self.url.parse::<Url>()?;
381 let joined_url = base_url.join(&link_url)?;
382 self.fetch(joined_url.path().into()).await
383 }
384 pub async fn delete_url(
385 self,
386 link: &Link,
387 template_values: &HashMap<String, String>,
388 ) -> Result<Value, PactBrokerError> {
389 debug!(
390 "fetch_url(link={:?}, template_values={:?})",
391 link, template_values
392 );
393
394 let link_url = if link.templated {
395 debug!("Link URL is templated");
396 self.clone().parse_link_url(&link, &template_values)
397 } else {
398 link.href.clone().ok_or_else(|| {
399 PactBrokerError::LinkError(format!(
400 "Link is malformed, there is no href. URL: '{}', LINK: '{}'",
401 self.url, link.name
402 ))
403 })
404 }?;
405
406 let base_url = self.url.parse::<Url>()?;
407 debug!("base_url: {}", base_url);
408 debug!("link_url: {}", link_url);
409 let joined_url = base_url.join(&link_url)?;
410 debug!("joined_url: {}", joined_url);
411 self.delete(joined_url.path().into()).await
412 }
413
414 pub async fn fetch(self, path: &str) -> Result<Value, PactBrokerError> {
415 info!("Fetching path '{}' from pact broker", path);
416
417 let broker_url = self.url.parse::<Url>()?;
418 let context_path = broker_url.path();
419 let url = if context_path.is_empty().not()
420 && context_path != "/"
421 && path.starts_with(context_path)
422 {
423 let mut base_url = broker_url.clone();
424 base_url.set_path("/");
425 base_url.join(path)?
426 } else {
427 broker_url.join(path)?
428 };
429
430 let request_builder = match self.auth {
431 Some(ref auth) => match auth {
432 HttpAuth::User(username, password) => {
433 self.client.get(url).basic_auth(username, password.clone())
434 }
435 HttpAuth::Token(token) => self.client.get(url).bearer_auth(token),
436 _ => self.client.get(url),
437 },
438 None => self.client.get(url),
439 }
440 .header("accept", "application/hal+json, application/json");
441
442 let response = utils::with_retries(self.retries, request_builder)
443 .await
444 .map_err(|err| {
445 PactBrokerError::IoError(format!(
446 "Failed to access pact broker path '{}' - {}. URL: '{}'",
447 &path, err, &self.url,
448 ))
449 })?;
450
451 self.parse_broker_response(path.to_string(), response).await
452 }
453
454 pub async fn delete(self, path: &str) -> Result<Value, PactBrokerError> {
455 info!("Deleting path '{}' from pact broker", path);
456
457 let broker_url = self.url.parse::<Url>()?;
458 let context_path = broker_url.path();
459 let url = if context_path.is_empty().not()
460 && context_path != "/"
461 && path.starts_with(context_path)
462 {
463 let mut base_url = broker_url.clone();
464 base_url.set_path("/");
465 base_url.join(path)?
466 } else {
467 broker_url.join(path)?
468 };
469
470 let request_builder = match self.auth {
471 Some(ref auth) => match auth {
472 HttpAuth::User(username, password) => self
473 .client
474 .delete(url)
475 .basic_auth(username, password.clone()),
476 HttpAuth::Token(token) => self.client.delete(url).bearer_auth(token),
477 _ => self.client.delete(url),
478 },
479 None => self.client.delete(url),
480 };
481
482 let response = utils::with_retries(self.retries, request_builder)
483 .await
484 .map_err(|err| {
485 PactBrokerError::IoError(format!(
486 "Failed to delete pact broker path '{}' - {}. URL: '{}'",
487 &path, err, &self.url,
488 ))
489 })?;
490
491 self.parse_broker_response(path.to_string(), response).await
492 }
493
494 async fn parse_broker_response(
495 &self,
496 path: String,
497 response: reqwest::Response,
498 ) -> Result<Value, PactBrokerError> {
499 let is_json_content_type = json_content_type(&response);
500 let content_type = content_type(&response);
501 let status_code = response.status();
502
503 if status_code.is_success() {
504 if is_json_content_type {
505 response.json::<Value>()
506 .await
507 .map_err(|err| PactBrokerError::ContentError(
508 format!("Did not get a valid HAL response body from pact broker path '{}' - {}. URL: '{}'",
509 path, err, self.url)
510 ))
511 } else if status_code.as_u16() == 204 {
512 Ok(json!({}))
513 } else {
514 debug!("Request from broker was a success, but the response body was not JSON");
515 Err(PactBrokerError::ContentError(format!(
516 "Did not get a valid HAL response body from pact broker path '{}', content type is '{}'. URL: '{}'",
517 path, content_type, self.url
518 )))
519 }
520 } else if status_code.as_u16() == 404 {
521 Err(PactBrokerError::NotFound(format!(
522 "Request to pact broker path '{}' failed: {}. URL: '{}'",
523 path, status_code, self.url
524 )))
525 } else if status_code.as_u16() == 400 {
526 let body = response.bytes().await.map_err(|_| {
527 PactBrokerError::IoError(format!(
528 "Failed to download response body for path '{}'. URL: '{}'",
529 &path, self.url
530 ))
531 })?;
532
533 if is_json_content_type {
534 let errors = serde_json::from_slice(&body)
535 .map_err(|err| PactBrokerError::ContentError(
536 format!("Did not get a valid HAL response body from pact broker path '{}' - {}. URL: '{}'",
537 path, err, self.url)
538 ))?;
539 Err(handle_validation_errors(errors))
540 } else {
541 let body = from_utf8(&body)
542 .map(|b| b.to_string())
543 .unwrap_or_else(|err| format!("could not read body: {}", err));
544 error!("Request to pact broker path '{}' failed: {}", path, body);
545 Err(PactBrokerError::IoError(format!(
546 "Request to pact broker path '{}' failed: {}. URL: '{}'",
547 path, status_code, self.url
548 )))
549 }
550 } else {
551 Err(PactBrokerError::IoError(format!(
552 "Request to pact broker path '{}' failed: {}. URL: '{}'",
553 path, status_code, self.url
554 )))
555 }
556 }
557
558 fn parse_link_url(
559 &self,
560 link: &Link,
561 values: &HashMap<String, String>,
562 ) -> Result<String, PactBrokerError> {
563 match link.href {
564 Some(ref href) => {
565 debug!("templated URL = {}", href);
566 let re = Regex::new(r"\{(\w+)}").unwrap();
567 let final_url = re.replace_all(href, |caps: &Captures| {
568 let lookup = caps.get(1).unwrap().as_str();
569 trace!("Looking up value for key '{}'", lookup);
570 match values.get(lookup) {
571 Some(val) => urlencoding::encode(val.as_str()).to_string(),
572 None => {
573 warn!(
574 "No value was found for key '{}', mapped values are {:?}",
575 lookup, values
576 );
577 format!("{{{}}}", lookup)
578 }
579 }
580 });
581 debug!("final URL = {}", final_url);
582 Ok(final_url.to_string())
583 }
584 None => Err(PactBrokerError::LinkError(format!(
585 "Expected a HAL+JSON response from the pact broker, but got a link with no HREF. URL: '{}', LINK: '{}'",
586 self.url, link.name
587 ))),
588 }
589 }
590
591 pub fn iter_links(&self, link: &str) -> Result<Vec<Link>, PactBrokerError> {
593 match self.path_info {
594 None => Err(PactBrokerError::LinkError(format!("No previous resource has been fetched from the pact broker. URL: '{}', LINK: '{}'",
595 self.url, link))),
596 Some(ref json) => match json.get("_links") {
597 Some(json) => match json.get(&link) {
598 Some(link_data) => link_data.as_array()
599 .map(|link_data| link_data.iter().map(|link_json| match link_json {
600 Value::Object(data) => Link::from_json(&link, data),
601 Value::String(s) => Link { name: link.to_string(), href: Some(s.clone()), templated: false, title: None },
602 _ => Link { name: link.to_string(), href: Some(link_json.to_string()), templated: false, title: None }
603 }).collect())
604 .ok_or_else(|| PactBrokerError::LinkError(format!("Link is malformed, expected an object but got {}. URL: '{}', LINK: '{}'",
605 link_data, self.url, link))),
606 None => Err(PactBrokerError::LinkError(format!("Link '{}' was not found in the response, only the following links where found: {:?}. URL: '{}', LINK: '{}'",
607 link, json.as_object().unwrap_or(&json!({}).as_object().unwrap()).keys().join(", "), self.url, link)))
608 },
609 None => Err(PactBrokerError::LinkError(format!("Expected a HAL+JSON response from the pact broker, but got a response with no '_links'. URL: '{}', LINK: '{}'",
610 self.url, link)))
611 }
612 }
613 }
614
615 pub async fn post_json(
616 &self,
617 url: &str,
618 body: &str,
619 headers: Option<HashMap<String, String>>,
620 ) -> Result<serde_json::Value, PactBrokerError> {
621 trace!("post_json(url='{}', body='{}')", url, body);
622
623 self.send_document(url, body, Method::POST, headers).await
624 }
625
626 pub async fn put_json(
627 &self,
628 url: &str,
629 body: &str,
630 headers: Option<HashMap<String, String>>,
631 ) -> Result<serde_json::Value, PactBrokerError> {
632 trace!("put_json(url='{}', body='{}')", url, body);
633
634 self.send_document(url, body, Method::PUT, headers).await
635 }
636 pub async fn patch_json(
637 &self,
638 url: &str,
639 body: &str,
640 headers: Option<HashMap<String, String>>,
641 ) -> Result<serde_json::Value, PactBrokerError> {
642 trace!("put_json(url='{}', body='{}')", url, body);
643
644 self.send_document(url, body, Method::PATCH, headers).await
645 }
646
647 async fn send_document(
648 &self,
649 url: &str,
650 body: &str,
651 method: Method,
652 headers: Option<HashMap<String, String>>,
653 ) -> Result<Value, PactBrokerError> {
654 let method_type = method.clone();
655 debug!("Sending JSON to {} using {}: {}", url, method, body);
656
657 let base_url = &self.url.parse::<Url>()?;
658 let url = if url.starts_with("/") {
659 base_url.join(url)?
660 } else {
661 let url = url.parse::<Url>()?;
662 base_url.join(&url.path())?
663 };
664
665 let request_builder = match self.auth {
666 Some(ref auth) => match auth {
667 HttpAuth::User(username, password) => self
668 .client
669 .request(method, url.clone())
670 .basic_auth(username, password.clone()),
671 HttpAuth::Token(token) => {
672 self.client.request(method, url.clone()).bearer_auth(token)
673 }
674 _ => self.client.request(method, url.clone()),
675 },
676 None => self.client.request(method, url.clone()),
677 }
678 .header("Accept", "application/hal+json")
679 .body(body.to_string());
680
681 let request_builder = if let Some(ref headers) = headers {
684 headers
685 .iter()
686 .fold(request_builder, |builder, (key, value)| {
687 builder.header(key.as_str(), value.as_str())
688 })
689 } else {
690 request_builder
691 };
692
693 let request_builder = if method_type == Method::PATCH {
694 request_builder.header("Content-Type", "application/merge-patch+json")
695 } else {
696 request_builder.header("Content-Type", "application/json")
697 };
698 let response = with_retries(self.retries, request_builder).await;
699 match response {
700 Ok(res) => {
701 self.parse_broker_response(url.path().to_string(), res)
702 .await
703 }
704 Err(err) => Err(PactBrokerError::IoError(format!(
705 "Failed to send JSON to the pact broker URL '{}' - IoError {}",
706 url, err
707 ))),
708 }
709 }
710
711 fn with_doc_context(self, doc_attributes: &[Link]) -> Result<HALClient, PactBrokerError> {
712 let links: serde_json::Map<String, serde_json::Value> = doc_attributes
713 .iter()
714 .map(|link| (link.name.clone(), link.as_json()))
715 .collect();
716 let links_json = json!({
717 "_links": json!(links)
718 });
719 Ok(self.update_path_info(links_json))
720 }
721}
722
723fn handle_validation_errors(body: Value) -> PactBrokerError {
724 match &body {
725 Value::Object(attrs) => {
726 if let Some(errors) = attrs.get("errors") {
727 match errors {
728 Value::Array(values) => PactBrokerError::ValidationError(
729 values.iter().map(|v| json_to_string(v)).collect(),
730 ),
731 Value::Object(errors) => PactBrokerError::ValidationError(
732 errors
733 .iter()
734 .map(|(field, errors)| match errors {
735 Value::String(error) => format!("{}: {}", field, error),
736 Value::Array(errors) => format!(
737 "{}: {}",
738 field,
739 errors.iter().map(|err| json_to_string(err)).join(", ")
740 ),
741 _ => format!("{}: {}", field, errors),
742 })
743 .collect(),
744 ),
745 Value::String(s) => PactBrokerError::ValidationError(vec![s.clone()]),
746 _ => PactBrokerError::ValidationError(vec![errors.to_string()]),
747 }
748 } else {
749 PactBrokerError::ValidationError(vec![body.to_string()])
750 }
751 }
752 Value::String(s) => PactBrokerError::ValidationError(vec![s.clone()]),
753 _ => PactBrokerError::ValidationError(vec![body.to_string()]),
754 }
755}
756
757impl HALClient {
758 pub fn setup(url: &str, auth: Option<HttpAuth>, ssl_options: SslOptions) -> HALClient {
759 let mut builder = reqwest::Client::builder().user_agent(format!(
760 "{}/{}",
761 env!("CARGO_PKG_NAME"),
762 env!("CARGO_PKG_VERSION")
763 ));
764
765 debug!("Using ssl_options: {:?}", ssl_options);
766 if let Some(ref path) = ssl_options.ssl_cert_path {
767 if let Ok(cert_bytes) = std::fs::read(path) {
768 if let Ok(cert) = reqwest::Certificate::from_pem_bundle(&cert_bytes) {
769 debug!("Adding SSL certificate from path: {}", path);
770 for c in cert {
771 builder = builder.add_root_certificate(c.clone());
772 }
773 }
774 } else {
775 debug!(
776 "Could not read SSL certificate from provided path: {}",
777 path
778 );
779 }
780 }
781 if ssl_options.skip_ssl {
782 builder = builder.danger_accept_invalid_certs(true);
783 debug!("Skipping SSL certificate validation");
784 }
785 if !ssl_options.use_root_trust_store {
786 builder = builder.tls_built_in_root_certs(false);
787 debug!("Disabling root trust store for SSL");
788 }
789
790 let built_client = builder.build().unwrap();
791 let client = ClientBuilder::new(built_client)
792 .with(TracingMiddleware::default())
793 .with(OtelPropagatorMiddleware)
794 .build();
795
796 HALClient {
797 client,
798 url: url.to_string(),
799 path_info: None,
800 auth,
801 retries: 3,
802 ssl_options,
803 }
804 }
805}
806
807pub fn links_from_json(json: &Value) -> Vec<Link> {
808 match json.get("_links") {
809 Some(json) => match json {
810 Value::Object(v) => v
811 .iter()
812 .map(|(link, json)| match json {
813 Value::Object(attr) => Link::from_json(link, attr),
814 _ => Link {
815 name: link.clone(),
816 ..Link::default()
817 },
818 })
819 .collect(),
820 _ => vec![],
821 },
822 None => vec![],
823 }
824}
825
826pub async fn fetch_pacts_from_broker(
828 broker_url: &str,
829 provider_name: &str,
830 auth: Option<HttpAuth>,
831 ssl_options: SslOptions,
832) -> anyhow::Result<
833 Vec<
834 anyhow::Result<(
835 Box<dyn Pact + Send + Sync + RefUnwindSafe>,
836 Option<PactVerificationContext>,
837 Vec<Link>,
838 )>,
839 >,
840> {
841 trace!(
842 "fetch_pacts_from_broker(broker_url='{}', provider_name='{}', auth={})",
843 broker_url,
844 provider_name,
845 auth.clone().unwrap_or_default()
846 );
847
848 let mut hal_client = HALClient::with_url(broker_url, auth, ssl_options);
849 let template_values = hashmap! { "provider".to_string() => provider_name.to_string() };
850
851 hal_client = hal_client
852 .navigate("pb:latest-provider-pacts", &template_values)
853 .await
854 .map_err(move |err| match err {
855 PactBrokerError::NotFound(_) => PactBrokerError::NotFound(format!(
856 "No pacts for provider '{}' where found in the pact broker. URL: '{}'",
857 provider_name, broker_url
858 )),
859 _ => err,
860 })?;
861
862 let pact_links = hal_client.clone().iter_links("pacts")?;
863
864 let results: Vec<_> = futures::stream::iter(pact_links)
865 .map(|ref pact_link| {
866 match pact_link.href {
867 Some(_) => Ok((hal_client.clone(), pact_link.clone())),
868 None => Err(
869 PactBrokerError::LinkError(
870 format!(
871 "Expected a HAL+JSON response from the pact broker, but got a link with no HREF. URL: '{}', LINK: '{:?}'",
872 &hal_client.url,
873 pact_link
874 )
875 )
876 )
877 }
878 })
879 .and_then(|(hal_client, pact_link)| async {
880 let pact_json = hal_client.fetch_url(
881 &pact_link.clone(),
882 &template_values.clone()
883 ).await?;
884 Ok((pact_link, pact_json))
885 })
886 .map(|result| {
887 match result {
888 Ok((pact_link, pact_json)) => {
889 let href = pact_link.href.unwrap_or_default();
890 let links = links_from_json(&pact_json);
891 load_pact_from_json(href.as_str(), &pact_json)
892 .map(|pact| (pact, None, links))
893 },
894 Err(err) => Err(err.into())
895 }
896 })
897 .into_stream()
898 .collect()
899 .await;
900
901 Ok(results)
902}
903
904pub async fn fetch_pacts_dynamically_from_broker(
906 broker_url: &str,
907 provider_name: String,
908 pending: bool,
909 include_wip_pacts_since: Option<String>,
910 provider_tags: Vec<String>,
911 provider_branch: Option<String>,
912 consumer_version_selectors: Vec<ConsumerVersionSelector>,
913 auth: Option<HttpAuth>,
914 ssl_options: SslOptions,
915 headers: Option<HashMap<String, String>>,
916) -> anyhow::Result<
917 Vec<
918 Result<
919 (
920 Box<dyn Pact + Send + Sync + RefUnwindSafe>,
921 Option<PactVerificationContext>,
922 Vec<Link>,
923 ),
924 PactBrokerError,
925 >,
926 >,
927> {
928 trace!(
929 "fetch_pacts_dynamically_from_broker(broker_url='{}', provider_name='{}', pending={}, \
930 include_wip_pacts_since={:?}, provider_tags: {:?}, consumer_version_selectors: {:?}, auth={})",
931 broker_url,
932 provider_name,
933 pending,
934 include_wip_pacts_since,
935 provider_tags,
936 consumer_version_selectors,
937 auth.clone().unwrap_or_default()
938 );
939
940 let mut hal_client = HALClient::with_url(broker_url, auth, ssl_options);
941 let template_values = hashmap! { "provider".to_string() => provider_name.clone() };
942
943 hal_client = hal_client
944 .navigate("pb:provider-pacts-for-verification", &template_values)
945 .await
946 .map_err(move |err| match err {
947 PactBrokerError::NotFound(_) => PactBrokerError::NotFound(format!(
948 "No pacts for provider '{}' were found in the pact broker. URL: '{}'",
949 provider_name.clone(),
950 broker_url
951 )),
952 _ => err,
953 })?;
954
955 let pacts_for_verification = PactsForVerificationRequest {
957 provider_version_tags: provider_tags,
958 provider_version_branch: provider_branch,
959 include_wip_pacts_since,
960 consumer_version_selectors,
961 include_pending_status: pending,
962 };
963 let request_body = serde_json::to_string(&pacts_for_verification).unwrap();
964
965 let response = match hal_client.find_link("self") {
967 Ok(link) => {
968 let link = hal_client.clone().parse_link_url(&link, &hashmap! {})?;
969 match hal_client
970 .clone()
971 .post_json(link.as_str(), request_body.as_str(), headers)
972 .await
973 {
974 Ok(res) => Some(res),
975 Err(err) => {
976 info!("error response for pacts for verification: {} ", err);
977 return Err(anyhow!(err));
978 }
979 }
980 }
981 Err(e) => return Err(anyhow!(e)),
982 };
983
984 let pact_links = match response {
986 Some(v) => {
987 let pfv: PactsForVerificationResponse = serde_json::from_value(v)
988 .map_err(|err| {
989 trace!(
990 "Failed to deserialise PactsForVerificationResponse: {}",
991 err
992 );
993 err
994 })
995 .unwrap_or(PactsForVerificationResponse {
996 embedded: PactsForVerificationBody { pacts: vec![] },
997 });
998 trace!(?pfv, "got pacts for verification response");
999
1000 if pfv.embedded.pacts.len() == 0 {
1001 return Err(anyhow!(PactBrokerError::NotFound(format!(
1002 "No pacts were found for this provider"
1003 ))));
1004 };
1005
1006 let links: Result<Vec<(Link, PactVerificationContext)>, PactBrokerError> = pfv.embedded.pacts.iter().map(| p| {
1007 match p.links.get("self") {
1008 Some(l) => Ok((l.clone(), p.into())),
1009 None => Err(
1010 PactBrokerError::LinkError(
1011 format!(
1012 "Expected a HAL+JSON response from the pact broker, but got a link with no HREF. URL: '{}', PATH: '{:?}'",
1013 &hal_client.url,
1014 &p.links,
1015 )
1016 )
1017 )
1018 }
1019 }).collect();
1020
1021 links
1022 }
1023 None => Err(PactBrokerError::NotFound(format!(
1024 "No pacts were found for this provider"
1025 ))),
1026 }?;
1027
1028 let results: Vec<_> = futures::stream::iter(pact_links)
1029 .map(|(ref pact_link, ref context)| {
1030 match pact_link.href {
1031 Some(_) => Ok((hal_client.clone(), pact_link.clone(), context.clone())),
1032 None => Err(
1033 PactBrokerError::LinkError(
1034 format!(
1035 "Expected a HAL+JSON response from the pact broker, but got a link with no HREF. URL: '{}', LINK: '{:?}'",
1036 &hal_client.url,
1037 pact_link
1038 )
1039 )
1040 )
1041 }
1042 })
1043 .and_then(|(hal_client, pact_link, context)| async {
1044 let pact_json = hal_client.fetch_url(
1045 &pact_link.clone(),
1046 &template_values.clone()
1047 ).await?;
1048 Ok((pact_link, pact_json, context))
1049 })
1050 .map(|result| {
1051 match result {
1052 Ok((pact_link, pact_json, context)) => {
1053 let href = pact_link.href.unwrap_or_default();
1054 let links = links_from_json(&pact_json);
1055 load_pact_from_json(href.as_str(), &pact_json)
1056 .map(|pact| (pact, Some(context), links))
1057 .map_err(|err| PactBrokerError::ContentError(format!("{}", err)))
1058 },
1059 Err(err) => Err(err)
1060 }
1061 })
1062 .into_stream()
1063 .collect()
1064 .await;
1065
1066 Ok(results)
1067}
1068
1069pub async fn fetch_pact_from_url(
1073 url: &str,
1074 auth: &Option<HttpAuth>,
1075) -> anyhow::Result<(Box<dyn Pact + Send + Sync + RefUnwindSafe>, Vec<Link>)> {
1076 let url = url.to_string();
1077 let auth = auth.clone();
1078 let (url, pact_json) =
1079 tokio::task::spawn_blocking(move || http_utils::fetch_json_from_url(&url, &auth)).await??;
1080 let pact = load_pact_from_json(&url, &pact_json)?;
1081 let links = links_from_json(&pact_json);
1082 Ok((pact, links))
1083}
1084
1085async fn publish_provider_tags(
1086 hal_client: &HALClient,
1087 links: &[Link],
1088 provider_tags: Vec<String>,
1089 version: &str,
1090 headers: Option<HashMap<String, String>>,
1091) -> Result<(), PactBrokerError> {
1092 let hal_client = hal_client
1093 .clone()
1094 .with_doc_context(links)?
1095 .navigate("pb:provider", &hashmap! {})
1096 .await?;
1097 match hal_client.find_link("pb:version-tag") {
1098 Ok(link) => {
1099 for tag in &provider_tags {
1100 let template_values = hashmap! {
1101 "version".to_string() => version.to_string(),
1102 "tag".to_string() => tag.clone()
1103 };
1104 match hal_client
1105 .clone()
1106 .put_json(
1107 hal_client
1108 .clone()
1109 .parse_link_url(&link, &template_values)?
1110 .as_str(),
1111 "{}",
1112 headers.clone(),
1113 )
1114 .await
1115 {
1116 Ok(_) => debug!("Pushed tag {} for provider version {}", tag, version),
1117 Err(err) => {
1118 error!(
1119 "Failed to push tag {} for provider version {}",
1120 tag, version
1121 );
1122 return Err(err);
1123 }
1124 }
1125 }
1126 Ok(())
1127 }
1128 Err(_) => Err(PactBrokerError::LinkError(
1129 "Can't publish provider tags as there is no 'pb:version-tag' link".to_string(),
1130 )),
1131 }
1132}
1133
1134async fn publish_provider_branch(
1135 hal_client: &HALClient,
1136 links: &[Link],
1137 branch: &str,
1138 version: &str,
1139 headers: Option<HashMap<String, String>>,
1140) -> Result<(), PactBrokerError> {
1141 let hal_client = hal_client
1142 .clone()
1143 .with_doc_context(links)?
1144 .navigate("pb:provider", &hashmap! {})
1145 .await?;
1146
1147 match hal_client.find_link("pb:branch-version") {
1148 Ok(link) => {
1149 let template_values = hashmap! {
1150 "branch".to_string() => branch.to_string(),
1151 "version".to_string() => version.to_string(),
1152 };
1153 match hal_client.clone().put_json(hal_client.clone().parse_link_url(&link, &template_values)?.as_str(), "{}",headers).await {
1154 Ok(_) => debug!("Pushed branch {} for provider version {}", branch, version),
1155 Err(err) => {
1156 error!("Failed to push branch {} for provider version {}", branch, version);
1157 return Err(err);
1158 }
1159 }
1160 Ok(())
1161 },
1162 Err(_) => Err(PactBrokerError::LinkError("Can't publish provider branch as there is no 'pb:branch-version' link. Please ugrade to Pact Broker version 2.86.0 or later for branch support".to_string()))
1163 }
1164}
1165
1166#[skip_serializing_none]
1167#[derive(Serialize, Deserialize, Debug, Clone)]
1168#[serde(rename_all = "camelCase")]
1169pub struct ConsumerVersionSelector {
1171 pub consumer: Option<String>,
1173 pub tag: Option<String>,
1175 pub fallback_tag: Option<String>,
1177 pub latest: Option<bool>,
1179 pub deployed_or_released: Option<bool>,
1181 pub deployed: Option<bool>,
1183 pub released: Option<bool>,
1185 pub environment: Option<String>,
1187 pub main_branch: Option<bool>,
1189 pub branch: Option<String>,
1191 pub matching_branch: Option<bool>,
1193}
1194
1195#[derive(Serialize, Deserialize, Debug, Clone)]
1196#[serde(rename_all = "camelCase")]
1197struct PactsForVerificationResponse {
1198 #[serde(rename(deserialize = "_embedded"))]
1199 pub embedded: PactsForVerificationBody,
1200}
1201
1202#[derive(Serialize, Deserialize, Debug, Clone)]
1203#[serde(rename_all = "camelCase")]
1204struct PactsForVerificationBody {
1205 pub pacts: Vec<PactForVerification>,
1206}
1207
1208#[derive(Serialize, Deserialize, Debug, Clone)]
1209#[serde(rename_all = "camelCase")]
1210struct PactForVerification {
1211 pub short_description: String,
1212 #[serde(rename(deserialize = "_links"))]
1213 pub links: HashMap<String, Link>,
1214 pub verification_properties: Option<PactVerificationProperties>,
1215}
1216
1217#[skip_serializing_none]
1218#[derive(Serialize, Deserialize, Debug, Clone)]
1219#[serde(rename_all = "camelCase")]
1220pub struct PactsForVerificationRequest {
1222 #[serde(skip_serializing_if = "Vec::is_empty")]
1224 pub provider_version_tags: Vec<String>,
1225 pub include_pending_status: bool,
1227 pub include_wip_pacts_since: Option<String>,
1229 pub consumer_version_selectors: Vec<ConsumerVersionSelector>,
1231 pub provider_version_branch: Option<String>,
1233}
1234
1235#[skip_serializing_none]
1236#[derive(Serialize, Deserialize, Debug, Clone)]
1237#[serde(rename_all = "camelCase")]
1238pub struct PactVerificationContext {
1240 pub short_description: String,
1242 pub verification_properties: PactVerificationProperties,
1244}
1245
1246impl From<&PactForVerification> for PactVerificationContext {
1247 fn from(value: &PactForVerification) -> Self {
1248 PactVerificationContext {
1249 short_description: value.short_description.clone(),
1250 verification_properties: value.verification_properties.clone().unwrap_or_default(),
1251 }
1252 }
1253}
1254
1255#[skip_serializing_none]
1256#[derive(Serialize, Deserialize, Debug, Clone, Default)]
1257#[serde(rename_all = "camelCase")]
1258pub struct PactVerificationProperties {
1260 #[serde(default)]
1261 pub pending: bool,
1263 pub notices: Vec<HashMap<String, String>>,
1265}