use std::collections::HashMap;
use std::env;
use std::fs;
use std::fs::File;
use std::io::BufReader;
use std::path::PathBuf;
use std::process::Stdio;
use std::str::FromStr;
use std::sync::Mutex;
use std::thread;
use anyhow::anyhow;
use lazy_static::lazy_static;
use log::{debug, max_level, trace, warn};
use os_info::Type;
use sysinfo::{Pid, ProcessExt, RefreshKind, Signal, System, SystemExt};
use tokio::process::Command;
use crate::catalogue_manager::{register_plugin_entries, remove_plugin_entries};
use crate::child_process::ChildPluginProcess;
use crate::metrics::send_metrics;
use crate::plugin_models::{PactPlugin, PactPluginManifest, PactPluginRpc, PluginDependency};
use crate::proto::InitPluginRequest;
use crate::utils::versions_compatible;
lazy_static! {
static ref PLUGIN_MANIFEST_REGISTER: Mutex<HashMap<String, PactPluginManifest>> = Mutex::new(HashMap::new());
static ref PLUGIN_REGISTER: Mutex<HashMap<String, PactPlugin>> = Mutex::new(HashMap::new());
}
pub async fn load_plugin(plugin: &PluginDependency) -> anyhow::Result<PactPlugin> {
let thread_id = thread::current().id();
debug!("Loading plugin {:?}", plugin);
trace!("Rust plugin driver version {}", option_env!("CARGO_PKG_VERSION").unwrap_or_default());
trace!("load_plugin {:?}: Waiting on PLUGIN_REGISTER lock", thread_id);
let mut inner = PLUGIN_REGISTER.lock().unwrap();
trace!("load_plugin {:?}: Got PLUGIN_REGISTER lock", thread_id);
let result = match lookup_plugin_inner(plugin, &mut inner) {
Some(plugin) => {
debug!("Found running plugin {:?}", plugin);
plugin.update_access();
Ok(plugin.clone())
},
None => {
debug!("Did not find plugin, will start it");
let manifest = load_plugin_manifest(plugin)?;
send_metrics(&manifest);
initialise_plugin(&manifest, &mut inner).await
}
};
trace!("load_plugin {:?}: Releasing PLUGIN_REGISTER lock", thread_id);
result
}
fn lookup_plugin_inner<'a>(
plugin: &PluginDependency,
plugin_register: &'a mut HashMap<String, PactPlugin>
) -> Option<&'a mut PactPlugin> {
if let Some(version) = &plugin.version {
plugin_register.get_mut(format!("{}/{}", plugin.name, version).as_str())
} else {
plugin_register.iter_mut()
.filter(|(_, value)| value.manifest.name == plugin.name)
.max_by(|(_, v1), (_, v2)| v1.manifest.version.cmp(&v2.manifest.version))
.map(|(_, plugin)| plugin)
}
}
pub fn lookup_plugin(plugin: &PluginDependency) -> Option<PactPlugin> {
let thread_id = thread::current().id();
trace!("lookup_plugin {:?}: Waiting on PLUGIN_REGISTER lock", thread_id);
let mut inner = PLUGIN_REGISTER.lock().unwrap();
trace!("lookup_plugin {:?}: Got PLUGIN_REGISTER lock", thread_id);
let entry = lookup_plugin_inner(plugin, &mut inner);
trace!("lookup_plugin {:?}: Releasing PLUGIN_REGISTER lock", thread_id);
entry.cloned()
}
pub fn load_plugin_manifest(plugin_dep: &PluginDependency) -> anyhow::Result<PactPluginManifest> {
debug!("Loading plugin manifest for plugin {:?}", plugin_dep);
match lookup_plugin_manifest(plugin_dep) {
Some(manifest) => Ok(manifest),
None => load_manifest_from_disk(plugin_dep)
}
}
fn load_manifest_from_disk(plugin_dep: &PluginDependency) -> anyhow::Result<PactPluginManifest> {
let plugin_dir = pact_plugin_dir()?;
debug!("Looking for plugin in {:?}", plugin_dir);
if plugin_dir.exists() {
for entry in fs::read_dir(plugin_dir)? {
let path = entry?.path();
trace!("Found: {:?}", path);
if path.is_dir() {
let manifest_file = path.join("pact-plugin.json");
if manifest_file.exists() && manifest_file.is_file() {
debug!("Found plugin manifest: {:?}", manifest_file);
let file = File::open(manifest_file)?;
let reader = BufReader::new(file);
let manifest: PactPluginManifest = serde_json::from_reader(reader)?;
trace!("Parsed plugin manifest: {:?}", manifest);
let version = manifest.version.clone();
if manifest.name == plugin_dep.name && versions_compatible(version.as_str(), &plugin_dep.version) {
let manifest = PactPluginManifest {
plugin_dir: path.to_string_lossy().to_string(),
..manifest
};
let key = format!("{}/{}", manifest.name, version);
{
let mut guard = PLUGIN_MANIFEST_REGISTER.lock().unwrap();
guard.insert(key.clone(), manifest.clone());
}
return Ok(manifest);
}
}
}
}
Err(anyhow!("Plugin {:?} was not found (in $HOME/.pact/plugins or $PACT_PLUGIN_DIR)", plugin_dep))
} else {
Err(anyhow!("Plugin directory {:?} does not exist", plugin_dir))
}
}
fn pact_plugin_dir() -> anyhow::Result<PathBuf> {
let env_var = env::var_os("PACT_PLUGIN_DIR");
let plugin_dir = env_var.unwrap_or_default();
let plugin_dir = plugin_dir.to_string_lossy();
if plugin_dir.is_empty() {
home::home_dir().map(|dir| dir.join(".pact/plugins"))
} else {
PathBuf::from_str(plugin_dir.as_ref()).ok()
}.ok_or_else(|| anyhow!("No Pact plugin directory was found (in $HOME/.pact/plugins or $PACT_PLUGIN_DIR)"))
}
pub fn lookup_plugin_manifest(plugin: &PluginDependency) -> Option<PactPluginManifest> {
let guard = PLUGIN_MANIFEST_REGISTER.lock().unwrap();
if let Some(version) = &plugin.version {
let key = format!("{}/{}", plugin.name, version);
guard.get(&key).cloned()
} else {
guard.iter()
.filter(|(_, value)| value.name == plugin.name)
.max_by(|(_, v1), (_, v2)| v1.version.cmp(&v2.version))
.map(|(_, p)| p.clone())
}
}
async fn initialise_plugin(
manifest: &PactPluginManifest,
plugin_register: &mut HashMap<String, PactPlugin>
) -> anyhow::Result<PactPlugin> {
match manifest.executable_type.as_str() {
"exec" => {
let plugin = start_plugin_process(manifest).await?;
debug!("Plugin process started OK (port = {}), sending init message", plugin.port());
init_handshake(manifest, &plugin).await.map_err(|err| {
plugin.kill();
anyhow!("Failed to send init request to the plugin - {}", err)
})?;
let key = format!("{}/{}", manifest.name, manifest.version);
plugin_register.insert(key, plugin.clone());
Ok(plugin)
}
_ => Err(anyhow!("Plugin executable type of {} is not supported", manifest.executable_type))
}
}
pub async fn init_handshake(manifest: &PactPluginManifest, plugin: &dyn PactPluginRpc) -> anyhow::Result<()> {
let request = InitPluginRequest {
implementation: "plugin-driver-rust".to_string(),
version: option_env!("CARGO_PKG_VERSION").unwrap_or("0").to_string()
};
let response = plugin.init_plugin(request).await?;
debug!("Got init response {:?} from plugin {}", response, manifest.name);
register_plugin_entries(manifest, &response.catalogue);
tokio::task::spawn(async { publish_updated_catalogue() });
Ok(())
}
async fn start_plugin_process(manifest: &PactPluginManifest) -> anyhow::Result<PactPlugin> {
debug!("Starting plugin with manifest {:?}", manifest);
let os_info = os_info::get();
debug!("Detected OS: {}", os_info);
let mut path = if let Some(entry_point) = manifest.entry_points.get(&os_info.to_string()) {
PathBuf::from(entry_point)
} else if os_info.os_type() == Type::Windows && manifest.entry_points.contains_key("windows") {
PathBuf::from(manifest.entry_points.get("windows").unwrap())
} else {
PathBuf::from(&manifest.entry_point)
};
if !path.is_absolute() || !path.exists() {
path = PathBuf::from(manifest.plugin_dir.clone()).join(path);
}
debug!("Starting plugin using {:?}", path);
let log_level = max_level();
let child = Command::new(path)
.env("LOG_LEVEL", log_level.as_str())
.env("RUST_LOG", log_level.as_str())
.current_dir(manifest.plugin_dir.clone())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()?;
let child_pid = child.id().unwrap_or_default();
debug!("Plugin {} started with PID {}", manifest.name, child_pid);
match ChildPluginProcess::new(child, manifest).await {
Ok(child) => Ok(PactPlugin::new(manifest, child)),
Err(err) => {
let s = System::new_with_specifics(RefreshKind::new().with_processes());
if let Some(process) = s.process(child_pid as Pid) {
process.kill(Signal::Term);
} else {
warn!("Child process with PID {} was not found", child_pid);
}
Err(err)
}
}
}
pub fn shutdown_plugins() {
let thread_id = thread::current().id();
debug!("Shutting down all plugins");
trace!("shutdown_plugins {:?}: Waiting on PLUGIN_REGISTER lock", thread_id);
let mut guard = PLUGIN_REGISTER.lock().unwrap();
trace!("shutdown_plugins {:?}: Got PLUGIN_REGISTER lock", thread_id);
for plugin in guard.values() {
debug!("Shutting down plugin {:?}", plugin);
plugin.kill();
remove_plugin_entries(&plugin.manifest.name);
}
guard.clear();
trace!("shutdown_plugins {:?}: Releasing PLUGIN_REGISTER lock", thread_id);
}
pub fn shutdown_plugin(plugin: &mut PactPlugin) {
debug!("Shutting down plugin {}:{}", plugin.manifest.name, plugin.manifest.version);
plugin.kill();
remove_plugin_entries(&plugin.manifest.name);
}
fn publish_updated_catalogue() {
}
pub fn drop_plugin_access(plugin: &PluginDependency) {
let thread_id = thread::current().id();
trace!("drop_plugin_access {:?}: Waiting on PLUGIN_REGISTER lock", thread_id);
let mut inner = PLUGIN_REGISTER.lock().unwrap();
trace!("drop_plugin_access {:?}: Got PLUGIN_REGISTER lock", thread_id);
if let Some(plugin) = lookup_plugin_inner(plugin, &mut inner) {
let key = format!("{}/{}", plugin.manifest.name, plugin.manifest.version);
if plugin.drop_access() == 0 {
shutdown_plugin(plugin);
inner.remove(key.as_str());
}
}
trace!("drop_plugin_access {:?}: Releasing PLUGIN_REGISTER lock", thread_id);
}