cdk_payment_processor/proto/
client.rs1use std::path::PathBuf;
2use std::pin::Pin;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::Arc;
5
6use anyhow::anyhow;
7use cdk_common::grpc::{VersionInterceptor, VERSION_HEADER};
8use cdk_common::payment::{
9 CreateIncomingPaymentResponse, IncomingPaymentOptions as CdkIncomingPaymentOptions,
10 MakePaymentResponse as CdkMakePaymentResponse, MintPayment,
11 PaymentQuoteResponse as CdkPaymentQuoteResponse, WaitPaymentResponse,
12};
13use futures::{Stream, StreamExt};
14use tokio_util::sync::CancellationToken;
15use tonic::codegen::InterceptedService;
16use tonic::transport::{Certificate, Channel, ClientTlsConfig, Identity};
17use tonic::{async_trait, Request};
18use tracing::instrument;
19
20use crate::proto::cdk_payment_processor_client::CdkPaymentProcessorClient;
21use crate::proto::{
22 CheckIncomingPaymentRequest, CheckOutgoingPaymentRequest, CreatePaymentRequest, EmptyRequest,
23 IncomingPaymentOptions, IntoProtoAmount, MakePaymentRequest, OutgoingPaymentRequestType,
24 PaymentQuoteRequest,
25};
26
27#[derive(Clone)]
29pub struct PaymentProcessorClient {
30 inner: CdkPaymentProcessorClient<InterceptedService<Channel, VersionInterceptor>>,
31 wait_incoming_payment_stream_is_active: Arc<AtomicBool>,
32 cancel_incoming_payment_listener: CancellationToken,
33}
34
35impl std::fmt::Debug for PaymentProcessorClient {
36 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37 f.debug_struct("PaymentProcessorClient")
38 .finish_non_exhaustive()
39 }
40}
41
42impl PaymentProcessorClient {
43 pub async fn new(addr: &str, port: u16, tls_dir: Option<PathBuf>) -> anyhow::Result<Self> {
45 let addr = format!("{addr}:{port}");
46 let channel = if let Some(tls_dir) = tls_dir {
47 let ca_pem_path = tls_dir.join("ca.pem");
51 if !ca_pem_path.exists() {
52 let err_msg = format!("CA certificate file not found: {}", ca_pem_path.display());
53 tracing::error!("{}", err_msg);
54 return Err(anyhow!(err_msg));
55 }
56
57 let client_pem_path = tls_dir.join("client.pem");
59
60 let client_key_path = tls_dir.join("client.key");
62 let server_root_ca_cert = std::fs::read_to_string(&ca_pem_path)?;
64 let server_root_ca_cert = Certificate::from_pem(server_root_ca_cert);
65 let tls: ClientTlsConfig = match client_pem_path.exists() && client_key_path.exists() {
66 true => {
67 let client_cert = std::fs::read_to_string(&client_pem_path)?;
68 let client_key = std::fs::read_to_string(&client_key_path)?;
69 let client_identity = Identity::from_pem(client_cert, client_key);
70 ClientTlsConfig::new()
71 .ca_certificate(server_root_ca_cert)
72 .identity(client_identity)
73 }
74 false => ClientTlsConfig::new().ca_certificate(server_root_ca_cert),
75 };
76 Channel::from_shared(addr)?
77 .tls_config(tls)?
78 .connect()
79 .await?
80 } else {
81 Channel::from_shared(addr)?.connect().await?
83 };
84
85 let interceptor = VersionInterceptor::new(
86 VERSION_HEADER,
87 cdk_common::PAYMENT_PROCESSOR_PROTOCOL_VERSION,
88 );
89 let client = CdkPaymentProcessorClient::with_interceptor(channel, interceptor);
90
91 Ok(Self {
92 inner: client,
93 wait_incoming_payment_stream_is_active: Arc::new(AtomicBool::new(false)),
94 cancel_incoming_payment_listener: CancellationToken::new(),
95 })
96 }
97}
98
99#[async_trait]
100impl MintPayment for PaymentProcessorClient {
101 type Err = cdk_common::payment::Error;
102
103 async fn get_settings(&self) -> Result<cdk_common::payment::SettingsResponse, Self::Err> {
104 let mut inner = self.inner.clone();
105 let response = inner
106 .get_settings(Request::new(EmptyRequest {}))
107 .await
108 .map_err(|err| {
109 tracing::error!("Could not get settings: {}", err);
110 cdk_common::payment::Error::Custom(err.to_string())
111 })?;
112
113 let settings = response.into_inner();
114
115 Ok(cdk_common::payment::SettingsResponse {
116 unit: settings.unit,
117 bolt11: settings
118 .bolt11
119 .map(|b| cdk_common::payment::Bolt11Settings {
120 mpp: b.mpp,
121 amountless: b.amountless,
122 invoice_description: b.invoice_description,
123 }),
124 bolt12: settings
125 .bolt12
126 .map(|b| cdk_common::payment::Bolt12Settings {
127 amountless: b.amountless,
128 }),
129 custom: settings.custom,
130 })
131 }
132
133 async fn create_incoming_payment_request(
135 &self,
136 options: CdkIncomingPaymentOptions,
137 ) -> Result<CreateIncomingPaymentResponse, Self::Err> {
138 let mut inner = self.inner.clone();
139
140 let proto_options = match options {
141 CdkIncomingPaymentOptions::Custom(opts) => IncomingPaymentOptions {
142 options: Some(super::incoming_payment_options::Options::Custom(
143 super::CustomIncomingPaymentOptions {
144 description: opts.description,
145 amount: Some(opts.amount.into()),
146 unix_expiry: opts.unix_expiry,
147 extra_json: opts.extra_json.clone(),
148 },
149 )),
150 },
151 CdkIncomingPaymentOptions::Bolt11(opts) => IncomingPaymentOptions {
152 options: Some(super::incoming_payment_options::Options::Bolt11(
153 super::Bolt11IncomingPaymentOptions {
154 description: opts.description,
155 amount: Some(opts.amount.into()),
156 unix_expiry: opts.unix_expiry,
157 },
158 )),
159 },
160 CdkIncomingPaymentOptions::Bolt12(opts) => IncomingPaymentOptions {
161 options: Some(super::incoming_payment_options::Options::Bolt12(
162 super::Bolt12IncomingPaymentOptions {
163 description: opts.description,
164 amount: opts.amount.map(Into::into),
165 unix_expiry: opts.unix_expiry,
166 },
167 )),
168 },
169 };
170
171 let response = inner
172 .create_payment(Request::new(CreatePaymentRequest {
173 options: Some(proto_options),
174 }))
175 .await
176 .map_err(|err| {
177 tracing::error!("Could not create payment request: {}", err);
178 cdk_common::payment::Error::Custom(err.to_string())
179 })?;
180
181 let response = response.into_inner();
182
183 Ok(response.try_into().map_err(|_| {
184 cdk_common::payment::Error::Anyhow(anyhow!("Could not create create payment response"))
185 })?)
186 }
187
188 async fn get_payment_quote(
189 &self,
190 unit: &cdk_common::CurrencyUnit,
191 options: cdk_common::payment::OutgoingPaymentOptions,
192 ) -> Result<CdkPaymentQuoteResponse, Self::Err> {
193 let mut inner = self.inner.clone();
194
195 let request_type = match &options {
196 cdk_common::payment::OutgoingPaymentOptions::Custom(_) => {
197 OutgoingPaymentRequestType::Custom
198 }
199 cdk_common::payment::OutgoingPaymentOptions::Bolt11(_) => {
200 OutgoingPaymentRequestType::Bolt11Invoice
201 }
202 cdk_common::payment::OutgoingPaymentOptions::Bolt12(_) => {
203 OutgoingPaymentRequestType::Bolt12Offer
204 }
205 };
206
207 let proto_request = match &options {
208 cdk_common::payment::OutgoingPaymentOptions::Custom(opts) => opts.request.to_string(),
209 cdk_common::payment::OutgoingPaymentOptions::Bolt11(opts) => opts.bolt11.to_string(),
210 cdk_common::payment::OutgoingPaymentOptions::Bolt12(opts) => opts.offer.to_string(),
211 };
212
213 let proto_options = match &options {
214 cdk_common::payment::OutgoingPaymentOptions::Custom(opts) => opts.melt_options,
215 cdk_common::payment::OutgoingPaymentOptions::Bolt11(opts) => opts.melt_options,
216 cdk_common::payment::OutgoingPaymentOptions::Bolt12(opts) => opts.melt_options,
217 };
218
219 let extra_json = match &options {
220 cdk_common::payment::OutgoingPaymentOptions::Custom(opts) => opts.extra_json.clone(),
221 _ => None,
222 };
223
224 let response = inner
225 .get_payment_quote(Request::new(PaymentQuoteRequest {
226 request: proto_request,
227 unit: unit.to_string(),
228 options: proto_options.map(Into::into),
229 request_type: request_type.into(),
230 extra_json,
231 }))
232 .await
233 .map_err(|err| {
234 tracing::error!("Could not get payment quote: {}", err);
235 cdk_common::payment::Error::Custom(err.to_string())
236 })?;
237
238 let response = response.into_inner();
239
240 Ok(response.try_into().map_err(|_| {
241 cdk_common::payment::Error::Custom(
242 "Failed to convert payment quote response".to_string(),
243 )
244 })?)
245 }
246
247 async fn make_payment(
248 &self,
249 unit: &cdk_common::CurrencyUnit,
250 options: cdk_common::payment::OutgoingPaymentOptions,
251 ) -> Result<CdkMakePaymentResponse, Self::Err> {
252 let mut inner = self.inner.clone();
253 let payment_options = match options {
254 cdk_common::payment::OutgoingPaymentOptions::Custom(opts) => {
255 super::OutgoingPaymentVariant {
256 options: Some(super::outgoing_payment_variant::Options::Custom(
257 super::CustomOutgoingPaymentOptions {
258 offer: opts.request.to_string(),
259 max_fee_amount: opts.max_fee_amount.into_proto(),
260 timeout_secs: opts.timeout_secs,
261 melt_options: opts.melt_options.map(Into::into),
262 extra_json: opts.extra_json.clone(),
263 },
264 )),
265 }
266 }
267 cdk_common::payment::OutgoingPaymentOptions::Bolt11(opts) => {
268 super::OutgoingPaymentVariant {
269 options: Some(super::outgoing_payment_variant::Options::Bolt11(
270 super::Bolt11OutgoingPaymentOptions {
271 bolt11: opts.bolt11.to_string(),
272 max_fee_amount: opts.max_fee_amount.into_proto(),
273 timeout_secs: opts.timeout_secs,
274 melt_options: opts.melt_options.map(Into::into),
275 },
276 )),
277 }
278 }
279 cdk_common::payment::OutgoingPaymentOptions::Bolt12(opts) => {
280 super::OutgoingPaymentVariant {
281 options: Some(super::outgoing_payment_variant::Options::Bolt12(
282 super::Bolt12OutgoingPaymentOptions {
283 offer: opts.offer.to_string(),
284 max_fee_amount: opts.max_fee_amount.into_proto(),
285 timeout_secs: opts.timeout_secs,
286 melt_options: opts.melt_options.map(Into::into),
287 },
288 )),
289 }
290 }
291 };
292
293 let response = inner
294 .make_payment(Request::new(MakePaymentRequest {
295 payment_options: Some(payment_options),
296 partial_amount: None,
297 max_fee_amount: None,
298 unit: unit.to_string(),
299 }))
300 .await
301 .map_err(|err| {
302 tracing::error!("Could not pay payment request: {}", err);
303
304 if err.message().contains("already paid") {
305 cdk_common::payment::Error::InvoiceAlreadyPaid
306 } else if err.message().contains("pending") {
307 cdk_common::payment::Error::InvoicePaymentPending
308 } else {
309 cdk_common::payment::Error::Custom(err.to_string())
310 }
311 })?;
312
313 let response = response.into_inner();
314
315 Ok(response.try_into().map_err(|_err| {
316 cdk_common::payment::Error::Anyhow(anyhow!("could not make payment"))
317 })?)
318 }
319
320 #[instrument(skip_all)]
321 async fn wait_payment_event(
322 &self,
323 ) -> Result<Pin<Box<dyn Stream<Item = cdk_common::payment::Event> + Send>>, Self::Err> {
324 self.wait_incoming_payment_stream_is_active
325 .store(true, Ordering::SeqCst);
326 tracing::debug!("Client waiting for payment");
327 let mut inner = self.inner.clone();
328 let stream = inner
329 .wait_incoming_payment(Request::new(EmptyRequest {}))
330 .await
331 .map_err(|err| {
332 tracing::error!("Could not check incoming payment stream: {}", err);
333 cdk_common::payment::Error::Custom(err.to_string())
334 })?
335 .into_inner();
336
337 let cancel_token = self.cancel_incoming_payment_listener.clone();
338 let cancel_fut = cancel_token.cancelled_owned();
339 let active_flag = self.wait_incoming_payment_stream_is_active.clone();
340
341 let transformed_stream = stream
342 .take_until(cancel_fut)
343 .filter_map(|item| async {
344 match item {
345 Ok(value) => match value.try_into() {
346 Ok(payment_response) => Some(cdk_common::payment::Event::PaymentReceived(
347 payment_response,
348 )),
349 Err(e) => {
350 tracing::error!("Error converting payment response: {}", e);
351 None
352 }
353 },
354 Err(e) => {
355 tracing::error!("Error in payment stream: {}", e);
356 None
357 }
358 }
359 })
360 .inspect(move |_| {
361 active_flag.store(false, Ordering::SeqCst);
362 tracing::info!("Payment stream inactive");
363 });
364
365 Ok(Box::pin(transformed_stream))
366 }
367
368 fn is_wait_invoice_active(&self) -> bool {
370 self.wait_incoming_payment_stream_is_active
371 .load(Ordering::SeqCst)
372 }
373
374 fn cancel_wait_invoice(&self) {
376 self.cancel_incoming_payment_listener.cancel();
377 }
378
379 async fn check_incoming_payment_status(
380 &self,
381 payment_identifier: &cdk_common::payment::PaymentIdentifier,
382 ) -> Result<Vec<WaitPaymentResponse>, Self::Err> {
383 let mut inner = self.inner.clone();
384 let response = inner
385 .check_incoming_payment(Request::new(CheckIncomingPaymentRequest {
386 request_identifier: Some(payment_identifier.clone().into()),
387 }))
388 .await
389 .map_err(|err| {
390 tracing::error!("Could not check incoming payment: {}", err);
391 cdk_common::payment::Error::Custom(err.to_string())
392 })?;
393
394 let check_incoming = response.into_inner();
395 check_incoming
396 .payments
397 .into_iter()
398 .map(|resp| resp.try_into().map_err(Self::Err::from))
399 .collect()
400 }
401
402 async fn check_outgoing_payment(
403 &self,
404 payment_identifier: &cdk_common::payment::PaymentIdentifier,
405 ) -> Result<CdkMakePaymentResponse, Self::Err> {
406 let mut inner = self.inner.clone();
407 let response = inner
408 .check_outgoing_payment(Request::new(CheckOutgoingPaymentRequest {
409 request_identifier: Some(payment_identifier.clone().into()),
410 }))
411 .await
412 .map_err(|err| {
413 tracing::error!("Could not check outgoing payment: {}", err);
414 cdk_common::payment::Error::Custom(err.to_string())
415 })?;
416
417 let check_outgoing = response.into_inner();
418
419 Ok(check_outgoing
420 .try_into()
421 .map_err(|_| cdk_common::payment::Error::UnknownPaymentState)?)
422 }
423}