use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use base64::Engine;
use chrono::Utc;
use fakecloud_core::delivery::DeliveryBus;
use fakecloud_logs::ingest::{append_events, IngestEvent};
use fakecloud_logs::SharedLogsState;
use fakecloud_secretsmanager::SharedSecretsManagerState;
use fakecloud_ssm::SharedSsmState;
use parking_lot::RwLock;
use tempfile::TempDir;
use tokio::process::Command;
use crate::state::{LifecycleEvent, SharedEcsState};
/// Failures reported by the container-backed ECS runtime.
///
/// Every variant except [`RuntimeError::NoCli`] carries a free-form detail
/// string that is interpolated into the rendered message.
#[derive(Debug, thiserror::Error)]
pub enum RuntimeError {
/// No container CLI could be located; the message names the CLIs tried.
#[error("container CLI not found (tried docker, podman)")]
NoCli,
/// Pulling an image failed; the payload is the failure detail.
#[error("image pull failed: {0}")]
ImagePull(String),
/// A container could not be started, or the task state needed to plan it
/// was missing (see `build_container_plans`).
#[error("container start failed: {0}")]
ContainerStart(String),
/// Waiting on a container failed; the payload is the failure detail.
#[error("docker wait failed: {0}")]
Wait(String),
}
/// Runtime that executes ECS tasks either through a local container CLI or
/// through a Kubernetes backend.
///
/// Built by `EcsRuntime::new` (CLI mode) or `EcsRuntime::new_k8s`; the optional
/// collaborators start as `None` and are attached afterwards.
pub struct EcsRuntime {
// Name of the detected container CLI; empty when the k8s backend is used.
cli: String,
// Host-networking details (host alias, optional add-host argument, sibling host).
net: fakecloud_core::container_net::HostNetworking,
// Port handed in at construction; also used to key the local-registry docker config.
server_port: u16,
// Temporary directory holding a generated docker `config.json`; `None` in
// k8s mode or when the config could not be written.
docker_config: Option<Arc<TempDir>>,
// NOTE(review): presumably task id -> (container name, container id) pairs;
// the users of this map are outside this section — confirm.
containers: RwLock<std::collections::HashMap<String, Vec<(String, String)>>>,
// Optional delivery bus, set through `with_delivery_bus`.
delivery_bus: Option<Arc<DeliveryBus>>,
// Optional logs state, set through `with_logs`.
logs_state: Option<SharedLogsState>,
// Optional Secrets Manager state; no setter is visible in this section.
secretsmanager_state: Option<SharedSecretsManagerState>,
// Optional SSM state; no setter is visible in this section.
ssm_state: Option<SharedSsmState>,
// Kubernetes backend; `Some` only when built through `new_k8s`.
k8s: Option<k8s::K8sTaskBackend>,
}
mod config;
mod k8s;
mod lb;
mod monitoring;
mod secrets;
mod task_lifecycle;
impl EcsRuntime {
    /// Creates a runtime that drives containers through a local container CLI.
    ///
    /// Returns `None` when `detect_container_cli` finds no CLI. All optional
    /// collaborators start unset; attach them with the `with_*` builders.
    pub fn new(server_port: u16) -> Option<Self> {
        let cli = fakecloud_core::container_net::detect_container_cli()?;
        let net = fakecloud_core::container_net::HostNetworking::detect(&cli);
        // Best effort: if the registry config cannot be written the runtime
        // simply runs without one.
        let docker_config = build_local_registry_docker_config(server_port).map(Arc::new);
        let runtime = Self {
            cli,
            net,
            server_port,
            docker_config,
            containers: RwLock::new(Default::default()),
            delivery_bus: None,
            logs_state: None,
            secretsmanager_state: None,
            ssm_state: None,
            k8s: None,
        };
        Some(runtime)
    }

    /// Creates a runtime backed by Kubernetes instead of a container CLI.
    ///
    /// # Errors
    /// Propagates the error from `K8sTaskBackend::from_env`.
    pub async fn new_k8s(server_port: u16) -> Result<Self, k8s::BackendInitError> {
        let backend = k8s::K8sTaskBackend::from_env(server_port).await?;
        Ok(Self {
            // No CLI and no host networking details are used in this mode,
            // so they are left empty.
            cli: String::new(),
            net: fakecloud_core::container_net::HostNetworking {
                host_alias: String::new(),
                add_host_arg: None,
                sibling_host: String::new(),
            },
            server_port,
            docker_config: None,
            containers: RwLock::new(Default::default()),
            delivery_bus: None,
            logs_state: None,
            secretsmanager_state: None,
            ssm_state: None,
            k8s: Some(backend),
        })
    }

    /// Name of the active backend: `"kubernetes"` in k8s mode, otherwise the
    /// detected CLI name.
    pub fn cli_name(&self) -> &str {
        match self.k8s {
            Some(_) => "kubernetes",
            None => self.cli.as_str(),
        }
    }

    /// Asks the Kubernetes backend to reap stale resources; does nothing in
    /// CLI mode.
    pub async fn reap_stale(&self) {
        let Some(backend) = &self.k8s else {
            return;
        };
        backend.reap_stale().await;
    }

    /// Attaches the delivery bus and returns the runtime for chaining.
    pub fn with_delivery_bus(mut self, bus: Arc<DeliveryBus>) -> Self {
        self.delivery_bus = Some(bus);
        self
    }

    /// Attaches the logs state and returns the runtime for chaining.
    pub fn with_logs(mut self, logs: SharedLogsState) -> Self {
        self.logs_state = Some(logs);
        self
    }
}
/// Everything needed to launch one container of a task, as assembled by
/// `build_container_plans` from the task and its container definition.
#[derive(Clone, Debug)]
pub(crate) struct ContainerPlan {
/// Container name copied from the task's container entry.
pub(crate) container_name: String,
/// Image copied from the task's container entry.
pub(crate) image: String,
/// `(name, value)` pairs from the definition's `environment` array.
pub(crate) env: Vec<(String, String)>,
/// Strings from `entryPoint`; the first becomes `--entrypoint`, the rest
/// are passed after the image (see `build_run_argv`).
pub(crate) entry_point: Vec<String>,
/// Strings from `command`, appended after the entry-point arguments.
pub(crate) command: Vec<String>,
/// `(name, valueFrom)` pairs from the definition's `secrets` array.
pub(crate) secrets_refs: Vec<(String, String)>,
/// Essential flag copied from the task's container entry.
pub(crate) essential: bool,
/// True when the task has a `task_role_arn`.
pub(crate) has_task_role: bool,
/// Parsed `portMappings` entries.
pub(crate) port_mappings: Vec<PortMapping>,
/// Network mode of the task definition, if any.
pub(crate) network_mode: Option<String>,
/// Parsed `dependsOn` entries.
pub(crate) depends_on: Vec<DependsOn>,
/// Parsed `healthCheck`, if present and not disabled.
pub(crate) health_check: Option<HealthCheckSpec>,
/// `mountPoints` resolved against the task definition's volumes.
pub(crate) volume_mounts: Vec<VolumeMount>,
/// Parsed `ulimits` entries.
pub(crate) ulimits: Vec<Ulimit>,
/// Parsed `linuxParameters`, if present.
pub(crate) linux_parameters: Option<LinuxParameters>,
/// `stopTimeout`, rendered as `--stop-timeout`.
pub(crate) stop_timeout: Option<u32>,
/// `user`, rendered as `--user`.
pub(crate) user: Option<String>,
/// `workingDirectory`, rendered as `--workdir`.
pub(crate) working_directory: Option<String>,
/// `tty`, rendered as `--tty`.
pub(crate) tty: bool,
/// `interactive`, rendered as `--interactive`.
pub(crate) interactive: bool,
/// `readonlyRootFilesystem`, rendered as `--read-only`.
pub(crate) readonly_rootfs: bool,
}
/// One parsed `dependsOn` entry of a container definition.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct DependsOn {
/// Value of the entry's `containerName` key.
pub container_name: String,
/// Parsed value of the entry's `condition` key.
pub condition: DependsOnCondition,
}
/// Condition a dependency must reach before a dependant is considered ready
/// (evaluated by `condition_is_met`).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum DependsOnCondition {
/// The dependency has started.
Start,
/// The dependency has exited, with any exit code.
Complete,
/// The dependency has exited with exit code 0.
Success,
/// The dependency reports a `healthy` health status.
Healthy,
}
impl DependsOnCondition {
    /// Parses the upper-case wire form (`"START"`, `"COMPLETE"`, `"SUCCESS"`,
    /// `"HEALTHY"`). The match is exact; anything else yields `None`.
    pub fn parse(raw: &str) -> Option<Self> {
        [Self::Start, Self::Complete, Self::Success, Self::Healthy]
            .iter()
            .copied()
            .find(|candidate| candidate.as_aws_str() == raw)
    }

