1use std::path::PathBuf;
11use std::sync::Arc;
12use std::time::Duration;
13
14use base64::Engine;
15use chrono::Utc;
16use fakecloud_core::delivery::DeliveryBus;
17use fakecloud_logs::ingest::{append_events, IngestEvent};
18use fakecloud_logs::state::SharedLogsState;
19use fakecloud_secretsmanager::state::SharedSecretsManagerState;
20use fakecloud_ssm::state::SharedSsmState;
21use parking_lot::RwLock;
22use tempfile::TempDir;
23use tokio::process::Command;
24
25use crate::state::{LifecycleEvent, SharedEcsState, Task};
26
/// Failures that abort the execution of an ECS task on the local container CLI.
///
/// Every payload-carrying variant holds a human-readable message: either the
/// text of the I/O error raised while spawning the CLI, or the stderr of the
/// CLI command that exited unsuccessfully.
#[derive(Debug, thiserror::Error)]
pub enum RuntimeError {
    /// No usable container CLI is available. Not constructed anywhere in the
    /// visible code (`EcsRuntime::new` reports that case by returning `None`).
    #[error("container CLI not found (tried docker, podman)")]
    NoCli,
    /// The `pull` command could not be spawned or exited unsuccessfully.
    #[error("image pull failed: {0}")]
    ImagePull(String),
    /// The `run` command failed. Also used for problems detected before the
    /// launch: missing account, task or container, or an unresolvable secret.
    #[error("container start failed: {0}")]
    ContainerStart(String),
    /// The `wait` command could not be spawned or exited unsuccessfully.
    #[error("docker wait failed: {0}")]
    Wait(String),
}
38
/// Runs ECS tasks as real containers by shelling out to a container CLI.
pub struct EcsRuntime {
    /// Program name of the container CLI every command is spawned with.
    cli: String,
    /// Address that `host.docker.internal` is mapped to via `--add-host`.
    host_ip: String,
    /// Port of the local server; used to build local registry URIs and the
    /// task-credentials endpoint URL handed to containers.
    server_port: u16,
    /// Temporary directory holding a `config.json` with credentials for the
    /// local registry; exported as `DOCKER_CONFIG` by `cli_command`. `None`
    /// when the directory or file could not be created.
    docker_config: Option<Arc<TempDir>>,
    /// Task id -> container id for containers started by this runtime that
    /// have not been removed yet.
    containers: RwLock<std::collections::HashMap<String, String>>,
    /// When set, task state changes are published through this bus.
    delivery_bus: Option<Arc<DeliveryBus>>,
    /// When set, captured container output is forwarded to this logs state.
    logs_state: Option<SharedLogsState>,
    /// Source for resolving `secrets` entries whose `valueFrom` contains `:secret:`.
    secretsmanager_state: Option<SharedSecretsManagerState>,
    /// Source for resolving `secrets` entries whose `valueFrom` contains `:parameter`.
    ssm_state: Option<SharedSsmState>,
}
71
72impl EcsRuntime {
73 pub fn new(server_port: u16) -> Option<Self> {
78 let cli = if let Ok(cli) = std::env::var("FAKECLOUD_CONTAINER_CLI") {
79 if cli_works(&cli) {
80 cli
81 } else {
82 return None;
83 }
84 } else if cli_works("docker") {
85 "docker".to_string()
86 } else if cli_works("podman") {
87 "podman".to_string()
88 } else {
89 return None;
90 };
91 let host_ip = if cfg!(target_os = "linux") {
92 "172.17.0.1".to_string()
93 } else {
94 "host-gateway".to_string()
95 };
96 let docker_config = build_local_registry_docker_config(server_port).map(Arc::new);
97 Some(Self {
98 cli,
99 host_ip,
100 server_port,
101 docker_config,
102 containers: RwLock::new(std::collections::HashMap::new()),
103 delivery_bus: None,
104 logs_state: None,
105 secretsmanager_state: None,
106 ssm_state: None,
107 })
108 }
109
110 fn docker_config_path(&self) -> Option<PathBuf> {
114 self.docker_config.as_ref().map(|d| d.path().to_path_buf())
115 }
116
117 fn cli_command(&self) -> Command {
120 let mut cmd = Command::new(&self.cli);
121 if let Some(p) = self.docker_config_path() {
122 cmd.env("DOCKER_CONFIG", p);
123 }
124 cmd
125 }
126
127 pub fn cli_name(&self) -> &str {
128 &self.cli
129 }
130
131 pub fn with_delivery_bus(mut self, bus: Arc<DeliveryBus>) -> Self {
134 self.delivery_bus = Some(bus);
135 self
136 }
137
138 pub fn with_logs(mut self, logs: SharedLogsState) -> Self {
141 self.logs_state = Some(logs);
142 self
143 }
144
145 pub fn with_secretsmanager(mut self, state: SharedSecretsManagerState) -> Self {
148 self.secretsmanager_state = Some(state);
149 self
150 }
151
152 pub fn with_ssm(mut self, state: SharedSsmState) -> Self {
155 self.ssm_state = Some(state);
156 self
157 }
158
159 pub fn run_task(self: Arc<Self>, state: SharedEcsState, task_id: String, account_id: String) {
164 let rt = self.clone();
165 tokio::spawn(async move {
166 if let Err(err) = rt.run_task_inner(&state, &task_id, &account_id).await {
167 tracing::warn!(%err, task = %task_id, "ecs task execution failed");
168 finalize_failure(&state, &account_id, &task_id, &err.to_string());
169 rt.emit_state_change(
170 &state,
171 &account_id,
172 &task_id,
173 "STOPPED",
174 Some(("TaskFailedToStart", err.to_string())),
175 );
176 }
177 });
178 }
179
180 async fn run_task_inner(
181 &self,
182 state: &SharedEcsState,
183 task_id: &str,
184 account_id: &str,
185 ) -> Result<(), RuntimeError> {
186 let (image, mut env, entry_point, command, awslogs_container, secrets_refs, has_task_role) = {
187 let accounts = state.read();
188 let s = accounts
189 .get(account_id)
190 .ok_or_else(|| RuntimeError::ContainerStart("account missing".into()))?;
191 let task = s
192 .tasks
193 .get(task_id)
194 .ok_or_else(|| RuntimeError::ContainerStart("task missing".into()))?;
195 let container = task
196 .containers
197 .first()
198 .ok_or_else(|| RuntimeError::ContainerStart("task has no containers".into()))?;
199 let def = find_container_definition(s, &task.family, task.revision, &container.name);
200 let secrets = def
201 .as_ref()
202 .and_then(|d| d.get("secrets").and_then(|v| v.as_array()).cloned())
203 .map(|arr| {
204 arr.iter()
205 .filter_map(|e| {
206 let name = e.get("name").and_then(|v| v.as_str())?.to_string();
207 let value_from =
208 e.get("valueFrom").and_then(|v| v.as_str())?.to_string();
209 Some((name, value_from))
210 })
211 .collect::<Vec<_>>()
212 })
213 .unwrap_or_default();
214 let str_array = |key: &str| -> Vec<String> {
215 def.as_ref()
216 .and_then(|d| d.get(key).and_then(|v| v.as_array()).cloned())
217 .map(|arr| {
218 arr.iter()
219 .filter_map(|v| v.as_str().map(String::from))
220 .collect::<Vec<_>>()
221 })
222 .unwrap_or_default()
223 };
224 (
225 container.image.clone(),
226 def.as_ref()
227 .and_then(|d| d.get("environment").and_then(|v| v.as_array()).cloned())
228 .map(|arr| {
229 arr.iter()
230 .filter_map(|e| {
231 let k = e.get("name").and_then(|v| v.as_str())?;
232 let v = e.get("value").and_then(|v| v.as_str()).unwrap_or("");
233 Some((k.to_string(), v.to_string()))
234 })
235 .collect::<Vec<_>>()
236 })
237 .unwrap_or_default(),
238 str_array("entryPoint"),
239 str_array("command"),
240 container.name.clone(),
241 secrets,
242 task.task_role_arn.is_some(),
243 )
244 };
245
246 for (name, value_from) in &secrets_refs {
254 let resolved = self.resolve_secret(account_id, value_from);
255 match resolved {
256 Some(v) => env.push((name.clone(), v)),
257 None => {
258 return Err(RuntimeError::ContainerStart(format!(
259 "failed to resolve secret {name} from {value_from}"
260 )))
261 }
262 }
263 }
264
265 if has_task_role {
271 env.push((
272 "AWS_CONTAINER_CREDENTIALS_FULL_URI".into(),
273 format!(
274 "http://host.docker.internal:{}/_fakecloud/ecs/creds/{}",
275 self.server_port, task_id
276 ),
277 ));
278 }
279
280 mark_pull_started(state, account_id, task_id);
286 let local_pull_uri = fakecloud_core::ecr_uri::translate_to_local(&image, self.server_port);
287 let pull_uri = local_pull_uri.as_deref().unwrap_or(&image);
288 let pull_out = self
289 .cli_command()
290 .args(["pull", pull_uri])
291 .output()
292 .await
293 .map_err(|e| RuntimeError::ImagePull(e.to_string()))?;
294 if !pull_out.status.success() {
295 let err = String::from_utf8_lossy(&pull_out.stderr).to_string();
296 return Err(RuntimeError::ImagePull(err));
297 }
298 let run_image = if let Some(ref local_uri) = local_pull_uri {
304 if fakecloud_core::ecr_uri::is_digest_ref(&image) {
305 local_uri.clone()
306 } else {
307 let _ = self
308 .cli_command()
309 .args(["tag", local_uri, &image])
310 .output()
311 .await;
312 image.clone()
313 }
314 } else {
315 image.clone()
316 };
317 mark_pull_stopped(state, account_id, task_id);
318
319 let mut cmd = Command::new(&self.cli);
322 cmd.args(["run", "-d"])
323 .args(["--label", &format!("fakecloud-ecs-task={}", task_id)])
324 .args([
325 "--add-host",
326 &format!("host.docker.internal:{}", self.host_ip),
327 ]);
328 for (k, v) in &env {
329 let transformed = v
330 .replace("http://127.0.0.1:", "http://host.docker.internal:")
331 .replace("https://127.0.0.1:", "https://host.docker.internal:")
332 .replace("http://localhost:", "http://host.docker.internal:")
333 .replace("https://localhost:", "https://host.docker.internal:");
334 cmd.arg("-e").arg(format!("{}={}", k, transformed));
335 }
336 if let Some(first) = entry_point.first() {
342 cmd.args(["--entrypoint", first]);
343 }
344 cmd.arg(&run_image);
345 for arg in entry_point.iter().skip(1) {
346 cmd.arg(arg);
347 }
348 for arg in &command {
349 cmd.arg(arg);
350 }
351 let run_out = cmd
352 .output()
353 .await
354 .map_err(|e| RuntimeError::ContainerStart(e.to_string()))?;
355 if !run_out.status.success() {
356 let err = String::from_utf8_lossy(&run_out.stderr).to_string();
357 return Err(RuntimeError::ContainerStart(err));
358 }
359 let container_id = String::from_utf8_lossy(&run_out.stdout).trim().to_string();
360 self.containers
361 .write()
362 .insert(task_id.to_string(), container_id.clone());
363 mark_running(
364 state,
365 account_id,
366 task_id,
367 &container_id,
368 &awslogs_container,
369 );
370 self.emit_state_change(state, account_id, task_id, "RUNNING", None);
371
372 let wait_out = Command::new(&self.cli)
375 .args(["wait", &container_id])
376 .output()
377 .await
378 .map_err(|e| RuntimeError::Wait(e.to_string()))?;
379 if !wait_out.status.success() {
380 let err = String::from_utf8_lossy(&wait_out.stderr).to_string();
384 return Err(RuntimeError::Wait(err));
385 }
386 let exit_code: i64 = String::from_utf8_lossy(&wait_out.stdout)
387 .trim()
388 .parse()
389 .unwrap_or(-1);
390
391 let logs_out = Command::new(&self.cli)
393 .args(["logs", &container_id])
394 .output()
395 .await
396 .map_err(|e| RuntimeError::Wait(e.to_string()))?;
397 let mut captured = String::new();
398 captured.push_str(&String::from_utf8_lossy(&logs_out.stdout));
399 captured.push_str(&String::from_utf8_lossy(&logs_out.stderr));
400
401 let _ = Command::new(&self.cli)
404 .args(["rm", &container_id])
405 .output()
406 .await;
407 self.containers.write().remove(task_id);
408
409 self.forward_awslogs_if_configured(state, account_id, task_id, &captured);
414 finalize_stopped(
415 state,
416 account_id,
417 task_id,
418 exit_code,
419 &captured,
420 "EssentialContainerExited",
421 None,
422 );
423 self.emit_state_change(
424 state,
425 account_id,
426 task_id,
427 "STOPPED",
428 Some((
429 "EssentialContainerExited",
430 format!("Exit code {}", exit_code),
431 )),
432 );
433 Ok(())
434 }
435
436 fn resolve_secret(&self, account_id: &str, value_from: &str) -> Option<String> {
441 if value_from.contains(":secret:") {
442 let state = self.secretsmanager_state.as_ref()?;
443 let accounts = state.read();
444 let sm = accounts.get(account_id)?;
445 let arn_tail = value_from.rsplit(":secret:").next()?;
449 let name = arn_tail
450 .rsplit_once('-')
451 .map(|(n, _)| n)
452 .unwrap_or(arn_tail);
453 let secret = sm.secrets.get(name).or_else(|| sm.secrets.get(arn_tail))?;
454 let version_id = secret.current_version_id.as_ref()?;
455 let v = secret.versions.get(version_id)?;
456 return v.secret_string.clone();
457 }
458 if value_from.contains(":parameter") {
459 let state = self.ssm_state.as_ref()?;
460 let accounts = state.read();
461 let ssm = accounts.get(account_id)?;
462 let after = value_from.rsplit(":parameter").next()?;
466 let name_with_slash = after.trim_start_matches('/');
467 return ssm
468 .parameters
469 .get(&format!("/{name_with_slash}"))
470 .or_else(|| ssm.parameters.get(name_with_slash))
471 .map(|p| p.value.clone());
472 }
473 None
474 }
475
476 fn emit_state_change(
480 &self,
481 state: &SharedEcsState,
482 account_id: &str,
483 task_id: &str,
484 last_status: &str,
485 stop: Option<(&str, String)>,
486 ) {
487 let Some(ref bus) = self.delivery_bus else {
488 return;
489 };
490 let Some(task_view) = snapshot_task(state, account_id, task_id) else {
491 return;
492 };
493 let mut detail = serde_json::json!({
494 "taskArn": task_view.task_arn,
495 "clusterArn": task_view.cluster_arn,
496 "lastStatus": last_status,
497 "desiredStatus": if last_status == "STOPPED" { "STOPPED" } else { "RUNNING" },
498 "launchType": task_view.launch_type,
499 "group": task_view.group,
500 "taskDefinitionArn": task_view.task_definition_arn,
501 "containers": task_view.containers,
502 });
503 if let Some((code, reason)) = stop {
504 detail["stopCode"] = code.into();
505 detail["stoppedReason"] = reason.into();
506 }
507 bus.put_event_to_eventbridge(
508 "aws.ecs",
509 "ECS Task State Change",
510 &detail.to_string(),
511 "default",
512 );
513 }
514
515 fn forward_awslogs_if_configured(
519 &self,
520 state: &SharedEcsState,
521 account_id: &str,
522 task_id: &str,
523 captured: &str,
524 ) {
525 let Some(ref logs) = self.logs_state else {
526 return;
527 };
528 let (cfg, task_region) = {
531 let accounts = state.read();
532 let Some(s) = accounts.get(account_id) else {
533 return;
534 };
535 let Some(task) = s.tasks.get(task_id) else {
536 return;
537 };
538 let Some(ref cfg) = task.awslogs else {
539 return;
540 };
541 (cfg.clone(), s.region.clone())
542 };
543 if captured.is_empty() {
544 return;
545 }
546 let now = Utc::now().timestamp_millis();
547 let stream_name = cfg.stream_name(task_id);
548 let events: Vec<IngestEvent> = captured
549 .lines()
550 .enumerate()
551 .map(|(i, line)| IngestEvent {
552 timestamp_ms: now.saturating_add(i as i64),
556 message: line.to_string(),
557 })
558 .collect();
559 append_events(
560 logs,
561 account_id,
562 &task_region,
563 &cfg.group,
564 &stream_name,
565 &events,
566 );
567 }
568
569 pub async fn stop_task(&self, task_id: &str, reason: &str) -> bool {
574 let container_id = self.containers.read().get(task_id).cloned();
575 let Some(id) = container_id else {
576 return false;
577 };
578 let _ = Command::new(&self.cli)
580 .args(["stop", "--time", "10", &id])
581 .output()
582 .await;
583 tracing::info!(task = %task_id, reason = %reason, "ecs task stop requested");
584 true
585 }
586
587 pub async fn stop_all(&self) {
591 let ids: Vec<String> = self.containers.read().values().cloned().collect();
592 for id in ids {
593 let _ = Command::new(&self.cli).args(["kill", &id]).output().await;
594 let _ = Command::new(&self.cli).args(["rm", &id]).output().await;
595 }
596 self.containers.write().clear();
597 }
598}
599
/// Owned copy of the task fields that go into a state-change event, taken so
/// the state lock does not have to be held while the event is built and sent.
struct TaskSnapshot {
    task_arn: String,
    cluster_arn: String,
    launch_type: String,
    group: Option<String>,
    task_definition_arn: String,
    /// JSON array with one object per container of the task.
    containers: serde_json::Value,
}
608
609fn snapshot_task(state: &SharedEcsState, account_id: &str, task_id: &str) -> Option<TaskSnapshot> {
610 let accounts = state.read();
611 let s = accounts.get(account_id)?;
612 let task = s.tasks.get(task_id)?;
613 Some(TaskSnapshot {
614 task_arn: task.task_arn.clone(),
615 cluster_arn: task.cluster_arn.clone(),
616 launch_type: task.launch_type.clone(),
617 group: task.group.clone(),
618 task_definition_arn: task.task_definition_arn.clone(),
619 containers: serde_json::Value::Array(
620 task.containers
621 .iter()
622 .map(|c| {
623 serde_json::json!({
624 "containerArn": c.container_arn,
625 "name": c.name,
626 "image": c.image,
627 "lastStatus": c.last_status,
628 "exitCode": c.exit_code,
629 "reason": c.reason,
630 })
631 })
632 .collect(),
633 ),
634 })
635}
636
// Never called, hence the `dead_code` allowance. This is the only place in
// the visible code that names the imported `Task` type; presumably it exists
// to keep that import referenced. TODO confirm before removing.
#[allow(dead_code)]
fn _task_type_anchor(_t: &Task) {}
640
/// Probes a container CLI by running `<cli> info` with its output discarded.
///
/// Returns `true` only when the process could be spawned and exited
/// successfully. This call blocks until the probe finishes.
fn cli_works(cli: &str) -> bool {
    let probe = std::process::Command::new(cli)
        .arg("info")
        .stdout(std::process::Stdio::null())
        .stderr(std::process::Stdio::null())
        .status();
    matches!(probe, Ok(status) if status.success())
}
650
651fn build_local_registry_docker_config(server_port: u16) -> Option<TempDir> {
656 let dir = TempDir::new().ok()?;
657 let auth = base64::engine::general_purpose::STANDARD.encode("AWS:fakecloud-ecs-runtime");
658 let config = serde_json::json!({
659 "auths": {
660 format!("127.0.0.1:{server_port}"): { "auth": auth },
661 }
662 });
663 std::fs::write(dir.path().join("config.json"), config.to_string()).ok()?;
664 Some(dir)
665}
666
667fn find_container_definition(
668 state: &crate::state::EcsState,
669 family: &str,
670 revision: i32,
671 name: &str,
672) -> Option<serde_json::Value> {
673 state
674 .task_definitions
675 .get(family)?
676 .get(&revision)?
677 .container_definitions
678 .iter()
679 .find(|c| c.get("name").and_then(|v| v.as_str()) == Some(name))
680 .cloned()
681}
682
683fn mark_pull_started(state: &SharedEcsState, account_id: &str, task_id: &str) {
684 let mut accounts = state.write();
685 let Some(s) = accounts.get_mut(account_id) else {
686 return;
687 };
688 let task_arn_cluster = s
689 .tasks
690 .get(task_id)
691 .map(|t| (t.task_arn.clone(), t.cluster_arn.clone()));
692 if let Some(task) = s.tasks.get_mut(task_id) {
693 task.pull_started_at = Some(Utc::now());
694 }
695 if let Some((arn, cluster_arn)) = task_arn_cluster {
696 s.push_event(LifecycleEvent {
697 at: Utc::now(),
698 event_type: "PullStarted".into(),
699 task_arn: Some(arn),
700 cluster_arn: Some(cluster_arn),
701 last_status: Some("PENDING".into()),
702 detail: serde_json::json!({}),
703 });
704 }
705}
706
707fn mark_pull_stopped(state: &SharedEcsState, account_id: &str, task_id: &str) {
708 let mut accounts = state.write();
709 let Some(s) = accounts.get_mut(account_id) else {
710 return;
711 };
712 if let Some(task) = s.tasks.get_mut(task_id) {
713 task.pull_stopped_at = Some(Utc::now());
714 }
715}
716
717fn mark_running(
718 state: &SharedEcsState,
719 account_id: &str,
720 task_id: &str,
721 container_id: &str,
722 container_name: &str,
723) {
724 let mut accounts = state.write();
725 let Some(s) = accounts.get_mut(account_id) else {
726 return;
727 };
728 let (arn, cluster_arn) = {
729 let Some(task) = s.tasks.get_mut(task_id) else {
730 return;
731 };
732 task.last_status = "RUNNING".into();
733 task.connectivity = "CONNECTED".into();
734 task.connectivity_at = Some(Utc::now());
735 task.started_at = Some(Utc::now());
736 if let Some(c) = task
737 .containers
738 .iter_mut()
739 .find(|c| c.name == container_name)
740 {
741 c.runtime_id = Some(container_id.into());
742 c.last_status = "RUNNING".into();
743 }
744 if let Some(cluster) = s.clusters.get_mut(&task.cluster_name) {
745 cluster.running_tasks_count += 1;
746 if cluster.pending_tasks_count > 0 {
747 cluster.pending_tasks_count -= 1;
748 }
749 }
750 (task.task_arn.clone(), task.cluster_arn.clone())
751 };
752 s.push_event(LifecycleEvent {
753 at: Utc::now(),
754 event_type: "TaskStateChange".into(),
755 task_arn: Some(arn),
756 cluster_arn: Some(cluster_arn),
757 last_status: Some("RUNNING".into()),
758 detail: serde_json::json!({}),
759 });
760}
761
762fn finalize_stopped(
763 state: &SharedEcsState,
764 account_id: &str,
765 task_id: &str,
766 exit_code: i64,
767 captured: &str,
768 stop_code: &str,
769 stopped_reason: Option<String>,
770) {
771 let mut accounts = state.write();
772 let Some(s) = accounts.get_mut(account_id) else {
773 return;
774 };
775 let (arn, cluster_arn) = {
776 let Some(task) = s.tasks.get_mut(task_id) else {
777 return;
778 };
779 task.last_status = "STOPPED".into();
780 task.desired_status = "STOPPED".into();
781 task.stopping_at = task.stopping_at.or(Some(Utc::now()));
782 task.stopped_at = Some(Utc::now());
783 task.stop_code = Some(stop_code.into());
784 task.stopped_reason = stopped_reason.or(Some(format!("Exit code {}", exit_code)));
785 task.captured_logs = captured.to_string();
786 for c in task.containers.iter_mut() {
787 c.last_status = "STOPPED".into();
788 if c.exit_code.is_none() {
789 c.exit_code = Some(exit_code);
790 }
791 }
792 if let Some(cluster) = s.clusters.get_mut(&task.cluster_name) {
793 if cluster.running_tasks_count > 0 {
794 cluster.running_tasks_count -= 1;
795 }
796 }
797 (task.task_arn.clone(), task.cluster_arn.clone())
798 };
799 s.push_event(LifecycleEvent {
800 at: Utc::now(),
801 event_type: "TaskStateChange".into(),
802 task_arn: Some(arn),
803 cluster_arn: Some(cluster_arn),
804 last_status: Some("STOPPED".into()),
805 detail: serde_json::json!({
806 "exitCode": exit_code,
807 "stopCode": stop_code,
808 }),
809 });
810}
811
812fn finalize_failure(state: &SharedEcsState, account_id: &str, task_id: &str, reason: &str) {
813 let mut accounts = state.write();
814 let Some(s) = accounts.get_mut(account_id) else {
815 return;
816 };
817 let (arn, cluster_arn) = {
818 let Some(task) = s.tasks.get_mut(task_id) else {
819 return;
820 };
821 let was_running = task.last_status == "RUNNING";
827 task.last_status = "STOPPED".into();
828 task.desired_status = "STOPPED".into();
829 task.stopped_at = Some(Utc::now());
830 task.stop_code = Some("TaskFailedToStart".into());
831 task.stopped_reason = Some(reason.to_string());
832 for c in task.containers.iter_mut() {
833 c.last_status = "STOPPED".into();
834 c.reason = Some(reason.to_string());
835 }
836 if let Some(cluster) = s.clusters.get_mut(&task.cluster_name) {
837 if was_running {
838 if cluster.running_tasks_count > 0 {
839 cluster.running_tasks_count -= 1;
840 }
841 } else if cluster.pending_tasks_count > 0 {
842 cluster.pending_tasks_count -= 1;
843 }
844 }
845 (task.task_arn.clone(), task.cluster_arn.clone())
846 };
847 s.push_event(LifecycleEvent {
848 at: Utc::now(),
849 event_type: "TaskFailedToStart".into(),
850 task_arn: Some(arn),
851 cluster_arn: Some(cluster_arn),
852 last_status: Some("STOPPED".into()),
853 detail: serde_json::json!({ "reason": reason }),
854 });
855}
856
/// Suspends the calling async task for `duration` by awaiting
/// `tokio::time::sleep`.
pub async fn sleep(duration: Duration) {
    tokio::time::sleep(duration).await;
}
863
#[cfg(test)]
mod tests {
    use super::*;

    /// A program that cannot be spawned must be reported as unusable.
    #[test]
    fn cli_works_for_known_missing_binary_is_false() {
        let usable = cli_works("definitely-not-a-real-cli-binary-xyz");
        assert!(!usable);
    }

    /// An AWS ECR image URI is rewritten to the local registry address.
    #[test]
    fn aws_ecr_uris_translate_for_local_pull() {
        let translated = fakecloud_core::ecr_uri::translate_to_local(
            "123456789012.dkr.ecr.us-east-1.amazonaws.com/app:latest",
            4566,
        );
        assert_eq!(translated.as_deref(), Some("127.0.0.1:4566/app:latest"));
    }
}