1use super::json_rpc::{generate_random_rpc_id, JsonRpcResponse};
12use super::json_rpc_erased::JsonRpcMethodUnerased;
13use crate::lsps::error::LspsError;
14use crate::node::{Client, ClnClient};
15use crate::pb::{Custommsg, StreamCustommsgRequest};
16use crate::util::is_feature_bit_enabled;
17use cln_grpc::pb::{ListnodesRequest, SendcustommsgRequest};
18use std::io::{Cursor, Read, Write};
19
/// Two-byte tag placed at the front of every outgoing custom message and
/// required at the front of every incoming one. It is the big-endian
/// encoding of `0x9419`.
const LSPS_MESSAGE_ID: [u8; 2] = 0x9419_u16.to_be_bytes();
/// How long a request may wait for its response, in milliseconds.
/// Typed as `u128` because it is compared against `Duration::as_millis()`.
const TIMEOUT_MILLIS: u128 = 30 * 1_000;
/// Index of the node feature bit that `list_lsp_servers` filters on.
const BITNUM_LSP_FEATURE: usize = 729;
24
25pub struct LspClient {
26 client: Client, cln_client: ClnClient, }
29
30impl LspClient {
31 pub fn new(client: Client, cln_client: ClnClient) -> Self {
32 Self {
33 client,
34 cln_client,
35 }
36 }
37
38 pub async fn request<'a, I, O, E>(
46 &mut self,
47 peer_id: &[u8],
48 method: &impl JsonRpcMethodUnerased<'a, I, O, E>,
49 param: I,
50 ) -> Result<JsonRpcResponse<O, E>, LspsError>
51 where
52 I: serde::Serialize,
53 E: serde::de::DeserializeOwned,
54 O: serde::de::DeserializeOwned,
55 {
56 let json_rpc_id = generate_random_rpc_id();
57 self
58 .request_with_json_rpc_id(peer_id, method, param, json_rpc_id)
59 .await
60 }
61
62 pub async fn request_with_json_rpc_id<'a, I, O, E>(
68 &mut self,
69 peer_id: &[u8],
70 method: &impl JsonRpcMethodUnerased<'a, I, O, E>,
71 param: I,
72 json_rpc_id: String,
73 ) -> Result<JsonRpcResponse<O, E>, LspsError>
74 where
75 I: serde::Serialize,
76 E: serde::de::DeserializeOwned,
77 O: serde::de::DeserializeOwned,
78 {
79 let request = method
81 .create_request(param, json_rpc_id)
82 .map_err(LspsError::JsonParseRequestError)?;
83
84 let mut cursor: Cursor<Vec<u8>> = std::io::Cursor::new(Vec::new());
86 cursor.write_all(&LSPS_MESSAGE_ID)?;
87 serde_json::to_writer(&mut cursor, &request)
88 .map_err(LspsError::JsonParseRequestError)?;
89
90 let custom_message_request = SendcustommsgRequest {
91 node_id: peer_id.to_vec(),
92 msg: cursor.into_inner(),
93 };
94
95 let mut stream: tonic::Streaming<Custommsg> = self
101 .client
102 .stream_custommsg(StreamCustommsgRequest {})
103 .await?
104 .into_inner();
105
106 self.cln_client
110 .send_custom_msg(custom_message_request)
111 .await?;
112
113 let loop_start_instant = std::time::Instant::now();
116 while let Some(mut msg) = stream.message().await? {
117 if msg.peer_id != peer_id {
119 continue;
120 }
121
122 let mut msg_cursor: Cursor<&mut [u8]> = std::io::Cursor::new(msg.payload.as_mut());
124 let mut msg_bolt8_id: [u8; 2] = [0, 0];
125 msg_cursor.read_exact(&mut msg_bolt8_id)?;
126
127 if msg_bolt8_id != LSPS_MESSAGE_ID {
128 continue;
129 }
130
131 let value: serde_json::Value = serde_json::from_reader(&mut msg_cursor)
134 .map_err(LspsError::JsonParseResponseError)?;
135 if value.get("id").and_then(|x| x.as_str()) == Some(&request.id) {
136 let rpc_response = method
138 .parse_json_response_value(value)
139 .map_err(LspsError::JsonParseResponseError)?;
140 return Ok(rpc_response);
141 }
142
143 if loop_start_instant.elapsed().as_millis() >= TIMEOUT_MILLIS {
151 return Err(LspsError::Timeout);
152 }
153 }
154
155 Err(LspsError::ConnectionClosed)
157 }
158
159 pub async fn list_lsp_servers(&mut self) -> Result<Vec<Vec<u8>>, LspsError> {
160 let request = ListnodesRequest { id: None };
161
162 let response = self.cln_client.list_nodes(request).await?;
164
165 return Ok(response
166 .into_inner()
167 .nodes
168 .into_iter()
169 .filter(|x| is_feature_bit_enabled(x.features(), BITNUM_LSP_FEATURE))
170 .map(|n| n.nodeid)
171 .collect());
172 }
173}