Skip to main content

opencore_jsonrpc_rust/
server.rs

1//! Binary JSON-RPC server implementation.
2//!
3//! This module provides a simple server that communicates via stdin/stdout
4//! using line-delimited JSON messages, including fire-and-forget events.
5
6use std::collections::HashMap;
7use std::io::{self, BufRead, BufReader, Write};
8use std::sync::{Arc, Mutex};
9
10use serde::Serialize;
11use serde_json::Value;
12
13use crate::protocol::{Event, Request, Response};
14
/// Reference-counted, mutex-guarded, boxed line reader the server pulls requests from.
type SharedReader = Arc<Mutex<Box<dyn BufRead + Send>>>;
/// Reference-counted, mutex-guarded, boxed writer shared by the server and its event emitters.
type SharedWriter = Arc<Mutex<Box<dyn Write + Send>>>;
17
/// A handler function that processes JSON-RPC requests.
///
/// It receives the request parameters as a list of JSON values and returns
/// either the JSON result or an error message.
pub type Handler = fn(Vec<Value>) -> Result<Value, String>;
20
21/// A cloneable writer for emitting unsolicited OpenCore binary events.
22#[derive(Clone)]
23pub struct EventEmitter {
24    output: SharedWriter,
25}
26
27impl EventEmitter {
28    /// Emits an event with a JSON-serializable payload.
29    pub fn emit<T: Serialize>(&self, event: &str, data: T) -> io::Result<()> {
30        let value = serde_json::to_value(data).map_err(serialization_error)?;
31        self.emit_value(event, Some(value))
32    }
33
34    /// Emits an event without a payload.
35    pub fn emit_empty(&self, event: &str) -> io::Result<()> {
36        self.emit_value(event, None)
37    }
38
39    fn emit_value(&self, event: &str, data: Option<Value>) -> io::Result<()> {
40        self.write_json_line(&Event::new(event, data))
41    }
42
43    fn write_json_line<T: Serialize>(&self, value: &T) -> io::Result<()> {
44        let payload = serde_json::to_vec(value).map_err(serialization_error)?;
45        let mut writer = self
46            .output
47            .lock()
48            .map_err(|_| io::Error::other("output writer lock poisoned"))?;
49
50        writer.write_all(&payload)?;
51        writer.write_all(b"\n")?;
52        writer.flush()
53    }
54}
55
56/// A JSON-RPC server that communicates via stdin/stdout.
57pub struct BinaryServer {
58    handlers: HashMap<String, Handler>,
59    input: SharedReader,
60    emitter: EventEmitter,
61}
62
63impl BinaryServer {
64    /// Creates a new empty server with stdin/stdout transport.
65    pub fn new() -> Self {
66        Self::with_io(BufReader::new(io::stdin()), io::stdout())
67    }
68
69    /// Creates a server with custom input and output streams.
70    pub fn with_io<R, W>(input: R, output: W) -> Self
71    where
72        R: BufRead + Send + 'static,
73        W: Write + Send + 'static,
74    {
75        let output: SharedWriter = Arc::new(Mutex::new(Box::new(output)));
76
77        Self {
78            handlers: HashMap::new(),
79            input: Arc::new(Mutex::new(Box::new(input))),
80            emitter: EventEmitter { output },
81        }
82    }
83
84    /// Returns a cloneable event emitter tied to the server stdout stream.
85    pub fn emitter(&self) -> EventEmitter {
86        self.emitter.clone()
87    }
88
89    /// Registers a handler function for a specific action name.
90    pub fn register(&mut self, action: &str, handler: Handler) {
91        self.handlers.insert(action.to_string(), handler);
92    }
93
94    /// Emits an event with a JSON-serializable payload.
95    pub fn emit_event<T: Serialize>(&self, event: &str, data: T) -> io::Result<()> {
96        self.emitter.emit(event, data)
97    }
98
99    /// Emits an event without a payload.
100    pub fn emit_event_empty(&self, event: &str) -> io::Result<()> {
101        self.emitter.emit_empty(event)
102    }
103
104    /// Starts the server loop, reading from stdin and writing to stdout.
105    pub fn run(&self) {
106        let mut line = String::new();
107
108        loop {
109            line.clear();
110
111            let bytes_read = {
112                let mut reader = match self.input.lock() {
113                    Ok(reader) => reader,
114                    Err(_) => return,
115                };
116
117                match reader.read_line(&mut line) {
118                    Ok(bytes) => bytes,
119                    Err(_) => continue,
120                }
121            };
122
123            if bytes_read == 0 {
124                break;
125            }
126
127            let trimmed = line.trim_end_matches(['\n', '\r']);
128            if trimmed.is_empty() {
129                continue;
130            }
131
132            let response = match serde_json::from_str::<Request>(trimmed) {
133                Ok(req) => self.handle_request(req),
134                Err(err) => {
135                    let raw = format!(r#"{{"status":"error","error":"invalid json: {}"}}"#, err);
136                    let _ = self.write_raw_line(&raw);
137                    continue;
138                }
139            };
140
141            if self.emitter.write_json_line(&response).is_err() {
142                let raw = r#"{"status":"error","error":"serialization failed"}"#;
143                let _ = self.write_raw_line(raw);
144            }
145        }
146    }
147
148    /// Handles a single request by dispatching to the appropriate handler.
149    fn handle_request(&self, req: Request) -> Response {
150        match self.handlers.get(&req.action) {
151            Some(handler) => match handler(req.params) {
152                Ok(result) => Response::Ok { id: req.id, result },
153                Err(msg) => Response::Error {
154                    id: req.id,
155                    error: msg,
156                },
157            },
158            None => Response::Error {
159                id: req.id,
160                error: format!("unknown action: {}", req.action),
161            },
162        }
163    }
164
165    fn write_raw_line(&self, raw: &str) -> io::Result<()> {
166        let mut writer = self
167            .emitter
168            .output
169            .lock()
170            .map_err(|_| io::Error::other("output writer lock poisoned"))?;
171        writer.write_all(raw.as_bytes())?;
172        writer.write_all(b"\n")?;
173        writer.flush()
174    }
175}
176
177impl Default for BinaryServer {
178    fn default() -> Self {
179        Self::new()
180    }
181}
182
183fn serialization_error(err: serde_json::Error) -> io::Error {
184    io::Error::new(io::ErrorKind::InvalidData, err)
185}
186
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use std::io::Cursor;

    /// In-memory writer whose clones share one byte buffer, so a test can keep
    /// reading the output after handing a clone to the server.
    #[derive(Clone, Default)]
    struct SharedBuffer {
        bytes: Arc<Mutex<Vec<u8>>>,
    }

    impl SharedBuffer {
        /// Returns a copy of everything written so far, decoded as UTF-8.
        fn contents(&self) -> String {
            String::from_utf8(self.bytes.lock().unwrap().clone()).unwrap()
        }
    }

    impl Write for SharedBuffer {
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            self.bytes.lock().unwrap().extend_from_slice(buf);
            Ok(buf.len())
        }

        fn flush(&mut self) -> io::Result<()> {
            Ok(())
        }
    }

    /// Server with empty input whose output is discarded.
    fn empty_server() -> BinaryServer {
        BinaryServer::with_io(Cursor::new(Vec::<u8>::new()), SharedBuffer::default())
    }

    /// Server fed with `input`, plus a handle to the buffer capturing its output.
    fn capturing_server(input: &[u8]) -> (BinaryServer, SharedBuffer) {
        let output = SharedBuffer::default();
        let server = BinaryServer::with_io(Cursor::new(input.to_vec()), output.clone());
        (server, output)
    }

    /// Builds a request without spelling out the struct literal in every test.
    fn request(id: &str, action: &str, params: Vec<Value>) -> Request {
        Request {
            id: id.to_string(),
            action: action.to_string(),
            params,
        }
    }

    fn test_handler_success(params: Vec<Value>) -> Result<Value, String> {
        Ok(json!({"received": params.len()}))
    }

    fn test_handler_error(_params: Vec<Value>) -> Result<Value, String> {
        Err("intentional error".to_string())
    }

    fn add_handler(params: Vec<Value>) -> Result<Value, String> {
        if params.len() != 2 {
            return Err("Expected 2 parameters".into());
        }
        let a = params[0].as_i64().ok_or("Invalid number")?;
        let b = params[1].as_i64().ok_or("Invalid number")?;
        Ok(Value::from(a + b))
    }

    #[test]
    fn test_server_creation() {
        let server = empty_server();
        assert_eq!(server.handlers.len(), 0);
    }

    #[test]
    fn test_server_default() {
        let server = BinaryServer::default();
        assert_eq!(server.handlers.len(), 0);
    }

    #[test]
    fn test_register_handler() {
        let mut server = empty_server();
        server.register("test", test_handler_success);
        assert_eq!(server.handlers.len(), 1);
        assert!(server.handlers.contains_key("test"));
    }

    #[test]
    fn test_register_multiple_handlers() {
        let mut server = empty_server();
        server.register("handler1", test_handler_success);
        server.register("handler2", test_handler_error);
        assert_eq!(server.handlers.len(), 2);
    }

    #[test]
    fn test_handle_request_success() {
        let mut server = empty_server();
        server.register("test", test_handler_success);

        let response = server.handle_request(request("req-1", "test", vec![json!(1), json!(2)]));

        let Response::Ok { id, result } = response else {
            panic!("Expected Ok response");
        };
        assert_eq!(id, "req-1");
        assert_eq!(result, json!({"received": 2}));
    }

    #[test]
    fn test_handle_request_handler_error() {
        let mut server = empty_server();
        server.register("test", test_handler_error);

        let response = server.handle_request(request("req-2", "test", vec![]));

        let Response::Error { id, error } = response else {
            panic!("Expected Error response");
        };
        assert_eq!(id, "req-2");
        assert_eq!(error, "intentional error");
    }

    #[test]
    fn test_handle_request_unknown_action() {
        let server = empty_server();

        let response = server.handle_request(request("req-3", "nonexistent", vec![]));

        let Response::Error { id, error } = response else {
            panic!("Expected Error response");
        };
        assert_eq!(id, "req-3");
        assert_eq!(error, "unknown action: nonexistent");
    }

    #[test]
    fn test_add_handler_success() {
        let mut server = empty_server();
        server.register("add", add_handler);

        let response = server.handle_request(request("req-4", "add", vec![json!(5), json!(10)]));

        let Response::Ok { id, result } = response else {
            panic!("Expected Ok response");
        };
        assert_eq!(id, "req-4");
        assert_eq!(result, json!(15));
    }

    #[test]
    fn test_add_handler_wrong_param_count() {
        let mut server = empty_server();
        server.register("add", add_handler);

        let response = server.handle_request(request("req-5", "add", vec![json!(5)]));

        let Response::Error { id, error } = response else {
            panic!("Expected Error response");
        };
        assert_eq!(id, "req-5");
        assert_eq!(error, "Expected 2 parameters");
    }

    #[test]
    fn test_add_handler_invalid_type() {
        let mut server = empty_server();
        server.register("add", add_handler);

        let response = server.handle_request(request(
            "req-6",
            "add",
            vec![json!("not a number"), json!(10)],
        ));

        let Response::Error { id, error } = response else {
            panic!("Expected Error response");
        };
        assert_eq!(id, "req-6");
        assert_eq!(error, "Invalid number");
    }

    #[test]
    fn test_emit_event() {
        let (server, output) = capturing_server(b"");

        server
            .emit_event("worker.ready", json!({"pid": 42}))
            .unwrap();

        let line = output.contents();
        let parsed: Value = serde_json::from_str(line.trim()).unwrap();
        assert_eq!(parsed["type"], "event");
        assert_eq!(parsed["event"], "worker.ready");
        assert_eq!(parsed["data"]["pid"], 42);
    }

    #[test]
    fn test_emit_event_empty() {
        let (server, output) = capturing_server(b"");

        server.emit_event_empty("heartbeat").unwrap();

        let line = output.contents();
        let parsed: Value = serde_json::from_str(line.trim()).unwrap();
        assert_eq!(parsed["type"], "event");
        assert_eq!(parsed["event"], "heartbeat");
        assert!(parsed.get("data").is_none());
    }

    #[test]
    fn test_emitter_clone_writes_same_stream() {
        let (server, output) = capturing_server(b"");
        let emitter = server.emitter();

        emitter.emit("worker.ready", json!({"pid": 7})).unwrap();
        server.emit_event_empty("heartbeat").unwrap();

        let content = output.contents();
        let lines: Vec<&str> = content.lines().collect();
        assert_eq!(lines.len(), 2);

        let first: Value = serde_json::from_str(lines[0]).unwrap();
        let second: Value = serde_json::from_str(lines[1]).unwrap();
        assert_eq!(first["event"], "worker.ready");
        assert_eq!(second["event"], "heartbeat");
    }

    #[test]
    fn test_run_writes_event_and_response() {
        let (mut server, output) =
            capturing_server(b"{\"id\":\"req-1\",\"action\":\"sum\",\"params\":[2,3]}\n");

        server.register("sum", |params| {
            let a = params[0].as_i64().ok_or("Invalid number")?;
            let b = params[1].as_i64().ok_or("Invalid number")?;
            Ok(Value::from(a + b))
        });

        server.emit_event_empty("startup").unwrap();
        server.run();

        let content = output.contents();
        let lines: Vec<&str> = content.lines().collect();
        assert_eq!(lines.len(), 2);

        let event: Value = serde_json::from_str(lines[0]).unwrap();
        let response: Value = serde_json::from_str(lines[1]).unwrap();
        assert_eq!(event["type"], "event");
        assert_eq!(event["event"], "startup");
        assert_eq!(response["status"], "ok");
        assert_eq!(response["result"], 5);
    }

    #[test]
    fn test_run_skips_blank_lines() {
        let (mut server, output) = capturing_server(
            b"\n\r\n{\"id\":\"req-1\",\"action\":\"add\",\"params\":[1,2]}\n\n",
        );
        server.register("add", add_handler);

        server.run();

        let content = output.contents();
        let lines: Vec<&str> = content.lines().collect();
        assert_eq!(lines.len(), 1);

        let response: Value = serde_json::from_str(lines[0]).unwrap();
        assert_eq!(response["status"], "ok");
        assert_eq!(response["result"], 3);
    }

    #[test]
    fn test_run_reports_invalid_json() {
        let (server, output) = capturing_server(b"{\n");

        server.run();

        let content = output.contents();
        let lines: Vec<&str> = content.lines().collect();
        assert_eq!(lines.len(), 1);

        let response: Value = serde_json::from_str(lines[0]).unwrap();
        assert_eq!(response["status"], "error");
        assert!(response["error"]
            .as_str()
            .unwrap()
            .starts_with("invalid json: "));
    }
}