1use crate::error::{Error, Result};
2use crate::message::{JsonRpcMessage, JsonRpcResponse, MessageWithFds, file_descriptor_error};
3use crate::transport::{Sender, UnixSocketTransport};
4use jsonrpsee::types::error::ErrorObject;
5use serde_json::Value;
6use std::collections::HashMap;
7use std::os::unix::io::OwnedFd;
8use std::path::Path;
9use std::sync::Arc;
10use tokio::net::{UnixListener, UnixStream};
11use tracing::{debug, error, info};
12
13pub type MethodHandler = Box<
18 dyn Fn(&str, Option<Value>, Vec<OwnedFd>) -> Result<(Option<Value>, Vec<OwnedFd>)>
19 + Send
20 + Sync,
21>;
22
23pub type NotificationHandler =
27 Box<dyn Fn(&str, Option<Value>, Vec<OwnedFd>) -> Result<()> + Send + Sync>;
28
29pub struct Server {
31 methods: HashMap<String, MethodHandler>,
32 notifications: HashMap<String, NotificationHandler>,
33}
34
35impl Server {
36 pub fn new() -> Self {
38 Self {
39 methods: HashMap::new(),
40 notifications: HashMap::new(),
41 }
42 }
43
44 pub fn register_method<F>(&mut self, name: &str, handler: F)
46 where
47 F: Fn(&str, Option<Value>, Vec<OwnedFd>) -> Result<(Option<Value>, Vec<OwnedFd>)>
48 + Send
49 + Sync
50 + 'static,
51 {
52 self.methods.insert(name.to_string(), Box::new(handler));
53 }
54
55 pub fn register_notification<F>(&mut self, name: &str, handler: F)
57 where
58 F: Fn(&str, Option<Value>, Vec<OwnedFd>) -> Result<()> + Send + Sync + 'static,
59 {
60 self.notifications
61 .insert(name.to_string(), Box::new(handler));
62 }
63
64 pub async fn listen<P: AsRef<Path>>(self, path: P) -> Result<()> {
66 let listener = UnixListener::bind(path)?;
67 let server = Arc::new(self);
68
69 info!("Server listening on Unix socket");
70
71 while let Ok((stream, _)) = listener.accept().await {
72 let server = Arc::clone(&server);
73 tokio::spawn(async move {
74 if let Err(e) = server.handle_connection(stream).await {
75 error!("Connection handler error: {}", e);
76 }
77 });
78 }
79
80 Ok(())
81 }
82
83 async fn handle_connection(&self, stream: UnixStream) -> Result<()> {
84 let transport = UnixSocketTransport::new(stream);
85 let (mut sender, mut receiver) = transport.split();
86
87 debug!("New connection established");
88
89 loop {
90 match receiver.receive().await {
91 Ok(message_with_fds) => {
92 if let Err(e) = self.process_message(message_with_fds, &mut sender).await {
93 error!("Error processing message: {}", e);
94 break;
95 }
96 }
97 Err(Error::ConnectionClosed) => {
98 debug!("Connection closed");
99 break;
100 }
101 Err(e) => {
102 error!("Error receiving message: {}", e);
103 break;
104 }
105 }
106 }
107
108 Ok(())
109 }
110
111 pub async fn process_message(
113 &self,
114 message_with_fds: MessageWithFds,
115 sender: &mut Sender,
116 ) -> Result<()> {
117 match message_with_fds.message {
118 JsonRpcMessage::Request(request) => {
119 let id = request.id.clone();
120 let method = &request.method;
121 let params = request.params.clone();
122
123 debug!("Processing request: {}", method);
124
125 let response = if let Some(handler) = self.methods.get(method) {
126 match handler(method, params, message_with_fds.file_descriptors) {
127 Ok((result, response_fds)) => {
128 let response =
129 JsonRpcResponse::success(result.unwrap_or(Value::Null), id);
130 let message = JsonRpcMessage::Response(response);
131 MessageWithFds::new(message, response_fds)
132 }
133 Err(_) => {
134 let error = file_descriptor_error();
135 let response = JsonRpcResponse::error(error, id);
136 let message = JsonRpcMessage::Response(response);
137 MessageWithFds::new(message, Vec::new())
138 }
139 }
140 } else {
141 let error =
142 ErrorObject::owned(-32601, "Method not found".to_string(), None::<Value>);
143 let response = JsonRpcResponse::error(error, id);
144 let message = JsonRpcMessage::Response(response);
145 MessageWithFds::new(message, Vec::new())
146 };
147
148 sender.send(response).await?;
149 }
150 JsonRpcMessage::Notification(notification) => {
151 debug!("Processing notification: {}", notification.method);
152
153 if let Some(handler) = self.notifications.get(¬ification.method) {
154 if let Err(e) = handler(
155 ¬ification.method,
156 notification.params,
157 message_with_fds.file_descriptors,
158 ) {
159 error!("Notification handler error: {}", e);
160 }
161 }
162 }
163 JsonRpcMessage::Response(_) => {
164 debug!("Received response (unexpected on server side)");
165 }
166 }
167
168 Ok(())
169 }
170}
171
172impl Default for Server {
173 fn default() -> Self {
174 Self::new()
175 }
176}