Skip to main content

harmont_cli/orchestrator/
docker_host_fns.rs

1//! Bollard-backed implementations of the `hm_docker_*` host fns.
2//!
3//! These wrap [`crate::orchestrator::docker_client::DockerClient`]. The
4//! docker step-executor plugin calls these via Extism host-fn imports.
5
6use anyhow::{Context, Result};
7use hm_plugin_protocol::{DockerCommitArgs, DockerExecArgs, DockerExtractArgs, DockerStartArgs};
8
9use super::state::current;
10
// Shell script that unpacks a gzipped tar archive, read from stdin, into
// `$WORKDIR`.
//
// Workspace extract must be idempotent across snapshot reuse: when a
// parent snapshot is shared across different repos (e.g. an apt-base
// step's image cached on apt-package set only, then reused by two
// example projects), the previous repo's files in $WORKDIR would
// otherwise leak into the new run because `tar -xzf` overlays rather
// than mirrors. To keep this surgical, every extract writes a manifest
// of the paths it laid down to `$WORKDIR/.harmont-extracted`. The next
// extract reads that manifest, deletes only the paths the previous
// extract added, then unpacks the new archive (writing a fresh manifest).
//
// Deletion order: the manifest is walked with `sort -r`. A directory's
// path is a prefix of every entry beneath it, so in byte order it sorts
// before them, and reverse order therefore visits a directory's contents
// before the directory itself. `rmdir` can then remove the directory once
// it is empty. The in-script comment says "longest first"; that is the
// intent, but what is actually relied on is this prefix ordering.
// NOTE(review): `sort` runs in the container's locale; the prefix ordering
// holds in the C locale and presumably in common locales — confirm if an
// image sets an unusual collation.
//
// Removal is best-effort: `rmdir` is used for real directories (not
// symlinks to directories) and `rm -f` for everything else. Failures of
// either are ignored.
//
// Files created inside the container by a step's command (e.g.
// `node_modules` after `npm ci`, build artifacts under `build/`) are not
// in any manifest, so they survive untouched — preserving the intra-chain
// artifact-passing semantics that toolchains rely on. A manifest-listed
// directory that still contains such files survives too, because its
// `rmdir` fails on a non-empty directory.
//
// stdin is spooled to a `mktemp` file so the archive can be read twice:
// once by `tar -tzf` to write the new manifest, then by `tar -xzf` to
// extract. The temp file is removed on exit by the `trap`.
const EXTRACT_CMD_SH: &str = r#"set -e
mkdir -p "$WORKDIR"
cd "$WORKDIR"
manifest="$WORKDIR/.harmont-extracted"
if [ -f "$manifest" ]; then
  # Longest paths first: removes nested entries before their parents.
  sort -r "$manifest" | while IFS= read -r p; do
    [ -n "$p" ] || continue
    if [ -d "$p" ] && [ ! -L "$p" ]; then
      rmdir "$p" 2>/dev/null || true
    else
      rm -f "$p" 2>/dev/null || true
    fi
  done
  rm -f "$manifest"
