Skip to main content

fakecloud_ecs/
runtime.rs

1//! Docker/Podman-based ECS task execution.
2//!
3//! Mirrors the Lambda `ContainerRuntime` approach (auto-detect CLI, forward
4//! localhost → host.docker.internal) but scoped for ECS's different
5//! lifecycle: tasks are ephemeral, so there is no warm-container pool. Each
6//! `run_task` spawns a background tokio task that pulls the image, starts
7//! the container, waits for exit, captures logs, and updates shared ECS
8//! state in place.
9
10use std::path::PathBuf;
11use std::sync::Arc;
12use std::time::Duration;
13
14use base64::Engine;
15use chrono::Utc;
16use fakecloud_core::delivery::DeliveryBus;
17use fakecloud_logs::ingest::{append_events, IngestEvent};
18use fakecloud_logs::state::SharedLogsState;
19use fakecloud_secretsmanager::state::SharedSecretsManagerState;
20use fakecloud_ssm::state::SharedSsmState;
21use parking_lot::RwLock;
22use tempfile::TempDir;
23use tokio::process::Command;
24
25use crate::state::{LifecycleEvent, SharedEcsState};
26
27#[derive(Debug, thiserror::Error)]
28pub enum RuntimeError {
29    #[error("container CLI not found (tried docker, podman)")]
30    NoCli,
31    #[error("image pull failed: {0}")]
32    ImagePull(String),
33    #[error("container start failed: {0}")]
34    ContainerStart(String),
35    #[error("docker wait failed: {0}")]
36    Wait(String),
37}
38
39/// Docker/Podman executor for ECS tasks.
40pub struct EcsRuntime {
41    cli: String,
42    host_ip: String,
43    /// Port the main fakecloud server bound to. Used to translate AWS
44    /// ECR URIs (`<acct>.dkr.ecr.<region>.amazonaws.com/<repo>:<tag>`) to
45    /// the local OCI v2 endpoint (`127.0.0.1:<port>/<repo>:<tag>`) so
46    /// tasks can pull images pushed to fakecloud's own ECR.
47    server_port: u16,
48    /// Isolated DOCKER_CONFIG dir pre-populated with Basic auth for
49    /// `127.0.0.1:<port>`; keeps the host user's `~/.docker/config.json`
50    /// untouched and lets `docker pull` succeed against fakecloud ECR
51    /// without a prior `aws ecr get-login-password | docker login`.
52    docker_config: Option<Arc<TempDir>>,
53    /// Tracks container IDs per task ID so `stop_task` can kill in-flight
54    /// work without needing to block on the spawned executor future.
55    containers: RwLock<std::collections::HashMap<String, String>>,
56    /// Cross-service delivery bus — emits `aws.ecs` EventBridge events
57    /// on task state transitions when wired. `None` if the server started
58    /// without EventBridge configured (or for unit tests).
59    delivery_bus: Option<Arc<DeliveryBus>>,
60    /// CloudWatch Logs state — when set, tasks whose container definition
61    /// declares the `awslogs` log driver get their captured stdout/stderr
62    /// forwarded to a log group/stream under this shared state.
63    logs_state: Option<SharedLogsState>,
64    /// SecretsManager state for resolving `containerDefinition.secrets[]`
65    /// entries whose `valueFrom` is a SecretsManager ARN.
66    secretsmanager_state: Option<SharedSecretsManagerState>,
67    /// SSM Parameter Store state for resolving `secrets[]` entries whose
68    /// `valueFrom` is an SSM parameter ARN.
69    ssm_state: Option<SharedSsmState>,
70}
71
72impl EcsRuntime {
73    /// Auto-detect Docker or Podman. Returns `None` if neither is
74    /// available. Honours `FAKECLOUD_CONTAINER_CLI` for explicit override.
75    /// `server_port` is the port the main fakecloud server bound to;
76    /// needed to resolve AWS ECR URIs against the local OCI v2 registry.
77    pub fn new(server_port: u16) -> Option<Self> {
78        let cli = if let Ok(cli) = std::env::var("FAKECLOUD_CONTAINER_CLI") {
79            if cli_works(&cli) {
80                cli
81            } else {
82                return None;
83            }
84        } else if cli_works("docker") {
85            "docker".to_string()
86        } else if cli_works("podman") {
87            "podman".to_string()
88        } else {
89            return None;
90        };
91        let host_ip = if cfg!(target_os = "linux") {
92            "172.17.0.1".to_string()
93        } else {
94            "host-gateway".to_string()
95        };
96        let docker_config = build_local_registry_docker_config(server_port).map(Arc::new);
97        Some(Self {
98            cli,
99            host_ip,
100            server_port,
101            docker_config,
102            containers: RwLock::new(std::collections::HashMap::new()),
103            delivery_bus: None,
104            logs_state: None,
105            secretsmanager_state: None,
106            ssm_state: None,
107        })
108    }
109
110    /// Path suitable for `DOCKER_CONFIG`. `None` if the tempdir setup
111    /// failed; in that case pulls fall back to the user's own config and
112    /// will only work if they've already logged in.
113    fn docker_config_path(&self) -> Option<PathBuf> {
114        self.docker_config.as_ref().map(|d| d.path().to_path_buf())
115    }
116
117    /// Build a `Command` for the container CLI with `DOCKER_CONFIG` set
118    /// to our isolated tempdir so fakecloud ECR auth works out of the box.
119    fn cli_command(&self) -> Command {
120        let mut cmd = Command::new(&self.cli);
121        if let Some(p) = self.docker_config_path() {
122            cmd.env("DOCKER_CONFIG", p);
123        }
124        cmd
125    }
126
127    pub fn cli_name(&self) -> &str {
128        &self.cli
129    }
130
131    /// Wire EventBridge delivery so task state transitions emit
132    /// `aws.ecs` / `ECS Task State Change` events.
133    pub fn with_delivery_bus(mut self, bus: Arc<DeliveryBus>) -> Self {
134        self.delivery_bus = Some(bus);
135        self
136    }
137
138    /// Wire CloudWatch Logs state so tasks using the `awslogs` driver
139    /// get their captured stdout/stderr forwarded.
140    pub fn with_logs(mut self, logs: SharedLogsState) -> Self {
141        self.logs_state = Some(logs);
142        self
143    }
144
145    /// Wire SecretsManager state so `secrets[].valueFrom` entries
146    /// pointing at SecretsManager ARNs resolve at task launch.
147    pub fn with_secretsmanager(mut self, state: SharedSecretsManagerState) -> Self {
148        self.secretsmanager_state = Some(state);
149        self
150    }
151
152    /// Wire SSM state so `secrets[].valueFrom` entries pointing at
153    /// Parameter Store ARNs resolve at task launch.
154    pub fn with_ssm(mut self, state: SharedSsmState) -> Self {
155        self.ssm_state = Some(state);
156        self
157    }
158
159    /// Spawn the task asynchronously. Returns immediately after transitioning
160    /// the task to `PENDING`; the background task advances it to `RUNNING`
161    /// once the container is created and to `STOPPED` once the container
162    /// exits.
163    pub fn run_task(self: Arc<Self>, state: SharedEcsState, task_id: String, account_id: String) {
164        let rt = self.clone();
165        tokio::spawn(async move {
166            if let Err(err) = rt.run_task_inner(&state, &task_id, &account_id).await {
167                tracing::warn!(%err, task = %task_id, "ecs task execution failed");
168                // Also surface on stderr so nextest's captured-output for a
169                // failed E2E shows the reason instead of just "empty logs".
170                eprintln!("[ecs] task {task_id} failed: {err}");
171                finalize_failure(&state, &account_id, &task_id, &err.to_string());
172                rt.emit_state_change(
173                    &state,
174                    &account_id,
175                    &task_id,
176                    "STOPPED",
177                    Some(("TaskFailedToStart", err.to_string())),
178                );
179            }
180        });
181    }
182
183    async fn run_task_inner(
184        &self,
185        state: &SharedEcsState,
186        task_id: &str,
187        account_id: &str,
188    ) -> Result<(), RuntimeError> {
189        let (image, mut env, entry_point, command, awslogs_container, secrets_refs, has_task_role) = {
190            let accounts = state.read();
191            let s = accounts
192                .get(account_id)
193                .ok_or_else(|| RuntimeError::ContainerStart("account missing".into()))?;
194            let task = s
195                .tasks
196                .get(task_id)
197                .ok_or_else(|| RuntimeError::ContainerStart("task missing".into()))?;
198            let container = task
199                .containers
200                .first()
201                .ok_or_else(|| RuntimeError::ContainerStart("task has no containers".into()))?;
202            let def = find_container_definition(s, &task.family, task.revision, &container.name);
203            let secrets = def
204                .as_ref()
205                .and_then(|d| d.get("secrets").and_then(|v| v.as_array()).cloned())
206                .map(|arr| {
207                    arr.iter()
208                        .filter_map(|e| {
209                            let name = e.get("name").and_then(|v| v.as_str())?.to_string();
210                            let value_from =
211                                e.get("valueFrom").and_then(|v| v.as_str())?.to_string();
212                            Some((name, value_from))
213                        })
214                        .collect::<Vec<_>>()
215                })
216                .unwrap_or_default();
217            let str_array = |key: &str| -> Vec<String> {
218                def.as_ref()
219                    .and_then(|d| d.get(key).and_then(|v| v.as_array()).cloned())
220                    .map(|arr| {
221                        arr.iter()
222                            .filter_map(|v| v.as_str().map(String::from))
223                            .collect::<Vec<_>>()
224                    })
225                    .unwrap_or_default()
226            };
227            (
228                container.image.clone(),
229                def.as_ref()
230                    .and_then(|d| d.get("environment").and_then(|v| v.as_array()).cloned())
231                    .map(|arr| {
232                        arr.iter()
233                            .filter_map(|e| {
234                                let k = e.get("name").and_then(|v| v.as_str())?;
235                                let v = e.get("value").and_then(|v| v.as_str()).unwrap_or("");
236                                Some((k.to_string(), v.to_string()))
237                            })
238                            .collect::<Vec<_>>()
239                    })
240                    .unwrap_or_default(),
241                str_array("entryPoint"),
242                str_array("command"),
243                container.name.clone(),
244                secrets,
245                task.task_role_arn.is_some(),
246            )
247        };
248
249        // Resolve `containerDefinition.secrets[]` entries. Each entry is
250        // `{name, valueFrom}`; `valueFrom` is either a SecretsManager
251        // secret ARN (`arn:aws:secretsmanager:...:secret:name-AbCdEf`)
252        // or an SSM parameter ARN (`arn:aws:ssm:...:parameter/name`).
253        // Both are looked up synchronously against the in-process shared
254        // state and appended as env vars. Failed lookups fail the task —
255        // matching real ECS's "failed to retrieve secret" behaviour.
256        for (name, value_from) in &secrets_refs {
257            let resolved = self.resolve_secret(account_id, value_from);
258            match resolved {
259                Some(v) => env.push((name.clone(), v)),
260                None => {
261                    return Err(RuntimeError::ContainerStart(format!(
262                        "failed to resolve secret {name} from {value_from}"
263                    )))
264                }
265            }
266        }
267
268        // Inject `AWS_CONTAINER_CREDENTIALS_FULL_URI` when the task has
269        // a `taskRoleArn`. AWS SDKs pick this up via the default
270        // credential-provider chain. The endpoint runs on the main
271        // fakecloud server; the container reaches it via
272        // `host.docker.internal:<port>`.
273        if has_task_role {
274            env.push((
275                "AWS_CONTAINER_CREDENTIALS_FULL_URI".into(),
276                format!(
277                    "http://host.docker.internal:{}/_fakecloud/ecs/creds/{}",
278                    self.server_port, task_id
279                ),
280            ));
281        }
282
283        // Pull the image first so we can surface pull errors cleanly.
284        // AWS private-ECR URIs (`<acct>.dkr.ecr.<region>.amazonaws.com/...`)
285        // are translated to fakecloud's local OCI v2 endpoint; after
286        // pulling the local URI we retag it to the AWS URI so the
287        // container and task state carry the user-facing image reference.
288        mark_pull_started(state, account_id, task_id);
289        let local_pull_uri = fakecloud_core::ecr_uri::translate_to_local(&image, self.server_port);
290        let pull_uri = local_pull_uri.as_deref().unwrap_or(&image);
291        let pull_out = self
292            .cli_command()
293            .args(["pull", pull_uri])
294            .output()
295            .await
296            .map_err(|e| RuntimeError::ImagePull(e.to_string()))?;
297        if !pull_out.status.success() {
298            let err = String::from_utf8_lossy(&pull_out.stderr).to_string();
299            return Err(RuntimeError::ImagePull(err));
300        }
301        // Retag the local pull URI to the AWS URI so `docker run` finds
302        // the image under the user-facing name. Digest-pinned refs can't
303        // be `docker tag` targets, so we fall through and run under the
304        // local URI in that case (cosmetic tradeoff — the task's image
305        // field will show the 127.0.0.1 URI instead of the AWS digest).
306        let run_image = if let Some(ref local_uri) = local_pull_uri {
307            if fakecloud_core::ecr_uri::is_digest_ref(&image) {
308                local_uri.clone()
309            } else {
310                let _ = self
311                    .cli_command()
312                    .args(["tag", local_uri, &image])
313                    .output()
314                    .await;
315                image.clone()
316            }
317        } else {
318            image.clone()
319        };
320        mark_pull_stopped(state, account_id, task_id);
321
322        // Run the container detached so we can track its ID and wait
323        // asynchronously. `-d` prints the container ID on stdout.
324        let mut cmd = Command::new(&self.cli);
325        cmd.args(["run", "-d"])
326            .args(["--label", &format!("fakecloud-ecs-task={}", task_id)])
327            .args([
328                "--add-host",
329                &format!("host.docker.internal:{}", self.host_ip),
330            ]);
331        for (k, v) in &env {
332            let transformed = v
333                .replace("http://127.0.0.1:", "http://host.docker.internal:")
334                .replace("https://127.0.0.1:", "https://host.docker.internal:")
335                .replace("http://localhost:", "http://host.docker.internal:")
336                .replace("https://localhost:", "https://host.docker.internal:");
337            cmd.arg("-e").arg(format!("{}={}", k, transformed));
338        }
339        // `containerDefinition.entryPoint` overrides the image's ENTRYPOINT.
340        // Docker CLI's `--entrypoint` only takes a single executable; any
341        // additional entryPoint elements are appended as positional args
342        // before the user-supplied `command[]`, which matches how docker
343        // composes ENTRYPOINT + CMD at exec time.
344        if let Some(first) = entry_point.first() {
345            cmd.args(["--entrypoint", first]);
346        }
347        cmd.arg(&run_image);
348        for arg in entry_point.iter().skip(1) {
349            cmd.arg(arg);
350        }
351        for arg in &command {
352            cmd.arg(arg);
353        }
354        let run_out = cmd
355            .output()
356            .await
357            .map_err(|e| RuntimeError::ContainerStart(e.to_string()))?;
358        if !run_out.status.success() {
359            let err = String::from_utf8_lossy(&run_out.stderr).to_string();
360            return Err(RuntimeError::ContainerStart(err));
361        }
362        let container_id = String::from_utf8_lossy(&run_out.stdout).trim().to_string();
363        self.containers
364            .write()
365            .insert(task_id.to_string(), container_id.clone());
366        mark_running(
367            state,
368            account_id,
369            task_id,
370            &container_id,
371            &awslogs_container,
372        );
373        self.emit_state_change(state, account_id, task_id, "RUNNING", None);
374
375        // Wait for the container to exit. `docker wait` blocks until exit
376        // and prints the numeric exit code to stdout.
377        let wait_out = Command::new(&self.cli)
378            .args(["wait", &container_id])
379            .output()
380            .await
381            .map_err(|e| RuntimeError::Wait(e.to_string()))?;
382        if !wait_out.status.success() {
383            // `docker wait` itself failed — treat this as a Wait error so
384            // the task flips via `finalize_failure` rather than silently
385            // claiming the container exited normally.
386            let err = String::from_utf8_lossy(&wait_out.stderr).to_string();
387            return Err(RuntimeError::Wait(err));
388        }
389        let exit_code: i64 = String::from_utf8_lossy(&wait_out.stdout)
390            .trim()
391            .parse()
392            .unwrap_or(-1);
393
394        // Capture combined stdout+stderr via `docker logs`.
395        let logs_out = Command::new(&self.cli)
396            .args(["logs", &container_id])
397            .output()
398            .await
399            .map_err(|e| RuntimeError::Wait(e.to_string()))?;
400        let mut captured = String::new();
401        captured.push_str(&String::from_utf8_lossy(&logs_out.stdout));
402        captured.push_str(&String::from_utf8_lossy(&logs_out.stderr));
403
404        // Best-effort cleanup; failures here shouldn't keep the task from
405        // transitioning to STOPPED.
406        let _ = Command::new(&self.cli)
407            .args(["rm", &container_id])
408            .output()
409            .await;
410        self.containers.write().remove(task_id);
411
412        // Forward logs BEFORE flipping the task to STOPPED so a client
413        // that polls DescribeTasks and immediately queries DescribeLogStreams
414        // can't observe the STOPPED transition before the awslogs group/stream
415        // has been materialised.
416        self.forward_awslogs_if_configured(state, account_id, task_id, &captured);
417        finalize_stopped(
418            state,
419            account_id,
420            task_id,
421            exit_code,
422            &captured,
423            "EssentialContainerExited",
424            None,
425        );
426        self.emit_state_change(
427            state,
428            account_id,
429            task_id,
430            "STOPPED",
431            Some((
432                "EssentialContainerExited",
433                format!("Exit code {}", exit_code),
434            )),
435        );
436        Ok(())
437    }
438
439    /// Resolve a `secrets[].valueFrom` reference to the actual secret
440    /// payload. Supports SecretsManager secret ARNs and SSM parameter
441    /// ARNs; returns `None` when the referenced state isn't wired or
442    /// the lookup misses.
443    fn resolve_secret(&self, account_id: &str, value_from: &str) -> Option<String> {
444        if value_from.contains(":secret:") {
445            let state = self.secretsmanager_state.as_ref()?;
446            let accounts = state.read();
447            let sm = accounts.get(account_id)?;
448            // ARN shape: arn:aws:secretsmanager:<region>:<acct>:secret:<name>-<6char>
449            // Stored key is the secret name (no suffix). Strip the
450            // AWS-generated 6-char suffix when comparing.
451            let arn_tail = value_from.rsplit(":secret:").next()?;
452            let name = arn_tail
453                .rsplit_once('-')
454                .map(|(n, _)| n)
455                .unwrap_or(arn_tail);
456            let secret = sm.secrets.get(name).or_else(|| sm.secrets.get(arn_tail))?;
457            let version_id = secret.current_version_id.as_ref()?;
458            let v = secret.versions.get(version_id)?;
459            return v.secret_string.clone();
460        }
461        if value_from.contains(":parameter") {
462            let state = self.ssm_state.as_ref()?;
463            let accounts = state.read();
464            let ssm = accounts.get(account_id)?;
465            // ARN shape: arn:aws:ssm:<region>:<acct>:parameter/<name>
466            // Parameters are stored keyed by name (with leading slash)
467            // or without, depending on how they were created. Try both.
468            let after = value_from.rsplit(":parameter").next()?;
469            let name_with_slash = after.trim_start_matches('/');
470            return ssm
471                .parameters
472                .get(&format!("/{name_with_slash}"))
473                .or_else(|| ssm.parameters.get(name_with_slash))
474                .map(|p| p.value.clone());
475        }
476        None
477    }
478
479    /// Emit an `ECS Task State Change` EventBridge event. No-op when no
480    /// delivery bus is wired. Matches AWS event shape so downstream
481    /// rules can filter on `detail.lastStatus`, `detail.stopCode`, etc.
482    fn emit_state_change(
483        &self,
484        state: &SharedEcsState,
485        account_id: &str,
486        task_id: &str,
487        last_status: &str,
488        stop: Option<(&str, String)>,
489    ) {
490        let Some(ref bus) = self.delivery_bus else {
491            return;
492        };
493        let Some(task_view) = snapshot_task(state, account_id, task_id) else {
494            return;
495        };
496        let mut detail = serde_json::json!({
497            "taskArn": task_view.task_arn,
498            "clusterArn": task_view.cluster_arn,
499            "lastStatus": last_status,
500            "desiredStatus": if last_status == "STOPPED" { "STOPPED" } else { "RUNNING" },
501            "launchType": task_view.launch_type,
502            "group": task_view.group,
503            "taskDefinitionArn": task_view.task_definition_arn,
504            "containers": task_view.containers,
505        });
506        if let Some((code, reason)) = stop {
507            detail["stopCode"] = code.into();
508            detail["stoppedReason"] = reason.into();
509        }
510        bus.put_event_to_eventbridge(
511            "aws.ecs",
512            "ECS Task State Change",
513            &detail.to_string(),
514            "default",
515        );
516    }
517
518    /// Forward captured stdout/stderr to CloudWatch Logs when the task's
519    /// container definition declares the `awslogs` log driver. No-op when
520    /// logs_state isn't wired or the task has no awslogs config.
521    fn forward_awslogs_if_configured(
522        &self,
523        state: &SharedEcsState,
524        account_id: &str,
525        task_id: &str,
526        captured: &str,
527    ) {
528        let Some(ref logs) = self.logs_state else {
529            return;
530        };
531        // Clone out of the read guard so we don't hold it across the logs
532        // state write.
533        let (cfg, task_region) = {
534            let accounts = state.read();
535            let Some(s) = accounts.get(account_id) else {
536                return;
537            };
538            let Some(task) = s.tasks.get(task_id) else {
539                return;
540            };
541            let Some(ref cfg) = task.awslogs else {
542                return;
543            };
544            (cfg.clone(), s.region.clone())
545        };
546        if captured.is_empty() {
547            return;
548        }
549        let now = Utc::now().timestamp_millis();
550        let stream_name = cfg.stream_name(task_id);
551        let events: Vec<IngestEvent> = captured
552            .lines()
553            .enumerate()
554            .map(|(i, line)| IngestEvent {
555                // Stagger within the same millisecond so CloudWatch's
556                // chronological-order invariant holds without relying on
557                // the host clock's resolution.
558                timestamp_ms: now.saturating_add(i as i64),
559                message: line.to_string(),
560            })
561            .collect();
562        append_events(
563            logs,
564            account_id,
565            &task_region,
566            &cfg.group,
567            &stream_name,
568            &events,
569        );
570    }
571
572    /// Kill the container behind a task (if any) with the configured stop
573    /// timeout. Returns true if a container was killed. Called synchronously
574    /// from `StopTask`; the wait loop in `run_task_inner` observes the
575    /// exit and transitions the task to `STOPPED`.
576    pub async fn stop_task(&self, task_id: &str, reason: &str) -> bool {
577        let container_id = self.containers.read().get(task_id).cloned();
578        let Some(id) = container_id else {
579            return false;
580        };
581        // `docker stop` sends SIGTERM then SIGKILL after a timeout.
582        let _ = Command::new(&self.cli)
583            .args(["stop", "--time", "10", &id])
584            .output()
585            .await;
586        tracing::info!(task = %task_id, reason = %reason, "ecs task stop requested");
587        true
588    }
589
590    /// Kill every running container the runtime owns. Called on reset /
591    /// shutdown so docker state matches fakecloud state after a fresh
592    /// boot.
593    pub async fn stop_all(&self) {
594        let ids: Vec<String> = self.containers.read().values().cloned().collect();
595        for id in ids {
596            let _ = Command::new(&self.cli).args(["kill", &id]).output().await;
597            let _ = Command::new(&self.cli).args(["rm", &id]).output().await;
598        }
599        self.containers.write().clear();
600    }
601}
602
603struct TaskSnapshot {
604    task_arn: String,
605    cluster_arn: String,
606    launch_type: String,
607    group: Option<String>,
608    task_definition_arn: String,
609    containers: serde_json::Value,
610}
611
612fn snapshot_task(state: &SharedEcsState, account_id: &str, task_id: &str) -> Option<TaskSnapshot> {
613    let accounts = state.read();
614    let s = accounts.get(account_id)?;
615    let task = s.tasks.get(task_id)?;
616    Some(TaskSnapshot {
617        task_arn: task.task_arn.clone(),
618        cluster_arn: task.cluster_arn.clone(),
619        launch_type: task.launch_type.clone(),
620        group: task.group.clone(),
621        task_definition_arn: task.task_definition_arn.clone(),
622        containers: serde_json::Value::Array(
623            task.containers
624                .iter()
625                .map(|c| {
626                    serde_json::json!({
627                        "containerArn": c.container_arn,
628                        "name": c.name,
629                        "image": c.image,
630                        "lastStatus": c.last_status,
631                        "exitCode": c.exit_code,
632                        "reason": c.reason,
633                    })
634                })
635                .collect(),
636        ),
637    })
638}
639
/// Probe a container CLI: runs `<cli> info` with output discarded and
/// reports whether it exited successfully. A binary that can't be spawned
/// counts as not working.
fn cli_works(cli: &str) -> bool {
    let probe = std::process::Command::new(cli)
        .arg("info")
        .stdout(std::process::Stdio::null())
        .stderr(std::process::Stdio::null())
        .status();
    matches!(probe, Ok(status) if status.success())
}
649
650/// Build an isolated docker config directory with Basic auth for
651/// fakecloud ECR at `127.0.0.1:<port>`. Lets `docker pull/push/tag`
652/// work against the local OCI v2 registry without requiring the user
653/// to run `aws ecr get-login-password | docker login` first.
654fn build_local_registry_docker_config(server_port: u16) -> Option<TempDir> {
655    let dir = TempDir::new().ok()?;
656    let auth = base64::engine::general_purpose::STANDARD.encode("AWS:fakecloud-ecs-runtime");
657    let config = serde_json::json!({
658        "auths": {
659            format!("127.0.0.1:{server_port}"): { "auth": auth },
660        }
661    });
662    std::fs::write(dir.path().join("config.json"), config.to_string()).ok()?;
663    Some(dir)
664}
665
666fn find_container_definition(
667    state: &crate::state::EcsState,
668    family: &str,
669    revision: i32,
670    name: &str,
671) -> Option<serde_json::Value> {
672    state
673        .task_definitions
674        .get(family)?
675        .get(&revision)?
676        .container_definitions
677        .iter()
678        .find(|c| c.get("name").and_then(|v| v.as_str()) == Some(name))
679        .cloned()
680}
681
682fn mark_pull_started(state: &SharedEcsState, account_id: &str, task_id: &str) {
683    let mut accounts = state.write();
684    let Some(s) = accounts.get_mut(account_id) else {
685        return;
686    };
687    let task_arn_cluster = s
688        .tasks
689        .get(task_id)
690        .map(|t| (t.task_arn.clone(), t.cluster_arn.clone()));
691    if let Some(task) = s.tasks.get_mut(task_id) {
692        task.pull_started_at = Some(Utc::now());
693    }
694    if let Some((arn, cluster_arn)) = task_arn_cluster {
695        s.push_event(LifecycleEvent {
696            at: Utc::now(),
697            event_type: "PullStarted".into(),
698            task_arn: Some(arn),
699            cluster_arn: Some(cluster_arn),
700            last_status: Some("PENDING".into()),
701            detail: serde_json::json!({}),
702        });
703    }
704}
705
706fn mark_pull_stopped(state: &SharedEcsState, account_id: &str, task_id: &str) {
707    let mut accounts = state.write();
708    let Some(s) = accounts.get_mut(account_id) else {
709        return;
710    };
711    if let Some(task) = s.tasks.get_mut(task_id) {
712        task.pull_stopped_at = Some(Utc::now());
713    }
714}
715
716fn mark_running(
717    state: &SharedEcsState,
718    account_id: &str,
719    task_id: &str,
720    container_id: &str,
721    container_name: &str,
722) {
723    let mut accounts = state.write();
724    let Some(s) = accounts.get_mut(account_id) else {
725        return;
726    };
727    let (arn, cluster_arn) = {
728        let Some(task) = s.tasks.get_mut(task_id) else {
729            return;
730        };
731        task.last_status = "RUNNING".into();
732        task.connectivity = "CONNECTED".into();
733        task.connectivity_at = Some(Utc::now());
734        task.started_at = Some(Utc::now());
735        if let Some(c) = task
736            .containers
737            .iter_mut()
738            .find(|c| c.name == container_name)
739        {
740            c.runtime_id = Some(container_id.into());
741            c.last_status = "RUNNING".into();
742        }
743        if let Some(cluster) = s.clusters.get_mut(&task.cluster_name) {
744            cluster.running_tasks_count += 1;
745            if cluster.pending_tasks_count > 0 {
746                cluster.pending_tasks_count -= 1;
747            }
748        }
749        (task.task_arn.clone(), task.cluster_arn.clone())
750    };
751    s.push_event(LifecycleEvent {
752        at: Utc::now(),
753        event_type: "TaskStateChange".into(),
754        task_arn: Some(arn),
755        cluster_arn: Some(cluster_arn),
756        last_status: Some("RUNNING".into()),
757        detail: serde_json::json!({}),
758    });
759}
760
761fn finalize_stopped(
762    state: &SharedEcsState,
763    account_id: &str,
764    task_id: &str,
765    exit_code: i64,
766    captured: &str,
767    stop_code: &str,
768    stopped_reason: Option<String>,
769) {
770    let mut accounts = state.write();
771    let Some(s) = accounts.get_mut(account_id) else {
772        return;
773    };
774    let (arn, cluster_arn) = {
775        let Some(task) = s.tasks.get_mut(task_id) else {
776            return;
777        };
778        task.last_status = "STOPPED".into();
779        task.desired_status = "STOPPED".into();
780        task.stopping_at = task.stopping_at.or(Some(Utc::now()));
781        task.stopped_at = Some(Utc::now());
782        task.stop_code = Some(stop_code.into());
783        task.stopped_reason = stopped_reason.or(Some(format!("Exit code {}", exit_code)));
784        task.captured_logs = captured.to_string();
785        for c in task.containers.iter_mut() {
786            c.last_status = "STOPPED".into();
787            if c.exit_code.is_none() {
788                c.exit_code = Some(exit_code);
789            }
790        }
791        if let Some(cluster) = s.clusters.get_mut(&task.cluster_name) {
792            if cluster.running_tasks_count > 0 {
793                cluster.running_tasks_count -= 1;
794            }
795        }
796        (task.task_arn.clone(), task.cluster_arn.clone())
797    };
798    s.push_event(LifecycleEvent {
799        at: Utc::now(),
800        event_type: "TaskStateChange".into(),
801        task_arn: Some(arn),
802        cluster_arn: Some(cluster_arn),
803        last_status: Some("STOPPED".into()),
804        detail: serde_json::json!({
805            "exitCode": exit_code,
806            "stopCode": stop_code,
807        }),
808    });
809}
810
811fn finalize_failure(state: &SharedEcsState, account_id: &str, task_id: &str, reason: &str) {
812    let mut accounts = state.write();
813    let Some(s) = accounts.get_mut(account_id) else {
814        return;
815    };
816    let (arn, cluster_arn) = {
817        let Some(task) = s.tasks.get_mut(task_id) else {
818            return;
819        };
820        // Capture the prior status before we clobber it: if the task had
821        // already reached RUNNING when execution failed (e.g. `docker wait`
822        // blew up after the container started), we owe the cluster a
823        // running-tasks decrement. Tasks that died before RUNNING only
824        // ever incremented pendingTasksCount.
825        let was_running = task.last_status == "RUNNING";
826        task.last_status = "STOPPED".into();
827        task.desired_status = "STOPPED".into();
828        task.stopped_at = Some(Utc::now());
829        task.stop_code = Some("TaskFailedToStart".into());
830        task.stopped_reason = Some(reason.to_string());
831        // Surface the failure reason on the /logs endpoint — without this,
832        // a task that never reached RUNNING returns an empty log string,
833        // leaving E2E assertions with no diagnostic.
834        task.captured_logs = format!("[task failed to start]: {reason}");
835        for c in task.containers.iter_mut() {
836            c.last_status = "STOPPED".into();
837            c.reason = Some(reason.to_string());
838        }
839        if let Some(cluster) = s.clusters.get_mut(&task.cluster_name) {
840            if was_running {
841                if cluster.running_tasks_count > 0 {
842                    cluster.running_tasks_count -= 1;
843                }
844            } else if cluster.pending_tasks_count > 0 {
845                cluster.pending_tasks_count -= 1;
846            }
847        }
848        (task.task_arn.clone(), task.cluster_arn.clone())
849    };
850    s.push_event(LifecycleEvent {
851        at: Utc::now(),
852        event_type: "TaskFailedToStart".into(),
853        task_arn: Some(arn),
854        cluster_arn: Some(cluster_arn),
855        last_status: Some("STOPPED".into()),
856        detail: serde_json::json!({ "reason": reason }),
857    });
858}
859
860/// Short helper for tests + snapshot code to sleep between state
861/// transitions. Exposed on the crate boundary to keep test timing
862/// centralized.
863pub async fn sleep(duration: Duration) {
864    tokio::time::sleep(duration).await;
865}
866
#[cfg(test)]
mod tests {
    use super::*;
    use crate::state::{EcsState, Task};
    use fakecloud_core::multi_account::MultiAccountState;
    use parking_lot::RwLock;
    use std::sync::Arc;

