1use std::path::PathBuf;
11use std::sync::Arc;
12use std::time::Duration;
13
14use base64::Engine;
15use chrono::Utc;
16use fakecloud_core::delivery::DeliveryBus;
17use fakecloud_logs::ingest::{append_events, IngestEvent};
18use fakecloud_logs::SharedLogsState;
19use fakecloud_secretsmanager::SharedSecretsManagerState;
20use fakecloud_ssm::SharedSsmState;
21use parking_lot::RwLock;
22use tempfile::TempDir;
23use tokio::process::Command;
24
25use crate::state::{LifecycleEvent, SharedEcsState};
26
/// Errors reported by the container-backed ECS task runtime.
#[derive(Debug, thiserror::Error)]
pub enum RuntimeError {
    /// No container CLI was found (the message names docker and podman).
    #[error("container CLI not found (tried docker, podman)")]
    NoCli,
    /// Pulling an image failed; the payload carries the failure detail.
    #[error("image pull failed: {0}")]
    ImagePull(String),
    /// A container could not be started. Also returned by plan building when
    /// the account, the task, or the task's containers are missing.
    #[error("container start failed: {0}")]
    ContainerStart(String),
    /// Waiting on a container failed; the payload carries the failure detail.
    #[error("docker wait failed: {0}")]
    Wait(String),
}
38
/// Runtime that executes ECS tasks either through a local container CLI or
/// through a Kubernetes backend.
pub struct EcsRuntime {
    /// Name of the detected container CLI; left empty by `new_k8s`.
    cli: String,
    /// Host networking details detected for the CLI (all fields empty/None
    /// when built by `new_k8s`).
    net: fakecloud_core::container_net::HostNetworking,
    /// Port passed to the constructor.
    /// NOTE(review): presumably the fakecloud server port — confirm.
    server_port: u16,
    /// Result of `build_local_registry_docker_config`; `None` when that helper
    /// yields nothing and always `None` for the Kubernetes backend.
    docker_config: Option<Arc<TempDir>>,
    /// String-keyed map of string pairs, created empty by both constructors.
    /// NOTE(review): presumably task id -> (name, container id) — confirm.
    containers: RwLock<std::collections::HashMap<String, Vec<(String, String)>>>,
    /// Optional delivery bus, set via `with_delivery_bus`.
    delivery_bus: Option<Arc<DeliveryBus>>,
    /// Optional logs state, set via `with_logs`.
    logs_state: Option<SharedLogsState>,
    /// Optional Secrets Manager state (no setter in this part of the file).
    secretsmanager_state: Option<SharedSecretsManagerState>,
    /// Optional SSM state (no setter in this part of the file).
    ssm_state: Option<SharedSsmState>,
    /// Kubernetes backend; `Some` only when constructed through `new_k8s`.
    k8s: Option<k8s::K8sTaskBackend>,
}
81
82mod config;
83mod k8s;
84mod lb;
85mod monitoring;
86mod secrets;
87mod task_lifecycle;
88
89impl EcsRuntime {
90 pub fn new(server_port: u16) -> Option<Self> {
95 let cli = fakecloud_core::container_net::detect_container_cli()?;
96 let net = fakecloud_core::container_net::HostNetworking::detect(&cli);
97 let docker_config = build_local_registry_docker_config(server_port).map(Arc::new);
98 Some(Self {
99 cli,
100 net,
101 server_port,
102 docker_config,
103 containers: RwLock::new(std::collections::HashMap::new()),
104 delivery_bus: None,
105 logs_state: None,
106 secretsmanager_state: None,
107 ssm_state: None,
108 k8s: None,
109 })
110 }
111
112 pub async fn new_k8s(server_port: u16) -> Result<Self, k8s::BackendInitError> {
116 let backend = k8s::K8sTaskBackend::from_env(server_port).await?;
117 let net = fakecloud_core::container_net::HostNetworking {
120 host_alias: String::new(),
121 add_host_arg: None,
122 sibling_host: String::new(),
123 };
124 Ok(Self {
125 cli: String::new(),
126 net,
127 server_port,
128 docker_config: None,
129 containers: RwLock::new(std::collections::HashMap::new()),
130 delivery_bus: None,
131 logs_state: None,
132 secretsmanager_state: None,
133 ssm_state: None,
134 k8s: Some(backend),
135 })
136 }
137
138 pub fn cli_name(&self) -> &str {
140 if self.k8s.is_some() {
141 "kubernetes"
142 } else {
143 &self.cli
144 }
145 }
146
147 pub async fn reap_stale(&self) {
150 if let Some(k) = &self.k8s {
151 k.reap_stale().await;
152 }
153 }
154
155 pub fn with_delivery_bus(mut self, bus: Arc<DeliveryBus>) -> Self {
158 self.delivery_bus = Some(bus);
159 self
160 }
161
162 pub fn with_logs(mut self, logs: SharedLogsState) -> Self {
165 self.logs_state = Some(logs);
166 self
167 }
168}
169
/// Everything needed to launch one container of a task, as extracted from the
/// task and its container definition by `build_container_plans`.
#[derive(Clone, Debug)]
pub(crate) struct ContainerPlan {
    /// Container name; combined with the task id to form the `--name` value.
    pub(crate) container_name: String,
    /// Image recorded on the task's container entry.
    pub(crate) image: String,
    /// `(name, value)` pairs from the definition's `environment` array.
    pub(crate) env: Vec<(String, String)>,
    /// `entryPoint` strings; the first becomes `--entrypoint`, the rest follow
    /// the image on the command line.
    pub(crate) entry_point: Vec<String>,
    /// `command` strings appended after the image and entry-point arguments.
    pub(crate) command: Vec<String>,
    /// `(name, valueFrom)` pairs from the definition's `secrets` array.
    pub(crate) secrets_refs: Vec<(String, String)>,
    /// Essential flag copied from the task's container entry.
    pub(crate) essential: bool,
    /// Whether the task has a task role ARN.
    pub(crate) has_task_role: bool,
    /// Parsed `portMappings` entries.
    pub(crate) port_mappings: Vec<PortMapping>,
    /// Network mode of the task definition, when one was found.
    pub(crate) network_mode: Option<String>,
    /// Parsed `dependsOn` entries.
    pub(crate) depends_on: Vec<DependsOn>,
    /// Parsed `healthCheck`, if the definition has a usable one.
    pub(crate) health_check: Option<HealthCheckSpec>,
    /// `mountPoints` resolved against the task definition's volumes.
    pub(crate) volume_mounts: Vec<VolumeMount>,
    /// Parsed `ulimits` entries.
    pub(crate) ulimits: Vec<Ulimit>,
    /// Parsed `linuxParameters`, when the key is present.
    pub(crate) linux_parameters: Option<LinuxParameters>,
    /// `stopTimeout`, rendered as `--stop-timeout`.
    pub(crate) stop_timeout: Option<u32>,
    /// `user`, rendered as `--user`.
    pub(crate) user: Option<String>,
    /// `workingDirectory`, rendered as `--workdir`.
    pub(crate) working_directory: Option<String>,
    /// `tty` flag (defaults to false), rendered as `--tty`.
    pub(crate) tty: bool,
    /// `interactive` flag (defaults to false), rendered as `--interactive`.
    pub(crate) interactive: bool,
    /// `readonlyRootFilesystem` flag (defaults to false), rendered as
    /// `--read-only`.
    pub(crate) readonly_rootfs: bool,
}
231
/// One `dependsOn` entry of a container definition.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct DependsOn {
    /// Name of the container that is depended upon (`containerName`).
    pub container_name: String,
    /// Condition the dependency must reach (`condition`).
    pub condition: DependsOnCondition,
}
242
/// Condition attached to a `dependsOn` entry.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum DependsOnCondition {
    /// Wire value `START`.
    Start,
    /// Wire value `COMPLETE`.
    Complete,
    /// Wire value `SUCCESS`.
    Success,
    /// Wire value `HEALTHY`.
    Healthy,
}

impl DependsOnCondition {
    /// Parses the upper-case wire string; any other spelling (including a
    /// different case) yields `None`.
    pub fn parse(raw: &str) -> Option<Self> {
        [Self::Start, Self::Complete, Self::Success, Self::Healthy]
            .iter()
            .copied()
            .find(|candidate| candidate.as_aws_str() == raw)
    }

