1use std::path::PathBuf;
11use std::sync::Arc;
12use std::time::Duration;
13
14use base64::Engine;
15use chrono::Utc;
16use fakecloud_core::delivery::DeliveryBus;
17use fakecloud_logs::ingest::{append_events, IngestEvent};
18use fakecloud_logs::SharedLogsState;
19use fakecloud_secretsmanager::SharedSecretsManagerState;
20use fakecloud_ssm::SharedSsmState;
21use parking_lot::RwLock;
22use tempfile::TempDir;
23use tokio::process::Command;
24
25use crate::state::{LifecycleEvent, SharedEcsState};
26
27#[derive(Debug, thiserror::Error)]
28pub enum RuntimeError {
29 #[error("container CLI not found (tried docker, podman)")]
30 NoCli,
31 #[error("image pull failed: {0}")]
32 ImagePull(String),
33 #[error("container start failed: {0}")]
34 ContainerStart(String),
35 #[error("docker wait failed: {0}")]
36 Wait(String),
37}
38
39pub struct EcsRuntime {
41 cli: String,
42 host_ip: String,
43 server_port: u16,
48 docker_config: Option<Arc<TempDir>>,
53 containers: RwLock<std::collections::HashMap<String, Vec<(String, String)>>>,
58 delivery_bus: Option<Arc<DeliveryBus>>,
62 logs_state: Option<SharedLogsState>,
66 secretsmanager_state: Option<SharedSecretsManagerState>,
69 ssm_state: Option<SharedSsmState>,
72}
73
74mod config;
75mod lb;
76mod monitoring;
77mod secrets;
78mod task_lifecycle;
79
80impl EcsRuntime {
81 pub fn new(server_port: u16) -> Option<Self> {
86 let cli = if let Ok(cli) = std::env::var("FAKECLOUD_CONTAINER_CLI") {
87 if cli_works(&cli) {
88 cli
89 } else {
90 return None;
91 }
92 } else if cli_works("docker") {
93 "docker".to_string()
94 } else if cli_works("podman") {
95 "podman".to_string()
96 } else {
97 return None;
98 };
99 let host_ip = if cfg!(target_os = "linux") {
100 "172.17.0.1".to_string()
101 } else {
102 "host-gateway".to_string()
103 };
104 let docker_config = build_local_registry_docker_config(server_port).map(Arc::new);
105 Some(Self {
106 cli,
107 host_ip,
108 server_port,
109 docker_config,
110 containers: RwLock::new(std::collections::HashMap::new()),
111 delivery_bus: None,
112 logs_state: None,
113 secretsmanager_state: None,
114 ssm_state: None,
115 })
116 }
117
118 pub fn with_delivery_bus(mut self, bus: Arc<DeliveryBus>) -> Self {
121 self.delivery_bus = Some(bus);
122 self
123 }
124
125 pub fn with_logs(mut self, logs: SharedLogsState) -> Self {
128 self.logs_state = Some(logs);
129 self
130 }
131}
132
133#[derive(Clone, Debug)]
135pub(crate) struct ContainerPlan {
136 pub(crate) container_name: String,
137 pub(crate) image: String,
138 pub(crate) env: Vec<(String, String)>,
139 pub(crate) entry_point: Vec<String>,
140 pub(crate) command: Vec<String>,
141 pub(crate) secrets_refs: Vec<(String, String)>,
142 pub(crate) essential: bool,
143 pub(crate) has_task_role: bool,
144 pub(crate) port_mappings: Vec<PortMapping>,
149 pub(crate) network_mode: Option<String>,
154 pub(crate) depends_on: Vec<DependsOn>,
162 pub(crate) health_check: Option<HealthCheckSpec>,
169 pub(crate) volume_mounts: Vec<VolumeMount>,
174 pub(crate) ulimits: Vec<Ulimit>,
177 pub(crate) linux_parameters: Option<LinuxParameters>,
181 pub(crate) stop_timeout: Option<u32>,
183 pub(crate) user: Option<String>,
185 pub(crate) working_directory: Option<String>,
187 pub(crate) tty: bool,
189 pub(crate) interactive: bool,
191 pub(crate) readonly_rootfs: bool,
193}
194
195#[derive(Clone, Debug, PartialEq, Eq)]
201pub(crate) struct DependsOn {
202 pub container_name: String,
203 pub condition: DependsOnCondition,
204}
205
/// Condition a dependency must satisfy; mirrors the `START`, `COMPLETE`,
/// `SUCCESS` and `HEALTHY` strings accepted by `DependsOnCondition::parse`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum DependsOnCondition {
    /// The dependency has started.
    Start,
    /// The dependency has exited, with any exit code.
    Complete,
    /// The dependency has exited with code 0.
    Success,
    /// The dependency reports a `healthy` health status.
    Healthy,
}
223
224impl DependsOnCondition {
225 pub fn parse(raw: &str) -> Option<Self> {
229 match raw {
230 "START" => Some(Self::Start),
231 "COMPLETE" => Some(Self::Complete),
232 "SUCCESS" => Some(Self::Success),
233 "HEALTHY" => Some(Self::Healthy),
234 _ => None,
235 }
236 }
237
238 pub fn as_aws_str(self) -> &'static str {
242 match self {
243 Self::Start => "START",
244 Self::Complete => "COMPLETE",
245 Self::Success => "SUCCESS",
246 Self::Healthy => "HEALTHY",
247 }
248 }
249}
250
/// Container health check as parsed by `parse_health_check`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct HealthCheckSpec {
    /// Raw `command` array; the first element is the kind (`CMD` or
    /// `CMD-SHELL`), the rest is the command itself.
    pub command: Vec<String>,
    /// Seconds between checks (`interval`).
    pub interval_seconds: u32,
    /// Seconds before a single check times out (`timeout`).
    pub timeout_seconds: u32,
    /// Number of retries (`retries`).
    pub retries: u32,
    /// Start period in seconds (`startPeriod`).
    pub start_period_seconds: u32,
}
269
/// One port mapping of a container, as parsed by `parse_port_mapping`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct PortMapping {
    /// Port the process listens on inside the container.
    pub container_port: u16,
    /// Port exposed on the host; the parser substitutes `container_port`
    /// when the definition gives 0 or nothing.
    pub host_port: u16,
    /// Lower-cased protocol name; the parser defaults to `tcp`.
    pub protocol: String,
}
282
/// A resolved mount point, rendered as `-v source:container_path[:ro]`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct VolumeMount {
    /// Host path or volume name, as produced by `resolve_volume_source`.
    pub source: String,
    /// Mount target inside the container (`containerPath`).
    pub container_path: String,
    /// Whether the mount is read-only (`readOnly`).
    pub read_only: bool,
}
318
/// One resource limit, rendered as `--ulimit name=soft:hard`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct Ulimit {
    /// Limit name (`name`).
    pub name: String,
    /// Soft limit (`softLimit`).
    pub soft_limit: i32,
    /// Hard limit (`hardLimit`).
    pub hard_limit: i32,
}
326
/// A host device exposed to the container via `--device`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct Device {
    /// Device path on the host (`hostPath`).
    pub host_path: String,
    /// Device path inside the container (`containerPath`).
    pub container_path: String,
    /// Permission string; `parse_device` defaults it to `rwm`.
    pub permissions: String,
}
334
/// A kernel parameter, rendered as `--sysctl name=value`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct Sysctl {
    /// Parameter name.
    pub name: String,
    /// Parameter value.
    pub value: String,
}
341
342#[derive(Clone, Debug, PartialEq, Eq, Default)]
344pub(crate) struct LinuxParameters {
345 pub capabilities_add: Vec<String>,
346 pub capabilities_drop: Vec<String>,
347 pub devices: Vec<Device>,
348 pub init_process_enabled: bool,
349 pub shared_memory_size: Option<i32>,
350 pub sysctls: Vec<Sysctl>,
351 pub tmpfs: Vec<Tmpfs>,
352 pub privileged: bool,
353}
354
/// A tmpfs mount, rendered as `--tmpfs path:size=<n>M[,options]`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct Tmpfs {
    /// Mount target inside the container (`containerPath`).
    pub container_path: String,
    /// Size, emitted with an `M` suffix; the parser only accepts positive values.
    pub size: i32,
    /// Extra mount options (`mountOptions`).
    pub mount_options: Vec<String>,
}
362
363#[derive(Clone, Debug)]
364struct ResolvedContainerPlan {
365 plan: ContainerPlan,
366 env: Vec<(String, String)>,
367}
368
/// Summary of why a task stopped.
// NOTE(review): produced and consumed outside this part of the file; the
// field notes below are inferred from names and types only, so confirm them.
#[derive(Clone, Debug)]
struct TaskExitOutcome {
    /// Index of the container whose exit ended the task, if any.
    exited_index: Option<usize>,
    /// Exit code reported for the task.
    exit_code: i64,
    /// Stop code string reported for the task.
    stop_code: &'static str,
}
379
380#[derive(Clone, Debug)]
383pub(crate) struct RunningContainer {
384 pub(crate) name: String,
385 pub(crate) container_id: String,
386 pub(crate) essential: bool,
387 pub(crate) exit_code: Option<i64>,
388 pub(crate) network_bindings: Vec<serde_json::Value>,
392 pub(crate) image_digest: Option<String>,
397}
398
399pub(crate) fn task_should_stop(containers: &[RunningContainer]) -> bool {
404 if containers.is_empty() {
405 return true;
406 }
407 let any_essential_exited = containers
408 .iter()
409 .any(|c| c.essential && c.exit_code.is_some());
410 if any_essential_exited {
411 return true;
412 }
413 containers.iter().all(|c| c.exit_code.is_some())
414}
415
416fn build_container_plans(
417 state: &SharedEcsState,
418 account_id: &str,
419 task_id: &str,
420 _server_port: u16,
421) -> Result<Vec<ContainerPlan>, RuntimeError> {
422 let accounts = state.read();
423 let s = accounts
424 .get(account_id)
425 .ok_or_else(|| RuntimeError::ContainerStart("account missing".into()))?;
426 let task = s
427 .tasks
428 .get(task_id)
429 .ok_or_else(|| RuntimeError::ContainerStart("task missing".into()))?;
430 if task.containers.is_empty() {
431 return Err(RuntimeError::ContainerStart(
432 "task has no containers".into(),
433 ));
434 }
435 let has_task_role = task.task_role_arn.is_some();
436 let task_def = s
437 .task_definitions
438 .get(&task.family)
439 .and_then(|revs| revs.get(&task.revision));
440 let network_mode = task_def.and_then(|td| td.network_mode.clone());
441 let volumes_by_name: std::collections::HashMap<String, &serde_json::Value> = task_def
446 .map(|td| {
447 td.volumes
448 .iter()
449 .filter_map(|v| {
450 let name = v.get("name").and_then(|n| n.as_str())?;
451 Some((name.to_string(), v))
452 })
453 .collect()
454 })
455 .unwrap_or_default();
456 let mut plans = Vec::with_capacity(task.containers.len());
457 for container in &task.containers {
458 let def = find_container_definition(s, &task.family, task.revision, &container.name);
459 let secrets_refs = def
460 .as_ref()
461 .and_then(|d| d.get("secrets").and_then(|v| v.as_array()).cloned())
462 .map(|arr| {
463 arr.iter()
464 .filter_map(|e| {
465 let name = e.get("name").and_then(|v| v.as_str())?.to_string();
466 let value_from = e.get("valueFrom").and_then(|v| v.as_str())?.to_string();
467 Some((name, value_from))
468 })
469 .collect::<Vec<_>>()
470 })
471 .unwrap_or_default();
472 let str_array = |key: &str| -> Vec<String> {
473 def.as_ref()
474 .and_then(|d| d.get(key).and_then(|v| v.as_array()).cloned())
475 .map(|arr| {
476 arr.iter()
477 .filter_map(|v| v.as_str().map(String::from))
478 .collect::<Vec<_>>()
479 })
480 .unwrap_or_default()
481 };
482 let env = def
483 .as_ref()
484 .and_then(|d| d.get("environment").and_then(|v| v.as_array()).cloned())
485 .map(|arr| {
486 arr.iter()
487 .filter_map(|e| {
488 let k = e.get("name").and_then(|v| v.as_str())?;
489 let v = e.get("value").and_then(|v| v.as_str()).unwrap_or("");
490 Some((k.to_string(), v.to_string()))
491 })
492 .collect::<Vec<_>>()
493 })
494 .unwrap_or_default();
495 let port_mappings = def
496 .as_ref()
497 .and_then(|d| d.get("portMappings").and_then(|v| v.as_array()).cloned())
498 .map(|arr| {
499 arr.iter()
500 .filter_map(parse_port_mapping)
501 .collect::<Vec<_>>()
502 })
503 .unwrap_or_default();
504 let depends_on = def
505 .as_ref()
506 .and_then(|d| d.get("dependsOn").and_then(|v| v.as_array()).cloned())
507 .map(|arr| {
508 arr.iter()
509 .filter_map(parse_depends_on_entry)
510 .collect::<Vec<_>>()
511 })
512 .unwrap_or_default();
513 let health_check = def
514 .as_ref()
515 .and_then(|d| d.get("healthCheck"))
516 .and_then(parse_health_check);
517 let volume_mounts = def
518 .as_ref()
519 .and_then(|d| d.get("mountPoints").and_then(|v| v.as_array()).cloned())
520 .map(|arr| {
521 arr.iter()
522 .filter_map(|mp| resolve_mount_point(mp, &volumes_by_name))
523 .collect::<Vec<_>>()
524 })
525 .unwrap_or_default();
526 let ulimits = def
527 .as_ref()
528 .and_then(|d| d.get("ulimits").and_then(|v| v.as_array()).cloned())
529 .map(|arr| arr.iter().filter_map(parse_ulimit).collect::<Vec<_>>())
530 .unwrap_or_default();
531 let linux_parameters = def
532 .as_ref()
533 .and_then(|d| d.get("linuxParameters"))
534 .and_then(parse_linux_parameters);
535 let stop_timeout = def.as_ref().and_then(|d| {
536 d.get("stopTimeout")
537 .and_then(|v| v.as_u64())
538 .map(|n| n as u32)
539 });
540 let user = def
541 .as_ref()
542 .and_then(|d| d.get("user").and_then(|v| v.as_str()).map(String::from));
543 let working_directory = def.as_ref().and_then(|d| {
544 d.get("workingDirectory")
545 .and_then(|v| v.as_str())
546 .map(String::from)
547 });
548 let tty = def
549 .as_ref()
550 .and_then(|d| d.get("tty").and_then(|v| v.as_bool()))
551 .unwrap_or(false);
552 let interactive = def
553 .as_ref()
554 .and_then(|d| d.get("interactive").and_then(|v| v.as_bool()))
555 .unwrap_or(false);
556 let readonly_rootfs = def
557 .as_ref()
558 .and_then(|d| d.get("readonlyRootFilesystem").and_then(|v| v.as_bool()))
559 .unwrap_or(false);
560 plans.push(ContainerPlan {
561 container_name: container.name.clone(),
562 image: container.image.clone(),
563 env,
564 entry_point: str_array("entryPoint"),
565 command: str_array("command"),
566 secrets_refs,
567 essential: container.essential,
568 has_task_role,
569 port_mappings,
570 network_mode: network_mode.clone(),
571 depends_on,
572 health_check,
573 volume_mounts,
574 ulimits,
575 linux_parameters,
576 stop_timeout,
577 user,
578 working_directory,
579 tty,
580 interactive,
581 readonly_rootfs,
582 });
583 }
584 let plans = topo_sort_plans(plans);
585 Ok(plans)
586}
587
588fn resolve_mount_point(
596 mount_point: &serde_json::Value,
597 volumes_by_name: &std::collections::HashMap<String, &serde_json::Value>,
598) -> Option<VolumeMount> {
599 let container_path = mount_point
600 .get("containerPath")
601 .and_then(|v| v.as_str())?
602 .to_string();
603 let source_volume = mount_point.get("sourceVolume").and_then(|v| v.as_str())?;
604 let read_only = mount_point
605 .get("readOnly")
606 .and_then(|v| v.as_bool())
607 .unwrap_or(false);
608 let volume = volumes_by_name.get(source_volume)?;
609 let source = resolve_volume_source(source_volume, volume)?;
610 Some(VolumeMount {
611 source,
612 container_path,
613 read_only,
614 })
615}
616
617fn resolve_volume_source(name: &str, volume: &serde_json::Value) -> Option<String> {
636 if let Some(host) = volume.get("host") {
637 if let Some(path) = host.get("sourcePath").and_then(|v| v.as_str()) {
638 if !path.is_empty() {
641 ensure_dir_exists(path);
642 return Some(path.to_string());
643 }
644 }
645 }
646 if let Some(efs) = volume.get("efsVolumeConfiguration") {
647 let fs_id = efs.get("fileSystemId").and_then(|v| v.as_str())?;
648 let root = efs
649 .get("rootDirectory")
650 .and_then(|v| v.as_str())
651 .unwrap_or("/");
652 let path = stub_dir_for("efs", fs_id, root);
653 ensure_dir_exists(&path);
654 return Some(path);
655 }
656 if let Some(fsx) = volume.get("fsxWindowsFileServerVolumeConfiguration") {
657 let fs_id = fsx.get("fileSystemId").and_then(|v| v.as_str())?;
658 let root = fsx
659 .get("rootDirectory")
660 .and_then(|v| v.as_str())
661 .unwrap_or("/");
662 let path = stub_dir_for("fsx", fs_id, root);
663 ensure_dir_exists(&path);
664 return Some(path);
665 }
666 if volume.get("dockerVolumeConfiguration").is_some() {
667 return Some(name.to_string());
670 }
671 Some(name.to_string())
673}
674
/// Builds the local stand-in directory path for a remote file system:
/// `/tmp/fakecloud/{kind}/{fs_id}` plus `root` with its leading slashes
/// removed (nothing is appended when `root` is empty or only slashes).
// NOTE(review): `fs_id` and `root` are interpolated verbatim, so `..`
// segments are not rejected here.
fn stub_dir_for(kind: &str, fs_id: &str, root: &str) -> String {
    let mut path = format!("/tmp/fakecloud/{kind}/{fs_id}");
    let relative = root.trim_start_matches('/');
    if !relative.is_empty() {
        path.push('/');
        path.push_str(relative);
    }
    path
}
687
/// Creates `path` and any missing parents.
///
/// Deliberately best-effort: the result of `create_dir_all` is discarded, so
/// a failure here does not stop the caller.
fn ensure_dir_exists(path: &str) {
    let _ = std::fs::create_dir_all(path);
}
695
696fn parse_depends_on_entry(value: &serde_json::Value) -> Option<DependsOn> {
701 let container_name = value
702 .get("containerName")
703 .and_then(|v| v.as_str())?
704 .to_string();
705 let raw_condition = value.get("condition").and_then(|v| v.as_str())?;
706 let condition = DependsOnCondition::parse(raw_condition)?;
707 Some(DependsOn {
708 container_name,
709 condition,
710 })
711}
712
713fn topo_sort_plans(plans: Vec<ContainerPlan>) -> Vec<ContainerPlan> {
724 use std::collections::{HashMap, HashSet};
725 let names: HashSet<String> = plans.iter().map(|p| p.container_name.clone()).collect();
726 let index: HashMap<String, usize> = plans
727 .iter()
728 .enumerate()
729 .map(|(i, p)| (p.container_name.clone(), i))
730 .collect();
731 let mut in_degree: Vec<usize> = plans
736 .iter()
737 .map(|p| {
738 p.depends_on
739 .iter()
740 .filter(|d| names.contains(&d.container_name))
741 .count()
742 })
743 .collect();
744 let mut dependants: Vec<Vec<usize>> = vec![Vec::new(); plans.len()];
746 for (i, p) in plans.iter().enumerate() {
747 for d in &p.depends_on {
748 if let Some(&di) = index.get(&d.container_name) {
749 dependants[di].push(i);
750 }
751 }
752 }
753 let mut ordered: Vec<ContainerPlan> = Vec::with_capacity(plans.len());
754 let mut emitted: Vec<bool> = vec![false; plans.len()];
755 loop {
756 let next = (0..plans.len()).find(|&i| !emitted[i] && in_degree[i] == 0);
759 match next {
760 Some(i) => {
761 emitted[i] = true;
762 ordered.push(plans[i].clone());
763 for &di in &dependants[i] {
764 if in_degree[di] > 0 {
765 in_degree[di] -= 1;
766 }
767 }
768 }
769 None => break,
770 }
771 }
772 for (i, p) in plans.into_iter().enumerate() {
774 if !emitted[i] {
775 ordered.push(p);
776 }
777 }
778 ordered
779}
780
781pub(crate) fn find_depends_on_cycle(
790 container_definitions: &[serde_json::Value],
791) -> Option<(String, String)> {
792 use std::collections::HashMap;
793
794 let names: Vec<String> = container_definitions
795 .iter()
796 .filter_map(|c| c.get("name").and_then(|n| n.as_str()).map(String::from))
797 .collect();
798 let index: HashMap<&str, usize> = names
799 .iter()
800 .enumerate()
801 .map(|(i, n)| (n.as_str(), i))
802 .collect();
803
804 let mut adj: Vec<Vec<usize>> = vec![Vec::new(); names.len()];
805 for (i, cd) in container_definitions.iter().enumerate() {
806 if i >= names.len() {
807 continue;
808 }
809 let Some(deps) = cd.get("dependsOn").and_then(|v| v.as_array()) else {
810 continue;
811 };
812 for d in deps {
813 let Some(target) = d.get("containerName").and_then(|v| v.as_str()) else {
814 continue;
815 };
816 if let Some(&j) = index.get(target) {
817 adj[i].push(j);
819 }
820 }
821 }
822
823 let mut state = vec![0u8; names.len()];
827 let mut stack: Vec<(usize, usize)> = Vec::new();
828 for start in 0..names.len() {
829 if state[start] != 0 {
830 continue;
831 }
832 stack.clear();
833 stack.push((start, 0));
834 state[start] = 1;
835 while let Some(&(node, next_edge)) = stack.last() {
836 if next_edge < adj[node].len() {
837 let nb = adj[node][next_edge];
838 stack.last_mut().unwrap().1 += 1;
839 match state[nb] {
840 0 => {
841 state[nb] = 1;
842 stack.push((nb, 0));
843 }
844 1 => {
845 return Some((names[node].clone(), names[nb].clone()));
846 }
847 _ => {}
848 }
849 } else {
850 state[node] = 2;
851 stack.pop();
852 }
853 }
854 }
855 None
856}
857
/// Container state as reported by `<cli> inspect`; see
/// `inspect_container_state`.
#[derive(Debug, Clone)]
struct InspectedState {
    /// The container is running, or its status is `exited` or `dead`.
    started: bool,
    /// The container's status is `exited` or `dead`.
    exited: bool,
    /// Reported exit code; `-1` when it could not be parsed.
    exit_code: i64,
    /// Health status string, `None` when the container has no health check.
    health: Option<String>,
}
868
869async fn inspect_container_state(cli: &str, container_id: &str) -> Option<InspectedState> {
873 let format =
876 "{{.State.Status}}|{{.State.Running}}|{{.State.ExitCode}}|{{if .State.Health}}{{.State.Health.Status}}{{else}}<none>{{end}}";
877 let out = Command::new(cli)
878 .args(["inspect", "-f", format, container_id])
879 .output()
880 .await
881 .ok()?;
882 if !out.status.success() {
883 return None;
884 }
885 let raw = String::from_utf8_lossy(&out.stdout).trim().to_string();
886 let parts: Vec<&str> = raw.split('|').collect();
887 if parts.len() < 4 {
888 return None;
889 }
890 let status = parts[0];
891 let running = parts[1] == "true";
892 let exit_code: i64 = parts[2].parse().unwrap_or(-1);
893 let health = match parts[3] {
894 "<none>" | "" => None,
895 other => Some(other.to_string()),
896 };
897 let started = running || status == "exited" || status == "running" || status == "dead";
901 let exited = status == "exited" || status == "dead";
902 Some(InspectedState {
903 started,
904 exited,
905 exit_code,
906 health,
907 })
908}
909
910fn condition_is_met(condition: DependsOnCondition, state: &InspectedState) -> bool {
914 match condition {
915 DependsOnCondition::Start => state.started,
916 DependsOnCondition::Complete => state.exited,
917 DependsOnCondition::Success => state.exited && state.exit_code == 0,
918 DependsOnCondition::Healthy => state.health.as_deref() == Some("healthy"),
919 }
920}
921
/// Test-only entry point that exposes the private `parse_port_mapping` to
/// other modules of the crate.
#[cfg(test)]
pub(crate) fn __test_parse_port_mapping(value: &serde_json::Value) -> Option<PortMapping> {
    parse_port_mapping(value)
}
929
930fn parse_health_check(value: &serde_json::Value) -> Option<HealthCheckSpec> {
936 let cmd_arr = value.get("command")?.as_array()?;
937 let command: Vec<String> = cmd_arr
938 .iter()
939 .filter_map(|v| v.as_str().map(String::from))
940 .collect();
941 if command.is_empty() {
942 return None;
943 }
944 if command.first().map(|s| s.as_str()) == Some("NONE") {
945 return None;
946 }
947 let read_u32 = |key: &str, default: u32| -> u32 {
948 value
949 .get(key)
950 .and_then(|v| v.as_i64())
951 .filter(|n| (0..=u32::MAX as i64).contains(n))
952 .map(|n| n as u32)
953 .unwrap_or(default)
954 };
955 Some(HealthCheckSpec {
956 command,
957 interval_seconds: read_u32("interval", 30),
958 timeout_seconds: read_u32("timeout", 5),
959 retries: read_u32("retries", 3),
960 start_period_seconds: read_u32("startPeriod", 0),
961 })
962}
963
964fn parse_ulimit(value: &serde_json::Value) -> Option<Ulimit> {
966 let name = value.get("name").and_then(|v| v.as_str())?;
967 let soft = value
968 .get("softLimit")
969 .and_then(|v| v.as_i64())
970 .filter(|n| *n >= 0)? as i32;
971 let hard = value
972 .get("hardLimit")
973 .and_then(|v| v.as_i64())
974 .filter(|n| *n >= 0)? as i32;
975 Some(Ulimit {
976 name: name.to_string(),
977 soft_limit: soft,
978 hard_limit: hard,
979 })
980}
981
982fn parse_linux_parameters(value: &serde_json::Value) -> Option<LinuxParameters> {
984 let mut lp = LinuxParameters::default();
985 if let Some(arr) = value
986 .get("capabilities")
987 .and_then(|v| v.get("add"))
988 .and_then(|v| v.as_array())
989 {
990 lp.capabilities_add = arr
991 .iter()
992 .filter_map(|v| v.as_str().map(String::from))
993 .collect();
994 }
995 if let Some(arr) = value
996 .get("capabilities")
997 .and_then(|v| v.get("drop"))
998 .and_then(|v| v.as_array())
999 {
1000 lp.capabilities_drop = arr
1001 .iter()
1002 .filter_map(|v| v.as_str().map(String::from))
1003 .collect();
1004 }
1005 if let Some(arr) = value.get("devices").and_then(|v| v.as_array()) {
1006 lp.devices = arr.iter().filter_map(parse_device).collect();
1007 }
1008 lp.init_process_enabled = value
1009 .get("initProcessEnabled")
1010 .and_then(|v| v.as_bool())
1011 .unwrap_or(false);
1012 lp.shared_memory_size = value
1013 .get("sharedMemorySize")
1014 .and_then(|v| v.as_i64())
1015 .map(|n| n as i32);
1016 if let Some(arr) = value.get("sysctl").and_then(|v| v.as_array()) {
1017 lp.sysctls = arr.iter().filter_map(parse_sysctl).collect();
1018 }
1019 if let Some(arr) = value.get("tmpfs").and_then(|v| v.as_array()) {
1020 lp.tmpfs = arr.iter().filter_map(parse_tmpfs).collect();
1021 }
1022 lp.privileged = value
1023 .get("privileged")
1024 .and_then(|v| v.as_bool())
1025 .unwrap_or(false);
1026 Some(lp)
1027}
1028
1029fn parse_device(value: &serde_json::Value) -> Option<Device> {
1030 let host_path = value.get("hostPath").and_then(|v| v.as_str())?.to_string();
1031 let container_path = value
1032 .get("containerPath")
1033 .and_then(|v| v.as_str())?
1034 .to_string();
1035 let permissions = value
1036 .get("permissions")
1037 .and_then(|v| v.as_str())
1038 .unwrap_or("rwm")
1039 .to_string();
1040 Some(Device {
1041 host_path,
1042 container_path,
1043 permissions,
1044 })
1045}
1046
1047fn parse_sysctl(value: &serde_json::Value) -> Option<Sysctl> {
1048 let name = value.get("name").and_then(|v| v.as_str())?.to_string();
1049 let value_str = value.get("value").and_then(|v| v.as_str())?.to_string();
1050 Some(Sysctl {
1051 name,
1052 value: value_str,
1053 })
1054}
1055
1056fn parse_tmpfs(value: &serde_json::Value) -> Option<Tmpfs> {
1057 let container_path = value
1058 .get("containerPath")
1059 .and_then(|v| v.as_str())?
1060 .to_string();
1061 let size = value
1062 .get("size")
1063 .and_then(|v| v.as_i64())
1064 .filter(|n| *n > 0)? as i32;
1065 let mount_options = value
1066 .get("mountOptions")
1067 .and_then(|v| v.as_array())
1068 .map(|arr| {
1069 arr.iter()
1070 .filter_map(|v| v.as_str().map(String::from))
1071 .collect()
1072 })
1073 .unwrap_or_default();
1074 Some(Tmpfs {
1075 container_path,
1076 size,
1077 mount_options,
1078 })
1079}
1080
1081pub(crate) fn render_health_flags(hc: &HealthCheckSpec) -> Vec<String> {
1088 if hc.command.len() < 2 {
1089 return Vec::new();
1090 }
1091 let cmd_kind = hc.command[0].as_str();
1092 if cmd_kind != "CMD" && cmd_kind != "CMD-SHELL" {
1093 return Vec::new();
1094 }
1095 let cmd_string = hc.command[1..].join(" ");
1096 vec![
1097 "--health-cmd".into(),
1098 cmd_string,
1099 format!("--health-interval={}s", hc.interval_seconds),
1100 format!("--health-timeout={}s", hc.timeout_seconds),
1101 format!("--health-retries={}", hc.retries),
1102 format!("--health-start-period={}s", hc.start_period_seconds),
1103 ]
1104}
1105
/// Test-only entry point that exposes the private `parse_health_check` to
/// other modules of the crate.
#[cfg(test)]
pub(crate) fn __test_parse_health_check(value: &serde_json::Value) -> Option<HealthCheckSpec> {
    parse_health_check(value)
}
1113
/// Maps a container CLI health status to the ECS health status string.
///
/// Surrounding whitespace is ignored and matching is ASCII
/// case-insensitive. `healthy` becomes `HEALTHY`, `unhealthy` becomes
/// `UNHEALTHY`, and anything else (including `starting` and the empty
/// string) becomes `UNKNOWN`.
pub(crate) fn docker_health_to_ecs(raw: &str) -> &'static str {
    let status = raw.trim();
    if status.eq_ignore_ascii_case("healthy") {
        "HEALTHY"
    } else if status.eq_ignore_ascii_case("unhealthy") {
        "UNHEALTHY"
    } else {
        "UNKNOWN"
    }
}
1126
1127fn parse_port_mapping(value: &serde_json::Value) -> Option<PortMapping> {
1131 let container_port = value
1132 .get("containerPort")
1133 .and_then(|v| v.as_i64())
1134 .filter(|n| (0..=u16::MAX as i64).contains(n))? as u16;
1135 let host_port_raw = value
1136 .get("hostPort")
1137 .and_then(|v| v.as_i64())
1138 .filter(|n| (0..=u16::MAX as i64).contains(n))
1139 .map(|n| n as u16)
1140 .unwrap_or(0);
1141 let host_port = if host_port_raw == 0 {
1142 container_port
1143 } else {
1144 host_port_raw
1145 };
1146 let protocol = value
1147 .get("protocol")
1148 .and_then(|v| v.as_str())
1149 .map(|s| s.to_ascii_lowercase())
1150 .unwrap_or_else(|| "tcp".to_string());
1151 Some(PortMapping {
1152 container_port,
1153 host_port,
1154 protocol,
1155 })
1156}
1157
1158pub(crate) fn build_run_argv(
1164 plan: &ContainerPlan,
1165 env: &[(String, String)],
1166 task_id: &str,
1167 host_ip: &str,
1168 run_image: &str,
1169 awsvpc_network_ready: bool,
1170) -> Vec<String> {
1171 let mut argv: Vec<String> = Vec::new();
1172 argv.push("run".into());
1173 argv.push("-d".into());
1174 argv.push("--name".into());
1175 argv.push(format!("{}-{}", task_id, plan.container_name));
1176 argv.push("--label".into());
1177 argv.push(format!("fakecloud-ecs-task={}", task_id));
1178 argv.push("--label".into());
1179 argv.push(format!("fakecloud-ecs-container={}", plan.container_name));
1180 argv.push("--add-host".into());
1181 argv.push(format!("host.docker.internal:{}", host_ip));
1182 let use_awsvpc_network = plan.network_mode.as_deref() == Some("awsvpc") && awsvpc_network_ready;
1183 if use_awsvpc_network {
1184 argv.push("--network".into());
1185 argv.push(format!("fakecloud-ecs-{}", task_id));
1186 }
1187 let publish_ports = !use_awsvpc_network;
1193 if publish_ports {
1194 for pm in &plan.port_mappings {
1195 argv.push("--publish".into());
1196 argv.push(format!(
1197 "{}:{}/{}",
1198 pm.container_port, pm.host_port, pm.protocol
1199 ));
1200 }
1201 }
1202 if let Some(ref hc) = plan.health_check {
1203 argv.extend(render_health_flags(hc));
1204 }
1205 for (k, v) in env {
1206 let transformed = v
1207 .replace("http://127.0.0.1:", "http://host.docker.internal:")
1208 .replace("https://127.0.0.1:", "https://host.docker.internal:")
1209 .replace("http://localhost:", "http://host.docker.internal:")
1210 .replace("https://localhost:", "https://host.docker.internal:");
1211 argv.push("-e".into());
1212 argv.push(format!("{}={}", k, transformed));
1213 }
1214 for vm in &plan.volume_mounts {
1219 argv.push("-v".into());
1220 let suffix = if vm.read_only { ":ro" } else { "" };
1221 argv.push(format!("{}:{}{}", vm.source, vm.container_path, suffix));
1222 }
1223 for ul in &plan.ulimits {
1224 argv.push("--ulimit".into());
1225 argv.push(format!("{}={}:{}", ul.name, ul.soft_limit, ul.hard_limit));
1226 }
1227 if let Some(ref lp) = plan.linux_parameters {
1228 for cap in &lp.capabilities_add {
1229 argv.push("--cap-add".into());
1230 argv.push(cap.clone());
1231 }
1232 for cap in &lp.capabilities_drop {
1233 argv.push("--cap-drop".into());
1234 argv.push(cap.clone());
1235 }
1236 for dev in &lp.devices {
1237 argv.push("--device".into());
1238 argv.push(format!(
1239 "{}:{}{}",
1240 dev.host_path, dev.container_path, dev.permissions
1241 ));
1242 }
1243 if lp.init_process_enabled {
1244 argv.push("--init".into());
1245 }
1246 if let Some(size) = lp.shared_memory_size {
1247 argv.push("--shm-size".into());
1248 argv.push(format!("{}m", size));
1249 }
1250 for sys in &lp.sysctls {
1251 argv.push("--sysctl".into());
1252 argv.push(format!("{}={}", sys.name, sys.value));
1253 }
1254 for tmp in &lp.tmpfs {
1255 let mut opts = tmp.mount_options.join(",");
1256 if !opts.is_empty() {
1257 opts = format!(",{}", opts);
1258 }
1259 argv.push("--tmpfs".into());
1260 argv.push(format!("{}:size={}M{}", tmp.container_path, tmp.size, opts));
1261 }
1262 if lp.privileged {
1263 argv.push("--privileged".into());
1264 }
1265 }
1266 if let Some(timeout) = plan.stop_timeout {
1267 argv.push("--stop-timeout".into());
1268 argv.push(format!("{}", timeout));
1269 }
1270 if let Some(ref user) = plan.user {
1271 argv.push("--user".into());
1272 argv.push(user.clone());
1273 }
1274 if let Some(ref wd) = plan.working_directory {
1275 argv.push("--workdir".into());
1276 argv.push(wd.clone());
1277 }
1278 if plan.tty {
1279 argv.push("--tty".into());
1280 }
1281 if plan.interactive {
1282 argv.push("--interactive".into());
1283 }
1284 if plan.readonly_rootfs {
1285 argv.push("--read-only".into());
1286 }
1287 if let Some(first) = plan.entry_point.first() {
1288 argv.push("--entrypoint".into());
1289 argv.push(first.clone());
1290 }
1291 argv.push(run_image.to_string());
1292 for arg in plan.entry_point.iter().skip(1) {
1293 argv.push(arg.clone());
1294 }
1295 for arg in &plan.command {
1296 argv.push(arg.clone());
1297 }
1298 argv
1299}
1300
1301pub(crate) fn network_bindings_for(plan: &ContainerPlan) -> Vec<serde_json::Value> {
1305 if plan.network_mode.as_deref() == Some("awsvpc") {
1306 return Vec::new();
1307 }
1308 plan.port_mappings
1309 .iter()
1310 .map(|pm| {
1311 serde_json::json!({
1312 "bindIP": "0.0.0.0",
1313 "containerPort": pm.container_port,
1314 "hostPort": pm.host_port,
1315 "protocol": pm.protocol,
1316 })
1317 })
1318 .collect()
1319}
1320
1321#[allow(clippy::type_complexity)]
1325pub(crate) fn compute_elbv2_targets(
1326 ecs_state: &crate::state::EcsState,
1327 task: &crate::state::Task,
1328) -> Vec<(String, Vec<(String, Option<i64>)>)> {
1329 let mut result = Vec::new();
1330 let Some(group) = task.group.as_deref() else {
1331 return result;
1332 };
1333 let service_name = group.strip_prefix("service:").unwrap_or(group);
1334 let key = crate::state::EcsState::service_key(&task.cluster_name, service_name);
1335 let Some(service) = ecs_state.services.get(&key) else {
1336 return result;
1337 };
1338
1339 let network_mode = ecs_state
1340 .task_definitions
1341 .get(&task.family)
1342 .and_then(|revs| revs.get(&task.revision))
1343 .and_then(|td| td.network_mode.as_deref());
1344
1345 for lb in &service.load_balancers {
1346 let tg_arn = lb.get("targetGroupArn").and_then(|v| v.as_str());
1347 let container_name = lb.get("containerName").and_then(|v| v.as_str());
1348 let container_port = lb.get("containerPort").and_then(|v| v.as_i64());
1349 let Some(tg_arn) = tg_arn else { continue };
1350 let Some(container_name) = container_name else {
1351 continue;
1352 };
1353
1354 let target_id = if network_mode == Some("awsvpc") {
1355 task.attachments
1356 .iter()
1357 .find(|a| a.attachment_type == "eni")
1358 .and_then(|eni| {
1359 eni.details
1360 .iter()
1361 .find(|d| d.name == "privateIPv4Address")
1362 .map(|d| d.value.clone())
1363 })
1364 } else {
1365 Some("127.0.0.1".to_string())
1366 };
1367
1368 let port = if network_mode == Some("awsvpc") {
1369 container_port
1370 } else {
1371 task.containers
1372 .iter()
1373 .find(|c| c.name == container_name)
1374 .and_then(|c| {
1375 c.network_bindings
1376 .iter()
1377 .find(|nb| {
1378 nb.get("containerPort").and_then(|v| v.as_i64()) == container_port
1379 })
1380 .and_then(|nb| nb.get("hostPort").and_then(|v| v.as_i64()))
1381 })
1382 };
1383
1384 if let Some(id) = target_id {
1385 if let Some(entry) = result.iter_mut().find(|(arn, _)| arn == tg_arn) {
1386 entry.1.push((id, port));
1387 } else {
1388 result.push((tg_arn.to_string(), vec![(id, port)]));
1389 }
1390 }
1391 }
1392 result
1393}
1394
1395struct TaskSnapshot {
1396 task_arn: String,
1397 cluster_arn: String,
1398 launch_type: String,
1399 group: Option<String>,
1400 task_definition_arn: String,
1401 containers: serde_json::Value,
1402}
1403
1404fn snapshot_task(state: &SharedEcsState, account_id: &str, task_id: &str) -> Option<TaskSnapshot> {
1405 let accounts = state.read();
1406 let s = accounts.get(account_id)?;
1407 let task = s.tasks.get(task_id)?;
1408 Some(TaskSnapshot {
1409 task_arn: task.task_arn.clone(),
1410 cluster_arn: task.cluster_arn.clone(),
1411 launch_type: task.launch_type.clone(),
1412 group: task.group.clone(),
1413 task_definition_arn: task.task_definition_arn.clone(),
1414 containers: serde_json::Value::Array(
1415 task.containers
1416 .iter()
1417 .map(|c| {
1418 serde_json::json!({
1419 "containerArn": c.container_arn,
1420 "name": c.name,
1421 "image": c.image,
1422 "lastStatus": c.last_status,
1423 "exitCode": c.exit_code,
1424 "reason": c.reason,
1425 })
1426 })
1427 .collect(),
1428 ),
1429 })
1430}
1431
/// Reports whether `cli` can be launched and `cli info` exits successfully.
///
/// The probe's stdout and stderr are discarded. Failing to run the command at
/// all (for example because the binary does not exist) counts as "not working".
fn cli_works(cli: &str) -> bool {
    let mut probe = std::process::Command::new(cli);
    probe
        .arg("info")
        .stdout(std::process::Stdio::null())
        .stderr(std::process::Stdio::null());
    match probe.status() {
        Ok(status) => status.success(),
        Err(_) => false,
    }
}
1441
1442fn build_local_registry_docker_config(server_port: u16) -> Option<TempDir> {
1447 let dir = TempDir::new().ok()?;
1448 let auth = base64::engine::general_purpose::STANDARD.encode("AWS:fakecloud-ecs-runtime");
1449 let config = serde_json::json!({
1450 "auths": {
1451 format!("127.0.0.1:{server_port}"): { "auth": auth },
1452 }
1453 });
1454 std::fs::write(dir.path().join("config.json"), config.to_string()).ok()?;
1455 Some(dir)
1456}
1457
1458fn find_container_definition(
1459 state: &crate::state::EcsState,
1460 family: &str,
1461 revision: i32,
1462 name: &str,
1463) -> Option<serde_json::Value> {
1464 state
1465 .task_definitions
1466 .get(family)?
1467 .get(&revision)?
1468 .container_definitions
1469 .iter()
1470 .find(|c| c.get("name").and_then(|v| v.as_str()) == Some(name))
1471 .cloned()
1472}
1473
1474fn mark_pull_started(state: &SharedEcsState, account_id: &str, task_id: &str) {
1475 let mut accounts = state.write();
1476 let Some(s) = accounts.get_mut(account_id) else {
1477 return;
1478 };
1479 let task_arn_cluster = s
1480 .tasks
1481 .get(task_id)
1482 .map(|t| (t.task_arn.clone(), t.cluster_arn.clone()));
1483 if let Some(task) = s.tasks.get_mut(task_id) {
1484 task.pull_started_at = Some(Utc::now());
1485 }
1486 if let Some((arn, cluster_arn)) = task_arn_cluster {
1487 s.push_event(LifecycleEvent {
1488 at: Utc::now(),
1489 event_type: "PullStarted".into(),
1490 task_arn: Some(arn),
1491 cluster_arn: Some(cluster_arn),
1492 last_status: Some("PENDING".into()),
1493 detail: serde_json::json!({}),
1494 });
1495 }
1496}
1497
1498fn mark_pull_stopped(state: &SharedEcsState, account_id: &str, task_id: &str) {
1499 let mut accounts = state.write();
1500 let Some(s) = accounts.get_mut(account_id) else {
1501 return;
1502 };
1503 if let Some(task) = s.tasks.get_mut(task_id) {
1504 task.pull_stopped_at = Some(Utc::now());
1505 }
1506}
1507
1508pub(crate) fn mark_running_multi(
1509 state: &SharedEcsState,
1510 account_id: &str,
1511 task_id: &str,
1512 started: &[RunningContainer],
1513) {
1514 let mut accounts = state.write();
1515 let Some(s) = accounts.get_mut(account_id) else {
1516 return;
1517 };
1518 let (arn, cluster_arn) = {
1519 let Some(task) = s.tasks.get_mut(task_id) else {
1520 return;
1521 };
1522 task.last_status = "RUNNING".into();
1523 task.connectivity = "CONNECTED".into();
1524 task.connectivity_at = Some(Utc::now());
1525 task.started_at = Some(Utc::now());
1526 for rc in started {
1527 if let Some(c) = task.containers.iter_mut().find(|c| c.name == rc.name) {
1528 c.runtime_id = Some(rc.container_id.clone());
1529 c.last_status = "RUNNING".into();
1530 c.network_bindings = rc.network_bindings.clone();
1531 if rc.image_digest.is_some() {
1532 c.image_digest = rc.image_digest.clone();
1533 }
1534 }
1535 }
1536 if let Some(cluster) = s.clusters.get_mut(&task.cluster_name) {
1537 cluster.running_tasks_count += 1;
1538 if cluster.pending_tasks_count > 0 {
1539 cluster.pending_tasks_count -= 1;
1540 }
1541 }
1542 if let Some(ref ci_arn) = task.container_instance_arn {
1543 if let Some(ci) = s
1544 .container_instances
1545 .values_mut()
1546 .find(|ci| ci.container_instance_arn == *ci_arn)
1547 {
1548 ci.running_tasks_count += 1;
1549 if ci.pending_tasks_count > 0 {
1550 ci.pending_tasks_count -= 1;
1551 }
1552 }
1553 }
1554 (task.task_arn.clone(), task.cluster_arn.clone())
1555 };
1556 s.push_event(LifecycleEvent {
1557 at: Utc::now(),
1558 event_type: "TaskStateChange".into(),
1559 task_arn: Some(arn),
1560 cluster_arn: Some(cluster_arn),
1561 last_status: Some("RUNNING".into()),
1562 detail: serde_json::json!({}),
1563 });
1564}
1565
1566#[allow(clippy::too_many_arguments)]
1567fn finalize_stopped_multi(
1568 state: &SharedEcsState,
1569 account_id: &str,
1570 task_id: &str,
1571 final_containers: &[RunningContainer],
1572 primary_exit_code: i64,
1573 captured: &str,
1574 stop_code: &str,
1575 stopped_reason: Option<String>,
1576) {
1577 let mut accounts = state.write();
1578 let Some(s) = accounts.get_mut(account_id) else {
1579 return;
1580 };
1581 let (arn, cluster_arn) = {
1582 let Some(task) = s.tasks.get_mut(task_id) else {
1583 return;
1584 };
1585 task.last_status = "STOPPED".into();
1586 task.desired_status = "STOPPED".into();
1587 task.stopping_at = task.stopping_at.or(Some(Utc::now()));
1588 task.stopped_at = Some(Utc::now());
1589 task.stop_code = Some(stop_code.into());
1590 task.stopped_reason = stopped_reason.or(Some(format!("Exit code {}", primary_exit_code)));
1591 task.captured_logs = captured.to_string();
1592 for c in task.containers.iter_mut() {
1593 c.last_status = "STOPPED".into();
1594 if c.exit_code.is_none() {
1595 let mapped = final_containers
1596 .iter()
1597 .find(|r| r.name == c.name)
1598 .and_then(|r| r.exit_code);
1599 c.exit_code = mapped.or(Some(primary_exit_code));
1600 }
1601 }
1602 if let Some(cluster) = s.clusters.get_mut(&task.cluster_name) {
1603 if cluster.running_tasks_count > 0 {
1604 cluster.running_tasks_count -= 1;
1605 }
1606 }
1607 if let Some(ref ci_arn) = task.container_instance_arn {
1608 if let Some(ci) = s
1609 .container_instances
1610 .values_mut()
1611 .find(|ci| ci.container_instance_arn == *ci_arn)
1612 {
1613 if ci.running_tasks_count > 0 {
1614 ci.running_tasks_count -= 1;
1615 }
1616 }
1617 }
1618 (task.task_arn.clone(), task.cluster_arn.clone())
1619 };
1620 s.push_event(LifecycleEvent {
1621 at: Utc::now(),
1622 event_type: "TaskStateChange".into(),
1623 task_arn: Some(arn),
1624 cluster_arn: Some(cluster_arn),
1625 last_status: Some("STOPPED".into()),
1626 detail: serde_json::json!({
1627 "exitCode": primary_exit_code,
1628 "stopCode": stop_code,
1629 }),
1630 });
1631}
1632
1633fn finalize_failure(state: &SharedEcsState, account_id: &str, task_id: &str, reason: &str) {
1634 let mut accounts = state.write();
1635 let Some(s) = accounts.get_mut(account_id) else {
1636 return;
1637 };
1638 let (arn, cluster_arn) = {
1639 let Some(task) = s.tasks.get_mut(task_id) else {
1640 return;
1641 };
1642 let was_running = task.last_status == "RUNNING";
1648 task.last_status = "STOPPED".into();
1649 task.desired_status = "STOPPED".into();
1650 task.stopped_at = Some(Utc::now());
1651 task.stop_code = Some("TaskFailedToStart".into());
1652 task.stopped_reason = Some(reason.to_string());
1653 task.captured_logs = format!("[task failed to start]: {reason}");
1657 for c in task.containers.iter_mut() {
1658 c.last_status = "STOPPED".into();
1659 c.reason = Some(reason.to_string());
1660 }
1661 if let Some(cluster) = s.clusters.get_mut(&task.cluster_name) {
1662 if was_running {
1663 if cluster.running_tasks_count > 0 {
1664 cluster.running_tasks_count -= 1;
1665 }
1666 } else if cluster.pending_tasks_count > 0 {
1667 cluster.pending_tasks_count -= 1;
1668 }
1669 }
1670 if let Some(ref ci_arn) = task.container_instance_arn {
1671 if let Some(ci) = s
1672 .container_instances
1673 .values_mut()
1674 .find(|ci| ci.container_instance_arn == *ci_arn)
1675 {
1676 if was_running {
1677 if ci.running_tasks_count > 0 {
1678 ci.running_tasks_count -= 1;
1679 }
1680 } else if ci.pending_tasks_count > 0 {
1681 ci.pending_tasks_count -= 1;
1682 }
1683 }
1684 }
1685 (task.task_arn.clone(), task.cluster_arn.clone())
1686 };
1687 s.push_event(LifecycleEvent {
1688 at: Utc::now(),
1689 event_type: "TaskFailedToStart".into(),
1690 task_arn: Some(arn),
1691 cluster_arn: Some(cluster_arn),
1692 last_status: Some("STOPPED".into()),
1693 detail: serde_json::json!({ "reason": reason }),
1694 });
1695}
1696
1697pub async fn sleep(duration: Duration) {
1701 tokio::time::sleep(duration).await;
1702}
1703
1704#[cfg(test)]
1705mod tests {
1706 use super::*;
1707 use crate::state::{EcsState, Task};
1708 use fakecloud_aws::arn::Arn;
1709 use fakecloud_core::multi_account::MultiAccountState;
1710 use parking_lot::RwLock;
1711 use std::sync::Arc;
1712
1713 #[test]
1714 fn cli_works_for_known_missing_binary_is_false() {
1715 assert!(!cli_works("definitely-not-a-real-cli-binary-xyz"));
1716 }
1717
1718 #[test]
1719 fn aws_ecr_uris_translate_for_local_pull() {
1720 assert_eq!(
1721 fakecloud_core::ecr_uri::translate_to_local(
1722 "123456789012.dkr.ecr.us-east-1.amazonaws.com/app:latest",
1723 4566
1724 )
1725 .as_deref(),
1726 Some("127.0.0.1:4566/app:latest")
1727 );
1728 }
1729
1730 fn make_task(task_id: &str) -> Task {
1731 Task {
1732 task_arn: Arn::new(
1733 "ecs",
1734 "us-east-1",
1735 "000000000000",
1736 &format!("task/default/{task_id}"),
1737 )
1738 .to_string(),
1739 task_id: task_id.into(),
1740 cluster_arn: "arn:aws:ecs:us-east-1:000000000000:cluster/default".into(),
1741 cluster_name: "default".into(),
1742 task_definition_arn: "arn:aws:ecs:us-east-1:000000000000:task-definition/app:1".into(),
1743 family: "app".into(),
1744 revision: 1,
1745 container_instance_arn: None,
1746 capacity_provider_name: None,
1747 last_status: "PENDING".into(),
1748 desired_status: "RUNNING".into(),
1749 launch_type: "FARGATE".into(),
1750 platform_version: None,
1751 cpu: None,
1752 memory: None,
1753 containers: Vec::new(),
1754 overrides: serde_json::json!({}),
1755 started_by: None,
1756 group: None,
1757 connectivity: "CONNECTING".into(),
1758 stop_code: None,
1759 stopped_reason: None,
1760 created_at: Utc::now(),
1761 started_at: None,
1762 stopping_at: None,
1763 stopped_at: None,
1764 pull_started_at: None,
1765 pull_stopped_at: None,
1766 connectivity_at: None,
1767 started_by_ref_id: None,
1768 execution_role_arn: None,
1769 task_role_arn: None,
1770 tags: Vec::new(),
1771 awslogs: None,
1772 captured_logs: String::new(),
1773 protection: None,
1774 enable_execute_command: false,
1775 attachments: Vec::new(),
1776 volume_configurations: Vec::new(),
1777 task_set_arn: None,
1778 }
1779 }
1780
1781 #[test]
1782 fn finalize_failure_writes_reason_into_captured_logs() {
1783 let mut accounts: MultiAccountState<EcsState> =
1784 MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
1785 let acct = accounts.get_or_create("000000000000");
1786 acct.tasks.insert("t1".into(), make_task("t1"));
1787 let state: SharedEcsState = Arc::new(RwLock::new(accounts));
1788
1789 finalize_failure(
1790 &state,
1791 "000000000000",
1792 "t1",
1793 "failed to resolve secret DB_PASSWORD",
1794 );
1795
1796 let accounts = state.read();
1797 let task = accounts
1798 .get("000000000000")
1799 .unwrap()
1800 .tasks
1801 .get("t1")
1802 .unwrap();
1803 assert_eq!(task.last_status, "STOPPED");
1804 assert_eq!(task.stop_code.as_deref(), Some("TaskFailedToStart"));
1805 assert!(
1806 task.captured_logs
1807 .contains("failed to resolve secret DB_PASSWORD"),
1808 "captured_logs missing reason: {:?}",
1809 task.captured_logs
1810 );
1811 assert!(
1812 task.captured_logs.starts_with("[task failed to start]:"),
1813 "captured_logs missing prefix: {:?}",
1814 task.captured_logs
1815 );
1816 }
1817
1818 fn make_container(name: &str, essential: bool) -> crate::state::Container {
1819 crate::state::Container {
1820 container_arn: format!(
1821 "arn:aws:ecs:us-east-1:000000000000:container/default/abc/{name}"
1822 ),
1823 name: name.into(),
1824 image: "alpine".into(),
1825 task_arn: "arn:aws:ecs:us-east-1:000000000000:task/default/abc".into(),
1826 last_status: "RUNNING".into(),
1827 exit_code: None,
1828 reason: None,
1829 runtime_id: Some(format!("dockerid-{name}")),
1830 essential,
1831 cpu: None,
1832 memory: None,
1833 memory_reservation: None,
1834 network_bindings: Vec::new(),
1835 network_interfaces: Vec::new(),
1836 health_status: None,
1837 managed_agents: None,
1838 image_digest: None,
1839 }
1840 }
1841
1842 #[test]
1843 fn task_should_stop_when_essential_exits() {
1844 let containers = vec![
1845 RunningContainer {
1846 name: "app".into(),
1847 container_id: "id-app".into(),
1848 essential: true,
1849 exit_code: Some(0),
1850 network_bindings: Vec::new(),
1851 image_digest: None,
1852 },
1853 RunningContainer {
1854 name: "sidecar".into(),
1855 container_id: "id-sc".into(),
1856 essential: false,
1857 exit_code: None,
1858 network_bindings: Vec::new(),
1859 image_digest: None,
1860 },
1861 ];
1862 assert!(task_should_stop(&containers));
1863 }
1864
1865 #[test]
1866 fn task_keeps_running_when_only_non_essential_exits() {
1867 let containers = vec![
1868 RunningContainer {
1869 name: "app".into(),
1870 container_id: "id-app".into(),
1871 essential: true,
1872 exit_code: None,
1873 network_bindings: Vec::new(),
1874 image_digest: None,
1875 },
1876 RunningContainer {
1877 name: "sidecar".into(),
1878 container_id: "id-sc".into(),
1879 essential: false,
1880 exit_code: Some(0),
1881 network_bindings: Vec::new(),
1882 image_digest: None,
1883 },
1884 ];
1885 assert!(!task_should_stop(&containers));
1886 }
1887
1888 #[test]
1889 fn task_stops_when_all_non_essentials_exit() {
1890 let containers = vec![
1891 RunningContainer {
1892 name: "a".into(),
1893 container_id: "id-a".into(),
1894 essential: false,
1895 exit_code: Some(0),
1896 network_bindings: Vec::new(),
1897 image_digest: None,
1898 },
1899 RunningContainer {
1900 name: "b".into(),
1901 container_id: "id-b".into(),
1902 essential: false,
1903 exit_code: Some(1),
1904 network_bindings: Vec::new(),
1905 image_digest: None,
1906 },
1907 ];
1908 assert!(task_should_stop(&containers));
1909 }
1910
1911 #[test]
1912 fn finalize_stopped_multi_assigns_per_container_exit_codes() {
1913 let mut accounts: MultiAccountState<EcsState> =
1914 MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
1915 let acct = accounts.get_or_create("000000000000");
1916 let mut t = make_task("t1");
1917 t.containers = vec![
1918 make_container("app", true),
1919 make_container("sidecar", false),
1920 ];
1921 acct.tasks.insert("t1".into(), t);
1922 let state: SharedEcsState = Arc::new(RwLock::new(accounts));
1923
1924 let final_containers = vec![
1925 RunningContainer {
1926 name: "app".into(),
1927 container_id: "id-app".into(),
1928 essential: true,
1929 exit_code: Some(0),
1930 network_bindings: Vec::new(),
1931 image_digest: None,
1932 },
1933 RunningContainer {
1934 name: "sidecar".into(),
1935 container_id: "id-sc".into(),
1936 essential: false,
1937 exit_code: Some(137),
1938 network_bindings: Vec::new(),
1939 image_digest: None,
1940 },
1941 ];
1942 finalize_stopped_multi(
1943 &state,
1944 "000000000000",
1945 "t1",
1946 &final_containers,
1947 0,
1948 "captured",
1949 "EssentialContainerExited",
1950 None,
1951 );
1952
1953 let accounts = state.read();
1954 let task = accounts
1955 .get("000000000000")
1956 .unwrap()
1957 .tasks
1958 .get("t1")
1959 .unwrap();
1960 assert_eq!(task.last_status, "STOPPED");
1961 assert_eq!(task.stop_code.as_deref(), Some("EssentialContainerExited"));
1962 let app = task.containers.iter().find(|c| c.name == "app").unwrap();
1963 let sc = task
1964 .containers
1965 .iter()
1966 .find(|c| c.name == "sidecar")
1967 .unwrap();
1968 assert_eq!(app.exit_code, Some(0));
1969 assert_eq!(sc.exit_code, Some(137));
1970 assert_eq!(app.last_status, "STOPPED");
1971 assert_eq!(sc.last_status, "STOPPED");
1972 }
1973
1974 fn plan(name: &str, deps: &[&str]) -> ContainerPlan {
1975 ContainerPlan {
1976 container_name: name.into(),
1977 image: "alpine".into(),
1978 env: Vec::new(),
1979 entry_point: Vec::new(),
1980 command: Vec::new(),
1981 secrets_refs: Vec::new(),
1982 essential: true,
1983 has_task_role: false,
1984 port_mappings: Vec::new(),
1985 network_mode: None,
1986 depends_on: deps
1987 .iter()
1988 .map(|s| DependsOn {
1989 container_name: (*s).to_string(),
1990 condition: DependsOnCondition::Start,
1991 })
1992 .collect(),
1993 health_check: None,
1994 volume_mounts: Vec::new(),
1995 ulimits: Vec::new(),
1996 linux_parameters: None,
1997 stop_timeout: None,
1998 user: None,
1999 working_directory: None,
2000 tty: false,
2001 interactive: false,
2002 readonly_rootfs: false,
2003 }
2004 }
2005
2006 #[test]
2007 fn topo_sort_orders_by_depends_on() {
2008 let plans = vec![plan("sidecar", &["app"]), plan("app", &[])];
2011 let ordered = topo_sort_plans(plans);
2012 assert_eq!(ordered[0].container_name, "app");
2013 assert_eq!(ordered[1].container_name, "sidecar");
2014 }
2015
2016 #[test]
2017 fn topo_sort_preserves_declaration_order_when_no_deps() {
2018 let plans = vec![plan("first", &[]), plan("second", &[]), plan("third", &[])];
2019 let ordered = topo_sort_plans(plans);
2020 let names: Vec<&str> = ordered.iter().map(|p| p.container_name.as_str()).collect();
2021 assert_eq!(names, vec!["first", "second", "third"]);
2022 }
2023
2024 #[test]
2025 fn topo_sort_handles_chain() {
2026 let plans = vec![plan("c", &["b"]), plan("b", &["a"]), plan("a", &[])];
2029 let ordered = topo_sort_plans(plans);
2030 let names: Vec<&str> = ordered.iter().map(|p| p.container_name.as_str()).collect();
2031 assert_eq!(names, vec!["a", "b", "c"]);
2032 }
2033
2034 #[test]
2035 fn topo_sort_ignores_unknown_dependency() {
2036 let plans = vec![plan("only", &["does-not-exist"])];
2040 let ordered = topo_sort_plans(plans);
2041 assert_eq!(ordered.len(), 1);
2042 assert_eq!(ordered[0].container_name, "only");
2043 }
2044
2045 #[test]
2046 fn topo_sort_recovers_from_cycle() {
2047 let plans = vec![plan("a", &["b"]), plan("b", &["a"])];
2050 let ordered = topo_sort_plans(plans);
2051 assert_eq!(ordered.len(), 2);
2052 }
2053
2054 #[test]
2055 fn parse_health_check_fills_aws_defaults() {
2056 let v = serde_json::json!({
2057 "command": ["CMD-SHELL", "curl -f http://localhost/ || exit 1"],
2058 });
2059 let hc = __test_parse_health_check(&v).expect("parsed");
2060 assert_eq!(hc.command[0], "CMD-SHELL");
2061 assert_eq!(hc.interval_seconds, 30);
2062 assert_eq!(hc.timeout_seconds, 5);
2063 assert_eq!(hc.retries, 3);
2064 assert_eq!(hc.start_period_seconds, 0);
2065 }
2066
2067 #[test]
2068 fn parse_health_check_overrides_explicit_values() {
2069 let v = serde_json::json!({
2070 "command": ["CMD", "/probe"],
2071 "interval": 7,
2072 "timeout": 2,
2073 "retries": 9,
2074 "startPeriod": 12,
2075 });
2076 let hc = __test_parse_health_check(&v).expect("parsed");
2077 assert_eq!(hc.interval_seconds, 7);
2078 assert_eq!(hc.timeout_seconds, 2);
2079 assert_eq!(hc.retries, 9);
2080 assert_eq!(hc.start_period_seconds, 12);
2081 }
2082
2083 #[test]
2084 fn parse_health_check_returns_none_for_none_sentinel() {
2085 let v = serde_json::json!({ "command": ["NONE"] });
2088 assert!(__test_parse_health_check(&v).is_none());
2089 }
2090
2091 #[test]
2092 fn parse_health_check_returns_none_for_missing_command() {
2093 let v = serde_json::json!({ "interval": 30 });
2094 assert!(__test_parse_health_check(&v).is_none());
2095 }
2096
2097 #[test]
2098 fn render_health_flags_emits_full_set_for_cmd_shell() {
2099 let hc = HealthCheckSpec {
2100 command: vec!["CMD-SHELL".into(), "curl -f http://localhost/".into()],
2101 interval_seconds: 15,
2102 timeout_seconds: 3,
2103 retries: 4,
2104 start_period_seconds: 10,
2105 };
2106 let flags = render_health_flags(&hc);
2107 assert_eq!(flags[0], "--health-cmd");
2108 assert_eq!(flags[1], "curl -f http://localhost/");
2109 assert!(flags.contains(&"--health-interval=15s".to_string()));
2110 assert!(flags.contains(&"--health-timeout=3s".to_string()));
2111 assert!(flags.contains(&"--health-retries=4".to_string()));
2112 assert!(flags.contains(&"--health-start-period=10s".to_string()));
2113 }
2114
2115 #[test]
2116 fn render_health_flags_joins_cmd_argv_with_spaces() {
2117 let hc = HealthCheckSpec {
2120 command: vec![
2121 "CMD".into(),
2122 "/bin/probe".into(),
2123 "--port".into(),
2124 "8080".into(),
2125 ],
2126 interval_seconds: 30,
2127 timeout_seconds: 5,
2128 retries: 3,
2129 start_period_seconds: 0,
2130 };
2131 let flags = render_health_flags(&hc);
2132 assert_eq!(flags[1], "/bin/probe --port 8080");
2133 }
2134
2135 #[test]
2136 fn build_run_argv_emits_health_flags_when_present() {
2137 let plan = ContainerPlan {
2138 container_name: "app".into(),
2139 image: "alpine".into(),
2140 env: Vec::new(),
2141 entry_point: Vec::new(),
2142 command: Vec::new(),
2143 secrets_refs: Vec::new(),
2144 essential: true,
2145 has_task_role: false,
2146 port_mappings: Vec::new(),
2147 network_mode: None,
2148 depends_on: Vec::new(),
2149 health_check: Some(HealthCheckSpec {
2150 command: vec!["CMD-SHELL".into(), "true".into()],
2151 interval_seconds: 5,
2152 timeout_seconds: 2,
2153 retries: 1,
2154 start_period_seconds: 1,
2155 }),
2156 volume_mounts: Vec::new(),
2157 ulimits: Vec::new(),
2158 linux_parameters: None,
2159 stop_timeout: None,
2160 user: None,
2161 working_directory: None,
2162 tty: false,
2163 interactive: false,
2164 readonly_rootfs: false,
2165 };
2166 let argv = build_run_argv(&plan, &[], "task-1", "host-gateway", "alpine", true);
2167 let joined = argv.join(" ");
2168 assert!(joined.contains("--health-cmd true"), "argv: {joined}");
2169 assert!(joined.contains("--health-interval=5s"), "argv: {joined}");
2170 assert!(joined.contains("--health-timeout=2s"), "argv: {joined}");
2171 assert!(joined.contains("--health-retries=1"), "argv: {joined}");
2172 assert!(
2173 joined.contains("--health-start-period=1s"),
2174 "argv: {joined}"
2175 );
2176 }
2177
2178 #[test]
2179 fn build_run_argv_emits_no_health_flags_when_absent() {
2180 let plan = ContainerPlan {
2181 container_name: "app".into(),
2182 image: "alpine".into(),
2183 env: Vec::new(),
2184 entry_point: Vec::new(),
2185 command: Vec::new(),
2186 secrets_refs: Vec::new(),
2187 essential: true,
2188 has_task_role: false,
2189 port_mappings: Vec::new(),
2190 network_mode: None,
2191 depends_on: Vec::new(),
2192 health_check: None,
2193 volume_mounts: Vec::new(),
2194 ulimits: Vec::new(),
2195 linux_parameters: None,
2196 stop_timeout: None,
2197 user: None,
2198 working_directory: None,
2199 tty: false,
2200 interactive: false,
2201 readonly_rootfs: false,
2202 };
2203 let argv = build_run_argv(&plan, &[], "task-1", "host-gateway", "alpine", true);
2204 assert!(!argv.iter().any(|s| s.starts_with("--health")));
2205 }
2206
2207 #[test]
2208 fn docker_health_to_ecs_maps_known_states() {
2209 assert_eq!(docker_health_to_ecs("healthy"), "HEALTHY");
2210 assert_eq!(docker_health_to_ecs("HEALTHY"), "HEALTHY");
2211 assert_eq!(docker_health_to_ecs("unhealthy"), "UNHEALTHY");
2212 assert_eq!(docker_health_to_ecs("starting"), "UNKNOWN");
2213 assert_eq!(docker_health_to_ecs("none"), "UNKNOWN");
2214 assert_eq!(docker_health_to_ecs(""), "UNKNOWN");
2215 }
2216
2217 #[test]
2220 fn resolve_host_bind_volume_uses_source_path() {
2221 let mut volumes = std::collections::HashMap::new();
2222 let v = serde_json::json!({
2223 "name": "data",
2224 "host": { "sourcePath": "/var/lib/myapp" }
2225 });
2226 volumes.insert("data".to_string(), &v);
2227 let mp = serde_json::json!({
2228 "sourceVolume": "data",
2229 "containerPath": "/app/data",
2230 "readOnly": false
2231 });
2232 let resolved = resolve_mount_point(&mp, &volumes).expect("resolved");
2233 assert_eq!(resolved.source, "/var/lib/myapp");
2234 assert_eq!(resolved.container_path, "/app/data");
2235 assert!(!resolved.read_only);
2236 }
2237
2238 #[test]
2241 fn read_only_mount_renders_ro_suffix() {
2242 let plan = ContainerPlan {
2243 container_name: "app".into(),
2244 image: "alpine".into(),
2245 env: Vec::new(),
2246 entry_point: Vec::new(),
2247 command: Vec::new(),
2248 secrets_refs: Vec::new(),
2249 essential: true,
2250 has_task_role: false,
2251 port_mappings: Vec::new(),
2252 network_mode: None,
2253 depends_on: Vec::new(),
2254 health_check: None,
2255 volume_mounts: vec![VolumeMount {
2256 source: "/host/path".into(),
2257 container_path: "/in/container".into(),
2258 read_only: true,
2259 }],
2260 ulimits: Vec::new(),
2261 linux_parameters: None,
2262 stop_timeout: None,
2263 user: None,
2264 working_directory: None,
2265 tty: false,
2266 interactive: false,
2267 readonly_rootfs: false,
2268 };
2269 let argv = build_run_argv(&plan, &[], "task-1", "host-gateway", "alpine", true);
2270 let pair = argv
2271 .windows(2)
2272 .find(|w| w[0] == "-v")
2273 .expect("expected -v flag");
2274 assert_eq!(pair[1], "/host/path:/in/container:ro");
2275 }
2276
2277 #[test]
2282 fn resolve_efs_volume_uses_stub_dir() {
2283 let mut volumes = std::collections::HashMap::new();
2284 let v = serde_json::json!({
2285 "name": "efs-vol",
2286 "efsVolumeConfiguration": {
2287 "fileSystemId": "fs-12345678",
2288 "rootDirectory": "/exports/app"
2289 }
2290 });
2291 volumes.insert("efs-vol".to_string(), &v);
2292 let mp = serde_json::json!({
2293 "sourceVolume": "efs-vol",
2294 "containerPath": "/mnt/efs"
2295 });
2296 let resolved = resolve_mount_point(&mp, &volumes).expect("resolved");
2297 assert_eq!(
2298 resolved.source,
2299 "/tmp/fakecloud/efs/fs-12345678/exports/app"
2300 );
2301 assert_eq!(resolved.container_path, "/mnt/efs");
2302 }
2303
2304 #[test]
2308 fn efs_without_root_directory_uses_filesystem_root() {
2309 assert_eq!(
2310 stub_dir_for("efs", "fs-abc", "/"),
2311 "/tmp/fakecloud/efs/fs-abc"
2312 );
2313 assert_eq!(
2314 stub_dir_for("efs", "fs-abc", ""),
2315 "/tmp/fakecloud/efs/fs-abc"
2316 );
2317 }
2318
2319 #[test]
2323 fn resolve_docker_named_volume_uses_volume_name() {
2324 let mut volumes = std::collections::HashMap::new();
2325 let v = serde_json::json!({
2326 "name": "named-vol",
2327 "dockerVolumeConfiguration": {
2328 "scope": "task",
2329 "driver": "local"
2330 }
2331 });
2332 volumes.insert("named-vol".to_string(), &v);
2333 let mp = serde_json::json!({
2334 "sourceVolume": "named-vol",
2335 "containerPath": "/data"
2336 });
2337 let resolved = resolve_mount_point(&mp, &volumes).expect("resolved");
2338 assert_eq!(resolved.source, "named-vol");
2339 assert_eq!(resolved.container_path, "/data");
2340 }
2341
2342 #[test]
2345 fn resolve_fsx_volume_uses_stub_dir() {
2346 let mut volumes = std::collections::HashMap::new();
2347 let v = serde_json::json!({
2348 "name": "fsx-vol",
2349 "fsxWindowsFileServerVolumeConfiguration": {
2350 "fileSystemId": "fs-xyz",
2351 "rootDirectory": "share"
2352 }
2353 });
2354 volumes.insert("fsx-vol".to_string(), &v);
2355 let mp = serde_json::json!({
2356 "sourceVolume": "fsx-vol",
2357 "containerPath": "C:\\data"
2358 });
2359 let resolved = resolve_mount_point(&mp, &volumes).expect("resolved");
2360 assert_eq!(resolved.source, "/tmp/fakecloud/fsx/fs-xyz/share");
2361 }
2362
2363 #[test]
2367 fn unknown_source_volume_returns_none() {
2368 let volumes = std::collections::HashMap::new();
2369 let mp = serde_json::json!({
2370 "sourceVolume": "missing",
2371 "containerPath": "/x"
2372 });
2373 assert!(resolve_mount_point(&mp, &volumes).is_none());
2374 }
2375
2376 #[test]
2380 fn find_depends_on_cycle_detects_two_node_cycle() {
2381 let cds = vec![
2382 serde_json::json!({
2383 "name": "a",
2384 "image": "alpine",
2385 "dependsOn": [{"containerName": "b", "condition": "START"}],
2386 }),
2387 serde_json::json!({
2388 "name": "b",
2389 "image": "alpine",
2390 "dependsOn": [{"containerName": "a", "condition": "START"}],
2391 }),
2392 ];
2393 let cycle = find_depends_on_cycle(&cds);
2394 assert!(cycle.is_some(), "expected cycle to be detected");
2395 }
2396
2397 #[test]
2401 fn find_depends_on_cycle_accepts_chain() {
2402 let cds = vec![
2403 serde_json::json!({
2404 "name": "a",
2405 "image": "alpine",
2406 "dependsOn": [{"containerName": "b", "condition": "START"}],
2407 }),
2408 serde_json::json!({
2409 "name": "b",
2410 "image": "alpine",
2411 "dependsOn": [{"containerName": "c", "condition": "START"}],
2412 }),
2413 serde_json::json!({
2414 "name": "c",
2415 "image": "alpine",
2416 }),
2417 ];
2418 assert!(find_depends_on_cycle(&cds).is_none());
2419 }
2420
2421 #[test]
2425 fn find_depends_on_cycle_ignores_unknown_target() {
2426 let cds = vec![serde_json::json!({
2427 "name": "only",
2428 "image": "alpine",
2429 "dependsOn": [{"containerName": "ghost", "condition": "START"}],
2430 })];
2431 assert!(find_depends_on_cycle(&cds).is_none());
2432 }
2433
2434 #[test]
2438 fn condition_is_met_matches_aws_semantics() {
2439 let running = InspectedState {
2440 started: true,
2441 exited: false,
2442 exit_code: 0,
2443 health: None,
2444 };
2445 let exited_ok = InspectedState {
2446 started: true,
2447 exited: true,
2448 exit_code: 0,
2449 health: None,
2450 };
2451 let exited_fail = InspectedState {
2452 started: true,
2453 exited: true,
2454 exit_code: 1,
2455 health: None,
2456 };
2457 let healthy = InspectedState {
2458 started: true,
2459 exited: false,
2460 exit_code: 0,
2461 health: Some("healthy".into()),
2462 };
2463
2464 assert!(condition_is_met(DependsOnCondition::Start, &running));
2467 assert!(condition_is_met(DependsOnCondition::Start, &exited_ok));
2468
2469 assert!(!condition_is_met(DependsOnCondition::Complete, &running));
2471 assert!(condition_is_met(DependsOnCondition::Complete, &exited_ok));
2472 assert!(condition_is_met(DependsOnCondition::Complete, &exited_fail));
2473
2474 assert!(!condition_is_met(DependsOnCondition::Success, &running));
2476 assert!(condition_is_met(DependsOnCondition::Success, &exited_ok));
2477 assert!(!condition_is_met(DependsOnCondition::Success, &exited_fail));
2478
2479 assert!(!condition_is_met(DependsOnCondition::Healthy, &running));
2481 assert!(condition_is_met(DependsOnCondition::Healthy, &healthy));
2482 }
2483
2484 #[test]
2488 fn depends_on_condition_parse_round_trips() {
2489 assert_eq!(
2490 DependsOnCondition::parse("START"),
2491 Some(DependsOnCondition::Start)
2492 );
2493 assert_eq!(
2494 DependsOnCondition::parse("COMPLETE"),
2495 Some(DependsOnCondition::Complete)
2496 );
2497 assert_eq!(
2498 DependsOnCondition::parse("SUCCESS"),
2499 Some(DependsOnCondition::Success)
2500 );
2501 assert_eq!(
2502 DependsOnCondition::parse("HEALTHY"),
2503 Some(DependsOnCondition::Healthy)
2504 );
2505 assert_eq!(DependsOnCondition::parse("start"), None);
2506 assert_eq!(DependsOnCondition::parse("ANY"), None);
2507 }
2508
2509 #[test]
2512 fn build_run_argv_emits_ulimits() {
2513 let plan = ContainerPlan {
2514 container_name: "app".into(),
2515 image: "alpine".into(),
2516 env: Vec::new(),
2517 entry_point: Vec::new(),
2518 command: Vec::new(),
2519 secrets_refs: Vec::new(),
2520 essential: true,
2521 has_task_role: false,
2522 port_mappings: Vec::new(),
2523 network_mode: None,
2524 depends_on: Vec::new(),
2525 health_check: None,
2526 volume_mounts: Vec::new(),
2527 ulimits: vec![Ulimit {
2528 name: "nofile".into(),
2529 soft_limit: 1024,
2530 hard_limit: 2048,
2531 }],
2532 linux_parameters: None,
2533 stop_timeout: None,
2534 user: None,
2535 working_directory: None,
2536 tty: false,
2537 interactive: false,
2538 readonly_rootfs: false,
2539 };
2540 let argv = build_run_argv(&plan, &[], "t", "host", "img", true);
2541 assert!(argv.contains(&"--ulimit".to_string()));
2542 assert!(argv.contains(&"nofile=1024:2048".to_string()));
2543 }
2544
2545 #[test]
2546 fn build_run_argv_emits_linux_parameters() {
2547 let plan = ContainerPlan {
2548 container_name: "app".into(),
2549 image: "alpine".into(),
2550 env: Vec::new(),
2551 entry_point: Vec::new(),
2552 command: Vec::new(),
2553 secrets_refs: Vec::new(),
2554 essential: true,
2555 has_task_role: false,
2556 port_mappings: Vec::new(),
2557 network_mode: None,
2558 depends_on: Vec::new(),
2559 health_check: None,
2560 volume_mounts: Vec::new(),
2561 ulimits: Vec::new(),
2562 linux_parameters: Some(LinuxParameters {
2563 capabilities_add: vec!["NET_ADMIN".into()],
2564 capabilities_drop: vec!["ALL".into()],
2565 devices: vec![Device {
2566 host_path: "/dev/zero".into(),
2567 container_path: "/dev/zero".into(),
2568 permissions: "rwm".into(),
2569 }],
2570 init_process_enabled: true,
2571 shared_memory_size: Some(256),
2572 sysctls: vec![Sysctl {
2573 name: "net.ipv4.ip_forward".into(),
2574 value: "1".into(),
2575 }],
2576 tmpfs: vec![Tmpfs {
2577 container_path: "/tmp".into(),
2578 size: 128,
2579 mount_options: vec!["noexec".into()],
2580 }],
2581 privileged: true,
2582 }),
2583 stop_timeout: Some(30),
2584 user: Some("1000:1000".into()),
2585 working_directory: Some("/app".into()),
2586 tty: true,
2587 interactive: true,
2588 readonly_rootfs: true,
2589 };
2590 let argv = build_run_argv(&plan, &[], "t", "host", "img", true);
2591 assert!(argv.contains(&"--cap-add".to_string()));
2592 assert!(argv.contains(&"NET_ADMIN".to_string()));
2593 assert!(argv.contains(&"--cap-drop".to_string()));
2594 assert!(argv.contains(&"ALL".to_string()));
2595 assert!(argv.contains(&"--device".to_string()));
2596 assert!(argv.contains(&"/dev/zero:/dev/zerorwm".to_string()));
2597 assert!(argv.contains(&"--init".to_string()));
2598 assert!(argv.contains(&"--shm-size".to_string()));
2599 assert!(argv.contains(&"256m".to_string()));
2600 assert!(argv.contains(&"--sysctl".to_string()));
2601 assert!(argv.contains(&"net.ipv4.ip_forward=1".to_string()));
2602 assert!(argv.contains(&"--tmpfs".to_string()));
2603 assert!(argv.contains(&"--privileged".to_string()));
2604 assert!(argv.contains(&"--stop-timeout".to_string()));
2605 assert!(argv.contains(&"30".to_string()));
2606 assert!(argv.contains(&"--user".to_string()));
2607 assert!(argv.contains(&"1000:1000".to_string()));
2608 assert!(argv.contains(&"--workdir".to_string()));
2609 assert!(argv.contains(&"/app".to_string()));
2610 assert!(argv.contains(&"--tty".to_string()));
2611 assert!(argv.contains(&"--interactive".to_string()));
2612 assert!(argv.contains(&"--read-only".to_string()));
2613 }
2614
2615 #[test]
2616 fn parse_linux_parameters_fills_defaults() {
2617 let raw = serde_json::json!({"initProcessEnabled": true});
2618 let lp = parse_linux_parameters(&raw).expect("parses");
2619 assert!(lp.init_process_enabled);
2620 assert!(!lp.privileged);
2621 assert!(lp.capabilities_add.is_empty());
2622 }
2623
2624 #[test]
2625 fn parse_device_uses_default_permissions() {
2626 let raw = serde_json::json!({"hostPath": "/dev/null", "containerPath": "/dev/null"});
2627 let dev = parse_device(&raw).expect("parses");
2628 assert_eq!(dev.permissions, "rwm");
2629 }
2630
2631 #[test]
2632 fn compute_elbv2_targets_empty_when_no_group() {
2633 let mut accounts: MultiAccountState<EcsState> =
2634 MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
2635 let acct = accounts.get_or_create("000000000000");
2636 let mut task = make_task("t1");
2637 task.group = None;
2638 acct.tasks.insert("t1".into(), task);
2639 let state = acct.clone();
2640 let targets = compute_elbv2_targets(&state, state.tasks.get("t1").unwrap());
2641 assert!(targets.is_empty());
2642 }
2643
2644 #[test]
2645 fn compute_elbv2_targets_bridge_mode_uses_localhost_and_host_port() {
2646 let mut accounts: MultiAccountState<EcsState> =
2647 MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
2648 let acct = accounts.get_or_create("000000000000");
2649
2650 let td = crate::state::TaskDefinition {
2651 family: "app".into(),
2652 revision: 1,
2653 task_definition_arn: "arn:aws:ecs:us-east-1:000000000000:task-definition/app:1".into(),
2654 container_definitions: Vec::new(),
2655 network_mode: Some("bridge".into()),
2656 status: "ACTIVE".into(),
2657 task_role_arn: None,
2658 execution_role_arn: None,
2659 requires_compatibilities: Vec::new(),
2660 compatibilities: Vec::new(),
2661 cpu: None,
2662 memory: None,
2663 pid_mode: None,
2664 ipc_mode: None,
2665 volumes: Vec::new(),
2666 placement_constraints: Vec::new(),
2667 proxy_configuration: None,
2668 inference_accelerators: Vec::new(),
2669 ephemeral_storage: None,
2670 runtime_platform: None,
2671 requires_attributes: Vec::new(),
2672 registered_at: Utc::now(),
2673 registered_by: None,
2674 deregistered_at: None,
2675 tags: Vec::new(),
2676 enable_fault_injection: None,
2677 };
2678 acct.task_definitions.insert("app".into(), {
2679 let mut m = std::collections::BTreeMap::new();
2680 m.insert(1, td);
2681 m
2682 });
2683
2684 let service = crate::state::Service {
2685 service_name: "svc".into(),
2686 service_arn: "arn:aws:ecs:us-east-1:000000000000:service/default/svc".into(),
2687 cluster_name: "default".into(),
2688 cluster_arn: "arn:aws:ecs:us-east-1:000000000000:cluster/default".into(),
2689 task_definition_arn: "arn:aws:ecs:us-east-1:000000000000:task-definition/app:1".into(),
2690 family: "app".into(),
2691 revision: 1,
2692 desired_count: 1,
2693 running_count: 0,
2694 pending_count: 0,
2695 launch_type: "FARGATE".into(),
2696 status: "ACTIVE".into(),
2697 scheduling_strategy: "REPLICA".into(),
2698 deployment_controller: "ECS".into(),
2699 minimum_healthy_percent: Some(0),
2700 maximum_percent: Some(200),
2701 circuit_breaker: None,
2702 deployments: Vec::new(),
2703 load_balancers: vec![serde_json::json!({
2704 "targetGroupArn": "arn:aws:elasticloadbalancing:us-east-1:000000000000:targetgroup/tg/abc",
2705 "containerName": "app",
2706 "containerPort": 80,
2707 })],
2708 service_registries: Vec::new(),
2709 placement_constraints: Vec::new(),
2710 placement_strategy: Vec::new(),
2711 network_configuration: None,
2712 volume_configurations: vec![],
2713 tags: Vec::new(),
2714 created_at: Utc::now(),
2715 created_by: None,
2716 role_arn: None,
2717 platform_version: None,
2718 health_check_grace_period_seconds: None,
2719 enable_execute_command: false,
2720 enable_ecs_managed_tags: false,
2721 propagate_tags: None,
2722 capacity_provider_strategy: Vec::new(),
2723 availability_zone_rebalancing: None,
2724 };
2725 acct.services.insert(
2726 crate::state::EcsState::service_key("default", "svc"),
2727 service,
2728 );
2729
2730 let mut task = make_task("t1");
2731 task.group = Some("service:svc".into());
2732 task.containers = vec![crate::state::Container {
2733 container_arn: "arn:aws:ecs:us-east-1:000000000000:container/default/abc/app".into(),
2734 name: "app".into(),
2735 image: "alpine".into(),
2736 task_arn: task.task_arn.clone(),
2737 last_status: "RUNNING".into(),
2738 exit_code: None,
2739 reason: None,
2740 runtime_id: Some("dockerid-app".into()),
2741 essential: true,
2742 cpu: None,
2743 memory: None,
2744 memory_reservation: None,
2745 network_bindings: vec![serde_json::json!({
2746 "bindIP": "0.0.0.0",
2747 "containerPort": 80,
2748 "hostPort": 32768,
2749 "protocol": "tcp",
2750 })],
2751 network_interfaces: Vec::new(),
2752 health_status: None,
2753 managed_agents: None,
2754 image_digest: None,
2755 }];
2756 acct.tasks.insert("t1".into(), task);
2757
2758 let state = acct.clone();
2759 let targets = compute_elbv2_targets(&state, state.tasks.get("t1").unwrap());
2760 assert_eq!(targets.len(), 1);
2761 let (arn, tg_targets) = &targets[0];
2762 assert_eq!(
2763 arn,
2764 "arn:aws:elasticloadbalancing:us-east-1:000000000000:targetgroup/tg/abc"
2765 );
2766 assert_eq!(tg_targets.len(), 1);
2767 assert_eq!(tg_targets[0].0, "127.0.0.1");
2768 assert_eq!(tg_targets[0].1, Some(32768));
2769 }
2770
2771 #[test]
2772 fn compute_elbv2_targets_awsvpc_uses_eni_ip() {
2773 let mut accounts: MultiAccountState<EcsState> =
2774 MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
2775 let acct = accounts.get_or_create("000000000000");
2776
2777 let td = crate::state::TaskDefinition {
2778 family: "app".into(),
2779 revision: 1,
2780 task_definition_arn: "arn:aws:ecs:us-east-1:000000000000:task-definition/app:1".into(),
2781 container_definitions: Vec::new(),
2782 network_mode: Some("awsvpc".into()),
2783 status: "ACTIVE".into(),
2784 task_role_arn: None,
2785 execution_role_arn: None,
2786 requires_compatibilities: Vec::new(),
2787 compatibilities: Vec::new(),
2788 cpu: None,
2789 memory: None,
2790 pid_mode: None,
2791 ipc_mode: None,
2792 volumes: Vec::new(),
2793 placement_constraints: Vec::new(),
2794 proxy_configuration: None,
2795 inference_accelerators: Vec::new(),
2796 ephemeral_storage: None,
2797 runtime_platform: None,
2798 requires_attributes: Vec::new(),
2799 registered_at: Utc::now(),
2800 registered_by: None,
2801 deregistered_at: None,
2802 tags: Vec::new(),
2803 enable_fault_injection: None,
2804 };
2805 acct.task_definitions.insert("app".into(), {
2806 let mut m = std::collections::BTreeMap::new();
2807 m.insert(1, td);
2808 m
2809 });
2810
2811 let service = crate::state::Service {
2812 service_name: "svc".into(),
2813 service_arn: "arn:aws:ecs:us-east-1:000000000000:service/default/svc".into(),
2814 cluster_name: "default".into(),
2815 cluster_arn: "arn:aws:ecs:us-east-1:000000000000:cluster/default".into(),
2816 task_definition_arn: "arn:aws:ecs:us-east-1:000000000000:task-definition/app:1".into(),
2817 family: "app".into(),
2818 revision: 1,
2819 desired_count: 1,
2820 running_count: 0,
2821 pending_count: 0,
2822 launch_type: "FARGATE".into(),
2823 status: "ACTIVE".into(),
2824 scheduling_strategy: "REPLICA".into(),
2825 deployment_controller: "ECS".into(),
2826 minimum_healthy_percent: Some(0),
2827 maximum_percent: Some(200),
2828 circuit_breaker: None,
2829 deployments: Vec::new(),
2830 load_balancers: vec![serde_json::json!({
2831 "targetGroupArn": "arn:aws:elasticloadbalancing:us-east-1:000000000000:targetgroup/tg/abc",
2832 "containerName": "app",
2833 "containerPort": 80,
2834 })],
2835 service_registries: Vec::new(),
2836 placement_constraints: Vec::new(),
2837 placement_strategy: Vec::new(),
2838 network_configuration: None,
2839 volume_configurations: vec![],
2840 tags: Vec::new(),
2841 created_at: Utc::now(),
2842 created_by: None,
2843 role_arn: None,
2844 platform_version: None,
2845 health_check_grace_period_seconds: None,
2846 enable_execute_command: false,
2847 enable_ecs_managed_tags: false,
2848 propagate_tags: None,
2849 capacity_provider_strategy: Vec::new(),
2850 availability_zone_rebalancing: None,
2851 };
2852 acct.services.insert(
2853 crate::state::EcsState::service_key("default", "svc"),
2854 service,
2855 );
2856
2857 let mut task = make_task("t1");
2858 task.group = Some("service:svc".into());
2859 task.attachments = vec![crate::state::TaskAttachment {
2860 id: "eni-123".into(),
2861 attachment_type: "eni".into(),
2862 status: "ATTACHED".into(),
2863 details: vec![
2864 crate::state::AttachmentDetail {
2865 name: "privateIPv4Address".into(),
2866 value: "172.18.0.2".into(),
2867 },
2868 crate::state::AttachmentDetail {
2869 name: "macAddress".into(),
2870 value: "02:42:ac:12:00:02".into(),
2871 },
2872 ],
2873 }];
2874 acct.tasks.insert("t1".into(), task);
2875
2876 let state = acct.clone();
2877 let targets = compute_elbv2_targets(&state, state.tasks.get("t1").unwrap());
2878 assert_eq!(targets.len(), 1);
2879 let (arn, tg_targets) = &targets[0];
2880 assert_eq!(
2881 arn,
2882 "arn:aws:elasticloadbalancing:us-east-1:000000000000:targetgroup/tg/abc"
2883 );
2884 assert_eq!(tg_targets.len(), 1);
2885 assert_eq!(tg_targets[0].0, "172.18.0.2");
2886 assert_eq!(tg_targets[0].1, Some(80));
2887 }
2888}