cdk_payment_processor/proto/
client.rs1use std::path::PathBuf;
2use std::pin::Pin;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::Arc;
5
6use anyhow::anyhow;
7use cdk_common::payment::{
8 CreateIncomingPaymentResponse, IncomingPaymentOptions as CdkIncomingPaymentOptions,
9 MakePaymentResponse as CdkMakePaymentResponse, MintPayment,
10 PaymentQuoteResponse as CdkPaymentQuoteResponse, WaitPaymentResponse,
11};
12use futures::{Stream, StreamExt};
13use serde_json::Value;
14use tokio_util::sync::CancellationToken;
15use tonic::transport::{Certificate, Channel, ClientTlsConfig, Identity};
16use tonic::{async_trait, Request};
17use tracing::instrument;
18
19use crate::proto::cdk_payment_processor_client::CdkPaymentProcessorClient;
20use crate::proto::{
21 CheckIncomingPaymentRequest, CheckOutgoingPaymentRequest, CreatePaymentRequest, EmptyRequest,
22 IncomingPaymentOptions, MakePaymentRequest, OutgoingPaymentRequestType, PaymentQuoteRequest,
23};
24
25#[derive(Clone)]
27pub struct PaymentProcessorClient {
28 inner: CdkPaymentProcessorClient<Channel>,
29 wait_incoming_payment_stream_is_active: Arc<AtomicBool>,
30 cancel_incoming_payment_listener: CancellationToken,
31}
32
33impl PaymentProcessorClient {
34 pub async fn new(addr: &str, port: u16, tls_dir: Option<PathBuf>) -> anyhow::Result<Self> {
36 let addr = format!("{addr}:{port}");
37 let channel = if let Some(tls_dir) = tls_dir {
38 let ca_pem_path = tls_dir.join("ca.pem");
42 if !ca_pem_path.exists() {
43 let err_msg = format!("CA certificate file not found: {}", ca_pem_path.display());
44 tracing::error!("{}", err_msg);
45 return Err(anyhow!(err_msg));
46 }
47
48 let client_pem_path = tls_dir.join("client.pem");
50 if !client_pem_path.exists() {
51 let err_msg = format!(
52 "Client certificate file not found: {}",
53 client_pem_path.display()
54 );
55 tracing::error!("{}", err_msg);
56 return Err(anyhow!(err_msg));
57 }
58
59 let client_key_path = tls_dir.join("client.key");
61 if !client_key_path.exists() {
62 let err_msg = format!("Client key file not found: {}", client_key_path.display());
63 tracing::error!("{}", err_msg);
64 return Err(anyhow!(err_msg));
65 }
66
67 let server_root_ca_cert = std::fs::read_to_string(&ca_pem_path)?;
68 let server_root_ca_cert = Certificate::from_pem(server_root_ca_cert);
69 let client_cert = std::fs::read_to_string(&client_pem_path)?;
70 let client_key = std::fs::read_to_string(&client_key_path)?;
71 let client_identity = Identity::from_pem(client_cert, client_key);
72 let tls = ClientTlsConfig::new()
73 .ca_certificate(server_root_ca_cert)
74 .identity(client_identity);
75
76 Channel::from_shared(addr)?
77 .tls_config(tls)?
78 .connect()
79 .await?
80 } else {
81 Channel::from_shared(addr)?.connect().await?
83 };
84
85 let client = CdkPaymentProcessorClient::new(channel);
86
87 Ok(Self {
88 inner: client,
89 wait_incoming_payment_stream_is_active: Arc::new(AtomicBool::new(false)),
90 cancel_incoming_payment_listener: CancellationToken::new(),
91 })
92 }
93}
94
95#[async_trait]
96impl MintPayment for PaymentProcessorClient {
97 type Err = cdk_common::payment::Error;
98
99 async fn get_settings(&self) -> Result<Value, Self::Err> {
100 let mut inner = self.inner.clone();
101 let response = inner
102 .get_settings(Request::new(EmptyRequest {}))
103 .await
104 .map_err(|err| {
105 tracing::error!("Could not get settings: {}", err);
106 cdk_common::payment::Error::Custom(err.to_string())
107 })?;
108
109 let settings = response.into_inner();
110
111 Ok(serde_json::from_str(&settings.inner)?)
112 }
113
114 async fn create_incoming_payment_request(
116 &self,
117 unit: &cdk_common::CurrencyUnit,
118 options: CdkIncomingPaymentOptions,
119 ) -> Result<CreateIncomingPaymentResponse, Self::Err> {
120 let mut inner = self.inner.clone();
121
122 let proto_options = match options {
123 CdkIncomingPaymentOptions::Bolt11(opts) => IncomingPaymentOptions {
124 options: Some(super::incoming_payment_options::Options::Bolt11(
125 super::Bolt11IncomingPaymentOptions {
126 description: opts.description,
127 amount: opts.amount.into(),
128 unix_expiry: opts.unix_expiry,
129 },
130 )),
131 },
132 CdkIncomingPaymentOptions::Bolt12(opts) => IncomingPaymentOptions {
133 options: Some(super::incoming_payment_options::Options::Bolt12(
134 super::Bolt12IncomingPaymentOptions {
135 description: opts.description,
136 amount: opts.amount.map(Into::into),
137 unix_expiry: opts.unix_expiry,
138 },
139 )),
140 },
141 };
142
143 let response = inner
144 .create_payment(Request::new(CreatePaymentRequest {
145 unit: unit.to_string(),
146 options: Some(proto_options),
147 }))
148 .await
149 .map_err(|err| {
150 tracing::error!("Could not create payment request: {}", err);
151 cdk_common::payment::Error::Custom(err.to_string())
152 })?;
153
154 let response = response.into_inner();
155
156 Ok(response.try_into().map_err(|_| {
157 cdk_common::payment::Error::Anyhow(anyhow!("Could not create create payment response"))
158 })?)
159 }
160
161 async fn get_payment_quote(
162 &self,
163 unit: &cdk_common::CurrencyUnit,
164 options: cdk_common::payment::OutgoingPaymentOptions,
165 ) -> Result<CdkPaymentQuoteResponse, Self::Err> {
166 let mut inner = self.inner.clone();
167
168 let request_type = match &options {
169 cdk_common::payment::OutgoingPaymentOptions::Bolt11(_) => {
170 OutgoingPaymentRequestType::Bolt11Invoice
171 }
172 cdk_common::payment::OutgoingPaymentOptions::Bolt12(_) => {
173 OutgoingPaymentRequestType::Bolt12Offer
174 }
175 };
176
177 let proto_request = match &options {
178 cdk_common::payment::OutgoingPaymentOptions::Bolt11(opts) => opts.bolt11.to_string(),
179 cdk_common::payment::OutgoingPaymentOptions::Bolt12(opts) => opts.offer.to_string(),
180 };
181
182 let proto_options = match &options {
183 cdk_common::payment::OutgoingPaymentOptions::Bolt11(opts) => opts.melt_options,
184 cdk_common::payment::OutgoingPaymentOptions::Bolt12(opts) => opts.melt_options,
185 };
186
187 let response = inner
188 .get_payment_quote(Request::new(PaymentQuoteRequest {
189 request: proto_request,
190 unit: unit.to_string(),
191 options: proto_options.map(Into::into),
192 request_type: request_type.into(),
193 }))
194 .await
195 .map_err(|err| {
196 tracing::error!("Could not get payment quote: {}", err);
197 cdk_common::payment::Error::Custom(err.to_string())
198 })?;
199
200 let response = response.into_inner();
201
202 Ok(response.into())
203 }
204
205 async fn make_payment(
206 &self,
207 _unit: &cdk_common::CurrencyUnit,
208 options: cdk_common::payment::OutgoingPaymentOptions,
209 ) -> Result<CdkMakePaymentResponse, Self::Err> {
210 let mut inner = self.inner.clone();
211
212 let payment_options = match options {
213 cdk_common::payment::OutgoingPaymentOptions::Bolt11(opts) => {
214 super::OutgoingPaymentVariant {
215 options: Some(super::outgoing_payment_variant::Options::Bolt11(
216 super::Bolt11OutgoingPaymentOptions {
217 bolt11: opts.bolt11.to_string(),
218 max_fee_amount: opts.max_fee_amount.map(Into::into),
219 timeout_secs: opts.timeout_secs,
220 melt_options: opts.melt_options.map(Into::into),
221 },
222 )),
223 }
224 }
225 cdk_common::payment::OutgoingPaymentOptions::Bolt12(opts) => {
226 super::OutgoingPaymentVariant {
227 options: Some(super::outgoing_payment_variant::Options::Bolt12(
228 super::Bolt12OutgoingPaymentOptions {
229 offer: opts.offer.to_string(),
230 max_fee_amount: opts.max_fee_amount.map(Into::into),
231 timeout_secs: opts.timeout_secs,
232 melt_options: opts.melt_options.map(Into::into),
233 },
234 )),
235 }
236 }
237 };
238
239 let response = inner
240 .make_payment(Request::new(MakePaymentRequest {
241 payment_options: Some(payment_options),
242 partial_amount: None,
243 max_fee_amount: None,
244 }))
245 .await
246 .map_err(|err| {
247 tracing::error!("Could not pay payment request: {}", err);
248
249 if err.message().contains("already paid") {
250 cdk_common::payment::Error::InvoiceAlreadyPaid
251 } else if err.message().contains("pending") {
252 cdk_common::payment::Error::InvoicePaymentPending
253 } else {
254 cdk_common::payment::Error::Custom(err.to_string())
255 }
256 })?;
257
258 let response = response.into_inner();
259
260 Ok(response.try_into().map_err(|_err| {
261 cdk_common::payment::Error::Anyhow(anyhow!("could not make payment"))
262 })?)
263 }
264
265 #[instrument(skip_all)]
266 async fn wait_payment_event(
267 &self,
268 ) -> Result<Pin<Box<dyn Stream<Item = cdk_common::payment::Event> + Send>>, Self::Err> {
269 self.wait_incoming_payment_stream_is_active
270 .store(true, Ordering::SeqCst);
271 tracing::debug!("Client waiting for payment");
272 let mut inner = self.inner.clone();
273 let stream = inner
274 .wait_incoming_payment(EmptyRequest {})
275 .await
276 .map_err(|err| {
277 tracing::error!("Could not check incoming payment stream: {}", err);
278 cdk_common::payment::Error::Custom(err.to_string())
279 })?
280 .into_inner();
281
282 let cancel_token = self.cancel_incoming_payment_listener.clone();
283 let cancel_fut = cancel_token.cancelled_owned();
284 let active_flag = self.wait_incoming_payment_stream_is_active.clone();
285
286 let transformed_stream = stream
287 .take_until(cancel_fut)
288 .filter_map(|item| async {
289 match item {
290 Ok(value) => match value.try_into() {
291 Ok(payment_response) => Some(cdk_common::payment::Event::PaymentReceived(
292 payment_response,
293 )),
294 Err(e) => {
295 tracing::error!("Error converting payment response: {}", e);
296 None
297 }
298 },
299 Err(e) => {
300 tracing::error!("Error in payment stream: {}", e);
301 None
302 }
303 }
304 })
305 .inspect(move |_| {
306 active_flag.store(false, Ordering::SeqCst);
307 tracing::info!("Payment stream inactive");
308 });
309
310 Ok(Box::pin(transformed_stream))
311 }
312
313 fn is_wait_invoice_active(&self) -> bool {
315 self.wait_incoming_payment_stream_is_active
316 .load(Ordering::SeqCst)
317 }
318
319 fn cancel_wait_invoice(&self) {
321 self.cancel_incoming_payment_listener.cancel();
322 }
323
324 async fn check_incoming_payment_status(
325 &self,
326 payment_identifier: &cdk_common::payment::PaymentIdentifier,
327 ) -> Result<Vec<WaitPaymentResponse>, Self::Err> {
328 let mut inner = self.inner.clone();
329 let response = inner
330 .check_incoming_payment(Request::new(CheckIncomingPaymentRequest {
331 request_identifier: Some(payment_identifier.clone().into()),
332 }))
333 .await
334 .map_err(|err| {
335 tracing::error!("Could not check incoming payment: {}", err);
336 cdk_common::payment::Error::Custom(err.to_string())
337 })?;
338
339 let check_incoming = response.into_inner();
340 check_incoming
341 .payments
342 .into_iter()
343 .map(|resp| resp.try_into().map_err(Self::Err::from))
344 .collect()
345 }
346
347 async fn check_outgoing_payment(
348 &self,
349 payment_identifier: &cdk_common::payment::PaymentIdentifier,
350 ) -> Result<CdkMakePaymentResponse, Self::Err> {
351 let mut inner = self.inner.clone();
352 let response = inner
353 .check_outgoing_payment(Request::new(CheckOutgoingPaymentRequest {
354 request_identifier: Some(payment_identifier.clone().into()),
355 }))
356 .await
357 .map_err(|err| {
358 tracing::error!("Could not check outgoing payment: {}", err);
359 cdk_common::payment::Error::Custom(err.to_string())
360 })?;
361
362 let check_outgoing = response.into_inner();
363
364 Ok(check_outgoing
365 .try_into()
366 .map_err(|_| cdk_common::payment::Error::UnknownPaymentState)?)
367 }
368}