Skip to main content

cdk_payment_processor/proto/
client.rs

1use std::path::PathBuf;
2use std::pin::Pin;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::Arc;
5use std::task::{Context, Poll};
6
7use anyhow::anyhow;
8use cdk_common::grpc::{VersionInterceptor, VERSION_HEADER};
9use cdk_common::payment::{
10    CreateIncomingPaymentResponse, IncomingPaymentOptions as CdkIncomingPaymentOptions,
11    MakePaymentResponse as CdkMakePaymentResponse, MintPayment,
12    PaymentQuoteResponse as CdkPaymentQuoteResponse, WaitPaymentResponse,
13};
14use futures::{Stream, StreamExt};
15use tokio::sync::Mutex;
16use tokio_util::sync::CancellationToken;
17use tonic::codegen::InterceptedService;
18use tonic::transport::{Certificate, Channel, ClientTlsConfig, Identity};
19use tonic::{async_trait, Request};
20use tracing::instrument;
21
22use crate::proto::cdk_payment_processor_client::CdkPaymentProcessorClient;
23use crate::proto::{
24    CheckIncomingPaymentRequest, CheckOutgoingPaymentRequest, CreatePaymentRequest, EmptyRequest,
25    IncomingPaymentOptions, IntoProtoAmount, MakePaymentRequest, OutgoingPaymentRequestType,
26    PaymentQuoteRequest,
27};
28
/// gRPC client for a remote CDK payment processor.
///
/// The type is [`Clone`]; because the stream flag and the cancellation-token
/// slot both live behind an [`Arc`], every clone shares them.
#[derive(Clone)]
pub struct PaymentProcessorClient {
    /// Generated gRPC client whose channel is wrapped in a [`VersionInterceptor`].
    inner: CdkPaymentProcessorClient<InterceptedService<Channel, VersionInterceptor>>,
    /// `true` after `wait_payment_event` has opened a stream; reset to `false`
    /// when opening fails or when the returned stream is dropped.
    payment_event_stream_is_active: Arc<AtomicBool>,
    /// Token used to stop the current payment event stream. The slot is
    /// refilled with a fresh token each time the stream is cancelled.
    cancel_payment_event_stream: Arc<Mutex<CancellationToken>>,
}
36
37struct ActivePaymentEventStream {
38    inner: Pin<Box<dyn Stream<Item = cdk_common::payment::Event> + Send>>,
39    active_flag: Arc<AtomicBool>,
40}
41
42impl ActivePaymentEventStream {
43    fn new(
44        inner: Pin<Box<dyn Stream<Item = cdk_common::payment::Event> + Send>>,
45        active_flag: Arc<AtomicBool>,
46    ) -> Self {
47        Self { inner, active_flag }
48    }
49}
50
51impl Drop for ActivePaymentEventStream {
52    fn drop(&mut self) {
53        self.active_flag.store(false, Ordering::SeqCst);
54        tracing::info!("Payment event stream inactive");
55    }
56}
57
58impl Stream for ActivePaymentEventStream {
59    type Item = cdk_common::payment::Event;
60
61    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
62        let this = self.get_mut();
63        this.inner.as_mut().poll_next(cx)
64    }
65}
66
67impl std::fmt::Debug for PaymentProcessorClient {
68    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69        f.debug_struct("PaymentProcessorClient")
70            .finish_non_exhaustive()
71    }
72}
73
74impl PaymentProcessorClient {
75    /// Payment Processor
76    pub async fn new(addr: &str, port: u16, tls_dir: Option<PathBuf>) -> anyhow::Result<Self> {
77        let addr = format!("{addr}:{port}");
78        let channel = if let Some(tls_dir) = tls_dir {
79            // TLS directory exists, configure TLS
80
81            // Check for ca.pem
82            let ca_pem_path = tls_dir.join("ca.pem");
83            if !ca_pem_path.exists() {
84                let err_msg = format!("CA certificate file not found: {}", ca_pem_path.display());
85                tracing::error!("{}", err_msg);
86                return Err(anyhow!(err_msg));
87            }
88
89            // Check for client.pem
90            let client_pem_path = tls_dir.join("client.pem");
91
92            // Check for client.key
93            let client_key_path = tls_dir.join("client.key");
94            // check for ca cert
95            let server_root_ca_cert = std::fs::read_to_string(&ca_pem_path)?;
96            let server_root_ca_cert = Certificate::from_pem(server_root_ca_cert);
97            let tls: ClientTlsConfig = match client_pem_path.exists() && client_key_path.exists() {
98                true => {
99                    let client_cert = std::fs::read_to_string(&client_pem_path)?;
100                    let client_key = std::fs::read_to_string(&client_key_path)?;
101                    let client_identity = Identity::from_pem(client_cert, client_key);
102                    ClientTlsConfig::new()
103                        .ca_certificate(server_root_ca_cert)
104                        .identity(client_identity)
105                }
106                false => ClientTlsConfig::new().ca_certificate(server_root_ca_cert),
107            };
108            Channel::from_shared(addr)?
109                .tls_config(tls)?
110                .connect()
111                .await?
112        } else {
113            // No TLS directory, skip TLS configuration
114            Channel::from_shared(addr)?.connect().await?
115        };
116
117        let interceptor = VersionInterceptor::new(
118            VERSION_HEADER,
119            cdk_common::PAYMENT_PROCESSOR_PROTOCOL_VERSION,
120        );
121        let client = CdkPaymentProcessorClient::with_interceptor(channel, interceptor);
122
123        Ok(Self {
124            inner: client,
125            payment_event_stream_is_active: Arc::new(AtomicBool::new(false)),
126            cancel_payment_event_stream: Arc::new(Mutex::new(CancellationToken::new())),
127        })
128    }
129}
130
131#[async_trait]
132impl MintPayment for PaymentProcessorClient {
133    type Err = cdk_common::payment::Error;
134
135    async fn get_settings(&self) -> Result<cdk_common::payment::SettingsResponse, Self::Err> {
136        let mut inner = self.inner.clone();
137        let response = inner
138            .get_settings(Request::new(EmptyRequest {}))
139            .await
140            .map_err(|err| {
141                tracing::error!("Could not get settings: {}", err);
142                cdk_common::payment::Error::Custom(err.to_string())
143            })?;
144
145        let settings = response.into_inner();
146
147        Ok(cdk_common::payment::SettingsResponse {
148            unit: settings.unit,
149            bolt11: settings
150                .bolt11
151                .map(|b| cdk_common::payment::Bolt11Settings {
152                    mpp: b.mpp,
153                    amountless: b.amountless,
154                    invoice_description: b.invoice_description,
155                }),
156            bolt12: settings
157                .bolt12
158                .map(|b| cdk_common::payment::Bolt12Settings {
159                    amountless: b.amountless,
160                }),
161            onchain: settings
162                .onchain
163                .map(|o| cdk_common::payment::OnchainSettings {
164                    confirmations: o.confirmations,
165                    min_receive_amount_sat: o.min_receive_amount_sat,
166                    min_send_amount_sat: o.min_send_amount_sat,
167                }),
168            custom: settings.custom,
169        })
170    }
171
172    /// Create a new invoice
173    async fn create_incoming_payment_request(
174        &self,
175        options: CdkIncomingPaymentOptions,
176    ) -> Result<CreateIncomingPaymentResponse, Self::Err> {
177        let mut inner = self.inner.clone();
178
179        let proto_options = match options {
180            CdkIncomingPaymentOptions::Custom(opts) => IncomingPaymentOptions {
181                options: Some(super::incoming_payment_options::Options::Custom(
182                    super::CustomIncomingPaymentOptions {
183                        description: opts.description,
184                        amount: Some(opts.amount.into()),
185                        unix_expiry: opts.unix_expiry,
186                        extra_json: opts.extra_json.clone(),
187                    },
188                )),
189            },
190            CdkIncomingPaymentOptions::Bolt11(opts) => IncomingPaymentOptions {
191                options: Some(super::incoming_payment_options::Options::Bolt11(
192                    super::Bolt11IncomingPaymentOptions {
193                        description: opts.description,
194                        amount: Some(opts.amount.into()),
195                        unix_expiry: opts.unix_expiry,
196                    },
197                )),
198            },
199            CdkIncomingPaymentOptions::Bolt12(opts) => IncomingPaymentOptions {
200                options: Some(super::incoming_payment_options::Options::Bolt12(
201                    super::Bolt12IncomingPaymentOptions {
202                        description: opts.description,
203                        amount: opts.amount.map(Into::into),
204                        unix_expiry: opts.unix_expiry,
205                    },
206                )),
207            },
208            CdkIncomingPaymentOptions::Onchain(opts) => IncomingPaymentOptions {
209                options: Some(super::incoming_payment_options::Options::Onchain(
210                    super::OnchainIncomingPaymentOptions {
211                        quote_id: opts.quote_id.to_string(),
212                    },
213                )),
214            },
215        };
216
217        let response = inner
218            .create_payment(Request::new(CreatePaymentRequest {
219                options: Some(proto_options),
220            }))
221            .await
222            .map_err(|err| {
223                tracing::error!("Could not create payment request: {}", err);
224                cdk_common::payment::Error::Custom(err.to_string())
225            })?;
226
227        let response = response.into_inner();
228
229        Ok(response.try_into().map_err(|_| {
230            cdk_common::payment::Error::Anyhow(anyhow!("Could not create create payment response"))
231        })?)
232    }
233
234    async fn get_payment_quote(
235        &self,
236        unit: &cdk_common::CurrencyUnit,
237        options: cdk_common::payment::OutgoingPaymentOptions,
238    ) -> Result<CdkPaymentQuoteResponse, Self::Err> {
239        let mut inner = self.inner.clone();
240
241        let request_type = match &options {
242            cdk_common::payment::OutgoingPaymentOptions::Custom(_) => {
243                OutgoingPaymentRequestType::Custom
244            }
245            cdk_common::payment::OutgoingPaymentOptions::Bolt11(_) => {
246                OutgoingPaymentRequestType::Bolt11Invoice
247            }
248            cdk_common::payment::OutgoingPaymentOptions::Bolt12(_) => {
249                OutgoingPaymentRequestType::Bolt12Offer
250            }
251            cdk_common::payment::OutgoingPaymentOptions::Onchain(_) => {
252                OutgoingPaymentRequestType::Onchain
253            }
254        };
255
256        let proto_request = match &options {
257            cdk_common::payment::OutgoingPaymentOptions::Custom(opts) => opts.request.to_string(),
258            cdk_common::payment::OutgoingPaymentOptions::Bolt11(opts) => opts.bolt11.to_string(),
259            cdk_common::payment::OutgoingPaymentOptions::Bolt12(opts) => opts.offer.to_string(),
260            cdk_common::payment::OutgoingPaymentOptions::Onchain(opts) => opts.address.clone(),
261        };
262
263        let proto_options = match &options {
264            cdk_common::payment::OutgoingPaymentOptions::Custom(opts) => opts.melt_options,
265            cdk_common::payment::OutgoingPaymentOptions::Bolt11(opts) => opts.melt_options,
266            cdk_common::payment::OutgoingPaymentOptions::Bolt12(opts) => opts.melt_options,
267            cdk_common::payment::OutgoingPaymentOptions::Onchain(_) => None,
268        };
269
270        let onchain_options = match &options {
271            cdk_common::payment::OutgoingPaymentOptions::Onchain(opts) => {
272                Some(super::OnchainOutgoingPaymentOptions {
273                    address: opts.address.clone(),
274                    amount: Some(opts.amount.clone().into()),
275                    max_fee_amount: opts.max_fee_amount.clone().into_proto(),
276                    quote_id: opts.quote_id.to_string(),
277                    fee_index: opts.fee_index,
278                    metadata: opts.metadata.clone(),
279                })
280            }
281            _ => None,
282        };
283
284        let extra_json = match &options {
285            cdk_common::payment::OutgoingPaymentOptions::Custom(opts) => opts.extra_json.clone(),
286            _ => None,
287        };
288
289        let quote_id = match &options {
290            cdk_common::payment::OutgoingPaymentOptions::Custom(opts) => opts.quote_id.to_string(),
291            cdk_common::payment::OutgoingPaymentOptions::Bolt11(opts) => opts.quote_id.to_string(),
292            cdk_common::payment::OutgoingPaymentOptions::Bolt12(opts) => opts.quote_id.to_string(),
293            cdk_common::payment::OutgoingPaymentOptions::Onchain(opts) => opts.quote_id.to_string(),
294        };
295
296        let response = inner
297            .get_payment_quote(Request::new(PaymentQuoteRequest {
298                request: proto_request,
299                unit: unit.to_string(),
300                options: proto_options.map(Into::into),
301                request_type: request_type.into(),
302                extra_json,
303                quote_id,
304                onchain_options,
305            }))
306            .await
307            .map_err(|err| {
308                tracing::error!("Could not get payment quote: {}", err);
309                cdk_common::payment::Error::Custom(err.to_string())
310            })?;
311
312        let response = response.into_inner();
313
314        Ok(response.try_into().map_err(|_| {
315            cdk_common::payment::Error::Custom(
316                "Failed to convert payment quote response".to_string(),
317            )
318        })?)
319    }
320
321    async fn make_payment(
322        &self,
323        unit: &cdk_common::CurrencyUnit,
324        options: cdk_common::payment::OutgoingPaymentOptions,
325    ) -> Result<CdkMakePaymentResponse, Self::Err> {
326        let mut inner = self.inner.clone();
327        let payment_options = match options {
328            cdk_common::payment::OutgoingPaymentOptions::Custom(opts) => {
329                super::OutgoingPaymentVariant {
330                    options: Some(super::outgoing_payment_variant::Options::Custom(
331                        super::CustomOutgoingPaymentOptions {
332                            offer: opts.request.to_string(),
333                            max_fee_amount: opts.max_fee_amount.into_proto(),
334                            timeout_secs: opts.timeout_secs,
335                            melt_options: opts.melt_options.map(Into::into),
336                            extra_json: opts.extra_json.clone(),
337                            quote_id: opts.quote_id.to_string(),
338                        },
339                    )),
340                }
341            }
342            cdk_common::payment::OutgoingPaymentOptions::Bolt11(opts) => {
343                super::OutgoingPaymentVariant {
344                    options: Some(super::outgoing_payment_variant::Options::Bolt11(
345                        super::Bolt11OutgoingPaymentOptions {
346                            bolt11: opts.bolt11.to_string(),
347                            max_fee_amount: opts.max_fee_amount.into_proto(),
348                            timeout_secs: opts.timeout_secs,
349                            melt_options: opts.melt_options.map(Into::into),
350                            quote_id: opts.quote_id.to_string(),
351                        },
352                    )),
353                }
354            }
355            cdk_common::payment::OutgoingPaymentOptions::Bolt12(opts) => {
356                super::OutgoingPaymentVariant {
357                    options: Some(super::outgoing_payment_variant::Options::Bolt12(
358                        super::Bolt12OutgoingPaymentOptions {
359                            offer: opts.offer.to_string(),
360                            max_fee_amount: opts.max_fee_amount.into_proto(),
361                            timeout_secs: opts.timeout_secs,
362                            melt_options: opts.melt_options.map(Into::into),
363                            quote_id: opts.quote_id.to_string(),
364                        },
365                    )),
366                }
367            }
368            cdk_common::payment::OutgoingPaymentOptions::Onchain(opts) => {
369                super::OutgoingPaymentVariant {
370                    options: Some(super::outgoing_payment_variant::Options::Onchain(
371                        super::OnchainOutgoingPaymentOptions {
372                            address: opts.address.clone(),
373                            amount: Some(opts.amount.into()),
374                            max_fee_amount: opts.max_fee_amount.into_proto(),
375                            quote_id: opts.quote_id.to_string(),
376                            fee_index: opts.fee_index,
377                            metadata: opts.metadata.clone(),
378                        },
379                    )),
380                }
381            }
382        };
383
384        let response = inner
385            .make_payment(Request::new(MakePaymentRequest {
386                payment_options: Some(payment_options),
387                partial_amount: None,
388                max_fee_amount: None,
389                unit: unit.to_string(),
390            }))
391            .await
392            .map_err(|err| {
393                tracing::error!("Could not pay payment request: {}", err);
394
395                if err.message().contains("already paid") {
396                    cdk_common::payment::Error::InvoiceAlreadyPaid
397                } else if err.message().contains("pending") {
398                    cdk_common::payment::Error::InvoicePaymentPending
399                } else {
400                    cdk_common::payment::Error::Custom(err.to_string())
401                }
402            })?;
403
404        let response = response.into_inner();
405
406        Ok(response.try_into().map_err(|_err| {
407            cdk_common::payment::Error::Anyhow(anyhow!("could not make payment"))
408        })?)
409    }
410
411    #[instrument(skip_all)]
412    async fn wait_payment_event(
413        &self,
414    ) -> Result<Pin<Box<dyn Stream<Item = cdk_common::payment::Event> + Send>>, Self::Err> {
415        tracing::debug!("Client waiting for payment");
416        let mut inner = self.inner.clone();
417        let stream = inner
418            .wait_payment_event(Request::new(EmptyRequest {}))
419            .await
420            .map_err(|err| {
421                self.payment_event_stream_is_active
422                    .store(false, Ordering::SeqCst);
423                tracing::error!("Could not open payment event stream: {}", err);
424                cdk_common::payment::Error::Custom(err.to_string())
425            })?
426            .into_inner();
427
428        self.payment_event_stream_is_active
429            .store(true, Ordering::SeqCst);
430
431        let cancel_token = self.cancel_payment_event_stream.lock().await.clone();
432        let cancel_fut = cancel_token.cancelled_owned();
433        let active_flag = self.payment_event_stream_is_active.clone();
434
435        let transformed_stream = stream.take_until(cancel_fut).filter_map(|item| async {
436            match item {
437                Ok(value) => match value.try_into() {
438                    Ok(payment_event) => Some(payment_event),
439                    Err(e) => {
440                        tracing::error!("Error converting payment event: {}", e);
441                        None
442                    }
443                },
444                Err(e) => {
445                    tracing::error!("Error in payment event stream: {}", e);
446                    None
447                }
448            }
449        });
450
451        Ok(Box::pin(ActivePaymentEventStream::new(
452            Box::pin(transformed_stream),
453            active_flag,
454        )))
455    }
456
457    /// Is payment event stream active
458    fn is_payment_event_stream_active(&self) -> bool {
459        self.payment_event_stream_is_active.load(Ordering::SeqCst)
460    }
461
462    /// Cancel payment event stream
463    fn cancel_payment_event_stream(&self) {
464        let cancel_payment_event_stream = Arc::clone(&self.cancel_payment_event_stream);
465
466        tokio::spawn(async move {
467            let mut cancel_token = cancel_payment_event_stream.lock().await;
468            cancel_token.cancel();
469            *cancel_token = CancellationToken::new();
470        });
471    }
472
473    async fn check_incoming_payment_status(
474        &self,
475        payment_identifier: &cdk_common::payment::PaymentIdentifier,
476    ) -> Result<Vec<WaitPaymentResponse>, Self::Err> {
477        let mut inner = self.inner.clone();
478        let response = inner
479            .check_incoming_payment(Request::new(CheckIncomingPaymentRequest {
480                request_identifier: Some(payment_identifier.clone().into()),
481            }))
482            .await
483            .map_err(|err| {
484                tracing::error!("Could not check incoming payment: {}", err);
485                cdk_common::payment::Error::Custom(err.to_string())
486            })?;
487
488        let check_incoming = response.into_inner();
489        check_incoming
490            .payments
491            .into_iter()
492            .map(|resp| resp.try_into().map_err(Self::Err::from))
493            .collect()
494    }
495
496    async fn check_outgoing_payment(
497        &self,
498        payment_identifier: &cdk_common::payment::PaymentIdentifier,
499    ) -> Result<CdkMakePaymentResponse, Self::Err> {
500        let mut inner = self.inner.clone();
501        let response = inner
502            .check_outgoing_payment(Request::new(CheckOutgoingPaymentRequest {
503                request_identifier: Some(payment_identifier.clone().into()),
504            }))
505            .await
506            .map_err(|err| {
507                tracing::error!("Could not check outgoing payment: {}", err);
508                cdk_common::payment::Error::Custom(err.to_string())
509            })?;
510
511        let check_outgoing = response.into_inner();
512
513        Ok(check_outgoing
514            .try_into()
515            .map_err(|_| cdk_common::payment::Error::UnknownPaymentState)?)
516    }
517}