use crate::codec::JsonCodec;
pub use anyhow::Error;
use anyhow::Result;
use core::fmt::Debug;
use futures_util::sink::SinkExt;
use futures_util::StreamExt;
use log::{debug, trace};
use serde::{de::DeserializeOwned, Serialize};
use std::path::Path;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use tokio::net::unix::{OwnedReadHalf, OwnedWriteHalf};
use tokio::net::UnixStream;
use tokio_util::codec::{FramedRead, FramedWrite};
pub mod codec;
pub mod jsonrpc;
pub mod model;
pub mod notifications;
pub mod primitives;
pub use crate::model::TypedRequest;
pub use crate::{
model::{Request, Response},
notifications::Notification,
primitives::RpcError,
};
/// JSON-RPC client that talks to lightningd over a Unix domain socket.
///
/// The socket is split into a read half and a write half, each wrapped in a
/// [`JsonCodec`] framer so whole JSON values are sent and received.
pub struct ClnRpc {
/// Counter that supplies the `id` of the next outgoing request; it starts
/// at 1 and is incremented once per `call_raw` invocation.
next_id: AtomicUsize,
/// Framed read half of the socket; responses are decoded from here.
#[allow(dead_code)]
read: FramedRead<OwnedReadHalf, JsonCodec>,
/// Framed write half of the socket; requests are encoded into here.
write: FramedWrite<OwnedWriteHalf, JsonCodec>,
}
impl ClnRpc {
pub async fn new<P>(path: P) -> Result<ClnRpc>
where
P: AsRef<Path>,
{
debug!(
"Connecting to socket at {}",
path.as_ref().to_string_lossy()
);
ClnRpc::from_stream(UnixStream::connect(path).await?)
}
fn from_stream(stream: UnixStream) -> Result<ClnRpc> {
let (read, write) = stream.into_split();
Ok(ClnRpc {
next_id: AtomicUsize::new(1),
read: FramedRead::new(read, JsonCodec::default()),
write: FramedWrite::new(write, JsonCodec::default()),
})
}
pub async fn call_raw<R, P>(&mut self, method: &str, params: &P) -> Result<R, RpcError>
where
P: Serialize + Debug,
R: DeserializeOwned + Debug,
{
trace!("Sending request {} with params {:?}", method, ¶ms);
let id = self.next_id.fetch_add(1, Ordering::SeqCst);
let req = serde_json::json!({
"jsonrpc" : "2.0",
"id" : id,
"method" : method,
"params" : params,
});
let response: serde_json::Value = self.call_raw_request(req).await?;
serde_json::from_value(response).map_err(|e| RpcError {
code: None,
message: format!("Failed to parse response {:?}", e),
data: None,
})
}
async fn call_raw_request(
&mut self,
request: serde_json::Value,
) -> Result<serde_json::Value, RpcError>
where {
trace!("Sending request {:?}", request);
self.write.send(request).await.map_err(|e| RpcError {
code: None,
message: format!("Error passing request to lightningd: {}", e),
data: None,
})?;
let mut response: serde_json::Value = self
.read
.next()
.await
.ok_or_else(|| RpcError {
code: None,
message: "no response from lightningd".to_string(),
data: None,
})?
.map_err(|_| RpcError {
code: None,
message: "reading response from socket".to_string(),
data: None,
})?;
match response.get("result") {
Some(_) => Ok(response["result"].take()),
None => {
let _ = response.get("error").ok_or(
RpcError {
code : None,
message : "Invalid response from lightningd. Neither `result` or `error` field is present".to_string(),
data : None
})?;
let rpc_error: RpcError = serde_json::from_value(response["error"].take())
.map_err(|e| RpcError {
code: None,
message: format!(
"Invalid response from lightningd. Failed to parse `error`. {:?}",
e
),
data: None,
})?;
Err(rpc_error)
}
}
}
pub async fn call(&mut self, req: Request) -> Result<Response, RpcError> {
self.call_enum(req).await
}
pub async fn call_enum(&mut self, req: Request) -> Result<Response, RpcError> {
trace!("call : Serialize and deserialize request {:?}", req);
let mut ser = serde_json::to_value(&req).unwrap();
let method: String = if let serde_json::Value::String(method) = ser["method"].take() {
method
} else {
panic!("Method should be string")
};
let params: serde_json::Value = ser["params"].take();
let response: serde_json::Value = self.call_raw(&method, ¶ms).await?;
let response = serde_json::json!({
"method" : method,
"result" : response
});
serde_json::from_value(response).map_err(|e| RpcError {
code: None,
message: format!("Failed to deserialize response : {}", e),
data: None,
})
}
pub async fn call_typed<R>(&mut self, request: &R) -> Result<R::Response, RpcError>
where
R: TypedRequest + Serialize + std::fmt::Debug,
R::Response: DeserializeOwned + std::fmt::Debug,
{
let method = request.method();
self.call_raw::<R::Response, R>(method, request).await
}
}
/// Returns `true` when `f` is `None` or holds an empty vector.
///
/// The element type is unconstrained: only the length of the vector is
/// inspected, never its contents.
fn is_none_or_empty<T>(f: &Option<Vec<T>>) -> bool {
    match f {
        Some(values) => values.is_empty(),
        None => true,
    }
}
#[cfg(test)]
mod test {
    //! Tests for the client, run against an in-process socket pair, plus
    //! (de)serialization checks for a few notification types.

