Skip to main content

fakecloud_ecs/runtime/
mod.rs

1//! Docker/Podman-based ECS task execution.
2//!
3//! Mirrors the Lambda `ContainerRuntime` approach (auto-detect CLI, forward
4//! localhost → host.docker.internal) but scoped for ECS's different
5//! lifecycle: tasks are ephemeral, so there is no warm-container pool. Each
6//! `run_task` spawns a background tokio task that pulls the image, starts
7//! the container, waits for exit, captures logs, and updates shared ECS
8//! state in place.
9
10use std::path::PathBuf;
11use std::sync::Arc;
12use std::time::Duration;
13
14use base64::Engine;
15use chrono::Utc;
16use fakecloud_core::delivery::DeliveryBus;
17use fakecloud_logs::ingest::{append_events, IngestEvent};
18use fakecloud_logs::SharedLogsState;
19use fakecloud_secretsmanager::SharedSecretsManagerState;
20use fakecloud_ssm::SharedSsmState;
21use parking_lot::RwLock;
22use tempfile::TempDir;
23use tokio::process::Command;
24
25use crate::state::{LifecycleEvent, SharedEcsState};
26
27#[derive(Debug, thiserror::Error)]
28pub enum RuntimeError {
29    #[error("container CLI not found (tried docker, podman)")]
30    NoCli,
31    #[error("image pull failed: {0}")]
32    ImagePull(String),
33    #[error("container start failed: {0}")]
34    ContainerStart(String),
35    #[error("docker wait failed: {0}")]
36    Wait(String),
37}
38
39/// Docker/Podman executor for ECS tasks.
40pub struct EcsRuntime {
41    cli: String,
42    /// Container-to-host networking resolution (host alias, `--add-host`
43    /// arg, sibling-container address) shared with the other runtimes via
44    /// [`fakecloud_core::container_net`]. Carries the issue #1539 podman +
45    /// in-container fixes.
46    net: fakecloud_core::container_net::HostNetworking,
47    /// Port the main fakecloud server bound to. Used to translate AWS
48    /// ECR URIs (`<acct>.dkr.ecr.<region>.amazonaws.com/<repo>:<tag>`) to
49    /// the local OCI v2 endpoint (`127.0.0.1:<port>/<repo>:<tag>`) so
50    /// tasks can pull images pushed to fakecloud's own ECR.
51    server_port: u16,
52    /// Isolated DOCKER_CONFIG dir pre-populated with Basic auth for
53    /// `127.0.0.1:<port>`; keeps the host user's `~/.docker/config.json`
54    /// untouched and lets `docker pull` succeed against fakecloud ECR
55    /// without a prior `aws ecr get-login-password | docker login`.
56    docker_config: Option<Arc<TempDir>>,
57    /// Tracks per-task lists of `(container_name, docker_container_id)` so
58    /// `stop_task` can kill every container backing a task — multi-container
59    /// task definitions launch one docker container per `containerDefinitions`
60    /// entry, all of which must be torn down on stop.
61    containers: RwLock<std::collections::HashMap<String, Vec<(String, String)>>>,
62    /// Cross-service delivery bus — emits `aws.ecs` EventBridge events
63    /// on task state transitions when wired. `None` if the server started
64    /// without EventBridge configured (or for unit tests).
65    delivery_bus: Option<Arc<DeliveryBus>>,
66    /// CloudWatch Logs state — when set, tasks whose container definition
67    /// declares the `awslogs` log driver get their captured stdout/stderr
68    /// forwarded to a log group/stream under this shared state.
69    logs_state: Option<SharedLogsState>,
70    /// SecretsManager state for resolving `containerDefinition.secrets[]`
71    /// entries whose `valueFrom` is a SecretsManager ARN.
72    secretsmanager_state: Option<SharedSecretsManagerState>,
73    /// SSM Parameter Store state for resolving `secrets[]` entries whose
74    /// `valueFrom` is an SSM parameter ARN.
75    ssm_state: Option<SharedSsmState>,
76    /// `Some` when running on the Kubernetes backend; `run_task` then maps
77    /// each task to a Pod instead of `docker run`. `None` is the default
78    /// Docker/Podman backend, and the fields above drive it.
79    k8s: Option<k8s::K8sTaskBackend>,
80}
81
82mod config;
83mod k8s;
84mod lb;
85mod monitoring;
86mod secrets;
87mod task_lifecycle;
88
89impl EcsRuntime {
90    /// Auto-detect Docker or Podman. Returns `None` if neither is
91    /// available. Honours `FAKECLOUD_CONTAINER_CLI` for explicit override.
92    /// `server_port` is the port the main fakecloud server bound to;
93    /// needed to resolve AWS ECR URIs against the local OCI v2 registry.
94    pub fn new(server_port: u16) -> Option<Self> {
95        let cli = fakecloud_core::container_net::detect_container_cli()?;
96        let net = fakecloud_core::container_net::HostNetworking::detect(&cli);
97        let docker_config = build_local_registry_docker_config(server_port).map(Arc::new);
98        Some(Self {
99            cli,
100            net,
101            server_port,
102            docker_config,
103            containers: RwLock::new(std::collections::HashMap::new()),
104            delivery_bus: None,
105            logs_state: None,
106            secretsmanager_state: None,
107            ssm_state: None,
108            k8s: None,
109        })
110    }
111
112    /// Construct the Kubernetes backend. `server_port` is fakecloud's
113    /// bound port (used when `FAKECLOUD_K8S_SELF_URL` omits one). Fails
114    /// fast on misconfiguration — never silently degrades to Docker.
115    pub async fn new_k8s(server_port: u16) -> Result<Self, k8s::BackendInitError> {
116        let backend = k8s::K8sTaskBackend::from_env(server_port).await?;
117        // Docker fields are inert on the k8s backend; populate the cheap
118        // ones and leave docker_config unset.
119        let net = fakecloud_core::container_net::HostNetworking {
120            host_alias: String::new(),
121            add_host_arg: None,
122            sibling_host: String::new(),
123        };
124        Ok(Self {
125            cli: String::new(),
126            net,
127            server_port,
128            docker_config: None,
129            containers: RwLock::new(std::collections::HashMap::new()),
130            delivery_bus: None,
131            logs_state: None,
132            secretsmanager_state: None,
133            ssm_state: None,
134            k8s: Some(backend),
135        })
136    }
137
138    /// Backend name for logging.
139    pub fn cli_name(&self) -> &str {
140        if self.k8s.is_some() {
141            "kubernetes"
142        } else {
143            &self.cli
144        }
145    }
146
147    /// Sweep task Pods orphaned by a previous process (k8s only; no-op on
148    /// the Docker backend, handled by the shared container reaper).
149    pub async fn reap_stale(&self) {
150        if let Some(k) = &self.k8s {
151            k.reap_stale().await;
152        }
153    }
154
155    /// Wire EventBridge delivery so task state transitions emit
156    /// `aws.ecs` / `ECS Task State Change` events.
157    pub fn with_delivery_bus(mut self, bus: Arc<DeliveryBus>) -> Self {
158        self.delivery_bus = Some(bus);
159        self
160    }
161
162    /// Wire CloudWatch Logs state so tasks using the `awslogs` driver
163    /// get their captured stdout/stderr forwarded.
164    pub fn with_logs(mut self, logs: SharedLogsState) -> Self {
165        self.logs_state = Some(logs);
166        self
167    }
168}
169
170/// Per-container launch plan derived from a task definition.
171#[derive(Clone, Debug)]
172pub(crate) struct ContainerPlan {
173    pub(crate) container_name: String,
174    pub(crate) image: String,
175    pub(crate) env: Vec<(String, String)>,
176    pub(crate) entry_point: Vec<String>,
177    pub(crate) command: Vec<String>,
178    pub(crate) secrets_refs: Vec<(String, String)>,
179    pub(crate) essential: bool,
180    pub(crate) has_task_role: bool,
181    /// Port mappings parsed from the task definition. Each entry becomes
182    /// a `--publish containerPort:hostPort/protocol` flag on the docker
183    /// run command (except for `awsvpc`, where ports are exposed via the
184    /// per-task ENI rather than the docker host's port table).
185    pub(crate) port_mappings: Vec<PortMapping>,
186    /// Task-level network mode propagated to every container plan so the
187    /// argv builder can decide whether to emit `--publish` flags. Real
188    /// ECS treats `awsvpc` as "container is on its own ENI"; the
189    /// equivalent in fakecloud is "don't publish to the host".
190    pub(crate) network_mode: Option<String>,
191    /// Container dependencies parsed from `dependsOn[]`. Each entry pairs
192    /// the target container name with the condition that must be observed
193    /// before this container is launched: `START` (target exists/running),
194    /// `COMPLETE` (target exited, any code), `SUCCESS` (target exited with
195    /// code 0), or `HEALTHY` (target's docker `Health.Status` is `healthy`).
196    /// Used both to topologically order the launch loop and to gate each
197    /// `docker run` on the upstream condition.
198    pub(crate) depends_on: Vec<DependsOn>,
199    /// Parsed `healthCheck` from the task definition. Translated into
200    /// docker `--health-*` flags on `docker run` so the container's
201    /// health is observable via `docker inspect .State.Health.Status`.
202    /// `None` when the task definition doesn't declare a healthCheck;
203    /// the container's `healthStatus` then stays `UNKNOWN` (matching ECS
204    /// behaviour for tasks without a health probe).
205    pub(crate) health_check: Option<HealthCheckSpec>,
206    /// Volume mounts resolved by joining the container definition's
207    /// `mountPoints[]` with the task definition's `volumes[]`. Each entry
208    /// renders as one `-v` flag on the `docker run` invocation. Empty when
209    /// the container has no mount points or no matching volume entries.
210    pub(crate) volume_mounts: Vec<VolumeMount>,
211    /// Parsed `ulimits` from the container definition. Each entry becomes
212    /// `--ulimit <name>=<soft>:<hard>` on `docker run`.
213    pub(crate) ulimits: Vec<Ulimit>,
214    /// Parsed `linuxParameters` from the container definition. Emits
215    /// `--cap-add`, `--cap-drop`, `--device`, `--init`, `--shm-size`,
216    /// `--sysctl`, `--tmpfs`, `--privileged`, and `--read-only` flags.
217    pub(crate) linux_parameters: Option<LinuxParameters>,
218    /// `stopTimeout` in seconds. Becomes `--stop-timeout <N>` on `docker run`.
219    pub(crate) stop_timeout: Option<u32>,
220    /// `user` from the container definition. Becomes `--user <value>`.
221    pub(crate) user: Option<String>,
222    /// `workingDirectory` from the container definition. Becomes `--workdir`.
223    pub(crate) working_directory: Option<String>,
224    /// `tty` from the container definition. Emits `--tty` when true.
225    pub(crate) tty: bool,
226    /// `interactive` from the container definition. Emits `--interactive` when true.
227    pub(crate) interactive: bool,
228    /// `readonlyRootFilesystem` from the container definition. Emits `--read-only` when true.
229    pub(crate) readonly_rootfs: bool,
230}
231
232/// One parsed `dependsOn[]` entry on a container. Pairs the upstream
233/// container name with the condition that must hold before the dependent
234/// container is launched. AWS spells the conditions `START`, `COMPLETE`,
235/// `SUCCESS`, `HEALTHY` and treats anything else as an error at register
236/// time — we mirror that in [`parse_depends_on`].
237#[derive(Clone, Debug, PartialEq, Eq)]
238pub(crate) struct DependsOn {
239    pub container_name: String,
240    pub condition: DependsOnCondition,
241}
242
/// The `dependsOn[].condition` value of a task definition.
///
/// Each variant corresponds to exactly one documented AWS spelling. Before
/// starting a dependent container, the launch loop polls docker until the
/// matching predicate holds.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum DependsOnCondition {
    /// The upstream container has been started: the docker container
    /// exists and is running or has already exited.
    Start,
    /// The upstream container has exited, with any exit code.
    Complete,
    /// The upstream container has exited with code 0.
    Success,
    /// The upstream container reports `Health.Status == healthy`. An
    /// upstream without a healthCheck satisfies this immediately, as in AWS.
    Healthy,
}

impl DependsOnCondition {
    /// Parse an AWS-spelled condition (case-sensitive).
    ///
    /// Returns `None` for anything unrecognised, so callers can raise a
    /// `ClientException` at register time.
    pub fn parse(raw: &str) -> Option<Self> {
        // Every variant, so parsing is the exact inverse of `as_aws_str`.
        let known = [Self::Start, Self::Complete, Self::Success, Self::Healthy];
        known
            .iter()
            .copied()
            .find(|candidate| candidate.as_aws_str() == raw)
    }

