1#![warn(missing_docs)]
4#![warn(rustdoc::bare_urls)]
5
6use std::path::PathBuf;
7use std::pin::Pin;
8use std::str::FromStr;
9use std::sync::atomic::{AtomicBool, Ordering};
10use std::sync::Arc;
11
12use async_trait::async_trait;
13use cdk::amount::{to_unit, Amount, MSAT_IN_SAT};
14use cdk::cdk_lightning::{
15 self, CreateInvoiceResponse, MintLightning, PayInvoiceResponse, PaymentQuoteResponse, Settings,
16};
17use cdk::mint::FeeReserve;
18use cdk::nuts::{CurrencyUnit, MeltQuoteBolt11Request, MeltQuoteState, MintQuoteState};
19use cdk::util::{hex, unix_time};
20use cdk::{mint, Bolt11Invoice};
21use cln_rpc::model::requests::{
22 InvoiceRequest, ListinvoicesRequest, ListpaysRequest, PayRequest, WaitanyinvoiceRequest,
23};
24use cln_rpc::model::responses::{
25 ListinvoicesInvoices, ListinvoicesInvoicesStatus, ListpaysPaysStatus, PayStatus,
26 WaitanyinvoiceStatus,
27};
28use cln_rpc::primitives::{Amount as CLN_Amount, AmountOrAny};
29use error::Error;
30use futures::{Stream, StreamExt};
31use tokio::sync::Mutex;
32use tokio_util::sync::CancellationToken;
33use uuid::Uuid;
34
35pub mod error;
36
37#[derive(Clone)]
39pub struct Cln {
40 rpc_socket: PathBuf,
41 cln_client: Arc<Mutex<cln_rpc::ClnRpc>>,
42 fee_reserve: FeeReserve,
43 wait_invoice_cancel_token: CancellationToken,
44 wait_invoice_is_active: Arc<AtomicBool>,
45}
46
47impl Cln {
48 pub async fn new(rpc_socket: PathBuf, fee_reserve: FeeReserve) -> Result<Self, Error> {
50 let cln_client = cln_rpc::ClnRpc::new(&rpc_socket).await?;
51
52 Ok(Self {
53 rpc_socket,
54 cln_client: Arc::new(Mutex::new(cln_client)),
55 fee_reserve,
56 wait_invoice_cancel_token: CancellationToken::new(),
57 wait_invoice_is_active: Arc::new(AtomicBool::new(false)),
58 })
59 }
60}
61
62#[async_trait]
63impl MintLightning for Cln {
64 type Err = cdk_lightning::Error;
65
66 fn get_settings(&self) -> Settings {
67 Settings {
68 mpp: true,
69 unit: CurrencyUnit::Msat,
70 invoice_description: true,
71 }
72 }
73
74 fn is_wait_invoice_active(&self) -> bool {
76 self.wait_invoice_is_active.load(Ordering::SeqCst)
77 }
78
79 fn cancel_wait_invoice(&self) {
81 self.wait_invoice_cancel_token.cancel()
82 }
83
84 #[allow(clippy::incompatible_msrv)]
85 async fn wait_any_invoice(
87 &self,
88 ) -> Result<Pin<Box<dyn Stream<Item = String> + Send>>, Self::Err> {
89 let last_pay_index = self.get_last_pay_index().await?;
90 let cln_client = cln_rpc::ClnRpc::new(&self.rpc_socket).await?;
91
92 let stream = futures::stream::unfold(
93 (
94 cln_client,
95 last_pay_index,
96 self.wait_invoice_cancel_token.clone(),
97 Arc::clone(&self.wait_invoice_is_active),
98 ),
99 |(mut cln_client, mut last_pay_idx, cancel_token, is_active)| async move {
100 is_active.store(true, Ordering::SeqCst);
102
103 loop {
104 let request = WaitanyinvoiceRequest {
105 timeout: None,
106 lastpay_index: last_pay_idx,
107 };
108 tokio::select! {
109 _ = cancel_token.cancelled() => {
110 is_active.store(false, Ordering::SeqCst);
112 return None;
114 }
115 result = cln_client.call_typed(&request) => {
116 match result {
117 Ok(invoice) => {
118
119 match invoice.status {
122 WaitanyinvoiceStatus::PAID => (),
123 WaitanyinvoiceStatus::EXPIRED => continue,
124 }
125
126 last_pay_idx = invoice.pay_index;
127
128 let payment_hash = invoice.payment_hash.to_string();
129
130 let request_look_up = match invoice.bolt12 {
131 Some(_) => {
135 match fetch_invoice_by_payment_hash(
136 &mut cln_client,
137 &payment_hash,
138 )
139 .await
140 {
141 Ok(Some(invoice)) => {
142 if let Some(local_offer_id) = invoice.local_offer_id {
143 local_offer_id.to_string()
144 } else {
145 continue;
146 }
147 }
148 Ok(None) => continue,
149 Err(e) => {
150 tracing::warn!(
151 "Error fetching invoice by payment hash: {e}"
152 );
153 continue;
154 }
155 }
156 }
157 None => payment_hash,
158 };
159
160 return Some((request_look_up, (cln_client, last_pay_idx, cancel_token, is_active)));
161 }
162 Err(e) => {
163 tracing::warn!("Error fetching invoice: {e}");
164 is_active.store(false, Ordering::SeqCst);
165 return None;
166 }
167 }
168 }
169 }
170 }
171 },
172 )
173 .boxed();
174
175 Ok(stream)
176 }
177
178 async fn get_payment_quote(
179 &self,
180 melt_quote_request: &MeltQuoteBolt11Request,
181 ) -> Result<PaymentQuoteResponse, Self::Err> {
182 let amount = melt_quote_request.amount_msat()?;
183
184 let amount = amount / MSAT_IN_SAT.into();
185
186 let relative_fee_reserve =
187 (self.fee_reserve.percent_fee_reserve * u64::from(amount) as f32) as u64;
188
189 let absolute_fee_reserve: u64 = self.fee_reserve.min_fee_reserve.into();
190
191 let fee = match relative_fee_reserve > absolute_fee_reserve {
192 true => relative_fee_reserve,
193 false => absolute_fee_reserve,
194 };
195
196 Ok(PaymentQuoteResponse {
197 request_lookup_id: melt_quote_request.request.payment_hash().to_string(),
198 amount,
199 fee: fee.into(),
200 state: MeltQuoteState::Unpaid,
201 })
202 }
203
204 async fn pay_invoice(
205 &self,
206 melt_quote: mint::MeltQuote,
207 partial_amount: Option<Amount>,
208 max_fee: Option<Amount>,
209 ) -> Result<PayInvoiceResponse, Self::Err> {
210 let bolt11 = Bolt11Invoice::from_str(&melt_quote.request)?;
211 let pay_state = self
212 .check_outgoing_payment(&bolt11.payment_hash().to_string())
213 .await?;
214
215 match pay_state.status {
216 MeltQuoteState::Unpaid | MeltQuoteState::Unknown | MeltQuoteState::Failed => (),
217 MeltQuoteState::Paid => {
218 tracing::debug!("Melt attempted on invoice already paid");
219 return Err(Self::Err::InvoiceAlreadyPaid);
220 }
221 MeltQuoteState::Pending => {
222 tracing::debug!("Melt attempted on invoice already pending");
223 return Err(Self::Err::InvoicePaymentPending);
224 }
225 }
226
227 let amount_msat = partial_amount
228 .is_none()
229 .then(|| {
230 melt_quote
231 .msat_to_pay
232 .map(|a| CLN_Amount::from_msat(a.into()))
233 })
234 .flatten();
235
236 let mut cln_client = self.cln_client.lock().await;
237 let cln_response = cln_client
238 .call_typed(&PayRequest {
239 bolt11: melt_quote.request.to_string(),
240 amount_msat,
241 label: None,
242 riskfactor: None,
243 maxfeepercent: None,
244 retry_for: None,
245 maxdelay: None,
246 exemptfee: None,
247 localinvreqid: None,
248 exclude: None,
249 maxfee: max_fee
250 .map(|a| {
251 let msat = to_unit(a, &melt_quote.unit, &CurrencyUnit::Msat)?;
252 Ok::<CLN_Amount, Self::Err>(CLN_Amount::from_msat(msat.into()))
253 })
254 .transpose()?,
255 description: None,
256 partial_msat: partial_amount
257 .map(|a| {
258 let msat = to_unit(a, &melt_quote.unit, &CurrencyUnit::Msat)?;
259
260 Ok::<cln_rpc::primitives::Amount, Self::Err>(CLN_Amount::from_msat(
261 msat.into(),
262 ))
263 })
264 .transpose()?,
265 })
266 .await;
267
268 let response = match cln_response {
269 Ok(pay_response) => {
270 let status = match pay_response.status {
271 PayStatus::COMPLETE => MeltQuoteState::Paid,
272 PayStatus::PENDING => MeltQuoteState::Pending,
273 PayStatus::FAILED => MeltQuoteState::Failed,
274 };
275
276 PayInvoiceResponse {
277 payment_preimage: Some(hex::encode(pay_response.payment_preimage.to_vec())),
278 payment_lookup_id: pay_response.payment_hash.to_string(),
279 status,
280 total_spent: to_unit(
281 pay_response.amount_sent_msat.msat(),
282 &CurrencyUnit::Msat,
283 &melt_quote.unit,
284 )?,
285 unit: melt_quote.unit,
286 }
287 }
288 Err(err) => {
289 tracing::error!("Could not pay invoice: {}", err);
290 return Err(Error::ClnRpc(err).into());
291 }
292 };
293
294 Ok(response)
295 }
296
297 async fn create_invoice(
298 &self,
299 amount: Amount,
300 unit: &CurrencyUnit,
301 description: String,
302 unix_expiry: u64,
303 ) -> Result<CreateInvoiceResponse, Self::Err> {
304 let time_now = unix_time();
305 assert!(unix_expiry > time_now);
306
307 let mut cln_client = self.cln_client.lock().await;
308
309 let label = Uuid::new_v4().to_string();
310
311 let amount = to_unit(amount, unit, &CurrencyUnit::Msat)?;
312 let amount_msat = AmountOrAny::Amount(CLN_Amount::from_msat(amount.into()));
313
314 let invoice_response = cln_client
315 .call_typed(&InvoiceRequest {
316 amount_msat,
317 description,
318 label: label.clone(),
319 expiry: Some(unix_expiry - time_now),
320 fallbacks: None,
321 preimage: None,
322 cltv: None,
323 deschashonly: None,
324 exposeprivatechannels: None,
325 })
326 .await
327 .map_err(Error::from)?;
328
329 let request = Bolt11Invoice::from_str(&invoice_response.bolt11)?;
330 let expiry = request.expires_at().map(|t| t.as_secs());
331 let payment_hash = request.payment_hash();
332
333 Ok(CreateInvoiceResponse {
334 request_lookup_id: payment_hash.to_string(),
335 request,
336 expiry,
337 })
338 }
339
340 async fn check_incoming_invoice_status(
341 &self,
342 payment_hash: &str,
343 ) -> Result<MintQuoteState, Self::Err> {
344 let mut cln_client = self.cln_client.lock().await;
345
346 let listinvoices_response = cln_client
347 .call_typed(&ListinvoicesRequest {
348 payment_hash: Some(payment_hash.to_string()),
349 label: None,
350 invstring: None,
351 offer_id: None,
352 index: None,
353 limit: None,
354 start: None,
355 })
356 .await
357 .map_err(Error::from)?;
358
359 let status = match listinvoices_response.invoices.first() {
360 Some(invoice_response) => cln_invoice_status_to_mint_state(invoice_response.status),
361 None => {
362 tracing::info!(
363 "Check invoice called on unknown look up id: {}",
364 payment_hash
365 );
366 return Err(Error::WrongClnResponse.into());
367 }
368 };
369
370 Ok(status)
371 }
372
373 async fn check_outgoing_payment(
374 &self,
375 payment_hash: &str,
376 ) -> Result<PayInvoiceResponse, Self::Err> {
377 let mut cln_client = self.cln_client.lock().await;
378
379 let listpays_response = cln_client
380 .call_typed(&ListpaysRequest {
381 payment_hash: Some(payment_hash.parse().map_err(|_| Error::InvalidHash)?),
382 bolt11: None,
383 status: None,
384 start: None,
385 index: None,
386 limit: None,
387 })
388 .await
389 .map_err(Error::from)?;
390
391 match listpays_response.pays.first() {
392 Some(pays_response) => {
393 let status = cln_pays_status_to_mint_state(pays_response.status);
394
395 Ok(PayInvoiceResponse {
396 payment_lookup_id: pays_response.payment_hash.to_string(),
397 payment_preimage: pays_response.preimage.map(|p| hex::encode(p.to_vec())),
398 status,
399 total_spent: pays_response
400 .amount_sent_msat
401 .map_or(Amount::ZERO, |a| a.msat().into()),
402 unit: CurrencyUnit::Msat,
403 })
404 }
405 None => Ok(PayInvoiceResponse {
406 payment_lookup_id: payment_hash.to_string(),
407 payment_preimage: None,
408 status: MeltQuoteState::Unknown,
409 total_spent: Amount::ZERO,
410 unit: CurrencyUnit::Msat,
411 }),
412 }
413 }
414}
415
416impl Cln {
417 async fn get_last_pay_index(&self) -> Result<Option<u64>, Error> {
419 let mut cln_client = self.cln_client.lock().await;
420 let listinvoices_response = cln_client
421 .call_typed(&ListinvoicesRequest {
422 index: None,
423 invstring: None,
424 label: None,
425 limit: None,
426 offer_id: None,
427 payment_hash: None,
428 start: None,
429 })
430 .await
431 .map_err(Error::from)?;
432
433 match listinvoices_response.invoices.last() {
434 Some(last_invoice) => Ok(last_invoice.pay_index),
435 None => Ok(None),
436 }
437 }
438}
439
440fn cln_invoice_status_to_mint_state(status: ListinvoicesInvoicesStatus) -> MintQuoteState {
441 match status {
442 ListinvoicesInvoicesStatus::UNPAID => MintQuoteState::Unpaid,
443 ListinvoicesInvoicesStatus::PAID => MintQuoteState::Paid,
444 ListinvoicesInvoicesStatus::EXPIRED => MintQuoteState::Unpaid,
445 }
446}
447
448fn cln_pays_status_to_mint_state(status: ListpaysPaysStatus) -> MeltQuoteState {
449 match status {
450 ListpaysPaysStatus::PENDING => MeltQuoteState::Pending,
451 ListpaysPaysStatus::COMPLETE => MeltQuoteState::Paid,
452 ListpaysPaysStatus::FAILED => MeltQuoteState::Failed,
453 }
454}
455
456async fn fetch_invoice_by_payment_hash(
457 cln_client: &mut cln_rpc::ClnRpc,
458 payment_hash: &str,
459) -> Result<Option<ListinvoicesInvoices>, Error> {
460 match cln_client
461 .call_typed(&ListinvoicesRequest {
462 payment_hash: Some(payment_hash.to_string()),
463 index: None,
464 invstring: None,
465 label: None,
466 limit: None,
467 offer_id: None,
468 start: None,
469 })
470 .await
471 {
472 Ok(invoice_response) => Ok(invoice_response.invoices.first().cloned()),
473 Err(e) => {
474 tracing::warn!("Error fetching invoice: {e}");
475 Err(Error::from(e))
476 }
477 }
478}