Skip to main content

fakecloud_ecs/runtime/
task_lifecycle.rs

1//! `EcsRuntime` `task_lifecycle` family — extracted from service.rs by audit-2026-05-19.
2
3use super::*;
4
5impl EcsRuntime {
6    /// Spawn the task asynchronously. Returns immediately after transitioning
7    /// the task to `PENDING`; the background task advances it to `RUNNING`
8    /// once the container is created and to `STOPPED` once the container
9    /// exits.
10    pub fn run_task(self: Arc<Self>, state: SharedEcsState, task_id: String, account_id: String) {
11        let rt = self.clone();
12        tokio::spawn(async move {
13            if let Err(err) = rt.run_task_inner(&state, &task_id, &account_id).await {
14                tracing::warn!(%err, task = %task_id, "ecs task execution failed");
15                // Also surface on stderr so nextest's captured-output for a
16                // failed E2E shows the reason instead of just "empty logs".
17                eprintln!("[ecs] task {task_id} failed: {err}");
18                finalize_failure(&state, &account_id, &task_id, &err.to_string());
19                rt.emit_state_change(
20                    &state,
21                    &account_id,
22                    &task_id,
23                    "STOPPED",
24                    Some(("TaskFailedToStart", err.to_string())),
25                );
26            }
27        });
28    }
29
30    pub async fn run_task_inner(
31        &self,
32        state: &SharedEcsState,
33        task_id: &str,
34        account_id: &str,
35    ) -> Result<(), RuntimeError> {
36        if self.k8s.is_some() {
37            return self.k8s_run_task_inner(state, task_id, account_id).await;
38        }
39        // Build a per-container launch plan up-front so we hold the read
40        // lock once. Each entry carries everything needed to compose a
41        // `docker run` invocation for one container in the task.
42        let plans = build_container_plans(state, account_id, task_id, self.server_port)?;
43        if plans.is_empty() {
44            return Err(RuntimeError::ContainerStart(
45                "task has no containers".into(),
46            ));
47        }
48
49        // Resolve secrets for each plan. Failures fail the whole task to
50        // match real ECS's "failed to retrieve secret" behaviour — there's
51        // no point starting a sidecar when the app container will fail.
52        let mut resolved_plans: Vec<ResolvedContainerPlan> = Vec::with_capacity(plans.len());
53        for plan in plans {
54            let mut env = plan.env.clone();
55            for (name, value_from) in &plan.secrets_refs {
56                match self.resolve_secret(account_id, value_from) {
57                    Some(v) => env.push((name.clone(), v)),
58                    None => {
59                        return Err(RuntimeError::ContainerStart(format!(
60                            "failed to resolve secret {name} from {value_from}"
61                        )));
62                    }
63                }
64            }
65            // The agent/metadata endpoints live on fakecloud (the host);
66            // the container reaches them via the platform host alias —
67            // `host.docker.internal` for docker, `host.containers.internal`
68            // for podman (issue #1539).
69            let host_alias = &self.net.host_alias;
70            if plan.has_task_role {
71                env.push((
72                    "AWS_CONTAINER_CREDENTIALS_FULL_URI".into(),
73                    format!(
74                        "http://{host_alias}:{}/_fakecloud/ecs/creds/{}",
75                        self.server_port, task_id
76                    ),
77                ));
78            }
79            env.push((
80                "ECS_CONTAINER_METADATA_URI".into(),
81                format!(
82                    "http://{host_alias}:{}/_fakecloud/ecs/v3/{}",
83                    self.server_port, task_id
84                ),
85            ));
86            env.push((
87                "ECS_CONTAINER_METADATA_URI_V4".into(),
88                format!(
89                    "http://{host_alias}:{}/_fakecloud/ecs/v4/{}",
90                    self.server_port, task_id
91                ),
92            ));
93            resolved_plans.push(ResolvedContainerPlan { plan, env });
94        }
95
96        // Pull every distinct image up-front so a second container's pull
97        // failure surfaces before we leave the first container running.
98        mark_pull_started(state, account_id, task_id);
99        let mut run_images: Vec<String> = Vec::with_capacity(resolved_plans.len());
100        let mut image_digests: Vec<Option<String>> = Vec::with_capacity(resolved_plans.len());
101        for rp in &resolved_plans {
102            // Rewrite ECR URIs to fakecloud's local registry at the sibling
103            // host (`127.0.0.1` on the host, `host.docker.internal` when
104            // fakecloud is containerized) so the daemon/sibling can reach
105            // fakecloud's published registry port (issue #1539, bug 0.8).
106            let local_pull_uri = fakecloud_core::ecr_uri::translate_to_local_at(
107                &rp.plan.image,
108                &self.net.sibling_host,
109                self.server_port,
110            );
111            let pull_uri = local_pull_uri.as_deref().unwrap_or(&rp.plan.image);
112            let pull_out = self
113                .cli_command()
114                .args(["pull", pull_uri])
115                .output()
116                .await
117                .map_err(|e| RuntimeError::ImagePull(e.to_string()))?;
118            if !pull_out.status.success() {
119                let err = String::from_utf8_lossy(&pull_out.stderr).to_string();
120                return Err(RuntimeError::ImagePull(err));
121            }
122            // Retag the local pull URI to the AWS URI so `docker run` finds
123            // the image under the user-facing name. Digest-pinned refs
124            // can't be `docker tag` targets, so we fall through and run
125            // under the local URI in that case.
126            let run_image = if let Some(ref local_uri) = local_pull_uri {
127                if fakecloud_core::ecr_uri::is_digest_ref(&rp.plan.image) {
128                    local_uri.clone()
129                } else {
130                    let _ = self
131                        .cli_command()
132                        .args(["tag", local_uri, &rp.plan.image])
133                        .output()
134                        .await;
135                    rp.plan.image.clone()
136                }
137            } else {
138                rp.plan.image.clone()
139            };
140            // Best-effort image digest extraction so DescribeTasks emits
141            // the resolved digest the way real ECS does. Failures here
142            // (e.g. CLI without RepoDigests) are silent — digest stays
143            // `None` rather than failing the task.
144            let digest = self.lookup_image_digest(pull_uri).await;
145            run_images.push(run_image);
146            image_digests.push(digest);
147        }
148        mark_pull_stopped(state, account_id, task_id);
149
150        // For awsvpc network mode, create a per-task docker network so
151        // containers share an isolated bridge. Clean it up when the task
152        // stops. Network creation is best-effort: on failure we fall back
153        // to the default bridge and continue.
154        let awsvpc_network = resolved_plans
155            .iter()
156            .any(|rp| rp.plan.network_mode.as_deref() == Some("awsvpc"));
157        let network_name = format!("fakecloud-ecs-{}", task_id);
158        let network_created = if awsvpc_network {
159            let create = Command::new(&self.cli)
160                .args([
161                    "network",
162                    "create",
163                    "--driver",
164                    "bridge",
165                    "--label",
166                    &format!("fakecloud-ecs-task={}", task_id),
167                    // Ownership label so the startup reaper can prune the
168                    // per-task awsvpc network after an ungraceful restart,
169                    // matching the container label.
170                    "--label",
171                    &super::fakecloud_instance_label(),
172                    &network_name,
173                ])
174                .output()
175                .await;
176            match create {
177                Ok(out) if out.status.success() => {
178                    tracing::info!(
179                        task = %task_id,
180                        network = %network_name,
181                        "created awsvpc docker network"
182                    );
183                    true
184                }
185                Ok(out) => {
186                    let err = String::from_utf8_lossy(&out.stderr);
187                    tracing::warn!(
188                        task = %task_id,
189                        network = %network_name,
190                        error = %err,
191                        "awsvpc network creation failed; falling back to default bridge"
192                    );
193                    false
194                }
195                Err(e) => {
196                    tracing::warn!(
197                        task = %task_id,
198                        network = %network_name,
199                        error = %e,
200                        "awsvpc network creation failed; falling back to default bridge"
201                    );
202                    false
203                }
204            }
205        } else {
206            false
207        };
208
209        if network_created {
210            let eni_id = format!(
211                "eni-{}",
212                uuid::Uuid::new_v4()
213                    .to_string()
214                    .replace('-', "")
215                    .get(..17)
216                    .unwrap_or("")
217            );
218            let mac = format!(
219                "02:{:02x}:{:02x}:{:02x}:{:02x}:{:02x}",
220                rand::random::<u8>(),
221                rand::random::<u8>(),
222                rand::random::<u8>(),
223                rand::random::<u8>(),
224                rand::random::<u8>()
225            );
226            let ip = format!("10.0.{}.{}", rand::random::<u8>(), rand::random::<u8>());
227            let mut accounts = state.write();
228            if let Some(st) = accounts.get_mut(account_id) {
229                if let Some(task) = st.tasks.get_mut(task_id) {
230                    task.attachments.push(crate::state::TaskAttachment {
231                        id: eni_id.clone(),
232                        attachment_type: "eni".into(),
233                        status: "ATTACHED".into(),
234                        details: vec![
235                            crate::state::AttachmentDetail {
236                                name: "subnetId".into(),
237                                value: "subnet-fakecloud".into(),
238                            },
239                            crate::state::AttachmentDetail {
240                                name: "privateIPv4Address".into(),
241                                value: ip.clone(),
242                            },
243                            crate::state::AttachmentDetail {
244                                name: "macAddress".into(),
245                                value: mac.clone(),
246                            },
247                        ],
248                    });
249                }
250            }
251            tracing::info!(
252                task = %task_id,
253                eni = %eni_id,
254                ip = %ip,
255                "populated awsvpc ENI attachment"
256            );
257        }
258
259        // Launch every container detached, in topological order. Before
260        // each `docker run` we honour the dependent's `dependsOn[]` by
261        // polling docker until each upstream container reaches the
262        // requested condition (START/COMPLETE/SUCCESS/HEALTHY). If any
263        // fails to start (or an upstream gate times out), kill the
264        // already-started containers and bail — partial-launch state is
265        // harder to reason about than a clean failure.
266        // Register the task in `self.containers` BEFORE the launch loop so a
267        // concurrent StopTask / scale-down / DeleteService that arrives
268        // during the multi-second pull+run window can find the entry and stop
269        // whatever has been launched so far. (`stop_task` is a no-op when the
270        // key is absent — the orphan-on-race bug.) We append each container
271        // id to this entry as it starts, mirroring the k8s backend which
272        // registers the Pod name before `create_pod`.
273        self.containers
274            .write()
275            .entry(task_id.to_string())
276            .or_default();
277        let mut started: Vec<RunningContainer> = Vec::with_capacity(resolved_plans.len());
278        for (idx, (rp, run_image)) in resolved_plans.iter().zip(run_images.iter()).enumerate() {
279            // Wait for every dependsOn[] entry on this container. Upstreams
280            // declared in the same task always show up earlier in the
281            // launch order thanks to topo_sort_plans, so we only ever look
282            // backwards into `started`.
283            for dep in &rp.plan.depends_on {
284                let upstream = match started.iter().find(|c| c.name == dep.container_name) {
285                    Some(u) => u,
286                    // Upstream not in this task definition (we ignored it
287                    // during topo-sort too). Skip the gate — this matches
288                    // the existing "ignore unknown dependency" behaviour.
289                    None => continue,
290                };
291                // Whether the upstream has a healthCheck configured —
292                // governs the HEALTHY shortcut: AWS treats HEALTHY as
293                // immediately satisfied when the upstream has no probe.
294                let upstream_has_health_check = resolved_plans
295                    .iter()
296                    .find(|p| p.plan.container_name == dep.container_name)
297                    .is_some_and(|p| p.plan.health_check.is_some());
298                if let Err(err) = self
299                    .wait_for_depends_on(upstream, dep.condition, upstream_has_health_check)
300                    .await
301                {
302                    self.cleanup_partial_start(&started, task_id);
303                    return Err(err);
304                }
305            }
306            let argv = build_run_argv(
307                &rp.plan,
308                &rp.env,
309                task_id,
310                &self.net.host_alias,
311                self.net.add_host_arg.as_deref(),
312                run_image,
313                network_created,
314            );
315            let mut cmd = Command::new(&self.cli);
316            cmd.args(&argv);
317            let run_out = cmd.output().await.map_err(|e| {
318                // Cleanup already-started containers on launch failure.
319                self.cleanup_partial_start(&started, task_id);
320                RuntimeError::ContainerStart(e.to_string())
321            })?;
322            if !run_out.status.success() {
323                let err = String::from_utf8_lossy(&run_out.stderr).to_string();
324                self.cleanup_partial_start(&started, task_id);
325                return Err(RuntimeError::ContainerStart(err));
326            }
327            let container_id = String::from_utf8_lossy(&run_out.stdout).trim().to_string();
328            // Append to the runtime map immediately so a StopTask landing
329            // between this container and the next one in the launch loop can
330            // reach it.
331            self.containers
332                .write()
333                .entry(task_id.to_string())
334                .or_default()
335                .push((rp.plan.container_name.clone(), container_id.clone()));
336            started.push(RunningContainer {
337                name: rp.plan.container_name.clone(),
338                container_id,
339                essential: rp.plan.essential,
340                exit_code: None,
341                network_bindings: network_bindings_for(&rp.plan),
342                image_digest: image_digests.get(idx).cloned().unwrap_or(None),
343            });
344        }
345
346        // A StopTask / scale-down / DeleteService that raced the launch may
347        // have flipped this task's desired_status to STOPPED while we were
348        // pulling/running (and may have already issued `docker stop` against
349        // the containers it could see in the runtime map). If so, stop every
350        // container we launched and finalize as a clean stop instead of
351        // marking the task RUNNING with a live workload the user asked to
352        // kill. Mirrors the k8s backend, which observes a 404 from the
353        // StopTask-deleted Pod and finalizes the same way.
354        if task_desired_stopped(state, account_id, task_id) {
355            for rc in &started {
356                let _ = Command::new(&self.cli)
357                    .args(["stop", "--time", "10", &rc.container_id])
358                    .output()
359                    .await;
360                let _ = Command::new(&self.cli)
361                    .args(["rm", "-f", &rc.container_id])
362                    .output()
363                    .await;
364            }
365            if network_created {
366                let _ = Command::new(&self.cli)
367                    .args(["network", "rm", &network_name])
368                    .output()
369                    .await;
370            }
371            self.containers.write().remove(task_id);
372            let stopped: Vec<RunningContainer> = started
373                .iter()
374                .map(|rc| RunningContainer {
375                    exit_code: Some(rc.exit_code.unwrap_or(137)),
376                    ..rc.clone()
377                })
378                .collect();
379            finalize_stopped_multi(
380                state,
381                account_id,
382                task_id,
383                &stopped,
384                137,
385                "",
386                "UserInitiated",
387                Some("Task stopped during launch".into()),
388            );
389            self.deregister_lb_targets(state, account_id, task_id);
390            self.emit_state_change(
391                state,
392                account_id,
393                task_id,
394                "STOPPED",
395                Some(("UserInitiated", "Task stopped during launch".into())),
396            );
397            return Ok(());
398        }
399
400        mark_running_multi(state, account_id, task_id, &started);
401        self.register_lb_targets(state, account_id, task_id);
402        self.emit_state_change(state, account_id, task_id, "RUNNING", None);
403
404        // Wait for the first essential container (or, if none are
405        // essential, any container) to exit. ECS task lifetime is
406        // bounded by the first essential exit, after which all remaining
407        // containers are stopped. While polling we also refresh each
408        // container's `healthStatus` from `docker inspect` so
409        // DescribeTasks reflects HEALTHCHECK transitions in near real
410        // time.
411        let wait_outcome = self
412            .wait_for_task_exit_with_health(state, account_id, task_id, &started)
413            .await?;
414
415        // Stop and reap any sidecars still running. Best-effort — failures
416        // here shouldn't keep the task from transitioning to STOPPED.
417        let mut final_containers = started.clone();
418        for (i, rc) in started.iter().enumerate() {
419            if Some(i) == wait_outcome.exited_index {
420                final_containers[i].exit_code = Some(wait_outcome.exit_code);
421                continue;
422            }
423            // Try to grab the exit code if the container already exited
424            // on its own (non-essential exits don't stop the task), then
425            // fall back to `docker stop` for stragglers.
426            let inspect = Command::new(&self.cli)
427                .args(["inspect", "-f", "{{.State.ExitCode}}", &rc.container_id])
428                .output()
429                .await;
430            let still_running = match inspect {
431                Ok(out) if out.status.success() => {
432                    let s = String::from_utf8_lossy(&out.stdout).trim().to_string();
433                    // `docker inspect` returns 0 for not-yet-exited
434                    // containers, so we additionally check `State.Running`.
435                    let running = Command::new(&self.cli)
436                        .args(["inspect", "-f", "{{.State.Running}}", &rc.container_id])
437                        .output()
438                        .await
439                        .map(|o| String::from_utf8_lossy(&o.stdout).trim() == "true")
440                        .unwrap_or(false);
441                    if !running {
442                        if let Ok(code) = s.parse::<i64>() {
443                            final_containers[i].exit_code = Some(code);
444                        }
445                    }
446                    running
447                }
448                _ => false,
449            };
450            if still_running {
451                let _ = Command::new(&self.cli)
452                    .args(["stop", "--time", "10", &rc.container_id])
453                    .output()
454                    .await;
455                let wait_out = Command::new(&self.cli)
456                    .args(["wait", &rc.container_id])
457                    .output()
458                    .await;
459                if let Ok(out) = wait_out {
460                    let code: i64 = String::from_utf8_lossy(&out.stdout)
461                        .trim()
462                        .parse()
463                        .unwrap_or(-1);
464                    final_containers[i].exit_code = Some(code);
465                }
466            }
467        }
468
469        // Capture combined stdout+stderr from every container so the
470        // introspection endpoint shows logs from sidecars too.
471        let mut captured = String::new();
472        for rc in &started {
473            let logs_out = Command::new(&self.cli)
474                .args(["logs", &rc.container_id])
475                .output()
476                .await
477                .map_err(|e| RuntimeError::Wait(e.to_string()))?;
478            captured.push_str(&format!("[{}] ", rc.name));
479            captured.push_str(&String::from_utf8_lossy(&logs_out.stdout));
480            captured.push_str(&String::from_utf8_lossy(&logs_out.stderr));
481        }
482
483        // Reap every container we own.
484        for rc in &started {
485            let _ = Command::new(&self.cli)
486                .args(["rm", "-f", &rc.container_id])
487                .output()
488                .await;
489        }
490        // Reap task-scoped docker volumes (anonymous "Docker volumes" and
491        // `dockerVolumeConfiguration` with scope=task), matching AWS, which
492        // deletes them when the task stops. Host binds, EFS/FSx stubs and
493        // scope=shared volumes are left in place — they outlive the task.
494        // `volume rm` is best-effort and no-ops on a volume still in use.
495        let mut task_volumes: std::collections::BTreeSet<String> =
496            std::collections::BTreeSet::new();
497        for rp in &resolved_plans {
498            for vm in &rp.plan.volume_mounts {
499                if vm.cleanup_on_stop {
500                    task_volumes.insert(vm.source.clone());
501                }
502            }
503        }
504        for vol in &task_volumes {
505            let _ = Command::new(&self.cli)
506                .args(["volume", "rm", "-f", vol])
507                .output()
508                .await;
509        }
510        // Clean up the per-task docker network for awsvpc.
511        if network_created {
512            let _ = Command::new(&self.cli)
513                .args(["network", "rm", &network_name])
514                .output()
515                .await;
516        }
517        self.containers.write().remove(task_id);
518
519        // Forward logs BEFORE flipping the task to STOPPED so a client
520        // that polls DescribeTasks and immediately queries
521        // DescribeLogStreams can't observe the STOPPED transition before
522        // the awslogs group/stream has been materialised.
523        self.forward_awslogs_if_configured(state, account_id, task_id, &captured);
524        let exit_code = wait_outcome.exit_code;
525        finalize_stopped_multi(
526            state,
527            account_id,
528            task_id,
529            &final_containers,
530            exit_code,
531            &captured,
532            wait_outcome.stop_code,
533            None,
534        );
535        self.deregister_lb_targets(state, account_id, task_id);
536        self.emit_state_change(
537            state,
538            account_id,
539            task_id,
540            "STOPPED",
541            Some((wait_outcome.stop_code, format!("Exit code {}", exit_code))),
542        );
543        Ok(())
544    }
545
546    /// Wait for the task to reach a stop condition (any essential
547    /// container exits, or every container exits when none are
548    /// essential) while also polling `docker inspect .State.Health.Status`
549    /// on every iteration to push the latest `healthStatus` onto each
550    /// task container — so DescribeTasks shows live HEALTHCHECK
551    /// transitions instead of the boot-time `UNKNOWN`. Returns the
552    /// index into `started` of the container whose exit determined the
553    /// task lifetime, its exit code, and the stopCode.
554    pub(super) async fn wait_for_task_exit_with_health(
555        &self,
556        state: &SharedEcsState,
557        account_id: &str,
558        task_id: &str,
559        started: &[RunningContainer],
560    ) -> Result<TaskExitOutcome, RuntimeError> {
561        let any_essential = started.iter().any(|c| c.essential);
562        let mut working: Vec<RunningContainer> = started.to_vec();
563        let mut first_exited: Option<usize> = None;
564        loop {
565            // Refresh health status before checking exits so a container
566            // that goes UNHEALTHY -> exits in the same iteration leaves
567            // its final health state on the task before we transition to
568            // STOPPED.
569            self.refresh_health_status(state, account_id, task_id, started)
570                .await;
571            for (i, rc) in started.iter().enumerate() {
572                if working[i].exit_code.is_some() {
573                    continue;
574                }
575                let inspect = Command::new(&self.cli)
576                    .args(["inspect", "-f", "{{.State.Running}}", &rc.container_id])
577                    .output()
578                    .await;
579                let running = match inspect {
580                    Ok(out) if out.status.success() => {
581                        String::from_utf8_lossy(&out.stdout).trim() == "true"
582                    }
583                    _ => false,
584                };
585                if running {
586                    continue;
587                }
588                let wait_out = Command::new(&self.cli)
589                    .args(["wait", &rc.container_id])
590                    .output()
591                    .await
592                    .map_err(|e| RuntimeError::Wait(e.to_string()))?;
593                if !wait_out.status.success() {
594                    let err = String::from_utf8_lossy(&wait_out.stderr).to_string();
595                    return Err(RuntimeError::Wait(err));
596                }
597                let exit_code: i64 = String::from_utf8_lossy(&wait_out.stdout)
598                    .trim()
599                    .parse()
600                    .unwrap_or(-1);
601                working[i].exit_code = Some(exit_code);
602                if first_exited.is_none() && (rc.essential || !any_essential) {
603                    first_exited = Some(i);
604                }
605            }
606            if task_should_stop(&working) {
607                let idx = first_exited
608                    .or_else(|| working.iter().position(|c| c.exit_code.is_some()))
609                    .unwrap_or(0);
610                let exit_code = working[idx].exit_code.unwrap_or(-1);
611                return Ok(TaskExitOutcome {
612                    exited_index: Some(idx),
613                    exit_code,
614                    stop_code: if any_essential {
615                        "EssentialContainerExited"
616                    } else {
617                        "TaskCompleted"
618                    },
619                });
620            }
621            sleep(Duration::from_millis(200)).await;
622        }
623    }
624
625    /// Block the launch of a dependent container until its upstream
626    /// reaches the requested `dependsOn[].condition`. We poll
627    /// `docker inspect` at a small interval; the wait is bounded by an
628    /// AWS-style timeout (120s by default — long enough for image
629    /// startup but short enough to surface bugs as a clean
630    /// `ContainerStart` failure).
631    ///
632    /// `upstream_has_health_check` is needed for the `HEALTHY` branch:
633    /// when the upstream has no healthCheck, AWS treats `HEALTHY` as
634    /// immediately satisfied (otherwise the dependent would block
635    /// forever, since docker reports `Health.Status` only when the
636    /// container has a HEALTHCHECK directive).
637    pub(super) async fn wait_for_depends_on(
638        &self,
639        upstream: &RunningContainer,
640        condition: DependsOnCondition,
641        upstream_has_health_check: bool,
642    ) -> Result<(), RuntimeError> {
643        // Bounded wait — chosen to comfortably cover slow init scripts
644        // without letting a wedged dependency stall a task indefinitely.
645        const WAIT_TIMEOUT: Duration = Duration::from_secs(120);
646        const POLL_INTERVAL: Duration = Duration::from_millis(200);
647
648        // HEALTHY against an upstream without a healthCheck: AWS treats
649        // this as immediately satisfied because there's no probe to
650        // observe. Skip the polling loop entirely so the dependent isn't
651        // wedged forever waiting for a status that docker will never set.
652        if matches!(condition, DependsOnCondition::Healthy) && !upstream_has_health_check {
653            return Ok(());
654        }
655
656        let deadline = std::time::Instant::now() + WAIT_TIMEOUT;
657        loop {
658            let inspect = inspect_container_state(&self.cli, &upstream.container_id).await;
659            if let Some(state) = inspect {
660                if condition_is_met(condition, &state) {
661                    return Ok(());
662                }
663                // SUCCESS specifically: if the container exited with a
664                // non-zero code, the gate can never be satisfied. Bail
665                // immediately rather than waiting for the timeout — this
666                // matches ECS's "stoppedReason: dependency failed" path.
667                if matches!(condition, DependsOnCondition::Success)
668                    && state.exited
669                    && state.exit_code != 0
670                {
671                    return Err(RuntimeError::ContainerStart(format!(
672                        "dependency on container {} ({}) failed: upstream exited with code {}",
673                        upstream.name,
674                        DependsOnCondition::Success.as_aws_str(),
675                        state.exit_code,
676                    )));
677                }
678            }
679            if std::time::Instant::now() >= deadline {
680                return Err(RuntimeError::ContainerStart(format!(
681                    "timed out waiting for container {} to reach condition {}",
682                    upstream.name,
683                    condition.as_aws_str(),
684                )));
685            }
686            tokio::time::sleep(POLL_INTERVAL).await;
687        }
688    }
689
690    /// Best-effort cleanup of containers we already started when a later
691    /// container in the task failed to launch. Without this, half-launched
692    /// tasks leak docker containers. `task_id` mirrors the value used at
693    /// network creation so `network rm` targets the right name —
694    /// deriving it from a container_id prefix was wrong (container ids
695    /// are docker-assigned, not task-shaped).
696    pub(super) fn cleanup_partial_start(&self, started: &[RunningContainer], task_id: &str) {
697        let cli = self.cli.clone();
698        let ids: Vec<String> = started.iter().map(|c| c.container_id.clone()).collect();
699        let network = format!("fakecloud-ecs-{task_id}");
700        // Drop the runtime map entry we pre-registered before the launch loop
701        // so a failed launch doesn't leave a stale (now-empty) key that
702        // future StopTask calls would treat as a live task.
703        self.containers.write().remove(task_id);
704        tokio::spawn(async move {
705            for id in ids {
706                let _ = Command::new(&cli).args(["rm", "-f", &id]).output().await;
707            }
708            let _ = Command::new(&cli)
709                .args(["network", "rm", &network])
710                .output()
711                .await;
712        });
713    }
714
715    /// Kill every container behind a task with the configured stop
716    /// timeout. Returns true if at least one container was killed. Called
717    /// synchronously from `StopTask`; the wait loop in `run_task_inner`
718    /// observes the exits and transitions the task to `STOPPED`.
719    pub async fn stop_task(&self, task_id: &str, reason: &str) -> bool {
720        if let Some(k) = &self.k8s {
721            tracing::info!(task = %task_id, reason = %reason, "ecs task stop requested (k8s)");
722            return k.stop_task(task_id).await;
723        }
724        let containers = self.containers.read().get(task_id).cloned();
725        let Some(list) = containers else {
726            return false;
727        };
728        if list.is_empty() {
729            return false;
730        }
731        // `docker stop` sends SIGTERM then SIGKILL after a timeout.
732        for (_name, id) in &list {
733            let _ = Command::new(&self.cli)
734                .args(["stop", "--time", "10", id])
735                .output()
736                .await;
737        }
738        tracing::info!(task = %task_id, reason = %reason, "ecs task stop requested");
739        true
740    }
741
742    /// Kill every running container the runtime owns. Called on reset /
743    /// shutdown so docker state matches fakecloud state after a fresh
744    /// boot.
745    pub async fn stop_all(&self) {
746        if let Some(k) = &self.k8s {
747            k.stop_all().await;
748            return;
749        }
750        let ids: Vec<String> = self
751            .containers
752            .read()
753            .values()
754            .flat_map(|list| list.iter().map(|(_, id)| id.clone()))
755            .collect();
756        for id in ids {
757            let _ = Command::new(&self.cli).args(["kill", &id]).output().await;
758            let _ = Command::new(&self.cli).args(["rm", &id]).output().await;
759        }
760        self.containers.write().clear();
761    }
762}