otx_pool/plugin_extension/
manager.rs

1use crate::notify::{NotifyController, RuntimeHandle};
2use crate::plugin_extension::host_service::HostServiceProvider;
3use crate::plugin_extension::plugin_proxy::PluginProxy;
4
5use anyhow::Result;
6use ckb_async_runtime::Handle;
7use otx_pool_plugin_protocol::{HostServiceHandler, Plugin, PluginInfo, PluginMeta};
8use tokio::task::{block_in_place, JoinHandle};
9
10use std::collections::HashMap;
11use std::fs;
12use std::io;
13use std::path::{Path, PathBuf};
14use std::sync::Arc;
15
/// Name of the sub-directory, relative to the host directory, that is scanned for
/// active plugins.
pub const PLUGINS_DIRNAME: &str = "plugins";
/// Name of the sub-directory, relative to the host directory, that is scanned for
/// plugins that are installed but not active.
pub const INACTIVE_DIRNAME: &str = "plugins_inactive";
18
19type PluginList = Vec<(String, Box<Arc<dyn Plugin + Send>>)>;
20
21pub struct PluginManager {
22    _plugin_dir: PathBuf,
23    inactive_plugin_dir: PathBuf,
24
25    // information about all plugins, including inactive ones
26    plugin_configs: HashMap<String, (PluginMeta, PluginInfo)>,
27
28    // proxies for activated plugin processes
29    plugins: HashMap<String, Box<Arc<dyn Plugin + Send>>>,
30
31    service_provider: HostServiceHandler,
32    _event_thread: Option<JoinHandle<()>>,
33}
34
35impl PluginManager {
36    pub fn new(host_dir: &Path, service_provider: HostServiceHandler) -> Self {
37        let plugin_configs: HashMap<String, (PluginMeta, PluginInfo)> = HashMap::new();
38        let plugins: HashMap<String, Box<Arc<dyn Plugin + Send>>> = HashMap::new();
39
40        PluginManager {
41            _plugin_dir: host_dir.join(PLUGINS_DIRNAME),
42            inactive_plugin_dir: host_dir.join(INACTIVE_DIRNAME),
43            plugin_configs,
44            plugins,
45            service_provider,
46            _event_thread: None,
47        }
48    }
49
50    pub fn register_built_in_plugins(&mut self, plugin: Box<Arc<dyn Plugin + Send>>) {
51        let plugin_info = plugin.get_info();
52        let plugin_state = plugin.get_meta();
53        self.plugin_configs
54            .insert(plugin.get_name(), (plugin_state, plugin_info));
55        let plugin = plugin;
56        self.plugins.insert(plugin.get_name(), plugin.clone());
57    }
58
59    pub fn load_third_party_plugins(
60        &mut self,
61        runtime_handle: &RuntimeHandle,
62        service_provider: &HostServiceProvider,
63    ) -> Result<(), String> {
64        // load plugins
65        log::info!("load third-party plugins");
66        for (plugin_name, (plugin_state, plugin_info)) in
67            self.load_plugin_configs().map_err(|err| err.to_string())?
68        {
69            self.plugin_configs.insert(
70                plugin_name.clone(),
71                (plugin_state.to_owned(), plugin_info.to_owned()),
72            );
73            if plugin_state.is_active {
74                let plugin_proxy = PluginProxy::start_process(
75                    runtime_handle.clone(),
76                    plugin_state,
77                    plugin_info,
78                    service_provider.handler(),
79                )?;
80                self.plugins
81                    .insert(plugin_name.to_owned(), Box::new(Arc::new(plugin_proxy)));
82            }
83        }
84
85        Ok(())
86    }
87
88    pub fn subscribe_events(&mut self, notify_ctrl: &NotifyController, runtime_handle: &Handle) {
89        let plugins: PluginList = self
90            .plugins
91            .iter()
92            .map(|(name, p)| (name.to_owned(), p.clone()))
93            .collect();
94
95        let mut interval_event_receiver =
96            runtime_handle.block_on(notify_ctrl.subscribe_interval("plugin manager"));
97        let mut new_otx_event_receiver =
98            runtime_handle.block_on(notify_ctrl.subscribe_new_open_tx("plugin manager"));
99        let mut commit_otx_event_receiver =
100            runtime_handle.block_on(notify_ctrl.subscribe_commit_open_tx("plugin manager"));
101        let event_listening_thread = runtime_handle.spawn(async move {
102            loop {
103                tokio::select! {
104                    Some(elapsed) = interval_event_receiver.recv() => {
105                        plugins.iter().for_each(| (_, plugin) | {
106                            block_in_place(|| plugin.on_new_intervel(elapsed));
107                        })
108                    }
109                    Some(open_tx) = new_otx_event_receiver.recv() => {
110                        plugins.iter().for_each(|(_, plugin) | {
111                            block_in_place(|| plugin.on_new_otx(open_tx.clone()));
112                        })
113                    }
114                    Some(otx_hash) = commit_otx_event_receiver.recv() => {
115                        plugins.iter().for_each(|(_, plugin) | {
116                            block_in_place(|| plugin.on_commit_otx(otx_hash.clone()));
117                        })
118                    }
119                }
120            }
121        });
122        self._event_thread = Some(event_listening_thread);
123    }
124
125    pub fn plugin_configs(&self) -> &HashMap<String, (PluginMeta, PluginInfo)> {
126        &self.plugin_configs
127    }
128
129    pub fn service_handler(&self) -> HostServiceHandler {
130        self.service_provider.clone()
131    }
132
133    fn load_plugin_configs(&self) -> Result<HashMap<String, (PluginMeta, PluginInfo)>, io::Error> {
134        if !self._plugin_dir.exists() {
135            fs::create_dir_all(&self._plugin_dir)?;
136        }
137        if !self.inactive_plugin_dir.exists() {
138            fs::create_dir_all(&self.inactive_plugin_dir)?;
139        }
140
141        let mut plugin_configs = HashMap::new();
142        for (dir, is_active) in &[
143            (&self._plugin_dir, true),
144            (&self.inactive_plugin_dir, false),
145        ] {
146            for entry in fs::read_dir(dir)? {
147                let path = entry?.path();
148                if path.is_file() {
149                    let plugin_state = PluginMeta::new(path.clone(), *is_active, false);
150                    match PluginProxy::load_plugin_info(path.clone()) {
151                        Ok(plugin_info) => {
152                            log::info!("Loaded plugin: {}", plugin_info.name);
153                            plugin_configs.insert(
154                                plugin_info.clone().name,
155                                (plugin_state, plugin_info.clone()),
156                            );
157                        }
158                        Err(err) => {
159                            log::warn!("get_config error: {}, path: {:?}", err, path);
160                        }
161                    }
162                }
163            }
164        }
165        Ok(plugin_configs)
166    }
167}