1use std::path::PathBuf;
11use std::sync::Arc;
12use std::time::Duration;
13
14use base64::Engine;
15use chrono::Utc;
16use fakecloud_core::delivery::DeliveryBus;
17use fakecloud_logs::ingest::{append_events, IngestEvent};
18use fakecloud_logs::state::SharedLogsState;
19use fakecloud_secretsmanager::state::SharedSecretsManagerState;
20use fakecloud_ssm::state::SharedSsmState;
21use parking_lot::RwLock;
22use tempfile::TempDir;
23use tokio::process::Command;
24
25use crate::state::{LifecycleEvent, SharedEcsState};
26
/// Failures that abort the execution of a task on the local container runtime.
///
/// Every payload-carrying variant holds a human-readable message: either the
/// I/O error from spawning the CLI or the stderr text of the failed command.
#[derive(Debug, thiserror::Error)]
pub enum RuntimeError {
    /// No usable container CLI is available.
    // NOTE(review): not constructed anywhere in the visible code (`EcsRuntime::new`
    // reports a missing CLI by returning `None`) — confirm it is used elsewhere.
    #[error("container CLI not found (tried docker, podman)")]
    NoCli,
    /// The `pull` command could not be spawned or exited unsuccessfully.
    #[error("image pull failed: {0}")]
    ImagePull(String),
    /// The task could not be prepared or the `run` command failed. Also used for
    /// a missing account/task/container and for unresolved secrets.
    #[error("container start failed: {0}")]
    ContainerStart(String),
    /// The `wait` (or the subsequent `logs`) command could not be spawned, or
    /// `wait` exited unsuccessfully.
    #[error("docker wait failed: {0}")]
    Wait(String),
}
38
/// Runs ECS tasks as local containers by shelling out to a container CLI.
pub struct EcsRuntime {
    /// Program name of the container CLI every command is spawned with.
    cli: String,
    /// Address mapped to `host.docker.internal` inside containers via `--add-host`.
    host_ip: String,
    /// Port used when building the task-credentials URI and when translating
    /// image URIs to the local registry.
    server_port: u16,
    /// Temporary directory holding a `config.json` with registry auth for
    /// `127.0.0.1:{server_port}`; exported as `DOCKER_CONFIG` by `cli_command`.
    /// `None` when the directory or file could not be created.
    docker_config: Option<Arc<TempDir>>,
    /// Task id -> container id for containers started by this runtime.
    containers: RwLock<std::collections::HashMap<String, String>>,
    /// When set, task state changes are published through this bus.
    delivery_bus: Option<Arc<DeliveryBus>>,
    /// When set, captured container output is appended to the task's log stream.
    logs_state: Option<SharedLogsState>,
    /// Backing store for `secrets[].valueFrom` references containing `:secret:`.
    secretsmanager_state: Option<SharedSecretsManagerState>,
    /// Backing store for `secrets[].valueFrom` references containing `:parameter`.
    ssm_state: Option<SharedSsmState>,
}
71
72impl EcsRuntime {
73 pub fn new(server_port: u16) -> Option<Self> {
78 let cli = if let Ok(cli) = std::env::var("FAKECLOUD_CONTAINER_CLI") {
79 if cli_works(&cli) {
80 cli
81 } else {
82 return None;
83 }
84 } else if cli_works("docker") {
85 "docker".to_string()
86 } else if cli_works("podman") {
87 "podman".to_string()
88 } else {
89 return None;
90 };
91 let host_ip = if cfg!(target_os = "linux") {
92 "172.17.0.1".to_string()
93 } else {
94 "host-gateway".to_string()
95 };
96 let docker_config = build_local_registry_docker_config(server_port).map(Arc::new);
97 Some(Self {
98 cli,
99 host_ip,
100 server_port,
101 docker_config,
102 containers: RwLock::new(std::collections::HashMap::new()),
103 delivery_bus: None,
104 logs_state: None,
105 secretsmanager_state: None,
106 ssm_state: None,
107 })
108 }
109
110 fn docker_config_path(&self) -> Option<PathBuf> {
114 self.docker_config.as_ref().map(|d| d.path().to_path_buf())
115 }
116
117 fn cli_command(&self) -> Command {
120 let mut cmd = Command::new(&self.cli);
121 if let Some(p) = self.docker_config_path() {
122 cmd.env("DOCKER_CONFIG", p);
123 }
124 cmd
125 }
126
127 pub fn cli_name(&self) -> &str {
128 &self.cli
129 }
130
131 pub fn with_delivery_bus(mut self, bus: Arc<DeliveryBus>) -> Self {
134 self.delivery_bus = Some(bus);
135 self
136 }
137
138 pub fn with_logs(mut self, logs: SharedLogsState) -> Self {
141 self.logs_state = Some(logs);
142 self
143 }
144
145 pub fn with_secretsmanager(mut self, state: SharedSecretsManagerState) -> Self {
148 self.secretsmanager_state = Some(state);
149 self
150 }
151
152 pub fn with_ssm(mut self, state: SharedSsmState) -> Self {
155 self.ssm_state = Some(state);
156 self
157 }
158
159 pub fn run_task(self: Arc<Self>, state: SharedEcsState, task_id: String, account_id: String) {
164 let rt = self.clone();
165 tokio::spawn(async move {
166 if let Err(err) = rt.run_task_inner(&state, &task_id, &account_id).await {
167 tracing::warn!(%err, task = %task_id, "ecs task execution failed");
168 eprintln!("[ecs] task {task_id} failed: {err}");
171 finalize_failure(&state, &account_id, &task_id, &err.to_string());
172 rt.emit_state_change(
173 &state,
174 &account_id,
175 &task_id,
176 "STOPPED",
177 Some(("TaskFailedToStart", err.to_string())),
178 );
179 }
180 });
181 }
182
/// Runs the first container of a task to completion and records the result.
///
/// Steps, in order: snapshot the container definition from `state`, resolve
/// secrets into environment variables, pull the image, start the container,
/// mark the task RUNNING, wait for exit, collect logs, remove the container,
/// forward logs, and finalize the task as STOPPED.
///
/// # Errors
/// * `ContainerStart` — account/task/container missing, a secret could not be
///   resolved, or the `run` command failed.
/// * `ImagePull` — the `pull` command could not be spawned or failed.
/// * `Wait` — the `wait` command failed, or `wait`/`logs` could not be spawned.
async fn run_task_inner(
    &self,
    state: &SharedEcsState,
    task_id: &str,
    account_id: &str,
) -> Result<(), RuntimeError> {
    // Copy everything needed out of the shared state inside a block so the
    // read guard is dropped before the first `.await`.
    let (image, mut env, entry_point, command, awslogs_container, secrets_refs, has_task_role) = {
        let accounts = state.read();
        let s = accounts
            .get(account_id)
            .ok_or_else(|| RuntimeError::ContainerStart("account missing".into()))?;
        let task = s
            .tasks
            .get(task_id)
            .ok_or_else(|| RuntimeError::ContainerStart("task missing".into()))?;
        // Only the first container of the task is executed.
        let container = task
            .containers
            .first()
            .ok_or_else(|| RuntimeError::ContainerStart("task has no containers".into()))?;
        let def = find_container_definition(s, &task.family, task.revision, &container.name);
        // `secrets` entries missing a string `name` or `valueFrom` are skipped.
        let secrets = def
            .as_ref()
            .and_then(|d| d.get("secrets").and_then(|v| v.as_array()).cloned())
            .map(|arr| {
                arr.iter()
                    .filter_map(|e| {
                        let name = e.get("name").and_then(|v| v.as_str())?.to_string();
                        let value_from =
                            e.get("valueFrom").and_then(|v| v.as_str())?.to_string();
                        Some((name, value_from))
                    })
                    .collect::<Vec<_>>()
            })
            .unwrap_or_default();
        // Reads a JSON array of strings from the definition; non-string items are dropped.
        let str_array = |key: &str| -> Vec<String> {
            def.as_ref()
                .and_then(|d| d.get(key).and_then(|v| v.as_array()).cloned())
                .map(|arr| {
                    arr.iter()
                        .filter_map(|v| v.as_str().map(String::from))
                        .collect::<Vec<_>>()
                })
                .unwrap_or_default()
        };
        (
            container.image.clone(),
            // `environment` entries without a string `value` default to "".
            def.as_ref()
                .and_then(|d| d.get("environment").and_then(|v| v.as_array()).cloned())
                .map(|arr| {
                    arr.iter()
                        .filter_map(|e| {
                            let k = e.get("name").and_then(|v| v.as_str())?;
                            let v = e.get("value").and_then(|v| v.as_str()).unwrap_or("");
                            Some((k.to_string(), v.to_string()))
                        })
                        .collect::<Vec<_>>()
                })
                .unwrap_or_default(),
            str_array("entryPoint"),
            str_array("command"),
            container.name.clone(),
            secrets,
            task.task_role_arn.is_some(),
        )
    };

    // Any secret that cannot be resolved aborts the task before a pull is attempted.
    for (name, value_from) in &secrets_refs {
        let resolved = self.resolve_secret(account_id, value_from);
        match resolved {
            Some(v) => env.push((name.clone(), v)),
            None => {
                return Err(RuntimeError::ContainerStart(format!(
                    "failed to resolve secret {name} from {value_from}"
                )))
            }
        }
    }

    // Tasks with a task role get a credentials endpoint on this server.
    if has_task_role {
        env.push((
            "AWS_CONTAINER_CREDENTIALS_FULL_URI".into(),
            format!(
                "http://host.docker.internal:{}/_fakecloud/ecs/creds/{}",
                self.server_port, task_id
            ),
        ));
    }

    mark_pull_started(state, account_id, task_id);
    // Pull from the translated local URI when translation yields one, else the image as given.
    let local_pull_uri = fakecloud_core::ecr_uri::translate_to_local(&image, self.server_port);
    let pull_uri = local_pull_uri.as_deref().unwrap_or(&image);
    let pull_out = self
        .cli_command()
        .args(["pull", pull_uri])
        .output()
        .await
        .map_err(|e| RuntimeError::ImagePull(e.to_string()))?;
    // NOTE(review): both failure returns here skip `mark_pull_stopped`, so
    // `pull_stopped_at` stays unset for failed pulls — confirm that is intended.
    if !pull_out.status.success() {
        let err = String::from_utf8_lossy(&pull_out.stderr).to_string();
        return Err(RuntimeError::ImagePull(err));
    }
    // Digest references run under the local URI; other translated images are
    // tagged back to the original name (tag result ignored) and run under it.
    let run_image = if let Some(ref local_uri) = local_pull_uri {
        if fakecloud_core::ecr_uri::is_digest_ref(&image) {
            local_uri.clone()
        } else {
            let _ = self
                .cli_command()
                .args(["tag", local_uri, &image])
                .output()
                .await;
            image.clone()
        }
    } else {
        image.clone()
    };
    mark_pull_stopped(state, account_id, task_id);

    // `run`, `wait`, `logs` and `rm` are spawned with `Command::new` directly,
    // i.e. without the `DOCKER_CONFIG` override that `cli_command` applies.
    let mut cmd = Command::new(&self.cli);
    cmd.args(["run", "-d"])
        .args(["--label", &format!("fakecloud-ecs-task={}", task_id)])
        .args([
            "--add-host",
            &format!("host.docker.internal:{}", self.host_ip),
        ]);
    // Loopback URLs in env values are rewritten to `host.docker.internal`.
    for (k, v) in &env {
        let transformed = v
            .replace("http://127.0.0.1:", "http://host.docker.internal:")
            .replace("https://127.0.0.1:", "https://host.docker.internal:")
            .replace("http://localhost:", "http://host.docker.internal:")
            .replace("https://localhost:", "https://host.docker.internal:");
        cmd.arg("-e").arg(format!("{}={}", k, transformed));
    }
    // The first `entryPoint` element becomes `--entrypoint`; the remaining
    // elements are passed after the image, followed by `command`.
    if let Some(first) = entry_point.first() {
        cmd.args(["--entrypoint", first]);
    }
    cmd.arg(&run_image);
    for arg in entry_point.iter().skip(1) {
        cmd.arg(arg);
    }
    for arg in &command {
        cmd.arg(arg);
    }
    let run_out = cmd
        .output()
        .await
        .map_err(|e| RuntimeError::ContainerStart(e.to_string()))?;
    if !run_out.status.success() {
        let err = String::from_utf8_lossy(&run_out.stderr).to_string();
        return Err(RuntimeError::ContainerStart(err));
    }
    // The trimmed stdout of `run -d` is used as the container id.
    let container_id = String::from_utf8_lossy(&run_out.stdout).trim().to_string();
    self.containers
        .write()
        .insert(task_id.to_string(), container_id.clone());
    mark_running(
        state,
        account_id,
        task_id,
        &container_id,
        &awslogs_container,
    );
    self.emit_state_change(state, account_id, task_id, "RUNNING", None);

    // NOTE(review): the error returns below (wait/logs) skip the `rm` call and
    // leave the task's entry in `self.containers` — confirm whether cleanup
    // should also run on these paths.
    let wait_out = Command::new(&self.cli)
        .args(["wait", &container_id])
        .output()
        .await
        .map_err(|e| RuntimeError::Wait(e.to_string()))?;
    if !wait_out.status.success() {
        let err = String::from_utf8_lossy(&wait_out.stderr).to_string();
        return Err(RuntimeError::Wait(err));
    }
    // `wait` prints the exit code on stdout; unparsable output is recorded as -1.
    let exit_code: i64 = String::from_utf8_lossy(&wait_out.stdout)
        .trim()
        .parse()
        .unwrap_or(-1);

    // The exit status of `logs` is not checked; stdout is appended first, then stderr.
    let logs_out = Command::new(&self.cli)
        .args(["logs", &container_id])
        .output()
        .await
        .map_err(|e| RuntimeError::Wait(e.to_string()))?;
    let mut captured = String::new();
    captured.push_str(&String::from_utf8_lossy(&logs_out.stdout));
    captured.push_str(&String::from_utf8_lossy(&logs_out.stderr));

    // Container removal is best-effort; its result is ignored.
    let _ = Command::new(&self.cli)
        .args(["rm", &container_id])
        .output()
        .await;
    self.containers.write().remove(task_id);

    self.forward_awslogs_if_configured(state, account_id, task_id, &captured);
    finalize_stopped(
        state,
        account_id,
        task_id,
        exit_code,
        &captured,
        "EssentialContainerExited",
        None,
    );
    self.emit_state_change(
        state,
        account_id,
        task_id,
        "STOPPED",
        Some((
            "EssentialContainerExited",
            format!("Exit code {}", exit_code),
        )),
    );
    Ok(())
}
438
439 fn resolve_secret(&self, account_id: &str, value_from: &str) -> Option<String> {
444 if value_from.contains(":secret:") {
445 let state = self.secretsmanager_state.as_ref()?;
446 let accounts = state.read();
447 let sm = accounts.get(account_id)?;
448 let arn_tail = value_from.rsplit(":secret:").next()?;
452 let name = arn_tail
453 .rsplit_once('-')
454 .map(|(n, _)| n)
455 .unwrap_or(arn_tail);
456 let secret = sm.secrets.get(name).or_else(|| sm.secrets.get(arn_tail))?;
457 let version_id = secret.current_version_id.as_ref()?;
458 let v = secret.versions.get(version_id)?;
459 return v.secret_string.clone();
460 }
461 if value_from.contains(":parameter") {
462 let state = self.ssm_state.as_ref()?;
463 let accounts = state.read();
464 let ssm = accounts.get(account_id)?;
465 let after = value_from.rsplit(":parameter").next()?;
469 let name_with_slash = after.trim_start_matches('/');
470 return ssm
471 .parameters
472 .get(&format!("/{name_with_slash}"))
473 .or_else(|| ssm.parameters.get(name_with_slash))
474 .map(|p| p.value.clone());
475 }
476 None
477 }
478
479 fn emit_state_change(
483 &self,
484 state: &SharedEcsState,
485 account_id: &str,
486 task_id: &str,
487 last_status: &str,
488 stop: Option<(&str, String)>,
489 ) {
490 let Some(ref bus) = self.delivery_bus else {
491 return;
492 };
493 let Some(task_view) = snapshot_task(state, account_id, task_id) else {
494 return;
495 };
496 let mut detail = serde_json::json!({
497 "taskArn": task_view.task_arn,
498 "clusterArn": task_view.cluster_arn,
499 "lastStatus": last_status,
500 "desiredStatus": if last_status == "STOPPED" { "STOPPED" } else { "RUNNING" },
501 "launchType": task_view.launch_type,
502 "group": task_view.group,
503 "taskDefinitionArn": task_view.task_definition_arn,
504 "containers": task_view.containers,
505 });
506 if let Some((code, reason)) = stop {
507 detail["stopCode"] = code.into();
508 detail["stoppedReason"] = reason.into();
509 }
510 bus.put_event_to_eventbridge(
511 "aws.ecs",
512 "ECS Task State Change",
513 &detail.to_string(),
514 "default",
515 );
516 }
517
518 fn forward_awslogs_if_configured(
522 &self,
523 state: &SharedEcsState,
524 account_id: &str,
525 task_id: &str,
526 captured: &str,
527 ) {
528 let Some(ref logs) = self.logs_state else {
529 return;
530 };
531 let (cfg, task_region) = {
534 let accounts = state.read();
535 let Some(s) = accounts.get(account_id) else {
536 return;
537 };
538 let Some(task) = s.tasks.get(task_id) else {
539 return;
540 };
541 let Some(ref cfg) = task.awslogs else {
542 return;
543 };
544 (cfg.clone(), s.region.clone())
545 };
546 if captured.is_empty() {
547 return;
548 }
549 let now = Utc::now().timestamp_millis();
550 let stream_name = cfg.stream_name(task_id);
551 let events: Vec<IngestEvent> = captured
552 .lines()
553 .enumerate()
554 .map(|(i, line)| IngestEvent {
555 timestamp_ms: now.saturating_add(i as i64),
559 message: line.to_string(),
560 })
561 .collect();
562 append_events(
563 logs,
564 account_id,
565 &task_region,
566 &cfg.group,
567 &stream_name,
568 &events,
569 );
570 }
571
572 pub async fn stop_task(&self, task_id: &str, reason: &str) -> bool {
577 let container_id = self.containers.read().get(task_id).cloned();
578 let Some(id) = container_id else {
579 return false;
580 };
581 let _ = Command::new(&self.cli)
583 .args(["stop", "--time", "10", &id])
584 .output()
585 .await;
586 tracing::info!(task = %task_id, reason = %reason, "ecs task stop requested");
587 true
588 }
589
590 pub async fn stop_all(&self) {
594 let ids: Vec<String> = self.containers.read().values().cloned().collect();
595 for id in ids {
596 let _ = Command::new(&self.cli).args(["kill", &id]).output().await;
597 let _ = Command::new(&self.cli).args(["rm", &id]).output().await;
598 }
599 self.containers.write().clear();
600 }
601}
602
/// Owned copy of the task fields needed to build a state-change event, so the
/// state lock does not have to be held while the event is assembled.
struct TaskSnapshot {
    /// ARN of the task.
    task_arn: String,
    /// ARN of the cluster the task belongs to.
    cluster_arn: String,
    /// Launch type recorded on the task.
    launch_type: String,
    /// Task group, if any.
    group: Option<String>,
    /// ARN of the task definition the task was started from.
    task_definition_arn: String,
    /// JSON array with one object per container (ARN, name, image, status,
    /// exit code, reason).
    containers: serde_json::Value,
}
611
612fn snapshot_task(state: &SharedEcsState, account_id: &str, task_id: &str) -> Option<TaskSnapshot> {
613 let accounts = state.read();
614 let s = accounts.get(account_id)?;
615 let task = s.tasks.get(task_id)?;
616 Some(TaskSnapshot {
617 task_arn: task.task_arn.clone(),
618 cluster_arn: task.cluster_arn.clone(),
619 launch_type: task.launch_type.clone(),
620 group: task.group.clone(),
621 task_definition_arn: task.task_definition_arn.clone(),
622 containers: serde_json::Value::Array(
623 task.containers
624 .iter()
625 .map(|c| {
626 serde_json::json!({
627 "containerArn": c.container_arn,
628 "name": c.name,
629 "image": c.image,
630 "lastStatus": c.last_status,
631 "exitCode": c.exit_code,
632 "reason": c.reason,
633 })
634 })
635 .collect(),
636 ),
637 })
638}
639
/// Probes a container CLI by running `<cli> info` with its output discarded.
///
/// Returns `true` only when the process could be spawned and exited
/// successfully; a missing binary yields `false`.
fn cli_works(cli: &str) -> bool {
    use std::process::Stdio;

    let probe = std::process::Command::new(cli)
        .arg("info")
        .stdout(Stdio::null())
        .stderr(Stdio::null())
        .status();
    matches!(probe, Ok(status) if status.success())
}
649
650fn build_local_registry_docker_config(server_port: u16) -> Option<TempDir> {
655 let dir = TempDir::new().ok()?;
656 let auth = base64::engine::general_purpose::STANDARD.encode("AWS:fakecloud-ecs-runtime");
657 let config = serde_json::json!({
658 "auths": {
659 format!("127.0.0.1:{server_port}"): { "auth": auth },
660 }
661 });
662 std::fs::write(dir.path().join("config.json"), config.to_string()).ok()?;
663 Some(dir)
664}
665
666fn find_container_definition(
667 state: &crate::state::EcsState,
668 family: &str,
669 revision: i32,
670 name: &str,
671) -> Option<serde_json::Value> {
672 state
673 .task_definitions
674 .get(family)?
675 .get(&revision)?
676 .container_definitions
677 .iter()
678 .find(|c| c.get("name").and_then(|v| v.as_str()) == Some(name))
679 .cloned()
680}
681
682fn mark_pull_started(state: &SharedEcsState, account_id: &str, task_id: &str) {
683 let mut accounts = state.write();
684 let Some(s) = accounts.get_mut(account_id) else {
685 return;
686 };
687 let task_arn_cluster = s
688 .tasks
689 .get(task_id)
690 .map(|t| (t.task_arn.clone(), t.cluster_arn.clone()));
691 if let Some(task) = s.tasks.get_mut(task_id) {
692 task.pull_started_at = Some(Utc::now());
693 }
694 if let Some((arn, cluster_arn)) = task_arn_cluster {
695 s.push_event(LifecycleEvent {
696 at: Utc::now(),
697 event_type: "PullStarted".into(),
698 task_arn: Some(arn),
699 cluster_arn: Some(cluster_arn),
700 last_status: Some("PENDING".into()),
701 detail: serde_json::json!({}),
702 });
703 }
704}
705
706fn mark_pull_stopped(state: &SharedEcsState, account_id: &str, task_id: &str) {
707 let mut accounts = state.write();
708 let Some(s) = accounts.get_mut(account_id) else {
709 return;
710 };
711 if let Some(task) = s.tasks.get_mut(task_id) {
712 task.pull_stopped_at = Some(Utc::now());
713 }
714}
715
716fn mark_running(
717 state: &SharedEcsState,
718 account_id: &str,
719 task_id: &str,
720 container_id: &str,
721 container_name: &str,
722) {
723 let mut accounts = state.write();
724 let Some(s) = accounts.get_mut(account_id) else {
725 return;
726 };
727 let (arn, cluster_arn) = {
728 let Some(task) = s.tasks.get_mut(task_id) else {
729 return;
730 };
731 task.last_status = "RUNNING".into();
732 task.connectivity = "CONNECTED".into();
733 task.connectivity_at = Some(Utc::now());
734 task.started_at = Some(Utc::now());
735 if let Some(c) = task
736 .containers
737 .iter_mut()
738 .find(|c| c.name == container_name)
739 {
740 c.runtime_id = Some(container_id.into());
741 c.last_status = "RUNNING".into();
742 }
743 if let Some(cluster) = s.clusters.get_mut(&task.cluster_name) {
744 cluster.running_tasks_count += 1;
745 if cluster.pending_tasks_count > 0 {
746 cluster.pending_tasks_count -= 1;
747 }
748 }
749 (task.task_arn.clone(), task.cluster_arn.clone())
750 };
751 s.push_event(LifecycleEvent {
752 at: Utc::now(),
753 event_type: "TaskStateChange".into(),
754 task_arn: Some(arn),
755 cluster_arn: Some(cluster_arn),
756 last_status: Some("RUNNING".into()),
757 detail: serde_json::json!({}),
758 });
759}
760
761fn finalize_stopped(
762 state: &SharedEcsState,
763 account_id: &str,
764 task_id: &str,
765 exit_code: i64,
766 captured: &str,
767 stop_code: &str,
768 stopped_reason: Option<String>,
769) {
770 let mut accounts = state.write();
771 let Some(s) = accounts.get_mut(account_id) else {
772 return;
773 };
774 let (arn, cluster_arn) = {
775 let Some(task) = s.tasks.get_mut(task_id) else {
776 return;
777 };
778 task.last_status = "STOPPED".into();
779 task.desired_status = "STOPPED".into();
780 task.stopping_at = task.stopping_at.or(Some(Utc::now()));
781 task.stopped_at = Some(Utc::now());
782 task.stop_code = Some(stop_code.into());
783 task.stopped_reason = stopped_reason.or(Some(format!("Exit code {}", exit_code)));
784 task.captured_logs = captured.to_string();
785 for c in task.containers.iter_mut() {
786 c.last_status = "STOPPED".into();
787 if c.exit_code.is_none() {
788 c.exit_code = Some(exit_code);
789 }
790 }
791 if let Some(cluster) = s.clusters.get_mut(&task.cluster_name) {
792 if cluster.running_tasks_count > 0 {
793 cluster.running_tasks_count -= 1;
794 }
795 }
796 (task.task_arn.clone(), task.cluster_arn.clone())
797 };
798 s.push_event(LifecycleEvent {
799 at: Utc::now(),
800 event_type: "TaskStateChange".into(),
801 task_arn: Some(arn),
802 cluster_arn: Some(cluster_arn),
803 last_status: Some("STOPPED".into()),
804 detail: serde_json::json!({
805 "exitCode": exit_code,
806 "stopCode": stop_code,
807 }),
808 });
809}
810
811fn finalize_failure(state: &SharedEcsState, account_id: &str, task_id: &str, reason: &str) {
812 let mut accounts = state.write();
813 let Some(s) = accounts.get_mut(account_id) else {
814 return;
815 };
816 let (arn, cluster_arn) = {
817 let Some(task) = s.tasks.get_mut(task_id) else {
818 return;
819 };
820 let was_running = task.last_status == "RUNNING";
826 task.last_status = "STOPPED".into();
827 task.desired_status = "STOPPED".into();
828 task.stopped_at = Some(Utc::now());
829 task.stop_code = Some("TaskFailedToStart".into());
830 task.stopped_reason = Some(reason.to_string());
831 task.captured_logs = format!("[task failed to start]: {reason}");
835 for c in task.containers.iter_mut() {
836 c.last_status = "STOPPED".into();
837 c.reason = Some(reason.to_string());
838 }
839 if let Some(cluster) = s.clusters.get_mut(&task.cluster_name) {
840 if was_running {
841 if cluster.running_tasks_count > 0 {
842 cluster.running_tasks_count -= 1;
843 }
844 } else if cluster.pending_tasks_count > 0 {
845 cluster.pending_tasks_count -= 1;
846 }
847 }
848 (task.task_arn.clone(), task.cluster_arn.clone())
849 };
850 s.push_event(LifecycleEvent {
851 at: Utc::now(),
852 event_type: "TaskFailedToStart".into(),
853 task_arn: Some(arn),
854 cluster_arn: Some(cluster_arn),
855 last_status: Some("STOPPED".into()),
856 detail: serde_json::json!({ "reason": reason }),
857 });
858}
859
860pub async fn sleep(duration: Duration) {
864 tokio::time::sleep(duration).await;
865}
866
#[cfg(test)]
mod tests {
    use super::*;
    use crate::state::{EcsState, Task};
    use fakecloud_core::multi_account::MultiAccountState;
    use parking_lot::RwLock;
    use std::sync::Arc;

