Skip to main content

opal/executor/
services.rs

1use super::container_arch::default_container_cli_arch;
2use crate::EngineKind;
3use crate::model::ServiceSpec;
4use crate::naming::job_name_slug;
5use anyhow::{Context, Result, anyhow, bail};
6use serde::Deserialize;
7use sha2::{Digest, Sha256};
8use std::collections::{HashMap, HashSet};
9use std::env;
10use std::fmt::Write as FmtWrite;
11use std::process::{Command, Stdio};
12use std::thread;
13use std::time::{Duration, Instant};
14use tracing::warn;
15
/// Longest generated network/container name; longer names are truncated and
/// suffixed with a short hash by `clamp_name`.
const MAX_NAME_LEN: usize = 63;

/// How many times a transient `container` CLI network create/remove failure is tried.
const CONTAINER_NETWORK_RETRY_ATTEMPTS: usize = 8;
/// Base delay between those tries; multiplied by the attempt number.
const CONTAINER_NETWORK_RETRY_DELAY_MS: u64 = 750;

/// Default per-command timeout for `container` CLI invocations
/// (overridable through `OPAL_CONTAINER_COMMAND_TIMEOUT_SECS`).
const CONTAINER_COMMAND_TIMEOUT_DEFAULT_SECS: u64 = 10;

