gl_client/lsps/
client.rs

1//
2// This is an implementation of the LSPS1 specification (https://github.com/BitcoinAndLightningLayerSpecs/lsp/blob/main/OpenAPI/LSPS1.json)
3//
4// This specification describes how a lightning-node should communicate with
5// a Lightning Service Provider.
6//
7// The key-idea is that all communication occurs using JSON-rpc and BOLT8-messages
8// are used as the transport layer. All messages related to LSPS will start
9// with the LSPS_MESSAGE_ID.
10//
11use super::json_rpc::{generate_random_rpc_id, JsonRpcResponse};
12use super::json_rpc_erased::JsonRpcMethodUnerased;
13use crate::lsps::error::LspsError;
14use crate::node::{Client, ClnClient};
15use crate::pb::{Custommsg, StreamCustommsgRequest};
16use crate::util::is_feature_bit_enabled;
17use cln_grpc::pb::{ListnodesRequest, SendcustommsgRequest};
18use std::io::{Cursor, Read, Write};
19
20// BOLT8 message ID 37913
21const LSPS_MESSAGE_ID: [u8; 2] = [0x94, 0x19];
22const TIMEOUT_MILLIS: u128 = 30_000;
23const BITNUM_LSP_FEATURE: usize = 729;
24
25pub struct LspClient {
26    client: Client,        // Used for receiving custom messages
27    cln_client: ClnClient, // USed for sending custom message
28}
29
30impl LspClient {
31    pub fn new(client: Client, cln_client: ClnClient) -> Self {
32        Self {
33						client,
34            cln_client,
35        }
36    }
37
38    /// Create a JSON-rpc request to a LSPS
39    ///
40    /// # Arguments:
41    /// - peer_id: the node_id of the lsps
42    /// - method: the request method as defined in the LSPS
43    /// - param: the request parameter
44    ///
45    pub async fn request<'a, I, O, E>(
46        &mut self,
47        peer_id: &[u8],
48        method: &impl JsonRpcMethodUnerased<'a, I, O, E>,
49        param: I,
50    ) -> Result<JsonRpcResponse<O, E>, LspsError>
51    where
52        I: serde::Serialize,
53        E: serde::de::DeserializeOwned,
54        O: serde::de::DeserializeOwned,
55    {
56        let json_rpc_id = generate_random_rpc_id();
57        self
58            .request_with_json_rpc_id(peer_id, method, param, json_rpc_id)
59            .await
60    }
61
62    /// Makes the jsonrpc request and returns the response
63    ///
64    /// This method allows the user to specify a custom json_rpc_id.
65    /// To ensure compliance with LSPS0 the use of `[request`] is recommended.
66    /// For some testing scenario's it is useful to have a reproducable json_rpc_id
67    pub async fn request_with_json_rpc_id<'a, I, O, E>(
68        &mut self,
69        peer_id: &[u8],
70        method: &impl JsonRpcMethodUnerased<'a, I, O, E>,
71        param: I,
72        json_rpc_id: String,
73    ) -> Result<JsonRpcResponse<O, E>, LspsError>
74    where
75        I: serde::Serialize,
76        E: serde::de::DeserializeOwned,
77        O: serde::de::DeserializeOwned,
78    {
79        // Constructs the JsonRpcRequest
80        let request = method
81            .create_request(param, json_rpc_id)
82            .map_err(LspsError::JsonParseRequestError)?;
83
84        // Core-lightning uses the convention that the first two bytes are the BOLT-8 message id
85        let mut cursor: Cursor<Vec<u8>> = std::io::Cursor::new(Vec::new());
86        cursor.write_all(&LSPS_MESSAGE_ID)?;
87        serde_json::to_writer(&mut cursor, &request)
88            .map_err(LspsError::JsonParseRequestError)?;
89
90        let custom_message_request = SendcustommsgRequest {
91            node_id: peer_id.to_vec(),
92            msg: cursor.into_inner(),
93        };
94
95        // Here we start listening to the responses.
96        // It is important that we do this before we send the first message
97        // to prevent high-latency clients from missing the response
98        //
99        // The await ensures that we are ready to get incoming messages
100        let mut stream: tonic::Streaming<Custommsg> = self
101            .client
102            .stream_custommsg(StreamCustommsgRequest {})
103            .await?
104            .into_inner();
105
106        // Sends the JsonRpcRequest
107        // Once the await has completed our greenlight node has successfully send the message
108        // The LSPS did probably not respond yet
109        self.cln_client
110            .send_custom_msg(custom_message_request)
111            .await?;
112
113        // We read all incoming messages one-by-one
114        // If the peer_id, LSPS_MESSAGE_ID and the id used in json_rpc matches we return the result
115        let loop_start_instant = std::time::Instant::now();
116        while let Some(mut msg) = stream.message().await? {
117            // Skip if peer_id doesn't match
118            if msg.peer_id != peer_id {
119                continue;
120            }
121
122            // Skip if LSPS_MESSAGE_ID (first 2 bytes) doesn't match
123            let mut msg_cursor: Cursor<&mut [u8]> = std::io::Cursor::new(msg.payload.as_mut());
124            let mut msg_bolt8_id: [u8; 2] = [0, 0];
125            msg_cursor.read_exact(&mut msg_bolt8_id)?;
126
127            if msg_bolt8_id != LSPS_MESSAGE_ID {
128                continue;
129            }
130
131            // Deserialize the JSON compare the json_rpc_id
132            // If it matches we return a typed JsonRpcRequest
133            let value: serde_json::Value = serde_json::from_reader(&mut msg_cursor)
134                .map_err(LspsError::JsonParseResponseError)?;
135            if value.get("id").and_then(|x| x.as_str()) == Some(&request.id) {
136                // There is a bug here. We need to do the parsing in the underlying trait
137                let rpc_response = method
138                    .parse_json_response_value(value)
139                    .map_err(LspsError::JsonParseResponseError)?;
140                return Ok(rpc_response);
141            }
142
143            // Check if the connection timed-out
144            // TODO:
145            // This is implementation is somewhat flawed.
146            // If no message comes in it will wait forever.
147            //
148            // I might have to add a built-in time-out mechanism in StreamCustomMsg or come up with
149            // a better solution.
150            if loop_start_instant.elapsed().as_millis() >= TIMEOUT_MILLIS {
151                return Err(LspsError::Timeout);
152            }
153        }
154
155        // If the stream was closed
156        Err(LspsError::ConnectionClosed)
157    }
158
159    pub async fn list_lsp_servers(&mut self) -> Result<Vec<Vec<u8>>, LspsError> {
160        let request = ListnodesRequest { id: None };
161
162        // Query all known lightning-nodes
163        let response = self.cln_client.list_nodes(request).await?;
164
165        return Ok(response
166            .into_inner()
167            .nodes
168            .into_iter()
169            .filter(|x| is_feature_bit_enabled(x.features(), BITNUM_LSP_FEATURE))
170            .map(|n| n.nodeid)
171            .collect());
172    }
173}