1use crate::lnd::Route;
2use crate::lnd::{AddInvoiceResponse, FeeLimit, LndClient, LndSendPaymentSyncReq};
3use anyhow::Result;
4use redis::aio::Connection;
5use core::fmt;
6use rand::seq::SliceRandom;
7use serde::{Deserialize, Serialize};
8use std::fmt::Display;
9extern crate redis;
10use redis::{AsyncCommands, FromRedisValue};
11
12pub struct Cluster {
13 pub nodes: Vec<Node>,
14 pub cache: Connection,
15 pub inv_exp_sec: i64,
16 pub addr_exp_sec: i64,
17 pub utxo_exp_sec: i64,
18}
19
20#[derive(Clone)]
21pub struct Node {
22 pub pubkey: String,
23 pub ip: String,
24 pub port: String,
25 pub network: NodeNetwork,
26 pub lightning_impl: NodeLightningImpl,
27 pub client: NodeClient,
28}
29
30#[derive(Clone)]
31pub enum NodeClient {
32 Lnd(LndClient),
33 CLightning,
34 Eclair,
35 Other,
36}
/// Bitcoin network a node operates on.
///
/// Derives `Debug`, `PartialEq` and `Eq` in addition to `Clone` so values can
/// be compared and printed in diagnostics and tests.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum NodeNetwork {
    /// Bitcoin main network.
    Mainnet,
    /// Bitcoin test network.
    Testnet,
}
42
/// Lightning implementation a node runs.
///
/// Derives `Debug`, `PartialEq` and `Eq` in addition to `Clone` so values can
/// be compared and printed in diagnostics and tests.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum NodeLightningImpl {
    /// LND.
    Lnd,
    /// Core Lightning.
    CLightning,
    /// Eclair.
    Eclair,
    /// Any other implementation.
    Other,
}
50
51#[derive(Serialize, Deserialize, Debug)]
52pub struct ClusterAddInvoice {
53 pub pubkey: Option<String>,
54 pub memo: String,
55 pub value: i64,
56 pub expiry: i64,
57}
58
59#[derive(Serialize, Deserialize, Debug, Clone)]
60pub struct ClusterLookupInvoice {
61 pub pubkey: String,
62 pub memo: String,
63 pub r_preimage: String,
64 pub r_hash: String,
65 pub value: String,
66 pub settle_date: String,
67 pub payment_request: String,
68 pub description_hash: String,
69 pub expiry: String,
70 pub amt_paid_sat: String,
71 pub state: ClusterInvoiceState,
72}
73
74impl FromRedisValue for ClusterLookupInvoice {
75 fn from_redis_value(v: &redis::Value) -> redis::RedisResult<Self> {
76 match v {
77 redis::Value::Okay => {
78 return Ok(ClusterLookupInvoice {
79 pubkey: "".to_string(),
80 memo: "".to_string(),
81 r_preimage: "".to_string(),
82 r_hash: "".to_string(),
83 value: "".to_string(),
84 settle_date: "".to_string(),
85 payment_request: "".to_string(),
86 description_hash: "".to_string(),
87 expiry: "".to_string(),
88 amt_paid_sat: "".to_string(),
89 state: ClusterInvoiceState::Open,
90 })
91 },
92 redis::Value::Data(data) => {
93 let json = String::from_utf8(data.to_vec()).unwrap();
94 let invoice: ClusterLookupInvoice = serde_json::from_str(&json).unwrap();
95 return Ok(invoice)
96 },
97 _ => panic!("Invalid redis value"),
98 };
99 }
100}
101
102#[derive(Serialize, Deserialize, Debug, Clone)]
103pub struct ClusterPayPaymentRequestRes {
104 pub pubkey: String,
105 pub payment_error: Option<String>,
106 pub payment_preimage: Option<String>,
107 pub payment_route: Option<Route>,
108 pub payment_hash: Option<String>,
109}
110
111#[derive(Deserialize, Debug, Clone, Serialize)]
112pub struct ClusterUtxos {
113 pub utxos: Vec<ClusterUtxo>,
114}
115
116impl FromRedisValue for ClusterUtxos {
117 fn from_redis_value(v: &redis::Value) -> redis::RedisResult<Self> {
118 match v {
119 redis::Value::Okay => {
120 return Ok(ClusterUtxos {
121 utxos: vec![],
122 })
123 },
124 redis::Value::Data(data) => {
125 let json = String::from_utf8(data.to_vec()).unwrap();
126 let utxos: ClusterUtxos = serde_json::from_str(&json).unwrap();
127 return Ok(utxos)
128 },
129 _ => panic!("Invalid redis value"),
130 }
131 }
132}
133
134#[derive(Debug, Clone, Deserialize, Serialize)]
135pub struct ClusterUtxo {
136 pub pubkey: String,
137 pub address: String,
138 pub amount: u64,
139 pub confirmations: u64,
140}
141
142#[derive(Serialize, Deserialize, Debug, Clone)]
143pub enum ClusterInvoiceState {
144 #[serde(rename = "OPEN")]
145 Open = 0,
146 #[serde(rename = "SETTLED")]
147 Settled = 1,
148 #[serde(rename = "CANCELED")]
149 Canceled = 2,
150 #[serde(rename = "ACCEPTED")]
151 Accepted = 3,
152}
153
154impl Node {
155 pub async fn lookup_invoice(self: &Self, r_hash: &str) -> Result<ClusterLookupInvoice> {
156 match &self.client {
157 NodeClient::Lnd(client) => {
158 let invoice = client.lookup_invoice(r_hash).await?;
159 Ok(invoice.to_cluster(&self.pubkey))
160 }
161 _ => {
162 panic!("We only support LND nodes at this time.")
163 }
164 }
165 }
166
167 pub async fn add_invoice(self: &Self, req: ClusterAddInvoice) -> Result<AddInvoiceResponse> {
168 match &self.client {
169 NodeClient::Lnd(client) => {
170 let invoice = client.add_invoice(req).await?;
171
172 let response = AddInvoiceResponse {
173 r_hash: to_hex(&invoice.r_hash)?,
174 payment_addr: to_hex(&invoice.payment_addr)?,
175 ..invoice
176 };
177 Ok(response)
178 }
179 _ => {
180 panic!("We only support LND nodes at this time.")
181 }
182 }
183 }
184
185 pub async fn next_address(&self) -> Result<String> {
186 match &self.client {
187 NodeClient::Lnd(client) => {
188 let addr = client.new_address().await?;
189 Ok(addr.address)
190 }
191 _ => {
192 panic!("We only support LND nodes at this time.")
193 }
194 }
195 }
196
197 pub async fn list_utxos(&self) -> Result<ClusterUtxos> {
198 match &self.client {
199 NodeClient::Lnd(client) => {
200 let utxos = client.list_unspent().await?;
201 let cluster_utxos = ClusterUtxos {
202 utxos: utxos
203 .utxos
204 .into_iter()
205 .map(|utxo| ClusterUtxo {
206 pubkey: self.pubkey.clone(),
207 address: utxo.address,
208 amount: utxo.amount_sat.parse::<u64>().unwrap(),
209 confirmations: utxo.confirmations.parse::<u64>().unwrap(),
210 })
211 .collect(),
212 };
213 Ok(cluster_utxos)
214 }
215 _ => {
216 panic!("We only support LND nodes at this time.")
217 }
218 }
219 }
220}
221
222impl Cluster {
223 pub fn new(
224 nodes: Vec<Node>,
225 redis: redis::aio::Connection,
226 inv_exp_sec: i64,
227 addr_exp_sec: i64,
228 utxo_exp_sec: i64,
229 ) -> Cluster {
230 Self {
231 nodes,
232 cache: redis,
233 inv_exp_sec: inv_exp_sec,
234 addr_exp_sec: addr_exp_sec,
235 utxo_exp_sec: utxo_exp_sec,
236 }
237 }
238
239 pub async fn lookup_invoice(
240 &mut self,
241 r_hash: &str,
242 pubkey: Option<String>,
243 ) -> Result<ClusterLookupInvoice> {
244 let cached_invoice = self.cache.get(&r_hash.to_string()).await?;
245
246 match cached_invoice {
247 Some(invoice) => {
248 eprintln!("cached");
249 Ok(invoice)
250 },
251 None => {
252 if let Some(pubkey) = pubkey {
253 let node = self
254 .nodes
255 .iter()
256 .find(|node| node.pubkey == pubkey)
257 .unwrap();
258 let invoice = node.lookup_invoice(r_hash).await?;
259 let hexed_invoice = ClusterLookupInvoice {
260 r_hash: to_hex(&invoice.r_hash)?,
261 r_preimage: to_hex(&invoice.r_preimage)?,
262 ..invoice
263 };
264 let json_string = serde_json::to_string(&hexed_invoice).unwrap();
265
266 let _: Result<ClusterLookupInvoice, _> = self.cache
267 .set_ex(
268 r_hash.to_string(),
269 json_string,
270 self.inv_exp_sec as usize,
271 ).await;
272 eprintln!("requested invoice from node");
273 Ok(hexed_invoice)
274 } else {
275 let mut tasks = vec![];
277 for node in &self.nodes {
278 let r_hash_clone = r_hash.clone();
279 let task = node.lookup_invoice(r_hash_clone);
280 tasks.push(task);
281 }
282
283 let success_result = match futures::future::join_all(tasks)
285 .await
286 .into_iter()
287 .enumerate()
288 .find_map(|(_index, result)| result.ok())
289 {
290 Some(success_result) => success_result,
291 None => return Err(anyhow::Error::msg("No nodes found this invoice.")),
292 };
293
294 let hexed_invoice = ClusterLookupInvoice {
295 r_hash: to_hex(&success_result.r_hash)?,
296 r_preimage: to_hex(&success_result.r_preimage)?,
297 ..success_result.clone()
298 };
299
300 let json_invoice = serde_json::to_string(&hexed_invoice).unwrap();
301
302 let _: Result<ClusterLookupInvoice, _> = self.cache
304 .set_ex(
305 r_hash.to_string(),
306 json_invoice,
307 self.inv_exp_sec as usize,
308 )
309 .await;
310
311 eprintln!("requested invoice from node");
312
313 Ok(hexed_invoice)
314 }
315 }
316 }
317 }
318
319 pub async fn add_invoice(
320 &self,
321 req: ClusterAddInvoice,
322 pubkey: Option<String>,
323 ) -> Result<AddInvoiceResponse> {
324 match pubkey {
325 Some(pubkey) => {
326 let node = self
327 .nodes
328 .iter()
329 .find(|node| node.pubkey == pubkey)
330 .unwrap();
331 node.add_invoice(req).await
332 }
333 None => {
334 let mut rng = rand::thread_rng();
335 let node = self.nodes.choose(&mut rng).unwrap();
336 node.add_invoice(req).await
337 }
338 }
339 }
340
341 pub async fn next_address(&mut self, pubkey: Option<String>) -> Result<String> {
342 match pubkey {
343 Some(pubkey) => {
344 let node = self
345 .nodes
346 .iter()
347 .find(|node| node.pubkey == pubkey)
348 .unwrap();
349
350 let addr = node.next_address().await?;
351
352 let _: Result<String, _> = self.cache
353 .set_ex(
354 addr.clone(),
355 node.clone().pubkey,
356 self.addr_exp_sec as usize,
357 )
358 .await;
359 Ok(addr)
360 }
361 None => {
362 let mut rng = rand::thread_rng();
363 let node = self.nodes.choose(&mut rng).unwrap();
364
365 let addr = node.next_address().await?;
366
367 let _: Result<String, _> = self.cache.set_ex(
368 addr.clone(),
369 node.clone().pubkey,
370 self.addr_exp_sec as usize,
371 ).await;
372 Ok(addr)
373 }
374 }
375 }
376
377 pub async fn list_utxos(&mut self, pubkey: Option<&str>) -> Result<ClusterUtxos> {
378 match pubkey {
379 Some(pubkey) => {
380 let node = self
381 .nodes
382 .iter()
383 .find(|node| node.pubkey == pubkey)
384 .ok_or_else(|| anyhow::anyhow!("Node not found with provided pubkey"))?;
385
386 let cache_key = format!("utxos:{}", node.pubkey);
387 let cached_utxos = self.cache.get(&cache_key).await?;
388
389 match cached_utxos {
390 Some(utxos) => Ok(utxos),
391 None => {
392 let utxos = node.list_utxos().await?;
393 let json_utxos = serde_json::to_string(&utxos).unwrap();
394 let _: Result<ClusterUtxos, _> = self.cache.set_ex(
395 cache_key,
396 json_utxos,
397 self.utxo_exp_sec as usize,
398 ).await;
399 Ok(utxos)
400 }
401 }
402 }
403 None => {
404 let mut cluster_utxos = ClusterUtxos { utxos: vec![] };
405
406 for node in &self.nodes {
407 let cache_key = format!("utxos:{}", node.pubkey);
408 let cached_utxos = self.cache.get(&cache_key).await?;
409
410 let node_utxos = match cached_utxos {
411 Some(utxos) => utxos,
412 None => {
413 let fetched_utxos = node.list_utxos().await?;
414 let json_utxos = serde_json::to_string(&fetched_utxos).unwrap();
415 let _: Result<ClusterUtxos, _> = self.cache.set_ex(
416 cache_key,
417 json_utxos,
418 self.utxo_exp_sec as usize,
419 ).await;
420 fetched_utxos
421 }
422 };
423
424 cluster_utxos.utxos.extend(node_utxos.utxos);
425 }
426
427 Ok(cluster_utxos)
428 }
429 }
430 }
431
432 pub async fn pay_invoice(
433 &self,
434 amount: u64,
435 payment_request: String,
436 max_fee: i64,
437 pubkey: Option<String>,
438 ) -> Result<ClusterPayPaymentRequestRes> {
439 if pubkey.is_some() {
441 let node = self
442 .nodes
443 .iter()
444 .find(|node| &node.pubkey == pubkey.as_ref().unwrap())
445 .ok_or_else(|| anyhow::anyhow!("Node not found with provided pubkey"))?;
446
447 match &node.client {
448 NodeClient::Lnd(client) => {
449 let req = LndSendPaymentSyncReq {
450 payment_request: payment_request.clone(),
451 amt: amount.to_string(),
452 fee_limit: FeeLimit {
453 fixed: max_fee.to_string(),
454 },
455 allow_self_payment: false,
456 };
457 let invoice = client.send_payment_sync(req).await?;
458 eprintln!("{:?}", invoice);
459 Ok(invoice.to_cluster(node.clone().pubkey))
460 }
461 _ => {
462 panic!("We only support LND nodes at this time.")
463 }
464 }
465 } else {
466 let mut rng = rand::thread_rng();
468 let node = self.nodes.choose(&mut rng).unwrap();
469
470 match &node.client {
471 NodeClient::Lnd(client) => {
472 let req = LndSendPaymentSyncReq {
473 payment_request: payment_request.clone(),
474 amt: amount.to_string(),
475 fee_limit: FeeLimit {
476 fixed: max_fee.to_string(),
477 },
478 allow_self_payment: false,
479 };
480 let invoice = client.send_payment_sync(req).await?;
481 Ok(invoice.to_cluster(node.clone().pubkey))
482 }
483 _ => {
484 panic!("We only support LND nodes at this time.")
485 }
486 }
487 }
488 }
489}
490
491impl Node {
492 pub fn new(
493 pubkey: String,
494 ip: String,
495 port: String,
496 network: NodeNetwork,
497 lightning_impl: NodeLightningImpl,
498 client: NodeClient,
499 ) -> Node {
500 Self {
501 pubkey,
502 ip,
503 port,
504 network,
505 lightning_impl,
506 client,
507 }
508 }
509}
510
511impl Display for NodeNetwork {
512 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
513 match self {
514 NodeNetwork::Mainnet => write!(f, "mainnet"),
515 NodeNetwork::Testnet => write!(f, "testnet"),
516 }
517 }
518}
519
520pub fn to_hex(str: &str) -> Result<String> {
521 let decoded_bytes = base64::decode(str)?;
522 let hex_string = hex::encode(decoded_bytes);
523
524 Ok(hex_string)
525}
526
/// Integration tests.
///
/// They need a reachable LND node described by the `NODE1_*` variables
/// (read through `dotenvy`) and a Redis server on `127.0.0.1`.
#[cfg(test)]
pub mod tests {
    use crate::lnd::LndClient;

