otx_pool/plugin_extension/
plugin_proxy.rs1use crate::notify::RuntimeHandle;
2
3use otx_pool_plugin_protocol::{
4 HostServiceHandler, MessageFromHost, MessageFromPlugin, MessageType, Plugin, PluginInfo,
5 PluginMeta,
6};
7
8use ckb_types::core::service::Request;
9use crossbeam_channel::{bounded, select, unbounded, Sender};
10use tokio::task::JoinHandle;
11
12use std::io::{BufRead, BufReader, Write};
13use std::path::PathBuf;
14use std::process::{Child, ChildStdin, Command, Stdio};
15
16pub type RequestHandler = Sender<Request<(u64, MessageFromHost), (u64, MessageFromPlugin)>>;
17pub type MsgHandler = Sender<(u64, MessageFromHost)>;
18
19pub struct PluginProcess {
20 _plugin_process: Child,
21 _stdin_thread: JoinHandle<()>,
22 _stdout_thread: JoinHandle<()>,
23}
24
25pub struct PluginProxy {
26 state: PluginMeta,
27 info: PluginInfo,
28 _process: PluginProcess,
29
30 request_handler: RequestHandler,
32
33 msg_handler: MsgHandler,
35}
36
37impl Plugin for PluginProxy {
38 fn get_name(&self) -> String {
39 self.info.name.clone()
40 }
41
42 fn get_info(&self) -> PluginInfo {
43 self.info.clone()
44 }
45
46 fn get_meta(&self) -> PluginMeta {
47 self.state.clone()
48 }
49}
50
51impl PluginProxy {
52 pub fn msg_handler(&self) -> MsgHandler {
53 self.msg_handler.clone()
54 }
55
56 pub fn request_handler(&self) -> RequestHandler {
57 self.request_handler.clone()
58 }
59
60 pub fn load_plugin_info(binary_path: PathBuf) -> Result<PluginInfo, String> {
62 let mut child = Command::new(&binary_path)
63 .stdin(Stdio::piped())
64 .stdout(Stdio::piped())
65 .spawn()
66 .map_err(|err| err.to_string())?;
67 let mut stdin = child
68 .stdin
69 .take()
70 .ok_or_else(|| String::from("Get stdin failed"))?;
71 let stdout = child
72 .stdout
73 .take()
74 .ok_or_else(|| String::from("Get stdout failed"))?;
75
76 let request = (0u64, MessageFromHost::GetPluginInfo);
78 let request_string = serde_json::to_string(&request).expect("Serialize request error");
79 log::debug!("Send request to plugin: {}", request_string);
80 stdin
81 .write_all(format!("{}\n", request_string).as_bytes())
82 .map_err(|err| err.to_string())?;
83 stdin.flush().map_err(|err| err.to_string())?;
84
85 let mut buf_reader = BufReader::new(stdout);
87 let mut response_string = String::new();
88 buf_reader
89 .read_line(&mut response_string)
90 .map_err(|err| err.to_string())?;
91 log::debug!("Receive response from plugin: {}", response_string.trim());
92 let (id, response): (u64, MessageFromPlugin) =
93 serde_json::from_str(&response_string).map_err(|err| err.to_string())?;
94
95 if let (0u64, MessageFromPlugin::PluginInfo(plugin_info)) = (id, response) {
96 Ok(plugin_info)
97 } else {
98 Err(format!(
99 "Invalid response for get_info call to plugin {:?}, response: {}",
100 binary_path, response_string
101 ))
102 }
103 }
104
105 pub fn start_process(
106 runtime: RuntimeHandle,
107 plugin_state: PluginMeta,
108 plugin_info: PluginInfo,
109 service_handler: HostServiceHandler,
110 ) -> Result<PluginProxy, String> {
111 let mut child = Command::new(plugin_state.binary_path.clone())
112 .stdin(Stdio::piped())
113 .stdout(Stdio::piped())
114 .spawn()
115 .map_err(|err| err.to_string())?;
116 let mut stdin = child
117 .stdin
118 .take()
119 .ok_or_else(|| String::from("Get stdin failed"))?;
120 let stdout = child
121 .stdout
122 .take()
123 .ok_or_else(|| String::from("Get stdout failed"))?;
124
125 let (host_request_sender, host_request_receiver) = bounded(1);
127
128 let (plugin_response_sender, plugin_response_receiver) = bounded(1);
131
132 let (host_msg_sender, host_msg_receiver) = unbounded();
134
135 let plugin_name = plugin_info.name.clone();
136 let stdin_thread = runtime.spawn(async move {
138 let handle_host_msg =
139 |stdin: &mut ChildStdin, (id, response)| -> Result<bool, String> {
140 let response_string =
141 serde_json::to_string(&(id, response)).expect("Serialize response error");
142 log::debug!("Send response/notification to plugin: {}", response_string);
143 stdin
144 .write_all(format!("{}\n", response_string).as_bytes())
145 .map_err(|err| err.to_string())?;
146 stdin.flush().map_err(|err| err.to_string())?;
147 Ok(false)
148 };
149
150 let mut do_select = || -> Result<bool, String> {
151 select! {
152 recv(host_request_receiver) -> msg => {
154 match msg {
155 Ok(Request { responder, arguments }) => {
156 let request_string = serde_json::to_string(&arguments).expect("Serialize request error");
157 log::debug!("Send request to plugin: {}", request_string);
158 stdin.write_all(format!("{}\n", request_string).as_bytes()).map_err(|err| err.to_string())?;
159 stdin.flush().map_err(|err| err.to_string())?;
160 loop {
161 select!{
162 recv(plugin_response_receiver) -> msg => {
163 match msg {
164 Ok(response) => {
165 responder.send(response).map_err(|err| err.to_string())?;
166 return Ok(false);
167 }
168 Err(err) => {
169 return Err(err.to_string());
170 }
171 }
172 },
173 recv(host_msg_receiver) -> msg => {
174 match msg {
175 Ok(msg) => {
176 handle_host_msg(&mut stdin, msg)?;
177 },
178 Err(err) => {
179 return Err(err.to_string());
180 }
181 }
182 }
183 }
184 }
185 }
186 Err(err) => Err(err.to_string())
187 }
188 }
189 recv(host_msg_receiver) -> msg => {
191 match msg {
192 Ok(msg) => handle_host_msg(&mut stdin, msg),
193 Err(err) => Err(err.to_string())
194 }
195 }
196 recv(plugin_response_receiver) -> msg => {
198 log::debug!("Received unexpected response/notification to plugin: {:?}", msg);
199 match msg {
200 Ok(_) => Ok(false),
201 Err(err) => Err(err.to_string())
202 }
203 }
204 }
205 };
206 loop {
207 match do_select() {
208 Ok(true) => {
209 break;
210 }
211 Ok(false) => (),
212 Err(err) => {
213 log::error!("plugin {} stdin error: {}", plugin_name, err);
214 break;
215 }
216 }
217 }
218 });
219
220 let plugin_name = plugin_info.name.clone();
221 let msg_sender = host_msg_sender.clone();
222 let mut buf_reader = BufReader::new(stdout);
223 let stdout_thread = runtime.spawn(async move {
224 let mut do_recv = || -> Result<bool, String> {
225 let mut content = String::new();
226 if buf_reader
227 .read_line(&mut content)
228 .map_err(|err| err.to_string())?
229 == 0
230 {
231 return Ok(true);
233 }
234
235 let (id, message_from_plugin): (u64, MessageFromPlugin) =
236 serde_json::from_str(&content).map_err(|err| err.to_string())?;
237 match message_from_plugin.get_message_type() {
238 MessageType::Response => {
239 log::debug!("Receive response from plugin: {}", content.trim());
241 plugin_response_sender
242 .send((id, message_from_plugin))
243 .map_err(|err| err.to_string())?;
244 }
245 MessageType::Request => {
246 log::debug!("Receive request from plugin: {}", content.trim());
248 log::debug!("Sending request to ServiceProvider");
249 let message_from_host =
250 Request::call(&service_handler, message_from_plugin).ok_or_else(
251 || String::from("Send request to ServiceProvider failed"),
252 )?;
253 log::debug!(
254 "Received response from ServiceProvider: {:?}",
255 message_from_host
256 );
257 msg_sender
258 .send((id, message_from_host))
259 .map_err(|err| err.to_string())?;
260 }
261 MessageType::Notify => {
262 unreachable!()
263 }
264 }
265
266 Ok(false)
267 };
268 loop {
269 match do_recv() {
270 Ok(true) => {
271 log::info!("plugin {} quit", plugin_name);
272 break;
273 }
274 Ok(false) => {}
275 Err(err) => {
276 log::warn!("plugin {} stdout error: {}", plugin_name, err);
277 break;
278 }
279 }
280 }
281 });
282
283 let process = PluginProcess {
284 _plugin_process: child,
285 _stdin_thread: stdin_thread,
286 _stdout_thread: stdout_thread,
287 };
288
289 Ok(PluginProxy {
290 state: plugin_state,
291 info: plugin_info,
292 _process: process,
293 request_handler: host_request_sender,
294 msg_handler: host_msg_sender,
295 })
296 }
297}
298
299impl Drop for PluginProxy {
300 fn drop(&mut self) {
301 }
303}