pact_plugin_driver/
plugin_manager.rs

1//! Manages interactions with Pact plugins
2use std::collections::HashMap;
3use std::env;
4use std::fs;
5use std::fs::File;
6use std::io::{BufReader, Write};
7use std::path::PathBuf;
8use std::process::Stdio;
9use std::process::Command;
10use std::str::from_utf8;
11use std::str::FromStr;
12use std::sync::Mutex;
13use std::thread;
14
15use anyhow::{anyhow, bail, Context};
16use bytes::Bytes;
17use itertools::Either;
18use lazy_static::lazy_static;
19use log::max_level;
20use maplit::hashmap;
21use os_info::Type;
22use pact_models::bodies::OptionalBody;
23use pact_models::json_utils::json_to_string;
24use pact_models::PactSpecification;
25use pact_models::prelude::{ContentType, Pact};
26use pact_models::prelude::v4::V4Pact;
27use pact_models::v4::interaction::V4Interaction;
28use reqwest::Client;
29use semver::Version;
30use serde_json::Value;
31use sysinfo::{Pid,System};
32use tracing::{debug, info, trace, warn};
33
34use crate::catalogue_manager::{all_entries, CatalogueEntry, register_plugin_entries, remove_plugin_entries};
35use crate::child_process::ChildPluginProcess;
36use crate::content::ContentMismatch;
37use crate::download::{download_json_from_github, download_plugin_executable, fetch_json_from_url};
38use crate::metrics::send_metrics;
39use crate::mock_server::{MockServerConfig, MockServerDetails, MockServerResults};
40use crate::plugin_models::{PactPlugin, PactPluginManifest, PactPluginRpc, PluginDependency};
41use crate::proto::*;
42use crate::repository::{fetch_repository_index, USER_AGENT};
43use crate::utils::{optional_string, proto_value_to_json, to_proto_struct, to_proto_value, versions_compatible};
44use crate::verification::{InteractionVerificationData, InteractionVerificationResult};
45
lazy_static! {
  // Plugin manifests that have been loaded from disk, keyed by "<name>/<version>".
  static ref PLUGIN_MANIFEST_REGISTER: Mutex<HashMap<String, PactPluginManifest>> = Mutex::new(HashMap::new());
  // Plugins that have been started and initialised, keyed by "<name>/<version>".
  static ref PLUGIN_REGISTER: Mutex<HashMap<String, PactPlugin>> = Mutex::new(HashMap::new());
}
50
/// Load the plugin defined by the dependency information. Will first look in the global
/// plugin registry.
///
/// If no matching plugin is registered, the manifest is resolved with `load_plugin_manifest`.
/// When that fails, the plugin repository index is fetched and, if it contains an entry for the
/// requested name and version, the plugin is installed from that entry's source URL. The plugin
/// is then started and registered via `initialise_plugin`.
///
/// # Errors
/// Returns an error if the HTTP client can not be built, the repository index can not be fetched,
/// the install fails, the plugin can not be initialised, or the manifest could not be loaded and
/// the index has no matching entry (in which case the original manifest load error is returned).
///
/// # Panics
/// Panics if the `PLUGIN_REGISTER` mutex has been poisoned.
pub async fn load_plugin(plugin: &PluginDependency) -> anyhow::Result<PactPlugin> {
  let thread_id = thread::current().id();
  debug!("Loading plugin {:?}", plugin);
  trace!("Rust plugin driver version {}", option_env!("CARGO_PKG_VERSION").unwrap_or_default());
  trace!("load_plugin {:?}: Waiting on PLUGIN_REGISTER lock", thread_id);
  // NOTE(review): this std::sync::Mutex guard stays alive until the function returns, so it is
  // still held while the futures below are awaited.
  let mut inner = PLUGIN_REGISTER.lock().unwrap();
  trace!("load_plugin {:?}: Got PLUGIN_REGISTER lock", thread_id);
  let result = match lookup_plugin_inner(plugin, &mut inner) {
    Some(plugin) => {
      // Already registered: record the access and hand back a clone of the registered plugin.
      debug!("Found running plugin {:?}", plugin);
      plugin.update_access();
      Ok(plugin.clone())
    },
    None => {
      debug!("Did not find plugin, will attempt to start it");
      let manifest = match load_plugin_manifest(plugin) {
        Ok(manifest) => manifest,
        Err(err) => {
          // No usable manifest locally, so fall back to the plugin repository index.
          warn!("Could not load plugin manifest from disk, will try auto install it: {}", err);
          let http_client = reqwest::ClientBuilder::new()
            .user_agent(USER_AGENT)
            .build()?;
          let index = fetch_repository_index(&http_client, None).await?;
          match index.lookup_plugin_version(&plugin.name, &plugin.version) {
            Some(entry) => {
              info!("Found an entry for the plugin in the plugin index, will try install that");
              install_plugin_from_url(&http_client, entry.source.value().as_str()).await?
            }
            // Nothing to install from: surface the original manifest load error.
            None => Err(err)?
          }
        }
      };
      send_metrics(&manifest);
      initialise_plugin(&manifest, &mut inner).await
    }
  };
  trace!("load_plugin {:?}: Releasing PLUGIN_REGISTER lock", thread_id);
  result
}
92
93fn lookup_plugin_inner<'a>(
94  plugin: &PluginDependency,
95  plugin_register: &'a mut HashMap<String, PactPlugin>
96) -> Option<&'a mut PactPlugin> {
97  if let Some(version) = &plugin.version {
98    plugin_register.get_mut(format!("{}/{}", plugin.name, version).as_str())
99  } else {
100    plugin_register.iter_mut()
101      .filter(|(_, value)| value.manifest.name == plugin.name)
102      .max_by(|(_, v1), (_, v2)| v1.manifest.version.cmp(&v2.manifest.version))
103      .map(|(_, plugin)| plugin)
104  }
105}
106
107/// Look up the plugin in the global plugin register
108pub fn lookup_plugin(plugin: &PluginDependency) -> Option<PactPlugin> {
109  let thread_id = thread::current().id();
110  trace!("lookup_plugin {:?}: Waiting on PLUGIN_REGISTER lock", thread_id);
111  let mut inner = PLUGIN_REGISTER.lock().unwrap();
112  trace!("lookup_plugin {:?}: Got PLUGIN_REGISTER lock", thread_id);
113  let entry = lookup_plugin_inner(plugin, &mut inner);
114  trace!("lookup_plugin {:?}: Releasing PLUGIN_REGISTER lock", thread_id);
115  entry.cloned()
116}
117
118/// Return the plugin manifest for the given plugin. Will first look in the global plugin manifest
119/// registry.
120pub fn load_plugin_manifest(plugin_dep: &PluginDependency) -> anyhow::Result<PactPluginManifest> {
121  debug!("Loading plugin manifest for plugin {:?}", plugin_dep);
122  match lookup_plugin_manifest(plugin_dep) {
123    Some(manifest) => Ok(manifest),
124    None => load_manifest_from_disk(plugin_dep)
125  }
126}
127
128fn load_manifest_from_disk(plugin_dep: &PluginDependency) -> anyhow::Result<PactPluginManifest> {
129  let plugin_dir = pact_plugin_dir()?;
130  debug!("Looking for plugin in {:?}", plugin_dir);
131
132  if plugin_dir.exists() {
133    load_manifest_from_dir(plugin_dep, &plugin_dir)
134  } else {
135    Err(anyhow!("Plugin directory {:?} does not exist", plugin_dir))
136  }
137}
138
139fn load_manifest_from_dir(plugin_dep: &PluginDependency, plugin_dir: &PathBuf) -> anyhow::Result<PactPluginManifest> {
140  let mut manifests = vec![];
141  for entry in fs::read_dir(plugin_dir)? {
142    let path = entry?.path();
143    trace!("Found: {:?}", path);
144
145    if path.is_dir() {
146      let manifest_file = path.join("pact-plugin.json");
147      if manifest_file.exists() && manifest_file.is_file() {
148        debug!("Found plugin manifest: {:?}", manifest_file);
149        let file = File::open(manifest_file)?;
150        let reader = BufReader::new(file);
151        let manifest: PactPluginManifest = serde_json::from_reader(reader)?;
152        trace!("Parsed plugin manifest: {:?}", manifest);
153        let version = manifest.version.clone();
154        if manifest.name == plugin_dep.name && versions_compatible(version.as_str(), &plugin_dep.version) {
155          let manifest = PactPluginManifest {
156            plugin_dir: path.to_string_lossy().to_string(),
157            ..manifest
158          };
159          manifests.push(manifest);
160        }
161      }
162    }
163  }
164
165  let manifest = manifests.iter()
166    .max_by(|a, b| {
167      let a = Version::parse(&a.version).unwrap_or_else(|_| Version::new(0, 0, 0));
168      let b = Version::parse(&b.version).unwrap_or_else(|_| Version::new(0, 0, 0));
169      a.cmp(&b)
170    });
171  if let Some(manifest) = manifest {
172    let key = format!("{}/{}", manifest.name, manifest.version);
173    {
174      let mut guard = PLUGIN_MANIFEST_REGISTER.lock().unwrap();
175      guard.insert(key.clone(), manifest.clone());
176    }
177    Ok(manifest.clone())
178  } else {
179    Err(anyhow!("Plugin {} was not found (in $HOME/.pact/plugins or $PACT_PLUGIN_DIR)", plugin_dep))
180  }
181}
182
183pub(crate) fn pact_plugin_dir() -> anyhow::Result<PathBuf> {
184  let env_var = env::var_os("PACT_PLUGIN_DIR");
185  let plugin_dir = env_var.unwrap_or_default();
186  let plugin_dir = plugin_dir.to_string_lossy();
187  if plugin_dir.is_empty() {
188    home::home_dir().map(|dir| dir.join(".pact").join("plugins"))
189  } else {
190    PathBuf::from_str(plugin_dir.as_ref()).ok()
191  }.ok_or_else(|| anyhow!("No Pact plugin directory was found (in $HOME/.pact/plugins or $PACT_PLUGIN_DIR)"))
192}
193
194/// Lookup the plugin manifest in the global plugin manifest registry.
195pub fn lookup_plugin_manifest(plugin: &PluginDependency) -> Option<PactPluginManifest> {
196  let guard = PLUGIN_MANIFEST_REGISTER.lock().unwrap();
197  if let Some(version) = &plugin.version {
198    let key = format!("{}/{}", plugin.name, version);
199    guard.get(&key).cloned()
200  } else {
201    guard.iter()
202      .filter(|(_, value)| value.name == plugin.name)
203      .max_by(|(_, v1), (_, v2)| v1.version.cmp(&v2.version))
204      .map(|(_, p)| p.clone())
205  }
206}
207
208async fn initialise_plugin(
209  manifest: &PactPluginManifest,
210  plugin_register: &mut HashMap<String, PactPlugin>
211) -> anyhow::Result<PactPlugin> {
212  match manifest.executable_type.as_str() {
213    "exec" => {
214      let mut plugin = start_plugin_process(manifest).await?;
215      debug!("Plugin process started OK (port = {}), sending init message", plugin.port());
216
217      init_handshake(manifest, &mut plugin).await.map_err(|err| {
218        plugin.kill();
219        anyhow!("Failed to send init request to the plugin - {}", err)
220      })?;
221
222      let key = format!("{}/{}", manifest.name, manifest.version);
223      plugin_register.insert(key, plugin.clone());
224
225      Ok(plugin)
226    }
227    _ => Err(anyhow!("Plugin executable type of {} is not supported", manifest.executable_type))
228  }
229}
230
231/// Internal function: public for testing
232pub async fn init_handshake(manifest: &PactPluginManifest, plugin: &mut (dyn PactPluginRpc + Send + Sync)) -> anyhow::Result<()> {
233  let request = InitPluginRequest {
234    implementation: "plugin-driver-rust".to_string(),
235    version: option_env!("CARGO_PKG_VERSION").unwrap_or("0").to_string()
236  };
237  let response = plugin.init_plugin(request).await?;
238  debug!("Got init response {:?} from plugin {}", response, manifest.name);
239  register_plugin_entries(manifest, &response.catalogue);
240  tokio::task::spawn(publish_updated_catalogue());
241  Ok(())
242}
243
244async fn start_plugin_process(manifest: &PactPluginManifest) -> anyhow::Result<PactPlugin> {
245  debug!("Starting plugin with manifest {:?}", manifest);
246
247  let os_info = os_info::get();
248  debug!("Detected OS: {}", os_info);
249  let mut path = if let Some(entry_point) = manifest.entry_points.get(&os_info.to_string()) {
250    PathBuf::from(entry_point)
251  } else if os_info.os_type() == Type::Windows && manifest.entry_points.contains_key("windows") {
252    PathBuf::from(manifest.entry_points.get("windows").unwrap())
253  } else {
254    PathBuf::from(&manifest.entry_point)
255  };
256  if !path.is_absolute() || !path.exists() {
257    path = PathBuf::from(manifest.plugin_dir.clone()).join(path);
258  }
259  debug!("Starting plugin using {:?}", &path);
260
261  let log_level = max_level();
262  let mut child_command = Command::new(path.clone());
263  let mut child_command = child_command
264    .env("LOG_LEVEL", log_level.to_string())
265    .env("RUST_LOG", log_level.to_string())
266    .current_dir(manifest.plugin_dir.clone());
267
268  if let Some(args) = &manifest.args {
269    child_command = child_command.args(args);
270  }
271
272  let child = child_command
273    .stdout(Stdio::piped())
274    .stderr(Stdio::piped())
275    .spawn()
276    .map_err(|err| anyhow!("Was not able to start plugin process for '{}' - {}",
277      path.to_string_lossy(), err))?;
278  let child_pid = child.id();
279  debug!("Plugin {} started with PID {}", manifest.name, child_pid);
280
281  match ChildPluginProcess::new(child, manifest).await {
282    Ok(child) => Ok(PactPlugin::new(manifest, child)),
283    Err(err) => {
284      let mut s = System::new();
285      s.refresh_processes();
286      if let Some(process) = s.process(Pid::from_u32(child_pid)) {
287        #[cfg(not(windows))]
288        process.kill();
289        // revert windows specific logic once https://github.com/GuillaumeGomez/sysinfo/pull/1341/files is merged/released
290        #[cfg(windows)]
291        let _ = Command::new("taskkill.exe").arg("/PID").arg(child_pid.to_string()).arg("/F").arg("/T").output();
292      } else {
293        warn!("Child process with PID {} was not found", child_pid);
294      }
295      Err(err)
296    }
297  }
298}
299
300/// Shut down all plugin processes
301pub fn shutdown_plugins() {
302  let thread_id = thread::current().id();
303  debug!("Shutting down all plugins");
304  trace!("shutdown_plugins {:?}: Waiting on PLUGIN_REGISTER lock", thread_id);
305  let mut guard = PLUGIN_REGISTER.lock().unwrap();
306  trace!("shutdown_plugins {:?}: Got PLUGIN_REGISTER lock", thread_id);
307  for plugin in guard.values() {
308    debug!("Shutting down plugin {:?}", plugin);
309    plugin.kill();
310    remove_plugin_entries(&plugin.manifest.name);
311  }
312  guard.clear();
313  trace!("shutdown_plugins {:?}: Releasing PLUGIN_REGISTER lock", thread_id);
314}
315
316/// Shutdown the given plugin
317pub fn shutdown_plugin(plugin: &mut PactPlugin) {
318  debug!("Shutting down plugin {}:{}", plugin.manifest.name, plugin.manifest.version);
319  plugin.kill();
320  remove_plugin_entries(&plugin.manifest.name);
321}
322
323/// Publish the current catalogue to all plugins
324pub async fn publish_updated_catalogue() {
325  let thread_id = thread::current().id();
326
327  let request = Catalogue {
328    catalogue: all_entries().iter()
329      .map(|entry| crate::proto::CatalogueEntry {
330        r#type: entry.entry_type.to_proto_type() as i32,
331        key: entry.key.clone(),
332        values: entry.values.clone()
333      }).collect()
334  };
335
336  let plugins = {
337    trace!("publish_updated_catalogue {:?}: Waiting on PLUGIN_REGISTER lock", thread_id);
338    let inner = PLUGIN_REGISTER.lock().unwrap();
339    trace!("publish_updated_catalogue {:?}: Got PLUGIN_REGISTER lock", thread_id);
340    let plugins = inner.values().cloned().collect::<Vec<_>>();
341    trace!("publish_updated_catalogue {:?}: Releasing PLUGIN_REGISTER lock", thread_id);
342    plugins
343  };
344
345  for plugin in plugins {
346    if let Err(err) = plugin.update_catalogue(request.clone()).await {
347      warn!("Failed to send updated catalogue to plugin '{}' - {}", plugin.manifest.name, err);
348    }
349  }
350}
351
352/// Increment access to the plugin.
353#[tracing::instrument]
354pub fn increment_plugin_access(plugin: &PluginDependency) {
355  let thread_id = thread::current().id();
356
357  trace!("increment_plugin_access {:?}: Waiting on PLUGIN_REGISTER lock", thread_id);
358  let mut inner = PLUGIN_REGISTER.lock().unwrap();
359  trace!("increment_plugin_access {:?}: Got PLUGIN_REGISTER lock", thread_id);
360
361  if let Some(plugin) = lookup_plugin_inner(plugin, &mut inner) {
362    plugin.update_access();
363  }
364
365  trace!("increment_plugin_access {:?}: Releasing PLUGIN_REGISTER lock", thread_id);
366}
367
368/// Decrement access to the plugin. If the current access count is zero, shut down the plugin
369#[tracing::instrument]
370pub fn drop_plugin_access(plugin: &PluginDependency) {
371  let thread_id = thread::current().id();
372
373  trace!("drop_plugin_access {:?}: Waiting on PLUGIN_REGISTER lock", thread_id);
374  let mut inner = PLUGIN_REGISTER.lock().unwrap();
375  trace!("drop_plugin_access {:?}: Got PLUGIN_REGISTER lock", thread_id);
376
377  if let Some(plugin) = lookup_plugin_inner(plugin, &mut inner) {
378    let key = format!("{}/{}", plugin.manifest.name, plugin.manifest.version);
379    if plugin.drop_access() == 0 {
380      shutdown_plugin(plugin);
381      inner.remove(key.as_str());
382    }
383  }
384
385  trace!("drop_plugin_access {:?}: Releasing PLUGIN_REGISTER lock", thread_id);
386}
387
388/// Starts a mock server given the catalog entry for it and a Pact
389#[deprecated(note = "Use start_mock_server_v2 which takes a test context map", since = "0.2.2")]
390pub async fn start_mock_server(
391  catalogue_entry: &CatalogueEntry,
392  pact: Box<dyn Pact + Send + Sync>,
393  config: MockServerConfig
394) -> anyhow::Result<MockServerDetails> {
395  start_mock_server_v2(catalogue_entry, pact, config, hashmap!{}).await
396}
397
398/// Starts a mock server given the catalog entry for it and a Pact
399pub async fn start_mock_server_v2(
400  catalogue_entry: &CatalogueEntry,
401  pact: Box<dyn Pact + Send + Sync>,
402  config: MockServerConfig,
403  test_context: HashMap<String, Value>
404) -> anyhow::Result<MockServerDetails> {
405  let manifest = catalogue_entry.plugin.as_ref()
406    .ok_or_else(|| anyhow!("Catalogue entry did not have an associated plugin manifest"))?;
407  let plugin = lookup_plugin(&manifest.as_dependency())
408    .ok_or_else(|| anyhow!("Did not find a running plugin for manifest {:?}", manifest))?;
409
410  debug!(plugin_name = manifest.name.as_str(), plugin_version = manifest.version.as_str(),
411    "Sending startMockServer request to plugin");
412  let request = StartMockServerRequest {
413    host_interface: config.host_interface.unwrap_or_default(),
414    port: config.port,
415    tls: config.tls,
416    pact: pact.to_json(PactSpecification::V4)?.to_string(),
417    test_context: Some(to_proto_struct(&test_context))
418  };
419  let response = plugin.start_mock_server(request).await?;
420  debug!("Got response ${response:?}");
421
422  let mock_server_response = response.response
423    .ok_or_else(|| anyhow!("Did not get a valid response from the start mock server call"))?;
424  match mock_server_response {
425    start_mock_server_response::Response::Error(err) => Err(anyhow!("Mock server failed to start: {}", err)),
426    start_mock_server_response::Response::Details(details) => Ok(MockServerDetails {
427      key: details.key.clone(),
428      base_url: details.address.clone(),
429      port: details.port,
430      plugin
431    })
432  }
433}
434
435/// Shutdowns a running mock server. Will return any errors from the mock server.
436pub async fn shutdown_mock_server(mock_server: &MockServerDetails) -> anyhow::Result<Vec<MockServerResults>> {
437  let request = ShutdownMockServerRequest {
438    server_key: mock_server.key.to_string()
439  };
440
441  debug!(
442    plugin_name = mock_server.plugin.manifest.name.as_str(),
443    plugin_version = mock_server.plugin.manifest.version.as_str(),
444    server_key = mock_server.key.as_str(),
445    "Sending shutdownMockServer request to plugin"
446  );
447  let response = mock_server.plugin.shutdown_mock_server(request).await?;
448  debug!("Got response: {response:?}");
449
450  if response.ok {
451    Ok(vec![])
452  } else {
453    Ok(response.results.iter().map(|result| {
454      MockServerResults {
455        path: result.path.clone(),
456        error: result.error.clone(),
457        mismatches: result.mismatches.iter().map(|mismatch| {
458          ContentMismatch {
459            expected: mismatch.expected.as_ref()
460              .map(|e| from_utf8(&e).unwrap_or_default().to_string())
461              .unwrap_or_default(),
462            actual: mismatch.actual.as_ref()
463              .map(|a| from_utf8(&a).unwrap_or_default().to_string())
464              .unwrap_or_default(),
465            mismatch: mismatch.mismatch.clone(),
466            path: mismatch.path.clone(),
467            diff: optional_string(&mismatch.diff),
468            mismatch_type: optional_string(&mismatch.mismatch_type)
469          }
470        }).collect()
471      }
472    }).collect())
473  }
474}
475
476/// Gets the results from a running mock server.
477pub async fn get_mock_server_results(mock_server: &MockServerDetails) -> anyhow::Result<Vec<MockServerResults>> {
478  let request = MockServerRequest {
479    server_key: mock_server.key.to_string()
480  };
481
482  debug!(
483    plugin_name = mock_server.plugin.manifest.name.as_str(),
484    plugin_version = mock_server.plugin.manifest.version.as_str(),
485    server_key = mock_server.key.as_str(),
486    "Sending getMockServerResults request to plugin"
487  );
488  let response = mock_server.plugin.get_mock_server_results(request).await?;
489  debug!("Got response: {response:?}");
490
491  if response.ok {
492    Ok(vec![])
493  } else {
494    Ok(response.results.iter().map(|result| {
495      MockServerResults {
496        path: result.path.clone(),
497        error: result.error.clone(),
498        mismatches: result.mismatches.iter().map(|mismatch| {
499          ContentMismatch {
500            expected: mismatch.expected.as_ref()
501              .map(|e| from_utf8(&e).unwrap_or_default().to_string())
502              .unwrap_or_default(),
503            actual: mismatch.actual.as_ref()
504              .map(|a| from_utf8(&a).unwrap_or_default().to_string())
505              .unwrap_or_default(),
506            mismatch: mismatch.mismatch.clone(),
507            path: mismatch.path.clone(),
508            diff: optional_string(&mismatch.diff),
509            mismatch_type: optional_string(&mismatch.mismatch_type)
510          }
511        }).collect()
512      }
513    }).collect())
514  }
515}
516
517/// Sets up a transport request to be made. This is the first phase when verifying, and it allows the
518/// users to add additional values to any requests that are made.
519pub async fn prepare_validation_for_interaction(
520  transport_entry: &CatalogueEntry,
521  pact: &V4Pact,
522  interaction: &(dyn V4Interaction + Send + Sync),
523  context: &HashMap<String, Value>
524) -> anyhow::Result<InteractionVerificationData> {
525  let manifest = transport_entry.plugin.as_ref()
526    .ok_or_else(|| anyhow!("Transport catalogue entry did not have an associated plugin manifest"))?;
527  let plugin = lookup_plugin(&manifest.as_dependency())
528    .ok_or_else(|| anyhow!("Did not find a running plugin for manifest {:?}", manifest))?;
529
530  prepare_validation_for_interaction_inner(&plugin, manifest, pact, interaction, context).await
531}
532
533pub(crate) async fn prepare_validation_for_interaction_inner(
534  plugin: &(dyn PactPluginRpc + Send + Sync),
535  manifest: &PactPluginManifest,
536  pact: &V4Pact,
537  interaction: &(dyn V4Interaction + Send + Sync),
538  context: &HashMap<String, Value>
539) -> anyhow::Result<InteractionVerificationData> {
540  let mut pact = pact.clone();
541  pact.interactions = pact.interactions.iter().map(|i| i.with_unique_key()).collect();
542  let request = VerificationPreparationRequest {
543    pact: pact.to_json(PactSpecification::V4)?.to_string(),
544    interaction_key: interaction.unique_key(),
545    config: Some(to_proto_struct(context))
546  };
547
548  debug!(plugin_name = manifest.name.as_str(), plugin_version = manifest.version.as_str(),
549    "Sending prepareValidationForInteraction request to plugin");
550  let response = plugin.prepare_interaction_for_verification(request).await?;
551  debug!("Got response: {response:?}");
552
553  let validation_response = response.response
554    .ok_or_else(|| anyhow!("Did not get a valid response from the prepare interaction for verification call"))?;
555  match &validation_response {
556    verification_preparation_response::Response::Error(err) => Err(anyhow!("Failed to prepare the request: {}", err)),
557    verification_preparation_response::Response::InteractionData(data) => {
558      let content_type = data.body.as_ref().and_then(|body| ContentType::parse(body.content_type.as_str()).ok());
559      Ok(InteractionVerificationData {
560        request_data: data.body.as_ref()
561          .and_then(|body| body.content.as_ref())
562          .map(|body| OptionalBody::Present(Bytes::from(body.clone()), content_type, None)).unwrap_or_default(),
563        metadata: data.metadata.iter().map(|(k, v)| {
564          let value = match &v.value {
565            Some(v) => match &v {
566              metadata_value::Value::NonBinaryValue(v) => Either::Left(proto_value_to_json(v)),
567              metadata_value::Value::BinaryValue(b) => Either::Right(Bytes::from(b.clone()))
568            }
569            None => Either::Left(Value::Null)
570          };
571          (k.clone(), value)
572        }).collect()
573      })
574    }
575  }
576}
577
578/// Executes the verification of the interaction that was configured with the prepare_validation_for_interaction call
579pub async fn verify_interaction(
580  transport_entry: &CatalogueEntry,
581  verification_data: &InteractionVerificationData,
582  config: &HashMap<String, Value>,
583  pact: &V4Pact,
584  interaction: &(dyn V4Interaction + Send + Sync)
585) -> anyhow::Result<InteractionVerificationResult> {
586  let manifest = transport_entry.plugin.as_ref()
587    .ok_or_else(|| anyhow!("Transport catalogue entry did not have an associated plugin manifest"))?;
588  let plugin = lookup_plugin(&manifest.as_dependency())
589    .ok_or_else(|| anyhow!("Did not find a running plugin for manifest {:?}", manifest))?;
590
591  verify_interaction_inner(
592    &plugin,
593    &manifest,
594    verification_data,
595    config,
596    pact,
597    interaction
598  ).await
599}
600
601pub(crate) async fn verify_interaction_inner(
602  plugin: &(dyn PactPluginRpc + Send + Sync),
603  manifest: &PactPluginManifest,
604  verification_data: &InteractionVerificationData,
605  config: &HashMap<String, Value>,
606  pact: &V4Pact,
607  interaction: &(dyn V4Interaction + Send + Sync)
608) -> anyhow::Result<InteractionVerificationResult> {
609  let mut pact = pact.clone();
610  pact.interactions = pact.interactions.iter().map(|i| i.with_unique_key()).collect();
611  let request = VerifyInteractionRequest {
612    pact: pact.to_json(PactSpecification::V4)?.to_string(),
613    interaction_key: interaction.unique_key(),
614    config: Some(to_proto_struct(config)),
615    interaction_data: Some(InteractionData {
616      body: Some((&verification_data.request_data).into()),
617      metadata: verification_data.metadata.iter().map(|(k, v)| {
618        (k.clone(), MetadataValue { value: Some(match v {
619          Either::Left(value) => metadata_value::Value::NonBinaryValue(to_proto_value(value)),
620          Either::Right(b) => metadata_value::Value::BinaryValue(b.to_vec())
621        }) })
622      }).collect()
623    })
624  };
625
626  debug!(plugin_name = manifest.name.as_str(), plugin_version = manifest.version.as_str(),
627    "Sending verifyInteraction request to plugin");
628  let response = plugin.verify_interaction(request).await?;
629  debug!("Got response: {response:?}");
630
631  let validation_response = response.response
632    .ok_or_else(|| anyhow!("Did not get a valid response from the verification call"))?;
633  match &validation_response {
634    verify_interaction_response::Response::Error(err) => Err(anyhow!("Failed to verify the request: {}", err)),
635    verify_interaction_response::Response::Result(data) => Ok(data.into())
636  }
637}
638
639/// Tries to download and install the plugin from the given URL, returning the manifest for the
640/// plugin if successful.
641pub async fn install_plugin_from_url(
642  http_client: &Client,
643  source_url: &str
644) -> anyhow::Result<PactPluginManifest> {
645  let response = fetch_json_from_url(source_url, http_client).await?;
646  if let Some(map) = response.as_object() {
647    if let Some(tag) = map.get("tag_name") {
648      let tag = json_to_string(tag);
649      debug!(%tag, "Found tag");
650      let url = if source_url.ends_with("/latest") {
651        source_url.strip_suffix("/latest").unwrap_or(source_url)
652      } else {
653        let suffix = format!("/tag/{}", tag);
654        source_url.strip_suffix(suffix.as_str()).unwrap_or(source_url)
655      };
656      let manifest_json = download_json_from_github(&http_client, url, &tag, "pact-plugin.json")
657        .await.context("Downloading manifest file from GitHub")?;
658      let manifest: PactPluginManifest = serde_json::from_value(manifest_json)
659        .context("Failed to parsing JSON manifest file from GitHub")?;
660      debug!(?manifest, "Loaded manifest from GitHub");
661
662      debug!("Installing plugin {} version {}", manifest.name, manifest.version);
663      let plugin_dir = create_plugin_dir(&manifest)
664        .context("Failed to creating plugins directory")?;
665      download_plugin_executable(&manifest, &plugin_dir, &http_client, url, &tag, false).await?;
666
667      Ok(PactPluginManifest {
668        plugin_dir: plugin_dir.to_string_lossy().to_string(),
669        .. manifest
670      })
671    } else {
672      bail!("GitHub release page does not have a valid tag_name attribute");
673    }
674  } else {
675    bail!("Response from source is not a valid JSON from a GitHub release page")
676  }
677}
678
679fn create_plugin_dir(manifest: &PactPluginManifest) -> anyhow::Result<PathBuf> {
680  let plugins_dir = pact_plugin_dir()?;
681  if !plugins_dir.exists() {
682    info!(plugins_dir = %plugins_dir.display(), "Creating plugins directory");
683    fs::create_dir_all(plugins_dir.clone())?;
684  }
685
686  let plugin_dir = plugins_dir.join(format!("{}-{}", manifest.name, manifest.version));
687  info!(plugin_dir = %plugin_dir.display(), "Creating plugin directory");
688  fs::create_dir(plugin_dir.clone())?;
689
690  info!("Writing plugin manifest file");
691  let file_name = plugin_dir.join("pact-plugin.json");
692  let mut f = File::create(file_name)?;
693  let json = serde_json::to_string(manifest)?;
694  f.write_all(json.as_bytes())?;
695
696  Ok(plugin_dir.clone())
697}
698
#[cfg(test)]
mod tests {
  use std::fs::{self, File};

