1use crate::codec::JsonCodec;
79pub use anyhow::Error;
80use anyhow::Result;
81use core::fmt::Debug;
82use futures_util::sink::SinkExt;
83use futures_util::StreamExt;
84use log::{debug, trace};
85use serde::{de::DeserializeOwned, Serialize};
86use std::path::Path;
87use std::sync::atomic::AtomicUsize;
88use std::sync::atomic::Ordering;
89use tokio::net::unix::{OwnedReadHalf, OwnedWriteHalf};
90use tokio::net::UnixStream;
91use tokio_util::codec::{FramedRead, FramedWrite};
92
93pub mod codec;
94pub mod jsonrpc;
95pub mod model;
96pub mod notifications;
97pub mod primitives;
98
99pub use crate::model::TypedRequest;
100pub use crate::{
101 model::{Request, Response},
102 notifications::Notification,
103 primitives::RpcError,
104};
105
106pub struct ClnRpc {
111 next_id: AtomicUsize,
112
113 #[allow(dead_code)]
114 read: FramedRead<OwnedReadHalf, JsonCodec>,
115 write: FramedWrite<OwnedWriteHalf, JsonCodec>,
116}
117
118impl ClnRpc {
119 pub async fn new<P>(path: P) -> Result<ClnRpc>
120 where
121 P: AsRef<Path>,
122 {
123 debug!(
124 "Connecting to socket at {}",
125 path.as_ref().to_string_lossy()
126 );
127 ClnRpc::from_stream(UnixStream::connect(path).await?)
128 }
129
130 fn from_stream(stream: UnixStream) -> Result<ClnRpc> {
131 let (read, write) = stream.into_split();
132
133 Ok(ClnRpc {
134 next_id: AtomicUsize::new(1),
135 read: FramedRead::new(read, JsonCodec::default()),
136 write: FramedWrite::new(write, JsonCodec::default()),
137 })
138 }
139
140 pub async fn call_raw<R, P>(&mut self, method: &str, params: &P) -> Result<R, RpcError>
168 where
169 P: Serialize + Debug,
170 R: DeserializeOwned + Debug,
171 {
172 trace!("Sending request {} with params {:?}", method, ¶ms);
173 let id = self.next_id.fetch_add(1, Ordering::SeqCst);
174
175 let req = serde_json::json!({
178 "jsonrpc" : "2.0",
179 "id" : id,
180 "method" : method,
181 "params" : params,
182 });
183
184 let response: serde_json::Value = self.call_raw_request(req).await?;
185
186 serde_json::from_value(response).map_err(|e| RpcError {
187 code: None,
188 message: format!("Failed to parse response {:?}", e),
189 data: None,
190 })
191 }
192
193 async fn call_raw_request(
221 &mut self,
222 request: serde_json::Value,
223 ) -> Result<serde_json::Value, RpcError>
224where {
225 trace!("Sending request {:?}", request);
226 self.write.send(request).await.map_err(|e| RpcError {
227 code: None,
228 message: format!("Error passing request to lightningd: {}", e),
229 data: None,
230 })?;
231
232 let mut response: serde_json::Value = self
233 .read
234 .next()
235 .await
236 .ok_or_else(|| RpcError {
237 code: None,
238 message: "no response from lightningd".to_string(),
239 data: None,
240 })?
241 .map_err(|_| RpcError {
242 code: None,
243 message: "reading response from socket".to_string(),
244 data: None,
245 })?;
246
247 match response.get("result") {
248 Some(_) => Ok(response["result"].take()),
249 None => {
250 let _ = response.get("error").ok_or(
251 RpcError {
252 code : None,
253 message : "Invalid response from lightningd. Neither `result` or `error` field is present".to_string(),
254 data : None
255 })?;
256 let rpc_error: RpcError = serde_json::from_value(response["error"].take())
257 .map_err(|e| RpcError {
258 code: None,
259 message: format!(
260 "Invalid response from lightningd. Failed to parse `error`. {:?}",
261 e
262 ),
263 data: None,
264 })?;
265 Err(rpc_error)
266 }
267 }
268 }
269
270 pub async fn call(&mut self, req: Request) -> Result<Response, RpcError> {
271 self.call_enum(req).await
272 }
273
274 pub async fn call_enum(&mut self, req: Request) -> Result<Response, RpcError> {
276 trace!("call : Serialize and deserialize request {:?}", req);
277 let mut ser = serde_json::to_value(&req).unwrap();
279 let method: String = if let serde_json::Value::String(method) = ser["method"].take() {
280 method
281 } else {
282 panic!("Method should be string")
283 };
284 let params: serde_json::Value = ser["params"].take();
285
286 let response: serde_json::Value = self.call_raw(&method, ¶ms).await?;
287 let response = serde_json::json!({
288 "method" : method,
289 "result" : response
290 });
291
292 serde_json::from_value(response).map_err(|e| RpcError {
295 code: None,
296 message: format!("Failed to deserialize response : {}", e),
297 data: None,
298 })
299 }
300
301 pub async fn call_typed<R>(&mut self, request: &R) -> Result<R::Response, RpcError>
315 where
316 R: TypedRequest + Serialize + std::fmt::Debug,
317 R::Response: DeserializeOwned + std::fmt::Debug,
318 {
319 let method = request.method();
320 self.call_raw::<R::Response, R>(method, request).await
321 }
322}
323
/// Returns `true` when `f` is `None` or holds an empty vector.
///
/// No bound is placed on `T`: only the length of the vector is inspected.
fn is_none_or_empty<T>(f: &Option<Vec<T>>) -> bool {
    f.as_ref().map_or(true, |value| value.is_empty())
}
331
/// Tests driving [`ClnRpc`] against the other end of a unix socket pair, plus
/// (de)serialization checks for a few notifications.
#[cfg(test)]
mod test {
    use self::notifications::{BlockAddedNotification, CustomMsgNotification};

