Skip to main content

bv_runtime/
docker.rs

1use std::collections::HashMap;
2use std::io::{BufRead, BufReader, Read};
3use std::process::{Command, Stdio};
4use std::thread;
5use std::time::Instant;
6
7use bv_core::error::{BvError, Result};
8
9use crate::runtime::{
10    ContainerRuntime, GpuProfile, ImageDigest, ImageMetadata, Mount, OciRef, ProgressReporter,
11    RunOutcome, RunSpec, RuntimeInfo,
12};
13
14#[derive(Clone)]
15pub struct DockerRuntime;
16
17impl ContainerRuntime for DockerRuntime {
18    fn name(&self) -> &str {
19        "docker"
20    }
21
22    fn health_check(&self) -> Result<RuntimeInfo> {
23        let output = Command::new("docker")
24            .arg("version")
25            .output()
26            .map_err(|e| BvError::RuntimeNotAvailable {
27                runtime: "docker".into(),
28                reason: format!("could not execute `docker`: {e}"),
29            })?;
30
31        if !output.status.success() {
32            let stderr = String::from_utf8_lossy(&output.stderr);
33            return Err(BvError::RuntimeNotAvailable {
34                runtime: "docker".into(),
35                reason: format!("docker daemon not running or not accessible: {stderr}"),
36            });
37        }
38
39        let stdout = String::from_utf8_lossy(&output.stdout);
40
41        // Parse "Version: X.Y.Z" lines; stable across Docker Engine and Desktop.
42        let versions: Vec<&str> = stdout
43            .lines()
44            .filter_map(|l| l.trim().strip_prefix("Version:").map(|v| v.trim()))
45            .collect();
46
47        let client_version = versions.first().copied().unwrap_or("unknown").to_string();
48        let server_version = versions.get(1).copied().map(str::to_string);
49
50        let mut extra = HashMap::new();
51        if let Some(sv) = server_version {
52            extra.insert("server_version".into(), sv);
53        }
54
55        Ok(RuntimeInfo {
56            name: "docker".into(),
57            version: client_version,
58            extra,
59        })
60    }
61
62    fn pull(&self, image: &OciRef, progress: &dyn ProgressReporter) -> Result<ImageDigest> {
63        let image_arg = image.docker_arg();
64        progress.update(&format!("Pulling {image_arg}"), None, None);
65
66        let mut child = Command::new("docker")
67            .args(["pull", &image_arg])
68            .stdout(Stdio::piped())
69            .stderr(Stdio::piped())
70            .spawn()
71            .map_err(|e| BvError::RuntimeNotAvailable {
72                runtime: "docker".into(),
73                reason: format!("could not execute `docker`: {e}"),
74            })?;
75
76        let stdout = child.stdout.take().expect("stdout was piped");
77        let stderr = child.stderr.take().expect("stderr was piped");
78
79        // Drain stderr in a background thread to prevent pipe deadlock.
80        let stderr_thread = thread::spawn(move || {
81            let mut s = String::new();
82            BufReader::new(stderr).read_to_string(&mut s).ok();
83            s
84        });
85
86        // Parse docker pull stdout line-by-line for progress and the digest.
87        let mut pull_digest: Option<String> = None;
88        for line in BufReader::new(stdout).lines() {
89            let line = line.map_err(BvError::Io)?;
90            let trimmed = line.trim();
91            // "Digest: sha256:abc123"
92            if let Some(d) = trimmed.strip_prefix("Digest: ") {
93                pull_digest = Some(d.to_string());
94            }
95            progress.update(trimmed, None, None);
96        }
97
98        let status = child.wait()?;
99        let stderr_output = stderr_thread.join().unwrap_or_default();
100
101        if !status.success() {
102            return Err(classify_pull_error(&stderr_output, &image_arg));
103        }
104
105        progress.finish(&format!("Pulled {image_arg}"));
106
107        let digest = match pull_digest {
108            Some(d) => d,
109            None => self.repo_digest(&image_arg)?,
110        };
111
112        Ok(ImageDigest(digest))
113    }
114
115    fn run(&self, spec: &RunSpec) -> Result<RunOutcome> {
116        let start = Instant::now();
117
118        let mut cmd = Command::new("docker");
119        cmd.arg("run").arg("--rm");
120
121        if let Some(wd) = &spec.working_dir {
122            cmd.args(["-w", &wd.to_string_lossy()]);
123        }
124
125        for arg in self.mount_args(&spec.mounts) {
126            cmd.arg(arg);
127        }
128
129        for (k, v) in &spec.env {
130            cmd.arg("-e").arg(format!("{k}={v}"));
131        }
132
133        for arg in self.gpu_args(&spec.gpu) {
134            cmd.arg(arg);
135        }
136
137        // Pass NVIDIA_VISIBLE_DEVICES through if the user has set it.
138        if let Ok(val) = std::env::var("NVIDIA_VISIBLE_DEVICES") {
139            cmd.arg("-e").arg(format!("NVIDIA_VISIBLE_DEVICES={val}"));
140        }
141
142        cmd.arg(spec.image.docker_arg());
143
144        for arg in &spec.command {
145            cmd.arg(arg);
146        }
147
148        cmd.stdin(Stdio::inherit())
149            .stdout(Stdio::inherit())
150            .stderr(Stdio::inherit());
151
152        let status = cmd
153            .status()
154            .map_err(|e| BvError::RuntimeError(format!("docker run failed to launch: {e}")))?;
155
156        Ok(RunOutcome {
157            exit_code: status.code().unwrap_or(-1),
158            duration: start.elapsed(),
159        })
160    }
161
162    fn inspect(&self, digest: &ImageDigest) -> Result<ImageMetadata> {
163        let output = Command::new("docker")
164            .args(["image", "inspect", "--format", "{{.Size}}", &digest.0])
165            .output()
166            .map_err(|e| BvError::RuntimeError(e.to_string()))?;
167
168        if !output.status.success() {
169            return Err(BvError::RuntimeError(format!(
170                "docker image inspect failed for '{}'",
171                digest.0
172            )));
173        }
174
175        let size_bytes = String::from_utf8_lossy(&output.stdout)
176            .trim()
177            .parse::<u64>()
178            .ok();
179
180        Ok(ImageMetadata {
181            digest: digest.clone(),
182            size_bytes,
183            labels: HashMap::new(),
184        })
185    }
186
187    fn is_locally_available(&self, image_ref: &str, digest: &str) -> bool {
188        let pinned = format!("{image_ref}@{digest}");
189        Command::new("docker")
190            .args(["image", "inspect", "--format", "{{.Id}}", &pinned])
191            .stdout(Stdio::null())
192            .stderr(Stdio::null())
193            .status()
194            .map(|s| s.success())
195            .unwrap_or(false)
196    }
197
198    fn gpu_args(&self, profile: &GpuProfile) -> Vec<String> {
199        match &profile.spec {
200            Some(spec) if spec.required => vec!["--gpus".into(), "all".into()],
201            _ => vec![],
202        }
203    }
204
205    fn mount_args(&self, mounts: &[Mount]) -> Vec<String> {
206        mounts
207            .iter()
208            .flat_map(|m| {
209                let mode = if m.read_only { "ro" } else { "rw" };
210                let spec = format!(
211                    "{}:{}:{mode}",
212                    m.host_path.display(),
213                    m.container_path.display()
214                );
215                ["-v".to_string(), spec]
216            })
217            .collect()
218    }
219}
220
221impl DockerRuntime {
222    /// Get the content digest for a locally available image by reference.
223    fn repo_digest(&self, image_ref: &str) -> Result<String> {
224        let output = Command::new("docker")
225            .args([
226                "image",
227                "inspect",
228                "--format",
229                "{{index .RepoDigests 0}}",
230                image_ref,
231            ])
232            .output()
233            .map_err(|e| BvError::RuntimeError(e.to_string()))?;
234
235        if !output.status.success() {
236            return Err(BvError::RuntimeError(format!(
237                "could not inspect image '{image_ref}' after pull"
238            )));
239        }
240
241        let line = String::from_utf8_lossy(&output.stdout);
242        let line = line.trim();
243
244        // RepoDigests entry looks like "ncbi/blast@sha256:abc123"; extract digest part.
245        if let Some(digest) = line.split('@').nth(1) {
246            Ok(digest.to_string())
247        } else if line.starts_with("sha256:") {
248            Ok(line.to_string())
249        } else {
250            // Locally built image without a registry digest; fall back to image ID.
251            let id_output = Command::new("docker")
252                .args(["image", "inspect", "--format", "{{.Id}}", image_ref])
253                .output()
254                .map_err(|e| BvError::RuntimeError(e.to_string()))?;
255            Ok(String::from_utf8_lossy(&id_output.stdout)
256                .trim()
257                .to_string())
258        }
259    }
260}
261
262/// Map docker pull stderr to a user-friendly `BvError`.
263fn classify_pull_error(stderr: &str, image_ref: &str) -> BvError {
264    if stderr.contains("Cannot connect to the Docker daemon")
265        || stderr.contains("Is the docker daemon running")
266    {
267        BvError::RuntimeNotAvailable {
268            runtime: "docker".into(),
269            reason: "Docker daemon is not available. Is Docker Desktop running?".into(),
270        }
271    } else if stderr.contains("manifest unknown")
272        || stderr.contains("not found")
273        || stderr.contains("does not exist")
274    {
275        BvError::RuntimeError(format!(
276            "image '{image_ref}' not found in registry (check the tool manifest)"
277        ))
278    } else if stderr.contains("connection refused") || stderr.contains("no such host") {
279        BvError::RuntimeError(format!(
280            "network error while pulling '{image_ref}': {stderr}"
281        ))
282    } else {
283        BvError::RuntimeError(format!("docker pull failed:\n{stderr}"))
284    }
285}