fakecloud_ecs/runtime/task_lifecycle.rs
1//! `EcsRuntime` `task_lifecycle` family — extracted from service.rs by audit-2026-05-19.
2
3use super::*;
4
5impl EcsRuntime {
6 /// Spawn the task asynchronously. Returns immediately after transitioning
7 /// the task to `PENDING`; the background task advances it to `RUNNING`
8 /// once the container is created and to `STOPPED` once the container
9 /// exits.
10 pub fn run_task(self: Arc<Self>, state: SharedEcsState, task_id: String, account_id: String) {
11 let rt = self.clone();
12 tokio::spawn(async move {
13 if let Err(err) = rt.run_task_inner(&state, &task_id, &account_id).await {
14 tracing::warn!(%err, task = %task_id, "ecs task execution failed");
15 // Also surface on stderr so nextest's captured-output for a
16 // failed E2E shows the reason instead of just "empty logs".
17 eprintln!("[ecs] task {task_id} failed: {err}");
18 finalize_failure(&state, &account_id, &task_id, &err.to_string());
19 rt.emit_state_change(
20 &state,
21 &account_id,
22 &task_id,
23 "STOPPED",
24 Some(("TaskFailedToStart", err.to_string())),
25 );
26 }
27 });
28 }
29
30 pub async fn run_task_inner(
31 &self,
32 state: &SharedEcsState,
33 task_id: &str,
34 account_id: &str,
35 ) -> Result<(), RuntimeError> {
36 // Build a per-container launch plan up-front so we hold the read
37 // lock once. Each entry carries everything needed to compose a
38 // `docker run` invocation for one container in the task.
39 let plans = build_container_plans(state, account_id, task_id, self.server_port)?;
40 if plans.is_empty() {
41 return Err(RuntimeError::ContainerStart(
42 "task has no containers".into(),
43 ));
44 }
45
46 // Resolve secrets for each plan. Failures fail the whole task to
47 // match real ECS's "failed to retrieve secret" behaviour — there's
48 // no point starting a sidecar when the app container will fail.
49 let mut resolved_plans: Vec<ResolvedContainerPlan> = Vec::with_capacity(plans.len());
50 for plan in plans {
51 let mut env = plan.env.clone();
52 for (name, value_from) in &plan.secrets_refs {
53 match self.resolve_secret(account_id, value_from) {
54 Some(v) => env.push((name.clone(), v)),
55 None => {
56 return Err(RuntimeError::ContainerStart(format!(
57 "failed to resolve secret {name} from {value_from}"
58 )));
59 }
60 }
61 }
62 if plan.has_task_role {
63 env.push((
64 "AWS_CONTAINER_CREDENTIALS_FULL_URI".into(),
65 format!(
66 "http://host.docker.internal:{}/_fakecloud/ecs/creds/{}",
67 self.server_port, task_id
68 ),
69 ));
70 }
71 env.push((
72 "ECS_CONTAINER_METADATA_URI".into(),
73 format!(
74 "http://host.docker.internal:{}/_fakecloud/ecs/v3/{}",
75 self.server_port, task_id
76 ),
77 ));
78 env.push((
79 "ECS_CONTAINER_METADATA_URI_V4".into(),
80 format!(
81 "http://host.docker.internal:{}/_fakecloud/ecs/v4/{}",
82 self.server_port, task_id
83 ),
84 ));
85 resolved_plans.push(ResolvedContainerPlan { plan, env });
86 }
87
88 // Pull every distinct image up-front so a second container's pull
89 // failure surfaces before we leave the first container running.
90 mark_pull_started(state, account_id, task_id);
91 let mut run_images: Vec<String> = Vec::with_capacity(resolved_plans.len());
92 let mut image_digests: Vec<Option<String>> = Vec::with_capacity(resolved_plans.len());
93 for rp in &resolved_plans {
94 let local_pull_uri =
95 fakecloud_core::ecr_uri::translate_to_local(&rp.plan.image, self.server_port);
96 let pull_uri = local_pull_uri.as_deref().unwrap_or(&rp.plan.image);
97 let pull_out = self
98 .cli_command()
99 .args(["pull", pull_uri])
100 .output()
101 .await
102 .map_err(|e| RuntimeError::ImagePull(e.to_string()))?;
103 if !pull_out.status.success() {
104 let err = String::from_utf8_lossy(&pull_out.stderr).to_string();
105 return Err(RuntimeError::ImagePull(err));
106 }
107 // Retag the local pull URI to the AWS URI so `docker run` finds
108 // the image under the user-facing name. Digest-pinned refs
109 // can't be `docker tag` targets, so we fall through and run
110 // under the local URI in that case.
111 let run_image = if let Some(ref local_uri) = local_pull_uri {
112 if fakecloud_core::ecr_uri::is_digest_ref(&rp.plan.image) {
113 local_uri.clone()
114 } else {
115 let _ = self
116 .cli_command()
117 .args(["tag", local_uri, &rp.plan.image])
118 .output()
119 .await;
120 rp.plan.image.clone()
121 }
122 } else {
123 rp.plan.image.clone()
124 };
125 // Best-effort image digest extraction so DescribeTasks emits
126 // the resolved digest the way real ECS does. Failures here
127 // (e.g. CLI without RepoDigests) are silent — digest stays
128 // `None` rather than failing the task.
129 let digest = self.lookup_image_digest(pull_uri).await;
130 run_images.push(run_image);
131 image_digests.push(digest);
132 }
133 mark_pull_stopped(state, account_id, task_id);
134
135 // For awsvpc network mode, create a per-task docker network so
136 // containers share an isolated bridge. Clean it up when the task
137 // stops. Network creation is best-effort: on failure we fall back
138 // to the default bridge and continue.
139 let awsvpc_network = resolved_plans
140 .iter()
141 .any(|rp| rp.plan.network_mode.as_deref() == Some("awsvpc"));
142 let network_name = format!("fakecloud-ecs-{}", task_id);
143 let network_created = if awsvpc_network {
144 let create = Command::new(&self.cli)
145 .args([
146 "network",
147 "create",
148 "--driver",
149 "bridge",
150 "--label",
151 &format!("fakecloud-ecs-task={}", task_id),
152 &network_name,
153 ])
154 .output()
155 .await;
156 match create {
157 Ok(out) if out.status.success() => {
158 tracing::info!(
159 task = %task_id,
160 network = %network_name,
161 "created awsvpc docker network"
162 );
163 true
164 }
165 Ok(out) => {
166 let err = String::from_utf8_lossy(&out.stderr);
167 tracing::warn!(
168 task = %task_id,
169 network = %network_name,
170 error = %err,
171 "awsvpc network creation failed; falling back to default bridge"
172 );
173 false
174 }
175 Err(e) => {
176 tracing::warn!(
177 task = %task_id,
178 network = %network_name,
179 error = %e,
180 "awsvpc network creation failed; falling back to default bridge"
181 );
182 false
183 }
184 }
185 } else {
186 false
187 };
188
189 if network_created {
190 let eni_id = format!(
191 "eni-{}",
192 uuid::Uuid::new_v4()
193 .to_string()
194 .replace('-', "")
195 .get(..17)
196 .unwrap_or("")
197 );
198 let mac = format!(
199 "02:{:02x}:{:02x}:{:02x}:{:02x}:{:02x}",
200 rand::random::<u8>(),
201 rand::random::<u8>(),
202 rand::random::<u8>(),
203 rand::random::<u8>(),
204 rand::random::<u8>()
205 );
206 let ip = format!("10.0.{}.{}", rand::random::<u8>(), rand::random::<u8>());
207 let mut accounts = state.write();
208 if let Some(st) = accounts.get_mut(account_id) {
209 if let Some(task) = st.tasks.get_mut(task_id) {
210 task.attachments.push(crate::state::TaskAttachment {
211 id: eni_id.clone(),
212 attachment_type: "eni".into(),
213 status: "ATTACHED".into(),
214 details: vec![
215 crate::state::AttachmentDetail {
216 name: "subnetId".into(),
217 value: "subnet-fakecloud".into(),
218 },
219 crate::state::AttachmentDetail {
220 name: "privateIPv4Address".into(),
221 value: ip.clone(),
222 },
223 crate::state::AttachmentDetail {
224 name: "macAddress".into(),
225 value: mac.clone(),
226 },
227 ],
228 });
229 }
230 }
231 tracing::info!(
232 task = %task_id,
233 eni = %eni_id,
234 ip = %ip,
235 "populated awsvpc ENI attachment"
236 );
237 }
238
239 // Launch every container detached, in topological order. Before
240 // each `docker run` we honour the dependent's `dependsOn[]` by
241 // polling docker until each upstream container reaches the
242 // requested condition (START/COMPLETE/SUCCESS/HEALTHY). If any
243 // fails to start (or an upstream gate times out), kill the
244 // already-started containers and bail — partial-launch state is
245 // harder to reason about than a clean failure.
246 let mut started: Vec<RunningContainer> = Vec::with_capacity(resolved_plans.len());
247 for (idx, (rp, run_image)) in resolved_plans.iter().zip(run_images.iter()).enumerate() {
248 // Wait for every dependsOn[] entry on this container. Upstreams
249 // declared in the same task always show up earlier in the
250 // launch order thanks to topo_sort_plans, so we only ever look
251 // backwards into `started`.
252 for dep in &rp.plan.depends_on {
253 let upstream = match started.iter().find(|c| c.name == dep.container_name) {
254 Some(u) => u,
255 // Upstream not in this task definition (we ignored it
256 // during topo-sort too). Skip the gate — this matches
257 // the existing "ignore unknown dependency" behaviour.
258 None => continue,
259 };
260 // Whether the upstream has a healthCheck configured —
261 // governs the HEALTHY shortcut: AWS treats HEALTHY as
262 // immediately satisfied when the upstream has no probe.
263 let upstream_has_health_check = resolved_plans
264 .iter()
265 .find(|p| p.plan.container_name == dep.container_name)
266 .is_some_and(|p| p.plan.health_check.is_some());
267 if let Err(err) = self
268 .wait_for_depends_on(upstream, dep.condition, upstream_has_health_check)
269 .await
270 {
271 self.cleanup_partial_start(&started, task_id);
272 return Err(err);
273 }
274 }
275 let argv = build_run_argv(
276 &rp.plan,
277 &rp.env,
278 task_id,
279 &self.host_ip,
280 run_image,
281 network_created,
282 );
283 let mut cmd = Command::new(&self.cli);
284 cmd.args(&argv);
285 let run_out = cmd.output().await.map_err(|e| {
286 // Cleanup already-started containers on launch failure.
287 self.cleanup_partial_start(&started, task_id);
288 RuntimeError::ContainerStart(e.to_string())
289 })?;
290 if !run_out.status.success() {
291 let err = String::from_utf8_lossy(&run_out.stderr).to_string();
292 self.cleanup_partial_start(&started, task_id);
293 return Err(RuntimeError::ContainerStart(err));
294 }
295 let container_id = String::from_utf8_lossy(&run_out.stdout).trim().to_string();
296 started.push(RunningContainer {
297 name: rp.plan.container_name.clone(),
298 container_id,
299 essential: rp.plan.essential,
300 exit_code: None,
301 network_bindings: network_bindings_for(&rp.plan),
302 image_digest: image_digests.get(idx).cloned().unwrap_or(None),
303 });
304 }
305
306 // Stash all (name, container_id) pairs so StopTask/stop_all can
307 // reach every container backing this task.
308 {
309 let mut guard = self.containers.write();
310 guard.insert(
311 task_id.to_string(),
312 started
313 .iter()
314 .map(|c| (c.name.clone(), c.container_id.clone()))
315 .collect(),
316 );
317 }
318 mark_running_multi(state, account_id, task_id, &started);
319 self.register_lb_targets(state, account_id, task_id);
320 self.emit_state_change(state, account_id, task_id, "RUNNING", None);
321
322 // Wait for the first essential container (or, if none are
323 // essential, any container) to exit. ECS task lifetime is
324 // bounded by the first essential exit, after which all remaining
325 // containers are stopped. While polling we also refresh each
326 // container's `healthStatus` from `docker inspect` so
327 // DescribeTasks reflects HEALTHCHECK transitions in near real
328 // time.
329 let wait_outcome = self
330 .wait_for_task_exit_with_health(state, account_id, task_id, &started)
331 .await?;
332
333 // Stop and reap any sidecars still running. Best-effort — failures
334 // here shouldn't keep the task from transitioning to STOPPED.
335 let mut final_containers = started.clone();
336 for (i, rc) in started.iter().enumerate() {
337 if Some(i) == wait_outcome.exited_index {
338 final_containers[i].exit_code = Some(wait_outcome.exit_code);
339 continue;
340 }
341 // Try to grab the exit code if the container already exited
342 // on its own (non-essential exits don't stop the task), then
343 // fall back to `docker stop` for stragglers.
344 let inspect = Command::new(&self.cli)
345 .args(["inspect", "-f", "{{.State.ExitCode}}", &rc.container_id])
346 .output()
347 .await;
348 let still_running = match inspect {
349 Ok(out) if out.status.success() => {
350 let s = String::from_utf8_lossy(&out.stdout).trim().to_string();
351 // `docker inspect` returns 0 for not-yet-exited
352 // containers, so we additionally check `State.Running`.
353 let running = Command::new(&self.cli)
354 .args(["inspect", "-f", "{{.State.Running}}", &rc.container_id])
355 .output()
356 .await
357 .map(|o| String::from_utf8_lossy(&o.stdout).trim() == "true")
358 .unwrap_or(false);
359 if !running {
360 if let Ok(code) = s.parse::<i64>() {
361 final_containers[i].exit_code = Some(code);
362 }
363 }
364 running
365 }
366 _ => false,
367 };
368 if still_running {
369 let _ = Command::new(&self.cli)
370 .args(["stop", "--time", "10", &rc.container_id])
371 .output()
372 .await;
373 let wait_out = Command::new(&self.cli)
374 .args(["wait", &rc.container_id])
375 .output()
376 .await;
377 if let Ok(out) = wait_out {
378 let code: i64 = String::from_utf8_lossy(&out.stdout)
379 .trim()
380 .parse()
381 .unwrap_or(-1);
382 final_containers[i].exit_code = Some(code);
383 }
384 }
385 }
386
387 // Capture combined stdout+stderr from every container so the
388 // introspection endpoint shows logs from sidecars too.
389 let mut captured = String::new();
390 for rc in &started {
391 let logs_out = Command::new(&self.cli)
392 .args(["logs", &rc.container_id])
393 .output()
394 .await
395 .map_err(|e| RuntimeError::Wait(e.to_string()))?;
396 captured.push_str(&format!("[{}] ", rc.name));
397 captured.push_str(&String::from_utf8_lossy(&logs_out.stdout));
398 captured.push_str(&String::from_utf8_lossy(&logs_out.stderr));
399 }
400
401 // Reap every container we own.
402 for rc in &started {
403 let _ = Command::new(&self.cli)
404 .args(["rm", "-f", &rc.container_id])
405 .output()
406 .await;
407 }
408 // Clean up the per-task docker network for awsvpc.
409 if network_created {
410 let _ = Command::new(&self.cli)
411 .args(["network", "rm", &network_name])
412 .output()
413 .await;
414 }
415 self.containers.write().remove(task_id);
416
417 // Forward logs BEFORE flipping the task to STOPPED so a client
418 // that polls DescribeTasks and immediately queries
419 // DescribeLogStreams can't observe the STOPPED transition before
420 // the awslogs group/stream has been materialised.
421 self.forward_awslogs_if_configured(state, account_id, task_id, &captured);
422 let exit_code = wait_outcome.exit_code;
423 finalize_stopped_multi(
424 state,
425 account_id,
426 task_id,
427 &final_containers,
428 exit_code,
429 &captured,
430 wait_outcome.stop_code,
431 None,
432 );
433 self.deregister_lb_targets(state, account_id, task_id);
434 self.emit_state_change(
435 state,
436 account_id,
437 task_id,
438 "STOPPED",
439 Some((wait_outcome.stop_code, format!("Exit code {}", exit_code))),
440 );
441 Ok(())
442 }
443
444 /// Wait for the task to reach a stop condition (any essential
445 /// container exits, or every container exits when none are
446 /// essential) while also polling `docker inspect .State.Health.Status`
447 /// on every iteration to push the latest `healthStatus` onto each
448 /// task container — so DescribeTasks shows live HEALTHCHECK
449 /// transitions instead of the boot-time `UNKNOWN`. Returns the
450 /// index into `started` of the container whose exit determined the
451 /// task lifetime, its exit code, and the stopCode.
452 pub(super) async fn wait_for_task_exit_with_health(
453 &self,
454 state: &SharedEcsState,
455 account_id: &str,
456 task_id: &str,
457 started: &[RunningContainer],
458 ) -> Result<TaskExitOutcome, RuntimeError> {
459 let any_essential = started.iter().any(|c| c.essential);
460 let mut working: Vec<RunningContainer> = started.to_vec();
461 let mut first_exited: Option<usize> = None;
462 loop {
463 // Refresh health status before checking exits so a container
464 // that goes UNHEALTHY -> exits in the same iteration leaves
465 // its final health state on the task before we transition to
466 // STOPPED.
467 self.refresh_health_status(state, account_id, task_id, started)
468 .await;
469 for (i, rc) in started.iter().enumerate() {
470 if working[i].exit_code.is_some() {
471 continue;
472 }
473 let inspect = Command::new(&self.cli)
474 .args(["inspect", "-f", "{{.State.Running}}", &rc.container_id])
475 .output()
476 .await;
477 let running = match inspect {
478 Ok(out) if out.status.success() => {
479 String::from_utf8_lossy(&out.stdout).trim() == "true"
480 }
481 _ => false,
482 };
483 if running {
484 continue;
485 }
486 let wait_out = Command::new(&self.cli)
487 .args(["wait", &rc.container_id])
488 .output()
489 .await
490 .map_err(|e| RuntimeError::Wait(e.to_string()))?;
491 if !wait_out.status.success() {
492 let err = String::from_utf8_lossy(&wait_out.stderr).to_string();
493 return Err(RuntimeError::Wait(err));
494 }
495 let exit_code: i64 = String::from_utf8_lossy(&wait_out.stdout)
496 .trim()
497 .parse()
498 .unwrap_or(-1);
499 working[i].exit_code = Some(exit_code);
500 if first_exited.is_none() && (rc.essential || !any_essential) {
501 first_exited = Some(i);
502 }
503 }
504 if task_should_stop(&working) {
505 let idx = first_exited
506 .or_else(|| working.iter().position(|c| c.exit_code.is_some()))
507 .unwrap_or(0);
508 let exit_code = working[idx].exit_code.unwrap_or(-1);
509 return Ok(TaskExitOutcome {
510 exited_index: Some(idx),
511 exit_code,
512 stop_code: if any_essential {
513 "EssentialContainerExited"
514 } else {
515 "TaskCompleted"
516 },
517 });
518 }
519 sleep(Duration::from_millis(200)).await;
520 }
521 }
522
523 /// Block the launch of a dependent container until its upstream
524 /// reaches the requested `dependsOn[].condition`. We poll
525 /// `docker inspect` at a small interval; the wait is bounded by an
526 /// AWS-style timeout (120s by default — long enough for image
527 /// startup but short enough to surface bugs as a clean
528 /// `ContainerStart` failure).
529 ///
530 /// `upstream_has_health_check` is needed for the `HEALTHY` branch:
531 /// when the upstream has no healthCheck, AWS treats `HEALTHY` as
532 /// immediately satisfied (otherwise the dependent would block
533 /// forever, since docker reports `Health.Status` only when the
534 /// container has a HEALTHCHECK directive).
535 pub(super) async fn wait_for_depends_on(
536 &self,
537 upstream: &RunningContainer,
538 condition: DependsOnCondition,
539 upstream_has_health_check: bool,
540 ) -> Result<(), RuntimeError> {
541 // Bounded wait — chosen to comfortably cover slow init scripts
542 // without letting a wedged dependency stall a task indefinitely.
543 const WAIT_TIMEOUT: Duration = Duration::from_secs(120);
544 const POLL_INTERVAL: Duration = Duration::from_millis(200);
545
546 // HEALTHY against an upstream without a healthCheck: AWS treats
547 // this as immediately satisfied because there's no probe to
548 // observe. Skip the polling loop entirely so the dependent isn't
549 // wedged forever waiting for a status that docker will never set.
550 if matches!(condition, DependsOnCondition::Healthy) && !upstream_has_health_check {
551 return Ok(());
552 }
553
554 let deadline = std::time::Instant::now() + WAIT_TIMEOUT;
555 loop {
556 let inspect = inspect_container_state(&self.cli, &upstream.container_id).await;
557 if let Some(state) = inspect {
558 if condition_is_met(condition, &state) {
559 return Ok(());
560 }
561 // SUCCESS specifically: if the container exited with a
562 // non-zero code, the gate can never be satisfied. Bail
563 // immediately rather than waiting for the timeout — this
564 // matches ECS's "stoppedReason: dependency failed" path.
565 if matches!(condition, DependsOnCondition::Success)
566 && state.exited
567 && state.exit_code != 0
568 {
569 return Err(RuntimeError::ContainerStart(format!(
570 "dependency on container {} ({}) failed: upstream exited with code {}",
571 upstream.name,
572 DependsOnCondition::Success.as_aws_str(),
573 state.exit_code,
574 )));
575 }
576 }
577 if std::time::Instant::now() >= deadline {
578 return Err(RuntimeError::ContainerStart(format!(
579 "timed out waiting for container {} to reach condition {}",
580 upstream.name,
581 condition.as_aws_str(),
582 )));
583 }
584 tokio::time::sleep(POLL_INTERVAL).await;
585 }
586 }
587
588 /// Best-effort cleanup of containers we already started when a later
589 /// container in the task failed to launch. Without this, half-launched
590 /// tasks leak docker containers. `task_id` mirrors the value used at
591 /// network creation so `network rm` targets the right name —
592 /// deriving it from a container_id prefix was wrong (container ids
593 /// are docker-assigned, not task-shaped).
594 pub(super) fn cleanup_partial_start(&self, started: &[RunningContainer], task_id: &str) {
595 let cli = self.cli.clone();
596 let ids: Vec<String> = started.iter().map(|c| c.container_id.clone()).collect();
597 let network = format!("fakecloud-ecs-{task_id}");
598 tokio::spawn(async move {
599 for id in ids {
600 let _ = Command::new(&cli).args(["rm", "-f", &id]).output().await;
601 }
602 let _ = Command::new(&cli)
603 .args(["network", "rm", &network])
604 .output()
605 .await;
606 });
607 }
608
609 /// Kill every container behind a task with the configured stop
610 /// timeout. Returns true if at least one container was killed. Called
611 /// synchronously from `StopTask`; the wait loop in `run_task_inner`
612 /// observes the exits and transitions the task to `STOPPED`.
613 pub async fn stop_task(&self, task_id: &str, reason: &str) -> bool {
614 let containers = self.containers.read().get(task_id).cloned();
615 let Some(list) = containers else {
616 return false;
617 };
618 if list.is_empty() {
619 return false;
620 }
621 // `docker stop` sends SIGTERM then SIGKILL after a timeout.
622 for (_name, id) in &list {
623 let _ = Command::new(&self.cli)
624 .args(["stop", "--time", "10", id])
625 .output()
626 .await;
627 }
628 tracing::info!(task = %task_id, reason = %reason, "ecs task stop requested");
629 true
630 }
631
632 /// Kill every running container the runtime owns. Called on reset /
633 /// shutdown so docker state matches fakecloud state after a fresh
634 /// boot.
635 pub async fn stop_all(&self) {
636 let ids: Vec<String> = self
637 .containers
638 .read()
639 .values()
640 .flat_map(|list| list.iter().map(|(_, id)| id.clone()))
641 .collect();
642 for id in ids {
643 let _ = Command::new(&self.cli).args(["kill", &id]).output().await;
644 let _ = Command::new(&self.cli).args(["rm", &id]).output().await;
645 }
646 self.containers.write().clear();
647 }
648}