1use std::path::PathBuf;
11use std::sync::Arc;
12use std::time::Duration;
13
14use base64::Engine;
15use chrono::Utc;
16use fakecloud_core::delivery::DeliveryBus;
17use fakecloud_logs::ingest::{append_events, IngestEvent};
18use fakecloud_logs::SharedLogsState;
19use fakecloud_secretsmanager::SharedSecretsManagerState;
20use fakecloud_ssm::SharedSsmState;
21use parking_lot::RwLock;
22use tempfile::TempDir;
23use tokio::process::Command;
24
25use crate::state::{LifecycleEvent, SharedEcsState};
26
27#[derive(Debug, thiserror::Error)]
28pub enum RuntimeError {
29 #[error("container CLI not found (tried docker, podman)")]
30 NoCli,
31 #[error("image pull failed: {0}")]
32 ImagePull(String),
33 #[error("container start failed: {0}")]
34 ContainerStart(String),
35 #[error("docker wait failed: {0}")]
36 Wait(String),
37}
38
39pub struct EcsRuntime {
41 cli: String,
42 net: fakecloud_core::container_net::HostNetworking,
47 server_port: u16,
52 docker_config: Option<Arc<TempDir>>,
57 containers: RwLock<std::collections::HashMap<String, Vec<(String, String)>>>,
62 delivery_bus: Option<Arc<DeliveryBus>>,
66 logs_state: Option<SharedLogsState>,
70 secretsmanager_state: Option<SharedSecretsManagerState>,
73 ssm_state: Option<SharedSsmState>,
76 k8s: Option<k8s::K8sTaskBackend>,
80}
81
82mod config;
83mod k8s;
84mod lb;
85mod monitoring;
86mod secrets;
87mod task_lifecycle;
88
89impl EcsRuntime {
90 pub fn new(server_port: u16) -> Option<Self> {
95 let cli = fakecloud_core::container_net::detect_container_cli()?;
96 let net = fakecloud_core::container_net::HostNetworking::detect(&cli);
97 let docker_config = build_local_registry_docker_config(server_port).map(Arc::new);
98 Some(Self {
99 cli,
100 net,
101 server_port,
102 docker_config,
103 containers: RwLock::new(std::collections::HashMap::new()),
104 delivery_bus: None,
105 logs_state: None,
106 secretsmanager_state: None,
107 ssm_state: None,
108 k8s: None,
109 })
110 }
111
112 pub async fn new_k8s(server_port: u16) -> Result<Self, k8s::BackendInitError> {
116 let backend = k8s::K8sTaskBackend::from_env(server_port).await?;
117 let net = fakecloud_core::container_net::HostNetworking {
120 host_alias: String::new(),
121 add_host_arg: None,
122 sibling_host: String::new(),
123 };
124 Ok(Self {
125 cli: String::new(),
126 net,
127 server_port,
128 docker_config: None,
129 containers: RwLock::new(std::collections::HashMap::new()),
130 delivery_bus: None,
131 logs_state: None,
132 secretsmanager_state: None,
133 ssm_state: None,
134 k8s: Some(backend),
135 })
136 }
137
138 pub fn cli_name(&self) -> &str {
140 if self.k8s.is_some() {
141 "kubernetes"
142 } else {
143 &self.cli
144 }
145 }
146
147 pub async fn reap_stale(&self) {
150 if let Some(k) = &self.k8s {
151 k.reap_stale().await;
152 }
153 }
154
155 pub fn with_delivery_bus(mut self, bus: Arc<DeliveryBus>) -> Self {
158 self.delivery_bus = Some(bus);
159 self
160 }
161
162 pub fn with_logs(mut self, logs: SharedLogsState) -> Self {
165 self.logs_state = Some(logs);
166 self
167 }
168}
169
170#[derive(Clone, Debug)]
172pub(crate) struct ContainerPlan {
173 pub(crate) container_name: String,
174 pub(crate) image: String,
175 pub(crate) env: Vec<(String, String)>,
176 pub(crate) entry_point: Vec<String>,
177 pub(crate) command: Vec<String>,
178 pub(crate) secrets_refs: Vec<(String, String)>,
179 pub(crate) essential: bool,
180 pub(crate) has_task_role: bool,
181 pub(crate) port_mappings: Vec<PortMapping>,
186 pub(crate) network_mode: Option<String>,
191 pub(crate) depends_on: Vec<DependsOn>,
199 pub(crate) health_check: Option<HealthCheckSpec>,
206 pub(crate) volume_mounts: Vec<VolumeMount>,
211 pub(crate) ulimits: Vec<Ulimit>,
214 pub(crate) linux_parameters: Option<LinuxParameters>,
218 pub(crate) stop_timeout: Option<u32>,
220 pub(crate) user: Option<String>,
222 pub(crate) working_directory: Option<String>,
224 pub(crate) tty: bool,
226 pub(crate) interactive: bool,
228 pub(crate) readonly_rootfs: bool,
230}
231
232#[derive(Clone, Debug, PartialEq, Eq)]
238pub(crate) struct DependsOn {
239 pub container_name: String,
240 pub condition: DependsOnCondition,
241}
242
243#[derive(Clone, Copy, Debug, PartialEq, Eq)]
247pub(crate) enum DependsOnCondition {
248 Start,
251 Complete,
253 Success,
255 Healthy,
259}
260
261impl DependsOnCondition {
262 pub fn parse(raw: &str) -> Option<Self> {
266 match raw {
267 "START" => Some(Self::Start),
268 "COMPLETE" => Some(Self::Complete),
269 "SUCCESS" => Some(Self::Success),
270 "HEALTHY" => Some(Self::Healthy),
271 _ => None,
272 }
273 }
274
275 pub fn as_aws_str(self) -> &'static str {
279 match self {
280 Self::Start => "START",
281 Self::Complete => "COMPLETE",
282 Self::Success => "SUCCESS",
283 Self::Healthy => "HEALTHY",
284 }
285 }
286}
287
288#[derive(Clone, Debug, PartialEq, Eq)]
294pub(crate) struct HealthCheckSpec {
295 pub command: Vec<String>,
301 pub interval_seconds: u32,
302 pub timeout_seconds: u32,
303 pub retries: u32,
304 pub start_period_seconds: u32,
305}
306
307#[derive(Clone, Debug, PartialEq, Eq)]
311pub(crate) struct PortMapping {
312 pub container_port: u16,
313 pub host_port: u16,
316 pub protocol: String,
318}
319
320#[derive(Clone, Debug, PartialEq, Eq)]
342pub(crate) struct VolumeMount {
343 pub source: String,
347 pub container_path: String,
350 pub read_only: bool,
354}
355
356#[derive(Clone, Debug, PartialEq, Eq)]
358pub(crate) struct Ulimit {
359 pub name: String,
360 pub soft_limit: i32,
361 pub hard_limit: i32,
362}
363
364#[derive(Clone, Debug, PartialEq, Eq)]
366pub(crate) struct Device {
367 pub host_path: String,
368 pub container_path: String,
369 pub permissions: String,
370}
371
372#[derive(Clone, Debug, PartialEq, Eq)]
374pub(crate) struct Sysctl {
375 pub name: String,
376 pub value: String,
377}
378
379#[derive(Clone, Debug, PartialEq, Eq, Default)]
381pub(crate) struct LinuxParameters {
382 pub capabilities_add: Vec<String>,
383 pub capabilities_drop: Vec<String>,
384 pub devices: Vec<Device>,
385 pub init_process_enabled: bool,
386 pub shared_memory_size: Option<i32>,
387 pub sysctls: Vec<Sysctl>,
388 pub tmpfs: Vec<Tmpfs>,
389 pub privileged: bool,
390}
391
392#[derive(Clone, Debug, PartialEq, Eq)]
394pub(crate) struct Tmpfs {
395 pub container_path: String,
396 pub size: i32,
397 pub mount_options: Vec<String>,
398}
399
400#[derive(Clone, Debug)]
401struct ResolvedContainerPlan {
402 plan: ContainerPlan,
403 env: Vec<(String, String)>,
404}
405
406#[derive(Clone, Debug)]
408struct TaskExitOutcome {
409 exited_index: Option<usize>,
413 exit_code: i64,
414 stop_code: &'static str,
415}
416
417#[derive(Clone, Debug)]
420pub(crate) struct RunningContainer {
421 pub(crate) name: String,
422 pub(crate) container_id: String,
423 pub(crate) essential: bool,
424 pub(crate) exit_code: Option<i64>,
425 pub(crate) network_bindings: Vec<serde_json::Value>,
429 pub(crate) image_digest: Option<String>,
434}
435
436pub(crate) fn task_should_stop(containers: &[RunningContainer]) -> bool {
441 if containers.is_empty() {
442 return true;
443 }
444 let any_essential_exited = containers
445 .iter()
446 .any(|c| c.essential && c.exit_code.is_some());
447 if any_essential_exited {
448 return true;
449 }
450 containers.iter().all(|c| c.exit_code.is_some())
451}
452
453fn build_container_plans(
454 state: &SharedEcsState,
455 account_id: &str,
456 task_id: &str,
457 _server_port: u16,
458) -> Result<Vec<ContainerPlan>, RuntimeError> {
459 let accounts = state.read();
460 let s = accounts
461 .get(account_id)
462 .ok_or_else(|| RuntimeError::ContainerStart("account missing".into()))?;
463 let task = s
464 .tasks
465 .get(task_id)
466 .ok_or_else(|| RuntimeError::ContainerStart("task missing".into()))?;
467 if task.containers.is_empty() {
468 return Err(RuntimeError::ContainerStart(
469 "task has no containers".into(),
470 ));
471 }
472 let has_task_role = task.task_role_arn.is_some();
473 let task_def = s
474 .task_definitions
475 .get(&task.family)
476 .and_then(|revs| revs.get(&task.revision));
477 let network_mode = task_def.and_then(|td| td.network_mode.clone());
478 let volumes_by_name: std::collections::HashMap<String, &serde_json::Value> = task_def
483 .map(|td| {
484 td.volumes
485 .iter()
486 .filter_map(|v| {
487 let name = v.get("name").and_then(|n| n.as_str())?;
488 Some((name.to_string(), v))
489 })
490 .collect()
491 })
492 .unwrap_or_default();
493 let mut plans = Vec::with_capacity(task.containers.len());
494 for container in &task.containers {
495 let def = find_container_definition(s, &task.family, task.revision, &container.name);
496 let secrets_refs = def
497 .as_ref()
498 .and_then(|d| d.get("secrets").and_then(|v| v.as_array()).cloned())
499 .map(|arr| {
500 arr.iter()
501 .filter_map(|e| {
502 let name = e.get("name").and_then(|v| v.as_str())?.to_string();
503 let value_from = e.get("valueFrom").and_then(|v| v.as_str())?.to_string();
504 Some((name, value_from))
505 })
506 .collect::<Vec<_>>()
507 })
508 .unwrap_or_default();
509 let str_array = |key: &str| -> Vec<String> {
510 def.as_ref()
511 .and_then(|d| d.get(key).and_then(|v| v.as_array()).cloned())
512 .map(|arr| {
513 arr.iter()
514 .filter_map(|v| v.as_str().map(String::from))
515 .collect::<Vec<_>>()
516 })
517 .unwrap_or_default()
518 };
519 let env = def
520 .as_ref()
521 .and_then(|d| d.get("environment").and_then(|v| v.as_array()).cloned())
522 .map(|arr| {
523 arr.iter()
524 .filter_map(|e| {
525 let k = e.get("name").and_then(|v| v.as_str())?;
526 let v = e.get("value").and_then(|v| v.as_str()).unwrap_or("");
527 Some((k.to_string(), v.to_string()))
528 })
529 .collect::<Vec<_>>()
530 })
531 .unwrap_or_default();
532 let port_mappings = def
533 .as_ref()
534 .and_then(|d| d.get("portMappings").and_then(|v| v.as_array()).cloned())
535 .map(|arr| {
536 arr.iter()
537 .filter_map(parse_port_mapping)
538 .collect::<Vec<_>>()
539 })
540 .unwrap_or_default();
541 let depends_on = def
542 .as_ref()
543 .and_then(|d| d.get("dependsOn").and_then(|v| v.as_array()).cloned())
544 .map(|arr| {
545 arr.iter()
546 .filter_map(parse_depends_on_entry)
547 .collect::<Vec<_>>()
548 })
549 .unwrap_or_default();
550 let health_check = def
551 .as_ref()
552 .and_then(|d| d.get("healthCheck"))
553 .and_then(parse_health_check);
554 let volume_mounts = def
555 .as_ref()
556 .and_then(|d| d.get("mountPoints").and_then(|v| v.as_array()).cloned())
557 .map(|arr| {
558 arr.iter()
559 .filter_map(|mp| resolve_mount_point(mp, &volumes_by_name))
560 .collect::<Vec<_>>()
561 })
562 .unwrap_or_default();
563 let ulimits = def
564 .as_ref()
565 .and_then(|d| d.get("ulimits").and_then(|v| v.as_array()).cloned())
566 .map(|arr| arr.iter().filter_map(parse_ulimit).collect::<Vec<_>>())
567 .unwrap_or_default();
568 let linux_parameters = def
569 .as_ref()
570 .and_then(|d| d.get("linuxParameters"))
571 .and_then(parse_linux_parameters);
572 let stop_timeout = def.as_ref().and_then(|d| {
573 d.get("stopTimeout")
574 .and_then(|v| v.as_u64())
575 .map(|n| n as u32)
576 });
577 let user = def
578 .as_ref()
579 .and_then(|d| d.get("user").and_then(|v| v.as_str()).map(String::from));
580 let working_directory = def.as_ref().and_then(|d| {
581 d.get("workingDirectory")
582 .and_then(|v| v.as_str())
583 .map(String::from)
584 });
585 let tty = def
586 .as_ref()
587 .and_then(|d| d.get("tty").and_then(|v| v.as_bool()))
588 .unwrap_or(false);
589 let interactive = def
590 .as_ref()
591 .and_then(|d| d.get("interactive").and_then(|v| v.as_bool()))
592 .unwrap_or(false);
593 let readonly_rootfs = def
594 .as_ref()
595 .and_then(|d| d.get("readonlyRootFilesystem").and_then(|v| v.as_bool()))
596 .unwrap_or(false);
597 plans.push(ContainerPlan {
598 container_name: container.name.clone(),
599 image: container.image.clone(),
600 env,
601 entry_point: str_array("entryPoint"),
602 command: str_array("command"),
603 secrets_refs,
604 essential: container.essential,
605 has_task_role,
606 port_mappings,
607 network_mode: network_mode.clone(),
608 depends_on,
609 health_check,
610 volume_mounts,
611 ulimits,
612 linux_parameters,
613 stop_timeout,
614 user,
615 working_directory,
616 tty,
617 interactive,
618 readonly_rootfs,
619 });
620 }
621 let plans = topo_sort_plans(plans);
622 Ok(plans)
623}
624
625fn resolve_mount_point(
633 mount_point: &serde_json::Value,
634 volumes_by_name: &std::collections::HashMap<String, &serde_json::Value>,
635) -> Option<VolumeMount> {
636 let container_path = mount_point
637 .get("containerPath")
638 .and_then(|v| v.as_str())?
639 .to_string();
640 let source_volume = mount_point.get("sourceVolume").and_then(|v| v.as_str())?;
641 let read_only = mount_point
642 .get("readOnly")
643 .and_then(|v| v.as_bool())
644 .unwrap_or(false);
645 let volume = volumes_by_name.get(source_volume)?;
646 let source = resolve_volume_source(source_volume, volume)?;
647 Some(VolumeMount {
648 source,
649 container_path,
650 read_only,
651 })
652}
653
654fn resolve_volume_source(name: &str, volume: &serde_json::Value) -> Option<String> {
673 if let Some(host) = volume.get("host") {
674 if let Some(path) = host.get("sourcePath").and_then(|v| v.as_str()) {
675 if !path.is_empty() {
678 ensure_dir_exists(path);
679 return Some(path.to_string());
680 }
681 }
682 }
683 if let Some(efs) = volume.get("efsVolumeConfiguration") {
684 let fs_id = efs.get("fileSystemId").and_then(|v| v.as_str())?;
685 let root = efs
686 .get("rootDirectory")
687 .and_then(|v| v.as_str())
688 .unwrap_or("/");
689 return Some(shared_volume_name("efs", fs_id, root));
690 }
691 if let Some(fsx) = volume.get("fsxWindowsFileServerVolumeConfiguration") {
692 let fs_id = fsx.get("fileSystemId").and_then(|v| v.as_str())?;
693 let root = fsx
694 .get("rootDirectory")
695 .and_then(|v| v.as_str())
696 .unwrap_or("/");
697 return Some(shared_volume_name("fsx", fs_id, root));
698 }
699 if volume.get("dockerVolumeConfiguration").is_some() {
700 return Some(name.to_string());
703 }
704 Some(name.to_string())
706}
707
708fn shared_volume_name(kind: &str, fs_id: &str, root: &str) -> String {
719 let trimmed = root.trim_start_matches('/').trim_end_matches('/');
720 let fs_id = sanitize_volume_segment(fs_id);
721 if trimmed.is_empty() {
722 format!("fakecloud-{kind}-{fs_id}")
723 } else {
724 format!(
725 "fakecloud-{kind}-{fs_id}-{}",
726 sanitize_volume_segment(trimmed)
727 )
728 }
729}
730
731fn sanitize_volume_segment(s: &str) -> String {
734 s.chars()
735 .map(|c| {
736 if c.is_ascii_alphanumeric() || matches!(c, '_' | '.' | '-') {
737 c
738 } else {
739 '-'
740 }
741 })
742 .collect()
743}
744
745fn ensure_dir_exists(path: &str) {
750 let _ = std::fs::create_dir_all(path);
751}
752
753fn parse_depends_on_entry(value: &serde_json::Value) -> Option<DependsOn> {
758 let container_name = value
759 .get("containerName")
760 .and_then(|v| v.as_str())?
761 .to_string();
762 let raw_condition = value.get("condition").and_then(|v| v.as_str())?;
763 let condition = DependsOnCondition::parse(raw_condition)?;
764 Some(DependsOn {
765 container_name,
766 condition,
767 })
768}
769
770fn topo_sort_plans(plans: Vec<ContainerPlan>) -> Vec<ContainerPlan> {
781 use std::collections::{HashMap, HashSet};
782 let names: HashSet<String> = plans.iter().map(|p| p.container_name.clone()).collect();
783 let index: HashMap<String, usize> = plans
784 .iter()
785 .enumerate()
786 .map(|(i, p)| (p.container_name.clone(), i))
787 .collect();
788 let mut in_degree: Vec<usize> = plans
793 .iter()
794 .map(|p| {
795 p.depends_on
796 .iter()
797 .filter(|d| names.contains(&d.container_name))
798 .count()
799 })
800 .collect();
801 let mut dependants: Vec<Vec<usize>> = vec![Vec::new(); plans.len()];
803 for (i, p) in plans.iter().enumerate() {
804 for d in &p.depends_on {
805 if let Some(&di) = index.get(&d.container_name) {
806 dependants[di].push(i);
807 }
808 }
809 }
810 let mut ordered: Vec<ContainerPlan> = Vec::with_capacity(plans.len());
811 let mut emitted: Vec<bool> = vec![false; plans.len()];
812 loop {
813 let next = (0..plans.len()).find(|&i| !emitted[i] && in_degree[i] == 0);
816 match next {
817 Some(i) => {
818 emitted[i] = true;
819 ordered.push(plans[i].clone());
820 for &di in &dependants[i] {
821 if in_degree[di] > 0 {
822 in_degree[di] -= 1;
823 }
824 }
825 }
826 None => break,
827 }
828 }
829 for (i, p) in plans.into_iter().enumerate() {
831 if !emitted[i] {
832 ordered.push(p);
833 }
834 }
835 ordered
836}
837
838pub(crate) fn find_depends_on_cycle(
847 container_definitions: &[serde_json::Value],
848) -> Option<(String, String)> {
849 use std::collections::HashMap;
850
851 let names: Vec<String> = container_definitions
852 .iter()
853 .filter_map(|c| c.get("name").and_then(|n| n.as_str()).map(String::from))
854 .collect();
855 let index: HashMap<&str, usize> = names
856 .iter()
857 .enumerate()
858 .map(|(i, n)| (n.as_str(), i))
859 .collect();
860
861 let mut adj: Vec<Vec<usize>> = vec![Vec::new(); names.len()];
862 for (i, cd) in container_definitions.iter().enumerate() {
863 if i >= names.len() {
864 continue;
865 }
866 let Some(deps) = cd.get("dependsOn").and_then(|v| v.as_array()) else {
867 continue;
868 };
869 for d in deps {
870 let Some(target) = d.get("containerName").and_then(|v| v.as_str()) else {
871 continue;
872 };
873 if let Some(&j) = index.get(target) {
874 adj[i].push(j);
876 }
877 }
878 }
879
880 let mut state = vec![0u8; names.len()];
884 let mut stack: Vec<(usize, usize)> = Vec::new();
885 for start in 0..names.len() {
886 if state[start] != 0 {
887 continue;
888 }
889 stack.clear();
890 stack.push((start, 0));
891 state[start] = 1;
892 while let Some(&(node, next_edge)) = stack.last() {
893 if next_edge < adj[node].len() {
894 let nb = adj[node][next_edge];
895 stack.last_mut().unwrap().1 += 1;
896 match state[nb] {
897 0 => {
898 state[nb] = 1;
899 stack.push((nb, 0));
900 }
901 1 => {
902 return Some((names[node].clone(), names[nb].clone()));
903 }
904 _ => {}
905 }
906 } else {
907 state[node] = 2;
908 stack.pop();
909 }
910 }
911 }
912 None
913}
914
915#[derive(Debug, Clone)]
919struct InspectedState {
920 started: bool,
921 exited: bool,
922 exit_code: i64,
923 health: Option<String>,
924}
925
926async fn inspect_container_state(cli: &str, container_id: &str) -> Option<InspectedState> {
930 let format =
933 "{{.State.Status}}|{{.State.Running}}|{{.State.ExitCode}}|{{if .State.Health}}{{.State.Health.Status}}{{else}}<none>{{end}}";
934 let out = Command::new(cli)
935 .args(["inspect", "-f", format, container_id])
936 .output()
937 .await
938 .ok()?;
939 if !out.status.success() {
940 return None;
941 }
942 let raw = String::from_utf8_lossy(&out.stdout).trim().to_string();
943 let parts: Vec<&str> = raw.split('|').collect();
944 if parts.len() < 4 {
945 return None;
946 }
947 let status = parts[0];
948 let running = parts[1] == "true";
949 let exit_code: i64 = parts[2].parse().unwrap_or(-1);
950 let health = match parts[3] {
951 "<none>" | "" => None,
952 other => Some(other.to_string()),
953 };
954 let started = running || status == "exited" || status == "running" || status == "dead";
958 let exited = status == "exited" || status == "dead";
959 Some(InspectedState {
960 started,
961 exited,
962 exit_code,
963 health,
964 })
965}
966
967fn condition_is_met(condition: DependsOnCondition, state: &InspectedState) -> bool {
971 match condition {
972 DependsOnCondition::Start => state.started,
973 DependsOnCondition::Complete => state.exited,
974 DependsOnCondition::Success => state.exited && state.exit_code == 0,
975 DependsOnCondition::Healthy => state.health.as_deref() == Some("healthy"),
976 }
977}
978
979#[cfg(test)]
983pub(crate) fn __test_parse_port_mapping(value: &serde_json::Value) -> Option<PortMapping> {
984 parse_port_mapping(value)
985}
986
987fn parse_health_check(value: &serde_json::Value) -> Option<HealthCheckSpec> {
993 let cmd_arr = value.get("command")?.as_array()?;
994 let command: Vec<String> = cmd_arr
995 .iter()
996 .filter_map(|v| v.as_str().map(String::from))
997 .collect();
998 if command.is_empty() {
999 return None;
1000 }
1001 if command.first().map(|s| s.as_str()) == Some("NONE") {
1002 return None;
1003 }
1004 let read_u32 = |key: &str, default: u32| -> u32 {
1005 value
1006 .get(key)
1007 .and_then(|v| v.as_i64())
1008 .filter(|n| (0..=u32::MAX as i64).contains(n))
1009 .map(|n| n as u32)
1010 .unwrap_or(default)
1011 };
1012 Some(HealthCheckSpec {
1013 command,
1014 interval_seconds: read_u32("interval", 30),
1015 timeout_seconds: read_u32("timeout", 5),
1016 retries: read_u32("retries", 3),
1017 start_period_seconds: read_u32("startPeriod", 0),
1018 })
1019}
1020
1021fn parse_ulimit(value: &serde_json::Value) -> Option<Ulimit> {
1023 let name = value.get("name").and_then(|v| v.as_str())?;
1024 let soft = value
1025 .get("softLimit")
1026 .and_then(|v| v.as_i64())
1027 .filter(|n| *n >= 0)? as i32;
1028 let hard = value
1029 .get("hardLimit")
1030 .and_then(|v| v.as_i64())
1031 .filter(|n| *n >= 0)? as i32;
1032 Some(Ulimit {
1033 name: name.to_string(),
1034 soft_limit: soft,
1035 hard_limit: hard,
1036 })
1037}
1038
1039fn parse_linux_parameters(value: &serde_json::Value) -> Option<LinuxParameters> {
1041 let mut lp = LinuxParameters::default();
1042 if let Some(arr) = value
1043 .get("capabilities")
1044 .and_then(|v| v.get("add"))
1045 .and_then(|v| v.as_array())
1046 {
1047 lp.capabilities_add = arr
1048 .iter()
1049 .filter_map(|v| v.as_str().map(String::from))
1050 .collect();
1051 }
1052 if let Some(arr) = value
1053 .get("capabilities")
1054 .and_then(|v| v.get("drop"))
1055 .and_then(|v| v.as_array())
1056 {
1057 lp.capabilities_drop = arr
1058 .iter()
1059 .filter_map(|v| v.as_str().map(String::from))
1060 .collect();
1061 }
1062 if let Some(arr) = value.get("devices").and_then(|v| v.as_array()) {
1063 lp.devices = arr.iter().filter_map(parse_device).collect();
1064 }
1065 lp.init_process_enabled = value
1066 .get("initProcessEnabled")
1067 .and_then(|v| v.as_bool())
1068 .unwrap_or(false);
1069 lp.shared_memory_size = value
1070 .get("sharedMemorySize")
1071 .and_then(|v| v.as_i64())
1072 .map(|n| n as i32);
1073 if let Some(arr) = value.get("sysctl").and_then(|v| v.as_array()) {
1074 lp.sysctls = arr.iter().filter_map(parse_sysctl).collect();
1075 }
1076 if let Some(arr) = value.get("tmpfs").and_then(|v| v.as_array()) {
1077 lp.tmpfs = arr.iter().filter_map(parse_tmpfs).collect();
1078 }
1079 lp.privileged = value
1080 .get("privileged")
1081 .and_then(|v| v.as_bool())
1082 .unwrap_or(false);
1083 Some(lp)
1084}
1085
1086fn parse_device(value: &serde_json::Value) -> Option<Device> {
1087 let host_path = value.get("hostPath").and_then(|v| v.as_str())?.to_string();
1088 let container_path = value
1089 .get("containerPath")
1090 .and_then(|v| v.as_str())?
1091 .to_string();
1092 let permissions = value
1093 .get("permissions")
1094 .and_then(|v| v.as_str())
1095 .unwrap_or("rwm")
1096 .to_string();
1097 Some(Device {
1098 host_path,
1099 container_path,
1100 permissions,
1101 })
1102}
1103
1104fn parse_sysctl(value: &serde_json::Value) -> Option<Sysctl> {
1105 let name = value.get("name").and_then(|v| v.as_str())?.to_string();
1106 let value_str = value.get("value").and_then(|v| v.as_str())?.to_string();
1107 Some(Sysctl {
1108 name,
1109 value: value_str,
1110 })
1111}
1112
1113fn parse_tmpfs(value: &serde_json::Value) -> Option<Tmpfs> {
1114 let container_path = value
1115 .get("containerPath")
1116 .and_then(|v| v.as_str())?
1117 .to_string();
1118 let size = value
1119 .get("size")
1120 .and_then(|v| v.as_i64())
1121 .filter(|n| *n > 0)? as i32;
1122 let mount_options = value
1123 .get("mountOptions")
1124 .and_then(|v| v.as_array())
1125 .map(|arr| {
1126 arr.iter()
1127 .filter_map(|v| v.as_str().map(String::from))
1128 .collect()
1129 })
1130 .unwrap_or_default();
1131 Some(Tmpfs {
1132 container_path,
1133 size,
1134 mount_options,
1135 })
1136}
1137
1138pub(crate) fn render_health_flags(hc: &HealthCheckSpec) -> Vec<String> {
1145 if hc.command.len() < 2 {
1146 return Vec::new();
1147 }
1148 let cmd_kind = hc.command[0].as_str();
1149 if cmd_kind != "CMD" && cmd_kind != "CMD-SHELL" {
1150 return Vec::new();
1151 }
1152 let cmd_string = hc.command[1..].join(" ");
1153 vec![
1154 "--health-cmd".into(),
1155 cmd_string,
1156 format!("--health-interval={}s", hc.interval_seconds),
1157 format!("--health-timeout={}s", hc.timeout_seconds),
1158 format!("--health-retries={}", hc.retries),
1159 format!("--health-start-period={}s", hc.start_period_seconds),
1160 ]
1161}
1162
1163#[cfg(test)]
1167pub(crate) fn __test_parse_health_check(value: &serde_json::Value) -> Option<HealthCheckSpec> {
1168 parse_health_check(value)
1169}
1170
1171pub(crate) fn docker_health_to_ecs(raw: &str) -> &'static str {
1177 match raw.trim().to_ascii_lowercase().as_str() {
1178 "healthy" => "HEALTHY",
1179 "unhealthy" => "UNHEALTHY",
1180 _ => "UNKNOWN",
1181 }
1182}
1183
1184fn parse_port_mapping(value: &serde_json::Value) -> Option<PortMapping> {
1188 let container_port = value
1189 .get("containerPort")
1190 .and_then(|v| v.as_i64())
1191 .filter(|n| (0..=u16::MAX as i64).contains(n))? as u16;
1192 let host_port_raw = value
1193 .get("hostPort")
1194 .and_then(|v| v.as_i64())
1195 .filter(|n| (0..=u16::MAX as i64).contains(n))
1196 .map(|n| n as u16)
1197 .unwrap_or(0);
1198 let host_port = if host_port_raw == 0 {
1199 container_port
1200 } else {
1201 host_port_raw
1202 };
1203 let protocol = value
1204 .get("protocol")
1205 .and_then(|v| v.as_str())
1206 .map(|s| s.to_ascii_lowercase())
1207 .unwrap_or_else(|| "tcp".to_string());
1208 Some(PortMapping {
1209 container_port,
1210 host_port,
1211 protocol,
1212 })
1213}
1214
1215pub(crate) fn build_run_argv(
1221 plan: &ContainerPlan,
1222 env: &[(String, String)],
1223 task_id: &str,
1224 host_alias: &str,
1225 add_host_arg: Option<&str>,
1226 run_image: &str,
1227 awsvpc_network_ready: bool,
1228) -> Vec<String> {
1229 let mut argv: Vec<String> = Vec::new();
1230 argv.push("run".into());
1231 argv.push("-d".into());
1232 argv.push("--name".into());
1233 argv.push(format!("{}-{}", task_id, plan.container_name));
1234 argv.push("--label".into());
1235 argv.push(format!("fakecloud-ecs-task={}", task_id));
1236 argv.push("--label".into());
1237 argv.push(format!("fakecloud-ecs-container={}", plan.container_name));
1238 if let Some(arg) = add_host_arg {
1242 argv.push("--add-host".into());
1243 argv.push(arg.to_string());
1244 }
1245 let use_awsvpc_network = plan.network_mode.as_deref() == Some("awsvpc") && awsvpc_network_ready;
1246 if use_awsvpc_network {
1247 argv.push("--network".into());
1248 argv.push(format!("fakecloud-ecs-{}", task_id));
1249 }
1250 let publish_ports = !use_awsvpc_network;
1256 if publish_ports {
1257 for pm in &plan.port_mappings {
1258 argv.push("--publish".into());
1259 argv.push(format!(
1260 "{}:{}/{}",
1261 pm.container_port, pm.host_port, pm.protocol
1262 ));
1263 }
1264 }
1265 if let Some(ref hc) = plan.health_check {
1266 argv.extend(render_health_flags(hc));
1267 }
1268 let http_alias_prefix = format!("http://{host_alias}:");
1269 let https_alias_prefix = format!("https://{host_alias}:");
1270 for (k, v) in env {
1271 let transformed = v
1272 .replace("http://127.0.0.1:", http_alias_prefix.as_str())
1273 .replace("https://127.0.0.1:", https_alias_prefix.as_str())
1274 .replace("http://localhost:", http_alias_prefix.as_str())
1275 .replace("https://localhost:", https_alias_prefix.as_str());
1276 argv.push("-e".into());
1277 argv.push(format!("{}={}", k, transformed));
1278 }
1279 for vm in &plan.volume_mounts {
1284 argv.push("-v".into());
1285 let suffix = if vm.read_only { ":ro" } else { "" };
1286 argv.push(format!("{}:{}{}", vm.source, vm.container_path, suffix));
1287 }
1288 for ul in &plan.ulimits {
1289 argv.push("--ulimit".into());
1290 argv.push(format!("{}={}:{}", ul.name, ul.soft_limit, ul.hard_limit));
1291 }
1292 if let Some(ref lp) = plan.linux_parameters {
1293 for cap in &lp.capabilities_add {
1294 argv.push("--cap-add".into());
1295 argv.push(cap.clone());
1296 }
1297 for cap in &lp.capabilities_drop {
1298 argv.push("--cap-drop".into());
1299 argv.push(cap.clone());
1300 }
1301 for dev in &lp.devices {
1302 argv.push("--device".into());
1303 argv.push(format!(
1304 "{}:{}{}",
1305 dev.host_path, dev.container_path, dev.permissions
1306 ));
1307 }
1308 if lp.init_process_enabled {
1309 argv.push("--init".into());
1310 }
1311 if let Some(size) = lp.shared_memory_size {
1312 argv.push("--shm-size".into());
1313 argv.push(format!("{}m", size));
1314 }
1315 for sys in &lp.sysctls {
1316 argv.push("--sysctl".into());
1317 argv.push(format!("{}={}", sys.name, sys.value));
1318 }
1319 for tmp in &lp.tmpfs {
1320 let mut opts = tmp.mount_options.join(",");
1321 if !opts.is_empty() {
1322 opts = format!(",{}", opts);
1323 }
1324 argv.push("--tmpfs".into());
1325 argv.push(format!("{}:size={}M{}", tmp.container_path, tmp.size, opts));
1326 }
1327 if lp.privileged {
1328 argv.push("--privileged".into());
1329 }
1330 }
1331 if let Some(timeout) = plan.stop_timeout {
1332 argv.push("--stop-timeout".into());
1333 argv.push(format!("{}", timeout));
1334 }
1335 if let Some(ref user) = plan.user {
1336 argv.push("--user".into());
1337 argv.push(user.clone());
1338 }
1339 if let Some(ref wd) = plan.working_directory {
1340 argv.push("--workdir".into());
1341 argv.push(wd.clone());
1342 }
1343 if plan.tty {
1344 argv.push("--tty".into());
1345 }
1346 if plan.interactive {
1347 argv.push("--interactive".into());
1348 }
1349 if plan.readonly_rootfs {
1350 argv.push("--read-only".into());
1351 }
1352 if let Some(first) = plan.entry_point.first() {
1353 argv.push("--entrypoint".into());
1354 argv.push(first.clone());
1355 }
1356 argv.push(run_image.to_string());
1357 for arg in plan.entry_point.iter().skip(1) {
1358 argv.push(arg.clone());
1359 }
1360 for arg in &plan.command {
1361 argv.push(arg.clone());
1362 }
1363 argv
1364}
1365
1366pub(crate) fn network_bindings_for(plan: &ContainerPlan) -> Vec<serde_json::Value> {
1370 if plan.network_mode.as_deref() == Some("awsvpc") {
1371 return Vec::new();
1372 }
1373 plan.port_mappings
1374 .iter()
1375 .map(|pm| {
1376 serde_json::json!({
1377 "bindIP": "0.0.0.0",
1378 "containerPort": pm.container_port,
1379 "hostPort": pm.host_port,
1380 "protocol": pm.protocol,
1381 })
1382 })
1383 .collect()
1384}
1385
1386#[allow(clippy::type_complexity)]
1390pub(crate) fn compute_elbv2_targets(
1391 ecs_state: &crate::state::EcsState,
1392 task: &crate::state::Task,
1393) -> Vec<(String, Vec<(String, Option<i64>)>)> {
1394 let mut result = Vec::new();
1395 let Some(group) = task.group.as_deref() else {
1396 return result;
1397 };
1398 let service_name = group.strip_prefix("service:").unwrap_or(group);
1399 let key = crate::state::EcsState::service_key(&task.cluster_name, service_name);
1400 let Some(service) = ecs_state.services.get(&key) else {
1401 return result;
1402 };
1403
1404 let network_mode = ecs_state
1405 .task_definitions
1406 .get(&task.family)
1407 .and_then(|revs| revs.get(&task.revision))
1408 .and_then(|td| td.network_mode.as_deref());
1409
1410 for lb in &service.load_balancers {
1411 let tg_arn = lb.get("targetGroupArn").and_then(|v| v.as_str());
1412 let container_name = lb.get("containerName").and_then(|v| v.as_str());
1413 let container_port = lb.get("containerPort").and_then(|v| v.as_i64());
1414 let Some(tg_arn) = tg_arn else { continue };
1415 let Some(container_name) = container_name else {
1416 continue;
1417 };
1418
1419 let target_id = if network_mode == Some("awsvpc") {
1420 task.attachments
1421 .iter()
1422 .find(|a| a.attachment_type == "eni")
1423 .and_then(|eni| {
1424 eni.details
1425 .iter()
1426 .find(|d| d.name == "privateIPv4Address")
1427 .map(|d| d.value.clone())
1428 })
1429 } else {
1430 Some("127.0.0.1".to_string())
1431 };
1432
1433 let port = if network_mode == Some("awsvpc") {
1434 container_port
1435 } else {
1436 task.containers
1437 .iter()
1438 .find(|c| c.name == container_name)
1439 .and_then(|c| {
1440 c.network_bindings
1441 .iter()
1442 .find(|nb| {
1443 nb.get("containerPort").and_then(|v| v.as_i64()) == container_port
1444 })
1445 .and_then(|nb| nb.get("hostPort").and_then(|v| v.as_i64()))
1446 })
1447 };
1448
1449 if let Some(id) = target_id {
1450 if let Some(entry) = result.iter_mut().find(|(arn, _)| arn == tg_arn) {
1451 entry.1.push((id, port));
1452 } else {
1453 result.push((tg_arn.to_string(), vec![(id, port)]));
1454 }
1455 }
1456 }
1457 result
1458}
1459
1460struct TaskSnapshot {
1461 task_arn: String,
1462 cluster_arn: String,
1463 launch_type: String,
1464 group: Option<String>,
1465 task_definition_arn: String,
1466 containers: serde_json::Value,
1467}
1468
1469fn snapshot_task(state: &SharedEcsState, account_id: &str, task_id: &str) -> Option<TaskSnapshot> {
1470 let accounts = state.read();
1471 let s = accounts.get(account_id)?;
1472 let task = s.tasks.get(task_id)?;
1473 Some(TaskSnapshot {
1474 task_arn: task.task_arn.clone(),
1475 cluster_arn: task.cluster_arn.clone(),
1476 launch_type: task.launch_type.clone(),
1477 group: task.group.clone(),
1478 task_definition_arn: task.task_definition_arn.clone(),
1479 containers: serde_json::Value::Array(
1480 task.containers
1481 .iter()
1482 .map(|c| {
1483 serde_json::json!({
1484 "containerArn": c.container_arn,
1485 "name": c.name,
1486 "image": c.image,
1487 "lastStatus": c.last_status,
1488 "exitCode": c.exit_code,
1489 "reason": c.reason,
1490 })
1491 })
1492 .collect(),
1493 ),
1494 })
1495}
1496
1497fn build_local_registry_docker_config(server_port: u16) -> Option<TempDir> {
1505 let dir = TempDir::new().ok()?;
1506 let auth = base64::engine::general_purpose::STANDARD.encode("AWS:fakecloud-ecs-runtime");
1507 let config = serde_json::json!({
1508 "auths": {
1509 format!("127.0.0.1:{server_port}"): { "auth": auth },
1510 format!("host.docker.internal:{server_port}"): { "auth": auth },
1511 }
1512 });
1513 std::fs::write(dir.path().join("config.json"), config.to_string()).ok()?;
1514 Some(dir)
1515}
1516
1517fn find_container_definition(
1518 state: &crate::state::EcsState,
1519 family: &str,
1520 revision: i32,
1521 name: &str,
1522) -> Option<serde_json::Value> {
1523 state
1524 .task_definitions
1525 .get(family)?
1526 .get(&revision)?
1527 .container_definitions
1528 .iter()
1529 .find(|c| c.get("name").and_then(|v| v.as_str()) == Some(name))
1530 .cloned()
1531}
1532
1533fn mark_pull_started(state: &SharedEcsState, account_id: &str, task_id: &str) {
1534 let mut accounts = state.write();
1535 let Some(s) = accounts.get_mut(account_id) else {
1536 return;
1537 };
1538 let task_arn_cluster = s
1539 .tasks
1540 .get(task_id)
1541 .map(|t| (t.task_arn.clone(), t.cluster_arn.clone()));
1542 if let Some(task) = s.tasks.get_mut(task_id) {
1543 task.pull_started_at = Some(Utc::now());
1544 }
1545 if let Some((arn, cluster_arn)) = task_arn_cluster {
1546 s.push_event(LifecycleEvent {
1547 at: Utc::now(),
1548 event_type: "PullStarted".into(),
1549 task_arn: Some(arn),
1550 cluster_arn: Some(cluster_arn),
1551 last_status: Some("PENDING".into()),
1552 detail: serde_json::json!({}),
1553 });
1554 }
1555}
1556
1557fn mark_pull_stopped(state: &SharedEcsState, account_id: &str, task_id: &str) {
1558 let mut accounts = state.write();
1559 let Some(s) = accounts.get_mut(account_id) else {
1560 return;
1561 };
1562 if let Some(task) = s.tasks.get_mut(task_id) {
1563 task.pull_stopped_at = Some(Utc::now());
1564 }
1565}
1566
1567pub(crate) fn mark_running_multi(
1568 state: &SharedEcsState,
1569 account_id: &str,
1570 task_id: &str,
1571 started: &[RunningContainer],
1572) {
1573 let mut accounts = state.write();
1574 let Some(s) = accounts.get_mut(account_id) else {
1575 return;
1576 };
1577 let (arn, cluster_arn) = {
1578 let Some(task) = s.tasks.get_mut(task_id) else {
1579 return;
1580 };
1581 task.last_status = "RUNNING".into();
1582 task.connectivity = "CONNECTED".into();
1583 task.connectivity_at = Some(Utc::now());
1584 task.started_at = Some(Utc::now());
1585 for rc in started {
1586 if let Some(c) = task.containers.iter_mut().find(|c| c.name == rc.name) {
1587 c.runtime_id = Some(rc.container_id.clone());
1588 c.last_status = "RUNNING".into();
1589 c.network_bindings = rc.network_bindings.clone();
1590 if rc.image_digest.is_some() {
1591 c.image_digest = rc.image_digest.clone();
1592 }
1593 }
1594 }
1595 if let Some(cluster) = s.clusters.get_mut(&task.cluster_name) {
1596 cluster.running_tasks_count += 1;
1597 if cluster.pending_tasks_count > 0 {
1598 cluster.pending_tasks_count -= 1;
1599 }
1600 }
1601 if let Some(ref ci_arn) = task.container_instance_arn {
1602 if let Some(ci) = s
1603 .container_instances
1604 .values_mut()
1605 .find(|ci| ci.container_instance_arn == *ci_arn)
1606 {
1607 ci.running_tasks_count += 1;
1608 if ci.pending_tasks_count > 0 {
1609 ci.pending_tasks_count -= 1;
1610 }
1611 }
1612 }
1613 (task.task_arn.clone(), task.cluster_arn.clone())
1614 };
1615 s.push_event(LifecycleEvent {
1616 at: Utc::now(),
1617 event_type: "TaskStateChange".into(),
1618 task_arn: Some(arn),
1619 cluster_arn: Some(cluster_arn),
1620 last_status: Some("RUNNING".into()),
1621 detail: serde_json::json!({}),
1622 });
1623}
1624
1625#[allow(clippy::too_many_arguments)]
1626fn finalize_stopped_multi(
1627 state: &SharedEcsState,
1628 account_id: &str,
1629 task_id: &str,
1630 final_containers: &[RunningContainer],
1631 primary_exit_code: i64,
1632 captured: &str,
1633 stop_code: &str,
1634 stopped_reason: Option<String>,
1635) {
1636 let mut accounts = state.write();
1637 let Some(s) = accounts.get_mut(account_id) else {
1638 return;
1639 };
1640 let (arn, cluster_arn) = {
1641 let Some(task) = s.tasks.get_mut(task_id) else {
1642 return;
1643 };
1644 task.last_status = "STOPPED".into();
1645 task.desired_status = "STOPPED".into();
1646 task.stopping_at = task.stopping_at.or(Some(Utc::now()));
1647 task.stopped_at = Some(Utc::now());
1648 task.stop_code = Some(stop_code.into());
1649 task.stopped_reason = stopped_reason.or(Some(format!("Exit code {}", primary_exit_code)));
1650 task.captured_logs = captured.to_string();
1651 for c in task.containers.iter_mut() {
1652 c.last_status = "STOPPED".into();
1653 if c.exit_code.is_none() {
1654 let mapped = final_containers
1655 .iter()
1656 .find(|r| r.name == c.name)
1657 .and_then(|r| r.exit_code);
1658 c.exit_code = mapped.or(Some(primary_exit_code));
1659 }
1660 }
1661 if let Some(cluster) = s.clusters.get_mut(&task.cluster_name) {
1662 if cluster.running_tasks_count > 0 {
1663 cluster.running_tasks_count -= 1;
1664 }
1665 }
1666 if let Some(ref ci_arn) = task.container_instance_arn {
1667 if let Some(ci) = s
1668 .container_instances
1669 .values_mut()
1670 .find(|ci| ci.container_instance_arn == *ci_arn)
1671 {
1672 if ci.running_tasks_count > 0 {
1673 ci.running_tasks_count -= 1;
1674 }
1675 }
1676 }
1677 (task.task_arn.clone(), task.cluster_arn.clone())
1678 };
1679 s.push_event(LifecycleEvent {
1680 at: Utc::now(),
1681 event_type: "TaskStateChange".into(),
1682 task_arn: Some(arn),
1683 cluster_arn: Some(cluster_arn),
1684 last_status: Some("STOPPED".into()),
1685 detail: serde_json::json!({
1686 "exitCode": primary_exit_code,
1687 "stopCode": stop_code,
1688 }),
1689 });
1690}
1691
1692fn finalize_failure(state: &SharedEcsState, account_id: &str, task_id: &str, reason: &str) {
1693 let mut accounts = state.write();
1694 let Some(s) = accounts.get_mut(account_id) else {
1695 return;
1696 };
1697 let (arn, cluster_arn) = {
1698 let Some(task) = s.tasks.get_mut(task_id) else {
1699 return;
1700 };
1701 let was_running = task.last_status == "RUNNING";
1707 task.last_status = "STOPPED".into();
1708 task.desired_status = "STOPPED".into();
1709 task.stopped_at = Some(Utc::now());
1710 task.stop_code = Some("TaskFailedToStart".into());
1711 task.stopped_reason = Some(reason.to_string());
1712 task.captured_logs = format!("[task failed to start]: {reason}");
1716 for c in task.containers.iter_mut() {
1717 c.last_status = "STOPPED".into();
1718 c.reason = Some(reason.to_string());
1719 }
1720 if let Some(cluster) = s.clusters.get_mut(&task.cluster_name) {
1721 if was_running {
1722 if cluster.running_tasks_count > 0 {
1723 cluster.running_tasks_count -= 1;
1724 }
1725 } else if cluster.pending_tasks_count > 0 {
1726 cluster.pending_tasks_count -= 1;
1727 }
1728 }
1729 if let Some(ref ci_arn) = task.container_instance_arn {
1730 if let Some(ci) = s
1731 .container_instances
1732 .values_mut()
1733 .find(|ci| ci.container_instance_arn == *ci_arn)
1734 {
1735 if was_running {
1736 if ci.running_tasks_count > 0 {
1737 ci.running_tasks_count -= 1;
1738 }
1739 } else if ci.pending_tasks_count > 0 {
1740 ci.pending_tasks_count -= 1;
1741 }
1742 }
1743 }
1744 (task.task_arn.clone(), task.cluster_arn.clone())
1745 };
1746 s.push_event(LifecycleEvent {
1747 at: Utc::now(),
1748 event_type: "TaskFailedToStart".into(),
1749 task_arn: Some(arn),
1750 cluster_arn: Some(cluster_arn),
1751 last_status: Some("STOPPED".into()),
1752 detail: serde_json::json!({ "reason": reason }),
1753 });
1754}
1755
1756pub async fn sleep(duration: Duration) {
1760 tokio::time::sleep(duration).await;
1761}
1762
1763#[cfg(test)]
1764mod tests {
1765 use super::*;
1766 use crate::state::{EcsState, Task};
1767 use fakecloud_aws::arn::Arn;
1768 use fakecloud_core::multi_account::MultiAccountState;
1769 use parking_lot::RwLock;
1770 use std::sync::Arc;
1771
1772 #[test]
1773 fn cli_available_for_known_missing_binary_is_false() {
1774 assert!(!fakecloud_core::container_net::cli_available(
1775 "definitely-not-a-real-cli-binary-xyz"
1776 ));
1777 }
1778
1779 #[test]
1780 fn aws_ecr_uris_translate_for_local_pull() {
1781 assert_eq!(
1782 fakecloud_core::ecr_uri::translate_to_local(
1783 "123456789012.dkr.ecr.us-east-1.amazonaws.com/app:latest",
1784 4566
1785 )
1786 .as_deref(),
1787 Some("127.0.0.1:4566/app:latest")
1788 );
1789 }
1790
1791 fn make_task(task_id: &str) -> Task {
1792 Task {
1793 task_arn: Arn::new(
1794 "ecs",
1795 "us-east-1",
1796 "000000000000",
1797 &format!("task/default/{task_id}"),
1798 )
1799 .to_string(),
1800 task_id: task_id.into(),
1801 cluster_arn: "arn:aws:ecs:us-east-1:000000000000:cluster/default".into(),
1802 cluster_name: "default".into(),
1803 task_definition_arn: "arn:aws:ecs:us-east-1:000000000000:task-definition/app:1".into(),
1804 family: "app".into(),
1805 revision: 1,
1806 container_instance_arn: None,
1807 capacity_provider_name: None,
1808 last_status: "PENDING".into(),
1809 desired_status: "RUNNING".into(),
1810 launch_type: "FARGATE".into(),
1811 platform_version: None,
1812 cpu: None,
1813 memory: None,
1814 containers: Vec::new(),
1815 overrides: serde_json::json!({}),
1816 started_by: None,
1817 group: None,
1818 connectivity: "CONNECTING".into(),
1819 stop_code: None,
1820 stopped_reason: None,
1821 created_at: Utc::now(),
1822 started_at: None,
1823 stopping_at: None,
1824 stopped_at: None,
1825 pull_started_at: None,
1826 pull_stopped_at: None,
1827 connectivity_at: None,
1828 started_by_ref_id: None,
1829 execution_role_arn: None,
1830 task_role_arn: None,
1831 tags: Vec::new(),
1832 awslogs: None,
1833 captured_logs: String::new(),
1834 protection: None,
1835 enable_execute_command: false,
1836 attachments: Vec::new(),
1837 volume_configurations: Vec::new(),
1838 task_set_arn: None,
1839 }
1840 }
1841
1842 #[test]
1843 fn finalize_failure_writes_reason_into_captured_logs() {
1844 let mut accounts: MultiAccountState<EcsState> =
1845 MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
1846 let acct = accounts.get_or_create("000000000000");
1847 acct.tasks.insert("t1".into(), make_task("t1"));
1848 let state: SharedEcsState = Arc::new(RwLock::new(accounts));
1849
1850 finalize_failure(
1851 &state,
1852 "000000000000",
1853 "t1",
1854 "failed to resolve secret DB_PASSWORD",
1855 );
1856
1857 let accounts = state.read();
1858 let task = accounts
1859 .get("000000000000")
1860 .unwrap()
1861 .tasks
1862 .get("t1")
1863 .unwrap();
1864 assert_eq!(task.last_status, "STOPPED");
1865 assert_eq!(task.stop_code.as_deref(), Some("TaskFailedToStart"));
1866 assert!(
1867 task.captured_logs
1868 .contains("failed to resolve secret DB_PASSWORD"),
1869 "captured_logs missing reason: {:?}",
1870 task.captured_logs
1871 );
1872 assert!(
1873 task.captured_logs.starts_with("[task failed to start]:"),
1874 "captured_logs missing prefix: {:?}",
1875 task.captured_logs
1876 );
1877 }
1878
1879 fn make_container(name: &str, essential: bool) -> crate::state::Container {
1880 crate::state::Container {
1881 container_arn: format!(
1882 "arn:aws:ecs:us-east-1:000000000000:container/default/abc/{name}"
1883 ),
1884 name: name.into(),
1885 image: "alpine".into(),
1886 task_arn: "arn:aws:ecs:us-east-1:000000000000:task/default/abc".into(),
1887 last_status: "RUNNING".into(),
1888 exit_code: None,
1889 reason: None,
1890 runtime_id: Some(format!("dockerid-{name}")),
1891 essential,
1892 cpu: None,
1893 memory: None,
1894 memory_reservation: None,
1895 network_bindings: Vec::new(),
1896 network_interfaces: Vec::new(),
1897 health_status: None,
1898 managed_agents: None,
1899 image_digest: None,
1900 }
1901 }
1902
1903 #[test]
1904 fn task_should_stop_when_essential_exits() {
1905 let containers = vec![
1906 RunningContainer {
1907 name: "app".into(),
1908 container_id: "id-app".into(),
1909 essential: true,
1910 exit_code: Some(0),
1911 network_bindings: Vec::new(),
1912 image_digest: None,
1913 },
1914 RunningContainer {
1915 name: "sidecar".into(),
1916 container_id: "id-sc".into(),
1917 essential: false,
1918 exit_code: None,
1919 network_bindings: Vec::new(),
1920 image_digest: None,
1921 },
1922 ];
1923 assert!(task_should_stop(&containers));
1924 }
1925
1926 #[test]
1927 fn task_keeps_running_when_only_non_essential_exits() {
1928 let containers = vec![
1929 RunningContainer {
1930 name: "app".into(),
1931 container_id: "id-app".into(),
1932 essential: true,
1933 exit_code: None,
1934 network_bindings: Vec::new(),
1935 image_digest: None,
1936 },
1937 RunningContainer {
1938 name: "sidecar".into(),
1939 container_id: "id-sc".into(),
1940 essential: false,
1941 exit_code: Some(0),
1942 network_bindings: Vec::new(),
1943 image_digest: None,
1944 },
1945 ];
1946 assert!(!task_should_stop(&containers));
1947 }
1948
1949 #[test]
1950 fn task_stops_when_all_non_essentials_exit() {
1951 let containers = vec![
1952 RunningContainer {
1953 name: "a".into(),
1954 container_id: "id-a".into(),
1955 essential: false,
1956 exit_code: Some(0),
1957 network_bindings: Vec::new(),
1958 image_digest: None,
1959 },
1960 RunningContainer {
1961 name: "b".into(),
1962 container_id: "id-b".into(),
1963 essential: false,
1964 exit_code: Some(1),
1965 network_bindings: Vec::new(),
1966 image_digest: None,
1967 },
1968 ];
1969 assert!(task_should_stop(&containers));
1970 }
1971
1972 #[test]
1973 fn finalize_stopped_multi_assigns_per_container_exit_codes() {
1974 let mut accounts: MultiAccountState<EcsState> =
1975 MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
1976 let acct = accounts.get_or_create("000000000000");
1977 let mut t = make_task("t1");
1978 t.containers = vec![
1979 make_container("app", true),
1980 make_container("sidecar", false),
1981 ];
1982 acct.tasks.insert("t1".into(), t);
1983 let state: SharedEcsState = Arc::new(RwLock::new(accounts));
1984
1985 let final_containers = vec![
1986 RunningContainer {
1987 name: "app".into(),
1988 container_id: "id-app".into(),
1989 essential: true,
1990 exit_code: Some(0),
1991 network_bindings: Vec::new(),
1992 image_digest: None,
1993 },
1994 RunningContainer {
1995 name: "sidecar".into(),
1996 container_id: "id-sc".into(),
1997 essential: false,
1998 exit_code: Some(137),
1999 network_bindings: Vec::new(),
2000 image_digest: None,
2001 },
2002 ];
2003 finalize_stopped_multi(
2004 &state,
2005 "000000000000",
2006 "t1",
2007 &final_containers,
2008 0,
2009 "captured",
2010 "EssentialContainerExited",
2011 None,
2012 );
2013
2014 let accounts = state.read();
2015 let task = accounts
2016 .get("000000000000")
2017 .unwrap()
2018 .tasks
2019 .get("t1")
2020 .unwrap();
2021 assert_eq!(task.last_status, "STOPPED");
2022 assert_eq!(task.stop_code.as_deref(), Some("EssentialContainerExited"));
2023 let app = task.containers.iter().find(|c| c.name == "app").unwrap();
2024 let sc = task
2025 .containers
2026 .iter()
2027 .find(|c| c.name == "sidecar")
2028 .unwrap();
2029 assert_eq!(app.exit_code, Some(0));
2030 assert_eq!(sc.exit_code, Some(137));
2031 assert_eq!(app.last_status, "STOPPED");
2032 assert_eq!(sc.last_status, "STOPPED");
2033 }
2034
2035 fn plan(name: &str, deps: &[&str]) -> ContainerPlan {
2036 ContainerPlan {
2037 container_name: name.into(),
2038 image: "alpine".into(),
2039 env: Vec::new(),
2040 entry_point: Vec::new(),
2041 command: Vec::new(),
2042 secrets_refs: Vec::new(),
2043 essential: true,
2044 has_task_role: false,
2045 port_mappings: Vec::new(),
2046 network_mode: None,
2047 depends_on: deps
2048 .iter()
2049 .map(|s| DependsOn {
2050 container_name: (*s).to_string(),
2051 condition: DependsOnCondition::Start,
2052 })
2053 .collect(),
2054 health_check: None,
2055 volume_mounts: Vec::new(),
2056 ulimits: Vec::new(),
2057 linux_parameters: None,
2058 stop_timeout: None,
2059 user: None,
2060 working_directory: None,
2061 tty: false,
2062 interactive: false,
2063 readonly_rootfs: false,
2064 }
2065 }
2066
2067 #[test]
2068 fn topo_sort_orders_by_depends_on() {
2069 let plans = vec![plan("sidecar", &["app"]), plan("app", &[])];
2072 let ordered = topo_sort_plans(plans);
2073 assert_eq!(ordered[0].container_name, "app");
2074 assert_eq!(ordered[1].container_name, "sidecar");
2075 }
2076
2077 #[test]
2078 fn topo_sort_preserves_declaration_order_when_no_deps() {
2079 let plans = vec![plan("first", &[]), plan("second", &[]), plan("third", &[])];
2080 let ordered = topo_sort_plans(plans);
2081 let names: Vec<&str> = ordered.iter().map(|p| p.container_name.as_str()).collect();
2082 assert_eq!(names, vec!["first", "second", "third"]);
2083 }
2084
2085 #[test]
2086 fn topo_sort_handles_chain() {
2087 let plans = vec![plan("c", &["b"]), plan("b", &["a"]), plan("a", &[])];
2090 let ordered = topo_sort_plans(plans);
2091 let names: Vec<&str> = ordered.iter().map(|p| p.container_name.as_str()).collect();
2092 assert_eq!(names, vec!["a", "b", "c"]);
2093 }
2094
2095 #[test]
2096 fn topo_sort_ignores_unknown_dependency() {
2097 let plans = vec![plan("only", &["does-not-exist"])];
2101 let ordered = topo_sort_plans(plans);
2102 assert_eq!(ordered.len(), 1);
2103 assert_eq!(ordered[0].container_name, "only");
2104 }
2105
2106 #[test]
2107 fn topo_sort_recovers_from_cycle() {
2108 let plans = vec![plan("a", &["b"]), plan("b", &["a"])];
2111 let ordered = topo_sort_plans(plans);
2112 assert_eq!(ordered.len(), 2);
2113 }
2114
2115 #[test]
2116 fn parse_health_check_fills_aws_defaults() {
2117 let v = serde_json::json!({
2118 "command": ["CMD-SHELL", "curl -f http://localhost/ || exit 1"],
2119 });
2120 let hc = __test_parse_health_check(&v).expect("parsed");
2121 assert_eq!(hc.command[0], "CMD-SHELL");
2122 assert_eq!(hc.interval_seconds, 30);
2123 assert_eq!(hc.timeout_seconds, 5);
2124 assert_eq!(hc.retries, 3);
2125 assert_eq!(hc.start_period_seconds, 0);
2126 }
2127
2128 #[test]
2129 fn parse_health_check_overrides_explicit_values() {
2130 let v = serde_json::json!({
2131 "command": ["CMD", "/probe"],
2132 "interval": 7,
2133 "timeout": 2,
2134 "retries": 9,
2135 "startPeriod": 12,
2136 });
2137 let hc = __test_parse_health_check(&v).expect("parsed");
2138 assert_eq!(hc.interval_seconds, 7);
2139 assert_eq!(hc.timeout_seconds, 2);
2140 assert_eq!(hc.retries, 9);
2141 assert_eq!(hc.start_period_seconds, 12);
2142 }
2143
2144 #[test]
2145 fn parse_health_check_returns_none_for_none_sentinel() {
2146 let v = serde_json::json!({ "command": ["NONE"] });
2149 assert!(__test_parse_health_check(&v).is_none());
2150 }
2151
2152 #[test]
2153 fn parse_health_check_returns_none_for_missing_command() {
2154 let v = serde_json::json!({ "interval": 30 });
2155 assert!(__test_parse_health_check(&v).is_none());
2156 }
2157
2158 #[test]
2159 fn render_health_flags_emits_full_set_for_cmd_shell() {
2160 let hc = HealthCheckSpec {
2161 command: vec!["CMD-SHELL".into(), "curl -f http://localhost/".into()],
2162 interval_seconds: 15,
2163 timeout_seconds: 3,
2164 retries: 4,
2165 start_period_seconds: 10,
2166 };
2167 let flags = render_health_flags(&hc);
2168 assert_eq!(flags[0], "--health-cmd");
2169 assert_eq!(flags[1], "curl -f http://localhost/");
2170 assert!(flags.contains(&"--health-interval=15s".to_string()));
2171 assert!(flags.contains(&"--health-timeout=3s".to_string()));
2172 assert!(flags.contains(&"--health-retries=4".to_string()));
2173 assert!(flags.contains(&"--health-start-period=10s".to_string()));
2174 }
2175
2176 #[test]
2177 fn render_health_flags_joins_cmd_argv_with_spaces() {
2178 let hc = HealthCheckSpec {
2181 command: vec![
2182 "CMD".into(),
2183 "/bin/probe".into(),
2184 "--port".into(),
2185 "8080".into(),
2186 ],
2187 interval_seconds: 30,
2188 timeout_seconds: 5,
2189 retries: 3,
2190 start_period_seconds: 0,
2191 };
2192 let flags = render_health_flags(&hc);
2193 assert_eq!(flags[1], "/bin/probe --port 8080");
2194 }
2195
2196 #[test]
2197 fn build_run_argv_emits_health_flags_when_present() {
2198 let plan = ContainerPlan {
2199 container_name: "app".into(),
2200 image: "alpine".into(),
2201 env: Vec::new(),
2202 entry_point: Vec::new(),
2203 command: Vec::new(),
2204 secrets_refs: Vec::new(),
2205 essential: true,
2206 has_task_role: false,
2207 port_mappings: Vec::new(),
2208 network_mode: None,
2209 depends_on: Vec::new(),
2210 health_check: Some(HealthCheckSpec {
2211 command: vec!["CMD-SHELL".into(), "true".into()],
2212 interval_seconds: 5,
2213 timeout_seconds: 2,
2214 retries: 1,
2215 start_period_seconds: 1,
2216 }),
2217 volume_mounts: Vec::new(),
2218 ulimits: Vec::new(),
2219 linux_parameters: None,
2220 stop_timeout: None,
2221 user: None,
2222 working_directory: None,
2223 tty: false,
2224 interactive: false,
2225 readonly_rootfs: false,
2226 };
2227 let argv = build_run_argv(
2228 &plan,
2229 &[],
2230 "task-1",
2231 "host.docker.internal",
2232 None,
2233 "alpine",
2234 true,
2235 );
2236 let joined = argv.join(" ");
2237 assert!(joined.contains("--health-cmd true"), "argv: {joined}");
2238 assert!(joined.contains("--health-interval=5s"), "argv: {joined}");
2239 assert!(joined.contains("--health-timeout=2s"), "argv: {joined}");
2240 assert!(joined.contains("--health-retries=1"), "argv: {joined}");
2241 assert!(
2242 joined.contains("--health-start-period=1s"),
2243 "argv: {joined}"
2244 );
2245 }
2246
2247 #[test]
2248 fn build_run_argv_emits_no_health_flags_when_absent() {
2249 let plan = ContainerPlan {
2250 container_name: "app".into(),
2251 image: "alpine".into(),
2252 env: Vec::new(),
2253 entry_point: Vec::new(),
2254 command: Vec::new(),
2255 secrets_refs: Vec::new(),
2256 essential: true,
2257 has_task_role: false,
2258 port_mappings: Vec::new(),
2259 network_mode: None,
2260 depends_on: Vec::new(),
2261 health_check: None,
2262 volume_mounts: Vec::new(),
2263 ulimits: Vec::new(),
2264 linux_parameters: None,
2265 stop_timeout: None,
2266 user: None,
2267 working_directory: None,
2268 tty: false,
2269 interactive: false,
2270 readonly_rootfs: false,
2271 };
2272 let argv = build_run_argv(
2273 &plan,
2274 &[],
2275 "task-1",
2276 "host.docker.internal",
2277 None,
2278 "alpine",
2279 true,
2280 );
2281 assert!(!argv.iter().any(|s| s.starts_with("--health")));
2282 }
2283
2284 #[test]
2285 fn docker_health_to_ecs_maps_known_states() {
2286 assert_eq!(docker_health_to_ecs("healthy"), "HEALTHY");
2287 assert_eq!(docker_health_to_ecs("HEALTHY"), "HEALTHY");
2288 assert_eq!(docker_health_to_ecs("unhealthy"), "UNHEALTHY");
2289 assert_eq!(docker_health_to_ecs("starting"), "UNKNOWN");
2290 assert_eq!(docker_health_to_ecs("none"), "UNKNOWN");
2291 assert_eq!(docker_health_to_ecs(""), "UNKNOWN");
2292 }
2293
2294 #[test]
2297 fn resolve_host_bind_volume_uses_source_path() {
2298 let mut volumes = std::collections::HashMap::new();
2299 let v = serde_json::json!({
2300 "name": "data",
2301 "host": { "sourcePath": "/var/lib/myapp" }
2302 });
2303 volumes.insert("data".to_string(), &v);
2304 let mp = serde_json::json!({
2305 "sourceVolume": "data",
2306 "containerPath": "/app/data",
2307 "readOnly": false
2308 });
2309 let resolved = resolve_mount_point(&mp, &volumes).expect("resolved");
2310 assert_eq!(resolved.source, "/var/lib/myapp");
2311 assert_eq!(resolved.container_path, "/app/data");
2312 assert!(!resolved.read_only);
2313 }
2314
2315 #[test]
2318 fn read_only_mount_renders_ro_suffix() {
2319 let plan = ContainerPlan {
2320 container_name: "app".into(),
2321 image: "alpine".into(),
2322 env: Vec::new(),
2323 entry_point: Vec::new(),
2324 command: Vec::new(),
2325 secrets_refs: Vec::new(),
2326 essential: true,
2327 has_task_role: false,
2328 port_mappings: Vec::new(),
2329 network_mode: None,
2330 depends_on: Vec::new(),
2331 health_check: None,
2332 volume_mounts: vec![VolumeMount {
2333 source: "/host/path".into(),
2334 container_path: "/in/container".into(),
2335 read_only: true,
2336 }],
2337 ulimits: Vec::new(),
2338 linux_parameters: None,
2339 stop_timeout: None,
2340 user: None,
2341 working_directory: None,
2342 tty: false,
2343 interactive: false,
2344 readonly_rootfs: false,
2345 };
2346 let argv = build_run_argv(
2347 &plan,
2348 &[],
2349 "task-1",
2350 "host.docker.internal",
2351 None,
2352 "alpine",
2353 true,
2354 );
2355 let pair = argv
2356 .windows(2)
2357 .find(|w| w[0] == "-v")
2358 .expect("expected -v flag");
2359 assert_eq!(pair[1], "/host/path:/in/container:ro");
2360 }
2361
2362 #[test]
2367 fn resolve_efs_volume_uses_stub_dir() {
2368 let mut volumes = std::collections::HashMap::new();
2369 let v = serde_json::json!({
2370 "name": "efs-vol",
2371 "efsVolumeConfiguration": {
2372 "fileSystemId": "fs-12345678",
2373 "rootDirectory": "/exports/app"
2374 }
2375 });
2376 volumes.insert("efs-vol".to_string(), &v);
2377 let mp = serde_json::json!({
2378 "sourceVolume": "efs-vol",
2379 "containerPath": "/mnt/efs"
2380 });
2381 let resolved = resolve_mount_point(&mp, &volumes).expect("resolved");
2382 assert_eq!(resolved.source, "fakecloud-efs-fs-12345678-exports-app");
2385 assert_eq!(resolved.container_path, "/mnt/efs");
2386 }
2387
2388 #[test]
2392 fn efs_without_root_directory_uses_filesystem_root() {
2393 assert_eq!(
2396 shared_volume_name("efs", "fs-abc", "/"),
2397 "fakecloud-efs-fs-abc"
2398 );
2399 assert_eq!(
2400 shared_volume_name("efs", "fs-abc", ""),
2401 "fakecloud-efs-fs-abc"
2402 );
2403 }
2404
2405 #[test]
2409 fn resolve_docker_named_volume_uses_volume_name() {
2410 let mut volumes = std::collections::HashMap::new();
2411 let v = serde_json::json!({
2412 "name": "named-vol",
2413 "dockerVolumeConfiguration": {
2414 "scope": "task",
2415 "driver": "local"
2416 }
2417 });
2418 volumes.insert("named-vol".to_string(), &v);
2419 let mp = serde_json::json!({
2420 "sourceVolume": "named-vol",
2421 "containerPath": "/data"
2422 });
2423 let resolved = resolve_mount_point(&mp, &volumes).expect("resolved");
2424 assert_eq!(resolved.source, "named-vol");
2425 assert_eq!(resolved.container_path, "/data");
2426 }
2427
2428 #[test]
2431 fn resolve_fsx_volume_uses_stub_dir() {
2432 let mut volumes = std::collections::HashMap::new();
2433 let v = serde_json::json!({
2434 "name": "fsx-vol",
2435 "fsxWindowsFileServerVolumeConfiguration": {
2436 "fileSystemId": "fs-xyz",
2437 "rootDirectory": "share"
2438 }
2439 });
2440 volumes.insert("fsx-vol".to_string(), &v);
2441 let mp = serde_json::json!({
2442 "sourceVolume": "fsx-vol",
2443 "containerPath": "C:\\data"
2444 });
2445 let resolved = resolve_mount_point(&mp, &volumes).expect("resolved");
2446 assert_eq!(resolved.source, "fakecloud-fsx-fs-xyz-share");
2448 }
2449
2450 #[test]
2454 fn unknown_source_volume_returns_none() {
2455 let volumes = std::collections::HashMap::new();
2456 let mp = serde_json::json!({
2457 "sourceVolume": "missing",
2458 "containerPath": "/x"
2459 });
2460 assert!(resolve_mount_point(&mp, &volumes).is_none());
2461 }
2462
2463 #[test]
2467 fn find_depends_on_cycle_detects_two_node_cycle() {
2468 let cds = vec![
2469 serde_json::json!({
2470 "name": "a",
2471 "image": "alpine",
2472 "dependsOn": [{"containerName": "b", "condition": "START"}],
2473 }),
2474 serde_json::json!({
2475 "name": "b",
2476 "image": "alpine",
2477 "dependsOn": [{"containerName": "a", "condition": "START"}],
2478 }),
2479 ];
2480 let cycle = find_depends_on_cycle(&cds);
2481 assert!(cycle.is_some(), "expected cycle to be detected");
2482 }
2483
2484 #[test]
2488 fn find_depends_on_cycle_accepts_chain() {
2489 let cds = vec![
2490 serde_json::json!({
2491 "name": "a",
2492 "image": "alpine",
2493 "dependsOn": [{"containerName": "b", "condition": "START"}],
2494 }),
2495 serde_json::json!({
2496 "name": "b",
2497 "image": "alpine",
2498 "dependsOn": [{"containerName": "c", "condition": "START"}],
2499 }),
2500 serde_json::json!({
2501 "name": "c",
2502 "image": "alpine",
2503 }),
2504 ];
2505 assert!(find_depends_on_cycle(&cds).is_none());
2506 }
2507
2508 #[test]
2512 fn find_depends_on_cycle_ignores_unknown_target() {
2513 let cds = vec![serde_json::json!({
2514 "name": "only",
2515 "image": "alpine",
2516 "dependsOn": [{"containerName": "ghost", "condition": "START"}],
2517 })];
2518 assert!(find_depends_on_cycle(&cds).is_none());
2519 }
2520
2521 #[test]
2525 fn condition_is_met_matches_aws_semantics() {
2526 let running = InspectedState {
2527 started: true,
2528 exited: false,
2529 exit_code: 0,
2530 health: None,
2531 };
2532 let exited_ok = InspectedState {
2533 started: true,
2534 exited: true,
2535 exit_code: 0,
2536 health: None,
2537 };
2538 let exited_fail = InspectedState {
2539 started: true,
2540 exited: true,
2541 exit_code: 1,
2542 health: None,
2543 };
2544 let healthy = InspectedState {
2545 started: true,
2546 exited: false,
2547 exit_code: 0,
2548 health: Some("healthy".into()),
2549 };
2550
2551 assert!(condition_is_met(DependsOnCondition::Start, &running));
2554 assert!(condition_is_met(DependsOnCondition::Start, &exited_ok));
2555
2556 assert!(!condition_is_met(DependsOnCondition::Complete, &running));
2558 assert!(condition_is_met(DependsOnCondition::Complete, &exited_ok));
2559 assert!(condition_is_met(DependsOnCondition::Complete, &exited_fail));
2560
2561 assert!(!condition_is_met(DependsOnCondition::Success, &running));
2563 assert!(condition_is_met(DependsOnCondition::Success, &exited_ok));
2564 assert!(!condition_is_met(DependsOnCondition::Success, &exited_fail));
2565
2566 assert!(!condition_is_met(DependsOnCondition::Healthy, &running));
2568 assert!(condition_is_met(DependsOnCondition::Healthy, &healthy));
2569 }
2570
2571 #[test]
2575 fn depends_on_condition_parse_round_trips() {
2576 assert_eq!(
2577 DependsOnCondition::parse("START"),
2578 Some(DependsOnCondition::Start)
2579 );
2580 assert_eq!(
2581 DependsOnCondition::parse("COMPLETE"),
2582 Some(DependsOnCondition::Complete)
2583 );
2584 assert_eq!(
2585 DependsOnCondition::parse("SUCCESS"),
2586 Some(DependsOnCondition::Success)
2587 );
2588 assert_eq!(
2589 DependsOnCondition::parse("HEALTHY"),
2590 Some(DependsOnCondition::Healthy)
2591 );
2592 assert_eq!(DependsOnCondition::parse("start"), None);
2593 assert_eq!(DependsOnCondition::parse("ANY"), None);
2594 }
2595
2596 #[test]
2599 fn build_run_argv_emits_ulimits() {
2600 let plan = ContainerPlan {
2601 container_name: "app".into(),
2602 image: "alpine".into(),
2603 env: Vec::new(),
2604 entry_point: Vec::new(),
2605 command: Vec::new(),
2606 secrets_refs: Vec::new(),
2607 essential: true,
2608 has_task_role: false,
2609 port_mappings: Vec::new(),
2610 network_mode: None,
2611 depends_on: Vec::new(),
2612 health_check: None,
2613 volume_mounts: Vec::new(),
2614 ulimits: vec![Ulimit {
2615 name: "nofile".into(),
2616 soft_limit: 1024,
2617 hard_limit: 2048,
2618 }],
2619 linux_parameters: None,
2620 stop_timeout: None,
2621 user: None,
2622 working_directory: None,
2623 tty: false,
2624 interactive: false,
2625 readonly_rootfs: false,
2626 };
2627 let argv = build_run_argv(&plan, &[], "t", "host.docker.internal", None, "img", true);
2628 assert!(argv.contains(&"--ulimit".to_string()));
2629 assert!(argv.contains(&"nofile=1024:2048".to_string()));
2630 }
2631
2632 #[test]
2633 fn build_run_argv_emits_linux_parameters() {
2634 let plan = ContainerPlan {
2635 container_name: "app".into(),
2636 image: "alpine".into(),
2637 env: Vec::new(),
2638 entry_point: Vec::new(),
2639 command: Vec::new(),
2640 secrets_refs: Vec::new(),
2641 essential: true,
2642 has_task_role: false,
2643 port_mappings: Vec::new(),
2644 network_mode: None,
2645 depends_on: Vec::new(),
2646 health_check: None,
2647 volume_mounts: Vec::new(),
2648 ulimits: Vec::new(),
2649 linux_parameters: Some(LinuxParameters {
2650 capabilities_add: vec!["NET_ADMIN".into()],
2651 capabilities_drop: vec!["ALL".into()],
2652 devices: vec![Device {
2653 host_path: "/dev/zero".into(),
2654 container_path: "/dev/zero".into(),
2655 permissions: "rwm".into(),
2656 }],
2657 init_process_enabled: true,
2658 shared_memory_size: Some(256),
2659 sysctls: vec![Sysctl {
2660 name: "net.ipv4.ip_forward".into(),
2661 value: "1".into(),
2662 }],
2663 tmpfs: vec![Tmpfs {
2664 container_path: "/tmp".into(),
2665 size: 128,
2666 mount_options: vec!["noexec".into()],
2667 }],
2668 privileged: true,
2669 }),
2670 stop_timeout: Some(30),
2671 user: Some("1000:1000".into()),
2672 working_directory: Some("/app".into()),
2673 tty: true,
2674 interactive: true,
2675 readonly_rootfs: true,
2676 };
2677 let argv = build_run_argv(&plan, &[], "t", "host.docker.internal", None, "img", true);
2678 assert!(argv.contains(&"--cap-add".to_string()));
2679 assert!(argv.contains(&"NET_ADMIN".to_string()));
2680 assert!(argv.contains(&"--cap-drop".to_string()));
2681 assert!(argv.contains(&"ALL".to_string()));
2682 assert!(argv.contains(&"--device".to_string()));
2683 assert!(argv.contains(&"/dev/zero:/dev/zerorwm".to_string()));
2684 assert!(argv.contains(&"--init".to_string()));
2685 assert!(argv.contains(&"--shm-size".to_string()));
2686 assert!(argv.contains(&"256m".to_string()));
2687 assert!(argv.contains(&"--sysctl".to_string()));
2688 assert!(argv.contains(&"net.ipv4.ip_forward=1".to_string()));
2689 assert!(argv.contains(&"--tmpfs".to_string()));
2690 assert!(argv.contains(&"--privileged".to_string()));
2691 assert!(argv.contains(&"--stop-timeout".to_string()));
2692 assert!(argv.contains(&"30".to_string()));
2693 assert!(argv.contains(&"--user".to_string()));
2694 assert!(argv.contains(&"1000:1000".to_string()));
2695 assert!(argv.contains(&"--workdir".to_string()));
2696 assert!(argv.contains(&"/app".to_string()));
2697 assert!(argv.contains(&"--tty".to_string()));
2698 assert!(argv.contains(&"--interactive".to_string()));
2699 assert!(argv.contains(&"--read-only".to_string()));
2700 }
2701
2702 #[test]
2703 fn parse_linux_parameters_fills_defaults() {
2704 let raw = serde_json::json!({"initProcessEnabled": true});
2705 let lp = parse_linux_parameters(&raw).expect("parses");
2706 assert!(lp.init_process_enabled);
2707 assert!(!lp.privileged);
2708 assert!(lp.capabilities_add.is_empty());
2709 }
2710
2711 #[test]
2712 fn parse_device_uses_default_permissions() {
2713 let raw = serde_json::json!({"hostPath": "/dev/null", "containerPath": "/dev/null"});
2714 let dev = parse_device(&raw).expect("parses");
2715 assert_eq!(dev.permissions, "rwm");
2716 }
2717
2718 #[test]
2719 fn compute_elbv2_targets_empty_when_no_group() {
2720 let mut accounts: MultiAccountState<EcsState> =
2721 MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
2722 let acct = accounts.get_or_create("000000000000");
2723 let mut task = make_task("t1");
2724 task.group = None;
2725 acct.tasks.insert("t1".into(), task);
2726 let state = acct.clone();
2727 let targets = compute_elbv2_targets(&state, state.tasks.get("t1").unwrap());
2728 assert!(targets.is_empty());
2729 }
2730
2731 #[test]
2732 fn compute_elbv2_targets_bridge_mode_uses_localhost_and_host_port() {
2733 let mut accounts: MultiAccountState<EcsState> =
2734 MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
2735 let acct = accounts.get_or_create("000000000000");
2736
2737 let td = crate::state::TaskDefinition {
2738 family: "app".into(),
2739 revision: 1,
2740 task_definition_arn: "arn:aws:ecs:us-east-1:000000000000:task-definition/app:1".into(),
2741 container_definitions: Vec::new(),
2742 network_mode: Some("bridge".into()),
2743 status: "ACTIVE".into(),
2744 task_role_arn: None,
2745 execution_role_arn: None,
2746 requires_compatibilities: Vec::new(),
2747 compatibilities: Vec::new(),
2748 cpu: None,
2749 memory: None,
2750 pid_mode: None,
2751 ipc_mode: None,
2752 volumes: Vec::new(),
2753 placement_constraints: Vec::new(),
2754 proxy_configuration: None,
2755 inference_accelerators: Vec::new(),
2756 ephemeral_storage: None,
2757 runtime_platform: None,
2758 requires_attributes: Vec::new(),
2759 registered_at: Utc::now(),
2760 registered_by: None,
2761 deregistered_at: None,
2762 tags: Vec::new(),
2763 enable_fault_injection: None,
2764 };
2765 acct.task_definitions.insert("app".into(), {
2766 let mut m = std::collections::BTreeMap::new();
2767 m.insert(1, td);
2768 m
2769 });
2770
2771 let service = crate::state::Service {
2772 service_name: "svc".into(),
2773 service_arn: "arn:aws:ecs:us-east-1:000000000000:service/default/svc".into(),
2774 cluster_name: "default".into(),
2775 cluster_arn: "arn:aws:ecs:us-east-1:000000000000:cluster/default".into(),
2776 task_definition_arn: "arn:aws:ecs:us-east-1:000000000000:task-definition/app:1".into(),
2777 family: "app".into(),
2778 revision: 1,
2779 desired_count: 1,
2780 running_count: 0,
2781 pending_count: 0,
2782 launch_type: "FARGATE".into(),
2783 status: "ACTIVE".into(),
2784 scheduling_strategy: "REPLICA".into(),
2785 deployment_controller: "ECS".into(),
2786 minimum_healthy_percent: Some(0),
2787 maximum_percent: Some(200),
2788 circuit_breaker: None,
2789 deployments: Vec::new(),
2790 load_balancers: vec![serde_json::json!({
2791 "targetGroupArn": "arn:aws:elasticloadbalancing:us-east-1:000000000000:targetgroup/tg/abc",
2792 "containerName": "app",
2793 "containerPort": 80,
2794 })],
2795 service_registries: Vec::new(),
2796 placement_constraints: Vec::new(),
2797 placement_strategy: Vec::new(),
2798 network_configuration: None,
2799 volume_configurations: vec![],
2800 tags: Vec::new(),
2801 created_at: Utc::now(),
2802 created_by: None,
2803 role_arn: None,
2804 platform_version: None,
2805 health_check_grace_period_seconds: None,
2806 enable_execute_command: false,
2807 enable_ecs_managed_tags: false,
2808 propagate_tags: None,
2809 capacity_provider_strategy: Vec::new(),
2810 availability_zone_rebalancing: None,
2811 };
2812 acct.services.insert(
2813 crate::state::EcsState::service_key("default", "svc"),
2814 service,
2815 );
2816
2817 let mut task = make_task("t1");
2818 task.group = Some("service:svc".into());
2819 task.containers = vec![crate::state::Container {
2820 container_arn: "arn:aws:ecs:us-east-1:000000000000:container/default/abc/app".into(),
2821 name: "app".into(),
2822 image: "alpine".into(),
2823 task_arn: task.task_arn.clone(),
2824 last_status: "RUNNING".into(),
2825 exit_code: None,
2826 reason: None,
2827 runtime_id: Some("dockerid-app".into()),
2828 essential: true,
2829 cpu: None,
2830 memory: None,
2831 memory_reservation: None,
2832 network_bindings: vec![serde_json::json!({
2833 "bindIP": "0.0.0.0",
2834 "containerPort": 80,
2835 "hostPort": 32768,
2836 "protocol": "tcp",
2837 })],
2838 network_interfaces: Vec::new(),
2839 health_status: None,
2840 managed_agents: None,
2841 image_digest: None,
2842 }];
2843 acct.tasks.insert("t1".into(), task);
2844
2845 let state = acct.clone();
2846 let targets = compute_elbv2_targets(&state, state.tasks.get("t1").unwrap());
2847 assert_eq!(targets.len(), 1);
2848 let (arn, tg_targets) = &targets[0];
2849 assert_eq!(
2850 arn,
2851 "arn:aws:elasticloadbalancing:us-east-1:000000000000:targetgroup/tg/abc"
2852 );
2853 assert_eq!(tg_targets.len(), 1);
2854 assert_eq!(tg_targets[0].0, "127.0.0.1");
2855 assert_eq!(tg_targets[0].1, Some(32768));
2856 }
2857
2858 #[test]
2859 fn compute_elbv2_targets_awsvpc_uses_eni_ip() {
2860 let mut accounts: MultiAccountState<EcsState> =
2861 MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
2862 let acct = accounts.get_or_create("000000000000");
2863
2864 let td = crate::state::TaskDefinition {
2865 family: "app".into(),
2866 revision: 1,
2867 task_definition_arn: "arn:aws:ecs:us-east-1:000000000000:task-definition/app:1".into(),
2868 container_definitions: Vec::new(),
2869 network_mode: Some("awsvpc".into()),
2870 status: "ACTIVE".into(),
2871 task_role_arn: None,
2872 execution_role_arn: None,
2873 requires_compatibilities: Vec::new(),
2874 compatibilities: Vec::new(),
2875 cpu: None,
2876 memory: None,
2877 pid_mode: None,
2878 ipc_mode: None,
2879 volumes: Vec::new(),
2880 placement_constraints: Vec::new(),
2881 proxy_configuration: None,
2882 inference_accelerators: Vec::new(),
2883 ephemeral_storage: None,
2884 runtime_platform: None,
2885 requires_attributes: Vec::new(),
2886 registered_at: Utc::now(),
2887 registered_by: None,
2888 deregistered_at: None,
2889 tags: Vec::new(),
2890 enable_fault_injection: None,
2891 };
2892 acct.task_definitions.insert("app".into(), {
2893 let mut m = std::collections::BTreeMap::new();
2894 m.insert(1, td);
2895 m
2896 });
2897
2898 let service = crate::state::Service {
2899 service_name: "svc".into(),
2900 service_arn: "arn:aws:ecs:us-east-1:000000000000:service/default/svc".into(),
2901 cluster_name: "default".into(),
2902 cluster_arn: "arn:aws:ecs:us-east-1:000000000000:cluster/default".into(),
2903 task_definition_arn: "arn:aws:ecs:us-east-1:000000000000:task-definition/app:1".into(),
2904 family: "app".into(),
2905 revision: 1,
2906 desired_count: 1,
2907 running_count: 0,
2908 pending_count: 0,
2909 launch_type: "FARGATE".into(),
2910 status: "ACTIVE".into(),
2911 scheduling_strategy: "REPLICA".into(),
2912 deployment_controller: "ECS".into(),
2913 minimum_healthy_percent: Some(0),
2914 maximum_percent: Some(200),
2915 circuit_breaker: None,
2916 deployments: Vec::new(),
2917 load_balancers: vec![serde_json::json!({
2918 "targetGroupArn": "arn:aws:elasticloadbalancing:us-east-1:000000000000:targetgroup/tg/abc",
2919 "containerName": "app",
2920 "containerPort": 80,
2921 })],
2922 service_registries: Vec::new(),
2923 placement_constraints: Vec::new(),
2924 placement_strategy: Vec::new(),
2925 network_configuration: None,
2926 volume_configurations: vec![],
2927 tags: Vec::new(),
2928 created_at: Utc::now(),
2929 created_by: None,
2930 role_arn: None,
2931 platform_version: None,
2932 health_check_grace_period_seconds: None,
2933 enable_execute_command: false,
2934 enable_ecs_managed_tags: false,
2935 propagate_tags: None,
2936 capacity_provider_strategy: Vec::new(),
2937 availability_zone_rebalancing: None,
2938 };
2939 acct.services.insert(
2940 crate::state::EcsState::service_key("default", "svc"),
2941 service,
2942 );
2943
2944 let mut task = make_task("t1");
2945 task.group = Some("service:svc".into());
2946 task.attachments = vec![crate::state::TaskAttachment {
2947 id: "eni-123".into(),
2948 attachment_type: "eni".into(),
2949 status: "ATTACHED".into(),
2950 details: vec![
2951 crate::state::AttachmentDetail {
2952 name: "privateIPv4Address".into(),
2953 value: "172.18.0.2".into(),
2954 },
2955 crate::state::AttachmentDetail {
2956 name: "macAddress".into(),
2957 value: "02:42:ac:12:00:02".into(),
2958 },
2959 ],
2960 }];
2961 acct.tasks.insert("t1".into(), task);
2962
2963 let state = acct.clone();
2964 let targets = compute_elbv2_targets(&state, state.tasks.get("t1").unwrap());
2965 assert_eq!(targets.len(), 1);
2966 let (arn, tg_targets) = &targets[0];
2967 assert_eq!(
2968 arn,
2969 "arn:aws:elasticloadbalancing:us-east-1:000000000000:targetgroup/tg/abc"
2970 );
2971 assert_eq!(tg_targets.len(), 1);
2972 assert_eq!(tg_targets[0].0, "172.18.0.2");
2973 assert_eq!(tg_targets[0].1, Some(80));
2974 }
2975}