Skip to main content

cdk_payment_processor/proto/
client.rs

1use std::path::PathBuf;
2use std::pin::Pin;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::Arc;
5
6use anyhow::anyhow;
7use cdk_common::grpc::VERSION_HEADER;
8use cdk_common::payment::{
9    CreateIncomingPaymentResponse, IncomingPaymentOptions as CdkIncomingPaymentOptions,
10    MakePaymentResponse as CdkMakePaymentResponse, MintPayment,
11    PaymentQuoteResponse as CdkPaymentQuoteResponse, WaitPaymentResponse,
12};
13use futures::{Stream, StreamExt};
14use tokio_util::sync::CancellationToken;
15use tonic::metadata::MetadataValue;
16use tonic::transport::{Certificate, Channel, ClientTlsConfig, Identity};
17use tonic::{async_trait, Request};
18use tracing::instrument;
19
20use crate::proto::cdk_payment_processor_client::CdkPaymentProcessorClient;
21use crate::proto::{
22    CheckIncomingPaymentRequest, CheckOutgoingPaymentRequest, CreatePaymentRequest, EmptyRequest,
23    IncomingPaymentOptions, MakePaymentRequest, OutgoingPaymentRequestType, PaymentQuoteRequest,
24};
25
26/// Helper function to add version header to a request
27fn with_version_header<T>(mut request: Request<T>) -> Request<T> {
28    request.metadata_mut().insert(
29        VERSION_HEADER,
30        MetadataValue::from_static(cdk_common::PAYMENT_PROCESSOR_PROTOCOL_VERSION),
31    );
32    request
33}
34
35/// Payment Processor
36#[derive(Clone)]
37pub struct PaymentProcessorClient {
38    inner: CdkPaymentProcessorClient<Channel>,
39    wait_incoming_payment_stream_is_active: Arc<AtomicBool>,
40    cancel_incoming_payment_listener: CancellationToken,
41}
42
43impl std::fmt::Debug for PaymentProcessorClient {
44    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45        f.debug_struct("PaymentProcessorClient")
46            .finish_non_exhaustive()
47    }
48}
49
50impl PaymentProcessorClient {
51    /// Payment Processor
52    pub async fn new(addr: &str, port: u16, tls_dir: Option<PathBuf>) -> anyhow::Result<Self> {
53        let addr = format!("{addr}:{port}");
54        let channel = if let Some(tls_dir) = tls_dir {
55            // TLS directory exists, configure TLS
56
57            // Check for ca.pem
58            let ca_pem_path = tls_dir.join("ca.pem");
59            if !ca_pem_path.exists() {
60                let err_msg = format!("CA certificate file not found: {}", ca_pem_path.display());
61                tracing::error!("{}", err_msg);
62                return Err(anyhow!(err_msg));
63            }
64
65            // Check for client.pem
66            let client_pem_path = tls_dir.join("client.pem");
67
68            // Check for client.key
69            let client_key_path = tls_dir.join("client.key");
70            // check for ca cert
71            let server_root_ca_cert = std::fs::read_to_string(&ca_pem_path)?;
72            let server_root_ca_cert = Certificate::from_pem(server_root_ca_cert);
73            let tls: ClientTlsConfig = match client_pem_path.exists() && client_key_path.exists() {
74                true => {
75                    let client_cert = std::fs::read_to_string(&client_pem_path)?;
76                    let client_key = std::fs::read_to_string(&client_key_path)?;
77                    let client_identity = Identity::from_pem(client_cert, client_key);
78                    ClientTlsConfig::new()
79                        .ca_certificate(server_root_ca_cert)
80                        .identity(client_identity)
81                }
82                false => ClientTlsConfig::new().ca_certificate(server_root_ca_cert),
83            };
84            Channel::from_shared(addr)?
85                .tls_config(tls)?
86                .connect()
87                .await?
88        } else {
89            // No TLS directory, skip TLS configuration
90            Channel::from_shared(addr)?.connect().await?
91        };
92
93        let client = CdkPaymentProcessorClient::new(channel);
94
95        Ok(Self {
96            inner: client,
97            wait_incoming_payment_stream_is_active: Arc::new(AtomicBool::new(false)),
98            cancel_incoming_payment_listener: CancellationToken::new(),
99        })
100    }
101}
102
103#[async_trait]
104impl MintPayment for PaymentProcessorClient {
105    type Err = cdk_common::payment::Error;
106
107    async fn get_settings(&self) -> Result<cdk_common::payment::SettingsResponse, Self::Err> {
108        let mut inner = self.inner.clone();
109        let response = inner
110            .get_settings(with_version_header(Request::new(EmptyRequest {})))
111            .await
112            .map_err(|err| {
113                tracing::error!("Could not get settings: {}", err);
114                cdk_common::payment::Error::Custom(err.to_string())
115            })?;
116
117        let settings = response.into_inner();
118
119        Ok(cdk_common::payment::SettingsResponse {
120            unit: settings.unit,
121            bolt11: settings
122                .bolt11
123                .map(|b| cdk_common::payment::Bolt11Settings {
124                    mpp: b.mpp,
125                    amountless: b.amountless,
126                    invoice_description: b.invoice_description,
127                }),
128            bolt12: settings
129                .bolt12
130                .map(|b| cdk_common::payment::Bolt12Settings {
131                    amountless: b.amountless,
132                }),
133            custom: settings.custom,
134        })
135    }
136
137    /// Create a new invoice
138    async fn create_incoming_payment_request(
139        &self,
140        unit: &cdk_common::CurrencyUnit,
141        options: CdkIncomingPaymentOptions,
142    ) -> Result<CreateIncomingPaymentResponse, Self::Err> {
143        let mut inner = self.inner.clone();
144
145        let proto_options = match options {
146            CdkIncomingPaymentOptions::Custom(opts) => IncomingPaymentOptions {
147                options: Some(super::incoming_payment_options::Options::Custom(
148                    super::CustomIncomingPaymentOptions {
149                        description: opts.description,
150                        amount: Some(opts.amount.into()),
151                        unix_expiry: opts.unix_expiry,
152                        extra_json: opts.extra_json.clone(),
153                    },
154                )),
155            },
156            CdkIncomingPaymentOptions::Bolt11(opts) => IncomingPaymentOptions {
157                options: Some(super::incoming_payment_options::Options::Bolt11(
158                    super::Bolt11IncomingPaymentOptions {
159                        description: opts.description,
160                        amount: opts.amount.into(),
161                        unix_expiry: opts.unix_expiry,
162                    },
163                )),
164            },
165            CdkIncomingPaymentOptions::Bolt12(opts) => IncomingPaymentOptions {
166                options: Some(super::incoming_payment_options::Options::Bolt12(
167                    super::Bolt12IncomingPaymentOptions {
168                        description: opts.description,
169                        amount: opts.amount.map(Into::into),
170                        unix_expiry: opts.unix_expiry,
171                    },
172                )),
173            },
174        };
175
176        let response = inner
177            .create_payment(with_version_header(Request::new(CreatePaymentRequest {
178                unit: unit.to_string(),
179                options: Some(proto_options),
180            })))
181            .await
182            .map_err(|err| {
183                tracing::error!("Could not create payment request: {}", err);
184                cdk_common::payment::Error::Custom(err.to_string())
185            })?;
186
187        let response = response.into_inner();
188
189        Ok(response.try_into().map_err(|_| {
190            cdk_common::payment::Error::Anyhow(anyhow!("Could not create create payment response"))
191        })?)
192    }
193
194    async fn get_payment_quote(
195        &self,
196        unit: &cdk_common::CurrencyUnit,
197        options: cdk_common::payment::OutgoingPaymentOptions,
198    ) -> Result<CdkPaymentQuoteResponse, Self::Err> {
199        let mut inner = self.inner.clone();
200
201        let request_type = match &options {
202            cdk_common::payment::OutgoingPaymentOptions::Custom(_) => {
203                OutgoingPaymentRequestType::Custom
204            }
205            cdk_common::payment::OutgoingPaymentOptions::Bolt11(_) => {
206                OutgoingPaymentRequestType::Bolt11Invoice
207            }
208            cdk_common::payment::OutgoingPaymentOptions::Bolt12(_) => {
209                OutgoingPaymentRequestType::Bolt12Offer
210            }
211        };
212
213        let proto_request = match &options {
214            cdk_common::payment::OutgoingPaymentOptions::Custom(opts) => opts.request.to_string(),
215            cdk_common::payment::OutgoingPaymentOptions::Bolt11(opts) => opts.bolt11.to_string(),
216            cdk_common::payment::OutgoingPaymentOptions::Bolt12(opts) => opts.offer.to_string(),
217        };
218
219        let proto_options = match &options {
220            cdk_common::payment::OutgoingPaymentOptions::Custom(opts) => opts.melt_options,
221            cdk_common::payment::OutgoingPaymentOptions::Bolt11(opts) => opts.melt_options,
222            cdk_common::payment::OutgoingPaymentOptions::Bolt12(opts) => opts.melt_options,
223        };
224
225        let extra_json = match &options {
226            cdk_common::payment::OutgoingPaymentOptions::Custom(opts) => opts.extra_json.clone(),
227            _ => None,
228        };
229
230        let response = inner
231            .get_payment_quote(with_version_header(Request::new(PaymentQuoteRequest {
232                request: proto_request,
233                unit: unit.to_string(),
234                options: proto_options.map(Into::into),
235                request_type: request_type.into(),
236                extra_json,
237            })))
238            .await
239            .map_err(|err| {
240                tracing::error!("Could not get payment quote: {}", err);
241                cdk_common::payment::Error::Custom(err.to_string())
242            })?;
243
244        let response = response.into_inner();
245
246        Ok(response.into())
247    }
248
249    async fn make_payment(
250        &self,
251        _unit: &cdk_common::CurrencyUnit,
252        options: cdk_common::payment::OutgoingPaymentOptions,
253    ) -> Result<CdkMakePaymentResponse, Self::Err> {
254        let mut inner = self.inner.clone();
255        let payment_options = match options {
256            cdk_common::payment::OutgoingPaymentOptions::Custom(opts) => {
257                super::OutgoingPaymentVariant {
258                    options: Some(super::outgoing_payment_variant::Options::Custom(
259                        super::CustomOutgoingPaymentOptions {
260                            offer: opts.request.to_string(),
261                            max_fee_amount: opts.max_fee_amount.map(Into::into),
262                            timeout_secs: opts.timeout_secs,
263                            melt_options: opts.melt_options.map(Into::into),
264                            extra_json: opts.extra_json.clone(),
265                        },
266                    )),
267                }
268            }
269            cdk_common::payment::OutgoingPaymentOptions::Bolt11(opts) => {
270                super::OutgoingPaymentVariant {
271                    options: Some(super::outgoing_payment_variant::Options::Bolt11(
272                        super::Bolt11OutgoingPaymentOptions {
273                            bolt11: opts.bolt11.to_string(),
274                            max_fee_amount: opts.max_fee_amount.map(Into::into),
275                            timeout_secs: opts.timeout_secs,
276                            melt_options: opts.melt_options.map(Into::into),
277                        },
278                    )),
279                }
280            }
281            cdk_common::payment::OutgoingPaymentOptions::Bolt12(opts) => {
282                super::OutgoingPaymentVariant {
283                    options: Some(super::outgoing_payment_variant::Options::Bolt12(
284                        super::Bolt12OutgoingPaymentOptions {
285                            offer: opts.offer.to_string(),
286                            max_fee_amount: opts.max_fee_amount.map(Into::into),
287                            timeout_secs: opts.timeout_secs,
288                            melt_options: opts.melt_options.map(Into::into),
289                        },
290                    )),
291                }
292            }
293        };
294
295        let response = inner
296            .make_payment(with_version_header(Request::new(MakePaymentRequest {
297                payment_options: Some(payment_options),
298                partial_amount: None,
299                max_fee_amount: None,
300            })))
301            .await
302            .map_err(|err| {
303                tracing::error!("Could not pay payment request: {}", err);
304
305                if err.message().contains("already paid") {
306                    cdk_common::payment::Error::InvoiceAlreadyPaid
307                } else if err.message().contains("pending") {
308                    cdk_common::payment::Error::InvoicePaymentPending
309                } else {
310                    cdk_common::payment::Error::Custom(err.to_string())
311                }
312            })?;
313
314        let response = response.into_inner();
315
316        Ok(response.try_into().map_err(|_err| {
317            cdk_common::payment::Error::Anyhow(anyhow!("could not make payment"))
318        })?)
319    }
320
321    #[instrument(skip_all)]
322    async fn wait_payment_event(
323        &self,
324    ) -> Result<Pin<Box<dyn Stream<Item = cdk_common::payment::Event> + Send>>, Self::Err> {
325        self.wait_incoming_payment_stream_is_active
326            .store(true, Ordering::SeqCst);
327        tracing::debug!("Client waiting for payment");
328        let mut inner = self.inner.clone();
329        let stream = inner
330            .wait_incoming_payment(with_version_header(Request::new(EmptyRequest {})))
331            .await
332            .map_err(|err| {
333                tracing::error!("Could not check incoming payment stream: {}", err);
334                cdk_common::payment::Error::Custom(err.to_string())
335            })?
336            .into_inner();
337
338        let cancel_token = self.cancel_incoming_payment_listener.clone();
339        let cancel_fut = cancel_token.cancelled_owned();
340        let active_flag = self.wait_incoming_payment_stream_is_active.clone();
341
342        let transformed_stream = stream
343            .take_until(cancel_fut)
344            .filter_map(|item| async {
345                match item {
346                    Ok(value) => match value.try_into() {
347                        Ok(payment_response) => Some(cdk_common::payment::Event::PaymentReceived(
348                            payment_response,
349                        )),
350                        Err(e) => {
351                            tracing::error!("Error converting payment response: {}", e);
352                            None
353                        }
354                    },
355                    Err(e) => {
356                        tracing::error!("Error in payment stream: {}", e);
357                        None
358                    }
359                }
360            })
361            .inspect(move |_| {
362                active_flag.store(false, Ordering::SeqCst);
363                tracing::info!("Payment stream inactive");
364            });
365
366        Ok(Box::pin(transformed_stream))
367    }
368
369    /// Is wait invoice active
370    fn is_wait_invoice_active(&self) -> bool {
371        self.wait_incoming_payment_stream_is_active
372            .load(Ordering::SeqCst)
373    }
374
375    /// Cancel wait invoice
376    fn cancel_wait_invoice(&self) {
377        self.cancel_incoming_payment_listener.cancel();
378    }
379
380    async fn check_incoming_payment_status(
381        &self,
382        payment_identifier: &cdk_common::payment::PaymentIdentifier,
383    ) -> Result<Vec<WaitPaymentResponse>, Self::Err> {
384        let mut inner = self.inner.clone();
385        let response = inner
386            .check_incoming_payment(with_version_header(Request::new(
387                CheckIncomingPaymentRequest {
388                    request_identifier: Some(payment_identifier.clone().into()),
389                },
390            )))
391            .await
392            .map_err(|err| {
393                tracing::error!("Could not check incoming payment: {}", err);
394                cdk_common::payment::Error::Custom(err.to_string())
395            })?;
396
397        let check_incoming = response.into_inner();
398        check_incoming
399            .payments
400            .into_iter()
401            .map(|resp| resp.try_into().map_err(Self::Err::from))
402            .collect()
403    }
404
405    async fn check_outgoing_payment(
406        &self,
407        payment_identifier: &cdk_common::payment::PaymentIdentifier,
408    ) -> Result<CdkMakePaymentResponse, Self::Err> {
409        let mut inner = self.inner.clone();
410        let response = inner
411            .check_outgoing_payment(with_version_header(Request::new(
412                CheckOutgoingPaymentRequest {
413                    request_identifier: Some(payment_identifier.clone().into()),
414                },
415            )))
416            .await
417            .map_err(|err| {
418                tracing::error!("Could not check outgoing payment: {}", err);
419                cdk_common::payment::Error::Custom(err.to_string())
420            })?;
421
422        let check_outgoing = response.into_inner();
423
424        Ok(check_outgoing
425            .try_into()
426            .map_err(|_| cdk_common::payment::Error::UnknownPaymentState)?)
427    }
428}