1use std::collections::HashMap;
3use std::env;
4use std::fs;
5use std::fs::File;
6use std::io::{BufReader, Write};
7use std::path::PathBuf;
8use std::process::Stdio;
9use std::process::Command;
10use std::str::from_utf8;
11use std::str::FromStr;
12use std::sync::Mutex;
13use std::thread;
14
15use anyhow::{anyhow, bail, Context};
16use bytes::Bytes;
17use itertools::Either;
18use lazy_static::lazy_static;
19use log::max_level;
20use maplit::hashmap;
21use os_info::Type;
22use pact_models::bodies::OptionalBody;
23use pact_models::json_utils::json_to_string;
24use pact_models::PactSpecification;
25use pact_models::prelude::{ContentType, Pact};
26use pact_models::prelude::v4::V4Pact;
27use pact_models::v4::interaction::V4Interaction;
28use reqwest::Client;
29use semver::Version;
30use serde_json::Value;
31use sysinfo::{Pid,System};
32use tracing::{debug, info, trace, warn};
33
34use crate::catalogue_manager::{all_entries, CatalogueEntry, register_plugin_entries, remove_plugin_entries};
35use crate::child_process::ChildPluginProcess;
36use crate::content::ContentMismatch;
37use crate::download::{download_json_from_github, download_plugin_executable, fetch_json_from_url};
38use crate::metrics::send_metrics;
39use crate::mock_server::{MockServerConfig, MockServerDetails, MockServerResults};
40use crate::plugin_models::{PactPlugin, PactPluginManifest, PactPluginRpc, PluginDependency};
41use crate::proto::*;
42use crate::repository::{fetch_repository_index, USER_AGENT};
43use crate::utils::{optional_string, proto_value_to_json, to_proto_struct, to_proto_value, versions_compatible};
44use crate::verification::{InteractionVerificationData, InteractionVerificationResult};
45
46lazy_static! {
47 static ref PLUGIN_MANIFEST_REGISTER: Mutex<HashMap<String, PactPluginManifest>> = Mutex::new(HashMap::new());
48 static ref PLUGIN_REGISTER: Mutex<HashMap<String, PactPlugin>> = Mutex::new(HashMap::new());
49}
50
51pub async fn load_plugin(plugin: &PluginDependency) -> anyhow::Result<PactPlugin> {
54 let thread_id = thread::current().id();
55 debug!("Loading plugin {:?}", plugin);
56 trace!("Rust plugin driver version {}", option_env!("CARGO_PKG_VERSION").unwrap_or_default());
57 trace!("load_plugin {:?}: Waiting on PLUGIN_REGISTER lock", thread_id);
58 let mut inner = PLUGIN_REGISTER.lock().unwrap();
59 trace!("load_plugin {:?}: Got PLUGIN_REGISTER lock", thread_id);
60 let result = match lookup_plugin_inner(plugin, &mut inner) {
61 Some(plugin) => {
62 debug!("Found running plugin {:?}", plugin);
63 plugin.update_access();
64 Ok(plugin.clone())
65 },
66 None => {
67 debug!("Did not find plugin, will attempt to start it");
68 let manifest = match load_plugin_manifest(plugin) {
69 Ok(manifest) => manifest,
70 Err(err) => {
71 warn!("Could not load plugin manifest from disk, will try auto install it: {}", err);
72 let http_client = reqwest::ClientBuilder::new()
73 .user_agent(USER_AGENT)
74 .build()?;
75 let index = fetch_repository_index(&http_client, None).await?;
76 match index.lookup_plugin_version(&plugin.name, &plugin.version) {
77 Some(entry) => {
78 info!("Found an entry for the plugin in the plugin index, will try install that");
79 install_plugin_from_url(&http_client, entry.source.value().as_str()).await?
80 }
81 None => Err(err)?
82 }
83 }
84 };
85 send_metrics(&manifest);
86 initialise_plugin(&manifest, &mut inner).await
87 }
88 };
89 trace!("load_plugin {:?}: Releasing PLUGIN_REGISTER lock", thread_id);
90 result
91}
92
93fn lookup_plugin_inner<'a>(
94 plugin: &PluginDependency,
95 plugin_register: &'a mut HashMap<String, PactPlugin>
96) -> Option<&'a mut PactPlugin> {
97 if let Some(version) = &plugin.version {
98 plugin_register.get_mut(format!("{}/{}", plugin.name, version).as_str())
99 } else {
100 plugin_register.iter_mut()
101 .filter(|(_, value)| value.manifest.name == plugin.name)
102 .max_by(|(_, v1), (_, v2)| v1.manifest.version.cmp(&v2.manifest.version))
103 .map(|(_, plugin)| plugin)
104 }
105}
106
107pub fn lookup_plugin(plugin: &PluginDependency) -> Option<PactPlugin> {
109 let thread_id = thread::current().id();
110 trace!("lookup_plugin {:?}: Waiting on PLUGIN_REGISTER lock", thread_id);
111 let mut inner = PLUGIN_REGISTER.lock().unwrap();
112 trace!("lookup_plugin {:?}: Got PLUGIN_REGISTER lock", thread_id);
113 let entry = lookup_plugin_inner(plugin, &mut inner);
114 trace!("lookup_plugin {:?}: Releasing PLUGIN_REGISTER lock", thread_id);
115 entry.cloned()
116}
117
118pub fn load_plugin_manifest(plugin_dep: &PluginDependency) -> anyhow::Result<PactPluginManifest> {
121 debug!("Loading plugin manifest for plugin {:?}", plugin_dep);
122 match lookup_plugin_manifest(plugin_dep) {
123 Some(manifest) => Ok(manifest),
124 None => load_manifest_from_disk(plugin_dep)
125 }
126}
127
128fn load_manifest_from_disk(plugin_dep: &PluginDependency) -> anyhow::Result<PactPluginManifest> {
129 let plugin_dir = pact_plugin_dir()?;
130 debug!("Looking for plugin in {:?}", plugin_dir);
131
132 if plugin_dir.exists() {
133 load_manifest_from_dir(plugin_dep, &plugin_dir)
134 } else {
135 Err(anyhow!("Plugin directory {:?} does not exist", plugin_dir))
136 }
137}
138
139fn load_manifest_from_dir(plugin_dep: &PluginDependency, plugin_dir: &PathBuf) -> anyhow::Result<PactPluginManifest> {
140 let mut manifests = vec![];
141 for entry in fs::read_dir(plugin_dir)? {
142 let path = entry?.path();
143 trace!("Found: {:?}", path);
144
145 if path.is_dir() {
146 let manifest_file = path.join("pact-plugin.json");
147 if manifest_file.exists() && manifest_file.is_file() {
148 debug!("Found plugin manifest: {:?}", manifest_file);
149 let file = File::open(manifest_file)?;
150 let reader = BufReader::new(file);
151 let manifest: PactPluginManifest = serde_json::from_reader(reader)?;
152 trace!("Parsed plugin manifest: {:?}", manifest);
153 let version = manifest.version.clone();
154 if manifest.name == plugin_dep.name && versions_compatible(version.as_str(), &plugin_dep.version) {
155 let manifest = PactPluginManifest {
156 plugin_dir: path.to_string_lossy().to_string(),
157 ..manifest
158 };
159 manifests.push(manifest);
160 }
161 }
162 }
163 }
164
165 let manifest = manifests.iter()
166 .max_by(|a, b| {
167 let a = Version::parse(&a.version).unwrap_or_else(|_| Version::new(0, 0, 0));
168 let b = Version::parse(&b.version).unwrap_or_else(|_| Version::new(0, 0, 0));
169 a.cmp(&b)
170 });
171 if let Some(manifest) = manifest {
172 let key = format!("{}/{}", manifest.name, manifest.version);
173 {
174 let mut guard = PLUGIN_MANIFEST_REGISTER.lock().unwrap();
175 guard.insert(key.clone(), manifest.clone());
176 }
177 Ok(manifest.clone())
178 } else {
179 Err(anyhow!("Plugin {} was not found (in $HOME/.pact/plugins or $PACT_PLUGIN_DIR)", plugin_dep))
180 }
181}
182
183pub(crate) fn pact_plugin_dir() -> anyhow::Result<PathBuf> {
184 let env_var = env::var_os("PACT_PLUGIN_DIR");
185 let plugin_dir = env_var.unwrap_or_default();
186 let plugin_dir = plugin_dir.to_string_lossy();
187 if plugin_dir.is_empty() {
188 home::home_dir().map(|dir| dir.join(".pact").join("plugins"))
189 } else {
190 PathBuf::from_str(plugin_dir.as_ref()).ok()
191 }.ok_or_else(|| anyhow!("No Pact plugin directory was found (in $HOME/.pact/plugins or $PACT_PLUGIN_DIR)"))
192}
193
194pub fn lookup_plugin_manifest(plugin: &PluginDependency) -> Option<PactPluginManifest> {
196 let guard = PLUGIN_MANIFEST_REGISTER.lock().unwrap();
197 if let Some(version) = &plugin.version {
198 let key = format!("{}/{}", plugin.name, version);
199 guard.get(&key).cloned()
200 } else {
201 guard.iter()
202 .filter(|(_, value)| value.name == plugin.name)
203 .max_by(|(_, v1), (_, v2)| v1.version.cmp(&v2.version))
204 .map(|(_, p)| p.clone())
205 }
206}
207
208async fn initialise_plugin(
209 manifest: &PactPluginManifest,
210 plugin_register: &mut HashMap<String, PactPlugin>
211) -> anyhow::Result<PactPlugin> {
212 match manifest.executable_type.as_str() {
213 "exec" => {
214 let mut plugin = start_plugin_process(manifest).await?;
215 debug!("Plugin process started OK (port = {}), sending init message", plugin.port());
216
217 init_handshake(manifest, &mut plugin).await.map_err(|err| {
218 plugin.kill();
219 anyhow!("Failed to send init request to the plugin - {}", err)
220 })?;
221
222 let key = format!("{}/{}", manifest.name, manifest.version);
223 plugin_register.insert(key, plugin.clone());
224
225 Ok(plugin)
226 }
227 _ => Err(anyhow!("Plugin executable type of {} is not supported", manifest.executable_type))
228 }
229}
230
231pub async fn init_handshake(manifest: &PactPluginManifest, plugin: &mut (dyn PactPluginRpc + Send + Sync)) -> anyhow::Result<()> {
233 let request = InitPluginRequest {
234 implementation: "plugin-driver-rust".to_string(),
235 version: option_env!("CARGO_PKG_VERSION").unwrap_or("0").to_string()
236 };
237 let response = plugin.init_plugin(request).await?;
238 debug!("Got init response {:?} from plugin {}", response, manifest.name);
239 register_plugin_entries(manifest, &response.catalogue);
240 tokio::task::spawn(publish_updated_catalogue());
241 Ok(())
242}
243
244async fn start_plugin_process(manifest: &PactPluginManifest) -> anyhow::Result<PactPlugin> {
245 debug!("Starting plugin with manifest {:?}", manifest);
246
247 let os_info = os_info::get();
248 debug!("Detected OS: {}", os_info);
249 let mut path = if let Some(entry_point) = manifest.entry_points.get(&os_info.to_string()) {
250 PathBuf::from(entry_point)
251 } else if os_info.os_type() == Type::Windows && manifest.entry_points.contains_key("windows") {
252 PathBuf::from(manifest.entry_points.get("windows").unwrap())
253 } else {
254 PathBuf::from(&manifest.entry_point)
255 };
256 if !path.is_absolute() || !path.exists() {
257 path = PathBuf::from(manifest.plugin_dir.clone()).join(path);
258 }
259 debug!("Starting plugin using {:?}", &path);
260
261 let log_level = max_level();
262 let mut child_command = Command::new(path.clone());
263 let mut child_command = child_command
264 .env("LOG_LEVEL", log_level.to_string())
265 .env("RUST_LOG", log_level.to_string())
266 .current_dir(manifest.plugin_dir.clone());
267
268 if let Some(args) = &manifest.args {
269 child_command = child_command.args(args);
270 }
271
272 let child = child_command
273 .stdout(Stdio::piped())
274 .stderr(Stdio::piped())
275 .spawn()
276 .map_err(|err| anyhow!("Was not able to start plugin process for '{}' - {}",
277 path.to_string_lossy(), err))?;
278 let child_pid = child.id();
279 debug!("Plugin {} started with PID {}", manifest.name, child_pid);
280
281 match ChildPluginProcess::new(child, manifest).await {
282 Ok(child) => Ok(PactPlugin::new(manifest, child)),
283 Err(err) => {
284 let mut s = System::new();
285 s.refresh_processes();
286 if let Some(process) = s.process(Pid::from_u32(child_pid)) {
287 #[cfg(not(windows))]
288 process.kill();
289 #[cfg(windows)]
291 let _ = Command::new("taskkill.exe").arg("/PID").arg(child_pid.to_string()).arg("/F").arg("/T").output();
292 } else {
293 warn!("Child process with PID {} was not found", child_pid);
294 }
295 Err(err)
296 }
297 }
298}
299
300pub fn shutdown_plugins() {
302 let thread_id = thread::current().id();
303 debug!("Shutting down all plugins");
304 trace!("shutdown_plugins {:?}: Waiting on PLUGIN_REGISTER lock", thread_id);
305 let mut guard = PLUGIN_REGISTER.lock().unwrap();
306 trace!("shutdown_plugins {:?}: Got PLUGIN_REGISTER lock", thread_id);
307 for plugin in guard.values() {
308 debug!("Shutting down plugin {:?}", plugin);
309 plugin.kill();
310 remove_plugin_entries(&plugin.manifest.name);
311 }
312 guard.clear();
313 trace!("shutdown_plugins {:?}: Releasing PLUGIN_REGISTER lock", thread_id);
314}
315
316pub fn shutdown_plugin(plugin: &mut PactPlugin) {
318 debug!("Shutting down plugin {}:{}", plugin.manifest.name, plugin.manifest.version);
319 plugin.kill();
320 remove_plugin_entries(&plugin.manifest.name);
321}
322
323pub async fn publish_updated_catalogue() {
325 let thread_id = thread::current().id();
326
327 let request = Catalogue {
328 catalogue: all_entries().iter()
329 .map(|entry| crate::proto::CatalogueEntry {
330 r#type: entry.entry_type.to_proto_type() as i32,
331 key: entry.key.clone(),
332 values: entry.values.clone()
333 }).collect()
334 };
335
336 let plugins = {
337 trace!("publish_updated_catalogue {:?}: Waiting on PLUGIN_REGISTER lock", thread_id);
338 let inner = PLUGIN_REGISTER.lock().unwrap();
339 trace!("publish_updated_catalogue {:?}: Got PLUGIN_REGISTER lock", thread_id);
340 let plugins = inner.values().cloned().collect::<Vec<_>>();
341 trace!("publish_updated_catalogue {:?}: Releasing PLUGIN_REGISTER lock", thread_id);
342 plugins
343 };
344
345 for plugin in plugins {
346 if let Err(err) = plugin.update_catalogue(request.clone()).await {
347 warn!("Failed to send updated catalogue to plugin '{}' - {}", plugin.manifest.name, err);
348 }
349 }
350}
351
352#[tracing::instrument]
354pub fn increment_plugin_access(plugin: &PluginDependency) {
355 let thread_id = thread::current().id();
356
357 trace!("increment_plugin_access {:?}: Waiting on PLUGIN_REGISTER lock", thread_id);
358 let mut inner = PLUGIN_REGISTER.lock().unwrap();
359 trace!("increment_plugin_access {:?}: Got PLUGIN_REGISTER lock", thread_id);
360
361 if let Some(plugin) = lookup_plugin_inner(plugin, &mut inner) {
362 plugin.update_access();
363 }
364
365 trace!("increment_plugin_access {:?}: Releasing PLUGIN_REGISTER lock", thread_id);
366}
367
368#[tracing::instrument]
370pub fn drop_plugin_access(plugin: &PluginDependency) {
371 let thread_id = thread::current().id();
372
373 trace!("drop_plugin_access {:?}: Waiting on PLUGIN_REGISTER lock", thread_id);
374 let mut inner = PLUGIN_REGISTER.lock().unwrap();
375 trace!("drop_plugin_access {:?}: Got PLUGIN_REGISTER lock", thread_id);
376
377 if let Some(plugin) = lookup_plugin_inner(plugin, &mut inner) {
378 let key = format!("{}/{}", plugin.manifest.name, plugin.manifest.version);
379 if plugin.drop_access() == 0 {
380 shutdown_plugin(plugin);
381 inner.remove(key.as_str());
382 }
383 }
384
385 trace!("drop_plugin_access {:?}: Releasing PLUGIN_REGISTER lock", thread_id);
386}
387
388#[deprecated(note = "Use start_mock_server_v2 which takes a test context map", since = "0.2.2")]
390pub async fn start_mock_server(
391 catalogue_entry: &CatalogueEntry,
392 pact: Box<dyn Pact + Send + Sync>,
393 config: MockServerConfig
394) -> anyhow::Result<MockServerDetails> {
395 start_mock_server_v2(catalogue_entry, pact, config, hashmap!{}).await
396}
397
398pub async fn start_mock_server_v2(
400 catalogue_entry: &CatalogueEntry,
401 pact: Box<dyn Pact + Send + Sync>,
402 config: MockServerConfig,
403 test_context: HashMap<String, Value>
404) -> anyhow::Result<MockServerDetails> {
405 let manifest = catalogue_entry.plugin.as_ref()
406 .ok_or_else(|| anyhow!("Catalogue entry did not have an associated plugin manifest"))?;
407 let plugin = lookup_plugin(&manifest.as_dependency())
408 .ok_or_else(|| anyhow!("Did not find a running plugin for manifest {:?}", manifest))?;
409
410 debug!(plugin_name = manifest.name.as_str(), plugin_version = manifest.version.as_str(),
411 "Sending startMockServer request to plugin");
412 let request = StartMockServerRequest {
413 host_interface: config.host_interface.unwrap_or_default(),
414 port: config.port,
415 tls: config.tls,
416 pact: pact.to_json(PactSpecification::V4)?.to_string(),
417 test_context: Some(to_proto_struct(&test_context))
418 };
419 let response = plugin.start_mock_server(request).await?;
420 debug!("Got response ${response:?}");
421
422 let mock_server_response = response.response
423 .ok_or_else(|| anyhow!("Did not get a valid response from the start mock server call"))?;
424 match mock_server_response {
425 start_mock_server_response::Response::Error(err) => Err(anyhow!("Mock server failed to start: {}", err)),
426 start_mock_server_response::Response::Details(details) => Ok(MockServerDetails {
427 key: details.key.clone(),
428 base_url: details.address.clone(),
429 port: details.port,
430 plugin
431 })
432 }
433}
434
435pub async fn shutdown_mock_server(mock_server: &MockServerDetails) -> anyhow::Result<Vec<MockServerResults>> {
437 let request = ShutdownMockServerRequest {
438 server_key: mock_server.key.to_string()
439 };
440
441 debug!(
442 plugin_name = mock_server.plugin.manifest.name.as_str(),
443 plugin_version = mock_server.plugin.manifest.version.as_str(),
444 server_key = mock_server.key.as_str(),
445 "Sending shutdownMockServer request to plugin"
446 );
447 let response = mock_server.plugin.shutdown_mock_server(request).await?;
448 debug!("Got response: {response:?}");
449
450 if response.ok {
451 Ok(vec![])
452 } else {
453 Ok(response.results.iter().map(|result| {
454 MockServerResults {
455 path: result.path.clone(),
456 error: result.error.clone(),
457 mismatches: result.mismatches.iter().map(|mismatch| {
458 ContentMismatch {
459 expected: mismatch.expected.as_ref()
460 .map(|e| from_utf8(&e).unwrap_or_default().to_string())
461 .unwrap_or_default(),
462 actual: mismatch.actual.as_ref()
463 .map(|a| from_utf8(&a).unwrap_or_default().to_string())
464 .unwrap_or_default(),
465 mismatch: mismatch.mismatch.clone(),
466 path: mismatch.path.clone(),
467 diff: optional_string(&mismatch.diff),
468 mismatch_type: optional_string(&mismatch.mismatch_type)
469 }
470 }).collect()
471 }
472 }).collect())
473 }
474}
475
476pub async fn get_mock_server_results(mock_server: &MockServerDetails) -> anyhow::Result<Vec<MockServerResults>> {
478 let request = MockServerRequest {
479 server_key: mock_server.key.to_string()
480 };
481
482 debug!(
483 plugin_name = mock_server.plugin.manifest.name.as_str(),
484 plugin_version = mock_server.plugin.manifest.version.as_str(),
485 server_key = mock_server.key.as_str(),
486 "Sending getMockServerResults request to plugin"
487 );
488 let response = mock_server.plugin.get_mock_server_results(request).await?;
489 debug!("Got response: {response:?}");
490
491 if response.ok {
492 Ok(vec![])
493 } else {
494 Ok(response.results.iter().map(|result| {
495 MockServerResults {
496 path: result.path.clone(),
497 error: result.error.clone(),
498 mismatches: result.mismatches.iter().map(|mismatch| {
499 ContentMismatch {
500 expected: mismatch.expected.as_ref()
501 .map(|e| from_utf8(&e).unwrap_or_default().to_string())
502 .unwrap_or_default(),
503 actual: mismatch.actual.as_ref()
504 .map(|a| from_utf8(&a).unwrap_or_default().to_string())
505 .unwrap_or_default(),
506 mismatch: mismatch.mismatch.clone(),
507 path: mismatch.path.clone(),
508 diff: optional_string(&mismatch.diff),
509 mismatch_type: optional_string(&mismatch.mismatch_type)
510 }
511 }).collect()
512 }
513 }).collect())
514 }
515}
516
517pub async fn prepare_validation_for_interaction(
520 transport_entry: &CatalogueEntry,
521 pact: &V4Pact,
522 interaction: &(dyn V4Interaction + Send + Sync),
523 context: &HashMap<String, Value>
524) -> anyhow::Result<InteractionVerificationData> {
525 let manifest = transport_entry.plugin.as_ref()
526 .ok_or_else(|| anyhow!("Transport catalogue entry did not have an associated plugin manifest"))?;
527 let plugin = lookup_plugin(&manifest.as_dependency())
528 .ok_or_else(|| anyhow!("Did not find a running plugin for manifest {:?}", manifest))?;
529
530 prepare_validation_for_interaction_inner(&plugin, manifest, pact, interaction, context).await
531}
532
533pub(crate) async fn prepare_validation_for_interaction_inner(
534 plugin: &(dyn PactPluginRpc + Send + Sync),
535 manifest: &PactPluginManifest,
536 pact: &V4Pact,
537 interaction: &(dyn V4Interaction + Send + Sync),
538 context: &HashMap<String, Value>
539) -> anyhow::Result<InteractionVerificationData> {
540 let mut pact = pact.clone();
541 pact.interactions = pact.interactions.iter().map(|i| i.with_unique_key()).collect();
542 let request = VerificationPreparationRequest {
543 pact: pact.to_json(PactSpecification::V4)?.to_string(),
544 interaction_key: interaction.unique_key(),
545 config: Some(to_proto_struct(context))
546 };
547
548 debug!(plugin_name = manifest.name.as_str(), plugin_version = manifest.version.as_str(),
549 "Sending prepareValidationForInteraction request to plugin");
550 let response = plugin.prepare_interaction_for_verification(request).await?;
551 debug!("Got response: {response:?}");
552
553 let validation_response = response.response
554 .ok_or_else(|| anyhow!("Did not get a valid response from the prepare interaction for verification call"))?;
555 match &validation_response {
556 verification_preparation_response::Response::Error(err) => Err(anyhow!("Failed to prepare the request: {}", err)),
557 verification_preparation_response::Response::InteractionData(data) => {
558 let content_type = data.body.as_ref().and_then(|body| ContentType::parse(body.content_type.as_str()).ok());
559 Ok(InteractionVerificationData {
560 request_data: data.body.as_ref()
561 .and_then(|body| body.content.as_ref())
562 .map(|body| OptionalBody::Present(Bytes::from(body.clone()), content_type, None)).unwrap_or_default(),
563 metadata: data.metadata.iter().map(|(k, v)| {
564 let value = match &v.value {
565 Some(v) => match &v {
566 metadata_value::Value::NonBinaryValue(v) => Either::Left(proto_value_to_json(v)),
567 metadata_value::Value::BinaryValue(b) => Either::Right(Bytes::from(b.clone()))
568 }
569 None => Either::Left(Value::Null)
570 };
571 (k.clone(), value)
572 }).collect()
573 })
574 }
575 }
576}
577
578pub async fn verify_interaction(
580 transport_entry: &CatalogueEntry,
581 verification_data: &InteractionVerificationData,
582 config: &HashMap<String, Value>,
583 pact: &V4Pact,
584 interaction: &(dyn V4Interaction + Send + Sync)
585) -> anyhow::Result<InteractionVerificationResult> {
586 let manifest = transport_entry.plugin.as_ref()
587 .ok_or_else(|| anyhow!("Transport catalogue entry did not have an associated plugin manifest"))?;
588 let plugin = lookup_plugin(&manifest.as_dependency())
589 .ok_or_else(|| anyhow!("Did not find a running plugin for manifest {:?}", manifest))?;
590
591 verify_interaction_inner(
592 &plugin,
593 &manifest,
594 verification_data,
595 config,
596 pact,
597 interaction
598 ).await
599}
600
601pub(crate) async fn verify_interaction_inner(
602 plugin: &(dyn PactPluginRpc + Send + Sync),
603 manifest: &PactPluginManifest,
604 verification_data: &InteractionVerificationData,
605 config: &HashMap<String, Value>,
606 pact: &V4Pact,
607 interaction: &(dyn V4Interaction + Send + Sync)
608) -> anyhow::Result<InteractionVerificationResult> {
609 let mut pact = pact.clone();
610 pact.interactions = pact.interactions.iter().map(|i| i.with_unique_key()).collect();
611 let request = VerifyInteractionRequest {
612 pact: pact.to_json(PactSpecification::V4)?.to_string(),
613 interaction_key: interaction.unique_key(),
614 config: Some(to_proto_struct(config)),
615 interaction_data: Some(InteractionData {
616 body: Some((&verification_data.request_data).into()),
617 metadata: verification_data.metadata.iter().map(|(k, v)| {
618 (k.clone(), MetadataValue { value: Some(match v {
619 Either::Left(value) => metadata_value::Value::NonBinaryValue(to_proto_value(value)),
620 Either::Right(b) => metadata_value::Value::BinaryValue(b.to_vec())
621 }) })
622 }).collect()
623 })
624 };
625
626 debug!(plugin_name = manifest.name.as_str(), plugin_version = manifest.version.as_str(),
627 "Sending verifyInteraction request to plugin");
628 let response = plugin.verify_interaction(request).await?;
629 debug!("Got response: {response:?}");
630
631 let validation_response = response.response
632 .ok_or_else(|| anyhow!("Did not get a valid response from the verification call"))?;
633 match &validation_response {
634 verify_interaction_response::Response::Error(err) => Err(anyhow!("Failed to verify the request: {}", err)),
635 verify_interaction_response::Response::Result(data) => Ok(data.into())
636 }
637}
638
639pub async fn install_plugin_from_url(
642 http_client: &Client,
643 source_url: &str
644) -> anyhow::Result<PactPluginManifest> {
645 let response = fetch_json_from_url(source_url, http_client).await?;
646 if let Some(map) = response.as_object() {
647 if let Some(tag) = map.get("tag_name") {
648 let tag = json_to_string(tag);
649 debug!(%tag, "Found tag");
650 let url = if source_url.ends_with("/latest") {
651 source_url.strip_suffix("/latest").unwrap_or(source_url)
652 } else {
653 let suffix = format!("/tag/{}", tag);
654 source_url.strip_suffix(suffix.as_str()).unwrap_or(source_url)
655 };
656 let manifest_json = download_json_from_github(&http_client, url, &tag, "pact-plugin.json")
657 .await.context("Downloading manifest file from GitHub")?;
658 let manifest: PactPluginManifest = serde_json::from_value(manifest_json)
659 .context("Failed to parsing JSON manifest file from GitHub")?;
660 debug!(?manifest, "Loaded manifest from GitHub");
661
662 debug!("Installing plugin {} version {}", manifest.name, manifest.version);
663 let plugin_dir = create_plugin_dir(&manifest)
664 .context("Failed to creating plugins directory")?;
665 download_plugin_executable(&manifest, &plugin_dir, &http_client, url, &tag, false).await?;
666
667 Ok(PactPluginManifest {
668 plugin_dir: plugin_dir.to_string_lossy().to_string(),
669 .. manifest
670 })
671 } else {
672 bail!("GitHub release page does not have a valid tag_name attribute");
673 }
674 } else {
675 bail!("Response from source is not a valid JSON from a GitHub release page")
676 }
677}
678
679fn create_plugin_dir(manifest: &PactPluginManifest) -> anyhow::Result<PathBuf> {
680 let plugins_dir = pact_plugin_dir()?;
681 if !plugins_dir.exists() {
682 info!(plugins_dir = %plugins_dir.display(), "Creating plugins directory");
683 fs::create_dir_all(plugins_dir.clone())?;
684 }
685
686 let plugin_dir = plugins_dir.join(format!("{}-{}", manifest.name, manifest.version));
687 info!(plugin_dir = %plugin_dir.display(), "Creating plugin directory");
688 fs::create_dir(plugin_dir.clone())?;
689
690 info!("Writing plugin manifest file");
691 let file_name = plugin_dir.join("pact-plugin.json");
692 let mut f = File::create(file_name)?;
693 let json = serde_json::to_string(manifest)?;
694 f.write_all(json.as_bytes())?;
695
696 Ok(plugin_dir.clone())
697}
698
#[cfg(test)]
mod tests {
  use std::fs::{self, File};

