//! Docker implementation of the `ContainerRuntime` trait
//! (`bv_runtime/docker.rs`); every operation shells out to the `docker` CLI.

1use std::collections::HashMap;
2use std::io::{BufRead, BufReader, Read};
3use std::process::{Command, Stdio};
4use std::thread;
5use std::time::Instant;
6
7use bv_core::error::{BvError, Result};
8
9use crate::runtime::{
10    ContainerRuntime, GpuProfile, ImageDigest, ImageMetadata, Mount, OciRef, ProgressReporter,
11    RunOutcome, RunSpec, RuntimeInfo,
12};
13
/// Container runtime backend that drives the `docker` CLI.
///
/// The type carries no state: every operation spawns a fresh `docker`
/// subprocess, so a value can be created freely (`DockerRuntime` or
/// `DockerRuntime::default()`).
#[derive(Debug, Clone, Default)]
pub struct DockerRuntime;
16
17impl ContainerRuntime for DockerRuntime {
18    fn name(&self) -> &str {
19        "docker"
20    }
21
22    fn health_check(&self) -> Result<RuntimeInfo> {
23        let output = Command::new("docker")
24            .arg("version")
25            .output()
26            .map_err(|e| BvError::RuntimeNotAvailable {
27                runtime: "docker".into(),
28                reason: format!("could not execute `docker`: {e}"),
29            })?;
30
31        if !output.status.success() {
32            let stderr = String::from_utf8_lossy(&output.stderr);
33            return Err(BvError::RuntimeNotAvailable {
34                runtime: "docker".into(),
35                reason: format!("docker daemon not running or not accessible: {stderr}"),
36            });
37        }
38
39        let stdout = String::from_utf8_lossy(&output.stdout);
40
41        // Parse "Version: X.Y.Z" lines; stable across Docker Engine and Desktop.
42        let versions: Vec<&str> = stdout
43            .lines()
44            .filter_map(|l| l.trim().strip_prefix("Version:").map(|v| v.trim()))
45            .collect();
46
47        let client_version = versions.first().copied().unwrap_or("unknown").to_string();
48        let server_version = versions.get(1).copied().map(str::to_string);
49
50        let mut extra = HashMap::new();
51        if let Some(sv) = server_version {
52            extra.insert("server_version".into(), sv);
53        }
54
55        Ok(RuntimeInfo {
56            name: "docker".into(),
57            version: client_version,
58            extra,
59        })
60    }
61
62    fn pull(&self, image: &OciRef, progress: &dyn ProgressReporter) -> Result<ImageDigest> {
63        let image_arg = image.docker_arg();
64        progress.update(&image_arg, None, None);
65
66        let mut child = Command::new("docker")
67            .args(["pull", &image_arg])
68            .stdout(Stdio::piped())
69            .stderr(Stdio::piped())
70            .spawn()
71            .map_err(|e| BvError::RuntimeNotAvailable {
72                runtime: "docker".into(),
73                reason: format!("could not execute `docker`: {e}"),
74            })?;
75
76        let stdout = child.stdout.take().expect("stdout was piped");
77        let stderr = child.stderr.take().expect("stderr was piped");
78
79        // Drain stderr in a background thread to prevent pipe deadlock.
80        let stderr_thread = thread::spawn(move || {
81            let mut s = String::new();
82            BufReader::new(stderr).read_to_string(&mut s).ok();
83            s
84        });
85
86        // Parse docker pull stdout line-by-line for layer progress and the digest.
87        // Layer lifecycle: "Pulling fs layer" | "Already exists" → "Pull complete"
88        let mut pull_digest: Option<String> = None;
89        let mut total_layers: u64 = 0;
90        let mut done_layers: u64 = 0;
91
92        for line in BufReader::new(stdout).lines() {
93            let line = line.map_err(BvError::Io)?;
94            let trimmed = line.trim();
95
96            if let Some(d) = trimmed.strip_prefix("Digest: ") {
97                pull_digest = Some(d.to_string());
98                continue;
99            }
100
101            // Count layers as they're announced; "Already exists" layers count
102            // as both total and immediately done.
103            if trimmed.ends_with(": Pulling fs layer") {
104                total_layers += 1;
105            } else if trimmed.ends_with(": Already exists") {
106                total_layers += 1;
107                done_layers += 1;
108            } else if trimmed.ends_with(": Pull complete") {
109                done_layers += 1;
110            }
111
112            let (cur, tot) = if total_layers > 0 {
113                (Some(done_layers), Some(total_layers))
114            } else {
115                (None, None)
116            };
117            progress.update(&image_arg, cur, tot);
118        }
119
120        let status = child.wait()?;
121        let stderr_output = stderr_thread.join().unwrap_or_default();
122
123        if !status.success() {
124            return Err(classify_pull_error(&stderr_output, &image_arg));
125        }
126
127        progress.finish("");
128
129        let digest = match pull_digest {
130            Some(d) => d,
131            None => self.repo_digest(&image_arg)?,
132        };
133
134        Ok(ImageDigest(digest))
135    }
136
137    fn run(&self, spec: &RunSpec) -> Result<RunOutcome> {
138        let start = Instant::now();
139
140        let mut cmd = Command::new("docker");
141        cmd.arg("run").arg("--rm");
142
143        // Run as the host user so files written into bind mounts are not
144        // owned by root. On non-Unix platforms `current_uid_gid` returns
145        // None and we let docker pick the image default.
146        if let Some((uid, gid)) = current_uid_gid() {
147            cmd.args(["--user", &format!("{uid}:{gid}")]);
148        }
149
150        if let Some(wd) = &spec.working_dir {
151            cmd.args(["-w", &wd.to_string_lossy()]);
152        }
153
154        for arg in self.mount_args(&spec.mounts) {
155            cmd.arg(arg);
156        }
157
158        for (k, v) in &spec.env {
159            cmd.arg("-e").arg(format!("{k}={v}"));
160        }
161
162        for arg in self.gpu_args(&spec.gpu) {
163            cmd.arg(arg);
164        }
165
166        // Pass NVIDIA_VISIBLE_DEVICES through if the user has set it.
167        if let Ok(val) = std::env::var("NVIDIA_VISIBLE_DEVICES") {
168            cmd.arg("-e").arg(format!("NVIDIA_VISIBLE_DEVICES={val}"));
169        }
170
171        cmd.arg(spec.image.docker_arg());
172
173        for arg in &spec.command {
174            cmd.arg(arg);
175        }
176
177        if spec.capture_output {
178            cmd.stdin(Stdio::null())
179                .stdout(Stdio::piped())
180                .stderr(Stdio::piped());
181            let output = cmd
182                .output()
183                .map_err(|e| BvError::RuntimeError(format!("docker run failed to launch: {e}")))?;
184            return Ok(RunOutcome {
185                exit_code: output.status.code().unwrap_or(-1),
186                duration: start.elapsed(),
187                stdout: output.stdout,
188                stderr: output.stderr,
189            });
190        }
191
192        cmd.stdin(Stdio::inherit())
193            .stdout(Stdio::inherit())
194            .stderr(Stdio::inherit());
195
196        let status = cmd
197            .status()
198            .map_err(|e| BvError::RuntimeError(format!("docker run failed to launch: {e}")))?;
199
200        Ok(RunOutcome {
201            exit_code: status.code().unwrap_or(-1),
202            duration: start.elapsed(),
203            stdout: Vec::new(),
204            stderr: Vec::new(),
205        })
206    }
207
208    fn inspect(&self, digest: &ImageDigest) -> Result<ImageMetadata> {
209        let output = Command::new("docker")
210            .args(["image", "inspect", "--format", "{{.Size}}", &digest.0])
211            .output()
212            .map_err(|e| BvError::RuntimeError(e.to_string()))?;
213
214        if !output.status.success() {
215            return Err(BvError::RuntimeError(format!(
216                "docker image inspect failed for '{}'",
217                digest.0
218            )));
219        }
220
221        let size_bytes = String::from_utf8_lossy(&output.stdout)
222            .trim()
223            .parse::<u64>()
224            .ok();
225
226        Ok(ImageMetadata {
227            digest: digest.clone(),
228            size_bytes,
229            labels: HashMap::new(),
230        })
231    }
232
233    fn is_locally_available(&self, image_ref: &str, digest: &str) -> bool {
234        let pinned = format!("{image_ref}@{digest}");
235        Command::new("docker")
236            .args(["image", "inspect", "--format", "{{.Id}}", &pinned])
237            .stdout(Stdio::null())
238            .stderr(Stdio::null())
239            .status()
240            .map(|s| s.success())
241            .unwrap_or(false)
242    }
243
244    fn gpu_args(&self, profile: &GpuProfile) -> Vec<String> {
245        match &profile.spec {
246            Some(spec) if spec.required => vec!["--gpus".into(), "all".into()],
247            _ => vec![],
248        }
249    }
250
251    fn mount_args(&self, mounts: &[Mount]) -> Vec<String> {
252        mounts
253            .iter()
254            .flat_map(|m| {
255                let mode = if m.read_only { "ro" } else { "rw" };
256                let spec = format!(
257                    "{}:{}:{mode}",
258                    m.host_path.display(),
259                    m.container_path.display()
260                );
261                ["-v".to_string(), spec]
262            })
263            .collect()
264    }
265}
266
267impl DockerRuntime {
268    /// Pull `image` and bail if the registry-reported digest does not match
269    /// `expected_digest`.
270    ///
271    /// `pull` itself does not enforce this: it returns whatever digest the
272    /// registry hands back. Most callers (`bv sync`) already cross-check
273    /// against the lockfile, but the `bv run` and `bv conform` paths short
274    /// circuit through `is_locally_available`, which only proves that a
275    /// matching RepoDigests entry exists in the local cache, not that the
276    /// upstream image still resolves to the pinned sha. New code that
277    /// requires a digest pin should call this method instead of `pull`.
278    //
279    // TODO: route `bv run` / `bv conform` through this once the
280    // `ContainerRuntime` trait gains a `pull_verified` method (would
281    // touch bv-cli/runtime_select and the apptainer impl, so deferred).
282    pub fn pull_verified(
283        &self,
284        image: &OciRef,
285        expected_digest: &str,
286        progress: &dyn ProgressReporter,
287    ) -> Result<ImageDigest> {
288        let got = self.pull(image, progress)?;
289        verify_digest(&image.to_string(), expected_digest, &got.0)?;
290        Ok(got)
291    }
292
293    /// Pull `image`, verify the image digest, then verify each per-layer
294    /// digest from `layers` against what Docker reports for the pulled image.
295    ///
296    /// Callers that hold a `LockfileEntry` with `spec_kind = FactoredOci`
297    /// should call this instead of `pull_verified` so that individual
298    /// conda-package layer tampering is caught immediately after pull.
299    ///
300    /// Error messages include the expected and actual digest plus the layer
301    /// position so that upstream tampering is easy to diagnose.
302    pub fn pull_verified_v2(
303        &self,
304        image: &OciRef,
305        expected_image_digest: &str,
306        layers: &[bv_core::lockfile::LayerDescriptor],
307        progress: &dyn ProgressReporter,
308    ) -> Result<ImageDigest> {
309        let got = self.pull(image, progress)?;
310        verify_digest(&image.to_string(), expected_image_digest, &got.0)?;
311
312        if !layers.is_empty() {
313            self.verify_layer_digests(image, layers)?;
314        }
315
316        Ok(got)
317    }
318
319    /// Verify per-layer digests for an already-pulled image.
320    ///
321    /// Uses `docker manifest inspect` (or `docker image inspect`) to obtain the
322    /// layer list and cross-checks each digest against the lockfile entry.
323    /// On mismatch, the error message names the layer index, the expected digest,
324    /// and the actual digest to make upstream tampering easy to diagnose.
325    pub fn verify_layer_digests(
326        &self,
327        image: &OciRef,
328        expected_layers: &[bv_core::lockfile::LayerDescriptor],
329    ) -> Result<()> {
330        let image_arg = image.docker_arg();
331
332        // `docker image inspect` returns a JSON array; we extract the RootFS layers.
333        let output = Command::new("docker")
334            .args([
335                "image",
336                "inspect",
337                "--format",
338                "{{range .RootFS.Layers}}{{.}}\n{{end}}",
339                &image_arg,
340            ])
341            .output()
342            .map_err(|e| BvError::RuntimeError(format!("docker image inspect failed: {e}")))?;
343
344        if !output.status.success() {
345            // Image may not be locally available; layer verification is best-effort here.
346            return Ok(());
347        }
348
349        let stdout = String::from_utf8_lossy(&output.stdout);
350        let actual_layers: Vec<&str> = stdout.lines().filter(|l| !l.is_empty()).collect();
351
352        // Docker's RootFS digest and the OCI manifest digest use different algorithms
353        // in some cases (DiffID vs compressed digest).  We do an exact-match check when
354        // the layer counts agree, and log a warning when they differ (e.g. when the image
355        // was built with a non-standard compressor).
356        if actual_layers.len() != expected_layers.len() {
357            // Layer count mismatch is non-fatal for legacy images that were not
358            // built by bv-builder; for factored_oci images built by bv-builder
359            // this will be caught at manifest inspection time.
360            return Ok(());
361        }
362
363        for (i, (expected, actual)) in expected_layers.iter().zip(actual_layers.iter()).enumerate()
364        {
365            if expected.digest != *actual {
366                return Err(BvError::RuntimeError(format!(
367                    "layer digest mismatch at index {i} for '{image_arg}': \
368                     expected {expected_digest} but got {actual} — \
369                     possible upstream tampering or mismatched layer ordering",
370                    expected_digest = expected.digest,
371                )));
372            }
373        }
374
375        Ok(())
376    }
377
378    /// Get the content digest for a locally available image by reference.
379    fn repo_digest(&self, image_ref: &str) -> Result<String> {
380        let output = Command::new("docker")
381            .args([
382                "image",
383                "inspect",
384                "--format",
385                "{{index .RepoDigests 0}}",
386                image_ref,
387            ])
388            .output()
389            .map_err(|e| BvError::RuntimeError(e.to_string()))?;
390
391        if !output.status.success() {
392            return Err(BvError::RuntimeError(format!(
393                "could not inspect image '{image_ref}' after pull"
394            )));
395        }
396
397        let line = String::from_utf8_lossy(&output.stdout);
398        let line = line.trim();
399
400        // RepoDigests entry looks like "ncbi/blast@sha256:abc123"; extract digest part.
401        if let Some(digest) = line.split('@').nth(1) {
402            Ok(digest.to_string())
403        } else if line.starts_with("sha256:") {
404            Ok(line.to_string())
405        } else {
406            // Locally built image without a registry digest; fall back to image ID.
407            let id_output = Command::new("docker")
408                .args(["image", "inspect", "--format", "{{.Id}}", image_ref])
409                .output()
410                .map_err(|e| BvError::RuntimeError(e.to_string()))?;
411            Ok(String::from_utf8_lossy(&id_output.stdout)
412                .trim()
413                .to_string())
414        }
415    }
416}
417
/// Return the calling process's real uid and gid on Unix.
///
/// These are the ids reported by `getuid(2)` and `getgid(2)`. They are the
/// real ids, not the effective ids that `geteuid`/`getegid` would return.
/// NOTE(review): `uid_t`/`gid_t` are declared as `u32` below, which holds
/// on Linux and macOS.
#[cfg(unix)]
fn current_uid_gid() -> Option<(u32, u32)> {
    unsafe extern "C" {
        fn getuid() -> u32;
        fn getgid() -> u32;
    }
    // SAFETY: getuid()/getgid() take no arguments, touch no caller-owned
    // memory, and are specified by POSIX to always succeed.
    let ids = unsafe { (getuid(), getgid()) };
    Some(ids)
}

