Skip to main content

cdk_payment_processor/proto/
client.rs

1use std::path::PathBuf;
2use std::pin::Pin;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::Arc;
5
6use anyhow::anyhow;
7use cdk_common::grpc::{VersionInterceptor, VERSION_HEADER};
8use cdk_common::payment::{
9    CreateIncomingPaymentResponse, IncomingPaymentOptions as CdkIncomingPaymentOptions,
10    MakePaymentResponse as CdkMakePaymentResponse, MintPayment,
11    PaymentQuoteResponse as CdkPaymentQuoteResponse, WaitPaymentResponse,
12};
13use futures::{Stream, StreamExt};
14use tokio_util::sync::CancellationToken;
15use tonic::codegen::InterceptedService;
16use tonic::transport::{Certificate, Channel, ClientTlsConfig, Identity};
17use tonic::{async_trait, Request};
18use tracing::instrument;
19
20use crate::proto::cdk_payment_processor_client::CdkPaymentProcessorClient;
21use crate::proto::{
22    CheckIncomingPaymentRequest, CheckOutgoingPaymentRequest, CreatePaymentRequest, EmptyRequest,
23    IncomingPaymentOptions, IntoProtoAmount, MakePaymentRequest, OutgoingPaymentRequestType,
24    PaymentQuoteRequest,
25};
26
/// Payment Processor
///
/// gRPC client for a CDK payment processor. It wraps the generated
/// `CdkPaymentProcessorClient` and carries the state used to track and cancel
/// the incoming-payment stream.
#[derive(Clone)]
pub struct PaymentProcessorClient {
    /// Generated gRPC client; requests pass through a `VersionInterceptor`.
    inner: CdkPaymentProcessorClient<InterceptedService<Channel, VersionInterceptor>>,
    /// Flag behind an `Arc`, so clones of this client observe the same value.
    /// Written by `wait_payment_event` and read by `is_wait_invoice_active`.
    wait_incoming_payment_stream_is_active: Arc<AtomicBool>,
    /// Token cancelled by `cancel_wait_invoice`; `wait_payment_event` uses it
    /// to stop the stream it returns.
    cancel_incoming_payment_listener: CancellationToken,
}
34
35impl std::fmt::Debug for PaymentProcessorClient {
36    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37        f.debug_struct("PaymentProcessorClient")
38            .finish_non_exhaustive()
39    }
40}
41
42impl PaymentProcessorClient {
43    /// Payment Processor
44    pub async fn new(addr: &str, port: u16, tls_dir: Option<PathBuf>) -> anyhow::Result<Self> {
45        let addr = format!("{addr}:{port}");
46        let channel = if let Some(tls_dir) = tls_dir {
47            // TLS directory exists, configure TLS
48
49            // Check for ca.pem
50            let ca_pem_path = tls_dir.join("ca.pem");
51            if !ca_pem_path.exists() {
52                let err_msg = format!("CA certificate file not found: {}", ca_pem_path.display());
53                tracing::error!("{}", err_msg);
54                return Err(anyhow!(err_msg));
55            }
56
57            // Check for client.pem
58            let client_pem_path = tls_dir.join("client.pem");
59
60            // Check for client.key
61            let client_key_path = tls_dir.join("client.key");
62            // check for ca cert
63            let server_root_ca_cert = std::fs::read_to_string(&ca_pem_path)?;
64            let server_root_ca_cert = Certificate::from_pem(server_root_ca_cert);
65            let tls: ClientTlsConfig = match client_pem_path.exists() && client_key_path.exists() {
66                true => {
67                    let client_cert = std::fs::read_to_string(&client_pem_path)?;
68                    let client_key = std::fs::read_to_string(&client_key_path)?;
69                    let client_identity = Identity::from_pem(client_cert, client_key);
70                    ClientTlsConfig::new()
71                        .ca_certificate(server_root_ca_cert)
72                        .identity(client_identity)
73                }
74                false => ClientTlsConfig::new().ca_certificate(server_root_ca_cert),
75            };
76            Channel::from_shared(addr)?
77                .tls_config(tls)?
78                .connect()
79                .await?
80        } else {
81            // No TLS directory, skip TLS configuration
82            Channel::from_shared(addr)?.connect().await?
83        };
84
85        let interceptor = VersionInterceptor::new(
86            VERSION_HEADER,
87            cdk_common::PAYMENT_PROCESSOR_PROTOCOL_VERSION,
88        );
89        let client = CdkPaymentProcessorClient::with_interceptor(channel, interceptor);
90
91        Ok(Self {
92            inner: client,
93            wait_incoming_payment_stream_is_active: Arc::new(AtomicBool::new(false)),
94            cancel_incoming_payment_listener: CancellationToken::new(),
95        })
96    }
97}
98
99#[async_trait]
100impl MintPayment for PaymentProcessorClient {
101    type Err = cdk_common::payment::Error;
102
103    async fn get_settings(&self) -> Result<cdk_common::payment::SettingsResponse, Self::Err> {
104        let mut inner = self.inner.clone();
105        let response = inner
106            .get_settings(Request::new(EmptyRequest {}))
107            .await
108            .map_err(|err| {
109                tracing::error!("Could not get settings: {}", err);
110                cdk_common::payment::Error::Custom(err.to_string())
111            })?;
112
113        let settings = response.into_inner();
114
115        Ok(cdk_common::payment::SettingsResponse {
116            unit: settings.unit,
117            bolt11: settings
118                .bolt11
119                .map(|b| cdk_common::payment::Bolt11Settings {
120                    mpp: b.mpp,
121                    amountless: b.amountless,
122                    invoice_description: b.invoice_description,
123                }),
124            bolt12: settings
125                .bolt12
126                .map(|b| cdk_common::payment::Bolt12Settings {
127                    amountless: b.amountless,
128                }),
129            custom: settings.custom,
130        })
131    }
132
133    /// Create a new invoice
134    async fn create_incoming_payment_request(
135        &self,
136        options: CdkIncomingPaymentOptions,
137    ) -> Result<CreateIncomingPaymentResponse, Self::Err> {
138        let mut inner = self.inner.clone();
139
140        let proto_options = match options {
141            CdkIncomingPaymentOptions::Custom(opts) => IncomingPaymentOptions {
142                options: Some(super::incoming_payment_options::Options::Custom(
143                    super::CustomIncomingPaymentOptions {
144                        description: opts.description,
145                        amount: Some(opts.amount.into()),
146                        unix_expiry: opts.unix_expiry,
147                        extra_json: opts.extra_json.clone(),
148                    },
149                )),
150            },
151            CdkIncomingPaymentOptions::Bolt11(opts) => IncomingPaymentOptions {
152                options: Some(super::incoming_payment_options::Options::Bolt11(
153                    super::Bolt11IncomingPaymentOptions {
154                        description: opts.description,
155                        amount: Some(opts.amount.into()),
156                        unix_expiry: opts.unix_expiry,
157                    },
158                )),
159            },
160            CdkIncomingPaymentOptions::Bolt12(opts) => IncomingPaymentOptions {
161                options: Some(super::incoming_payment_options::Options::Bolt12(
162                    super::Bolt12IncomingPaymentOptions {
163                        description: opts.description,
164                        amount: opts.amount.map(Into::into),
165                        unix_expiry: opts.unix_expiry,
166                    },
167                )),
168            },
169        };
170
171        let response = inner
172            .create_payment(Request::new(CreatePaymentRequest {
173                options: Some(proto_options),
174            }))
175            .await
176            .map_err(|err| {
177                tracing::error!("Could not create payment request: {}", err);
178                cdk_common::payment::Error::Custom(err.to_string())
179            })?;
180
181        let response = response.into_inner();
182
183        Ok(response.try_into().map_err(|_| {
184            cdk_common::payment::Error::Anyhow(anyhow!("Could not create create payment response"))
185        })?)
186    }
187
188    async fn get_payment_quote(
189        &self,
190        unit: &cdk_common::CurrencyUnit,
191        options: cdk_common::payment::OutgoingPaymentOptions,
192    ) -> Result<CdkPaymentQuoteResponse, Self::Err> {
193        let mut inner = self.inner.clone();
194
195        let request_type = match &options {
196            cdk_common::payment::OutgoingPaymentOptions::Custom(_) => {
197                OutgoingPaymentRequestType::Custom
198            }
199            cdk_common::payment::OutgoingPaymentOptions::Bolt11(_) => {
200                OutgoingPaymentRequestType::Bolt11Invoice
201            }
202            cdk_common::payment::OutgoingPaymentOptions::Bolt12(_) => {
203                OutgoingPaymentRequestType::Bolt12Offer
204            }
205        };
206
207        let proto_request = match &options {
208            cdk_common::payment::OutgoingPaymentOptions::Custom(opts) => opts.request.to_string(),
209            cdk_common::payment::OutgoingPaymentOptions::Bolt11(opts) => opts.bolt11.to_string(),
210            cdk_common::payment::OutgoingPaymentOptions::Bolt12(opts) => opts.offer.to_string(),
211        };
212
213        let proto_options = match &options {
214            cdk_common::payment::OutgoingPaymentOptions::Custom(opts) => opts.melt_options,
215            cdk_common::payment::OutgoingPaymentOptions::Bolt11(opts) => opts.melt_options,
216            cdk_common::payment::OutgoingPaymentOptions::Bolt12(opts) => opts.melt_options,
217        };
218
219        let extra_json = match &options {
220            cdk_common::payment::OutgoingPaymentOptions::Custom(opts) => opts.extra_json.clone(),
221            _ => None,
222        };
223
224        let response = inner
225            .get_payment_quote(Request::new(PaymentQuoteRequest {
226                request: proto_request,
227                unit: unit.to_string(),
228                options: proto_options.map(Into::into),
229                request_type: request_type.into(),
230                extra_json,
231            }))
232            .await
233            .map_err(|err| {
234                tracing::error!("Could not get payment quote: {}", err);
235                cdk_common::payment::Error::Custom(err.to_string())
236            })?;
237
238        let response = response.into_inner();
239
240        Ok(response.try_into().map_err(|_| {
241            cdk_common::payment::Error::Custom(
242                "Failed to convert payment quote response".to_string(),
243            )
244        })?)
245    }
246
247    async fn make_payment(
248        &self,
249        unit: &cdk_common::CurrencyUnit,
250        options: cdk_common::payment::OutgoingPaymentOptions,
251    ) -> Result<CdkMakePaymentResponse, Self::Err> {
252        let mut inner = self.inner.clone();
253        let payment_options = match options {
254            cdk_common::payment::OutgoingPaymentOptions::Custom(opts) => {
255                super::OutgoingPaymentVariant {
256                    options: Some(super::outgoing_payment_variant::Options::Custom(
257                        super::CustomOutgoingPaymentOptions {
258                            offer: opts.request.to_string(),
259                            max_fee_amount: opts.max_fee_amount.into_proto(),
260                            timeout_secs: opts.timeout_secs,
261                            melt_options: opts.melt_options.map(Into::into),
262                            extra_json: opts.extra_json.clone(),
263                        },
264                    )),
265                }
266            }
267            cdk_common::payment::OutgoingPaymentOptions::Bolt11(opts) => {
268                super::OutgoingPaymentVariant {
269                    options: Some(super::outgoing_payment_variant::Options::Bolt11(
270                        super::Bolt11OutgoingPaymentOptions {
271                            bolt11: opts.bolt11.to_string(),
272                            max_fee_amount: opts.max_fee_amount.into_proto(),
273                            timeout_secs: opts.timeout_secs,
274                            melt_options: opts.melt_options.map(Into::into),
275                        },
276                    )),
277                }
278            }
279            cdk_common::payment::OutgoingPaymentOptions::Bolt12(opts) => {
280                super::OutgoingPaymentVariant {
281                    options: Some(super::outgoing_payment_variant::Options::Bolt12(
282                        super::Bolt12OutgoingPaymentOptions {
283                            offer: opts.offer.to_string(),
284                            max_fee_amount: opts.max_fee_amount.into_proto(),
285                            timeout_secs: opts.timeout_secs,
286                            melt_options: opts.melt_options.map(Into::into),
287                        },
288                    )),
289                }
290            }
291        };
292
293        let response = inner
294            .make_payment(Request::new(MakePaymentRequest {
295                payment_options: Some(payment_options),
296                partial_amount: None,
297                max_fee_amount: None,
298                unit: unit.to_string(),
299            }))
300            .await
301            .map_err(|err| {
302                tracing::error!("Could not pay payment request: {}", err);
303
304                if err.message().contains("already paid") {
305                    cdk_common::payment::Error::InvoiceAlreadyPaid
306                } else if err.message().contains("pending") {
307                    cdk_common::payment::Error::InvoicePaymentPending
308                } else {
309                    cdk_common::payment::Error::Custom(err.to_string())
310                }
311            })?;
312
313        let response = response.into_inner();
314
315        Ok(response.try_into().map_err(|_err| {
316            cdk_common::payment::Error::Anyhow(anyhow!("could not make payment"))
317        })?)
318    }
319
320    #[instrument(skip_all)]
321    async fn wait_payment_event(
322        &self,
323    ) -> Result<Pin<Box<dyn Stream<Item = cdk_common::payment::Event> + Send>>, Self::Err> {
324        self.wait_incoming_payment_stream_is_active
325            .store(true, Ordering::SeqCst);
326        tracing::debug!("Client waiting for payment");
327        let mut inner = self.inner.clone();
328        let stream = inner
329            .wait_incoming_payment(Request::new(EmptyRequest {}))
330            .await
331            .map_err(|err| {
332                tracing::error!("Could not check incoming payment stream: {}", err);
333                cdk_common::payment::Error::Custom(err.to_string())
334            })?
335            .into_inner();
336
337        let cancel_token = self.cancel_incoming_payment_listener.clone();
338        let cancel_fut = cancel_token.cancelled_owned();
339        let active_flag = self.wait_incoming_payment_stream_is_active.clone();
340
341        let transformed_stream = stream
342            .take_until(cancel_fut)
343            .filter_map(|item| async {
344                match item {
345                    Ok(value) => match value.try_into() {
346                        Ok(payment_response) => Some(cdk_common::payment::Event::PaymentReceived(
347                            payment_response,
348                        )),
349                        Err(e) => {
350                            tracing::error!("Error converting payment response: {}", e);
351                            None
352                        }
353                    },
354                    Err(e) => {
355                        tracing::error!("Error in payment stream: {}", e);
356                        None
357                    }
358                }
359            })
360            .inspect(move |_| {
361                active_flag.store(false, Ordering::SeqCst);
362                tracing::info!("Payment stream inactive");
363            });
364
365        Ok(Box::pin(transformed_stream))
366    }
367
368    /// Is wait invoice active
369    fn is_wait_invoice_active(&self) -> bool {
370        self.wait_incoming_payment_stream_is_active
371            .load(Ordering::SeqCst)
372    }
373
374    /// Cancel wait invoice
375    fn cancel_wait_invoice(&self) {
376        self.cancel_incoming_payment_listener.cancel();
377    }
378
379    async fn check_incoming_payment_status(
380        &self,
381        payment_identifier: &cdk_common::payment::PaymentIdentifier,
382    ) -> Result<Vec<WaitPaymentResponse>, Self::Err> {
383        let mut inner = self.inner.clone();
384        let response = inner
385            .check_incoming_payment(Request::new(CheckIncomingPaymentRequest {
386                request_identifier: Some(payment_identifier.clone().into()),
387            }))
388            .await
389            .map_err(|err| {
390                tracing::error!("Could not check incoming payment: {}", err);
391                cdk_common::payment::Error::Custom(err.to_string())
392            })?;
393
394        let check_incoming = response.into_inner();
395        check_incoming
396            .payments
397            .into_iter()
398            .map(|resp| resp.try_into().map_err(Self::Err::from))
399            .collect()
400    }
401
402    async fn check_outgoing_payment(
403        &self,
404        payment_identifier: &cdk_common::payment::PaymentIdentifier,
405    ) -> Result<CdkMakePaymentResponse, Self::Err> {
406        let mut inner = self.inner.clone();
407        let response = inner
408            .check_outgoing_payment(Request::new(CheckOutgoingPaymentRequest {
409                request_identifier: Some(payment_identifier.clone().into()),
410            }))
411            .await
412            .map_err(|err| {
413                tracing::error!("Could not check outgoing payment: {}", err);
414                cdk_common::payment::Error::Custom(err.to_string())
415            })?;
416
417        let check_outgoing = response.into_inner();
418
419        Ok(check_outgoing
420            .try_into()
421            .map_err(|_| cdk_common::payment::Error::UnknownPaymentState)?)
422    }
423}