1use std::collections::HashMap;
8use std::path::Path;
9
10use anyhow::{Context, Result};
11use async_trait::async_trait;
12use bollard::Docker;
13use bollard::container::{
14 Config, CreateContainerOptions, RemoveContainerOptions, StartContainerOptions,
15 StopContainerOptions, UploadToContainerOptions,
16};
17use bollard::exec::{CreateExecOptions, StartExecResults};
18use bollard::image::{
19 CommitContainerOptions, CreateImageOptions, ListImagesOptions, RemoveImageOptions,
20};
21use futures::StreamExt;
22use tracing::instrument;
23
24use crate::backend::{Vm, VmBackend};
25use crate::types::{OutputSink, SnapshotId, SnapshotLabel, VmConfig};
26
27#[derive(Debug)]
31pub struct DockerBackend {
32 client: Docker,
33}
34
35impl DockerBackend {
36 pub fn connect() -> Result<Self> {
42 let client =
43 Docker::connect_with_local_defaults().context("failed to connect to Docker daemon")?;
44 Ok(Self { client })
45 }
46
47 #[instrument(skip(self))]
48 async fn ensure_image(&self, image: &str) -> Result<()> {
49 if self.image_exists_by_tag(image).await? {
50 return Ok(());
51 }
52 let mut stream = self.client.create_image(
53 Some(CreateImageOptions {
54 from_image: image,
55 ..Default::default()
56 }),
57 None,
58 None,
59 );
60 while let Some(item) = stream.next().await {
61 item.with_context(|| format!("pulling image '{image}'"))?;
62 }
63 Ok(())
64 }
65
66 async fn image_exists_by_tag(&self, tag: &str) -> Result<bool> {
68 let mut filters = HashMap::new();
69 filters.insert("reference".to_string(), vec![tag.to_string()]);
70 let images = self
71 .client
72 .list_images(Some(ListImagesOptions {
73 filters,
74 ..Default::default()
75 }))
76 .await
77 .with_context(|| format!("listing images for tag '{tag}'"))?;
78 Ok(!images.is_empty())
79 }
80
81 #[instrument(skip(self))]
82 async fn start_container(&self, image: &str) -> Result<String> {
83 let cfg = Config {
84 image: Some(image.to_string()),
85 cmd: Some(vec!["sh".into(), "-c".into(), "sleep infinity".into()]),
86 ..Default::default()
87 };
88 let create = self
89 .client
90 .create_container(None::<CreateContainerOptions<String>>, cfg)
91 .await
92 .context("create container")?;
93 self.client
94 .start_container(&create.id, None::<StartContainerOptions<String>>)
95 .await
96 .context("start container")?;
97 Ok(create.id)
98 }
99}
100
101#[async_trait]
102impl VmBackend for DockerBackend {
103 #[instrument(skip(self, _config))]
104 async fn create(&self, image: &str, _config: &VmConfig) -> Result<Box<dyn Vm>> {
105 self.ensure_image(image).await?;
106 let container_id = self.start_container(image).await?;
107 Ok(Box::new(DockerVm {
108 client: self.client.clone(),
109 container_id: Some(container_id),
110 }))
111 }
112
113 #[instrument(skip(self, _config))]
114 async fn restore(&self, snapshot: &SnapshotId, _config: &VmConfig) -> Result<Box<dyn Vm>> {
115 let container_id = self.start_container(snapshot.as_ref()).await?;
116 Ok(Box::new(DockerVm {
117 client: self.client.clone(),
118 container_id: Some(container_id),
119 }))
120 }
121
122 #[instrument(skip(self))]
123 async fn snapshot_exists(&self, snapshot: &SnapshotId) -> Result<bool> {
124 self.image_exists_by_tag(snapshot.as_ref()).await
125 }
126
127 #[instrument(skip(self))]
128 async fn remove_snapshot(&self, snapshot: &SnapshotId) -> Result<()> {
129 self.client
130 .remove_image(
131 snapshot.as_ref(),
132 Some(RemoveImageOptions {
133 force: true,
134 noprune: false,
135 }),
136 None,
137 )
138 .await
139 .with_context(|| format!("removing image '{snapshot}'"))?;
140 Ok(())
141 }
142}
143
144#[derive(derive_more::Debug)]
146struct DockerVm {
147 #[debug(skip)]
148 client: Docker,
149 container_id: Option<String>,
150}
151
152impl Drop for DockerVm {
153 fn drop(&mut self) {
154 if let Some(id) = self.container_id.take() {
155 let client = self.client.clone();
156 tokio::spawn(async move {
157 let opts = StopContainerOptions { t: 0 };
158 let _ = client.stop_container(&id, Some(opts)).await;
159 let rm = RemoveContainerOptions {
160 force: true,
161 v: true,
162 ..Default::default()
163 };
164 let _ = client.remove_container(&id, Some(rm)).await;
165 tracing::debug!(container = %id, "dropped container cleaned up");
166 });
167 }
168 }
169}
170
171fn tar_directory(host_path: &Path) -> Result<Vec<u8>> {
176 let mut archive = tar::Builder::new(Vec::new());
177 archive
178 .append_dir_all(".", host_path)
179 .with_context(|| format!("archiving '{}'", host_path.display()))?;
180 archive.finish().context("finalizing tar archive")?;
181 archive.into_inner().context("extracting tar bytes")
182}
183
184#[async_trait]
185impl Vm for DockerVm {
186 #[instrument(skip(self), fields(host = %host_path.display()))]
187 async fn inject(&self, host_path: &Path, guest_path: &str) -> Result<()> {
188 let cid = self
190 .container_id
191 .as_deref()
192 .context("container already destroyed")?;
193 let mkdir = self
194 .client
195 .create_exec(
196 cid,
197 CreateExecOptions {
198 cmd: Some(vec!["mkdir", "-p", guest_path]),
199 attach_stdout: Some(true),
200 attach_stderr: Some(true),
201 ..Default::default()
202 },
203 )
204 .await
205 .context("create mkdir exec")?;
206 if let StartExecResults::Attached { mut output, .. } = self
207 .client
208 .start_exec(&mkdir.id, None)
209 .await
210 .context("start mkdir exec")?
211 {
212 while output.next().await.is_some() {}
213 }
214
215 let tar_bytes = tar_directory(host_path)?;
216 let options = UploadToContainerOptions {
217 path: guest_path,
218 ..Default::default()
219 };
220 self.client
221 .upload_to_container(cid, Some(options), tar_bytes.into())
222 .await
223 .with_context(|| {
224 format!(
225 "uploading '{}' to container '{}:{guest_path}'",
226 host_path.display(),
227 cid.get(..12).unwrap_or(cid),
228 )
229 })?;
230 Ok(())
231 }
232
233 #[instrument(skip(self, env, sink))]
234 async fn exec(
235 &self,
236 cmd: &str,
237 env: &[(String, String)],
238 working_dir: &str,
239 sink: &dyn OutputSink,
240 ) -> Result<i32> {
241 let cid = self
242 .container_id
243 .as_deref()
244 .context("container already destroyed")?;
245 let env_strings: Vec<String> = env.iter().map(|(k, v)| format!("{k}={v}")).collect();
246 let exec = self
247 .client
248 .create_exec(
249 cid,
250 CreateExecOptions {
251 cmd: Some(vec!["sh", "-c", cmd]),
252 env: Some(env_strings.iter().map(String::as_str).collect()),
253 working_dir: Some(working_dir),
254 attach_stdout: Some(true),
255 attach_stderr: Some(true),
256 ..Default::default()
257 },
258 )
259 .await
260 .context("create exec")?;
261
262 if let StartExecResults::Attached { mut output, .. } = self
263 .client
264 .start_exec(&exec.id, None)
265 .await
266 .context("start exec")?
267 {
268 use bollard::container::LogOutput;
269
270 while let Some(item) = output.next().await {
271 let chunk = item.context("exec stream")?;
272 match chunk {
273 LogOutput::StdOut { message } => {
274 let text = String::from_utf8_lossy(&message);
275 for line in text.lines() {
276 sink.on_stdout(line);
277 }
278 }
279 LogOutput::StdErr { message } => {
280 let text = String::from_utf8_lossy(&message);
281 for line in text.lines() {
282 sink.on_stderr(line);
283 }
284 }
285 LogOutput::StdIn { .. } | LogOutput::Console { .. } => {}
286 }
287 }
288 }
289
290 let mut inspect_result = self.client.inspect_exec(&exec.id).await;
293 for _ in 0..3 {
294 if inspect_result.is_ok() {
295 break;
296 }
297 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
298 inspect_result = self.client.inspect_exec(&exec.id).await;
299 }
300 let inspect = inspect_result.context("inspect exec")?;
301
302 #[allow(
303 clippy::cast_possible_truncation,
304 reason = "docker exit codes fit in i32"
305 )]
306 let exit_code = inspect.exit_code.unwrap_or(0) as i32;
307 Ok(exit_code)
308 }
309
310 #[instrument(skip(self))]
311 async fn snapshot(&mut self, label: &SnapshotLabel) -> Result<SnapshotId> {
312 let cid = self
313 .container_id
314 .as_deref()
315 .context("container already destroyed")?;
316 let (repo, tag) = match label {
322 SnapshotLabel::Ephemeral => ("ephemeral", cid),
323 SnapshotLabel::Cached(key) => match key.split_once(':') {
324 Some((r, v)) => (r, v),
325 None => (key.as_str(), cid),
326 },
327 };
328 let opts = CommitContainerOptions {
329 container: cid,
330 repo,
331 tag,
332 ..Default::default()
333 };
334 #[allow(
337 clippy::duration_suboptimal_units,
338 reason = "from_mins is nightly-only"
339 )]
340 let commit_client = self
341 .client
342 .clone()
343 .with_timeout(std::time::Duration::from_secs(600));
344 commit_client
345 .commit_container(opts, Config::<String>::default())
346 .await
347 .context("commit container")?;
348 let full_tag = format!("{repo}:{tag}");
349 Ok(SnapshotId::new(full_tag))
350 }
351
352 #[instrument(skip(self))]
353 async fn destroy(&mut self) -> Result<()> {
354 let Some(id) = self.container_id.take() else {
355 return Ok(());
356 };
357 let _ = self
358 .client
359 .stop_container(&id, Some(StopContainerOptions { t: 0 }))
360 .await;
361 self.client
362 .remove_container(
363 &id,
364 Some(RemoveContainerOptions {
365 force: true,
366 v: true,
367 ..Default::default()
368 }),
369 )
370 .await
371 .with_context(|| format!("removing container '{id}'"))?;
372 Ok(())
373 }
374}