    /// Returns the upper-case wire string for this condition.
    pub fn as_aws_str(self) -> &'static str {
        match self {
            Self::Healthy => "HEALTHY",
            Self::Success => "SUCCESS",
            Self::Complete => "COMPLETE",
            Self::Start => "START",
        }
    }
}
287
/// Parsed container `healthCheck` block.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct HealthCheckSpec {
    /// Health-check command; `render_health_flags` only renders it when the
    /// first element is `CMD` or `CMD-SHELL` and at least one more follows.
    pub command: Vec<String>,
    /// `interval` in seconds (the parser defaults it to 30).
    pub interval_seconds: u32,
    /// `timeout` in seconds (the parser defaults it to 5).
    pub timeout_seconds: u32,
    /// `retries` (the parser defaults it to 3).
    pub retries: u32,
    /// `startPeriod` in seconds (the parser defaults it to 0).
    pub start_period_seconds: u32,
}
306
/// One parsed `portMappings` entry.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct PortMapping {
    /// Port inside the container (`containerPort`).
    pub container_port: u16,
    /// Port on the host; the parser substitutes `container_port` when
    /// `hostPort` is absent, out of range, or 0.
    pub host_port: u16,
    /// Lower-cased protocol; the parser defaults it to `tcp`.
    pub protocol: String,
}
319
/// A container mount point resolved against the task definition's volumes.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct VolumeMount {
    /// Left-hand side of the `-v` argument: a host path or a volume name, as
    /// produced by `resolve_volume_source`.
    pub source: String,
    /// Mount path inside the container (`containerPath`).
    pub container_path: String,
    /// When true the `-v` argument gets a `:ro` suffix.
    pub read_only: bool,
    /// Result of `volume_is_task_scoped` for the backing volume.
    /// NOTE(review): presumably drives volume removal on task stop — confirm.
    pub cleanup_on_stop: bool,
}
361
/// One parsed `ulimits` entry, rendered as `--ulimit name=soft:hard`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct Ulimit {
    /// Limit name (`name`).
    pub name: String,
    /// Soft limit (`softLimit`).
    pub soft_limit: i32,
    /// Hard limit (`hardLimit`).
    pub hard_limit: i32,
}
369
/// One `linuxParameters.devices` entry, rendered as a `--device` argument.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct Device {
    /// Device path on the host (`hostPath`).
    pub host_path: String,
    /// Device path inside the container (`containerPath`).
    pub container_path: String,
    /// Permission string; `parse_device` defaults it to `rwm`.
    pub permissions: String,
}
377
/// One sysctl setting, rendered as `--sysctl name=value`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct Sysctl {
    /// Kernel parameter name.
    pub name: String,
    /// Value assigned to the parameter.
    pub value: String,
}
384
/// Parsed `linuxParameters` block of a container definition.
#[derive(Clone, Debug, PartialEq, Eq, Default)]
pub(crate) struct LinuxParameters {
    /// `capabilities.add`, each rendered as `--cap-add`.
    pub capabilities_add: Vec<String>,
    /// `capabilities.drop`, each rendered as `--cap-drop`.
    pub capabilities_drop: Vec<String>,
    /// `devices`, each rendered as `--device`.
    pub devices: Vec<Device>,
    /// `initProcessEnabled`; renders `--init` when true.
    pub init_process_enabled: bool,
    /// `sharedMemorySize`; rendered as `--shm-size <n>m`.
    pub shared_memory_size: Option<i32>,
    /// Entries read from the `sysctl` key, each rendered as `--sysctl`.
    pub sysctls: Vec<Sysctl>,
    /// `tmpfs` mounts, each rendered as `--tmpfs`.
    pub tmpfs: Vec<Tmpfs>,
    /// `privileged`; renders `--privileged` when true.
    pub privileged: bool,
}
397
/// One tmpfs mount, rendered as `--tmpfs <path>:size=<size>M[,<options>]`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct Tmpfs {
    /// Mount path inside the container (`containerPath`).
    pub container_path: String,
    /// Size; the parser only accepts values greater than zero.
    pub size: i32,
    /// Extra mount options, comma-joined after the size.
    pub mount_options: Vec<String>,
}
405
/// A container plan paired with a separate environment list.
// NOTE(review): presumably `env` is the plan's environment after secret
// resolution — confirm against the code that builds this type.
#[derive(Clone, Debug)]
struct ResolvedContainerPlan {
    /// The underlying plan.
    plan: ContainerPlan,
    /// `(name, value)` environment pairs associated with the plan.
    env: Vec<(String, String)>,
}
411
/// Summary of how a task finished.
// NOTE(review): field meanings below are inferred from names only; the code
// that produces this type is outside this part of the file — confirm.
#[derive(Clone, Debug)]
struct TaskExitOutcome {
    /// Index of the container that exited, if any (presumably into the task's
    /// container list).
    exited_index: Option<usize>,
    /// Exit code associated with the outcome.
    exit_code: i64,
    /// Stop-code string reported for the task.
    stop_code: &'static str,
}
422
/// Runtime record for one launched container of a task.
#[derive(Clone, Debug)]
pub(crate) struct RunningContainer {
    /// Container name.
    pub(crate) name: String,
    /// Identifier of the launched container.
    pub(crate) container_id: String,
    /// Whether the container is essential; `task_should_stop` stops the task
    /// as soon as any essential container has an exit code.
    pub(crate) essential: bool,
    /// Exit code once the container has exited; `None` until then.
    pub(crate) exit_code: Option<i64>,
    /// Network binding objects for the container (JSON values).
    pub(crate) network_bindings: Vec<serde_json::Value>,
    /// Image digest, when known.
    pub(crate) image_digest: Option<String>,
}
441
442pub(crate) fn task_should_stop(containers: &[RunningContainer]) -> bool {
447 if containers.is_empty() {
448 return true;
449 }
450 let any_essential_exited = containers
451 .iter()
452 .any(|c| c.essential && c.exit_code.is_some());
453 if any_essential_exited {
454 return true;
455 }
456 containers.iter().all(|c| c.exit_code.is_some())
457}
458
459pub(crate) fn task_desired_stopped(
464 state: &SharedEcsState,
465 account_id: &str,
466 task_id: &str,
467) -> bool {
468 let accounts = state.read();
469 match accounts.get(account_id).and_then(|s| s.tasks.get(task_id)) {
470 Some(task) => task.desired_status == "STOPPED",
471 None => true,
472 }
473}
474
475fn build_container_plans(
476 state: &SharedEcsState,
477 account_id: &str,
478 task_id: &str,
479 _server_port: u16,
480) -> Result<Vec<ContainerPlan>, RuntimeError> {
481 let accounts = state.read();
482 let s = accounts
483 .get(account_id)
484 .ok_or_else(|| RuntimeError::ContainerStart("account missing".into()))?;
485 let task = s
486 .tasks
487 .get(task_id)
488 .ok_or_else(|| RuntimeError::ContainerStart("task missing".into()))?;
489 if task.containers.is_empty() {
490 return Err(RuntimeError::ContainerStart(
491 "task has no containers".into(),
492 ));
493 }
494 let has_task_role = task.task_role_arn.is_some();
495 let task_def = s
496 .task_definitions
497 .get(&task.family)
498 .and_then(|revs| revs.get(&task.revision));
499 let network_mode = task_def.and_then(|td| td.network_mode.clone());
500 let volumes_by_name: std::collections::HashMap<String, &serde_json::Value> = task_def
505 .map(|td| {
506 td.volumes
507 .iter()
508 .filter_map(|v| {
509 let name = v.get("name").and_then(|n| n.as_str())?;
510 Some((name.to_string(), v))
511 })
512 .collect()
513 })
514 .unwrap_or_default();
515 let mut plans = Vec::with_capacity(task.containers.len());
516 for container in &task.containers {
517 let def = find_container_definition(s, &task.family, task.revision, &container.name);
518 let secrets_refs = def
519 .as_ref()
520 .and_then(|d| d.get("secrets").and_then(|v| v.as_array()).cloned())
521 .map(|arr| {
522 arr.iter()
523 .filter_map(|e| {
524 let name = e.get("name").and_then(|v| v.as_str())?.to_string();
525 let value_from = e.get("valueFrom").and_then(|v| v.as_str())?.to_string();
526 Some((name, value_from))
527 })
528 .collect::<Vec<_>>()
529 })
530 .unwrap_or_default();
531 let str_array = |key: &str| -> Vec<String> {
532 def.as_ref()
533 .and_then(|d| d.get(key).and_then(|v| v.as_array()).cloned())
534 .map(|arr| {
535 arr.iter()
536 .filter_map(|v| v.as_str().map(String::from))
537 .collect::<Vec<_>>()
538 })
539 .unwrap_or_default()
540 };
541 let env = def
542 .as_ref()
543 .and_then(|d| d.get("environment").and_then(|v| v.as_array()).cloned())
544 .map(|arr| {
545 arr.iter()
546 .filter_map(|e| {
547 let k = e.get("name").and_then(|v| v.as_str())?;
548 let v = e.get("value").and_then(|v| v.as_str()).unwrap_or("");
549 Some((k.to_string(), v.to_string()))
550 })
551 .collect::<Vec<_>>()
552 })
553 .unwrap_or_default();
554 let port_mappings = def
555 .as_ref()
556 .and_then(|d| d.get("portMappings").and_then(|v| v.as_array()).cloned())
557 .map(|arr| {
558 arr.iter()
559 .filter_map(parse_port_mapping)
560 .collect::<Vec<_>>()
561 })
562 .unwrap_or_default();
563 let depends_on = def
564 .as_ref()
565 .and_then(|d| d.get("dependsOn").and_then(|v| v.as_array()).cloned())
566 .map(|arr| {
567 arr.iter()
568 .filter_map(parse_depends_on_entry)
569 .collect::<Vec<_>>()
570 })
571 .unwrap_or_default();
572 let health_check = def
573 .as_ref()
574 .and_then(|d| d.get("healthCheck"))
575 .and_then(parse_health_check);
576 let volume_mounts = def
577 .as_ref()
578 .and_then(|d| d.get("mountPoints").and_then(|v| v.as_array()).cloned())
579 .map(|arr| {
580 arr.iter()
581 .filter_map(|mp| resolve_mount_point(mp, &volumes_by_name))
582 .collect::<Vec<_>>()
583 })
584 .unwrap_or_default();
585 let ulimits = def
586 .as_ref()
587 .and_then(|d| d.get("ulimits").and_then(|v| v.as_array()).cloned())
588 .map(|arr| arr.iter().filter_map(parse_ulimit).collect::<Vec<_>>())
589 .unwrap_or_default();
590 let linux_parameters = def
591 .as_ref()
592 .and_then(|d| d.get("linuxParameters"))
593 .and_then(parse_linux_parameters);
594 let stop_timeout = def.as_ref().and_then(|d| {
595 d.get("stopTimeout")
596 .and_then(|v| v.as_u64())
597 .map(|n| n as u32)
598 });
599 let user = def
600 .as_ref()
601 .and_then(|d| d.get("user").and_then(|v| v.as_str()).map(String::from));
602 let working_directory = def.as_ref().and_then(|d| {
603 d.get("workingDirectory")
604 .and_then(|v| v.as_str())
605 .map(String::from)
606 });
607 let tty = def
608 .as_ref()
609 .and_then(|d| d.get("tty").and_then(|v| v.as_bool()))
610 .unwrap_or(false);
611 let interactive = def
612 .as_ref()
613 .and_then(|d| d.get("interactive").and_then(|v| v.as_bool()))
614 .unwrap_or(false);
615 let readonly_rootfs = def
616 .as_ref()
617 .and_then(|d| d.get("readonlyRootFilesystem").and_then(|v| v.as_bool()))
618 .unwrap_or(false);
619 plans.push(ContainerPlan {
620 container_name: container.name.clone(),
621 image: container.image.clone(),
622 env,
623 entry_point: str_array("entryPoint"),
624 command: str_array("command"),
625 secrets_refs,
626 essential: container.essential,
627 has_task_role,
628 port_mappings,
629 network_mode: network_mode.clone(),
630 depends_on,
631 health_check,
632 volume_mounts,
633 ulimits,
634 linux_parameters,
635 stop_timeout,
636 user,
637 working_directory,
638 tty,
639 interactive,
640 readonly_rootfs,
641 });
642 }
643 let plans = topo_sort_plans(plans);
644 Ok(plans)
645}
646
647fn resolve_mount_point(
655 mount_point: &serde_json::Value,
656 volumes_by_name: &std::collections::HashMap<String, &serde_json::Value>,
657) -> Option<VolumeMount> {
658 let container_path = mount_point
659 .get("containerPath")
660 .and_then(|v| v.as_str())?
661 .to_string();
662 let source_volume = mount_point.get("sourceVolume").and_then(|v| v.as_str())?;
663 let read_only = mount_point
664 .get("readOnly")
665 .and_then(|v| v.as_bool())
666 .unwrap_or(false);
667 let volume = volumes_by_name.get(source_volume)?;
668 let source = resolve_volume_source(source_volume, volume)?;
669 let cleanup_on_stop = volume_is_task_scoped(volume);
670 Some(VolumeMount {
671 source,
672 container_path,
673 read_only,
674 cleanup_on_stop,
675 })
676}
677
678fn volume_is_task_scoped(volume: &serde_json::Value) -> bool {
687 if let Some(host) = volume.get("host") {
688 if let Some(path) = host.get("sourcePath").and_then(|v| v.as_str()) {
689 if !path.is_empty() {
690 return false;
691 }
692 }
693 }
694 if volume.get("efsVolumeConfiguration").is_some()
695 || volume
696 .get("fsxWindowsFileServerVolumeConfiguration")
697 .is_some()
698 {
699 return false;
700 }
701 if let Some(docker) = volume.get("dockerVolumeConfiguration") {
702 let scope = docker
705 .get("scope")
706 .and_then(|v| v.as_str())
707 .unwrap_or("task");
708 return scope != "shared";
709 }
710 true
712}
713
714fn resolve_volume_source(name: &str, volume: &serde_json::Value) -> Option<String> {
733 if let Some(host) = volume.get("host") {
734 if let Some(path) = host.get("sourcePath").and_then(|v| v.as_str()) {
735 if !path.is_empty() {
738 ensure_dir_exists(path);
739 return Some(path.to_string());
740 }
741 }
742 }
743 if let Some(efs) = volume.get("efsVolumeConfiguration") {
744 let fs_id = efs.get("fileSystemId").and_then(|v| v.as_str())?;
745 let root = efs
746 .get("rootDirectory")
747 .and_then(|v| v.as_str())
748 .unwrap_or("/");
749 return Some(shared_volume_name("efs", fs_id, root));
750 }
751 if let Some(fsx) = volume.get("fsxWindowsFileServerVolumeConfiguration") {
752 let fs_id = fsx.get("fileSystemId").and_then(|v| v.as_str())?;
753 let root = fsx
754 .get("rootDirectory")
755 .and_then(|v| v.as_str())
756 .unwrap_or("/");
757 return Some(shared_volume_name("fsx", fs_id, root));
758 }
759 if volume.get("dockerVolumeConfiguration").is_some() {
760 return Some(name.to_string());
763 }
764 Some(name.to_string())
766}
767
/// Builds the volume name `fakecloud-<kind>-<fs_id>[-<root>]`.
///
/// Leading and trailing slashes of `root` are dropped; when nothing remains
/// the root suffix is omitted. `fs_id` and the root are sanitized with
/// `sanitize_volume_segment`; `kind` is used verbatim.
fn shared_volume_name(kind: &str, fs_id: &str, root: &str) -> String {
    let mut name = format!("fakecloud-{kind}-{}", sanitize_volume_segment(fs_id));
    let sub_path = root.trim_matches('/');
    if !sub_path.is_empty() {
        name.push('-');
        name.push_str(&sanitize_volume_segment(sub_path));
    }
    name
}

