1use bollard::{
2 Docker,
3 container::LogOutput,
4 errors::Error as BollardError,
5 models::{ContainerCreateBody, ContainerInspectResponse, HostConfig, PortBinding, PortMap},
6 query_parameters::{
7 CreateContainerOptionsBuilder, CreateImageOptionsBuilder,
8 DownloadFromContainerOptionsBuilder, InspectContainerOptions, ListContainersOptionsBuilder,
9 LogsOptionsBuilder, RemoveContainerOptionsBuilder, StartContainerOptions,
10 StopContainerOptionsBuilder,
11 },
12};
13use futures_util::StreamExt;
14use serde::{Deserialize, Serialize};
15use std::{collections::HashMap, io::Read};
16use thiserror::Error;
17
18pub const LABEL_MANAGED: &str = "spawn-lnd";
20pub const LABEL_MANAGED_VALUE: &str = "true";
22pub const LABEL_CLUSTER: &str = "spawn-lnd.cluster";
24pub const LABEL_NODE: &str = "spawn-lnd.node";
26pub const LABEL_ROLE: &str = "spawn-lnd.role";
28
29const STOP_TIMEOUT_SECONDS: i32 = 10;
30const LOG_TAIL_LINES: &str = "200";
31const LOG_MAX_BYTES: usize = 64 * 1024;
32
33#[derive(Clone, Debug)]
35pub struct DockerClient {
36 docker: Docker,
37}
38
39impl DockerClient {
40 pub async fn connect() -> Result<Self, DockerError> {
42 let docker =
43 Docker::connect_with_defaults().map_err(|source| DockerError::Connect { source })?;
44
45 docker
46 .ping()
47 .await
48 .map_err(|source| DockerError::Ping { source })?;
49
50 Ok(Self { docker })
51 }
52
53 pub fn from_bollard(docker: Docker) -> Self {
55 Self { docker }
56 }
57
58 pub fn inner(&self) -> &Docker {
60 &self.docker
61 }
62
63 pub async fn ensure_image(&self, image: &str) -> Result<ImageStatus, DockerError> {
65 match self.docker.inspect_image(image).await {
66 Ok(_) => return Ok(ImageStatus::AlreadyPresent),
67 Err(source) if is_not_found_error(&source) => {}
68 Err(source) => {
69 return Err(DockerError::InspectImage {
70 image: image.to_string(),
71 source,
72 });
73 }
74 }
75
76 let options = CreateImageOptionsBuilder::new().from_image(image).build();
77 let mut stream = self.docker.create_image(Some(options), None, None);
78
79 while let Some(result) = stream.next().await {
80 result.map_err(|source| DockerError::PullImage {
81 image: image.to_string(),
82 source,
83 })?;
84 }
85
86 Ok(ImageStatus::Pulled)
87 }
88
89 pub async fn create_and_start(
91 &self,
92 spec: ContainerSpec,
93 ) -> Result<SpawnedContainer, DockerError> {
94 self.ensure_image(&spec.image).await?;
95
96 let options = CreateContainerOptionsBuilder::new()
97 .name(&spec.name)
98 .build();
99 let response = self
100 .docker
101 .create_container(Some(options), spec.create_body())
102 .await
103 .map_err(|source| DockerError::CreateContainer {
104 name: spec.name.clone(),
105 image: spec.image.clone(),
106 source,
107 })?;
108
109 if let Err(source) = self
110 .docker
111 .start_container(&response.id, None::<StartContainerOptions>)
112 .await
113 {
114 let _ = self.stop_and_remove_container(&response.id).await;
115 return Err(DockerError::StartContainer {
116 container_id: response.id,
117 source,
118 });
119 }
120
121 let inspect = self
122 .docker
123 .inspect_container(&response.id, None::<InspectContainerOptions>)
124 .await
125 .map_err(|source| DockerError::InspectContainer {
126 container_id: response.id.clone(),
127 source,
128 })?;
129
130 SpawnedContainer::from_inspect(response.id, inspect)
131 }
132
133 pub async fn copy_file_from_container(
135 &self,
136 container_id: &str,
137 path: &str,
138 ) -> Result<Vec<u8>, DockerError> {
139 let options = DownloadFromContainerOptionsBuilder::new()
140 .path(path)
141 .build();
142 let mut stream = self
143 .docker
144 .download_from_container(container_id, Some(options));
145 let mut archive = Vec::new();
146
147 while let Some(chunk) = stream.next().await {
148 let chunk = chunk.map_err(|source| DockerError::DownloadFromContainer {
149 container_id: container_id.to_string(),
150 path: path.to_string(),
151 source,
152 })?;
153 archive.extend_from_slice(&chunk);
154 }
155
156 extract_first_file_from_tar(&archive).map_err(|message| DockerError::ArchiveRead {
157 container_id: container_id.to_string(),
158 path: path.to_string(),
159 message,
160 })
161 }
162
163 pub async fn cleanup_cluster(&self, cluster_id: &str) -> Result<CleanupReport, DockerError> {
165 self.cleanup_by_labels(cluster_label_filters(cluster_id))
166 .await
167 }
168
169 pub async fn cleanup_all(&self) -> Result<CleanupReport, DockerError> {
171 self.cleanup_by_labels(managed_label_filters()).await
172 }
173
174 pub async fn managed_container_ids(&self) -> Result<Vec<String>, DockerError> {
176 self.container_ids_by_labels(managed_label_filters()).await
177 }
178
179 pub async fn cluster_container_ids(
181 &self,
182 cluster_id: &str,
183 ) -> Result<Vec<String>, DockerError> {
184 self.container_ids_by_labels(cluster_label_filters(cluster_id))
185 .await
186 }
187
188 pub async fn container_logs(&self, container_id: &str) -> Result<String, DockerError> {
190 let options = LogsOptionsBuilder::default()
191 .stdout(true)
192 .stderr(true)
193 .tail(LOG_TAIL_LINES)
194 .build();
195 let mut stream = self.docker.logs(container_id, Some(options));
196 let mut logs = String::new();
197
198 while let Some(chunk) = stream.next().await {
199 let chunk = chunk.map_err(|source| DockerError::ReadContainerLogs {
200 container_id: container_id.to_string(),
201 source,
202 })?;
203 append_log_output(&mut logs, chunk);
204
205 if logs.len() > LOG_MAX_BYTES {
206 logs.truncate(LOG_MAX_BYTES);
207 logs.push_str("\n<truncated>");
208 break;
209 }
210 }
211
212 Ok(logs)
213 }
214
215 pub fn rollback_guard(&self) -> StartupRollback<'_> {
217 StartupRollback::new(self)
218 }
219
220 pub async fn rollback_containers<I>(
222 &self,
223 container_ids: I,
224 ) -> Result<CleanupReport, DockerError>
225 where
226 I: IntoIterator,
227 I::Item: Into<String>,
228 {
229 let mut report = CleanupReport {
230 matched: 0,
231 removed: 0,
232 failures: Vec::new(),
233 };
234
235 for container_id in container_ids {
236 report.matched += 1;
237 let container_id = container_id.into();
238
239 match self.stop_and_remove_container(&container_id).await {
240 Ok(()) => report.removed += 1,
241 Err(failure) => report.failures.push(failure),
242 }
243 }
244
245 if report.failures.is_empty() {
246 Ok(report)
247 } else {
248 Err(DockerError::cleanup_failed(report))
249 }
250 }
251
252 async fn cleanup_by_labels(
253 &self,
254 label_filters: HashMap<String, Vec<String>>,
255 ) -> Result<CleanupReport, DockerError> {
256 let options = ListContainersOptionsBuilder::new()
257 .all(true)
258 .filters(&label_filters)
259 .build();
260 let containers = self
261 .docker
262 .list_containers(Some(options))
263 .await
264 .map_err(|source| DockerError::ListContainers { source })?;
265
266 let mut report = CleanupReport {
267 matched: containers.len(),
268 removed: 0,
269 failures: Vec::new(),
270 };
271
272 for container in containers {
273 let Some(container_id) = container.id else {
274 report.failures.push(CleanupFailure {
275 container_id: "<missing>".to_string(),
276 operation: "inspect".to_string(),
277 message: "container summary did not include an id".to_string(),
278 });
279 continue;
280 };
281
282 match self.stop_and_remove_container(&container_id).await {
283 Ok(()) => report.removed += 1,
284 Err(failure) => report.failures.push(failure),
285 }
286 }
287
288 if report.failures.is_empty() {
289 Ok(report)
290 } else {
291 Err(DockerError::cleanup_failed(report))
292 }
293 }
294
295 async fn container_ids_by_labels(
296 &self,
297 label_filters: HashMap<String, Vec<String>>,
298 ) -> Result<Vec<String>, DockerError> {
299 let options = ListContainersOptionsBuilder::new()
300 .all(true)
301 .filters(&label_filters)
302 .build();
303 let containers = self
304 .docker
305 .list_containers(Some(options))
306 .await
307 .map_err(|source| DockerError::ListContainers { source })?;
308
309 Ok(containers
310 .into_iter()
311 .filter_map(|container| container.id)
312 .collect())
313 }
314
315 async fn stop_and_remove_container(&self, container_id: &str) -> Result<(), CleanupFailure> {
316 let stop_options = StopContainerOptionsBuilder::new()
317 .t(STOP_TIMEOUT_SECONDS)
318 .build();
319
320 if let Err(source) = self
321 .docker
322 .stop_container(container_id, Some(stop_options))
323 .await
324 && !is_ignorable_stop_error(&source)
325 {
326 return Err(CleanupFailure::from_error(container_id, "stop", source));
327 }
328
329 let remove_options = RemoveContainerOptionsBuilder::new()
330 .force(true)
331 .v(true)
332 .build();
333
334 if let Err(source) = self
335 .docker
336 .remove_container(container_id, Some(remove_options))
337 .await
338 && !is_not_found_error(&source)
339 {
340 return Err(CleanupFailure::from_error(container_id, "remove", source));
341 }
342
343 Ok(())
344 }
345}
346
347#[derive(Debug)]
349pub struct StartupRollback<'a> {
350 docker: &'a DockerClient,
351 container_ids: Vec<String>,
352 disarmed: bool,
353}
354
355impl<'a> StartupRollback<'a> {
356 fn new(docker: &'a DockerClient) -> Self {
357 Self {
358 docker,
359 container_ids: Vec::new(),
360 disarmed: false,
361 }
362 }
363
364 pub fn record(&mut self, container: &SpawnedContainer) {
366 self.record_id(container.id.clone());
367 }
368
369 pub fn record_id(&mut self, container_id: impl Into<String>) {
371 self.container_ids.push(container_id.into());
372 }
373
374 pub fn container_ids(&self) -> &[String] {
376 &self.container_ids
377 }
378
379 pub fn disarm(mut self) -> Vec<String> {
381 self.disarmed = true;
382 std::mem::take(&mut self.container_ids)
383 }
384
385 pub async fn rollback(mut self) -> Result<CleanupReport, DockerError> {
387 self.disarmed = true;
388 let container_ids = std::mem::take(&mut self.container_ids);
389 self.docker.rollback_containers(container_ids).await
390 }
391}
392
393impl Drop for StartupRollback<'_> {
394 fn drop(&mut self) {
395 if !self.disarmed && !self.container_ids.is_empty() {
396 eprintln!(
397 "spawn-lnd startup rollback guard dropped with {} tracked container(s); call rollback().await to clean them up",
398 self.container_ids.len()
399 );
400 }
401 }
402}
403
404#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
406pub struct ContainerSpec {
407 pub name: String,
409 pub image: String,
411 pub cmd: Vec<String>,
413 pub env: Vec<String>,
415 pub labels: HashMap<String, String>,
417 pub exposed_ports: Vec<u16>,
419 pub network: Option<String>,
421}
422
423impl ContainerSpec {
424 pub fn new(name: impl Into<String>, image: impl Into<String>) -> Self {
426 Self {
427 name: name.into(),
428 image: image.into(),
429 cmd: Vec::new(),
430 env: Vec::new(),
431 labels: HashMap::new(),
432 exposed_ports: Vec::new(),
433 network: None,
434 }
435 }
436
437 pub fn cmd<I, S>(mut self, cmd: I) -> Self
439 where
440 I: IntoIterator<Item = S>,
441 S: Into<String>,
442 {
443 self.cmd = cmd.into_iter().map(Into::into).collect();
444 self
445 }
446
447 pub fn env<I, S>(mut self, env: I) -> Self
449 where
450 I: IntoIterator<Item = S>,
451 S: Into<String>,
452 {
453 self.env = env.into_iter().map(Into::into).collect();
454 self
455 }
456
457 pub fn labels(mut self, labels: HashMap<String, String>) -> Self {
459 self.labels = labels;
460 self
461 }
462
463 pub fn expose_port(mut self, port: u16) -> Self {
465 self.exposed_ports.push(port);
466 self
467 }
468
469 pub fn expose_ports<I>(mut self, ports: I) -> Self
471 where
472 I: IntoIterator<Item = u16>,
473 {
474 self.exposed_ports.extend(ports);
475 self
476 }
477
478 pub fn network(mut self, network: impl Into<String>) -> Self {
480 self.network = Some(network.into());
481 self
482 }
483
484 fn create_body(&self) -> ContainerCreateBody {
485 ContainerCreateBody {
486 image: Some(self.image.clone()),
487 cmd: (!self.cmd.is_empty()).then(|| self.cmd.clone()),
488 env: (!self.env.is_empty()).then(|| self.env.clone()),
489 labels: (!self.labels.is_empty()).then(|| self.labels.clone()),
490 exposed_ports: (!self.exposed_ports.is_empty())
491 .then(|| exposed_ports(&self.exposed_ports)),
492 host_config: Some(HostConfig {
493 auto_remove: Some(false),
494 network_mode: self.network.clone(),
495 port_bindings: (!self.exposed_ports.is_empty())
496 .then(|| port_bindings(&self.exposed_ports)),
497 ..Default::default()
498 }),
499 ..Default::default()
500 }
501 }
502}
503
504#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
506pub struct SpawnedContainer {
507 pub id: String,
509 pub name: Option<String>,
511 pub ip_address: Option<String>,
513 pub host_ports: HashMap<u16, u16>,
515}
516
517impl SpawnedContainer {
518 fn from_inspect(id: String, inspect: ContainerInspectResponse) -> Result<Self, DockerError> {
519 let network_settings = inspect.network_settings.as_ref();
520 Ok(Self {
521 id,
522 name: inspect
523 .name
524 .map(|name| name.trim_start_matches('/').to_string()),
525 ip_address: container_ip_address(network_settings),
526 host_ports: published_tcp_ports(network_settings)?,
527 })
528 }
529
530 pub fn host_port(&self, container_port: u16) -> Option<u16> {
532 self.host_ports.get(&container_port).copied()
533 }
534}
535
536#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
538pub enum ContainerRole {
539 Bitcoind,
541 Lnd,
543}
544
545#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
547pub enum ImageStatus {
548 AlreadyPresent,
550 Pulled,
552}
553
554impl ContainerRole {
555 pub fn as_label_value(self) -> &'static str {
557 match self {
558 Self::Bitcoind => "bitcoind",
559 Self::Lnd => "lnd",
560 }
561 }
562}
563
564#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
566pub struct CleanupReport {
567 pub matched: usize,
569 pub removed: usize,
571 pub failures: Vec<CleanupFailure>,
573}
574
575impl CleanupReport {
576 pub fn is_success(&self) -> bool {
578 self.failures.is_empty()
579 }
580}
581
582#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
584pub struct CleanupFailure {
585 pub container_id: String,
587 pub operation: String,
589 pub message: String,
591}
592
593impl CleanupFailure {
594 fn from_error(container_id: &str, operation: &'static str, source: BollardError) -> Self {
595 Self {
596 container_id: container_id.to_string(),
597 operation: operation.to_string(),
598 message: source.to_string(),
599 }
600 }
601}
602
603#[derive(Debug, Error)]
605#[allow(missing_docs)]
606pub enum DockerError {
607 #[error("failed to connect to Docker")]
608 Connect { source: BollardError },
609
610 #[error("failed to ping Docker")]
611 Ping { source: BollardError },
612
613 #[error("failed to list Docker containers")]
614 ListContainers { source: BollardError },
615
616 #[error("failed to inspect Docker image {image}")]
617 InspectImage { image: String, source: BollardError },
618
619 #[error("failed to pull Docker image {image}")]
620 PullImage { image: String, source: BollardError },
621
622 #[error("failed to create Docker container {name} from image {image}")]
623 CreateContainer {
624 name: String,
625 image: String,
626 source: BollardError,
627 },
628
629 #[error("failed to start Docker container {container_id}")]
630 StartContainer {
631 container_id: String,
632 source: BollardError,
633 },
634
635 #[error("failed to inspect Docker container {container_id}")]
636 InspectContainer {
637 container_id: String,
638 source: BollardError,
639 },
640
641 #[error("Docker reported invalid host port {host_port} for container port {container_port}")]
642 InvalidPublishedPort {
643 container_port: u16,
644 host_port: String,
645 },
646
647 #[error("failed to download {path} from Docker container {container_id}")]
648 DownloadFromContainer {
649 container_id: String,
650 path: String,
651 source: BollardError,
652 },
653
654 #[error("failed to read archived file {path} from Docker container {container_id}: {message}")]
655 ArchiveRead {
656 container_id: String,
657 path: String,
658 message: String,
659 },
660
661 #[error("failed to read logs from Docker container {container_id}")]
662 ReadContainerLogs {
663 container_id: String,
664 source: BollardError,
665 },
666
667 #[error("failed to clean up {count} Docker container(s)")]
668 CleanupFailed {
669 #[source]
670 report: CleanupReportError,
671 count: usize,
672 },
673}
674
675impl DockerError {
676 fn cleanup_failed(report: CleanupReport) -> Self {
677 Self::CleanupFailed {
678 count: report.failures.len(),
679 report: CleanupReportError(report),
680 }
681 }
682}
683
684#[derive(Debug)]
686pub struct CleanupReportError(
687 pub CleanupReport,
689);
690
691impl std::fmt::Display for CleanupReportError {
692 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
693 write!(
694 f,
695 "{} cleanup failure(s) after matching {} container(s)",
696 self.0.failures.len(),
697 self.0.matched
698 )
699 }
700}
701
702impl std::error::Error for CleanupReportError {}
703
704pub fn managed_container_labels(
706 cluster_id: &str,
707 role: ContainerRole,
708 node_alias: Option<&str>,
709) -> HashMap<String, String> {
710 let mut labels = HashMap::from([
711 (LABEL_MANAGED.to_string(), LABEL_MANAGED_VALUE.to_string()),
712 (LABEL_CLUSTER.to_string(), cluster_id.to_string()),
713 (LABEL_ROLE.to_string(), role.as_label_value().to_string()),
714 ]);
715
716 if let Some(node_alias) = node_alias {
717 labels.insert(LABEL_NODE.to_string(), node_alias.to_string());
718 }
719
720 labels
721}
722
723pub fn managed_label_filters() -> HashMap<String, Vec<String>> {
725 label_filters([format!("{LABEL_MANAGED}={LABEL_MANAGED_VALUE}")])
726}
727
728pub fn cluster_label_filters(cluster_id: &str) -> HashMap<String, Vec<String>> {
730 label_filters([
731 format!("{LABEL_MANAGED}={LABEL_MANAGED_VALUE}"),
732 format!("{LABEL_CLUSTER}={cluster_id}"),
733 ])
734}
735
736fn label_filters(labels: impl IntoIterator<Item = String>) -> HashMap<String, Vec<String>> {
737 HashMap::from([("label".to_string(), labels.into_iter().collect())])
738}
739
740fn exposed_ports(ports: &[u16]) -> Vec<String> {
741 ports.iter().copied().map(tcp_port_key).collect()
742}
743
744fn port_bindings(ports: &[u16]) -> PortMap {
745 ports
746 .iter()
747 .copied()
748 .map(|port| {
749 (
750 tcp_port_key(port),
751 Some(vec![PortBinding {
752 host_ip: Some("127.0.0.1".to_string()),
753 host_port: Some(String::new()),
754 }]),
755 )
756 })
757 .collect()
758}
759
760fn tcp_port_key(port: u16) -> String {
761 format!("{port}/tcp")
762}
763
764fn container_ip_address(
765 network_settings: Option<&bollard::models::NetworkSettings>,
766) -> Option<String> {
767 network_settings
768 .and_then(|settings| settings.networks.as_ref())
769 .and_then(|networks| {
770 networks
771 .values()
772 .filter_map(|endpoint| endpoint.ip_address.as_ref())
773 .find(|ip| !ip.is_empty())
774 .cloned()
775 })
776}
777
778fn published_tcp_ports(
779 network_settings: Option<&bollard::models::NetworkSettings>,
780) -> Result<HashMap<u16, u16>, DockerError> {
781 let Some(ports) = network_settings.and_then(|settings| settings.ports.as_ref()) else {
782 return Ok(HashMap::new());
783 };
784
785 let mut mapped = HashMap::new();
786 for (key, bindings) in ports {
787 let Some(container_port) = parse_tcp_port_key(key) else {
788 continue;
789 };
790 let Some(binding) = bindings
791 .as_ref()
792 .and_then(|bindings| bindings.iter().find(|binding| binding.host_port.is_some()))
793 else {
794 continue;
795 };
796 let Some(host_port) = binding.host_port.as_ref() else {
797 continue;
798 };
799 let host_port =
800 host_port
801 .parse::<u16>()
802 .map_err(|_| DockerError::InvalidPublishedPort {
803 container_port,
804 host_port: host_port.clone(),
805 })?;
806
807 mapped.insert(container_port, host_port);
808 }
809
810 Ok(mapped)
811}
812
813fn parse_tcp_port_key(key: &str) -> Option<u16> {
814 let (port, protocol) = key.split_once('/')?;
815 (protocol == "tcp")
816 .then(|| port.parse::<u16>().ok())
817 .flatten()
818}
819
820fn extract_first_file_from_tar(bytes: &[u8]) -> Result<Vec<u8>, String> {
821 let mut archive = tar::Archive::new(bytes);
822 let entries = archive
823 .entries()
824 .map_err(|err| format!("failed to read tar entries: {err}"))?;
825
826 for entry in entries {
827 let mut entry = entry.map_err(|err| format!("failed to read tar entry: {err}"))?;
828 if !entry.header().entry_type().is_file() {
829 continue;
830 }
831
832 let mut file = Vec::new();
833 entry
834 .read_to_end(&mut file)
835 .map_err(|err| format!("failed to read tar file contents: {err}"))?;
836 return Ok(file);
837 }
838
839 Err("archive did not contain a regular file".to_string())
840}
841
842fn append_log_output(logs: &mut String, output: LogOutput) {
843 let prefix = match &output {
844 LogOutput::StdErr { .. } => "stderr",
845 LogOutput::StdOut { .. } => "stdout",
846 LogOutput::StdIn { .. } => "stdin",
847 LogOutput::Console { .. } => "console",
848 };
849 let message = String::from_utf8_lossy(output.as_ref());
850
851 logs.push('[');
852 logs.push_str(prefix);
853 logs.push_str("] ");
854 logs.push_str(&message);
855
856 if !logs.ends_with('\n') {
857 logs.push('\n');
858 }
859}
860
861fn is_ignorable_stop_error(error: &BollardError) -> bool {
862 matches!(docker_status_code(error), Some(304 | 404))
863}
864
865fn is_not_found_error(error: &BollardError) -> bool {
866 matches!(docker_status_code(error), Some(404))
867}
868
869fn docker_status_code(error: &BollardError) -> Option<u16> {
870 match error {
871 BollardError::DockerResponseServerError { status_code, .. } => Some(*status_code),
872 _ => None,
873 }
874}
875
876#[cfg(test)]
877mod tests {
878 use bollard::models::{NetworkSettings, PortBinding};
879 use std::collections::HashMap;
880
881 use super::{
882 ContainerRole, ContainerSpec, LABEL_CLUSTER, LABEL_MANAGED, LABEL_MANAGED_VALUE,
883 LABEL_NODE, LABEL_ROLE, append_log_output, cluster_label_filters,
884 extract_first_file_from_tar, managed_container_labels, managed_label_filters,
885 parse_tcp_port_key, published_tcp_ports,
886 };
887
888 #[test]
889 fn builds_managed_labels_for_lnd_node() {
890 let labels = managed_container_labels("cluster-1", ContainerRole::Lnd, Some("alice"));
891
892 assert_eq!(labels.get(LABEL_MANAGED).unwrap(), LABEL_MANAGED_VALUE);
893 assert_eq!(labels.get(LABEL_CLUSTER).unwrap(), "cluster-1");
894 assert_eq!(labels.get(LABEL_ROLE).unwrap(), "lnd");
895 assert_eq!(labels.get(LABEL_NODE).unwrap(), "alice");
896 }
897
898 #[test]
899 fn builds_managed_labels_for_bitcoind_group() {
900 let labels = managed_container_labels("cluster-1", ContainerRole::Bitcoind, None);
901
902 assert_eq!(labels.get(LABEL_MANAGED).unwrap(), LABEL_MANAGED_VALUE);
903 assert_eq!(labels.get(LABEL_CLUSTER).unwrap(), "cluster-1");
904 assert_eq!(labels.get(LABEL_ROLE).unwrap(), "bitcoind");
905 assert!(!labels.contains_key(LABEL_NODE));
906 }
907
908 #[test]
909 fn builds_cleanup_all_filter() {
910 let filters = managed_label_filters();
911
912 assert_eq!(
913 filters.get("label").unwrap(),
914 &vec!["spawn-lnd=true".to_string()]
915 );
916 }
917
918 #[test]
919 fn builds_cleanup_cluster_filter() {
920 let filters = cluster_label_filters("cluster-1");
921
922 assert_eq!(
923 filters.get("label").unwrap(),
924 &vec![
925 "spawn-lnd=true".to_string(),
926 "spawn-lnd.cluster=cluster-1".to_string()
927 ]
928 );
929 }
930
931 #[test]
932 fn builds_container_create_body_with_labels_and_ports() {
933 let labels = managed_container_labels("cluster-1", ContainerRole::Bitcoind, None);
934 let spec = ContainerSpec::new("spawn-lnd-test", "lightninglabs/bitcoin-core:30")
935 .cmd(["bitcoind", "-regtest"])
936 .env(["A=B"])
937 .labels(labels)
938 .expose_ports([18443, 18444])
939 .network("bridge");
940
941 let body = spec.create_body();
942 let host_config = body.host_config.expect("host config");
943 let port_bindings = host_config.port_bindings.expect("port bindings");
944
945 assert_eq!(body.image.as_deref(), Some("lightninglabs/bitcoin-core:30"));
946 assert_eq!(body.cmd.unwrap(), ["bitcoind", "-regtest"]);
947 assert_eq!(body.env.unwrap(), ["A=B"]);
948 assert_eq!(
949 body.labels.unwrap().get(LABEL_MANAGED).unwrap(),
950 LABEL_MANAGED_VALUE
951 );
952 assert_eq!(host_config.auto_remove, Some(false));
953 assert_eq!(host_config.network_mode.as_deref(), Some("bridge"));
954 assert!(
955 body.exposed_ports
956 .unwrap()
957 .contains(&"18443/tcp".to_string())
958 );
959
960 let binding = port_bindings
961 .get("18443/tcp")
962 .and_then(|bindings| bindings.as_ref())
963 .and_then(|bindings| bindings.first())
964 .expect("port binding");
965 assert_eq!(binding.host_ip.as_deref(), Some("127.0.0.1"));
966 assert_eq!(binding.host_port.as_deref(), Some(""));
967 }
968
969 #[test]
970 fn parses_tcp_port_keys() {
971 assert_eq!(parse_tcp_port_key("10009/tcp"), Some(10009));
972 assert_eq!(parse_tcp_port_key("10009/udp"), None);
973 assert_eq!(parse_tcp_port_key("not-a-port/tcp"), None);
974 }
975
976 #[test]
977 fn extracts_published_tcp_ports() {
978 let settings = NetworkSettings {
979 ports: Some(HashMap::from([
980 (
981 "10009/tcp".to_string(),
982 Some(vec![PortBinding {
983 host_ip: Some("127.0.0.1".to_string()),
984 host_port: Some("49153".to_string()),
985 }]),
986 ),
987 (
988 "9735/udp".to_string(),
989 Some(vec![PortBinding {
990 host_ip: Some("127.0.0.1".to_string()),
991 host_port: Some("49154".to_string()),
992 }]),
993 ),
994 ])),
995 ..Default::default()
996 };
997
998 let ports = published_tcp_ports(Some(&settings)).expect("published ports");
999
1000 assert_eq!(ports.get(&10009), Some(&49153));
1001 assert!(!ports.contains_key(&9735));
1002 }
1003
1004 #[test]
1005 fn extracts_first_regular_file_from_tar() {
1006 let mut archive = Vec::new();
1007 {
1008 let mut builder = tar::Builder::new(&mut archive);
1009 let content = b"certificate-bytes";
1010 let mut header = tar::Header::new_gnu();
1011 header.set_path("tls.cert").expect("path");
1012 header.set_size(content.len() as u64);
1013 header.set_cksum();
1014 builder
1015 .append(&header, &content[..])
1016 .expect("append tar entry");
1017 builder.finish().expect("finish tar");
1018 }
1019
1020 let file = extract_first_file_from_tar(&archive).expect("file contents");
1021
1022 assert_eq!(file, b"certificate-bytes");
1023 }
1024
1025 #[test]
1026 fn errors_when_tar_has_no_regular_file() {
1027 let mut archive = Vec::new();
1028 {
1029 let mut builder = tar::Builder::new(&mut archive);
1030 let mut header = tar::Header::new_gnu();
1031 header.set_entry_type(tar::EntryType::Directory);
1032 header.set_path("empty-dir").expect("path");
1033 header.set_size(0);
1034 header.set_cksum();
1035 builder
1036 .append(&header, std::io::empty())
1037 .expect("append directory");
1038 builder.finish().expect("finish tar");
1039 }
1040
1041 let error = extract_first_file_from_tar(&archive).expect_err("no file");
1042
1043 assert_eq!(error, "archive did not contain a regular file");
1044 }
1045
1046 #[test]
1047 fn rollback_guard_tracks_and_disarms_ids() {
1048 let docker = super::DockerClient::from_bollard(
1049 bollard::Docker::connect_with_http(
1050 "http://127.0.0.1:65535",
1051 1,
1052 bollard::API_DEFAULT_VERSION,
1053 )
1054 .expect("construct Docker client"),
1055 );
1056 let mut rollback = docker.rollback_guard();
1057
1058 rollback.record_id("container-a");
1059 rollback.record_id("container-b");
1060
1061 assert_eq!(rollback.container_ids(), ["container-a", "container-b"]);
1062 assert_eq!(rollback.disarm(), ["container-a", "container-b"]);
1063 }
1064
1065 #[test]
1066 fn formats_log_output_with_stream_prefix() {
1067 let mut logs = String::new();
1068
1069 append_log_output(
1070 &mut logs,
1071 bollard::container::LogOutput::StdErr {
1072 message: "failure".into(),
1073 },
1074 );
1075
1076 assert_eq!(logs, "[stderr] failure\n");
1077 }
1078}