1use std::collections::HashMap;
7use std::io::{self, BufRead, BufReader, Write};
8use std::sync::{Arc, Mutex};
9
10use serde::Serialize;
11use serde_json::Value;
12
13use crate::protocol::{Event, Request, Response};
14
15type SharedReader = Arc<Mutex<Box<dyn BufRead + Send>>>;
16type SharedWriter = Arc<Mutex<Box<dyn Write + Send>>>;
17
18pub type Handler = fn(Vec<Value>) -> Result<Value, String>;
20
21#[derive(Clone)]
23pub struct EventEmitter {
24 output: SharedWriter,
25}
26
27impl EventEmitter {
28 pub fn emit<T: Serialize>(&self, event: &str, data: T) -> io::Result<()> {
30 let value = serde_json::to_value(data).map_err(serialization_error)?;
31 self.emit_value(event, Some(value))
32 }
33
34 pub fn emit_empty(&self, event: &str) -> io::Result<()> {
36 self.emit_value(event, None)
37 }
38
39 fn emit_value(&self, event: &str, data: Option<Value>) -> io::Result<()> {
40 self.write_json_line(&Event::new(event, data))
41 }
42
43 fn write_json_line<T: Serialize>(&self, value: &T) -> io::Result<()> {
44 let payload = serde_json::to_vec(value).map_err(serialization_error)?;
45 let mut writer = self
46 .output
47 .lock()
48 .map_err(|_| io::Error::other("output writer lock poisoned"))?;
49
50 writer.write_all(&payload)?;
51 writer.write_all(b"\n")?;
52 writer.flush()
53 }
54}
55
56pub struct BinaryServer {
58 handlers: HashMap<String, Handler>,
59 input: SharedReader,
60 emitter: EventEmitter,
61}
62
63impl BinaryServer {
64 pub fn new() -> Self {
66 Self::with_io(BufReader::new(io::stdin()), io::stdout())
67 }
68
69 pub fn with_io<R, W>(input: R, output: W) -> Self
71 where
72 R: BufRead + Send + 'static,
73 W: Write + Send + 'static,
74 {
75 let output: SharedWriter = Arc::new(Mutex::new(Box::new(output)));
76
77 Self {
78 handlers: HashMap::new(),
79 input: Arc::new(Mutex::new(Box::new(input))),
80 emitter: EventEmitter { output },
81 }
82 }
83
84 pub fn emitter(&self) -> EventEmitter {
86 self.emitter.clone()
87 }
88
89 pub fn register(&mut self, action: &str, handler: Handler) {
91 self.handlers.insert(action.to_string(), handler);
92 }
93
94 pub fn emit_event<T: Serialize>(&self, event: &str, data: T) -> io::Result<()> {
96 self.emitter.emit(event, data)
97 }
98
99 pub fn emit_event_empty(&self, event: &str) -> io::Result<()> {
101 self.emitter.emit_empty(event)
102 }
103
104 pub fn run(&self) {
106 let mut line = String::new();
107
108 loop {
109 line.clear();
110
111 let bytes_read = {
112 let mut reader = match self.input.lock() {
113 Ok(reader) => reader,
114 Err(_) => return,
115 };
116
117 match reader.read_line(&mut line) {
118 Ok(bytes) => bytes,
119 Err(_) => continue,
120 }
121 };
122
123 if bytes_read == 0 {
124 break;
125 }
126
127 let trimmed = line.trim_end_matches(['\n', '\r']);
128 if trimmed.is_empty() {
129 continue;
130 }
131
132 let response = match serde_json::from_str::<Request>(trimmed) {
133 Ok(req) => self.handle_request(req),
134 Err(err) => {
135 let raw = format!(r#"{{"status":"error","error":"invalid json: {}"}}"#, err);
136 let _ = self.write_raw_line(&raw);
137 continue;
138 }
139 };
140
141 if self.emitter.write_json_line(&response).is_err() {
142 let raw = r#"{"status":"error","error":"serialization failed"}"#;
143 let _ = self.write_raw_line(raw);
144 }
145 }
146 }
147
148 fn handle_request(&self, req: Request) -> Response {
150 match self.handlers.get(&req.action) {
151 Some(handler) => match handler(req.params) {
152 Ok(result) => Response::Ok { id: req.id, result },
153 Err(msg) => Response::Error {
154 id: req.id,
155 error: msg,
156 },
157 },
158 None => Response::Error {
159 id: req.id,
160 error: format!("unknown action: {}", req.action),
161 },
162 }
163 }
164
165 fn write_raw_line(&self, raw: &str) -> io::Result<()> {
166 let mut writer = self
167 .emitter
168 .output
169 .lock()
170 .map_err(|_| io::Error::other("output writer lock poisoned"))?;
171 writer.write_all(raw.as_bytes())?;
172 writer.write_all(b"\n")?;
173 writer.flush()
174 }
175}
176
177impl Default for BinaryServer {
178 fn default() -> Self {
179 Self::new()
180 }
181}
182
183fn serialization_error(err: serde_json::Error) -> io::Error {
184 io::Error::new(io::ErrorKind::InvalidData, err)
185}
186
187#[cfg(test)]
188mod tests {
189 use super::*;
190 use serde_json::json;
191 use std::io::Cursor;
192
193 #[derive(Clone, Default)]
194 struct SharedBuffer {
195 bytes: Arc<Mutex<Vec<u8>>>,
196 }
197
198 impl SharedBuffer {
199 fn into_string(&self) -> String {
200 String::from_utf8(self.bytes.lock().unwrap().clone()).unwrap()
201 }
202 }
203
204 impl Write for SharedBuffer {
205 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
206 self.bytes.lock().unwrap().extend_from_slice(buf);
207 Ok(buf.len())
208 }
209
210 fn flush(&mut self) -> io::Result<()> {
211 Ok(())
212 }
213 }
214
215 fn test_handler_success(params: Vec<Value>) -> Result<Value, String> {
216 Ok(json!({"received": params.len()}))
217 }
218
219 fn test_handler_error(_params: Vec<Value>) -> Result<Value, String> {
220 Err("intentional error".to_string())
221 }
222
223 fn add_handler(params: Vec<Value>) -> Result<Value, String> {
224 if params.len() != 2 {
225 return Err("Expected 2 parameters".into());
226 }
227 let a = params[0].as_i64().ok_or("Invalid number")?;
228 let b = params[1].as_i64().ok_or("Invalid number")?;
229 Ok(Value::from(a + b))
230 }
231
232 #[test]
233 fn test_server_creation() {
234 let server = BinaryServer::with_io(Cursor::new(Vec::<u8>::new()), SharedBuffer::default());
235 assert_eq!(server.handlers.len(), 0);
236 }
237
238 #[test]
239 fn test_server_default() {
240 let server = BinaryServer::default();
241 assert_eq!(server.handlers.len(), 0);
242 }
243
244 #[test]
245 fn test_register_handler() {
246 let mut server =
247 BinaryServer::with_io(Cursor::new(Vec::<u8>::new()), SharedBuffer::default());
248 server.register("test", test_handler_success);
249 assert_eq!(server.handlers.len(), 1);
250 assert!(server.handlers.contains_key("test"));
251 }
252
253 #[test]
254 fn test_register_multiple_handlers() {
255 let mut server =
256 BinaryServer::with_io(Cursor::new(Vec::<u8>::new()), SharedBuffer::default());
257 server.register("handler1", test_handler_success);
258 server.register("handler2", test_handler_error);
259 assert_eq!(server.handlers.len(), 2);
260 }
261
262 #[test]
263 fn test_handle_request_success() {
264 let mut server =
265 BinaryServer::with_io(Cursor::new(Vec::<u8>::new()), SharedBuffer::default());
266 server.register("test", test_handler_success);
267
268 let request = Request {
269 id: "req-1".to_string(),
270 action: "test".to_string(),
271 params: vec![json!(1), json!(2)],
272 };
273
274 let response = server.handle_request(request);
275
276 match response {
277 Response::Ok { id, result } => {
278 assert_eq!(id, "req-1");
279 assert_eq!(result, json!({"received": 2}));
280 }
281 _ => panic!("Expected Ok response"),
282 }
283 }
284
285 #[test]
286 fn test_handle_request_handler_error() {
287 let mut server =
288 BinaryServer::with_io(Cursor::new(Vec::<u8>::new()), SharedBuffer::default());
289 server.register("test", test_handler_error);
290
291 let request = Request {
292 id: "req-2".to_string(),
293 action: "test".to_string(),
294 params: vec![],
295 };
296
297 let response = server.handle_request(request);
298
299 match response {
300 Response::Error { id, error } => {
301 assert_eq!(id, "req-2");
302 assert_eq!(error, "intentional error");
303 }
304 _ => panic!("Expected Error response"),
305 }
306 }
307
308 #[test]
309 fn test_handle_request_unknown_action() {
310 let server = BinaryServer::with_io(Cursor::new(Vec::<u8>::new()), SharedBuffer::default());
311
312 let request = Request {
313 id: "req-3".to_string(),
314 action: "nonexistent".to_string(),
315 params: vec![],
316 };
317
318 let response = server.handle_request(request);
319
320 match response {
321 Response::Error { id, error } => {
322 assert_eq!(id, "req-3");
323 assert_eq!(error, "unknown action: nonexistent");
324 }
325 _ => panic!("Expected Error response"),
326 }
327 }
328
329 #[test]
330 fn test_add_handler_success() {
331 let mut server =
332 BinaryServer::with_io(Cursor::new(Vec::<u8>::new()), SharedBuffer::default());
333 server.register("add", add_handler);
334
335 let request = Request {
336 id: "req-4".to_string(),
337 action: "add".to_string(),
338 params: vec![json!(5), json!(10)],
339 };
340
341 let response = server.handle_request(request);
342
343 match response {
344 Response::Ok { id, result } => {
345 assert_eq!(id, "req-4");
346 assert_eq!(result, json!(15));
347 }
348 _ => panic!("Expected Ok response"),
349 }
350 }
351
352 #[test]
353 fn test_add_handler_wrong_param_count() {
354 let mut server =
355 BinaryServer::with_io(Cursor::new(Vec::<u8>::new()), SharedBuffer::default());
356 server.register("add", add_handler);
357
358 let request = Request {
359 id: "req-5".to_string(),
360 action: "add".to_string(),
361 params: vec![json!(5)],
362 };
363
364 let response = server.handle_request(request);
365
366 match response {
367 Response::Error { id, error } => {
368 assert_eq!(id, "req-5");
369 assert_eq!(error, "Expected 2 parameters");
370 }
371 _ => panic!("Expected Error response"),
372 }
373 }
374
375 #[test]
376 fn test_add_handler_invalid_type() {
377 let mut server =
378 BinaryServer::with_io(Cursor::new(Vec::<u8>::new()), SharedBuffer::default());
379 server.register("add", add_handler);
380
381 let request = Request {
382 id: "req-6".to_string(),
383 action: "add".to_string(),
384 params: vec![json!("not a number"), json!(10)],
385 };
386
387 let response = server.handle_request(request);
388
389 match response {
390 Response::Error { id, error } => {
391 assert_eq!(id, "req-6");
392 assert_eq!(error, "Invalid number");
393 }
394 _ => panic!("Expected Error response"),
395 }
396 }
397
398 #[test]
399 fn test_emit_event() {
400 let output = SharedBuffer::default();
401 let server = BinaryServer::with_io(Cursor::new(Vec::<u8>::new()), output.clone());
402
403 server
404 .emit_event("worker.ready", json!({"pid": 42}))
405 .unwrap();
406
407 let line = output.into_string();
408 let parsed: Value = serde_json::from_str(line.trim()).unwrap();
409 assert_eq!(parsed["type"], "event");
410 assert_eq!(parsed["event"], "worker.ready");
411 assert_eq!(parsed["data"]["pid"], 42);
412 }
413
414 #[test]
415 fn test_emit_event_empty() {
416 let output = SharedBuffer::default();
417 let server = BinaryServer::with_io(Cursor::new(Vec::<u8>::new()), output.clone());
418
419 server.emit_event_empty("heartbeat").unwrap();
420
421 let line = output.into_string();
422 let parsed: Value = serde_json::from_str(line.trim()).unwrap();
423 assert_eq!(parsed["type"], "event");
424 assert_eq!(parsed["event"], "heartbeat");
425 assert!(parsed.get("data").is_none());
426 }
427
428 #[test]
429 fn test_emitter_clone_writes_same_stream() {
430 let output = SharedBuffer::default();
431 let server = BinaryServer::with_io(Cursor::new(Vec::<u8>::new()), output.clone());
432 let emitter = server.emitter();
433
434 emitter.emit("worker.ready", json!({"pid": 7})).unwrap();
435 server.emit_event_empty("heartbeat").unwrap();
436
437 let content = output.into_string();
438 let lines: Vec<&str> = content.lines().collect();
439 assert_eq!(lines.len(), 2);
440
441 let first: Value = serde_json::from_str(lines[0]).unwrap();
442 let second: Value = serde_json::from_str(lines[1]).unwrap();
443 assert_eq!(first["event"], "worker.ready");
444 assert_eq!(second["event"], "heartbeat");
445 }
446
447 #[test]
448 fn test_run_writes_event_and_response() {
449 let output = SharedBuffer::default();
450 let input =
451 Cursor::new(b"{\"id\":\"req-1\",\"action\":\"sum\",\"params\":[2,3]}\n".to_vec());
452 let mut server = BinaryServer::with_io(input, output.clone());
453
454 server.register("sum", |params| {
455 let a = params[0].as_i64().ok_or("Invalid number")?;
456 let b = params[1].as_i64().ok_or("Invalid number")?;
457 Ok(Value::from(a + b))
458 });
459
460 server.emit_event_empty("startup").unwrap();
461 server.run();
462
463 let content = output.into_string();
464 let lines: Vec<&str> = content.lines().collect();
465 assert_eq!(lines.len(), 2);
466
467 let event: Value = serde_json::from_str(lines[0]).unwrap();
468 let response: Value = serde_json::from_str(lines[1]).unwrap();
469 assert_eq!(event["type"], "event");
470 assert_eq!(event["event"], "startup");
471 assert_eq!(response["status"], "ok");
472 assert_eq!(response["result"], 5);
473 }
474}