Skip to main content

fakecloud_ecs/
runtime.rs

1//! Docker/Podman-based ECS task execution.
2//!
3//! Mirrors the Lambda `ContainerRuntime` approach (auto-detect CLI, forward
4//! localhost → host.docker.internal) but scoped for ECS's different
5//! lifecycle: tasks are ephemeral, so there is no warm-container pool. Each
6//! `run_task` spawns a background tokio task that pulls the image, starts
7//! the container, waits for exit, captures logs, and updates shared ECS
8//! state in place.
9
10use std::path::PathBuf;
11use std::sync::Arc;
12use std::time::Duration;
13
14use base64::Engine;
15use chrono::Utc;
16use fakecloud_core::delivery::DeliveryBus;
17use fakecloud_logs::ingest::{append_events, IngestEvent};
18use fakecloud_logs::SharedLogsState;
19use fakecloud_secretsmanager::SharedSecretsManagerState;
20use fakecloud_ssm::SharedSsmState;
21use parking_lot::RwLock;
22use tempfile::TempDir;
23use tokio::process::Command;
24
25use crate::state::{LifecycleEvent, SharedEcsState};
26
27#[derive(Debug, thiserror::Error)]
28pub enum RuntimeError {
29    #[error("container CLI not found (tried docker, podman)")]
30    NoCli,
31    #[error("image pull failed: {0}")]
32    ImagePull(String),
33    #[error("container start failed: {0}")]
34    ContainerStart(String),
35    #[error("docker wait failed: {0}")]
36    Wait(String),
37}
38
39/// Docker/Podman executor for ECS tasks.
40pub struct EcsRuntime {
41    cli: String,
42    host_ip: String,
43    /// Port the main fakecloud server bound to. Used to translate AWS
44    /// ECR URIs (`<acct>.dkr.ecr.<region>.amazonaws.com/<repo>:<tag>`) to
45    /// the local OCI v2 endpoint (`127.0.0.1:<port>/<repo>:<tag>`) so
46    /// tasks can pull images pushed to fakecloud's own ECR.
47    server_port: u16,
48    /// Isolated DOCKER_CONFIG dir pre-populated with Basic auth for
49    /// `127.0.0.1:<port>`; keeps the host user's `~/.docker/config.json`
50    /// untouched and lets `docker pull` succeed against fakecloud ECR
51    /// without a prior `aws ecr get-login-password | docker login`.
52    docker_config: Option<Arc<TempDir>>,
53    /// Tracks per-task lists of `(container_name, docker_container_id)` so
54    /// `stop_task` can kill every container backing a task — multi-container
55    /// task definitions launch one docker container per `containerDefinitions`
56    /// entry, all of which must be torn down on stop.
57    containers: RwLock<std::collections::HashMap<String, Vec<(String, String)>>>,
58    /// Cross-service delivery bus — emits `aws.ecs` EventBridge events
59    /// on task state transitions when wired. `None` if the server started
60    /// without EventBridge configured (or for unit tests).
61    delivery_bus: Option<Arc<DeliveryBus>>,
62    /// CloudWatch Logs state — when set, tasks whose container definition
63    /// declares the `awslogs` log driver get their captured stdout/stderr
64    /// forwarded to a log group/stream under this shared state.
65    logs_state: Option<SharedLogsState>,
66    /// SecretsManager state for resolving `containerDefinition.secrets[]`
67    /// entries whose `valueFrom` is a SecretsManager ARN.
68    secretsmanager_state: Option<SharedSecretsManagerState>,
69    /// SSM Parameter Store state for resolving `secrets[]` entries whose
70    /// `valueFrom` is an SSM parameter ARN.
71    ssm_state: Option<SharedSsmState>,
72}
73
74impl EcsRuntime {
75    /// Auto-detect Docker or Podman. Returns `None` if neither is
76    /// available. Honours `FAKECLOUD_CONTAINER_CLI` for explicit override.
77    /// `server_port` is the port the main fakecloud server bound to;
78    /// needed to resolve AWS ECR URIs against the local OCI v2 registry.
79    pub fn new(server_port: u16) -> Option<Self> {
80        let cli = if let Ok(cli) = std::env::var("FAKECLOUD_CONTAINER_CLI") {
81            if cli_works(&cli) {
82                cli
83            } else {
84                return None;
85            }
86        } else if cli_works("docker") {
87            "docker".to_string()
88        } else if cli_works("podman") {
89            "podman".to_string()
90        } else {
91            return None;
92        };
93        let host_ip = if cfg!(target_os = "linux") {
94            "172.17.0.1".to_string()
95        } else {
96            "host-gateway".to_string()
97        };
98        let docker_config = build_local_registry_docker_config(server_port).map(Arc::new);
99        Some(Self {
100            cli,
101            host_ip,
102            server_port,
103            docker_config,
104            containers: RwLock::new(std::collections::HashMap::new()),
105            delivery_bus: None,
106            logs_state: None,
107            secretsmanager_state: None,
108            ssm_state: None,
109        })
110    }
111
112    /// Path suitable for `DOCKER_CONFIG`. `None` if the tempdir setup
113    /// failed; in that case pulls fall back to the user's own config and
114    /// will only work if they've already logged in.
115    fn docker_config_path(&self) -> Option<PathBuf> {
116        self.docker_config.as_ref().map(|d| d.path().to_path_buf())
117    }
118
119    /// Build a `Command` for the container CLI with `DOCKER_CONFIG` set
120    /// to our isolated tempdir so fakecloud ECR auth works out of the box.
121    fn cli_command(&self) -> Command {
122        let mut cmd = Command::new(&self.cli);
123        if let Some(p) = self.docker_config_path() {
124            cmd.env("DOCKER_CONFIG", p);
125        }
126        cmd
127    }
128
129    pub fn cli_name(&self) -> &str {
130        &self.cli
131    }
132
133    /// Wire EventBridge delivery so task state transitions emit
134    /// `aws.ecs` / `ECS Task State Change` events.
135    pub fn with_delivery_bus(mut self, bus: Arc<DeliveryBus>) -> Self {
136        self.delivery_bus = Some(bus);
137        self
138    }
139
140    fn register_lb_targets(&self, state: &SharedEcsState, account_id: &str, task_id: &str) {
141        let Some(ref bus) = self.delivery_bus else {
142            return;
143        };
144        let accounts = state.read();
145        let Some(s) = accounts.get(account_id) else {
146            return;
147        };
148        let Some(task) = s.tasks.get(task_id) else {
149            return;
150        };
151        let targets = compute_elbv2_targets(s, task);
152        drop(accounts);
153        for (tg_arn, tg_targets) in targets {
154            bus.register_elbv2_targets(account_id, &tg_arn, tg_targets);
155        }
156    }
157
158    fn deregister_lb_targets(&self, state: &SharedEcsState, account_id: &str, task_id: &str) {
159        let Some(ref bus) = self.delivery_bus else {
160            return;
161        };
162        let accounts = state.read();
163        let Some(s) = accounts.get(account_id) else {
164            return;
165        };
166        let Some(task) = s.tasks.get(task_id) else {
167            return;
168        };
169        let targets = compute_elbv2_targets(s, task);
170        drop(accounts);
171        for (tg_arn, tg_targets) in targets {
172            bus.deregister_elbv2_targets(account_id, &tg_arn, tg_targets);
173        }
174    }
175
176    /// Wire CloudWatch Logs state so tasks using the `awslogs` driver
177    /// get their captured stdout/stderr forwarded.
178    pub fn with_logs(mut self, logs: SharedLogsState) -> Self {
179        self.logs_state = Some(logs);
180        self
181    }
182
183    /// Wire SecretsManager state so `secrets[].valueFrom` entries
184    /// pointing at SecretsManager ARNs resolve at task launch.
185    pub fn with_secretsmanager(mut self, state: SharedSecretsManagerState) -> Self {
186        self.secretsmanager_state = Some(state);
187        self
188    }
189
190    /// Wire SSM state so `secrets[].valueFrom` entries pointing at
191    /// Parameter Store ARNs resolve at task launch.
192    pub fn with_ssm(mut self, state: SharedSsmState) -> Self {
193        self.ssm_state = Some(state);
194        self
195    }
196
197    /// Spawn the task asynchronously. Returns immediately after transitioning
198    /// the task to `PENDING`; the background task advances it to `RUNNING`
199    /// once the container is created and to `STOPPED` once the container
200    /// exits.
201    pub fn run_task(self: Arc<Self>, state: SharedEcsState, task_id: String, account_id: String) {
202        let rt = self.clone();
203        tokio::spawn(async move {
204            if let Err(err) = rt.run_task_inner(&state, &task_id, &account_id).await {
205                tracing::warn!(%err, task = %task_id, "ecs task execution failed");
206                // Also surface on stderr so nextest's captured-output for a
207                // failed E2E shows the reason instead of just "empty logs".
208                eprintln!("[ecs] task {task_id} failed: {err}");
209                finalize_failure(&state, &account_id, &task_id, &err.to_string());
210                rt.emit_state_change(
211                    &state,
212                    &account_id,
213                    &task_id,
214                    "STOPPED",
215                    Some(("TaskFailedToStart", err.to_string())),
216                );
217            }
218        });
219    }
220
221    async fn run_task_inner(
222        &self,
223        state: &SharedEcsState,
224        task_id: &str,
225        account_id: &str,
226    ) -> Result<(), RuntimeError> {
227        // Build a per-container launch plan up-front so we hold the read
228        // lock once. Each entry carries everything needed to compose a
229        // `docker run` invocation for one container in the task.
230        let plans = build_container_plans(state, account_id, task_id, self.server_port)?;
231        if plans.is_empty() {
232            return Err(RuntimeError::ContainerStart(
233                "task has no containers".into(),
234            ));
235        }
236
237        // Resolve secrets for each plan. Failures fail the whole task to
238        // match real ECS's "failed to retrieve secret" behaviour — there's
239        // no point starting a sidecar when the app container will fail.
240        let mut resolved_plans: Vec<ResolvedContainerPlan> = Vec::with_capacity(plans.len());
241        for plan in plans {
242            let mut env = plan.env.clone();
243            for (name, value_from) in &plan.secrets_refs {
244                match self.resolve_secret(account_id, value_from) {
245                    Some(v) => env.push((name.clone(), v)),
246                    None => {
247                        return Err(RuntimeError::ContainerStart(format!(
248                            "failed to resolve secret {name} from {value_from}"
249                        )));
250                    }
251                }
252            }
253            if plan.has_task_role {
254                env.push((
255                    "AWS_CONTAINER_CREDENTIALS_FULL_URI".into(),
256                    format!(
257                        "http://host.docker.internal:{}/_fakecloud/ecs/creds/{}",
258                        self.server_port, task_id
259                    ),
260                ));
261            }
262            env.push((
263                "ECS_CONTAINER_METADATA_URI".into(),
264                format!(
265                    "http://host.docker.internal:{}/_fakecloud/ecs/v3/{}",
266                    self.server_port, task_id
267                ),
268            ));
269            env.push((
270                "ECS_CONTAINER_METADATA_URI_V4".into(),
271                format!(
272                    "http://host.docker.internal:{}/_fakecloud/ecs/v4/{}",
273                    self.server_port, task_id
274                ),
275            ));
276            resolved_plans.push(ResolvedContainerPlan { plan, env });
277        }
278
279        // Pull every distinct image up-front so a second container's pull
280        // failure surfaces before we leave the first container running.
281        mark_pull_started(state, account_id, task_id);
282        let mut run_images: Vec<String> = Vec::with_capacity(resolved_plans.len());
283        let mut image_digests: Vec<Option<String>> = Vec::with_capacity(resolved_plans.len());
284        for rp in &resolved_plans {
285            let local_pull_uri =
286                fakecloud_core::ecr_uri::translate_to_local(&rp.plan.image, self.server_port);
287            let pull_uri = local_pull_uri.as_deref().unwrap_or(&rp.plan.image);
288            let pull_out = self
289                .cli_command()
290                .args(["pull", pull_uri])
291                .output()
292                .await
293                .map_err(|e| RuntimeError::ImagePull(e.to_string()))?;
294            if !pull_out.status.success() {
295                let err = String::from_utf8_lossy(&pull_out.stderr).to_string();
296                return Err(RuntimeError::ImagePull(err));
297            }
298            // Retag the local pull URI to the AWS URI so `docker run` finds
299            // the image under the user-facing name. Digest-pinned refs
300            // can't be `docker tag` targets, so we fall through and run
301            // under the local URI in that case.
302            let run_image = if let Some(ref local_uri) = local_pull_uri {
303                if fakecloud_core::ecr_uri::is_digest_ref(&rp.plan.image) {
304                    local_uri.clone()
305                } else {
306                    let _ = self
307                        .cli_command()
308                        .args(["tag", local_uri, &rp.plan.image])
309                        .output()
310                        .await;
311                    rp.plan.image.clone()
312                }
313            } else {
314                rp.plan.image.clone()
315            };
316            // Best-effort image digest extraction so DescribeTasks emits
317            // the resolved digest the way real ECS does. Failures here
318            // (e.g. CLI without RepoDigests) are silent — digest stays
319            // `None` rather than failing the task.
320            let digest = self.lookup_image_digest(pull_uri).await;
321            run_images.push(run_image);
322            image_digests.push(digest);
323        }
324        mark_pull_stopped(state, account_id, task_id);
325
326        // For awsvpc network mode, create a per-task docker network so
327        // containers share an isolated bridge. Clean it up when the task
328        // stops. Network creation is best-effort: on failure we fall back
329        // to the default bridge and continue.
330        let awsvpc_network = resolved_plans
331            .iter()
332            .any(|rp| rp.plan.network_mode.as_deref() == Some("awsvpc"));
333        let network_name = format!("fakecloud-ecs-{}", task_id);
334        let network_created = if awsvpc_network {
335            let create = Command::new(&self.cli)
336                .args([
337                    "network",
338                    "create",
339                    "--driver",
340                    "bridge",
341                    "--label",
342                    &format!("fakecloud-ecs-task={}", task_id),
343                    &network_name,
344                ])
345                .output()
346                .await;
347            match create {
348                Ok(out) if out.status.success() => {
349                    tracing::info!(
350                        task = %task_id,
351                        network = %network_name,
352                        "created awsvpc docker network"
353                    );
354                    true
355                }
356                Ok(out) => {
357                    let err = String::from_utf8_lossy(&out.stderr);
358                    tracing::warn!(
359                        task = %task_id,
360                        network = %network_name,
361                        error = %err,
362                        "awsvpc network creation failed; falling back to default bridge"
363                    );
364                    false
365                }
366                Err(e) => {
367                    tracing::warn!(
368                        task = %task_id,
369                        network = %network_name,
370                        error = %e,
371                        "awsvpc network creation failed; falling back to default bridge"
372                    );
373                    false
374                }
375            }
376        } else {
377            false
378        };
379
380        if network_created {
381            let eni_id = format!(
382                "eni-{}",
383                uuid::Uuid::new_v4()
384                    .to_string()
385                    .replace('-', "")
386                    .get(..17)
387                    .unwrap_or("")
388            );
389            let mac = format!(
390                "02:{:02x}:{:02x}:{:02x}:{:02x}:{:02x}",
391                rand::random::<u8>(),
392                rand::random::<u8>(),
393                rand::random::<u8>(),
394                rand::random::<u8>(),
395                rand::random::<u8>()
396            );
397            let ip = format!("10.0.{}.{}", rand::random::<u8>(), rand::random::<u8>());
398            let mut accounts = state.write();
399            if let Some(st) = accounts.get_mut(account_id) {
400                if let Some(task) = st.tasks.get_mut(task_id) {
401                    task.attachments.push(crate::state::TaskAttachment {
402                        id: eni_id.clone(),
403                        attachment_type: "eni".into(),
404                        status: "ATTACHED".into(),
405                        details: vec![
406                            crate::state::AttachmentDetail {
407                                name: "subnetId".into(),
408                                value: "subnet-fakecloud".into(),
409                            },
410                            crate::state::AttachmentDetail {
411                                name: "privateIPv4Address".into(),
412                                value: ip.clone(),
413                            },
414                            crate::state::AttachmentDetail {
415                                name: "macAddress".into(),
416                                value: mac.clone(),
417                            },
418                        ],
419                    });
420                }
421            }
422            tracing::info!(
423                task = %task_id,
424                eni = %eni_id,
425                ip = %ip,
426                "populated awsvpc ENI attachment"
427            );
428        }
429
430        // Launch every container detached, in topological order. Before
431        // each `docker run` we honour the dependent's `dependsOn[]` by
432        // polling docker until each upstream container reaches the
433        // requested condition (START/COMPLETE/SUCCESS/HEALTHY). If any
434        // fails to start (or an upstream gate times out), kill the
435        // already-started containers and bail — partial-launch state is
436        // harder to reason about than a clean failure.
437        let mut started: Vec<RunningContainer> = Vec::with_capacity(resolved_plans.len());
438        for (idx, (rp, run_image)) in resolved_plans.iter().zip(run_images.iter()).enumerate() {
439            // Wait for every dependsOn[] entry on this container. Upstreams
440            // declared in the same task always show up earlier in the
441            // launch order thanks to topo_sort_plans, so we only ever look
442            // backwards into `started`.
443            for dep in &rp.plan.depends_on {
444                let upstream = match started.iter().find(|c| c.name == dep.container_name) {
445                    Some(u) => u,
446                    // Upstream not in this task definition (we ignored it
447                    // during topo-sort too). Skip the gate — this matches
448                    // the existing "ignore unknown dependency" behaviour.
449                    None => continue,
450                };
451                // Whether the upstream has a healthCheck configured —
452                // governs the HEALTHY shortcut: AWS treats HEALTHY as
453                // immediately satisfied when the upstream has no probe.
454                let upstream_has_health_check = resolved_plans
455                    .iter()
456                    .find(|p| p.plan.container_name == dep.container_name)
457                    .is_some_and(|p| p.plan.health_check.is_some());
458                if let Err(err) = self
459                    .wait_for_depends_on(upstream, dep.condition, upstream_has_health_check)
460                    .await
461                {
462                    self.cleanup_partial_start(&started);
463                    return Err(err);
464                }
465            }
466            let argv = build_run_argv(&rp.plan, &rp.env, task_id, &self.host_ip, run_image);
467            let mut cmd = Command::new(&self.cli);
468            cmd.args(&argv);
469            let run_out = cmd.output().await.map_err(|e| {
470                // Cleanup already-started containers on launch failure.
471                self.cleanup_partial_start(&started);
472                RuntimeError::ContainerStart(e.to_string())
473            })?;
474            if !run_out.status.success() {
475                let err = String::from_utf8_lossy(&run_out.stderr).to_string();
476                self.cleanup_partial_start(&started);
477                return Err(RuntimeError::ContainerStart(err));
478            }
479            let container_id = String::from_utf8_lossy(&run_out.stdout).trim().to_string();
480            started.push(RunningContainer {
481                name: rp.plan.container_name.clone(),
482                container_id,
483                essential: rp.plan.essential,
484                exit_code: None,
485                network_bindings: network_bindings_for(&rp.plan),
486                image_digest: image_digests.get(idx).cloned().unwrap_or(None),
487            });
488        }
489
490        // Stash all (name, container_id) pairs so StopTask/stop_all can
491        // reach every container backing this task.
492        {
493            let mut guard = self.containers.write();
494            guard.insert(
495                task_id.to_string(),
496                started
497                    .iter()
498                    .map(|c| (c.name.clone(), c.container_id.clone()))
499                    .collect(),
500            );
501        }
502        mark_running_multi(state, account_id, task_id, &started);
503        self.register_lb_targets(state, account_id, task_id);
504        self.emit_state_change(state, account_id, task_id, "RUNNING", None);
505
506        // Wait for the first essential container (or, if none are
507        // essential, any container) to exit. ECS task lifetime is
508        // bounded by the first essential exit, after which all remaining
509        // containers are stopped. While polling we also refresh each
510        // container's `healthStatus` from `docker inspect` so
511        // DescribeTasks reflects HEALTHCHECK transitions in near real
512        // time.
513        let wait_outcome = self
514            .wait_for_task_exit_with_health(state, account_id, task_id, &started)
515            .await?;
516
517        // Stop and reap any sidecars still running. Best-effort — failures
518        // here shouldn't keep the task from transitioning to STOPPED.
519        let mut final_containers = started.clone();
520        for (i, rc) in started.iter().enumerate() {
521            if Some(i) == wait_outcome.exited_index {
522                final_containers[i].exit_code = Some(wait_outcome.exit_code);
523                continue;
524            }
525            // Try to grab the exit code if the container already exited
526            // on its own (non-essential exits don't stop the task), then
527            // fall back to `docker stop` for stragglers.
528            let inspect = Command::new(&self.cli)
529                .args(["inspect", "-f", "{{.State.ExitCode}}", &rc.container_id])
530                .output()
531                .await;
532            let still_running = match inspect {
533                Ok(out) if out.status.success() => {
534                    let s = String::from_utf8_lossy(&out.stdout).trim().to_string();
535                    // `docker inspect` returns 0 for not-yet-exited
536                    // containers, so we additionally check `State.Running`.
537                    let running = Command::new(&self.cli)
538                        .args(["inspect", "-f", "{{.State.Running}}", &rc.container_id])
539                        .output()
540                        .await
541                        .map(|o| String::from_utf8_lossy(&o.stdout).trim() == "true")
542                        .unwrap_or(false);
543                    if !running {
544                        if let Ok(code) = s.parse::<i64>() {
545                            final_containers[i].exit_code = Some(code);
546                        }
547                    }
548                    running
549                }
550                _ => false,
551            };
552            if still_running {
553                let _ = Command::new(&self.cli)
554                    .args(["stop", "--time", "10", &rc.container_id])
555                    .output()
556                    .await;
557                let wait_out = Command::new(&self.cli)
558                    .args(["wait", &rc.container_id])
559                    .output()
560                    .await;
561                if let Ok(out) = wait_out {
562                    let code: i64 = String::from_utf8_lossy(&out.stdout)
563                        .trim()
564                        .parse()
565                        .unwrap_or(-1);
566                    final_containers[i].exit_code = Some(code);
567                }
568            }
569        }
570
571        // Capture combined stdout+stderr from every container so the
572        // introspection endpoint shows logs from sidecars too.
573        let mut captured = String::new();
574        for rc in &started {
575            let logs_out = Command::new(&self.cli)
576                .args(["logs", &rc.container_id])
577                .output()
578                .await
579                .map_err(|e| RuntimeError::Wait(e.to_string()))?;
580            captured.push_str(&format!("[{}] ", rc.name));
581            captured.push_str(&String::from_utf8_lossy(&logs_out.stdout));
582            captured.push_str(&String::from_utf8_lossy(&logs_out.stderr));
583        }
584
585        // Reap every container we own.
586        for rc in &started {
587            let _ = Command::new(&self.cli)
588                .args(["rm", "-f", &rc.container_id])
589                .output()
590                .await;
591        }
592        // Clean up the per-task docker network for awsvpc.
593        if network_created {
594            let _ = Command::new(&self.cli)
595                .args(["network", "rm", &network_name])
596                .output()
597                .await;
598        }
599        self.containers.write().remove(task_id);
600
601        // Forward logs BEFORE flipping the task to STOPPED so a client
602        // that polls DescribeTasks and immediately queries
603        // DescribeLogStreams can't observe the STOPPED transition before
604        // the awslogs group/stream has been materialised.
605        self.forward_awslogs_if_configured(state, account_id, task_id, &captured);
606        let exit_code = wait_outcome.exit_code;
607        finalize_stopped_multi(
608            state,
609            account_id,
610            task_id,
611            &final_containers,
612            exit_code,
613            &captured,
614            wait_outcome.stop_code,
615            None,
616        );
617        self.deregister_lb_targets(state, account_id, task_id);
618        self.emit_state_change(
619            state,
620            account_id,
621            task_id,
622            "STOPPED",
623            Some((wait_outcome.stop_code, format!("Exit code {}", exit_code))),
624        );
625        Ok(())
626    }
627
628    /// Wait for the task to reach a stop condition (any essential
629    /// container exits, or every container exits when none are
630    /// essential) while also polling `docker inspect .State.Health.Status`
631    /// on every iteration to push the latest `healthStatus` onto each
632    /// task container — so DescribeTasks shows live HEALTHCHECK
633    /// transitions instead of the boot-time `UNKNOWN`. Returns the
634    /// index into `started` of the container whose exit determined the
635    /// task lifetime, its exit code, and the stopCode.
636    async fn wait_for_task_exit_with_health(
637        &self,
638        state: &SharedEcsState,
639        account_id: &str,
640        task_id: &str,
641        started: &[RunningContainer],
642    ) -> Result<TaskExitOutcome, RuntimeError> {
643        let any_essential = started.iter().any(|c| c.essential);
644        let mut working: Vec<RunningContainer> = started.to_vec();
645        let mut first_exited: Option<usize> = None;
646        loop {
647            // Refresh health status before checking exits so a container
648            // that goes UNHEALTHY -> exits in the same iteration leaves
649            // its final health state on the task before we transition to
650            // STOPPED.
651            self.refresh_health_status(state, account_id, task_id, started)
652                .await;
653            for (i, rc) in started.iter().enumerate() {
654                if working[i].exit_code.is_some() {
655                    continue;
656                }
657                let inspect = Command::new(&self.cli)
658                    .args(["inspect", "-f", "{{.State.Running}}", &rc.container_id])
659                    .output()
660                    .await;
661                let running = match inspect {
662                    Ok(out) if out.status.success() => {
663                        String::from_utf8_lossy(&out.stdout).trim() == "true"
664                    }
665                    _ => false,
666                };
667                if running {
668                    continue;
669                }
670                let wait_out = Command::new(&self.cli)
671                    .args(["wait", &rc.container_id])
672                    .output()
673                    .await
674                    .map_err(|e| RuntimeError::Wait(e.to_string()))?;
675                if !wait_out.status.success() {
676                    let err = String::from_utf8_lossy(&wait_out.stderr).to_string();
677                    return Err(RuntimeError::Wait(err));
678                }
679                let exit_code: i64 = String::from_utf8_lossy(&wait_out.stdout)
680                    .trim()
681                    .parse()
682                    .unwrap_or(-1);
683                working[i].exit_code = Some(exit_code);
684                if first_exited.is_none() && (rc.essential || !any_essential) {
685                    first_exited = Some(i);
686                }
687            }
688            if task_should_stop(&working) {
689                let idx = first_exited
690                    .or_else(|| working.iter().position(|c| c.exit_code.is_some()))
691                    .unwrap_or(0);
692                let exit_code = working[idx].exit_code.unwrap_or(-1);
693                return Ok(TaskExitOutcome {
694                    exited_index: Some(idx),
695                    exit_code,
696                    stop_code: if any_essential {
697                        "EssentialContainerExited"
698                    } else {
699                        "TaskCompleted"
700                    },
701                });
702            }
703            sleep(Duration::from_millis(200)).await;
704        }
705    }
706
707    /// Inspect each running container's `.State.Health.Status` and push
708    /// the mapped ECS healthStatus onto the task's container list.
709    /// Best-effort: a failed inspect (e.g. container already removed)
710    /// leaves the previous status untouched.
711    async fn refresh_health_status(
712        &self,
713        state: &SharedEcsState,
714        account_id: &str,
715        task_id: &str,
716        started: &[RunningContainer],
717    ) {
718        let mut updates: Vec<(String, String)> = Vec::with_capacity(started.len());
719        for rc in started {
720            let out = Command::new(&self.cli)
721                .args([
722                    "inspect",
723                    "-f",
724                    "{{if .State.Health}}{{.State.Health.Status}}{{else}}{{end}}",
725                    &rc.container_id,
726                ])
727                .output()
728                .await;
729            let status = match out {
730                Ok(o) if o.status.success() => {
731                    let raw = String::from_utf8_lossy(&o.stdout).trim().to_string();
732                    if raw.is_empty() {
733                        // No HEALTHCHECK on this container — leave the
734                        // ECS-side status as UNKNOWN (matches AWS).
735                        "UNKNOWN".to_string()
736                    } else {
737                        docker_health_to_ecs(&raw).to_string()
738                    }
739                }
740                _ => continue,
741            };
742            updates.push((rc.name.clone(), status));
743        }
744        if updates.is_empty() {
745            return;
746        }
747        let mut accounts = state.write();
748        let Some(s) = accounts.get_mut(account_id) else {
749            return;
750        };
751        let Some(task) = s.tasks.get_mut(task_id) else {
752            return;
753        };
754        for (name, status) in updates {
755            if let Some(c) = task.containers.iter_mut().find(|c| c.name == name) {
756                c.health_status = Some(status);
757            }
758        }
759    }
760
761    /// Best-effort image digest lookup via `docker image inspect` after a
762    /// pull. Returns the first `RepoDigests[0]` entry's `sha256:...` tail
763    /// when present, matching what AWS ECS returns on `DescribeTasks`.
764    /// `None` on any failure so digest extraction never fails the task.
765    async fn lookup_image_digest(&self, pull_uri: &str) -> Option<String> {
766        let out = self
767            .cli_command()
768            .args([
769                "image",
770                "inspect",
771                "-f",
772                "{{index .RepoDigests 0}}",
773                pull_uri,
774            ])
775            .output()
776            .await
777            .ok()?;
778        if !out.status.success() {
779            return None;
780        }
781        let raw = String::from_utf8_lossy(&out.stdout).trim().to_string();
782        if raw.is_empty() || raw == "<no value>" {
783            return None;
784        }
785        // RepoDigests entries are `<repo>@sha256:<hex>`. Real ECS surfaces
786        // the digest portion only.
787        Some(
788            raw.rsplit_once('@')
789                .map(|(_, d)| d.to_string())
790                .unwrap_or(raw),
791        )
792    }
793
794    /// Block the launch of a dependent container until its upstream
795    /// reaches the requested `dependsOn[].condition`. We poll
796    /// `docker inspect` at a small interval; the wait is bounded by an
797    /// AWS-style timeout (120s by default — long enough for image
798    /// startup but short enough to surface bugs as a clean
799    /// `ContainerStart` failure).
800    ///
801    /// `upstream_has_health_check` is needed for the `HEALTHY` branch:
802    /// when the upstream has no healthCheck, AWS treats `HEALTHY` as
803    /// immediately satisfied (otherwise the dependent would block
804    /// forever, since docker reports `Health.Status` only when the
805    /// container has a HEALTHCHECK directive).
806    async fn wait_for_depends_on(
807        &self,
808        upstream: &RunningContainer,
809        condition: DependsOnCondition,
810        upstream_has_health_check: bool,
811    ) -> Result<(), RuntimeError> {
812        // Bounded wait — chosen to comfortably cover slow init scripts
813        // without letting a wedged dependency stall a task indefinitely.
814        const WAIT_TIMEOUT: Duration = Duration::from_secs(120);
815        const POLL_INTERVAL: Duration = Duration::from_millis(200);
816
817        // HEALTHY against an upstream without a healthCheck: AWS treats
818        // this as immediately satisfied because there's no probe to
819        // observe. Skip the polling loop entirely so the dependent isn't
820        // wedged forever waiting for a status that docker will never set.
821        if matches!(condition, DependsOnCondition::Healthy) && !upstream_has_health_check {
822            return Ok(());
823        }
824
825        let deadline = std::time::Instant::now() + WAIT_TIMEOUT;
826        loop {
827            let inspect = inspect_container_state(&self.cli, &upstream.container_id).await;
828            if let Some(state) = inspect {
829                if condition_is_met(condition, &state) {
830                    return Ok(());
831                }
832                // SUCCESS specifically: if the container exited with a
833                // non-zero code, the gate can never be satisfied. Bail
834                // immediately rather than waiting for the timeout — this
835                // matches ECS's "stoppedReason: dependency failed" path.
836                if matches!(condition, DependsOnCondition::Success)
837                    && state.exited
838                    && state.exit_code != 0
839                {
840                    return Err(RuntimeError::ContainerStart(format!(
841                        "dependency on container {} ({}) failed: upstream exited with code {}",
842                        upstream.name,
843                        DependsOnCondition::Success.as_aws_str(),
844                        state.exit_code,
845                    )));
846                }
847            }
848            if std::time::Instant::now() >= deadline {
849                return Err(RuntimeError::ContainerStart(format!(
850                    "timed out waiting for container {} to reach condition {}",
851                    upstream.name,
852                    condition.as_aws_str(),
853                )));
854            }
855            tokio::time::sleep(POLL_INTERVAL).await;
856        }
857    }
858
859    /// Best-effort cleanup of containers we already started when a later
860    /// container in the task failed to launch. Without this, half-launched
861    /// tasks leak docker containers.
862    fn cleanup_partial_start(&self, started: &[RunningContainer]) {
863        let cli = self.cli.clone();
864        let ids: Vec<String> = started.iter().map(|c| c.container_id.clone()).collect();
865        tokio::spawn(async move {
866            for id in ids {
867                let _ = Command::new(&cli).args(["rm", "-f", &id]).output().await;
868            }
869        });
870    }
871
872    /// Resolve a `secrets[].valueFrom` reference to the actual secret
873    /// payload. Supports SecretsManager secret ARNs and SSM parameter
874    /// ARNs; returns `None` when the referenced state isn't wired or
875    /// the lookup misses.
876    fn resolve_secret(&self, account_id: &str, value_from: &str) -> Option<String> {
877        if value_from.contains(":secret:") {
878            let state = self.secretsmanager_state.as_ref()?;
879            let accounts = state.read();
880            let sm = accounts.get(account_id)?;
881            // ARN shape: arn:aws:secretsmanager:<region>:<acct>:secret:<name>-<6char>
882            // Stored key is the secret name (no suffix). Strip the
883            // AWS-generated 6-char suffix when comparing.
884            let arn_tail = value_from.rsplit(":secret:").next()?;
885            let name = arn_tail
886                .rsplit_once('-')
887                .map(|(n, _)| n)
888                .unwrap_or(arn_tail);
889            let secret = sm.secrets.get(name).or_else(|| sm.secrets.get(arn_tail))?;
890            let version_id = secret.current_version_id.as_ref()?;
891            let v = secret.versions.get(version_id)?;
892            return v.secret_string.clone();
893        }
894        if value_from.contains(":parameter") {
895            let state = self.ssm_state.as_ref()?;
896            let accounts = state.read();
897            let ssm = accounts.get(account_id)?;
898            // ARN shape: arn:aws:ssm:<region>:<acct>:parameter/<name>
899            // Parameters are stored keyed by name (with leading slash)
900            // or without, depending on how they were created. Try both.
901            let after = value_from.rsplit(":parameter").next()?;
902            let name_with_slash = after.trim_start_matches('/');
903            return ssm
904                .parameters
905                .get(&format!("/{name_with_slash}"))
906                .or_else(|| ssm.parameters.get(name_with_slash))
907                .map(|p| p.value.clone());
908        }
909        None
910    }
911
912    /// Emit an `ECS Task State Change` EventBridge event. No-op when no
913    /// delivery bus is wired. Matches AWS event shape so downstream
914    /// rules can filter on `detail.lastStatus`, `detail.stopCode`, etc.
915    fn emit_state_change(
916        &self,
917        state: &SharedEcsState,
918        account_id: &str,
919        task_id: &str,
920        last_status: &str,
921        stop: Option<(&str, String)>,
922    ) {
923        let Some(ref bus) = self.delivery_bus else {
924            return;
925        };
926        let Some(task_view) = snapshot_task(state, account_id, task_id) else {
927            return;
928        };
929        let mut detail = serde_json::json!({
930            "taskArn": task_view.task_arn,
931            "clusterArn": task_view.cluster_arn,
932            "lastStatus": last_status,
933            "desiredStatus": if last_status == "STOPPED" { "STOPPED" } else { "RUNNING" },
934            "launchType": task_view.launch_type,
935            "group": task_view.group,
936            "taskDefinitionArn": task_view.task_definition_arn,
937            "containers": task_view.containers,
938        });
939        if let Some((code, reason)) = stop {
940            detail["stopCode"] = code.into();
941            detail["stoppedReason"] = reason.into();
942        }
943        bus.put_event_to_eventbridge(
944            "aws.ecs",
945            "ECS Task State Change",
946            &detail.to_string(),
947            "default",
948        );
949    }
950
951    /// Forward captured stdout/stderr to CloudWatch Logs when the task's
952    /// container definition declares the `awslogs` log driver. No-op when
953    /// logs_state isn't wired or the task has no awslogs config.
954    fn forward_awslogs_if_configured(
955        &self,
956        state: &SharedEcsState,
957        account_id: &str,
958        task_id: &str,
959        captured: &str,
960    ) {
961        let Some(ref logs) = self.logs_state else {
962            return;
963        };
964        // Clone out of the read guard so we don't hold it across the logs
965        // state write.
966        let (cfg, task_region) = {
967            let accounts = state.read();
968            let Some(s) = accounts.get(account_id) else {
969                return;
970            };
971            let Some(task) = s.tasks.get(task_id) else {
972                return;
973            };
974            let Some(ref cfg) = task.awslogs else {
975                return;
976            };
977            (cfg.clone(), s.region.clone())
978        };
979        if captured.is_empty() {
980            return;
981        }
982        let now = Utc::now().timestamp_millis();
983        let stream_name = cfg.stream_name(task_id);
984        let events: Vec<IngestEvent> = captured
985            .lines()
986            .enumerate()
987            .map(|(i, line)| IngestEvent {
988                // Stagger within the same millisecond so CloudWatch's
989                // chronological-order invariant holds without relying on
990                // the host clock's resolution.
991                timestamp_ms: now.saturating_add(i as i64),
992                message: line.to_string(),
993            })
994            .collect();
995        append_events(
996            logs,
997            account_id,
998            &task_region,
999            &cfg.group,
1000            &stream_name,
1001            &events,
1002        );
1003    }
1004
1005    /// Kill every container behind a task with the configured stop
1006    /// timeout. Returns true if at least one container was killed. Called
1007    /// synchronously from `StopTask`; the wait loop in `run_task_inner`
1008    /// observes the exits and transitions the task to `STOPPED`.
1009    pub async fn stop_task(&self, task_id: &str, reason: &str) -> bool {
1010        let containers = self.containers.read().get(task_id).cloned();
1011        let Some(list) = containers else {
1012            return false;
1013        };
1014        if list.is_empty() {
1015            return false;
1016        }
1017        // `docker stop` sends SIGTERM then SIGKILL after a timeout.
1018        for (_name, id) in &list {
1019            let _ = Command::new(&self.cli)
1020                .args(["stop", "--time", "10", id])
1021                .output()
1022                .await;
1023        }
1024        tracing::info!(task = %task_id, reason = %reason, "ecs task stop requested");
1025        true
1026    }
1027
1028    /// Kill every running container the runtime owns. Called on reset /
1029    /// shutdown so docker state matches fakecloud state after a fresh
1030    /// boot.
1031    pub async fn stop_all(&self) {
1032        let ids: Vec<String> = self
1033            .containers
1034            .read()
1035            .values()
1036            .flat_map(|list| list.iter().map(|(_, id)| id.clone()))
1037            .collect();
1038        for id in ids {
1039            let _ = Command::new(&self.cli).args(["kill", &id]).output().await;
1040            let _ = Command::new(&self.cli).args(["rm", &id]).output().await;
1041        }
1042        self.containers.write().clear();
1043    }
1044}
1045
/// Per-container launch plan derived from a task definition.
///
/// `build_container_plans` produces one plan per container on the task.
/// Most fields are parsed from the matching container definition JSON
/// and are later turned into `docker run` flags.
#[derive(Clone, Debug)]
pub(crate) struct ContainerPlan {
    /// Name of the container this plan launches.
    pub(crate) container_name: String,
    /// Image reference to run. NOTE(review): whether this is the raw
    /// task-definition image or an already-translated pull URI is not
    /// visible here; confirm against the plan builder.
    pub(crate) image: String,
    /// `(name, value)` pairs from the container definition's
    /// `environment[]`; entries without a string `value` get `""`.
    pub(crate) env: Vec<(String, String)>,
    /// `entryPoint` string array; presumably empty when the definition
    /// omits it (TODO confirm in the plan builder).
    pub(crate) entry_point: Vec<String>,
    /// `command` string array; presumably empty when the definition
    /// omits it (TODO confirm in the plan builder).
    pub(crate) command: Vec<String>,
    /// `(name, valueFrom)` pairs from the container definition's
    /// `secrets[]`. Entries missing either key are dropped at parse time.
    pub(crate) secrets_refs: Vec<(String, String)>,
    /// Whether the container is marked essential in the task definition.
    pub(crate) essential: bool,
    /// True when the task has a task role ARN set.
    pub(crate) has_task_role: bool,
    /// Port mappings parsed from the task definition. Each entry becomes
    /// a `--publish containerPort:hostPort/protocol` flag on the docker
    /// run command (except for `awsvpc`, where ports are exposed via the
    /// per-task ENI rather than the docker host's port table).
    ///
    /// NOTE(review): docker's `--publish` syntax is
    /// `hostPort:containerPort[/protocol]`; confirm the order the argv
    /// builder actually emits and fix this comment if it is reversed.
    pub(crate) port_mappings: Vec<PortMapping>,
    /// Task-level network mode propagated to every container plan so the
    /// argv builder can decide whether to emit `--publish` flags. Real
    /// ECS treats `awsvpc` as "container is on its own ENI"; the
    /// equivalent in fakecloud is "don't publish to the host".
    pub(crate) network_mode: Option<String>,
    /// Container dependencies parsed from `dependsOn[]`. Each entry pairs
    /// the target container name with the condition that must be observed
    /// before this container is launched: `START` (target exists/running),
    /// `COMPLETE` (target exited, any code), `SUCCESS` (target exited with
    /// code 0), or `HEALTHY` (target's docker `Health.Status` is `healthy`).
    /// Used both to topologically order the launch loop and to gate each
    /// `docker run` on the upstream condition.
    pub(crate) depends_on: Vec<DependsOn>,
    /// Parsed `healthCheck` from the task definition. Translated into
    /// docker `--health-*` flags on `docker run` so the container's
    /// health is observable via `docker inspect .State.Health.Status`.
    /// `None` when the task definition doesn't declare a healthCheck;
    /// the container's `healthStatus` then stays `UNKNOWN` (matching ECS
    /// behaviour for tasks without a health probe).
    pub(crate) health_check: Option<HealthCheckSpec>,
    /// Volume mounts resolved by joining the container definition's
    /// `mountPoints[]` with the task definition's `volumes[]`. Each entry
    /// renders as one `-v` flag on the `docker run` invocation. Empty when
    /// the container has no mount points or no matching volume entries.
    pub(crate) volume_mounts: Vec<VolumeMount>,
    /// Parsed `ulimits` from the container definition. Each entry becomes
    /// `--ulimit <name>=<soft>:<hard>` on `docker run`.
    pub(crate) ulimits: Vec<Ulimit>,
    /// Parsed `linuxParameters` from the container definition. Emits
    /// `--cap-add`, `--cap-drop`, `--device`, `--init`, `--shm-size`,
    /// `--sysctl`, `--tmpfs`, `--privileged`, and `--read-only` flags.
    pub(crate) linux_parameters: Option<LinuxParameters>,
    /// `stopTimeout` in seconds. Becomes `--stop-timeout <N>` on `docker run`.
    /// Parsed as an unsigned integer and narrowed to `u32`.
    pub(crate) stop_timeout: Option<u32>,
    /// `user` from the container definition. Becomes `--user <value>`.
    pub(crate) user: Option<String>,
    /// `workingDirectory` from the container definition. Becomes `--workdir`.
    pub(crate) working_directory: Option<String>,
    /// `tty` from the container definition. Emits `--tty` when true.
    pub(crate) tty: bool,
    /// `interactive` from the container definition. Emits `--interactive` when true.
    pub(crate) interactive: bool,
    /// `readonlyRootFilesystem` from the container definition. Emits `--read-only` when true.
    pub(crate) readonly_rootfs: bool,
}
1107
/// One parsed `dependsOn[]` entry on a container. Pairs the upstream
/// container name with the condition that must hold before the dependent
/// container is launched. AWS spells the conditions `START`, `COMPLETE`,
/// `SUCCESS`, `HEALTHY` and treats anything else as an error at register
/// time — we mirror that in [`parse_depends_on`].
///
/// NOTE(review): at launch time `build_container_plans` parses entries
/// with `parse_depends_on_entry` inside a `filter_map`, so an entry that
/// fails to parse there is dropped rather than reported. Confirm that
/// [`parse_depends_on`] is the intended link target.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct DependsOn {
    /// Name of the upstream container that must reach `condition` first.
    pub container_name: String,
    /// State the upstream container must reach before the dependent
    /// container is started.
    pub condition: DependsOnCondition,
}
1118
/// The `condition` of a `dependsOn[]` entry in a task definition. One
/// variant per AWS-documented value; the launch loop polls docker until
/// the matching predicate holds before it starts the dependent
/// container.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum DependsOnCondition {
    /// The upstream container has been started: it exists in docker and
    /// is running or has already exited.
    Start,
    /// The upstream container has exited, whatever its exit code.
    Complete,
    /// The upstream container has exited with code 0.
    Success,
    /// The upstream container's `Health.Status` is `healthy`. An
    /// upstream with no healthCheck satisfies this immediately, as AWS
    /// does.
    Healthy,
}

