lightning_cluster/
cluster.rs

1use crate::lnd::Route;
2use crate::lnd::{AddInvoiceResponse, FeeLimit, LndClient, LndSendPaymentSyncReq};
3use anyhow::Result;
4use redis::aio::Connection;
5use core::fmt;
6use rand::seq::SliceRandom;
7use serde::{Deserialize, Serialize};
8use std::fmt::Display;
9extern crate redis;
10use redis::{AsyncCommands, FromRedisValue};
11
12pub struct Cluster {
13    pub nodes: Vec<Node>,
14    pub cache: Connection,
15    pub inv_exp_sec: i64,
16    pub addr_exp_sec: i64,
17    pub utxo_exp_sec: i64,
18}
19
20#[derive(Clone)]
21pub struct Node {
22    pub pubkey: String,
23    pub ip: String,
24    pub port: String,
25    pub network: NodeNetwork,
26    pub lightning_impl: NodeLightningImpl,
27    pub client: NodeClient,
28}
29
30#[derive(Clone)]
31pub enum NodeClient {
32    Lnd(LndClient),
33    CLightning,
34    Eclair,
35    Other,
36}
37#[derive(Clone)]
38pub enum NodeNetwork {
39    Mainnet,
40    Testnet,
41}
42
43#[derive(Clone)]
44pub enum NodeLightningImpl {
45    Lnd,
46    CLightning,
47    Eclair,
48    Other,
49}
50
51#[derive(Serialize, Deserialize, Debug)]
52pub struct ClusterAddInvoice {
53    pub pubkey: Option<String>,
54    pub memo: String,
55    pub value: i64,
56    pub expiry: i64,
57}
58
59#[derive(Serialize, Deserialize, Debug, Clone)]
60pub struct ClusterLookupInvoice {
61    pub pubkey: String,
62    pub memo: String,
63    pub r_preimage: String,
64    pub r_hash: String,
65    pub value: String,
66    pub settle_date: String,
67    pub payment_request: String,
68    pub description_hash: String,
69    pub expiry: String,
70    pub amt_paid_sat: String,
71    pub state: ClusterInvoiceState,
72}
73
74impl FromRedisValue for ClusterLookupInvoice {
75    fn from_redis_value(v: &redis::Value) -> redis::RedisResult<Self> {
76        match v {
77            redis::Value::Okay => {
78                return Ok(ClusterLookupInvoice {
79                    pubkey: "".to_string(),
80                    memo: "".to_string(),
81                    r_preimage: "".to_string(),
82                    r_hash: "".to_string(),
83                    value: "".to_string(),
84                    settle_date: "".to_string(),
85                    payment_request: "".to_string(),
86                    description_hash: "".to_string(),
87                    expiry: "".to_string(),
88                    amt_paid_sat: "".to_string(),
89                    state: ClusterInvoiceState::Open,
90                })
91            },
92            redis::Value::Data(data) => {
93                let json = String::from_utf8(data.to_vec()).unwrap();
94                let invoice: ClusterLookupInvoice = serde_json::from_str(&json).unwrap();
95                return Ok(invoice)
96            },
97            _ => panic!("Invalid redis value"),
98        };
99    }
100}
101
102#[derive(Serialize, Deserialize, Debug, Clone)]
103pub struct ClusterPayPaymentRequestRes {
104    pub pubkey: String,
105    pub payment_error: Option<String>,
106    pub payment_preimage: Option<String>,
107    pub payment_route: Option<Route>,
108    pub payment_hash: Option<String>,
109}
110
111#[derive(Deserialize, Debug, Clone, Serialize)]
112pub struct ClusterUtxos {
113    pub utxos: Vec<ClusterUtxo>,
114}
115
116impl FromRedisValue for ClusterUtxos {
117    fn from_redis_value(v: &redis::Value) -> redis::RedisResult<Self> {
118        match v {
119            redis::Value::Okay => {
120                return Ok(ClusterUtxos {
121                    utxos: vec![],
122                })
123            },
124            redis::Value::Data(data) => {
125                let json = String::from_utf8(data.to_vec()).unwrap();
126                let utxos: ClusterUtxos = serde_json::from_str(&json).unwrap();
127                return Ok(utxos)
128            },
129            _ => panic!("Invalid redis value"),
130        }
131    }
132}
133
134#[derive(Debug, Clone, Deserialize, Serialize)]
135pub struct ClusterUtxo {
136    pub pubkey: String,
137    pub address: String,
138    pub amount: u64,
139    pub confirmations: u64,
140}
141
142#[derive(Serialize, Deserialize, Debug, Clone)]
143pub enum ClusterInvoiceState {
144    #[serde(rename = "OPEN")]
145    Open = 0,
146    #[serde(rename = "SETTLED")]
147    Settled = 1,
148    #[serde(rename = "CANCELED")]
149    Canceled = 2,
150    #[serde(rename = "ACCEPTED")]
151    Accepted = 3,
152}
153
154impl Node {
155    pub async fn lookup_invoice(self: &Self, r_hash: &str) -> Result<ClusterLookupInvoice> {
156        match &self.client {
157            NodeClient::Lnd(client) => {
158                let invoice = client.lookup_invoice(r_hash).await?;
159                Ok(invoice.to_cluster(&self.pubkey))
160            }
161            _ => {
162                panic!("We only support LND nodes at this time.")
163            }
164        }
165    }
166
167    pub async fn add_invoice(self: &Self, req: ClusterAddInvoice) -> Result<AddInvoiceResponse> {
168        match &self.client {
169            NodeClient::Lnd(client) => {
170                let invoice = client.add_invoice(req).await?;
171
172                let response = AddInvoiceResponse {
173                    r_hash: to_hex(&invoice.r_hash)?,
174                    payment_addr: to_hex(&invoice.payment_addr)?,
175                    ..invoice
176                };
177                Ok(response)
178            }
179            _ => {
180                panic!("We only support LND nodes at this time.")
181            }
182        }
183    }
184
185    pub async fn next_address(&self) -> Result<String> {
186        match &self.client {
187            NodeClient::Lnd(client) => {
188                let addr = client.new_address().await?;
189                Ok(addr.address)
190            }
191            _ => {
192                panic!("We only support LND nodes at this time.")
193            }
194        }
195    }
196
197    pub async fn list_utxos(&self) -> Result<ClusterUtxos> {
198        match &self.client {
199            NodeClient::Lnd(client) => {
200                let utxos = client.list_unspent().await?;
201                let cluster_utxos = ClusterUtxos {
202                    utxos: utxos
203                        .utxos
204                        .into_iter()
205                        .map(|utxo| ClusterUtxo {
206                            pubkey: self.pubkey.clone(),
207                            address: utxo.address,
208                            amount: utxo.amount_sat.parse::<u64>().unwrap(),
209                            confirmations: utxo.confirmations.parse::<u64>().unwrap(),
210                        })
211                        .collect(),
212                };
213                Ok(cluster_utxos)
214            }
215            _ => {
216                panic!("We only support LND nodes at this time.")
217            }
218        }
219    }
220}
221
222impl Cluster {
223    pub fn new(
224        nodes: Vec<Node>,
225        redis: redis::aio::Connection,
226        inv_exp_sec: i64,
227        addr_exp_sec: i64,
228        utxo_exp_sec: i64,
229    ) -> Cluster {
230        Self {
231            nodes,
232            cache: redis,
233            inv_exp_sec: inv_exp_sec,
234            addr_exp_sec: addr_exp_sec,
235            utxo_exp_sec: utxo_exp_sec,
236        }
237    }
238
239    pub async fn lookup_invoice(
240        &mut self,
241        r_hash: &str,
242        pubkey: Option<String>,
243    ) -> Result<ClusterLookupInvoice> {
244        let cached_invoice = self.cache.get(&r_hash.to_string()).await?;
245
246        match cached_invoice {
247            Some(invoice) => {
248                eprintln!("cached");
249                Ok(invoice)
250            },
251            None => {
252                if let Some(pubkey) = pubkey {
253                    let node = self
254                        .nodes
255                        .iter()
256                        .find(|node| node.pubkey == pubkey)
257                        .unwrap();
258                    let invoice = node.lookup_invoice(r_hash).await?;
259                    let hexed_invoice = ClusterLookupInvoice {
260                        r_hash: to_hex(&invoice.r_hash)?,
261                        r_preimage: to_hex(&invoice.r_preimage)?,
262                        ..invoice
263                    };
264                    let json_string = serde_json::to_string(&hexed_invoice).unwrap();
265
266                    let _: Result<ClusterLookupInvoice, _> = self.cache
267                        .set_ex(
268                            r_hash.to_string(),
269                            json_string,
270                            self.inv_exp_sec as usize,
271                        ).await;
272                        eprintln!("requested invoice from node");
273                    Ok(hexed_invoice)
274                } else {
275                    // Make calls to all nodes to find who owns the invoice
276                    let mut tasks = vec![];
277                    for node in &self.nodes {
278                        let r_hash_clone = r_hash.clone();
279                        let task = node.lookup_invoice(r_hash_clone);
280                        tasks.push(task);
281                    }
282
283                    // Wait for all tasks to complete and find the first successful result
284                    let success_result = match futures::future::join_all(tasks)
285                        .await
286                        .into_iter()
287                        .enumerate()
288                        .find_map(|(_index, result)| result.ok())
289                    {
290                        Some(success_result) => success_result,
291                        None => return Err(anyhow::Error::msg("No nodes found this invoice.")),
292                    };
293
294                    let hexed_invoice = ClusterLookupInvoice {
295                        r_hash: to_hex(&success_result.r_hash)?,
296                        r_preimage: to_hex(&success_result.r_preimage)?,
297                        ..success_result.clone()
298                    };
299
300                    let json_invoice = serde_json::to_string(&hexed_invoice).unwrap();
301
302                    // Insert the successful result into the cache
303                    let _: Result<ClusterLookupInvoice, _> = self.cache
304                        .set_ex(
305                            r_hash.to_string(),
306                            json_invoice,
307                            self.inv_exp_sec as usize,
308                        )
309                        .await;
310
311                        eprintln!("requested invoice from node");
312
313                    Ok(hexed_invoice)
314                }
315            }
316        }
317    }
318
319    pub async fn add_invoice(
320        &self,
321        req: ClusterAddInvoice,
322        pubkey: Option<String>,
323    ) -> Result<AddInvoiceResponse> {
324        match pubkey {
325            Some(pubkey) => {
326                let node = self
327                    .nodes
328                    .iter()
329                    .find(|node| node.pubkey == pubkey)
330                    .unwrap();
331                node.add_invoice(req).await
332            }
333            None => {
334                let mut rng = rand::thread_rng();
335                let node = self.nodes.choose(&mut rng).unwrap();
336                node.add_invoice(req).await
337            }
338        }
339    }
340
341    pub async fn next_address(&mut self, pubkey: Option<String>) -> Result<String> {
342        match pubkey {
343            Some(pubkey) => {
344                let node = self
345                    .nodes
346                    .iter()
347                    .find(|node| node.pubkey == pubkey)
348                    .unwrap();
349
350                let addr = node.next_address().await?;
351
352                let _: Result<String, _> = self.cache
353                    .set_ex(
354                        addr.clone(),
355                        node.clone().pubkey,
356                        self.addr_exp_sec as usize,
357                    )
358                    .await;
359                Ok(addr)
360            }
361            None => {
362                let mut rng = rand::thread_rng();
363                let node = self.nodes.choose(&mut rng).unwrap();
364
365                let addr = node.next_address().await?;
366
367                let _: Result<String, _> = self.cache.set_ex(
368                    addr.clone(),
369                    node.clone().pubkey,
370                    self.addr_exp_sec as usize,
371                ).await;
372                Ok(addr)
373            }
374        }
375    }
376
377    pub async fn list_utxos(&mut self, pubkey: Option<&str>) -> Result<ClusterUtxos> {
378        match pubkey {
379            Some(pubkey) => {
380                let node = self
381                    .nodes
382                    .iter()
383                    .find(|node| node.pubkey == pubkey)
384                    .ok_or_else(|| anyhow::anyhow!("Node not found with provided pubkey"))?;
385
386                let cache_key = format!("utxos:{}", node.pubkey);
387                let cached_utxos = self.cache.get(&cache_key).await?;
388
389                match cached_utxos {
390                    Some(utxos) => Ok(utxos),
391                    None => {
392                        let utxos = node.list_utxos().await?;
393                        let json_utxos = serde_json::to_string(&utxos).unwrap();
394                        let _: Result<ClusterUtxos, _> = self.cache.set_ex(
395                            cache_key,
396                            json_utxos,
397                            self.utxo_exp_sec as usize,
398                        ).await;
399                        Ok(utxos)
400                    }
401                }
402            }
403            None => {
404                let mut cluster_utxos = ClusterUtxos { utxos: vec![] };
405
406                for node in &self.nodes {
407                    let cache_key = format!("utxos:{}", node.pubkey);
408                    let cached_utxos = self.cache.get(&cache_key).await?;
409
410                    let node_utxos = match cached_utxos {
411                        Some(utxos) => utxos,
412                        None => {
413                            let fetched_utxos = node.list_utxos().await?;
414                            let json_utxos = serde_json::to_string(&fetched_utxos).unwrap();
415                            let _: Result<ClusterUtxos, _> = self.cache.set_ex(
416                                cache_key,
417                                json_utxos,
418                                self.utxo_exp_sec as usize,
419                            ).await;
420                            fetched_utxos
421                        }
422                    };
423
424                    cluster_utxos.utxos.extend(node_utxos.utxos);
425                }
426
427                Ok(cluster_utxos)
428            }
429        }
430    }
431
432    pub async fn pay_invoice(
433        &self,
434        amount: u64,
435        payment_request: String,
436        max_fee: i64,
437        pubkey: Option<String>,
438    ) -> Result<ClusterPayPaymentRequestRes> {
439        // node selected
440        if pubkey.is_some() {
441            let node = self
442                .nodes
443                .iter()
444                .find(|node| &node.pubkey == pubkey.as_ref().unwrap())
445                .ok_or_else(|| anyhow::anyhow!("Node not found with provided pubkey"))?;
446
447            match &node.client {
448                NodeClient::Lnd(client) => {
449                    let req = LndSendPaymentSyncReq {
450                        payment_request: payment_request.clone(),
451                        amt: amount.to_string(),
452                        fee_limit: FeeLimit {
453                            fixed: max_fee.to_string(),
454                        },
455                        allow_self_payment: false,
456                    };
457                    let invoice = client.send_payment_sync(req).await?;
458                    eprintln!("{:?}", invoice);
459                    Ok(invoice.to_cluster(node.clone().pubkey))
460                }
461                _ => {
462                    panic!("We only support LND nodes at this time.")
463                }
464            }
465        } else {
466            // no node selected, select a node at random
467            let mut rng = rand::thread_rng();
468            let node = self.nodes.choose(&mut rng).unwrap();
469
470            match &node.client {
471                NodeClient::Lnd(client) => {
472                    let req = LndSendPaymentSyncReq {
473                        payment_request: payment_request.clone(),
474                        amt: amount.to_string(),
475                        fee_limit: FeeLimit {
476                            fixed: max_fee.to_string(),
477                        },
478                        allow_self_payment: false,
479                    };
480                    let invoice = client.send_payment_sync(req).await?;
481                    Ok(invoice.to_cluster(node.clone().pubkey))
482                }
483                _ => {
484                    panic!("We only support LND nodes at this time.")
485                }
486            }
487        }
488    }
489}
490
491impl Node {
492    pub fn new(
493        pubkey: String,
494        ip: String,
495        port: String,
496        network: NodeNetwork,
497        lightning_impl: NodeLightningImpl,
498        client: NodeClient,
499    ) -> Node {
500        Self {
501            pubkey,
502            ip,
503            port,
504            network,
505            lightning_impl,
506            client,
507        }
508    }
509}
510
511impl Display for NodeNetwork {
512    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
513        match self {
514            NodeNetwork::Mainnet => write!(f, "mainnet"),
515            NodeNetwork::Testnet => write!(f, "testnet"),
516        }
517    }
518}
519
520pub fn to_hex(str: &str) -> Result<String> {
521    let decoded_bytes = base64::decode(str)?;
522    let hex_string = hex::encode(decoded_bytes);
523
524    Ok(hex_string)
525}
526
527#[cfg(test)]
528pub mod tests {
529    use crate::lnd::LndClient;
530
531    use super::{Cluster, ClusterAddInvoice, Node, NodeClient, NodeLightningImpl, NodeNetwork};
532
533    #[tokio::test]
534    async fn test_add_lookup_invoice() {
535        let mut cluster = create_test_cluster().await;
536        let add_invoice = ClusterAddInvoice {
537            pubkey: None,
538            memo: String::from("test"),
539            value: 1000,
540            expiry: 1000,
541        };
542        let invoice = cluster.add_invoice(add_invoice, None).await.unwrap();
543
544        assert_eq!(invoice.r_hash.len(), 64);
545        assert_eq!(invoice.payment_addr.len(), 64);
546
547        let lookup_invoice = cluster.lookup_invoice(&invoice.r_hash, None).await.unwrap();
548
549        assert_eq!(lookup_invoice.r_hash, invoice.r_hash);
550    }
551
552    pub async fn create_test_cluster() -> Cluster {
553        let node1 = Node {
554            pubkey: dotenvy::var("NODE1_PUBKEY").unwrap(),
555            ip: dotenvy::var("NODE1_IP").unwrap(),
556            port: dotenvy::var("NODE1_PORT").unwrap(),
557            network: NodeNetwork::Testnet,
558            lightning_impl: NodeLightningImpl::Lnd,
559            client: NodeClient::Lnd(LndClient::new(
560                dotenvy::var("NODE1_HOST").unwrap(),
561                dotenvy::var("NODE1_CERT_PATH").unwrap(),
562                dotenvy::var("NODE1_MACAROON_PATH").unwrap(),
563            )),
564        };
565
566        let nodes = vec![node1];
567        let redis = redis::Client::open("redis://127.0.01/").unwrap().get_async_connection().await.unwrap();
568        let cluster = Cluster::new(nodes, redis, 60, 60, 60);
569
570        cluster
571    }
572}