vls_proxy/rpc_server/
server.rs

1use jsonrpsee::{
2    server::{RpcModule, Server},
3    types::{error::ErrorCode, ErrorObject},
4};
5use lightning_signer::node::Node;
6
7use std::{
8    net::{IpAddr, SocketAddr},
9    sync::Arc,
10};
11use tokio::task::JoinHandle;
12
13use crate::GIT_DESC;
14
15use super::InfoModel;
16use tracing::*;
17
18#[derive(Debug)]
19pub enum RpcMethods {
20    Info,
21    Version,
22    AllowlistDisplay,
23    AllowlistAdd,
24    AllowlistRemove,
25}
26
27impl RpcMethods {
28    pub fn as_str(&self) -> &'static str {
29        match self {
30            Self::Info => "info",
31            Self::Version => "version",
32            Self::AllowlistDisplay => "allowlist_display",
33            Self::AllowlistAdd => "allowlist_add",
34            Self::AllowlistRemove => "allowlist_remove",
35        }
36    }
37}
38
39pub async fn start_rpc_server(
40    node: Arc<Node>,
41    ip: IpAddr,
42    port: u16,
43    username: &str,
44    password: &str,
45    shutdown_signal: triggered::Listener,
46) -> anyhow::Result<(SocketAddr, JoinHandle<()>)> {
47    let mut module = RpcModule::new(node);
48    module.register_method(RpcMethods::Info.as_str(), |_, context| {
49        info!("rpc_server: info");
50        let height = context.get_chain_height();
51        let channels = context.get_channels().values().len() as u32;
52        Ok::<_, ErrorObject>(InfoModel::new(height, channels, GIT_DESC.to_string()))
53    })?;
54
55    module.register_method(RpcMethods::Version.as_str(), |_, _| {
56        Ok::<_, ErrorObject>(GIT_DESC.to_string())
57    })?;
58
59    module.register_method(RpcMethods::AllowlistDisplay.as_str(), |_, context| {
60        return match context.allowlist() {
61            Ok(allowlist) => Ok(allowlist),
62            Err(e) => Err(ErrorObject::owned(e.code() as i32, e.message(), None::<bool>)),
63        };
64    })?;
65
66    module.register_method(RpcMethods::AllowlistAdd.as_str(), |params, context| {
67        info!("rpc_server: allow list add, params {:?}", params);
68        let address = params.one::<String>().map_err(|_| ErrorCode::InvalidParams)?;
69        match context.add_allowlist(&[address.clone()]) {
70            Ok(_) => {
71                trace!("successfully added address:{}", address);
72                Ok::<_, ErrorObject>(())
73            }
74            Err(e) => {
75                error!("failed to add address:{}, error:{:?}", address, e);
76                Err(ErrorObject::owned(e.code() as i32, e.message(), None::<bool>))
77            }
78        }
79    })?;
80
81    module.register_method(RpcMethods::AllowlistRemove.as_str(), |params, context| {
82        info!("rpc_server: allow list remove, params {:?}", params);
83        let address = params.one::<String>().map_err(|_| ErrorCode::InvalidParams)?;
84        match context.remove_allowlist(&[address.clone()]) {
85            Ok(_) => {
86                trace!("successfully removed address:{}", address);
87                Ok::<_, ErrorObject>(())
88            }
89            Err(e) => {
90                error!("failed to remove address:{}, error:{:?}", address, e);
91                Err(ErrorObject::owned(e.code() as i32, e.message(), None::<bool>))
92            }
93        }
94    })?;
95
96    let auth_middleware = tower::ServiceBuilder::new()
97        .layer(tower_http::auth::AddAuthorizationLayer::basic(username, password));
98
99    let server = Server::builder()
100        .set_http_middleware(auth_middleware)
101        .build(SocketAddr::new(ip, port))
102        .await?;
103
104    let addr = server.local_addr()?;
105    let handle = server.start(module);
106    info!("rpc_server: listening on {} on port {}", addr, port);
107
108    let join_handle = tokio::spawn(async move {
109        shutdown_signal.await;
110        handle.stop().expect("not already stopped");
111        handle.stopped().await;
112    });
113
114    Ok((addr, join_handle))
115}
116
117#[cfg(test)]
118mod tests {
119    use clap::Parser;
120    use std::sync::Arc;
121
122    use crate::{
123        config::{SignerArgs, RPC_SERVER_ADDRESS, RPC_SERVER_PORT},
124        grpc::signer::make_handler,
125    };
126
127    use super::start_rpc_server;
128
129    #[tokio::test]
130    async fn test_rpc_server() {
131        let temp_dir = tempfile::tempdir_in(".").unwrap();
132        let datadir = temp_dir.path().to_str().unwrap();
133
134        let ip = RPC_SERVER_ADDRESS.to_string();
135        let port = RPC_SERVER_PORT.to_string();
136        let args = vec![
137            "signer",
138            "--network",
139            "regtest",
140            "--datadir",
141            datadir,
142            "--rpc-server-address",
143            &ip,
144            "--rpc-server-port",
145            &port,
146        ];
147        let signer_args = SignerArgs::parse_from(&args);
148
149        let (root_handler, _muts) = make_handler(datadir, &signer_args);
150        let (shutdown_trigger, shutdown_signal) = triggered::trigger();
151        match start_rpc_server(
152            Arc::clone(root_handler.node()),
153            signer_args.rpc_server_address,
154            signer_args.rpc_server_port,
155            "user",
156            "password",
157            shutdown_signal,
158        )
159        .await
160        {
161            Ok((addr, join_handle)) => {
162                println!("rpc server started at {}", addr);
163                shutdown_trigger.trigger();
164                join_handle.await.unwrap();
165            }
166            Err(e) => {
167                println!("rpc server failed to start: {}", e);
168                assert!(false);
169            }
170        }
171    }
172}