fakecloud_ecs/runtime/task_lifecycle.rs
1//! `EcsRuntime` `task_lifecycle` family — extracted from service.rs by audit-2026-05-19.
2
3use super::*;
4
5impl EcsRuntime {
6 /// Spawn the task asynchronously. Returns immediately after transitioning
7 /// the task to `PENDING`; the background task advances it to `RUNNING`
8 /// once the container is created and to `STOPPED` once the container
9 /// exits.
10 pub fn run_task(self: Arc<Self>, state: SharedEcsState, task_id: String, account_id: String) {
11 let rt = self.clone();
12 tokio::spawn(async move {
13 if let Err(err) = rt.run_task_inner(&state, &task_id, &account_id).await {
14 tracing::warn!(%err, task = %task_id, "ecs task execution failed");
15 // Also surface on stderr so nextest's captured-output for a
16 // failed E2E shows the reason instead of just "empty logs".
17 eprintln!("[ecs] task {task_id} failed: {err}");
18 finalize_failure(&state, &account_id, &task_id, &err.to_string());
19 rt.emit_state_change(
20 &state,
21 &account_id,
22 &task_id,
23 "STOPPED",
24 Some(("TaskFailedToStart", err.to_string())),
25 );
26 }
27 });
28 }
29
30 pub async fn run_task_inner(
31 &self,
32 state: &SharedEcsState,
33 task_id: &str,
34 account_id: &str,
35 ) -> Result<(), RuntimeError> {
36 if self.k8s.is_some() {
37 return self.k8s_run_task_inner(state, task_id, account_id).await;
38 }
39 // Build a per-container launch plan up-front so we hold the read
40 // lock once. Each entry carries everything needed to compose a
41 // `docker run` invocation for one container in the task.
42 let plans = build_container_plans(state, account_id, task_id, self.server_port)?;
43 if plans.is_empty() {
44 return Err(RuntimeError::ContainerStart(
45 "task has no containers".into(),
46 ));
47 }
48
49 // Resolve secrets for each plan. Failures fail the whole task to
50 // match real ECS's "failed to retrieve secret" behaviour — there's
51 // no point starting a sidecar when the app container will fail.
52 let mut resolved_plans: Vec<ResolvedContainerPlan> = Vec::with_capacity(plans.len());
53 for plan in plans {
54 let mut env = plan.env.clone();
55 for (name, value_from) in &plan.secrets_refs {
56 match self.resolve_secret(account_id, value_from) {
57 Some(v) => env.push((name.clone(), v)),
58 None => {
59 return Err(RuntimeError::ContainerStart(format!(
60 "failed to resolve secret {name} from {value_from}"
61 )));
62 }
63 }
64 }
65 // The agent/metadata endpoints live on fakecloud (the host);
66 // the container reaches them via the platform host alias —
67 // `host.docker.internal` for docker, `host.containers.internal`
68 // for podman (issue #1539).
69 let host_alias = &self.net.host_alias;
70 if plan.has_task_role {
71 env.push((
72 "AWS_CONTAINER_CREDENTIALS_FULL_URI".into(),
73 format!(
74 "http://{host_alias}:{}/_fakecloud/ecs/creds/{}",
75 self.server_port, task_id
76 ),
77 ));
78 }
79 env.push((
80 "ECS_CONTAINER_METADATA_URI".into(),
81 format!(
82 "http://{host_alias}:{}/_fakecloud/ecs/v3/{}",
83 self.server_port, task_id
84 ),
85 ));
86 env.push((
87 "ECS_CONTAINER_METADATA_URI_V4".into(),
88 format!(
89 "http://{host_alias}:{}/_fakecloud/ecs/v4/{}",
90 self.server_port, task_id
91 ),
92 ));
93 resolved_plans.push(ResolvedContainerPlan { plan, env });
94 }
95
96 // Pull every distinct image up-front so a second container's pull
97 // failure surfaces before we leave the first container running.
98 mark_pull_started(state, account_id, task_id);
99 let mut run_images: Vec<String> = Vec::with_capacity(resolved_plans.len());
100 let mut image_digests: Vec<Option<String>> = Vec::with_capacity(resolved_plans.len());
101 for rp in &resolved_plans {
102 // Rewrite ECR URIs to fakecloud's local registry at the sibling
103 // host (`127.0.0.1` on the host, `host.docker.internal` when
104 // fakecloud is containerized) so the daemon/sibling can reach
105 // fakecloud's published registry port (issue #1539, bug 0.8).
106 let local_pull_uri = fakecloud_core::ecr_uri::translate_to_local_at(
107 &rp.plan.image,
108 &self.net.sibling_host,
109 self.server_port,
110 );
111 let pull_uri = local_pull_uri.as_deref().unwrap_or(&rp.plan.image);
112 let pull_out = self
113 .cli_command()
114 .args(["pull", pull_uri])
115 .output()
116 .await
117 .map_err(|e| RuntimeError::ImagePull(e.to_string()))?;
118 if !pull_out.status.success() {
119 let err = String::from_utf8_lossy(&pull_out.stderr).to_string();
120 return Err(RuntimeError::ImagePull(err));
121 }
122 // Retag the local pull URI to the AWS URI so `docker run` finds
123 // the image under the user-facing name. Digest-pinned refs
124 // can't be `docker tag` targets, so we fall through and run
125 // under the local URI in that case.
126 let run_image = if let Some(ref local_uri) = local_pull_uri {
127 if fakecloud_core::ecr_uri::is_digest_ref(&rp.plan.image) {
128 local_uri.clone()
129 } else {
130 let _ = self
131 .cli_command()
132 .args(["tag", local_uri, &rp.plan.image])
133 .output()
134 .await;
135 rp.plan.image.clone()
136 }
137 } else {
138 rp.plan.image.clone()
139 };
140 // Best-effort image digest extraction so DescribeTasks emits
141 // the resolved digest the way real ECS does. Failures here
142 // (e.g. CLI without RepoDigests) are silent — digest stays
143 // `None` rather than failing the task.
144 let digest = self.lookup_image_digest(pull_uri).await;
145 run_images.push(run_image);
146 image_digests.push(digest);
147 }
148 mark_pull_stopped(state, account_id, task_id);
149
150 // For awsvpc network mode, create a per-task docker network so
151 // containers share an isolated bridge. Clean it up when the task
152 // stops. Network creation is best-effort: on failure we fall back
153 // to the default bridge and continue.
154 let awsvpc_network = resolved_plans
155 .iter()
156 .any(|rp| rp.plan.network_mode.as_deref() == Some("awsvpc"));
157 let network_name = format!("fakecloud-ecs-{}", task_id);
158 let network_created = if awsvpc_network {
159 let create = Command::new(&self.cli)
160 .args([
161 "network",
162 "create",
163 "--driver",
164 "bridge",
165 "--label",
166 &format!("fakecloud-ecs-task={}", task_id),
167 // Ownership label so the startup reaper can prune the
168 // per-task awsvpc network after an ungraceful restart,
169 // matching the container label.
170 "--label",
171 &super::fakecloud_instance_label(),
172 &network_name,
173 ])
174 .output()
175 .await;
176 match create {
177 Ok(out) if out.status.success() => {
178 tracing::info!(
179 task = %task_id,
180 network = %network_name,
181 "created awsvpc docker network"
182 );
183 true
184 }
185 Ok(out) => {
186 let err = String::from_utf8_lossy(&out.stderr);
187 tracing::warn!(
188 task = %task_id,
189 network = %network_name,
190 error = %err,
191 "awsvpc network creation failed; falling back to default bridge"
192 );
193 false
194 }
195 Err(e) => {
196 tracing::warn!(
197 task = %task_id,
198 network = %network_name,
199 error = %e,
200 "awsvpc network creation failed; falling back to default bridge"
201 );
202 false
203 }
204 }
205 } else {
206 false
207 };
208
209 if network_created {
210 let eni_id = format!(
211 "eni-{}",
212 uuid::Uuid::new_v4()
213 .to_string()
214 .replace('-', "")
215 .get(..17)
216 .unwrap_or("")
217 );
218 let mac = format!(
219 "02:{:02x}:{:02x}:{:02x}:{:02x}:{:02x}",
220 rand::random::<u8>(),
221 rand::random::<u8>(),
222 rand::random::<u8>(),
223 rand::random::<u8>(),
224 rand::random::<u8>()
225 );
226 let ip = format!("10.0.{}.{}", rand::random::<u8>(), rand::random::<u8>());
227 let mut accounts = state.write();
228 if let Some(st) = accounts.get_mut(account_id) {
229 if let Some(task) = st.tasks.get_mut(task_id) {
230 task.attachments.push(crate::state::TaskAttachment {
231 id: eni_id.clone(),
232 attachment_type: "eni".into(),
233 status: "ATTACHED".into(),
234 details: vec![
235 crate::state::AttachmentDetail {
236 name: "subnetId".into(),
237 value: "subnet-fakecloud".into(),
238 },
239 crate::state::AttachmentDetail {
240 name: "privateIPv4Address".into(),
241 value: ip.clone(),
242 },
243 crate::state::AttachmentDetail {
244 name: "macAddress".into(),
245 value: mac.clone(),
246 },
247 ],
248 });
249 }
250 }
251 tracing::info!(
252 task = %task_id,
253 eni = %eni_id,
254 ip = %ip,
255 "populated awsvpc ENI attachment"
256 );
257 }
258
259 // Launch every container detached, in topological order. Before
260 // each `docker run` we honour the dependent's `dependsOn[]` by
261 // polling docker until each upstream container reaches the
262 // requested condition (START/COMPLETE/SUCCESS/HEALTHY). If any
263 // fails to start (or an upstream gate times out), kill the
264 // already-started containers and bail — partial-launch state is
265 // harder to reason about than a clean failure.
266 // Register the task in `self.containers` BEFORE the launch loop so a
267 // concurrent StopTask / scale-down / DeleteService that arrives
268 // during the multi-second pull+run window can find the entry and stop
269 // whatever has been launched so far. (`stop_task` is a no-op when the
270 // key is absent — the orphan-on-race bug.) We append each container
271 // id to this entry as it starts, mirroring the k8s backend which
272 // registers the Pod name before `create_pod`.
273 self.containers
274 .write()
275 .entry(task_id.to_string())
276 .or_default();
277 let mut started: Vec<RunningContainer> = Vec::with_capacity(resolved_plans.len());
278 for (idx, (rp, run_image)) in resolved_plans.iter().zip(run_images.iter()).enumerate() {
279 // Wait for every dependsOn[] entry on this container. Upstreams
280 // declared in the same task always show up earlier in the
281 // launch order thanks to topo_sort_plans, so we only ever look
282 // backwards into `started`.
283 for dep in &rp.plan.depends_on {
284 let upstream = match started.iter().find(|c| c.name == dep.container_name) {
285 Some(u) => u,
286 // Upstream not in this task definition (we ignored it
287 // during topo-sort too). Skip the gate — this matches
288 // the existing "ignore unknown dependency" behaviour.
289 None => continue,
290 };
291 // Whether the upstream has a healthCheck configured —
292 // governs the HEALTHY shortcut: AWS treats HEALTHY as
293 // immediately satisfied when the upstream has no probe.
294 let upstream_has_health_check = resolved_plans
295 .iter()
296 .find(|p| p.plan.container_name == dep.container_name)
297 .is_some_and(|p| p.plan.health_check.is_some());
298 if let Err(err) = self
299 .wait_for_depends_on(upstream, dep.condition, upstream_has_health_check)
300 .await
301 {
302 self.cleanup_partial_start(&started, task_id);
303 return Err(err);
304 }
305 }
306 let argv = build_run_argv(
307 &rp.plan,
308 &rp.env,
309 task_id,
310 &self.net.host_alias,
311 self.net.add_host_arg.as_deref(),
312 run_image,
313 network_created,
314 );
315 let mut cmd = Command::new(&self.cli);
316 cmd.args(&argv);
317 let run_out = cmd.output().await.map_err(|e| {
318 // Cleanup already-started containers on launch failure.
319 self.cleanup_partial_start(&started, task_id);
320 RuntimeError::ContainerStart(e.to_string())
321 })?;
322 if !run_out.status.success() {
323 let err = String::from_utf8_lossy(&run_out.stderr).to_string();
324 self.cleanup_partial_start(&started, task_id);
325 return Err(RuntimeError::ContainerStart(err));
326 }
327 let container_id = String::from_utf8_lossy(&run_out.stdout).trim().to_string();
328 // Append to the runtime map immediately so a StopTask landing
329 // between this container and the next one in the launch loop can
330 // reach it.
331 self.containers
332 .write()
333 .entry(task_id.to_string())
334 .or_default()
335 .push((rp.plan.container_name.clone(), container_id.clone()));
336 started.push(RunningContainer {
337 name: rp.plan.container_name.clone(),
338 container_id,
339 essential: rp.plan.essential,
340 exit_code: None,
341 network_bindings: network_bindings_for(&rp.plan),
342 image_digest: image_digests.get(idx).cloned().unwrap_or(None),
343 });
344 }
345
346 // A StopTask / scale-down / DeleteService that raced the launch may
347 // have flipped this task's desired_status to STOPPED while we were
348 // pulling/running (and may have already issued `docker stop` against
349 // the containers it could see in the runtime map). If so, stop every
350 // container we launched and finalize as a clean stop instead of
351 // marking the task RUNNING with a live workload the user asked to
352 // kill. Mirrors the k8s backend, which observes a 404 from the
353 // StopTask-deleted Pod and finalizes the same way.
354 if task_desired_stopped(state, account_id, task_id) {
355 for rc in &started {
356 let _ = Command::new(&self.cli)
357 .args(["stop", "--time", "10", &rc.container_id])
358 .output()
359 .await;
360 let _ = Command::new(&self.cli)
361 .args(["rm", "-f", &rc.container_id])
362 .output()
363 .await;
364 }
365 if network_created {
366 let _ = Command::new(&self.cli)
367 .args(["network", "rm", &network_name])
368 .output()
369 .await;
370 }
371 self.containers.write().remove(task_id);
372 let stopped: Vec<RunningContainer> = started
373 .iter()
374 .map(|rc| RunningContainer {
375 exit_code: Some(rc.exit_code.unwrap_or(137)),
376 ..rc.clone()
377 })
378 .collect();
379 finalize_stopped_multi(
380 state,
381 account_id,
382 task_id,
383 &stopped,
384 137,
385 "",
386 "UserInitiated",
387 Some("Task stopped during launch".into()),
388 );
389 self.deregister_lb_targets(state, account_id, task_id);
390 self.emit_state_change(
391 state,
392 account_id,
393 task_id,
394 "STOPPED",
395 Some(("UserInitiated", "Task stopped during launch".into())),
396 );
397 return Ok(());
398 }
399
400 mark_running_multi(state, account_id, task_id, &started);
401 self.register_lb_targets(state, account_id, task_id);
402 self.emit_state_change(state, account_id, task_id, "RUNNING", None);
403
404 // Wait for the first essential container (or, if none are
405 // essential, any container) to exit. ECS task lifetime is
406 // bounded by the first essential exit, after which all remaining
407 // containers are stopped. While polling we also refresh each
408 // container's `healthStatus` from `docker inspect` so
409 // DescribeTasks reflects HEALTHCHECK transitions in near real
410 // time.
411 let wait_outcome = self
412 .wait_for_task_exit_with_health(state, account_id, task_id, &started)
413 .await?;
414
415 // Stop and reap any sidecars still running. Best-effort — failures
416 // here shouldn't keep the task from transitioning to STOPPED.
417 let mut final_containers = started.clone();
418 for (i, rc) in started.iter().enumerate() {
419 if Some(i) == wait_outcome.exited_index {
420 final_containers[i].exit_code = Some(wait_outcome.exit_code);
421 continue;
422 }
423 // Try to grab the exit code if the container already exited
424 // on its own (non-essential exits don't stop the task), then
425 // fall back to `docker stop` for stragglers.
426 let inspect = Command::new(&self.cli)
427 .args(["inspect", "-f", "{{.State.ExitCode}}", &rc.container_id])
428 .output()
429 .await;
430 let still_running = match inspect {
431 Ok(out) if out.status.success() => {
432 let s = String::from_utf8_lossy(&out.stdout).trim().to_string();
433 // `docker inspect` returns 0 for not-yet-exited
434 // containers, so we additionally check `State.Running`.
435 let running = Command::new(&self.cli)
436 .args(["inspect", "-f", "{{.State.Running}}", &rc.container_id])
437 .output()
438 .await
439 .map(|o| String::from_utf8_lossy(&o.stdout).trim() == "true")
440 .unwrap_or(false);
441 if !running {
442 if let Ok(code) = s.parse::<i64>() {
443 final_containers[i].exit_code = Some(code);
444 }
445 }
446 running
447 }
448 _ => false,
449 };
450 if still_running {
451 let _ = Command::new(&self.cli)
452 .args(["stop", "--time", "10", &rc.container_id])
453 .output()
454 .await;
455 let wait_out = Command::new(&self.cli)
456 .args(["wait", &rc.container_id])
457 .output()
458 .await;
459 if let Ok(out) = wait_out {
460 let code: i64 = String::from_utf8_lossy(&out.stdout)
461 .trim()
462 .parse()
463 .unwrap_or(-1);
464 final_containers[i].exit_code = Some(code);
465 }
466 }
467 }
468
469 // Capture combined stdout+stderr from every container so the
470 // introspection endpoint shows logs from sidecars too.
471 let mut captured = String::new();
472 for rc in &started {
473 let logs_out = Command::new(&self.cli)
474 .args(["logs", &rc.container_id])
475 .output()
476 .await
477 .map_err(|e| RuntimeError::Wait(e.to_string()))?;
478 captured.push_str(&format!("[{}] ", rc.name));
479 captured.push_str(&String::from_utf8_lossy(&logs_out.stdout));
480 captured.push_str(&String::from_utf8_lossy(&logs_out.stderr));
481 }
482
483 // Reap every container we own.
484 for rc in &started {
485 let _ = Command::new(&self.cli)
486 .args(["rm", "-f", &rc.container_id])
487 .output()
488 .await;
489 }
490 // Clean up the per-task docker network for awsvpc.
491 if network_created {
492 let _ = Command::new(&self.cli)
493 .args(["network", "rm", &network_name])
494 .output()
495 .await;
496 }
497 self.containers.write().remove(task_id);
498
499 // Forward logs BEFORE flipping the task to STOPPED so a client
500 // that polls DescribeTasks and immediately queries
501 // DescribeLogStreams can't observe the STOPPED transition before
502 // the awslogs group/stream has been materialised.
503 self.forward_awslogs_if_configured(state, account_id, task_id, &captured);
504 let exit_code = wait_outcome.exit_code;
505 finalize_stopped_multi(
506 state,
507 account_id,
508 task_id,
509 &final_containers,
510 exit_code,
511 &captured,
512 wait_outcome.stop_code,
513 None,
514 );
515 self.deregister_lb_targets(state, account_id, task_id);
516 self.emit_state_change(
517 state,
518 account_id,
519 task_id,
520 "STOPPED",
521 Some((wait_outcome.stop_code, format!("Exit code {}", exit_code))),
522 );
523 Ok(())
524 }
525
526 /// Wait for the task to reach a stop condition (any essential
527 /// container exits, or every container exits when none are
528 /// essential) while also polling `docker inspect .State.Health.Status`
529 /// on every iteration to push the latest `healthStatus` onto each
530 /// task container — so DescribeTasks shows live HEALTHCHECK
531 /// transitions instead of the boot-time `UNKNOWN`. Returns the
532 /// index into `started` of the container whose exit determined the
533 /// task lifetime, its exit code, and the stopCode.
534 pub(super) async fn wait_for_task_exit_with_health(
535 &self,
536 state: &SharedEcsState,
537 account_id: &str,
538 task_id: &str,
539 started: &[RunningContainer],
540 ) -> Result<TaskExitOutcome, RuntimeError> {
541 let any_essential = started.iter().any(|c| c.essential);
542 let mut working: Vec<RunningContainer> = started.to_vec();
543 let mut first_exited: Option<usize> = None;
544 loop {
545 // Refresh health status before checking exits so a container
546 // that goes UNHEALTHY -> exits in the same iteration leaves
547 // its final health state on the task before we transition to
548 // STOPPED.
549 self.refresh_health_status(state, account_id, task_id, started)
550 .await;
551 for (i, rc) in started.iter().enumerate() {
552 if working[i].exit_code.is_some() {
553 continue;
554 }
555 let inspect = Command::new(&self.cli)
556 .args(["inspect", "-f", "{{.State.Running}}", &rc.container_id])
557 .output()
558 .await;
559 let running = match inspect {
560 Ok(out) if out.status.success() => {
561 String::from_utf8_lossy(&out.stdout).trim() == "true"
562 }
563 _ => false,
564 };
565 if running {
566 continue;
567 }
568 let wait_out = Command::new(&self.cli)
569 .args(["wait", &rc.container_id])
570 .output()
571 .await
572 .map_err(|e| RuntimeError::Wait(e.to_string()))?;
573 if !wait_out.status.success() {
574 let err = String::from_utf8_lossy(&wait_out.stderr).to_string();
575 return Err(RuntimeError::Wait(err));
576 }
577 let exit_code: i64 = String::from_utf8_lossy(&wait_out.stdout)
578 .trim()
579 .parse()
580 .unwrap_or(-1);
581 working[i].exit_code = Some(exit_code);
582 if first_exited.is_none() && (rc.essential || !any_essential) {
583 first_exited = Some(i);
584 }
585 }
586 if task_should_stop(&working) {
587 let idx = first_exited
588 .or_else(|| working.iter().position(|c| c.exit_code.is_some()))
589 .unwrap_or(0);
590 let exit_code = working[idx].exit_code.unwrap_or(-1);
591 return Ok(TaskExitOutcome {
592 exited_index: Some(idx),
593 exit_code,
594 stop_code: if any_essential {
595 "EssentialContainerExited"
596 } else {
597 "TaskCompleted"
598 },
599 });
600 }
601 sleep(Duration::from_millis(200)).await;
602 }
603 }
604
605 /// Block the launch of a dependent container until its upstream
606 /// reaches the requested `dependsOn[].condition`. We poll
607 /// `docker inspect` at a small interval; the wait is bounded by an
608 /// AWS-style timeout (120s by default — long enough for image
609 /// startup but short enough to surface bugs as a clean
610 /// `ContainerStart` failure).
611 ///
612 /// `upstream_has_health_check` is needed for the `HEALTHY` branch:
613 /// when the upstream has no healthCheck, AWS treats `HEALTHY` as
614 /// immediately satisfied (otherwise the dependent would block
615 /// forever, since docker reports `Health.Status` only when the
616 /// container has a HEALTHCHECK directive).
617 pub(super) async fn wait_for_depends_on(
618 &self,
619 upstream: &RunningContainer,
620 condition: DependsOnCondition,
621 upstream_has_health_check: bool,
622 ) -> Result<(), RuntimeError> {
623 // Bounded wait — chosen to comfortably cover slow init scripts
624 // without letting a wedged dependency stall a task indefinitely.
625 const WAIT_TIMEOUT: Duration = Duration::from_secs(120);
626 const POLL_INTERVAL: Duration = Duration::from_millis(200);
627
628 // HEALTHY against an upstream without a healthCheck: AWS treats
629 // this as immediately satisfied because there's no probe to
630 // observe. Skip the polling loop entirely so the dependent isn't
631 // wedged forever waiting for a status that docker will never set.
632 if matches!(condition, DependsOnCondition::Healthy) && !upstream_has_health_check {
633 return Ok(());
634 }
635
636 let deadline = std::time::Instant::now() + WAIT_TIMEOUT;
637 loop {
638 let inspect = inspect_container_state(&self.cli, &upstream.container_id).await;
639 if let Some(state) = inspect {
640 if condition_is_met(condition, &state) {
641 return Ok(());
642 }
643 // SUCCESS specifically: if the container exited with a
644 // non-zero code, the gate can never be satisfied. Bail
645 // immediately rather than waiting for the timeout — this
646 // matches ECS's "stoppedReason: dependency failed" path.
647 if matches!(condition, DependsOnCondition::Success)
648 && state.exited
649 && state.exit_code != 0
650 {
651 return Err(RuntimeError::ContainerStart(format!(
652 "dependency on container {} ({}) failed: upstream exited with code {}",
653 upstream.name,
654 DependsOnCondition::Success.as_aws_str(),
655 state.exit_code,
656 )));
657 }
658 }
659 if std::time::Instant::now() >= deadline {
660 return Err(RuntimeError::ContainerStart(format!(
661 "timed out waiting for container {} to reach condition {}",
662 upstream.name,
663 condition.as_aws_str(),
664 )));
665 }
666 tokio::time::sleep(POLL_INTERVAL).await;
667 }
668 }
669
670 /// Best-effort cleanup of containers we already started when a later
671 /// container in the task failed to launch. Without this, half-launched
672 /// tasks leak docker containers. `task_id` mirrors the value used at
673 /// network creation so `network rm` targets the right name —
674 /// deriving it from a container_id prefix was wrong (container ids
675 /// are docker-assigned, not task-shaped).
676 pub(super) fn cleanup_partial_start(&self, started: &[RunningContainer], task_id: &str) {
677 let cli = self.cli.clone();
678 let ids: Vec<String> = started.iter().map(|c| c.container_id.clone()).collect();
679 let network = format!("fakecloud-ecs-{task_id}");
680 // Drop the runtime map entry we pre-registered before the launch loop
681 // so a failed launch doesn't leave a stale (now-empty) key that
682 // future StopTask calls would treat as a live task.
683 self.containers.write().remove(task_id);
684 tokio::spawn(async move {
685 for id in ids {
686 let _ = Command::new(&cli).args(["rm", "-f", &id]).output().await;
687 }
688 let _ = Command::new(&cli)
689 .args(["network", "rm", &network])
690 .output()
691 .await;
692 });
693 }
694
695 /// Kill every container behind a task with the configured stop
696 /// timeout. Returns true if at least one container was killed. Called
697 /// synchronously from `StopTask`; the wait loop in `run_task_inner`
698 /// observes the exits and transitions the task to `STOPPED`.
699 pub async fn stop_task(&self, task_id: &str, reason: &str) -> bool {
700 if let Some(k) = &self.k8s {
701 tracing::info!(task = %task_id, reason = %reason, "ecs task stop requested (k8s)");
702 return k.stop_task(task_id).await;
703 }
704 let containers = self.containers.read().get(task_id).cloned();
705 let Some(list) = containers else {
706 return false;
707 };
708 if list.is_empty() {
709 return false;
710 }
711 // `docker stop` sends SIGTERM then SIGKILL after a timeout.
712 for (_name, id) in &list {
713 let _ = Command::new(&self.cli)
714 .args(["stop", "--time", "10", id])
715 .output()
716 .await;
717 }
718 tracing::info!(task = %task_id, reason = %reason, "ecs task stop requested");
719 true
720 }
721
722 /// Kill every running container the runtime owns. Called on reset /
723 /// shutdown so docker state matches fakecloud state after a fresh
724 /// boot.
725 pub async fn stop_all(&self) {
726 if let Some(k) = &self.k8s {
727 k.stop_all().await;
728 return;
729 }
730 let ids: Vec<String> = self
731 .containers
732 .read()
733 .values()
734 .flat_map(|list| list.iter().map(|(_, id)| id.clone()))
735 .collect();
736 for id in ids {
737 let _ = Command::new(&self.cli).args(["kill", &id]).output().await;
738 let _ = Command::new(&self.cli).args(["rm", &id]).output().await;
739 }
740 self.containers.write().clear();
741 }
742}