1use std::collections::HashMap;
4use std::ops::Not;
5use std::panic::RefUnwindSafe;
6use std::str::from_utf8;
7
8use anyhow::anyhow;
9use futures::stream::*;
10
11use itertools::Itertools;
12use maplit::hashmap;
13
14use pact_models::http_utils;
15use pact_models::http_utils::HttpAuth;
16use pact_models::json_utils::json_to_string;
17use pact_models::pact::{Pact, load_pact_from_json};
18use regex::{Captures, Regex};
19use reqwest::{Method, Url};
20use serde::{Deserialize, Serialize};
21use serde_json::{Value, json};
22use serde_with::skip_serializing_none;
23use tracing::{debug, error, info, trace, warn};
24pub mod branches;
25pub mod can_i_deploy;
26pub mod deployments;
27pub mod environments;
28pub mod pact_publish;
29pub mod pacticipants;
30pub mod pacts;
31pub mod subcommands;
32pub mod tags;
33#[cfg(test)]
34pub mod test_utils;
35pub mod types;
36pub mod utils;
37pub mod verification;
38pub mod versions;
39pub mod webhooks;
40use utils::with_retries;
41use http::Extensions;
43use opentelemetry::Context;
44use opentelemetry::global;
45use opentelemetry_http::HeaderInjector;
46use reqwest::Request;
47use reqwest::Response;
48use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
49use reqwest_middleware::{Middleware, Next};
50use reqwest_tracing::TracingMiddleware;
51use crate::cli::utils::{CYAN, GREEN, RED, YELLOW};
52
53use crate::cli::pact_broker::main::types::SslOptions;
54
55pub fn process_notices(notices: &[Notice]) {
56 for notice in notices {
57 let notice_text = notice.text.to_string();
58 let formatted_text = notice_text
59 .split_whitespace()
60 .map(|word| {
61 if word.starts_with("https") || word.starts_with("http") {
62 format!("{}", CYAN.apply_to(word))
63 } else {
64 match notice.type_field.as_str() {
65 "success" => format!("{}", GREEN.apply_to(word)),
66 "warning" | "prompt" => format!("{}", YELLOW.apply_to(word)),
67 "error" | "danger" => format!("{}", RED.apply_to(word)),
68 _ => word.to_string(),
69 }
70 }
71 })
72 .collect::<Vec<String>>()
73 .join(" ");
74 println!("{}", formatted_text);
75 }
76}
77
/// A notice: a text message together with a type label.
///
/// (De)serialised with camelCase keys; `type_field` maps to the JSON key
/// `type` (which is a reserved word in Rust, hence the different field name).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct Notice {
    /// The notice message.
    pub text: String,
    /// The notice type, e.g. `success`, `warning`, `prompt`, `error` or
    /// `danger` (the values `process_notices` styles specially).
    #[serde(rename = "type")]
    pub type_field: String,
}
85
86fn is_true(object: &serde_json::Map<String, Value>, field: &str) -> bool {
87 match object.get(field) {
88 Some(json) => match *json {
89 serde_json::Value::Bool(b) => b,
90 _ => false,
91 },
92 None => false,
93 }
94}
95
96fn as_string(json: &Value) -> String {
97 match *json {
98 serde_json::Value::String(ref s) => s.clone(),
99 _ => format!("{}", json),
100 }
101}
102
103fn content_type(response: &reqwest::Response) -> String {
104 match response.headers().get("content-type") {
105 Some(value) => value.to_str().unwrap_or("text/plain").into(),
106 None => "text/plain".to_string(),
107 }
108}
109
110fn json_content_type(response: &reqwest::Response) -> bool {
111 match content_type(response).parse::<mime::Mime>() {
112 Ok(mime) => {
113 match (
114 mime.type_().as_str(),
115 mime.subtype().as_str(),
116 mime.suffix(),
117 ) {
118 ("application", "json", None) => true,
119 ("application", "hal", Some(mime::JSON)) => true,
120 _ => false,
121 }
122 }
123 Err(_) => false,
124 }
125}
126
127fn find_entry(map: &serde_json::Map<String, Value>, key: &str) -> Option<(String, Value)> {
128 match map.keys().find(|k| k.to_lowercase() == key.to_lowercase()) {
129 Some(k) => map.get(k).map(|v| (key.to_string(), v.clone())),
130 None => None,
131 }
132}
133
/// Errors that can occur when talking to the pact broker.
#[derive(Debug, Clone, thiserror::Error)]
pub enum PactBrokerError {
    /// Error with a HAL link (missing, malformed, or without an href).
    #[error("Error with a HAL link - {0}")]
    LinkError(String),
    /// Error with the content of a HAL resource (e.g. the body was not valid JSON).
    #[error("Error with the content of a HAL resource - {0}")]
    ContentError(String),
    /// IO error while performing or reading a request.
    #[error("IO Error - {0}")]
    IoError(String),
    /// The link or resource was not found.
    #[error("Link/Resource was not found - {0}")]
    NotFound(String),
    /// A URL could not be parsed.
    #[error("Invalid URL - {0}")]
    UrlError(String),
    /// Validation failed; carries the error messages.
    #[error("failed validation - {0:?}")]
    ValidationError(Vec<String>),
    /// Validation failed; carries the error messages and the accompanying
    /// notices (the notices are not part of the display message).
    #[error("failed validation - {0:?}")]
    ValidationErrorWithNotices(Vec<String>, Vec<Notice>),
}
159
160impl PartialEq<String> for PactBrokerError {
161 fn eq(&self, other: &String) -> bool {
162 let mut buffer = String::new();
163 match self {
164 PactBrokerError::LinkError(s) => buffer.push_str(s),
165 PactBrokerError::ContentError(s) => buffer.push_str(s),
166 PactBrokerError::IoError(s) => buffer.push_str(s),
167 PactBrokerError::NotFound(s) => buffer.push_str(s),
168 PactBrokerError::UrlError(s) => buffer.push_str(s),
169 PactBrokerError::ValidationError(errors) => {
170 buffer.push_str(errors.iter().join(", ").as_str())
171 },
172 PactBrokerError::ValidationErrorWithNotices(errors, _) => {
173 buffer.push_str(errors.iter().join(", ").as_str())
174 }
175 };
176 buffer == *other
177 }
178}
179
180impl<'a> PartialEq<&'a str> for PactBrokerError {
181 fn eq(&self, other: &&str) -> bool {
182 let message = match self {
183 PactBrokerError::LinkError(s) => s.clone(),
184 PactBrokerError::ContentError(s) => s.clone(),
185 PactBrokerError::IoError(s) => s.clone(),
186 PactBrokerError::NotFound(s) => s.clone(),
187 PactBrokerError::UrlError(s) => s.clone(),
188 PactBrokerError::ValidationError(errors) => errors.iter().join(", "),
189 PactBrokerError::ValidationErrorWithNotices(errors, _) => errors.iter().join(", "),
190 };
191 message.as_str() == *other
192 }
193}
194
195impl From<url::ParseError> for PactBrokerError {
196 fn from(err: url::ParseError) -> Self {
197 PactBrokerError::UrlError(format!("{}", err))
198 }
199}
200
/// A HAL link.
///
/// Because of `#[serde(default)]`, fields missing during deserialisation are
/// taken from `Link::default()`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct Link {
    /// Name of the link (its key in the `_links` object).
    pub name: String,
    /// Target of the link; `None` when the link has no href.
    pub href: Option<String>,
    /// Whether `href` is a template containing `{placeholder}` segments.
    pub templated: bool,
    /// Optional title of the link.
    pub title: Option<String>,
}
214
215impl Link {
216 pub fn from_json(link: &str, link_data: &serde_json::Map<String, serde_json::Value>) -> Link {
218 Link {
219 name: link.to_string(),
220 href: find_entry(link_data, &"href".to_string()).map(|(_, href)| as_string(&href)),
221 templated: is_true(link_data, "templated"),
222 title: link_data.get("title").map(|title| as_string(title)),
223 }
224 }
225
226 pub fn as_json(&self) -> serde_json::Value {
228 match (self.href.clone(), self.title.clone()) {
229 (Some(href), Some(title)) => json!({
230 "href": href,
231 "title": title,
232 "templated": self.templated
233 }),
234 (Some(href), None) => json!({
235 "href": href,
236 "templated": self.templated
237 }),
238 (None, Some(title)) => json!({
239 "title": title,
240 "templated": self.templated
241 }),
242 (None, None) => json!({
243 "templated": self.templated
244 }),
245 }
246 }
247}
248
249impl Default for Link {
250 fn default() -> Self {
251 Link {
252 name: "link".to_string(),
253 href: None,
254 templated: false,
255 title: None,
256 }
257 }
258}
259
/// HAL client used to navigate and modify resources on a pact broker.
#[derive(Clone)]
pub struct HALClient {
    /// HTTP client (with middleware) used for all requests.
    client: ClientWithMiddleware,
    /// Base URL of the pact broker.
    url: String,
    /// JSON of the most recently fetched resource; `None` until something has
    /// been fetched. Links are resolved against its `_links` entry.
    path_info: Option<Value>,
    /// Optional credentials applied to every request.
    auth: Option<HttpAuth>,
    /// SSL options the client was configured with.
    ssl_options: SslOptions,
    /// Retry count handed to `with_retries` for each request.
    retries: u8,
}
270
271struct OtelPropagatorMiddleware;
272
273#[async_trait::async_trait]
274impl Middleware for OtelPropagatorMiddleware {
275 async fn handle(
276 &self,
277 mut req: Request,
278 extensions: &mut Extensions,
279 next: Next<'_>,
280 ) -> reqwest_middleware::Result<Response> {
281 let cx = Context::current();
282 let mut headers = reqwest::header::HeaderMap::new();
283 global::get_text_map_propagator(|propagator| {
284 propagator.inject_context(&cx, &mut HeaderInjector(&mut headers))
285 });
286 headers.append(
287 "baggage",
288 reqwest::header::HeaderValue::from_static("is_synthetic=true"),
289 );
290
291 for (key, value) in headers.iter() {
292 req.headers_mut().append(key, value.clone());
293 }
294 let res = next.run(req, extensions).await;
295 res
296 }
297}
298
/// Extension trait for running a closure in the context of the current
/// tracing span.
pub trait WithCurrentSpan {
    /// Runs `f` and returns its result.
    fn with_current_span<F, R>(&self, f: F) -> R
    where
        F: FnOnce() -> R;
}
304
305impl<T> WithCurrentSpan for T {
306 fn with_current_span<F, R>(&self, f: F) -> R
307 where
308 F: FnOnce() -> R,
309 {
310 let span = tracing::Span::current();
311 let _enter = span.enter();
312 f()
313 }
314}
315
316impl HALClient {
317 pub fn with_url(url: &str, auth: Option<HttpAuth>, ssl_options: SslOptions) -> HALClient {
319 HALClient {
320 url: url.to_string(),
321 auth: auth.clone(),
322 ssl_options: ssl_options.clone(),
323 ..HALClient::setup(url, auth, ssl_options)
324 }
325 }
326
327 fn update_path_info(self, path_info: serde_json::Value) -> HALClient {
328 HALClient {
329 client: self.client.clone(),
330 url: self.url.clone(),
331 path_info: Some(path_info),
332 auth: self.auth,
333 retries: self.retries,
334 ssl_options: self.ssl_options,
335 }
336 }
337
338 pub async fn navigate(
340 self,
341 link: &'static str,
342 template_values: &HashMap<String, String>,
343 ) -> Result<HALClient, PactBrokerError> {
344 trace!(
345 "navigate(link='{}', template_values={:?})",
346 link, template_values
347 );
348
349 let client = if self.path_info.is_none() {
350 let path_info = self.clone().fetch("/".into()).await?;
351 self.update_path_info(path_info)
352 } else {
353 self
354 };
355
356 let path_info = client.clone().fetch_link(link, template_values).await?;
357 let client = client.update_path_info(path_info);
358
359 Ok(client)
360 }
361
362 fn find_link(&self, link: &'static str) -> Result<Link, PactBrokerError> {
363 match self.path_info {
364 None => Err(PactBrokerError::LinkError(format!("No previous resource has been fetched from the pact broker. URL: '{}', LINK: '{}'",
365 self.url, link))),
366 Some(ref json) => match json.get("_links") {
367 Some(json) => match json.get(link) {
368 Some(link_data) => link_data.as_object()
369 .map(|link_data| Link::from_json(&link.to_string(), &link_data))
370 .ok_or_else(|| PactBrokerError::LinkError(format!("Link is malformed, expected an object but got {}. URL: '{}', LINK: '{}'",
371 link_data, self.url, link))),
372 None => Err(PactBrokerError::LinkError(format!("Link '{}' was not found in the response, only the following links where found: {:?}. URL: '{}', LINK: '{}'",
373 link, json.as_object().unwrap_or(&json!({}).as_object().unwrap()).keys().join(", "), self.url, link)))
374 },
375 None => Err(PactBrokerError::LinkError(format!("Expected a HAL+JSON response from the pact broker, but got a response with no '_links'. URL: '{}', LINK: '{}'",
376 self.url, link)))
377 }
378 }
379 }
380
381 async fn fetch_link(
382 self,
383 link: &'static str,
384 template_values: &HashMap<String, String>,
385 ) -> Result<Value, PactBrokerError> {
386 trace!(
387 "fetch_link(link='{}', template_values={:?})",
388 link, template_values
389 );
390
391 let link_data = self.find_link(link)?;
392
393 self.fetch_url(&link_data, template_values).await
394 }
395
396 pub async fn fetch_url(
398 self,
399 link: &Link,
400 template_values: &HashMap<String, String>,
401 ) -> Result<Value, PactBrokerError> {
402 debug!(
403 "fetch_url(link={:?}, template_values={:?})",
404 link, template_values
405 );
406
407 let link_url = if link.templated {
408 debug!("Link URL is templated");
409 self.clone().parse_link_url(&link, &template_values)
410 } else {
411 link.href.clone().ok_or_else(|| {
412 PactBrokerError::LinkError(format!(
413 "Link is malformed, there is no href. URL: '{}', LINK: '{}'",
414 self.url, link.name
415 ))
416 })
417 }?;
418
419 let base_url = self.url.parse::<Url>()?;
420 let joined_url = base_url.join(&link_url)?;
421 self.fetch(joined_url.path().into()).await
422 }
423 pub async fn delete_url(
424 self,
425 link: &Link,
426 template_values: &HashMap<String, String>,
427 ) -> Result<Value, PactBrokerError> {
428 debug!(
429 "fetch_url(link={:?}, template_values={:?})",
430 link, template_values
431 );
432
433 let link_url = if link.templated {
434 debug!("Link URL is templated");
435 self.clone().parse_link_url(&link, &template_values)
436 } else {
437 link.href.clone().ok_or_else(|| {
438 PactBrokerError::LinkError(format!(
439 "Link is malformed, there is no href. URL: '{}', LINK: '{}'",
440 self.url, link.name
441 ))
442 })
443 }?;
444
445 let base_url = self.url.parse::<Url>()?;
446 debug!("base_url: {}", base_url);
447 debug!("link_url: {}", link_url);
448 let joined_url = base_url.join(&link_url)?;
449 debug!("joined_url: {}", joined_url);
450 self.delete(joined_url.path().into()).await
451 }
452
453 pub async fn fetch(self, path: &str) -> Result<Value, PactBrokerError> {
454 info!("Fetching path '{}' from pact broker", path);
455
456 let broker_url = self.url.parse::<Url>()?;
457 let context_path = broker_url.path();
458 let url = if context_path.is_empty().not()
459 && context_path != "/"
460 && path.starts_with(context_path)
461 {
462 let mut base_url = broker_url.clone();
463 base_url.set_path("/");
464 base_url.join(path)?
465 } else {
466 broker_url.join(path)?
467 };
468
469 let request_builder = match self.auth {
470 Some(ref auth) => match auth {
471 HttpAuth::User(username, password) => {
472 self.client.get(url).basic_auth(username, password.clone())
473 }
474 HttpAuth::Token(token) => self.client.get(url).bearer_auth(token),
475 _ => self.client.get(url),
476 },
477 None => self.client.get(url),
478 }
479 .header("accept", "application/hal+json, application/json");
480
481 let response = utils::with_retries(self.retries, request_builder)
482 .await
483 .map_err(|err| {
484 PactBrokerError::IoError(format!(
485 "Failed to access pact broker path '{}' - {}. URL: '{}'",
486 &path, err, &self.url,
487 ))
488 })?;
489
490 self.parse_broker_response(path.to_string(), response).await
491 }
492
493 pub async fn delete(self, path: &str) -> Result<Value, PactBrokerError> {
494 info!("Deleting path '{}' from pact broker", path);
495
496 let broker_url = self.url.parse::<Url>()?;
497 let context_path = broker_url.path();
498 let url = if context_path.is_empty().not()
499 && context_path != "/"
500 && path.starts_with(context_path)
501 {
502 let mut base_url = broker_url.clone();
503 base_url.set_path("/");
504 base_url.join(path)?
505 } else {
506 broker_url.join(path)?
507 };
508
509 let request_builder = match self.auth {
510 Some(ref auth) => match auth {
511 HttpAuth::User(username, password) => self
512 .client
513 .delete(url)
514 .basic_auth(username, password.clone()),
515 HttpAuth::Token(token) => self.client.delete(url).bearer_auth(token),
516 _ => self.client.delete(url),
517 },
518 None => self.client.delete(url),
519 };
520
521 let response = utils::with_retries(self.retries, request_builder)
522 .await
523 .map_err(|err| {
524 PactBrokerError::IoError(format!(
525 "Failed to delete pact broker path '{}' - {}. URL: '{}'",
526 &path, err, &self.url,
527 ))
528 })?;
529
530 self.parse_broker_response(path.to_string(), response).await
531 }
532
533 async fn parse_broker_response(
534 &self,
535 path: String,
536 response: reqwest::Response,
537 ) -> Result<Value, PactBrokerError> {
538 let is_json_content_type = json_content_type(&response);
539 let content_type = content_type(&response);
540 let status_code = response.status();
541
542 if status_code.is_success() {
543 if is_json_content_type {
544 response.json::<Value>()
545 .await
546 .map_err(|err| PactBrokerError::ContentError(
547 format!("Did not get a valid HAL response body from pact broker path '{}' - {}. URL: '{}'",
548 path, err, self.url)
549 ))
550 } else if status_code.as_u16() == 204 {
551 Ok(json!({}))
552 } else {
553 debug!("Request from broker was a success, but the response body was not JSON");
554 Err(PactBrokerError::ContentError(format!(
555 "Did not get a valid HAL response body from pact broker path '{}', content type is '{}'. URL: '{}'",
556 path, content_type, self.url
557 )))
558 }
559 } else if status_code.as_u16() == 404 {
560 Err(PactBrokerError::NotFound(format!(
561 "Request to pact broker path '{}' failed: {}. URL: '{}'",
562 path, status_code, self.url
563 )))
564 } else {
565 let body = response.bytes().await.map_err(|_| {
567 PactBrokerError::IoError(format!(
568 "Failed to download response body for path '{}'. URL: '{}'",
569 &path, self.url
570 ))
571 })?;
572
573 if is_json_content_type {
574 match serde_json::from_slice::<Value>(&body) {
575 Ok(json_body) => {
576 if json_body.get("errors").is_some() || json_body.get("notices").is_some() {
577 Err(handle_validation_errors(json_body))
578 } else {
579 Err(PactBrokerError::IoError(format!(
580 "Request to pact broker path '{}' failed: {}. Response: {}. URL: '{}'",
581 path, status_code, json_body, self.url
582 )))
583 }
584 }
585 Err(_) => {
586 let body_text = from_utf8(&body)
587 .map(|b| b.to_string())
588 .unwrap_or_else(|err| format!("could not read body: {}", err));
589 error!("Request to pact broker path '{}' failed: {}", path, body_text);
590 Err(PactBrokerError::IoError(format!(
591 "Request to pact broker path '{}' failed: {}. URL: '{}'",
592 path, status_code, self.url
593 )))
594 }
595 }
596 } else {
597 let body_text = from_utf8(&body)
598 .map(|b| b.to_string())
599 .unwrap_or_else(|err| format!("could not read body: {}", err));
600 error!("Request to pact broker path '{}' failed: {}", path, body_text);
601 Err(PactBrokerError::IoError(format!(
602 "Request to pact broker path '{}' failed: {}. URL: '{}'",
603 path, status_code, self.url
604 )))
605 }
606 }
607 }
608
609 fn parse_link_url(
610 &self,
611 link: &Link,
612 values: &HashMap<String, String>,
613 ) -> Result<String, PactBrokerError> {
614 match link.href {
615 Some(ref href) => {
616 debug!("templated URL = {}", href);
617 let re = Regex::new(r"\{(\w+)}").unwrap();
618 let final_url = re.replace_all(href, |caps: &Captures| {
619 let lookup = caps.get(1).unwrap().as_str();
620 trace!("Looking up value for key '{}'", lookup);
621 match values.get(lookup) {
622 Some(val) => urlencoding::encode(val.as_str()).to_string(),
623 None => {
624 warn!(
625 "No value was found for key '{}', mapped values are {:?}",
626 lookup, values
627 );
628 format!("{{{}}}", lookup)
629 }
630 }
631 });
632 debug!("final URL = {}", final_url);
633 Ok(final_url.to_string())
634 }
635 None => Err(PactBrokerError::LinkError(format!(
636 "Expected a HAL+JSON response from the pact broker, but got a link with no HREF. URL: '{}', LINK: '{}'",
637 self.url, link.name
638 ))),
639 }
640 }
641
642 pub fn iter_links(&self, link: &str) -> Result<Vec<Link>, PactBrokerError> {
644 match self.path_info {
645 None => Err(PactBrokerError::LinkError(format!("No previous resource has been fetched from the pact broker. URL: '{}', LINK: '{}'",
646 self.url, link))),
647 Some(ref json) => match json.get("_links") {
648 Some(json) => match json.get(&link) {
649 Some(link_data) => link_data.as_array()
650 .map(|link_data| link_data.iter().map(|link_json| match link_json {
651 Value::Object(data) => Link::from_json(&link, data),
652 Value::String(s) => Link { name: link.to_string(), href: Some(s.clone()), templated: false, title: None },
653 _ => Link { name: link.to_string(), href: Some(link_json.to_string()), templated: false, title: None }
654 }).collect())
655 .ok_or_else(|| PactBrokerError::LinkError(format!("Link is malformed, expected an object but got {}. URL: '{}', LINK: '{}'",
656 link_data, self.url, link))),
657 None => Err(PactBrokerError::LinkError(format!("Link '{}' was not found in the response, only the following links where found: {:?}. URL: '{}', LINK: '{}'",
658 link, json.as_object().unwrap_or(&json!({}).as_object().unwrap()).keys().join(", "), self.url, link)))
659 },
660 None => Err(PactBrokerError::LinkError(format!("Expected a HAL+JSON response from the pact broker, but got a response with no '_links'. URL: '{}', LINK: '{}'",
661 self.url, link)))
662 }
663 }
664 }
665
666 pub async fn post_json(
667 &self,
668 url: &str,
669 body: &str,
670 headers: Option<HashMap<String, String>>,
671 ) -> Result<serde_json::Value, PactBrokerError> {
672 trace!("post_json(url='{}', body='{}')", url, body);
673
674 self.send_document(url, body, Method::POST, headers).await
675 }
676
677 pub async fn put_json(
678 &self,
679 url: &str,
680 body: &str,
681 headers: Option<HashMap<String, String>>,
682 ) -> Result<serde_json::Value, PactBrokerError> {
683 trace!("put_json(url='{}', body='{}')", url, body);
684
685 self.send_document(url, body, Method::PUT, headers).await
686 }
687 pub async fn patch_json(
688 &self,
689 url: &str,
690 body: &str,
691 headers: Option<HashMap<String, String>>,
692 ) -> Result<serde_json::Value, PactBrokerError> {
693 trace!("put_json(url='{}', body='{}')", url, body);
694
695 self.send_document(url, body, Method::PATCH, headers).await
696 }
697
698 async fn send_document(
699 &self,
700 url: &str,
701 body: &str,
702 method: Method,
703 headers: Option<HashMap<String, String>>,
704 ) -> Result<Value, PactBrokerError> {
705 let method_type = method.clone();
706 debug!("Sending JSON to {} using {}: {}", url, method, body);
707
708 let base_url = &self.url.parse::<Url>()?;
709 let url = if url.starts_with("/") {
710 base_url.join(url)?
711 } else {
712 let url = url.parse::<Url>()?;
713 base_url.join(&url.path())?
714 };
715
716 let request_builder = match self.auth {
717 Some(ref auth) => match auth {
718 HttpAuth::User(username, password) => self
719 .client
720 .request(method, url.clone())
721 .basic_auth(username, password.clone()),
722 HttpAuth::Token(token) => {
723 self.client.request(method, url.clone()).bearer_auth(token)
724 }
725 _ => self.client.request(method, url.clone()),
726 },
727 None => self.client.request(method, url.clone()),
728 }
729 .header("Accept", "application/hal+json")
730 .body(body.to_string());
731
732 let request_builder = if let Some(ref headers) = headers {
735 headers
736 .iter()
737 .fold(request_builder, |builder, (key, value)| {
738 builder.header(key.as_str(), value.as_str())
739 })
740 } else {
741 request_builder
742 };
743
744 let request_builder = if method_type == Method::PATCH {
745 request_builder.header("Content-Type", "application/merge-patch+json")
746 } else {
747 request_builder.header("Content-Type", "application/json")
748 };
749 let response = with_retries(self.retries, request_builder).await;
750 match response {
751 Ok(res) => {
752 self.parse_broker_response(url.path().to_string(), res)
753 .await
754 }
755 Err(err) => Err(PactBrokerError::IoError(format!(
756 "Failed to send JSON to the pact broker URL '{}' - IoError {}",
757 url, err
758 ))),
759 }
760 }
761
762 fn with_doc_context(self, doc_attributes: &[Link]) -> Result<HALClient, PactBrokerError> {
763 let links: serde_json::Map<String, serde_json::Value> = doc_attributes
764 .iter()
765 .map(|link| (link.name.clone(), link.as_json()))
766 .collect();
767 let links_json = json!({
768 "_links": json!(links)
769 });
770 Ok(self.update_path_info(links_json))
771 }
772}
773
774fn handle_validation_errors(body: Value) -> PactBrokerError {
775 match &body {
776 Value::Object(attrs) => {
777 let notices: Vec<Notice> = attrs.get("notices")
779 .and_then(|n| n.as_array())
780 .map(|notices_array| {
781 notices_array.iter()
782 .filter_map(|notice| serde_json::from_value::<Notice>(notice.clone()).ok())
783 .collect()
784 })
785 .unwrap_or_default();
786
787 if let Some(errors) = attrs.get("errors") {
788 let error_messages = match errors {
789 Value::Array(values) => values.iter().map(|v| json_to_string(v)).collect(),
790 Value::Object(errors) => errors
791 .iter()
792 .map(|(field, errors)| match errors {
793 Value::String(error) => format!("{}: {}", field, error),
794 Value::Array(errors) => format!(
795 "{}: {}",
796 field,
797 errors.iter().map(|err| json_to_string(err)).join(", ")
798 ),
799 _ => format!("{}: {}", field, errors),
800 })
801 .collect(),
802 Value::String(s) => vec![s.clone()],
803 _ => vec![errors.to_string()],
804 };
805
806 if !notices.is_empty() {
807 PactBrokerError::ValidationErrorWithNotices(error_messages, notices)
808 } else {
809 PactBrokerError::ValidationError(error_messages)
810 }
811 } else if !notices.is_empty() {
812 let notice_messages = notices.iter().map(|n| n.text.clone()).collect();
814 PactBrokerError::ValidationErrorWithNotices(notice_messages, notices)
815 } else {
816 PactBrokerError::ValidationError(vec![body.to_string()])
817 }
818 }
819 Value::String(s) => PactBrokerError::ValidationError(vec![s.clone()]),
820 _ => PactBrokerError::ValidationError(vec![body.to_string()]),
821 }
822}
823
824impl HALClient {
825 pub fn setup(url: &str, auth: Option<HttpAuth>, ssl_options: SslOptions) -> HALClient {
826 let mut builder = reqwest::Client::builder().user_agent(format!(
827 "{}/{}",
828 env!("CARGO_PKG_NAME"),
829 env!("CARGO_PKG_VERSION")
830 ));
831
832 debug!("Using ssl_options: {:?}", ssl_options);
833 if let Some(ref path) = ssl_options.ssl_cert_path {
834 if let Ok(cert_bytes) = std::fs::read(path) {
835 if let Ok(cert) = reqwest::Certificate::from_pem_bundle(&cert_bytes) {
836 debug!("Adding SSL certificate from path: {}", path);
837 for c in cert {
838 builder = builder.add_root_certificate(c.clone());
839 }
840 }
841 } else {
842 debug!(
843 "Could not read SSL certificate from provided path: {}",
844 path
845 );
846 }
847 }
848 if ssl_options.skip_ssl {
849 builder = builder.danger_accept_invalid_certs(true);
850 debug!("Skipping SSL certificate validation");
851 }
852 if !ssl_options.use_root_trust_store {
853 builder = builder.tls_built_in_root_certs(false);
854 debug!("Disabling root trust store for SSL");
855 }
856
857 let built_client = builder.build().unwrap();
858 let client = ClientBuilder::new(built_client)
859 .with(TracingMiddleware::default())
860 .with(OtelPropagatorMiddleware)
861 .build();
862
863 HALClient {
864 client,
865 url: url.to_string(),
866 path_info: None,
867 auth,
868 retries: 3,
869 ssl_options,
870 }
871 }
872}
873
874pub fn links_from_json(json: &Value) -> Vec<Link> {
875 match json.get("_links") {
876 Some(json) => match json {
877 Value::Object(v) => v
878 .iter()
879 .map(|(link, json)| match json {
880 Value::Object(attr) => Link::from_json(link, attr),
881 _ => Link {
882 name: link.clone(),
883 ..Link::default()
884 },
885 })
886 .collect(),
887 _ => vec![],
888 },
889 None => vec![],
890 }
891}
892
893pub async fn fetch_pacts_from_broker(
895 broker_url: &str,
896 provider_name: &str,
897 auth: Option<HttpAuth>,
898 ssl_options: SslOptions,
899) -> anyhow::Result<
900 Vec<
901 anyhow::Result<(
902 Box<dyn Pact + Send + Sync + RefUnwindSafe>,
903 Option<PactVerificationContext>,
904 Vec<Link>,
905 )>,
906 >,
907> {
908 trace!(
909 "fetch_pacts_from_broker(broker_url='{}', provider_name='{}', auth={})",
910 broker_url,
911 provider_name,
912 auth.clone().unwrap_or_default()
913 );
914
915 let mut hal_client = HALClient::with_url(broker_url, auth, ssl_options);
916 let template_values = hashmap! { "provider".to_string() => provider_name.to_string() };
917
918 hal_client = hal_client
919 .navigate("pb:latest-provider-pacts", &template_values)
920 .await
921 .map_err(move |err| match err {
922 PactBrokerError::NotFound(_) => PactBrokerError::NotFound(format!(
923 "No pacts for provider '{}' where found in the pact broker. URL: '{}'",
924 provider_name, broker_url
925 )),
926 _ => err,
927 })?;
928
929 let pact_links = hal_client.clone().iter_links("pacts")?;
930
931 let results: Vec<_> = futures::stream::iter(pact_links)
932 .map(|ref pact_link| {
933 match pact_link.href {
934 Some(_) => Ok((hal_client.clone(), pact_link.clone())),
935 None => Err(
936 PactBrokerError::LinkError(
937 format!(
938 "Expected a HAL+JSON response from the pact broker, but got a link with no HREF. URL: '{}', LINK: '{:?}'",
939 &hal_client.url,
940 pact_link
941 )
942 )
943 )
944 }
945 })
946 .and_then(|(hal_client, pact_link)| async {
947 let pact_json = hal_client.fetch_url(
948 &pact_link.clone(),
949 &template_values.clone()
950 ).await?;
951 Ok((pact_link, pact_json))
952 })
953 .map(|result| {
954 match result {
955 Ok((pact_link, pact_json)) => {
956 let href = pact_link.href.unwrap_or_default();
957 let links = links_from_json(&pact_json);
958 load_pact_from_json(href.as_str(), &pact_json)
959 .map(|pact| (pact, None, links))
960 },
961 Err(err) => Err(err.into())
962 }
963 })
964 .into_stream()
965 .collect()
966 .await;
967
968 Ok(results)
969}
970
971pub async fn fetch_pacts_dynamically_from_broker(
973 broker_url: &str,
974 provider_name: String,
975 pending: bool,
976 include_wip_pacts_since: Option<String>,
977 provider_tags: Vec<String>,
978 provider_branch: Option<String>,
979 consumer_version_selectors: Vec<ConsumerVersionSelector>,
980 auth: Option<HttpAuth>,
981 ssl_options: SslOptions,
982 headers: Option<HashMap<String, String>>,
983) -> anyhow::Result<
984 Vec<
985 Result<
986 (
987 Box<dyn Pact + Send + Sync + RefUnwindSafe>,
988 Option<PactVerificationContext>,
989 Vec<Link>,
990 ),
991 PactBrokerError,
992 >,
993 >,
994> {
995 trace!(
996 "fetch_pacts_dynamically_from_broker(broker_url='{}', provider_name='{}', pending={}, \
997 include_wip_pacts_since={:?}, provider_tags: {:?}, consumer_version_selectors: {:?}, auth={})",
998 broker_url,
999 provider_name,
1000 pending,
1001 include_wip_pacts_since,
1002 provider_tags,
1003 consumer_version_selectors,
1004 auth.clone().unwrap_or_default()
1005 );
1006
1007 let mut hal_client = HALClient::with_url(broker_url, auth, ssl_options);
1008 let template_values = hashmap! { "provider".to_string() => provider_name.clone() };
1009
1010 hal_client = hal_client
1011 .navigate("pb:provider-pacts-for-verification", &template_values)
1012 .await
1013 .map_err(move |err| match err {
1014 PactBrokerError::NotFound(_) => PactBrokerError::NotFound(format!(
1015 "No pacts for provider '{}' were found in the pact broker. URL: '{}'",
1016 provider_name.clone(),
1017 broker_url
1018 )),
1019 _ => err,
1020 })?;
1021
1022 let pacts_for_verification = PactsForVerificationRequest {
1024 provider_version_tags: provider_tags,
1025 provider_version_branch: provider_branch,
1026 include_wip_pacts_since,
1027 consumer_version_selectors,
1028 include_pending_status: pending,
1029 };
1030 let request_body = serde_json::to_string(&pacts_for_verification).unwrap();
1031
1032 let response = match hal_client.find_link("self") {
1034 Ok(link) => {
1035 let link = hal_client.clone().parse_link_url(&link, &hashmap! {})?;
1036 match hal_client
1037 .clone()
1038 .post_json(link.as_str(), request_body.as_str(), headers)
1039 .await
1040 {
1041 Ok(res) => Some(res),
1042 Err(err) => {
1043 info!("error response for pacts for verification: {} ", err);
1044 return Err(anyhow!(err));
1045 }
1046 }
1047 }
1048 Err(e) => return Err(anyhow!(e)),
1049 };
1050
1051 let pact_links = match response {
1053 Some(v) => {
1054 let pfv: PactsForVerificationResponse = serde_json::from_value(v)
1055 .map_err(|err| {
1056 trace!(
1057 "Failed to deserialise PactsForVerificationResponse: {}",
1058 err
1059 );
1060 err
1061 })
1062 .unwrap_or(PactsForVerificationResponse {
1063 embedded: PactsForVerificationBody { pacts: vec![] },
1064 });
1065 trace!(?pfv, "got pacts for verification response");
1066
1067 if pfv.embedded.pacts.len() == 0 {
1068 return Err(anyhow!(PactBrokerError::NotFound(format!(
1069 "No pacts were found for this provider"
1070 ))));
1071 };
1072
1073 let links: Result<Vec<(Link, PactVerificationContext)>, PactBrokerError> = pfv.embedded.pacts.iter().map(| p| {
1074 match p.links.get("self") {
1075 Some(l) => Ok((l.clone(), p.into())),
1076 None => Err(
1077 PactBrokerError::LinkError(
1078 format!(
1079 "Expected a HAL+JSON response from the pact broker, but got a link with no HREF. URL: '{}', PATH: '{:?}'",
1080 &hal_client.url,
1081 &p.links,
1082 )
1083 )
1084 )
1085 }
1086 }).collect();
1087
1088 links
1089 }
1090 None => Err(PactBrokerError::NotFound(format!(
1091 "No pacts were found for this provider"
1092 ))),
1093 }?;
1094
1095 let results: Vec<_> = futures::stream::iter(pact_links)
1096 .map(|(ref pact_link, ref context)| {
1097 match pact_link.href {
1098 Some(_) => Ok((hal_client.clone(), pact_link.clone(), context.clone())),
1099 None => Err(
1100 PactBrokerError::LinkError(
1101 format!(
1102 "Expected a HAL+JSON response from the pact broker, but got a link with no HREF. URL: '{}', LINK: '{:?}'",
1103 &hal_client.url,
1104 pact_link
1105 )
1106 )
1107 )
1108 }
1109 })
1110 .and_then(|(hal_client, pact_link, context)| async {
1111 let pact_json = hal_client.fetch_url(
1112 &pact_link.clone(),
1113 &template_values.clone()
1114 ).await?;
1115 Ok((pact_link, pact_json, context))
1116 })
1117 .map(|result| {
1118 match result {
1119 Ok((pact_link, pact_json, context)) => {
1120 let href = pact_link.href.unwrap_or_default();
1121 let links = links_from_json(&pact_json);
1122 load_pact_from_json(href.as_str(), &pact_json)
1123 .map(|pact| (pact, Some(context), links))
1124 .map_err(|err| PactBrokerError::ContentError(format!("{}", err)))
1125 },
1126 Err(err) => Err(err)
1127 }
1128 })
1129 .into_stream()
1130 .collect()
1131 .await;
1132
1133 Ok(results)
1134}
1135
1136pub async fn fetch_pact_from_url(
1140 url: &str,
1141 auth: &Option<HttpAuth>,
1142) -> anyhow::Result<(Box<dyn Pact + Send + Sync + RefUnwindSafe>, Vec<Link>)> {
1143 let url = url.to_string();
1144 let auth = auth.clone();
1145 let (url, pact_json) =
1146 tokio::task::spawn_blocking(move || http_utils::fetch_json_from_url(&url, &auth)).await??;
1147 let pact = load_pact_from_json(&url, &pact_json)?;
1148 let links = links_from_json(&pact_json);
1149 Ok((pact, links))
1150}
1151
1152async fn publish_provider_tags(
1153 hal_client: &HALClient,
1154 links: &[Link],
1155 provider_tags: Vec<String>,
1156 version: &str,
1157 headers: Option<HashMap<String, String>>,
1158) -> Result<(), PactBrokerError> {
1159 let hal_client = hal_client
1160 .clone()
1161 .with_doc_context(links)?
1162 .navigate("pb:provider", &hashmap! {})
1163 .await?;
1164 match hal_client.find_link("pb:version-tag") {
1165 Ok(link) => {
1166 for tag in &provider_tags {
1167 let template_values = hashmap! {
1168 "version".to_string() => version.to_string(),
1169 "tag".to_string() => tag.clone()
1170 };
1171 match hal_client
1172 .clone()
1173 .put_json(
1174 hal_client
1175 .clone()
1176 .parse_link_url(&link, &template_values)?
1177 .as_str(),
1178 "{}",
1179 headers.clone(),
1180 )
1181 .await
1182 {
1183 Ok(_) => debug!("Pushed tag {} for provider version {}", tag, version),
1184 Err(err) => {
1185 error!(
1186 "Failed to push tag {} for provider version {}",
1187 tag, version
1188 );
1189 return Err(err);
1190 }
1191 }
1192 }
1193 Ok(())
1194 }
1195 Err(_) => Err(PactBrokerError::LinkError(
1196 "Can't publish provider tags as there is no 'pb:version-tag' link".to_string(),
1197 )),
1198 }
1199}
1200
1201async fn publish_provider_branch(
1202 hal_client: &HALClient,
1203 links: &[Link],
1204 branch: &str,
1205 version: &str,
1206 headers: Option<HashMap<String, String>>,
1207) -> Result<(), PactBrokerError> {
1208 let hal_client = hal_client
1209 .clone()
1210 .with_doc_context(links)?
1211 .navigate("pb:provider", &hashmap! {})
1212 .await?;
1213
1214 match hal_client.find_link("pb:branch-version") {
1215 Ok(link) => {
1216 let template_values = hashmap! {
1217 "branch".to_string() => branch.to_string(),
1218 "version".to_string() => version.to_string(),
1219 };
1220 match hal_client.clone().put_json(hal_client.clone().parse_link_url(&link, &template_values)?.as_str(), "{}",headers).await {
1221 Ok(_) => debug!("Pushed branch {} for provider version {}", branch, version),
1222 Err(err) => {
1223 error!("Failed to push branch {} for provider version {}", branch, version);
1224 return Err(err);
1225 }
1226 }
1227 Ok(())
1228 },
1229 Err(_) => Err(PactBrokerError::LinkError("Can't publish provider branch as there is no 'pb:branch-version' link. Please ugrade to Pact Broker version 2.86.0 or later for branch support".to_string()))
1230 }
1231}
1232
1233#[skip_serializing_none]
1234#[derive(Serialize, Deserialize, Debug, Clone)]
1235#[serde(rename_all = "camelCase")]
1236pub struct ConsumerVersionSelector {
1238 pub consumer: Option<String>,
1240 pub tag: Option<String>,
1242 pub fallback_tag: Option<String>,
1244 pub latest: Option<bool>,
1246 pub deployed_or_released: Option<bool>,
1248 pub deployed: Option<bool>,
1250 pub released: Option<bool>,
1252 pub environment: Option<String>,
1254 pub main_branch: Option<bool>,
1256 pub branch: Option<String>,
1258 pub matching_branch: Option<bool>,
1260}
1261
1262#[derive(Serialize, Deserialize, Debug, Clone)]
1263#[serde(rename_all = "camelCase")]
1264struct PactsForVerificationResponse {
1265 #[serde(rename(deserialize = "_embedded"))]
1266 pub embedded: PactsForVerificationBody,
1267}
1268
1269#[derive(Serialize, Deserialize, Debug, Clone)]
1270#[serde(rename_all = "camelCase")]
1271struct PactsForVerificationBody {
1272 pub pacts: Vec<PactForVerification>,
1273}
1274
1275#[derive(Serialize, Deserialize, Debug, Clone)]
1276#[serde(rename_all = "camelCase")]
1277struct PactForVerification {
1278 pub short_description: String,
1279 #[serde(rename(deserialize = "_links"))]
1280 pub links: HashMap<String, Link>,
1281 pub verification_properties: Option<PactVerificationProperties>,
1282}
1283
1284#[skip_serializing_none]
1285#[derive(Serialize, Deserialize, Debug, Clone)]
1286#[serde(rename_all = "camelCase")]
1287pub struct PactsForVerificationRequest {
1289 #[serde(skip_serializing_if = "Vec::is_empty")]
1291 pub provider_version_tags: Vec<String>,
1292 pub include_pending_status: bool,
1294 pub include_wip_pacts_since: Option<String>,
1296 pub consumer_version_selectors: Vec<ConsumerVersionSelector>,
1298 pub provider_version_branch: Option<String>,
1300}
1301
1302#[skip_serializing_none]
1303#[derive(Serialize, Deserialize, Debug, Clone)]
1304#[serde(rename_all = "camelCase")]
1305pub struct PactVerificationContext {
1307 pub short_description: String,
1309 pub verification_properties: PactVerificationProperties,
1311}
1312
1313impl From<&PactForVerification> for PactVerificationContext {
1314 fn from(value: &PactForVerification) -> Self {
1315 PactVerificationContext {
1316 short_description: value.short_description.clone(),
1317 verification_properties: value.verification_properties.clone().unwrap_or_default(),
1318 }
1319 }
1320}
1321
1322#[skip_serializing_none]
1323#[derive(Serialize, Deserialize, Debug, Clone, Default)]
1324#[serde(rename_all = "camelCase")]
1325pub struct PactVerificationProperties {
1327 #[serde(default)]
1328 pub pending: bool,
1330 pub notices: Vec<HashMap<String, String>>,
1332}