1use std::future::Future;
10use std::pin::Pin;
11use std::sync::Arc;
12
13use anyhow::{Context, Result};
14use hm_plugin_protocol::{
15 BuildEvent, CacheDecision, CommandStep, ExecutorInput, SnapshotRef, StdStream, StepResult,
16};
17use uuid::Uuid;
18
19use super::{RunContext, StepRunner};
20use crate::orchestrator::events::EventBus;
21
22#[derive(Debug)]
25pub struct DockerRunner;
26
27impl StepRunner for DockerRunner {
28 fn name(&self) -> &'static str {
29 "docker"
30 }
31
32 fn execute(
33 &self,
34 ctx: &RunContext,
35 input: ExecutorInput,
36 ) -> Pin<Box<dyn Future<Output = Result<StepResult>> + Send + '_>> {
37 let ctx = ctx.clone();
38 Box::pin(async move { run_step(&ctx, input).await })
39 }
40}
41
42fn resolve_image(step: &CommandStep, input: &ExecutorInput) -> String {
43 if let Some(snap) = &input.parent_snapshot {
44 return snap.to_string();
45 }
46 step.image
47 .clone()
48 .unwrap_or_else(|| "alpine:latest".to_string())
49}
50
51async fn run_step(ctx: &RunContext, input: ExecutorInput) -> Result<StepResult> {
52 let plan = decision_plan(&input.cache_lookup);
53
54 if !plan.run_command {
55 return Ok(StepResult {
56 exit_code: 0,
57 committed_snapshot: plan.hit_tag.clone(),
58 artifacts: vec![],
59 });
60 }
61
62 let image = resolve_image(&input.step, &input);
63 let container_name = sanitize_container_name(&input.run_id.to_string(), &input.step.key);
64 let env_vec: Vec<String> = input.env.iter().map(|(k, v)| format!("{k}={v}")).collect();
65
66 if !ctx.docker.image_exists(&image).await.unwrap_or(false) {
68 let docker = ctx.docker.clone();
69 let cancel = ctx.cancel.clone();
70 let img = image.clone();
71 let pull_fut = async move { docker.pull_image(&img).await };
72 tokio::select! {
73 result = pull_fut => result.with_context(|| format!("pull '{image}'"))?,
74 () = cancel.cancelled() => anyhow::bail!("cancelled during image pull"),
75 }
76 }
77
78 let cid = ctx
79 .docker
80 .start_long_lived(&image, &env_vec, &input.workdir, &container_name)
81 .await
82 .context("docker start failed")?;
83
84 let archive_bytes = ctx
88 .archives
89 .get_bytes(input.workspace_archive_id)
90 .ok_or_else(|| anyhow::anyhow!("source archive not found"))?;
91
92 let mkdir_cmd = vec!["mkdir".into(), "-p".into(), input.workdir.clone()];
93 let mut sink = tokio::io::sink();
94 ctx.docker
95 .exec_streaming(&cid, &mkdir_cmd, &env_vec, "/", &mut sink)
96 .await
97 .context("mkdir /workspace")?;
98
99 let tar_cmd = vec![
100 "tar".into(),
101 "-xzf".into(),
102 "-".into(),
103 "-C".into(),
104 input.workdir.clone(),
105 ];
106 ctx.docker
107 .exec_streaming_stdin(&cid, &tar_cmd, &env_vec, "/", &archive_bytes, &mut sink)
108 .await
109 .context("pipe source archive into container")?;
110
111 let result = run_in_container(ctx, &cid, &input, &env_vec, &plan).await;
112 ctx.docker.stop_remove(&cid).await;
113 result
114}
115
116async fn run_in_container(
117 ctx: &RunContext,
118 cid: &str,
119 input: &ExecutorInput,
120 env_vec: &[String],
121 plan: &DecisionPlan,
122) -> Result<StepResult> {
123 let mut writer = StepLogWriter::new(input.step_id, Arc::clone(&ctx.event_bus));
124 let docker = ctx.docker.clone();
125 let cancel = ctx.cancel.clone();
126 let cid_owned = cid.to_owned();
127 let cmd = vec!["sh".into(), "-c".into(), input.step.cmd.clone()];
128 let workdir = input.workdir.clone();
129 let env_owned = env_vec.to_vec();
130 let exec_fut = async move {
131 let rc = docker
132 .exec_streaming(&cid_owned, &cmd, &env_owned, &workdir, &mut writer)
133 .await?;
134 writer.flush_remaining();
135 Ok::<i64, anyhow::Error>(rc)
136 };
137
138 let rc = tokio::select! {
139 result = exec_fut => result.context("docker exec failed")?,
140 () = cancel.cancelled() => {
141 return Ok(StepResult {
142 exit_code: 130,
143 committed_snapshot: None,
144 artifacts: vec![],
145 });
146 }
147 };
148
149 #[allow(
150 clippy::cast_possible_truncation,
151 reason = "docker exit codes fit in i32"
152 )]
153 let exit_code = rc as i32;
154
155 let committed = if exit_code == 0 {
159 let target_tag = plan.commit_to.clone().unwrap_or_else(|| {
160 let safe: String = input
161 .step
162 .key
163 .chars()
164 .map(|c| {
165 if c.is_ascii_alphanumeric() || c == '_' || c == '-' {
166 c
167 } else {
168 '-'
169 }
170 })
171 .collect();
172 SnapshotRef::from(format!(
173 "harmont-local-ephemeral/{safe}:run-{}",
174 input.step_id.simple()
175 ))
176 });
177 match ctx
178 .docker
179 .commit_container(cid, &target_tag.to_string())
180 .await
181 {
182 Ok(_) => Some(target_tag),
183 Err(e) => {
184 tracing::warn!(step_key = %input.step.key, "docker commit failed, step still succeeded: {e:#}");
185 None
186 }
187 }
188 } else {
189 None
190 };
191
192 Ok(StepResult {
193 exit_code,
194 committed_snapshot: committed,
195 artifacts: vec![],
196 })
197}
198
/// What to do for a step, derived from its `CacheDecision` by `decision_plan`.
#[derive(Debug, Clone)]
struct DecisionPlan {
    /// Whether the step's command must actually run (false only for a cache hit).
    run_command: bool,
    /// Tag to commit the container to after a successful run, when the cache requested one.
    commit_to: Option<SnapshotRef>,
    /// Cached snapshot reported as the step's result when the command is skipped.
    hit_tag: Option<SnapshotRef>,
}
205
206fn decision_plan(decision: &CacheDecision) -> DecisionPlan {
207 match decision {
208 CacheDecision::Hit { tag } => DecisionPlan {
209 run_command: false,
210 commit_to: None,
211 hit_tag: Some(tag.clone()),
212 },
213 CacheDecision::MissBuildAs { tag } => DecisionPlan {
214 run_command: true,
215 commit_to: Some(tag.clone()),
216 hit_tag: None,
217 },
218 CacheDecision::MissNoCommit => DecisionPlan {
219 run_command: true,
220 commit_to: None,
221 hit_tag: None,
222 },
223 }
224}
225
/// Builds a container name `harmont-<run prefix>-<key>`.
///
/// The run prefix is the first 8 characters of `run_id`. In the step key, every character
/// other than ASCII alphanumerics, `_` and `-` is replaced by `-`, one replacement per
/// character.
fn sanitize_container_name(run_id: &str, step_key: &str) -> String {
    let is_allowed = |ch: char| ch.is_ascii_alphanumeric() || matches!(ch, '_' | '-');
    let run_short = run_id.chars().take(8).collect::<String>();
    let key = step_key
        .chars()
        .map(|ch| if is_allowed(ch) { ch } else { '-' })
        .collect::<String>();
    format!("harmont-{run_short}-{key}")
}
240
241struct StepLogWriter {
244 step_id: Uuid,
245 bus: Arc<EventBus>,
246 buf: Vec<u8>,
247}
248
249impl StepLogWriter {
250 fn new(step_id: Uuid, bus: Arc<EventBus>) -> Self {
251 Self {
252 step_id,
253 bus,
254 buf: Vec::with_capacity(8192),
255 }
256 }
257
258 fn flush_line(&self, line: &[u8]) {
259 self.bus.emit(BuildEvent::StepLog {
260 step_id: self.step_id,
261 stream: StdStream::Stdout,
262 line: String::from_utf8_lossy(line).into_owned(),
263 ts: chrono::Utc::now(),
264 });
265 }
266
267 fn flush_remaining(&mut self) {
268 if !self.buf.is_empty() {
269 let line = std::mem::take(&mut self.buf);
270 self.flush_line(&line);
271 }
272 }
273}
274
275impl tokio::io::AsyncWrite for StepLogWriter {
276 fn poll_write(
277 mut self: Pin<&mut Self>,
278 _cx: &mut std::task::Context<'_>,
279 buf: &[u8],
280 ) -> std::task::Poll<std::io::Result<usize>> {
281 let len = buf.len();
282 for b in buf {
283 if *b == b'\n' {
284 let line = std::mem::take(&mut self.buf);
285 self.flush_line(&line);
286 } else {
287 self.buf.push(*b);
288 }
289 }
290 std::task::Poll::Ready(Ok(len))
291 }
292
293 fn poll_flush(
294 self: Pin<&mut Self>,
295 _cx: &mut std::task::Context<'_>,
296 ) -> std::task::Poll<std::io::Result<()>> {
297 std::task::Poll::Ready(Ok(()))
298 }
299
300 fn poll_shutdown(
301 mut self: Pin<&mut Self>,
302 _cx: &mut std::task::Context<'_>,
303 ) -> std::task::Poll<std::io::Result<()>> {
304 self.flush_remaining();
305 std::task::Poll::Ready(Ok(()))
306 }
307}
308
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    use super::*;

    use hm_plugin_protocol::CacheDecision;

    /// Minimal `CommandStep` fixture (key `"k"`, command `"true"`, all optional fields
    /// `None`) whose only varying field is `image`.
    fn step_with_image(image: Option<&str>) -> CommandStep {
        CommandStep {
            key: "k".into(),
            label: None,
            cmd: "true".into(),
            image: image.map(String::from),
            env: None,
            timeout_seconds: None,
            cache: None,
            runner: None,
            runner_args: None,
        }
    }

    /// `ExecutorInput` fixture wrapping `step`: nil archive/run/step ids, empty env,
    /// `/workspace` workdir, a `MissNoCommit` cache decision and the given parent snapshot.
    fn make_input(step: CommandStep, parent_snapshot: Option<SnapshotRef>) -> ExecutorInput {
        ExecutorInput {
            step,
            workspace_archive_id: hm_plugin_protocol::ArchiveId::from(uuid::Uuid::nil()),
            env: std::collections::BTreeMap::new(),
            workdir: "/workspace".into(),
            run_id: uuid::Uuid::nil(),
            step_id: uuid::Uuid::nil(),
            cache_lookup: CacheDecision::MissNoCommit,
            parent_snapshot,
        }
    }

    // resolve_image precedence: parent snapshot, then step image, then "alpine:latest".

    /// With no parent snapshot, the step's explicit image is used.
    #[test]
    fn resolve_image_uses_step_image() {
        let s = step_with_image(Some("rust:1.82"));
        let input = make_input(s.clone(), None);
        assert_eq!(resolve_image(&s, &input), "rust:1.82");
    }

    /// With neither a parent snapshot nor a step image, the default image is used.
    #[test]
    fn resolve_image_fallback_alpine() {
        let s = step_with_image(None);
        let input = make_input(s.clone(), None);
        assert_eq!(resolve_image(&s, &input), "alpine:latest");
    }

    /// A parent snapshot wins over an explicit step image.
    #[test]
    fn resolve_image_prefers_parent_snapshot() {
        let s = step_with_image(Some("rust:1.82"));
        let snap = SnapshotRef::from("harmont-local-ephemeral/base:abc123".to_string());
        let input = make_input(s.clone(), Some(snap));
        assert_eq!(
            resolve_image(&s, &input),
            "harmont-local-ephemeral/base:abc123"
        );
    }

    // decision_plan: one test per CacheDecision variant.

    /// A cache hit skips the command and carries the cached tag through.
    #[test]
    fn decision_hit_skips_command() {
        let plan = decision_plan(&CacheDecision::Hit {
            tag: SnapshotRef("cached:v1".into()),
        });
        assert!(!plan.run_command);
        assert!(plan.commit_to.is_none());
        assert_eq!(plan.hit_tag.as_ref().unwrap().0, "cached:v1");
    }

    /// A miss with a build tag runs the command and plans a commit to that tag.
    #[test]
    fn decision_miss_build_as_runs_and_commits() {
        let plan = decision_plan(&CacheDecision::MissBuildAs {
            tag: SnapshotRef("build:v2".into()),
        });
        assert!(plan.run_command);
        assert_eq!(plan.commit_to.as_ref().unwrap().0, "build:v2");
        assert!(plan.hit_tag.is_none());
    }

    /// A plain miss runs the command with no planned commit target.
    #[test]
    fn decision_miss_no_commit() {
        let plan = decision_plan(&CacheDecision::MissNoCommit);
        assert!(plan.run_command);
        assert!(plan.commit_to.is_none());
        assert!(plan.hit_tag.is_none());
    }

    // sanitize_container_name: 8-char run prefix plus a key restricted to [A-Za-z0-9_-].

    /// '/', '.' and ':' in the key each become '-'.
    #[test]
    fn sanitize_container_name_replaces_special_chars() {
        let name = sanitize_container_name("abcdef12-3456-7890", "my/step.key:v1");
        assert_eq!(name, "harmont-abcdef12-my-step-key-v1");
    }

    /// Allowed characters pass through unchanged; the run id is truncated to 8 chars.
    #[test]
    fn sanitize_container_name_preserves_valid_chars() {
        let name = sanitize_container_name("run-id-1234", "normal_step-key");
        assert_eq!(name, "harmont-run-id-1-normal_step-key");
    }

    // StepLogWriter: newline-delimited emission onto the event bus.

    /// Each '\n' emits one StepLog event, in order, attributed to the writer's step id.
    #[tokio::test]
    async fn step_log_writer_emits_on_newline() {
        use tokio::io::AsyncWriteExt;

        let bus = EventBus::new();
        let mut rx = bus.subscribe();
        let step_id = Uuid::new_v4();

        let mut writer = StepLogWriter::new(step_id, bus);
        writer.write_all(b"hello\nworld\n").await.unwrap();

        let ev1 = rx.recv().await.unwrap();
        let ev2 = rx.recv().await.unwrap();

        match ev1 {
            BuildEvent::StepLog {
                step_id: sid, line, ..
            } => {
                assert_eq!(sid, step_id);
                assert_eq!(line, "hello");
            }
            other => panic!("expected StepLog, got {other:?}"),
        }
        match ev2 {
            BuildEvent::StepLog { line, .. } => assert_eq!(line, "world"),
            other => panic!("expected StepLog, got {other:?}"),
        }
    }

    /// Output without a trailing newline is only emitted on shutdown.
    #[tokio::test]
    async fn step_log_writer_flushes_remaining_on_shutdown() {
        use tokio::io::AsyncWriteExt;

        let bus = EventBus::new();
        let mut rx = bus.subscribe();
        let step_id = Uuid::new_v4();

        let mut writer = StepLogWriter::new(step_id, bus);
        writer.write_all(b"partial").await.unwrap();
        writer.shutdown().await.unwrap();

        let ev = rx.recv().await.unwrap();
        match ev {
            BuildEvent::StepLog { line, .. } => assert_eq!(line, "partial"),
            other => panic!("expected StepLog, got {other:?}"),
        }
    }
}