    use self::notifications::{BlockAddedNotification, CustomMsgNotification};
    use super::*;
    use crate::model::*;
    use crate::primitives::PublicKey;
    use futures_util::StreamExt;
    use serde_json::json;
    use std::str::FromStr;
    use tokio_util::codec::{Framed, FramedRead};

    /// A raw request is written unchanged and the `result` member of the
    /// reply is handed back.
    #[tokio::test]
    async fn call_raw_request() {
        let (uds1, uds2) = UnixStream::pair().unwrap();
        let mut cln = ClnRpc::from_stream(uds1).unwrap();
        let mut frame = Framed::new(uds2, JsonCodec::default());

        let rpc_request = serde_json::json!({
            "id" : 1,
            "jsonrpc" : "2.0",
            "params" : {},
            "method" : "some_method"
        });
        let rpc_request2 = rpc_request.clone();
        let rpc_response = serde_json::json!({
            "jsonrpc" : "2.0",
            "id" : "1",
            "result" : {"field_6" : 6}
        });

        // The client runs in its own task while this test plays the server.
        let handle = tokio::task::spawn(async move { cln.call_raw_request(rpc_request2).await });

        let read_req = frame.next().await.unwrap().unwrap();
        assert_eq!(&rpc_request, &read_req);

        frame.send(rpc_response).await.unwrap();
        let actual_response: Result<serde_json::Value, RpcError> = handle.await.unwrap();
        let actual_response = actual_response.unwrap();
        assert_eq!(actual_response, json!({"field_6" : 6}));
    }

    /// `call_raw` wraps method and params in a JSON-RPC 2.0 envelope whose
    /// first id is 1.
    #[tokio::test]
    async fn call_raw() {
        let req = serde_json::json!({});
        let (uds1, uds2) = UnixStream::pair().unwrap();
        let mut cln = ClnRpc::from_stream(uds1).unwrap();
        let mut read = FramedRead::new(uds2, JsonCodec::default());

        // Only the outgoing request is inspected; no reply is ever sent.
        tokio::task::spawn(async move {
            let _: serde_json::Value = cln.call_raw("getinfo", &req).await.unwrap();
        });

        let read_req = read.next().await.unwrap().unwrap();
        assert_eq!(
            json!({"id": 1, "method": "getinfo", "params": {}, "jsonrpc": "2.0"}),
            read_req
        );
    }

    /// An `error` reply to an enum-based call surfaces as an `RpcError`
    /// carrying the remote code and message.
    #[tokio::test]
    async fn test_call_enum_remote_error() {
        let (uds1, uds2) = UnixStream::pair().unwrap();
        let mut cln = ClnRpc::from_stream(uds1).unwrap();
        let mut frame = Framed::new(uds2, JsonCodec::default());

        let req = Request::Ping(requests::PingRequest {
            id: PublicKey::from_str(
                "0364aeb75519be29d1af7b8cc6232dbda9fdabb79b66e4e1f6a223750954db210b",
            )
            .unwrap(),
            len: None,
            pongbytes: None,
        });
        let mock_resp = json!({
            "id" : 1,
            "jsonrpc" : "2.0",
            "error" : {
                "code" : 666,
                "message" : "MOCK_ERROR"
            }
        });

        let handle = tokio::task::spawn(async move { cln.call(req).await });

        // Consume the request before answering it.
        let _ = frame.next().await.unwrap().unwrap();
        frame.send(mock_resp).await.unwrap();

        let rpc_response: Result<_, RpcError> = handle.await.unwrap();
        let rpc_error: RpcError = rpc_response.unwrap_err();
        assert_eq!(rpc_error.code.unwrap(), 666);
        assert_eq!(rpc_error.message, "MOCK_ERROR");
    }

    /// An enum-based call serializes the request as expected and maps the
    /// reply onto the matching `Response` variant.
    #[tokio::test]
    async fn test_call_enum() {
        let (uds1, uds2) = UnixStream::pair().unwrap();
        let mut cln = ClnRpc::from_stream(uds1).unwrap();
        let mut frame = Framed::new(uds2, JsonCodec::default());

        let req = Request::Ping(requests::PingRequest {
            id: PublicKey::from_str(
                "0364aeb75519be29d1af7b8cc6232dbda9fdabb79b66e4e1f6a223750954db210b",
            )
            .unwrap(),
            len: None,
            pongbytes: None,
        });
        let mock_resp = json!({
            "id" : 1,
            "jsonrpc" : "2.0",
            "result" : { "totlen" : 123 }
        });

        let handle = tokio::task::spawn(async move { cln.call(req).await });

        let read_req = frame.next().await.unwrap().unwrap();
        assert_eq!(
            read_req,
            json!({"id" : 1, "jsonrpc" : "2.0", "method" : "ping", "params" : {"id" : "0364aeb75519be29d1af7b8cc6232dbda9fdabb79b66e4e1f6a223750954db210b"}})
        );

        frame.send(mock_resp).await.unwrap();
        let rpc_response: Result<_, RpcError> = handle.await.unwrap();
        match rpc_response.unwrap() {
            Response::Ping(ping) => {
                assert_eq!(ping.totlen, 123);
            }
            _ => panic!("A Request::Ping should return Response::Ping"),
        }
    }