    use super::*;
    use crate::model::*;
    use crate::primitives::PublicKey;
    use futures_util::StreamExt;
    use serde_json::json;
    use std::str::FromStr;
    use tokio_util::codec::{Framed, FramedRead};

    /// Node id shared by the ping and custommsg fixtures.
    const NODE_ID: &str = "0364aeb75519be29d1af7b8cc6232dbda9fdabb79b66e4e1f6a223750954db210b";

    /// Builds a ping request addressed to [`NODE_ID`] with no optional fields.
    fn ping_request() -> requests::PingRequest {
        requests::PingRequest {
            id: PublicKey::from_str(NODE_ID).unwrap(),
            len: None,
            pongbytes: None,
        }
    }

    /// A raw request is written unchanged and the `result` of the reply is returned.
    #[tokio::test]
    async fn call_raw_request() {
        let (uds1, uds2) = UnixStream::pair().unwrap();
        let mut cln = ClnRpc::from_stream(uds1).unwrap();
        let mut frame = Framed::new(uds2, JsonCodec::default());

        let rpc_request = serde_json::json!({
            "id" : 1,
            "jsonrpc" : "2.0",
            "params" : {},
            "method" : "some_method"
        });
        let rpc_request2 = rpc_request.clone();

        let rpc_response = serde_json::json!({
            "jsonrpc" : "2.0",
            "id" : "1",
            "result" : {"field_6" : 6}
        });

        // The client runs in its own task while this test plays the server.
        let handle = tokio::task::spawn(async move { cln.call_raw_request(rpc_request2).await });

        let read_req = frame.next().await.unwrap().unwrap();
        assert_eq!(&rpc_request, &read_req);
        frame.send(rpc_response).await.unwrap();

        let actual_response: Result<serde_json::Value, RpcError> = handle.await.unwrap();
        let actual_response = actual_response.unwrap();
        assert_eq!(actual_response, json!({"field_6" : 6}));
    }

    /// `call_raw` wraps method and params in a JSON-RPC 2.0 envelope with id 1.
    #[tokio::test]
    async fn call_raw() {
        let req = serde_json::json!({});
        let (uds1, uds2) = UnixStream::pair().unwrap();
        let mut cln = ClnRpc::from_stream(uds1).unwrap();

        let mut read = FramedRead::new(uds2, JsonCodec::default());
        // Only the outgoing request is inspected; no reply is ever sent.
        tokio::task::spawn(async move {
            let _: serde_json::Value = cln.call_raw("getinfo", &req).await.unwrap();
        });

        let read_req = read.next().await.unwrap().unwrap();

        assert_eq!(
            json!({"id": 1, "method": "getinfo", "params": {}, "jsonrpc": "2.0"}),
            read_req
        );
    }

    /// An `error` object in the reply surfaces as an `RpcError` from `call`.
    #[tokio::test]
    async fn test_call_enum_remote_error() {
        let (uds1, uds2) = UnixStream::pair().unwrap();
        let mut cln = ClnRpc::from_stream(uds1).unwrap();
        let mut frame = Framed::new(uds2, JsonCodec::default());

        let req = Request::Ping(ping_request());

        let mock_resp = json!({
            "id" : 1,
            "jsonrpc" : "2.0",
            "error" : {
                "code" : 666,
                "message" : "MOCK_ERROR"
            }
        });

        let handle = tokio::task::spawn(async move { cln.call(req).await });

        // Consume the request before answering it.
        let _ = frame.next().await.unwrap().unwrap();
        frame.send(mock_resp).await.unwrap();

        let rpc_response: Result<_, RpcError> = handle.await.unwrap();
        let rpc_error: RpcError = rpc_response.unwrap_err();

        println!("RPC_ERROR : {:?}", rpc_error);
        assert_eq!(rpc_error.code.unwrap(), 666);
        assert_eq!(rpc_error.message, "MOCK_ERROR");
    }

