Skip to main content

harmont_cli/orchestrator/
docker_client.rs

1//! Thin wrapper around bollard for the local executor.
2//!
3//! Operations: pull images, start containers (long-lived sleep), exec
4//! commands streaming stdout/stderr, commit container to image, look
5//! up images by tag, stop+remove containers.
6
7use std::collections::HashMap;
8use std::sync::Arc;
9
10use anyhow::{Context, Result};
11use bollard::Docker;
12use bollard::container::{
13    Config, CreateContainerOptions, RemoveContainerOptions, StartContainerOptions,
14    StopContainerOptions,
15};
16use bollard::exec::{CreateExecOptions, StartExecResults};
17use bollard::image::{
18    CommitContainerOptions, CreateImageOptions, ListImagesOptions, RemoveImageOptions,
19};
20use futures_util::StreamExt;
21use tokio::io::AsyncWrite;
22
23use crate::error::HmError;
24
25#[derive(Debug, Clone)]
26pub struct DockerClient {
27    inner: Arc<Docker>,
28}
29
30impl DockerClient {
31    /// Open a Docker connection using the platform's default socket /
32    /// pipe. The handle is cheap to clone (refcounted internally).
33    ///
34    /// # Errors
35    ///
36    /// Returns [`HmError::Docker`] when bollard cannot resolve a
37    /// local Docker endpoint (no socket on `DOCKER_HOST`, no Windows
38    /// pipe, etc.).
39    pub fn connect() -> Result<Self> {
40        let d = Docker::connect_with_local_defaults()
41            .map_err(|e| HmError::Docker(format!("connect: {e}")))?;
42        Ok(Self { inner: Arc::new(d) })
43    }
44
45    /// Round-trip the daemon to confirm reachability.
46    ///
47    /// # Errors
48    ///
49    /// Returns [`HmError::Docker`] if the ping request fails (daemon
50    /// stopped, socket revoked, version negotiation failure).
51    pub async fn ping(&self) -> Result<()> {
52        self.inner
53            .ping()
54            .await
55            .map_err(|e| HmError::Docker(format!("ping failed: {e}")))?;
56        Ok(())
57    }
58
59    /// True if `tag` resolves to a locally-cached image.
60    ///
61    /// # Errors
62    ///
63    /// Returns [`HmError::Docker`] if the `list_images` API call
64    /// fails (daemon unreachable, malformed filter).
65    pub async fn image_exists(&self, tag: &str) -> Result<bool> {
66        let mut filters = HashMap::new();
67        filters.insert("reference".to_string(), vec![tag.to_string()]);
68        let images = self
69            .inner
70            .list_images(Some(ListImagesOptions {
71                filters,
72                ..Default::default()
73            }))
74            .await
75            .map_err(|e| HmError::Docker(format!("list_images: {e}")))?;
76        Ok(!images.is_empty())
77    }
78
79    /// Pull `tag` from its registry, surfacing the daemon's progress
80    /// stream as Docker errors.
81    ///
82    /// # Errors
83    ///
84    /// Returns [`HmError::Docker`] if any chunk of the pull stream
85    /// reports an error (registry not reachable, image not found,
86    /// auth required).
87    pub async fn pull_image(&self, tag: &str) -> Result<()> {
88        let mut s = self.inner.create_image(
89            Some(CreateImageOptions {
90                from_image: tag,
91                ..Default::default()
92            }),
93            None,
94            None,
95        );
96        while let Some(item) = s.next().await {
97            item.map_err(|e| HmError::Docker(format!("pull {tag}: {e}")))?;
98        }
99        Ok(())
100    }
101
102    /// Start a long-lived container that runs `sh -c 'sleep infinity'` so
103    /// later `exec`s land in a stable shell. Returns the container ID.
104    ///
105    /// # Errors
106    ///
107    /// Returns [`HmError::Docker`] if the container cannot be created
108    /// (image not pulled, name conflict, OCI runtime failure) or if
109    /// `start_container` rejects the create.
110    pub async fn start_long_lived(
111        &self,
112        image: &str,
113        env: &[String],
114        workdir: &str,
115        name: &str,
116    ) -> Result<String> {
117        let cfg = Config {
118            image: Some(image.to_string()),
119            cmd: Some(vec!["sh".into(), "-c".into(), "sleep infinity".into()]),
120            env: Some(env.to_vec()),
121            working_dir: Some(workdir.to_string()),
122            ..Default::default()
123        };
124        let create = self
125            .inner
126            .create_container(
127                Some(CreateContainerOptions {
128                    name,
129                    ..Default::default()
130                }),
131                cfg,
132            )
133            .await
134            .map_err(|e| HmError::Docker(format!("create_container: {e}")))?;
135        self.inner
136            .start_container(&create.id, None::<StartContainerOptions<String>>)
137            .await
138            .map_err(|e| HmError::Docker(format!("start_container: {e}")))?;
139        Ok(create.id)
140    }
141
142    /// Exec a command inside a running container and stream stdout+stderr
143    /// to `out`. Returns the command's exit code.
144    ///
145    /// # Errors
146    ///
147    /// Returns [`HmError::Docker`] if `create_exec` / `start_exec` /
148    /// `inspect_exec` fail, or surfaces an `anyhow` error if writing a
149    /// log frame to `out` fails.
150    pub async fn exec_streaming(
151        &self,
152        container_id: &str,
153        cmd: &[String],
154        env: &[String],
155        workdir: &str,
156        out: &mut (impl AsyncWrite + Send + Unpin),
157    ) -> Result<i64> {
158        use bollard::container::LogOutput;
159        use tokio::io::AsyncWriteExt;
160
161        let exec = self
162            .inner
163            .create_exec(
164                container_id,
165                CreateExecOptions {
166                    cmd: Some(cmd.iter().map(std::string::String::as_str).collect()),
167                    env: Some(env.iter().map(std::string::String::as_str).collect()),
168                    working_dir: Some(workdir),
169                    attach_stdout: Some(true),
170                    attach_stderr: Some(true),
171                    ..Default::default()
172                },
173            )
174            .await
175            .map_err(|e| HmError::Docker(format!("create_exec: {e}")))?;
176        match self
177            .inner
178            .start_exec(&exec.id, None)
179            .await
180            .map_err(|e| HmError::Docker(format!("start_exec: {e}")))?
181        {
182            StartExecResults::Attached { mut output, .. } => {
183                while let Some(item) = output.next().await {
184                    let chunk = item.map_err(|e| HmError::Docker(format!("exec stream: {e}")))?;
185                    let (LogOutput::StdOut { message: bytes }
186                    | LogOutput::StdErr { message: bytes }
187                    | LogOutput::Console { message: bytes }) = chunk
188                    else {
189                        // StdIn frames are echoed by some daemons; ignore them.
190                        continue;
191                    };
192                    out.write_all(&bytes).await.context("write exec output")?;
193                }
194            }
195            StartExecResults::Detached => {}
196        }
197        let inspect = self
198            .inner
199            .inspect_exec(&exec.id)
200            .await
201            .map_err(|e| HmError::Docker(format!("inspect_exec: {e}")))?;
202        Ok(inspect.exit_code.unwrap_or(0))
203    }
204
205    /// Like [`Self::exec_streaming`], but also pipes `stdin_bytes` into the
206    /// exec'd process's stdin (closing it after the write so the process
207    /// sees EOF). Used to stream a tar archive into `tar -xzf -` when
208    /// hydrating `/workspace` in a fresh chain-root container.
209    ///
210    /// # Errors
211    ///
212    /// Returns [`HmError::Docker`] if any of the exec lifecycle calls
213    /// fail, or surfaces an `anyhow` error if writing stdin or output
214    /// frames fails.
215    pub async fn exec_streaming_stdin(
216        &self,
217        container_id: &str,
218        cmd: &[String],
219        env: &[String],
220        workdir: &str,
221        stdin_bytes: &[u8],
222        out: &mut (impl AsyncWrite + Send + Unpin),
223    ) -> Result<i64> {
224        use bollard::container::LogOutput;
225        use tokio::io::AsyncWriteExt;
226
227        let exec = self
228            .inner
229            .create_exec(
230                container_id,
231                CreateExecOptions {
232                    cmd: Some(cmd.iter().map(std::string::String::as_str).collect()),
233                    env: Some(env.iter().map(std::string::String::as_str).collect()),
234                    working_dir: Some(workdir),
235                    attach_stdin: Some(true),
236                    attach_stdout: Some(true),
237                    attach_stderr: Some(true),
238                    ..Default::default()
239                },
240            )
241            .await
242            .map_err(|e| HmError::Docker(format!("create_exec: {e}")))?;
243        match self
244            .inner
245            .start_exec(&exec.id, None)
246            .await
247            .map_err(|e| HmError::Docker(format!("start_exec: {e}")))?
248        {
249            StartExecResults::Attached {
250                mut output,
251                mut input,
252            } => {
253                input
254                    .write_all(stdin_bytes)
255                    .await
256                    .context("write exec stdin")?;
257                input.shutdown().await.context("close exec stdin")?;
258                // Drop the writer to fully release the half-duplex.
259                drop(input);
260                while let Some(item) = output.next().await {
261                    let chunk = item.map_err(|e| HmError::Docker(format!("exec stream: {e}")))?;
262                    let (LogOutput::StdOut { message: bytes }
263                    | LogOutput::StdErr { message: bytes }
264                    | LogOutput::Console { message: bytes }) = chunk
265                    else {
266                        // StdIn frames are echoed by some daemons; ignore them.
267                        continue;
268                    };
269                    out.write_all(&bytes).await.context("write exec output")?;
270                }
271            }
272            StartExecResults::Detached => {}
273        }
274        let inspect = self
275            .inner
276            .inspect_exec(&exec.id)
277            .await
278            .map_err(|e| HmError::Docker(format!("inspect_exec: {e}")))?;
279        Ok(inspect.exit_code.unwrap_or(0))
280    }
281
282    /// Commit a running container to an image tag. Returns the tag, which
283    /// is a valid image reference once the daemon's commit succeeds.
284    ///
285    /// We don't return the daemon's image ID: bollard 0.18's `Commit`
286    /// stub deserialises the response as `{"id": ...}`, but the Docker
287    /// daemon returns `{"Id": ...}` (capital I). The image is committed
288    /// correctly either way; the tag is the canonical reference and is
289    /// what every caller actually uses.
290    ///
291    /// # Errors
292    ///
293    /// Returns [`HmError::Docker`] if `commit_container` fails (paused
294    /// container, daemon I/O failure).
295    ///
296    /// # Panics
297    ///
298    /// Panics if `tag.splitn(2, ':')` produces neither one nor two parts.
299    /// `splitn` is total for non-empty input, so this branch is only
300    /// reachable for the empty string, which the caller never passes.
301    pub async fn commit_container(&self, container_id: &str, tag: &str) -> Result<String> {
302        let parts: Vec<&str> = tag.splitn(2, ':').collect();
303        let (repo, ver) = match parts.as_slice() {
304            [r, v] => (*r, *v),
305            [r] => (*r, "latest"),
306            _ => unreachable!("splitn(2) yields one or two parts for non-empty input"),
307        };
308        let opts = CommitContainerOptions {
309            container: container_id,
310            repo,
311            tag: ver,
312            ..Default::default()
313        };
314        self.inner
315            .commit_container(opts, Config::<String>::default())
316            .await
317            .map_err(|e| HmError::Docker(format!("commit_container: {e}")))?;
318        Ok(tag.to_string())
319    }
320
321    /// Force-remove an image by tag. Used for end-of-run pruning of
322    /// ephemeral parent-snapshot tags committed during this process's
323    /// run. Best-effort callers should swallow the error themselves;
324    /// failures here are non-fatal.
325    ///
326    /// # Errors
327    ///
328    /// Returns [`HmError::Docker`] if `remove_image` fails (image
329    /// missing, still referenced by a running container, daemon I/O
330    /// failure).
331    pub async fn remove_image(&self, image: &str) -> Result<()> {
332        self.inner
333            .remove_image(
334                image,
335                Some(RemoveImageOptions {
336                    force: true,
337                    noprune: false,
338                }),
339                None,
340            )
341            .await
342            .map_err(|e| HmError::Docker(format!("remove_image '{image}': {e}")))?;
343        Ok(())
344    }
345
346    pub async fn stop_remove(&self, container_id: &str) {
347        let _ = self
348            .inner
349            .stop_container(container_id, Some(StopContainerOptions { t: 0 }))
350            .await;
351        let _ = self
352            .inner
353            .remove_container(
354                container_id,
355                Some(RemoveContainerOptions {
356                    force: true,
357                    v: true,
358                    ..Default::default()
359                }),
360            )
361            .await;
362    }
363}
364
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod smoke {
    use super::*;

    /// Connects with local defaults and pings the daemon; ignored by
    /// default because it needs a live Docker daemon.
    #[tokio::test]
    #[ignore = "requires a running Docker daemon; opt in with `cargo test -- --ignored`"]
    async fn docker_ping() {
        let client = DockerClient::connect().unwrap();
        let reply = client.ping().await;
        reply.unwrap();
    }
}