// cdk_payment_processor/proto/client.rs
use std::path::PathBuf;
2use std::pin::Pin;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::Arc;
5
6use anyhow::anyhow;
7use cdk_common::payment::{
8 CreateIncomingPaymentResponse, IncomingPaymentOptions as CdkIncomingPaymentOptions,
9 MakePaymentResponse as CdkMakePaymentResponse, MintPayment,
10 PaymentQuoteResponse as CdkPaymentQuoteResponse, WaitPaymentResponse,
11};
12use futures::{Stream, StreamExt};
13use serde_json::Value;
14use tokio_util::sync::CancellationToken;
15use tonic::transport::{Certificate, Channel, ClientTlsConfig, Identity};
16use tonic::{async_trait, Request};
17use tracing::instrument;
18
19use crate::proto::cdk_payment_processor_client::CdkPaymentProcessorClient;
20use crate::proto::{
21 CheckIncomingPaymentRequest, CheckOutgoingPaymentRequest, CreatePaymentRequest, EmptyRequest,
22 IncomingPaymentOptions, MakePaymentRequest, OutgoingPaymentRequestType, PaymentQuoteRequest,
23};
24
/// gRPC client for a remote CDK payment processor.
///
/// Cloning is supported; the `Arc` around the activity flag means clones observe the same
/// flag value.
#[derive(Clone)]
pub struct PaymentProcessorClient {
    /// gRPC client stub used for every request, backed by a tonic [`Channel`].
    inner: CdkPaymentProcessorClient<Channel>,
    /// Flag reported by `is_wait_invoice_active`; raised by `wait_payment_event`.
    wait_incoming_payment_stream_is_active: Arc<AtomicBool>,
    /// Token cancelled by `cancel_wait_invoice` to stop the incoming-payment stream.
    cancel_incoming_payment_listener: CancellationToken,
}
32
33impl PaymentProcessorClient {
34 pub async fn new(addr: &str, port: u16, tls_dir: Option<PathBuf>) -> anyhow::Result<Self> {
36 let addr = format!("{addr}:{port}");
37 let channel = if let Some(tls_dir) = tls_dir {
38 let ca_pem_path = tls_dir.join("ca.pem");
42 if !ca_pem_path.exists() {
43 let err_msg = format!("CA certificate file not found: {}", ca_pem_path.display());
44 tracing::error!("{}", err_msg);
45 return Err(anyhow!(err_msg));
46 }
47
48 let client_pem_path = tls_dir.join("client.pem");
50
51 let client_key_path = tls_dir.join("client.key");
53 let server_root_ca_cert = std::fs::read_to_string(&ca_pem_path)?;
55 let server_root_ca_cert = Certificate::from_pem(server_root_ca_cert);
56 let tls: ClientTlsConfig = match client_pem_path.exists() && client_key_path.exists() {
57 true => {
58 let client_cert = std::fs::read_to_string(&client_pem_path)?;
59 let client_key = std::fs::read_to_string(&client_key_path)?;
60 let client_identity = Identity::from_pem(client_cert, client_key);
61 ClientTlsConfig::new()
62 .ca_certificate(server_root_ca_cert)
63 .identity(client_identity)
64 }
65 false => ClientTlsConfig::new().ca_certificate(server_root_ca_cert),
66 };
67 Channel::from_shared(addr)?
68 .tls_config(tls)?
69 .connect()
70 .await?
71 } else {
72 Channel::from_shared(addr)?.connect().await?
74 };
75
76 let client = CdkPaymentProcessorClient::new(channel);
77
78 Ok(Self {
79 inner: client,
80 wait_incoming_payment_stream_is_active: Arc::new(AtomicBool::new(false)),
81 cancel_incoming_payment_listener: CancellationToken::new(),
82 })
83 }
84}
85
86#[async_trait]
87impl MintPayment for PaymentProcessorClient {
88 type Err = cdk_common::payment::Error;
89
90 async fn get_settings(&self) -> Result<Value, Self::Err> {
91 let mut inner = self.inner.clone();
92 let response = inner
93 .get_settings(Request::new(EmptyRequest {}))
94 .await
95 .map_err(|err| {
96 tracing::error!("Could not get settings: {}", err);
97 cdk_common::payment::Error::Custom(err.to_string())
98 })?;
99
100 let settings = response.into_inner();
101
102 Ok(serde_json::from_str(&settings.inner)?)
103 }
104
105 async fn create_incoming_payment_request(
107 &self,
108 unit: &cdk_common::CurrencyUnit,
109 options: CdkIncomingPaymentOptions,
110 ) -> Result<CreateIncomingPaymentResponse, Self::Err> {
111 let mut inner = self.inner.clone();
112
113 let proto_options = match options {
114 CdkIncomingPaymentOptions::Bolt11(opts) => IncomingPaymentOptions {
115 options: Some(super::incoming_payment_options::Options::Bolt11(
116 super::Bolt11IncomingPaymentOptions {
117 description: opts.description,
118 amount: opts.amount.into(),
119 unix_expiry: opts.unix_expiry,
120 },
121 )),
122 },
123 CdkIncomingPaymentOptions::Bolt12(opts) => IncomingPaymentOptions {
124 options: Some(super::incoming_payment_options::Options::Bolt12(
125 super::Bolt12IncomingPaymentOptions {
126 description: opts.description,
127 amount: opts.amount.map(Into::into),
128 unix_expiry: opts.unix_expiry,
129 },
130 )),
131 },
132 };
133
134 let response = inner
135 .create_payment(Request::new(CreatePaymentRequest {
136 unit: unit.to_string(),
137 options: Some(proto_options),
138 }))
139 .await
140 .map_err(|err| {
141 tracing::error!("Could not create payment request: {}", err);
142 cdk_common::payment::Error::Custom(err.to_string())
143 })?;
144
145 let response = response.into_inner();
146
147 Ok(response.try_into().map_err(|_| {
148 cdk_common::payment::Error::Anyhow(anyhow!("Could not create create payment response"))
149 })?)
150 }
151
152 async fn get_payment_quote(
153 &self,
154 unit: &cdk_common::CurrencyUnit,
155 options: cdk_common::payment::OutgoingPaymentOptions,
156 ) -> Result<CdkPaymentQuoteResponse, Self::Err> {
157 let mut inner = self.inner.clone();
158
159 let request_type = match &options {
160 cdk_common::payment::OutgoingPaymentOptions::Bolt11(_) => {
161 OutgoingPaymentRequestType::Bolt11Invoice
162 }
163 cdk_common::payment::OutgoingPaymentOptions::Bolt12(_) => {
164 OutgoingPaymentRequestType::Bolt12Offer
165 }
166 };
167
168 let proto_request = match &options {
169 cdk_common::payment::OutgoingPaymentOptions::Bolt11(opts) => opts.bolt11.to_string(),
170 cdk_common::payment::OutgoingPaymentOptions::Bolt12(opts) => opts.offer.to_string(),
171 };
172
173 let proto_options = match &options {
174 cdk_common::payment::OutgoingPaymentOptions::Bolt11(opts) => opts.melt_options,
175 cdk_common::payment::OutgoingPaymentOptions::Bolt12(opts) => opts.melt_options,
176 };
177
178 let response = inner
179 .get_payment_quote(Request::new(PaymentQuoteRequest {
180 request: proto_request,
181 unit: unit.to_string(),
182 options: proto_options.map(Into::into),
183 request_type: request_type.into(),
184 }))
185 .await
186 .map_err(|err| {
187 tracing::error!("Could not get payment quote: {}", err);
188 cdk_common::payment::Error::Custom(err.to_string())
189 })?;
190
191 let response = response.into_inner();
192
193 Ok(response.into())
194 }
195
196 async fn make_payment(
197 &self,
198 _unit: &cdk_common::CurrencyUnit,
199 options: cdk_common::payment::OutgoingPaymentOptions,
200 ) -> Result<CdkMakePaymentResponse, Self::Err> {
201 let mut inner = self.inner.clone();
202
203 let payment_options = match options {
204 cdk_common::payment::OutgoingPaymentOptions::Bolt11(opts) => {
205 super::OutgoingPaymentVariant {
206 options: Some(super::outgoing_payment_variant::Options::Bolt11(
207 super::Bolt11OutgoingPaymentOptions {
208 bolt11: opts.bolt11.to_string(),
209 max_fee_amount: opts.max_fee_amount.map(Into::into),
210 timeout_secs: opts.timeout_secs,
211 melt_options: opts.melt_options.map(Into::into),
212 },
213 )),
214 }
215 }
216 cdk_common::payment::OutgoingPaymentOptions::Bolt12(opts) => {
217 super::OutgoingPaymentVariant {
218 options: Some(super::outgoing_payment_variant::Options::Bolt12(
219 super::Bolt12OutgoingPaymentOptions {
220 offer: opts.offer.to_string(),
221 max_fee_amount: opts.max_fee_amount.map(Into::into),
222 timeout_secs: opts.timeout_secs,
223 melt_options: opts.melt_options.map(Into::into),
224 },
225 )),
226 }
227 }
228 };
229
230 let response = inner
231 .make_payment(Request::new(MakePaymentRequest {
232 payment_options: Some(payment_options),
233 partial_amount: None,
234 max_fee_amount: None,
235 }))
236 .await
237 .map_err(|err| {
238 tracing::error!("Could not pay payment request: {}", err);
239
240 if err.message().contains("already paid") {
241 cdk_common::payment::Error::InvoiceAlreadyPaid
242 } else if err.message().contains("pending") {
243 cdk_common::payment::Error::InvoicePaymentPending
244 } else {
245 cdk_common::payment::Error::Custom(err.to_string())
246 }
247 })?;
248
249 let response = response.into_inner();
250
251 Ok(response.try_into().map_err(|_err| {
252 cdk_common::payment::Error::Anyhow(anyhow!("could not make payment"))
253 })?)
254 }
255
256 #[instrument(skip_all)]
257 async fn wait_payment_event(
258 &self,
259 ) -> Result<Pin<Box<dyn Stream<Item = cdk_common::payment::Event> + Send>>, Self::Err> {
260 self.wait_incoming_payment_stream_is_active
261 .store(true, Ordering::SeqCst);
262 tracing::debug!("Client waiting for payment");
263 let mut inner = self.inner.clone();
264 let stream = inner
265 .wait_incoming_payment(EmptyRequest {})
266 .await
267 .map_err(|err| {
268 tracing::error!("Could not check incoming payment stream: {}", err);
269 cdk_common::payment::Error::Custom(err.to_string())
270 })?
271 .into_inner();
272
273 let cancel_token = self.cancel_incoming_payment_listener.clone();
274 let cancel_fut = cancel_token.cancelled_owned();
275 let active_flag = self.wait_incoming_payment_stream_is_active.clone();
276
277 let transformed_stream = stream
278 .take_until(cancel_fut)
279 .filter_map(|item| async {
280 match item {
281 Ok(value) => match value.try_into() {
282 Ok(payment_response) => Some(cdk_common::payment::Event::PaymentReceived(
283 payment_response,
284 )),
285 Err(e) => {
286 tracing::error!("Error converting payment response: {}", e);
287 None
288 }
289 },
290 Err(e) => {
291 tracing::error!("Error in payment stream: {}", e);
292 None
293 }
294 }
295 })
296 .inspect(move |_| {
297 active_flag.store(false, Ordering::SeqCst);
298 tracing::info!("Payment stream inactive");
299 });
300
301 Ok(Box::pin(transformed_stream))
302 }
303
304 fn is_wait_invoice_active(&self) -> bool {
306 self.wait_incoming_payment_stream_is_active
307 .load(Ordering::SeqCst)
308 }
309
310 fn cancel_wait_invoice(&self) {
312 self.cancel_incoming_payment_listener.cancel();
313 }
314
315 async fn check_incoming_payment_status(
316 &self,
317 payment_identifier: &cdk_common::payment::PaymentIdentifier,
318 ) -> Result<Vec<WaitPaymentResponse>, Self::Err> {
319 let mut inner = self.inner.clone();
320 let response = inner
321 .check_incoming_payment(Request::new(CheckIncomingPaymentRequest {
322 request_identifier: Some(payment_identifier.clone().into()),
323 }))
324 .await
325 .map_err(|err| {
326 tracing::error!("Could not check incoming payment: {}", err);
327 cdk_common::payment::Error::Custom(err.to_string())
328 })?;
329
330 let check_incoming = response.into_inner();
331 check_incoming
332 .payments
333 .into_iter()
334 .map(|resp| resp.try_into().map_err(Self::Err::from))
335 .collect()
336 }
337
338 async fn check_outgoing_payment(
339 &self,
340 payment_identifier: &cdk_common::payment::PaymentIdentifier,
341 ) -> Result<CdkMakePaymentResponse, Self::Err> {
342 let mut inner = self.inner.clone();
343 let response = inner
344 .check_outgoing_payment(Request::new(CheckOutgoingPaymentRequest {
345 request_identifier: Some(payment_identifier.clone().into()),
346 }))
347 .await
348 .map_err(|err| {
349 tracing::error!("Could not check outgoing payment: {}", err);
350 cdk_common::payment::Error::Custom(err.to_string())
351 })?;
352
353 let check_outgoing = response.into_inner();
354
355 Ok(check_outgoing
356 .try_into()
357 .map_err(|_| cdk_common::payment::Error::UnknownPaymentState)?)
358 }
359}