cdk_payment_processor/proto/
client.rs1use std::path::PathBuf;
2use std::pin::Pin;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::Arc;
5
6use anyhow::anyhow;
7use cdk_common::grpc::VERSION_HEADER;
8use cdk_common::payment::{
9 CreateIncomingPaymentResponse, IncomingPaymentOptions as CdkIncomingPaymentOptions,
10 MakePaymentResponse as CdkMakePaymentResponse, MintPayment,
11 PaymentQuoteResponse as CdkPaymentQuoteResponse, WaitPaymentResponse,
12};
13use futures::{Stream, StreamExt};
14use tokio_util::sync::CancellationToken;
15use tonic::metadata::MetadataValue;
16use tonic::transport::{Certificate, Channel, ClientTlsConfig, Identity};
17use tonic::{async_trait, Request};
18use tracing::instrument;
19
20use crate::proto::cdk_payment_processor_client::CdkPaymentProcessorClient;
21use crate::proto::{
22 CheckIncomingPaymentRequest, CheckOutgoingPaymentRequest, CreatePaymentRequest, EmptyRequest,
23 IncomingPaymentOptions, MakePaymentRequest, OutgoingPaymentRequestType, PaymentQuoteRequest,
24};
25
26fn with_version_header<T>(mut request: Request<T>) -> Request<T> {
28 request.metadata_mut().insert(
29 VERSION_HEADER,
30 MetadataValue::from_static(cdk_common::PAYMENT_PROCESSOR_PROTOCOL_VERSION),
31 );
32 request
33}
34
35#[derive(Clone)]
37pub struct PaymentProcessorClient {
38 inner: CdkPaymentProcessorClient<Channel>,
39 wait_incoming_payment_stream_is_active: Arc<AtomicBool>,
40 cancel_incoming_payment_listener: CancellationToken,
41}
42
43impl std::fmt::Debug for PaymentProcessorClient {
44 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45 f.debug_struct("PaymentProcessorClient")
46 .finish_non_exhaustive()
47 }
48}
49
50impl PaymentProcessorClient {
51 pub async fn new(addr: &str, port: u16, tls_dir: Option<PathBuf>) -> anyhow::Result<Self> {
53 let addr = format!("{addr}:{port}");
54 let channel = if let Some(tls_dir) = tls_dir {
55 let ca_pem_path = tls_dir.join("ca.pem");
59 if !ca_pem_path.exists() {
60 let err_msg = format!("CA certificate file not found: {}", ca_pem_path.display());
61 tracing::error!("{}", err_msg);
62 return Err(anyhow!(err_msg));
63 }
64
65 let client_pem_path = tls_dir.join("client.pem");
67
68 let client_key_path = tls_dir.join("client.key");
70 let server_root_ca_cert = std::fs::read_to_string(&ca_pem_path)?;
72 let server_root_ca_cert = Certificate::from_pem(server_root_ca_cert);
73 let tls: ClientTlsConfig = match client_pem_path.exists() && client_key_path.exists() {
74 true => {
75 let client_cert = std::fs::read_to_string(&client_pem_path)?;
76 let client_key = std::fs::read_to_string(&client_key_path)?;
77 let client_identity = Identity::from_pem(client_cert, client_key);
78 ClientTlsConfig::new()
79 .ca_certificate(server_root_ca_cert)
80 .identity(client_identity)
81 }
82 false => ClientTlsConfig::new().ca_certificate(server_root_ca_cert),
83 };
84 Channel::from_shared(addr)?
85 .tls_config(tls)?
86 .connect()
87 .await?
88 } else {
89 Channel::from_shared(addr)?.connect().await?
91 };
92
93 let client = CdkPaymentProcessorClient::new(channel);
94
95 Ok(Self {
96 inner: client,
97 wait_incoming_payment_stream_is_active: Arc::new(AtomicBool::new(false)),
98 cancel_incoming_payment_listener: CancellationToken::new(),
99 })
100 }
101}
102
103#[async_trait]
104impl MintPayment for PaymentProcessorClient {
105 type Err = cdk_common::payment::Error;
106
107 async fn get_settings(&self) -> Result<cdk_common::payment::SettingsResponse, Self::Err> {
108 let mut inner = self.inner.clone();
109 let response = inner
110 .get_settings(with_version_header(Request::new(EmptyRequest {})))
111 .await
112 .map_err(|err| {
113 tracing::error!("Could not get settings: {}", err);
114 cdk_common::payment::Error::Custom(err.to_string())
115 })?;
116
117 let settings = response.into_inner();
118
119 Ok(cdk_common::payment::SettingsResponse {
120 unit: settings.unit,
121 bolt11: settings
122 .bolt11
123 .map(|b| cdk_common::payment::Bolt11Settings {
124 mpp: b.mpp,
125 amountless: b.amountless,
126 invoice_description: b.invoice_description,
127 }),
128 bolt12: settings
129 .bolt12
130 .map(|b| cdk_common::payment::Bolt12Settings {
131 amountless: b.amountless,
132 }),
133 custom: settings.custom,
134 })
135 }
136
137 async fn create_incoming_payment_request(
139 &self,
140 unit: &cdk_common::CurrencyUnit,
141 options: CdkIncomingPaymentOptions,
142 ) -> Result<CreateIncomingPaymentResponse, Self::Err> {
143 let mut inner = self.inner.clone();
144
145 let proto_options = match options {
146 CdkIncomingPaymentOptions::Custom(opts) => IncomingPaymentOptions {
147 options: Some(super::incoming_payment_options::Options::Custom(
148 super::CustomIncomingPaymentOptions {
149 description: opts.description,
150 amount: Some(opts.amount.into()),
151 unix_expiry: opts.unix_expiry,
152 extra_json: opts.extra_json.clone(),
153 },
154 )),
155 },
156 CdkIncomingPaymentOptions::Bolt11(opts) => IncomingPaymentOptions {
157 options: Some(super::incoming_payment_options::Options::Bolt11(
158 super::Bolt11IncomingPaymentOptions {
159 description: opts.description,
160 amount: opts.amount.into(),
161 unix_expiry: opts.unix_expiry,
162 },
163 )),
164 },
165 CdkIncomingPaymentOptions::Bolt12(opts) => IncomingPaymentOptions {
166 options: Some(super::incoming_payment_options::Options::Bolt12(
167 super::Bolt12IncomingPaymentOptions {
168 description: opts.description,
169 amount: opts.amount.map(Into::into),
170 unix_expiry: opts.unix_expiry,
171 },
172 )),
173 },
174 };
175
176 let response = inner
177 .create_payment(with_version_header(Request::new(CreatePaymentRequest {
178 unit: unit.to_string(),
179 options: Some(proto_options),
180 })))
181 .await
182 .map_err(|err| {
183 tracing::error!("Could not create payment request: {}", err);
184 cdk_common::payment::Error::Custom(err.to_string())
185 })?;
186
187 let response = response.into_inner();
188
189 Ok(response.try_into().map_err(|_| {
190 cdk_common::payment::Error::Anyhow(anyhow!("Could not create create payment response"))
191 })?)
192 }
193
194 async fn get_payment_quote(
195 &self,
196 unit: &cdk_common::CurrencyUnit,
197 options: cdk_common::payment::OutgoingPaymentOptions,
198 ) -> Result<CdkPaymentQuoteResponse, Self::Err> {
199 let mut inner = self.inner.clone();
200
201 let request_type = match &options {
202 cdk_common::payment::OutgoingPaymentOptions::Custom(_) => {
203 OutgoingPaymentRequestType::Custom
204 }
205 cdk_common::payment::OutgoingPaymentOptions::Bolt11(_) => {
206 OutgoingPaymentRequestType::Bolt11Invoice
207 }
208 cdk_common::payment::OutgoingPaymentOptions::Bolt12(_) => {
209 OutgoingPaymentRequestType::Bolt12Offer
210 }
211 };
212
213 let proto_request = match &options {
214 cdk_common::payment::OutgoingPaymentOptions::Custom(opts) => opts.request.to_string(),
215 cdk_common::payment::OutgoingPaymentOptions::Bolt11(opts) => opts.bolt11.to_string(),
216 cdk_common::payment::OutgoingPaymentOptions::Bolt12(opts) => opts.offer.to_string(),
217 };
218
219 let proto_options = match &options {
220 cdk_common::payment::OutgoingPaymentOptions::Custom(opts) => opts.melt_options,
221 cdk_common::payment::OutgoingPaymentOptions::Bolt11(opts) => opts.melt_options,
222 cdk_common::payment::OutgoingPaymentOptions::Bolt12(opts) => opts.melt_options,
223 };
224
225 let extra_json = match &options {
226 cdk_common::payment::OutgoingPaymentOptions::Custom(opts) => opts.extra_json.clone(),
227 _ => None,
228 };
229
230 let response = inner
231 .get_payment_quote(with_version_header(Request::new(PaymentQuoteRequest {
232 request: proto_request,
233 unit: unit.to_string(),
234 options: proto_options.map(Into::into),
235 request_type: request_type.into(),
236 extra_json,
237 })))
238 .await
239 .map_err(|err| {
240 tracing::error!("Could not get payment quote: {}", err);
241 cdk_common::payment::Error::Custom(err.to_string())
242 })?;
243
244 let response = response.into_inner();
245
246 Ok(response.into())
247 }
248
249 async fn make_payment(
250 &self,
251 _unit: &cdk_common::CurrencyUnit,
252 options: cdk_common::payment::OutgoingPaymentOptions,
253 ) -> Result<CdkMakePaymentResponse, Self::Err> {
254 let mut inner = self.inner.clone();
255 let payment_options = match options {
256 cdk_common::payment::OutgoingPaymentOptions::Custom(opts) => {
257 super::OutgoingPaymentVariant {
258 options: Some(super::outgoing_payment_variant::Options::Custom(
259 super::CustomOutgoingPaymentOptions {
260 offer: opts.request.to_string(),
261 max_fee_amount: opts.max_fee_amount.map(Into::into),
262 timeout_secs: opts.timeout_secs,
263 melt_options: opts.melt_options.map(Into::into),
264 extra_json: opts.extra_json.clone(),
265 },
266 )),
267 }
268 }
269 cdk_common::payment::OutgoingPaymentOptions::Bolt11(opts) => {
270 super::OutgoingPaymentVariant {
271 options: Some(super::outgoing_payment_variant::Options::Bolt11(
272 super::Bolt11OutgoingPaymentOptions {
273 bolt11: opts.bolt11.to_string(),
274 max_fee_amount: opts.max_fee_amount.map(Into::into),
275 timeout_secs: opts.timeout_secs,
276 melt_options: opts.melt_options.map(Into::into),
277 },
278 )),
279 }
280 }
281 cdk_common::payment::OutgoingPaymentOptions::Bolt12(opts) => {
282 super::OutgoingPaymentVariant {
283 options: Some(super::outgoing_payment_variant::Options::Bolt12(
284 super::Bolt12OutgoingPaymentOptions {
285 offer: opts.offer.to_string(),
286 max_fee_amount: opts.max_fee_amount.map(Into::into),
287 timeout_secs: opts.timeout_secs,
288 melt_options: opts.melt_options.map(Into::into),
289 },
290 )),
291 }
292 }
293 };
294
295 let response = inner
296 .make_payment(with_version_header(Request::new(MakePaymentRequest {
297 payment_options: Some(payment_options),
298 partial_amount: None,
299 max_fee_amount: None,
300 })))
301 .await
302 .map_err(|err| {
303 tracing::error!("Could not pay payment request: {}", err);
304
305 if err.message().contains("already paid") {
306 cdk_common::payment::Error::InvoiceAlreadyPaid
307 } else if err.message().contains("pending") {
308 cdk_common::payment::Error::InvoicePaymentPending
309 } else {
310 cdk_common::payment::Error::Custom(err.to_string())
311 }
312 })?;
313
314 let response = response.into_inner();
315
316 Ok(response.try_into().map_err(|_err| {
317 cdk_common::payment::Error::Anyhow(anyhow!("could not make payment"))
318 })?)
319 }
320
321 #[instrument(skip_all)]
322 async fn wait_payment_event(
323 &self,
324 ) -> Result<Pin<Box<dyn Stream<Item = cdk_common::payment::Event> + Send>>, Self::Err> {
325 self.wait_incoming_payment_stream_is_active
326 .store(true, Ordering::SeqCst);
327 tracing::debug!("Client waiting for payment");
328 let mut inner = self.inner.clone();
329 let stream = inner
330 .wait_incoming_payment(with_version_header(Request::new(EmptyRequest {})))
331 .await
332 .map_err(|err| {
333 tracing::error!("Could not check incoming payment stream: {}", err);
334 cdk_common::payment::Error::Custom(err.to_string())
335 })?
336 .into_inner();
337
338 let cancel_token = self.cancel_incoming_payment_listener.clone();
339 let cancel_fut = cancel_token.cancelled_owned();
340 let active_flag = self.wait_incoming_payment_stream_is_active.clone();
341
342 let transformed_stream = stream
343 .take_until(cancel_fut)
344 .filter_map(|item| async {
345 match item {
346 Ok(value) => match value.try_into() {
347 Ok(payment_response) => Some(cdk_common::payment::Event::PaymentReceived(
348 payment_response,
349 )),
350 Err(e) => {
351 tracing::error!("Error converting payment response: {}", e);
352 None
353 }
354 },
355 Err(e) => {
356 tracing::error!("Error in payment stream: {}", e);
357 None
358 }
359 }
360 })
361 .inspect(move |_| {
362 active_flag.store(false, Ordering::SeqCst);
363 tracing::info!("Payment stream inactive");
364 });
365
366 Ok(Box::pin(transformed_stream))
367 }
368
369 fn is_wait_invoice_active(&self) -> bool {
371 self.wait_incoming_payment_stream_is_active
372 .load(Ordering::SeqCst)
373 }
374
375 fn cancel_wait_invoice(&self) {
377 self.cancel_incoming_payment_listener.cancel();
378 }
379
380 async fn check_incoming_payment_status(
381 &self,
382 payment_identifier: &cdk_common::payment::PaymentIdentifier,
383 ) -> Result<Vec<WaitPaymentResponse>, Self::Err> {
384 let mut inner = self.inner.clone();
385 let response = inner
386 .check_incoming_payment(with_version_header(Request::new(
387 CheckIncomingPaymentRequest {
388 request_identifier: Some(payment_identifier.clone().into()),
389 },
390 )))
391 .await
392 .map_err(|err| {
393 tracing::error!("Could not check incoming payment: {}", err);
394 cdk_common::payment::Error::Custom(err.to_string())
395 })?;
396
397 let check_incoming = response.into_inner();
398 check_incoming
399 .payments
400 .into_iter()
401 .map(|resp| resp.try_into().map_err(Self::Err::from))
402 .collect()
403 }
404
405 async fn check_outgoing_payment(
406 &self,
407 payment_identifier: &cdk_common::payment::PaymentIdentifier,
408 ) -> Result<CdkMakePaymentResponse, Self::Err> {
409 let mut inner = self.inner.clone();
410 let response = inner
411 .check_outgoing_payment(with_version_header(Request::new(
412 CheckOutgoingPaymentRequest {
413 request_identifier: Some(payment_identifier.clone().into()),
414 },
415 )))
416 .await
417 .map_err(|err| {
418 tracing::error!("Could not check outgoing payment: {}", err);
419 cdk_common::payment::Error::Custom(err.to_string())
420 })?;
421
422 let check_outgoing = response.into_inner();
423
424 Ok(check_outgoing
425 .try_into()
426 .map_err(|_| cdk_common::payment::Error::UnknownPaymentState)?)
427 }
428}