otx_pool/plugin_extension/
plugin_proxy.rs

1use crate::notify::RuntimeHandle;
2
3use otx_pool_plugin_protocol::{
4    HostServiceHandler, MessageFromHost, MessageFromPlugin, MessageType, Plugin, PluginInfo,
5    PluginMeta,
6};
7
8use ckb_types::core::service::Request;
9use crossbeam_channel::{bounded, select, unbounded, Sender};
10use tokio::task::JoinHandle;
11
12use std::io::{BufRead, BufReader, Write};
13use std::path::PathBuf;
14use std::process::{Child, ChildStdin, Command, Stdio};
15
16pub type RequestHandler = Sender<Request<(u64, MessageFromHost), (u64, MessageFromPlugin)>>;
17pub type MsgHandler = Sender<(u64, MessageFromHost)>;
18
19pub struct PluginProcess {
20    _plugin_process: Child,
21    _stdin_thread: JoinHandle<()>,
22    _stdout_thread: JoinHandle<()>,
23}
24
25pub struct PluginProxy {
26    state: PluginMeta,
27    info: PluginInfo,
28    _process: PluginProcess,
29
30    /// Send request to stdin thread, and expect a response from stdout thread.
31    request_handler: RequestHandler,
32
33    /// Send notifaction/response to stdin thread.
34    msg_handler: MsgHandler,
35}
36
37impl Plugin for PluginProxy {
38    fn get_name(&self) -> String {
39        self.info.name.clone()
40    }
41
42    fn get_info(&self) -> PluginInfo {
43        self.info.clone()
44    }
45
46    fn get_meta(&self) -> PluginMeta {
47        self.state.clone()
48    }
49}
50
51impl PluginProxy {
52    pub fn msg_handler(&self) -> MsgHandler {
53        self.msg_handler.clone()
54    }
55
56    pub fn request_handler(&self) -> RequestHandler {
57        self.request_handler.clone()
58    }
59
60    /// This function will create a temporary plugin process to fetch plugin information.
61    pub fn load_plugin_info(binary_path: PathBuf) -> Result<PluginInfo, String> {
62        let mut child = Command::new(&binary_path)
63            .stdin(Stdio::piped())
64            .stdout(Stdio::piped())
65            .spawn()
66            .map_err(|err| err.to_string())?;
67        let mut stdin = child
68            .stdin
69            .take()
70            .ok_or_else(|| String::from("Get stdin failed"))?;
71        let stdout = child
72            .stdout
73            .take()
74            .ok_or_else(|| String::from("Get stdout failed"))?;
75
76        // request from host to plugin
77        let request = (0u64, MessageFromHost::GetPluginInfo);
78        let request_string = serde_json::to_string(&request).expect("Serialize request error");
79        log::debug!("Send request to plugin: {}", request_string);
80        stdin
81            .write_all(format!("{}\n", request_string).as_bytes())
82            .map_err(|err| err.to_string())?;
83        stdin.flush().map_err(|err| err.to_string())?;
84
85        // get response from plugin
86        let mut buf_reader = BufReader::new(stdout);
87        let mut response_string = String::new();
88        buf_reader
89            .read_line(&mut response_string)
90            .map_err(|err| err.to_string())?;
91        log::debug!("Receive response from plugin: {}", response_string.trim());
92        let (id, response): (u64, MessageFromPlugin) =
93            serde_json::from_str(&response_string).map_err(|err| err.to_string())?;
94
95        if let (0u64, MessageFromPlugin::PluginInfo(plugin_info)) = (id, response) {
96            Ok(plugin_info)
97        } else {
98            Err(format!(
99                "Invalid response for get_info call to plugin {:?}, response: {}",
100                binary_path, response_string
101            ))
102        }
103    }
104
105    pub fn start_process(
106        runtime: RuntimeHandle,
107        plugin_state: PluginMeta,
108        plugin_info: PluginInfo,
109        service_handler: HostServiceHandler,
110    ) -> Result<PluginProxy, String> {
111        let mut child = Command::new(plugin_state.binary_path.clone())
112            .stdin(Stdio::piped())
113            .stdout(Stdio::piped())
114            .spawn()
115            .map_err(|err| err.to_string())?;
116        let mut stdin = child
117            .stdin
118            .take()
119            .ok_or_else(|| String::from("Get stdin failed"))?;
120        let stdout = child
121            .stdout
122            .take()
123            .ok_or_else(|| String::from("Get stdout failed"))?;
124
125        // the host request channel receives request from host to plugin
126        let (host_request_sender, host_request_receiver) = bounded(1);
127
128        // the plugin response channel receives response from plugin,
129        // it cooperates with the host request channel to complete the request-response pair
130        let (plugin_response_sender, plugin_response_receiver) = bounded(1);
131
132        // the channel sends notifications or responses from the host to plugin
133        let (host_msg_sender, host_msg_receiver) = unbounded();
134
135        let plugin_name = plugin_info.name.clone();
136        // this thread processes stdin information from host to plugin
137        let stdin_thread = runtime.spawn(async move  {
138            let handle_host_msg =
139                |stdin: &mut ChildStdin, (id, response)| -> Result<bool, String> {
140                    let response_string =
141                        serde_json::to_string(&(id, response)).expect("Serialize response error");
142                    log::debug!("Send response/notification to plugin: {}", response_string);
143                    stdin
144                        .write_all(format!("{}\n", response_string).as_bytes())
145                        .map_err(|err| err.to_string())?;
146                    stdin.flush().map_err(|err| err.to_string())?;
147                    Ok(false)
148                };
149
150            let mut do_select = || -> Result<bool, String> {
151                select! {
152                    // request from host to plugin
153                    recv(host_request_receiver) -> msg => {
154                        match msg {
155                            Ok(Request { responder, arguments }) => {
156                                let request_string = serde_json::to_string(&arguments).expect("Serialize request error");
157                                log::debug!("Send request to plugin: {}", request_string);
158                                stdin.write_all(format!("{}\n", request_string).as_bytes()).map_err(|err| err.to_string())?;
159                                stdin.flush().map_err(|err| err.to_string())?;
160                                loop {
161                                    select!{
162                                        recv(plugin_response_receiver) -> msg => {
163                                            match msg {
164                                                Ok(response) => {
165                                                    responder.send(response).map_err(|err| err.to_string())?;
166                                                    return Ok(false);
167                                                }
168                                                Err(err) => {
169                                                    return Err(err.to_string());
170                                                }
171                                            }
172                                        },
173                                        recv(host_msg_receiver) -> msg => {
174                                            match msg {
175                                                Ok(msg) => {
176                                                    handle_host_msg(&mut stdin, msg)?;
177                                                },
178                                                Err(err) => {
179                                                    return Err(err.to_string());
180                                                }
181                                            }
182                                        }
183                                    }
184                                }
185                            }
186                            Err(err) => Err(err.to_string())
187                        }
188                    }
189                    // repsonse/notification from host to plugin
190                    recv(host_msg_receiver) -> msg => {
191                        match msg {
192                            Ok(msg) => handle_host_msg(&mut stdin, msg),
193                            Err(err) => Err(err.to_string())
194                        }
195                    }
196                    // ignore the unexpected response from plugin
197                    recv(plugin_response_receiver) -> msg => {
198                        log::debug!("Received unexpected response/notification to plugin: {:?}", msg);
199                        match msg {
200                            Ok(_) => Ok(false),
201                            Err(err) => Err(err.to_string())
202                        }
203                    }
204                }
205            };
206            loop {
207                match do_select() {
208                    Ok(true) => {
209                        break;
210                    }
211                    Ok(false) => (),
212                    Err(err) => {
213                        log::error!("plugin {} stdin error: {}", plugin_name, err);
214                        break;
215                    }
216                }
217            }
218        });
219
220        let plugin_name = plugin_info.name.clone();
221        let msg_sender = host_msg_sender.clone();
222        let mut buf_reader = BufReader::new(stdout);
223        let stdout_thread = runtime.spawn(async move {
224            let mut do_recv = || -> Result<bool, String> {
225                let mut content = String::new();
226                if buf_reader
227                    .read_line(&mut content)
228                    .map_err(|err| err.to_string())?
229                    == 0
230                {
231                    // EOF
232                    return Ok(true);
233                }
234
235                let (id, message_from_plugin): (u64, MessageFromPlugin) =
236                    serde_json::from_str(&content).map_err(|err| err.to_string())?;
237                match message_from_plugin.get_message_type() {
238                    MessageType::Response => {
239                        // Receive response from plugin
240                        log::debug!("Receive response from plugin: {}", content.trim());
241                        plugin_response_sender
242                            .send((id, message_from_plugin))
243                            .map_err(|err| err.to_string())?;
244                    }
245                    MessageType::Request => {
246                        // Handle request from plugin
247                        log::debug!("Receive request from plugin: {}", content.trim());
248                        log::debug!("Sending request to ServiceProvider");
249                        let message_from_host =
250                            Request::call(&service_handler, message_from_plugin).ok_or_else(
251                                || String::from("Send request to ServiceProvider failed"),
252                            )?;
253                        log::debug!(
254                            "Received response from ServiceProvider: {:?}",
255                            message_from_host
256                        );
257                        msg_sender
258                            .send((id, message_from_host))
259                            .map_err(|err| err.to_string())?;
260                    }
261                    MessageType::Notify => {
262                        unreachable!()
263                    }
264                }
265
266                Ok(false)
267            };
268            loop {
269                match do_recv() {
270                    Ok(true) => {
271                        log::info!("plugin {} quit", plugin_name);
272                        break;
273                    }
274                    Ok(false) => {}
275                    Err(err) => {
276                        log::warn!("plugin {} stdout error: {}", plugin_name, err);
277                        break;
278                    }
279                }
280            }
281        });
282
283        let process = PluginProcess {
284            _plugin_process: child,
285            _stdin_thread: stdin_thread,
286            _stdout_thread: stdout_thread,
287        };
288
289        Ok(PluginProxy {
290            state: plugin_state,
291            info: plugin_info,
292            _process: process,
293            request_handler: host_request_sender,
294            msg_handler: host_msg_sender,
295        })
296    }
297}
298
299impl Drop for PluginProxy {
300    fn drop(&mut self) {
301        // TODO: send term signal to the process
302    }
303}