1use std::path::PathBuf;
11use std::sync::Arc;
12use std::time::Duration;
13
14use base64::Engine;
15use chrono::Utc;
16use fakecloud_core::delivery::DeliveryBus;
17use fakecloud_logs::ingest::{append_events, IngestEvent};
18use fakecloud_logs::SharedLogsState;
19use fakecloud_secretsmanager::SharedSecretsManagerState;
20use fakecloud_ssm::SharedSsmState;
21use parking_lot::RwLock;
22use tempfile::TempDir;
23use tokio::process::Command;
24
25use crate::state::{LifecycleEvent, SharedEcsState};
26
/// Errors reported by the ECS container runtime in this module.
///
/// Each variant's `Display` text comes from its `#[error(...)]` attribute;
/// the `String` payloads are interpolated verbatim into that text.
#[derive(Debug, thiserror::Error)]
pub enum RuntimeError {
    /// No container CLI was found; the message names docker and podman as
    /// the candidates that were tried.
    #[error("container CLI not found (tried docker, podman)")]
    NoCli,
    /// Pulling an image failed; the payload carries the detail text.
    #[error("image pull failed: {0}")]
    ImagePull(String),
    /// Starting a container failed. `build_container_plans` also returns
    /// this variant when the account or task is missing, or when the task
    /// has no containers.
    #[error("container start failed: {0}")]
    ContainerStart(String),
    /// Waiting on a container failed; the payload carries the detail text.
    #[error("docker wait failed: {0}")]
    Wait(String),
}
38
/// Runtime that executes ECS tasks, either through a local container CLI
/// (see `EcsRuntime::new`) or through a Kubernetes backend
/// (see `EcsRuntime::new_k8s`).
pub struct EcsRuntime {
    /// Container CLI returned by `detect_container_cli`; left empty when the
    /// runtime is built by `new_k8s`.
    cli: String,
    /// Host-networking settings; detected from the CLI in `new`, all-empty
    /// in `new_k8s`.
    net: fakecloud_core::container_net::HostNetworking,
    /// Port passed to the constructor.
    // NOTE(review): presumably the port the fakecloud server listens on — confirm.
    server_port: u16,
    /// Result of `build_local_registry_docker_config(server_port)`, shared
    /// behind an `Arc`; always `None` for the Kubernetes variant.
    docker_config: Option<Arc<TempDir>>,
    /// Per-key list of string pairs, starts empty.
    // NOTE(review): key and pair semantics are not visible in this chunk
    // (possibly task id -> (container name, container id)) — confirm.
    containers: RwLock<std::collections::HashMap<String, Vec<(String, String)>>>,
    /// Optional delivery bus, installed via `with_delivery_bus`.
    delivery_bus: Option<Arc<DeliveryBus>>,
    /// Optional logs state, installed via `with_logs`.
    logs_state: Option<SharedLogsState>,
    /// Optional Secrets Manager state; `None` after both visible constructors.
    secretsmanager_state: Option<SharedSecretsManagerState>,
    /// Optional SSM state; `None` after both visible constructors.
    ssm_state: Option<SharedSsmState>,
    /// Kubernetes backend; `Some` only when built through `new_k8s`.
    k8s: Option<k8s::K8sTaskBackend>,
}
81
82mod config;
83mod k8s;
84mod lb;
85mod monitoring;
86mod secrets;
87mod task_lifecycle;
88
89impl EcsRuntime {
90 pub fn new(server_port: u16) -> Option<Self> {
95 let cli = fakecloud_core::container_net::detect_container_cli()?;
96 let net = fakecloud_core::container_net::HostNetworking::detect(&cli);
97 let docker_config = build_local_registry_docker_config(server_port).map(Arc::new);
98 Some(Self {
99 cli,
100 net,
101 server_port,
102 docker_config,
103 containers: RwLock::new(std::collections::HashMap::new()),
104 delivery_bus: None,
105 logs_state: None,
106 secretsmanager_state: None,
107 ssm_state: None,
108 k8s: None,
109 })
110 }
111
112 pub async fn new_k8s(server_port: u16) -> Result<Self, k8s::BackendInitError> {
116 let backend = k8s::K8sTaskBackend::from_env(server_port).await?;
117 let net = fakecloud_core::container_net::HostNetworking {
120 host_alias: String::new(),
121 add_host_arg: None,
122 sibling_host: String::new(),
123 };
124 Ok(Self {
125 cli: String::new(),
126 net,
127 server_port,
128 docker_config: None,
129 containers: RwLock::new(std::collections::HashMap::new()),
130 delivery_bus: None,
131 logs_state: None,
132 secretsmanager_state: None,
133 ssm_state: None,
134 k8s: Some(backend),
135 })
136 }
137
138 pub fn cli_name(&self) -> &str {
140 if self.k8s.is_some() {
141 "kubernetes"
142 } else {
143 &self.cli
144 }
145 }
146
147 pub async fn reap_stale(&self) {
150 if let Some(k) = &self.k8s {
151 k.reap_stale().await;
152 }
153 }
154
155 pub fn with_delivery_bus(mut self, bus: Arc<DeliveryBus>) -> Self {
158 self.delivery_bus = Some(bus);
159 self
160 }
161
162 pub fn with_logs(mut self, logs: SharedLogsState) -> Self {
165 self.logs_state = Some(logs);
166 self
167 }
168}
169
/// Everything needed to launch one container of a task, as assembled by
/// `build_container_plans` from the task and its container definition JSON.
#[derive(Clone, Debug)]
pub(crate) struct ContainerPlan {
    /// Name of the container within the task.
    pub(crate) container_name: String,
    /// Image recorded on the task's container entry.
    pub(crate) image: String,
    /// `(name, value)` pairs from the definition's `environment` array; a
    /// missing `value` becomes the empty string.
    pub(crate) env: Vec<(String, String)>,
    /// String entries of the definition's `entryPoint` array.
    pub(crate) entry_point: Vec<String>,
    /// String entries of the definition's `command` array.
    pub(crate) command: Vec<String>,
    /// `(name, valueFrom)` pairs from the definition's `secrets` array.
    pub(crate) secrets_refs: Vec<(String, String)>,
    /// Essential flag copied from the task's container entry.
    pub(crate) essential: bool,
    /// True when the task has a `task_role_arn`.
    pub(crate) has_task_role: bool,
    /// Parsed `portMappings` entries.
    pub(crate) port_mappings: Vec<PortMapping>,
    /// Network mode of the task definition, if one was found; `"awsvpc"` is
    /// the only value this file branches on.
    pub(crate) network_mode: Option<String>,
    /// Parsed `dependsOn` entries; entries with an unknown condition are
    /// dropped during parsing.
    pub(crate) depends_on: Vec<DependsOn>,
    /// Parsed `healthCheck`, if present and not disabled.
    pub(crate) health_check: Option<HealthCheckSpec>,
    /// `mountPoints` entries resolved against the task definition's volumes.
    pub(crate) volume_mounts: Vec<VolumeMount>,
    /// Parsed `ulimits` entries.
    pub(crate) ulimits: Vec<Ulimit>,
    /// Parsed `linuxParameters` object, if present.
    pub(crate) linux_parameters: Option<LinuxParameters>,
    /// `stopTimeout` from the definition; rendered as `--stop-timeout`.
    pub(crate) stop_timeout: Option<u32>,
    /// `user` from the definition; rendered as `--user`.
    pub(crate) user: Option<String>,
    /// `workingDirectory` from the definition; rendered as `--workdir`.
    pub(crate) working_directory: Option<String>,
    /// `tty` from the definition (default false); rendered as `--tty`.
    pub(crate) tty: bool,
    /// `interactive` from the definition (default false); rendered as
    /// `--interactive`.
    pub(crate) interactive: bool,
    /// `readonlyRootFilesystem` from the definition (default false);
    /// rendered as `--read-only`.
    pub(crate) readonly_rootfs: bool,
}
231
/// One `dependsOn` entry of a container definition, as produced by
/// `parse_depends_on_entry`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct DependsOn {
    /// Value of the entry's `containerName` key: the container depended on.
    pub container_name: String,
    /// Parsed value of the entry's `condition` key.
    pub condition: DependsOnCondition,
}
242
/// Condition a dependency container must reach, as named by the `condition`
/// key of a `dependsOn` entry.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum DependsOnCondition {
    /// Wire form `START`.
    Start,
    /// Wire form `COMPLETE`.
    Complete,
    /// Wire form `SUCCESS`.
    Success,
    /// Wire form `HEALTHY`.
    Healthy,
}

impl DependsOnCondition {
    /// Parses the upper-case wire form; the match is exact and
    /// case-sensitive, and anything else yields `None`.
    pub fn parse(raw: &str) -> Option<Self> {
        const ALL: [DependsOnCondition; 4] = [
            DependsOnCondition::Start,
            DependsOnCondition::Complete,
            DependsOnCondition::Success,
            DependsOnCondition::Healthy,
        ];
        ALL.iter()
            .copied()
            .find(|candidate| candidate.as_aws_str() == raw)
    }

