use std::sync::Arc;
use async_trait::async_trait;
use chrono::Utc;
use http::StatusCode;
use serde_json::{json, Map, Value};
use tokio::sync::Mutex as AsyncMutex;
use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
use fakecloud_persistence::SnapshotStore;
use crate::state::{
Attribute, AttributeRef, AwsLogsConfig, CapacityProvider, CircuitBreakerConfig, Cluster,
Container, ContainerInstance, Deployment, EcsSnapshot, EcsState, Service, SharedEcsState,
TagEntry, Task, TaskDefinition, TaskSet, ECS_SNAPSHOT_SCHEMA_VERSION,
};
// Names of every ECS action this service advertises through
// `AwsService::supported_actions`. The dispatch `match` in `handle` is keyed on
// the same strings, so a name added here needs a matching arm there (and an
// entry in `is_mutating` if the action changes state).
const SUPPORTED_ACTIONS: &[&str] = &[
"CreateCluster",
"DescribeClusters",
"DeleteCluster",
"ListClusters",
"UpdateCluster",
"UpdateClusterSettings",
"PutClusterCapacityProviders",
"RegisterTaskDefinition",
"DescribeTaskDefinition",
"DeregisterTaskDefinition",
"DeleteTaskDefinitions",
"ListTaskDefinitions",
"ListTaskDefinitionFamilies",
"TagResource",
"UntagResource",
"ListTagsForResource",
"PutAccountSetting",
"PutAccountSettingDefault",
"DeleteAccountSetting",
"ListAccountSettings",
"RunTask",
"StartTask",
"StopTask",
"DescribeTasks",
"ListTasks",
"CreateService",
"UpdateService",
"DeleteService",
"DescribeServices",
"ListServices",
"ListServicesByNamespace",
"RegisterContainerInstance",
"DeregisterContainerInstance",
"DescribeContainerInstances",
"ListContainerInstances",
"UpdateContainerAgent",
"UpdateContainerInstancesState",
"PutAttributes",
"DeleteAttributes",
"ListAttributes",
"CreateCapacityProvider",
"DeleteCapacityProvider",
"DescribeCapacityProviders",
"UpdateCapacityProvider",
"GetTaskProtection",
"UpdateTaskProtection",
"CreateTaskSet",
"UpdateTaskSet",
"DeleteTaskSet",
"DescribeTaskSets",
"UpdateServicePrimaryTaskSet",
"ExecuteCommand",
"SubmitContainerStateChange",
"SubmitTaskStateChange",
"SubmitAttachmentStateChanges",
"DiscoverPollEndpoint",
"StopServiceDeployment",
"ListServiceDeployments",
"DescribeServiceDeployments",
"DescribeServiceRevisions",
];
/// Reports whether `action` is one of the ECS actions that modify service state.
///
/// `handle` uses the answer to decide whether a successful request should be
/// followed by a snapshot write. The comparison is exact and case-sensitive;
/// any name not listed below is treated as read-only.
fn is_mutating(action: &str) -> bool {
    const MUTATING_ACTIONS: &[&str] = &[
        "CreateCluster",
        "DeleteCluster",
        "UpdateCluster",
        "UpdateClusterSettings",
        "PutClusterCapacityProviders",
        "RegisterTaskDefinition",
        "DeregisterTaskDefinition",
        "DeleteTaskDefinitions",
        "TagResource",
        "UntagResource",
        "PutAccountSetting",
        "PutAccountSettingDefault",
        "DeleteAccountSetting",
        "RunTask",
        "StartTask",
        "StopTask",
        "CreateService",
        "UpdateService",
        "DeleteService",
        "RegisterContainerInstance",
        "DeregisterContainerInstance",
        "UpdateContainerAgent",
        "UpdateContainerInstancesState",
        "PutAttributes",
        "DeleteAttributes",
        "CreateCapacityProvider",
        "DeleteCapacityProvider",
        "UpdateCapacityProvider",
        "UpdateTaskProtection",
        "CreateTaskSet",
        "UpdateTaskSet",
        "DeleteTaskSet",
        "UpdateServicePrimaryTaskSet",
        "SubmitContainerStateChange",
        "SubmitTaskStateChange",
        "SubmitAttachmentStateChanges",
        "StopServiceDeployment",
    ];
    MUTATING_ACTIONS.contains(&action)
}
/// ECS service handler: owns the shared ECS state plus the optional
/// collaborators configured through the `with_*` builder methods.
pub struct EcsService {
// Shared, lock-protected ECS state that the action handlers read and write.
state: SharedEcsState,
// Where snapshots are written after mutating actions; `None` disables snapshotting.
snapshot_store: Option<Arc<dyn SnapshotStore>>,
// Async mutex held for the duration of `save_snapshot`, so only one snapshot
// is captured and written at a time.
snapshot_lock: Arc<AsyncMutex<()>>,
// Optional runtime handle set by `with_runtime`.
// NOTE(review): its consumers are outside this part of the file — confirm usage there.
runtime: Option<Arc<crate::runtime::EcsRuntime>>,
// Optional validator consulted by `check_pass_role`; when `None` every role is accepted.
role_trust_validator: Option<Arc<dyn fakecloud_core::auth::RoleTrustValidator>>,
}
impl EcsService {
    /// Builds a service around `state` with no snapshot store, runtime or
    /// role-trust validator configured.
    pub fn new(state: SharedEcsState) -> Self {
        let snapshot_lock = Arc::new(AsyncMutex::new(()));
        Self {
            state,
            snapshot_store: None,
            snapshot_lock,
            runtime: None,
            role_trust_validator: None,
        }
    }

