Skip to main content

spawn_lnd/
docker.rs

1use bollard::{
2    Docker,
3    container::LogOutput,
4    errors::Error as BollardError,
5    models::{
6        ContainerCreateBody, ContainerInspectResponse, EndpointIpamConfig, EndpointSettings,
7        HostConfig, Ipam, IpamConfig, NetworkCreateRequest, NetworkInspect, NetworkingConfig,
8        PortBinding, PortMap,
9    },
10    query_parameters::{
11        CreateContainerOptionsBuilder, CreateImageOptionsBuilder,
12        DownloadFromContainerOptionsBuilder, InspectContainerOptions, ListContainersOptionsBuilder,
13        ListNetworksOptionsBuilder, LogsOptionsBuilder, RemoveContainerOptionsBuilder,
14        StartContainerOptions, StopContainerOptionsBuilder,
15    },
16};
17use futures_util::StreamExt;
18use serde::{Deserialize, Serialize};
19use std::{collections::HashMap, io::Read};
20use thiserror::Error;
21
/// Label key that identifies Docker resources managed by this crate.
pub const LABEL_MANAGED: &str = "spawn-lnd";
/// Label value paired with [`LABEL_MANAGED`].
pub const LABEL_MANAGED_VALUE: &str = "true";
/// Label key holding the generated cluster id.
pub const LABEL_CLUSTER: &str = "spawn-lnd.cluster";
/// Label key holding the LND node alias, when one applies.
pub const LABEL_NODE: &str = "spawn-lnd.node";
/// Label key holding the role of a managed container.
pub const LABEL_ROLE: &str = "spawn-lnd.role";

