Skip to main content

fakecloud_ecs/runtime/
mod.rs

1//! Docker/Podman-based ECS task execution.
2//!
3//! Mirrors the Lambda `ContainerRuntime` approach (auto-detect CLI, forward
4//! localhost → host.docker.internal) but scoped for ECS's different
5//! lifecycle: tasks are ephemeral, so there is no warm-container pool. Each
6//! `run_task` spawns a background tokio task that pulls the image, starts
7//! the container, waits for exit, captures logs, and updates shared ECS
8//! state in place.
9
10use std::path::PathBuf;
11use std::sync::Arc;
12use std::time::Duration;
13
14use base64::Engine;
15use chrono::Utc;
16use fakecloud_core::delivery::DeliveryBus;
17use fakecloud_logs::ingest::{append_events, IngestEvent};
18use fakecloud_logs::SharedLogsState;
19use fakecloud_secretsmanager::SharedSecretsManagerState;
20use fakecloud_ssm::SharedSsmState;
21use parking_lot::RwLock;
22use tempfile::TempDir;
23use tokio::process::Command;
24
25use crate::state::{LifecycleEvent, SharedEcsState};
26
/// Failures reported while launching or supervising an ECS task's
/// containers. The `String` payloads carry the detail that is appended
/// to the variant's message.
#[derive(Debug, thiserror::Error)]
pub enum RuntimeError {
    /// No container CLI could be located; the message names `docker`
    /// and `podman` as the candidates that were tried.
    #[error("container CLI not found (tried docker, podman)")]
    NoCli,
    /// Pulling the image failed.
    #[error("image pull failed: {0}")]
    ImagePull(String),
    /// The container could not be started. Also used by plan building
    /// for missing account/task state and for tasks without containers.
    #[error("container start failed: {0}")]
    ContainerStart(String),
    /// Waiting for the container to exit failed.
    #[error("docker wait failed: {0}")]
    Wait(String),
}
38
/// Docker/Podman executor for ECS tasks.
///
/// The same struct also carries the Kubernetes backend: when `k8s` is
/// `Some`, the Docker-specific fields (`cli`, `net`, `docker_config`)
/// are populated with empty placeholders.
pub struct EcsRuntime {
    /// Container CLI selected at construction time. Empty string on the
    /// Kubernetes backend.
    cli: String,
    /// Container-to-host networking resolution (host alias, `--add-host`
    /// arg, sibling-container address) shared with the other runtimes via
    /// [`fakecloud_core::container_net`]. Carries the issue #1539 podman +
    /// in-container fixes.
    net: fakecloud_core::container_net::HostNetworking,
    /// Port the main fakecloud server bound to. Used to translate AWS
    /// ECR URIs (`<acct>.dkr.ecr.<region>.amazonaws.com/<repo>:<tag>`) to
    /// the local OCI v2 endpoint (`127.0.0.1:<port>/<repo>:<tag>`) so
    /// tasks can pull images pushed to fakecloud's own ECR.
    server_port: u16,
    /// Isolated DOCKER_CONFIG dir pre-populated with Basic auth for
    /// `127.0.0.1:<port>`; keeps the host user's `~/.docker/config.json`
    /// untouched and lets `docker pull` succeed against fakecloud ECR
    /// without a prior `aws ecr get-login-password | docker login`.
    /// Always `None` on the Kubernetes backend.
    docker_config: Option<Arc<TempDir>>,
    /// Tracks per-task lists of `(container_name, docker_container_id)` so
    /// `stop_task` can kill every container backing a task — multi-container
    /// task definitions launch one docker container per `containerDefinitions`
    /// entry, all of which must be torn down on stop.
    containers: RwLock<std::collections::HashMap<String, Vec<(String, String)>>>,
    /// Cross-service delivery bus — emits `aws.ecs` EventBridge events
    /// on task state transitions when wired. `None` if the server started
    /// without EventBridge configured (or for unit tests).
    delivery_bus: Option<Arc<DeliveryBus>>,
    /// CloudWatch Logs state — when set, tasks whose container definition
    /// declares the `awslogs` log driver get their captured stdout/stderr
    /// forwarded to a log group/stream under this shared state.
    logs_state: Option<SharedLogsState>,
    /// SecretsManager state for resolving `containerDefinition.secrets[]`
    /// entries whose `valueFrom` is a SecretsManager ARN.
    secretsmanager_state: Option<SharedSecretsManagerState>,
    /// SSM Parameter Store state for resolving `secrets[]` entries whose
    /// `valueFrom` is an SSM parameter ARN.
    ssm_state: Option<SharedSsmState>,
    /// `Some` when running on the Kubernetes backend; `run_task` then maps
    /// each task to a Pod instead of `docker run`. `None` is the default
    /// Docker/Podman backend, and the fields above drive it.
    k8s: Option<k8s::K8sTaskBackend>,
}
81
82mod config;
83mod k8s;
84mod lb;
85mod monitoring;
86mod secrets;
87mod task_lifecycle;
88
89impl EcsRuntime {
90    /// Auto-detect Docker or Podman. Returns `None` if neither is
91    /// available. Honours `FAKECLOUD_CONTAINER_CLI` for explicit override.
92    /// `server_port` is the port the main fakecloud server bound to;
93    /// needed to resolve AWS ECR URIs against the local OCI v2 registry.
94    pub fn new(server_port: u16) -> Option<Self> {
95        let cli = fakecloud_core::container_net::detect_container_cli()?;
96        let net = fakecloud_core::container_net::HostNetworking::detect(&cli);
97        let docker_config = build_local_registry_docker_config(server_port).map(Arc::new);
98        Some(Self {
99            cli,
100            net,
101            server_port,
102            docker_config,
103            containers: RwLock::new(std::collections::HashMap::new()),
104            delivery_bus: None,
105            logs_state: None,
106            secretsmanager_state: None,
107            ssm_state: None,
108            k8s: None,
109        })
110    }
111
112    /// Construct the Kubernetes backend. `server_port` is fakecloud's
113    /// bound port (used when `FAKECLOUD_K8S_SELF_URL` omits one). Fails
114    /// fast on misconfiguration — never silently degrades to Docker.
115    pub async fn new_k8s(server_port: u16) -> Result<Self, k8s::BackendInitError> {
116        let backend = k8s::K8sTaskBackend::from_env(server_port).await?;
117        // Docker fields are inert on the k8s backend; populate the cheap
118        // ones and leave docker_config unset.
119        let net = fakecloud_core::container_net::HostNetworking {
120            host_alias: String::new(),
121            add_host_arg: None,
122            sibling_host: String::new(),
123        };
124        Ok(Self {
125            cli: String::new(),
126            net,
127            server_port,
128            docker_config: None,
129            containers: RwLock::new(std::collections::HashMap::new()),
130            delivery_bus: None,
131            logs_state: None,
132            secretsmanager_state: None,
133            ssm_state: None,
134            k8s: Some(backend),
135        })
136    }
137
138    /// Backend name for logging.
139    pub fn cli_name(&self) -> &str {
140        if self.k8s.is_some() {
141            "kubernetes"
142        } else {
143            &self.cli
144        }
145    }
146
147    /// Sweep task Pods orphaned by a previous process (k8s only; no-op on
148    /// the Docker backend, handled by the shared container reaper).
149    pub async fn reap_stale(&self) {
150        if let Some(k) = &self.k8s {
151            k.reap_stale().await;
152        }
153    }
154
155    /// Wire EventBridge delivery so task state transitions emit
156    /// `aws.ecs` / `ECS Task State Change` events.
157    pub fn with_delivery_bus(mut self, bus: Arc<DeliveryBus>) -> Self {
158        self.delivery_bus = Some(bus);
159        self
160    }
161
162    /// Wire CloudWatch Logs state so tasks using the `awslogs` driver
163    /// get their captured stdout/stderr forwarded.
164    pub fn with_logs(mut self, logs: SharedLogsState) -> Self {
165        self.logs_state = Some(logs);
166        self
167    }
168}
169
/// Per-container launch plan derived from a task definition.
///
/// One plan is built for every container recorded on the task; the
/// fields below are read from the matching `containerDefinitions[]`
/// entry unless noted otherwise.
#[derive(Clone, Debug)]
pub(crate) struct ContainerPlan {
    /// Container name as recorded on the task.
    pub(crate) container_name: String,
    /// Image reference as recorded on the task's container.
    pub(crate) image: String,
    /// `environment[]` as `(name, value)` pairs. Entries without a
    /// `name` are dropped; a missing `value` becomes the empty string.
    pub(crate) env: Vec<(String, String)>,
    /// `entryPoint[]` string elements (non-string elements are dropped).
    pub(crate) entry_point: Vec<String>,
    /// `command[]` string elements (non-string elements are dropped).
    pub(crate) command: Vec<String>,
    /// `secrets[]` as `(name, valueFrom)` pairs; entries missing either
    /// key are dropped. These are unresolved references, not values.
    pub(crate) secrets_refs: Vec<(String, String)>,
    /// Whether the task's container is flagged essential.
    pub(crate) essential: bool,
    /// True when the task carries a task role ARN.
    pub(crate) has_task_role: bool,
    /// Port mappings parsed from the task definition. Each entry becomes
    /// a `--publish containerPort:hostPort/protocol` flag on the docker
    /// run command (except for `awsvpc`, where ports are exposed via the
    /// per-task ENI rather than the docker host's port table).
    // NOTE(review): docker's own `--publish` syntax is
    // `hostPort:containerPort[/protocol]` — confirm the argv builder
    // emits that order; the wording above may just be loose.
    pub(crate) port_mappings: Vec<PortMapping>,
    /// Task-level network mode propagated to every container plan so the
    /// argv builder can decide whether to emit `--publish` flags. Real
    /// ECS treats `awsvpc` as "container is on its own ENI"; the
    /// equivalent in fakecloud is "don't publish to the host".
    pub(crate) network_mode: Option<String>,
    /// Container dependencies parsed from `dependsOn[]`. Each entry pairs
    /// the target container name with the condition that must be observed
    /// before this container is launched: `START` (target exists/running),
    /// `COMPLETE` (target exited, any code), `SUCCESS` (target exited with
    /// code 0), or `HEALTHY` (target's docker `Health.Status` is `healthy`).
    /// Used both to topologically order the launch loop and to gate each
    /// `docker run` on the upstream condition.
    pub(crate) depends_on: Vec<DependsOn>,
    /// Parsed `healthCheck` from the task definition. Translated into
    /// docker `--health-*` flags on `docker run` so the container's
    /// health is observable via `docker inspect .State.Health.Status`.
    /// `None` when the task definition doesn't declare a healthCheck;
    /// the container's `healthStatus` then stays `UNKNOWN` (matching ECS
    /// behaviour for tasks without a health probe).
    pub(crate) health_check: Option<HealthCheckSpec>,
    /// Volume mounts resolved by joining the container definition's
    /// `mountPoints[]` with the task definition's `volumes[]`. Each entry
    /// renders as one `-v` flag on the `docker run` invocation. Empty when
    /// the container has no mount points or no matching volume entries.
    pub(crate) volume_mounts: Vec<VolumeMount>,
    /// Parsed `ulimits` from the container definition. Each entry becomes
    /// `--ulimit <name>=<soft>:<hard>` on `docker run`.
    pub(crate) ulimits: Vec<Ulimit>,
    /// Parsed `linuxParameters` from the container definition. Emits
    /// `--cap-add`, `--cap-drop`, `--device`, `--init`, `--shm-size`,
    /// `--sysctl`, `--tmpfs`, `--privileged`, and `--read-only` flags.
    // NOTE(review): `LinuxParameters` has no read-only field; `--read-only`
    // presumably comes from `readonly_rootfs` below — confirm.
    pub(crate) linux_parameters: Option<LinuxParameters>,
    /// `stopTimeout` in seconds. Becomes `--stop-timeout <N>` on `docker run`.
    pub(crate) stop_timeout: Option<u32>,
    /// `user` from the container definition. Becomes `--user <value>`.
    pub(crate) user: Option<String>,
    /// `workingDirectory` from the container definition. Becomes `--workdir`.
    pub(crate) working_directory: Option<String>,
    /// `tty` from the container definition. Emits `--tty` when true.
    pub(crate) tty: bool,
    /// `interactive` from the container definition. Emits `--interactive` when true.
    pub(crate) interactive: bool,
    /// `readonlyRootFilesystem` from the container definition. Emits `--read-only` when true.
    pub(crate) readonly_rootfs: bool,
}
231
/// One parsed `dependsOn[]` entry on a container. Pairs the upstream
/// container name with the condition that must hold before the dependent
/// container is launched. AWS spells the conditions `START`, `COMPLETE`,
/// `SUCCESS`, `HEALTHY` and treats anything else as an error at register
/// time — we mirror that in [`parse_depends_on`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct DependsOn {
    /// Name of the upstream container (`dependsOn[].containerName`).
    pub container_name: String,
    /// Condition the upstream container must satisfy
    /// (`dependsOn[].condition`).
    pub condition: DependsOnCondition,
}
242
/// Launch-gating condition taken from `dependsOn[].condition` in the
/// task definition. There is exactly one variant per value AWS
/// documents; before a dependent container is started, the launch loop
/// polls docker until the corresponding predicate holds.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum DependsOnCondition {
    /// The upstream docker container exists — it may be running or
    /// may already have exited.
    Start,
    /// The upstream container has exited; the exit code is irrelevant.
    Complete,
    /// The upstream container has exited with code 0.
    Success,
    /// The upstream container reports `Health.Status == healthy`. If
    /// the upstream declares no healthCheck, AWS considers the
    /// condition met straight away, and so does fakecloud.
    Healthy,
}

impl DependsOnCondition {
    /// Turn the AWS spelling (`START`, `COMPLETE`, `SUCCESS`,
    /// `HEALTHY`) into a variant. Matching is exact and case-sensitive;
    /// anything else yields `None`, which lets register-time callers
    /// raise a `ClientException`.
    pub fn parse(raw: &str) -> Option<Self> {
        // `as_aws_str` is the single source of truth for the spellings;
        // parsing is simply its inverse lookup.
        [Self::Start, Self::Complete, Self::Success, Self::Healthy]
            .iter()
            .copied()
            .find(|candidate| candidate.as_aws_str() == raw)
    }