    /// Account every test task lives in.
    const ACCOUNT: &str = "000000000000";

    #[test]
    fn cli_works_for_known_missing_binary_is_false() {
        assert!(!cli_works("definitely-not-a-real-cli-binary-xyz"));
    }

    #[test]
    fn aws_ecr_uris_translate_for_local_pull() {
        assert_eq!(
            fakecloud_core::ecr_uri::translate_to_local(
                "123456789012.dkr.ecr.us-east-1.amazonaws.com/app:latest",
                4566
            )
            .as_deref(),
            Some("127.0.0.1:4566/app:latest")
        );
    }

    /// A minimal PENDING Fargate task in the `default` cluster.
    fn make_task(task_id: &str) -> Task {
        Task {
            task_arn: format!("arn:aws:ecs:us-east-1:000000000000:task/default/{task_id}"),
            task_id: task_id.into(),
            cluster_arn: "arn:aws:ecs:us-east-1:000000000000:cluster/default".into(),
            cluster_name: "default".into(),
            task_definition_arn: "arn:aws:ecs:us-east-1:000000000000:task-definition/app:1".into(),
            family: "app".into(),
            revision: 1,
            last_status: "PENDING".into(),
            desired_status: "RUNNING".into(),
            launch_type: "FARGATE".into(),
            platform_version: None,
            cpu: None,
            memory: None,
            containers: Vec::new(),
            overrides: serde_json::json!({}),
            started_by: None,
            group: None,
            connectivity: "CONNECTING".into(),
            stop_code: None,
            stopped_reason: None,
            created_at: Utc::now(),
            started_at: None,
            stopping_at: None,
            stopped_at: None,
            pull_started_at: None,
            pull_stopped_at: None,
            connectivity_at: None,
            started_by_ref_id: None,
            execution_role_arn: None,
            task_role_arn: None,
            tags: Vec::new(),
            awslogs: None,
            captured_logs: String::new(),
            protection: None,
        }
    }

