1use std::collections::HashMap;
3use std::env;
4use std::fs;
5use std::fs::File;
6use std::io::{BufReader, Write};
7use std::path::PathBuf;
8use std::process::Stdio;
9use std::process::Command;
10use std::str::from_utf8;
11use std::str::FromStr;
12use std::sync::Mutex;
13use std::thread;
14
15use anyhow::{anyhow, bail, Context};
16use bytes::Bytes;
17use itertools::Either;
18use lazy_static::lazy_static;
19use log::max_level;
20use maplit::hashmap;
21use os_info::Type;
22use pact_models::bodies::OptionalBody;
23use pact_models::json_utils::json_to_string;
24use pact_models::PactSpecification;
25use pact_models::prelude::{ContentType, Pact};
26use pact_models::prelude::v4::V4Pact;
27use pact_models::v4::interaction::V4Interaction;
28use reqwest::Client;
29use semver::Version;
30use serde_json::Value;
31use sysinfo::{Pid,System};
32use tracing::{debug, info, trace, warn};
33
34use crate::catalogue_manager::{all_entries, CatalogueEntry, register_plugin_entries, remove_plugin_entries};
35use crate::child_process::ChildPluginProcess;
36use crate::content::ContentMismatch;
37use crate::download::{download_json_from_github, download_plugin_executable, fetch_json_from_url};
38use crate::metrics::send_metrics;
39use crate::mock_server::{MockServerConfig, MockServerDetails, MockServerResults};
40use crate::plugin_models::{PactPlugin, PactPluginManifest, PactPluginRpc, PluginDependency};
41use crate::proto::*;
42use crate::repository::{fetch_repository_index, USER_AGENT};
43use crate::utils::{optional_string, proto_value_to_json, to_proto_struct, to_proto_value, versions_compatible};
44use crate::verification::{InteractionVerificationData, InteractionVerificationResult};
45
46lazy_static! {
47 static ref PLUGIN_MANIFEST_REGISTER: Mutex<HashMap<String, PactPluginManifest>> = Mutex::new(HashMap::new());
48 static ref PLUGIN_REGISTER: Mutex<HashMap<String, PactPlugin>> = Mutex::new(HashMap::new());
49}
50
51pub async fn load_plugin(plugin: &PluginDependency) -> anyhow::Result<PactPlugin> {
54 let thread_id = thread::current().id();
55 debug!("Loading plugin {:?}", plugin);
56 trace!("Rust plugin driver version {}", option_env!("CARGO_PKG_VERSION").unwrap_or_default());
57 trace!("load_plugin {:?}: Waiting on PLUGIN_REGISTER lock", thread_id);
58 let mut inner = PLUGIN_REGISTER.lock().unwrap();
59 trace!("load_plugin {:?}: Got PLUGIN_REGISTER lock", thread_id);
60 let result = match lookup_plugin_inner(plugin, &mut inner) {
61 Some(plugin) => {
62 debug!("Found running plugin {:?}", plugin);
63 plugin.update_access();
64 Ok(plugin.clone())
65 },
66 None => {
67 debug!("Did not find plugin, will attempt to start it");
68 let manifest = match load_plugin_manifest(plugin) {
69 Ok(manifest) => manifest,
70 Err(err) => {
71 warn!("Could not load plugin manifest from disk, will try auto install it: {}", err);
72 let http_client = reqwest::ClientBuilder::new()
73 .user_agent(USER_AGENT)
74 .build()?;
75 let index = fetch_repository_index(&http_client, None).await?;
76 match index.lookup_plugin_version(&plugin.name, &plugin.version) {
77 Some(entry) => {
78 info!("Found an entry for the plugin in the plugin index, will try install that");
79 install_plugin_from_url(&http_client, entry.source.value().as_str()).await?
80 }
81 None => Err(err)?
82 }
83 }
84 };
85 send_metrics(&manifest);
86 initialise_plugin(&manifest, &mut inner).await
87 }
88 };
89 trace!("load_plugin {:?}: Releasing PLUGIN_REGISTER lock", thread_id);
90 result
91}
92
93fn lookup_plugin_inner<'a>(
94 plugin: &PluginDependency,
95 plugin_register: &'a mut HashMap<String, PactPlugin>
96) -> Option<&'a mut PactPlugin> {
97 if let Some(version) = &plugin.version {
98 plugin_register.get_mut(format!("{}/{}", plugin.name, version).as_str())
99 } else {
100 plugin_register.iter_mut()
101 .filter(|(_, value)| value.manifest.name == plugin.name)
102 .max_by(|(_, v1), (_, v2)| v1.manifest.version.cmp(&v2.manifest.version))
103 .map(|(_, plugin)| plugin)
104 }
105}
106
107pub fn lookup_plugin(plugin: &PluginDependency) -> Option<PactPlugin> {
109 let thread_id = thread::current().id();
110 trace!("lookup_plugin {:?}: Waiting on PLUGIN_REGISTER lock", thread_id);
111 let mut inner = PLUGIN_REGISTER.lock().unwrap();
112 trace!("lookup_plugin {:?}: Got PLUGIN_REGISTER lock", thread_id);
113 let entry = lookup_plugin_inner(plugin, &mut inner);
114 trace!("lookup_plugin {:?}: Releasing PLUGIN_REGISTER lock", thread_id);
115 entry.cloned()
116}
117
118pub fn load_plugin_manifest(plugin_dep: &PluginDependency) -> anyhow::Result<PactPluginManifest> {
121 debug!("Loading plugin manifest for plugin {:?}", plugin_dep);
122 match lookup_plugin_manifest(plugin_dep) {
123 Some(manifest) => Ok(manifest),
124 None => load_manifest_from_disk(plugin_dep)
125 }
126}
127
128fn load_manifest_from_disk(plugin_dep: &PluginDependency) -> anyhow::Result<PactPluginManifest> {
129 let plugin_dir = pact_plugin_dir()?;
130 debug!("Looking for plugin in {:?}", plugin_dir);
131
132 if plugin_dir.exists() {
133 load_manifest_from_dir(plugin_dep, &plugin_dir)
134 } else {
135 Err(anyhow!("Plugin directory {:?} does not exist", plugin_dir))
136 }
137}
138
139fn load_manifest_from_dir(plugin_dep: &PluginDependency, plugin_dir: &PathBuf) -> anyhow::Result<PactPluginManifest> {
140 let mut manifests = vec![];
141 for entry in fs::read_dir(plugin_dir)? {
142 let path = entry?.path();
143 trace!("Found: {:?}", path);
144
145 if path.is_dir() {
146 let manifest_file = path.join("pact-plugin.json");
147 if manifest_file.exists() && manifest_file.is_file() {
148 debug!("Found plugin manifest: {:?}", manifest_file);
149 let file = File::open(manifest_file)?;
150 let reader = BufReader::new(file);
151 let manifest: PactPluginManifest = serde_json::from_reader(reader)?;
152 trace!("Parsed plugin manifest: {:?}", manifest);
153 let version = manifest.version.clone();
154 if manifest.name == plugin_dep.name && versions_compatible(version.as_str(), &plugin_dep.version) {
155 let manifest = PactPluginManifest {
156 plugin_dir: path.to_string_lossy().to_string(),
157 ..manifest
158 };
159 manifests.push(manifest);
160 }
161 }
162 }
163 }
164
165 let manifest = manifests.iter()
166 .max_by(|a, b| {
167 let a = Version::parse(&a.version).unwrap_or_else(|_| Version::new(0, 0, 0));
168 let b = Version::parse(&b.version).unwrap_or_else(|_| Version::new(0, 0, 0));
169 a.cmp(&b)
170 });
171 if let Some(manifest) = manifest {
172 let key = format!("{}/{}", manifest.name, manifest.version);
173 {
174 let mut guard = PLUGIN_MANIFEST_REGISTER.lock().unwrap();
175 guard.insert(key.clone(), manifest.clone());
176 }
177 Ok(manifest.clone())
178 } else {
179 Err(anyhow!("Plugin {} was not found (in $HOME/.pact/plugins or $PACT_PLUGIN_DIR)", plugin_dep))
180 }
181}
182
183pub(crate) fn pact_plugin_dir() -> anyhow::Result<PathBuf> {
184 let env_var = env::var_os("PACT_PLUGIN_DIR");
185 let plugin_dir = env_var.unwrap_or_default();
186 let plugin_dir = plugin_dir.to_string_lossy();
187 if plugin_dir.is_empty() {
188 home::home_dir().map(|dir| dir.join(".pact").join("plugins"))
189 } else {
190 PathBuf::from_str(plugin_dir.as_ref()).ok()
191 }.ok_or_else(|| anyhow!("No Pact plugin directory was found (in $HOME/.pact/plugins or $PACT_PLUGIN_DIR)"))
192}
193
194pub fn lookup_plugin_manifest(plugin: &PluginDependency) -> Option<PactPluginManifest> {
196 let guard = PLUGIN_MANIFEST_REGISTER.lock().unwrap();
197 if let Some(version) = &plugin.version {
198 let key = format!("{}/{}", plugin.name, version);
199 guard.get(&key).cloned()
200 } else {
201 guard.iter()
202 .filter(|(_, value)| value.name == plugin.name)
203 .max_by(|(_, v1), (_, v2)| v1.version.cmp(&v2.version))
204 .map(|(_, p)| p.clone())
205 }
206}
207
208async fn initialise_plugin(
209 manifest: &PactPluginManifest,
210 plugin_register: &mut HashMap<String, PactPlugin>
211) -> anyhow::Result<PactPlugin> {
212 match manifest.executable_type.as_str() {
213 "exec" => {
214 let mut plugin = start_plugin_process(manifest).await?;
215 debug!("Plugin process started OK (port = {}), sending init message", plugin.port());
216
217 init_handshake(manifest, &mut plugin).await.map_err(|err| {
218 plugin.kill();
219 anyhow!("Failed to send init request to the plugin - {}", err)
220 })?;
221
222 let key = format!("{}/{}", manifest.name, manifest.version);
223 plugin_register.insert(key, plugin.clone());
224
225 Ok(plugin)
226 }
227 _ => Err(anyhow!("Plugin executable type of {} is not supported", manifest.executable_type))
228 }
229}
230
231pub async fn init_handshake(manifest: &PactPluginManifest, plugin: &mut (dyn PactPluginRpc + Send + Sync)) -> anyhow::Result<()> {
233 let request = InitPluginRequest {
234 implementation: "plugin-driver-rust".to_string(),
235 version: option_env!("CARGO_PKG_VERSION").unwrap_or("0").to_string()
236 };
237 let response = plugin.init_plugin(request).await?;
238 debug!("Got init response {:?} from plugin {}", response, manifest.name);
239 register_plugin_entries(manifest, &response.catalogue);
240 tokio::task::spawn(publish_updated_catalogue());
241 Ok(())
242}
243
244async fn start_plugin_process(manifest: &PactPluginManifest) -> anyhow::Result<PactPlugin> {
245 debug!("Starting plugin with manifest {:?}", manifest);
246
247 let os_info = os_info::get();
248 debug!("Detected OS: {}", os_info);
249 let mut path = if let Some(entry_point) = manifest.entry_points.get(&os_info.to_string()) {
250 PathBuf::from(entry_point)
251 } else if os_info.os_type() == Type::Windows && manifest.entry_points.contains_key("windows") {
252 PathBuf::from(manifest.entry_points.get("windows").unwrap())
253 } else {
254 PathBuf::from(&manifest.entry_point)
255 };
256 if !path.is_absolute() || !path.exists() {
257 path = PathBuf::from(manifest.plugin_dir.clone()).join(path);
258 }
259 debug!("Starting plugin using {:?}", &path);
260
261 let log_level = max_level();
262 let mut child_command = Command::new(path.clone());
263 let mut child_command = child_command
264 .env("LOG_LEVEL", log_level.to_string())
265 .env("RUST_LOG", log_level.to_string())
266 .current_dir(manifest.plugin_dir.clone());
267
268 if let Some(args) = &manifest.args {
269 child_command = child_command.args(args);
270 }
271
272 let child = child_command
273 .stdout(Stdio::piped())
274 .stderr(Stdio::piped())
275 .spawn()
276 .map_err(|err| anyhow!("Was not able to start plugin process for '{}' - {}",
277 path.to_string_lossy(), err))?;
278 let child_pid = child.id();
279 debug!("Plugin {} started with PID {}", manifest.name, child_pid);
280
281 match ChildPluginProcess::new(child, manifest).await {
282 Ok(child) => Ok(PactPlugin::new(manifest, child)),
283 Err(err) => {
284 let mut s = System::new();
285 s.refresh_processes();
286 if let Some(process) = s.process(Pid::from_u32(child_pid)) {
287 #[cfg(not(windows))]
288 process.kill();
289 #[cfg(windows)]
291 let _ = Command::new("taskkill.exe").arg("/PID").arg(child_pid.to_string()).arg("/F").arg("/T").output();
292 } else {
293 warn!("Child process with PID {} was not found", child_pid);
294 }
295 Err(err)
296 }
297 }
298}
299
300pub fn shutdown_plugins() {
302 let thread_id = thread::current().id();
303 debug!("Shutting down all plugins");
304 trace!("shutdown_plugins {:?}: Waiting on PLUGIN_REGISTER lock", thread_id);
305 let mut guard = PLUGIN_REGISTER.lock().unwrap();
306 trace!("shutdown_plugins {:?}: Got PLUGIN_REGISTER lock", thread_id);
307 for plugin in guard.values() {
308 debug!("Shutting down plugin {:?}", plugin);
309 plugin.kill();
310 remove_plugin_entries(&plugin.manifest.name);
311 }
312 guard.clear();
313 trace!("shutdown_plugins {:?}: Releasing PLUGIN_REGISTER lock", thread_id);
314}
315
316pub fn shutdown_plugin(plugin: &mut PactPlugin) {
318 debug!("Shutting down plugin {}:{}", plugin.manifest.name, plugin.manifest.version);
319 plugin.kill();
320 remove_plugin_entries(&plugin.manifest.name);
321}
322
323pub async fn publish_updated_catalogue() {
325 let thread_id = thread::current().id();
326
327 let request = Catalogue {
328 catalogue: all_entries().iter()
329 .map(|entry| crate::proto::CatalogueEntry {
330 r#type: entry.entry_type.to_proto_type() as i32,
331 key: entry.key.clone(),
332 values: entry.values.clone()
333 }).collect()
334 };
335
336 let plugins = {
337 trace!("publish_updated_catalogue {:?}: Waiting on PLUGIN_REGISTER lock", thread_id);
338 let inner = PLUGIN_REGISTER.lock().unwrap();
339 trace!("publish_updated_catalogue {:?}: Got PLUGIN_REGISTER lock", thread_id);
340 let plugins = inner.values().cloned().collect::<Vec<_>>();
341 trace!("publish_updated_catalogue {:?}: Releasing PLUGIN_REGISTER lock", thread_id);
342 plugins
343 };
344
345 for plugin in plugins {
346 if let Err(err) = plugin.update_catalogue(request.clone()).await {
347 warn!("Failed to send updated catalogue to plugin '{}' - {}", plugin.manifest.name, err);
348 }
349 }
350}
351
352#[tracing::instrument]
354pub fn increment_plugin_access(plugin: &PluginDependency) {
355 let thread_id = thread::current().id();
356
357 trace!("increment_plugin_access {:?}: Waiting on PLUGIN_REGISTER lock", thread_id);
358 let mut inner = PLUGIN_REGISTER.lock().unwrap();
359 trace!("increment_plugin_access {:?}: Got PLUGIN_REGISTER lock", thread_id);
360
361 if let Some(plugin) = lookup_plugin_inner(plugin, &mut inner) {
362 plugin.update_access();
363 }
364
365 trace!("increment_plugin_access {:?}: Releasing PLUGIN_REGISTER lock", thread_id);
366}
367
368#[tracing::instrument]
370pub fn drop_plugin_access(plugin: &PluginDependency) {
371 let thread_id = thread::current().id();
372
373 trace!("drop_plugin_access {:?}: Waiting on PLUGIN_REGISTER lock", thread_id);
374 let mut inner = PLUGIN_REGISTER.lock().unwrap();
375 trace!("drop_plugin_access {:?}: Got PLUGIN_REGISTER lock", thread_id);
376
377 if let Some(plugin) = lookup_plugin_inner(plugin, &mut inner) {
378 let key = format!("{}/{}", plugin.manifest.name, plugin.manifest.version);
379 if plugin.drop_access() == 0 {
380 shutdown_plugin(plugin);
381 inner.remove(key.as_str());
382 }
383 }
384
385 trace!("drop_plugin_access {:?}: Releasing PLUGIN_REGISTER lock", thread_id);
386}
387
388#[deprecated(note = "Use start_mock_server_v2 which takes a test context map", since = "0.2.2")]
390pub async fn start_mock_server(
391 catalogue_entry: &CatalogueEntry,
392 pact: Box<dyn Pact + Send + Sync>,
393 config: MockServerConfig
394) -> anyhow::Result<MockServerDetails> {
395 start_mock_server_v2(catalogue_entry, pact, config, hashmap!{}).await
396}
397
398pub async fn start_mock_server_v2(
403 catalogue_entry: &CatalogueEntry,
404 pact: Box<dyn Pact + Send + Sync>,
405 config: MockServerConfig,
406 test_context: HashMap<String, Value>
407) -> anyhow::Result<MockServerDetails> {
408 let manifest = catalogue_entry.plugin.as_ref()
409 .ok_or_else(|| anyhow!("Catalogue entry did not have an associated plugin manifest"))?;
410 let plugin = lookup_plugin(&manifest.as_dependency())
411 .ok_or_else(|| anyhow!("Did not find a running plugin for manifest {:?}", manifest))?;
412
413 debug!(plugin_name = manifest.name.as_str(), plugin_version = manifest.version.as_str(),
414 ?test_context, "Sending startMockServer request to plugin");
415 let request = StartMockServerRequest {
416 host_interface: config.host_interface.unwrap_or_default(),
417 port: config.port,
418 tls: config.tls,
419 pact: pact.to_json(PactSpecification::V4)?.to_string(),
420 test_context: Some(to_proto_struct(&test_context))
421 };
422 let response = plugin.start_mock_server(request).await?;
423 debug!("Got response ${response:?}");
424
425 let mock_server_response = response.response
426 .ok_or_else(|| anyhow!("Did not get a valid response from the start mock server call"))?;
427 match mock_server_response {
428 start_mock_server_response::Response::Error(err) => Err(anyhow!("Mock server failed to start: {}", err)),
429 start_mock_server_response::Response::Details(details) => Ok(MockServerDetails {
430 key: details.key.clone(),
431 base_url: details.address.clone(),
432 port: details.port,
433 plugin
434 })
435 }
436}
437
438pub async fn shutdown_mock_server(mock_server: &MockServerDetails) -> anyhow::Result<Vec<MockServerResults>> {
440 let request = ShutdownMockServerRequest {
441 server_key: mock_server.key.to_string()
442 };
443
444 debug!(
445 plugin_name = mock_server.plugin.manifest.name.as_str(),
446 plugin_version = mock_server.plugin.manifest.version.as_str(),
447 server_key = mock_server.key.as_str(),
448 "Sending shutdownMockServer request to plugin"
449 );
450 let response = mock_server.plugin.shutdown_mock_server(request).await?;
451 debug!("Got response: {response:?}");
452
453 if response.ok {
454 Ok(vec![])
455 } else {
456 Ok(response.results.iter().map(|result| {
457 MockServerResults {
458 path: result.path.clone(),
459 error: result.error.clone(),
460 mismatches: result.mismatches.iter().map(|mismatch| {
461 ContentMismatch {
462 expected: mismatch.expected.as_ref()
463 .map(|e| from_utf8(&e).unwrap_or_default().to_string())
464 .unwrap_or_default(),
465 actual: mismatch.actual.as_ref()
466 .map(|a| from_utf8(&a).unwrap_or_default().to_string())
467 .unwrap_or_default(),
468 mismatch: mismatch.mismatch.clone(),
469 path: mismatch.path.clone(),
470 diff: optional_string(&mismatch.diff),
471 mismatch_type: optional_string(&mismatch.mismatch_type)
472 }
473 }).collect()
474 }
475 }).collect())
476 }
477}
478
479pub async fn get_mock_server_results(mock_server: &MockServerDetails) -> anyhow::Result<Vec<MockServerResults>> {
481 let request = MockServerRequest {
482 server_key: mock_server.key.to_string()
483 };
484
485 debug!(
486 plugin_name = mock_server.plugin.manifest.name.as_str(),
487 plugin_version = mock_server.plugin.manifest.version.as_str(),
488 server_key = mock_server.key.as_str(),
489 "Sending getMockServerResults request to plugin"
490 );
491 let response = mock_server.plugin.get_mock_server_results(request).await?;
492 debug!("Got response: {response:?}");
493
494 if response.ok {
495 Ok(vec![])
496 } else {
497 Ok(response.results.iter().map(|result| {
498 MockServerResults {
499 path: result.path.clone(),
500 error: result.error.clone(),
501 mismatches: result.mismatches.iter().map(|mismatch| {
502 ContentMismatch {
503 expected: mismatch.expected.as_ref()
504 .map(|e| from_utf8(&e).unwrap_or_default().to_string())
505 .unwrap_or_default(),
506 actual: mismatch.actual.as_ref()
507 .map(|a| from_utf8(&a).unwrap_or_default().to_string())
508 .unwrap_or_default(),
509 mismatch: mismatch.mismatch.clone(),
510 path: mismatch.path.clone(),
511 diff: optional_string(&mismatch.diff),
512 mismatch_type: optional_string(&mismatch.mismatch_type)
513 }
514 }).collect()
515 }
516 }).collect())
517 }
518}
519
520pub async fn prepare_validation_for_interaction(
523 transport_entry: &CatalogueEntry,
524 pact: &V4Pact,
525 interaction: &(dyn V4Interaction + Send + Sync),
526 context: &HashMap<String, Value>
527) -> anyhow::Result<InteractionVerificationData> {
528 let manifest = transport_entry.plugin.as_ref()
529 .ok_or_else(|| anyhow!("Transport catalogue entry did not have an associated plugin manifest"))?;
530 let plugin = lookup_plugin(&manifest.as_dependency())
531 .ok_or_else(|| anyhow!("Did not find a running plugin for manifest {:?}", manifest))?;
532
533 prepare_validation_for_interaction_inner(&plugin, manifest, pact, interaction, context).await
534}
535
536pub(crate) async fn prepare_validation_for_interaction_inner(
537 plugin: &(dyn PactPluginRpc + Send + Sync),
538 manifest: &PactPluginManifest,
539 pact: &V4Pact,
540 interaction: &(dyn V4Interaction + Send + Sync),
541 context: &HashMap<String, Value>
542) -> anyhow::Result<InteractionVerificationData> {
543 let mut pact = pact.clone();
544 pact.interactions = pact.interactions.iter()
545 .map(|i| {
546 if i.key().is_none() {
547 i.with_unique_key()
548 } else {
549 i.boxed_v4()
550 }
551 })
552 .collect();
553 let request = VerificationPreparationRequest {
554 pact: pact.to_json(PactSpecification::V4)?.to_string(),
555 interaction_key: interaction.unique_key(),
556 config: Some(to_proto_struct(context))
557 };
558
559 debug!(plugin_name = manifest.name.as_str(), plugin_version = manifest.version.as_str(),
560 "Sending prepareValidationForInteraction request to plugin");
561 let response = plugin.prepare_interaction_for_verification(request).await?;
562 debug!("Got response: {response:?}");
563
564 let validation_response = response.response
565 .ok_or_else(|| anyhow!("Did not get a valid response from the prepare interaction for verification call"))?;
566 match &validation_response {
567 verification_preparation_response::Response::Error(err) => Err(anyhow!("Failed to prepare the request: {}", err)),
568 verification_preparation_response::Response::InteractionData(data) => {
569 let content_type = data.body.as_ref().and_then(|body| ContentType::parse(body.content_type.as_str()).ok());
570 Ok(InteractionVerificationData {
571 request_data: data.body.as_ref()
572 .and_then(|body| body.content.as_ref())
573 .map(|body| OptionalBody::Present(Bytes::from(body.clone()), content_type, None)).unwrap_or_default(),
574 metadata: data.metadata.iter().map(|(k, v)| {
575 let value = match &v.value {
576 Some(v) => match &v {
577 metadata_value::Value::NonBinaryValue(v) => Either::Left(proto_value_to_json(v)),
578 metadata_value::Value::BinaryValue(b) => Either::Right(Bytes::from(b.clone()))
579 }
580 None => Either::Left(Value::Null)
581 };
582 (k.clone(), value)
583 }).collect()
584 })
585 }
586 }
587}
588
589pub async fn verify_interaction(
591 transport_entry: &CatalogueEntry,
592 verification_data: &InteractionVerificationData,
593 config: &HashMap<String, Value>,
594 pact: &V4Pact,
595 interaction: &(dyn V4Interaction + Send + Sync)
596) -> anyhow::Result<InteractionVerificationResult> {
597 let manifest = transport_entry.plugin.as_ref()
598 .ok_or_else(|| anyhow!("Transport catalogue entry did not have an associated plugin manifest"))?;
599 let plugin = lookup_plugin(&manifest.as_dependency())
600 .ok_or_else(|| anyhow!("Did not find a running plugin for manifest {:?}", manifest))?;
601
602 verify_interaction_inner(
603 &plugin,
604 &manifest,
605 verification_data,
606 config,
607 pact,
608 interaction
609 ).await
610}
611
612pub(crate) async fn verify_interaction_inner(
613 plugin: &(dyn PactPluginRpc + Send + Sync),
614 manifest: &PactPluginManifest,
615 verification_data: &InteractionVerificationData,
616 config: &HashMap<String, Value>,
617 pact: &V4Pact,
618 interaction: &(dyn V4Interaction + Send + Sync)
619) -> anyhow::Result<InteractionVerificationResult> {
620 let mut pact = pact.clone();
621 pact.interactions = pact.interactions.iter()
622 .map(|i| {
623 if i.key().is_none() {
624 i.with_unique_key()
625 } else {
626 i.boxed_v4()
627 }
628 })
629 .collect();
630 let request = VerifyInteractionRequest {
631 pact: pact.to_json(PactSpecification::V4)?.to_string(),
632 interaction_key: interaction.unique_key(),
633 config: Some(to_proto_struct(config)),
634 interaction_data: Some(InteractionData {
635 body: Some((&verification_data.request_data).into()),
636 metadata: verification_data.metadata.iter().map(|(k, v)| {
637 (k.clone(), MetadataValue { value: Some(match v {
638 Either::Left(value) => metadata_value::Value::NonBinaryValue(to_proto_value(value)),
639 Either::Right(b) => metadata_value::Value::BinaryValue(b.to_vec())
640 }) })
641 }).collect()
642 })
643 };
644
645 debug!(plugin_name = manifest.name.as_str(), plugin_version = manifest.version.as_str(),
646 "Sending verifyInteraction request to plugin");
647 let response = plugin.verify_interaction(request).await?;
648 debug!("Got response: {response:?}");
649
650 let validation_response = response.response
651 .ok_or_else(|| anyhow!("Did not get a valid response from the verification call"))?;
652 match &validation_response {
653 verify_interaction_response::Response::Error(err) => Err(anyhow!("Failed to verify the request: {}", err)),
654 verify_interaction_response::Response::Result(data) => Ok(data.into())
655 }
656}
657
658pub async fn install_plugin_from_url(
661 http_client: &Client,
662 source_url: &str
663) -> anyhow::Result<PactPluginManifest> {
664 let response = fetch_json_from_url(source_url, http_client).await?;
665 if let Some(map) = response.as_object() {
666 if let Some(tag) = map.get("tag_name") {
667 let tag = json_to_string(tag);
668 debug!(%tag, "Found tag");
669 let url = if source_url.ends_with("/latest") {
670 source_url.strip_suffix("/latest").unwrap_or(source_url)
671 } else {
672 let suffix = format!("/tag/{}", tag);
673 source_url.strip_suffix(suffix.as_str()).unwrap_or(source_url)
674 };
675 let manifest_json = download_json_from_github(&http_client, url, &tag, "pact-plugin.json")
676 .await.context("Downloading manifest file from GitHub")?;
677 let manifest: PactPluginManifest = serde_json::from_value(manifest_json)
678 .context("Failed to parsing JSON manifest file from GitHub")?;
679 debug!(?manifest, "Loaded manifest from GitHub");
680
681 debug!("Installing plugin {} version {}", manifest.name, manifest.version);
682 let plugin_dir = create_plugin_dir(&manifest)
683 .context("Failed to creating plugins directory")?;
684 download_plugin_executable(&manifest, &plugin_dir, &http_client, url, &tag, false).await?;
685
686 Ok(PactPluginManifest {
687 plugin_dir: plugin_dir.to_string_lossy().to_string(),
688 .. manifest
689 })
690 } else {
691 bail!("GitHub release page does not have a valid tag_name attribute");
692 }
693 } else {
694 bail!("Response from source is not a valid JSON from a GitHub release page")
695 }
696}
697
698fn create_plugin_dir(manifest: &PactPluginManifest) -> anyhow::Result<PathBuf> {
699 let plugins_dir = pact_plugin_dir()?;
700 if !plugins_dir.exists() {
701 info!(plugins_dir = %plugins_dir.display(), "Creating plugins directory");
702 fs::create_dir_all(plugins_dir.clone())?;
703 }
704
705 let plugin_dir = plugins_dir.join(format!("{}-{}", manifest.name, manifest.version));
706 info!(plugin_dir = %plugin_dir.display(), "Creating plugin directory");
707 fs::create_dir(plugin_dir.clone())?;
708
709 info!("Writing plugin manifest file");
710 let file_name = plugin_dir.join("pact-plugin.json");
711 let mut f = File::create(file_name)?;
712 let json = serde_json::to_string(manifest)?;
713 f.write_all(json.as_bytes())?;
714
715 Ok(plugin_dir.clone())
716}
717
718#[cfg(test)]
719mod tests {
720 use std::fs::{self, File};
721
722 use maplit::hashmap;
723 use pact_models::v4::sync_message::SynchronousMessage;
724 use pact_models::prelude::v4::V4Pact;
725 use pact_models::v4::interaction::V4Interaction;
726
727 use expectest::prelude::*;
728 use tempdir::TempDir;
729
730 use crate::plugin_models::PluginDependency;
731 use crate::plugin_manager::prepare_validation_for_interaction_inner;
732 use crate::plugin_manager::verify_interaction_inner;
733 use crate::plugin_models::tests::MockPlugin;
734 use crate::verification::InteractionVerificationData;
735
736 use super::{
737 load_manifest_from_dir,
738 PactPluginManifest
739 };
740
741 #[test]
742 fn load_manifest_from_dir_test() {
743 let tmp_dir = TempDir::new("load_manifest_from_dir").unwrap();
744
745 let manifest_1 = PactPluginManifest {
746 name: "test-plugin".to_string(),
747 version: "0.1.5".to_string(),
748 .. PactPluginManifest::default()
749 };
750 let path_1 = tmp_dir.path().join("1");
751 fs::create_dir_all(&path_1).unwrap();
752 let file_1 = File::create(path_1.join("pact-plugin.json")).unwrap();
753 serde_json::to_writer(file_1, &manifest_1).unwrap();
754
755 let manifest_2 = PactPluginManifest {
756 name: "test-plugin".to_string(),
757 version: "0.1.20".to_string(),
758 .. PactPluginManifest::default()
759 };
760 let path_2 = tmp_dir.path().join("2");
761 fs::create_dir_all(&path_2).unwrap();
762 let file_2 = File::create(path_2.join("pact-plugin.json")).unwrap();
763 serde_json::to_writer(file_2, &manifest_2).unwrap();
764
765 let manifest_3 = PactPluginManifest {
766 name: "test-plugin".to_string(),
767 version: "0.1.7".to_string(),
768 .. PactPluginManifest::default()
769 };
770 let path_3 = tmp_dir.path().join("3");
771 fs::create_dir_all(&path_3).unwrap();
772 let file_3 = File::create(path_3.join("pact-plugin.json")).unwrap();
773 serde_json::to_writer(file_3, &manifest_3).unwrap();
774
775 let manifest_4 = PactPluginManifest {
776 name: "test-plugin".to_string(),
777 version: "0.1.14".to_string(),
778 .. PactPluginManifest::default()
779 };
780 let path_4 = tmp_dir.path().join("4");
781 fs::create_dir_all(&path_4).unwrap();
782 let file_4 = File::create(path_4.join("pact-plugin.json")).unwrap();
783 serde_json::to_writer(file_4, &manifest_4).unwrap();
784
785 let manifest_5 = PactPluginManifest {
786 name: "test-plugin".to_string(),
787 version: "0.1.12".to_string(),
788 .. PactPluginManifest::default()
789 };
790 let path_5 = tmp_dir.path().join("5");
791 fs::create_dir_all(&path_5).unwrap();
792 let file_5 = File::create(path_5.join("pact-plugin.json")).unwrap();
793 serde_json::to_writer(file_5, &manifest_5).unwrap();
794
795 let dep = PluginDependency {
796 name: "test-plugin".to_string(),
797 version: None,
798 dependency_type: Default::default()
799 };
800
801 let result = load_manifest_from_dir(&dep, &tmp_dir.path().to_path_buf()).unwrap();
802 expect!(result.version).to(be_equal_to("0.1.20"));
803 }
804
805 #[test_log::test(tokio::test)]
806 async fn prepare_validation_for_interaction_passes_in_pact_with_interaction_keys_set() {
807 let manifest = PactPluginManifest {
808 name: "test-plugin".to_string(),
809 version: "0.0.0".to_string(),
810 .. PactPluginManifest::default()
811 };
812 let mock_plugin = MockPlugin::default();
813
814 let interaction = SynchronousMessage {
815 .. SynchronousMessage::default()
816 };
817 let pact = V4Pact {
818 interactions: vec![ interaction.boxed_v4() ],
819 .. V4Pact::default()
820 };
821 let context = hashmap!{};
822
823 let result = prepare_validation_for_interaction_inner(
824 &mock_plugin,
825 &manifest,
826 &pact,
827 &interaction,
828 &context
829 ).await;
830
831 expect!(result).to(be_ok());
832 let request = {
833 let r = mock_plugin.prepare_request.read().unwrap();
834 r.clone()
835 };
836 let pact_in = V4Pact::pact_from_json(&serde_json::from_str(request.pact.as_str()).unwrap(), "").unwrap();
837 expect!(pact_in.interactions[0].key().unwrap()).to(be_equal_to(request.interaction_key));
838 }
839
840 #[test_log::test(tokio::test)]
841 async fn prepare_validation_for_interaction_handles_pact_with_keys_already_set() {
842 let manifest = PactPluginManifest {
843 name: "test-plugin".to_string(),
844 version: "0.0.0".to_string(),
845 .. PactPluginManifest::default()
846 };
847 let mock_plugin = MockPlugin::default();
848
849 let interaction = SynchronousMessage {
850 key: Some("1234567890".to_string()),
851 .. SynchronousMessage::default()
852 };
853 let pact = V4Pact {
854 interactions: vec![ interaction.boxed_v4() ],
855 .. V4Pact::default()
856 };
857 let context = hashmap!{};
858
859 let result = prepare_validation_for_interaction_inner(
860 &mock_plugin,
861 &manifest,
862 &pact,
863 &interaction,
864 &context
865 ).await;
866
867 expect!(result).to(be_ok());
868 let request = {
869 let r = mock_plugin.prepare_request.read().unwrap();
870 r.clone()
871 };
872 let pact_in = V4Pact::pact_from_json(&serde_json::from_str(request.pact.as_str()).unwrap(), "").unwrap();
873 expect!(request.interaction_key.as_str()).to(be_equal_to("1234567890"));
874 expect!(pact_in.interactions[0].key().unwrap()).to(be_equal_to(request.interaction_key));
875 }
876
877 #[test_log::test(tokio::test)]
878 async fn verify_interaction_passes_in_pact_with_interaction_keys_set() {
879 let manifest = PactPluginManifest {
880 name: "test-plugin".to_string(),
881 version: "0.0.0".to_string(),
882 .. PactPluginManifest::default()
883 };
884 let mock_plugin = MockPlugin::default();
885
886 let interaction = SynchronousMessage {
887 .. SynchronousMessage::default()
888 };
889 let pact = V4Pact {
890 interactions: vec![ interaction.boxed_v4() ],
891 .. V4Pact::default()
892 };
893 let context = hashmap!{};
894 let data = InteractionVerificationData::default();
895
896 let result = verify_interaction_inner(
897 &mock_plugin,
898 &manifest,
899 &data,
900 &context,
901 &pact,
902 &interaction
903 ).await;
904
905 expect!(result).to(be_ok());
906 let request = {
907 let r = mock_plugin.verify_request.read().unwrap();
908 r.clone()
909 };
910 let pact_in = V4Pact::pact_from_json(&serde_json::from_str(request.pact.as_str()).unwrap(), "").unwrap();
911 expect!(pact_in.interactions[0].key().unwrap()).to(be_equal_to(request.interaction_key));
912 }
913
914 #[test_log::test(tokio::test)]
915 async fn verify_interaction_handles_interaction_with_key_already_set() {
916 let manifest = PactPluginManifest {
917 name: "test-plugin".to_string(),
918 version: "0.0.0".to_string(),
919 .. PactPluginManifest::default()
920 };
921 let mock_plugin = MockPlugin::default();
922
923 let interaction = SynchronousMessage {
924 key: Some("1234567890".to_string()),
925 .. SynchronousMessage::default()
926 };
927 let pact = V4Pact {
928 interactions: vec![ interaction.boxed_v4() ],
929 .. V4Pact::default()
930 };
931 let context = hashmap!{};
932 let data = InteractionVerificationData::default();
933
934 let result = verify_interaction_inner(
935 &mock_plugin,
936 &manifest,
937 &data,
938 &context,
939 &pact,
940 &interaction
941 ).await;
942
943 expect!(result).to(be_ok());
944 let request = {
945 let r = mock_plugin.verify_request.read().unwrap();
946 r.clone()
947 };
948 let pact_in = V4Pact::pact_from_json(&serde_json::from_str(request.pact.as_str()).unwrap(), "").unwrap();
949 expect!(request.interaction_key.as_str()).to(be_equal_to("1234567890"));
950 expect!(pact_in.interactions[0].key().unwrap()).to(be_equal_to(request.interaction_key));
951 }
952}