  use maplit::hashmap;
  use pact_models::v4::sync_message::SynchronousMessage;
  use pact_models::prelude::v4::V4Pact;
  use pact_models::v4::interaction::V4Interaction;

  use expectest::prelude::*;
  use tempdir::TempDir;

  use crate::plugin_models::PluginDependency;
  use crate::plugin_manager::prepare_validation_for_interaction_inner;
  use crate::plugin_manager::verify_interaction_inner;
  use crate::plugin_models::tests::MockPlugin;
  use crate::plugin_models::tests::PREPARE_INTERACTION_FOR_VERIFICATION_ARG;
  use crate::plugin_models::tests::VERIFY_INTERACTION_ARG;
  use crate::verification::InteractionVerificationData;

  use super::{
    load_manifest_from_dir,
    PactPluginManifest
  };

  // With no version requested, the manifest with the highest semantic version must be selected
  // (0.1.20, which would lose to 0.1.5 and 0.1.7 under a plain string comparison).
  #[test]
  fn load_manifest_from_dir_test() {
    let tmp_dir = TempDir::new("load_manifest_from_dir").unwrap();

    // (sub-directory, plugin version) for each installed copy of the plugin
    let installed = [
      ("1", "0.1.5"),
      ("2", "0.1.20"),
      ("3", "0.1.7"),
      ("4", "0.1.14"),
      ("5", "0.1.12")
    ];
    for (dir_name, version) in installed.iter() {
      let manifest = PactPluginManifest {
        name: "test-plugin".to_string(),
        version: version.to_string(),
        .. PactPluginManifest::default()
      };
      let dir = tmp_dir.path().join(dir_name);
      fs::create_dir_all(&dir).unwrap();
      let file = File::create(dir.join("pact-plugin.json")).unwrap();
      serde_json::to_writer(file, &manifest).unwrap();
    }

    let dep = PluginDependency {
      name: "test-plugin".to_string(),
      version: None,
      dependency_type: Default::default()
    };

    let result = load_manifest_from_dir(&dep, &tmp_dir.path().to_path_buf()).unwrap();
    expect!(result.version).to(be_equal_to("0.1.20"));
  }