    /// Shared ECS state containing a single PENDING task `task_id` in `ACCOUNT`.
    fn state_with_task(task_id: &str) -> SharedEcsState {
        let mut accounts: MultiAccountState<EcsState> =
            MultiAccountState::new(ACCOUNT, "us-east-1", "http://localhost:4566");
        accounts
            .get_or_create(ACCOUNT)
            .tasks
            .insert(task_id.into(), make_task(task_id));
        Arc::new(RwLock::new(accounts))
    }

    /// Run `f` against task `task_id` of `ACCOUNT` while holding a read lock.
    fn with_task<R>(state: &SharedEcsState, task_id: &str, f: impl FnOnce(&Task) -> R) -> R {
        let accounts = state.read();
        let task = accounts
            .get(ACCOUNT)
            .expect("test account exists")
            .tasks
            .get(task_id)
            .expect("test task exists");
        f(task)
    }

    #[test]
    fn finalize_failure_writes_reason_into_captured_logs() {
        let state = state_with_task("t1");

        finalize_failure(&state, ACCOUNT, "t1", "failed to resolve secret DB_PASSWORD");

        with_task(&state, "t1", |task| {
            assert_eq!(task.last_status, "STOPPED");
            assert_eq!(task.stop_code.as_deref(), Some("TaskFailedToStart"));
            assert!(
                task.captured_logs
                    .contains("failed to resolve secret DB_PASSWORD"),
                "captured_logs missing reason: {:?}",
                task.captured_logs
            );
            assert!(
                task.captured_logs.starts_with("[task failed to start]:"),
                "captured_logs missing prefix: {:?}",
                task.captured_logs
            );
        });
    }

