Skip to main content

spawn_lnd/
cluster.rs

1use std::collections::HashMap;
2
3use lnd_grpc_rust::{
4    LndConnectError, LndNodeClients, LndNodeConfig,
5    lnrpc::{Channel, ConnectPeerResponse},
6};
7use thiserror::Error;
8use tokio::time::sleep;
9use uuid::Uuid;
10
11use crate::{
12    BITCOIND_P2P_PORT, BitcoinCore, BitcoinCoreConfig, BitcoinCoreError, BitcoinRpcError,
13    CleanupReport, ConfigError, DEFAULT_GENERATE_ADDRESS, DockerClient, DockerError, LND_P2P_PORT,
14    LndConfig, LndDaemon, LndError, NodeConfig, RetryPolicy, SpawnLndConfig,
15    lnd::channel_point_string,
16};
17
18/// Default on-chain funding amount sent to each LND node.
19pub const DEFAULT_FUNDING_AMOUNT_BTC: f64 = 1.0;
20/// Default number of blocks mined after funding transactions.
21pub const DEFAULT_FUNDING_CONFIRMATION_BLOCKS: u64 = 1;
22/// Default public channel capacity opened by [`SpawnedCluster::open_channel`].
23pub const DEFAULT_CHANNEL_CAPACITY_SAT: i64 = 100_000;
24/// Default number of blocks mined after opening a channel.
25pub const DEFAULT_CHANNEL_CONFIRMATION_BLOCKS: u64 = 6;
26const SATOSHIS_PER_BTC: f64 = 100_000_000.0;
27
28/// A running regtest cluster containing Bitcoin Core and LND containers.
29#[derive(Debug)]
30pub struct SpawnedCluster {
31    docker: DockerClient,
32    config: SpawnLndConfig,
33    cluster_id: String,
34    bitcoinds: Vec<BitcoinCore>,
35    nodes: HashMap<String, SpawnedNode>,
36    node_order: Vec<String>,
37    shutdown: bool,
38}
39
40impl SpawnedCluster {
41    /// Spawn a cluster using a validated config and a default Docker connection.
42    pub async fn spawn(config: SpawnLndConfig) -> Result<Self, SpawnError> {
43        config.validate()?;
44        let docker = DockerClient::connect().await?;
45        Self::spawn_validated_with_docker(docker, config).await
46    }
47
48    /// Spawn a cluster using a caller-provided Docker client.
49    pub async fn spawn_with_docker(
50        docker: DockerClient,
51        config: SpawnLndConfig,
52    ) -> Result<Self, SpawnError> {
53        config.validate()?;
54        Self::spawn_validated_with_docker(docker, config).await
55    }
56
57    async fn spawn_validated_with_docker(
58        docker: DockerClient,
59        config: SpawnLndConfig,
60    ) -> Result<Self, SpawnError> {
61        let cluster_id = new_cluster_id();
62        let cleanup_docker = docker.clone();
63        let keep_containers = config.keep_containers;
64
65        match spawn_inner(docker, config, cluster_id.clone()).await {
66            Ok(cluster) => Ok(cluster),
67            Err(error) => {
68                if keep_containers {
69                    return Err(error);
70                }
71
72                cleanup_docker
73                    .cleanup_cluster(&cluster_id)
74                    .await
75                    .map_err(|source| SpawnError::StartupCleanup {
76                        cluster_id,
77                        startup_error: error.to_string(),
78                        source: Box::new(source),
79                    })?;
80                Err(error)
81            }
82        }
83    }
84
85    /// Return the generated cluster id used in Docker labels.
86    pub fn cluster_id(&self) -> &str {
87        &self.cluster_id
88    }
89
90    /// Return the config used to spawn this cluster.
91    pub fn config(&self) -> &SpawnLndConfig {
92        &self.config
93    }
94
95    /// Return all spawned Bitcoin Core chain groups.
96    pub fn bitcoinds(&self) -> &[BitcoinCore] {
97        &self.bitcoinds
98    }
99
100    /// Look up a spawned LND node by alias.
101    pub fn node(&self, alias: &str) -> Option<&SpawnedNode> {
102        self.nodes.get(alias)
103    }
104
105    /// Iterate spawned LND nodes in configured order.
106    pub fn nodes(&self) -> impl Iterator<Item = &SpawnedNode> {
107        self.node_order
108            .iter()
109            .filter_map(|alias| self.nodes.get(alias))
110    }
111
112    /// Iterate node aliases in configured order.
113    pub fn node_aliases(&self) -> impl Iterator<Item = &str> {
114        self.node_order.iter().map(String::as_str)
115    }
116
117    /// Build connection configs for all LND nodes.
118    pub fn node_configs(&self) -> Vec<LndNodeConfig> {
119        self.nodes().map(SpawnedNode::node_config).collect()
120    }
121
122    /// Connect to all LND nodes with `lnd_grpc_rust`.
123    pub async fn connect_nodes(&self) -> Result<LndNodeClients, SpawnError> {
124        lnd_grpc_rust::connect_nodes(self.node_configs())
125            .await
126            .map_err(SpawnError::ConnectNodes)
127    }
128
129    /// Connect one LND node to another over the Docker bridge network.
130    pub async fn connect_peer(
131        &self,
132        from_alias: &str,
133        to_alias: &str,
134    ) -> Result<PeerConnection, SpawnError> {
135        let from = self.require_node(from_alias)?;
136        let to = self.require_node(to_alias)?;
137        let host = lnd_bridge_socket(to)?;
138        let response = from
139            .daemon
140            .connect_peer(to.daemon.public_key.clone(), host.clone())
141            .await
142            .or_else(|error| already_connected_response(error, &to.daemon.public_key))
143            .map_err(|source| SpawnError::Lnd {
144                alias: from_alias.to_string(),
145                source: Box::new(source),
146            })?;
147
148        Ok(PeerConnection {
149            from_alias: from_alias.to_string(),
150            to_alias: to_alias.to_string(),
151            public_key: to.daemon.public_key.clone(),
152            socket: host,
153            status: response.status,
154        })
155    }
156
157    /// Connect every LND node to every other LND node.
158    pub async fn connect_all_peers(&self) -> Result<Vec<PeerConnection>, SpawnError> {
159        let mut connections = Vec::new();
160
161        for from_alias in &self.node_order {
162            for to_alias in &self.node_order {
163                if from_alias == to_alias {
164                    continue;
165                }
166
167                connections.push(self.connect_peer(from_alias, to_alias).await?);
168            }
169        }
170
171        Ok(connections)
172    }
173
174    /// Fund one LND node with [`DEFAULT_FUNDING_AMOUNT_BTC`].
175    pub async fn fund_node(&self, alias: &str) -> Result<FundingReport, SpawnError> {
176        self.fund_node_with_amount(alias, DEFAULT_FUNDING_AMOUNT_BTC)
177            .await
178    }
179
180    /// Fund one LND node with a caller-provided BTC amount.
181    pub async fn fund_node_with_amount(
182        &self,
183        alias: &str,
184        amount_btc: f64,
185    ) -> Result<FundingReport, SpawnError> {
186        let mut reports = self.fund_nodes_with_amount([alias], amount_btc).await?;
187        Ok(reports.remove(0))
188    }
189
190    /// Batch-fund multiple LND nodes with [`DEFAULT_FUNDING_AMOUNT_BTC`] each.
191    pub async fn fund_nodes<I, S>(&self, aliases: I) -> Result<Vec<FundingReport>, SpawnError>
192    where
193        I: IntoIterator<Item = S>,
194        S: AsRef<str>,
195    {
196        self.fund_nodes_with_amount(aliases, DEFAULT_FUNDING_AMOUNT_BTC)
197            .await
198    }
199
200    /// Batch-fund multiple LND nodes with the same caller-provided BTC amount.
201    pub async fn fund_nodes_with_amount<I, S>(
202        &self,
203        aliases: I,
204        amount_btc: f64,
205    ) -> Result<Vec<FundingReport>, SpawnError>
206    where
207        I: IntoIterator<Item = S>,
208        S: AsRef<str>,
209    {
210        let amount_sat = btc_to_sat(amount_btc)?;
211        let mut recipients = Vec::new();
212        let mut amounts = HashMap::new();
213
214        for alias in aliases {
215            let alias = alias.as_ref().to_string();
216            let node = self.require_node(&alias)?;
217            let starting_balance_sat = node
218                .daemon
219                .wallet_balance(1)
220                .await
221                .map_err(|source| SpawnError::Lnd {
222                    alias: alias.clone(),
223                    source: Box::new(source),
224                })?
225                .confirmed_balance;
226            let starting_utxos = node
227                .daemon
228                .list_unspent(1, i32::MAX)
229                .await
230                .map_err(|source| SpawnError::Lnd {
231                    alias: alias.clone(),
232                    source: Box::new(source),
233                })?;
234            let starting_utxo_total_sat: i64 =
235                starting_utxos.iter().map(|utxo| utxo.amount_sat).sum();
236            let required_balance_sat = starting_balance_sat
237                .checked_add(amount_sat)
238                .ok_or(SpawnError::InvalidFundingAmount { amount_btc })?;
239            let required_utxo_total_sat = starting_utxo_total_sat
240                .checked_add(amount_sat)
241                .ok_or(SpawnError::InvalidFundingAmount { amount_btc })?;
242            let address = node
243                .daemon
244                .new_address()
245                .await
246                .map_err(|source| SpawnError::Lnd {
247                    alias: alias.clone(),
248                    source: Box::new(source),
249                })?;
250
251            amounts.insert(address.clone(), amount_btc);
252            recipients.push(FundingRecipient {
253                alias,
254                address,
255                required_balance_sat,
256                required_utxo_total_sat,
257            });
258        }
259
260        if recipients.is_empty() {
261            return Ok(Vec::new());
262        }
263
264        let funder = &self.bitcoinds[0];
265        let txid = funder
266            .wallet_rpc
267            .send_many(&amounts)
268            .await
269            .map_err(|source| SpawnError::BitcoinRpc {
270                group_index: 0,
271                source: Box::new(source),
272            })?;
273        let confirmation_blocks = funder
274            .rpc
275            .generate_to_address(
276                DEFAULT_FUNDING_CONFIRMATION_BLOCKS,
277                DEFAULT_GENERATE_ADDRESS,
278            )
279            .await
280            .map_err(|source| SpawnError::BitcoinRpc {
281                group_index: 0,
282                source: Box::new(source),
283            })?;
284
285        wait_bitcoind_groups_synced(&self.bitcoinds, &self.config.startup_retry).await?;
286        wait_lnd_nodes_synced(&self.nodes, &self.node_order, &self.config.startup_retry).await?;
287
288        let mut reports = Vec::with_capacity(recipients.len());
289        for recipient in recipients {
290            let node = self.require_node(&recipient.alias)?;
291            let balance = node
292                .daemon
293                .wait_for_spendable_balance(recipient.required_balance_sat)
294                .await
295                .map_err(|source| SpawnError::Lnd {
296                    alias: recipient.alias.clone(),
297                    source: Box::new(source),
298                })?;
299            let utxos = node
300                .daemon
301                .wait_for_spendable_utxos(recipient.required_utxo_total_sat)
302                .await
303                .map_err(|source| SpawnError::Lnd {
304                    alias: recipient.alias.clone(),
305                    source: Box::new(source),
306                })?;
307            let spendable_utxo_total_sat = utxos.iter().map(|utxo| utxo.amount_sat).sum();
308
309            reports.push(FundingReport {
310                alias: recipient.alias,
311                address: recipient.address,
312                txid: txid.clone(),
313                amount_btc,
314                confirmation_blocks: confirmation_blocks.clone(),
315                confirmed_balance_sat: balance.confirmed_balance,
316                spendable_utxo_count: utxos.len(),
317                spendable_utxo_total_sat,
318            });
319        }
320
321        Ok(reports)
322    }
323
324    /// Open a public channel with [`DEFAULT_CHANNEL_CAPACITY_SAT`].
325    pub async fn open_channel(
326        &self,
327        from_alias: &str,
328        to_alias: &str,
329    ) -> Result<ChannelReport, SpawnError> {
330        self.open_channel_with_amount(from_alias, to_alias, DEFAULT_CHANNEL_CAPACITY_SAT)
331            .await
332    }
333
334    /// Open a public channel with a caller-provided satoshi capacity.
335    pub async fn open_channel_with_amount(
336        &self,
337        from_alias: &str,
338        to_alias: &str,
339        local_funding_amount_sat: i64,
340    ) -> Result<ChannelReport, SpawnError> {
341        let from = self.require_node(from_alias)?;
342        let to = self.require_node(to_alias)?;
343        let bitcoind = &self.bitcoinds[from.chain_group_index];
344
345        self.connect_peer(from_alias, to_alias).await?;
346
347        let channel_point = from
348            .daemon
349            .open_channel_sync(&to.daemon.public_key, local_funding_amount_sat, 0)
350            .await
351            .map_err(|source| SpawnError::Lnd {
352                alias: from_alias.to_string(),
353                source: Box::new(source),
354            })?;
355        let channel_point =
356            channel_point_string(&channel_point).map_err(|source| SpawnError::Lnd {
357                alias: from_alias.to_string(),
358                source: Box::new(source),
359            })?;
360
361        from.daemon
362            .wait_for_pending_channel(&to.daemon.public_key, &channel_point)
363            .await
364            .map_err(|source| SpawnError::Lnd {
365                alias: from_alias.to_string(),
366                source: Box::new(source),
367            })?;
368
369        let confirmation_blocks = bitcoind
370            .rpc
371            .generate_to_address(
372                DEFAULT_CHANNEL_CONFIRMATION_BLOCKS,
373                DEFAULT_GENERATE_ADDRESS,
374            )
375            .await
376            .map_err(|source| SpawnError::BitcoinRpc {
377                group_index: from.chain_group_index,
378                source: Box::new(source),
379            })?;
380
381        wait_bitcoind_groups_synced(&self.bitcoinds, &self.config.startup_retry).await?;
382        wait_lnd_nodes_synced(&self.nodes, &self.node_order, &self.config.startup_retry).await?;
383
384        let from_channel = from
385            .daemon
386            .wait_for_active_channel(&to.daemon.public_key, &channel_point)
387            .await
388            .map_err(|source| SpawnError::Lnd {
389                alias: from_alias.to_string(),
390                source: Box::new(source),
391            })?;
392        let to_channel = to
393            .daemon
394            .wait_for_active_channel(&from.daemon.public_key, &channel_point)
395            .await
396            .map_err(|source| SpawnError::Lnd {
397                alias: to_alias.to_string(),
398                source: Box::new(source),
399            })?;
400
401        Ok(ChannelReport {
402            from_alias: from_alias.to_string(),
403            to_alias: to_alias.to_string(),
404            channel_point,
405            local_funding_amount_sat,
406            confirmation_blocks,
407            from_channel,
408            to_channel,
409        })
410    }
411
412    /// Stop and remove all containers in this cluster unless `keep_containers` is set.
413    pub async fn shutdown(&mut self) -> Result<CleanupReport, SpawnError> {
414        if self.shutdown || self.config.keep_containers {
415            self.shutdown = true;
416            return Ok(empty_cleanup_report());
417        }
418
419        let report = self.docker.cleanup_cluster(&self.cluster_id).await?;
420        self.shutdown = true;
421        Ok(report)
422    }
423
424    fn require_node(&self, alias: &str) -> Result<&SpawnedNode, SpawnError> {
425        self.nodes
426            .get(alias)
427            .ok_or_else(|| SpawnError::UnknownNode {
428                alias: alias.to_string(),
429            })
430    }
431}
432
433impl From<DockerError> for SpawnError {
434    fn from(source: DockerError) -> Self {
435        Self::Docker(Box::new(source))
436    }
437}
438
439impl Drop for SpawnedCluster {
440    fn drop(&mut self) {
441        if !self.shutdown && !self.config.keep_containers {
442            eprintln!(
443                "spawn-lnd cluster {} dropped without shutdown(); call shutdown().await to remove managed containers",
444                self.cluster_id
445            );
446        }
447    }
448}
449
450/// A spawned LND node and its placement in the cluster.
451#[derive(Clone, Debug)]
452pub struct SpawnedNode {
453    alias: String,
454    node_index: usize,
455    chain_group_index: usize,
456    daemon: LndDaemon,
457}
458
459impl SpawnedNode {
460    fn new(node_index: usize, chain_group_index: usize, daemon: LndDaemon) -> Self {
461        Self {
462            alias: daemon.alias.clone(),
463            node_index,
464            chain_group_index,
465            daemon,
466        }
467    }
468
469    /// Return the node alias.
470    pub fn alias(&self) -> &str {
471        &self.alias
472    }
473
474    /// Return the zero-based node index in spawn order.
475    pub fn node_index(&self) -> usize {
476        self.node_index
477    }
478
479    /// Return the Bitcoin Core chain group index backing this node.
480    pub fn chain_group_index(&self) -> usize {
481        self.chain_group_index
482    }
483
484    /// Return the underlying LND daemon handle.
485    pub fn lnd(&self) -> &LndDaemon {
486        &self.daemon
487    }
488
489    /// Build an `lnd_grpc_rust` node connection config for this node.
490    pub fn node_config(&self) -> LndNodeConfig {
491        self.daemon.node_config()
492    }
493
494    /// Return the LND identity public key.
495    pub fn public_key(&self) -> &str {
496        &self.daemon.public_key
497    }
498}
499
500/// Result of connecting one LND node to another.
501#[derive(Clone, Debug, Eq, PartialEq)]
502pub struct PeerConnection {
503    /// Source node alias.
504    pub from_alias: String,
505    /// Destination node alias.
506    pub to_alias: String,
507    /// Destination node identity public key.
508    pub public_key: String,
509    /// Destination P2P socket used for the connection.
510    pub socket: String,
511    /// LND connection status string.
512    pub status: String,
513}
514
515/// Result of funding an LND wallet.
516#[derive(Clone, Debug, PartialEq)]
517pub struct FundingReport {
518    /// Funded node alias.
519    pub alias: String,
520    /// On-chain address generated by the funded node.
521    pub address: String,
522    /// Funding transaction id.
523    pub txid: String,
524    /// Funding amount in BTC.
525    pub amount_btc: f64,
526    /// Block hashes mined to confirm the funding transaction.
527    pub confirmation_blocks: Vec<String>,
528    /// Confirmed LND wallet balance after funding.
529    pub confirmed_balance_sat: i64,
530    /// Spendable UTXO count after funding.
531    pub spendable_utxo_count: usize,
532    /// Total spendable UTXO value after funding.
533    pub spendable_utxo_total_sat: i64,
534}
535
536#[derive(Clone, Debug, Eq, PartialEq)]
537struct FundingRecipient {
538    alias: String,
539    address: String,
540    required_balance_sat: i64,
541    required_utxo_total_sat: i64,
542}
543
544/// Result of opening and confirming a public Lightning channel.
545#[derive(Clone, Debug, PartialEq)]
546pub struct ChannelReport {
547    /// Channel opener alias.
548    pub from_alias: String,
549    /// Remote node alias.
550    pub to_alias: String,
551    /// Channel point in `funding_txid:output_index` form.
552    pub channel_point: String,
553    /// Local funding amount in satoshis.
554    pub local_funding_amount_sat: i64,
555    /// Block hashes mined to confirm the funding transaction.
556    pub confirmation_blocks: Vec<String>,
557    /// Channel as reported by the opener.
558    pub from_channel: Channel,
559    /// Channel as reported by the remote node.
560    pub to_channel: Channel,
561}
562
563/// Error returned by cluster lifecycle and orchestration operations.
564#[derive(Debug, Error)]
565pub enum SpawnError {
566    /// Invalid cluster configuration.
567    #[error(transparent)]
568    Config(#[from] ConfigError),
569
570    /// Docker operation failed.
571    #[error(transparent)]
572    Docker(#[from] Box<DockerError>),
573
574    /// Failed to spawn a Bitcoin Core chain group.
575    #[error("failed to spawn Bitcoin Core chain group {group_index}")]
576    BitcoinCore {
577        /// Chain group index.
578        group_index: usize,
579        /// Underlying Bitcoin Core error.
580        source: Box<BitcoinCoreError>,
581    },
582
583    /// Failed to connect two Bitcoin Core chain groups.
584    #[error("failed to connect Bitcoin Core chain group {from_group} to group {to_group}")]
585    BitcoinPeer {
586        /// Source chain group index.
587        from_group: usize,
588        /// Destination chain group index.
589        to_group: usize,
590        /// Underlying Bitcoin RPC error.
591        source: Box<BitcoinRpcError>,
592    },
593
594    /// Bitcoin Core RPC failed for a chain group.
595    #[error("Bitcoin Core RPC failed for chain group {group_index}")]
596    BitcoinRpc {
597        /// Chain group index.
598        group_index: usize,
599        /// Underlying Bitcoin RPC error.
600        source: Box<BitcoinRpcError>,
601    },
602
603    /// Bitcoin Core chain groups did not converge on a common tip.
604    #[error(
605        "Bitcoin Core chain groups did not sync to a common tip after {attempts} attempts; last tips: {last_tips:?}"
606    )]
607    BitcoinSyncTimeout {
608        /// Number of sync attempts.
609        attempts: usize,
610        /// Last observed best block hashes.
611        last_tips: Vec<String>,
612    },
613
614    /// A Bitcoin Core container had no Docker bridge IP.
615    #[error("Bitcoin Core chain group {group_index} did not expose a bridge IP address")]
616    MissingBitcoindIp {
617        /// Chain group index.
618        group_index: usize,
619    },
620
621    /// A requested LND node alias does not exist.
622    #[error("unknown LND node alias: {alias}")]
623    UnknownNode {
624        /// Missing alias.
625        alias: String,
626    },
627
628    /// A funding amount was not positive and finite.
629    #[error("funding amount must be positive and finite, got {amount_btc} BTC")]
630    InvalidFundingAmount {
631        /// Invalid amount in BTC.
632        amount_btc: f64,
633    },
634
635    /// An LND container had no Docker bridge IP.
636    #[error("LND node {alias} did not expose a bridge IP address")]
637    MissingLndIp {
638        /// Node alias.
639        alias: String,
640    },
641
642    /// LND operation failed.
643    #[error("failed to spawn LND node {alias}")]
644    Lnd {
645        /// Node alias.
646        alias: String,
647        /// Underlying LND error.
648        source: Box<LndError>,
649    },
650
651    /// Connecting to all LND nodes failed.
652    #[error(transparent)]
653    ConnectNodes(#[from] LndConnectError),
654
655    /// Startup failed and the attempted cleanup also failed.
656    #[error(
657        "startup failed for cluster {cluster_id}, then cleanup failed; startup error: {startup_error}"
658    )]
659    StartupCleanup {
660        /// Cluster id.
661        cluster_id: String,
662        /// Original startup error as text.
663        startup_error: String,
664        /// Cleanup failure.
665        source: Box<DockerError>,
666    },
667}
668
669async fn spawn_inner(
670    docker: DockerClient,
671    config: SpawnLndConfig,
672    cluster_id: String,
673) -> Result<SpawnedCluster, SpawnError> {
674    let bitcoinds = spawn_bitcoinds(&docker, &config, &cluster_id).await?;
675    connect_bitcoind_groups(&bitcoinds).await?;
676    prepare_primary_wallet(&bitcoinds).await?;
677    wait_bitcoind_groups_synced(&bitcoinds, &config.startup_retry).await?;
678    let (nodes, node_order) = spawn_lnd_nodes(&docker, &config, &cluster_id, &bitcoinds).await?;
679    wait_bitcoind_groups_synced(&bitcoinds, &config.startup_retry).await?;
680    wait_lnd_nodes_synced(&nodes, &node_order, &config.startup_retry).await?;
681
682    Ok(SpawnedCluster {
683        docker,
684        config,
685        cluster_id,
686        bitcoinds,
687        nodes,
688        node_order,
689        shutdown: false,
690    })
691}
692
693async fn spawn_bitcoinds(
694    docker: &DockerClient,
695    config: &SpawnLndConfig,
696    cluster_id: &str,
697) -> Result<Vec<BitcoinCore>, SpawnError> {
698    let mut bitcoinds = Vec::with_capacity(config.chain_group_count());
699
700    for group_index in 0..config.chain_group_count() {
701        let bitcoind = BitcoinCore::spawn(
702            docker,
703            BitcoinCoreConfig::new(cluster_id, group_index)
704                .image(config.bitcoind_image.clone())
705                .startup_retry_policy(config.startup_retry),
706        )
707        .await
708        .map_err(|source| SpawnError::BitcoinCore {
709            group_index,
710            source: Box::new(source),
711        })?;
712        bitcoinds.push(bitcoind);
713    }
714
715    Ok(bitcoinds)
716}
717
718async fn connect_bitcoind_groups(bitcoinds: &[BitcoinCore]) -> Result<(), SpawnError> {
719    for (from_group, from) in bitcoinds.iter().enumerate() {
720        for (to_group, to) in bitcoinds.iter().enumerate() {
721            if from_group == to_group {
722                continue;
723            }
724
725            let socket = bitcoind_bridge_socket(to_group, to)?;
726            from.rpc
727                .add_node(&socket)
728                .await
729                .map_err(|source| SpawnError::BitcoinPeer {
730                    from_group,
731                    to_group,
732                    source: Box::new(source),
733                })?;
734        }
735    }
736
737    Ok(())
738}
739
740async fn prepare_primary_wallet(bitcoinds: &[BitcoinCore]) -> Result<(), SpawnError> {
741    bitcoinds[0]
742        .prepare_mining_wallet()
743        .await
744        .map_err(|source| SpawnError::BitcoinCore {
745            group_index: 0,
746            source: Box::new(source),
747        })?;
748
749    Ok(())
750}
751
752async fn wait_bitcoind_groups_synced(
753    bitcoinds: &[BitcoinCore],
754    policy: &RetryPolicy,
755) -> Result<(), SpawnError> {
756    if bitcoinds.len() <= 1 {
757        return Ok(());
758    }
759
760    let mut last_tips = Vec::new();
761
762    for _ in 0..policy.attempts {
763        let mut tips = Vec::with_capacity(bitcoinds.len());
764
765        for (group_index, bitcoind) in bitcoinds.iter().enumerate() {
766            let info = bitcoind.rpc.get_blockchain_info().await.map_err(|source| {
767                SpawnError::BitcoinRpc {
768                    group_index,
769                    source: Box::new(source),
770                }
771            })?;
772            tips.push((info.blocks, info.bestblockhash));
773        }
774
775        last_tips = tips
776            .iter()
777            .map(|(height, hash)| format!("{height}:{hash}"))
778            .collect();
779
780        if let Some((target_height, target_hash)) = tips.iter().max_by_key(|(height, _)| *height)
781            && tips
782                .iter()
783                .all(|(height, hash)| height == target_height && hash == target_hash)
784        {
785            return Ok(());
786        }
787
788        sleep(policy.interval()).await;
789    }
790
791    Err(SpawnError::BitcoinSyncTimeout {
792        attempts: policy.attempts,
793        last_tips,
794    })
795}
796
797async fn spawn_lnd_nodes(
798    docker: &DockerClient,
799    config: &SpawnLndConfig,
800    cluster_id: &str,
801    bitcoinds: &[BitcoinCore],
802) -> Result<(HashMap<String, SpawnedNode>, Vec<String>), SpawnError> {
803    let mut nodes = HashMap::with_capacity(config.nodes.len());
804    let mut node_order = Vec::with_capacity(config.nodes.len());
805
806    for (node_index, node_config) in config.nodes.iter().enumerate() {
807        let chain_group_index = chain_group_index(node_index, config.nodes_per_bitcoind);
808        let bitcoind = &bitcoinds[chain_group_index];
809        let lnd_config = lnd_config(cluster_id, node_index, node_config, config);
810        let daemon = LndDaemon::spawn_with_startup_cleanup(
811            docker,
812            bitcoind,
813            lnd_config,
814            !config.keep_containers,
815        )
816        .await
817        .map_err(|source| SpawnError::Lnd {
818            alias: node_config.alias.clone(),
819            source: Box::new(source),
820        })?;
821        wait_bitcoind_groups_synced(bitcoinds, &config.startup_retry).await?;
822        let node = SpawnedNode::new(node_index, chain_group_index, daemon);
823
824        node_order.push(node.alias.clone());
825        nodes.insert(node.alias.clone(), node);
826    }
827
828    Ok((nodes, node_order))
829}
830
831async fn wait_lnd_nodes_synced(
832    nodes: &HashMap<String, SpawnedNode>,
833    node_order: &[String],
834    policy: &RetryPolicy,
835) -> Result<(), SpawnError> {
836    for alias in node_order {
837        let node = &nodes[alias];
838        node.daemon
839            .wait_synced_to_chain_with_policy(policy)
840            .await
841            .map_err(|source| SpawnError::Lnd {
842                alias: alias.clone(),
843                source: Box::new(source),
844            })?;
845    }
846
847    Ok(())
848}
849
850fn lnd_config(
851    cluster_id: &str,
852    node_index: usize,
853    node_config: &NodeConfig,
854    config: &SpawnLndConfig,
855) -> LndConfig {
856    LndConfig::new(cluster_id, node_config.alias.clone(), node_index)
857        .image(config.lnd_image.clone())
858        .extra_args(node_config.lnd_args.clone())
859        .startup_retry_policy(config.startup_retry)
860}
861
862fn chain_group_index(node_index: usize, nodes_per_bitcoind: usize) -> usize {
863    node_index / nodes_per_bitcoind
864}
865
866fn btc_to_sat(amount_btc: f64) -> Result<i64, SpawnError> {
867    if !amount_btc.is_finite() || amount_btc <= 0.0 {
868        return Err(SpawnError::InvalidFundingAmount { amount_btc });
869    }
870
871    let amount_sat = (amount_btc * SATOSHIS_PER_BTC).round();
872    if amount_sat < 1.0 || amount_sat > i64::MAX as f64 {
873        return Err(SpawnError::InvalidFundingAmount { amount_btc });
874    }
875
876    Ok(amount_sat as i64)
877}
878
879fn bitcoind_bridge_socket(
880    group_index: usize,
881    bitcoind: &BitcoinCore,
882) -> Result<String, SpawnError> {
883    let ip = bitcoind
884        .container
885        .ip_address
886        .as_deref()
887        .ok_or(SpawnError::MissingBitcoindIp { group_index })?;
888
889    Ok(format!("{ip}:{BITCOIND_P2P_PORT}"))
890}
891
892fn lnd_bridge_socket(node: &SpawnedNode) -> Result<String, SpawnError> {
893    let ip =
894        node.daemon
895            .container
896            .ip_address
897            .as_deref()
898            .ok_or_else(|| SpawnError::MissingLndIp {
899                alias: node.alias.clone(),
900            })?;
901
902    Ok(format!("{ip}:{LND_P2P_PORT}"))
903}
904
905fn already_connected_response(
906    error: LndError,
907    public_key: &str,
908) -> Result<ConnectPeerResponse, LndError> {
909    match error {
910        LndError::Rpc { message, .. } if message.contains("already connected") => {
911            Ok(ConnectPeerResponse {
912                status: format!("already connected to {public_key}"),
913            })
914        }
915        error => Err(error),
916    }
917}
918
919fn new_cluster_id() -> String {
920    format!("cluster-{}", Uuid::new_v4().simple())
921}
922
923fn empty_cleanup_report() -> CleanupReport {
924    CleanupReport {
925        matched: 0,
926        removed: 0,
927        failures: Vec::new(),
928    }
929}
930
931#[cfg(test)]
932mod tests {
933    use super::{already_connected_response, btc_to_sat, chain_group_index, lnd_config};
934    use crate::LndError;
935    use crate::{NodeConfig, RetryPolicy, SpawnLndConfig};
936
937    #[test]
938    fn assigns_nodes_to_chain_groups() {
939        let groups = (0..8)
940            .map(|node_index| chain_group_index(node_index, 3))
941            .collect::<Vec<_>>();
942
943        assert_eq!(groups, [0, 0, 0, 1, 1, 1, 2, 2]);
944    }
945
946    #[test]
947    fn builds_lnd_config_from_node_config() {
948        let node = NodeConfig::new("alice").with_lnd_args(["--alias=Alice", "--color=#3399ff"]);
949        let spawn_config = SpawnLndConfig {
950            nodes: vec![node.clone()],
951            bitcoind_image: "custom/bitcoin:30".to_string(),
952            lnd_image: "custom/lnd:v1".to_string(),
953            nodes_per_bitcoind: 3,
954            keep_containers: false,
955            startup_retry: RetryPolicy::new(12, 250),
956        };
957        let config = lnd_config("cluster-1", 2, &node, &spawn_config);
958
959        assert_eq!(config.cluster_id, "cluster-1");
960        assert_eq!(config.alias, "alice");
961        assert_eq!(config.node_index, 2);
962        assert_eq!(config.image, "custom/lnd:v1");
963        assert_eq!(config.extra_args, ["--alias=Alice", "--color=#3399ff"]);
964        assert_eq!(config.startup_retry, RetryPolicy::new(12, 250));
965    }
966
967    #[test]
968    fn treats_already_connected_peer_as_success() {
969        let response = already_connected_response(
970            LndError::Rpc {
971                socket: "127.0.0.1:10009".to_string(),
972                method: "ConnectPeer",
973                message: "already connected to peer".to_string(),
974            },
975            "pubkey",
976        )
977        .expect("already connected is success");
978
979        assert_eq!(response.status, "already connected to pubkey");
980    }
981
982    #[test]
983    fn converts_btc_amount_to_sats() {
984        assert_eq!(btc_to_sat(1.0).expect("sats"), 100_000_000);
985        assert_eq!(btc_to_sat(0.000_000_01).expect("sats"), 1);
986        assert!(btc_to_sat(0.0).is_err());
987        assert!(btc_to_sat(f64::NAN).is_err());
988    }
989}