  // The Pact passed to the plugin must have interaction keys set, and the key in the request
  // must match the key of the interaction in that Pact.
  #[test_log::test(tokio::test)]
  async fn prepare_validation_for_interaction_passes_in_pact_with_interaction_keys_set() {
    let manifest = PactPluginManifest {
      name: "test-plugin".to_string(),
      version: "0.0.0".to_string(),
      .. PactPluginManifest::default()
    };
    let mock_plugin = MockPlugin::default();
    let interaction = SynchronousMessage::default();
    let pact = V4Pact {
      interactions: vec![ interaction.boxed_v4() ],
      .. V4Pact::default()
    };
    let context = hashmap!{};

    let result = prepare_validation_for_interaction_inner(
      &mock_plugin,
      &manifest,
      &pact,
      &interaction,
      &context
    ).await;

    expect!(result).to(be_ok());
    let request = {
      let recorded = PREPARE_INTERACTION_FOR_VERIFICATION_ARG.read().unwrap();
      recorded.clone()
    }.unwrap();
    let pact_json = serde_json::from_str(request.pact.as_str()).unwrap();
    let pact_in = V4Pact::pact_from_json(&pact_json, "").unwrap();
    expect!(pact_in.interactions[0].key().unwrap()).to(be_equal_to(request.interaction_key));
  }