    /// The AWS spelling of this condition. Error messages use it so
    /// timeout/dependency-failed reasons echo exactly what the user wrote.
    pub fn as_aws_str(self) -> &'static str {
        match self {
            Self::Start => "START",
            Self::Complete => "COMPLETE",
            Self::Success => "SUCCESS",
            Self::Healthy => "HEALTHY",
        }
    }
}
287
/// A container `healthCheck` from the ECS task definition.
///
/// Each field corresponds to one docker `--health-*` flag on `docker run`.
/// AWS defaults (interval 30s, timeout 5s, retries 3, startPeriod 0s) are
/// applied while parsing, so the argv builder always has concrete values.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct HealthCheckSpec {
    /// The task definition's `command[]`. Its first element picks the
    /// docker form:
    /// - `CMD-SHELL` and `CMD`: the remaining elements, space-joined, go
    ///   into `--health-cmd` (docker has no argv form for this flag).
    /// - `NONE`: no health flags are emitted.
    pub command: Vec<String>,
    /// Seconds between probes.
    pub interval_seconds: u32,
    /// Seconds before a single probe counts as failed.
    pub timeout_seconds: u32,
    /// Consecutive failures before the container is marked unhealthy.
    pub retries: u32,
    /// Grace period in seconds before failures start counting.
    pub start_period_seconds: u32,
}
306
/// A single `portMappings[]` entry.
///
/// Kept in the AWS shape so [`build_run_argv`] and the `networkBindings`
/// response work from the same parsed value.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct PortMapping {
    /// Port the container listens on.
    pub container_port: u16,
    /// Host-side port. `0` (or absent in the source JSON) means "same as
    /// `container_port`", the host-mode default per AWS docs.
    pub host_port: u16,
    /// Lower-case `tcp` or `udp`; `tcp` when the JSON omits it.
    pub protocol: String,
}
319
/// One resolved `mountPoints[]` entry on a container plan.
///
/// It is computed at launch by joining the container definition's
/// `mountPoints[]` with the task definition's `volumes[]`. Each entry
/// becomes a single `-v <source>:<containerPath>[:ro]` flag on `docker run`.
///
/// How `source` is chosen, by volume kind:
/// - **Host bind** (`host.sourcePath` set and non-empty): the host path
///   itself. The directory is created best-effort before launch.
/// - **EFS** (`efsVolumeConfiguration`): the docker *named volume*
///   `fakecloud-efs-<fileSystemId>[-<rootDirectory>]`, with segments
///   sanitized to `[A-Za-z0-9_.-]`. Every task mounting the same filesystem
///   id and root shares one volume on the docker daemon, mirroring real
///   EFS's "many tasks, one filesystem" semantics.
/// - **FSx for Windows** (`fsxWindowsFileServerVolumeConfiguration`): the
///   same scheme with a `fakecloud-fsx-` prefix.
/// - **Docker volume** (`dockerVolumeConfiguration`): the volume `name`,
///   passed through as a named-volume reference.
/// - **Bare volume** (only `name`): also the volume `name`.
///   NOTE(review): docker treats `-v <name>:<path>` as a *named* volume,
///   so this is shared by every task using the same name. AWS's default
///   scope is per-task. Confirm this is intended.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct VolumeMount {
    /// Left side of `-v`: a host path for bind mounts, or a docker volume
    /// name for every other kind (see the type docs).
    pub source: String,
    /// Container-side path, copied verbatim from
    /// `mountPoints[].containerPath`.
    pub container_path: String,
    /// `mountPoints[].readOnly`. When true, `:ro` is appended so the mount
    /// is read-only inside the container. Defaults to false (read-write)
    /// when omitted.
    pub read_only: bool,
}
355
/// A single `ulimits[]` entry, rendered as `--ulimit <name>=<soft>:<hard>`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct Ulimit {
    /// Resource name as written in the task definition (e.g. `nofile`).
    pub name: String,
    /// Soft limit value.
    pub soft_limit: i32,
    /// Hard limit value.
    pub hard_limit: i32,
}
363
/// A single `linuxParameters.devices[]` entry.
///
/// Rendered as a docker `--device` flag combining host path, container
/// path and permissions. NOTE(review): docker's syntax is
/// `host:container[:perms]`; confirm the argv builder inserts the
/// separating colon before `permissions`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct Device {
    /// Device path on the host.
    pub host_path: String,
    /// Path the device appears at inside the container.
    pub container_path: String,
    /// Access permissions string.
    pub permissions: String,
}
371
/// A single `linuxParameters.sysctl` entry, rendered as
/// `--sysctl <name>=<value>`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct Sysctl {
    /// Kernel parameter name (e.g. `net.core.somaxconn`).
    pub name: String,
    /// Value assigned to the parameter.
    pub value: String,
}
378
379/// Parsed `linuxParameters` from the container definition.
380#[derive(Clone, Debug, PartialEq, Eq, Default)]
381pub(crate) struct LinuxParameters {
382    pub capabilities_add: Vec<String>,
383    pub capabilities_drop: Vec<String>,
384    pub devices: Vec<Device>,
385    pub init_process_enabled: bool,
386    pub shared_memory_size: Option<i32>,
387    pub sysctls: Vec<Sysctl>,
388    pub tmpfs: Vec<Tmpfs>,
389    pub privileged: bool,
390}
391
/// A single `linuxParameters.tmpfs[]` entry, rendered as
/// `--tmpfs <containerPath>:size=<size>M<,options>*`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct Tmpfs {
    /// Mount point inside the container.
    pub container_path: String,
    /// Size, emitted with an `M` suffix.
    pub size: i32,
    /// Extra mount options appended after the size, comma-separated.
    pub mount_options: Vec<String>,
}
399
400#[derive(Clone, Debug)]
401struct ResolvedContainerPlan {
402    plan: ContainerPlan,
403    env: Vec<(String, String)>,
404}
405
/// What happened when the container that governs a task's lifetime exited.
#[derive(Clone, Debug)]
struct TaskExitOutcome {
    /// Position, in the started-containers list, of the container whose exit
    /// ended the task. It is `None` only in degenerate cases; it stays an
    /// `Option` so indexing into `final_containers` is always explicit.
    exited_index: Option<usize>,
    /// Exit code reported for that container.
    exit_code: i64,
    /// ECS stop code recorded on the task.
    stop_code: &'static str,
}
416
417/// Per-container record persisted on the task. Mirrors the AWS Container
418/// shape but tracks the docker-side container id alongside ECS metadata.
419#[derive(Clone, Debug)]
420pub(crate) struct RunningContainer {
421    pub(crate) name: String,
422    pub(crate) container_id: String,
423    pub(crate) essential: bool,
424    pub(crate) exit_code: Option<i64>,
425    /// Resolved `networkBindings` for DescribeTasks. Computed from the
426    /// task definition's `portMappings` at launch and surfaced verbatim
427    /// in the per-container response.
428    pub(crate) network_bindings: Vec<serde_json::Value>,
429    /// Image digest captured from `docker inspect` after pull. AWS
430    /// surfaces this on the Container response so callers can pin which
431    /// exact image revision a task is running. `None` when the inspect
432    /// failed or the CLI didn't expose `RepoDigests`.
433    pub(crate) image_digest: Option<String>,
434}
435
436/// Pure decision: does the current set of containers warrant stopping
437/// the task? Returns true when any essential container has exited, or
438/// when every container has exited (regardless of essential). Mirrors
439/// AWS ECS task lifetime semantics.
440pub(crate) fn task_should_stop(containers: &[RunningContainer]) -> bool {
441    if containers.is_empty() {
442        return true;
443    }
444    let any_essential_exited = containers
445        .iter()
446        .any(|c| c.essential && c.exit_code.is_some());
447    if any_essential_exited {
448        return true;
449    }
450    containers.iter().all(|c| c.exit_code.is_some())
451}
452
453/// True if the task's `desired_status` in state is `STOPPED` — i.e. a
454/// StopTask / scale-down / DeleteService raced the launch and asked for this
455/// task to be killed. A missing task (deleted from state mid-launch) also
456/// counts as "stop": there's nothing left to keep running for.
457pub(crate) fn task_desired_stopped(
458    state: &SharedEcsState,
459    account_id: &str,
460    task_id: &str,
461) -> bool {
462    let accounts = state.read();
463    match accounts.get(account_id).and_then(|s| s.tasks.get(task_id)) {
464        Some(task) => task.desired_status == "STOPPED",
465        None => true,
466    }
467}
468
469fn build_container_plans(
470    state: &SharedEcsState,
471    account_id: &str,
472    task_id: &str,
473    _server_port: u16,
474) -> Result<Vec<ContainerPlan>, RuntimeError> {
475    let accounts = state.read();
476    let s = accounts
477        .get(account_id)
478        .ok_or_else(|| RuntimeError::ContainerStart("account missing".into()))?;
479    let task = s
480        .tasks
481        .get(task_id)
482        .ok_or_else(|| RuntimeError::ContainerStart("task missing".into()))?;
483    if task.containers.is_empty() {
484        return Err(RuntimeError::ContainerStart(
485            "task has no containers".into(),
486        ));
487    }
488    let has_task_role = task.task_role_arn.is_some();
489    let task_def = s
490        .task_definitions
491        .get(&task.family)
492        .and_then(|revs| revs.get(&task.revision));
493    let network_mode = task_def.and_then(|td| td.network_mode.clone());
494    // Index `volumes[]` by name so each container's `mountPoints[]` can
495    // resolve its volume in O(1). Real ECS rejects mountPoints that
496    // reference an undeclared volume at register time; we don't yet, so
497    // unresolved names just produce zero mounts at launch.
498    let volumes_by_name: std::collections::HashMap<String, &serde_json::Value> = task_def
499        .map(|td| {
500            td.volumes
501                .iter()
502                .filter_map(|v| {
503                    let name = v.get("name").and_then(|n| n.as_str())?;
504                    Some((name.to_string(), v))
505                })
506                .collect()
507        })
508        .unwrap_or_default();
509    let mut plans = Vec::with_capacity(task.containers.len());
510    for container in &task.containers {
511        let def = find_container_definition(s, &task.family, task.revision, &container.name);
512        let secrets_refs = def
513            .as_ref()
514            .and_then(|d| d.get("secrets").and_then(|v| v.as_array()).cloned())
515            .map(|arr| {
516                arr.iter()
517                    .filter_map(|e| {
518                        let name = e.get("name").and_then(|v| v.as_str())?.to_string();
519                        let value_from = e.get("valueFrom").and_then(|v| v.as_str())?.to_string();
520                        Some((name, value_from))
521                    })
522                    .collect::<Vec<_>>()
523            })
524            .unwrap_or_default();
525        let str_array = |key: &str| -> Vec<String> {
526            def.as_ref()
527                .and_then(|d| d.get(key).and_then(|v| v.as_array()).cloned())
528                .map(|arr| {
529                    arr.iter()
530                        .filter_map(|v| v.as_str().map(String::from))
531                        .collect::<Vec<_>>()
532                })
533                .unwrap_or_default()
534        };
535        let env = def
536            .as_ref()
537            .and_then(|d| d.get("environment").and_then(|v| v.as_array()).cloned())
538            .map(|arr| {
539                arr.iter()
540                    .filter_map(|e| {
541                        let k = e.get("name").and_then(|v| v.as_str())?;
542                        let v = e.get("value").and_then(|v| v.as_str()).unwrap_or("");
543                        Some((k.to_string(), v.to_string()))
544                    })
545                    .collect::<Vec<_>>()
546            })
547            .unwrap_or_default();
548        let port_mappings = def
549            .as_ref()
550            .and_then(|d| d.get("portMappings").and_then(|v| v.as_array()).cloned())
551            .map(|arr| {
552                arr.iter()
553                    .filter_map(parse_port_mapping)
554                    .collect::<Vec<_>>()
555            })
556            .unwrap_or_default();
557        let depends_on = def
558            .as_ref()
559            .and_then(|d| d.get("dependsOn").and_then(|v| v.as_array()).cloned())
560            .map(|arr| {
561                arr.iter()
562                    .filter_map(parse_depends_on_entry)
563                    .collect::<Vec<_>>()
564            })
565            .unwrap_or_default();
566        let health_check = def
567            .as_ref()
568            .and_then(|d| d.get("healthCheck"))
569            .and_then(parse_health_check);
570        let volume_mounts = def
571            .as_ref()
572            .and_then(|d| d.get("mountPoints").and_then(|v| v.as_array()).cloned())
573            .map(|arr| {
574                arr.iter()
575                    .filter_map(|mp| resolve_mount_point(mp, &volumes_by_name))
576                    .collect::<Vec<_>>()
577            })
578            .unwrap_or_default();
579        let ulimits = def
580            .as_ref()
581            .and_then(|d| d.get("ulimits").and_then(|v| v.as_array()).cloned())
582            .map(|arr| arr.iter().filter_map(parse_ulimit).collect::<Vec<_>>())
583            .unwrap_or_default();
584        let linux_parameters = def
585            .as_ref()
586            .and_then(|d| d.get("linuxParameters"))
587            .and_then(parse_linux_parameters);
588        let stop_timeout = def.as_ref().and_then(|d| {
589            d.get("stopTimeout")
590                .and_then(|v| v.as_u64())
591                .map(|n| n as u32)
592        });
593        let user = def
594            .as_ref()
595            .and_then(|d| d.get("user").and_then(|v| v.as_str()).map(String::from));
596        let working_directory = def.as_ref().and_then(|d| {
597            d.get("workingDirectory")
598                .and_then(|v| v.as_str())
599                .map(String::from)
600        });
601        let tty = def
602            .as_ref()
603            .and_then(|d| d.get("tty").and_then(|v| v.as_bool()))
604            .unwrap_or(false);
605        let interactive = def
606            .as_ref()
607            .and_then(|d| d.get("interactive").and_then(|v| v.as_bool()))
608            .unwrap_or(false);
609        let readonly_rootfs = def
610            .as_ref()
611            .and_then(|d| d.get("readonlyRootFilesystem").and_then(|v| v.as_bool()))
612            .unwrap_or(false);
613        plans.push(ContainerPlan {
614            container_name: container.name.clone(),
615            image: container.image.clone(),
616            env,
617            entry_point: str_array("entryPoint"),
618            command: str_array("command"),
619            secrets_refs,
620            essential: container.essential,
621            has_task_role,
622            port_mappings,
623            network_mode: network_mode.clone(),
624            depends_on,
625            health_check,
626            volume_mounts,
627            ulimits,
628            linux_parameters,
629            stop_timeout,
630            user,
631            working_directory,
632            tty,
633            interactive,
634            readonly_rootfs,
635        });
636    }
637    let plans = topo_sort_plans(plans);
638    Ok(plans)
639}
640
641/// Resolve one `mountPoints[]` entry against the indexed task-definition
642/// volumes. Returns `None` when:
643/// - the entry has no `containerPath` or `sourceVolume`,
644/// - the named volume isn't declared on the task definition.
645///
646/// Returns `Some(VolumeMount)` for every supported volume kind:
647/// host bind, EFS, FSx, named docker volume, anonymous docker volume.
648fn resolve_mount_point(
649    mount_point: &serde_json::Value,
650    volumes_by_name: &std::collections::HashMap<String, &serde_json::Value>,
651) -> Option<VolumeMount> {
652    let container_path = mount_point
653        .get("containerPath")
654        .and_then(|v| v.as_str())?
655        .to_string();
656    let source_volume = mount_point.get("sourceVolume").and_then(|v| v.as_str())?;
657    let read_only = mount_point
658        .get("readOnly")
659        .and_then(|v| v.as_bool())
660        .unwrap_or(false);
661    let volume = volumes_by_name.get(source_volume)?;
662    let source = resolve_volume_source(source_volume, volume)?;
663    Some(VolumeMount {
664        source,
665        container_path,
666        read_only,
667    })
668}
669
670/// Map a single task-definition `volumes[]` entry to the source side of a
671/// `docker run -v` flag. The matching here mirrors the AWS volume kinds:
672///
673/// 1. `host.sourcePath` -> use that path directly (bind mount).
674/// 2. `efsVolumeConfiguration.fileSystemId` -> stub directory under
675///    `/tmp/fakecloud/efs/<filesystemId>[/<rootDirectory>]`. Created with
676///    `mkdir -p` so different tasks targeting the same filesystem id
677///    share the same host directory, matching real EFS's "many tasks,
678///    one filesystem" semantics.
679/// 3. `fsxWindowsFileServerVolumeConfiguration.fileSystemId` -> stub
680///    directory under `/tmp/fakecloud/fsx/<filesystemId>/<rootDirectory>`.
681/// 4. `dockerVolumeConfiguration` -> the volume `name` itself (named
682///    docker volume; docker creates it on first reference).
683/// 5. Bare entry (only `name`) -> the volume `name` as an anonymous
684///    docker volume reference, matching AWS's "Docker volumes" default.
685///
686/// Returns `None` when the configuration is malformed (e.g. EFS without
687/// a fileSystemId).
688fn resolve_volume_source(name: &str, volume: &serde_json::Value) -> Option<String> {
689    if let Some(host) = volume.get("host") {
690        if let Some(path) = host.get("sourcePath").and_then(|v| v.as_str()) {
691            // Empty sourcePath means "anonymous host volume" — fall
692            // through to the named-volume default below.
693            if !path.is_empty() {
694                ensure_dir_exists(path);
695                return Some(path.to_string());
696            }
697        }
698    }
699    if let Some(efs) = volume.get("efsVolumeConfiguration") {
700        let fs_id = efs.get("fileSystemId").and_then(|v| v.as_str())?;
701        let root = efs
702            .get("rootDirectory")
703            .and_then(|v| v.as_str())
704            .unwrap_or("/");
705        return Some(shared_volume_name("efs", fs_id, root));
706    }
707    if let Some(fsx) = volume.get("fsxWindowsFileServerVolumeConfiguration") {
708        let fs_id = fsx.get("fileSystemId").and_then(|v| v.as_str())?;
709        let root = fsx
710            .get("rootDirectory")
711            .and_then(|v| v.as_str())
712            .unwrap_or("/");
713        return Some(shared_volume_name("fsx", fs_id, root));
714    }
715    if volume.get("dockerVolumeConfiguration").is_some() {
716        // Named docker volume — docker auto-creates it on first
717        // reference. Pass the volume name through verbatim.
718        return Some(name.to_string());
719    }
720    // Bare volume entry: anonymous docker volume keyed by name.
721    Some(name.to_string())
722}
723
724/// Compose the docker **named-volume** name for an EFS/FSx volume. A
725/// single shared volume per filesystem id when `rootDirectory` is unset
726/// or `/` (the EFS default mount target); otherwise the rootDirectory is
727/// folded into the name so distinct mount targets within one filesystem
728/// stay isolated. A docker named volume lives on the daemon rather than a
729/// host path, so tasks share state correctly *and* it works when
730/// fakecloud itself runs in a container (`FAKECLOUD_IN_CONTAINER=1`),
731/// where a host-path stub created inside fakecloud's own filesystem would
732/// resolve to an empty dir against the host daemon (issue #1539, bug 0.6).
733/// The segments are sanitized to docker's volume-name charset.
734fn shared_volume_name(kind: &str, fs_id: &str, root: &str) -> String {
735    let trimmed = root.trim_start_matches('/').trim_end_matches('/');
736    let fs_id = sanitize_volume_segment(fs_id);
737    if trimmed.is_empty() {
738        format!("fakecloud-{kind}-{fs_id}")
739    } else {
740        format!(
741            "fakecloud-{kind}-{fs_id}-{}",
742            sanitize_volume_segment(trimmed)
743        )
744    }
745}
746
/// Rewrite `s` so that it only contains characters docker accepts in a
/// volume name. `[A-Za-z0-9_.-]` pass through, and every other `char`
/// (including non-ASCII ones) becomes a single `-`.
fn sanitize_volume_segment(s: &str) -> String {
    let mut sanitized = String::with_capacity(s.len());
    for ch in s.chars() {
        let allowed = ch.is_ascii_alphanumeric() || ch == '_' || ch == '.' || ch == '-';
        sanitized.push(if allowed { ch } else { '-' });
    }
    sanitized
}
760
/// Best-effort `mkdir -p` for a task definition's `host.sourcePath`, so the
/// directory exists before docker bind-mounts it.
///
/// Errors are deliberately ignored. `docker run` reports a clear error if
/// the mount really can't be made, and some sandboxes (e.g. unit tests)
/// can't write to arbitrary host paths.
///
/// NOTE: this creates the directory in fakecloud's own filesystem. When
/// fakecloud itself runs in a container, that is not the docker daemon's
/// host filesystem. EFS/FSx volumes no longer come through here; they use
/// docker named volumes instead.
fn ensure_dir_exists(path: &str) {
    let _ = std::fs::create_dir_all(path);
}
768
769/// Parse one `dependsOn[]` entry. Returns `None` for malformed entries
770/// (missing `containerName`, unrecognised `condition`) so the caller
771/// can drop them silently from the launch plan — register-time
772/// validation already rejects bad values; this is a defensive fallback.
773fn parse_depends_on_entry(value: &serde_json::Value) -> Option<DependsOn> {
774    let container_name = value
775        .get("containerName")
776        .and_then(|v| v.as_str())?
777        .to_string();
778    let raw_condition = value.get("condition").and_then(|v| v.as_str())?;
779    let condition = DependsOnCondition::parse(raw_condition)?;
780    Some(DependsOn {
781        container_name,
782        condition,
783    })
784}
785
786/// Topologically sort container plans so `dependsOn` dependencies start
787/// before their dependants. Implements Kahn's algorithm with stable order:
788/// when multiple plans are ready, we keep their original declaration
789/// index, so a task without any dependsOn launches in the same order the
790/// user wrote in the task definition. Cycles fall through with the
791/// remaining plans appended in original order — the runtime will still
792/// launch every container; it just can't guarantee dependency ordering
793/// in that degenerate case. Cycles are rejected at register time
794/// (RegisterTaskDefinition -> validate_depends_on_acyclic), so reaching
795/// that branch from a real launch path means a bug elsewhere.
796fn topo_sort_plans(plans: Vec<ContainerPlan>) -> Vec<ContainerPlan> {
797    use std::collections::{HashMap, HashSet};
798    let names: HashSet<String> = plans.iter().map(|p| p.container_name.clone()).collect();
799    let index: HashMap<String, usize> = plans
800        .iter()
801        .enumerate()
802        .map(|(i, p)| (p.container_name.clone(), i))
803        .collect();
804    // in_degree[i] = number of unresolved dependencies for plan i. We
805    // ignore depends_on entries that name a container not in the task
806    // (real ECS rejects those at register time; our register path doesn't
807    // yet, so be defensive here).
808    let mut in_degree: Vec<usize> = plans
809        .iter()
810        .map(|p| {
811            p.depends_on
812                .iter()
813                .filter(|d| names.contains(&d.container_name))
814                .count()
815        })
816        .collect();
817    // dependants[i] = indices of plans that depend on plan i.
818    let mut dependants: Vec<Vec<usize>> = vec![Vec::new(); plans.len()];
819    for (i, p) in plans.iter().enumerate() {
820        for d in &p.depends_on {
821            if let Some(&di) = index.get(&d.container_name) {
822                dependants[di].push(i);
823            }
824        }
825    }
826    let mut ordered: Vec<ContainerPlan> = Vec::with_capacity(plans.len());
827    let mut emitted: Vec<bool> = vec![false; plans.len()];
828    loop {
829        // Pick the lowest-index plan whose in_degree is 0 to keep stable
830        // order across runs.
831        let next = (0..plans.len()).find(|&i| !emitted[i] && in_degree[i] == 0);
832        match next {
833            Some(i) => {
834                emitted[i] = true;
835                ordered.push(plans[i].clone());
836                for &di in &dependants[i] {
837                    if in_degree[di] > 0 {
838                        in_degree[di] -= 1;
839                    }
840                }
841            }
842            None => break,
843        }
844    }
845    // Cycle: append anything left in original order so we don't drop plans.
846    for (i, p) in plans.into_iter().enumerate() {
847        if !emitted[i] {
848            ordered.push(p);
849        }
850    }
851    ordered
852}
853
854/// Validate that `containerDefinitions[].dependsOn[]` graph is acyclic.
855/// Real ECS rejects cyclic dependencies at RegisterTaskDefinition time
856/// with a `ClientException`; we mirror that. Returns the offending pair
857/// of container names so the caller can produce a useful error.
858///
859/// Operates directly on the raw JSON definitions (rather than parsed
860/// `ContainerPlan`s) so register-time validation doesn't have to first
861/// build a full plan from a not-yet-stored task definition.
862pub(crate) fn find_depends_on_cycle(
863    container_definitions: &[serde_json::Value],
864) -> Option<(String, String)> {
865    use std::collections::HashMap;
866
867    let names: Vec<String> = container_definitions
868        .iter()
869        .filter_map(|c| c.get("name").and_then(|n| n.as_str()).map(String::from))
870        .collect();
871    let index: HashMap<&str, usize> = names
872        .iter()
873        .enumerate()
874        .map(|(i, n)| (n.as_str(), i))
875        .collect();
876
877    let mut adj: Vec<Vec<usize>> = vec![Vec::new(); names.len()];
878    for (i, cd) in container_definitions.iter().enumerate() {
879        if i >= names.len() {
880            continue;
881        }
882        let Some(deps) = cd.get("dependsOn").and_then(|v| v.as_array()) else {
883            continue;
884        };
885        for d in deps {
886            let Some(target) = d.get("containerName").and_then(|v| v.as_str()) else {
887                continue;
888            };
889            if let Some(&j) = index.get(target) {
890                // Edge: i depends on j -> for cycle DFS we walk from i to j.
891                adj[i].push(j);
892            }
893        }
894    }
895
896    // DFS with three-colour marking (white=0, gray=1, black=2). When we
897    // hit a gray neighbour we've closed a cycle; report the back-edge as
898    // the offending pair.
899    let mut state = vec![0u8; names.len()];
900    let mut stack: Vec<(usize, usize)> = Vec::new();
901    for start in 0..names.len() {
902        if state[start] != 0 {
903            continue;
904        }
905        stack.clear();
906        stack.push((start, 0));
907        state[start] = 1;
908        while let Some(&(node, next_edge)) = stack.last() {
909            if next_edge < adj[node].len() {
910                let nb = adj[node][next_edge];
911                stack.last_mut().unwrap().1 += 1;
912                match state[nb] {
913                    0 => {
914                        state[nb] = 1;
915                        stack.push((nb, 0));
916                    }
917                    1 => {
918                        return Some((names[node].clone(), names[nb].clone()));
919                    }
920                    _ => {}
921                }
922            } else {
923                state[node] = 2;
924                stack.pop();
925            }
926        }
927    }
928    None
929}
930
/// Snapshot of the docker container state we care about for `dependsOn`
/// gating: whether the container has started, whether it has exited, its
/// exit code, and (when a healthcheck is configured) its health status.
/// Produced by `inspect_container_state` and consumed by
/// `condition_is_met`.
#[derive(Debug, Clone)]
struct InspectedState {
    /// True once the container is running or has already finished
    /// (anything past docker's `created` status).
    started: bool,
    /// True when the container has finished. See `inspect_container_state`
    /// for which `.State.Status` values count as finished.
    exited: bool,
    /// `.State.ExitCode`; `-1` when the value could not be parsed.
    exit_code: i64,
    /// `.State.Health.Status` (e.g. `healthy`), or `None` when the
    /// container has no healthcheck.
    health: Option<String>,
}
941
942/// One `docker inspect` call returning every field needed by
943/// [`condition_is_met`]. Returns `None` when the container doesn't exist
944/// yet or inspect fails — the caller will simply retry on the next poll.
945async fn inspect_container_state(cli: &str, container_id: &str) -> Option<InspectedState> {
946    // Compose all four fields into a single inspect format so the gate
947    // costs one process spawn per poll rather than four.
948    let format =
949        "{{.State.Status}}|{{.State.Running}}|{{.State.ExitCode}}|{{if .State.Health}}{{.State.Health.Status}}{{else}}<none>{{end}}";
950    let out = Command::new(cli)
951        .args(["inspect", "-f", format, container_id])
952        .output()
953        .await
954        .ok()?;
955    if !out.status.success() {
956        return None;
957    }
958    let raw = String::from_utf8_lossy(&out.stdout).trim().to_string();
959    let parts: Vec<&str> = raw.split('|').collect();
960    if parts.len() < 4 {
961        return None;
962    }
963    let status = parts[0];
964    let running = parts[1] == "true";
965    let exit_code: i64 = parts[2].parse().unwrap_or(-1);
966    let health = match parts[3] {
967        "<none>" | "" => None,
968        other => Some(other.to_string()),
969    };
970    // `created` is the brief moment between docker creating the
971    // container and the entrypoint running. Treat anything past
972    // `created` as "started" for the START condition.
973    let started = running || status == "exited" || status == "running" || status == "dead";
974    let exited = status == "exited" || status == "dead";
975    Some(InspectedState {
976        started,
977        exited,
978        exit_code,
979        health,
980    })
981}
982
983/// Decide whether the polled `state` satisfies a `dependsOn[].condition`.
984/// Encapsulates the AWS semantics so the polling loop is purely
985/// mechanical.
986fn condition_is_met(condition: DependsOnCondition, state: &InspectedState) -> bool {
987    match condition {
988        DependsOnCondition::Start => state.started,
989        DependsOnCondition::Complete => state.exited,
990        DependsOnCondition::Success => state.exited && state.exit_code == 0,
991        DependsOnCondition::Healthy => state.health.as_deref() == Some("healthy"),
992    }
993}
994
/// Test-only re-export of [`parse_port_mapping`], so sibling test modules
/// can lock in the default-port / default-protocol behaviour without us
/// widening the visibility of the parser itself. Compiled only under
/// `cfg(test)`; it delegates without any extra logic.
#[cfg(test)]
pub(crate) fn __test_parse_port_mapping(value: &serde_json::Value) -> Option<PortMapping> {
    parse_port_mapping(value)
}
1002
1003/// Parse a `healthCheck` block from a task definition's container
1004/// definition. Returns `None` for missing `command` or for a command
1005/// whose first token is `NONE` (the AWS-documented "disable healthcheck
1006/// inherited from image" sentinel — emit no flags rather than a `none`
1007/// healthcheck). Defaults follow AWS: 30s/5s/3/0s.
1008fn parse_health_check(value: &serde_json::Value) -> Option<HealthCheckSpec> {
1009    let cmd_arr = value.get("command")?.as_array()?;
1010    let command: Vec<String> = cmd_arr
1011        .iter()
1012        .filter_map(|v| v.as_str().map(String::from))
1013        .collect();
1014    if command.is_empty() {
1015        return None;
1016    }
1017    if command.first().map(|s| s.as_str()) == Some("NONE") {
1018        return None;
1019    }
1020    let read_u32 = |key: &str, default: u32| -> u32 {
1021        value
1022            .get(key)
1023            .and_then(|v| v.as_i64())
1024            .filter(|n| (0..=u32::MAX as i64).contains(n))
1025            .map(|n| n as u32)
1026            .unwrap_or(default)
1027    };
1028    Some(HealthCheckSpec {
1029        command,
1030        interval_seconds: read_u32("interval", 30),
1031        timeout_seconds: read_u32("timeout", 5),
1032        retries: read_u32("retries", 3),
1033        start_period_seconds: read_u32("startPeriod", 0),
1034    })
1035}
1036
1037/// Parse one `ulimits` entry from the container definition JSON.
1038fn parse_ulimit(value: &serde_json::Value) -> Option<Ulimit> {
1039    let name = value.get("name").and_then(|v| v.as_str())?;
1040    let soft = value
1041        .get("softLimit")
1042        .and_then(|v| v.as_i64())
1043        .filter(|n| *n >= 0)? as i32;
1044    let hard = value
1045        .get("hardLimit")
1046        .and_then(|v| v.as_i64())
1047        .filter(|n| *n >= 0)? as i32;
1048    Some(Ulimit {
1049        name: name.to_string(),
1050        soft_limit: soft,
1051        hard_limit: hard,
1052    })
1053}
1054
1055/// Parse `linuxParameters` from the container definition JSON.
1056fn parse_linux_parameters(value: &serde_json::Value) -> Option<LinuxParameters> {
1057    let mut lp = LinuxParameters::default();
1058    if let Some(arr) = value
1059        .get("capabilities")
1060        .and_then(|v| v.get("add"))
1061        .and_then(|v| v.as_array())
1062    {
1063        lp.capabilities_add = arr
1064            .iter()
1065            .filter_map(|v| v.as_str().map(String::from))
1066            .collect();
1067    }
1068    if let Some(arr) = value
1069        .get("capabilities")
1070        .and_then(|v| v.get("drop"))
1071        .and_then(|v| v.as_array())
1072    {
1073        lp.capabilities_drop = arr
1074            .iter()
1075            .filter_map(|v| v.as_str().map(String::from))
1076            .collect();
1077    }
1078    if let Some(arr) = value.get("devices").and_then(|v| v.as_array()) {
1079        lp.devices = arr.iter().filter_map(parse_device).collect();
1080    }
1081    lp.init_process_enabled = value
1082        .get("initProcessEnabled")
1083        .and_then(|v| v.as_bool())
1084        .unwrap_or(false);
1085    lp.shared_memory_size = value
1086        .get("sharedMemorySize")
1087        .and_then(|v| v.as_i64())
1088        .map(|n| n as i32);
1089    if let Some(arr) = value.get("sysctl").and_then(|v| v.as_array()) {
1090        lp.sysctls = arr.iter().filter_map(parse_sysctl).collect();
1091    }
1092    if let Some(arr) = value.get("tmpfs").and_then(|v| v.as_array()) {
1093        lp.tmpfs = arr.iter().filter_map(parse_tmpfs).collect();
1094    }
1095    lp.privileged = value
1096        .get("privileged")
1097        .and_then(|v| v.as_bool())
1098        .unwrap_or(false);
1099    Some(lp)
1100}
1101
1102fn parse_device(value: &serde_json::Value) -> Option<Device> {
1103    let host_path = value.get("hostPath").and_then(|v| v.as_str())?.to_string();
1104    let container_path = value
1105        .get("containerPath")
1106        .and_then(|v| v.as_str())?
1107        .to_string();
1108    let permissions = value
1109        .get("permissions")
1110        .and_then(|v| v.as_str())
1111        .unwrap_or("rwm")
1112        .to_string();
1113    Some(Device {
1114        host_path,
1115        container_path,
1116        permissions,
1117    })
1118}
1119
1120fn parse_sysctl(value: &serde_json::Value) -> Option<Sysctl> {
1121    let name = value.get("name").and_then(|v| v.as_str())?.to_string();
1122    let value_str = value.get("value").and_then(|v| v.as_str())?.to_string();
1123    Some(Sysctl {
1124        name,
1125        value: value_str,
1126    })
1127}
1128
1129fn parse_tmpfs(value: &serde_json::Value) -> Option<Tmpfs> {
1130    let container_path = value
1131        .get("containerPath")
1132        .and_then(|v| v.as_str())?
1133        .to_string();
1134    let size = value
1135        .get("size")
1136        .and_then(|v| v.as_i64())
1137        .filter(|n| *n > 0)? as i32;
1138    let mount_options = value
1139        .get("mountOptions")
1140        .and_then(|v| v.as_array())
1141        .map(|arr| {
1142            arr.iter()
1143                .filter_map(|v| v.as_str().map(String::from))
1144                .collect()
1145        })
1146        .unwrap_or_default();
1147    Some(Tmpfs {
1148        container_path,
1149        size,
1150        mount_options,
1151    })
1152}
1153
1154/// Render a [`HealthCheckSpec`] into the docker run flags that emulate
1155/// the equivalent ECS healthCheck. AWS's `command[0]` is a sentinel
1156/// (`CMD-SHELL`/`CMD`/`NONE`); docker's `--health-cmd` always takes a
1157/// single shell-string, so we collapse the remaining tokens with spaces
1158/// for either sentinel — matching how docker itself stringifies HEALTHCHECK
1159/// CMD ["a","b"] back to a shell string at inspect time.
1160pub(crate) fn render_health_flags(hc: &HealthCheckSpec) -> Vec<String> {
1161    if hc.command.len() < 2 {
1162        return Vec::new();
1163    }
1164    let cmd_kind = hc.command[0].as_str();
1165    if cmd_kind != "CMD" && cmd_kind != "CMD-SHELL" {
1166        return Vec::new();
1167    }
1168    let cmd_string = hc.command[1..].join(" ");
1169    vec![
1170        "--health-cmd".into(),
1171        cmd_string,
1172        format!("--health-interval={}s", hc.interval_seconds),
1173        format!("--health-timeout={}s", hc.timeout_seconds),
1174        format!("--health-retries={}", hc.retries),
1175        format!("--health-start-period={}s", hc.start_period_seconds),
1176    ]
1177}
1178
/// Test-only re-export of [`parse_health_check`], so unit tests in sibling
/// modules can lock in the AWS default-fill behaviour without us widening
/// the parser's visibility. Compiled only under `cfg(test)`; it delegates
/// without any extra logic.
#[cfg(test)]
pub(crate) fn __test_parse_health_check(value: &serde_json::Value) -> Option<HealthCheckSpec> {
    parse_health_check(value)
}
1186
/// Translate docker's `.State.Health.Status` into ECS's `healthStatus`
/// vocabulary.
///
/// Docker reports `starting`, `healthy`, `unhealthy`, `none`, or an empty
/// string when no healthcheck exists. ECS only has `HEALTHY`, `UNHEALTHY`
/// and `UNKNOWN`. After trimming whitespace, `healthy` and `unhealthy` are
/// matched ASCII-case-insensitively; everything else becomes `UNKNOWN`.
pub(crate) fn docker_health_to_ecs(raw: &str) -> &'static str {
    let status = raw.trim();
    if status.eq_ignore_ascii_case("healthy") {
        "HEALTHY"
    } else if status.eq_ignore_ascii_case("unhealthy") {
        "UNHEALTHY"
    } else {
        "UNKNOWN"
    }
}
1199
1200/// Parse a single `portMappings[]` entry. Returns `None` for entries
1201/// that are missing `containerPort` or have a value out of `u16` range.
1202/// Defaults: `hostPort` -> `containerPort`, `protocol` -> `tcp`.
1203fn parse_port_mapping(value: &serde_json::Value) -> Option<PortMapping> {
1204    let container_port = value
1205        .get("containerPort")
1206        .and_then(|v| v.as_i64())
1207        .filter(|n| (0..=u16::MAX as i64).contains(n))? as u16;
1208    let host_port_raw = value
1209        .get("hostPort")
1210        .and_then(|v| v.as_i64())
1211        .filter(|n| (0..=u16::MAX as i64).contains(n))
1212        .map(|n| n as u16)
1213        .unwrap_or(0);
1214    let host_port = if host_port_raw == 0 {
1215        container_port
1216    } else {
1217        host_port_raw
1218    };
1219    let protocol = value
1220        .get("protocol")
1221        .and_then(|v| v.as_str())
1222        .map(|s| s.to_ascii_lowercase())
1223        .unwrap_or_else(|| "tcp".to_string());
1224    Some(PortMapping {
1225        container_port,
1226        host_port,
1227        protocol,
1228    })
1229}
1230
/// Ownership label (`key=value`, ready to follow a `--label` flag) that
/// marks a container as belonging to this fakecloud process:
/// `fakecloud-instance=fakecloud-<pid>`.
///
/// RDS, ElastiCache, Lambda and EC2 (Docker) build the label identically.
/// The shared startup reaper lists containers carrying it, parses the
/// owning PID, and removes those whose owner is no longer alive.
pub(crate) fn fakecloud_instance_label() -> String {
    let owner_pid = std::process::id();
    format!("fakecloud-instance=fakecloud-{owner_pid}")
}
1239
1240/// Build the docker `run` argv for a single container plan. Pure so unit
1241/// tests can assert on flag ordering / `--publish` translation without
1242/// shelling out. The returned vector is everything *after* the binary
1243/// name (i.e. starts with `run`, ends with the user-supplied command
1244/// args).
1245pub(crate) fn build_run_argv(
1246    plan: &ContainerPlan,
1247    env: &[(String, String)],
1248    task_id: &str,
1249    host_alias: &str,
1250    add_host_arg: Option<&str>,
1251    run_image: &str,
1252    awsvpc_network_ready: bool,
1253) -> Vec<String> {
1254    let mut argv: Vec<String> = Vec::new();
1255    argv.push("run".into());
1256    argv.push("-d".into());
1257    argv.push("--name".into());
1258    argv.push(format!("{}-{}", task_id, plan.container_name));
1259    argv.push("--label".into());
1260    argv.push(format!("fakecloud-ecs-task={}", task_id));
1261    argv.push("--label".into());
1262    argv.push(format!("fakecloud-ecs-container={}", plan.container_name));
1263    // Ownership label shared with RDS/ElastiCache/Lambda/EC2(Docker). The
1264    // startup reaper (`fakecloud-server::reaper`) filters strictly on
1265    // `label=fakecloud-instance` and parses the owning PID out of the value
1266    // (`fakecloud-<pid>`). Without this, ECS task containers (and their
1267    // host-port publishes / awsvpc networks) leak unreapably after an
1268    // ungraceful restart. See fakecloud_instance_label().
1269    argv.push("--label".into());
1270    argv.push(fakecloud_instance_label());
1271    // Inject `--add-host host.docker.internal:<ip>` only for docker;
1272    // podman provides `host.containers.internal` natively and rejects
1273    // the host-gateway mapping (issue #1539).
1274    if let Some(arg) = add_host_arg {
1275        argv.push("--add-host".into());
1276        argv.push(arg.to_string());
1277    }
1278    let use_awsvpc_network = plan.network_mode.as_deref() == Some("awsvpc") && awsvpc_network_ready;
1279    if use_awsvpc_network {
1280        argv.push("--network".into());
1281        argv.push(format!("fakecloud-ecs-{}", task_id));
1282    }
1283    // `awsvpc` puts the container on a per-task ENI; emulating that on a
1284    // local docker host means *not* publishing to the host port table.
1285    // Bridge / host / default network modes still get `--publish`. If
1286    // the awsvpc per-task network creation failed and we fell back to
1287    // bridge, we DO want to publish so the container is reachable.
1288    let publish_ports = !use_awsvpc_network;
1289    if publish_ports {
1290        for pm in &plan.port_mappings {
1291            argv.push("--publish".into());
1292            argv.push(format!(
1293                "{}:{}/{}",
1294                pm.container_port, pm.host_port, pm.protocol
1295            ));
1296        }
1297    }
1298    if let Some(ref hc) = plan.health_check {
1299        argv.extend(render_health_flags(hc));
1300    }
1301    let http_alias_prefix = format!("http://{host_alias}:");
1302    let https_alias_prefix = format!("https://{host_alias}:");
1303    for (k, v) in env {
1304        let transformed = v
1305            .replace("http://127.0.0.1:", http_alias_prefix.as_str())
1306            .replace("https://127.0.0.1:", https_alias_prefix.as_str())
1307            .replace("http://localhost:", http_alias_prefix.as_str())
1308            .replace("https://localhost:", https_alias_prefix.as_str());
1309        argv.push("-e".into());
1310        argv.push(format!("{}={}", k, transformed));
1311    }
1312    // Volume mounts: one `-v` flag per mountPoints entry, with the
1313    // source resolved from the task definition's `volumes[]`. EFS and
1314    // FSx stubs were materialised on the host (mkdir -p) before this
1315    // function returns, so docker can bind them straight in.
1316    for vm in &plan.volume_mounts {
1317        argv.push("-v".into());
1318        let suffix = if vm.read_only { ":ro" } else { "" };
1319        argv.push(format!("{}:{}{}", vm.source, vm.container_path, suffix));
1320    }
1321    for ul in &plan.ulimits {
1322        argv.push("--ulimit".into());
1323        argv.push(format!("{}={}:{}", ul.name, ul.soft_limit, ul.hard_limit));
1324    }
1325    if let Some(ref lp) = plan.linux_parameters {
1326        for cap in &lp.capabilities_add {
1327            argv.push("--cap-add".into());
1328            argv.push(cap.clone());
1329        }
1330        for cap in &lp.capabilities_drop {
1331            argv.push("--cap-drop".into());
1332            argv.push(cap.clone());
1333        }
1334        for dev in &lp.devices {
1335            argv.push("--device".into());
1336            argv.push(format!(
1337                "{}:{}{}",
1338                dev.host_path, dev.container_path, dev.permissions
1339            ));
1340        }
1341        if lp.init_process_enabled {
1342            argv.push("--init".into());
1343        }
1344        if let Some(size) = lp.shared_memory_size {
1345            argv.push("--shm-size".into());
1346            argv.push(format!("{}m", size));
1347        }
1348        for sys in &lp.sysctls {
1349            argv.push("--sysctl".into());
1350            argv.push(format!("{}={}", sys.name, sys.value));
1351        }
1352        for tmp in &lp.tmpfs {
1353            let mut opts = tmp.mount_options.join(",");
1354            if !opts.is_empty() {
1355                opts = format!(",{}", opts);
1356            }
1357            argv.push("--tmpfs".into());
1358            argv.push(format!("{}:size={}M{}", tmp.container_path, tmp.size, opts));
1359        }
1360        if lp.privileged {
1361            argv.push("--privileged".into());
1362        }
1363    }
1364    if let Some(timeout) = plan.stop_timeout {
1365        argv.push("--stop-timeout".into());
1366        argv.push(format!("{}", timeout));
1367    }
1368    if let Some(ref user) = plan.user {
1369        argv.push("--user".into());
1370        argv.push(user.clone());
1371    }
1372    if let Some(ref wd) = plan.working_directory {
1373        argv.push("--workdir".into());
1374        argv.push(wd.clone());
1375    }
1376    if plan.tty {
1377        argv.push("--tty".into());
1378    }
1379    if plan.interactive {
1380        argv.push("--interactive".into());
1381    }
1382    if plan.readonly_rootfs {
1383        argv.push("--read-only".into());
1384    }
1385    if let Some(first) = plan.entry_point.first() {
1386        argv.push("--entrypoint".into());
1387        argv.push(first.clone());
1388    }
1389    argv.push(run_image.to_string());
1390    for arg in plan.entry_point.iter().skip(1) {
1391        argv.push(arg.clone());
1392    }
1393    for arg in &plan.command {
1394        argv.push(arg.clone());
1395    }
1396    argv
1397}
1398
1399/// Render `networkBindings` JSON for a launched container. Empty under
1400/// `awsvpc` (the equivalent info goes on the task's ENI attachments) and
1401/// for containers without `portMappings`.
1402pub(crate) fn network_bindings_for(plan: &ContainerPlan) -> Vec<serde_json::Value> {
1403    if plan.network_mode.as_deref() == Some("awsvpc") {
1404        return Vec::new();
1405    }
1406    plan.port_mappings
1407        .iter()
1408        .map(|pm| {
1409            serde_json::json!({
1410                "bindIP": "0.0.0.0",
1411                "containerPort": pm.container_port,
1412                "hostPort": pm.host_port,
1413                "protocol": pm.protocol,
1414            })
1415        })
1416        .collect()
1417}
1418
1419/// Compute ELBv2 target registrations for a task based on its service's
1420/// loadBalancers configuration. Returns (target_group_arn, [(target_id, port)])
1421/// for each target group that should receive this task.
1422#[allow(clippy::type_complexity)]
1423pub(crate) fn compute_elbv2_targets(
1424    ecs_state: &crate::state::EcsState,
1425    task: &crate::state::Task,
1426) -> Vec<(String, Vec<(String, Option<i64>)>)> {
1427    let mut result = Vec::new();
1428    let Some(group) = task.group.as_deref() else {
1429        return result;
1430    };
1431    let service_name = group.strip_prefix("service:").unwrap_or(group);
1432    let key = crate::state::EcsState::service_key(&task.cluster_name, service_name);
1433    let Some(service) = ecs_state.services.get(&key) else {
1434        return result;
1435    };
1436
1437    let network_mode = ecs_state
1438        .task_definitions
1439        .get(&task.family)
1440        .and_then(|revs| revs.get(&task.revision))
1441        .and_then(|td| td.network_mode.as_deref());
1442
1443    for lb in &service.load_balancers {
1444        let tg_arn = lb.get("targetGroupArn").and_then(|v| v.as_str());
1445        let container_name = lb.get("containerName").and_then(|v| v.as_str());
1446        let container_port = lb.get("containerPort").and_then(|v| v.as_i64());
1447        let Some(tg_arn) = tg_arn else { continue };
1448        let Some(container_name) = container_name else {
1449            continue;
1450        };
1451
1452        let target_id = if network_mode == Some("awsvpc") {
1453            task.attachments
1454                .iter()
1455                .find(|a| a.attachment_type == "eni")
1456                .and_then(|eni| {
1457                    eni.details
1458                        .iter()
1459                        .find(|d| d.name == "privateIPv4Address")
1460                        .map(|d| d.value.clone())
1461                })
1462        } else {
1463            Some("127.0.0.1".to_string())
1464        };
1465
1466        let port = if network_mode == Some("awsvpc") {
1467            container_port
1468        } else {
1469            task.containers
1470                .iter()
1471                .find(|c| c.name == container_name)
1472                .and_then(|c| {
1473                    c.network_bindings
1474                        .iter()
1475                        .find(|nb| {
1476                            nb.get("containerPort").and_then(|v| v.as_i64()) == container_port
1477                        })
1478                        .and_then(|nb| nb.get("hostPort").and_then(|v| v.as_i64()))
1479                })
1480        };
1481
1482        if let Some(id) = target_id {
1483            if let Some(entry) = result.iter_mut().find(|(arn, _)| arn == tg_arn) {
1484                entry.1.push((id, port));
1485            } else {
1486                result.push((tg_arn.to_string(), vec![(id, port)]));
1487            }
1488        }
1489    }
1490    result
1491}
1492
/// Owned copy of selected task fields, taken by `snapshot_task` while it
/// holds the ECS state read lock. Callers can therefore use the data
/// without keeping the lock held.
/// NOTE(review): presumably feeds lifecycle/event payloads — confirm at the
/// call sites.
struct TaskSnapshot {
    /// Copy of `Task::task_arn`.
    task_arn: String,
    /// Copy of `Task::cluster_arn`.
    cluster_arn: String,
    /// Copy of `Task::launch_type`.
    launch_type: String,
    /// Copy of `Task::group` (e.g. `service:<name>` for service tasks).
    group: Option<String>,
    /// Copy of `Task::task_definition_arn`.
    task_definition_arn: String,
    /// JSON array with one object per container, holding `containerArn`,
    /// `name`, `image`, `lastStatus`, `exitCode` and `reason`.
    containers: serde_json::Value,
}
1501
1502fn snapshot_task(state: &SharedEcsState, account_id: &str, task_id: &str) -> Option<TaskSnapshot> {
1503    let accounts = state.read();
1504    let s = accounts.get(account_id)?;
1505    let task = s.tasks.get(task_id)?;
1506    Some(TaskSnapshot {
1507        task_arn: task.task_arn.clone(),
1508        cluster_arn: task.cluster_arn.clone(),
1509        launch_type: task.launch_type.clone(),
1510        group: task.group.clone(),
1511        task_definition_arn: task.task_definition_arn.clone(),
1512        containers: serde_json::Value::Array(
1513            task.containers
1514                .iter()
1515                .map(|c| {
1516                    serde_json::json!({
1517                        "containerArn": c.container_arn,
1518                        "name": c.name,
1519                        "image": c.image,
1520                        "lastStatus": c.last_status,
1521                        "exitCode": c.exit_code,
1522                        "reason": c.reason,
1523                    })
1524                })
1525                .collect(),
1526        ),
1527    })
1528}
1529
1530/// Build an isolated docker config directory with Basic auth for
1531/// fakecloud ECR. Lets `docker pull/push/tag` work against the local OCI
1532/// v2 registry without requiring the user to run
1533/// `aws ecr get-login-password | docker login` first. Authorizes every host
1534/// fakecloud's ECR can be addressed by -- `127.0.0.1` (fakecloud on the host),
1535/// `host.docker.internal` (Docker) and `host.containers.internal` (podman) when
1536/// fakecloud runs in a container and pull URIs are rewritten to the sibling
1537/// host. Centralized in container_net so Lambda and ECS can't drift (bug-audit
1538/// 2026-06-20, 0.B2).
1539fn build_local_registry_docker_config(server_port: u16) -> Option<TempDir> {
1540    let dir = TempDir::new().ok()?;
1541    let auth = base64::engine::general_purpose::STANDARD.encode("AWS:fakecloud-ecs-runtime");
1542    let auths: serde_json::Map<String, serde_json::Value> =
1543        fakecloud_core::container_net::registry_auth_hosts(server_port)
1544            .into_iter()
1545            .map(|host| (host, serde_json::json!({ "auth": auth })))
1546            .collect();
1547    let config = serde_json::json!({ "auths": auths });
1548    std::fs::write(dir.path().join("config.json"), config.to_string()).ok()?;
1549    Some(dir)
1550}
1551
1552fn find_container_definition(
1553    state: &crate::state::EcsState,
1554    family: &str,
1555    revision: i32,
1556    name: &str,
1557) -> Option<serde_json::Value> {
1558    state
1559        .task_definitions
1560        .get(family)?
1561        .get(&revision)?
1562        .container_definitions
1563        .iter()
1564        .find(|c| c.get("name").and_then(|v| v.as_str()) == Some(name))
1565        .cloned()
1566}
1567
1568fn mark_pull_started(state: &SharedEcsState, account_id: &str, task_id: &str) {
1569    let mut accounts = state.write();
1570    let Some(s) = accounts.get_mut(account_id) else {
1571        return;
1572    };
1573    let task_arn_cluster = s
1574        .tasks
1575        .get(task_id)
1576        .map(|t| (t.task_arn.clone(), t.cluster_arn.clone()));
1577    if let Some(task) = s.tasks.get_mut(task_id) {
1578        task.pull_started_at = Some(Utc::now());
1579    }
1580    if let Some((arn, cluster_arn)) = task_arn_cluster {
1581        s.push_event(LifecycleEvent {
1582            at: Utc::now(),
1583            event_type: "PullStarted".into(),
1584            task_arn: Some(arn),
1585            cluster_arn: Some(cluster_arn),
1586            last_status: Some("PENDING".into()),
1587            detail: serde_json::json!({}),
1588        });
1589    }
1590}
1591
1592fn mark_pull_stopped(state: &SharedEcsState, account_id: &str, task_id: &str) {
1593    let mut accounts = state.write();
1594    let Some(s) = accounts.get_mut(account_id) else {
1595        return;
1596    };
1597    if let Some(task) = s.tasks.get_mut(task_id) {
1598        task.pull_stopped_at = Some(Utc::now());
1599    }
1600}
1601
1602pub(crate) fn mark_running_multi(
1603    state: &SharedEcsState,
1604    account_id: &str,
1605    task_id: &str,
1606    started: &[RunningContainer],
1607) {
1608    let mut accounts = state.write();
1609    let Some(s) = accounts.get_mut(account_id) else {
1610        return;
1611    };
1612    let (arn, cluster_arn) = {
1613        let Some(task) = s.tasks.get_mut(task_id) else {
1614            return;
1615        };
1616        task.last_status = "RUNNING".into();
1617        task.connectivity = "CONNECTED".into();
1618        task.connectivity_at = Some(Utc::now());
1619        task.started_at = Some(Utc::now());
1620        for rc in started {
1621            if let Some(c) = task.containers.iter_mut().find(|c| c.name == rc.name) {
1622                c.runtime_id = Some(rc.container_id.clone());
1623                c.last_status = "RUNNING".into();
1624                c.network_bindings = rc.network_bindings.clone();
1625                if rc.image_digest.is_some() {
1626                    c.image_digest = rc.image_digest.clone();
1627                }
1628            }
1629        }
1630        if let Some(cluster) = s.clusters.get_mut(&task.cluster_name) {
1631            cluster.running_tasks_count += 1;
1632            if cluster.pending_tasks_count > 0 {
1633                cluster.pending_tasks_count -= 1;
1634            }
1635        }
1636        if let Some(ref ci_arn) = task.container_instance_arn {
1637            if let Some(ci) = s
1638                .container_instances
1639                .values_mut()
1640                .find(|ci| ci.container_instance_arn == *ci_arn)
1641            {
1642                ci.running_tasks_count += 1;
1643                if ci.pending_tasks_count > 0 {
1644                    ci.pending_tasks_count -= 1;
1645                }
1646            }
1647        }
1648        (task.task_arn.clone(), task.cluster_arn.clone())
1649    };
1650    s.push_event(LifecycleEvent {
1651        at: Utc::now(),
1652        event_type: "TaskStateChange".into(),
1653        task_arn: Some(arn),
1654        cluster_arn: Some(cluster_arn),
1655        last_status: Some("RUNNING".into()),
1656        detail: serde_json::json!({}),
1657    });
1658}
1659
1660#[allow(clippy::too_many_arguments)]
1661fn finalize_stopped_multi(
1662    state: &SharedEcsState,
1663    account_id: &str,
1664    task_id: &str,
1665    final_containers: &[RunningContainer],
1666    primary_exit_code: i64,
1667    captured: &str,
1668    stop_code: &str,
1669    stopped_reason: Option<String>,
1670) {
1671    let mut accounts = state.write();
1672    let Some(s) = accounts.get_mut(account_id) else {
1673        return;
1674    };
1675    let (arn, cluster_arn) = {
1676        let Some(task) = s.tasks.get_mut(task_id) else {
1677            return;
1678        };
1679        task.last_status = "STOPPED".into();
1680        task.desired_status = "STOPPED".into();
1681        task.stopping_at = task.stopping_at.or(Some(Utc::now()));
1682        task.stopped_at = Some(Utc::now());
1683        task.stop_code = Some(stop_code.into());
1684        task.stopped_reason = stopped_reason.or(Some(format!("Exit code {}", primary_exit_code)));
1685        task.captured_logs = captured.to_string();
1686        for c in task.containers.iter_mut() {
1687            c.last_status = "STOPPED".into();
1688            if c.exit_code.is_none() {
1689                let mapped = final_containers
1690                    .iter()
1691                    .find(|r| r.name == c.name)
1692                    .and_then(|r| r.exit_code);
1693                c.exit_code = mapped.or(Some(primary_exit_code));
1694            }
1695        }
1696        if let Some(cluster) = s.clusters.get_mut(&task.cluster_name) {
1697            if cluster.running_tasks_count > 0 {
1698                cluster.running_tasks_count -= 1;
1699            }
1700        }
1701        if let Some(ref ci_arn) = task.container_instance_arn {
1702            if let Some(ci) = s
1703                .container_instances
1704                .values_mut()
1705                .find(|ci| ci.container_instance_arn == *ci_arn)
1706            {
1707                if ci.running_tasks_count > 0 {
1708                    ci.running_tasks_count -= 1;
1709                }
1710            }
1711        }
1712        (task.task_arn.clone(), task.cluster_arn.clone())
1713    };
1714    s.push_event(LifecycleEvent {
1715        at: Utc::now(),
1716        event_type: "TaskStateChange".into(),
1717        task_arn: Some(arn),
1718        cluster_arn: Some(cluster_arn),
1719        last_status: Some("STOPPED".into()),
1720        detail: serde_json::json!({
1721            "exitCode": primary_exit_code,
1722            "stopCode": stop_code,
1723        }),
1724    });
1725}
1726
1727fn finalize_failure(state: &SharedEcsState, account_id: &str, task_id: &str, reason: &str) {
1728    let mut accounts = state.write();
1729    let Some(s) = accounts.get_mut(account_id) else {
1730        return;
1731    };
1732    let (arn, cluster_arn) = {
1733        let Some(task) = s.tasks.get_mut(task_id) else {
1734            return;
1735        };
1736        // Capture the prior status before we clobber it: if the task had
1737        // already reached RUNNING when execution failed (e.g. `docker wait`
1738        // blew up after the container started), we owe the cluster a
1739        // running-tasks decrement. Tasks that died before RUNNING only
1740        // ever incremented pendingTasksCount.
1741        let was_running = task.last_status == "RUNNING";
1742        task.last_status = "STOPPED".into();
1743        task.desired_status = "STOPPED".into();
1744        task.stopped_at = Some(Utc::now());
1745        task.stop_code = Some("TaskFailedToStart".into());
1746        task.stopped_reason = Some(reason.to_string());
1747        // Surface the failure reason on the /logs endpoint — without this,
1748        // a task that never reached RUNNING returns an empty log string,
1749        // leaving E2E assertions with no diagnostic.
1750        task.captured_logs = format!("[task failed to start]: {reason}");
1751        for c in task.containers.iter_mut() {
1752            c.last_status = "STOPPED".into();
1753            c.reason = Some(reason.to_string());
1754        }
1755        if let Some(cluster) = s.clusters.get_mut(&task.cluster_name) {
1756            if was_running {
1757                if cluster.running_tasks_count > 0 {
1758                    cluster.running_tasks_count -= 1;
1759                }
1760            } else if cluster.pending_tasks_count > 0 {
1761                cluster.pending_tasks_count -= 1;
1762            }
1763        }
1764        if let Some(ref ci_arn) = task.container_instance_arn {
1765            if let Some(ci) = s
1766                .container_instances
1767                .values_mut()
1768                .find(|ci| ci.container_instance_arn == *ci_arn)
1769            {
1770                if was_running {
1771                    if ci.running_tasks_count > 0 {
1772                        ci.running_tasks_count -= 1;
1773                    }
1774                } else if ci.pending_tasks_count > 0 {
1775                    ci.pending_tasks_count -= 1;
1776                }
1777            }
1778        }
1779        (task.task_arn.clone(), task.cluster_arn.clone())
1780    };
1781    s.push_event(LifecycleEvent {
1782        at: Utc::now(),
1783        event_type: "TaskFailedToStart".into(),
1784        task_arn: Some(arn),
1785        cluster_arn: Some(cluster_arn),
1786        last_status: Some("STOPPED".into()),
1787        detail: serde_json::json!({ "reason": reason }),
1788    });
1789}
1790
1791/// Short helper for tests + snapshot code to sleep between state
1792/// transitions. Exposed on the crate boundary to keep test timing
1793/// centralized.
1794pub async fn sleep(duration: Duration) {
1795    tokio::time::sleep(duration).await;
1796}
1797
1798#[cfg(test)]
1799mod tests {
1800    use super::*;
1801    use crate::state::{EcsState, Task};
1802    use fakecloud_aws::arn::Arn;
1803    use fakecloud_core::multi_account::MultiAccountState;
1804    use parking_lot::RwLock;
1805    use std::sync::Arc;
1806
1807    #[test]
1808    fn cli_available_for_known_missing_binary_is_false() {
1809        assert!(!fakecloud_core::container_net::cli_available(
1810            "definitely-not-a-real-cli-binary-xyz"
1811        ));
1812    }
1813
1814    #[test]
1815    fn aws_ecr_uris_translate_for_local_pull() {
1816        assert_eq!(
1817            fakecloud_core::ecr_uri::translate_to_local(
1818                "123456789012.dkr.ecr.us-east-1.amazonaws.com/app:latest",
1819                4566
1820            )
1821            .as_deref(),
1822            Some("127.0.0.1:4566/app:latest")
1823        );
1824    }
1825
1826    fn make_task(task_id: &str) -> Task {
1827        Task {
1828            task_arn: Arn::new(
1829                "ecs",
1830                "us-east-1",
1831                "000000000000",
1832                &format!("task/default/{task_id}"),
1833            )
1834            .to_string(),
1835            task_id: task_id.into(),
1836            cluster_arn: "arn:aws:ecs:us-east-1:000000000000:cluster/default".into(),
1837            cluster_name: "default".into(),
1838            task_definition_arn: "arn:aws:ecs:us-east-1:000000000000:task-definition/app:1".into(),
1839            family: "app".into(),
1840            revision: 1,
1841            container_instance_arn: None,
1842            capacity_provider_name: None,
1843            last_status: "PENDING".into(),
1844            desired_status: "RUNNING".into(),
1845            launch_type: "FARGATE".into(),
1846            platform_version: None,
1847            cpu: None,
1848            memory: None,
1849            containers: Vec::new(),
1850            overrides: serde_json::json!({}),
1851            started_by: None,
1852            group: None,
1853            connectivity: "CONNECTING".into(),
1854            stop_code: None,
1855            stopped_reason: None,
1856            created_at: Utc::now(),
1857            started_at: None,
1858            stopping_at: None,
1859            stopped_at: None,
1860            pull_started_at: None,
1861            pull_stopped_at: None,
1862            connectivity_at: None,
1863            started_by_ref_id: None,
1864            execution_role_arn: None,
1865            task_role_arn: None,
1866            tags: Vec::new(),
1867            awslogs: None,
1868            captured_logs: String::new(),
1869            protection: None,
1870            enable_execute_command: false,
1871            attachments: Vec::new(),
1872            volume_configurations: Vec::new(),
1873            task_set_arn: None,
1874        }
1875    }
1876
1877    #[test]
1878    fn finalize_failure_writes_reason_into_captured_logs() {
1879        let mut accounts: MultiAccountState<EcsState> =
1880            MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
1881        let acct = accounts.get_or_create("000000000000");
1882        acct.tasks.insert("t1".into(), make_task("t1"));
1883        let state: SharedEcsState = Arc::new(RwLock::new(accounts));
1884
1885        finalize_failure(
1886            &state,
1887            "000000000000",
1888            "t1",
1889            "failed to resolve secret DB_PASSWORD",
1890        );
1891
1892        let accounts = state.read();
1893        let task = accounts
1894            .get("000000000000")
1895            .unwrap()
1896            .tasks
1897            .get("t1")
1898            .unwrap();
1899        assert_eq!(task.last_status, "STOPPED");
1900        assert_eq!(task.stop_code.as_deref(), Some("TaskFailedToStart"));
1901        assert!(
1902            task.captured_logs
1903                .contains("failed to resolve secret DB_PASSWORD"),
1904            "captured_logs missing reason: {:?}",
1905            task.captured_logs
1906        );
1907        assert!(
1908            task.captured_logs.starts_with("[task failed to start]:"),
1909            "captured_logs missing prefix: {:?}",
1910            task.captured_logs
1911        );
1912    }
1913
1914    /// 4.2 — `task_desired_stopped` is the post-launch gate `run_task_inner`
1915    /// uses to detect a StopTask / scale-down / DeleteService that raced the
1916    /// launch. RUNNING desired_status -> keep running; STOPPED -> self-stop;
1917    /// task removed from state -> treat as stop (nothing to keep alive).
1918    #[test]
1919    fn task_desired_stopped_detects_stop_during_launch() {
1920        let mut accounts: MultiAccountState<EcsState> =
1921            MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
1922        let acct = accounts.get_or_create("000000000000");
1923        acct.tasks.insert("running".into(), make_task("running"));
1924        let mut stopping = make_task("stopping");
1925        stopping.desired_status = "STOPPED".into();
1926        acct.tasks.insert("stopping".into(), stopping);
1927        let state: SharedEcsState = Arc::new(RwLock::new(accounts));
1928
1929        assert!(
1930            !task_desired_stopped(&state, "000000000000", "running"),
1931            "a RUNNING task must not be treated as stopped",
1932        );
1933        assert!(
1934            task_desired_stopped(&state, "000000000000", "stopping"),
1935            "a task whose desired_status is STOPPED must be treated as stopped",
1936        );
1937        assert!(
1938            task_desired_stopped(&state, "000000000000", "deleted-mid-launch"),
1939            "a task removed from state mid-launch must be treated as stopped",
1940        );
1941    }
1942
1943    fn make_container(name: &str, essential: bool) -> crate::state::Container {
1944        crate::state::Container {
1945            container_arn: format!(
1946                "arn:aws:ecs:us-east-1:000000000000:container/default/abc/{name}"
1947            ),
1948            name: name.into(),
1949            image: "alpine".into(),
1950            task_arn: "arn:aws:ecs:us-east-1:000000000000:task/default/abc".into(),
1951            last_status: "RUNNING".into(),
1952            exit_code: None,
1953            reason: None,
1954            runtime_id: Some(format!("dockerid-{name}")),
1955            essential,
1956            cpu: None,
1957            memory: None,
1958            memory_reservation: None,
1959            network_bindings: Vec::new(),
1960            network_interfaces: Vec::new(),
1961            health_status: None,
1962            managed_agents: None,
1963            image_digest: None,
1964        }
1965    }
1966
1967    #[test]
1968    fn task_should_stop_when_essential_exits() {
1969        let containers = vec![
1970            RunningContainer {
1971                name: "app".into(),
1972                container_id: "id-app".into(),
1973                essential: true,
1974                exit_code: Some(0),
1975                network_bindings: Vec::new(),
1976                image_digest: None,
1977            },
1978            RunningContainer {
1979                name: "sidecar".into(),
1980                container_id: "id-sc".into(),
1981                essential: false,
1982                exit_code: None,
1983                network_bindings: Vec::new(),
1984                image_digest: None,
1985            },
1986        ];
1987        assert!(task_should_stop(&containers));
1988    }
1989
1990    #[test]
1991    fn task_keeps_running_when_only_non_essential_exits() {
1992        let containers = vec![
1993            RunningContainer {
1994                name: "app".into(),
1995                container_id: "id-app".into(),
1996                essential: true,
1997                exit_code: None,
1998                network_bindings: Vec::new(),
1999                image_digest: None,
2000            },
2001            RunningContainer {
2002                name: "sidecar".into(),
2003                container_id: "id-sc".into(),
2004                essential: false,
2005                exit_code: Some(0),
2006                network_bindings: Vec::new(),
2007                image_digest: None,
2008            },
2009        ];
2010        assert!(!task_should_stop(&containers));
2011    }
2012
2013    #[test]
2014    fn task_stops_when_all_non_essentials_exit() {
2015        let containers = vec![
2016            RunningContainer {
2017                name: "a".into(),
2018                container_id: "id-a".into(),
2019                essential: false,
2020                exit_code: Some(0),
2021                network_bindings: Vec::new(),
2022                image_digest: None,
2023            },
2024            RunningContainer {
2025                name: "b".into(),
2026                container_id: "id-b".into(),
2027                essential: false,
2028                exit_code: Some(1),
2029                network_bindings: Vec::new(),
2030                image_digest: None,
2031            },
2032        ];
2033        assert!(task_should_stop(&containers));
2034    }
2035
2036    #[test]
2037    fn finalize_stopped_multi_assigns_per_container_exit_codes() {
2038        let mut accounts: MultiAccountState<EcsState> =
2039            MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
2040        let acct = accounts.get_or_create("000000000000");
2041        let mut t = make_task("t1");
2042        t.containers = vec![
2043            make_container("app", true),
2044            make_container("sidecar", false),
2045        ];
2046        acct.tasks.insert("t1".into(), t);
2047        let state: SharedEcsState = Arc::new(RwLock::new(accounts));
2048
2049        let final_containers = vec![
2050            RunningContainer {
2051                name: "app".into(),
2052                container_id: "id-app".into(),
2053                essential: true,
2054                exit_code: Some(0),
2055                network_bindings: Vec::new(),
2056                image_digest: None,
2057            },
2058            RunningContainer {
2059                name: "sidecar".into(),
2060                container_id: "id-sc".into(),
2061                essential: false,
2062                exit_code: Some(137),
2063                network_bindings: Vec::new(),
2064                image_digest: None,
2065            },
2066        ];
2067        finalize_stopped_multi(
2068            &state,
2069            "000000000000",
2070            "t1",
2071            &final_containers,
2072            0,
2073            "captured",
2074            "EssentialContainerExited",
2075            None,
2076        );
2077
2078        let accounts = state.read();
2079        let task = accounts
2080            .get("000000000000")
2081            .unwrap()
2082            .tasks
2083            .get("t1")
2084            .unwrap();
2085        assert_eq!(task.last_status, "STOPPED");
2086        assert_eq!(task.stop_code.as_deref(), Some("EssentialContainerExited"));
2087        let app = task.containers.iter().find(|c| c.name == "app").unwrap();
2088        let sc = task
2089            .containers
2090            .iter()
2091            .find(|c| c.name == "sidecar")
2092            .unwrap();
2093        assert_eq!(app.exit_code, Some(0));
2094        assert_eq!(sc.exit_code, Some(137));
2095        assert_eq!(app.last_status, "STOPPED");
2096        assert_eq!(sc.last_status, "STOPPED");
2097    }
2098
2099    fn plan(name: &str, deps: &[&str]) -> ContainerPlan {
2100        ContainerPlan {
2101            container_name: name.into(),
2102            image: "alpine".into(),
2103            env: Vec::new(),
2104            entry_point: Vec::new(),
2105            command: Vec::new(),
2106            secrets_refs: Vec::new(),
2107            essential: true,
2108            has_task_role: false,
2109            port_mappings: Vec::new(),
2110            network_mode: None,
2111            depends_on: deps
2112                .iter()
2113                .map(|s| DependsOn {
2114                    container_name: (*s).to_string(),
2115                    condition: DependsOnCondition::Start,
2116                })
2117                .collect(),
2118            health_check: None,
2119            volume_mounts: Vec::new(),
2120            ulimits: Vec::new(),
2121            linux_parameters: None,
2122            stop_timeout: None,
2123            user: None,
2124            working_directory: None,
2125            tty: false,
2126            interactive: false,
2127            readonly_rootfs: false,
2128        }
2129    }
2130
2131    #[test]
2132    fn topo_sort_orders_by_depends_on() {
2133        // sidecar depends on app, so app must come first regardless of
2134        // declaration order.
2135        let plans = vec![plan("sidecar", &["app"]), plan("app", &[])];
2136        let ordered = topo_sort_plans(plans);
2137        assert_eq!(ordered[0].container_name, "app");
2138        assert_eq!(ordered[1].container_name, "sidecar");
2139    }
2140
2141    #[test]
2142    fn topo_sort_preserves_declaration_order_when_no_deps() {
2143        let plans = vec![plan("first", &[]), plan("second", &[]), plan("third", &[])];
2144        let ordered = topo_sort_plans(plans);
2145        let names: Vec<&str> = ordered.iter().map(|p| p.container_name.as_str()).collect();
2146        assert_eq!(names, vec!["first", "second", "third"]);
2147    }
2148
2149    #[test]
2150    fn topo_sort_handles_chain() {
2151        // c -> b -> a, declared in reverse so the topological sort must
2152        // bubble dependencies up.
2153        let plans = vec![plan("c", &["b"]), plan("b", &["a"]), plan("a", &[])];
2154        let ordered = topo_sort_plans(plans);
2155        let names: Vec<&str> = ordered.iter().map(|p| p.container_name.as_str()).collect();
2156        assert_eq!(names, vec!["a", "b", "c"]);
2157    }
2158
2159    #[test]
2160    fn topo_sort_ignores_unknown_dependency() {
2161        // depends_on names a container not in this task definition. Real
2162        // ECS would reject this at register time; we don't (yet), so the
2163        // unknown dep should just be skipped instead of stalling the sort.
2164        let plans = vec![plan("only", &["does-not-exist"])];
2165        let ordered = topo_sort_plans(plans);
2166        assert_eq!(ordered.len(), 1);
2167        assert_eq!(ordered[0].container_name, "only");
2168    }
2169
2170    #[test]
2171    fn topo_sort_recovers_from_cycle() {
2172        // Cyclic dependsOn: both plans should still appear in the output
2173        // so the runtime doesn't silently drop them.
2174        let plans = vec![plan("a", &["b"]), plan("b", &["a"])];
2175        let ordered = topo_sort_plans(plans);
2176        assert_eq!(ordered.len(), 2);
2177    }
2178
2179    #[test]
2180    fn parse_health_check_fills_aws_defaults() {
2181        let v = serde_json::json!({
2182            "command": ["CMD-SHELL", "curl -f http://localhost/ || exit 1"],
2183        });
2184        let hc = __test_parse_health_check(&v).expect("parsed");
2185        assert_eq!(hc.command[0], "CMD-SHELL");
2186        assert_eq!(hc.interval_seconds, 30);
2187        assert_eq!(hc.timeout_seconds, 5);
2188        assert_eq!(hc.retries, 3);
2189        assert_eq!(hc.start_period_seconds, 0);
2190    }
2191
2192    #[test]
2193    fn parse_health_check_overrides_explicit_values() {
2194        let v = serde_json::json!({
2195            "command": ["CMD", "/probe"],
2196            "interval": 7,
2197            "timeout": 2,
2198            "retries": 9,
2199            "startPeriod": 12,
2200        });
2201        let hc = __test_parse_health_check(&v).expect("parsed");
2202        assert_eq!(hc.interval_seconds, 7);
2203        assert_eq!(hc.timeout_seconds, 2);
2204        assert_eq!(hc.retries, 9);
2205        assert_eq!(hc.start_period_seconds, 12);
2206    }
2207
2208    #[test]
2209    fn parse_health_check_returns_none_for_none_sentinel() {
2210        // ECS uses ["NONE"] to disable an inherited HEALTHCHECK; we
2211        // skip emission rather than passing a literal `none` to docker.
2212        let v = serde_json::json!({ "command": ["NONE"] });
2213        assert!(__test_parse_health_check(&v).is_none());
2214    }
2215
2216    #[test]
2217    fn parse_health_check_returns_none_for_missing_command() {
2218        let v = serde_json::json!({ "interval": 30 });
2219        assert!(__test_parse_health_check(&v).is_none());
2220    }
2221
2222    #[test]
2223    fn render_health_flags_emits_full_set_for_cmd_shell() {
2224        let hc = HealthCheckSpec {
2225            command: vec!["CMD-SHELL".into(), "curl -f http://localhost/".into()],
2226            interval_seconds: 15,
2227            timeout_seconds: 3,
2228            retries: 4,
2229            start_period_seconds: 10,
2230        };
2231        let flags = render_health_flags(&hc);
2232        assert_eq!(flags[0], "--health-cmd");
2233        assert_eq!(flags[1], "curl -f http://localhost/");
2234        assert!(flags.contains(&"--health-interval=15s".to_string()));
2235        assert!(flags.contains(&"--health-timeout=3s".to_string()));
2236        assert!(flags.contains(&"--health-retries=4".to_string()));
2237        assert!(flags.contains(&"--health-start-period=10s".to_string()));
2238    }
2239
2240    #[test]
2241    fn render_health_flags_joins_cmd_argv_with_spaces() {
2242        // CMD form in ECS is argv-style; docker `--health-cmd` only
2243        // accepts a single shell string, so we collapse with spaces.
2244        let hc = HealthCheckSpec {
2245            command: vec![
2246                "CMD".into(),
2247                "/bin/probe".into(),
2248                "--port".into(),
2249                "8080".into(),
2250            ],
2251            interval_seconds: 30,
2252            timeout_seconds: 5,
2253            retries: 3,
2254            start_period_seconds: 0,
2255        };
2256        let flags = render_health_flags(&hc);
2257        assert_eq!(flags[1], "/bin/probe --port 8080");
2258    }
2259
2260    #[test]
2261    fn build_run_argv_emits_health_flags_when_present() {
2262        let plan = ContainerPlan {
2263            container_name: "app".into(),
2264            image: "alpine".into(),
2265            env: Vec::new(),
2266            entry_point: Vec::new(),
2267            command: Vec::new(),
2268            secrets_refs: Vec::new(),
2269            essential: true,
2270            has_task_role: false,
2271            port_mappings: Vec::new(),
2272            network_mode: None,
2273            depends_on: Vec::new(),
2274            health_check: Some(HealthCheckSpec {
2275                command: vec!["CMD-SHELL".into(), "true".into()],
2276                interval_seconds: 5,
2277                timeout_seconds: 2,
2278                retries: 1,
2279                start_period_seconds: 1,
2280            }),
2281            volume_mounts: Vec::new(),
2282            ulimits: Vec::new(),
2283            linux_parameters: None,
2284            stop_timeout: None,
2285            user: None,
2286            working_directory: None,
2287            tty: false,
2288            interactive: false,
2289            readonly_rootfs: false,
2290        };
2291        let argv = build_run_argv(
2292            &plan,
2293            &[],
2294            "task-1",
2295            "host.docker.internal",
2296            None,
2297            "alpine",
2298            true,
2299        );
2300        let joined = argv.join(" ");
2301        assert!(joined.contains("--health-cmd true"), "argv: {joined}");
2302        assert!(joined.contains("--health-interval=5s"), "argv: {joined}");
2303        assert!(joined.contains("--health-timeout=2s"), "argv: {joined}");
2304        assert!(joined.contains("--health-retries=1"), "argv: {joined}");
2305        assert!(
2306            joined.contains("--health-start-period=1s"),
2307            "argv: {joined}"
2308        );
2309    }
2310
2311    #[test]
2312    fn build_run_argv_emits_no_health_flags_when_absent() {
2313        let plan = ContainerPlan {
2314            container_name: "app".into(),
2315            image: "alpine".into(),
2316            env: Vec::new(),
2317            entry_point: Vec::new(),
2318            command: Vec::new(),
2319            secrets_refs: Vec::new(),
2320            essential: true,
2321            has_task_role: false,
2322            port_mappings: Vec::new(),
2323            network_mode: None,
2324            depends_on: Vec::new(),
2325            health_check: None,
2326            volume_mounts: Vec::new(),
2327            ulimits: Vec::new(),
2328            linux_parameters: None,
2329            stop_timeout: None,
2330            user: None,
2331            working_directory: None,
2332            tty: false,
2333            interactive: false,
2334            readonly_rootfs: false,
2335        };
2336        let argv = build_run_argv(
2337            &plan,
2338            &[],
2339            "task-1",
2340            "host.docker.internal",
2341            None,
2342            "alpine",
2343            true,
2344        );
2345        assert!(!argv.iter().any(|s| s.starts_with("--health")));
2346    }
2347
2348    #[test]
2349    fn docker_health_to_ecs_maps_known_states() {
2350        assert_eq!(docker_health_to_ecs("healthy"), "HEALTHY");
2351        assert_eq!(docker_health_to_ecs("HEALTHY"), "HEALTHY");
2352        assert_eq!(docker_health_to_ecs("unhealthy"), "UNHEALTHY");
2353        assert_eq!(docker_health_to_ecs("starting"), "UNKNOWN");
2354        assert_eq!(docker_health_to_ecs("none"), "UNKNOWN");
2355        assert_eq!(docker_health_to_ecs(""), "UNKNOWN");
2356    }
2357
2358    /// `host.sourcePath` becomes a host bind mount with the path
2359    /// passed straight through to docker.
2360    #[test]
2361    fn resolve_host_bind_volume_uses_source_path() {
2362        let mut volumes = std::collections::HashMap::new();
2363        let v = serde_json::json!({
2364            "name": "data",
2365            "host": { "sourcePath": "/var/lib/myapp" }
2366        });
2367        volumes.insert("data".to_string(), &v);
2368        let mp = serde_json::json!({
2369            "sourceVolume": "data",
2370            "containerPath": "/app/data",
2371            "readOnly": false
2372        });
2373        let resolved = resolve_mount_point(&mp, &volumes).expect("resolved");
2374        assert_eq!(resolved.source, "/var/lib/myapp");
2375        assert_eq!(resolved.container_path, "/app/data");
2376        assert!(!resolved.read_only);
2377    }
2378
2379    /// `readOnly: true` on the mount point appends `:ro` to the
2380    /// rendered docker `-v` flag.
2381    #[test]
2382    fn read_only_mount_renders_ro_suffix() {
2383        let plan = ContainerPlan {
2384            container_name: "app".into(),
2385            image: "alpine".into(),
2386            env: Vec::new(),
2387            entry_point: Vec::new(),
2388            command: Vec::new(),
2389            secrets_refs: Vec::new(),
2390            essential: true,
2391            has_task_role: false,
2392            port_mappings: Vec::new(),
2393            network_mode: None,
2394            depends_on: Vec::new(),
2395            health_check: None,
2396            volume_mounts: vec![VolumeMount {
2397                source: "/host/path".into(),
2398                container_path: "/in/container".into(),
2399                read_only: true,
2400            }],
2401            ulimits: Vec::new(),
2402            linux_parameters: None,
2403            stop_timeout: None,
2404            user: None,
2405            working_directory: None,
2406            tty: false,
2407            interactive: false,
2408            readonly_rootfs: false,
2409        };
2410        let argv = build_run_argv(
2411            &plan,
2412            &[],
2413            "task-1",
2414            "host.docker.internal",
2415            None,
2416            "alpine",
2417            true,
2418        );
2419        let pair = argv
2420            .windows(2)
2421            .find(|w| w[0] == "-v")
2422            .expect("expected -v flag");
2423        assert_eq!(pair[1], "/host/path:/in/container:ro");
2424    }
2425
2426    /// EFS volumes resolve to a stub directory under `/tmp/fakecloud/efs`
2427    /// keyed by `fileSystemId`. `rootDirectory` (when set and not `/`)
2428    /// is appended so different mount targets within the same
2429    /// filesystem stay isolated.
2430    #[test]
2431    fn resolve_efs_volume_uses_stub_dir() {
2432        let mut volumes = std::collections::HashMap::new();
2433        let v = serde_json::json!({
2434            "name": "efs-vol",
2435            "efsVolumeConfiguration": {
2436                "fileSystemId": "fs-12345678",
2437                "rootDirectory": "/exports/app"
2438            }
2439        });
2440        volumes.insert("efs-vol".to_string(), &v);
2441        let mp = serde_json::json!({
2442            "sourceVolume": "efs-vol",
2443            "containerPath": "/mnt/efs"
2444        });
2445        let resolved = resolve_mount_point(&mp, &volumes).expect("resolved");
2446        // EFS resolves to a docker named volume (container-safe), with the
2447        // rootDirectory folded into the name (bug-audit 2026-05-28, 0.6).
2448        assert_eq!(resolved.source, "fakecloud-efs-fs-12345678-exports-app");
2449        assert_eq!(resolved.container_path, "/mnt/efs");
2450    }
2451
2452    /// EFS without `rootDirectory` (or with `/`) maps to the root of
2453    /// the filesystem stub so multiple tasks targeting the same id
2454    /// share state.
2455    #[test]
2456    fn efs_without_root_directory_uses_filesystem_root() {
2457        // No rootDirectory (or "/") -> a single shared named volume per
2458        // filesystem id.
2459        assert_eq!(
2460            shared_volume_name("efs", "fs-abc", "/"),
2461            "fakecloud-efs-fs-abc"
2462        );
2463        assert_eq!(
2464            shared_volume_name("efs", "fs-abc", ""),
2465            "fakecloud-efs-fs-abc"
2466        );
2467    }
2468
2469    /// `dockerVolumeConfiguration` resolves to the volume name itself,
2470    /// which docker treats as a named volume reference. No host path
2471    /// is materialised — docker creates the volume on first reference.
2472    #[test]
2473    fn resolve_docker_named_volume_uses_volume_name() {
2474        let mut volumes = std::collections::HashMap::new();
2475        let v = serde_json::json!({
2476            "name": "named-vol",
2477            "dockerVolumeConfiguration": {
2478                "scope": "task",
2479                "driver": "local"
2480            }
2481        });
2482        volumes.insert("named-vol".to_string(), &v);
2483        let mp = serde_json::json!({
2484            "sourceVolume": "named-vol",
2485            "containerPath": "/data"
2486        });
2487        let resolved = resolve_mount_point(&mp, &volumes).expect("resolved");
2488        assert_eq!(resolved.source, "named-vol");
2489        assert_eq!(resolved.container_path, "/data");
2490    }
2491
2492    /// FSx for Windows uses the same stub-directory pattern as EFS but
2493    /// scoped under `/tmp/fakecloud/fsx/<filesystemId>/`.
2494    #[test]
2495    fn resolve_fsx_volume_uses_stub_dir() {
2496        let mut volumes = std::collections::HashMap::new();
2497        let v = serde_json::json!({
2498            "name": "fsx-vol",
2499            "fsxWindowsFileServerVolumeConfiguration": {
2500                "fileSystemId": "fs-xyz",
2501                "rootDirectory": "share"
2502            }
2503        });
2504        volumes.insert("fsx-vol".to_string(), &v);
2505        let mp = serde_json::json!({
2506            "sourceVolume": "fsx-vol",
2507            "containerPath": "C:\\data"
2508        });
2509        let resolved = resolve_mount_point(&mp, &volumes).expect("resolved");
2510        // FSx resolves to a docker named volume (bug-audit 2026-05-28, 0.6).
2511        assert_eq!(resolved.source, "fakecloud-fsx-fs-xyz-share");
2512    }
2513
2514    /// Mount points that reference an undeclared `sourceVolume` resolve
2515    /// to `None` so `build_container_plans` skips them rather than
2516    /// emitting a broken `-v` flag.
2517    #[test]
2518    fn unknown_source_volume_returns_none() {
2519        let volumes = std::collections::HashMap::new();
2520        let mp = serde_json::json!({
2521            "sourceVolume": "missing",
2522            "containerPath": "/x"
2523        });
2524        assert!(resolve_mount_point(&mp, &volumes).is_none());
2525    }
2526
2527    /// `find_depends_on_cycle` returns the back-edge endpoints when a
2528    /// trivial 2-cycle exists. Real ECS would reject this at register
2529    /// time; our service-level handler relies on this helper.
2530    #[test]
2531    fn find_depends_on_cycle_detects_two_node_cycle() {
2532        let cds = vec![
2533            serde_json::json!({
2534                "name": "a",
2535                "image": "alpine",
2536                "dependsOn": [{"containerName": "b", "condition": "START"}],
2537            }),
2538            serde_json::json!({
2539                "name": "b",
2540                "image": "alpine",
2541                "dependsOn": [{"containerName": "a", "condition": "START"}],
2542            }),
2543        ];
2544        let cycle = find_depends_on_cycle(&cds);
2545        assert!(cycle.is_some(), "expected cycle to be detected");
2546    }
2547
2548    /// A three-node chain (a -> b -> c) is acyclic and must not be
2549    /// flagged. Guards against an over-eager DFS reporting back-edges
2550    /// from already-finished nodes.
2551    #[test]
2552    fn find_depends_on_cycle_accepts_chain() {
2553        let cds = vec![
2554            serde_json::json!({
2555                "name": "a",
2556                "image": "alpine",
2557                "dependsOn": [{"containerName": "b", "condition": "START"}],
2558            }),
2559            serde_json::json!({
2560                "name": "b",
2561                "image": "alpine",
2562                "dependsOn": [{"containerName": "c", "condition": "START"}],
2563            }),
2564            serde_json::json!({
2565                "name": "c",
2566                "image": "alpine",
2567            }),
2568        ];
2569        assert!(find_depends_on_cycle(&cds).is_none());
2570    }
2571
2572    /// `dependsOn[]` entries that name a container outside the task
2573    /// definition are ignored by the cycle check (they can't form a
2574    /// cycle by definition; runtime also drops them).
2575    #[test]
2576    fn find_depends_on_cycle_ignores_unknown_target() {
2577        let cds = vec![serde_json::json!({
2578            "name": "only",
2579            "image": "alpine",
2580            "dependsOn": [{"containerName": "ghost", "condition": "START"}],
2581        })];
2582        assert!(find_depends_on_cycle(&cds).is_none());
2583    }
2584
2585    /// `condition_is_met` covers each AWS condition value against a
2586    /// simulated docker inspect snapshot. Pinning these mappings here
2587    /// catches accidental re-orderings of the match arms.
2588    #[test]
2589    fn condition_is_met_matches_aws_semantics() {
2590        let running = InspectedState {
2591            started: true,
2592            exited: false,
2593            exit_code: 0,
2594            health: None,
2595        };
2596        let exited_ok = InspectedState {
2597            started: true,
2598            exited: true,
2599            exit_code: 0,
2600            health: None,
2601        };
2602        let exited_fail = InspectedState {
2603            started: true,
2604            exited: true,
2605            exit_code: 1,
2606            health: None,
2607        };
2608        let healthy = InspectedState {
2609            started: true,
2610            exited: false,
2611            exit_code: 0,
2612            health: Some("healthy".into()),
2613        };
2614
2615        // START is satisfied as soon as the container has started, even
2616        // if it later exited.
2617        assert!(condition_is_met(DependsOnCondition::Start, &running));
2618        assert!(condition_is_met(DependsOnCondition::Start, &exited_ok));
2619
2620        // COMPLETE requires an exit, regardless of code.
2621        assert!(!condition_is_met(DependsOnCondition::Complete, &running));
2622        assert!(condition_is_met(DependsOnCondition::Complete, &exited_ok));
2623        assert!(condition_is_met(DependsOnCondition::Complete, &exited_fail));
2624
2625        // SUCCESS requires an exit AND code 0.
2626        assert!(!condition_is_met(DependsOnCondition::Success, &running));
2627        assert!(condition_is_met(DependsOnCondition::Success, &exited_ok));
2628        assert!(!condition_is_met(DependsOnCondition::Success, &exited_fail));
2629
2630        // HEALTHY requires Health.Status == "healthy".
2631        assert!(!condition_is_met(DependsOnCondition::Healthy, &running));
2632        assert!(condition_is_met(DependsOnCondition::Healthy, &healthy));
2633    }
2634
2635    /// `DependsOnCondition::parse` accepts the four AWS-spelled values
2636    /// and rejects everything else — register-time validation depends on
2637    /// this returning `None` for unknowns.
2638    #[test]
2639    fn depends_on_condition_parse_round_trips() {
2640        assert_eq!(
2641            DependsOnCondition::parse("START"),
2642            Some(DependsOnCondition::Start)
2643        );
2644        assert_eq!(
2645            DependsOnCondition::parse("COMPLETE"),
2646            Some(DependsOnCondition::Complete)
2647        );
2648        assert_eq!(
2649            DependsOnCondition::parse("SUCCESS"),
2650            Some(DependsOnCondition::Success)
2651        );
2652        assert_eq!(
2653            DependsOnCondition::parse("HEALTHY"),
2654            Some(DependsOnCondition::Healthy)
2655        );
2656        assert_eq!(DependsOnCondition::parse("start"), None);
2657        assert_eq!(DependsOnCondition::parse("ANY"), None);
2658    }
2659
2660    // ── ulimits + linuxParameters + misc docker flags (O6) ──
2661
2662    #[test]
2663    fn build_run_argv_emits_ulimits() {
2664        let plan = ContainerPlan {
2665            container_name: "app".into(),
2666            image: "alpine".into(),
2667            env: Vec::new(),
2668            entry_point: Vec::new(),
2669            command: Vec::new(),
2670            secrets_refs: Vec::new(),
2671            essential: true,
2672            has_task_role: false,
2673            port_mappings: Vec::new(),
2674            network_mode: None,
2675            depends_on: Vec::new(),
2676            health_check: None,
2677            volume_mounts: Vec::new(),
2678            ulimits: vec![Ulimit {
2679                name: "nofile".into(),
2680                soft_limit: 1024,
2681                hard_limit: 2048,
2682            }],
2683            linux_parameters: None,
2684            stop_timeout: None,
2685            user: None,
2686            working_directory: None,
2687            tty: false,
2688            interactive: false,
2689            readonly_rootfs: false,
2690        };
2691        let argv = build_run_argv(&plan, &[], "t", "host.docker.internal", None, "img", true);
2692        assert!(argv.contains(&"--ulimit".to_string()));
2693        assert!(argv.contains(&"nofile=1024:2048".to_string()));
2694    }
2695
2696    #[test]
2697    fn build_run_argv_emits_linux_parameters() {
2698        let plan = ContainerPlan {
2699            container_name: "app".into(),
2700            image: "alpine".into(),
2701            env: Vec::new(),
2702            entry_point: Vec::new(),
2703            command: Vec::new(),
2704            secrets_refs: Vec::new(),
2705            essential: true,
2706            has_task_role: false,
2707            port_mappings: Vec::new(),
2708            network_mode: None,
2709            depends_on: Vec::new(),
2710            health_check: None,
2711            volume_mounts: Vec::new(),
2712            ulimits: Vec::new(),
2713            linux_parameters: Some(LinuxParameters {
2714                capabilities_add: vec!["NET_ADMIN".into()],
2715                capabilities_drop: vec!["ALL".into()],
2716                devices: vec![Device {
2717                    host_path: "/dev/zero".into(),
2718                    container_path: "/dev/zero".into(),
2719                    permissions: "rwm".into(),
2720                }],
2721                init_process_enabled: true,
2722                shared_memory_size: Some(256),
2723                sysctls: vec![Sysctl {
2724                    name: "net.ipv4.ip_forward".into(),
2725                    value: "1".into(),
2726                }],
2727                tmpfs: vec![Tmpfs {
2728                    container_path: "/tmp".into(),
2729                    size: 128,
2730                    mount_options: vec!["noexec".into()],
2731                }],
2732                privileged: true,
2733            }),
2734            stop_timeout: Some(30),
2735            user: Some("1000:1000".into()),
2736            working_directory: Some("/app".into()),
2737            tty: true,
2738            interactive: true,
2739            readonly_rootfs: true,
2740        };
2741        let argv = build_run_argv(&plan, &[], "t", "host.docker.internal", None, "img", true);
2742        assert!(argv.contains(&"--cap-add".to_string()));
2743        assert!(argv.contains(&"NET_ADMIN".to_string()));
2744        assert!(argv.contains(&"--cap-drop".to_string()));
2745        assert!(argv.contains(&"ALL".to_string()));
2746        assert!(argv.contains(&"--device".to_string()));
2747        assert!(argv.contains(&"/dev/zero:/dev/zerorwm".to_string()));
2748        assert!(argv.contains(&"--init".to_string()));
2749        assert!(argv.contains(&"--shm-size".to_string()));
2750        assert!(argv.contains(&"256m".to_string()));
2751        assert!(argv.contains(&"--sysctl".to_string()));
2752        assert!(argv.contains(&"net.ipv4.ip_forward=1".to_string()));
2753        assert!(argv.contains(&"--tmpfs".to_string()));
2754        assert!(argv.contains(&"--privileged".to_string()));
2755        assert!(argv.contains(&"--stop-timeout".to_string()));
2756        assert!(argv.contains(&"30".to_string()));
2757        assert!(argv.contains(&"--user".to_string()));
2758        assert!(argv.contains(&"1000:1000".to_string()));
2759        assert!(argv.contains(&"--workdir".to_string()));
2760        assert!(argv.contains(&"/app".to_string()));
2761        assert!(argv.contains(&"--tty".to_string()));
2762        assert!(argv.contains(&"--interactive".to_string()));
2763        assert!(argv.contains(&"--read-only".to_string()));
2764    }
2765
2766    #[test]
2767    fn parse_linux_parameters_fills_defaults() {
2768        let raw = serde_json::json!({"initProcessEnabled": true});
2769        let lp = parse_linux_parameters(&raw).expect("parses");
2770        assert!(lp.init_process_enabled);
2771        assert!(!lp.privileged);
2772        assert!(lp.capabilities_add.is_empty());
2773    }
2774
2775    #[test]
2776    fn parse_device_uses_default_permissions() {
2777        let raw = serde_json::json!({"hostPath": "/dev/null", "containerPath": "/dev/null"});
2778        let dev = parse_device(&raw).expect("parses");
2779        assert_eq!(dev.permissions, "rwm");
2780    }
2781
2782    #[test]
2783    fn compute_elbv2_targets_empty_when_no_group() {
2784        let mut accounts: MultiAccountState<EcsState> =
2785            MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
2786        let acct = accounts.get_or_create("000000000000");
2787        let mut task = make_task("t1");
2788        task.group = None;
2789        acct.tasks.insert("t1".into(), task);
2790        let state = acct.clone();
2791        let targets = compute_elbv2_targets(&state, state.tasks.get("t1").unwrap());
2792        assert!(targets.is_empty());
2793    }
2794
2795    #[test]
2796    fn compute_elbv2_targets_bridge_mode_uses_localhost_and_host_port() {
2797        let mut accounts: MultiAccountState<EcsState> =
2798            MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
2799        let acct = accounts.get_or_create("000000000000");
2800
2801        let td = crate::state::TaskDefinition {
2802            family: "app".into(),
2803            revision: 1,
2804            task_definition_arn: "arn:aws:ecs:us-east-1:000000000000:task-definition/app:1".into(),
2805            container_definitions: Vec::new(),
2806            network_mode: Some("bridge".into()),
2807            status: "ACTIVE".into(),
2808            task_role_arn: None,
2809            execution_role_arn: None,
2810            requires_compatibilities: Vec::new(),
2811            compatibilities: Vec::new(),
2812            cpu: None,
2813            memory: None,
2814            pid_mode: None,
2815            ipc_mode: None,
2816            volumes: Vec::new(),
2817            placement_constraints: Vec::new(),
2818            proxy_configuration: None,
2819            inference_accelerators: Vec::new(),
2820            ephemeral_storage: None,
2821            runtime_platform: None,
2822            requires_attributes: Vec::new(),
2823            registered_at: Utc::now(),
2824            registered_by: None,
2825            deregistered_at: None,
2826            tags: Vec::new(),
2827            enable_fault_injection: None,
2828        };
2829        acct.task_definitions.insert("app".into(), {
2830            let mut m = std::collections::BTreeMap::new();
2831            m.insert(1, td);
2832            m
2833        });
2834
2835        let service = crate::state::Service {
2836            service_name: "svc".into(),
2837            service_arn: "arn:aws:ecs:us-east-1:000000000000:service/default/svc".into(),
2838            cluster_name: "default".into(),
2839            cluster_arn: "arn:aws:ecs:us-east-1:000000000000:cluster/default".into(),
2840            task_definition_arn: "arn:aws:ecs:us-east-1:000000000000:task-definition/app:1".into(),
2841            family: "app".into(),
2842            revision: 1,
2843            desired_count: 1,
2844            running_count: 0,
2845            pending_count: 0,
2846            launch_type: "FARGATE".into(),
2847            status: "ACTIVE".into(),
2848            scheduling_strategy: "REPLICA".into(),
2849            deployment_controller: "ECS".into(),
2850            minimum_healthy_percent: Some(0),
2851            maximum_percent: Some(200),
2852            circuit_breaker: None,
2853            deployments: Vec::new(),
2854            load_balancers: vec![serde_json::json!({
2855                "targetGroupArn": "arn:aws:elasticloadbalancing:us-east-1:000000000000:targetgroup/tg/abc",
2856                "containerName": "app",
2857                "containerPort": 80,
2858            })],
2859            service_registries: Vec::new(),
2860            placement_constraints: Vec::new(),
2861            placement_strategy: Vec::new(),
2862            network_configuration: None,
2863            volume_configurations: vec![],
2864            tags: Vec::new(),
2865            created_at: Utc::now(),
2866            created_by: None,
2867            role_arn: None,
2868            platform_version: None,
2869            health_check_grace_period_seconds: None,
2870            enable_execute_command: false,
2871            enable_ecs_managed_tags: false,
2872            propagate_tags: None,
2873            capacity_provider_strategy: Vec::new(),
2874            availability_zone_rebalancing: None,
2875        };
2876        acct.services.insert(
2877            crate::state::EcsState::service_key("default", "svc"),
2878            service,
2879        );
2880
2881        let mut task = make_task("t1");
2882        task.group = Some("service:svc".into());
2883        task.containers = vec![crate::state::Container {
2884            container_arn: "arn:aws:ecs:us-east-1:000000000000:container/default/abc/app".into(),
2885            name: "app".into(),
2886            image: "alpine".into(),
2887            task_arn: task.task_arn.clone(),
2888            last_status: "RUNNING".into(),
2889            exit_code: None,
2890            reason: None,
2891            runtime_id: Some("dockerid-app".into()),
2892            essential: true,
2893            cpu: None,
2894            memory: None,
2895            memory_reservation: None,
2896            network_bindings: vec![serde_json::json!({
2897                "bindIP": "0.0.0.0",
2898                "containerPort": 80,
2899                "hostPort": 32768,
2900                "protocol": "tcp",
2901            })],
2902            network_interfaces: Vec::new(),
2903            health_status: None,
2904            managed_agents: None,
2905            image_digest: None,
2906        }];
2907        acct.tasks.insert("t1".into(), task);
2908
2909        let state = acct.clone();
2910        let targets = compute_elbv2_targets(&state, state.tasks.get("t1").unwrap());
2911        assert_eq!(targets.len(), 1);
2912        let (arn, tg_targets) = &targets[0];
2913        assert_eq!(
2914            arn,
2915            "arn:aws:elasticloadbalancing:us-east-1:000000000000:targetgroup/tg/abc"
2916        );
2917        assert_eq!(tg_targets.len(), 1);
2918        assert_eq!(tg_targets[0].0, "127.0.0.1");
2919        assert_eq!(tg_targets[0].1, Some(32768));
2920    }
2921
2922    #[test]
2923    fn compute_elbv2_targets_awsvpc_uses_eni_ip() {
2924        let mut accounts: MultiAccountState<EcsState> =
2925            MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
2926        let acct = accounts.get_or_create("000000000000");
2927
2928        let td = crate::state::TaskDefinition {
2929            family: "app".into(),
2930            revision: 1,
2931            task_definition_arn: "arn:aws:ecs:us-east-1:000000000000:task-definition/app:1".into(),
2932            container_definitions: Vec::new(),
2933            network_mode: Some("awsvpc".into()),
2934            status: "ACTIVE".into(),
2935            task_role_arn: None,
2936            execution_role_arn: None,
2937            requires_compatibilities: Vec::new(),
2938            compatibilities: Vec::new(),
2939            cpu: None,
2940            memory: None,
2941            pid_mode: None,
2942            ipc_mode: None,
2943            volumes: Vec::new(),
2944            placement_constraints: Vec::new(),
2945            proxy_configuration: None,
2946            inference_accelerators: Vec::new(),
2947            ephemeral_storage: None,
2948            runtime_platform: None,
2949            requires_attributes: Vec::new(),
2950            registered_at: Utc::now(),
2951            registered_by: None,
2952            deregistered_at: None,
2953            tags: Vec::new(),
2954            enable_fault_injection: None,
2955        };
2956        acct.task_definitions.insert("app".into(), {
2957            let mut m = std::collections::BTreeMap::new();
2958            m.insert(1, td);
2959            m
2960        });
2961
2962        let service = crate::state::Service {
2963            service_name: "svc".into(),
2964            service_arn: "arn:aws:ecs:us-east-1:000000000000:service/default/svc".into(),
2965            cluster_name: "default".into(),
2966            cluster_arn: "arn:aws:ecs:us-east-1:000000000000:cluster/default".into(),
2967            task_definition_arn: "arn:aws:ecs:us-east-1:000000000000:task-definition/app:1".into(),
2968            family: "app".into(),
2969            revision: 1,
2970            desired_count: 1,
2971            running_count: 0,
2972            pending_count: 0,
2973            launch_type: "FARGATE".into(),
2974            status: "ACTIVE".into(),
2975            scheduling_strategy: "REPLICA".into(),
2976            deployment_controller: "ECS".into(),
2977            minimum_healthy_percent: Some(0),
2978            maximum_percent: Some(200),
2979            circuit_breaker: None,
2980            deployments: Vec::new(),
2981            load_balancers: vec![serde_json::json!({
2982                "targetGroupArn": "arn:aws:elasticloadbalancing:us-east-1:000000000000:targetgroup/tg/abc",
2983                "containerName": "app",
2984                "containerPort": 80,
2985            })],
2986            service_registries: Vec::new(),
2987            placement_constraints: Vec::new(),
2988            placement_strategy: Vec::new(),
2989            network_configuration: None,
2990            volume_configurations: vec![],
2991            tags: Vec::new(),
2992            created_at: Utc::now(),
2993            created_by: None,
2994            role_arn: None,
2995            platform_version: None,
2996            health_check_grace_period_seconds: None,
2997            enable_execute_command: false,
2998            enable_ecs_managed_tags: false,
2999            propagate_tags: None,
3000            capacity_provider_strategy: Vec::new(),
3001            availability_zone_rebalancing: None,
3002        };
3003        acct.services.insert(
3004            crate::state::EcsState::service_key("default", "svc"),
3005            service,
3006        );
3007
3008        let mut task = make_task("t1");
3009        task.group = Some("service:svc".into());
3010        task.attachments = vec![crate::state::TaskAttachment {
3011            id: "eni-123".into(),
3012            attachment_type: "eni".into(),
3013            status: "ATTACHED".into(),
3014            details: vec![
3015                crate::state::AttachmentDetail {
3016                    name: "privateIPv4Address".into(),
3017                    value: "172.18.0.2".into(),
3018                },
3019                crate::state::AttachmentDetail {
3020                    name: "macAddress".into(),
3021                    value: "02:42:ac:12:00:02".into(),
3022                },
3023            ],
3024        }];
3025        acct.tasks.insert("t1".into(), task);
3026
3027        let state = acct.clone();
3028        let targets = compute_elbv2_targets(&state, state.tasks.get("t1").unwrap());
3029        assert_eq!(targets.len(), 1);
3030        let (arn, tg_targets) = &targets[0];
3031        assert_eq!(
3032            arn,
3033            "arn:aws:elasticloadbalancing:us-east-1:000000000000:targetgroup/tg/abc"
3034        );
3035        assert_eq!(tg_targets.len(), 1);
3036        assert_eq!(tg_targets[0].0, "172.18.0.2");
3037        assert_eq!(tg_targets[0].1, Some(80));
3038    }
3039
3040    fn minimal_plan() -> ContainerPlan {
3041        ContainerPlan {
3042            container_name: "app".into(),
3043            image: "alpine".into(),
3044            env: Vec::new(),
3045            entry_point: Vec::new(),
3046            command: Vec::new(),
3047            secrets_refs: Vec::new(),
3048            essential: true,
3049            has_task_role: false,
3050            port_mappings: Vec::new(),
3051            network_mode: None,
3052            depends_on: Vec::new(),
3053            health_check: None,
3054            volume_mounts: Vec::new(),
3055            ulimits: Vec::new(),
3056            linux_parameters: None,
3057            stop_timeout: None,
3058            user: None,
3059            working_directory: None,
3060            tty: false,
3061            interactive: false,
3062            readonly_rootfs: false,
3063        }
3064    }
3065
3066    /// 4.1 — every ECS task container must carry the shared
3067    /// `fakecloud-instance` ownership label so the startup reaper picks it
3068    /// up after an ungraceful restart (it filters strictly on that label).
3069    #[test]
3070    fn build_run_argv_emits_fakecloud_instance_label() {
3071        let plan = minimal_plan();
3072        let argv = build_run_argv(
3073            &plan,
3074            &[],
3075            "task-1",
3076            "host.docker.internal",
3077            None,
3078            "alpine",
3079            true,
3080        );
3081        let expected = fakecloud_instance_label();
3082        assert!(
3083            argv.windows(2)
3084                .any(|w| w[0] == "--label" && w[1] == expected),
3085            "argv must contain `--label {expected}`: {argv:?}",
3086        );
3087    }
3088
3089    /// 4.1 — the label value must be exactly the shape the reaper parses:
3090    /// `fakecloud-instance=fakecloud-<pid>`. The reaper strips the
3091    /// `fakecloud-` prefix off the value and `parse::<u32>()`s the rest, so a
3092    /// non-numeric tail (e.g. a task id) would silently never reap.
3093    #[test]
3094    fn fakecloud_instance_label_matches_reaper_format() {
3095        let label = fakecloud_instance_label();
3096        let (key, value) = label.split_once('=').expect("label is key=value");
3097        assert_eq!(key, "fakecloud-instance");
3098        let pid_str = value
3099            .strip_prefix("fakecloud-")
3100            .expect("value starts with fakecloud-");
3101        assert_eq!(
3102            pid_str.parse::<u32>().ok(),
3103            Some(std::process::id()),
3104            "reaper must be able to parse the owning pid out of {label}",
3105        );
3106    }
3107}