cdk_payment_processor/proto/
client.rs

1use std::path::PathBuf;
2use std::pin::Pin;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::Arc;
5
6use anyhow::anyhow;
7use cdk_common::payment::{
8    CreateIncomingPaymentResponse, IncomingPaymentOptions as CdkIncomingPaymentOptions,
9    MakePaymentResponse as CdkMakePaymentResponse, MintPayment,
10    PaymentQuoteResponse as CdkPaymentQuoteResponse, WaitPaymentResponse,
11};
12use futures::{Stream, StreamExt};
13use serde_json::Value;
14use tokio_util::sync::CancellationToken;
15use tonic::transport::{Certificate, Channel, ClientTlsConfig, Identity};
16use tonic::{async_trait, Request};
17use tracing::instrument;
18
19use crate::proto::cdk_payment_processor_client::CdkPaymentProcessorClient;
20use crate::proto::{
21    CheckIncomingPaymentRequest, CheckOutgoingPaymentRequest, CreatePaymentRequest, EmptyRequest,
22    IncomingPaymentOptions, MakePaymentRequest, OutgoingPaymentRequestType, PaymentQuoteRequest,
23};
24
25/// Payment Processor
26#[derive(Clone)]
27pub struct PaymentProcessorClient {
28    inner: CdkPaymentProcessorClient<Channel>,
29    wait_incoming_payment_stream_is_active: Arc<AtomicBool>,
30    cancel_incoming_payment_listener: CancellationToken,
31}
32
33impl PaymentProcessorClient {
34    /// Payment Processor
35    pub async fn new(addr: &str, port: u16, tls_dir: Option<PathBuf>) -> anyhow::Result<Self> {
36        let addr = format!("{addr}:{port}");
37        let channel = if let Some(tls_dir) = tls_dir {
38            // TLS directory exists, configure TLS
39
40            // Check for ca.pem
41            let ca_pem_path = tls_dir.join("ca.pem");
42            if !ca_pem_path.exists() {
43                let err_msg = format!("CA certificate file not found: {}", ca_pem_path.display());
44                tracing::error!("{}", err_msg);
45                return Err(anyhow!(err_msg));
46            }
47
48            // Check for client.pem
49            let client_pem_path = tls_dir.join("client.pem");
50            if !client_pem_path.exists() {
51                let err_msg = format!(
52                    "Client certificate file not found: {}",
53                    client_pem_path.display()
54                );
55                tracing::error!("{}", err_msg);
56                return Err(anyhow!(err_msg));
57            }
58
59            // Check for client.key
60            let client_key_path = tls_dir.join("client.key");
61            if !client_key_path.exists() {
62                let err_msg = format!("Client key file not found: {}", client_key_path.display());
63                tracing::error!("{}", err_msg);
64                return Err(anyhow!(err_msg));
65            }
66
67            let server_root_ca_cert = std::fs::read_to_string(&ca_pem_path)?;
68            let server_root_ca_cert = Certificate::from_pem(server_root_ca_cert);
69            let client_cert = std::fs::read_to_string(&client_pem_path)?;
70            let client_key = std::fs::read_to_string(&client_key_path)?;
71            let client_identity = Identity::from_pem(client_cert, client_key);
72            let tls = ClientTlsConfig::new()
73                .ca_certificate(server_root_ca_cert)
74                .identity(client_identity);
75
76            Channel::from_shared(addr)?
77                .tls_config(tls)?
78                .connect()
79                .await?
80        } else {
81            // No TLS directory, skip TLS configuration
82            Channel::from_shared(addr)?.connect().await?
83        };
84
85        let client = CdkPaymentProcessorClient::new(channel);
86
87        Ok(Self {
88            inner: client,
89            wait_incoming_payment_stream_is_active: Arc::new(AtomicBool::new(false)),
90            cancel_incoming_payment_listener: CancellationToken::new(),
91        })
92    }
93}
94
95#[async_trait]
96impl MintPayment for PaymentProcessorClient {
97    type Err = cdk_common::payment::Error;
98
99    async fn get_settings(&self) -> Result<Value, Self::Err> {
100        let mut inner = self.inner.clone();
101        let response = inner
102            .get_settings(Request::new(EmptyRequest {}))
103            .await
104            .map_err(|err| {
105                tracing::error!("Could not get settings: {}", err);
106                cdk_common::payment::Error::Custom(err.to_string())
107            })?;
108
109        let settings = response.into_inner();
110
111        Ok(serde_json::from_str(&settings.inner)?)
112    }
113
114    /// Create a new invoice
115    async fn create_incoming_payment_request(
116        &self,
117        unit: &cdk_common::CurrencyUnit,
118        options: CdkIncomingPaymentOptions,
119    ) -> Result<CreateIncomingPaymentResponse, Self::Err> {
120        let mut inner = self.inner.clone();
121
122        let proto_options = match options {
123            CdkIncomingPaymentOptions::Bolt11(opts) => IncomingPaymentOptions {
124                options: Some(super::incoming_payment_options::Options::Bolt11(
125                    super::Bolt11IncomingPaymentOptions {
126                        description: opts.description,
127                        amount: opts.amount.into(),
128                        unix_expiry: opts.unix_expiry,
129                    },
130                )),
131            },
132            CdkIncomingPaymentOptions::Bolt12(opts) => IncomingPaymentOptions {
133                options: Some(super::incoming_payment_options::Options::Bolt12(
134                    super::Bolt12IncomingPaymentOptions {
135                        description: opts.description,
136                        amount: opts.amount.map(Into::into),
137                        unix_expiry: opts.unix_expiry,
138                    },
139                )),
140            },
141        };
142
143        let response = inner
144            .create_payment(Request::new(CreatePaymentRequest {
145                unit: unit.to_string(),
146                options: Some(proto_options),
147            }))
148            .await
149            .map_err(|err| {
150                tracing::error!("Could not create payment request: {}", err);
151                cdk_common::payment::Error::Custom(err.to_string())
152            })?;
153
154        let response = response.into_inner();
155
156        Ok(response.try_into().map_err(|_| {
157            cdk_common::payment::Error::Anyhow(anyhow!("Could not create create payment response"))
158        })?)
159    }
160
161    async fn get_payment_quote(
162        &self,
163        unit: &cdk_common::CurrencyUnit,
164        options: cdk_common::payment::OutgoingPaymentOptions,
165    ) -> Result<CdkPaymentQuoteResponse, Self::Err> {
166        let mut inner = self.inner.clone();
167
168        let request_type = match &options {
169            cdk_common::payment::OutgoingPaymentOptions::Bolt11(_) => {
170                OutgoingPaymentRequestType::Bolt11Invoice
171            }
172            cdk_common::payment::OutgoingPaymentOptions::Bolt12(_) => {
173                OutgoingPaymentRequestType::Bolt12Offer
174            }
175        };
176
177        let proto_request = match &options {
178            cdk_common::payment::OutgoingPaymentOptions::Bolt11(opts) => opts.bolt11.to_string(),
179            cdk_common::payment::OutgoingPaymentOptions::Bolt12(opts) => opts.offer.to_string(),
180        };
181
182        let proto_options = match &options {
183            cdk_common::payment::OutgoingPaymentOptions::Bolt11(opts) => opts.melt_options,
184            cdk_common::payment::OutgoingPaymentOptions::Bolt12(opts) => opts.melt_options,
185        };
186
187        let response = inner
188            .get_payment_quote(Request::new(PaymentQuoteRequest {
189                request: proto_request,
190                unit: unit.to_string(),
191                options: proto_options.map(Into::into),
192                request_type: request_type.into(),
193            }))
194            .await
195            .map_err(|err| {
196                tracing::error!("Could not get payment quote: {}", err);
197                cdk_common::payment::Error::Custom(err.to_string())
198            })?;
199
200        let response = response.into_inner();
201
202        Ok(response.into())
203    }
204
205    async fn make_payment(
206        &self,
207        _unit: &cdk_common::CurrencyUnit,
208        options: cdk_common::payment::OutgoingPaymentOptions,
209    ) -> Result<CdkMakePaymentResponse, Self::Err> {
210        let mut inner = self.inner.clone();
211
212        let payment_options = match options {
213            cdk_common::payment::OutgoingPaymentOptions::Bolt11(opts) => {
214                super::OutgoingPaymentVariant {
215                    options: Some(super::outgoing_payment_variant::Options::Bolt11(
216                        super::Bolt11OutgoingPaymentOptions {
217                            bolt11: opts.bolt11.to_string(),
218                            max_fee_amount: opts.max_fee_amount.map(Into::into),
219                            timeout_secs: opts.timeout_secs,
220                            melt_options: opts.melt_options.map(Into::into),
221                        },
222                    )),
223                }
224            }
225            cdk_common::payment::OutgoingPaymentOptions::Bolt12(opts) => {
226                super::OutgoingPaymentVariant {
227                    options: Some(super::outgoing_payment_variant::Options::Bolt12(
228                        super::Bolt12OutgoingPaymentOptions {
229                            offer: opts.offer.to_string(),
230                            max_fee_amount: opts.max_fee_amount.map(Into::into),
231                            timeout_secs: opts.timeout_secs,
232                            melt_options: opts.melt_options.map(Into::into),
233                        },
234                    )),
235                }
236            }
237        };
238
239        let response = inner
240            .make_payment(Request::new(MakePaymentRequest {
241                payment_options: Some(payment_options),
242                partial_amount: None,
243                max_fee_amount: None,
244            }))
245            .await
246            .map_err(|err| {
247                tracing::error!("Could not pay payment request: {}", err);
248
249                if err.message().contains("already paid") {
250                    cdk_common::payment::Error::InvoiceAlreadyPaid
251                } else if err.message().contains("pending") {
252                    cdk_common::payment::Error::InvoicePaymentPending
253                } else {
254                    cdk_common::payment::Error::Custom(err.to_string())
255                }
256            })?;
257
258        let response = response.into_inner();
259
260        Ok(response.try_into().map_err(|_err| {
261            cdk_common::payment::Error::Anyhow(anyhow!("could not make payment"))
262        })?)
263    }
264
265    #[instrument(skip_all)]
266    async fn wait_payment_event(
267        &self,
268    ) -> Result<Pin<Box<dyn Stream<Item = cdk_common::payment::Event> + Send>>, Self::Err> {
269        self.wait_incoming_payment_stream_is_active
270            .store(true, Ordering::SeqCst);
271        tracing::debug!("Client waiting for payment");
272        let mut inner = self.inner.clone();
273        let stream = inner
274            .wait_incoming_payment(EmptyRequest {})
275            .await
276            .map_err(|err| {
277                tracing::error!("Could not check incoming payment stream: {}", err);
278                cdk_common::payment::Error::Custom(err.to_string())
279            })?
280            .into_inner();
281
282        let cancel_token = self.cancel_incoming_payment_listener.clone();
283        let cancel_fut = cancel_token.cancelled_owned();
284        let active_flag = self.wait_incoming_payment_stream_is_active.clone();
285
286        let transformed_stream = stream
287            .take_until(cancel_fut)
288            .filter_map(|item| async {
289                match item {
290                    Ok(value) => match value.try_into() {
291                        Ok(payment_response) => Some(cdk_common::payment::Event::PaymentReceived(
292                            payment_response,
293                        )),
294                        Err(e) => {
295                            tracing::error!("Error converting payment response: {}", e);
296                            None
297                        }
298                    },
299                    Err(e) => {
300                        tracing::error!("Error in payment stream: {}", e);
301                        None
302                    }
303                }
304            })
305            .inspect(move |_| {
306                active_flag.store(false, Ordering::SeqCst);
307                tracing::info!("Payment stream inactive");
308            });
309
310        Ok(Box::pin(transformed_stream))
311    }
312
313    /// Is wait invoice active
314    fn is_wait_invoice_active(&self) -> bool {
315        self.wait_incoming_payment_stream_is_active
316            .load(Ordering::SeqCst)
317    }
318
319    /// Cancel wait invoice
320    fn cancel_wait_invoice(&self) {
321        self.cancel_incoming_payment_listener.cancel();
322    }
323
324    async fn check_incoming_payment_status(
325        &self,
326        payment_identifier: &cdk_common::payment::PaymentIdentifier,
327    ) -> Result<Vec<WaitPaymentResponse>, Self::Err> {
328        let mut inner = self.inner.clone();
329        let response = inner
330            .check_incoming_payment(Request::new(CheckIncomingPaymentRequest {
331                request_identifier: Some(payment_identifier.clone().into()),
332            }))
333            .await
334            .map_err(|err| {
335                tracing::error!("Could not check incoming payment: {}", err);
336                cdk_common::payment::Error::Custom(err.to_string())
337            })?;
338
339        let check_incoming = response.into_inner();
340        check_incoming
341            .payments
342            .into_iter()
343            .map(|resp| resp.try_into().map_err(Self::Err::from))
344            .collect()
345    }
346
347    async fn check_outgoing_payment(
348        &self,
349        payment_identifier: &cdk_common::payment::PaymentIdentifier,
350    ) -> Result<CdkMakePaymentResponse, Self::Err> {
351        let mut inner = self.inner.clone();
352        let response = inner
353            .check_outgoing_payment(Request::new(CheckOutgoingPaymentRequest {
354                request_identifier: Some(payment_identifier.clone().into()),
355            }))
356            .await
357            .map_err(|err| {
358                tracing::error!("Could not check outgoing payment: {}", err);
359                cdk_common::payment::Error::Custom(err.to_string())
360            })?;
361
362        let check_outgoing = response.into_inner();
363
364        Ok(check_outgoing
365            .try_into()
366            .map_err(|_| cdk_common::payment::Error::UnknownPaymentState)?)
367    }
368}