Skip to main content

fakecloud_ecs/runtime/
mod.rs

1//! Docker/Podman-based ECS task execution.
2//!
3//! Mirrors the Lambda `ContainerRuntime` approach (auto-detect CLI, forward
4//! localhost → host.docker.internal) but scoped for ECS's different
5//! lifecycle: tasks are ephemeral, so there is no warm-container pool. Each
6//! `run_task` spawns a background tokio task that pulls the image, starts
7//! the container, waits for exit, captures logs, and updates shared ECS
8//! state in place.
9
10use std::path::PathBuf;
11use std::sync::Arc;
12use std::time::Duration;
13
14use base64::Engine;
15use chrono::Utc;
16use fakecloud_core::delivery::DeliveryBus;
17use fakecloud_logs::ingest::{append_events, IngestEvent};
18use fakecloud_logs::SharedLogsState;
19use fakecloud_secretsmanager::SharedSecretsManagerState;
20use fakecloud_ssm::SharedSsmState;
21use parking_lot::RwLock;
22use tempfile::TempDir;
23use tokio::process::Command;
24
25use crate::state::{LifecycleEvent, SharedEcsState};
26
27#[derive(Debug, thiserror::Error)]
28pub enum RuntimeError {
29    #[error("container CLI not found (tried docker, podman)")]
30    NoCli,
31    #[error("image pull failed: {0}")]
32    ImagePull(String),
33    #[error("container start failed: {0}")]
34    ContainerStart(String),
35    #[error("docker wait failed: {0}")]
36    Wait(String),
37}
38
39/// Docker/Podman executor for ECS tasks.
40pub struct EcsRuntime {
41    cli: String,
42    /// Container-to-host networking resolution (host alias, `--add-host`
43    /// arg, sibling-container address) shared with the other runtimes via
44    /// [`fakecloud_core::container_net`]. Carries the issue #1539 podman +
45    /// in-container fixes.
46    net: fakecloud_core::container_net::HostNetworking,
47    /// Port the main fakecloud server bound to. Used to translate AWS
48    /// ECR URIs (`<acct>.dkr.ecr.<region>.amazonaws.com/<repo>:<tag>`) to
49    /// the local OCI v2 endpoint (`127.0.0.1:<port>/<repo>:<tag>`) so
50    /// tasks can pull images pushed to fakecloud's own ECR.
51    server_port: u16,
52    /// Isolated DOCKER_CONFIG dir pre-populated with Basic auth for
53    /// `127.0.0.1:<port>`; keeps the host user's `~/.docker/config.json`
54    /// untouched and lets `docker pull` succeed against fakecloud ECR
55    /// without a prior `aws ecr get-login-password | docker login`.
56    docker_config: Option<Arc<TempDir>>,
57    /// Tracks per-task lists of `(container_name, docker_container_id)` so
58    /// `stop_task` can kill every container backing a task — multi-container
59    /// task definitions launch one docker container per `containerDefinitions`
60    /// entry, all of which must be torn down on stop.
61    containers: RwLock<std::collections::HashMap<String, Vec<(String, String)>>>,
62    /// Cross-service delivery bus — emits `aws.ecs` EventBridge events
63    /// on task state transitions when wired. `None` if the server started
64    /// without EventBridge configured (or for unit tests).
65    delivery_bus: Option<Arc<DeliveryBus>>,
66    /// CloudWatch Logs state — when set, tasks whose container definition
67    /// declares the `awslogs` log driver get their captured stdout/stderr
68    /// forwarded to a log group/stream under this shared state.
69    logs_state: Option<SharedLogsState>,
70    /// SecretsManager state for resolving `containerDefinition.secrets[]`
71    /// entries whose `valueFrom` is a SecretsManager ARN.
72    secretsmanager_state: Option<SharedSecretsManagerState>,
73    /// SSM Parameter Store state for resolving `secrets[]` entries whose
74    /// `valueFrom` is an SSM parameter ARN.
75    ssm_state: Option<SharedSsmState>,
76    /// `Some` when running on the Kubernetes backend; `run_task` then maps
77    /// each task to a Pod instead of `docker run`. `None` is the default
78    /// Docker/Podman backend, and the fields above drive it.
79    k8s: Option<k8s::K8sTaskBackend>,
80}
81
82mod config;
83mod k8s;
84mod lb;
85mod monitoring;
86mod secrets;
87mod task_lifecycle;
88
89impl EcsRuntime {
90    /// Auto-detect Docker or Podman. Returns `None` if neither is
91    /// available. Honours `FAKECLOUD_CONTAINER_CLI` for explicit override.
92    /// `server_port` is the port the main fakecloud server bound to;
93    /// needed to resolve AWS ECR URIs against the local OCI v2 registry.
94    pub fn new(server_port: u16) -> Option<Self> {
95        let cli = fakecloud_core::container_net::detect_container_cli()?;
96        let net = fakecloud_core::container_net::HostNetworking::detect(&cli);
97        let docker_config = build_local_registry_docker_config(server_port).map(Arc::new);
98        Some(Self {
99            cli,
100            net,
101            server_port,
102            docker_config,
103            containers: RwLock::new(std::collections::HashMap::new()),
104            delivery_bus: None,
105            logs_state: None,
106            secretsmanager_state: None,
107            ssm_state: None,
108            k8s: None,
109        })
110    }
111
112    /// Construct the Kubernetes backend. `server_port` is fakecloud's
113    /// bound port (used when `FAKECLOUD_K8S_SELF_URL` omits one). Fails
114    /// fast on misconfiguration — never silently degrades to Docker.
115    pub async fn new_k8s(server_port: u16) -> Result<Self, k8s::BackendInitError> {
116        let backend = k8s::K8sTaskBackend::from_env(server_port).await?;
117        // Docker fields are inert on the k8s backend; populate the cheap
118        // ones and leave docker_config unset.
119        let net = fakecloud_core::container_net::HostNetworking {
120            host_alias: String::new(),
121            add_host_arg: None,
122            sibling_host: String::new(),
123        };
124        Ok(Self {
125            cli: String::new(),
126            net,
127            server_port,
128            docker_config: None,
129            containers: RwLock::new(std::collections::HashMap::new()),
130            delivery_bus: None,
131            logs_state: None,
132            secretsmanager_state: None,
133            ssm_state: None,
134            k8s: Some(backend),
135        })
136    }
137
138    /// Backend name for logging.
139    pub fn cli_name(&self) -> &str {
140        if self.k8s.is_some() {
141            "kubernetes"
142        } else {
143            &self.cli
144        }
145    }
146
147    /// Sweep task Pods orphaned by a previous process (k8s only; no-op on
148    /// the Docker backend, handled by the shared container reaper).
149    pub async fn reap_stale(&self) {
150        if let Some(k) = &self.k8s {
151            k.reap_stale().await;
152        }
153    }
154
155    /// Wire EventBridge delivery so task state transitions emit
156    /// `aws.ecs` / `ECS Task State Change` events.
157    pub fn with_delivery_bus(mut self, bus: Arc<DeliveryBus>) -> Self {
158        self.delivery_bus = Some(bus);
159        self
160    }
161
162    /// Wire CloudWatch Logs state so tasks using the `awslogs` driver
163    /// get their captured stdout/stderr forwarded.
164    pub fn with_logs(mut self, logs: SharedLogsState) -> Self {
165        self.logs_state = Some(logs);
166        self
167    }
168}
169
170/// Per-container launch plan derived from a task definition.
171#[derive(Clone, Debug)]
172pub(crate) struct ContainerPlan {
173    pub(crate) container_name: String,
174    pub(crate) image: String,
175    pub(crate) env: Vec<(String, String)>,
176    pub(crate) entry_point: Vec<String>,
177    pub(crate) command: Vec<String>,
178    pub(crate) secrets_refs: Vec<(String, String)>,
179    pub(crate) essential: bool,
180    pub(crate) has_task_role: bool,
181    /// Port mappings parsed from the task definition. Each entry becomes
182    /// a `--publish containerPort:hostPort/protocol` flag on the docker
183    /// run command (except for `awsvpc`, where ports are exposed via the
184    /// per-task ENI rather than the docker host's port table).
185    pub(crate) port_mappings: Vec<PortMapping>,
186    /// Task-level network mode propagated to every container plan so the
187    /// argv builder can decide whether to emit `--publish` flags. Real
188    /// ECS treats `awsvpc` as "container is on its own ENI"; the
189    /// equivalent in fakecloud is "don't publish to the host".
190    pub(crate) network_mode: Option<String>,
191    /// Container dependencies parsed from `dependsOn[]`. Each entry pairs
192    /// the target container name with the condition that must be observed
193    /// before this container is launched: `START` (target exists/running),
194    /// `COMPLETE` (target exited, any code), `SUCCESS` (target exited with
195    /// code 0), or `HEALTHY` (target's docker `Health.Status` is `healthy`).
196    /// Used both to topologically order the launch loop and to gate each
197    /// `docker run` on the upstream condition.
198    pub(crate) depends_on: Vec<DependsOn>,
199    /// Parsed `healthCheck` from the task definition. Translated into
200    /// docker `--health-*` flags on `docker run` so the container's
201    /// health is observable via `docker inspect .State.Health.Status`.
202    /// `None` when the task definition doesn't declare a healthCheck;
203    /// the container's `healthStatus` then stays `UNKNOWN` (matching ECS
204    /// behaviour for tasks without a health probe).
205    pub(crate) health_check: Option<HealthCheckSpec>,
206    /// Volume mounts resolved by joining the container definition's
207    /// `mountPoints[]` with the task definition's `volumes[]`. Each entry
208    /// renders as one `-v` flag on the `docker run` invocation. Empty when
209    /// the container has no mount points or no matching volume entries.
210    pub(crate) volume_mounts: Vec<VolumeMount>,
211    /// Parsed `ulimits` from the container definition. Each entry becomes
212    /// `--ulimit <name>=<soft>:<hard>` on `docker run`.
213    pub(crate) ulimits: Vec<Ulimit>,
214    /// Parsed `linuxParameters` from the container definition. Emits
215    /// `--cap-add`, `--cap-drop`, `--device`, `--init`, `--shm-size`,
216    /// `--sysctl`, `--tmpfs`, `--privileged`, and `--read-only` flags.
217    pub(crate) linux_parameters: Option<LinuxParameters>,
218    /// `stopTimeout` in seconds. Becomes `--stop-timeout <N>` on `docker run`.
219    pub(crate) stop_timeout: Option<u32>,
220    /// `user` from the container definition. Becomes `--user <value>`.
221    pub(crate) user: Option<String>,
222    /// `workingDirectory` from the container definition. Becomes `--workdir`.
223    pub(crate) working_directory: Option<String>,
224    /// `tty` from the container definition. Emits `--tty` when true.
225    pub(crate) tty: bool,
226    /// `interactive` from the container definition. Emits `--interactive` when true.
227    pub(crate) interactive: bool,
228    /// `readonlyRootFilesystem` from the container definition. Emits `--read-only` when true.
229    pub(crate) readonly_rootfs: bool,
230}
231
232/// One parsed `dependsOn[]` entry on a container. Pairs the upstream
233/// container name with the condition that must hold before the dependent
234/// container is launched. AWS spells the conditions `START`, `COMPLETE`,
235/// `SUCCESS`, `HEALTHY` and treats anything else as an error at register
236/// time — we mirror that in [`parse_depends_on`].
237#[derive(Clone, Debug, PartialEq, Eq)]
238pub(crate) struct DependsOn {
239    pub container_name: String,
240    pub condition: DependsOnCondition,
241}
242
243/// `dependsOn[].condition` from the task definition. The variants map
244/// 1:1 to AWS's documented values; the launch loop polls docker for the
245/// matching predicate before starting the dependent container.
246#[derive(Clone, Copy, Debug, PartialEq, Eq)]
247pub(crate) enum DependsOnCondition {
248    /// Upstream container has been started (docker container exists and
249    /// is either running or has exited).
250    Start,
251    /// Upstream container has exited (any exit code).
252    Complete,
253    /// Upstream container has exited with code 0.
254    Success,
255    /// Upstream container's `Health.Status` is `healthy`. When the
256    /// upstream has no healthCheck configured, AWS treats this as
257    /// immediately satisfied — we do the same.
258    Healthy,
259}
260
261impl DependsOnCondition {
262    /// Parse the AWS-spelled condition string. Returns `None` for
263    /// unrecognised values so callers can surface a `ClientException`
264    /// at register time.
265    pub fn parse(raw: &str) -> Option<Self> {
266        match raw {
267            "START" => Some(Self::Start),
268            "COMPLETE" => Some(Self::Complete),
269            "SUCCESS" => Some(Self::Success),
270            "HEALTHY" => Some(Self::Healthy),
271            _ => None,
272        }
273    }
274
275    /// AWS-spelled string for this condition. Used in user-facing error
276    /// messages so timeout/dependency-failed reasons echo back the same
277    /// value the user wrote in their task definition.
278    pub fn as_aws_str(self) -> &'static str {
279        match self {
280            Self::Start => "START",
281            Self::Complete => "COMPLETE",
282            Self::Success => "SUCCESS",
283            Self::Healthy => "HEALTHY",
284        }
285    }
286}
287
288/// Container health check parsed from the ECS task definition. Each
289/// field maps 1:1 to a docker `--health-*` flag on `docker run`. AWS
290/// defaults: interval=30s, timeout=5s, retries=3, startPeriod=0s — we
291/// preserve those defaults at parse time so the argv builder always
292/// has concrete values to emit.
293#[derive(Clone, Debug, PartialEq, Eq)]
294pub(crate) struct HealthCheckSpec {
295    /// `command[]` from the task definition. The first element selects
296    /// the docker syntax: `CMD-SHELL` => `--health-cmd <rest joined by space>`,
297    /// `CMD` => `--health-cmd <rest joined by space>` (still routed to
298    /// `--health-cmd` because docker doesn't accept argv-form here),
299    /// `NONE` => no flag emitted (caller skips emitting healthcheck).
300    pub command: Vec<String>,
301    pub interval_seconds: u32,
302    pub timeout_seconds: u32,
303    pub retries: u32,
304    pub start_period_seconds: u32,
305}
306
307/// One entry in a container's `portMappings`. Mirrors the AWS shape so
308/// [`build_run_argv`] and the `networkBindings` response can share the
309/// same parsed representation.
310#[derive(Clone, Debug, PartialEq, Eq)]
311pub(crate) struct PortMapping {
312    pub container_port: u16,
313    /// `0` (or unset in the source JSON) means "use the same value as
314    /// containerPort" — host-mode default per AWS docs.
315    pub host_port: u16,
316    /// Lower-case `tcp` / `udp`. Defaults to `tcp` when omitted.
317    pub protocol: String,
318}
319
320/// One resolved `mountPoints` entry on a container plan. Computed at
321/// launch by joining the container definition's `mountPoints` against the
322/// task definition's `volumes` array. Each entry becomes a single
323/// `-v <source>:<containerPath>[:ro]` flag on `docker run`.
324///
325/// Source resolution by volume kind:
326/// - **host bind** (`volume.host.sourcePath` set): bind the host path
327///   into the container at `containerPath`.
328/// - **EFS** (`efsVolumeConfiguration` set): bind a host-side stub
329///   directory at `/tmp/fakecloud/efs/<filesystemId>[/<rootDirectory>]`
330///   so multiple tasks targeting the same filesystem id can share state
331///   the way real EFS would. The stub directory is created with
332///   `mkdir -p` ahead of `docker run`.
333/// - **FSx for Windows** (`fsxWindowsFileServerVolumeConfiguration` set):
334///   stub directory at `/tmp/fakecloud/fsx/<filesystemId>/<rootDirectory>`
335///   created the same way as EFS.
336/// - **Docker named volume** (`dockerVolumeConfiguration` set): pass the
337///   volume name through to docker as a named volume reference.
338/// - **Bare volume** (only `name` set, no host config): treated as an
339///   anonymous docker volume for that task — matches AWS's "Docker
340///   volumes" default scope.
341#[derive(Clone, Debug, PartialEq, Eq)]
342pub(crate) struct VolumeMount {
343    /// Left side of `-v`: a host path, a docker named volume, or a stub
344    /// directory under `/tmp/fakecloud/{efs,fsx}/...` for shared FS
345    /// emulation.
346    pub source: String,
347    /// Container-side path, taken verbatim from the container
348    /// definition's `mountPoints[].containerPath`.
349    pub container_path: String,
350    /// `mountPoints[].readOnly` honoured: when true, append `:ro` to the
351    /// `-v` flag so the bind/named volume is read-only inside the
352    /// container. Defaults to false (read-write) when omitted.
353    pub read_only: bool,
354    /// Whether `source` is a task-scoped docker named volume that should be
355    /// removed when the task stops, matching AWS: anonymous "Docker volumes"
356    /// and `dockerVolumeConfiguration` with `scope=task` are deleted on task
357    /// teardown. `false` for host bind mounts, EFS/FSx shared stubs, and
358    /// `scope=shared` docker volumes, which persist independently of the task.
359    pub cleanup_on_stop: bool,
360}
361
362/// One `ulimits` entry. Becomes `--ulimit <name>=<soft>:<hard>`.
363#[derive(Clone, Debug, PartialEq, Eq)]
364pub(crate) struct Ulimit {
365    pub name: String,
366    pub soft_limit: i32,
367    pub hard_limit: i32,
368}
369
370/// One `linuxParameters.devices` entry. Becomes `--device <hostPath>:<containerPath><permissions>`.
371#[derive(Clone, Debug, PartialEq, Eq)]
372pub(crate) struct Device {
373    pub host_path: String,
374    pub container_path: String,
375    pub permissions: String,
376}
377
378/// One `linuxParameters.sysctl` entry. Becomes `--sysctl <name>=<value>`.
379#[derive(Clone, Debug, PartialEq, Eq)]
380pub(crate) struct Sysctl {
381    pub name: String,
382    pub value: String,
383}
384
385/// Parsed `linuxParameters` from the container definition.
386#[derive(Clone, Debug, PartialEq, Eq, Default)]
387pub(crate) struct LinuxParameters {
388    pub capabilities_add: Vec<String>,
389    pub capabilities_drop: Vec<String>,
390    pub devices: Vec<Device>,
391    pub init_process_enabled: bool,
392    pub shared_memory_size: Option<i32>,
393    pub sysctls: Vec<Sysctl>,
394    pub tmpfs: Vec<Tmpfs>,
395    pub privileged: bool,
396}
397
398/// One `linuxParameters.tmpfs` entry. Becomes `--tmpfs <containerPath>:size=<size>M<,options>*`.
399#[derive(Clone, Debug, PartialEq, Eq)]
400pub(crate) struct Tmpfs {
401    pub container_path: String,
402    pub size: i32,
403    pub mount_options: Vec<String>,
404}
405
406#[derive(Clone, Debug)]
407struct ResolvedContainerPlan {
408    plan: ContainerPlan,
409    env: Vec<(String, String)>,
410}
411
412/// Result of waiting for a task's lifetime-determining container.
413#[derive(Clone, Debug)]
414struct TaskExitOutcome {
415    /// Index into the started-containers list of the container whose exit
416    /// closed out the task. `None` only in degenerate cases — kept as
417    /// `Option` so `final_containers` indexing stays explicit.
418    exited_index: Option<usize>,
419    exit_code: i64,
420    stop_code: &'static str,
421}
422
423/// Per-container record persisted on the task. Mirrors the AWS Container
424/// shape but tracks the docker-side container id alongside ECS metadata.
425#[derive(Clone, Debug)]
426pub(crate) struct RunningContainer {
427    pub(crate) name: String,
428    pub(crate) container_id: String,
429    pub(crate) essential: bool,
430    pub(crate) exit_code: Option<i64>,
431    /// Resolved `networkBindings` for DescribeTasks. Computed from the
432    /// task definition's `portMappings` at launch and surfaced verbatim
433    /// in the per-container response.
434    pub(crate) network_bindings: Vec<serde_json::Value>,
435    /// Image digest captured from `docker inspect` after pull. AWS
436    /// surfaces this on the Container response so callers can pin which
437    /// exact image revision a task is running. `None` when the inspect
438    /// failed or the CLI didn't expose `RepoDigests`.
439    pub(crate) image_digest: Option<String>,
440}
441
442/// Pure decision: does the current set of containers warrant stopping
443/// the task? Returns true when any essential container has exited, or
444/// when every container has exited (regardless of essential). Mirrors
445/// AWS ECS task lifetime semantics.
446pub(crate) fn task_should_stop(containers: &[RunningContainer]) -> bool {
447    if containers.is_empty() {
448        return true;
449    }
450    let any_essential_exited = containers
451        .iter()
452        .any(|c| c.essential && c.exit_code.is_some());
453    if any_essential_exited {
454        return true;
455    }
456    containers.iter().all(|c| c.exit_code.is_some())
457}
458
459/// True if the task's `desired_status` in state is `STOPPED` — i.e. a
460/// StopTask / scale-down / DeleteService raced the launch and asked for this
461/// task to be killed. A missing task (deleted from state mid-launch) also
462/// counts as "stop": there's nothing left to keep running for.
463pub(crate) fn task_desired_stopped(
464    state: &SharedEcsState,
465    account_id: &str,
466    task_id: &str,
467) -> bool {
468    let accounts = state.read();
469    match accounts.get(account_id).and_then(|s| s.tasks.get(task_id)) {
470        Some(task) => task.desired_status == "STOPPED",
471        None => true,
472    }
473}
474
475fn build_container_plans(
476    state: &SharedEcsState,
477    account_id: &str,
478    task_id: &str,
479    _server_port: u16,
480) -> Result<Vec<ContainerPlan>, RuntimeError> {
481    let accounts = state.read();
482    let s = accounts
483        .get(account_id)
484        .ok_or_else(|| RuntimeError::ContainerStart("account missing".into()))?;
485    let task = s
486        .tasks
487        .get(task_id)
488        .ok_or_else(|| RuntimeError::ContainerStart("task missing".into()))?;
489    if task.containers.is_empty() {
490        return Err(RuntimeError::ContainerStart(
491            "task has no containers".into(),
492        ));
493    }
494    let has_task_role = task.task_role_arn.is_some();
495    let task_def = s
496        .task_definitions
497        .get(&task.family)
498        .and_then(|revs| revs.get(&task.revision));
499    let network_mode = task_def.and_then(|td| td.network_mode.clone());
500    // Index `volumes[]` by name so each container's `mountPoints[]` can
501    // resolve its volume in O(1). Real ECS rejects mountPoints that
502    // reference an undeclared volume at register time; we don't yet, so
503    // unresolved names just produce zero mounts at launch.
504    let volumes_by_name: std::collections::HashMap<String, &serde_json::Value> = task_def
505        .map(|td| {
506            td.volumes
507                .iter()
508                .filter_map(|v| {
509                    let name = v.get("name").and_then(|n| n.as_str())?;
510                    Some((name.to_string(), v))
511                })
512                .collect()
513        })
514        .unwrap_or_default();
515    let mut plans = Vec::with_capacity(task.containers.len());
516    for container in &task.containers {
517        let def = find_container_definition(s, &task.family, task.revision, &container.name);
518        let secrets_refs = def
519            .as_ref()
520            .and_then(|d| d.get("secrets").and_then(|v| v.as_array()).cloned())
521            .map(|arr| {
522                arr.iter()
523                    .filter_map(|e| {
524                        let name = e.get("name").and_then(|v| v.as_str())?.to_string();
525                        let value_from = e.get("valueFrom").and_then(|v| v.as_str())?.to_string();
526                        Some((name, value_from))
527                    })
528                    .collect::<Vec<_>>()
529            })
530            .unwrap_or_default();
531        let str_array = |key: &str| -> Vec<String> {
532            def.as_ref()
533                .and_then(|d| d.get(key).and_then(|v| v.as_array()).cloned())
534                .map(|arr| {
535                    arr.iter()
536                        .filter_map(|v| v.as_str().map(String::from))
537                        .collect::<Vec<_>>()
538                })
539                .unwrap_or_default()
540        };
541        let env = def
542            .as_ref()
543            .and_then(|d| d.get("environment").and_then(|v| v.as_array()).cloned())
544            .map(|arr| {
545                arr.iter()
546                    .filter_map(|e| {
547                        let k = e.get("name").and_then(|v| v.as_str())?;
548                        let v = e.get("value").and_then(|v| v.as_str()).unwrap_or("");
549                        Some((k.to_string(), v.to_string()))
550                    })
551                    .collect::<Vec<_>>()
552            })
553            .unwrap_or_default();
554        let port_mappings = def
555            .as_ref()
556            .and_then(|d| d.get("portMappings").and_then(|v| v.as_array()).cloned())
557            .map(|arr| {
558                arr.iter()
559                    .filter_map(parse_port_mapping)
560                    .collect::<Vec<_>>()
561            })
562            .unwrap_or_default();
563        let depends_on = def
564            .as_ref()
565            .and_then(|d| d.get("dependsOn").and_then(|v| v.as_array()).cloned())
566            .map(|arr| {
567                arr.iter()
568                    .filter_map(parse_depends_on_entry)
569                    .collect::<Vec<_>>()
570            })
571            .unwrap_or_default();
572        let health_check = def
573            .as_ref()
574            .and_then(|d| d.get("healthCheck"))
575            .and_then(parse_health_check);
576        let volume_mounts = def
577            .as_ref()
578            .and_then(|d| d.get("mountPoints").and_then(|v| v.as_array()).cloned())
579            .map(|arr| {
580                arr.iter()
581                    .filter_map(|mp| resolve_mount_point(mp, &volumes_by_name))
582                    .collect::<Vec<_>>()
583            })
584            .unwrap_or_default();
585        let ulimits = def
586            .as_ref()
587            .and_then(|d| d.get("ulimits").and_then(|v| v.as_array()).cloned())
588            .map(|arr| arr.iter().filter_map(parse_ulimit).collect::<Vec<_>>())
589            .unwrap_or_default();
590        let linux_parameters = def
591            .as_ref()
592            .and_then(|d| d.get("linuxParameters"))
593            .and_then(parse_linux_parameters);
594        let stop_timeout = def.as_ref().and_then(|d| {
595            d.get("stopTimeout")
596                .and_then(|v| v.as_u64())
597                .map(|n| n as u32)
598        });
599        let user = def
600            .as_ref()
601            .and_then(|d| d.get("user").and_then(|v| v.as_str()).map(String::from));
602        let working_directory = def.as_ref().and_then(|d| {
603            d.get("workingDirectory")
604                .and_then(|v| v.as_str())
605                .map(String::from)
606        });
607        let tty = def
608            .as_ref()
609            .and_then(|d| d.get("tty").and_then(|v| v.as_bool()))
610            .unwrap_or(false);
611        let interactive = def
612            .as_ref()
613            .and_then(|d| d.get("interactive").and_then(|v| v.as_bool()))
614            .unwrap_or(false);
615        let readonly_rootfs = def
616            .as_ref()
617            .and_then(|d| d.get("readonlyRootFilesystem").and_then(|v| v.as_bool()))
618            .unwrap_or(false);
619        plans.push(ContainerPlan {
620            container_name: container.name.clone(),
621            image: container.image.clone(),
622            env,
623            entry_point: str_array("entryPoint"),
624            command: str_array("command"),
625            secrets_refs,
626            essential: container.essential,
627            has_task_role,
628            port_mappings,
629            network_mode: network_mode.clone(),
630            depends_on,
631            health_check,
632            volume_mounts,
633            ulimits,
634            linux_parameters,
635            stop_timeout,
636            user,
637            working_directory,
638            tty,
639            interactive,
640            readonly_rootfs,
641        });
642    }
643    let plans = topo_sort_plans(plans);
644    Ok(plans)
645}
646
647/// Resolve one `mountPoints[]` entry against the indexed task-definition
648/// volumes. Returns `None` when:
649/// - the entry has no `containerPath` or `sourceVolume`,
650/// - the named volume isn't declared on the task definition.
651///
652/// Returns `Some(VolumeMount)` for every supported volume kind:
653/// host bind, EFS, FSx, named docker volume, anonymous docker volume.
654fn resolve_mount_point(
655    mount_point: &serde_json::Value,
656    volumes_by_name: &std::collections::HashMap<String, &serde_json::Value>,
657) -> Option<VolumeMount> {
658    let container_path = mount_point
659        .get("containerPath")
660        .and_then(|v| v.as_str())?
661        .to_string();
662    let source_volume = mount_point.get("sourceVolume").and_then(|v| v.as_str())?;
663    let read_only = mount_point
664        .get("readOnly")
665        .and_then(|v| v.as_bool())
666        .unwrap_or(false);
667    let volume = volumes_by_name.get(source_volume)?;
668    let source = resolve_volume_source(source_volume, volume)?;
669    let cleanup_on_stop = volume_is_task_scoped(volume);
670    Some(VolumeMount {
671        source,
672        container_path,
673        read_only,
674        cleanup_on_stop,
675    })
676}
677
678/// Whether a task-definition `volumes[]` entry is a task-scoped docker named
679/// volume that AWS deletes when the task stops. Mirrors the kind matching in
680/// [`resolve_volume_source`]:
681/// - host bind with a non-empty `sourcePath` -> persists on the host: `false`.
682/// - EFS / FSx -> shared filesystem, persists: `false`.
683/// - `dockerVolumeConfiguration` -> task-scoped unless `scope=shared`.
684/// - bare entry (or host with empty `sourcePath`) -> anonymous task-scoped
685///   docker volume: `true`.
686fn volume_is_task_scoped(volume: &serde_json::Value) -> bool {
687    if let Some(host) = volume.get("host") {
688        if let Some(path) = host.get("sourcePath").and_then(|v| v.as_str()) {
689            if !path.is_empty() {
690                return false;
691            }
692        }
693    }
694    if volume.get("efsVolumeConfiguration").is_some()
695        || volume
696            .get("fsxWindowsFileServerVolumeConfiguration")
697            .is_some()
698    {
699        return false;
700    }
701    if let Some(docker) = volume.get("dockerVolumeConfiguration") {
702        // Default scope for an ECS docker volume is `task`; only `shared`
703        // volumes outlive the task.
704        let scope = docker
705            .get("scope")
706            .and_then(|v| v.as_str())
707            .unwrap_or("task");
708        return scope != "shared";
709    }
710    // Bare volume entry (or host with empty sourcePath): anonymous task volume.
711    true
712}
713
714/// Map a single task-definition `volumes[]` entry to the source side of a
715/// `docker run -v` flag. The matching here mirrors the AWS volume kinds:
716///
717/// 1. `host.sourcePath` -> use that path directly (bind mount).
718/// 2. `efsVolumeConfiguration.fileSystemId` -> stub directory under
719///    `/tmp/fakecloud/efs/<filesystemId>[/<rootDirectory>]`. Created with
720///    `mkdir -p` so different tasks targeting the same filesystem id
721///    share the same host directory, matching real EFS's "many tasks,
722///    one filesystem" semantics.
723/// 3. `fsxWindowsFileServerVolumeConfiguration.fileSystemId` -> stub
724///    directory under `/tmp/fakecloud/fsx/<filesystemId>/<rootDirectory>`.
725/// 4. `dockerVolumeConfiguration` -> the volume `name` itself (named
726///    docker volume; docker creates it on first reference).
727/// 5. Bare entry (only `name`) -> the volume `name` as an anonymous
728///    docker volume reference, matching AWS's "Docker volumes" default.
729///
730/// Returns `None` when the configuration is malformed (e.g. EFS without
731/// a fileSystemId).
732fn resolve_volume_source(name: &str, volume: &serde_json::Value) -> Option<String> {
733    if let Some(host) = volume.get("host") {
734        if let Some(path) = host.get("sourcePath").and_then(|v| v.as_str()) {
735            // Empty sourcePath means "anonymous host volume" — fall
736            // through to the named-volume default below.
737            if !path.is_empty() {
738                ensure_dir_exists(path);
739                return Some(path.to_string());
740            }
741        }
742    }
743    if let Some(efs) = volume.get("efsVolumeConfiguration") {
744        let fs_id = efs.get("fileSystemId").and_then(|v| v.as_str())?;
745        let root = efs
746            .get("rootDirectory")
747            .and_then(|v| v.as_str())
748            .unwrap_or("/");
749        return Some(shared_volume_name("efs", fs_id, root));
750    }
751    if let Some(fsx) = volume.get("fsxWindowsFileServerVolumeConfiguration") {
752        let fs_id = fsx.get("fileSystemId").and_then(|v| v.as_str())?;
753        let root = fsx
754            .get("rootDirectory")
755            .and_then(|v| v.as_str())
756            .unwrap_or("/");
757        return Some(shared_volume_name("fsx", fs_id, root));
758    }
759    if volume.get("dockerVolumeConfiguration").is_some() {
760        // Named docker volume — docker auto-creates it on first
761        // reference. Pass the volume name through verbatim.
762        return Some(name.to_string());
763    }
764    // Bare volume entry: anonymous docker volume keyed by name.
765    Some(name.to_string())
766}
767
768/// Compose the docker **named-volume** name for an EFS/FSx volume. A
769/// single shared volume per filesystem id when `rootDirectory` is unset
770/// or `/` (the EFS default mount target); otherwise the rootDirectory is
771/// folded into the name so distinct mount targets within one filesystem
772/// stay isolated. A docker named volume lives on the daemon rather than a
773/// host path, so tasks share state correctly *and* it works when
774/// fakecloud itself runs in a container (`FAKECLOUD_IN_CONTAINER=1`),
775/// where a host-path stub created inside fakecloud's own filesystem would
776/// resolve to an empty dir against the host daemon (issue #1539, bug 0.6).
777/// The segments are sanitized to docker's volume-name charset.
778fn shared_volume_name(kind: &str, fs_id: &str, root: &str) -> String {
779    let trimmed = root.trim_start_matches('/').trim_end_matches('/');
780    let fs_id = sanitize_volume_segment(fs_id);
781    if trimmed.is_empty() {
782        format!("fakecloud-{kind}-{fs_id}")
783    } else {
784        format!(
785            "fakecloud-{kind}-{fs_id}-{}",
786            sanitize_volume_segment(trimmed)
787        )
788    }
789}
790
791/// Map an arbitrary string to docker's volume-name charset by replacing
792/// every character outside `[A-Za-z0-9_.-]` with `-`.
793fn sanitize_volume_segment(s: &str) -> String {
794    s.chars()
795        .map(|c| {
796            if c.is_ascii_alphanumeric() || matches!(c, '_' | '.' | '-') {
797                c
798            } else {
799                '-'
800            }
801        })
802        .collect()
803}
804
805/// Best-effort `mkdir -p` so the EFS/FSx stub path exists before the
806/// first task tries to bind-mount it. Failures are ignored — docker
807/// will surface a clear error on the run, and unit tests don't have a
808/// writable `/tmp/fakecloud` in every sandbox.
809fn ensure_dir_exists(path: &str) {
810    let _ = std::fs::create_dir_all(path);
811}
812
813/// Parse one `dependsOn[]` entry. Returns `None` for malformed entries
814/// (missing `containerName`, unrecognised `condition`) so the caller
815/// can drop them silently from the launch plan — register-time
816/// validation already rejects bad values; this is a defensive fallback.
817fn parse_depends_on_entry(value: &serde_json::Value) -> Option<DependsOn> {
818    let container_name = value
819        .get("containerName")
820        .and_then(|v| v.as_str())?
821        .to_string();
822    let raw_condition = value.get("condition").and_then(|v| v.as_str())?;
823    let condition = DependsOnCondition::parse(raw_condition)?;
824    Some(DependsOn {
825        container_name,
826        condition,
827    })
828}
829
830/// Topologically sort container plans so `dependsOn` dependencies start
831/// before their dependants. Implements Kahn's algorithm with stable order:
832/// when multiple plans are ready, we keep their original declaration
833/// index, so a task without any dependsOn launches in the same order the
834/// user wrote in the task definition. Cycles fall through with the
835/// remaining plans appended in original order — the runtime will still
836/// launch every container; it just can't guarantee dependency ordering
837/// in that degenerate case. Cycles are rejected at register time
838/// (RegisterTaskDefinition -> validate_depends_on_acyclic), so reaching
839/// that branch from a real launch path means a bug elsewhere.
840fn topo_sort_plans(plans: Vec<ContainerPlan>) -> Vec<ContainerPlan> {
841    use std::collections::{HashMap, HashSet};
842    let names: HashSet<String> = plans.iter().map(|p| p.container_name.clone()).collect();
843    let index: HashMap<String, usize> = plans
844        .iter()
845        .enumerate()
846        .map(|(i, p)| (p.container_name.clone(), i))
847        .collect();
848    // in_degree[i] = number of unresolved dependencies for plan i. We
849    // ignore depends_on entries that name a container not in the task
850    // (real ECS rejects those at register time; our register path doesn't
851    // yet, so be defensive here).
852    let mut in_degree: Vec<usize> = plans
853        .iter()
854        .map(|p| {
855            p.depends_on
856                .iter()
857                .filter(|d| names.contains(&d.container_name))
858                .count()
859        })
860        .collect();
861    // dependants[i] = indices of plans that depend on plan i.
862    let mut dependants: Vec<Vec<usize>> = vec![Vec::new(); plans.len()];
863    for (i, p) in plans.iter().enumerate() {
864        for d in &p.depends_on {
865            if let Some(&di) = index.get(&d.container_name) {
866                dependants[di].push(i);
867            }
868        }
869    }
870    let mut ordered: Vec<ContainerPlan> = Vec::with_capacity(plans.len());
871    let mut emitted: Vec<bool> = vec![false; plans.len()];
872    loop {
873        // Pick the lowest-index plan whose in_degree is 0 to keep stable
874        // order across runs.
875        let next = (0..plans.len()).find(|&i| !emitted[i] && in_degree[i] == 0);
876        match next {
877            Some(i) => {
878                emitted[i] = true;
879                ordered.push(plans[i].clone());
880                for &di in &dependants[i] {
881                    if in_degree[di] > 0 {
882                        in_degree[di] -= 1;
883                    }
884                }
885            }
886            None => break,
887        }
888    }
889    // Cycle: append anything left in original order so we don't drop plans.
890    for (i, p) in plans.into_iter().enumerate() {
891        if !emitted[i] {
892            ordered.push(p);
893        }
894    }
895    ordered
896}
897
898/// Validate that `containerDefinitions[].dependsOn[]` graph is acyclic.
899/// Real ECS rejects cyclic dependencies at RegisterTaskDefinition time
900/// with a `ClientException`; we mirror that. Returns the offending pair
901/// of container names so the caller can produce a useful error.
902///
903/// Operates directly on the raw JSON definitions (rather than parsed
904/// `ContainerPlan`s) so register-time validation doesn't have to first
905/// build a full plan from a not-yet-stored task definition.
906pub(crate) fn find_depends_on_cycle(
907    container_definitions: &[serde_json::Value],
908) -> Option<(String, String)> {
909    use std::collections::HashMap;
910
911    let names: Vec<String> = container_definitions
912        .iter()
913        .filter_map(|c| c.get("name").and_then(|n| n.as_str()).map(String::from))
914        .collect();
915    let index: HashMap<&str, usize> = names
916        .iter()
917        .enumerate()
918        .map(|(i, n)| (n.as_str(), i))
919        .collect();
920
921    let mut adj: Vec<Vec<usize>> = vec![Vec::new(); names.len()];
922    for (i, cd) in container_definitions.iter().enumerate() {
923        if i >= names.len() {
924            continue;
925        }
926        let Some(deps) = cd.get("dependsOn").and_then(|v| v.as_array()) else {
927            continue;
928        };
929        for d in deps {
930            let Some(target) = d.get("containerName").and_then(|v| v.as_str()) else {
931                continue;
932            };
933            if let Some(&j) = index.get(target) {
934                // Edge: i depends on j -> for cycle DFS we walk from i to j.
935                adj[i].push(j);
936            }
937        }
938    }
939
940    // DFS with three-colour marking (white=0, gray=1, black=2). When we
941    // hit a gray neighbour we've closed a cycle; report the back-edge as
942    // the offending pair.
943    let mut state = vec![0u8; names.len()];
944    let mut stack: Vec<(usize, usize)> = Vec::new();
945    for start in 0..names.len() {
946        if state[start] != 0 {
947            continue;
948        }
949        stack.clear();
950        stack.push((start, 0));
951        state[start] = 1;
952        while let Some(&(node, next_edge)) = stack.last() {
953            if next_edge < adj[node].len() {
954                let nb = adj[node][next_edge];
955                stack.last_mut().unwrap().1 += 1;
956                match state[nb] {
957                    0 => {
958                        state[nb] = 1;
959                        stack.push((nb, 0));
960                    }
961                    1 => {
962                        return Some((names[node].clone(), names[nb].clone()));
963                    }
964                    _ => {}
965                }
966            } else {
967                state[node] = 2;
968                stack.pop();
969            }
970        }
971    }
972    None
973}
974
975/// Snapshot of the docker container state we care about for `dependsOn`
976/// gating: whether the container exists/started, whether it's exited,
977/// its exit code, and (when configured) its health status.
978#[derive(Debug, Clone)]
979struct InspectedState {
980    started: bool,
981    exited: bool,
982    exit_code: i64,
983    health: Option<String>,
984}
985
986/// One `docker inspect` call returning every field needed by
987/// [`condition_is_met`]. Returns `None` when the container doesn't exist
988/// yet or inspect fails — the caller will simply retry on the next poll.
989async fn inspect_container_state(cli: &str, container_id: &str) -> Option<InspectedState> {
990    // Compose all four fields into a single inspect format so the gate
991    // costs one process spawn per poll rather than four.
992    let format =
993        "{{.State.Status}}|{{.State.Running}}|{{.State.ExitCode}}|{{if .State.Health}}{{.State.Health.Status}}{{else}}<none>{{end}}";
994    let out = Command::new(cli)
995        .args(["inspect", "-f", format, container_id])
996        .output()
997        .await
998        .ok()?;
999    if !out.status.success() {
1000        return None;
1001    }
1002    let raw = String::from_utf8_lossy(&out.stdout).trim().to_string();
1003    let parts: Vec<&str> = raw.split('|').collect();
1004    if parts.len() < 4 {
1005        return None;
1006    }
1007    let status = parts[0];
1008    let running = parts[1] == "true";
1009    let exit_code: i64 = parts[2].parse().unwrap_or(-1);
1010    let health = match parts[3] {
1011        "<none>" | "" => None,
1012        other => Some(other.to_string()),
1013    };
1014    // `created` is the brief moment between docker creating the
1015    // container and the entrypoint running. Treat anything past
1016    // `created` as "started" for the START condition.
1017    let started = running || status == "exited" || status == "running" || status == "dead";
1018    let exited = status == "exited" || status == "dead";
1019    Some(InspectedState {
1020        started,
1021        exited,
1022        exit_code,
1023        health,
1024    })
1025}
1026
1027/// Decide whether the polled `state` satisfies a `dependsOn[].condition`.
1028/// Encapsulates the AWS semantics so the polling loop is purely
1029/// mechanical.
1030fn condition_is_met(condition: DependsOnCondition, state: &InspectedState) -> bool {
1031    match condition {
1032        DependsOnCondition::Start => state.started,
1033        DependsOnCondition::Complete => state.exited,
1034        DependsOnCondition::Success => state.exited && state.exit_code == 0,
1035        DependsOnCondition::Healthy => state.health.as_deref() == Some("healthy"),
1036    }
1037}
1038
1039/// Test-only re-export of [`parse_port_mapping`] so sibling test modules
1040/// can lock in the default-port / default-protocol behaviour without us
1041/// widening the visibility of the parser itself.
1042#[cfg(test)]
1043pub(crate) fn __test_parse_port_mapping(value: &serde_json::Value) -> Option<PortMapping> {
1044    parse_port_mapping(value)
1045}
1046
1047/// Parse a `healthCheck` block from a task definition's container
1048/// definition. Returns `None` for missing `command` or for a command
1049/// whose first token is `NONE` (the AWS-documented "disable healthcheck
1050/// inherited from image" sentinel — emit no flags rather than a `none`
1051/// healthcheck). Defaults follow AWS: 30s/5s/3/0s.
1052fn parse_health_check(value: &serde_json::Value) -> Option<HealthCheckSpec> {
1053    let cmd_arr = value.get("command")?.as_array()?;
1054    let command: Vec<String> = cmd_arr
1055        .iter()
1056        .filter_map(|v| v.as_str().map(String::from))
1057        .collect();
1058    if command.is_empty() {
1059        return None;
1060    }
1061    if command.first().map(|s| s.as_str()) == Some("NONE") {
1062        return None;
1063    }
1064    let read_u32 = |key: &str, default: u32| -> u32 {
1065        value
1066            .get(key)
1067            .and_then(|v| v.as_i64())
1068            .filter(|n| (0..=u32::MAX as i64).contains(n))
1069            .map(|n| n as u32)
1070            .unwrap_or(default)
1071    };
1072    Some(HealthCheckSpec {
1073        command,
1074        interval_seconds: read_u32("interval", 30),
1075        timeout_seconds: read_u32("timeout", 5),
1076        retries: read_u32("retries", 3),
1077        start_period_seconds: read_u32("startPeriod", 0),
1078    })
1079}
1080
1081/// Parse one `ulimits` entry from the container definition JSON.
1082fn parse_ulimit(value: &serde_json::Value) -> Option<Ulimit> {
1083    let name = value.get("name").and_then(|v| v.as_str())?;
1084    let soft = value
1085        .get("softLimit")
1086        .and_then(|v| v.as_i64())
1087        .filter(|n| *n >= 0)? as i32;
1088    let hard = value
1089        .get("hardLimit")
1090        .and_then(|v| v.as_i64())
1091        .filter(|n| *n >= 0)? as i32;
1092    Some(Ulimit {
1093        name: name.to_string(),
1094        soft_limit: soft,
1095        hard_limit: hard,
1096    })
1097}
1098
1099/// Parse `linuxParameters` from the container definition JSON.
1100fn parse_linux_parameters(value: &serde_json::Value) -> Option<LinuxParameters> {
1101    let mut lp = LinuxParameters::default();
1102    if let Some(arr) = value
1103        .get("capabilities")
1104        .and_then(|v| v.get("add"))
1105        .and_then(|v| v.as_array())
1106    {
1107        lp.capabilities_add = arr
1108            .iter()
1109            .filter_map(|v| v.as_str().map(String::from))
1110            .collect();
1111    }
1112    if let Some(arr) = value
1113        .get("capabilities")
1114        .and_then(|v| v.get("drop"))
1115        .and_then(|v| v.as_array())
1116    {
1117        lp.capabilities_drop = arr
1118            .iter()
1119            .filter_map(|v| v.as_str().map(String::from))
1120            .collect();
1121    }
1122    if let Some(arr) = value.get("devices").and_then(|v| v.as_array()) {
1123        lp.devices = arr.iter().filter_map(parse_device).collect();
1124    }
1125    lp.init_process_enabled = value
1126        .get("initProcessEnabled")
1127        .and_then(|v| v.as_bool())
1128        .unwrap_or(false);
1129    lp.shared_memory_size = value
1130        .get("sharedMemorySize")
1131        .and_then(|v| v.as_i64())
1132        .map(|n| n as i32);
1133    if let Some(arr) = value.get("sysctl").and_then(|v| v.as_array()) {
1134        lp.sysctls = arr.iter().filter_map(parse_sysctl).collect();
1135    }
1136    if let Some(arr) = value.get("tmpfs").and_then(|v| v.as_array()) {
1137        lp.tmpfs = arr.iter().filter_map(parse_tmpfs).collect();
1138    }
1139    lp.privileged = value
1140        .get("privileged")
1141        .and_then(|v| v.as_bool())
1142        .unwrap_or(false);
1143    Some(lp)
1144}
1145
1146fn parse_device(value: &serde_json::Value) -> Option<Device> {
1147    let host_path = value.get("hostPath").and_then(|v| v.as_str())?.to_string();
1148    let container_path = value
1149        .get("containerPath")
1150        .and_then(|v| v.as_str())?
1151        .to_string();
1152    let permissions = value
1153        .get("permissions")
1154        .and_then(|v| v.as_str())
1155        .unwrap_or("rwm")
1156        .to_string();
1157    Some(Device {
1158        host_path,
1159        container_path,
1160        permissions,
1161    })
1162}
1163
1164fn parse_sysctl(value: &serde_json::Value) -> Option<Sysctl> {
1165    let name = value.get("name").and_then(|v| v.as_str())?.to_string();
1166    let value_str = value.get("value").and_then(|v| v.as_str())?.to_string();
1167    Some(Sysctl {
1168        name,
1169        value: value_str,
1170    })
1171}
1172
1173fn parse_tmpfs(value: &serde_json::Value) -> Option<Tmpfs> {
1174    let container_path = value
1175        .get("containerPath")
1176        .and_then(|v| v.as_str())?
1177        .to_string();
1178    let size = value
1179        .get("size")
1180        .and_then(|v| v.as_i64())
1181        .filter(|n| *n > 0)? as i32;
1182    let mount_options = value
1183        .get("mountOptions")
1184        .and_then(|v| v.as_array())
1185        .map(|arr| {
1186            arr.iter()
1187                .filter_map(|v| v.as_str().map(String::from))
1188                .collect()
1189        })
1190        .unwrap_or_default();
1191    Some(Tmpfs {
1192        container_path,
1193        size,
1194        mount_options,
1195    })
1196}
1197
1198/// Render a [`HealthCheckSpec`] into the docker run flags that emulate
1199/// the equivalent ECS healthCheck. AWS's `command[0]` is a sentinel
1200/// (`CMD-SHELL`/`CMD`/`NONE`); docker's `--health-cmd` always takes a
1201/// single shell-string, so we collapse the remaining tokens with spaces
1202/// for either sentinel — matching how docker itself stringifies HEALTHCHECK
1203/// CMD ["a","b"] back to a shell string at inspect time.
1204pub(crate) fn render_health_flags(hc: &HealthCheckSpec) -> Vec<String> {
1205    if hc.command.len() < 2 {
1206        return Vec::new();
1207    }
1208    let cmd_kind = hc.command[0].as_str();
1209    if cmd_kind != "CMD" && cmd_kind != "CMD-SHELL" {
1210        return Vec::new();
1211    }
1212    let cmd_string = hc.command[1..].join(" ");
1213    vec![
1214        "--health-cmd".into(),
1215        cmd_string,
1216        format!("--health-interval={}s", hc.interval_seconds),
1217        format!("--health-timeout={}s", hc.timeout_seconds),
1218        format!("--health-retries={}", hc.retries),
1219        format!("--health-start-period={}s", hc.start_period_seconds),
1220    ]
1221}
1222
1223/// Test-only re-export of [`parse_health_check`] so unit tests in
1224/// sibling modules can lock in the AWS default-fill behaviour without
1225/// us widening the parser's visibility.
1226#[cfg(test)]
1227pub(crate) fn __test_parse_health_check(value: &serde_json::Value) -> Option<HealthCheckSpec> {
1228    parse_health_check(value)
1229}
1230
1231/// Map a docker `.State.Health.Status` value to the ECS `healthStatus`
1232/// shape. Docker emits `starting|healthy|unhealthy|none|""` (empty when
1233/// the image has no HEALTHCHECK and we didn't add one). ECS only knows
1234/// `HEALTHY|UNHEALTHY|UNKNOWN`, so anything that isn't a clean healthy/
1235/// unhealthy lands in `UNKNOWN`.
1236pub(crate) fn docker_health_to_ecs(raw: &str) -> &'static str {
1237    match raw.trim().to_ascii_lowercase().as_str() {
1238        "healthy" => "HEALTHY",
1239        "unhealthy" => "UNHEALTHY",
1240        _ => "UNKNOWN",
1241    }
1242}
1243
1244/// Parse a single `portMappings[]` entry. Returns `None` for entries
1245/// that are missing `containerPort` or have a value out of `u16` range.
1246/// Defaults: `hostPort` -> `containerPort`, `protocol` -> `tcp`.
1247fn parse_port_mapping(value: &serde_json::Value) -> Option<PortMapping> {
1248    let container_port = value
1249        .get("containerPort")
1250        .and_then(|v| v.as_i64())
1251        .filter(|n| (0..=u16::MAX as i64).contains(n))? as u16;
1252    let host_port_raw = value
1253        .get("hostPort")
1254        .and_then(|v| v.as_i64())
1255        .filter(|n| (0..=u16::MAX as i64).contains(n))
1256        .map(|n| n as u16)
1257        .unwrap_or(0);
1258    let host_port = if host_port_raw == 0 {
1259        container_port
1260    } else {
1261        host_port_raw
1262    };
1263    let protocol = value
1264        .get("protocol")
1265        .and_then(|v| v.as_str())
1266        .map(|s| s.to_ascii_lowercase())
1267        .unwrap_or_else(|| "tcp".to_string());
1268    Some(PortMapping {
1269        container_port,
1270        host_port,
1271        protocol,
1272    })
1273}
1274
1275/// The `fakecloud-instance=fakecloud-<pid>` ownership label value, matching
1276/// exactly how RDS/ElastiCache/Lambda/EC2(Docker) construct it. The shared
1277/// startup reaper lists containers carrying this label, parses the owning
1278/// PID, and removes any whose owner is no longer alive. Returns the full
1279/// `key=value` string ready to follow a `--label` flag.
1280pub(crate) fn fakecloud_instance_label() -> String {
1281    format!("fakecloud-instance=fakecloud-{}", std::process::id())
1282}
1283
1284/// Build the docker `run` argv for a single container plan. Pure so unit
1285/// tests can assert on flag ordering / `--publish` translation without
1286/// shelling out. The returned vector is everything *after* the binary
1287/// name (i.e. starts with `run`, ends with the user-supplied command
1288/// args).
1289pub(crate) fn build_run_argv(
1290    plan: &ContainerPlan,
1291    env: &[(String, String)],
1292    task_id: &str,
1293    host_alias: &str,
1294    add_host_arg: Option<&str>,
1295    run_image: &str,
1296    awsvpc_network_ready: bool,
1297) -> Vec<String> {
1298    let mut argv: Vec<String> = Vec::new();
1299    argv.push("run".into());
1300    argv.push("-d".into());
1301    argv.push("--name".into());
1302    argv.push(format!("{}-{}", task_id, plan.container_name));
1303    argv.push("--label".into());
1304    argv.push(format!("fakecloud-ecs-task={}", task_id));
1305    argv.push("--label".into());
1306    argv.push(format!("fakecloud-ecs-container={}", plan.container_name));
1307    // Ownership label shared with RDS/ElastiCache/Lambda/EC2(Docker). The
1308    // startup reaper (`fakecloud-server::reaper`) filters strictly on
1309    // `label=fakecloud-instance` and parses the owning PID out of the value
1310    // (`fakecloud-<pid>`). Without this, ECS task containers (and their
1311    // host-port publishes / awsvpc networks) leak unreapably after an
1312    // ungraceful restart. See fakecloud_instance_label().
1313    argv.push("--label".into());
1314    argv.push(fakecloud_instance_label());
1315    // Inject `--add-host host.docker.internal:<ip>` only for docker;
1316    // podman provides `host.containers.internal` natively and rejects
1317    // the host-gateway mapping (issue #1539).
1318    if let Some(arg) = add_host_arg {
1319        argv.push("--add-host".into());
1320        argv.push(arg.to_string());
1321    }
1322    let use_awsvpc_network = plan.network_mode.as_deref() == Some("awsvpc") && awsvpc_network_ready;
1323    if use_awsvpc_network {
1324        argv.push("--network".into());
1325        argv.push(format!("fakecloud-ecs-{}", task_id));
1326    }
1327    // `awsvpc` puts the container on a per-task ENI; emulating that on a
1328    // local docker host means *not* publishing to the host port table.
1329    // Bridge / host / default network modes still get `--publish`. If
1330    // the awsvpc per-task network creation failed and we fell back to
1331    // bridge, we DO want to publish so the container is reachable.
1332    let publish_ports = !use_awsvpc_network;
1333    if publish_ports {
1334        for pm in &plan.port_mappings {
1335            argv.push("--publish".into());
1336            argv.push(format!(
1337                "{}:{}/{}",
1338                pm.container_port, pm.host_port, pm.protocol
1339            ));
1340        }
1341    }
1342    if let Some(ref hc) = plan.health_check {
1343        argv.extend(render_health_flags(hc));
1344    }
1345    let http_alias_prefix = format!("http://{host_alias}:");
1346    let https_alias_prefix = format!("https://{host_alias}:");
1347    for (k, v) in env {
1348        let transformed = v
1349            .replace("http://127.0.0.1:", http_alias_prefix.as_str())
1350            .replace("https://127.0.0.1:", https_alias_prefix.as_str())
1351            .replace("http://localhost:", http_alias_prefix.as_str())
1352            .replace("https://localhost:", https_alias_prefix.as_str());
1353        argv.push("-e".into());
1354        argv.push(format!("{}={}", k, transformed));
1355    }
1356    // Volume mounts: one `-v` flag per mountPoints entry, with the
1357    // source resolved from the task definition's `volumes[]`. EFS and
1358    // FSx stubs were materialised on the host (mkdir -p) before this
1359    // function returns, so docker can bind them straight in.
1360    for vm in &plan.volume_mounts {
1361        argv.push("-v".into());
1362        let suffix = if vm.read_only { ":ro" } else { "" };
1363        argv.push(format!("{}:{}{}", vm.source, vm.container_path, suffix));
1364    }
1365    for ul in &plan.ulimits {
1366        argv.push("--ulimit".into());
1367        argv.push(format!("{}={}:{}", ul.name, ul.soft_limit, ul.hard_limit));
1368    }
1369    if let Some(ref lp) = plan.linux_parameters {
1370        for cap in &lp.capabilities_add {
1371            argv.push("--cap-add".into());
1372            argv.push(cap.clone());
1373        }
1374        for cap in &lp.capabilities_drop {
1375            argv.push("--cap-drop".into());
1376            argv.push(cap.clone());
1377        }
1378        for dev in &lp.devices {
1379            argv.push("--device".into());
1380            argv.push(format!(
1381                "{}:{}{}",
1382                dev.host_path, dev.container_path, dev.permissions
1383            ));
1384        }
1385        if lp.init_process_enabled {
1386            argv.push("--init".into());
1387        }
1388        if let Some(size) = lp.shared_memory_size {
1389            argv.push("--shm-size".into());
1390            argv.push(format!("{}m", size));
1391        }
1392        for sys in &lp.sysctls {
1393            argv.push("--sysctl".into());
1394            argv.push(format!("{}={}", sys.name, sys.value));
1395        }
1396        for tmp in &lp.tmpfs {
1397            let mut opts = tmp.mount_options.join(",");
1398            if !opts.is_empty() {
1399                opts = format!(",{}", opts);
1400            }
1401            argv.push("--tmpfs".into());
1402            argv.push(format!("{}:size={}M{}", tmp.container_path, tmp.size, opts));
1403        }
1404        if lp.privileged {
1405            argv.push("--privileged".into());
1406        }
1407    }
1408    if let Some(timeout) = plan.stop_timeout {
1409        argv.push("--stop-timeout".into());
1410        argv.push(format!("{}", timeout));
1411    }
1412    if let Some(ref user) = plan.user {
1413        argv.push("--user".into());
1414        argv.push(user.clone());
1415    }
1416    if let Some(ref wd) = plan.working_directory {
1417        argv.push("--workdir".into());
1418        argv.push(wd.clone());
1419    }
1420    if plan.tty {
1421        argv.push("--tty".into());
1422    }
1423    if plan.interactive {
1424        argv.push("--interactive".into());
1425    }
1426    if plan.readonly_rootfs {
1427        argv.push("--read-only".into());
1428    }
1429    if let Some(first) = plan.entry_point.first() {
1430        argv.push("--entrypoint".into());
1431        argv.push(first.clone());
1432    }
1433    argv.push(run_image.to_string());
1434    for arg in plan.entry_point.iter().skip(1) {
1435        argv.push(arg.clone());
1436    }
1437    for arg in &plan.command {
1438        argv.push(arg.clone());
1439    }
1440    argv
1441}
1442
1443/// Render `networkBindings` JSON for a launched container. Empty under
1444/// `awsvpc` (the equivalent info goes on the task's ENI attachments) and
1445/// for containers without `portMappings`.
1446pub(crate) fn network_bindings_for(plan: &ContainerPlan) -> Vec<serde_json::Value> {
1447    if plan.network_mode.as_deref() == Some("awsvpc") {
1448        return Vec::new();
1449    }
1450    plan.port_mappings
1451        .iter()
1452        .map(|pm| {
1453            serde_json::json!({
1454                "bindIP": "0.0.0.0",
1455                "containerPort": pm.container_port,
1456                "hostPort": pm.host_port,
1457                "protocol": pm.protocol,
1458            })
1459        })
1460        .collect()
1461}
1462
1463/// Compute ELBv2 target registrations for a task based on its service's
1464/// loadBalancers configuration. Returns (target_group_arn, [(target_id, port)])
1465/// for each target group that should receive this task.
1466#[allow(clippy::type_complexity)]
1467pub(crate) fn compute_elbv2_targets(
1468    ecs_state: &crate::state::EcsState,
1469    task: &crate::state::Task,
1470) -> Vec<(String, Vec<(String, Option<i64>)>)> {
1471    let mut result = Vec::new();
1472    let Some(group) = task.group.as_deref() else {
1473        return result;
1474    };
1475    let service_name = group.strip_prefix("service:").unwrap_or(group);
1476    let key = crate::state::EcsState::service_key(&task.cluster_name, service_name);
1477    let Some(service) = ecs_state.services.get(&key) else {
1478        return result;
1479    };
1480
1481    let network_mode = ecs_state
1482        .task_definitions
1483        .get(&task.family)
1484        .and_then(|revs| revs.get(&task.revision))
1485        .and_then(|td| td.network_mode.as_deref());
1486
1487    for lb in &service.load_balancers {
1488        let tg_arn = lb.get("targetGroupArn").and_then(|v| v.as_str());
1489        let container_name = lb.get("containerName").and_then(|v| v.as_str());
1490        let container_port = lb.get("containerPort").and_then(|v| v.as_i64());
1491        let Some(tg_arn) = tg_arn else { continue };
1492        let Some(container_name) = container_name else {
1493            continue;
1494        };
1495
1496        let target_id = if network_mode == Some("awsvpc") {
1497            task.attachments
1498                .iter()
1499                .find(|a| a.attachment_type == "eni")
1500                .and_then(|eni| {
1501                    eni.details
1502                        .iter()
1503                        .find(|d| d.name == "privateIPv4Address")
1504                        .map(|d| d.value.clone())
1505                })
1506        } else {
1507            Some("127.0.0.1".to_string())
1508        };
1509
1510        let port = if network_mode == Some("awsvpc") {
1511            container_port
1512        } else {
1513            task.containers
1514                .iter()
1515                .find(|c| c.name == container_name)
1516                .and_then(|c| {
1517                    c.network_bindings
1518                        .iter()
1519                        .find(|nb| {
1520                            nb.get("containerPort").and_then(|v| v.as_i64()) == container_port
1521                        })
1522                        .and_then(|nb| nb.get("hostPort").and_then(|v| v.as_i64()))
1523                })
1524        };
1525
1526        if let Some(id) = target_id {
1527            if let Some(entry) = result.iter_mut().find(|(arn, _)| arn == tg_arn) {
1528                entry.1.push((id, port));
1529            } else {
1530                result.push((tg_arn.to_string(), vec![(id, port)]));
1531            }
1532        }
1533    }
1534    result
1535}
1536
1537struct TaskSnapshot {
1538    task_arn: String,
1539    cluster_arn: String,
1540    launch_type: String,
1541    group: Option<String>,
1542    task_definition_arn: String,
1543    containers: serde_json::Value,
1544}
1545
1546fn snapshot_task(state: &SharedEcsState, account_id: &str, task_id: &str) -> Option<TaskSnapshot> {
1547    let accounts = state.read();
1548    let s = accounts.get(account_id)?;
1549    let task = s.tasks.get(task_id)?;
1550    Some(TaskSnapshot {
1551        task_arn: task.task_arn.clone(),
1552        cluster_arn: task.cluster_arn.clone(),
1553        launch_type: task.launch_type.clone(),
1554        group: task.group.clone(),
1555        task_definition_arn: task.task_definition_arn.clone(),
1556        containers: serde_json::Value::Array(
1557            task.containers
1558                .iter()
1559                .map(|c| {
1560                    serde_json::json!({
1561                        "containerArn": c.container_arn,
1562                        "name": c.name,
1563                        "image": c.image,
1564                        "lastStatus": c.last_status,
1565                        "exitCode": c.exit_code,
1566                        "reason": c.reason,
1567                    })
1568                })
1569                .collect(),
1570        ),
1571    })
1572}
1573
1574/// Build an isolated docker config directory with Basic auth for
1575/// fakecloud ECR. Lets `docker pull/push/tag` work against the local OCI
1576/// v2 registry without requiring the user to run
1577/// `aws ecr get-login-password | docker login` first. Authorizes every host
1578/// fakecloud's ECR can be addressed by -- `127.0.0.1` (fakecloud on the host),
1579/// `host.docker.internal` (Docker) and `host.containers.internal` (podman) when
1580/// fakecloud runs in a container and pull URIs are rewritten to the sibling
1581/// host. Centralized in container_net so Lambda and ECS can't drift (bug-audit
1582/// 2026-06-20, 0.B2).
1583fn build_local_registry_docker_config(server_port: u16) -> Option<TempDir> {
1584    let dir = TempDir::new().ok()?;
1585    let auth = base64::engine::general_purpose::STANDARD.encode("AWS:fakecloud-ecs-runtime");
1586    let auths: serde_json::Map<String, serde_json::Value> =
1587        fakecloud_core::container_net::registry_auth_hosts(server_port)
1588            .into_iter()
1589            .map(|host| (host, serde_json::json!({ "auth": auth })))
1590            .collect();
1591    let config = serde_json::json!({ "auths": auths });
1592    std::fs::write(dir.path().join("config.json"), config.to_string()).ok()?;
1593    Some(dir)
1594}
1595
1596fn find_container_definition(
1597    state: &crate::state::EcsState,
1598    family: &str,
1599    revision: i32,
1600    name: &str,
1601) -> Option<serde_json::Value> {
1602    state
1603        .task_definitions
1604        .get(family)?
1605        .get(&revision)?
1606        .container_definitions
1607        .iter()
1608        .find(|c| c.get("name").and_then(|v| v.as_str()) == Some(name))
1609        .cloned()
1610}
1611
1612fn mark_pull_started(state: &SharedEcsState, account_id: &str, task_id: &str) {
1613    let mut accounts = state.write();
1614    let Some(s) = accounts.get_mut(account_id) else {
1615        return;
1616    };
1617    let task_arn_cluster = s
1618        .tasks
1619        .get(task_id)
1620        .map(|t| (t.task_arn.clone(), t.cluster_arn.clone()));
1621    if let Some(task) = s.tasks.get_mut(task_id) {
1622        task.pull_started_at = Some(Utc::now());
1623    }
1624    if let Some((arn, cluster_arn)) = task_arn_cluster {
1625        s.push_event(LifecycleEvent {
1626            at: Utc::now(),
1627            event_type: "PullStarted".into(),
1628            task_arn: Some(arn),
1629            cluster_arn: Some(cluster_arn),
1630            last_status: Some("PENDING".into()),
1631            detail: serde_json::json!({}),
1632        });
1633    }
1634}
1635
1636fn mark_pull_stopped(state: &SharedEcsState, account_id: &str, task_id: &str) {
1637    let mut accounts = state.write();
1638    let Some(s) = accounts.get_mut(account_id) else {
1639        return;
1640    };
1641    if let Some(task) = s.tasks.get_mut(task_id) {
1642        task.pull_stopped_at = Some(Utc::now());
1643    }
1644}
1645
1646pub(crate) fn mark_running_multi(
1647    state: &SharedEcsState,
1648    account_id: &str,
1649    task_id: &str,
1650    started: &[RunningContainer],
1651) {
1652    let mut accounts = state.write();
1653    let Some(s) = accounts.get_mut(account_id) else {
1654        return;
1655    };
1656    let (arn, cluster_arn) = {
1657        let Some(task) = s.tasks.get_mut(task_id) else {
1658            return;
1659        };
1660        task.last_status = "RUNNING".into();
1661        task.connectivity = "CONNECTED".into();
1662        task.connectivity_at = Some(Utc::now());
1663        task.started_at = Some(Utc::now());
1664        for rc in started {
1665            if let Some(c) = task.containers.iter_mut().find(|c| c.name == rc.name) {
1666                c.runtime_id = Some(rc.container_id.clone());
1667                c.last_status = "RUNNING".into();
1668                c.network_bindings = rc.network_bindings.clone();
1669                if rc.image_digest.is_some() {
1670                    c.image_digest = rc.image_digest.clone();
1671                }
1672            }
1673        }
1674        if let Some(cluster) = s.clusters.get_mut(&task.cluster_name) {
1675            cluster.running_tasks_count += 1;
1676            if cluster.pending_tasks_count > 0 {
1677                cluster.pending_tasks_count -= 1;
1678            }
1679        }
1680        if let Some(ref ci_arn) = task.container_instance_arn {
1681            if let Some(ci) = s
1682                .container_instances
1683                .values_mut()
1684                .find(|ci| ci.container_instance_arn == *ci_arn)
1685            {
1686                ci.running_tasks_count += 1;
1687                if ci.pending_tasks_count > 0 {
1688                    ci.pending_tasks_count -= 1;
1689                }
1690            }
1691        }
1692        (task.task_arn.clone(), task.cluster_arn.clone())
1693    };
1694    s.push_event(LifecycleEvent {
1695        at: Utc::now(),
1696        event_type: "TaskStateChange".into(),
1697        task_arn: Some(arn),
1698        cluster_arn: Some(cluster_arn),
1699        last_status: Some("RUNNING".into()),
1700        detail: serde_json::json!({}),
1701    });
1702}
1703
1704#[allow(clippy::too_many_arguments)]
1705fn finalize_stopped_multi(
1706    state: &SharedEcsState,
1707    account_id: &str,
1708    task_id: &str,
1709    final_containers: &[RunningContainer],
1710    primary_exit_code: i64,
1711    captured: &str,
1712    stop_code: &str,
1713    stopped_reason: Option<String>,
1714) {
1715    let mut accounts = state.write();
1716    let Some(s) = accounts.get_mut(account_id) else {
1717        return;
1718    };
1719    let (arn, cluster_arn) = {
1720        let Some(task) = s.tasks.get_mut(task_id) else {
1721            return;
1722        };
1723        task.last_status = "STOPPED".into();
1724        task.desired_status = "STOPPED".into();
1725        task.stopping_at = task.stopping_at.or(Some(Utc::now()));
1726        task.stopped_at = Some(Utc::now());
1727        task.stop_code = Some(stop_code.into());
1728        task.stopped_reason = stopped_reason.or(Some(format!("Exit code {}", primary_exit_code)));
1729        task.captured_logs = captured.to_string();
1730        for c in task.containers.iter_mut() {
1731            c.last_status = "STOPPED".into();
1732            if c.exit_code.is_none() {
1733                let mapped = final_containers
1734                    .iter()
1735                    .find(|r| r.name == c.name)
1736                    .and_then(|r| r.exit_code);
1737                c.exit_code = mapped.or(Some(primary_exit_code));
1738            }
1739        }
1740        if let Some(cluster) = s.clusters.get_mut(&task.cluster_name) {
1741            if cluster.running_tasks_count > 0 {
1742                cluster.running_tasks_count -= 1;
1743            }
1744        }
1745        if let Some(ref ci_arn) = task.container_instance_arn {
1746            if let Some(ci) = s
1747                .container_instances
1748                .values_mut()
1749                .find(|ci| ci.container_instance_arn == *ci_arn)
1750            {
1751                if ci.running_tasks_count > 0 {
1752                    ci.running_tasks_count -= 1;
1753                }
1754            }
1755        }
1756        (task.task_arn.clone(), task.cluster_arn.clone())
1757    };
1758    s.push_event(LifecycleEvent {
1759        at: Utc::now(),
1760        event_type: "TaskStateChange".into(),
1761        task_arn: Some(arn),
1762        cluster_arn: Some(cluster_arn),
1763        last_status: Some("STOPPED".into()),
1764        detail: serde_json::json!({
1765            "exitCode": primary_exit_code,
1766            "stopCode": stop_code,
1767        }),
1768    });
1769}
1770
1771fn finalize_failure(state: &SharedEcsState, account_id: &str, task_id: &str, reason: &str) {
1772    let mut accounts = state.write();
1773    let Some(s) = accounts.get_mut(account_id) else {
1774        return;
1775    };
1776    let (arn, cluster_arn) = {
1777        let Some(task) = s.tasks.get_mut(task_id) else {
1778            return;
1779        };
1780        // Capture the prior status before we clobber it: if the task had
1781        // already reached RUNNING when execution failed (e.g. `docker wait`
1782        // blew up after the container started), we owe the cluster a
1783        // running-tasks decrement. Tasks that died before RUNNING only
1784        // ever incremented pendingTasksCount.
1785        let was_running = task.last_status == "RUNNING";
1786        task.last_status = "STOPPED".into();
1787        task.desired_status = "STOPPED".into();
1788        task.stopped_at = Some(Utc::now());
1789        task.stop_code = Some("TaskFailedToStart".into());
1790        task.stopped_reason = Some(reason.to_string());
1791        // Surface the failure reason on the /logs endpoint — without this,
1792        // a task that never reached RUNNING returns an empty log string,
1793        // leaving E2E assertions with no diagnostic.
1794        task.captured_logs = format!("[task failed to start]: {reason}");
1795        for c in task.containers.iter_mut() {
1796            c.last_status = "STOPPED".into();
1797            c.reason = Some(reason.to_string());
1798        }
1799        if let Some(cluster) = s.clusters.get_mut(&task.cluster_name) {
1800            if was_running {
1801                if cluster.running_tasks_count > 0 {
1802                    cluster.running_tasks_count -= 1;
1803                }
1804            } else if cluster.pending_tasks_count > 0 {
1805                cluster.pending_tasks_count -= 1;
1806            }
1807        }
1808        if let Some(ref ci_arn) = task.container_instance_arn {
1809            if let Some(ci) = s
1810                .container_instances
1811                .values_mut()
1812                .find(|ci| ci.container_instance_arn == *ci_arn)
1813            {
1814                if was_running {
1815                    if ci.running_tasks_count > 0 {
1816                        ci.running_tasks_count -= 1;
1817                    }
1818                } else if ci.pending_tasks_count > 0 {
1819                    ci.pending_tasks_count -= 1;
1820                }
1821            }
1822        }
1823        (task.task_arn.clone(), task.cluster_arn.clone())
1824    };
1825    s.push_event(LifecycleEvent {
1826        at: Utc::now(),
1827        event_type: "TaskFailedToStart".into(),
1828        task_arn: Some(arn),
1829        cluster_arn: Some(cluster_arn),
1830        last_status: Some("STOPPED".into()),
1831        detail: serde_json::json!({ "reason": reason }),
1832    });
1833}
1834
1835/// Short helper for tests + snapshot code to sleep between state
1836/// transitions. Exposed on the crate boundary to keep test timing
1837/// centralized.
1838pub async fn sleep(duration: Duration) {
1839    tokio::time::sleep(duration).await;
1840}
1841
1842#[cfg(test)]
1843mod tests {
1844    use super::*;
1845    use crate::state::{EcsState, Task};
1846    use fakecloud_aws::arn::Arn;
1847    use fakecloud_core::multi_account::MultiAccountState;
1848    use parking_lot::RwLock;
1849    use std::sync::Arc;
1850
1851    #[test]
1852    fn cli_available_for_known_missing_binary_is_false() {
1853        assert!(!fakecloud_core::container_net::cli_available(
1854            "definitely-not-a-real-cli-binary-xyz"
1855        ));
1856    }
1857
1858    #[test]
1859    fn aws_ecr_uris_translate_for_local_pull() {
1860        assert_eq!(
1861            fakecloud_core::ecr_uri::translate_to_local(
1862                "123456789012.dkr.ecr.us-east-1.amazonaws.com/app:latest",
1863                4566
1864            )
1865            .as_deref(),
1866            Some("127.0.0.1:4566/app:latest")
1867        );
1868    }
1869
1870    fn make_task(task_id: &str) -> Task {
1871        Task {
1872            task_arn: Arn::new(
1873                "ecs",
1874                "us-east-1",
1875                "000000000000",
1876                &format!("task/default/{task_id}"),
1877            )
1878            .to_string(),
1879            task_id: task_id.into(),
1880            cluster_arn: "arn:aws:ecs:us-east-1:000000000000:cluster/default".into(),
1881            cluster_name: "default".into(),
1882            task_definition_arn: "arn:aws:ecs:us-east-1:000000000000:task-definition/app:1".into(),
1883            family: "app".into(),
1884            revision: 1,
1885            container_instance_arn: None,
1886            capacity_provider_name: None,
1887            last_status: "PENDING".into(),
1888            desired_status: "RUNNING".into(),
1889            launch_type: "FARGATE".into(),
1890            platform_version: None,
1891            cpu: None,
1892            memory: None,
1893            containers: Vec::new(),
1894            overrides: serde_json::json!({}),
1895            started_by: None,
1896            group: None,
1897            connectivity: "CONNECTING".into(),
1898            stop_code: None,
1899            stopped_reason: None,
1900            created_at: Utc::now(),
1901            started_at: None,
1902            stopping_at: None,
1903            stopped_at: None,
1904            pull_started_at: None,
1905            pull_stopped_at: None,
1906            connectivity_at: None,
1907            started_by_ref_id: None,
1908            execution_role_arn: None,
1909            task_role_arn: None,
1910            tags: Vec::new(),
1911            awslogs: None,
1912            captured_logs: String::new(),
1913            protection: None,
1914            enable_execute_command: false,
1915            attachments: Vec::new(),
1916            volume_configurations: Vec::new(),
1917            task_set_arn: None,
1918        }
1919    }
1920
1921    #[test]
1922    fn finalize_failure_writes_reason_into_captured_logs() {
1923        let mut accounts: MultiAccountState<EcsState> =
1924            MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
1925        let acct = accounts.get_or_create("000000000000");
1926        acct.tasks.insert("t1".into(), make_task("t1"));
1927        let state: SharedEcsState = Arc::new(RwLock::new(accounts));
1928
1929        finalize_failure(
1930            &state,
1931            "000000000000",
1932            "t1",
1933            "failed to resolve secret DB_PASSWORD",
1934        );
1935
1936        let accounts = state.read();
1937        let task = accounts
1938            .get("000000000000")
1939            .unwrap()
1940            .tasks
1941            .get("t1")
1942            .unwrap();
1943        assert_eq!(task.last_status, "STOPPED");
1944        assert_eq!(task.stop_code.as_deref(), Some("TaskFailedToStart"));
1945        assert!(
1946            task.captured_logs
1947                .contains("failed to resolve secret DB_PASSWORD"),
1948            "captured_logs missing reason: {:?}",
1949            task.captured_logs
1950        );
1951        assert!(
1952            task.captured_logs.starts_with("[task failed to start]:"),
1953            "captured_logs missing prefix: {:?}",
1954            task.captured_logs
1955        );
1956    }
1957
1958    /// 4.2 — `task_desired_stopped` is the post-launch gate `run_task_inner`
1959    /// uses to detect a StopTask / scale-down / DeleteService that raced the
1960    /// launch. RUNNING desired_status -> keep running; STOPPED -> self-stop;
1961    /// task removed from state -> treat as stop (nothing to keep alive).
1962    #[test]
1963    fn task_desired_stopped_detects_stop_during_launch() {
1964        let mut accounts: MultiAccountState<EcsState> =
1965            MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
1966        let acct = accounts.get_or_create("000000000000");
1967        acct.tasks.insert("running".into(), make_task("running"));
1968        let mut stopping = make_task("stopping");
1969        stopping.desired_status = "STOPPED".into();
1970        acct.tasks.insert("stopping".into(), stopping);
1971        let state: SharedEcsState = Arc::new(RwLock::new(accounts));
1972
1973        assert!(
1974            !task_desired_stopped(&state, "000000000000", "running"),
1975            "a RUNNING task must not be treated as stopped",
1976        );
1977        assert!(
1978            task_desired_stopped(&state, "000000000000", "stopping"),
1979            "a task whose desired_status is STOPPED must be treated as stopped",
1980        );
1981        assert!(
1982            task_desired_stopped(&state, "000000000000", "deleted-mid-launch"),
1983            "a task removed from state mid-launch must be treated as stopped",
1984        );
1985    }
1986
1987    fn make_container(name: &str, essential: bool) -> crate::state::Container {
1988        crate::state::Container {
1989            container_arn: format!(
1990                "arn:aws:ecs:us-east-1:000000000000:container/default/abc/{name}"
1991            ),
1992            name: name.into(),
1993            image: "alpine".into(),
1994            task_arn: "arn:aws:ecs:us-east-1:000000000000:task/default/abc".into(),
1995            last_status: "RUNNING".into(),
1996            exit_code: None,
1997            reason: None,
1998            runtime_id: Some(format!("dockerid-{name}")),
1999            essential,
2000            cpu: None,
2001            memory: None,
2002            memory_reservation: None,
2003            network_bindings: Vec::new(),
2004            network_interfaces: Vec::new(),
2005            health_status: None,
2006            managed_agents: None,
2007            image_digest: None,
2008        }
2009    }
2010
2011    #[test]
2012    fn task_should_stop_when_essential_exits() {
2013        let containers = vec![
2014            RunningContainer {
2015                name: "app".into(),
2016                container_id: "id-app".into(),
2017                essential: true,
2018                exit_code: Some(0),
2019                network_bindings: Vec::new(),
2020                image_digest: None,
2021            },
2022            RunningContainer {
2023                name: "sidecar".into(),
2024                container_id: "id-sc".into(),
2025                essential: false,
2026                exit_code: None,
2027                network_bindings: Vec::new(),
2028                image_digest: None,
2029            },
2030        ];
2031        assert!(task_should_stop(&containers));
2032    }
2033
2034    #[test]
2035    fn task_keeps_running_when_only_non_essential_exits() {
2036        let containers = vec![
2037            RunningContainer {
2038                name: "app".into(),
2039                container_id: "id-app".into(),
2040                essential: true,
2041                exit_code: None,
2042                network_bindings: Vec::new(),
2043                image_digest: None,
2044            },
2045            RunningContainer {
2046                name: "sidecar".into(),
2047                container_id: "id-sc".into(),
2048                essential: false,
2049                exit_code: Some(0),
2050                network_bindings: Vec::new(),
2051                image_digest: None,
2052            },
2053        ];
2054        assert!(!task_should_stop(&containers));
2055    }
2056
2057    #[test]
2058    fn task_stops_when_all_non_essentials_exit() {
2059        let containers = vec![
2060            RunningContainer {
2061                name: "a".into(),
2062                container_id: "id-a".into(),
2063                essential: false,
2064                exit_code: Some(0),
2065                network_bindings: Vec::new(),
2066                image_digest: None,
2067            },
2068            RunningContainer {
2069                name: "b".into(),
2070                container_id: "id-b".into(),
2071                essential: false,
2072                exit_code: Some(1),
2073                network_bindings: Vec::new(),
2074                image_digest: None,
2075            },
2076        ];
2077        assert!(task_should_stop(&containers));
2078    }
2079
2080    #[test]
2081    fn finalize_stopped_multi_assigns_per_container_exit_codes() {
2082        let mut accounts: MultiAccountState<EcsState> =
2083            MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
2084        let acct = accounts.get_or_create("000000000000");
2085        let mut t = make_task("t1");
2086        t.containers = vec![
2087            make_container("app", true),
2088            make_container("sidecar", false),
2089        ];
2090        acct.tasks.insert("t1".into(), t);
2091        let state: SharedEcsState = Arc::new(RwLock::new(accounts));
2092
2093        let final_containers = vec![
2094            RunningContainer {
2095                name: "app".into(),
2096                container_id: "id-app".into(),
2097                essential: true,
2098                exit_code: Some(0),
2099                network_bindings: Vec::new(),
2100                image_digest: None,
2101            },
2102            RunningContainer {
2103                name: "sidecar".into(),
2104                container_id: "id-sc".into(),
2105                essential: false,
2106                exit_code: Some(137),
2107                network_bindings: Vec::new(),
2108                image_digest: None,
2109            },
2110        ];
2111        finalize_stopped_multi(
2112            &state,
2113            "000000000000",
2114            "t1",
2115            &final_containers,
2116            0,
2117            "captured",
2118            "EssentialContainerExited",
2119            None,
2120        );
2121
2122        let accounts = state.read();
2123        let task = accounts
2124            .get("000000000000")
2125            .unwrap()
2126            .tasks
2127            .get("t1")
2128            .unwrap();
2129        assert_eq!(task.last_status, "STOPPED");
2130        assert_eq!(task.stop_code.as_deref(), Some("EssentialContainerExited"));
2131        let app = task.containers.iter().find(|c| c.name == "app").unwrap();
2132        let sc = task
2133            .containers
2134            .iter()
2135            .find(|c| c.name == "sidecar")
2136            .unwrap();
2137        assert_eq!(app.exit_code, Some(0));
2138        assert_eq!(sc.exit_code, Some(137));
2139        assert_eq!(app.last_status, "STOPPED");
2140        assert_eq!(sc.last_status, "STOPPED");
2141    }
2142
2143    fn plan(name: &str, deps: &[&str]) -> ContainerPlan {
2144        ContainerPlan {
2145            container_name: name.into(),
2146            image: "alpine".into(),
2147            env: Vec::new(),
2148            entry_point: Vec::new(),
2149            command: Vec::new(),
2150            secrets_refs: Vec::new(),
2151            essential: true,
2152            has_task_role: false,
2153            port_mappings: Vec::new(),
2154            network_mode: None,
2155            depends_on: deps
2156                .iter()
2157                .map(|s| DependsOn {
2158                    container_name: (*s).to_string(),
2159                    condition: DependsOnCondition::Start,
2160                })
2161                .collect(),
2162            health_check: None,
2163            volume_mounts: Vec::new(),
2164            ulimits: Vec::new(),
2165            linux_parameters: None,
2166            stop_timeout: None,
2167            user: None,
2168            working_directory: None,
2169            tty: false,
2170            interactive: false,
2171            readonly_rootfs: false,
2172        }
2173    }
2174
2175    #[test]
2176    fn topo_sort_orders_by_depends_on() {
2177        // sidecar depends on app, so app must come first regardless of
2178        // declaration order.
2179        let plans = vec![plan("sidecar", &["app"]), plan("app", &[])];
2180        let ordered = topo_sort_plans(plans);
2181        assert_eq!(ordered[0].container_name, "app");
2182        assert_eq!(ordered[1].container_name, "sidecar");
2183    }
2184
2185    #[test]
2186    fn topo_sort_preserves_declaration_order_when_no_deps() {
2187        let plans = vec![plan("first", &[]), plan("second", &[]), plan("third", &[])];
2188        let ordered = topo_sort_plans(plans);
2189        let names: Vec<&str> = ordered.iter().map(|p| p.container_name.as_str()).collect();
2190        assert_eq!(names, vec!["first", "second", "third"]);
2191    }
2192
2193    #[test]
2194    fn topo_sort_handles_chain() {
2195        // c -> b -> a, declared in reverse so the topological sort must
2196        // bubble dependencies up.
2197        let plans = vec![plan("c", &["b"]), plan("b", &["a"]), plan("a", &[])];
2198        let ordered = topo_sort_plans(plans);
2199        let names: Vec<&str> = ordered.iter().map(|p| p.container_name.as_str()).collect();
2200        assert_eq!(names, vec!["a", "b", "c"]);
2201    }
2202
2203    #[test]
2204    fn topo_sort_ignores_unknown_dependency() {
2205        // depends_on names a container not in this task definition. Real
2206        // ECS would reject this at register time; we don't (yet), so the
2207        // unknown dep should just be skipped instead of stalling the sort.
2208        let plans = vec![plan("only", &["does-not-exist"])];
2209        let ordered = topo_sort_plans(plans);
2210        assert_eq!(ordered.len(), 1);
2211        assert_eq!(ordered[0].container_name, "only");
2212    }
2213
2214    #[test]
2215    fn topo_sort_recovers_from_cycle() {
2216        // Cyclic dependsOn: both plans should still appear in the output
2217        // so the runtime doesn't silently drop them.
2218        let plans = vec![plan("a", &["b"]), plan("b", &["a"])];
2219        let ordered = topo_sort_plans(plans);
2220        assert_eq!(ordered.len(), 2);
2221    }
2222
2223    #[test]
2224    fn parse_health_check_fills_aws_defaults() {
2225        let v = serde_json::json!({
2226            "command": ["CMD-SHELL", "curl -f http://localhost/ || exit 1"],
2227        });
2228        let hc = __test_parse_health_check(&v).expect("parsed");
2229        assert_eq!(hc.command[0], "CMD-SHELL");
2230        assert_eq!(hc.interval_seconds, 30);
2231        assert_eq!(hc.timeout_seconds, 5);
2232        assert_eq!(hc.retries, 3);
2233        assert_eq!(hc.start_period_seconds, 0);
2234    }
2235
2236    #[test]
2237    fn parse_health_check_overrides_explicit_values() {
2238        let v = serde_json::json!({
2239            "command": ["CMD", "/probe"],
2240            "interval": 7,
2241            "timeout": 2,
2242            "retries": 9,
2243            "startPeriod": 12,
2244        });
2245        let hc = __test_parse_health_check(&v).expect("parsed");
2246        assert_eq!(hc.interval_seconds, 7);
2247        assert_eq!(hc.timeout_seconds, 2);
2248        assert_eq!(hc.retries, 9);
2249        assert_eq!(hc.start_period_seconds, 12);
2250    }
2251
2252    #[test]
2253    fn parse_health_check_returns_none_for_none_sentinel() {
2254        // ECS uses ["NONE"] to disable an inherited HEALTHCHECK; we
2255        // skip emission rather than passing a literal `none` to docker.
2256        let v = serde_json::json!({ "command": ["NONE"] });
2257        assert!(__test_parse_health_check(&v).is_none());
2258    }
2259
2260    #[test]
2261    fn parse_health_check_returns_none_for_missing_command() {
2262        let v = serde_json::json!({ "interval": 30 });
2263        assert!(__test_parse_health_check(&v).is_none());
2264    }
2265
2266    #[test]
2267    fn render_health_flags_emits_full_set_for_cmd_shell() {
2268        let hc = HealthCheckSpec {
2269            command: vec!["CMD-SHELL".into(), "curl -f http://localhost/".into()],
2270            interval_seconds: 15,
2271            timeout_seconds: 3,
2272            retries: 4,
2273            start_period_seconds: 10,
2274        };
2275        let flags = render_health_flags(&hc);
2276        assert_eq!(flags[0], "--health-cmd");
2277        assert_eq!(flags[1], "curl -f http://localhost/");
2278        assert!(flags.contains(&"--health-interval=15s".to_string()));
2279        assert!(flags.contains(&"--health-timeout=3s".to_string()));
2280        assert!(flags.contains(&"--health-retries=4".to_string()));
2281        assert!(flags.contains(&"--health-start-period=10s".to_string()));
2282    }
2283
2284    #[test]
2285    fn render_health_flags_joins_cmd_argv_with_spaces() {
2286        // CMD form in ECS is argv-style; docker `--health-cmd` only
2287        // accepts a single shell string, so we collapse with spaces.
2288        let hc = HealthCheckSpec {
2289            command: vec![
2290                "CMD".into(),
2291                "/bin/probe".into(),
2292                "--port".into(),
2293                "8080".into(),
2294            ],
2295            interval_seconds: 30,
2296            timeout_seconds: 5,
2297            retries: 3,
2298            start_period_seconds: 0,
2299        };
2300        let flags = render_health_flags(&hc);
2301        assert_eq!(flags[1], "/bin/probe --port 8080");
2302    }
2303
2304    #[test]
2305    fn build_run_argv_emits_health_flags_when_present() {
2306        let plan = ContainerPlan {
2307            container_name: "app".into(),
2308            image: "alpine".into(),
2309            env: Vec::new(),
2310            entry_point: Vec::new(),
2311            command: Vec::new(),
2312            secrets_refs: Vec::new(),
2313            essential: true,
2314            has_task_role: false,
2315            port_mappings: Vec::new(),
2316            network_mode: None,
2317            depends_on: Vec::new(),
2318            health_check: Some(HealthCheckSpec {
2319                command: vec!["CMD-SHELL".into(), "true".into()],
2320                interval_seconds: 5,
2321                timeout_seconds: 2,
2322                retries: 1,
2323                start_period_seconds: 1,
2324            }),
2325            volume_mounts: Vec::new(),
2326            ulimits: Vec::new(),
2327            linux_parameters: None,
2328            stop_timeout: None,
2329            user: None,
2330            working_directory: None,
2331            tty: false,
2332            interactive: false,
2333            readonly_rootfs: false,
2334        };
2335        let argv = build_run_argv(
2336            &plan,
2337            &[],
2338            "task-1",
2339            "host.docker.internal",
2340            None,
2341            "alpine",
2342            true,
2343        );
2344        let joined = argv.join(" ");
2345        assert!(joined.contains("--health-cmd true"), "argv: {joined}");
2346        assert!(joined.contains("--health-interval=5s"), "argv: {joined}");
2347        assert!(joined.contains("--health-timeout=2s"), "argv: {joined}");
2348        assert!(joined.contains("--health-retries=1"), "argv: {joined}");
2349        assert!(
2350            joined.contains("--health-start-period=1s"),
2351            "argv: {joined}"
2352        );
2353    }
2354
2355    #[test]
2356    fn build_run_argv_emits_no_health_flags_when_absent() {
2357        let plan = ContainerPlan {
2358            container_name: "app".into(),
2359            image: "alpine".into(),
2360            env: Vec::new(),
2361            entry_point: Vec::new(),
2362            command: Vec::new(),
2363            secrets_refs: Vec::new(),
2364            essential: true,
2365            has_task_role: false,
2366            port_mappings: Vec::new(),
2367            network_mode: None,
2368            depends_on: Vec::new(),
2369            health_check: None,
2370            volume_mounts: Vec::new(),
2371            ulimits: Vec::new(),
2372            linux_parameters: None,
2373            stop_timeout: None,
2374            user: None,
2375            working_directory: None,
2376            tty: false,
2377            interactive: false,
2378            readonly_rootfs: false,
2379        };
2380        let argv = build_run_argv(
2381            &plan,
2382            &[],
2383            "task-1",
2384            "host.docker.internal",
2385            None,
2386            "alpine",
2387            true,
2388        );
2389        assert!(!argv.iter().any(|s| s.starts_with("--health")));
2390    }
2391
2392    #[test]
2393    fn docker_health_to_ecs_maps_known_states() {
2394        assert_eq!(docker_health_to_ecs("healthy"), "HEALTHY");
2395        assert_eq!(docker_health_to_ecs("HEALTHY"), "HEALTHY");
2396        assert_eq!(docker_health_to_ecs("unhealthy"), "UNHEALTHY");
2397        assert_eq!(docker_health_to_ecs("starting"), "UNKNOWN");
2398        assert_eq!(docker_health_to_ecs("none"), "UNKNOWN");
2399        assert_eq!(docker_health_to_ecs(""), "UNKNOWN");
2400    }
2401
2402    /// `host.sourcePath` becomes a host bind mount with the path
2403    /// passed straight through to docker.
2404    #[test]
2405    fn resolve_host_bind_volume_uses_source_path() {
2406        let mut volumes = std::collections::HashMap::new();
2407        let v = serde_json::json!({
2408            "name": "data",
2409            "host": { "sourcePath": "/var/lib/myapp" }
2410        });
2411        volumes.insert("data".to_string(), &v);
2412        let mp = serde_json::json!({
2413            "sourceVolume": "data",
2414            "containerPath": "/app/data",
2415            "readOnly": false
2416        });
2417        let resolved = resolve_mount_point(&mp, &volumes).expect("resolved");
2418        assert_eq!(resolved.source, "/var/lib/myapp");
2419        assert_eq!(resolved.container_path, "/app/data");
2420        assert!(!resolved.read_only);
2421    }
2422
2423    /// `readOnly: true` on the mount point appends `:ro` to the
2424    /// rendered docker `-v` flag.
2425    #[test]
2426    fn read_only_mount_renders_ro_suffix() {
2427        let plan = ContainerPlan {
2428            container_name: "app".into(),
2429            image: "alpine".into(),
2430            env: Vec::new(),
2431            entry_point: Vec::new(),
2432            command: Vec::new(),
2433            secrets_refs: Vec::new(),
2434            essential: true,
2435            has_task_role: false,
2436            port_mappings: Vec::new(),
2437            network_mode: None,
2438            depends_on: Vec::new(),
2439            health_check: None,
2440            volume_mounts: vec![VolumeMount {
2441                source: "/host/path".into(),
2442                container_path: "/in/container".into(),
2443                read_only: true,
2444                cleanup_on_stop: false,
2445            }],
2446            ulimits: Vec::new(),
2447            linux_parameters: None,
2448            stop_timeout: None,
2449            user: None,
2450            working_directory: None,
2451            tty: false,
2452            interactive: false,
2453            readonly_rootfs: false,
2454        };
2455        let argv = build_run_argv(
2456            &plan,
2457            &[],
2458            "task-1",
2459            "host.docker.internal",
2460            None,
2461            "alpine",
2462            true,
2463        );
2464        let pair = argv
2465            .windows(2)
2466            .find(|w| w[0] == "-v")
2467            .expect("expected -v flag");
2468        assert_eq!(pair[1], "/host/path:/in/container:ro");
2469    }
2470
2471    /// EFS volumes resolve to a stub directory under `/tmp/fakecloud/efs`
2472    /// keyed by `fileSystemId`. `rootDirectory` (when set and not `/`)
2473    /// is appended so different mount targets within the same
2474    /// filesystem stay isolated.
2475    #[test]
2476    fn resolve_efs_volume_uses_stub_dir() {
2477        let mut volumes = std::collections::HashMap::new();
2478        let v = serde_json::json!({
2479            "name": "efs-vol",
2480            "efsVolumeConfiguration": {
2481                "fileSystemId": "fs-12345678",
2482                "rootDirectory": "/exports/app"
2483            }
2484        });
2485        volumes.insert("efs-vol".to_string(), &v);
2486        let mp = serde_json::json!({
2487            "sourceVolume": "efs-vol",
2488            "containerPath": "/mnt/efs"
2489        });
2490        let resolved = resolve_mount_point(&mp, &volumes).expect("resolved");
2491        // EFS resolves to a docker named volume (container-safe), with the
2492        // rootDirectory folded into the name (bug-audit 2026-05-28, 0.6).
2493        assert_eq!(resolved.source, "fakecloud-efs-fs-12345678-exports-app");
2494        assert_eq!(resolved.container_path, "/mnt/efs");
2495    }
2496
2497    /// EFS without `rootDirectory` (or with `/`) maps to the root of
2498    /// the filesystem stub so multiple tasks targeting the same id
2499    /// share state.
2500    #[test]
2501    fn efs_without_root_directory_uses_filesystem_root() {
2502        // No rootDirectory (or "/") -> a single shared named volume per
2503        // filesystem id.
2504        assert_eq!(
2505            shared_volume_name("efs", "fs-abc", "/"),
2506            "fakecloud-efs-fs-abc"
2507        );
2508        assert_eq!(
2509            shared_volume_name("efs", "fs-abc", ""),
2510            "fakecloud-efs-fs-abc"
2511        );
2512    }
2513
2514    /// `dockerVolumeConfiguration` resolves to the volume name itself,
2515    /// which docker treats as a named volume reference. No host path
2516    /// is materialised — docker creates the volume on first reference.
2517    #[test]
2518    fn resolve_docker_named_volume_uses_volume_name() {
2519        let mut volumes = std::collections::HashMap::new();
2520        let v = serde_json::json!({
2521            "name": "named-vol",
2522            "dockerVolumeConfiguration": {
2523                "scope": "task",
2524                "driver": "local"
2525            }
2526        });
2527        volumes.insert("named-vol".to_string(), &v);
2528        let mp = serde_json::json!({
2529            "sourceVolume": "named-vol",
2530            "containerPath": "/data"
2531        });
2532        let resolved = resolve_mount_point(&mp, &volumes).expect("resolved");
2533        assert_eq!(resolved.source, "named-vol");
2534        assert_eq!(resolved.container_path, "/data");
2535    }
2536
2537    /// Only task-scoped docker volumes are flagged for removal on task stop;
2538    /// host binds, EFS/FSx shared stubs and scope=shared volumes persist.
2539    #[test]
2540    fn cleanup_on_stop_matches_aws_volume_scope() {
2541        let cases = [
2542            // (volume json, expected cleanup_on_stop)
2543            (
2544                serde_json::json!({ "name": "v", "host": { "sourcePath": "/data" } }),
2545                false,
2546            ),
2547            (
2548                serde_json::json!({ "name": "v", "efsVolumeConfiguration": { "fileSystemId": "fs-a" } }),
2549                false,
2550            ),
2551            (
2552                serde_json::json!({ "name": "v", "fsxWindowsFileServerVolumeConfiguration": { "fileSystemId": "fs-b" } }),
2553                false,
2554            ),
2555            (
2556                serde_json::json!({ "name": "v", "dockerVolumeConfiguration": { "scope": "shared" } }),
2557                false,
2558            ),
2559            (
2560                serde_json::json!({ "name": "v", "dockerVolumeConfiguration": { "scope": "task" } }),
2561                true,
2562            ),
2563            // scope omitted defaults to task.
2564            (
2565                serde_json::json!({ "name": "v", "dockerVolumeConfiguration": {} }),
2566                true,
2567            ),
2568            // bare entry and empty host sourcePath -> anonymous task volume.
2569            (serde_json::json!({ "name": "v" }), true),
2570            (
2571                serde_json::json!({ "name": "v", "host": { "sourcePath": "" } }),
2572                true,
2573            ),
2574        ];
2575        for (vol, expected) in cases {
2576            assert_eq!(
2577                volume_is_task_scoped(&vol),
2578                expected,
2579                "unexpected cleanup_on_stop for {vol}"
2580            );
2581        }
2582    }
2583
2584    /// FSx for Windows uses the same stub-directory pattern as EFS but
2585    /// scoped under `/tmp/fakecloud/fsx/<filesystemId>/`.
2586    #[test]
2587    fn resolve_fsx_volume_uses_stub_dir() {
2588        let mut volumes = std::collections::HashMap::new();
2589        let v = serde_json::json!({
2590            "name": "fsx-vol",
2591            "fsxWindowsFileServerVolumeConfiguration": {
2592                "fileSystemId": "fs-xyz",
2593                "rootDirectory": "share"
2594            }
2595        });
2596        volumes.insert("fsx-vol".to_string(), &v);
2597        let mp = serde_json::json!({
2598            "sourceVolume": "fsx-vol",
2599            "containerPath": "C:\\data"
2600        });
2601        let resolved = resolve_mount_point(&mp, &volumes).expect("resolved");
2602        // FSx resolves to a docker named volume (bug-audit 2026-05-28, 0.6).
2603        assert_eq!(resolved.source, "fakecloud-fsx-fs-xyz-share");
2604    }
2605
2606    /// Mount points that reference an undeclared `sourceVolume` resolve
2607    /// to `None` so `build_container_plans` skips them rather than
2608    /// emitting a broken `-v` flag.
2609    #[test]
2610    fn unknown_source_volume_returns_none() {
2611        let volumes = std::collections::HashMap::new();
2612        let mp = serde_json::json!({
2613            "sourceVolume": "missing",
2614            "containerPath": "/x"
2615        });
2616        assert!(resolve_mount_point(&mp, &volumes).is_none());
2617    }
2618
2619    /// `find_depends_on_cycle` returns the back-edge endpoints when a
2620    /// trivial 2-cycle exists. Real ECS would reject this at register
2621    /// time; our service-level handler relies on this helper.
2622    #[test]
2623    fn find_depends_on_cycle_detects_two_node_cycle() {
2624        let cds = vec![
2625            serde_json::json!({
2626                "name": "a",
2627                "image": "alpine",
2628                "dependsOn": [{"containerName": "b", "condition": "START"}],
2629            }),
2630            serde_json::json!({
2631                "name": "b",
2632                "image": "alpine",
2633                "dependsOn": [{"containerName": "a", "condition": "START"}],
2634            }),
2635        ];
2636        let cycle = find_depends_on_cycle(&cds);
2637        assert!(cycle.is_some(), "expected cycle to be detected");
2638    }
2639
2640    /// A three-node chain (a -> b -> c) is acyclic and must not be
2641    /// flagged. Guards against an over-eager DFS reporting back-edges
2642    /// from already-finished nodes.
2643    #[test]
2644    fn find_depends_on_cycle_accepts_chain() {
2645        let cds = vec![
2646            serde_json::json!({
2647                "name": "a",
2648                "image": "alpine",
2649                "dependsOn": [{"containerName": "b", "condition": "START"}],
2650            }),
2651            serde_json::json!({
2652                "name": "b",
2653                "image": "alpine",
2654                "dependsOn": [{"containerName": "c", "condition": "START"}],
2655            }),
2656            serde_json::json!({
2657                "name": "c",
2658                "image": "alpine",
2659            }),
2660        ];
2661        assert!(find_depends_on_cycle(&cds).is_none());
2662    }
2663
2664    /// `dependsOn[]` entries that name a container outside the task
2665    /// definition are ignored by the cycle check (they can't form a
2666    /// cycle by definition; runtime also drops them).
2667    #[test]
2668    fn find_depends_on_cycle_ignores_unknown_target() {
2669        let cds = vec![serde_json::json!({
2670            "name": "only",
2671            "image": "alpine",
2672            "dependsOn": [{"containerName": "ghost", "condition": "START"}],
2673        })];
2674        assert!(find_depends_on_cycle(&cds).is_none());
2675    }
2676
2677    /// `condition_is_met` covers each AWS condition value against a
2678    /// simulated docker inspect snapshot. Pinning these mappings here
2679    /// catches accidental re-orderings of the match arms.
2680    #[test]
2681    fn condition_is_met_matches_aws_semantics() {
2682        let running = InspectedState {
2683            started: true,
2684            exited: false,
2685            exit_code: 0,
2686            health: None,
2687        };
2688        let exited_ok = InspectedState {
2689            started: true,
2690            exited: true,
2691            exit_code: 0,
2692            health: None,
2693        };
2694        let exited_fail = InspectedState {
2695            started: true,
2696            exited: true,
2697            exit_code: 1,
2698            health: None,
2699        };
2700        let healthy = InspectedState {
2701            started: true,
2702            exited: false,
2703            exit_code: 0,
2704            health: Some("healthy".into()),
2705        };
2706
2707        // START is satisfied as soon as the container has started, even
2708        // if it later exited.
2709        assert!(condition_is_met(DependsOnCondition::Start, &running));
2710        assert!(condition_is_met(DependsOnCondition::Start, &exited_ok));
2711
2712        // COMPLETE requires an exit, regardless of code.
2713        assert!(!condition_is_met(DependsOnCondition::Complete, &running));
2714        assert!(condition_is_met(DependsOnCondition::Complete, &exited_ok));
2715        assert!(condition_is_met(DependsOnCondition::Complete, &exited_fail));
2716
2717        // SUCCESS requires an exit AND code 0.
2718        assert!(!condition_is_met(DependsOnCondition::Success, &running));
2719        assert!(condition_is_met(DependsOnCondition::Success, &exited_ok));
2720        assert!(!condition_is_met(DependsOnCondition::Success, &exited_fail));
2721
2722        // HEALTHY requires Health.Status == "healthy".
2723        assert!(!condition_is_met(DependsOnCondition::Healthy, &running));
2724        assert!(condition_is_met(DependsOnCondition::Healthy, &healthy));
2725    }
2726
2727    /// `DependsOnCondition::parse` accepts the four AWS-spelled values
2728    /// and rejects everything else — register-time validation depends on
2729    /// this returning `None` for unknowns.
2730    #[test]
2731    fn depends_on_condition_parse_round_trips() {
2732        assert_eq!(
2733            DependsOnCondition::parse("START"),
2734            Some(DependsOnCondition::Start)
2735        );
2736        assert_eq!(
2737            DependsOnCondition::parse("COMPLETE"),
2738            Some(DependsOnCondition::Complete)
2739        );
2740        assert_eq!(
2741            DependsOnCondition::parse("SUCCESS"),
2742            Some(DependsOnCondition::Success)
2743        );
2744        assert_eq!(
2745            DependsOnCondition::parse("HEALTHY"),
2746            Some(DependsOnCondition::Healthy)
2747        );
2748        assert_eq!(DependsOnCondition::parse("start"), None);
2749        assert_eq!(DependsOnCondition::parse("ANY"), None);
2750    }
2751
2752    // ── ulimits + linuxParameters + misc docker flags (O6) ──
2753
2754    #[test]
2755    fn build_run_argv_emits_ulimits() {
2756        let plan = ContainerPlan {
2757            container_name: "app".into(),
2758            image: "alpine".into(),
2759            env: Vec::new(),
2760            entry_point: Vec::new(),
2761            command: Vec::new(),
2762            secrets_refs: Vec::new(),
2763            essential: true,
2764            has_task_role: false,
2765            port_mappings: Vec::new(),
2766            network_mode: None,
2767            depends_on: Vec::new(),
2768            health_check: None,
2769            volume_mounts: Vec::new(),
2770            ulimits: vec![Ulimit {
2771                name: "nofile".into(),
2772                soft_limit: 1024,
2773                hard_limit: 2048,
2774            }],
2775            linux_parameters: None,
2776            stop_timeout: None,
2777            user: None,
2778            working_directory: None,
2779            tty: false,
2780            interactive: false,
2781            readonly_rootfs: false,
2782        };
2783        let argv = build_run_argv(&plan, &[], "t", "host.docker.internal", None, "img", true);
2784        assert!(argv.contains(&"--ulimit".to_string()));
2785        assert!(argv.contains(&"nofile=1024:2048".to_string()));
2786    }
2787
2788    #[test]
2789    fn build_run_argv_emits_linux_parameters() {
2790        let plan = ContainerPlan {
2791            container_name: "app".into(),
2792            image: "alpine".into(),
2793            env: Vec::new(),
2794            entry_point: Vec::new(),
2795            command: Vec::new(),
2796            secrets_refs: Vec::new(),
2797            essential: true,
2798            has_task_role: false,
2799            port_mappings: Vec::new(),
2800            network_mode: None,
2801            depends_on: Vec::new(),
2802            health_check: None,
2803            volume_mounts: Vec::new(),
2804            ulimits: Vec::new(),
2805            linux_parameters: Some(LinuxParameters {
2806                capabilities_add: vec!["NET_ADMIN".into()],
2807                capabilities_drop: vec!["ALL".into()],
2808                devices: vec![Device {
2809                    host_path: "/dev/zero".into(),
2810                    container_path: "/dev/zero".into(),
2811                    permissions: "rwm".into(),
2812                }],
2813                init_process_enabled: true,
2814                shared_memory_size: Some(256),
2815                sysctls: vec![Sysctl {
2816                    name: "net.ipv4.ip_forward".into(),
2817                    value: "1".into(),
2818                }],
2819                tmpfs: vec![Tmpfs {
2820                    container_path: "/tmp".into(),
2821                    size: 128,
2822                    mount_options: vec!["noexec".into()],
2823                }],
2824                privileged: true,
2825            }),
2826            stop_timeout: Some(30),
2827            user: Some("1000:1000".into()),
2828            working_directory: Some("/app".into()),
2829            tty: true,
2830            interactive: true,
2831            readonly_rootfs: true,
2832        };
2833        let argv = build_run_argv(&plan, &[], "t", "host.docker.internal", None, "img", true);
2834        assert!(argv.contains(&"--cap-add".to_string()));
2835        assert!(argv.contains(&"NET_ADMIN".to_string()));
2836        assert!(argv.contains(&"--cap-drop".to_string()));
2837        assert!(argv.contains(&"ALL".to_string()));
2838        assert!(argv.contains(&"--device".to_string()));
2839        assert!(argv.contains(&"/dev/zero:/dev/zerorwm".to_string()));
2840        assert!(argv.contains(&"--init".to_string()));
2841        assert!(argv.contains(&"--shm-size".to_string()));
2842        assert!(argv.contains(&"256m".to_string()));
2843        assert!(argv.contains(&"--sysctl".to_string()));
2844        assert!(argv.contains(&"net.ipv4.ip_forward=1".to_string()));
2845        assert!(argv.contains(&"--tmpfs".to_string()));
2846        assert!(argv.contains(&"--privileged".to_string()));
2847        assert!(argv.contains(&"--stop-timeout".to_string()));
2848        assert!(argv.contains(&"30".to_string()));
2849        assert!(argv.contains(&"--user".to_string()));
2850        assert!(argv.contains(&"1000:1000".to_string()));
2851        assert!(argv.contains(&"--workdir".to_string()));
2852        assert!(argv.contains(&"/app".to_string()));
2853        assert!(argv.contains(&"--tty".to_string()));
2854        assert!(argv.contains(&"--interactive".to_string()));
2855        assert!(argv.contains(&"--read-only".to_string()));
2856    }
2857
2858    #[test]
2859    fn parse_linux_parameters_fills_defaults() {
2860        let raw = serde_json::json!({"initProcessEnabled": true});
2861        let lp = parse_linux_parameters(&raw).expect("parses");
2862        assert!(lp.init_process_enabled);
2863        assert!(!lp.privileged);
2864        assert!(lp.capabilities_add.is_empty());
2865    }
2866
2867    #[test]
2868    fn parse_device_uses_default_permissions() {
2869        let raw = serde_json::json!({"hostPath": "/dev/null", "containerPath": "/dev/null"});
2870        let dev = parse_device(&raw).expect("parses");
2871        assert_eq!(dev.permissions, "rwm");
2872    }
2873
2874    #[test]
2875    fn compute_elbv2_targets_empty_when_no_group() {
2876        let mut accounts: MultiAccountState<EcsState> =
2877            MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
2878        let acct = accounts.get_or_create("000000000000");
2879        let mut task = make_task("t1");
2880        task.group = None;
2881        acct.tasks.insert("t1".into(), task);
2882        let state = acct.clone();
2883        let targets = compute_elbv2_targets(&state, state.tasks.get("t1").unwrap());
2884        assert!(targets.is_empty());
2885    }
2886
2887    #[test]
2888    fn compute_elbv2_targets_bridge_mode_uses_localhost_and_host_port() {
2889        let mut accounts: MultiAccountState<EcsState> =
2890            MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
2891        let acct = accounts.get_or_create("000000000000");
2892
2893        let td = crate::state::TaskDefinition {
2894            family: "app".into(),
2895            revision: 1,
2896            task_definition_arn: "arn:aws:ecs:us-east-1:000000000000:task-definition/app:1".into(),
2897            container_definitions: Vec::new(),
2898            network_mode: Some("bridge".into()),
2899            status: "ACTIVE".into(),
2900            task_role_arn: None,
2901            execution_role_arn: None,
2902            requires_compatibilities: Vec::new(),
2903            compatibilities: Vec::new(),
2904            cpu: None,
2905            memory: None,
2906            pid_mode: None,
2907            ipc_mode: None,
2908            volumes: Vec::new(),
2909            placement_constraints: Vec::new(),
2910            proxy_configuration: None,
2911            inference_accelerators: Vec::new(),
2912            ephemeral_storage: None,
2913            runtime_platform: None,
2914            requires_attributes: Vec::new(),
2915            registered_at: Utc::now(),
2916            registered_by: None,
2917            deregistered_at: None,
2918            tags: Vec::new(),
2919            enable_fault_injection: None,
2920        };
2921        acct.task_definitions.insert("app".into(), {
2922            let mut m = std::collections::BTreeMap::new();
2923            m.insert(1, td);
2924            m
2925        });
2926
2927        let service = crate::state::Service {
2928            service_name: "svc".into(),
2929            service_arn: "arn:aws:ecs:us-east-1:000000000000:service/default/svc".into(),
2930            cluster_name: "default".into(),
2931            cluster_arn: "arn:aws:ecs:us-east-1:000000000000:cluster/default".into(),
2932            task_definition_arn: "arn:aws:ecs:us-east-1:000000000000:task-definition/app:1".into(),
2933            family: "app".into(),
2934            revision: 1,
2935            desired_count: 1,
2936            running_count: 0,
2937            pending_count: 0,
2938            launch_type: "FARGATE".into(),
2939            status: "ACTIVE".into(),
2940            scheduling_strategy: "REPLICA".into(),
2941            deployment_controller: "ECS".into(),
2942            minimum_healthy_percent: Some(0),
2943            maximum_percent: Some(200),
2944            circuit_breaker: None,
2945            deployments: Vec::new(),
2946            load_balancers: vec![serde_json::json!({
2947                "targetGroupArn": "arn:aws:elasticloadbalancing:us-east-1:000000000000:targetgroup/tg/abc",
2948                "containerName": "app",
2949                "containerPort": 80,
2950            })],
2951            service_registries: Vec::new(),
2952            placement_constraints: Vec::new(),
2953            placement_strategy: Vec::new(),
2954            network_configuration: None,
2955            volume_configurations: vec![],
2956            tags: Vec::new(),
2957            created_at: Utc::now(),
2958            created_by: None,
2959            role_arn: None,
2960            platform_version: None,
2961            health_check_grace_period_seconds: None,
2962            enable_execute_command: false,
2963            enable_ecs_managed_tags: false,
2964            propagate_tags: None,
2965            capacity_provider_strategy: Vec::new(),
2966            availability_zone_rebalancing: None,
2967        };
2968        acct.services.insert(
2969            crate::state::EcsState::service_key("default", "svc"),
2970            service,
2971        );
2972
2973        let mut task = make_task("t1");
2974        task.group = Some("service:svc".into());
2975        task.containers = vec![crate::state::Container {
2976            container_arn: "arn:aws:ecs:us-east-1:000000000000:container/default/abc/app".into(),
2977            name: "app".into(),
2978            image: "alpine".into(),
2979            task_arn: task.task_arn.clone(),
2980            last_status: "RUNNING".into(),
2981            exit_code: None,
2982            reason: None,
2983            runtime_id: Some("dockerid-app".into()),
2984            essential: true,
2985            cpu: None,
2986            memory: None,
2987            memory_reservation: None,
2988            network_bindings: vec![serde_json::json!({
2989                "bindIP": "0.0.0.0",
2990                "containerPort": 80,
2991                "hostPort": 32768,
2992                "protocol": "tcp",
2993            })],
2994            network_interfaces: Vec::new(),
2995            health_status: None,
2996            managed_agents: None,
2997            image_digest: None,
2998        }];
2999        acct.tasks.insert("t1".into(), task);
3000
3001        let state = acct.clone();
3002        let targets = compute_elbv2_targets(&state, state.tasks.get("t1").unwrap());
3003        assert_eq!(targets.len(), 1);
3004        let (arn, tg_targets) = &targets[0];
3005        assert_eq!(
3006            arn,
3007            "arn:aws:elasticloadbalancing:us-east-1:000000000000:targetgroup/tg/abc"
3008        );
3009        assert_eq!(tg_targets.len(), 1);
3010        assert_eq!(tg_targets[0].0, "127.0.0.1");
3011        assert_eq!(tg_targets[0].1, Some(32768));
3012    }
3013
3014    #[test]
3015    fn compute_elbv2_targets_awsvpc_uses_eni_ip() {
3016        let mut accounts: MultiAccountState<EcsState> =
3017            MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
3018        let acct = accounts.get_or_create("000000000000");
3019
3020        let td = crate::state::TaskDefinition {
3021            family: "app".into(),
3022            revision: 1,
3023            task_definition_arn: "arn:aws:ecs:us-east-1:000000000000:task-definition/app:1".into(),
3024            container_definitions: Vec::new(),
3025            network_mode: Some("awsvpc".into()),
3026            status: "ACTIVE".into(),
3027            task_role_arn: None,
3028            execution_role_arn: None,
3029            requires_compatibilities: Vec::new(),
3030            compatibilities: Vec::new(),
3031            cpu: None,
3032            memory: None,
3033            pid_mode: None,
3034            ipc_mode: None,
3035            volumes: Vec::new(),
3036            placement_constraints: Vec::new(),
3037            proxy_configuration: None,
3038            inference_accelerators: Vec::new(),
3039            ephemeral_storage: None,
3040            runtime_platform: None,
3041            requires_attributes: Vec::new(),
3042            registered_at: Utc::now(),
3043            registered_by: None,
3044            deregistered_at: None,
3045            tags: Vec::new(),
3046            enable_fault_injection: None,
3047        };
3048        acct.task_definitions.insert("app".into(), {
3049            let mut m = std::collections::BTreeMap::new();
3050            m.insert(1, td);
3051            m
3052        });
3053
3054        let service = crate::state::Service {
3055            service_name: "svc".into(),
3056            service_arn: "arn:aws:ecs:us-east-1:000000000000:service/default/svc".into(),
3057            cluster_name: "default".into(),
3058            cluster_arn: "arn:aws:ecs:us-east-1:000000000000:cluster/default".into(),
3059            task_definition_arn: "arn:aws:ecs:us-east-1:000000000000:task-definition/app:1".into(),
3060            family: "app".into(),
3061            revision: 1,
3062            desired_count: 1,
3063            running_count: 0,
3064            pending_count: 0,
3065            launch_type: "FARGATE".into(),
3066            status: "ACTIVE".into(),
3067            scheduling_strategy: "REPLICA".into(),
3068            deployment_controller: "ECS".into(),
3069            minimum_healthy_percent: Some(0),
3070            maximum_percent: Some(200),
3071            circuit_breaker: None,
3072            deployments: Vec::new(),
3073            load_balancers: vec![serde_json::json!({
3074                "targetGroupArn": "arn:aws:elasticloadbalancing:us-east-1:000000000000:targetgroup/tg/abc",
3075                "containerName": "app",
3076                "containerPort": 80,
3077            })],
3078            service_registries: Vec::new(),
3079            placement_constraints: Vec::new(),
3080            placement_strategy: Vec::new(),
3081            network_configuration: None,
3082            volume_configurations: vec![],
3083            tags: Vec::new(),
3084            created_at: Utc::now(),
3085            created_by: None,
3086            role_arn: None,
3087            platform_version: None,
3088            health_check_grace_period_seconds: None,
3089            enable_execute_command: false,
3090            enable_ecs_managed_tags: false,
3091            propagate_tags: None,
3092            capacity_provider_strategy: Vec::new(),
3093            availability_zone_rebalancing: None,
3094        };
3095        acct.services.insert(
3096            crate::state::EcsState::service_key("default", "svc"),
3097            service,
3098        );
3099
3100        let mut task = make_task("t1");
3101        task.group = Some("service:svc".into());
3102        task.attachments = vec![crate::state::TaskAttachment {
3103            id: "eni-123".into(),
3104            attachment_type: "eni".into(),
3105            status: "ATTACHED".into(),
3106            details: vec![
3107                crate::state::AttachmentDetail {
3108                    name: "privateIPv4Address".into(),
3109                    value: "172.18.0.2".into(),
3110                },
3111                crate::state::AttachmentDetail {
3112                    name: "macAddress".into(),
3113                    value: "02:42:ac:12:00:02".into(),
3114                },
3115            ],
3116        }];
3117        acct.tasks.insert("t1".into(), task);
3118
3119        let state = acct.clone();
3120        let targets = compute_elbv2_targets(&state, state.tasks.get("t1").unwrap());
3121        assert_eq!(targets.len(), 1);
3122        let (arn, tg_targets) = &targets[0];
3123        assert_eq!(
3124            arn,
3125            "arn:aws:elasticloadbalancing:us-east-1:000000000000:targetgroup/tg/abc"
3126        );
3127        assert_eq!(tg_targets.len(), 1);
3128        assert_eq!(tg_targets[0].0, "172.18.0.2");
3129        assert_eq!(tg_targets[0].1, Some(80));
3130    }
3131
3132    fn minimal_plan() -> ContainerPlan {
3133        ContainerPlan {
3134            container_name: "app".into(),
3135            image: "alpine".into(),
3136            env: Vec::new(),
3137            entry_point: Vec::new(),
3138            command: Vec::new(),
3139            secrets_refs: Vec::new(),
3140            essential: true,
3141            has_task_role: false,
3142            port_mappings: Vec::new(),
3143            network_mode: None,
3144            depends_on: Vec::new(),
3145            health_check: None,
3146            volume_mounts: Vec::new(),
3147            ulimits: Vec::new(),
3148            linux_parameters: None,
3149            stop_timeout: None,
3150            user: None,
3151            working_directory: None,
3152            tty: false,
3153            interactive: false,
3154            readonly_rootfs: false,
3155        }
3156    }
3157
3158    /// 4.1 — every ECS task container must carry the shared
3159    /// `fakecloud-instance` ownership label so the startup reaper picks it
3160    /// up after an ungraceful restart (it filters strictly on that label).
3161    #[test]
3162    fn build_run_argv_emits_fakecloud_instance_label() {
3163        let plan = minimal_plan();
3164        let argv = build_run_argv(
3165            &plan,
3166            &[],
3167            "task-1",
3168            "host.docker.internal",
3169            None,
3170            "alpine",
3171            true,
3172        );
3173        let expected = fakecloud_instance_label();
3174        assert!(
3175            argv.windows(2)
3176                .any(|w| w[0] == "--label" && w[1] == expected),
3177            "argv must contain `--label {expected}`: {argv:?}",
3178        );
3179    }
3180
3181    /// 4.1 — the label value must be exactly the shape the reaper parses:
3182    /// `fakecloud-instance=fakecloud-<pid>`. The reaper strips the
3183    /// `fakecloud-` prefix off the value and `parse::<u32>()`s the rest, so a
3184    /// non-numeric tail (e.g. a task id) would silently never reap.
3185    #[test]
3186    fn fakecloud_instance_label_matches_reaper_format() {
3187        let label = fakecloud_instance_label();
3188        let (key, value) = label.split_once('=').expect("label is key=value");
3189        assert_eq!(key, "fakecloud-instance");
3190        let pid_str = value
3191            .strip_prefix("fakecloud-")
3192            .expect("value starts with fakecloud-");
3193        assert_eq!(
3194            pid_str.parse::<u32>().ok(),
3195            Some(std::process::id()),
3196            "reaper must be able to parse the owning pid out of {label}",
3197        );
3198    }
3199}