Skip to main content

harmont_cli/runner/
docker.rs

//! Docker-based step runner.
//!
//! Each step runs inside a Docker container. The source archive is
//! piped into the step's working directory (e.g. `/workspace`) via
//! `tar -xzf -` before the step command runs. System-level state
//! (packages, caches, virtualenvs) AND workspace files all propagate
//! via Docker image commits — no bind mounts, no host-side COW clones.
8
9use std::future::Future;
10use std::pin::Pin;
11use std::sync::Arc;
12
13use anyhow::{Context, Result};
14use hm_plugin_protocol::{
15    BuildEvent, CacheDecision, CommandStep, ExecutorInput, SnapshotRef, StdStream, StepResult,
16};
17use uuid::Uuid;
18
19use super::{RunContext, StepRunner};
20use crate::orchestrator::events::EventBus;
21
/// Runs pipeline steps in Docker containers through the local daemon
/// (Bollard).
///
/// The type is a unit struct: it holds no state of its own, and
/// everything a step needs arrives per call via [`RunContext`] and
/// [`ExecutorInput`].
#[derive(Debug)]
pub struct DockerRunner;
26
27impl StepRunner for DockerRunner {
28    fn name(&self) -> &'static str {
29        "docker"
30    }
31
32    fn execute(
33        &self,
34        ctx: &RunContext,
35        input: ExecutorInput,
36    ) -> Pin<Box<dyn Future<Output = Result<StepResult>> + Send + '_>> {
37        let ctx = ctx.clone();
38        Box::pin(async move { run_step(&ctx, input).await })
39    }
40}
41
42fn resolve_image(step: &CommandStep, input: &ExecutorInput) -> String {
43    if let Some(snap) = &input.parent_snapshot {
44        return snap.to_string();
45    }
46    step.image
47        .clone()
48        .unwrap_or_else(|| "alpine:latest".to_string())
49}
50
51async fn run_step(ctx: &RunContext, input: ExecutorInput) -> Result<StepResult> {
52    let plan = decision_plan(&input.cache_lookup);
53
54    if !plan.run_command {
55        return Ok(StepResult {
56            exit_code: 0,
57            committed_snapshot: plan.hit_tag.clone(),
58            artifacts: vec![],
59        });
60    }
61
62    let image = resolve_image(&input.step, &input);
63    let container_name = sanitize_container_name(&input.run_id.to_string(), &input.step.key);
64    let env_vec: Vec<String> = input.env.iter().map(|(k, v)| format!("{k}={v}")).collect();
65
66    // Pull image if needed.
67    if !ctx.docker.image_exists(&image).await.unwrap_or(false) {
68        let docker = ctx.docker.clone();
69        let cancel = ctx.cancel.clone();
70        let img = image.clone();
71        let pull_fut = async move { docker.pull_image(&img).await };
72        tokio::select! {
73            result = pull_fut => result.with_context(|| format!("pull '{image}'"))?,
74            () = cancel.cancelled() => anyhow::bail!("cancelled during image pull"),
75        }
76    }
77
78    let cid = ctx
79        .docker
80        .start_long_lived(&image, &env_vec, &input.workdir, &container_name)
81        .await
82        .context("docker start failed")?;
83
84    // Pipe source archive into /workspace. Runs for every step — cached
85    // parent images contain stale workspace files; tar overwrites source
86    // while preserving build artifacts (tar is additive).
87    let archive_bytes = ctx
88        .archives
89        .get_bytes(input.workspace_archive_id)
90        .ok_or_else(|| anyhow::anyhow!("source archive not found"))?;
91
92    let mkdir_cmd = vec!["mkdir".into(), "-p".into(), input.workdir.clone()];
93    let mut sink = tokio::io::sink();
94    ctx.docker
95        .exec_streaming(&cid, &mkdir_cmd, &env_vec, "/", &mut sink)
96        .await
97        .context("mkdir /workspace")?;
98
99    let tar_cmd = vec![
100        "tar".into(),
101        "-xzf".into(),
102        "-".into(),
103        "-C".into(),
104        input.workdir.clone(),
105    ];
106    ctx.docker
107        .exec_streaming_stdin(&cid, &tar_cmd, &env_vec, "/", &archive_bytes, &mut sink)
108        .await
109        .context("pipe source archive into container")?;
110
111    let result = run_in_container(ctx, &cid, &input, &env_vec, &plan).await;
112    ctx.docker.stop_remove(&cid).await;
113    result
114}
115
116async fn run_in_container(
117    ctx: &RunContext,
118    cid: &str,
119    input: &ExecutorInput,
120    env_vec: &[String],
121    plan: &DecisionPlan,
122) -> Result<StepResult> {
123    let mut writer = StepLogWriter::new(input.step_id, Arc::clone(&ctx.event_bus));
124    let docker = ctx.docker.clone();
125    let cancel = ctx.cancel.clone();
126    let cid_owned = cid.to_owned();
127    let cmd = vec!["sh".into(), "-c".into(), input.step.cmd.clone()];
128    let workdir = input.workdir.clone();
129    let env_owned = env_vec.to_vec();
130    let exec_fut = async move {
131        let rc = docker
132            .exec_streaming(&cid_owned, &cmd, &env_owned, &workdir, &mut writer)
133            .await?;
134        writer.flush_remaining();
135        Ok::<i64, anyhow::Error>(rc)
136    };
137
138    let rc = tokio::select! {
139        result = exec_fut => result.context("docker exec failed")?,
140        () = cancel.cancelled() => {
141            return Ok(StepResult {
142                exit_code: 130,
143                committed_snapshot: None,
144                artifacts: vec![],
145            });
146        }
147    };
148
149    #[allow(
150        clippy::cast_possible_truncation,
151        reason = "docker exit codes fit in i32"
152    )]
153    let exit_code = rc as i32;
154
155    // Commit container so child steps inherit system-level changes
156    // (installed packages, etc.). Workspace files propagate via COW
157    // bind mounts, but the container image captures everything else.
158    let committed = if exit_code == 0 {
159        let target_tag = plan.commit_to.clone().unwrap_or_else(|| {
160            let safe: String = input
161                .step
162                .key
163                .chars()
164                .map(|c| {
165                    if c.is_ascii_alphanumeric() || c == '_' || c == '-' {
166                        c
167                    } else {
168                        '-'
169                    }
170                })
171                .collect();
172            SnapshotRef::from(format!(
173                "harmont-local-ephemeral/{safe}:run-{}",
174                input.step_id.simple()
175            ))
176        });
177        match ctx
178            .docker
179            .commit_container(cid, &target_tag.to_string())
180            .await
181        {
182            Ok(_) => Some(target_tag),
183            Err(e) => {
184                tracing::warn!(step_key = %input.step.key, "docker commit failed, step still succeeded: {e:#}");
185                None
186            }
187        }
188    } else {
189        None
190    };
191
192    Ok(StepResult {
193        exit_code,
194        committed_snapshot: committed,
195        artifacts: vec![],
196    })
197}
198
199#[derive(Debug, Clone)]
200struct DecisionPlan {
201    run_command: bool,
202    commit_to: Option<SnapshotRef>,
203    hit_tag: Option<SnapshotRef>,
204}
205
206fn decision_plan(decision: &CacheDecision) -> DecisionPlan {
207    match decision {
208        CacheDecision::Hit { tag } => DecisionPlan {
209            run_command: false,
210            commit_to: None,
211            hit_tag: Some(tag.clone()),
212        },
213        CacheDecision::MissBuildAs { tag } => DecisionPlan {
214            run_command: true,
215            commit_to: Some(tag.clone()),
216            hit_tag: None,
217        },
218        CacheDecision::MissNoCommit => DecisionPlan {
219            run_command: true,
220            commit_to: None,
221            hit_tag: None,
222        },
223    }
224}
225
/// Builds the container name `harmont-<run>-<key>` for a step.
///
/// `<run>` is the first eight characters of `run_id` and `<key>` is the
/// whole of `step_key`. In both parts every character outside
/// `[A-Za-z0-9_-]` is replaced with `-`, so the result only contains
/// characters that are safe in a container name regardless of what the
/// caller passes in.
fn sanitize_container_name(run_id: &str, step_key: &str) -> String {
    let run_short: String = run_id.chars().take(8).map(container_name_char).collect();
    let key: String = step_key.chars().map(container_name_char).collect();
    format!("harmont-{run_short}-{key}")
}

