Skip to main content

folk_core/
rpc_server.rs

1//! Admin RPC server: Unix socket listener that dispatches to registered handlers.
2//!
3//! Not to be confused with the master-worker task channel. This is for
4//! external admin tooling (PHP's RPC client, `folk admin`, monitoring).
5
6use std::path::Path;
7use std::sync::Arc;
8
9use anyhow::Result;
10use folk_protocol::{FrameCodec, RpcMessage};
11use futures_util::{SinkExt, StreamExt};
12use rmpv::Value;
13use tokio::net::{UnixListener, UnixStream};
14use tokio::sync::watch;
15use tokio_util::codec::Framed;
16use tracing::{debug, error, info, warn};
17
18use crate::rpc_registry::RpcRegistry;
19
20/// Bind the admin RPC socket and serve connections until shutdown is signaled.
21pub async fn run_rpc_server(
22    socket_path: impl AsRef<Path>,
23    registry: Arc<RpcRegistry>,
24    mut shutdown: watch::Receiver<bool>,
25) -> Result<()> {
26    let path = socket_path.as_ref();
27
28    // Remove stale socket file.
29    if path.exists() {
30        std::fs::remove_file(path)?;
31    }
32
33    let listener = UnixListener::bind(path)?;
34    info!(socket = %path.display(), "admin RPC server listening");
35
36    loop {
37        tokio::select! {
38            accept = listener.accept() => {
39                match accept {
40                    Ok((stream, _addr)) => {
41                        let reg = registry.clone();
42                        let sd = shutdown.clone();
43                        tokio::spawn(handle_connection(stream, reg, sd));
44                    },
45                    Err(e) => {
46                        error!(error = ?e, "accept error");
47                    },
48                }
49            },
50            _ = shutdown.changed() => {
51                if *shutdown.borrow() {
52                    info!("admin RPC server shutting down");
53                    break;
54                }
55            },
56        }
57    }
58
59    // Remove socket file on clean exit.
60    let _ = std::fs::remove_file(path);
61    Ok(())
62}
63
64async fn handle_connection(
65    stream: UnixStream,
66    registry: Arc<RpcRegistry>,
67    mut shutdown: watch::Receiver<bool>,
68) {
69    let mut framed = Framed::new(stream, FrameCodec::new());
70
71    loop {
72        tokio::select! {
73            frame = framed.next() => {
74                match frame {
75                    Some(Ok(msg)) => {
76                        let response = dispatch(&registry, msg).await;
77                        if let Err(e) = framed.send(response).await {
78                            warn!(error = ?e, "send error; closing connection");
79                            return;
80                        }
81                    },
82                    Some(Err(e)) => {
83                        warn!(error = ?e, "frame decode error; closing connection");
84                        return;
85                    },
86                    None => {
87                        debug!("connection closed by peer");
88                        return;
89                    },
90                }
91            },
92            _ = shutdown.changed() => {
93                if *shutdown.borrow() {
94                    debug!("connection closed due to shutdown");
95                    return;
96                }
97            },
98        }
99    }
100}
101
102async fn dispatch(registry: &RpcRegistry, msg: RpcMessage) -> RpcMessage {
103    match msg {
104        RpcMessage::Request { msgid, method, params } => {
105            let payload = rmp_serde::to_vec(&params)
106                .map(bytes::Bytes::from)
107                .unwrap_or_default();
108            match registry.dispatch(&method, payload).await {
109                Ok(bytes) => {
110                    let result = rmp_serde::from_slice::<Value>(&bytes)
111                        .unwrap_or(Value::String(format!("{bytes:?}").into()));
112                    RpcMessage::response_ok(msgid, result)
113                },
114                Err(e) => {
115                    let err = Value::Map(vec![
116                        (
117                            Value::String("code".into()),
118                            Value::Integer((-32601).into()),
119                        ),
120                        (
121                            Value::String("message".into()),
122                            Value::String(e.to_string().into()),
123                        ),
124                    ]);
125                    RpcMessage::response_err(msgid, err)
126                },
127            }
128        },
129        RpcMessage::Notify { .. } | RpcMessage::Response { .. } => {
130            if matches!(msg, RpcMessage::Response { .. }) {
131                warn!("received unexpected Response on admin socket; ignoring");
132            }
133            RpcMessage::notify("noop", Value::Nil)
134        },
135    }
136}