use crate::codec::JsonCodec;
pub use anyhow::Error;
use anyhow::Result;
use core::fmt::Debug;
use futures_util::StreamExt;
use futures_util::sink::SinkExt;
use log::{debug, trace};
use serde::{Serialize, de::DeserializeOwned};
use std::path::Path;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use tokio::net::UnixStream;
use tokio::net::unix::{OwnedReadHalf, OwnedWriteHalf};
use tokio_util::codec::{FramedRead, FramedWrite};
pub mod codec;
pub mod hooks;
pub mod jsonrpc;
pub mod model;
pub mod notifications;
pub mod primitives;
#[cfg(test)]
mod test;
pub use crate::model::TypedRequest;
pub use crate::{
model::{Request, Response},
notifications::Notification,
primitives::RpcError,
};
/// JSON-RPC client that talks to `lightningd` over a Unix domain socket.
///
/// The socket is split into a read half and a write half, each wrapped in a
/// [`JsonCodec`] framer, so requests are written as JSON values and responses
/// are read back as JSON values.
pub struct ClnRpc {
    /// Counter supplying the `id` member of each outgoing request.
    next_id: AtomicUsize,
    /// Framed reader from which responses are pulled.
    read: FramedRead<OwnedReadHalf, JsonCodec>,
    /// Framed writer into which requests are pushed.
    write: FramedWrite<OwnedWriteHalf, JsonCodec>,
}
impl ClnRpc {
/// Connects to the Unix domain socket at `path` and returns a client bound to it.
///
/// # Errors
///
/// Propagates the error from [`UnixStream::connect`] when the connection
/// cannot be established.
pub async fn new<P>(path: P) -> Result<ClnRpc>
where
    P: AsRef<Path>,
{
    let socket_path = path.as_ref();
    debug!("Connecting to socket at {}", socket_path.display());
    let stream = UnixStream::connect(socket_path).await?;
    ClnRpc::from_stream(stream)
}
/// Wraps an already-connected stream in a client.
///
/// The stream is split into owned halves, each framed with its own
/// [`JsonCodec`]; request ids start at 1.
fn from_stream(stream: UnixStream) -> Result<ClnRpc> {
    let (reader, writer) = stream.into_split();
    let next_id = AtomicUsize::new(1);
    let read = FramedRead::new(reader, JsonCodec::default());
    let write = FramedWrite::new(writer, JsonCodec::default());
    Ok(ClnRpc {
        next_id,
        read,
        write,
    })
}
/// Sends `method` with `params` as a JSON-RPC 2.0 request and decodes the
/// returned result into `R`.
///
/// Each request is tagged with the next value of the internal id counter.
///
/// # Errors
///
/// Returns an [`RpcError`] with `code: None` when `params` cannot be
/// serialized or when the result cannot be deserialized into `R`, and
/// forwards any [`RpcError`] produced while exchanging the request.
pub async fn call_raw<R, P>(&mut self, method: &str, params: &P) -> Result<R, RpcError>
where
    P: Serialize + Debug,
    R: DeserializeOwned + Debug,
{
    trace!("Sending request {} with params {:?}", method, params);
    // Serialize the caller-supplied params up front: interpolating them
    // directly into `json!` would panic if their `Serialize` impl fails.
    let params = serde_json::to_value(params).map_err(|e| RpcError {
        code: None,
        message: format!("Failed to serialize params {:?}", e),
        data: None,
    })?;
    let id = self.next_id.fetch_add(1, Ordering::SeqCst);
    let req = serde_json::json!({
        "jsonrpc" : "2.0",
        "id" : id,
        "method" : method,
        "params" : params,
    });
    let response: serde_json::Value = self.call_raw_request(req).await?;
    serde_json::from_value(response).map_err(|e| RpcError {
        code: None,
        message: format!("Failed to parse response {:?}", e),
        data: None,
    })
}
async fn call_raw_request(
&mut self,
request: serde_json::Value,
) -> Result<serde_json::Value, RpcError>
where {
trace!("Sending request {:?}", request);
self.write.send(request).await.map_err(|e| RpcError {
code: None,
message: format!("Error passing request to lightningd: {}", e),
data: None,
})?;
let mut response: serde_json::Value = self
.read
.next()
.await
.ok_or_else(|| RpcError {
code: None,
message: "no response from lightningd".to_string(),
data: None,
})?
.map_err(|_| RpcError {
code: None,
message: "reading response from socket".to_string(),
data: None,
})?;
match response.get("result") {
Some(_) => Ok(response["result"].take()),
None => {
let _ = response.get("error").ok_or(
RpcError {
code : None,
message : "Invalid response from lightningd. Neither `result` or `error` field is present".to_string(),
data : None
})?;
let rpc_error: RpcError = serde_json::from_value(response["error"].take())
.map_err(|e| RpcError {
code: None,
message: format!(
"Invalid response from lightningd. Failed to parse `error`. {:?}",
e
),
data: None,
})?;
Err(rpc_error)
}
}
}
pub async fn call(&mut self, req: Request) -> Result<Response, RpcError> {
self.call_enum(req).await
}
/// Sends a [`Request`] and decodes the reply into the [`Response`] enum.
///
/// The request is serialized to JSON, its `method` and `params` members are
/// extracted and sent through [`ClnRpc::call_raw`], and the returned result
/// is paired with the method name again so it can be deserialized into
/// [`Response`].
///
/// # Errors
///
/// Returns an [`RpcError`] with `code: None` when the request cannot be
/// serialized, when its serialized form has no string `method` member, or
/// when the reply cannot be deserialized into [`Response`]. Errors from
/// [`ClnRpc::call_raw`] are forwarded unchanged.
pub async fn call_enum(&mut self, req: Request) -> Result<Response, RpcError> {
    trace!("call : Serialize and deserialize request {:?}", req);
    let mut ser = serde_json::to_value(&req).map_err(|e| RpcError {
        code: None,
        message: format!("Failed to serialize request : {}", e),
        data: None,
    })?;

    // `get_mut` never panics, unlike indexing into a non-object value.
    let method: String = match ser.get_mut("method").map(serde_json::Value::take) {
        Some(serde_json::Value::String(method)) => method,
        _ => {
            return Err(RpcError {
                code: None,
                message: "Method should be string".to_string(),
                data: None,
            });
        }
    };
    // A missing `params` member is sent as JSON `null`.
    let params: serde_json::Value = ser
        .get_mut("params")
        .map(serde_json::Value::take)
        .unwrap_or_default();

    let response: serde_json::Value = self.call_raw(&method, &params).await?;
    let response = serde_json::json!({
        "method" : method,
        "result" : response
    });
    serde_json::from_value(response).map_err(|e| RpcError {
        code: None,
        message: format!("Failed to deserialize response : {}", e),
        data: None,
    })
}
/// Sends a typed request and decodes the reply into that request's
/// associated response type.
///
/// The method name comes from the request's `method()` and the request
/// itself is serialized as the params.
///
/// # Errors
///
/// Forwards any [`RpcError`] returned by [`ClnRpc::call_raw`].
pub async fn call_typed<R>(&mut self, request: &R) -> Result<R::Response, RpcError>
where
    R: TypedRequest + Serialize + std::fmt::Debug,
    R::Response: DeserializeOwned + std::fmt::Debug,
{
    // The type parameters of `call_raw` are inferred from the return type
    // and from `request`.
    self.call_raw(request.method(), request).await
}
}
/// Returns `true` when `f` is `None` or holds an empty vector.
///
/// No bound is placed on `T`, since only the vector's length is inspected.
fn is_none_or_empty<T>(f: &Option<Vec<T>>) -> bool {
    match f {
        Some(values) => values.is_empty(),
        None => true,
    }
}