1use bollard::{
2 Docker,
3 container::LogOutput,
4 errors::Error as BollardError,
5 models::{
6 ContainerCreateBody, ContainerInspectResponse, EndpointIpamConfig, EndpointSettings,
7 HostConfig, Ipam, IpamConfig, NetworkCreateRequest, NetworkInspect, NetworkingConfig,
8 PortBinding, PortMap,
9 },
10 query_parameters::{
11 CreateContainerOptionsBuilder, CreateImageOptionsBuilder,
12 DownloadFromContainerOptionsBuilder, InspectContainerOptions, ListContainersOptionsBuilder,
13 ListNetworksOptionsBuilder, LogsOptionsBuilder, RemoveContainerOptionsBuilder,
14 StartContainerOptions, StopContainerOptionsBuilder,
15 },
16};
17use futures_util::StreamExt;
18use serde::{Deserialize, Serialize};
19use std::{collections::HashMap, io::Read};
20use thiserror::Error;
21
22pub const LABEL_MANAGED: &str = "spawn-lnd";
24pub const LABEL_MANAGED_VALUE: &str = "true";
26pub const LABEL_CLUSTER: &str = "spawn-lnd.cluster";
28pub const LABEL_NODE: &str = "spawn-lnd.node";
30pub const LABEL_ROLE: &str = "spawn-lnd.role";
32
33const STOP_TIMEOUT_SECONDS: i32 = 10;
34const LOG_TAIL_LINES: &str = "200";
35const LOG_MAX_BYTES: usize = 64 * 1024;
36
37#[derive(Clone, Debug)]
39pub struct DockerClient {
40 docker: Docker,
41}
42
43impl DockerClient {
44 pub async fn connect() -> Result<Self, DockerError> {
46 let docker =
47 Docker::connect_with_defaults().map_err(|source| DockerError::Connect { source })?;
48
49 docker
50 .ping()
51 .await
52 .map_err(|source| DockerError::Ping { source })?;
53
54 Ok(Self { docker })
55 }
56
57 pub fn from_bollard(docker: Docker) -> Self {
59 Self { docker }
60 }
61
62 pub fn inner(&self) -> &Docker {
64 &self.docker
65 }
66
67 pub async fn ensure_image(&self, image: &str) -> Result<ImageStatus, DockerError> {
69 match self.docker.inspect_image(image).await {
70 Ok(_) => return Ok(ImageStatus::AlreadyPresent),
71 Err(source) if is_not_found_error(&source) => {}
72 Err(source) => {
73 return Err(DockerError::InspectImage {
74 image: image.to_string(),
75 source,
76 });
77 }
78 }
79
80 let options = CreateImageOptionsBuilder::new().from_image(image).build();
81 let mut stream = self.docker.create_image(Some(options), None, None);
82
83 while let Some(result) = stream.next().await {
84 result.map_err(|source| DockerError::PullImage {
85 image: image.to_string(),
86 source,
87 })?;
88 }
89
90 Ok(ImageStatus::Pulled)
91 }
92
93 pub async fn create_and_start(
95 &self,
96 spec: ContainerSpec,
97 ) -> Result<SpawnedContainer, DockerError> {
98 self.ensure_image(&spec.image).await?;
99
100 let options = CreateContainerOptionsBuilder::new()
101 .name(&spec.name)
102 .build();
103 let response = self
104 .docker
105 .create_container(Some(options), spec.create_body())
106 .await
107 .map_err(|source| DockerError::CreateContainer {
108 name: spec.name.clone(),
109 image: spec.image.clone(),
110 source,
111 })?;
112
113 if let Err(source) = self
114 .docker
115 .start_container(&response.id, None::<StartContainerOptions>)
116 .await
117 {
118 let _ = self.stop_and_remove_container(&response.id).await;
119 return Err(DockerError::StartContainer {
120 container_id: response.id,
121 source,
122 });
123 }
124
125 let inspect = self
126 .docker
127 .inspect_container(&response.id, None::<InspectContainerOptions>)
128 .await
129 .map_err(|source| DockerError::InspectContainer {
130 container_id: response.id.clone(),
131 source,
132 })?;
133
134 SpawnedContainer::from_inspect(response.id, inspect)
135 }
136
137 pub async fn inspect_container(
139 &self,
140 container_id: &str,
141 ) -> Result<SpawnedContainer, DockerError> {
142 let inspect = self
143 .docker
144 .inspect_container(container_id, None::<InspectContainerOptions>)
145 .await
146 .map_err(|source| DockerError::InspectContainer {
147 container_id: container_id.to_string(),
148 source,
149 })?;
150
151 SpawnedContainer::from_inspect(container_id.to_string(), inspect)
152 }
153
154 pub async fn stop_container(&self, container_id: &str) -> Result<(), DockerError> {
156 let stop_options = StopContainerOptionsBuilder::new()
157 .t(STOP_TIMEOUT_SECONDS)
158 .build();
159
160 if let Err(source) = self
161 .docker
162 .stop_container(container_id, Some(stop_options))
163 .await
164 && !is_already_stopped_error(&source)
165 {
166 return Err(DockerError::StopContainer {
167 container_id: container_id.to_string(),
168 source,
169 });
170 }
171
172 Ok(())
173 }
174
175 pub async fn start_container(
177 &self,
178 container_id: &str,
179 ) -> Result<SpawnedContainer, DockerError> {
180 if let Err(source) = self
181 .docker
182 .start_container(container_id, None::<StartContainerOptions>)
183 .await
184 && !is_ignorable_start_error(&source)
185 {
186 return Err(DockerError::StartContainer {
187 container_id: container_id.to_string(),
188 source,
189 });
190 }
191
192 self.inspect_container(container_id).await
193 }
194
195 pub async fn restart_container(
197 &self,
198 container_id: &str,
199 ) -> Result<SpawnedContainer, DockerError> {
200 self.stop_container(container_id).await?;
201 self.start_container(container_id).await
202 }
203
204 pub async fn create_network(&self, spec: NetworkSpec) -> Result<ManagedNetwork, DockerError> {
206 let response = self
207 .docker
208 .create_network(spec.create_request())
209 .await
210 .map_err(|source| DockerError::CreateNetwork {
211 name: spec.name.clone(),
212 source,
213 })?;
214 let inspect = self
215 .docker
216 .inspect_network(&response.id, None)
217 .await
218 .map_err(|source| DockerError::InspectNetwork {
219 network: response.id.clone(),
220 source,
221 })?;
222
223 ManagedNetwork::from_inspect(response.id, spec.name, inspect)
224 }
225
226 pub async fn remove_network(&self, network: &str) -> Result<(), DockerError> {
228 self.docker
229 .remove_network(network)
230 .await
231 .map_err(|source| DockerError::RemoveNetwork {
232 network: network.to_string(),
233 source,
234 })
235 }
236
237 pub async fn copy_file_from_container(
239 &self,
240 container_id: &str,
241 path: &str,
242 ) -> Result<Vec<u8>, DockerError> {
243 let options = DownloadFromContainerOptionsBuilder::new()
244 .path(path)
245 .build();
246 let mut stream = self
247 .docker
248 .download_from_container(container_id, Some(options));
249 let mut archive = Vec::new();
250
251 while let Some(chunk) = stream.next().await {
252 let chunk = chunk.map_err(|source| DockerError::DownloadFromContainer {
253 container_id: container_id.to_string(),
254 path: path.to_string(),
255 source,
256 })?;
257 archive.extend_from_slice(&chunk);
258 }
259
260 extract_first_file_from_tar(&archive).map_err(|message| DockerError::ArchiveRead {
261 container_id: container_id.to_string(),
262 path: path.to_string(),
263 message,
264 })
265 }
266
267 pub async fn cleanup_cluster(&self, cluster_id: &str) -> Result<CleanupReport, DockerError> {
269 let mut report = self
270 .cleanup_containers_by_labels(cluster_label_filters(cluster_id))
271 .await?;
272 report.extend(
273 self.cleanup_networks_by_labels(cluster_label_filters(cluster_id))
274 .await?,
275 );
276 cleanup_result(report)
277 }
278
279 pub async fn cleanup_all(&self) -> Result<CleanupReport, DockerError> {
281 let mut report = self
282 .cleanup_containers_by_labels(managed_label_filters())
283 .await?;
284 report.extend(
285 self.cleanup_networks_by_labels(managed_label_filters())
286 .await?,
287 );
288 cleanup_result(report)
289 }
290
291 pub async fn managed_container_ids(&self) -> Result<Vec<String>, DockerError> {
293 self.container_ids_by_labels(managed_label_filters()).await
294 }
295
296 pub async fn managed_network_ids(&self) -> Result<Vec<String>, DockerError> {
298 self.network_ids_by_labels(managed_label_filters()).await
299 }
300
301 pub async fn cluster_container_ids(
303 &self,
304 cluster_id: &str,
305 ) -> Result<Vec<String>, DockerError> {
306 self.container_ids_by_labels(cluster_label_filters(cluster_id))
307 .await
308 }
309
310 pub async fn cluster_network_ids(&self, cluster_id: &str) -> Result<Vec<String>, DockerError> {
312 self.network_ids_by_labels(cluster_label_filters(cluster_id))
313 .await
314 }
315
316 pub async fn container_logs(&self, container_id: &str) -> Result<String, DockerError> {
318 let options = LogsOptionsBuilder::default()
319 .stdout(true)
320 .stderr(true)
321 .tail(LOG_TAIL_LINES)
322 .build();
323 let mut stream = self.docker.logs(container_id, Some(options));
324 let mut logs = String::new();
325
326 while let Some(chunk) = stream.next().await {
327 let chunk = chunk.map_err(|source| DockerError::ReadContainerLogs {
328 container_id: container_id.to_string(),
329 source,
330 })?;
331 append_log_output(&mut logs, chunk);
332
333 if logs.len() > LOG_MAX_BYTES {
334 logs.truncate(LOG_MAX_BYTES);
335 logs.push_str("\n<truncated>");
336 break;
337 }
338 }
339
340 Ok(logs)
341 }
342
343 pub fn rollback_guard(&self) -> StartupRollback<'_> {
345 StartupRollback::new(self)
346 }
347
348 pub async fn rollback_containers<I>(
350 &self,
351 container_ids: I,
352 ) -> Result<CleanupReport, DockerError>
353 where
354 I: IntoIterator,
355 I::Item: Into<String>,
356 {
357 let mut report = CleanupReport {
358 matched: 0,
359 removed: 0,
360 failures: Vec::new(),
361 };
362
363 for container_id in container_ids {
364 report.matched += 1;
365 let container_id = container_id.into();
366
367 match self.stop_and_remove_container(&container_id).await {
368 Ok(()) => report.removed += 1,
369 Err(failure) => report.failures.push(failure),
370 }
371 }
372
373 if report.failures.is_empty() {
374 Ok(report)
375 } else {
376 Err(DockerError::cleanup_failed(report))
377 }
378 }
379
380 async fn cleanup_containers_by_labels(
381 &self,
382 label_filters: HashMap<String, Vec<String>>,
383 ) -> Result<CleanupReport, DockerError> {
384 let options = ListContainersOptionsBuilder::new()
385 .all(true)
386 .filters(&label_filters)
387 .build();
388 let containers = self
389 .docker
390 .list_containers(Some(options))
391 .await
392 .map_err(|source| DockerError::ListContainers { source })?;
393
394 let mut report = CleanupReport {
395 matched: containers.len(),
396 removed: 0,
397 failures: Vec::new(),
398 };
399
400 for container in containers {
401 let Some(container_id) = container.id else {
402 report.failures.push(CleanupFailure {
403 container_id: "<missing>".to_string(),
404 operation: "inspect".to_string(),
405 message: "container summary did not include an id".to_string(),
406 });
407 continue;
408 };
409
410 match self.stop_and_remove_container(&container_id).await {
411 Ok(()) => report.removed += 1,
412 Err(failure) => report.failures.push(failure),
413 }
414 }
415
416 Ok(report)
417 }
418
419 async fn cleanup_networks_by_labels(
420 &self,
421 label_filters: HashMap<String, Vec<String>>,
422 ) -> Result<CleanupReport, DockerError> {
423 let options = ListNetworksOptionsBuilder::new()
424 .filters(&label_filters)
425 .build();
426 let networks = self
427 .docker
428 .list_networks(Some(options))
429 .await
430 .map_err(|source| DockerError::ListNetworks { source })?;
431
432 let mut report = CleanupReport {
433 matched: networks.len(),
434 removed: 0,
435 failures: Vec::new(),
436 };
437
438 for network in networks {
439 let network_id = network
440 .id
441 .or(network.name)
442 .unwrap_or_else(|| "<missing>".to_string());
443
444 match self.remove_network(&network_id).await {
445 Ok(()) => report.removed += 1,
446 Err(error) => report.failures.push(CleanupFailure {
447 container_id: network_id,
448 operation: "remove-network".to_string(),
449 message: error.to_string(),
450 }),
451 }
452 }
453
454 Ok(report)
455 }
456
457 async fn container_ids_by_labels(
458 &self,
459 label_filters: HashMap<String, Vec<String>>,
460 ) -> Result<Vec<String>, DockerError> {
461 let options = ListContainersOptionsBuilder::new()
462 .all(true)
463 .filters(&label_filters)
464 .build();
465 let containers = self
466 .docker
467 .list_containers(Some(options))
468 .await
469 .map_err(|source| DockerError::ListContainers { source })?;
470
471 Ok(containers
472 .into_iter()
473 .filter_map(|container| container.id)
474 .collect())
475 }
476
477 async fn network_ids_by_labels(
478 &self,
479 label_filters: HashMap<String, Vec<String>>,
480 ) -> Result<Vec<String>, DockerError> {
481 let options = ListNetworksOptionsBuilder::new()
482 .filters(&label_filters)
483 .build();
484 let networks = self
485 .docker
486 .list_networks(Some(options))
487 .await
488 .map_err(|source| DockerError::ListNetworks { source })?;
489
490 Ok(networks
491 .into_iter()
492 .filter_map(|network| network.id)
493 .collect())
494 }
495
496 async fn stop_and_remove_container(&self, container_id: &str) -> Result<(), CleanupFailure> {
497 let stop_options = StopContainerOptionsBuilder::new()
498 .t(STOP_TIMEOUT_SECONDS)
499 .build();
500
501 if let Err(source) = self
502 .docker
503 .stop_container(container_id, Some(stop_options))
504 .await
505 && !is_ignorable_cleanup_stop_error(&source)
506 {
507 return Err(CleanupFailure::from_error(container_id, "stop", source));
508 }
509
510 let remove_options = RemoveContainerOptionsBuilder::new()
511 .force(true)
512 .v(true)
513 .build();
514
515 if let Err(source) = self
516 .docker
517 .remove_container(container_id, Some(remove_options))
518 .await
519 && !is_not_found_error(&source)
520 {
521 return Err(CleanupFailure::from_error(container_id, "remove", source));
522 }
523
524 Ok(())
525 }
526}
527
528#[derive(Debug)]
530pub struct StartupRollback<'a> {
531 docker: &'a DockerClient,
532 container_ids: Vec<String>,
533 disarmed: bool,
534}
535
536impl<'a> StartupRollback<'a> {
537 fn new(docker: &'a DockerClient) -> Self {
538 Self {
539 docker,
540 container_ids: Vec::new(),
541 disarmed: false,
542 }
543 }
544
545 pub fn record(&mut self, container: &SpawnedContainer) {
547 self.record_id(container.id.clone());
548 }
549
550 pub fn record_id(&mut self, container_id: impl Into<String>) {
552 self.container_ids.push(container_id.into());
553 }
554
555 pub fn container_ids(&self) -> &[String] {
557 &self.container_ids
558 }
559
560 pub fn disarm(mut self) -> Vec<String> {
562 self.disarmed = true;
563 std::mem::take(&mut self.container_ids)
564 }
565
566 pub async fn rollback(mut self) -> Result<CleanupReport, DockerError> {
568 self.disarmed = true;
569 let container_ids = std::mem::take(&mut self.container_ids);
570 self.docker.rollback_containers(container_ids).await
571 }
572}
573
574impl Drop for StartupRollback<'_> {
575 fn drop(&mut self) {
576 if !self.disarmed && !self.container_ids.is_empty() {
577 eprintln!(
578 "spawn-lnd startup rollback guard dropped with {} tracked container(s); call rollback().await to clean them up",
579 self.container_ids.len()
580 );
581 }
582 }
583}
584
585#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
587pub struct ContainerSpec {
588 pub name: String,
590 pub image: String,
592 pub cmd: Vec<String>,
594 pub env: Vec<String>,
596 pub labels: HashMap<String, String>,
598 pub exposed_ports: Vec<u16>,
600 pub network: Option<String>,
602 pub ipv4_address: Option<String>,
604}
605
606impl ContainerSpec {
607 pub fn new(name: impl Into<String>, image: impl Into<String>) -> Self {
609 Self {
610 name: name.into(),
611 image: image.into(),
612 cmd: Vec::new(),
613 env: Vec::new(),
614 labels: HashMap::new(),
615 exposed_ports: Vec::new(),
616 network: None,
617 ipv4_address: None,
618 }
619 }
620
621 pub fn cmd<I, S>(mut self, cmd: I) -> Self
623 where
624 I: IntoIterator<Item = S>,
625 S: Into<String>,
626 {
627 self.cmd = cmd.into_iter().map(Into::into).collect();
628 self
629 }
630
631 pub fn env<I, S>(mut self, env: I) -> Self
633 where
634 I: IntoIterator<Item = S>,
635 S: Into<String>,
636 {
637 self.env = env.into_iter().map(Into::into).collect();
638 self
639 }
640
641 pub fn labels(mut self, labels: HashMap<String, String>) -> Self {
643 self.labels = labels;
644 self
645 }
646
647 pub fn expose_port(mut self, port: u16) -> Self {
649 self.exposed_ports.push(port);
650 self
651 }
652
653 pub fn expose_ports<I>(mut self, ports: I) -> Self
655 where
656 I: IntoIterator<Item = u16>,
657 {
658 self.exposed_ports.extend(ports);
659 self
660 }
661
662 pub fn network(mut self, network: impl Into<String>) -> Self {
664 self.network = Some(network.into());
665 self
666 }
667
668 pub fn ipv4_address(mut self, ip: impl Into<String>) -> Self {
670 self.ipv4_address = Some(ip.into());
671 self
672 }
673
674 fn create_body(&self) -> ContainerCreateBody {
675 ContainerCreateBody {
676 image: Some(self.image.clone()),
677 cmd: (!self.cmd.is_empty()).then(|| self.cmd.clone()),
678 env: (!self.env.is_empty()).then(|| self.env.clone()),
679 labels: (!self.labels.is_empty()).then(|| self.labels.clone()),
680 exposed_ports: (!self.exposed_ports.is_empty())
681 .then(|| exposed_ports(&self.exposed_ports)),
682 host_config: Some(HostConfig {
683 auto_remove: Some(false),
684 network_mode: self.network.clone(),
685 port_bindings: (!self.exposed_ports.is_empty())
686 .then(|| port_bindings(&self.exposed_ports)),
687 ..Default::default()
688 }),
689 networking_config: self.networking_config(),
690 ..Default::default()
691 }
692 }
693
694 fn networking_config(&self) -> Option<NetworkingConfig> {
695 let network = self.network.as_ref()?;
696 let ipv4_address = self.ipv4_address.as_ref()?;
697
698 Some(NetworkingConfig {
699 endpoints_config: Some(HashMap::from([(
700 network.clone(),
701 EndpointSettings {
702 ipam_config: Some(EndpointIpamConfig {
703 ipv4_address: Some(ipv4_address.clone()),
704 ..Default::default()
705 }),
706 ..Default::default()
707 },
708 )])),
709 })
710 }
711}
712
713#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
715pub struct NetworkSpec {
716 pub name: String,
718 pub labels: HashMap<String, String>,
720 pub subnet: Option<String>,
722}
723
724impl NetworkSpec {
725 pub fn new(cluster_id: &str) -> Self {
727 Self {
728 name: managed_network_name(cluster_id),
729 labels: managed_network_labels(cluster_id),
730 subnet: None,
731 }
732 }
733
734 pub fn subnet(mut self, subnet: impl Into<String>) -> Self {
736 self.subnet = Some(subnet.into());
737 self
738 }
739
740 fn create_request(&self) -> NetworkCreateRequest {
741 NetworkCreateRequest {
742 name: self.name.clone(),
743 driver: Some("bridge".to_string()),
744 attachable: Some(false),
745 ipam: self.subnet.as_ref().map(|subnet| Ipam {
746 config: Some(vec![IpamConfig {
747 subnet: Some(subnet.clone()),
748 ..Default::default()
749 }]),
750 ..Default::default()
751 }),
752 labels: Some(self.labels.clone()),
753 ..Default::default()
754 }
755 }
756}
757
758#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
760pub struct ManagedNetwork {
761 pub id: String,
763 pub name: String,
765 pub subnet: String,
767}
768
769impl ManagedNetwork {
770 fn from_inspect(
771 id: String,
772 fallback_name: String,
773 inspect: NetworkInspect,
774 ) -> Result<Self, DockerError> {
775 let name = inspect.name.unwrap_or(fallback_name);
776 let subnet = inspect
777 .ipam
778 .and_then(|ipam| ipam.config)
779 .and_then(|configs| configs.into_iter().find_map(|config| config.subnet))
780 .ok_or_else(|| DockerError::MissingNetworkSubnet {
781 network: id.clone(),
782 })?;
783
784 Ok(Self { id, name, subnet })
785 }
786}
787
788#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
790pub struct SpawnedContainer {
791 pub id: String,
793 pub name: Option<String>,
795 pub ip_address: Option<String>,
797 pub host_ports: HashMap<u16, u16>,
799}
800
801impl SpawnedContainer {
802 fn from_inspect(id: String, inspect: ContainerInspectResponse) -> Result<Self, DockerError> {
803 let network_settings = inspect.network_settings.as_ref();
804 Ok(Self {
805 id,
806 name: inspect
807 .name
808 .map(|name| name.trim_start_matches('/').to_string()),
809 ip_address: container_ip_address(network_settings),
810 host_ports: published_tcp_ports(network_settings)?,
811 })
812 }
813
814 pub fn host_port(&self, container_port: u16) -> Option<u16> {
816 self.host_ports.get(&container_port).copied()
817 }
818}
819
820#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
822pub enum ContainerRole {
823 Bitcoind,
825 Lnd,
827}
828
829#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
831pub enum ImageStatus {
832 AlreadyPresent,
834 Pulled,
836}
837
838impl ContainerRole {
839 pub fn as_label_value(self) -> &'static str {
841 match self {
842 Self::Bitcoind => "bitcoind",
843 Self::Lnd => "lnd",
844 }
845 }
846}
847
848#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
850pub struct CleanupReport {
851 pub matched: usize,
853 pub removed: usize,
855 pub failures: Vec<CleanupFailure>,
857}
858
859impl CleanupReport {
860 pub fn is_success(&self) -> bool {
862 self.failures.is_empty()
863 }
864
865 fn extend(&mut self, other: CleanupReport) {
866 self.matched += other.matched;
867 self.removed += other.removed;
868 self.failures.extend(other.failures);
869 }
870}
871
872#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
874pub struct CleanupFailure {
875 pub container_id: String,
877 pub operation: String,
879 pub message: String,
881}
882
883impl CleanupFailure {
884 fn from_error(container_id: &str, operation: &'static str, source: BollardError) -> Self {
885 Self {
886 container_id: container_id.to_string(),
887 operation: operation.to_string(),
888 message: source.to_string(),
889 }
890 }
891}
892
893#[derive(Debug, Error)]
895#[allow(missing_docs)]
896pub enum DockerError {
897 #[error("failed to connect to Docker")]
898 Connect { source: BollardError },
899
900 #[error("failed to ping Docker")]
901 Ping { source: BollardError },
902
903 #[error("failed to list Docker containers")]
904 ListContainers { source: BollardError },
905
906 #[error("failed to list Docker networks")]
907 ListNetworks { source: BollardError },
908
909 #[error("failed to inspect Docker image {image}")]
910 InspectImage { image: String, source: BollardError },
911
912 #[error("failed to pull Docker image {image}")]
913 PullImage { image: String, source: BollardError },
914
915 #[error("failed to create Docker container {name} from image {image}")]
916 CreateContainer {
917 name: String,
918 image: String,
919 source: BollardError,
920 },
921
922 #[error("failed to start Docker container {container_id}")]
923 StartContainer {
924 container_id: String,
925 source: BollardError,
926 },
927
928 #[error("failed to stop Docker container {container_id}")]
929 StopContainer {
930 container_id: String,
931 source: BollardError,
932 },
933
934 #[error("failed to inspect Docker container {container_id}")]
935 InspectContainer {
936 container_id: String,
937 source: BollardError,
938 },
939
940 #[error("failed to create Docker network {name}")]
941 CreateNetwork { name: String, source: BollardError },
942
943 #[error("failed to inspect Docker network {network}")]
944 InspectNetwork {
945 network: String,
946 source: BollardError,
947 },
948
949 #[error("failed to remove Docker network {network}")]
950 RemoveNetwork {
951 network: String,
952 source: BollardError,
953 },
954
955 #[error("Docker network {network} did not report an IPv4 subnet")]
956 MissingNetworkSubnet { network: String },
957
958 #[error("Docker reported invalid host port {host_port} for container port {container_port}")]
959 InvalidPublishedPort {
960 container_port: u16,
961 host_port: String,
962 },
963
964 #[error("failed to download {path} from Docker container {container_id}")]
965 DownloadFromContainer {
966 container_id: String,
967 path: String,
968 source: BollardError,
969 },
970
971 #[error("failed to read archived file {path} from Docker container {container_id}: {message}")]
972 ArchiveRead {
973 container_id: String,
974 path: String,
975 message: String,
976 },
977
978 #[error("failed to read logs from Docker container {container_id}")]
979 ReadContainerLogs {
980 container_id: String,
981 source: BollardError,
982 },
983
984 #[error("failed to clean up {count} Docker container(s)")]
985 CleanupFailed {
986 #[source]
987 report: CleanupReportError,
988 count: usize,
989 },
990}
991
992impl DockerError {
993 fn cleanup_failed(report: CleanupReport) -> Self {
994 Self::CleanupFailed {
995 count: report.failures.len(),
996 report: CleanupReportError(report),
997 }
998 }
999
1000 pub(crate) fn is_network_pool_overlap(&self) -> bool {
1001 let Self::CreateNetwork { source, .. } = self else {
1002 return false;
1003 };
1004 let Some(message) = docker_response_message(source) else {
1005 return false;
1006 };
1007
1008 docker_status_code(source) == Some(400) && message.to_ascii_lowercase().contains("overlap")
1009 }
1010}
1011
1012#[derive(Debug)]
1014pub struct CleanupReportError(
1015 pub CleanupReport,
1017);
1018
1019impl std::fmt::Display for CleanupReportError {
1020 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1021 write!(
1022 f,
1023 "{} cleanup failure(s) after matching {} Docker resource(s)",
1024 self.0.failures.len(),
1025 self.0.matched
1026 )
1027 }
1028}
1029
1030impl std::error::Error for CleanupReportError {}
1031
1032pub fn managed_container_labels(
1034 cluster_id: &str,
1035 role: ContainerRole,
1036 node_alias: Option<&str>,
1037) -> HashMap<String, String> {
1038 let mut labels = HashMap::from([
1039 (LABEL_MANAGED.to_string(), LABEL_MANAGED_VALUE.to_string()),
1040 (LABEL_CLUSTER.to_string(), cluster_id.to_string()),
1041 (LABEL_ROLE.to_string(), role.as_label_value().to_string()),
1042 ]);
1043
1044 if let Some(node_alias) = node_alias {
1045 labels.insert(LABEL_NODE.to_string(), node_alias.to_string());
1046 }
1047
1048 labels
1049}
1050
1051pub fn managed_network_labels(cluster_id: &str) -> HashMap<String, String> {
1053 HashMap::from([
1054 (LABEL_MANAGED.to_string(), LABEL_MANAGED_VALUE.to_string()),
1055 (LABEL_CLUSTER.to_string(), cluster_id.to_string()),
1056 ])
1057}
1058
1059pub fn managed_network_name(cluster_id: &str) -> String {
1061 format!("spawn-lnd-{cluster_id}")
1062}
1063
1064pub fn managed_label_filters() -> HashMap<String, Vec<String>> {
1066 label_filters([format!("{LABEL_MANAGED}={LABEL_MANAGED_VALUE}")])
1067}
1068
1069pub fn cluster_label_filters(cluster_id: &str) -> HashMap<String, Vec<String>> {
1071 label_filters([
1072 format!("{LABEL_MANAGED}={LABEL_MANAGED_VALUE}"),
1073 format!("{LABEL_CLUSTER}={cluster_id}"),
1074 ])
1075}
1076
1077fn label_filters(labels: impl IntoIterator<Item = String>) -> HashMap<String, Vec<String>> {
1078 HashMap::from([("label".to_string(), labels.into_iter().collect())])
1079}
1080
1081fn cleanup_result(report: CleanupReport) -> Result<CleanupReport, DockerError> {
1082 if report.failures.is_empty() {
1083 Ok(report)
1084 } else {
1085 Err(DockerError::cleanup_failed(report))
1086 }
1087}
1088
1089fn exposed_ports(ports: &[u16]) -> Vec<String> {
1090 ports.iter().copied().map(tcp_port_key).collect()
1091}
1092
1093fn port_bindings(ports: &[u16]) -> PortMap {
1094 ports
1095 .iter()
1096 .copied()
1097 .map(|port| {
1098 (
1099 tcp_port_key(port),
1100 Some(vec![PortBinding {
1101 host_ip: Some("127.0.0.1".to_string()),
1102 host_port: Some(String::new()),
1103 }]),
1104 )
1105 })
1106 .collect()
1107}
1108
1109fn tcp_port_key(port: u16) -> String {
1110 format!("{port}/tcp")
1111}
1112
1113fn container_ip_address(
1114 network_settings: Option<&bollard::models::NetworkSettings>,
1115) -> Option<String> {
1116 network_settings
1117 .and_then(|settings| settings.networks.as_ref())
1118 .and_then(|networks| {
1119 networks
1120 .values()
1121 .filter_map(|endpoint| endpoint.ip_address.as_ref())
1122 .find(|ip| !ip.is_empty())
1123 .cloned()
1124 })
1125}
1126
1127fn published_tcp_ports(
1128 network_settings: Option<&bollard::models::NetworkSettings>,
1129) -> Result<HashMap<u16, u16>, DockerError> {
1130 let Some(ports) = network_settings.and_then(|settings| settings.ports.as_ref()) else {
1131 return Ok(HashMap::new());
1132 };
1133
1134 let mut mapped = HashMap::new();
1135 for (key, bindings) in ports {
1136 let Some(container_port) = parse_tcp_port_key(key) else {
1137 continue;
1138 };
1139 let Some(binding) = bindings
1140 .as_ref()
1141 .and_then(|bindings| bindings.iter().find(|binding| binding.host_port.is_some()))
1142 else {
1143 continue;
1144 };
1145 let Some(host_port) = binding.host_port.as_ref() else {
1146 continue;
1147 };
1148 let host_port =
1149 host_port
1150 .parse::<u16>()
1151 .map_err(|_| DockerError::InvalidPublishedPort {
1152 container_port,
1153 host_port: host_port.clone(),
1154 })?;
1155
1156 mapped.insert(container_port, host_port);
1157 }
1158
1159 Ok(mapped)
1160}
1161
1162fn parse_tcp_port_key(key: &str) -> Option<u16> {
1163 let (port, protocol) = key.split_once('/')?;
1164 (protocol == "tcp")
1165 .then(|| port.parse::<u16>().ok())
1166 .flatten()
1167}
1168
1169fn extract_first_file_from_tar(bytes: &[u8]) -> Result<Vec<u8>, String> {
1170 let mut archive = tar::Archive::new(bytes);
1171 let entries = archive
1172 .entries()
1173 .map_err(|err| format!("failed to read tar entries: {err}"))?;
1174
1175 for entry in entries {
1176 let mut entry = entry.map_err(|err| format!("failed to read tar entry: {err}"))?;
1177 if !entry.header().entry_type().is_file() {
1178 continue;
1179 }
1180
1181 let mut file = Vec::new();
1182 entry
1183 .read_to_end(&mut file)
1184 .map_err(|err| format!("failed to read tar file contents: {err}"))?;
1185 return Ok(file);
1186 }
1187
1188 Err("archive did not contain a regular file".to_string())
1189}
1190
1191fn append_log_output(logs: &mut String, output: LogOutput) {
1192 let prefix = match &output {
1193 LogOutput::StdErr { .. } => "stderr",
1194 LogOutput::StdOut { .. } => "stdout",
1195 LogOutput::StdIn { .. } => "stdin",
1196 LogOutput::Console { .. } => "console",
1197 };
1198 let message = String::from_utf8_lossy(output.as_ref());
1199
1200 logs.push('[');
1201 logs.push_str(prefix);
1202 logs.push_str("] ");
1203 logs.push_str(&message);
1204
1205 if !logs.ends_with('\n') {
1206 logs.push('\n');
1207 }
1208}
1209
1210fn is_already_stopped_error(error: &BollardError) -> bool {
1211 matches!(docker_status_code(error), Some(304))
1212}
1213
1214fn is_ignorable_cleanup_stop_error(error: &BollardError) -> bool {
1215 matches!(docker_status_code(error), Some(304 | 404))
1216}
1217
1218fn is_ignorable_start_error(error: &BollardError) -> bool {
1219 matches!(docker_status_code(error), Some(304))
1220}
1221
1222fn is_not_found_error(error: &BollardError) -> bool {
1223 matches!(docker_status_code(error), Some(404))
1224}
1225
1226fn docker_status_code(error: &BollardError) -> Option<u16> {
1227 match error {
1228 BollardError::DockerResponseServerError { status_code, .. } => Some(*status_code),
1229 _ => None,
1230 }
1231}
1232
1233fn docker_response_message(error: &BollardError) -> Option<&str> {
1234 match error {
1235 BollardError::DockerResponseServerError { message, .. } => Some(message),
1236 _ => None,
1237 }
1238}
1239
1240#[cfg(test)]
1241mod tests {
1242 use bollard::models::{NetworkSettings, PortBinding};
1243 use std::collections::HashMap;
1244
1245 use super::{
1246 ContainerRole, ContainerSpec, LABEL_CLUSTER, LABEL_MANAGED, LABEL_MANAGED_VALUE,
1247 LABEL_NODE, LABEL_ROLE, append_log_output, cluster_label_filters,
1248 extract_first_file_from_tar, managed_container_labels, managed_label_filters,
1249 managed_network_labels, managed_network_name, parse_tcp_port_key, published_tcp_ports,
1250 };
1251
1252 #[test]
1253 fn builds_managed_labels_for_lnd_node() {
1254 let labels = managed_container_labels("cluster-1", ContainerRole::Lnd, Some("alice"));
1255
1256 assert_eq!(labels.get(LABEL_MANAGED).unwrap(), LABEL_MANAGED_VALUE);
1257 assert_eq!(labels.get(LABEL_CLUSTER).unwrap(), "cluster-1");
1258 assert_eq!(labels.get(LABEL_ROLE).unwrap(), "lnd");
1259 assert_eq!(labels.get(LABEL_NODE).unwrap(), "alice");
1260 }
1261
1262 #[test]
1263 fn builds_managed_labels_for_bitcoind_group() {
1264 let labels = managed_container_labels("cluster-1", ContainerRole::Bitcoind, None);
1265
1266 assert_eq!(labels.get(LABEL_MANAGED).unwrap(), LABEL_MANAGED_VALUE);
1267 assert_eq!(labels.get(LABEL_CLUSTER).unwrap(), "cluster-1");
1268 assert_eq!(labels.get(LABEL_ROLE).unwrap(), "bitcoind");
1269 assert!(!labels.contains_key(LABEL_NODE));
1270 }
1271
1272 #[test]
1273 fn builds_managed_network_labels_and_name() {
1274 let labels = managed_network_labels("cluster-1");
1275
1276 assert_eq!(managed_network_name("cluster-1"), "spawn-lnd-cluster-1");
1277 assert_eq!(labels.get(LABEL_MANAGED).unwrap(), LABEL_MANAGED_VALUE);
1278 assert_eq!(labels.get(LABEL_CLUSTER).unwrap(), "cluster-1");
1279 assert!(!labels.contains_key(LABEL_ROLE));
1280 }
1281
1282 #[test]
1283 fn builds_cleanup_all_filter() {
1284 let filters = managed_label_filters();
1285
1286 assert_eq!(
1287 filters.get("label").unwrap(),
1288 &vec!["spawn-lnd=true".to_string()]
1289 );
1290 }
1291
1292 #[test]
1293 fn builds_cleanup_cluster_filter() {
1294 let filters = cluster_label_filters("cluster-1");
1295
1296 assert_eq!(
1297 filters.get("label").unwrap(),
1298 &vec![
1299 "spawn-lnd=true".to_string(),
1300 "spawn-lnd.cluster=cluster-1".to_string()
1301 ]
1302 );
1303 }
1304
1305 #[test]
1306 fn builds_container_create_body_with_labels_and_ports() {
1307 let labels = managed_container_labels("cluster-1", ContainerRole::Bitcoind, None);
1308 let spec = ContainerSpec::new("spawn-lnd-test", "lightninglabs/bitcoin-core:30")
1309 .cmd(["bitcoind", "-regtest"])
1310 .env(["A=B"])
1311 .labels(labels)
1312 .expose_ports([18443, 18444])
1313 .network("spawn-lnd-cluster-1")
1314 .ipv4_address("172.28.0.10");
1315
1316 let body = spec.create_body();
1317 let host_config = body.host_config.expect("host config");
1318 let port_bindings = host_config.port_bindings.expect("port bindings");
1319 let endpoint = body
1320 .networking_config
1321 .expect("networking config")
1322 .endpoints_config
1323 .expect("endpoints config")
1324 .remove("spawn-lnd-cluster-1")
1325 .expect("network endpoint");
1326
1327 assert_eq!(body.image.as_deref(), Some("lightninglabs/bitcoin-core:30"));
1328 assert_eq!(body.cmd.unwrap(), ["bitcoind", "-regtest"]);
1329 assert_eq!(body.env.unwrap(), ["A=B"]);
1330 assert_eq!(
1331 body.labels.unwrap().get(LABEL_MANAGED).unwrap(),
1332 LABEL_MANAGED_VALUE
1333 );
1334 assert_eq!(host_config.auto_remove, Some(false));
1335 assert_eq!(
1336 host_config.network_mode.as_deref(),
1337 Some("spawn-lnd-cluster-1")
1338 );
1339 assert_eq!(
1340 endpoint
1341 .ipam_config
1342 .and_then(|ipam| ipam.ipv4_address)
1343 .as_deref(),
1344 Some("172.28.0.10")
1345 );
1346 assert!(
1347 body.exposed_ports
1348 .unwrap()
1349 .contains(&"18443/tcp".to_string())
1350 );
1351
1352 let binding = port_bindings
1353 .get("18443/tcp")
1354 .and_then(|bindings| bindings.as_ref())
1355 .and_then(|bindings| bindings.first())
1356 .expect("port binding");
1357 assert_eq!(binding.host_ip.as_deref(), Some("127.0.0.1"));
1358 assert_eq!(binding.host_port.as_deref(), Some(""));
1359 }
1360
1361 #[test]
1362 fn parses_tcp_port_keys() {
1363 assert_eq!(parse_tcp_port_key("10009/tcp"), Some(10009));
1364 assert_eq!(parse_tcp_port_key("10009/udp"), None);
1365 assert_eq!(parse_tcp_port_key("not-a-port/tcp"), None);
1366 }
1367
1368 #[test]
1369 fn extracts_published_tcp_ports() {
1370 let settings = NetworkSettings {
1371 ports: Some(HashMap::from([
1372 (
1373 "10009/tcp".to_string(),
1374 Some(vec![PortBinding {
1375 host_ip: Some("127.0.0.1".to_string()),
1376 host_port: Some("49153".to_string()),
1377 }]),
1378 ),
1379 (
1380 "9735/udp".to_string(),
1381 Some(vec![PortBinding {
1382 host_ip: Some("127.0.0.1".to_string()),
1383 host_port: Some("49154".to_string()),
1384 }]),
1385 ),
1386 ])),
1387 ..Default::default()
1388 };
1389
1390 let ports = published_tcp_ports(Some(&settings)).expect("published ports");
1391
1392 assert_eq!(ports.get(&10009), Some(&49153));
1393 assert!(!ports.contains_key(&9735));
1394 }
1395
1396 #[test]
1397 fn extracts_first_regular_file_from_tar() {
1398 let mut archive = Vec::new();
1399 {
1400 let mut builder = tar::Builder::new(&mut archive);
1401 let content = b"certificate-bytes";
1402 let mut header = tar::Header::new_gnu();
1403 header.set_path("tls.cert").expect("path");
1404 header.set_size(content.len() as u64);
1405 header.set_cksum();
1406 builder
1407 .append(&header, &content[..])
1408 .expect("append tar entry");
1409 builder.finish().expect("finish tar");
1410 }
1411
1412 let file = extract_first_file_from_tar(&archive).expect("file contents");
1413
1414 assert_eq!(file, b"certificate-bytes");
1415 }
1416
1417 #[test]
1418 fn errors_when_tar_has_no_regular_file() {
1419 let mut archive = Vec::new();
1420 {
1421 let mut builder = tar::Builder::new(&mut archive);
1422 let mut header = tar::Header::new_gnu();
1423 header.set_entry_type(tar::EntryType::Directory);
1424 header.set_path("empty-dir").expect("path");
1425 header.set_size(0);
1426 header.set_cksum();
1427 builder
1428 .append(&header, std::io::empty())
1429 .expect("append directory");
1430 builder.finish().expect("finish tar");
1431 }
1432
1433 let error = extract_first_file_from_tar(&archive).expect_err("no file");
1434
1435 assert_eq!(error, "archive did not contain a regular file");
1436 }
1437
1438 #[test]
1439 fn rollback_guard_tracks_and_disarms_ids() {
1440 let docker = super::DockerClient::from_bollard(
1441 bollard::Docker::connect_with_http(
1442 "http://127.0.0.1:65535",
1443 1,
1444 bollard::API_DEFAULT_VERSION,
1445 )
1446 .expect("construct Docker client"),
1447 );
1448 let mut rollback = docker.rollback_guard();
1449
1450 rollback.record_id("container-a");
1451 rollback.record_id("container-b");
1452
1453 assert_eq!(rollback.container_ids(), ["container-a", "container-b"]);
1454 assert_eq!(rollback.disarm(), ["container-a", "container-b"]);
1455 }
1456
1457 #[test]
1458 fn formats_log_output_with_stream_prefix() {
1459 let mut logs = String::new();
1460
1461 append_log_output(
1462 &mut logs,
1463 bollard::container::LogOutput::StdErr {
1464 message: "failure".into(),
1465 },
1466 );
1467
1468 assert_eq!(logs, "[stderr] failure\n");
1469 }
1470}