Skip to main content

fakecloud_ecs/runtime/
task_lifecycle.rs

1//! `EcsRuntime` `task_lifecycle` family — extracted from service.rs by audit-2026-05-19.
2
3use super::*;
4
5impl EcsRuntime {
6    /// Spawn the task asynchronously. Returns immediately after transitioning
7    /// the task to `PENDING`; the background task advances it to `RUNNING`
8    /// once the container is created and to `STOPPED` once the container
9    /// exits.
10    pub fn run_task(self: Arc<Self>, state: SharedEcsState, task_id: String, account_id: String) {
11        let rt = self.clone();
12        tokio::spawn(async move {
13            if let Err(err) = rt.run_task_inner(&state, &task_id, &account_id).await {
14                tracing::warn!(%err, task = %task_id, "ecs task execution failed");
15                // Also surface on stderr so nextest's captured-output for a
16                // failed E2E shows the reason instead of just "empty logs".
17                eprintln!("[ecs] task {task_id} failed: {err}");
18                finalize_failure(&state, &account_id, &task_id, &err.to_string());
19                rt.emit_state_change(
20                    &state,
21                    &account_id,
22                    &task_id,
23                    "STOPPED",
24                    Some(("TaskFailedToStart", err.to_string())),
25                );
26            }
27        });
28    }
29
30    pub async fn run_task_inner(
31        &self,
32        state: &SharedEcsState,
33        task_id: &str,
34        account_id: &str,
35    ) -> Result<(), RuntimeError> {
36        // Build a per-container launch plan up-front so we hold the read
37        // lock once. Each entry carries everything needed to compose a
38        // `docker run` invocation for one container in the task.
39        let plans = build_container_plans(state, account_id, task_id, self.server_port)?;
40        if plans.is_empty() {
41            return Err(RuntimeError::ContainerStart(
42                "task has no containers".into(),
43            ));
44        }
45
46        // Resolve secrets for each plan. Failures fail the whole task to
47        // match real ECS's "failed to retrieve secret" behaviour — there's
48        // no point starting a sidecar when the app container will fail.
49        let mut resolved_plans: Vec<ResolvedContainerPlan> = Vec::with_capacity(plans.len());
50        for plan in plans {
51            let mut env = plan.env.clone();
52            for (name, value_from) in &plan.secrets_refs {
53                match self.resolve_secret(account_id, value_from) {
54                    Some(v) => env.push((name.clone(), v)),
55                    None => {
56                        return Err(RuntimeError::ContainerStart(format!(
57                            "failed to resolve secret {name} from {value_from}"
58                        )));
59                    }
60                }
61            }
62            if plan.has_task_role {
63                env.push((
64                    "AWS_CONTAINER_CREDENTIALS_FULL_URI".into(),
65                    format!(
66                        "http://host.docker.internal:{}/_fakecloud/ecs/creds/{}",
67                        self.server_port, task_id
68                    ),
69                ));
70            }
71            env.push((
72                "ECS_CONTAINER_METADATA_URI".into(),
73                format!(
74                    "http://host.docker.internal:{}/_fakecloud/ecs/v3/{}",
75                    self.server_port, task_id
76                ),
77            ));
78            env.push((
79                "ECS_CONTAINER_METADATA_URI_V4".into(),
80                format!(
81                    "http://host.docker.internal:{}/_fakecloud/ecs/v4/{}",
82                    self.server_port, task_id
83                ),
84            ));
85            resolved_plans.push(ResolvedContainerPlan { plan, env });
86        }
87
88        // Pull every distinct image up-front so a second container's pull
89        // failure surfaces before we leave the first container running.
90        mark_pull_started(state, account_id, task_id);
91        let mut run_images: Vec<String> = Vec::with_capacity(resolved_plans.len());
92        let mut image_digests: Vec<Option<String>> = Vec::with_capacity(resolved_plans.len());
93        for rp in &resolved_plans {
94            let local_pull_uri =
95                fakecloud_core::ecr_uri::translate_to_local(&rp.plan.image, self.server_port);
96            let pull_uri = local_pull_uri.as_deref().unwrap_or(&rp.plan.image);
97            let pull_out = self
98                .cli_command()
99                .args(["pull", pull_uri])
100                .output()
101                .await
102                .map_err(|e| RuntimeError::ImagePull(e.to_string()))?;
103            if !pull_out.status.success() {
104                let err = String::from_utf8_lossy(&pull_out.stderr).to_string();
105                return Err(RuntimeError::ImagePull(err));
106            }
107            // Retag the local pull URI to the AWS URI so `docker run` finds
108            // the image under the user-facing name. Digest-pinned refs
109            // can't be `docker tag` targets, so we fall through and run
110            // under the local URI in that case.
111            let run_image = if let Some(ref local_uri) = local_pull_uri {
112                if fakecloud_core::ecr_uri::is_digest_ref(&rp.plan.image) {
113                    local_uri.clone()
114                } else {
115                    let _ = self
116                        .cli_command()
117                        .args(["tag", local_uri, &rp.plan.image])
118                        .output()
119                        .await;
120                    rp.plan.image.clone()
121                }
122            } else {
123                rp.plan.image.clone()
124            };
125            // Best-effort image digest extraction so DescribeTasks emits
126            // the resolved digest the way real ECS does. Failures here
127            // (e.g. CLI without RepoDigests) are silent — digest stays
128            // `None` rather than failing the task.
129            let digest = self.lookup_image_digest(pull_uri).await;
130            run_images.push(run_image);
131            image_digests.push(digest);
132        }
133        mark_pull_stopped(state, account_id, task_id);
134
135        // For awsvpc network mode, create a per-task docker network so
136        // containers share an isolated bridge. Clean it up when the task
137        // stops. Network creation is best-effort: on failure we fall back
138        // to the default bridge and continue.
139        let awsvpc_network = resolved_plans
140            .iter()
141            .any(|rp| rp.plan.network_mode.as_deref() == Some("awsvpc"));
142        let network_name = format!("fakecloud-ecs-{}", task_id);
143        let network_created = if awsvpc_network {
144            let create = Command::new(&self.cli)
145                .args([
146                    "network",
147                    "create",
148                    "--driver",
149                    "bridge",
150                    "--label",
151                    &format!("fakecloud-ecs-task={}", task_id),
152                    &network_name,
153                ])
154                .output()
155                .await;
156            match create {
157                Ok(out) if out.status.success() => {
158                    tracing::info!(
159                        task = %task_id,
160                        network = %network_name,
161                        "created awsvpc docker network"
162                    );
163                    true
164                }
165                Ok(out) => {
166                    let err = String::from_utf8_lossy(&out.stderr);
167                    tracing::warn!(
168                        task = %task_id,
169                        network = %network_name,
170                        error = %err,
171                        "awsvpc network creation failed; falling back to default bridge"
172                    );
173                    false
174                }
175                Err(e) => {
176                    tracing::warn!(
177                        task = %task_id,
178                        network = %network_name,
179                        error = %e,
180                        "awsvpc network creation failed; falling back to default bridge"
181                    );
182                    false
183                }
184            }
185        } else {
186            false
187        };
188
189        if network_created {
190            let eni_id = format!(
191                "eni-{}",
192                uuid::Uuid::new_v4()
193                    .to_string()
194                    .replace('-', "")
195                    .get(..17)
196                    .unwrap_or("")
197            );
198            let mac = format!(
199                "02:{:02x}:{:02x}:{:02x}:{:02x}:{:02x}",
200                rand::random::<u8>(),
201                rand::random::<u8>(),
202                rand::random::<u8>(),
203                rand::random::<u8>(),
204                rand::random::<u8>()
205            );
206            let ip = format!("10.0.{}.{}", rand::random::<u8>(), rand::random::<u8>());
207            let mut accounts = state.write();
208            if let Some(st) = accounts.get_mut(account_id) {
209                if let Some(task) = st.tasks.get_mut(task_id) {
210                    task.attachments.push(crate::state::TaskAttachment {
211                        id: eni_id.clone(),
212                        attachment_type: "eni".into(),
213                        status: "ATTACHED".into(),
214                        details: vec![
215                            crate::state::AttachmentDetail {
216                                name: "subnetId".into(),
217                                value: "subnet-fakecloud".into(),
218                            },
219                            crate::state::AttachmentDetail {
220                                name: "privateIPv4Address".into(),
221                                value: ip.clone(),
222                            },
223                            crate::state::AttachmentDetail {
224                                name: "macAddress".into(),
225                                value: mac.clone(),
226                            },
227                        ],
228                    });
229                }
230            }
231            tracing::info!(
232                task = %task_id,
233                eni = %eni_id,
234                ip = %ip,
235                "populated awsvpc ENI attachment"
236            );
237        }
238
239        // Launch every container detached, in topological order. Before
240        // each `docker run` we honour the dependent's `dependsOn[]` by
241        // polling docker until each upstream container reaches the
242        // requested condition (START/COMPLETE/SUCCESS/HEALTHY). If any
243        // fails to start (or an upstream gate times out), kill the
244        // already-started containers and bail — partial-launch state is
245        // harder to reason about than a clean failure.
246        let mut started: Vec<RunningContainer> = Vec::with_capacity(resolved_plans.len());
247        for (idx, (rp, run_image)) in resolved_plans.iter().zip(run_images.iter()).enumerate() {
248            // Wait for every dependsOn[] entry on this container. Upstreams
249            // declared in the same task always show up earlier in the
250            // launch order thanks to topo_sort_plans, so we only ever look
251            // backwards into `started`.
252            for dep in &rp.plan.depends_on {
253                let upstream = match started.iter().find(|c| c.name == dep.container_name) {
254                    Some(u) => u,
255                    // Upstream not in this task definition (we ignored it
256                    // during topo-sort too). Skip the gate — this matches
257                    // the existing "ignore unknown dependency" behaviour.
258                    None => continue,
259                };
260                // Whether the upstream has a healthCheck configured —
261                // governs the HEALTHY shortcut: AWS treats HEALTHY as
262                // immediately satisfied when the upstream has no probe.
263                let upstream_has_health_check = resolved_plans
264                    .iter()
265                    .find(|p| p.plan.container_name == dep.container_name)
266                    .is_some_and(|p| p.plan.health_check.is_some());
267                if let Err(err) = self
268                    .wait_for_depends_on(upstream, dep.condition, upstream_has_health_check)
269                    .await
270                {
271                    self.cleanup_partial_start(&started, task_id);
272                    return Err(err);
273                }
274            }
275            let argv = build_run_argv(
276                &rp.plan,
277                &rp.env,
278                task_id,
279                &self.host_ip,
280                run_image,
281                network_created,
282            );
283            let mut cmd = Command::new(&self.cli);
284            cmd.args(&argv);
285            let run_out = cmd.output().await.map_err(|e| {
286                // Cleanup already-started containers on launch failure.
287                self.cleanup_partial_start(&started, task_id);
288                RuntimeError::ContainerStart(e.to_string())
289            })?;
290            if !run_out.status.success() {
291                let err = String::from_utf8_lossy(&run_out.stderr).to_string();
292                self.cleanup_partial_start(&started, task_id);
293                return Err(RuntimeError::ContainerStart(err));
294            }
295            let container_id = String::from_utf8_lossy(&run_out.stdout).trim().to_string();
296            started.push(RunningContainer {
297                name: rp.plan.container_name.clone(),
298                container_id,
299                essential: rp.plan.essential,
300                exit_code: None,
301                network_bindings: network_bindings_for(&rp.plan),
302                image_digest: image_digests.get(idx).cloned().unwrap_or(None),
303            });
304        }
305
306        // Stash all (name, container_id) pairs so StopTask/stop_all can
307        // reach every container backing this task.
308        {
309            let mut guard = self.containers.write();
310            guard.insert(
311                task_id.to_string(),
312                started
313                    .iter()
314                    .map(|c| (c.name.clone(), c.container_id.clone()))
315                    .collect(),
316            );
317        }
318        mark_running_multi(state, account_id, task_id, &started);
319        self.register_lb_targets(state, account_id, task_id);
320        self.emit_state_change(state, account_id, task_id, "RUNNING", None);
321
322        // Wait for the first essential container (or, if none are
323        // essential, any container) to exit. ECS task lifetime is
324        // bounded by the first essential exit, after which all remaining
325        // containers are stopped. While polling we also refresh each
326        // container's `healthStatus` from `docker inspect` so
327        // DescribeTasks reflects HEALTHCHECK transitions in near real
328        // time.
329        let wait_outcome = self
330            .wait_for_task_exit_with_health(state, account_id, task_id, &started)
331            .await?;
332
333        // Stop and reap any sidecars still running. Best-effort — failures
334        // here shouldn't keep the task from transitioning to STOPPED.
335        let mut final_containers = started.clone();
336        for (i, rc) in started.iter().enumerate() {
337            if Some(i) == wait_outcome.exited_index {
338                final_containers[i].exit_code = Some(wait_outcome.exit_code);
339                continue;
340            }
341            // Try to grab the exit code if the container already exited
342            // on its own (non-essential exits don't stop the task), then
343            // fall back to `docker stop` for stragglers.
344            let inspect = Command::new(&self.cli)
345                .args(["inspect", "-f", "{{.State.ExitCode}}", &rc.container_id])
346                .output()
347                .await;
348            let still_running = match inspect {
349                Ok(out) if out.status.success() => {
350                    let s = String::from_utf8_lossy(&out.stdout).trim().to_string();
351                    // `docker inspect` returns 0 for not-yet-exited
352                    // containers, so we additionally check `State.Running`.
353                    let running = Command::new(&self.cli)
354                        .args(["inspect", "-f", "{{.State.Running}}", &rc.container_id])
355                        .output()
356                        .await
357                        .map(|o| String::from_utf8_lossy(&o.stdout).trim() == "true")
358                        .unwrap_or(false);
359                    if !running {
360                        if let Ok(code) = s.parse::<i64>() {
361                            final_containers[i].exit_code = Some(code);
362                        }
363                    }
364                    running
365                }
366                _ => false,
367            };
368            if still_running {
369                let _ = Command::new(&self.cli)
370                    .args(["stop", "--time", "10", &rc.container_id])
371                    .output()
372                    .await;
373                let wait_out = Command::new(&self.cli)
374                    .args(["wait", &rc.container_id])
375                    .output()
376                    .await;
377                if let Ok(out) = wait_out {
378                    let code: i64 = String::from_utf8_lossy(&out.stdout)
379                        .trim()
380                        .parse()
381                        .unwrap_or(-1);
382                    final_containers[i].exit_code = Some(code);
383                }
384            }
385        }
386
387        // Capture combined stdout+stderr from every container so the
388        // introspection endpoint shows logs from sidecars too.
389        let mut captured = String::new();
390        for rc in &started {
391            let logs_out = Command::new(&self.cli)
392                .args(["logs", &rc.container_id])
393                .output()
394                .await
395                .map_err(|e| RuntimeError::Wait(e.to_string()))?;
396            captured.push_str(&format!("[{}] ", rc.name));
397            captured.push_str(&String::from_utf8_lossy(&logs_out.stdout));
398            captured.push_str(&String::from_utf8_lossy(&logs_out.stderr));
399        }
400
401        // Reap every container we own.
402        for rc in &started {
403            let _ = Command::new(&self.cli)
404                .args(["rm", "-f", &rc.container_id])
405                .output()
406                .await;
407        }
408        // Clean up the per-task docker network for awsvpc.
409        if network_created {
410            let _ = Command::new(&self.cli)
411                .args(["network", "rm", &network_name])
412                .output()
413                .await;
414        }
415        self.containers.write().remove(task_id);
416
417        // Forward logs BEFORE flipping the task to STOPPED so a client
418        // that polls DescribeTasks and immediately queries
419        // DescribeLogStreams can't observe the STOPPED transition before
420        // the awslogs group/stream has been materialised.
421        self.forward_awslogs_if_configured(state, account_id, task_id, &captured);
422        let exit_code = wait_outcome.exit_code;
423        finalize_stopped_multi(
424            state,
425            account_id,
426            task_id,
427            &final_containers,
428            exit_code,
429            &captured,
430            wait_outcome.stop_code,
431            None,
432        );
433        self.deregister_lb_targets(state, account_id, task_id);
434        self.emit_state_change(
435            state,
436            account_id,
437            task_id,
438            "STOPPED",
439            Some((wait_outcome.stop_code, format!("Exit code {}", exit_code))),
440        );
441        Ok(())
442    }
443
444    /// Wait for the task to reach a stop condition (any essential
445    /// container exits, or every container exits when none are
446    /// essential) while also polling `docker inspect .State.Health.Status`
447    /// on every iteration to push the latest `healthStatus` onto each
448    /// task container — so DescribeTasks shows live HEALTHCHECK
449    /// transitions instead of the boot-time `UNKNOWN`. Returns the
450    /// index into `started` of the container whose exit determined the
451    /// task lifetime, its exit code, and the stopCode.
452    pub(super) async fn wait_for_task_exit_with_health(
453        &self,
454        state: &SharedEcsState,
455        account_id: &str,
456        task_id: &str,
457        started: &[RunningContainer],
458    ) -> Result<TaskExitOutcome, RuntimeError> {
459        let any_essential = started.iter().any(|c| c.essential);
460        let mut working: Vec<RunningContainer> = started.to_vec();
461        let mut first_exited: Option<usize> = None;
462        loop {
463            // Refresh health status before checking exits so a container
464            // that goes UNHEALTHY -> exits in the same iteration leaves
465            // its final health state on the task before we transition to
466            // STOPPED.
467            self.refresh_health_status(state, account_id, task_id, started)
468                .await;
469            for (i, rc) in started.iter().enumerate() {
470                if working[i].exit_code.is_some() {
471                    continue;
472                }
473                let inspect = Command::new(&self.cli)
474                    .args(["inspect", "-f", "{{.State.Running}}", &rc.container_id])
475                    .output()
476                    .await;
477                let running = match inspect {
478                    Ok(out) if out.status.success() => {
479                        String::from_utf8_lossy(&out.stdout).trim() == "true"
480                    }
481                    _ => false,
482                };
483                if running {
484                    continue;
485                }
486                let wait_out = Command::new(&self.cli)
487                    .args(["wait", &rc.container_id])
488                    .output()
489                    .await
490                    .map_err(|e| RuntimeError::Wait(e.to_string()))?;
491                if !wait_out.status.success() {
492                    let err = String::from_utf8_lossy(&wait_out.stderr).to_string();
493                    return Err(RuntimeError::Wait(err));
494                }
495                let exit_code: i64 = String::from_utf8_lossy(&wait_out.stdout)
496                    .trim()
497                    .parse()
498                    .unwrap_or(-1);
499                working[i].exit_code = Some(exit_code);
500                if first_exited.is_none() && (rc.essential || !any_essential) {
501                    first_exited = Some(i);
502                }
503            }
504            if task_should_stop(&working) {
505                let idx = first_exited
506                    .or_else(|| working.iter().position(|c| c.exit_code.is_some()))
507                    .unwrap_or(0);
508                let exit_code = working[idx].exit_code.unwrap_or(-1);
509                return Ok(TaskExitOutcome {
510                    exited_index: Some(idx),
511                    exit_code,
512                    stop_code: if any_essential {
513                        "EssentialContainerExited"
514                    } else {
515                        "TaskCompleted"
516                    },
517                });
518            }
519            sleep(Duration::from_millis(200)).await;
520        }
521    }
522
523    /// Block the launch of a dependent container until its upstream
524    /// reaches the requested `dependsOn[].condition`. We poll
525    /// `docker inspect` at a small interval; the wait is bounded by an
526    /// AWS-style timeout (120s by default — long enough for image
527    /// startup but short enough to surface bugs as a clean
528    /// `ContainerStart` failure).
529    ///
530    /// `upstream_has_health_check` is needed for the `HEALTHY` branch:
531    /// when the upstream has no healthCheck, AWS treats `HEALTHY` as
532    /// immediately satisfied (otherwise the dependent would block
533    /// forever, since docker reports `Health.Status` only when the
534    /// container has a HEALTHCHECK directive).
535    pub(super) async fn wait_for_depends_on(
536        &self,
537        upstream: &RunningContainer,
538        condition: DependsOnCondition,
539        upstream_has_health_check: bool,
540    ) -> Result<(), RuntimeError> {
541        // Bounded wait — chosen to comfortably cover slow init scripts
542        // without letting a wedged dependency stall a task indefinitely.
543        const WAIT_TIMEOUT: Duration = Duration::from_secs(120);
544        const POLL_INTERVAL: Duration = Duration::from_millis(200);
545
546        // HEALTHY against an upstream without a healthCheck: AWS treats
547        // this as immediately satisfied because there's no probe to
548        // observe. Skip the polling loop entirely so the dependent isn't
549        // wedged forever waiting for a status that docker will never set.
550        if matches!(condition, DependsOnCondition::Healthy) && !upstream_has_health_check {
551            return Ok(());
552        }
553
554        let deadline = std::time::Instant::now() + WAIT_TIMEOUT;
555        loop {
556            let inspect = inspect_container_state(&self.cli, &upstream.container_id).await;
557            if let Some(state) = inspect {
558                if condition_is_met(condition, &state) {
559                    return Ok(());
560                }
561                // SUCCESS specifically: if the container exited with a
562                // non-zero code, the gate can never be satisfied. Bail
563                // immediately rather than waiting for the timeout — this
564                // matches ECS's "stoppedReason: dependency failed" path.
565                if matches!(condition, DependsOnCondition::Success)
566                    && state.exited
567                    && state.exit_code != 0
568                {
569                    return Err(RuntimeError::ContainerStart(format!(
570                        "dependency on container {} ({}) failed: upstream exited with code {}",
571                        upstream.name,
572                        DependsOnCondition::Success.as_aws_str(),
573                        state.exit_code,
574                    )));
575                }
576            }
577            if std::time::Instant::now() >= deadline {
578                return Err(RuntimeError::ContainerStart(format!(
579                    "timed out waiting for container {} to reach condition {}",
580                    upstream.name,
581                    condition.as_aws_str(),
582                )));
583            }
584            tokio::time::sleep(POLL_INTERVAL).await;
585        }
586    }
587
588    /// Best-effort cleanup of containers we already started when a later
589    /// container in the task failed to launch. Without this, half-launched
590    /// tasks leak docker containers. `task_id` mirrors the value used at
591    /// network creation so `network rm` targets the right name —
592    /// deriving it from a container_id prefix was wrong (container ids
593    /// are docker-assigned, not task-shaped).
594    pub(super) fn cleanup_partial_start(&self, started: &[RunningContainer], task_id: &str) {
595        let cli = self.cli.clone();
596        let ids: Vec<String> = started.iter().map(|c| c.container_id.clone()).collect();
597        let network = format!("fakecloud-ecs-{task_id}");
598        tokio::spawn(async move {
599            for id in ids {
600                let _ = Command::new(&cli).args(["rm", "-f", &id]).output().await;
601            }
602            let _ = Command::new(&cli)
603                .args(["network", "rm", &network])
604                .output()
605                .await;
606        });
607    }
608
609    /// Kill every container behind a task with the configured stop
610    /// timeout. Returns true if at least one container was killed. Called
611    /// synchronously from `StopTask`; the wait loop in `run_task_inner`
612    /// observes the exits and transitions the task to `STOPPED`.
613    pub async fn stop_task(&self, task_id: &str, reason: &str) -> bool {
614        let containers = self.containers.read().get(task_id).cloned();
615        let Some(list) = containers else {
616            return false;
617        };
618        if list.is_empty() {
619            return false;
620        }
621        // `docker stop` sends SIGTERM then SIGKILL after a timeout.
622        for (_name, id) in &list {
623            let _ = Command::new(&self.cli)
624                .args(["stop", "--time", "10", id])
625                .output()
626                .await;
627        }
628        tracing::info!(task = %task_id, reason = %reason, "ecs task stop requested");
629        true
630    }
631
632    /// Kill every running container the runtime owns. Called on reset /
633    /// shutdown so docker state matches fakecloud state after a fresh
634    /// boot.
635    pub async fn stop_all(&self) {
636        let ids: Vec<String> = self
637            .containers
638            .read()
639            .values()
640            .flat_map(|list| list.iter().map(|(_, id)| id.clone()))
641            .collect();
642        for id in ids {
643            let _ = Command::new(&self.cli).args(["kill", &id]).output().await;
644            let _ = Command::new(&self.cli).args(["rm", &id]).output().await;
645        }
646        self.containers.write().clear();
647    }
648}