    /// Account id shared by the fixtures below.
    const ACCOUNT_ID: &str = "000000000000";

    /// A binary that does not exist must never be reported as usable.
    #[test]
    fn cli_works_for_known_missing_binary_is_false() {
        let usable = cli_works("definitely-not-a-real-cli-binary-xyz");
        assert!(!usable);
    }

    /// AWS ECR image URIs are rewritten to the local registry address.
    #[test]
    fn aws_ecr_uris_translate_for_local_pull() {
        let translated = fakecloud_core::ecr_uri::translate_to_local(
            "123456789012.dkr.ecr.us-east-1.amazonaws.com/app:latest",
            4566,
        );
        assert_eq!(translated.as_deref(), Some("127.0.0.1:4566/app:latest"));
    }

    /// Builds a minimal PENDING task with no containers.
    fn make_task(task_id: &str) -> Task {
        Task {
            task_arn: format!("arn:aws:ecs:us-east-1:000000000000:task/default/{task_id}"),
            task_id: task_id.into(),
            cluster_arn: "arn:aws:ecs:us-east-1:000000000000:cluster/default".into(),
            cluster_name: "default".into(),
            task_definition_arn: "arn:aws:ecs:us-east-1:000000000000:task-definition/app:1".into(),
            family: "app".into(),
            revision: 1,
            last_status: "PENDING".into(),
            desired_status: "RUNNING".into(),
            launch_type: "FARGATE".into(),
            platform_version: None,
            cpu: None,
            memory: None,
            containers: Vec::new(),
            overrides: serde_json::json!({}),
            started_by: None,
            group: None,
            connectivity: "CONNECTING".into(),
            stop_code: None,
            stopped_reason: None,
            created_at: Utc::now(),
            started_at: None,
            stopping_at: None,
            stopped_at: None,
            pull_started_at: None,
            pull_stopped_at: None,
            connectivity_at: None,
            started_by_ref_id: None,
            execution_role_arn: None,
            task_role_arn: None,
            tags: Vec::new(),
            awslogs: None,
            captured_logs: String::new(),
            protection: None,
        }
    }

