openvcs_core/
plugin_stdio.rs1use crate::plugin_protocol::{PluginMessage, RpcRequest, RpcResponse};
2use serde::Serialize;
3use serde::de::DeserializeOwned;
4use serde_json::Value;
5use std::collections::{HashMap, VecDeque};
6use std::io::{self, BufRead, Write};
7use std::sync::{Arc, Mutex};
8use std::time::{Duration, Instant};
9
10#[derive(Debug, Clone)]
11pub struct PluginError {
12 pub code: Option<String>,
13 pub message: String,
14 pub data: Option<Value>,
15}
16
17impl PluginError {
18 pub fn message(message: impl Into<String>) -> Self {
19 Self {
20 code: None,
21 message: message.into(),
22 data: None,
23 }
24 }
25
26 pub fn code(code: impl Into<String>, message: impl Into<String>) -> Self {
27 Self {
28 code: Some(code.into()),
29 message: message.into(),
30 data: None,
31 }
32 }
33
34 pub fn with_data(mut self, data: Value) -> Self {
35 self.data = Some(data);
36 self
37 }
38}
39
40pub fn err_display(err: impl std::fmt::Display) -> PluginError {
41 PluginError::message(err.to_string())
42}
43
44pub fn receive_message<R: BufRead>(stdin: &mut R) -> Option<PluginMessage> {
45 crate::plugin_logging::ensure_initialized();
46 let mut line = String::new();
47 loop {
48 line.clear();
49 let n = stdin.read_line(&mut line).ok()?;
50 if n == 0 {
51 return None;
52 }
53 let trimmed = line.trim();
54 if trimmed.is_empty() {
55 continue;
56 }
57 if let Ok(msg) = serde_json::from_str::<PluginMessage>(trimmed) {
58 return Some(msg);
59 }
60 }
61}
62
63pub fn write_message<W: Write>(out: &mut W, msg: &PluginMessage) -> io::Result<()> {
64 crate::plugin_logging::ensure_initialized();
65 let line = serde_json::to_string(msg).unwrap_or_else(|_| "{}".into());
66 writeln!(out, "{line}")?;
67 out.flush()?;
68 Ok(())
69}
70
71pub fn send_message_shared<W: Write>(out: &Arc<Mutex<W>>, msg: &PluginMessage) {
72 crate::plugin_logging::ensure_initialized();
73 if let Ok(mut w) = out.lock() {
74 let _ = write_message(&mut *w, msg);
75 }
76}
77
78pub fn send_request_shared<W: Write>(out: &Arc<Mutex<W>>, req: RpcRequest) {
79 send_message_shared(out, &PluginMessage::Request(req));
80}
81
82pub fn send_request<W: Write>(out: &mut W, req: RpcRequest) -> io::Result<()> {
83 write_message(out, &PluginMessage::Request(req))
84}
85
86pub fn receive_request<R: BufRead>(stdin: &mut R) -> Option<RpcRequest> {
87 loop {
88 match receive_message(stdin)? {
89 PluginMessage::Request(req) => return Some(req),
90 PluginMessage::Response(_) | PluginMessage::Event { .. } => {}
91 }
92 }
93}
94
95pub fn respond_shared<W: Write>(out: &Arc<Mutex<W>>, id: u64, res: Result<Value, PluginError>) {
96 let response = match res {
97 Ok(result) => RpcResponse {
98 id,
99 ok: true,
100 result,
101 error: None,
102 error_code: None,
103 error_data: None,
104 },
105 Err(err) => RpcResponse {
106 id,
107 ok: false,
108 result: Value::Null,
109 error: Some(err.message),
110 error_code: err.code,
111 error_data: err.data,
112 },
113 };
114
115 send_message_shared(out, &PluginMessage::Response(response));
116}
117
118pub fn ok<T: Serialize>(value: T) -> Result<Value, PluginError> {
119 serde_json::to_value(value).map_err(|e| PluginError::code("plugin.serialize", e.to_string()))
120}
121
122pub fn ok_null() -> Result<Value, PluginError> {
123 Ok(Value::Null)
124}
125
126pub fn parse_json_params<T: DeserializeOwned>(value: Value) -> Result<T, String> {
127 serde_json::from_value(value).map_err(|e| format!("invalid params: {e}"))
128}
129
/// Counter state from which `call_host` allocates ids for outgoing requests.
#[derive(Debug)]
pub struct RequestIdState {
    /// Id that the next `call_host` invocation will use; `call_host` bumps it
    /// by one (saturating at `u64::MAX`) after taking a value.
    pub next_id: u64,
}
134
135pub fn call_host<W: Write, R: BufRead>(
136 out: &Arc<Mutex<W>>,
137 stdin: &Arc<Mutex<R>>,
138 queue: &Arc<Mutex<VecDeque<crate::plugin_protocol::RpcRequest>>>,
139 ids: &Arc<Mutex<RequestIdState>>,
140 method: &str,
141 params: Value,
142 timeout: Duration,
143) -> Result<Value, PluginError> {
144 crate::plugin_logging::ensure_initialized();
145 let id = {
146 let mut lock = ids
147 .lock()
148 .map_err(|_| PluginError::message("pending lock poisoned"))?;
149 let id = lock.next_id;
150 lock.next_id = lock.next_id.saturating_add(1);
151 id
152 };
153
154 send_request_shared(
155 out,
156 crate::plugin_protocol::RpcRequest {
157 id,
158 method: method.to_string(),
159 params,
160 },
161 );
162
163 let deadline = Instant::now() + timeout;
164 let mut stash: HashMap<u64, RpcResponse> = HashMap::new();
165
166 loop {
167 if Instant::now() > deadline {
168 return Err(PluginError::code("host.timeout", "host call timed out"));
169 }
170
171 if let Some(resp) = stash.remove(&id) {
172 return if resp.ok {
173 Ok(resp.result)
174 } else {
175 Err(PluginError {
176 code: resp.error_code.or(Some("host.error".into())),
177 message: resp.error.unwrap_or_else(|| "error".into()),
178 data: resp.error_data,
179 })
180 };
181 }
182
183 let msg = {
184 let mut lock = stdin
185 .lock()
186 .map_err(|_| PluginError::message("stdin lock poisoned"))?;
187 receive_message(&mut *lock).ok_or_else(|| PluginError::message("host closed stdin"))?
188 };
189
190 match msg {
191 PluginMessage::Response(resp) => {
192 if resp.id == id {
193 return if resp.ok {
194 Ok(resp.result)
195 } else {
196 Err(PluginError {
197 code: resp.error_code.or(Some("host.error".into())),
198 message: resp.error.unwrap_or_else(|| "error".into()),
199 data: resp.error_data,
200 })
201 };
202 }
203 stash.insert(resp.id, resp);
204 }
205 PluginMessage::Request(req) => {
206 if let Ok(mut q) = queue.lock() {
207 q.push_back(req);
208 }
209 }
210 PluginMessage::Event { .. } => {}
211 }
212 }
213}