/// Non-Unix fallback (e.g. Windows): returns `None` so callers skip passing
/// `--user` rather than guessing, and the image's default user applies.
#[cfg(not(unix))]
fn current_uid_gid() -> Option<(u32, u32)> {
    None
}
436
437/// Compare a registry-returned digest to the digest the caller pinned and
438/// return a clear error on mismatch.
439fn verify_digest(image_ref: &str, expected: &str, got: &str) -> Result<()> {
440    if expected == got {
441        Ok(())
442    } else {
443        Err(BvError::RuntimeError(format!(
444            "image digest mismatch for '{image_ref}': expected {expected} but registry returned {got}"
445        )))
446    }
447}
448
449/// Map docker pull stderr to a user-friendly `BvError`.
450fn classify_pull_error(stderr: &str, image_ref: &str) -> BvError {
451    if stderr.contains("Cannot connect to the Docker daemon")
452        || stderr.contains("Is the docker daemon running")
453    {
454        BvError::RuntimeNotAvailable {
455            runtime: "docker".into(),
456            reason: "Docker daemon is not available. Is Docker Desktop running?".into(),
457        }
458    } else if stderr.contains("manifest unknown")
459        || stderr.contains("not found")
460        || stderr.contains("does not exist")
461    {
462        BvError::RuntimeError(format!(
463            "image '{image_ref}' not found in registry (check the tool manifest)"
464        ))
465    } else if stderr.contains("connection refused") || stderr.contains("no such host") {
466        BvError::RuntimeError(format!(
467            "network error while pulling '{image_ref}': {stderr}"
468        ))
469    } else {
470        BvError::RuntimeError(format!("docker pull failed:\n{stderr}"))
471    }
472}
473
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn verify_digest_matches() {
        assert!(verify_digest("ncbi/blast", "sha256:abc", "sha256:abc").is_ok());
    }

    #[test]
    fn verify_digest_rejects_mismatch() {
        let err = verify_digest("ncbi/blast", "sha256:abc", "sha256:def").unwrap_err();
        let msg = err.to_string();
        assert!(msg.contains("ncbi/blast"));
        assert!(msg.contains("sha256:abc"));
        assert!(msg.contains("sha256:def"));
        assert!(msg.contains("mismatch"));
    }

    #[cfg(unix)]
    #[test]
    fn current_uid_gid_returns_some_on_unix() {
        let got = current_uid_gid();
        assert!(got.is_some());
    }

    #[test]
    fn classify_pull_error_daemon_unavailable() {
        let err = classify_pull_error(
            "Cannot connect to the Docker daemon at unix:///var/run/docker.sock. \
             Is the docker daemon running?",
            "ncbi/blast",
        );
        assert!(matches!(err, BvError::RuntimeNotAvailable { .. }));
    }

    #[test]
    fn classify_pull_error_image_not_found() {
        let err = classify_pull_error(
            "Error response from daemon: manifest for ncbi/blast:0.0 not found: manifest unknown",
            "ncbi/blast:0.0",
        );
        assert!(matches!(
            &err,
            BvError::RuntimeError(msg) if msg.contains("'ncbi/blast:0.0' not found in registry")
        ));
    }

    #[test]
    fn classify_pull_error_network() {
        let err = classify_pull_error(
            "Error response from daemon: Get \"https://registry-1.docker.io/v2/\": \
             dial tcp: lookup registry-1.docker.io: no such host",
            "ncbi/blast",
        );
        assert!(matches!(
            &err,
            BvError::RuntimeError(msg) if msg.starts_with("network error while pulling 'ncbi/blast'")
        ));
    }

    #[test]
    fn classify_pull_error_falls_back_to_raw_stderr() {
        let err = classify_pull_error("unauthorized: authentication required", "ncbi/blast");
        assert!(matches!(
            &err,
            BvError::RuntimeError(msg)
                if msg == "docker pull failed:\nunauthorized: authentication required"
        ));
    }

    /// Simulate the layer-counting logic used in `pull()` to verify the
    /// counts stay correct across typical docker pull output patterns.
    fn count_layers(lines: &[&str]) -> (u64, u64) {
        let mut total: u64 = 0;
        let mut done: u64 = 0;
        for line in lines {
            let t = line.trim();
            if t.ends_with(": Pulling fs layer") {
                total += 1;
            } else if t.ends_with(": Already exists") {
                total += 1;
                done += 1;
            } else if t.ends_with(": Pull complete") {
                done += 1;
            }
        }
        (done, total)
    }

    #[test]
    fn layer_count_fresh_pull() {
        let lines = [
            "abc123: Pulling fs layer",
            "def456: Pulling fs layer",
            "abc123: Downloading",
            "abc123: Pull complete",
            "def456: Pull complete",
            "Digest: sha256:deadbeef",
        ];
        assert_eq!(count_layers(&lines), (2, 2));
    }

    #[test]
    fn layer_count_partially_cached() {
        let lines = [
            "abc123: Already exists",
            "def456: Pulling fs layer",
            "def456: Pull complete",
        ];
        // total=2, done=2 (already-exists counts as immediately done)
        assert_eq!(count_layers(&lines), (2, 2));
    }

    #[test]
    fn layer_count_in_progress() {
        let lines = [
            "abc123: Pulling fs layer",
            "def456: Pulling fs layer",
            "ghi789: Pulling fs layer",
            "abc123: Pull complete",
            // def456 and ghi789 still downloading
        ];
        assert_eq!(count_layers(&lines), (1, 3));
    }
}