    /// Returns the upper-case wire form of this condition.
    pub fn as_aws_str(self) -> &'static str {
        use DependsOnCondition::{Complete, Healthy, Start, Success};
        match self {
            Start => "START",
            Complete => "COMPLETE",
            Success => "SUCCESS",
            Healthy => "HEALTHY",
        }
    }
}
287
/// Container health-check settings parsed from a definition's `healthCheck`
/// object by `parse_health_check`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct HealthCheckSpec {
    /// String entries of the `command` array. `render_health_flags` only
    /// renders it when the first element is `CMD` or `CMD-SHELL`.
    pub command: Vec<String>,
    /// `interval` value; defaults to 30 and is rendered with an `s` suffix.
    pub interval_seconds: u32,
    /// `timeout` value; defaults to 5 and is rendered with an `s` suffix.
    pub timeout_seconds: u32,
    /// `retries` value; defaults to 3.
    pub retries: u32,
    /// `startPeriod` value; defaults to 0 and is rendered with an `s` suffix.
    pub start_period_seconds: u32,
}
306
/// One `portMappings` entry, as produced by `parse_port_mapping`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct PortMapping {
    /// `containerPort` of the entry (required, 0..=65535).
    pub container_port: u16,
    /// `hostPort` of the entry; the parser substitutes `container_port` when
    /// the key is absent, out of range, or 0.
    pub host_port: u16,
    /// Lower-cased `protocol` of the entry; `"tcp"` when absent.
    pub protocol: String,
}
319
/// A resolved `mountPoints` entry, as produced by `resolve_mount_point`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct VolumeMount {
    /// Left-hand side of the `-v` argument: a host path or a volume name,
    /// whichever `resolve_volume_source` returned.
    pub source: String,
    /// `containerPath` of the mount point.
    pub container_path: String,
    /// `readOnly` of the mount point (default false); adds the `:ro` suffix.
    pub read_only: bool,
}
355
/// One `ulimits` entry; `build_run_argv` renders it as
/// `--ulimit name=soft:hard`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct Ulimit {
    /// `name` of the entry.
    pub name: String,
    /// `softLimit` of the entry; the parser rejects negative values.
    pub soft_limit: i32,
    /// `hardLimit` of the entry; the parser rejects negative values.
    pub hard_limit: i32,
}
363
/// One `linuxParameters.devices` entry, as produced by `parse_device` and
/// rendered into a `--device` argument by `build_run_argv`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct Device {
    /// `hostPath` of the entry.
    pub host_path: String,
    /// `containerPath` of the entry.
    pub container_path: String,
    /// `permissions` of the entry when it is a string; `"rwm"` otherwise.
    pub permissions: String,
}
371
/// One kernel-parameter entry; `build_run_argv` renders it as
/// `--sysctl name=value`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct Sysctl {
    /// `name` of the entry.
    pub name: String,
    /// `value` of the entry (must be a JSON string to be parsed).
    pub value: String,
}
378
/// Parsed `linuxParameters` object of a container definition; each field is
/// turned into `run` flags by `build_run_argv`.
#[derive(Clone, Debug, PartialEq, Eq, Default)]
pub(crate) struct LinuxParameters {
    /// `capabilities.add`; one `--cap-add` per entry.
    pub capabilities_add: Vec<String>,
    /// `capabilities.drop`; one `--cap-drop` per entry.
    pub capabilities_drop: Vec<String>,
    /// `devices`; one `--device` per entry.
    pub devices: Vec<Device>,
    /// `initProcessEnabled` (default false); adds `--init`.
    pub init_process_enabled: bool,
    /// `sharedMemorySize`; rendered as `--shm-size <n>m`.
    pub shared_memory_size: Option<i32>,
    /// Entries of the `sysctl` key; one `--sysctl` per entry.
    pub sysctls: Vec<Sysctl>,
    /// `tmpfs`; one `--tmpfs` per entry.
    pub tmpfs: Vec<Tmpfs>,
    /// `privileged` (default false); adds `--privileged`.
    pub privileged: bool,
}
391
/// One `linuxParameters.tmpfs` entry; `build_run_argv` renders it as
/// `--tmpfs <path>:size=<n>M[,<options>]`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct Tmpfs {
    /// `containerPath` of the entry.
    pub container_path: String,
    /// `size` of the entry; the parser requires it to be greater than zero.
    pub size: i32,
    /// String entries of `mountOptions`; joined with commas when rendered.
    pub mount_options: Vec<String>,
}
399
/// A container plan paired with an environment list.
// NOTE(review): presumably `env` is the final environment after secrets and
// other injected variables have been resolved — the code that fills it is
// outside this chunk; confirm.
#[derive(Clone, Debug)]
struct ResolvedContainerPlan {
    /// The plan this environment belongs to.
    plan: ContainerPlan,
    /// `(name, value)` environment pairs.
    env: Vec<(String, String)>,
}
405
/// Summary of how a task finished.
// NOTE(review): the producer and consumer of this type are outside this
// chunk; the field notes below are inferred from names — confirm.
#[derive(Clone, Debug)]
struct TaskExitOutcome {
    /// Presumably the index of the container whose exit ended the task, if
    /// a specific one did.
    exited_index: Option<usize>,
    /// Exit code reported for the task.
    exit_code: i64,
    /// Static stop-code string reported for the task.
    stop_code: &'static str,
}
416
/// Runtime bookkeeping for one started container of a task.
#[derive(Clone, Debug)]
pub(crate) struct RunningContainer {
    /// Container name.
    pub(crate) name: String,
    /// Identifier of the started container.
    // NOTE(review): presumably the id printed by the container CLI — confirm.
    pub(crate) container_id: String,
    /// Whether the container is essential; `task_should_stop` stops the task
    /// as soon as any essential container has an exit code.
    pub(crate) essential: bool,
    /// Exit code once known; `task_should_stop` treats `None` as "has not
    /// exited".
    pub(crate) exit_code: Option<i64>,
    /// Network bindings as JSON values.
    // NOTE(review): presumably the objects built by `network_bindings_for` — confirm.
    pub(crate) network_bindings: Vec<serde_json::Value>,
    /// Image digest, when known.
    pub(crate) image_digest: Option<String>,
}
435
436pub(crate) fn task_should_stop(containers: &[RunningContainer]) -> bool {
441 if containers.is_empty() {
442 return true;
443 }
444 let any_essential_exited = containers
445 .iter()
446 .any(|c| c.essential && c.exit_code.is_some());
447 if any_essential_exited {
448 return true;
449 }
450 containers.iter().all(|c| c.exit_code.is_some())
451}
452
453pub(crate) fn task_desired_stopped(
458 state: &SharedEcsState,
459 account_id: &str,
460 task_id: &str,
461) -> bool {
462 let accounts = state.read();
463 match accounts.get(account_id).and_then(|s| s.tasks.get(task_id)) {
464 Some(task) => task.desired_status == "STOPPED",
465 None => true,
466 }
467}
468
469fn build_container_plans(
470 state: &SharedEcsState,
471 account_id: &str,
472 task_id: &str,
473 _server_port: u16,
474) -> Result<Vec<ContainerPlan>, RuntimeError> {
475 let accounts = state.read();
476 let s = accounts
477 .get(account_id)
478 .ok_or_else(|| RuntimeError::ContainerStart("account missing".into()))?;
479 let task = s
480 .tasks
481 .get(task_id)
482 .ok_or_else(|| RuntimeError::ContainerStart("task missing".into()))?;
483 if task.containers.is_empty() {
484 return Err(RuntimeError::ContainerStart(
485 "task has no containers".into(),
486 ));
487 }
488 let has_task_role = task.task_role_arn.is_some();
489 let task_def = s
490 .task_definitions
491 .get(&task.family)
492 .and_then(|revs| revs.get(&task.revision));
493 let network_mode = task_def.and_then(|td| td.network_mode.clone());
494 let volumes_by_name: std::collections::HashMap<String, &serde_json::Value> = task_def
499 .map(|td| {
500 td.volumes
501 .iter()
502 .filter_map(|v| {
503 let name = v.get("name").and_then(|n| n.as_str())?;
504 Some((name.to_string(), v))
505 })
506 .collect()
507 })
508 .unwrap_or_default();
509 let mut plans = Vec::with_capacity(task.containers.len());
510 for container in &task.containers {
511 let def = find_container_definition(s, &task.family, task.revision, &container.name);
512 let secrets_refs = def
513 .as_ref()
514 .and_then(|d| d.get("secrets").and_then(|v| v.as_array()).cloned())
515 .map(|arr| {
516 arr.iter()
517 .filter_map(|e| {
518 let name = e.get("name").and_then(|v| v.as_str())?.to_string();
519 let value_from = e.get("valueFrom").and_then(|v| v.as_str())?.to_string();
520 Some((name, value_from))
521 })
522 .collect::<Vec<_>>()
523 })
524 .unwrap_or_default();
525 let str_array = |key: &str| -> Vec<String> {
526 def.as_ref()
527 .and_then(|d| d.get(key).and_then(|v| v.as_array()).cloned())
528 .map(|arr| {
529 arr.iter()
530 .filter_map(|v| v.as_str().map(String::from))
531 .collect::<Vec<_>>()
532 })
533 .unwrap_or_default()
534 };
535 let env = def
536 .as_ref()
537 .and_then(|d| d.get("environment").and_then(|v| v.as_array()).cloned())
538 .map(|arr| {
539 arr.iter()
540 .filter_map(|e| {
541 let k = e.get("name").and_then(|v| v.as_str())?;
542 let v = e.get("value").and_then(|v| v.as_str()).unwrap_or("");
543 Some((k.to_string(), v.to_string()))
544 })
545 .collect::<Vec<_>>()
546 })
547 .unwrap_or_default();
548 let port_mappings = def
549 .as_ref()
550 .and_then(|d| d.get("portMappings").and_then(|v| v.as_array()).cloned())
551 .map(|arr| {
552 arr.iter()
553 .filter_map(parse_port_mapping)
554 .collect::<Vec<_>>()
555 })
556 .unwrap_or_default();
557 let depends_on = def
558 .as_ref()
559 .and_then(|d| d.get("dependsOn").and_then(|v| v.as_array()).cloned())
560 .map(|arr| {
561 arr.iter()
562 .filter_map(parse_depends_on_entry)
563 .collect::<Vec<_>>()
564 })
565 .unwrap_or_default();
566 let health_check = def
567 .as_ref()
568 .and_then(|d| d.get("healthCheck"))
569 .and_then(parse_health_check);
570 let volume_mounts = def
571 .as_ref()
572 .and_then(|d| d.get("mountPoints").and_then(|v| v.as_array()).cloned())
573 .map(|arr| {
574 arr.iter()
575 .filter_map(|mp| resolve_mount_point(mp, &volumes_by_name))
576 .collect::<Vec<_>>()
577 })
578 .unwrap_or_default();
579 let ulimits = def
580 .as_ref()
581 .and_then(|d| d.get("ulimits").and_then(|v| v.as_array()).cloned())
582 .map(|arr| arr.iter().filter_map(parse_ulimit).collect::<Vec<_>>())
583 .unwrap_or_default();
584 let linux_parameters = def
585 .as_ref()
586 .and_then(|d| d.get("linuxParameters"))
587 .and_then(parse_linux_parameters);
588 let stop_timeout = def.as_ref().and_then(|d| {
589 d.get("stopTimeout")
590 .and_then(|v| v.as_u64())
591 .map(|n| n as u32)
592 });
593 let user = def
594 .as_ref()
595 .and_then(|d| d.get("user").and_then(|v| v.as_str()).map(String::from));
596 let working_directory = def.as_ref().and_then(|d| {
597 d.get("workingDirectory")
598 .and_then(|v| v.as_str())
599 .map(String::from)
600 });
601 let tty = def
602 .as_ref()
603 .and_then(|d| d.get("tty").and_then(|v| v.as_bool()))
604 .unwrap_or(false);
605 let interactive = def
606 .as_ref()
607 .and_then(|d| d.get("interactive").and_then(|v| v.as_bool()))
608 .unwrap_or(false);
609 let readonly_rootfs = def
610 .as_ref()
611 .and_then(|d| d.get("readonlyRootFilesystem").and_then(|v| v.as_bool()))
612 .unwrap_or(false);
613 plans.push(ContainerPlan {
614 container_name: container.name.clone(),
615 image: container.image.clone(),
616 env,
617 entry_point: str_array("entryPoint"),
618 command: str_array("command"),
619 secrets_refs,
620 essential: container.essential,
621 has_task_role,
622 port_mappings,
623 network_mode: network_mode.clone(),
624 depends_on,
625 health_check,
626 volume_mounts,
627 ulimits,
628 linux_parameters,
629 stop_timeout,
630 user,
631 working_directory,
632 tty,
633 interactive,
634 readonly_rootfs,
635 });
636 }
637 let plans = topo_sort_plans(plans);
638 Ok(plans)
639}
640
641fn resolve_mount_point(
649 mount_point: &serde_json::Value,
650 volumes_by_name: &std::collections::HashMap<String, &serde_json::Value>,
651) -> Option<VolumeMount> {
652 let container_path = mount_point
653 .get("containerPath")
654 .and_then(|v| v.as_str())?
655 .to_string();
656 let source_volume = mount_point.get("sourceVolume").and_then(|v| v.as_str())?;
657 let read_only = mount_point
658 .get("readOnly")
659 .and_then(|v| v.as_bool())
660 .unwrap_or(false);
661 let volume = volumes_by_name.get(source_volume)?;
662 let source = resolve_volume_source(source_volume, volume)?;
663 Some(VolumeMount {
664 source,
665 container_path,
666 read_only,
667 })
668}
669
670fn resolve_volume_source(name: &str, volume: &serde_json::Value) -> Option<String> {
689 if let Some(host) = volume.get("host") {
690 if let Some(path) = host.get("sourcePath").and_then(|v| v.as_str()) {
691 if !path.is_empty() {
694 ensure_dir_exists(path);
695 return Some(path.to_string());
696 }
697 }
698 }
699 if let Some(efs) = volume.get("efsVolumeConfiguration") {
700 let fs_id = efs.get("fileSystemId").and_then(|v| v.as_str())?;
701 let root = efs
702 .get("rootDirectory")
703 .and_then(|v| v.as_str())
704 .unwrap_or("/");
705 return Some(shared_volume_name("efs", fs_id, root));
706 }
707 if let Some(fsx) = volume.get("fsxWindowsFileServerVolumeConfiguration") {
708 let fs_id = fsx.get("fileSystemId").and_then(|v| v.as_str())?;
709 let root = fsx
710 .get("rootDirectory")
711 .and_then(|v| v.as_str())
712 .unwrap_or("/");
713 return Some(shared_volume_name("fsx", fs_id, root));
714 }
715 if volume.get("dockerVolumeConfiguration").is_some() {
716 return Some(name.to_string());
719 }
720 Some(name.to_string())
722}
723
724fn shared_volume_name(kind: &str, fs_id: &str, root: &str) -> String {
735 let trimmed = root.trim_start_matches('/').trim_end_matches('/');
736 let fs_id = sanitize_volume_segment(fs_id);
737 if trimmed.is_empty() {
738 format!("fakecloud-{kind}-{fs_id}")
739 } else {
740 format!(
741 "fakecloud-{kind}-{fs_id}-{}",
742 sanitize_volume_segment(trimmed)
743 )
744 }
745}
746
/// Replaces every character that is not an ASCII letter, ASCII digit, `_`,
/// `.` or `-` with `-`, one replacement per character.
fn sanitize_volume_segment(s: &str) -> String {
    let mut cleaned = String::with_capacity(s.len());
    for ch in s.chars() {
        let allowed = ch.is_ascii_alphanumeric() || ch == '_' || ch == '.' || ch == '-';
        cleaned.push(if allowed { ch } else { '-' });
    }
    cleaned
}
760
/// Creates `path` and any missing parents on a best-effort basis; a failure
/// is deliberately ignored.
fn ensure_dir_exists(path: &str) {
    drop(std::fs::create_dir_all(path));
}
768
769fn parse_depends_on_entry(value: &serde_json::Value) -> Option<DependsOn> {
774 let container_name = value
775 .get("containerName")
776 .and_then(|v| v.as_str())?
777 .to_string();
778 let raw_condition = value.get("condition").and_then(|v| v.as_str())?;
779 let condition = DependsOnCondition::parse(raw_condition)?;
780 Some(DependsOn {
781 container_name,
782 condition,
783 })
784}
785
786fn topo_sort_plans(plans: Vec<ContainerPlan>) -> Vec<ContainerPlan> {
797 use std::collections::{HashMap, HashSet};
798 let names: HashSet<String> = plans.iter().map(|p| p.container_name.clone()).collect();
799 let index: HashMap<String, usize> = plans
800 .iter()
801 .enumerate()
802 .map(|(i, p)| (p.container_name.clone(), i))
803 .collect();
804 let mut in_degree: Vec<usize> = plans
809 .iter()
810 .map(|p| {
811 p.depends_on
812 .iter()
813 .filter(|d| names.contains(&d.container_name))
814 .count()
815 })
816 .collect();
817 let mut dependants: Vec<Vec<usize>> = vec![Vec::new(); plans.len()];
819 for (i, p) in plans.iter().enumerate() {
820 for d in &p.depends_on {
821 if let Some(&di) = index.get(&d.container_name) {
822 dependants[di].push(i);
823 }
824 }
825 }
826 let mut ordered: Vec<ContainerPlan> = Vec::with_capacity(plans.len());
827 let mut emitted: Vec<bool> = vec![false; plans.len()];
828 loop {
829 let next = (0..plans.len()).find(|&i| !emitted[i] && in_degree[i] == 0);
832 match next {
833 Some(i) => {
834 emitted[i] = true;
835 ordered.push(plans[i].clone());
836 for &di in &dependants[i] {
837 if in_degree[di] > 0 {
838 in_degree[di] -= 1;
839 }
840 }
841 }
842 None => break,
843 }
844 }
845 for (i, p) in plans.into_iter().enumerate() {
847 if !emitted[i] {
848 ordered.push(p);
849 }
850 }
851 ordered
852}
853
854pub(crate) fn find_depends_on_cycle(
863 container_definitions: &[serde_json::Value],
864) -> Option<(String, String)> {
865 use std::collections::HashMap;
866
867 let names: Vec<String> = container_definitions
868 .iter()
869 .filter_map(|c| c.get("name").and_then(|n| n.as_str()).map(String::from))
870 .collect();
871 let index: HashMap<&str, usize> = names
872 .iter()
873 .enumerate()
874 .map(|(i, n)| (n.as_str(), i))
875 .collect();
876
877 let mut adj: Vec<Vec<usize>> = vec![Vec::new(); names.len()];
878 for (i, cd) in container_definitions.iter().enumerate() {
879 if i >= names.len() {
880 continue;
881 }
882 let Some(deps) = cd.get("dependsOn").and_then(|v| v.as_array()) else {
883 continue;
884 };
885 for d in deps {
886 let Some(target) = d.get("containerName").and_then(|v| v.as_str()) else {
887 continue;
888 };
889 if let Some(&j) = index.get(target) {
890 adj[i].push(j);
892 }
893 }
894 }
895
896 let mut state = vec![0u8; names.len()];
900 let mut stack: Vec<(usize, usize)> = Vec::new();
901 for start in 0..names.len() {
902 if state[start] != 0 {
903 continue;
904 }
905 stack.clear();
906 stack.push((start, 0));
907 state[start] = 1;
908 while let Some(&(node, next_edge)) = stack.last() {
909 if next_edge < adj[node].len() {
910 let nb = adj[node][next_edge];
911 stack.last_mut().unwrap().1 += 1;
912 match state[nb] {
913 0 => {
914 state[nb] = 1;
915 stack.push((nb, 0));
916 }
917 1 => {
918 return Some((names[node].clone(), names[nb].clone()));
919 }
920 _ => {}
921 }
922 } else {
923 state[node] = 2;
924 stack.pop();
925 }
926 }
927 }
928 None
929}
930
/// Container state as reported by the CLI's `inspect` command and decoded by
/// `inspect_container_state`.
#[derive(Debug, Clone)]
struct InspectedState {
    /// True when the container is running, or its status is `exited`,
    /// `running` or `dead`.
    started: bool,
    /// True when the status is `exited` or `dead`.
    exited: bool,
    /// Reported exit code; `-1` when the field could not be parsed.
    exit_code: i64,
    /// Health status string, or `None` when the container reports none.
    health: Option<String>,
}
941
942async fn inspect_container_state(cli: &str, container_id: &str) -> Option<InspectedState> {
946 let format =
949 "{{.State.Status}}|{{.State.Running}}|{{.State.ExitCode}}|{{if .State.Health}}{{.State.Health.Status}}{{else}}<none>{{end}}";
950 let out = Command::new(cli)
951 .args(["inspect", "-f", format, container_id])
952 .output()
953 .await
954 .ok()?;
955 if !out.status.success() {
956 return None;
957 }
958 let raw = String::from_utf8_lossy(&out.stdout).trim().to_string();
959 let parts: Vec<&str> = raw.split('|').collect();
960 if parts.len() < 4 {
961 return None;
962 }
963 let status = parts[0];
964 let running = parts[1] == "true";
965 let exit_code: i64 = parts[2].parse().unwrap_or(-1);
966 let health = match parts[3] {
967 "<none>" | "" => None,
968 other => Some(other.to_string()),
969 };
970 let started = running || status == "exited" || status == "running" || status == "dead";
974 let exited = status == "exited" || status == "dead";
975 Some(InspectedState {
976 started,
977 exited,
978 exit_code,
979 health,
980 })
981}
982
983fn condition_is_met(condition: DependsOnCondition, state: &InspectedState) -> bool {
987 match condition {
988 DependsOnCondition::Start => state.started,
989 DependsOnCondition::Complete => state.exited,
990 DependsOnCondition::Success => state.exited && state.exit_code == 0,
991 DependsOnCondition::Healthy => state.health.as_deref() == Some("healthy"),
992 }
993}
994
/// Test-only shim that forwards to the private `parse_port_mapping`, so
/// tests elsewhere in the crate can call it.
#[cfg(test)]
pub(crate) fn __test_parse_port_mapping(value: &serde_json::Value) -> Option<PortMapping> {
    parse_port_mapping(value)
}
1002
1003fn parse_health_check(value: &serde_json::Value) -> Option<HealthCheckSpec> {
1009 let cmd_arr = value.get("command")?.as_array()?;
1010 let command: Vec<String> = cmd_arr
1011 .iter()
1012 .filter_map(|v| v.as_str().map(String::from))
1013 .collect();
1014 if command.is_empty() {
1015 return None;
1016 }
1017 if command.first().map(|s| s.as_str()) == Some("NONE") {
1018 return None;
1019 }
1020 let read_u32 = |key: &str, default: u32| -> u32 {
1021 value
1022 .get(key)
1023 .and_then(|v| v.as_i64())
1024 .filter(|n| (0..=u32::MAX as i64).contains(n))
1025 .map(|n| n as u32)
1026 .unwrap_or(default)
1027 };
1028 Some(HealthCheckSpec {
1029 command,
1030 interval_seconds: read_u32("interval", 30),
1031 timeout_seconds: read_u32("timeout", 5),
1032 retries: read_u32("retries", 3),
1033 start_period_seconds: read_u32("startPeriod", 0),
1034 })
1035}
1036
1037fn parse_ulimit(value: &serde_json::Value) -> Option<Ulimit> {
1039 let name = value.get("name").and_then(|v| v.as_str())?;
1040 let soft = value
1041 .get("softLimit")
1042 .and_then(|v| v.as_i64())
1043 .filter(|n| *n >= 0)? as i32;
1044 let hard = value
1045 .get("hardLimit")
1046 .and_then(|v| v.as_i64())
1047 .filter(|n| *n >= 0)? as i32;
1048 Some(Ulimit {
1049 name: name.to_string(),
1050 soft_limit: soft,
1051 hard_limit: hard,
1052 })
1053}
1054
1055fn parse_linux_parameters(value: &serde_json::Value) -> Option<LinuxParameters> {
1057 let mut lp = LinuxParameters::default();
1058 if let Some(arr) = value
1059 .get("capabilities")
1060 .and_then(|v| v.get("add"))
1061 .and_then(|v| v.as_array())
1062 {
1063 lp.capabilities_add = arr
1064 .iter()
1065 .filter_map(|v| v.as_str().map(String::from))
1066 .collect();
1067 }
1068 if let Some(arr) = value
1069 .get("capabilities")
1070 .and_then(|v| v.get("drop"))
1071 .and_then(|v| v.as_array())
1072 {
1073 lp.capabilities_drop = arr
1074 .iter()
1075 .filter_map(|v| v.as_str().map(String::from))
1076 .collect();
1077 }
1078 if let Some(arr) = value.get("devices").and_then(|v| v.as_array()) {
1079 lp.devices = arr.iter().filter_map(parse_device).collect();
1080 }
1081 lp.init_process_enabled = value
1082 .get("initProcessEnabled")
1083 .and_then(|v| v.as_bool())
1084 .unwrap_or(false);
1085 lp.shared_memory_size = value
1086 .get("sharedMemorySize")
1087 .and_then(|v| v.as_i64())
1088 .map(|n| n as i32);
1089 if let Some(arr) = value.get("sysctl").and_then(|v| v.as_array()) {
1090 lp.sysctls = arr.iter().filter_map(parse_sysctl).collect();
1091 }
1092 if let Some(arr) = value.get("tmpfs").and_then(|v| v.as_array()) {
1093 lp.tmpfs = arr.iter().filter_map(parse_tmpfs).collect();
1094 }
1095 lp.privileged = value
1096 .get("privileged")
1097 .and_then(|v| v.as_bool())
1098 .unwrap_or(false);
1099 Some(lp)
1100}
1101
1102fn parse_device(value: &serde_json::Value) -> Option<Device> {
1103 let host_path = value.get("hostPath").and_then(|v| v.as_str())?.to_string();
1104 let container_path = value
1105 .get("containerPath")
1106 .and_then(|v| v.as_str())?
1107 .to_string();
1108 let permissions = value
1109 .get("permissions")
1110 .and_then(|v| v.as_str())
1111 .unwrap_or("rwm")
1112 .to_string();
1113 Some(Device {
1114 host_path,
1115 container_path,
1116 permissions,
1117 })
1118}
1119
1120fn parse_sysctl(value: &serde_json::Value) -> Option<Sysctl> {
1121 let name = value.get("name").and_then(|v| v.as_str())?.to_string();
1122 let value_str = value.get("value").and_then(|v| v.as_str())?.to_string();
1123 Some(Sysctl {
1124 name,
1125 value: value_str,
1126 })
1127}
1128
1129fn parse_tmpfs(value: &serde_json::Value) -> Option<Tmpfs> {
1130 let container_path = value
1131 .get("containerPath")
1132 .and_then(|v| v.as_str())?
1133 .to_string();
1134 let size = value
1135 .get("size")
1136 .and_then(|v| v.as_i64())
1137 .filter(|n| *n > 0)? as i32;
1138 let mount_options = value
1139 .get("mountOptions")
1140 .and_then(|v| v.as_array())
1141 .map(|arr| {
1142 arr.iter()
1143 .filter_map(|v| v.as_str().map(String::from))
1144 .collect()
1145 })
1146 .unwrap_or_default();
1147 Some(Tmpfs {
1148 container_path,
1149 size,
1150 mount_options,
1151 })
1152}
1153
1154pub(crate) fn render_health_flags(hc: &HealthCheckSpec) -> Vec<String> {
1161 if hc.command.len() < 2 {
1162 return Vec::new();
1163 }
1164 let cmd_kind = hc.command[0].as_str();
1165 if cmd_kind != "CMD" && cmd_kind != "CMD-SHELL" {
1166 return Vec::new();
1167 }
1168 let cmd_string = hc.command[1..].join(" ");
1169 vec![
1170 "--health-cmd".into(),
1171 cmd_string,
1172 format!("--health-interval={}s", hc.interval_seconds),
1173 format!("--health-timeout={}s", hc.timeout_seconds),
1174 format!("--health-retries={}", hc.retries),
1175 format!("--health-start-period={}s", hc.start_period_seconds),
1176 ]
1177}
1178
/// Test-only shim that forwards to the private `parse_health_check`, so
/// tests elsewhere in the crate can call it.
#[cfg(test)]
pub(crate) fn __test_parse_health_check(value: &serde_json::Value) -> Option<HealthCheckSpec> {
    parse_health_check(value)
}
1186
/// Maps a container health status string to the ECS health status name.
///
/// Surrounding whitespace is ignored and the comparison is ASCII
/// case-insensitive: `healthy` becomes `HEALTHY`, `unhealthy` becomes
/// `UNHEALTHY`, and anything else becomes `UNKNOWN`.
pub(crate) fn docker_health_to_ecs(raw: &str) -> &'static str {
    let status = raw.trim();
    if status.eq_ignore_ascii_case("healthy") {
        "HEALTHY"
    } else if status.eq_ignore_ascii_case("unhealthy") {
        "UNHEALTHY"
    } else {
        "UNKNOWN"
    }
}
1199
1200fn parse_port_mapping(value: &serde_json::Value) -> Option<PortMapping> {
1204 let container_port = value
1205 .get("containerPort")
1206 .and_then(|v| v.as_i64())
1207 .filter(|n| (0..=u16::MAX as i64).contains(n))? as u16;
1208 let host_port_raw = value
1209 .get("hostPort")
1210 .and_then(|v| v.as_i64())
1211 .filter(|n| (0..=u16::MAX as i64).contains(n))
1212 .map(|n| n as u16)
1213 .unwrap_or(0);
1214 let host_port = if host_port_raw == 0 {
1215 container_port
1216 } else {
1217 host_port_raw
1218 };
1219 let protocol = value
1220 .get("protocol")
1221 .and_then(|v| v.as_str())
1222 .map(|s| s.to_ascii_lowercase())
1223 .unwrap_or_else(|| "tcp".to_string());
1224 Some(PortMapping {
1225 container_port,
1226 host_port,
1227 protocol,
1228 })
1229}
1230
/// Returns the `key=value` label that tags containers with the id of the
/// current process: `fakecloud-instance=fakecloud-<pid>`.
pub(crate) fn fakecloud_instance_label() -> String {
    let pid = std::process::id();
    format!("fakecloud-instance=fakecloud-{pid}")
}
1239
1240pub(crate) fn build_run_argv(
1246 plan: &ContainerPlan,
1247 env: &[(String, String)],
1248 task_id: &str,
1249 host_alias: &str,
1250 add_host_arg: Option<&str>,
1251 run_image: &str,
1252 awsvpc_network_ready: bool,
1253) -> Vec<String> {
1254 let mut argv: Vec<String> = Vec::new();
1255 argv.push("run".into());
1256 argv.push("-d".into());
1257 argv.push("--name".into());
1258 argv.push(format!("{}-{}", task_id, plan.container_name));
1259 argv.push("--label".into());
1260 argv.push(format!("fakecloud-ecs-task={}", task_id));
1261 argv.push("--label".into());
1262 argv.push(format!("fakecloud-ecs-container={}", plan.container_name));
1263 argv.push("--label".into());
1270 argv.push(fakecloud_instance_label());
1271 if let Some(arg) = add_host_arg {
1275 argv.push("--add-host".into());
1276 argv.push(arg.to_string());
1277 }
1278 let use_awsvpc_network = plan.network_mode.as_deref() == Some("awsvpc") && awsvpc_network_ready;
1279 if use_awsvpc_network {
1280 argv.push("--network".into());
1281 argv.push(format!("fakecloud-ecs-{}", task_id));
1282 }
1283 let publish_ports = !use_awsvpc_network;
1289 if publish_ports {
1290 for pm in &plan.port_mappings {
1291 argv.push("--publish".into());
1292 argv.push(format!(
1293 "{}:{}/{}",
1294 pm.container_port, pm.host_port, pm.protocol
1295 ));
1296 }
1297 }
1298 if let Some(ref hc) = plan.health_check {
1299 argv.extend(render_health_flags(hc));
1300 }
1301 let http_alias_prefix = format!("http://{host_alias}:");
1302 let https_alias_prefix = format!("https://{host_alias}:");
1303 for (k, v) in env {
1304 let transformed = v
1305 .replace("http://127.0.0.1:", http_alias_prefix.as_str())
1306 .replace("https://127.0.0.1:", https_alias_prefix.as_str())
1307 .replace("http://localhost:", http_alias_prefix.as_str())
1308 .replace("https://localhost:", https_alias_prefix.as_str());
1309 argv.push("-e".into());
1310 argv.push(format!("{}={}", k, transformed));
1311 }
1312 for vm in &plan.volume_mounts {
1317 argv.push("-v".into());
1318 let suffix = if vm.read_only { ":ro" } else { "" };
1319 argv.push(format!("{}:{}{}", vm.source, vm.container_path, suffix));
1320 }
1321 for ul in &plan.ulimits {
1322 argv.push("--ulimit".into());
1323 argv.push(format!("{}={}:{}", ul.name, ul.soft_limit, ul.hard_limit));
1324 }
1325 if let Some(ref lp) = plan.linux_parameters {
1326 for cap in &lp.capabilities_add {
1327 argv.push("--cap-add".into());
1328 argv.push(cap.clone());
1329 }
1330 for cap in &lp.capabilities_drop {
1331 argv.push("--cap-drop".into());
1332 argv.push(cap.clone());
1333 }
1334 for dev in &lp.devices {
1335 argv.push("--device".into());
1336 argv.push(format!(
1337 "{}:{}{}",
1338 dev.host_path, dev.container_path, dev.permissions
1339 ));
1340 }
1341 if lp.init_process_enabled {
1342 argv.push("--init".into());
1343 }
1344 if let Some(size) = lp.shared_memory_size {
1345 argv.push("--shm-size".into());
1346 argv.push(format!("{}m", size));
1347 }
1348 for sys in &lp.sysctls {
1349 argv.push("--sysctl".into());
1350 argv.push(format!("{}={}", sys.name, sys.value));
1351 }
1352 for tmp in &lp.tmpfs {
1353 let mut opts = tmp.mount_options.join(",");
1354 if !opts.is_empty() {
1355 opts = format!(",{}", opts);
1356 }
1357 argv.push("--tmpfs".into());
1358 argv.push(format!("{}:size={}M{}", tmp.container_path, tmp.size, opts));
1359 }
1360 if lp.privileged {
1361 argv.push("--privileged".into());
1362 }
1363 }
1364 if let Some(timeout) = plan.stop_timeout {
1365 argv.push("--stop-timeout".into());
1366 argv.push(format!("{}", timeout));
1367 }
1368 if let Some(ref user) = plan.user {
1369 argv.push("--user".into());
1370 argv.push(user.clone());
1371 }
1372 if let Some(ref wd) = plan.working_directory {
1373 argv.push("--workdir".into());
1374 argv.push(wd.clone());
1375 }
1376 if plan.tty {
1377 argv.push("--tty".into());
1378 }
1379 if plan.interactive {
1380 argv.push("--interactive".into());
1381 }
1382 if plan.readonly_rootfs {
1383 argv.push("--read-only".into());
1384 }
1385 if let Some(first) = plan.entry_point.first() {
1386 argv.push("--entrypoint".into());
1387 argv.push(first.clone());
1388 }
1389 argv.push(run_image.to_string());
1390 for arg in plan.entry_point.iter().skip(1) {
1391 argv.push(arg.clone());
1392 }
1393 for arg in &plan.command {
1394 argv.push(arg.clone());
1395 }
1396 argv
1397}
1398
1399pub(crate) fn network_bindings_for(plan: &ContainerPlan) -> Vec<serde_json::Value> {
1403 if plan.network_mode.as_deref() == Some("awsvpc") {
1404 return Vec::new();
1405 }
1406 plan.port_mappings
1407 .iter()
1408 .map(|pm| {
1409 serde_json::json!({
1410 "bindIP": "0.0.0.0",
1411 "containerPort": pm.container_port,
1412 "hostPort": pm.host_port,
1413 "protocol": pm.protocol,
1414 })
1415 })
1416 .collect()
1417}
1418
1419#[allow(clippy::type_complexity)]
1423pub(crate) fn compute_elbv2_targets(
1424 ecs_state: &crate::state::EcsState,
1425 task: &crate::state::Task,
1426) -> Vec<(String, Vec<(String, Option<i64>)>)> {
1427 let mut result = Vec::new();
1428 let Some(group) = task.group.as_deref() else {
1429 return result;
1430 };
1431 let service_name = group.strip_prefix("service:").unwrap_or(group);
1432 let key = crate::state::EcsState::service_key(&task.cluster_name, service_name);
1433 let Some(service) = ecs_state.services.get(&key) else {
1434 return result;
1435 };
1436
1437 let network_mode = ecs_state
1438 .task_definitions
1439 .get(&task.family)
1440 .and_then(|revs| revs.get(&task.revision))
1441 .and_then(|td| td.network_mode.as_deref());
1442
1443 for lb in &service.load_balancers {
1444 let tg_arn = lb.get("targetGroupArn").and_then(|v| v.as_str());
1445 let container_name = lb.get("containerName").and_then(|v| v.as_str());
1446 let container_port = lb.get("containerPort").and_then(|v| v.as_i64());
1447 let Some(tg_arn) = tg_arn else { continue };
1448 let Some(container_name) = container_name else {
1449 continue;
1450 };
1451
1452 let target_id = if network_mode == Some("awsvpc") {
1453 task.attachments
1454 .iter()
1455 .find(|a| a.attachment_type == "eni")
1456 .and_then(|eni| {
1457 eni.details
1458 .iter()
1459 .find(|d| d.name == "privateIPv4Address")
1460 .map(|d| d.value.clone())
1461 })
1462 } else {
1463 Some("127.0.0.1".to_string())
1464 };
1465
1466 let port = if network_mode == Some("awsvpc") {
1467 container_port
1468 } else {
1469 task.containers
1470 .iter()
1471 .find(|c| c.name == container_name)
1472 .and_then(|c| {
1473 c.network_bindings
1474 .iter()
1475 .find(|nb| {
1476 nb.get("containerPort").and_then(|v| v.as_i64()) == container_port
1477 })
1478 .and_then(|nb| nb.get("hostPort").and_then(|v| v.as_i64()))
1479 })
1480 };
1481
1482 if let Some(id) = target_id {
1483 if let Some(entry) = result.iter_mut().find(|(arn, _)| arn == tg_arn) {
1484 entry.1.push((id, port));
1485 } else {
1486 result.push((tg_arn.to_string(), vec![(id, port)]));
1487 }
1488 }
1489 }
1490 result
1491}
1492
/// Owned copy of selected task fields, produced by `snapshot_task` so the values can be
/// used without holding the shared state lock.
struct TaskSnapshot {
    /// Copy of the task's `task_arn`.
    task_arn: String,
    /// Copy of the task's `cluster_arn`.
    cluster_arn: String,
    /// Copy of the task's `launch_type`.
    launch_type: String,
    /// Copy of the task's `group`, if it has one.
    group: Option<String>,
    /// Copy of the task's `task_definition_arn`.
    task_definition_arn: String,
    /// JSON array with one object per container (`containerArn`, `name`, `image`,
    /// `lastStatus`, `exitCode`, `reason`).
    containers: serde_json::Value,
}
1501
1502fn snapshot_task(state: &SharedEcsState, account_id: &str, task_id: &str) -> Option<TaskSnapshot> {
1503 let accounts = state.read();
1504 let s = accounts.get(account_id)?;
1505 let task = s.tasks.get(task_id)?;
1506 Some(TaskSnapshot {
1507 task_arn: task.task_arn.clone(),
1508 cluster_arn: task.cluster_arn.clone(),
1509 launch_type: task.launch_type.clone(),
1510 group: task.group.clone(),
1511 task_definition_arn: task.task_definition_arn.clone(),
1512 containers: serde_json::Value::Array(
1513 task.containers
1514 .iter()
1515 .map(|c| {
1516 serde_json::json!({
1517 "containerArn": c.container_arn,
1518 "name": c.name,
1519 "image": c.image,
1520 "lastStatus": c.last_status,
1521 "exitCode": c.exit_code,
1522 "reason": c.reason,
1523 })
1524 })
1525 .collect(),
1526 ),
1527 })
1528}
1529
1530fn build_local_registry_docker_config(server_port: u16) -> Option<TempDir> {
1540 let dir = TempDir::new().ok()?;
1541 let auth = base64::engine::general_purpose::STANDARD.encode("AWS:fakecloud-ecs-runtime");
1542 let auths: serde_json::Map<String, serde_json::Value> =
1543 fakecloud_core::container_net::registry_auth_hosts(server_port)
1544 .into_iter()
1545 .map(|host| (host, serde_json::json!({ "auth": auth })))
1546 .collect();
1547 let config = serde_json::json!({ "auths": auths });
1548 std::fs::write(dir.path().join("config.json"), config.to_string()).ok()?;
1549 Some(dir)
1550}
1551
1552fn find_container_definition(
1553 state: &crate::state::EcsState,
1554 family: &str,
1555 revision: i32,
1556 name: &str,
1557) -> Option<serde_json::Value> {
1558 state
1559 .task_definitions
1560 .get(family)?
1561 .get(&revision)?
1562 .container_definitions
1563 .iter()
1564 .find(|c| c.get("name").and_then(|v| v.as_str()) == Some(name))
1565 .cloned()
1566}
1567
1568fn mark_pull_started(state: &SharedEcsState, account_id: &str, task_id: &str) {
1569 let mut accounts = state.write();
1570 let Some(s) = accounts.get_mut(account_id) else {
1571 return;
1572 };
1573 let task_arn_cluster = s
1574 .tasks
1575 .get(task_id)
1576 .map(|t| (t.task_arn.clone(), t.cluster_arn.clone()));
1577 if let Some(task) = s.tasks.get_mut(task_id) {
1578 task.pull_started_at = Some(Utc::now());
1579 }
1580 if let Some((arn, cluster_arn)) = task_arn_cluster {
1581 s.push_event(LifecycleEvent {
1582 at: Utc::now(),
1583 event_type: "PullStarted".into(),
1584 task_arn: Some(arn),
1585 cluster_arn: Some(cluster_arn),
1586 last_status: Some("PENDING".into()),
1587 detail: serde_json::json!({}),
1588 });
1589 }
1590}
1591
1592fn mark_pull_stopped(state: &SharedEcsState, account_id: &str, task_id: &str) {
1593 let mut accounts = state.write();
1594 let Some(s) = accounts.get_mut(account_id) else {
1595 return;
1596 };
1597 if let Some(task) = s.tasks.get_mut(task_id) {
1598 task.pull_stopped_at = Some(Utc::now());
1599 }
1600}
1601
1602pub(crate) fn mark_running_multi(
1603 state: &SharedEcsState,
1604 account_id: &str,
1605 task_id: &str,
1606 started: &[RunningContainer],
1607) {
1608 let mut accounts = state.write();
1609 let Some(s) = accounts.get_mut(account_id) else {
1610 return;
1611 };
1612 let (arn, cluster_arn) = {
1613 let Some(task) = s.tasks.get_mut(task_id) else {
1614 return;
1615 };
1616 task.last_status = "RUNNING".into();
1617 task.connectivity = "CONNECTED".into();
1618 task.connectivity_at = Some(Utc::now());
1619 task.started_at = Some(Utc::now());
1620 for rc in started {
1621 if let Some(c) = task.containers.iter_mut().find(|c| c.name == rc.name) {
1622 c.runtime_id = Some(rc.container_id.clone());
1623 c.last_status = "RUNNING".into();
1624 c.network_bindings = rc.network_bindings.clone();
1625 if rc.image_digest.is_some() {
1626 c.image_digest = rc.image_digest.clone();
1627 }
1628 }
1629 }
1630 if let Some(cluster) = s.clusters.get_mut(&task.cluster_name) {
1631 cluster.running_tasks_count += 1;
1632 if cluster.pending_tasks_count > 0 {
1633 cluster.pending_tasks_count -= 1;
1634 }
1635 }
1636 if let Some(ref ci_arn) = task.container_instance_arn {
1637 if let Some(ci) = s
1638 .container_instances
1639 .values_mut()
1640 .find(|ci| ci.container_instance_arn == *ci_arn)
1641 {
1642 ci.running_tasks_count += 1;
1643 if ci.pending_tasks_count > 0 {
1644 ci.pending_tasks_count -= 1;
1645 }
1646 }
1647 }
1648 (task.task_arn.clone(), task.cluster_arn.clone())
1649 };
1650 s.push_event(LifecycleEvent {
1651 at: Utc::now(),
1652 event_type: "TaskStateChange".into(),
1653 task_arn: Some(arn),
1654 cluster_arn: Some(cluster_arn),
1655 last_status: Some("RUNNING".into()),
1656 detail: serde_json::json!({}),
1657 });
1658}
1659
1660#[allow(clippy::too_many_arguments)]
1661fn finalize_stopped_multi(
1662 state: &SharedEcsState,
1663 account_id: &str,
1664 task_id: &str,
1665 final_containers: &[RunningContainer],
1666 primary_exit_code: i64,
1667 captured: &str,
1668 stop_code: &str,
1669 stopped_reason: Option<String>,
1670) {
1671 let mut accounts = state.write();
1672 let Some(s) = accounts.get_mut(account_id) else {
1673 return;
1674 };
1675 let (arn, cluster_arn) = {
1676 let Some(task) = s.tasks.get_mut(task_id) else {
1677 return;
1678 };
1679 task.last_status = "STOPPED".into();
1680 task.desired_status = "STOPPED".into();
1681 task.stopping_at = task.stopping_at.or(Some(Utc::now()));
1682 task.stopped_at = Some(Utc::now());
1683 task.stop_code = Some(stop_code.into());
1684 task.stopped_reason = stopped_reason.or(Some(format!("Exit code {}", primary_exit_code)));
1685 task.captured_logs = captured.to_string();
1686 for c in task.containers.iter_mut() {
1687 c.last_status = "STOPPED".into();
1688 if c.exit_code.is_none() {
1689 let mapped = final_containers
1690 .iter()
1691 .find(|r| r.name == c.name)
1692 .and_then(|r| r.exit_code);
1693 c.exit_code = mapped.or(Some(primary_exit_code));
1694 }
1695 }
1696 if let Some(cluster) = s.clusters.get_mut(&task.cluster_name) {
1697 if cluster.running_tasks_count > 0 {
1698 cluster.running_tasks_count -= 1;
1699 }
1700 }
1701 if let Some(ref ci_arn) = task.container_instance_arn {
1702 if let Some(ci) = s
1703 .container_instances
1704 .values_mut()
1705 .find(|ci| ci.container_instance_arn == *ci_arn)
1706 {
1707 if ci.running_tasks_count > 0 {
1708 ci.running_tasks_count -= 1;
1709 }
1710 }
1711 }
1712 (task.task_arn.clone(), task.cluster_arn.clone())
1713 };
1714 s.push_event(LifecycleEvent {
1715 at: Utc::now(),
1716 event_type: "TaskStateChange".into(),
1717 task_arn: Some(arn),
1718 cluster_arn: Some(cluster_arn),
1719 last_status: Some("STOPPED".into()),
1720 detail: serde_json::json!({
1721 "exitCode": primary_exit_code,
1722 "stopCode": stop_code,
1723 }),
1724 });
1725}
1726
1727fn finalize_failure(state: &SharedEcsState, account_id: &str, task_id: &str, reason: &str) {
1728 let mut accounts = state.write();
1729 let Some(s) = accounts.get_mut(account_id) else {
1730 return;
1731 };
1732 let (arn, cluster_arn) = {
1733 let Some(task) = s.tasks.get_mut(task_id) else {
1734 return;
1735 };
1736 let was_running = task.last_status == "RUNNING";
1742 task.last_status = "STOPPED".into();
1743 task.desired_status = "STOPPED".into();
1744 task.stopped_at = Some(Utc::now());
1745 task.stop_code = Some("TaskFailedToStart".into());
1746 task.stopped_reason = Some(reason.to_string());
1747 task.captured_logs = format!("[task failed to start]: {reason}");
1751 for c in task.containers.iter_mut() {
1752 c.last_status = "STOPPED".into();
1753 c.reason = Some(reason.to_string());
1754 }
1755 if let Some(cluster) = s.clusters.get_mut(&task.cluster_name) {
1756 if was_running {
1757 if cluster.running_tasks_count > 0 {
1758 cluster.running_tasks_count -= 1;
1759 }
1760 } else if cluster.pending_tasks_count > 0 {
1761 cluster.pending_tasks_count -= 1;
1762 }
1763 }
1764 if let Some(ref ci_arn) = task.container_instance_arn {
1765 if let Some(ci) = s
1766 .container_instances
1767 .values_mut()
1768 .find(|ci| ci.container_instance_arn == *ci_arn)
1769 {
1770 if was_running {
1771 if ci.running_tasks_count > 0 {
1772 ci.running_tasks_count -= 1;
1773 }
1774 } else if ci.pending_tasks_count > 0 {
1775 ci.pending_tasks_count -= 1;
1776 }
1777 }
1778 }
1779 (task.task_arn.clone(), task.cluster_arn.clone())
1780 };
1781 s.push_event(LifecycleEvent {
1782 at: Utc::now(),
1783 event_type: "TaskFailedToStart".into(),
1784 task_arn: Some(arn),
1785 cluster_arn: Some(cluster_arn),
1786 last_status: Some("STOPPED".into()),
1787 detail: serde_json::json!({ "reason": reason }),
1788 });
1789}
1790
/// Asynchronously waits for `duration` by awaiting `tokio::time::sleep`.
pub async fn sleep(duration: Duration) {
    tokio::time::sleep(duration).await;
}
1797
1798#[cfg(test)]
1799mod tests {
1800 use super::*;
1801 use crate::state::{EcsState, Task};
1802 use fakecloud_aws::arn::Arn;
1803 use fakecloud_core::multi_account::MultiAccountState;
1804 use parking_lot::RwLock;
1805 use std::sync::Arc;
1806
1807 #[test]
1808 fn cli_available_for_known_missing_binary_is_false() {
1809 assert!(!fakecloud_core::container_net::cli_available(
1810 "definitely-not-a-real-cli-binary-xyz"
1811 ));
1812 }
1813
1814 #[test]
1815 fn aws_ecr_uris_translate_for_local_pull() {
1816 assert_eq!(
1817 fakecloud_core::ecr_uri::translate_to_local(
1818 "123456789012.dkr.ecr.us-east-1.amazonaws.com/app:latest",
1819 4566
1820 )
1821 .as_deref(),
1822 Some("127.0.0.1:4566/app:latest")
1823 );
1824 }
1825
1826 fn make_task(task_id: &str) -> Task {
1827 Task {
1828 task_arn: Arn::new(
1829 "ecs",
1830 "us-east-1",
1831 "000000000000",
1832 &format!("task/default/{task_id}"),
1833 )
1834 .to_string(),
1835 task_id: task_id.into(),
1836 cluster_arn: "arn:aws:ecs:us-east-1:000000000000:cluster/default".into(),
1837 cluster_name: "default".into(),
1838 task_definition_arn: "arn:aws:ecs:us-east-1:000000000000:task-definition/app:1".into(),
1839 family: "app".into(),
1840 revision: 1,
1841 container_instance_arn: None,
1842 capacity_provider_name: None,
1843 last_status: "PENDING".into(),
1844 desired_status: "RUNNING".into(),
1845 launch_type: "FARGATE".into(),
1846 platform_version: None,
1847 cpu: None,
1848 memory: None,
1849 containers: Vec::new(),
1850 overrides: serde_json::json!({}),
1851 started_by: None,
1852 group: None,
1853 connectivity: "CONNECTING".into(),
1854 stop_code: None,
1855 stopped_reason: None,
1856 created_at: Utc::now(),
1857 started_at: None,
1858 stopping_at: None,
1859 stopped_at: None,
1860 pull_started_at: None,
1861 pull_stopped_at: None,
1862 connectivity_at: None,
1863 started_by_ref_id: None,
1864 execution_role_arn: None,
1865 task_role_arn: None,
1866 tags: Vec::new(),
1867 awslogs: None,
1868 captured_logs: String::new(),
1869 protection: None,
1870 enable_execute_command: false,
1871 attachments: Vec::new(),
1872 volume_configurations: Vec::new(),
1873 task_set_arn: None,
1874 }
1875 }
1876
1877 #[test]
1878 fn finalize_failure_writes_reason_into_captured_logs() {
1879 let mut accounts: MultiAccountState<EcsState> =
1880 MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
1881 let acct = accounts.get_or_create("000000000000");
1882 acct.tasks.insert("t1".into(), make_task("t1"));
1883 let state: SharedEcsState = Arc::new(RwLock::new(accounts));
1884
1885 finalize_failure(
1886 &state,
1887 "000000000000",
1888 "t1",
1889 "failed to resolve secret DB_PASSWORD",
1890 );
1891
1892 let accounts = state.read();
1893 let task = accounts
1894 .get("000000000000")
1895 .unwrap()
1896 .tasks
1897 .get("t1")
1898 .unwrap();
1899 assert_eq!(task.last_status, "STOPPED");
1900 assert_eq!(task.stop_code.as_deref(), Some("TaskFailedToStart"));
1901 assert!(
1902 task.captured_logs
1903 .contains("failed to resolve secret DB_PASSWORD"),
1904 "captured_logs missing reason: {:?}",
1905 task.captured_logs
1906 );
1907 assert!(
1908 task.captured_logs.starts_with("[task failed to start]:"),
1909 "captured_logs missing prefix: {:?}",
1910 task.captured_logs
1911 );
1912 }
1913
1914 #[test]
1919 fn task_desired_stopped_detects_stop_during_launch() {
1920 let mut accounts: MultiAccountState<EcsState> =
1921 MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
1922 let acct = accounts.get_or_create("000000000000");
1923 acct.tasks.insert("running".into(), make_task("running"));
1924 let mut stopping = make_task("stopping");
1925 stopping.desired_status = "STOPPED".into();
1926 acct.tasks.insert("stopping".into(), stopping);
1927 let state: SharedEcsState = Arc::new(RwLock::new(accounts));
1928
1929 assert!(
1930 !task_desired_stopped(&state, "000000000000", "running"),
1931 "a RUNNING task must not be treated as stopped",
1932 );
1933 assert!(
1934 task_desired_stopped(&state, "000000000000", "stopping"),
1935 "a task whose desired_status is STOPPED must be treated as stopped",
1936 );
1937 assert!(
1938 task_desired_stopped(&state, "000000000000", "deleted-mid-launch"),
1939 "a task removed from state mid-launch must be treated as stopped",
1940 );
1941 }
1942
1943 fn make_container(name: &str, essential: bool) -> crate::state::Container {
1944 crate::state::Container {
1945 container_arn: format!(
1946 "arn:aws:ecs:us-east-1:000000000000:container/default/abc/{name}"
1947 ),
1948 name: name.into(),
1949 image: "alpine".into(),
1950 task_arn: "arn:aws:ecs:us-east-1:000000000000:task/default/abc".into(),
1951 last_status: "RUNNING".into(),
1952 exit_code: None,
1953 reason: None,
1954 runtime_id: Some(format!("dockerid-{name}")),
1955 essential,
1956 cpu: None,
1957 memory: None,
1958 memory_reservation: None,
1959 network_bindings: Vec::new(),
1960 network_interfaces: Vec::new(),
1961 health_status: None,
1962 managed_agents: None,
1963 image_digest: None,
1964 }
1965 }
1966
1967 #[test]
1968 fn task_should_stop_when_essential_exits() {
1969 let containers = vec![
1970 RunningContainer {
1971 name: "app".into(),
1972 container_id: "id-app".into(),
1973 essential: true,
1974 exit_code: Some(0),
1975 network_bindings: Vec::new(),
1976 image_digest: None,
1977 },
1978 RunningContainer {
1979 name: "sidecar".into(),
1980 container_id: "id-sc".into(),
1981 essential: false,
1982 exit_code: None,
1983 network_bindings: Vec::new(),
1984 image_digest: None,
1985 },
1986 ];
1987 assert!(task_should_stop(&containers));
1988 }
1989
1990 #[test]
1991 fn task_keeps_running_when_only_non_essential_exits() {
1992 let containers = vec![
1993 RunningContainer {
1994 name: "app".into(),
1995 container_id: "id-app".into(),
1996 essential: true,
1997 exit_code: None,
1998 network_bindings: Vec::new(),
1999 image_digest: None,
2000 },
2001 RunningContainer {
2002 name: "sidecar".into(),
2003 container_id: "id-sc".into(),
2004 essential: false,
2005 exit_code: Some(0),
2006 network_bindings: Vec::new(),
2007 image_digest: None,
2008 },
2009 ];
2010 assert!(!task_should_stop(&containers));
2011 }
2012
2013 #[test]
2014 fn task_stops_when_all_non_essentials_exit() {
2015 let containers = vec![
2016 RunningContainer {
2017 name: "a".into(),
2018 container_id: "id-a".into(),
2019 essential: false,
2020 exit_code: Some(0),
2021 network_bindings: Vec::new(),
2022 image_digest: None,
2023 },
2024 RunningContainer {
2025 name: "b".into(),
2026 container_id: "id-b".into(),
2027 essential: false,
2028 exit_code: Some(1),
2029 network_bindings: Vec::new(),
2030 image_digest: None,
2031 },
2032 ];
2033 assert!(task_should_stop(&containers));
2034 }
2035
2036 #[test]
2037 fn finalize_stopped_multi_assigns_per_container_exit_codes() {
2038 let mut accounts: MultiAccountState<EcsState> =
2039 MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
2040 let acct = accounts.get_or_create("000000000000");
2041 let mut t = make_task("t1");
2042 t.containers = vec![
2043 make_container("app", true),
2044 make_container("sidecar", false),
2045 ];
2046 acct.tasks.insert("t1".into(), t);
2047 let state: SharedEcsState = Arc::new(RwLock::new(accounts));
2048
2049 let final_containers = vec![
2050 RunningContainer {
2051 name: "app".into(),
2052 container_id: "id-app".into(),
2053 essential: true,
2054 exit_code: Some(0),
2055 network_bindings: Vec::new(),
2056 image_digest: None,
2057 },
2058 RunningContainer {
2059 name: "sidecar".into(),
2060 container_id: "id-sc".into(),
2061 essential: false,
2062 exit_code: Some(137),
2063 network_bindings: Vec::new(),
2064 image_digest: None,
2065 },
2066 ];
2067 finalize_stopped_multi(
2068 &state,
2069 "000000000000",
2070 "t1",
2071 &final_containers,
2072 0,
2073 "captured",
2074 "EssentialContainerExited",
2075 None,
2076 );
2077
2078 let accounts = state.read();
2079 let task = accounts
2080 .get("000000000000")
2081 .unwrap()
2082 .tasks
2083 .get("t1")
2084 .unwrap();
2085 assert_eq!(task.last_status, "STOPPED");
2086 assert_eq!(task.stop_code.as_deref(), Some("EssentialContainerExited"));
2087 let app = task.containers.iter().find(|c| c.name == "app").unwrap();
2088 let sc = task
2089 .containers
2090 .iter()
2091 .find(|c| c.name == "sidecar")
2092 .unwrap();
2093 assert_eq!(app.exit_code, Some(0));
2094 assert_eq!(sc.exit_code, Some(137));
2095 assert_eq!(app.last_status, "STOPPED");
2096 assert_eq!(sc.last_status, "STOPPED");
2097 }
2098
2099 fn plan(name: &str, deps: &[&str]) -> ContainerPlan {
2100 ContainerPlan {
2101 container_name: name.into(),
2102 image: "alpine".into(),
2103 env: Vec::new(),
2104 entry_point: Vec::new(),
2105 command: Vec::new(),
2106 secrets_refs: Vec::new(),
2107 essential: true,
2108 has_task_role: false,
2109 port_mappings: Vec::new(),
2110 network_mode: None,
2111 depends_on: deps
2112 .iter()
2113 .map(|s| DependsOn {
2114 container_name: (*s).to_string(),
2115 condition: DependsOnCondition::Start,
2116 })
2117 .collect(),
2118 health_check: None,
2119 volume_mounts: Vec::new(),
2120 ulimits: Vec::new(),
2121 linux_parameters: None,
2122 stop_timeout: None,
2123 user: None,
2124 working_directory: None,
2125 tty: false,
2126 interactive: false,
2127 readonly_rootfs: false,
2128 }
2129 }
2130
2131 #[test]
2132 fn topo_sort_orders_by_depends_on() {
2133 let plans = vec![plan("sidecar", &["app"]), plan("app", &[])];
2136 let ordered = topo_sort_plans(plans);
2137 assert_eq!(ordered[0].container_name, "app");
2138 assert_eq!(ordered[1].container_name, "sidecar");
2139 }
2140
2141 #[test]
2142 fn topo_sort_preserves_declaration_order_when_no_deps() {
2143 let plans = vec![plan("first", &[]), plan("second", &[]), plan("third", &[])];
2144 let ordered = topo_sort_plans(plans);
2145 let names: Vec<&str> = ordered.iter().map(|p| p.container_name.as_str()).collect();
2146 assert_eq!(names, vec!["first", "second", "third"]);
2147 }
2148
2149 #[test]
2150 fn topo_sort_handles_chain() {
2151 let plans = vec![plan("c", &["b"]), plan("b", &["a"]), plan("a", &[])];
2154 let ordered = topo_sort_plans(plans);
2155 let names: Vec<&str> = ordered.iter().map(|p| p.container_name.as_str()).collect();
2156 assert_eq!(names, vec!["a", "b", "c"]);
2157 }
2158
2159 #[test]
2160 fn topo_sort_ignores_unknown_dependency() {
2161 let plans = vec![plan("only", &["does-not-exist"])];
2165 let ordered = topo_sort_plans(plans);
2166 assert_eq!(ordered.len(), 1);
2167 assert_eq!(ordered[0].container_name, "only");
2168 }
2169
2170 #[test]
2171 fn topo_sort_recovers_from_cycle() {
2172 let plans = vec![plan("a", &["b"]), plan("b", &["a"])];
2175 let ordered = topo_sort_plans(plans);
2176 assert_eq!(ordered.len(), 2);
2177 }
2178
2179 #[test]
2180 fn parse_health_check_fills_aws_defaults() {
2181 let v = serde_json::json!({
2182 "command": ["CMD-SHELL", "curl -f http://localhost/ || exit 1"],
2183 });
2184 let hc = __test_parse_health_check(&v).expect("parsed");
2185 assert_eq!(hc.command[0], "CMD-SHELL");
2186 assert_eq!(hc.interval_seconds, 30);
2187 assert_eq!(hc.timeout_seconds, 5);
2188 assert_eq!(hc.retries, 3);
2189 assert_eq!(hc.start_period_seconds, 0);
2190 }
2191
2192 #[test]
2193 fn parse_health_check_overrides_explicit_values() {
2194 let v = serde_json::json!({
2195 "command": ["CMD", "/probe"],
2196 "interval": 7,
2197 "timeout": 2,
2198 "retries": 9,
2199 "startPeriod": 12,
2200 });
2201 let hc = __test_parse_health_check(&v).expect("parsed");
2202 assert_eq!(hc.interval_seconds, 7);
2203 assert_eq!(hc.timeout_seconds, 2);
2204 assert_eq!(hc.retries, 9);
2205 assert_eq!(hc.start_period_seconds, 12);
2206 }
2207
2208 #[test]
2209 fn parse_health_check_returns_none_for_none_sentinel() {
2210 let v = serde_json::json!({ "command": ["NONE"] });
2213 assert!(__test_parse_health_check(&v).is_none());
2214 }
2215
2216 #[test]
2217 fn parse_health_check_returns_none_for_missing_command() {
2218 let v = serde_json::json!({ "interval": 30 });
2219 assert!(__test_parse_health_check(&v).is_none());
2220 }
2221
2222 #[test]
2223 fn render_health_flags_emits_full_set_for_cmd_shell() {
2224 let hc = HealthCheckSpec {
2225 command: vec!["CMD-SHELL".into(), "curl -f http://localhost/".into()],
2226 interval_seconds: 15,
2227 timeout_seconds: 3,
2228 retries: 4,
2229 start_period_seconds: 10,
2230 };
2231 let flags = render_health_flags(&hc);
2232 assert_eq!(flags[0], "--health-cmd");
2233 assert_eq!(flags[1], "curl -f http://localhost/");
2234 assert!(flags.contains(&"--health-interval=15s".to_string()));
2235 assert!(flags.contains(&"--health-timeout=3s".to_string()));
2236 assert!(flags.contains(&"--health-retries=4".to_string()));
2237 assert!(flags.contains(&"--health-start-period=10s".to_string()));
2238 }
2239
2240 #[test]
2241 fn render_health_flags_joins_cmd_argv_with_spaces() {
2242 let hc = HealthCheckSpec {
2245 command: vec![
2246 "CMD".into(),
2247 "/bin/probe".into(),
2248 "--port".into(),
2249 "8080".into(),
2250 ],
2251 interval_seconds: 30,
2252 timeout_seconds: 5,
2253 retries: 3,
2254 start_period_seconds: 0,
2255 };
2256 let flags = render_health_flags(&hc);
2257 assert_eq!(flags[1], "/bin/probe --port 8080");
2258 }
2259
2260 #[test]
2261 fn build_run_argv_emits_health_flags_when_present() {
2262 let plan = ContainerPlan {
2263 container_name: "app".into(),
2264 image: "alpine".into(),
2265 env: Vec::new(),
2266 entry_point: Vec::new(),
2267 command: Vec::new(),
2268 secrets_refs: Vec::new(),
2269 essential: true,
2270 has_task_role: false,
2271 port_mappings: Vec::new(),
2272 network_mode: None,
2273 depends_on: Vec::new(),
2274 health_check: Some(HealthCheckSpec {
2275 command: vec!["CMD-SHELL".into(), "true".into()],
2276 interval_seconds: 5,
2277 timeout_seconds: 2,
2278 retries: 1,
2279 start_period_seconds: 1,
2280 }),
2281 volume_mounts: Vec::new(),
2282 ulimits: Vec::new(),
2283 linux_parameters: None,
2284 stop_timeout: None,
2285 user: None,
2286 working_directory: None,
2287 tty: false,
2288 interactive: false,
2289 readonly_rootfs: false,
2290 };
2291 let argv = build_run_argv(
2292 &plan,
2293 &[],
2294 "task-1",
2295 "host.docker.internal",
2296 None,
2297 "alpine",
2298 true,
2299 );
2300 let joined = argv.join(" ");
2301 assert!(joined.contains("--health-cmd true"), "argv: {joined}");
2302 assert!(joined.contains("--health-interval=5s"), "argv: {joined}");
2303 assert!(joined.contains("--health-timeout=2s"), "argv: {joined}");
2304 assert!(joined.contains("--health-retries=1"), "argv: {joined}");
2305 assert!(
2306 joined.contains("--health-start-period=1s"),
2307 "argv: {joined}"
2308 );
2309 }
2310
2311 #[test]
2312 fn build_run_argv_emits_no_health_flags_when_absent() {
2313 let plan = ContainerPlan {
2314 container_name: "app".into(),
2315 image: "alpine".into(),
2316 env: Vec::new(),
2317 entry_point: Vec::new(),
2318 command: Vec::new(),
2319 secrets_refs: Vec::new(),
2320 essential: true,
2321 has_task_role: false,
2322 port_mappings: Vec::new(),
2323 network_mode: None,
2324 depends_on: Vec::new(),
2325 health_check: None,
2326 volume_mounts: Vec::new(),
2327 ulimits: Vec::new(),
2328 linux_parameters: None,
2329 stop_timeout: None,
2330 user: None,
2331 working_directory: None,
2332 tty: false,
2333 interactive: false,
2334 readonly_rootfs: false,
2335 };
2336 let argv = build_run_argv(
2337 &plan,
2338 &[],
2339 "task-1",
2340 "host.docker.internal",
2341 None,
2342 "alpine",
2343 true,
2344 );
2345 assert!(!argv.iter().any(|s| s.starts_with("--health")));
2346 }
2347
2348 #[test]
2349 fn docker_health_to_ecs_maps_known_states() {
2350 assert_eq!(docker_health_to_ecs("healthy"), "HEALTHY");
2351 assert_eq!(docker_health_to_ecs("HEALTHY"), "HEALTHY");
2352 assert_eq!(docker_health_to_ecs("unhealthy"), "UNHEALTHY");
2353 assert_eq!(docker_health_to_ecs("starting"), "UNKNOWN");
2354 assert_eq!(docker_health_to_ecs("none"), "UNKNOWN");
2355 assert_eq!(docker_health_to_ecs(""), "UNKNOWN");
2356 }
2357
2358 #[test]
2361 fn resolve_host_bind_volume_uses_source_path() {
2362 let mut volumes = std::collections::HashMap::new();
2363 let v = serde_json::json!({
2364 "name": "data",
2365 "host": { "sourcePath": "/var/lib/myapp" }
2366 });
2367 volumes.insert("data".to_string(), &v);
2368 let mp = serde_json::json!({
2369 "sourceVolume": "data",
2370 "containerPath": "/app/data",
2371 "readOnly": false
2372 });
2373 let resolved = resolve_mount_point(&mp, &volumes).expect("resolved");
2374 assert_eq!(resolved.source, "/var/lib/myapp");
2375 assert_eq!(resolved.container_path, "/app/data");
2376 assert!(!resolved.read_only);
2377 }
2378
2379 #[test]
2382 fn read_only_mount_renders_ro_suffix() {
2383 let plan = ContainerPlan {
2384 container_name: "app".into(),
2385 image: "alpine".into(),
2386 env: Vec::new(),
2387 entry_point: Vec::new(),
2388 command: Vec::new(),
2389 secrets_refs: Vec::new(),
2390 essential: true,
2391 has_task_role: false,
2392 port_mappings: Vec::new(),
2393 network_mode: None,
2394 depends_on: Vec::new(),
2395 health_check: None,
2396 volume_mounts: vec![VolumeMount {
2397 source: "/host/path".into(),
2398 container_path: "/in/container".into(),
2399 read_only: true,
2400 }],
2401 ulimits: Vec::new(),
2402 linux_parameters: None,
2403 stop_timeout: None,
2404 user: None,
2405 working_directory: None,
2406 tty: false,
2407 interactive: false,
2408 readonly_rootfs: false,
2409 };
2410 let argv = build_run_argv(
2411 &plan,
2412 &[],
2413 "task-1",
2414 "host.docker.internal",
2415 None,
2416 "alpine",
2417 true,
2418 );
2419 let pair = argv
2420 .windows(2)
2421 .find(|w| w[0] == "-v")
2422 .expect("expected -v flag");
2423 assert_eq!(pair[1], "/host/path:/in/container:ro");
2424 }
2425
2426 #[test]
2431 fn resolve_efs_volume_uses_stub_dir() {
2432 let mut volumes = std::collections::HashMap::new();
2433 let v = serde_json::json!({
2434 "name": "efs-vol",
2435 "efsVolumeConfiguration": {
2436 "fileSystemId": "fs-12345678",
2437 "rootDirectory": "/exports/app"
2438 }
2439 });
2440 volumes.insert("efs-vol".to_string(), &v);
2441 let mp = serde_json::json!({
2442 "sourceVolume": "efs-vol",
2443 "containerPath": "/mnt/efs"
2444 });
2445 let resolved = resolve_mount_point(&mp, &volumes).expect("resolved");
2446 assert_eq!(resolved.source, "fakecloud-efs-fs-12345678-exports-app");
2449 assert_eq!(resolved.container_path, "/mnt/efs");
2450 }
2451
2452 #[test]
2456 fn efs_without_root_directory_uses_filesystem_root() {
2457 assert_eq!(
2460 shared_volume_name("efs", "fs-abc", "/"),
2461 "fakecloud-efs-fs-abc"
2462 );
2463 assert_eq!(
2464 shared_volume_name("efs", "fs-abc", ""),
2465 "fakecloud-efs-fs-abc"
2466 );
2467 }
2468
2469 #[test]
2473 fn resolve_docker_named_volume_uses_volume_name() {
2474 let mut volumes = std::collections::HashMap::new();
2475 let v = serde_json::json!({
2476 "name": "named-vol",
2477 "dockerVolumeConfiguration": {
2478 "scope": "task",
2479 "driver": "local"
2480 }
2481 });
2482 volumes.insert("named-vol".to_string(), &v);
2483 let mp = serde_json::json!({
2484 "sourceVolume": "named-vol",
2485 "containerPath": "/data"
2486 });
2487 let resolved = resolve_mount_point(&mp, &volumes).expect("resolved");
2488 assert_eq!(resolved.source, "named-vol");
2489 assert_eq!(resolved.container_path, "/data");
2490 }
2491
2492 #[test]
2495 fn resolve_fsx_volume_uses_stub_dir() {
2496 let mut volumes = std::collections::HashMap::new();
2497 let v = serde_json::json!({
2498 "name": "fsx-vol",
2499 "fsxWindowsFileServerVolumeConfiguration": {
2500 "fileSystemId": "fs-xyz",
2501 "rootDirectory": "share"
2502 }
2503 });
2504 volumes.insert("fsx-vol".to_string(), &v);
2505 let mp = serde_json::json!({
2506 "sourceVolume": "fsx-vol",
2507 "containerPath": "C:\\data"
2508 });
2509 let resolved = resolve_mount_point(&mp, &volumes).expect("resolved");
2510 assert_eq!(resolved.source, "fakecloud-fsx-fs-xyz-share");
2512 }
2513
2514 #[test]
2518 fn unknown_source_volume_returns_none() {
2519 let volumes = std::collections::HashMap::new();
2520 let mp = serde_json::json!({
2521 "sourceVolume": "missing",
2522 "containerPath": "/x"
2523 });
2524 assert!(resolve_mount_point(&mp, &volumes).is_none());
2525 }
2526
2527 #[test]
2531 fn find_depends_on_cycle_detects_two_node_cycle() {
2532 let cds = vec![
2533 serde_json::json!({
2534 "name": "a",
2535 "image": "alpine",
2536 "dependsOn": [{"containerName": "b", "condition": "START"}],
2537 }),
2538 serde_json::json!({
2539 "name": "b",
2540 "image": "alpine",
2541 "dependsOn": [{"containerName": "a", "condition": "START"}],
2542 }),
2543 ];
2544 let cycle = find_depends_on_cycle(&cds);
2545 assert!(cycle.is_some(), "expected cycle to be detected");
2546 }
2547
2548 #[test]
2552 fn find_depends_on_cycle_accepts_chain() {
2553 let cds = vec![
2554 serde_json::json!({
2555 "name": "a",
2556 "image": "alpine",
2557 "dependsOn": [{"containerName": "b", "condition": "START"}],
2558 }),
2559 serde_json::json!({
2560 "name": "b",
2561 "image": "alpine",
2562 "dependsOn": [{"containerName": "c", "condition": "START"}],
2563 }),
2564 serde_json::json!({
2565 "name": "c",
2566 "image": "alpine",
2567 }),
2568 ];
2569 assert!(find_depends_on_cycle(&cds).is_none());
2570 }
2571
2572 #[test]
2576 fn find_depends_on_cycle_ignores_unknown_target() {
2577 let cds = vec![serde_json::json!({
2578 "name": "only",
2579 "image": "alpine",
2580 "dependsOn": [{"containerName": "ghost", "condition": "START"}],
2581 })];
2582 assert!(find_depends_on_cycle(&cds).is_none());
2583 }
2584
2585 #[test]
2589 fn condition_is_met_matches_aws_semantics() {
2590 let running = InspectedState {
2591 started: true,
2592 exited: false,
2593 exit_code: 0,
2594 health: None,
2595 };
2596 let exited_ok = InspectedState {
2597 started: true,
2598 exited: true,
2599 exit_code: 0,
2600 health: None,
2601 };
2602 let exited_fail = InspectedState {
2603 started: true,
2604 exited: true,
2605 exit_code: 1,
2606 health: None,
2607 };
2608 let healthy = InspectedState {
2609 started: true,
2610 exited: false,
2611 exit_code: 0,
2612 health: Some("healthy".into()),
2613 };
2614
2615 assert!(condition_is_met(DependsOnCondition::Start, &running));
2618 assert!(condition_is_met(DependsOnCondition::Start, &exited_ok));
2619
2620 assert!(!condition_is_met(DependsOnCondition::Complete, &running));
2622 assert!(condition_is_met(DependsOnCondition::Complete, &exited_ok));
2623 assert!(condition_is_met(DependsOnCondition::Complete, &exited_fail));
2624
2625 assert!(!condition_is_met(DependsOnCondition::Success, &running));
2627 assert!(condition_is_met(DependsOnCondition::Success, &exited_ok));
2628 assert!(!condition_is_met(DependsOnCondition::Success, &exited_fail));
2629
2630 assert!(!condition_is_met(DependsOnCondition::Healthy, &running));
2632 assert!(condition_is_met(DependsOnCondition::Healthy, &healthy));
2633 }
2634
2635 #[test]
2639 fn depends_on_condition_parse_round_trips() {
2640 assert_eq!(
2641 DependsOnCondition::parse("START"),
2642 Some(DependsOnCondition::Start)
2643 );
2644 assert_eq!(
2645 DependsOnCondition::parse("COMPLETE"),
2646 Some(DependsOnCondition::Complete)
2647 );
2648 assert_eq!(
2649 DependsOnCondition::parse("SUCCESS"),
2650 Some(DependsOnCondition::Success)
2651 );
2652 assert_eq!(
2653 DependsOnCondition::parse("HEALTHY"),
2654 Some(DependsOnCondition::Healthy)
2655 );
2656 assert_eq!(DependsOnCondition::parse("start"), None);
2657 assert_eq!(DependsOnCondition::parse("ANY"), None);
2658 }
2659
2660 #[test]
2663 fn build_run_argv_emits_ulimits() {
2664 let plan = ContainerPlan {
2665 container_name: "app".into(),
2666 image: "alpine".into(),
2667 env: Vec::new(),
2668 entry_point: Vec::new(),
2669 command: Vec::new(),
2670 secrets_refs: Vec::new(),
2671 essential: true,
2672 has_task_role: false,
2673 port_mappings: Vec::new(),
2674 network_mode: None,
2675 depends_on: Vec::new(),
2676 health_check: None,
2677 volume_mounts: Vec::new(),
2678 ulimits: vec![Ulimit {
2679 name: "nofile".into(),
2680 soft_limit: 1024,
2681 hard_limit: 2048,
2682 }],
2683 linux_parameters: None,
2684 stop_timeout: None,
2685 user: None,
2686 working_directory: None,
2687 tty: false,
2688 interactive: false,
2689 readonly_rootfs: false,
2690 };
2691 let argv = build_run_argv(&plan, &[], "t", "host.docker.internal", None, "img", true);
2692 assert!(argv.contains(&"--ulimit".to_string()));
2693 assert!(argv.contains(&"nofile=1024:2048".to_string()));
2694 }
2695
2696 #[test]
2697 fn build_run_argv_emits_linux_parameters() {
2698 let plan = ContainerPlan {
2699 container_name: "app".into(),
2700 image: "alpine".into(),
2701 env: Vec::new(),
2702 entry_point: Vec::new(),
2703 command: Vec::new(),
2704 secrets_refs: Vec::new(),
2705 essential: true,
2706 has_task_role: false,
2707 port_mappings: Vec::new(),
2708 network_mode: None,
2709 depends_on: Vec::new(),
2710 health_check: None,
2711 volume_mounts: Vec::new(),
2712 ulimits: Vec::new(),
2713 linux_parameters: Some(LinuxParameters {
2714 capabilities_add: vec!["NET_ADMIN".into()],
2715 capabilities_drop: vec!["ALL".into()],
2716 devices: vec![Device {
2717 host_path: "/dev/zero".into(),
2718 container_path: "/dev/zero".into(),
2719 permissions: "rwm".into(),
2720 }],
2721 init_process_enabled: true,
2722 shared_memory_size: Some(256),
2723 sysctls: vec![Sysctl {
2724 name: "net.ipv4.ip_forward".into(),
2725 value: "1".into(),
2726 }],
2727 tmpfs: vec![Tmpfs {
2728 container_path: "/tmp".into(),
2729 size: 128,
2730 mount_options: vec!["noexec".into()],
2731 }],
2732 privileged: true,
2733 }),
2734 stop_timeout: Some(30),
2735 user: Some("1000:1000".into()),
2736 working_directory: Some("/app".into()),
2737 tty: true,
2738 interactive: true,
2739 readonly_rootfs: true,
2740 };
2741 let argv = build_run_argv(&plan, &[], "t", "host.docker.internal", None, "img", true);
2742 assert!(argv.contains(&"--cap-add".to_string()));
2743 assert!(argv.contains(&"NET_ADMIN".to_string()));
2744 assert!(argv.contains(&"--cap-drop".to_string()));
2745 assert!(argv.contains(&"ALL".to_string()));
2746 assert!(argv.contains(&"--device".to_string()));
2747 assert!(argv.contains(&"/dev/zero:/dev/zerorwm".to_string()));
2748 assert!(argv.contains(&"--init".to_string()));
2749 assert!(argv.contains(&"--shm-size".to_string()));
2750 assert!(argv.contains(&"256m".to_string()));
2751 assert!(argv.contains(&"--sysctl".to_string()));
2752 assert!(argv.contains(&"net.ipv4.ip_forward=1".to_string()));
2753 assert!(argv.contains(&"--tmpfs".to_string()));
2754 assert!(argv.contains(&"--privileged".to_string()));
2755 assert!(argv.contains(&"--stop-timeout".to_string()));
2756 assert!(argv.contains(&"30".to_string()));
2757 assert!(argv.contains(&"--user".to_string()));
2758 assert!(argv.contains(&"1000:1000".to_string()));
2759 assert!(argv.contains(&"--workdir".to_string()));
2760 assert!(argv.contains(&"/app".to_string()));
2761 assert!(argv.contains(&"--tty".to_string()));
2762 assert!(argv.contains(&"--interactive".to_string()));
2763 assert!(argv.contains(&"--read-only".to_string()));
2764 }
2765
2766 #[test]
2767 fn parse_linux_parameters_fills_defaults() {
2768 let raw = serde_json::json!({"initProcessEnabled": true});
2769 let lp = parse_linux_parameters(&raw).expect("parses");
2770 assert!(lp.init_process_enabled);
2771 assert!(!lp.privileged);
2772 assert!(lp.capabilities_add.is_empty());
2773 }
2774
2775 #[test]
2776 fn parse_device_uses_default_permissions() {
2777 let raw = serde_json::json!({"hostPath": "/dev/null", "containerPath": "/dev/null"});
2778 let dev = parse_device(&raw).expect("parses");
2779 assert_eq!(dev.permissions, "rwm");
2780 }
2781
2782 #[test]
2783 fn compute_elbv2_targets_empty_when_no_group() {
2784 let mut accounts: MultiAccountState<EcsState> =
2785 MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
2786 let acct = accounts.get_or_create("000000000000");
2787 let mut task = make_task("t1");
2788 task.group = None;
2789 acct.tasks.insert("t1".into(), task);
2790 let state = acct.clone();
2791 let targets = compute_elbv2_targets(&state, state.tasks.get("t1").unwrap());
2792 assert!(targets.is_empty());
2793 }
2794
2795 #[test]
2796 fn compute_elbv2_targets_bridge_mode_uses_localhost_and_host_port() {
2797 let mut accounts: MultiAccountState<EcsState> =
2798 MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
2799 let acct = accounts.get_or_create("000000000000");
2800
2801 let td = crate::state::TaskDefinition {
2802 family: "app".into(),
2803 revision: 1,
2804 task_definition_arn: "arn:aws:ecs:us-east-1:000000000000:task-definition/app:1".into(),
2805 container_definitions: Vec::new(),
2806 network_mode: Some("bridge".into()),
2807 status: "ACTIVE".into(),
2808 task_role_arn: None,
2809 execution_role_arn: None,
2810 requires_compatibilities: Vec::new(),
2811 compatibilities: Vec::new(),
2812 cpu: None,
2813 memory: None,
2814 pid_mode: None,
2815 ipc_mode: None,
2816 volumes: Vec::new(),
2817 placement_constraints: Vec::new(),
2818 proxy_configuration: None,
2819 inference_accelerators: Vec::new(),
2820 ephemeral_storage: None,
2821 runtime_platform: None,
2822 requires_attributes: Vec::new(),
2823 registered_at: Utc::now(),
2824 registered_by: None,
2825 deregistered_at: None,
2826 tags: Vec::new(),
2827 enable_fault_injection: None,
2828 };
2829 acct.task_definitions.insert("app".into(), {
2830 let mut m = std::collections::BTreeMap::new();
2831 m.insert(1, td);
2832 m
2833 });
2834
2835 let service = crate::state::Service {
2836 service_name: "svc".into(),
2837 service_arn: "arn:aws:ecs:us-east-1:000000000000:service/default/svc".into(),
2838 cluster_name: "default".into(),
2839 cluster_arn: "arn:aws:ecs:us-east-1:000000000000:cluster/default".into(),
2840 task_definition_arn: "arn:aws:ecs:us-east-1:000000000000:task-definition/app:1".into(),
2841 family: "app".into(),
2842 revision: 1,
2843 desired_count: 1,
2844 running_count: 0,
2845 pending_count: 0,
2846 launch_type: "FARGATE".into(),
2847 status: "ACTIVE".into(),
2848 scheduling_strategy: "REPLICA".into(),
2849 deployment_controller: "ECS".into(),
2850 minimum_healthy_percent: Some(0),
2851 maximum_percent: Some(200),
2852 circuit_breaker: None,
2853 deployments: Vec::new(),
2854 load_balancers: vec![serde_json::json!({
2855 "targetGroupArn": "arn:aws:elasticloadbalancing:us-east-1:000000000000:targetgroup/tg/abc",
2856 "containerName": "app",
2857 "containerPort": 80,
2858 })],
2859 service_registries: Vec::new(),
2860 placement_constraints: Vec::new(),
2861 placement_strategy: Vec::new(),
2862 network_configuration: None,
2863 volume_configurations: vec![],
2864 tags: Vec::new(),
2865 created_at: Utc::now(),
2866 created_by: None,
2867 role_arn: None,
2868 platform_version: None,
2869 health_check_grace_period_seconds: None,
2870 enable_execute_command: false,
2871 enable_ecs_managed_tags: false,
2872 propagate_tags: None,
2873 capacity_provider_strategy: Vec::new(),
2874 availability_zone_rebalancing: None,
2875 };
2876 acct.services.insert(
2877 crate::state::EcsState::service_key("default", "svc"),
2878 service,
2879 );
2880
2881 let mut task = make_task("t1");
2882 task.group = Some("service:svc".into());
2883 task.containers = vec![crate::state::Container {
2884 container_arn: "arn:aws:ecs:us-east-1:000000000000:container/default/abc/app".into(),
2885 name: "app".into(),
2886 image: "alpine".into(),
2887 task_arn: task.task_arn.clone(),
2888 last_status: "RUNNING".into(),
2889 exit_code: None,
2890 reason: None,
2891 runtime_id: Some("dockerid-app".into()),
2892 essential: true,
2893 cpu: None,
2894 memory: None,
2895 memory_reservation: None,
2896 network_bindings: vec![serde_json::json!({
2897 "bindIP": "0.0.0.0",
2898 "containerPort": 80,
2899 "hostPort": 32768,
2900 "protocol": "tcp",
2901 })],
2902 network_interfaces: Vec::new(),
2903 health_status: None,
2904 managed_agents: None,
2905 image_digest: None,
2906 }];
2907 acct.tasks.insert("t1".into(), task);
2908
2909 let state = acct.clone();
2910 let targets = compute_elbv2_targets(&state, state.tasks.get("t1").unwrap());
2911 assert_eq!(targets.len(), 1);
2912 let (arn, tg_targets) = &targets[0];
2913 assert_eq!(
2914 arn,
2915 "arn:aws:elasticloadbalancing:us-east-1:000000000000:targetgroup/tg/abc"
2916 );
2917 assert_eq!(tg_targets.len(), 1);
2918 assert_eq!(tg_targets[0].0, "127.0.0.1");
2919 assert_eq!(tg_targets[0].1, Some(32768));
2920 }
2921
2922 #[test]
2923 fn compute_elbv2_targets_awsvpc_uses_eni_ip() {
2924 let mut accounts: MultiAccountState<EcsState> =
2925 MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
2926 let acct = accounts.get_or_create("000000000000");
2927
2928 let td = crate::state::TaskDefinition {
2929 family: "app".into(),
2930 revision: 1,
2931 task_definition_arn: "arn:aws:ecs:us-east-1:000000000000:task-definition/app:1".into(),
2932 container_definitions: Vec::new(),
2933 network_mode: Some("awsvpc".into()),
2934 status: "ACTIVE".into(),
2935 task_role_arn: None,
2936 execution_role_arn: None,
2937 requires_compatibilities: Vec::new(),
2938 compatibilities: Vec::new(),
2939 cpu: None,
2940 memory: None,
2941 pid_mode: None,
2942 ipc_mode: None,
2943 volumes: Vec::new(),
2944 placement_constraints: Vec::new(),
2945 proxy_configuration: None,
2946 inference_accelerators: Vec::new(),
2947 ephemeral_storage: None,
2948 runtime_platform: None,
2949 requires_attributes: Vec::new(),
2950 registered_at: Utc::now(),
2951 registered_by: None,
2952 deregistered_at: None,
2953 tags: Vec::new(),
2954 enable_fault_injection: None,
2955 };
2956 acct.task_definitions.insert("app".into(), {
2957 let mut m = std::collections::BTreeMap::new();
2958 m.insert(1, td);
2959 m
2960 });
2961
2962 let service = crate::state::Service {
2963 service_name: "svc".into(),
2964 service_arn: "arn:aws:ecs:us-east-1:000000000000:service/default/svc".into(),
2965 cluster_name: "default".into(),
2966 cluster_arn: "arn:aws:ecs:us-east-1:000000000000:cluster/default".into(),
2967 task_definition_arn: "arn:aws:ecs:us-east-1:000000000000:task-definition/app:1".into(),
2968 family: "app".into(),
2969 revision: 1,
2970 desired_count: 1,
2971 running_count: 0,
2972 pending_count: 0,
2973 launch_type: "FARGATE".into(),
2974 status: "ACTIVE".into(),
2975 scheduling_strategy: "REPLICA".into(),
2976 deployment_controller: "ECS".into(),
2977 minimum_healthy_percent: Some(0),
2978 maximum_percent: Some(200),
2979 circuit_breaker: None,
2980 deployments: Vec::new(),
2981 load_balancers: vec![serde_json::json!({
2982 "targetGroupArn": "arn:aws:elasticloadbalancing:us-east-1:000000000000:targetgroup/tg/abc",
2983 "containerName": "app",
2984 "containerPort": 80,
2985 })],
2986 service_registries: Vec::new(),
2987 placement_constraints: Vec::new(),
2988 placement_strategy: Vec::new(),
2989 network_configuration: None,
2990 volume_configurations: vec![],
2991 tags: Vec::new(),
2992 created_at: Utc::now(),
2993 created_by: None,
2994 role_arn: None,
2995 platform_version: None,
2996 health_check_grace_period_seconds: None,
2997 enable_execute_command: false,
2998 enable_ecs_managed_tags: false,
2999 propagate_tags: None,
3000 capacity_provider_strategy: Vec::new(),
3001 availability_zone_rebalancing: None,
3002 };
3003 acct.services.insert(
3004 crate::state::EcsState::service_key("default", "svc"),
3005 service,
3006 );
3007
3008 let mut task = make_task("t1");
3009 task.group = Some("service:svc".into());
3010 task.attachments = vec![crate::state::TaskAttachment {
3011 id: "eni-123".into(),
3012 attachment_type: "eni".into(),
3013 status: "ATTACHED".into(),
3014 details: vec![
3015 crate::state::AttachmentDetail {
3016 name: "privateIPv4Address".into(),
3017 value: "172.18.0.2".into(),
3018 },
3019 crate::state::AttachmentDetail {
3020 name: "macAddress".into(),
3021 value: "02:42:ac:12:00:02".into(),
3022 },
3023 ],
3024 }];
3025 acct.tasks.insert("t1".into(), task);
3026
3027 let state = acct.clone();
3028 let targets = compute_elbv2_targets(&state, state.tasks.get("t1").unwrap());
3029 assert_eq!(targets.len(), 1);
3030 let (arn, tg_targets) = &targets[0];
3031 assert_eq!(
3032 arn,
3033 "arn:aws:elasticloadbalancing:us-east-1:000000000000:targetgroup/tg/abc"
3034 );
3035 assert_eq!(tg_targets.len(), 1);
3036 assert_eq!(tg_targets[0].0, "172.18.0.2");
3037 assert_eq!(tg_targets[0].1, Some(80));
3038 }
3039
3040 fn minimal_plan() -> ContainerPlan {
3041 ContainerPlan {
3042 container_name: "app".into(),
3043 image: "alpine".into(),
3044 env: Vec::new(),
3045 entry_point: Vec::new(),
3046 command: Vec::new(),
3047 secrets_refs: Vec::new(),
3048 essential: true,
3049 has_task_role: false,
3050 port_mappings: Vec::new(),
3051 network_mode: None,
3052 depends_on: Vec::new(),
3053 health_check: None,
3054 volume_mounts: Vec::new(),
3055 ulimits: Vec::new(),
3056 linux_parameters: None,
3057 stop_timeout: None,
3058 user: None,
3059 working_directory: None,
3060 tty: false,
3061 interactive: false,
3062 readonly_rootfs: false,
3063 }
3064 }
3065
3066 #[test]
3070 fn build_run_argv_emits_fakecloud_instance_label() {
3071 let plan = minimal_plan();
3072 let argv = build_run_argv(
3073 &plan,
3074 &[],
3075 "task-1",
3076 "host.docker.internal",
3077 None,
3078 "alpine",
3079 true,
3080 );
3081 let expected = fakecloud_instance_label();
3082 assert!(
3083 argv.windows(2)
3084 .any(|w| w[0] == "--label" && w[1] == expected),
3085 "argv must contain `--label {expected}`: {argv:?}",
3086 );
3087 }
3088
3089 #[test]
3094 fn fakecloud_instance_label_matches_reaper_format() {
3095 let label = fakecloud_instance_label();
3096 let (key, value) = label.split_once('=').expect("label is key=value");
3097 assert_eq!(key, "fakecloud-instance");
3098 let pid_str = value
3099 .strip_prefix("fakecloud-")
3100 .expect("value starts with fakecloud-");
3101 assert_eq!(
3102 pid_str.parse::<u32>().ok(),
3103 Some(std::process::id()),
3104 "reaper must be able to parse the owning pid out of {label}",
3105 );
3106 }
3107}