openvcs_core/
plugin_runtime.rs

1#![cfg(feature = "plugin-protocol")]
2
3use crate::models::VcsEvent;
4use crate::plugin_protocol::{PluginMessage, RpcRequest};
5use crate::plugin_stdio::ok_null;
6use crate::plugin_stdio::{PluginError, receive_message, respond_shared, send_message_shared};
7use std::collections::HashMap;
8use std::collections::VecDeque;
9use std::io::{self, BufReader, LineWriter};
10use std::sync::{Arc, Mutex, OnceLock};
11use std::time::Duration;
12
/// Outcome of an RPC handler: a JSON value on success, or a [`PluginError`].
13pub type HandlerResult = Result<serde_json::Value, PluginError>;
/// Outcome of an event handler: `()` on success, or a [`PluginError`].
14pub type EventHandlerResult = Result<(), PluginError>;
15
/// Timeout, in milliseconds, used when the environment does not supply a usable value.
const DEFAULT_HOST_TIMEOUT_MS: u64 = 60_000;
/// Environment variable that overrides the timeout (positive integer, milliseconds).
const HOST_TIMEOUT_ENV: &str = "OPENVCS_PLUGIN_HOST_TIMEOUT_MS";

/// Returns the timeout handed to the stdio host during runtime initialisation.
///
/// The value is read from the `OPENVCS_PLUGIN_HOST_TIMEOUT_MS` environment
/// variable. Surrounding whitespace is ignored, so values such as `"5000 "` or
/// `"5000\n"` (easy to produce from shell scripts or `set VAR=5000 ` on
/// Windows) are honoured instead of silently falling back to the default.
///
/// Falls back to [`DEFAULT_HOST_TIMEOUT_MS`] when the variable is unset, is not
/// valid Unicode, does not parse as a `u64`, or is `0`.
fn host_timeout() -> Duration {
    let configured_ms = std::env::var(HOST_TIMEOUT_ENV)
        .ok()
        // `u64::from_str` rejects any whitespace, so trim before parsing.
        .and_then(|raw| raw.trim().parse::<u64>().ok())
        // A zero timeout is treated as "not configured".
        .filter(|&ms| ms > 0);

    Duration::from_millis(configured_ms.unwrap_or(DEFAULT_HOST_TIMEOUT_MS))
}
27
28pub struct PluginCtx {
29    stdout: Arc<Mutex<LineWriter<io::Stdout>>>,
30}
31
32impl PluginCtx {
33    pub fn emit(&self, event: VcsEvent) {
34        send_message_shared(&self.stdout, &PluginMessage::Event { event });
35    }
36
37    pub fn stdout(&self) -> Arc<Mutex<LineWriter<io::Stdout>>> {
38        Arc::clone(&self.stdout)
39    }
40}
41
42fn next_request(
43    queue: &Arc<Mutex<VecDeque<RpcRequest>>>,
44    stdin: &Arc<Mutex<BufReader<io::Stdin>>>,
45) -> Option<RpcRequest> {
46    if let Ok(mut q) = queue.lock() {
47        if let Some(req) = q.pop_front() {
48            return Some(req);
49        }
50    }
51
52    loop {
53        let msg = {
54            let mut lock = stdin.lock().ok()?;
55            receive_message(&mut *lock)?
56        };
57        match msg {
58            PluginMessage::Request(req) => return Some(req),
59            PluginMessage::Response(_) | PluginMessage::Event { .. } => continue,
60        }
61    }
62}
63
64pub struct PluginRuntime {
65    ctx: PluginCtx,
66    stdin: Arc<Mutex<BufReader<io::Stdin>>>,
67    queue: Arc<Mutex<VecDeque<RpcRequest>>>,
68}
69
70impl PluginRuntime {
71    pub fn init() -> Self {
72        #[cfg(target_arch = "wasm32")]
73        let next_id = 1u64 << 63;
74
75        #[cfg(not(target_arch = "wasm32"))]
76        let next_id = 1u64;
77
78        let timeout = host_timeout();
79
80        let stdout = Arc::new(Mutex::new(LineWriter::new(io::stdout())));
81        let stdin = Arc::new(Mutex::new(BufReader::new(io::stdin())));
82        let queue: Arc<Mutex<VecDeque<RpcRequest>>> = Arc::new(Mutex::new(VecDeque::new()));
83        let ids = Arc::new(Mutex::new(crate::plugin_stdio::RequestIdState { next_id }));
84
85        crate::host::init_default_stdio_host(
86            Arc::clone(&stdout),
87            Arc::clone(&stdin),
88            Arc::clone(&queue),
89            Arc::clone(&ids),
90            timeout,
91        );
92
93        Self {
94            ctx: PluginCtx { stdout },
95            stdin,
96            queue,
97        }
98    }
99
100    pub fn ctx(&mut self) -> &mut PluginCtx {
101        &mut self.ctx
102    }
103
104    pub(crate) fn tick(
105        &mut self,
106        mut handle: impl FnMut(&mut PluginCtx, RpcRequest) -> HandlerResult,
107    ) -> io::Result<bool> {
108        let Some(req) = next_request(&self.queue, &self.stdin) else {
109            return Ok(false);
110        };
111        let id = req.id;
112        let res = handle(&mut self.ctx, req);
113        respond_shared(&self.ctx.stdout, id, res);
114        Ok(true)
115    }
116}
117
118type RpcHandler = Box<dyn FnMut(&mut PluginCtx, RpcRequest) -> HandlerResult + Send + 'static>;
119type EventHandler =
120    Box<dyn FnMut(&mut PluginCtx, serde_json::Value) -> EventHandlerResult + Send + 'static>;
121
122struct Registry {
123    rpc: HashMap<String, RpcHandler>,
124    event: HashMap<String, Vec<EventHandler>>,
125    fallback: Option<RpcHandler>,
126}
127
128static REGISTRY: OnceLock<Mutex<Registry>> = OnceLock::new();
129
130fn registry() -> &'static Mutex<Registry> {
131    REGISTRY.get_or_init(|| {
132        Mutex::new(Registry {
133            rpc: HashMap::new(),
134            event: HashMap::new(),
135            fallback: None,
136        })
137    })
138}
139
140fn register_rpc_impl(
141    method: &str,
142    handler: impl FnMut(&mut PluginCtx, RpcRequest) -> HandlerResult + Send + 'static,
143) {
144    if let Ok(mut lock) = registry().lock() {
145        lock.rpc.insert(method.to_string(), Box::new(handler));
146    }
147}
148
149pub fn register_delegate(
150    method: &str,
151    handler: impl FnMut(&mut PluginCtx, RpcRequest) -> HandlerResult + Send + 'static,
152) {
153    register_rpc_impl(method, handler)
154}
155
156/// Backward-compatible alias (will be removed once callers migrate).
157#[deprecated(note = "renamed to register_delegate")]
158pub fn on_rpc(
159    method: &str,
160    handler: impl FnMut(&mut PluginCtx, RpcRequest) -> HandlerResult + Send + 'static,
161) {
162    register_rpc_impl(method, handler)
163}
164
165#[deprecated(note = "renamed to register_delegate")]
166pub fn register_rpc(
167    method: &str,
168    handler: impl FnMut(&mut PluginCtx, RpcRequest) -> HandlerResult + Send + 'static,
169) {
170    register_rpc_impl(method, handler)
171}
172
173pub fn on_event(
174    name: &str,
175    handler: impl FnMut(&mut PluginCtx, serde_json::Value) -> EventHandlerResult + Send + 'static,
176) {
177    if let Ok(mut lock) = registry().lock() {
178        lock.event
179            .entry(name.to_string())
180            .or_default()
181            .push(Box::new(handler));
182    }
183}
184
185pub fn set_fallback_rpc(
186    handler: impl FnMut(&mut PluginCtx, RpcRequest) -> HandlerResult + Send + 'static,
187) {
188    if let Ok(mut lock) = registry().lock() {
189        lock.fallback = Some(Box::new(handler));
190    }
191}
192
193fn dispatch_registered(ctx: &mut PluginCtx, req: RpcRequest) -> HandlerResult {
194    if req.method == "event.dispatch" {
195        #[derive(serde::Deserialize)]
196        struct P {
197            name: String,
198            #[serde(default)]
199            payload: serde_json::Value,
200        }
201        let p: P =
202            crate::plugin_stdio::parse_json_params(req.params).map_err(PluginError::message)?;
203
204        if let Ok(mut lock) = registry().lock() {
205            if let Some(handlers) = lock.event.get_mut(&p.name) {
206                for h in handlers.iter_mut() {
207                    h(ctx, p.payload.clone())?;
208                }
209            }
210        }
211        return ok_null();
212    }
213
214    if let Ok(mut lock) = registry().lock() {
215        if let Some(h) = lock.rpc.get_mut(&req.method) {
216            return h(ctx, req);
217        }
218        if let Some(fallback) = lock.fallback.as_mut() {
219            return fallback(ctx, req);
220        }
221    }
222
223    Err(PluginError::code(
224        "plugin.unknown_method",
225        format!("unknown method '{}'", req.method),
226    ))
227}
228
229pub fn run_registered() -> io::Result<()> {
230    let mut rt = PluginRuntime::init();
231    while rt.tick(|ctx, req| dispatch_registered(ctx, req))? {}
232    Ok(())
233}
234
235pub fn run(mut handle: impl FnMut(&mut PluginCtx, RpcRequest) -> HandlerResult) -> io::Result<()> {
236    let mut rt = PluginRuntime::init();
237    while rt.tick(|ctx, req| handle(ctx, req))? {}
238    Ok(())
239}
240
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{OnceLock, PoisonError};

    /// Serialises the tests: they all mutate the process-wide handler registry.
    static TEST_LOCK: OnceLock<Mutex<()>> = OnceLock::new();

    /// Runs `f` while holding the global test lock.
    ///
    /// The lock guards `()`, so poisoning carries no information. The guard is
    /// recovered from a poisoned lock so that one failing test does not make
    /// every later test panic on the lock instead of on its own assertions.
    fn with_test_lock<T>(f: impl FnOnce() -> T) -> T {
        let lock = TEST_LOCK.get_or_init(|| Mutex::new(()));
        let _guard = lock.lock().unwrap_or_else(PoisonError::into_inner);
        f()
    }

    /// Empties the global registry so each test starts from a clean slate.
    fn reset_registry() {
        if let Ok(mut lock) = registry().lock() {
            lock.rpc.clear();
            lock.event.clear();
            lock.fallback = None;
        }
    }

    /// Builds a context backed by the real process stdout.
    fn test_ctx() -> PluginCtx {
        PluginCtx {
            stdout: Arc::new(Mutex::new(LineWriter::new(io::stdout()))),
        }
    }

    /// Builds a request with id `1` for `method` carrying `params`.
    fn request(method: &str, params: serde_json::Value) -> RpcRequest {
        RpcRequest {
            id: 1,
            method: method.into(),
            params,
        }
    }

    #[test]
    fn dispatch_registered_routes_to_rpc_handler() {
        with_test_lock(|| {
            reset_registry();
            register_delegate("ping", |_ctx, req| {
                assert_eq!(req.method, "ping");
                crate::plugin_stdio::ok(serde_json::json!({"pong": true}))
            });

            let mut ctx = test_ctx();
            let req = request("ping", serde_json::Value::Null);

            let res = dispatch_registered(&mut ctx, req).expect("ok");
            assert_eq!(res, serde_json::json!({"pong": true}));
        });
    }

    #[test]
    fn dispatch_registered_uses_fallback_when_no_handler() {
        with_test_lock(|| {
            reset_registry();
            set_fallback_rpc(|_ctx, req| {
                crate::plugin_stdio::ok(serde_json::json!({ "method": req.method }))
            });

            let mut ctx = test_ctx();
            let req = request("unknown", serde_json::Value::Null);

            let res = dispatch_registered(&mut ctx, req).expect("ok");
            assert_eq!(res, serde_json::json!({ "method": "unknown" }));
        });
    }

    #[test]
    fn dispatch_registered_returns_unknown_method_error_when_unhandled() {
        with_test_lock(|| {
            reset_registry();

            let mut ctx = test_ctx();
            let req = request("unknown", serde_json::Value::Null);

            let err = dispatch_registered(&mut ctx, req).expect_err("should error");
            assert_eq!(err.code.as_deref(), Some("plugin.unknown_method"));
            assert!(err.message.contains("unknown method"));
        });
    }

    #[test]
    fn dispatch_registered_event_dispatch_invokes_handlers() {
        with_test_lock(|| {
            reset_registry();

            let seen: Arc<Mutex<Vec<serde_json::Value>>> = Arc::new(Mutex::new(Vec::new()));
            let seen_1 = Arc::clone(&seen);
            on_event("evt", move |_ctx, payload| {
                if let Ok(mut lock) = seen_1.lock() {
                    lock.push(payload);
                }
                Ok(())
            });

            let mut ctx = test_ctx();
            let req = request(
                "event.dispatch",
                serde_json::json!({ "name": "evt", "payload": { "x": 1 } }),
            );

            let res = dispatch_registered(&mut ctx, req).expect("ok");
            assert_eq!(res, serde_json::Value::Null);

            let lock = seen.lock().expect("lock");
            assert_eq!(lock.as_slice(), &[serde_json::json!({ "x": 1 })]);
        });
    }

    #[test]
    fn dispatch_registered_event_dispatch_propagates_handler_error() {
        with_test_lock(|| {
            reset_registry();

            on_event("evt", |_ctx, _payload| {
                Err(PluginError::code("evt.fail", "nope"))
            });

            let mut ctx = test_ctx();
            let req = request(
                "event.dispatch",
                serde_json::json!({ "name": "evt", "payload": null }),
            );

            let err = dispatch_registered(&mut ctx, req).expect_err("should error");
            assert_eq!(err.code.as_deref(), Some("evt.fail"));
            assert_eq!(err.message, "nope");
        });
    }
}
376}