use crate::{requests, responses};
use clightningrpc::Response;
use cln_rpc::codec::JsonCodec;
use futures::{SinkExt, StreamExt};
use log::{debug, error, warn};
use serde::{de::DeserializeOwned, Serialize};
use serde_json::{json, Deserializer, Value};
use std::path::{Path, PathBuf};
use thiserror::Error;
use tokio::net::UnixStream;
use tokio_util::codec::Framed;
#[derive(Error, Debug)]
pub enum Error {
#[error("error calling RPC: {0}")]
RpcError(clightningrpc::Error),
#[error("error calling RPC: {0}")]
ClnRpcError(anyhow::Error),
#[error("empty result from RPC")]
EmptyResult,
#[error("this is unsupported: {0}")]
Unsupported(String),
#[error("IO error talking to RPC: {0}")]
IoError(#[from] std::io::Error),
#[error("error serializing / deserializing: {0}")]
Serialization(#[from] serde_json::Error),
#[error("unknown error {0}")]
Other(Box<dyn std::error::Error + Send + Sync>),
}
#[derive(Clone, Debug)]
pub struct LightningClient {
sockpath: PathBuf,
}
impl LightningClient {
pub fn new<P: AsRef<Path>>(sockpath: P) -> LightningClient {
LightningClient {
sockpath: sockpath.as_ref().to_path_buf(),
}
}
pub async fn send_request<S: Serialize, D: DeserializeOwned>(
&self,
method: &str,
params: S,
) -> Result<Response<D>, Error> {
let stream = UnixStream::connect(&self.sockpath).await?;
let mut codec = Framed::new(stream, JsonCodec::default());
let request = json!({
"method": method,
"params": params,
"id": 0, "jsonrpc": "2.0",
});
debug!(
"Sending request to JSON-RPC: {}",
serde_json::to_string(&request).unwrap()
);
if let Err(e) = codec.send(request).await {
warn!("Error sending request to RPC interface: {}", e);
return Err(Error::ClnRpcError(e));
}
let response = match codec.next().await {
Some(Ok(v)) => v,
Some(Err(e)) => {
warn!("Error from RPC: {:?}", e);
return Err(Error::ClnRpcError(e));
}
None => {
warn!("Error reading response from RPC interface, returned None");
return Err(Error::EmptyResult);
}
};
debug!(
"Read response from JSON-RPC: {}",
serde_json::to_string(&response).unwrap()
);
let response: Response<D> = Deserializer::from_str(&response.to_string())
.into_iter()
.next()
.map_or(Err(Error::EmptyResult), |res| Ok(res?))?;
Ok(response)
}
pub async fn call<T: Serialize, U: DeserializeOwned>(
&self,
method: &str,
input: T,
) -> Result<U, Error> {
self.send_request(method, input)
.await?
.into_result()
.map_err(|e| Error::RpcError(e))
}
pub async fn getinfo(&self) -> Result<crate::responses::GetInfo, Error> {
self.call("getinfo", json!({})).await
}
pub async fn stop(&self) -> Result<(), Error> {
match self.call::<Value, ()>("stop", json!({})).await {
Ok(()) => Ok(()),
Err(e) => {
debug!("Ignoring error on `stop` call: {}", e);
Ok(())
}
}
}
pub async fn connect<'a>(
&self,
req: &requests::Connect<'a>,
) -> Result<responses::Connect, Error> {
self.call("connect", req).await
}
pub async fn listpeers(
&self,
node_id: Option<&str>,
) -> Result<crate::responses::ListPeers, Error> {
self.call(
"listpeers",
requests::ListPeers {
id: node_id,
level: None,
},
)
.await
}
pub async fn disconnect(&self, node_id: &str, force: bool) -> Result<(), Error> {
if force {
return Err(Error::Unsupported(
"Force-disconnects are currently not supported".to_owned(),
));
}
self.call::<requests::Disconnect, responses::Disconnect>(
"disconnect",
requests::Disconnect { id: node_id },
)
.await?;
Ok(())
}
pub async fn newaddr(&self, typ: crate::pb::BtcAddressType) -> Result<String, Error> {
use crate::pb::BtcAddressType;
let addresstype = match typ {
BtcAddressType::Bech32 => "bech32",
BtcAddressType::P2shSegwit => "p2sh-segwit",
};
let res: responses::NewAddr = self
.call(
"newaddr",
requests::NewAddr {
addresstype: Some(addresstype),
},
)
.await?;
let addr = match typ {
BtcAddressType::Bech32 => res.bech32.unwrap(),
BtcAddressType::P2shSegwit => res.p2sh_segwit.unwrap(),
};
Ok(addr)
}
pub async fn listincoming(&self) -> Result<crate::responses::ListIncoming, Error> {
self.call("listincoming", crate::requests::ListIncoming {})
.await
}
}