/// Replaces every character other than ASCII alphanumerics, `_`, `.` and `-`
/// with `-`.
fn sanitize_volume_segment(s: &str) -> String {
    let mut cleaned = String::with_capacity(s.len());
    for ch in s.chars() {
        let keep = ch.is_ascii_alphanumeric() || ch == '_' || ch == '.' || ch == '-';
        cleaned.push(if keep { ch } else { '-' });
    }
    cleaned
}
804
/// Best-effort creation of `path` and any missing parents; failures are
/// deliberately ignored.
fn ensure_dir_exists(path: &str) {
    drop(std::fs::create_dir_all(path));
}
812
813fn parse_depends_on_entry(value: &serde_json::Value) -> Option<DependsOn> {
818 let container_name = value
819 .get("containerName")
820 .and_then(|v| v.as_str())?
821 .to_string();
822 let raw_condition = value.get("condition").and_then(|v| v.as_str())?;
823 let condition = DependsOnCondition::parse(raw_condition)?;
824 Some(DependsOn {
825 container_name,
826 condition,
827 })
828}
829
830fn topo_sort_plans(plans: Vec<ContainerPlan>) -> Vec<ContainerPlan> {
841 use std::collections::{HashMap, HashSet};
842 let names: HashSet<String> = plans.iter().map(|p| p.container_name.clone()).collect();
843 let index: HashMap<String, usize> = plans
844 .iter()
845 .enumerate()
846 .map(|(i, p)| (p.container_name.clone(), i))
847 .collect();
848 let mut in_degree: Vec<usize> = plans
853 .iter()
854 .map(|p| {
855 p.depends_on
856 .iter()
857 .filter(|d| names.contains(&d.container_name))
858 .count()
859 })
860 .collect();
861 let mut dependants: Vec<Vec<usize>> = vec![Vec::new(); plans.len()];
863 for (i, p) in plans.iter().enumerate() {
864 for d in &p.depends_on {
865 if let Some(&di) = index.get(&d.container_name) {
866 dependants[di].push(i);
867 }
868 }
869 }
870 let mut ordered: Vec<ContainerPlan> = Vec::with_capacity(plans.len());
871 let mut emitted: Vec<bool> = vec![false; plans.len()];
872 loop {
873 let next = (0..plans.len()).find(|&i| !emitted[i] && in_degree[i] == 0);
876 match next {
877 Some(i) => {
878 emitted[i] = true;
879 ordered.push(plans[i].clone());
880 for &di in &dependants[i] {
881 if in_degree[di] > 0 {
882 in_degree[di] -= 1;
883 }
884 }
885 }
886 None => break,
887 }
888 }
889 for (i, p) in plans.into_iter().enumerate() {
891 if !emitted[i] {
892 ordered.push(p);
893 }
894 }
895 ordered
896}
897
898pub(crate) fn find_depends_on_cycle(
907 container_definitions: &[serde_json::Value],
908) -> Option<(String, String)> {
909 use std::collections::HashMap;
910
911 let names: Vec<String> = container_definitions
912 .iter()
913 .filter_map(|c| c.get("name").and_then(|n| n.as_str()).map(String::from))
914 .collect();
915 let index: HashMap<&str, usize> = names
916 .iter()
917 .enumerate()
918 .map(|(i, n)| (n.as_str(), i))
919 .collect();
920
921 let mut adj: Vec<Vec<usize>> = vec![Vec::new(); names.len()];
922 for (i, cd) in container_definitions.iter().enumerate() {
923 if i >= names.len() {
924 continue;
925 }
926 let Some(deps) = cd.get("dependsOn").and_then(|v| v.as_array()) else {
927 continue;
928 };
929 for d in deps {
930 let Some(target) = d.get("containerName").and_then(|v| v.as_str()) else {
931 continue;
932 };
933 if let Some(&j) = index.get(target) {
934 adj[i].push(j);
936 }
937 }
938 }
939
940 let mut state = vec![0u8; names.len()];
944 let mut stack: Vec<(usize, usize)> = Vec::new();
945 for start in 0..names.len() {
946 if state[start] != 0 {
947 continue;
948 }
949 stack.clear();
950 stack.push((start, 0));
951 state[start] = 1;
952 while let Some(&(node, next_edge)) = stack.last() {
953 if next_edge < adj[node].len() {
954 let nb = adj[node][next_edge];
955 stack.last_mut().unwrap().1 += 1;
956 match state[nb] {
957 0 => {
958 state[nb] = 1;
959 stack.push((nb, 0));
960 }
961 1 => {
962 return Some((names[node].clone(), names[nb].clone()));
963 }
964 _ => {}
965 }
966 } else {
967 state[node] = 2;
968 stack.pop();
969 }
970 }
971 }
972 None
973}
974
/// Container state as read back by `inspect_container_state`.
#[derive(Debug, Clone)]
struct InspectedState {
    /// True when the container is running, or its status is `running`,
    /// `exited` or `dead`.
    started: bool,
    /// True when the status is `exited` or `dead`.
    exited: bool,
    /// Reported exit code; -1 when it could not be parsed.
    exit_code: i64,
    /// Health status string, or `None` when no health state is reported.
    health: Option<String>,
}
985
986async fn inspect_container_state(cli: &str, container_id: &str) -> Option<InspectedState> {
990 let format =
993 "{{.State.Status}}|{{.State.Running}}|{{.State.ExitCode}}|{{if .State.Health}}{{.State.Health.Status}}{{else}}<none>{{end}}";
994 let out = Command::new(cli)
995 .args(["inspect", "-f", format, container_id])
996 .output()
997 .await
998 .ok()?;
999 if !out.status.success() {
1000 return None;
1001 }
1002 let raw = String::from_utf8_lossy(&out.stdout).trim().to_string();
1003 let parts: Vec<&str> = raw.split('|').collect();
1004 if parts.len() < 4 {
1005 return None;
1006 }
1007 let status = parts[0];
1008 let running = parts[1] == "true";
1009 let exit_code: i64 = parts[2].parse().unwrap_or(-1);
1010 let health = match parts[3] {
1011 "<none>" | "" => None,
1012 other => Some(other.to_string()),
1013 };
1014 let started = running || status == "exited" || status == "running" || status == "dead";
1018 let exited = status == "exited" || status == "dead";
1019 Some(InspectedState {
1020 started,
1021 exited,
1022 exit_code,
1023 health,
1024 })
1025}
1026
1027fn condition_is_met(condition: DependsOnCondition, state: &InspectedState) -> bool {
1031 match condition {
1032 DependsOnCondition::Start => state.started,
1033 DependsOnCondition::Complete => state.exited,
1034 DependsOnCondition::Success => state.exited && state.exit_code == 0,
1035 DependsOnCondition::Healthy => state.health.as_deref() == Some("healthy"),
1036 }
1037}
1038
/// Test-only wrapper that forwards to the private `parse_port_mapping`.
#[cfg(test)]
pub(crate) fn __test_parse_port_mapping(value: &serde_json::Value) -> Option<PortMapping> {
    parse_port_mapping(value)
}
1046
1047fn parse_health_check(value: &serde_json::Value) -> Option<HealthCheckSpec> {
1053 let cmd_arr = value.get("command")?.as_array()?;
1054 let command: Vec<String> = cmd_arr
1055 .iter()
1056 .filter_map(|v| v.as_str().map(String::from))
1057 .collect();
1058 if command.is_empty() {
1059 return None;
1060 }
1061 if command.first().map(|s| s.as_str()) == Some("NONE") {
1062 return None;
1063 }
1064 let read_u32 = |key: &str, default: u32| -> u32 {
1065 value
1066 .get(key)
1067 .and_then(|v| v.as_i64())
1068 .filter(|n| (0..=u32::MAX as i64).contains(n))
1069 .map(|n| n as u32)
1070 .unwrap_or(default)
1071 };
1072 Some(HealthCheckSpec {
1073 command,
1074 interval_seconds: read_u32("interval", 30),
1075 timeout_seconds: read_u32("timeout", 5),
1076 retries: read_u32("retries", 3),
1077 start_period_seconds: read_u32("startPeriod", 0),
1078 })
1079}
1080
1081fn parse_ulimit(value: &serde_json::Value) -> Option<Ulimit> {
1083 let name = value.get("name").and_then(|v| v.as_str())?;
1084 let soft = value
1085 .get("softLimit")
1086 .and_then(|v| v.as_i64())
1087 .filter(|n| *n >= 0)? as i32;
1088 let hard = value
1089 .get("hardLimit")
1090 .and_then(|v| v.as_i64())
1091 .filter(|n| *n >= 0)? as i32;
1092 Some(Ulimit {
1093 name: name.to_string(),
1094 soft_limit: soft,
1095 hard_limit: hard,
1096 })
1097}
1098
1099fn parse_linux_parameters(value: &serde_json::Value) -> Option<LinuxParameters> {
1101 let mut lp = LinuxParameters::default();
1102 if let Some(arr) = value
1103 .get("capabilities")
1104 .and_then(|v| v.get("add"))
1105 .and_then(|v| v.as_array())
1106 {
1107 lp.capabilities_add = arr
1108 .iter()
1109 .filter_map(|v| v.as_str().map(String::from))
1110 .collect();
1111 }
1112 if let Some(arr) = value
1113 .get("capabilities")
1114 .and_then(|v| v.get("drop"))
1115 .and_then(|v| v.as_array())
1116 {
1117 lp.capabilities_drop = arr
1118 .iter()
1119 .filter_map(|v| v.as_str().map(String::from))
1120 .collect();
1121 }
1122 if let Some(arr) = value.get("devices").and_then(|v| v.as_array()) {
1123 lp.devices = arr.iter().filter_map(parse_device).collect();
1124 }
1125 lp.init_process_enabled = value
1126 .get("initProcessEnabled")
1127 .and_then(|v| v.as_bool())
1128 .unwrap_or(false);
1129 lp.shared_memory_size = value
1130 .get("sharedMemorySize")
1131 .and_then(|v| v.as_i64())
1132 .map(|n| n as i32);
1133 if let Some(arr) = value.get("sysctl").and_then(|v| v.as_array()) {
1134 lp.sysctls = arr.iter().filter_map(parse_sysctl).collect();
1135 }
1136 if let Some(arr) = value.get("tmpfs").and_then(|v| v.as_array()) {
1137 lp.tmpfs = arr.iter().filter_map(parse_tmpfs).collect();
1138 }
1139 lp.privileged = value
1140 .get("privileged")
1141 .and_then(|v| v.as_bool())
1142 .unwrap_or(false);
1143 Some(lp)
1144}
1145
1146fn parse_device(value: &serde_json::Value) -> Option<Device> {
1147 let host_path = value.get("hostPath").and_then(|v| v.as_str())?.to_string();
1148 let container_path = value
1149 .get("containerPath")
1150 .and_then(|v| v.as_str())?
1151 .to_string();
1152 let permissions = value
1153 .get("permissions")
1154 .and_then(|v| v.as_str())
1155 .unwrap_or("rwm")
1156 .to_string();
1157 Some(Device {
1158 host_path,
1159 container_path,
1160 permissions,
1161 })
1162}
1163
1164fn parse_sysctl(value: &serde_json::Value) -> Option<Sysctl> {
1165 let name = value.get("name").and_then(|v| v.as_str())?.to_string();
1166 let value_str = value.get("value").and_then(|v| v.as_str())?.to_string();
1167 Some(Sysctl {
1168 name,
1169 value: value_str,
1170 })
1171}
1172
1173fn parse_tmpfs(value: &serde_json::Value) -> Option<Tmpfs> {
1174 let container_path = value
1175 .get("containerPath")
1176 .and_then(|v| v.as_str())?
1177 .to_string();
1178 let size = value
1179 .get("size")
1180 .and_then(|v| v.as_i64())
1181 .filter(|n| *n > 0)? as i32;
1182 let mount_options = value
1183 .get("mountOptions")
1184 .and_then(|v| v.as_array())
1185 .map(|arr| {
1186 arr.iter()
1187 .filter_map(|v| v.as_str().map(String::from))
1188 .collect()
1189 })
1190 .unwrap_or_default();
1191 Some(Tmpfs {
1192 container_path,
1193 size,
1194 mount_options,
1195 })
1196}
1197
1198pub(crate) fn render_health_flags(hc: &HealthCheckSpec) -> Vec<String> {
1205 if hc.command.len() < 2 {
1206 return Vec::new();
1207 }
1208 let cmd_kind = hc.command[0].as_str();
1209 if cmd_kind != "CMD" && cmd_kind != "CMD-SHELL" {
1210 return Vec::new();
1211 }
1212 let cmd_string = hc.command[1..].join(" ");
1213 vec![
1214 "--health-cmd".into(),
1215 cmd_string,
1216 format!("--health-interval={}s", hc.interval_seconds),
1217 format!("--health-timeout={}s", hc.timeout_seconds),
1218 format!("--health-retries={}", hc.retries),
1219 format!("--health-start-period={}s", hc.start_period_seconds),
1220 ]
1221}
1222
/// Test-only wrapper that forwards to the private `parse_health_check`.
#[cfg(test)]
pub(crate) fn __test_parse_health_check(value: &serde_json::Value) -> Option<HealthCheckSpec> {
    parse_health_check(value)
}
1230
/// Maps a health status string onto `HEALTHY`, `UNHEALTHY` or `UNKNOWN`.
///
/// Surrounding whitespace is ignored and matching is ASCII case-insensitive;
/// anything other than `healthy` / `unhealthy` maps to `UNKNOWN`.
pub(crate) fn docker_health_to_ecs(raw: &str) -> &'static str {
    let status = raw.trim();
    if status.eq_ignore_ascii_case("healthy") {
        "HEALTHY"
    } else if status.eq_ignore_ascii_case("unhealthy") {
        "UNHEALTHY"
    } else {
        "UNKNOWN"
    }
}
1243
1244fn parse_port_mapping(value: &serde_json::Value) -> Option<PortMapping> {
1248 let container_port = value
1249 .get("containerPort")
1250 .and_then(|v| v.as_i64())
1251 .filter(|n| (0..=u16::MAX as i64).contains(n))? as u16;
1252 let host_port_raw = value
1253 .get("hostPort")
1254 .and_then(|v| v.as_i64())
1255 .filter(|n| (0..=u16::MAX as i64).contains(n))
1256 .map(|n| n as u16)
1257 .unwrap_or(0);
1258 let host_port = if host_port_raw == 0 {
1259 container_port
1260 } else {
1261 host_port_raw
1262 };
1263 let protocol = value
1264 .get("protocol")
1265 .and_then(|v| v.as_str())
1266 .map(|s| s.to_ascii_lowercase())
1267 .unwrap_or_else(|| "tcp".to_string());
1268 Some(PortMapping {
1269 container_port,
1270 host_port,
1271 protocol,
1272 })
1273}
1274
/// Returns the `fakecloud-instance=fakecloud-<pid>` label, where `<pid>` is
/// the current process id.
pub(crate) fn fakecloud_instance_label() -> String {
    let pid = std::process::id();
    format!("fakecloud-instance=fakecloud-{pid}")
}
1283
1284pub(crate) fn build_run_argv(
1290 plan: &ContainerPlan,
1291 env: &[(String, String)],
1292 task_id: &str,
1293 host_alias: &str,
1294 add_host_arg: Option<&str>,
1295 run_image: &str,
1296 awsvpc_network_ready: bool,
1297) -> Vec<String> {
1298 let mut argv: Vec<String> = Vec::new();
1299 argv.push("run".into());
1300 argv.push("-d".into());
1301 argv.push("--name".into());
1302 argv.push(format!("{}-{}", task_id, plan.container_name));
1303 argv.push("--label".into());
1304 argv.push(format!("fakecloud-ecs-task={}", task_id));
1305 argv.push("--label".into());
1306 argv.push(format!("fakecloud-ecs-container={}", plan.container_name));
1307 argv.push("--label".into());
1314 argv.push(fakecloud_instance_label());
1315 if let Some(arg) = add_host_arg {
1319 argv.push("--add-host".into());
1320 argv.push(arg.to_string());
1321 }
1322 let use_awsvpc_network = plan.network_mode.as_deref() == Some("awsvpc") && awsvpc_network_ready;
1323 if use_awsvpc_network {
1324 argv.push("--network".into());
1325 argv.push(format!("fakecloud-ecs-{}", task_id));
1326 }
1327 let publish_ports = !use_awsvpc_network;
1333 if publish_ports {
1334 for pm in &plan.port_mappings {
1335 argv.push("--publish".into());
1336 argv.push(format!(
1337 "{}:{}/{}",
1338 pm.container_port, pm.host_port, pm.protocol
1339 ));
1340 }
1341 }
1342 if let Some(ref hc) = plan.health_check {
1343 argv.extend(render_health_flags(hc));
1344 }
1345 let http_alias_prefix = format!("http://{host_alias}:");
1346 let https_alias_prefix = format!("https://{host_alias}:");
1347 for (k, v) in env {
1348 let transformed = v
1349 .replace("http://127.0.0.1:", http_alias_prefix.as_str())
1350 .replace("https://127.0.0.1:", https_alias_prefix.as_str())
1351 .replace("http://localhost:", http_alias_prefix.as_str())
1352 .replace("https://localhost:", https_alias_prefix.as_str());
1353 argv.push("-e".into());
1354 argv.push(format!("{}={}", k, transformed));
1355 }
1356 for vm in &plan.volume_mounts {
1361 argv.push("-v".into());
1362 let suffix = if vm.read_only { ":ro" } else { "" };
1363 argv.push(format!("{}:{}{}", vm.source, vm.container_path, suffix));
1364 }
1365 for ul in &plan.ulimits {
1366 argv.push("--ulimit".into());
1367 argv.push(format!("{}={}:{}", ul.name, ul.soft_limit, ul.hard_limit));
1368 }
1369 if let Some(ref lp) = plan.linux_parameters {
1370 for cap in &lp.capabilities_add {
1371 argv.push("--cap-add".into());
1372 argv.push(cap.clone());
1373 }
1374 for cap in &lp.capabilities_drop {
1375 argv.push("--cap-drop".into());
1376 argv.push(cap.clone());
1377 }
1378 for dev in &lp.devices {
1379 argv.push("--device".into());
1380 argv.push(format!(
1381 "{}:{}{}",
1382 dev.host_path, dev.container_path, dev.permissions
1383 ));
1384 }
1385 if lp.init_process_enabled {
1386 argv.push("--init".into());
1387 }
1388 if let Some(size) = lp.shared_memory_size {
1389 argv.push("--shm-size".into());
1390 argv.push(format!("{}m", size));
1391 }
1392 for sys in &lp.sysctls {
1393 argv.push("--sysctl".into());
1394 argv.push(format!("{}={}", sys.name, sys.value));
1395 }
1396 for tmp in &lp.tmpfs {
1397 let mut opts = tmp.mount_options.join(",");
1398 if !opts.is_empty() {
1399 opts = format!(",{}", opts);
1400 }
1401 argv.push("--tmpfs".into());
1402 argv.push(format!("{}:size={}M{}", tmp.container_path, tmp.size, opts));
1403 }
1404 if lp.privileged {
1405 argv.push("--privileged".into());
1406 }
1407 }
1408 if let Some(timeout) = plan.stop_timeout {
1409 argv.push("--stop-timeout".into());
1410 argv.push(format!("{}", timeout));
1411 }
1412 if let Some(ref user) = plan.user {
1413 argv.push("--user".into());
1414 argv.push(user.clone());
1415 }
1416 if let Some(ref wd) = plan.working_directory {
1417 argv.push("--workdir".into());
1418 argv.push(wd.clone());
1419 }
1420 if plan.tty {
1421 argv.push("--tty".into());
1422 }
1423 if plan.interactive {
1424 argv.push("--interactive".into());
1425 }
1426 if plan.readonly_rootfs {
1427 argv.push("--read-only".into());
1428 }
1429 if let Some(first) = plan.entry_point.first() {
1430 argv.push("--entrypoint".into());
1431 argv.push(first.clone());
1432 }
1433 argv.push(run_image.to_string());
1434 for arg in plan.entry_point.iter().skip(1) {
1435 argv.push(arg.clone());
1436 }
1437 for arg in &plan.command {
1438 argv.push(arg.clone());
1439 }
1440 argv
1441}
1442
1443pub(crate) fn network_bindings_for(plan: &ContainerPlan) -> Vec<serde_json::Value> {
1447 if plan.network_mode.as_deref() == Some("awsvpc") {
1448 return Vec::new();
1449 }
1450 plan.port_mappings
1451 .iter()
1452 .map(|pm| {
1453 serde_json::json!({
1454 "bindIP": "0.0.0.0",
1455 "containerPort": pm.container_port,
1456 "hostPort": pm.host_port,
1457 "protocol": pm.protocol,
1458 })
1459 })
1460 .collect()
1461}
1462
1463#[allow(clippy::type_complexity)]
1467pub(crate) fn compute_elbv2_targets(
1468 ecs_state: &crate::state::EcsState,
1469 task: &crate::state::Task,
1470) -> Vec<(String, Vec<(String, Option<i64>)>)> {
1471 let mut result = Vec::new();
1472 let Some(group) = task.group.as_deref() else {
1473 return result;
1474 };
1475 let service_name = group.strip_prefix("service:").unwrap_or(group);
1476 let key = crate::state::EcsState::service_key(&task.cluster_name, service_name);
1477 let Some(service) = ecs_state.services.get(&key) else {
1478 return result;
1479 };
1480
1481 let network_mode = ecs_state
1482 .task_definitions
1483 .get(&task.family)
1484 .and_then(|revs| revs.get(&task.revision))
1485 .and_then(|td| td.network_mode.as_deref());
1486
1487 for lb in &service.load_balancers {
1488 let tg_arn = lb.get("targetGroupArn").and_then(|v| v.as_str());
1489 let container_name = lb.get("containerName").and_then(|v| v.as_str());
1490 let container_port = lb.get("containerPort").and_then(|v| v.as_i64());
1491 let Some(tg_arn) = tg_arn else { continue };
1492 let Some(container_name) = container_name else {
1493 continue;
1494 };
1495
1496 let target_id = if network_mode == Some("awsvpc") {
1497 task.attachments
1498 .iter()
1499 .find(|a| a.attachment_type == "eni")
1500 .and_then(|eni| {
1501 eni.details
1502 .iter()
1503 .find(|d| d.name == "privateIPv4Address")
1504 .map(|d| d.value.clone())
1505 })
1506 } else {
1507 Some("127.0.0.1".to_string())
1508 };
1509
1510 let port = if network_mode == Some("awsvpc") {
1511 container_port
1512 } else {
1513 task.containers
1514 .iter()
1515 .find(|c| c.name == container_name)
1516 .and_then(|c| {
1517 c.network_bindings
1518 .iter()
1519 .find(|nb| {
1520 nb.get("containerPort").and_then(|v| v.as_i64()) == container_port
1521 })
1522 .and_then(|nb| nb.get("hostPort").and_then(|v| v.as_i64()))
1523 })
1524 };
1525
1526 if let Some(id) = target_id {
1527 if let Some(entry) = result.iter_mut().find(|(arn, _)| arn == tg_arn) {
1528 entry.1.push((id, port));
1529 } else {
1530 result.push((tg_arn.to_string(), vec![(id, port)]));
1531 }
1532 }
1533 }
1534 result
1535}
1536
1537struct TaskSnapshot {
1538 task_arn: String,
1539 cluster_arn: String,
1540 launch_type: String,
1541 group: Option<String>,
1542 task_definition_arn: String,
1543 containers: serde_json::Value,
1544}
1545
1546fn snapshot_task(state: &SharedEcsState, account_id: &str, task_id: &str) -> Option<TaskSnapshot> {
1547 let accounts = state.read();
1548 let s = accounts.get(account_id)?;
1549 let task = s.tasks.get(task_id)?;
1550 Some(TaskSnapshot {
1551 task_arn: task.task_arn.clone(),
1552 cluster_arn: task.cluster_arn.clone(),
1553 launch_type: task.launch_type.clone(),
1554 group: task.group.clone(),
1555 task_definition_arn: task.task_definition_arn.clone(),
1556 containers: serde_json::Value::Array(
1557 task.containers
1558 .iter()
1559 .map(|c| {
1560 serde_json::json!({
1561 "containerArn": c.container_arn,
1562 "name": c.name,
1563 "image": c.image,
1564 "lastStatus": c.last_status,
1565 "exitCode": c.exit_code,
1566 "reason": c.reason,
1567 })
1568 })
1569 .collect(),
1570 ),
1571 })
1572}
1573
1574fn build_local_registry_docker_config(server_port: u16) -> Option<TempDir> {
1584 let dir = TempDir::new().ok()?;
1585 let auth = base64::engine::general_purpose::STANDARD.encode("AWS:fakecloud-ecs-runtime");
1586 let auths: serde_json::Map<String, serde_json::Value> =
1587 fakecloud_core::container_net::registry_auth_hosts(server_port)
1588 .into_iter()
1589 .map(|host| (host, serde_json::json!({ "auth": auth })))
1590 .collect();
1591 let config = serde_json::json!({ "auths": auths });
1592 std::fs::write(dir.path().join("config.json"), config.to_string()).ok()?;
1593 Some(dir)
1594}
1595
1596fn find_container_definition(
1597 state: &crate::state::EcsState,
1598 family: &str,
1599 revision: i32,
1600 name: &str,
1601) -> Option<serde_json::Value> {
1602 state
1603 .task_definitions
1604 .get(family)?
1605 .get(&revision)?
1606 .container_definitions
1607 .iter()
1608 .find(|c| c.get("name").and_then(|v| v.as_str()) == Some(name))
1609 .cloned()
1610}
1611
1612fn mark_pull_started(state: &SharedEcsState, account_id: &str, task_id: &str) {
1613 let mut accounts = state.write();
1614 let Some(s) = accounts.get_mut(account_id) else {
1615 return;
1616 };
1617 let task_arn_cluster = s
1618 .tasks
1619 .get(task_id)
1620 .map(|t| (t.task_arn.clone(), t.cluster_arn.clone()));
1621 if let Some(task) = s.tasks.get_mut(task_id) {
1622 task.pull_started_at = Some(Utc::now());
1623 }
1624 if let Some((arn, cluster_arn)) = task_arn_cluster {
1625 s.push_event(LifecycleEvent {
1626 at: Utc::now(),
1627 event_type: "PullStarted".into(),
1628 task_arn: Some(arn),
1629 cluster_arn: Some(cluster_arn),
1630 last_status: Some("PENDING".into()),
1631 detail: serde_json::json!({}),
1632 });
1633 }
1634}
1635
1636fn mark_pull_stopped(state: &SharedEcsState, account_id: &str, task_id: &str) {
1637 let mut accounts = state.write();
1638 let Some(s) = accounts.get_mut(account_id) else {
1639 return;
1640 };
1641 if let Some(task) = s.tasks.get_mut(task_id) {
1642 task.pull_stopped_at = Some(Utc::now());
1643 }
1644}
1645
1646pub(crate) fn mark_running_multi(
1647 state: &SharedEcsState,
1648 account_id: &str,
1649 task_id: &str,
1650 started: &[RunningContainer],
1651) {
1652 let mut accounts = state.write();
1653 let Some(s) = accounts.get_mut(account_id) else {
1654 return;
1655 };
1656 let (arn, cluster_arn) = {
1657 let Some(task) = s.tasks.get_mut(task_id) else {
1658 return;
1659 };
1660 task.last_status = "RUNNING".into();
1661 task.connectivity = "CONNECTED".into();
1662 task.connectivity_at = Some(Utc::now());
1663 task.started_at = Some(Utc::now());
1664 for rc in started {
1665 if let Some(c) = task.containers.iter_mut().find(|c| c.name == rc.name) {
1666 c.runtime_id = Some(rc.container_id.clone());
1667 c.last_status = "RUNNING".into();
1668 c.network_bindings = rc.network_bindings.clone();
1669 if rc.image_digest.is_some() {
1670 c.image_digest = rc.image_digest.clone();
1671 }
1672 }
1673 }
1674 if let Some(cluster) = s.clusters.get_mut(&task.cluster_name) {
1675 cluster.running_tasks_count += 1;
1676 if cluster.pending_tasks_count > 0 {
1677 cluster.pending_tasks_count -= 1;
1678 }
1679 }
1680 if let Some(ref ci_arn) = task.container_instance_arn {
1681 if let Some(ci) = s
1682 .container_instances
1683 .values_mut()
1684 .find(|ci| ci.container_instance_arn == *ci_arn)
1685 {
1686 ci.running_tasks_count += 1;
1687 if ci.pending_tasks_count > 0 {
1688 ci.pending_tasks_count -= 1;
1689 }
1690 }
1691 }
1692 (task.task_arn.clone(), task.cluster_arn.clone())
1693 };
1694 s.push_event(LifecycleEvent {
1695 at: Utc::now(),
1696 event_type: "TaskStateChange".into(),
1697 task_arn: Some(arn),
1698 cluster_arn: Some(cluster_arn),
1699 last_status: Some("RUNNING".into()),
1700 detail: serde_json::json!({}),
1701 });
1702}
1703
1704#[allow(clippy::too_many_arguments)]
1705fn finalize_stopped_multi(
1706 state: &SharedEcsState,
1707 account_id: &str,
1708 task_id: &str,
1709 final_containers: &[RunningContainer],
1710 primary_exit_code: i64,
1711 captured: &str,
1712 stop_code: &str,
1713 stopped_reason: Option<String>,
1714) {
1715 let mut accounts = state.write();
1716 let Some(s) = accounts.get_mut(account_id) else {
1717 return;
1718 };
1719 let (arn, cluster_arn) = {
1720 let Some(task) = s.tasks.get_mut(task_id) else {
1721 return;
1722 };
1723 task.last_status = "STOPPED".into();
1724 task.desired_status = "STOPPED".into();
1725 task.stopping_at = task.stopping_at.or(Some(Utc::now()));
1726 task.stopped_at = Some(Utc::now());
1727 task.stop_code = Some(stop_code.into());
1728 task.stopped_reason = stopped_reason.or(Some(format!("Exit code {}", primary_exit_code)));
1729 task.captured_logs = captured.to_string();
1730 for c in task.containers.iter_mut() {
1731 c.last_status = "STOPPED".into();
1732 if c.exit_code.is_none() {
1733 let mapped = final_containers
1734 .iter()
1735 .find(|r| r.name == c.name)
1736 .and_then(|r| r.exit_code);
1737 c.exit_code = mapped.or(Some(primary_exit_code));
1738 }
1739 }
1740 if let Some(cluster) = s.clusters.get_mut(&task.cluster_name) {
1741 if cluster.running_tasks_count > 0 {
1742 cluster.running_tasks_count -= 1;
1743 }
1744 }
1745 if let Some(ref ci_arn) = task.container_instance_arn {
1746 if let Some(ci) = s
1747 .container_instances
1748 .values_mut()
1749 .find(|ci| ci.container_instance_arn == *ci_arn)
1750 {
1751 if ci.running_tasks_count > 0 {
1752 ci.running_tasks_count -= 1;
1753 }
1754 }
1755 }
1756 (task.task_arn.clone(), task.cluster_arn.clone())
1757 };
1758 s.push_event(LifecycleEvent {
1759 at: Utc::now(),
1760 event_type: "TaskStateChange".into(),
1761 task_arn: Some(arn),
1762 cluster_arn: Some(cluster_arn),
1763 last_status: Some("STOPPED".into()),
1764 detail: serde_json::json!({
1765 "exitCode": primary_exit_code,
1766 "stopCode": stop_code,
1767 }),
1768 });
1769}
1770
1771fn finalize_failure(state: &SharedEcsState, account_id: &str, task_id: &str, reason: &str) {
1772 let mut accounts = state.write();
1773 let Some(s) = accounts.get_mut(account_id) else {
1774 return;
1775 };
1776 let (arn, cluster_arn) = {
1777 let Some(task) = s.tasks.get_mut(task_id) else {
1778 return;
1779 };
1780 let was_running = task.last_status == "RUNNING";
1786 task.last_status = "STOPPED".into();
1787 task.desired_status = "STOPPED".into();
1788 task.stopped_at = Some(Utc::now());
1789 task.stop_code = Some("TaskFailedToStart".into());
1790 task.stopped_reason = Some(reason.to_string());
1791 task.captured_logs = format!("[task failed to start]: {reason}");
1795 for c in task.containers.iter_mut() {
1796 c.last_status = "STOPPED".into();
1797 c.reason = Some(reason.to_string());
1798 }
1799 if let Some(cluster) = s.clusters.get_mut(&task.cluster_name) {
1800 if was_running {
1801 if cluster.running_tasks_count > 0 {
1802 cluster.running_tasks_count -= 1;
1803 }
1804 } else if cluster.pending_tasks_count > 0 {
1805 cluster.pending_tasks_count -= 1;
1806 }
1807 }
1808 if let Some(ref ci_arn) = task.container_instance_arn {
1809 if let Some(ci) = s
1810 .container_instances
1811 .values_mut()
1812 .find(|ci| ci.container_instance_arn == *ci_arn)
1813 {
1814 if was_running {
1815 if ci.running_tasks_count > 0 {
1816 ci.running_tasks_count -= 1;
1817 }
1818 } else if ci.pending_tasks_count > 0 {
1819 ci.pending_tasks_count -= 1;
1820 }
1821 }
1822 }
1823 (task.task_arn.clone(), task.cluster_arn.clone())
1824 };
1825 s.push_event(LifecycleEvent {
1826 at: Utc::now(),
1827 event_type: "TaskFailedToStart".into(),
1828 task_arn: Some(arn),
1829 cluster_arn: Some(cluster_arn),
1830 last_status: Some("STOPPED".into()),
1831 detail: serde_json::json!({ "reason": reason }),
1832 });
1833}
1834
1835pub async fn sleep(duration: Duration) {
1839 tokio::time::sleep(duration).await;
1840}
1841
1842#[cfg(test)]
1843mod tests {
1844 use super::*;
1845 use crate::state::{EcsState, Task};
1846 use fakecloud_aws::arn::Arn;
1847 use fakecloud_core::multi_account::MultiAccountState;
1848 use parking_lot::RwLock;
1849 use std::sync::Arc;
1850
/// `cli_available` must report `false` for a binary name that is not installed.
#[test]
fn cli_available_for_known_missing_binary_is_false() {
    let bogus_cli = "definitely-not-a-real-cli-binary-xyz";
    let available = fakecloud_core::container_net::cli_available(bogus_cli);
    assert!(!available);
}
1857
/// An AWS ECR image URI is rewritten to point at the local registry port.
#[test]
fn aws_ecr_uris_translate_for_local_pull() {
    let aws_uri = "123456789012.dkr.ecr.us-east-1.amazonaws.com/app:latest";
    let translated = fakecloud_core::ecr_uri::translate_to_local(aws_uri, 4566);
    assert_eq!(translated.as_deref(), Some("127.0.0.1:4566/app:latest"));
}
1869
1870 fn make_task(task_id: &str) -> Task {
1871 Task {
1872 task_arn: Arn::new(
1873 "ecs",
1874 "us-east-1",
1875 "000000000000",
1876 &format!("task/default/{task_id}"),
1877 )
1878 .to_string(),
1879 task_id: task_id.into(),
1880 cluster_arn: "arn:aws:ecs:us-east-1:000000000000:cluster/default".into(),
1881 cluster_name: "default".into(),
1882 task_definition_arn: "arn:aws:ecs:us-east-1:000000000000:task-definition/app:1".into(),
1883 family: "app".into(),
1884 revision: 1,
1885 container_instance_arn: None,
1886 capacity_provider_name: None,
1887 last_status: "PENDING".into(),
1888 desired_status: "RUNNING".into(),
1889 launch_type: "FARGATE".into(),
1890 platform_version: None,
1891 cpu: None,
1892 memory: None,
1893 containers: Vec::new(),
1894 overrides: serde_json::json!({}),
1895 started_by: None,
1896 group: None,
1897 connectivity: "CONNECTING".into(),
1898 stop_code: None,
1899 stopped_reason: None,
1900 created_at: Utc::now(),
1901 started_at: None,
1902 stopping_at: None,
1903 stopped_at: None,
1904 pull_started_at: None,
1905 pull_stopped_at: None,
1906 connectivity_at: None,
1907 started_by_ref_id: None,
1908 execution_role_arn: None,
1909 task_role_arn: None,
1910 tags: Vec::new(),
1911 awslogs: None,
1912 captured_logs: String::new(),
1913 protection: None,
1914 enable_execute_command: false,
1915 attachments: Vec::new(),
1916 volume_configurations: Vec::new(),
1917 task_set_arn: None,
1918 }
1919 }
1920
/// `finalize_failure` must stop the task and surface the failure reason in
/// `captured_logs` behind the `[task failed to start]:` prefix.
#[test]
fn finalize_failure_writes_reason_into_captured_logs() {
    const ACCOUNT: &str = "000000000000";
    let reason = "failed to resolve secret DB_PASSWORD";

    let mut accounts: MultiAccountState<EcsState> =
        MultiAccountState::new(ACCOUNT, "us-east-1", "http://localhost:4566");
    accounts
        .get_or_create(ACCOUNT)
        .tasks
        .insert("t1".into(), make_task("t1"));
    let state: SharedEcsState = Arc::new(RwLock::new(accounts));

    finalize_failure(&state, ACCOUNT, "t1", reason);

    let guard = state.read();
    let task = guard.get(ACCOUNT).unwrap().tasks.get("t1").unwrap();
    let logs = &task.captured_logs;

    assert_eq!(task.last_status, "STOPPED");
    assert_eq!(task.stop_code.as_deref(), Some("TaskFailedToStart"));
    assert!(
        logs.contains(reason),
        "captured_logs missing reason: {:?}",
        logs
    );
    assert!(
        logs.starts_with("[task failed to start]:"),
        "captured_logs missing prefix: {:?}",
        logs
    );
}
1957
/// `task_desired_stopped` is false for a task that still wants to run, and
/// true both for a task whose desired status is `STOPPED` and for a task id
/// that is absent from state.
#[test]
fn task_desired_stopped_detects_stop_during_launch() {
    const ACCOUNT: &str = "000000000000";

    let mut accounts: MultiAccountState<EcsState> =
        MultiAccountState::new(ACCOUNT, "us-east-1", "http://localhost:4566");
    let acct = accounts.get_or_create(ACCOUNT);
    acct.tasks.insert("running".into(), make_task("running"));
    acct.tasks.insert(
        "stopping".into(),
        Task {
            desired_status: "STOPPED".into(),
            ..make_task("stopping")
        },
    );
    let state: SharedEcsState = Arc::new(RwLock::new(accounts));

    let still_running = task_desired_stopped(&state, ACCOUNT, "running");
    assert!(
        !still_running,
        "a RUNNING task must not be treated as stopped",
    );

    let stop_requested = task_desired_stopped(&state, ACCOUNT, "stopping");
    assert!(
        stop_requested,
        "a task whose desired_status is STOPPED must be treated as stopped",
    );

    let missing = task_desired_stopped(&state, ACCOUNT, "deleted-mid-launch");
    assert!(
        missing,
        "a task removed from state mid-launch must be treated as stopped",
    );
}
1986
1987 fn make_container(name: &str, essential: bool) -> crate::state::Container {
1988 crate::state::Container {
1989 container_arn: format!(
1990 "arn:aws:ecs:us-east-1:000000000000:container/default/abc/{name}"
1991 ),
1992 name: name.into(),
1993 image: "alpine".into(),
1994 task_arn: "arn:aws:ecs:us-east-1:000000000000:task/default/abc".into(),
1995 last_status: "RUNNING".into(),
1996 exit_code: None,
1997 reason: None,
1998 runtime_id: Some(format!("dockerid-{name}")),
1999 essential,
2000 cpu: None,
2001 memory: None,
2002 memory_reservation: None,
2003 network_bindings: Vec::new(),
2004 network_interfaces: Vec::new(),
2005 health_status: None,
2006 managed_agents: None,
2007 image_digest: None,
2008 }
2009 }
2010
/// An exited essential container stops the task even while a non-essential
/// sidecar is still running.
#[test]
fn task_should_stop_when_essential_exits() {
    let container = |name: &str, id: &str, essential: bool, exit_code: Option<i64>| {
        RunningContainer {
            name: name.into(),
            container_id: id.into(),
            essential,
            exit_code,
            network_bindings: Vec::new(),
            image_digest: None,
        }
    };

    let containers = vec![
        container("app", "id-app", true, Some(0)),
        container("sidecar", "id-sc", false, None),
    ];
    assert!(task_should_stop(&containers));
}
2033
/// A non-essential container exiting does not stop the task while the
/// essential container is still running.
#[test]
fn task_keeps_running_when_only_non_essential_exits() {
    let container = |name: &str, id: &str, essential: bool, exit_code: Option<i64>| {
        RunningContainer {
            name: name.into(),
            container_id: id.into(),
            essential,
            exit_code,
            network_bindings: Vec::new(),
            image_digest: None,
        }
    };

    let containers = vec![
        container("app", "id-app", true, None),
        container("sidecar", "id-sc", false, Some(0)),
    ];
    assert!(!task_should_stop(&containers));
}
2056
/// With no essential containers, the task stops once every container has
/// exited.
#[test]
fn task_stops_when_all_non_essentials_exit() {
    let exited = |name: &str, id: &str, code: i64| RunningContainer {
        name: name.into(),
        container_id: id.into(),
        essential: false,
        exit_code: Some(code),
        network_bindings: Vec::new(),
        image_digest: None,
    };

    let containers = vec![exited("a", "id-a", 0), exited("b", "id-b", 1)];
    assert!(task_should_stop(&containers));
}
2079
/// `finalize_stopped_multi` must give each container the exit code reported
/// for its own name and mark every container `STOPPED`.
#[test]
fn finalize_stopped_multi_assigns_per_container_exit_codes() {
    const ACCOUNT: &str = "000000000000";

    let mut accounts: MultiAccountState<EcsState> =
        MultiAccountState::new(ACCOUNT, "us-east-1", "http://localhost:4566");
    let task = Task {
        containers: vec![
            make_container("app", true),
            make_container("sidecar", false),
        ],
        ..make_task("t1")
    };
    accounts
        .get_or_create(ACCOUNT)
        .tasks
        .insert("t1".into(), task);
    let state: SharedEcsState = Arc::new(RwLock::new(accounts));

    let exited = |name: &str, id: &str, essential: bool, code: i64| RunningContainer {
        name: name.into(),
        container_id: id.into(),
        essential,
        exit_code: Some(code),
        network_bindings: Vec::new(),
        image_digest: None,
    };
    let final_containers = vec![
        exited("app", "id-app", true, 0),
        exited("sidecar", "id-sc", false, 137),
    ];

    finalize_stopped_multi(
        &state,
        ACCOUNT,
        "t1",
        &final_containers,
        0,
        "captured",
        "EssentialContainerExited",
        None,
    );

    let guard = state.read();
    let task = guard.get(ACCOUNT).unwrap().tasks.get("t1").unwrap();
    assert_eq!(task.last_status, "STOPPED");
    assert_eq!(task.stop_code.as_deref(), Some("EssentialContainerExited"));

    let by_name = |name: &str| task.containers.iter().find(|c| c.name == name).unwrap();
    let app = by_name("app");
    let sc = by_name("sidecar");
    assert_eq!(app.exit_code, Some(0));
    assert_eq!(sc.exit_code, Some(137));
    assert_eq!(app.last_status, "STOPPED");
    assert_eq!(sc.last_status, "STOPPED");
}
2142
2143 fn plan(name: &str, deps: &[&str]) -> ContainerPlan {
2144 ContainerPlan {
2145 container_name: name.into(),
2146 image: "alpine".into(),
2147 env: Vec::new(),
2148 entry_point: Vec::new(),
2149 command: Vec::new(),
2150 secrets_refs: Vec::new(),
2151 essential: true,
2152 has_task_role: false,
2153 port_mappings: Vec::new(),
2154 network_mode: None,
2155 depends_on: deps
2156 .iter()
2157 .map(|s| DependsOn {
2158 container_name: (*s).to_string(),
2159 condition: DependsOnCondition::Start,
2160 })
2161 .collect(),
2162 health_check: None,
2163 volume_mounts: Vec::new(),
2164 ulimits: Vec::new(),
2165 linux_parameters: None,
2166 stop_timeout: None,
2167 user: None,
2168 working_directory: None,
2169 tty: false,
2170 interactive: false,
2171 readonly_rootfs: false,
2172 }
2173 }
2174
/// A container declared before its dependency is moved after it.
#[test]
fn topo_sort_orders_by_depends_on() {
    let sidecar_first = vec![plan("sidecar", &["app"]), plan("app", &[])];
    let sorted = topo_sort_plans(sidecar_first);
    assert_eq!(sorted[0].container_name, "app");
    assert_eq!(sorted[1].container_name, "sidecar");
}
2184
/// Without any dependencies the declaration order is left untouched.
#[test]
fn topo_sort_preserves_declaration_order_when_no_deps() {
    let sorted = topo_sort_plans(vec![
        plan("first", &[]),
        plan("second", &[]),
        plan("third", &[]),
    ]);
    let mut names: Vec<&str> = Vec::new();
    for p in &sorted {
        names.push(p.container_name.as_str());
    }
    assert_eq!(names, vec!["first", "second", "third"]);
}
2192
/// A three-link dependency chain declared in reverse is fully reordered.
#[test]
fn topo_sort_handles_chain() {
    let reversed_chain = vec![plan("c", &["b"]), plan("b", &["a"]), plan("a", &[])];
    let sorted = topo_sort_plans(reversed_chain);
    let mut names: Vec<&str> = Vec::new();
    for p in &sorted {
        names.push(p.container_name.as_str());
    }
    assert_eq!(names, vec!["a", "b", "c"]);
}
2202
/// A dependency on a container that is not in the plan list does not drop
/// the dependent container.
#[test]
fn topo_sort_ignores_unknown_dependency() {
    let sorted = topo_sort_plans(vec![plan("only", &["does-not-exist"])]);
    assert_eq!(sorted.len(), 1);
    assert_eq!(sorted[0].container_name, "only");
}
2213
/// A dependency cycle must not lose any container from the output.
#[test]
fn topo_sort_recovers_from_cycle() {
    let cyclic = vec![plan("a", &["b"]), plan("b", &["a"])];
    let sorted = topo_sort_plans(cyclic);
    assert_eq!(sorted.len(), 2);
}
2222
/// A health check with only a command gets interval 30, timeout 5,
/// retries 3 and start period 0.
#[test]
fn parse_health_check_fills_aws_defaults() {
    let definition = serde_json::json!({
        "command": ["CMD-SHELL", "curl -f http://localhost/ || exit 1"],
    });
    let spec = __test_parse_health_check(&definition).expect("parsed");
    assert_eq!(spec.command[0], "CMD-SHELL");
    assert_eq!(spec.interval_seconds, 30);
    assert_eq!(spec.timeout_seconds, 5);
    assert_eq!(spec.retries, 3);
    assert_eq!(spec.start_period_seconds, 0);
}
2235
/// Explicit `interval`, `timeout`, `retries` and `startPeriod` values are
/// carried into the parsed spec.
#[test]
fn parse_health_check_overrides_explicit_values() {
    let definition = serde_json::json!({
        "command": ["CMD", "/probe"],
        "interval": 7,
        "timeout": 2,
        "retries": 9,
        "startPeriod": 12,
    });
    let spec = __test_parse_health_check(&definition).expect("parsed");
    assert_eq!(spec.interval_seconds, 7);
    assert_eq!(spec.timeout_seconds, 2);
    assert_eq!(spec.retries, 9);
    assert_eq!(spec.start_period_seconds, 12);
}
2251
/// A command of `["NONE"]` yields no health check.
#[test]
fn parse_health_check_returns_none_for_none_sentinel() {
    let definition = serde_json::json!({ "command": ["NONE"] });
    let parsed = __test_parse_health_check(&definition);
    assert!(parsed.is_none());
}
2259
/// A health check object without a `command` yields no health check.
#[test]
fn parse_health_check_returns_none_for_missing_command() {
    let definition = serde_json::json!({ "interval": 30 });
    let parsed = __test_parse_health_check(&definition);
    assert!(parsed.is_none());
}
2265
/// A `CMD-SHELL` health check renders `--health-cmd <shell string>` first,
/// plus the interval, timeout, retries and start-period flags.
#[test]
fn render_health_flags_emits_full_set_for_cmd_shell() {
    let spec = HealthCheckSpec {
        command: vec!["CMD-SHELL".into(), "curl -f http://localhost/".into()],
        interval_seconds: 15,
        timeout_seconds: 3,
        retries: 4,
        start_period_seconds: 10,
    };
    let flags = render_health_flags(&spec);

    assert_eq!(flags[0], "--health-cmd");
    assert_eq!(flags[1], "curl -f http://localhost/");
    for expected in [
        "--health-interval=15s",
        "--health-timeout=3s",
        "--health-retries=4",
        "--health-start-period=10s",
    ] {
        assert!(flags.iter().any(|flag| flag == expected));
    }
}
2283
/// A `CMD` health check has its argv joined with single spaces into the
/// value that follows `--health-cmd`.
#[test]
fn render_health_flags_joins_cmd_argv_with_spaces() {
    let command: Vec<String> = ["CMD", "/bin/probe", "--port", "8080"]
        .into_iter()
        .map(String::from)
        .collect();
    let spec = HealthCheckSpec {
        command,
        interval_seconds: 30,
        timeout_seconds: 5,
        retries: 3,
        start_period_seconds: 0,
    };
    let flags = render_health_flags(&spec);
    assert_eq!(flags[1], "/bin/probe --port 8080");
}
2303
/// A plan with a health check makes `build_run_argv` emit every
/// `--health-*` flag.
#[test]
fn build_run_argv_emits_health_flags_when_present() {
    // Start from the shared `plan` fixture and override only the health check.
    let container_plan = ContainerPlan {
        health_check: Some(HealthCheckSpec {
            command: vec!["CMD-SHELL".into(), "true".into()],
            interval_seconds: 5,
            timeout_seconds: 2,
            retries: 1,
            start_period_seconds: 1,
        }),
        ..plan("app", &[])
    };

    let argv = build_run_argv(
        &container_plan,
        &[],
        "task-1",
        "host.docker.internal",
        None,
        "alpine",
        true,
    );
    let joined = argv.join(" ");

    for needle in [
        "--health-cmd true",
        "--health-interval=5s",
        "--health-timeout=2s",
        "--health-retries=1",
        "--health-start-period=1s",
    ] {
        assert!(joined.contains(needle), "argv: {joined}");
    }
}
2354
/// A plan without a health check produces no argument starting with
/// `--health`.
#[test]
fn build_run_argv_emits_no_health_flags_when_absent() {
    // The shared `plan` fixture already has `health_check: None`.
    let container_plan = plan("app", &[]);

    let argv = build_run_argv(
        &container_plan,
        &[],
        "task-1",
        "host.docker.internal",
        None,
        "alpine",
        true,
    );

    let has_health_flag = argv.iter().any(|arg| arg.starts_with("--health"));
    assert!(!has_health_flag);
}
2391
/// Docker health strings map to `HEALTHY` / `UNHEALTHY`; anything else maps
/// to `UNKNOWN`.
#[test]
fn docker_health_to_ecs_maps_known_states() {
    let expectations = [
        ("healthy", "HEALTHY"),
        ("HEALTHY", "HEALTHY"),
        ("unhealthy", "UNHEALTHY"),
        ("starting", "UNKNOWN"),
        ("none", "UNKNOWN"),
        ("", "UNKNOWN"),
    ];
    for (docker_state, ecs_state) in expectations {
        assert_eq!(docker_health_to_ecs(docker_state), ecs_state);
    }
}
2401
/// A `host` volume resolves to its `sourcePath`, keeping the mount point's
/// container path and read-only flag.
#[test]
fn resolve_host_bind_volume_uses_source_path() {
    let volume = serde_json::json!({
        "name": "data",
        "host": { "sourcePath": "/var/lib/myapp" }
    });
    let volumes = std::collections::HashMap::from([("data".to_string(), &volume)]);
    let mount_point = serde_json::json!({
        "sourceVolume": "data",
        "containerPath": "/app/data",
        "readOnly": false
    });

    let resolved = resolve_mount_point(&mount_point, &volumes).expect("resolved");
    assert_eq!(resolved.source, "/var/lib/myapp");
    assert_eq!(resolved.container_path, "/app/data");
    assert!(!resolved.read_only);
}
2422
/// A read-only volume mount is rendered as `-v <source>:<target>:ro`.
#[test]
fn read_only_mount_renders_ro_suffix() {
    // Start from the shared `plan` fixture and override only the mounts.
    let container_plan = ContainerPlan {
        volume_mounts: vec![VolumeMount {
            source: "/host/path".into(),
            container_path: "/in/container".into(),
            read_only: true,
            cleanup_on_stop: false,
        }],
        ..plan("app", &[])
    };

    let argv = build_run_argv(
        &container_plan,
        &[],
        "task-1",
        "host.docker.internal",
        None,
        "alpine",
        true,
    );

    let mount_arg = argv
        .windows(2)
        .find_map(|pair| (pair[0] == "-v").then(|| &pair[1]))
        .expect("expected -v flag");
    assert_eq!(mount_arg, "/host/path:/in/container:ro");
}
2470
/// An EFS volume resolves to a `fakecloud-efs-<fs id>-<root dir>` source.
#[test]
fn resolve_efs_volume_uses_stub_dir() {
    let volume = serde_json::json!({
        "name": "efs-vol",
        "efsVolumeConfiguration": {
            "fileSystemId": "fs-12345678",
            "rootDirectory": "/exports/app"
        }
    });
    let volumes = std::collections::HashMap::from([("efs-vol".to_string(), &volume)]);
    let mount_point = serde_json::json!({
        "sourceVolume": "efs-vol",
        "containerPath": "/mnt/efs"
    });

    let resolved = resolve_mount_point(&mount_point, &volumes).expect("resolved");
    assert_eq!(resolved.source, "fakecloud-efs-fs-12345678-exports-app");
    assert_eq!(resolved.container_path, "/mnt/efs");
}
2496
/// A root directory of `/` or the empty string adds no suffix to the shared
/// volume name.
#[test]
fn efs_without_root_directory_uses_filesystem_root() {
    for root_directory in ["/", ""] {
        assert_eq!(
            shared_volume_name("efs", "fs-abc", root_directory),
            "fakecloud-efs-fs-abc"
        );
    }
}
2513
/// A docker volume configuration resolves to the volume's own name.
#[test]
fn resolve_docker_named_volume_uses_volume_name() {
    let volume = serde_json::json!({
        "name": "named-vol",
        "dockerVolumeConfiguration": {
            "scope": "task",
            "driver": "local"
        }
    });
    let volumes = std::collections::HashMap::from([("named-vol".to_string(), &volume)]);
    let mount_point = serde_json::json!({
        "sourceVolume": "named-vol",
        "containerPath": "/data"
    });

    let resolved = resolve_mount_point(&mount_point, &volumes).expect("resolved");
    assert_eq!(resolved.source, "named-vol");
    assert_eq!(resolved.container_path, "/data");
}
2536
/// Table test for `volume_is_task_scoped`: host paths, EFS, FSx and
/// `shared`-scope docker volumes are not task scoped; `task`-scope or
/// unscoped docker volumes, bare volumes and host volumes with an empty
/// source path are.
#[test]
fn cleanup_on_stop_matches_aws_volume_scope() {
    let cases = vec![
        (
            serde_json::json!({ "name": "v", "host": { "sourcePath": "/data" } }),
            false,
        ),
        (
            serde_json::json!({ "name": "v", "efsVolumeConfiguration": { "fileSystemId": "fs-a" } }),
            false,
        ),
        (
            serde_json::json!({ "name": "v", "fsxWindowsFileServerVolumeConfiguration": { "fileSystemId": "fs-b" } }),
            false,
        ),
        (
            serde_json::json!({ "name": "v", "dockerVolumeConfiguration": { "scope": "shared" } }),
            false,
        ),
        (
            serde_json::json!({ "name": "v", "dockerVolumeConfiguration": { "scope": "task" } }),
            true,
        ),
        (
            serde_json::json!({ "name": "v", "dockerVolumeConfiguration": {} }),
            true,
        ),
        (serde_json::json!({ "name": "v" }), true),
        (
            serde_json::json!({ "name": "v", "host": { "sourcePath": "" } }),
            true,
        ),
    ];

    for (vol, expected) in cases.iter() {
        let task_scoped = volume_is_task_scoped(vol);
        assert_eq!(
            task_scoped, *expected,
            "unexpected cleanup_on_stop for {vol}"
        );
    }
}
2583
/// An FSx volume resolves to a `fakecloud-fsx-<fs id>-<root dir>` source.
#[test]
fn resolve_fsx_volume_uses_stub_dir() {
    let volume = serde_json::json!({
        "name": "fsx-vol",
        "fsxWindowsFileServerVolumeConfiguration": {
            "fileSystemId": "fs-xyz",
            "rootDirectory": "share"
        }
    });
    let volumes = std::collections::HashMap::from([("fsx-vol".to_string(), &volume)]);
    let mount_point = serde_json::json!({
        "sourceVolume": "fsx-vol",
        "containerPath": "C:\\data"
    });

    let resolved = resolve_mount_point(&mount_point, &volumes).expect("resolved");
    assert_eq!(resolved.source, "fakecloud-fsx-fs-xyz-share");
}
2605
/// A mount point naming a volume that is not declared resolves to `None`.
#[test]
fn unknown_source_volume_returns_none() {
    let declared_volumes = std::collections::HashMap::new();
    let mount_point = serde_json::json!({
        "sourceVolume": "missing",
        "containerPath": "/x"
    });
    let resolved = resolve_mount_point(&mount_point, &declared_volumes);
    assert!(resolved.is_none());
}
2618
/// Two containers that depend on each other are reported as a cycle.
#[test]
fn find_depends_on_cycle_detects_two_node_cycle() {
    let container = |name: &str, depends_on: &str| {
        serde_json::json!({
            "name": name,
            "image": "alpine",
            "dependsOn": [{"containerName": depends_on, "condition": "START"}],
        })
    };
    let cds = vec![container("a", "b"), container("b", "a")];

    let cycle = find_depends_on_cycle(&cds);
    assert!(cycle.is_some(), "expected cycle to be detected");
}
2639
2640 #[test]
2644 fn find_depends_on_cycle_accepts_chain() {
2645 let cds = vec![
2646 serde_json::json!({
2647 "name": "a",
2648 "image": "alpine",
2649 "dependsOn": [{"containerName": "b", "condition": "START"}],
2650 }),
2651 serde_json::json!({
2652 "name": "b",
2653 "image": "alpine",
2654 "dependsOn": [{"containerName": "c", "condition": "START"}],
2655 }),
2656 serde_json::json!({
2657 "name": "c",
2658 "image": "alpine",
2659 }),
2660 ];
2661 assert!(find_depends_on_cycle(&cds).is_none());
2662 }
2663
2664 #[test]
2668 fn find_depends_on_cycle_ignores_unknown_target() {
2669 let cds = vec![serde_json::json!({
2670 "name": "only",
2671 "image": "alpine",
2672 "dependsOn": [{"containerName": "ghost", "condition": "START"}],
2673 })];
2674 assert!(find_depends_on_cycle(&cds).is_none());
2675 }
2676
2677 #[test]
2681 fn condition_is_met_matches_aws_semantics() {
2682 let running = InspectedState {
2683 started: true,
2684 exited: false,
2685 exit_code: 0,
2686 health: None,
2687 };
2688 let exited_ok = InspectedState {
2689 started: true,
2690 exited: true,
2691 exit_code: 0,
2692 health: None,
2693 };
2694 let exited_fail = InspectedState {
2695 started: true,
2696 exited: true,
2697 exit_code: 1,
2698 health: None,
2699 };
2700 let healthy = InspectedState {
2701 started: true,
2702 exited: false,
2703 exit_code: 0,
2704 health: Some("healthy".into()),
2705 };
2706
2707 assert!(condition_is_met(DependsOnCondition::Start, &running));
2710 assert!(condition_is_met(DependsOnCondition::Start, &exited_ok));
2711
2712 assert!(!condition_is_met(DependsOnCondition::Complete, &running));
2714 assert!(condition_is_met(DependsOnCondition::Complete, &exited_ok));
2715 assert!(condition_is_met(DependsOnCondition::Complete, &exited_fail));
2716
2717 assert!(!condition_is_met(DependsOnCondition::Success, &running));
2719 assert!(condition_is_met(DependsOnCondition::Success, &exited_ok));
2720 assert!(!condition_is_met(DependsOnCondition::Success, &exited_fail));
2721
2722 assert!(!condition_is_met(DependsOnCondition::Healthy, &running));
2724 assert!(condition_is_met(DependsOnCondition::Healthy, &healthy));
2725 }
2726
2727 #[test]
2731 fn depends_on_condition_parse_round_trips() {
2732 assert_eq!(
2733 DependsOnCondition::parse("START"),
2734 Some(DependsOnCondition::Start)
2735 );
2736 assert_eq!(
2737 DependsOnCondition::parse("COMPLETE"),
2738 Some(DependsOnCondition::Complete)
2739 );
2740 assert_eq!(
2741 DependsOnCondition::parse("SUCCESS"),
2742 Some(DependsOnCondition::Success)
2743 );
2744 assert_eq!(
2745 DependsOnCondition::parse("HEALTHY"),
2746 Some(DependsOnCondition::Healthy)
2747 );
2748 assert_eq!(DependsOnCondition::parse("start"), None);
2749 assert_eq!(DependsOnCondition::parse("ANY"), None);
2750 }
2751
2752 #[test]
2755 fn build_run_argv_emits_ulimits() {
2756 let plan = ContainerPlan {
2757 container_name: "app".into(),
2758 image: "alpine".into(),
2759 env: Vec::new(),
2760 entry_point: Vec::new(),
2761 command: Vec::new(),
2762 secrets_refs: Vec::new(),
2763 essential: true,
2764 has_task_role: false,
2765 port_mappings: Vec::new(),
2766 network_mode: None,
2767 depends_on: Vec::new(),
2768 health_check: None,
2769 volume_mounts: Vec::new(),
2770 ulimits: vec![Ulimit {
2771 name: "nofile".into(),
2772 soft_limit: 1024,
2773 hard_limit: 2048,
2774 }],
2775 linux_parameters: None,
2776 stop_timeout: None,
2777 user: None,
2778 working_directory: None,
2779 tty: false,
2780 interactive: false,
2781 readonly_rootfs: false,
2782 };
2783 let argv = build_run_argv(&plan, &[], "t", "host.docker.internal", None, "img", true);
2784 assert!(argv.contains(&"--ulimit".to_string()));
2785 assert!(argv.contains(&"nofile=1024:2048".to_string()));
2786 }
2787
2788 #[test]
2789 fn build_run_argv_emits_linux_parameters() {
2790 let plan = ContainerPlan {
2791 container_name: "app".into(),
2792 image: "alpine".into(),
2793 env: Vec::new(),
2794 entry_point: Vec::new(),
2795 command: Vec::new(),
2796 secrets_refs: Vec::new(),
2797 essential: true,
2798 has_task_role: false,
2799 port_mappings: Vec::new(),
2800 network_mode: None,
2801 depends_on: Vec::new(),
2802 health_check: None,
2803 volume_mounts: Vec::new(),
2804 ulimits: Vec::new(),
2805 linux_parameters: Some(LinuxParameters {
2806 capabilities_add: vec!["NET_ADMIN".into()],
2807 capabilities_drop: vec!["ALL".into()],
2808 devices: vec![Device {
2809 host_path: "/dev/zero".into(),
2810 container_path: "/dev/zero".into(),
2811 permissions: "rwm".into(),
2812 }],
2813 init_process_enabled: true,
2814 shared_memory_size: Some(256),
2815 sysctls: vec![Sysctl {
2816 name: "net.ipv4.ip_forward".into(),
2817 value: "1".into(),
2818 }],
2819 tmpfs: vec![Tmpfs {
2820 container_path: "/tmp".into(),
2821 size: 128,
2822 mount_options: vec!["noexec".into()],
2823 }],
2824 privileged: true,
2825 }),
2826 stop_timeout: Some(30),
2827 user: Some("1000:1000".into()),
2828 working_directory: Some("/app".into()),
2829 tty: true,
2830 interactive: true,
2831 readonly_rootfs: true,
2832 };
2833 let argv = build_run_argv(&plan, &[], "t", "host.docker.internal", None, "img", true);
2834 assert!(argv.contains(&"--cap-add".to_string()));
2835 assert!(argv.contains(&"NET_ADMIN".to_string()));
2836 assert!(argv.contains(&"--cap-drop".to_string()));
2837 assert!(argv.contains(&"ALL".to_string()));
2838 assert!(argv.contains(&"--device".to_string()));
2839 assert!(argv.contains(&"/dev/zero:/dev/zerorwm".to_string()));
2840 assert!(argv.contains(&"--init".to_string()));
2841 assert!(argv.contains(&"--shm-size".to_string()));
2842 assert!(argv.contains(&"256m".to_string()));
2843 assert!(argv.contains(&"--sysctl".to_string()));
2844 assert!(argv.contains(&"net.ipv4.ip_forward=1".to_string()));
2845 assert!(argv.contains(&"--tmpfs".to_string()));
2846 assert!(argv.contains(&"--privileged".to_string()));
2847 assert!(argv.contains(&"--stop-timeout".to_string()));
2848 assert!(argv.contains(&"30".to_string()));
2849 assert!(argv.contains(&"--user".to_string()));
2850 assert!(argv.contains(&"1000:1000".to_string()));
2851 assert!(argv.contains(&"--workdir".to_string()));
2852 assert!(argv.contains(&"/app".to_string()));
2853 assert!(argv.contains(&"--tty".to_string()));
2854 assert!(argv.contains(&"--interactive".to_string()));
2855 assert!(argv.contains(&"--read-only".to_string()));
2856 }
2857
2858 #[test]
2859 fn parse_linux_parameters_fills_defaults() {
2860 let raw = serde_json::json!({"initProcessEnabled": true});
2861 let lp = parse_linux_parameters(&raw).expect("parses");
2862 assert!(lp.init_process_enabled);
2863 assert!(!lp.privileged);
2864 assert!(lp.capabilities_add.is_empty());
2865 }
2866
2867 #[test]
2868 fn parse_device_uses_default_permissions() {
2869 let raw = serde_json::json!({"hostPath": "/dev/null", "containerPath": "/dev/null"});
2870 let dev = parse_device(&raw).expect("parses");
2871 assert_eq!(dev.permissions, "rwm");
2872 }
2873
2874 #[test]
2875 fn compute_elbv2_targets_empty_when_no_group() {
2876 let mut accounts: MultiAccountState<EcsState> =
2877 MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
2878 let acct = accounts.get_or_create("000000000000");
2879 let mut task = make_task("t1");
2880 task.group = None;
2881 acct.tasks.insert("t1".into(), task);
2882 let state = acct.clone();
2883 let targets = compute_elbv2_targets(&state, state.tasks.get("t1").unwrap());
2884 assert!(targets.is_empty());
2885 }
2886
2887 #[test]
2888 fn compute_elbv2_targets_bridge_mode_uses_localhost_and_host_port() {
2889 let mut accounts: MultiAccountState<EcsState> =
2890 MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
2891 let acct = accounts.get_or_create("000000000000");
2892
2893 let td = crate::state::TaskDefinition {
2894 family: "app".into(),
2895 revision: 1,
2896 task_definition_arn: "arn:aws:ecs:us-east-1:000000000000:task-definition/app:1".into(),
2897 container_definitions: Vec::new(),
2898 network_mode: Some("bridge".into()),
2899 status: "ACTIVE".into(),
2900 task_role_arn: None,
2901 execution_role_arn: None,
2902 requires_compatibilities: Vec::new(),
2903 compatibilities: Vec::new(),
2904 cpu: None,
2905 memory: None,
2906 pid_mode: None,
2907 ipc_mode: None,
2908 volumes: Vec::new(),
2909 placement_constraints: Vec::new(),
2910 proxy_configuration: None,
2911 inference_accelerators: Vec::new(),
2912 ephemeral_storage: None,
2913 runtime_platform: None,
2914 requires_attributes: Vec::new(),
2915 registered_at: Utc::now(),
2916 registered_by: None,
2917 deregistered_at: None,
2918 tags: Vec::new(),
2919 enable_fault_injection: None,
2920 };
2921 acct.task_definitions.insert("app".into(), {
2922 let mut m = std::collections::BTreeMap::new();
2923 m.insert(1, td);
2924 m
2925 });
2926
2927 let service = crate::state::Service {
2928 service_name: "svc".into(),
2929 service_arn: "arn:aws:ecs:us-east-1:000000000000:service/default/svc".into(),
2930 cluster_name: "default".into(),
2931 cluster_arn: "arn:aws:ecs:us-east-1:000000000000:cluster/default".into(),
2932 task_definition_arn: "arn:aws:ecs:us-east-1:000000000000:task-definition/app:1".into(),
2933 family: "app".into(),
2934 revision: 1,
2935 desired_count: 1,
2936 running_count: 0,
2937 pending_count: 0,
2938 launch_type: "FARGATE".into(),
2939 status: "ACTIVE".into(),
2940 scheduling_strategy: "REPLICA".into(),
2941 deployment_controller: "ECS".into(),
2942 minimum_healthy_percent: Some(0),
2943 maximum_percent: Some(200),
2944 circuit_breaker: None,
2945 deployments: Vec::new(),
2946 load_balancers: vec![serde_json::json!({
2947 "targetGroupArn": "arn:aws:elasticloadbalancing:us-east-1:000000000000:targetgroup/tg/abc",
2948 "containerName": "app",
2949 "containerPort": 80,
2950 })],
2951 service_registries: Vec::new(),
2952 placement_constraints: Vec::new(),
2953 placement_strategy: Vec::new(),
2954 network_configuration: None,
2955 volume_configurations: vec![],
2956 tags: Vec::new(),
2957 created_at: Utc::now(),
2958 created_by: None,
2959 role_arn: None,
2960 platform_version: None,
2961 health_check_grace_period_seconds: None,
2962 enable_execute_command: false,
2963 enable_ecs_managed_tags: false,
2964 propagate_tags: None,
2965 capacity_provider_strategy: Vec::new(),
2966 availability_zone_rebalancing: None,
2967 };
2968 acct.services.insert(
2969 crate::state::EcsState::service_key("default", "svc"),
2970 service,
2971 );
2972
2973 let mut task = make_task("t1");
2974 task.group = Some("service:svc".into());
2975 task.containers = vec![crate::state::Container {
2976 container_arn: "arn:aws:ecs:us-east-1:000000000000:container/default/abc/app".into(),
2977 name: "app".into(),
2978 image: "alpine".into(),
2979 task_arn: task.task_arn.clone(),
2980 last_status: "RUNNING".into(),
2981 exit_code: None,
2982 reason: None,
2983 runtime_id: Some("dockerid-app".into()),
2984 essential: true,
2985 cpu: None,
2986 memory: None,
2987 memory_reservation: None,
2988 network_bindings: vec![serde_json::json!({
2989 "bindIP": "0.0.0.0",
2990 "containerPort": 80,
2991 "hostPort": 32768,
2992 "protocol": "tcp",
2993 })],
2994 network_interfaces: Vec::new(),
2995 health_status: None,
2996 managed_agents: None,
2997 image_digest: None,
2998 }];
2999 acct.tasks.insert("t1".into(), task);
3000
3001 let state = acct.clone();
3002 let targets = compute_elbv2_targets(&state, state.tasks.get("t1").unwrap());
3003 assert_eq!(targets.len(), 1);
3004 let (arn, tg_targets) = &targets[0];
3005 assert_eq!(
3006 arn,
3007 "arn:aws:elasticloadbalancing:us-east-1:000000000000:targetgroup/tg/abc"
3008 );
3009 assert_eq!(tg_targets.len(), 1);
3010 assert_eq!(tg_targets[0].0, "127.0.0.1");
3011 assert_eq!(tg_targets[0].1, Some(32768));
3012 }
3013
3014 #[test]
3015 fn compute_elbv2_targets_awsvpc_uses_eni_ip() {
3016 let mut accounts: MultiAccountState<EcsState> =
3017 MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
3018 let acct = accounts.get_or_create("000000000000");
3019
3020 let td = crate::state::TaskDefinition {
3021 family: "app".into(),
3022 revision: 1,
3023 task_definition_arn: "arn:aws:ecs:us-east-1:000000000000:task-definition/app:1".into(),
3024 container_definitions: Vec::new(),
3025 network_mode: Some("awsvpc".into()),
3026 status: "ACTIVE".into(),
3027 task_role_arn: None,
3028 execution_role_arn: None,
3029 requires_compatibilities: Vec::new(),
3030 compatibilities: Vec::new(),
3031 cpu: None,
3032 memory: None,
3033 pid_mode: None,
3034 ipc_mode: None,
3035 volumes: Vec::new(),
3036 placement_constraints: Vec::new(),
3037 proxy_configuration: None,
3038 inference_accelerators: Vec::new(),
3039 ephemeral_storage: None,
3040 runtime_platform: None,
3041 requires_attributes: Vec::new(),
3042 registered_at: Utc::now(),
3043 registered_by: None,
3044 deregistered_at: None,
3045 tags: Vec::new(),
3046 enable_fault_injection: None,
3047 };
3048 acct.task_definitions.insert("app".into(), {
3049 let mut m = std::collections::BTreeMap::new();
3050 m.insert(1, td);
3051 m
3052 });
3053
3054 let service = crate::state::Service {
3055 service_name: "svc".into(),
3056 service_arn: "arn:aws:ecs:us-east-1:000000000000:service/default/svc".into(),
3057 cluster_name: "default".into(),
3058 cluster_arn: "arn:aws:ecs:us-east-1:000000000000:cluster/default".into(),
3059 task_definition_arn: "arn:aws:ecs:us-east-1:000000000000:task-definition/app:1".into(),
3060 family: "app".into(),
3061 revision: 1,
3062 desired_count: 1,
3063 running_count: 0,
3064 pending_count: 0,
3065 launch_type: "FARGATE".into(),
3066 status: "ACTIVE".into(),
3067 scheduling_strategy: "REPLICA".into(),
3068 deployment_controller: "ECS".into(),
3069 minimum_healthy_percent: Some(0),
3070 maximum_percent: Some(200),
3071 circuit_breaker: None,
3072 deployments: Vec::new(),
3073 load_balancers: vec![serde_json::json!({
3074 "targetGroupArn": "arn:aws:elasticloadbalancing:us-east-1:000000000000:targetgroup/tg/abc",
3075 "containerName": "app",
3076 "containerPort": 80,
3077 })],
3078 service_registries: Vec::new(),
3079 placement_constraints: Vec::new(),
3080 placement_strategy: Vec::new(),
3081 network_configuration: None,
3082 volume_configurations: vec![],
3083 tags: Vec::new(),
3084 created_at: Utc::now(),
3085 created_by: None,
3086 role_arn: None,
3087 platform_version: None,
3088 health_check_grace_period_seconds: None,
3089 enable_execute_command: false,
3090 enable_ecs_managed_tags: false,
3091 propagate_tags: None,
3092 capacity_provider_strategy: Vec::new(),
3093 availability_zone_rebalancing: None,
3094 };
3095 acct.services.insert(
3096 crate::state::EcsState::service_key("default", "svc"),
3097 service,
3098 );
3099
3100 let mut task = make_task("t1");
3101 task.group = Some("service:svc".into());
3102 task.attachments = vec![crate::state::TaskAttachment {
3103 id: "eni-123".into(),
3104 attachment_type: "eni".into(),
3105 status: "ATTACHED".into(),
3106 details: vec![
3107 crate::state::AttachmentDetail {
3108 name: "privateIPv4Address".into(),
3109 value: "172.18.0.2".into(),
3110 },
3111 crate::state::AttachmentDetail {
3112 name: "macAddress".into(),
3113 value: "02:42:ac:12:00:02".into(),
3114 },
3115 ],
3116 }];
3117 acct.tasks.insert("t1".into(), task);
3118
3119 let state = acct.clone();
3120 let targets = compute_elbv2_targets(&state, state.tasks.get("t1").unwrap());
3121 assert_eq!(targets.len(), 1);
3122 let (arn, tg_targets) = &targets[0];
3123 assert_eq!(
3124 arn,
3125 "arn:aws:elasticloadbalancing:us-east-1:000000000000:targetgroup/tg/abc"
3126 );
3127 assert_eq!(tg_targets.len(), 1);
3128 assert_eq!(tg_targets[0].0, "172.18.0.2");
3129 assert_eq!(tg_targets[0].1, Some(80));
3130 }
3131
3132 fn minimal_plan() -> ContainerPlan {
3133 ContainerPlan {
3134 container_name: "app".into(),
3135 image: "alpine".into(),
3136 env: Vec::new(),
3137 entry_point: Vec::new(),
3138 command: Vec::new(),
3139 secrets_refs: Vec::new(),
3140 essential: true,
3141 has_task_role: false,
3142 port_mappings: Vec::new(),
3143 network_mode: None,
3144 depends_on: Vec::new(),
3145 health_check: None,
3146 volume_mounts: Vec::new(),
3147 ulimits: Vec::new(),
3148 linux_parameters: None,
3149 stop_timeout: None,
3150 user: None,
3151 working_directory: None,
3152 tty: false,
3153 interactive: false,
3154 readonly_rootfs: false,
3155 }
3156 }
3157
3158 #[test]
3162 fn build_run_argv_emits_fakecloud_instance_label() {
3163 let plan = minimal_plan();
3164 let argv = build_run_argv(
3165 &plan,
3166 &[],
3167 "task-1",
3168 "host.docker.internal",
3169 None,
3170 "alpine",
3171 true,
3172 );
3173 let expected = fakecloud_instance_label();
3174 assert!(
3175 argv.windows(2)
3176 .any(|w| w[0] == "--label" && w[1] == expected),
3177 "argv must contain `--label {expected}`: {argv:?}",
3178 );
3179 }
3180
3181 #[test]
3186 fn fakecloud_instance_label_matches_reaper_format() {
3187 let label = fakecloud_instance_label();
3188 let (key, value) = label.split_once('=').expect("label is key=value");
3189 assert_eq!(key, "fakecloud-instance");
3190 let pid_str = value
3191 .strip_prefix("fakecloud-")
3192 .expect("value starts with fakecloud-");
3193 assert_eq!(
3194 pid_str.parse::<u32>().ok(),
3195 Some(std::process::id()),
3196 "reaper must be able to parse the owning pid out of {label}",
3197 );
3198 }
3199}