impl DependsOnCondition {
    /// Turn the AWS spelling (`START`, `COMPLETE`, `SUCCESS`, `HEALTHY`)
    /// into a condition. Matching is exact and case-sensitive; anything
    /// else yields `None` so the caller can raise a `ClientException`
    /// at register time.
    pub fn parse(raw: &str) -> Option<Self> {
        // Derived from `as_aws_str` so both directions share one table.
        [Self::Start, Self::Complete, Self::Success, Self::Healthy]
            .iter()
            .copied()
            .find(|candidate| candidate.as_aws_str() == raw)
    }

    /// The AWS spelling of this condition. Used in user-facing errors so
    /// timeout and dependency-failed reasons echo what the user wrote in
    /// the task definition.
    pub fn as_aws_str(self) -> &'static str {
        match self {
            Self::Healthy => "HEALTHY",
            Self::Success => "SUCCESS",
            Self::Complete => "COMPLETE",
            Self::Start => "START",
        }
    }
}
1163
/// Container health check parsed from the ECS task definition. Each
/// field maps 1:1 to a docker `--health-*` flag on `docker run`. AWS
/// defaults: interval=30s, timeout=5s, retries=3, startPeriod=0s — we
/// preserve those defaults at parse time so the argv builder always
/// has concrete values to emit.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct HealthCheckSpec {
    /// `command[]` from the task definition. The first element selects
    /// the docker syntax: `CMD-SHELL` => `--health-cmd <rest joined by space>`,
    /// `CMD` => `--health-cmd <rest joined by space>` (still routed to
    /// `--health-cmd` because docker doesn't accept argv-form here),
    /// `NONE` => no flag emitted (caller skips emitting healthcheck).
    pub command: Vec<String>,
    /// Seconds between probes (30 when the task definition omits it);
    /// presumably emitted as `--health-interval` — confirm in the argv
    /// builder.
    pub interval_seconds: u32,
    /// Seconds a single probe may run (5 when omitted); presumably
    /// emitted as `--health-timeout` — confirm in the argv builder.
    pub timeout_seconds: u32,
    /// Retry count (3 when omitted); presumably emitted as
    /// `--health-retries` — confirm in the argv builder.
    pub retries: u32,
    /// Start period in seconds (0 when omitted); presumably emitted as
    /// `--health-start-period` — confirm in the argv builder.
    pub start_period_seconds: u32,
}
1182
/// One entry in a container's `portMappings`. Mirrors the AWS shape so
/// [`build_run_argv`] and the `networkBindings` response can share the
/// same parsed representation.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct PortMapping {
    /// Port the process listens on inside the container.
    pub container_port: u16,
    /// `0` (or unset in the source JSON) means "use the same value as
    /// containerPort" — host-mode default per AWS docs.
    pub host_port: u16,
    /// Lower-case `tcp` / `udp`. Defaults to `tcp` when omitted.
    pub protocol: String,
}
1195
/// One resolved `mountPoints` entry on a container plan. Computed at
/// launch by joining the container definition's `mountPoints` against the
/// task definition's `volumes` array. Each entry becomes a single
/// `-v <source>:<containerPath>[:ro]` flag on `docker run`.
///
/// Resolution is done by `resolve_mount_point`, called from
/// `build_container_plans`; a mount point that does not resolve
/// produces no `VolumeMount` at all.
///
/// Source resolution by volume kind:
/// - **host bind** (`volume.host.sourcePath` set): bind the host path
///   into the container at `containerPath`.
/// - **EFS** (`efsVolumeConfiguration` set): bind a host-side stub
///   directory at `/tmp/fakecloud/efs/<filesystemId>[/<rootDirectory>]`
///   so multiple tasks targeting the same filesystem id can share state
///   the way real EFS would. The stub directory is created with
///   `mkdir -p` ahead of `docker run`.
/// - **FSx for Windows** (`fsxWindowsFileServerVolumeConfiguration` set):
///   stub directory at `/tmp/fakecloud/fsx/<filesystemId>/<rootDirectory>`
///   created the same way as EFS.
/// - **Docker named volume** (`dockerVolumeConfiguration` set): pass the
///   volume name through to docker as a named volume reference.
/// - **Bare volume** (only `name` set, no host config): treated as an
///   anonymous docker volume for that task — matches AWS's "Docker
///   volumes" default scope.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct VolumeMount {
    /// Left side of `-v`: a host path, a docker named volume, or a stub
    /// directory under `/tmp/fakecloud/{efs,fsx}/...` for shared FS
    /// emulation.
    pub source: String,
    /// Container-side path, taken verbatim from the container
    /// definition's `mountPoints[].containerPath`. Right side of `-v`.
    pub container_path: String,
    /// `mountPoints[].readOnly` honoured: when true, append `:ro` to the
    /// `-v` flag so the bind/named volume is read-only inside the
    /// container. Defaults to false (read-write) when omitted.
    pub read_only: bool,
}
1231
/// One `ulimits` entry. Becomes `--ulimit <name>=<soft>:<hard>`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct Ulimit {
    /// Limit name; the `<name>` part of the flag.
    pub name: String,
    /// Soft limit; the `<soft>` part of the flag.
    pub soft_limit: i32,
    /// Hard limit; the `<hard>` part of the flag.
    pub hard_limit: i32,
}
1239
/// One `linuxParameters.devices` entry. Becomes `--device <hostPath>:<containerPath><permissions>`.
///
/// NOTE(review): docker's own syntax separates the permissions with a
/// colon (`host:container:perms`). Confirm whether `permissions` already
/// carries that leading `:` or whether the argv builder adds it.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct Device {
    /// Device path on the host; the `<hostPath>` part of the flag.
    pub host_path: String,
    /// Device path inside the container; the `<containerPath>` part.
    pub container_path: String,
    /// Permission string appended after the container path; the
    /// `<permissions>` part of the flag.
    pub permissions: String,
}
1247
/// One `linuxParameters.sysctl` entry. Becomes `--sysctl <name>=<value>`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct Sysctl {
    /// Kernel parameter name; the `<name>` part of the flag.
    pub name: String,
    /// Value to set; the `<value>` part of the flag.
    pub value: String,
}
1254
/// Parsed `linuxParameters` from the container definition.
///
/// `Default` yields a value with every list empty, every flag `false`
/// and no shared memory size. The flag named on each field below is the
/// one the container plan's documentation lists for `linuxParameters`;
/// the exact argv rendering is done elsewhere.
#[derive(Clone, Debug, PartialEq, Eq, Default)]
pub(crate) struct LinuxParameters {
    /// Capabilities to add; rendered as `--cap-add`.
    pub capabilities_add: Vec<String>,
    /// Capabilities to drop; rendered as `--cap-drop`.
    pub capabilities_drop: Vec<String>,
    /// Host devices to expose; rendered as `--device`.
    pub devices: Vec<Device>,
    /// Rendered as `--init` when true.
    pub init_process_enabled: bool,
    /// Shared memory size; rendered as `--shm-size`. NOTE(review): the
    /// unit is not visible here (AWS documents MiB) — confirm in the
    /// argv builder.
    pub shared_memory_size: Option<i32>,
    /// Kernel parameters; rendered as `--sysctl`.
    pub sysctls: Vec<Sysctl>,
    /// tmpfs mounts; rendered as `--tmpfs`.
    pub tmpfs: Vec<Tmpfs>,
    /// Rendered as `--privileged` when true.
    pub privileged: bool,
}
1267
/// One `linuxParameters.tmpfs` entry. Becomes `--tmpfs <containerPath>:size=<size>M<,options>*`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct Tmpfs {
    /// Mount point inside the container; the `<containerPath>` part.
    pub container_path: String,
    /// Size rendered as `size=<size>M` in the flag.
    pub size: i32,
    /// Extra mount options, each appended as `,<option>` after the size.
    pub mount_options: Vec<String>,
}
1275
/// A [`ContainerPlan`] paired with a separate environment list.
///
/// NOTE(review): presumably `env` is the final environment passed to
/// the container (the plan's own `env` plus resolved secrets and
/// injected variables). The code that builds this struct is outside
/// this view; confirm before relying on that.
#[derive(Clone, Debug)]
struct ResolvedContainerPlan {
    /// The launch plan this environment belongs to.
    plan: ContainerPlan,
    /// `(name, value)` environment pairs associated with the plan.
    env: Vec<(String, String)>,
}
1281
/// Result of waiting for a task's lifetime-determining container.
#[derive(Clone, Debug)]
struct TaskExitOutcome {
    /// Index into the started-containers list of the container whose exit
    /// closed out the task. `None` only in degenerate cases — kept as
    /// `Option` so `final_containers` indexing stays explicit.
    exited_index: Option<usize>,
    /// Exit code of that container. The wait loop stores `-1` when the
    /// container has no recorded exit code.
    exit_code: i64,
    /// ECS stop code for the task. The wait loop sets
    /// `EssentialContainerExited` when the task has an essential
    /// container and `TaskCompleted` otherwise.
    stop_code: &'static str,
}
1292
/// Per-container record persisted on the task. Mirrors the AWS Container
/// shape but tracks the docker-side container id alongside ECS metadata.
#[derive(Clone, Debug)]
pub(crate) struct RunningContainer {
    /// ECS container name; used to match this record against the task's
    /// container list when health status is refreshed.
    pub(crate) name: String,
    /// Container id handed to the CLI for `inspect`, `wait` and `rm`.
    pub(crate) container_id: String,
    /// Whether this container's exit stops the whole task (see
    /// [`task_should_stop`]).
    pub(crate) essential: bool,
    /// `None` until the container has been observed to exit; afterwards
    /// the exit code reported by the CLI's `wait`, or `-1` when that
    /// output could not be parsed as an integer.
    pub(crate) exit_code: Option<i64>,
    /// Resolved `networkBindings` for DescribeTasks. Computed from the
    /// task definition's `portMappings` at launch and surfaced verbatim
    /// in the per-container response.
    pub(crate) network_bindings: Vec<serde_json::Value>,
    /// Image digest captured from `docker inspect` after pull. AWS
    /// surfaces this on the Container response so callers can pin which
    /// exact image revision a task is running. `None` when the inspect
    /// failed or the CLI didn't expose `RepoDigests`.
    pub(crate) image_digest: Option<String>,
}
1311
1312/// Pure decision: does the current set of containers warrant stopping
1313/// the task? Returns true when any essential container has exited, or
1314/// when every container has exited (regardless of essential). Mirrors
1315/// AWS ECS task lifetime semantics.
1316pub(crate) fn task_should_stop(containers: &[RunningContainer]) -> bool {
1317    if containers.is_empty() {
1318        return true;
1319    }
1320    let any_essential_exited = containers
1321        .iter()
1322        .any(|c| c.essential && c.exit_code.is_some());
1323    if any_essential_exited {
1324        return true;
1325    }
1326    containers.iter().all(|c| c.exit_code.is_some())
1327}
1328
1329fn build_container_plans(
1330    state: &SharedEcsState,
1331    account_id: &str,
1332    task_id: &str,
1333    _server_port: u16,
1334) -> Result<Vec<ContainerPlan>, RuntimeError> {
1335    let accounts = state.read();
1336    let s = accounts
1337        .get(account_id)
1338        .ok_or_else(|| RuntimeError::ContainerStart("account missing".into()))?;
1339    let task = s
1340        .tasks
1341        .get(task_id)
1342        .ok_or_else(|| RuntimeError::ContainerStart("task missing".into()))?;
1343    if task.containers.is_empty() {
1344        return Err(RuntimeError::ContainerStart(
1345            "task has no containers".into(),
1346        ));
1347    }
1348    let has_task_role = task.task_role_arn.is_some();
1349    let task_def = s
1350        .task_definitions
1351        .get(&task.family)
1352        .and_then(|revs| revs.get(&task.revision));
1353    let network_mode = task_def.and_then(|td| td.network_mode.clone());
1354    // Index `volumes[]` by name so each container's `mountPoints[]` can
1355    // resolve its volume in O(1). Real ECS rejects mountPoints that
1356    // reference an undeclared volume at register time; we don't yet, so
1357    // unresolved names just produce zero mounts at launch.
1358    let volumes_by_name: std::collections::HashMap<String, &serde_json::Value> = task_def
1359        .map(|td| {
1360            td.volumes
1361                .iter()
1362                .filter_map(|v| {
1363                    let name = v.get("name").and_then(|n| n.as_str())?;
1364                    Some((name.to_string(), v))
1365                })
1366                .collect()
1367        })
1368        .unwrap_or_default();
1369    let mut plans = Vec::with_capacity(task.containers.len());
1370    for container in &task.containers {
1371        let def = find_container_definition(s, &task.family, task.revision, &container.name);
1372        let secrets_refs = def
1373            .as_ref()
1374            .and_then(|d| d.get("secrets").and_then(|v| v.as_array()).cloned())
1375            .map(|arr| {
1376                arr.iter()
1377                    .filter_map(|e| {
1378                        let name = e.get("name").and_then(|v| v.as_str())?.to_string();
1379                        let value_from = e.get("valueFrom").and_then(|v| v.as_str())?.to_string();
1380                        Some((name, value_from))
1381                    })
1382                    .collect::<Vec<_>>()
1383            })
1384            .unwrap_or_default();
1385        let str_array = |key: &str| -> Vec<String> {
1386            def.as_ref()
1387                .and_then(|d| d.get(key).and_then(|v| v.as_array()).cloned())
1388                .map(|arr| {
1389                    arr.iter()
1390                        .filter_map(|v| v.as_str().map(String::from))
1391                        .collect::<Vec<_>>()
1392                })
1393                .unwrap_or_default()
1394        };
1395        let env = def
1396            .as_ref()
1397            .and_then(|d| d.get("environment").and_then(|v| v.as_array()).cloned())
1398            .map(|arr| {
1399                arr.iter()
1400                    .filter_map(|e| {
1401                        let k = e.get("name").and_then(|v| v.as_str())?;
1402                        let v = e.get("value").and_then(|v| v.as_str()).unwrap_or("");
1403                        Some((k.to_string(), v.to_string()))
1404                    })
1405                    .collect::<Vec<_>>()
1406            })
1407            .unwrap_or_default();
1408        let port_mappings = def
1409            .as_ref()
1410            .and_then(|d| d.get("portMappings").and_then(|v| v.as_array()).cloned())
1411            .map(|arr| {
1412                arr.iter()
1413                    .filter_map(parse_port_mapping)
1414                    .collect::<Vec<_>>()
1415            })
1416            .unwrap_or_default();
1417        let depends_on = def
1418            .as_ref()
1419            .and_then(|d| d.get("dependsOn").and_then(|v| v.as_array()).cloned())
1420            .map(|arr| {
1421                arr.iter()
1422                    .filter_map(parse_depends_on_entry)
1423                    .collect::<Vec<_>>()
1424            })
1425            .unwrap_or_default();
1426        let health_check = def
1427            .as_ref()
1428            .and_then(|d| d.get("healthCheck"))
1429            .and_then(parse_health_check);
1430        let volume_mounts = def
1431            .as_ref()
1432            .and_then(|d| d.get("mountPoints").and_then(|v| v.as_array()).cloned())
1433            .map(|arr| {
1434                arr.iter()
1435                    .filter_map(|mp| resolve_mount_point(mp, &volumes_by_name))
1436                    .collect::<Vec<_>>()
1437            })
1438            .unwrap_or_default();
1439        let ulimits = def
1440            .as_ref()
1441            .and_then(|d| d.get("ulimits").and_then(|v| v.as_array()).cloned())
1442            .map(|arr| arr.iter().filter_map(parse_ulimit).collect::<Vec<_>>())
1443            .unwrap_or_default();
1444        let linux_parameters = def
1445            .as_ref()
1446            .and_then(|d| d.get("linuxParameters"))
1447            .and_then(parse_linux_parameters);
1448        let stop_timeout = def.as_ref().and_then(|d| {
1449            d.get("stopTimeout")
1450                .and_then(|v| v.as_u64())
1451                .map(|n| n as u32)
1452        });
1453        let user = def
1454            .as_ref()
1455            .and_then(|d| d.get("user").and_then(|v| v.as_str()).map(String::from));
1456        let working_directory = def.as_ref().and_then(|d| {
1457            d.get("workingDirectory")
1458                .and_then(|v| v.as_str())
1459                .map(String::from)
1460        });
1461        let tty = def
1462            .as_ref()
1463            .and_then(|d| d.get("tty").and_then(|v| v.as_bool()))
1464            .unwrap_or(false);
1465        let interactive = def
1466            .as_ref()
1467            .and_then(|d| d.get("interactive").and_then(|v| v.as_bool()))
1468            .unwrap_or(false);
1469        let readonly_rootfs = def
1470            .as_ref()
1471            .and_then(|d| d.get("readonlyRootFilesystem").and_then(|v| v.as_bool()))
1472            .unwrap_or(false);
1473        plans.push(ContainerPlan {
1474            container_name: container.name.clone(),
1475            image: container.image.clone(),
1476            env,
1477            entry_point: str_array("entryPoint"),
1478            command: str_array("command"),
1479            secrets_refs,
1480            essential: container.essential,
1481            has_task_role,
1482            port_mappings,
1483            network_mode: network_mode.clone(),
1484            depends_on,
1485            health_check,
1486            volume_mounts,
1487            ulimits,
1488            linux_parameters,
1489            stop_timeout,
1490            user,
1491            working_directory,
1492            tty,
1493            interactive,
1494            readonly_rootfs,
1495        });
1496    }
1497    let plans = topo_sort_plans(plans);
1498    Ok(plans)
1499}
1500
1501/// Resolve one `mountPoints[]` entry against the indexed task-definition
1502/// volumes. Returns `None` when:
1503/// - the entry has no `containerPath` or `sourceVolume`,
1504/// - the named volume isn't declared on the task definition.
1505///
1506/// Returns `Some(VolumeMount)` for every supported volume kind:
1507/// host bind, EFS, FSx, named docker volume, anonymous docker volume.
1508fn resolve_mount_point(
1509    mount_point: &serde_json::Value,
1510    volumes_by_name: &std::collections::HashMap<String, &serde_json::Value>,
1511) -> Option<VolumeMount> {
1512    let container_path = mount_point
1513        .get("containerPath")
1514        .and_then(|v| v.as_str())?
1515        .to_string();
1516    let source_volume = mount_point.get("sourceVolume").and_then(|v| v.as_str())?;
1517    let read_only = mount_point
1518        .get("readOnly")
1519        .and_then(|v| v.as_bool())
1520        .unwrap_or(false);
1521    let volume = volumes_by_name.get(source_volume)?;
1522    let source = resolve_volume_source(source_volume, volume)?;
1523    Some(VolumeMount {
1524        source,
1525        container_path,
1526        read_only,
1527    })
1528}
1529
1530/// Map a single task-definition `volumes[]` entry to the source side of a
1531/// `docker run -v` flag. The matching here mirrors the AWS volume kinds:
1532///
1533/// 1. `host.sourcePath` -> use that path directly (bind mount).
1534/// 2. `efsVolumeConfiguration.fileSystemId` -> stub directory under
1535///    `/tmp/fakecloud/efs/<filesystemId>[/<rootDirectory>]`. Created with
1536///    `mkdir -p` so different tasks targeting the same filesystem id
1537///    share the same host directory, matching real EFS's "many tasks,
1538///    one filesystem" semantics.
1539/// 3. `fsxWindowsFileServerVolumeConfiguration.fileSystemId` -> stub
1540///    directory under `/tmp/fakecloud/fsx/<filesystemId>/<rootDirectory>`.
1541/// 4. `dockerVolumeConfiguration` -> the volume `name` itself (named
1542///    docker volume; docker creates it on first reference).
1543/// 5. Bare entry (only `name`) -> the volume `name` as an anonymous
1544///    docker volume reference, matching AWS's "Docker volumes" default.
1545///
1546/// Returns `None` when the configuration is malformed (e.g. EFS without
1547/// a fileSystemId).
1548fn resolve_volume_source(name: &str, volume: &serde_json::Value) -> Option<String> {
1549    if let Some(host) = volume.get("host") {
1550        if let Some(path) = host.get("sourcePath").and_then(|v| v.as_str()) {
1551            // Empty sourcePath means "anonymous host volume" — fall
1552            // through to the named-volume default below.
1553            if !path.is_empty() {
1554                ensure_dir_exists(path);
1555                return Some(path.to_string());
1556            }
1557        }
1558    }
1559    if let Some(efs) = volume.get("efsVolumeConfiguration") {
1560        let fs_id = efs.get("fileSystemId").and_then(|v| v.as_str())?;
1561        let root = efs
1562            .get("rootDirectory")
1563            .and_then(|v| v.as_str())
1564            .unwrap_or("/");
1565        let path = stub_dir_for("efs", fs_id, root);
1566        ensure_dir_exists(&path);
1567        return Some(path);
1568    }
1569    if let Some(fsx) = volume.get("fsxWindowsFileServerVolumeConfiguration") {
1570        let fs_id = fsx.get("fileSystemId").and_then(|v| v.as_str())?;
1571        let root = fsx
1572            .get("rootDirectory")
1573            .and_then(|v| v.as_str())
1574            .unwrap_or("/");
1575        let path = stub_dir_for("fsx", fs_id, root);
1576        ensure_dir_exists(&path);
1577        return Some(path);
1578    }
1579    if volume.get("dockerVolumeConfiguration").is_some() {
1580        // Named docker volume — docker auto-creates it on first
1581        // reference. Pass the volume name through verbatim.
1582        return Some(name.to_string());
1583    }
1584    // Bare volume entry: anonymous docker volume keyed by name.
1585    Some(name.to_string())
1586}
1587
/// Build the host stub directory path for an EFS/FSx volume:
/// `/tmp/fakecloud/<kind>/<fs_id>`, followed by `/<root>` when `root`
/// (with its leading slashes removed) is non-empty. An empty or
/// all-slash `root` therefore maps to the per-filesystem directory
/// itself.
fn stub_dir_for(kind: &str, fs_id: &str, root: &str) -> String {
    let mut path = format!("/tmp/fakecloud/{kind}/{fs_id}");
    let sub_dir = root.trim_start_matches('/');
    if !sub_dir.is_empty() {
        path.push('/');
        path.push_str(sub_dir);
    }
    path
}
1600
/// Best-effort recursive directory creation (the `mkdir -p` equivalent)
/// for a path that is about to be bind-mounted. Any error is
/// deliberately discarded: the caller has no way to act on it here, and
/// a missing directory is left for the container run itself to report.
fn ensure_dir_exists(path: &str) {
    let outcome = std::fs::DirBuilder::new().recursive(true).create(path);
    // Intentionally ignored; see the function docs.
    drop(outcome);
}
1608
1609/// Parse one `dependsOn[]` entry. Returns `None` for malformed entries
1610/// (missing `containerName`, unrecognised `condition`) so the caller
1611/// can drop them silently from the launch plan — register-time
1612/// validation already rejects bad values; this is a defensive fallback.
1613fn parse_depends_on_entry(value: &serde_json::Value) -> Option<DependsOn> {
1614    let container_name = value
1615        .get("containerName")
1616        .and_then(|v| v.as_str())?
1617        .to_string();
1618    let raw_condition = value.get("condition").and_then(|v| v.as_str())?;
1619    let condition = DependsOnCondition::parse(raw_condition)?;
1620    Some(DependsOn {
1621        container_name,
1622        condition,
1623    })
1624}
1625
1626/// Topologically sort container plans so `dependsOn` dependencies start
1627/// before their dependants. Implements Kahn's algorithm with stable order:
1628/// when multiple plans are ready, we keep their original declaration
1629/// index, so a task without any dependsOn launches in the same order the
1630/// user wrote in the task definition. Cycles fall through with the
1631/// remaining plans appended in original order — the runtime will still
1632/// launch every container; it just can't guarantee dependency ordering
1633/// in that degenerate case. Cycles are rejected at register time
1634/// (RegisterTaskDefinition -> validate_depends_on_acyclic), so reaching
1635/// that branch from a real launch path means a bug elsewhere.
1636fn topo_sort_plans(plans: Vec<ContainerPlan>) -> Vec<ContainerPlan> {
1637    use std::collections::{HashMap, HashSet};
1638    let names: HashSet<String> = plans.iter().map(|p| p.container_name.clone()).collect();
1639    let index: HashMap<String, usize> = plans
1640        .iter()
1641        .enumerate()
1642        .map(|(i, p)| (p.container_name.clone(), i))
1643        .collect();
1644    // in_degree[i] = number of unresolved dependencies for plan i. We
1645    // ignore depends_on entries that name a container not in the task
1646    // (real ECS rejects those at register time; our register path doesn't
1647    // yet, so be defensive here).
1648    let mut in_degree: Vec<usize> = plans
1649        .iter()
1650        .map(|p| {
1651            p.depends_on
1652                .iter()
1653                .filter(|d| names.contains(&d.container_name))
1654                .count()
1655        })
1656        .collect();
1657    // dependants[i] = indices of plans that depend on plan i.
1658    let mut dependants: Vec<Vec<usize>> = vec![Vec::new(); plans.len()];
1659    for (i, p) in plans.iter().enumerate() {
1660        for d in &p.depends_on {
1661            if let Some(&di) = index.get(&d.container_name) {
1662                dependants[di].push(i);
1663            }
1664        }
1665    }
1666    let mut ordered: Vec<ContainerPlan> = Vec::with_capacity(plans.len());
1667    let mut emitted: Vec<bool> = vec![false; plans.len()];
1668    loop {
1669        // Pick the lowest-index plan whose in_degree is 0 to keep stable
1670        // order across runs.
1671        let next = (0..plans.len()).find(|&i| !emitted[i] && in_degree[i] == 0);
1672        match next {
1673            Some(i) => {
1674                emitted[i] = true;
1675                ordered.push(plans[i].clone());
1676                for &di in &dependants[i] {
1677                    if in_degree[di] > 0 {
1678                        in_degree[di] -= 1;
1679                    }
1680                }
1681            }
1682            None => break,
1683        }
1684    }
1685    // Cycle: append anything left in original order so we don't drop plans.
1686    for (i, p) in plans.into_iter().enumerate() {
1687        if !emitted[i] {
1688            ordered.push(p);
1689        }
1690    }
1691    ordered
1692}
1693
1694/// Validate that `containerDefinitions[].dependsOn[]` graph is acyclic.
1695/// Real ECS rejects cyclic dependencies at RegisterTaskDefinition time
1696/// with a `ClientException`; we mirror that. Returns the offending pair
1697/// of container names so the caller can produce a useful error.
1698///
1699/// Operates directly on the raw JSON definitions (rather than parsed
1700/// `ContainerPlan`s) so register-time validation doesn't have to first
1701/// build a full plan from a not-yet-stored task definition.
1702pub(crate) fn find_depends_on_cycle(
1703    container_definitions: &[serde_json::Value],
1704) -> Option<(String, String)> {
1705    use std::collections::HashMap;
1706
1707    let names: Vec<String> = container_definitions
1708        .iter()
1709        .filter_map(|c| c.get("name").and_then(|n| n.as_str()).map(String::from))
1710        .collect();
1711    let index: HashMap<&str, usize> = names
1712        .iter()
1713        .enumerate()
1714        .map(|(i, n)| (n.as_str(), i))
1715        .collect();
1716
1717    let mut adj: Vec<Vec<usize>> = vec![Vec::new(); names.len()];
1718    for (i, cd) in container_definitions.iter().enumerate() {
1719        if i >= names.len() {
1720            continue;
1721        }
1722        let Some(deps) = cd.get("dependsOn").and_then(|v| v.as_array()) else {
1723            continue;
1724        };
1725        for d in deps {
1726            let Some(target) = d.get("containerName").and_then(|v| v.as_str()) else {
1727                continue;
1728            };
1729            if let Some(&j) = index.get(target) {
1730                // Edge: i depends on j -> for cycle DFS we walk from i to j.
1731                adj[i].push(j);
1732            }
1733        }
1734    }
1735
1736    // DFS with three-colour marking (white=0, gray=1, black=2). When we
1737    // hit a gray neighbour we've closed a cycle; report the back-edge as
1738    // the offending pair.
1739    let mut state = vec![0u8; names.len()];
1740    let mut stack: Vec<(usize, usize)> = Vec::new();
1741    for start in 0..names.len() {
1742        if state[start] != 0 {
1743            continue;
1744        }
1745        stack.clear();
1746        stack.push((start, 0));
1747        state[start] = 1;
1748        while let Some(&(node, next_edge)) = stack.last() {
1749            if next_edge < adj[node].len() {
1750                let nb = adj[node][next_edge];
1751                stack.last_mut().unwrap().1 += 1;
1752                match state[nb] {
1753                    0 => {
1754                        state[nb] = 1;
1755                        stack.push((nb, 0));
1756                    }
1757                    1 => {
1758                        return Some((names[node].clone(), names[nb].clone()));
1759                    }
1760                    _ => {}
1761                }
1762            } else {
1763                state[node] = 2;
1764                stack.pop();
1765            }
1766        }
1767    }
1768    None
1769}
1770
/// Snapshot of the docker container state we care about for `dependsOn`
/// gating: whether the container exists/started, whether it's exited,
/// its exit code, and (when configured) its health status.
#[derive(Debug, Clone)]
struct InspectedState {
    /// Container has progressed past docker's `created` status.
    started: bool,
    /// Container status is `exited` or `dead`.
    exited: bool,
    /// Exit code reported by docker; `-1` when it could not be parsed.
    exit_code: i64,
    /// Docker health status string, or `None` when the container has no
    /// health information.
    health: Option<String>,
}
1781
1782/// One `docker inspect` call returning every field needed by
1783/// [`condition_is_met`]. Returns `None` when the container doesn't exist
1784/// yet or inspect fails — the caller will simply retry on the next poll.
1785async fn inspect_container_state(cli: &str, container_id: &str) -> Option<InspectedState> {
1786    // Compose all four fields into a single inspect format so the gate
1787    // costs one process spawn per poll rather than four.
1788    let format =
1789        "{{.State.Status}}|{{.State.Running}}|{{.State.ExitCode}}|{{if .State.Health}}{{.State.Health.Status}}{{else}}<none>{{end}}";
1790    let out = Command::new(cli)
1791        .args(["inspect", "-f", format, container_id])
1792        .output()
1793        .await
1794        .ok()?;
1795    if !out.status.success() {
1796        return None;
1797    }
1798    let raw = String::from_utf8_lossy(&out.stdout).trim().to_string();
1799    let parts: Vec<&str> = raw.split('|').collect();
1800    if parts.len() < 4 {
1801        return None;
1802    }
1803    let status = parts[0];
1804    let running = parts[1] == "true";
1805    let exit_code: i64 = parts[2].parse().unwrap_or(-1);
1806    let health = match parts[3] {
1807        "<none>" | "" => None,
1808        other => Some(other.to_string()),
1809    };
1810    // `created` is the brief moment between docker creating the
1811    // container and the entrypoint running. Treat anything past
1812    // `created` as "started" for the START condition.
1813    let started = running || status == "exited" || status == "running" || status == "dead";
1814    let exited = status == "exited" || status == "dead";
1815    Some(InspectedState {
1816        started,
1817        exited,
1818        exit_code,
1819        health,
1820    })
1821}
1822
1823/// Decide whether the polled `state` satisfies a `dependsOn[].condition`.
1824/// Encapsulates the AWS semantics so the polling loop is purely
1825/// mechanical.
1826fn condition_is_met(condition: DependsOnCondition, state: &InspectedState) -> bool {
1827    match condition {
1828        DependsOnCondition::Start => state.started,
1829        DependsOnCondition::Complete => state.exited,
1830        DependsOnCondition::Success => state.exited && state.exit_code == 0,
1831        DependsOnCondition::Healthy => state.health.as_deref() == Some("healthy"),
1832    }
1833}
1834
/// Test-only re-export of [`parse_port_mapping`] so sibling test modules
/// can lock in the default-port / default-protocol behaviour without us
/// widening the visibility of the parser itself.
///
/// Compiled only under `cfg(test)`; forwards `value` unchanged and
/// returns whatever [`parse_port_mapping`] returns.
#[cfg(test)]
pub(crate) fn __test_parse_port_mapping(value: &serde_json::Value) -> Option<PortMapping> {
    parse_port_mapping(value)
}
1842
1843/// Parse a `healthCheck` block from a task definition's container
1844/// definition. Returns `None` for missing `command` or for a command
1845/// whose first token is `NONE` (the AWS-documented "disable healthcheck
1846/// inherited from image" sentinel — emit no flags rather than a `none`
1847/// healthcheck). Defaults follow AWS: 30s/5s/3/0s.
1848fn parse_health_check(value: &serde_json::Value) -> Option<HealthCheckSpec> {
1849    let cmd_arr = value.get("command")?.as_array()?;
1850    let command: Vec<String> = cmd_arr
1851        .iter()
1852        .filter_map(|v| v.as_str().map(String::from))
1853        .collect();
1854    if command.is_empty() {
1855        return None;
1856    }
1857    if command.first().map(|s| s.as_str()) == Some("NONE") {
1858        return None;
1859    }
1860    let read_u32 = |key: &str, default: u32| -> u32 {
1861        value
1862            .get(key)
1863            .and_then(|v| v.as_i64())
1864            .filter(|n| (0..=u32::MAX as i64).contains(n))
1865            .map(|n| n as u32)
1866            .unwrap_or(default)
1867    };
1868    Some(HealthCheckSpec {
1869        command,
1870        interval_seconds: read_u32("interval", 30),
1871        timeout_seconds: read_u32("timeout", 5),
1872        retries: read_u32("retries", 3),
1873        start_period_seconds: read_u32("startPeriod", 0),
1874    })
1875}
1876
1877/// Parse one `ulimits` entry from the container definition JSON.
1878fn parse_ulimit(value: &serde_json::Value) -> Option<Ulimit> {
1879    let name = value.get("name").and_then(|v| v.as_str())?;
1880    let soft = value
1881        .get("softLimit")
1882        .and_then(|v| v.as_i64())
1883        .filter(|n| *n >= 0)? as i32;
1884    let hard = value
1885        .get("hardLimit")
1886        .and_then(|v| v.as_i64())
1887        .filter(|n| *n >= 0)? as i32;
1888    Some(Ulimit {
1889        name: name.to_string(),
1890        soft_limit: soft,
1891        hard_limit: hard,
1892    })
1893}
1894
1895/// Parse `linuxParameters` from the container definition JSON.
1896fn parse_linux_parameters(value: &serde_json::Value) -> Option<LinuxParameters> {
1897    let mut lp = LinuxParameters::default();
1898    if let Some(arr) = value
1899        .get("capabilities")
1900        .and_then(|v| v.get("add"))
1901        .and_then(|v| v.as_array())
1902    {
1903        lp.capabilities_add = arr
1904            .iter()
1905            .filter_map(|v| v.as_str().map(String::from))
1906            .collect();
1907    }
1908    if let Some(arr) = value
1909        .get("capabilities")
1910        .and_then(|v| v.get("drop"))
1911        .and_then(|v| v.as_array())
1912    {
1913        lp.capabilities_drop = arr
1914            .iter()
1915            .filter_map(|v| v.as_str().map(String::from))
1916            .collect();
1917    }
1918    if let Some(arr) = value.get("devices").and_then(|v| v.as_array()) {
1919        lp.devices = arr.iter().filter_map(parse_device).collect();
1920    }
1921    lp.init_process_enabled = value
1922        .get("initProcessEnabled")
1923        .and_then(|v| v.as_bool())
1924        .unwrap_or(false);
1925    lp.shared_memory_size = value
1926        .get("sharedMemorySize")
1927        .and_then(|v| v.as_i64())
1928        .map(|n| n as i32);
1929    if let Some(arr) = value.get("sysctl").and_then(|v| v.as_array()) {
1930        lp.sysctls = arr.iter().filter_map(parse_sysctl).collect();
1931    }
1932    if let Some(arr) = value.get("tmpfs").and_then(|v| v.as_array()) {
1933        lp.tmpfs = arr.iter().filter_map(parse_tmpfs).collect();
1934    }
1935    lp.privileged = value
1936        .get("privileged")
1937        .and_then(|v| v.as_bool())
1938        .unwrap_or(false);
1939    Some(lp)
1940}
1941
1942fn parse_device(value: &serde_json::Value) -> Option<Device> {
1943    let host_path = value.get("hostPath").and_then(|v| v.as_str())?.to_string();
1944    let container_path = value
1945        .get("containerPath")
1946        .and_then(|v| v.as_str())?
1947        .to_string();
1948    let permissions = value
1949        .get("permissions")
1950        .and_then(|v| v.as_str())
1951        .unwrap_or("rwm")
1952        .to_string();
1953    Some(Device {
1954        host_path,
1955        container_path,
1956        permissions,
1957    })
1958}
1959
1960fn parse_sysctl(value: &serde_json::Value) -> Option<Sysctl> {
1961    let name = value.get("name").and_then(|v| v.as_str())?.to_string();
1962    let value_str = value.get("value").and_then(|v| v.as_str())?.to_string();
1963    Some(Sysctl {
1964        name,
1965        value: value_str,
1966    })
1967}
1968
1969fn parse_tmpfs(value: &serde_json::Value) -> Option<Tmpfs> {
1970    let container_path = value
1971        .get("containerPath")
1972        .and_then(|v| v.as_str())?
1973        .to_string();
1974    let size = value
1975        .get("size")
1976        .and_then(|v| v.as_i64())
1977        .filter(|n| *n > 0)? as i32;
1978    let mount_options = value
1979        .get("mountOptions")
1980        .and_then(|v| v.as_array())
1981        .map(|arr| {
1982            arr.iter()
1983                .filter_map(|v| v.as_str().map(String::from))
1984                .collect()
1985        })
1986        .unwrap_or_default();
1987    Some(Tmpfs {
1988        container_path,
1989        size,
1990        mount_options,
1991    })
1992}
1993
1994/// Render a [`HealthCheckSpec`] into the docker run flags that emulate
1995/// the equivalent ECS healthCheck. AWS's `command[0]` is a sentinel
1996/// (`CMD-SHELL`/`CMD`/`NONE`); docker's `--health-cmd` always takes a
1997/// single shell-string, so we collapse the remaining tokens with spaces
1998/// for either sentinel — matching how docker itself stringifies HEALTHCHECK
1999/// CMD ["a","b"] back to a shell string at inspect time.
2000pub(crate) fn render_health_flags(hc: &HealthCheckSpec) -> Vec<String> {
2001    if hc.command.len() < 2 {
2002        return Vec::new();
2003    }
2004    let cmd_kind = hc.command[0].as_str();
2005    if cmd_kind != "CMD" && cmd_kind != "CMD-SHELL" {
2006        return Vec::new();
2007    }
2008    let cmd_string = hc.command[1..].join(" ");
2009    vec![
2010        "--health-cmd".into(),
2011        cmd_string,
2012        format!("--health-interval={}s", hc.interval_seconds),
2013        format!("--health-timeout={}s", hc.timeout_seconds),
2014        format!("--health-retries={}", hc.retries),
2015        format!("--health-start-period={}s", hc.start_period_seconds),
2016    ]
2017}
2018
/// Test-only re-export of [`parse_health_check`] so unit tests in
/// sibling modules can lock in the AWS default-fill behaviour without
/// us widening the parser's visibility.
///
/// Compiled only under `cfg(test)`; forwards `value` unchanged and
/// returns whatever [`parse_health_check`] returns.
#[cfg(test)]
pub(crate) fn __test_parse_health_check(value: &serde_json::Value) -> Option<HealthCheckSpec> {
    parse_health_check(value)
}
2026
/// Convert a docker `.State.Health.Status` string into the ECS
/// `healthStatus` vocabulary.
///
/// Matching ignores surrounding whitespace and ASCII case. `healthy`
/// becomes `HEALTHY`, `unhealthy` becomes `UNHEALTHY`, and every other
/// input (including `starting`, `none` and the empty string) becomes
/// `UNKNOWN`.
pub(crate) fn docker_health_to_ecs(raw: &str) -> &'static str {
    let status = raw.trim();
    if status.eq_ignore_ascii_case("healthy") {
        "HEALTHY"
    } else if status.eq_ignore_ascii_case("unhealthy") {
        "UNHEALTHY"
    } else {
        "UNKNOWN"
    }
}
2039
2040/// Parse a single `portMappings[]` entry. Returns `None` for entries
2041/// that are missing `containerPort` or have a value out of `u16` range.
2042/// Defaults: `hostPort` -> `containerPort`, `protocol` -> `tcp`.
2043fn parse_port_mapping(value: &serde_json::Value) -> Option<PortMapping> {
2044    let container_port = value
2045        .get("containerPort")
2046        .and_then(|v| v.as_i64())
2047        .filter(|n| (0..=u16::MAX as i64).contains(n))? as u16;
2048    let host_port_raw = value
2049        .get("hostPort")
2050        .and_then(|v| v.as_i64())
2051        .filter(|n| (0..=u16::MAX as i64).contains(n))
2052        .map(|n| n as u16)
2053        .unwrap_or(0);
2054    let host_port = if host_port_raw == 0 {
2055        container_port
2056    } else {
2057        host_port_raw
2058    };
2059    let protocol = value
2060        .get("protocol")
2061        .and_then(|v| v.as_str())
2062        .map(|s| s.to_ascii_lowercase())
2063        .unwrap_or_else(|| "tcp".to_string());
2064    Some(PortMapping {
2065        container_port,
2066        host_port,
2067        protocol,
2068    })
2069}
2070
2071/// Build the docker `run` argv for a single container plan. Pure so unit
2072/// tests can assert on flag ordering / `--publish` translation without
2073/// shelling out. The returned vector is everything *after* the binary
2074/// name (i.e. starts with `run`, ends with the user-supplied command
2075/// args).
2076pub(crate) fn build_run_argv(
2077    plan: &ContainerPlan,
2078    env: &[(String, String)],
2079    task_id: &str,
2080    host_ip: &str,
2081    run_image: &str,
2082) -> Vec<String> {
2083    let mut argv: Vec<String> = Vec::new();
2084    argv.push("run".into());
2085    argv.push("-d".into());
2086    argv.push("--name".into());
2087    argv.push(format!("{}-{}", task_id, plan.container_name));
2088    argv.push("--label".into());
2089    argv.push(format!("fakecloud-ecs-task={}", task_id));
2090    argv.push("--label".into());
2091    argv.push(format!("fakecloud-ecs-container={}", plan.container_name));
2092    argv.push("--add-host".into());
2093    argv.push(format!("host.docker.internal:{}", host_ip));
2094    if plan.network_mode.as_deref() == Some("awsvpc") {
2095        argv.push("--network".into());
2096        argv.push(format!("fakecloud-ecs-{}", task_id));
2097    }
2098    // `awsvpc` puts the container on a per-task ENI; emulating that on a
2099    // local docker host means *not* publishing to the host port table.
2100    // Bridge / host / default network modes still get `--publish`.
2101    let publish_ports = plan.network_mode.as_deref() != Some("awsvpc");
2102    if publish_ports {
2103        for pm in &plan.port_mappings {
2104            argv.push("--publish".into());
2105            argv.push(format!(
2106                "{}:{}/{}",
2107                pm.container_port, pm.host_port, pm.protocol
2108            ));
2109        }
2110    }
2111    if let Some(ref hc) = plan.health_check {
2112        argv.extend(render_health_flags(hc));
2113    }
2114    for (k, v) in env {
2115        let transformed = v
2116            .replace("http://127.0.0.1:", "http://host.docker.internal:")
2117            .replace("https://127.0.0.1:", "https://host.docker.internal:")
2118            .replace("http://localhost:", "http://host.docker.internal:")
2119            .replace("https://localhost:", "https://host.docker.internal:");
2120        argv.push("-e".into());
2121        argv.push(format!("{}={}", k, transformed));
2122    }
2123    // Volume mounts: one `-v` flag per mountPoints entry, with the
2124    // source resolved from the task definition's `volumes[]`. EFS and
2125    // FSx stubs were materialised on the host (mkdir -p) before this
2126    // function returns, so docker can bind them straight in.
2127    for vm in &plan.volume_mounts {
2128        argv.push("-v".into());
2129        let suffix = if vm.read_only { ":ro" } else { "" };
2130        argv.push(format!("{}:{}{}", vm.source, vm.container_path, suffix));
2131    }
2132    for ul in &plan.ulimits {
2133        argv.push("--ulimit".into());
2134        argv.push(format!("{}={}:{}", ul.name, ul.soft_limit, ul.hard_limit));
2135    }
2136    if let Some(ref lp) = plan.linux_parameters {
2137        for cap in &lp.capabilities_add {
2138            argv.push("--cap-add".into());
2139            argv.push(cap.clone());
2140        }
2141        for cap in &lp.capabilities_drop {
2142            argv.push("--cap-drop".into());
2143            argv.push(cap.clone());
2144        }
2145        for dev in &lp.devices {
2146            argv.push("--device".into());
2147            argv.push(format!(
2148                "{}:{}{}",
2149                dev.host_path, dev.container_path, dev.permissions
2150            ));
2151        }
2152        if lp.init_process_enabled {
2153            argv.push("--init".into());
2154        }
2155        if let Some(size) = lp.shared_memory_size {
2156            argv.push("--shm-size".into());
2157            argv.push(format!("{}m", size));
2158        }
2159        for sys in &lp.sysctls {
2160            argv.push("--sysctl".into());
2161            argv.push(format!("{}={}", sys.name, sys.value));
2162        }
2163        for tmp in &lp.tmpfs {
2164            let mut opts = tmp.mount_options.join(",");
2165            if !opts.is_empty() {
2166                opts = format!(",{}", opts);
2167            }
2168            argv.push("--tmpfs".into());
2169            argv.push(format!("{}:size={}M{}", tmp.container_path, tmp.size, opts));
2170        }
2171        if lp.privileged {
2172            argv.push("--privileged".into());
2173        }
2174    }
2175    if let Some(timeout) = plan.stop_timeout {
2176        argv.push("--stop-timeout".into());
2177        argv.push(format!("{}", timeout));
2178    }
2179    if let Some(ref user) = plan.user {
2180        argv.push("--user".into());
2181        argv.push(user.clone());
2182    }
2183    if let Some(ref wd) = plan.working_directory {
2184        argv.push("--workdir".into());
2185        argv.push(wd.clone());
2186    }
2187    if plan.tty {
2188        argv.push("--tty".into());
2189    }
2190    if plan.interactive {
2191        argv.push("--interactive".into());
2192    }
2193    if plan.readonly_rootfs {
2194        argv.push("--read-only".into());
2195    }
2196    if let Some(first) = plan.entry_point.first() {
2197        argv.push("--entrypoint".into());
2198        argv.push(first.clone());
2199    }
2200    argv.push(run_image.to_string());
2201    for arg in plan.entry_point.iter().skip(1) {
2202        argv.push(arg.clone());
2203    }
2204    for arg in &plan.command {
2205        argv.push(arg.clone());
2206    }
2207    argv
2208}
2209
2210/// Render `networkBindings` JSON for a launched container. Empty under
2211/// `awsvpc` (the equivalent info goes on the task's ENI attachments) and
2212/// for containers without `portMappings`.
2213pub(crate) fn network_bindings_for(plan: &ContainerPlan) -> Vec<serde_json::Value> {
2214    if plan.network_mode.as_deref() == Some("awsvpc") {
2215        return Vec::new();
2216    }
2217    plan.port_mappings
2218        .iter()
2219        .map(|pm| {
2220            serde_json::json!({
2221                "bindIP": "0.0.0.0",
2222                "containerPort": pm.container_port,
2223                "hostPort": pm.host_port,
2224                "protocol": pm.protocol,
2225            })
2226        })
2227        .collect()
2228}
2229
2230/// Compute ELBv2 target registrations for a task based on its service's
2231/// loadBalancers configuration. Returns (target_group_arn, [(target_id, port)])
2232/// for each target group that should receive this task.
2233#[allow(clippy::type_complexity)]
2234pub(crate) fn compute_elbv2_targets(
2235    ecs_state: &crate::state::EcsState,
2236    task: &crate::state::Task,
2237) -> Vec<(String, Vec<(String, Option<i64>)>)> {
2238    let mut result = Vec::new();
2239    let Some(group) = task.group.as_deref() else {
2240        return result;
2241    };
2242    let service_name = group.strip_prefix("service:").unwrap_or(group);
2243    let key = crate::state::EcsState::service_key(&task.cluster_name, service_name);
2244    let Some(service) = ecs_state.services.get(&key) else {
2245        return result;
2246    };
2247
2248    let network_mode = ecs_state
2249        .task_definitions
2250        .get(&task.family)
2251        .and_then(|revs| revs.get(&task.revision))
2252        .and_then(|td| td.network_mode.as_deref());
2253
2254    for lb in &service.load_balancers {
2255        let tg_arn = lb.get("targetGroupArn").and_then(|v| v.as_str());
2256        let container_name = lb.get("containerName").and_then(|v| v.as_str());
2257        let container_port = lb.get("containerPort").and_then(|v| v.as_i64());
2258        let Some(tg_arn) = tg_arn else { continue };
2259        let Some(container_name) = container_name else {
2260            continue;
2261        };
2262
2263        let target_id = if network_mode == Some("awsvpc") {
2264            task.attachments
2265                .iter()
2266                .find(|a| a.attachment_type == "eni")
2267                .and_then(|eni| {
2268                    eni.details
2269                        .iter()
2270                        .find(|d| d.name == "privateIPv4Address")
2271                        .map(|d| d.value.clone())
2272                })
2273        } else {
2274            Some("127.0.0.1".to_string())
2275        };
2276
2277        let port = if network_mode == Some("awsvpc") {
2278            container_port
2279        } else {
2280            task.containers
2281                .iter()
2282                .find(|c| c.name == container_name)
2283                .and_then(|c| {
2284                    c.network_bindings
2285                        .iter()
2286                        .find(|nb| {
2287                            nb.get("containerPort").and_then(|v| v.as_i64()) == container_port
2288                        })
2289                        .and_then(|nb| nb.get("hostPort").and_then(|v| v.as_i64()))
2290                })
2291        };
2292
2293        if let Some(id) = target_id {
2294            if let Some(entry) = result.iter_mut().find(|(arn, _)| arn == tg_arn) {
2295                entry.1.push((id, port));
2296            } else {
2297                result.push((tg_arn.to_string(), vec![(id, port)]));
2298            }
2299        }
2300    }
2301    result
2302}
2303
2304struct TaskSnapshot {
2305    task_arn: String,
2306    cluster_arn: String,
2307    launch_type: String,
2308    group: Option<String>,
2309    task_definition_arn: String,
2310    containers: serde_json::Value,
2311}
2312
2313fn snapshot_task(state: &SharedEcsState, account_id: &str, task_id: &str) -> Option<TaskSnapshot> {
2314    let accounts = state.read();
2315    let s = accounts.get(account_id)?;
2316    let task = s.tasks.get(task_id)?;
2317    Some(TaskSnapshot {
2318        task_arn: task.task_arn.clone(),
2319        cluster_arn: task.cluster_arn.clone(),
2320        launch_type: task.launch_type.clone(),
2321        group: task.group.clone(),
2322        task_definition_arn: task.task_definition_arn.clone(),
2323        containers: serde_json::Value::Array(
2324            task.containers
2325                .iter()
2326                .map(|c| {
2327                    serde_json::json!({
2328                        "containerArn": c.container_arn,
2329                        "name": c.name,
2330                        "image": c.image,
2331                        "lastStatus": c.last_status,
2332                        "exitCode": c.exit_code,
2333                        "reason": c.reason,
2334                    })
2335                })
2336                .collect(),
2337        ),
2338    })
2339}
2340
/// Probe whether `cli` is usable by running `<cli> info` with stdout and
/// stderr discarded.
///
/// Returns `true` only when the process could be spawned and exited
/// successfully; a spawn failure (e.g. binary not found) yields `false`.
fn cli_works(cli: &str) -> bool {
    let mut probe = std::process::Command::new(cli);
    probe
        .arg("info")
        .stdout(std::process::Stdio::null())
        .stderr(std::process::Stdio::null());

    match probe.status() {
        Ok(exit) => exit.success(),
        Err(_) => false,
    }
}
2350
2351/// Build an isolated docker config directory with Basic auth for
2352/// fakecloud ECR at `127.0.0.1:<port>`. Lets `docker pull/push/tag`
2353/// work against the local OCI v2 registry without requiring the user
2354/// to run `aws ecr get-login-password | docker login` first.
2355fn build_local_registry_docker_config(server_port: u16) -> Option<TempDir> {
2356    let dir = TempDir::new().ok()?;
2357    let auth = base64::engine::general_purpose::STANDARD.encode("AWS:fakecloud-ecs-runtime");
2358    let config = serde_json::json!({
2359        "auths": {
2360            format!("127.0.0.1:{server_port}"): { "auth": auth },
2361        }
2362    });
2363    std::fs::write(dir.path().join("config.json"), config.to_string()).ok()?;
2364    Some(dir)
2365}
2366
2367fn find_container_definition(
2368    state: &crate::state::EcsState,
2369    family: &str,
2370    revision: i32,
2371    name: &str,
2372) -> Option<serde_json::Value> {
2373    state
2374        .task_definitions
2375        .get(family)?
2376        .get(&revision)?
2377        .container_definitions
2378        .iter()
2379        .find(|c| c.get("name").and_then(|v| v.as_str()) == Some(name))
2380        .cloned()
2381}
2382
2383fn mark_pull_started(state: &SharedEcsState, account_id: &str, task_id: &str) {
2384    let mut accounts = state.write();
2385    let Some(s) = accounts.get_mut(account_id) else {
2386        return;
2387    };
2388    let task_arn_cluster = s
2389        .tasks
2390        .get(task_id)
2391        .map(|t| (t.task_arn.clone(), t.cluster_arn.clone()));
2392    if let Some(task) = s.tasks.get_mut(task_id) {
2393        task.pull_started_at = Some(Utc::now());
2394    }
2395    if let Some((arn, cluster_arn)) = task_arn_cluster {
2396        s.push_event(LifecycleEvent {
2397            at: Utc::now(),
2398            event_type: "PullStarted".into(),
2399            task_arn: Some(arn),
2400            cluster_arn: Some(cluster_arn),
2401            last_status: Some("PENDING".into()),
2402            detail: serde_json::json!({}),
2403        });
2404    }
2405}
2406
2407fn mark_pull_stopped(state: &SharedEcsState, account_id: &str, task_id: &str) {
2408    let mut accounts = state.write();
2409    let Some(s) = accounts.get_mut(account_id) else {
2410        return;
2411    };
2412    if let Some(task) = s.tasks.get_mut(task_id) {
2413        task.pull_stopped_at = Some(Utc::now());
2414    }
2415}
2416
2417pub(crate) fn mark_running_multi(
2418    state: &SharedEcsState,
2419    account_id: &str,
2420    task_id: &str,
2421    started: &[RunningContainer],
2422) {
2423    let mut accounts = state.write();
2424    let Some(s) = accounts.get_mut(account_id) else {
2425        return;
2426    };
2427    let (arn, cluster_arn) = {
2428        let Some(task) = s.tasks.get_mut(task_id) else {
2429            return;
2430        };
2431        task.last_status = "RUNNING".into();
2432        task.connectivity = "CONNECTED".into();
2433        task.connectivity_at = Some(Utc::now());
2434        task.started_at = Some(Utc::now());
2435        for rc in started {
2436            if let Some(c) = task.containers.iter_mut().find(|c| c.name == rc.name) {
2437                c.runtime_id = Some(rc.container_id.clone());
2438                c.last_status = "RUNNING".into();
2439                c.network_bindings = rc.network_bindings.clone();
2440                if rc.image_digest.is_some() {
2441                    c.image_digest = rc.image_digest.clone();
2442                }
2443            }
2444        }
2445        if let Some(cluster) = s.clusters.get_mut(&task.cluster_name) {
2446            cluster.running_tasks_count += 1;
2447            if cluster.pending_tasks_count > 0 {
2448                cluster.pending_tasks_count -= 1;
2449            }
2450        }
2451        if let Some(ref ci_arn) = task.container_instance_arn {
2452            if let Some(ci) = s
2453                .container_instances
2454                .values_mut()
2455                .find(|ci| ci.container_instance_arn == *ci_arn)
2456            {
2457                ci.running_tasks_count += 1;
2458                if ci.pending_tasks_count > 0 {
2459                    ci.pending_tasks_count -= 1;
2460                }
2461            }
2462        }
2463        (task.task_arn.clone(), task.cluster_arn.clone())
2464    };
2465    s.push_event(LifecycleEvent {
2466        at: Utc::now(),
2467        event_type: "TaskStateChange".into(),
2468        task_arn: Some(arn),
2469        cluster_arn: Some(cluster_arn),
2470        last_status: Some("RUNNING".into()),
2471        detail: serde_json::json!({}),
2472    });
2473}
2474
2475#[allow(clippy::too_many_arguments)]
2476fn finalize_stopped_multi(
2477    state: &SharedEcsState,
2478    account_id: &str,
2479    task_id: &str,
2480    final_containers: &[RunningContainer],
2481    primary_exit_code: i64,
2482    captured: &str,
2483    stop_code: &str,
2484    stopped_reason: Option<String>,
2485) {
2486    let mut accounts = state.write();
2487    let Some(s) = accounts.get_mut(account_id) else {
2488        return;
2489    };
2490    let (arn, cluster_arn) = {
2491        let Some(task) = s.tasks.get_mut(task_id) else {
2492            return;
2493        };
2494        task.last_status = "STOPPED".into();
2495        task.desired_status = "STOPPED".into();
2496        task.stopping_at = task.stopping_at.or(Some(Utc::now()));
2497        task.stopped_at = Some(Utc::now());
2498        task.stop_code = Some(stop_code.into());
2499        task.stopped_reason = stopped_reason.or(Some(format!("Exit code {}", primary_exit_code)));
2500        task.captured_logs = captured.to_string();
2501        for c in task.containers.iter_mut() {
2502            c.last_status = "STOPPED".into();
2503            if c.exit_code.is_none() {
2504                let mapped = final_containers
2505                    .iter()
2506                    .find(|r| r.name == c.name)
2507                    .and_then(|r| r.exit_code);
2508                c.exit_code = mapped.or(Some(primary_exit_code));
2509            }
2510        }
2511        if let Some(cluster) = s.clusters.get_mut(&task.cluster_name) {
2512            if cluster.running_tasks_count > 0 {
2513                cluster.running_tasks_count -= 1;
2514            }
2515        }
2516        if let Some(ref ci_arn) = task.container_instance_arn {
2517            if let Some(ci) = s
2518                .container_instances
2519                .values_mut()
2520                .find(|ci| ci.container_instance_arn == *ci_arn)
2521            {
2522                if ci.running_tasks_count > 0 {
2523                    ci.running_tasks_count -= 1;
2524                }
2525            }
2526        }
2527        (task.task_arn.clone(), task.cluster_arn.clone())
2528    };
2529    s.push_event(LifecycleEvent {
2530        at: Utc::now(),
2531        event_type: "TaskStateChange".into(),
2532        task_arn: Some(arn),
2533        cluster_arn: Some(cluster_arn),
2534        last_status: Some("STOPPED".into()),
2535        detail: serde_json::json!({
2536            "exitCode": primary_exit_code,
2537            "stopCode": stop_code,
2538        }),
2539    });
2540}
2541
2542fn finalize_failure(state: &SharedEcsState, account_id: &str, task_id: &str, reason: &str) {
2543    let mut accounts = state.write();
2544    let Some(s) = accounts.get_mut(account_id) else {
2545        return;
2546    };
2547    let (arn, cluster_arn) = {
2548        let Some(task) = s.tasks.get_mut(task_id) else {
2549            return;
2550        };
2551        // Capture the prior status before we clobber it: if the task had
2552        // already reached RUNNING when execution failed (e.g. `docker wait`
2553        // blew up after the container started), we owe the cluster a
2554        // running-tasks decrement. Tasks that died before RUNNING only
2555        // ever incremented pendingTasksCount.
2556        let was_running = task.last_status == "RUNNING";
2557        task.last_status = "STOPPED".into();
2558        task.desired_status = "STOPPED".into();
2559        task.stopped_at = Some(Utc::now());
2560        task.stop_code = Some("TaskFailedToStart".into());
2561        task.stopped_reason = Some(reason.to_string());
2562        // Surface the failure reason on the /logs endpoint — without this,
2563        // a task that never reached RUNNING returns an empty log string,
2564        // leaving E2E assertions with no diagnostic.
2565        task.captured_logs = format!("[task failed to start]: {reason}");
2566        for c in task.containers.iter_mut() {
2567            c.last_status = "STOPPED".into();
2568            c.reason = Some(reason.to_string());
2569        }
2570        if let Some(cluster) = s.clusters.get_mut(&task.cluster_name) {
2571            if was_running {
2572                if cluster.running_tasks_count > 0 {
2573                    cluster.running_tasks_count -= 1;
2574                }
2575            } else if cluster.pending_tasks_count > 0 {
2576                cluster.pending_tasks_count -= 1;
2577            }
2578        }
2579        if let Some(ref ci_arn) = task.container_instance_arn {
2580            if let Some(ci) = s
2581                .container_instances
2582                .values_mut()
2583                .find(|ci| ci.container_instance_arn == *ci_arn)
2584            {
2585                if was_running {
2586                    if ci.running_tasks_count > 0 {
2587                        ci.running_tasks_count -= 1;
2588                    }
2589                } else if ci.pending_tasks_count > 0 {
2590                    ci.pending_tasks_count -= 1;
2591                }
2592            }
2593        }
2594        (task.task_arn.clone(), task.cluster_arn.clone())
2595    };
2596    s.push_event(LifecycleEvent {
2597        at: Utc::now(),
2598        event_type: "TaskFailedToStart".into(),
2599        task_arn: Some(arn),
2600        cluster_arn: Some(cluster_arn),
2601        last_status: Some("STOPPED".into()),
2602        detail: serde_json::json!({ "reason": reason }),
2603    });
2604}
2605
2606/// Short helper for tests + snapshot code to sleep between state
2607/// transitions. Exposed on the crate boundary to keep test timing
2608/// centralized.
2609pub async fn sleep(duration: Duration) {
2610    tokio::time::sleep(duration).await;
2611}
2612
2613#[cfg(test)]
2614mod tests {
2615    use super::*;
2616    use crate::state::{EcsState, Task};
2617    use fakecloud_aws::arn::Arn;
2618    use fakecloud_core::multi_account::MultiAccountState;
2619    use parking_lot::RwLock;
2620    use std::sync::Arc;
2621
2622    #[test]
2623    fn cli_works_for_known_missing_binary_is_false() {
2624        assert!(!cli_works("definitely-not-a-real-cli-binary-xyz"));
2625    }
2626
2627    #[test]
2628    fn aws_ecr_uris_translate_for_local_pull() {
2629        assert_eq!(
2630            fakecloud_core::ecr_uri::translate_to_local(
2631                "123456789012.dkr.ecr.us-east-1.amazonaws.com/app:latest",
2632                4566
2633            )
2634            .as_deref(),
2635            Some("127.0.0.1:4566/app:latest")
2636        );
2637    }
2638
2639    fn make_task(task_id: &str) -> Task {
2640        Task {
2641            task_arn: Arn::new(
2642                "ecs",
2643                "us-east-1",
2644                "000000000000",
2645                &format!("task/default/{task_id}"),
2646            )
2647            .to_string(),
2648            task_id: task_id.into(),
2649            cluster_arn: "arn:aws:ecs:us-east-1:000000000000:cluster/default".into(),
2650            cluster_name: "default".into(),
2651            task_definition_arn: "arn:aws:ecs:us-east-1:000000000000:task-definition/app:1".into(),
2652            family: "app".into(),
2653            revision: 1,
2654            container_instance_arn: None,
2655            capacity_provider_name: None,
2656            last_status: "PENDING".into(),
2657            desired_status: "RUNNING".into(),
2658            launch_type: "FARGATE".into(),
2659            platform_version: None,
2660            cpu: None,
2661            memory: None,
2662            containers: Vec::new(),
2663            overrides: serde_json::json!({}),
2664            started_by: None,
2665            group: None,
2666            connectivity: "CONNECTING".into(),
2667            stop_code: None,
2668            stopped_reason: None,
2669            created_at: Utc::now(),
2670            started_at: None,
2671            stopping_at: None,
2672            stopped_at: None,
2673            pull_started_at: None,
2674            pull_stopped_at: None,
2675            connectivity_at: None,
2676            started_by_ref_id: None,
2677            execution_role_arn: None,
2678            task_role_arn: None,
2679            tags: Vec::new(),
2680            awslogs: None,
2681            captured_logs: String::new(),
2682            protection: None,
2683            enable_execute_command: false,
2684            attachments: Vec::new(),
2685            volume_configurations: Vec::new(),
2686            task_set_arn: None,
2687        }
2688    }
2689
2690    #[test]
2691    fn finalize_failure_writes_reason_into_captured_logs() {
2692        let mut accounts: MultiAccountState<EcsState> =
2693            MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
2694        let acct = accounts.get_or_create("000000000000");
2695        acct.tasks.insert("t1".into(), make_task("t1"));
2696        let state: SharedEcsState = Arc::new(RwLock::new(accounts));
2697
2698        finalize_failure(
2699            &state,
2700            "000000000000",
2701            "t1",
2702            "failed to resolve secret DB_PASSWORD",
2703        );
2704
2705        let accounts = state.read();
2706        let task = accounts
2707            .get("000000000000")
2708            .unwrap()
2709            .tasks
2710            .get("t1")
2711            .unwrap();
2712        assert_eq!(task.last_status, "STOPPED");
2713        assert_eq!(task.stop_code.as_deref(), Some("TaskFailedToStart"));
2714        assert!(
2715            task.captured_logs
2716                .contains("failed to resolve secret DB_PASSWORD"),
2717            "captured_logs missing reason: {:?}",
2718            task.captured_logs
2719        );
2720        assert!(
2721            task.captured_logs.starts_with("[task failed to start]:"),
2722            "captured_logs missing prefix: {:?}",
2723            task.captured_logs
2724        );
2725    }
2726
2727    fn make_container(name: &str, essential: bool) -> crate::state::Container {
2728        crate::state::Container {
2729            container_arn: format!(
2730                "arn:aws:ecs:us-east-1:000000000000:container/default/abc/{name}"
2731            ),
2732            name: name.into(),
2733            image: "alpine".into(),
2734            task_arn: "arn:aws:ecs:us-east-1:000000000000:task/default/abc".into(),
2735            last_status: "RUNNING".into(),
2736            exit_code: None,
2737            reason: None,
2738            runtime_id: Some(format!("dockerid-{name}")),
2739            essential,
2740            cpu: None,
2741            memory: None,
2742            memory_reservation: None,
2743            network_bindings: Vec::new(),
2744            network_interfaces: Vec::new(),
2745            health_status: None,
2746            managed_agents: None,
2747            image_digest: None,
2748        }
2749    }
2750
2751    #[test]
2752    fn task_should_stop_when_essential_exits() {
2753        let containers = vec![
2754            RunningContainer {
2755                name: "app".into(),
2756                container_id: "id-app".into(),
2757                essential: true,
2758                exit_code: Some(0),
2759                network_bindings: Vec::new(),
2760                image_digest: None,
2761            },
2762            RunningContainer {
2763                name: "sidecar".into(),
2764                container_id: "id-sc".into(),
2765                essential: false,
2766                exit_code: None,
2767                network_bindings: Vec::new(),
2768                image_digest: None,
2769            },
2770        ];
2771        assert!(task_should_stop(&containers));
2772    }
2773
2774    #[test]
2775    fn task_keeps_running_when_only_non_essential_exits() {
2776        let containers = vec![
2777            RunningContainer {
2778                name: "app".into(),
2779                container_id: "id-app".into(),
2780                essential: true,
2781                exit_code: None,
2782                network_bindings: Vec::new(),
2783                image_digest: None,
2784            },
2785            RunningContainer {
2786                name: "sidecar".into(),
2787                container_id: "id-sc".into(),
2788                essential: false,
2789                exit_code: Some(0),
2790                network_bindings: Vec::new(),
2791                image_digest: None,
2792            },
2793        ];
2794        assert!(!task_should_stop(&containers));
2795    }
2796
2797    #[test]
2798    fn task_stops_when_all_non_essentials_exit() {
2799        let containers = vec![
2800            RunningContainer {
2801                name: "a".into(),
2802                container_id: "id-a".into(),
2803                essential: false,
2804                exit_code: Some(0),
2805                network_bindings: Vec::new(),
2806                image_digest: None,
2807            },
2808            RunningContainer {
2809                name: "b".into(),
2810                container_id: "id-b".into(),
2811                essential: false,
2812                exit_code: Some(1),
2813                network_bindings: Vec::new(),
2814                image_digest: None,
2815            },
2816        ];
2817        assert!(task_should_stop(&containers));
2818    }
2819
2820    #[test]
2821    fn finalize_stopped_multi_assigns_per_container_exit_codes() {
2822        let mut accounts: MultiAccountState<EcsState> =
2823            MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
2824        let acct = accounts.get_or_create("000000000000");
2825        let mut t = make_task("t1");
2826        t.containers = vec![
2827            make_container("app", true),
2828            make_container("sidecar", false),
2829        ];
2830        acct.tasks.insert("t1".into(), t);
2831        let state: SharedEcsState = Arc::new(RwLock::new(accounts));
2832
2833        let final_containers = vec![
2834            RunningContainer {
2835                name: "app".into(),
2836                container_id: "id-app".into(),
2837                essential: true,
2838                exit_code: Some(0),
2839                network_bindings: Vec::new(),
2840                image_digest: None,
2841            },
2842            RunningContainer {
2843                name: "sidecar".into(),
2844                container_id: "id-sc".into(),
2845                essential: false,
2846                exit_code: Some(137),
2847                network_bindings: Vec::new(),
2848                image_digest: None,
2849            },
2850        ];
2851        finalize_stopped_multi(
2852            &state,
2853            "000000000000",
2854            "t1",
2855            &final_containers,
2856            0,
2857            "captured",
2858            "EssentialContainerExited",
2859            None,
2860        );
2861
2862        let accounts = state.read();
2863        let task = accounts
2864            .get("000000000000")
2865            .unwrap()
2866            .tasks
2867            .get("t1")
2868            .unwrap();
2869        assert_eq!(task.last_status, "STOPPED");
2870        assert_eq!(task.stop_code.as_deref(), Some("EssentialContainerExited"));
2871        let app = task.containers.iter().find(|c| c.name == "app").unwrap();
2872        let sc = task
2873            .containers
2874            .iter()
2875            .find(|c| c.name == "sidecar")
2876            .unwrap();
2877        assert_eq!(app.exit_code, Some(0));
2878        assert_eq!(sc.exit_code, Some(137));
2879        assert_eq!(app.last_status, "STOPPED");
2880        assert_eq!(sc.last_status, "STOPPED");
2881    }
2882
2883    fn plan(name: &str, deps: &[&str]) -> ContainerPlan {
2884        ContainerPlan {
2885            container_name: name.into(),
2886            image: "alpine".into(),
2887            env: Vec::new(),
2888            entry_point: Vec::new(),
2889            command: Vec::new(),
2890            secrets_refs: Vec::new(),
2891            essential: true,
2892            has_task_role: false,
2893            port_mappings: Vec::new(),
2894            network_mode: None,
2895            depends_on: deps
2896                .iter()
2897                .map(|s| DependsOn {
2898                    container_name: (*s).to_string(),
2899                    condition: DependsOnCondition::Start,
2900                })
2901                .collect(),
2902            health_check: None,
2903            volume_mounts: Vec::new(),
2904            ulimits: Vec::new(),
2905            linux_parameters: None,
2906            stop_timeout: None,
2907            user: None,
2908            working_directory: None,
2909            tty: false,
2910            interactive: false,
2911            readonly_rootfs: false,
2912        }
2913    }
2914
2915    #[test]
2916    fn topo_sort_orders_by_depends_on() {
2917        // sidecar depends on app, so app must come first regardless of
2918        // declaration order.
2919        let plans = vec![plan("sidecar", &["app"]), plan("app", &[])];
2920        let ordered = topo_sort_plans(plans);
2921        assert_eq!(ordered[0].container_name, "app");
2922        assert_eq!(ordered[1].container_name, "sidecar");
2923    }
2924
2925    #[test]
2926    fn topo_sort_preserves_declaration_order_when_no_deps() {
2927        let plans = vec![plan("first", &[]), plan("second", &[]), plan("third", &[])];
2928        let ordered = topo_sort_plans(plans);
2929        let names: Vec<&str> = ordered.iter().map(|p| p.container_name.as_str()).collect();
2930        assert_eq!(names, vec!["first", "second", "third"]);
2931    }
2932
2933    #[test]
2934    fn topo_sort_handles_chain() {
2935        // c -> b -> a, declared in reverse so the topological sort must
2936        // bubble dependencies up.
2937        let plans = vec![plan("c", &["b"]), plan("b", &["a"]), plan("a", &[])];
2938        let ordered = topo_sort_plans(plans);
2939        let names: Vec<&str> = ordered.iter().map(|p| p.container_name.as_str()).collect();
2940        assert_eq!(names, vec!["a", "b", "c"]);
2941    }
2942
2943    #[test]
2944    fn topo_sort_ignores_unknown_dependency() {
2945        // depends_on names a container not in this task definition. Real
2946        // ECS would reject this at register time; we don't (yet), so the
2947        // unknown dep should just be skipped instead of stalling the sort.
2948        let plans = vec![plan("only", &["does-not-exist"])];
2949        let ordered = topo_sort_plans(plans);
2950        assert_eq!(ordered.len(), 1);
2951        assert_eq!(ordered[0].container_name, "only");
2952    }
2953
2954    #[test]
2955    fn topo_sort_recovers_from_cycle() {
2956        // Cyclic dependsOn: both plans should still appear in the output
2957        // so the runtime doesn't silently drop them.
2958        let plans = vec![plan("a", &["b"]), plan("b", &["a"])];
2959        let ordered = topo_sort_plans(plans);
2960        assert_eq!(ordered.len(), 2);
2961    }
2962
2963    #[test]
2964    fn parse_health_check_fills_aws_defaults() {
2965        let v = serde_json::json!({
2966            "command": ["CMD-SHELL", "curl -f http://localhost/ || exit 1"],
2967        });
2968        let hc = __test_parse_health_check(&v).expect("parsed");
2969        assert_eq!(hc.command[0], "CMD-SHELL");
2970        assert_eq!(hc.interval_seconds, 30);
2971        assert_eq!(hc.timeout_seconds, 5);
2972        assert_eq!(hc.retries, 3);
2973        assert_eq!(hc.start_period_seconds, 0);
2974    }
2975
2976    #[test]
2977    fn parse_health_check_overrides_explicit_values() {
2978        let v = serde_json::json!({
2979            "command": ["CMD", "/probe"],
2980            "interval": 7,
2981            "timeout": 2,
2982            "retries": 9,
2983            "startPeriod": 12,
2984        });
2985        let hc = __test_parse_health_check(&v).expect("parsed");
2986        assert_eq!(hc.interval_seconds, 7);
2987        assert_eq!(hc.timeout_seconds, 2);
2988        assert_eq!(hc.retries, 9);
2989        assert_eq!(hc.start_period_seconds, 12);
2990    }
2991
2992    #[test]
2993    fn parse_health_check_returns_none_for_none_sentinel() {
2994        // ECS uses ["NONE"] to disable an inherited HEALTHCHECK; we
2995        // skip emission rather than passing a literal `none` to docker.
2996        let v = serde_json::json!({ "command": ["NONE"] });
2997        assert!(__test_parse_health_check(&v).is_none());
2998    }
2999
3000    #[test]
3001    fn parse_health_check_returns_none_for_missing_command() {
3002        let v = serde_json::json!({ "interval": 30 });
3003        assert!(__test_parse_health_check(&v).is_none());
3004    }
3005
3006    #[test]
3007    fn render_health_flags_emits_full_set_for_cmd_shell() {
3008        let hc = HealthCheckSpec {
3009            command: vec!["CMD-SHELL".into(), "curl -f http://localhost/".into()],
3010            interval_seconds: 15,
3011            timeout_seconds: 3,
3012            retries: 4,
3013            start_period_seconds: 10,
3014        };
3015        let flags = render_health_flags(&hc);
3016        assert_eq!(flags[0], "--health-cmd");
3017        assert_eq!(flags[1], "curl -f http://localhost/");
3018        assert!(flags.contains(&"--health-interval=15s".to_string()));
3019        assert!(flags.contains(&"--health-timeout=3s".to_string()));
3020        assert!(flags.contains(&"--health-retries=4".to_string()));
3021        assert!(flags.contains(&"--health-start-period=10s".to_string()));
3022    }
3023
3024    #[test]
3025    fn render_health_flags_joins_cmd_argv_with_spaces() {
3026        // CMD form in ECS is argv-style; docker `--health-cmd` only
3027        // accepts a single shell string, so we collapse with spaces.
3028        let hc = HealthCheckSpec {
3029            command: vec![
3030                "CMD".into(),
3031                "/bin/probe".into(),
3032                "--port".into(),
3033                "8080".into(),
3034            ],
3035            interval_seconds: 30,
3036            timeout_seconds: 5,
3037            retries: 3,
3038            start_period_seconds: 0,
3039        };
3040        let flags = render_health_flags(&hc);
3041        assert_eq!(flags[1], "/bin/probe --port 8080");
3042    }
3043
3044    #[test]
3045    fn build_run_argv_emits_health_flags_when_present() {
3046        let plan = ContainerPlan {
3047            container_name: "app".into(),
3048            image: "alpine".into(),
3049            env: Vec::new(),
3050            entry_point: Vec::new(),
3051            command: Vec::new(),
3052            secrets_refs: Vec::new(),
3053            essential: true,
3054            has_task_role: false,
3055            port_mappings: Vec::new(),
3056            network_mode: None,
3057            depends_on: Vec::new(),
3058            health_check: Some(HealthCheckSpec {
3059                command: vec!["CMD-SHELL".into(), "true".into()],
3060                interval_seconds: 5,
3061                timeout_seconds: 2,
3062                retries: 1,
3063                start_period_seconds: 1,
3064            }),
3065            volume_mounts: Vec::new(),
3066            ulimits: Vec::new(),
3067            linux_parameters: None,
3068            stop_timeout: None,
3069            user: None,
3070            working_directory: None,
3071            tty: false,
3072            interactive: false,
3073            readonly_rootfs: false,
3074        };
3075        let argv = build_run_argv(&plan, &[], "task-1", "host-gateway", "alpine");
3076        let joined = argv.join(" ");
3077        assert!(joined.contains("--health-cmd true"), "argv: {joined}");
3078        assert!(joined.contains("--health-interval=5s"), "argv: {joined}");
3079        assert!(joined.contains("--health-timeout=2s"), "argv: {joined}");
3080        assert!(joined.contains("--health-retries=1"), "argv: {joined}");
3081        assert!(
3082            joined.contains("--health-start-period=1s"),
3083            "argv: {joined}"
3084        );
3085    }
3086
3087    #[test]
3088    fn build_run_argv_emits_no_health_flags_when_absent() {
3089        let plan = ContainerPlan {
3090            container_name: "app".into(),
3091            image: "alpine".into(),
3092            env: Vec::new(),
3093            entry_point: Vec::new(),
3094            command: Vec::new(),
3095            secrets_refs: Vec::new(),
3096            essential: true,
3097            has_task_role: false,
3098            port_mappings: Vec::new(),
3099            network_mode: None,
3100            depends_on: Vec::new(),
3101            health_check: None,
3102            volume_mounts: Vec::new(),
3103            ulimits: Vec::new(),
3104            linux_parameters: None,
3105            stop_timeout: None,
3106            user: None,
3107            working_directory: None,
3108            tty: false,
3109            interactive: false,
3110            readonly_rootfs: false,
3111        };
3112        let argv = build_run_argv(&plan, &[], "task-1", "host-gateway", "alpine");
3113        assert!(!argv.iter().any(|s| s.starts_with("--health")));
3114    }
3115
3116    #[test]
3117    fn docker_health_to_ecs_maps_known_states() {
3118        assert_eq!(docker_health_to_ecs("healthy"), "HEALTHY");
3119        assert_eq!(docker_health_to_ecs("HEALTHY"), "HEALTHY");
3120        assert_eq!(docker_health_to_ecs("unhealthy"), "UNHEALTHY");
3121        assert_eq!(docker_health_to_ecs("starting"), "UNKNOWN");
3122        assert_eq!(docker_health_to_ecs("none"), "UNKNOWN");
3123        assert_eq!(docker_health_to_ecs(""), "UNKNOWN");
3124    }
3125
3126    /// `host.sourcePath` becomes a host bind mount with the path
3127    /// passed straight through to docker.
3128    #[test]
3129    fn resolve_host_bind_volume_uses_source_path() {
3130        let mut volumes = std::collections::HashMap::new();
3131        let v = serde_json::json!({
3132            "name": "data",
3133            "host": { "sourcePath": "/var/lib/myapp" }
3134        });
3135        volumes.insert("data".to_string(), &v);
3136        let mp = serde_json::json!({
3137            "sourceVolume": "data",
3138            "containerPath": "/app/data",
3139            "readOnly": false
3140        });
3141        let resolved = resolve_mount_point(&mp, &volumes).expect("resolved");
3142        assert_eq!(resolved.source, "/var/lib/myapp");
3143        assert_eq!(resolved.container_path, "/app/data");
3144        assert!(!resolved.read_only);
3145    }
3146
3147    /// `readOnly: true` on the mount point appends `:ro` to the
3148    /// rendered docker `-v` flag.
3149    #[test]
3150    fn read_only_mount_renders_ro_suffix() {
3151        let plan = ContainerPlan {
3152            container_name: "app".into(),
3153            image: "alpine".into(),
3154            env: Vec::new(),
3155            entry_point: Vec::new(),
3156            command: Vec::new(),
3157            secrets_refs: Vec::new(),
3158            essential: true,
3159            has_task_role: false,
3160            port_mappings: Vec::new(),
3161            network_mode: None,
3162            depends_on: Vec::new(),
3163            health_check: None,
3164            volume_mounts: vec![VolumeMount {
3165                source: "/host/path".into(),
3166                container_path: "/in/container".into(),
3167                read_only: true,
3168            }],
3169            ulimits: Vec::new(),
3170            linux_parameters: None,
3171            stop_timeout: None,
3172            user: None,
3173            working_directory: None,
3174            tty: false,
3175            interactive: false,
3176            readonly_rootfs: false,
3177        };
3178        let argv = build_run_argv(&plan, &[], "task-1", "host-gateway", "alpine");
3179        let pair = argv
3180            .windows(2)
3181            .find(|w| w[0] == "-v")
3182            .expect("expected -v flag");
3183        assert_eq!(pair[1], "/host/path:/in/container:ro");
3184    }
3185
3186    /// EFS volumes resolve to a stub directory under `/tmp/fakecloud/efs`
3187    /// keyed by `fileSystemId`. `rootDirectory` (when set and not `/`)
3188    /// is appended so different mount targets within the same
3189    /// filesystem stay isolated.
3190    #[test]
3191    fn resolve_efs_volume_uses_stub_dir() {
3192        let mut volumes = std::collections::HashMap::new();
3193        let v = serde_json::json!({
3194            "name": "efs-vol",
3195            "efsVolumeConfiguration": {
3196                "fileSystemId": "fs-12345678",
3197                "rootDirectory": "/exports/app"
3198            }
3199        });
3200        volumes.insert("efs-vol".to_string(), &v);
3201        let mp = serde_json::json!({
3202            "sourceVolume": "efs-vol",
3203            "containerPath": "/mnt/efs"
3204        });
3205        let resolved = resolve_mount_point(&mp, &volumes).expect("resolved");
3206        assert_eq!(
3207            resolved.source,
3208            "/tmp/fakecloud/efs/fs-12345678/exports/app"
3209        );
3210        assert_eq!(resolved.container_path, "/mnt/efs");
3211    }
3212
3213    /// EFS without `rootDirectory` (or with `/`) maps to the root of
3214    /// the filesystem stub so multiple tasks targeting the same id
3215    /// share state.
3216    #[test]
3217    fn efs_without_root_directory_uses_filesystem_root() {
3218        assert_eq!(
3219            stub_dir_for("efs", "fs-abc", "/"),
3220            "/tmp/fakecloud/efs/fs-abc"
3221        );
3222        assert_eq!(
3223            stub_dir_for("efs", "fs-abc", ""),
3224            "/tmp/fakecloud/efs/fs-abc"
3225        );
3226    }
3227
3228    /// `dockerVolumeConfiguration` resolves to the volume name itself,
3229    /// which docker treats as a named volume reference. No host path
3230    /// is materialised — docker creates the volume on first reference.
3231    #[test]
3232    fn resolve_docker_named_volume_uses_volume_name() {
3233        let mut volumes = std::collections::HashMap::new();
3234        let v = serde_json::json!({
3235            "name": "named-vol",
3236            "dockerVolumeConfiguration": {
3237                "scope": "task",
3238                "driver": "local"
3239            }
3240        });
3241        volumes.insert("named-vol".to_string(), &v);
3242        let mp = serde_json::json!({
3243            "sourceVolume": "named-vol",
3244            "containerPath": "/data"
3245        });
3246        let resolved = resolve_mount_point(&mp, &volumes).expect("resolved");
3247        assert_eq!(resolved.source, "named-vol");
3248        assert_eq!(resolved.container_path, "/data");
3249    }
3250
3251    /// FSx for Windows uses the same stub-directory pattern as EFS but
3252    /// scoped under `/tmp/fakecloud/fsx/<filesystemId>/`.
3253    #[test]
3254    fn resolve_fsx_volume_uses_stub_dir() {
3255        let mut volumes = std::collections::HashMap::new();
3256        let v = serde_json::json!({
3257            "name": "fsx-vol",
3258            "fsxWindowsFileServerVolumeConfiguration": {
3259                "fileSystemId": "fs-xyz",
3260                "rootDirectory": "share"
3261            }
3262        });
3263        volumes.insert("fsx-vol".to_string(), &v);
3264        let mp = serde_json::json!({
3265            "sourceVolume": "fsx-vol",
3266            "containerPath": "C:\\data"
3267        });
3268        let resolved = resolve_mount_point(&mp, &volumes).expect("resolved");
3269        assert_eq!(resolved.source, "/tmp/fakecloud/fsx/fs-xyz/share");
3270    }
3271
3272    /// Mount points that reference an undeclared `sourceVolume` resolve
3273    /// to `None` so `build_container_plans` skips them rather than
3274    /// emitting a broken `-v` flag.
3275    #[test]
3276    fn unknown_source_volume_returns_none() {
3277        let volumes = std::collections::HashMap::new();
3278        let mp = serde_json::json!({
3279            "sourceVolume": "missing",
3280            "containerPath": "/x"
3281        });
3282        assert!(resolve_mount_point(&mp, &volumes).is_none());
3283    }
3284
3285    /// `find_depends_on_cycle` returns the back-edge endpoints when a
3286    /// trivial 2-cycle exists. Real ECS would reject this at register
3287    /// time; our service-level handler relies on this helper.
3288    #[test]
3289    fn find_depends_on_cycle_detects_two_node_cycle() {
3290        let cds = vec![
3291            serde_json::json!({
3292                "name": "a",
3293                "image": "alpine",
3294                "dependsOn": [{"containerName": "b", "condition": "START"}],
3295            }),
3296            serde_json::json!({
3297                "name": "b",
3298                "image": "alpine",
3299                "dependsOn": [{"containerName": "a", "condition": "START"}],
3300            }),
3301        ];
3302        let cycle = find_depends_on_cycle(&cds);
3303        assert!(cycle.is_some(), "expected cycle to be detected");
3304    }
3305
3306    /// A three-node chain (a -> b -> c) is acyclic and must not be
3307    /// flagged. Guards against an over-eager DFS reporting back-edges
3308    /// from already-finished nodes.
3309    #[test]
3310    fn find_depends_on_cycle_accepts_chain() {
3311        let cds = vec![
3312            serde_json::json!({
3313                "name": "a",
3314                "image": "alpine",
3315                "dependsOn": [{"containerName": "b", "condition": "START"}],
3316            }),
3317            serde_json::json!({
3318                "name": "b",
3319                "image": "alpine",
3320                "dependsOn": [{"containerName": "c", "condition": "START"}],
3321            }),
3322            serde_json::json!({
3323                "name": "c",
3324                "image": "alpine",
3325            }),
3326        ];
3327        assert!(find_depends_on_cycle(&cds).is_none());
3328    }
3329
3330    /// `dependsOn[]` entries that name a container outside the task
3331    /// definition are ignored by the cycle check (they can't form a
3332    /// cycle by definition; runtime also drops them).
3333    #[test]
3334    fn find_depends_on_cycle_ignores_unknown_target() {
3335        let cds = vec![serde_json::json!({
3336            "name": "only",
3337            "image": "alpine",
3338            "dependsOn": [{"containerName": "ghost", "condition": "START"}],
3339        })];
3340        assert!(find_depends_on_cycle(&cds).is_none());
3341    }
3342
3343    /// `condition_is_met` covers each AWS condition value against a
3344    /// simulated docker inspect snapshot. Pinning these mappings here
3345    /// catches accidental re-orderings of the match arms.
3346    #[test]
3347    fn condition_is_met_matches_aws_semantics() {
3348        let running = InspectedState {
3349            started: true,
3350            exited: false,
3351            exit_code: 0,
3352            health: None,
3353        };
3354        let exited_ok = InspectedState {
3355            started: true,
3356            exited: true,
3357            exit_code: 0,
3358            health: None,
3359        };
3360        let exited_fail = InspectedState {
3361            started: true,
3362            exited: true,
3363            exit_code: 1,
3364            health: None,
3365        };
3366        let healthy = InspectedState {
3367            started: true,
3368            exited: false,
3369            exit_code: 0,
3370            health: Some("healthy".into()),
3371        };
3372
3373        // START is satisfied as soon as the container has started, even
3374        // if it later exited.
3375        assert!(condition_is_met(DependsOnCondition::Start, &running));
3376        assert!(condition_is_met(DependsOnCondition::Start, &exited_ok));
3377
3378        // COMPLETE requires an exit, regardless of code.
3379        assert!(!condition_is_met(DependsOnCondition::Complete, &running));
3380        assert!(condition_is_met(DependsOnCondition::Complete, &exited_ok));
3381        assert!(condition_is_met(DependsOnCondition::Complete, &exited_fail));
3382
3383        // SUCCESS requires an exit AND code 0.
3384        assert!(!condition_is_met(DependsOnCondition::Success, &running));
3385        assert!(condition_is_met(DependsOnCondition::Success, &exited_ok));
3386        assert!(!condition_is_met(DependsOnCondition::Success, &exited_fail));
3387
3388        // HEALTHY requires Health.Status == "healthy".
3389        assert!(!condition_is_met(DependsOnCondition::Healthy, &running));
3390        assert!(condition_is_met(DependsOnCondition::Healthy, &healthy));
3391    }
3392
3393    /// `DependsOnCondition::parse` accepts the four AWS-spelled values
3394    /// and rejects everything else — register-time validation depends on
3395    /// this returning `None` for unknowns.
3396    #[test]
3397    fn depends_on_condition_parse_round_trips() {
3398        assert_eq!(
3399            DependsOnCondition::parse("START"),
3400            Some(DependsOnCondition::Start)
3401        );
3402        assert_eq!(
3403            DependsOnCondition::parse("COMPLETE"),
3404            Some(DependsOnCondition::Complete)
3405        );
3406        assert_eq!(
3407            DependsOnCondition::parse("SUCCESS"),
3408            Some(DependsOnCondition::Success)
3409        );
3410        assert_eq!(
3411            DependsOnCondition::parse("HEALTHY"),
3412            Some(DependsOnCondition::Healthy)
3413        );
3414        assert_eq!(DependsOnCondition::parse("start"), None);
3415        assert_eq!(DependsOnCondition::parse("ANY"), None);
3416    }
3417
3418    // ── ulimits + linuxParameters + misc docker flags (O6) ──
3419
3420    #[test]
3421    fn build_run_argv_emits_ulimits() {
3422        let plan = ContainerPlan {
3423            container_name: "app".into(),
3424            image: "alpine".into(),
3425            env: Vec::new(),
3426            entry_point: Vec::new(),
3427            command: Vec::new(),
3428            secrets_refs: Vec::new(),
3429            essential: true,
3430            has_task_role: false,
3431            port_mappings: Vec::new(),
3432            network_mode: None,
3433            depends_on: Vec::new(),
3434            health_check: None,
3435            volume_mounts: Vec::new(),
3436            ulimits: vec![Ulimit {
3437                name: "nofile".into(),
3438                soft_limit: 1024,
3439                hard_limit: 2048,
3440            }],
3441            linux_parameters: None,
3442            stop_timeout: None,
3443            user: None,
3444            working_directory: None,
3445            tty: false,
3446            interactive: false,
3447            readonly_rootfs: false,
3448        };
3449        let argv = build_run_argv(&plan, &[], "t", "host", "img");
3450        assert!(argv.contains(&"--ulimit".to_string()));
3451        assert!(argv.contains(&"nofile=1024:2048".to_string()));
3452    }
3453
3454    #[test]
3455    fn build_run_argv_emits_linux_parameters() {
3456        let plan = ContainerPlan {
3457            container_name: "app".into(),
3458            image: "alpine".into(),
3459            env: Vec::new(),
3460            entry_point: Vec::new(),
3461            command: Vec::new(),
3462            secrets_refs: Vec::new(),
3463            essential: true,
3464            has_task_role: false,
3465            port_mappings: Vec::new(),
3466            network_mode: None,
3467            depends_on: Vec::new(),
3468            health_check: None,
3469            volume_mounts: Vec::new(),
3470            ulimits: Vec::new(),
3471            linux_parameters: Some(LinuxParameters {
3472                capabilities_add: vec!["NET_ADMIN".into()],
3473                capabilities_drop: vec!["ALL".into()],
3474                devices: vec![Device {
3475                    host_path: "/dev/zero".into(),
3476                    container_path: "/dev/zero".into(),
3477                    permissions: "rwm".into(),
3478                }],
3479                init_process_enabled: true,
3480                shared_memory_size: Some(256),
3481                sysctls: vec![Sysctl {
3482                    name: "net.ipv4.ip_forward".into(),
3483                    value: "1".into(),
3484                }],
3485                tmpfs: vec![Tmpfs {
3486                    container_path: "/tmp".into(),
3487                    size: 128,
3488                    mount_options: vec!["noexec".into()],
3489                }],
3490                privileged: true,
3491            }),
3492            stop_timeout: Some(30),
3493            user: Some("1000:1000".into()),
3494            working_directory: Some("/app".into()),
3495            tty: true,
3496            interactive: true,
3497            readonly_rootfs: true,
3498        };
3499        let argv = build_run_argv(&plan, &[], "t", "host", "img");
3500        assert!(argv.contains(&"--cap-add".to_string()));
3501        assert!(argv.contains(&"NET_ADMIN".to_string()));
3502        assert!(argv.contains(&"--cap-drop".to_string()));
3503        assert!(argv.contains(&"ALL".to_string()));
3504        assert!(argv.contains(&"--device".to_string()));
3505        assert!(argv.contains(&"/dev/zero:/dev/zerorwm".to_string()));
3506        assert!(argv.contains(&"--init".to_string()));
3507        assert!(argv.contains(&"--shm-size".to_string()));
3508        assert!(argv.contains(&"256m".to_string()));
3509        assert!(argv.contains(&"--sysctl".to_string()));
3510        assert!(argv.contains(&"net.ipv4.ip_forward=1".to_string()));
3511        assert!(argv.contains(&"--tmpfs".to_string()));
3512        assert!(argv.contains(&"--privileged".to_string()));
3513        assert!(argv.contains(&"--stop-timeout".to_string()));
3514        assert!(argv.contains(&"30".to_string()));
3515        assert!(argv.contains(&"--user".to_string()));
3516        assert!(argv.contains(&"1000:1000".to_string()));
3517        assert!(argv.contains(&"--workdir".to_string()));
3518        assert!(argv.contains(&"/app".to_string()));
3519        assert!(argv.contains(&"--tty".to_string()));
3520        assert!(argv.contains(&"--interactive".to_string()));
3521        assert!(argv.contains(&"--read-only".to_string()));
3522    }
3523
3524    #[test]
3525    fn parse_linux_parameters_fills_defaults() {
3526        let raw = serde_json::json!({"initProcessEnabled": true});
3527        let lp = parse_linux_parameters(&raw).expect("parses");
3528        assert!(lp.init_process_enabled);
3529        assert!(!lp.privileged);
3530        assert!(lp.capabilities_add.is_empty());
3531    }
3532
3533    #[test]
3534    fn parse_device_uses_default_permissions() {
3535        let raw = serde_json::json!({"hostPath": "/dev/null", "containerPath": "/dev/null"});
3536        let dev = parse_device(&raw).expect("parses");
3537        assert_eq!(dev.permissions, "rwm");
3538    }
3539
3540    #[test]
3541    fn compute_elbv2_targets_empty_when_no_group() {
3542        let mut accounts: MultiAccountState<EcsState> =
3543            MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
3544        let acct = accounts.get_or_create("000000000000");
3545        let mut task = make_task("t1");
3546        task.group = None;
3547        acct.tasks.insert("t1".into(), task);
3548        let state = acct.clone();
3549        let targets = compute_elbv2_targets(&state, state.tasks.get("t1").unwrap());
3550        assert!(targets.is_empty());
3551    }
3552
3553    #[test]
3554    fn compute_elbv2_targets_bridge_mode_uses_localhost_and_host_port() {
3555        let mut accounts: MultiAccountState<EcsState> =
3556            MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
3557        let acct = accounts.get_or_create("000000000000");
3558
3559        let td = crate::state::TaskDefinition {
3560            family: "app".into(),
3561            revision: 1,
3562            task_definition_arn: "arn:aws:ecs:us-east-1:000000000000:task-definition/app:1".into(),
3563            container_definitions: Vec::new(),
3564            network_mode: Some("bridge".into()),
3565            status: "ACTIVE".into(),
3566            task_role_arn: None,
3567            execution_role_arn: None,
3568            requires_compatibilities: Vec::new(),
3569            compatibilities: Vec::new(),
3570            cpu: None,
3571            memory: None,
3572            pid_mode: None,
3573            ipc_mode: None,
3574            volumes: Vec::new(),
3575            placement_constraints: Vec::new(),
3576            proxy_configuration: None,
3577            inference_accelerators: Vec::new(),
3578            ephemeral_storage: None,
3579            runtime_platform: None,
3580            requires_attributes: Vec::new(),
3581            registered_at: Utc::now(),
3582            registered_by: None,
3583            deregistered_at: None,
3584            tags: Vec::new(),
3585            enable_fault_injection: None,
3586        };
3587        acct.task_definitions.insert("app".into(), {
3588            let mut m = std::collections::BTreeMap::new();
3589            m.insert(1, td);
3590            m
3591        });
3592
3593        let service = crate::state::Service {
3594            service_name: "svc".into(),
3595            service_arn: "arn:aws:ecs:us-east-1:000000000000:service/default/svc".into(),
3596            cluster_name: "default".into(),
3597            cluster_arn: "arn:aws:ecs:us-east-1:000000000000:cluster/default".into(),
3598            task_definition_arn: "arn:aws:ecs:us-east-1:000000000000:task-definition/app:1".into(),
3599            family: "app".into(),
3600            revision: 1,
3601            desired_count: 1,
3602            running_count: 0,
3603            pending_count: 0,
3604            launch_type: "FARGATE".into(),
3605            status: "ACTIVE".into(),
3606            scheduling_strategy: "REPLICA".into(),
3607            deployment_controller: "ECS".into(),
3608            minimum_healthy_percent: Some(0),
3609            maximum_percent: Some(200),
3610            circuit_breaker: None,
3611            deployments: Vec::new(),
3612            load_balancers: vec![serde_json::json!({
3613                "targetGroupArn": "arn:aws:elasticloadbalancing:us-east-1:000000000000:targetgroup/tg/abc",
3614                "containerName": "app",
3615                "containerPort": 80,
3616            })],
3617            service_registries: Vec::new(),
3618            placement_constraints: Vec::new(),
3619            placement_strategy: Vec::new(),
3620            network_configuration: None,
3621            volume_configurations: vec![],
3622            tags: Vec::new(),
3623            created_at: Utc::now(),
3624            created_by: None,
3625            role_arn: None,
3626            platform_version: None,
3627            health_check_grace_period_seconds: None,
3628            enable_execute_command: false,
3629            enable_ecs_managed_tags: false,
3630            propagate_tags: None,
3631            capacity_provider_strategy: Vec::new(),
3632            availability_zone_rebalancing: None,
3633        };
3634        acct.services.insert(
3635            crate::state::EcsState::service_key("default", "svc"),
3636            service,
3637        );
3638
3639        let mut task = make_task("t1");
3640        task.group = Some("service:svc".into());
3641        task.containers = vec![crate::state::Container {
3642            container_arn: "arn:aws:ecs:us-east-1:000000000000:container/default/abc/app".into(),
3643            name: "app".into(),
3644            image: "alpine".into(),
3645            task_arn: task.task_arn.clone(),
3646            last_status: "RUNNING".into(),
3647            exit_code: None,
3648            reason: None,
3649            runtime_id: Some("dockerid-app".into()),
3650            essential: true,
3651            cpu: None,
3652            memory: None,
3653            memory_reservation: None,
3654            network_bindings: vec![serde_json::json!({
3655                "bindIP": "0.0.0.0",
3656                "containerPort": 80,
3657                "hostPort": 32768,
3658                "protocol": "tcp",
3659            })],
3660            network_interfaces: Vec::new(),
3661            health_status: None,
3662            managed_agents: None,
3663            image_digest: None,
3664        }];
3665        acct.tasks.insert("t1".into(), task);
3666
3667        let state = acct.clone();
3668        let targets = compute_elbv2_targets(&state, state.tasks.get("t1").unwrap());
3669        assert_eq!(targets.len(), 1);
3670        let (arn, tg_targets) = &targets[0];
3671        assert_eq!(
3672            arn,
3673            "arn:aws:elasticloadbalancing:us-east-1:000000000000:targetgroup/tg/abc"
3674        );
3675        assert_eq!(tg_targets.len(), 1);
3676        assert_eq!(tg_targets[0].0, "127.0.0.1");
3677        assert_eq!(tg_targets[0].1, Some(32768));
3678    }
3679
3680    #[test]
3681    fn compute_elbv2_targets_awsvpc_uses_eni_ip() {
3682        let mut accounts: MultiAccountState<EcsState> =
3683            MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
3684        let acct = accounts.get_or_create("000000000000");
3685
3686        let td = crate::state::TaskDefinition {
3687            family: "app".into(),
3688            revision: 1,
3689            task_definition_arn: "arn:aws:ecs:us-east-1:000000000000:task-definition/app:1".into(),
3690            container_definitions: Vec::new(),
3691            network_mode: Some("awsvpc".into()),
3692            status: "ACTIVE".into(),
3693            task_role_arn: None,
3694            execution_role_arn: None,
3695            requires_compatibilities: Vec::new(),
3696            compatibilities: Vec::new(),
3697            cpu: None,
3698            memory: None,
3699            pid_mode: None,
3700            ipc_mode: None,
3701            volumes: Vec::new(),
3702            placement_constraints: Vec::new(),
3703            proxy_configuration: None,
3704            inference_accelerators: Vec::new(),
3705            ephemeral_storage: None,
3706            runtime_platform: None,
3707            requires_attributes: Vec::new(),
3708            registered_at: Utc::now(),
3709            registered_by: None,
3710            deregistered_at: None,
3711            tags: Vec::new(),
3712            enable_fault_injection: None,
3713        };
3714        acct.task_definitions.insert("app".into(), {
3715            let mut m = std::collections::BTreeMap::new();
3716            m.insert(1, td);
3717            m
3718        });
3719
3720        let service = crate::state::Service {
3721            service_name: "svc".into(),
3722            service_arn: "arn:aws:ecs:us-east-1:000000000000:service/default/svc".into(),
3723            cluster_name: "default".into(),
3724            cluster_arn: "arn:aws:ecs:us-east-1:000000000000:cluster/default".into(),
3725            task_definition_arn: "arn:aws:ecs:us-east-1:000000000000:task-definition/app:1".into(),
3726            family: "app".into(),
3727            revision: 1,
3728            desired_count: 1,
3729            running_count: 0,
3730            pending_count: 0,
3731            launch_type: "FARGATE".into(),
3732            status: "ACTIVE".into(),
3733            scheduling_strategy: "REPLICA".into(),
3734            deployment_controller: "ECS".into(),
3735            minimum_healthy_percent: Some(0),
3736            maximum_percent: Some(200),
3737            circuit_breaker: None,
3738            deployments: Vec::new(),
3739            load_balancers: vec![serde_json::json!({
3740                "targetGroupArn": "arn:aws:elasticloadbalancing:us-east-1:000000000000:targetgroup/tg/abc",
3741                "containerName": "app",
3742                "containerPort": 80,
3743            })],
3744            service_registries: Vec::new(),
3745            placement_constraints: Vec::new(),
3746            placement_strategy: Vec::new(),
3747            network_configuration: None,
3748            volume_configurations: vec![],
3749            tags: Vec::new(),
3750            created_at: Utc::now(),
3751            created_by: None,
3752            role_arn: None,
3753            platform_version: None,
3754            health_check_grace_period_seconds: None,
3755            enable_execute_command: false,
3756            enable_ecs_managed_tags: false,
3757            propagate_tags: None,
3758            capacity_provider_strategy: Vec::new(),
3759            availability_zone_rebalancing: None,
3760        };
3761        acct.services.insert(
3762            crate::state::EcsState::service_key("default", "svc"),
3763            service,
3764        );
3765
3766        let mut task = make_task("t1");
3767        task.group = Some("service:svc".into());
3768        task.attachments = vec![crate::state::TaskAttachment {
3769            id: "eni-123".into(),
3770            attachment_type: "eni".into(),
3771            status: "ATTACHED".into(),
3772            details: vec![
3773                crate::state::AttachmentDetail {
3774                    name: "privateIPv4Address".into(),
3775                    value: "172.18.0.2".into(),
3776                },
3777                crate::state::AttachmentDetail {
3778                    name: "macAddress".into(),
3779                    value: "02:42:ac:12:00:02".into(),
3780                },
3781            ],
3782        }];
3783        acct.tasks.insert("t1".into(), task);
3784
3785        let state = acct.clone();
3786        let targets = compute_elbv2_targets(&state, state.tasks.get("t1").unwrap());
3787        assert_eq!(targets.len(), 1);
3788        let (arn, tg_targets) = &targets[0];
3789        assert_eq!(
3790            arn,
3791            "arn:aws:elasticloadbalancing:us-east-1:000000000000:targetgroup/tg/abc"
3792        );
3793        assert_eq!(tg_targets.len(), 1);
3794        assert_eq!(tg_targets[0].0, "172.18.0.2");
3795        assert_eq!(tg_targets[0].1, Some(80));
3796    }
3797}