    /// Builds shared state holding a single task in the fixture account.
    fn state_with_task(task_id: &str) -> SharedEcsState {
        let mut accounts: MultiAccountState<EcsState> =
            MultiAccountState::new(ACCOUNT_ID, "us-east-1", "http://localhost:4566");
        accounts
            .get_or_create(ACCOUNT_ID)
            .tasks
            .insert(task_id.into(), make_task(task_id));
        Arc::new(RwLock::new(accounts))
    }

    /// A failed task is STOPPED and its captured logs carry the prefixed reason.
    #[test]
    fn finalize_failure_writes_reason_into_captured_logs() {
        let reason = "failed to resolve secret DB_PASSWORD";
        let state = state_with_task("t1");

        finalize_failure(&state, ACCOUNT_ID, "t1", reason);

        let accounts = state.read();
        let task = accounts.get(ACCOUNT_ID).unwrap().tasks.get("t1").unwrap();
        assert_eq!(task.last_status, "STOPPED");
        assert_eq!(task.stop_code.as_deref(), Some("TaskFailedToStart"));
        assert!(
            task.captured_logs.contains(reason),
            "captured_logs missing reason: {:?}",
            task.captured_logs
        );
        assert!(
            task.captured_logs.starts_with("[task failed to start]:"),
            "captured_logs missing prefix: {:?}",
            task.captured_logs
        );
    }
}