Skip to main content

spawn_lnd/
docker.rs

1use bollard::{
2    Docker,
3    container::LogOutput,
4    errors::Error as BollardError,
5    models::{ContainerCreateBody, ContainerInspectResponse, HostConfig, PortBinding, PortMap},
6    query_parameters::{
7        CreateContainerOptionsBuilder, CreateImageOptionsBuilder,
8        DownloadFromContainerOptionsBuilder, InspectContainerOptions, ListContainersOptionsBuilder,
9        LogsOptionsBuilder, RemoveContainerOptionsBuilder, StartContainerOptions,
10        StopContainerOptionsBuilder,
11    },
12};
13use futures_util::StreamExt;
14use serde::{Deserialize, Serialize};
15use std::{collections::HashMap, io::Read};
16use thiserror::Error;
17
18/// Docker label key marking containers managed by this crate.
19pub const LABEL_MANAGED: &str = "spawn-lnd";
20/// Docker label value used with [`LABEL_MANAGED`].
21pub const LABEL_MANAGED_VALUE: &str = "true";
22/// Docker label key storing the generated cluster id.
23pub const LABEL_CLUSTER: &str = "spawn-lnd.cluster";
24/// Docker label key storing an LND node alias when applicable.
25pub const LABEL_NODE: &str = "spawn-lnd.node";
26/// Docker label key storing the managed container role.
27pub const LABEL_ROLE: &str = "spawn-lnd.role";
28
29const STOP_TIMEOUT_SECONDS: i32 = 10;
30const LOG_TAIL_LINES: &str = "200";
31const LOG_MAX_BYTES: usize = 64 * 1024;
32
33/// Thin async wrapper around a Bollard Docker client.
34#[derive(Clone, Debug)]
35pub struct DockerClient {
36    docker: Docker,
37}
38
39impl DockerClient {
40    /// Connect to Docker using Bollard defaults and verify connectivity with `ping`.
41    pub async fn connect() -> Result<Self, DockerError> {
42        let docker =
43            Docker::connect_with_defaults().map_err(|source| DockerError::Connect { source })?;
44
45        docker
46            .ping()
47            .await
48            .map_err(|source| DockerError::Ping { source })?;
49
50        Ok(Self { docker })
51    }
52
53    /// Wrap an existing Bollard Docker client.
54    pub fn from_bollard(docker: Docker) -> Self {
55        Self { docker }
56    }
57
58    /// Access the underlying Bollard client.
59    pub fn inner(&self) -> &Docker {
60        &self.docker
61    }
62
63    /// Ensure a Docker image exists locally, pulling it if necessary.
64    pub async fn ensure_image(&self, image: &str) -> Result<ImageStatus, DockerError> {
65        match self.docker.inspect_image(image).await {
66            Ok(_) => return Ok(ImageStatus::AlreadyPresent),
67            Err(source) if is_not_found_error(&source) => {}
68            Err(source) => {
69                return Err(DockerError::InspectImage {
70                    image: image.to_string(),
71                    source,
72                });
73            }
74        }
75
76        let options = CreateImageOptionsBuilder::new().from_image(image).build();
77        let mut stream = self.docker.create_image(Some(options), None, None);
78
79        while let Some(result) = stream.next().await {
80            result.map_err(|source| DockerError::PullImage {
81                image: image.to_string(),
82                source,
83            })?;
84        }
85
86        Ok(ImageStatus::Pulled)
87    }
88
89    /// Create and start a container from a [`ContainerSpec`].
90    pub async fn create_and_start(
91        &self,
92        spec: ContainerSpec,
93    ) -> Result<SpawnedContainer, DockerError> {
94        self.ensure_image(&spec.image).await?;
95
96        let options = CreateContainerOptionsBuilder::new()
97            .name(&spec.name)
98            .build();
99        let response = self
100            .docker
101            .create_container(Some(options), spec.create_body())
102            .await
103            .map_err(|source| DockerError::CreateContainer {
104                name: spec.name.clone(),
105                image: spec.image.clone(),
106                source,
107            })?;
108
109        if let Err(source) = self
110            .docker
111            .start_container(&response.id, None::<StartContainerOptions>)
112            .await
113        {
114            let _ = self.stop_and_remove_container(&response.id).await;
115            return Err(DockerError::StartContainer {
116                container_id: response.id,
117                source,
118            });
119        }
120
121        let inspect = self
122            .docker
123            .inspect_container(&response.id, None::<InspectContainerOptions>)
124            .await
125            .map_err(|source| DockerError::InspectContainer {
126                container_id: response.id.clone(),
127                source,
128            })?;
129
130        SpawnedContainer::from_inspect(response.id, inspect)
131    }
132
133    /// Copy a single file from a container and return its bytes.
134    pub async fn copy_file_from_container(
135        &self,
136        container_id: &str,
137        path: &str,
138    ) -> Result<Vec<u8>, DockerError> {
139        let options = DownloadFromContainerOptionsBuilder::new()
140            .path(path)
141            .build();
142        let mut stream = self
143            .docker
144            .download_from_container(container_id, Some(options));
145        let mut archive = Vec::new();
146
147        while let Some(chunk) = stream.next().await {
148            let chunk = chunk.map_err(|source| DockerError::DownloadFromContainer {
149                container_id: container_id.to_string(),
150                path: path.to_string(),
151                source,
152            })?;
153            archive.extend_from_slice(&chunk);
154        }
155
156        extract_first_file_from_tar(&archive).map_err(|message| DockerError::ArchiveRead {
157            container_id: container_id.to_string(),
158            path: path.to_string(),
159            message,
160        })
161    }
162
163    /// Remove all managed containers with the given cluster id.
164    pub async fn cleanup_cluster(&self, cluster_id: &str) -> Result<CleanupReport, DockerError> {
165        self.cleanup_by_labels(cluster_label_filters(cluster_id))
166            .await
167    }
168
169    /// Remove all containers managed by this crate.
170    pub async fn cleanup_all(&self) -> Result<CleanupReport, DockerError> {
171        self.cleanup_by_labels(managed_label_filters()).await
172    }
173
174    /// Return ids for all containers managed by this crate.
175    pub async fn managed_container_ids(&self) -> Result<Vec<String>, DockerError> {
176        self.container_ids_by_labels(managed_label_filters()).await
177    }
178
179    /// Return ids for all managed containers with the given cluster id.
180    pub async fn cluster_container_ids(
181        &self,
182        cluster_id: &str,
183    ) -> Result<Vec<String>, DockerError> {
184        self.container_ids_by_labels(cluster_label_filters(cluster_id))
185            .await
186    }
187
188    /// Return recent stdout/stderr logs from a container.
189    pub async fn container_logs(&self, container_id: &str) -> Result<String, DockerError> {
190        let options = LogsOptionsBuilder::default()
191            .stdout(true)
192            .stderr(true)
193            .tail(LOG_TAIL_LINES)
194            .build();
195        let mut stream = self.docker.logs(container_id, Some(options));
196        let mut logs = String::new();
197
198        while let Some(chunk) = stream.next().await {
199            let chunk = chunk.map_err(|source| DockerError::ReadContainerLogs {
200                container_id: container_id.to_string(),
201                source,
202            })?;
203            append_log_output(&mut logs, chunk);
204
205            if logs.len() > LOG_MAX_BYTES {
206                logs.truncate(LOG_MAX_BYTES);
207                logs.push_str("\n<truncated>");
208                break;
209            }
210        }
211
212        Ok(logs)
213    }
214
215    /// Create a rollback guard for containers created during startup.
216    pub fn rollback_guard(&self) -> StartupRollback<'_> {
217        StartupRollback::new(self)
218    }
219
220    /// Stop and remove explicit container ids.
221    pub async fn rollback_containers<I>(
222        &self,
223        container_ids: I,
224    ) -> Result<CleanupReport, DockerError>
225    where
226        I: IntoIterator,
227        I::Item: Into<String>,
228    {
229        let mut report = CleanupReport {
230            matched: 0,
231            removed: 0,
232            failures: Vec::new(),
233        };
234
235        for container_id in container_ids {
236            report.matched += 1;
237            let container_id = container_id.into();
238
239            match self.stop_and_remove_container(&container_id).await {
240                Ok(()) => report.removed += 1,
241                Err(failure) => report.failures.push(failure),
242            }
243        }
244
245        if report.failures.is_empty() {
246            Ok(report)
247        } else {
248            Err(DockerError::cleanup_failed(report))
249        }
250    }
251
252    async fn cleanup_by_labels(
253        &self,
254        label_filters: HashMap<String, Vec<String>>,
255    ) -> Result<CleanupReport, DockerError> {
256        let options = ListContainersOptionsBuilder::new()
257            .all(true)
258            .filters(&label_filters)
259            .build();
260        let containers = self
261            .docker
262            .list_containers(Some(options))
263            .await
264            .map_err(|source| DockerError::ListContainers { source })?;
265
266        let mut report = CleanupReport {
267            matched: containers.len(),
268            removed: 0,
269            failures: Vec::new(),
270        };
271
272        for container in containers {
273            let Some(container_id) = container.id else {
274                report.failures.push(CleanupFailure {
275                    container_id: "<missing>".to_string(),
276                    operation: "inspect".to_string(),
277                    message: "container summary did not include an id".to_string(),
278                });
279                continue;
280            };
281
282            match self.stop_and_remove_container(&container_id).await {
283                Ok(()) => report.removed += 1,
284                Err(failure) => report.failures.push(failure),
285            }
286        }
287
288        if report.failures.is_empty() {
289            Ok(report)
290        } else {
291            Err(DockerError::cleanup_failed(report))
292        }
293    }
294
295    async fn container_ids_by_labels(
296        &self,
297        label_filters: HashMap<String, Vec<String>>,
298    ) -> Result<Vec<String>, DockerError> {
299        let options = ListContainersOptionsBuilder::new()
300            .all(true)
301            .filters(&label_filters)
302            .build();
303        let containers = self
304            .docker
305            .list_containers(Some(options))
306            .await
307            .map_err(|source| DockerError::ListContainers { source })?;
308
309        Ok(containers
310            .into_iter()
311            .filter_map(|container| container.id)
312            .collect())
313    }
314
315    async fn stop_and_remove_container(&self, container_id: &str) -> Result<(), CleanupFailure> {
316        let stop_options = StopContainerOptionsBuilder::new()
317            .t(STOP_TIMEOUT_SECONDS)
318            .build();
319
320        if let Err(source) = self
321            .docker
322            .stop_container(container_id, Some(stop_options))
323            .await
324            && !is_ignorable_stop_error(&source)
325        {
326            return Err(CleanupFailure::from_error(container_id, "stop", source));
327        }
328
329        let remove_options = RemoveContainerOptionsBuilder::new()
330            .force(true)
331            .v(true)
332            .build();
333
334        if let Err(source) = self
335            .docker
336            .remove_container(container_id, Some(remove_options))
337            .await
338            && !is_not_found_error(&source)
339        {
340            return Err(CleanupFailure::from_error(container_id, "remove", source));
341        }
342
343        Ok(())
344    }
345}
346
347/// Tracks containers that should be removed if startup fails.
348#[derive(Debug)]
349pub struct StartupRollback<'a> {
350    docker: &'a DockerClient,
351    container_ids: Vec<String>,
352    disarmed: bool,
353}
354
355impl<'a> StartupRollback<'a> {
356    fn new(docker: &'a DockerClient) -> Self {
357        Self {
358            docker,
359            container_ids: Vec::new(),
360            disarmed: false,
361        }
362    }
363
364    /// Record a container for later rollback.
365    pub fn record(&mut self, container: &SpawnedContainer) {
366        self.record_id(container.id.clone());
367    }
368
369    /// Record a container id for later rollback.
370    pub fn record_id(&mut self, container_id: impl Into<String>) {
371        self.container_ids.push(container_id.into());
372    }
373
374    /// Return currently tracked container ids.
375    pub fn container_ids(&self) -> &[String] {
376        &self.container_ids
377    }
378
379    /// Disarm the guard and return tracked ids without removing containers.
380    pub fn disarm(mut self) -> Vec<String> {
381        self.disarmed = true;
382        std::mem::take(&mut self.container_ids)
383    }
384
385    /// Remove all tracked containers and disarm the guard.
386    pub async fn rollback(mut self) -> Result<CleanupReport, DockerError> {
387        self.disarmed = true;
388        let container_ids = std::mem::take(&mut self.container_ids);
389        self.docker.rollback_containers(container_ids).await
390    }
391}
392
393impl Drop for StartupRollback<'_> {
394    fn drop(&mut self) {
395        if !self.disarmed && !self.container_ids.is_empty() {
396            eprintln!(
397                "spawn-lnd startup rollback guard dropped with {} tracked container(s); call rollback().await to clean them up",
398                self.container_ids.len()
399            );
400        }
401    }
402}
403
404/// Docker container creation parameters used by [`DockerClient`].
405#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
406pub struct ContainerSpec {
407    /// Container name.
408    pub name: String,
409    /// Docker image reference.
410    pub image: String,
411    /// Container command arguments.
412    pub cmd: Vec<String>,
413    /// Environment variables.
414    pub env: Vec<String>,
415    /// Docker labels.
416    pub labels: HashMap<String, String>,
417    /// Container TCP ports to publish on random host ports.
418    pub exposed_ports: Vec<u16>,
419    /// Optional Docker network mode/name.
420    pub network: Option<String>,
421}
422
423impl ContainerSpec {
424    /// Create a container spec with no command, env, labels, ports, or network.
425    pub fn new(name: impl Into<String>, image: impl Into<String>) -> Self {
426        Self {
427            name: name.into(),
428            image: image.into(),
429            cmd: Vec::new(),
430            env: Vec::new(),
431            labels: HashMap::new(),
432            exposed_ports: Vec::new(),
433            network: None,
434        }
435    }
436
437    /// Set the container command.
438    pub fn cmd<I, S>(mut self, cmd: I) -> Self
439    where
440        I: IntoIterator<Item = S>,
441        S: Into<String>,
442    {
443        self.cmd = cmd.into_iter().map(Into::into).collect();
444        self
445    }
446
447    /// Set environment variables.
448    pub fn env<I, S>(mut self, env: I) -> Self
449    where
450        I: IntoIterator<Item = S>,
451        S: Into<String>,
452    {
453        self.env = env.into_iter().map(Into::into).collect();
454        self
455    }
456
457    /// Set Docker labels.
458    pub fn labels(mut self, labels: HashMap<String, String>) -> Self {
459        self.labels = labels;
460        self
461    }
462
463    /// Publish one container TCP port on a random host port.
464    pub fn expose_port(mut self, port: u16) -> Self {
465        self.exposed_ports.push(port);
466        self
467    }
468
469    /// Publish multiple container TCP ports on random host ports.
470    pub fn expose_ports<I>(mut self, ports: I) -> Self
471    where
472        I: IntoIterator<Item = u16>,
473    {
474        self.exposed_ports.extend(ports);
475        self
476    }
477
478    /// Set Docker network mode/name.
479    pub fn network(mut self, network: impl Into<String>) -> Self {
480        self.network = Some(network.into());
481        self
482    }
483
484    fn create_body(&self) -> ContainerCreateBody {
485        ContainerCreateBody {
486            image: Some(self.image.clone()),
487            cmd: (!self.cmd.is_empty()).then(|| self.cmd.clone()),
488            env: (!self.env.is_empty()).then(|| self.env.clone()),
489            labels: (!self.labels.is_empty()).then(|| self.labels.clone()),
490            exposed_ports: (!self.exposed_ports.is_empty())
491                .then(|| exposed_ports(&self.exposed_ports)),
492            host_config: Some(HostConfig {
493                auto_remove: Some(false),
494                network_mode: self.network.clone(),
495                port_bindings: (!self.exposed_ports.is_empty())
496                    .then(|| port_bindings(&self.exposed_ports)),
497                ..Default::default()
498            }),
499            ..Default::default()
500        }
501    }
502}
503
504/// Metadata for a container created by this crate.
505#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
506pub struct SpawnedContainer {
507    /// Docker container id.
508    pub id: String,
509    /// Docker container name without a leading slash.
510    pub name: Option<String>,
511    /// Bridge-network IP address when Docker reports one.
512    pub ip_address: Option<String>,
513    /// Mapping from container TCP port to published host TCP port.
514    pub host_ports: HashMap<u16, u16>,
515}
516
517impl SpawnedContainer {
518    fn from_inspect(id: String, inspect: ContainerInspectResponse) -> Result<Self, DockerError> {
519        let network_settings = inspect.network_settings.as_ref();
520        Ok(Self {
521            id,
522            name: inspect
523                .name
524                .map(|name| name.trim_start_matches('/').to_string()),
525            ip_address: container_ip_address(network_settings),
526            host_ports: published_tcp_ports(network_settings)?,
527        })
528    }
529
530    /// Return the host port for a published container port.
531    pub fn host_port(&self, container_port: u16) -> Option<u16> {
532        self.host_ports.get(&container_port).copied()
533    }
534}
535
536/// Role label for a managed container.
537#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
538pub enum ContainerRole {
539    /// Bitcoin Core container.
540    Bitcoind,
541    /// LND container.
542    Lnd,
543}
544
545/// Result of ensuring an image is available locally.
546#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
547pub enum ImageStatus {
548    /// The image was already present before the call.
549    AlreadyPresent,
550    /// The image was pulled by the call.
551    Pulled,
552}
553
554impl ContainerRole {
555    /// Return the Docker label value for this role.
556    pub fn as_label_value(self) -> &'static str {
557        match self {
558            Self::Bitcoind => "bitcoind",
559            Self::Lnd => "lnd",
560        }
561    }
562}
563
564/// Summary of a cleanup operation.
565#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
566pub struct CleanupReport {
567    /// Number of containers matched for cleanup.
568    pub matched: usize,
569    /// Number of containers successfully removed.
570    pub removed: usize,
571    /// Per-container cleanup failures.
572    pub failures: Vec<CleanupFailure>,
573}
574
575impl CleanupReport {
576    /// Return true when no cleanup failures occurred.
577    pub fn is_success(&self) -> bool {
578        self.failures.is_empty()
579    }
580}
581
582/// Failure for one container cleanup operation.
583#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
584pub struct CleanupFailure {
585    /// Docker container id.
586    pub container_id: String,
587    /// Operation that failed, such as `stop` or `remove`.
588    pub operation: String,
589    /// Error message returned by Docker.
590    pub message: String,
591}
592
593impl CleanupFailure {
594    fn from_error(container_id: &str, operation: &'static str, source: BollardError) -> Self {
595        Self {
596            container_id: container_id.to_string(),
597            operation: operation.to_string(),
598            message: source.to_string(),
599        }
600    }
601}
602
603/// Error returned by Docker operations.
604#[derive(Debug, Error)]
605#[allow(missing_docs)]
606pub enum DockerError {
607    #[error("failed to connect to Docker")]
608    Connect { source: BollardError },
609
610    #[error("failed to ping Docker")]
611    Ping { source: BollardError },
612
613    #[error("failed to list Docker containers")]
614    ListContainers { source: BollardError },
615
616    #[error("failed to inspect Docker image {image}")]
617    InspectImage { image: String, source: BollardError },
618
619    #[error("failed to pull Docker image {image}")]
620    PullImage { image: String, source: BollardError },
621
622    #[error("failed to create Docker container {name} from image {image}")]
623    CreateContainer {
624        name: String,
625        image: String,
626        source: BollardError,
627    },
628
629    #[error("failed to start Docker container {container_id}")]
630    StartContainer {
631        container_id: String,
632        source: BollardError,
633    },
634
635    #[error("failed to inspect Docker container {container_id}")]
636    InspectContainer {
637        container_id: String,
638        source: BollardError,
639    },
640
641    #[error("Docker reported invalid host port {host_port} for container port {container_port}")]
642    InvalidPublishedPort {
643        container_port: u16,
644        host_port: String,
645    },
646
647    #[error("failed to download {path} from Docker container {container_id}")]
648    DownloadFromContainer {
649        container_id: String,
650        path: String,
651        source: BollardError,
652    },
653
654    #[error("failed to read archived file {path} from Docker container {container_id}: {message}")]
655    ArchiveRead {
656        container_id: String,
657        path: String,
658        message: String,
659    },
660
661    #[error("failed to read logs from Docker container {container_id}")]
662    ReadContainerLogs {
663        container_id: String,
664        source: BollardError,
665    },
666
667    #[error("failed to clean up {count} Docker container(s)")]
668    CleanupFailed {
669        #[source]
670        report: CleanupReportError,
671        count: usize,
672    },
673}
674
675impl DockerError {
676    fn cleanup_failed(report: CleanupReport) -> Self {
677        Self::CleanupFailed {
678            count: report.failures.len(),
679            report: CleanupReportError(report),
680        }
681    }
682}
683
684/// Error wrapper used as the source for cleanup failures.
685#[derive(Debug)]
686pub struct CleanupReportError(
687    /// Cleanup report containing the underlying failures.
688    pub CleanupReport,
689);
690
691impl std::fmt::Display for CleanupReportError {
692    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
693        write!(
694            f,
695            "{} cleanup failure(s) after matching {} container(s)",
696            self.0.failures.len(),
697            self.0.matched
698        )
699    }
700}
701
702impl std::error::Error for CleanupReportError {}
703
704/// Build the Docker labels applied to every managed container.
705pub fn managed_container_labels(
706    cluster_id: &str,
707    role: ContainerRole,
708    node_alias: Option<&str>,
709) -> HashMap<String, String> {
710    let mut labels = HashMap::from([
711        (LABEL_MANAGED.to_string(), LABEL_MANAGED_VALUE.to_string()),
712        (LABEL_CLUSTER.to_string(), cluster_id.to_string()),
713        (LABEL_ROLE.to_string(), role.as_label_value().to_string()),
714    ]);
715
716    if let Some(node_alias) = node_alias {
717        labels.insert(LABEL_NODE.to_string(), node_alias.to_string());
718    }
719
720    labels
721}
722
723/// Build Docker list filters that match all containers managed by this crate.
724pub fn managed_label_filters() -> HashMap<String, Vec<String>> {
725    label_filters([format!("{LABEL_MANAGED}={LABEL_MANAGED_VALUE}")])
726}
727
728/// Build Docker list filters that match containers for a specific cluster.
729pub fn cluster_label_filters(cluster_id: &str) -> HashMap<String, Vec<String>> {
730    label_filters([
731        format!("{LABEL_MANAGED}={LABEL_MANAGED_VALUE}"),
732        format!("{LABEL_CLUSTER}={cluster_id}"),
733    ])
734}
735
736fn label_filters(labels: impl IntoIterator<Item = String>) -> HashMap<String, Vec<String>> {
737    HashMap::from([("label".to_string(), labels.into_iter().collect())])
738}
739
740fn exposed_ports(ports: &[u16]) -> Vec<String> {
741    ports.iter().copied().map(tcp_port_key).collect()
742}
743
744fn port_bindings(ports: &[u16]) -> PortMap {
745    ports
746        .iter()
747        .copied()
748        .map(|port| {
749            (
750                tcp_port_key(port),
751                Some(vec![PortBinding {
752                    host_ip: Some("127.0.0.1".to_string()),
753                    host_port: Some(String::new()),
754                }]),
755            )
756        })
757        .collect()
758}
759
760fn tcp_port_key(port: u16) -> String {
761    format!("{port}/tcp")
762}
763
764fn container_ip_address(
765    network_settings: Option<&bollard::models::NetworkSettings>,
766) -> Option<String> {
767    network_settings
768        .and_then(|settings| settings.networks.as_ref())
769        .and_then(|networks| {
770            networks
771                .values()
772                .filter_map(|endpoint| endpoint.ip_address.as_ref())
773                .find(|ip| !ip.is_empty())
774                .cloned()
775        })
776}
777
778fn published_tcp_ports(
779    network_settings: Option<&bollard::models::NetworkSettings>,
780) -> Result<HashMap<u16, u16>, DockerError> {
781    let Some(ports) = network_settings.and_then(|settings| settings.ports.as_ref()) else {
782        return Ok(HashMap::new());
783    };
784
785    let mut mapped = HashMap::new();
786    for (key, bindings) in ports {
787        let Some(container_port) = parse_tcp_port_key(key) else {
788            continue;
789        };
790        let Some(binding) = bindings
791            .as_ref()
792            .and_then(|bindings| bindings.iter().find(|binding| binding.host_port.is_some()))
793        else {
794            continue;
795        };
796        let Some(host_port) = binding.host_port.as_ref() else {
797            continue;
798        };
799        let host_port =
800            host_port
801                .parse::<u16>()
802                .map_err(|_| DockerError::InvalidPublishedPort {
803                    container_port,
804                    host_port: host_port.clone(),
805                })?;
806
807        mapped.insert(container_port, host_port);
808    }
809
810    Ok(mapped)
811}
812
813fn parse_tcp_port_key(key: &str) -> Option<u16> {
814    let (port, protocol) = key.split_once('/')?;
815    (protocol == "tcp")
816        .then(|| port.parse::<u16>().ok())
817        .flatten()
818}
819
820fn extract_first_file_from_tar(bytes: &[u8]) -> Result<Vec<u8>, String> {
821    let mut archive = tar::Archive::new(bytes);
822    let entries = archive
823        .entries()
824        .map_err(|err| format!("failed to read tar entries: {err}"))?;
825
826    for entry in entries {
827        let mut entry = entry.map_err(|err| format!("failed to read tar entry: {err}"))?;
828        if !entry.header().entry_type().is_file() {
829            continue;
830        }
831
832        let mut file = Vec::new();
833        entry
834            .read_to_end(&mut file)
835            .map_err(|err| format!("failed to read tar file contents: {err}"))?;
836        return Ok(file);
837    }
838
839    Err("archive did not contain a regular file".to_string())
840}
841
842fn append_log_output(logs: &mut String, output: LogOutput) {
843    let prefix = match &output {
844        LogOutput::StdErr { .. } => "stderr",
845        LogOutput::StdOut { .. } => "stdout",
846        LogOutput::StdIn { .. } => "stdin",
847        LogOutput::Console { .. } => "console",
848    };
849    let message = String::from_utf8_lossy(output.as_ref());
850
851    logs.push('[');
852    logs.push_str(prefix);
853    logs.push_str("] ");
854    logs.push_str(&message);
855
856    if !logs.ends_with('\n') {
857        logs.push('\n');
858    }
859}
860
861fn is_ignorable_stop_error(error: &BollardError) -> bool {
862    matches!(docker_status_code(error), Some(304 | 404))
863}
864
865fn is_not_found_error(error: &BollardError) -> bool {
866    matches!(docker_status_code(error), Some(404))
867}
868
869fn docker_status_code(error: &BollardError) -> Option<u16> {
870    match error {
871        BollardError::DockerResponseServerError { status_code, .. } => Some(*status_code),
872        _ => None,
873    }
874}
875
876#[cfg(test)]
877mod tests {
878    use bollard::models::{NetworkSettings, PortBinding};
879    use std::collections::HashMap;
880
881    use super::{
882        ContainerRole, ContainerSpec, LABEL_CLUSTER, LABEL_MANAGED, LABEL_MANAGED_VALUE,
883        LABEL_NODE, LABEL_ROLE, append_log_output, cluster_label_filters,
884        extract_first_file_from_tar, managed_container_labels, managed_label_filters,
885        parse_tcp_port_key, published_tcp_ports,
886    };
887
888    #[test]
889    fn builds_managed_labels_for_lnd_node() {
890        let labels = managed_container_labels("cluster-1", ContainerRole::Lnd, Some("alice"));
891
892        assert_eq!(labels.get(LABEL_MANAGED).unwrap(), LABEL_MANAGED_VALUE);
893        assert_eq!(labels.get(LABEL_CLUSTER).unwrap(), "cluster-1");
894        assert_eq!(labels.get(LABEL_ROLE).unwrap(), "lnd");
895        assert_eq!(labels.get(LABEL_NODE).unwrap(), "alice");
896    }
897
898    #[test]
899    fn builds_managed_labels_for_bitcoind_group() {
900        let labels = managed_container_labels("cluster-1", ContainerRole::Bitcoind, None);
901
902        assert_eq!(labels.get(LABEL_MANAGED).unwrap(), LABEL_MANAGED_VALUE);
903        assert_eq!(labels.get(LABEL_CLUSTER).unwrap(), "cluster-1");
904        assert_eq!(labels.get(LABEL_ROLE).unwrap(), "bitcoind");
905        assert!(!labels.contains_key(LABEL_NODE));
906    }
907
908    #[test]
909    fn builds_cleanup_all_filter() {
910        let filters = managed_label_filters();
911
912        assert_eq!(
913            filters.get("label").unwrap(),
914            &vec!["spawn-lnd=true".to_string()]
915        );
916    }
917
918    #[test]
919    fn builds_cleanup_cluster_filter() {
920        let filters = cluster_label_filters("cluster-1");
921
922        assert_eq!(
923            filters.get("label").unwrap(),
924            &vec![
925                "spawn-lnd=true".to_string(),
926                "spawn-lnd.cluster=cluster-1".to_string()
927            ]
928        );
929    }
930
931    #[test]
932    fn builds_container_create_body_with_labels_and_ports() {
933        let labels = managed_container_labels("cluster-1", ContainerRole::Bitcoind, None);
934        let spec = ContainerSpec::new("spawn-lnd-test", "lightninglabs/bitcoin-core:30")
935            .cmd(["bitcoind", "-regtest"])
936            .env(["A=B"])
937            .labels(labels)
938            .expose_ports([18443, 18444])
939            .network("bridge");
940
941        let body = spec.create_body();
942        let host_config = body.host_config.expect("host config");
943        let port_bindings = host_config.port_bindings.expect("port bindings");
944
945        assert_eq!(body.image.as_deref(), Some("lightninglabs/bitcoin-core:30"));
946        assert_eq!(body.cmd.unwrap(), ["bitcoind", "-regtest"]);
947        assert_eq!(body.env.unwrap(), ["A=B"]);
948        assert_eq!(
949            body.labels.unwrap().get(LABEL_MANAGED).unwrap(),
950            LABEL_MANAGED_VALUE
951        );
952        assert_eq!(host_config.auto_remove, Some(false));
953        assert_eq!(host_config.network_mode.as_deref(), Some("bridge"));
954        assert!(
955            body.exposed_ports
956                .unwrap()
957                .contains(&"18443/tcp".to_string())
958        );
959
960        let binding = port_bindings
961            .get("18443/tcp")
962            .and_then(|bindings| bindings.as_ref())
963            .and_then(|bindings| bindings.first())
964            .expect("port binding");
965        assert_eq!(binding.host_ip.as_deref(), Some("127.0.0.1"));
966        assert_eq!(binding.host_port.as_deref(), Some(""));
967    }
968
969    #[test]
970    fn parses_tcp_port_keys() {
971        assert_eq!(parse_tcp_port_key("10009/tcp"), Some(10009));
972        assert_eq!(parse_tcp_port_key("10009/udp"), None);
973        assert_eq!(parse_tcp_port_key("not-a-port/tcp"), None);
974    }
975
976    #[test]
977    fn extracts_published_tcp_ports() {
978        let settings = NetworkSettings {
979            ports: Some(HashMap::from([
980                (
981                    "10009/tcp".to_string(),
982                    Some(vec![PortBinding {
983                        host_ip: Some("127.0.0.1".to_string()),
984                        host_port: Some("49153".to_string()),
985                    }]),
986                ),
987                (
988                    "9735/udp".to_string(),
989                    Some(vec![PortBinding {
990                        host_ip: Some("127.0.0.1".to_string()),
991                        host_port: Some("49154".to_string()),
992                    }]),
993                ),
994            ])),
995            ..Default::default()
996        };
997
998        let ports = published_tcp_ports(Some(&settings)).expect("published ports");
999
1000        assert_eq!(ports.get(&10009), Some(&49153));
1001        assert!(!ports.contains_key(&9735));
1002    }
1003
1004    #[test]
1005    fn extracts_first_regular_file_from_tar() {
1006        let mut archive = Vec::new();
1007        {
1008            let mut builder = tar::Builder::new(&mut archive);
1009            let content = b"certificate-bytes";
1010            let mut header = tar::Header::new_gnu();
1011            header.set_path("tls.cert").expect("path");
1012            header.set_size(content.len() as u64);
1013            header.set_cksum();
1014            builder
1015                .append(&header, &content[..])
1016                .expect("append tar entry");
1017            builder.finish().expect("finish tar");
1018        }
1019
1020        let file = extract_first_file_from_tar(&archive).expect("file contents");
1021
1022        assert_eq!(file, b"certificate-bytes");
1023    }
1024
1025    #[test]
1026    fn errors_when_tar_has_no_regular_file() {
1027        let mut archive = Vec::new();
1028        {
1029            let mut builder = tar::Builder::new(&mut archive);
1030            let mut header = tar::Header::new_gnu();
1031            header.set_entry_type(tar::EntryType::Directory);
1032            header.set_path("empty-dir").expect("path");
1033            header.set_size(0);
1034            header.set_cksum();
1035            builder
1036                .append(&header, std::io::empty())
1037                .expect("append directory");
1038            builder.finish().expect("finish tar");
1039        }
1040
1041        let error = extract_first_file_from_tar(&archive).expect_err("no file");
1042
1043        assert_eq!(error, "archive did not contain a regular file");
1044    }
1045
1046    #[test]
1047    fn rollback_guard_tracks_and_disarms_ids() {
1048        let docker = super::DockerClient::from_bollard(
1049            bollard::Docker::connect_with_http(
1050                "http://127.0.0.1:65535",
1051                1,
1052                bollard::API_DEFAULT_VERSION,
1053            )
1054            .expect("construct Docker client"),
1055        );
1056        let mut rollback = docker.rollback_guard();
1057
1058        rollback.record_id("container-a");
1059        rollback.record_id("container-b");
1060
1061        assert_eq!(rollback.container_ids(), ["container-a", "container-b"]);
1062        assert_eq!(rollback.disarm(), ["container-a", "container-b"]);
1063    }
1064
1065    #[test]
1066    fn formats_log_output_with_stream_prefix() {
1067        let mut logs = String::new();
1068
1069        append_log_output(
1070            &mut logs,
1071            bollard::container::LogOutput::StdErr {
1072                message: "failure".into(),
1073            },
1074        );
1075
1076        assert_eq!(logs, "[stderr] failure\n");
1077    }
1078}