1use std::path::PathBuf;
2use std::pin::Pin;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::Arc;
5use std::task::{Context, Poll};
6
7use anyhow::anyhow;
8use cdk_common::grpc::{VersionInterceptor, VERSION_HEADER};
9use cdk_common::payment::{
10 CreateIncomingPaymentResponse, IncomingPaymentOptions as CdkIncomingPaymentOptions,
11 MakePaymentResponse as CdkMakePaymentResponse, MintPayment,
12 PaymentQuoteResponse as CdkPaymentQuoteResponse, WaitPaymentResponse,
13};
14use futures::{Stream, StreamExt};
15use tokio::sync::Mutex;
16use tokio_util::sync::CancellationToken;
17use tonic::codegen::InterceptedService;
18use tonic::transport::{Certificate, Channel, ClientTlsConfig, Identity};
19use tonic::{async_trait, Request};
20use tracing::instrument;
21
22use crate::proto::cdk_payment_processor_client::CdkPaymentProcessorClient;
23use crate::proto::{
24 CheckIncomingPaymentRequest, CheckOutgoingPaymentRequest, CreatePaymentRequest, EmptyRequest,
25 IncomingPaymentOptions, IntoProtoAmount, MakePaymentRequest, OutgoingPaymentRequestType,
26 PaymentQuoteRequest,
27};
28
29#[derive(Clone)]
31pub struct PaymentProcessorClient {
32 inner: CdkPaymentProcessorClient<InterceptedService<Channel, VersionInterceptor>>,
33 payment_event_stream_is_active: Arc<AtomicBool>,
34 cancel_payment_event_stream: Arc<Mutex<CancellationToken>>,
35}
36
37struct ActivePaymentEventStream {
38 inner: Pin<Box<dyn Stream<Item = cdk_common::payment::Event> + Send>>,
39 active_flag: Arc<AtomicBool>,
40}
41
42impl ActivePaymentEventStream {
43 fn new(
44 inner: Pin<Box<dyn Stream<Item = cdk_common::payment::Event> + Send>>,
45 active_flag: Arc<AtomicBool>,
46 ) -> Self {
47 Self { inner, active_flag }
48 }
49}
50
51impl Drop for ActivePaymentEventStream {
52 fn drop(&mut self) {
53 self.active_flag.store(false, Ordering::SeqCst);
54 tracing::info!("Payment event stream inactive");
55 }
56}
57
58impl Stream for ActivePaymentEventStream {
59 type Item = cdk_common::payment::Event;
60
61 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
62 let this = self.get_mut();
63 this.inner.as_mut().poll_next(cx)
64 }
65}
66
67impl std::fmt::Debug for PaymentProcessorClient {
68 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69 f.debug_struct("PaymentProcessorClient")
70 .finish_non_exhaustive()
71 }
72}
73
74impl PaymentProcessorClient {
75 pub async fn new(addr: &str, port: u16, tls_dir: Option<PathBuf>) -> anyhow::Result<Self> {
77 let addr = format!("{addr}:{port}");
78 let channel = if let Some(tls_dir) = tls_dir {
79 let ca_pem_path = tls_dir.join("ca.pem");
83 if !ca_pem_path.exists() {
84 let err_msg = format!("CA certificate file not found: {}", ca_pem_path.display());
85 tracing::error!("{}", err_msg);
86 return Err(anyhow!(err_msg));
87 }
88
89 let client_pem_path = tls_dir.join("client.pem");
91
92 let client_key_path = tls_dir.join("client.key");
94 let server_root_ca_cert = std::fs::read_to_string(&ca_pem_path)?;
96 let server_root_ca_cert = Certificate::from_pem(server_root_ca_cert);
97 let tls: ClientTlsConfig = match client_pem_path.exists() && client_key_path.exists() {
98 true => {
99 let client_cert = std::fs::read_to_string(&client_pem_path)?;
100 let client_key = std::fs::read_to_string(&client_key_path)?;
101 let client_identity = Identity::from_pem(client_cert, client_key);
102 ClientTlsConfig::new()
103 .ca_certificate(server_root_ca_cert)
104 .identity(client_identity)
105 }
106 false => ClientTlsConfig::new().ca_certificate(server_root_ca_cert),
107 };
108 Channel::from_shared(addr)?
109 .tls_config(tls)?
110 .connect()
111 .await?
112 } else {
113 Channel::from_shared(addr)?.connect().await?
115 };
116
117 let interceptor = VersionInterceptor::new(
118 VERSION_HEADER,
119 cdk_common::PAYMENT_PROCESSOR_PROTOCOL_VERSION,
120 );
121 let client = CdkPaymentProcessorClient::with_interceptor(channel, interceptor);
122
123 Ok(Self {
124 inner: client,
125 payment_event_stream_is_active: Arc::new(AtomicBool::new(false)),
126 cancel_payment_event_stream: Arc::new(Mutex::new(CancellationToken::new())),
127 })
128 }
129}
130
131#[async_trait]
132impl MintPayment for PaymentProcessorClient {
133 type Err = cdk_common::payment::Error;
134
135 async fn get_settings(&self) -> Result<cdk_common::payment::SettingsResponse, Self::Err> {
136 let mut inner = self.inner.clone();
137 let response = inner
138 .get_settings(Request::new(EmptyRequest {}))
139 .await
140 .map_err(|err| {
141 tracing::error!("Could not get settings: {}", err);
142 cdk_common::payment::Error::Custom(err.to_string())
143 })?;
144
145 let settings = response.into_inner();
146
147 Ok(cdk_common::payment::SettingsResponse {
148 unit: settings.unit,
149 bolt11: settings
150 .bolt11
151 .map(|b| cdk_common::payment::Bolt11Settings {
152 mpp: b.mpp,
153 amountless: b.amountless,
154 invoice_description: b.invoice_description,
155 }),
156 bolt12: settings
157 .bolt12
158 .map(|b| cdk_common::payment::Bolt12Settings {
159 amountless: b.amountless,
160 }),
161 onchain: settings
162 .onchain
163 .map(|o| cdk_common::payment::OnchainSettings {
164 confirmations: o.confirmations,
165 min_receive_amount_sat: o.min_receive_amount_sat,
166 min_send_amount_sat: o.min_send_amount_sat,
167 }),
168 custom: settings.custom,
169 })
170 }
171
172 async fn create_incoming_payment_request(
174 &self,
175 options: CdkIncomingPaymentOptions,
176 ) -> Result<CreateIncomingPaymentResponse, Self::Err> {
177 let mut inner = self.inner.clone();
178
179 let proto_options = match options {
180 CdkIncomingPaymentOptions::Custom(opts) => IncomingPaymentOptions {
181 options: Some(super::incoming_payment_options::Options::Custom(
182 super::CustomIncomingPaymentOptions {
183 description: opts.description,
184 amount: Some(opts.amount.into()),
185 unix_expiry: opts.unix_expiry,
186 extra_json: opts.extra_json.clone(),
187 },
188 )),
189 },
190 CdkIncomingPaymentOptions::Bolt11(opts) => IncomingPaymentOptions {
191 options: Some(super::incoming_payment_options::Options::Bolt11(
192 super::Bolt11IncomingPaymentOptions {
193 description: opts.description,
194 amount: Some(opts.amount.into()),
195 unix_expiry: opts.unix_expiry,
196 },
197 )),
198 },
199 CdkIncomingPaymentOptions::Bolt12(opts) => IncomingPaymentOptions {
200 options: Some(super::incoming_payment_options::Options::Bolt12(
201 super::Bolt12IncomingPaymentOptions {
202 description: opts.description,
203 amount: opts.amount.map(Into::into),
204 unix_expiry: opts.unix_expiry,
205 },
206 )),
207 },
208 CdkIncomingPaymentOptions::Onchain(opts) => IncomingPaymentOptions {
209 options: Some(super::incoming_payment_options::Options::Onchain(
210 super::OnchainIncomingPaymentOptions {
211 quote_id: opts.quote_id.to_string(),
212 },
213 )),
214 },
215 };
216
217 let response = inner
218 .create_payment(Request::new(CreatePaymentRequest {
219 options: Some(proto_options),
220 }))
221 .await
222 .map_err(|err| {
223 tracing::error!("Could not create payment request: {}", err);
224 cdk_common::payment::Error::Custom(err.to_string())
225 })?;
226
227 let response = response.into_inner();
228
229 Ok(response.try_into().map_err(|_| {
230 cdk_common::payment::Error::Anyhow(anyhow!("Could not create create payment response"))
231 })?)
232 }
233
234 async fn get_payment_quote(
235 &self,
236 unit: &cdk_common::CurrencyUnit,
237 options: cdk_common::payment::OutgoingPaymentOptions,
238 ) -> Result<CdkPaymentQuoteResponse, Self::Err> {
239 let mut inner = self.inner.clone();
240
241 let request_type = match &options {
242 cdk_common::payment::OutgoingPaymentOptions::Custom(_) => {
243 OutgoingPaymentRequestType::Custom
244 }
245 cdk_common::payment::OutgoingPaymentOptions::Bolt11(_) => {
246 OutgoingPaymentRequestType::Bolt11Invoice
247 }
248 cdk_common::payment::OutgoingPaymentOptions::Bolt12(_) => {
249 OutgoingPaymentRequestType::Bolt12Offer
250 }
251 cdk_common::payment::OutgoingPaymentOptions::Onchain(_) => {
252 OutgoingPaymentRequestType::Onchain
253 }
254 };
255
256 let proto_request = match &options {
257 cdk_common::payment::OutgoingPaymentOptions::Custom(opts) => opts.request.to_string(),
258 cdk_common::payment::OutgoingPaymentOptions::Bolt11(opts) => opts.bolt11.to_string(),
259 cdk_common::payment::OutgoingPaymentOptions::Bolt12(opts) => opts.offer.to_string(),
260 cdk_common::payment::OutgoingPaymentOptions::Onchain(opts) => opts.address.clone(),
261 };
262
263 let proto_options = match &options {
264 cdk_common::payment::OutgoingPaymentOptions::Custom(opts) => opts.melt_options,
265 cdk_common::payment::OutgoingPaymentOptions::Bolt11(opts) => opts.melt_options,
266 cdk_common::payment::OutgoingPaymentOptions::Bolt12(opts) => opts.melt_options,
267 cdk_common::payment::OutgoingPaymentOptions::Onchain(_) => None,
268 };
269
270 let onchain_options = match &options {
271 cdk_common::payment::OutgoingPaymentOptions::Onchain(opts) => {
272 Some(super::OnchainOutgoingPaymentOptions {
273 address: opts.address.clone(),
274 amount: Some(opts.amount.clone().into()),
275 max_fee_amount: opts.max_fee_amount.clone().into_proto(),
276 quote_id: opts.quote_id.to_string(),
277 fee_index: opts.fee_index,
278 metadata: opts.metadata.clone(),
279 })
280 }
281 _ => None,
282 };
283
284 let extra_json = match &options {
285 cdk_common::payment::OutgoingPaymentOptions::Custom(opts) => opts.extra_json.clone(),
286 _ => None,
287 };
288
289 let quote_id = match &options {
290 cdk_common::payment::OutgoingPaymentOptions::Custom(opts) => opts.quote_id.to_string(),
291 cdk_common::payment::OutgoingPaymentOptions::Bolt11(opts) => opts.quote_id.to_string(),
292 cdk_common::payment::OutgoingPaymentOptions::Bolt12(opts) => opts.quote_id.to_string(),
293 cdk_common::payment::OutgoingPaymentOptions::Onchain(opts) => opts.quote_id.to_string(),
294 };
295
296 let response = inner
297 .get_payment_quote(Request::new(PaymentQuoteRequest {
298 request: proto_request,
299 unit: unit.to_string(),
300 options: proto_options.map(Into::into),
301 request_type: request_type.into(),
302 extra_json,
303 quote_id,
304 onchain_options,
305 }))
306 .await
307 .map_err(|err| {
308 tracing::error!("Could not get payment quote: {}", err);
309 cdk_common::payment::Error::Custom(err.to_string())
310 })?;
311
312 let response = response.into_inner();
313
314 Ok(response.try_into().map_err(|_| {
315 cdk_common::payment::Error::Custom(
316 "Failed to convert payment quote response".to_string(),
317 )
318 })?)
319 }
320
321 async fn make_payment(
322 &self,
323 unit: &cdk_common::CurrencyUnit,
324 options: cdk_common::payment::OutgoingPaymentOptions,
325 ) -> Result<CdkMakePaymentResponse, Self::Err> {
326 let mut inner = self.inner.clone();
327 let payment_options = match options {
328 cdk_common::payment::OutgoingPaymentOptions::Custom(opts) => {
329 super::OutgoingPaymentVariant {
330 options: Some(super::outgoing_payment_variant::Options::Custom(
331 super::CustomOutgoingPaymentOptions {
332 offer: opts.request.to_string(),
333 max_fee_amount: opts.max_fee_amount.into_proto(),
334 timeout_secs: opts.timeout_secs,
335 melt_options: opts.melt_options.map(Into::into),
336 extra_json: opts.extra_json.clone(),
337 quote_id: opts.quote_id.to_string(),
338 },
339 )),
340 }
341 }
342 cdk_common::payment::OutgoingPaymentOptions::Bolt11(opts) => {
343 super::OutgoingPaymentVariant {
344 options: Some(super::outgoing_payment_variant::Options::Bolt11(
345 super::Bolt11OutgoingPaymentOptions {
346 bolt11: opts.bolt11.to_string(),
347 max_fee_amount: opts.max_fee_amount.into_proto(),
348 timeout_secs: opts.timeout_secs,
349 melt_options: opts.melt_options.map(Into::into),
350 quote_id: opts.quote_id.to_string(),
351 },
352 )),
353 }
354 }
355 cdk_common::payment::OutgoingPaymentOptions::Bolt12(opts) => {
356 super::OutgoingPaymentVariant {
357 options: Some(super::outgoing_payment_variant::Options::Bolt12(
358 super::Bolt12OutgoingPaymentOptions {
359 offer: opts.offer.to_string(),
360 max_fee_amount: opts.max_fee_amount.into_proto(),
361 timeout_secs: opts.timeout_secs,
362 melt_options: opts.melt_options.map(Into::into),
363 quote_id: opts.quote_id.to_string(),
364 },
365 )),
366 }
367 }
368 cdk_common::payment::OutgoingPaymentOptions::Onchain(opts) => {
369 super::OutgoingPaymentVariant {
370 options: Some(super::outgoing_payment_variant::Options::Onchain(
371 super::OnchainOutgoingPaymentOptions {
372 address: opts.address.clone(),
373 amount: Some(opts.amount.into()),
374 max_fee_amount: opts.max_fee_amount.into_proto(),
375 quote_id: opts.quote_id.to_string(),
376 fee_index: opts.fee_index,
377 metadata: opts.metadata.clone(),
378 },
379 )),
380 }
381 }
382 };
383
384 let response = inner
385 .make_payment(Request::new(MakePaymentRequest {
386 payment_options: Some(payment_options),
387 partial_amount: None,
388 max_fee_amount: None,
389 unit: unit.to_string(),
390 }))
391 .await
392 .map_err(|err| {
393 tracing::error!("Could not pay payment request: {}", err);
394
395 if err.message().contains("already paid") {
396 cdk_common::payment::Error::InvoiceAlreadyPaid
397 } else if err.message().contains("pending") {
398 cdk_common::payment::Error::InvoicePaymentPending
399 } else {
400 cdk_common::payment::Error::Custom(err.to_string())
401 }
402 })?;
403
404 let response = response.into_inner();
405
406 Ok(response.try_into().map_err(|_err| {
407 cdk_common::payment::Error::Anyhow(anyhow!("could not make payment"))
408 })?)
409 }
410
411 #[instrument(skip_all)]
412 async fn wait_payment_event(
413 &self,
414 ) -> Result<Pin<Box<dyn Stream<Item = cdk_common::payment::Event> + Send>>, Self::Err> {
415 tracing::debug!("Client waiting for payment");
416 let mut inner = self.inner.clone();
417 let stream = inner
418 .wait_payment_event(Request::new(EmptyRequest {}))
419 .await
420 .map_err(|err| {
421 self.payment_event_stream_is_active
422 .store(false, Ordering::SeqCst);
423 tracing::error!("Could not open payment event stream: {}", err);
424 cdk_common::payment::Error::Custom(err.to_string())
425 })?
426 .into_inner();
427
428 self.payment_event_stream_is_active
429 .store(true, Ordering::SeqCst);
430
431 let cancel_token = self.cancel_payment_event_stream.lock().await.clone();
432 let cancel_fut = cancel_token.cancelled_owned();
433 let active_flag = self.payment_event_stream_is_active.clone();
434
435 let transformed_stream = stream.take_until(cancel_fut).filter_map(|item| async {
436 match item {
437 Ok(value) => match value.try_into() {
438 Ok(payment_event) => Some(payment_event),
439 Err(e) => {
440 tracing::error!("Error converting payment event: {}", e);
441 None
442 }
443 },
444 Err(e) => {
445 tracing::error!("Error in payment event stream: {}", e);
446 None
447 }
448 }
449 });
450
451 Ok(Box::pin(ActivePaymentEventStream::new(
452 Box::pin(transformed_stream),
453 active_flag,
454 )))
455 }
456
457 fn is_payment_event_stream_active(&self) -> bool {
459 self.payment_event_stream_is_active.load(Ordering::SeqCst)
460 }
461
462 fn cancel_payment_event_stream(&self) {
464 let cancel_payment_event_stream = Arc::clone(&self.cancel_payment_event_stream);
465
466 tokio::spawn(async move {
467 let mut cancel_token = cancel_payment_event_stream.lock().await;
468 cancel_token.cancel();
469 *cancel_token = CancellationToken::new();
470 });
471 }
472
473 async fn check_incoming_payment_status(
474 &self,
475 payment_identifier: &cdk_common::payment::PaymentIdentifier,
476 ) -> Result<Vec<WaitPaymentResponse>, Self::Err> {
477 let mut inner = self.inner.clone();
478 let response = inner
479 .check_incoming_payment(Request::new(CheckIncomingPaymentRequest {
480 request_identifier: Some(payment_identifier.clone().into()),
481 }))
482 .await
483 .map_err(|err| {
484 tracing::error!("Could not check incoming payment: {}", err);
485 cdk_common::payment::Error::Custom(err.to_string())
486 })?;
487
488 let check_incoming = response.into_inner();
489 check_incoming
490 .payments
491 .into_iter()
492 .map(|resp| resp.try_into().map_err(Self::Err::from))
493 .collect()
494 }
495
496 async fn check_outgoing_payment(
497 &self,
498 payment_identifier: &cdk_common::payment::PaymentIdentifier,
499 ) -> Result<CdkMakePaymentResponse, Self::Err> {
500 let mut inner = self.inner.clone();
501 let response = inner
502 .check_outgoing_payment(Request::new(CheckOutgoingPaymentRequest {
503 request_identifier: Some(payment_identifier.clone().into()),
504 }))
505 .await
506 .map_err(|err| {
507 tracing::error!("Could not check outgoing payment: {}", err);
508 cdk_common::payment::Error::Custom(err.to_string())
509 })?;
510
511 let check_outgoing = response.into_inner();
512
513 Ok(check_outgoing
514 .try_into()
515 .map_err(|_| cdk_common::payment::Error::UnknownPaymentState)?)
516 }
517}