  // The Pact passed to the plugin must have interaction keys set, and the key in the request
  // must match the key of the interaction in that Pact.
  #[test_log::test(tokio::test)]
  async fn verify_interaction_passes_in_pact_with_interaction_keys_set() {
    let manifest = PactPluginManifest {
      name: "test-plugin".to_string(),
      version: "0.0.0".to_string(),
      .. PactPluginManifest::default()
    };
    let mock_plugin = MockPlugin::default();
    let interaction = SynchronousMessage::default();
    let pact = V4Pact {
      interactions: vec![ interaction.boxed_v4() ],
      .. V4Pact::default()
    };
    let context = hashmap!{};
    let data = InteractionVerificationData::default();

    let result = verify_interaction_inner(
      &mock_plugin,
      &manifest,
      &data,
      &context,
      &pact,
      &interaction
    ).await;

    expect!(result).to(be_ok());
    let request = {
      let recorded = VERIFY_INTERACTION_ARG.read().unwrap();
      recorded.clone()
    }.unwrap();
    let pact_json = serde_json::from_str(request.pact.as_str()).unwrap();
    let pact_in = V4Pact::pact_from_json(&pact_json, "").unwrap();
    expect!(pact_in.interactions[0].key().unwrap()).to(be_equal_to(request.interaction_key));
  }
}