    #[test]
    fn finalize_failure_for_unknown_task_is_a_no_op() {
        let state = state_with_task("t1");

        finalize_failure(&state, ACCOUNT, "does-not-exist", "boom");

        with_task(&state, "t1", |task| {
            assert_eq!(task.last_status, "PENDING");
            assert!(task.stop_code.is_none());
            assert!(task.captured_logs.is_empty());
        });
    }

    #[test]
    fn mark_pull_stopped_records_timestamp() {
        let state = state_with_task("t1");

        mark_pull_stopped(&state, ACCOUNT, "t1");

        assert!(with_task(&state, "t1", |task| task.pull_stopped_at.is_some()));
    }

    #[test]
    fn mark_running_sets_running_and_connected() {
        let state = state_with_task("t1");

        mark_running(&state, ACCOUNT, "t1", "container-id", "app");

        with_task(&state, "t1", |task| {
            assert_eq!(task.last_status, "RUNNING");
            assert_eq!(task.connectivity, "CONNECTED");
            assert!(task.connectivity_at.is_some());
            assert!(task.started_at.is_some());
        });
    }

    #[test]
    fn finalize_stopped_defaults_reason_to_exit_code() {
        let state = state_with_task("t1");

        finalize_stopped(
            &state,
            ACCOUNT,
            "t1",
            3,
            "hello\n",
            "EssentialContainerExited",
            None,
        );

        with_task(&state, "t1", |task| {
            assert_eq!(task.last_status, "STOPPED");
            assert_eq!(task.desired_status, "STOPPED");
            assert_eq!(task.stop_code.as_deref(), Some("EssentialContainerExited"));
            assert_eq!(task.stopped_reason.as_deref(), Some("Exit code 3"));
            assert_eq!(task.captured_logs, "hello\n");
            assert!(task.stopping_at.is_some());
            assert!(task.stopped_at.is_some());
        });
    }

    #[test]
    fn finalize_stopped_keeps_explicit_reason() {
        let state = state_with_task("t1");

        finalize_stopped(
            &state,
            ACCOUNT,
            "t1",
            0,
            "",
            "UserInitiated",
            Some("Task stopped by user".to_string()),
        );

        with_task(&state, "t1", |task| {
            assert_eq!(task.stop_code.as_deref(), Some("UserInitiated"));
            assert_eq!(task.stopped_reason.as_deref(), Some("Task stopped by user"));
        });
    }
}