    /// Builder: enables snapshot writes to `store` after mutating actions.
    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
        self.snapshot_store = Some(store);
        self
    }

    /// Builder: attaches the ECS runtime handle.
    pub fn with_runtime(mut self, runtime: Arc<crate::runtime::EcsRuntime>) -> Self {
        self.runtime = Some(runtime);
        self
    }

    /// Builder: installs the validator used by `check_pass_role`.
    pub fn with_role_trust_validator(
        mut self,
        validator: Arc<dyn fakecloud_core::auth::RoleTrustValidator>,
    ) -> Self {
        self.role_trust_validator = Some(validator);
        self
    }

    /// Asks the configured validator whether `role_arn` may be used by
    /// `ecs-tasks.amazonaws.com` for `account_id`.
    ///
    /// Returns `Ok(())` when no validator is configured. A validator failure
    /// is reported as a 400 `InvalidParameterException` carrying the
    /// validator's message.
    fn check_pass_role(&self, account_id: &str, role_arn: &str) -> Result<(), AwsServiceError> {
        let validator = match self.role_trust_validator.as_ref() {
            Some(validator) => validator,
            None => return Ok(()),
        };
        match validator.validate(account_id, role_arn, "ecs-tasks.amazonaws.com") {
            Ok(_) => Ok(()),
            Err(err) => Err(AwsServiceError::aws_error(
                StatusCode::BAD_REQUEST,
                "InvalidParameterException",
                err.to_string(),
            )),
        }
    }

    /// Borrows the shared state handle.
    pub fn state_handle(&self) -> &SharedEcsState {
        &self.state
    }

    /// Serializes the current state and hands it to the snapshot store.
    ///
    /// Does nothing when no store is configured. The snapshot lock is held
    /// for the whole capture-and-write sequence; serialization and the store
    /// write run on a blocking task. Failures are logged, never returned.
    async fn save_snapshot(&self) {
        let store = match &self.snapshot_store {
            Some(store) => Arc::clone(store),
            None => return,
        };
        let _serialized = self.snapshot_lock.lock().await;
        // The read guard is a temporary dropped at the end of this statement,
        // so it is not held across the await below.
        let accounts = self.state.read().clone();
        let snapshot = EcsSnapshot {
            schema_version: ECS_SNAPSHOT_SCHEMA_VERSION,
            accounts: Some(accounts),
        };
        let outcome = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
            let bytes = serde_json::to_vec(&snapshot)
                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
            store.save(&bytes)
        })
        .await;
        match outcome {
            Err(err) => tracing::error!(%err, "ecs snapshot task panicked"),
            Ok(Err(err)) => tracing::error!(%err, "failed to write ecs snapshot"),
            Ok(Ok(())) => {}
        }
    }
}
// `AwsService` implementation: routes each request to the per-action handler
// and persists a snapshot after successful mutating actions.
#[async_trait]
impl AwsService for EcsService {
/// Service identifier; always `"ecs"`.
fn service_name(&self) -> &str {
"ecs"
}
/// Dispatches `request` on its `action` name.
///
/// Action names without an arm yield
/// `AwsServiceError::action_not_implemented("ecs", ..)`. When the action is
/// mutating (per `is_mutating`) and the handler returned `Ok` with a success
/// status, `save_snapshot` is awaited before the result is returned.
async fn handle(&self, request: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
// Decided up front from the action name alone.
let mutates = is_mutating(request.action.as_str());
let result = match request.action.as_str() {
"CreateCluster" => self.create_cluster(&request),
"DescribeClusters" => self.describe_clusters(&request),
"DeleteCluster" => self.delete_cluster(&request),
"ListClusters" => self.list_clusters(&request),
"UpdateCluster" => self.update_cluster(&request),
"UpdateClusterSettings" => self.update_cluster_settings(&request),
"PutClusterCapacityProviders" => self.put_cluster_capacity_providers(&request),
"RegisterTaskDefinition" => self.register_task_definition(&request),
"DescribeTaskDefinition" => self.describe_task_definition(&request),
"DeregisterTaskDefinition" => self.deregister_task_definition(&request),
"DeleteTaskDefinitions" => self.delete_task_definitions(&request),
"ListTaskDefinitions" => self.list_task_definitions(&request),
"ListTaskDefinitionFamilies" => self.list_task_definition_families(&request),
"TagResource" => self.tag_resource(&request),
"UntagResource" => self.untag_resource(&request),
"ListTagsForResource" => self.list_tags_for_resource(&request),
"PutAccountSetting" => self.put_account_setting(&request),
"PutAccountSettingDefault" => self.put_account_setting_default(&request),
"DeleteAccountSetting" => self.delete_account_setting(&request),
"ListAccountSettings" => self.list_account_settings(&request),
"RunTask" => self.run_task(&request),
"StartTask" => self.start_task(&request),
// The handlers below that are `.await`ed are async; the rest are synchronous.
"StopTask" => self.stop_task(&request).await,
"DescribeTasks" => self.describe_tasks(&request),
"ListTasks" => self.list_tasks(&request),
"CreateService" => self.create_service(&request),
"UpdateService" => self.update_service(&request),
"DeleteService" => self.delete_service(&request).await,
"DescribeServices" => self.describe_services(&request),
"ListServices" => self.list_services(&request),
"ListServicesByNamespace" => self.list_services_by_namespace(&request),
"RegisterContainerInstance" => self.register_container_instance(&request),
"DeregisterContainerInstance" => self.deregister_container_instance(&request),
"DescribeContainerInstances" => self.describe_container_instances(&request),
"ListContainerInstances" => self.list_container_instances(&request),
"UpdateContainerAgent" => self.update_container_agent(&request),
"UpdateContainerInstancesState" => self.update_container_instances_state(&request),
"PutAttributes" => self.put_attributes(&request),
"DeleteAttributes" => self.delete_attributes(&request),
"ListAttributes" => self.list_attributes(&request),
"CreateCapacityProvider" => self.create_capacity_provider(&request),
"DeleteCapacityProvider" => self.delete_capacity_provider(&request),
"DescribeCapacityProviders" => self.describe_capacity_providers(&request),
"UpdateCapacityProvider" => self.update_capacity_provider(&request),
"GetTaskProtection" => self.get_task_protection(&request),
"UpdateTaskProtection" => self.update_task_protection(&request),
"CreateTaskSet" => self.create_task_set(&request),
"UpdateTaskSet" => self.update_task_set(&request),
"DeleteTaskSet" => self.delete_task_set(&request),
"DescribeTaskSets" => self.describe_task_sets(&request),
"UpdateServicePrimaryTaskSet" => self.update_service_primary_task_set(&request),
"ExecuteCommand" => self.execute_command(&request).await,
"SubmitContainerStateChange" => self.submit_container_state_change(&request),
"SubmitTaskStateChange" => self.submit_task_state_change(&request),
"SubmitAttachmentStateChanges" => self.submit_attachment_state_changes(&request),
"DiscoverPollEndpoint" => self.discover_poll_endpoint(&request),
"StopServiceDeployment" => self.stop_service_deployment(&request),
"ListServiceDeployments" => self.list_service_deployments(&request),
"DescribeServiceDeployments" => self.describe_service_deployments(&request),
"DescribeServiceRevisions" => self.describe_service_revisions(&request),
_ => Err(AwsServiceError::action_not_implemented(
"ecs",
&request.action,
)),
};
// Failed or non-success responses never trigger a snapshot write.
if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
self.save_snapshot().await;
}
result
}
/// Returns the static `SUPPORTED_ACTIONS` table.
fn supported_actions(&self) -> &[&str] {
SUPPORTED_ACTIONS
}
}
fn req_str<'a>(body: &'a Value, field: &str) -> Result<&'a str, AwsServiceError> {
body.get(field)
.and_then(|v| v.as_str())
.ok_or_else(|| client_exception(format!("Missing required field: {field}")))
}
/// Reads `field` from `body` as a string slice, or `None` when it is absent
/// or not a JSON string.
fn opt_str<'a>(body: &'a Value, field: &str) -> Option<&'a str> {
    let value = body.get(field)?;
    value.as_str()
}
/// Builds a 400 `ClientException` error carrying `message`.
fn client_exception(message: impl Into<String>) -> AwsServiceError {
    let status = StatusCode::BAD_REQUEST;
    let code = "ClientException";
    AwsServiceError::aws_error(status, code, message)
}
/// Builds a 400 `InvalidParameterException` error carrying `message`.
fn invalid_parameter(message: impl Into<String>) -> AwsServiceError {
    let status = StatusCode::BAD_REQUEST;
    let code = "InvalidParameterException";
    AwsServiceError::aws_error(status, code, message)
}
/// Builds the 400 `ClusterNotFoundException` returned when cluster `name`
/// cannot be found in the target account.
fn cluster_not_found(name: &str) -> AwsServiceError {
    let message = format!("The referenced cluster was inactive: {name}");
    AwsServiceError::aws_error(
        StatusCode::BAD_REQUEST,
        "ClusterNotFoundException",
        message,
    )
}
/// Builds the 400 `ClusterContainsServicesException` used when a cluster
/// still reports active services.
fn cluster_contains_services() -> AwsServiceError {
    let code = "ClusterContainsServicesException";
    let message = "The specified cluster still contains active services";
    AwsServiceError::aws_error(StatusCode::BAD_REQUEST, code, message)
}
/// Builds the 400 `ClusterContainsTasksException` used when a cluster still
/// reports running or pending tasks.
fn cluster_contains_tasks() -> AwsServiceError {
    let code = "ClusterContainsTasksException";
    let message = "The specified cluster still contains active tasks";
    AwsServiceError::aws_error(StatusCode::BAD_REQUEST, code, message)
}
/// Builds the 400 `ClientException` returned when the task definition
/// reference `family_rev` does not resolve to a stored definition.
fn task_definition_not_found(family_rev: &str) -> AwsServiceError {
    let message = format!("Unable to describe task definition: {family_rev}");
    AwsServiceError::aws_error(StatusCode::BAD_REQUEST, "ClientException", message)
}
fn parse_tags(body: &Value) -> Vec<TagEntry> {
body.get("tags")
.and_then(|v| v.as_array())
.map(|arr| {
arr.iter()
.filter_map(|t| {
let k = t.get("key").and_then(|v| v.as_str())?;
let v = t.get("value").and_then(|v| v.as_str()).unwrap_or("");
Some(TagEntry {
key: k.to_string(),
value: v.to_string(),
})
})
.collect()
})
.unwrap_or_default()
}
/// Renders tags as a JSON array of `{"key": ..., "value": ...}` objects,
/// preserving the input order.
fn tags_json(tags: &[TagEntry]) -> Value {
    let mut rendered = Vec::with_capacity(tags.len());
    for tag in tags {
        rendered.push(json!({"key": tag.key, "value": tag.value}));
    }
    Value::Array(rendered)
}
/// Merges `incoming` tags into `current`: a tag whose key already exists
/// overwrites the first matching entry's value, any other tag is appended.
fn merge_tags(current: &mut Vec<TagEntry>, incoming: Vec<TagEntry>) {
    for tag in incoming {
        match current.iter().position(|existing| existing.key == tag.key) {
            Some(index) => current[index].value = tag.value,
            None => current.push(tag),
        }
    }
}
fn cluster_to_json(cluster: &Cluster) -> Value {
json!({
"clusterArn": cluster.cluster_arn,
"clusterName": cluster.cluster_name,
"status": cluster.status,
"registeredContainerInstancesCount": cluster.registered_container_instances_count,
"runningTasksCount": cluster.running_tasks_count,
"pendingTasksCount": cluster.pending_tasks_count,
"activeServicesCount": cluster.active_services_count,
"statistics": cluster.statistics,
"tags": tags_json(&cluster.tags),
"settings": cluster.settings,
"configuration": cluster.configuration,
"capacityProviders": cluster.capacity_providers,
"defaultCapacityProviderStrategy": cluster.default_capacity_provider_strategy,
"attachments": cluster.attachments,
"attachmentsStatus": cluster.attachments_status,
"serviceConnectDefaults": cluster.service_connect_defaults,
})
}
fn task_definition_to_json(td: &TaskDefinition) -> Value {
let mut map = Map::new();
map.insert("taskDefinitionArn".into(), json!(td.task_definition_arn));
map.insert("family".into(), json!(td.family));
map.insert("revision".into(), json!(td.revision));
map.insert("status".into(), json!(td.status));
map.insert(
"containerDefinitions".into(),
Value::Array(td.container_definitions.clone()),
);
map.insert("compatibilities".into(), json!(td.compatibilities));
map.insert(
"requiresCompatibilities".into(),
json!(td.requires_compatibilities),
);
map.insert("volumes".into(), Value::Array(td.volumes.clone()));
map.insert(
"placementConstraints".into(),
Value::Array(td.placement_constraints.clone()),
);
map.insert(
"requiresAttributes".into(),
Value::Array(td.requires_attributes.clone()),
);
map.insert(
"inferenceAccelerators".into(),
Value::Array(td.inference_accelerators.clone()),
);
if let Some(ref x) = td.network_mode {
map.insert("networkMode".into(), json!(x));
}
if let Some(ref x) = td.cpu {
map.insert("cpu".into(), json!(x));
}
if let Some(ref x) = td.memory {
map.insert("memory".into(), json!(x));
}
if let Some(ref x) = td.task_role_arn {
map.insert("taskRoleArn".into(), json!(x));
}
if let Some(ref x) = td.execution_role_arn {
map.insert("executionRoleArn".into(), json!(x));
}
if let Some(ref x) = td.pid_mode {
map.insert("pidMode".into(), json!(x));
}
if let Some(ref x) = td.ipc_mode {
map.insert("ipcMode".into(), json!(x));
}
if let Some(ref x) = td.proxy_configuration {
map.insert("proxyConfiguration".into(), x.clone());
}
if let Some(ref x) = td.ephemeral_storage {
map.insert("ephemeralStorage".into(), x.clone());
}
if let Some(ref x) = td.runtime_platform {
map.insert("runtimePlatform".into(), x.clone());
}
if let Some(ref x) = td.registered_by {
map.insert("registeredBy".into(), json!(x));
}
map.insert("registeredAt".into(), json!(td.registered_at.timestamp()));
if let Some(ts) = td.deregistered_at {
map.insert("deregisteredAt".into(), json!(ts.timestamp()));
}
if let Some(enabled) = td.enable_fault_injection {
map.insert("enableFaultInjection".into(), json!(enabled));
}
Value::Object(map)
}
/// Resolves a service reference tail to a key of `state.services`.
///
/// A tail containing `/` must match a key exactly. A bare tail matches the
/// first key (in map iteration order) that ends with `/<tail>`.
fn resolve_service_key(state: &EcsState, tail: &str) -> Option<String> {
    if !tail.contains('/') {
        let suffix = format!("/{tail}");
        for key in state.services.keys() {
            if key.ends_with(&suffix) {
                return Some(key.clone());
            }
        }
        return None;
    }
    if state.services.contains_key(tail) {
        Some(tail.to_string())
    } else {
        None
    }
}
/// Resolves a container-instance reference tail to a key of
/// `state.container_instances`.
///
/// A tail containing `/` must match a key exactly. A bare tail matches the
/// first key (in map iteration order) that ends with `/<tail>`.
fn resolve_container_instance_key(state: &EcsState, tail: &str) -> Option<String> {
    if !tail.contains('/') {
        let suffix = format!("/{tail}");
        for key in state.container_instances.keys() {
            if key.ends_with(&suffix) {
                return Some(key.clone());
            }
        }
        return None;
    }
    if state.container_instances.contains_key(tail) {
        Some(tail.to_string())
    } else {
        None
    }
}
/// Splits an `arn:aws:ecs:<region>:<account>:<type>/<tail>` ARN into
/// `(account, resource_type, tail)`.
///
/// The region segment is parsed but discarded. Everything after the first
/// `/` of the resource segment is returned as `tail`, so it may itself
/// contain `/` or `:`. Any structural problem is reported as an
/// `InvalidParameterException`.
fn decode_ecs_arn(arn: &str) -> Result<(String, String, String), AwsServiceError> {
    let Some(rest) = arn.strip_prefix("arn:aws:ecs:") else {
        return Err(invalid_parameter(format!("Malformed ECS ARN: {arn}")));
    };
    // At most three pieces: region, account, and the remaining resource part.
    let mut segments = rest.splitn(3, ':');
    let (Some(_region), Some(account), Some(resource)) =
        (segments.next(), segments.next(), segments.next())
    else {
        return Err(invalid_parameter("Malformed ECS ARN"));
    };
    match resource.split_once('/') {
        Some((resource_type, tail)) => Ok((
            account.to_string(),
            resource_type.to_string(),
            tail.to_string(),
        )),
        None => Err(invalid_parameter("Malformed ECS ARN")),
    }
}
/// Splits `family:revision` at the last `:`.
///
/// Returns `(family, Some(revision))` when the text after the last colon
/// parses as an `i32`; otherwise the whole input is returned as the family
/// with no revision.
fn parse_family_revision(input: &str) -> (String, Option<i32>) {
    let split = input
        .rsplit_once(':')
        .and_then(|(family, rev)| rev.parse::<i32>().ok().map(|n| (family, n)));
    match split {
        Some((family, n)) => (family.to_string(), Some(n)),
        None => (input.to_string(), None),
    }
}
/// Parses a task definition reference into `(account, family, revision)`.
///
/// Accepts either a bare `family[:revision]` (account is `None`) or a full
/// ECS ARN, whose resource type must be `task-definition`; otherwise an
/// `InvalidParameterException` is returned.
fn resolve_task_definition_ref(
    input: &str,
) -> Result<(Option<String>, String, Option<i32>), AwsServiceError> {
    if !input.starts_with("arn:aws:ecs:") {
        let (family, rev) = parse_family_revision(input);
        return Ok((None, family, rev));
    }
    let (account, resource_type, tail) = decode_ecs_arn(input)?;
    if resource_type != "task-definition" {
        return Err(invalid_parameter(format!(
            "Expected task-definition ARN: {input}"
        )));
    }
    let (family, rev) = parse_family_revision(&tail);
    Ok((Some(account), family, rev))
}
/// Picks the account a task definition reference targets: the account
/// embedded in a valid task-definition ARN, otherwise the caller's account.
fn target_account_for_task_definition(request: &AwsRequest, td_ref: &str) -> String {
    match resolve_task_definition_ref(td_ref) {
        Ok((Some(account), _, _)) => account,
        _ => request.account_id.clone(),
    }
}
/// Picks the account a cluster reference targets: the account embedded in a
/// decodable ECS ARN, otherwise (bare name, missing reference or malformed
/// ARN) the caller's account.
fn target_account_for_cluster(request: &AwsRequest, cluster_ref: Option<&str>) -> String {
    cluster_ref
        .filter(|input| input.starts_with("arn:aws:ecs:"))
        .and_then(|input| decode_ecs_arn(input).ok())
        .map(|(account, _, _)| account)
        .unwrap_or_else(|| request.account_id.clone())
}
/// Returns the highest-numbered revision whose status is `ACTIVE`, if any.
fn latest_active_revision(
    revisions: &std::collections::BTreeMap<i32, TaskDefinition>,
) -> Option<&TaskDefinition> {
    // Walk from the newest revision down and stop at the first active one.
    for candidate in revisions.values().rev() {
        if candidate.status == "ACTIVE" {
            return Some(candidate);
        }
    }
    None
}
impl EcsService {
fn create_cluster(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let cluster_name = opt_str(&body, "clusterName")
.unwrap_or("default")
.to_string();
let tags = parse_tags(&body);
let settings: Vec<Value> = body
.get("settings")
.and_then(|v| v.as_array())
.cloned()
.unwrap_or_default();
let configuration = body.get("configuration").cloned();
let capacity_providers: Vec<String> = body
.get("capacityProviders")
.and_then(|v| v.as_array())
.map(|arr| {
arr.iter()
.filter_map(|v| v.as_str().map(String::from))
.collect()
})
.unwrap_or_default();
let default_strategy: Vec<Value> = body
.get("defaultCapacityProviderStrategy")
.and_then(|v| v.as_array())
.cloned()
.unwrap_or_default();
let service_connect = body.get("serviceConnectDefaults").cloned();
let account = request.account_id.clone();
let mut accounts = self.state.write();
let state = accounts.get_or_create(&account);
let arn = state.cluster_arn(&cluster_name);
let mut cluster = Cluster::new(&cluster_name, arn);
cluster.tags = tags;
cluster.settings = settings;
cluster.configuration = configuration;
cluster.capacity_providers = capacity_providers;
cluster.default_capacity_provider_strategy = default_strategy;
cluster.service_connect_defaults = service_connect;
state.clusters.insert(cluster_name.clone(), cluster.clone());
Ok(AwsResponse::ok_json(json!({
"cluster": cluster_to_json(&cluster),
})))
}
fn describe_clusters(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let names: Vec<String> = body
.get("clusters")
.and_then(|v| v.as_array())
.map(|arr| {
arr.iter()
.filter_map(|v| v.as_str().map(|s| EcsState::resolve_cluster_name(Some(s))))
.collect()
})
.unwrap_or_else(|| vec!["default".to_string()]);
let account = request.account_id.clone();
let accounts = self.state.read();
let mut found = Vec::new();
let mut failures = Vec::new();
if let Some(state) = accounts.get(&account) {
for name in &names {
match state.clusters.get(name) {
Some(c) => found.push(cluster_to_json(c)),
None => failures.push(json!({
"arn": state.cluster_arn(name),
"reason": "MISSING",
})),
}
}
} else {
for name in &names {
failures.push(json!({
"arn": format!(
"arn:aws:ecs:{}:{}:cluster/{}",
accounts.region(),
account,
name
),
"reason": "MISSING",
}));
}
}
Ok(AwsResponse::ok_json(json!({
"clusters": found,
"failures": failures,
})))
}
/// Handles `DeleteCluster`.
///
/// Fails with `ClusterNotFoundException` when the cluster is unknown, and
/// with `ClusterContainsServicesException` / `ClusterContainsTasksException`
/// while it still reports active services or running/pending tasks.
/// Otherwise the cluster is removed and returned with status `INACTIVE`.
fn delete_cluster(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
    let body = request.json_body();
    let cluster_ref = opt_str(&body, "cluster");
    let name = EcsState::resolve_cluster_name(cluster_ref);
    let account = target_account_for_cluster(request, cluster_ref);

    let mut accounts = self.state.write();
    let state = accounts.get_or_create(&account);
    let Some(existing) = state.clusters.get(&name) else {
        return Err(cluster_not_found(&name));
    };
    if existing.active_services_count > 0 {
        return Err(cluster_contains_services());
    }
    if existing.running_tasks_count > 0 || existing.pending_tasks_count > 0 {
        return Err(cluster_contains_tasks());
    }

    // All checks passed: take the cluster out of the map and report it as inactive.
    let Some(mut removed) = state.clusters.remove(&name) else {
        return Err(cluster_not_found(&name));
    };
    removed.status = "INACTIVE".to_string();
    Ok(AwsResponse::ok_json(json!({
        "cluster": cluster_to_json(&removed),
    })))
}
fn list_clusters(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let max_results = body
.get("maxResults")
.and_then(|v| v.as_i64())
.filter(|n| (1..=100).contains(n))
.map(|n| n as usize)
.unwrap_or(100);
let next_token = opt_str(&body, "nextToken").unwrap_or("");
let account = request.account_id.clone();
let accounts = self.state.read();
let arns: Vec<String> = match accounts.get(&account) {
Some(state) => state
.clusters
.values()
.map(|c| c.cluster_arn.clone())
.collect(),
None => Vec::new(),
};
let start = next_token.parse::<usize>().unwrap_or(0).min(arns.len());
let end = (start + max_results).min(arns.len());
let page = arns[start..end].to_vec();
let next = if end < arns.len() {
Some(end.to_string())
} else {
None
};
let mut out = json!({ "clusterArns": page });
if let Some(n) = next {
out.as_object_mut()
.unwrap()
.insert("nextToken".into(), json!(n));
}
Ok(AwsResponse::ok_json(out))
}
fn update_cluster(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let cluster_ref = req_str(&body, "cluster")?;
let name = EcsState::resolve_cluster_name(Some(cluster_ref));
let account = target_account_for_cluster(request, Some(cluster_ref));
let mut accounts = self.state.write();
let state = accounts.get_or_create(&account);
let cluster = state
.clusters
.get_mut(&name)
.ok_or_else(|| cluster_not_found(&name))?;
if let Some(settings) = body.get("settings").and_then(|v| v.as_array()) {
cluster.settings = settings.clone();
}
if let Some(cfg) = body.get("configuration") {
cluster.configuration = Some(cfg.clone());
}
if let Some(sc) = body.get("serviceConnectDefaults") {
cluster.service_connect_defaults = Some(sc.clone());
}
let snapshot = cluster.clone();
Ok(AwsResponse::ok_json(json!({
"cluster": cluster_to_json(&snapshot),
})))
}
fn update_cluster_settings(
&self,
request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let cluster_ref = req_str(&body, "cluster")?;
let name = EcsState::resolve_cluster_name(Some(cluster_ref));
let account = target_account_for_cluster(request, Some(cluster_ref));
let settings: Vec<Value> = body
.get("settings")
.and_then(|v| v.as_array())
.cloned()
.unwrap_or_default();
let mut accounts = self.state.write();
let state = accounts.get_or_create(&account);
let cluster = state
.clusters
.get_mut(&name)
.ok_or_else(|| cluster_not_found(&name))?;
cluster.settings = settings;
let snapshot = cluster.clone();
Ok(AwsResponse::ok_json(json!({
"cluster": cluster_to_json(&snapshot),
})))
}
fn put_cluster_capacity_providers(
&self,
request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let cluster_ref = req_str(&body, "cluster")?;
let name = EcsState::resolve_cluster_name(Some(cluster_ref));
let account = target_account_for_cluster(request, Some(cluster_ref));
let capacity_providers: Vec<String> = body
.get("capacityProviders")
.and_then(|v| v.as_array())
.map(|arr| {
arr.iter()
.filter_map(|v| v.as_str().map(String::from))
.collect()
})
.ok_or_else(|| client_exception("Missing required field: capacityProviders"))?;
let default_strategy: Vec<Value> = body
.get("defaultCapacityProviderStrategy")
.and_then(|v| v.as_array())
.cloned()
.ok_or_else(|| {
client_exception("Missing required field: defaultCapacityProviderStrategy")
})?;
let mut accounts = self.state.write();
let state = accounts.get_or_create(&account);
let cluster = state
.clusters
.get_mut(&name)
.ok_or_else(|| cluster_not_found(&name))?;
cluster.capacity_providers = capacity_providers;
cluster.default_capacity_provider_strategy = default_strategy;
let snapshot = cluster.clone();
Ok(AwsResponse::ok_json(json!({
"cluster": cluster_to_json(&snapshot),
})))
}
}
impl EcsService {
fn register_task_definition(
&self,
request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let family = req_str(&body, "family")?.to_string();
validate_family_name(&family)?;
let container_definitions = body
.get("containerDefinitions")
.and_then(|v| v.as_array())
.cloned()
.ok_or_else(|| client_exception("Missing required field: containerDefinitions"))?;
if container_definitions.is_empty() {
return Err(client_exception(
"Task definition must have at least one container",
));
}
for cd in &container_definitions {
if cd
.get("name")
.and_then(|v| v.as_str())
.unwrap_or("")
.is_empty()
{
return Err(client_exception(
"Container definition is missing required field: name",
));
}
if cd
.get("image")
.and_then(|v| v.as_str())
.unwrap_or("")
.is_empty()
{
return Err(client_exception(
"Container definition is missing required field: image",
));
}
}
if let Some(role_arn) = opt_str(&body, "taskRoleArn") {
self.check_pass_role(&request.account_id, role_arn)?;
}
if let Some(role_arn) = opt_str(&body, "executionRoleArn") {
self.check_pass_role(&request.account_id, role_arn)?;
}
let tags = parse_tags(&body);
let requires_compatibilities: Vec<String> = body
.get("requiresCompatibilities")
.and_then(|v| v.as_array())
.map(|arr| {
arr.iter()
.filter_map(|v| v.as_str().map(String::from))
.collect()
})
.unwrap_or_default();
let compatibilities = vec!["EC2".to_string(), "FARGATE".to_string()];
let account = request.account_id.clone();
let mut accounts = self.state.write();
let state = accounts.get_or_create(&account);
let revision = state.allocate_revision(&family);
let arn = state.task_definition_arn(&family, revision);
let td = TaskDefinition {
family: family.clone(),
revision,
task_definition_arn: arn,
container_definitions,
status: "ACTIVE".to_string(),
task_role_arn: opt_str(&body, "taskRoleArn").map(String::from),
execution_role_arn: opt_str(&body, "executionRoleArn").map(String::from),
network_mode: opt_str(&body, "networkMode").map(String::from),
requires_compatibilities,
compatibilities,
cpu: opt_str(&body, "cpu").map(String::from),
memory: opt_str(&body, "memory").map(String::from),
pid_mode: opt_str(&body, "pidMode").map(String::from),
ipc_mode: opt_str(&body, "ipcMode").map(String::from),
volumes: body
.get("volumes")
.and_then(|v| v.as_array())
.cloned()
.unwrap_or_default(),
placement_constraints: body
.get("placementConstraints")
.and_then(|v| v.as_array())
.cloned()
.unwrap_or_default(),
proxy_configuration: body.get("proxyConfiguration").cloned(),
inference_accelerators: body
.get("inferenceAccelerators")
.and_then(|v| v.as_array())
.cloned()
.unwrap_or_default(),
ephemeral_storage: body.get("ephemeralStorage").cloned(),
runtime_platform: body.get("runtimePlatform").cloned(),
requires_attributes: Vec::new(),
registered_at: Utc::now(),
registered_by: request
.principal
.as_ref()
.map(|p| p.arn.clone())
.or(Some(format!("arn:aws:iam::{}:root", state.account_id))),
deregistered_at: None,
tags: tags.clone(),
enable_fault_injection: body.get("enableFaultInjection").and_then(|v| v.as_bool()),
};
let td_json = task_definition_to_json(&td);
state
.task_definitions
.entry(family.clone())
.or_default()
.insert(revision, td);
Ok(AwsResponse::ok_json(json!({
"taskDefinition": td_json,
"tags": tags_json(&tags),
})))
}
fn describe_task_definition(
&self,
request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let td_ref = req_str(&body, "taskDefinition")?;
let (_, family, rev) = resolve_task_definition_ref(td_ref)?;
let include_tags = body
.get("include")
.and_then(|v| v.as_array())
.map(|arr| arr.iter().any(|v| v.as_str() == Some("TAGS")))
.unwrap_or(false);
let account = target_account_for_task_definition(request, td_ref);
let accounts = self.state.read();
let state = accounts
.get(&account)
.ok_or_else(|| task_definition_not_found(td_ref))?;
let revisions = state
.task_definitions
.get(&family)
.ok_or_else(|| task_definition_not_found(td_ref))?;
let td = match rev {
Some(n) => revisions
.get(&n)
.ok_or_else(|| task_definition_not_found(td_ref))?,
None => latest_active_revision(revisions)
.ok_or_else(|| task_definition_not_found(td_ref))?,
};
let mut out = json!({"taskDefinition": task_definition_to_json(td)});
if include_tags {
out.as_object_mut()
.unwrap()
.insert("tags".into(), tags_json(&td.tags));
}
Ok(AwsResponse::ok_json(out))
}
fn deregister_task_definition(
&self,
request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let td_ref = req_str(&body, "taskDefinition")?;
let (_, family, rev) = resolve_task_definition_ref(td_ref)?;
let rev =
rev.ok_or_else(|| client_exception("taskDefinition must reference a revision"))?;
let account = target_account_for_task_definition(request, td_ref);
let mut accounts = self.state.write();
let state = accounts.get_or_create(&account);
let revisions = state
.task_definitions
.get_mut(&family)
.ok_or_else(|| task_definition_not_found(td_ref))?;
let td = revisions
.get_mut(&rev)
.ok_or_else(|| task_definition_not_found(td_ref))?;
td.status = "INACTIVE".to_string();
td.deregistered_at = Some(Utc::now());
let snapshot = td.clone();
Ok(AwsResponse::ok_json(json!({
"taskDefinition": task_definition_to_json(&snapshot),
})))
}
fn delete_task_definitions(
&self,
request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let refs: Vec<String> = body
.get("taskDefinitions")
.and_then(|v| v.as_array())
.map(|arr| {
arr.iter()
.filter_map(|v| v.as_str().map(String::from))
.collect()
})
.ok_or_else(|| client_exception("Missing required field: taskDefinitions"))?;
if refs.is_empty() {
return Err(client_exception("taskDefinitions must not be empty"));
}
let mut deleted = Vec::new();
let mut failures = Vec::new();
let account = request.account_id.clone();
let mut accounts = self.state.write();
let state = accounts.get_or_create(&account);
for input in refs {
let parsed = match resolve_task_definition_ref(&input) {
Ok((_, family, Some(rev))) => Some((family, rev)),
Ok(_) => None,
Err(_) => None,
};
let Some((family, rev)) = parsed else {
failures.push(json!({
"arn": input,
"reason": "INVALID_REFERENCE",
"detail": "Expected family:revision or full task-definition ARN",
}));
continue;
};
let Some(revisions) = state.task_definitions.get_mut(&family) else {
failures.push(json!({"arn": input, "reason": "MISSING"}));
continue;
};
let Some(td) = revisions.get_mut(&rev) else {
failures.push(json!({"arn": input, "reason": "MISSING"}));
continue;
};
if td.status == "ACTIVE" {
failures.push(json!({
"arn": td.task_definition_arn.clone(),
"reason": "MUST_BE_INACTIVE",
"detail": "Task definitions must be deregistered before they can be deleted",
}));
continue;
}
td.status = "DELETE_IN_PROGRESS".to_string();
deleted.push(task_definition_to_json(td));
}
Ok(AwsResponse::ok_json(json!({
"taskDefinitions": deleted,
"failures": failures,
})))
}
/// Handles `ListTaskDefinitions`.
///
/// Returns a page of task definition ARNs from the caller's account whose
/// status equals `status` (default `ACTIVE`), optionally restricted to
/// families starting with `familyPrefix`.
///
/// Results are ordered by family name and then by *numeric* revision, so
/// `web:2` precedes `web:10`; `sort = "DESC"` reverses that order. Sorting
/// the ARN strings directly would compare revisions as text and misplace
/// revisions of 10 and above, and would misorder a family that is a prefix
/// of another family.
///
/// `maxResults` outside 1..=100 (or absent) means 100. `nextToken` is the
/// numeric offset of the next page; an unparsable token restarts from 0.
fn list_task_definitions(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
    let body = request.json_body();
    let family_prefix = opt_str(&body, "familyPrefix");
    let status = opt_str(&body, "status").unwrap_or("ACTIVE");
    let sort = opt_str(&body, "sort").unwrap_or("ASC");
    let max_results = body
        .get("maxResults")
        .and_then(|v| v.as_i64())
        .filter(|n| (1..=100).contains(n))
        .map(|n| n as usize)
        .unwrap_or(100);
    let next_token = opt_str(&body, "nextToken").unwrap_or("");
    let account = request.account_id.clone();
    let accounts = self.state.read();

    // (family, revision, arn): the first two components form the sort key.
    let mut entries: Vec<(String, i32, String)> = Vec::new();
    if let Some(state) = accounts.get(&account) {
        for (family, revisions) in &state.task_definitions {
            if let Some(prefix) = family_prefix {
                if !family.starts_with(prefix) {
                    continue;
                }
            }
            for (revision, td) in revisions {
                if td.status == status {
                    entries.push((family.clone(), *revision, td.task_definition_arn.clone()));
                }
            }
        }
    }
    entries.sort_by(|a, b| a.0.cmp(&b.0).then(a.1.cmp(&b.1)));
    if sort == "DESC" {
        entries.reverse();
    }

    let total = entries.len();
    let start = next_token.parse::<usize>().unwrap_or(0).min(total);
    let end = (start + max_results).min(total);
    let page: Vec<String> = entries[start..end]
        .iter()
        .map(|(_, _, arn)| arn.clone())
        .collect();
    let mut out = json!({"taskDefinitionArns": page});
    if end < total {
        out.as_object_mut()
            .unwrap()
            .insert("nextToken".into(), json!(end.to_string()));
    }
    Ok(AwsResponse::ok_json(out))
}
fn list_task_definition_families(
&self,
request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let family_prefix = opt_str(&body, "familyPrefix");
let status = opt_str(&body, "status").unwrap_or("ACTIVE");
let max_results = body
.get("maxResults")
.and_then(|v| v.as_i64())
.filter(|n| (1..=100).contains(n))
.map(|n| n as usize)
.unwrap_or(100);
let next_token = opt_str(&body, "nextToken").unwrap_or("");
let account = request.account_id.clone();
let accounts = self.state.read();
let mut families: Vec<String> = Vec::new();
if let Some(state) = accounts.get(&account) {
for (family, revisions) in &state.task_definitions {
if let Some(prefix) = family_prefix {
if !family.starts_with(prefix) {
continue;
}
}
let matches_status = match status {
"ACTIVE" => revisions.values().any(|td| td.status == "ACTIVE"),
"INACTIVE" => revisions
.values()
.all(|td| td.status == "INACTIVE" || td.status == "DELETE_IN_PROGRESS"),
"ALL" => true,
_ => revisions.values().any(|td| td.status == status),
};
if matches_status {
families.push(family.clone());
}
}
}
families.sort();
let start = next_token.parse::<usize>().unwrap_or(0).min(families.len());
let end = (start + max_results).min(families.len());
let page = families[start..end].to_vec();
let mut out = json!({"families": page});
if end < families.len() {
out.as_object_mut()
.unwrap()
.insert("nextToken".into(), json!(end.to_string()));
}
Ok(AwsResponse::ok_json(out))
}
}
/// Checks a task definition family name.
///
/// The name must be 1-255 bytes long and consist solely of ASCII letters,
/// ASCII digits, `_` and `-`. The length check runs first, so an over-long
/// name reports the length error even if it also has illegal characters.
///
/// # Errors
/// Returns the `invalid_parameter` error describing the violated rule.
fn validate_family_name(family: &str) -> Result<(), AwsServiceError> {
    if !(1..=255).contains(&family.len()) {
        return Err(invalid_parameter(
            "Task definition family must be 1-255 characters",
        ));
    }
    let has_illegal_char = family
        .chars()
        .any(|c| !matches!(c, 'a'..='z' | 'A'..='Z' | '0'..='9' | '_' | '-'));
    if has_illegal_char {
        return Err(invalid_parameter(
            "Task definition family may only contain letters, numbers, hyphens, and underscores",
        ));
    }
    Ok(())
}
impl EcsService {
/// Handles `TagResource`: merges the request's tags into the tags of the
/// resource named by `resourceArn`.
///
/// The ARN is decoded into `(account, resource type, tail)`. The account
/// taken from the ARN selects the state (created on demand via
/// `get_or_create`), and the resource type selects which collection is
/// searched.
///
/// # Errors
/// - `req_str` error when `resourceArn` is missing.
/// - `decode_ecs_arn` error when the ARN cannot be decoded.
/// - `invalid_parameter` for a task-definition ARN without a revision or
///   for an unknown resource type.
/// - `resource_not_found` when the resource does not exist.
fn tag_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
    let body = request.json_body();
    let arn = req_str(&body, "resourceArn")?.to_string();
    let incoming = parse_tags(&body);
    let (account, resource_type, tail) = decode_ecs_arn(&arn)?;
    let missing = || resource_not_found(&arn);