    /// The AWS spelling of this condition. Error messages
    /// (timeouts, failed dependencies) use it so users see the same
    /// token they put in their task definition.
    pub fn as_aws_str(self) -> &'static str {
        match self {
            Self::Healthy => "HEALTHY",
            Self::Success => "SUCCESS",
            Self::Complete => "COMPLETE",
            Self::Start => "START",
        }
    }
}
287
/// Container health check parsed from the ECS task definition. Each
/// field maps 1:1 to a docker `--health-*` flag on `docker run`. AWS
/// defaults: interval=30s, timeout=5s, retries=3, startPeriod=0s — we
/// preserve those defaults at parse time so the argv builder always
/// has concrete values to emit.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct HealthCheckSpec {
    /// `command[]` from the task definition. The first element selects
    /// the docker syntax: `CMD-SHELL` => `--health-cmd <rest joined by space>`,
    /// `CMD` => `--health-cmd <rest joined by space>` (still routed to
    /// `--health-cmd` because docker doesn't accept argv-form here),
    /// `NONE` => no flag emitted (caller skips emitting healthcheck).
    pub command: Vec<String>,
    /// Seconds between probes (AWS default 30).
    pub interval_seconds: u32,
    /// Seconds a single probe may run before it counts as failed
    /// (AWS default 5).
    pub timeout_seconds: u32,
    /// Number of retries (AWS default 3).
    pub retries: u32,
    /// Start period in seconds (AWS default 0).
    pub start_period_seconds: u32,
}
306
/// One entry in a container's `portMappings`. Mirrors the AWS shape so
/// [`build_run_argv`] and the `networkBindings` response can share the
/// same parsed representation.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct PortMapping {
    /// Port inside the container (`containerPort`).
    pub container_port: u16,
    /// `0` (or unset in the source JSON) means "use the same value as
    /// containerPort" — host-mode default per AWS docs.
    pub host_port: u16,
    /// Lower-case `tcp` / `udp`. Defaults to `tcp` when omitted.
    pub protocol: String,
}
319
/// One resolved `mountPoints` entry on a container plan. Computed at
/// launch by joining the container definition's `mountPoints` against the
/// task definition's `volumes` array. Each entry becomes a single
/// `-v <source>:<containerPath>[:ro]` flag on `docker run`.
///
/// Source resolution by volume kind:
/// - **host bind** (`volume.host.sourcePath` set and non-empty): bind the
///   host path into the container at `containerPath`. The path is
///   created best-effort before the run.
/// - **EFS** (`efsVolumeConfiguration` set): a docker named volume
///   `fakecloud-efs-<fileSystemId>[-<rootDirectory>]` (segments
///   sanitized to docker's volume-name charset), so multiple tasks
///   targeting the same filesystem id share state the way real EFS
///   would. The `rootDirectory` suffix is omitted when it is unset or `/`.
/// - **FSx for Windows** (`fsxWindowsFileServerVolumeConfiguration` set):
///   a docker named volume `fakecloud-fsx-<fileSystemId>[-<rootDirectory>]`
///   composed the same way as EFS.
/// - **Docker named volume** (`dockerVolumeConfiguration` set): pass the
///   volume name through to docker as a named volume reference.
/// - **Bare volume** (only `name` set, no host config): the volume name
///   is passed through as-is — matches AWS's "Docker volumes" default
///   scope.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct VolumeMount {
    /// Left side of `-v`: a host path or a docker volume name (including
    /// the `fakecloud-{efs,fsx}-...` named volumes used for shared FS
    /// emulation).
    pub source: String,
    /// Container-side path, taken verbatim from the container
    /// definition's `mountPoints[].containerPath`.
    pub container_path: String,
    /// `mountPoints[].readOnly` honoured: when true, append `:ro` to the
    /// `-v` flag so the bind/named volume is read-only inside the
    /// container. Defaults to false (read-write) when omitted.
    pub read_only: bool,
}
355
/// One `ulimits` entry. Becomes `--ulimit <name>=<soft>:<hard>`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct Ulimit {
    /// Limit name, used as the `<name>` part of the flag.
    pub name: String,
    /// Soft limit, used as the `<soft>` part of the flag.
    pub soft_limit: i32,
    /// Hard limit, used as the `<hard>` part of the flag.
    pub hard_limit: i32,
}
363
/// One `linuxParameters.devices` entry. Becomes `--device <hostPath>:<containerPath><permissions>`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct Device {
    /// Device path on the host (`<hostPath>`).
    pub host_path: String,
    /// Device path inside the container (`<containerPath>`).
    pub container_path: String,
    /// Permission suffix appended after the container path.
    // NOTE(review): docker expects `host:container:perms`; confirm the
    // parser stores the leading `:` here or the argv builder adds it.
    pub permissions: String,
}
371
/// One `linuxParameters.sysctl` entry. Becomes `--sysctl <name>=<value>`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct Sysctl {
    /// Kernel parameter name (`<name>`).
    pub name: String,
    /// Value assigned to the parameter (`<value>`).
    pub value: String,
}
378
/// Parsed `linuxParameters` from the container definition.
///
/// `Default` yields empty lists, `false` flags, and no shared-memory
/// size.
#[derive(Clone, Debug, PartialEq, Eq, Default)]
pub(crate) struct LinuxParameters {
    /// Capabilities to add — presumably one `--cap-add` each; confirm
    /// against the argv builder.
    pub capabilities_add: Vec<String>,
    /// Capabilities to drop — presumably one `--cap-drop` each; confirm
    /// against the argv builder.
    pub capabilities_drop: Vec<String>,
    /// Host devices to expose to the container.
    pub devices: Vec<Device>,
    /// Whether an init process is requested — presumably `--init`.
    pub init_process_enabled: bool,
    /// Shared memory size — presumably MiB as in AWS `sharedMemorySize`
    /// and fed to `--shm-size`; TODO confirm the unit.
    pub shared_memory_size: Option<i32>,
    /// Kernel parameters to set inside the container.
    pub sysctls: Vec<Sysctl>,
    /// tmpfs mounts to create inside the container.
    pub tmpfs: Vec<Tmpfs>,
    /// Whether the container runs privileged — presumably `--privileged`.
    pub privileged: bool,
}
391
/// One `linuxParameters.tmpfs` entry. Becomes `--tmpfs <containerPath>:size=<size>M<,options>*`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct Tmpfs {
    /// Mount point inside the container (`<containerPath>`).
    pub container_path: String,
    /// Size rendered as `size=<size>M` in the flag.
    pub size: i32,
    /// Extra mount options, each appended after a comma.
    pub mount_options: Vec<String>,
}
399
/// A container plan paired with a second environment list.
// NOTE(review): presumably `env` is the final environment handed to the
// container once `plan.secrets_refs` have been resolved — confirm
// against the code that constructs this value.
#[derive(Clone, Debug)]
struct ResolvedContainerPlan {
    /// The launch plan this environment belongs to.
    plan: ContainerPlan,
    /// `(name, value)` environment pairs kept alongside the plan.
    env: Vec<(String, String)>,
}
405
/// Result of waiting for a task's lifetime-determining container.
#[derive(Clone, Debug)]
struct TaskExitOutcome {
    /// Index into the started-containers list of the container whose exit
    /// closed out the task. `None` only in degenerate cases — kept as
    /// `Option` so `final_containers` indexing stays explicit.
    exited_index: Option<usize>,
    /// Exit code recorded for the task — presumably that of the
    /// container at `exited_index`; confirm against the wait loop.
    exit_code: i64,
    /// Static stop-code string — presumably the ECS `stopCode` surfaced
    /// on the stopped task; confirm against the caller.
    stop_code: &'static str,
}
416
/// Per-container record persisted on the task. Mirrors the AWS Container
/// shape but tracks the docker-side container id alongside ECS metadata.
#[derive(Clone, Debug)]
pub(crate) struct RunningContainer {
    /// Container name.
    pub(crate) name: String,
    /// Docker-side container id.
    pub(crate) container_id: String,
    /// Whether this container is essential; an exited essential
    /// container is enough for the task to be stopped.
    pub(crate) essential: bool,
    /// Exit code once the container has exited; `None` is treated as
    /// "has not exited" by the task-lifetime decision.
    pub(crate) exit_code: Option<i64>,
    /// Resolved `networkBindings` for DescribeTasks. Computed from the
    /// task definition's `portMappings` at launch and surfaced verbatim
    /// in the per-container response.
    pub(crate) network_bindings: Vec<serde_json::Value>,
    /// Image digest captured from `docker inspect` after pull. AWS
    /// surfaces this on the Container response so callers can pin which
    /// exact image revision a task is running. `None` when the inspect
    /// failed or the CLI didn't expose `RepoDigests`.
    pub(crate) image_digest: Option<String>,
}
435
436/// Pure decision: does the current set of containers warrant stopping
437/// the task? Returns true when any essential container has exited, or
438/// when every container has exited (regardless of essential). Mirrors
439/// AWS ECS task lifetime semantics.
440pub(crate) fn task_should_stop(containers: &[RunningContainer]) -> bool {
441    if containers.is_empty() {
442        return true;
443    }
444    let any_essential_exited = containers
445        .iter()
446        .any(|c| c.essential && c.exit_code.is_some());
447    if any_essential_exited {
448        return true;
449    }
450    containers.iter().all(|c| c.exit_code.is_some())
451}
452
453fn build_container_plans(
454    state: &SharedEcsState,
455    account_id: &str,
456    task_id: &str,
457    _server_port: u16,
458) -> Result<Vec<ContainerPlan>, RuntimeError> {
459    let accounts = state.read();
460    let s = accounts
461        .get(account_id)
462        .ok_or_else(|| RuntimeError::ContainerStart("account missing".into()))?;
463    let task = s
464        .tasks
465        .get(task_id)
466        .ok_or_else(|| RuntimeError::ContainerStart("task missing".into()))?;
467    if task.containers.is_empty() {
468        return Err(RuntimeError::ContainerStart(
469            "task has no containers".into(),
470        ));
471    }
472    let has_task_role = task.task_role_arn.is_some();
473    let task_def = s
474        .task_definitions
475        .get(&task.family)
476        .and_then(|revs| revs.get(&task.revision));
477    let network_mode = task_def.and_then(|td| td.network_mode.clone());
478    // Index `volumes[]` by name so each container's `mountPoints[]` can
479    // resolve its volume in O(1). Real ECS rejects mountPoints that
480    // reference an undeclared volume at register time; we don't yet, so
481    // unresolved names just produce zero mounts at launch.
482    let volumes_by_name: std::collections::HashMap<String, &serde_json::Value> = task_def
483        .map(|td| {
484            td.volumes
485                .iter()
486                .filter_map(|v| {
487                    let name = v.get("name").and_then(|n| n.as_str())?;
488                    Some((name.to_string(), v))
489                })
490                .collect()
491        })
492        .unwrap_or_default();
493    let mut plans = Vec::with_capacity(task.containers.len());
494    for container in &task.containers {
495        let def = find_container_definition(s, &task.family, task.revision, &container.name);
496        let secrets_refs = def
497            .as_ref()
498            .and_then(|d| d.get("secrets").and_then(|v| v.as_array()).cloned())
499            .map(|arr| {
500                arr.iter()
501                    .filter_map(|e| {
502                        let name = e.get("name").and_then(|v| v.as_str())?.to_string();
503                        let value_from = e.get("valueFrom").and_then(|v| v.as_str())?.to_string();
504                        Some((name, value_from))
505                    })
506                    .collect::<Vec<_>>()
507            })
508            .unwrap_or_default();
509        let str_array = |key: &str| -> Vec<String> {
510            def.as_ref()
511                .and_then(|d| d.get(key).and_then(|v| v.as_array()).cloned())
512                .map(|arr| {
513                    arr.iter()
514                        .filter_map(|v| v.as_str().map(String::from))
515                        .collect::<Vec<_>>()
516                })
517                .unwrap_or_default()
518        };
519        let env = def
520            .as_ref()
521            .and_then(|d| d.get("environment").and_then(|v| v.as_array()).cloned())
522            .map(|arr| {
523                arr.iter()
524                    .filter_map(|e| {
525                        let k = e.get("name").and_then(|v| v.as_str())?;
526                        let v = e.get("value").and_then(|v| v.as_str()).unwrap_or("");
527                        Some((k.to_string(), v.to_string()))
528                    })
529                    .collect::<Vec<_>>()
530            })
531            .unwrap_or_default();
532        let port_mappings = def
533            .as_ref()
534            .and_then(|d| d.get("portMappings").and_then(|v| v.as_array()).cloned())
535            .map(|arr| {
536                arr.iter()
537                    .filter_map(parse_port_mapping)
538                    .collect::<Vec<_>>()
539            })
540            .unwrap_or_default();
541        let depends_on = def
542            .as_ref()
543            .and_then(|d| d.get("dependsOn").and_then(|v| v.as_array()).cloned())
544            .map(|arr| {
545                arr.iter()
546                    .filter_map(parse_depends_on_entry)
547                    .collect::<Vec<_>>()
548            })
549            .unwrap_or_default();
550        let health_check = def
551            .as_ref()
552            .and_then(|d| d.get("healthCheck"))
553            .and_then(parse_health_check);
554        let volume_mounts = def
555            .as_ref()
556            .and_then(|d| d.get("mountPoints").and_then(|v| v.as_array()).cloned())
557            .map(|arr| {
558                arr.iter()
559                    .filter_map(|mp| resolve_mount_point(mp, &volumes_by_name))
560                    .collect::<Vec<_>>()
561            })
562            .unwrap_or_default();
563        let ulimits = def
564            .as_ref()
565            .and_then(|d| d.get("ulimits").and_then(|v| v.as_array()).cloned())
566            .map(|arr| arr.iter().filter_map(parse_ulimit).collect::<Vec<_>>())
567            .unwrap_or_default();
568        let linux_parameters = def
569            .as_ref()
570            .and_then(|d| d.get("linuxParameters"))
571            .and_then(parse_linux_parameters);
572        let stop_timeout = def.as_ref().and_then(|d| {
573            d.get("stopTimeout")
574                .and_then(|v| v.as_u64())
575                .map(|n| n as u32)
576        });
577        let user = def
578            .as_ref()
579            .and_then(|d| d.get("user").and_then(|v| v.as_str()).map(String::from));
580        let working_directory = def.as_ref().and_then(|d| {
581            d.get("workingDirectory")
582                .and_then(|v| v.as_str())
583                .map(String::from)
584        });
585        let tty = def
586            .as_ref()
587            .and_then(|d| d.get("tty").and_then(|v| v.as_bool()))
588            .unwrap_or(false);
589        let interactive = def
590            .as_ref()
591            .and_then(|d| d.get("interactive").and_then(|v| v.as_bool()))
592            .unwrap_or(false);
593        let readonly_rootfs = def
594            .as_ref()
595            .and_then(|d| d.get("readonlyRootFilesystem").and_then(|v| v.as_bool()))
596            .unwrap_or(false);
597        plans.push(ContainerPlan {
598            container_name: container.name.clone(),
599            image: container.image.clone(),
600            env,
601            entry_point: str_array("entryPoint"),
602            command: str_array("command"),
603            secrets_refs,
604            essential: container.essential,
605            has_task_role,
606            port_mappings,
607            network_mode: network_mode.clone(),
608            depends_on,
609            health_check,
610            volume_mounts,
611            ulimits,
612            linux_parameters,
613            stop_timeout,
614            user,
615            working_directory,
616            tty,
617            interactive,
618            readonly_rootfs,
619        });
620    }
621    let plans = topo_sort_plans(plans);
622    Ok(plans)
623}
624
625/// Resolve one `mountPoints[]` entry against the indexed task-definition
626/// volumes. Returns `None` when:
627/// - the entry has no `containerPath` or `sourceVolume`,
628/// - the named volume isn't declared on the task definition.
629///
630/// Returns `Some(VolumeMount)` for every supported volume kind:
631/// host bind, EFS, FSx, named docker volume, anonymous docker volume.
632fn resolve_mount_point(
633    mount_point: &serde_json::Value,
634    volumes_by_name: &std::collections::HashMap<String, &serde_json::Value>,
635) -> Option<VolumeMount> {
636    let container_path = mount_point
637        .get("containerPath")
638        .and_then(|v| v.as_str())?
639        .to_string();
640    let source_volume = mount_point.get("sourceVolume").and_then(|v| v.as_str())?;
641    let read_only = mount_point
642        .get("readOnly")
643        .and_then(|v| v.as_bool())
644        .unwrap_or(false);
645    let volume = volumes_by_name.get(source_volume)?;
646    let source = resolve_volume_source(source_volume, volume)?;
647    Some(VolumeMount {
648        source,
649        container_path,
650        read_only,
651    })
652}
653
654/// Map a single task-definition `volumes[]` entry to the source side of a
655/// `docker run -v` flag. The matching here mirrors the AWS volume kinds:
656///
657/// 1. `host.sourcePath` -> use that path directly (bind mount).
658/// 2. `efsVolumeConfiguration.fileSystemId` -> stub directory under
659///    `/tmp/fakecloud/efs/<filesystemId>[/<rootDirectory>]`. Created with
660///    `mkdir -p` so different tasks targeting the same filesystem id
661///    share the same host directory, matching real EFS's "many tasks,
662///    one filesystem" semantics.
663/// 3. `fsxWindowsFileServerVolumeConfiguration.fileSystemId` -> stub
664///    directory under `/tmp/fakecloud/fsx/<filesystemId>/<rootDirectory>`.
665/// 4. `dockerVolumeConfiguration` -> the volume `name` itself (named
666///    docker volume; docker creates it on first reference).
667/// 5. Bare entry (only `name`) -> the volume `name` as an anonymous
668///    docker volume reference, matching AWS's "Docker volumes" default.
669///
670/// Returns `None` when the configuration is malformed (e.g. EFS without
671/// a fileSystemId).
672fn resolve_volume_source(name: &str, volume: &serde_json::Value) -> Option<String> {
673    if let Some(host) = volume.get("host") {
674        if let Some(path) = host.get("sourcePath").and_then(|v| v.as_str()) {
675            // Empty sourcePath means "anonymous host volume" — fall
676            // through to the named-volume default below.
677            if !path.is_empty() {
678                ensure_dir_exists(path);
679                return Some(path.to_string());
680            }
681        }
682    }
683    if let Some(efs) = volume.get("efsVolumeConfiguration") {
684        let fs_id = efs.get("fileSystemId").and_then(|v| v.as_str())?;
685        let root = efs
686            .get("rootDirectory")
687            .and_then(|v| v.as_str())
688            .unwrap_or("/");
689        return Some(shared_volume_name("efs", fs_id, root));
690    }
691    if let Some(fsx) = volume.get("fsxWindowsFileServerVolumeConfiguration") {
692        let fs_id = fsx.get("fileSystemId").and_then(|v| v.as_str())?;
693        let root = fsx
694            .get("rootDirectory")
695            .and_then(|v| v.as_str())
696            .unwrap_or("/");
697        return Some(shared_volume_name("fsx", fs_id, root));
698    }
699    if volume.get("dockerVolumeConfiguration").is_some() {
700        // Named docker volume — docker auto-creates it on first
701        // reference. Pass the volume name through verbatim.
702        return Some(name.to_string());
703    }
704    // Bare volume entry: anonymous docker volume keyed by name.
705    Some(name.to_string())
706}
707
708/// Compose the docker **named-volume** name for an EFS/FSx volume. A
709/// single shared volume per filesystem id when `rootDirectory` is unset
710/// or `/` (the EFS default mount target); otherwise the rootDirectory is
711/// folded into the name so distinct mount targets within one filesystem
712/// stay isolated. A docker named volume lives on the daemon rather than a
713/// host path, so tasks share state correctly *and* it works when
714/// fakecloud itself runs in a container (`FAKECLOUD_IN_CONTAINER=1`),
715/// where a host-path stub created inside fakecloud's own filesystem would
716/// resolve to an empty dir against the host daemon (issue #1539, bug 0.6).
717/// The segments are sanitized to docker's volume-name charset.
718fn shared_volume_name(kind: &str, fs_id: &str, root: &str) -> String {
719    let trimmed = root.trim_start_matches('/').trim_end_matches('/');
720    let fs_id = sanitize_volume_segment(fs_id);
721    if trimmed.is_empty() {
722        format!("fakecloud-{kind}-{fs_id}")
723    } else {
724        format!(
725            "fakecloud-{kind}-{fs_id}-{}",
726            sanitize_volume_segment(trimmed)
727        )
728    }
729}
730
/// Squash an arbitrary string into docker's volume-name charset.
///
/// Characters in `[A-Za-z0-9_.-]` are kept as-is; every other character
/// (including each non-ASCII character) becomes a single `-`.
fn sanitize_volume_segment(s: &str) -> String {
    let mut cleaned = String::with_capacity(s.len());
    for ch in s.chars() {
        let allowed = ch.is_ascii_alphanumeric() || ch == '_' || ch == '.' || ch == '-';
        cleaned.push(if allowed { ch } else { '-' });
    }
    cleaned
}
744
/// Best-effort `mkdir -p` for a host path that is about to be used as a
/// bind-mount source.
///
/// The result is deliberately discarded: if the directory cannot be
/// created, docker will surface a clear error on the run, and unit tests
/// don't have a writable `/tmp/fakecloud` in every sandbox.
fn ensure_dir_exists(path: &str) {
    // Ignoring the error is intentional — see the doc comment above.
    drop(std::fs::create_dir_all(path));
}
752
753/// Parse one `dependsOn[]` entry. Returns `None` for malformed entries
754/// (missing `containerName`, unrecognised `condition`) so the caller
755/// can drop them silently from the launch plan — register-time
756/// validation already rejects bad values; this is a defensive fallback.
757fn parse_depends_on_entry(value: &serde_json::Value) -> Option<DependsOn> {
758    let container_name = value
759        .get("containerName")
760        .and_then(|v| v.as_str())?
761        .to_string();
762    let raw_condition = value.get("condition").and_then(|v| v.as_str())?;
763    let condition = DependsOnCondition::parse(raw_condition)?;
764    Some(DependsOn {
765        container_name,
766        condition,
767    })
768}
769
770/// Topologically sort container plans so `dependsOn` dependencies start
771/// before their dependants. Implements Kahn's algorithm with stable order:
772/// when multiple plans are ready, we keep their original declaration
773/// index, so a task without any dependsOn launches in the same order the
774/// user wrote in the task definition. Cycles fall through with the
775/// remaining plans appended in original order — the runtime will still
776/// launch every container; it just can't guarantee dependency ordering
777/// in that degenerate case. Cycles are rejected at register time
778/// (RegisterTaskDefinition -> validate_depends_on_acyclic), so reaching
779/// that branch from a real launch path means a bug elsewhere.
780fn topo_sort_plans(plans: Vec<ContainerPlan>) -> Vec<ContainerPlan> {
781    use std::collections::{HashMap, HashSet};
782    let names: HashSet<String> = plans.iter().map(|p| p.container_name.clone()).collect();
783    let index: HashMap<String, usize> = plans
784        .iter()
785        .enumerate()
786        .map(|(i, p)| (p.container_name.clone(), i))
787        .collect();
788    // in_degree[i] = number of unresolved dependencies for plan i. We
789    // ignore depends_on entries that name a container not in the task
790    // (real ECS rejects those at register time; our register path doesn't
791    // yet, so be defensive here).
792    let mut in_degree: Vec<usize> = plans
793        .iter()
794        .map(|p| {
795            p.depends_on
796                .iter()
797                .filter(|d| names.contains(&d.container_name))
798                .count()
799        })
800        .collect();
801    // dependants[i] = indices of plans that depend on plan i.
802    let mut dependants: Vec<Vec<usize>> = vec![Vec::new(); plans.len()];
803    for (i, p) in plans.iter().enumerate() {
804        for d in &p.depends_on {
805            if let Some(&di) = index.get(&d.container_name) {
806                dependants[di].push(i);
807            }
808        }
809    }
810    let mut ordered: Vec<ContainerPlan> = Vec::with_capacity(plans.len());
811    let mut emitted: Vec<bool> = vec![false; plans.len()];
812    loop {
813        // Pick the lowest-index plan whose in_degree is 0 to keep stable
814        // order across runs.
815        let next = (0..plans.len()).find(|&i| !emitted[i] && in_degree[i] == 0);
816        match next {
817            Some(i) => {
818                emitted[i] = true;
819                ordered.push(plans[i].clone());
820                for &di in &dependants[i] {
821                    if in_degree[di] > 0 {
822                        in_degree[di] -= 1;
823                    }
824                }
825            }
826            None => break,
827        }
828    }
829    // Cycle: append anything left in original order so we don't drop plans.
830    for (i, p) in plans.into_iter().enumerate() {
831        if !emitted[i] {
832            ordered.push(p);
833        }
834    }
835    ordered
836}
837
838/// Validate that `containerDefinitions[].dependsOn[]` graph is acyclic.
839/// Real ECS rejects cyclic dependencies at RegisterTaskDefinition time
840/// with a `ClientException`; we mirror that. Returns the offending pair
841/// of container names so the caller can produce a useful error.
842///
843/// Operates directly on the raw JSON definitions (rather than parsed
844/// `ContainerPlan`s) so register-time validation doesn't have to first
845/// build a full plan from a not-yet-stored task definition.
846pub(crate) fn find_depends_on_cycle(
847    container_definitions: &[serde_json::Value],
848) -> Option<(String, String)> {
849    use std::collections::HashMap;
850
851    let names: Vec<String> = container_definitions
852        .iter()
853        .filter_map(|c| c.get("name").and_then(|n| n.as_str()).map(String::from))
854        .collect();
855    let index: HashMap<&str, usize> = names
856        .iter()
857        .enumerate()
858        .map(|(i, n)| (n.as_str(), i))
859        .collect();
860
861    let mut adj: Vec<Vec<usize>> = vec![Vec::new(); names.len()];
862    for (i, cd) in container_definitions.iter().enumerate() {
863        if i >= names.len() {
864            continue;
865        }
866        let Some(deps) = cd.get("dependsOn").and_then(|v| v.as_array()) else {
867            continue;
868        };
869        for d in deps {
870            let Some(target) = d.get("containerName").and_then(|v| v.as_str()) else {
871                continue;
872            };
873            if let Some(&j) = index.get(target) {
874                // Edge: i depends on j -> for cycle DFS we walk from i to j.
875                adj[i].push(j);
876            }
877        }
878    }
879
880    // DFS with three-colour marking (white=0, gray=1, black=2). When we
881    // hit a gray neighbour we've closed a cycle; report the back-edge as
882    // the offending pair.
883    let mut state = vec![0u8; names.len()];
884    let mut stack: Vec<(usize, usize)> = Vec::new();
885    for start in 0..names.len() {
886        if state[start] != 0 {
887            continue;
888        }
889        stack.clear();
890        stack.push((start, 0));
891        state[start] = 1;
892        while let Some(&(node, next_edge)) = stack.last() {
893            if next_edge < adj[node].len() {
894                let nb = adj[node][next_edge];
895                stack.last_mut().unwrap().1 += 1;
896                match state[nb] {
897                    0 => {
898                        state[nb] = 1;
899                        stack.push((nb, 0));
900                    }
901                    1 => {
902                        return Some((names[node].clone(), names[nb].clone()));
903                    }
904                    _ => {}
905                }
906            } else {
907                state[node] = 2;
908                stack.pop();
909            }
910        }
911    }
912    None
913}
914
/// Snapshot of the docker container state we care about for `dependsOn`
/// gating: whether the container exists/started, whether it's exited,
/// its exit code, and (when configured) its health status.
///
/// Produced by `inspect_container_state` from a single `docker inspect`
/// call and consumed by `condition_is_met`.
#[derive(Debug, Clone)]
struct InspectedState {
    /// True when docker reports the container as running, or its status is
    /// `running`, `exited` or `dead` — i.e. it has moved past `created`.
    started: bool,
    /// True when the container status is `exited` or `dead`.
    exited: bool,
    /// Exit code reported by docker; `-1` when the value could not be
    /// parsed as an integer.
    exit_code: i64,
    /// Docker health status string, or `None` when the container has no
    /// health information (no healthcheck configured, or an empty value).
    health: Option<String>,
}
925
926/// One `docker inspect` call returning every field needed by
927/// [`condition_is_met`]. Returns `None` when the container doesn't exist
928/// yet or inspect fails — the caller will simply retry on the next poll.
929async fn inspect_container_state(cli: &str, container_id: &str) -> Option<InspectedState> {
930    // Compose all four fields into a single inspect format so the gate
931    // costs one process spawn per poll rather than four.
932    let format =
933        "{{.State.Status}}|{{.State.Running}}|{{.State.ExitCode}}|{{if .State.Health}}{{.State.Health.Status}}{{else}}<none>{{end}}";
934    let out = Command::new(cli)
935        .args(["inspect", "-f", format, container_id])
936        .output()
937        .await
938        .ok()?;
939    if !out.status.success() {
940        return None;
941    }
942    let raw = String::from_utf8_lossy(&out.stdout).trim().to_string();
943    let parts: Vec<&str> = raw.split('|').collect();
944    if parts.len() < 4 {
945        return None;
946    }
947    let status = parts[0];
948    let running = parts[1] == "true";
949    let exit_code: i64 = parts[2].parse().unwrap_or(-1);
950    let health = match parts[3] {
951        "<none>" | "" => None,
952        other => Some(other.to_string()),
953    };
954    // `created` is the brief moment between docker creating the
955    // container and the entrypoint running. Treat anything past
956    // `created` as "started" for the START condition.
957    let started = running || status == "exited" || status == "running" || status == "dead";
958    let exited = status == "exited" || status == "dead";
959    Some(InspectedState {
960        started,
961        exited,
962        exit_code,
963        health,
964    })
965}
966
967/// Decide whether the polled `state` satisfies a `dependsOn[].condition`.
968/// Encapsulates the AWS semantics so the polling loop is purely
969/// mechanical.
970fn condition_is_met(condition: DependsOnCondition, state: &InspectedState) -> bool {
971    match condition {
972        DependsOnCondition::Start => state.started,
973        DependsOnCondition::Complete => state.exited,
974        DependsOnCondition::Success => state.exited && state.exit_code == 0,
975        DependsOnCondition::Healthy => state.health.as_deref() == Some("healthy"),
976    }
977}
978
/// Test-only re-export of [`parse_port_mapping`] so sibling test modules
/// can lock in the default-port / default-protocol behaviour without us
/// widening the visibility of the parser itself.
///
/// Compiled only under `cfg(test)`; forwards `value` unchanged and returns
/// whatever the parser returns.
#[cfg(test)]
pub(crate) fn __test_parse_port_mapping(value: &serde_json::Value) -> Option<PortMapping> {
    parse_port_mapping(value)
}
986
987/// Parse a `healthCheck` block from a task definition's container
988/// definition. Returns `None` for missing `command` or for a command
989/// whose first token is `NONE` (the AWS-documented "disable healthcheck
990/// inherited from image" sentinel — emit no flags rather than a `none`
991/// healthcheck). Defaults follow AWS: 30s/5s/3/0s.
992fn parse_health_check(value: &serde_json::Value) -> Option<HealthCheckSpec> {
993    let cmd_arr = value.get("command")?.as_array()?;
994    let command: Vec<String> = cmd_arr
995        .iter()
996        .filter_map(|v| v.as_str().map(String::from))
997        .collect();
998    if command.is_empty() {
999        return None;
1000    }
1001    if command.first().map(|s| s.as_str()) == Some("NONE") {
1002        return None;
1003    }
1004    let read_u32 = |key: &str, default: u32| -> u32 {
1005        value
1006            .get(key)
1007            .and_then(|v| v.as_i64())
1008            .filter(|n| (0..=u32::MAX as i64).contains(n))
1009            .map(|n| n as u32)
1010            .unwrap_or(default)
1011    };
1012    Some(HealthCheckSpec {
1013        command,
1014        interval_seconds: read_u32("interval", 30),
1015        timeout_seconds: read_u32("timeout", 5),
1016        retries: read_u32("retries", 3),
1017        start_period_seconds: read_u32("startPeriod", 0),
1018    })
1019}
1020
1021/// Parse one `ulimits` entry from the container definition JSON.
1022fn parse_ulimit(value: &serde_json::Value) -> Option<Ulimit> {
1023    let name = value.get("name").and_then(|v| v.as_str())?;
1024    let soft = value
1025        .get("softLimit")
1026        .and_then(|v| v.as_i64())
1027        .filter(|n| *n >= 0)? as i32;
1028    let hard = value
1029        .get("hardLimit")
1030        .and_then(|v| v.as_i64())
1031        .filter(|n| *n >= 0)? as i32;
1032    Some(Ulimit {
1033        name: name.to_string(),
1034        soft_limit: soft,
1035        hard_limit: hard,
1036    })
1037}
1038
1039/// Parse `linuxParameters` from the container definition JSON.
1040fn parse_linux_parameters(value: &serde_json::Value) -> Option<LinuxParameters> {
1041    let mut lp = LinuxParameters::default();
1042    if let Some(arr) = value
1043        .get("capabilities")
1044        .and_then(|v| v.get("add"))
1045        .and_then(|v| v.as_array())
1046    {
1047        lp.capabilities_add = arr
1048            .iter()
1049            .filter_map(|v| v.as_str().map(String::from))
1050            .collect();
1051    }
1052    if let Some(arr) = value
1053        .get("capabilities")
1054        .and_then(|v| v.get("drop"))
1055        .and_then(|v| v.as_array())
1056    {
1057        lp.capabilities_drop = arr
1058            .iter()
1059            .filter_map(|v| v.as_str().map(String::from))
1060            .collect();
1061    }
1062    if let Some(arr) = value.get("devices").and_then(|v| v.as_array()) {
1063        lp.devices = arr.iter().filter_map(parse_device).collect();
1064    }
1065    lp.init_process_enabled = value
1066        .get("initProcessEnabled")
1067        .and_then(|v| v.as_bool())
1068        .unwrap_or(false);
1069    lp.shared_memory_size = value
1070        .get("sharedMemorySize")
1071        .and_then(|v| v.as_i64())
1072        .map(|n| n as i32);
1073    if let Some(arr) = value.get("sysctl").and_then(|v| v.as_array()) {
1074        lp.sysctls = arr.iter().filter_map(parse_sysctl).collect();
1075    }
1076    if let Some(arr) = value.get("tmpfs").and_then(|v| v.as_array()) {
1077        lp.tmpfs = arr.iter().filter_map(parse_tmpfs).collect();
1078    }
1079    lp.privileged = value
1080        .get("privileged")
1081        .and_then(|v| v.as_bool())
1082        .unwrap_or(false);
1083    Some(lp)
1084}
1085
1086fn parse_device(value: &serde_json::Value) -> Option<Device> {
1087    let host_path = value.get("hostPath").and_then(|v| v.as_str())?.to_string();
1088    let container_path = value
1089        .get("containerPath")
1090        .and_then(|v| v.as_str())?
1091        .to_string();
1092    let permissions = value
1093        .get("permissions")
1094        .and_then(|v| v.as_str())
1095        .unwrap_or("rwm")
1096        .to_string();
1097    Some(Device {
1098        host_path,
1099        container_path,
1100        permissions,
1101    })
1102}
1103
1104fn parse_sysctl(value: &serde_json::Value) -> Option<Sysctl> {
1105    let name = value.get("name").and_then(|v| v.as_str())?.to_string();
1106    let value_str = value.get("value").and_then(|v| v.as_str())?.to_string();
1107    Some(Sysctl {
1108        name,
1109        value: value_str,
1110    })
1111}
1112
1113fn parse_tmpfs(value: &serde_json::Value) -> Option<Tmpfs> {
1114    let container_path = value
1115        .get("containerPath")
1116        .and_then(|v| v.as_str())?
1117        .to_string();
1118    let size = value
1119        .get("size")
1120        .and_then(|v| v.as_i64())
1121        .filter(|n| *n > 0)? as i32;
1122    let mount_options = value
1123        .get("mountOptions")
1124        .and_then(|v| v.as_array())
1125        .map(|arr| {
1126            arr.iter()
1127                .filter_map(|v| v.as_str().map(String::from))
1128                .collect()
1129        })
1130        .unwrap_or_default();
1131    Some(Tmpfs {
1132        container_path,
1133        size,
1134        mount_options,
1135    })
1136}
1137
1138/// Render a [`HealthCheckSpec`] into the docker run flags that emulate
1139/// the equivalent ECS healthCheck. AWS's `command[0]` is a sentinel
1140/// (`CMD-SHELL`/`CMD`/`NONE`); docker's `--health-cmd` always takes a
1141/// single shell-string, so we collapse the remaining tokens with spaces
1142/// for either sentinel — matching how docker itself stringifies HEALTHCHECK
1143/// CMD ["a","b"] back to a shell string at inspect time.
1144pub(crate) fn render_health_flags(hc: &HealthCheckSpec) -> Vec<String> {
1145    if hc.command.len() < 2 {
1146        return Vec::new();
1147    }
1148    let cmd_kind = hc.command[0].as_str();
1149    if cmd_kind != "CMD" && cmd_kind != "CMD-SHELL" {
1150        return Vec::new();
1151    }
1152    let cmd_string = hc.command[1..].join(" ");
1153    vec![
1154        "--health-cmd".into(),
1155        cmd_string,
1156        format!("--health-interval={}s", hc.interval_seconds),
1157        format!("--health-timeout={}s", hc.timeout_seconds),
1158        format!("--health-retries={}", hc.retries),
1159        format!("--health-start-period={}s", hc.start_period_seconds),
1160    ]
1161}
1162
/// Test-only re-export of [`parse_health_check`] so unit tests in
/// sibling modules can lock in the AWS default-fill behaviour without
/// us widening the parser's visibility.
///
/// Compiled only under `cfg(test)`; forwards `value` unchanged and returns
/// whatever the parser returns.
#[cfg(test)]
pub(crate) fn __test_parse_health_check(value: &serde_json::Value) -> Option<HealthCheckSpec> {
    parse_health_check(value)
}
1170
/// Convert a docker `.State.Health.Status` value into the ECS
/// `healthStatus` vocabulary.
///
/// Docker emits `starting|healthy|unhealthy|none|""` (empty when the image
/// has no HEALTHCHECK and we didn't add one). ECS only knows
/// `HEALTHY|UNHEALTHY|UNKNOWN`, so anything that isn't a clean healthy /
/// unhealthy lands in `UNKNOWN`. Surrounding whitespace is ignored and the
/// comparison is ASCII case-insensitive.
pub(crate) fn docker_health_to_ecs(raw: &str) -> &'static str {
    let status = raw.trim();
    if status.eq_ignore_ascii_case("healthy") {
        "HEALTHY"
    } else if status.eq_ignore_ascii_case("unhealthy") {
        "UNHEALTHY"
    } else {
        "UNKNOWN"
    }
}
1183
1184/// Parse a single `portMappings[]` entry. Returns `None` for entries
1185/// that are missing `containerPort` or have a value out of `u16` range.
1186/// Defaults: `hostPort` -> `containerPort`, `protocol` -> `tcp`.
1187fn parse_port_mapping(value: &serde_json::Value) -> Option<PortMapping> {
1188    let container_port = value
1189        .get("containerPort")
1190        .and_then(|v| v.as_i64())
1191        .filter(|n| (0..=u16::MAX as i64).contains(n))? as u16;
1192    let host_port_raw = value
1193        .get("hostPort")
1194        .and_then(|v| v.as_i64())
1195        .filter(|n| (0..=u16::MAX as i64).contains(n))
1196        .map(|n| n as u16)
1197        .unwrap_or(0);
1198    let host_port = if host_port_raw == 0 {
1199        container_port
1200    } else {
1201        host_port_raw
1202    };
1203    let protocol = value
1204        .get("protocol")
1205        .and_then(|v| v.as_str())
1206        .map(|s| s.to_ascii_lowercase())
1207        .unwrap_or_else(|| "tcp".to_string());
1208    Some(PortMapping {
1209        container_port,
1210        host_port,
1211        protocol,
1212    })
1213}
1214
1215/// Build the docker `run` argv for a single container plan. Pure so unit
1216/// tests can assert on flag ordering / `--publish` translation without
1217/// shelling out. The returned vector is everything *after* the binary
1218/// name (i.e. starts with `run`, ends with the user-supplied command
1219/// args).
1220pub(crate) fn build_run_argv(
1221    plan: &ContainerPlan,
1222    env: &[(String, String)],
1223    task_id: &str,
1224    host_alias: &str,
1225    add_host_arg: Option<&str>,
1226    run_image: &str,
1227    awsvpc_network_ready: bool,
1228) -> Vec<String> {
1229    let mut argv: Vec<String> = Vec::new();
1230    argv.push("run".into());
1231    argv.push("-d".into());
1232    argv.push("--name".into());
1233    argv.push(format!("{}-{}", task_id, plan.container_name));
1234    argv.push("--label".into());
1235    argv.push(format!("fakecloud-ecs-task={}", task_id));
1236    argv.push("--label".into());
1237    argv.push(format!("fakecloud-ecs-container={}", plan.container_name));
1238    // Inject `--add-host host.docker.internal:<ip>` only for docker;
1239    // podman provides `host.containers.internal` natively and rejects
1240    // the host-gateway mapping (issue #1539).
1241    if let Some(arg) = add_host_arg {
1242        argv.push("--add-host".into());
1243        argv.push(arg.to_string());
1244    }
1245    let use_awsvpc_network = plan.network_mode.as_deref() == Some("awsvpc") && awsvpc_network_ready;
1246    if use_awsvpc_network {
1247        argv.push("--network".into());
1248        argv.push(format!("fakecloud-ecs-{}", task_id));
1249    }
1250    // `awsvpc` puts the container on a per-task ENI; emulating that on a
1251    // local docker host means *not* publishing to the host port table.
1252    // Bridge / host / default network modes still get `--publish`. If
1253    // the awsvpc per-task network creation failed and we fell back to
1254    // bridge, we DO want to publish so the container is reachable.
1255    let publish_ports = !use_awsvpc_network;
1256    if publish_ports {
1257        for pm in &plan.port_mappings {
1258            argv.push("--publish".into());
1259            argv.push(format!(
1260                "{}:{}/{}",
1261                pm.container_port, pm.host_port, pm.protocol
1262            ));
1263        }
1264    }
1265    if let Some(ref hc) = plan.health_check {
1266        argv.extend(render_health_flags(hc));
1267    }
1268    let http_alias_prefix = format!("http://{host_alias}:");
1269    let https_alias_prefix = format!("https://{host_alias}:");
1270    for (k, v) in env {
1271        let transformed = v
1272            .replace("http://127.0.0.1:", http_alias_prefix.as_str())
1273            .replace("https://127.0.0.1:", https_alias_prefix.as_str())
1274            .replace("http://localhost:", http_alias_prefix.as_str())
1275            .replace("https://localhost:", https_alias_prefix.as_str());
1276        argv.push("-e".into());
1277        argv.push(format!("{}={}", k, transformed));
1278    }
1279    // Volume mounts: one `-v` flag per mountPoints entry, with the
1280    // source resolved from the task definition's `volumes[]`. EFS and
1281    // FSx stubs were materialised on the host (mkdir -p) before this
1282    // function returns, so docker can bind them straight in.
1283    for vm in &plan.volume_mounts {
1284        argv.push("-v".into());
1285        let suffix = if vm.read_only { ":ro" } else { "" };
1286        argv.push(format!("{}:{}{}", vm.source, vm.container_path, suffix));
1287    }
1288    for ul in &plan.ulimits {
1289        argv.push("--ulimit".into());
1290        argv.push(format!("{}={}:{}", ul.name, ul.soft_limit, ul.hard_limit));
1291    }
1292    if let Some(ref lp) = plan.linux_parameters {
1293        for cap in &lp.capabilities_add {
1294            argv.push("--cap-add".into());
1295            argv.push(cap.clone());
1296        }
1297        for cap in &lp.capabilities_drop {
1298            argv.push("--cap-drop".into());
1299            argv.push(cap.clone());
1300        }
1301        for dev in &lp.devices {
1302            argv.push("--device".into());
1303            argv.push(format!(
1304                "{}:{}{}",
1305                dev.host_path, dev.container_path, dev.permissions
1306            ));
1307        }
1308        if lp.init_process_enabled {
1309            argv.push("--init".into());
1310        }
1311        if let Some(size) = lp.shared_memory_size {
1312            argv.push("--shm-size".into());
1313            argv.push(format!("{}m", size));
1314        }
1315        for sys in &lp.sysctls {
1316            argv.push("--sysctl".into());
1317            argv.push(format!("{}={}", sys.name, sys.value));
1318        }
1319        for tmp in &lp.tmpfs {
1320            let mut opts = tmp.mount_options.join(",");
1321            if !opts.is_empty() {
1322                opts = format!(",{}", opts);
1323            }
1324            argv.push("--tmpfs".into());
1325            argv.push(format!("{}:size={}M{}", tmp.container_path, tmp.size, opts));
1326        }
1327        if lp.privileged {
1328            argv.push("--privileged".into());
1329        }
1330    }
1331    if let Some(timeout) = plan.stop_timeout {
1332        argv.push("--stop-timeout".into());
1333        argv.push(format!("{}", timeout));
1334    }
1335    if let Some(ref user) = plan.user {
1336        argv.push("--user".into());
1337        argv.push(user.clone());
1338    }
1339    if let Some(ref wd) = plan.working_directory {
1340        argv.push("--workdir".into());
1341        argv.push(wd.clone());
1342    }
1343    if plan.tty {
1344        argv.push("--tty".into());
1345    }
1346    if plan.interactive {
1347        argv.push("--interactive".into());
1348    }
1349    if plan.readonly_rootfs {
1350        argv.push("--read-only".into());
1351    }
1352    if let Some(first) = plan.entry_point.first() {
1353        argv.push("--entrypoint".into());
1354        argv.push(first.clone());
1355    }
1356    argv.push(run_image.to_string());
1357    for arg in plan.entry_point.iter().skip(1) {
1358        argv.push(arg.clone());
1359    }
1360    for arg in &plan.command {
1361        argv.push(arg.clone());
1362    }
1363    argv
1364}
1365
1366/// Render `networkBindings` JSON for a launched container. Empty under
1367/// `awsvpc` (the equivalent info goes on the task's ENI attachments) and
1368/// for containers without `portMappings`.
1369pub(crate) fn network_bindings_for(plan: &ContainerPlan) -> Vec<serde_json::Value> {
1370    if plan.network_mode.as_deref() == Some("awsvpc") {
1371        return Vec::new();
1372    }
1373    plan.port_mappings
1374        .iter()
1375        .map(|pm| {
1376            serde_json::json!({
1377                "bindIP": "0.0.0.0",
1378                "containerPort": pm.container_port,
1379                "hostPort": pm.host_port,
1380                "protocol": pm.protocol,
1381            })
1382        })
1383        .collect()
1384}
1385
1386/// Compute ELBv2 target registrations for a task based on its service's
1387/// loadBalancers configuration. Returns (target_group_arn, [(target_id, port)])
1388/// for each target group that should receive this task.
1389#[allow(clippy::type_complexity)]
1390pub(crate) fn compute_elbv2_targets(
1391    ecs_state: &crate::state::EcsState,
1392    task: &crate::state::Task,
1393) -> Vec<(String, Vec<(String, Option<i64>)>)> {
1394    let mut result = Vec::new();
1395    let Some(group) = task.group.as_deref() else {
1396        return result;
1397    };
1398    let service_name = group.strip_prefix("service:").unwrap_or(group);
1399    let key = crate::state::EcsState::service_key(&task.cluster_name, service_name);
1400    let Some(service) = ecs_state.services.get(&key) else {
1401        return result;
1402    };
1403
1404    let network_mode = ecs_state
1405        .task_definitions
1406        .get(&task.family)
1407        .and_then(|revs| revs.get(&task.revision))
1408        .and_then(|td| td.network_mode.as_deref());
1409
1410    for lb in &service.load_balancers {
1411        let tg_arn = lb.get("targetGroupArn").and_then(|v| v.as_str());
1412        let container_name = lb.get("containerName").and_then(|v| v.as_str());
1413        let container_port = lb.get("containerPort").and_then(|v| v.as_i64());
1414        let Some(tg_arn) = tg_arn else { continue };
1415        let Some(container_name) = container_name else {
1416            continue;
1417        };
1418
1419        let target_id = if network_mode == Some("awsvpc") {
1420            task.attachments
1421                .iter()
1422                .find(|a| a.attachment_type == "eni")
1423                .and_then(|eni| {
1424                    eni.details
1425                        .iter()
1426                        .find(|d| d.name == "privateIPv4Address")
1427                        .map(|d| d.value.clone())
1428                })
1429        } else {
1430            Some("127.0.0.1".to_string())
1431        };
1432
1433        let port = if network_mode == Some("awsvpc") {
1434            container_port
1435        } else {
1436            task.containers
1437                .iter()
1438                .find(|c| c.name == container_name)
1439                .and_then(|c| {
1440                    c.network_bindings
1441                        .iter()
1442                        .find(|nb| {
1443                            nb.get("containerPort").and_then(|v| v.as_i64()) == container_port
1444                        })
1445                        .and_then(|nb| nb.get("hostPort").and_then(|v| v.as_i64()))
1446                })
1447        };
1448
1449        if let Some(id) = target_id {
1450            if let Some(entry) = result.iter_mut().find(|(arn, _)| arn == tg_arn) {
1451                entry.1.push((id, port));
1452            } else {
1453                result.push((tg_arn.to_string(), vec![(id, port)]));
1454            }
1455        }
1456    }
1457    result
1458}
1459
/// Owned copy of selected task fields, built by `snapshot_task` while the
/// shared state is read-locked.
struct TaskSnapshot {
    /// Copied from the task's `task_arn`.
    task_arn: String,
    /// Copied from the task's `cluster_arn`.
    cluster_arn: String,
    /// Copied from the task's `launch_type`.
    launch_type: String,
    /// Copied from the task's `group`, if it has one.
    group: Option<String>,
    /// Copied from the task's `task_definition_arn`.
    task_definition_arn: String,
    /// JSON array with one object per container, carrying `containerArn`,
    /// `name`, `image`, `lastStatus`, `exitCode` and `reason`.
    containers: serde_json::Value,
}
1468
1469fn snapshot_task(state: &SharedEcsState, account_id: &str, task_id: &str) -> Option<TaskSnapshot> {
1470    let accounts = state.read();
1471    let s = accounts.get(account_id)?;
1472    let task = s.tasks.get(task_id)?;
1473    Some(TaskSnapshot {
1474        task_arn: task.task_arn.clone(),
1475        cluster_arn: task.cluster_arn.clone(),
1476        launch_type: task.launch_type.clone(),
1477        group: task.group.clone(),
1478        task_definition_arn: task.task_definition_arn.clone(),
1479        containers: serde_json::Value::Array(
1480            task.containers
1481                .iter()
1482                .map(|c| {
1483                    serde_json::json!({
1484                        "containerArn": c.container_arn,
1485                        "name": c.name,
1486                        "image": c.image,
1487                        "lastStatus": c.last_status,
1488                        "exitCode": c.exit_code,
1489                        "reason": c.reason,
1490                    })
1491                })
1492                .collect(),
1493        ),
1494    })
1495}
1496
1497/// Build an isolated docker config directory with Basic auth for
1498/// fakecloud ECR. Lets `docker pull/push/tag` work against the local OCI
1499/// v2 registry without requiring the user to run
1500/// `aws ecr get-login-password | docker login` first. Authorizes both
1501/// `127.0.0.1` (fakecloud on the host) and `host.docker.internal`
1502/// (fakecloud in a container, pull URIs rewritten to the sibling host —
1503/// issue #1539, bug 0.8).
1504fn build_local_registry_docker_config(server_port: u16) -> Option<TempDir> {
1505    let dir = TempDir::new().ok()?;
1506    let auth = base64::engine::general_purpose::STANDARD.encode("AWS:fakecloud-ecs-runtime");
1507    let config = serde_json::json!({
1508        "auths": {
1509            format!("127.0.0.1:{server_port}"): { "auth": auth },
1510            format!("host.docker.internal:{server_port}"): { "auth": auth },
1511        }
1512    });
1513    std::fs::write(dir.path().join("config.json"), config.to_string()).ok()?;
1514    Some(dir)
1515}
1516
1517fn find_container_definition(
1518    state: &crate::state::EcsState,
1519    family: &str,
1520    revision: i32,
1521    name: &str,
1522) -> Option<serde_json::Value> {
1523    state
1524        .task_definitions
1525        .get(family)?
1526        .get(&revision)?
1527        .container_definitions
1528        .iter()
1529        .find(|c| c.get("name").and_then(|v| v.as_str()) == Some(name))
1530        .cloned()
1531}
1532
1533fn mark_pull_started(state: &SharedEcsState, account_id: &str, task_id: &str) {
1534    let mut accounts = state.write();
1535    let Some(s) = accounts.get_mut(account_id) else {
1536        return;
1537    };
1538    let task_arn_cluster = s
1539        .tasks
1540        .get(task_id)
1541        .map(|t| (t.task_arn.clone(), t.cluster_arn.clone()));
1542    if let Some(task) = s.tasks.get_mut(task_id) {
1543        task.pull_started_at = Some(Utc::now());
1544    }
1545    if let Some((arn, cluster_arn)) = task_arn_cluster {
1546        s.push_event(LifecycleEvent {
1547            at: Utc::now(),
1548            event_type: "PullStarted".into(),
1549            task_arn: Some(arn),
1550            cluster_arn: Some(cluster_arn),
1551            last_status: Some("PENDING".into()),
1552            detail: serde_json::json!({}),
1553        });
1554    }
1555}
1556
1557fn mark_pull_stopped(state: &SharedEcsState, account_id: &str, task_id: &str) {
1558    let mut accounts = state.write();
1559    let Some(s) = accounts.get_mut(account_id) else {
1560        return;
1561    };
1562    if let Some(task) = s.tasks.get_mut(task_id) {
1563        task.pull_stopped_at = Some(Utc::now());
1564    }
1565}
1566
1567pub(crate) fn mark_running_multi(
1568    state: &SharedEcsState,
1569    account_id: &str,
1570    task_id: &str,
1571    started: &[RunningContainer],
1572) {
1573    let mut accounts = state.write();
1574    let Some(s) = accounts.get_mut(account_id) else {
1575        return;
1576    };
1577    let (arn, cluster_arn) = {
1578        let Some(task) = s.tasks.get_mut(task_id) else {
1579            return;
1580        };
1581        task.last_status = "RUNNING".into();
1582        task.connectivity = "CONNECTED".into();
1583        task.connectivity_at = Some(Utc::now());
1584        task.started_at = Some(Utc::now());
1585        for rc in started {
1586            if let Some(c) = task.containers.iter_mut().find(|c| c.name == rc.name) {
1587                c.runtime_id = Some(rc.container_id.clone());
1588                c.last_status = "RUNNING".into();
1589                c.network_bindings = rc.network_bindings.clone();
1590                if rc.image_digest.is_some() {
1591                    c.image_digest = rc.image_digest.clone();
1592                }
1593            }
1594        }
1595        if let Some(cluster) = s.clusters.get_mut(&task.cluster_name) {
1596            cluster.running_tasks_count += 1;
1597            if cluster.pending_tasks_count > 0 {
1598                cluster.pending_tasks_count -= 1;
1599            }
1600        }
1601        if let Some(ref ci_arn) = task.container_instance_arn {
1602            if let Some(ci) = s
1603                .container_instances
1604                .values_mut()
1605                .find(|ci| ci.container_instance_arn == *ci_arn)
1606            {
1607                ci.running_tasks_count += 1;
1608                if ci.pending_tasks_count > 0 {
1609                    ci.pending_tasks_count -= 1;
1610                }
1611            }
1612        }
1613        (task.task_arn.clone(), task.cluster_arn.clone())
1614    };
1615    s.push_event(LifecycleEvent {
1616        at: Utc::now(),
1617        event_type: "TaskStateChange".into(),
1618        task_arn: Some(arn),
1619        cluster_arn: Some(cluster_arn),
1620        last_status: Some("RUNNING".into()),
1621        detail: serde_json::json!({}),
1622    });
1623}
1624
1625#[allow(clippy::too_many_arguments)]
1626fn finalize_stopped_multi(
1627    state: &SharedEcsState,
1628    account_id: &str,
1629    task_id: &str,
1630    final_containers: &[RunningContainer],
1631    primary_exit_code: i64,
1632    captured: &str,
1633    stop_code: &str,
1634    stopped_reason: Option<String>,
1635) {
1636    let mut accounts = state.write();
1637    let Some(s) = accounts.get_mut(account_id) else {
1638        return;
1639    };
1640    let (arn, cluster_arn) = {
1641        let Some(task) = s.tasks.get_mut(task_id) else {
1642            return;
1643        };
1644        task.last_status = "STOPPED".into();
1645        task.desired_status = "STOPPED".into();
1646        task.stopping_at = task.stopping_at.or(Some(Utc::now()));
1647        task.stopped_at = Some(Utc::now());
1648        task.stop_code = Some(stop_code.into());
1649        task.stopped_reason = stopped_reason.or(Some(format!("Exit code {}", primary_exit_code)));
1650        task.captured_logs = captured.to_string();
1651        for c in task.containers.iter_mut() {
1652            c.last_status = "STOPPED".into();
1653            if c.exit_code.is_none() {
1654                let mapped = final_containers
1655                    .iter()
1656                    .find(|r| r.name == c.name)
1657                    .and_then(|r| r.exit_code);
1658                c.exit_code = mapped.or(Some(primary_exit_code));
1659            }
1660        }
1661        if let Some(cluster) = s.clusters.get_mut(&task.cluster_name) {
1662            if cluster.running_tasks_count > 0 {
1663                cluster.running_tasks_count -= 1;
1664            }
1665        }
1666        if let Some(ref ci_arn) = task.container_instance_arn {
1667            if let Some(ci) = s
1668                .container_instances
1669                .values_mut()
1670                .find(|ci| ci.container_instance_arn == *ci_arn)
1671            {
1672                if ci.running_tasks_count > 0 {
1673                    ci.running_tasks_count -= 1;
1674                }
1675            }
1676        }
1677        (task.task_arn.clone(), task.cluster_arn.clone())
1678    };
1679    s.push_event(LifecycleEvent {
1680        at: Utc::now(),
1681        event_type: "TaskStateChange".into(),
1682        task_arn: Some(arn),
1683        cluster_arn: Some(cluster_arn),
1684        last_status: Some("STOPPED".into()),
1685        detail: serde_json::json!({
1686            "exitCode": primary_exit_code,
1687            "stopCode": stop_code,
1688        }),
1689    });
1690}
1691
1692fn finalize_failure(state: &SharedEcsState, account_id: &str, task_id: &str, reason: &str) {
1693    let mut accounts = state.write();
1694    let Some(s) = accounts.get_mut(account_id) else {
1695        return;
1696    };
1697    let (arn, cluster_arn) = {
1698        let Some(task) = s.tasks.get_mut(task_id) else {
1699            return;
1700        };
1701        // Capture the prior status before we clobber it: if the task had
1702        // already reached RUNNING when execution failed (e.g. `docker wait`
1703        // blew up after the container started), we owe the cluster a
1704        // running-tasks decrement. Tasks that died before RUNNING only
1705        // ever incremented pendingTasksCount.
1706        let was_running = task.last_status == "RUNNING";
1707        task.last_status = "STOPPED".into();
1708        task.desired_status = "STOPPED".into();
1709        task.stopped_at = Some(Utc::now());
1710        task.stop_code = Some("TaskFailedToStart".into());
1711        task.stopped_reason = Some(reason.to_string());
1712        // Surface the failure reason on the /logs endpoint — without this,
1713        // a task that never reached RUNNING returns an empty log string,
1714        // leaving E2E assertions with no diagnostic.
1715        task.captured_logs = format!("[task failed to start]: {reason}");
1716        for c in task.containers.iter_mut() {
1717            c.last_status = "STOPPED".into();
1718            c.reason = Some(reason.to_string());
1719        }
1720        if let Some(cluster) = s.clusters.get_mut(&task.cluster_name) {
1721            if was_running {
1722                if cluster.running_tasks_count > 0 {
1723                    cluster.running_tasks_count -= 1;
1724                }
1725            } else if cluster.pending_tasks_count > 0 {
1726                cluster.pending_tasks_count -= 1;
1727            }
1728        }
1729        if let Some(ref ci_arn) = task.container_instance_arn {
1730            if let Some(ci) = s
1731                .container_instances
1732                .values_mut()
1733                .find(|ci| ci.container_instance_arn == *ci_arn)
1734            {
1735                if was_running {
1736                    if ci.running_tasks_count > 0 {
1737                        ci.running_tasks_count -= 1;
1738                    }
1739                } else if ci.pending_tasks_count > 0 {
1740                    ci.pending_tasks_count -= 1;
1741                }
1742            }
1743        }
1744        (task.task_arn.clone(), task.cluster_arn.clone())
1745    };
1746    s.push_event(LifecycleEvent {
1747        at: Utc::now(),
1748        event_type: "TaskFailedToStart".into(),
1749        task_arn: Some(arn),
1750        cluster_arn: Some(cluster_arn),
1751        last_status: Some("STOPPED".into()),
1752        detail: serde_json::json!({ "reason": reason }),
1753    });
1754}
1755
1756/// Short helper for tests + snapshot code to sleep between state
1757/// transitions. Exposed on the crate boundary to keep test timing
1758/// centralized.
1759pub async fn sleep(duration: Duration) {
1760    tokio::time::sleep(duration).await;
1761}
1762
1763#[cfg(test)]
1764mod tests {
1765    use super::*;
1766    use crate::state::{EcsState, Task};
1767    use fakecloud_aws::arn::Arn;
1768    use fakecloud_core::multi_account::MultiAccountState;
1769    use parking_lot::RwLock;
1770    use std::sync::Arc;
1771
1772    #[test]
1773    fn cli_available_for_known_missing_binary_is_false() {
1774        assert!(!fakecloud_core::container_net::cli_available(
1775            "definitely-not-a-real-cli-binary-xyz"
1776        ));
1777    }
1778
1779    #[test]
1780    fn aws_ecr_uris_translate_for_local_pull() {
1781        assert_eq!(
1782            fakecloud_core::ecr_uri::translate_to_local(
1783                "123456789012.dkr.ecr.us-east-1.amazonaws.com/app:latest",
1784                4566
1785            )
1786            .as_deref(),
1787            Some("127.0.0.1:4566/app:latest")
1788        );
1789    }
1790
1791    fn make_task(task_id: &str) -> Task {
1792        Task {
1793            task_arn: Arn::new(
1794                "ecs",
1795                "us-east-1",
1796                "000000000000",
1797                &format!("task/default/{task_id}"),
1798            )
1799            .to_string(),
1800            task_id: task_id.into(),
1801            cluster_arn: "arn:aws:ecs:us-east-1:000000000000:cluster/default".into(),
1802            cluster_name: "default".into(),
1803            task_definition_arn: "arn:aws:ecs:us-east-1:000000000000:task-definition/app:1".into(),
1804            family: "app".into(),
1805            revision: 1,
1806            container_instance_arn: None,
1807            capacity_provider_name: None,
1808            last_status: "PENDING".into(),
1809            desired_status: "RUNNING".into(),
1810            launch_type: "FARGATE".into(),
1811            platform_version: None,
1812            cpu: None,
1813            memory: None,
1814            containers: Vec::new(),
1815            overrides: serde_json::json!({}),
1816            started_by: None,
1817            group: None,
1818            connectivity: "CONNECTING".into(),
1819            stop_code: None,
1820            stopped_reason: None,
1821            created_at: Utc::now(),
1822            started_at: None,
1823            stopping_at: None,
1824            stopped_at: None,
1825            pull_started_at: None,
1826            pull_stopped_at: None,
1827            connectivity_at: None,
1828            started_by_ref_id: None,
1829            execution_role_arn: None,
1830            task_role_arn: None,
1831            tags: Vec::new(),
1832            awslogs: None,
1833            captured_logs: String::new(),
1834            protection: None,
1835            enable_execute_command: false,
1836            attachments: Vec::new(),
1837            volume_configurations: Vec::new(),
1838            task_set_arn: None,
1839        }
1840    }
1841
1842    #[test]
1843    fn finalize_failure_writes_reason_into_captured_logs() {
1844        let mut accounts: MultiAccountState<EcsState> =
1845            MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
1846        let acct = accounts.get_or_create("000000000000");
1847        acct.tasks.insert("t1".into(), make_task("t1"));
1848        let state: SharedEcsState = Arc::new(RwLock::new(accounts));
1849
1850        finalize_failure(
1851            &state,
1852            "000000000000",
1853            "t1",
1854            "failed to resolve secret DB_PASSWORD",
1855        );
1856
1857        let accounts = state.read();
1858        let task = accounts
1859            .get("000000000000")
1860            .unwrap()
1861            .tasks
1862            .get("t1")
1863            .unwrap();
1864        assert_eq!(task.last_status, "STOPPED");
1865        assert_eq!(task.stop_code.as_deref(), Some("TaskFailedToStart"));
1866        assert!(
1867            task.captured_logs
1868                .contains("failed to resolve secret DB_PASSWORD"),
1869            "captured_logs missing reason: {:?}",
1870            task.captured_logs
1871        );
1872        assert!(
1873            task.captured_logs.starts_with("[task failed to start]:"),
1874            "captured_logs missing prefix: {:?}",
1875            task.captured_logs
1876        );
1877    }
1878
1879    fn make_container(name: &str, essential: bool) -> crate::state::Container {
1880        crate::state::Container {
1881            container_arn: format!(
1882                "arn:aws:ecs:us-east-1:000000000000:container/default/abc/{name}"
1883            ),
1884            name: name.into(),
1885            image: "alpine".into(),
1886            task_arn: "arn:aws:ecs:us-east-1:000000000000:task/default/abc".into(),
1887            last_status: "RUNNING".into(),
1888            exit_code: None,
1889            reason: None,
1890            runtime_id: Some(format!("dockerid-{name}")),
1891            essential,
1892            cpu: None,
1893            memory: None,
1894            memory_reservation: None,
1895            network_bindings: Vec::new(),
1896            network_interfaces: Vec::new(),
1897            health_status: None,
1898            managed_agents: None,
1899            image_digest: None,
1900        }
1901    }
1902
1903    #[test]
1904    fn task_should_stop_when_essential_exits() {
1905        let containers = vec![
1906            RunningContainer {
1907                name: "app".into(),
1908                container_id: "id-app".into(),
1909                essential: true,
1910                exit_code: Some(0),
1911                network_bindings: Vec::new(),
1912                image_digest: None,
1913            },
1914            RunningContainer {
1915                name: "sidecar".into(),
1916                container_id: "id-sc".into(),
1917                essential: false,
1918                exit_code: None,
1919                network_bindings: Vec::new(),
1920                image_digest: None,
1921            },
1922        ];
1923        assert!(task_should_stop(&containers));
1924    }
1925
1926    #[test]
1927    fn task_keeps_running_when_only_non_essential_exits() {
1928        let containers = vec![
1929            RunningContainer {
1930                name: "app".into(),
1931                container_id: "id-app".into(),
1932                essential: true,
1933                exit_code: None,
1934                network_bindings: Vec::new(),
1935                image_digest: None,
1936            },
1937            RunningContainer {
1938                name: "sidecar".into(),
1939                container_id: "id-sc".into(),
1940                essential: false,
1941                exit_code: Some(0),
1942                network_bindings: Vec::new(),
1943                image_digest: None,
1944            },
1945        ];
1946        assert!(!task_should_stop(&containers));
1947    }
1948
1949    #[test]
1950    fn task_stops_when_all_non_essentials_exit() {
1951        let containers = vec![
1952            RunningContainer {
1953                name: "a".into(),
1954                container_id: "id-a".into(),
1955                essential: false,
1956                exit_code: Some(0),
1957                network_bindings: Vec::new(),
1958                image_digest: None,
1959            },
1960            RunningContainer {
1961                name: "b".into(),
1962                container_id: "id-b".into(),
1963                essential: false,
1964                exit_code: Some(1),
1965                network_bindings: Vec::new(),
1966                image_digest: None,
1967            },
1968        ];
1969        assert!(task_should_stop(&containers));
1970    }
1971
1972    #[test]
1973    fn finalize_stopped_multi_assigns_per_container_exit_codes() {
1974        let mut accounts: MultiAccountState<EcsState> =
1975            MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
1976        let acct = accounts.get_or_create("000000000000");
1977        let mut t = make_task("t1");
1978        t.containers = vec![
1979            make_container("app", true),
1980            make_container("sidecar", false),
1981        ];
1982        acct.tasks.insert("t1".into(), t);
1983        let state: SharedEcsState = Arc::new(RwLock::new(accounts));
1984
1985        let final_containers = vec![
1986            RunningContainer {
1987                name: "app".into(),
1988                container_id: "id-app".into(),
1989                essential: true,
1990                exit_code: Some(0),
1991                network_bindings: Vec::new(),
1992                image_digest: None,
1993            },
1994            RunningContainer {
1995                name: "sidecar".into(),
1996                container_id: "id-sc".into(),
1997                essential: false,
1998                exit_code: Some(137),
1999                network_bindings: Vec::new(),
2000                image_digest: None,
2001            },
2002        ];
2003        finalize_stopped_multi(
2004            &state,
2005            "000000000000",
2006            "t1",
2007            &final_containers,
2008            0,
2009            "captured",
2010            "EssentialContainerExited",
2011            None,
2012        );
2013
2014        let accounts = state.read();
2015        let task = accounts
2016            .get("000000000000")
2017            .unwrap()
2018            .tasks
2019            .get("t1")
2020            .unwrap();
2021        assert_eq!(task.last_status, "STOPPED");
2022        assert_eq!(task.stop_code.as_deref(), Some("EssentialContainerExited"));
2023        let app = task.containers.iter().find(|c| c.name == "app").unwrap();
2024        let sc = task
2025            .containers
2026            .iter()
2027            .find(|c| c.name == "sidecar")
2028            .unwrap();
2029        assert_eq!(app.exit_code, Some(0));
2030        assert_eq!(sc.exit_code, Some(137));
2031        assert_eq!(app.last_status, "STOPPED");
2032        assert_eq!(sc.last_status, "STOPPED");
2033    }
2034
2035    fn plan(name: &str, deps: &[&str]) -> ContainerPlan {
2036        ContainerPlan {
2037            container_name: name.into(),
2038            image: "alpine".into(),
2039            env: Vec::new(),
2040            entry_point: Vec::new(),
2041            command: Vec::new(),
2042            secrets_refs: Vec::new(),
2043            essential: true,
2044            has_task_role: false,
2045            port_mappings: Vec::new(),
2046            network_mode: None,
2047            depends_on: deps
2048                .iter()
2049                .map(|s| DependsOn {
2050                    container_name: (*s).to_string(),
2051                    condition: DependsOnCondition::Start,
2052                })
2053                .collect(),
2054            health_check: None,
2055            volume_mounts: Vec::new(),
2056            ulimits: Vec::new(),
2057            linux_parameters: None,
2058            stop_timeout: None,
2059            user: None,
2060            working_directory: None,
2061            tty: false,
2062            interactive: false,
2063            readonly_rootfs: false,
2064        }
2065    }
2066
2067    #[test]
2068    fn topo_sort_orders_by_depends_on() {
2069        // sidecar depends on app, so app must come first regardless of
2070        // declaration order.
2071        let plans = vec![plan("sidecar", &["app"]), plan("app", &[])];
2072        let ordered = topo_sort_plans(plans);
2073        assert_eq!(ordered[0].container_name, "app");
2074        assert_eq!(ordered[1].container_name, "sidecar");
2075    }
2076
2077    #[test]
2078    fn topo_sort_preserves_declaration_order_when_no_deps() {
2079        let plans = vec![plan("first", &[]), plan("second", &[]), plan("third", &[])];
2080        let ordered = topo_sort_plans(plans);
2081        let names: Vec<&str> = ordered.iter().map(|p| p.container_name.as_str()).collect();
2082        assert_eq!(names, vec!["first", "second", "third"]);
2083    }
2084
2085    #[test]
2086    fn topo_sort_handles_chain() {
2087        // c -> b -> a, declared in reverse so the topological sort must
2088        // bubble dependencies up.
2089        let plans = vec![plan("c", &["b"]), plan("b", &["a"]), plan("a", &[])];
2090        let ordered = topo_sort_plans(plans);
2091        let names: Vec<&str> = ordered.iter().map(|p| p.container_name.as_str()).collect();
2092        assert_eq!(names, vec!["a", "b", "c"]);
2093    }
2094
2095    #[test]
2096    fn topo_sort_ignores_unknown_dependency() {
2097        // depends_on names a container not in this task definition. Real
2098        // ECS would reject this at register time; we don't (yet), so the
2099        // unknown dep should just be skipped instead of stalling the sort.
2100        let plans = vec![plan("only", &["does-not-exist"])];
2101        let ordered = topo_sort_plans(plans);
2102        assert_eq!(ordered.len(), 1);
2103        assert_eq!(ordered[0].container_name, "only");
2104    }
2105
2106    #[test]
2107    fn topo_sort_recovers_from_cycle() {
2108        // Cyclic dependsOn: both plans should still appear in the output
2109        // so the runtime doesn't silently drop them.
2110        let plans = vec![plan("a", &["b"]), plan("b", &["a"])];
2111        let ordered = topo_sort_plans(plans);
2112        assert_eq!(ordered.len(), 2);
2113    }
2114
2115    #[test]
2116    fn parse_health_check_fills_aws_defaults() {
2117        let v = serde_json::json!({
2118            "command": ["CMD-SHELL", "curl -f http://localhost/ || exit 1"],
2119        });
2120        let hc = __test_parse_health_check(&v).expect("parsed");
2121        assert_eq!(hc.command[0], "CMD-SHELL");
2122        assert_eq!(hc.interval_seconds, 30);
2123        assert_eq!(hc.timeout_seconds, 5);
2124        assert_eq!(hc.retries, 3);
2125        assert_eq!(hc.start_period_seconds, 0);
2126    }
2127
2128    #[test]
2129    fn parse_health_check_overrides_explicit_values() {
2130        let v = serde_json::json!({
2131            "command": ["CMD", "/probe"],
2132            "interval": 7,
2133            "timeout": 2,
2134            "retries": 9,
2135            "startPeriod": 12,
2136        });
2137        let hc = __test_parse_health_check(&v).expect("parsed");
2138        assert_eq!(hc.interval_seconds, 7);
2139        assert_eq!(hc.timeout_seconds, 2);
2140        assert_eq!(hc.retries, 9);
2141        assert_eq!(hc.start_period_seconds, 12);
2142    }
2143
2144    #[test]
2145    fn parse_health_check_returns_none_for_none_sentinel() {
2146        // ECS uses ["NONE"] to disable an inherited HEALTHCHECK; we
2147        // skip emission rather than passing a literal `none` to docker.
2148        let v = serde_json::json!({ "command": ["NONE"] });
2149        assert!(__test_parse_health_check(&v).is_none());
2150    }
2151
2152    #[test]
2153    fn parse_health_check_returns_none_for_missing_command() {
2154        let v = serde_json::json!({ "interval": 30 });
2155        assert!(__test_parse_health_check(&v).is_none());
2156    }
2157
2158    #[test]
2159    fn render_health_flags_emits_full_set_for_cmd_shell() {
2160        let hc = HealthCheckSpec {
2161            command: vec!["CMD-SHELL".into(), "curl -f http://localhost/".into()],
2162            interval_seconds: 15,
2163            timeout_seconds: 3,
2164            retries: 4,
2165            start_period_seconds: 10,
2166        };
2167        let flags = render_health_flags(&hc);
2168        assert_eq!(flags[0], "--health-cmd");
2169        assert_eq!(flags[1], "curl -f http://localhost/");
2170        assert!(flags.contains(&"--health-interval=15s".to_string()));
2171        assert!(flags.contains(&"--health-timeout=3s".to_string()));
2172        assert!(flags.contains(&"--health-retries=4".to_string()));
2173        assert!(flags.contains(&"--health-start-period=10s".to_string()));
2174    }
2175
2176    #[test]
2177    fn render_health_flags_joins_cmd_argv_with_spaces() {
2178        // CMD form in ECS is argv-style; docker `--health-cmd` only
2179        // accepts a single shell string, so we collapse with spaces.
2180        let hc = HealthCheckSpec {
2181            command: vec![
2182                "CMD".into(),
2183                "/bin/probe".into(),
2184                "--port".into(),
2185                "8080".into(),
2186            ],
2187            interval_seconds: 30,
2188            timeout_seconds: 5,
2189            retries: 3,
2190            start_period_seconds: 0,
2191        };
2192        let flags = render_health_flags(&hc);
2193        assert_eq!(flags[1], "/bin/probe --port 8080");
2194    }
2195
2196    #[test]
2197    fn build_run_argv_emits_health_flags_when_present() {
2198        let plan = ContainerPlan {
2199            container_name: "app".into(),
2200            image: "alpine".into(),
2201            env: Vec::new(),
2202            entry_point: Vec::new(),
2203            command: Vec::new(),
2204            secrets_refs: Vec::new(),
2205            essential: true,
2206            has_task_role: false,
2207            port_mappings: Vec::new(),
2208            network_mode: None,
2209            depends_on: Vec::new(),
2210            health_check: Some(HealthCheckSpec {
2211                command: vec!["CMD-SHELL".into(), "true".into()],
2212                interval_seconds: 5,
2213                timeout_seconds: 2,
2214                retries: 1,
2215                start_period_seconds: 1,
2216            }),
2217            volume_mounts: Vec::new(),
2218            ulimits: Vec::new(),
2219            linux_parameters: None,
2220            stop_timeout: None,
2221            user: None,
2222            working_directory: None,
2223            tty: false,
2224            interactive: false,
2225            readonly_rootfs: false,
2226        };
2227        let argv = build_run_argv(
2228            &plan,
2229            &[],
2230            "task-1",
2231            "host.docker.internal",
2232            None,
2233            "alpine",
2234            true,
2235        );
2236        let joined = argv.join(" ");
2237        assert!(joined.contains("--health-cmd true"), "argv: {joined}");
2238        assert!(joined.contains("--health-interval=5s"), "argv: {joined}");
2239        assert!(joined.contains("--health-timeout=2s"), "argv: {joined}");
2240        assert!(joined.contains("--health-retries=1"), "argv: {joined}");
2241        assert!(
2242            joined.contains("--health-start-period=1s"),
2243            "argv: {joined}"
2244        );
2245    }
2246
2247    #[test]
2248    fn build_run_argv_emits_no_health_flags_when_absent() {
2249        let plan = ContainerPlan {
2250            container_name: "app".into(),
2251            image: "alpine".into(),
2252            env: Vec::new(),
2253            entry_point: Vec::new(),
2254            command: Vec::new(),
2255            secrets_refs: Vec::new(),
2256            essential: true,
2257            has_task_role: false,
2258            port_mappings: Vec::new(),
2259            network_mode: None,
2260            depends_on: Vec::new(),
2261            health_check: None,
2262            volume_mounts: Vec::new(),
2263            ulimits: Vec::new(),
2264            linux_parameters: None,
2265            stop_timeout: None,
2266            user: None,
2267            working_directory: None,
2268            tty: false,
2269            interactive: false,
2270            readonly_rootfs: false,
2271        };
2272        let argv = build_run_argv(
2273            &plan,
2274            &[],
2275            "task-1",
2276            "host.docker.internal",
2277            None,
2278            "alpine",
2279            true,
2280        );
2281        assert!(!argv.iter().any(|s| s.starts_with("--health")));
2282    }
2283
2284    #[test]
2285    fn docker_health_to_ecs_maps_known_states() {
2286        assert_eq!(docker_health_to_ecs("healthy"), "HEALTHY");
2287        assert_eq!(docker_health_to_ecs("HEALTHY"), "HEALTHY");
2288        assert_eq!(docker_health_to_ecs("unhealthy"), "UNHEALTHY");
2289        assert_eq!(docker_health_to_ecs("starting"), "UNKNOWN");
2290        assert_eq!(docker_health_to_ecs("none"), "UNKNOWN");
2291        assert_eq!(docker_health_to_ecs(""), "UNKNOWN");
2292    }
2293
2294    /// `host.sourcePath` becomes a host bind mount with the path
2295    /// passed straight through to docker.
2296    #[test]
2297    fn resolve_host_bind_volume_uses_source_path() {
2298        let mut volumes = std::collections::HashMap::new();
2299        let v = serde_json::json!({
2300            "name": "data",
2301            "host": { "sourcePath": "/var/lib/myapp" }
2302        });
2303        volumes.insert("data".to_string(), &v);
2304        let mp = serde_json::json!({
2305            "sourceVolume": "data",
2306            "containerPath": "/app/data",
2307            "readOnly": false
2308        });
2309        let resolved = resolve_mount_point(&mp, &volumes).expect("resolved");
2310        assert_eq!(resolved.source, "/var/lib/myapp");
2311        assert_eq!(resolved.container_path, "/app/data");
2312        assert!(!resolved.read_only);
2313    }
2314
2315    /// `readOnly: true` on the mount point appends `:ro` to the
2316    /// rendered docker `-v` flag.
2317    #[test]
2318    fn read_only_mount_renders_ro_suffix() {
2319        let plan = ContainerPlan {
2320            container_name: "app".into(),
2321            image: "alpine".into(),
2322            env: Vec::new(),
2323            entry_point: Vec::new(),
2324            command: Vec::new(),
2325            secrets_refs: Vec::new(),
2326            essential: true,
2327            has_task_role: false,
2328            port_mappings: Vec::new(),
2329            network_mode: None,
2330            depends_on: Vec::new(),
2331            health_check: None,
2332            volume_mounts: vec![VolumeMount {
2333                source: "/host/path".into(),
2334                container_path: "/in/container".into(),
2335                read_only: true,
2336            }],
2337            ulimits: Vec::new(),
2338            linux_parameters: None,
2339            stop_timeout: None,
2340            user: None,
2341            working_directory: None,
2342            tty: false,
2343            interactive: false,
2344            readonly_rootfs: false,
2345        };
2346        let argv = build_run_argv(
2347            &plan,
2348            &[],
2349            "task-1",
2350            "host.docker.internal",
2351            None,
2352            "alpine",
2353            true,
2354        );
2355        let pair = argv
2356            .windows(2)
2357            .find(|w| w[0] == "-v")
2358            .expect("expected -v flag");
2359        assert_eq!(pair[1], "/host/path:/in/container:ro");
2360    }
2361
2362    /// EFS volumes resolve to a stub directory under `/tmp/fakecloud/efs`
2363    /// keyed by `fileSystemId`. `rootDirectory` (when set and not `/`)
2364    /// is appended so different mount targets within the same
2365    /// filesystem stay isolated.
2366    #[test]
2367    fn resolve_efs_volume_uses_stub_dir() {
2368        let mut volumes = std::collections::HashMap::new();
2369        let v = serde_json::json!({
2370            "name": "efs-vol",
2371            "efsVolumeConfiguration": {
2372                "fileSystemId": "fs-12345678",
2373                "rootDirectory": "/exports/app"
2374            }
2375        });
2376        volumes.insert("efs-vol".to_string(), &v);
2377        let mp = serde_json::json!({
2378            "sourceVolume": "efs-vol",
2379            "containerPath": "/mnt/efs"
2380        });
2381        let resolved = resolve_mount_point(&mp, &volumes).expect("resolved");
2382        // EFS resolves to a docker named volume (container-safe), with the
2383        // rootDirectory folded into the name (bug-audit 2026-05-28, 0.6).
2384        assert_eq!(resolved.source, "fakecloud-efs-fs-12345678-exports-app");
2385        assert_eq!(resolved.container_path, "/mnt/efs");
2386    }
2387
2388    /// EFS without `rootDirectory` (or with `/`) maps to the root of
2389    /// the filesystem stub so multiple tasks targeting the same id
2390    /// share state.
2391    #[test]
2392    fn efs_without_root_directory_uses_filesystem_root() {
2393        // No rootDirectory (or "/") -> a single shared named volume per
2394        // filesystem id.
2395        assert_eq!(
2396            shared_volume_name("efs", "fs-abc", "/"),
2397            "fakecloud-efs-fs-abc"
2398        );
2399        assert_eq!(
2400            shared_volume_name("efs", "fs-abc", ""),
2401            "fakecloud-efs-fs-abc"
2402        );
2403    }
2404
2405    /// `dockerVolumeConfiguration` resolves to the volume name itself,
2406    /// which docker treats as a named volume reference. No host path
2407    /// is materialised — docker creates the volume on first reference.
2408    #[test]
2409    fn resolve_docker_named_volume_uses_volume_name() {
2410        let mut volumes = std::collections::HashMap::new();
2411        let v = serde_json::json!({
2412            "name": "named-vol",
2413            "dockerVolumeConfiguration": {
2414                "scope": "task",
2415                "driver": "local"
2416            }
2417        });
2418        volumes.insert("named-vol".to_string(), &v);
2419        let mp = serde_json::json!({
2420            "sourceVolume": "named-vol",
2421            "containerPath": "/data"
2422        });
2423        let resolved = resolve_mount_point(&mp, &volumes).expect("resolved");
2424        assert_eq!(resolved.source, "named-vol");
2425        assert_eq!(resolved.container_path, "/data");
2426    }
2427
2428    /// FSx for Windows uses the same stub-directory pattern as EFS but
2429    /// scoped under `/tmp/fakecloud/fsx/<filesystemId>/`.
2430    #[test]
2431    fn resolve_fsx_volume_uses_stub_dir() {
2432        let mut volumes = std::collections::HashMap::new();
2433        let v = serde_json::json!({
2434            "name": "fsx-vol",
2435            "fsxWindowsFileServerVolumeConfiguration": {
2436                "fileSystemId": "fs-xyz",
2437                "rootDirectory": "share"
2438            }
2439        });
2440        volumes.insert("fsx-vol".to_string(), &v);
2441        let mp = serde_json::json!({
2442            "sourceVolume": "fsx-vol",
2443            "containerPath": "C:\\data"
2444        });
2445        let resolved = resolve_mount_point(&mp, &volumes).expect("resolved");
2446        // FSx resolves to a docker named volume (bug-audit 2026-05-28, 0.6).
2447        assert_eq!(resolved.source, "fakecloud-fsx-fs-xyz-share");
2448    }
2449
2450    /// Mount points that reference an undeclared `sourceVolume` resolve
2451    /// to `None` so `build_container_plans` skips them rather than
2452    /// emitting a broken `-v` flag.
2453    #[test]
2454    fn unknown_source_volume_returns_none() {
2455        let volumes = std::collections::HashMap::new();
2456        let mp = serde_json::json!({
2457            "sourceVolume": "missing",
2458            "containerPath": "/x"
2459        });
2460        assert!(resolve_mount_point(&mp, &volumes).is_none());
2461    }
2462
2463    /// `find_depends_on_cycle` returns the back-edge endpoints when a
2464    /// trivial 2-cycle exists. Real ECS would reject this at register
2465    /// time; our service-level handler relies on this helper.
2466    #[test]
2467    fn find_depends_on_cycle_detects_two_node_cycle() {
2468        let cds = vec![
2469            serde_json::json!({
2470                "name": "a",
2471                "image": "alpine",
2472                "dependsOn": [{"containerName": "b", "condition": "START"}],
2473            }),
2474            serde_json::json!({
2475                "name": "b",
2476                "image": "alpine",
2477                "dependsOn": [{"containerName": "a", "condition": "START"}],
2478            }),
2479        ];
2480        let cycle = find_depends_on_cycle(&cds);
2481        assert!(cycle.is_some(), "expected cycle to be detected");
2482    }
2483
2484    /// A three-node chain (a -> b -> c) is acyclic and must not be
2485    /// flagged. Guards against an over-eager DFS reporting back-edges
2486    /// from already-finished nodes.
2487    #[test]
2488    fn find_depends_on_cycle_accepts_chain() {
2489        let cds = vec![
2490            serde_json::json!({
2491                "name": "a",
2492                "image": "alpine",
2493                "dependsOn": [{"containerName": "b", "condition": "START"}],
2494            }),
2495            serde_json::json!({
2496                "name": "b",
2497                "image": "alpine",
2498                "dependsOn": [{"containerName": "c", "condition": "START"}],
2499            }),
2500            serde_json::json!({
2501                "name": "c",
2502                "image": "alpine",
2503            }),
2504        ];
2505        assert!(find_depends_on_cycle(&cds).is_none());
2506    }
2507
2508    /// `dependsOn[]` entries that name a container outside the task
2509    /// definition are ignored by the cycle check (they can't form a
2510    /// cycle by definition; runtime also drops them).
2511    #[test]
2512    fn find_depends_on_cycle_ignores_unknown_target() {
2513        let cds = vec![serde_json::json!({
2514            "name": "only",
2515            "image": "alpine",
2516            "dependsOn": [{"containerName": "ghost", "condition": "START"}],
2517        })];
2518        assert!(find_depends_on_cycle(&cds).is_none());
2519    }
2520
2521    /// `condition_is_met` covers each AWS condition value against a
2522    /// simulated docker inspect snapshot. Pinning these mappings here
2523    /// catches accidental re-orderings of the match arms.
2524    #[test]
2525    fn condition_is_met_matches_aws_semantics() {
2526        let running = InspectedState {
2527            started: true,
2528            exited: false,
2529            exit_code: 0,
2530            health: None,
2531        };
2532        let exited_ok = InspectedState {
2533            started: true,
2534            exited: true,
2535            exit_code: 0,
2536            health: None,
2537        };
2538        let exited_fail = InspectedState {
2539            started: true,
2540            exited: true,
2541            exit_code: 1,
2542            health: None,
2543        };
2544        let healthy = InspectedState {
2545            started: true,
2546            exited: false,
2547            exit_code: 0,
2548            health: Some("healthy".into()),
2549        };
2550
2551        // START is satisfied as soon as the container has started, even
2552        // if it later exited.
2553        assert!(condition_is_met(DependsOnCondition::Start, &running));
2554        assert!(condition_is_met(DependsOnCondition::Start, &exited_ok));
2555
2556        // COMPLETE requires an exit, regardless of code.
2557        assert!(!condition_is_met(DependsOnCondition::Complete, &running));
2558        assert!(condition_is_met(DependsOnCondition::Complete, &exited_ok));
2559        assert!(condition_is_met(DependsOnCondition::Complete, &exited_fail));
2560
2561        // SUCCESS requires an exit AND code 0.
2562        assert!(!condition_is_met(DependsOnCondition::Success, &running));
2563        assert!(condition_is_met(DependsOnCondition::Success, &exited_ok));
2564        assert!(!condition_is_met(DependsOnCondition::Success, &exited_fail));
2565
2566        // HEALTHY requires Health.Status == "healthy".
2567        assert!(!condition_is_met(DependsOnCondition::Healthy, &running));
2568        assert!(condition_is_met(DependsOnCondition::Healthy, &healthy));
2569    }
2570
2571    /// `DependsOnCondition::parse` accepts the four AWS-spelled values
2572    /// and rejects everything else — register-time validation depends on
2573    /// this returning `None` for unknowns.
2574    #[test]
2575    fn depends_on_condition_parse_round_trips() {
2576        assert_eq!(
2577            DependsOnCondition::parse("START"),
2578            Some(DependsOnCondition::Start)
2579        );
2580        assert_eq!(
2581            DependsOnCondition::parse("COMPLETE"),
2582            Some(DependsOnCondition::Complete)
2583        );
2584        assert_eq!(
2585            DependsOnCondition::parse("SUCCESS"),
2586            Some(DependsOnCondition::Success)
2587        );
2588        assert_eq!(
2589            DependsOnCondition::parse("HEALTHY"),
2590            Some(DependsOnCondition::Healthy)
2591        );
2592        assert_eq!(DependsOnCondition::parse("start"), None);
2593        assert_eq!(DependsOnCondition::parse("ANY"), None);
2594    }
2595
2596    // ── ulimits + linuxParameters + misc docker flags (O6) ──
2597
2598    #[test]
2599    fn build_run_argv_emits_ulimits() {
2600        let plan = ContainerPlan {
2601            container_name: "app".into(),
2602            image: "alpine".into(),
2603            env: Vec::new(),
2604            entry_point: Vec::new(),
2605            command: Vec::new(),
2606            secrets_refs: Vec::new(),
2607            essential: true,
2608            has_task_role: false,
2609            port_mappings: Vec::new(),
2610            network_mode: None,
2611            depends_on: Vec::new(),
2612            health_check: None,
2613            volume_mounts: Vec::new(),
2614            ulimits: vec![Ulimit {
2615                name: "nofile".into(),
2616                soft_limit: 1024,
2617                hard_limit: 2048,
2618            }],
2619            linux_parameters: None,
2620            stop_timeout: None,
2621            user: None,
2622            working_directory: None,
2623            tty: false,
2624            interactive: false,
2625            readonly_rootfs: false,
2626        };
2627        let argv = build_run_argv(&plan, &[], "t", "host.docker.internal", None, "img", true);
2628        assert!(argv.contains(&"--ulimit".to_string()));
2629        assert!(argv.contains(&"nofile=1024:2048".to_string()));
2630    }
2631
2632    #[test]
2633    fn build_run_argv_emits_linux_parameters() {
2634        let plan = ContainerPlan {
2635            container_name: "app".into(),
2636            image: "alpine".into(),
2637            env: Vec::new(),
2638            entry_point: Vec::new(),
2639            command: Vec::new(),
2640            secrets_refs: Vec::new(),
2641            essential: true,
2642            has_task_role: false,
2643            port_mappings: Vec::new(),
2644            network_mode: None,
2645            depends_on: Vec::new(),
2646            health_check: None,
2647            volume_mounts: Vec::new(),
2648            ulimits: Vec::new(),
2649            linux_parameters: Some(LinuxParameters {
2650                capabilities_add: vec!["NET_ADMIN".into()],
2651                capabilities_drop: vec!["ALL".into()],
2652                devices: vec![Device {
2653                    host_path: "/dev/zero".into(),
2654                    container_path: "/dev/zero".into(),
2655                    permissions: "rwm".into(),
2656                }],
2657                init_process_enabled: true,
2658                shared_memory_size: Some(256),
2659                sysctls: vec![Sysctl {
2660                    name: "net.ipv4.ip_forward".into(),
2661                    value: "1".into(),
2662                }],
2663                tmpfs: vec![Tmpfs {
2664                    container_path: "/tmp".into(),
2665                    size: 128,
2666                    mount_options: vec!["noexec".into()],
2667                }],
2668                privileged: true,
2669            }),
2670            stop_timeout: Some(30),
2671            user: Some("1000:1000".into()),
2672            working_directory: Some("/app".into()),
2673            tty: true,
2674            interactive: true,
2675            readonly_rootfs: true,
2676        };
2677        let argv = build_run_argv(&plan, &[], "t", "host.docker.internal", None, "img", true);
2678        assert!(argv.contains(&"--cap-add".to_string()));
2679        assert!(argv.contains(&"NET_ADMIN".to_string()));
2680        assert!(argv.contains(&"--cap-drop".to_string()));
2681        assert!(argv.contains(&"ALL".to_string()));
2682        assert!(argv.contains(&"--device".to_string()));
2683        assert!(argv.contains(&"/dev/zero:/dev/zerorwm".to_string()));
2684        assert!(argv.contains(&"--init".to_string()));
2685        assert!(argv.contains(&"--shm-size".to_string()));
2686        assert!(argv.contains(&"256m".to_string()));
2687        assert!(argv.contains(&"--sysctl".to_string()));
2688        assert!(argv.contains(&"net.ipv4.ip_forward=1".to_string()));
2689        assert!(argv.contains(&"--tmpfs".to_string()));
2690        assert!(argv.contains(&"--privileged".to_string()));
2691        assert!(argv.contains(&"--stop-timeout".to_string()));
2692        assert!(argv.contains(&"30".to_string()));
2693        assert!(argv.contains(&"--user".to_string()));
2694        assert!(argv.contains(&"1000:1000".to_string()));
2695        assert!(argv.contains(&"--workdir".to_string()));
2696        assert!(argv.contains(&"/app".to_string()));
2697        assert!(argv.contains(&"--tty".to_string()));
2698        assert!(argv.contains(&"--interactive".to_string()));
2699        assert!(argv.contains(&"--read-only".to_string()));
2700    }
2701
2702    #[test]
2703    fn parse_linux_parameters_fills_defaults() {
2704        let raw = serde_json::json!({"initProcessEnabled": true});
2705        let lp = parse_linux_parameters(&raw).expect("parses");
2706        assert!(lp.init_process_enabled);
2707        assert!(!lp.privileged);
2708        assert!(lp.capabilities_add.is_empty());
2709    }
2710
2711    #[test]
2712    fn parse_device_uses_default_permissions() {
2713        let raw = serde_json::json!({"hostPath": "/dev/null", "containerPath": "/dev/null"});
2714        let dev = parse_device(&raw).expect("parses");
2715        assert_eq!(dev.permissions, "rwm");
2716    }
2717
2718    #[test]
2719    fn compute_elbv2_targets_empty_when_no_group() {
2720        let mut accounts: MultiAccountState<EcsState> =
2721            MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
2722        let acct = accounts.get_or_create("000000000000");
2723        let mut task = make_task("t1");
2724        task.group = None;
2725        acct.tasks.insert("t1".into(), task);
2726        let state = acct.clone();
2727        let targets = compute_elbv2_targets(&state, state.tasks.get("t1").unwrap());
2728        assert!(targets.is_empty());
2729    }
2730
2731    #[test]
2732    fn compute_elbv2_targets_bridge_mode_uses_localhost_and_host_port() {
2733        let mut accounts: MultiAccountState<EcsState> =
2734            MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
2735        let acct = accounts.get_or_create("000000000000");
2736
2737        let td = crate::state::TaskDefinition {
2738            family: "app".into(),
2739            revision: 1,
2740            task_definition_arn: "arn:aws:ecs:us-east-1:000000000000:task-definition/app:1".into(),
2741            container_definitions: Vec::new(),
2742            network_mode: Some("bridge".into()),
2743            status: "ACTIVE".into(),
2744            task_role_arn: None,
2745            execution_role_arn: None,
2746            requires_compatibilities: Vec::new(),
2747            compatibilities: Vec::new(),
2748            cpu: None,
2749            memory: None,
2750            pid_mode: None,
2751            ipc_mode: None,
2752            volumes: Vec::new(),
2753            placement_constraints: Vec::new(),
2754            proxy_configuration: None,
2755            inference_accelerators: Vec::new(),
2756            ephemeral_storage: None,
2757            runtime_platform: None,
2758            requires_attributes: Vec::new(),
2759            registered_at: Utc::now(),
2760            registered_by: None,
2761            deregistered_at: None,
2762            tags: Vec::new(),
2763            enable_fault_injection: None,
2764        };
2765        acct.task_definitions.insert("app".into(), {
2766            let mut m = std::collections::BTreeMap::new();
2767            m.insert(1, td);
2768            m
2769        });
2770
2771        let service = crate::state::Service {
2772            service_name: "svc".into(),
2773            service_arn: "arn:aws:ecs:us-east-1:000000000000:service/default/svc".into(),
2774            cluster_name: "default".into(),
2775            cluster_arn: "arn:aws:ecs:us-east-1:000000000000:cluster/default".into(),
2776            task_definition_arn: "arn:aws:ecs:us-east-1:000000000000:task-definition/app:1".into(),
2777            family: "app".into(),
2778            revision: 1,
2779            desired_count: 1,
2780            running_count: 0,
2781            pending_count: 0,
2782            launch_type: "FARGATE".into(),
2783            status: "ACTIVE".into(),
2784            scheduling_strategy: "REPLICA".into(),
2785            deployment_controller: "ECS".into(),
2786            minimum_healthy_percent: Some(0),
2787            maximum_percent: Some(200),
2788            circuit_breaker: None,
2789            deployments: Vec::new(),
2790            load_balancers: vec![serde_json::json!({
2791                "targetGroupArn": "arn:aws:elasticloadbalancing:us-east-1:000000000000:targetgroup/tg/abc",
2792                "containerName": "app",
2793                "containerPort": 80,
2794            })],
2795            service_registries: Vec::new(),
2796            placement_constraints: Vec::new(),
2797            placement_strategy: Vec::new(),
2798            network_configuration: None,
2799            volume_configurations: vec![],
2800            tags: Vec::new(),
2801            created_at: Utc::now(),
2802            created_by: None,
2803            role_arn: None,
2804            platform_version: None,
2805            health_check_grace_period_seconds: None,
2806            enable_execute_command: false,
2807            enable_ecs_managed_tags: false,
2808            propagate_tags: None,
2809            capacity_provider_strategy: Vec::new(),
2810            availability_zone_rebalancing: None,
2811        };
2812        acct.services.insert(
2813            crate::state::EcsState::service_key("default", "svc"),
2814            service,
2815        );
2816
2817        let mut task = make_task("t1");
2818        task.group = Some("service:svc".into());
2819        task.containers = vec![crate::state::Container {
2820            container_arn: "arn:aws:ecs:us-east-1:000000000000:container/default/abc/app".into(),
2821            name: "app".into(),
2822            image: "alpine".into(),
2823            task_arn: task.task_arn.clone(),
2824            last_status: "RUNNING".into(),
2825            exit_code: None,
2826            reason: None,
2827            runtime_id: Some("dockerid-app".into()),
2828            essential: true,
2829            cpu: None,
2830            memory: None,
2831            memory_reservation: None,
2832            network_bindings: vec![serde_json::json!({
2833                "bindIP": "0.0.0.0",
2834                "containerPort": 80,
2835                "hostPort": 32768,
2836                "protocol": "tcp",
2837            })],
2838            network_interfaces: Vec::new(),
2839            health_status: None,
2840            managed_agents: None,
2841            image_digest: None,
2842        }];
2843        acct.tasks.insert("t1".into(), task);
2844
2845        let state = acct.clone();
2846        let targets = compute_elbv2_targets(&state, state.tasks.get("t1").unwrap());
2847        assert_eq!(targets.len(), 1);
2848        let (arn, tg_targets) = &targets[0];
2849        assert_eq!(
2850            arn,
2851            "arn:aws:elasticloadbalancing:us-east-1:000000000000:targetgroup/tg/abc"
2852        );
2853        assert_eq!(tg_targets.len(), 1);
2854        assert_eq!(tg_targets[0].0, "127.0.0.1");
2855        assert_eq!(tg_targets[0].1, Some(32768));
2856    }
2857
2858    #[test]
2859    fn compute_elbv2_targets_awsvpc_uses_eni_ip() {
2860        let mut accounts: MultiAccountState<EcsState> =
2861            MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
2862        let acct = accounts.get_or_create("000000000000");
2863
2864        let td = crate::state::TaskDefinition {
2865            family: "app".into(),
2866            revision: 1,
2867            task_definition_arn: "arn:aws:ecs:us-east-1:000000000000:task-definition/app:1".into(),
2868            container_definitions: Vec::new(),
2869            network_mode: Some("awsvpc".into()),
2870            status: "ACTIVE".into(),
2871            task_role_arn: None,
2872            execution_role_arn: None,
2873            requires_compatibilities: Vec::new(),
2874            compatibilities: Vec::new(),
2875            cpu: None,
2876            memory: None,
2877            pid_mode: None,
2878            ipc_mode: None,
2879            volumes: Vec::new(),
2880            placement_constraints: Vec::new(),
2881            proxy_configuration: None,
2882            inference_accelerators: Vec::new(),
2883            ephemeral_storage: None,
2884            runtime_platform: None,
2885            requires_attributes: Vec::new(),
2886            registered_at: Utc::now(),
2887            registered_by: None,
2888            deregistered_at: None,
2889            tags: Vec::new(),
2890            enable_fault_injection: None,
2891        };
2892        acct.task_definitions.insert("app".into(), {
2893            let mut m = std::collections::BTreeMap::new();
2894            m.insert(1, td);
2895            m
2896        });
2897
2898        let service = crate::state::Service {
2899            service_name: "svc".into(),
2900            service_arn: "arn:aws:ecs:us-east-1:000000000000:service/default/svc".into(),
2901            cluster_name: "default".into(),
2902            cluster_arn: "arn:aws:ecs:us-east-1:000000000000:cluster/default".into(),
2903            task_definition_arn: "arn:aws:ecs:us-east-1:000000000000:task-definition/app:1".into(),
2904            family: "app".into(),
2905            revision: 1,
2906            desired_count: 1,
2907            running_count: 0,
2908            pending_count: 0,
2909            launch_type: "FARGATE".into(),
2910            status: "ACTIVE".into(),
2911            scheduling_strategy: "REPLICA".into(),
2912            deployment_controller: "ECS".into(),
2913            minimum_healthy_percent: Some(0),
2914            maximum_percent: Some(200),
2915            circuit_breaker: None,
2916            deployments: Vec::new(),
2917            load_balancers: vec![serde_json::json!({
2918                "targetGroupArn": "arn:aws:elasticloadbalancing:us-east-1:000000000000:targetgroup/tg/abc",
2919                "containerName": "app",
2920                "containerPort": 80,
2921            })],
2922            service_registries: Vec::new(),
2923            placement_constraints: Vec::new(),
2924            placement_strategy: Vec::new(),
2925            network_configuration: None,
2926            volume_configurations: vec![],
2927            tags: Vec::new(),
2928            created_at: Utc::now(),
2929            created_by: None,
2930            role_arn: None,
2931            platform_version: None,
2932            health_check_grace_period_seconds: None,
2933            enable_execute_command: false,
2934            enable_ecs_managed_tags: false,
2935            propagate_tags: None,
2936            capacity_provider_strategy: Vec::new(),
2937            availability_zone_rebalancing: None,
2938        };
2939        acct.services.insert(
2940            crate::state::EcsState::service_key("default", "svc"),
2941            service,
2942        );
2943
2944        let mut task = make_task("t1");
2945        task.group = Some("service:svc".into());
2946        task.attachments = vec![crate::state::TaskAttachment {
2947            id: "eni-123".into(),
2948            attachment_type: "eni".into(),
2949            status: "ATTACHED".into(),
2950            details: vec![
2951                crate::state::AttachmentDetail {
2952                    name: "privateIPv4Address".into(),
2953                    value: "172.18.0.2".into(),
2954                },
2955                crate::state::AttachmentDetail {
2956                    name: "macAddress".into(),
2957                    value: "02:42:ac:12:00:02".into(),
2958                },
2959            ],
2960        }];
2961        acct.tasks.insert("t1".into(), task);
2962
2963        let state = acct.clone();
2964        let targets = compute_elbv2_targets(&state, state.tasks.get("t1").unwrap());
2965        assert_eq!(targets.len(), 1);
2966        let (arn, tg_targets) = &targets[0];
2967        assert_eq!(
2968            arn,
2969            "arn:aws:elasticloadbalancing:us-east-1:000000000000:targetgroup/tg/abc"
2970        );
2971        assert_eq!(tg_targets.len(), 1);
2972        assert_eq!(tg_targets[0].0, "172.18.0.2");
2973        assert_eq!(tg_targets[0].1, Some(80));
2974    }
2975}