otx_pool/plugin_extension/
manager.rs1use crate::notify::{NotifyController, RuntimeHandle};
2use crate::plugin_extension::host_service::HostServiceProvider;
3use crate::plugin_extension::plugin_proxy::PluginProxy;
4
5use anyhow::Result;
6use ckb_async_runtime::Handle;
7use otx_pool_plugin_protocol::{HostServiceHandler, Plugin, PluginInfo, PluginMeta};
8use tokio::task::{block_in_place, JoinHandle};
9
10use std::collections::HashMap;
11use std::fs;
12use std::io;
13use std::path::{Path, PathBuf};
14use std::sync::Arc;
15
/// Name of the sub-directory (under the host directory) that holds active plugins.
pub const PLUGINS_DIRNAME: &str = "plugins";
/// Name of the sub-directory (under the host directory) that holds inactive plugins.
pub const INACTIVE_DIRNAME: &str = "plugins_inactive";
18
19type PluginList = Vec<(String, Box<Arc<dyn Plugin + Send>>)>;
20
21pub struct PluginManager {
22 _plugin_dir: PathBuf,
23 inactive_plugin_dir: PathBuf,
24
25 plugin_configs: HashMap<String, (PluginMeta, PluginInfo)>,
27
28 plugins: HashMap<String, Box<Arc<dyn Plugin + Send>>>,
30
31 service_provider: HostServiceHandler,
32 _event_thread: Option<JoinHandle<()>>,
33}
34
35impl PluginManager {
36 pub fn new(host_dir: &Path, service_provider: HostServiceHandler) -> Self {
37 let plugin_configs: HashMap<String, (PluginMeta, PluginInfo)> = HashMap::new();
38 let plugins: HashMap<String, Box<Arc<dyn Plugin + Send>>> = HashMap::new();
39
40 PluginManager {
41 _plugin_dir: host_dir.join(PLUGINS_DIRNAME),
42 inactive_plugin_dir: host_dir.join(INACTIVE_DIRNAME),
43 plugin_configs,
44 plugins,
45 service_provider,
46 _event_thread: None,
47 }
48 }
49
50 pub fn register_built_in_plugins(&mut self, plugin: Box<Arc<dyn Plugin + Send>>) {
51 let plugin_info = plugin.get_info();
52 let plugin_state = plugin.get_meta();
53 self.plugin_configs
54 .insert(plugin.get_name(), (plugin_state, plugin_info));
55 let plugin = plugin;
56 self.plugins.insert(plugin.get_name(), plugin.clone());
57 }
58
59 pub fn load_third_party_plugins(
60 &mut self,
61 runtime_handle: &RuntimeHandle,
62 service_provider: &HostServiceProvider,
63 ) -> Result<(), String> {
64 log::info!("load third-party plugins");
66 for (plugin_name, (plugin_state, plugin_info)) in
67 self.load_plugin_configs().map_err(|err| err.to_string())?
68 {
69 self.plugin_configs.insert(
70 plugin_name.clone(),
71 (plugin_state.to_owned(), plugin_info.to_owned()),
72 );
73 if plugin_state.is_active {
74 let plugin_proxy = PluginProxy::start_process(
75 runtime_handle.clone(),
76 plugin_state,
77 plugin_info,
78 service_provider.handler(),
79 )?;
80 self.plugins
81 .insert(plugin_name.to_owned(), Box::new(Arc::new(plugin_proxy)));
82 }
83 }
84
85 Ok(())
86 }
87
88 pub fn subscribe_events(&mut self, notify_ctrl: &NotifyController, runtime_handle: &Handle) {
89 let plugins: PluginList = self
90 .plugins
91 .iter()
92 .map(|(name, p)| (name.to_owned(), p.clone()))
93 .collect();
94
95 let mut interval_event_receiver =
96 runtime_handle.block_on(notify_ctrl.subscribe_interval("plugin manager"));
97 let mut new_otx_event_receiver =
98 runtime_handle.block_on(notify_ctrl.subscribe_new_open_tx("plugin manager"));
99 let mut commit_otx_event_receiver =
100 runtime_handle.block_on(notify_ctrl.subscribe_commit_open_tx("plugin manager"));
101 let event_listening_thread = runtime_handle.spawn(async move {
102 loop {
103 tokio::select! {
104 Some(elapsed) = interval_event_receiver.recv() => {
105 plugins.iter().for_each(| (_, plugin) | {
106 block_in_place(|| plugin.on_new_intervel(elapsed));
107 })
108 }
109 Some(open_tx) = new_otx_event_receiver.recv() => {
110 plugins.iter().for_each(|(_, plugin) | {
111 block_in_place(|| plugin.on_new_otx(open_tx.clone()));
112 })
113 }
114 Some(otx_hash) = commit_otx_event_receiver.recv() => {
115 plugins.iter().for_each(|(_, plugin) | {
116 block_in_place(|| plugin.on_commit_otx(otx_hash.clone()));
117 })
118 }
119 }
120 }
121 });
122 self._event_thread = Some(event_listening_thread);
123 }
124
125 pub fn plugin_configs(&self) -> &HashMap<String, (PluginMeta, PluginInfo)> {
126 &self.plugin_configs
127 }
128
129 pub fn service_handler(&self) -> HostServiceHandler {
130 self.service_provider.clone()
131 }
132
133 fn load_plugin_configs(&self) -> Result<HashMap<String, (PluginMeta, PluginInfo)>, io::Error> {
134 if !self._plugin_dir.exists() {
135 fs::create_dir_all(&self._plugin_dir)?;
136 }
137 if !self.inactive_plugin_dir.exists() {
138 fs::create_dir_all(&self.inactive_plugin_dir)?;
139 }
140
141 let mut plugin_configs = HashMap::new();
142 for (dir, is_active) in &[
143 (&self._plugin_dir, true),
144 (&self.inactive_plugin_dir, false),
145 ] {
146 for entry in fs::read_dir(dir)? {
147 let path = entry?.path();
148 if path.is_file() {
149 let plugin_state = PluginMeta::new(path.clone(), *is_active, false);
150 match PluginProxy::load_plugin_info(path.clone()) {
151 Ok(plugin_info) => {
152 log::info!("Loaded plugin: {}", plugin_info.name);
153 plugin_configs.insert(
154 plugin_info.clone().name,
155 (plugin_state, plugin_info.clone()),
156 );
157 }
158 Err(err) => {
159 log::warn!("get_config error: {}, path: {:?}", err, path);
160 }
161 }
162 }
163 }
164 }
165 Ok(plugin_configs)
166 }
167}