  use maplit::hashmap;
  use pact_models::v4::sync_message::SynchronousMessage;
  use pact_models::prelude::v4::V4Pact;
  use pact_models::v4::interaction::V4Interaction;

  use expectest::prelude::*;
  use tempdir::TempDir;

  use crate::plugin_models::PluginDependency;
  use crate::plugin_manager::prepare_validation_for_interaction_inner;
  use crate::plugin_manager::verify_interaction_inner;
  use crate::plugin_models::tests::MockPlugin;
  use crate::plugin_models::tests::PREPARE_INTERACTION_FOR_VERIFICATION_ARG;
  use crate::plugin_models::tests::VERIFY_INTERACTION_ARG;
  use crate::verification::InteractionVerificationData;

  use super::{
    load_manifest_from_dir,
    PactPluginManifest
  };

  /// Builds a manifest for `test-plugin` with the given version and defaults elsewhere.
  fn test_manifest(version: &str) -> PactPluginManifest {
    PactPluginManifest {
      name: "test-plugin".to_string(),
      version: version.to_string(),
      .. PactPluginManifest::default()
    }
  }

  // The highest semantic version must be selected, not the highest by string comparison.
  #[test]
  fn load_manifest_from_dir_test() {
    let tmp_dir = TempDir::new("load_manifest_from_dir").unwrap();

    // One sub-directory ("1" to "5") per manifest, each holding a different version.
    let versions = ["0.1.5", "0.1.20", "0.1.7", "0.1.14", "0.1.12"];
    for (index, version) in versions.iter().enumerate() {
      let manifest = test_manifest(version);
      let path = tmp_dir.path().join((index + 1).to_string());
      fs::create_dir_all(&path).unwrap();
      let file = File::create(path.join("pact-plugin.json")).unwrap();
      serde_json::to_writer(file, &manifest).unwrap();
    }

    let dep = PluginDependency {
      name: "test-plugin".to_string(),
      version: None,
      dependency_type: Default::default()
    };

    let result = load_manifest_from_dir(&dep, &tmp_dir.path().to_path_buf()).unwrap();
    expect!(result.version).to(be_equal_to("0.1.20"));
  }