/// Keeps ASCII alphanumerics, `_` and `-`; maps every other character to `-`.
fn container_name_char(c: char) -> char {
    if c.is_ascii_alphanumeric() || c == '_' || c == '-' {
        c
    } else {
        '-'
    }
}
240
241/// Streams bytes from a Docker exec into per-line [`BuildEvent::StepLog`]
242/// events on the [`EventBus`]. Buffers partial lines until a `\n` arrives.
243struct StepLogWriter {
244    step_id: Uuid,
245    bus: Arc<EventBus>,
246    buf: Vec<u8>,
247}
248
249impl StepLogWriter {
250    fn new(step_id: Uuid, bus: Arc<EventBus>) -> Self {
251        Self {
252            step_id,
253            bus,
254            buf: Vec::with_capacity(8192),
255        }
256    }
257
258    fn flush_line(&self, line: &[u8]) {
259        self.bus.emit(BuildEvent::StepLog {
260            step_id: self.step_id,
261            stream: StdStream::Stdout,
262            line: String::from_utf8_lossy(line).into_owned(),
263            ts: chrono::Utc::now(),
264        });
265    }
266
267    fn flush_remaining(&mut self) {
268        if !self.buf.is_empty() {
269            let line = std::mem::take(&mut self.buf);
270            self.flush_line(&line);
271        }
272    }
273}
274
275impl tokio::io::AsyncWrite for StepLogWriter {
276    fn poll_write(
277        mut self: Pin<&mut Self>,
278        _cx: &mut std::task::Context<'_>,
279        buf: &[u8],
280    ) -> std::task::Poll<std::io::Result<usize>> {
281        let len = buf.len();
282        for b in buf {
283            if *b == b'\n' {
284                let line = std::mem::take(&mut self.buf);
285                self.flush_line(&line);
286            } else {
287                self.buf.push(*b);
288            }
289        }
290        std::task::Poll::Ready(Ok(len))
291    }
292
293    fn poll_flush(
294        self: Pin<&mut Self>,
295        _cx: &mut std::task::Context<'_>,
296    ) -> std::task::Poll<std::io::Result<()>> {
297        std::task::Poll::Ready(Ok(()))
298    }
299
300    fn poll_shutdown(
301        mut self: Pin<&mut Self>,
302        _cx: &mut std::task::Context<'_>,
303    ) -> std::task::Poll<std::io::Result<()>> {
304        self.flush_remaining();
305        std::task::Poll::Ready(Ok(()))
306    }
307}
308
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    use super::*;

    use hm_plugin_protocol::CacheDecision;

    /// Minimal command step; only `image` varies between tests.
    fn step_with_image(image: Option<&str>) -> CommandStep {
        CommandStep {
            key: "k".into(),
            label: None,
            cmd: "true".into(),
            image: image.map(str::to_owned),
            env: None,
            timeout_seconds: None,
            cache: None,
            runner: None,
            runner_args: None,
        }
    }

    /// Executor input with nil ids, an empty environment and a
    /// `MissNoCommit` cache decision.
    fn make_input(step: CommandStep, parent_snapshot: Option<SnapshotRef>) -> ExecutorInput {
        ExecutorInput {
            step,
            workspace_archive_id: hm_plugin_protocol::ArchiveId::from(Uuid::nil()),
            env: std::collections::BTreeMap::new(),
            workdir: "/workspace".into(),
            run_id: Uuid::nil(),
            step_id: Uuid::nil(),
            cache_lookup: CacheDecision::MissNoCommit,
            parent_snapshot,
        }
    }

    /// Unwraps a `StepLog` event into `(step_id, line)`; panics on any
    /// other event kind.
    fn expect_step_log(event: BuildEvent) -> (Uuid, String) {
        match event {
            BuildEvent::StepLog { step_id, line, .. } => (step_id, line),
            other => panic!("expected StepLog, got {other:?}"),
        }
    }

    // -- resolve_image ----------------------------------------------------

    #[test]
    fn resolve_image_uses_step_image() {
        let step = step_with_image(Some("rust:1.82"));
        let input = make_input(step.clone(), None);
        assert_eq!(resolve_image(&step, &input), "rust:1.82");
    }

    #[test]
    fn resolve_image_fallback_alpine() {
        let step = step_with_image(None);
        let input = make_input(step.clone(), None);
        assert_eq!(resolve_image(&step, &input), "alpine:latest");
    }

    #[test]
    fn resolve_image_prefers_parent_snapshot() {
        let step = step_with_image(Some("rust:1.82"));
        let parent = SnapshotRef::from("harmont-local-ephemeral/base:abc123".to_string());
        let input = make_input(step.clone(), Some(parent));
        let resolved = resolve_image(&step, &input);
        assert_eq!(resolved, "harmont-local-ephemeral/base:abc123");
    }

    // -- decision_plan -------------------------------------------------------

    #[test]
    fn decision_hit_skips_command() {
        let decision = CacheDecision::Hit {
            tag: SnapshotRef("cached:v1".into()),
        };
        let plan = decision_plan(&decision);
        assert!(!plan.run_command);
        assert!(plan.commit_to.is_none());
        let hit = plan.hit_tag.as_ref().unwrap();
        assert_eq!(hit.0, "cached:v1");
    }

    #[test]
    fn decision_miss_build_as_runs_and_commits() {
        let decision = CacheDecision::MissBuildAs {
            tag: SnapshotRef("build:v2".into()),
        };
        let plan = decision_plan(&decision);
        assert!(plan.run_command);
        let target = plan.commit_to.as_ref().unwrap();
        assert_eq!(target.0, "build:v2");
        assert!(plan.hit_tag.is_none());
    }

    #[test]
    fn decision_miss_no_commit() {
        let plan = decision_plan(&CacheDecision::MissNoCommit);
        assert!(plan.run_command);
        assert!(plan.commit_to.is_none());
        assert!(plan.hit_tag.is_none());
    }

    // -- sanitize_container_name ---------------------------------------------

    #[test]
    fn sanitize_container_name_replaces_special_chars() {
        let name = sanitize_container_name("abcdef12-3456-7890", "my/step.key:v1");
        assert_eq!(name, "harmont-abcdef12-my-step-key-v1");
    }

    #[test]
    fn sanitize_container_name_preserves_valid_chars() {
        let name = sanitize_container_name("run-id-1234", "normal_step-key");
        assert_eq!(name, "harmont-run-id-1-normal_step-key");
    }

    // -- StepLogWriter -------------------------------------------------------

    #[tokio::test]
    async fn step_log_writer_emits_on_newline() {
        use tokio::io::AsyncWriteExt;

        let bus = EventBus::new();
        let mut rx = bus.subscribe();
        let step_id = Uuid::new_v4();

        let mut writer = StepLogWriter::new(step_id, bus);
        writer.write_all(b"hello\nworld\n").await.unwrap();

        let first = rx.recv().await.unwrap();
        let second = rx.recv().await.unwrap();

        let (first_id, first_line) = expect_step_log(first);
        assert_eq!(first_id, step_id);
        assert_eq!(first_line, "hello");

        let (_, second_line) = expect_step_log(second);
        assert_eq!(second_line, "world");
    }

    #[tokio::test]
    async fn step_log_writer_flushes_remaining_on_shutdown() {
        use tokio::io::AsyncWriteExt;

        let bus = EventBus::new();
        let mut rx = bus.subscribe();
        let step_id = Uuid::new_v4();

        let mut writer = StepLogWriter::new(step_id, bus);
        // No trailing newline: the line must only surface on shutdown.
        writer.write_all(b"partial").await.unwrap();
        writer.shutdown().await.unwrap();

        let (_, line) = expect_step_log(rx.recv().await.unwrap());
        assert_eq!(line, "partial");
    }
}