Skip to main content

spawn_lnd/
lnd.rs

1use hyper::Uri;
2use lnd_grpc_rust::{
3    LndClient, LndNodeConfig, MyChannel,
4    lnrpc::{
5        AddressType, Channel, ChannelPoint, ConnectPeerRequest, ConnectPeerResponse,
6        GenSeedRequest, GetInfoRequest, GetInfoResponse, InitWalletRequest, LightningAddress,
7        ListChannelsRequest, ListUnspentRequest, NewAddressRequest, OpenChannelRequest,
8        PendingChannelsRequest, PendingChannelsResponse, Utxo, WalletBalanceRequest,
9        WalletBalanceResponse, wallet_unlocker_client::WalletUnlockerClient,
10    },
11};
12use serde::{Deserialize, Serialize};
13use thiserror::Error;
14use tokio::time::{Duration, sleep};
15
16use crate::{
17    BitcoinCore, DEFAULT_LND_IMAGE, RetryPolicy,
18    bitcoin::BITCOIND_RPC_PORT,
19    docker::{
20        ContainerRole, ContainerSpec, DockerClient, DockerError, SpawnedContainer,
21        managed_container_labels,
22    },
23};
24
/// Port on which LND serves gRPC inside the Docker container.
pub const LND_GRPC_PORT: u16 = 10009;
/// Port on which LND accepts peer-to-peer connections inside the Docker container.
pub const LND_P2P_PORT: u16 = 9735;
/// Location of the TLS certificate in the LND container's filesystem.
pub const LND_TLS_CERT_PATH: &str = "/root/.lnd/tls.cert";
/// Location of the admin macaroon in the LND container's filesystem.
pub const LND_ADMIN_MACAROON_PATH: &str = "/root/.lnd/data/chain/bitcoin/regtest/admin.macaroon";
/// Wallet password shared by every spawned regtest LND node.
pub const LND_WALLET_PASSWORD: &[u8] = b"password";
/// Fixed regtest address that receives internally generated blocks.
pub const DEFAULT_GENERATE_ADDRESS: &str = "2N8hwP1WmJrFF5QWABn38y63uYLhnJYJYTF";

