Skip to main content

fakecloud_ecs/
runtime.rs

1//! Docker/Podman-based ECS task execution.
2//!
3//! Mirrors the Lambda `ContainerRuntime` approach (auto-detect CLI, forward
4//! localhost → host.docker.internal) but scoped for ECS's different
5//! lifecycle: tasks are ephemeral, so there is no warm-container pool. Each
6//! `run_task` spawns a background tokio task that pulls the image, starts
7//! the container, waits for exit, captures logs, and updates shared ECS
8//! state in place.
9
10use std::path::PathBuf;
11use std::sync::Arc;
12use std::time::Duration;
13
14use base64::Engine;
15use chrono::Utc;
16use fakecloud_core::delivery::DeliveryBus;
17use fakecloud_logs::ingest::{append_events, IngestEvent};
18use fakecloud_logs::state::SharedLogsState;
19use fakecloud_secretsmanager::state::SharedSecretsManagerState;
20use fakecloud_ssm::state::SharedSsmState;
21use parking_lot::RwLock;
22use tempfile::TempDir;
23use tokio::process::Command;
24
25use crate::state::{LifecycleEvent, SharedEcsState, Task};
26
/// Failures that can abort an ECS task's container lifecycle.
///
/// The string payloads carry the underlying error text (CLI stderr or the
/// process-spawn error); `run_task` passes the rendered error on as the
/// task's failure reason.
#[derive(Debug, thiserror::Error)]
pub enum RuntimeError {
    /// No usable container CLI was found.
    // NOTE(review): not constructed anywhere in the visible part of this
    // file — `EcsRuntime::new` signals a missing CLI by returning `None`.
    #[error("container CLI not found (tried docker, podman)")]
    NoCli,
    /// The image pull command could not be spawned or exited unsuccessfully.
    #[error("image pull failed: {0}")]
    ImagePull(String),
    /// The task could not be started: missing account/task/container state,
    /// an unresolvable secret, or a failed container `run`.
    #[error("container start failed: {0}")]
    ContainerStart(String),
    /// Waiting on the container (or collecting its output) failed.
    #[error("docker wait failed: {0}")]
    Wait(String),
}
38
/// Docker/Podman executor for ECS tasks.
pub struct EcsRuntime {
    /// Container CLI executable to invoke: `docker`, `podman`, or the
    /// value of `FAKECLOUD_CONTAINER_CLI` when that override is set.
    cli: String,
    /// Address mapped to `host.docker.internal` via `--add-host` when a
    /// task container is started.
    host_ip: String,
    /// Port the main fakecloud server bound to. Used to translate AWS
    /// ECR URIs (`<acct>.dkr.ecr.<region>.amazonaws.com/<repo>:<tag>`) to
    /// the local OCI v2 endpoint (`127.0.0.1:<port>/<repo>:<tag>`) so
    /// tasks can pull images pushed to fakecloud's own ECR.
    server_port: u16,
    /// Isolated DOCKER_CONFIG dir pre-populated with Basic auth for
    /// `127.0.0.1:<port>`; keeps the host user's `~/.docker/config.json`
    /// untouched and lets `docker pull` succeed against fakecloud ECR
    /// without a prior `aws ecr get-login-password | docker login`.
    docker_config: Option<Arc<TempDir>>,
    /// Tracks container IDs per task ID so `stop_task` can kill in-flight
    /// work without needing to block on the spawned executor future.
    containers: RwLock<std::collections::HashMap<String, String>>,
    /// Cross-service delivery bus — emits `aws.ecs` EventBridge events
    /// on task state transitions when wired. `None` if the server started
    /// without EventBridge configured (or for unit tests).
    delivery_bus: Option<Arc<DeliveryBus>>,
    /// CloudWatch Logs state — when set, tasks whose container definition
    /// declares the `awslogs` log driver get their captured stdout/stderr
    /// forwarded to a log group/stream under this shared state.
    logs_state: Option<SharedLogsState>,
    /// SecretsManager state for resolving `containerDefinition.secrets[]`
    /// entries whose `valueFrom` is a SecretsManager ARN.
    secretsmanager_state: Option<SharedSecretsManagerState>,
    /// SSM Parameter Store state for resolving `secrets[]` entries whose
    /// `valueFrom` is an SSM parameter ARN.
    ssm_state: Option<SharedSsmState>,
}
71
72impl EcsRuntime {
73    /// Auto-detect Docker or Podman. Returns `None` if neither is
74    /// available. Honours `FAKECLOUD_CONTAINER_CLI` for explicit override.
75    /// `server_port` is the port the main fakecloud server bound to;
76    /// needed to resolve AWS ECR URIs against the local OCI v2 registry.
77    pub fn new(server_port: u16) -> Option<Self> {
78        let cli = if let Ok(cli) = std::env::var("FAKECLOUD_CONTAINER_CLI") {
79            if cli_works(&cli) {
80                cli
81            } else {
82                return None;
83            }
84        } else if cli_works("docker") {
85            "docker".to_string()
86        } else if cli_works("podman") {
87            "podman".to_string()
88        } else {
89            return None;
90        };
91        let host_ip = if cfg!(target_os = "linux") {
92            "172.17.0.1".to_string()
93        } else {
94            "host-gateway".to_string()
95        };
96        let docker_config = build_local_registry_docker_config(server_port).map(Arc::new);
97        Some(Self {
98            cli,
99            host_ip,
100            server_port,
101            docker_config,
102            containers: RwLock::new(std::collections::HashMap::new()),
103            delivery_bus: None,
104            logs_state: None,
105            secretsmanager_state: None,
106            ssm_state: None,
107        })
108    }
109
110    /// Path suitable for `DOCKER_CONFIG`. `None` if the tempdir setup
111    /// failed; in that case pulls fall back to the user's own config and
112    /// will only work if they've already logged in.
113    fn docker_config_path(&self) -> Option<PathBuf> {
114        self.docker_config.as_ref().map(|d| d.path().to_path_buf())
115    }
116
117    /// Build a `Command` for the container CLI with `DOCKER_CONFIG` set
118    /// to our isolated tempdir so fakecloud ECR auth works out of the box.
119    fn cli_command(&self) -> Command {
120        let mut cmd = Command::new(&self.cli);
121        if let Some(p) = self.docker_config_path() {
122            cmd.env("DOCKER_CONFIG", p);
123        }
124        cmd
125    }
126
127    pub fn cli_name(&self) -> &str {
128        &self.cli
129    }
130
131    /// Wire EventBridge delivery so task state transitions emit
132    /// `aws.ecs` / `ECS Task State Change` events.
133    pub fn with_delivery_bus(mut self, bus: Arc<DeliveryBus>) -> Self {
134        self.delivery_bus = Some(bus);
135        self
136    }
137
138    /// Wire CloudWatch Logs state so tasks using the `awslogs` driver
139    /// get their captured stdout/stderr forwarded.
140    pub fn with_logs(mut self, logs: SharedLogsState) -> Self {
141        self.logs_state = Some(logs);
142        self
143    }
144
145    /// Wire SecretsManager state so `secrets[].valueFrom` entries
146    /// pointing at SecretsManager ARNs resolve at task launch.
147    pub fn with_secretsmanager(mut self, state: SharedSecretsManagerState) -> Self {
148        self.secretsmanager_state = Some(state);
149        self
150    }
151
152    /// Wire SSM state so `secrets[].valueFrom` entries pointing at
153    /// Parameter Store ARNs resolve at task launch.
154    pub fn with_ssm(mut self, state: SharedSsmState) -> Self {
155        self.ssm_state = Some(state);
156        self
157    }
158
159    /// Spawn the task asynchronously. Returns immediately after transitioning
160    /// the task to `PENDING`; the background task advances it to `RUNNING`
161    /// once the container is created and to `STOPPED` once the container
162    /// exits.
163    pub fn run_task(self: Arc<Self>, state: SharedEcsState, task_id: String, account_id: String) {
164        let rt = self.clone();
165        tokio::spawn(async move {
166            if let Err(err) = rt.run_task_inner(&state, &task_id, &account_id).await {
167                tracing::warn!(%err, task = %task_id, "ecs task execution failed");
168                finalize_failure(&state, &account_id, &task_id, &err.to_string());
169                rt.emit_state_change(
170                    &state,
171                    &account_id,
172                    &task_id,
173                    "STOPPED",
174                    Some(("TaskFailedToStart", err.to_string())),
175                );
176            }
177        });
178    }
179
180    async fn run_task_inner(
181        &self,
182        state: &SharedEcsState,
183        task_id: &str,
184        account_id: &str,
185    ) -> Result<(), RuntimeError> {
186        let (image, mut env, entry_point, command, awslogs_container, secrets_refs, has_task_role) = {
187            let accounts = state.read();
188            let s = accounts
189                .get(account_id)
190                .ok_or_else(|| RuntimeError::ContainerStart("account missing".into()))?;
191            let task = s
192                .tasks
193                .get(task_id)
194                .ok_or_else(|| RuntimeError::ContainerStart("task missing".into()))?;
195            let container = task
196                .containers
197                .first()
198                .ok_or_else(|| RuntimeError::ContainerStart("task has no containers".into()))?;
199            let def = find_container_definition(s, &task.family, task.revision, &container.name);
200            let secrets = def
201                .as_ref()
202                .and_then(|d| d.get("secrets").and_then(|v| v.as_array()).cloned())
203                .map(|arr| {
204                    arr.iter()
205                        .filter_map(|e| {
206                            let name = e.get("name").and_then(|v| v.as_str())?.to_string();
207                            let value_from =
208                                e.get("valueFrom").and_then(|v| v.as_str())?.to_string();
209                            Some((name, value_from))
210                        })
211                        .collect::<Vec<_>>()
212                })
213                .unwrap_or_default();
214            let str_array = |key: &str| -> Vec<String> {
215                def.as_ref()
216                    .and_then(|d| d.get(key).and_then(|v| v.as_array()).cloned())
217                    .map(|arr| {
218                        arr.iter()
219                            .filter_map(|v| v.as_str().map(String::from))
220                            .collect::<Vec<_>>()
221                    })
222                    .unwrap_or_default()
223            };
224            (
225                container.image.clone(),
226                def.as_ref()
227                    .and_then(|d| d.get("environment").and_then(|v| v.as_array()).cloned())
228                    .map(|arr| {
229                        arr.iter()
230                            .filter_map(|e| {
231                                let k = e.get("name").and_then(|v| v.as_str())?;
232                                let v = e.get("value").and_then(|v| v.as_str()).unwrap_or("");
233                                Some((k.to_string(), v.to_string()))
234                            })
235                            .collect::<Vec<_>>()
236                    })
237                    .unwrap_or_default(),
238                str_array("entryPoint"),
239                str_array("command"),
240                container.name.clone(),
241                secrets,
242                task.task_role_arn.is_some(),
243            )
244        };
245
246        // Resolve `containerDefinition.secrets[]` entries. Each entry is
247        // `{name, valueFrom}`; `valueFrom` is either a SecretsManager
248        // secret ARN (`arn:aws:secretsmanager:...:secret:name-AbCdEf`)
249        // or an SSM parameter ARN (`arn:aws:ssm:...:parameter/name`).
250        // Both are looked up synchronously against the in-process shared
251        // state and appended as env vars. Failed lookups fail the task —
252        // matching real ECS's "failed to retrieve secret" behaviour.
253        for (name, value_from) in &secrets_refs {
254            let resolved = self.resolve_secret(account_id, value_from);
255            match resolved {
256                Some(v) => env.push((name.clone(), v)),
257                None => {
258                    return Err(RuntimeError::ContainerStart(format!(
259                        "failed to resolve secret {name} from {value_from}"
260                    )))
261                }
262            }
263        }
264
265        // Inject `AWS_CONTAINER_CREDENTIALS_FULL_URI` when the task has
266        // a `taskRoleArn`. AWS SDKs pick this up via the default
267        // credential-provider chain. The endpoint runs on the main
268        // fakecloud server; the container reaches it via
269        // `host.docker.internal:<port>`.
270        if has_task_role {
271            env.push((
272                "AWS_CONTAINER_CREDENTIALS_FULL_URI".into(),
273                format!(
274                    "http://host.docker.internal:{}/_fakecloud/ecs/creds/{}",
275                    self.server_port, task_id
276                ),
277            ));
278        }
279
280        // Pull the image first so we can surface pull errors cleanly.
281        // AWS private-ECR URIs (`<acct>.dkr.ecr.<region>.amazonaws.com/...`)
282        // are translated to fakecloud's local OCI v2 endpoint; after
283        // pulling the local URI we retag it to the AWS URI so the
284        // container and task state carry the user-facing image reference.
285        mark_pull_started(state, account_id, task_id);
286        let local_pull_uri = fakecloud_core::ecr_uri::translate_to_local(&image, self.server_port);
287        let pull_uri = local_pull_uri.as_deref().unwrap_or(&image);
288        let pull_out = self
289            .cli_command()
290            .args(["pull", pull_uri])
291            .output()
292            .await
293            .map_err(|e| RuntimeError::ImagePull(e.to_string()))?;
294        if !pull_out.status.success() {
295            let err = String::from_utf8_lossy(&pull_out.stderr).to_string();
296            return Err(RuntimeError::ImagePull(err));
297        }
298        // Retag the local pull URI to the AWS URI so `docker run` finds
299        // the image under the user-facing name. Digest-pinned refs can't
300        // be `docker tag` targets, so we fall through and run under the
301        // local URI in that case (cosmetic tradeoff — the task's image
302        // field will show the 127.0.0.1 URI instead of the AWS digest).
303        let run_image = if let Some(ref local_uri) = local_pull_uri {
304            if fakecloud_core::ecr_uri::is_digest_ref(&image) {
305                local_uri.clone()
306            } else {
307                let _ = self
308                    .cli_command()
309                    .args(["tag", local_uri, &image])
310                    .output()
311                    .await;
312                image.clone()
313            }
314        } else {
315            image.clone()
316        };
317        mark_pull_stopped(state, account_id, task_id);
318
319        // Run the container detached so we can track its ID and wait
320        // asynchronously. `-d` prints the container ID on stdout.
321        let mut cmd = Command::new(&self.cli);
322        cmd.args(["run", "-d"])
323            .args(["--label", &format!("fakecloud-ecs-task={}", task_id)])
324            .args([
325                "--add-host",
326                &format!("host.docker.internal:{}", self.host_ip),
327            ]);
328        for (k, v) in &env {
329            let transformed = v
330                .replace("http://127.0.0.1:", "http://host.docker.internal:")
331                .replace("https://127.0.0.1:", "https://host.docker.internal:")
332                .replace("http://localhost:", "http://host.docker.internal:")
333                .replace("https://localhost:", "https://host.docker.internal:");
334            cmd.arg("-e").arg(format!("{}={}", k, transformed));
335        }
336        // `containerDefinition.entryPoint` overrides the image's ENTRYPOINT.
337        // Docker CLI's `--entrypoint` only takes a single executable; any
338        // additional entryPoint elements are appended as positional args
339        // before the user-supplied `command[]`, which matches how docker
340        // composes ENTRYPOINT + CMD at exec time.
341        if let Some(first) = entry_point.first() {
342            cmd.args(["--entrypoint", first]);
343        }
344        cmd.arg(&run_image);
345        for arg in entry_point.iter().skip(1) {
346            cmd.arg(arg);
347        }
348        for arg in &command {
349            cmd.arg(arg);
350        }
351        let run_out = cmd
352            .output()
353            .await
354            .map_err(|e| RuntimeError::ContainerStart(e.to_string()))?;
355        if !run_out.status.success() {
356            let err = String::from_utf8_lossy(&run_out.stderr).to_string();
357            return Err(RuntimeError::ContainerStart(err));
358        }
359        let container_id = String::from_utf8_lossy(&run_out.stdout).trim().to_string();
360        self.containers
361            .write()
362            .insert(task_id.to_string(), container_id.clone());
363        mark_running(
364            state,
365            account_id,
366            task_id,
367            &container_id,
368            &awslogs_container,
369        );
370        self.emit_state_change(state, account_id, task_id, "RUNNING", None);
371
372        // Wait for the container to exit. `docker wait` blocks until exit
373        // and prints the numeric exit code to stdout.
374        let wait_out = Command::new(&self.cli)
375            .args(["wait", &container_id])
376            .output()
377            .await
378            .map_err(|e| RuntimeError::Wait(e.to_string()))?;
379        if !wait_out.status.success() {
380            // `docker wait` itself failed — treat this as a Wait error so
381            // the task flips via `finalize_failure` rather than silently
382            // claiming the container exited normally.
383            let err = String::from_utf8_lossy(&wait_out.stderr).to_string();
384            return Err(RuntimeError::Wait(err));
385        }
386        let exit_code: i64 = String::from_utf8_lossy(&wait_out.stdout)
387            .trim()
388            .parse()
389            .unwrap_or(-1);
390
391        // Capture combined stdout+stderr via `docker logs`.
392        let logs_out = Command::new(&self.cli)
393            .args(["logs", &container_id])
394            .output()
395            .await
396            .map_err(|e| RuntimeError::Wait(e.to_string()))?;
397        let mut captured = String::new();
398        captured.push_str(&String::from_utf8_lossy(&logs_out.stdout));
399        captured.push_str(&String::from_utf8_lossy(&logs_out.stderr));
400
401        // Best-effort cleanup; failures here shouldn't keep the task from
402        // transitioning to STOPPED.
403        let _ = Command::new(&self.cli)
404            .args(["rm", &container_id])
405            .output()
406            .await;
407        self.containers.write().remove(task_id);
408
409        // Forward logs BEFORE flipping the task to STOPPED so a client
410        // that polls DescribeTasks and immediately queries DescribeLogStreams
411        // can't observe the STOPPED transition before the awslogs group/stream
412        // has been materialised.
413        self.forward_awslogs_if_configured(state, account_id, task_id, &captured);
414        finalize_stopped(
415            state,
416            account_id,
417            task_id,
418            exit_code,
419            &captured,
420            "EssentialContainerExited",
421            None,
422        );
423        self.emit_state_change(
424            state,
425            account_id,
426            task_id,
427            "STOPPED",
428            Some((
429                "EssentialContainerExited",
430                format!("Exit code {}", exit_code),
431            )),
432        );
433        Ok(())
434    }
435
436    /// Resolve a `secrets[].valueFrom` reference to the actual secret
437    /// payload. Supports SecretsManager secret ARNs and SSM parameter
438    /// ARNs; returns `None` when the referenced state isn't wired or
439    /// the lookup misses.
440    fn resolve_secret(&self, account_id: &str, value_from: &str) -> Option<String> {
441        if value_from.contains(":secret:") {
442            let state = self.secretsmanager_state.as_ref()?;
443            let accounts = state.read();
444            let sm = accounts.get(account_id)?;
445            // ARN shape: arn:aws:secretsmanager:<region>:<acct>:secret:<name>-<6char>
446            // Stored key is the secret name (no suffix). Strip the
447            // AWS-generated 6-char suffix when comparing.
448            let arn_tail = value_from.rsplit(":secret:").next()?;
449            let name = arn_tail
450                .rsplit_once('-')
451                .map(|(n, _)| n)
452                .unwrap_or(arn_tail);
453            let secret = sm.secrets.get(name).or_else(|| sm.secrets.get(arn_tail))?;
454            let version_id = secret.current_version_id.as_ref()?;
455            let v = secret.versions.get(version_id)?;
456            return v.secret_string.clone();
457        }
458        if value_from.contains(":parameter") {
459            let state = self.ssm_state.as_ref()?;
460            let accounts = state.read();
461            let ssm = accounts.get(account_id)?;
462            // ARN shape: arn:aws:ssm:<region>:<acct>:parameter/<name>
463            // Parameters are stored keyed by name (with leading slash)
464            // or without, depending on how they were created. Try both.
465            let after = value_from.rsplit(":parameter").next()?;
466            let name_with_slash = after.trim_start_matches('/');
467            return ssm
468                .parameters
469                .get(&format!("/{name_with_slash}"))
470                .or_else(|| ssm.parameters.get(name_with_slash))
471                .map(|p| p.value.clone());
472        }
473        None
474    }
475
476    /// Emit an `ECS Task State Change` EventBridge event. No-op when no
477    /// delivery bus is wired. Matches AWS event shape so downstream
478    /// rules can filter on `detail.lastStatus`, `detail.stopCode`, etc.
479    fn emit_state_change(
480        &self,
481        state: &SharedEcsState,
482        account_id: &str,
483        task_id: &str,
484        last_status: &str,
485        stop: Option<(&str, String)>,
486    ) {
487        let Some(ref bus) = self.delivery_bus else {
488            return;
489        };
490        let Some(task_view) = snapshot_task(state, account_id, task_id) else {
491            return;
492        };
493        let mut detail = serde_json::json!({
494            "taskArn": task_view.task_arn,
495            "clusterArn": task_view.cluster_arn,
496            "lastStatus": last_status,
497            "desiredStatus": if last_status == "STOPPED" { "STOPPED" } else { "RUNNING" },
498            "launchType": task_view.launch_type,
499            "group": task_view.group,
500            "taskDefinitionArn": task_view.task_definition_arn,
501            "containers": task_view.containers,
502        });
503        if let Some((code, reason)) = stop {
504            detail["stopCode"] = code.into();
505            detail["stoppedReason"] = reason.into();
506        }
507        bus.put_event_to_eventbridge(
508            "aws.ecs",
509            "ECS Task State Change",
510            &detail.to_string(),
511            "default",
512        );
513    }
514
515    /// Forward captured stdout/stderr to CloudWatch Logs when the task's
516    /// container definition declares the `awslogs` log driver. No-op when
517    /// logs_state isn't wired or the task has no awslogs config.
518    fn forward_awslogs_if_configured(
519        &self,
520        state: &SharedEcsState,
521        account_id: &str,
522        task_id: &str,
523        captured: &str,
524    ) {
525        let Some(ref logs) = self.logs_state else {
526            return;
527        };
528        // Clone out of the read guard so we don't hold it across the logs
529        // state write.
530        let (cfg, task_region) = {
531            let accounts = state.read();
532            let Some(s) = accounts.get(account_id) else {
533                return;
534            };
535            let Some(task) = s.tasks.get(task_id) else {
536                return;
537            };
538            let Some(ref cfg) = task.awslogs else {
539                return;
540            };
541            (cfg.clone(), s.region.clone())
542        };
543        if captured.is_empty() {
544            return;
545        }
546        let now = Utc::now().timestamp_millis();
547        let stream_name = cfg.stream_name(task_id);
548        let events: Vec<IngestEvent> = captured
549            .lines()
550            .enumerate()
551            .map(|(i, line)| IngestEvent {
552                // Stagger within the same millisecond so CloudWatch's
553                // chronological-order invariant holds without relying on
554                // the host clock's resolution.
555                timestamp_ms: now.saturating_add(i as i64),
556                message: line.to_string(),
557            })
558            .collect();
559        append_events(
560            logs,
561            account_id,
562            &task_region,
563            &cfg.group,
564            &stream_name,
565            &events,
566        );
567    }
568
569    /// Kill the container behind a task (if any) with the configured stop
570    /// timeout. Returns true if a container was killed. Called synchronously
571    /// from `StopTask`; the wait loop in `run_task_inner` observes the
572    /// exit and transitions the task to `STOPPED`.
573    pub async fn stop_task(&self, task_id: &str, reason: &str) -> bool {
574        let container_id = self.containers.read().get(task_id).cloned();
575        let Some(id) = container_id else {
576            return false;
577        };
578        // `docker stop` sends SIGTERM then SIGKILL after a timeout.
579        let _ = Command::new(&self.cli)
580            .args(["stop", "--time", "10", &id])
581            .output()
582            .await;
583        tracing::info!(task = %task_id, reason = %reason, "ecs task stop requested");
584        true
585    }
586
587    /// Kill every running container the runtime owns. Called on reset /
588    /// shutdown so docker state matches fakecloud state after a fresh
589    /// boot.
590    pub async fn stop_all(&self) {
591        let ids: Vec<String> = self.containers.read().values().cloned().collect();
592        for id in ids {
593            let _ = Command::new(&self.cli).args(["kill", &id]).output().await;
594            let _ = Command::new(&self.cli).args(["rm", &id]).output().await;
595        }
596        self.containers.write().clear();
597    }
598}
599
/// Owned copy of the task fields that go into an `ECS Task State Change`
/// event detail, taken by `snapshot_task` under the ECS state read lock.
struct TaskSnapshot {
    /// Copied from the task's `task_arn`.
    task_arn: String,
    /// Copied from the task's `cluster_arn`.
    cluster_arn: String,
    /// Copied from the task's `launch_type`.
    launch_type: String,
    /// Copied from the task's `group`, if any.
    group: Option<String>,
    /// Copied from the task's `task_definition_arn`.
    task_definition_arn: String,
    /// JSON array with one object per container (`containerArn`, `name`,
    /// `image`, `lastStatus`, `exitCode`, `reason`).
    containers: serde_json::Value,
}
608
609fn snapshot_task(state: &SharedEcsState, account_id: &str, task_id: &str) -> Option<TaskSnapshot> {
610    let accounts = state.read();
611    let s = accounts.get(account_id)?;
612    let task = s.tasks.get(task_id)?;
613    Some(TaskSnapshot {
614        task_arn: task.task_arn.clone(),
615        cluster_arn: task.cluster_arn.clone(),
616        launch_type: task.launch_type.clone(),
617        group: task.group.clone(),
618        task_definition_arn: task.task_definition_arn.clone(),
619        containers: serde_json::Value::Array(
620            task.containers
621                .iter()
622                .map(|c| {
623                    serde_json::json!({
624                        "containerArn": c.container_arn,
625                        "name": c.name,
626                        "image": c.image,
627                        "lastStatus": c.last_status,
628                        "exitCode": c.exit_code,
629                        "reason": c.reason,
630                    })
631                })
632                .collect(),
633        ),
634    })
635}
636
/// Intentionally empty. Its only job is to reference the imported `Task`
/// type so the `use` stays meaningful until a snapshot extension needs
/// it; the function is never called.
// `dead_code` is allowed because nothing is expected to call this anchor.
#[allow(dead_code)]
fn _task_type_anchor(_t: &Task) {}
640
/// Probe a container CLI by running `<cli> info` with stdout and stderr
/// discarded.
///
/// Returns `true` only when the process could be spawned and exited
/// successfully; a missing binary or a non-zero exit both yield `false`.
fn cli_works(cli: &str) -> bool {
    let probe = std::process::Command::new(cli)
        .arg("info")
        .stdout(std::process::Stdio::null())
        .stderr(std::process::Stdio::null())
        .status();
    matches!(probe, Ok(status) if status.success())
}
650
651/// Build an isolated docker config directory with Basic auth for
652/// fakecloud ECR at `127.0.0.1:<port>`. Lets `docker pull/push/tag`
653/// work against the local OCI v2 registry without requiring the user
654/// to run `aws ecr get-login-password | docker login` first.
655fn build_local_registry_docker_config(server_port: u16) -> Option<TempDir> {
656    let dir = TempDir::new().ok()?;
657    let auth = base64::engine::general_purpose::STANDARD.encode("AWS:fakecloud-ecs-runtime");
658    let config = serde_json::json!({
659        "auths": {
660            format!("127.0.0.1:{server_port}"): { "auth": auth },
661        }
662    });
663    std::fs::write(dir.path().join("config.json"), config.to_string()).ok()?;
664    Some(dir)
665}
666
667fn find_container_definition(
668    state: &crate::state::EcsState,
669    family: &str,
670    revision: i32,
671    name: &str,
672) -> Option<serde_json::Value> {
673    state
674        .task_definitions
675        .get(family)?
676        .get(&revision)?
677        .container_definitions
678        .iter()
679        .find(|c| c.get("name").and_then(|v| v.as_str()) == Some(name))
680        .cloned()
681}
682
683fn mark_pull_started(state: &SharedEcsState, account_id: &str, task_id: &str) {
684    let mut accounts = state.write();
685    let Some(s) = accounts.get_mut(account_id) else {
686        return;
687    };
688    let task_arn_cluster = s
689        .tasks
690        .get(task_id)
691        .map(|t| (t.task_arn.clone(), t.cluster_arn.clone()));
692    if let Some(task) = s.tasks.get_mut(task_id) {
693        task.pull_started_at = Some(Utc::now());
694    }
695    if let Some((arn, cluster_arn)) = task_arn_cluster {
696        s.push_event(LifecycleEvent {
697            at: Utc::now(),
698            event_type: "PullStarted".into(),
699            task_arn: Some(arn),
700            cluster_arn: Some(cluster_arn),
701            last_status: Some("PENDING".into()),
702            detail: serde_json::json!({}),
703        });
704    }
705}
706
707fn mark_pull_stopped(state: &SharedEcsState, account_id: &str, task_id: &str) {
708    let mut accounts = state.write();
709    let Some(s) = accounts.get_mut(account_id) else {
710        return;
711    };
712    if let Some(task) = s.tasks.get_mut(task_id) {
713        task.pull_stopped_at = Some(Utc::now());
714    }
715}
716
717fn mark_running(
718    state: &SharedEcsState,
719    account_id: &str,
720    task_id: &str,
721    container_id: &str,
722    container_name: &str,
723) {
724    let mut accounts = state.write();
725    let Some(s) = accounts.get_mut(account_id) else {
726        return;
727    };
728    let (arn, cluster_arn) = {
729        let Some(task) = s.tasks.get_mut(task_id) else {
730            return;
731        };
732        task.last_status = "RUNNING".into();
733        task.connectivity = "CONNECTED".into();
734        task.connectivity_at = Some(Utc::now());
735        task.started_at = Some(Utc::now());
736        if let Some(c) = task
737            .containers
738            .iter_mut()
739            .find(|c| c.name == container_name)
740        {
741            c.runtime_id = Some(container_id.into());
742            c.last_status = "RUNNING".into();
743        }
744        if let Some(cluster) = s.clusters.get_mut(&task.cluster_name) {
745            cluster.running_tasks_count += 1;
746            if cluster.pending_tasks_count > 0 {
747                cluster.pending_tasks_count -= 1;
748            }
749        }
750        (task.task_arn.clone(), task.cluster_arn.clone())
751    };
752    s.push_event(LifecycleEvent {
753        at: Utc::now(),
754        event_type: "TaskStateChange".into(),
755        task_arn: Some(arn),
756        cluster_arn: Some(cluster_arn),
757        last_status: Some("RUNNING".into()),
758        detail: serde_json::json!({}),
759    });
760}
761
762fn finalize_stopped(
763    state: &SharedEcsState,
764    account_id: &str,
765    task_id: &str,
766    exit_code: i64,
767    captured: &str,
768    stop_code: &str,
769    stopped_reason: Option<String>,
770) {
771    let mut accounts = state.write();
772    let Some(s) = accounts.get_mut(account_id) else {
773        return;
774    };
775    let (arn, cluster_arn) = {
776        let Some(task) = s.tasks.get_mut(task_id) else {
777            return;
778        };
779        task.last_status = "STOPPED".into();
780        task.desired_status = "STOPPED".into();
781        task.stopping_at = task.stopping_at.or(Some(Utc::now()));
782        task.stopped_at = Some(Utc::now());
783        task.stop_code = Some(stop_code.into());
784        task.stopped_reason = stopped_reason.or(Some(format!("Exit code {}", exit_code)));
785        task.captured_logs = captured.to_string();
786        for c in task.containers.iter_mut() {
787            c.last_status = "STOPPED".into();
788            if c.exit_code.is_none() {
789                c.exit_code = Some(exit_code);
790            }
791        }
792        if let Some(cluster) = s.clusters.get_mut(&task.cluster_name) {
793            if cluster.running_tasks_count > 0 {
794                cluster.running_tasks_count -= 1;
795            }
796        }
797        (task.task_arn.clone(), task.cluster_arn.clone())
798    };
799    s.push_event(LifecycleEvent {
800        at: Utc::now(),
801        event_type: "TaskStateChange".into(),
802        task_arn: Some(arn),
803        cluster_arn: Some(cluster_arn),
804        last_status: Some("STOPPED".into()),
805        detail: serde_json::json!({
806            "exitCode": exit_code,
807            "stopCode": stop_code,
808        }),
809    });
810}
811
812fn finalize_failure(state: &SharedEcsState, account_id: &str, task_id: &str, reason: &str) {
813    let mut accounts = state.write();
814    let Some(s) = accounts.get_mut(account_id) else {
815        return;
816    };
817    let (arn, cluster_arn) = {
818        let Some(task) = s.tasks.get_mut(task_id) else {
819            return;
820        };
821        // Capture the prior status before we clobber it: if the task had
822        // already reached RUNNING when execution failed (e.g. `docker wait`
823        // blew up after the container started), we owe the cluster a
824        // running-tasks decrement. Tasks that died before RUNNING only
825        // ever incremented pendingTasksCount.
826        let was_running = task.last_status == "RUNNING";
827        task.last_status = "STOPPED".into();
828        task.desired_status = "STOPPED".into();
829        task.stopped_at = Some(Utc::now());
830        task.stop_code = Some("TaskFailedToStart".into());
831        task.stopped_reason = Some(reason.to_string());
832        for c in task.containers.iter_mut() {
833            c.last_status = "STOPPED".into();
834            c.reason = Some(reason.to_string());
835        }
836        if let Some(cluster) = s.clusters.get_mut(&task.cluster_name) {
837            if was_running {
838                if cluster.running_tasks_count > 0 {
839                    cluster.running_tasks_count -= 1;
840                }
841            } else if cluster.pending_tasks_count > 0 {
842                cluster.pending_tasks_count -= 1;
843            }
844        }
845        (task.task_arn.clone(), task.cluster_arn.clone())
846    };
847    s.push_event(LifecycleEvent {
848        at: Utc::now(),
849        event_type: "TaskFailedToStart".into(),
850        task_arn: Some(arn),
851        cluster_arn: Some(cluster_arn),
852        last_status: Some("STOPPED".into()),
853        detail: serde_json::json!({ "reason": reason }),
854    });
855}
856
857/// Short helper for tests + snapshot code to sleep between state
858/// transitions. Exposed on the crate boundary to keep test timing
859/// centralized.
860pub async fn sleep(duration: Duration) {
861    tokio::time::sleep(duration).await;
862}
863
#[cfg(test)]
mod tests {
    use super::*;

    /// Probing a binary name that should not exist must report `false`.
    #[test]
    fn cli_works_for_known_missing_binary_is_false() {
        let found = cli_works("definitely-not-a-real-cli-binary-xyz");
        assert!(!found);
    }

    /// An AWS-style ECR image URI is rewritten to the local endpoint on the
    /// given port, keeping the repository and tag.
    #[test]
    fn aws_ecr_uris_translate_for_local_pull() {
        let translated = fakecloud_core::ecr_uri::translate_to_local(
            "123456789012.dkr.ecr.us-east-1.amazonaws.com/app:latest",
            4566,
        );
        assert_eq!(translated.as_deref(), Some("127.0.0.1:4566/app:latest"));
    }
}