Skip to main content

fakecloud_ecs/runtime/
mod.rs

1//! Docker/Podman-based ECS task execution.
2//!
3//! Mirrors the Lambda `ContainerRuntime` approach (auto-detect CLI, forward
4//! localhost → host.docker.internal) but scoped for ECS's different
5//! lifecycle: tasks are ephemeral, so there is no warm-container pool. Each
6//! `run_task` spawns a background tokio task that pulls the image, starts
7//! the container, waits for exit, captures logs, and updates shared ECS
8//! state in place.
9
10use std::path::PathBuf;
11use std::sync::Arc;
12use std::time::Duration;
13
14use base64::Engine;
15use chrono::Utc;
16use fakecloud_core::delivery::DeliveryBus;
17use fakecloud_logs::ingest::{append_events, IngestEvent};
18use fakecloud_logs::SharedLogsState;
19use fakecloud_secretsmanager::SharedSecretsManagerState;
20use fakecloud_ssm::SharedSsmState;
21use parking_lot::RwLock;
22use tempfile::TempDir;
23use tokio::process::Command;
24
25use crate::state::{LifecycleEvent, SharedEcsState};
26
27#[derive(Debug, thiserror::Error)]
28pub enum RuntimeError {
29    #[error("container CLI not found (tried docker, podman)")]
30    NoCli,
31    #[error("image pull failed: {0}")]
32    ImagePull(String),
33    #[error("container start failed: {0}")]
34    ContainerStart(String),
35    #[error("docker wait failed: {0}")]
36    Wait(String),
37}
38
39/// Docker/Podman executor for ECS tasks.
40pub struct EcsRuntime {
41    cli: String,
42    host_ip: String,
43    /// Port the main fakecloud server bound to. Used to translate AWS
44    /// ECR URIs (`<acct>.dkr.ecr.<region>.amazonaws.com/<repo>:<tag>`) to
45    /// the local OCI v2 endpoint (`127.0.0.1:<port>/<repo>:<tag>`) so
46    /// tasks can pull images pushed to fakecloud's own ECR.
47    server_port: u16,
48    /// Isolated DOCKER_CONFIG dir pre-populated with Basic auth for
49    /// `127.0.0.1:<port>`; keeps the host user's `~/.docker/config.json`
50    /// untouched and lets `docker pull` succeed against fakecloud ECR
51    /// without a prior `aws ecr get-login-password | docker login`.
52    docker_config: Option<Arc<TempDir>>,
53    /// Tracks per-task lists of `(container_name, docker_container_id)` so
54    /// `stop_task` can kill every container backing a task — multi-container
55    /// task definitions launch one docker container per `containerDefinitions`
56    /// entry, all of which must be torn down on stop.
57    containers: RwLock<std::collections::HashMap<String, Vec<(String, String)>>>,
58    /// Cross-service delivery bus — emits `aws.ecs` EventBridge events
59    /// on task state transitions when wired. `None` if the server started
60    /// without EventBridge configured (or for unit tests).
61    delivery_bus: Option<Arc<DeliveryBus>>,
62    /// CloudWatch Logs state — when set, tasks whose container definition
63    /// declares the `awslogs` log driver get their captured stdout/stderr
64    /// forwarded to a log group/stream under this shared state.
65    logs_state: Option<SharedLogsState>,
66    /// SecretsManager state for resolving `containerDefinition.secrets[]`
67    /// entries whose `valueFrom` is a SecretsManager ARN.
68    secretsmanager_state: Option<SharedSecretsManagerState>,
69    /// SSM Parameter Store state for resolving `secrets[]` entries whose
70    /// `valueFrom` is an SSM parameter ARN.
71    ssm_state: Option<SharedSsmState>,
72}
73
74mod config;
75mod lb;
76mod monitoring;
77mod secrets;
78mod task_lifecycle;
79
80impl EcsRuntime {
81    /// Auto-detect Docker or Podman. Returns `None` if neither is
82    /// available. Honours `FAKECLOUD_CONTAINER_CLI` for explicit override.
83    /// `server_port` is the port the main fakecloud server bound to;
84    /// needed to resolve AWS ECR URIs against the local OCI v2 registry.
85    pub fn new(server_port: u16) -> Option<Self> {
86        let cli = if let Ok(cli) = std::env::var("FAKECLOUD_CONTAINER_CLI") {
87            if cli_works(&cli) {
88                cli
89            } else {
90                return None;
91            }
92        } else if cli_works("docker") {
93            "docker".to_string()
94        } else if cli_works("podman") {
95            "podman".to_string()
96        } else {
97            return None;
98        };
99        let host_ip = if cfg!(target_os = "linux") {
100            "172.17.0.1".to_string()
101        } else {
102            "host-gateway".to_string()
103        };
104        let docker_config = build_local_registry_docker_config(server_port).map(Arc::new);
105        Some(Self {
106            cli,
107            host_ip,
108            server_port,
109            docker_config,
110            containers: RwLock::new(std::collections::HashMap::new()),
111            delivery_bus: None,
112            logs_state: None,
113            secretsmanager_state: None,
114            ssm_state: None,
115        })
116    }
117
118    /// Wire EventBridge delivery so task state transitions emit
119    /// `aws.ecs` / `ECS Task State Change` events.
120    pub fn with_delivery_bus(mut self, bus: Arc<DeliveryBus>) -> Self {
121        self.delivery_bus = Some(bus);
122        self
123    }
124
125    /// Wire CloudWatch Logs state so tasks using the `awslogs` driver
126    /// get their captured stdout/stderr forwarded.
127    pub fn with_logs(mut self, logs: SharedLogsState) -> Self {
128        self.logs_state = Some(logs);
129        self
130    }
131}
132
133/// Per-container launch plan derived from a task definition.
134#[derive(Clone, Debug)]
135pub(crate) struct ContainerPlan {
136    pub(crate) container_name: String,
137    pub(crate) image: String,
138    pub(crate) env: Vec<(String, String)>,
139    pub(crate) entry_point: Vec<String>,
140    pub(crate) command: Vec<String>,
141    pub(crate) secrets_refs: Vec<(String, String)>,
142    pub(crate) essential: bool,
143    pub(crate) has_task_role: bool,
144    /// Port mappings parsed from the task definition. Each entry becomes
145    /// a `--publish containerPort:hostPort/protocol` flag on the docker
146    /// run command (except for `awsvpc`, where ports are exposed via the
147    /// per-task ENI rather than the docker host's port table).
148    pub(crate) port_mappings: Vec<PortMapping>,
149    /// Task-level network mode propagated to every container plan so the
150    /// argv builder can decide whether to emit `--publish` flags. Real
151    /// ECS treats `awsvpc` as "container is on its own ENI"; the
152    /// equivalent in fakecloud is "don't publish to the host".
153    pub(crate) network_mode: Option<String>,
154    /// Container dependencies parsed from `dependsOn[]`. Each entry pairs
155    /// the target container name with the condition that must be observed
156    /// before this container is launched: `START` (target exists/running),
157    /// `COMPLETE` (target exited, any code), `SUCCESS` (target exited with
158    /// code 0), or `HEALTHY` (target's docker `Health.Status` is `healthy`).
159    /// Used both to topologically order the launch loop and to gate each
160    /// `docker run` on the upstream condition.
161    pub(crate) depends_on: Vec<DependsOn>,
162    /// Parsed `healthCheck` from the task definition. Translated into
163    /// docker `--health-*` flags on `docker run` so the container's
164    /// health is observable via `docker inspect .State.Health.Status`.
165    /// `None` when the task definition doesn't declare a healthCheck;
166    /// the container's `healthStatus` then stays `UNKNOWN` (matching ECS
167    /// behaviour for tasks without a health probe).
168    pub(crate) health_check: Option<HealthCheckSpec>,
169    /// Volume mounts resolved by joining the container definition's
170    /// `mountPoints[]` with the task definition's `volumes[]`. Each entry
171    /// renders as one `-v` flag on the `docker run` invocation. Empty when
172    /// the container has no mount points or no matching volume entries.
173    pub(crate) volume_mounts: Vec<VolumeMount>,
174    /// Parsed `ulimits` from the container definition. Each entry becomes
175    /// `--ulimit <name>=<soft>:<hard>` on `docker run`.
176    pub(crate) ulimits: Vec<Ulimit>,
177    /// Parsed `linuxParameters` from the container definition. Emits
178    /// `--cap-add`, `--cap-drop`, `--device`, `--init`, `--shm-size`,
179    /// `--sysctl`, `--tmpfs`, `--privileged`, and `--read-only` flags.
180    pub(crate) linux_parameters: Option<LinuxParameters>,
181    /// `stopTimeout` in seconds. Becomes `--stop-timeout <N>` on `docker run`.
182    pub(crate) stop_timeout: Option<u32>,
183    /// `user` from the container definition. Becomes `--user <value>`.
184    pub(crate) user: Option<String>,
185    /// `workingDirectory` from the container definition. Becomes `--workdir`.
186    pub(crate) working_directory: Option<String>,
187    /// `tty` from the container definition. Emits `--tty` when true.
188    pub(crate) tty: bool,
189    /// `interactive` from the container definition. Emits `--interactive` when true.
190    pub(crate) interactive: bool,
191    /// `readonlyRootFilesystem` from the container definition. Emits `--read-only` when true.
192    pub(crate) readonly_rootfs: bool,
193}
194
195/// One parsed `dependsOn[]` entry on a container. Pairs the upstream
196/// container name with the condition that must hold before the dependent
197/// container is launched. AWS spells the conditions `START`, `COMPLETE`,
198/// `SUCCESS`, `HEALTHY` and treats anything else as an error at register
199/// time — we mirror that in [`parse_depends_on`].
200#[derive(Clone, Debug, PartialEq, Eq)]
201pub(crate) struct DependsOn {
202    pub container_name: String,
203    pub condition: DependsOnCondition,
204}
205
/// The `dependsOn[].condition` values a task definition may use. Each
/// variant corresponds to exactly one AWS-documented string.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub(crate) enum DependsOnCondition {
    /// The upstream container has been started: it exists and is either
    /// running or already exited.
    Start,
    /// The upstream container has exited, whatever its exit code.
    Complete,
    /// The upstream container has exited with code 0.
    Success,
    /// The upstream container reports `Health.Status` of `healthy`. An
    /// upstream without a healthCheck is treated as satisfying this
    /// immediately.
    Healthy,
}

impl DependsOnCondition {
    /// Convert the AWS spelling (`START`, `COMPLETE`, `SUCCESS`,
    /// `HEALTHY`; exact, case-sensitive match) into a variant. Anything
    /// else yields `None`, leaving it to the caller to report the error.
    pub fn parse(raw: &str) -> Option<Self> {
        // Derive the mapping from `as_aws_str` so the two directions can
        // never drift apart.
        [Self::Start, Self::Complete, Self::Success, Self::Healthy]
            .iter()
            .copied()
            .find(|candidate| candidate.as_aws_str() == raw)
    }

