Skip to main content

jsonrpc_fdpass/
server.rs

1use crate::error::{Error, Result};
2use crate::message::{JsonRpcMessage, JsonRpcResponse, MessageWithFds, file_descriptor_error};
3use crate::transport::{Sender, UnixSocketTransport};
4use jsonrpsee::types::error::ErrorObject;
5use serde_json::Value;
6use std::collections::HashMap;
7use std::os::unix::io::OwnedFd;
8use std::path::Path;
9use std::sync::Arc;
10use tokio::net::{UnixListener, UnixStream};
11use tracing::{debug, error, info};
12
13/// Handler function for JSON-RPC methods.
14///
15/// Takes the method name, optional parameters, and received file descriptors.
16/// Returns the result value and any file descriptors to send with the response.
17pub type MethodHandler = Box<
18    dyn Fn(&str, Option<Value>, Vec<OwnedFd>) -> Result<(Option<Value>, Vec<OwnedFd>)>
19        + Send
20        + Sync,
21>;
22
23/// Handler function for JSON-RPC notifications.
24///
25/// Takes the method name, optional parameters, and received file descriptors.
26pub type NotificationHandler =
27    Box<dyn Fn(&str, Option<Value>, Vec<OwnedFd>) -> Result<()> + Send + Sync>;
28
29/// A JSON-RPC 2.0 server with file descriptor passing support.
30pub struct Server {
31    methods: HashMap<String, MethodHandler>,
32    notifications: HashMap<String, NotificationHandler>,
33}
34
35impl Server {
36    /// Create a new JSON-RPC server.
37    pub fn new() -> Self {
38        Self {
39            methods: HashMap::new(),
40            notifications: HashMap::new(),
41        }
42    }
43
44    /// Register a handler for a JSON-RPC method.
45    pub fn register_method<F>(&mut self, name: &str, handler: F)
46    where
47        F: Fn(&str, Option<Value>, Vec<OwnedFd>) -> Result<(Option<Value>, Vec<OwnedFd>)>
48            + Send
49            + Sync
50            + 'static,
51    {
52        self.methods.insert(name.to_string(), Box::new(handler));
53    }
54
55    /// Register a handler for a JSON-RPC notification.
56    pub fn register_notification<F>(&mut self, name: &str, handler: F)
57    where
58        F: Fn(&str, Option<Value>, Vec<OwnedFd>) -> Result<()> + Send + Sync + 'static,
59    {
60        self.notifications
61            .insert(name.to_string(), Box::new(handler));
62    }
63
64    /// Start listening for connections on the given Unix socket path.
65    pub async fn listen<P: AsRef<Path>>(self, path: P) -> Result<()> {
66        let listener = UnixListener::bind(path)?;
67        let server = Arc::new(self);
68
69        info!("Server listening on Unix socket");
70
71        while let Ok((stream, _)) = listener.accept().await {
72            let server = Arc::clone(&server);
73            tokio::spawn(async move {
74                if let Err(e) = server.handle_connection(stream).await {
75                    error!("Connection handler error: {}", e);
76                }
77            });
78        }
79
80        Ok(())
81    }
82
83    async fn handle_connection(&self, stream: UnixStream) -> Result<()> {
84        let transport = UnixSocketTransport::new(stream);
85        let (mut sender, mut receiver) = transport.split();
86
87        debug!("New connection established");
88
89        loop {
90            match receiver.receive().await {
91                Ok(message_with_fds) => {
92                    if let Err(e) = self.process_message(message_with_fds, &mut sender).await {
93                        error!("Error processing message: {}", e);
94                        break;
95                    }
96                }
97                Err(Error::ConnectionClosed) => {
98                    debug!("Connection closed");
99                    break;
100                }
101                Err(e) => {
102                    error!("Error receiving message: {}", e);
103                    break;
104                }
105            }
106        }
107
108        Ok(())
109    }
110
111    /// Process a single JSON-RPC message and send the response.
112    pub async fn process_message(
113        &self,
114        message_with_fds: MessageWithFds,
115        sender: &mut Sender,
116    ) -> Result<()> {
117        match message_with_fds.message {
118            JsonRpcMessage::Request(request) => {
119                let id = request.id.clone();
120                let method = &request.method;
121                let params = request.params.clone();
122
123                debug!("Processing request: {}", method);
124
125                let response = if let Some(handler) = self.methods.get(method) {
126                    match handler(method, params, message_with_fds.file_descriptors) {
127                        Ok((result, response_fds)) => {
128                            let response =
129                                JsonRpcResponse::success(result.unwrap_or(Value::Null), id);
130                            let message = JsonRpcMessage::Response(response);
131                            MessageWithFds::new(message, response_fds)
132                        }
133                        Err(_) => {
134                            let error = file_descriptor_error();
135                            let response = JsonRpcResponse::error(error, id);
136                            let message = JsonRpcMessage::Response(response);
137                            MessageWithFds::new(message, Vec::new())
138                        }
139                    }
140                } else {
141                    let error =
142                        ErrorObject::owned(-32601, "Method not found".to_string(), None::<Value>);
143                    let response = JsonRpcResponse::error(error, id);
144                    let message = JsonRpcMessage::Response(response);
145                    MessageWithFds::new(message, Vec::new())
146                };
147
148                sender.send(response).await?;
149            }
150            JsonRpcMessage::Notification(notification) => {
151                debug!("Processing notification: {}", notification.method);
152
153                if let Some(handler) = self.notifications.get(&notification.method) {
154                    if let Err(e) = handler(
155                        &notification.method,
156                        notification.params,
157                        message_with_fds.file_descriptors,
158                    ) {
159                        error!("Notification handler error: {}", e);
160                    }
161                }
162            }
163            JsonRpcMessage::Response(_) => {
164                debug!("Received response (unexpected on server side)");
165            }
166        }
167
168        Ok(())
169    }
170}
171
172impl Default for Server {
173    fn default() -> Self {
174        Self::new()
175    }
176}