pact_plugin_driver/
plugin_manager.rs

1//! Manages interactions with Pact plugins
2use std::collections::HashMap;
3use std::env;
4use std::fs;
5use std::fs::File;
6use std::io::{BufReader, Write};
7use std::path::PathBuf;
8use std::process::Stdio;
9use std::process::Command;
10use std::str::from_utf8;
11use std::str::FromStr;
12use std::sync::Mutex;
13use std::thread;
14
15use anyhow::{anyhow, bail, Context};
16use bytes::Bytes;
17use itertools::Either;
18use lazy_static::lazy_static;
19use log::max_level;
20use maplit::hashmap;
21use os_info::Type;
22use pact_models::bodies::OptionalBody;
23use pact_models::json_utils::json_to_string;
24use pact_models::PactSpecification;
25use pact_models::prelude::{ContentType, Pact};
26use pact_models::prelude::v4::V4Pact;
27use pact_models::v4::interaction::V4Interaction;
28use reqwest::Client;
29use semver::Version;
30use serde_json::Value;
31use sysinfo::{Pid,System};
32use tracing::{debug, info, trace, warn};
33
34use crate::catalogue_manager::{all_entries, CatalogueEntry, register_plugin_entries, remove_plugin_entries};
35use crate::child_process::ChildPluginProcess;
36use crate::content::ContentMismatch;
37use crate::download::{download_json_from_github, download_plugin_executable, fetch_json_from_url};
38use crate::metrics::send_metrics;
39use crate::mock_server::{MockServerConfig, MockServerDetails, MockServerResults};
40use crate::plugin_models::{PactPlugin, PactPluginManifest, PactPluginRpc, PluginDependency};
41use crate::proto::*;
42use crate::repository::{fetch_repository_index, USER_AGENT};
43use crate::utils::{optional_string, proto_value_to_json, to_proto_struct, to_proto_value, versions_compatible};
44use crate::verification::{InteractionVerificationData, InteractionVerificationResult};
45
46lazy_static! {
47  static ref PLUGIN_MANIFEST_REGISTER: Mutex<HashMap<String, PactPluginManifest>> = Mutex::new(HashMap::new());
48  static ref PLUGIN_REGISTER: Mutex<HashMap<String, PactPlugin>> = Mutex::new(HashMap::new());
49}
50
51/// Load the plugin defined by the dependency information. Will first look in the global
52/// plugin registry.
53pub async fn load_plugin(plugin: &PluginDependency) -> anyhow::Result<PactPlugin> {
54  let thread_id = thread::current().id();
55  debug!("Loading plugin {:?}", plugin);
56  trace!("Rust plugin driver version {}", option_env!("CARGO_PKG_VERSION").unwrap_or_default());
57  trace!("load_plugin {:?}: Waiting on PLUGIN_REGISTER lock", thread_id);
58  let mut inner = PLUGIN_REGISTER.lock().unwrap();
59  trace!("load_plugin {:?}: Got PLUGIN_REGISTER lock", thread_id);
60  let result = match lookup_plugin_inner(plugin, &mut inner) {
61    Some(plugin) => {
62      debug!("Found running plugin {:?}", plugin);
63      plugin.update_access();
64      Ok(plugin.clone())
65    },
66    None => {
67      debug!("Did not find plugin, will attempt to start it");
68      let manifest = match load_plugin_manifest(plugin) {
69        Ok(manifest) => manifest,
70        Err(err) => {
71          warn!("Could not load plugin manifest from disk, will try auto install it: {}", err);
72          let http_client = reqwest::ClientBuilder::new()
73            .user_agent(USER_AGENT)
74            .build()?;
75          let index = fetch_repository_index(&http_client, None).await?;
76          match index.lookup_plugin_version(&plugin.name, &plugin.version) {
77            Some(entry) => {
78              info!("Found an entry for the plugin in the plugin index, will try install that");
79              install_plugin_from_url(&http_client, entry.source.value().as_str()).await?
80            }
81            None => Err(err)?
82          }
83        }
84      };
85      send_metrics(&manifest);
86      initialise_plugin(&manifest, &mut inner).await
87    }
88  };
89  trace!("load_plugin {:?}: Releasing PLUGIN_REGISTER lock", thread_id);
90  result
91}
92
93fn lookup_plugin_inner<'a>(
94  plugin: &PluginDependency,
95  plugin_register: &'a mut HashMap<String, PactPlugin>
96) -> Option<&'a mut PactPlugin> {
97  if let Some(version) = &plugin.version {
98    plugin_register.get_mut(format!("{}/{}", plugin.name, version).as_str())
99  } else {
100    plugin_register.iter_mut()
101      .filter(|(_, value)| value.manifest.name == plugin.name)
102      .max_by(|(_, v1), (_, v2)| v1.manifest.version.cmp(&v2.manifest.version))
103      .map(|(_, plugin)| plugin)
104  }
105}
106
107/// Look up the plugin in the global plugin register
108pub fn lookup_plugin(plugin: &PluginDependency) -> Option<PactPlugin> {
109  let thread_id = thread::current().id();
110  trace!("lookup_plugin {:?}: Waiting on PLUGIN_REGISTER lock", thread_id);
111  let mut inner = PLUGIN_REGISTER.lock().unwrap();
112  trace!("lookup_plugin {:?}: Got PLUGIN_REGISTER lock", thread_id);
113  let entry = lookup_plugin_inner(plugin, &mut inner);
114  trace!("lookup_plugin {:?}: Releasing PLUGIN_REGISTER lock", thread_id);
115  entry.cloned()
116}
117
118/// Return the plugin manifest for the given plugin. Will first look in the global plugin manifest
119/// registry.
120pub fn load_plugin_manifest(plugin_dep: &PluginDependency) -> anyhow::Result<PactPluginManifest> {
121  debug!("Loading plugin manifest for plugin {:?}", plugin_dep);
122  match lookup_plugin_manifest(plugin_dep) {
123    Some(manifest) => Ok(manifest),
124    None => load_manifest_from_disk(plugin_dep)
125  }
126}
127
128fn load_manifest_from_disk(plugin_dep: &PluginDependency) -> anyhow::Result<PactPluginManifest> {
129  let plugin_dir = pact_plugin_dir()?;
130  debug!("Looking for plugin in {:?}", plugin_dir);
131
132  if plugin_dir.exists() {
133    load_manifest_from_dir(plugin_dep, &plugin_dir)
134  } else {
135    Err(anyhow!("Plugin directory {:?} does not exist", plugin_dir))
136  }
137}
138
139fn load_manifest_from_dir(plugin_dep: &PluginDependency, plugin_dir: &PathBuf) -> anyhow::Result<PactPluginManifest> {
140  let mut manifests = vec![];
141  for entry in fs::read_dir(plugin_dir)? {
142    let path = entry?.path();
143    trace!("Found: {:?}", path);
144
145    if path.is_dir() {
146      let manifest_file = path.join("pact-plugin.json");
147      if manifest_file.exists() && manifest_file.is_file() {
148        debug!("Found plugin manifest: {:?}", manifest_file);
149        let file = File::open(manifest_file)?;
150        let reader = BufReader::new(file);
151        let manifest: PactPluginManifest = serde_json::from_reader(reader)?;
152        trace!("Parsed plugin manifest: {:?}", manifest);
153        let version = manifest.version.clone();
154        if manifest.name == plugin_dep.name && versions_compatible(version.as_str(), &plugin_dep.version) {
155          let manifest = PactPluginManifest {
156            plugin_dir: path.to_string_lossy().to_string(),
157            ..manifest
158          };
159          manifests.push(manifest);
160        }
161      }
162    }
163  }
164
165  let manifest = manifests.iter()
166    .max_by(|a, b| {
167      let a = Version::parse(&a.version).unwrap_or_else(|_| Version::new(0, 0, 0));
168      let b = Version::parse(&b.version).unwrap_or_else(|_| Version::new(0, 0, 0));
169      a.cmp(&b)
170    });
171  if let Some(manifest) = manifest {
172    let key = format!("{}/{}", manifest.name, manifest.version);
173    {
174      let mut guard = PLUGIN_MANIFEST_REGISTER.lock().unwrap();
175      guard.insert(key.clone(), manifest.clone());
176    }
177    Ok(manifest.clone())
178  } else {
179    Err(anyhow!("Plugin {} was not found (in $HOME/.pact/plugins or $PACT_PLUGIN_DIR)", plugin_dep))
180  }
181}
182
183pub(crate) fn pact_plugin_dir() -> anyhow::Result<PathBuf> {
184  let env_var = env::var_os("PACT_PLUGIN_DIR");
185  let plugin_dir = env_var.unwrap_or_default();
186  let plugin_dir = plugin_dir.to_string_lossy();
187  if plugin_dir.is_empty() {
188    home::home_dir().map(|dir| dir.join(".pact").join("plugins"))
189  } else {
190    PathBuf::from_str(plugin_dir.as_ref()).ok()
191  }.ok_or_else(|| anyhow!("No Pact plugin directory was found (in $HOME/.pact/plugins or $PACT_PLUGIN_DIR)"))
192}
193
194/// Lookup the plugin manifest in the global plugin manifest registry.
195pub fn lookup_plugin_manifest(plugin: &PluginDependency) -> Option<PactPluginManifest> {
196  let guard = PLUGIN_MANIFEST_REGISTER.lock().unwrap();
197  if let Some(version) = &plugin.version {
198    let key = format!("{}/{}", plugin.name, version);
199    guard.get(&key).cloned()
200  } else {
201    guard.iter()
202      .filter(|(_, value)| value.name == plugin.name)
203      .max_by(|(_, v1), (_, v2)| v1.version.cmp(&v2.version))
204      .map(|(_, p)| p.clone())
205  }
206}
207
208async fn initialise_plugin(
209  manifest: &PactPluginManifest,
210  plugin_register: &mut HashMap<String, PactPlugin>
211) -> anyhow::Result<PactPlugin> {
212  match manifest.executable_type.as_str() {
213    "exec" => {
214      let mut plugin = start_plugin_process(manifest).await?;
215      debug!("Plugin process started OK (port = {}), sending init message", plugin.port());
216
217      init_handshake(manifest, &mut plugin).await.map_err(|err| {
218        plugin.kill();
219        anyhow!("Failed to send init request to the plugin - {}", err)
220      })?;
221
222      let key = format!("{}/{}", manifest.name, manifest.version);
223      plugin_register.insert(key, plugin.clone());
224
225      Ok(plugin)
226    }
227    _ => Err(anyhow!("Plugin executable type of {} is not supported", manifest.executable_type))
228  }
229}
230
231/// Internal function: public for testing
232pub async fn init_handshake(manifest: &PactPluginManifest, plugin: &mut (dyn PactPluginRpc + Send + Sync)) -> anyhow::Result<()> {
233  let request = InitPluginRequest {
234    implementation: "plugin-driver-rust".to_string(),
235    version: option_env!("CARGO_PKG_VERSION").unwrap_or("0").to_string()
236  };
237  let response = plugin.init_plugin(request).await?;
238  debug!("Got init response {:?} from plugin {}", response, manifest.name);
239  register_plugin_entries(manifest, &response.catalogue);
240  tokio::task::spawn(publish_updated_catalogue());
241  Ok(())
242}
243
244async fn start_plugin_process(manifest: &PactPluginManifest) -> anyhow::Result<PactPlugin> {
245  debug!("Starting plugin with manifest {:?}", manifest);
246
247  let os_info = os_info::get();
248  debug!("Detected OS: {}", os_info);
249  let mut path = if let Some(entry_point) = manifest.entry_points.get(&os_info.to_string()) {
250    PathBuf::from(entry_point)
251  } else if os_info.os_type() == Type::Windows && manifest.entry_points.contains_key("windows") {
252    PathBuf::from(manifest.entry_points.get("windows").unwrap())
253  } else {
254    PathBuf::from(&manifest.entry_point)
255  };
256  if !path.is_absolute() || !path.exists() {
257    path = PathBuf::from(manifest.plugin_dir.clone()).join(path);
258  }
259  debug!("Starting plugin using {:?}", &path);
260
261  let log_level = max_level();
262  let mut child_command = Command::new(path.clone());
263  let mut child_command = child_command
264    .env("LOG_LEVEL", log_level.to_string())
265    .env("RUST_LOG", log_level.to_string())
266    .current_dir(manifest.plugin_dir.clone());
267
268  if let Some(args) = &manifest.args {
269    child_command = child_command.args(args);
270  }
271
272  let child = child_command
273    .stdout(Stdio::piped())
274    .stderr(Stdio::piped())
275    .spawn()
276    .map_err(|err| anyhow!("Was not able to start plugin process for '{}' - {}",
277      path.to_string_lossy(), err))?;
278  let child_pid = child.id();
279  debug!("Plugin {} started with PID {}", manifest.name, child_pid);
280
281  match ChildPluginProcess::new(child, manifest).await {
282    Ok(child) => Ok(PactPlugin::new(manifest, child)),
283    Err(err) => {
284      let mut s = System::new();
285      s.refresh_processes();
286      if let Some(process) = s.process(Pid::from_u32(child_pid)) {
287        #[cfg(not(windows))]
288        process.kill();
289        // revert windows specific logic once https://github.com/GuillaumeGomez/sysinfo/pull/1341/files is merged/released
290        #[cfg(windows)]
291        let _ = Command::new("taskkill.exe").arg("/PID").arg(child_pid.to_string()).arg("/F").arg("/T").output();
292      } else {
293        warn!("Child process with PID {} was not found", child_pid);
294      }
295      Err(err)
296    }
297  }
298}
299
300/// Shut down all plugin processes
301pub fn shutdown_plugins() {
302  let thread_id = thread::current().id();
303  debug!("Shutting down all plugins");
304  trace!("shutdown_plugins {:?}: Waiting on PLUGIN_REGISTER lock", thread_id);
305  let mut guard = PLUGIN_REGISTER.lock().unwrap();
306  trace!("shutdown_plugins {:?}: Got PLUGIN_REGISTER lock", thread_id);
307  for plugin in guard.values() {
308    debug!("Shutting down plugin {:?}", plugin);
309    plugin.kill();
310    remove_plugin_entries(&plugin.manifest.name);
311  }
312  guard.clear();
313  trace!("shutdown_plugins {:?}: Releasing PLUGIN_REGISTER lock", thread_id);
314}
315
316/// Shutdown the given plugin
317pub fn shutdown_plugin(plugin: &mut PactPlugin) {
318  debug!("Shutting down plugin {}:{}", plugin.manifest.name, plugin.manifest.version);
319  plugin.kill();
320  remove_plugin_entries(&plugin.manifest.name);
321}
322
323/// Publish the current catalogue to all plugins
324pub async fn publish_updated_catalogue() {
325  let thread_id = thread::current().id();
326
327  let request = Catalogue {
328    catalogue: all_entries().iter()
329      .map(|entry| crate::proto::CatalogueEntry {
330        r#type: entry.entry_type.to_proto_type() as i32,
331        key: entry.key.clone(),
332        values: entry.values.clone()
333      }).collect()
334  };
335
336  let plugins = {
337    trace!("publish_updated_catalogue {:?}: Waiting on PLUGIN_REGISTER lock", thread_id);
338    let inner = PLUGIN_REGISTER.lock().unwrap();
339    trace!("publish_updated_catalogue {:?}: Got PLUGIN_REGISTER lock", thread_id);
340    let plugins = inner.values().cloned().collect::<Vec<_>>();
341    trace!("publish_updated_catalogue {:?}: Releasing PLUGIN_REGISTER lock", thread_id);
342    plugins
343  };
344
345  for plugin in plugins {
346    if let Err(err) = plugin.update_catalogue(request.clone()).await {
347      warn!("Failed to send updated catalogue to plugin '{}' - {}", plugin.manifest.name, err);
348    }
349  }
350}
351
352/// Increment access to the plugin.
353#[tracing::instrument]
354pub fn increment_plugin_access(plugin: &PluginDependency) {
355  let thread_id = thread::current().id();
356
357  trace!("increment_plugin_access {:?}: Waiting on PLUGIN_REGISTER lock", thread_id);
358  let mut inner = PLUGIN_REGISTER.lock().unwrap();
359  trace!("increment_plugin_access {:?}: Got PLUGIN_REGISTER lock", thread_id);
360
361  if let Some(plugin) = lookup_plugin_inner(plugin, &mut inner) {
362    plugin.update_access();
363  }
364
365  trace!("increment_plugin_access {:?}: Releasing PLUGIN_REGISTER lock", thread_id);
366}
367
368/// Decrement access to the plugin. If the current access count is zero, shut down the plugin
369#[tracing::instrument]
370pub fn drop_plugin_access(plugin: &PluginDependency) {
371  let thread_id = thread::current().id();
372
373  trace!("drop_plugin_access {:?}: Waiting on PLUGIN_REGISTER lock", thread_id);
374  let mut inner = PLUGIN_REGISTER.lock().unwrap();
375  trace!("drop_plugin_access {:?}: Got PLUGIN_REGISTER lock", thread_id);
376
377  if let Some(plugin) = lookup_plugin_inner(plugin, &mut inner) {
378    let key = format!("{}/{}", plugin.manifest.name, plugin.manifest.version);
379    if plugin.drop_access() == 0 {
380      shutdown_plugin(plugin);
381      inner.remove(key.as_str());
382    }
383  }
384
385  trace!("drop_plugin_access {:?}: Releasing PLUGIN_REGISTER lock", thread_id);
386}
387
388/// Starts a mock server given the catalog entry for it and a Pact
389#[deprecated(note = "Use start_mock_server_v2 which takes a test context map", since = "0.2.2")]
390pub async fn start_mock_server(
391  catalogue_entry: &CatalogueEntry,
392  pact: Box<dyn Pact + Send + Sync>,
393  config: MockServerConfig
394) -> anyhow::Result<MockServerDetails> {
395  start_mock_server_v2(catalogue_entry, pact, config, hashmap!{}).await
396}
397
398/// Starts a mock server given the catalog entry for it and a Pact.
399/// Any transport specific configuration must be passed in the `test_context` field under
400/// the `transport_config` key. Note that the next version of the interface will pass the
401/// transport specific configuration in its own field.
402pub async fn start_mock_server_v2(
403  catalogue_entry: &CatalogueEntry,
404  pact: Box<dyn Pact + Send + Sync>,
405  config: MockServerConfig,
406  test_context: HashMap<String, Value>
407) -> anyhow::Result<MockServerDetails> {
408  let manifest = catalogue_entry.plugin.as_ref()
409    .ok_or_else(|| anyhow!("Catalogue entry did not have an associated plugin manifest"))?;
410  let plugin = lookup_plugin(&manifest.as_dependency())
411    .ok_or_else(|| anyhow!("Did not find a running plugin for manifest {:?}", manifest))?;
412
413  debug!(plugin_name = manifest.name.as_str(), plugin_version = manifest.version.as_str(),
414    ?test_context, "Sending startMockServer request to plugin");
415  let request = StartMockServerRequest {
416    host_interface: config.host_interface.unwrap_or_default(),
417    port: config.port,
418    tls: config.tls,
419    pact: pact.to_json(PactSpecification::V4)?.to_string(),
420    test_context: Some(to_proto_struct(&test_context))
421  };
422  let response = plugin.start_mock_server(request).await?;
423  debug!("Got response ${response:?}");
424
425  let mock_server_response = response.response
426    .ok_or_else(|| anyhow!("Did not get a valid response from the start mock server call"))?;
427  match mock_server_response {
428    start_mock_server_response::Response::Error(err) => Err(anyhow!("Mock server failed to start: {}", err)),
429    start_mock_server_response::Response::Details(details) => Ok(MockServerDetails {
430      key: details.key.clone(),
431      base_url: details.address.clone(),
432      port: details.port,
433      plugin
434    })
435  }
436}
437
438/// Shutdowns a running mock server. Will return any errors from the mock server.
439pub async fn shutdown_mock_server(mock_server: &MockServerDetails) -> anyhow::Result<Vec<MockServerResults>> {
440  let request = ShutdownMockServerRequest {
441    server_key: mock_server.key.to_string()
442  };
443
444  debug!(
445    plugin_name = mock_server.plugin.manifest.name.as_str(),
446    plugin_version = mock_server.plugin.manifest.version.as_str(),
447    server_key = mock_server.key.as_str(),
448    "Sending shutdownMockServer request to plugin"
449  );
450  let response = mock_server.plugin.shutdown_mock_server(request).await?;
451  debug!("Got response: {response:?}");
452
453  if response.ok {
454    Ok(vec![])
455  } else {
456    Ok(response.results.iter().map(|result| {
457      MockServerResults {
458        path: result.path.clone(),
459        error: result.error.clone(),
460        mismatches: result.mismatches.iter().map(|mismatch| {
461          ContentMismatch {
462            expected: mismatch.expected.as_ref()
463              .map(|e| from_utf8(&e).unwrap_or_default().to_string())
464              .unwrap_or_default(),
465            actual: mismatch.actual.as_ref()
466              .map(|a| from_utf8(&a).unwrap_or_default().to_string())
467              .unwrap_or_default(),
468            mismatch: mismatch.mismatch.clone(),
469            path: mismatch.path.clone(),
470            diff: optional_string(&mismatch.diff),
471            mismatch_type: optional_string(&mismatch.mismatch_type)
472          }
473        }).collect()
474      }
475    }).collect())
476  }
477}
478
479/// Gets the results from a running mock server.
480pub async fn get_mock_server_results(mock_server: &MockServerDetails) -> anyhow::Result<Vec<MockServerResults>> {
481  let request = MockServerRequest {
482    server_key: mock_server.key.to_string()
483  };
484
485  debug!(
486    plugin_name = mock_server.plugin.manifest.name.as_str(),
487    plugin_version = mock_server.plugin.manifest.version.as_str(),
488    server_key = mock_server.key.as_str(),
489    "Sending getMockServerResults request to plugin"
490  );
491  let response = mock_server.plugin.get_mock_server_results(request).await?;
492  debug!("Got response: {response:?}");
493
494  if response.ok {
495    Ok(vec![])
496  } else {
497    Ok(response.results.iter().map(|result| {
498      MockServerResults {
499        path: result.path.clone(),
500        error: result.error.clone(),
501        mismatches: result.mismatches.iter().map(|mismatch| {
502          ContentMismatch {
503            expected: mismatch.expected.as_ref()
504              .map(|e| from_utf8(&e).unwrap_or_default().to_string())
505              .unwrap_or_default(),
506            actual: mismatch.actual.as_ref()
507              .map(|a| from_utf8(&a).unwrap_or_default().to_string())
508              .unwrap_or_default(),
509            mismatch: mismatch.mismatch.clone(),
510            path: mismatch.path.clone(),
511            diff: optional_string(&mismatch.diff),
512            mismatch_type: optional_string(&mismatch.mismatch_type)
513          }
514        }).collect()
515      }
516    }).collect())
517  }
518}
519
520/// Sets up a transport request to be made. This is the first phase when verifying, and it allows the
521/// users to add additional values to any requests that are made.
522pub async fn prepare_validation_for_interaction(
523  transport_entry: &CatalogueEntry,
524  pact: &V4Pact,
525  interaction: &(dyn V4Interaction + Send + Sync),
526  context: &HashMap<String, Value>
527) -> anyhow::Result<InteractionVerificationData> {
528  let manifest = transport_entry.plugin.as_ref()
529    .ok_or_else(|| anyhow!("Transport catalogue entry did not have an associated plugin manifest"))?;
530  let plugin = lookup_plugin(&manifest.as_dependency())
531    .ok_or_else(|| anyhow!("Did not find a running plugin for manifest {:?}", manifest))?;
532
533  prepare_validation_for_interaction_inner(&plugin, manifest, pact, interaction, context).await
534}
535
536pub(crate) async fn prepare_validation_for_interaction_inner(
537  plugin: &(dyn PactPluginRpc + Send + Sync),
538  manifest: &PactPluginManifest,
539  pact: &V4Pact,
540  interaction: &(dyn V4Interaction + Send + Sync),
541  context: &HashMap<String, Value>
542) -> anyhow::Result<InteractionVerificationData> {
543  let mut pact = pact.clone();
544  pact.interactions = pact.interactions.iter()
545    .map(|i| {
546      if i.key().is_none() {
547        i.with_unique_key()
548      } else {
549        i.boxed_v4()
550      }
551    })
552    .collect();
553  let request = VerificationPreparationRequest {
554    pact: pact.to_json(PactSpecification::V4)?.to_string(),
555    interaction_key: interaction.unique_key(),
556    config: Some(to_proto_struct(context))
557  };
558
559  debug!(plugin_name = manifest.name.as_str(), plugin_version = manifest.version.as_str(),
560    "Sending prepareValidationForInteraction request to plugin");
561  let response = plugin.prepare_interaction_for_verification(request).await?;
562  debug!("Got response: {response:?}");
563
564  let validation_response = response.response
565    .ok_or_else(|| anyhow!("Did not get a valid response from the prepare interaction for verification call"))?;
566  match &validation_response {
567    verification_preparation_response::Response::Error(err) => Err(anyhow!("Failed to prepare the request: {}", err)),
568    verification_preparation_response::Response::InteractionData(data) => {
569      let content_type = data.body.as_ref().and_then(|body| ContentType::parse(body.content_type.as_str()).ok());
570      Ok(InteractionVerificationData {
571        request_data: data.body.as_ref()
572          .and_then(|body| body.content.as_ref())
573          .map(|body| OptionalBody::Present(Bytes::from(body.clone()), content_type, None)).unwrap_or_default(),
574        metadata: data.metadata.iter().map(|(k, v)| {
575          let value = match &v.value {
576            Some(v) => match &v {
577              metadata_value::Value::NonBinaryValue(v) => Either::Left(proto_value_to_json(v)),
578              metadata_value::Value::BinaryValue(b) => Either::Right(Bytes::from(b.clone()))
579            }
580            None => Either::Left(Value::Null)
581          };
582          (k.clone(), value)
583        }).collect()
584      })
585    }
586  }
587}
588
589/// Executes the verification of the interaction that was configured with the prepare_validation_for_interaction call
590pub async fn verify_interaction(
591  transport_entry: &CatalogueEntry,
592  verification_data: &InteractionVerificationData,
593  config: &HashMap<String, Value>,
594  pact: &V4Pact,
595  interaction: &(dyn V4Interaction + Send + Sync)
596) -> anyhow::Result<InteractionVerificationResult> {
597  let manifest = transport_entry.plugin.as_ref()
598    .ok_or_else(|| anyhow!("Transport catalogue entry did not have an associated plugin manifest"))?;
599  let plugin = lookup_plugin(&manifest.as_dependency())
600    .ok_or_else(|| anyhow!("Did not find a running plugin for manifest {:?}", manifest))?;
601
602  verify_interaction_inner(
603    &plugin,
604    &manifest,
605    verification_data,
606    config,
607    pact,
608    interaction
609  ).await
610}
611
612pub(crate) async fn verify_interaction_inner(
613  plugin: &(dyn PactPluginRpc + Send + Sync),
614  manifest: &PactPluginManifest,
615  verification_data: &InteractionVerificationData,
616  config: &HashMap<String, Value>,
617  pact: &V4Pact,
618  interaction: &(dyn V4Interaction + Send + Sync)
619) -> anyhow::Result<InteractionVerificationResult> {
620  let mut pact = pact.clone();
621  pact.interactions = pact.interactions.iter()
622    .map(|i| {
623      if i.key().is_none() {
624        i.with_unique_key()
625      } else {
626        i.boxed_v4()
627      }
628    })
629    .collect();
630  let request = VerifyInteractionRequest {
631    pact: pact.to_json(PactSpecification::V4)?.to_string(),
632    interaction_key: interaction.unique_key(),
633    config: Some(to_proto_struct(config)),
634    interaction_data: Some(InteractionData {
635      body: Some((&verification_data.request_data).into()),
636      metadata: verification_data.metadata.iter().map(|(k, v)| {
637        (k.clone(), MetadataValue { value: Some(match v {
638          Either::Left(value) => metadata_value::Value::NonBinaryValue(to_proto_value(value)),
639          Either::Right(b) => metadata_value::Value::BinaryValue(b.to_vec())
640        }) })
641      }).collect()
642    })
643  };
644
645  debug!(plugin_name = manifest.name.as_str(), plugin_version = manifest.version.as_str(),
646    "Sending verifyInteraction request to plugin");
647  let response = plugin.verify_interaction(request).await?;
648  debug!("Got response: {response:?}");
649
650  let validation_response = response.response
651    .ok_or_else(|| anyhow!("Did not get a valid response from the verification call"))?;
652  match &validation_response {
653    verify_interaction_response::Response::Error(err) => Err(anyhow!("Failed to verify the request: {}", err)),
654    verify_interaction_response::Response::Result(data) => Ok(data.into())
655  }
656}
657
658/// Tries to download and install the plugin from the given URL, returning the manifest for the
659/// plugin if successful.
660pub async fn install_plugin_from_url(
661  http_client: &Client,
662  source_url: &str
663) -> anyhow::Result<PactPluginManifest> {
664  let response = fetch_json_from_url(source_url, http_client).await?;
665  if let Some(map) = response.as_object() {
666    if let Some(tag) = map.get("tag_name") {
667      let tag = json_to_string(tag);
668      debug!(%tag, "Found tag");
669      let url = if source_url.ends_with("/latest") {
670        source_url.strip_suffix("/latest").unwrap_or(source_url)
671      } else {
672        let suffix = format!("/tag/{}", tag);
673        source_url.strip_suffix(suffix.as_str()).unwrap_or(source_url)
674      };
675      let manifest_json = download_json_from_github(&http_client, url, &tag, "pact-plugin.json")
676        .await.context("Downloading manifest file from GitHub")?;
677      let manifest: PactPluginManifest = serde_json::from_value(manifest_json)
678        .context("Failed to parsing JSON manifest file from GitHub")?;
679      debug!(?manifest, "Loaded manifest from GitHub");
680
681      debug!("Installing plugin {} version {}", manifest.name, manifest.version);
682      let plugin_dir = create_plugin_dir(&manifest)
683        .context("Failed to creating plugins directory")?;
684      download_plugin_executable(&manifest, &plugin_dir, &http_client, url, &tag, false).await?;
685
686      Ok(PactPluginManifest {
687        plugin_dir: plugin_dir.to_string_lossy().to_string(),
688        .. manifest
689      })
690    } else {
691      bail!("GitHub release page does not have a valid tag_name attribute");
692    }
693  } else {
694    bail!("Response from source is not a valid JSON from a GitHub release page")
695  }
696}
697
698fn create_plugin_dir(manifest: &PactPluginManifest) -> anyhow::Result<PathBuf> {
699  let plugins_dir = pact_plugin_dir()?;
700  if !plugins_dir.exists() {
701    info!(plugins_dir = %plugins_dir.display(), "Creating plugins directory");
702    fs::create_dir_all(plugins_dir.clone())?;
703  }
704
705  let plugin_dir = plugins_dir.join(format!("{}-{}", manifest.name, manifest.version));
706  info!(plugin_dir = %plugin_dir.display(), "Creating plugin directory");
707  fs::create_dir(plugin_dir.clone())?;
708
709  info!("Writing plugin manifest file");
710  let file_name = plugin_dir.join("pact-plugin.json");
711  let mut f = File::create(file_name)?;
712  let json = serde_json::to_string(manifest)?;
713  f.write_all(json.as_bytes())?;
714
715  Ok(plugin_dir.clone())
716}
717
718#[cfg(test)]
719mod tests {
720  use std::fs::{self, File};
721
722  use maplit::hashmap;
723  use pact_models::v4::sync_message::SynchronousMessage;
724  use pact_models::prelude::v4::V4Pact;
725  use pact_models::v4::interaction::V4Interaction;
726
727  use expectest::prelude::*;
728  use tempdir::TempDir;
729
730  use crate::plugin_models::PluginDependency;
731  use crate::plugin_manager::prepare_validation_for_interaction_inner;
732  use crate::plugin_manager::verify_interaction_inner;
733  use crate::plugin_models::tests::MockPlugin;
734  use crate::verification::InteractionVerificationData;
735
736  use super::{
737    load_manifest_from_dir,
738    PactPluginManifest
739  };
740
741  #[test]
742  fn load_manifest_from_dir_test() {
743    let tmp_dir = TempDir::new("load_manifest_from_dir").unwrap();
744
745    let manifest_1 = PactPluginManifest {
746      name: "test-plugin".to_string(),
747      version: "0.1.5".to_string(),
748      .. PactPluginManifest::default()
749    };
750    let path_1 = tmp_dir.path().join("1");
751    fs::create_dir_all(&path_1).unwrap();
752    let file_1 = File::create(path_1.join("pact-plugin.json")).unwrap();
753    serde_json::to_writer(file_1, &manifest_1).unwrap();
754
755    let manifest_2 = PactPluginManifest {
756      name: "test-plugin".to_string(),
757      version: "0.1.20".to_string(),
758      .. PactPluginManifest::default()
759    };
760    let path_2 = tmp_dir.path().join("2");
761    fs::create_dir_all(&path_2).unwrap();
762    let file_2 = File::create(path_2.join("pact-plugin.json")).unwrap();
763    serde_json::to_writer(file_2, &manifest_2).unwrap();
764
765    let manifest_3 = PactPluginManifest {
766      name: "test-plugin".to_string(),
767      version: "0.1.7".to_string(),
768      .. PactPluginManifest::default()
769    };
770    let path_3 = tmp_dir.path().join("3");
771    fs::create_dir_all(&path_3).unwrap();
772    let file_3 = File::create(path_3.join("pact-plugin.json")).unwrap();
773    serde_json::to_writer(file_3, &manifest_3).unwrap();
774
775    let manifest_4 = PactPluginManifest {
776      name: "test-plugin".to_string(),
777      version: "0.1.14".to_string(),
778      .. PactPluginManifest::default()
779    };
780    let path_4 = tmp_dir.path().join("4");
781    fs::create_dir_all(&path_4).unwrap();
782    let file_4 = File::create(path_4.join("pact-plugin.json")).unwrap();
783    serde_json::to_writer(file_4, &manifest_4).unwrap();
784
785    let manifest_5 = PactPluginManifest {
786      name: "test-plugin".to_string(),
787      version: "0.1.12".to_string(),
788      .. PactPluginManifest::default()
789    };
790    let path_5 = tmp_dir.path().join("5");
791    fs::create_dir_all(&path_5).unwrap();
792    let file_5 = File::create(path_5.join("pact-plugin.json")).unwrap();
793    serde_json::to_writer(file_5, &manifest_5).unwrap();
794
795    let dep = PluginDependency {
796      name: "test-plugin".to_string(),
797      version: None,
798      dependency_type: Default::default()
799    };
800
801    let result = load_manifest_from_dir(&dep, &tmp_dir.path().to_path_buf()).unwrap();
802    expect!(result.version).to(be_equal_to("0.1.20"));
803  }
804
805  #[test_log::test(tokio::test)]
806  async fn prepare_validation_for_interaction_passes_in_pact_with_interaction_keys_set() {
807    let manifest = PactPluginManifest {
808      name: "test-plugin".to_string(),
809      version: "0.0.0".to_string(),
810      .. PactPluginManifest::default()
811    };
812    let mock_plugin = MockPlugin::default();
813
814    let interaction = SynchronousMessage {
815      .. SynchronousMessage::default()
816    };
817    let pact = V4Pact {
818      interactions: vec![ interaction.boxed_v4() ],
819      .. V4Pact::default()
820    };
821    let context = hashmap!{};
822
823    let result = prepare_validation_for_interaction_inner(
824      &mock_plugin,
825      &manifest,
826      &pact,
827      &interaction,
828      &context
829    ).await;
830
831    expect!(result).to(be_ok());
832    let request = {
833      let r = mock_plugin.prepare_request.read().unwrap();
834      r.clone()
835    };
836    let pact_in = V4Pact::pact_from_json(&serde_json::from_str(request.pact.as_str()).unwrap(), "").unwrap();
837    expect!(pact_in.interactions[0].key().unwrap()).to(be_equal_to(request.interaction_key));
838  }
839
840  #[test_log::test(tokio::test)]
841  async fn prepare_validation_for_interaction_handles_pact_with_keys_already_set() {
842    let manifest = PactPluginManifest {
843      name: "test-plugin".to_string(),
844      version: "0.0.0".to_string(),
845      .. PactPluginManifest::default()
846    };
847    let mock_plugin = MockPlugin::default();
848
849    let interaction = SynchronousMessage {
850      key: Some("1234567890".to_string()),
851      .. SynchronousMessage::default()
852    };
853    let pact = V4Pact {
854      interactions: vec![ interaction.boxed_v4() ],
855      .. V4Pact::default()
856    };
857    let context = hashmap!{};
858
859    let result = prepare_validation_for_interaction_inner(
860      &mock_plugin,
861      &manifest,
862      &pact,
863      &interaction,
864      &context
865    ).await;
866
867    expect!(result).to(be_ok());
868    let request = {
869      let r = mock_plugin.prepare_request.read().unwrap();
870      r.clone()
871    };
872    let pact_in = V4Pact::pact_from_json(&serde_json::from_str(request.pact.as_str()).unwrap(), "").unwrap();
873    expect!(request.interaction_key.as_str()).to(be_equal_to("1234567890"));
874    expect!(pact_in.interactions[0].key().unwrap()).to(be_equal_to(request.interaction_key));
875  }
876
877  #[test_log::test(tokio::test)]
878  async fn verify_interaction_passes_in_pact_with_interaction_keys_set() {
879    let manifest = PactPluginManifest {
880      name: "test-plugin".to_string(),
881      version: "0.0.0".to_string(),
882      .. PactPluginManifest::default()
883    };
884    let mock_plugin = MockPlugin::default();
885
886    let interaction = SynchronousMessage {
887      .. SynchronousMessage::default()
888    };
889    let pact = V4Pact {
890      interactions: vec![ interaction.boxed_v4() ],
891      .. V4Pact::default()
892    };
893    let context = hashmap!{};
894    let data = InteractionVerificationData::default();
895
896    let result = verify_interaction_inner(
897      &mock_plugin,
898      &manifest,
899      &data,
900      &context,
901      &pact,
902      &interaction
903    ).await;
904
905    expect!(result).to(be_ok());
906    let request = {
907      let r = mock_plugin.verify_request.read().unwrap();
908      r.clone()
909    };
910    let pact_in = V4Pact::pact_from_json(&serde_json::from_str(request.pact.as_str()).unwrap(), "").unwrap();
911    expect!(pact_in.interactions[0].key().unwrap()).to(be_equal_to(request.interaction_key));
912  }
913
914  #[test_log::test(tokio::test)]
915  async fn verify_interaction_handles_interaction_with_key_already_set() {
916    let manifest = PactPluginManifest {
917      name: "test-plugin".to_string(),
918      version: "0.0.0".to_string(),
919      .. PactPluginManifest::default()
920    };
921    let mock_plugin = MockPlugin::default();
922
923    let interaction = SynchronousMessage {
924      key: Some("1234567890".to_string()),
925      .. SynchronousMessage::default()
926    };
927    let pact = V4Pact {
928      interactions: vec![ interaction.boxed_v4() ],
929      .. V4Pact::default()
930    };
931    let context = hashmap!{};
932    let data = InteractionVerificationData::default();
933
934    let result = verify_interaction_inner(
935      &mock_plugin,
936      &manifest,
937      &data,
938      &context,
939      &pact,
940      &interaction
941    ).await;
942
943    expect!(result).to(be_ok());
944    let request = {
945      let r = mock_plugin.verify_request.read().unwrap();
946      r.clone()
947    };
948    let pact_in = V4Pact::pact_from_json(&serde_json::from_str(request.pact.as_str()).unwrap(), "").unwrap();
949    expect!(request.interaction_key.as_str()).to(be_equal_to("1234567890"));
950    expect!(pact_in.interactions[0].key().unwrap()).to(be_equal_to(request.interaction_key));
951  }
952}