Skip to main content

harmont_cli/orchestrator/
docker_client.rs

//! Thin wrapper around bollard for the local executor.
//!
//! Operations: pull, list, tag, commit, export, import and remove images;
//! start long-lived (`sleep infinity`) containers and exec commands in them
//! while streaming stdout/stderr (optionally feeding stdin); list containers
//! by label; and stop/remove containers.
6
7use std::collections::HashMap;
8use std::sync::Arc;
9
10use anyhow::{Context, Result};
11use bollard::Docker;
12use bollard::container::{
13    Config, CreateContainerOptions, RemoveContainerOptions, StartContainerOptions,
14    StopContainerOptions,
15};
16use bollard::exec::{CreateExecOptions, StartExecResults};
17use bollard::image::{
18    CommitContainerOptions, CreateImageOptions, ImportImageOptions, ListImagesOptions,
19    RemoveImageOptions, TagImageOptions,
20};
21use bollard::models::HostConfig;
22use futures_util::StreamExt;
23use tokio::io::AsyncWrite;
24
25use crate::error::HmError;
26
27/// Build a [`HostConfig`] with optional bind mounts and Linux capabilities.
28///
29/// Empty slices become `None` so Docker applies its defaults.
30fn build_host_config(binds: &[String], cap_add: &[String]) -> HostConfig {
31    HostConfig {
32        binds: if binds.is_empty() {
33            None
34        } else {
35            Some(binds.to_vec())
36        },
37        cap_add: if cap_add.is_empty() {
38            None
39        } else {
40            Some(cap_add.to_vec())
41        },
42        ..Default::default()
43    }
44}
45
46#[derive(Debug, Clone)]
47pub struct DockerClient {
48    inner: Arc<Docker>,
49}
50
51impl DockerClient {
52    /// Open a Docker connection using the platform's default socket /
53    /// pipe. The handle is cheap to clone (refcounted internally).
54    ///
55    /// # Errors
56    ///
57    /// Returns [`HmError::Docker`] when bollard cannot resolve a
58    /// local Docker endpoint (no socket on `DOCKER_HOST`, no Windows
59    /// pipe, etc.).
60    pub fn connect() -> Result<Self> {
61        let d = Docker::connect_with_local_defaults()
62            .map_err(|e| HmError::Docker(format!("connect: {e}")))?;
63        Ok(Self { inner: Arc::new(d) })
64    }
65
66    /// Round-trip the daemon to confirm reachability.
67    ///
68    /// # Errors
69    ///
70    /// Returns [`HmError::Docker`] if the ping request fails (daemon
71    /// stopped, socket revoked, version negotiation failure).
72    pub async fn ping(&self) -> Result<()> {
73        self.inner
74            .ping()
75            .await
76            .map_err(|e| HmError::Docker(format!("ping failed: {e}")))?;
77        Ok(())
78    }
79
80    /// True if `tag` resolves to a locally-cached image.
81    ///
82    /// # Errors
83    ///
84    /// Returns [`HmError::Docker`] if the `list_images` API call
85    /// fails (daemon unreachable, malformed filter).
86    pub async fn image_exists(&self, tag: &str) -> Result<bool> {
87        let mut filters = HashMap::new();
88        filters.insert("reference".to_string(), vec![tag.to_string()]);
89        let images = self
90            .inner
91            .list_images(Some(ListImagesOptions {
92                filters,
93                ..Default::default()
94            }))
95            .await
96            .map_err(|e| HmError::Docker(format!("list_images: {e}")))?;
97        Ok(!images.is_empty())
98    }
99
100    /// List all `repo_tags` from images that have at least one tag
101    /// matching `reference` (e.g. `"harmont-local/build"` matches
102    /// `harmont-local/build:abc123`).
103    ///
104    /// # Errors
105    ///
106    /// Returns [`HmError::Docker`] if the `list_images` API call fails.
107    pub async fn list_images_by_reference(&self, reference: &str) -> Result<Vec<String>> {
108        let mut filters = HashMap::new();
109        filters.insert("reference".to_string(), vec![format!("{reference}:*")]);
110        let images = self
111            .inner
112            .list_images(Some(ListImagesOptions {
113                filters,
114                ..Default::default()
115            }))
116            .await
117            .map_err(|e| HmError::Docker(format!("list_images: {e}")))?;
118        Ok(images.into_iter().flat_map(|img| img.repo_tags).collect())
119    }
120
121    /// Pull `tag` from its registry, surfacing the daemon's progress
122    /// stream as Docker errors.
123    ///
124    /// # Errors
125    ///
126    /// Returns [`HmError::Docker`] if any chunk of the pull stream
127    /// reports an error (registry not reachable, image not found,
128    /// auth required).
129    pub async fn pull_image(&self, tag: &str) -> Result<()> {
130        let mut s = self.inner.create_image(
131            Some(CreateImageOptions {
132                from_image: tag,
133                ..Default::default()
134            }),
135            None,
136            None,
137        );
138        while let Some(item) = s.next().await {
139            item.map_err(|e| HmError::Docker(format!("pull {tag}: {e}")))?;
140        }
141        Ok(())
142    }
143
144    /// Start a long-lived container that runs `sh -c 'sleep infinity'` so
145    /// later `exec`s land in a stable shell. Returns the container ID.
146    ///
147    /// # Errors
148    ///
149    /// Returns [`HmError::Docker`] if the container cannot be created
150    /// (image not pulled, name conflict, OCI runtime failure) or if
151    /// `start_container` rejects the create.
152    pub async fn start_long_lived(
153        &self,
154        image: &str,
155        env: &[String],
156        workdir: &str,
157        name: &str,
158    ) -> Result<String> {
159        self.start_long_lived_with_mounts(image, env, workdir, name, &[])
160            .await
161    }
162
163    /// Like [`Self::start_long_lived`] but with bind mounts via `HostConfig`.
164    ///
165    /// Each entry in `binds` is a Docker bind-mount string of the form
166    /// `"/host/path:/container/path"` (with an optional `:ro` suffix).
167    ///
168    /// # Errors
169    ///
170    /// Returns [`HmError::Docker`] if the container cannot be created
171    /// (image not pulled, name conflict, OCI runtime failure) or if
172    /// `start_container` rejects the create.
173    pub async fn start_long_lived_with_mounts(
174        &self,
175        image: &str,
176        env: &[String],
177        workdir: &str,
178        name: &str,
179        binds: &[String],
180    ) -> Result<String> {
181        let cfg = Config {
182            image: Some(image.to_string()),
183            cmd: Some(vec!["sh".into(), "-c".into(), "sleep infinity".into()]),
184            env: Some(env.to_vec()),
185            working_dir: Some(workdir.to_string()),
186            host_config: Some(build_host_config(binds, &[])),
187            ..Default::default()
188        };
189        let create = self
190            .inner
191            .create_container(
192                Some(CreateContainerOptions {
193                    name,
194                    ..Default::default()
195                }),
196                cfg,
197            )
198            .await
199            .map_err(|e| HmError::Docker(format!("create_container: {e}")))?;
200        self.inner
201            .start_container(&create.id, None::<StartContainerOptions<String>>)
202            .await
203            .map_err(|e| HmError::Docker(format!("start_container: {e}")))?;
204        Ok(create.id)
205    }
206
207    /// Exec a command inside a running container and stream stdout+stderr
208    /// to `out`. Returns the command's exit code.
209    ///
210    /// # Errors
211    ///
212    /// Returns [`HmError::Docker`] if `create_exec` / `start_exec` /
213    /// `inspect_exec` fail, or surfaces an `anyhow` error if writing a
214    /// log frame to `out` fails.
215    pub async fn exec_streaming(
216        &self,
217        container_id: &str,
218        cmd: &[String],
219        env: &[String],
220        workdir: &str,
221        out: &mut (impl AsyncWrite + Send + Unpin),
222    ) -> Result<i64> {
223        use bollard::container::LogOutput;
224        use tokio::io::AsyncWriteExt;
225
226        let exec = self
227            .inner
228            .create_exec(
229                container_id,
230                CreateExecOptions {
231                    cmd: Some(cmd.iter().map(std::string::String::as_str).collect()),
232                    env: Some(env.iter().map(std::string::String::as_str).collect()),
233                    working_dir: Some(workdir),
234                    attach_stdout: Some(true),
235                    attach_stderr: Some(true),
236                    ..Default::default()
237                },
238            )
239            .await
240            .map_err(|e| HmError::Docker(format!("create_exec: {e}")))?;
241        match self
242            .inner
243            .start_exec(&exec.id, None)
244            .await
245            .map_err(|e| HmError::Docker(format!("start_exec: {e}")))?
246        {
247            StartExecResults::Attached { mut output, .. } => {
248                while let Some(item) = output.next().await {
249                    let chunk = item.map_err(|e| HmError::Docker(format!("exec stream: {e}")))?;
250                    let (LogOutput::StdOut { message: bytes }
251                    | LogOutput::StdErr { message: bytes }
252                    | LogOutput::Console { message: bytes }) = chunk
253                    else {
254                        // StdIn frames are echoed by some daemons; ignore them.
255                        continue;
256                    };
257                    out.write_all(&bytes).await.context("write exec output")?;
258                }
259            }
260            StartExecResults::Detached => {}
261        }
262        let inspect = self
263            .inner
264            .inspect_exec(&exec.id)
265            .await
266            .map_err(|e| HmError::Docker(format!("inspect_exec: {e}")))?;
267        Ok(inspect.exit_code.unwrap_or(0))
268    }
269
270    /// Like [`Self::exec_streaming`], but also pipes `stdin_bytes` into the
271    /// exec'd process's stdin (closing it after the write so the process
272    /// sees EOF). Used to stream a tar archive into `tar -xzf -` when
273    /// hydrating `/workspace` in a fresh chain-root container.
274    ///
275    /// # Errors
276    ///
277    /// Returns [`HmError::Docker`] if any of the exec lifecycle calls
278    /// fail, or surfaces an `anyhow` error if writing stdin or output
279    /// frames fails.
280    pub async fn exec_streaming_stdin(
281        &self,
282        container_id: &str,
283        cmd: &[String],
284        env: &[String],
285        workdir: &str,
286        stdin_bytes: &[u8],
287        out: &mut (impl AsyncWrite + Send + Unpin),
288    ) -> Result<i64> {
289        use bollard::container::LogOutput;
290        use tokio::io::AsyncWriteExt;
291
292        let exec = self
293            .inner
294            .create_exec(
295                container_id,
296                CreateExecOptions {
297                    cmd: Some(cmd.iter().map(std::string::String::as_str).collect()),
298                    env: Some(env.iter().map(std::string::String::as_str).collect()),
299                    working_dir: Some(workdir),
300                    attach_stdin: Some(true),
301                    attach_stdout: Some(true),
302                    attach_stderr: Some(true),
303                    ..Default::default()
304                },
305            )
306            .await
307            .map_err(|e| HmError::Docker(format!("create_exec: {e}")))?;
308        match self
309            .inner
310            .start_exec(&exec.id, None)
311            .await
312            .map_err(|e| HmError::Docker(format!("start_exec: {e}")))?
313        {
314            StartExecResults::Attached {
315                mut output,
316                mut input,
317            } => {
318                input
319                    .write_all(stdin_bytes)
320                    .await
321                    .context("write exec stdin")?;
322                input.shutdown().await.context("close exec stdin")?;
323                // Drop the writer to fully release the half-duplex.
324                drop(input);
325                while let Some(item) = output.next().await {
326                    let chunk = item.map_err(|e| HmError::Docker(format!("exec stream: {e}")))?;
327                    let (LogOutput::StdOut { message: bytes }
328                    | LogOutput::StdErr { message: bytes }
329                    | LogOutput::Console { message: bytes }) = chunk
330                    else {
331                        // StdIn frames are echoed by some daemons; ignore them.
332                        continue;
333                    };
334                    out.write_all(&bytes).await.context("write exec output")?;
335                }
336            }
337            StartExecResults::Detached => {}
338        }
339        let inspect = self
340            .inner
341            .inspect_exec(&exec.id)
342            .await
343            .map_err(|e| HmError::Docker(format!("inspect_exec: {e}")))?;
344        Ok(inspect.exit_code.unwrap_or(0))
345    }
346
347    /// Commit a running container to an image tag. Returns the tag, which
348    /// is a valid image reference once the daemon's commit succeeds.
349    ///
350    /// We don't return the daemon's image ID: bollard 0.18's `Commit`
351    /// stub deserialises the response as `{"id": ...}`, but the Docker
352    /// daemon returns `{"Id": ...}` (capital I). The image is committed
353    /// correctly either way; the tag is the canonical reference and is
354    /// what every caller actually uses.
355    ///
356    /// # Errors
357    ///
358    /// Returns [`HmError::Docker`] if `commit_container` fails (paused
359    /// container, daemon I/O failure).
360    ///
361    /// # Panics
362    ///
363    /// Panics if `tag.splitn(2, ':')` produces neither one nor two parts.
364    /// `splitn` is total for non-empty input, so this branch is only
365    /// reachable for the empty string, which the caller never passes.
366    pub async fn commit_container(&self, container_id: &str, tag: &str) -> Result<String> {
367        let parts: Vec<&str> = tag.splitn(2, ':').collect();
368        let (repo, ver) = match parts.as_slice() {
369            [r, v] => (*r, *v),
370            [r] => (*r, "latest"),
371            _ => unreachable!("splitn(2) yields one or two parts for non-empty input"),
372        };
373        let opts = CommitContainerOptions {
374            container: container_id,
375            repo,
376            tag: ver,
377            ..Default::default()
378        };
379        self.inner
380            .commit_container(opts, Config::<String>::default())
381            .await
382            .map_err(|e| HmError::Docker(format!("commit_container: {e}")))?;
383        Ok(tag.to_string())
384    }
385
386    /// Force-remove an image by tag. Used for end-of-run pruning of
387    /// ephemeral parent-snapshot tags committed during this process's
388    /// run. Best-effort callers should swallow the error themselves;
389    /// failures here are non-fatal.
390    ///
391    /// # Errors
392    ///
393    /// Returns [`HmError::Docker`] if `remove_image` fails (image
394    /// missing, still referenced by a running container, daemon I/O
395    /// failure).
396    pub async fn remove_image(&self, image: &str) -> Result<()> {
397        self.inner
398            .remove_image(
399                image,
400                Some(RemoveImageOptions {
401                    force: true,
402                    noprune: false,
403                }),
404                None,
405            )
406            .await
407            .map_err(|e| HmError::Docker(format!("remove_image '{image}': {e}")))?;
408        Ok(())
409    }
410
411    /// Add an additional tag to an existing image.
412    ///
413    /// `source` is the existing image reference (tag or ID) and
414    /// `new_tag` is the desired `repo:tag` string. If `new_tag`
415    /// contains no `:`, the Docker-default tag `"latest"` is used.
416    ///
417    /// # Errors
418    ///
419    /// Returns [`HmError::Docker`] if `tag_image` fails (source image
420    /// not found, daemon I/O failure).
421    pub async fn tag_image(&self, source: &str, new_tag: &str) -> Result<()> {
422        let parts: Vec<&str> = new_tag.splitn(2, ':').collect();
423        let (repo, tag) = match parts.as_slice() {
424            [r, v] => (*r, *v),
425            [r] => (*r, "latest"),
426            _ => unreachable!("splitn(2) yields one or two parts for non-empty input"),
427        };
428        self.inner
429            .tag_image(source, Some(TagImageOptions { repo, tag }))
430            .await
431            .map_err(|e| HmError::Docker(format!("tag_image '{source}' -> '{new_tag}': {e}")))?;
432        Ok(())
433    }
434
435    /// Export a Docker image to a tar file on disk.
436    ///
437    /// Streams the image layer data from the daemon and writes it to
438    /// `dest` using a buffered writer.
439    ///
440    /// # Errors
441    ///
442    /// Returns [`HmError::Docker`] if the daemon's export stream fails,
443    /// or an I/O error if writing to `dest` fails.
444    pub async fn export_image(&self, image: &str, dest: &std::path::Path) -> Result<()> {
445        use tokio::io::AsyncWriteExt;
446
447        let mut stream = self.inner.export_image(image);
448        let file = tokio::fs::File::create(dest)
449            .await
450            .with_context(|| format!("create export file '{}'", dest.display()))?;
451        let mut writer = tokio::io::BufWriter::new(file);
452        while let Some(chunk) = stream.next().await {
453            let bytes =
454                chunk.map_err(|e| HmError::Docker(format!("export_image '{image}': {e}")))?;
455            writer
456                .write_all(&bytes)
457                .await
458                .with_context(|| format!("write export data to '{}'", dest.display()))?;
459        }
460        writer
461            .flush()
462            .await
463            .with_context(|| format!("flush export file '{}'", dest.display()))?;
464        Ok(())
465    }
466
467    /// Import a Docker image from a tar file on disk.
468    ///
469    /// Reads the full tar file into memory and loads it into the
470    /// daemon via the image import API.
471    ///
472    /// # Errors
473    ///
474    /// Returns [`HmError::Docker`] if the daemon rejects the import
475    /// stream, or an I/O error if reading `src` fails.
476    pub async fn import_image(&self, src: &std::path::Path) -> Result<()> {
477        let body = tokio::fs::read(src)
478            .await
479            .with_context(|| format!("read import file '{}'", src.display()))?;
480        let mut stream =
481            self.inner
482                .import_image(ImportImageOptions { quiet: true }, body.into(), None);
483        while let Some(item) = stream.next().await {
484            item.map_err(|e| HmError::Docker(format!("import_image '{}': {e}", src.display())))?;
485        }
486        Ok(())
487    }
488
489    /// List all image tags whose name starts with `prefix`.
490    ///
491    /// Uses the Docker `reference` filter with a glob pattern and then
492    /// post-filters the returned `repo_tags` to those that truly begin
493    /// with `prefix`. The result is sorted lexicographically.
494    ///
495    /// # Errors
496    ///
497    /// Returns [`HmError::Docker`] if the `list_images` API call
498    /// fails (daemon unreachable, malformed filter).
499    pub async fn list_images_by_prefix(&self, prefix: &str) -> Result<Vec<String>> {
500        let mut filters = HashMap::new();
501        filters.insert("reference".to_string(), vec![format!("{prefix}*")]);
502        let images = self
503            .inner
504            .list_images(Some(ListImagesOptions {
505                filters,
506                ..Default::default()
507            }))
508            .await
509            .map_err(|e| HmError::Docker(format!("list_images: {e}")))?;
510        let mut tags: Vec<String> = images
511            .iter()
512            .flat_map(|img| &img.repo_tags)
513            .filter(|tag| tag.starts_with(prefix))
514            .cloned()
515            .collect();
516        tags.sort();
517        Ok(tags)
518    }
519
520    pub async fn stop_remove(&self, container_id: &str) {
521        let _ = self
522            .inner
523            .stop_container(container_id, Some(StopContainerOptions { t: 0 }))
524            .await;
525        let _ = self
526            .inner
527            .remove_container(
528                container_id,
529                Some(RemoveContainerOptions {
530                    force: true,
531                    v: true,
532                    ..Default::default()
533                }),
534            )
535            .await;
536    }
537
538    /// Internal access to the underlying bollard handle, for callers
539    /// that need to call bollard APIs not yet wrapped here (e.g., log
540    /// streaming via `Docker::logs`).
541    ///
542    /// Prefer adding a dedicated method to this type; only use this
543    /// accessor when a one-off stream is needed outside the main
544    /// `DockerClient` API surface.
545    #[doc(hidden)]
546    #[must_use]
547    pub fn inner_for_logs(&self) -> &bollard::Docker {
548        &self.inner
549    }
550
551    /// List container summaries filtered by a single label `k=v` predicate.
552    ///
553    /// # Errors
554    ///
555    /// Returns [`HmError::Docker`] when `list_containers` fails.
556    pub async fn list_containers_by_label(
557        &self,
558        k: &str,
559        v: &str,
560    ) -> Result<Vec<bollard::secret::ContainerSummary>> {
561        use bollard::container::ListContainersOptions;
562        use std::collections::HashMap;
563        let mut filters: HashMap<String, Vec<String>> = HashMap::new();
564        filters.insert("label".to_string(), vec![format!("{k}={v}")]);
565        let out = self
566            .inner
567            .list_containers(Some(ListContainersOptions {
568                all: true,
569                filters,
570                ..Default::default()
571            }))
572            .await
573            .map_err(|e| HmError::Docker(format!("list_containers: {e}")))?;
574        Ok(out)
575    }
576}
577
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod smoke {
    use super::*;

    /// Connect to the local daemon; the ignored tests below need a live one.
    fn local_client() -> DockerClient {
        DockerClient::connect().expect("connect to local Docker daemon")
    }

    #[tokio::test]
    #[ignore = "requires a running Docker daemon; opt in with `cargo test -- --ignored`"]
    async fn docker_ping() {
        local_client().ping().await.expect("ping Docker daemon");
    }

    #[tokio::test]
    #[ignore = "requires a running Docker daemon; opt in with `cargo test -- --ignored`"]
    async fn list_images_by_reference_returns_empty_for_nonexistent() {
        let found = local_client()
            .list_images_by_reference("harmont-test-nonexistent")
            .await
            .expect("list images by reference");
        assert!(found.is_empty(), "unexpected tags: {found:?}");
    }

    #[test]
    fn build_host_config_with_binds_and_no_caps() {
        let bind = "/host/path:/container/path".to_string();
        let host = build_host_config(std::slice::from_ref(&bind), &[]);
        assert_eq!(host.binds, Some(vec![bind]));
        assert_eq!(host.cap_add, None);
    }

    #[test]
    fn build_host_config_empty_binds_is_none() {
        let host = build_host_config(&[], &[]);
        assert_eq!(host.binds, None);
        assert_eq!(host.cap_add, None);
    }
}