/// Timeout, in seconds, handed to Docker when stopping a container.
const STOP_TIMEOUT_SECONDS: i32 = 10;
/// Number of trailing log lines requested from Docker.
const LOG_TAIL_LINES: &str = "200";
/// Upper bound on collected log text, in bytes (64 KiB).
const LOG_MAX_BYTES: usize = 64 * 1024;
36
37/// Thin async wrapper around a Bollard Docker client.
38#[derive(Clone, Debug)]
39pub struct DockerClient {
40    docker: Docker,
41}
42
43impl DockerClient {
44    /// Connect to Docker using Bollard defaults and verify connectivity with `ping`.
45    pub async fn connect() -> Result<Self, DockerError> {
46        let docker =
47            Docker::connect_with_defaults().map_err(|source| DockerError::Connect { source })?;
48
49        docker
50            .ping()
51            .await
52            .map_err(|source| DockerError::Ping { source })?;
53
54        Ok(Self { docker })
55    }
56
57    /// Wrap an existing Bollard Docker client.
58    pub fn from_bollard(docker: Docker) -> Self {
59        Self { docker }
60    }
61
62    /// Access the underlying Bollard client.
63    pub fn inner(&self) -> &Docker {
64        &self.docker
65    }
66
67    /// Ensure a Docker image exists locally, pulling it if necessary.
68    pub async fn ensure_image(&self, image: &str) -> Result<ImageStatus, DockerError> {
69        match self.docker.inspect_image(image).await {
70            Ok(_) => return Ok(ImageStatus::AlreadyPresent),
71            Err(source) if is_not_found_error(&source) => {}
72            Err(source) => {
73                return Err(DockerError::InspectImage {
74                    image: image.to_string(),
75                    source,
76                });
77            }
78        }
79
80        let options = CreateImageOptionsBuilder::new().from_image(image).build();
81        let mut stream = self.docker.create_image(Some(options), None, None);
82
83        while let Some(result) = stream.next().await {
84            result.map_err(|source| DockerError::PullImage {
85                image: image.to_string(),
86                source,
87            })?;
88        }
89
90        Ok(ImageStatus::Pulled)
91    }
92
93    /// Create and start a container from a [`ContainerSpec`].
94    pub async fn create_and_start(
95        &self,
96        spec: ContainerSpec,
97    ) -> Result<SpawnedContainer, DockerError> {
98        self.ensure_image(&spec.image).await?;
99
100        let options = CreateContainerOptionsBuilder::new()
101            .name(&spec.name)
102            .build();
103        let response = self
104            .docker
105            .create_container(Some(options), spec.create_body())
106            .await
107            .map_err(|source| DockerError::CreateContainer {
108                name: spec.name.clone(),
109                image: spec.image.clone(),
110                source,
111            })?;
112
113        if let Err(source) = self
114            .docker
115            .start_container(&response.id, None::<StartContainerOptions>)
116            .await
117        {
118            let _ = self.stop_and_remove_container(&response.id).await;
119            return Err(DockerError::StartContainer {
120                container_id: response.id,
121                source,
122            });
123        }
124
125        let inspect = self
126            .docker
127            .inspect_container(&response.id, None::<InspectContainerOptions>)
128            .await
129            .map_err(|source| DockerError::InspectContainer {
130                container_id: response.id.clone(),
131                source,
132            })?;
133
134        SpawnedContainer::from_inspect(response.id, inspect)
135    }
136
137    /// Inspect an existing container and return refreshed metadata.
138    pub async fn inspect_container(
139        &self,
140        container_id: &str,
141    ) -> Result<SpawnedContainer, DockerError> {
142        let inspect = self
143            .docker
144            .inspect_container(container_id, None::<InspectContainerOptions>)
145            .await
146            .map_err(|source| DockerError::InspectContainer {
147                container_id: container_id.to_string(),
148                source,
149            })?;
150
151        SpawnedContainer::from_inspect(container_id.to_string(), inspect)
152    }
153
154    /// Stop an existing container. Already-stopped containers are treated as success.
155    pub async fn stop_container(&self, container_id: &str) -> Result<(), DockerError> {
156        let stop_options = StopContainerOptionsBuilder::new()
157            .t(STOP_TIMEOUT_SECONDS)
158            .build();
159
160        if let Err(source) = self
161            .docker
162            .stop_container(container_id, Some(stop_options))
163            .await
164            && !is_already_stopped_error(&source)
165        {
166            return Err(DockerError::StopContainer {
167                container_id: container_id.to_string(),
168                source,
169            });
170        }
171
172        Ok(())
173    }
174
175    /// Start an existing container and return refreshed metadata.
176    pub async fn start_container(
177        &self,
178        container_id: &str,
179    ) -> Result<SpawnedContainer, DockerError> {
180        if let Err(source) = self
181            .docker
182            .start_container(container_id, None::<StartContainerOptions>)
183            .await
184            && !is_ignorable_start_error(&source)
185        {
186            return Err(DockerError::StartContainer {
187                container_id: container_id.to_string(),
188                source,
189            });
190        }
191
192        self.inspect_container(container_id).await
193    }
194
195    /// Stop and start an existing container, returning refreshed metadata.
196    pub async fn restart_container(
197        &self,
198        container_id: &str,
199    ) -> Result<SpawnedContainer, DockerError> {
200        self.stop_container(container_id).await?;
201        self.start_container(container_id).await
202    }
203
204    /// Create a managed Docker network and return its inspected metadata.
205    pub async fn create_network(&self, spec: NetworkSpec) -> Result<ManagedNetwork, DockerError> {
206        let response = self
207            .docker
208            .create_network(spec.create_request())
209            .await
210            .map_err(|source| DockerError::CreateNetwork {
211                name: spec.name.clone(),
212                source,
213            })?;
214        let inspect = self
215            .docker
216            .inspect_network(&response.id, None)
217            .await
218            .map_err(|source| DockerError::InspectNetwork {
219                network: response.id.clone(),
220                source,
221            })?;
222
223        ManagedNetwork::from_inspect(response.id, spec.name, inspect)
224    }
225
226    /// Remove a Docker network by id or name.
227    pub async fn remove_network(&self, network: &str) -> Result<(), DockerError> {
228        self.docker
229            .remove_network(network)
230            .await
231            .map_err(|source| DockerError::RemoveNetwork {
232                network: network.to_string(),
233                source,
234            })
235    }
236
237    /// Copy a single file from a container and return its bytes.
238    pub async fn copy_file_from_container(
239        &self,
240        container_id: &str,
241        path: &str,
242    ) -> Result<Vec<u8>, DockerError> {
243        let options = DownloadFromContainerOptionsBuilder::new()
244            .path(path)
245            .build();
246        let mut stream = self
247            .docker
248            .download_from_container(container_id, Some(options));
249        let mut archive = Vec::new();
250
251        while let Some(chunk) = stream.next().await {
252            let chunk = chunk.map_err(|source| DockerError::DownloadFromContainer {
253                container_id: container_id.to_string(),
254                path: path.to_string(),
255                source,
256            })?;
257            archive.extend_from_slice(&chunk);
258        }
259
260        extract_first_file_from_tar(&archive).map_err(|message| DockerError::ArchiveRead {
261            container_id: container_id.to_string(),
262            path: path.to_string(),
263            message,
264        })
265    }
266
267    /// Remove all managed containers with the given cluster id.
268    pub async fn cleanup_cluster(&self, cluster_id: &str) -> Result<CleanupReport, DockerError> {
269        let mut report = self
270            .cleanup_containers_by_labels(cluster_label_filters(cluster_id))
271            .await?;
272        report.extend(
273            self.cleanup_networks_by_labels(cluster_label_filters(cluster_id))
274                .await?,
275        );
276        cleanup_result(report)
277    }
278
279    /// Remove all containers and networks managed by this crate.
280    pub async fn cleanup_all(&self) -> Result<CleanupReport, DockerError> {
281        let mut report = self
282            .cleanup_containers_by_labels(managed_label_filters())
283            .await?;
284        report.extend(
285            self.cleanup_networks_by_labels(managed_label_filters())
286                .await?,
287        );
288        cleanup_result(report)
289    }
290
291    /// Return ids for all containers managed by this crate.
292    pub async fn managed_container_ids(&self) -> Result<Vec<String>, DockerError> {
293        self.container_ids_by_labels(managed_label_filters()).await
294    }
295
296    /// Return ids for all networks managed by this crate.
297    pub async fn managed_network_ids(&self) -> Result<Vec<String>, DockerError> {
298        self.network_ids_by_labels(managed_label_filters()).await
299    }
300
301    /// Return ids for all managed containers with the given cluster id.
302    pub async fn cluster_container_ids(
303        &self,
304        cluster_id: &str,
305    ) -> Result<Vec<String>, DockerError> {
306        self.container_ids_by_labels(cluster_label_filters(cluster_id))
307            .await
308    }
309
310    /// Return ids for all managed networks with the given cluster id.
311    pub async fn cluster_network_ids(&self, cluster_id: &str) -> Result<Vec<String>, DockerError> {
312        self.network_ids_by_labels(cluster_label_filters(cluster_id))
313            .await
314    }
315
316    /// Return recent stdout/stderr logs from a container.
317    pub async fn container_logs(&self, container_id: &str) -> Result<String, DockerError> {
318        let options = LogsOptionsBuilder::default()
319            .stdout(true)
320            .stderr(true)
321            .tail(LOG_TAIL_LINES)
322            .build();
323        let mut stream = self.docker.logs(container_id, Some(options));
324        let mut logs = String::new();
325
326        while let Some(chunk) = stream.next().await {
327            let chunk = chunk.map_err(|source| DockerError::ReadContainerLogs {
328                container_id: container_id.to_string(),
329                source,
330            })?;
331            append_log_output(&mut logs, chunk);
332
333            if logs.len() > LOG_MAX_BYTES {
334                logs.truncate(LOG_MAX_BYTES);
335                logs.push_str("\n<truncated>");
336                break;
337            }
338        }
339
340        Ok(logs)
341    }
342
343    /// Create a rollback guard for containers created during startup.
344    pub fn rollback_guard(&self) -> StartupRollback<'_> {
345        StartupRollback::new(self)
346    }
347
348    /// Stop and remove explicit container ids.
349    pub async fn rollback_containers<I>(
350        &self,
351        container_ids: I,
352    ) -> Result<CleanupReport, DockerError>
353    where
354        I: IntoIterator,
355        I::Item: Into<String>,
356    {
357        let mut report = CleanupReport {
358            matched: 0,
359            removed: 0,
360            failures: Vec::new(),
361        };
362
363        for container_id in container_ids {
364            report.matched += 1;
365            let container_id = container_id.into();
366
367            match self.stop_and_remove_container(&container_id).await {
368                Ok(()) => report.removed += 1,
369                Err(failure) => report.failures.push(failure),
370            }
371        }
372
373        if report.failures.is_empty() {
374            Ok(report)
375        } else {
376            Err(DockerError::cleanup_failed(report))
377        }
378    }
379
380    async fn cleanup_containers_by_labels(
381        &self,
382        label_filters: HashMap<String, Vec<String>>,
383    ) -> Result<CleanupReport, DockerError> {
384        let options = ListContainersOptionsBuilder::new()
385            .all(true)
386            .filters(&label_filters)
387            .build();
388        let containers = self
389            .docker
390            .list_containers(Some(options))
391            .await
392            .map_err(|source| DockerError::ListContainers { source })?;
393
394        let mut report = CleanupReport {
395            matched: containers.len(),
396            removed: 0,
397            failures: Vec::new(),
398        };
399
400        for container in containers {
401            let Some(container_id) = container.id else {
402                report.failures.push(CleanupFailure {
403                    container_id: "<missing>".to_string(),
404                    operation: "inspect".to_string(),
405                    message: "container summary did not include an id".to_string(),
406                });
407                continue;
408            };
409
410            match self.stop_and_remove_container(&container_id).await {
411                Ok(()) => report.removed += 1,
412                Err(failure) => report.failures.push(failure),
413            }
414        }
415
416        Ok(report)
417    }
418
419    async fn cleanup_networks_by_labels(
420        &self,
421        label_filters: HashMap<String, Vec<String>>,
422    ) -> Result<CleanupReport, DockerError> {
423        let options = ListNetworksOptionsBuilder::new()
424            .filters(&label_filters)
425            .build();
426        let networks = self
427            .docker
428            .list_networks(Some(options))
429            .await
430            .map_err(|source| DockerError::ListNetworks { source })?;
431
432        let mut report = CleanupReport {
433            matched: networks.len(),
434            removed: 0,
435            failures: Vec::new(),
436        };
437
438        for network in networks {
439            let network_id = network
440                .id
441                .or(network.name)
442                .unwrap_or_else(|| "<missing>".to_string());
443
444            match self.remove_network(&network_id).await {
445                Ok(()) => report.removed += 1,
446                Err(error) => report.failures.push(CleanupFailure {
447                    container_id: network_id,
448                    operation: "remove-network".to_string(),
449                    message: error.to_string(),
450                }),
451            }
452        }
453
454        Ok(report)
455    }
456
457    async fn container_ids_by_labels(
458        &self,
459        label_filters: HashMap<String, Vec<String>>,
460    ) -> Result<Vec<String>, DockerError> {
461        let options = ListContainersOptionsBuilder::new()
462            .all(true)
463            .filters(&label_filters)
464            .build();
465        let containers = self
466            .docker
467            .list_containers(Some(options))
468            .await
469            .map_err(|source| DockerError::ListContainers { source })?;
470
471        Ok(containers
472            .into_iter()
473            .filter_map(|container| container.id)
474            .collect())
475    }
476
477    async fn network_ids_by_labels(
478        &self,
479        label_filters: HashMap<String, Vec<String>>,
480    ) -> Result<Vec<String>, DockerError> {
481        let options = ListNetworksOptionsBuilder::new()
482            .filters(&label_filters)
483            .build();
484        let networks = self
485            .docker
486            .list_networks(Some(options))
487            .await
488            .map_err(|source| DockerError::ListNetworks { source })?;
489
490        Ok(networks
491            .into_iter()
492            .filter_map(|network| network.id)
493            .collect())
494    }
495
496    async fn stop_and_remove_container(&self, container_id: &str) -> Result<(), CleanupFailure> {
497        let stop_options = StopContainerOptionsBuilder::new()
498            .t(STOP_TIMEOUT_SECONDS)
499            .build();
500
501        if let Err(source) = self
502            .docker
503            .stop_container(container_id, Some(stop_options))
504            .await
505            && !is_ignorable_cleanup_stop_error(&source)
506        {
507            return Err(CleanupFailure::from_error(container_id, "stop", source));
508        }
509
510        let remove_options = RemoveContainerOptionsBuilder::new()
511            .force(true)
512            .v(true)
513            .build();
514
515        if let Err(source) = self
516            .docker
517            .remove_container(container_id, Some(remove_options))
518            .await
519            && !is_not_found_error(&source)
520        {
521            return Err(CleanupFailure::from_error(container_id, "remove", source));
522        }
523
524        Ok(())
525    }
526}
527
528/// Tracks containers that should be removed if startup fails.
529#[derive(Debug)]
530pub struct StartupRollback<'a> {
531    docker: &'a DockerClient,
532    container_ids: Vec<String>,
533    disarmed: bool,
534}
535
536impl<'a> StartupRollback<'a> {
537    fn new(docker: &'a DockerClient) -> Self {
538        Self {
539            docker,
540            container_ids: Vec::new(),
541            disarmed: false,
542        }
543    }
544
545    /// Record a container for later rollback.
546    pub fn record(&mut self, container: &SpawnedContainer) {
547        self.record_id(container.id.clone());
548    }
549
550    /// Record a container id for later rollback.
551    pub fn record_id(&mut self, container_id: impl Into<String>) {
552        self.container_ids.push(container_id.into());
553    }
554
555    /// Return currently tracked container ids.
556    pub fn container_ids(&self) -> &[String] {
557        &self.container_ids
558    }
559
560    /// Disarm the guard and return tracked ids without removing containers.
561    pub fn disarm(mut self) -> Vec<String> {
562        self.disarmed = true;
563        std::mem::take(&mut self.container_ids)
564    }
565
566    /// Remove all tracked containers and disarm the guard.
567    pub async fn rollback(mut self) -> Result<CleanupReport, DockerError> {
568        self.disarmed = true;
569        let container_ids = std::mem::take(&mut self.container_ids);
570        self.docker.rollback_containers(container_ids).await
571    }
572}
573
574impl Drop for StartupRollback<'_> {
575    fn drop(&mut self) {
576        if !self.disarmed && !self.container_ids.is_empty() {
577            eprintln!(
578                "spawn-lnd startup rollback guard dropped with {} tracked container(s); call rollback().await to clean them up",
579                self.container_ids.len()
580            );
581        }
582    }
583}
584
585/// Docker container creation parameters used by [`DockerClient`].
586#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
587pub struct ContainerSpec {
588    /// Container name.
589    pub name: String,
590    /// Docker image reference.
591    pub image: String,
592    /// Container command arguments.
593    pub cmd: Vec<String>,
594    /// Environment variables.
595    pub env: Vec<String>,
596    /// Docker labels.
597    pub labels: HashMap<String, String>,
598    /// Container TCP ports to publish on random host ports.
599    pub exposed_ports: Vec<u16>,
600    /// Optional Docker network mode/name.
601    pub network: Option<String>,
602    /// Optional static IPv4 address on the configured Docker network.
603    pub ipv4_address: Option<String>,
604}
605
606impl ContainerSpec {
607    /// Create a container spec with no command, env, labels, ports, or network.
608    pub fn new(name: impl Into<String>, image: impl Into<String>) -> Self {
609        Self {
610            name: name.into(),
611            image: image.into(),
612            cmd: Vec::new(),
613            env: Vec::new(),
614            labels: HashMap::new(),
615            exposed_ports: Vec::new(),
616            network: None,
617            ipv4_address: None,
618        }
619    }
620
621    /// Set the container command.
622    pub fn cmd<I, S>(mut self, cmd: I) -> Self
623    where
624        I: IntoIterator<Item = S>,
625        S: Into<String>,
626    {
627        self.cmd = cmd.into_iter().map(Into::into).collect();
628        self
629    }
630
631    /// Set environment variables.
632    pub fn env<I, S>(mut self, env: I) -> Self
633    where
634        I: IntoIterator<Item = S>,
635        S: Into<String>,
636    {
637        self.env = env.into_iter().map(Into::into).collect();
638        self
639    }
640
641    /// Set Docker labels.
642    pub fn labels(mut self, labels: HashMap<String, String>) -> Self {
643        self.labels = labels;
644        self
645    }
646
647    /// Publish one container TCP port on a random host port.
648    pub fn expose_port(mut self, port: u16) -> Self {
649        self.exposed_ports.push(port);
650        self
651    }
652
653    /// Publish multiple container TCP ports on random host ports.
654    pub fn expose_ports<I>(mut self, ports: I) -> Self
655    where
656        I: IntoIterator<Item = u16>,
657    {
658        self.exposed_ports.extend(ports);
659        self
660    }
661
662    /// Set Docker network mode/name.
663    pub fn network(mut self, network: impl Into<String>) -> Self {
664        self.network = Some(network.into());
665        self
666    }
667
668    /// Set a static IPv4 address for this container on its configured network.
669    pub fn ipv4_address(mut self, ip: impl Into<String>) -> Self {
670        self.ipv4_address = Some(ip.into());
671        self
672    }
673
674    fn create_body(&self) -> ContainerCreateBody {
675        ContainerCreateBody {
676            image: Some(self.image.clone()),
677            cmd: (!self.cmd.is_empty()).then(|| self.cmd.clone()),
678            env: (!self.env.is_empty()).then(|| self.env.clone()),
679            labels: (!self.labels.is_empty()).then(|| self.labels.clone()),
680            exposed_ports: (!self.exposed_ports.is_empty())
681                .then(|| exposed_ports(&self.exposed_ports)),
682            host_config: Some(HostConfig {
683                auto_remove: Some(false),
684                network_mode: self.network.clone(),
685                port_bindings: (!self.exposed_ports.is_empty())
686                    .then(|| port_bindings(&self.exposed_ports)),
687                ..Default::default()
688            }),
689            networking_config: self.networking_config(),
690            ..Default::default()
691        }
692    }
693
694    fn networking_config(&self) -> Option<NetworkingConfig> {
695        let network = self.network.as_ref()?;
696        let ipv4_address = self.ipv4_address.as_ref()?;
697
698        Some(NetworkingConfig {
699            endpoints_config: Some(HashMap::from([(
700                network.clone(),
701                EndpointSettings {
702                    ipam_config: Some(EndpointIpamConfig {
703                        ipv4_address: Some(ipv4_address.clone()),
704                        ..Default::default()
705                    }),
706                    ..Default::default()
707                },
708            )])),
709        })
710    }
711}
712
713/// Docker network creation parameters used by [`DockerClient`].
714#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
715pub struct NetworkSpec {
716    /// Network name.
717    pub name: String,
718    /// Docker labels.
719    pub labels: HashMap<String, String>,
720    /// Optional IPv4 subnet in CIDR notation.
721    pub subnet: Option<String>,
722}
723
724impl NetworkSpec {
725    /// Create a managed bridge network spec for a cluster.
726    pub fn new(cluster_id: &str) -> Self {
727        Self {
728            name: managed_network_name(cluster_id),
729            labels: managed_network_labels(cluster_id),
730            subnet: None,
731        }
732    }
733
734    /// Set the IPv4 subnet for this network.
735    pub fn subnet(mut self, subnet: impl Into<String>) -> Self {
736        self.subnet = Some(subnet.into());
737        self
738    }
739
740    fn create_request(&self) -> NetworkCreateRequest {
741        NetworkCreateRequest {
742            name: self.name.clone(),
743            driver: Some("bridge".to_string()),
744            attachable: Some(false),
745            ipam: self.subnet.as_ref().map(|subnet| Ipam {
746                config: Some(vec![IpamConfig {
747                    subnet: Some(subnet.clone()),
748                    ..Default::default()
749                }]),
750                ..Default::default()
751            }),
752            labels: Some(self.labels.clone()),
753            ..Default::default()
754        }
755    }
756}
757
758/// Metadata for a Docker network managed by this crate.
759#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
760pub struct ManagedNetwork {
761    /// Docker network id.
762    pub id: String,
763    /// Docker network name.
764    pub name: String,
765    /// IPv4 subnet assigned to the network.
766    pub subnet: String,
767}
768
769impl ManagedNetwork {
770    fn from_inspect(
771        id: String,
772        fallback_name: String,
773        inspect: NetworkInspect,
774    ) -> Result<Self, DockerError> {
775        let name = inspect.name.unwrap_or(fallback_name);
776        let subnet = inspect
777            .ipam
778            .and_then(|ipam| ipam.config)
779            .and_then(|configs| configs.into_iter().find_map(|config| config.subnet))
780            .ok_or_else(|| DockerError::MissingNetworkSubnet {
781                network: id.clone(),
782            })?;
783
784        Ok(Self { id, name, subnet })
785    }
786}
787
788/// Metadata for a container created by this crate.
789#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
790pub struct SpawnedContainer {
791    /// Docker container id.
792    pub id: String,
793    /// Docker container name without a leading slash.
794    pub name: Option<String>,
795    /// Bridge-network IP address when Docker reports one.
796    pub ip_address: Option<String>,
797    /// Mapping from container TCP port to published host TCP port.
798    pub host_ports: HashMap<u16, u16>,
799}
800
801impl SpawnedContainer {
802    fn from_inspect(id: String, inspect: ContainerInspectResponse) -> Result<Self, DockerError> {
803        let network_settings = inspect.network_settings.as_ref();
804        Ok(Self {
805            id,
806            name: inspect
807                .name
808                .map(|name| name.trim_start_matches('/').to_string()),
809            ip_address: container_ip_address(network_settings),
810            host_ports: published_tcp_ports(network_settings)?,
811        })
812    }
813
814    /// Return the host port for a published container port.
815    pub fn host_port(&self, container_port: u16) -> Option<u16> {
816        self.host_ports.get(&container_port).copied()
817    }
818}
819
820/// Role label for a managed container.
821#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
822pub enum ContainerRole {
823    /// Bitcoin Core container.
824    Bitcoind,
825    /// LND container.
826    Lnd,
827}
828
829/// Result of ensuring an image is available locally.
830#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
831pub enum ImageStatus {
832    /// The image was already present before the call.
833    AlreadyPresent,
834    /// The image was pulled by the call.
835    Pulled,
836}
837
838impl ContainerRole {
839    /// Return the Docker label value for this role.
840    pub fn as_label_value(self) -> &'static str {
841        match self {
842            Self::Bitcoind => "bitcoind",
843            Self::Lnd => "lnd",
844        }
845    }
846}
847
848/// Summary of a cleanup operation.
849#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
850pub struct CleanupReport {
851    /// Number of Docker resources matched for cleanup.
852    pub matched: usize,
853    /// Number of Docker resources successfully removed.
854    pub removed: usize,
855    /// Per-resource cleanup failures.
856    pub failures: Vec<CleanupFailure>,
857}
858
859impl CleanupReport {
860    /// Return true when no cleanup failures occurred.
861    pub fn is_success(&self) -> bool {
862        self.failures.is_empty()
863    }
864
865    fn extend(&mut self, other: CleanupReport) {
866        self.matched += other.matched;
867        self.removed += other.removed;
868        self.failures.extend(other.failures);
869    }
870}
871
872/// Failure for one Docker resource cleanup operation.
873#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
874pub struct CleanupFailure {
875    /// Docker resource id.
876    pub container_id: String,
877    /// Operation that failed, such as `stop` or `remove`.
878    pub operation: String,
879    /// Error message returned by Docker.
880    pub message: String,
881}
882
883impl CleanupFailure {
884    fn from_error(container_id: &str, operation: &'static str, source: BollardError) -> Self {
885        Self {
886            container_id: container_id.to_string(),
887            operation: operation.to_string(),
888            message: source.to_string(),
889        }
890    }
891}
892
893/// Error returned by Docker operations.
894#[derive(Debug, Error)]
895#[allow(missing_docs)]
896pub enum DockerError {
897    #[error("failed to connect to Docker")]
898    Connect { source: BollardError },
899
900    #[error("failed to ping Docker")]
901    Ping { source: BollardError },
902
903    #[error("failed to list Docker containers")]
904    ListContainers { source: BollardError },
905
906    #[error("failed to list Docker networks")]
907    ListNetworks { source: BollardError },
908
909    #[error("failed to inspect Docker image {image}")]
910    InspectImage { image: String, source: BollardError },
911
912    #[error("failed to pull Docker image {image}")]
913    PullImage { image: String, source: BollardError },
914
915    #[error("failed to create Docker container {name} from image {image}")]
916    CreateContainer {
917        name: String,
918        image: String,
919        source: BollardError,
920    },
921
922    #[error("failed to start Docker container {container_id}")]
923    StartContainer {
924        container_id: String,
925        source: BollardError,
926    },
927
928    #[error("failed to stop Docker container {container_id}")]
929    StopContainer {
930        container_id: String,
931        source: BollardError,
932    },
933
934    #[error("failed to inspect Docker container {container_id}")]
935    InspectContainer {
936        container_id: String,
937        source: BollardError,
938    },
939
940    #[error("failed to create Docker network {name}")]
941    CreateNetwork { name: String, source: BollardError },
942
943    #[error("failed to inspect Docker network {network}")]
944    InspectNetwork {
945        network: String,
946        source: BollardError,
947    },
948
949    #[error("failed to remove Docker network {network}")]
950    RemoveNetwork {
951        network: String,
952        source: BollardError,
953    },
954
955    #[error("Docker network {network} did not report an IPv4 subnet")]
956    MissingNetworkSubnet { network: String },
957
958    #[error("Docker reported invalid host port {host_port} for container port {container_port}")]
959    InvalidPublishedPort {
960        container_port: u16,
961        host_port: String,
962    },
963
964    #[error("failed to download {path} from Docker container {container_id}")]
965    DownloadFromContainer {
966        container_id: String,
967        path: String,
968        source: BollardError,
969    },
970
971    #[error("failed to read archived file {path} from Docker container {container_id}: {message}")]
972    ArchiveRead {
973        container_id: String,
974        path: String,
975        message: String,
976    },
977
978    #[error("failed to read logs from Docker container {container_id}")]
979    ReadContainerLogs {
980        container_id: String,
981        source: BollardError,
982    },
983
984    #[error("failed to clean up {count} Docker container(s)")]
985    CleanupFailed {
986        #[source]
987        report: CleanupReportError,
988        count: usize,
989    },
990}
991
992impl DockerError {
993    fn cleanup_failed(report: CleanupReport) -> Self {
994        Self::CleanupFailed {
995            count: report.failures.len(),
996            report: CleanupReportError(report),
997        }
998    }
999
1000    pub(crate) fn is_network_pool_overlap(&self) -> bool {
1001        let Self::CreateNetwork { source, .. } = self else {
1002            return false;
1003        };
1004        let Some(message) = docker_response_message(source) else {
1005            return false;
1006        };
1007
1008        docker_status_code(source) == Some(400) && message.to_ascii_lowercase().contains("overlap")
1009    }
1010}
1011
1012/// Error wrapper used as the source for cleanup failures.
1013#[derive(Debug)]
1014pub struct CleanupReportError(
1015    /// Cleanup report containing the underlying failures.
1016    pub CleanupReport,
1017);
1018
1019impl std::fmt::Display for CleanupReportError {
1020    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1021        write!(
1022            f,
1023            "{} cleanup failure(s) after matching {} Docker resource(s)",
1024            self.0.failures.len(),
1025            self.0.matched
1026        )
1027    }
1028}
1029
1030impl std::error::Error for CleanupReportError {}
1031
1032/// Build the Docker labels applied to every managed container.
1033pub fn managed_container_labels(
1034    cluster_id: &str,
1035    role: ContainerRole,
1036    node_alias: Option<&str>,
1037) -> HashMap<String, String> {
1038    let mut labels = HashMap::from([
1039        (LABEL_MANAGED.to_string(), LABEL_MANAGED_VALUE.to_string()),
1040        (LABEL_CLUSTER.to_string(), cluster_id.to_string()),
1041        (LABEL_ROLE.to_string(), role.as_label_value().to_string()),
1042    ]);
1043
1044    if let Some(node_alias) = node_alias {
1045        labels.insert(LABEL_NODE.to_string(), node_alias.to_string());
1046    }
1047
1048    labels
1049}
1050
1051/// Build the Docker labels applied to every managed network.
1052pub fn managed_network_labels(cluster_id: &str) -> HashMap<String, String> {
1053    HashMap::from([
1054        (LABEL_MANAGED.to_string(), LABEL_MANAGED_VALUE.to_string()),
1055        (LABEL_CLUSTER.to_string(), cluster_id.to_string()),
1056    ])
1057}
1058
/// Derive the Docker network name of a cluster.
///
/// The name is the cluster id prefixed with `spawn-lnd-`.
pub fn managed_network_name(cluster_id: &str) -> String {
    let mut name = String::from("spawn-lnd-");
    name.push_str(cluster_id);
    name
}
1063
1064/// Build Docker list filters that match all containers managed by this crate.
1065pub fn managed_label_filters() -> HashMap<String, Vec<String>> {
1066    label_filters([format!("{LABEL_MANAGED}={LABEL_MANAGED_VALUE}")])
1067}
1068
1069/// Build Docker list filters that match containers for a specific cluster.
1070pub fn cluster_label_filters(cluster_id: &str) -> HashMap<String, Vec<String>> {
1071    label_filters([
1072        format!("{LABEL_MANAGED}={LABEL_MANAGED_VALUE}"),
1073        format!("{LABEL_CLUSTER}={cluster_id}"),
1074    ])
1075}
1076
/// Wrap label expressions into a filter map with the single key `label`.
///
/// The expressions keep the order in which `labels` yields them.
fn label_filters(labels: impl IntoIterator<Item = String>) -> HashMap<String, Vec<String>> {
    let expressions: Vec<String> = labels.into_iter().collect();

    let mut filters = HashMap::new();
    filters.insert(String::from("label"), expressions);
    filters
}
1080
1081fn cleanup_result(report: CleanupReport) -> Result<CleanupReport, DockerError> {
1082    if report.failures.is_empty() {
1083        Ok(report)
1084    } else {
1085        Err(DockerError::cleanup_failed(report))
1086    }
1087}
1088
1089fn exposed_ports(ports: &[u16]) -> Vec<String> {
1090    ports.iter().copied().map(tcp_port_key).collect()
1091}
1092
1093fn port_bindings(ports: &[u16]) -> PortMap {
1094    ports
1095        .iter()
1096        .copied()
1097        .map(|port| {
1098            (
1099                tcp_port_key(port),
1100                Some(vec![PortBinding {
1101                    host_ip: Some("127.0.0.1".to_string()),
1102                    host_port: Some(String::new()),
1103                }]),
1104            )
1105        })
1106        .collect()
1107}
1108
/// Format a port as the `<port>/tcp` key used in Docker port maps.
fn tcp_port_key(port: u16) -> String {
    let mut key = port.to_string();
    key.push_str("/tcp");
    key
}
1112
1113fn container_ip_address(
1114    network_settings: Option<&bollard::models::NetworkSettings>,
1115) -> Option<String> {
1116    network_settings
1117        .and_then(|settings| settings.networks.as_ref())
1118        .and_then(|networks| {
1119            networks
1120                .values()
1121                .filter_map(|endpoint| endpoint.ip_address.as_ref())
1122                .find(|ip| !ip.is_empty())
1123                .cloned()
1124        })
1125}
1126
1127fn published_tcp_ports(
1128    network_settings: Option<&bollard::models::NetworkSettings>,
1129) -> Result<HashMap<u16, u16>, DockerError> {
1130    let Some(ports) = network_settings.and_then(|settings| settings.ports.as_ref()) else {
1131        return Ok(HashMap::new());
1132    };
1133
1134    let mut mapped = HashMap::new();
1135    for (key, bindings) in ports {
1136        let Some(container_port) = parse_tcp_port_key(key) else {
1137            continue;
1138        };
1139        let Some(binding) = bindings
1140            .as_ref()
1141            .and_then(|bindings| bindings.iter().find(|binding| binding.host_port.is_some()))
1142        else {
1143            continue;
1144        };
1145        let Some(host_port) = binding.host_port.as_ref() else {
1146            continue;
1147        };
1148        let host_port =
1149            host_port
1150                .parse::<u16>()
1151                .map_err(|_| DockerError::InvalidPublishedPort {
1152                    container_port,
1153                    host_port: host_port.clone(),
1154                })?;
1155
1156        mapped.insert(container_port, host_port);
1157    }
1158
1159    Ok(mapped)
1160}
1161
/// Parse a `<port>/tcp` key into its port number.
///
/// The key is split at the first `/`. `None` is returned when there is no
/// `/`, when the protocol part is not exactly `tcp`, or when the port part is
/// not a valid `u16`.
fn parse_tcp_port_key(key: &str) -> Option<u16> {
    match key.split_once('/') {
        Some((port, "tcp")) => port.parse().ok(),
        _ => None,
    }
}
1168
1169fn extract_first_file_from_tar(bytes: &[u8]) -> Result<Vec<u8>, String> {
1170    let mut archive = tar::Archive::new(bytes);
1171    let entries = archive
1172        .entries()
1173        .map_err(|err| format!("failed to read tar entries: {err}"))?;
1174
1175    for entry in entries {
1176        let mut entry = entry.map_err(|err| format!("failed to read tar entry: {err}"))?;
1177        if !entry.header().entry_type().is_file() {
1178            continue;
1179        }
1180
1181        let mut file = Vec::new();
1182        entry
1183            .read_to_end(&mut file)
1184            .map_err(|err| format!("failed to read tar file contents: {err}"))?;
1185        return Ok(file);
1186    }
1187
1188    Err("archive did not contain a regular file".to_string())
1189}
1190
1191fn append_log_output(logs: &mut String, output: LogOutput) {
1192    let prefix = match &output {
1193        LogOutput::StdErr { .. } => "stderr",
1194        LogOutput::StdOut { .. } => "stdout",
1195        LogOutput::StdIn { .. } => "stdin",
1196        LogOutput::Console { .. } => "console",
1197    };
1198    let message = String::from_utf8_lossy(output.as_ref());
1199
1200    logs.push('[');
1201    logs.push_str(prefix);
1202    logs.push_str("] ");
1203    logs.push_str(&message);
1204
1205    if !logs.ends_with('\n') {
1206        logs.push('\n');
1207    }
1208}
1209
1210fn is_already_stopped_error(error: &BollardError) -> bool {
1211    matches!(docker_status_code(error), Some(304))
1212}
1213
1214fn is_ignorable_cleanup_stop_error(error: &BollardError) -> bool {
1215    matches!(docker_status_code(error), Some(304 | 404))
1216}
1217
1218fn is_ignorable_start_error(error: &BollardError) -> bool {
1219    matches!(docker_status_code(error), Some(304))
1220}
1221
1222fn is_not_found_error(error: &BollardError) -> bool {
1223    matches!(docker_status_code(error), Some(404))
1224}
1225
1226fn docker_status_code(error: &BollardError) -> Option<u16> {
1227    match error {
1228        BollardError::DockerResponseServerError { status_code, .. } => Some(*status_code),
1229        _ => None,
1230    }
1231}
1232
1233fn docker_response_message(error: &BollardError) -> Option<&str> {
1234    match error {
1235        BollardError::DockerResponseServerError { message, .. } => Some(message),
1236        _ => None,
1237    }
1238}
1239
1240#[cfg(test)]
1241mod tests {
1242    use bollard::models::{NetworkSettings, PortBinding};
1243    use std::collections::HashMap;
1244
1245    use super::{
1246        ContainerRole, ContainerSpec, LABEL_CLUSTER, LABEL_MANAGED, LABEL_MANAGED_VALUE,
1247        LABEL_NODE, LABEL_ROLE, append_log_output, cluster_label_filters,
1248        extract_first_file_from_tar, managed_container_labels, managed_label_filters,
1249        managed_network_labels, managed_network_name, parse_tcp_port_key, published_tcp_ports,
1250    };
1251
1252    #[test]
1253    fn builds_managed_labels_for_lnd_node() {
1254        let labels = managed_container_labels("cluster-1", ContainerRole::Lnd, Some("alice"));
1255
1256        assert_eq!(labels.get(LABEL_MANAGED).unwrap(), LABEL_MANAGED_VALUE);
1257        assert_eq!(labels.get(LABEL_CLUSTER).unwrap(), "cluster-1");
1258        assert_eq!(labels.get(LABEL_ROLE).unwrap(), "lnd");
1259        assert_eq!(labels.get(LABEL_NODE).unwrap(), "alice");
1260    }
1261
1262    #[test]
1263    fn builds_managed_labels_for_bitcoind_group() {
1264        let labels = managed_container_labels("cluster-1", ContainerRole::Bitcoind, None);
1265
1266        assert_eq!(labels.get(LABEL_MANAGED).unwrap(), LABEL_MANAGED_VALUE);
1267        assert_eq!(labels.get(LABEL_CLUSTER).unwrap(), "cluster-1");
1268        assert_eq!(labels.get(LABEL_ROLE).unwrap(), "bitcoind");
1269        assert!(!labels.contains_key(LABEL_NODE));
1270    }
1271
1272    #[test]
1273    fn builds_managed_network_labels_and_name() {
1274        let labels = managed_network_labels("cluster-1");
1275
1276        assert_eq!(managed_network_name("cluster-1"), "spawn-lnd-cluster-1");
1277        assert_eq!(labels.get(LABEL_MANAGED).unwrap(), LABEL_MANAGED_VALUE);
1278        assert_eq!(labels.get(LABEL_CLUSTER).unwrap(), "cluster-1");
1279        assert!(!labels.contains_key(LABEL_ROLE));
1280    }
1281
1282    #[test]
1283    fn builds_cleanup_all_filter() {
1284        let filters = managed_label_filters();
1285
1286        assert_eq!(
1287            filters.get("label").unwrap(),
1288            &vec!["spawn-lnd=true".to_string()]
1289        );
1290    }
1291
1292    #[test]
1293    fn builds_cleanup_cluster_filter() {
1294        let filters = cluster_label_filters("cluster-1");
1295
1296        assert_eq!(
1297            filters.get("label").unwrap(),
1298            &vec![
1299                "spawn-lnd=true".to_string(),
1300                "spawn-lnd.cluster=cluster-1".to_string()
1301            ]
1302        );
1303    }
1304
1305    #[test]
1306    fn builds_container_create_body_with_labels_and_ports() {
1307        let labels = managed_container_labels("cluster-1", ContainerRole::Bitcoind, None);
1308        let spec = ContainerSpec::new("spawn-lnd-test", "lightninglabs/bitcoin-core:30")
1309            .cmd(["bitcoind", "-regtest"])
1310            .env(["A=B"])
1311            .labels(labels)
1312            .expose_ports([18443, 18444])
1313            .network("spawn-lnd-cluster-1")
1314            .ipv4_address("172.28.0.10");
1315
1316        let body = spec.create_body();
1317        let host_config = body.host_config.expect("host config");
1318        let port_bindings = host_config.port_bindings.expect("port bindings");
1319        let endpoint = body
1320            .networking_config
1321            .expect("networking config")
1322            .endpoints_config
1323            .expect("endpoints config")
1324            .remove("spawn-lnd-cluster-1")
1325            .expect("network endpoint");
1326
1327        assert_eq!(body.image.as_deref(), Some("lightninglabs/bitcoin-core:30"));
1328        assert_eq!(body.cmd.unwrap(), ["bitcoind", "-regtest"]);
1329        assert_eq!(body.env.unwrap(), ["A=B"]);
1330        assert_eq!(
1331            body.labels.unwrap().get(LABEL_MANAGED).unwrap(),
1332            LABEL_MANAGED_VALUE
1333        );
1334        assert_eq!(host_config.auto_remove, Some(false));
1335        assert_eq!(
1336            host_config.network_mode.as_deref(),
1337            Some("spawn-lnd-cluster-1")
1338        );
1339        assert_eq!(
1340            endpoint
1341                .ipam_config
1342                .and_then(|ipam| ipam.ipv4_address)
1343                .as_deref(),
1344            Some("172.28.0.10")
1345        );
1346        assert!(
1347            body.exposed_ports
1348                .unwrap()
1349                .contains(&"18443/tcp".to_string())
1350        );
1351
1352        let binding = port_bindings
1353            .get("18443/tcp")
1354            .and_then(|bindings| bindings.as_ref())
1355            .and_then(|bindings| bindings.first())
1356            .expect("port binding");
1357        assert_eq!(binding.host_ip.as_deref(), Some("127.0.0.1"));
1358        assert_eq!(binding.host_port.as_deref(), Some(""));
1359    }
1360
1361    #[test]
1362    fn parses_tcp_port_keys() {
1363        assert_eq!(parse_tcp_port_key("10009/tcp"), Some(10009));
1364        assert_eq!(parse_tcp_port_key("10009/udp"), None);
1365        assert_eq!(parse_tcp_port_key("not-a-port/tcp"), None);
1366    }
1367
1368    #[test]
1369    fn extracts_published_tcp_ports() {
1370        let settings = NetworkSettings {
1371            ports: Some(HashMap::from([
1372                (
1373                    "10009/tcp".to_string(),
1374                    Some(vec![PortBinding {
1375                        host_ip: Some("127.0.0.1".to_string()),
1376                        host_port: Some("49153".to_string()),
1377                    }]),
1378                ),
1379                (
1380                    "9735/udp".to_string(),
1381                    Some(vec![PortBinding {
1382                        host_ip: Some("127.0.0.1".to_string()),
1383                        host_port: Some("49154".to_string()),
1384                    }]),
1385                ),
1386            ])),
1387            ..Default::default()
1388        };
1389
1390        let ports = published_tcp_ports(Some(&settings)).expect("published ports");
1391
1392        assert_eq!(ports.get(&10009), Some(&49153));
1393        assert!(!ports.contains_key(&9735));
1394    }
1395
1396    #[test]
1397    fn extracts_first_regular_file_from_tar() {
1398        let mut archive = Vec::new();
1399        {
1400            let mut builder = tar::Builder::new(&mut archive);
1401            let content = b"certificate-bytes";
1402            let mut header = tar::Header::new_gnu();
1403            header.set_path("tls.cert").expect("path");
1404            header.set_size(content.len() as u64);
1405            header.set_cksum();
1406            builder
1407                .append(&header, &content[..])
1408                .expect("append tar entry");
1409            builder.finish().expect("finish tar");
1410        }
1411
1412        let file = extract_first_file_from_tar(&archive).expect("file contents");
1413
1414        assert_eq!(file, b"certificate-bytes");
1415    }
1416
1417    #[test]
1418    fn errors_when_tar_has_no_regular_file() {
1419        let mut archive = Vec::new();
1420        {
1421            let mut builder = tar::Builder::new(&mut archive);
1422            let mut header = tar::Header::new_gnu();
1423            header.set_entry_type(tar::EntryType::Directory);
1424            header.set_path("empty-dir").expect("path");
1425            header.set_size(0);
1426            header.set_cksum();
1427            builder
1428                .append(&header, std::io::empty())
1429                .expect("append directory");
1430            builder.finish().expect("finish tar");
1431        }
1432
1433        let error = extract_first_file_from_tar(&archive).expect_err("no file");
1434
1435        assert_eq!(error, "archive did not contain a regular file");
1436    }
1437
1438    #[test]
1439    fn rollback_guard_tracks_and_disarms_ids() {
1440        let docker = super::DockerClient::from_bollard(
1441            bollard::Docker::connect_with_http(
1442                "http://127.0.0.1:65535",
1443                1,
1444                bollard::API_DEFAULT_VERSION,
1445            )
1446            .expect("construct Docker client"),
1447        );
1448        let mut rollback = docker.rollback_guard();
1449
1450        rollback.record_id("container-a");
1451        rollback.record_id("container-b");
1452
1453        assert_eq!(rollback.container_ids(), ["container-a", "container-b"]);
1454        assert_eq!(rollback.disarm(), ["container-a", "container-b"]);
1455    }
1456
1457    #[test]
1458    fn formats_log_output_with_stream_prefix() {
1459        let mut logs = String::new();
1460
1461        append_log_output(
1462            &mut logs,
1463            bollard::container::LogOutput::StdErr {
1464                message: "failure".into(),
1465            },
1466        );
1467
1468        assert_eq!(logs, "[stderr] failure\n");
1469    }
1470}