    let mut accounts = self.state.write();
    let state = accounts.get_or_create(&account);

    // Locate the tag list of the addressed resource; merge once afterwards.
    let target = match resource_type.as_str() {
        "cluster" => &mut state.clusters.get_mut(&tail).ok_or_else(missing)?.tags,
        "task-definition" => {
            let (family, rev) = parse_family_revision(&tail);
            let rev = rev.ok_or_else(|| {
                invalid_parameter("task-definition ARN must include revision")
            })?;
            let revisions = state.task_definitions.get_mut(&family);
            &mut revisions
                .and_then(|m| m.get_mut(&rev))
                .ok_or_else(missing)?
                .tags
        }
        "service" => {
            let key = resolve_service_key(state, &tail).ok_or_else(missing)?;
            &mut state
                .services
                .get_mut(&key)
                .expect("resolved key exists")
                .tags
        }
        "task" => {
            // Tasks are keyed by their id, the last path segment of the tail.
            let task_id = tail.rsplit('/').next().unwrap_or(&tail).to_string();
            &mut state.tasks.get_mut(&task_id).ok_or_else(missing)?.tags
        }
        "task-set" => &mut state.task_sets.get_mut(&tail).ok_or_else(missing)?.tags,
        "container-instance" => {
            let key = resolve_container_instance_key(state, &tail).ok_or_else(missing)?;
            &mut state
                .container_instances
                .get_mut(&key)
                .expect("resolved key exists")
                .tags
        }
        "capacity-provider" => {
            &mut state
                .capacity_providers
                .get_mut(&tail)
                .ok_or_else(missing)?
                .tags
        }
        other => {
            return Err(invalid_parameter(format!(
                "Unknown ECS resource type: {other}"
            )));
        }
    };
    merge_tags(target, incoming);
    Ok(AwsResponse::ok_json(json!({})))
}
/// Handles `UntagResource`: removes every tag whose key appears in
/// `tagKeys` from the resource named by `resourceArn`.
///
/// Non-string entries in `tagKeys` are ignored; a missing or non-array
/// `tagKeys` removes nothing. Resource lookup follows the same
/// per-resource-type rules as tagging.
///
/// # Errors
/// - `req_str` error when `resourceArn` is missing.
/// - `decode_ecs_arn` error when the ARN cannot be decoded.
/// - `invalid_parameter` for a task-definition ARN without a revision or
///   for an unknown resource type.
/// - `resource_not_found` when the resource does not exist.
fn untag_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
    let body = request.json_body();
    let arn = req_str(&body, "resourceArn")?.to_string();
    let keys: Vec<String> = match body.get("tagKeys").and_then(|v| v.as_array()) {
        Some(items) => items
            .iter()
            .filter_map(|v| v.as_str())
            .map(String::from)
            .collect(),
        None => Vec::new(),
    };
    let (account, resource_type, tail) = decode_ecs_arn(&arn)?;
    let missing = || resource_not_found(&arn);

    let mut accounts = self.state.write();
    let state = accounts.get_or_create(&account);

    // Locate the tag list of the addressed resource; prune once afterwards.
    let target = match resource_type.as_str() {
        "cluster" => &mut state.clusters.get_mut(&tail).ok_or_else(missing)?.tags,
        "task-definition" => {
            let (family, rev) = parse_family_revision(&tail);
            let rev = rev.ok_or_else(|| {
                invalid_parameter("task-definition ARN must include revision")
            })?;
            let revisions = state.task_definitions.get_mut(&family);
            &mut revisions
                .and_then(|m| m.get_mut(&rev))
                .ok_or_else(missing)?
                .tags
        }
        "service" => {
            let key = resolve_service_key(state, &tail).ok_or_else(missing)?;
            &mut state
                .services
                .get_mut(&key)
                .expect("resolved key exists")
                .tags
        }
        "task" => {
            // Tasks are keyed by their id, the last path segment of the tail.
            let task_id = tail.rsplit('/').next().unwrap_or(&tail).to_string();
            &mut state.tasks.get_mut(&task_id).ok_or_else(missing)?.tags
        }
        "task-set" => &mut state.task_sets.get_mut(&tail).ok_or_else(missing)?.tags,
        "container-instance" => {
            let key = resolve_container_instance_key(state, &tail).ok_or_else(missing)?;
            &mut state
                .container_instances
                .get_mut(&key)
                .expect("resolved key exists")
                .tags
        }
        "capacity-provider" => {
            &mut state
                .capacity_providers
                .get_mut(&tail)
                .ok_or_else(missing)?
                .tags
        }
        other => {
            return Err(invalid_parameter(format!(
                "Unknown ECS resource type: {other}"
            )));
        }
    };
    target.retain(|t| !keys.contains(&t.key));
    Ok(AwsResponse::ok_json(json!({})))
}
/// Handles `ListTagsForResource`: returns `{"tags": [...]}` for the
/// resource named by `resourceArn`.
///
/// Unlike the mutating tag handlers this takes only a read lock and never
/// creates account state: an unknown account is reported as
/// `resource_not_found`.
///
/// # Errors
/// - `req_str` error when `resourceArn` is missing.
/// - `decode_ecs_arn` error when the ARN cannot be decoded.
/// - `invalid_parameter` for a task-definition ARN without a revision or
///   for an unknown resource type.
/// - `resource_not_found` when the account or resource does not exist.
fn list_tags_for_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
    let body = request.json_body();
    let arn = req_str(&body, "resourceArn")?.to_string();
    let (account, resource_type, tail) = decode_ecs_arn(&arn)?;
    let missing = || resource_not_found(&arn);

    let accounts = self.state.read();
    let Some(state) = accounts.get(&account) else {
        return Err(missing());
    };

    // Borrow the tag list in place; it is only needed for serialization.
    let tags = match resource_type.as_str() {
        "cluster" => &state.clusters.get(&tail).ok_or_else(missing)?.tags,
        "task-definition" => {
            let (family, rev) = parse_family_revision(&tail);
            let rev = rev.ok_or_else(|| {
                invalid_parameter("task-definition ARN must include revision")
            })?;
            let revisions = state.task_definitions.get(&family);
            &revisions
                .and_then(|m| m.get(&rev))
                .ok_or_else(missing)?
                .tags
        }
        "service" => {
            let key = resolve_service_key(state, &tail).ok_or_else(missing)?;
            &state.services.get(&key).expect("resolved key exists").tags
        }
        "task" => {
            // Tasks are keyed by their id, the last path segment of the tail.
            let task_id = tail.rsplit('/').next().unwrap_or(&tail).to_string();
            &state.tasks.get(&task_id).ok_or_else(missing)?.tags
        }
        "task-set" => &state.task_sets.get(&tail).ok_or_else(missing)?.tags,
        "container-instance" => {
            let key = resolve_container_instance_key(state, &tail).ok_or_else(missing)?;
            &state
                .container_instances
                .get(&key)
                .expect("resolved key exists")
                .tags
        }
        "capacity-provider" => &state.capacity_providers.get(&tail).ok_or_else(missing)?.tags,
        other => {
            return Err(invalid_parameter(format!(
                "Unknown ECS resource type: {other}"
            )));
        }
    };
    Ok(AwsResponse::ok_json(json!({"tags": tags_json(tags)})))
}
}
/// Builds the HTTP 400 `ClientException` error returned when the resource
/// identified by `arn` cannot be found.
fn resource_not_found(arn: &str) -> AwsServiceError {
    let message = format!("The referenced resource was not found: {arn}");
    AwsServiceError::aws_error(StatusCode::BAD_REQUEST, "ClientException", message)
}
impl EcsService {
/// Handles `PutAccountSetting`: stores `name = value` for one principal in
/// the calling account and echoes the stored setting.
///
/// The principal is the request's `principalArn` when present, otherwise
/// the ARN of the request's principal, otherwise the account root ARN.
///
/// # Errors
/// Propagates the `req_str` error when `name` or `value` is missing.
fn put_account_setting(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
    let body = request.json_body();
    let name = req_str(&body, "name")?.to_string();
    let value = req_str(&body, "value")?.to_string();
    let principal_arn = match opt_str(&body, "principalArn") {
        Some(explicit) => explicit.to_string(),
        None => match request.principal.as_ref() {
            Some(caller) => caller.arn.clone(),
            None => format!("arn:aws:iam::{}:root", request.account_id),
        },
    };

    let account = request.account_id.clone();
    let mut accounts = self.state.write();
    let state = accounts.get_or_create(&account);
    let per_principal = state
        .principal_account_settings
        .entry(principal_arn.clone())
        .or_default();
    per_principal.insert(name.clone(), value.clone());

    let setting = json!({
        "name": name,
        "value": value,
        "principalArn": principal_arn,
    });
    Ok(AwsResponse::ok_json(json!({ "setting": setting })))
}
/// Handles `PutAccountSettingDefault`: stores `name = value` as an
/// account-wide default and echoes it, attributed to the account root ARN
/// built from the stored state's account id.
///
/// # Errors
/// Propagates the `req_str` error when `name` or `value` is missing.
fn put_account_setting_default(
    &self,
    request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
    let body = request.json_body();
    let name = req_str(&body, "name")?.to_string();
    let value = req_str(&body, "value")?.to_string();

    let account = request.account_id.clone();
    let mut accounts = self.state.write();
    let state = accounts.get_or_create(&account);
    state
        .account_setting_defaults
        .insert(name.clone(), value.clone());

    let root_arn = format!("arn:aws:iam::{}:root", state.account_id);
    let setting = json!({
        "name": name,
        "value": value,
        "principalArn": root_arn,
    });
    Ok(AwsResponse::ok_json(json!({ "setting": setting })))
}
/// Handles `DeleteAccountSetting`: removes a principal-level setting and
/// echoes it back.
///
/// The principal is resolved as for `PutAccountSetting`: explicit
/// `principalArn`, else the request principal's ARN, else the account root
/// ARN. If nothing was stored under that name, the echoed `value` is the
/// empty string.
///
/// # Errors
/// Propagates the `req_str` error when `name` is missing.
fn delete_account_setting(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
    let body = request.json_body();
    let name = req_str(&body, "name")?.to_string();
    let principal_arn = match opt_str(&body, "principalArn") {
        Some(explicit) => explicit.to_string(),
        None => match request.principal.as_ref() {
            Some(caller) => caller.arn.clone(),
            None => format!("arn:aws:iam::{}:root", request.account_id),
        },
    };

    let account = request.account_id.clone();
    let mut accounts = self.state.write();
    let state = accounts.get_or_create(&account);
    let previous = match state.principal_account_settings.get_mut(&principal_arn) {
        Some(per_principal) => per_principal.remove(&name).unwrap_or_default(),
        None => Default::default(),
    };

    let setting = json!({
        "name": name,
        "value": previous,
        "principalArn": principal_arn,
    });
    Ok(AwsResponse::ok_json(json!({ "setting": setting })))
}
/// Handles `ListAccountSettings`: returns `{"settings": [...]}` for the
/// calling account, or an empty list when the account has no state.
///
/// Optional exact-match filters: `name`, `value`, `principalArn`.
///
/// With `effectiveSettings: true`, one merged view is returned for a single
/// principal (the `principalArn` filter, else the request principal, else
/// the account root). Account defaults are overlaid with that principal's
/// own settings.
///
/// Otherwise, account defaults are listed under the root ARN (only when the
/// principal filter is absent or equals the root ARN), followed by each
/// principal's own settings that pass the filters.
fn list_account_settings(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
    let body = request.json_body();
    let name_filter = opt_str(&body, "name");
    let value_filter = opt_str(&body, "value");
    let principal_filter = opt_str(&body, "principalArn");
    let effective_only = matches!(
        body.get("effectiveSettings").and_then(|v| v.as_bool()),
        Some(true)
    );

    let account = request.account_id.clone();
    let accounts = self.state.read();
    let state = match accounts.get(&account) {
        Some(state) => state,
        None => return Ok(AwsResponse::ok_json(json!({"settings": []}))),
    };
    let root_arn = format!("arn:aws:iam::{}:root", state.account_id);
    let mut settings: Vec<Value> = Vec::new();

    if effective_only {
        let principal = match principal_filter {
            Some(explicit) => explicit.to_string(),
            None => match request.principal.as_ref() {
                Some(caller) => caller.arn.clone(),
                None => root_arn.clone(),
            },
        };
        // Principal-level values override the account defaults.
        let mut merged = state.account_setting_defaults.clone();
        if let Some(overrides) = state.principal_account_settings.get(&principal) {
            merged.extend(overrides.iter().map(|(k, v)| (k.clone(), v.clone())));
        }
        for (k, v) in merged {
            if !(matches_filter(name_filter, &k) && matches_filter(value_filter, &v)) {
                continue;
            }
            settings.push(json!({
                "name": k,
                "value": v,
                "principalArn": principal,
            }));
        }
    } else {
        // Defaults belong to the root ARN, so they are listed only when the
        // principal filter is absent or names the root ARN.
        let defaults_visible = principal_filter.is_none_or(|pf| pf == root_arn.as_str());
        if defaults_visible {
            for (k, v) in &state.account_setting_defaults {
                if matches_filter(name_filter, k) && matches_filter(value_filter, v) {
                    settings.push(json!({
                        "name": k,
                        "value": v,
                        "principalArn": root_arn,
                    }));
                }
            }
        }
        for (principal, entries) in &state.principal_account_settings {
            let excluded = principal_filter.is_some_and(|pf| pf != principal.as_str());
            if excluded {
                continue;
            }
            for (k, v) in entries {
                if matches_filter(name_filter, k) && matches_filter(value_filter, v) {
                    settings.push(json!({
                        "name": k,
                        "value": v,
                        "principalArn": principal,
                    }));
                }
            }
        }
    }
    Ok(AwsResponse::ok_json(json!({"settings": settings})))
}
}
/// Returns `true` when no filter is set, or when the filter equals `value`
/// exactly.
fn matches_filter(filter: Option<&str>, value: &str) -> bool {
    match filter {
        Some(expected) => expected == value,
        None => true,
    }
}
impl EcsService {
/// Handles `RunTask`: creates task records from a registered task
/// definition and passes them to the container runtime when one is set.
///
/// Request fields read from the JSON body:
/// - `taskDefinition` (required): reference resolved by
///   `resolve_task_definition_ref`.
/// - `cluster` (optional): resolved via `EcsState::resolve_cluster_name`.
/// - `launchType` (optional, default `"FARGATE"`).
/// - `count` (optional): absent or outside `1..=10` falls back to 1.
/// - `group`, `startedBy`, tags (via `parse_tags`), `overrides` (optional).
///
/// # Errors
/// - `req_str` error when `taskDefinition` is missing.
/// - `check_pass_role` error for `overrides.taskRoleArn` or
///   `overrides.executionRoleArn`.
/// - `resolve_task_definition_ref` error for a malformed reference.
/// - `task_definition_not_found` when the family or revision is missing, or
///   no revision is given and `latest_active_revision` finds none.
/// - `client_exception` when the resolved task definition is not `ACTIVE`.
///
/// The response is `{"tasks": [...], "failures": []}`. Each task JSON is
/// serialized from the local `task` value, whose `lastStatus` is
/// `"PROVISIONING"`; later changes to the stored copy are not reflected.
fn run_task(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let td_ref = req_str(&body, "taskDefinition")?;
let cluster_ref = opt_str(&body, "cluster");
let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
let launch_type = opt_str(&body, "launchType")
.unwrap_or("FARGATE")
.to_string();
// Absent, non-integer, or out-of-range counts silently become 1.
let count = body
.get("count")
.and_then(|v| v.as_i64())
.filter(|n| (1..=10).contains(n))
.unwrap_or(1) as usize;
let group = opt_str(&body, "group").map(String::from);
let started_by = opt_str(&body, "startedBy").map(String::from);
let tags = parse_tags(&body);
// Role ARNs supplied through overrides are checked before any state is touched.
if let Some(overrides) = body.get("overrides") {
if let Some(role_arn) = opt_str(overrides, "taskRoleArn") {
self.check_pass_role(&request.account_id, role_arn)?;
}
if let Some(role_arn) = opt_str(overrides, "executionRoleArn") {
self.check_pass_role(&request.account_id, role_arn)?;
}
}
let account = request.account_id.clone();
let runtime = self.runtime.clone();
let mut accounts = self.state.write();
let state = accounts.get_or_create(&account);
// An unregistered cluster is not an error: an ARN is synthesized for it.
let cluster_arn = state
.clusters
.get(&cluster_name)
.map(|c| c.cluster_arn.clone())
.unwrap_or_else(|| state.cluster_arn(&cluster_name));
let (_, family, rev) = resolve_task_definition_ref(td_ref)?;
let revisions = state
.task_definitions
.get(&family)
.ok_or_else(|| task_definition_not_found(td_ref))?;
// Explicit revision wins; otherwise fall back to the latest active one.
let td = match rev {
Some(n) => revisions
.get(&n)
.ok_or_else(|| task_definition_not_found(td_ref))?,
None => latest_active_revision(revisions)
.ok_or_else(|| task_definition_not_found(td_ref))?,
};
if td.status != "ACTIVE" {
return Err(client_exception(format!(
"Task definition {} is not ACTIVE",
td.task_definition_arn
)));
}
// Copy what is needed out of the task definition so the borrow of
// `state.task_definitions` ends before `state` is mutated below.
let td_arn = td.task_definition_arn.clone();
let td_family = td.family.clone();
let td_revision = td.revision;
let td_cpu = td.cpu.clone();
let td_memory = td.memory.clone();
let td_task_role = td.task_role_arn.clone();
let td_exec_role = td.execution_role_arn.clone();
let td_containers = td.container_definitions.clone();
let mut spawned_tasks: Vec<String> = Vec::new();
let mut task_jsons: Vec<Value> = Vec::new();
for _ in 0..count {
// Task id is a v4 UUID with the dashes removed.
let task_id = uuid::Uuid::new_v4().to_string().replace('-', "");
let task_arn = state.task_arn(&cluster_name, &task_id);
// One container record per container definition; a definition without a
// string `name` is called "app", and `essential` defaults to true.
let containers: Vec<Container> = td_containers
.iter()
.map(|def| Container {
container_arn: format!(
"arn:aws:ecs:{}:{}:container/{}/{}/{}",
state.region,
state.account_id,
cluster_name,
task_id,
def.get("name").and_then(|v| v.as_str()).unwrap_or("app")
),
name: def
.get("name")
.and_then(|v| v.as_str())
.unwrap_or("app")
.to_string(),
image: def
.get("image")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string(),
task_arn: task_arn.clone(),
last_status: "PENDING".into(),
exit_code: None,
reason: None,
runtime_id: None,
essential: def
.get("essential")
.and_then(|v| v.as_bool())
.unwrap_or(true),
cpu: def
.get("cpu")
.and_then(|v| v.as_i64())
.map(|n| n.to_string()),
memory: def
.get("memory")
.and_then(|v| v.as_i64())
.map(|n| n.to_string()),
memory_reservation: def
.get("memoryReservation")
.and_then(|v| v.as_i64())
.map(|n| n.to_string()),
network_bindings: Vec::new(),
network_interfaces: Vec::new(),
health_status: Some("UNKNOWN".to_string()),
managed_agents: None,
})
.collect();
// Log settings come from the first container definition that has a
// string `name`, an `awslogs` log driver, an `options` object and an
// `awslogs-group` string; the region falls back to the state's region.
let awslogs = td_containers.iter().find_map(|def| {
let name = def.get("name").and_then(|v| v.as_str())?.to_string();
let log_cfg = def.get("logConfiguration")?;
if log_cfg.get("logDriver").and_then(|v| v.as_str()) != Some("awslogs") {
return None;
}
let opts = log_cfg.get("options").and_then(|v| v.as_object())?;
Some(AwsLogsConfig {
group: opts.get("awslogs-group").and_then(|v| v.as_str())?.into(),
stream_prefix: opts
.get("awslogs-stream-prefix")
.and_then(|v| v.as_str())
.map(String::from),
region: opts
.get("awslogs-region")
.and_then(|v| v.as_str())
.unwrap_or(&state.region)
.to_string(),
container_name: name,
})
});
let task = Task {
task_arn: task_arn.clone(),
task_id: task_id.clone(),
cluster_arn: cluster_arn.clone(),
cluster_name: cluster_name.clone(),
task_definition_arn: td_arn.clone(),
family: td_family.clone(),
revision: td_revision,
last_status: "PROVISIONING".into(),
desired_status: "RUNNING".into(),
launch_type: launch_type.clone(),
platform_version: Some("1.4.0".into()),
// String-valued `overrides.cpu` / `overrides.memory` take precedence
// over the task definition's values.
cpu: body
.get("overrides")
.and_then(|v| v.get("cpu"))
.and_then(|v| v.as_str())
.map(String::from)
.or_else(|| td_cpu.clone()),
memory: body
.get("overrides")
.and_then(|v| v.get("memory"))
.and_then(|v| v.as_str())
.map(String::from)
.or_else(|| td_memory.clone()),
containers,
overrides: body.get("overrides").cloned().unwrap_or_else(|| json!({})),
started_by: started_by.clone(),
group: group.clone(),
connectivity: "CONNECTING".into(),
stop_code: None,
stopped_reason: None,
created_at: Utc::now(),
started_at: None,
stopping_at: None,
stopped_at: None,
pull_started_at: None,
pull_stopped_at: None,
connectivity_at: None,
started_by_ref_id: None,
execution_role_arn: td_exec_role.clone(),
task_role_arn: td_task_role.clone(),
tags: tags.clone(),
awslogs,
captured_logs: String::new(),
protection: None,
};
state.tasks.insert(task_id.clone(), task.clone());
// The pending counter is only maintained for clusters that are registered.
if let Some(cluster) = state.clusters.get_mut(&cluster_name) {
cluster.pending_tasks_count += 1;
}
// The stored copy advances to PENDING; the response below is built from
// the local `task`, which still says PROVISIONING.
if let Some(t) = state.tasks.get_mut(&task_id) {
t.last_status = "PENDING".into();
}
task_jsons.push(task_to_json(&task));
spawned_tasks.push(task_id.clone());
}
// Release the write lock before calling into the runtime, which is handed
// its own clone of the shared state.
drop(accounts);
if let Some(rt) = runtime {
for id in &spawned_tasks {
rt.clone()
.run_task(self.state.clone(), id.clone(), account.clone());
}
} else {
// No runtime configured: mark every task just created as failed to start
// and undo the pending-count increments made above.
let mut accounts = self.state.write();
if let Some(state) = accounts.get_mut(&account) {
let mut cluster_drains: Vec<String> = Vec::new();
for id in &spawned_tasks {
if let Some(t) = state.tasks.get_mut(id) {
t.last_status = "STOPPED".into();
t.desired_status = "STOPPED".into();
t.stop_code = Some("TaskFailedToStart".into());
t.stopped_reason = Some(
"No container runtime available (docker/podman not installed)".into(),
);
t.stopped_at = Some(Utc::now());
for c in t.containers.iter_mut() {
c.last_status = "STOPPED".into();
}
cluster_drains.push(t.cluster_name.clone());
}
}
for name in cluster_drains {
if let Some(cluster) = state.clusters.get_mut(&name) {
// Guarded so the counter never goes below zero.
if cluster.pending_tasks_count > 0 {
cluster.pending_tasks_count -= 1;
}
}
}
}
}
Ok(AwsResponse::ok_json(json!({
"tasks": task_jsons,
"failures": [],
})))
}
fn start_task(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
self.run_task(request)
}
async fn stop_task(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let task_ref = req_str(&body, "task")?;
let reason = opt_str(&body, "reason")
.unwrap_or("UserInitiated")
.to_string();
let cluster_ref = opt_str(&body, "cluster");
let _cluster_name = EcsState::resolve_cluster_name(cluster_ref);
let (task_id, account, task_snapshot) = {
let account = request.account_id.clone();
let mut accounts = self.state.write();
let state = accounts
.get_mut(&account)
.ok_or_else(|| task_not_found(task_ref))?;
let task_id = resolve_task_id(state, task_ref)?;
let task = state
.tasks
.get_mut(&task_id)
.ok_or_else(|| task_not_found(task_ref))?;
task.desired_status = "STOPPED".into();
task.stopping_at = Some(Utc::now());
task.stopped_reason = Some(reason.clone());
task.stop_code = Some("UserInitiated".into());
(task_id, account, task.clone())
};
if let Some(rt) = &self.runtime {
rt.stop_task(&task_id, &reason).await;
}
let _ = account;
Ok(AwsResponse::ok_json(json!({
"task": task_to_json(&task_snapshot),
})))
}
fn describe_tasks(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let refs: Vec<String> = body
.get("tasks")
.and_then(|v| v.as_array())
.map(|arr| {
arr.iter()
.filter_map(|v| v.as_str().map(String::from))
.collect()
})
.unwrap_or_default();
let include_tags = body
.get("include")
.and_then(|v| v.as_array())
.map(|arr| arr.iter().any(|v| v.as_str() == Some("TAGS")))
.unwrap_or(false);
let account = request.account_id.clone();
let accounts = self.state.read();
let Some(state) = accounts.get(&account) else {
return Ok(AwsResponse::ok_json(json!({
"tasks": [],
"failures": refs.iter().map(|r| json!({"arn": r, "reason": "MISSING"})).collect::<Vec<_>>(),
})));
};
let mut found = Vec::new();
let mut failures = Vec::new();
for input in &refs {
let task_id = task_id_from_ref(input);
match state.tasks.get(&task_id) {
Some(t) => {
let mut v = task_to_json(t);
if include_tags {
v.as_object_mut()
.unwrap()
.insert("tags".into(), tags_json(&t.tags));
}
found.push(v);
}
None => {
failures.push(json!({
"arn": input,
"reason": "MISSING",
}));
}
}
}
Ok(AwsResponse::ok_json(json!({
"tasks": found,
"failures": failures,
})))
}
fn list_tasks(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let cluster_ref = opt_str(&body, "cluster");
let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
let family = opt_str(&body, "family");
let status_filter = opt_str(&body, "desiredStatus");
let started_by = opt_str(&body, "startedBy");
let max_results = body
.get("maxResults")
.and_then(|v| v.as_i64())
.filter(|n| (1..=100).contains(n))
.map(|n| n as usize)
.unwrap_or(100);
let next_token = opt_str(&body, "nextToken").unwrap_or("");
let account = request.account_id.clone();
let accounts = self.state.read();
let mut arns: Vec<String> = match accounts.get(&account) {
Some(state) => state
.tasks
.values()
.filter(|t| t.cluster_name == cluster_name)
.filter(|t| family.is_none_or(|f| t.family == f))
.filter(|t| status_filter.is_none_or(|s| t.desired_status == s))
.filter(|t| started_by.is_none_or(|s| t.started_by.as_deref() == Some(s)))
.map(|t| t.task_arn.clone())
.collect(),
None => Vec::new(),
};
arns.sort();
let start = next_token.parse::<usize>().unwrap_or(0).min(arns.len());
let end = (start + max_results).min(arns.len());
let page = arns[start..end].to_vec();
let mut out = json!({"taskArns": page});
if end < arns.len() {
out.as_object_mut()
.unwrap()
.insert("nextToken".into(), json!(end.to_string()));
}
Ok(AwsResponse::ok_json(out))
}
}
/// Builds the HTTP 400 `ClientException` error returned when `task_ref`
/// does not identify a known task.
fn task_not_found(task_ref: &str) -> AwsServiceError {
    let message = format!("Task not found: {task_ref}");
    AwsServiceError::aws_error(StatusCode::BAD_REQUEST, "ClientException", message)
}
/// Extracts the task id from a task reference: everything after the last
/// `/`, or the whole input when it contains no `/`.
///
/// A reference ending in `/` yields an empty id.
fn task_id_from_ref(input: &str) -> String {
    let id = match input.rfind('/') {
        // `/` is a single byte, so `pos + 1` is always a valid char boundary.
        Some(pos) => &input[pos + 1..],
        None => input,
    };
    id.to_owned()
}
/// Resolves a task reference (id or ARN) to the id of a task present in
/// `state`.
///
/// # Errors
/// Returns `task_not_found` when no task is stored under the extracted id.
fn resolve_task_id(state: &EcsState, task_ref: &str) -> Result<String, AwsServiceError> {
    let candidate = task_id_from_ref(task_ref);
    if !state.tasks.contains_key(&candidate) {
        return Err(task_not_found(task_ref));
    }
    Ok(candidate)
}
fn task_to_json(task: &Task) -> Value {
let mut map = serde_json::Map::new();
map.insert("taskArn".into(), json!(task.task_arn));
map.insert("clusterArn".into(), json!(task.cluster_arn));
map.insert("taskDefinitionArn".into(), json!(task.task_definition_arn));
map.insert("lastStatus".into(), json!(task.last_status));
map.insert("desiredStatus".into(), json!(task.desired_status));
map.insert("launchType".into(), json!(task.launch_type));
if let Some(ref v) = task.platform_version {
map.insert("platformVersion".into(), json!(v));
}
if let Some(ref v) = task.cpu {
map.insert("cpu".into(), json!(v));
}
if let Some(ref v) = task.memory {
map.insert("memory".into(), json!(v));
}
map.insert(
"containers".into(),
Value::Array(task.containers.iter().map(container_to_json).collect()),
);
map.insert("overrides".into(), task.overrides.clone());
if let Some(ref v) = task.started_by {
map.insert("startedBy".into(), json!(v));
}
if let Some(ref v) = task.group {
map.insert("group".into(), json!(v));
}
map.insert("connectivity".into(), json!(task.connectivity));
if let Some(ref v) = task.stop_code {
map.insert("stopCode".into(), json!(v));
}
if let Some(ref v) = task.stopped_reason {
map.insert("stoppedReason".into(), json!(v));
}
if let Some(ref v) = task.task_role_arn {
map.insert("taskRoleArn".into(), json!(v));
}
if let Some(ref v) = task.execution_role_arn {
map.insert("executionRoleArn".into(), json!(v));
}
map.insert("createdAt".into(), json!(task.created_at.timestamp()));
if let Some(ts) = task.started_at {
map.insert("startedAt".into(), json!(ts.timestamp()));
}
if let Some(ts) = task.stopping_at {
map.insert("stoppingAt".into(), json!(ts.timestamp()));
}
if let Some(ts) = task.stopped_at {
map.insert("stoppedAt".into(), json!(ts.timestamp()));
}
if let Some(ts) = task.pull_started_at {
map.insert("pullStartedAt".into(), json!(ts.timestamp()));
}
if let Some(ts) = task.pull_stopped_at {
map.insert("pullStoppedAt".into(), json!(ts.timestamp()));
}
if let Some(ts) = task.connectivity_at {
map.insert("connectivityAt".into(), json!(ts.timestamp()));
}
Value::Object(map)
}
fn container_to_json(container: &Container) -> Value {
let mut map = serde_json::Map::new();
map.insert("containerArn".into(), json!(container.container_arn));
map.insert("taskArn".into(), json!(container.task_arn));
map.insert("name".into(), json!(container.name));
map.insert("image".into(), json!(container.image));
map.insert("lastStatus".into(), json!(container.last_status));
map.insert("essential".into(), json!(container.essential));
if let Some(code) = container.exit_code {
map.insert("exitCode".into(), json!(code));
}
if let Some(ref r) = container.reason {
map.insert("reason".into(), json!(r));
}
if let Some(ref id) = container.runtime_id {
map.insert("runtimeId".into(), json!(id));
}
if let Some(ref v) = container.cpu {
map.insert("cpu".into(), json!(v));
}
if let Some(ref v) = container.memory {
map.insert("memory".into(), json!(v));
}
if let Some(ref v) = container.memory_reservation {
map.insert("memoryReservation".into(), json!(v));
}
map.insert("networkBindings".into(), json!(container.network_bindings));
map.insert(
"networkInterfaces".into(),
json!(container.network_interfaces),
);
if let Some(ref v) = container.health_status {
map.insert("healthStatus".into(), json!(v));
}
Value::Object(map)
}
impl EcsService {
/// Handles `CreateService`: registers a service with one `PRIMARY`
/// deployment, spawns its tasks via `spawn_service_tasks`, and returns
/// `{"service": ...}`.
///
/// Request fields read from the JSON body:
/// - `serviceName` (required, checked by `validate_service_name`).
/// - `taskDefinition` (required).
/// - `cluster` (optional).
/// - `desiredCount` (absent or negative means 1).
/// - `launchType` (default `"FARGATE"`), `schedulingStrategy` (default
///   `"REPLICA"`), `deploymentController.type` (default `"ECS"`).
/// - `deploymentConfiguration.{minimumHealthyPercent, maximumPercent,
///   deploymentCircuitBreaker}`.
/// - `role`, `loadBalancers`, `serviceRegistries`, `placementConstraints`,
///   `placementStrategy`, `networkConfiguration`, and tags (via
///   `parse_tags`).
///
/// # Errors
/// - `req_str` / `validate_service_name` errors for the required fields.
/// - `resolve_task_definition_ref` error for a malformed reference.
/// - `task_definition_not_found` when the family or revision is missing, or
///   no revision is given and `latest_active_revision` finds none.
/// - `service_already_exists` when a service with the same key exists and
///   its status is not `INACTIVE`.
fn create_service(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let service_name = req_str(&body, "serviceName")?.to_string();
validate_service_name(&service_name)?;
let td_ref = req_str(&body, "taskDefinition")?;
let cluster_ref = opt_str(&body, "cluster");
let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
// Absent, non-integer, or negative counts become 1; the value is then
// narrowed to i32 with `as`.
let desired_count = body
.get("desiredCount")
.and_then(|v| v.as_i64())
.filter(|n| *n >= 0)
.unwrap_or(1) as i32;
let launch_type = opt_str(&body, "launchType")
.unwrap_or("FARGATE")
.to_string();
let scheduling = opt_str(&body, "schedulingStrategy")
.unwrap_or("REPLICA")
.to_string();
let deployment_controller = body
.get("deploymentController")
.and_then(|v| v.get("type"))
.and_then(|v| v.as_str())
.unwrap_or("ECS")
.to_string();
let deployment_config = body.get("deploymentConfiguration");
let min_healthy = deployment_config
.and_then(|d| d.get("minimumHealthyPercent"))
.and_then(|v| v.as_i64())
.map(|n| n as i32);
let max_percent = deployment_config
.and_then(|d| d.get("maximumPercent"))
.and_then(|v| v.as_i64())
.map(|n| n as i32);
// A circuit breaker config is recorded only when the request supplies one;
// its missing flags default to false.
let circuit = deployment_config.and_then(|d| d.get("deploymentCircuitBreaker"));
let circuit_breaker = circuit.map(|c| CircuitBreakerConfig {
enable: c.get("enable").and_then(|v| v.as_bool()).unwrap_or(false),
rollback: c.get("rollback").and_then(|v| v.as_bool()).unwrap_or(false),
});
let tags = parse_tags(&body);
let role_arn = opt_str(&body, "role").map(String::from);
// The following list-valued fields are stored as raw JSON, defaulting to
// empty when absent or not arrays.
let load_balancers: Vec<Value> = body
.get("loadBalancers")
.and_then(|v| v.as_array())
.cloned()
.unwrap_or_default();
let service_registries: Vec<Value> = body
.get("serviceRegistries")
.and_then(|v| v.as_array())
.cloned()
.unwrap_or_default();
let placement_constraints: Vec<Value> = body
.get("placementConstraints")
.and_then(|v| v.as_array())
.cloned()
.unwrap_or_default();
let placement_strategy: Vec<Value> = body
.get("placementStrategy")
.and_then(|v| v.as_array())
.cloned()
.unwrap_or_default();
let network_configuration = body.get("networkConfiguration").cloned();
let runtime = self.runtime.clone();
let account = request.account_id.clone();
// Creator identity: the request principal's ARN, else the account root ARN.
let principal_arn = request
.principal
.as_ref()
.map(|p| p.arn.clone())
.unwrap_or_else(|| format!("arn:aws:iam::{}:root", request.account_id));
// All state mutation happens inside this block so the write lock is
// released before the runtime is invoked below.
let (service_json, spawn_task_ids) = {
let mut accounts = self.state.write();
let state = accounts.get_or_create(&account);
let (_, family, rev) = resolve_task_definition_ref(td_ref)?;
let revisions = state
.task_definitions
.get(&family)
.ok_or_else(|| task_definition_not_found(td_ref))?;
// Explicit revision wins; otherwise fall back to the latest active one.
let td = match rev {
Some(n) => revisions
.get(&n)
.ok_or_else(|| task_definition_not_found(td_ref))?,
None => latest_active_revision(revisions)
.ok_or_else(|| task_definition_not_found(td_ref))?,
};
let td_arn = td.task_definition_arn.clone();
let td_family = td.family.clone();
let td_revision = td.revision;
// An unregistered cluster is not an error: an ARN is synthesized for it.
let cluster_arn = state
.clusters
.get(&cluster_name)
.map(|c| c.cluster_arn.clone())
.unwrap_or_else(|| state.cluster_arn(&cluster_name));
let service_arn = state.service_arn(&cluster_name, &service_name);
let key = EcsState::service_key(&cluster_name, &service_name);
// Only an INACTIVE service under the same key may be replaced.
if let Some(existing) = state.services.get(&key) {
if existing.status != "INACTIVE" {
return Err(service_already_exists(&service_name));
}
}
let deployment = Deployment {
// Deployment id: "ecs-svc/" followed by the low 64 bits of a v4 UUID,
// formatted as a decimal number.
deployment_id: format!(
"ecs-svc/{}",
uuid::Uuid::new_v4().as_u128() & 0xffff_ffff_ffff_ffff
),
status: "PRIMARY".into(),
task_definition_arn: td_arn.clone(),
desired_count,
pending_count: 0,
running_count: 0,
failed_tasks: 0,
created_at: Utc::now(),
updated_at: Utc::now(),
launch_type: launch_type.clone(),
rollout_state: "IN_PROGRESS".into(),
rollout_state_reason: Some("ECS deployment in progress.".into()),
};
let service = Service {
service_name: service_name.clone(),
service_arn: service_arn.clone(),
cluster_name: cluster_name.clone(),
cluster_arn: cluster_arn.clone(),
task_definition_arn: td_arn,
family: td_family,
revision: td_revision,
desired_count,
running_count: 0,
pending_count: 0,
launch_type: launch_type.clone(),
status: "ACTIVE".into(),
scheduling_strategy: scheduling,
deployment_controller,
minimum_healthy_percent: min_healthy,
maximum_percent: max_percent,
circuit_breaker,
deployments: vec![deployment],
load_balancers,
service_registries,
placement_constraints,
placement_strategy,
network_configuration,
tags: tags.clone(),
created_at: Utc::now(),
created_by: Some(principal_arn.clone()),
role_arn,
};
state.services.insert(key.clone(), service.clone());
// The service counter is only maintained for clusters that are registered.
if let Some(cluster) = state.clusters.get_mut(&cluster_name) {
cluster.active_services_count += 1;
}
state.push_event(crate::state::LifecycleEvent {
at: Utc::now(),
event_type: "ServiceCreated".into(),
task_arn: None,
cluster_arn: Some(cluster_arn),
last_status: Some("ACTIVE".into()),
detail: json!({"serviceArn": service_arn, "desiredCount": desired_count}),
});
let ids =
spawn_service_tasks(state, &service, desired_count, &principal_arn, &launch_type);
// Serialize the stored service (re-read after spawning) rather than the
// local `service` value — presumably so changes made by
// `spawn_service_tasks` show up in the response; confirm against it.
(service_to_json(state.services.get(&key).unwrap()), ids)
};
// With no runtime configured, the spawned task ids are left untouched here.
if let Some(rt) = runtime {
for id in spawn_task_ids {
rt.clone().run_task(self.state.clone(), id, account.clone());
}
}
Ok(AwsResponse::ok_json(json!({ "service": service_json })))
}
/// Handles the ECS `UpdateService` action.
///
/// Reads `service` (required), `cluster`, `desiredCount` and `taskDefinition`
/// from the JSON body, then under a single write lock:
///
/// 1. resolves the optional new task definition (explicit revision, or the
///    latest active one),
/// 2. applies the new desired count to the service and its `PRIMARY` deployment,
/// 3. when a task definition was supplied, demotes the current `PRIMARY`
///    deployment(s) to `ACTIVE` and inserts a fresh `PRIMARY` deployment,
/// 4. reconciles the non-stopped tasks tagged `ecs-svc/<service>` with the
///    desired count, producing a list of task ids to spawn and to stop,
/// 5. records a `ServiceUpdated` lifecycle event.
///
/// After the lock is released, and only when a runtime is configured, spawned
/// tasks are handed to the runtime and tasks to stop are stopped in background
/// tokio tasks while their `desired_status` is flipped to `STOPPED`.
///
/// # Errors
///
/// Returns the error from `req_str` when `service` is missing, a
/// service-not-found error when the account or service is unknown, and a
/// task-definition-not-found error when `taskDefinition` cannot be resolved.
fn update_service(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
    let body = request.json_body();
    let service_ref = req_str(&body, "service")?;
    let service_name = service_name_from_ref(service_ref);
    let cluster_ref = opt_str(&body, "cluster");
    let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
    let new_desired = body.get("desiredCount").and_then(|v| v.as_i64());
    let new_td_ref = opt_str(&body, "taskDefinition");
    let account = request.account_id.clone();
    let principal_arn = request
        .principal
        .as_ref()
        .map(|p| p.arn.clone())
        .unwrap_or_else(|| format!("arn:aws:iam::{}:root", request.account_id));
    let runtime = self.runtime.clone();
    let (service_json, spawn_ids, stop_ids) = {
        let mut accounts = self.state.write();
        let state = accounts
            .get_mut(&account)
            .ok_or_else(|| service_not_found(&service_name))?;
        let key = EcsState::service_key(&cluster_name, &service_name);
        if !state.services.contains_key(&key) {
            return Err(service_not_found(&service_name));
        }
        // Resolve the target task definition. Without an explicit reference the
        // service keeps its current family/revision and no deployment is started.
        let (new_td_arn, new_family, new_revision) = if let Some(td_ref) = new_td_ref {
            let (_, family, rev) = resolve_task_definition_ref(td_ref)?;
            let revisions = state
                .task_definitions
                .get(&family)
                .ok_or_else(|| task_definition_not_found(td_ref))?;
            let td = match rev {
                Some(n) => revisions
                    .get(&n)
                    .ok_or_else(|| task_definition_not_found(td_ref))?,
                None => latest_active_revision(revisions)
                    .ok_or_else(|| task_definition_not_found(td_ref))?,
            };
            (
                Some(td.task_definition_arn.clone()),
                td.family.clone(),
                td.revision,
            )
        } else {
            let svc = state.services.get(&key).unwrap();
            (None, svc.family.clone(), svc.revision)
        };
        let service_cluster_arn;
        let launch_type_clone;
        let effective_desired;
        let old_desired;
        let mut old_deployments_drained: Vec<String> = Vec::new();
        let mut new_deployment_triggered = false;
        {
            // Presence was verified above while holding the same write lock.
            let svc = state.services.get_mut(&key).unwrap();
            old_desired = svc.desired_count;
            service_cluster_arn = svc.cluster_arn.clone();
            launch_type_clone = svc.launch_type.clone();
            if let Some(n) = new_desired {
                // Clamp into the i32 range before narrowing: a plain `as i32`
                // would wrap large values (possibly to a negative count).
                let n = n.clamp(0, i64::from(i32::MAX)) as i32;
                svc.desired_count = n;
                if let Some(d) = svc.deployments.iter_mut().find(|d| d.status == "PRIMARY") {
                    d.desired_count = n;
                    d.updated_at = Utc::now();
                }
            }
            if let Some(arn) = new_td_arn {
                // Demote every current PRIMARY deployment and remember its id
                // for the lifecycle event.
                for d in svc.deployments.iter_mut() {
                    if d.status == "PRIMARY" {
                        d.status = "ACTIVE".into();
                        old_deployments_drained.push(d.deployment_id.clone());
                    }
                }
                svc.deployments.insert(
                    0,
                    Deployment {
                        deployment_id: format!(
                            "ecs-svc/{}",
                            uuid::Uuid::new_v4().as_u128() & 0xffff_ffff_ffff_ffff
                        ),
                        status: "PRIMARY".into(),
                        task_definition_arn: arn.clone(),
                        desired_count: svc.desired_count,
                        pending_count: 0,
                        running_count: 0,
                        failed_tasks: 0,
                        created_at: Utc::now(),
                        updated_at: Utc::now(),
                        launch_type: svc.launch_type.clone(),
                        rollout_state: "IN_PROGRESS".into(),
                        rollout_state_reason: Some("ECS deployment in progress.".into()),
                    },
                );
                svc.task_definition_arn = arn;
                svc.family = new_family;
                svc.revision = new_revision;
                new_deployment_triggered = true;
            }
            effective_desired = svc.desired_count;
        }
        let mut spawn: Vec<String> = Vec::new();
        let mut stop: Vec<String> = Vec::new();
        let service_tag = format!("ecs-svc/{}", service_name);
        // (task id, task definition ARN) for every non-stopped task of this service.
        let current_tasks: Vec<(String, String)> = state
            .tasks
            .iter()
            .filter(|(_, t)| {
                t.started_by.as_deref() == Some(service_tag.as_str())
                    && t.cluster_name == cluster_name
                    && t.last_status != "STOPPED"
            })
            .map(|(id, t)| (id.clone(), t.task_definition_arn.clone()))
            .collect();
        let current_count = current_tasks.len() as i32;
        // Plain scale up / scale down against the desired count.
        if effective_desired > current_count {
            let add = (effective_desired - current_count) as usize;
            let svc_snapshot = state.services.get(&key).unwrap().clone();
            let mut new_ids = spawn_service_tasks(
                state,
                &svc_snapshot,
                add as i32,
                &principal_arn,
                &launch_type_clone,
            );
            spawn.append(&mut new_ids);
        } else if effective_desired < current_count {
            let remove = (current_count - effective_desired) as usize;
            for (id, _) in current_tasks.iter().take(remove) {
                stop.push(id.clone());
            }
        }
        // Rolling to a new task definition: stop every task still on another
        // definition and top up with tasks on the new one.
        if new_deployment_triggered {
            let new_td_arn_match = state
                .services
                .get(&key)
                .unwrap()
                .task_definition_arn
                .clone();
            let kept_on_new_td: i32 = current_tasks
                .iter()
                .filter(|(id, t_arn)| *t_arn == new_td_arn_match && !stop.contains(id))
                .count() as i32;
            for (id, t_arn) in &current_tasks {
                if *t_arn != new_td_arn_match && !stop.contains(id) {
                    stop.push(id.clone());
                }
            }
            let already_spawned = spawn.len() as i32;
            let need = (effective_desired - kept_on_new_td - already_spawned).max(0);
            if need > 0 {
                let svc_snapshot = state.services.get(&key).unwrap().clone();
                let mut more = spawn_service_tasks(
                    state,
                    &svc_snapshot,
                    need,
                    &principal_arn,
                    &launch_type_clone,
                );
                spawn.append(&mut more);
            }
        }
        state.push_event(crate::state::LifecycleEvent {
            at: Utc::now(),
            event_type: "ServiceUpdated".into(),
            task_arn: None,
            cluster_arn: Some(service_cluster_arn),
            last_status: Some("ACTIVE".into()),
            detail: json!({
                "serviceArn": state.services.get(&key).unwrap().service_arn,
                "desiredCount": effective_desired,
                "previousDesiredCount": old_desired,
                "newDeployment": new_deployment_triggered,
                "drainedDeployments": old_deployments_drained,
            }),
        });
        let svc = state.services.get(&key).unwrap();
        (service_to_json(svc), spawn, stop)
    };
    // Runtime work happens after the state lock above has been released.
    if let Some(rt) = runtime {
        for id in spawn_ids {
            rt.clone().run_task(self.state.clone(), id, account.clone());
        }
        for id in stop_ids {
            let rt2 = rt.clone();
            let id_clone = id.clone();
            // The actual stop is fire-and-forget; the desired status is
            // recorded immediately below.
            tokio::spawn(async move {
                rt2.stop_task(&id_clone, "ECS service scale-down").await;
            });
            let mut accounts = self.state.write();
            if let Some(state) = accounts.get_mut(&account) {
                if let Some(task) = state.tasks.get_mut(&id) {
                    task.desired_status = "STOPPED".into();
                    task.stopping_at = Some(Utc::now());
                }
            }
        }
    }
    Ok(AwsResponse::ok_json(json!({ "service": service_json })))
}
/// Handles the ECS `DeleteService` action.
///
/// The service is removed from state and returned in the response with
/// `desiredCount` 0 and status `DRAINING`. Unless `force` is true, a service
/// whose desired count is above zero is rejected. Every non-stopped task tagged
/// `ecs-svc/<service>` in the cluster is then stopped through the runtime (when
/// one is configured) and marked with desired status `STOPPED`.
///
/// # Errors
///
/// Returns the error from `req_str` when `service` is missing, a
/// service-not-found error for an unknown account or service, and a client
/// exception when the service is still scaled above zero without `force`.
async fn delete_service(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
    let body = request.json_body();
    let service_name = service_name_from_ref(req_str(&body, "service")?);
    let cluster_name = EcsState::resolve_cluster_name(opt_str(&body, "cluster"));
    let force = matches!(body.get("force").and_then(|v| v.as_bool()), Some(true));

    // All state mutation happens inside this scope so the write guard is
    // released before any `.await` below.
    let (snapshot, task_ids_to_stop) = {
        let mut accounts = self.state.write();
        let state = accounts
            .get_mut(&request.account_id)
            .ok_or_else(|| service_not_found(&service_name))?;
        let key = EcsState::service_key(&cluster_name, &service_name);

        let still_scaled = state
            .services
            .get(&key)
            .map(|svc| svc.desired_count > 0)
            .ok_or_else(|| service_not_found(&service_name))?;
        if still_scaled && !force {
            return Err(client_exception(
                "The service cannot be stopped while it is scaled above 0. \
                 Either set desiredCount to 0 first, or pass force=true.",
            ));
        }

        let service_tag = format!("ecs-svc/{}", service_name);
        let doomed: Vec<String> = state
            .tasks
            .iter()
            .filter(|(_, task)| {
                task.started_by.as_deref() == Some(service_tag.as_str())
                    && task.cluster_name == cluster_name
                    && task.last_status != "STOPPED"
            })
            .map(|(id, _)| id.clone())
            .collect();

        if let Some(cluster) = state.clusters.get_mut(&cluster_name) {
            if cluster.active_services_count > 0 {
                cluster.active_services_count -= 1;
            }
        }

        // Take ownership of the record and stamp the final values reported
        // back to the caller.
        let mut removed = state
            .services
            .remove(&key)
            .expect("service presence was checked under the same lock");
        removed.desired_count = 0;
        removed.status = "DRAINING".into();

        state.push_event(crate::state::LifecycleEvent {
            at: Utc::now(),
            event_type: "ServiceDeleted".into(),
            task_arn: None,
            cluster_arn: Some(removed.cluster_arn.clone()),
            last_status: Some("DRAINING".into()),
            detail: json!({"serviceArn": removed.service_arn}),
        });
        (removed, doomed)
    };

    if let Some(rt) = &self.runtime {
        for id in &task_ids_to_stop {
            rt.stop_task(id, "ECS service deletion").await;
            // The lock is re-acquired per task, after the await has completed.
            let mut accounts = self.state.write();
            let task = accounts
                .get_mut(&request.account_id)
                .and_then(|state| state.tasks.get_mut(id));
            if let Some(task) = task {
                task.desired_status = "STOPPED".into();
                task.stopping_at = Some(Utc::now());
            }
        }
    }

    Ok(AwsResponse::ok_json(
        json!({ "service": service_to_json(&snapshot) }),
    ))
}
/// Handles the ECS `DescribeServices` action.
///
/// Each string in the `services` array is reduced to a service name and looked
/// up in the resolved cluster. Hits are rendered with running/pending counts
/// recomputed from the task table; misses (including every reference when the
/// account has no state) are reported as `MISSING` failures.
fn describe_services(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
    let body = request.json_body();
    let cluster_name = EcsState::resolve_cluster_name(opt_str(&body, "cluster"));
    let refs: Vec<String> = body
        .get("services")
        .and_then(|v| v.as_array())
        .into_iter()
        .flatten()
        .filter_map(|v| v.as_str().map(String::from))
        .collect();

    let accounts = self.state.read();
    let state = accounts.get(&request.account_id);

    let mut found = Vec::new();
    let mut failures = Vec::new();
    for r in &refs {
        let name = service_name_from_ref(r);
        let hit = state.and_then(|st| {
            let key = EcsState::service_key(&cluster_name, &name);
            st.services.get(&key).map(|svc| (st, svc))
        });
        if let Some((st, svc)) = hit {
            let mut rendered = service_to_json(svc);
            recompute_service_counts(st, &name, &cluster_name, &mut rendered);
            found.push(rendered);
        } else {
            failures.push(json!({"arn": r, "reason": "MISSING"}));
        }
    }

    Ok(AwsResponse::ok_json(json!({
        "services": found,
        "failures": failures,
    })))
}
/// Handles the ECS `ListServices` action.
///
/// Returns the sorted ARNs of the services in the resolved cluster, optionally
/// filtered by `launchType` and `schedulingStrategy`. Paging uses `maxResults`
/// (1..=100, otherwise 100) and a `nextToken` that is the decimal offset into
/// the sorted list; an unparsable token restarts from offset 0.
fn list_services(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
    let body = request.json_body();
    let cluster_name = EcsState::resolve_cluster_name(opt_str(&body, "cluster"));
    let launch_type = opt_str(&body, "launchType");
    let scheduling = opt_str(&body, "schedulingStrategy");
    let max_results = match body.get("maxResults").and_then(|v| v.as_i64()) {
        Some(n) if (1..=100).contains(&n) => n as usize,
        _ => 100,
    };
    let next_token = opt_str(&body, "nextToken").unwrap_or("");

    let accounts = self.state.read();
    let mut arns: Vec<String> = accounts
        .get(&request.account_id)
        .map(|state| {
            state
                .services
                .values()
                .filter(|s| {
                    s.cluster_name == cluster_name
                        && launch_type.is_none_or(|lt| s.launch_type == lt)
                        && scheduling.is_none_or(|sc| s.scheduling_strategy == sc)
                })
                .map(|s| s.service_arn.clone())
                .collect()
        })
        .unwrap_or_default();
    arns.sort();

    let total = arns.len();
    let start = next_token.parse::<usize>().unwrap_or(0).min(total);
    let end = (start + max_results).min(total);

    let mut out = json!({"serviceArns": &arns[start..end]});
    if end < total {
        // More entries remain: hand back the offset of the next page.
        out["nextToken"] = json!(end.to_string());
    }
    Ok(AwsResponse::ok_json(out))
}
/// Handles the ECS `ListServicesByNamespace` action.
///
/// A service matches when any of its service-registry entries has a string
/// `registryArn` that contains the requested `namespace` as a substring.
/// Matching service ARNs are returned sorted.
///
/// # Errors
///
/// Returns the error from `req_str` when `namespace` is missing.
fn list_services_by_namespace(
    &self,
    request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
    let body = request.json_body();
    let namespace = req_str(&body, "namespace")?.to_string();

    let accounts = self.state.read();
    let mut arns: Vec<String> = accounts
        .get(&request.account_id)
        .map(|state| {
            state
                .services
                .values()
                .filter(|svc| {
                    svc.service_registries.iter().any(|registry| {
                        match registry.get("registryArn").and_then(|v| v.as_str()) {
                            Some(arn) => arn.contains(namespace.as_str()),
                            None => false,
                        }
                    })
                })
                .map(|svc| svc.service_arn.clone())
                .collect()
        })
        .unwrap_or_default();
    arns.sort();

    Ok(AwsResponse::ok_json(json!({"serviceArns": arns})))
}
}
/// Validates a service name: 1 to 255 bytes long and made up solely of ASCII
/// letters, ASCII digits, `_` and `-`.
///
/// # Errors
///
/// Returns an invalid-parameter error describing the first violated rule
/// (length is checked before the character set).
fn validate_service_name(name: &str) -> Result<(), AwsServiceError> {
    if !(1..=255).contains(&name.len()) {
        return Err(invalid_parameter("Service name must be 1-255 characters"));
    }
    let allowed = |c: char| matches!(c, 'a'..='z' | 'A'..='Z' | '0'..='9' | '_' | '-');
    if name.chars().any(|c| !allowed(c)) {
        return Err(invalid_parameter(
            "Service name may only contain letters, numbers, hyphens, and underscores",
        ));
    }
    Ok(())
}
/// Extracts the service name from a reference: everything after the last `/`,
/// or the whole input when it contains no `/`.
fn service_name_from_ref(input: &str) -> String {
    match input.rfind('/') {
        // '/' is a single byte, so `idx + 1` is always a valid char boundary.
        Some(idx) => input[idx + 1..].to_string(),
        None => input.to_string(),
    }
}
/// Builds the HTTP 400 `ServiceNotFoundException` error for the given service name.
fn service_not_found(name: &str) -> AwsServiceError {
    let message = format!("The service could not be found: {name}");
    AwsServiceError::aws_error(StatusCode::BAD_REQUEST, "ServiceNotFoundException", message)
}
/// Builds the HTTP 400 error returned when a service with this name already exists.
///
/// NOTE(review): the error code is `ServiceNotActiveException` although the
/// message describes an already-exists condition — confirm this is intentional.
fn service_already_exists(name: &str) -> AwsServiceError {
    let message = format!("The service {name} already exists");
    AwsServiceError::aws_error(StatusCode::BAD_REQUEST, "ServiceNotActiveException", message)
}
/// Creates `count` new `PENDING` tasks for `service` and returns their task ids.
///
/// Each task is built from the task definition stored under the service's
/// family and revision: one `Container` per container definition, plus the
/// awslogs settings of the first container definition that declares a complete
/// `awslogs` log configuration. Tasks are tagged `ecs-svc/<service name>` in
/// `started_by`, inserted into `state.tasks`, and the cluster's pending-task
/// counter is bumped when the cluster exists.
///
/// Returns an empty vector when `count` is not positive or the task definition
/// cannot be found. `principal_arn` is accepted but not used.
fn spawn_service_tasks(
    state: &mut EcsState,
    service: &Service,
    count: i32,
    principal_arn: &str,
    launch_type: &str,
) -> Vec<String> {
    // Kept in the signature for callers; intentionally unused here.
    let _ = principal_arn;
    if count <= 0 {
        return Vec::new();
    }
    let Some(td) = state
        .task_definitions
        .get(&service.family)
        .and_then(|revisions| revisions.get(&service.revision))
    else {
        return Vec::new();
    };

    // Copy everything needed out of the task definition so `state` can be
    // mutated while the tasks are created.
    let container_defs = td.container_definitions.clone();
    let cpu = td.cpu.clone();
    let memory = td.memory.clone();
    let task_role = td.task_role_arn.clone();
    let exec_role = td.execution_role_arn.clone();

    let cluster_name = service.cluster_name.clone();
    let cluster_arn = service.cluster_arn.clone();
    let td_arn = service.task_definition_arn.clone();
    let family = service.family.clone();
    let revision = service.revision;
    let service_tag = format!("ecs-svc/{}", service.service_name);

    let mut ids = Vec::with_capacity(count as usize);
    for _ in 0..count {
        let task_id = uuid::Uuid::new_v4().to_string().replace('-', "");
        let task_arn = state.task_arn(&cluster_name, &task_id);

        let containers: Vec<Container> = container_defs
            .iter()
            .map(|def| {
                // Containers without a name fall back to "app".
                let container_name = def.get("name").and_then(|v| v.as_str()).unwrap_or("app");
                Container {
                    container_arn: format!(
                        "arn:aws:ecs:{}:{}:container/{}/{}/{}",
                        state.region, state.account_id, cluster_name, task_id, container_name
                    ),
                    name: container_name.to_string(),
                    image: def
                        .get("image")
                        .and_then(|v| v.as_str())
                        .unwrap_or("")
                        .to_string(),
                    task_arn: task_arn.clone(),
                    last_status: "PENDING".into(),
                    exit_code: None,
                    reason: None,
                    runtime_id: None,
                    essential: def
                        .get("essential")
                        .and_then(|v| v.as_bool())
                        .unwrap_or(true),
                    cpu: def
                        .get("cpu")
                        .and_then(|v| v.as_i64())
                        .map(|n| n.to_string()),
                    memory: def
                        .get("memory")
                        .and_then(|v| v.as_i64())
                        .map(|n| n.to_string()),
                    memory_reservation: def
                        .get("memoryReservation")
                        .and_then(|v| v.as_i64())
                        .map(|n| n.to_string()),
                    network_bindings: Vec::new(),
                    network_interfaces: Vec::new(),
                    health_status: Some("UNKNOWN".into()),
                    managed_agents: None,
                }
            })
            .collect();

        // First container definition with a usable awslogs configuration wins.
        let awslogs = container_defs.iter().find_map(|def| {
            let container_name = def.get("name").and_then(|v| v.as_str())?.to_string();
            let log_cfg = def.get("logConfiguration")?;
            let driver = log_cfg.get("logDriver").and_then(|v| v.as_str());
            if driver != Some("awslogs") {
                return None;
            }
            let opts = log_cfg.get("options").and_then(|v| v.as_object())?;
            let group = opts.get("awslogs-group").and_then(|v| v.as_str())?;
            Some(AwsLogsConfig {
                group: group.into(),
                stream_prefix: opts
                    .get("awslogs-stream-prefix")
                    .and_then(|v| v.as_str())
                    .map(String::from),
                region: opts
                    .get("awslogs-region")
                    .and_then(|v| v.as_str())
                    .unwrap_or(&state.region)
                    .to_string(),
                container_name,
            })
        });

        let task = Task {
            task_arn,
            task_id: task_id.clone(),
            cluster_arn: cluster_arn.clone(),
            cluster_name: cluster_name.clone(),
            task_definition_arn: td_arn.clone(),
            family: family.clone(),
            revision,
            last_status: "PENDING".into(),
            desired_status: "RUNNING".into(),
            launch_type: launch_type.into(),
            platform_version: Some("1.4.0".into()),
            cpu: cpu.clone(),
            memory: memory.clone(),
            containers,
            overrides: json!({}),
            started_by: Some(service_tag.clone()),
            group: Some(format!("service:{}", service.service_name)),
            connectivity: "CONNECTING".into(),
            stop_code: None,
            stopped_reason: None,
            created_at: Utc::now(),
            started_at: None,
            stopping_at: None,
            stopped_at: None,
            pull_started_at: None,
            pull_stopped_at: None,
            connectivity_at: None,
            started_by_ref_id: None,
            execution_role_arn: exec_role.clone(),
            task_role_arn: task_role.clone(),
            tags: Vec::new(),
            awslogs,
            captured_logs: String::new(),
            protection: None,
        };
        state.tasks.insert(task_id.clone(), task);
        if let Some(cluster) = state.clusters.get_mut(&cluster_name) {
            cluster.pending_tasks_count += 1;
        }
        ids.push(task_id);
    }
    ids
}
fn recompute_service_counts(
state: &EcsState,
service_name: &str,
cluster_name: &str,
service_json: &mut Value,
) {
let service_tag = format!("ecs-svc/{}", service_name);
let mut running = 0i32;
let mut pending = 0i32;
for t in state.tasks.values() {
if t.started_by.as_deref() == Some(service_tag.as_str()) && t.cluster_name == cluster_name {
match t.last_status.as_str() {
"RUNNING" => running += 1,
"PENDING" | "PROVISIONING" => pending += 1,
_ => {}
}
}
}
if let Some(map) = service_json.as_object_mut() {
map.insert("runningCount".into(), json!(running));
map.insert("pendingCount".into(), json!(pending));
}
}
fn service_to_json(svc: &Service) -> Value {
let mut map = serde_json::Map::new();
map.insert("serviceArn".into(), json!(svc.service_arn));
map.insert("serviceName".into(), json!(svc.service_name));
map.insert("clusterArn".into(), json!(svc.cluster_arn));
map.insert("status".into(), json!(svc.status));
map.insert("desiredCount".into(), json!(svc.desired_count));
map.insert("runningCount".into(), json!(svc.running_count));
map.insert("pendingCount".into(), json!(svc.pending_count));
map.insert("launchType".into(), json!(svc.launch_type));
map.insert("schedulingStrategy".into(), json!(svc.scheduling_strategy));
map.insert("taskDefinition".into(), json!(svc.task_definition_arn));
map.insert(
"deploymentController".into(),
json!({"type": svc.deployment_controller}),
);
let mut deployment_cfg = serde_json::Map::new();
if let Some(n) = svc.minimum_healthy_percent {
deployment_cfg.insert("minimumHealthyPercent".into(), json!(n));
}
if let Some(n) = svc.maximum_percent {
deployment_cfg.insert("maximumPercent".into(), json!(n));
}
if let Some(ref cb) = svc.circuit_breaker {
deployment_cfg.insert(
"deploymentCircuitBreaker".into(),
json!({"enable": cb.enable, "rollback": cb.rollback}),
);
}
if !deployment_cfg.is_empty() {
map.insert(
"deploymentConfiguration".into(),
Value::Object(deployment_cfg),
);
}
map.insert(
"deployments".into(),
Value::Array(svc.deployments.iter().map(deployment_to_json).collect()),
);
map.insert(
"loadBalancers".into(),
Value::Array(svc.load_balancers.clone()),
);
map.insert(
"serviceRegistries".into(),
Value::Array(svc.service_registries.clone()),
);
map.insert(
"placementConstraints".into(),
Value::Array(svc.placement_constraints.clone()),
);
map.insert(
"placementStrategy".into(),
Value::Array(svc.placement_strategy.clone()),
);
if let Some(ref v) = svc.network_configuration {
map.insert("networkConfiguration".into(), v.clone());
}
if let Some(ref v) = svc.role_arn {
map.insert("roleArn".into(), json!(v));
}
if let Some(ref v) = svc.created_by {
map.insert("createdBy".into(), json!(v));
}
map.insert("createdAt".into(), json!(svc.created_at.timestamp()));
Value::Object(map)
}
fn deployment_to_json(d: &Deployment) -> Value {
json!({
"id": d.deployment_id,
"status": d.status,
"taskDefinition": d.task_definition_arn,
"desiredCount": d.desired_count,
"pendingCount": d.pending_count,
"runningCount": d.running_count,
"failedTasks": d.failed_tasks,
"createdAt": d.created_at.timestamp(),
"updatedAt": d.updated_at.timestamp(),
"launchType": d.launch_type,
"rolloutState": d.rollout_state,
"rolloutStateReason": d.rollout_state_reason,
})
}
impl EcsService {
/// Handles the ECS `RegisterContainerInstance` action.
///
/// Creates an `ACTIVE` container instance in the resolved cluster under a fresh
/// UUID. The EC2 instance id is taken from `instanceId` inside the JSON-encoded
/// `instanceIdentityDocument`, when that parses. `totalResources` seeds both the
/// registered and the remaining resources. Attribute entries without a string
/// `name` are skipped. The cluster's registered-instance counter is incremented
/// when the cluster exists; account state is created on demand.
fn register_container_instance(
    &self,
    request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
    let body = request.json_body();
    let cluster_name = EcsState::resolve_cluster_name(opt_str(&body, "cluster"));
    let ec2_id = opt_str(&body, "instanceIdentityDocument")
        .and_then(|raw| serde_json::from_str::<Value>(raw).ok())
        .and_then(|doc| {
            doc.get("instanceId")
                .and_then(|x| x.as_str())
                .map(String::from)
        });
    let tags = parse_tags(&body);
    // Parsed once; used for both the registered and the remaining resources.
    let total_resources = body
        .get("totalResources")
        .and_then(|v| v.as_array())
        .cloned()
        .unwrap_or_default();

    let mut accounts = self.state.write();
    let state = accounts.get_or_create(&request.account_id);

    // Prefer the stored cluster's ARN; otherwise derive one from the name.
    let cluster_arn = match state.clusters.get(&cluster_name) {
        Some(cluster) => cluster.cluster_arn.clone(),
        None => state.cluster_arn(&cluster_name),
    };
    let instance_id = uuid::Uuid::new_v4().to_string();
    let ci_arn = state.container_instance_arn(&cluster_name, &instance_id);
    let key = format!("{}/{}", cluster_name, instance_id);

    let ci = ContainerInstance {
        container_instance_arn: ci_arn.clone(),
        ec2_instance_id: ec2_id,
        cluster_name: cluster_name.clone(),
        cluster_arn,
        status: "ACTIVE".into(),
        version: 1,
        version_info: body.get("versionInfo").cloned(),
        agent_connected: true,
        agent_update_status: None,
        remaining_resources: total_resources.clone(),
        registered_resources: total_resources,
        running_tasks_count: 0,
        pending_tasks_count: 0,
        registered_at: Utc::now(),
        attributes: body
            .get("attributes")
            .and_then(|v| v.as_array())
            .into_iter()
            .flatten()
            .filter_map(|a| {
                let name = a.get("name").and_then(|v| v.as_str())?;
                Some(AttributeRef {
                    name: name.to_string(),
                    value: a.get("value").and_then(|v| v.as_str()).map(String::from),
                    target_type: a
                        .get("targetType")
                        .and_then(|v| v.as_str())
                        .map(String::from),
                    target_id: a
                        .get("targetId")
                        .and_then(|v| v.as_str())
                        .map(String::from),
                })
            })
            .collect(),
        tags,
        capacity_provider_name: None,
        health_status: None,
    };

    state.container_instances.insert(key, ci.clone());
    if let Some(cluster) = state.clusters.get_mut(&cluster_name) {
        cluster.registered_container_instances_count += 1;
    }

    Ok(AwsResponse::ok_json(json!({
        "containerInstance": container_instance_to_json(&ci),
    })))
}
/// Handles the ECS `DeregisterContainerInstance` action.
///
/// Removes the container instance from the resolved cluster, returns it with
/// status `INACTIVE`, and decrements the cluster's registered-instance counter
/// (never below zero).
///
/// # Errors
///
/// Returns the error from `req_str` when `containerInstance` is missing and a
/// container-instance-not-found error for an unknown account or instance.
fn deregister_container_instance(
    &self,
    request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
    let body = request.json_body();
    let ci_ref = req_str(&body, "containerInstance")?.to_string();
    let cluster_name = EcsState::resolve_cluster_name(opt_str(&body, "cluster"));
    let key = format!(
        "{}/{}",
        cluster_name,
        container_instance_id_from_ref(&ci_ref)
    );

    let mut accounts = self.state.write();
    let Some(state) = accounts.get_mut(&request.account_id) else {
        return Err(container_instance_not_found(&ci_ref));
    };
    let Some(mut ci) = state.container_instances.remove(&key) else {
        return Err(container_instance_not_found(&ci_ref));
    };
    ci.status = "INACTIVE".into();

    if let Some(cluster) = state.clusters.get_mut(&cluster_name) {
        if cluster.registered_container_instances_count > 0 {
            cluster.registered_container_instances_count -= 1;
        }
    }

    Ok(AwsResponse::ok_json(json!({
        "containerInstance": container_instance_to_json(&ci),
    })))
}
/// Handles the ECS `DescribeContainerInstances` action.
///
/// Every string in `containerInstances` is reduced to an instance id and looked
/// up in the resolved cluster. Unknown references — including all of them when
/// the account has no state — are reported as `MISSING` failures.
fn describe_container_instances(
    &self,
    request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
    let body = request.json_body();
    let cluster_name = EcsState::resolve_cluster_name(opt_str(&body, "cluster"));
    let refs: Vec<String> = body
        .get("containerInstances")
        .and_then(|v| v.as_array())
        .into_iter()
        .flatten()
        .filter_map(|v| v.as_str().map(String::from))
        .collect();

    let accounts = self.state.read();
    let state = accounts.get(&request.account_id);

    let mut found = Vec::new();
    let mut failures = Vec::new();
    for r in &refs {
        let hit = state.and_then(|st| {
            let id = container_instance_id_from_ref(r);
            let key = format!("{}/{}", cluster_name, id);
            st.container_instances.get(&key)
        });
        match hit {
            Some(ci) => found.push(container_instance_to_json(ci)),
            None => failures.push(json!({"arn": r, "reason": "MISSING"})),
        }
    }

    Ok(AwsResponse::ok_json(json!({
        "containerInstances": found,
        "failures": failures,
    })))
}
/// Handles the ECS `ListContainerInstances` action.
///
/// Returns the sorted ARNs of the container instances in the resolved cluster,
/// optionally filtered by `status`. Paging uses `maxResults` (1..=100, otherwise
/// 100) and a `nextToken` that is the decimal offset into the sorted list; an
/// unparsable token restarts from offset 0.
fn list_container_instances(
    &self,
    request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
    let body = request.json_body();
    let cluster_name = EcsState::resolve_cluster_name(opt_str(&body, "cluster"));
    let status_filter = opt_str(&body, "status");
    let max_results = match body.get("maxResults").and_then(|v| v.as_i64()) {
        Some(n) if (1..=100).contains(&n) => n as usize,
        _ => 100,
    };
    let next_token = opt_str(&body, "nextToken").unwrap_or("");

    let accounts = self.state.read();
    let mut arns: Vec<String> = accounts
        .get(&request.account_id)
        .map(|state| {
            state
                .container_instances
                .values()
                .filter(|ci| {
                    ci.cluster_name == cluster_name
                        && status_filter.is_none_or(|s| ci.status == s)
                })
                .map(|ci| ci.container_instance_arn.clone())
                .collect()
        })
        .unwrap_or_default();
    arns.sort();

    let total = arns.len();
    let start = next_token.parse::<usize>().unwrap_or(0).min(total);
    let end = (start + max_results).min(total);

    let mut out = json!({"containerInstanceArns": &arns[start..end]});
    if end < total {
        // More entries remain: hand back the offset of the next page.
        out["nextToken"] = json!(end.to_string());
    }
    Ok(AwsResponse::ok_json(out))
}
/// Handles the ECS `UpdateContainerAgent` action by marking the instance's
/// agent update status as `UPDATED` and returning the instance.
///
/// # Errors
///
/// Returns the error from `req_str` when `containerInstance` is missing and a
/// container-instance-not-found error for an unknown account or instance.
fn update_container_agent(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
    let body = request.json_body();
    let ci_ref = req_str(&body, "containerInstance")?.to_string();
    let cluster_name = EcsState::resolve_cluster_name(opt_str(&body, "cluster"));
    let key = format!(
        "{}/{}",
        cluster_name,
        container_instance_id_from_ref(&ci_ref)
    );

    let mut accounts = self.state.write();
    // A missing account and a missing instance yield the same error.
    let ci = accounts
        .get_mut(&request.account_id)
        .and_then(|state| state.container_instances.get_mut(&key))
        .ok_or_else(|| container_instance_not_found(&ci_ref))?;
    ci.agent_update_status = Some("UPDATED".into());

    Ok(AwsResponse::ok_json(json!({
        "containerInstance": container_instance_to_json(ci),
    })))
}
/// Handles the ECS `UpdateContainerInstancesState` action.
///
/// Sets the status of every referenced container instance in the resolved
/// cluster to the requested `status` (stored as given) and returns the updated
/// instances; unknown references are reported as `MISSING` failures.
///
/// # Errors
///
/// Returns the error from `req_str` when `status` is missing and a client
/// exception when the account has no state.
fn update_container_instances_state(
    &self,
    request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
    let body = request.json_body();
    let status = req_str(&body, "status")?.to_string();
    let cluster_name = EcsState::resolve_cluster_name(opt_str(&body, "cluster"));
    let refs: Vec<String> = body
        .get("containerInstances")
        .and_then(|v| v.as_array())
        .into_iter()
        .flatten()
        .filter_map(|v| v.as_str().map(String::from))
        .collect();

    let mut accounts = self.state.write();
    let Some(state) = accounts.get_mut(&request.account_id) else {
        return Err(client_exception("account not found"));
    };

    let mut found = Vec::new();
    let mut failures = Vec::new();
    for r in &refs {
        let key = format!("{}/{}", cluster_name, container_instance_id_from_ref(r));
        if let Some(ci) = state.container_instances.get_mut(&key) {
            ci.status = status.clone();
            found.push(container_instance_to_json(ci));
        } else {
            failures.push(json!({"arn": r, "reason": "MISSING"}));
        }
    }

    Ok(AwsResponse::ok_json(json!({
        "containerInstances": found,
        "failures": failures,
    })))
}
fn put_attributes(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let cluster_ref = opt_str(&body, "cluster");
let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
let attrs = body
.get("attributes")
.and_then(|v| v.as_array())
.cloned()
.unwrap_or_default();
let mut accounts = self.state.write();
let state = accounts.get_or_create(&request.account_id);
let mut stored = Vec::new();
for a in &attrs {
let Some(name) = a.get("name").and_then(|v| v.as_str()) else {
continue;
};
let target_type = a
.get("targetType")
.and_then(|v| v.as_str())
.unwrap_or("container-instance")
.to_string();
let target_id = a
.get("targetId")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let value = a.get("value").and_then(|v| v.as_str()).map(String::from);
let key = format!("{}/{}/{}", cluster_name, target_id, name);
let attr = Attribute {
cluster_name: cluster_name.clone(),
target_type: target_type.clone(),
target_id: target_id.clone(),
name: name.to_string(),
value: value.clone(),
};
state.attributes.insert(key, attr);
stored.push(json!({
"name": name,
"value": value,
"targetType": target_type,
"targetId": target_id,
}));
}
Ok(AwsResponse::ok_json(json!({"attributes": stored})))
}
fn delete_attributes(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let cluster_ref = opt_str(&body, "cluster");
let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
let attrs = body
.get("attributes")
.and_then(|v| v.as_array())
.cloned()
.unwrap_or_default();
let mut accounts = self.state.write();
let state = accounts.get_or_create(&request.account_id);
let mut deleted = Vec::new();
for a in &attrs {
let Some(name) = a.get("name").and_then(|v| v.as_str()) else {
continue;
};
let target_id = a.get("targetId").and_then(|v| v.as_str()).unwrap_or("");
let key = format!("{}/{}/{}", cluster_name, target_id, name);
if let Some(attr) = state.attributes.remove(&key) {
deleted.push(json!({
"name": attr.name,
"value": attr.value,
"targetType": attr.target_type,
"targetId": attr.target_id,
}));
}
}
Ok(AwsResponse::ok_json(json!({"attributes": deleted})))
}
/// Handles the ECS `ListAttributes` action.
///
/// Returns the attributes of the resolved cluster whose target type equals the
/// required `targetType`, optionally narrowed by `attributeName` and
/// `attributeValue`. An account without state yields an empty list.
///
/// # Errors
///
/// Returns the error from `req_str` when `targetType` is missing.
fn list_attributes(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
    let body = request.json_body();
    let cluster_name = EcsState::resolve_cluster_name(opt_str(&body, "cluster"));
    let target_type = req_str(&body, "targetType")?.to_string();
    let attr_name = opt_str(&body, "attributeName");
    let attr_value = opt_str(&body, "attributeValue");

    let accounts = self.state.read();
    let attrs: Vec<Value> = accounts
        .get(&request.account_id)
        .map(|state| {
            state
                .attributes
                .values()
                .filter(|a| {
                    a.cluster_name == cluster_name
                        && a.target_type == target_type
                        && attr_name.is_none_or(|n| a.name == n)
                        && attr_value.is_none_or(|v| a.value.as_deref() == Some(v))
                })
                .map(|a| {
                    json!({
                        "name": a.name,
                        "value": a.value,
                        "targetType": a.target_type,
                        "targetId": a.target_id,
                    })
                })
                .collect()
        })
        .unwrap_or_default();

    Ok(AwsResponse::ok_json(json!({"attributes": attrs})))
}
/// Handles the ECS `CreateCapacityProvider` action.
///
/// Creates an `ACTIVE` capacity provider named `name`, storing the optional
/// `autoScalingGroupProvider` object and the request tags. Account state is
/// created on demand.
///
/// # Errors
///
/// Returns the error from `req_str` when `name` is missing, an
/// invalid-parameter error when the name starts with one of the reserved
/// prefixes `aws`, `ecs` or `fargate`, and a client exception when a provider
/// with that name already exists.
fn create_capacity_provider(
    &self,
    request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
    // Prefixes the ECS API reserves for AWS-managed capacity providers.
    const RESERVED_PREFIXES: [&str; 3] = ["aws", "ecs", "fargate"];

    let body = request.json_body();
    let name = req_str(&body, "name")?.to_string();
    if RESERVED_PREFIXES
        .iter()
        .any(|prefix| name.starts_with(*prefix))
    {
        return Err(invalid_parameter(format!(
            "Capacity provider name cannot begin with 'aws', 'ecs', or 'fargate': {name}"
        )));
    }
    let auto_scaling_group_provider = body.get("autoScalingGroupProvider").cloned();
    let tags = parse_tags(&body);
    let mut accounts = self.state.write();
    let state = accounts.get_or_create(&request.account_id);
    if state.capacity_providers.contains_key(&name) {
        return Err(client_exception(format!(
            "Capacity provider already exists: {name}"
        )));
    }
    let arn = format!(
        "arn:aws:ecs:{}:{}:capacity-provider/{}",
        state.region, state.account_id, name
    );
    let cp = CapacityProvider {
        name: name.clone(),
        arn,
        status: "ACTIVE".into(),
        auto_scaling_group_provider,
        update_status: None,
        update_status_reason: None,
        created_at: Utc::now(),
        tags,
    };
    state.capacity_providers.insert(name.clone(), cp.clone());
    Ok(AwsResponse::ok_json(json!({
        "capacityProvider": capacity_provider_to_json(&cp),
    })))
}
/// Handles the ECS `DeleteCapacityProvider` action: removes the provider and
/// returns it with status `INACTIVE`.
///
/// # Errors
///
/// Returns the error from `req_str` when `capacityProvider` is missing and a
/// capacity-provider-not-found error for an unknown account or provider.
fn delete_capacity_provider(
    &self,
    request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
    let body = request.json_body();
    let input = req_str(&body, "capacityProvider")?.to_string();
    let name = capacity_provider_name_from_ref(&input);

    let mut accounts = self.state.write();
    // A missing account and a missing provider yield the same error.
    let mut cp = accounts
        .get_mut(&request.account_id)
        .and_then(|state| state.capacity_providers.remove(&name))
        .ok_or_else(|| capacity_provider_not_found(&name))?;
    cp.status = "INACTIVE".into();

    Ok(AwsResponse::ok_json(json!({
        "capacityProvider": capacity_provider_to_json(&cp),
    })))
}
/// Handles the ECS `DescribeCapacityProviders` action.
///
/// With an empty (or absent) `capacityProviders` list every stored provider is
/// returned. Otherwise each reference is reduced to a provider name and looked
/// up; names that are not found are reported as `MISSING` failures. This also
/// applies when the account has no state at all, so a requested provider is
/// never silently dropped from the response — the same convention the other
/// `Describe*` handlers in this file follow.
fn describe_capacity_providers(
    &self,
    request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
    let body = request.json_body();
    let names: Vec<String> = body
        .get("capacityProviders")
        .and_then(|v| v.as_array())
        .map(|arr| {
            arr.iter()
                .filter_map(|v| v.as_str().map(capacity_provider_name_from_ref))
                .collect()
        })
        .unwrap_or_default();
    let accounts = self.state.read();
    let mut found = Vec::new();
    let mut failures = Vec::new();
    match accounts.get(&request.account_id) {
        Some(state) if names.is_empty() => {
            for cp in state.capacity_providers.values() {
                found.push(capacity_provider_to_json(cp));
            }
        }
        Some(state) => {
            for n in &names {
                match state.capacity_providers.get(n) {
                    Some(cp) => found.push(capacity_provider_to_json(cp)),
                    None => failures.push(json!({"arn": n, "reason": "MISSING"})),
                }
            }
        }
        None => {
            // No state for this account: nothing requested can exist.
            for n in &names {
                failures.push(json!({"arn": n, "reason": "MISSING"}));
            }
        }
    }
    Ok(AwsResponse::ok_json(json!({
        "capacityProviders": found,
        "failures": failures,
    })))
}
/// Handles the ECS `UpdateCapacityProvider` action.
///
/// Replaces the stored `autoScalingGroupProvider` when the request carries
/// one, sets the update status to `UPDATE_COMPLETE`, and returns the provider.
///
/// # Errors
///
/// Returns the error from `req_str` when `name` is missing and a
/// capacity-provider-not-found error for an unknown account or provider.
fn update_capacity_provider(
    &self,
    request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
    let body = request.json_body();
    let input = req_str(&body, "name")?.to_string();
    let name = capacity_provider_name_from_ref(&input);
    let asg = body.get("autoScalingGroupProvider").cloned();

    let mut accounts = self.state.write();
    // A missing account and a missing provider yield the same error.
    let cp = accounts
        .get_mut(&request.account_id)
        .and_then(|state| state.capacity_providers.get_mut(&name))
        .ok_or_else(|| capacity_provider_not_found(&name))?;
    if asg.is_some() {
        cp.auto_scaling_group_provider = asg;
    }
    cp.update_status = Some("UPDATE_COMPLETE".into());

    Ok(AwsResponse::ok_json(json!({
        "capacityProvider": capacity_provider_to_json(cp),
    })))
}
fn get_task_protection(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let refs: Vec<String> = body
.get("tasks")
.and_then(|v| v.as_array())
.map(|arr| {
arr.iter()
.filter_map(|v| v.as_str().map(String::from))
.collect()
})
.unwrap_or_default();
let accounts = self.state.read();
let mut protections = Vec::new();
let mut failures = Vec::new();
if let Some(state) = accounts.get(&request.account_id) {
for r in &refs {
let id = task_id_from_ref(r);
match state.tasks.get(&id) {
Some(t) => protections.push(task_protection_json(t)),
None => failures.push(json!({"arn": r, "reason": "MISSING"})),
}
}
}
Ok(AwsResponse::ok_json(json!({
"protectedTasks": protections,
"failures": failures,
})))
}
fn update_task_protection(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let refs: Vec<String> = body
.get("tasks")
.and_then(|v| v.as_array())
.map(|arr| {
arr.iter()
.filter_map(|v| v.as_str().map(String::from))
.collect()
})
.unwrap_or_default();
let protect = body
.get("protectionEnabled")
.and_then(|v| v.as_bool())
.unwrap_or(false);
let expires_in_minutes = body
.get("expiresInMinutes")
.and_then(|v| v.as_i64())
.unwrap_or(2880);
let expiration = if protect {
Some(Utc::now() + chrono::Duration::minutes(expires_in_minutes))
} else {
None
};
let mut accounts = self.state.write();
let state = accounts
.get_mut(&request.account_id)
.ok_or_else(|| client_exception("account not found"))?;
let mut protections = Vec::new();
let mut failures = Vec::new();
for r in &refs {
let id = task_id_from_ref(r);
match state.tasks.get_mut(&id) {
Some(t) => {
t.protection = Some(crate::state::TaskProtection {
enabled: protect,
expiration,
});
protections.push(task_protection_json(t));
}
None => failures.push(json!({"arn": r, "reason": "MISSING"})),
}
}
Ok(AwsResponse::ok_json(json!({
"protectedTasks": protections,
"failures": failures,
})))
}
fn create_task_set(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let service_ref = req_str(&body, "service")?;
let service_name = service_name_from_ref(service_ref);
let cluster_ref = opt_str(&body, "cluster");
let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
let task_definition = req_str(&body, "taskDefinition")?.to_string();
let external_id = opt_str(&body, "externalId").map(String::from);
let launch_type = opt_str(&body, "launchType").map(String::from);
let platform_version = opt_str(&body, "platformVersion").map(String::from);
let scale = body.get("scale").cloned();
let tags = parse_tags(&body);
let load_balancers = body
.get("loadBalancers")
.and_then(|v| v.as_array())
.cloned()
.unwrap_or_default();
let service_registries = body
.get("serviceRegistries")
.and_then(|v| v.as_array())
.cloned()
.unwrap_or_default();
let capacity_provider_strategy = body
.get("capacityProviderStrategy")
.and_then(|v| v.as_array())
.cloned()
.unwrap_or_default();
let mut accounts = self.state.write();
let state = accounts
.get_mut(&request.account_id)
.ok_or_else(|| service_not_found(&service_name))?;
let service_key = EcsState::service_key(&cluster_name, &service_name);
let svc = state
.services
.get(&service_key)
.ok_or_else(|| service_not_found(&service_name))?;
if svc.deployment_controller != "EXTERNAL" {
return Err(client_exception(
"CreateTaskSet requires the service to be created with \
deploymentController.type = EXTERNAL",
));
}
let ts_id = format!("ecs-svc-{}", uuid::Uuid::new_v4().simple());
let task_set = TaskSet {
task_set_id: ts_id.clone(),
task_set_arn: format!(
"arn:aws:ecs:{}:{}:task-set/{}/{}/{}",
state.region, state.account_id, cluster_name, service_name, ts_id
),
service_arn: svc.service_arn.clone(),
cluster_arn: svc.cluster_arn.clone(),
service_name: service_name.clone(),
cluster_name: cluster_name.clone(),
external_id,
status: "ACTIVE".into(),
task_definition,
computed_desired_count: 0,
pending_count: 0,
running_count: 0,
launch_type,
platform_version,
scale,
stability_status: "STABILIZING".into(),
created_at: Utc::now(),
updated_at: Utc::now(),
load_balancers,
service_registries,
capacity_provider_strategy,
tags,
};
let key = format!("{}/{}/{}", cluster_name, service_name, ts_id);
state.task_sets.insert(key, task_set.clone());
Ok(AwsResponse::ok_json(json!({
"taskSet": task_set_to_json(&task_set),
})))
}
fn update_task_set(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let ts_ref = req_str(&body, "taskSet")?.to_string();
let service_ref = req_str(&body, "service")?;
let service_name = service_name_from_ref(service_ref);
let cluster_ref = opt_str(&body, "cluster");
let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
let scale = body.get("scale").cloned();
let mut accounts = self.state.write();
let state = accounts
.get_mut(&request.account_id)
.ok_or_else(|| client_exception("task set not found"))?;
let ts_id = task_set_id_from_ref(&ts_ref);
let key = format!("{}/{}/{}", cluster_name, service_name, ts_id);
let ts = state
.task_sets
.get_mut(&key)
.ok_or_else(|| client_exception(format!("task set not found: {}", ts_ref)))?;
if let Some(v) = scale {
ts.scale = Some(v);
}
ts.updated_at = Utc::now();
Ok(AwsResponse::ok_json(json!({
"taskSet": task_set_to_json(ts),
})))
}
fn delete_task_set(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let ts_ref = req_str(&body, "taskSet")?.to_string();
let service_ref = req_str(&body, "service")?;
let service_name = service_name_from_ref(service_ref);
let cluster_ref = opt_str(&body, "cluster");
let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
let ts_id = task_set_id_from_ref(&ts_ref);
let key = format!("{}/{}/{}", cluster_name, service_name, ts_id);
let mut accounts = self.state.write();
let state = accounts
.get_mut(&request.account_id)
.ok_or_else(|| client_exception("task set not found"))?;
let mut ts = state
.task_sets
.remove(&key)
.ok_or_else(|| client_exception(format!("task set not found: {}", ts_ref)))?;
ts.status = "DRAINING".into();
Ok(AwsResponse::ok_json(json!({
"taskSet": task_set_to_json(&ts),
})))
}
fn describe_task_sets(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let service_ref = req_str(&body, "service")?;
let service_name = service_name_from_ref(service_ref);
let cluster_ref = opt_str(&body, "cluster");
let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
let filter_refs: Vec<String> = body
.get("taskSets")
.and_then(|v| v.as_array())
.map(|arr| {
arr.iter()
.filter_map(|v| v.as_str().map(String::from))
.collect()
})
.unwrap_or_default();
let accounts = self.state.read();
let mut found = Vec::new();
let mut failures = Vec::new();
if let Some(state) = accounts.get(&request.account_id) {
if filter_refs.is_empty() {
for ts in state.task_sets.values() {
if ts.cluster_name == cluster_name && ts.service_name == service_name {
found.push(task_set_to_json(ts));
}
}
} else {
for r in &filter_refs {
let id = task_set_id_from_ref(r);
let key = format!("{}/{}/{}", cluster_name, service_name, id);
match state.task_sets.get(&key) {
Some(ts) => found.push(task_set_to_json(ts)),
None => failures.push(json!({"arn": r, "reason": "MISSING"})),
}
}
}
}
Ok(AwsResponse::ok_json(json!({
"taskSets": found,
"failures": failures,
})))
}
fn update_service_primary_task_set(
&self,
request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let ts_ref = req_str(&body, "primaryTaskSet")?.to_string();
let service_ref = req_str(&body, "service")?;
let service_name = service_name_from_ref(service_ref);
let cluster_ref = opt_str(&body, "cluster");
let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
let ts_id = task_set_id_from_ref(&ts_ref);
let key = format!("{}/{}/{}", cluster_name, service_name, ts_id);
let mut accounts = self.state.write();
let state = accounts
.get_mut(&request.account_id)
.ok_or_else(|| client_exception("task set not found"))?;
if !state.task_sets.contains_key(&key) {
return Err(client_exception(format!("task set not found: {}", ts_ref)));
}
for ts in state.task_sets.values_mut() {
if ts.service_name == service_name
&& ts.cluster_name == cluster_name
&& ts.status == "PRIMARY"
&& ts.task_set_id != ts_id
{
ts.status = "ACTIVE".into();
ts.updated_at = Utc::now();
}
}
let ts = state.task_sets.get_mut(&key).unwrap();
ts.status = "PRIMARY".into();
ts.updated_at = Utc::now();
Ok(AwsResponse::ok_json(json!({
"taskSet": task_set_to_json(ts),
})))
}
async fn execute_command(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let task_ref = req_str(&body, "task")?.to_string();
let command = req_str(&body, "command")?.to_string();
let interactive = body
.get("interactive")
.and_then(|v| v.as_bool())
.unwrap_or(false);
let container_id = {
let accounts = self.state.read();
let state = accounts
.get(&request.account_id)
.ok_or_else(|| task_not_found(&task_ref))?;
let id = task_id_from_ref(&task_ref);
state
.tasks
.get(&id)
.and_then(|t| t.containers.first())
.and_then(|c| c.runtime_id.clone())
};
let session_id = format!("ecs-execute-command-{}", uuid::Uuid::new_v4());
if let (Some(id), Some(_rt)) = (container_id.clone(), self.runtime.as_ref()) {
let out = tokio::process::Command::new("docker")
.args(["exec", &id, "sh", "-c", &command])
.output()
.await
.map_err(|e| client_exception(format!("docker exec failed: {e}")))?;
tracing::info!(
task = %task_ref,
exit = out.status.code().unwrap_or(-1),
"ExecuteCommand via docker exec"
);
}
Ok(AwsResponse::ok_json(json!({
"clusterArn": opt_str(&body, "cluster").unwrap_or(""),
"containerArn": container_id.unwrap_or_default(),
"containerName": opt_str(&body, "container").unwrap_or(""),
"interactive": interactive,
"session": {
"sessionId": session_id,
"streamUrl": "",
"tokenValue": "",
},
"taskArn": task_ref,
})))
}
/// Handles `SubmitContainerStateChange`.
///
/// Best-effort update: when `task` is given and the account, task and named
/// container all exist, copies whichever of `status`, `exitCode` and `reason`
/// the request supplied onto that container. Anything that cannot be resolved
/// is ignored, and the call always acknowledges with `OK`.
fn submit_container_state_change(
    &self,
    request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
    let payload = request.json_body();
    let task_ref = opt_str(&payload, "task").unwrap_or("");
    let container_name = opt_str(&payload, "containerName").unwrap_or("");
    let new_status = opt_str(&payload, "status").map(String::from);
    let new_exit_code = payload.get("exitCode").and_then(|v| v.as_i64());
    let new_reason = opt_str(&payload, "reason").map(String::from);

    // The write lock is only taken when there is a task to look for.
    if !task_ref.is_empty() {
        let mut guard = self.state.write();
        let target = guard
            .get_mut(&request.account_id)
            .and_then(|account| {
                let task_id = task_id_from_ref(task_ref);
                account.tasks.get_mut(&task_id)
            })
            .and_then(|task| {
                task.containers
                    .iter_mut()
                    .find(|c| c.name == container_name)
            });
        if let Some(container) = target {
            if let Some(status) = new_status {
                container.last_status = status;
            }
            if new_exit_code.is_some() {
                container.exit_code = new_exit_code;
            }
            if new_reason.is_some() {
                container.reason = new_reason;
            }
        }
    }

    let ack = json!({"acknowledgment": "OK"});
    Ok(AwsResponse::ok_json(ack))
}
/// Handles `SubmitTaskStateChange`.
///
/// Best-effort update: when `task` is given and both the account and the
/// task exist, overwrites the task's `last_status` with the supplied
/// `status` (if any). The call always acknowledges with `OK`.
fn submit_task_state_change(
    &self,
    request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
    let payload = request.json_body();
    let task_ref = opt_str(&payload, "task").unwrap_or("");
    let new_status = opt_str(&payload, "status").map(String::from);

    // The write lock is only taken when there is a task to look for.
    if !task_ref.is_empty() {
        let mut guard = self.state.write();
        let target = guard.get_mut(&request.account_id).and_then(|account| {
            let task_id = task_id_from_ref(task_ref);
            account.tasks.get_mut(&task_id)
        });
        if let (Some(task), Some(status)) = (target, new_status) {
            task.last_status = status;
        }
    }

    let ack = json!({"acknowledgment": "OK"});
    Ok(AwsResponse::ok_json(ack))
}
/// Handles `SubmitAttachmentStateChanges`.
///
/// The request is not inspected and no state is changed; the call simply
/// acknowledges with `OK`.
fn submit_attachment_state_changes(
    &self,
    _request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
    let ack = json!({"acknowledgment": "OK"});
    Ok(AwsResponse::ok_json(ack))
}
/// Handles `DiscoverPollEndpoint`.
///
/// Builds one `https://ecs.<region>.amazonaws.com/` URL from the region
/// reported by the shared state and returns it for all three endpoint
/// fields. The request itself is not inspected.
fn discover_poll_endpoint(
    &self,
    _request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
    let guard = self.state.read();
    let region = guard.region();
    let url = format!("https://ecs.{}.amazonaws.com/", region);
    let payload = json!({
        "endpoint": url,
        "telemetryEndpoint": url,
        "serviceConnectEndpoint": url,
    });
    Ok(AwsResponse::ok_json(payload))
}
/// Handles `StopServiceDeployment`.
///
/// Scans every service's deployments for the first one whose id occurs in
/// `serviceDeploymentArn`, marks it `STOPPED` with rollout state `FAILED`,
/// records the reason, refreshes `updated_at`, and returns it.
///
/// NOTE(review): matching is a substring test (`contains`) on the deployment
/// id, so an id that is a prefix of another could match the wrong deployment.
/// Confirm the id format before tightening this.
///
/// # Errors
/// Propagates the error from `req_str` when `serviceDeploymentArn` is
/// missing, and returns a client exception when the account is unknown or no
/// deployment matches.
fn stop_service_deployment(
    &self,
    request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
    let payload = request.json_body();
    let deployment_ref = req_str(&payload, "serviceDeploymentArn")?.to_string();

    let mut guard = self.state.write();
    let account = guard
        .get_mut(&request.account_id)
        .ok_or_else(|| client_exception("service deployment not found"))?;

    let target = account
        .services
        .values_mut()
        .flat_map(|svc| svc.deployments.iter_mut())
        .find(|d| deployment_ref.contains(&d.deployment_id));

    match target {
        Some(deployment) => {
            deployment.status = "STOPPED".into();
            deployment.rollout_state = "FAILED".into();
            deployment.rollout_state_reason = Some("StopServiceDeployment requested".into());
            deployment.updated_at = Utc::now();
            Ok(AwsResponse::ok_json(json!({
                "serviceDeployment": deployment_to_json(deployment),
            })))
        }
        None => Err(client_exception(format!(
            "service deployment not found: {deployment_ref}"
        ))),
    }
}
/// Handles `ListServiceDeployments`.
///
/// Returns one summary per deployment of the requested service. The ARN is
/// synthesised as `<service_arn>/<deployment_id>`, `startedAt` mirrors
/// `createdAt`, and `finishedAt` is always null. An unknown account or
/// service yields an empty list.
///
/// # Errors
/// Propagates the error from `req_str` when `service` is missing.
fn list_service_deployments(
    &self,
    request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
    let payload = request.json_body();
    let service_name = service_name_from_ref(req_str(&payload, "service")?);
    let cluster_name = EcsState::resolve_cluster_name(opt_str(&payload, "cluster"));

    let guard = self.state.read();
    let summaries: Vec<Value> = guard
        .get(&request.account_id)
        .and_then(|account| {
            let service_key = EcsState::service_key(&cluster_name, &service_name);
            account.services.get(&service_key)
        })
        .map(|svc| {
            svc.deployments
                .iter()
                .map(|d| {
                    let created = d.created_at.timestamp();
                    json!({
                        "serviceDeploymentArn": format!("{}/{}", svc.service_arn, d.deployment_id),
                        "serviceArn": svc.service_arn,
                        "clusterArn": svc.cluster_arn,
                        "status": d.status,
                        "createdAt": created,
                        "startedAt": created,
                        "finishedAt": null,
                    })
                })
                .collect()
        })
        .unwrap_or_default();

    Ok(AwsResponse::ok_json(json!({
        "serviceDeployments": summaries,
    })))
}
fn describe_service_deployments(
&self,
request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let refs: Vec<String> = body
.get("serviceDeploymentArns")
.and_then(|v| v.as_array())
.map(|arr| {
arr.iter()
.filter_map(|v| v.as_str().map(String::from))
.collect()
})
.unwrap_or_default();
let accounts = self.state.read();
let mut found = Vec::new();
let mut failures = Vec::new();
if let Some(state) = accounts.get(&request.account_id) {
'next_ref: for r in &refs {
for svc in state.services.values() {
for d in &svc.deployments {
if r.contains(&d.deployment_id) {
found.push(json!({
"serviceDeploymentArn": r,
"serviceArn": svc.service_arn,
"clusterArn": svc.cluster_arn,
"status": d.status,
"createdAt": d.created_at.timestamp(),
"startedAt": d.created_at.timestamp(),
"finishedAt": null,
"deploymentConfiguration": {
"minimumHealthyPercent": svc.minimum_healthy_percent,
"maximumPercent": svc.maximum_percent,
},
"sourceServiceRevisions": [],
"targetServiceRevision": {
"arn": d.task_definition_arn,
"requestedTaskCount": d.desired_count,
"runningTaskCount": d.running_count,
"pendingTaskCount": d.pending_count,
"failedTasks": d.failed_tasks,
},
}));
continue 'next_ref;
}
}
}
failures.push(json!({"arn": r, "reason": "MISSING"}));
}
}
Ok(AwsResponse::ok_json(json!({
"serviceDeployments": found,
"failures": failures,
})))
}
fn describe_service_revisions(
&self,
request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let refs: Vec<String> = body
.get("serviceRevisionArns")
.and_then(|v| v.as_array())
.map(|arr| {
arr.iter()
.filter_map(|v| v.as_str().map(String::from))
.collect()
})
.unwrap_or_default();
Ok(AwsResponse::ok_json(json!({
"serviceRevisions": [],
"failures": refs.iter().map(|r| json!({"arn": r, "reason": "MISSING"})).collect::<Vec<_>>(),
})))
}
}
/// Returns the text after the last `/` in `input`, or all of `input` when it
/// contains no `/`.
fn container_instance_id_from_ref(input: &str) -> String {
    let id = match input.rsplit_once('/') {
        Some((_, tail)) => tail,
        None => input,
    };
    id.to_owned()
}
/// Builds the HTTP 400 `ClientException` returned when a container instance
/// reference cannot be resolved; `input` is echoed in the message.
fn container_instance_not_found(input: &str) -> AwsServiceError {
    let message = format!("Container instance not found: {input}");
    AwsServiceError::aws_error(StatusCode::BAD_REQUEST, "ClientException", message)
}
/// Returns the text after the last `/` in `input`, or all of `input` when it
/// contains no `/`.
fn capacity_provider_name_from_ref(input: &str) -> String {
    let start = input.rfind('/').map_or(0, |slash| slash + 1);
    input[start..].to_owned()
}
/// Builds the HTTP 400 `ClientException` returned when a capacity provider
/// cannot be found; `name` is echoed in the message.
fn capacity_provider_not_found(name: &str) -> AwsServiceError {
    let message = format!("Capacity provider not found: {name}");
    AwsServiceError::aws_error(StatusCode::BAD_REQUEST, "ClientException", message)
}
/// Returns the text after the last `/` in `input`, or all of `input` when it
/// contains no `/`.
fn task_set_id_from_ref(input: &str) -> String {
    match input.rsplit_once('/') {
        Some((_, id)) => id.to_owned(),
        None => input.to_owned(),
    }
}
fn container_instance_to_json(ci: &ContainerInstance) -> Value {
json!({
"containerInstanceArn": ci.container_instance_arn,
"ec2InstanceId": ci.ec2_instance_id,
"status": ci.status,
"version": ci.version,
"versionInfo": ci.version_info,
"agentConnected": ci.agent_connected,
"agentUpdateStatus": ci.agent_update_status,
"remainingResources": ci.remaining_resources,
"registeredResources": ci.registered_resources,
"runningTasksCount": ci.running_tasks_count,
"pendingTasksCount": ci.pending_tasks_count,
"registeredAt": ci.registered_at.timestamp(),
"attributes": ci.attributes.iter().map(|a| json!({
"name": a.name,
"value": a.value,
"targetType": a.target_type,
"targetId": a.target_id,
})).collect::<Vec<_>>(),
"tags": tags_json(&ci.tags),
"capacityProviderName": ci.capacity_provider_name,
"healthStatus": ci.health_status,
})
}
fn capacity_provider_to_json(cp: &CapacityProvider) -> Value {
json!({
"name": cp.name,
"capacityProviderArn": cp.arn,
"status": cp.status,
"autoScalingGroupProvider": cp.auto_scaling_group_provider,
"updateStatus": cp.update_status,
"updateStatusReason": cp.update_status_reason,
"tags": tags_json(&cp.tags),
})
}
fn task_set_to_json(ts: &TaskSet) -> Value {
json!({
"id": ts.task_set_id,
"taskSetArn": ts.task_set_arn,
"serviceArn": ts.service_arn,
"clusterArn": ts.cluster_arn,
"externalId": ts.external_id,
"status": ts.status,
"taskDefinition": ts.task_definition,
"computedDesiredCount": ts.computed_desired_count,
"pendingCount": ts.pending_count,
"runningCount": ts.running_count,
"launchType": ts.launch_type,
"platformVersion": ts.platform_version,
"scale": ts.scale,
"stabilityStatus": ts.stability_status,
"createdAt": ts.created_at.timestamp(),
"updatedAt": ts.updated_at.timestamp(),
"loadBalancers": ts.load_balancers,
"serviceRegistries": ts.service_registries,
"capacityProviderStrategy": ts.capacity_provider_strategy,
"tags": tags_json(&ts.tags),
})
}
/// Renders a task's protection state. A task with no protection record is
/// reported as not protected with a null `expirationDate`; otherwise the
/// expiration (when set) is emitted via `timestamp()`.
fn task_protection_json(task: &Task) -> Value {
    let (enabled, expires_at) = match task.protection.as_ref() {
        Some(protection) => (
            protection.enabled,
            protection.expiration.map(|at| at.timestamp()),
        ),
        None => (false, None),
    };
    json!({
        "taskArn": task.task_arn,
        "protectionEnabled": enabled,
        "expirationDate": expires_at,
    })
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a tag entry for the tag-merging tests.
    fn tag(key: &str, value: &str) -> TagEntry {
        TagEntry {
            key: key.into(),
            value: value.into(),
        }
    }

    /// Minimal service fixture stored under `default/api`.
    fn sample_service() -> Service {
        Service {
            service_name: "api".into(),
            service_arn: "arn".into(),
            cluster_name: "default".into(),
            cluster_arn: "arn".into(),
            task_definition_arn: "td-arn".into(),
            family: "td".into(),
            revision: 1,
            desired_count: 0,
            running_count: 0,
            pending_count: 0,
            launch_type: "FARGATE".into(),
            status: "ACTIVE".into(),
            scheduling_strategy: "REPLICA".into(),
            deployment_controller: "ECS".into(),
            minimum_healthy_percent: None,
            maximum_percent: None,
            circuit_breaker: None,
            deployments: Vec::new(),
            load_balancers: Vec::new(),
            service_registries: Vec::new(),
            placement_constraints: Vec::new(),
            placement_strategy: Vec::new(),
            network_configuration: None,
            tags: Vec::new(),
            created_at: chrono::Utc::now(),
            created_by: None,
            role_arn: None,
        }
    }

    /// Minimal container-instance fixture stored under `default/abc-123`.
    fn sample_container_instance() -> ContainerInstance {
        ContainerInstance {
            container_instance_arn: "arn".into(),
            ec2_instance_id: Some("i-x".into()),
            cluster_name: "default".into(),
            cluster_arn: "arn".into(),
            status: "ACTIVE".into(),
            version: 0,
            version_info: None,
            agent_connected: true,
            agent_update_status: None,
            remaining_resources: Vec::new(),
            registered_resources: Vec::new(),
            running_tasks_count: 0,
            pending_tasks_count: 0,
            registered_at: chrono::Utc::now(),
            attributes: Vec::new(),
            tags: Vec::new(),
            capacity_provider_name: None,
            health_status: None,
        }
    }

    #[test]
    fn parse_family_revision_with_revision() {
        let (family, revision) = parse_family_revision("web:3");
        assert_eq!(family, "web");
        assert_eq!(revision, Some(3));
    }

    #[test]
    fn parse_family_revision_without_revision() {
        let (family, revision) = parse_family_revision("web");
        assert_eq!(family, "web");
        assert_eq!(revision, None);
    }

    #[test]
    fn parse_family_revision_non_numeric_treated_as_no_revision() {
        // A non-numeric suffix stays part of the family name.
        let (family, revision) = parse_family_revision("web:latest");
        assert_eq!(family, "web:latest");
        assert_eq!(revision, None);
    }

    #[test]
    fn decode_ecs_arn_cluster() {
        let parts = decode_ecs_arn("arn:aws:ecs:us-east-1:111122223333:cluster/prod").unwrap();
        assert_eq!(parts.0, "111122223333");
        assert_eq!(parts.1, "cluster");
        assert_eq!(parts.2, "prod");
    }

    #[test]
    fn decode_ecs_arn_task_definition() {
        let parts =
            decode_ecs_arn("arn:aws:ecs:us-east-1:111122223333:task-definition/web:5").unwrap();
        assert_eq!(parts.0, "111122223333");
        assert_eq!(parts.1, "task-definition");
        assert_eq!(parts.2, "web:5");
    }

    #[test]
    fn decode_ecs_arn_rejects_non_ecs() {
        let outcome = decode_ecs_arn("arn:aws:s3:::bucket");
        assert!(outcome.is_err());
    }

    #[test]
    fn resolve_service_key_handles_short_and_long() {
        let mut state = EcsState::new("123456789012", "us-east-1");
        state
            .services
            .insert("default/api".to_string(), sample_service());

        let full_key = Some("default/api".to_string());
        assert_eq!(resolve_service_key(&state, "default/api"), full_key);
        assert_eq!(resolve_service_key(&state, "api"), full_key);
        assert_eq!(resolve_service_key(&state, "nope"), None);
    }

    #[test]
    fn resolve_container_instance_key_handles_short_and_long() {
        let mut state = EcsState::new("123456789012", "us-east-1");
        state
            .container_instances
            .insert("default/abc-123".to_string(), sample_container_instance());

        let full_key = Some("default/abc-123".to_string());
        assert_eq!(
            resolve_container_instance_key(&state, "default/abc-123"),
            full_key
        );
        assert_eq!(resolve_container_instance_key(&state, "abc-123"), full_key);
        assert_eq!(resolve_container_instance_key(&state, "nope"), None);
    }

    #[test]
    fn validate_family_name_accepts_hyphen_underscore() {
        let outcome = validate_family_name("web_server-2");
        assert!(outcome.is_ok());
    }

    #[test]
    fn validate_family_name_rejects_empty() {
        let outcome = validate_family_name("");
        assert!(outcome.is_err());
    }

    #[test]
    fn validate_family_name_rejects_slash() {
        let outcome = validate_family_name("web/server");
        assert!(outcome.is_err());
    }

    #[test]
    fn resolve_task_definition_ref_bare_family() {
        let (account, family, rev) = resolve_task_definition_ref("web").unwrap();
        assert!(account.is_none());
        assert_eq!(family, "web");
        assert!(rev.is_none());
    }

    #[test]
    fn resolve_task_definition_ref_family_revision() {
        let (account, family, rev) = resolve_task_definition_ref("web:3").unwrap();
        assert!(account.is_none());
        assert_eq!(family, "web");
        assert_eq!(rev, Some(3));
    }

    #[test]
    fn resolve_task_definition_ref_full_arn() {
        let arn = "arn:aws:ecs:us-east-1:111122223333:task-definition/web:3";
        let (account, family, rev) = resolve_task_definition_ref(arn).unwrap();
        assert_eq!(account.as_deref(), Some("111122223333"));
        assert_eq!(family, "web");
        assert_eq!(rev, Some(3));
    }

    #[test]
    fn merge_tags_replaces_existing_value() {
        let mut current = vec![tag("env", "dev")];
        merge_tags(&mut current, vec![tag("env", "prod")]);
        assert_eq!(current.len(), 1);
        assert_eq!(current[0].value, "prod");
    }

    #[test]
    fn merge_tags_adds_new() {
        let mut current = vec![tag("env", "dev")];
        merge_tags(&mut current, vec![tag("team", "platform")]);
        assert_eq!(current.len(), 2);
    }

    #[test]
    fn parse_tags_reads_lowercase_keys() {
        let body = json!({
            "tags": [
                {"key": "env", "value": "prod"},
                {"key": "team", "value": "platform"},
            ]
        });
        let tags = parse_tags(&body);
        assert_eq!(tags.len(), 2);
        let first = &tags[0];
        assert_eq!(first.key, "env");
        assert_eq!(first.value, "prod");
    }

    #[test]
    fn matches_filter_respects_none() {
        // No filter matches everything; a filter matches only equal values.
        assert!(matches_filter(None, "anything"));
        assert!(matches_filter(Some("x"), "x"));
        assert!(!matches_filter(Some("x"), "y"));
    }
}