use {
jsonrpc_core::{ErrorCode, Result as JsonRpcResult},
jsonrpc_server_utils::tokio::sync::oneshot::Sender as OneShotSender,
libloading::Library,
log::*,
solana_geyser_plugin_interface::geyser_plugin_interface::GeyserPlugin,
std::path::Path,
};
#[derive(Default, Debug)]
pub struct GeyserPluginManager {
pub plugins: Vec<Box<dyn GeyserPlugin>>,
libs: Vec<Library>,
}
impl GeyserPluginManager {
pub fn new() -> Self {
GeyserPluginManager {
plugins: Vec::default(),
libs: Vec::default(),
}
}
pub fn unload(&mut self) {
for mut plugin in self.plugins.drain(..) {
info!("Unloading plugin for {:?}", plugin.name());
plugin.on_unload();
}
for lib in self.libs.drain(..) {
drop(lib);
}
}
pub fn account_data_notifications_enabled(&self) -> bool {
for plugin in &self.plugins {
if plugin.account_data_notifications_enabled() {
return true;
}
}
false
}
pub fn transaction_notifications_enabled(&self) -> bool {
for plugin in &self.plugins {
if plugin.transaction_notifications_enabled() {
return true;
}
}
false
}
pub fn entry_notifications_enabled(&self) -> bool {
for plugin in &self.plugins {
if plugin.entry_notifications_enabled() {
return true;
}
}
false
}
pub(crate) fn list_plugins(&self) -> JsonRpcResult<Vec<String>> {
Ok(self.plugins.iter().map(|p| p.name().to_owned()).collect())
}
pub(crate) fn load_plugin(
&mut self,
geyser_plugin_config_file: impl AsRef<Path>,
) -> JsonRpcResult<String> {
let (mut new_plugin, new_lib, new_config_file) =
load_plugin_from_config(geyser_plugin_config_file.as_ref()).map_err(|e| {
jsonrpc_core::Error {
code: ErrorCode::InvalidRequest,
message: format!("Failed to load plugin: {e}"),
data: None,
}
})?;
if self
.plugins
.iter()
.any(|plugin| plugin.name().eq(new_plugin.name()))
{
return Err(jsonrpc_core::Error {
code: ErrorCode::InvalidRequest,
message: format!(
"There already exists a plugin named {} loaded. Did not load requested plugin",
new_plugin.name()
),
data: None,
});
}
new_plugin
.on_load(new_config_file)
.map_err(|on_load_err| jsonrpc_core::Error {
code: ErrorCode::InvalidRequest,
message: format!(
"on_load method of plugin {} failed: {on_load_err}",
new_plugin.name()
),
data: None,
})?;
let name = new_plugin.name().to_string();
self.plugins.push(new_plugin);
self.libs.push(new_lib);
Ok(name)
}
pub(crate) fn unload_plugin(&mut self, name: &str) -> JsonRpcResult<()> {
let Some(idx) = self.plugins.iter().position(|plugin| plugin.name().eq(name)) else {
return Err(
jsonrpc_core::error::Error {
code: ErrorCode::InvalidRequest,
message: String::from("The plugin you requested to unload is not loaded"),
data: None,
}
)
};
self._drop_plugin(idx);
Ok(())
}
pub(crate) fn reload_plugin(&mut self, name: &str, config_file: &str) -> JsonRpcResult<()> {
let Some(idx) = self.plugins.iter().position(|plugin| plugin.name().eq(name)) else {
return Err(
jsonrpc_core::error::Error {
code: ErrorCode::InvalidRequest,
message: String::from("The plugin you requested to reload is not loaded"),
data: None,
}
)
};
self._drop_plugin(idx);
let (mut new_plugin, new_lib, new_parsed_config_file) =
load_plugin_from_config(config_file.as_ref()).map_err(|err| jsonrpc_core::Error {
code: ErrorCode::InvalidRequest,
message: err.to_string(),
data: None,
})?;
match new_plugin.on_load(new_parsed_config_file) {
Ok(()) => {
self.plugins.push(new_plugin);
self.libs.push(new_lib);
}
Err(err) => {
return Err(jsonrpc_core::error::Error {
code: ErrorCode::InvalidRequest,
message: format!(
"Failed to start new plugin (previous plugin was dropped!): {err}"
),
data: None,
});
}
}
Ok(())
}
fn _drop_plugin(&mut self, idx: usize) {
let mut current_plugin = self.plugins.remove(idx);
let _current_lib = self.libs.remove(idx);
current_plugin.on_unload();
}
}
#[derive(Debug)]
pub enum GeyserPluginManagerRequest {
ReloadPlugin {
name: String,
config_file: String,
response_sender: OneShotSender<JsonRpcResult<()>>,
},
UnloadPlugin {
name: String,
response_sender: OneShotSender<JsonRpcResult<()>>,
},
LoadPlugin {
config_file: String,
response_sender: OneShotSender<JsonRpcResult<String>>,
},
ListPlugins {
response_sender: OneShotSender<JsonRpcResult<Vec<String>>>,
},
}
#[derive(thiserror::Error, Debug)]
pub enum GeyserPluginManagerError {
#[error("Cannot open the the plugin config file")]
CannotOpenConfigFile(String),
#[error("Cannot read the the plugin config file")]
CannotReadConfigFile(String),
#[error("The config file is not in a valid Json format")]
InvalidConfigFileFormat(String),
#[error("Plugin library path is not specified in the config file")]
LibPathNotSet,
#[error("Invalid plugin path")]
InvalidPluginPath,
#[error("Cannot load plugin shared library")]
PluginLoadError(String),
#[error("The geyser plugin {0} is already loaded shared library")]
PluginAlreadyLoaded(String),
#[error("The GeyserPlugin on_load method failed")]
PluginStartError(String),
}
#[cfg(not(test))]
pub(crate) fn load_plugin_from_config(
geyser_plugin_config_file: &Path,
) -> Result<(Box<dyn GeyserPlugin>, Library, &str), GeyserPluginManagerError> {
use std::{fs::File, io::Read, path::PathBuf};
type PluginConstructor = unsafe fn() -> *mut dyn GeyserPlugin;
use libloading::Symbol;
let mut file = match File::open(geyser_plugin_config_file) {
Ok(file) => file,
Err(err) => {
return Err(GeyserPluginManagerError::CannotOpenConfigFile(format!(
"Failed to open the plugin config file {geyser_plugin_config_file:?}, error: {err:?}"
)));
}
};
let mut contents = String::new();
if let Err(err) = file.read_to_string(&mut contents) {
return Err(GeyserPluginManagerError::CannotReadConfigFile(format!(
"Failed to read the plugin config file {geyser_plugin_config_file:?}, error: {err:?}"
)));
}
let result: serde_json::Value = match json5::from_str(&contents) {
Ok(value) => value,
Err(err) => {
return Err(GeyserPluginManagerError::InvalidConfigFileFormat(format!(
"The config file {geyser_plugin_config_file:?} is not in a valid Json5 format, error: {err:?}"
)));
}
};
let libpath = result["libpath"]
.as_str()
.ok_or(GeyserPluginManagerError::LibPathNotSet)?;
let mut libpath = PathBuf::from(libpath);
if libpath.is_relative() {
let config_dir = geyser_plugin_config_file.parent().ok_or_else(|| {
GeyserPluginManagerError::CannotOpenConfigFile(format!(
"Failed to resolve parent of {geyser_plugin_config_file:?}",
))
})?;
libpath = config_dir.join(libpath);
}
let config_file = geyser_plugin_config_file
.as_os_str()
.to_str()
.ok_or(GeyserPluginManagerError::InvalidPluginPath)?;
let (plugin, lib) = unsafe {
let lib = Library::new(libpath)
.map_err(|e| GeyserPluginManagerError::PluginLoadError(e.to_string()))?;
let constructor: Symbol<PluginConstructor> = lib
.get(b"_create_plugin")
.map_err(|e| GeyserPluginManagerError::PluginLoadError(e.to_string()))?;
let plugin_raw = constructor();
(Box::from_raw(plugin_raw), lib)
};
Ok((plugin, lib, config_file))
}
#[cfg(test)]
pub(crate) fn load_plugin_from_config(
_geyser_plugin_config_file: &Path,
) -> Result<(Box<dyn GeyserPlugin>, Library, &str), GeyserPluginManagerError> {
Ok(tests::dummy_plugin_and_library())
}
#[cfg(test)]
mod tests {
use {
crate::geyser_plugin_manager::GeyserPluginManager,
libloading::Library,
solana_geyser_plugin_interface::geyser_plugin_interface::GeyserPlugin,
std::sync::{Arc, RwLock},
};
pub(super) fn dummy_plugin_and_library() -> (Box<dyn GeyserPlugin>, Library, &'static str) {
let plugin = Box::new(TestPlugin);
let lib = {
let handle: *mut std::os::raw::c_void = &mut () as *mut _ as *mut std::os::raw::c_void;
let inner_lib = unsafe { libloading::os::unix::Library::from_raw(handle) };
Library::from(inner_lib)
};
(plugin, lib, DUMMY_CONFIG)
}
pub(super) fn dummy_plugin_and_library2() -> (Box<dyn GeyserPlugin>, Library, &'static str) {
let plugin = Box::new(TestPlugin2);
let lib = {
let handle: *mut std::os::raw::c_void = &mut () as *mut _ as *mut std::os::raw::c_void;
let inner_lib = unsafe { libloading::os::unix::Library::from_raw(handle) };
Library::from(inner_lib)
};
(plugin, lib, DUMMY_CONFIG)
}
const DUMMY_NAME: &str = "dummy";
pub(super) const DUMMY_CONFIG: &str = "dummy_config";
const ANOTHER_DUMMY_NAME: &str = "another_dummy";
#[derive(Debug)]
pub(super) struct TestPlugin;
impl GeyserPlugin for TestPlugin {
fn name(&self) -> &'static str {
DUMMY_NAME
}
}
#[derive(Debug)]
pub(super) struct TestPlugin2;
impl GeyserPlugin for TestPlugin2 {
fn name(&self) -> &'static str {
ANOTHER_DUMMY_NAME
}
}
#[test]
fn test_geyser_reload() {
let plugin_manager = Arc::new(RwLock::new(GeyserPluginManager::new()));
let mut plugin_manager_lock = plugin_manager.write().unwrap();
let reload_result = plugin_manager_lock.reload_plugin(DUMMY_NAME, DUMMY_CONFIG);
assert_eq!(
reload_result.unwrap_err().message,
"The plugin you requested to reload is not loaded"
);
let (mut plugin, lib, config) = dummy_plugin_and_library();
plugin.on_load(config).unwrap();
plugin_manager_lock.plugins.push(plugin);
plugin_manager_lock.libs.push(lib);
assert_eq!(plugin_manager_lock.plugins[0].name(), DUMMY_NAME);
plugin_manager_lock.plugins[0].name();
const WRONG_NAME: &str = "wrong_name";
let reload_result = plugin_manager_lock.reload_plugin(WRONG_NAME, DUMMY_CONFIG);
assert_eq!(
reload_result.unwrap_err().message,
"The plugin you requested to reload is not loaded"
);
let reload_result = plugin_manager_lock.reload_plugin(DUMMY_NAME, DUMMY_CONFIG);
assert!(reload_result.is_ok());
}
#[test]
fn test_plugin_list() {
let plugin_manager = Arc::new(RwLock::new(GeyserPluginManager::new()));
let mut plugin_manager_lock = plugin_manager.write().unwrap();
let (mut plugin, lib, config) = dummy_plugin_and_library();
plugin.on_load(config).unwrap();
plugin_manager_lock.plugins.push(plugin);
plugin_manager_lock.libs.push(lib);
let (mut plugin, lib, config) = dummy_plugin_and_library2();
plugin.on_load(config).unwrap();
plugin_manager_lock.plugins.push(plugin);
plugin_manager_lock.libs.push(lib);
let plugins = plugin_manager_lock.list_plugins().unwrap();
assert!(plugins.iter().any(|name| name.eq(DUMMY_NAME)));
assert!(plugins.iter().any(|name| name.eq(ANOTHER_DUMMY_NAME)));
}
#[test]
fn test_plugin_load_unload() {
let plugin_manager = Arc::new(RwLock::new(GeyserPluginManager::new()));
let mut plugin_manager_lock = plugin_manager.write().unwrap();
let load_result = plugin_manager_lock.load_plugin(DUMMY_CONFIG);
assert!(load_result.is_ok());
assert_eq!(plugin_manager_lock.plugins.len(), 1);
let unload_result = plugin_manager_lock.unload_plugin(DUMMY_NAME);
assert!(unload_result.is_ok());
assert_eq!(plugin_manager_lock.plugins.len(), 0);
}
}