openvcs_core/
plugin_stdio.rs

1use crate::plugin_protocol::{PluginMessage, RpcRequest, RpcResponse};
2use serde::Serialize;
3use serde::de::DeserializeOwned;
4use serde_json::Value;
5use std::collections::{HashMap, VecDeque};
6use std::io::{self, BufRead, Write};
7use std::sync::{Arc, Mutex};
8use std::time::{Duration, Instant};
9
10#[derive(Debug, Clone)]
11pub struct PluginError {
12    pub code: Option<String>,
13    pub message: String,
14    pub data: Option<Value>,
15}
16
17impl PluginError {
18    pub fn message(message: impl Into<String>) -> Self {
19        Self {
20            code: None,
21            message: message.into(),
22            data: None,
23        }
24    }
25
26    pub fn code(code: impl Into<String>, message: impl Into<String>) -> Self {
27        Self {
28            code: Some(code.into()),
29            message: message.into(),
30            data: None,
31        }
32    }
33
34    pub fn with_data(mut self, data: Value) -> Self {
35        self.data = Some(data);
36        self
37    }
38}
39
40pub fn err_display(err: impl std::fmt::Display) -> PluginError {
41    PluginError::message(err.to_string())
42}
43
44pub fn receive_message<R: BufRead>(stdin: &mut R) -> Option<PluginMessage> {
45    crate::plugin_logging::ensure_initialized();
46    let mut line = String::new();
47    loop {
48        line.clear();
49        let n = stdin.read_line(&mut line).ok()?;
50        if n == 0 {
51            return None;
52        }
53        let trimmed = line.trim();
54        if trimmed.is_empty() {
55            continue;
56        }
57        if let Ok(msg) = serde_json::from_str::<PluginMessage>(trimmed) {
58            return Some(msg);
59        }
60    }
61}
62
63pub fn write_message<W: Write>(out: &mut W, msg: &PluginMessage) -> io::Result<()> {
64    crate::plugin_logging::ensure_initialized();
65    let line = serde_json::to_string(msg).unwrap_or_else(|_| "{}".into());
66    writeln!(out, "{line}")?;
67    out.flush()?;
68    Ok(())
69}
70
71pub fn send_message_shared<W: Write>(out: &Arc<Mutex<W>>, msg: &PluginMessage) {
72    crate::plugin_logging::ensure_initialized();
73    if let Ok(mut w) = out.lock() {
74        let _ = write_message(&mut *w, msg);
75    }
76}
77
78pub fn send_request_shared<W: Write>(out: &Arc<Mutex<W>>, req: RpcRequest) {
79    send_message_shared(out, &PluginMessage::Request(req));
80}
81
82pub fn send_request<W: Write>(out: &mut W, req: RpcRequest) -> io::Result<()> {
83    write_message(out, &PluginMessage::Request(req))
84}
85
86pub fn receive_request<R: BufRead>(stdin: &mut R) -> Option<RpcRequest> {
87    loop {
88        match receive_message(stdin)? {
89            PluginMessage::Request(req) => return Some(req),
90            PluginMessage::Response(_) | PluginMessage::Event { .. } => {}
91        }
92    }
93}
94
95pub fn respond_shared<W: Write>(out: &Arc<Mutex<W>>, id: u64, res: Result<Value, PluginError>) {
96    let response = match res {
97        Ok(result) => RpcResponse {
98            id,
99            ok: true,
100            result,
101            error: None,
102            error_code: None,
103            error_data: None,
104        },
105        Err(err) => RpcResponse {
106            id,
107            ok: false,
108            result: Value::Null,
109            error: Some(err.message),
110            error_code: err.code,
111            error_data: err.data,
112        },
113    };
114
115    send_message_shared(out, &PluginMessage::Response(response));
116}
117
118pub fn ok<T: Serialize>(value: T) -> Result<Value, PluginError> {
119    serde_json::to_value(value).map_err(|e| PluginError::code("plugin.serialize", e.to_string()))
120}
121
122pub fn ok_null() -> Result<Value, PluginError> {
123    Ok(Value::Null)
124}
125
126pub fn parse_json_params<T: DeserializeOwned>(value: Value) -> Result<T, String> {
127    serde_json::from_value(value).map_err(|e| format!("invalid params: {e}"))
128}
129
130#[derive(Debug)]
131pub struct RequestIdState {
132    pub next_id: u64,
133}
134
135pub fn call_host<W: Write, R: BufRead>(
136    out: &Arc<Mutex<W>>,
137    stdin: &Arc<Mutex<R>>,
138    queue: &Arc<Mutex<VecDeque<crate::plugin_protocol::RpcRequest>>>,
139    ids: &Arc<Mutex<RequestIdState>>,
140    method: &str,
141    params: Value,
142    timeout: Duration,
143) -> Result<Value, PluginError> {
144    crate::plugin_logging::ensure_initialized();
145    let id = {
146        let mut lock = ids
147            .lock()
148            .map_err(|_| PluginError::message("pending lock poisoned"))?;
149        let id = lock.next_id;
150        lock.next_id = lock.next_id.saturating_add(1);
151        id
152    };
153
154    send_request_shared(
155        out,
156        crate::plugin_protocol::RpcRequest {
157            id,
158            method: method.to_string(),
159            params,
160        },
161    );
162
163    let deadline = Instant::now() + timeout;
164    let mut stash: HashMap<u64, RpcResponse> = HashMap::new();
165
166    loop {
167        if Instant::now() > deadline {
168            return Err(PluginError::code("host.timeout", "host call timed out"));
169        }
170
171        if let Some(resp) = stash.remove(&id) {
172            return if resp.ok {
173                Ok(resp.result)
174            } else {
175                Err(PluginError {
176                    code: resp.error_code.or(Some("host.error".into())),
177                    message: resp.error.unwrap_or_else(|| "error".into()),
178                    data: resp.error_data,
179                })
180            };
181        }
182
183        let msg = {
184            let mut lock = stdin
185                .lock()
186                .map_err(|_| PluginError::message("stdin lock poisoned"))?;
187            receive_message(&mut *lock).ok_or_else(|| PluginError::message("host closed stdin"))?
188        };
189
190        match msg {
191            PluginMessage::Response(resp) => {
192                if resp.id == id {
193                    return if resp.ok {
194                        Ok(resp.result)
195                    } else {
196                        Err(PluginError {
197                            code: resp.error_code.or(Some("host.error".into())),
198                            message: resp.error.unwrap_or_else(|| "error".into()),
199                            data: resp.error_data,
200                        })
201                    };
202                }
203                stash.insert(resp.id, resp);
204            }
205            PluginMessage::Request(req) => {
206                if let Ok(mut q) = queue.lock() {
207                    q.push_back(req);
208                }
209            }
210            PluginMessage::Event { .. } => {}
211        }
212    }
213}