1use std::collections::HashMap;
2use std::io::{BufRead, BufReader, Read};
3use std::process::{Command, Stdio};
4use std::thread;
5use std::time::Instant;
6
7use bv_core::error::{BvError, Result};
8
9use crate::runtime::{
10 ContainerRuntime, GpuProfile, ImageDigest, ImageMetadata, Mount, OciRef, ProgressReporter,
11 RunOutcome, RunSpec, RuntimeInfo,
12};
13
14#[derive(Clone)]
15pub struct DockerRuntime;
16
17impl ContainerRuntime for DockerRuntime {
18 fn name(&self) -> &str {
19 "docker"
20 }
21
22 fn health_check(&self) -> Result<RuntimeInfo> {
23 let output = Command::new("docker")
24 .arg("version")
25 .output()
26 .map_err(|e| BvError::RuntimeNotAvailable {
27 runtime: "docker".into(),
28 reason: format!("could not execute `docker`: {e}"),
29 })?;
30
31 if !output.status.success() {
32 let stderr = String::from_utf8_lossy(&output.stderr);
33 return Err(BvError::RuntimeNotAvailable {
34 runtime: "docker".into(),
35 reason: format!("docker daemon not running or not accessible: {stderr}"),
36 });
37 }
38
39 let stdout = String::from_utf8_lossy(&output.stdout);
40
41 let versions: Vec<&str> = stdout
43 .lines()
44 .filter_map(|l| l.trim().strip_prefix("Version:").map(|v| v.trim()))
45 .collect();
46
47 let client_version = versions.first().copied().unwrap_or("unknown").to_string();
48 let server_version = versions.get(1).copied().map(str::to_string);
49
50 let mut extra = HashMap::new();
51 if let Some(sv) = server_version {
52 extra.insert("server_version".into(), sv);
53 }
54
55 Ok(RuntimeInfo {
56 name: "docker".into(),
57 version: client_version,
58 extra,
59 })
60 }
61
62 fn pull(&self, image: &OciRef, progress: &dyn ProgressReporter) -> Result<ImageDigest> {
63 let image_arg = image.docker_arg();
64 progress.update(&image_arg, None, None);
65
66 let mut child = Command::new("docker")
67 .args(["pull", &image_arg])
68 .stdout(Stdio::piped())
69 .stderr(Stdio::piped())
70 .spawn()
71 .map_err(|e| BvError::RuntimeNotAvailable {
72 runtime: "docker".into(),
73 reason: format!("could not execute `docker`: {e}"),
74 })?;
75
76 let stdout = child.stdout.take().expect("stdout was piped");
77 let stderr = child.stderr.take().expect("stderr was piped");
78
79 let stderr_thread = thread::spawn(move || {
81 let mut s = String::new();
82 BufReader::new(stderr).read_to_string(&mut s).ok();
83 s
84 });
85
86 let mut pull_digest: Option<String> = None;
89 let mut total_layers: u64 = 0;
90 let mut done_layers: u64 = 0;
91
92 for line in BufReader::new(stdout).lines() {
93 let line = line.map_err(BvError::Io)?;
94 let trimmed = line.trim();
95
96 if let Some(d) = trimmed.strip_prefix("Digest: ") {
97 pull_digest = Some(d.to_string());
98 continue;
99 }
100
101 if trimmed.ends_with(": Pulling fs layer") {
104 total_layers += 1;
105 } else if trimmed.ends_with(": Already exists") {
106 total_layers += 1;
107 done_layers += 1;
108 } else if trimmed.ends_with(": Pull complete") {
109 done_layers += 1;
110 }
111
112 let (cur, tot) = if total_layers > 0 {
113 (Some(done_layers), Some(total_layers))
114 } else {
115 (None, None)
116 };
117 progress.update(&image_arg, cur, tot);
118 }
119
120 let status = child.wait()?;
121 let stderr_output = stderr_thread.join().unwrap_or_default();
122
123 if !status.success() {
124 return Err(classify_pull_error(&stderr_output, &image_arg));
125 }
126
127 progress.finish("");
128
129 let digest = match pull_digest {
130 Some(d) => d,
131 None => self.repo_digest(&image_arg)?,
132 };
133
134 Ok(ImageDigest(digest))
135 }
136
137 fn run(&self, spec: &RunSpec) -> Result<RunOutcome> {
138 let start = Instant::now();
139
140 let mut cmd = Command::new("docker");
141 cmd.arg("run").arg("--rm");
142
143 if let Some((uid, gid)) = current_uid_gid() {
147 cmd.args(["--user", &format!("{uid}:{gid}")]);
148 }
149
150 if let Some(wd) = &spec.working_dir {
151 cmd.args(["-w", &wd.to_string_lossy()]);
152 }
153
154 for arg in self.mount_args(&spec.mounts) {
155 cmd.arg(arg);
156 }
157
158 for (k, v) in &spec.env {
159 cmd.arg("-e").arg(format!("{k}={v}"));
160 }
161
162 for arg in self.gpu_args(&spec.gpu) {
163 cmd.arg(arg);
164 }
165
166 if let Ok(val) = std::env::var("NVIDIA_VISIBLE_DEVICES") {
168 cmd.arg("-e").arg(format!("NVIDIA_VISIBLE_DEVICES={val}"));
169 }
170
171 cmd.arg(spec.image.docker_arg());
172
173 for arg in &spec.command {
174 cmd.arg(arg);
175 }
176
177 if spec.capture_output {
178 cmd.stdin(Stdio::null())
179 .stdout(Stdio::piped())
180 .stderr(Stdio::piped());
181 let output = cmd
182 .output()
183 .map_err(|e| BvError::RuntimeError(format!("docker run failed to launch: {e}")))?;
184 return Ok(RunOutcome {
185 exit_code: output.status.code().unwrap_or(-1),
186 duration: start.elapsed(),
187 stdout: output.stdout,
188 stderr: output.stderr,
189 });
190 }
191
192 cmd.stdin(Stdio::inherit())
193 .stdout(Stdio::inherit())
194 .stderr(Stdio::inherit());
195
196 let status = cmd
197 .status()
198 .map_err(|e| BvError::RuntimeError(format!("docker run failed to launch: {e}")))?;
199
200 Ok(RunOutcome {
201 exit_code: status.code().unwrap_or(-1),
202 duration: start.elapsed(),
203 stdout: Vec::new(),
204 stderr: Vec::new(),
205 })
206 }
207
208 fn inspect(&self, digest: &ImageDigest) -> Result<ImageMetadata> {
209 let output = Command::new("docker")
210 .args(["image", "inspect", "--format", "{{.Size}}", &digest.0])
211 .output()
212 .map_err(|e| BvError::RuntimeError(e.to_string()))?;
213
214 if !output.status.success() {
215 return Err(BvError::RuntimeError(format!(
216 "docker image inspect failed for '{}'",
217 digest.0
218 )));
219 }
220
221 let size_bytes = String::from_utf8_lossy(&output.stdout)
222 .trim()
223 .parse::<u64>()
224 .ok();
225
226 Ok(ImageMetadata {
227 digest: digest.clone(),
228 size_bytes,
229 labels: HashMap::new(),
230 })
231 }
232
233 fn is_locally_available(&self, image_ref: &str, digest: &str) -> bool {
234 let pinned = format!("{image_ref}@{digest}");
235 Command::new("docker")
236 .args(["image", "inspect", "--format", "{{.Id}}", &pinned])
237 .stdout(Stdio::null())
238 .stderr(Stdio::null())
239 .status()
240 .map(|s| s.success())
241 .unwrap_or(false)
242 }
243
244 fn gpu_args(&self, profile: &GpuProfile) -> Vec<String> {
245 match &profile.spec {
246 Some(spec) if spec.required => vec!["--gpus".into(), "all".into()],
247 _ => vec![],
248 }
249 }
250
251 fn mount_args(&self, mounts: &[Mount]) -> Vec<String> {
252 mounts
253 .iter()
254 .flat_map(|m| {
255 let mode = if m.read_only { "ro" } else { "rw" };
256 let spec = format!(
257 "{}:{}:{mode}",
258 m.host_path.display(),
259 m.container_path.display()
260 );
261 ["-v".to_string(), spec]
262 })
263 .collect()
264 }
265}
266
267impl DockerRuntime {
268 pub fn pull_verified(
283 &self,
284 image: &OciRef,
285 expected_digest: &str,
286 progress: &dyn ProgressReporter,
287 ) -> Result<ImageDigest> {
288 let got = self.pull(image, progress)?;
289 verify_digest(&image.to_string(), expected_digest, &got.0)?;
290 Ok(got)
291 }
292
293 pub fn pull_verified_v2(
303 &self,
304 image: &OciRef,
305 expected_image_digest: &str,
306 layers: &[bv_core::lockfile::LayerDescriptor],
307 progress: &dyn ProgressReporter,
308 ) -> Result<ImageDigest> {
309 let got = self.pull(image, progress)?;
310 verify_digest(&image.to_string(), expected_image_digest, &got.0)?;
311
312 if !layers.is_empty() {
313 self.verify_layer_digests(image, layers)?;
314 }
315
316 Ok(got)
317 }
318
319 pub fn verify_layer_digests(
326 &self,
327 image: &OciRef,
328 expected_layers: &[bv_core::lockfile::LayerDescriptor],
329 ) -> Result<()> {
330 let image_arg = image.docker_arg();
331
332 let output = Command::new("docker")
334 .args([
335 "image",
336 "inspect",
337 "--format",
338 "{{range .RootFS.Layers}}{{.}}\n{{end}}",
339 &image_arg,
340 ])
341 .output()
342 .map_err(|e| BvError::RuntimeError(format!("docker image inspect failed: {e}")))?;
343
344 if !output.status.success() {
345 return Ok(());
347 }
348
349 let stdout = String::from_utf8_lossy(&output.stdout);
350 let actual_layers: Vec<&str> = stdout.lines().filter(|l| !l.is_empty()).collect();
351
352 if actual_layers.len() != expected_layers.len() {
357 return Ok(());
361 }
362
363 for (i, (expected, actual)) in expected_layers.iter().zip(actual_layers.iter()).enumerate()
364 {
365 if expected.digest != *actual {
366 return Err(BvError::RuntimeError(format!(
367 "layer digest mismatch at index {i} for '{image_arg}': \
368 expected {expected_digest} but got {actual} — \
369 possible upstream tampering or mismatched layer ordering",
370 expected_digest = expected.digest,
371 )));
372 }
373 }
374
375 Ok(())
376 }
377
378 fn repo_digest(&self, image_ref: &str) -> Result<String> {
380 let output = Command::new("docker")
381 .args([
382 "image",
383 "inspect",
384 "--format",
385 "{{index .RepoDigests 0}}",
386 image_ref,
387 ])
388 .output()
389 .map_err(|e| BvError::RuntimeError(e.to_string()))?;
390
391 if !output.status.success() {
392 return Err(BvError::RuntimeError(format!(
393 "could not inspect image '{image_ref}' after pull"
394 )));
395 }
396
397 let line = String::from_utf8_lossy(&output.stdout);
398 let line = line.trim();
399
400 if let Some(digest) = line.split('@').nth(1) {
402 Ok(digest.to_string())
403 } else if line.starts_with("sha256:") {
404 Ok(line.to_string())
405 } else {
406 let id_output = Command::new("docker")
408 .args(["image", "inspect", "--format", "{{.Id}}", image_ref])
409 .output()
410 .map_err(|e| BvError::RuntimeError(e.to_string()))?;
411 Ok(String::from_utf8_lossy(&id_output.stdout)
412 .trim()
413 .to_string())
414 }
415 }
416}
417
418#[cfg(unix)]
423fn current_uid_gid() -> Option<(u32, u32)> {
424 unsafe extern "C" {
426 fn getuid() -> u32;
427 fn getgid() -> u32;
428 }
429 Some((unsafe { getuid() }, unsafe { getgid() }))
430}
431
432#[cfg(not(unix))]
433fn current_uid_gid() -> Option<(u32, u32)> {
434 None
435}
436
437fn verify_digest(image_ref: &str, expected: &str, got: &str) -> Result<()> {
440 if expected == got {
441 Ok(())
442 } else {
443 Err(BvError::RuntimeError(format!(
444 "image digest mismatch for '{image_ref}': expected {expected} but registry returned {got}"
445 )))
446 }
447}
448
449fn classify_pull_error(stderr: &str, image_ref: &str) -> BvError {
451 if stderr.contains("Cannot connect to the Docker daemon")
452 || stderr.contains("Is the docker daemon running")
453 {
454 BvError::RuntimeNotAvailable {
455 runtime: "docker".into(),
456 reason: "Docker daemon is not available. Is Docker Desktop running?".into(),
457 }
458 } else if stderr.contains("manifest unknown")
459 || stderr.contains("not found")
460 || stderr.contains("does not exist")
461 {
462 BvError::RuntimeError(format!(
463 "image '{image_ref}' not found in registry (check the tool manifest)"
464 ))
465 } else if stderr.contains("connection refused") || stderr.contains("no such host") {
466 BvError::RuntimeError(format!(
467 "network error while pulling '{image_ref}': {stderr}"
468 ))
469 } else {
470 BvError::RuntimeError(format!("docker pull failed:\n{stderr}"))
471 }
472}
473
474#[cfg(test)]
475mod tests {
476 use super::*;
477
478 #[test]
479 fn verify_digest_matches() {
480 assert!(verify_digest("ncbi/blast", "sha256:abc", "sha256:abc").is_ok());
481 }
482
483 #[test]
484 fn verify_digest_rejects_mismatch() {
485 let err = verify_digest("ncbi/blast", "sha256:abc", "sha256:def").unwrap_err();
486 let msg = err.to_string();
487 assert!(msg.contains("ncbi/blast"));
488 assert!(msg.contains("sha256:abc"));
489 assert!(msg.contains("sha256:def"));
490 assert!(msg.contains("mismatch"));
491 }
492
493 #[cfg(unix)]
494 #[test]
495 fn current_uid_gid_returns_some_on_unix() {
496 let got = current_uid_gid();
497 assert!(got.is_some());
498 }
499
500 fn count_layers(lines: &[&str]) -> (u64, u64) {
503 let mut total: u64 = 0;
504 let mut done: u64 = 0;
505 for line in lines {
506 let t = line.trim();
507 if t.ends_with(": Pulling fs layer") {
508 total += 1;
509 } else if t.ends_with(": Already exists") {
510 total += 1;
511 done += 1;
512 } else if t.ends_with(": Pull complete") {
513 done += 1;
514 }
515 }
516 (done, total)
517 }
518
519 #[test]
520 fn layer_count_fresh_pull() {
521 let lines = [
522 "abc123: Pulling fs layer",
523 "def456: Pulling fs layer",
524 "abc123: Downloading",
525 "abc123: Pull complete",
526 "def456: Pull complete",
527 "Digest: sha256:deadbeef",
528 ];
529 assert_eq!(count_layers(&lines), (2, 2));
530 }
531
532 #[test]
533 fn layer_count_partially_cached() {
534 let lines = [
535 "abc123: Already exists",
536 "def456: Pulling fs layer",
537 "def456: Pull complete",
538 ];
539 assert_eq!(count_layers(&lines), (2, 2));
541 }
542
543 #[test]
544 fn layer_count_in_progress() {
545 let lines = [
546 "abc123: Pulling fs layer",
547 "def456: Pulling fs layer",
548 "ghi789: Pulling fs layer",
549 "abc123: Pull complete",
550 ];
552 assert_eq!(count_layers(&lines), (1, 3));
553 }
554}