  // The Pact sent to the plugin must carry the interaction key named in the request.
  #[test_log::test(tokio::test)]
  async fn prepare_validation_for_interaction_passes_in_pact_with_interaction_keys_set() {
    let manifest = test_manifest("0.0.0");
    let mock_plugin = MockPlugin::default();

    let interaction = SynchronousMessage::default();
    let pact = V4Pact {
      interactions: vec![ interaction.boxed_v4() ],
      .. V4Pact::default()
    };
    let context = hashmap!{};

    let result = prepare_validation_for_interaction_inner(
      &mock_plugin,
      &manifest,
      &pact,
      &interaction,
      &context
    ).await;

    expect!(result).to(be_ok());
    // The read guard is a temporary and is released at the end of this statement.
    let request = PREPARE_INTERACTION_FOR_VERIFICATION_ARG.read().unwrap().clone().unwrap();
    let pact_json = serde_json::from_str(request.pact.as_str()).unwrap();
    let pact_in = V4Pact::pact_from_json(&pact_json, "").unwrap();
    expect!(pact_in.interactions[0].key().unwrap()).to(be_equal_to(request.interaction_key));
  }

  // The Pact sent to the plugin must carry the interaction key named in the request.
  #[test_log::test(tokio::test)]
  async fn verify_interaction_passes_in_pact_with_interaction_keys_set() {
    let manifest = test_manifest("0.0.0");
    let mock_plugin = MockPlugin::default();

    let interaction = SynchronousMessage::default();
    let pact = V4Pact {
      interactions: vec![ interaction.boxed_v4() ],
      .. V4Pact::default()
    };
    let context = hashmap!{};
    let data = InteractionVerificationData::default();

    let result = verify_interaction_inner(
      &mock_plugin,
      &manifest,
      &data,
      &context,
      &pact,
      &interaction
    ).await;

    expect!(result).to(be_ok());
    // The read guard is a temporary and is released at the end of this statement.
    let request = VERIFY_INTERACTION_ARG.read().unwrap().clone().unwrap();
    let pact_json = serde_json::from_str(request.pact.as_str()).unwrap();
    let pact_in = V4Pact::pact_from_json(&pact_json, "").unwrap();
    expect!(pact_in.interactions[0].key().unwrap()).to(be_equal_to(request.interaction_key));
  }
}