fi
# Stream the archive into a temp file so we can both list and extract.
tmp=$(mktemp)
trap 'rm -f "$tmp"' EXIT
cat > "$tmp"
tar -tzf "$tmp" > "$manifest"
tar -xzf "$tmp"
"#;
48
49pub(crate) async fn ping_impl() -> bool {
50    let Some(s) = current() else {
51        return false;
52    };
53    s.docker.ping().await.is_ok()
54}
55
56pub(crate) async fn image_exists_impl(tag: String) -> bool {
57    let Some(s) = current() else { return false };
58    s.docker.image_exists(&tag).await.unwrap_or(false)
59}
60
61pub(crate) async fn pull_impl(tag: String) -> Result<()> {
62    let s = current().context("no orchestrator state")?;
63    let cancel = s.cancel.clone();
64    let docker = s.docker.clone();
65    let pull_fut = async move { docker.pull_image(&tag).await };
66    tokio::select! {
67        result = pull_fut => result,
68        () = wait_cancel(&cancel) => Err(anyhow::anyhow!("cancelled during image pull")),
69    }
70}
71
72pub(crate) async fn start_container_impl(args: DockerStartArgs) -> Result<String> {
73    let s = current().context("no orchestrator state")?;
74    let env_vec: Vec<String> = args
75        .env
76        .into_iter()
77        .map(|(k, v)| format!("{k}={v}"))
78        .collect();
79    s.docker
80        .start_long_lived(&args.image, &env_vec, &args.workdir, &args.name_hint)
81        .await
82}
83
84pub(crate) async fn extract_workspace_impl(args: DockerExtractArgs) -> Result<()> {
85    let s = current().context("no orchestrator state")?;
86    let archive = s.archives.read(args.archive_id, 0, u64::MAX);
87    if archive.is_empty() {
88        anyhow::bail!("archive {} is empty or unknown", args.archive_id.0);
89    }
90    let cancel = s.cancel.clone();
91    let docker = s.docker.clone();
92    let cid = args.container_id;
93    let workdir = args.workdir;
94    let cmd = vec![
95        "sh".to_string(),
96        "-c".to_string(),
97        EXTRACT_CMD_SH.replace("$WORKDIR", &workdir),
98    ];
99    let extract_fut = async move {
100        let mut sink = tokio::io::sink();
101        let rc = docker
102            .exec_streaming_stdin(&cid, &cmd, &[], "/", &archive, &mut sink)
103            .await?;
104        if rc != 0 {
105            anyhow::bail!("tar extract exited {rc}");
106        }
107        Ok::<(), anyhow::Error>(())
108    };
109    tokio::select! {
110        result = extract_fut => result,
111        () = wait_cancel(&cancel) => Err(anyhow::anyhow!("cancelled during workspace extract")),
112    }
113}
114
115pub(crate) async fn exec_impl(args: DockerExecArgs) -> Result<i32> {
116    let s = current().context("no orchestrator state")?;
117    let env_vec: Vec<String> = args
118        .env
119        .into_iter()
120        .map(|(k, v)| format!("{k}={v}"))
121        .collect();
122    // Emit StepLog events for each line written; the writer below
123    // forwards bytes into the event bus tagged with the current
124    // thread-local step_id set by the scheduler.
125    let mut writer = StepLogWriter::new();
126
127    // Future doing the exec; we race it against cancellation.
128    let cancel = s.cancel.clone();
129    let docker = s.docker.clone();
130    let cid = args.container_id.clone();
131    let cmd = args.cmd.clone();
132    let workdir = args.workdir.clone();
133    let archive_opt = args.stdin_archive_id;
134    let archive_bytes = archive_opt.map(|id| s.archives.read(id, 0, u64::MAX));
135
136    let exec_fut = async move {
137        let rc = match archive_bytes {
138            Some(bytes) => {
139                docker
140                    .exec_streaming_stdin(&cid, &cmd, &env_vec, &workdir, &bytes, &mut writer)
141                    .await?
142            }
143            None => {
144                docker
145                    .exec_streaming(&cid, &cmd, &env_vec, &workdir, &mut writer)
146                    .await?
147            }
148        };
149        writer.flush_remaining();
150        Ok::<i64, anyhow::Error>(rc)
151    };
152
153    let rc = tokio::select! {
154        result = exec_fut => result?,
155        () = wait_cancel(&cancel) => {
156            // Cancelled. Try to bail with the conventional sigint code.
157            return Ok(130);
158        }
159    };
160    i32::try_from(rc).context("docker exit code out of i32 range")
161}
162
163async fn wait_cancel(cancel: &crate::orchestrator::cancel::CancellationToken) {
164    // Poll the atomic every 50ms. Cheap; never wakes a thread early.
165    loop {
166        if cancel.is_cancelled() {
167            return;
168        }
169        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
170    }
171}
172
173pub(crate) async fn commit_impl(args: DockerCommitArgs) -> Result<String> {
174    let s = current().context("no orchestrator state")?;
175    s.docker
176        .commit_container(&args.container_id, &args.tag)
177        .await
178}
179
180pub(crate) async fn remove_image_impl(tag: String) -> Result<()> {
181    let s = current().context("no orchestrator state")?;
182    s.docker.remove_image(&tag).await
183}
184
185pub(crate) async fn stop_remove_impl(container_id: String) {
186    if let Some(s) = current() {
187        s.docker.stop_remove(&container_id).await;
188    }
189}
190
191/// Streams bytes from a Docker exec into per-line `StepLog` events on
192/// the event bus. Buffers partial lines until a `\n` arrives.
193struct StepLogWriter {
194    buf: Vec<u8>,
195}
196
197impl StepLogWriter {
198    fn new() -> Self {
199        Self {
200            buf: Vec::with_capacity(8192),
201        }
202    }
203
204    fn flush_line(line: &[u8]) {
205        let Some(state) = current() else { return };
206        let Some(step_id) = crate::plugin::host_fns::current_step_id() else {
207            return;
208        };
209        state
210            .event_bus
211            .emit(hm_plugin_protocol::BuildEvent::StepLog {
212                step_id,
213                stream: hm_plugin_protocol::StdStream::Stdout,
214                line: String::from_utf8_lossy(line).into_owned(),
215                ts: chrono::Utc::now(),
216            });
217    }
218
219    fn flush_remaining(&mut self) {
220        if !self.buf.is_empty() {
221            let line = std::mem::take(&mut self.buf);
222            Self::flush_line(&line);
223        }
224    }
225}
226
227impl tokio::io::AsyncWrite for StepLogWriter {
228    fn poll_write(
229        mut self: std::pin::Pin<&mut Self>,
230        _cx: &mut std::task::Context<'_>,
231        buf: &[u8],
232    ) -> std::task::Poll<std::io::Result<usize>> {
233        let len = buf.len();
234        for b in buf {
235            if *b == b'\n' {
236                let line = std::mem::take(&mut self.buf);
237                Self::flush_line(&line);
238            } else {
239                self.buf.push(*b);
240            }
241        }
242        std::task::Poll::Ready(Ok(len))
243    }
244
245    fn poll_flush(
246        self: std::pin::Pin<&mut Self>,
247        _cx: &mut std::task::Context<'_>,
248    ) -> std::task::Poll<std::io::Result<()>> {
249        std::task::Poll::Ready(Ok(()))
250    }
251
252    fn poll_shutdown(
253        mut self: std::pin::Pin<&mut Self>,
254        _cx: &mut std::task::Context<'_>,
255    ) -> std::task::Poll<std::io::Result<()>> {
256        self.flush_remaining();
257        std::task::Poll::Ready(Ok(()))
258    }
259}