openvcs_core/
plugin_runtime.rs

1use crate::models::VcsEvent;
2use crate::plugin_protocol::{PluginMessage, RpcRequest};
3use crate::plugin_stdio::ok_null;
4use crate::plugin_stdio::{PluginError, receive_message, respond_shared, send_message_shared};
5use std::collections::HashMap;
6use std::collections::VecDeque;
7use std::io::{self, BufReader, LineWriter};
8use std::sync::{Arc, Mutex, OnceLock};
9use std::time::Duration;
10
11pub type HandlerResult = Result<serde_json::Value, PluginError>;
12pub type EventHandlerResult = Result<(), PluginError>;
13
14const DEFAULT_HOST_TIMEOUT_MS: u64 = 60_000;
15const HOST_TIMEOUT_ENV: &str = "OPENVCS_PLUGIN_HOST_TIMEOUT_MS";
16
17fn host_timeout() -> Duration {
18    std::env::var(HOST_TIMEOUT_ENV)
19        .ok()
20        .and_then(|s| s.parse::<u64>().ok())
21        .filter(|&ms| ms > 0)
22        .map(Duration::from_millis)
23        .unwrap_or(Duration::from_millis(DEFAULT_HOST_TIMEOUT_MS))
24}
25
26pub struct PluginCtx {
27    stdout: Arc<Mutex<LineWriter<io::Stdout>>>,
28}
29
30impl PluginCtx {
31    pub fn emit(&self, event: VcsEvent) {
32        send_message_shared(&self.stdout, &PluginMessage::Event { event });
33    }
34
35    pub fn stdout(&self) -> Arc<Mutex<LineWriter<io::Stdout>>> {
36        Arc::clone(&self.stdout)
37    }
38}
39
40fn next_request(
41    queue: &Arc<Mutex<VecDeque<RpcRequest>>>,
42    stdin: &Arc<Mutex<BufReader<io::Stdin>>>,
43) -> Option<RpcRequest> {
44    if let Ok(mut q) = queue.lock()
45        && let Some(req) = q.pop_front()
46    {
47        return Some(req);
48    }
49
50    loop {
51        let msg = {
52            let mut lock = stdin.lock().ok()?;
53            receive_message(&mut *lock)?
54        };
55        match msg {
56            PluginMessage::Request(req) => return Some(req),
57            PluginMessage::Response(_) | PluginMessage::Event { .. } => continue,
58        }
59    }
60}
61
62pub struct PluginRuntime {
63    ctx: PluginCtx,
64    stdin: Arc<Mutex<BufReader<io::Stdin>>>,
65    queue: Arc<Mutex<VecDeque<RpcRequest>>>,
66}
67
68impl PluginRuntime {
69    pub fn init() -> Self {
70        #[cfg(target_arch = "wasm32")]
71        let next_id = 1u64 << 63;
72
73        #[cfg(not(target_arch = "wasm32"))]
74        let next_id = 1u64;
75
76        let timeout = host_timeout();
77
78        let stdout = Arc::new(Mutex::new(LineWriter::new(io::stdout())));
79        let stdin = Arc::new(Mutex::new(BufReader::new(io::stdin())));
80        let queue: Arc<Mutex<VecDeque<RpcRequest>>> = Arc::new(Mutex::new(VecDeque::new()));
81        let ids = Arc::new(Mutex::new(crate::plugin_stdio::RequestIdState { next_id }));
82
83        crate::host::init_default_stdio_host(
84            Arc::clone(&stdout),
85            Arc::clone(&stdin),
86            Arc::clone(&queue),
87            Arc::clone(&ids),
88            timeout,
89        );
90
91        Self {
92            ctx: PluginCtx { stdout },
93            stdin,
94            queue,
95        }
96    }
97
98    pub fn ctx(&mut self) -> &mut PluginCtx {
99        &mut self.ctx
100    }
101
102    pub(crate) fn tick(
103        &mut self,
104        mut handle: impl FnMut(&mut PluginCtx, RpcRequest) -> HandlerResult,
105    ) -> io::Result<bool> {
106        let Some(req) = next_request(&self.queue, &self.stdin) else {
107            return Ok(false);
108        };
109        let id = req.id;
110        let res = handle(&mut self.ctx, req);
111        respond_shared(&self.ctx.stdout, id, res);
112        Ok(true)
113    }
114}
115
116type RpcHandler = Box<dyn FnMut(&mut PluginCtx, RpcRequest) -> HandlerResult + Send + 'static>;
117type EventHandler =
118    Box<dyn FnMut(&mut PluginCtx, serde_json::Value) -> EventHandlerResult + Send + 'static>;
119
120struct Registry {
121    rpc: HashMap<String, RpcHandler>,
122    event: HashMap<String, Vec<EventHandler>>,
123    fallback: Option<RpcHandler>,
124}
125
126static REGISTRY: OnceLock<Mutex<Registry>> = OnceLock::new();
127
128fn registry() -> &'static Mutex<Registry> {
129    REGISTRY.get_or_init(|| {
130        Mutex::new(Registry {
131            rpc: HashMap::new(),
132            event: HashMap::new(),
133            fallback: None,
134        })
135    })
136}
137
138fn register_rpc_impl(
139    method: &str,
140    handler: impl FnMut(&mut PluginCtx, RpcRequest) -> HandlerResult + Send + 'static,
141) {
142    if let Ok(mut lock) = registry().lock() {
143        lock.rpc.insert(method.to_string(), Box::new(handler));
144    }
145}
146
147pub fn register_delegate(
148    method: &str,
149    handler: impl FnMut(&mut PluginCtx, RpcRequest) -> HandlerResult + Send + 'static,
150) {
151    register_rpc_impl(method, handler)
152}
153
154/// Backward-compatible alias (will be removed once callers migrate).
155#[deprecated(note = "renamed to register_delegate")]
156pub fn on_rpc(
157    method: &str,
158    handler: impl FnMut(&mut PluginCtx, RpcRequest) -> HandlerResult + Send + 'static,
159) {
160    register_rpc_impl(method, handler)
161}
162
163#[deprecated(note = "renamed to register_delegate")]
164pub fn register_rpc(
165    method: &str,
166    handler: impl FnMut(&mut PluginCtx, RpcRequest) -> HandlerResult + Send + 'static,
167) {
168    register_rpc_impl(method, handler)
169}
170
171pub fn on_event(
172    name: &str,
173    handler: impl FnMut(&mut PluginCtx, serde_json::Value) -> EventHandlerResult + Send + 'static,
174) {
175    if let Ok(mut lock) = registry().lock() {
176        lock.event
177            .entry(name.to_string())
178            .or_default()
179            .push(Box::new(handler));
180    }
181}
182
183pub fn set_fallback_rpc(
184    handler: impl FnMut(&mut PluginCtx, RpcRequest) -> HandlerResult + Send + 'static,
185) {
186    if let Ok(mut lock) = registry().lock() {
187        lock.fallback = Some(Box::new(handler));
188    }
189}
190
191fn dispatch_registered(ctx: &mut PluginCtx, req: RpcRequest) -> HandlerResult {
192    if req.method == "event.dispatch" {
193        #[derive(serde::Deserialize)]
194        struct P {
195            name: String,
196            #[serde(default)]
197            payload: serde_json::Value,
198        }
199        let p: P =
200            crate::plugin_stdio::parse_json_params(req.params).map_err(PluginError::message)?;
201
202        if let Ok(mut lock) = registry().lock()
203            && let Some(handlers) = lock.event.get_mut(&p.name)
204        {
205            for h in handlers.iter_mut() {
206                h(ctx, p.payload.clone())?;
207            }
208        }
209        return ok_null();
210    }
211
212    if let Ok(mut lock) = registry().lock() {
213        if let Some(h) = lock.rpc.get_mut(&req.method) {
214            return h(ctx, req);
215        }
216        if let Some(fallback) = lock.fallback.as_mut() {
217            return fallback(ctx, req);
218        }
219    }
220
221    Err(PluginError::code(
222        "plugin.unknown_method",
223        format!("unknown method '{}'", req.method),
224    ))
225}
226
227pub fn run_registered() -> io::Result<()> {
228    let mut rt = PluginRuntime::init();
229    while rt.tick(dispatch_registered)? {}
230    Ok(())
231}
232
233pub fn run(mut handle: impl FnMut(&mut PluginCtx, RpcRequest) -> HandlerResult) -> io::Result<()> {
234    let mut rt = PluginRuntime::init();
235    while rt.tick(|ctx, req| handle(ctx, req))? {}
236    Ok(())
237}
238
239#[cfg(test)]
240mod tests {
241    use super::*;
242    use std::sync::OnceLock;
243
244    static TEST_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
245
246    fn with_test_lock<T>(f: impl FnOnce() -> T) -> T {
247        let lock = TEST_LOCK.get_or_init(|| Mutex::new(()));
248        let _guard = lock.lock().expect("test lock");
249        f()
250    }
251
252    fn reset_registry() {
253        if let Ok(mut lock) = registry().lock() {
254            lock.rpc.clear();
255            lock.event.clear();
256            lock.fallback = None;
257        }
258    }
259
260    fn test_ctx() -> PluginCtx {
261        PluginCtx {
262            stdout: Arc::new(Mutex::new(LineWriter::new(io::stdout()))),
263        }
264    }
265
266    #[test]
267    fn dispatch_registered_routes_to_rpc_handler() {
268        with_test_lock(|| {
269            reset_registry();
270            register_delegate("ping", |_ctx, req| {
271                assert_eq!(req.method, "ping");
272                crate::plugin_stdio::ok(serde_json::json!({"pong": true}))
273            });
274
275            let mut ctx = test_ctx();
276            let req = RpcRequest {
277                id: 1,
278                method: "ping".into(),
279                params: serde_json::Value::Null,
280            };
281            let res = dispatch_registered(&mut ctx, req).expect("ok");
282            assert_eq!(res, serde_json::json!({"pong": true}));
283        });
284    }
285
286    #[test]
287    fn dispatch_registered_uses_fallback_when_no_handler() {
288        with_test_lock(|| {
289            reset_registry();
290            set_fallback_rpc(|_ctx, req| {
291                crate::plugin_stdio::ok(serde_json::json!({ "method": req.method }))
292            });
293
294            let mut ctx = test_ctx();
295            let req = RpcRequest {
296                id: 1,
297                method: "unknown".into(),
298                params: serde_json::Value::Null,
299            };
300
301            let res = dispatch_registered(&mut ctx, req).expect("ok");
302            assert_eq!(res, serde_json::json!({ "method": "unknown" }));
303        });
304    }
305
306    #[test]
307    fn dispatch_registered_returns_unknown_method_error_when_unhandled() {
308        with_test_lock(|| {
309            reset_registry();
310
311            let mut ctx = test_ctx();
312            let req = RpcRequest {
313                id: 1,
314                method: "unknown".into(),
315                params: serde_json::Value::Null,
316            };
317
318            let err = dispatch_registered(&mut ctx, req).expect_err("should error");
319            assert_eq!(err.code.as_deref(), Some("plugin.unknown_method"));
320            assert!(err.message.contains("unknown method"));
321        });
322    }
323
324    #[test]
325    fn dispatch_registered_event_dispatch_invokes_handlers() {
326        with_test_lock(|| {
327            reset_registry();
328
329            let seen: Arc<Mutex<Vec<serde_json::Value>>> = Arc::new(Mutex::new(Vec::new()));
330            let seen_1 = Arc::clone(&seen);
331            on_event("evt", move |_ctx, payload| {
332                if let Ok(mut lock) = seen_1.lock() {
333                    lock.push(payload);
334                }
335                Ok(())
336            });
337
338            let mut ctx = test_ctx();
339            let req = RpcRequest {
340                id: 1,
341                method: "event.dispatch".into(),
342                params: serde_json::json!({ "name": "evt", "payload": { "x": 1 } }),
343            };
344
345            let res = dispatch_registered(&mut ctx, req).expect("ok");
346            assert_eq!(res, serde_json::Value::Null);
347
348            let lock = seen.lock().expect("lock");
349            assert_eq!(lock.as_slice(), &[serde_json::json!({ "x": 1 })]);
350        });
351    }
352
353    #[test]
354    fn dispatch_registered_event_dispatch_propagates_handler_error() {
355        with_test_lock(|| {
356            reset_registry();
357
358            on_event("evt", |_ctx, _payload| {
359                Err(PluginError::code("evt.fail", "nope"))
360            });
361
362            let mut ctx = test_ctx();
363            let req = RpcRequest {
364                id: 1,
365                method: "event.dispatch".into(),
366                params: serde_json::json!({ "name": "evt", "payload": null }),
367            };
368
369            let err = dispatch_registered(&mut ctx, req).expect_err("should error");
370            assert_eq!(err.code.as_deref(), Some("evt.fail"));
371            assert_eq!(err.message, "nope");
372        });
373    }
374}