    /// A typed call deserializes the reply straight into the response type
    /// associated with the request.
    #[tokio::test]
    async fn test_call_typed() {
        let (uds1, uds2) = UnixStream::pair().unwrap();
        let mut cln = ClnRpc::from_stream(uds1).unwrap();
        let mut frame = Framed::new(uds2, JsonCodec::default());

        let req = requests::PingRequest {
            id: PublicKey::from_str(
                "0364aeb75519be29d1af7b8cc6232dbda9fdabb79b66e4e1f6a223750954db210b",
            )
            .unwrap(),
            len: None,
            pongbytes: None,
        };
        let mock_resp = json!({
            "id" : 1,
            "jsonrpc" : "2.0",
            "result" : { "totlen" : 123 }
        });

        let handle = tokio::task::spawn(async move { cln.call_typed(&req).await });

        // Consume the request before answering it.
        let _ = frame.next().await.unwrap().unwrap();
        frame.send(mock_resp).await.unwrap();

        let rpc_response: Result<_, RpcError> = handle.await.unwrap();
        let ping_response = rpc_response.unwrap();
        assert_eq!(ping_response.totlen, 123);
    }

    /// An `error` reply to a typed call surfaces as an `RpcError` carrying
    /// the remote code and message.
    #[tokio::test]
    async fn test_call_typed_remote_error() {
        let req = requests::GetinfoRequest {};
        let response = json!({
        "id" : 1,
        "jsonrpc" : "2.0",
        "error" : {
            "code" : 666,
            "message" : "MOCK_ERROR",
        }});

        let (uds1, uds2) = UnixStream::pair().unwrap();
        let mut cln = ClnRpc::from_stream(uds1).unwrap();
        let mut frame = Framed::new(uds2, JsonCodec::default());

        let handle = tokio::task::spawn(async move { cln.call_typed(&req).await });

        // Consume the request before answering it.
        let _ = frame.next().await.unwrap().unwrap();
        frame.send(response).await.unwrap();

        let rpc_response = handle.await.unwrap();
        let rpc_error = rpc_response.expect_err("Must be an RPC-error response");
        assert_eq!(rpc_error.code.unwrap(), 666);
        assert_eq!(rpc_error.message, "MOCK_ERROR");
    }

    /// A custom-message notification serializes under the `custommsg` key.
    #[test]
    fn serialize_custom_msg_notification() {
        let msg = CustomMsgNotification {
            peer_id: PublicKey::from_str(
                "0364aeb75519be29d1af7b8cc6232dbda9fdabb79b66e4e1f6a223750954db210b",
            )
            .unwrap(),
            payload: String::from("941746573749"),
        };
        let notification = Notification::CustomMsg(msg);

        assert_eq!(
            serde_json::to_value(notification).unwrap(),
            serde_json::json!(
                {
                    "custommsg" : {
                        "peer_id" : "0364aeb75519be29d1af7b8cc6232dbda9fdabb79b66e4e1f6a223750954db210b",
                        "payload" : "941746573749"
                    }
                }
            )
        );
    }

    /// A block-added notification serializes under the `block_added` key.
    #[test]
    fn serialize_block_added_notification() {
        let block_added = BlockAddedNotification {
            hash: crate::primitives::Sha256::from_str(
                "000000000000000000000acab8abe0c67a52ed7e5a90a19c64930ff11fa84eca",
            )
            .unwrap(),
            height: 830702,
        };
        let notification = Notification::BlockAdded(block_added);

        assert_eq!(
            serde_json::to_value(notification).unwrap(),
            serde_json::json!({
                "block_added" : {
                    "hash" : "000000000000000000000acab8abe0c67a52ed7e5a90a19c64930ff11fa84eca",
                    "height" : 830702
                }
            })
        )
    }

    /// A `connect` notification payload deserializes into `Notification`.
    #[test]
    fn deserialize_connect_notification() {
        let connect_json = serde_json::json!({
            "connect" :  {
                "address" : {
                    "address" : "127.0.0.1",
                    "port" : 38012,
                    "type" : "ipv4"
                },
                "direction" : "in",
                "id" : "022d223620a359a47ff7f7ac447c85c46c923da53389221a0054c11c1e3ca31d59"
            }
        });

        let _: Notification = serde_json::from_value(connect_json).unwrap();
    }
}