    /// The AWS spelling of this condition, suitable for echoing back in
    /// user-facing messages exactly as written in the task definition.
    pub fn as_aws_str(self) -> &'static str {
        match self {
            Self::Start => "START",
            Self::Complete => "COMPLETE",
            Self::Success => "SUCCESS",
            Self::Healthy => "HEALTHY",
        }
    }
}
250
/// A container `healthCheck` from the ECS task definition, in a form that
/// maps field-for-field onto docker's `--health-*` flags. The AWS defaults
/// (interval 30s, timeout 5s, retries 3, startPeriod 0s) are meant to be
/// applied when the value is parsed, so every field here is concrete.
#[derive(Debug, Clone, Eq, PartialEq)]
pub(crate) struct HealthCheckSpec {
    /// `command[]` as written. The first element picks the form:
    /// `CMD-SHELL` and `CMD` both end up as `--health-cmd <rest joined by
    /// space>` (docker has no argv form for this flag); `NONE` means no
    /// health flag is emitted at all.
    pub command: Vec<String>,
    /// Seconds between probes.
    pub interval_seconds: u32,
    /// Seconds before a single probe is considered failed.
    pub timeout_seconds: u32,
    /// Number of retries before the container is considered unhealthy.
    pub retries: u32,
    /// Start period, in seconds.
    pub start_period_seconds: u32,
}
269
/// A single `portMappings` entry, kept in the AWS shape so the docker argv
/// builder and the `networkBindings` response can work from the same
/// parsed value.
#[derive(Debug, Clone, Eq, PartialEq)]
pub(crate) struct PortMapping {
    /// Port inside the container.
    pub container_port: u16,
    /// Port on the host. `0` (which is also what an absent value in the
    /// source JSON becomes) stands for "same as `container_port`", the
    /// host-mode default.
    pub host_port: u16,
    /// `tcp` or `udp`, lower-case; `tcp` when the definition omits it.
    pub protocol: String,
}
282
/// A `mountPoints` entry after it has been matched with its task-level
/// `volumes` entry. Rendered as one `-v <source>:<containerPath>[:ro]`
/// flag on `docker run`.
///
/// How `source` is chosen depends on the kind of volume:
/// - **host bind** (`volume.host.sourcePath`): that host path.
/// - **EFS** (`efsVolumeConfiguration`): a host stub directory,
///   `/tmp/fakecloud/efs/<filesystemId>[/<rootDirectory>]`, created ahead
///   of `docker run` so tasks naming the same filesystem id share state.
/// - **FSx for Windows** (`fsxWindowsFileServerVolumeConfiguration`): a
///   stub directory `/tmp/fakecloud/fsx/<filesystemId>/<rootDirectory>`,
///   prepared the same way.
/// - **Docker volume** (`dockerVolumeConfiguration`): the volume name is
///   handed to docker unchanged.
/// - **Bare volume** (only `name`): the volume name is likewise used as
///   the docker volume reference.
#[derive(Debug, Clone, Eq, PartialEq)]
pub(crate) struct VolumeMount {
    /// Left-hand side of `-v`: host path, docker volume name, or a stub
    /// directory under `/tmp/fakecloud/{efs,fsx}/...`.
    pub source: String,
    /// Path inside the container, exactly as given in
    /// `mountPoints[].containerPath`.
    pub container_path: String,
    /// Mirrors `mountPoints[].readOnly`; true adds `:ro` to the flag.
    /// False (read-write) when the definition omits it.
    pub read_only: bool,
}
318
/// A single `ulimits` entry, rendered as `--ulimit <name>=<soft>:<hard>`.
#[derive(Debug, Clone, Eq, PartialEq)]
pub(crate) struct Ulimit {
    /// Limit name.
    pub name: String,
    /// Soft limit value.
    pub soft_limit: i32,
    /// Hard limit value.
    pub hard_limit: i32,
}
326
/// A single `linuxParameters.devices` entry, rendered as a `--device`
/// flag built from the host path, container path and permissions.
#[derive(Debug, Clone, Eq, PartialEq)]
pub(crate) struct Device {
    /// Device path on the host.
    pub host_path: String,
    /// Device path inside the container.
    pub container_path: String,
    /// Permission string attached to the device mapping.
    pub permissions: String,
}
334
/// A single `linuxParameters.sysctl` entry, rendered as
/// `--sysctl <name>=<value>`.
#[derive(Debug, Clone, Eq, PartialEq)]
pub(crate) struct Sysctl {
    /// Kernel parameter name.
    pub name: String,
    /// Value to assign to the parameter.
    pub value: String,
}
341
342/// Parsed `linuxParameters` from the container definition.
343#[derive(Clone, Debug, PartialEq, Eq, Default)]
344pub(crate) struct LinuxParameters {
345    pub capabilities_add: Vec<String>,
346    pub capabilities_drop: Vec<String>,
347    pub devices: Vec<Device>,
348    pub init_process_enabled: bool,
349    pub shared_memory_size: Option<i32>,
350    pub sysctls: Vec<Sysctl>,
351    pub tmpfs: Vec<Tmpfs>,
352    pub privileged: bool,
353}
354
/// A single `linuxParameters.tmpfs` entry, rendered as
/// `--tmpfs <containerPath>:size=<size>M<,options>*`.
#[derive(Debug, Clone, Eq, PartialEq)]
pub(crate) struct Tmpfs {
    /// Mount point inside the container.
    pub container_path: String,
    /// Size of the tmpfs (emitted with an `M` suffix).
    pub size: i32,
    /// Extra mount options appended after the size.
    pub mount_options: Vec<String>,
}
362
363#[derive(Clone, Debug)]
364struct ResolvedContainerPlan {
365    plan: ContainerPlan,
366    env: Vec<(String, String)>,
367}
368
/// What was learned from waiting on the container that determines a
/// task's lifetime.
#[derive(Debug, Clone)]
struct TaskExitOutcome {
    /// Position, within the started-containers list, of the container
    /// whose exit ended the task. Only `None` in degenerate cases; the
    /// `Option` keeps `final_containers` indexing explicit.
    exited_index: Option<usize>,
    /// Exit code associated with the task's termination.
    exit_code: i64,
    /// Stop code string recorded for the task.
    stop_code: &'static str,
}
379
380/// Per-container record persisted on the task. Mirrors the AWS Container
381/// shape but tracks the docker-side container id alongside ECS metadata.
382#[derive(Clone, Debug)]
383pub(crate) struct RunningContainer {
384    pub(crate) name: String,
385    pub(crate) container_id: String,
386    pub(crate) essential: bool,
387    pub(crate) exit_code: Option<i64>,
388    /// Resolved `networkBindings` for DescribeTasks. Computed from the
389    /// task definition's `portMappings` at launch and surfaced verbatim
390    /// in the per-container response.
391    pub(crate) network_bindings: Vec<serde_json::Value>,
392    /// Image digest captured from `docker inspect` after pull. AWS
393    /// surfaces this on the Container response so callers can pin which
394    /// exact image revision a task is running. `None` when the inspect
395    /// failed or the CLI didn't expose `RepoDigests`.
396    pub(crate) image_digest: Option<String>,
397}
398
399/// Pure decision: does the current set of containers warrant stopping
400/// the task? Returns true when any essential container has exited, or
401/// when every container has exited (regardless of essential). Mirrors
402/// AWS ECS task lifetime semantics.
403pub(crate) fn task_should_stop(containers: &[RunningContainer]) -> bool {
404    if containers.is_empty() {
405        return true;
406    }
407    let any_essential_exited = containers
408        .iter()
409        .any(|c| c.essential && c.exit_code.is_some());
410    if any_essential_exited {
411        return true;
412    }
413    containers.iter().all(|c| c.exit_code.is_some())
414}
415
416fn build_container_plans(
417    state: &SharedEcsState,
418    account_id: &str,
419    task_id: &str,
420    _server_port: u16,
421) -> Result<Vec<ContainerPlan>, RuntimeError> {
422    let accounts = state.read();
423    let s = accounts
424        .get(account_id)
425        .ok_or_else(|| RuntimeError::ContainerStart("account missing".into()))?;
426    let task = s
427        .tasks
428        .get(task_id)
429        .ok_or_else(|| RuntimeError::ContainerStart("task missing".into()))?;
430    if task.containers.is_empty() {
431        return Err(RuntimeError::ContainerStart(
432            "task has no containers".into(),
433        ));
434    }
435    let has_task_role = task.task_role_arn.is_some();
436    let task_def = s
437        .task_definitions
438        .get(&task.family)
439        .and_then(|revs| revs.get(&task.revision));
440    let network_mode = task_def.and_then(|td| td.network_mode.clone());
441    // Index `volumes[]` by name so each container's `mountPoints[]` can
442    // resolve its volume in O(1). Real ECS rejects mountPoints that
443    // reference an undeclared volume at register time; we don't yet, so
444    // unresolved names just produce zero mounts at launch.
445    let volumes_by_name: std::collections::HashMap<String, &serde_json::Value> = task_def
446        .map(|td| {
447            td.volumes
448                .iter()
449                .filter_map(|v| {
450                    let name = v.get("name").and_then(|n| n.as_str())?;
451                    Some((name.to_string(), v))
452                })
453                .collect()
454        })
455        .unwrap_or_default();
456    let mut plans = Vec::with_capacity(task.containers.len());
457    for container in &task.containers {
458        let def = find_container_definition(s, &task.family, task.revision, &container.name);
459        let secrets_refs = def
460            .as_ref()
461            .and_then(|d| d.get("secrets").and_then(|v| v.as_array()).cloned())
462            .map(|arr| {
463                arr.iter()
464                    .filter_map(|e| {
465                        let name = e.get("name").and_then(|v| v.as_str())?.to_string();
466                        let value_from = e.get("valueFrom").and_then(|v| v.as_str())?.to_string();
467                        Some((name, value_from))
468                    })
469                    .collect::<Vec<_>>()
470            })
471            .unwrap_or_default();
472        let str_array = |key: &str| -> Vec<String> {
473            def.as_ref()
474                .and_then(|d| d.get(key).and_then(|v| v.as_array()).cloned())
475                .map(|arr| {
476                    arr.iter()
477                        .filter_map(|v| v.as_str().map(String::from))
478                        .collect::<Vec<_>>()
479                })
480                .unwrap_or_default()
481        };
482        let env = def
483            .as_ref()
484            .and_then(|d| d.get("environment").and_then(|v| v.as_array()).cloned())
485            .map(|arr| {
486                arr.iter()
487                    .filter_map(|e| {
488                        let k = e.get("name").and_then(|v| v.as_str())?;
489                        let v = e.get("value").and_then(|v| v.as_str()).unwrap_or("");
490                        Some((k.to_string(), v.to_string()))
491                    })
492                    .collect::<Vec<_>>()
493            })
494            .unwrap_or_default();
495        let port_mappings = def
496            .as_ref()
497            .and_then(|d| d.get("portMappings").and_then(|v| v.as_array()).cloned())
498            .map(|arr| {
499                arr.iter()
500                    .filter_map(parse_port_mapping)
501                    .collect::<Vec<_>>()
502            })
503            .unwrap_or_default();
504        let depends_on = def
505            .as_ref()
506            .and_then(|d| d.get("dependsOn").and_then(|v| v.as_array()).cloned())
507            .map(|arr| {
508                arr.iter()
509                    .filter_map(parse_depends_on_entry)
510                    .collect::<Vec<_>>()
511            })
512            .unwrap_or_default();
513        let health_check = def
514            .as_ref()
515            .and_then(|d| d.get("healthCheck"))
516            .and_then(parse_health_check);
517        let volume_mounts = def
518            .as_ref()
519            .and_then(|d| d.get("mountPoints").and_then(|v| v.as_array()).cloned())
520            .map(|arr| {
521                arr.iter()
522                    .filter_map(|mp| resolve_mount_point(mp, &volumes_by_name))
523                    .collect::<Vec<_>>()
524            })
525            .unwrap_or_default();
526        let ulimits = def
527            .as_ref()
528            .and_then(|d| d.get("ulimits").and_then(|v| v.as_array()).cloned())
529            .map(|arr| arr.iter().filter_map(parse_ulimit).collect::<Vec<_>>())
530            .unwrap_or_default();
531        let linux_parameters = def
532            .as_ref()
533            .and_then(|d| d.get("linuxParameters"))
534            .and_then(parse_linux_parameters);
535        let stop_timeout = def.as_ref().and_then(|d| {
536            d.get("stopTimeout")
537                .and_then(|v| v.as_u64())
538                .map(|n| n as u32)
539        });
540        let user = def
541            .as_ref()
542            .and_then(|d| d.get("user").and_then(|v| v.as_str()).map(String::from));
543        let working_directory = def.as_ref().and_then(|d| {
544            d.get("workingDirectory")
545                .and_then(|v| v.as_str())
546                .map(String::from)
547        });
548        let tty = def
549            .as_ref()
550            .and_then(|d| d.get("tty").and_then(|v| v.as_bool()))
551            .unwrap_or(false);
552        let interactive = def
553            .as_ref()
554            .and_then(|d| d.get("interactive").and_then(|v| v.as_bool()))
555            .unwrap_or(false);
556        let readonly_rootfs = def
557            .as_ref()
558            .and_then(|d| d.get("readonlyRootFilesystem").and_then(|v| v.as_bool()))
559            .unwrap_or(false);
560        plans.push(ContainerPlan {
561            container_name: container.name.clone(),
562            image: container.image.clone(),
563            env,
564            entry_point: str_array("entryPoint"),
565            command: str_array("command"),
566            secrets_refs,
567            essential: container.essential,
568            has_task_role,
569            port_mappings,
570            network_mode: network_mode.clone(),
571            depends_on,
572            health_check,
573            volume_mounts,
574            ulimits,
575            linux_parameters,
576            stop_timeout,
577            user,
578            working_directory,
579            tty,
580            interactive,
581            readonly_rootfs,
582        });
583    }
584    let plans = topo_sort_plans(plans);
585    Ok(plans)
586}
587
588/// Resolve one `mountPoints[]` entry against the indexed task-definition
589/// volumes. Returns `None` when:
590/// - the entry has no `containerPath` or `sourceVolume`,
591/// - the named volume isn't declared on the task definition.
592///
593/// Returns `Some(VolumeMount)` for every supported volume kind:
594/// host bind, EFS, FSx, named docker volume, anonymous docker volume.
595fn resolve_mount_point(
596    mount_point: &serde_json::Value,
597    volumes_by_name: &std::collections::HashMap<String, &serde_json::Value>,
598) -> Option<VolumeMount> {
599    let container_path = mount_point
600        .get("containerPath")
601        .and_then(|v| v.as_str())?
602        .to_string();
603    let source_volume = mount_point.get("sourceVolume").and_then(|v| v.as_str())?;
604    let read_only = mount_point
605        .get("readOnly")
606        .and_then(|v| v.as_bool())
607        .unwrap_or(false);
608    let volume = volumes_by_name.get(source_volume)?;
609    let source = resolve_volume_source(source_volume, volume)?;
610    Some(VolumeMount {
611        source,
612        container_path,
613        read_only,
614    })
615}
616
617/// Map a single task-definition `volumes[]` entry to the source side of a
618/// `docker run -v` flag. The matching here mirrors the AWS volume kinds:
619///
620/// 1. `host.sourcePath` -> use that path directly (bind mount).
621/// 2. `efsVolumeConfiguration.fileSystemId` -> stub directory under
622///    `/tmp/fakecloud/efs/<filesystemId>[/<rootDirectory>]`. Created with
623///    `mkdir -p` so different tasks targeting the same filesystem id
624///    share the same host directory, matching real EFS's "many tasks,
625///    one filesystem" semantics.
626/// 3. `fsxWindowsFileServerVolumeConfiguration.fileSystemId` -> stub
627///    directory under `/tmp/fakecloud/fsx/<filesystemId>/<rootDirectory>`.
628/// 4. `dockerVolumeConfiguration` -> the volume `name` itself (named
629///    docker volume; docker creates it on first reference).
630/// 5. Bare entry (only `name`) -> the volume `name` as an anonymous
631///    docker volume reference, matching AWS's "Docker volumes" default.
632///
633/// Returns `None` when the configuration is malformed (e.g. EFS without
634/// a fileSystemId).
635fn resolve_volume_source(name: &str, volume: &serde_json::Value) -> Option<String> {
636    if let Some(host) = volume.get("host") {
637        if let Some(path) = host.get("sourcePath").and_then(|v| v.as_str()) {
638            // Empty sourcePath means "anonymous host volume" — fall
639            // through to the named-volume default below.
640            if !path.is_empty() {
641                ensure_dir_exists(path);
642                return Some(path.to_string());
643            }
644        }
645    }
646    if let Some(efs) = volume.get("efsVolumeConfiguration") {
647        let fs_id = efs.get("fileSystemId").and_then(|v| v.as_str())?;
648        let root = efs
649            .get("rootDirectory")
650            .and_then(|v| v.as_str())
651            .unwrap_or("/");
652        let path = stub_dir_for("efs", fs_id, root);
653        ensure_dir_exists(&path);
654        return Some(path);
655    }
656    if let Some(fsx) = volume.get("fsxWindowsFileServerVolumeConfiguration") {
657        let fs_id = fsx.get("fileSystemId").and_then(|v| v.as_str())?;
658        let root = fsx
659            .get("rootDirectory")
660            .and_then(|v| v.as_str())
661            .unwrap_or("/");
662        let path = stub_dir_for("fsx", fs_id, root);
663        ensure_dir_exists(&path);
664        return Some(path);
665    }
666    if volume.get("dockerVolumeConfiguration").is_some() {
667        // Named docker volume — docker auto-creates it on first
668        // reference. Pass the volume name through verbatim.
669        return Some(name.to_string());
670    }
671    // Bare volume entry: anonymous docker volume keyed by name.
672    Some(name.to_string())
673}
674
/// Compose the host stub directory path for an EFS/FSx volume:
/// `/tmp/fakecloud/<kind>/<fs_id>` followed by the normalised
/// `rootDirectory`.
///
/// `root` is normalised lexically before it is appended: empty and `.`
/// segments are dropped, and `..` removes the previous segment but can
/// never climb above the filesystem's own stub directory. This keeps a
/// task definition with a `rootDirectory` such as `/../../etc` from
/// resolving to (and later creating / bind-mounting) a path outside
/// `/tmp/fakecloud/<kind>/<fs_id>`. When nothing remains — `root` unset,
/// `/`, or only dot segments — the per-filesystem directory itself is
/// returned, matching the EFS convention that the filesystem root is the
/// default mount target.
///
/// `kind` and `fs_id` are inserted verbatim.
fn stub_dir_for(kind: &str, fs_id: &str, root: &str) -> String {
    let mut segments: Vec<&str> = Vec::new();
    for segment in root.split('/') {
        match segment {
            "" | "." => {}
            // Clamp at the stub root: popping an empty stack is a no-op.
            ".." => {
                segments.pop();
            }
            other => segments.push(other),
        }
    }
    let mut path = format!("/tmp/fakecloud/{kind}/{fs_id}");
    for segment in segments {
        path.push('/');
        path.push_str(segment);
    }
    path
}
687
/// Create `path` and any missing parents, ignoring every error.
///
/// This is intentionally best-effort: if the directory cannot be created,
/// docker reports a clear error when the container is run, and not every
/// test sandbox offers a writable `/tmp/fakecloud`.
fn ensure_dir_exists(path: &str) {
    let _ = std::fs::DirBuilder::new().recursive(true).create(path);
}
695
696/// Parse one `dependsOn[]` entry. Returns `None` for malformed entries
697/// (missing `containerName`, unrecognised `condition`) so the caller
698/// can drop them silently from the launch plan — register-time
699/// validation already rejects bad values; this is a defensive fallback.
700fn parse_depends_on_entry(value: &serde_json::Value) -> Option<DependsOn> {
701    let container_name = value
702        .get("containerName")
703        .and_then(|v| v.as_str())?
704        .to_string();
705    let raw_condition = value.get("condition").and_then(|v| v.as_str())?;
706    let condition = DependsOnCondition::parse(raw_condition)?;
707    Some(DependsOn {
708        container_name,
709        condition,
710    })
711}
712
713/// Topologically sort container plans so `dependsOn` dependencies start
714/// before their dependants. Implements Kahn's algorithm with stable order:
715/// when multiple plans are ready, we keep their original declaration
716/// index, so a task without any dependsOn launches in the same order the
717/// user wrote in the task definition. Cycles fall through with the
718/// remaining plans appended in original order — the runtime will still
719/// launch every container; it just can't guarantee dependency ordering
720/// in that degenerate case. Cycles are rejected at register time
721/// (RegisterTaskDefinition -> validate_depends_on_acyclic), so reaching
722/// that branch from a real launch path means a bug elsewhere.
723fn topo_sort_plans(plans: Vec<ContainerPlan>) -> Vec<ContainerPlan> {
724    use std::collections::{HashMap, HashSet};
725    let names: HashSet<String> = plans.iter().map(|p| p.container_name.clone()).collect();
726    let index: HashMap<String, usize> = plans
727        .iter()
728        .enumerate()
729        .map(|(i, p)| (p.container_name.clone(), i))
730        .collect();
731    // in_degree[i] = number of unresolved dependencies for plan i. We
732    // ignore depends_on entries that name a container not in the task
733    // (real ECS rejects those at register time; our register path doesn't
734    // yet, so be defensive here).
735    let mut in_degree: Vec<usize> = plans
736        .iter()
737        .map(|p| {
738            p.depends_on
739                .iter()
740                .filter(|d| names.contains(&d.container_name))
741                .count()
742        })
743        .collect();
744    // dependants[i] = indices of plans that depend on plan i.
745    let mut dependants: Vec<Vec<usize>> = vec![Vec::new(); plans.len()];
746    for (i, p) in plans.iter().enumerate() {
747        for d in &p.depends_on {
748            if let Some(&di) = index.get(&d.container_name) {
749                dependants[di].push(i);
750            }
751        }
752    }
753    let mut ordered: Vec<ContainerPlan> = Vec::with_capacity(plans.len());
754    let mut emitted: Vec<bool> = vec![false; plans.len()];
755    loop {
756        // Pick the lowest-index plan whose in_degree is 0 to keep stable
757        // order across runs.
758        let next = (0..plans.len()).find(|&i| !emitted[i] && in_degree[i] == 0);
759        match next {
760            Some(i) => {
761                emitted[i] = true;
762                ordered.push(plans[i].clone());
763                for &di in &dependants[i] {
764                    if in_degree[di] > 0 {
765                        in_degree[di] -= 1;
766                    }
767                }
768            }
769            None => break,
770        }
771    }
772    // Cycle: append anything left in original order so we don't drop plans.
773    for (i, p) in plans.into_iter().enumerate() {
774        if !emitted[i] {
775            ordered.push(p);
776        }
777    }
778    ordered
779}
780
781/// Validate that `containerDefinitions[].dependsOn[]` graph is acyclic.
782/// Real ECS rejects cyclic dependencies at RegisterTaskDefinition time
783/// with a `ClientException`; we mirror that. Returns the offending pair
784/// of container names so the caller can produce a useful error.
785///
786/// Operates directly on the raw JSON definitions (rather than parsed
787/// `ContainerPlan`s) so register-time validation doesn't have to first
788/// build a full plan from a not-yet-stored task definition.
789pub(crate) fn find_depends_on_cycle(
790    container_definitions: &[serde_json::Value],
791) -> Option<(String, String)> {
792    use std::collections::HashMap;
793
794    let names: Vec<String> = container_definitions
795        .iter()
796        .filter_map(|c| c.get("name").and_then(|n| n.as_str()).map(String::from))
797        .collect();
798    let index: HashMap<&str, usize> = names
799        .iter()
800        .enumerate()
801        .map(|(i, n)| (n.as_str(), i))
802        .collect();
803
804    let mut adj: Vec<Vec<usize>> = vec![Vec::new(); names.len()];
805    for (i, cd) in container_definitions.iter().enumerate() {
806        if i >= names.len() {
807            continue;
808        }
809        let Some(deps) = cd.get("dependsOn").and_then(|v| v.as_array()) else {
810            continue;
811        };
812        for d in deps {
813            let Some(target) = d.get("containerName").and_then(|v| v.as_str()) else {
814                continue;
815            };
816            if let Some(&j) = index.get(target) {
817                // Edge: i depends on j -> for cycle DFS we walk from i to j.
818                adj[i].push(j);
819            }
820        }
821    }
822
823    // DFS with three-colour marking (white=0, gray=1, black=2). When we
824    // hit a gray neighbour we've closed a cycle; report the back-edge as
825    // the offending pair.
826    let mut state = vec![0u8; names.len()];
827    let mut stack: Vec<(usize, usize)> = Vec::new();
828    for start in 0..names.len() {
829        if state[start] != 0 {
830            continue;
831        }
832        stack.clear();
833        stack.push((start, 0));
834        state[start] = 1;
835        while let Some(&(node, next_edge)) = stack.last() {
836            if next_edge < adj[node].len() {
837                let nb = adj[node][next_edge];
838                stack.last_mut().unwrap().1 += 1;
839                match state[nb] {
840                    0 => {
841                        state[nb] = 1;
842                        stack.push((nb, 0));
843                    }
844                    1 => {
845                        return Some((names[node].clone(), names[nb].clone()));
846                    }
847                    _ => {}
848                }
849            } else {
850                state[node] = 2;
851                stack.pop();
852            }
853        }
854    }
855    None
856}
857
/// Snapshot of the docker container state we care about for `dependsOn`
/// gating: whether the container exists/started, whether it's exited,
/// its exit code, and (when configured) its health status.
///
/// Produced by [`inspect_container_state`] and consumed by
/// [`condition_is_met`].
#[derive(Debug, Clone)]
struct InspectedState {
    // True once the container has got past docker's `created` status.
    started: bool,
    // True when docker reports the container as `exited` or `dead`.
    exited: bool,
    // Exit code reported by docker; -1 when the value could not be parsed.
    exit_code: i64,
    // Docker health status string, or `None` when no health data is
    // reported for the container.
    health: Option<String>,
}
868
869/// One `docker inspect` call returning every field needed by
870/// [`condition_is_met`]. Returns `None` when the container doesn't exist
871/// yet or inspect fails — the caller will simply retry on the next poll.
872async fn inspect_container_state(cli: &str, container_id: &str) -> Option<InspectedState> {
873    // Compose all four fields into a single inspect format so the gate
874    // costs one process spawn per poll rather than four.
875    let format =
876        "{{.State.Status}}|{{.State.Running}}|{{.State.ExitCode}}|{{if .State.Health}}{{.State.Health.Status}}{{else}}<none>{{end}}";
877    let out = Command::new(cli)
878        .args(["inspect", "-f", format, container_id])
879        .output()
880        .await
881        .ok()?;
882    if !out.status.success() {
883        return None;
884    }
885    let raw = String::from_utf8_lossy(&out.stdout).trim().to_string();
886    let parts: Vec<&str> = raw.split('|').collect();
887    if parts.len() < 4 {
888        return None;
889    }
890    let status = parts[0];
891    let running = parts[1] == "true";
892    let exit_code: i64 = parts[2].parse().unwrap_or(-1);
893    let health = match parts[3] {
894        "<none>" | "" => None,
895        other => Some(other.to_string()),
896    };
897    // `created` is the brief moment between docker creating the
898    // container and the entrypoint running. Treat anything past
899    // `created` as "started" for the START condition.
900    let started = running || status == "exited" || status == "running" || status == "dead";
901    let exited = status == "exited" || status == "dead";
902    Some(InspectedState {
903        started,
904        exited,
905        exit_code,
906        health,
907    })
908}
909
910/// Decide whether the polled `state` satisfies a `dependsOn[].condition`.
911/// Encapsulates the AWS semantics so the polling loop is purely
912/// mechanical.
913fn condition_is_met(condition: DependsOnCondition, state: &InspectedState) -> bool {
914    match condition {
915        DependsOnCondition::Start => state.started,
916        DependsOnCondition::Complete => state.exited,
917        DependsOnCondition::Success => state.exited && state.exit_code == 0,
918        DependsOnCondition::Healthy => state.health.as_deref() == Some("healthy"),
919    }
920}
921
/// Test-only re-export of [`parse_port_mapping`] so sibling test modules
/// can lock in the default-port / default-protocol behaviour without us
/// widening the visibility of the parser itself.
///
/// Compiled only under `cfg(test)`; forwards `value` unchanged and
/// returns whatever the parser returns.
#[cfg(test)]
pub(crate) fn __test_parse_port_mapping(value: &serde_json::Value) -> Option<PortMapping> {
    parse_port_mapping(value)
}
929
930/// Parse a `healthCheck` block from a task definition's container
931/// definition. Returns `None` for missing `command` or for a command
932/// whose first token is `NONE` (the AWS-documented "disable healthcheck
933/// inherited from image" sentinel — emit no flags rather than a `none`
934/// healthcheck). Defaults follow AWS: 30s/5s/3/0s.
935fn parse_health_check(value: &serde_json::Value) -> Option<HealthCheckSpec> {
936    let cmd_arr = value.get("command")?.as_array()?;
937    let command: Vec<String> = cmd_arr
938        .iter()
939        .filter_map(|v| v.as_str().map(String::from))
940        .collect();
941    if command.is_empty() {
942        return None;
943    }
944    if command.first().map(|s| s.as_str()) == Some("NONE") {
945        return None;
946    }
947    let read_u32 = |key: &str, default: u32| -> u32 {
948        value
949            .get(key)
950            .and_then(|v| v.as_i64())
951            .filter(|n| (0..=u32::MAX as i64).contains(n))
952            .map(|n| n as u32)
953            .unwrap_or(default)
954    };
955    Some(HealthCheckSpec {
956        command,
957        interval_seconds: read_u32("interval", 30),
958        timeout_seconds: read_u32("timeout", 5),
959        retries: read_u32("retries", 3),
960        start_period_seconds: read_u32("startPeriod", 0),
961    })
962}
963
964/// Parse one `ulimits` entry from the container definition JSON.
965fn parse_ulimit(value: &serde_json::Value) -> Option<Ulimit> {
966    let name = value.get("name").and_then(|v| v.as_str())?;
967    let soft = value
968        .get("softLimit")
969        .and_then(|v| v.as_i64())
970        .filter(|n| *n >= 0)? as i32;
971    let hard = value
972        .get("hardLimit")
973        .and_then(|v| v.as_i64())
974        .filter(|n| *n >= 0)? as i32;
975    Some(Ulimit {
976        name: name.to_string(),
977        soft_limit: soft,
978        hard_limit: hard,
979    })
980}
981
982/// Parse `linuxParameters` from the container definition JSON.
983fn parse_linux_parameters(value: &serde_json::Value) -> Option<LinuxParameters> {
984    let mut lp = LinuxParameters::default();
985    if let Some(arr) = value
986        .get("capabilities")
987        .and_then(|v| v.get("add"))
988        .and_then(|v| v.as_array())
989    {
990        lp.capabilities_add = arr
991            .iter()
992            .filter_map(|v| v.as_str().map(String::from))
993            .collect();
994    }
995    if let Some(arr) = value
996        .get("capabilities")
997        .and_then(|v| v.get("drop"))
998        .and_then(|v| v.as_array())
999    {
1000        lp.capabilities_drop = arr
1001            .iter()
1002            .filter_map(|v| v.as_str().map(String::from))
1003            .collect();
1004    }
1005    if let Some(arr) = value.get("devices").and_then(|v| v.as_array()) {
1006        lp.devices = arr.iter().filter_map(parse_device).collect();
1007    }
1008    lp.init_process_enabled = value
1009        .get("initProcessEnabled")
1010        .and_then(|v| v.as_bool())
1011        .unwrap_or(false);
1012    lp.shared_memory_size = value
1013        .get("sharedMemorySize")
1014        .and_then(|v| v.as_i64())
1015        .map(|n| n as i32);
1016    if let Some(arr) = value.get("sysctl").and_then(|v| v.as_array()) {
1017        lp.sysctls = arr.iter().filter_map(parse_sysctl).collect();
1018    }
1019    if let Some(arr) = value.get("tmpfs").and_then(|v| v.as_array()) {
1020        lp.tmpfs = arr.iter().filter_map(parse_tmpfs).collect();
1021    }
1022    lp.privileged = value
1023        .get("privileged")
1024        .and_then(|v| v.as_bool())
1025        .unwrap_or(false);
1026    Some(lp)
1027}
1028
1029fn parse_device(value: &serde_json::Value) -> Option<Device> {
1030    let host_path = value.get("hostPath").and_then(|v| v.as_str())?.to_string();
1031    let container_path = value
1032        .get("containerPath")
1033        .and_then(|v| v.as_str())?
1034        .to_string();
1035    let permissions = value
1036        .get("permissions")
1037        .and_then(|v| v.as_str())
1038        .unwrap_or("rwm")
1039        .to_string();
1040    Some(Device {
1041        host_path,
1042        container_path,
1043        permissions,
1044    })
1045}
1046
1047fn parse_sysctl(value: &serde_json::Value) -> Option<Sysctl> {
1048    let name = value.get("name").and_then(|v| v.as_str())?.to_string();
1049    let value_str = value.get("value").and_then(|v| v.as_str())?.to_string();
1050    Some(Sysctl {
1051        name,
1052        value: value_str,
1053    })
1054}
1055
1056fn parse_tmpfs(value: &serde_json::Value) -> Option<Tmpfs> {
1057    let container_path = value
1058        .get("containerPath")
1059        .and_then(|v| v.as_str())?
1060        .to_string();
1061    let size = value
1062        .get("size")
1063        .and_then(|v| v.as_i64())
1064        .filter(|n| *n > 0)? as i32;
1065    let mount_options = value
1066        .get("mountOptions")
1067        .and_then(|v| v.as_array())
1068        .map(|arr| {
1069            arr.iter()
1070                .filter_map(|v| v.as_str().map(String::from))
1071                .collect()
1072        })
1073        .unwrap_or_default();
1074    Some(Tmpfs {
1075        container_path,
1076        size,
1077        mount_options,
1078    })
1079}
1080
1081/// Render a [`HealthCheckSpec`] into the docker run flags that emulate
1082/// the equivalent ECS healthCheck. AWS's `command[0]` is a sentinel
1083/// (`CMD-SHELL`/`CMD`/`NONE`); docker's `--health-cmd` always takes a
1084/// single shell-string, so we collapse the remaining tokens with spaces
1085/// for either sentinel — matching how docker itself stringifies HEALTHCHECK
1086/// CMD ["a","b"] back to a shell string at inspect time.
1087pub(crate) fn render_health_flags(hc: &HealthCheckSpec) -> Vec<String> {
1088    if hc.command.len() < 2 {
1089        return Vec::new();
1090    }
1091    let cmd_kind = hc.command[0].as_str();
1092    if cmd_kind != "CMD" && cmd_kind != "CMD-SHELL" {
1093        return Vec::new();
1094    }
1095    let cmd_string = hc.command[1..].join(" ");
1096    vec![
1097        "--health-cmd".into(),
1098        cmd_string,
1099        format!("--health-interval={}s", hc.interval_seconds),
1100        format!("--health-timeout={}s", hc.timeout_seconds),
1101        format!("--health-retries={}", hc.retries),
1102        format!("--health-start-period={}s", hc.start_period_seconds),
1103    ]
1104}
1105
/// Test-only re-export of [`parse_health_check`] so unit tests in
/// sibling modules can lock in the AWS default-fill behaviour without
/// us widening the parser's visibility.
///
/// Compiled only under `cfg(test)`; forwards `value` unchanged and
/// returns whatever the parser returns.
#[cfg(test)]
pub(crate) fn __test_parse_health_check(value: &serde_json::Value) -> Option<HealthCheckSpec> {
    parse_health_check(value)
}
1113
/// Convert a docker `.State.Health.Status` string into the ECS
/// `healthStatus` vocabulary.
///
/// Docker emits `starting|healthy|unhealthy|none|""` (empty when the
/// image has no HEALTHCHECK and we didn't add one), whereas ECS only
/// knows `HEALTHY|UNHEALTHY|UNKNOWN`. Surrounding whitespace and ASCII
/// case are ignored; anything other than a clean healthy/unhealthy
/// maps to `UNKNOWN`.
pub(crate) fn docker_health_to_ecs(raw: &str) -> &'static str {
    let status = raw.trim();
    if status.eq_ignore_ascii_case("healthy") {
        "HEALTHY"
    } else if status.eq_ignore_ascii_case("unhealthy") {
        "UNHEALTHY"
    } else {
        "UNKNOWN"
    }
}
1126
1127/// Parse a single `portMappings[]` entry. Returns `None` for entries
1128/// that are missing `containerPort` or have a value out of `u16` range.
1129/// Defaults: `hostPort` -> `containerPort`, `protocol` -> `tcp`.
1130fn parse_port_mapping(value: &serde_json::Value) -> Option<PortMapping> {
1131    let container_port = value
1132        .get("containerPort")
1133        .and_then(|v| v.as_i64())
1134        .filter(|n| (0..=u16::MAX as i64).contains(n))? as u16;
1135    let host_port_raw = value
1136        .get("hostPort")
1137        .and_then(|v| v.as_i64())
1138        .filter(|n| (0..=u16::MAX as i64).contains(n))
1139        .map(|n| n as u16)
1140        .unwrap_or(0);
1141    let host_port = if host_port_raw == 0 {
1142        container_port
1143    } else {
1144        host_port_raw
1145    };
1146    let protocol = value
1147        .get("protocol")
1148        .and_then(|v| v.as_str())
1149        .map(|s| s.to_ascii_lowercase())
1150        .unwrap_or_else(|| "tcp".to_string());
1151    Some(PortMapping {
1152        container_port,
1153        host_port,
1154        protocol,
1155    })
1156}
1157
1158/// Build the docker `run` argv for a single container plan. Pure so unit
1159/// tests can assert on flag ordering / `--publish` translation without
1160/// shelling out. The returned vector is everything *after* the binary
1161/// name (i.e. starts with `run`, ends with the user-supplied command
1162/// args).
1163pub(crate) fn build_run_argv(
1164    plan: &ContainerPlan,
1165    env: &[(String, String)],
1166    task_id: &str,
1167    host_ip: &str,
1168    run_image: &str,
1169    awsvpc_network_ready: bool,
1170) -> Vec<String> {
1171    let mut argv: Vec<String> = Vec::new();
1172    argv.push("run".into());
1173    argv.push("-d".into());
1174    argv.push("--name".into());
1175    argv.push(format!("{}-{}", task_id, plan.container_name));
1176    argv.push("--label".into());
1177    argv.push(format!("fakecloud-ecs-task={}", task_id));
1178    argv.push("--label".into());
1179    argv.push(format!("fakecloud-ecs-container={}", plan.container_name));
1180    argv.push("--add-host".into());
1181    argv.push(format!("host.docker.internal:{}", host_ip));
1182    let use_awsvpc_network = plan.network_mode.as_deref() == Some("awsvpc") && awsvpc_network_ready;
1183    if use_awsvpc_network {
1184        argv.push("--network".into());
1185        argv.push(format!("fakecloud-ecs-{}", task_id));
1186    }
1187    // `awsvpc` puts the container on a per-task ENI; emulating that on a
1188    // local docker host means *not* publishing to the host port table.
1189    // Bridge / host / default network modes still get `--publish`. If
1190    // the awsvpc per-task network creation failed and we fell back to
1191    // bridge, we DO want to publish so the container is reachable.
1192    let publish_ports = !use_awsvpc_network;
1193    if publish_ports {
1194        for pm in &plan.port_mappings {
1195            argv.push("--publish".into());
1196            argv.push(format!(
1197                "{}:{}/{}",
1198                pm.container_port, pm.host_port, pm.protocol
1199            ));
1200        }
1201    }
1202    if let Some(ref hc) = plan.health_check {
1203        argv.extend(render_health_flags(hc));
1204    }
1205    for (k, v) in env {
1206        let transformed = v
1207            .replace("http://127.0.0.1:", "http://host.docker.internal:")
1208            .replace("https://127.0.0.1:", "https://host.docker.internal:")
1209            .replace("http://localhost:", "http://host.docker.internal:")
1210            .replace("https://localhost:", "https://host.docker.internal:");
1211        argv.push("-e".into());
1212        argv.push(format!("{}={}", k, transformed));
1213    }
1214    // Volume mounts: one `-v` flag per mountPoints entry, with the
1215    // source resolved from the task definition's `volumes[]`. EFS and
1216    // FSx stubs were materialised on the host (mkdir -p) before this
1217    // function returns, so docker can bind them straight in.
1218    for vm in &plan.volume_mounts {
1219        argv.push("-v".into());
1220        let suffix = if vm.read_only { ":ro" } else { "" };
1221        argv.push(format!("{}:{}{}", vm.source, vm.container_path, suffix));
1222    }
1223    for ul in &plan.ulimits {
1224        argv.push("--ulimit".into());
1225        argv.push(format!("{}={}:{}", ul.name, ul.soft_limit, ul.hard_limit));
1226    }
1227    if let Some(ref lp) = plan.linux_parameters {
1228        for cap in &lp.capabilities_add {
1229            argv.push("--cap-add".into());
1230            argv.push(cap.clone());
1231        }
1232        for cap in &lp.capabilities_drop {
1233            argv.push("--cap-drop".into());
1234            argv.push(cap.clone());
1235        }
1236        for dev in &lp.devices {
1237            argv.push("--device".into());
1238            argv.push(format!(
1239                "{}:{}{}",
1240                dev.host_path, dev.container_path, dev.permissions
1241            ));
1242        }
1243        if lp.init_process_enabled {
1244            argv.push("--init".into());
1245        }
1246        if let Some(size) = lp.shared_memory_size {
1247            argv.push("--shm-size".into());
1248            argv.push(format!("{}m", size));
1249        }
1250        for sys in &lp.sysctls {
1251            argv.push("--sysctl".into());
1252            argv.push(format!("{}={}", sys.name, sys.value));
1253        }
1254        for tmp in &lp.tmpfs {
1255            let mut opts = tmp.mount_options.join(",");
1256            if !opts.is_empty() {
1257                opts = format!(",{}", opts);
1258            }
1259            argv.push("--tmpfs".into());
1260            argv.push(format!("{}:size={}M{}", tmp.container_path, tmp.size, opts));
1261        }
1262        if lp.privileged {
1263            argv.push("--privileged".into());
1264        }
1265    }
1266    if let Some(timeout) = plan.stop_timeout {
1267        argv.push("--stop-timeout".into());
1268        argv.push(format!("{}", timeout));
1269    }
1270    if let Some(ref user) = plan.user {
1271        argv.push("--user".into());
1272        argv.push(user.clone());
1273    }
1274    if let Some(ref wd) = plan.working_directory {
1275        argv.push("--workdir".into());
1276        argv.push(wd.clone());
1277    }
1278    if plan.tty {
1279        argv.push("--tty".into());
1280    }
1281    if plan.interactive {
1282        argv.push("--interactive".into());
1283    }
1284    if plan.readonly_rootfs {
1285        argv.push("--read-only".into());
1286    }
1287    if let Some(first) = plan.entry_point.first() {
1288        argv.push("--entrypoint".into());
1289        argv.push(first.clone());
1290    }
1291    argv.push(run_image.to_string());
1292    for arg in plan.entry_point.iter().skip(1) {
1293        argv.push(arg.clone());
1294    }
1295    for arg in &plan.command {
1296        argv.push(arg.clone());
1297    }
1298    argv
1299}
1300
1301/// Render `networkBindings` JSON for a launched container. Empty under
1302/// `awsvpc` (the equivalent info goes on the task's ENI attachments) and
1303/// for containers without `portMappings`.
1304pub(crate) fn network_bindings_for(plan: &ContainerPlan) -> Vec<serde_json::Value> {
1305    if plan.network_mode.as_deref() == Some("awsvpc") {
1306        return Vec::new();
1307    }
1308    plan.port_mappings
1309        .iter()
1310        .map(|pm| {
1311            serde_json::json!({
1312                "bindIP": "0.0.0.0",
1313                "containerPort": pm.container_port,
1314                "hostPort": pm.host_port,
1315                "protocol": pm.protocol,
1316            })
1317        })
1318        .collect()
1319}
1320
1321/// Compute ELBv2 target registrations for a task based on its service's
1322/// loadBalancers configuration. Returns (target_group_arn, [(target_id, port)])
1323/// for each target group that should receive this task.
1324#[allow(clippy::type_complexity)]
1325pub(crate) fn compute_elbv2_targets(
1326    ecs_state: &crate::state::EcsState,
1327    task: &crate::state::Task,
1328) -> Vec<(String, Vec<(String, Option<i64>)>)> {
1329    let mut result = Vec::new();
1330    let Some(group) = task.group.as_deref() else {
1331        return result;
1332    };
1333    let service_name = group.strip_prefix("service:").unwrap_or(group);
1334    let key = crate::state::EcsState::service_key(&task.cluster_name, service_name);
1335    let Some(service) = ecs_state.services.get(&key) else {
1336        return result;
1337    };
1338
1339    let network_mode = ecs_state
1340        .task_definitions
1341        .get(&task.family)
1342        .and_then(|revs| revs.get(&task.revision))
1343        .and_then(|td| td.network_mode.as_deref());
1344
1345    for lb in &service.load_balancers {
1346        let tg_arn = lb.get("targetGroupArn").and_then(|v| v.as_str());
1347        let container_name = lb.get("containerName").and_then(|v| v.as_str());
1348        let container_port = lb.get("containerPort").and_then(|v| v.as_i64());
1349        let Some(tg_arn) = tg_arn else { continue };
1350        let Some(container_name) = container_name else {
1351            continue;
1352        };
1353
1354        let target_id = if network_mode == Some("awsvpc") {
1355            task.attachments
1356                .iter()
1357                .find(|a| a.attachment_type == "eni")
1358                .and_then(|eni| {
1359                    eni.details
1360                        .iter()
1361                        .find(|d| d.name == "privateIPv4Address")
1362                        .map(|d| d.value.clone())
1363                })
1364        } else {
1365            Some("127.0.0.1".to_string())
1366        };
1367
1368        let port = if network_mode == Some("awsvpc") {
1369            container_port
1370        } else {
1371            task.containers
1372                .iter()
1373                .find(|c| c.name == container_name)
1374                .and_then(|c| {
1375                    c.network_bindings
1376                        .iter()
1377                        .find(|nb| {
1378                            nb.get("containerPort").and_then(|v| v.as_i64()) == container_port
1379                        })
1380                        .and_then(|nb| nb.get("hostPort").and_then(|v| v.as_i64()))
1381                })
1382        };
1383
1384        if let Some(id) = target_id {
1385            if let Some(entry) = result.iter_mut().find(|(arn, _)| arn == tg_arn) {
1386                entry.1.push((id, port));
1387            } else {
1388                result.push((tg_arn.to_string(), vec![(id, port)]));
1389            }
1390        }
1391    }
1392    result
1393}
1394
/// Owned copy of selected task fields, built by `snapshot_task` while
/// the shared ECS state is read-locked so the values can be used after
/// the lock is released.
struct TaskSnapshot {
    // Copied from the task's `task_arn`.
    task_arn: String,
    // Copied from the task's `cluster_arn`.
    cluster_arn: String,
    // Copied from the task's `launch_type`.
    launch_type: String,
    // Copied from the task's `group`; `None` when the task has none.
    group: Option<String>,
    // Copied from the task's `task_definition_arn`.
    task_definition_arn: String,
    // JSON array with one object per container (containerArn, name,
    // image, lastStatus, exitCode, reason).
    containers: serde_json::Value,
}
1403
1404fn snapshot_task(state: &SharedEcsState, account_id: &str, task_id: &str) -> Option<TaskSnapshot> {
1405    let accounts = state.read();
1406    let s = accounts.get(account_id)?;
1407    let task = s.tasks.get(task_id)?;
1408    Some(TaskSnapshot {
1409        task_arn: task.task_arn.clone(),
1410        cluster_arn: task.cluster_arn.clone(),
1411        launch_type: task.launch_type.clone(),
1412        group: task.group.clone(),
1413        task_definition_arn: task.task_definition_arn.clone(),
1414        containers: serde_json::Value::Array(
1415            task.containers
1416                .iter()
1417                .map(|c| {
1418                    serde_json::json!({
1419                        "containerArn": c.container_arn,
1420                        "name": c.name,
1421                        "image": c.image,
1422                        "lastStatus": c.last_status,
1423                        "exitCode": c.exit_code,
1424                        "reason": c.reason,
1425                    })
1426                })
1427                .collect(),
1428        ),
1429    })
1430}
1431
/// Probe whether the container CLI `cli` is usable by running
/// `<cli> info` with its output discarded.
///
/// Returns `true` only when the process could be spawned and exited
/// successfully; a missing binary or a failing command yields `false`.
fn cli_works(cli: &str) -> bool {
    let outcome = std::process::Command::new(cli)
        .arg("info")
        .stdout(std::process::Stdio::null())
        .stderr(std::process::Stdio::null())
        .status();
    matches!(outcome, Ok(exit) if exit.success())
}
1441
1442/// Build an isolated docker config directory with Basic auth for
1443/// fakecloud ECR at `127.0.0.1:<port>`. Lets `docker pull/push/tag`
1444/// work against the local OCI v2 registry without requiring the user
1445/// to run `aws ecr get-login-password | docker login` first.
1446fn build_local_registry_docker_config(server_port: u16) -> Option<TempDir> {
1447    let dir = TempDir::new().ok()?;
1448    let auth = base64::engine::general_purpose::STANDARD.encode("AWS:fakecloud-ecs-runtime");
1449    let config = serde_json::json!({
1450        "auths": {
1451            format!("127.0.0.1:{server_port}"): { "auth": auth },
1452        }
1453    });
1454    std::fs::write(dir.path().join("config.json"), config.to_string()).ok()?;
1455    Some(dir)
1456}
1457
1458fn find_container_definition(
1459    state: &crate::state::EcsState,
1460    family: &str,
1461    revision: i32,
1462    name: &str,
1463) -> Option<serde_json::Value> {
1464    state
1465        .task_definitions
1466        .get(family)?
1467        .get(&revision)?
1468        .container_definitions
1469        .iter()
1470        .find(|c| c.get("name").and_then(|v| v.as_str()) == Some(name))
1471        .cloned()
1472}
1473
1474fn mark_pull_started(state: &SharedEcsState, account_id: &str, task_id: &str) {
1475    let mut accounts = state.write();
1476    let Some(s) = accounts.get_mut(account_id) else {
1477        return;
1478    };
1479    let task_arn_cluster = s
1480        .tasks
1481        .get(task_id)
1482        .map(|t| (t.task_arn.clone(), t.cluster_arn.clone()));
1483    if let Some(task) = s.tasks.get_mut(task_id) {
1484        task.pull_started_at = Some(Utc::now());
1485    }
1486    if let Some((arn, cluster_arn)) = task_arn_cluster {
1487        s.push_event(LifecycleEvent {
1488            at: Utc::now(),
1489            event_type: "PullStarted".into(),
1490            task_arn: Some(arn),
1491            cluster_arn: Some(cluster_arn),
1492            last_status: Some("PENDING".into()),
1493            detail: serde_json::json!({}),
1494        });
1495    }
1496}
1497
1498fn mark_pull_stopped(state: &SharedEcsState, account_id: &str, task_id: &str) {
1499    let mut accounts = state.write();
1500    let Some(s) = accounts.get_mut(account_id) else {
1501        return;
1502    };
1503    if let Some(task) = s.tasks.get_mut(task_id) {
1504        task.pull_stopped_at = Some(Utc::now());
1505    }
1506}
1507
1508pub(crate) fn mark_running_multi(
1509    state: &SharedEcsState,
1510    account_id: &str,
1511    task_id: &str,
1512    started: &[RunningContainer],
1513) {
1514    let mut accounts = state.write();
1515    let Some(s) = accounts.get_mut(account_id) else {
1516        return;
1517    };
1518    let (arn, cluster_arn) = {
1519        let Some(task) = s.tasks.get_mut(task_id) else {
1520            return;
1521        };
1522        task.last_status = "RUNNING".into();
1523        task.connectivity = "CONNECTED".into();
1524        task.connectivity_at = Some(Utc::now());
1525        task.started_at = Some(Utc::now());
1526        for rc in started {
1527            if let Some(c) = task.containers.iter_mut().find(|c| c.name == rc.name) {
1528                c.runtime_id = Some(rc.container_id.clone());
1529                c.last_status = "RUNNING".into();
1530                c.network_bindings = rc.network_bindings.clone();
1531                if rc.image_digest.is_some() {
1532                    c.image_digest = rc.image_digest.clone();
1533                }
1534            }
1535        }
1536        if let Some(cluster) = s.clusters.get_mut(&task.cluster_name) {
1537            cluster.running_tasks_count += 1;
1538            if cluster.pending_tasks_count > 0 {
1539                cluster.pending_tasks_count -= 1;
1540            }
1541        }
1542        if let Some(ref ci_arn) = task.container_instance_arn {
1543            if let Some(ci) = s
1544                .container_instances
1545                .values_mut()
1546                .find(|ci| ci.container_instance_arn == *ci_arn)
1547            {
1548                ci.running_tasks_count += 1;
1549                if ci.pending_tasks_count > 0 {
1550                    ci.pending_tasks_count -= 1;
1551                }
1552            }
1553        }
1554        (task.task_arn.clone(), task.cluster_arn.clone())
1555    };
1556    s.push_event(LifecycleEvent {
1557        at: Utc::now(),
1558        event_type: "TaskStateChange".into(),
1559        task_arn: Some(arn),
1560        cluster_arn: Some(cluster_arn),
1561        last_status: Some("RUNNING".into()),
1562        detail: serde_json::json!({}),
1563    });
1564}
1565
1566#[allow(clippy::too_many_arguments)]
1567fn finalize_stopped_multi(
1568    state: &SharedEcsState,
1569    account_id: &str,
1570    task_id: &str,
1571    final_containers: &[RunningContainer],
1572    primary_exit_code: i64,
1573    captured: &str,
1574    stop_code: &str,
1575    stopped_reason: Option<String>,
1576) {
1577    let mut accounts = state.write();
1578    let Some(s) = accounts.get_mut(account_id) else {
1579        return;
1580    };
1581    let (arn, cluster_arn) = {
1582        let Some(task) = s.tasks.get_mut(task_id) else {
1583            return;
1584        };
1585        task.last_status = "STOPPED".into();
1586        task.desired_status = "STOPPED".into();
1587        task.stopping_at = task.stopping_at.or(Some(Utc::now()));
1588        task.stopped_at = Some(Utc::now());
1589        task.stop_code = Some(stop_code.into());
1590        task.stopped_reason = stopped_reason.or(Some(format!("Exit code {}", primary_exit_code)));
1591        task.captured_logs = captured.to_string();
1592        for c in task.containers.iter_mut() {
1593            c.last_status = "STOPPED".into();
1594            if c.exit_code.is_none() {
1595                let mapped = final_containers
1596                    .iter()
1597                    .find(|r| r.name == c.name)
1598                    .and_then(|r| r.exit_code);
1599                c.exit_code = mapped.or(Some(primary_exit_code));
1600            }
1601        }
1602        if let Some(cluster) = s.clusters.get_mut(&task.cluster_name) {
1603            if cluster.running_tasks_count > 0 {
1604                cluster.running_tasks_count -= 1;
1605            }
1606        }
1607        if let Some(ref ci_arn) = task.container_instance_arn {
1608            if let Some(ci) = s
1609                .container_instances
1610                .values_mut()
1611                .find(|ci| ci.container_instance_arn == *ci_arn)
1612            {
1613                if ci.running_tasks_count > 0 {
1614                    ci.running_tasks_count -= 1;
1615                }
1616            }
1617        }
1618        (task.task_arn.clone(), task.cluster_arn.clone())
1619    };
1620    s.push_event(LifecycleEvent {
1621        at: Utc::now(),
1622        event_type: "TaskStateChange".into(),
1623        task_arn: Some(arn),
1624        cluster_arn: Some(cluster_arn),
1625        last_status: Some("STOPPED".into()),
1626        detail: serde_json::json!({
1627            "exitCode": primary_exit_code,
1628            "stopCode": stop_code,
1629        }),
1630    });
1631}
1632
1633fn finalize_failure(state: &SharedEcsState, account_id: &str, task_id: &str, reason: &str) {
1634    let mut accounts = state.write();
1635    let Some(s) = accounts.get_mut(account_id) else {
1636        return;
1637    };
1638    let (arn, cluster_arn) = {
1639        let Some(task) = s.tasks.get_mut(task_id) else {
1640            return;
1641        };
1642        // Capture the prior status before we clobber it: if the task had
1643        // already reached RUNNING when execution failed (e.g. `docker wait`
1644        // blew up after the container started), we owe the cluster a
1645        // running-tasks decrement. Tasks that died before RUNNING only
1646        // ever incremented pendingTasksCount.
1647        let was_running = task.last_status == "RUNNING";
1648        task.last_status = "STOPPED".into();
1649        task.desired_status = "STOPPED".into();
1650        task.stopped_at = Some(Utc::now());
1651        task.stop_code = Some("TaskFailedToStart".into());
1652        task.stopped_reason = Some(reason.to_string());
1653        // Surface the failure reason on the /logs endpoint — without this,
1654        // a task that never reached RUNNING returns an empty log string,
1655        // leaving E2E assertions with no diagnostic.
1656        task.captured_logs = format!("[task failed to start]: {reason}");
1657        for c in task.containers.iter_mut() {
1658            c.last_status = "STOPPED".into();
1659            c.reason = Some(reason.to_string());
1660        }
1661        if let Some(cluster) = s.clusters.get_mut(&task.cluster_name) {
1662            if was_running {
1663                if cluster.running_tasks_count > 0 {
1664                    cluster.running_tasks_count -= 1;
1665                }
1666            } else if cluster.pending_tasks_count > 0 {
1667                cluster.pending_tasks_count -= 1;
1668            }
1669        }
1670        if let Some(ref ci_arn) = task.container_instance_arn {
1671            if let Some(ci) = s
1672                .container_instances
1673                .values_mut()
1674                .find(|ci| ci.container_instance_arn == *ci_arn)
1675            {
1676                if was_running {
1677                    if ci.running_tasks_count > 0 {
1678                        ci.running_tasks_count -= 1;
1679                    }
1680                } else if ci.pending_tasks_count > 0 {
1681                    ci.pending_tasks_count -= 1;
1682                }
1683            }
1684        }
1685        (task.task_arn.clone(), task.cluster_arn.clone())
1686    };
1687    s.push_event(LifecycleEvent {
1688        at: Utc::now(),
1689        event_type: "TaskFailedToStart".into(),
1690        task_arn: Some(arn),
1691        cluster_arn: Some(cluster_arn),
1692        last_status: Some("STOPPED".into()),
1693        detail: serde_json::json!({ "reason": reason }),
1694    });
1695}
1696
1697/// Short helper for tests + snapshot code to sleep between state
1698/// transitions. Exposed on the crate boundary to keep test timing
1699/// centralized.
1700pub async fn sleep(duration: Duration) {
1701    tokio::time::sleep(duration).await;
1702}
1703
1704#[cfg(test)]
1705mod tests {
1706    use super::*;
1707    use crate::state::{EcsState, Task};
1708    use fakecloud_aws::arn::Arn;
1709    use fakecloud_core::multi_account::MultiAccountState;
1710    use parking_lot::RwLock;
1711    use std::sync::Arc;
1712
1713    #[test]
1714    fn cli_works_for_known_missing_binary_is_false() {
1715        assert!(!cli_works("definitely-not-a-real-cli-binary-xyz"));
1716    }
1717
1718    #[test]
1719    fn aws_ecr_uris_translate_for_local_pull() {
1720        assert_eq!(
1721            fakecloud_core::ecr_uri::translate_to_local(
1722                "123456789012.dkr.ecr.us-east-1.amazonaws.com/app:latest",
1723                4566
1724            )
1725            .as_deref(),
1726            Some("127.0.0.1:4566/app:latest")
1727        );
1728    }
1729
1730    fn make_task(task_id: &str) -> Task {
1731        Task {
1732            task_arn: Arn::new(
1733                "ecs",
1734                "us-east-1",
1735                "000000000000",
1736                &format!("task/default/{task_id}"),
1737            )
1738            .to_string(),
1739            task_id: task_id.into(),
1740            cluster_arn: "arn:aws:ecs:us-east-1:000000000000:cluster/default".into(),
1741            cluster_name: "default".into(),
1742            task_definition_arn: "arn:aws:ecs:us-east-1:000000000000:task-definition/app:1".into(),
1743            family: "app".into(),
1744            revision: 1,
1745            container_instance_arn: None,
1746            capacity_provider_name: None,
1747            last_status: "PENDING".into(),
1748            desired_status: "RUNNING".into(),
1749            launch_type: "FARGATE".into(),
1750            platform_version: None,
1751            cpu: None,
1752            memory: None,
1753            containers: Vec::new(),
1754            overrides: serde_json::json!({}),
1755            started_by: None,
1756            group: None,
1757            connectivity: "CONNECTING".into(),
1758            stop_code: None,
1759            stopped_reason: None,
1760            created_at: Utc::now(),
1761            started_at: None,
1762            stopping_at: None,
1763            stopped_at: None,
1764            pull_started_at: None,
1765            pull_stopped_at: None,
1766            connectivity_at: None,
1767            started_by_ref_id: None,
1768            execution_role_arn: None,
1769            task_role_arn: None,
1770            tags: Vec::new(),
1771            awslogs: None,
1772            captured_logs: String::new(),
1773            protection: None,
1774            enable_execute_command: false,
1775            attachments: Vec::new(),
1776            volume_configurations: Vec::new(),
1777            task_set_arn: None,
1778        }
1779    }
1780
1781    #[test]
1782    fn finalize_failure_writes_reason_into_captured_logs() {
1783        let mut accounts: MultiAccountState<EcsState> =
1784            MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
1785        let acct = accounts.get_or_create("000000000000");
1786        acct.tasks.insert("t1".into(), make_task("t1"));
1787        let state: SharedEcsState = Arc::new(RwLock::new(accounts));
1788
1789        finalize_failure(
1790            &state,
1791            "000000000000",
1792            "t1",
1793            "failed to resolve secret DB_PASSWORD",
1794        );
1795
1796        let accounts = state.read();
1797        let task = accounts
1798            .get("000000000000")
1799            .unwrap()
1800            .tasks
1801            .get("t1")
1802            .unwrap();
1803        assert_eq!(task.last_status, "STOPPED");
1804        assert_eq!(task.stop_code.as_deref(), Some("TaskFailedToStart"));
1805        assert!(
1806            task.captured_logs
1807                .contains("failed to resolve secret DB_PASSWORD"),
1808            "captured_logs missing reason: {:?}",
1809            task.captured_logs
1810        );
1811        assert!(
1812            task.captured_logs.starts_with("[task failed to start]:"),
1813            "captured_logs missing prefix: {:?}",
1814            task.captured_logs
1815        );
1816    }
1817
1818    fn make_container(name: &str, essential: bool) -> crate::state::Container {
1819        crate::state::Container {
1820            container_arn: format!(
1821                "arn:aws:ecs:us-east-1:000000000000:container/default/abc/{name}"
1822            ),
1823            name: name.into(),
1824            image: "alpine".into(),
1825            task_arn: "arn:aws:ecs:us-east-1:000000000000:task/default/abc".into(),
1826            last_status: "RUNNING".into(),
1827            exit_code: None,
1828            reason: None,
1829            runtime_id: Some(format!("dockerid-{name}")),
1830            essential,
1831            cpu: None,
1832            memory: None,
1833            memory_reservation: None,
1834            network_bindings: Vec::new(),
1835            network_interfaces: Vec::new(),
1836            health_status: None,
1837            managed_agents: None,
1838            image_digest: None,
1839        }
1840    }
1841
1842    #[test]
1843    fn task_should_stop_when_essential_exits() {
1844        let containers = vec![
1845            RunningContainer {
1846                name: "app".into(),
1847                container_id: "id-app".into(),
1848                essential: true,
1849                exit_code: Some(0),
1850                network_bindings: Vec::new(),
1851                image_digest: None,
1852            },
1853            RunningContainer {
1854                name: "sidecar".into(),
1855                container_id: "id-sc".into(),
1856                essential: false,
1857                exit_code: None,
1858                network_bindings: Vec::new(),
1859                image_digest: None,
1860            },
1861        ];
1862        assert!(task_should_stop(&containers));
1863    }
1864
1865    #[test]
1866    fn task_keeps_running_when_only_non_essential_exits() {
1867        let containers = vec![
1868            RunningContainer {
1869                name: "app".into(),
1870                container_id: "id-app".into(),
1871                essential: true,
1872                exit_code: None,
1873                network_bindings: Vec::new(),
1874                image_digest: None,
1875            },
1876            RunningContainer {
1877                name: "sidecar".into(),
1878                container_id: "id-sc".into(),
1879                essential: false,
1880                exit_code: Some(0),
1881                network_bindings: Vec::new(),
1882                image_digest: None,
1883            },
1884        ];
1885        assert!(!task_should_stop(&containers));
1886    }
1887
1888    #[test]
1889    fn task_stops_when_all_non_essentials_exit() {
1890        let containers = vec![
1891            RunningContainer {
1892                name: "a".into(),
1893                container_id: "id-a".into(),
1894                essential: false,
1895                exit_code: Some(0),
1896                network_bindings: Vec::new(),
1897                image_digest: None,
1898            },
1899            RunningContainer {
1900                name: "b".into(),
1901                container_id: "id-b".into(),
1902                essential: false,
1903                exit_code: Some(1),
1904                network_bindings: Vec::new(),
1905                image_digest: None,
1906            },
1907        ];
1908        assert!(task_should_stop(&containers));
1909    }
1910
1911    #[test]
1912    fn finalize_stopped_multi_assigns_per_container_exit_codes() {
1913        let mut accounts: MultiAccountState<EcsState> =
1914            MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
1915        let acct = accounts.get_or_create("000000000000");
1916        let mut t = make_task("t1");
1917        t.containers = vec![
1918            make_container("app", true),
1919            make_container("sidecar", false),
1920        ];
1921        acct.tasks.insert("t1".into(), t);
1922        let state: SharedEcsState = Arc::new(RwLock::new(accounts));
1923
1924        let final_containers = vec![
1925            RunningContainer {
1926                name: "app".into(),
1927                container_id: "id-app".into(),
1928                essential: true,
1929                exit_code: Some(0),
1930                network_bindings: Vec::new(),
1931                image_digest: None,
1932            },
1933            RunningContainer {
1934                name: "sidecar".into(),
1935                container_id: "id-sc".into(),
1936                essential: false,
1937                exit_code: Some(137),
1938                network_bindings: Vec::new(),
1939                image_digest: None,
1940            },
1941        ];
1942        finalize_stopped_multi(
1943            &state,
1944            "000000000000",
1945            "t1",
1946            &final_containers,
1947            0,
1948            "captured",
1949            "EssentialContainerExited",
1950            None,
1951        );
1952
1953        let accounts = state.read();
1954        let task = accounts
1955            .get("000000000000")
1956            .unwrap()
1957            .tasks
1958            .get("t1")
1959            .unwrap();
1960        assert_eq!(task.last_status, "STOPPED");
1961        assert_eq!(task.stop_code.as_deref(), Some("EssentialContainerExited"));
1962        let app = task.containers.iter().find(|c| c.name == "app").unwrap();
1963        let sc = task
1964            .containers
1965            .iter()
1966            .find(|c| c.name == "sidecar")
1967            .unwrap();
1968        assert_eq!(app.exit_code, Some(0));
1969        assert_eq!(sc.exit_code, Some(137));
1970        assert_eq!(app.last_status, "STOPPED");
1971        assert_eq!(sc.last_status, "STOPPED");
1972    }
1973
1974    fn plan(name: &str, deps: &[&str]) -> ContainerPlan {
1975        ContainerPlan {
1976            container_name: name.into(),
1977            image: "alpine".into(),
1978            env: Vec::new(),
1979            entry_point: Vec::new(),
1980            command: Vec::new(),
1981            secrets_refs: Vec::new(),
1982            essential: true,
1983            has_task_role: false,
1984            port_mappings: Vec::new(),
1985            network_mode: None,
1986            depends_on: deps
1987                .iter()
1988                .map(|s| DependsOn {
1989                    container_name: (*s).to_string(),
1990                    condition: DependsOnCondition::Start,
1991                })
1992                .collect(),
1993            health_check: None,
1994            volume_mounts: Vec::new(),
1995            ulimits: Vec::new(),
1996            linux_parameters: None,
1997            stop_timeout: None,
1998            user: None,
1999            working_directory: None,
2000            tty: false,
2001            interactive: false,
2002            readonly_rootfs: false,
2003        }
2004    }
2005
2006    #[test]
2007    fn topo_sort_orders_by_depends_on() {
2008        // sidecar depends on app, so app must come first regardless of
2009        // declaration order.
2010        let plans = vec![plan("sidecar", &["app"]), plan("app", &[])];
2011        let ordered = topo_sort_plans(plans);
2012        assert_eq!(ordered[0].container_name, "app");
2013        assert_eq!(ordered[1].container_name, "sidecar");
2014    }
2015
2016    #[test]
2017    fn topo_sort_preserves_declaration_order_when_no_deps() {
2018        let plans = vec![plan("first", &[]), plan("second", &[]), plan("third", &[])];
2019        let ordered = topo_sort_plans(plans);
2020        let names: Vec<&str> = ordered.iter().map(|p| p.container_name.as_str()).collect();
2021        assert_eq!(names, vec!["first", "second", "third"]);
2022    }
2023
2024    #[test]
2025    fn topo_sort_handles_chain() {
2026        // c -> b -> a, declared in reverse so the topological sort must
2027        // bubble dependencies up.
2028        let plans = vec![plan("c", &["b"]), plan("b", &["a"]), plan("a", &[])];
2029        let ordered = topo_sort_plans(plans);
2030        let names: Vec<&str> = ordered.iter().map(|p| p.container_name.as_str()).collect();
2031        assert_eq!(names, vec!["a", "b", "c"]);
2032    }
2033
2034    #[test]
2035    fn topo_sort_ignores_unknown_dependency() {
2036        // depends_on names a container not in this task definition. Real
2037        // ECS would reject this at register time; we don't (yet), so the
2038        // unknown dep should just be skipped instead of stalling the sort.
2039        let plans = vec![plan("only", &["does-not-exist"])];
2040        let ordered = topo_sort_plans(plans);
2041        assert_eq!(ordered.len(), 1);
2042        assert_eq!(ordered[0].container_name, "only");
2043    }
2044
2045    #[test]
2046    fn topo_sort_recovers_from_cycle() {
2047        // Cyclic dependsOn: both plans should still appear in the output
2048        // so the runtime doesn't silently drop them.
2049        let plans = vec![plan("a", &["b"]), plan("b", &["a"])];
2050        let ordered = topo_sort_plans(plans);
2051        assert_eq!(ordered.len(), 2);
2052    }
2053
2054    #[test]
2055    fn parse_health_check_fills_aws_defaults() {
2056        let v = serde_json::json!({
2057            "command": ["CMD-SHELL", "curl -f http://localhost/ || exit 1"],
2058        });
2059        let hc = __test_parse_health_check(&v).expect("parsed");
2060        assert_eq!(hc.command[0], "CMD-SHELL");
2061        assert_eq!(hc.interval_seconds, 30);
2062        assert_eq!(hc.timeout_seconds, 5);
2063        assert_eq!(hc.retries, 3);
2064        assert_eq!(hc.start_period_seconds, 0);
2065    }
2066
2067    #[test]
2068    fn parse_health_check_overrides_explicit_values() {
2069        let v = serde_json::json!({
2070            "command": ["CMD", "/probe"],
2071            "interval": 7,
2072            "timeout": 2,
2073            "retries": 9,
2074            "startPeriod": 12,
2075        });
2076        let hc = __test_parse_health_check(&v).expect("parsed");
2077        assert_eq!(hc.interval_seconds, 7);
2078        assert_eq!(hc.timeout_seconds, 2);
2079        assert_eq!(hc.retries, 9);
2080        assert_eq!(hc.start_period_seconds, 12);
2081    }
2082
2083    #[test]
2084    fn parse_health_check_returns_none_for_none_sentinel() {
2085        // ECS uses ["NONE"] to disable an inherited HEALTHCHECK; we
2086        // skip emission rather than passing a literal `none` to docker.
2087        let v = serde_json::json!({ "command": ["NONE"] });
2088        assert!(__test_parse_health_check(&v).is_none());
2089    }
2090
2091    #[test]
2092    fn parse_health_check_returns_none_for_missing_command() {
2093        let v = serde_json::json!({ "interval": 30 });
2094        assert!(__test_parse_health_check(&v).is_none());
2095    }
2096
2097    #[test]
2098    fn render_health_flags_emits_full_set_for_cmd_shell() {
2099        let hc = HealthCheckSpec {
2100            command: vec!["CMD-SHELL".into(), "curl -f http://localhost/".into()],
2101            interval_seconds: 15,
2102            timeout_seconds: 3,
2103            retries: 4,
2104            start_period_seconds: 10,
2105        };
2106        let flags = render_health_flags(&hc);
2107        assert_eq!(flags[0], "--health-cmd");
2108        assert_eq!(flags[1], "curl -f http://localhost/");
2109        assert!(flags.contains(&"--health-interval=15s".to_string()));
2110        assert!(flags.contains(&"--health-timeout=3s".to_string()));
2111        assert!(flags.contains(&"--health-retries=4".to_string()));
2112        assert!(flags.contains(&"--health-start-period=10s".to_string()));
2113    }
2114
2115    #[test]
2116    fn render_health_flags_joins_cmd_argv_with_spaces() {
2117        // CMD form in ECS is argv-style; docker `--health-cmd` only
2118        // accepts a single shell string, so we collapse with spaces.
2119        let hc = HealthCheckSpec {
2120            command: vec![
2121                "CMD".into(),
2122                "/bin/probe".into(),
2123                "--port".into(),
2124                "8080".into(),
2125            ],
2126            interval_seconds: 30,
2127            timeout_seconds: 5,
2128            retries: 3,
2129            start_period_seconds: 0,
2130        };
2131        let flags = render_health_flags(&hc);
2132        assert_eq!(flags[1], "/bin/probe --port 8080");
2133    }
2134
2135    #[test]
2136    fn build_run_argv_emits_health_flags_when_present() {
2137        let plan = ContainerPlan {
2138            container_name: "app".into(),
2139            image: "alpine".into(),
2140            env: Vec::new(),
2141            entry_point: Vec::new(),
2142            command: Vec::new(),
2143            secrets_refs: Vec::new(),
2144            essential: true,
2145            has_task_role: false,
2146            port_mappings: Vec::new(),
2147            network_mode: None,
2148            depends_on: Vec::new(),
2149            health_check: Some(HealthCheckSpec {
2150                command: vec!["CMD-SHELL".into(), "true".into()],
2151                interval_seconds: 5,
2152                timeout_seconds: 2,
2153                retries: 1,
2154                start_period_seconds: 1,
2155            }),
2156            volume_mounts: Vec::new(),
2157            ulimits: Vec::new(),
2158            linux_parameters: None,
2159            stop_timeout: None,
2160            user: None,
2161            working_directory: None,
2162            tty: false,
2163            interactive: false,
2164            readonly_rootfs: false,
2165        };
2166        let argv = build_run_argv(&plan, &[], "task-1", "host-gateway", "alpine", true);
2167        let joined = argv.join(" ");
2168        assert!(joined.contains("--health-cmd true"), "argv: {joined}");
2169        assert!(joined.contains("--health-interval=5s"), "argv: {joined}");
2170        assert!(joined.contains("--health-timeout=2s"), "argv: {joined}");
2171        assert!(joined.contains("--health-retries=1"), "argv: {joined}");
2172        assert!(
2173            joined.contains("--health-start-period=1s"),
2174            "argv: {joined}"
2175        );
2176    }
2177
2178    #[test]
2179    fn build_run_argv_emits_no_health_flags_when_absent() {
2180        let plan = ContainerPlan {
2181            container_name: "app".into(),
2182            image: "alpine".into(),
2183            env: Vec::new(),
2184            entry_point: Vec::new(),
2185            command: Vec::new(),
2186            secrets_refs: Vec::new(),
2187            essential: true,
2188            has_task_role: false,
2189            port_mappings: Vec::new(),
2190            network_mode: None,
2191            depends_on: Vec::new(),
2192            health_check: None,
2193            volume_mounts: Vec::new(),
2194            ulimits: Vec::new(),
2195            linux_parameters: None,
2196            stop_timeout: None,
2197            user: None,
2198            working_directory: None,
2199            tty: false,
2200            interactive: false,
2201            readonly_rootfs: false,
2202        };
2203        let argv = build_run_argv(&plan, &[], "task-1", "host-gateway", "alpine", true);
2204        assert!(!argv.iter().any(|s| s.starts_with("--health")));
2205    }
2206
2207    #[test]
2208    fn docker_health_to_ecs_maps_known_states() {
2209        assert_eq!(docker_health_to_ecs("healthy"), "HEALTHY");
2210        assert_eq!(docker_health_to_ecs("HEALTHY"), "HEALTHY");
2211        assert_eq!(docker_health_to_ecs("unhealthy"), "UNHEALTHY");
2212        assert_eq!(docker_health_to_ecs("starting"), "UNKNOWN");
2213        assert_eq!(docker_health_to_ecs("none"), "UNKNOWN");
2214        assert_eq!(docker_health_to_ecs(""), "UNKNOWN");
2215    }
2216
2217    /// `host.sourcePath` becomes a host bind mount with the path
2218    /// passed straight through to docker.
2219    #[test]
2220    fn resolve_host_bind_volume_uses_source_path() {
2221        let mut volumes = std::collections::HashMap::new();
2222        let v = serde_json::json!({
2223            "name": "data",
2224            "host": { "sourcePath": "/var/lib/myapp" }
2225        });
2226        volumes.insert("data".to_string(), &v);
2227        let mp = serde_json::json!({
2228            "sourceVolume": "data",
2229            "containerPath": "/app/data",
2230            "readOnly": false
2231        });
2232        let resolved = resolve_mount_point(&mp, &volumes).expect("resolved");
2233        assert_eq!(resolved.source, "/var/lib/myapp");
2234        assert_eq!(resolved.container_path, "/app/data");
2235        assert!(!resolved.read_only);
2236    }
2237
2238    /// `readOnly: true` on the mount point appends `:ro` to the
2239    /// rendered docker `-v` flag.
2240    #[test]
2241    fn read_only_mount_renders_ro_suffix() {
2242        let plan = ContainerPlan {
2243            container_name: "app".into(),
2244            image: "alpine".into(),
2245            env: Vec::new(),
2246            entry_point: Vec::new(),
2247            command: Vec::new(),
2248            secrets_refs: Vec::new(),
2249            essential: true,
2250            has_task_role: false,
2251            port_mappings: Vec::new(),
2252            network_mode: None,
2253            depends_on: Vec::new(),
2254            health_check: None,
2255            volume_mounts: vec![VolumeMount {
2256                source: "/host/path".into(),
2257                container_path: "/in/container".into(),
2258                read_only: true,
2259            }],
2260            ulimits: Vec::new(),
2261            linux_parameters: None,
2262            stop_timeout: None,
2263            user: None,
2264            working_directory: None,
2265            tty: false,
2266            interactive: false,
2267            readonly_rootfs: false,
2268        };
2269        let argv = build_run_argv(&plan, &[], "task-1", "host-gateway", "alpine", true);
2270        let pair = argv
2271            .windows(2)
2272            .find(|w| w[0] == "-v")
2273            .expect("expected -v flag");
2274        assert_eq!(pair[1], "/host/path:/in/container:ro");
2275    }
2276
2277    /// EFS volumes resolve to a stub directory under `/tmp/fakecloud/efs`
2278    /// keyed by `fileSystemId`. `rootDirectory` (when set and not `/`)
2279    /// is appended so different mount targets within the same
2280    /// filesystem stay isolated.
2281    #[test]
2282    fn resolve_efs_volume_uses_stub_dir() {
2283        let mut volumes = std::collections::HashMap::new();
2284        let v = serde_json::json!({
2285            "name": "efs-vol",
2286            "efsVolumeConfiguration": {
2287                "fileSystemId": "fs-12345678",
2288                "rootDirectory": "/exports/app"
2289            }
2290        });
2291        volumes.insert("efs-vol".to_string(), &v);
2292        let mp = serde_json::json!({
2293            "sourceVolume": "efs-vol",
2294            "containerPath": "/mnt/efs"
2295        });
2296        let resolved = resolve_mount_point(&mp, &volumes).expect("resolved");
2297        assert_eq!(
2298            resolved.source,
2299            "/tmp/fakecloud/efs/fs-12345678/exports/app"
2300        );
2301        assert_eq!(resolved.container_path, "/mnt/efs");
2302    }
2303
2304    /// EFS without `rootDirectory` (or with `/`) maps to the root of
2305    /// the filesystem stub so multiple tasks targeting the same id
2306    /// share state.
2307    #[test]
2308    fn efs_without_root_directory_uses_filesystem_root() {
2309        assert_eq!(
2310            stub_dir_for("efs", "fs-abc", "/"),
2311            "/tmp/fakecloud/efs/fs-abc"
2312        );
2313        assert_eq!(
2314            stub_dir_for("efs", "fs-abc", ""),
2315            "/tmp/fakecloud/efs/fs-abc"
2316        );
2317    }
2318
2319    /// `dockerVolumeConfiguration` resolves to the volume name itself,
2320    /// which docker treats as a named volume reference. No host path
2321    /// is materialised — docker creates the volume on first reference.
2322    #[test]
2323    fn resolve_docker_named_volume_uses_volume_name() {
2324        let mut volumes = std::collections::HashMap::new();
2325        let v = serde_json::json!({
2326            "name": "named-vol",
2327            "dockerVolumeConfiguration": {
2328                "scope": "task",
2329                "driver": "local"
2330            }
2331        });
2332        volumes.insert("named-vol".to_string(), &v);
2333        let mp = serde_json::json!({
2334            "sourceVolume": "named-vol",
2335            "containerPath": "/data"
2336        });
2337        let resolved = resolve_mount_point(&mp, &volumes).expect("resolved");
2338        assert_eq!(resolved.source, "named-vol");
2339        assert_eq!(resolved.container_path, "/data");
2340    }
2341
2342    /// FSx for Windows uses the same stub-directory pattern as EFS but
2343    /// scoped under `/tmp/fakecloud/fsx/<filesystemId>/`.
2344    #[test]
2345    fn resolve_fsx_volume_uses_stub_dir() {
2346        let mut volumes = std::collections::HashMap::new();
2347        let v = serde_json::json!({
2348            "name": "fsx-vol",
2349            "fsxWindowsFileServerVolumeConfiguration": {
2350                "fileSystemId": "fs-xyz",
2351                "rootDirectory": "share"
2352            }
2353        });
2354        volumes.insert("fsx-vol".to_string(), &v);
2355        let mp = serde_json::json!({
2356            "sourceVolume": "fsx-vol",
2357            "containerPath": "C:\\data"
2358        });
2359        let resolved = resolve_mount_point(&mp, &volumes).expect("resolved");
2360        assert_eq!(resolved.source, "/tmp/fakecloud/fsx/fs-xyz/share");
2361    }
2362
2363    /// Mount points that reference an undeclared `sourceVolume` resolve
2364    /// to `None` so `build_container_plans` skips them rather than
2365    /// emitting a broken `-v` flag.
2366    #[test]
2367    fn unknown_source_volume_returns_none() {
2368        let volumes = std::collections::HashMap::new();
2369        let mp = serde_json::json!({
2370            "sourceVolume": "missing",
2371            "containerPath": "/x"
2372        });
2373        assert!(resolve_mount_point(&mp, &volumes).is_none());
2374    }
2375
2376    /// `find_depends_on_cycle` returns the back-edge endpoints when a
2377    /// trivial 2-cycle exists. Real ECS would reject this at register
2378    /// time; our service-level handler relies on this helper.
2379    #[test]
2380    fn find_depends_on_cycle_detects_two_node_cycle() {
2381        let cds = vec![
2382            serde_json::json!({
2383                "name": "a",
2384                "image": "alpine",
2385                "dependsOn": [{"containerName": "b", "condition": "START"}],
2386            }),
2387            serde_json::json!({
2388                "name": "b",
2389                "image": "alpine",
2390                "dependsOn": [{"containerName": "a", "condition": "START"}],
2391            }),
2392        ];
2393        let cycle = find_depends_on_cycle(&cds);
2394        assert!(cycle.is_some(), "expected cycle to be detected");
2395    }
2396
2397    /// A three-node chain (a -> b -> c) is acyclic and must not be
2398    /// flagged. Guards against an over-eager DFS reporting back-edges
2399    /// from already-finished nodes.
2400    #[test]
2401    fn find_depends_on_cycle_accepts_chain() {
2402        let cds = vec![
2403            serde_json::json!({
2404                "name": "a",
2405                "image": "alpine",
2406                "dependsOn": [{"containerName": "b", "condition": "START"}],
2407            }),
2408            serde_json::json!({
2409                "name": "b",
2410                "image": "alpine",
2411                "dependsOn": [{"containerName": "c", "condition": "START"}],
2412            }),
2413            serde_json::json!({
2414                "name": "c",
2415                "image": "alpine",
2416            }),
2417        ];
2418        assert!(find_depends_on_cycle(&cds).is_none());
2419    }
2420
2421    /// `dependsOn[]` entries that name a container outside the task
2422    /// definition are ignored by the cycle check (they can't form a
2423    /// cycle by definition; runtime also drops them).
2424    #[test]
2425    fn find_depends_on_cycle_ignores_unknown_target() {
2426        let cds = vec![serde_json::json!({
2427            "name": "only",
2428            "image": "alpine",
2429            "dependsOn": [{"containerName": "ghost", "condition": "START"}],
2430        })];
2431        assert!(find_depends_on_cycle(&cds).is_none());
2432    }
2433
2434    /// `condition_is_met` covers each AWS condition value against a
2435    /// simulated docker inspect snapshot. Pinning these mappings here
2436    /// catches accidental re-orderings of the match arms.
2437    #[test]
2438    fn condition_is_met_matches_aws_semantics() {
2439        let running = InspectedState {
2440            started: true,
2441            exited: false,
2442            exit_code: 0,
2443            health: None,
2444        };
2445        let exited_ok = InspectedState {
2446            started: true,
2447            exited: true,
2448            exit_code: 0,
2449            health: None,
2450        };
2451        let exited_fail = InspectedState {
2452            started: true,
2453            exited: true,
2454            exit_code: 1,
2455            health: None,
2456        };
2457        let healthy = InspectedState {
2458            started: true,
2459            exited: false,
2460            exit_code: 0,
2461            health: Some("healthy".into()),
2462        };
2463
2464        // START is satisfied as soon as the container has started, even
2465        // if it later exited.
2466        assert!(condition_is_met(DependsOnCondition::Start, &running));
2467        assert!(condition_is_met(DependsOnCondition::Start, &exited_ok));
2468
2469        // COMPLETE requires an exit, regardless of code.
2470        assert!(!condition_is_met(DependsOnCondition::Complete, &running));
2471        assert!(condition_is_met(DependsOnCondition::Complete, &exited_ok));
2472        assert!(condition_is_met(DependsOnCondition::Complete, &exited_fail));
2473
2474        // SUCCESS requires an exit AND code 0.
2475        assert!(!condition_is_met(DependsOnCondition::Success, &running));
2476        assert!(condition_is_met(DependsOnCondition::Success, &exited_ok));
2477        assert!(!condition_is_met(DependsOnCondition::Success, &exited_fail));
2478
2479        // HEALTHY requires Health.Status == "healthy".
2480        assert!(!condition_is_met(DependsOnCondition::Healthy, &running));
2481        assert!(condition_is_met(DependsOnCondition::Healthy, &healthy));
2482    }
2483
2484    /// `DependsOnCondition::parse` accepts the four AWS-spelled values
2485    /// and rejects everything else — register-time validation depends on
2486    /// this returning `None` for unknowns.
2487    #[test]
2488    fn depends_on_condition_parse_round_trips() {
2489        assert_eq!(
2490            DependsOnCondition::parse("START"),
2491            Some(DependsOnCondition::Start)
2492        );
2493        assert_eq!(
2494            DependsOnCondition::parse("COMPLETE"),
2495            Some(DependsOnCondition::Complete)
2496        );
2497        assert_eq!(
2498            DependsOnCondition::parse("SUCCESS"),
2499            Some(DependsOnCondition::Success)
2500        );
2501        assert_eq!(
2502            DependsOnCondition::parse("HEALTHY"),
2503            Some(DependsOnCondition::Healthy)
2504        );
2505        assert_eq!(DependsOnCondition::parse("start"), None);
2506        assert_eq!(DependsOnCondition::parse("ANY"), None);
2507    }
2508
2509    // ── ulimits + linuxParameters + misc docker flags (O6) ──
2510
2511    #[test]
2512    fn build_run_argv_emits_ulimits() {
2513        let plan = ContainerPlan {
2514            container_name: "app".into(),
2515            image: "alpine".into(),
2516            env: Vec::new(),
2517            entry_point: Vec::new(),
2518            command: Vec::new(),
2519            secrets_refs: Vec::new(),
2520            essential: true,
2521            has_task_role: false,
2522            port_mappings: Vec::new(),
2523            network_mode: None,
2524            depends_on: Vec::new(),
2525            health_check: None,
2526            volume_mounts: Vec::new(),
2527            ulimits: vec![Ulimit {
2528                name: "nofile".into(),
2529                soft_limit: 1024,
2530                hard_limit: 2048,
2531            }],
2532            linux_parameters: None,
2533            stop_timeout: None,
2534            user: None,
2535            working_directory: None,
2536            tty: false,
2537            interactive: false,
2538            readonly_rootfs: false,
2539        };
2540        let argv = build_run_argv(&plan, &[], "t", "host", "img", true);
2541        assert!(argv.contains(&"--ulimit".to_string()));
2542        assert!(argv.contains(&"nofile=1024:2048".to_string()));
2543    }
2544
2545    #[test]
2546    fn build_run_argv_emits_linux_parameters() {
2547        let plan = ContainerPlan {
2548            container_name: "app".into(),
2549            image: "alpine".into(),
2550            env: Vec::new(),
2551            entry_point: Vec::new(),
2552            command: Vec::new(),
2553            secrets_refs: Vec::new(),
2554            essential: true,
2555            has_task_role: false,
2556            port_mappings: Vec::new(),
2557            network_mode: None,
2558            depends_on: Vec::new(),
2559            health_check: None,
2560            volume_mounts: Vec::new(),
2561            ulimits: Vec::new(),
2562            linux_parameters: Some(LinuxParameters {
2563                capabilities_add: vec!["NET_ADMIN".into()],
2564                capabilities_drop: vec!["ALL".into()],
2565                devices: vec![Device {
2566                    host_path: "/dev/zero".into(),
2567                    container_path: "/dev/zero".into(),
2568                    permissions: "rwm".into(),
2569                }],
2570                init_process_enabled: true,
2571                shared_memory_size: Some(256),
2572                sysctls: vec![Sysctl {
2573                    name: "net.ipv4.ip_forward".into(),
2574                    value: "1".into(),
2575                }],
2576                tmpfs: vec![Tmpfs {
2577                    container_path: "/tmp".into(),
2578                    size: 128,
2579                    mount_options: vec!["noexec".into()],
2580                }],
2581                privileged: true,
2582            }),
2583            stop_timeout: Some(30),
2584            user: Some("1000:1000".into()),
2585            working_directory: Some("/app".into()),
2586            tty: true,
2587            interactive: true,
2588            readonly_rootfs: true,
2589        };
2590        let argv = build_run_argv(&plan, &[], "t", "host", "img", true);
2591        assert!(argv.contains(&"--cap-add".to_string()));
2592        assert!(argv.contains(&"NET_ADMIN".to_string()));
2593        assert!(argv.contains(&"--cap-drop".to_string()));
2594        assert!(argv.contains(&"ALL".to_string()));
2595        assert!(argv.contains(&"--device".to_string()));
2596        assert!(argv.contains(&"/dev/zero:/dev/zerorwm".to_string()));
2597        assert!(argv.contains(&"--init".to_string()));
2598        assert!(argv.contains(&"--shm-size".to_string()));
2599        assert!(argv.contains(&"256m".to_string()));
2600        assert!(argv.contains(&"--sysctl".to_string()));
2601        assert!(argv.contains(&"net.ipv4.ip_forward=1".to_string()));
2602        assert!(argv.contains(&"--tmpfs".to_string()));
2603        assert!(argv.contains(&"--privileged".to_string()));
2604        assert!(argv.contains(&"--stop-timeout".to_string()));
2605        assert!(argv.contains(&"30".to_string()));
2606        assert!(argv.contains(&"--user".to_string()));
2607        assert!(argv.contains(&"1000:1000".to_string()));
2608        assert!(argv.contains(&"--workdir".to_string()));
2609        assert!(argv.contains(&"/app".to_string()));
2610        assert!(argv.contains(&"--tty".to_string()));
2611        assert!(argv.contains(&"--interactive".to_string()));
2612        assert!(argv.contains(&"--read-only".to_string()));
2613    }
2614
2615    #[test]
2616    fn parse_linux_parameters_fills_defaults() {
2617        let raw = serde_json::json!({"initProcessEnabled": true});
2618        let lp = parse_linux_parameters(&raw).expect("parses");
2619        assert!(lp.init_process_enabled);
2620        assert!(!lp.privileged);
2621        assert!(lp.capabilities_add.is_empty());
2622    }
2623
2624    #[test]
2625    fn parse_device_uses_default_permissions() {
2626        let raw = serde_json::json!({"hostPath": "/dev/null", "containerPath": "/dev/null"});
2627        let dev = parse_device(&raw).expect("parses");
2628        assert_eq!(dev.permissions, "rwm");
2629    }
2630
2631    #[test]
2632    fn compute_elbv2_targets_empty_when_no_group() {
2633        let mut accounts: MultiAccountState<EcsState> =
2634            MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
2635        let acct = accounts.get_or_create("000000000000");
2636        let mut task = make_task("t1");
2637        task.group = None;
2638        acct.tasks.insert("t1".into(), task);
2639        let state = acct.clone();
2640        let targets = compute_elbv2_targets(&state, state.tasks.get("t1").unwrap());
2641        assert!(targets.is_empty());
2642    }
2643
2644    #[test]
2645    fn compute_elbv2_targets_bridge_mode_uses_localhost_and_host_port() {
2646        let mut accounts: MultiAccountState<EcsState> =
2647            MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
2648        let acct = accounts.get_or_create("000000000000");
2649
2650        let td = crate::state::TaskDefinition {
2651            family: "app".into(),
2652            revision: 1,
2653            task_definition_arn: "arn:aws:ecs:us-east-1:000000000000:task-definition/app:1".into(),
2654            container_definitions: Vec::new(),
2655            network_mode: Some("bridge".into()),
2656            status: "ACTIVE".into(),
2657            task_role_arn: None,
2658            execution_role_arn: None,
2659            requires_compatibilities: Vec::new(),
2660            compatibilities: Vec::new(),
2661            cpu: None,
2662            memory: None,
2663            pid_mode: None,
2664            ipc_mode: None,
2665            volumes: Vec::new(),
2666            placement_constraints: Vec::new(),
2667            proxy_configuration: None,
2668            inference_accelerators: Vec::new(),
2669            ephemeral_storage: None,
2670            runtime_platform: None,
2671            requires_attributes: Vec::new(),
2672            registered_at: Utc::now(),
2673            registered_by: None,
2674            deregistered_at: None,
2675            tags: Vec::new(),
2676            enable_fault_injection: None,
2677        };
2678        acct.task_definitions.insert("app".into(), {
2679            let mut m = std::collections::BTreeMap::new();
2680            m.insert(1, td);
2681            m
2682        });
2683
2684        let service = crate::state::Service {
2685            service_name: "svc".into(),
2686            service_arn: "arn:aws:ecs:us-east-1:000000000000:service/default/svc".into(),
2687            cluster_name: "default".into(),
2688            cluster_arn: "arn:aws:ecs:us-east-1:000000000000:cluster/default".into(),
2689            task_definition_arn: "arn:aws:ecs:us-east-1:000000000000:task-definition/app:1".into(),
2690            family: "app".into(),
2691            revision: 1,
2692            desired_count: 1,
2693            running_count: 0,
2694            pending_count: 0,
2695            launch_type: "FARGATE".into(),
2696            status: "ACTIVE".into(),
2697            scheduling_strategy: "REPLICA".into(),
2698            deployment_controller: "ECS".into(),
2699            minimum_healthy_percent: Some(0),
2700            maximum_percent: Some(200),
2701            circuit_breaker: None,
2702            deployments: Vec::new(),
2703            load_balancers: vec![serde_json::json!({
2704                "targetGroupArn": "arn:aws:elasticloadbalancing:us-east-1:000000000000:targetgroup/tg/abc",
2705                "containerName": "app",
2706                "containerPort": 80,
2707            })],
2708            service_registries: Vec::new(),
2709            placement_constraints: Vec::new(),
2710            placement_strategy: Vec::new(),
2711            network_configuration: None,
2712            volume_configurations: vec![],
2713            tags: Vec::new(),
2714            created_at: Utc::now(),
2715            created_by: None,
2716            role_arn: None,
2717            platform_version: None,
2718            health_check_grace_period_seconds: None,
2719            enable_execute_command: false,
2720            enable_ecs_managed_tags: false,
2721            propagate_tags: None,
2722            capacity_provider_strategy: Vec::new(),
2723            availability_zone_rebalancing: None,
2724        };
2725        acct.services.insert(
2726            crate::state::EcsState::service_key("default", "svc"),
2727            service,
2728        );
2729
2730        let mut task = make_task("t1");
2731        task.group = Some("service:svc".into());
2732        task.containers = vec![crate::state::Container {
2733            container_arn: "arn:aws:ecs:us-east-1:000000000000:container/default/abc/app".into(),
2734            name: "app".into(),
2735            image: "alpine".into(),
2736            task_arn: task.task_arn.clone(),
2737            last_status: "RUNNING".into(),
2738            exit_code: None,
2739            reason: None,
2740            runtime_id: Some("dockerid-app".into()),
2741            essential: true,
2742            cpu: None,
2743            memory: None,
2744            memory_reservation: None,
2745            network_bindings: vec![serde_json::json!({
2746                "bindIP": "0.0.0.0",
2747                "containerPort": 80,
2748                "hostPort": 32768,
2749                "protocol": "tcp",
2750            })],
2751            network_interfaces: Vec::new(),
2752            health_status: None,
2753            managed_agents: None,
2754            image_digest: None,
2755        }];
2756        acct.tasks.insert("t1".into(), task);
2757
2758        let state = acct.clone();
2759        let targets = compute_elbv2_targets(&state, state.tasks.get("t1").unwrap());
2760        assert_eq!(targets.len(), 1);
2761        let (arn, tg_targets) = &targets[0];
2762        assert_eq!(
2763            arn,
2764            "arn:aws:elasticloadbalancing:us-east-1:000000000000:targetgroup/tg/abc"
2765        );
2766        assert_eq!(tg_targets.len(), 1);
2767        assert_eq!(tg_targets[0].0, "127.0.0.1");
2768        assert_eq!(tg_targets[0].1, Some(32768));
2769    }
2770
2771    #[test]
2772    fn compute_elbv2_targets_awsvpc_uses_eni_ip() {
2773        let mut accounts: MultiAccountState<EcsState> =
2774            MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
2775        let acct = accounts.get_or_create("000000000000");
2776
2777        let td = crate::state::TaskDefinition {
2778            family: "app".into(),
2779            revision: 1,
2780            task_definition_arn: "arn:aws:ecs:us-east-1:000000000000:task-definition/app:1".into(),
2781            container_definitions: Vec::new(),
2782            network_mode: Some("awsvpc".into()),
2783            status: "ACTIVE".into(),
2784            task_role_arn: None,
2785            execution_role_arn: None,
2786            requires_compatibilities: Vec::new(),
2787            compatibilities: Vec::new(),
2788            cpu: None,
2789            memory: None,
2790            pid_mode: None,
2791            ipc_mode: None,
2792            volumes: Vec::new(),
2793            placement_constraints: Vec::new(),
2794            proxy_configuration: None,
2795            inference_accelerators: Vec::new(),
2796            ephemeral_storage: None,
2797            runtime_platform: None,
2798            requires_attributes: Vec::new(),
2799            registered_at: Utc::now(),
2800            registered_by: None,
2801            deregistered_at: None,
2802            tags: Vec::new(),
2803            enable_fault_injection: None,
2804        };
2805        acct.task_definitions.insert("app".into(), {
2806            let mut m = std::collections::BTreeMap::new();
2807            m.insert(1, td);
2808            m
2809        });
2810
2811        let service = crate::state::Service {
2812            service_name: "svc".into(),
2813            service_arn: "arn:aws:ecs:us-east-1:000000000000:service/default/svc".into(),
2814            cluster_name: "default".into(),
2815            cluster_arn: "arn:aws:ecs:us-east-1:000000000000:cluster/default".into(),
2816            task_definition_arn: "arn:aws:ecs:us-east-1:000000000000:task-definition/app:1".into(),
2817            family: "app".into(),
2818            revision: 1,
2819            desired_count: 1,
2820            running_count: 0,
2821            pending_count: 0,
2822            launch_type: "FARGATE".into(),
2823            status: "ACTIVE".into(),
2824            scheduling_strategy: "REPLICA".into(),
2825            deployment_controller: "ECS".into(),
2826            minimum_healthy_percent: Some(0),
2827            maximum_percent: Some(200),
2828            circuit_breaker: None,
2829            deployments: Vec::new(),
2830            load_balancers: vec![serde_json::json!({
2831                "targetGroupArn": "arn:aws:elasticloadbalancing:us-east-1:000000000000:targetgroup/tg/abc",
2832                "containerName": "app",
2833                "containerPort": 80,
2834            })],
2835            service_registries: Vec::new(),
2836            placement_constraints: Vec::new(),
2837            placement_strategy: Vec::new(),
2838            network_configuration: None,
2839            volume_configurations: vec![],
2840            tags: Vec::new(),
2841            created_at: Utc::now(),
2842            created_by: None,
2843            role_arn: None,
2844            platform_version: None,
2845            health_check_grace_period_seconds: None,
2846            enable_execute_command: false,
2847            enable_ecs_managed_tags: false,
2848            propagate_tags: None,
2849            capacity_provider_strategy: Vec::new(),
2850            availability_zone_rebalancing: None,
2851        };
2852        acct.services.insert(
2853            crate::state::EcsState::service_key("default", "svc"),
2854            service,
2855        );
2856
2857        let mut task = make_task("t1");
2858        task.group = Some("service:svc".into());
2859        task.attachments = vec![crate::state::TaskAttachment {
2860            id: "eni-123".into(),
2861            attachment_type: "eni".into(),
2862            status: "ATTACHED".into(),
2863            details: vec![
2864                crate::state::AttachmentDetail {
2865                    name: "privateIPv4Address".into(),
2866                    value: "172.18.0.2".into(),
2867                },
2868                crate::state::AttachmentDetail {
2869                    name: "macAddress".into(),
2870                    value: "02:42:ac:12:00:02".into(),
2871                },
2872            ],
2873        }];
2874        acct.tasks.insert("t1".into(), task);
2875
2876        let state = acct.clone();
2877        let targets = compute_elbv2_targets(&state, state.tasks.get("t1").unwrap());
2878        assert_eq!(targets.len(), 1);
2879        let (arn, tg_targets) = &targets[0];
2880        assert_eq!(
2881            arn,
2882            "arn:aws:elasticloadbalancing:us-east-1:000000000000:targetgroup/tg/abc"
2883        );
2884        assert_eq!(tg_targets.len(), 1);
2885        assert_eq!(tg_targets[0].0, "172.18.0.2");
2886        assert_eq!(tg_targets[0].1, Some(80));
2887    }
2888}