cln_rpc/
lib.rs

1//! # A Core Lightning RPC-client
2//!
3//! Core Lightning exposes a JSON-RPC interface over unix-domain sockets.
//! The unix-domain socket appears like a file and is located by default in
5//! `~/.lightning/<network>/lightning-rpc`.
6//!
7//! This crate contains an RPC-client called [ClnRpc] and models
8//! for most [requests](crate::model::requests) and [responses](crate::model::responses).
9//!
//! The example below shows how to initiate the client and call the `getinfo`-rpc method.
11//!
12//! ```no_run
13//! use std::path::Path;
14//! use tokio_test;
15//! use cln_rpc::{ClnRpc, TypedRequest};
16//! use cln_rpc::model::requests::GetinfoRequest;
17//! use cln_rpc::model::responses::GetinfoResponse;
18//!
19//! tokio_test::block_on( async {
20//!     let path = Path::new("path_to_lightning_dir");
21//!     let mut rpc = ClnRpc::new(path).await.unwrap();
22//!     let request = GetinfoRequest {};
23//!     let response : GetinfoResponse = rpc.call_typed(&request).await.unwrap();
24//! });
25//! ```
26//!
27//! If the required model is not available you can implement [`TypedRequest`]
28//! and use [`ClnRpc::call_typed`] without a problem.
29//!
30//! ```no_run
31//! use std::path::Path;
32//! use tokio_test;
33//! use cln_rpc::{ClnRpc, TypedRequest};
34//! use serde::{Serialize, Deserialize};
35//!
36//! #[derive(Serialize, Debug)]
37//! struct CustomMethodRequest {
38//!     param_a : String
39//! };
40//! #[derive(Deserialize, Debug)]
41//! struct CustomMethodResponse {
42//!     field_a : String
43//! };
44//!
45//! impl TypedRequest for CustomMethodRequest {
46//!     type Response = CustomMethodResponse;
47//!
48//!     fn method(&self) -> &str {
49//!         "custommethod"
50//!     }
51//! }
52//!
53//! tokio_test::block_on( async {
54//!     let path = Path::new("path_to_lightning_dir");
55//!     let mut rpc = ClnRpc::new(path).await.unwrap();
56//!
57//!     let request = CustomMethodRequest { param_a : String::from("example")};
58//!     let response = rpc.call_typed(&request).await.unwrap();
59//! })
60//! ```
61//!
62//! An alternative is to use [`ClnRpc::call_raw`].
63//!
64//! ```no_run
65//! use std::path::Path;
66//! use tokio_test;
67//! use cln_rpc::{ClnRpc, TypedRequest};
68//!
69//! tokio_test::block_on( async {
70//!     let path = Path::new("path_to_lightning_dir");
71//!     let mut rpc = ClnRpc::new(path).await.unwrap();
72//!     let method = "custommethod";
73//!     let request = serde_json::json!({"param_a" : "example"});
74//!     let response : serde_json::Value = rpc.call_raw(method, &request).await.unwrap();
75//! })
76//! ```
77//!
78use crate::codec::JsonCodec;
79pub use anyhow::Error;
80use anyhow::Result;
81use core::fmt::Debug;
82use futures_util::sink::SinkExt;
83use futures_util::StreamExt;
84use log::{debug, trace};
85use serde::{de::DeserializeOwned, Serialize};
86use std::path::Path;
87use std::sync::atomic::AtomicUsize;
88use std::sync::atomic::Ordering;
89use tokio::net::unix::{OwnedReadHalf, OwnedWriteHalf};
90use tokio::net::UnixStream;
91use tokio_util::codec::{FramedRead, FramedWrite};
92
93pub mod codec;
94pub mod jsonrpc;
95pub mod model;
96pub mod notifications;
97pub mod primitives;
98
99pub use crate::model::TypedRequest;
100pub use crate::{
101    model::{Request, Response},
102    notifications::Notification,
103    primitives::RpcError,
104};
105
106/// An RPC-client for Core Lightning
107///
108///
109///
110pub struct ClnRpc {
111    next_id: AtomicUsize,
112
113    #[allow(dead_code)]
114    read: FramedRead<OwnedReadHalf, JsonCodec>,
115    write: FramedWrite<OwnedWriteHalf, JsonCodec>,
116}
117
118impl ClnRpc {
119    pub async fn new<P>(path: P) -> Result<ClnRpc>
120    where
121        P: AsRef<Path>,
122    {
123        debug!(
124            "Connecting to socket at {}",
125            path.as_ref().to_string_lossy()
126        );
127        ClnRpc::from_stream(UnixStream::connect(path).await?)
128    }
129
130    fn from_stream(stream: UnixStream) -> Result<ClnRpc> {
131        let (read, write) = stream.into_split();
132
133        Ok(ClnRpc {
134            next_id: AtomicUsize::new(1),
135            read: FramedRead::new(read, JsonCodec::default()),
136            write: FramedWrite::new(write, JsonCodec::default()),
137        })
138    }
139
140    /// Low-level API to call the rpc.
141    ///
142    /// An interesting choice of `R` and `P` is [`serde_json::Value`] because it allows
143    /// ad-hoc calls to custom RPC-methods
144    ///
145    /// If you are using a model such as [`crate::model::requests::GetinfoRequest`] you'd
146    /// probably want to use [`Self::call_typed`] instead.
147    ///
148    /// Example:
149    /// ```no_run
150    /// use cln_rpc::ClnRpc;
151    /// use cln_rpc::model::{requests::GetinfoRequest, responses::GetinfoResponse, responses::ListfundsResponse};
152    /// use std::path::Path;
153    /// use tokio_test;
154    /// tokio_test::block_on( async {
155    ///
156    ///    // Call using json-values
157    ///    let mut cln = ClnRpc::new(Path::new("./lightningd/rpc")).await.unwrap();
158    ///    let request = serde_json::json!({});
159    ///    let response : serde_json::Value = cln.call_raw("getinfo", &request).await.unwrap();
160    ///
161    ///    // Using a model
162    ///    // Prefer to use call_typed instead
163    ///    let request = GetinfoRequest {};
164    ///    let response : GetinfoResponse = cln.call_raw("getinfo", &request).await.unwrap();
165    /// })
166    /// ```
167    pub async fn call_raw<R, P>(&mut self, method: &str, params: &P) -> Result<R, RpcError>
168    where
169        P: Serialize + Debug,
170        R: DeserializeOwned + Debug,
171    {
172        trace!("Sending request {} with params {:?}", method, &params);
173        let id = self.next_id.fetch_add(1, Ordering::SeqCst);
174
175        // TODO: Can we make this nicer
176        // I don't want to have this json_rpc : 2.0 floating everywhere
177        let req = serde_json::json!({
178            "jsonrpc" : "2.0",
179            "id" : id,
180            "method" : method,
181            "params" : params,
182        });
183
184        let response: serde_json::Value = self.call_raw_request(req).await?;
185
186        serde_json::from_value(response).map_err(|e| RpcError {
187            code: None,
188            message: format!("Failed to parse response {:?}", e),
189            data: None,
190        })
191    }
192
193    /// A low level method to call raw requests
194    ///
195    /// This method is private by intention.
196    /// The caller is (implicitly) providing the `id` of the JsonRpcRequest.
197    /// This is dangerous because the caller might pick a non-unique id.
198    ///
199    /// The request should serialize to a valid JsonRpcMessage.
200    /// If the response is succesful the content of the "result" field is returned
201    /// If the response is an error the content of the "error" field is returned
202    ///
203    /// ```no_run
204    /// use std::path::Path;
205    /// use cln_rpc::ClnRpc;
206    /// use tokio_test;
207    /// tokio_test::block_on( async {
208    ///     let request = serde_json::json!({
209    ///       "id" : 1,
210    ///       "jsonrpc" : "2.0",
211    ///       "method" : "some_method",
212    ///       "params" : {}
213    ///      }
214    ///     );
215    ///     let rpc = ClnRpc::new(Path::new("my_path_to_rpc_file"));
216    ///     // let resp : serde_json::Value = rpc.call_raw_request(request).await.unwrap();
217    /// })
218    /// ```
219    ///
220    async fn call_raw_request(
221        &mut self,
222        request: serde_json::Value,
223    ) -> Result<serde_json::Value, RpcError>
224where {
225        trace!("Sending request {:?}", request);
226        self.write.send(request).await.map_err(|e| RpcError {
227            code: None,
228            message: format!("Error passing request to lightningd: {}", e),
229            data: None,
230        })?;
231
232        let mut response: serde_json::Value = self
233            .read
234            .next()
235            .await
236            .ok_or_else(|| RpcError {
237                code: None,
238                message: "no response from lightningd".to_string(),
239                data: None,
240            })?
241            .map_err(|_| RpcError {
242                code: None,
243                message: "reading response from socket".to_string(),
244                data: None,
245            })?;
246
247        match response.get("result") {
248            Some(_) => Ok(response["result"].take()),
249            None => {
250                let _ = response.get("error").ok_or(
251                    RpcError {
252                        code : None,
253                        message : "Invalid response from lightningd. Neither `result` or `error` field is present".to_string(),
254                        data : None
255                    })?;
256                let rpc_error: RpcError = serde_json::from_value(response["error"].take())
257                    .map_err(|e| RpcError {
258                        code: None,
259                        message: format!(
260                            "Invalid response from lightningd. Failed to parse `error`. {:?}",
261                            e
262                        ),
263                        data: None,
264                    })?;
265                Err(rpc_error)
266            }
267        }
268    }
269
270    pub async fn call(&mut self, req: Request) -> Result<Response, RpcError> {
271        self.call_enum(req).await
272    }
273
274    /// Performs an rpc-call
275    pub async fn call_enum(&mut self, req: Request) -> Result<Response, RpcError> {
276        trace!("call : Serialize and deserialize request {:?}", req);
277        // A little bit hacky. But serialize the request to get the method name
278        let mut ser = serde_json::to_value(&req).unwrap();
279        let method: String = if let serde_json::Value::String(method) = ser["method"].take() {
280            method
281        } else {
282            panic!("Method should be string")
283        };
284        let params: serde_json::Value = ser["params"].take();
285
286        let response: serde_json::Value = self.call_raw(&method, &params).await?;
287        let response = serde_json::json!({
288            "method" : method,
289            "result" : response
290        });
291
292        // Parse the response
293        // We add the `method` here because the Response-enum uses it to determine the type
294        serde_json::from_value(response).map_err(|e| RpcError {
295            code: None,
296            message: format!("Failed to deserialize response : {}", e),
297            data: None,
298        })
299    }
300
301    /// Performs an rpc-call and performs type-checking.
302    ///
303    /// ```no_run
304    /// use cln_rpc::ClnRpc;
305    /// use cln_rpc::model::requests::GetinfoRequest;
306    /// use std::path::Path;
307    /// use tokio_test;
308    /// tokio_test::block_on( async {
309    ///    let mut rpc = ClnRpc::new(Path::new("path_to_rpc")).await.unwrap();
310    ///    let request = GetinfoRequest {};
311    ///    let response = rpc.call_typed(&request);
312    /// })
313    /// ```
314    pub async fn call_typed<R>(&mut self, request: &R) -> Result<R::Response, RpcError>
315    where
316        R: TypedRequest + Serialize + std::fmt::Debug,
317        R::Response: DeserializeOwned + std::fmt::Debug,
318    {
319        let method = request.method();
320        self.call_raw::<R::Response, R>(method, request).await
321    }
322}
323
/// Used to skip optional arrays when serializing requests.
///
/// Returns `true` if `f` is `None` or holds an empty vector, and `false` if it
/// holds at least one element. The elements themselves are never inspected,
/// so no bounds are placed on `T`.
fn is_none_or_empty<T>(f: &Option<Vec<T>>) -> bool {
    match f {
        Some(values) => values.is_empty(),
        None => true,
    }
}
331
332#[cfg(test)]
333mod test {
334    use self::notifications::{BlockAddedNotification, CustomMsgNotification};
335
336    use super::*;
337    use crate::model::*;
338    use crate::primitives::PublicKey;
339    use futures_util::StreamExt;
340    use serde_json::json;
341    use std::str::FromStr;
342    use tokio_util::codec::{Framed, FramedRead};
343
344    #[tokio::test]
345    async fn call_raw_request() {
346        // Set up a pair of unix-streams
347        // The frame is a mock rpc-server
348        let (uds1, uds2) = UnixStream::pair().unwrap();
349        let mut cln = ClnRpc::from_stream(uds1).unwrap();
350        let mut frame = Framed::new(uds2, JsonCodec::default());
351
352        // Define the request and response send in the RPC-message
353        let rpc_request = serde_json::json!({
354            "id" : 1,
355            "jsonrpc" : "2.0",
356            "params" : {},
357            "method" : "some_method"
358        });
359        let rpc_request2 = rpc_request.clone();
360
361        let rpc_response = serde_json::json!({
362            "jsonrpc" : "2.0",
363            "id" : "1",
364            "result" : {"field_6" : 6}
365        });
366
367        // Spawn the task that performs the RPC-call
368        // Check that it reads the response correctly
369        let handle = tokio::task::spawn(async move { cln.call_raw_request(rpc_request2).await });
370
371        // Verify that our emulated server received a request
372        // and sendt the response
373        let read_req = dbg!(frame.next().await.unwrap().unwrap());
374        assert_eq!(&rpc_request, &read_req);
375        frame.send(rpc_response).await.unwrap();
376
377        // Get the result from `call_raw_request` and verify
378        let actual_response: Result<serde_json::Value, RpcError> = handle.await.unwrap();
379        let actual_response = actual_response.unwrap();
380        assert_eq!(actual_response, json!({"field_6" : 6}));
381    }
382
383    #[tokio::test]
384    async fn call_raw() {
385        let req = serde_json::json!({});
386        let (uds1, uds2) = UnixStream::pair().unwrap();
387        let mut cln = ClnRpc::from_stream(uds1).unwrap();
388
389        let mut read = FramedRead::new(uds2, JsonCodec::default());
390        tokio::task::spawn(async move {
391            let _: serde_json::Value = cln.call_raw("getinfo", &req).await.unwrap();
392        });
393
394        let read_req = dbg!(read.next().await.unwrap().unwrap());
395
396        assert_eq!(
397            json!({"id": 1, "method": "getinfo", "params": {}, "jsonrpc": "2.0"}),
398            read_req
399        );
400    }
401
402    #[tokio::test]
403    async fn test_call_enum_remote_error() {
404        // Set up the rpc-connection
405        // The frame represents a Mock rpc-server
406        let (uds1, uds2) = UnixStream::pair().unwrap();
407        let mut cln = ClnRpc::from_stream(uds1).unwrap();
408        let mut frame = Framed::new(uds2, JsonCodec::default());
409
410        // Construct the request and response
411        let req = Request::Ping(requests::PingRequest {
412            id: PublicKey::from_str(
413                "0364aeb75519be29d1af7b8cc6232dbda9fdabb79b66e4e1f6a223750954db210b",
414            )
415            .unwrap(),
416            len: None,
417            pongbytes: None,
418        });
419
420        let mock_resp = json!({
421            "id" : 1,
422            "jsonrpc" : "2.0",
423            "error" : {
424                "code" : 666,
425                "message" : "MOCK_ERROR"
426            }
427        });
428
429        // Spawn the task which calls the rpc
430        let handle = tokio::task::spawn(async move { cln.call(req).await });
431
432        // Ensure the mock receives the request and returns a response
433        let _ = dbg!(frame.next().await.unwrap().unwrap());
434        frame.send(mock_resp).await.unwrap();
435
436        let rpc_response: Result<_, RpcError> = handle.await.unwrap();
437        let rpc_error: RpcError = rpc_response.unwrap_err();
438
439        println!("RPC_ERROR : {:?}", rpc_error);
440        assert_eq!(rpc_error.code.unwrap(), 666);
441        assert_eq!(rpc_error.message, "MOCK_ERROR");
442    }
443
444    #[tokio::test]
445    async fn test_call_enum() {
446        // Set up the rpc-connection
447        // The frame represents a Mock rpc-server
448        let (uds1, uds2) = UnixStream::pair().unwrap();
449        let mut cln = ClnRpc::from_stream(uds1).unwrap();
450        let mut frame = Framed::new(uds2, JsonCodec::default());
451
452        // We'll use the Ping request here because both the request
453        // and response have few arguments
454        let req = Request::Ping(requests::PingRequest {
455            id: PublicKey::from_str(
456                "0364aeb75519be29d1af7b8cc6232dbda9fdabb79b66e4e1f6a223750954db210b",
457            )
458            .unwrap(),
459            len: None,
460            pongbytes: None,
461        });
462        let mock_resp = json!({
463            "id" : 1,
464            "jsonrpc" : "2.0",
465            "result" : { "totlen" : 123 }
466        });
467
468        // we create a task that sends the response and returns the response
469        let handle = tokio::task::spawn(async move { cln.call(req).await });
470
471        // Ensure our mock receives the request and sends the response
472        let read_req = dbg!(frame.next().await.unwrap().unwrap());
473        assert_eq!(
474            read_req,
475            json!({"id" : 1, "jsonrpc" : "2.0", "method" : "ping", "params" : {"id" : "0364aeb75519be29d1af7b8cc6232dbda9fdabb79b66e4e1f6a223750954db210b"}})
476        );
477        frame.send(mock_resp).await.unwrap();
478
479        // Verify that the error response is correct
480        let rpc_response: Result<_, RpcError> = handle.await.unwrap();
481        match rpc_response.unwrap() {
482            Response::Ping(ping) => {
483                assert_eq!(ping.totlen, 123);
484            }
485            _ => panic!("A Request::Getinfo should return Response::Getinfo"),
486        }
487    }
488
489    #[tokio::test]
490    async fn test_call_typed() {
491        // Set up the rpc-connection
492        // The frame represents a Mock rpc-server
493        let (uds1, uds2) = UnixStream::pair().unwrap();
494        let mut cln = ClnRpc::from_stream(uds1).unwrap();
495        let mut frame = Framed::new(uds2, JsonCodec::default());
496
497        // We'll use the Ping request here because both the request
498        // and response have few arguments
499        let req = requests::PingRequest {
500            id: PublicKey::from_str(
501                "0364aeb75519be29d1af7b8cc6232dbda9fdabb79b66e4e1f6a223750954db210b",
502            )
503            .unwrap(),
504            len: None,
505            pongbytes: None,
506        };
507        let mock_resp = json!({
508            "id" : 1,
509            "jsonrpc" : "2.0",
510            "result" : { "totlen" : 123 }
511        });
512
513        // we create a task that sends the response and returns the response
514        let handle = tokio::task::spawn(async move { cln.call_typed(&req).await });
515
516        // Ensure our mock receives the request and sends the response
517        _ = dbg!(frame.next().await.unwrap().unwrap());
518        frame.send(mock_resp).await.unwrap();
519
520        // Verify that the error response is correct
521        let rpc_response: Result<_, RpcError> = handle.await.unwrap();
522        let ping_response = rpc_response.unwrap();
523        assert_eq!(ping_response.totlen, 123);
524    }
525
526    #[tokio::test]
527    async fn test_call_typed_remote_error() {
528        // Create a dummy rpc-request
529        let req = requests::GetinfoRequest {};
530
531        // Create a dummy error response
532        let response = json!({
533        "id" : 1,
534        "jsonrpc" : "2.0",
535        "error" : {
536            "code" : 666,
537            "message" : "MOCK_ERROR",
538        }});
539
540        let (uds1, uds2) = UnixStream::pair().unwrap();
541        let mut cln = ClnRpc::from_stream(uds1).unwrap();
542
543        // Send out the request
544        let mut frame = Framed::new(uds2, JsonCodec::default());
545
546        let handle = tokio::task::spawn(async move { cln.call_typed(&req).await });
547
548        // Dummy-server ensures the request has been received and send the error response
549        let _ = dbg!(frame.next().await.unwrap().unwrap());
550        frame.send(response).await.unwrap();
551
552        let rpc_response = handle.await.unwrap();
553        let rpc_error = rpc_response.expect_err("Must be an RPC-error response");
554
555        assert_eq!(rpc_error.code.unwrap(), 666);
556        assert_eq!(rpc_error.message, "MOCK_ERROR");
557    }
558
559    #[test]
560    fn serialize_custom_msg_notification() {
561        let msg = CustomMsgNotification {
562            peer_id : PublicKey::from_str("0364aeb75519be29d1af7b8cc6232dbda9fdabb79b66e4e1f6a223750954db210b").unwrap(),
563            payload : String::from("941746573749")
564        };
565
566        let notification = Notification::CustomMsg(msg);
567
568        assert_eq!(
569            serde_json::to_value(notification).unwrap(),
570            serde_json::json!(
571                {
572                    "custommsg" : {
573                        "peer_id" : "0364aeb75519be29d1af7b8cc6232dbda9fdabb79b66e4e1f6a223750954db210b",
574                        "payload" : "941746573749"
575                    }
576                }
577            )
578        );
579
580    }
581
582    #[test]
583    fn serialize_block_added_notification() {
584        let block_added = BlockAddedNotification {
585            hash : crate::primitives::Sha256::from_str("000000000000000000000acab8abe0c67a52ed7e5a90a19c64930ff11fa84eca").unwrap(),
586            height : 830702
587        };
588
589        let notification = Notification::BlockAdded(block_added);
590
591        assert_eq!(
592            serde_json::to_value(notification).unwrap(),
593            serde_json::json!({
594                "block_added" : {
595                    "hash" : "000000000000000000000acab8abe0c67a52ed7e5a90a19c64930ff11fa84eca",
596                    "height" : 830702
597                }
598            })
599        )
600    }
601
602    #[test]
603    fn deserialize_connect_notification() {
604        let connect_json = serde_json::json!({
605            "connect" :  {
606                "address" : {
607                    "address" : "127.0.0.1",
608                    "port" : 38012,
609                    "type" : "ipv4"
610                },
611                "direction" : "in",
612                "id" : "022d223620a359a47ff7f7ac447c85c46c923da53389221a0054c11c1e3ca31d59"
613            }
614        });
615
616        let _ : Notification = serde_json::from_value(connect_json).unwrap();
617    }
618}