gl_plugin/
rpc.rs

1use crate::{requests, responses};
2use clightningrpc::Response;
3use cln_rpc::codec::JsonCodec;
4use futures::{SinkExt, StreamExt};
5use log::{debug, error, warn};
6use serde::{de::DeserializeOwned, Serialize};
7use serde_json::{json, Deserializer, Value};
8use std::path::{Path, PathBuf};
9use thiserror::Error;
10use tokio::net::UnixStream;
11use tokio_util::codec::Framed;
12
13#[derive(Error, Debug)]
14pub enum Error {
15    #[error("error calling RPC: {0}")]
16    RpcError(clightningrpc::Error),
17    #[error("error calling RPC: {0}")]
18    ClnRpcError(anyhow::Error),
19    #[error("empty result from RPC")]
20    EmptyResult,
21    #[error("this is unsupported: {0}")]
22    Unsupported(String),
23    #[error("IO error talking to RPC: {0}")]
24    IoError(#[from] std::io::Error),
25    #[error("error serializing / deserializing: {0}")]
26    Serialization(#[from] serde_json::Error),
27    #[error("unknown error {0}")]
28    Other(Box<dyn std::error::Error + Send + Sync>),
29}
30
/// Client for a node's JSON-RPC interface, addressed by a socket path.
///
/// The struct holds only the path, so cloning it copies no connection state.
#[derive(Debug, Clone)]
pub struct LightningClient {
    /// Filesystem path of the RPC socket to connect to.
    sockpath: PathBuf,
}
35
36impl LightningClient {
37    pub fn new<P: AsRef<Path>>(sockpath: P) -> LightningClient {
38        LightningClient {
39            sockpath: sockpath.as_ref().to_path_buf(),
40        }
41    }
42
43    pub async fn send_request<S: Serialize, D: DeserializeOwned>(
44        &self,
45        method: &str,
46        params: S,
47    ) -> Result<Response<D>, Error> {
48        // Setup connection
49        let stream = UnixStream::connect(&self.sockpath).await?;
50        let mut codec = Framed::new(stream, JsonCodec::default());
51
52        // TODO Re-enable timeout for the socket
53        //stream.set_read_timeout(self.timeout)?;
54        //stream.set_write_timeout(self.timeout)?;
55
56        let request = json!({
57            "method": method,
58            "params": params,
59            "id": 0, // we always open a new connection, so we don't have to care about the nonce
60            "jsonrpc": "2.0",
61        });
62
63        debug!(
64            "Sending request to JSON-RPC: {}",
65            serde_json::to_string(&request).unwrap()
66        );
67
68        if let Err(e) = codec.send(request).await {
69            warn!("Error sending request to RPC interface: {}", e);
70            return Err(Error::ClnRpcError(e));
71        }
72
73        let response = match codec.next().await {
74            Some(Ok(v)) => v,
75            Some(Err(e)) => {
76                warn!("Error from RPC: {:?}", e);
77                return Err(Error::ClnRpcError(e));
78            }
79            None => {
80                warn!("Error reading response from RPC interface, returned None");
81                return Err(Error::EmptyResult);
82            }
83        };
84
85        debug!(
86            "Read response from JSON-RPC: {}",
87            serde_json::to_string(&response).unwrap()
88        );
89
90        // TODO (cdecker) inefficient: serialize just to re-serialize,
91        // but it's how I got it working.
92        let response: Response<D> = Deserializer::from_str(&response.to_string())
93            .into_iter()
94            .next()
95            .map_or(Err(Error::EmptyResult), |res| Ok(res?))?;
96        Ok(response)
97    }
98
99    /// Generic call function for RPC calls.
100    pub async fn call<T: Serialize, U: DeserializeOwned>(
101        &self,
102        method: &str,
103        input: T,
104    ) -> Result<U, Error> {
105        self.send_request(method, input)
106            .await?
107            .into_result()
108            .map_err(|e| Error::RpcError(e))
109    }
110
111    /// Show information about this node.
112    pub async fn getinfo(&self) -> Result<crate::responses::GetInfo, Error> {
113        self.call("getinfo", json!({})).await
114    }
115
116    pub async fn stop(&self) -> Result<(), Error> {
117        match self.call::<Value, ()>("stop", json!({})).await {
118            Ok(()) => Ok(()),
119            Err(e) => {
120                debug!("Ignoring error on `stop` call: {}", e);
121                Ok(())
122            }
123        }
124    }
125
126    pub async fn connect<'a>(
127        &self,
128        req: &requests::Connect<'a>,
129    ) -> Result<responses::Connect, Error> {
130        self.call("connect", req).await
131    }
132
133    pub async fn listpeers(
134        &self,
135        node_id: Option<&str>,
136    ) -> Result<crate::responses::ListPeers, Error> {
137        self.call(
138            "listpeers",
139            requests::ListPeers {
140                id: node_id,
141                level: None,
142            },
143        )
144        .await
145    }
146
147    pub async fn disconnect(&self, node_id: &str, force: bool) -> Result<(), Error> {
148        if force {
149            return Err(Error::Unsupported(
150                "Force-disconnects are currently not supported".to_owned(),
151            ));
152        }
153
154        self.call::<requests::Disconnect, responses::Disconnect>(
155            "disconnect",
156            requests::Disconnect { id: node_id },
157        )
158        .await?;
159        Ok(())
160    }
161
162    pub async fn newaddr(&self, typ: crate::pb::BtcAddressType) -> Result<String, Error> {
163        use crate::pb::BtcAddressType;
164        let addresstype = match typ {
165            BtcAddressType::Bech32 => "bech32",
166            BtcAddressType::P2shSegwit => "p2sh-segwit",
167        };
168        let res: responses::NewAddr = self
169            .call(
170                "newaddr",
171                requests::NewAddr {
172                    addresstype: Some(addresstype),
173                },
174            )
175            .await?;
176
177        let addr = match typ {
178            BtcAddressType::Bech32 => res.bech32.unwrap(),
179            BtcAddressType::P2shSegwit => res.p2sh_segwit.unwrap(),
180        };
181
182        Ok(addr)
183    }
184
185    pub async fn listincoming(&self) -> Result<crate::responses::ListIncoming, Error> {
186        self.call("listincoming", crate::requests::ListIncoming {})
187            .await
188    }
189}