1use super::container_arch::default_container_cli_arch;
2use crate::EngineKind;
3use crate::model::ServiceSpec;
4use crate::naming::job_name_slug;
5use anyhow::{Context, Result, anyhow, bail};
6use serde::Deserialize;
7use sha2::{Digest, Sha256};
8use std::collections::{HashMap, HashSet};
9use std::env;
10use std::fmt::Write as FmtWrite;
11use std::process::{Command, Stdio};
12use std::thread;
13use std::time::{Duration, Instant};
14use tracing::warn;
15
/// Upper bound (in bytes) applied by `clamp_name` to generated network/container names.
const MAX_NAME_LEN: usize = 63;
/// Number of attempts `run_network_command` makes when the engine is `ContainerCli`.
const CONTAINER_NETWORK_RETRY_ATTEMPTS: usize = 8;
/// Base delay between network retries; multiplied by the 1-based attempt number.
const CONTAINER_NETWORK_RETRY_DELAY_MS: u64 = 750;
/// Fallback command timeout when `OPAL_CONTAINER_COMMAND_TIMEOUT_SECS` is unset or invalid.
const CONTAINER_COMMAND_TIMEOUT_DEFAULT_SECS: u64 = 10;
/// Fallback readiness timeout when `OPAL_SERVICE_READY_TIMEOUT_SECS` is unset or invalid.
const SERVICE_READY_TIMEOUT_DEFAULT_SECS: u64 = 30;
/// Sleep interval between readiness polls.
const SERVICE_READY_POLL_MS: u64 = 250;
22
/// Handle to the network and service containers started for one job.
///
/// Built by [`ServiceRuntime::start`]; call [`ServiceRuntime::cleanup`] to
/// remove the containers and the network again.
pub struct ServiceRuntime {
    /// Container engine whose CLI binary is invoked for every command.
    engine: EngineKind,
    /// Name of the network created for this job's services.
    network: String,
    /// Names of the service containers tracked for removal by `cleanup`.
    containers: Vec<String>,
    /// Link-style environment variables generated for services (only filled for `ContainerCli`).
    link_env: Vec<(String, String)>,
    /// Aliases already handed out, so that no alias is assigned to two services.
    claimed_aliases: HashSet<String>,
    /// `(alias, ip)` pairs for every service whose IPv4 address could be inspected.
    host_aliases: Vec<(String, String)>,
}
31
32impl ServiceRuntime {
33 pub fn start(
34 engine: EngineKind,
35 run_id: &str,
36 job_name: &str,
37 services: &[ServiceSpec],
38 base_env: &[(String, String)],
39 shared_env: &HashMap<String, String>,
40 ) -> Result<Option<Self>> {
41 if services.is_empty() {
42 return Ok(None);
43 }
44 service_supported(engine)?;
45 let clean_run_id = sanitize_identifier(run_id);
46 let network = clamp_name(&format!(
47 "opal-net-{}-{}",
48 clean_run_id,
49 job_name_slug(job_name)
50 ));
51 run_network_create(engine, &network)
52 .with_context(|| format!("failed to create network {}", network))?;
53
54 let mut runtime = ServiceRuntime {
55 engine,
56 network: network.clone(),
57 containers: Vec::new(),
58 link_env: Vec::new(),
59 claimed_aliases: HashSet::new(),
60 host_aliases: Vec::new(),
61 };
62
63 for (idx, service) in services.iter().enumerate() {
64 let aliases = runtime.aliases_for_service(idx, service)?;
65 let container_name = clamp_name(&format!(
66 "opal-svc-{}-{}-{:02}",
67 clean_run_id,
68 job_name_slug(job_name),
69 idx
70 ));
71 let ports = if matches!(engine, EngineKind::ContainerCli) {
72 match discover_container_ports(&service.image) {
73 Ok(list) => list,
74 Err(err) => {
75 warn!(
76 image = %service.image,
77 "failed to detect exposed ports for service: {err}"
78 );
79 Vec::new()
80 }
81 }
82 } else {
83 Vec::new()
84 };
85 if let Err(err) =
86 runtime.start_service(&container_name, service, &aliases, base_env, shared_env)
87 {
88 runtime.cleanup();
89 return Err(err);
90 }
91 if let Err(err) = runtime.wait_for_service_readiness(&container_name, service, &ports) {
92 runtime.cleanup();
93 return Err(err);
94 }
95 if let Some(ip) = inspect_service_ipv4(engine, &container_name) {
96 for alias in &aliases {
97 runtime.host_aliases.push((alias.clone(), ip.clone()));
98 }
99 }
100 if matches!(engine, EngineKind::ContainerCli) && !ports.is_empty() {
101 for alias in &aliases {
102 runtime
103 .link_env
104 .extend(build_service_env(alias, &container_name, &ports));
105 }
106 }
107 }
108
109 Ok(Some(runtime))
110 }
111
    /// Returns the name of the network created for this runtime.
    pub fn network_name(&self) -> &str {
        &self.network
    }
115
    /// Returns the names of the service containers currently tracked by this runtime.
    pub fn container_names(&self) -> &[String] {
        &self.containers
    }
119
120 pub fn cleanup(&mut self) {
121 for name in self.containers.drain(..).rev() {
122 let _ = Command::new(engine_binary(self.engine))
123 .arg("rm")
124 .arg("-f")
125 .arg(&name)
126 .status();
127 }
128 let _ = run_network_remove(self.engine, &self.network);
129 }
130
    /// Returns the link-style environment variables generated for the started services.
    pub fn link_env(&self) -> &[(String, String)] {
        &self.link_env
    }
134
    /// Returns the recorded `(alias, ip)` pairs of the started services.
    pub fn host_aliases(&self) -> &[(String, String)] {
        &self.host_aliases
    }
138
139 fn start_service(
140 &mut self,
141 container_name: &str,
142 service: &ServiceSpec,
143 aliases: &[String],
144 base_env: &[(String, String)],
145 _shared_env: &HashMap<String, String>,
146 ) -> Result<()> {
147 let mut command = service_command(self.engine, service);
148 command
149 .arg("-d")
150 .arg("--name")
151 .arg(container_name)
152 .arg("--network")
153 .arg(&self.network);
154 if !matches!(self.engine, EngineKind::ContainerCli) {
155 for alias in aliases {
156 command.arg("--network-alias").arg(alias);
157 }
158 }
159
160 let merged = merged_env(base_env, &service.variables);
161 for (key, value) in merged {
162 command.arg("--env").arg(format!("{key}={value}"));
163 }
164
165 if let Some(user) = &service.docker_user {
166 command.arg("--user").arg(user);
167 }
168
169 if !service.entrypoint.is_empty() {
170 command
171 .arg("--entrypoint")
172 .arg(service.entrypoint.join(" "));
173 }
174
175 command.arg(&service.image);
176
177 for arg in &service.command {
178 command.arg(arg);
179 }
180
181 if env::var("OPAL_DEBUG_CONTAINER")
182 .map(|v| v == "1")
183 .unwrap_or(false)
184 {
185 let program = command.get_program().to_string_lossy();
186 let args: Vec<String> = command
187 .get_args()
188 .map(|arg| arg.to_string_lossy().to_string())
189 .collect();
190 eprintln!("[opal] service command: {} {}", program, args.join(" "));
191 }
192
193 run_command_with_timeout(command, command_timeout(self.engine)).with_context(|| {
194 format!(
195 "failed to start service '{}' ({})",
196 container_name, service.image
197 )
198 })?;
199 self.containers.push(container_name.to_string());
200 Ok(())
201 }
202
203 fn aliases_for_service(&mut self, idx: usize, service: &ServiceSpec) -> Result<Vec<String>> {
204 let mut accepted = Vec::new();
205 if service.aliases.is_empty() {
206 for alias in default_service_aliases(&service.image) {
207 if self.claimed_aliases.insert(alias.clone()) {
208 accepted.push(alias);
209 }
210 }
211 } else {
212 for raw in service.aliases.clone() {
213 let alias = validate_service_alias(&raw)?;
214 if self.claimed_aliases.insert(alias.clone()) {
215 accepted.push(alias);
216 }
217 }
218 }
219
220 if accepted.is_empty() {
221 let fallback = validate_service_alias(&format!("svc-{idx}"))?;
222 self.claimed_aliases.insert(fallback.clone());
223 accepted.push(fallback);
224 }
225
226 Ok(accepted)
227 }
228
229 fn wait_for_service_readiness(
230 &self,
231 container_name: &str,
232 service: &ServiceSpec,
233 ports: &[ServicePort],
234 ) -> Result<()> {
235 let timeout = service_ready_timeout();
236 let started = Instant::now();
237 let mut confirmed_running_without_health = false;
238
239 loop {
240 let state = match inspect_service_state(self.engine, container_name) {
241 Ok(state) => state,
242 Err(err) => {
243 warn!(
244 service = container_name,
245 "failed to inspect service readiness ({err}); continuing without readiness gate"
246 );
247 return Ok(());
248 }
249 };
250
251 match readiness_from_state(&state) {
252 ServiceReadiness::Ready => {
253 if state.health.is_none() {
254 if !ports.is_empty() {
255 let Some(ip) = inspect_service_ipv4(self.engine, container_name) else {
256 if started.elapsed() >= timeout {
257 return Err(anyhow!(
258 "service '{}' ({}) did not expose a reachable IP within {}s",
259 container_name,
260 service.image,
261 timeout.as_secs()
262 ));
263 }
264 thread::sleep(Duration::from_millis(SERVICE_READY_POLL_MS));
265 continue;
266 };
267 if probe_service_ports(self.engine, &self.network, &ip, ports)? {
268 return Ok(());
269 }
270 if started.elapsed() >= timeout {
271 return Err(anyhow!(
272 "service '{}' ({}) did not accept connections on exposed ports within {}s",
273 container_name,
274 service.image,
275 timeout.as_secs()
276 ));
277 }
278 thread::sleep(Duration::from_millis(SERVICE_READY_POLL_MS));
279 continue;
280 }
281 if !confirmed_running_without_health {
282 confirmed_running_without_health = true;
283 thread::sleep(Duration::from_millis(SERVICE_READY_POLL_MS));
284 continue;
285 }
286 }
287 return Ok(());
288 }
289 ServiceReadiness::Waiting(detail) => {
290 confirmed_running_without_health = false;
291 if started.elapsed() >= timeout {
292 return Err(anyhow!(
293 "service '{}' ({}) did not become ready within {}s: {}",
294 container_name,
295 service.image,
296 timeout.as_secs(),
297 detail
298 ));
299 }
300 thread::sleep(Duration::from_millis(SERVICE_READY_POLL_MS));
301 }
302 ServiceReadiness::Failed(detail) => {
303 return Err(anyhow!(
304 "service '{}' ({}) failed readiness check: {}",
305 container_name,
306 service.image,
307 detail
308 ));
309 }
310 }
311 }
312 }
313}
314
315fn service_supported(engine: EngineKind) -> Result<()> {
316 if matches!(
317 engine,
318 EngineKind::Docker
319 | EngineKind::Orbstack
320 | EngineKind::Podman
321 | EngineKind::Nerdctl
322 | EngineKind::ContainerCli
323 ) {
324 Ok(())
325 } else {
326 Err(anyhow!(
327 "services are only supported when using docker, podman, nerdctl, orbstack, or container"
328 ))
329 }
330}
331
/// Maps an engine kind to the name of the CLI binary used to drive it.
fn engine_binary(engine: EngineKind) -> &'static str {
    match engine {
        // Orbstack is driven through the `docker` binary here.
        EngineKind::Docker | EngineKind::Orbstack => "docker",
        EngineKind::Podman => "podman",
        EngineKind::Nerdctl => "nerdctl",
        EngineKind::ContainerCli => "container",
    }
}
340
341fn service_command(engine: EngineKind, service: &ServiceSpec) -> Command {
342 let mut command = Command::new(engine_binary(engine));
343 command.arg("run");
344 if matches!(engine, EngineKind::ContainerCli) {
345 if let Some(arch) = default_container_cli_arch(service.docker_platform.as_deref()) {
346 command.arg("--arch").arg(arch);
347 }
348 } else if let Some(platform) = &service.docker_platform {
349 command.arg("--platform").arg(platform);
350 }
351 command
352}
353
/// Subset of a container's inspected state that readiness decisions are based on.
#[derive(Debug, Clone, PartialEq, Eq)]
struct ServiceState {
    /// Whether the container is running.
    running: bool,
    /// Lower-cased container status, when the inspect output provided one.
    status: Option<String>,
    /// Lower-cased health-check status, when the inspect output provided one.
    health: Option<String>,
    /// Exit code, when the inspect output provided one.
    exit_code: Option<i64>,
}
361
/// Outcome of evaluating a [`ServiceState`] for readiness.
#[derive(Debug, Clone, PartialEq, Eq)]
enum ServiceReadiness {
    /// The service can be used.
    Ready,
    /// The service is not ready yet; the string describes the observed state.
    Waiting(String),
    /// The service will not become ready; the string describes the observed state.
    Failed(String),
}
368
369fn service_ready_timeout() -> Duration {
370 env::var("OPAL_SERVICE_READY_TIMEOUT_SECS")
371 .ok()
372 .and_then(|raw| raw.parse::<u64>().ok())
373 .filter(|seconds| *seconds > 0)
374 .map(Duration::from_secs)
375 .unwrap_or_else(|| Duration::from_secs(SERVICE_READY_TIMEOUT_DEFAULT_SECS))
376}
377
378fn inspect_service_state(engine: EngineKind, container_name: &str) -> Result<ServiceState> {
379 let mut command = Command::new(engine_binary(engine));
380 command
381 .arg("inspect")
382 .arg("--format")
383 .arg("{{json .State}}")
384 .arg(container_name);
385 let output = command
386 .output()
387 .with_context(|| format!("failed to inspect service container '{}'", container_name))?;
388 if output.status.success() {
389 return parse_service_state(&output.stdout);
390 }
391
392 let mut fallback = Command::new(engine_binary(engine));
393 fallback.arg("inspect").arg(container_name);
394 let output = fallback
395 .output()
396 .with_context(|| format!("failed to inspect service container '{}'", container_name))?;
397 if !output.status.success() {
398 return Err(command_failed(
399 &fallback,
400 &output.stdout,
401 &output.stderr,
402 output.status.code(),
403 ));
404 }
405 parse_service_state(&output.stdout)
406}
407
408fn inspect_service_ipv4(engine: EngineKind, container_name: &str) -> Option<String> {
409 let mut command = Command::new(engine_binary(engine));
410 command.arg("inspect").arg(container_name);
411 let output = command.output().ok()?;
412 if !output.status.success() {
413 return None;
414 }
415 parse_service_ipv4(&output.stdout).ok().flatten()
416}
417
418fn parse_service_ipv4(payload: &[u8]) -> Result<Option<String>> {
419 let value: serde_json::Value = serde_json::from_slice(payload)
420 .context("failed to parse service inspect output as json")?;
421 let service = value
422 .as_array()
423 .and_then(|items| items.first())
424 .or_else(|| value.as_object().map(|_| &value))
425 .ok_or_else(|| anyhow!("service inspect output was not an object or array"))?;
426
427 if let Some(ip) = service
428 .get("networks")
429 .and_then(|networks| networks.as_array())
430 .and_then(|items| items.first())
431 .and_then(|network| network.get("ipv4Address"))
432 .and_then(|value| value.as_str())
433 {
434 return Ok(ip.split('/').next().map(str::to_string));
435 }
436
437 if let Some(networks) = service
438 .get("NetworkSettings")
439 .and_then(|settings| settings.get("Networks"))
440 .and_then(|networks| networks.as_object())
441 {
442 for network in networks.values() {
443 if let Some(ip) = network.get("IPAddress").and_then(|value| value.as_str())
444 && !ip.is_empty()
445 {
446 return Ok(Some(ip.to_string()));
447 }
448 }
449 }
450
451 Ok(None)
452}
453
454fn probe_service_ports(
455 engine: EngineKind,
456 network: &str,
457 host: &str,
458 ports: &[ServicePort],
459) -> Result<bool> {
460 if ports.is_empty() {
461 return Ok(true);
462 }
463
464 let checks = ports
465 .iter()
466 .filter(|port| port.proto == "tcp")
467 .map(|port| format!("nc -z {} {}", shell_escape(host), port.port))
468 .collect::<Vec<_>>();
469 if checks.is_empty() {
470 return Ok(true);
471 }
472
473 let mut command = Command::new(engine_binary(engine));
474 command.arg("run").arg("--rm").arg("--network").arg(network);
475 if matches!(engine, EngineKind::ContainerCli)
476 && let Some(arch) = default_container_cli_arch(None)
477 {
478 command.arg("--arch").arg(arch);
479 }
480 let script = checks.join(" && ");
481 let status = command
482 .arg("docker.io/library/alpine:3.19")
483 .arg("sh")
484 .arg("-lc")
485 .arg(script)
486 .status()
487 .with_context(|| "failed to run service connectivity probe")?;
488 Ok(status.success())
489}
490
/// Wraps `value` in single quotes for use in a POSIX shell command line.
///
/// Each embedded single quote is replaced by `'"'"'` (close quote, a
/// double-quoted single quote, reopen quote).
fn shell_escape(value: &str) -> String {
    let mut quoted = String::with_capacity(value.len() + 2);
    quoted.push('\'');
    for ch in value.chars() {
        if ch == '\'' {
            quoted.push_str("'\"'\"'");
        } else {
            quoted.push(ch);
        }
    }
    quoted.push('\'');
    quoted
}
494
495fn parse_service_state(payload: &[u8]) -> Result<ServiceState> {
496 let value: serde_json::Value = serde_json::from_slice(payload)
497 .context("failed to parse service inspect output as json")?;
498 let service = value
499 .as_array()
500 .and_then(|items| items.first())
501 .or_else(|| value.as_object().map(|_| &value))
502 .ok_or_else(|| anyhow!("service inspect output was not an object or array"))?;
503
504 let state = if let Some(state) = service.get("State") {
505 state
506 } else if service.get("Running").is_some()
507 || service.get("Status").is_some()
508 || service.get("status").is_some()
509 {
510 service
511 } else {
512 return Err(anyhow!("service inspect output missing State field"));
513 };
514
515 let running = state
516 .get("Running")
517 .and_then(|v| v.as_bool())
518 .or_else(|| state.get("running").and_then(|v| v.as_bool()))
519 .unwrap_or_else(|| {
520 state
521 .get("Status")
522 .and_then(|v| v.as_str())
523 .or_else(|| state.get("status").and_then(|v| v.as_str()))
524 .is_some_and(|status| status.eq_ignore_ascii_case("running"))
525 });
526 let status = state
527 .get("Status")
528 .and_then(|v| v.as_str())
529 .or_else(|| state.get("status").and_then(|v| v.as_str()))
530 .map(|s| s.to_ascii_lowercase());
531 let health = state
532 .get("Health")
533 .and_then(|health| health.get("Status"))
534 .and_then(|status| status.as_str())
535 .or_else(|| {
536 state
537 .get("health")
538 .and_then(|health| health.get("status"))
539 .and_then(|status| status.as_str())
540 })
541 .map(|s| s.to_ascii_lowercase());
542 let exit_code = state
543 .get("ExitCode")
544 .and_then(|v| v.as_i64())
545 .or_else(|| state.get("exitCode").and_then(|v| v.as_i64()));
546
547 Ok(ServiceState {
548 running,
549 status,
550 health,
551 exit_code,
552 })
553}
554
555fn readiness_from_state(state: &ServiceState) -> ServiceReadiness {
556 if !state.running {
557 if matches!(state.status.as_deref(), Some("exited" | "dead" | "stopped"))
558 || state.exit_code.is_some_and(|code| code != 0)
559 {
560 return ServiceReadiness::Failed(format!(
561 "status={}, running=false, exit_code={}",
562 state.status.as_deref().unwrap_or("unknown"),
563 state
564 .exit_code
565 .map(|code| code.to_string())
566 .unwrap_or_else(|| "unknown".to_string())
567 ));
568 }
569 return ServiceReadiness::Waiting(format!(
570 "status={}, running=false",
571 state.status.as_deref().unwrap_or("unknown")
572 ));
573 }
574
575 match state.health.as_deref() {
576 Some("healthy") => ServiceReadiness::Ready,
577 Some("unhealthy") => ServiceReadiness::Failed("health=unhealthy".to_string()),
578 Some(status) => ServiceReadiness::Waiting(format!("health={status}")),
579 None => ServiceReadiness::Ready,
580 }
581}
582
/// Strips everything except ASCII letters and digits from `input`.
///
/// Returns `"opal"` when nothing is left after filtering.
fn sanitize_identifier(input: &str) -> String {
    let mut cleaned = String::with_capacity(input.len());
    cleaned.extend(input.chars().filter(char::is_ascii_alphanumeric));
    if cleaned.is_empty() {
        cleaned.push_str("opal");
    }
    cleaned
}
594
595fn clamp_name(base: &str) -> String {
596 if base.len() <= MAX_NAME_LEN {
597 return base.to_string();
598 }
599 let mut hasher = Sha256::new();
600 hasher.update(base.as_bytes());
601 let digest = hasher.finalize();
602 let mut suffix = String::with_capacity(8);
603 for byte in digest.iter().take(4) {
604 let _ = FmtWrite::write_fmt(&mut suffix, format_args!("{:02x}", byte));
605 }
606 let prefix_len = MAX_NAME_LEN.saturating_sub(suffix.len() + 1);
607 let prefix: String = base.chars().take(prefix_len).collect();
608 format!("{prefix}-{suffix}")
609}
610
611fn validate_service_alias(alias: &str) -> Result<String> {
612 let normalized = alias.trim().to_ascii_lowercase();
613 if normalized.is_empty() {
614 bail!("service alias must not be empty");
615 }
616 if normalized.starts_with('-') || normalized.ends_with('-') {
617 bail!("service alias '{}' must not start or end with '-'", alias);
618 }
619 if !normalized
620 .chars()
621 .all(|ch| ch.is_ascii_lowercase() || ch.is_ascii_digit() || ch == '-')
622 {
623 bail!(
624 "service alias '{}' contains unsupported characters; use lowercase letters, digits, or '-'",
625 alias
626 );
627 }
628 Ok(normalized)
629}
630
/// A port exposed by a service image.
#[derive(Debug, Clone)]
struct ServicePort {
    /// Port number.
    port: u16,
    /// Lower-cased protocol name such as `tcp`.
    proto: String,
}
636
637fn discover_container_ports(image: &str) -> Result<Vec<ServicePort>> {
638 let output = Command::new("container")
639 .arg("image")
640 .arg("inspect")
641 .arg(image)
642 .output()
643 .context("failed to inspect container image")?;
644 if !output.status.success() {
645 return Ok(Vec::new());
646 }
647 let infos: Vec<ContainerImageInspect> = serde_json::from_slice(&output.stdout)?;
648 let mut ports = Vec::new();
649 let mut seen = HashSet::new();
650 for info in infos {
652 for variant in info.variants {
653 for entry in variant.config.history {
654 if let Some(cmd) = entry.created_by
655 && let Some(idx) = cmd.find("EXPOSE map[")
656 {
657 let rest = &cmd[idx + "EXPOSE map[".len()..];
658 if let Some(end) = rest.find(']') {
659 let map = &rest[..end];
660 for token in map.split_whitespace() {
661 let cleaned = token.trim_matches(|c| c == ',' || c == '{' || c == '}');
662 if cleaned.is_empty() {
663 continue;
664 }
665 let mut parts = cleaned.split('/');
666 let port_str = parts.next().unwrap_or("");
667 let proto_part = parts.next().unwrap_or("tcp");
668 if let Ok(port) = port_str.parse::<u16>() {
669 let proto = proto_part
670 .split(':')
671 .next()
672 .unwrap_or("tcp")
673 .to_ascii_lowercase();
674 if seen.insert((port, proto.clone())) {
675 ports.push(ServicePort { port, proto });
676 }
677 }
678 }
679 }
680 }
681 }
682 }
683 }
684 Ok(ports)
685}
686
687fn build_service_env(alias: &str, host: &str, ports: &[ServicePort]) -> Vec<(String, String)> {
688 if ports.is_empty() {
689 return Vec::new();
690 }
691 let mut envs = Vec::new();
692 let alias_key: String = alias
693 .chars()
694 .map(|c| {
695 if c.is_ascii_alphanumeric() {
696 c.to_ascii_uppercase()
697 } else {
698 '_'
699 }
700 })
701 .collect();
702 let primary = &ports[0];
703 envs.push((
704 format!("{}_PORT", alias_key),
705 format!("{}://{}:{}", primary.proto, host, primary.port),
706 ));
707 for port in ports {
708 let proto_upper = port.proto.to_ascii_uppercase();
709 let proto_lower = port.proto.to_ascii_lowercase();
710 let base = format!("{}_PORT_{}_{}", alias_key, port.port, proto_upper);
711 envs.push((
712 base.clone(),
713 format!("{}://{}:{}", proto_lower, host, port.port),
714 ));
715 envs.push((format!("{}_ADDR", base), host.to_string()));
716 envs.push((format!("{}_PORT", base), port.port.to_string()));
717 envs.push((format!("{}_PROTO", base), proto_lower));
718 }
719 envs
720}
721
/// One element of the JSON array deserialized from `container image inspect` output.
#[derive(Deserialize)]
struct ContainerImageInspect {
    /// Variants listed for the image.
    variants: Vec<ContainerVariant>,
}
726
/// A single image variant within the inspect output.
#[derive(Deserialize)]
struct ContainerVariant {
    /// Configuration section of the variant.
    config: VariantConfig,
}
731
/// Configuration section of an image variant; only the history is read.
#[derive(Deserialize)]
struct VariantConfig {
    /// History entries recorded for the image.
    history: Vec<HistoryEntry>,
}
736
/// One history entry of an image variant.
#[derive(Deserialize)]
struct HistoryEntry {
    /// Command text recorded for this entry, if any; scanned for `EXPOSE map[...]`.
    #[serde(rename = "created_by")]
    created_by: Option<String>,
}
742
743fn run_command(mut cmd: Command) -> Result<()> {
744 let output = cmd.output()?;
745 if output.status.success() {
746 Ok(())
747 } else {
748 Err(command_failed(
749 &cmd,
750 &output.stdout,
751 &output.stderr,
752 output.status.code(),
753 ))
754 }
755}
756
757fn run_command_with_timeout(mut cmd: Command, timeout: Option<Duration>) -> Result<()> {
758 let Some(timeout) = timeout else {
759 return run_command(cmd);
760 };
761 cmd.stdout(Stdio::piped()).stderr(Stdio::piped());
762 let mut child = cmd.spawn()?;
763 let started = Instant::now();
764
765 loop {
766 if child.try_wait()?.is_some() {
767 let output = child.wait_with_output()?;
768 if output.status.success() {
769 return Ok(());
770 }
771 return Err(command_failed(
772 &cmd,
773 &output.stdout,
774 &output.stderr,
775 output.status.code(),
776 ));
777 }
778
779 if started.elapsed() >= timeout {
780 let _ = child.kill();
781 let output = child.wait_with_output().ok();
782 let (stdout, stderr, code) = if let Some(output) = output {
783 (output.stdout, output.stderr, output.status.code())
784 } else {
785 (Vec::new(), Vec::new(), None)
786 };
787 return Err(anyhow!(
788 "command {:?} timed out after {}s{}",
789 &cmd,
790 timeout.as_secs(),
791 command_failed_detail(&stdout, &stderr, code)
792 ));
793 }
794
795 thread::sleep(Duration::from_millis(100));
796 }
797}
798
799fn command_timeout(engine: EngineKind) -> Option<Duration> {
800 if matches!(engine, EngineKind::ContainerCli) {
801 Some(container_command_timeout())
802 } else {
803 None
804 }
805}
806
807fn container_command_timeout() -> Duration {
808 env::var("OPAL_CONTAINER_COMMAND_TIMEOUT_SECS")
809 .ok()
810 .and_then(|raw| raw.parse::<u64>().ok())
811 .filter(|seconds| *seconds > 0)
812 .map(Duration::from_secs)
813 .unwrap_or_else(|| Duration::from_secs(CONTAINER_COMMAND_TIMEOUT_DEFAULT_SECS))
814}
815
816fn command_failed(cmd: &Command, stdout: &[u8], stderr: &[u8], code: Option<i32>) -> anyhow::Error {
817 anyhow!(
818 "command {:?} exited with status {:?}{}",
819 cmd,
820 code,
821 command_failed_detail(stdout, stderr, code)
822 )
823}
824
/// Formats captured command output as a suffix for error messages.
///
/// Both streams are decoded lossily and trimmed. Returns an empty string when
/// both are empty, `": <text>"` when only one has content, and
/// `": stdout=<out>; stderr=<err>"` when both do. `_code` is unused.
fn command_failed_detail(stdout: &[u8], stderr: &[u8], _code: Option<i32>) -> String {
    let out_text = String::from_utf8_lossy(stdout);
    let err_text = String::from_utf8_lossy(stderr);
    let out = out_text.trim();
    let err = err_text.trim();

    if out.is_empty() && err.is_empty() {
        String::new()
    } else if err.is_empty() {
        format!(": {out}")
    } else if out.is_empty() {
        format!(": {err}")
    } else {
        format!(": stdout={out}; stderr={err}")
    }
}
835
/// Creates `network` by running the engine's `network create` command.
fn run_network_create(engine: EngineKind, network: &str) -> Result<()> {
    run_network_command(engine, "create", network)
}
839
/// Removes `network` by running the engine's `network rm` command.
fn run_network_remove(engine: EngineKind, network: &str) -> Result<()> {
    run_network_command(engine, "rm", network)
}
843
844fn run_network_command(engine: EngineKind, action: &str, network: &str) -> Result<()> {
845 let attempts = if matches!(engine, EngineKind::ContainerCli) {
846 CONTAINER_NETWORK_RETRY_ATTEMPTS
847 } else {
848 1
849 };
850
851 let mut last_error = None;
852 for attempt in 0..attempts {
853 let mut command = Command::new(engine_binary(engine));
854 command.arg("network").arg(action).arg(network);
855 match run_command_with_timeout(command, command_timeout(engine)) {
856 Ok(()) => return Ok(()),
857 Err(err) => {
858 if matches!(engine, EngineKind::ContainerCli)
859 && should_retry_container_network_error(&err.to_string())
860 && attempt + 1 < attempts
861 {
862 warn!(
863 network,
864 action,
865 attempt = attempt + 1,
866 "container network command timed out; retrying"
867 );
868 thread::sleep(Duration::from_millis(
869 CONTAINER_NETWORK_RETRY_DELAY_MS * (attempt + 1) as u64,
870 ));
871 last_error = Some(err);
872 continue;
873 }
874 return Err(err);
875 }
876 }
877 }
878
879 Err(last_error.unwrap_or_else(|| anyhow!("network command failed without an error")))
880}
881
/// Reports whether a `container` network error message looks transient.
///
/// A message is retryable when it contains any of the known markers below.
fn should_retry_container_network_error(message: &str) -> bool {
    const RETRYABLE_MARKERS: [&str; 4] = [
        "XPC timeout for request to com.apple.container.apiserver/networkCreate",
        "XPC timeout for request to com.apple.container.apiserver/networkDelete",
        "Connection invalid",
        "apiserver",
    ];
    RETRYABLE_MARKERS
        .iter()
        .any(|marker| message.contains(*marker))
}
889
890fn merged_env(
891 base: &[(String, String)],
892 overrides: &HashMap<String, String>,
893) -> Vec<(String, String)> {
894 let lookup: HashMap<String, String> = base.iter().cloned().collect();
895 let mut env = base.to_vec();
896 for (_, value) in &mut env {
897 *value = crate::env::expand_value(value, &lookup);
898 }
899 let mut map: HashMap<String, String> = env.into_iter().collect();
900 for (key, value) in overrides {
901 map.insert(key.clone(), value.clone());
902 }
903 map.into_iter().collect()
904}
905
/// Derives the default aliases for a service from its image reference.
///
/// Everything from the first `:` onwards is dropped. The primary alias
/// replaces `/` with `__`, the secondary replaces `/` with `-`; the secondary
/// is omitted when identical to the primary. An empty image name yields the
/// single alias `service`.
fn default_service_aliases(image: &str) -> Vec<String> {
    let name = match image.find(':') {
        Some(colon) => &image[..colon],
        None => image,
    };
    if name.is_empty() {
        return vec!["service".to_string()];
    }

    let primary = name.replace('/', "__");
    let secondary = name.replace('/', "-");
    if primary == secondary {
        vec![primary]
    } else {
        vec![primary, secondary]
    }
}
922
923#[cfg(test)]
924mod tests {
925 use super::{
926 ServiceReadiness, ServiceRuntime, ServiceState, parse_service_ipv4, parse_service_state,
927 readiness_from_state, service_command, should_retry_container_network_error,
928 };
929 use crate::EngineKind;
930 use crate::model::ServiceSpec;
931 use std::collections::HashMap;
932 use std::process::Command;
933 use std::time::Duration;
934
935 #[test]
936 fn retries_container_network_xpc_timeouts() {
937 assert!(should_retry_container_network_error(
938 "XPC timeout for request to com.apple.container.apiserver/networkCreate"
939 ));
940 assert!(should_retry_container_network_error(
941 "XPC timeout for request to com.apple.container.apiserver/networkDelete"
942 ));
943 assert!(!should_retry_container_network_error(
944 "cannot delete subnet with referring containers"
945 ));
946 }
947
948 #[test]
949 fn parse_service_state_reads_running_and_health_status() {
950 let payload = br#"[{"State":{"Running":true,"Status":"running","ExitCode":0,"Health":{"Status":"starting"}}}]"#;
951 let state = parse_service_state(payload).expect("parse service state");
952
953 assert!(state.running);
954 assert_eq!(state.status.as_deref(), Some("running"));
955 assert_eq!(state.health.as_deref(), Some("starting"));
956 assert_eq!(state.exit_code, Some(0));
957 }
958
959 #[test]
960 fn parse_service_state_accepts_direct_state_object() {
961 let payload = br#"{"Running":false,"Status":"exited","ExitCode":1}"#;
962 let state = parse_service_state(payload).expect("parse service state");
963
964 assert!(!state.running);
965 assert_eq!(state.status.as_deref(), Some("exited"));
966 assert_eq!(state.exit_code, Some(1));
967 }
968
969 #[test]
970 fn parse_service_state_accepts_container_cli_shape() {
971 let payload = br#"[{"status":"exited","exitCode":1}]"#;
972 let state = parse_service_state(payload).expect("parse service state");
973
974 assert!(!state.running);
975 assert_eq!(state.status.as_deref(), Some("exited"));
976 assert_eq!(state.exit_code, Some(1));
977 }
978
979 #[test]
980 fn readiness_from_state_is_ready_without_healthcheck() {
981 let state = ServiceState {
982 running: true,
983 status: Some("running".to_string()),
984 health: None,
985 exit_code: Some(0),
986 };
987
988 assert!(matches!(
989 readiness_from_state(&state),
990 ServiceReadiness::Ready
991 ));
992 }
993
994 #[test]
995 fn readiness_from_state_waits_while_healthcheck_is_starting() {
996 let state = ServiceState {
997 running: true,
998 status: Some("running".to_string()),
999 health: Some("starting".to_string()),
1000 exit_code: Some(0),
1001 };
1002
1003 match readiness_from_state(&state) {
1004 ServiceReadiness::Waiting(detail) => assert!(detail.contains("starting")),
1005 other => panic!("expected waiting readiness, got {other:?}"),
1006 }
1007 }
1008
1009 #[test]
1010 fn readiness_from_state_fails_when_service_exits() {
1011 let state = ServiceState {
1012 running: false,
1013 status: Some("exited".to_string()),
1014 health: None,
1015 exit_code: Some(1),
1016 };
1017
1018 match readiness_from_state(&state) {
1019 ServiceReadiness::Failed(detail) => assert!(detail.contains("exit_code=1")),
1020 other => panic!("expected failed readiness, got {other:?}"),
1021 }
1022 }
1023
1024 #[test]
1025 fn readiness_from_state_fails_when_healthcheck_unhealthy() {
1026 let state = ServiceState {
1027 running: true,
1028 status: Some("running".to_string()),
1029 health: Some("unhealthy".to_string()),
1030 exit_code: Some(0),
1031 };
1032
1033 match readiness_from_state(&state) {
1034 ServiceReadiness::Failed(detail) => assert!(detail.contains("unhealthy")),
1035 other => panic!("expected failed readiness, got {other:?}"),
1036 }
1037 }
1038
1039 #[test]
1040 fn aliases_for_service_preserves_multiple_unique_aliases() {
1041 let mut runtime = ServiceRuntime {
1042 engine: EngineKind::Docker,
1043 network: "net".into(),
1044 containers: Vec::new(),
1045 link_env: Vec::new(),
1046 claimed_aliases: Default::default(),
1047 host_aliases: Vec::new(),
1048 };
1049 let service = ServiceSpec {
1050 image: "redis:7".into(),
1051 aliases: vec!["cache".into(), "redis".into()],
1052 docker_platform: None,
1053 docker_user: None,
1054 entrypoint: Vec::new(),
1055 command: Vec::new(),
1056 variables: HashMap::new(),
1057 };
1058
1059 let aliases = runtime
1060 .aliases_for_service(0, &service)
1061 .expect("aliases resolve");
1062
1063 assert_eq!(aliases, vec!["cache", "redis"]);
1064 }
1065
1066 #[test]
1067 fn aliases_for_service_uses_gitlab_style_default_aliases() {
1068 let mut runtime = ServiceRuntime {
1069 engine: EngineKind::Docker,
1070 network: "net".into(),
1071 containers: Vec::new(),
1072 link_env: Vec::new(),
1073 claimed_aliases: Default::default(),
1074 host_aliases: Vec::new(),
1075 };
1076 let service = ServiceSpec {
1077 image: "tutum/wordpress:latest".into(),
1078 aliases: Vec::new(),
1079 docker_platform: None,
1080 docker_user: None,
1081 entrypoint: Vec::new(),
1082 command: Vec::new(),
1083 variables: HashMap::new(),
1084 };
1085
1086 let aliases = runtime
1087 .aliases_for_service(0, &service)
1088 .expect("aliases resolve");
1089
1090 assert_eq!(aliases, vec!["tutum__wordpress", "tutum-wordpress"]);
1091 }
1092
/// Two services request the same explicit alias `cache`: the first one keeps
/// it, the second one ends up with the fallback name `svc-1`.
#[test]
fn aliases_for_service_falls_back_when_aliases_conflict() {
    // Both specs are identical apart from the image, so build them from one place.
    fn cache_aliased(image: &'static str) -> ServiceSpec {
        ServiceSpec {
            image: image.into(),
            aliases: vec!["cache".into()],
            entrypoint: vec![],
            command: vec![],
            variables: Default::default(),
            docker_platform: None,
            docker_user: None,
        }
    }

    let redis = cache_aliased("redis:7");
    let postgres = cache_aliased("postgres:16");

    // The same runtime is reused so that the first claim is visible to the second call.
    let mut shared = ServiceRuntime {
        engine: EngineKind::Docker,
        network: "net".into(),
        claimed_aliases: Default::default(),
        containers: vec![],
        link_env: vec![],
        host_aliases: vec![],
    };

    let first_claim = shared.aliases_for_service(0, &redis).unwrap();
    assert_eq!(first_claim, vec!["cache"]);

    let second_claim = shared.aliases_for_service(1, &postgres).unwrap();
    assert_eq!(second_claim, vec!["svc-1"]);
}
1131
/// Two alias-less services share the image `tutum/wordpress:latest`: the first
/// gets both image-derived names, the second ends up with the fallback name
/// `svc-1`.
#[test]
fn aliases_for_service_falls_back_after_default_aliases_conflict() {
    // Both services use exactly the same alias-less spec.
    fn wordpress() -> ServiceSpec {
        ServiceSpec {
            image: "tutum/wordpress:latest".into(),
            aliases: vec![],
            entrypoint: vec![],
            command: vec![],
            variables: Default::default(),
            docker_platform: None,
            docker_user: None,
        }
    }

    let original = wordpress();
    let duplicate = wordpress();

    // The same runtime is reused so that the first claim is visible to the second call.
    let mut shared = ServiceRuntime {
        engine: EngineKind::Docker,
        network: "net".into(),
        claimed_aliases: Default::default(),
        containers: vec![],
        link_env: vec![],
        host_aliases: vec![],
    };

    let first_claim = shared.aliases_for_service(0, &original).unwrap();
    assert_eq!(first_claim, vec!["tutum__wordpress", "tutum-wordpress"]);

    let second_claim = shared.aliases_for_service(1, &duplicate).unwrap();
    assert_eq!(second_claim, vec!["svc-1"]);
}
1170
/// An explicit alias of `bad_alias` is refused, and the error text mentions
/// "unsupported characters".
#[test]
fn aliases_for_service_rejects_invalid_aliases() {
    let offending = ServiceSpec {
        image: "redis:7".into(),
        // The alias under test.
        aliases: vec!["bad_alias".into()],
        entrypoint: vec![],
        command: vec![],
        variables: Default::default(),
        docker_platform: None,
        docker_user: None,
    };
    let mut fresh = ServiceRuntime {
        engine: EngineKind::Docker,
        network: "net".into(),
        claimed_aliases: Default::default(),
        containers: vec![],
        link_env: vec![],
        host_aliases: vec![],
    };

    let failure = fresh
        .aliases_for_service(0, &offending)
        .expect_err("alias must error");

    let message = failure.to_string();
    assert!(message.contains("unsupported characters"));
}
1196
/// For the Docker engine, a `docker_platform` of `linux/arm64/v8` must show up
/// as `--platform linux/arm64/v8` in the command built by `service_command`.
#[test]
fn service_command_for_docker_forwards_platform_and_user() {
    let spec = ServiceSpec {
        image: "redis:7".into(),
        aliases: vec!["cache".into()],
        docker_platform: Some("linux/arm64/v8".into()),
        docker_user: Some("1000:1000".into()),
        entrypoint: vec![],
        command: vec![],
        variables: Default::default(),
    };

    let mut docker_cmd = service_command(EngineKind::Docker, &spec);
    // NOTE: `--user` is appended here by the test itself, so the `--user`
    // assertion below only confirms the pair is present in the final argv.
    let user = spec.docker_user.as_deref().unwrap();
    docker_cmd.arg("--user");
    docker_cmd.arg(user);

    let argv: Vec<String> = docker_cmd
        .get_args()
        .map(|piece| piece.to_string_lossy().into_owned())
        .collect();

    // True when `flag` is immediately followed by `value` in the argument list.
    let has_pair = |flag: &str, value: &str| {
        argv.windows(2)
            .any(|window| window[0] == flag && window[1] == value)
    };

    assert!(has_pair("--platform", "linux/arm64/v8"));
    assert!(has_pair("--user", "1000:1000"));
}
1224
/// For the container CLI engine, a `docker_platform` of `linux/amd64` must be
/// rendered as `--arch x86_64` in the command built by `service_command`.
#[test]
fn service_command_for_container_cli_translates_platform_to_arch() {
    let spec = ServiceSpec {
        image: "redis:7".into(),
        aliases: vec!["cache".into()],
        docker_platform: Some("linux/amd64".into()),
        docker_user: Some("1000:1000".into()),
        entrypoint: vec![],
        command: vec![],
        variables: Default::default(),
    };

    let mut cli_cmd = service_command(EngineKind::ContainerCli, &spec);
    // NOTE: `--user` is appended here by the test itself, so the `--user`
    // assertion below only confirms the pair is present in the final argv.
    let user = spec.docker_user.as_deref().unwrap();
    cli_cmd.arg("--user");
    cli_cmd.arg(user);

    let argv: Vec<String> = cli_cmd
        .get_args()
        .map(|piece| piece.to_string_lossy().into_owned())
        .collect();

    // True when `flag` is immediately followed by `value` in the argument list.
    let has_pair = |flag: &str, value: &str| {
        argv.windows(2)
            .any(|window| window[0] == flag && window[1] == value)
    };

    assert!(has_pair("--arch", "x86_64"));
    assert!(has_pair("--user", "1000:1000"));
}
1249
/// A command that outlives its timeout must surface an error mentioning
/// "timed out".
///
/// Runs `sleep 1` through `sh` with a 50 ms limit. The shell is started with
/// plain `-c`: `-l` is not a POSIX `sh` option, and a login shell would source
/// the invoking user's profile, making the outcome depend on the host setup.
#[test]
fn run_command_with_timeout_fails_fast() {
    let mut command = Command::new("sh");
    command.arg("-c").arg("sleep 1");

    let err = super::run_command_with_timeout(command, Some(Duration::from_millis(50)))
        .expect_err("command should time out");

    assert!(err.to_string().contains("timed out"));
}
1260
/// A payload with a top-level `networks[].ipv4Address` entry is parsed, and the
/// `/24` prefix length is dropped from the returned address.
#[test]
fn parse_service_ipv4_accepts_container_cli_shape() {
    let raw = br#"[{"networks":[{"ipv4Address":"192.168.64.57/24"}]}]"#;

    let parsed = parse_service_ipv4(raw).unwrap();

    assert_eq!(parsed, Some("192.168.64.57".into()));
}
1269
/// A payload that nests the address under
/// `NetworkSettings.Networks.<name>.IPAddress` is parsed into that address.
#[test]
fn parse_service_ipv4_accepts_docker_shape() {
    let raw = br#"[{"NetworkSettings":{"Networks":{"opal":{"IPAddress":"172.18.0.2"}}}}]"#;

    let parsed = parse_service_ipv4(raw).unwrap();

    assert_eq!(parsed, Some("172.18.0.2".into()));
}
1278
/// Service variables override base entries with the same key, and a value such
/// as `$BASE-world` is passed through literally rather than being expanded.
#[test]
fn merged_env_does_not_expand_service_only_variables() {
    // Service-level variables: one shadows the base entry, one references it.
    let mut service_vars = HashMap::new();
    service_vars.insert("BASE".into(), "override".into());
    service_vars.insert("SERVICE_ONLY".into(), "$BASE-world".into());

    let merged = super::merged_env(&[("BASE".into(), "hello".into())], &service_vars);

    // Index the merged pairs by key so each variable can be looked up directly.
    let by_key = merged.into_iter().collect::<HashMap<_, _>>();

    assert_eq!(by_key.get("BASE").map(String::as_str), Some("override"));
    assert_eq!(
        by_key.get("SERVICE_ONLY").map(String::as_str),
        Some("$BASE-world")
    );
}
1296}