harmont_cli/orchestrator/
docker_host_fns.rs1use anyhow::{Context, Result};
7use hm_plugin_protocol::{DockerCommitArgs, DockerExecArgs, DockerExtractArgs, DockerStartArgs};
8
9use super::state::current;
10
11const EXTRACT_CMD_SH: &str = r#"set -e
26mkdir -p "$WORKDIR"
27cd "$WORKDIR"
28manifest="$WORKDIR/.harmont-extracted"
29if [ -f "$manifest" ]; then
30 # Longest paths first: removes nested entries before their parents.
31 sort -r "$manifest" | while IFS= read -r p; do
32 [ -n "$p" ] || continue
33 if [ -d "$p" ] && [ ! -L "$p" ]; then
34 rmdir "$p" 2>/dev/null || true
35 else
36 rm -f "$p" 2>/dev/null || true
37 fi
38 done
39 rm -f "$manifest"
40fi
41# Stream the archive into a temp file so we can both list and extract.
42tmp=$(mktemp)
43trap 'rm -f "$tmp"' EXIT
44cat > "$tmp"
45tar -tzf "$tmp" > "$manifest"
46tar -xzf "$tmp"
47"#;
48
49pub(crate) async fn ping_impl() -> bool {
50 let Some(s) = current() else {
51 return false;
52 };
53 s.docker.ping().await.is_ok()
54}
55
56pub(crate) async fn image_exists_impl(tag: String) -> bool {
57 let Some(s) = current() else { return false };
58 s.docker.image_exists(&tag).await.unwrap_or(false)
59}
60
61pub(crate) async fn pull_impl(tag: String) -> Result<()> {
62 let s = current().context("no orchestrator state")?;
63 let cancel = s.cancel.clone();
64 let docker = s.docker.clone();
65 let pull_fut = async move { docker.pull_image(&tag).await };
66 tokio::select! {
67 result = pull_fut => result,
68 () = wait_cancel(&cancel) => Err(anyhow::anyhow!("cancelled during image pull")),
69 }
70}
71
72pub(crate) async fn start_container_impl(args: DockerStartArgs) -> Result<String> {
73 let s = current().context("no orchestrator state")?;
74 let env_vec: Vec<String> = args
75 .env
76 .into_iter()
77 .map(|(k, v)| format!("{k}={v}"))
78 .collect();
79 s.docker
80 .start_long_lived(&args.image, &env_vec, &args.workdir, &args.name_hint)
81 .await
82}
83
84pub(crate) async fn extract_workspace_impl(args: DockerExtractArgs) -> Result<()> {
85 let s = current().context("no orchestrator state")?;
86 let archive = s.archives.read(args.archive_id, 0, u64::MAX);
87 if archive.is_empty() {
88 anyhow::bail!("archive {} is empty or unknown", args.archive_id.0);
89 }
90 let cancel = s.cancel.clone();
91 let docker = s.docker.clone();
92 let cid = args.container_id;
93 let workdir = args.workdir;
94 let cmd = vec![
95 "sh".to_string(),
96 "-c".to_string(),
97 EXTRACT_CMD_SH.replace("$WORKDIR", &workdir),
98 ];
99 let extract_fut = async move {
100 let mut sink = tokio::io::sink();
101 let rc = docker
102 .exec_streaming_stdin(&cid, &cmd, &[], "/", &archive, &mut sink)
103 .await?;
104 if rc != 0 {
105 anyhow::bail!("tar extract exited {rc}");
106 }
107 Ok::<(), anyhow::Error>(())
108 };
109 tokio::select! {
110 result = extract_fut => result,
111 () = wait_cancel(&cancel) => Err(anyhow::anyhow!("cancelled during workspace extract")),
112 }
113}
114
115pub(crate) async fn exec_impl(args: DockerExecArgs) -> Result<i32> {
116 let s = current().context("no orchestrator state")?;
117 let env_vec: Vec<String> = args
118 .env
119 .into_iter()
120 .map(|(k, v)| format!("{k}={v}"))
121 .collect();
122 let mut writer = StepLogWriter::new();
126
127 let cancel = s.cancel.clone();
129 let docker = s.docker.clone();
130 let cid = args.container_id.clone();
131 let cmd = args.cmd.clone();
132 let workdir = args.workdir.clone();
133 let archive_opt = args.stdin_archive_id;
134 let archive_bytes = archive_opt.map(|id| s.archives.read(id, 0, u64::MAX));
135
136 let exec_fut = async move {
137 let rc = match archive_bytes {
138 Some(bytes) => {
139 docker
140 .exec_streaming_stdin(&cid, &cmd, &env_vec, &workdir, &bytes, &mut writer)
141 .await?
142 }
143 None => {
144 docker
145 .exec_streaming(&cid, &cmd, &env_vec, &workdir, &mut writer)
146 .await?
147 }
148 };
149 writer.flush_remaining();
150 Ok::<i64, anyhow::Error>(rc)
151 };
152
153 let rc = tokio::select! {
154 result = exec_fut => result?,
155 () = wait_cancel(&cancel) => {
156 return Ok(130);
158 }
159 };
160 i32::try_from(rc).context("docker exit code out of i32 range")
161}
162
163async fn wait_cancel(cancel: &crate::orchestrator::cancel::CancellationToken) {
164 loop {
166 if cancel.is_cancelled() {
167 return;
168 }
169 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
170 }
171}
172
173pub(crate) async fn commit_impl(args: DockerCommitArgs) -> Result<String> {
174 let s = current().context("no orchestrator state")?;
175 s.docker
176 .commit_container(&args.container_id, &args.tag)
177 .await
178}
179
180pub(crate) async fn remove_image_impl(tag: String) -> Result<()> {
181 let s = current().context("no orchestrator state")?;
182 s.docker.remove_image(&tag).await
183}
184
185pub(crate) async fn stop_remove_impl(container_id: String) {
186 if let Some(s) = current() {
187 s.docker.stop_remove(&container_id).await;
188 }
189}
190
191struct StepLogWriter {
194 buf: Vec<u8>,
195}
196
197impl StepLogWriter {
198 fn new() -> Self {
199 Self {
200 buf: Vec::with_capacity(8192),
201 }
202 }
203
204 fn flush_line(line: &[u8]) {
205 let Some(state) = current() else { return };
206 let Some(step_id) = crate::plugin::host_fns::current_step_id() else {
207 return;
208 };
209 state
210 .event_bus
211 .emit(hm_plugin_protocol::BuildEvent::StepLog {
212 step_id,
213 stream: hm_plugin_protocol::StdStream::Stdout,
214 line: String::from_utf8_lossy(line).into_owned(),
215 ts: chrono::Utc::now(),
216 });
217 }
218
219 fn flush_remaining(&mut self) {
220 if !self.buf.is_empty() {
221 let line = std::mem::take(&mut self.buf);
222 Self::flush_line(&line);
223 }
224 }
225}
226
227impl tokio::io::AsyncWrite for StepLogWriter {
228 fn poll_write(
229 mut self: std::pin::Pin<&mut Self>,
230 _cx: &mut std::task::Context<'_>,
231 buf: &[u8],
232 ) -> std::task::Poll<std::io::Result<usize>> {
233 let len = buf.len();
234 for b in buf {
235 if *b == b'\n' {
236 let line = std::mem::take(&mut self.buf);
237 Self::flush_line(&line);
238 } else {
239 self.buf.push(*b);
240 }
241 }
242 std::task::Poll::Ready(Ok(len))
243 }
244
245 fn poll_flush(
246 self: std::pin::Pin<&mut Self>,
247 _cx: &mut std::task::Context<'_>,
248 ) -> std::task::Poll<std::io::Result<()>> {
249 std::task::Poll::Ready(Ok(()))
250 }
251
252 fn poll_shutdown(
253 mut self: std::pin::Pin<&mut Self>,
254 _cx: &mut std::task::Context<'_>,
255 ) -> std::task::Poll<std::io::Result<()>> {
256 self.flush_remaining();
257 std::task::Poll::Ready(Ok(()))
258 }
259}