    /// `call` sends the ping request and maps the reply to `Response::Ping`.
    #[tokio::test]
    async fn test_call_enum() {
        let (uds1, uds2) = UnixStream::pair().unwrap();
        let mut cln = ClnRpc::from_stream(uds1).unwrap();
        let mut frame = Framed::new(uds2, JsonCodec::default());

        let req = Request::Ping(ping_request());
        let mock_resp = json!({
            "id" : 1,
            "jsonrpc" : "2.0",
            "result" : { "totlen" : 123 }
        });

        let handle = tokio::task::spawn(async move { cln.call(req).await });

        let read_req = frame.next().await.unwrap().unwrap();
        assert_eq!(
            read_req,
            json!({"id" : 1, "jsonrpc" : "2.0", "method" : "ping", "params" : {"id" : NODE_ID}})
        );
        frame.send(mock_resp).await.unwrap();

        let rpc_response: Result<_, RpcError> = handle.await.unwrap();
        match rpc_response.unwrap() {
            Response::Ping(ping) => {
                assert_eq!(ping.totlen, 123);
            }
            _ => panic!("A Request::Ping should return Response::Ping"),
        }
    }

    /// `call_typed` returns the response type tied to the request type.
    #[tokio::test]
    async fn test_call_typed() {
        let (uds1, uds2) = UnixStream::pair().unwrap();
        let mut cln = ClnRpc::from_stream(uds1).unwrap();
        let mut frame = Framed::new(uds2, JsonCodec::default());

        let req = ping_request();
        let mock_resp = json!({
            "id" : 1,
            "jsonrpc" : "2.0",
            "result" : { "totlen" : 123 }
        });

        let handle = tokio::task::spawn(async move { cln.call_typed(&req).await });

        // Consume the request before answering it.
        let _ = frame.next().await.unwrap().unwrap();
        frame.send(mock_resp).await.unwrap();

        let rpc_response: Result<_, RpcError> = handle.await.unwrap();
        let ping_response = rpc_response.unwrap();
        assert_eq!(ping_response.totlen, 123);
    }

    /// An `error` object in the reply surfaces as an `RpcError` from `call_typed`.
    #[tokio::test]
    async fn test_call_typed_remote_error() {
        let req = requests::GetinfoRequest {};

        let response = json!({
            "id" : 1,
            "jsonrpc" : "2.0",
            "error" : {
                "code" : 666,
                "message" : "MOCK_ERROR",
            }
        });

        let (uds1, uds2) = UnixStream::pair().unwrap();
        let mut cln = ClnRpc::from_stream(uds1).unwrap();

        let mut frame = Framed::new(uds2, JsonCodec::default());

        let handle = tokio::task::spawn(async move { cln.call_typed(&req).await });

        // Consume the request before answering it.
        let _ = frame.next().await.unwrap().unwrap();
        frame.send(response).await.unwrap();

        let rpc_response = handle.await.unwrap();
        let rpc_error = rpc_response.expect_err("Must be an RPC-error response");

        assert_eq!(rpc_error.code.unwrap(), 666);
        assert_eq!(rpc_error.message, "MOCK_ERROR");
    }

    /// A custommsg notification serializes under the `custommsg` key.
    #[test]
    fn serialize_custom_msg_notification() {
        let msg = CustomMsgNotification {
            peer_id: PublicKey::from_str(NODE_ID).unwrap(),
            payload: String::from("941746573749"),
        };

        let notification = Notification::CustomMsg(msg);

        assert_eq!(
            serde_json::to_value(notification).unwrap(),
            serde_json::json!({
                "custommsg" : {
                    "peer_id" : NODE_ID,
                    "payload" : "941746573749"
                }
            })
        );
    }

    /// A block-added notification serializes under the `block_added` key.
    #[test]
    fn serialize_block_added_notification() {
        let block_added = BlockAddedNotification {
            hash: crate::primitives::Sha256::from_str(
                "000000000000000000000acab8abe0c67a52ed7e5a90a19c64930ff11fa84eca",
            )
            .unwrap(),
            height: 830702,
        };

        let notification = Notification::BlockAdded(block_added);

        assert_eq!(
            serde_json::to_value(notification).unwrap(),
            serde_json::json!({
                "block_added" : {
                    "hash" : "000000000000000000000acab8abe0c67a52ed7e5a90a19c64930ff11fa84eca",
                    "height" : 830702
                }
            })
        )
    }

    /// A `connect` notification payload deserializes into a `Notification`.
    #[test]
    fn deserialize_connect_notification() {
        let connect_json = serde_json::json!({
            "connect" : {
                "address" : {
                    "address" : "127.0.0.1",
                    "port" : 38012,
                    "type" : "ipv4"
                },
                "direction" : "in",
                "id" : "022d223620a359a47ff7f7ac447c85c46c923da53389221a0054c11c1e3ca31d59"
            }
        });

        let _: Notification = serde_json::from_value(connect_json).unwrap();
    }
}