1use async_trait::async_trait;
2use bitcoin::secp256k1::PublicKey;
3use bitcoin::Network;
4use cln_grpc::pb::{
5 listpays_pays::ListpaysPaysStatus, node_client::NodeClient, Amount, GetinfoRequest,
6 KeysendRequest, KeysendResponse, ListchannelsRequest, ListnodesRequest, ListpaysRequest,
7 ListpaysResponse,
8};
9use lightning::ln::features::NodeFeatures;
10use lightning::ln::PaymentHash;
11
12use serde::{Deserialize, Serialize};
13use tokio::fs::File;
14use tokio::io::{AsyncReadExt, Error};
15use tokio::time::{self, Duration};
16use tonic::transport::{Certificate, Channel, ClientTlsConfig, Identity};
17use triggered::Listener;
18
19use crate::{
20 serializers, LightningError, LightningNode, NodeId, NodeInfo, PaymentOutcome, PaymentResult,
21};
22
23#[derive(Serialize, Deserialize, Debug, Clone)]
24pub struct ClnConnection {
25 #[serde(with = "serializers::serde_node_id")]
26 pub id: NodeId,
27 pub address: String,
28 #[serde(deserialize_with = "serializers::deserialize_path")]
29 pub ca_cert: String,
30 #[serde(deserialize_with = "serializers::deserialize_path")]
31 pub client_cert: String,
32 #[serde(deserialize_with = "serializers::deserialize_path")]
33 pub client_key: String,
34}
35
36pub struct ClnNode {
37 pub client: NodeClient<Channel>,
38 info: NodeInfo,
39}
40
41impl ClnNode {
42 pub async fn new(connection: ClnConnection) -> Result<Self, LightningError> {
43 let tls = ClientTlsConfig::new()
44 .domain_name("cln")
45 .identity(Identity::from_pem(
46 reader(&connection.client_cert).await.map_err(|_| {
47 LightningError::ConnectionError("Cannot loads client certificate".to_string())
48 })?,
49 reader(&connection.client_key).await.map_err(|_| {
50 LightningError::ConnectionError("Cannot loads client key".to_string())
51 })?,
52 ))
53 .ca_certificate(Certificate::from_pem(
54 reader(&connection.ca_cert).await.map_err(|_| {
55 LightningError::ConnectionError("Cannot loads CA certificate".to_string())
56 })?,
57 ));
58
59 let mut client = NodeClient::new(
60 Channel::from_shared(connection.address.to_string())
61 .map_err(|err| LightningError::ConnectionError(err.to_string()))?
62 .tls_config(tls)
63 .map_err(|_| {
64 LightningError::ConnectionError("Cannot establish tls connection".to_string())
65 })?
66 .connect()
67 .await
68 .map_err(|_| {
69 LightningError::ConnectionError("Cannot connect to gRPC server".to_string())
70 })?,
71 );
72
73 let (id, mut alias, our_features) = client
74 .getinfo(GetinfoRequest {})
75 .await
76 .map(|r| {
77 let inner = r.into_inner();
78 (
79 inner.id,
80 inner.alias.unwrap_or_default(),
81 inner.our_features,
82 )
83 })
84 .map_err(|err| LightningError::GetInfoError(err.to_string()))?;
85
86 let pubkey = PublicKey::from_slice(&id)
87 .map_err(|err| LightningError::GetInfoError(err.to_string()))?;
88 connection.id.validate(&pubkey, &mut alias)?;
89
90 let features = if let Some(features) = our_features {
91 NodeFeatures::from_le_bytes(features.node)
92 } else {
93 NodeFeatures::empty()
94 };
95
96 Ok(Self {
97 client,
98 info: NodeInfo {
99 pubkey,
100 features,
101 alias,
102 },
103 })
104 }
105
106 async fn node_channels(&mut self, is_source: bool) -> Result<Vec<u64>, LightningError> {
110 let req = if is_source {
111 ListchannelsRequest {
112 source: Some(self.info.pubkey.serialize().to_vec()),
113 ..Default::default()
114 }
115 } else {
116 ListchannelsRequest {
117 destination: Some(self.info.pubkey.serialize().to_vec()),
118 ..Default::default()
119 }
120 };
121
122 let resp = self
123 .client
124 .list_channels(req)
125 .await
126 .map_err(|err| LightningError::ListChannelsError(err.to_string()))?
127 .into_inner();
128
129 Ok(resp
130 .channels
131 .into_iter()
132 .map(|channel| channel.amount_msat.unwrap_or_default().msat)
133 .collect())
134 }
135}
136
137#[async_trait]
138impl LightningNode for ClnNode {
139 fn get_info(&self) -> &NodeInfo {
140 &self.info
141 }
142
143 async fn get_network(&mut self) -> Result<Network, LightningError> {
144 let info = self
145 .client
146 .getinfo(GetinfoRequest {})
147 .await
148 .map_err(|err| LightningError::GetInfoError(err.to_string()))?
149 .into_inner();
150
151 Ok(Network::from_core_arg(&info.network)
152 .map_err(|err| LightningError::ValidationError(err.to_string()))?)
153 }
154
155 async fn send_payment(
156 &mut self,
157 dest: PublicKey,
158 amount_msat: u64,
159 ) -> Result<PaymentHash, LightningError> {
160 let KeysendResponse { payment_hash, .. } = self
161 .client
162 .key_send(KeysendRequest {
163 destination: dest.serialize().to_vec(),
164 amount_msat: Some(Amount { msat: amount_msat }),
165 ..Default::default()
166 })
167 .await
168 .map_err(|s| {
169 let message = s.message();
170 if message.contains("Some(-1") | message.contains("Some(203") {
173 LightningError::PermanentError(format!("{:?}", message))
175 } else {
176 LightningError::SendPaymentError(format!("{:?}", message))
178 }
179 })?
180 .into_inner();
181 let slice: [u8; 32] = payment_hash
182 .as_slice()
183 .try_into()
184 .map_err(|_| LightningError::InvalidPaymentHash)?;
185
186 Ok(PaymentHash(slice))
187 }
188
189 async fn track_payment(
190 &mut self,
191 hash: &PaymentHash,
192 shutdown: Listener,
193 ) -> Result<PaymentResult, LightningError> {
194 loop {
195 tokio::select! {
196 biased;
197 _ = shutdown.clone() => {
198 return Err(LightningError::TrackPaymentError("Shutdown before tracking results".to_string()));
199 },
200 _ = time::sleep(Duration::from_millis(500)) => {
201 let ListpaysResponse { pays } = self
202 .client
203 .list_pays(ListpaysRequest {
204 payment_hash: Some(hash.0.to_vec()),
205 ..Default::default()
206 })
207 .await
208 .map_err(|err| LightningError::TrackPaymentError(err.to_string()))?
209 .into_inner();
210
211 if let Some(pay) = pays.first() {
212 let payment_status = ListpaysPaysStatus::from_i32(pay.status)
213 .ok_or(LightningError::TrackPaymentError("Invalid payment status".to_string()))?;
214
215 let payment_outcome = match payment_status {
216 ListpaysPaysStatus::Pending => continue,
217 ListpaysPaysStatus::Complete => PaymentOutcome::Success,
218 ListpaysPaysStatus::Failed => PaymentOutcome::UnexpectedError,
220 };
221 let htlc_count = pay.number_of_parts.unwrap_or(1).try_into().map_err(|_| LightningError::TrackPaymentError("Invalid number of parts".to_string()))?;
222 return Ok(PaymentResult {
223 htlc_count,
224 payment_outcome,
225 });
226 }
227 },
228 }
229 }
230 }
231
232 async fn get_node_info(&mut self, node_id: &PublicKey) -> Result<NodeInfo, LightningError> {
233 let mut nodes: Vec<cln_grpc::pb::ListnodesNodes> = self
234 .client
235 .list_nodes(ListnodesRequest {
236 id: Some(node_id.serialize().to_vec()),
237 })
238 .await
239 .map_err(|err| LightningError::GetNodeInfoError(err.to_string()))?
240 .into_inner()
241 .nodes;
242
243 if let Some(node) = nodes.pop() {
245 Ok(NodeInfo {
246 pubkey: *node_id,
247 alias: node.alias.unwrap_or(String::new()),
248 features: node
249 .features
250 .clone()
251 .map_or(NodeFeatures::empty(), |mut f| {
252 f.reverse();
254 NodeFeatures::from_le_bytes(f)
255 }),
256 })
257 } else {
258 Err(LightningError::GetNodeInfoError(
259 "Node not found".to_string(),
260 ))
261 }
262 }
263
264 async fn list_channels(&mut self) -> Result<Vec<u64>, LightningError> {
265 let mut node_channels = self.node_channels(true).await?;
266 node_channels.extend(self.node_channels(false).await?);
267 Ok(node_channels)
268 }
269}
270
271async fn reader(filename: &str) -> Result<Vec<u8>, Error> {
272 let mut file = File::open(filename).await?;
273 let mut contents = vec![];
274 file.read_to_end(&mut contents).await?;
275 Ok(contents)
276}