1use std::path::Path;
7use std::sync::Arc;
8
9use anyhow::Result;
10use folk_protocol::{FrameCodec, RpcMessage};
11use futures_util::{SinkExt, StreamExt};
12use rmpv::Value;
13use tokio::net::{UnixListener, UnixStream};
14use tokio::sync::watch;
15use tokio_util::codec::Framed;
16use tracing::{debug, error, info, warn};
17
18use crate::rpc_registry::RpcRegistry;
19
20pub async fn run_rpc_server(
22 socket_path: impl AsRef<Path>,
23 registry: Arc<RpcRegistry>,
24 mut shutdown: watch::Receiver<bool>,
25) -> Result<()> {
26 let path = socket_path.as_ref();
27
28 if path.exists() {
30 std::fs::remove_file(path)?;
31 }
32
33 let listener = UnixListener::bind(path)?;
34 info!(socket = %path.display(), "admin RPC server listening");
35
36 loop {
37 tokio::select! {
38 accept = listener.accept() => {
39 match accept {
40 Ok((stream, _addr)) => {
41 let reg = registry.clone();
42 let sd = shutdown.clone();
43 tokio::spawn(handle_connection(stream, reg, sd));
44 },
45 Err(e) => {
46 error!(error = ?e, "accept error");
47 },
48 }
49 },
50 _ = shutdown.changed() => {
51 if *shutdown.borrow() {
52 info!("admin RPC server shutting down");
53 break;
54 }
55 },
56 }
57 }
58
59 let _ = std::fs::remove_file(path);
61 Ok(())
62}
63
64async fn handle_connection(
65 stream: UnixStream,
66 registry: Arc<RpcRegistry>,
67 mut shutdown: watch::Receiver<bool>,
68) {
69 let mut framed = Framed::new(stream, FrameCodec::new());
70
71 loop {
72 tokio::select! {
73 frame = framed.next() => {
74 match frame {
75 Some(Ok(msg)) => {
76 let response = dispatch(®istry, msg).await;
77 if let Err(e) = framed.send(response).await {
78 warn!(error = ?e, "send error; closing connection");
79 return;
80 }
81 },
82 Some(Err(e)) => {
83 warn!(error = ?e, "frame decode error; closing connection");
84 return;
85 },
86 None => {
87 debug!("connection closed by peer");
88 return;
89 },
90 }
91 },
92 _ = shutdown.changed() => {
93 if *shutdown.borrow() {
94 debug!("connection closed due to shutdown");
95 return;
96 }
97 },
98 }
99 }
100}
101
102async fn dispatch(registry: &RpcRegistry, msg: RpcMessage) -> RpcMessage {
103 match msg {
104 RpcMessage::Request { msgid, method, params } => {
105 let payload = rmp_serde::to_vec(¶ms)
106 .map(bytes::Bytes::from)
107 .unwrap_or_default();
108 match registry.dispatch(&method, payload).await {
109 Ok(bytes) => {
110 let result = rmp_serde::from_slice::<Value>(&bytes)
111 .unwrap_or(Value::String(format!("{bytes:?}").into()));
112 RpcMessage::response_ok(msgid, result)
113 },
114 Err(e) => {
115 let err = Value::Map(vec![
116 (
117 Value::String("code".into()),
118 Value::Integer((-32601).into()),
119 ),
120 (
121 Value::String("message".into()),
122 Value::String(e.to_string().into()),
123 ),
124 ]);
125 RpcMessage::response_err(msgid, err)
126 },
127 }
128 },
129 RpcMessage::Notify { .. } | RpcMessage::Response { .. } => {
130 if matches!(msg, RpcMessage::Response { .. }) {
131 warn!("received unexpected Response on admin socket; ignoring");
132 }
133 RpcMessage::notify("noop", Value::Nil)
134 },
135 }
136}