Skip to main content

hm_vm/
docker.rs

1//! Docker backend -- container orchestration via bollard.
2//!
//! Each "VM" is a long-lived container running `sleep infinity`;
//! commands are executed via the exec API, and snapshots are Docker
//! image commits.
6
7use std::collections::HashMap;
8use std::path::Path;
9
10use anyhow::{Context, Result};
11use async_trait::async_trait;
12use bollard::Docker;
13use bollard::container::{
14    Config, CreateContainerOptions, RemoveContainerOptions, StartContainerOptions,
15    StopContainerOptions, UploadToContainerOptions,
16};
17use bollard::exec::{CreateExecOptions, StartExecResults};
18use bollard::image::{
19    CommitContainerOptions, CreateImageOptions, ListImagesOptions, RemoveImageOptions,
20};
21use futures::StreamExt;
22use tracing::instrument;
23
24use crate::backend::{Vm, VmBackend};
25use crate::types::{OutputSink, SnapshotId, SnapshotLabel, VmConfig};
26
27/// Docker-based VM backend.
28///
29/// Each VM is a long-lived container; snapshots are committed images.
30#[derive(Debug)]
31pub struct DockerBackend {
32    client: Docker,
33}
34
35impl DockerBackend {
36    /// Connect to the local Docker daemon.
37    ///
38    /// # Errors
39    ///
40    /// Returns an error if bollard cannot resolve a Docker endpoint.
41    pub fn connect() -> Result<Self> {
42        let client =
43            Docker::connect_with_local_defaults().context("failed to connect to Docker daemon")?;
44        Ok(Self { client })
45    }
46
47    #[instrument(skip(self))]
48    async fn ensure_image(&self, image: &str) -> Result<()> {
49        if self.image_exists_by_tag(image).await? {
50            return Ok(());
51        }
52        let mut stream = self.client.create_image(
53            Some(CreateImageOptions {
54                from_image: image,
55                ..Default::default()
56            }),
57            None,
58            None,
59        );
60        while let Some(item) = stream.next().await {
61            item.with_context(|| format!("pulling image '{image}'"))?;
62        }
63        Ok(())
64    }
65
66    /// Check whether an image with the given tag exists locally.
67    async fn image_exists_by_tag(&self, tag: &str) -> Result<bool> {
68        let mut filters = HashMap::new();
69        filters.insert("reference".to_string(), vec![tag.to_string()]);
70        let images = self
71            .client
72            .list_images(Some(ListImagesOptions {
73                filters,
74                ..Default::default()
75            }))
76            .await
77            .with_context(|| format!("listing images for tag '{tag}'"))?;
78        Ok(!images.is_empty())
79    }
80
81    #[instrument(skip(self))]
82    async fn start_container(&self, image: &str) -> Result<String> {
83        let cfg = Config {
84            image: Some(image.to_string()),
85            cmd: Some(vec!["sh".into(), "-c".into(), "sleep infinity".into()]),
86            ..Default::default()
87        };
88        let create = self
89            .client
90            .create_container(None::<CreateContainerOptions<String>>, cfg)
91            .await
92            .context("create container")?;
93        self.client
94            .start_container(&create.id, None::<StartContainerOptions<String>>)
95            .await
96            .context("start container")?;
97        Ok(create.id)
98    }
99}
100
101#[async_trait]
102impl VmBackend for DockerBackend {
103    #[instrument(skip(self, _config))]
104    async fn create(&self, image: &str, _config: &VmConfig) -> Result<Box<dyn Vm>> {
105        self.ensure_image(image).await?;
106        let container_id = self.start_container(image).await?;
107        Ok(Box::new(DockerVm {
108            client: self.client.clone(),
109            container_id: Some(container_id),
110        }))
111    }
112
113    #[instrument(skip(self, _config))]
114    async fn restore(&self, snapshot: &SnapshotId, _config: &VmConfig) -> Result<Box<dyn Vm>> {
115        let container_id = self.start_container(snapshot.as_ref()).await?;
116        Ok(Box::new(DockerVm {
117            client: self.client.clone(),
118            container_id: Some(container_id),
119        }))
120    }
121
122    #[instrument(skip(self))]
123    async fn snapshot_exists(&self, snapshot: &SnapshotId) -> Result<bool> {
124        self.image_exists_by_tag(snapshot.as_ref()).await
125    }
126
127    #[instrument(skip(self))]
128    async fn remove_snapshot(&self, snapshot: &SnapshotId) -> Result<()> {
129        self.client
130            .remove_image(
131                snapshot.as_ref(),
132                Some(RemoveImageOptions {
133                    force: true,
134                    noprune: false,
135                }),
136                None,
137            )
138            .await
139            .with_context(|| format!("removing image '{snapshot}'"))?;
140        Ok(())
141    }
142}
143
144/// Handle to a running Docker container acting as a VM.
145#[derive(derive_more::Debug)]
146struct DockerVm {
147    #[debug(skip)]
148    client: Docker,
149    container_id: Option<String>,
150}
151
152impl Drop for DockerVm {
153    fn drop(&mut self) {
154        if let Some(id) = self.container_id.take() {
155            let client = self.client.clone();
156            tokio::spawn(async move {
157                let opts = StopContainerOptions { t: 0 };
158                let _ = client.stop_container(&id, Some(opts)).await;
159                let rm = RemoveContainerOptions {
160                    force: true,
161                    v: true,
162                    ..Default::default()
163                };
164                let _ = client.remove_container(&id, Some(rm)).await;
165                tracing::debug!(container = %id, "dropped container cleaned up");
166            });
167        }
168    }
169}
170
171/// Build a tar archive from a host directory.
172///
173/// The archive contains all files under `host_path` with paths relative
174/// to `host_path` itself (i.e. the directory contents, not the directory).
175fn tar_directory(host_path: &Path) -> Result<Vec<u8>> {
176    let mut archive = tar::Builder::new(Vec::new());
177    archive
178        .append_dir_all(".", host_path)
179        .with_context(|| format!("archiving '{}'", host_path.display()))?;
180    archive.finish().context("finalizing tar archive")?;
181    archive.into_inner().context("extracting tar bytes")
182}
183
184#[async_trait]
185impl Vm for DockerVm {
186    #[instrument(skip(self), fields(host = %host_path.display()))]
187    async fn inject(&self, host_path: &Path, guest_path: &str) -> Result<()> {
188        // Ensure the destination directory exists inside the container.
189        let cid = self
190            .container_id
191            .as_deref()
192            .context("container already destroyed")?;
193        let mkdir = self
194            .client
195            .create_exec(
196                cid,
197                CreateExecOptions {
198                    cmd: Some(vec!["mkdir", "-p", guest_path]),
199                    attach_stdout: Some(true),
200                    attach_stderr: Some(true),
201                    ..Default::default()
202                },
203            )
204            .await
205            .context("create mkdir exec")?;
206        if let StartExecResults::Attached { mut output, .. } = self
207            .client
208            .start_exec(&mkdir.id, None)
209            .await
210            .context("start mkdir exec")?
211        {
212            while output.next().await.is_some() {}
213        }
214
215        let tar_bytes = tar_directory(host_path)?;
216        let options = UploadToContainerOptions {
217            path: guest_path,
218            ..Default::default()
219        };
220        self.client
221            .upload_to_container(cid, Some(options), tar_bytes.into())
222            .await
223            .with_context(|| {
224                format!(
225                    "uploading '{}' to container '{}:{guest_path}'",
226                    host_path.display(),
227                    cid.get(..12).unwrap_or(cid),
228                )
229            })?;
230        Ok(())
231    }
232
233    #[instrument(skip(self, env, sink))]
234    async fn exec(
235        &self,
236        cmd: &str,
237        env: &[(String, String)],
238        working_dir: &str,
239        sink: &dyn OutputSink,
240    ) -> Result<i32> {
241        let cid = self
242            .container_id
243            .as_deref()
244            .context("container already destroyed")?;
245        let env_strings: Vec<String> = env.iter().map(|(k, v)| format!("{k}={v}")).collect();
246        let exec = self
247            .client
248            .create_exec(
249                cid,
250                CreateExecOptions {
251                    cmd: Some(vec!["sh", "-c", cmd]),
252                    env: Some(env_strings.iter().map(String::as_str).collect()),
253                    working_dir: Some(working_dir),
254                    attach_stdout: Some(true),
255                    attach_stderr: Some(true),
256                    ..Default::default()
257                },
258            )
259            .await
260            .context("create exec")?;
261
262        if let StartExecResults::Attached { mut output, .. } = self
263            .client
264            .start_exec(&exec.id, None)
265            .await
266            .context("start exec")?
267        {
268            use bollard::container::LogOutput;
269
270            while let Some(item) = output.next().await {
271                let chunk = item.context("exec stream")?;
272                match chunk {
273                    LogOutput::StdOut { message } => {
274                        let text = String::from_utf8_lossy(&message);
275                        for line in text.lines() {
276                            sink.on_stdout(line);
277                        }
278                    }
279                    LogOutput::StdErr { message } => {
280                        let text = String::from_utf8_lossy(&message);
281                        for line in text.lines() {
282                            sink.on_stderr(line);
283                        }
284                    }
285                    LogOutput::StdIn { .. } | LogOutput::Console { .. } => {}
286                }
287            }
288        }
289
290        // Retry inspect_exec: the connection pool can go stale after
291        // long-running exec streams on Docker Desktop for macOS.
292        let mut inspect_result = self.client.inspect_exec(&exec.id).await;
293        for _ in 0..3 {
294            if inspect_result.is_ok() {
295                break;
296            }
297            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
298            inspect_result = self.client.inspect_exec(&exec.id).await;
299        }
300        let inspect = inspect_result.context("inspect exec")?;
301
302        #[allow(
303            clippy::cast_possible_truncation,
304            reason = "docker exit codes fit in i32"
305        )]
306        let exit_code = inspect.exit_code.unwrap_or(0) as i32;
307        Ok(exit_code)
308    }
309
310    #[instrument(skip(self))]
311    async fn snapshot(&mut self, label: &SnapshotLabel) -> Result<SnapshotId> {
312        let cid = self
313            .container_id
314            .as_deref()
315            .context("container already destroyed")?;
316        // An ephemeral, uncached snapshot is committed under a unique tag (the
317        // container id) rather than a shared `:latest`: concurrent sibling leaf
318        // steps off the same parent all commit ephemeral snapshots, and racing
319        // to write the same `ephemeral:latest` image fails the loser of the
320        // race in dockerd. A cached snapshot parses its cache key as `repo:tag`.
321        let (repo, tag) = match label {
322            SnapshotLabel::Ephemeral => ("ephemeral", cid),
323            SnapshotLabel::Cached(key) => match key.split_once(':') {
324                Some((r, v)) => (r, v),
325                None => (key.as_str(), cid),
326            },
327        };
328        let opts = CommitContainerOptions {
329            container: cid,
330            repo,
331            tag,
332            ..Default::default()
333        };
334        // docker commit can be slow for containers with large filesystems;
335        // use a dedicated long-timeout client for this operation.
336        #[allow(
337            clippy::duration_suboptimal_units,
338            reason = "from_mins is nightly-only"
339        )]
340        let commit_client = self
341            .client
342            .clone()
343            .with_timeout(std::time::Duration::from_secs(600));
344        commit_client
345            .commit_container(opts, Config::<String>::default())
346            .await
347            .context("commit container")?;
348        let full_tag = format!("{repo}:{tag}");
349        Ok(SnapshotId::new(full_tag))
350    }
351
352    #[instrument(skip(self))]
353    async fn destroy(&mut self) -> Result<()> {
354        let Some(id) = self.container_id.take() else {
355            return Ok(());
356        };
357        let _ = self
358            .client
359            .stop_container(&id, Some(StopContainerOptions { t: 0 }))
360            .await;
361        self.client
362            .remove_container(
363                &id,
364                Some(RemoveContainerOptions {
365                    force: true,
366                    v: true,
367                    ..Default::default()
368                }),
369            )
370            .await
371            .with_context(|| format!("removing container '{id}'"))?;
372        Ok(())
373    }
374}