/// Number of polling attempts made by the fixed-budget wait helpers.
const READY_RETRY_ATTEMPTS: usize = 500;
/// Pause between two polling attempts of the fixed-budget wait helpers.
const READY_RETRY_INTERVAL: Duration = Duration::from_millis(100);
/// Upper confirmation bound passed when listing spendable UTXOs.
const MAX_UTXO_CONFIRMATIONS: i32 = i32::MAX;
41
42/// Configuration for one spawned LND node.
43#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
44pub struct LndConfig {
45    /// Cluster identifier used in container names and labels.
46    pub cluster_id: String,
47    /// LND alias.
48    pub alias: String,
49    /// Zero-based node index in spawn order.
50    pub node_index: usize,
51    /// Docker image used for this LND container.
52    pub image: String,
53    /// Extra command-line flags appended to the LND command.
54    pub extra_args: Vec<String>,
55    /// Retry policy used while waiting for wallet init and chain sync.
56    pub startup_retry: RetryPolicy,
57    /// Optional Docker network name.
58    pub network: Option<String>,
59    /// Optional static IPv4 address on the configured Docker network.
60    pub ipv4_address: Option<String>,
61}
62
63impl LndConfig {
64    /// Create an LND config using the default pinned image.
65    pub fn new(cluster_id: impl Into<String>, alias: impl Into<String>, node_index: usize) -> Self {
66        Self {
67            cluster_id: cluster_id.into(),
68            alias: alias.into(),
69            node_index,
70            image: DEFAULT_LND_IMAGE.to_string(),
71            extra_args: Vec::new(),
72            startup_retry: RetryPolicy::default(),
73            network: None,
74            ipv4_address: None,
75        }
76    }
77
78    /// Override the LND Docker image.
79    pub fn image(mut self, image: impl Into<String>) -> Self {
80        self.image = image.into();
81        self
82    }
83
84    /// Append one extra LND command-line argument.
85    pub fn extra_arg(mut self, arg: impl Into<String>) -> Self {
86        self.extra_args.push(arg.into());
87        self
88    }
89
90    /// Append multiple extra LND command-line arguments.
91    pub fn extra_args<I, S>(mut self, args: I) -> Self
92    where
93        I: IntoIterator<Item = S>,
94        S: Into<String>,
95    {
96        self.extra_args.extend(args.into_iter().map(Into::into));
97        self
98    }
99
100    /// Override the startup retry policy.
101    pub fn startup_retry_policy(mut self, policy: RetryPolicy) -> Self {
102        self.startup_retry = policy;
103        self
104    }
105
106    /// Attach this LND container to a Docker network.
107    pub fn network(mut self, network: impl Into<String>) -> Self {
108        self.network = Some(network.into());
109        self
110    }
111
112    /// Assign a static IPv4 address on the configured Docker network.
113    pub fn ipv4_address(mut self, ip: impl Into<String>) -> Self {
114        self.ipv4_address = Some(ip.into());
115        self
116    }
117}
118
119/// A running LND container and authenticated connection material.
120#[derive(Clone, Debug)]
121pub struct LndDaemon {
122    /// LND alias.
123    pub alias: String,
124    /// Docker container metadata.
125    pub container: SpawnedContainer,
126    /// Hex-encoded TLS certificate.
127    pub cert_hex: String,
128    /// Hex-encoded admin macaroon.
129    pub macaroon_hex: String,
130    /// Host gRPC socket, usually `127.0.0.1:<port>`.
131    pub rpc_socket: String,
132    /// Host P2P socket, usually `127.0.0.1:<port>`.
133    pub p2p_socket: String,
134    /// LND identity public key.
135    pub public_key: String,
136}
137
138impl LndDaemon {
139    /// Spawn an LND container, initialize its wallet, and wait for chain sync.
140    pub async fn spawn(
141        docker: &DockerClient,
142        bitcoind: &BitcoinCore,
143        config: LndConfig,
144    ) -> Result<Self, LndError> {
145        Self::spawn_with_startup_cleanup(docker, bitcoind, config, true).await
146    }
147
148    pub(crate) async fn spawn_with_startup_cleanup(
149        docker: &DockerClient,
150        bitcoind: &BitcoinCore,
151        config: LndConfig,
152        cleanup_on_startup_failure: bool,
153    ) -> Result<Self, LndError> {
154        bitcoind
155            .rpc
156            .generate_to_address(1, DEFAULT_GENERATE_ADDRESS)
157            .await
158            .map_err(LndError::BitcoinRpc)?;
159
160        let spec = lnd_container_spec(&config, bitcoind)?;
161        let container = docker.create_and_start(spec).await?;
162        let container_id = container.id.clone();
163        let alias = config.alias;
164        let result =
165            Self::initialize_started(docker, container, alias.clone(), config.startup_retry).await;
166
167        match result {
168            Ok(daemon) => Ok(daemon),
169            Err(error) => {
170                let logs = docker.container_logs(&container_id).await.ok();
171                if cleanup_on_startup_failure {
172                    let _ = docker.rollback_containers([container_id.clone()]).await;
173                }
174                Err(LndError::Startup {
175                    alias,
176                    container_id,
177                    logs,
178                    source: Box::new(error),
179                })
180            }
181        }
182    }
183
184    async fn initialize_started(
185        docker: &DockerClient,
186        container: SpawnedContainer,
187        alias: String,
188        startup_retry: RetryPolicy,
189    ) -> Result<Self, LndError> {
190        let rpc_port =
191            container
192                .host_port(LND_GRPC_PORT)
193                .ok_or_else(|| LndError::MissingHostPort {
194                    container_id: container.id.clone(),
195                    container_port: LND_GRPC_PORT,
196                })?;
197        let p2p_port =
198            container
199                .host_port(LND_P2P_PORT)
200                .ok_or_else(|| LndError::MissingHostPort {
201                    container_id: container.id.clone(),
202                    container_port: LND_P2P_PORT,
203                })?;
204        let rpc_socket = format!("127.0.0.1:{rpc_port}");
205        let cert_bytes =
206            wait_for_file(docker, &container.id, LND_TLS_CERT_PATH, &startup_retry).await?;
207        let cert_hex = hex::encode(&cert_bytes);
208        let macaroon_hex = init_wallet_or_read_macaroon(
209            docker,
210            &container.id,
211            &cert_bytes,
212            &rpc_socket,
213            &startup_retry,
214        )
215        .await?;
216        let info =
217            wait_for_synced_get_info(&cert_hex, &macaroon_hex, &rpc_socket, &startup_retry).await?;
218
219        Ok(Self {
220            alias,
221            p2p_socket: format!("127.0.0.1:{p2p_port}"),
222            public_key: info.identity_pubkey,
223            cert_hex,
224            macaroon_hex,
225            rpc_socket,
226            container,
227        })
228    }
229
230    fn refresh_from_container(&mut self, container: SpawnedContainer) -> Result<(), LndError> {
231        let rpc_port =
232            container
233                .host_port(LND_GRPC_PORT)
234                .ok_or_else(|| LndError::MissingHostPort {
235                    container_id: container.id.clone(),
236                    container_port: LND_GRPC_PORT,
237                })?;
238        let p2p_port =
239            container
240                .host_port(LND_P2P_PORT)
241                .ok_or_else(|| LndError::MissingHostPort {
242                    container_id: container.id.clone(),
243                    container_port: LND_P2P_PORT,
244                })?;
245
246        self.rpc_socket = format!("127.0.0.1:{rpc_port}");
247        self.p2p_socket = format!("127.0.0.1:{p2p_port}");
248        self.container = container;
249        Ok(())
250    }
251
252    /// Stop the LND container without removing it.
253    pub async fn stop(&self, docker: &DockerClient) -> Result<(), LndError> {
254        docker.stop_container(&self.container.id).await?;
255        Ok(())
256    }
257
258    /// Start the LND container and wait until it is synced to chain.
259    pub async fn start(
260        &mut self,
261        docker: &DockerClient,
262        policy: &RetryPolicy,
263    ) -> Result<GetInfoResponse, LndError> {
264        let container = docker.start_container(&self.container.id).await?;
265        self.refresh_from_container(container)?;
266        self.wait_synced_to_chain_with_policy(policy).await
267    }
268
269    /// Restart the LND container and wait until it is synced to chain.
270    pub async fn restart(
271        &mut self,
272        docker: &DockerClient,
273        policy: &RetryPolicy,
274    ) -> Result<GetInfoResponse, LndError> {
275        let container = docker.restart_container(&self.container.id).await?;
276        self.refresh_from_container(container)?;
277        self.wait_synced_to_chain_with_policy(policy).await
278    }
279
280    /// Build an `lnd_grpc_rust` connection config for this node.
281    pub fn node_config(&self) -> LndNodeConfig {
282        LndNodeConfig::new(
283            self.alias.clone(),
284            self.cert_hex.clone(),
285            self.macaroon_hex.clone(),
286            self.rpc_socket.clone(),
287        )
288    }
289
290    /// Connect to this node using its TLS certificate and admin macaroon.
291    pub async fn connect(&self) -> Result<LndClient, LndError> {
292        connect_authenticated(&self.cert_hex, &self.macaroon_hex, &self.rpc_socket).await
293    }
294
295    /// Wait until `GetInfo` reports `synced_to_chain`.
296    pub async fn wait_synced_to_chain(&self) -> Result<GetInfoResponse, LndError> {
297        self.wait_synced_to_chain_with_policy(&RetryPolicy::default())
298            .await
299    }
300
301    pub(crate) async fn wait_synced_to_chain_with_policy(
302        &self,
303        policy: &RetryPolicy,
304    ) -> Result<GetInfoResponse, LndError> {
305        wait_for_synced_get_info(&self.cert_hex, &self.macaroon_hex, &self.rpc_socket, policy).await
306    }
307
308    /// Generate a new LND wallet address.
309    pub async fn new_address(&self) -> Result<String, LndError> {
310        let mut client = self.connect().await?;
311        let response = client
312            .lightning()
313            .new_address(NewAddressRequest {
314                r#type: AddressType::WitnessPubkeyHash as i32,
315                account: String::new(),
316            })
317            .await
318            .map_err(|error| LndError::rpc(&self.rpc_socket, "NewAddress", error))?
319            .into_inner();
320
321        Ok(response.address)
322    }
323
324    /// Connect this LND node to a peer by public key and host socket.
325    pub async fn connect_peer(
326        &self,
327        public_key: impl Into<String>,
328        host: impl Into<String>,
329    ) -> Result<ConnectPeerResponse, LndError> {
330        let public_key = public_key.into();
331        let host = host.into();
332        let mut last_error = None;
333
334        for _ in 0..READY_RETRY_ATTEMPTS {
335            let mut client = match self.connect().await {
336                Ok(client) => client,
337                Err(error) if error.is_lnd_starting() => {
338                    last_error = Some(error.to_string());
339                    sleep(READY_RETRY_INTERVAL).await;
340                    continue;
341                }
342                Err(error) => return Err(error),
343            };
344            match client
345                .lightning()
346                .connect_peer(ConnectPeerRequest {
347                    addr: Some(LightningAddress {
348                        pubkey: public_key.clone(),
349                        host: host.clone(),
350                    }),
351                    perm: false,
352                    timeout: 10,
353                })
354                .await
355            {
356                Ok(response) => return Ok(response.into_inner()),
357                Err(error) => {
358                    let error = LndError::rpc(&self.rpc_socket, "ConnectPeer", error);
359                    if error.is_lnd_starting() {
360                        last_error = Some(error.to_string());
361                        sleep(READY_RETRY_INTERVAL).await;
362                        continue;
363                    }
364
365                    return Err(error);
366                }
367            }
368        }
369
370        Err(LndError::PeerConnectTimeout {
371            alias: self.alias.clone(),
372            public_key,
373            attempts: READY_RETRY_ATTEMPTS,
374            last_error,
375        })
376    }
377
378    /// Return LND wallet balance with the given minimum confirmations.
379    pub async fn wallet_balance(&self, min_confs: i32) -> Result<WalletBalanceResponse, LndError> {
380        let mut client = self.connect().await?;
381        let response = client
382            .lightning()
383            .wallet_balance(WalletBalanceRequest {
384                account: String::new(),
385                min_confs,
386            })
387            .await
388            .map_err(|error| LndError::rpc(&self.rpc_socket, "WalletBalance", error))?
389            .into_inner();
390
391        Ok(response)
392    }
393
394    /// Return wallet UTXOs matching the confirmation range.
395    pub async fn list_unspent(
396        &self,
397        min_confs: i32,
398        max_confs: i32,
399    ) -> Result<Vec<Utxo>, LndError> {
400        let mut client = self.connect().await?;
401        let response = client
402            .lightning()
403            .list_unspent(ListUnspentRequest {
404                min_confs,
405                max_confs,
406                account: String::new(),
407            })
408            .await
409            .map_err(|error| LndError::rpc(&self.rpc_socket, "ListUnspent", error))?
410            .into_inner();
411
412        Ok(response.utxos)
413    }
414
415    /// Wait until confirmed wallet balance is at least `minimum_sat`.
416    pub async fn wait_for_spendable_balance(
417        &self,
418        minimum_sat: i64,
419    ) -> Result<WalletBalanceResponse, LndError> {
420        let mut last_error = None;
421
422        for _ in 0..READY_RETRY_ATTEMPTS {
423            match self.wallet_balance(1).await {
424                Ok(balance) if balance.confirmed_balance >= minimum_sat => return Ok(balance),
425                Ok(balance) => {
426                    last_error = Some(format!(
427                        "confirmed balance {} is below required {minimum_sat}",
428                        balance.confirmed_balance
429                    ));
430                }
431                Err(error) => last_error = Some(error.to_string()),
432            }
433
434            sleep(READY_RETRY_INTERVAL).await;
435        }
436
437        Err(LndError::BalanceTimeout {
438            alias: self.alias.clone(),
439            minimum_sat,
440            attempts: READY_RETRY_ATTEMPTS,
441            last_error,
442        })
443    }
444
445    /// Wait until spendable UTXOs total at least `minimum_sat`.
446    pub async fn wait_for_spendable_utxos(&self, minimum_sat: i64) -> Result<Vec<Utxo>, LndError> {
447        let mut last_error = None;
448
449        for _ in 0..READY_RETRY_ATTEMPTS {
450            match self.list_unspent(1, MAX_UTXO_CONFIRMATIONS).await {
451                Ok(utxos) if utxo_total_sat(&utxos) >= minimum_sat => return Ok(utxos),
452                Ok(utxos) => {
453                    last_error = Some(format!(
454                        "spendable UTXO total {} is below required {minimum_sat}",
455                        utxo_total_sat(&utxos)
456                    ));
457                }
458                Err(error) => last_error = Some(error.to_string()),
459            }
460
461            sleep(READY_RETRY_INTERVAL).await;
462        }
463
464        Err(LndError::UtxoTimeout {
465            alias: self.alias.clone(),
466            minimum_sat,
467            attempts: READY_RETRY_ATTEMPTS,
468            last_error,
469        })
470    }
471
472    /// Open a public channel synchronously through LND.
473    pub async fn open_channel_sync(
474        &self,
475        remote_public_key: &str,
476        local_funding_amount_sat: i64,
477        push_sat: i64,
478    ) -> Result<ChannelPoint, LndError> {
479        let mut client = self.connect().await?;
480        let remote_public_key =
481            hex::decode(remote_public_key).map_err(|error| LndError::InvalidPublicKey {
482                public_key: remote_public_key.to_string(),
483                message: error.to_string(),
484            })?;
485        let response = client
486            .lightning()
487            .open_channel_sync(OpenChannelRequest {
488                node_pubkey: remote_public_key,
489                local_funding_amount: local_funding_amount_sat,
490                push_sat,
491                target_conf: 1,
492                private: false,
493                min_confs: 1,
494                spend_unconfirmed: false,
495                ..Default::default()
496            })
497            .await
498            .map_err(|error| LndError::rpc(&self.rpc_socket, "OpenChannelSync", error))?
499            .into_inner();
500
501        Ok(response)
502    }
503
504    /// Return LND pending channel state.
505    pub async fn pending_channels(&self) -> Result<PendingChannelsResponse, LndError> {
506        let mut client = self.connect().await?;
507        let response = client
508            .lightning()
509            .pending_channels(PendingChannelsRequest {
510                include_raw_tx: false,
511            })
512            .await
513            .map_err(|error| LndError::rpc(&self.rpc_socket, "PendingChannels", error))?
514            .into_inner();
515
516        Ok(response)
517    }
518
519    /// List channels, optionally filtered by remote public key.
520    pub async fn list_channels(
521        &self,
522        remote_public_key: Option<&str>,
523    ) -> Result<Vec<Channel>, LndError> {
524        let mut client = self.connect().await?;
525        let peer = match remote_public_key {
526            Some(public_key) => {
527                hex::decode(public_key).map_err(|error| LndError::InvalidPublicKey {
528                    public_key: public_key.to_string(),
529                    message: error.to_string(),
530                })?
531            }
532            None => Vec::new(),
533        };
534        let response = client
535            .lightning()
536            .list_channels(ListChannelsRequest {
537                active_only: false,
538                inactive_only: false,
539                public_only: false,
540                private_only: false,
541                peer,
542                peer_alias_lookup: true,
543            })
544            .await
545            .map_err(|error| LndError::rpc(&self.rpc_socket, "ListChannels", error))?
546            .into_inner();
547
548        Ok(response.channels)
549    }
550
551    /// Wait until LND reports a pending channel with the given peer and point.
552    pub async fn wait_for_pending_channel(
553        &self,
554        remote_public_key: &str,
555        channel_point: &str,
556    ) -> Result<(), LndError> {
557        let mut last_error = None;
558
559        for _ in 0..READY_RETRY_ATTEMPTS {
560            match self.pending_channels().await {
561                Ok(pending) if has_pending_channel(&pending, remote_public_key, channel_point) => {
562                    return Ok(());
563                }
564                Ok(pending) => {
565                    last_error = Some(format!(
566                        "pending channels did not include {channel_point}; count={}",
567                        pending.pending_open_channels.len()
568                    ));
569                }
570                Err(error) => last_error = Some(error.to_string()),
571            }
572
573            sleep(READY_RETRY_INTERVAL).await;
574        }
575
576        Err(LndError::PendingChannelTimeout {
577            alias: self.alias.clone(),
578            remote_public_key: remote_public_key.to_string(),
579            channel_point: channel_point.to_string(),
580            attempts: READY_RETRY_ATTEMPTS,
581            last_error,
582        })
583    }
584
585    /// Wait until LND reports an active channel with the given peer and point.
586    pub async fn wait_for_active_channel(
587        &self,
588        remote_public_key: &str,
589        channel_point: &str,
590    ) -> Result<Channel, LndError> {
591        let mut last_error = None;
592
593        for _ in 0..READY_RETRY_ATTEMPTS {
594            match self.list_channels(Some(remote_public_key)).await {
595                Ok(channels) => {
596                    if let Some(channel) = channels
597                        .iter()
598                        .find(|channel| channel.channel_point == channel_point && channel.active)
599                    {
600                        return Ok(channel.clone());
601                    }
602
603                    last_error = Some(format!(
604                        "active channels did not include {channel_point}; count={}",
605                        channels.len()
606                    ));
607                }
608                Err(error) => last_error = Some(error.to_string()),
609            }
610
611            sleep(READY_RETRY_INTERVAL).await;
612        }
613
614        Err(LndError::ActiveChannelTimeout {
615            alias: self.alias.clone(),
616            remote_public_key: remote_public_key.to_string(),
617            channel_point: channel_point.to_string(),
618            attempts: READY_RETRY_ATTEMPTS,
619            last_error,
620        })
621    }
622}
623
624/// Error returned by LND lifecycle and RPC helpers.
625#[derive(Debug, Error)]
626#[allow(missing_docs)]
627pub enum LndError {
628    #[error(transparent)]
629    Docker(#[from] DockerError),
630
631    #[error(transparent)]
632    BitcoinRpc(#[from] crate::BitcoinRpcError),
633
634    #[error("Bitcoin Core container did not expose a bridge IP address for LND")]
635    MissingBitcoindIp,
636
637    #[error("Docker container {container_id} did not publish expected LND port {container_port}")]
638    MissingHostPort {
639        container_id: String,
640        container_port: u16,
641    },
642
643    #[error("failed to connect to LND at {socket}: {message}")]
644    Connect { socket: String, message: String },
645
646    #[error("LND RPC {method} failed at {socket}: {message}")]
647    Rpc {
648        socket: String,
649        method: &'static str,
650        message: String,
651    },
652
653    #[error("invalid LND public key {public_key}: {message}")]
654    InvalidPublicKey { public_key: String, message: String },
655
656    #[error("LND node {alias} startup failed for container {container_id}; logs: {logs:?}")]
657    Startup {
658        alias: String,
659        container_id: String,
660        logs: Option<String>,
661        source: Box<LndError>,
662    },
663
664    #[error("failed to create unauthenticated LND channel to {socket}: {message}")]
665    UnauthenticatedChannel { socket: String, message: String },
666
667    #[error(
668        "LND wallet init did not complete after {attempts} attempts; last error: {last_error:?}"
669    )]
670    WalletInitTimeout {
671        attempts: usize,
672        last_error: Option<String>,
673    },
674
675    #[error(
676        "LND did not report synced_to_chain after {attempts} attempts; last error: {last_error:?}"
677    )]
678    ReadyTimeout {
679        attempts: usize,
680        last_error: Option<String>,
681    },
682
683    #[error(
684        "LND {container_id} did not produce file {path} after {attempts} attempts; last error: {last_error:?}"
685    )]
686    FileTimeout {
687        container_id: String,
688        path: String,
689        attempts: usize,
690        last_error: Option<String>,
691    },
692
693    #[error(
694        "LND node {alias} did not reach spendable balance {minimum_sat} sat after {attempts} attempts; last error: {last_error:?}"
695    )]
696    BalanceTimeout {
697        alias: String,
698        minimum_sat: i64,
699        attempts: usize,
700        last_error: Option<String>,
701    },
702
703    #[error(
704        "LND node {alias} did not report spendable UTXOs totaling {minimum_sat} sat after {attempts} attempts; last error: {last_error:?}"
705    )]
706    UtxoTimeout {
707        alias: String,
708        minimum_sat: i64,
709        attempts: usize,
710        last_error: Option<String>,
711    },
712
713    #[error(
714        "LND node {alias} did not report pending channel {channel_point} with {remote_public_key} after {attempts} attempts; last error: {last_error:?}"
715    )]
716    PendingChannelTimeout {
717        alias: String,
718        remote_public_key: String,
719        channel_point: String,
720        attempts: usize,
721        last_error: Option<String>,
722    },
723
724    #[error(
725        "LND node {alias} did not report active channel {channel_point} with {remote_public_key} after {attempts} attempts; last error: {last_error:?}"
726    )]
727    ActiveChannelTimeout {
728        alias: String,
729        remote_public_key: String,
730        channel_point: String,
731        attempts: usize,
732        last_error: Option<String>,
733    },
734
735    #[error(
736        "LND node {alias} could not connect peer {public_key} after {attempts} attempts; last error: {last_error:?}"
737    )]
738    PeerConnectTimeout {
739        alias: String,
740        public_key: String,
741        attempts: usize,
742        last_error: Option<String>,
743    },
744}
745
746impl LndError {
747    fn rpc(socket: &str, method: &'static str, error: impl std::fmt::Display) -> Self {
748        Self::Rpc {
749            socket: socket.to_string(),
750            method,
751            message: error.to_string(),
752        }
753    }
754
755    fn is_lnd_starting(&self) -> bool {
756        matches!(
757            self,
758            LndError::Connect { message, .. } | LndError::Rpc { message, .. }
759                if message.contains("server is still in the process of starting")
760        )
761    }
762}
763
764fn lnd_container_spec(
765    config: &LndConfig,
766    bitcoind: &BitcoinCore,
767) -> Result<ContainerSpec, LndError> {
768    let bitcoind_ip = bitcoind
769        .container
770        .ip_address
771        .as_deref()
772        .ok_or(LndError::MissingBitcoindIp)?;
773    let name = format!(
774        "spawn-lnd-{}-lnd-{}-{}",
775        config.cluster_id, config.node_index, config.alias
776    );
777    let labels =
778        managed_container_labels(&config.cluster_id, ContainerRole::Lnd, Some(&config.alias));
779    let mut args = lnd_args(bitcoind_ip, bitcoind);
780
781    args.extend(config.extra_args.clone());
782
783    let mut spec = ContainerSpec::new(name, config.image.clone())
784        .cmd(args)
785        .labels(labels)
786        .expose_ports([LND_GRPC_PORT, LND_P2P_PORT]);
787
788    if let Some(network) = &config.network {
789        spec = spec.network(network.clone());
790    }
791    if let Some(ipv4_address) = &config.ipv4_address {
792        spec = spec.ipv4_address(ipv4_address.clone());
793    }
794
795    Ok(spec)
796}
797
798fn lnd_args(bitcoind_ip: &str, bitcoind: &BitcoinCore) -> Vec<String> {
799    vec![
800        "--bitcoin.regtest".to_string(),
801        "--bitcoin.node=bitcoind".to_string(),
802        "--bitcoind.rpcpolling".to_string(),
803        format!("--bitcoind.rpchost={bitcoind_ip}:{BITCOIND_RPC_PORT}"),
804        format!("--bitcoind.rpcuser={}", bitcoind.auth.user),
805        format!("--bitcoind.rpcpass={}", bitcoind.auth.password),
806        "--accept-keysend".to_string(),
807        "--allow-circular-route".to_string(),
808        "--debuglevel=info".to_string(),
809        "--noseedbackup".to_string(),
810        "--listen=0.0.0.0:9735".to_string(),
811        "--rpclisten=0.0.0.0:10009".to_string(),
812    ]
813}
814
815fn utxo_total_sat(utxos: &[Utxo]) -> i64 {
816    utxos.iter().map(|utxo| utxo.amount_sat).sum()
817}
818
819pub(crate) fn channel_point_string(channel_point: &ChannelPoint) -> Result<String, LndError> {
820    let funding_txid = match channel_point.funding_txid.as_ref() {
821        Some(lnd_grpc_rust::lnrpc::channel_point::FundingTxid::FundingTxidBytes(bytes)) => {
822            let mut txid = bytes.clone();
823            txid.reverse();
824            hex::encode(txid)
825        }
826        Some(lnd_grpc_rust::lnrpc::channel_point::FundingTxid::FundingTxidStr(txid)) => {
827            txid.clone()
828        }
829        None => {
830            return Err(LndError::Rpc {
831                socket: "<open-channel>".to_string(),
832                method: "OpenChannelSync",
833                message: "response did not include funding txid".to_string(),
834            });
835        }
836    };
837
838    Ok(format!("{}:{}", funding_txid, channel_point.output_index))
839}
840
841fn has_pending_channel(
842    pending: &PendingChannelsResponse,
843    remote_public_key: &str,
844    channel_point: &str,
845) -> bool {
846    pending.pending_open_channels.iter().any(|pending| {
847        pending.channel.as_ref().is_some_and(|channel| {
848            channel.remote_node_pub == remote_public_key && channel.channel_point == channel_point
849        })
850    })
851}
852
853async fn wait_for_file(
854    docker: &DockerClient,
855    container_id: &str,
856    path: &str,
857    policy: &RetryPolicy,
858) -> Result<Vec<u8>, LndError> {
859    let mut last_error = None;
860
861    for _ in 0..policy.attempts {
862        match docker.copy_file_from_container(container_id, path).await {
863            Ok(file) => return Ok(file),
864            Err(error) => {
865                last_error = Some(error.to_string());
866                sleep(policy.interval()).await;
867            }
868        }
869    }
870
871    Err(LndError::FileTimeout {
872        container_id: container_id.to_string(),
873        path: path.to_string(),
874        attempts: policy.attempts,
875        last_error,
876    })
877}
878
879async fn init_wallet_or_read_macaroon(
880    docker: &DockerClient,
881    container_id: &str,
882    cert_bytes: &[u8],
883    socket: &str,
884    policy: &RetryPolicy,
885) -> Result<String, LndError> {
886    let mut last_error = None;
887
888    for _ in 0..policy.attempts {
889        let init_error = match init_wallet_once(cert_bytes, socket).await {
890            Ok(macaroon) if !macaroon.is_empty() => return Ok(macaroon),
891            Ok(_) => Some("InitWallet returned an empty admin macaroon".to_string()),
892            Err(error) => Some(error),
893        };
894
895        match docker
896            .copy_file_from_container(container_id, LND_ADMIN_MACAROON_PATH)
897            .await
898        {
899            Ok(macaroon) if !macaroon.is_empty() => return Ok(hex::encode(macaroon)),
900            Ok(_) => {
901                last_error = Some(format!(
902                    "{LND_ADMIN_MACAROON_PATH} was empty; wallet init: {}",
903                    init_error.as_deref().unwrap_or("no error")
904                ));
905            }
906            Err(error) => {
907                last_error = Some(format!(
908                    "failed to read {LND_ADMIN_MACAROON_PATH}: {error}; wallet init: {}",
909                    init_error.as_deref().unwrap_or("no error")
910                ));
911            }
912        }
913
914        sleep(policy.interval()).await;
915    }
916
917    Err(LndError::WalletInitTimeout {
918        attempts: policy.attempts,
919        last_error,
920    })
921}
922
923async fn init_wallet_once(cert_bytes: &[u8], socket: &str) -> Result<String, String> {
924    let channel = unauthenticated_channel(cert_bytes, socket)
925        .await
926        .map_err(|error| error.to_string())?;
927    let mut unlocker = WalletUnlockerClient::new(channel);
928    let seed = unlocker
929        .gen_seed(GenSeedRequest {
930            aezeed_passphrase: Vec::new(),
931            seed_entropy: Vec::new(),
932        })
933        .await
934        .map_err(|error| error.to_string())?
935        .into_inner()
936        .cipher_seed_mnemonic;
937    let response = unlocker
938        .init_wallet(InitWalletRequest {
939            wallet_password: LND_WALLET_PASSWORD.to_vec(),
940            cipher_seed_mnemonic: seed,
941            ..Default::default()
942        })
943        .await
944        .map_err(|error| error.to_string())?
945        .into_inner();
946
947    Ok(hex::encode(response.admin_macaroon))
948}
949
950async fn unauthenticated_channel(cert_bytes: &[u8], socket: &str) -> Result<MyChannel, LndError> {
951    let uri = format!("https://{socket}")
952        .parse::<Uri>()
953        .map_err(|error| LndError::UnauthenticatedChannel {
954            socket: socket.to_string(),
955            message: error.to_string(),
956        })?;
957
958    MyChannel::new(Some(cert_bytes.to_vec()), uri)
959        .await
960        .map_err(|error| LndError::UnauthenticatedChannel {
961            socket: socket.to_string(),
962            message: error.to_string(),
963        })
964}
965
966async fn wait_for_synced_get_info(
967    cert_hex: &str,
968    macaroon_hex: &str,
969    socket: &str,
970    policy: &RetryPolicy,
971) -> Result<GetInfoResponse, LndError> {
972    let mut last_error = None;
973
974    for _ in 0..policy.attempts {
975        match get_synced_info_once(cert_hex, macaroon_hex, socket).await {
976            Ok(info) if info.synced_to_chain => return Ok(info),
977            Ok(info) => {
978                last_error = Some(format!(
979                    "GetInfo returned synced_to_chain=false at height {}",
980                    info.block_height
981                ));
982            }
983            Err(error) => last_error = Some(error.to_string()),
984        }
985
986        sleep(policy.interval()).await;
987    }
988
989    Err(LndError::ReadyTimeout {
990        attempts: policy.attempts,
991        last_error,
992    })
993}
994
995async fn get_synced_info_once(
996    cert_hex: &str,
997    macaroon_hex: &str,
998    socket: &str,
999) -> Result<GetInfoResponse, LndError> {
1000    let mut client = connect_authenticated(cert_hex, macaroon_hex, socket).await?;
1001    let info = client
1002        .lightning()
1003        .get_info(GetInfoRequest {})
1004        .await
1005        .map_err(|error| LndError::Connect {
1006            socket: socket.to_string(),
1007            message: error.to_string(),
1008        })?
1009        .into_inner();
1010
1011    Ok(info)
1012}
1013
1014async fn connect_authenticated(
1015    cert_hex: &str,
1016    macaroon_hex: &str,
1017    socket: &str,
1018) -> Result<LndClient, LndError> {
1019    lnd_grpc_rust::connect(
1020        cert_hex.to_string(),
1021        macaroon_hex.to_string(),
1022        socket.to_string(),
1023    )
1024    .await
1025    .map_err(|error| LndError::Connect {
1026        socket: socket.to_string(),
1027        message: error.to_string(),
1028    })
1029}
1030
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use crate::{
        BitcoinCore, BitcoinRpcAuth, BitcoinRpcClient, DEFAULT_LND_IMAGE,
        bitcoin::{BITCOIND_P2P_PORT, BITCOIND_RPC_PORT},
        docker::SpawnedContainer,
    };

    use lnd_grpc_rust::lnrpc::{ChannelPoint, channel_point};

    use super::{
        LND_GRPC_PORT, LND_P2P_PORT, LndConfig, channel_point_string, lnd_args, lnd_container_spec,
    };

    /// Builds a `BitcoinCore` fixture with fixed credentials, addresses, and port mappings.
    fn fake_bitcoind() -> BitcoinCore {
        let container = SpawnedContainer {
            id: "bitcoind".to_string(),
            name: Some("bitcoind".to_string()),
            ip_address: Some("172.17.0.2".to_string()),
            host_ports: HashMap::from([(BITCOIND_RPC_PORT, 18443), (BITCOIND_P2P_PORT, 18444)]),
        };
        let auth = BitcoinRpcAuth {
            user: "bitcoinrpc".to_string(),
            password: "password".to_string(),
            rpcauth: "bitcoinrpc:salt$hmac".to_string(),
        };
        let rpc = BitcoinRpcClient::new("127.0.0.1", 18443, "bitcoinrpc", "password");
        let wallet_rpc = BitcoinRpcClient::new("127.0.0.1", 18443, "bitcoinrpc", "password")
            .wallet("spawn-lnd");

        BitcoinCore {
            container,
            auth,
            rpc,
            wallet_rpc,
            rpc_socket: "127.0.0.1:18443".to_string(),
            p2p_socket: "127.0.0.1:18444".to_string(),
        }
    }

    // `LndConfig::new` keeps the identifiers it is given and defaults to the pinned image.
    #[test]
    fn default_lnd_config_uses_pinned_image() {
        let lnd = LndConfig::new("cluster-1", "alice", 0);

        assert_eq!(lnd.cluster_id, "cluster-1");
        assert_eq!(lnd.alias, "alice");
        assert_eq!(lnd.node_index, 0);
        assert_eq!(lnd.image, DEFAULT_LND_IMAGE);
    }

    // Every flag listed here must appear in the generated argument list.
    #[test]
    fn builds_lnd_args_for_bitcoind_bridge_ip() {
        let bitcoind = fake_bitcoind();
        let args = lnd_args("172.17.0.2", &bitcoind);

        let expected_flags = [
            "--bitcoin.regtest",
            "--bitcoin.node=bitcoind",
            "--bitcoind.rpcpolling",
            "--rpclisten=0.0.0.0:10009",
            "--listen=0.0.0.0:9735",
            "--bitcoind.rpchost=172.17.0.2:18443",
            "--bitcoind.rpcuser=bitcoinrpc",
            "--bitcoind.rpcpass=password",
            "--accept-keysend",
            "--allow-circular-route",
            "--debuglevel=info",
            "--noseedbackup",
        ];

        for flag in expected_flags {
            assert!(
                args.contains(&flag.to_string()),
                "missing lnd argument: {flag}"
            );
        }
    }

    // The container spec carries the derived name, the image, the extra arg, and both ports.
    #[test]
    fn builds_lnd_container_spec() {
        let bitcoind = fake_bitcoind();
        let config = LndConfig::new("cluster-1", "alice", 0).extra_arg("--debuglevel=info");

        let spec = lnd_container_spec(&config, &bitcoind).expect("spec");

        assert_eq!(spec.name, "spawn-lnd-cluster-1-lnd-0-alice");
        assert_eq!(spec.image, DEFAULT_LND_IMAGE);
        assert!(spec.cmd.contains(&"--debuglevel=info".to_string()));
        for port in [LND_GRPC_PORT, LND_P2P_PORT] {
            assert!(
                spec.exposed_ports.contains(&port),
                "port {port} is not exposed"
            );
        }
    }

    // A string txid is used verbatim in the `txid:index` form.
    #[test]
    fn formats_channel_point_string() {
        let funding_txid = channel_point::FundingTxid::FundingTxidStr("txid".to_string());
        let channel_point = ChannelPoint {
            funding_txid: Some(funding_txid),
            output_index: 1,
        };

        let formatted = channel_point_string(&channel_point).expect("channel point");

        assert_eq!(formatted, "txid:1");
    }

    // A byte txid is rendered as hex with the byte order reversed.
    #[test]
    fn formats_channel_point_bytes_as_display_txid() {
        let funding_txid = channel_point::FundingTxid::FundingTxidBytes(vec![0x01, 0x02, 0x03, 0x04]);
        let channel_point = ChannelPoint {
            funding_txid: Some(funding_txid),
            output_index: 0,
        };

        let formatted = channel_point_string(&channel_point).expect("channel point");

        assert_eq!(formatted, "04030201:0");
    }
}