cdk_payment_processor/proto/
client.rs

1use std::path::PathBuf;
2use std::pin::Pin;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::Arc;
5
6use anyhow::anyhow;
7use cdk_common::payment::{
8    CreateIncomingPaymentResponse, IncomingPaymentOptions as CdkIncomingPaymentOptions,
9    MakePaymentResponse as CdkMakePaymentResponse, MintPayment,
10    PaymentQuoteResponse as CdkPaymentQuoteResponse, WaitPaymentResponse,
11};
12use futures::{Stream, StreamExt};
13use serde_json::Value;
14use tokio_util::sync::CancellationToken;
15use tonic::transport::{Certificate, Channel, ClientTlsConfig, Identity};
16use tonic::{async_trait, Request};
17use tracing::instrument;
18
19use crate::proto::cdk_payment_processor_client::CdkPaymentProcessorClient;
20use crate::proto::{
21    CheckIncomingPaymentRequest, CheckOutgoingPaymentRequest, CreatePaymentRequest, EmptyRequest,
22    IncomingPaymentOptions, MakePaymentRequest, OutgoingPaymentRequestType, PaymentQuoteRequest,
23};
24
25/// Payment Processor
26#[derive(Clone)]
27pub struct PaymentProcessorClient {
28    inner: CdkPaymentProcessorClient<Channel>,
29    wait_incoming_payment_stream_is_active: Arc<AtomicBool>,
30    cancel_incoming_payment_listener: CancellationToken,
31}
32
33impl PaymentProcessorClient {
34    /// Payment Processor
35    pub async fn new(addr: &str, port: u16, tls_dir: Option<PathBuf>) -> anyhow::Result<Self> {
36        let addr = format!("{addr}:{port}");
37        let channel = if let Some(tls_dir) = tls_dir {
38            // TLS directory exists, configure TLS
39
40            // Check for ca.pem
41            let ca_pem_path = tls_dir.join("ca.pem");
42            if !ca_pem_path.exists() {
43                let err_msg = format!("CA certificate file not found: {}", ca_pem_path.display());
44                tracing::error!("{}", err_msg);
45                return Err(anyhow!(err_msg));
46            }
47
48            // Check for client.pem
49            let client_pem_path = tls_dir.join("client.pem");
50
51            // Check for client.key
52            let client_key_path = tls_dir.join("client.key");
53            // check for ca cert
54            let server_root_ca_cert = std::fs::read_to_string(&ca_pem_path)?;
55            let server_root_ca_cert = Certificate::from_pem(server_root_ca_cert);
56            let tls: ClientTlsConfig = match client_pem_path.exists() && client_key_path.exists() {
57                true => {
58                    let client_cert = std::fs::read_to_string(&client_pem_path)?;
59                    let client_key = std::fs::read_to_string(&client_key_path)?;
60                    let client_identity = Identity::from_pem(client_cert, client_key);
61                    ClientTlsConfig::new()
62                        .ca_certificate(server_root_ca_cert)
63                        .identity(client_identity)
64                }
65                false => ClientTlsConfig::new().ca_certificate(server_root_ca_cert),
66            };
67            Channel::from_shared(addr)?
68                .tls_config(tls)?
69                .connect()
70                .await?
71        } else {
72            // No TLS directory, skip TLS configuration
73            Channel::from_shared(addr)?.connect().await?
74        };
75
76        let client = CdkPaymentProcessorClient::new(channel);
77
78        Ok(Self {
79            inner: client,
80            wait_incoming_payment_stream_is_active: Arc::new(AtomicBool::new(false)),
81            cancel_incoming_payment_listener: CancellationToken::new(),
82        })
83    }
84}
85
86#[async_trait]
87impl MintPayment for PaymentProcessorClient {
88    type Err = cdk_common::payment::Error;
89
90    async fn get_settings(&self) -> Result<Value, Self::Err> {
91        let mut inner = self.inner.clone();
92        let response = inner
93            .get_settings(Request::new(EmptyRequest {}))
94            .await
95            .map_err(|err| {
96                tracing::error!("Could not get settings: {}", err);
97                cdk_common::payment::Error::Custom(err.to_string())
98            })?;
99
100        let settings = response.into_inner();
101
102        Ok(serde_json::from_str(&settings.inner)?)
103    }
104
105    /// Create a new invoice
106    async fn create_incoming_payment_request(
107        &self,
108        unit: &cdk_common::CurrencyUnit,
109        options: CdkIncomingPaymentOptions,
110    ) -> Result<CreateIncomingPaymentResponse, Self::Err> {
111        let mut inner = self.inner.clone();
112
113        let proto_options = match options {
114            CdkIncomingPaymentOptions::Bolt11(opts) => IncomingPaymentOptions {
115                options: Some(super::incoming_payment_options::Options::Bolt11(
116                    super::Bolt11IncomingPaymentOptions {
117                        description: opts.description,
118                        amount: opts.amount.into(),
119                        unix_expiry: opts.unix_expiry,
120                    },
121                )),
122            },
123            CdkIncomingPaymentOptions::Bolt12(opts) => IncomingPaymentOptions {
124                options: Some(super::incoming_payment_options::Options::Bolt12(
125                    super::Bolt12IncomingPaymentOptions {
126                        description: opts.description,
127                        amount: opts.amount.map(Into::into),
128                        unix_expiry: opts.unix_expiry,
129                    },
130                )),
131            },
132        };
133
134        let response = inner
135            .create_payment(Request::new(CreatePaymentRequest {
136                unit: unit.to_string(),
137                options: Some(proto_options),
138            }))
139            .await
140            .map_err(|err| {
141                tracing::error!("Could not create payment request: {}", err);
142                cdk_common::payment::Error::Custom(err.to_string())
143            })?;
144
145        let response = response.into_inner();
146
147        Ok(response.try_into().map_err(|_| {
148            cdk_common::payment::Error::Anyhow(anyhow!("Could not create create payment response"))
149        })?)
150    }
151
152    async fn get_payment_quote(
153        &self,
154        unit: &cdk_common::CurrencyUnit,
155        options: cdk_common::payment::OutgoingPaymentOptions,
156    ) -> Result<CdkPaymentQuoteResponse, Self::Err> {
157        let mut inner = self.inner.clone();
158
159        let request_type = match &options {
160            cdk_common::payment::OutgoingPaymentOptions::Bolt11(_) => {
161                OutgoingPaymentRequestType::Bolt11Invoice
162            }
163            cdk_common::payment::OutgoingPaymentOptions::Bolt12(_) => {
164                OutgoingPaymentRequestType::Bolt12Offer
165            }
166        };
167
168        let proto_request = match &options {
169            cdk_common::payment::OutgoingPaymentOptions::Bolt11(opts) => opts.bolt11.to_string(),
170            cdk_common::payment::OutgoingPaymentOptions::Bolt12(opts) => opts.offer.to_string(),
171        };
172
173        let proto_options = match &options {
174            cdk_common::payment::OutgoingPaymentOptions::Bolt11(opts) => opts.melt_options,
175            cdk_common::payment::OutgoingPaymentOptions::Bolt12(opts) => opts.melt_options,
176        };
177
178        let response = inner
179            .get_payment_quote(Request::new(PaymentQuoteRequest {
180                request: proto_request,
181                unit: unit.to_string(),
182                options: proto_options.map(Into::into),
183                request_type: request_type.into(),
184            }))
185            .await
186            .map_err(|err| {
187                tracing::error!("Could not get payment quote: {}", err);
188                cdk_common::payment::Error::Custom(err.to_string())
189            })?;
190
191        let response = response.into_inner();
192
193        Ok(response.into())
194    }
195
196    async fn make_payment(
197        &self,
198        _unit: &cdk_common::CurrencyUnit,
199        options: cdk_common::payment::OutgoingPaymentOptions,
200    ) -> Result<CdkMakePaymentResponse, Self::Err> {
201        let mut inner = self.inner.clone();
202
203        let payment_options = match options {
204            cdk_common::payment::OutgoingPaymentOptions::Bolt11(opts) => {
205                super::OutgoingPaymentVariant {
206                    options: Some(super::outgoing_payment_variant::Options::Bolt11(
207                        super::Bolt11OutgoingPaymentOptions {
208                            bolt11: opts.bolt11.to_string(),
209                            max_fee_amount: opts.max_fee_amount.map(Into::into),
210                            timeout_secs: opts.timeout_secs,
211                            melt_options: opts.melt_options.map(Into::into),
212                        },
213                    )),
214                }
215            }
216            cdk_common::payment::OutgoingPaymentOptions::Bolt12(opts) => {
217                super::OutgoingPaymentVariant {
218                    options: Some(super::outgoing_payment_variant::Options::Bolt12(
219                        super::Bolt12OutgoingPaymentOptions {
220                            offer: opts.offer.to_string(),
221                            max_fee_amount: opts.max_fee_amount.map(Into::into),
222                            timeout_secs: opts.timeout_secs,
223                            melt_options: opts.melt_options.map(Into::into),
224                        },
225                    )),
226                }
227            }
228        };
229
230        let response = inner
231            .make_payment(Request::new(MakePaymentRequest {
232                payment_options: Some(payment_options),
233                partial_amount: None,
234                max_fee_amount: None,
235            }))
236            .await
237            .map_err(|err| {
238                tracing::error!("Could not pay payment request: {}", err);
239
240                if err.message().contains("already paid") {
241                    cdk_common::payment::Error::InvoiceAlreadyPaid
242                } else if err.message().contains("pending") {
243                    cdk_common::payment::Error::InvoicePaymentPending
244                } else {
245                    cdk_common::payment::Error::Custom(err.to_string())
246                }
247            })?;
248
249        let response = response.into_inner();
250
251        Ok(response.try_into().map_err(|_err| {
252            cdk_common::payment::Error::Anyhow(anyhow!("could not make payment"))
253        })?)
254    }
255
256    #[instrument(skip_all)]
257    async fn wait_payment_event(
258        &self,
259    ) -> Result<Pin<Box<dyn Stream<Item = cdk_common::payment::Event> + Send>>, Self::Err> {
260        self.wait_incoming_payment_stream_is_active
261            .store(true, Ordering::SeqCst);
262        tracing::debug!("Client waiting for payment");
263        let mut inner = self.inner.clone();
264        let stream = inner
265            .wait_incoming_payment(EmptyRequest {})
266            .await
267            .map_err(|err| {
268                tracing::error!("Could not check incoming payment stream: {}", err);
269                cdk_common::payment::Error::Custom(err.to_string())
270            })?
271            .into_inner();
272
273        let cancel_token = self.cancel_incoming_payment_listener.clone();
274        let cancel_fut = cancel_token.cancelled_owned();
275        let active_flag = self.wait_incoming_payment_stream_is_active.clone();
276
277        let transformed_stream = stream
278            .take_until(cancel_fut)
279            .filter_map(|item| async {
280                match item {
281                    Ok(value) => match value.try_into() {
282                        Ok(payment_response) => Some(cdk_common::payment::Event::PaymentReceived(
283                            payment_response,
284                        )),
285                        Err(e) => {
286                            tracing::error!("Error converting payment response: {}", e);
287                            None
288                        }
289                    },
290                    Err(e) => {
291                        tracing::error!("Error in payment stream: {}", e);
292                        None
293                    }
294                }
295            })
296            .inspect(move |_| {
297                active_flag.store(false, Ordering::SeqCst);
298                tracing::info!("Payment stream inactive");
299            });
300
301        Ok(Box::pin(transformed_stream))
302    }
303
304    /// Is wait invoice active
305    fn is_wait_invoice_active(&self) -> bool {
306        self.wait_incoming_payment_stream_is_active
307            .load(Ordering::SeqCst)
308    }
309
310    /// Cancel wait invoice
311    fn cancel_wait_invoice(&self) {
312        self.cancel_incoming_payment_listener.cancel();
313    }
314
315    async fn check_incoming_payment_status(
316        &self,
317        payment_identifier: &cdk_common::payment::PaymentIdentifier,
318    ) -> Result<Vec<WaitPaymentResponse>, Self::Err> {
319        let mut inner = self.inner.clone();
320        let response = inner
321            .check_incoming_payment(Request::new(CheckIncomingPaymentRequest {
322                request_identifier: Some(payment_identifier.clone().into()),
323            }))
324            .await
325            .map_err(|err| {
326                tracing::error!("Could not check incoming payment: {}", err);
327                cdk_common::payment::Error::Custom(err.to_string())
328            })?;
329
330        let check_incoming = response.into_inner();
331        check_incoming
332            .payments
333            .into_iter()
334            .map(|resp| resp.try_into().map_err(Self::Err::from))
335            .collect()
336    }
337
338    async fn check_outgoing_payment(
339        &self,
340        payment_identifier: &cdk_common::payment::PaymentIdentifier,
341    ) -> Result<CdkMakePaymentResponse, Self::Err> {
342        let mut inner = self.inner.clone();
343        let response = inner
344            .check_outgoing_payment(Request::new(CheckOutgoingPaymentRequest {
345                request_identifier: Some(payment_identifier.clone().into()),
346            }))
347            .await
348            .map_err(|err| {
349                tracing::error!("Could not check outgoing payment: {}", err);
350                cdk_common::payment::Error::Custom(err.to_string())
351            })?;
352
353        let check_outgoing = response.into_inner();
354
355        Ok(check_outgoing
356            .try_into()
357            .map_err(|_| cdk_common::payment::Error::UnknownPaymentState)?)
358    }
359}