1use std::path::PathBuf;
11use std::sync::Arc;
12use std::time::Duration;
13
14use base64::Engine;
15use chrono::Utc;
16use fakecloud_core::delivery::DeliveryBus;
17use fakecloud_logs::ingest::{append_events, IngestEvent};
18use fakecloud_logs::SharedLogsState;
19use fakecloud_secretsmanager::SharedSecretsManagerState;
20use fakecloud_ssm::SharedSsmState;
21use parking_lot::RwLock;
22use tempfile::TempDir;
23use tokio::process::Command;
24
25use crate::state::{LifecycleEvent, SharedEcsState};
26
27#[derive(Debug, thiserror::Error)]
28pub enum RuntimeError {
29 #[error("container CLI not found (tried docker, podman)")]
30 NoCli,
31 #[error("image pull failed: {0}")]
32 ImagePull(String),
33 #[error("container start failed: {0}")]
34 ContainerStart(String),
35 #[error("docker wait failed: {0}")]
36 Wait(String),
37}
38
39pub struct EcsRuntime {
41 cli: String,
42 net: fakecloud_core::container_net::HostNetworking,
47 server_port: u16,
52 docker_config: Option<Arc<TempDir>>,
57 containers: RwLock<std::collections::HashMap<String, Vec<(String, String)>>>,
62 delivery_bus: Option<Arc<DeliveryBus>>,
66 logs_state: Option<SharedLogsState>,
70 secretsmanager_state: Option<SharedSecretsManagerState>,
73 ssm_state: Option<SharedSsmState>,
76 k8s: Option<k8s::K8sTaskBackend>,
80}
81
82mod config;
83mod k8s;
84mod lb;
85mod monitoring;
86mod secrets;
87mod task_lifecycle;
88
89impl EcsRuntime {
90 pub fn new(server_port: u16) -> Option<Self> {
95 let cli = fakecloud_core::container_net::detect_container_cli()?;
96 let net = fakecloud_core::container_net::HostNetworking::detect(&cli);
97 let docker_config = build_local_registry_docker_config(server_port).map(Arc::new);
98 Some(Self {
99 cli,
100 net,
101 server_port,
102 docker_config,
103 containers: RwLock::new(std::collections::HashMap::new()),
104 delivery_bus: None,
105 logs_state: None,
106 secretsmanager_state: None,
107 ssm_state: None,
108 k8s: None,
109 })
110 }
111
112 pub async fn new_k8s(server_port: u16) -> Result<Self, k8s::BackendInitError> {
116 let backend = k8s::K8sTaskBackend::from_env(server_port).await?;
117 let net = fakecloud_core::container_net::HostNetworking {
120 host_alias: String::new(),
121 add_host_arg: None,
122 sibling_host: String::new(),
123 };
124 Ok(Self {
125 cli: String::new(),
126 net,
127 server_port,
128 docker_config: None,
129 containers: RwLock::new(std::collections::HashMap::new()),
130 delivery_bus: None,
131 logs_state: None,
132 secretsmanager_state: None,
133 ssm_state: None,
134 k8s: Some(backend),
135 })
136 }
137
138 pub fn cli_name(&self) -> &str {
140 if self.k8s.is_some() {
141 "kubernetes"
142 } else {
143 &self.cli
144 }
145 }
146
147 pub async fn reap_stale(&self) {
150 if let Some(k) = &self.k8s {
151 k.reap_stale().await;
152 }
153 }
154
155 pub fn with_delivery_bus(mut self, bus: Arc<DeliveryBus>) -> Self {
158 self.delivery_bus = Some(bus);
159 self
160 }
161
162 pub fn with_logs(mut self, logs: SharedLogsState) -> Self {
165 self.logs_state = Some(logs);
166 self
167 }
168}
169
170#[derive(Clone, Debug)]
172pub(crate) struct ContainerPlan {
173 pub(crate) container_name: String,
174 pub(crate) image: String,
175 pub(crate) env: Vec<(String, String)>,
176 pub(crate) entry_point: Vec<String>,
177 pub(crate) command: Vec<String>,
178 pub(crate) secrets_refs: Vec<(String, String)>,
179 pub(crate) essential: bool,
180 pub(crate) has_task_role: bool,
181 pub(crate) port_mappings: Vec<PortMapping>,
186 pub(crate) network_mode: Option<String>,
191 pub(crate) depends_on: Vec<DependsOn>,
199 pub(crate) health_check: Option<HealthCheckSpec>,
206 pub(crate) volume_mounts: Vec<VolumeMount>,
211 pub(crate) ulimits: Vec<Ulimit>,
214 pub(crate) linux_parameters: Option<LinuxParameters>,
218 pub(crate) stop_timeout: Option<u32>,
220 pub(crate) user: Option<String>,
222 pub(crate) working_directory: Option<String>,
224 pub(crate) tty: bool,
226 pub(crate) interactive: bool,
228 pub(crate) readonly_rootfs: bool,
230}
231
232#[derive(Clone, Debug, PartialEq, Eq)]
238pub(crate) struct DependsOn {
239 pub container_name: String,
240 pub condition: DependsOnCondition,
241}
242
243#[derive(Clone, Copy, Debug, PartialEq, Eq)]
247pub(crate) enum DependsOnCondition {
248 Start,
251 Complete,
253 Success,
255 Healthy,
259}
260
261impl DependsOnCondition {
262 pub fn parse(raw: &str) -> Option<Self> {
266 match raw {
267 "START" => Some(Self::Start),
268 "COMPLETE" => Some(Self::Complete),
269 "SUCCESS" => Some(Self::Success),
270 "HEALTHY" => Some(Self::Healthy),
271 _ => None,
272 }
273 }
274
275 pub fn as_aws_str(self) -> &'static str {
279 match self {
280 Self::Start => "START",
281 Self::Complete => "COMPLETE",
282 Self::Success => "SUCCESS",
283 Self::Healthy => "HEALTHY",
284 }
285 }
286}
287
288#[derive(Clone, Debug, PartialEq, Eq)]
294pub(crate) struct HealthCheckSpec {
295 pub command: Vec<String>,
301 pub interval_seconds: u32,
302 pub timeout_seconds: u32,
303 pub retries: u32,
304 pub start_period_seconds: u32,
305}
306
307#[derive(Clone, Debug, PartialEq, Eq)]
311pub(crate) struct PortMapping {
312 pub container_port: u16,
313 pub host_port: u16,
316 pub protocol: String,
318}
319
320#[derive(Clone, Debug, PartialEq, Eq)]
342pub(crate) struct VolumeMount {
343 pub source: String,
347 pub container_path: String,
350 pub read_only: bool,
354}
355
356#[derive(Clone, Debug, PartialEq, Eq)]
358pub(crate) struct Ulimit {
359 pub name: String,
360 pub soft_limit: i32,
361 pub hard_limit: i32,
362}
363
364#[derive(Clone, Debug, PartialEq, Eq)]
366pub(crate) struct Device {
367 pub host_path: String,
368 pub container_path: String,
369 pub permissions: String,
370}
371
372#[derive(Clone, Debug, PartialEq, Eq)]
374pub(crate) struct Sysctl {
375 pub name: String,
376 pub value: String,
377}
378
379#[derive(Clone, Debug, PartialEq, Eq, Default)]
381pub(crate) struct LinuxParameters {
382 pub capabilities_add: Vec<String>,
383 pub capabilities_drop: Vec<String>,
384 pub devices: Vec<Device>,
385 pub init_process_enabled: bool,
386 pub shared_memory_size: Option<i32>,
387 pub sysctls: Vec<Sysctl>,
388 pub tmpfs: Vec<Tmpfs>,
389 pub privileged: bool,
390}
391
392#[derive(Clone, Debug, PartialEq, Eq)]
394pub(crate) struct Tmpfs {
395 pub container_path: String,
396 pub size: i32,
397 pub mount_options: Vec<String>,
398}
399
400#[derive(Clone, Debug)]
401struct ResolvedContainerPlan {
402 plan: ContainerPlan,
403 env: Vec<(String, String)>,
404}
405
406#[derive(Clone, Debug)]
408struct TaskExitOutcome {
409 exited_index: Option<usize>,
413 exit_code: i64,
414 stop_code: &'static str,
415}
416
417#[derive(Clone, Debug)]
420pub(crate) struct RunningContainer {
421 pub(crate) name: String,
422 pub(crate) container_id: String,
423 pub(crate) essential: bool,
424 pub(crate) exit_code: Option<i64>,
425 pub(crate) network_bindings: Vec<serde_json::Value>,
429 pub(crate) image_digest: Option<String>,
434}
435
436pub(crate) fn task_should_stop(containers: &[RunningContainer]) -> bool {
441 if containers.is_empty() {
442 return true;
443 }
444 let any_essential_exited = containers
445 .iter()
446 .any(|c| c.essential && c.exit_code.is_some());
447 if any_essential_exited {
448 return true;
449 }
450 containers.iter().all(|c| c.exit_code.is_some())
451}
452
453pub(crate) fn task_desired_stopped(
458 state: &SharedEcsState,
459 account_id: &str,
460 task_id: &str,
461) -> bool {
462 let accounts = state.read();
463 match accounts.get(account_id).and_then(|s| s.tasks.get(task_id)) {
464 Some(task) => task.desired_status == "STOPPED",
465 None => true,
466 }
467}
468
469fn build_container_plans(
470 state: &SharedEcsState,
471 account_id: &str,
472 task_id: &str,
473 _server_port: u16,
474) -> Result<Vec<ContainerPlan>, RuntimeError> {
475 let accounts = state.read();
476 let s = accounts
477 .get(account_id)
478 .ok_or_else(|| RuntimeError::ContainerStart("account missing".into()))?;
479 let task = s
480 .tasks
481 .get(task_id)
482 .ok_or_else(|| RuntimeError::ContainerStart("task missing".into()))?;
483 if task.containers.is_empty() {
484 return Err(RuntimeError::ContainerStart(
485 "task has no containers".into(),
486 ));
487 }
488 let has_task_role = task.task_role_arn.is_some();
489 let task_def = s
490 .task_definitions
491 .get(&task.family)
492 .and_then(|revs| revs.get(&task.revision));
493 let network_mode = task_def.and_then(|td| td.network_mode.clone());
494 let volumes_by_name: std::collections::HashMap<String, &serde_json::Value> = task_def
499 .map(|td| {
500 td.volumes
501 .iter()
502 .filter_map(|v| {
503 let name = v.get("name").and_then(|n| n.as_str())?;
504 Some((name.to_string(), v))
505 })
506 .collect()
507 })
508 .unwrap_or_default();
509 let mut plans = Vec::with_capacity(task.containers.len());
510 for container in &task.containers {
511 let def = find_container_definition(s, &task.family, task.revision, &container.name);
512 let secrets_refs = def
513 .as_ref()
514 .and_then(|d| d.get("secrets").and_then(|v| v.as_array()).cloned())
515 .map(|arr| {
516 arr.iter()
517 .filter_map(|e| {
518 let name = e.get("name").and_then(|v| v.as_str())?.to_string();
519 let value_from = e.get("valueFrom").and_then(|v| v.as_str())?.to_string();
520 Some((name, value_from))
521 })
522 .collect::<Vec<_>>()
523 })
524 .unwrap_or_default();
525 let str_array = |key: &str| -> Vec<String> {
526 def.as_ref()
527 .and_then(|d| d.get(key).and_then(|v| v.as_array()).cloned())
528 .map(|arr| {
529 arr.iter()
530 .filter_map(|v| v.as_str().map(String::from))
531 .collect::<Vec<_>>()
532 })
533 .unwrap_or_default()
534 };
535 let env = def
536 .as_ref()
537 .and_then(|d| d.get("environment").and_then(|v| v.as_array()).cloned())
538 .map(|arr| {
539 arr.iter()
540 .filter_map(|e| {
541 let k = e.get("name").and_then(|v| v.as_str())?;
542 let v = e.get("value").and_then(|v| v.as_str()).unwrap_or("");
543 Some((k.to_string(), v.to_string()))
544 })
545 .collect::<Vec<_>>()
546 })
547 .unwrap_or_default();
548 let port_mappings = def
549 .as_ref()
550 .and_then(|d| d.get("portMappings").and_then(|v| v.as_array()).cloned())
551 .map(|arr| {
552 arr.iter()
553 .filter_map(parse_port_mapping)
554 .collect::<Vec<_>>()
555 })
556 .unwrap_or_default();
557 let depends_on = def
558 .as_ref()
559 .and_then(|d| d.get("dependsOn").and_then(|v| v.as_array()).cloned())
560 .map(|arr| {
561 arr.iter()
562 .filter_map(parse_depends_on_entry)
563 .collect::<Vec<_>>()
564 })
565 .unwrap_or_default();
566 let health_check = def
567 .as_ref()
568 .and_then(|d| d.get("healthCheck"))
569 .and_then(parse_health_check);
570 let volume_mounts = def
571 .as_ref()
572 .and_then(|d| d.get("mountPoints").and_then(|v| v.as_array()).cloned())
573 .map(|arr| {
574 arr.iter()
575 .filter_map(|mp| resolve_mount_point(mp, &volumes_by_name))
576 .collect::<Vec<_>>()
577 })
578 .unwrap_or_default();
579 let ulimits = def
580 .as_ref()
581 .and_then(|d| d.get("ulimits").and_then(|v| v.as_array()).cloned())
582 .map(|arr| arr.iter().filter_map(parse_ulimit).collect::<Vec<_>>())
583 .unwrap_or_default();
584 let linux_parameters = def
585 .as_ref()
586 .and_then(|d| d.get("linuxParameters"))
587 .and_then(parse_linux_parameters);
588 let stop_timeout = def.as_ref().and_then(|d| {
589 d.get("stopTimeout")
590 .and_then(|v| v.as_u64())
591 .map(|n| n as u32)
592 });
593 let user = def
594 .as_ref()
595 .and_then(|d| d.get("user").and_then(|v| v.as_str()).map(String::from));
596 let working_directory = def.as_ref().and_then(|d| {
597 d.get("workingDirectory")
598 .and_then(|v| v.as_str())
599 .map(String::from)
600 });
601 let tty = def
602 .as_ref()
603 .and_then(|d| d.get("tty").and_then(|v| v.as_bool()))
604 .unwrap_or(false);
605 let interactive = def
606 .as_ref()
607 .and_then(|d| d.get("interactive").and_then(|v| v.as_bool()))
608 .unwrap_or(false);
609 let readonly_rootfs = def
610 .as_ref()
611 .and_then(|d| d.get("readonlyRootFilesystem").and_then(|v| v.as_bool()))
612 .unwrap_or(false);
613 plans.push(ContainerPlan {
614 container_name: container.name.clone(),
615 image: container.image.clone(),
616 env,
617 entry_point: str_array("entryPoint"),
618 command: str_array("command"),
619 secrets_refs,
620 essential: container.essential,
621 has_task_role,
622 port_mappings,
623 network_mode: network_mode.clone(),
624 depends_on,
625 health_check,
626 volume_mounts,
627 ulimits,
628 linux_parameters,
629 stop_timeout,
630 user,
631 working_directory,
632 tty,
633 interactive,
634 readonly_rootfs,
635 });
636 }
637 let plans = topo_sort_plans(plans);
638 Ok(plans)
639}
640
641fn resolve_mount_point(
649 mount_point: &serde_json::Value,
650 volumes_by_name: &std::collections::HashMap<String, &serde_json::Value>,
651) -> Option<VolumeMount> {
652 let container_path = mount_point
653 .get("containerPath")
654 .and_then(|v| v.as_str())?
655 .to_string();
656 let source_volume = mount_point.get("sourceVolume").and_then(|v| v.as_str())?;
657 let read_only = mount_point
658 .get("readOnly")
659 .and_then(|v| v.as_bool())
660 .unwrap_or(false);
661 let volume = volumes_by_name.get(source_volume)?;
662 let source = resolve_volume_source(source_volume, volume)?;
663 Some(VolumeMount {
664 source,
665 container_path,
666 read_only,
667 })
668}
669
670fn resolve_volume_source(name: &str, volume: &serde_json::Value) -> Option<String> {
689 if let Some(host) = volume.get("host") {
690 if let Some(path) = host.get("sourcePath").and_then(|v| v.as_str()) {
691 if !path.is_empty() {
694 ensure_dir_exists(path);
695 return Some(path.to_string());
696 }
697 }
698 }
699 if let Some(efs) = volume.get("efsVolumeConfiguration") {
700 let fs_id = efs.get("fileSystemId").and_then(|v| v.as_str())?;
701 let root = efs
702 .get("rootDirectory")
703 .and_then(|v| v.as_str())
704 .unwrap_or("/");
705 return Some(shared_volume_name("efs", fs_id, root));
706 }
707 if let Some(fsx) = volume.get("fsxWindowsFileServerVolumeConfiguration") {
708 let fs_id = fsx.get("fileSystemId").and_then(|v| v.as_str())?;
709 let root = fsx
710 .get("rootDirectory")
711 .and_then(|v| v.as_str())
712 .unwrap_or("/");
713 return Some(shared_volume_name("fsx", fs_id, root));
714 }
715 if volume.get("dockerVolumeConfiguration").is_some() {
716 return Some(name.to_string());
719 }
720 Some(name.to_string())
722}
723
724fn shared_volume_name(kind: &str, fs_id: &str, root: &str) -> String {
735 let trimmed = root.trim_start_matches('/').trim_end_matches('/');
736 let fs_id = sanitize_volume_segment(fs_id);
737 if trimmed.is_empty() {
738 format!("fakecloud-{kind}-{fs_id}")
739 } else {
740 format!(
741 "fakecloud-{kind}-{fs_id}-{}",
742 sanitize_volume_segment(trimmed)
743 )
744 }
745}
746
747fn sanitize_volume_segment(s: &str) -> String {
750 s.chars()
751 .map(|c| {
752 if c.is_ascii_alphanumeric() || matches!(c, '_' | '.' | '-') {
753 c
754 } else {
755 '-'
756 }
757 })
758 .collect()
759}
760
761fn ensure_dir_exists(path: &str) {
766 let _ = std::fs::create_dir_all(path);
767}
768
769fn parse_depends_on_entry(value: &serde_json::Value) -> Option<DependsOn> {
774 let container_name = value
775 .get("containerName")
776 .and_then(|v| v.as_str())?
777 .to_string();
778 let raw_condition = value.get("condition").and_then(|v| v.as_str())?;
779 let condition = DependsOnCondition::parse(raw_condition)?;
780 Some(DependsOn {
781 container_name,
782 condition,
783 })
784}
785
786fn topo_sort_plans(plans: Vec<ContainerPlan>) -> Vec<ContainerPlan> {
797 use std::collections::{HashMap, HashSet};
798 let names: HashSet<String> = plans.iter().map(|p| p.container_name.clone()).collect();
799 let index: HashMap<String, usize> = plans
800 .iter()
801 .enumerate()
802 .map(|(i, p)| (p.container_name.clone(), i))
803 .collect();
804 let mut in_degree: Vec<usize> = plans
809 .iter()
810 .map(|p| {
811 p.depends_on
812 .iter()
813 .filter(|d| names.contains(&d.container_name))
814 .count()
815 })
816 .collect();
817 let mut dependants: Vec<Vec<usize>> = vec![Vec::new(); plans.len()];
819 for (i, p) in plans.iter().enumerate() {
820 for d in &p.depends_on {
821 if let Some(&di) = index.get(&d.container_name) {
822 dependants[di].push(i);
823 }
824 }
825 }
826 let mut ordered: Vec<ContainerPlan> = Vec::with_capacity(plans.len());
827 let mut emitted: Vec<bool> = vec![false; plans.len()];
828 loop {
829 let next = (0..plans.len()).find(|&i| !emitted[i] && in_degree[i] == 0);
832 match next {
833 Some(i) => {
834 emitted[i] = true;
835 ordered.push(plans[i].clone());
836 for &di in &dependants[i] {
837 if in_degree[di] > 0 {
838 in_degree[di] -= 1;
839 }
840 }
841 }
842 None => break,
843 }
844 }
845 for (i, p) in plans.into_iter().enumerate() {
847 if !emitted[i] {
848 ordered.push(p);
849 }
850 }
851 ordered
852}
853
854pub(crate) fn find_depends_on_cycle(
863 container_definitions: &[serde_json::Value],
864) -> Option<(String, String)> {
865 use std::collections::HashMap;
866
867 let names: Vec<String> = container_definitions
868 .iter()
869 .filter_map(|c| c.get("name").and_then(|n| n.as_str()).map(String::from))
870 .collect();
871 let index: HashMap<&str, usize> = names
872 .iter()
873 .enumerate()
874 .map(|(i, n)| (n.as_str(), i))
875 .collect();
876
877 let mut adj: Vec<Vec<usize>> = vec![Vec::new(); names.len()];
878 for (i, cd) in container_definitions.iter().enumerate() {
879 if i >= names.len() {
880 continue;
881 }
882 let Some(deps) = cd.get("dependsOn").and_then(|v| v.as_array()) else {
883 continue;
884 };
885 for d in deps {
886 let Some(target) = d.get("containerName").and_then(|v| v.as_str()) else {
887 continue;
888 };
889 if let Some(&j) = index.get(target) {
890 adj[i].push(j);
892 }
893 }
894 }
895
896 let mut state = vec![0u8; names.len()];
900 let mut stack: Vec<(usize, usize)> = Vec::new();
901 for start in 0..names.len() {
902 if state[start] != 0 {
903 continue;
904 }
905 stack.clear();
906 stack.push((start, 0));
907 state[start] = 1;
908 while let Some(&(node, next_edge)) = stack.last() {
909 if next_edge < adj[node].len() {
910 let nb = adj[node][next_edge];
911 stack.last_mut().unwrap().1 += 1;
912 match state[nb] {
913 0 => {
914 state[nb] = 1;
915 stack.push((nb, 0));
916 }
917 1 => {
918 return Some((names[node].clone(), names[nb].clone()));
919 }
920 _ => {}
921 }
922 } else {
923 state[node] = 2;
924 stack.pop();
925 }
926 }
927 }
928 None
929}
930
931#[derive(Debug, Clone)]
935struct InspectedState {
936 started: bool,
937 exited: bool,
938 exit_code: i64,
939 health: Option<String>,
940}
941
942async fn inspect_container_state(cli: &str, container_id: &str) -> Option<InspectedState> {
946 let format =
949 "{{.State.Status}}|{{.State.Running}}|{{.State.ExitCode}}|{{if .State.Health}}{{.State.Health.Status}}{{else}}<none>{{end}}";
950 let out = Command::new(cli)
951 .args(["inspect", "-f", format, container_id])
952 .output()
953 .await
954 .ok()?;
955 if !out.status.success() {
956 return None;
957 }
958 let raw = String::from_utf8_lossy(&out.stdout).trim().to_string();
959 let parts: Vec<&str> = raw.split('|').collect();
960 if parts.len() < 4 {
961 return None;
962 }
963 let status = parts[0];
964 let running = parts[1] == "true";
965 let exit_code: i64 = parts[2].parse().unwrap_or(-1);
966 let health = match parts[3] {
967 "<none>" | "" => None,
968 other => Some(other.to_string()),
969 };
970 let started = running || status == "exited" || status == "running" || status == "dead";
974 let exited = status == "exited" || status == "dead";
975 Some(InspectedState {
976 started,
977 exited,
978 exit_code,
979 health,
980 })
981}
982
983fn condition_is_met(condition: DependsOnCondition, state: &InspectedState) -> bool {
987 match condition {
988 DependsOnCondition::Start => state.started,
989 DependsOnCondition::Complete => state.exited,
990 DependsOnCondition::Success => state.exited && state.exit_code == 0,
991 DependsOnCondition::Healthy => state.health.as_deref() == Some("healthy"),
992 }
993}
994
995#[cfg(test)]
999pub(crate) fn __test_parse_port_mapping(value: &serde_json::Value) -> Option<PortMapping> {
1000 parse_port_mapping(value)
1001}
1002
1003fn parse_health_check(value: &serde_json::Value) -> Option<HealthCheckSpec> {
1009 let cmd_arr = value.get("command")?.as_array()?;
1010 let command: Vec<String> = cmd_arr
1011 .iter()
1012 .filter_map(|v| v.as_str().map(String::from))
1013 .collect();
1014 if command.is_empty() {
1015 return None;
1016 }
1017 if command.first().map(|s| s.as_str()) == Some("NONE") {
1018 return None;
1019 }
1020 let read_u32 = |key: &str, default: u32| -> u32 {
1021 value
1022 .get(key)
1023 .and_then(|v| v.as_i64())
1024 .filter(|n| (0..=u32::MAX as i64).contains(n))
1025 .map(|n| n as u32)
1026 .unwrap_or(default)
1027 };
1028 Some(HealthCheckSpec {
1029 command,
1030 interval_seconds: read_u32("interval", 30),
1031 timeout_seconds: read_u32("timeout", 5),
1032 retries: read_u32("retries", 3),
1033 start_period_seconds: read_u32("startPeriod", 0),
1034 })
1035}
1036
1037fn parse_ulimit(value: &serde_json::Value) -> Option<Ulimit> {
1039 let name = value.get("name").and_then(|v| v.as_str())?;
1040 let soft = value
1041 .get("softLimit")
1042 .and_then(|v| v.as_i64())
1043 .filter(|n| *n >= 0)? as i32;
1044 let hard = value
1045 .get("hardLimit")
1046 .and_then(|v| v.as_i64())
1047 .filter(|n| *n >= 0)? as i32;
1048 Some(Ulimit {
1049 name: name.to_string(),
1050 soft_limit: soft,
1051 hard_limit: hard,
1052 })
1053}
1054
1055fn parse_linux_parameters(value: &serde_json::Value) -> Option<LinuxParameters> {
1057 let mut lp = LinuxParameters::default();
1058 if let Some(arr) = value
1059 .get("capabilities")
1060 .and_then(|v| v.get("add"))
1061 .and_then(|v| v.as_array())
1062 {
1063 lp.capabilities_add = arr
1064 .iter()
1065 .filter_map(|v| v.as_str().map(String::from))
1066 .collect();
1067 }
1068 if let Some(arr) = value
1069 .get("capabilities")
1070 .and_then(|v| v.get("drop"))
1071 .and_then(|v| v.as_array())
1072 {
1073 lp.capabilities_drop = arr
1074 .iter()
1075 .filter_map(|v| v.as_str().map(String::from))
1076 .collect();
1077 }
1078 if let Some(arr) = value.get("devices").and_then(|v| v.as_array()) {
1079 lp.devices = arr.iter().filter_map(parse_device).collect();
1080 }
1081 lp.init_process_enabled = value
1082 .get("initProcessEnabled")
1083 .and_then(|v| v.as_bool())
1084 .unwrap_or(false);
1085 lp.shared_memory_size = value
1086 .get("sharedMemorySize")
1087 .and_then(|v| v.as_i64())
1088 .map(|n| n as i32);
1089 if let Some(arr) = value.get("sysctl").and_then(|v| v.as_array()) {
1090 lp.sysctls = arr.iter().filter_map(parse_sysctl).collect();
1091 }
1092 if let Some(arr) = value.get("tmpfs").and_then(|v| v.as_array()) {
1093 lp.tmpfs = arr.iter().filter_map(parse_tmpfs).collect();
1094 }
1095 lp.privileged = value
1096 .get("privileged")
1097 .and_then(|v| v.as_bool())
1098 .unwrap_or(false);
1099 Some(lp)
1100}
1101
1102fn parse_device(value: &serde_json::Value) -> Option<Device> {
1103 let host_path = value.get("hostPath").and_then(|v| v.as_str())?.to_string();
1104 let container_path = value
1105 .get("containerPath")
1106 .and_then(|v| v.as_str())?
1107 .to_string();
1108 let permissions = value
1109 .get("permissions")
1110 .and_then(|v| v.as_str())
1111 .unwrap_or("rwm")
1112 .to_string();
1113 Some(Device {
1114 host_path,
1115 container_path,
1116 permissions,
1117 })
1118}
1119
1120fn parse_sysctl(value: &serde_json::Value) -> Option<Sysctl> {
1121 let name = value.get("name").and_then(|v| v.as_str())?.to_string();
1122 let value_str = value.get("value").and_then(|v| v.as_str())?.to_string();
1123 Some(Sysctl {
1124 name,
1125 value: value_str,
1126 })
1127}
1128
1129fn parse_tmpfs(value: &serde_json::Value) -> Option<Tmpfs> {
1130 let container_path = value
1131 .get("containerPath")
1132 .and_then(|v| v.as_str())?
1133 .to_string();
1134 let size = value
1135 .get("size")
1136 .and_then(|v| v.as_i64())
1137 .filter(|n| *n > 0)? as i32;
1138 let mount_options = value
1139 .get("mountOptions")
1140 .and_then(|v| v.as_array())
1141 .map(|arr| {
1142 arr.iter()
1143 .filter_map(|v| v.as_str().map(String::from))
1144 .collect()
1145 })
1146 .unwrap_or_default();
1147 Some(Tmpfs {
1148 container_path,
1149 size,
1150 mount_options,
1151 })
1152}
1153
1154pub(crate) fn render_health_flags(hc: &HealthCheckSpec) -> Vec<String> {
1161 if hc.command.len() < 2 {
1162 return Vec::new();
1163 }
1164 let cmd_kind = hc.command[0].as_str();
1165 if cmd_kind != "CMD" && cmd_kind != "CMD-SHELL" {
1166 return Vec::new();
1167 }
1168 let cmd_string = hc.command[1..].join(" ");
1169 vec![
1170 "--health-cmd".into(),
1171 cmd_string,
1172 format!("--health-interval={}s", hc.interval_seconds),
1173 format!("--health-timeout={}s", hc.timeout_seconds),
1174 format!("--health-retries={}", hc.retries),
1175 format!("--health-start-period={}s", hc.start_period_seconds),
1176 ]
1177}
1178
1179#[cfg(test)]
1183pub(crate) fn __test_parse_health_check(value: &serde_json::Value) -> Option<HealthCheckSpec> {
1184 parse_health_check(value)
1185}
1186
1187pub(crate) fn docker_health_to_ecs(raw: &str) -> &'static str {
1193 match raw.trim().to_ascii_lowercase().as_str() {
1194 "healthy" => "HEALTHY",
1195 "unhealthy" => "UNHEALTHY",
1196 _ => "UNKNOWN",
1197 }
1198}
1199
1200fn parse_port_mapping(value: &serde_json::Value) -> Option<PortMapping> {
1204 let container_port = value
1205 .get("containerPort")
1206 .and_then(|v| v.as_i64())
1207 .filter(|n| (0..=u16::MAX as i64).contains(n))? as u16;
1208 let host_port_raw = value
1209 .get("hostPort")
1210 .and_then(|v| v.as_i64())
1211 .filter(|n| (0..=u16::MAX as i64).contains(n))
1212 .map(|n| n as u16)
1213 .unwrap_or(0);
1214 let host_port = if host_port_raw == 0 {
1215 container_port
1216 } else {
1217 host_port_raw
1218 };
1219 let protocol = value
1220 .get("protocol")
1221 .and_then(|v| v.as_str())
1222 .map(|s| s.to_ascii_lowercase())
1223 .unwrap_or_else(|| "tcp".to_string());
1224 Some(PortMapping {
1225 container_port,
1226 host_port,
1227 protocol,
1228 })
1229}
1230
1231pub(crate) fn fakecloud_instance_label() -> String {
1237 format!("fakecloud-instance=fakecloud-{}", std::process::id())
1238}
1239
1240pub(crate) fn build_run_argv(
1246 plan: &ContainerPlan,
1247 env: &[(String, String)],
1248 task_id: &str,
1249 host_alias: &str,
1250 add_host_arg: Option<&str>,
1251 run_image: &str,
1252 awsvpc_network_ready: bool,
1253) -> Vec<String> {
1254 let mut argv: Vec<String> = Vec::new();
1255 argv.push("run".into());
1256 argv.push("-d".into());
1257 argv.push("--name".into());
1258 argv.push(format!("{}-{}", task_id, plan.container_name));
1259 argv.push("--label".into());
1260 argv.push(format!("fakecloud-ecs-task={}", task_id));
1261 argv.push("--label".into());
1262 argv.push(format!("fakecloud-ecs-container={}", plan.container_name));
1263 argv.push("--label".into());
1270 argv.push(fakecloud_instance_label());
1271 if let Some(arg) = add_host_arg {
1275 argv.push("--add-host".into());
1276 argv.push(arg.to_string());
1277 }
1278 let use_awsvpc_network = plan.network_mode.as_deref() == Some("awsvpc") && awsvpc_network_ready;
1279 if use_awsvpc_network {
1280 argv.push("--network".into());
1281 argv.push(format!("fakecloud-ecs-{}", task_id));
1282 }
1283 let publish_ports = !use_awsvpc_network;
1289 if publish_ports {
1290 for pm in &plan.port_mappings {
1291 argv.push("--publish".into());
1292 argv.push(format!(
1293 "{}:{}/{}",
1294 pm.container_port, pm.host_port, pm.protocol
1295 ));
1296 }
1297 }
1298 if let Some(ref hc) = plan.health_check {
1299 argv.extend(render_health_flags(hc));
1300 }
1301 let http_alias_prefix = format!("http://{host_alias}:");
1302 let https_alias_prefix = format!("https://{host_alias}:");
1303 for (k, v) in env {
1304 let transformed = v
1305 .replace("http://127.0.0.1:", http_alias_prefix.as_str())
1306 .replace("https://127.0.0.1:", https_alias_prefix.as_str())
1307 .replace("http://localhost:", http_alias_prefix.as_str())
1308 .replace("https://localhost:", https_alias_prefix.as_str());
1309 argv.push("-e".into());
1310 argv.push(format!("{}={}", k, transformed));
1311 }
1312 for vm in &plan.volume_mounts {
1317 argv.push("-v".into());
1318 let suffix = if vm.read_only { ":ro" } else { "" };
1319 argv.push(format!("{}:{}{}", vm.source, vm.container_path, suffix));
1320 }
1321 for ul in &plan.ulimits {
1322 argv.push("--ulimit".into());
1323 argv.push(format!("{}={}:{}", ul.name, ul.soft_limit, ul.hard_limit));
1324 }
1325 if let Some(ref lp) = plan.linux_parameters {
1326 for cap in &lp.capabilities_add {
1327 argv.push("--cap-add".into());
1328 argv.push(cap.clone());
1329 }
1330 for cap in &lp.capabilities_drop {
1331 argv.push("--cap-drop".into());
1332 argv.push(cap.clone());
1333 }
1334 for dev in &lp.devices {
1335 argv.push("--device".into());
1336 argv.push(format!(
1337 "{}:{}{}",
1338 dev.host_path, dev.container_path, dev.permissions
1339 ));
1340 }
1341 if lp.init_process_enabled {
1342 argv.push("--init".into());
1343 }
1344 if let Some(size) = lp.shared_memory_size {
1345 argv.push("--shm-size".into());
1346 argv.push(format!("{}m", size));
1347 }
1348 for sys in &lp.sysctls {
1349 argv.push("--sysctl".into());
1350 argv.push(format!("{}={}", sys.name, sys.value));
1351 }
1352 for tmp in &lp.tmpfs {
1353 let mut opts = tmp.mount_options.join(",");
1354 if !opts.is_empty() {
1355 opts = format!(",{}", opts);
1356 }
1357 argv.push("--tmpfs".into());
1358 argv.push(format!("{}:size={}M{}", tmp.container_path, tmp.size, opts));
1359 }
1360 if lp.privileged {
1361 argv.push("--privileged".into());
1362 }
1363 }
1364 if let Some(timeout) = plan.stop_timeout {
1365 argv.push("--stop-timeout".into());
1366 argv.push(format!("{}", timeout));
1367 }
1368 if let Some(ref user) = plan.user {
1369 argv.push("--user".into());
1370 argv.push(user.clone());
1371 }
1372 if let Some(ref wd) = plan.working_directory {
1373 argv.push("--workdir".into());
1374 argv.push(wd.clone());
1375 }
1376 if plan.tty {
1377 argv.push("--tty".into());
1378 }
1379 if plan.interactive {
1380 argv.push("--interactive".into());
1381 }
1382 if plan.readonly_rootfs {
1383 argv.push("--read-only".into());
1384 }
1385 if let Some(first) = plan.entry_point.first() {
1386 argv.push("--entrypoint".into());
1387 argv.push(first.clone());
1388 }
1389 argv.push(run_image.to_string());
1390 for arg in plan.entry_point.iter().skip(1) {
1391 argv.push(arg.clone());
1392 }
1393 for arg in &plan.command {
1394 argv.push(arg.clone());
1395 }
1396 argv
1397}
1398
1399pub(crate) fn network_bindings_for(plan: &ContainerPlan) -> Vec<serde_json::Value> {
1403 if plan.network_mode.as_deref() == Some("awsvpc") {
1404 return Vec::new();
1405 }
1406 plan.port_mappings
1407 .iter()
1408 .map(|pm| {
1409 serde_json::json!({
1410 "bindIP": "0.0.0.0",
1411 "containerPort": pm.container_port,
1412 "hostPort": pm.host_port,
1413 "protocol": pm.protocol,
1414 })
1415 })
1416 .collect()
1417}
1418
1419#[allow(clippy::type_complexity)]
1423pub(crate) fn compute_elbv2_targets(
1424 ecs_state: &crate::state::EcsState,
1425 task: &crate::state::Task,
1426) -> Vec<(String, Vec<(String, Option<i64>)>)> {
1427 let mut result = Vec::new();
1428 let Some(group) = task.group.as_deref() else {
1429 return result;
1430 };
1431 let service_name = group.strip_prefix("service:").unwrap_or(group);
1432 let key = crate::state::EcsState::service_key(&task.cluster_name, service_name);
1433 let Some(service) = ecs_state.services.get(&key) else {
1434 return result;
1435 };
1436
1437 let network_mode = ecs_state
1438 .task_definitions
1439 .get(&task.family)
1440 .and_then(|revs| revs.get(&task.revision))
1441 .and_then(|td| td.network_mode.as_deref());
1442
1443 for lb in &service.load_balancers {
1444 let tg_arn = lb.get("targetGroupArn").and_then(|v| v.as_str());
1445 let container_name = lb.get("containerName").and_then(|v| v.as_str());
1446 let container_port = lb.get("containerPort").and_then(|v| v.as_i64());
1447 let Some(tg_arn) = tg_arn else { continue };
1448 let Some(container_name) = container_name else {
1449 continue;
1450 };
1451
1452 let target_id = if network_mode == Some("awsvpc") {
1453 task.attachments
1454 .iter()
1455 .find(|a| a.attachment_type == "eni")
1456 .and_then(|eni| {
1457 eni.details
1458 .iter()
1459 .find(|d| d.name == "privateIPv4Address")
1460 .map(|d| d.value.clone())
1461 })
1462 } else {
1463 Some("127.0.0.1".to_string())
1464 };
1465
1466 let port = if network_mode == Some("awsvpc") {
1467 container_port
1468 } else {
1469 task.containers
1470 .iter()
1471 .find(|c| c.name == container_name)
1472 .and_then(|c| {
1473 c.network_bindings
1474 .iter()
1475 .find(|nb| {
1476 nb.get("containerPort").and_then(|v| v.as_i64()) == container_port
1477 })
1478 .and_then(|nb| nb.get("hostPort").and_then(|v| v.as_i64()))
1479 })
1480 };
1481
1482 if let Some(id) = target_id {
1483 if let Some(entry) = result.iter_mut().find(|(arn, _)| arn == tg_arn) {
1484 entry.1.push((id, port));
1485 } else {
1486 result.push((tg_arn.to_string(), vec![(id, port)]));
1487 }
1488 }
1489 }
1490 result
1491}
1492
1493struct TaskSnapshot {
1494 task_arn: String,
1495 cluster_arn: String,
1496 launch_type: String,
1497 group: Option<String>,
1498 task_definition_arn: String,
1499 containers: serde_json::Value,
1500}
1501
1502fn snapshot_task(state: &SharedEcsState, account_id: &str, task_id: &str) -> Option<TaskSnapshot> {
1503 let accounts = state.read();
1504 let s = accounts.get(account_id)?;
1505 let task = s.tasks.get(task_id)?;
1506 Some(TaskSnapshot {
1507 task_arn: task.task_arn.clone(),
1508 cluster_arn: task.cluster_arn.clone(),
1509 launch_type: task.launch_type.clone(),
1510 group: task.group.clone(),
1511 task_definition_arn: task.task_definition_arn.clone(),
1512 containers: serde_json::Value::Array(
1513 task.containers
1514 .iter()
1515 .map(|c| {
1516 serde_json::json!({
1517 "containerArn": c.container_arn,
1518 "name": c.name,
1519 "image": c.image,
1520 "lastStatus": c.last_status,
1521 "exitCode": c.exit_code,
1522 "reason": c.reason,
1523 })
1524 })
1525 .collect(),
1526 ),
1527 })
1528}
1529
1530fn build_local_registry_docker_config(server_port: u16) -> Option<TempDir> {
1538 let dir = TempDir::new().ok()?;
1539 let auth = base64::engine::general_purpose::STANDARD.encode("AWS:fakecloud-ecs-runtime");
1540 let config = serde_json::json!({
1541 "auths": {
1542 format!("127.0.0.1:{server_port}"): { "auth": auth },
1543 format!("host.docker.internal:{server_port}"): { "auth": auth },
1544 }
1545 });
1546 std::fs::write(dir.path().join("config.json"), config.to_string()).ok()?;
1547 Some(dir)
1548}
1549
1550fn find_container_definition(
1551 state: &crate::state::EcsState,
1552 family: &str,
1553 revision: i32,
1554 name: &str,
1555) -> Option<serde_json::Value> {
1556 state
1557 .task_definitions
1558 .get(family)?
1559 .get(&revision)?
1560 .container_definitions
1561 .iter()
1562 .find(|c| c.get("name").and_then(|v| v.as_str()) == Some(name))
1563 .cloned()
1564}
1565
1566fn mark_pull_started(state: &SharedEcsState, account_id: &str, task_id: &str) {
1567 let mut accounts = state.write();
1568 let Some(s) = accounts.get_mut(account_id) else {
1569 return;
1570 };
1571 let task_arn_cluster = s
1572 .tasks
1573 .get(task_id)
1574 .map(|t| (t.task_arn.clone(), t.cluster_arn.clone()));
1575 if let Some(task) = s.tasks.get_mut(task_id) {
1576 task.pull_started_at = Some(Utc::now());
1577 }
1578 if let Some((arn, cluster_arn)) = task_arn_cluster {
1579 s.push_event(LifecycleEvent {
1580 at: Utc::now(),
1581 event_type: "PullStarted".into(),
1582 task_arn: Some(arn),
1583 cluster_arn: Some(cluster_arn),
1584 last_status: Some("PENDING".into()),
1585 detail: serde_json::json!({}),
1586 });
1587 }
1588}
1589
1590fn mark_pull_stopped(state: &SharedEcsState, account_id: &str, task_id: &str) {
1591 let mut accounts = state.write();
1592 let Some(s) = accounts.get_mut(account_id) else {
1593 return;
1594 };
1595 if let Some(task) = s.tasks.get_mut(task_id) {
1596 task.pull_stopped_at = Some(Utc::now());
1597 }
1598}
1599
1600pub(crate) fn mark_running_multi(
1601 state: &SharedEcsState,
1602 account_id: &str,
1603 task_id: &str,
1604 started: &[RunningContainer],
1605) {
1606 let mut accounts = state.write();
1607 let Some(s) = accounts.get_mut(account_id) else {
1608 return;
1609 };
1610 let (arn, cluster_arn) = {
1611 let Some(task) = s.tasks.get_mut(task_id) else {
1612 return;
1613 };
1614 task.last_status = "RUNNING".into();
1615 task.connectivity = "CONNECTED".into();
1616 task.connectivity_at = Some(Utc::now());
1617 task.started_at = Some(Utc::now());
1618 for rc in started {
1619 if let Some(c) = task.containers.iter_mut().find(|c| c.name == rc.name) {
1620 c.runtime_id = Some(rc.container_id.clone());
1621 c.last_status = "RUNNING".into();
1622 c.network_bindings = rc.network_bindings.clone();
1623 if rc.image_digest.is_some() {
1624 c.image_digest = rc.image_digest.clone();
1625 }
1626 }
1627 }
1628 if let Some(cluster) = s.clusters.get_mut(&task.cluster_name) {
1629 cluster.running_tasks_count += 1;
1630 if cluster.pending_tasks_count > 0 {
1631 cluster.pending_tasks_count -= 1;
1632 }
1633 }
1634 if let Some(ref ci_arn) = task.container_instance_arn {
1635 if let Some(ci) = s
1636 .container_instances
1637 .values_mut()
1638 .find(|ci| ci.container_instance_arn == *ci_arn)
1639 {
1640 ci.running_tasks_count += 1;
1641 if ci.pending_tasks_count > 0 {
1642 ci.pending_tasks_count -= 1;
1643 }
1644 }
1645 }
1646 (task.task_arn.clone(), task.cluster_arn.clone())
1647 };
1648 s.push_event(LifecycleEvent {
1649 at: Utc::now(),
1650 event_type: "TaskStateChange".into(),
1651 task_arn: Some(arn),
1652 cluster_arn: Some(cluster_arn),
1653 last_status: Some("RUNNING".into()),
1654 detail: serde_json::json!({}),
1655 });
1656}
1657
1658#[allow(clippy::too_many_arguments)]
1659fn finalize_stopped_multi(
1660 state: &SharedEcsState,
1661 account_id: &str,
1662 task_id: &str,
1663 final_containers: &[RunningContainer],
1664 primary_exit_code: i64,
1665 captured: &str,
1666 stop_code: &str,
1667 stopped_reason: Option<String>,
1668) {
1669 let mut accounts = state.write();
1670 let Some(s) = accounts.get_mut(account_id) else {
1671 return;
1672 };
1673 let (arn, cluster_arn) = {
1674 let Some(task) = s.tasks.get_mut(task_id) else {
1675 return;
1676 };
1677 task.last_status = "STOPPED".into();
1678 task.desired_status = "STOPPED".into();
1679 task.stopping_at = task.stopping_at.or(Some(Utc::now()));
1680 task.stopped_at = Some(Utc::now());
1681 task.stop_code = Some(stop_code.into());
1682 task.stopped_reason = stopped_reason.or(Some(format!("Exit code {}", primary_exit_code)));
1683 task.captured_logs = captured.to_string();
1684 for c in task.containers.iter_mut() {
1685 c.last_status = "STOPPED".into();
1686 if c.exit_code.is_none() {
1687 let mapped = final_containers
1688 .iter()
1689 .find(|r| r.name == c.name)
1690 .and_then(|r| r.exit_code);
1691 c.exit_code = mapped.or(Some(primary_exit_code));
1692 }
1693 }
1694 if let Some(cluster) = s.clusters.get_mut(&task.cluster_name) {
1695 if cluster.running_tasks_count > 0 {
1696 cluster.running_tasks_count -= 1;
1697 }
1698 }
1699 if let Some(ref ci_arn) = task.container_instance_arn {
1700 if let Some(ci) = s
1701 .container_instances
1702 .values_mut()
1703 .find(|ci| ci.container_instance_arn == *ci_arn)
1704 {
1705 if ci.running_tasks_count > 0 {
1706 ci.running_tasks_count -= 1;
1707 }
1708 }
1709 }
1710 (task.task_arn.clone(), task.cluster_arn.clone())
1711 };
1712 s.push_event(LifecycleEvent {
1713 at: Utc::now(),
1714 event_type: "TaskStateChange".into(),
1715 task_arn: Some(arn),
1716 cluster_arn: Some(cluster_arn),
1717 last_status: Some("STOPPED".into()),
1718 detail: serde_json::json!({
1719 "exitCode": primary_exit_code,
1720 "stopCode": stop_code,
1721 }),
1722 });
1723}
1724
1725fn finalize_failure(state: &SharedEcsState, account_id: &str, task_id: &str, reason: &str) {
1726 let mut accounts = state.write();
1727 let Some(s) = accounts.get_mut(account_id) else {
1728 return;
1729 };
1730 let (arn, cluster_arn) = {
1731 let Some(task) = s.tasks.get_mut(task_id) else {
1732 return;
1733 };
1734 let was_running = task.last_status == "RUNNING";
1740 task.last_status = "STOPPED".into();
1741 task.desired_status = "STOPPED".into();
1742 task.stopped_at = Some(Utc::now());
1743 task.stop_code = Some("TaskFailedToStart".into());
1744 task.stopped_reason = Some(reason.to_string());
1745 task.captured_logs = format!("[task failed to start]: {reason}");
1749 for c in task.containers.iter_mut() {
1750 c.last_status = "STOPPED".into();
1751 c.reason = Some(reason.to_string());
1752 }
1753 if let Some(cluster) = s.clusters.get_mut(&task.cluster_name) {
1754 if was_running {
1755 if cluster.running_tasks_count > 0 {
1756 cluster.running_tasks_count -= 1;
1757 }
1758 } else if cluster.pending_tasks_count > 0 {
1759 cluster.pending_tasks_count -= 1;
1760 }
1761 }
1762 if let Some(ref ci_arn) = task.container_instance_arn {
1763 if let Some(ci) = s
1764 .container_instances
1765 .values_mut()
1766 .find(|ci| ci.container_instance_arn == *ci_arn)
1767 {
1768 if was_running {
1769 if ci.running_tasks_count > 0 {
1770 ci.running_tasks_count -= 1;
1771 }
1772 } else if ci.pending_tasks_count > 0 {
1773 ci.pending_tasks_count -= 1;
1774 }
1775 }
1776 }
1777 (task.task_arn.clone(), task.cluster_arn.clone())
1778 };
1779 s.push_event(LifecycleEvent {
1780 at: Utc::now(),
1781 event_type: "TaskFailedToStart".into(),
1782 task_arn: Some(arn),
1783 cluster_arn: Some(cluster_arn),
1784 last_status: Some("STOPPED".into()),
1785 detail: serde_json::json!({ "reason": reason }),
1786 });
1787}
1788
1789pub async fn sleep(duration: Duration) {
1793 tokio::time::sleep(duration).await;
1794}
1795
1796#[cfg(test)]
1797mod tests {
1798 use super::*;
1799 use crate::state::{EcsState, Task};
1800 use fakecloud_aws::arn::Arn;
1801 use fakecloud_core::multi_account::MultiAccountState;
1802 use parking_lot::RwLock;
1803 use std::sync::Arc;
1804
1805 #[test]
1806 fn cli_available_for_known_missing_binary_is_false() {
1807 assert!(!fakecloud_core::container_net::cli_available(
1808 "definitely-not-a-real-cli-binary-xyz"
1809 ));
1810 }
1811
1812 #[test]
1813 fn aws_ecr_uris_translate_for_local_pull() {
1814 assert_eq!(
1815 fakecloud_core::ecr_uri::translate_to_local(
1816 "123456789012.dkr.ecr.us-east-1.amazonaws.com/app:latest",
1817 4566
1818 )
1819 .as_deref(),
1820 Some("127.0.0.1:4566/app:latest")
1821 );
1822 }
1823
1824 fn make_task(task_id: &str) -> Task {
1825 Task {
1826 task_arn: Arn::new(
1827 "ecs",
1828 "us-east-1",
1829 "000000000000",
1830 &format!("task/default/{task_id}"),
1831 )
1832 .to_string(),
1833 task_id: task_id.into(),
1834 cluster_arn: "arn:aws:ecs:us-east-1:000000000000:cluster/default".into(),
1835 cluster_name: "default".into(),
1836 task_definition_arn: "arn:aws:ecs:us-east-1:000000000000:task-definition/app:1".into(),
1837 family: "app".into(),
1838 revision: 1,
1839 container_instance_arn: None,
1840 capacity_provider_name: None,
1841 last_status: "PENDING".into(),
1842 desired_status: "RUNNING".into(),
1843 launch_type: "FARGATE".into(),
1844 platform_version: None,
1845 cpu: None,
1846 memory: None,
1847 containers: Vec::new(),
1848 overrides: serde_json::json!({}),
1849 started_by: None,
1850 group: None,
1851 connectivity: "CONNECTING".into(),
1852 stop_code: None,
1853 stopped_reason: None,
1854 created_at: Utc::now(),
1855 started_at: None,
1856 stopping_at: None,
1857 stopped_at: None,
1858 pull_started_at: None,
1859 pull_stopped_at: None,
1860 connectivity_at: None,
1861 started_by_ref_id: None,
1862 execution_role_arn: None,
1863 task_role_arn: None,
1864 tags: Vec::new(),
1865 awslogs: None,
1866 captured_logs: String::new(),
1867 protection: None,
1868 enable_execute_command: false,
1869 attachments: Vec::new(),
1870 volume_configurations: Vec::new(),
1871 task_set_arn: None,
1872 }
1873 }
1874
1875 #[test]
1876 fn finalize_failure_writes_reason_into_captured_logs() {
1877 let mut accounts: MultiAccountState<EcsState> =
1878 MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
1879 let acct = accounts.get_or_create("000000000000");
1880 acct.tasks.insert("t1".into(), make_task("t1"));
1881 let state: SharedEcsState = Arc::new(RwLock::new(accounts));
1882
1883 finalize_failure(
1884 &state,
1885 "000000000000",
1886 "t1",
1887 "failed to resolve secret DB_PASSWORD",
1888 );
1889
1890 let accounts = state.read();
1891 let task = accounts
1892 .get("000000000000")
1893 .unwrap()
1894 .tasks
1895 .get("t1")
1896 .unwrap();
1897 assert_eq!(task.last_status, "STOPPED");
1898 assert_eq!(task.stop_code.as_deref(), Some("TaskFailedToStart"));
1899 assert!(
1900 task.captured_logs
1901 .contains("failed to resolve secret DB_PASSWORD"),
1902 "captured_logs missing reason: {:?}",
1903 task.captured_logs
1904 );
1905 assert!(
1906 task.captured_logs.starts_with("[task failed to start]:"),
1907 "captured_logs missing prefix: {:?}",
1908 task.captured_logs
1909 );
1910 }
1911
1912 #[test]
1917 fn task_desired_stopped_detects_stop_during_launch() {
1918 let mut accounts: MultiAccountState<EcsState> =
1919 MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
1920 let acct = accounts.get_or_create("000000000000");
1921 acct.tasks.insert("running".into(), make_task("running"));
1922 let mut stopping = make_task("stopping");
1923 stopping.desired_status = "STOPPED".into();
1924 acct.tasks.insert("stopping".into(), stopping);
1925 let state: SharedEcsState = Arc::new(RwLock::new(accounts));
1926
1927 assert!(
1928 !task_desired_stopped(&state, "000000000000", "running"),
1929 "a RUNNING task must not be treated as stopped",
1930 );
1931 assert!(
1932 task_desired_stopped(&state, "000000000000", "stopping"),
1933 "a task whose desired_status is STOPPED must be treated as stopped",
1934 );
1935 assert!(
1936 task_desired_stopped(&state, "000000000000", "deleted-mid-launch"),
1937 "a task removed from state mid-launch must be treated as stopped",
1938 );
1939 }
1940
1941 fn make_container(name: &str, essential: bool) -> crate::state::Container {
1942 crate::state::Container {
1943 container_arn: format!(
1944 "arn:aws:ecs:us-east-1:000000000000:container/default/abc/{name}"
1945 ),
1946 name: name.into(),
1947 image: "alpine".into(),
1948 task_arn: "arn:aws:ecs:us-east-1:000000000000:task/default/abc".into(),
1949 last_status: "RUNNING".into(),
1950 exit_code: None,
1951 reason: None,
1952 runtime_id: Some(format!("dockerid-{name}")),
1953 essential,
1954 cpu: None,
1955 memory: None,
1956 memory_reservation: None,
1957 network_bindings: Vec::new(),
1958 network_interfaces: Vec::new(),
1959 health_status: None,
1960 managed_agents: None,
1961 image_digest: None,
1962 }
1963 }
1964
1965 #[test]
1966 fn task_should_stop_when_essential_exits() {
1967 let containers = vec![
1968 RunningContainer {
1969 name: "app".into(),
1970 container_id: "id-app".into(),
1971 essential: true,
1972 exit_code: Some(0),
1973 network_bindings: Vec::new(),
1974 image_digest: None,
1975 },
1976 RunningContainer {
1977 name: "sidecar".into(),
1978 container_id: "id-sc".into(),
1979 essential: false,
1980 exit_code: None,
1981 network_bindings: Vec::new(),
1982 image_digest: None,
1983 },
1984 ];
1985 assert!(task_should_stop(&containers));
1986 }
1987
1988 #[test]
1989 fn task_keeps_running_when_only_non_essential_exits() {
1990 let containers = vec![
1991 RunningContainer {
1992 name: "app".into(),
1993 container_id: "id-app".into(),
1994 essential: true,
1995 exit_code: None,
1996 network_bindings: Vec::new(),
1997 image_digest: None,
1998 },
1999 RunningContainer {
2000 name: "sidecar".into(),
2001 container_id: "id-sc".into(),
2002 essential: false,
2003 exit_code: Some(0),
2004 network_bindings: Vec::new(),
2005 image_digest: None,
2006 },
2007 ];
2008 assert!(!task_should_stop(&containers));
2009 }
2010
2011 #[test]
2012 fn task_stops_when_all_non_essentials_exit() {
2013 let containers = vec![
2014 RunningContainer {
2015 name: "a".into(),
2016 container_id: "id-a".into(),
2017 essential: false,
2018 exit_code: Some(0),
2019 network_bindings: Vec::new(),
2020 image_digest: None,
2021 },
2022 RunningContainer {
2023 name: "b".into(),
2024 container_id: "id-b".into(),
2025 essential: false,
2026 exit_code: Some(1),
2027 network_bindings: Vec::new(),
2028 image_digest: None,
2029 },
2030 ];
2031 assert!(task_should_stop(&containers));
2032 }
2033
2034 #[test]
2035 fn finalize_stopped_multi_assigns_per_container_exit_codes() {
2036 let mut accounts: MultiAccountState<EcsState> =
2037 MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
2038 let acct = accounts.get_or_create("000000000000");
2039 let mut t = make_task("t1");
2040 t.containers = vec![
2041 make_container("app", true),
2042 make_container("sidecar", false),
2043 ];
2044 acct.tasks.insert("t1".into(), t);
2045 let state: SharedEcsState = Arc::new(RwLock::new(accounts));
2046
2047 let final_containers = vec![
2048 RunningContainer {
2049 name: "app".into(),
2050 container_id: "id-app".into(),
2051 essential: true,
2052 exit_code: Some(0),
2053 network_bindings: Vec::new(),
2054 image_digest: None,
2055 },
2056 RunningContainer {
2057 name: "sidecar".into(),
2058 container_id: "id-sc".into(),
2059 essential: false,
2060 exit_code: Some(137),
2061 network_bindings: Vec::new(),
2062 image_digest: None,
2063 },
2064 ];
2065 finalize_stopped_multi(
2066 &state,
2067 "000000000000",
2068 "t1",
2069 &final_containers,
2070 0,
2071 "captured",
2072 "EssentialContainerExited",
2073 None,
2074 );
2075
2076 let accounts = state.read();
2077 let task = accounts
2078 .get("000000000000")
2079 .unwrap()
2080 .tasks
2081 .get("t1")
2082 .unwrap();
2083 assert_eq!(task.last_status, "STOPPED");
2084 assert_eq!(task.stop_code.as_deref(), Some("EssentialContainerExited"));
2085 let app = task.containers.iter().find(|c| c.name == "app").unwrap();
2086 let sc = task
2087 .containers
2088 .iter()
2089 .find(|c| c.name == "sidecar")
2090 .unwrap();
2091 assert_eq!(app.exit_code, Some(0));
2092 assert_eq!(sc.exit_code, Some(137));
2093 assert_eq!(app.last_status, "STOPPED");
2094 assert_eq!(sc.last_status, "STOPPED");
2095 }
2096
2097 fn plan(name: &str, deps: &[&str]) -> ContainerPlan {
2098 ContainerPlan {
2099 container_name: name.into(),
2100 image: "alpine".into(),
2101 env: Vec::new(),
2102 entry_point: Vec::new(),
2103 command: Vec::new(),
2104 secrets_refs: Vec::new(),
2105 essential: true,
2106 has_task_role: false,
2107 port_mappings: Vec::new(),
2108 network_mode: None,
2109 depends_on: deps
2110 .iter()
2111 .map(|s| DependsOn {
2112 container_name: (*s).to_string(),
2113 condition: DependsOnCondition::Start,
2114 })
2115 .collect(),
2116 health_check: None,
2117 volume_mounts: Vec::new(),
2118 ulimits: Vec::new(),
2119 linux_parameters: None,
2120 stop_timeout: None,
2121 user: None,
2122 working_directory: None,
2123 tty: false,
2124 interactive: false,
2125 readonly_rootfs: false,
2126 }
2127 }
2128
2129 #[test]
2130 fn topo_sort_orders_by_depends_on() {
2131 let plans = vec![plan("sidecar", &["app"]), plan("app", &[])];
2134 let ordered = topo_sort_plans(plans);
2135 assert_eq!(ordered[0].container_name, "app");
2136 assert_eq!(ordered[1].container_name, "sidecar");
2137 }
2138
2139 #[test]
2140 fn topo_sort_preserves_declaration_order_when_no_deps() {
2141 let plans = vec![plan("first", &[]), plan("second", &[]), plan("third", &[])];
2142 let ordered = topo_sort_plans(plans);
2143 let names: Vec<&str> = ordered.iter().map(|p| p.container_name.as_str()).collect();
2144 assert_eq!(names, vec!["first", "second", "third"]);
2145 }
2146
2147 #[test]
2148 fn topo_sort_handles_chain() {
2149 let plans = vec![plan("c", &["b"]), plan("b", &["a"]), plan("a", &[])];
2152 let ordered = topo_sort_plans(plans);
2153 let names: Vec<&str> = ordered.iter().map(|p| p.container_name.as_str()).collect();
2154 assert_eq!(names, vec!["a", "b", "c"]);
2155 }
2156
2157 #[test]
2158 fn topo_sort_ignores_unknown_dependency() {
2159 let plans = vec![plan("only", &["does-not-exist"])];
2163 let ordered = topo_sort_plans(plans);
2164 assert_eq!(ordered.len(), 1);
2165 assert_eq!(ordered[0].container_name, "only");
2166 }
2167
2168 #[test]
2169 fn topo_sort_recovers_from_cycle() {
2170 let plans = vec![plan("a", &["b"]), plan("b", &["a"])];
2173 let ordered = topo_sort_plans(plans);
2174 assert_eq!(ordered.len(), 2);
2175 }
2176
2177 #[test]
2178 fn parse_health_check_fills_aws_defaults() {
2179 let v = serde_json::json!({
2180 "command": ["CMD-SHELL", "curl -f http://localhost/ || exit 1"],
2181 });
2182 let hc = __test_parse_health_check(&v).expect("parsed");
2183 assert_eq!(hc.command[0], "CMD-SHELL");
2184 assert_eq!(hc.interval_seconds, 30);
2185 assert_eq!(hc.timeout_seconds, 5);
2186 assert_eq!(hc.retries, 3);
2187 assert_eq!(hc.start_period_seconds, 0);
2188 }
2189
2190 #[test]
2191 fn parse_health_check_overrides_explicit_values() {
2192 let v = serde_json::json!({
2193 "command": ["CMD", "/probe"],
2194 "interval": 7,
2195 "timeout": 2,
2196 "retries": 9,
2197 "startPeriod": 12,
2198 });
2199 let hc = __test_parse_health_check(&v).expect("parsed");
2200 assert_eq!(hc.interval_seconds, 7);
2201 assert_eq!(hc.timeout_seconds, 2);
2202 assert_eq!(hc.retries, 9);
2203 assert_eq!(hc.start_period_seconds, 12);
2204 }
2205
2206 #[test]
2207 fn parse_health_check_returns_none_for_none_sentinel() {
2208 let v = serde_json::json!({ "command": ["NONE"] });
2211 assert!(__test_parse_health_check(&v).is_none());
2212 }
2213
2214 #[test]
2215 fn parse_health_check_returns_none_for_missing_command() {
2216 let v = serde_json::json!({ "interval": 30 });
2217 assert!(__test_parse_health_check(&v).is_none());
2218 }
2219
2220 #[test]
2221 fn render_health_flags_emits_full_set_for_cmd_shell() {
2222 let hc = HealthCheckSpec {
2223 command: vec!["CMD-SHELL".into(), "curl -f http://localhost/".into()],
2224 interval_seconds: 15,
2225 timeout_seconds: 3,
2226 retries: 4,
2227 start_period_seconds: 10,
2228 };
2229 let flags = render_health_flags(&hc);
2230 assert_eq!(flags[0], "--health-cmd");
2231 assert_eq!(flags[1], "curl -f http://localhost/");
2232 assert!(flags.contains(&"--health-interval=15s".to_string()));
2233 assert!(flags.contains(&"--health-timeout=3s".to_string()));
2234 assert!(flags.contains(&"--health-retries=4".to_string()));
2235 assert!(flags.contains(&"--health-start-period=10s".to_string()));
2236 }
2237
2238 #[test]
2239 fn render_health_flags_joins_cmd_argv_with_spaces() {
2240 let hc = HealthCheckSpec {
2243 command: vec![
2244 "CMD".into(),
2245 "/bin/probe".into(),
2246 "--port".into(),
2247 "8080".into(),
2248 ],
2249 interval_seconds: 30,
2250 timeout_seconds: 5,
2251 retries: 3,
2252 start_period_seconds: 0,
2253 };
2254 let flags = render_health_flags(&hc);
2255 assert_eq!(flags[1], "/bin/probe --port 8080");
2256 }
2257
2258 #[test]
2259 fn build_run_argv_emits_health_flags_when_present() {
2260 let plan = ContainerPlan {
2261 container_name: "app".into(),
2262 image: "alpine".into(),
2263 env: Vec::new(),
2264 entry_point: Vec::new(),
2265 command: Vec::new(),
2266 secrets_refs: Vec::new(),
2267 essential: true,
2268 has_task_role: false,
2269 port_mappings: Vec::new(),
2270 network_mode: None,
2271 depends_on: Vec::new(),
2272 health_check: Some(HealthCheckSpec {
2273 command: vec!["CMD-SHELL".into(), "true".into()],
2274 interval_seconds: 5,
2275 timeout_seconds: 2,
2276 retries: 1,
2277 start_period_seconds: 1,
2278 }),
2279 volume_mounts: Vec::new(),
2280 ulimits: Vec::new(),
2281 linux_parameters: None,
2282 stop_timeout: None,
2283 user: None,
2284 working_directory: None,
2285 tty: false,
2286 interactive: false,
2287 readonly_rootfs: false,
2288 };
2289 let argv = build_run_argv(
2290 &plan,
2291 &[],
2292 "task-1",
2293 "host.docker.internal",
2294 None,
2295 "alpine",
2296 true,
2297 );
2298 let joined = argv.join(" ");
2299 assert!(joined.contains("--health-cmd true"), "argv: {joined}");
2300 assert!(joined.contains("--health-interval=5s"), "argv: {joined}");
2301 assert!(joined.contains("--health-timeout=2s"), "argv: {joined}");
2302 assert!(joined.contains("--health-retries=1"), "argv: {joined}");
2303 assert!(
2304 joined.contains("--health-start-period=1s"),
2305 "argv: {joined}"
2306 );
2307 }
2308
2309 #[test]
2310 fn build_run_argv_emits_no_health_flags_when_absent() {
2311 let plan = ContainerPlan {
2312 container_name: "app".into(),
2313 image: "alpine".into(),
2314 env: Vec::new(),
2315 entry_point: Vec::new(),
2316 command: Vec::new(),
2317 secrets_refs: Vec::new(),
2318 essential: true,
2319 has_task_role: false,
2320 port_mappings: Vec::new(),
2321 network_mode: None,
2322 depends_on: Vec::new(),
2323 health_check: None,
2324 volume_mounts: Vec::new(),
2325 ulimits: Vec::new(),
2326 linux_parameters: None,
2327 stop_timeout: None,
2328 user: None,
2329 working_directory: None,
2330 tty: false,
2331 interactive: false,
2332 readonly_rootfs: false,
2333 };
2334 let argv = build_run_argv(
2335 &plan,
2336 &[],
2337 "task-1",
2338 "host.docker.internal",
2339 None,
2340 "alpine",
2341 true,
2342 );
2343 assert!(!argv.iter().any(|s| s.starts_with("--health")));
2344 }
2345
2346 #[test]
2347 fn docker_health_to_ecs_maps_known_states() {
2348 assert_eq!(docker_health_to_ecs("healthy"), "HEALTHY");
2349 assert_eq!(docker_health_to_ecs("HEALTHY"), "HEALTHY");
2350 assert_eq!(docker_health_to_ecs("unhealthy"), "UNHEALTHY");
2351 assert_eq!(docker_health_to_ecs("starting"), "UNKNOWN");
2352 assert_eq!(docker_health_to_ecs("none"), "UNKNOWN");
2353 assert_eq!(docker_health_to_ecs(""), "UNKNOWN");
2354 }
2355
2356 #[test]
2359 fn resolve_host_bind_volume_uses_source_path() {
2360 let mut volumes = std::collections::HashMap::new();
2361 let v = serde_json::json!({
2362 "name": "data",
2363 "host": { "sourcePath": "/var/lib/myapp" }
2364 });
2365 volumes.insert("data".to_string(), &v);
2366 let mp = serde_json::json!({
2367 "sourceVolume": "data",
2368 "containerPath": "/app/data",
2369 "readOnly": false
2370 });
2371 let resolved = resolve_mount_point(&mp, &volumes).expect("resolved");
2372 assert_eq!(resolved.source, "/var/lib/myapp");
2373 assert_eq!(resolved.container_path, "/app/data");
2374 assert!(!resolved.read_only);
2375 }
2376
2377 #[test]
2380 fn read_only_mount_renders_ro_suffix() {
2381 let plan = ContainerPlan {
2382 container_name: "app".into(),
2383 image: "alpine".into(),
2384 env: Vec::new(),
2385 entry_point: Vec::new(),
2386 command: Vec::new(),
2387 secrets_refs: Vec::new(),
2388 essential: true,
2389 has_task_role: false,
2390 port_mappings: Vec::new(),
2391 network_mode: None,
2392 depends_on: Vec::new(),
2393 health_check: None,
2394 volume_mounts: vec![VolumeMount {
2395 source: "/host/path".into(),
2396 container_path: "/in/container".into(),
2397 read_only: true,
2398 }],
2399 ulimits: Vec::new(),
2400 linux_parameters: None,
2401 stop_timeout: None,
2402 user: None,
2403 working_directory: None,
2404 tty: false,
2405 interactive: false,
2406 readonly_rootfs: false,
2407 };
2408 let argv = build_run_argv(
2409 &plan,
2410 &[],
2411 "task-1",
2412 "host.docker.internal",
2413 None,
2414 "alpine",
2415 true,
2416 );
2417 let pair = argv
2418 .windows(2)
2419 .find(|w| w[0] == "-v")
2420 .expect("expected -v flag");
2421 assert_eq!(pair[1], "/host/path:/in/container:ro");
2422 }
2423
2424 #[test]
2429 fn resolve_efs_volume_uses_stub_dir() {
2430 let mut volumes = std::collections::HashMap::new();
2431 let v = serde_json::json!({
2432 "name": "efs-vol",
2433 "efsVolumeConfiguration": {
2434 "fileSystemId": "fs-12345678",
2435 "rootDirectory": "/exports/app"
2436 }
2437 });
2438 volumes.insert("efs-vol".to_string(), &v);
2439 let mp = serde_json::json!({
2440 "sourceVolume": "efs-vol",
2441 "containerPath": "/mnt/efs"
2442 });
2443 let resolved = resolve_mount_point(&mp, &volumes).expect("resolved");
2444 assert_eq!(resolved.source, "fakecloud-efs-fs-12345678-exports-app");
2447 assert_eq!(resolved.container_path, "/mnt/efs");
2448 }
2449
2450 #[test]
2454 fn efs_without_root_directory_uses_filesystem_root() {
2455 assert_eq!(
2458 shared_volume_name("efs", "fs-abc", "/"),
2459 "fakecloud-efs-fs-abc"
2460 );
2461 assert_eq!(
2462 shared_volume_name("efs", "fs-abc", ""),
2463 "fakecloud-efs-fs-abc"
2464 );
2465 }
2466
2467 #[test]
2471 fn resolve_docker_named_volume_uses_volume_name() {
2472 let mut volumes = std::collections::HashMap::new();
2473 let v = serde_json::json!({
2474 "name": "named-vol",
2475 "dockerVolumeConfiguration": {
2476 "scope": "task",
2477 "driver": "local"
2478 }
2479 });
2480 volumes.insert("named-vol".to_string(), &v);
2481 let mp = serde_json::json!({
2482 "sourceVolume": "named-vol",
2483 "containerPath": "/data"
2484 });
2485 let resolved = resolve_mount_point(&mp, &volumes).expect("resolved");
2486 assert_eq!(resolved.source, "named-vol");
2487 assert_eq!(resolved.container_path, "/data");
2488 }
2489
2490 #[test]
2493 fn resolve_fsx_volume_uses_stub_dir() {
2494 let mut volumes = std::collections::HashMap::new();
2495 let v = serde_json::json!({
2496 "name": "fsx-vol",
2497 "fsxWindowsFileServerVolumeConfiguration": {
2498 "fileSystemId": "fs-xyz",
2499 "rootDirectory": "share"
2500 }
2501 });
2502 volumes.insert("fsx-vol".to_string(), &v);
2503 let mp = serde_json::json!({
2504 "sourceVolume": "fsx-vol",
2505 "containerPath": "C:\\data"
2506 });
2507 let resolved = resolve_mount_point(&mp, &volumes).expect("resolved");
2508 assert_eq!(resolved.source, "fakecloud-fsx-fs-xyz-share");
2510 }
2511
2512 #[test]
2516 fn unknown_source_volume_returns_none() {
2517 let volumes = std::collections::HashMap::new();
2518 let mp = serde_json::json!({
2519 "sourceVolume": "missing",
2520 "containerPath": "/x"
2521 });
2522 assert!(resolve_mount_point(&mp, &volumes).is_none());
2523 }
2524
2525 #[test]
2529 fn find_depends_on_cycle_detects_two_node_cycle() {
2530 let cds = vec![
2531 serde_json::json!({
2532 "name": "a",
2533 "image": "alpine",
2534 "dependsOn": [{"containerName": "b", "condition": "START"}],
2535 }),
2536 serde_json::json!({
2537 "name": "b",
2538 "image": "alpine",
2539 "dependsOn": [{"containerName": "a", "condition": "START"}],
2540 }),
2541 ];
2542 let cycle = find_depends_on_cycle(&cds);
2543 assert!(cycle.is_some(), "expected cycle to be detected");
2544 }
2545
2546 #[test]
2550 fn find_depends_on_cycle_accepts_chain() {
2551 let cds = vec![
2552 serde_json::json!({
2553 "name": "a",
2554 "image": "alpine",
2555 "dependsOn": [{"containerName": "b", "condition": "START"}],
2556 }),
2557 serde_json::json!({
2558 "name": "b",
2559 "image": "alpine",
2560 "dependsOn": [{"containerName": "c", "condition": "START"}],
2561 }),
2562 serde_json::json!({
2563 "name": "c",
2564 "image": "alpine",
2565 }),
2566 ];
2567 assert!(find_depends_on_cycle(&cds).is_none());
2568 }
2569
2570 #[test]
2574 fn find_depends_on_cycle_ignores_unknown_target() {
2575 let cds = vec![serde_json::json!({
2576 "name": "only",
2577 "image": "alpine",
2578 "dependsOn": [{"containerName": "ghost", "condition": "START"}],
2579 })];
2580 assert!(find_depends_on_cycle(&cds).is_none());
2581 }
2582
2583 #[test]
2587 fn condition_is_met_matches_aws_semantics() {
2588 let running = InspectedState {
2589 started: true,
2590 exited: false,
2591 exit_code: 0,
2592 health: None,
2593 };
2594 let exited_ok = InspectedState {
2595 started: true,
2596 exited: true,
2597 exit_code: 0,
2598 health: None,
2599 };
2600 let exited_fail = InspectedState {
2601 started: true,
2602 exited: true,
2603 exit_code: 1,
2604 health: None,
2605 };
2606 let healthy = InspectedState {
2607 started: true,
2608 exited: false,
2609 exit_code: 0,
2610 health: Some("healthy".into()),
2611 };
2612
2613 assert!(condition_is_met(DependsOnCondition::Start, &running));
2616 assert!(condition_is_met(DependsOnCondition::Start, &exited_ok));
2617
2618 assert!(!condition_is_met(DependsOnCondition::Complete, &running));
2620 assert!(condition_is_met(DependsOnCondition::Complete, &exited_ok));
2621 assert!(condition_is_met(DependsOnCondition::Complete, &exited_fail));
2622
2623 assert!(!condition_is_met(DependsOnCondition::Success, &running));
2625 assert!(condition_is_met(DependsOnCondition::Success, &exited_ok));
2626 assert!(!condition_is_met(DependsOnCondition::Success, &exited_fail));
2627
2628 assert!(!condition_is_met(DependsOnCondition::Healthy, &running));
2630 assert!(condition_is_met(DependsOnCondition::Healthy, &healthy));
2631 }
2632
2633 #[test]
2637 fn depends_on_condition_parse_round_trips() {
2638 assert_eq!(
2639 DependsOnCondition::parse("START"),
2640 Some(DependsOnCondition::Start)
2641 );
2642 assert_eq!(
2643 DependsOnCondition::parse("COMPLETE"),
2644 Some(DependsOnCondition::Complete)
2645 );
2646 assert_eq!(
2647 DependsOnCondition::parse("SUCCESS"),
2648 Some(DependsOnCondition::Success)
2649 );
2650 assert_eq!(
2651 DependsOnCondition::parse("HEALTHY"),
2652 Some(DependsOnCondition::Healthy)
2653 );
2654 assert_eq!(DependsOnCondition::parse("start"), None);
2655 assert_eq!(DependsOnCondition::parse("ANY"), None);
2656 }
2657
2658 #[test]
2661 fn build_run_argv_emits_ulimits() {
2662 let plan = ContainerPlan {
2663 container_name: "app".into(),
2664 image: "alpine".into(),
2665 env: Vec::new(),
2666 entry_point: Vec::new(),
2667 command: Vec::new(),
2668 secrets_refs: Vec::new(),
2669 essential: true,
2670 has_task_role: false,
2671 port_mappings: Vec::new(),
2672 network_mode: None,
2673 depends_on: Vec::new(),
2674 health_check: None,
2675 volume_mounts: Vec::new(),
2676 ulimits: vec![Ulimit {
2677 name: "nofile".into(),
2678 soft_limit: 1024,
2679 hard_limit: 2048,
2680 }],
2681 linux_parameters: None,
2682 stop_timeout: None,
2683 user: None,
2684 working_directory: None,
2685 tty: false,
2686 interactive: false,
2687 readonly_rootfs: false,
2688 };
2689 let argv = build_run_argv(&plan, &[], "t", "host.docker.internal", None, "img", true);
2690 assert!(argv.contains(&"--ulimit".to_string()));
2691 assert!(argv.contains(&"nofile=1024:2048".to_string()));
2692 }
2693
2694 #[test]
2695 fn build_run_argv_emits_linux_parameters() {
2696 let plan = ContainerPlan {
2697 container_name: "app".into(),
2698 image: "alpine".into(),
2699 env: Vec::new(),
2700 entry_point: Vec::new(),
2701 command: Vec::new(),
2702 secrets_refs: Vec::new(),
2703 essential: true,
2704 has_task_role: false,
2705 port_mappings: Vec::new(),
2706 network_mode: None,
2707 depends_on: Vec::new(),
2708 health_check: None,
2709 volume_mounts: Vec::new(),
2710 ulimits: Vec::new(),
2711 linux_parameters: Some(LinuxParameters {
2712 capabilities_add: vec!["NET_ADMIN".into()],
2713 capabilities_drop: vec!["ALL".into()],
2714 devices: vec![Device {
2715 host_path: "/dev/zero".into(),
2716 container_path: "/dev/zero".into(),
2717 permissions: "rwm".into(),
2718 }],
2719 init_process_enabled: true,
2720 shared_memory_size: Some(256),
2721 sysctls: vec![Sysctl {
2722 name: "net.ipv4.ip_forward".into(),
2723 value: "1".into(),
2724 }],
2725 tmpfs: vec![Tmpfs {
2726 container_path: "/tmp".into(),
2727 size: 128,
2728 mount_options: vec!["noexec".into()],
2729 }],
2730 privileged: true,
2731 }),
2732 stop_timeout: Some(30),
2733 user: Some("1000:1000".into()),
2734 working_directory: Some("/app".into()),
2735 tty: true,
2736 interactive: true,
2737 readonly_rootfs: true,
2738 };
2739 let argv = build_run_argv(&plan, &[], "t", "host.docker.internal", None, "img", true);
2740 assert!(argv.contains(&"--cap-add".to_string()));
2741 assert!(argv.contains(&"NET_ADMIN".to_string()));
2742 assert!(argv.contains(&"--cap-drop".to_string()));
2743 assert!(argv.contains(&"ALL".to_string()));
2744 assert!(argv.contains(&"--device".to_string()));
2745 assert!(argv.contains(&"/dev/zero:/dev/zerorwm".to_string()));
2746 assert!(argv.contains(&"--init".to_string()));
2747 assert!(argv.contains(&"--shm-size".to_string()));
2748 assert!(argv.contains(&"256m".to_string()));
2749 assert!(argv.contains(&"--sysctl".to_string()));
2750 assert!(argv.contains(&"net.ipv4.ip_forward=1".to_string()));
2751 assert!(argv.contains(&"--tmpfs".to_string()));
2752 assert!(argv.contains(&"--privileged".to_string()));
2753 assert!(argv.contains(&"--stop-timeout".to_string()));
2754 assert!(argv.contains(&"30".to_string()));
2755 assert!(argv.contains(&"--user".to_string()));
2756 assert!(argv.contains(&"1000:1000".to_string()));
2757 assert!(argv.contains(&"--workdir".to_string()));
2758 assert!(argv.contains(&"/app".to_string()));
2759 assert!(argv.contains(&"--tty".to_string()));
2760 assert!(argv.contains(&"--interactive".to_string()));
2761 assert!(argv.contains(&"--read-only".to_string()));
2762 }
2763
2764 #[test]
2765 fn parse_linux_parameters_fills_defaults() {
2766 let raw = serde_json::json!({"initProcessEnabled": true});
2767 let lp = parse_linux_parameters(&raw).expect("parses");
2768 assert!(lp.init_process_enabled);
2769 assert!(!lp.privileged);
2770 assert!(lp.capabilities_add.is_empty());
2771 }
2772
2773 #[test]
2774 fn parse_device_uses_default_permissions() {
2775 let raw = serde_json::json!({"hostPath": "/dev/null", "containerPath": "/dev/null"});
2776 let dev = parse_device(&raw).expect("parses");
2777 assert_eq!(dev.permissions, "rwm");
2778 }
2779
2780 #[test]
2781 fn compute_elbv2_targets_empty_when_no_group() {
2782 let mut accounts: MultiAccountState<EcsState> =
2783 MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
2784 let acct = accounts.get_or_create("000000000000");
2785 let mut task = make_task("t1");
2786 task.group = None;
2787 acct.tasks.insert("t1".into(), task);
2788 let state = acct.clone();
2789 let targets = compute_elbv2_targets(&state, state.tasks.get("t1").unwrap());
2790 assert!(targets.is_empty());
2791 }
2792
2793 #[test]
2794 fn compute_elbv2_targets_bridge_mode_uses_localhost_and_host_port() {
2795 let mut accounts: MultiAccountState<EcsState> =
2796 MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
2797 let acct = accounts.get_or_create("000000000000");
2798
2799 let td = crate::state::TaskDefinition {
2800 family: "app".into(),
2801 revision: 1,
2802 task_definition_arn: "arn:aws:ecs:us-east-1:000000000000:task-definition/app:1".into(),
2803 container_definitions: Vec::new(),
2804 network_mode: Some("bridge".into()),
2805 status: "ACTIVE".into(),
2806 task_role_arn: None,
2807 execution_role_arn: None,
2808 requires_compatibilities: Vec::new(),
2809 compatibilities: Vec::new(),
2810 cpu: None,
2811 memory: None,
2812 pid_mode: None,
2813 ipc_mode: None,
2814 volumes: Vec::new(),
2815 placement_constraints: Vec::new(),
2816 proxy_configuration: None,
2817 inference_accelerators: Vec::new(),
2818 ephemeral_storage: None,
2819 runtime_platform: None,
2820 requires_attributes: Vec::new(),
2821 registered_at: Utc::now(),
2822 registered_by: None,
2823 deregistered_at: None,
2824 tags: Vec::new(),
2825 enable_fault_injection: None,
2826 };
2827 acct.task_definitions.insert("app".into(), {
2828 let mut m = std::collections::BTreeMap::new();
2829 m.insert(1, td);
2830 m
2831 });
2832
2833 let service = crate::state::Service {
2834 service_name: "svc".into(),
2835 service_arn: "arn:aws:ecs:us-east-1:000000000000:service/default/svc".into(),
2836 cluster_name: "default".into(),
2837 cluster_arn: "arn:aws:ecs:us-east-1:000000000000:cluster/default".into(),
2838 task_definition_arn: "arn:aws:ecs:us-east-1:000000000000:task-definition/app:1".into(),
2839 family: "app".into(),
2840 revision: 1,
2841 desired_count: 1,
2842 running_count: 0,
2843 pending_count: 0,
2844 launch_type: "FARGATE".into(),
2845 status: "ACTIVE".into(),
2846 scheduling_strategy: "REPLICA".into(),
2847 deployment_controller: "ECS".into(),
2848 minimum_healthy_percent: Some(0),
2849 maximum_percent: Some(200),
2850 circuit_breaker: None,
2851 deployments: Vec::new(),
2852 load_balancers: vec![serde_json::json!({
2853 "targetGroupArn": "arn:aws:elasticloadbalancing:us-east-1:000000000000:targetgroup/tg/abc",
2854 "containerName": "app",
2855 "containerPort": 80,
2856 })],
2857 service_registries: Vec::new(),
2858 placement_constraints: Vec::new(),
2859 placement_strategy: Vec::new(),
2860 network_configuration: None,
2861 volume_configurations: vec![],
2862 tags: Vec::new(),
2863 created_at: Utc::now(),
2864 created_by: None,
2865 role_arn: None,
2866 platform_version: None,
2867 health_check_grace_period_seconds: None,
2868 enable_execute_command: false,
2869 enable_ecs_managed_tags: false,
2870 propagate_tags: None,
2871 capacity_provider_strategy: Vec::new(),
2872 availability_zone_rebalancing: None,
2873 };
2874 acct.services.insert(
2875 crate::state::EcsState::service_key("default", "svc"),
2876 service,
2877 );
2878
2879 let mut task = make_task("t1");
2880 task.group = Some("service:svc".into());
2881 task.containers = vec![crate::state::Container {
2882 container_arn: "arn:aws:ecs:us-east-1:000000000000:container/default/abc/app".into(),
2883 name: "app".into(),
2884 image: "alpine".into(),
2885 task_arn: task.task_arn.clone(),
2886 last_status: "RUNNING".into(),
2887 exit_code: None,
2888 reason: None,
2889 runtime_id: Some("dockerid-app".into()),
2890 essential: true,
2891 cpu: None,
2892 memory: None,
2893 memory_reservation: None,
2894 network_bindings: vec![serde_json::json!({
2895 "bindIP": "0.0.0.0",
2896 "containerPort": 80,
2897 "hostPort": 32768,
2898 "protocol": "tcp",
2899 })],
2900 network_interfaces: Vec::new(),
2901 health_status: None,
2902 managed_agents: None,
2903 image_digest: None,
2904 }];
2905 acct.tasks.insert("t1".into(), task);
2906
2907 let state = acct.clone();
2908 let targets = compute_elbv2_targets(&state, state.tasks.get("t1").unwrap());
2909 assert_eq!(targets.len(), 1);
2910 let (arn, tg_targets) = &targets[0];
2911 assert_eq!(
2912 arn,
2913 "arn:aws:elasticloadbalancing:us-east-1:000000000000:targetgroup/tg/abc"
2914 );
2915 assert_eq!(tg_targets.len(), 1);
2916 assert_eq!(tg_targets[0].0, "127.0.0.1");
2917 assert_eq!(tg_targets[0].1, Some(32768));
2918 }
2919
2920 #[test]
2921 fn compute_elbv2_targets_awsvpc_uses_eni_ip() {
2922 let mut accounts: MultiAccountState<EcsState> =
2923 MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
2924 let acct = accounts.get_or_create("000000000000");
2925
2926 let td = crate::state::TaskDefinition {
2927 family: "app".into(),
2928 revision: 1,
2929 task_definition_arn: "arn:aws:ecs:us-east-1:000000000000:task-definition/app:1".into(),
2930 container_definitions: Vec::new(),
2931 network_mode: Some("awsvpc".into()),
2932 status: "ACTIVE".into(),
2933 task_role_arn: None,
2934 execution_role_arn: None,
2935 requires_compatibilities: Vec::new(),
2936 compatibilities: Vec::new(),
2937 cpu: None,
2938 memory: None,
2939 pid_mode: None,
2940 ipc_mode: None,
2941 volumes: Vec::new(),
2942 placement_constraints: Vec::new(),
2943 proxy_configuration: None,
2944 inference_accelerators: Vec::new(),
2945 ephemeral_storage: None,
2946 runtime_platform: None,
2947 requires_attributes: Vec::new(),
2948 registered_at: Utc::now(),
2949 registered_by: None,
2950 deregistered_at: None,
2951 tags: Vec::new(),
2952 enable_fault_injection: None,
2953 };
2954 acct.task_definitions.insert("app".into(), {
2955 let mut m = std::collections::BTreeMap::new();
2956 m.insert(1, td);
2957 m
2958 });
2959
2960 let service = crate::state::Service {
2961 service_name: "svc".into(),
2962 service_arn: "arn:aws:ecs:us-east-1:000000000000:service/default/svc".into(),
2963 cluster_name: "default".into(),
2964 cluster_arn: "arn:aws:ecs:us-east-1:000000000000:cluster/default".into(),
2965 task_definition_arn: "arn:aws:ecs:us-east-1:000000000000:task-definition/app:1".into(),
2966 family: "app".into(),
2967 revision: 1,
2968 desired_count: 1,
2969 running_count: 0,
2970 pending_count: 0,
2971 launch_type: "FARGATE".into(),
2972 status: "ACTIVE".into(),
2973 scheduling_strategy: "REPLICA".into(),
2974 deployment_controller: "ECS".into(),
2975 minimum_healthy_percent: Some(0),
2976 maximum_percent: Some(200),
2977 circuit_breaker: None,
2978 deployments: Vec::new(),
2979 load_balancers: vec![serde_json::json!({
2980 "targetGroupArn": "arn:aws:elasticloadbalancing:us-east-1:000000000000:targetgroup/tg/abc",
2981 "containerName": "app",
2982 "containerPort": 80,
2983 })],
2984 service_registries: Vec::new(),
2985 placement_constraints: Vec::new(),
2986 placement_strategy: Vec::new(),
2987 network_configuration: None,
2988 volume_configurations: vec![],
2989 tags: Vec::new(),
2990 created_at: Utc::now(),
2991 created_by: None,
2992 role_arn: None,
2993 platform_version: None,
2994 health_check_grace_period_seconds: None,
2995 enable_execute_command: false,
2996 enable_ecs_managed_tags: false,
2997 propagate_tags: None,
2998 capacity_provider_strategy: Vec::new(),
2999 availability_zone_rebalancing: None,
3000 };
3001 acct.services.insert(
3002 crate::state::EcsState::service_key("default", "svc"),
3003 service,
3004 );
3005
3006 let mut task = make_task("t1");
3007 task.group = Some("service:svc".into());
3008 task.attachments = vec![crate::state::TaskAttachment {
3009 id: "eni-123".into(),
3010 attachment_type: "eni".into(),
3011 status: "ATTACHED".into(),
3012 details: vec![
3013 crate::state::AttachmentDetail {
3014 name: "privateIPv4Address".into(),
3015 value: "172.18.0.2".into(),
3016 },
3017 crate::state::AttachmentDetail {
3018 name: "macAddress".into(),
3019 value: "02:42:ac:12:00:02".into(),
3020 },
3021 ],
3022 }];
3023 acct.tasks.insert("t1".into(), task);
3024
3025 let state = acct.clone();
3026 let targets = compute_elbv2_targets(&state, state.tasks.get("t1").unwrap());
3027 assert_eq!(targets.len(), 1);
3028 let (arn, tg_targets) = &targets[0];
3029 assert_eq!(
3030 arn,
3031 "arn:aws:elasticloadbalancing:us-east-1:000000000000:targetgroup/tg/abc"
3032 );
3033 assert_eq!(tg_targets.len(), 1);
3034 assert_eq!(tg_targets[0].0, "172.18.0.2");
3035 assert_eq!(tg_targets[0].1, Some(80));
3036 }
3037
3038 fn minimal_plan() -> ContainerPlan {
3039 ContainerPlan {
3040 container_name: "app".into(),
3041 image: "alpine".into(),
3042 env: Vec::new(),
3043 entry_point: Vec::new(),
3044 command: Vec::new(),
3045 secrets_refs: Vec::new(),
3046 essential: true,
3047 has_task_role: false,
3048 port_mappings: Vec::new(),
3049 network_mode: None,
3050 depends_on: Vec::new(),
3051 health_check: None,
3052 volume_mounts: Vec::new(),
3053 ulimits: Vec::new(),
3054 linux_parameters: None,
3055 stop_timeout: None,
3056 user: None,
3057 working_directory: None,
3058 tty: false,
3059 interactive: false,
3060 readonly_rootfs: false,
3061 }
3062 }
3063
3064 #[test]
3068 fn build_run_argv_emits_fakecloud_instance_label() {
3069 let plan = minimal_plan();
3070 let argv = build_run_argv(
3071 &plan,
3072 &[],
3073 "task-1",
3074 "host.docker.internal",
3075 None,
3076 "alpine",
3077 true,
3078 );
3079 let expected = fakecloud_instance_label();
3080 assert!(
3081 argv.windows(2)
3082 .any(|w| w[0] == "--label" && w[1] == expected),
3083 "argv must contain `--label {expected}`: {argv:?}",
3084 );
3085 }
3086
3087 #[test]
3092 fn fakecloud_instance_label_matches_reaper_format() {
3093 let label = fakecloud_instance_label();
3094 let (key, value) = label.split_once('=').expect("label is key=value");
3095 assert_eq!(key, "fakecloud-instance");
3096 let pid_str = value
3097 .strip_prefix("fakecloud-")
3098 .expect("value starts with fakecloud-");
3099 assert_eq!(
3100 pid_str.parse::<u32>().ok(),
3101 Some(std::process::id()),
3102 "reaper must be able to parse the owning pid out of {label}",
3103 );
3104 }
3105}