openvcs_core/
plugin_runtime.rs1#![cfg(feature = "plugin-protocol")]
2
3use crate::models::VcsEvent;
4use crate::plugin_protocol::{PluginMessage, RpcRequest};
5use crate::plugin_stdio::{PluginError, receive_message, respond_shared, send_message_shared};
6use crate::plugin_stdio::ok_null;
7use std::collections::VecDeque;
8use std::collections::HashMap;
9use std::io::{self, BufReader, LineWriter};
10use std::sync::{Arc, Mutex, OnceLock};
11use std::time::Duration;
12
13pub type HandlerResult = Result<serde_json::Value, PluginError>;
14pub type EventHandlerResult = Result<(), PluginError>;
15
16const DEFAULT_HOST_TIMEOUT_MS: u64 = 60_000;
17const HOST_TIMEOUT_ENV: &str = "OPENVCS_PLUGIN_HOST_TIMEOUT_MS";
18
19fn host_timeout() -> Duration {
20 std::env::var(HOST_TIMEOUT_ENV)
21 .ok()
22 .and_then(|s| s.parse::<u64>().ok())
23 .filter(|&ms| ms > 0)
24 .map(Duration::from_millis)
25 .unwrap_or(Duration::from_millis(DEFAULT_HOST_TIMEOUT_MS))
26}
27
28pub struct PluginCtx {
29 stdout: Arc<Mutex<LineWriter<io::Stdout>>>,
30}
31
32impl PluginCtx {
33 pub fn emit(&self, event: VcsEvent) {
34 send_message_shared(&self.stdout, &PluginMessage::Event { event });
35 }
36
37 pub fn stdout(&self) -> Arc<Mutex<LineWriter<io::Stdout>>> {
38 Arc::clone(&self.stdout)
39 }
40}
41
42fn next_request(
43 queue: &Arc<Mutex<VecDeque<RpcRequest>>>,
44 stdin: &Arc<Mutex<BufReader<io::Stdin>>>,
45) -> Option<RpcRequest> {
46 if let Ok(mut q) = queue.lock() {
47 if let Some(req) = q.pop_front() {
48 return Some(req);
49 }
50 }
51
52 loop {
53 let msg = {
54 let mut lock = stdin.lock().ok()?;
55 receive_message(&mut *lock)?
56 };
57 match msg {
58 PluginMessage::Request(req) => return Some(req),
59 PluginMessage::Response(_) | PluginMessage::Event { .. } => continue,
60 }
61 }
62}
63
64pub struct PluginRuntime {
65 ctx: PluginCtx,
66 stdin: Arc<Mutex<BufReader<io::Stdin>>>,
67 queue: Arc<Mutex<VecDeque<RpcRequest>>>,
68}
69
70impl PluginRuntime {
71 pub fn init() -> Self {
72 #[cfg(target_arch = "wasm32")]
73 let next_id = 1u64 << 63;
74
75 #[cfg(not(target_arch = "wasm32"))]
76 let next_id = 1u64;
77
78 let timeout = host_timeout();
79
80 let stdout = Arc::new(Mutex::new(LineWriter::new(io::stdout())));
81 let stdin = Arc::new(Mutex::new(BufReader::new(io::stdin())));
82 let queue: Arc<Mutex<VecDeque<RpcRequest>>> = Arc::new(Mutex::new(VecDeque::new()));
83 let ids = Arc::new(Mutex::new(crate::plugin_stdio::RequestIdState { next_id }));
84
85 crate::host::init_default_stdio_host(
86 Arc::clone(&stdout),
87 Arc::clone(&stdin),
88 Arc::clone(&queue),
89 Arc::clone(&ids),
90 timeout,
91 );
92
93 Self {
94 ctx: PluginCtx { stdout },
95 stdin,
96 queue,
97 }
98 }
99
100 pub fn ctx(&mut self) -> &mut PluginCtx {
101 &mut self.ctx
102 }
103
104 pub(crate) fn tick(
105 &mut self,
106 mut handle: impl FnMut(&mut PluginCtx, RpcRequest) -> HandlerResult,
107 ) -> io::Result<bool> {
108 let Some(req) = next_request(&self.queue, &self.stdin) else {
109 return Ok(false);
110 };
111 let id = req.id;
112 let res = handle(&mut self.ctx, req);
113 respond_shared(&self.ctx.stdout, id, res);
114 Ok(true)
115 }
116}
117
118type RpcHandler = Box<dyn FnMut(&mut PluginCtx, RpcRequest) -> HandlerResult + Send + 'static>;
119type EventHandler =
120 Box<dyn FnMut(&mut PluginCtx, serde_json::Value) -> EventHandlerResult + Send + 'static>;
121
122struct Registry {
123 rpc: HashMap<String, RpcHandler>,
124 event: HashMap<String, Vec<EventHandler>>,
125 fallback: Option<RpcHandler>,
126}
127
128static REGISTRY: OnceLock<Mutex<Registry>> = OnceLock::new();
129
130fn registry() -> &'static Mutex<Registry> {
131 REGISTRY.get_or_init(|| {
132 Mutex::new(Registry {
133 rpc: HashMap::new(),
134 event: HashMap::new(),
135 fallback: None,
136 })
137 })
138}
139
140fn register_rpc_impl(
141 method: &str,
142 handler: impl FnMut(&mut PluginCtx, RpcRequest) -> HandlerResult + Send + 'static,
143) {
144 if let Ok(mut lock) = registry().lock() {
145 lock.rpc.insert(method.to_string(), Box::new(handler));
146 }
147}
148
149pub fn register_delegate(
150 method: &str,
151 handler: impl FnMut(&mut PluginCtx, RpcRequest) -> HandlerResult + Send + 'static,
152) {
153 register_rpc_impl(method, handler)
154}
155
156#[deprecated(note = "renamed to register_delegate")]
158pub fn on_rpc(
159 method: &str,
160 handler: impl FnMut(&mut PluginCtx, RpcRequest) -> HandlerResult + Send + 'static,
161) {
162 register_rpc_impl(method, handler)
163}
164
165#[deprecated(note = "renamed to register_delegate")]
166pub fn register_rpc(
167 method: &str,
168 handler: impl FnMut(&mut PluginCtx, RpcRequest) -> HandlerResult + Send + 'static,
169) {
170 register_rpc_impl(method, handler)
171}
172
173pub fn on_event(
174 name: &str,
175 handler: impl FnMut(&mut PluginCtx, serde_json::Value) -> EventHandlerResult + Send + 'static,
176) {
177 if let Ok(mut lock) = registry().lock() {
178 lock.event.entry(name.to_string()).or_default().push(Box::new(handler));
179 }
180}
181
182pub fn set_fallback_rpc(
183 handler: impl FnMut(&mut PluginCtx, RpcRequest) -> HandlerResult + Send + 'static,
184) {
185 if let Ok(mut lock) = registry().lock() {
186 lock.fallback = Some(Box::new(handler));
187 }
188}
189
190fn dispatch_registered(ctx: &mut PluginCtx, req: RpcRequest) -> HandlerResult {
191 if req.method == "event.dispatch" {
192 #[derive(serde::Deserialize)]
193 struct P {
194 name: String,
195 #[serde(default)]
196 payload: serde_json::Value,
197 }
198 let p: P = crate::plugin_stdio::parse_json_params(req.params)
199 .map_err(PluginError::message)?;
200
201 if let Ok(mut lock) = registry().lock() {
202 if let Some(handlers) = lock.event.get_mut(&p.name) {
203 for h in handlers.iter_mut() {
204 h(ctx, p.payload.clone())?;
205 }
206 }
207 }
208 return ok_null();
209 }
210
211 if let Ok(mut lock) = registry().lock() {
212 if let Some(h) = lock.rpc.get_mut(&req.method) {
213 return h(ctx, req);
214 }
215 if let Some(fallback) = lock.fallback.as_mut() {
216 return fallback(ctx, req);
217 }
218 }
219
220 Err(PluginError::code(
221 "plugin.unknown_method",
222 format!("unknown method '{}'", req.method),
223 ))
224}
225
226pub fn run_registered() -> io::Result<()> {
227 let mut rt = PluginRuntime::init();
228 while rt.tick(|ctx, req| dispatch_registered(ctx, req))? {}
229 Ok(())
230}
231
232pub fn run(mut handle: impl FnMut(&mut PluginCtx, RpcRequest) -> HandlerResult) -> io::Result<()> {
233 let mut rt = PluginRuntime::init();
234 while rt.tick(|ctx, req| handle(ctx, req))? {}
235 Ok(())
236}