/// Default time a service is given to become ready
/// (overridable through `OPAL_SERVICE_READY_TIMEOUT_SECS`).
const SERVICE_READY_TIMEOUT_DEFAULT_SECS: u64 = 30;
/// Pause between readiness polls.
const SERVICE_READY_POLL_MS: u64 = 250;
22
23pub struct ServiceRuntime {
24    engine: EngineKind,
25    network: String,
26    containers: Vec<String>,
27    link_env: Vec<(String, String)>,
28    claimed_aliases: HashSet<String>,
29    host_aliases: Vec<(String, String)>,
30}
31
32impl ServiceRuntime {
33    pub fn start(
34        engine: EngineKind,
35        run_id: &str,
36        job_name: &str,
37        services: &[ServiceSpec],
38        base_env: &[(String, String)],
39        shared_env: &HashMap<String, String>,
40    ) -> Result<Option<Self>> {
41        if services.is_empty() {
42            return Ok(None);
43        }
44        service_supported(engine)?;
45        let clean_run_id = sanitize_identifier(run_id);
46        let network = clamp_name(&format!(
47            "opal-net-{}-{}",
48            clean_run_id,
49            job_name_slug(job_name)
50        ));
51        run_network_create(engine, &network)
52            .with_context(|| format!("failed to create network {}", network))?;
53
54        let mut runtime = ServiceRuntime {
55            engine,
56            network: network.clone(),
57            containers: Vec::new(),
58            link_env: Vec::new(),
59            claimed_aliases: HashSet::new(),
60            host_aliases: Vec::new(),
61        };
62
63        for (idx, service) in services.iter().enumerate() {
64            let aliases = runtime.aliases_for_service(idx, service)?;
65            let container_name = clamp_name(&format!(
66                "opal-svc-{}-{}-{:02}",
67                clean_run_id,
68                job_name_slug(job_name),
69                idx
70            ));
71            let ports = if matches!(engine, EngineKind::ContainerCli) {
72                match discover_container_ports(&service.image) {
73                    Ok(list) => list,
74                    Err(err) => {
75                        warn!(
76                            image = %service.image,
77                            "failed to detect exposed ports for service: {err}"
78                        );
79                        Vec::new()
80                    }
81                }
82            } else {
83                Vec::new()
84            };
85            if let Err(err) =
86                runtime.start_service(&container_name, service, &aliases, base_env, shared_env)
87            {
88                runtime.cleanup();
89                return Err(err);
90            }
91            if let Err(err) = runtime.wait_for_service_readiness(&container_name, service, &ports) {
92                runtime.cleanup();
93                return Err(err);
94            }
95            if let Some(ip) = inspect_service_ipv4(engine, &container_name) {
96                for alias in &aliases {
97                    runtime.host_aliases.push((alias.clone(), ip.clone()));
98                }
99            }
100            if matches!(engine, EngineKind::ContainerCli) && !ports.is_empty() {
101                for alias in &aliases {
102                    runtime
103                        .link_env
104                        .extend(build_service_env(alias, &container_name, &ports));
105                }
106            }
107        }
108
109        Ok(Some(runtime))
110    }
111
112    pub fn network_name(&self) -> &str {
113        &self.network
114    }
115
116    pub fn container_names(&self) -> &[String] {
117        &self.containers
118    }
119
120    pub fn cleanup(&mut self) {
121        for name in self.containers.drain(..).rev() {
122            let _ = Command::new(engine_binary(self.engine))
123                .arg("rm")
124                .arg("-f")
125                .arg(&name)
126                .status();
127        }
128        let _ = run_network_remove(self.engine, &self.network);
129    }
130
131    pub fn link_env(&self) -> &[(String, String)] {
132        &self.link_env
133    }
134
135    pub fn host_aliases(&self) -> &[(String, String)] {
136        &self.host_aliases
137    }
138
139    fn start_service(
140        &mut self,
141        container_name: &str,
142        service: &ServiceSpec,
143        aliases: &[String],
144        base_env: &[(String, String)],
145        _shared_env: &HashMap<String, String>,
146    ) -> Result<()> {
147        let mut command = service_command(self.engine, service);
148        command
149            .arg("-d")
150            .arg("--name")
151            .arg(container_name)
152            .arg("--network")
153            .arg(&self.network);
154        if !matches!(self.engine, EngineKind::ContainerCli) {
155            for alias in aliases {
156                command.arg("--network-alias").arg(alias);
157            }
158        }
159
160        let merged = merged_env(base_env, &service.variables);
161        for (key, value) in merged {
162            command.arg("--env").arg(format!("{key}={value}"));
163        }
164
165        if let Some(user) = &service.docker_user {
166            command.arg("--user").arg(user);
167        }
168
169        if !service.entrypoint.is_empty() {
170            command
171                .arg("--entrypoint")
172                .arg(service.entrypoint.join(" "));
173        }
174
175        command.arg(&service.image);
176
177        for arg in &service.command {
178            command.arg(arg);
179        }
180
181        if env::var("OPAL_DEBUG_CONTAINER")
182            .map(|v| v == "1")
183            .unwrap_or(false)
184        {
185            let program = command.get_program().to_string_lossy();
186            let args: Vec<String> = command
187                .get_args()
188                .map(|arg| arg.to_string_lossy().to_string())
189                .collect();
190            eprintln!("[opal] service command: {} {}", program, args.join(" "));
191        }
192
193        run_command_with_timeout(command, command_timeout(self.engine)).with_context(|| {
194            format!(
195                "failed to start service '{}' ({})",
196                container_name, service.image
197            )
198        })?;
199        self.containers.push(container_name.to_string());
200        Ok(())
201    }
202
203    fn aliases_for_service(&mut self, idx: usize, service: &ServiceSpec) -> Result<Vec<String>> {
204        let mut accepted = Vec::new();
205        if service.aliases.is_empty() {
206            for alias in default_service_aliases(&service.image) {
207                if self.claimed_aliases.insert(alias.clone()) {
208                    accepted.push(alias);
209                }
210            }
211        } else {
212            for raw in service.aliases.clone() {
213                let alias = validate_service_alias(&raw)?;
214                if self.claimed_aliases.insert(alias.clone()) {
215                    accepted.push(alias);
216                }
217            }
218        }
219
220        if accepted.is_empty() {
221            let fallback = validate_service_alias(&format!("svc-{idx}"))?;
222            self.claimed_aliases.insert(fallback.clone());
223            accepted.push(fallback);
224        }
225
226        Ok(accepted)
227    }
228
229    fn wait_for_service_readiness(
230        &self,
231        container_name: &str,
232        service: &ServiceSpec,
233        ports: &[ServicePort],
234    ) -> Result<()> {
235        let timeout = service_ready_timeout();
236        let started = Instant::now();
237        let mut confirmed_running_without_health = false;
238
239        loop {
240            let state = match inspect_service_state(self.engine, container_name) {
241                Ok(state) => state,
242                Err(err) => {
243                    warn!(
244                        service = container_name,
245                        "failed to inspect service readiness ({err}); continuing without readiness gate"
246                    );
247                    return Ok(());
248                }
249            };
250
251            match readiness_from_state(&state) {
252                ServiceReadiness::Ready => {
253                    if state.health.is_none() {
254                        if !ports.is_empty() {
255                            let Some(ip) = inspect_service_ipv4(self.engine, container_name) else {
256                                if started.elapsed() >= timeout {
257                                    return Err(anyhow!(
258                                        "service '{}' ({}) did not expose a reachable IP within {}s",
259                                        container_name,
260                                        service.image,
261                                        timeout.as_secs()
262                                    ));
263                                }
264                                thread::sleep(Duration::from_millis(SERVICE_READY_POLL_MS));
265                                continue;
266                            };
267                            if probe_service_ports(self.engine, &self.network, &ip, ports)? {
268                                return Ok(());
269                            }
270                            if started.elapsed() >= timeout {
271                                return Err(anyhow!(
272                                    "service '{}' ({}) did not accept connections on exposed ports within {}s",
273                                    container_name,
274                                    service.image,
275                                    timeout.as_secs()
276                                ));
277                            }
278                            thread::sleep(Duration::from_millis(SERVICE_READY_POLL_MS));
279                            continue;
280                        }
281                        if !confirmed_running_without_health {
282                            confirmed_running_without_health = true;
283                            thread::sleep(Duration::from_millis(SERVICE_READY_POLL_MS));
284                            continue;
285                        }
286                    }
287                    return Ok(());
288                }
289                ServiceReadiness::Waiting(detail) => {
290                    confirmed_running_without_health = false;
291                    if started.elapsed() >= timeout {
292                        return Err(anyhow!(
293                            "service '{}' ({}) did not become ready within {}s: {}",
294                            container_name,
295                            service.image,
296                            timeout.as_secs(),
297                            detail
298                        ));
299                    }
300                    thread::sleep(Duration::from_millis(SERVICE_READY_POLL_MS));
301                }
302                ServiceReadiness::Failed(detail) => {
303                    return Err(anyhow!(
304                        "service '{}' ({}) failed readiness check: {}",
305                        container_name,
306                        service.image,
307                        detail
308                    ));
309                }
310            }
311        }
312    }
313}
314
315fn service_supported(engine: EngineKind) -> Result<()> {
316    if matches!(
317        engine,
318        EngineKind::Docker
319            | EngineKind::Orbstack
320            | EngineKind::Podman
321            | EngineKind::Nerdctl
322            | EngineKind::ContainerCli
323    ) {
324        Ok(())
325    } else {
326        Err(anyhow!(
327            "services are only supported when using docker, podman, nerdctl, orbstack, or container"
328        ))
329    }
330}
331
332fn engine_binary(engine: EngineKind) -> &'static str {
333    match engine {
334        EngineKind::Docker | EngineKind::Orbstack => "docker",
335        EngineKind::Podman => "podman",
336        EngineKind::Nerdctl => "nerdctl",
337        EngineKind::ContainerCli => "container",
338    }
339}
340
341fn service_command(engine: EngineKind, service: &ServiceSpec) -> Command {
342    let mut command = Command::new(engine_binary(engine));
343    command.arg("run");
344    if matches!(engine, EngineKind::ContainerCli) {
345        if let Some(arch) = default_container_cli_arch(service.docker_platform.as_deref()) {
346            command.arg("--arch").arg(arch);
347        }
348    } else if let Some(platform) = &service.docker_platform {
349        command.arg("--platform").arg(platform);
350    }
351    command
352}
353
/// Container state normalized from an engine's `inspect` output.
#[derive(Clone, Debug, Eq, PartialEq)]
struct ServiceState {
    /// Whether the engine reports the container as running.
    running: bool,
    /// Lowercased lifecycle status (e.g. `running`, `exited`), when reported.
    status: Option<String>,
    /// Lowercased health-check status (e.g. `healthy`, `unhealthy`), when reported.
    health: Option<String>,
    /// Process exit code, when reported.
    exit_code: Option<i64>,
}
361
/// Verdict of a single readiness evaluation of a service container.
#[derive(Clone, Debug, Eq, PartialEq)]
enum ServiceReadiness {
    /// The service can be used now.
    Ready,
    /// Not ready yet; the text says what is still pending.
    Waiting(String),
    /// The service will not become ready; the text says why.
    Failed(String),
}
368
369fn service_ready_timeout() -> Duration {
370    env::var("OPAL_SERVICE_READY_TIMEOUT_SECS")
371        .ok()
372        .and_then(|raw| raw.parse::<u64>().ok())
373        .filter(|seconds| *seconds > 0)
374        .map(Duration::from_secs)
375        .unwrap_or_else(|| Duration::from_secs(SERVICE_READY_TIMEOUT_DEFAULT_SECS))
376}
377
378fn inspect_service_state(engine: EngineKind, container_name: &str) -> Result<ServiceState> {
379    let mut command = Command::new(engine_binary(engine));
380    command
381        .arg("inspect")
382        .arg("--format")
383        .arg("{{json .State}}")
384        .arg(container_name);
385    let output = command
386        .output()
387        .with_context(|| format!("failed to inspect service container '{}'", container_name))?;
388    if output.status.success() {
389        return parse_service_state(&output.stdout);
390    }
391
392    let mut fallback = Command::new(engine_binary(engine));
393    fallback.arg("inspect").arg(container_name);
394    let output = fallback
395        .output()
396        .with_context(|| format!("failed to inspect service container '{}'", container_name))?;
397    if !output.status.success() {
398        return Err(command_failed(
399            &fallback,
400            &output.stdout,
401            &output.stderr,
402            output.status.code(),
403        ));
404    }
405    parse_service_state(&output.stdout)
406}
407
408fn inspect_service_ipv4(engine: EngineKind, container_name: &str) -> Option<String> {
409    let mut command = Command::new(engine_binary(engine));
410    command.arg("inspect").arg(container_name);
411    let output = command.output().ok()?;
412    if !output.status.success() {
413        return None;
414    }
415    parse_service_ipv4(&output.stdout).ok().flatten()
416}
417
418fn parse_service_ipv4(payload: &[u8]) -> Result<Option<String>> {
419    let value: serde_json::Value = serde_json::from_slice(payload)
420        .context("failed to parse service inspect output as json")?;
421    let service = value
422        .as_array()
423        .and_then(|items| items.first())
424        .or_else(|| value.as_object().map(|_| &value))
425        .ok_or_else(|| anyhow!("service inspect output was not an object or array"))?;
426
427    if let Some(ip) = service
428        .get("networks")
429        .and_then(|networks| networks.as_array())
430        .and_then(|items| items.first())
431        .and_then(|network| network.get("ipv4Address"))
432        .and_then(|value| value.as_str())
433    {
434        return Ok(ip.split('/').next().map(str::to_string));
435    }
436
437    if let Some(networks) = service
438        .get("NetworkSettings")
439        .and_then(|settings| settings.get("Networks"))
440        .and_then(|networks| networks.as_object())
441    {
442        for network in networks.values() {
443            if let Some(ip) = network.get("IPAddress").and_then(|value| value.as_str())
444                && !ip.is_empty()
445            {
446                return Ok(Some(ip.to_string()));
447            }
448        }
449    }
450
451    Ok(None)
452}
453
454fn probe_service_ports(
455    engine: EngineKind,
456    network: &str,
457    host: &str,
458    ports: &[ServicePort],
459) -> Result<bool> {
460    if ports.is_empty() {
461        return Ok(true);
462    }
463
464    let checks = ports
465        .iter()
466        .filter(|port| port.proto == "tcp")
467        .map(|port| format!("nc -z {} {}", shell_escape(host), port.port))
468        .collect::<Vec<_>>();
469    if checks.is_empty() {
470        return Ok(true);
471    }
472
473    let mut command = Command::new(engine_binary(engine));
474    command.arg("run").arg("--rm").arg("--network").arg(network);
475    if matches!(engine, EngineKind::ContainerCli)
476        && let Some(arch) = default_container_cli_arch(None)
477    {
478        command.arg("--arch").arg(arch);
479    }
480    let script = checks.join(" && ");
481    let status = command
482        .arg("docker.io/library/alpine:3.19")
483        .arg("sh")
484        .arg("-lc")
485        .arg(script)
486        .status()
487        .with_context(|| "failed to run service connectivity probe")?;
488    Ok(status.success())
489}
490
/// Wraps `value` in single quotes for POSIX `sh`, closing and re-opening the quote
/// around each embedded `'` so it is emitted as `"'"`.
fn shell_escape(value: &str) -> String {
    let mut quoted = String::with_capacity(value.len() + 2);
    quoted.push('\'');
    for ch in value.chars() {
        if ch == '\'' {
            quoted.push_str("'\"'\"'");
        } else {
            quoted.push(ch);
        }
    }
    quoted.push('\'');
    quoted
}
494
495fn parse_service_state(payload: &[u8]) -> Result<ServiceState> {
496    let value: serde_json::Value = serde_json::from_slice(payload)
497        .context("failed to parse service inspect output as json")?;
498    let service = value
499        .as_array()
500        .and_then(|items| items.first())
501        .or_else(|| value.as_object().map(|_| &value))
502        .ok_or_else(|| anyhow!("service inspect output was not an object or array"))?;
503
504    let state = if let Some(state) = service.get("State") {
505        state
506    } else if service.get("Running").is_some()
507        || service.get("Status").is_some()
508        || service.get("status").is_some()
509    {
510        service
511    } else {
512        return Err(anyhow!("service inspect output missing State field"));
513    };
514
515    let running = state
516        .get("Running")
517        .and_then(|v| v.as_bool())
518        .or_else(|| state.get("running").and_then(|v| v.as_bool()))
519        .unwrap_or_else(|| {
520            state
521                .get("Status")
522                .and_then(|v| v.as_str())
523                .or_else(|| state.get("status").and_then(|v| v.as_str()))
524                .is_some_and(|status| status.eq_ignore_ascii_case("running"))
525        });
526    let status = state
527        .get("Status")
528        .and_then(|v| v.as_str())
529        .or_else(|| state.get("status").and_then(|v| v.as_str()))
530        .map(|s| s.to_ascii_lowercase());
531    let health = state
532        .get("Health")
533        .and_then(|health| health.get("Status"))
534        .and_then(|status| status.as_str())
535        .or_else(|| {
536            state
537                .get("health")
538                .and_then(|health| health.get("status"))
539                .and_then(|status| status.as_str())
540        })
541        .map(|s| s.to_ascii_lowercase());
542    let exit_code = state
543        .get("ExitCode")
544        .and_then(|v| v.as_i64())
545        .or_else(|| state.get("exitCode").and_then(|v| v.as_i64()));
546
547    Ok(ServiceState {
548        running,
549        status,
550        health,
551        exit_code,
552    })
553}
554
555fn readiness_from_state(state: &ServiceState) -> ServiceReadiness {
556    if !state.running {
557        if matches!(state.status.as_deref(), Some("exited" | "dead" | "stopped"))
558            || state.exit_code.is_some_and(|code| code != 0)
559        {
560            return ServiceReadiness::Failed(format!(
561                "status={}, running=false, exit_code={}",
562                state.status.as_deref().unwrap_or("unknown"),
563                state
564                    .exit_code
565                    .map(|code| code.to_string())
566                    .unwrap_or_else(|| "unknown".to_string())
567            ));
568        }
569        return ServiceReadiness::Waiting(format!(
570            "status={}, running=false",
571            state.status.as_deref().unwrap_or("unknown")
572        ));
573    }
574
575    match state.health.as_deref() {
576        Some("healthy") => ServiceReadiness::Ready,
577        Some("unhealthy") => ServiceReadiness::Failed("health=unhealthy".to_string()),
578        Some(status) => ServiceReadiness::Waiting(format!("health={status}")),
579        None => ServiceReadiness::Ready,
580    }
581}
582
/// Keeps only the ASCII letters and digits of `input`; returns `"opal"` if nothing
/// survives.
fn sanitize_identifier(input: &str) -> String {
    let mut kept = String::with_capacity(input.len());
    kept.extend(input.chars().filter(char::is_ascii_alphanumeric));
    if kept.is_empty() {
        String::from("opal")
    } else {
        kept
    }
}
594
595fn clamp_name(base: &str) -> String {
596    if base.len() <= MAX_NAME_LEN {
597        return base.to_string();
598    }
599    let mut hasher = Sha256::new();
600    hasher.update(base.as_bytes());
601    let digest = hasher.finalize();
602    let mut suffix = String::with_capacity(8);
603    for byte in digest.iter().take(4) {
604        let _ = FmtWrite::write_fmt(&mut suffix, format_args!("{:02x}", byte));
605    }
606    let prefix_len = MAX_NAME_LEN.saturating_sub(suffix.len() + 1);
607    let prefix: String = base.chars().take(prefix_len).collect();
608    format!("{prefix}-{suffix}")
609}
610
611fn validate_service_alias(alias: &str) -> Result<String> {
612    let normalized = alias.trim().to_ascii_lowercase();
613    if normalized.is_empty() {
614        bail!("service alias must not be empty");
615    }
616    if normalized.starts_with('-') || normalized.ends_with('-') {
617        bail!("service alias '{}' must not start or end with '-'", alias);
618    }
619    if !normalized
620        .chars()
621        .all(|ch| ch.is_ascii_lowercase() || ch.is_ascii_digit() || ch == '-')
622    {
623        bail!(
624            "service alias '{}' contains unsupported characters; use lowercase letters, digits, or '-'",
625            alias
626        );
627    }
628    Ok(normalized)
629}
630
/// A port a service image exposes.
#[derive(Clone, Debug)]
struct ServicePort {
    /// Port number.
    port: u16,
    /// Transport protocol name, e.g. `tcp`.
    proto: String,
}
636
637fn discover_container_ports(image: &str) -> Result<Vec<ServicePort>> {
638    let output = Command::new("container")
639        .arg("image")
640        .arg("inspect")
641        .arg(image)
642        .output()
643        .context("failed to inspect container image")?;
644    if !output.status.success() {
645        return Ok(Vec::new());
646    }
647    let infos: Vec<ContainerImageInspect> = serde_json::from_slice(&output.stdout)?;
648    let mut ports = Vec::new();
649    let mut seen = HashSet::new();
650    // TODO; jesus on a cracker, what the fuck, for in for in for in for in for.........
651    for info in infos {
652        for variant in info.variants {
653            for entry in variant.config.history {
654                if let Some(cmd) = entry.created_by
655                    && let Some(idx) = cmd.find("EXPOSE map[")
656                {
657                    let rest = &cmd[idx + "EXPOSE map[".len()..];
658                    if let Some(end) = rest.find(']') {
659                        let map = &rest[..end];
660                        for token in map.split_whitespace() {
661                            let cleaned = token.trim_matches(|c| c == ',' || c == '{' || c == '}');
662                            if cleaned.is_empty() {
663                                continue;
664                            }
665                            let mut parts = cleaned.split('/');
666                            let port_str = parts.next().unwrap_or("");
667                            let proto_part = parts.next().unwrap_or("tcp");
668                            if let Ok(port) = port_str.parse::<u16>() {
669                                let proto = proto_part
670                                    .split(':')
671                                    .next()
672                                    .unwrap_or("tcp")
673                                    .to_ascii_lowercase();
674                                if seen.insert((port, proto.clone())) {
675                                    ports.push(ServicePort { port, proto });
676                                }
677                            }
678                        }
679                    }
680                }
681            }
682        }
683    }
684    Ok(ports)
685}
686
687fn build_service_env(alias: &str, host: &str, ports: &[ServicePort]) -> Vec<(String, String)> {
688    if ports.is_empty() {
689        return Vec::new();
690    }
691    let mut envs = Vec::new();
692    let alias_key: String = alias
693        .chars()
694        .map(|c| {
695            if c.is_ascii_alphanumeric() {
696                c.to_ascii_uppercase()
697            } else {
698                '_'
699            }
700        })
701        .collect();
702    let primary = &ports[0];
703    envs.push((
704        format!("{}_PORT", alias_key),
705        format!("{}://{}:{}", primary.proto, host, primary.port),
706    ));
707    for port in ports {
708        let proto_upper = port.proto.to_ascii_uppercase();
709        let proto_lower = port.proto.to_ascii_lowercase();
710        let base = format!("{}_PORT_{}_{}", alias_key, port.port, proto_upper);
711        envs.push((
712            base.clone(),
713            format!("{}://{}:{}", proto_lower, host, port.port),
714        ));
715        envs.push((format!("{}_ADDR", base), host.to_string()));
716        envs.push((format!("{}_PORT", base), port.port.to_string()));
717        envs.push((format!("{}_PROTO", base), proto_lower));
718    }
719    envs
720}
721
722#[derive(Deserialize)]
723struct ContainerImageInspect {
724    variants: Vec<ContainerVariant>,
725}
726
727#[derive(Deserialize)]
728struct ContainerVariant {
729    config: VariantConfig,
730}
731
732#[derive(Deserialize)]
733struct VariantConfig {
734    history: Vec<HistoryEntry>,
735}
736
737#[derive(Deserialize)]
738struct HistoryEntry {
739    #[serde(rename = "created_by")]
740    created_by: Option<String>,
741}
742
743fn run_command(mut cmd: Command) -> Result<()> {
744    let output = cmd.output()?;
745    if output.status.success() {
746        Ok(())
747    } else {
748        Err(command_failed(
749            &cmd,
750            &output.stdout,
751            &output.stderr,
752            output.status.code(),
753        ))
754    }
755}
756
757fn run_command_with_timeout(mut cmd: Command, timeout: Option<Duration>) -> Result<()> {
758    let Some(timeout) = timeout else {
759        return run_command(cmd);
760    };
761    cmd.stdout(Stdio::piped()).stderr(Stdio::piped());
762    let mut child = cmd.spawn()?;
763    let started = Instant::now();
764
765    loop {
766        if child.try_wait()?.is_some() {
767            let output = child.wait_with_output()?;
768            if output.status.success() {
769                return Ok(());
770            }
771            return Err(command_failed(
772                &cmd,
773                &output.stdout,
774                &output.stderr,
775                output.status.code(),
776            ));
777        }
778
779        if started.elapsed() >= timeout {
780            let _ = child.kill();
781            let output = child.wait_with_output().ok();
782            let (stdout, stderr, code) = if let Some(output) = output {
783                (output.stdout, output.stderr, output.status.code())
784            } else {
785                (Vec::new(), Vec::new(), None)
786            };
787            return Err(anyhow!(
788                "command {:?} timed out after {}s{}",
789                &cmd,
790                timeout.as_secs(),
791                command_failed_detail(&stdout, &stderr, code)
792            ));
793        }
794
795        thread::sleep(Duration::from_millis(100));
796    }
797}
798
799fn command_timeout(engine: EngineKind) -> Option<Duration> {
800    if matches!(engine, EngineKind::ContainerCli) {
801        Some(container_command_timeout())
802    } else {
803        None
804    }
805}
806
807fn container_command_timeout() -> Duration {
808    env::var("OPAL_CONTAINER_COMMAND_TIMEOUT_SECS")
809        .ok()
810        .and_then(|raw| raw.parse::<u64>().ok())
811        .filter(|seconds| *seconds > 0)
812        .map(Duration::from_secs)
813        .unwrap_or_else(|| Duration::from_secs(CONTAINER_COMMAND_TIMEOUT_DEFAULT_SECS))
814}
815
816fn command_failed(cmd: &Command, stdout: &[u8], stderr: &[u8], code: Option<i32>) -> anyhow::Error {
817    anyhow!(
818        "command {:?} exited with status {:?}{}",
819        cmd,
820        code,
821        command_failed_detail(stdout, stderr, code)
822    )
823}
824
/// Formats captured output as an error-message suffix.
///
/// - Both streams blank after trimming: empty string.
/// - One stream non-blank: `": <text>"`.
/// - Both non-blank: `": stdout=<..>; stderr=<..>"`.
///
/// The exit code is accepted for signature symmetry but unused.
fn command_failed_detail(stdout: &[u8], stderr: &[u8], _code: Option<i32>) -> String {
    let out_text = String::from_utf8_lossy(stdout);
    let err_text = String::from_utf8_lossy(stderr);
    let (out, err) = (out_text.trim(), err_text.trim());
    if out.is_empty() && err.is_empty() {
        String::new()
    } else if err.is_empty() {
        format!(": {out}")
    } else if out.is_empty() {
        format!(": {err}")
    } else {
        format!(": stdout={out}; stderr={err}")
    }
}
835
836fn run_network_create(engine: EngineKind, network: &str) -> Result<()> {
837    run_network_command(engine, "create", network)
838}
839
840fn run_network_remove(engine: EngineKind, network: &str) -> Result<()> {
841    run_network_command(engine, "rm", network)
842}
843
844fn run_network_command(engine: EngineKind, action: &str, network: &str) -> Result<()> {
845    let attempts = if matches!(engine, EngineKind::ContainerCli) {
846        CONTAINER_NETWORK_RETRY_ATTEMPTS
847    } else {
848        1
849    };
850
851    let mut last_error = None;
852    for attempt in 0..attempts {
853        let mut command = Command::new(engine_binary(engine));
854        command.arg("network").arg(action).arg(network);
855        match run_command_with_timeout(command, command_timeout(engine)) {
856            Ok(()) => return Ok(()),
857            Err(err) => {
858                if matches!(engine, EngineKind::ContainerCli)
859                    && should_retry_container_network_error(&err.to_string())
860                    && attempt + 1 < attempts
861                {
862                    warn!(
863                        network,
864                        action,
865                        attempt = attempt + 1,
866                        "container network command timed out; retrying"
867                    );
868                    thread::sleep(Duration::from_millis(
869                        CONTAINER_NETWORK_RETRY_DELAY_MS * (attempt + 1) as u64,
870                    ));
871                    last_error = Some(err);
872                    continue;
873                }
874                return Err(err);
875            }
876        }
877    }
878
879    Err(last_error.unwrap_or_else(|| anyhow!("network command failed without an error")))
880}
881
/// Whether a `container` CLI network error message looks transient (an XPC timeout
/// or a dropped connection to the API server) and is worth retrying.
///
/// The broad `apiserver` marker also matches both XPC-timeout messages; they are
/// listed explicitly for clarity.
fn should_retry_container_network_error(message: &str) -> bool {
    const TRANSIENT_MARKERS: [&str; 4] = [
        "XPC timeout for request to com.apple.container.apiserver/networkCreate",
        "XPC timeout for request to com.apple.container.apiserver/networkDelete",
        "Connection invalid",
        "apiserver",
    ];
    TRANSIENT_MARKERS
        .into_iter()
        .any(|marker| message.contains(marker))
}
889
890fn merged_env(
891    base: &[(String, String)],
892    overrides: &HashMap<String, String>,
893) -> Vec<(String, String)> {
894    let lookup: HashMap<String, String> = base.iter().cloned().collect();
895    let mut env = base.to_vec();
896    for (_, value) in &mut env {
897        *value = crate::env::expand_value(value, &lookup);
898    }
899    let mut map: HashMap<String, String> = env.into_iter().collect();
900    for (key, value) in overrides {
901        map.insert(key.clone(), value.clone());
902    }
903    map.into_iter().collect()
904}
905
906fn default_service_aliases(image: &str) -> Vec<String> {
907    let without_tag = image.split(':').next().unwrap_or(image);
908    let primary = without_tag.replace('/', "__");
909    let secondary = without_tag.replace('/', "-");
910    let mut aliases = Vec::new();
911    if !primary.is_empty() {
912        aliases.push(primary);
913    }
914    if !secondary.is_empty() && !aliases.iter().any(|existing| existing == &secondary) {
915        aliases.push(secondary);
916    }
917    if aliases.is_empty() {
918        aliases.push("service".to_string());
919    }
920    aliases
921}
922
923#[cfg(test)]
924mod tests {
925    use super::{
926        ServiceReadiness, ServiceRuntime, ServiceState, parse_service_ipv4, parse_service_state,
927        readiness_from_state, service_command, should_retry_container_network_error,
928    };
929    use crate::EngineKind;
930    use crate::model::ServiceSpec;
931    use std::collections::HashMap;
932    use std::process::Command;
933    use std::time::Duration;
934
935    #[test]
936    fn retries_container_network_xpc_timeouts() {
937        assert!(should_retry_container_network_error(
938            "XPC timeout for request to com.apple.container.apiserver/networkCreate"
939        ));
940        assert!(should_retry_container_network_error(
941            "XPC timeout for request to com.apple.container.apiserver/networkDelete"
942        ));
943        assert!(!should_retry_container_network_error(
944            "cannot delete subnet with referring containers"
945        ));
946    }
947
948    #[test]
949    fn parse_service_state_reads_running_and_health_status() {
950        let payload = br#"[{"State":{"Running":true,"Status":"running","ExitCode":0,"Health":{"Status":"starting"}}}]"#;
951        let state = parse_service_state(payload).expect("parse service state");
952
953        assert!(state.running);
954        assert_eq!(state.status.as_deref(), Some("running"));
955        assert_eq!(state.health.as_deref(), Some("starting"));
956        assert_eq!(state.exit_code, Some(0));
957    }
958
959    #[test]
960    fn parse_service_state_accepts_direct_state_object() {
961        let payload = br#"{"Running":false,"Status":"exited","ExitCode":1}"#;
962        let state = parse_service_state(payload).expect("parse service state");
963
964        assert!(!state.running);
965        assert_eq!(state.status.as_deref(), Some("exited"));
966        assert_eq!(state.exit_code, Some(1));
967    }
968
969    #[test]
970    fn parse_service_state_accepts_container_cli_shape() {
971        let payload = br#"[{"status":"exited","exitCode":1}]"#;
972        let state = parse_service_state(payload).expect("parse service state");
973
974        assert!(!state.running);
975        assert_eq!(state.status.as_deref(), Some("exited"));
976        assert_eq!(state.exit_code, Some(1));
977    }
978
979    #[test]
980    fn readiness_from_state_is_ready_without_healthcheck() {
981        let state = ServiceState {
982            running: true,
983            status: Some("running".to_string()),
984            health: None,
985            exit_code: Some(0),
986        };
987
988        assert!(matches!(
989            readiness_from_state(&state),
990            ServiceReadiness::Ready
991        ));
992    }
993
994    #[test]
995    fn readiness_from_state_waits_while_healthcheck_is_starting() {
996        let state = ServiceState {
997            running: true,
998            status: Some("running".to_string()),
999            health: Some("starting".to_string()),
1000            exit_code: Some(0),
1001        };
1002
1003        match readiness_from_state(&state) {
1004            ServiceReadiness::Waiting(detail) => assert!(detail.contains("starting")),
1005            other => panic!("expected waiting readiness, got {other:?}"),
1006        }
1007    }
1008
1009    #[test]
1010    fn readiness_from_state_fails_when_service_exits() {
1011        let state = ServiceState {
1012            running: false,
1013            status: Some("exited".to_string()),
1014            health: None,
1015            exit_code: Some(1),
1016        };
1017
1018        match readiness_from_state(&state) {
1019            ServiceReadiness::Failed(detail) => assert!(detail.contains("exit_code=1")),
1020            other => panic!("expected failed readiness, got {other:?}"),
1021        }
1022    }
1023
1024    #[test]
1025    fn readiness_from_state_fails_when_healthcheck_unhealthy() {
1026        let state = ServiceState {
1027            running: true,
1028            status: Some("running".to_string()),
1029            health: Some("unhealthy".to_string()),
1030            exit_code: Some(0),
1031        };
1032
1033        match readiness_from_state(&state) {
1034            ServiceReadiness::Failed(detail) => assert!(detail.contains("unhealthy")),
1035            other => panic!("expected failed readiness, got {other:?}"),
1036        }
1037    }
1038
1039    #[test]
1040    fn aliases_for_service_preserves_multiple_unique_aliases() {
1041        let mut runtime = ServiceRuntime {
1042            engine: EngineKind::Docker,
1043            network: "net".into(),
1044            containers: Vec::new(),
1045            link_env: Vec::new(),
1046            claimed_aliases: Default::default(),
1047            host_aliases: Vec::new(),
1048        };
1049        let service = ServiceSpec {
1050            image: "redis:7".into(),
1051            aliases: vec!["cache".into(), "redis".into()],
1052            docker_platform: None,
1053            docker_user: None,
1054            entrypoint: Vec::new(),
1055            command: Vec::new(),
1056            variables: HashMap::new(),
1057        };
1058
1059        let aliases = runtime
1060            .aliases_for_service(0, &service)
1061            .expect("aliases resolve");
1062
1063        assert_eq!(aliases, vec!["cache", "redis"]);
1064    }
1065
1066    #[test]
1067    fn aliases_for_service_uses_gitlab_style_default_aliases() {
1068        let mut runtime = ServiceRuntime {
1069            engine: EngineKind::Docker,
1070            network: "net".into(),
1071            containers: Vec::new(),
1072            link_env: Vec::new(),
1073            claimed_aliases: Default::default(),
1074            host_aliases: Vec::new(),
1075        };
1076        let service = ServiceSpec {
1077            image: "tutum/wordpress:latest".into(),
1078            aliases: Vec::new(),
1079            docker_platform: None,
1080            docker_user: None,
1081            entrypoint: Vec::new(),
1082            command: Vec::new(),
1083            variables: HashMap::new(),
1084        };
1085
1086        let aliases = runtime
1087            .aliases_for_service(0, &service)
1088            .expect("aliases resolve");
1089
1090        assert_eq!(aliases, vec!["tutum__wordpress", "tutum-wordpress"]);
1091    }
1092
1093    #[test]
1094    fn aliases_for_service_falls_back_when_aliases_conflict() {
1095        let mut runtime = ServiceRuntime {
1096            engine: EngineKind::Docker,
1097            network: "net".into(),
1098            containers: Vec::new(),
1099            link_env: Vec::new(),
1100            claimed_aliases: Default::default(),
1101            host_aliases: Vec::new(),
1102        };
1103        let first = ServiceSpec {
1104            image: "redis:7".into(),
1105            aliases: vec!["cache".into()],
1106            docker_platform: None,
1107            docker_user: None,
1108            entrypoint: Vec::new(),
1109            command: Vec::new(),
1110            variables: HashMap::new(),
1111        };
1112        let second = ServiceSpec {
1113            image: "postgres:16".into(),
1114            aliases: vec!["cache".into()],
1115            docker_platform: None,
1116            docker_user: None,
1117            entrypoint: Vec::new(),
1118            command: Vec::new(),
1119            variables: HashMap::new(),
1120        };
1121
1122        assert_eq!(
1123            runtime.aliases_for_service(0, &first).unwrap(),
1124            vec!["cache"]
1125        );
1126        assert_eq!(
1127            runtime.aliases_for_service(1, &second).unwrap(),
1128            vec!["svc-1"]
1129        );
1130    }
1131
1132    #[test]
1133    fn aliases_for_service_falls_back_after_default_aliases_conflict() {
1134        let mut runtime = ServiceRuntime {
1135            engine: EngineKind::Docker,
1136            network: "net".into(),
1137            containers: Vec::new(),
1138            link_env: Vec::new(),
1139            claimed_aliases: Default::default(),
1140            host_aliases: Vec::new(),
1141        };
1142        let first = ServiceSpec {
1143            image: "tutum/wordpress:latest".into(),
1144            aliases: Vec::new(),
1145            docker_platform: None,
1146            docker_user: None,
1147            entrypoint: Vec::new(),
1148            command: Vec::new(),
1149            variables: HashMap::new(),
1150        };
1151        let second = ServiceSpec {
1152            image: "tutum/wordpress:latest".into(),
1153            aliases: Vec::new(),
1154            docker_platform: None,
1155            docker_user: None,
1156            entrypoint: Vec::new(),
1157            command: Vec::new(),
1158            variables: HashMap::new(),
1159        };
1160
1161        assert_eq!(
1162            runtime.aliases_for_service(0, &first).unwrap(),
1163            vec!["tutum__wordpress", "tutum-wordpress"]
1164        );
1165        assert_eq!(
1166            runtime.aliases_for_service(1, &second).unwrap(),
1167            vec!["svc-1"]
1168        );
1169    }
1170
1171    #[test]
1172    fn aliases_for_service_rejects_invalid_aliases() {
1173        let mut runtime = ServiceRuntime {
1174            engine: EngineKind::Docker,
1175            network: "net".into(),
1176            containers: Vec::new(),
1177            link_env: Vec::new(),
1178            claimed_aliases: Default::default(),
1179            host_aliases: Vec::new(),
1180        };
1181        let service = ServiceSpec {
1182            image: "redis:7".into(),
1183            aliases: vec!["bad_alias".into()],
1184            docker_platform: None,
1185            docker_user: None,
1186            entrypoint: Vec::new(),
1187            command: Vec::new(),
1188            variables: HashMap::new(),
1189        };
1190
1191        let err = runtime
1192            .aliases_for_service(0, &service)
1193            .expect_err("alias must error");
1194        assert!(err.to_string().contains("unsupported characters"));
1195    }
1196
1197    #[test]
1198    fn service_command_for_docker_forwards_platform_and_user() {
1199        let service = ServiceSpec {
1200            image: "redis:7".into(),
1201            aliases: vec!["cache".into()],
1202            docker_platform: Some("linux/arm64/v8".into()),
1203            docker_user: Some("1000:1000".into()),
1204            entrypoint: Vec::new(),
1205            command: Vec::new(),
1206            variables: HashMap::new(),
1207        };
1208
1209        let mut command = service_command(EngineKind::Docker, &service);
1210        command
1211            .arg("--user")
1212            .arg(service.docker_user.as_deref().unwrap());
1213        let args: Vec<String> = command
1214            .get_args()
1215            .map(|arg| arg.to_string_lossy().to_string())
1216            .collect();
1217
1218        assert!(
1219            args.windows(2)
1220                .any(|pair| pair == ["--platform", "linux/arm64/v8"])
1221        );
1222        assert!(args.windows(2).any(|pair| pair == ["--user", "1000:1000"]));
1223    }
1224
1225    #[test]
1226    fn service_command_for_container_cli_translates_platform_to_arch() {
1227        let service = ServiceSpec {
1228            image: "redis:7".into(),
1229            aliases: vec!["cache".into()],
1230            docker_platform: Some("linux/amd64".into()),
1231            docker_user: Some("1000:1000".into()),
1232            entrypoint: Vec::new(),
1233            command: Vec::new(),
1234            variables: HashMap::new(),
1235        };
1236
1237        let mut command = service_command(EngineKind::ContainerCli, &service);
1238        command
1239            .arg("--user")
1240            .arg(service.docker_user.as_deref().unwrap());
1241        let args: Vec<String> = command
1242            .get_args()
1243            .map(|arg| arg.to_string_lossy().to_string())
1244            .collect();
1245
1246        assert!(args.windows(2).any(|pair| pair == ["--arch", "x86_64"]));
1247        assert!(args.windows(2).any(|pair| pair == ["--user", "1000:1000"]));
1248    }
1249
1250    #[test]
1251    fn run_command_with_timeout_fails_fast() {
1252        let mut command = Command::new("sh");
1253        command.arg("-lc").arg("sleep 1");
1254
1255        let err = super::run_command_with_timeout(command, Some(Duration::from_millis(50)))
1256            .expect_err("command should time out");
1257
1258        assert!(err.to_string().contains("timed out"));
1259    }
1260
1261    #[test]
1262    fn parse_service_ipv4_accepts_container_cli_shape() {
1263        let payload = br#"[{"networks":[{"ipv4Address":"192.168.64.57/24"}]}]"#;
1264        assert_eq!(
1265            parse_service_ipv4(payload).unwrap(),
1266            Some("192.168.64.57".into())
1267        );
1268    }
1269
1270    #[test]
1271    fn parse_service_ipv4_accepts_docker_shape() {
1272        let payload = br#"[{"NetworkSettings":{"Networks":{"opal":{"IPAddress":"172.18.0.2"}}}}]"#;
1273        assert_eq!(
1274            parse_service_ipv4(payload).unwrap(),
1275            Some("172.18.0.2".into())
1276        );
1277    }
1278
1279    #[test]
1280    fn merged_env_does_not_expand_service_only_variables() {
1281        let merged = super::merged_env(
1282            &[("BASE".into(), "hello".into())],
1283            &HashMap::from([
1284                ("BASE".into(), "override".into()),
1285                ("SERVICE_ONLY".into(), "$BASE-world".into()),
1286            ]),
1287        );
1288        let merged_map: HashMap<_, _> = merged.into_iter().collect();
1289
1290        assert_eq!(merged_map.get("BASE").map(String::as_str), Some("override"));
1291        assert_eq!(
1292            merged_map.get("SERVICE_ONLY").map(String::as_str),
1293            Some("$BASE-world")
1294        );
1295    }
1296}