    use super::{Cluster, ClusterAddInvoice, Node, NodeClient, NodeLightningImpl, NodeNetwork};

    /// Creates an invoice on the cluster and checks that looking it up by its
    /// hex hash returns the same invoice.
    #[tokio::test]
    async fn test_add_lookup_invoice() {
        let mut cluster = create_test_cluster().await;
        let add_invoice = ClusterAddInvoice {
            pubkey: None,
            memo: String::from("test"),
            value: 1000,
            expiry: 1000,
        };
        let invoice = cluster.add_invoice(add_invoice, None).await.unwrap();

        // Both values are asserted to be 64 hex characters long.
        assert_eq!(invoice.r_hash.len(), 64);
        assert_eq!(invoice.payment_addr.len(), 64);

        let lookup_invoice = cluster.lookup_invoice(&invoice.r_hash, None).await.unwrap();

        assert_eq!(lookup_invoice.r_hash, invoice.r_hash);
    }

    /// Builds a single-node testnet cluster from the `NODE1_*` variables with
    /// 60-second cache expirations.
    pub async fn create_test_cluster() -> Cluster {
        let node1 = Node {
            pubkey: dotenvy::var("NODE1_PUBKEY").unwrap(),
            ip: dotenvy::var("NODE1_IP").unwrap(),
            port: dotenvy::var("NODE1_PORT").unwrap(),
            network: NodeNetwork::Testnet,
            lightning_impl: NodeLightningImpl::Lnd,
            client: NodeClient::Lnd(LndClient::new(
                dotenvy::var("NODE1_HOST").unwrap(),
                dotenvy::var("NODE1_CERT_PATH").unwrap(),
                dotenvy::var("NODE1_MACAROON_PATH").unwrap(),
            )),
        };

        let nodes = vec![node1];
        // The address previously read `127.0.01`, a typo for the loopback address.
        let redis = redis::Client::open("redis://127.0.0.1/")
            .unwrap()
            .get_async_connection()
            .await
            .unwrap();

        Cluster::new(nodes, redis, 60, 60, 60)
    }
}