    /// Returns the upper-case wire form accepted by [`DependsOnCondition::parse`].
    pub fn as_aws_str(self) -> &'static str {
        match self {
            Self::Start => "START",
            Self::Complete => "COMPLETE",
            Self::Success => "SUCCESS",
            Self::Healthy => "HEALTHY",
        }
    }
}
/// Parsed container health check, produced by `parse_health_check` and
/// rendered into CLI flags by `render_health_flags`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct HealthCheckSpec {
/// Command array; the first element is the kind (`CMD` or `CMD-SHELL`).
pub command: Vec<String>,
/// Seconds between checks (`parse_health_check` defaults this to 30).
pub interval_seconds: u32,
/// Seconds before a check times out (defaults to 5).
pub timeout_seconds: u32,
/// Number of retries (defaults to 3).
pub retries: u32,
/// Start period in seconds (defaults to 0).
pub start_period_seconds: u32,
}
/// One parsed `portMappings` entry (see `parse_port_mapping`).
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct PortMapping {
/// Port inside the container.
pub container_port: u16,
/// Port on the host; `parse_port_mapping` falls back to `container_port`
/// when `hostPort` is missing, out of range, or 0.
pub host_port: u16,
/// Lower-cased protocol, `"tcp"` when not given.
pub protocol: String,
}
/// A resolved mount point, rendered by `build_run_argv` as
/// `-v {source}:{container_path}[:ro]`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct VolumeMount {
/// Host path or volume name chosen by `resolve_volume_source`.
pub source: String,
/// Mount path inside the container.
pub container_path: String,
/// When true the mount gets the `:ro` suffix.
pub read_only: bool,
}
/// One parsed `ulimits` entry, rendered as `--ulimit {name}={soft}:{hard}`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct Ulimit {
/// Limit name.
pub name: String,
/// Soft limit (`softLimit`).
pub soft_limit: i32,
/// Hard limit (`hardLimit`).
pub hard_limit: i32,
}
/// One parsed `linuxParameters.devices` entry, passed to the CLI through
/// `--device` by `build_run_argv`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct Device {
/// Device path on the host (`hostPath`).
pub host_path: String,
/// Device path inside the container (`containerPath`).
pub container_path: String,
/// Permission string; `parse_device` defaults it to `"rwm"`.
pub permissions: String,
}
/// One kernel parameter, rendered as `--sysctl {name}={value}`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct Sysctl {
/// Parameter name.
pub name: String,
/// Parameter value.
pub value: String,
}
/// Parsed `linuxParameters` object (see `parse_linux_parameters`); each field
/// maps onto one or more CLI flags in `build_run_argv`.
#[derive(Clone, Debug, PartialEq, Eq, Default)]
pub(crate) struct LinuxParameters {
/// Capabilities rendered as `--cap-add`.
pub capabilities_add: Vec<String>,
/// Capabilities rendered as `--cap-drop`.
pub capabilities_drop: Vec<String>,
/// Devices rendered as `--device`.
pub devices: Vec<Device>,
/// When true `--init` is passed.
pub init_process_enabled: bool,
/// Rendered as `--shm-size {n}m`.
pub shared_memory_size: Option<i32>,
/// Kernel parameters rendered as `--sysctl`.
pub sysctls: Vec<Sysctl>,
/// tmpfs mounts rendered as `--tmpfs`.
pub tmpfs: Vec<Tmpfs>,
/// When true `--privileged` is passed.
pub privileged: bool,
}
/// One parsed tmpfs mount, rendered as
/// `--tmpfs {container_path}:size={size}M[,options]`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct Tmpfs {
/// Mount path inside the container.
pub container_path: String,
/// Size used in the `size={n}M` option; `parse_tmpfs` requires it to be positive.
pub size: i32,
/// Extra mount options, joined with commas.
pub mount_options: Vec<String>,
}
/// A container plan paired with an environment list.
///
/// NOTE(review): presumably `env` is the plan's environment after secrets
/// have been resolved; the code that builds this type is outside this
/// section — confirm.
#[derive(Clone, Debug)]
struct ResolvedContainerPlan {
// The plan as produced by `build_container_plans`.
plan: ContainerPlan,
// `(name, value)` environment pairs to launch the container with.
env: Vec<(String, String)>,
}
/// Summary of how a task ended.
///
/// NOTE(review): this type is not used in the visible section; the field
/// notes below are inferred from names and types only — confirm against the
/// lifecycle code.
#[derive(Clone, Debug)]
struct TaskExitOutcome {
// Presumably the index of the container whose exit ended the task, if any.
exited_index: Option<usize>,
// Exit code reported for the task.
exit_code: i64,
// Stop code string; presumably what is later stored as the task's `stop_code`.
stop_code: &'static str,
}
/// Runtime view of one launched container; consumed by `task_should_stop`,
/// `mark_running_multi` and `finalize_stopped_multi`.
#[derive(Clone, Debug)]
pub(crate) struct RunningContainer {
/// Container name; matched against the task's container names.
pub(crate) name: String,
/// Runtime container id, stored as the task container's `runtime_id`.
pub(crate) container_id: String,
/// Whether the exit of this container alone stops the task.
pub(crate) essential: bool,
/// Exit code once the container has exited; `None` while it has not.
pub(crate) exit_code: Option<i64>,
/// Network bindings copied onto the task container when it is marked running.
pub(crate) network_bindings: Vec<serde_json::Value>,
/// Image digest, copied onto the task container when present.
pub(crate) image_digest: Option<String>,
}
/// Decides whether a task should be stopped given its containers' state.
///
/// Returns `true` when there are no containers, when any essential container
/// has an exit code, or when every container has an exit code.
pub(crate) fn task_should_stop(containers: &[RunningContainer]) -> bool {
    let mut every_container_exited = true;
    for container in containers {
        match (container.exit_code.is_some(), container.essential) {
            // One essential exit is enough to stop the whole task.
            (true, true) => return true,
            (true, false) => {}
            (false, _) => every_container_exited = false,
        }
    }
    // Also covers the empty slice, which counts as "stop".
    every_container_exited
}
/// Reports whether the task's desired status is `"STOPPED"`.
///
/// A missing account or task is treated as stopped.
pub(crate) fn task_desired_stopped(
    state: &SharedEcsState,
    account_id: &str,
    task_id: &str,
) -> bool {
    let accounts = state.read();
    accounts
        .get(account_id)
        .and_then(|s| s.tasks.get(task_id))
        .map_or(true, |task| task.desired_status == "STOPPED")
}
/// Builds one [`ContainerPlan`] per container of a task, ordered so that
/// dependencies come before their dependants (see `topo_sort_plans`).
///
/// Per-container settings are read from the matching container definition of
/// the task's task-definition revision; when no definition is found every
/// optional setting falls back to its empty/default value.
///
/// `_server_port` is accepted for interface compatibility and is unused.
///
/// # Errors
/// Returns [`RuntimeError::ContainerStart`] when the account or task is
/// missing, or when the task has no containers.
fn build_container_plans(
    state: &SharedEcsState,
    account_id: &str,
    task_id: &str,
    _server_port: u16,
) -> Result<Vec<ContainerPlan>, RuntimeError> {
    /// Borrows the JSON array under `key`, or an empty slice when absent.
    fn array_field<'a>(def: Option<&'a serde_json::Value>, key: &str) -> &'a [serde_json::Value] {
        def.and_then(|d| d.get(key))
            .and_then(|v| v.as_array())
            .map(Vec::as_slice)
            .unwrap_or(&[])
    }

    /// Collects the string elements of the array under `key`.
    fn str_list(def: Option<&serde_json::Value>, key: &str) -> Vec<String> {
        array_field(def, key)
            .iter()
            .filter_map(|v| v.as_str().map(String::from))
            .collect()
    }

    /// Reads an optional string under `key`.
    fn str_field(def: Option<&serde_json::Value>, key: &str) -> Option<String> {
        def.and_then(|d| d.get(key))
            .and_then(|v| v.as_str())
            .map(String::from)
    }

    /// Reads a boolean under `key`, defaulting to `false`.
    fn bool_field(def: Option<&serde_json::Value>, key: &str) -> bool {
        def.and_then(|d| d.get(key))
            .and_then(|v| v.as_bool())
            .unwrap_or(false)
    }

    let accounts = state.read();
    let s = accounts
        .get(account_id)
        .ok_or_else(|| RuntimeError::ContainerStart("account missing".into()))?;
    let task = s
        .tasks
        .get(task_id)
        .ok_or_else(|| RuntimeError::ContainerStart("task missing".into()))?;
    if task.containers.is_empty() {
        return Err(RuntimeError::ContainerStart(
            "task has no containers".into(),
        ));
    }

    let has_task_role = task.task_role_arn.is_some();
    let task_def = s
        .task_definitions
        .get(&task.family)
        .and_then(|revs| revs.get(&task.revision));
    let network_mode = task_def.and_then(|td| td.network_mode.clone());
    // Volumes are looked up by name when resolving each mount point.
    let volumes_by_name: std::collections::HashMap<String, &serde_json::Value> = task_def
        .map(|td| {
            td.volumes
                .iter()
                .filter_map(|v| {
                    let name = v.get("name").and_then(|n| n.as_str())?;
                    Some((name.to_string(), v))
                })
                .collect()
        })
        .unwrap_or_default();

    let mut plans = Vec::with_capacity(task.containers.len());
    for container in &task.containers {
        let owned_def = find_container_definition(s, &task.family, task.revision, &container.name);
        let def = owned_def.as_ref();

        let secrets_refs = array_field(def, "secrets")
            .iter()
            .filter_map(|e| {
                let name = e.get("name").and_then(|v| v.as_str())?;
                let value_from = e.get("valueFrom").and_then(|v| v.as_str())?;
                Some((name.to_string(), value_from.to_string()))
            })
            .collect::<Vec<_>>();
        let env = array_field(def, "environment")
            .iter()
            .filter_map(|e| {
                let k = e.get("name").and_then(|v| v.as_str())?;
                // A missing value is passed through as an empty string.
                let v = e.get("value").and_then(|v| v.as_str()).unwrap_or("");
                Some((k.to_string(), v.to_string()))
            })
            .collect::<Vec<_>>();
        let port_mappings = array_field(def, "portMappings")
            .iter()
            .filter_map(parse_port_mapping)
            .collect::<Vec<_>>();
        let depends_on = array_field(def, "dependsOn")
            .iter()
            .filter_map(parse_depends_on_entry)
            .collect::<Vec<_>>();
        let health_check = def
            .and_then(|d| d.get("healthCheck"))
            .and_then(parse_health_check);
        let volume_mounts = array_field(def, "mountPoints")
            .iter()
            .filter_map(|mp| resolve_mount_point(mp, &volumes_by_name))
            .collect::<Vec<_>>();
        let ulimits = array_field(def, "ulimits")
            .iter()
            .filter_map(parse_ulimit)
            .collect::<Vec<_>>();
        let linux_parameters = def
            .and_then(|d| d.get("linuxParameters"))
            .and_then(parse_linux_parameters);
        // Range-check before narrowing: `as u32` would silently wrap values
        // above `u32::MAX`, so those are treated as "not set" instead.
        let stop_timeout = def
            .and_then(|d| d.get("stopTimeout"))
            .and_then(|v| v.as_u64())
            .filter(|n| *n <= u32::MAX as u64)
            .map(|n| n as u32);

        plans.push(ContainerPlan {
            container_name: container.name.clone(),
            image: container.image.clone(),
            env,
            entry_point: str_list(def, "entryPoint"),
            command: str_list(def, "command"),
            secrets_refs,
            essential: container.essential,
            has_task_role,
            port_mappings,
            network_mode: network_mode.clone(),
            depends_on,
            health_check,
            volume_mounts,
            ulimits,
            linux_parameters,
            stop_timeout,
            user: str_field(def, "user"),
            working_directory: str_field(def, "workingDirectory"),
            tty: bool_field(def, "tty"),
            interactive: bool_field(def, "interactive"),
            readonly_rootfs: bool_field(def, "readonlyRootFilesystem"),
        });
    }
    Ok(topo_sort_plans(plans))
}
/// Turns one `mountPoints` entry into a [`VolumeMount`].
///
/// Returns `None` when `containerPath` or `sourceVolume` is missing, when the
/// named volume is not in `volumes_by_name`, or when the volume's source
/// cannot be resolved. `readOnly` defaults to `false`.
fn resolve_mount_point(
    mount_point: &serde_json::Value,
    volumes_by_name: &std::collections::HashMap<String, &serde_json::Value>,
) -> Option<VolumeMount> {
    let container_path = mount_point.get("containerPath")?.as_str()?.to_string();
    let volume_name = mount_point.get("sourceVolume")?.as_str()?;
    let read_only = matches!(
        mount_point.get("readOnly").and_then(|v| v.as_bool()),
        Some(true)
    );
    let source = volumes_by_name
        .get(volume_name)
        .and_then(|volume| resolve_volume_source(volume_name, volume))?;
    Some(VolumeMount {
        source,
        container_path,
        read_only,
    })
}
/// Chooses the source string for a task-definition volume.
///
/// Precedence: a non-empty `host.sourcePath` (the directory is created on a
/// best-effort basis), then an EFS configuration, then an FSx configuration
/// (both mapped to a shared volume name), and finally the volume's own name.
/// Returns `None` only when an EFS/FSx configuration lacks `fileSystemId`.
fn resolve_volume_source(name: &str, volume: &serde_json::Value) -> Option<String> {
    let host_path = volume
        .get("host")
        .and_then(|host| host.get("sourcePath"))
        .and_then(|path| path.as_str())
        .filter(|path| !path.is_empty());
    if let Some(path) = host_path {
        ensure_dir_exists(path);
        return Some(path.to_string());
    }

    // The first configuration present wins; EFS is checked before FSx.
    for (config_key, kind) in [
        ("efsVolumeConfiguration", "efs"),
        ("fsxWindowsFileServerVolumeConfiguration", "fsx"),
    ] {
        let Some(config) = volume.get(config_key) else {
            continue;
        };
        let fs_id = config.get("fileSystemId").and_then(|v| v.as_str())?;
        let root = config
            .get("rootDirectory")
            .and_then(|v| v.as_str())
            .unwrap_or("/");
        return Some(shared_volume_name(kind, fs_id, root));
    }

    // Docker-volume configurations and plain volumes both use the volume name.
    Some(name.to_string())
}
/// Builds the volume name `fakecloud-{kind}-{fs_id}[-{root}]`.
///
/// Leading and trailing slashes are stripped from `root`; when nothing is
/// left the root suffix is omitted. `fs_id` and the root are sanitized with
/// `sanitize_volume_segment`.
fn shared_volume_name(kind: &str, fs_id: &str, root: &str) -> String {
    let safe_id = sanitize_volume_segment(fs_id);
    match root.trim_matches('/') {
        "" => format!("fakecloud-{kind}-{safe_id}"),
        sub_path => format!(
            "fakecloud-{kind}-{safe_id}-{}",
            sanitize_volume_segment(sub_path)
        ),
    }
}
/// Replaces every character that is not ASCII alphanumeric, `_`, `.` or `-`
/// with `-`.
fn sanitize_volume_segment(s: &str) -> String {
    let mut cleaned = String::with_capacity(s.len());
    for ch in s.chars() {
        let allowed = ch.is_ascii_alphanumeric() || ch == '_' || ch == '.' || ch == '-';
        cleaned.push(if allowed { ch } else { '-' });
    }
    cleaned
}
/// Creates `path` and any missing parents. Best effort: a failure is
/// deliberately ignored.
fn ensure_dir_exists(path: &str) {
    let outcome = std::fs::create_dir_all(path);
    drop(outcome);
}
/// Parses one `dependsOn` entry.
///
/// Returns `None` when `containerName` or `condition` is missing or not a
/// string, or when the condition is not recognized by
/// `DependsOnCondition::parse`.
fn parse_depends_on_entry(value: &serde_json::Value) -> Option<DependsOn> {
    /// Reads the string stored under `key`.
    fn text<'a>(value: &'a serde_json::Value, key: &str) -> Option<&'a str> {
        value.get(key).and_then(|v| v.as_str())
    }

    let container_name = text(value, "containerName")?.to_string();
    let condition = DependsOnCondition::parse(text(value, "condition")?)?;
    Some(DependsOn {
        container_name,
        condition,
    })
}
/// Orders plans so that every container comes after the containers it
/// depends on.
///
/// At each step the lowest-index plan with no unmet dependency is emitted.
/// Dependencies on names that are not in `plans` are ignored. Plans that can
/// never be released (members of a cycle and anything behind them) are
/// appended at the end in their original order.
fn topo_sort_plans(plans: Vec<ContainerPlan>) -> Vec<ContainerPlan> {
    use std::collections::HashMap;

    let total = plans.len();
    // `blockers[i]`: unmet dependencies of plan i.
    // `unblocks[i]`: plans waiting on plan i.
    let (mut blockers, unblocks) = {
        let slot_of: HashMap<&str, usize> = plans
            .iter()
            .enumerate()
            .map(|(i, plan)| (plan.container_name.as_str(), i))
            .collect();
        let mut blockers = vec![0usize; total];
        let mut unblocks: Vec<Vec<usize>> = vec![Vec::new(); total];
        for (i, plan) in plans.iter().enumerate() {
            for dep in &plan.depends_on {
                if let Some(&target) = slot_of.get(dep.container_name.as_str()) {
                    blockers[i] += 1;
                    unblocks[target].push(i);
                }
            }
        }
        (blockers, unblocks)
    };

    // A slot becomes `None` once its plan has been emitted.
    let mut waiting: Vec<Option<ContainerPlan>> = plans.into_iter().map(Some).collect();
    let mut ordered: Vec<ContainerPlan> = Vec::with_capacity(total);
    while let Some(ready) = (0..total).find(|&i| waiting[i].is_some() && blockers[i] == 0) {
        if let Some(plan) = waiting[ready].take() {
            ordered.push(plan);
        }
        for &dependant in &unblocks[ready] {
            blockers[dependant] = blockers[dependant].saturating_sub(1);
        }
    }
    // Whatever is left could not be released; keep its original order.
    ordered.extend(waiting.into_iter().flatten());
    ordered
}
/// Looks for a cycle in the `dependsOn` graph of a set of container
/// definitions.
///
/// Returns the `(from, to)` names of the edge that closes the first cycle
/// found by a depth-first walk, or `None` when the graph is acyclic.
/// Dependencies on unknown container names are ignored.
///
/// Nodes are indexed by position in `container_definitions`, so a definition
/// without a string `name` keeps its slot. Such a definition can still
/// contribute outgoing edges, but nothing can depend on it, so it can never be
/// part of a cycle.
pub(crate) fn find_depends_on_cycle(
    container_definitions: &[serde_json::Value],
) -> Option<(String, String)> {
    use std::collections::HashMap;

    const UNVISITED: u8 = 0;
    const IN_PROGRESS: u8 = 1;
    const DONE: u8 = 2;

    // One slot per definition so graph indices always line up with
    // `container_definitions`.
    let names: Vec<Option<&str>> = container_definitions
        .iter()
        .map(|c| c.get("name").and_then(|n| n.as_str()))
        .collect();
    let index: HashMap<&str, usize> = names
        .iter()
        .enumerate()
        .filter_map(|(i, name)| name.map(|n| (n, i)))
        .collect();

    let mut adj: Vec<Vec<usize>> = vec![Vec::new(); names.len()];
    for (i, cd) in container_definitions.iter().enumerate() {
        let Some(deps) = cd.get("dependsOn").and_then(|v| v.as_array()) else {
            continue;
        };
        adj[i].extend(
            deps.iter()
                .filter_map(|d| d.get("containerName").and_then(|v| v.as_str()))
                .filter_map(|target| index.get(target).copied()),
        );
    }

    let label = |i: usize| names[i].unwrap_or_default().to_string();

    // Iterative DFS; each stack frame is (node, index of the next edge to follow).
    let mut state = vec![UNVISITED; names.len()];
    let mut stack: Vec<(usize, usize)> = Vec::new();
    for start in 0..names.len() {
        if state[start] != UNVISITED {
            continue;
        }
        stack.clear();
        stack.push((start, 0));
        state[start] = IN_PROGRESS;
        while let Some(frame) = stack.last_mut() {
            let (node, next_edge) = *frame;
            if let Some(&nb) = adj[node].get(next_edge) {
                frame.1 += 1;
                match state[nb] {
                    UNVISITED => {
                        state[nb] = IN_PROGRESS;
                        stack.push((nb, 0));
                    }
                    // An edge back into the current DFS path closes a cycle.
                    IN_PROGRESS => {
                        return Some((label(node), label(nb)));
                    }
                    _ => {}
                }
            } else {
                state[node] = DONE;
                stack.pop();
            }
        }
    }
    None
}
/// Container state as reported by the CLI's `inspect` command (see
/// `inspect_container_state`).
#[derive(Debug, Clone)]
struct InspectedState {
// True when the container is running, or its status is `exited`, `running` or `dead`.
started: bool,
// True when the status is `exited` or `dead`.
exited: bool,
// Reported exit code; -1 when it could not be parsed.
exit_code: i64,
// Health status string, or `None` when the container has no health check.
health: Option<String>,
}
/// Runs `{cli} inspect` on a container and parses its status, running flag,
/// exit code and health status.
///
/// Returns `None` when the command cannot be run, exits unsuccessfully, or
/// prints fewer than four `|`-separated fields. An unparsable exit code is
/// reported as -1.
async fn inspect_container_state(cli: &str, container_id: &str) -> Option<InspectedState> {
    const FORMAT: &str =
        "{{.State.Status}}|{{.State.Running}}|{{.State.ExitCode}}|{{if .State.Health}}{{.State.Health.Status}}{{else}}<none>{{end}}";

    let output = Command::new(cli)
        .arg("inspect")
        .arg("-f")
        .arg(FORMAT)
        .arg(container_id)
        .output()
        .await
        .ok()?;
    if !output.status.success() {
        return None;
    }

    let text = String::from_utf8_lossy(&output.stdout);
    let mut fields = text.trim().split('|');
    let (Some(status), Some(running), Some(code), Some(health)) =
        (fields.next(), fields.next(), fields.next(), fields.next())
    else {
        return None;
    };

    let is_running = running == "true";
    let exited = matches!(status, "exited" | "dead");
    Some(InspectedState {
        started: is_running || matches!(status, "exited" | "running" | "dead"),
        exited,
        exit_code: code.parse().unwrap_or(-1),
        // "<none>" is the placeholder the format string prints when the
        // container has no health check.
        health: match health {
            "<none>" | "" => None,
            reported => Some(reported.to_string()),
        },
    })
}
/// Checks whether an inspected dependency satisfies a `dependsOn` condition.
///
/// `Start` needs the container to have started, `Complete` needs it to have
/// exited, `Success` needs an exit with code 0, and `Healthy` needs a health
/// status of exactly `"healthy"`.
fn condition_is_met(condition: DependsOnCondition, state: &InspectedState) -> bool {
    let finished = state.exited;
    match condition {
        DependsOnCondition::Start => state.started,
        DependsOnCondition::Complete => finished,
        DependsOnCondition::Success => finished && state.exit_code == 0,
        DependsOnCondition::Healthy => matches!(state.health.as_deref(), Some("healthy")),
    }
}
/// Test-only hook that exposes the private `parse_port_mapping` to tests
/// elsewhere in the crate.
#[cfg(test)]
pub(crate) fn __test_parse_port_mapping(value: &serde_json::Value) -> Option<PortMapping> {
parse_port_mapping(value)
}
/// Parses a `healthCheck` object.
///
/// Returns `None` when `command` is missing, is not an array, has no string
/// elements, or starts with `"NONE"`. Numeric settings that are missing or
/// outside the `u32` range fall back to their defaults: interval 30, timeout
/// 5, retries 3, start period 0.
fn parse_health_check(value: &serde_json::Value) -> Option<HealthCheckSpec> {
    /// Reads a non-negative integer under `key`, or `default` when unusable.
    fn bounded_u32(value: &serde_json::Value, key: &str, default: u32) -> u32 {
        match value.get(key).and_then(|v| v.as_i64()) {
            Some(n) if (0..=u32::MAX as i64).contains(&n) => n as u32,
            _ => default,
        }
    }

    let command: Vec<String> = value
        .get("command")?
        .as_array()?
        .iter()
        .filter_map(|part| part.as_str().map(String::from))
        .collect();
    match command.first().map(String::as_str) {
        // An empty command or an explicit "NONE" means no health check.
        None | Some("NONE") => return None,
        Some(_) => {}
    }

    Some(HealthCheckSpec {
        command,
        interval_seconds: bounded_u32(value, "interval", 30),
        timeout_seconds: bounded_u32(value, "timeout", 5),
        retries: bounded_u32(value, "retries", 3),
        start_period_seconds: bounded_u32(value, "startPeriod", 0),
    })
}
/// Parses one `ulimits` entry.
///
/// Returns `None` when `name` is missing, or when `softLimit` / `hardLimit`
/// is missing, negative, or larger than `i32::MAX`. The upper bound is
/// checked explicitly because an `as i32` cast would silently wrap.
fn parse_ulimit(value: &serde_json::Value) -> Option<Ulimit> {
    /// Reads a limit under `key` that fits in a non-negative `i32`.
    fn limit(value: &serde_json::Value, key: &str) -> Option<i32> {
        value
            .get(key)
            .and_then(|v| v.as_i64())
            .filter(|n| (0..=i32::MAX as i64).contains(n))
            .map(|n| n as i32)
    }

    let name = value.get("name").and_then(|v| v.as_str())?;
    let soft_limit = limit(value, "softLimit")?;
    let hard_limit = limit(value, "hardLimit")?;
    Some(Ulimit {
        name: name.to_string(),
        soft_limit,
        hard_limit,
    })
}
fn parse_linux_parameters(value: &serde_json::Value) -> Option<LinuxParameters> {
let mut lp = LinuxParameters::default();
if let Some(arr) = value
.get("capabilities")
.and_then(|v| v.get("add"))
.and_then(|v| v.as_array())
{
lp.capabilities_add = arr
.iter()
.filter_map(|v| v.as_str().map(String::from))
.collect();
}
if let Some(arr) = value
.get("capabilities")
.and_then(|v| v.get("drop"))
.and_then(|v| v.as_array())
{
lp.capabilities_drop = arr
.iter()
.filter_map(|v| v.as_str().map(String::from))
.collect();
}
if let Some(arr) = value.get("devices").and_then(|v| v.as_array()) {
lp.devices = arr.iter().filter_map(parse_device).collect();
}
lp.init_process_enabled = value
.get("initProcessEnabled")
.and_then(|v| v.as_bool())
.unwrap_or(false);
lp.shared_memory_size = value
.get("sharedMemorySize")
.and_then(|v| v.as_i64())
.map(|n| n as i32);
if let Some(arr) = value.get("sysctl").and_then(|v| v.as_array()) {
lp.sysctls = arr.iter().filter_map(parse_sysctl).collect();
}
if let Some(arr) = value.get("tmpfs").and_then(|v| v.as_array()) {
lp.tmpfs = arr.iter().filter_map(parse_tmpfs).collect();
}
lp.privileged = value
.get("privileged")
.and_then(|v| v.as_bool())
.unwrap_or(false);
Some(lp)
}
/// Parses one `devices` entry.
///
/// Returns `None` when `hostPath` or `containerPath` is missing.
///
/// `permissions` is accepted in two forms:
/// * a string, used verbatim (e.g. `"rw"`);
/// * an array of `"read"` / `"write"` / `"mknod"`, the shape the ECS API
///   uses, which is translated to the matching `r` / `w` / `m` flags in that
///   fixed order without duplicates.
///
/// A missing value, or an array with no recognized entry, yields `"rwm"`.
fn parse_device(value: &serde_json::Value) -> Option<Device> {
    let host_path = value.get("hostPath").and_then(|v| v.as_str())?.to_string();
    let container_path = value
        .get("containerPath")
        .and_then(|v| v.as_str())?
        .to_string();
    let permissions = match value.get("permissions") {
        Some(serde_json::Value::String(flags)) => flags.clone(),
        Some(serde_json::Value::Array(items)) => {
            let mut flags = String::new();
            for (name, flag) in [("read", 'r'), ("write", 'w'), ("mknod", 'm')] {
                if items.iter().any(|item| item.as_str() == Some(name)) {
                    flags.push(flag);
                }
            }
            if flags.is_empty() {
                "rwm".to_string()
            } else {
                flags
            }
        }
        _ => "rwm".to_string(),
    };
    Some(Device {
        host_path,
        container_path,
        permissions,
    })
}
/// Parses one kernel-parameter entry; both `name` and `value` must be
/// strings, otherwise `None` is returned.
fn parse_sysctl(value: &serde_json::Value) -> Option<Sysctl> {
    let name = value.get("name").and_then(|v| v.as_str());
    let setting = value.get("value").and_then(|v| v.as_str());
    match (name, setting) {
        (Some(name), Some(setting)) => Some(Sysctl {
            name: name.to_string(),
            value: setting.to_string(),
        }),
        _ => None,
    }
}
/// Parses one `tmpfs` entry.
///
/// Returns `None` when `containerPath` is missing, or when `size` is missing,
/// not positive, or larger than `i32::MAX`. The upper bound is checked
/// explicitly because an `as i32` cast would silently wrap. Non-string
/// `mountOptions` elements are skipped.
fn parse_tmpfs(value: &serde_json::Value) -> Option<Tmpfs> {
    let container_path = value
        .get("containerPath")
        .and_then(|v| v.as_str())?
        .to_string();
    let size = value
        .get("size")
        .and_then(|v| v.as_i64())
        .filter(|n| (1..=i32::MAX as i64).contains(n))
        .map(|n| n as i32)?;
    let mount_options = value
        .get("mountOptions")
        .and_then(|v| v.as_array())
        .map(|arr| {
            arr.iter()
                .filter_map(|v| v.as_str().map(String::from))
                .collect::<Vec<_>>()
        })
        .unwrap_or_default();
    Some(Tmpfs {
        container_path,
        size,
        mount_options,
    })
}
/// Renders a health check as container-CLI `--health-*` flags.
///
/// Returns an empty list when the command has fewer than two elements or its
/// first element is neither `CMD` nor `CMD-SHELL`. The remaining elements are
/// joined with single spaces to form the `--health-cmd` value.
pub(crate) fn render_health_flags(hc: &HealthCheckSpec) -> Vec<String> {
    let (kind, args) = match hc.command.split_first() {
        Some((kind, args)) if !args.is_empty() => (kind.as_str(), args),
        _ => return Vec::new(),
    };
    if !matches!(kind, "CMD" | "CMD-SHELL") {
        return Vec::new();
    }

    let mut flags = Vec::with_capacity(6);
    flags.push("--health-cmd".to_string());
    flags.push(args.join(" "));
    flags.push(format!("--health-interval={}s", hc.interval_seconds));
    flags.push(format!("--health-timeout={}s", hc.timeout_seconds));
    flags.push(format!("--health-retries={}", hc.retries));
    flags.push(format!("--health-start-period={}s", hc.start_period_seconds));
    flags
}
/// Test-only hook that exposes the private `parse_health_check` to tests
/// elsewhere in the crate.
#[cfg(test)]
pub(crate) fn __test_parse_health_check(value: &serde_json::Value) -> Option<HealthCheckSpec> {
parse_health_check(value)
}
/// Maps a container health status string to `HEALTHY`, `UNHEALTHY` or
/// `UNKNOWN`.
///
/// Surrounding whitespace and ASCII case are ignored; anything other than
/// `healthy` / `unhealthy` maps to `"UNKNOWN"`.
pub(crate) fn docker_health_to_ecs(raw: &str) -> &'static str {
    let status = raw.trim();
    if status.eq_ignore_ascii_case("healthy") {
        "HEALTHY"
    } else if status.eq_ignore_ascii_case("unhealthy") {
        "UNHEALTHY"
    } else {
        "UNKNOWN"
    }
}
/// Parses one `portMappings` entry.
///
/// Returns `None` when `containerPort` is missing or outside `0..=65535`.
/// A `hostPort` that is missing, out of range, or 0 falls back to the
/// container port. The protocol is lower-cased and defaults to `"tcp"`.
fn parse_port_mapping(value: &serde_json::Value) -> Option<PortMapping> {
    let port_at = |key: &str| -> Option<u16> {
        value
            .get(key)
            .and_then(|v| v.as_i64())
            .filter(|n| (0..=u16::MAX as i64).contains(n))
            .map(|n| n as u16)
    };

    let container_port = port_at("containerPort")?;
    let host_port = match port_at("hostPort") {
        None | Some(0) => container_port,
        Some(port) => port,
    };
    let protocol = match value.get("protocol").and_then(|v| v.as_str()) {
        Some(given) => given.to_ascii_lowercase(),
        None => "tcp".to_string(),
    };
    Some(PortMapping {
        container_port,
        host_port,
        protocol,
    })
}
/// Returns the `fakecloud-instance=fakecloud-{pid}` label string for the
/// current process.
pub(crate) fn fakecloud_instance_label() -> String {
    let pid = std::process::id();
    format!("fakecloud-instance=fakecloud-{pid}")
}
/// Builds the argument vector for `{cli} run` that launches one container of
/// a task in detached mode.
///
/// * `plan` – container settings to translate into flags.
/// * `env` – environment to pass; loopback URLs (`http(s)://127.0.0.1:` and
///   `http(s)://localhost:`) in values are rewritten to `host_alias`.
/// * `task_id` – used in the container name, labels and awsvpc network name.
/// * `add_host_arg` – value for `--add-host`, when one is needed.
/// * `run_image` – image reference placed after all flags.
/// * `awsvpc_network_ready` – when true and the plan uses `awsvpc`, the
///   container joins the per-task network and no ports are published.
///
/// Ports are published as `host:container/protocol` and devices as
/// `host:container:permissions`, which is the order the container CLI expects.
pub(crate) fn build_run_argv(
    plan: &ContainerPlan,
    env: &[(String, String)],
    task_id: &str,
    host_alias: &str,
    add_host_arg: Option<&str>,
    run_image: &str,
    awsvpc_network_ready: bool,
) -> Vec<String> {
    let mut argv: Vec<String> = vec![
        "run".into(),
        "-d".into(),
        "--name".into(),
        format!("{}-{}", task_id, plan.container_name),
        "--label".into(),
        format!("fakecloud-ecs-task={}", task_id),
        "--label".into(),
        format!("fakecloud-ecs-container={}", plan.container_name),
        "--label".into(),
        fakecloud_instance_label(),
    ];
    if let Some(arg) = add_host_arg {
        argv.push("--add-host".into());
        argv.push(arg.to_string());
    }

    let use_awsvpc_network = plan.network_mode.as_deref() == Some("awsvpc") && awsvpc_network_ready;
    if use_awsvpc_network {
        argv.push("--network".into());
        argv.push(format!("fakecloud-ecs-{}", task_id));
    } else {
        for pm in &plan.port_mappings {
            argv.push("--publish".into());
            // `--publish` takes the host port first, then the container port.
            argv.push(format!(
                "{}:{}/{}",
                pm.host_port, pm.container_port, pm.protocol
            ));
        }
    }

    if let Some(ref hc) = plan.health_check {
        argv.extend(render_health_flags(hc));
    }

    // Loopback addresses would point at the container itself, so they are
    // redirected to the alias under which the host is reachable.
    let http_alias_prefix = format!("http://{host_alias}:");
    let https_alias_prefix = format!("https://{host_alias}:");
    for (k, v) in env {
        let transformed = v
            .replace("http://127.0.0.1:", http_alias_prefix.as_str())
            .replace("https://127.0.0.1:", https_alias_prefix.as_str())
            .replace("http://localhost:", http_alias_prefix.as_str())
            .replace("https://localhost:", https_alias_prefix.as_str());
        argv.push("-e".into());
        argv.push(format!("{}={}", k, transformed));
    }

    for vm in &plan.volume_mounts {
        let suffix = if vm.read_only { ":ro" } else { "" };
        argv.push("-v".into());
        argv.push(format!("{}:{}{}", vm.source, vm.container_path, suffix));
    }
    for ul in &plan.ulimits {
        argv.push("--ulimit".into());
        argv.push(format!("{}={}:{}", ul.name, ul.soft_limit, ul.hard_limit));
    }

    if let Some(ref lp) = plan.linux_parameters {
        for cap in &lp.capabilities_add {
            argv.push("--cap-add".into());
            argv.push(cap.clone());
        }
        for cap in &lp.capabilities_drop {
            argv.push("--cap-drop".into());
            argv.push(cap.clone());
        }
        for dev in &lp.devices {
            argv.push("--device".into());
            // The permissions are a third, colon-separated component.
            argv.push(format!(
                "{}:{}:{}",
                dev.host_path, dev.container_path, dev.permissions
            ));
        }
        if lp.init_process_enabled {
            argv.push("--init".into());
        }
        if let Some(size) = lp.shared_memory_size {
            argv.push("--shm-size".into());
            argv.push(format!("{}m", size));
        }
        for sys in &lp.sysctls {
            argv.push("--sysctl".into());
            argv.push(format!("{}={}", sys.name, sys.value));
        }
        for tmp in &lp.tmpfs {
            let mut opts = tmp.mount_options.join(",");
            if !opts.is_empty() {
                opts = format!(",{}", opts);
            }
            argv.push("--tmpfs".into());
            argv.push(format!("{}:size={}M{}", tmp.container_path, tmp.size, opts));
        }
        if lp.privileged {
            argv.push("--privileged".into());
        }
    }

    if let Some(timeout) = plan.stop_timeout {
        argv.push("--stop-timeout".into());
        argv.push(timeout.to_string());
    }
    if let Some(ref user) = plan.user {
        argv.push("--user".into());
        argv.push(user.clone());
    }
    if let Some(ref wd) = plan.working_directory {
        argv.push("--workdir".into());
        argv.push(wd.clone());
    }
    if plan.tty {
        argv.push("--tty".into());
    }
    if plan.interactive {
        argv.push("--interactive".into());
    }
    if plan.readonly_rootfs {
        argv.push("--read-only".into());
    }

    // `--entrypoint` accepts a single executable; the remaining entry-point
    // elements are passed as leading arguments after the image.
    if let Some(first) = plan.entry_point.first() {
        argv.push("--entrypoint".into());
        argv.push(first.clone());
    }
    argv.push(run_image.to_string());
    argv.extend(plan.entry_point.iter().skip(1).cloned());
    argv.extend(plan.command.iter().cloned());
    argv
}
/// Builds the network-binding JSON objects for a plan's port mappings.
///
/// Plans using the `awsvpc` network mode get no bindings. Every other plan
/// gets one object per port mapping, bound to `0.0.0.0`.
pub(crate) fn network_bindings_for(plan: &ContainerPlan) -> Vec<serde_json::Value> {
    if matches!(plan.network_mode.as_deref(), Some("awsvpc")) {
        return Vec::new();
    }
    let mut bindings = Vec::with_capacity(plan.port_mappings.len());
    for pm in &plan.port_mappings {
        bindings.push(serde_json::json!({
            "bindIP": "0.0.0.0",
            "containerPort": pm.container_port,
            "hostPort": pm.host_port,
            "protocol": pm.protocol,
        }));
    }
    bindings
}
/// Computes the load-balancer targets a task contributes, grouped by target
/// group ARN.
///
/// The task's `group` (with an optional `service:` prefix removed) selects
/// the service in the task's cluster. Each of the service's load-balancer
/// entries that has both `targetGroupArn` and `containerName` produces one
/// `(target id, port)` pair:
/// * `awsvpc` tasks use the `privateIPv4Address` of their `eni` attachment
///   and the container port; an entry is skipped when no such address exists.
/// * other tasks use `127.0.0.1` and the host port of the matching network
///   binding (`None` when no binding matches).
///
/// Returns an empty list when the task has no group or the service is unknown.
#[allow(clippy::type_complexity)]
pub(crate) fn compute_elbv2_targets(
    ecs_state: &crate::state::EcsState,
    task: &crate::state::Task,
) -> Vec<(String, Vec<(String, Option<i64>)>)> {
    let mut grouped: Vec<(String, Vec<(String, Option<i64>)>)> = Vec::new();
    let Some(group) = task.group.as_deref() else {
        return grouped;
    };
    let service_name = group.strip_prefix("service:").unwrap_or(group);
    let key = crate::state::EcsState::service_key(&task.cluster_name, service_name);
    let Some(service) = ecs_state.services.get(&key) else {
        return grouped;
    };
    let is_awsvpc = ecs_state
        .task_definitions
        .get(&task.family)
        .and_then(|revs| revs.get(&task.revision))
        .and_then(|td| td.network_mode.as_deref())
        == Some("awsvpc");

    for lb in &service.load_balancers {
        let Some(tg_arn) = lb.get("targetGroupArn").and_then(|v| v.as_str()) else {
            continue;
        };
        let Some(container_name) = lb.get("containerName").and_then(|v| v.as_str()) else {
            continue;
        };
        let container_port = lb.get("containerPort").and_then(|v| v.as_i64());

        let (target_id, port) = if is_awsvpc {
            let eni_ip = task
                .attachments
                .iter()
                .find(|a| a.attachment_type == "eni")
                .and_then(|eni| {
                    eni.details
                        .iter()
                        .find(|d| d.name == "privateIPv4Address")
                        .map(|d| d.value.clone())
                });
            (eni_ip, container_port)
        } else {
            let host_port = task
                .containers
                .iter()
                .find(|c| c.name == container_name)
                .and_then(|c| {
                    c.network_bindings
                        .iter()
                        .find(|nb| {
                            nb.get("containerPort").and_then(|v| v.as_i64()) == container_port
                        })
                        .and_then(|nb| nb.get("hostPort").and_then(|v| v.as_i64()))
                });
            (Some("127.0.0.1".to_string()), host_port)
        };

        let Some(id) = target_id else {
            continue;
        };
        match grouped.iter_mut().find(|(arn, _)| arn == tg_arn) {
            Some(entry) => entry.1.push((id, port)),
            None => grouped.push((tg_arn.to_string(), vec![(id, port)])),
        }
    }
    grouped
}
/// Owned copy of selected task fields, taken under the state read lock by
/// `snapshot_task`.
struct TaskSnapshot {
// ARN of the task.
task_arn: String,
// ARN of the cluster the task belongs to.
cluster_arn: String,
// Launch type string copied from the task.
launch_type: String,
// Task group, if any.
group: Option<String>,
// ARN of the task definition the task was started from.
task_definition_arn: String,
// JSON array with one object per container (`containerArn`, `name`,
// `image`, `lastStatus`, `exitCode`, `reason`).
containers: serde_json::Value,
}
/// Copies the fields of a task needed for a [`TaskSnapshot`] while holding
/// the state read lock.
///
/// Returns `None` when the account or the task does not exist.
fn snapshot_task(state: &SharedEcsState, account_id: &str, task_id: &str) -> Option<TaskSnapshot> {
    let accounts = state.read();
    let task = accounts.get(account_id)?.tasks.get(task_id)?;

    let mut containers = Vec::with_capacity(task.containers.len());
    for c in &task.containers {
        containers.push(serde_json::json!({
            "containerArn": c.container_arn,
            "name": c.name,
            "image": c.image,
            "lastStatus": c.last_status,
            "exitCode": c.exit_code,
            "reason": c.reason,
        }));
    }

    Some(TaskSnapshot {
        task_arn: task.task_arn.clone(),
        cluster_arn: task.cluster_arn.clone(),
        launch_type: task.launch_type.clone(),
        group: task.group.clone(),
        task_definition_arn: task.task_definition_arn.clone(),
        containers: serde_json::Value::Array(containers),
    })
}
/// Writes a docker `config.json` with credentials for the local registry
/// into a fresh temporary directory.
///
/// Entries are created for `127.0.0.1:{server_port}` and
/// `host.docker.internal:{server_port}`. Returns `None` when the directory
/// cannot be created or the file cannot be written.
fn build_local_registry_docker_config(server_port: u16) -> Option<TempDir> {
    let dir = TempDir::new().ok()?;
    let auth = base64::engine::general_purpose::STANDARD.encode("AWS:fakecloud-ecs-runtime");
    let loopback_registry = format!("127.0.0.1:{server_port}");
    let docker_host_registry = format!("host.docker.internal:{server_port}");
    let config = serde_json::json!({
        "auths": {
            loopback_registry: { "auth": auth },
            docker_host_registry: { "auth": auth },
        }
    });
    let config_path = dir.path().join("config.json");
    std::fs::write(config_path, config.to_string()).ok()?;
    Some(dir)
}
/// Returns a clone of the container definition called `name` in the given
/// task-definition family and revision, or `None` when the family, revision
/// or container is unknown.
fn find_container_definition(
    state: &crate::state::EcsState,
    family: &str,
    revision: i32,
    name: &str,
) -> Option<serde_json::Value> {
    let revisions = state.task_definitions.get(family)?;
    let task_def = revisions.get(&revision)?;
    task_def
        .container_definitions
        .iter()
        .find(|candidate| candidate.get("name").and_then(|v| v.as_str()) == Some(name))
        .cloned()
}
/// Records that image pulling started for a task: stamps `pull_started_at`
/// and pushes a `PullStarted` lifecycle event with status `PENDING`.
///
/// Does nothing when the account or task is missing.
fn mark_pull_started(state: &SharedEcsState, account_id: &str, task_id: &str) {
    let mut accounts = state.write();
    let Some(s) = accounts.get_mut(account_id) else {
        return;
    };
    let Some(task) = s.tasks.get_mut(task_id) else {
        return;
    };
    let arn = task.task_arn.clone();
    let cluster_arn = task.cluster_arn.clone();
    task.pull_started_at = Some(Utc::now());

    s.push_event(LifecycleEvent {
        at: Utc::now(),
        event_type: "PullStarted".into(),
        task_arn: Some(arn),
        cluster_arn: Some(cluster_arn),
        last_status: Some("PENDING".into()),
        detail: serde_json::json!({}),
    });
}
/// Stamps `pull_stopped_at` on a task; does nothing when the account or task
/// is missing.
fn mark_pull_stopped(state: &SharedEcsState, account_id: &str, task_id: &str) {
    let mut accounts = state.write();
    let task = accounts
        .get_mut(account_id)
        .and_then(|s| s.tasks.get_mut(task_id));
    if let Some(task) = task {
        task.pull_stopped_at = Some(Utc::now());
    }
}
/// Transitions a task to `RUNNING` after its containers have been started.
///
/// Updates the task's status, connectivity and start timestamps, copies the
/// runtime id, network bindings and (when present) image digest onto each
/// matching task container, moves one task from pending to running on the
/// cluster and on the container instance (if any), and pushes a
/// `TaskStateChange` lifecycle event.
///
/// Does nothing when the account or task is missing.
pub(crate) fn mark_running_multi(
    state: &SharedEcsState,
    account_id: &str,
    task_id: &str,
    started: &[RunningContainer],
) {
    let mut accounts = state.write();
    let s = match accounts.get_mut(account_id) {
        Some(s) => s,
        None => return,
    };
    let task = match s.tasks.get_mut(task_id) {
        Some(task) => task,
        None => return,
    };

    task.last_status = "RUNNING".into();
    task.connectivity = "CONNECTED".into();
    task.connectivity_at = Some(Utc::now());
    task.started_at = Some(Utc::now());

    for rc in started {
        let Some(c) = task.containers.iter_mut().find(|c| c.name == rc.name) else {
            continue;
        };
        c.runtime_id = Some(rc.container_id.clone());
        c.last_status = "RUNNING".into();
        c.network_bindings = rc.network_bindings.clone();
        // Keep any previously known digest when the runtime reported none.
        if let Some(digest) = &rc.image_digest {
            c.image_digest = Some(digest.clone());
        }
    }

    if let Some(cluster) = s.clusters.get_mut(&task.cluster_name) {
        cluster.running_tasks_count += 1;
        if cluster.pending_tasks_count > 0 {
            cluster.pending_tasks_count -= 1;
        }
    }
    if let Some(ref ci_arn) = task.container_instance_arn {
        let instance = s
            .container_instances
            .values_mut()
            .find(|ci| ci.container_instance_arn == *ci_arn);
        if let Some(ci) = instance {
            ci.running_tasks_count += 1;
            if ci.pending_tasks_count > 0 {
                ci.pending_tasks_count -= 1;
            }
        }
    }

    let arn = task.task_arn.clone();
    let cluster_arn = task.cluster_arn.clone();
    s.push_event(LifecycleEvent {
        at: Utc::now(),
        event_type: "TaskStateChange".into(),
        task_arn: Some(arn),
        cluster_arn: Some(cluster_arn),
        last_status: Some("RUNNING".into()),
        detail: serde_json::json!({}),
    });
}
/// Transitions a task to `STOPPED` and records how it ended.
///
/// Sets the task's last and desired status, stop timestamps (`stopping_at`
/// is kept when already set), stop code, stopped reason (defaulting to
/// `Exit code {primary_exit_code}`) and captured logs. Every container is
/// marked `STOPPED`; a container without an exit code receives the one
/// reported in `final_containers`, or `primary_exit_code` when none was
/// reported. The running-task counters of the cluster and the container
/// instance are decremented (never below zero) and a `TaskStateChange`
/// lifecycle event is pushed.
///
/// Does nothing when the account or task is missing.
#[allow(clippy::too_many_arguments)]
fn finalize_stopped_multi(
    state: &SharedEcsState,
    account_id: &str,
    task_id: &str,
    final_containers: &[RunningContainer],
    primary_exit_code: i64,
    captured: &str,
    stop_code: &str,
    stopped_reason: Option<String>,
) {
    let mut accounts = state.write();
    let s = match accounts.get_mut(account_id) {
        Some(s) => s,
        None => return,
    };
    let task = match s.tasks.get_mut(task_id) {
        Some(task) => task,
        None => return,
    };

    task.last_status = "STOPPED".into();
    task.desired_status = "STOPPED".into();
    if task.stopping_at.is_none() {
        task.stopping_at = Some(Utc::now());
    }
    task.stopped_at = Some(Utc::now());
    task.stop_code = Some(stop_code.into());
    task.stopped_reason =
        stopped_reason.or_else(|| Some(format!("Exit code {}", primary_exit_code)));
    task.captured_logs = captured.to_string();

    for c in task.containers.iter_mut() {
        c.last_status = "STOPPED".into();
        if c.exit_code.is_some() {
            continue;
        }
        let reported = final_containers
            .iter()
            .find(|r| r.name == c.name)
            .and_then(|r| r.exit_code);
        c.exit_code = Some(reported.unwrap_or(primary_exit_code));
    }

    if let Some(cluster) = s.clusters.get_mut(&task.cluster_name) {
        if cluster.running_tasks_count > 0 {
            cluster.running_tasks_count -= 1;
        }
    }
    if let Some(ref ci_arn) = task.container_instance_arn {
        let instance = s
            .container_instances
            .values_mut()
            .find(|ci| ci.container_instance_arn == *ci_arn);
        if let Some(ci) = instance {
            if ci.running_tasks_count > 0 {
                ci.running_tasks_count -= 1;
            }
        }
    }

    let arn = task.task_arn.clone();
    let cluster_arn = task.cluster_arn.clone();
    s.push_event(LifecycleEvent {
        at: Utc::now(),
        event_type: "TaskStateChange".into(),
        task_arn: Some(arn),
        cluster_arn: Some(cluster_arn),
        last_status: Some("STOPPED".into()),
        detail: serde_json::json!({
            "exitCode": primary_exit_code,
            "stopCode": stop_code,
        }),
    });
}
/// Marks a task as `STOPPED` because it failed to start, and records why.
///
/// Under the state write lock this:
/// - sets last and desired status to `STOPPED` with stop code
///   `TaskFailedToStart`, stores `reason` as the stopped reason, and writes
///   `"[task failed to start]: <reason>"` into the captured logs;
/// - stamps `stopped_at`, and stamps `stopping_at` when it was not already
///   set, so the failure path leaves the same pair of timestamps populated as
///   `finalize_stopped_multi` does;
/// - marks every container `STOPPED` and copies `reason` onto it;
/// - decrements (never below zero) the running counter when the task had
///   reached `RUNNING`, otherwise the pending counter, on both the task's
///   cluster and its container instance when those exist;
/// - pushes a `TaskFailedToStart` lifecycle event carrying the reason.
///
/// Returns silently when the account or the task cannot be found.
fn finalize_failure(state: &SharedEcsState, account_id: &str, task_id: &str, reason: &str) {
    let mut accounts = state.write();
    let Some(s) = accounts.get_mut(account_id) else {
        return;
    };
    let (arn, cluster_arn) = {
        let Some(task) = s.tasks.get_mut(task_id) else {
            return;
        };
        // Capture the prior status before overwriting it: it decides which
        // counter (running vs. pending) this task was contributing to.
        let was_running = task.last_status == "RUNNING";
        let now = Utc::now();
        task.last_status = "STOPPED".into();
        task.desired_status = "STOPPED".into();
        // Preserve an earlier stopping timestamp if one was already recorded.
        task.stopping_at = task.stopping_at.or(Some(now));
        task.stopped_at = Some(now);
        task.stop_code = Some("TaskFailedToStart".into());
        task.stopped_reason = Some(reason.to_string());
        task.captured_logs = format!("[task failed to start]: {reason}");
        for c in task.containers.iter_mut() {
            c.last_status = "STOPPED".into();
            c.reason = Some(reason.to_string());
        }
        if let Some(cluster) = s.clusters.get_mut(&task.cluster_name) {
            if was_running {
                if cluster.running_tasks_count > 0 {
                    cluster.running_tasks_count -= 1;
                }
            } else if cluster.pending_tasks_count > 0 {
                cluster.pending_tasks_count -= 1;
            }
        }
        if let Some(ref ci_arn) = task.container_instance_arn {
            if let Some(ci) = s
                .container_instances
                .values_mut()
                .find(|ci| ci.container_instance_arn == *ci_arn)
            {
                if was_running {
                    if ci.running_tasks_count > 0 {
                        ci.running_tasks_count -= 1;
                    }
                } else if ci.pending_tasks_count > 0 {
                    ci.pending_tasks_count -= 1;
                }
            }
        }
        (task.task_arn.clone(), task.cluster_arn.clone())
    };
    s.push_event(LifecycleEvent {
        at: Utc::now(),
        event_type: "TaskFailedToStart".into(),
        task_arn: Some(arn),
        cluster_arn: Some(cluster_arn),
        last_status: Some("STOPPED".into()),
        detail: serde_json::json!({ "reason": reason }),
    });
}
/// Suspends the calling async task for `duration` using Tokio's timer.
pub async fn sleep(duration: Duration) {
    let timer = tokio::time::sleep(duration);
    timer.await;
}
#[cfg(test)]
mod tests {
use super::*;
use crate::state::{EcsState, Task};
use fakecloud_aws::arn::Arn;
use fakecloud_core::multi_account::MultiAccountState;
use parking_lot::RwLock;
use std::sync::Arc;
/// `cli_available` must return `false` for a binary name that is not installed.
#[test]
fn cli_available_for_known_missing_binary_is_false() {
    let found =
        fakecloud_core::container_net::cli_available("definitely-not-a-real-cli-binary-xyz");
    assert!(!found);
}
/// An AWS ECR image URI is rewritten to point at the local server port.
#[test]
fn aws_ecr_uris_translate_for_local_pull() {
    let translated = fakecloud_core::ecr_uri::translate_to_local(
        "123456789012.dkr.ecr.us-east-1.amazonaws.com/app:latest",
        4566,
    );
    assert_eq!(translated.as_deref(), Some("127.0.0.1:4566/app:latest"));
}
/// Test fixture: builds a `Task` named `task_id` in the `default` cluster of
/// account `000000000000` / `us-east-1`.
///
/// The task starts out `PENDING` with desired status `RUNNING`, launch type
/// `FARGATE`, connectivity `CONNECTING`, no containers, no container instance,
/// and every optional timestamp unset. Tests mutate the returned value to set
/// up the scenario they need.
fn make_task(task_id: &str) -> Task {
Task {
// The task ARN embeds the cluster name and the task id.
task_arn: Arn::new(
"ecs",
"us-east-1",
"000000000000",
&format!("task/default/{task_id}"),
)
.to_string(),
task_id: task_id.into(),
cluster_arn: "arn:aws:ecs:us-east-1:000000000000:cluster/default".into(),
cluster_name: "default".into(),
task_definition_arn: "arn:aws:ecs:us-east-1:000000000000:task-definition/app:1".into(),
family: "app".into(),
revision: 1,
container_instance_arn: None,
capacity_provider_name: None,
last_status: "PENDING".into(),
desired_status: "RUNNING".into(),
launch_type: "FARGATE".into(),
platform_version: None,
cpu: None,
memory: None,
containers: Vec::new(),
overrides: serde_json::json!({}),
started_by: None,
group: None,
connectivity: "CONNECTING".into(),
stop_code: None,
stopped_reason: None,
created_at: Utc::now(),
// Lifecycle timestamps stay unset until the runtime helpers stamp them.
started_at: None,
stopping_at: None,
stopped_at: None,
pull_started_at: None,
pull_stopped_at: None,
connectivity_at: None,
started_by_ref_id: None,
execution_role_arn: None,
task_role_arn: None,
tags: Vec::new(),
awslogs: None,
captured_logs: String::new(),
protection: None,
enable_execute_command: false,
attachments: Vec::new(),
volume_configurations: Vec::new(),
task_set_arn: None,
}
}
/// `finalize_failure` stops the task with `TaskFailedToStart` and writes the
/// prefixed failure reason into `captured_logs`.
#[test]
fn finalize_failure_writes_reason_into_captured_logs() {
    let mut registry: MultiAccountState<EcsState> =
        MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
    registry
        .get_or_create("000000000000")
        .tasks
        .insert("t1".into(), make_task("t1"));
    let state: SharedEcsState = Arc::new(RwLock::new(registry));

    let reason = "failed to resolve secret DB_PASSWORD";
    finalize_failure(&state, "000000000000", "t1", reason);

    let guard = state.read();
    let task = guard
        .get("000000000000")
        .unwrap()
        .tasks
        .get("t1")
        .unwrap();
    assert_eq!(task.last_status, "STOPPED");
    assert_eq!(task.stop_code.as_deref(), Some("TaskFailedToStart"));

    let logs = &task.captured_logs;
    assert!(
        logs.contains(reason),
        "captured_logs missing reason: {:?}",
        logs
    );
    assert!(
        logs.starts_with("[task failed to start]:"),
        "captured_logs missing prefix: {:?}",
        logs
    );
}
/// `task_desired_stopped` is false for a task that should keep running and
/// true both for a task whose desired status is `STOPPED` and for a task id
/// that is not in state at all.
#[test]
fn task_desired_stopped_detects_stop_during_launch() {
    let mut registry: MultiAccountState<EcsState> =
        MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
    let account = registry.get_or_create("000000000000");
    account.tasks.insert("running".into(), make_task("running"));
    let stop_requested = Task {
        desired_status: "STOPPED".into(),
        ..make_task("stopping")
    };
    account.tasks.insert("stopping".into(), stop_requested);
    let state: SharedEcsState = Arc::new(RwLock::new(registry));

    let keeps_running = task_desired_stopped(&state, "000000000000", "running");
    assert!(
        !keeps_running,
        "a RUNNING task must not be treated as stopped",
    );

    let stop_seen = task_desired_stopped(&state, "000000000000", "stopping");
    assert!(
        stop_seen,
        "a task whose desired_status is STOPPED must be treated as stopped",
    );

    let missing_seen = task_desired_stopped(&state, "000000000000", "deleted-mid-launch");
    assert!(
        missing_seen,
        "a task removed from state mid-launch must be treated as stopped",
    );
}
/// Test fixture: builds a `RUNNING` `alpine` container called `name` with no
/// exit code and a runtime id of `dockerid-<name>`.
fn make_container(name: &str, essential: bool) -> crate::state::Container {
    let container_arn =
        format!("arn:aws:ecs:us-east-1:000000000000:container/default/abc/{name}");
    let runtime_id = Some(format!("dockerid-{name}"));
    crate::state::Container {
        container_arn,
        name: name.into(),
        image: "alpine".into(),
        task_arn: "arn:aws:ecs:us-east-1:000000000000:task/default/abc".into(),
        last_status: "RUNNING".into(),
        exit_code: None,
        reason: None,
        runtime_id,
        essential,
        cpu: None,
        memory: None,
        memory_reservation: None,
        network_bindings: Vec::new(),
        network_interfaces: Vec::new(),
        health_status: None,
        managed_agents: None,
        image_digest: None,
    }
}
/// An exited essential container stops the task even while a non-essential
/// sidecar is still running.
#[test]
fn task_should_stop_when_essential_exits() {
    let exited_app = RunningContainer {
        name: "app".into(),
        container_id: "id-app".into(),
        essential: true,
        exit_code: Some(0),
        network_bindings: Vec::new(),
        image_digest: None,
    };
    let live_sidecar = RunningContainer {
        name: "sidecar".into(),
        container_id: "id-sc".into(),
        essential: false,
        exit_code: None,
        network_bindings: Vec::new(),
        image_digest: None,
    };
    let containers = vec![exited_app, live_sidecar];
    assert!(task_should_stop(&containers));
}
/// A non-essential container exiting does not stop the task while the
/// essential container is still running.
#[test]
fn task_keeps_running_when_only_non_essential_exits() {
    let live_app = RunningContainer {
        name: "app".into(),
        container_id: "id-app".into(),
        essential: true,
        exit_code: None,
        network_bindings: Vec::new(),
        image_digest: None,
    };
    let exited_sidecar = RunningContainer {
        name: "sidecar".into(),
        container_id: "id-sc".into(),
        essential: false,
        exit_code: Some(0),
        network_bindings: Vec::new(),
        image_digest: None,
    };
    let containers = vec![live_app, exited_sidecar];
    assert!(!task_should_stop(&containers));
}
/// With no essential containers, the task stops once every container has exited.
#[test]
fn task_stops_when_all_non_essentials_exit() {
    let first = RunningContainer {
        name: "a".into(),
        container_id: "id-a".into(),
        essential: false,
        exit_code: Some(0),
        network_bindings: Vec::new(),
        image_digest: None,
    };
    let second = RunningContainer {
        name: "b".into(),
        container_id: "id-b".into(),
        essential: false,
        exit_code: Some(1),
        network_bindings: Vec::new(),
        image_digest: None,
    };
    let containers = vec![first, second];
    assert!(task_should_stop(&containers));
}
/// `finalize_stopped_multi` gives each container the exit code reported for
/// the same-named entry in the final container list and marks all of them
/// `STOPPED`.
#[test]
fn finalize_stopped_multi_assigns_per_container_exit_codes() {
    let mut registry: MultiAccountState<EcsState> =
        MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
    let seeded = Task {
        containers: vec![
            make_container("app", true),
            make_container("sidecar", false),
        ],
        ..make_task("t1")
    };
    registry
        .get_or_create("000000000000")
        .tasks
        .insert("t1".into(), seeded);
    let state: SharedEcsState = Arc::new(RwLock::new(registry));

    let app_result = RunningContainer {
        name: "app".into(),
        container_id: "id-app".into(),
        essential: true,
        exit_code: Some(0),
        network_bindings: Vec::new(),
        image_digest: None,
    };
    let sidecar_result = RunningContainer {
        name: "sidecar".into(),
        container_id: "id-sc".into(),
        essential: false,
        exit_code: Some(137),
        network_bindings: Vec::new(),
        image_digest: None,
    };
    let final_containers = vec![app_result, sidecar_result];

    finalize_stopped_multi(
        &state,
        "000000000000",
        "t1",
        &final_containers,
        0,
        "captured",
        "EssentialContainerExited",
        None,
    );

    let guard = state.read();
    let task = guard
        .get("000000000000")
        .unwrap()
        .tasks
        .get("t1")
        .unwrap();
    assert_eq!(task.last_status, "STOPPED");
    assert_eq!(task.stop_code.as_deref(), Some("EssentialContainerExited"));

    let app = task
        .containers
        .iter()
        .find(|c| c.name == "app")
        .unwrap();
    let sidecar = task
        .containers
        .iter()
        .find(|c| c.name == "sidecar")
        .unwrap();
    assert_eq!(app.exit_code, Some(0));
    assert_eq!(sidecar.exit_code, Some(137));
    assert_eq!(app.last_status, "STOPPED");
    assert_eq!(sidecar.last_status, "STOPPED");
}
/// Test fixture: builds an essential `alpine` `ContainerPlan` called `name`
/// that depends, with the `Start` condition, on each container named in `deps`.
/// Every other field is empty, `None`, or `false`.
fn plan(name: &str, deps: &[&str]) -> ContainerPlan {
    let mut depends_on = Vec::with_capacity(deps.len());
    for dep in deps {
        depends_on.push(DependsOn {
            container_name: (*dep).to_string(),
            condition: DependsOnCondition::Start,
        });
    }
    ContainerPlan {
        container_name: name.into(),
        image: "alpine".into(),
        env: Vec::new(),
        entry_point: Vec::new(),
        command: Vec::new(),
        secrets_refs: Vec::new(),
        essential: true,
        has_task_role: false,
        port_mappings: Vec::new(),
        network_mode: None,
        depends_on,
        health_check: None,
        volume_mounts: Vec::new(),
        ulimits: Vec::new(),
        linux_parameters: None,
        stop_timeout: None,
        user: None,
        working_directory: None,
        tty: false,
        interactive: false,
        readonly_rootfs: false,
    }
}
/// A container is ordered after the container it depends on.
#[test]
fn topo_sort_orders_by_depends_on() {
    let sorted = topo_sort_plans(vec![plan("sidecar", &["app"]), plan("app", &[])]);
    let first = &sorted[0];
    let second = &sorted[1];
    assert_eq!(first.container_name, "app");
    assert_eq!(second.container_name, "sidecar");
}
/// Without any dependencies the declaration order is kept.
#[test]
fn topo_sort_preserves_declaration_order_when_no_deps() {
    let input = vec![plan("first", &[]), plan("second", &[]), plan("third", &[])];
    let sorted = topo_sort_plans(input);
    let got: Vec<&str> = sorted
        .iter()
        .map(|entry| entry.container_name.as_str())
        .collect();
    assert_eq!(got, ["first", "second", "third"]);
}
/// A three-link dependency chain declared in reverse comes out dependency-first.
#[test]
fn topo_sort_handles_chain() {
    let input = vec![plan("c", &["b"]), plan("b", &["a"]), plan("a", &[])];
    let sorted = topo_sort_plans(input);
    let got: Vec<&str> = sorted
        .iter()
        .map(|entry| entry.container_name.as_str())
        .collect();
    assert_eq!(got, ["a", "b", "c"]);
}
/// A dependency on a container that is not in the plan list does not drop the plan.
#[test]
fn topo_sort_ignores_unknown_dependency() {
    let sorted = topo_sort_plans(vec![plan("only", &["does-not-exist"])]);
    assert_eq!(sorted.len(), 1);
    let survivor = &sorted[0];
    assert_eq!(survivor.container_name, "only");
}
/// A two-node dependency cycle still yields both plans.
#[test]
fn topo_sort_recovers_from_cycle() {
    let cyclic = vec![plan("a", &["b"]), plan("b", &["a"])];
    let sorted = topo_sort_plans(cyclic);
    assert_eq!(sorted.len(), 2);
}
/// With only `command` supplied, the parsed health check uses interval 30,
/// timeout 5, retries 3 and start period 0.
#[test]
fn parse_health_check_fills_aws_defaults() {
    let raw = serde_json::json!({
        "command": ["CMD-SHELL", "curl -f http://localhost/ || exit 1"],
    });
    let parsed = __test_parse_health_check(&raw).expect("parsed");
    assert_eq!(parsed.command[0], "CMD-SHELL");
    assert_eq!(parsed.interval_seconds, 30);
    assert_eq!(parsed.timeout_seconds, 5);
    assert_eq!(parsed.retries, 3);
    assert_eq!(parsed.start_period_seconds, 0);
}
/// Explicit `interval`, `timeout`, `retries` and `startPeriod` values are
/// carried through to the parsed health check.
#[test]
fn parse_health_check_overrides_explicit_values() {
    let raw = serde_json::json!({
        "command": ["CMD", "/probe"],
        "interval": 7,
        "timeout": 2,
        "retries": 9,
        "startPeriod": 12,
    });
    let parsed = __test_parse_health_check(&raw).expect("parsed");
    assert_eq!(parsed.interval_seconds, 7);
    assert_eq!(parsed.timeout_seconds, 2);
    assert_eq!(parsed.retries, 9);
    assert_eq!(parsed.start_period_seconds, 12);
}
/// A command of `["NONE"]` parses to no health check.
#[test]
fn parse_health_check_returns_none_for_none_sentinel() {
    let raw = serde_json::json!({ "command": ["NONE"] });
    let parsed = __test_parse_health_check(&raw);
    assert!(parsed.is_none());
}
/// A health check object without `command` parses to no health check.
#[test]
fn parse_health_check_returns_none_for_missing_command() {
    let raw = serde_json::json!({ "interval": 30 });
    let parsed = __test_parse_health_check(&raw);
    assert!(parsed.is_none());
}
/// A `CMD-SHELL` health check renders `--health-cmd <shell string>` first and
/// includes the interval, timeout, retries and start-period flags.
#[test]
fn render_health_flags_emits_full_set_for_cmd_shell() {
    let spec = HealthCheckSpec {
        command: vec!["CMD-SHELL".into(), "curl -f http://localhost/".into()],
        interval_seconds: 15,
        timeout_seconds: 3,
        retries: 4,
        start_period_seconds: 10,
    };
    let rendered = render_health_flags(&spec);
    assert_eq!(rendered[0], "--health-cmd");
    assert_eq!(rendered[1], "curl -f http://localhost/");
    for wanted in [
        "--health-interval=15s",
        "--health-timeout=3s",
        "--health-retries=4",
        "--health-start-period=10s",
    ] {
        assert!(rendered.contains(&wanted.to_string()));
    }
}
/// A `CMD` health check has its argv (after the `CMD` marker) joined with
/// single spaces into the `--health-cmd` value.
#[test]
fn render_health_flags_joins_cmd_argv_with_spaces() {
    let command = vec![
        "CMD".into(),
        "/bin/probe".into(),
        "--port".into(),
        "8080".into(),
    ];
    let spec = HealthCheckSpec {
        command,
        interval_seconds: 30,
        timeout_seconds: 5,
        retries: 3,
        start_period_seconds: 0,
    };
    let rendered = render_health_flags(&spec);
    assert_eq!(rendered[1], "/bin/probe --port 8080");
}
/// A plan with a health check produces every `--health-*` flag in the run argv.
#[test]
fn build_run_argv_emits_health_flags_when_present() {
    // `plan("app", &[])` supplies the same baseline fields; only the health
    // check differs.
    let spec = ContainerPlan {
        health_check: Some(HealthCheckSpec {
            command: vec!["CMD-SHELL".into(), "true".into()],
            interval_seconds: 5,
            timeout_seconds: 2,
            retries: 1,
            start_period_seconds: 1,
        }),
        ..plan("app", &[])
    };
    let argv = build_run_argv(
        &spec,
        &[],
        "task-1",
        "host.docker.internal",
        None,
        "alpine",
        true,
    );
    let joined = argv.join(" ");
    for wanted in [
        "--health-cmd true",
        "--health-interval=5s",
        "--health-timeout=2s",
        "--health-retries=1",
        "--health-start-period=1s",
    ] {
        assert!(joined.contains(wanted), "argv: {joined}");
    }
}
/// A plan without a health check produces no argument starting with `--health`.
#[test]
fn build_run_argv_emits_no_health_flags_when_absent() {
    // The baseline fixture already has `health_check: None` and matches the
    // plan this test needs field for field.
    let spec = plan("app", &[]);
    let argv = build_run_argv(
        &spec,
        &[],
        "task-1",
        "host.docker.internal",
        None,
        "alpine",
        true,
    );
    let has_health_flag = argv.iter().any(|arg| arg.starts_with("--health"));
    assert!(!has_health_flag);
}
/// Docker health strings map to `HEALTHY` / `UNHEALTHY`, with everything else
/// exercised here mapping to `UNKNOWN`.
#[test]
fn docker_health_to_ecs_maps_known_states() {
    let table = [
        ("healthy", "HEALTHY"),
        ("HEALTHY", "HEALTHY"),
        ("unhealthy", "UNHEALTHY"),
        ("starting", "UNKNOWN"),
        ("none", "UNKNOWN"),
        ("", "UNKNOWN"),
    ];
    for (docker_state, ecs_state) in table {
        assert_eq!(docker_health_to_ecs(docker_state), ecs_state);
    }
}
/// A volume with `host.sourcePath` resolves to that path as the mount source.
#[test]
fn resolve_host_bind_volume_uses_source_path() {
    let volume_def = serde_json::json!({
        "name": "data",
        "host": { "sourcePath": "/var/lib/myapp" }
    });
    let mut volumes = std::collections::HashMap::new();
    volumes.insert("data".to_string(), &volume_def);

    let mount_point = serde_json::json!({
        "sourceVolume": "data",
        "containerPath": "/app/data",
        "readOnly": false
    });
    let mount = resolve_mount_point(&mount_point, &volumes).expect("resolved");
    assert_eq!(mount.source, "/var/lib/myapp");
    assert_eq!(mount.container_path, "/app/data");
    assert!(!mount.read_only);
}
/// A read-only volume mount is rendered as `-v <source>:<container path>:ro`.
#[test]
fn read_only_mount_renders_ro_suffix() {
    // `plan("app", &[])` supplies the same baseline fields; only the volume
    // mounts differ.
    let spec = ContainerPlan {
        volume_mounts: vec![VolumeMount {
            source: "/host/path".into(),
            container_path: "/in/container".into(),
            read_only: true,
        }],
        ..plan("app", &[])
    };
    let argv = build_run_argv(
        &spec,
        &[],
        "task-1",
        "host.docker.internal",
        None,
        "alpine",
        true,
    );
    // Look at adjacent argument pairs so the value after `-v` can be checked.
    let flag_and_value = argv
        .windows(2)
        .find(|window| window[0] == "-v")
        .expect("expected -v flag");
    assert_eq!(flag_and_value[1], "/host/path:/in/container:ro");
}
/// An EFS volume resolves to a `fakecloud-efs-<fs id>-<root dir>` source name.
#[test]
fn resolve_efs_volume_uses_stub_dir() {
    let volume_def = serde_json::json!({
        "name": "efs-vol",
        "efsVolumeConfiguration": {
            "fileSystemId": "fs-12345678",
            "rootDirectory": "/exports/app"
        }
    });
    let mut volumes = std::collections::HashMap::new();
    volumes.insert("efs-vol".to_string(), &volume_def);

    let mount_point = serde_json::json!({
        "sourceVolume": "efs-vol",
        "containerPath": "/mnt/efs"
    });
    let mount = resolve_mount_point(&mount_point, &volumes).expect("resolved");
    assert_eq!(mount.source, "fakecloud-efs-fs-12345678-exports-app");
    assert_eq!(mount.container_path, "/mnt/efs");
}
/// A root directory of `/` or the empty string adds no suffix to the shared
/// volume name.
#[test]
fn efs_without_root_directory_uses_filesystem_root() {
    for root in ["/", ""] {
        assert_eq!(
            shared_volume_name("efs", "fs-abc", root),
            "fakecloud-efs-fs-abc"
        );
    }
}
/// A Docker volume configuration resolves to the volume's own name as source.
#[test]
fn resolve_docker_named_volume_uses_volume_name() {
    let volume_def = serde_json::json!({
        "name": "named-vol",
        "dockerVolumeConfiguration": {
            "scope": "task",
            "driver": "local"
        }
    });
    let mut volumes = std::collections::HashMap::new();
    volumes.insert("named-vol".to_string(), &volume_def);

    let mount_point = serde_json::json!({
        "sourceVolume": "named-vol",
        "containerPath": "/data"
    });
    let mount = resolve_mount_point(&mount_point, &volumes).expect("resolved");
    assert_eq!(mount.source, "named-vol");
    assert_eq!(mount.container_path, "/data");
}
/// An FSx Windows File Server volume resolves to a
/// `fakecloud-fsx-<fs id>-<root dir>` source name.
#[test]
fn resolve_fsx_volume_uses_stub_dir() {
    let volume_def = serde_json::json!({
        "name": "fsx-vol",
        "fsxWindowsFileServerVolumeConfiguration": {
            "fileSystemId": "fs-xyz",
            "rootDirectory": "share"
        }
    });
    let mut volumes = std::collections::HashMap::new();
    volumes.insert("fsx-vol".to_string(), &volume_def);

    let mount_point = serde_json::json!({
        "sourceVolume": "fsx-vol",
        "containerPath": "C:\\data"
    });
    let mount = resolve_mount_point(&mount_point, &volumes).expect("resolved");
    assert_eq!(mount.source, "fakecloud-fsx-fs-xyz-share");
}
/// A mount point naming a volume that is not declared resolves to `None`.
#[test]
fn unknown_source_volume_returns_none() {
    let no_volumes = std::collections::HashMap::new();
    let mount_point = serde_json::json!({
        "sourceVolume": "missing",
        "containerPath": "/x"
    });
    let resolved = resolve_mount_point(&mount_point, &no_volumes);
    assert!(resolved.is_none());
}
/// Two container definitions that depend on each other are reported as a cycle.
#[test]
fn find_depends_on_cycle_detects_two_node_cycle() {
    let a_needs_b = serde_json::json!({
        "name": "a",
        "image": "alpine",
        "dependsOn": [{"containerName": "b", "condition": "START"}],
    });
    let b_needs_a = serde_json::json!({
        "name": "b",
        "image": "alpine",
        "dependsOn": [{"containerName": "a", "condition": "START"}],
    });
    let definitions = vec![a_needs_b, b_needs_a];
    let found = find_depends_on_cycle(&definitions);
    assert!(found.is_some(), "expected cycle to be detected");
}
/// A linear `a -> b -> c` dependency chain is not a cycle.
#[test]
fn find_depends_on_cycle_accepts_chain() {
    let a_needs_b = serde_json::json!({
        "name": "a",
        "image": "alpine",
        "dependsOn": [{"containerName": "b", "condition": "START"}],
    });
    let b_needs_c = serde_json::json!({
        "name": "b",
        "image": "alpine",
        "dependsOn": [{"containerName": "c", "condition": "START"}],
    });
    let c_standalone = serde_json::json!({
        "name": "c",
        "image": "alpine",
    });
    let definitions = vec![a_needs_b, b_needs_c, c_standalone];
    let found = find_depends_on_cycle(&definitions);
    assert!(found.is_none());
}
/// A dependency on a container that is not defined is not reported as a cycle.
#[test]
fn find_depends_on_cycle_ignores_unknown_target() {
    let definitions = vec![serde_json::json!({
        "name": "only",
        "image": "alpine",
        "dependsOn": [{"containerName": "ghost", "condition": "START"}],
    })];
    let found = find_depends_on_cycle(&definitions);
    assert!(found.is_none());
}
/// Checks each `DependsOnCondition` against four inspected container states:
/// still running, exited with 0, exited with 1, and running with a `healthy`
/// health status.
#[test]
fn condition_is_met_matches_aws_semantics() {
    let still_running = InspectedState {
        started: true,
        exited: false,
        exit_code: 0,
        health: None,
    };
    let finished_ok = InspectedState {
        started: true,
        exited: true,
        exit_code: 0,
        health: None,
    };
    let finished_err = InspectedState {
        started: true,
        exited: true,
        exit_code: 1,
        health: None,
    };
    let reports_healthy = InspectedState {
        started: true,
        exited: false,
        exit_code: 0,
        health: Some("healthy".into()),
    };

    // Start: satisfied once the container has started, exited or not.
    assert!(condition_is_met(DependsOnCondition::Start, &still_running));
    assert!(condition_is_met(DependsOnCondition::Start, &finished_ok));

    // Complete: satisfied by any exit, regardless of exit code.
    assert!(!condition_is_met(DependsOnCondition::Complete, &still_running));
    assert!(condition_is_met(DependsOnCondition::Complete, &finished_ok));
    assert!(condition_is_met(DependsOnCondition::Complete, &finished_err));

    // Success: satisfied only by an exit with code 0.
    assert!(!condition_is_met(DependsOnCondition::Success, &still_running));
    assert!(condition_is_met(DependsOnCondition::Success, &finished_ok));
    assert!(!condition_is_met(DependsOnCondition::Success, &finished_err));

    // Healthy: satisfied only when a healthy status is reported.
    assert!(!condition_is_met(DependsOnCondition::Healthy, &still_running));
    assert!(condition_is_met(DependsOnCondition::Healthy, &reports_healthy));
}
/// The four upper-case condition names parse to their variants; a lower-case
/// or unrecognised name parses to `None`.
#[test]
fn depends_on_condition_parse_round_trips() {
    let table = [
        ("START", Some(DependsOnCondition::Start)),
        ("COMPLETE", Some(DependsOnCondition::Complete)),
        ("SUCCESS", Some(DependsOnCondition::Success)),
        ("HEALTHY", Some(DependsOnCondition::Healthy)),
        ("start", None),
        ("ANY", None),
    ];
    for (raw, expected) in table {
        assert_eq!(DependsOnCondition::parse(raw), expected);
    }
}
/// A ulimit on the plan is rendered as `--ulimit <name>=<soft>:<hard>`.
#[test]
fn build_run_argv_emits_ulimits() {
    // `plan("app", &[])` supplies the same baseline fields; only the ulimits
    // differ.
    let spec = ContainerPlan {
        ulimits: vec![Ulimit {
            name: "nofile".into(),
            soft_limit: 1024,
            hard_limit: 2048,
        }],
        ..plan("app", &[])
    };
    let argv = build_run_argv(&spec, &[], "t", "host.docker.internal", None, "img", true);
    for wanted in ["--ulimit", "nofile=1024:2048"] {
        assert!(argv.contains(&wanted.to_string()));
    }
}
/// A plan with every Linux parameter and process option set produces the
/// matching run flags: capabilities, device, init, shm size, sysctl, tmpfs,
/// privileged, stop timeout, user, workdir, tty, interactive and read-only.
#[test]
fn build_run_argv_emits_linux_parameters() {
    let plan = ContainerPlan {
        container_name: "app".into(),
        image: "alpine".into(),
        env: Vec::new(),
        entry_point: Vec::new(),
        command: Vec::new(),
        secrets_refs: Vec::new(),
        essential: true,
        has_task_role: false,
        port_mappings: Vec::new(),
        network_mode: None,
        depends_on: Vec::new(),
        health_check: None,
        volume_mounts: Vec::new(),
        ulimits: Vec::new(),
        linux_parameters: Some(LinuxParameters {
            capabilities_add: vec!["NET_ADMIN".into()],
            capabilities_drop: vec!["ALL".into()],
            devices: vec![Device {
                host_path: "/dev/zero".into(),
                container_path: "/dev/zero".into(),
                permissions: "rwm".into(),
            }],
            init_process_enabled: true,
            shared_memory_size: Some(256),
            sysctls: vec![Sysctl {
                name: "net.ipv4.ip_forward".into(),
                value: "1".into(),
            }],
            tmpfs: vec![Tmpfs {
                container_path: "/tmp".into(),
                size: 128,
                mount_options: vec!["noexec".into()],
            }],
            privileged: true,
        }),
        stop_timeout: Some(30),
        user: Some("1000:1000".into()),
        working_directory: Some("/app".into()),
        tty: true,
        interactive: true,
        readonly_rootfs: true,
    };
    let argv = build_run_argv(&plan, &[], "t", "host.docker.internal", None, "img", true);
    assert!(argv.contains(&"--cap-add".to_string()));
    assert!(argv.contains(&"NET_ADMIN".to_string()));
    assert!(argv.contains(&"--cap-drop".to_string()));
    assert!(argv.contains(&"ALL".to_string()));
    assert!(argv.contains(&"--device".to_string()));
    // Docker's `--device` value is `<host path>:<container path>:<permissions>`;
    // the permissions must be separated from the container path by a colon,
    // otherwise they are read as part of the container path.
    // NOTE(review): if this assertion fails, `build_run_argv` is dropping the
    // separator — fix the renderer, not this expectation.
    assert!(argv.contains(&"/dev/zero:/dev/zero:rwm".to_string()));
    assert!(argv.contains(&"--init".to_string()));
    assert!(argv.contains(&"--shm-size".to_string()));
    assert!(argv.contains(&"256m".to_string()));
    assert!(argv.contains(&"--sysctl".to_string()));
    assert!(argv.contains(&"net.ipv4.ip_forward=1".to_string()));
    assert!(argv.contains(&"--tmpfs".to_string()));
    assert!(argv.contains(&"--privileged".to_string()));
    assert!(argv.contains(&"--stop-timeout".to_string()));
    assert!(argv.contains(&"30".to_string()));
    assert!(argv.contains(&"--user".to_string()));
    assert!(argv.contains(&"1000:1000".to_string()));
    assert!(argv.contains(&"--workdir".to_string()));
    assert!(argv.contains(&"/app".to_string()));
    assert!(argv.contains(&"--tty".to_string()));
    assert!(argv.contains(&"--interactive".to_string()));
    assert!(argv.contains(&"--read-only".to_string()));
}
/// Fields absent from the input come back as `false` / empty while the one
/// supplied flag is honoured.
#[test]
fn parse_linux_parameters_fills_defaults() {
    let input = serde_json::json!({"initProcessEnabled": true});
    let parsed = parse_linux_parameters(&input).expect("parses");
    assert!(parsed.init_process_enabled);
    assert!(!parsed.privileged);
    assert!(parsed.capabilities_add.is_empty());
}
/// A device without explicit permissions gets `rwm`.
#[test]
fn parse_device_uses_default_permissions() {
    let input = serde_json::json!({"hostPath": "/dev/null", "containerPath": "/dev/null"});
    let device = parse_device(&input).expect("parses");
    assert_eq!(device.permissions, "rwm");
}
/// A task without a `group` yields no ELBv2 targets.
#[test]
fn compute_elbv2_targets_empty_when_no_group() {
    let mut registry: MultiAccountState<EcsState> =
        MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
    let account = registry.get_or_create("000000000000");
    let ungrouped = Task {
        group: None,
        ..make_task("t1")
    };
    account.tasks.insert("t1".into(), ungrouped);

    // Work on a detached copy of the account state, as the original test does.
    let snapshot = account.clone();
    let task_ref = snapshot.tasks.get("t1").unwrap();
    let targets = compute_elbv2_targets(&snapshot, task_ref);
    assert!(targets.is_empty());
}
/// For a service task whose task definition uses `bridge` network mode, the
/// computed ELBv2 target is `127.0.0.1` with the host port taken from the
/// container's network binding for the load balancer's container port.
#[test]
fn compute_elbv2_targets_bridge_mode_uses_localhost_and_host_port() {
let mut accounts: MultiAccountState<EcsState> =
MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
let acct = accounts.get_or_create("000000000000");
// Task definition `app:1`, registered with bridge networking.
let td = crate::state::TaskDefinition {
family: "app".into(),
revision: 1,
task_definition_arn: "arn:aws:ecs:us-east-1:000000000000:task-definition/app:1".into(),
container_definitions: Vec::new(),
network_mode: Some("bridge".into()),
status: "ACTIVE".into(),
task_role_arn: None,
execution_role_arn: None,
requires_compatibilities: Vec::new(),
compatibilities: Vec::new(),
cpu: None,
memory: None,
pid_mode: None,
ipc_mode: None,
volumes: Vec::new(),
placement_constraints: Vec::new(),
proxy_configuration: None,
inference_accelerators: Vec::new(),
ephemeral_storage: None,
runtime_platform: None,
requires_attributes: Vec::new(),
registered_at: Utc::now(),
registered_by: None,
deregistered_at: None,
tags: Vec::new(),
enable_fault_injection: None,
};
// Task definitions are stored per family, keyed by revision.
acct.task_definitions.insert("app".into(), {
let mut m = std::collections::BTreeMap::new();
m.insert(1, td);
m
});
// Service `svc` routes container `app` port 80 to one target group.
let service = crate::state::Service {
service_name: "svc".into(),
service_arn: "arn:aws:ecs:us-east-1:000000000000:service/default/svc".into(),
cluster_name: "default".into(),
cluster_arn: "arn:aws:ecs:us-east-1:000000000000:cluster/default".into(),
task_definition_arn: "arn:aws:ecs:us-east-1:000000000000:task-definition/app:1".into(),
family: "app".into(),
revision: 1,
desired_count: 1,
running_count: 0,
pending_count: 0,
launch_type: "FARGATE".into(),
status: "ACTIVE".into(),
scheduling_strategy: "REPLICA".into(),
deployment_controller: "ECS".into(),
minimum_healthy_percent: Some(0),
maximum_percent: Some(200),
circuit_breaker: None,
deployments: Vec::new(),
load_balancers: vec![serde_json::json!({
"targetGroupArn": "arn:aws:elasticloadbalancing:us-east-1:000000000000:targetgroup/tg/abc",
"containerName": "app",
"containerPort": 80,
})],
service_registries: Vec::new(),
placement_constraints: Vec::new(),
placement_strategy: Vec::new(),
network_configuration: None,
volume_configurations: vec![],
tags: Vec::new(),
created_at: Utc::now(),
created_by: None,
role_arn: None,
platform_version: None,
health_check_grace_period_seconds: None,
enable_execute_command: false,
enable_ecs_managed_tags: false,
propagate_tags: None,
capacity_provider_strategy: Vec::new(),
availability_zone_rebalancing: None,
};
acct.services.insert(
crate::state::EcsState::service_key("default", "svc"),
service,
);
// The task belongs to the service via its `service:<name>` group.
let mut task = make_task("t1");
task.group = Some("service:svc".into());
// Container `app` publishes container port 80 on host port 32768.
task.containers = vec![crate::state::Container {
container_arn: "arn:aws:ecs:us-east-1:000000000000:container/default/abc/app".into(),
name: "app".into(),
image: "alpine".into(),
task_arn: task.task_arn.clone(),
last_status: "RUNNING".into(),
exit_code: None,
reason: None,
runtime_id: Some("dockerid-app".into()),
essential: true,
cpu: None,
memory: None,
memory_reservation: None,
network_bindings: vec![serde_json::json!({
"bindIP": "0.0.0.0",
"containerPort": 80,
"hostPort": 32768,
"protocol": "tcp",
})],
network_interfaces: Vec::new(),
health_status: None,
managed_agents: None,
image_digest: None,
}];
acct.tasks.insert("t1".into(), task);
// Compute targets against a detached copy of the account state.
let state = acct.clone();
let targets = compute_elbv2_targets(&state, state.tasks.get("t1").unwrap());
assert_eq!(targets.len(), 1);
let (arn, tg_targets) = &targets[0];
assert_eq!(
arn,
"arn:aws:elasticloadbalancing:us-east-1:000000000000:targetgroup/tg/abc"
);
assert_eq!(tg_targets.len(), 1);
// Each target is an (address, optional port) pair.
assert_eq!(tg_targets[0].0, "127.0.0.1");
assert_eq!(tg_targets[0].1, Some(32768));
}
/// Checks that `compute_elbv2_targets` resolves an `awsvpc` task to its ENI address.
///
/// Fixture: task definition `app:1` with `network_mode = "awsvpc"`, a service `svc`
/// whose single load-balancer entry names container `app` on port 80, and a task
/// `t1` in group `service:svc` carrying an ENI attachment whose
/// `privateIPv4Address` detail is `172.18.0.2`.
///
/// Expected: exactly one target group (the one named by the service) holding
/// exactly one target whose first element is `172.18.0.2` and whose second
/// element is `Some(80)`.
#[test]
fn compute_elbv2_targets_awsvpc_uses_eni_ip() {
    // Literals shared between the fixture and the assertions.
    const TARGET_GROUP_ARN: &str =
        "arn:aws:elasticloadbalancing:us-east-1:000000000000:targetgroup/tg/abc";
    const TASK_DEF_ARN: &str = "arn:aws:ecs:us-east-1:000000000000:task-definition/app:1";

    let mut all_accounts: MultiAccountState<EcsState> =
        MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
    let account = all_accounts.get_or_create("000000000000");

    // --- Task definition: family `app`, revision 1, awsvpc networking. ---
    let task_def = crate::state::TaskDefinition {
        family: "app".into(),
        revision: 1,
        task_definition_arn: TASK_DEF_ARN.into(),
        container_definitions: vec![],
        network_mode: Some("awsvpc".into()),
        status: "ACTIVE".into(),
        task_role_arn: None,
        execution_role_arn: None,
        requires_compatibilities: vec![],
        compatibilities: vec![],
        cpu: None,
        memory: None,
        pid_mode: None,
        ipc_mode: None,
        volumes: vec![],
        placement_constraints: vec![],
        proxy_configuration: None,
        inference_accelerators: vec![],
        ephemeral_storage: None,
        runtime_platform: None,
        requires_attributes: vec![],
        registered_at: Utc::now(),
        registered_by: None,
        deregistered_at: None,
        tags: vec![],
        enable_fault_injection: None,
    };
    // Revisions are stored per family, keyed by revision number.
    let revisions = std::collections::BTreeMap::from([(1, task_def)]);
    account.task_definitions.insert("app".into(), revisions);

    // --- Service: binds container `app` port 80 to the target group. ---
    let tg_binding = serde_json::json!({
        "targetGroupArn": TARGET_GROUP_ARN,
        "containerName": "app",
        "containerPort": 80,
    });
    let svc = crate::state::Service {
        service_name: "svc".into(),
        service_arn: "arn:aws:ecs:us-east-1:000000000000:service/default/svc".into(),
        cluster_name: "default".into(),
        cluster_arn: "arn:aws:ecs:us-east-1:000000000000:cluster/default".into(),
        task_definition_arn: TASK_DEF_ARN.into(),
        family: "app".into(),
        revision: 1,
        desired_count: 1,
        running_count: 0,
        pending_count: 0,
        launch_type: "FARGATE".into(),
        status: "ACTIVE".into(),
        scheduling_strategy: "REPLICA".into(),
        deployment_controller: "ECS".into(),
        minimum_healthy_percent: Some(0),
        maximum_percent: Some(200),
        circuit_breaker: None,
        deployments: vec![],
        load_balancers: vec![tg_binding],
        service_registries: vec![],
        placement_constraints: vec![],
        placement_strategy: vec![],
        network_configuration: None,
        volume_configurations: Vec::new(),
        tags: vec![],
        created_at: Utc::now(),
        created_by: None,
        role_arn: None,
        platform_version: None,
        health_check_grace_period_seconds: None,
        enable_execute_command: false,
        enable_ecs_managed_tags: false,
        propagate_tags: None,
        capacity_provider_strategy: vec![],
        availability_zone_rebalancing: None,
    };
    let svc_key = crate::state::EcsState::service_key("default", "svc");
    account.services.insert(svc_key, svc);

    // --- Task: placed in the service's group and given an attached ENI. ---
    let eni = crate::state::TaskAttachment {
        id: "eni-123".into(),
        attachment_type: "eni".into(),
        status: "ATTACHED".into(),
        details: vec![
            crate::state::AttachmentDetail {
                name: "privateIPv4Address".into(),
                value: "172.18.0.2".into(),
            },
            crate::state::AttachmentDetail {
                name: "macAddress".into(),
                value: "02:42:ac:12:00:02".into(),
            },
        ],
    };
    let mut eni_task = make_task("t1");
    eni_task.group = Some("service:svc".into());
    eni_task.attachments = vec![eni];
    account.tasks.insert("t1".into(), eni_task);

    // Work on an owned snapshot so the state and the task can be borrowed together.
    let snapshot = account.clone();
    let stored_task = snapshot.tasks.get("t1").unwrap();
    let resolved = compute_elbv2_targets(&snapshot, stored_task);

    assert_eq!(resolved.len(), 1);
    let (tg_arn, members) = &resolved[0];
    assert_eq!(tg_arn, TARGET_GROUP_ARN);
    assert_eq!(members.len(), 1);

    // The target carries the ENI's private IP and the service's container port.
    let only_member = &members[0];
    assert_eq!(only_member.0, "172.18.0.2");
    assert_eq!(only_member.1, Some(80));
}
/// Builds the smallest `ContainerPlan` fixture used by the argv tests: a container
/// named `app` running image `alpine`, marked essential, with every list empty,
/// every optional setting `None`, and every other flag `false`.
fn minimal_plan() -> ContainerPlan {
    ContainerPlan {
        // Identity.
        container_name: "app".into(),
        image: "alpine".into(),

        // Boolean switches: only `essential` is on.
        essential: true,
        has_task_role: false,
        tty: false,
        interactive: false,
        readonly_rootfs: false,

        // List-valued settings, all empty.
        env: vec![],
        entry_point: vec![],
        command: vec![],
        secrets_refs: vec![],
        port_mappings: vec![],
        depends_on: vec![],
        volume_mounts: vec![],
        ulimits: vec![],

        // Optional settings, all unset.
        network_mode: None,
        health_check: None,
        linux_parameters: None,
        stop_timeout: None,
        user: None,
        working_directory: None,
    }
}
/// Checks that the argv built by `build_run_argv` for the minimal plan contains
/// `--label` immediately followed by the value of `fakecloud_instance_label()`.
#[test]
fn build_run_argv_emits_fakecloud_instance_label() {
    let container_plan = minimal_plan();
    let argv = build_run_argv(
        &container_plan,
        &[],
        "task-1",
        "host.docker.internal",
        None,
        "alpine",
        true,
    );
    let expected = fakecloud_instance_label();

    // Pair every argument with its successor and look for the flag/value pair.
    let label_present = argv
        .iter()
        .zip(argv.iter().skip(1))
        .any(|(flag, value)| *flag == "--label" && *value == expected);

    assert!(
        label_present,
        "argv must contain `--label {expected}`: {argv:?}",
    );
}
/// Checks the shape of `fakecloud_instance_label()`: it must read
/// `fakecloud-instance=fakecloud-<pid>`, where `<pid>` parses as a `u32` equal to
/// the current process id.
#[test]
fn fakecloud_instance_label_matches_reaper_format() {
    let label = fakecloud_instance_label();

    // Split on the first `=` only; the key is checked before the value is inspected.
    let (label_key, label_value) = label.split_once('=').expect("label is key=value");
    assert_eq!(label_key, "fakecloud-instance");

    // Everything after the `fakecloud-` prefix must be the owning pid.
    let owning_pid: Option<u32> = label_value
        .strip_prefix("fakecloud-")
        .expect("value starts with fakecloud-")
        .parse()
        .ok();
    assert_eq!(
        owning_pid,
        Some(std::process::id()),
        "reaper must be able to parse the owning pid out of {label}",
    );
}
}