1#![warn(missing_docs)]
4#![warn(rustdoc::bare_urls)]
5
6use std::cmp::max;
7use std::path::PathBuf;
8use std::pin::Pin;
9use std::str::FromStr;
10use std::sync::atomic::{AtomicBool, Ordering};
11use std::sync::Arc;
12
13use async_trait::async_trait;
14use cdk::amount::{to_unit, Amount};
15use cdk::cdk_payment::{
16 self, Bolt11Settings, CreateIncomingPaymentResponse, MakePaymentResponse, MintPayment,
17 PaymentQuoteResponse,
18};
19use cdk::nuts::{CurrencyUnit, MeltOptions, MeltQuoteState, MintQuoteState};
20use cdk::types::FeeReserve;
21use cdk::util::{hex, unix_time};
22use cdk::{mint, Bolt11Invoice};
23use cln_rpc::model::requests::{
24 InvoiceRequest, ListinvoicesRequest, ListpaysRequest, PayRequest, WaitanyinvoiceRequest,
25};
26use cln_rpc::model::responses::{
27 ListinvoicesInvoices, ListinvoicesInvoicesStatus, ListpaysPaysStatus, PayStatus,
28 WaitanyinvoiceStatus,
29};
30use cln_rpc::primitives::{Amount as CLN_Amount, AmountOrAny};
31use error::Error;
32use futures::{Stream, StreamExt};
33use serde_json::Value;
34use tokio::sync::Mutex;
35use tokio_util::sync::CancellationToken;
36use uuid::Uuid;
37
38pub mod error;
39
40#[derive(Clone)]
42pub struct Cln {
43 rpc_socket: PathBuf,
44 cln_client: Arc<Mutex<cln_rpc::ClnRpc>>,
45 fee_reserve: FeeReserve,
46 wait_invoice_cancel_token: CancellationToken,
47 wait_invoice_is_active: Arc<AtomicBool>,
48}
49
50impl Cln {
51 pub async fn new(rpc_socket: PathBuf, fee_reserve: FeeReserve) -> Result<Self, Error> {
53 let cln_client = cln_rpc::ClnRpc::new(&rpc_socket).await?;
54
55 Ok(Self {
56 rpc_socket,
57 cln_client: Arc::new(Mutex::new(cln_client)),
58 fee_reserve,
59 wait_invoice_cancel_token: CancellationToken::new(),
60 wait_invoice_is_active: Arc::new(AtomicBool::new(false)),
61 })
62 }
63}
64
65#[async_trait]
66impl MintPayment for Cln {
67 type Err = cdk_payment::Error;
68
69 async fn get_settings(&self) -> Result<Value, Self::Err> {
70 Ok(serde_json::to_value(Bolt11Settings {
71 mpp: true,
72 unit: CurrencyUnit::Msat,
73 invoice_description: true,
74 })?)
75 }
76
77 fn is_wait_invoice_active(&self) -> bool {
79 self.wait_invoice_is_active.load(Ordering::SeqCst)
80 }
81
82 fn cancel_wait_invoice(&self) {
84 self.wait_invoice_cancel_token.cancel()
85 }
86
87 async fn wait_any_incoming_payment(
88 &self,
89 ) -> Result<Pin<Box<dyn Stream<Item = String> + Send>>, Self::Err> {
90 let last_pay_index = self.get_last_pay_index().await?;
91 let cln_client = cln_rpc::ClnRpc::new(&self.rpc_socket).await?;
92
93 let stream = futures::stream::unfold(
94 (
95 cln_client,
96 last_pay_index,
97 self.wait_invoice_cancel_token.clone(),
98 Arc::clone(&self.wait_invoice_is_active),
99 ),
100 |(mut cln_client, mut last_pay_idx, cancel_token, is_active)| async move {
101 is_active.store(true, Ordering::SeqCst);
103
104 loop {
105 let request = WaitanyinvoiceRequest {
106 timeout: None,
107 lastpay_index: last_pay_idx,
108 };
109 tokio::select! {
110 _ = cancel_token.cancelled() => {
111 is_active.store(false, Ordering::SeqCst);
113 return None;
115 }
116 result = cln_client.call_typed(&request) => {
117 match result {
118 Ok(invoice) => {
119
120 match invoice.status {
123 WaitanyinvoiceStatus::PAID => (),
124 WaitanyinvoiceStatus::EXPIRED => continue,
125 }
126
127 last_pay_idx = invoice.pay_index;
128
129 let payment_hash = invoice.payment_hash.to_string();
130
131 let request_look_up = match invoice.bolt12 {
132 Some(_) => {
136 match fetch_invoice_by_payment_hash(
137 &mut cln_client,
138 &payment_hash,
139 )
140 .await
141 {
142 Ok(Some(invoice)) => {
143 if let Some(local_offer_id) = invoice.local_offer_id {
144 local_offer_id.to_string()
145 } else {
146 continue;
147 }
148 }
149 Ok(None) => continue,
150 Err(e) => {
151 tracing::warn!(
152 "Error fetching invoice by payment hash: {e}"
153 );
154 continue;
155 }
156 }
157 }
158 None => payment_hash,
159 };
160
161 return Some((request_look_up, (cln_client, last_pay_idx, cancel_token, is_active)));
162 }
163 Err(e) => {
164 tracing::warn!("Error fetching invoice: {e}");
165 is_active.store(false, Ordering::SeqCst);
166 return None;
167 }
168 }
169 }
170 }
171 }
172 },
173 )
174 .boxed();
175
176 Ok(stream)
177 }
178
179 async fn get_payment_quote(
180 &self,
181 request: &str,
182 unit: &CurrencyUnit,
183 options: Option<MeltOptions>,
184 ) -> Result<PaymentQuoteResponse, Self::Err> {
185 let bolt11 = Bolt11Invoice::from_str(request)?;
186
187 let amount_msat = match options {
188 Some(amount) => amount.amount_msat(),
189 None => bolt11
190 .amount_milli_satoshis()
191 .ok_or(Error::UnknownInvoiceAmount)?
192 .into(),
193 };
194
195 let amount = to_unit(amount_msat, &CurrencyUnit::Msat, unit)?;
196
197 let relative_fee_reserve =
198 (self.fee_reserve.percent_fee_reserve * u64::from(amount) as f32) as u64;
199
200 let absolute_fee_reserve: u64 = self.fee_reserve.min_fee_reserve.into();
201
202 let fee = max(relative_fee_reserve, absolute_fee_reserve);
203
204 Ok(PaymentQuoteResponse {
205 request_lookup_id: bolt11.payment_hash().to_string(),
206 amount,
207 fee: fee.into(),
208 state: MeltQuoteState::Unpaid,
209 })
210 }
211
212 async fn make_payment(
213 &self,
214 melt_quote: mint::MeltQuote,
215 partial_amount: Option<Amount>,
216 max_fee: Option<Amount>,
217 ) -> Result<MakePaymentResponse, Self::Err> {
218 let bolt11 = Bolt11Invoice::from_str(&melt_quote.request)?;
219 let pay_state = self
220 .check_outgoing_payment(&bolt11.payment_hash().to_string())
221 .await?;
222
223 match pay_state.status {
224 MeltQuoteState::Unpaid | MeltQuoteState::Unknown | MeltQuoteState::Failed => (),
225 MeltQuoteState::Paid => {
226 tracing::debug!("Melt attempted on invoice already paid");
227 return Err(Self::Err::InvoiceAlreadyPaid);
228 }
229 MeltQuoteState::Pending => {
230 tracing::debug!("Melt attempted on invoice already pending");
231 return Err(Self::Err::InvoicePaymentPending);
232 }
233 }
234
235 let amount_msat = partial_amount
236 .is_none()
237 .then(|| {
238 melt_quote
239 .msat_to_pay
240 .map(|a| CLN_Amount::from_msat(a.into()))
241 })
242 .flatten();
243
244 let mut cln_client = self.cln_client.lock().await;
245 let cln_response = cln_client
246 .call_typed(&PayRequest {
247 bolt11: melt_quote.request.to_string(),
248 amount_msat,
249 label: None,
250 riskfactor: None,
251 maxfeepercent: None,
252 retry_for: None,
253 maxdelay: None,
254 exemptfee: None,
255 localinvreqid: None,
256 exclude: None,
257 maxfee: max_fee
258 .map(|a| {
259 let msat = to_unit(a, &melt_quote.unit, &CurrencyUnit::Msat)?;
260 Ok::<CLN_Amount, Self::Err>(CLN_Amount::from_msat(msat.into()))
261 })
262 .transpose()?,
263 description: None,
264 partial_msat: partial_amount
265 .map(|a| {
266 let msat = to_unit(a, &melt_quote.unit, &CurrencyUnit::Msat)?;
267
268 Ok::<cln_rpc::primitives::Amount, Self::Err>(CLN_Amount::from_msat(
269 msat.into(),
270 ))
271 })
272 .transpose()?,
273 })
274 .await;
275
276 let response = match cln_response {
277 Ok(pay_response) => {
278 let status = match pay_response.status {
279 PayStatus::COMPLETE => MeltQuoteState::Paid,
280 PayStatus::PENDING => MeltQuoteState::Pending,
281 PayStatus::FAILED => MeltQuoteState::Failed,
282 };
283
284 MakePaymentResponse {
285 payment_proof: Some(hex::encode(pay_response.payment_preimage.to_vec())),
286 payment_lookup_id: pay_response.payment_hash.to_string(),
287 status,
288 total_spent: to_unit(
289 pay_response.amount_sent_msat.msat(),
290 &CurrencyUnit::Msat,
291 &melt_quote.unit,
292 )?,
293 unit: melt_quote.unit,
294 }
295 }
296 Err(err) => {
297 tracing::error!("Could not pay invoice: {}", err);
298 return Err(Error::ClnRpc(err).into());
299 }
300 };
301
302 Ok(response)
303 }
304
305 async fn create_incoming_payment_request(
306 &self,
307 amount: Amount,
308 unit: &CurrencyUnit,
309 description: String,
310 unix_expiry: Option<u64>,
311 ) -> Result<CreateIncomingPaymentResponse, Self::Err> {
312 let time_now = unix_time();
313
314 let mut cln_client = self.cln_client.lock().await;
315
316 let label = Uuid::new_v4().to_string();
317
318 let amount = to_unit(amount, unit, &CurrencyUnit::Msat)?;
319 let amount_msat = AmountOrAny::Amount(CLN_Amount::from_msat(amount.into()));
320
321 let invoice_response = cln_client
322 .call_typed(&InvoiceRequest {
323 amount_msat,
324 description,
325 label: label.clone(),
326 expiry: unix_expiry.map(|t| t - time_now),
327 fallbacks: None,
328 preimage: None,
329 cltv: None,
330 deschashonly: None,
331 exposeprivatechannels: None,
332 })
333 .await
334 .map_err(Error::from)?;
335
336 let request = Bolt11Invoice::from_str(&invoice_response.bolt11)?;
337 let expiry = request.expires_at().map(|t| t.as_secs());
338 let payment_hash = request.payment_hash();
339
340 Ok(CreateIncomingPaymentResponse {
341 request_lookup_id: payment_hash.to_string(),
342 request: request.to_string(),
343 expiry,
344 })
345 }
346
347 async fn check_incoming_payment_status(
348 &self,
349 payment_hash: &str,
350 ) -> Result<MintQuoteState, Self::Err> {
351 let mut cln_client = self.cln_client.lock().await;
352
353 let listinvoices_response = cln_client
354 .call_typed(&ListinvoicesRequest {
355 payment_hash: Some(payment_hash.to_string()),
356 label: None,
357 invstring: None,
358 offer_id: None,
359 index: None,
360 limit: None,
361 start: None,
362 })
363 .await
364 .map_err(Error::from)?;
365
366 let status = match listinvoices_response.invoices.first() {
367 Some(invoice_response) => cln_invoice_status_to_mint_state(invoice_response.status),
368 None => {
369 tracing::info!(
370 "Check invoice called on unknown look up id: {}",
371 payment_hash
372 );
373 return Err(Error::WrongClnResponse.into());
374 }
375 };
376
377 Ok(status)
378 }
379
380 async fn check_outgoing_payment(
381 &self,
382 payment_hash: &str,
383 ) -> Result<MakePaymentResponse, Self::Err> {
384 let mut cln_client = self.cln_client.lock().await;
385
386 let listpays_response = cln_client
387 .call_typed(&ListpaysRequest {
388 payment_hash: Some(payment_hash.parse().map_err(|_| Error::InvalidHash)?),
389 bolt11: None,
390 status: None,
391 start: None,
392 index: None,
393 limit: None,
394 })
395 .await
396 .map_err(Error::from)?;
397
398 match listpays_response.pays.first() {
399 Some(pays_response) => {
400 let status = cln_pays_status_to_mint_state(pays_response.status);
401
402 Ok(MakePaymentResponse {
403 payment_lookup_id: pays_response.payment_hash.to_string(),
404 payment_proof: pays_response.preimage.map(|p| hex::encode(p.to_vec())),
405 status,
406 total_spent: pays_response
407 .amount_sent_msat
408 .map_or(Amount::ZERO, |a| a.msat().into()),
409 unit: CurrencyUnit::Msat,
410 })
411 }
412 None => Ok(MakePaymentResponse {
413 payment_lookup_id: payment_hash.to_string(),
414 payment_proof: None,
415 status: MeltQuoteState::Unknown,
416 total_spent: Amount::ZERO,
417 unit: CurrencyUnit::Msat,
418 }),
419 }
420 }
421}
422
423impl Cln {
424 async fn get_last_pay_index(&self) -> Result<Option<u64>, Error> {
426 let mut cln_client = self.cln_client.lock().await;
427 let listinvoices_response = cln_client
428 .call_typed(&ListinvoicesRequest {
429 index: None,
430 invstring: None,
431 label: None,
432 limit: None,
433 offer_id: None,
434 payment_hash: None,
435 start: None,
436 })
437 .await
438 .map_err(Error::from)?;
439
440 match listinvoices_response.invoices.last() {
441 Some(last_invoice) => Ok(last_invoice.pay_index),
442 None => Ok(None),
443 }
444 }
445}
446
447fn cln_invoice_status_to_mint_state(status: ListinvoicesInvoicesStatus) -> MintQuoteState {
448 match status {
449 ListinvoicesInvoicesStatus::UNPAID => MintQuoteState::Unpaid,
450 ListinvoicesInvoicesStatus::PAID => MintQuoteState::Paid,
451 ListinvoicesInvoicesStatus::EXPIRED => MintQuoteState::Unpaid,
452 }
453}
454
455fn cln_pays_status_to_mint_state(status: ListpaysPaysStatus) -> MeltQuoteState {
456 match status {
457 ListpaysPaysStatus::PENDING => MeltQuoteState::Pending,
458 ListpaysPaysStatus::COMPLETE => MeltQuoteState::Paid,
459 ListpaysPaysStatus::FAILED => MeltQuoteState::Failed,
460 }
461}
462
463async fn fetch_invoice_by_payment_hash(
464 cln_client: &mut cln_rpc::ClnRpc,
465 payment_hash: &str,
466) -> Result<Option<ListinvoicesInvoices>, Error> {
467 match cln_client
468 .call_typed(&ListinvoicesRequest {
469 payment_hash: Some(payment_hash.to_string()),
470 index: None,
471 invstring: None,
472 label: None,
473 limit: None,
474 offer_id: None,
475 start: None,
476 })
477 .await
478 {
479 Ok(invoice_response) => Ok(invoice_response.invoices.first().cloned()),
480 Err(e) => {
481 tracing::warn!("Error fetching invoice: {e}");
482 Err(Error::from(e))
483 }
484 }
485}