Skip to main content

fakecloud_ecs/
service.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use chrono::Utc;
5use http::StatusCode;
6use serde_json::{json, Map, Value};
7use tokio::sync::Mutex as AsyncMutex;
8
9use fakecloud_aws::arn::Arn;
10use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
11use fakecloud_persistence::SnapshotStore;
12
13use crate::state::{
14    Attribute, AttributeRef, AwsLogsConfig, CapacityProvider, CircuitBreakerConfig, Cluster,
15    Container, ContainerInstance, Deployment, EcsSnapshot, EcsState, Service, SharedEcsState,
16    TagEntry, Task, TaskDefinition, TaskSet, ECS_SNAPSHOT_SCHEMA_VERSION,
17};
18
/// Every ECS action name this service dispatches in `handle`.
/// Returned verbatim by `supported_actions`.
const SUPPORTED_ACTIONS: &[&str] = &[
    "CreateCluster",
    "DescribeClusters",
    "DeleteCluster",
    "ListClusters",
    "UpdateCluster",
    "UpdateClusterSettings",
    "PutClusterCapacityProviders",
    "RegisterTaskDefinition",
    "DescribeTaskDefinition",
    "DeregisterTaskDefinition",
    "DeleteTaskDefinitions",
    "ListTaskDefinitions",
    "ListTaskDefinitionFamilies",
    "TagResource",
    "UntagResource",
    "ListTagsForResource",
    "PutAccountSetting",
    "PutAccountSettingDefault",
    "DeleteAccountSetting",
    "ListAccountSettings",
    "RunTask",
    "StartTask",
    "StopTask",
    "DescribeTasks",
    "ListTasks",
    "CreateService",
    "UpdateService",
    "DeleteService",
    "DescribeServices",
    "ListServices",
    "ListServicesByNamespace",
    "RegisterContainerInstance",
    "DeregisterContainerInstance",
    "DescribeContainerInstances",
    "ListContainerInstances",
    "UpdateContainerAgent",
    "UpdateContainerInstancesState",
    "PutAttributes",
    "DeleteAttributes",
    "ListAttributes",
    "CreateCapacityProvider",
    "DeleteCapacityProvider",
    "DescribeCapacityProviders",
    "UpdateCapacityProvider",
    "GetTaskProtection",
    "UpdateTaskProtection",
    "CreateTaskSet",
    "UpdateTaskSet",
    "DeleteTaskSet",
    "DescribeTaskSets",
    "UpdateServicePrimaryTaskSet",
    "ExecuteCommand",
    "SubmitContainerStateChange",
    "SubmitTaskStateChange",
    "SubmitAttachmentStateChanges",
    "DiscoverPollEndpoint",
    "StopServiceDeployment",
    "ListServiceDeployments",
    "DescribeServiceDeployments",
    "DescribeServiceRevisions",
];
81
/// Returns `true` when `action` names an operation that changes stored
/// ECS state. `handle` uses the answer to decide whether a successful
/// call should be followed by a snapshot write. Matching is exact and
/// case-sensitive; unknown names are treated as non-mutating.
fn is_mutating(action: &str) -> bool {
    const MUTATING_ACTIONS: &[&str] = &[
        "CreateCluster",
        "DeleteCluster",
        "UpdateCluster",
        "UpdateClusterSettings",
        "PutClusterCapacityProviders",
        "RegisterTaskDefinition",
        "DeregisterTaskDefinition",
        "DeleteTaskDefinitions",
        "TagResource",
        "UntagResource",
        "PutAccountSetting",
        "PutAccountSettingDefault",
        "DeleteAccountSetting",
        "RunTask",
        "StartTask",
        "StopTask",
        "CreateService",
        "UpdateService",
        "DeleteService",
        "RegisterContainerInstance",
        "DeregisterContainerInstance",
        "UpdateContainerAgent",
        "UpdateContainerInstancesState",
        "PutAttributes",
        "DeleteAttributes",
        "CreateCapacityProvider",
        "DeleteCapacityProvider",
        "UpdateCapacityProvider",
        "UpdateTaskProtection",
        "CreateTaskSet",
        "UpdateTaskSet",
        "DeleteTaskSet",
        "UpdateServicePrimaryTaskSet",
        "SubmitContainerStateChange",
        "SubmitTaskStateChange",
        "SubmitAttachmentStateChanges",
        "StopServiceDeployment",
    ];
    MUTATING_ACTIONS.contains(&action)
}
124
/// ECS service handler. Dispatches ECS API actions against the shared
/// per-account state and, when a snapshot store is attached, persists
/// that state after successful mutating calls.
pub struct EcsService {
    /// Shared ECS state that every operation reads or writes.
    state: SharedEcsState,
    /// Optional persistence backend; `save_snapshot` returns immediately
    /// while this is `None`.
    snapshot_store: Option<Arc<dyn SnapshotStore>>,
    /// Held for the whole of `save_snapshot` so snapshot writes do not
    /// overlap.
    snapshot_lock: Arc<AsyncMutex<()>>,
    /// Optional `EcsRuntime` handle installed with `with_runtime`.
    runtime: Option<Arc<crate::runtime::EcsRuntime>>,
    /// Optional validator consulted by `check_pass_role`; when `None`
    /// that check always succeeds.
    role_trust_validator: Option<Arc<dyn fakecloud_core::auth::RoleTrustValidator>>,
}
132
133impl EcsService {
134    pub fn new(state: SharedEcsState) -> Self {
135        Self {
136            state,
137            snapshot_store: None,
138            snapshot_lock: Arc::new(AsyncMutex::new(())),
139            runtime: None,
140            role_trust_validator: None,
141        }
142    }
143
144    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
145        self.snapshot_store = Some(store);
146        self
147    }
148
149    pub fn with_runtime(mut self, runtime: Arc<crate::runtime::EcsRuntime>) -> Self {
150        self.runtime = Some(runtime);
151        self
152    }
153
154    pub fn with_role_trust_validator(
155        mut self,
156        validator: Arc<dyn fakecloud_core::auth::RoleTrustValidator>,
157    ) -> Self {
158        self.role_trust_validator = Some(validator);
159        self
160    }
161
162    fn check_pass_role(&self, account_id: &str, role_arn: &str) -> Result<(), AwsServiceError> {
163        let Some(ref validator) = self.role_trust_validator else {
164            return Ok(());
165        };
166        if let Err(err) = validator.validate(account_id, role_arn, "ecs-tasks.amazonaws.com") {
167            return Err(AwsServiceError::aws_error(
168                StatusCode::BAD_REQUEST,
169                "InvalidParameterException",
170                err.to_string(),
171            ));
172        }
173        Ok(())
174    }
175
176    pub fn state_handle(&self) -> &SharedEcsState {
177        &self.state
178    }
179
180    async fn save_snapshot(&self) {
181        let Some(store) = self.snapshot_store.clone() else {
182            return;
183        };
184        let _guard = self.snapshot_lock.lock().await;
185        let snapshot = EcsSnapshot {
186            schema_version: ECS_SNAPSHOT_SCHEMA_VERSION,
187            accounts: Some(self.state.read().clone()),
188        };
189        let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
190            let bytes = serde_json::to_vec(&snapshot)
191                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
192            store.save(&bytes)
193        })
194        .await;
195        match join {
196            Ok(Ok(())) => {}
197            Ok(Err(err)) => tracing::error!(%err, "failed to write ecs snapshot"),
198            Err(err) => tracing::error!(%err, "ecs snapshot task panicked"),
199        }
200    }
201}
202
203#[async_trait]
204impl AwsService for EcsService {
205    fn service_name(&self) -> &str {
206        "ecs"
207    }
208
209    async fn handle(&self, request: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
210        let mutates = is_mutating(request.action.as_str());
211        let result = match request.action.as_str() {
212            "CreateCluster" => self.create_cluster(&request),
213            "DescribeClusters" => self.describe_clusters(&request),
214            "DeleteCluster" => self.delete_cluster(&request),
215            "ListClusters" => self.list_clusters(&request),
216            "UpdateCluster" => self.update_cluster(&request),
217            "UpdateClusterSettings" => self.update_cluster_settings(&request),
218            "PutClusterCapacityProviders" => self.put_cluster_capacity_providers(&request),
219            "RegisterTaskDefinition" => self.register_task_definition(&request),
220            "DescribeTaskDefinition" => self.describe_task_definition(&request),
221            "DeregisterTaskDefinition" => self.deregister_task_definition(&request),
222            "DeleteTaskDefinitions" => self.delete_task_definitions(&request),
223            "ListTaskDefinitions" => self.list_task_definitions(&request),
224            "ListTaskDefinitionFamilies" => self.list_task_definition_families(&request),
225            "TagResource" => self.tag_resource(&request),
226            "UntagResource" => self.untag_resource(&request),
227            "ListTagsForResource" => self.list_tags_for_resource(&request),
228            "PutAccountSetting" => self.put_account_setting(&request),
229            "PutAccountSettingDefault" => self.put_account_setting_default(&request),
230            "DeleteAccountSetting" => self.delete_account_setting(&request),
231            "ListAccountSettings" => self.list_account_settings(&request),
232            "RunTask" => self.run_task(&request),
233            "StartTask" => self.start_task(&request),
234            "StopTask" => self.stop_task(&request).await,
235            "DescribeTasks" => self.describe_tasks(&request),
236            "ListTasks" => self.list_tasks(&request),
237            "CreateService" => self.create_service(&request),
238            "UpdateService" => self.update_service(&request),
239            "DeleteService" => self.delete_service(&request).await,
240            "DescribeServices" => self.describe_services(&request),
241            "ListServices" => self.list_services(&request),
242            "ListServicesByNamespace" => self.list_services_by_namespace(&request),
243            "RegisterContainerInstance" => self.register_container_instance(&request),
244            "DeregisterContainerInstance" => self.deregister_container_instance(&request),
245            "DescribeContainerInstances" => self.describe_container_instances(&request),
246            "ListContainerInstances" => self.list_container_instances(&request),
247            "UpdateContainerAgent" => self.update_container_agent(&request),
248            "UpdateContainerInstancesState" => self.update_container_instances_state(&request),
249            "PutAttributes" => self.put_attributes(&request),
250            "DeleteAttributes" => self.delete_attributes(&request),
251            "ListAttributes" => self.list_attributes(&request),
252            "CreateCapacityProvider" => self.create_capacity_provider(&request),
253            "DeleteCapacityProvider" => self.delete_capacity_provider(&request),
254            "DescribeCapacityProviders" => self.describe_capacity_providers(&request),
255            "UpdateCapacityProvider" => self.update_capacity_provider(&request),
256            "GetTaskProtection" => self.get_task_protection(&request),
257            "UpdateTaskProtection" => self.update_task_protection(&request),
258            "CreateTaskSet" => self.create_task_set(&request),
259            "UpdateTaskSet" => self.update_task_set(&request),
260            "DeleteTaskSet" => self.delete_task_set(&request),
261            "DescribeTaskSets" => self.describe_task_sets(&request),
262            "UpdateServicePrimaryTaskSet" => self.update_service_primary_task_set(&request),
263            "ExecuteCommand" => self.execute_command(&request).await,
264            "SubmitContainerStateChange" => self.submit_container_state_change(&request),
265            "SubmitTaskStateChange" => self.submit_task_state_change(&request),
266            "SubmitAttachmentStateChanges" => self.submit_attachment_state_changes(&request),
267            "DiscoverPollEndpoint" => self.discover_poll_endpoint(&request),
268            "StopServiceDeployment" => self.stop_service_deployment(&request),
269            "ListServiceDeployments" => self.list_service_deployments(&request),
270            "DescribeServiceDeployments" => self.describe_service_deployments(&request),
271            "DescribeServiceRevisions" => self.describe_service_revisions(&request),
272            _ => Err(AwsServiceError::action_not_implemented(
273                "ecs",
274                &request.action,
275            )),
276        };
277        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
278            self.save_snapshot().await;
279        }
280        result
281    }
282
283    fn supported_actions(&self) -> &[&str] {
284        SUPPORTED_ACTIONS
285    }
286}
287
288// -------- helpers --------
289
290fn req_str<'a>(body: &'a Value, field: &str) -> Result<&'a str, AwsServiceError> {
291    body.get(field)
292        .and_then(|v| v.as_str())
293        .ok_or_else(|| client_exception(format!("Missing required field: {field}")))
294}
295
296fn opt_str<'a>(body: &'a Value, field: &str) -> Option<&'a str> {
297    body.get(field).and_then(|v| v.as_str())
298}
299
300fn client_exception(message: impl Into<String>) -> AwsServiceError {
301    AwsServiceError::aws_error(StatusCode::BAD_REQUEST, "ClientException", message)
302}
303
304fn invalid_parameter(message: impl Into<String>) -> AwsServiceError {
305    AwsServiceError::aws_error(
306        StatusCode::BAD_REQUEST,
307        "InvalidParameterException",
308        message,
309    )
310}
311
312fn cluster_not_found(name: &str) -> AwsServiceError {
313    AwsServiceError::aws_error(
314        StatusCode::BAD_REQUEST,
315        "ClusterNotFoundException",
316        format!("The referenced cluster was inactive: {name}"),
317    )
318}
319
320fn cluster_contains_services() -> AwsServiceError {
321    AwsServiceError::aws_error(
322        StatusCode::BAD_REQUEST,
323        "ClusterContainsServicesException",
324        "The specified cluster still contains active services",
325    )
326}
327
328fn cluster_contains_tasks() -> AwsServiceError {
329    AwsServiceError::aws_error(
330        StatusCode::BAD_REQUEST,
331        "ClusterContainsTasksException",
332        "The specified cluster still contains active tasks",
333    )
334}
335
336fn task_definition_not_found(family_rev: &str) -> AwsServiceError {
337    AwsServiceError::aws_error(
338        StatusCode::BAD_REQUEST,
339        "ClientException",
340        format!("Unable to describe task definition: {family_rev}"),
341    )
342}
343
344fn parse_tags(body: &Value) -> Vec<TagEntry> {
345    body.get("tags")
346        .and_then(|v| v.as_array())
347        .map(|arr| {
348            arr.iter()
349                .filter_map(|t| {
350                    let k = t.get("key").and_then(|v| v.as_str())?;
351                    let v = t.get("value").and_then(|v| v.as_str()).unwrap_or("");
352                    Some(TagEntry {
353                        key: k.to_string(),
354                        value: v.to_string(),
355                    })
356                })
357                .collect()
358        })
359        .unwrap_or_default()
360}
361
362fn tags_json(tags: &[TagEntry]) -> Value {
363    Value::Array(
364        tags.iter()
365            .map(|t| json!({"key": t.key, "value": t.value}))
366            .collect(),
367    )
368}
369
370fn merge_tags(current: &mut Vec<TagEntry>, incoming: Vec<TagEntry>) {
371    for new_tag in incoming {
372        if let Some(existing) = current.iter_mut().find(|t| t.key == new_tag.key) {
373            existing.value = new_tag.value;
374        } else {
375            current.push(new_tag);
376        }
377    }
378}
379
380fn cluster_to_json(cluster: &Cluster) -> Value {
381    json!({
382        "clusterArn": cluster.cluster_arn,
383        "clusterName": cluster.cluster_name,
384        "status": cluster.status,
385        "registeredContainerInstancesCount": cluster.registered_container_instances_count,
386        "runningTasksCount": cluster.running_tasks_count,
387        "pendingTasksCount": cluster.pending_tasks_count,
388        "activeServicesCount": cluster.active_services_count,
389        "statistics": cluster.statistics,
390        "tags": tags_json(&cluster.tags),
391        "settings": cluster.settings,
392        "configuration": cluster.configuration,
393        "capacityProviders": cluster.capacity_providers,
394        "defaultCapacityProviderStrategy": cluster.default_capacity_provider_strategy,
395        "attachments": cluster.attachments,
396        "attachmentsStatus": cluster.attachments_status,
397        "serviceConnectDefaults": cluster.service_connect_defaults,
398    })
399}
400
401fn task_definition_to_json(td: &TaskDefinition) -> Value {
402    let mut map = Map::new();
403    map.insert("taskDefinitionArn".into(), json!(td.task_definition_arn));
404    map.insert("family".into(), json!(td.family));
405    map.insert("revision".into(), json!(td.revision));
406    map.insert("status".into(), json!(td.status));
407    map.insert(
408        "containerDefinitions".into(),
409        Value::Array(td.container_definitions.clone()),
410    );
411    map.insert("compatibilities".into(), json!(td.compatibilities));
412    map.insert(
413        "requiresCompatibilities".into(),
414        json!(td.requires_compatibilities),
415    );
416    map.insert("volumes".into(), Value::Array(td.volumes.clone()));
417    map.insert(
418        "placementConstraints".into(),
419        Value::Array(td.placement_constraints.clone()),
420    );
421    map.insert(
422        "requiresAttributes".into(),
423        Value::Array(td.requires_attributes.clone()),
424    );
425    map.insert(
426        "inferenceAccelerators".into(),
427        Value::Array(td.inference_accelerators.clone()),
428    );
429    if let Some(ref x) = td.network_mode {
430        map.insert("networkMode".into(), json!(x));
431    }
432    if let Some(ref x) = td.cpu {
433        map.insert("cpu".into(), json!(x));
434    }
435    if let Some(ref x) = td.memory {
436        map.insert("memory".into(), json!(x));
437    }
438    if let Some(ref x) = td.task_role_arn {
439        map.insert("taskRoleArn".into(), json!(x));
440    }
441    if let Some(ref x) = td.execution_role_arn {
442        map.insert("executionRoleArn".into(), json!(x));
443    }
444    if let Some(ref x) = td.pid_mode {
445        map.insert("pidMode".into(), json!(x));
446    }
447    if let Some(ref x) = td.ipc_mode {
448        map.insert("ipcMode".into(), json!(x));
449    }
450    if let Some(ref x) = td.proxy_configuration {
451        map.insert("proxyConfiguration".into(), x.clone());
452    }
453    if let Some(ref x) = td.ephemeral_storage {
454        map.insert("ephemeralStorage".into(), x.clone());
455    }
456    if let Some(ref x) = td.runtime_platform {
457        map.insert("runtimePlatform".into(), x.clone());
458    }
459    if let Some(ref x) = td.registered_by {
460        map.insert("registeredBy".into(), json!(x));
461    }
462    map.insert("registeredAt".into(), json!(td.registered_at.timestamp()));
463    if let Some(ts) = td.deregistered_at {
464        map.insert("deregisteredAt".into(), json!(ts.timestamp()));
465    }
466    if let Some(enabled) = td.enable_fault_injection {
467        map.insert("enableFaultInjection".into(), json!(enabled));
468    }
469    Value::Object(map)
470}
471
472/// Resolve a service ARN tail to its storage key in `state.services`.
473/// Long-form ARNs carry `cluster/service` directly; short-form
474/// (pre-Nov-2018, still emitted when `serviceLongArnFormat=disabled`)
475/// only carry the service name, so we scan for any cluster whose stored
476/// key ends with `/<name>`.
477fn resolve_service_key(state: &EcsState, tail: &str) -> Option<String> {
478    if tail.contains('/') {
479        return state.services.contains_key(tail).then(|| tail.to_string());
480    }
481    let suffix = format!("/{tail}");
482    state
483        .services
484        .keys()
485        .find(|k| k.ends_with(&suffix))
486        .cloned()
487}
488
489/// Same idea for container instances. Storage keys are
490/// `cluster/instance-id`; short ARN tails are just `instance-id`.
491fn resolve_container_instance_key(state: &EcsState, tail: &str) -> Option<String> {
492    if tail.contains('/') {
493        return state
494            .container_instances
495            .contains_key(tail)
496            .then(|| tail.to_string());
497    }
498    let suffix = format!("/{tail}");
499    state
500        .container_instances
501        .keys()
502        .find(|k| k.ends_with(&suffix))
503        .cloned()
504}
505
506/// Decode an `arn:aws:ecs:<region>:<account>:<type>/<name>[:<rev>]` ARN
507/// into `(account, resource_type, tail)`. For task definitions `tail` is
508/// `family:revision`; for clusters it's `cluster_name`.
509fn decode_ecs_arn(arn: &str) -> Result<(String, String, String), AwsServiceError> {
510    let rest = arn
511        .strip_prefix("arn:aws:ecs:")
512        .ok_or_else(|| invalid_parameter(format!("Malformed ECS ARN: {arn}")))?;
513    // Resource portion may itself contain a trailing `:<revision>`, so we
514    // split at most three ways then treat the remainder as the resource.
515    let mut parts = rest.splitn(3, ':');
516    let _region = parts
517        .next()
518        .ok_or_else(|| invalid_parameter("Malformed ECS ARN"))?;
519    let account = parts
520        .next()
521        .ok_or_else(|| invalid_parameter("Malformed ECS ARN"))?;
522    let resource = parts
523        .next()
524        .ok_or_else(|| invalid_parameter("Malformed ECS ARN"))?;
525    let (resource_type, tail) = resource
526        .split_once('/')
527        .ok_or_else(|| invalid_parameter("Malformed ECS ARN"))?;
528    Ok((
529        account.to_string(),
530        resource_type.to_string(),
531        tail.to_string(),
532    ))
533}
534
/// Splits a `family[:revision]` reference.
///
/// When the text after the last `:` parses as an `i32`, returns
/// `(family, Some(revision))`. Otherwise the whole input is treated as
/// the family name and the revision is `None` (the latest-active
/// shorthand).
fn parse_family_revision(input: &str) -> (String, Option<i32>) {
    let split = input
        .rsplit_once(':')
        .and_then(|(family, rev)| rev.parse::<i32>().ok().map(|n| (family, n)));
    match split {
        Some((family, n)) => (family.to_string(), Some(n)),
        None => (input.to_string(), None),
    }
}
546
547/// Task-definition ARNs may appear as the full ARN or just `family:rev`.
548/// Returns `(account, family, Some(rev))` where `account` is `None` for
549/// the bare shorthand form.
550fn resolve_task_definition_ref(
551    input: &str,
552) -> Result<(Option<String>, String, Option<i32>), AwsServiceError> {
553    if input.starts_with("arn:aws:ecs:") {
554        let (account, resource_type, tail) = decode_ecs_arn(input)?;
555        if resource_type != "task-definition" {
556            return Err(invalid_parameter(format!(
557                "Expected task-definition ARN: {input}"
558            )));
559        }
560        let (family, rev) = parse_family_revision(&tail);
561        Ok((Some(account), family, rev))
562    } else {
563        let (family, rev) = parse_family_revision(input);
564        Ok((None, family, rev))
565    }
566}
567
568fn target_account_for_task_definition(request: &AwsRequest, td_ref: &str) -> String {
569    if let Ok((Some(account), _, _)) = resolve_task_definition_ref(td_ref) {
570        account
571    } else {
572        request.account_id.clone()
573    }
574}
575
576fn target_account_for_cluster(request: &AwsRequest, cluster_ref: Option<&str>) -> String {
577    if let Some(input) = cluster_ref {
578        if input.starts_with("arn:aws:ecs:") {
579            if let Ok((account, _, _)) = decode_ecs_arn(input) {
580                return account;
581            }
582        }
583    }
584    request.account_id.clone()
585}
586
587fn latest_active_revision(
588    revisions: &std::collections::BTreeMap<i32, TaskDefinition>,
589) -> Option<&TaskDefinition> {
590    revisions.values().rev().find(|td| td.status == "ACTIVE")
591}
592
593// -------- operations: clusters --------
594
595impl EcsService {
596    fn create_cluster(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
597        let body = request.json_body();
598        let cluster_name = opt_str(&body, "clusterName")
599            .unwrap_or("default")
600            .to_string();
601        let tags = parse_tags(&body);
602        let settings: Vec<Value> = body
603            .get("settings")
604            .and_then(|v| v.as_array())
605            .cloned()
606            .unwrap_or_default();
607        let configuration = body.get("configuration").cloned();
608        let capacity_providers: Vec<String> = body
609            .get("capacityProviders")
610            .and_then(|v| v.as_array())
611            .map(|arr| {
612                arr.iter()
613                    .filter_map(|v| v.as_str().map(String::from))
614                    .collect()
615            })
616            .unwrap_or_default();
617        let default_strategy: Vec<Value> = body
618            .get("defaultCapacityProviderStrategy")
619            .and_then(|v| v.as_array())
620            .cloned()
621            .unwrap_or_default();
622        let service_connect = body.get("serviceConnectDefaults").cloned();
623
624        let account = request.account_id.clone();
625        let mut accounts = self.state.write();
626        let state = accounts.get_or_create(&account);
627        let arn = state.cluster_arn(&cluster_name);
628        let mut cluster = Cluster::new(&cluster_name, arn);
629        cluster.tags = tags;
630        cluster.settings = settings;
631        cluster.configuration = configuration;
632        cluster.capacity_providers = capacity_providers;
633        cluster.default_capacity_provider_strategy = default_strategy;
634        cluster.service_connect_defaults = service_connect;
635        // CreateCluster on an existing cluster is idempotent-ish — AWS
636        // returns the existing cluster, potentially with merged settings.
637        // We keep it simple and overwrite on recreate.
638        state.clusters.insert(cluster_name.clone(), cluster.clone());
639
640        Ok(AwsResponse::ok_json(json!({
641            "cluster": cluster_to_json(&cluster),
642        })))
643    }
644
645    fn describe_clusters(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
646        let body = request.json_body();
647        let names: Vec<String> = body
648            .get("clusters")
649            .and_then(|v| v.as_array())
650            .map(|arr| {
651                arr.iter()
652                    .filter_map(|v| v.as_str().map(|s| EcsState::resolve_cluster_name(Some(s))))
653                    .collect()
654            })
655            .unwrap_or_else(|| vec!["default".to_string()]);
656
657        let account = request.account_id.clone();
658        let accounts = self.state.read();
659        let mut found = Vec::new();
660        let mut failures = Vec::new();
661        if let Some(state) = accounts.get(&account) {
662            for name in &names {
663                match state.clusters.get(name) {
664                    Some(c) => found.push(cluster_to_json(c)),
665                    None => failures.push(json!({
666                        "arn": state.cluster_arn(name),
667                        "reason": "MISSING",
668                    })),
669                }
670            }
671        } else {
672            for name in &names {
673                failures.push(json!({
674                    "arn": format!(
675                        "arn:aws:ecs:{}:{}:cluster/{}",
676                        accounts.region(),
677                        account,
678                        name
679                    ),
680                    "reason": "MISSING",
681                }));
682            }
683        }
684        Ok(AwsResponse::ok_json(json!({
685            "clusters": found,
686            "failures": failures,
687        })))
688    }
689
690    fn delete_cluster(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
691        let body = request.json_body();
692        let cluster_ref = opt_str(&body, "cluster");
693        let name = EcsState::resolve_cluster_name(cluster_ref);
694        let account = target_account_for_cluster(request, cluster_ref);
695
696        let mut accounts = self.state.write();
697        let state = accounts.get_or_create(&account);
698        let cluster = state
699            .clusters
700            .get_mut(&name)
701            .ok_or_else(|| cluster_not_found(&name))?;
702        if cluster.active_services_count > 0 {
703            return Err(cluster_contains_services());
704        }
705        if cluster.running_tasks_count > 0 || cluster.pending_tasks_count > 0 {
706            return Err(cluster_contains_tasks());
707        }
708        cluster.status = "INACTIVE".to_string();
709        let snapshot = cluster.clone();
710        // Real ECS keeps the cluster visible as INACTIVE for about an
711        // hour before garbage-collecting it. We drop it immediately to
712        // keep state bounded — callers that try to describe it by name
713        // will get a MISSING failure, matching the long-tail behaviour.
714        state.clusters.remove(&name);
715        Ok(AwsResponse::ok_json(json!({
716            "cluster": cluster_to_json(&snapshot),
717        })))
718    }
719
720    fn list_clusters(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
721        let body = request.json_body();
722        let max_results = body
723            .get("maxResults")
724            .and_then(|v| v.as_i64())
725            .filter(|n| (1..=100).contains(n))
726            .map(|n| n as usize)
727            .unwrap_or(100);
728        let next_token = opt_str(&body, "nextToken").unwrap_or("");
729
730        let account = request.account_id.clone();
731        let accounts = self.state.read();
732        let arns: Vec<String> = match accounts.get(&account) {
733            Some(state) => state
734                .clusters
735                .values()
736                .map(|c| c.cluster_arn.clone())
737                .collect(),
738            None => Vec::new(),
739        };
740        let start = next_token.parse::<usize>().unwrap_or(0).min(arns.len());
741        let end = (start + max_results).min(arns.len());
742        let page = arns[start..end].to_vec();
743        let next = if end < arns.len() {
744            Some(end.to_string())
745        } else {
746            None
747        };
748        let mut out = json!({ "clusterArns": page });
749        if let Some(n) = next {
750            out.as_object_mut()
751                .unwrap()
752                .insert("nextToken".into(), json!(n));
753        }
754        Ok(AwsResponse::ok_json(out))
755    }
756
757    fn update_cluster(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
758        let body = request.json_body();
759        let cluster_ref = req_str(&body, "cluster")?;
760        let name = EcsState::resolve_cluster_name(Some(cluster_ref));
761        let account = target_account_for_cluster(request, Some(cluster_ref));
762
763        let mut accounts = self.state.write();
764        let state = accounts.get_or_create(&account);
765        let cluster = state
766            .clusters
767            .get_mut(&name)
768            .ok_or_else(|| cluster_not_found(&name))?;
769        if let Some(settings) = body.get("settings").and_then(|v| v.as_array()) {
770            cluster.settings = settings.clone();
771        }
772        if let Some(cfg) = body.get("configuration") {
773            cluster.configuration = Some(cfg.clone());
774        }
775        if let Some(sc) = body.get("serviceConnectDefaults") {
776            cluster.service_connect_defaults = Some(sc.clone());
777        }
778        let snapshot = cluster.clone();
779        Ok(AwsResponse::ok_json(json!({
780            "cluster": cluster_to_json(&snapshot),
781        })))
782    }
783
784    fn update_cluster_settings(
785        &self,
786        request: &AwsRequest,
787    ) -> Result<AwsResponse, AwsServiceError> {
788        let body = request.json_body();
789        let cluster_ref = req_str(&body, "cluster")?;
790        let name = EcsState::resolve_cluster_name(Some(cluster_ref));
791        let account = target_account_for_cluster(request, Some(cluster_ref));
792        let settings: Vec<Value> = body
793            .get("settings")
794            .and_then(|v| v.as_array())
795            .cloned()
796            .unwrap_or_default();
797
798        let mut accounts = self.state.write();
799        let state = accounts.get_or_create(&account);
800        let cluster = state
801            .clusters
802            .get_mut(&name)
803            .ok_or_else(|| cluster_not_found(&name))?;
804        cluster.settings = settings;
805        let snapshot = cluster.clone();
806        Ok(AwsResponse::ok_json(json!({
807            "cluster": cluster_to_json(&snapshot),
808        })))
809    }
810
811    fn put_cluster_capacity_providers(
812        &self,
813        request: &AwsRequest,
814    ) -> Result<AwsResponse, AwsServiceError> {
815        let body = request.json_body();
816        let cluster_ref = req_str(&body, "cluster")?;
817        let name = EcsState::resolve_cluster_name(Some(cluster_ref));
818        let account = target_account_for_cluster(request, Some(cluster_ref));
819        let capacity_providers: Vec<String> = body
820            .get("capacityProviders")
821            .and_then(|v| v.as_array())
822            .map(|arr| {
823                arr.iter()
824                    .filter_map(|v| v.as_str().map(String::from))
825                    .collect()
826            })
827            .ok_or_else(|| client_exception("Missing required field: capacityProviders"))?;
828        let default_strategy: Vec<Value> = body
829            .get("defaultCapacityProviderStrategy")
830            .and_then(|v| v.as_array())
831            .cloned()
832            .ok_or_else(|| {
833                client_exception("Missing required field: defaultCapacityProviderStrategy")
834            })?;
835
836        let mut accounts = self.state.write();
837        let state = accounts.get_or_create(&account);
838        let cluster = state
839            .clusters
840            .get_mut(&name)
841            .ok_or_else(|| cluster_not_found(&name))?;
842        cluster.capacity_providers = capacity_providers;
843        cluster.default_capacity_provider_strategy = default_strategy;
844        let snapshot = cluster.clone();
845        Ok(AwsResponse::ok_json(json!({
846            "cluster": cluster_to_json(&snapshot),
847        })))
848    }
849}
850
851// -------- operations: task definitions --------
852
853impl EcsService {
854    fn register_task_definition(
855        &self,
856        request: &AwsRequest,
857    ) -> Result<AwsResponse, AwsServiceError> {
858        let body = request.json_body();
859        let family = req_str(&body, "family")?.to_string();
860        validate_family_name(&family)?;
861        let container_definitions = body
862            .get("containerDefinitions")
863            .and_then(|v| v.as_array())
864            .cloned()
865            .ok_or_else(|| client_exception("Missing required field: containerDefinitions"))?;
866        if container_definitions.is_empty() {
867            return Err(client_exception(
868                "Task definition must have at least one container",
869            ));
870        }
871        for cd in &container_definitions {
872            if cd
873                .get("name")
874                .and_then(|v| v.as_str())
875                .unwrap_or("")
876                .is_empty()
877            {
878                return Err(client_exception(
879                    "Container definition is missing required field: name",
880                ));
881            }
882            if cd
883                .get("image")
884                .and_then(|v| v.as_str())
885                .unwrap_or("")
886                .is_empty()
887            {
888                return Err(client_exception(
889                    "Container definition is missing required field: image",
890                ));
891            }
892        }
893        // PassRole trust check on the task + execution roles. Real AWS
894        // rejects RegisterTaskDefinition when the role's trust policy
895        // doesn't list `ecs-tasks.amazonaws.com`.
896        if let Some(role_arn) = opt_str(&body, "taskRoleArn") {
897            self.check_pass_role(&request.account_id, role_arn)?;
898        }
899        if let Some(role_arn) = opt_str(&body, "executionRoleArn") {
900            self.check_pass_role(&request.account_id, role_arn)?;
901        }
902
903        let tags = parse_tags(&body);
904        let requires_compatibilities: Vec<String> = body
905            .get("requiresCompatibilities")
906            .and_then(|v| v.as_array())
907            .map(|arr| {
908                arr.iter()
909                    .filter_map(|v| v.as_str().map(String::from))
910                    .collect()
911            })
912            .unwrap_or_default();
913        // Compatibilities reflect what the task definition is compatible with.
914        // We always claim EC2 and FARGATE since we execute via Docker either
915        // way — callers with stricter requirements set `requiresCompatibilities`.
916        let compatibilities = vec!["EC2".to_string(), "FARGATE".to_string()];
917
918        let account = request.account_id.clone();
919        let mut accounts = self.state.write();
920        let state = accounts.get_or_create(&account);
921        let revision = state.allocate_revision(&family);
922        let arn = state.task_definition_arn(&family, revision);
923        let td = TaskDefinition {
924            family: family.clone(),
925            revision,
926            task_definition_arn: arn,
927            container_definitions,
928            status: "ACTIVE".to_string(),
929            task_role_arn: opt_str(&body, "taskRoleArn").map(String::from),
930            execution_role_arn: opt_str(&body, "executionRoleArn").map(String::from),
931            network_mode: opt_str(&body, "networkMode").map(String::from),
932            requires_compatibilities,
933            compatibilities,
934            cpu: opt_str(&body, "cpu").map(String::from),
935            memory: opt_str(&body, "memory").map(String::from),
936            pid_mode: opt_str(&body, "pidMode").map(String::from),
937            ipc_mode: opt_str(&body, "ipcMode").map(String::from),
938            volumes: body
939                .get("volumes")
940                .and_then(|v| v.as_array())
941                .cloned()
942                .unwrap_or_default(),
943            placement_constraints: body
944                .get("placementConstraints")
945                .and_then(|v| v.as_array())
946                .cloned()
947                .unwrap_or_default(),
948            proxy_configuration: body.get("proxyConfiguration").cloned(),
949            inference_accelerators: body
950                .get("inferenceAccelerators")
951                .and_then(|v| v.as_array())
952                .cloned()
953                .unwrap_or_default(),
954            ephemeral_storage: body.get("ephemeralStorage").cloned(),
955            runtime_platform: body.get("runtimePlatform").cloned(),
956            requires_attributes: Vec::new(),
957            registered_at: Utc::now(),
958            registered_by: request.principal.as_ref().map(|p| p.arn.clone()).or(Some(
959                Arn::global("iam", &state.account_id, "root").to_string(),
960            )),
961            deregistered_at: None,
962            tags: tags.clone(),
963            enable_fault_injection: body.get("enableFaultInjection").and_then(|v| v.as_bool()),
964        };
965        let td_json = task_definition_to_json(&td);
966        state
967            .task_definitions
968            .entry(family.clone())
969            .or_default()
970            .insert(revision, td);
971
972        Ok(AwsResponse::ok_json(json!({
973            "taskDefinition": td_json,
974            "tags": tags_json(&tags),
975        })))
976    }
977
978    fn describe_task_definition(
979        &self,
980        request: &AwsRequest,
981    ) -> Result<AwsResponse, AwsServiceError> {
982        let body = request.json_body();
983        let td_ref = req_str(&body, "taskDefinition")?;
984        let (_, family, rev) = resolve_task_definition_ref(td_ref)?;
985        let include_tags = body
986            .get("include")
987            .and_then(|v| v.as_array())
988            .map(|arr| arr.iter().any(|v| v.as_str() == Some("TAGS")))
989            .unwrap_or(false);
990
991        let account = target_account_for_task_definition(request, td_ref);
992        let accounts = self.state.read();
993        let state = accounts
994            .get(&account)
995            .ok_or_else(|| task_definition_not_found(td_ref))?;
996        let revisions = state
997            .task_definitions
998            .get(&family)
999            .ok_or_else(|| task_definition_not_found(td_ref))?;
1000        let td = match rev {
1001            Some(n) => revisions
1002                .get(&n)
1003                .ok_or_else(|| task_definition_not_found(td_ref))?,
1004            None => latest_active_revision(revisions)
1005                .ok_or_else(|| task_definition_not_found(td_ref))?,
1006        };
1007        let mut out = json!({"taskDefinition": task_definition_to_json(td)});
1008        if include_tags {
1009            out.as_object_mut()
1010                .unwrap()
1011                .insert("tags".into(), tags_json(&td.tags));
1012        }
1013        Ok(AwsResponse::ok_json(out))
1014    }
1015
1016    fn deregister_task_definition(
1017        &self,
1018        request: &AwsRequest,
1019    ) -> Result<AwsResponse, AwsServiceError> {
1020        let body = request.json_body();
1021        let td_ref = req_str(&body, "taskDefinition")?;
1022        let (_, family, rev) = resolve_task_definition_ref(td_ref)?;
1023        let rev =
1024            rev.ok_or_else(|| client_exception("taskDefinition must reference a revision"))?;
1025
1026        let account = target_account_for_task_definition(request, td_ref);
1027        let mut accounts = self.state.write();
1028        let state = accounts.get_or_create(&account);
1029        let revisions = state
1030            .task_definitions
1031            .get_mut(&family)
1032            .ok_or_else(|| task_definition_not_found(td_ref))?;
1033        let td = revisions
1034            .get_mut(&rev)
1035            .ok_or_else(|| task_definition_not_found(td_ref))?;
1036        td.status = "INACTIVE".to_string();
1037        td.deregistered_at = Some(Utc::now());
1038        let snapshot = td.clone();
1039        Ok(AwsResponse::ok_json(json!({
1040            "taskDefinition": task_definition_to_json(&snapshot),
1041        })))
1042    }
1043
1044    fn delete_task_definitions(
1045        &self,
1046        request: &AwsRequest,
1047    ) -> Result<AwsResponse, AwsServiceError> {
1048        let body = request.json_body();
1049        let refs: Vec<String> = body
1050            .get("taskDefinitions")
1051            .and_then(|v| v.as_array())
1052            .map(|arr| {
1053                arr.iter()
1054                    .filter_map(|v| v.as_str().map(String::from))
1055                    .collect()
1056            })
1057            .ok_or_else(|| client_exception("Missing required field: taskDefinitions"))?;
1058        if refs.is_empty() {
1059            return Err(client_exception("taskDefinitions must not be empty"));
1060        }
1061
1062        let mut deleted = Vec::new();
1063        let mut failures = Vec::new();
1064        let account = request.account_id.clone();
1065        let mut accounts = self.state.write();
1066        let state = accounts.get_or_create(&account);
1067        for input in refs {
1068            let parsed = match resolve_task_definition_ref(&input) {
1069                Ok((_, family, Some(rev))) => Some((family, rev)),
1070                Ok(_) => None,
1071                Err(_) => None,
1072            };
1073            let Some((family, rev)) = parsed else {
1074                failures.push(json!({
1075                    "arn": input,
1076                    "reason": "INVALID_REFERENCE",
1077                    "detail": "Expected family:revision or full task-definition ARN",
1078                }));
1079                continue;
1080            };
1081            let Some(revisions) = state.task_definitions.get_mut(&family) else {
1082                failures.push(json!({"arn": input, "reason": "MISSING"}));
1083                continue;
1084            };
1085            let Some(td) = revisions.get_mut(&rev) else {
1086                failures.push(json!({"arn": input, "reason": "MISSING"}));
1087                continue;
1088            };
1089            if td.status == "ACTIVE" {
1090                failures.push(json!({
1091                    "arn": td.task_definition_arn.clone(),
1092                    "reason": "MUST_BE_INACTIVE",
1093                    "detail": "Task definitions must be deregistered before they can be deleted",
1094                }));
1095                continue;
1096            }
1097            td.status = "DELETE_IN_PROGRESS".to_string();
1098            deleted.push(task_definition_to_json(td));
1099        }
1100        Ok(AwsResponse::ok_json(json!({
1101            "taskDefinitions": deleted,
1102            "failures": failures,
1103        })))
1104    }
1105
1106    fn list_task_definitions(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1107        let body = request.json_body();
1108        let family_prefix = opt_str(&body, "familyPrefix");
1109        let status = opt_str(&body, "status").unwrap_or("ACTIVE");
1110        let sort = opt_str(&body, "sort").unwrap_or("ASC");
1111        let max_results = body
1112            .get("maxResults")
1113            .and_then(|v| v.as_i64())
1114            .filter(|n| (1..=100).contains(n))
1115            .map(|n| n as usize)
1116            .unwrap_or(100);
1117        let next_token = opt_str(&body, "nextToken").unwrap_or("");
1118
1119        let account = request.account_id.clone();
1120        let accounts = self.state.read();
1121        let mut arns: Vec<String> = Vec::new();
1122        if let Some(state) = accounts.get(&account) {
1123            for (family, revisions) in &state.task_definitions {
1124                if let Some(prefix) = family_prefix {
1125                    if !family.starts_with(prefix) {
1126                        continue;
1127                    }
1128                }
1129                for td in revisions.values() {
1130                    if td.status == status {
1131                        arns.push(td.task_definition_arn.clone());
1132                    }
1133                }
1134            }
1135        }
1136        if sort == "DESC" {
1137            arns.sort();
1138            arns.reverse();
1139        } else {
1140            arns.sort();
1141        }
1142        let start = next_token.parse::<usize>().unwrap_or(0).min(arns.len());
1143        let end = (start + max_results).min(arns.len());
1144        let page = arns[start..end].to_vec();
1145        let mut out = json!({"taskDefinitionArns": page});
1146        if end < arns.len() {
1147            out.as_object_mut()
1148                .unwrap()
1149                .insert("nextToken".into(), json!(end.to_string()));
1150        }
1151        Ok(AwsResponse::ok_json(out))
1152    }
1153
1154    fn list_task_definition_families(
1155        &self,
1156        request: &AwsRequest,
1157    ) -> Result<AwsResponse, AwsServiceError> {
1158        let body = request.json_body();
1159        let family_prefix = opt_str(&body, "familyPrefix");
1160        let status = opt_str(&body, "status").unwrap_or("ACTIVE");
1161        let max_results = body
1162            .get("maxResults")
1163            .and_then(|v| v.as_i64())
1164            .filter(|n| (1..=100).contains(n))
1165            .map(|n| n as usize)
1166            .unwrap_or(100);
1167        let next_token = opt_str(&body, "nextToken").unwrap_or("");
1168
1169        let account = request.account_id.clone();
1170        let accounts = self.state.read();
1171        let mut families: Vec<String> = Vec::new();
1172        if let Some(state) = accounts.get(&account) {
1173            for (family, revisions) in &state.task_definitions {
1174                if let Some(prefix) = family_prefix {
1175                    if !family.starts_with(prefix) {
1176                        continue;
1177                    }
1178                }
1179                let matches_status = match status {
1180                    "ACTIVE" => revisions.values().any(|td| td.status == "ACTIVE"),
1181                    "INACTIVE" => revisions
1182                        .values()
1183                        .all(|td| td.status == "INACTIVE" || td.status == "DELETE_IN_PROGRESS"),
1184                    "ALL" => true,
1185                    _ => revisions.values().any(|td| td.status == status),
1186                };
1187                if matches_status {
1188                    families.push(family.clone());
1189                }
1190            }
1191        }
1192        families.sort();
1193        let start = next_token.parse::<usize>().unwrap_or(0).min(families.len());
1194        let end = (start + max_results).min(families.len());
1195        let page = families[start..end].to_vec();
1196        let mut out = json!({"families": page});
1197        if end < families.len() {
1198            out.as_object_mut()
1199                .unwrap()
1200                .insert("nextToken".into(), json!(end.to_string()));
1201        }
1202        Ok(AwsResponse::ok_json(out))
1203    }
1204}
1205
1206fn validate_family_name(family: &str) -> Result<(), AwsServiceError> {
1207    if family.is_empty() || family.len() > 255 {
1208        return Err(invalid_parameter(
1209            "Task definition family must be 1-255 characters",
1210        ));
1211    }
1212    let ok = family
1213        .chars()
1214        .all(|c| c.is_ascii_alphanumeric() || c == '_' || c == '-');
1215    if !ok {
1216        return Err(invalid_parameter(
1217            "Task definition family may only contain letters, numbers, hyphens, and underscores",
1218        ));
1219    }
1220    Ok(())
1221}
1222
1223// -------- operations: tagging --------
1224
1225impl EcsService {
1226    fn tag_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1227        let body = request.json_body();
1228        let arn = req_str(&body, "resourceArn")?.to_string();
1229        let tags = parse_tags(&body);
1230        let (account, resource_type, tail) = decode_ecs_arn(&arn)?;
1231        let mut accounts = self.state.write();
1232        let state = accounts.get_or_create(&account);
1233        match resource_type.as_str() {
1234            "cluster" => {
1235                let cluster = state
1236                    .clusters
1237                    .get_mut(&tail)
1238                    .ok_or_else(|| resource_not_found(&arn))?;
1239                merge_tags(&mut cluster.tags, tags);
1240            }
1241            "task-definition" => {
1242                let (family, rev) = parse_family_revision(&tail);
1243                let rev = rev.ok_or_else(|| {
1244                    invalid_parameter("task-definition ARN must include revision")
1245                })?;
1246                let td = state
1247                    .task_definitions
1248                    .get_mut(&family)
1249                    .and_then(|m| m.get_mut(&rev))
1250                    .ok_or_else(|| resource_not_found(&arn))?;
1251                merge_tags(&mut td.tags, tags);
1252            }
1253            "service" => {
1254                let key =
1255                    resolve_service_key(state, &tail).ok_or_else(|| resource_not_found(&arn))?;
1256                let svc = state.services.get_mut(&key).expect("resolved key exists");
1257                merge_tags(&mut svc.tags, tags);
1258            }
1259            "task" => {
1260                let task_id = tail.rsplit('/').next().unwrap_or(&tail).to_string();
1261                let task = state
1262                    .tasks
1263                    .get_mut(&task_id)
1264                    .ok_or_else(|| resource_not_found(&arn))?;
1265                merge_tags(&mut task.tags, tags);
1266            }
1267            "task-set" => {
1268                let ts = state
1269                    .task_sets
1270                    .get_mut(&tail)
1271                    .ok_or_else(|| resource_not_found(&arn))?;
1272                merge_tags(&mut ts.tags, tags);
1273            }
1274            "container-instance" => {
1275                let key = resolve_container_instance_key(state, &tail)
1276                    .ok_or_else(|| resource_not_found(&arn))?;
1277                let ci = state
1278                    .container_instances
1279                    .get_mut(&key)
1280                    .expect("resolved key exists");
1281                merge_tags(&mut ci.tags, tags);
1282            }
1283            "capacity-provider" => {
1284                let cp = state
1285                    .capacity_providers
1286                    .get_mut(&tail)
1287                    .ok_or_else(|| resource_not_found(&arn))?;
1288                merge_tags(&mut cp.tags, tags);
1289            }
1290            other => {
1291                return Err(invalid_parameter(format!(
1292                    "Unknown ECS resource type: {other}"
1293                )));
1294            }
1295        }
1296        Ok(AwsResponse::ok_json(json!({})))
1297    }
1298
1299    fn untag_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1300        let body = request.json_body();
1301        let arn = req_str(&body, "resourceArn")?.to_string();
1302        let keys: Vec<String> = body
1303            .get("tagKeys")
1304            .and_then(|v| v.as_array())
1305            .map(|arr| {
1306                arr.iter()
1307                    .filter_map(|v| v.as_str().map(String::from))
1308                    .collect()
1309            })
1310            .unwrap_or_default();
1311        let (account, resource_type, tail) = decode_ecs_arn(&arn)?;
1312        let mut accounts = self.state.write();
1313        let state = accounts.get_or_create(&account);
1314        match resource_type.as_str() {
1315            "cluster" => {
1316                let cluster = state
1317                    .clusters
1318                    .get_mut(&tail)
1319                    .ok_or_else(|| resource_not_found(&arn))?;
1320                cluster.tags.retain(|t| !keys.contains(&t.key));
1321            }
1322            "task-definition" => {
1323                let (family, rev) = parse_family_revision(&tail);
1324                let rev = rev.ok_or_else(|| {
1325                    invalid_parameter("task-definition ARN must include revision")
1326                })?;
1327                let td = state
1328                    .task_definitions
1329                    .get_mut(&family)
1330                    .and_then(|m| m.get_mut(&rev))
1331                    .ok_or_else(|| resource_not_found(&arn))?;
1332                td.tags.retain(|t| !keys.contains(&t.key));
1333            }
1334            "service" => {
1335                let key =
1336                    resolve_service_key(state, &tail).ok_or_else(|| resource_not_found(&arn))?;
1337                let svc = state.services.get_mut(&key).expect("resolved key exists");
1338                svc.tags.retain(|t| !keys.contains(&t.key));
1339            }
1340            "task" => {
1341                let task_id = tail.rsplit('/').next().unwrap_or(&tail).to_string();
1342                let task = state
1343                    .tasks
1344                    .get_mut(&task_id)
1345                    .ok_or_else(|| resource_not_found(&arn))?;
1346                task.tags.retain(|t| !keys.contains(&t.key));
1347            }
1348            "task-set" => {
1349                let ts = state
1350                    .task_sets
1351                    .get_mut(&tail)
1352                    .ok_or_else(|| resource_not_found(&arn))?;
1353                ts.tags.retain(|t| !keys.contains(&t.key));
1354            }
1355            "container-instance" => {
1356                let key = resolve_container_instance_key(state, &tail)
1357                    .ok_or_else(|| resource_not_found(&arn))?;
1358                let ci = state
1359                    .container_instances
1360                    .get_mut(&key)
1361                    .expect("resolved key exists");
1362                ci.tags.retain(|t| !keys.contains(&t.key));
1363            }
1364            "capacity-provider" => {
1365                let cp = state
1366                    .capacity_providers
1367                    .get_mut(&tail)
1368                    .ok_or_else(|| resource_not_found(&arn))?;
1369                cp.tags.retain(|t| !keys.contains(&t.key));
1370            }
1371            other => {
1372                return Err(invalid_parameter(format!(
1373                    "Unknown ECS resource type: {other}"
1374                )));
1375            }
1376        }
1377        Ok(AwsResponse::ok_json(json!({})))
1378    }
1379
1380    fn list_tags_for_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1381        let body = request.json_body();
1382        let arn = req_str(&body, "resourceArn")?.to_string();
1383        let (account, resource_type, tail) = decode_ecs_arn(&arn)?;
1384        let accounts = self.state.read();
1385        let state = accounts
1386            .get(&account)
1387            .ok_or_else(|| resource_not_found(&arn))?;
1388        let tags = match resource_type.as_str() {
1389            "cluster" => state
1390                .clusters
1391                .get(&tail)
1392                .map(|c| c.tags.clone())
1393                .ok_or_else(|| resource_not_found(&arn))?,
1394            "task-definition" => {
1395                let (family, rev) = parse_family_revision(&tail);
1396                let rev = rev.ok_or_else(|| {
1397                    invalid_parameter("task-definition ARN must include revision")
1398                })?;
1399                state
1400                    .task_definitions
1401                    .get(&family)
1402                    .and_then(|m| m.get(&rev))
1403                    .map(|td| td.tags.clone())
1404                    .ok_or_else(|| resource_not_found(&arn))?
1405            }
1406            "service" => {
1407                let key =
1408                    resolve_service_key(state, &tail).ok_or_else(|| resource_not_found(&arn))?;
1409                state
1410                    .services
1411                    .get(&key)
1412                    .map(|s| s.tags.clone())
1413                    .expect("resolved key exists")
1414            }
1415            "task" => {
1416                let task_id = tail.rsplit('/').next().unwrap_or(&tail).to_string();
1417                state
1418                    .tasks
1419                    .get(&task_id)
1420                    .map(|t| t.tags.clone())
1421                    .ok_or_else(|| resource_not_found(&arn))?
1422            }
1423            "task-set" => state
1424                .task_sets
1425                .get(&tail)
1426                .map(|t| t.tags.clone())
1427                .ok_or_else(|| resource_not_found(&arn))?,
1428            "container-instance" => {
1429                let key = resolve_container_instance_key(state, &tail)
1430                    .ok_or_else(|| resource_not_found(&arn))?;
1431                state
1432                    .container_instances
1433                    .get(&key)
1434                    .map(|c| c.tags.clone())
1435                    .expect("resolved key exists")
1436            }
1437            "capacity-provider" => state
1438                .capacity_providers
1439                .get(&tail)
1440                .map(|c| c.tags.clone())
1441                .ok_or_else(|| resource_not_found(&arn))?,
1442            other => {
1443                return Err(invalid_parameter(format!(
1444                    "Unknown ECS resource type: {other}"
1445                )));
1446            }
1447        };
1448        Ok(AwsResponse::ok_json(json!({"tags": tags_json(&tags)})))
1449    }
1450}
1451
1452fn resource_not_found(arn: &str) -> AwsServiceError {
1453    AwsServiceError::aws_error(
1454        StatusCode::BAD_REQUEST,
1455        "ClientException",
1456        format!("The referenced resource was not found: {arn}"),
1457    )
1458}
1459
1460// -------- operations: account settings --------
1461
1462impl EcsService {
1463    fn put_account_setting(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1464        let body = request.json_body();
1465        let name = req_str(&body, "name")?.to_string();
1466        let value = req_str(&body, "value")?.to_string();
1467        let principal_arn = opt_str(&body, "principalArn")
1468            .map(String::from)
1469            .or_else(|| request.principal.as_ref().map(|p| p.arn.clone()))
1470            .unwrap_or_else(|| Arn::global("iam", &request.account_id, "root").to_string());
1471        let account = request.account_id.clone();
1472        let mut accounts = self.state.write();
1473        let state = accounts.get_or_create(&account);
1474        state
1475            .principal_account_settings
1476            .entry(principal_arn.clone())
1477            .or_default()
1478            .insert(name.clone(), value.clone());
1479        Ok(AwsResponse::ok_json(json!({
1480            "setting": {
1481                "name": name,
1482                "value": value,
1483                "principalArn": principal_arn,
1484            }
1485        })))
1486    }
1487
1488    fn put_account_setting_default(
1489        &self,
1490        request: &AwsRequest,
1491    ) -> Result<AwsResponse, AwsServiceError> {
1492        let body = request.json_body();
1493        let name = req_str(&body, "name")?.to_string();
1494        let value = req_str(&body, "value")?.to_string();
1495        let account = request.account_id.clone();
1496        let mut accounts = self.state.write();
1497        let state = accounts.get_or_create(&account);
1498        state
1499            .account_setting_defaults
1500            .insert(name.clone(), value.clone());
1501        Ok(AwsResponse::ok_json(json!({
1502            "setting": {
1503                "name": name,
1504                "value": value,
1505                "principalArn": Arn::global("iam", &state.account_id, "root").to_string(),
1506            }
1507        })))
1508    }
1509
1510    fn delete_account_setting(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1511        let body = request.json_body();
1512        let name = req_str(&body, "name")?.to_string();
1513        let principal_arn = opt_str(&body, "principalArn")
1514            .map(String::from)
1515            .or_else(|| request.principal.as_ref().map(|p| p.arn.clone()))
1516            .unwrap_or_else(|| Arn::global("iam", &request.account_id, "root").to_string());
1517        let account = request.account_id.clone();
1518        let mut accounts = self.state.write();
1519        let state = accounts.get_or_create(&account);
1520        let removed_value = state
1521            .principal_account_settings
1522            .get_mut(&principal_arn)
1523            .and_then(|m| m.remove(&name));
1524        Ok(AwsResponse::ok_json(json!({
1525            "setting": {
1526                "name": name,
1527                "value": removed_value.unwrap_or_default(),
1528                "principalArn": principal_arn,
1529            }
1530        })))
1531    }
1532
1533    fn list_account_settings(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1534        let body = request.json_body();
1535        let name_filter = opt_str(&body, "name");
1536        let value_filter = opt_str(&body, "value");
1537        let principal_filter = opt_str(&body, "principalArn");
1538        let effective_only = body
1539            .get("effectiveSettings")
1540            .and_then(|v| v.as_bool())
1541            .unwrap_or(false);
1542
1543        let account = request.account_id.clone();
1544        let accounts = self.state.read();
1545        let Some(state) = accounts.get(&account) else {
1546            return Ok(AwsResponse::ok_json(json!({"settings": []})));
1547        };
1548        let root_arn = Arn::global("iam", &state.account_id, "root").to_string();
1549        let mut settings: Vec<Value> = Vec::new();
1550
1551        if effective_only {
1552            // Merge principal overrides onto defaults, scoped to principal_filter
1553            // when supplied; otherwise use the caller's own principal.
1554            let principal = principal_filter
1555                .map(String::from)
1556                .or_else(|| request.principal.as_ref().map(|p| p.arn.clone()))
1557                .unwrap_or_else(|| root_arn.clone());
1558            let mut merged = state.account_setting_defaults.clone();
1559            if let Some(overrides) = state.principal_account_settings.get(&principal) {
1560                for (k, v) in overrides {
1561                    merged.insert(k.clone(), v.clone());
1562                }
1563            }
1564            for (k, v) in merged {
1565                if matches_filter(name_filter, &k) && matches_filter(value_filter, &v) {
1566                    settings.push(json!({
1567                        "name": k,
1568                        "value": v,
1569                        "principalArn": principal,
1570                    }));
1571                }
1572            }
1573        } else {
1574            // Raw listing: include defaults (under the root ARN) plus any
1575            // principal-specific settings.
1576            for (k, v) in &state.account_setting_defaults {
1577                if matches_filter(name_filter, k)
1578                    && matches_filter(value_filter, v)
1579                    && (principal_filter.is_none() || principal_filter == Some(root_arn.as_str()))
1580                {
1581                    settings.push(json!({
1582                        "name": k,
1583                        "value": v,
1584                        "principalArn": root_arn,
1585                    }));
1586                }
1587            }
1588            for (principal, entries) in &state.principal_account_settings {
1589                if principal_filter.is_some_and(|pf| pf != principal) {
1590                    continue;
1591                }
1592                for (k, v) in entries {
1593                    if matches_filter(name_filter, k) && matches_filter(value_filter, v) {
1594                        settings.push(json!({
1595                            "name": k,
1596                            "value": v,
1597                            "principalArn": principal,
1598                        }));
1599                    }
1600                }
1601            }
1602        }
1603
1604        Ok(AwsResponse::ok_json(json!({"settings": settings})))
1605    }
1606}
1607
/// Returns `true` when no filter is set, or when the filter equals `value`
/// exactly.
fn matches_filter(filter: Option<&str>, value: &str) -> bool {
    match filter {
        None => true,
        Some(expected) => expected == value,
    }
}
1611
1612// -------- operations: tasks --------
1613
1614impl EcsService {
1615    fn run_task(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1616        let body = request.json_body();
1617        let td_ref = req_str(&body, "taskDefinition")?;
1618        let cluster_ref = opt_str(&body, "cluster");
1619        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
1620        let launch_type = opt_str(&body, "launchType")
1621            .unwrap_or("FARGATE")
1622            .to_string();
1623        let count = body
1624            .get("count")
1625            .and_then(|v| v.as_i64())
1626            .filter(|n| (1..=10).contains(n))
1627            .unwrap_or(1) as usize;
1628        let group = opt_str(&body, "group").map(String::from);
1629        let started_by = opt_str(&body, "startedBy").map(String::from);
1630        let tags = parse_tags(&body);
1631
1632        // PassRole trust check on any role overrides supplied via the
1633        // overrides.taskRoleArn / overrides.executionRoleArn fields.
1634        // The base task definition was already checked at Register time,
1635        // but RunTask can override either role and AWS re-validates the
1636        // trust policy on every call.
1637        if let Some(overrides) = body.get("overrides") {
1638            if let Some(role_arn) = opt_str(overrides, "taskRoleArn") {
1639                self.check_pass_role(&request.account_id, role_arn)?;
1640            }
1641            if let Some(role_arn) = opt_str(overrides, "executionRoleArn") {
1642                self.check_pass_role(&request.account_id, role_arn)?;
1643            }
1644        }
1645
1646        let account = request.account_id.clone();
1647        let runtime = self.runtime.clone();
1648        let mut accounts = self.state.write();
1649        let state = accounts.get_or_create(&account);
1650        let cluster_arn = state
1651            .clusters
1652            .get(&cluster_name)
1653            .map(|c| c.cluster_arn.clone())
1654            .unwrap_or_else(|| state.cluster_arn(&cluster_name));
1655        let (_, family, rev) = resolve_task_definition_ref(td_ref)?;
1656        let revisions = state
1657            .task_definitions
1658            .get(&family)
1659            .ok_or_else(|| task_definition_not_found(td_ref))?;
1660        let td = match rev {
1661            Some(n) => revisions
1662                .get(&n)
1663                .ok_or_else(|| task_definition_not_found(td_ref))?,
1664            None => latest_active_revision(revisions)
1665                .ok_or_else(|| task_definition_not_found(td_ref))?,
1666        };
1667        if td.status != "ACTIVE" {
1668            return Err(client_exception(format!(
1669                "Task definition {} is not ACTIVE",
1670                td.task_definition_arn
1671            )));
1672        }
1673        let td_arn = td.task_definition_arn.clone();
1674        let td_family = td.family.clone();
1675        let td_revision = td.revision;
1676        let td_cpu = td.cpu.clone();
1677        let td_memory = td.memory.clone();
1678        let td_task_role = td.task_role_arn.clone();
1679        let td_exec_role = td.execution_role_arn.clone();
1680        let td_containers = td.container_definitions.clone();
1681
1682        let mut spawned_tasks: Vec<String> = Vec::new();
1683        let mut task_jsons: Vec<Value> = Vec::new();
1684        for _ in 0..count {
1685            let task_id = uuid::Uuid::new_v4().to_string().replace('-', "");
1686            let task_arn = state.task_arn(&cluster_name, &task_id);
1687            let containers: Vec<Container> = td_containers
1688                .iter()
1689                .map(|def| Container {
1690                    container_arn: format!(
1691                        "arn:aws:ecs:{}:{}:container/{}/{}/{}",
1692                        state.region,
1693                        state.account_id,
1694                        cluster_name,
1695                        task_id,
1696                        def.get("name").and_then(|v| v.as_str()).unwrap_or("app")
1697                    ),
1698                    name: def
1699                        .get("name")
1700                        .and_then(|v| v.as_str())
1701                        .unwrap_or("app")
1702                        .to_string(),
1703                    image: def
1704                        .get("image")
1705                        .and_then(|v| v.as_str())
1706                        .unwrap_or("")
1707                        .to_string(),
1708                    task_arn: task_arn.clone(),
1709                    last_status: "PENDING".into(),
1710                    exit_code: None,
1711                    reason: None,
1712                    runtime_id: None,
1713                    essential: def
1714                        .get("essential")
1715                        .and_then(|v| v.as_bool())
1716                        .unwrap_or(true),
1717                    cpu: def
1718                        .get("cpu")
1719                        .and_then(|v| v.as_i64())
1720                        .map(|n| n.to_string()),
1721                    memory: def
1722                        .get("memory")
1723                        .and_then(|v| v.as_i64())
1724                        .map(|n| n.to_string()),
1725                    memory_reservation: def
1726                        .get("memoryReservation")
1727                        .and_then(|v| v.as_i64())
1728                        .map(|n| n.to_string()),
1729                    network_bindings: Vec::new(),
1730                    network_interfaces: Vec::new(),
1731                    health_status: Some("UNKNOWN".to_string()),
1732                    managed_agents: None,
1733                })
1734                .collect();
1735            let awslogs = td_containers.iter().find_map(|def| {
1736                let name = def.get("name").and_then(|v| v.as_str())?.to_string();
1737                let log_cfg = def.get("logConfiguration")?;
1738                if log_cfg.get("logDriver").and_then(|v| v.as_str()) != Some("awslogs") {
1739                    return None;
1740                }
1741                let opts = log_cfg.get("options").and_then(|v| v.as_object())?;
1742                Some(AwsLogsConfig {
1743                    group: opts.get("awslogs-group").and_then(|v| v.as_str())?.into(),
1744                    stream_prefix: opts
1745                        .get("awslogs-stream-prefix")
1746                        .and_then(|v| v.as_str())
1747                        .map(String::from),
1748                    region: opts
1749                        .get("awslogs-region")
1750                        .and_then(|v| v.as_str())
1751                        .unwrap_or(&state.region)
1752                        .to_string(),
1753                    container_name: name,
1754                })
1755            });
1756            let task = Task {
1757                task_arn: task_arn.clone(),
1758                task_id: task_id.clone(),
1759                cluster_arn: cluster_arn.clone(),
1760                cluster_name: cluster_name.clone(),
1761                task_definition_arn: td_arn.clone(),
1762                family: td_family.clone(),
1763                revision: td_revision,
1764                last_status: "PROVISIONING".into(),
1765                desired_status: "RUNNING".into(),
1766                launch_type: launch_type.clone(),
1767                platform_version: Some("1.4.0".into()),
1768                cpu: body
1769                    .get("overrides")
1770                    .and_then(|v| v.get("cpu"))
1771                    .and_then(|v| v.as_str())
1772                    .map(String::from)
1773                    .or_else(|| td_cpu.clone()),
1774                memory: body
1775                    .get("overrides")
1776                    .and_then(|v| v.get("memory"))
1777                    .and_then(|v| v.as_str())
1778                    .map(String::from)
1779                    .or_else(|| td_memory.clone()),
1780                containers,
1781                overrides: body.get("overrides").cloned().unwrap_or_else(|| json!({})),
1782                started_by: started_by.clone(),
1783                group: group.clone(),
1784                connectivity: "CONNECTING".into(),
1785                stop_code: None,
1786                stopped_reason: None,
1787                created_at: Utc::now(),
1788                started_at: None,
1789                stopping_at: None,
1790                stopped_at: None,
1791                pull_started_at: None,
1792                pull_stopped_at: None,
1793                connectivity_at: None,
1794                started_by_ref_id: None,
1795                execution_role_arn: td_exec_role.clone(),
1796                task_role_arn: td_task_role.clone(),
1797                tags: tags.clone(),
1798                awslogs,
1799                captured_logs: String::new(),
1800                protection: None,
1801            };
1802            state.tasks.insert(task_id.clone(), task.clone());
1803            if let Some(cluster) = state.clusters.get_mut(&cluster_name) {
1804                cluster.pending_tasks_count += 1;
1805            }
1806            // Snapshot-in-progress: transition to PENDING synchronously so
1807            // callers that immediately DescribeTasks see movement. RUNNING /
1808            // STOPPED come later from the background runtime task.
1809            if let Some(t) = state.tasks.get_mut(&task_id) {
1810                t.last_status = "PENDING".into();
1811            }
1812            task_jsons.push(task_to_json(&task));
1813            spawned_tasks.push(task_id.clone());
1814        }
1815        drop(accounts);
1816
1817        // Launch container execution outside the state lock.
1818        if let Some(rt) = runtime {
1819            for id in &spawned_tasks {
1820                rt.clone()
1821                    .run_task(self.state.clone(), id.clone(), account.clone());
1822            }
1823        } else {
1824            // No runtime available — fail fast so the task doesn't stay
1825            // PENDING forever. We incremented pending_tasks_count above;
1826            // decrement it here so the cluster counter doesn't drift and
1827            // block later DeleteCluster calls.
1828            let mut accounts = self.state.write();
1829            if let Some(state) = accounts.get_mut(&account) {
1830                let mut cluster_drains: Vec<String> = Vec::new();
1831                for id in &spawned_tasks {
1832                    if let Some(t) = state.tasks.get_mut(id) {
1833                        t.last_status = "STOPPED".into();
1834                        t.desired_status = "STOPPED".into();
1835                        t.stop_code = Some("TaskFailedToStart".into());
1836                        t.stopped_reason = Some(
1837                            "No container runtime available (docker/podman not installed)".into(),
1838                        );
1839                        t.stopped_at = Some(Utc::now());
1840                        for c in t.containers.iter_mut() {
1841                            c.last_status = "STOPPED".into();
1842                        }
1843                        cluster_drains.push(t.cluster_name.clone());
1844                    }
1845                }
1846                for name in cluster_drains {
1847                    if let Some(cluster) = state.clusters.get_mut(&name) {
1848                        if cluster.pending_tasks_count > 0 {
1849                            cluster.pending_tasks_count -= 1;
1850                        }
1851                    }
1852                }
1853            }
1854        }
1855
1856        Ok(AwsResponse::ok_json(json!({
1857            "tasks": task_jsons,
1858            "failures": [],
1859        })))
1860    }
1861
1862    fn start_task(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1863        // StartTask targets explicit container instances. Our ECS emulator
1864        // has no concept of registered container instances yet (Batch 4);
1865        // fall through to the same semantics as RunTask so the API is
1866        // usable while the container-instance surface is pending.
1867        self.run_task(request)
1868    }
1869
1870    async fn stop_task(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1871        let body = request.json_body();
1872        let task_ref = req_str(&body, "task")?;
1873        let reason = opt_str(&body, "reason")
1874            .unwrap_or("UserInitiated")
1875            .to_string();
1876        let cluster_ref = opt_str(&body, "cluster");
1877        let _cluster_name = EcsState::resolve_cluster_name(cluster_ref);
1878
1879        let (task_id, account, task_snapshot) = {
1880            let account = request.account_id.clone();
1881            let mut accounts = self.state.write();
1882            let state = accounts
1883                .get_mut(&account)
1884                .ok_or_else(|| task_not_found(task_ref))?;
1885            let task_id = resolve_task_id(state, task_ref)?;
1886            let task = state
1887                .tasks
1888                .get_mut(&task_id)
1889                .ok_or_else(|| task_not_found(task_ref))?;
1890            task.desired_status = "STOPPED".into();
1891            task.stopping_at = Some(Utc::now());
1892            task.stopped_reason = Some(reason.clone());
1893            task.stop_code = Some("UserInitiated".into());
1894            (task_id, account, task.clone())
1895        };
1896        if let Some(rt) = &self.runtime {
1897            rt.stop_task(&task_id, &reason).await;
1898        }
1899        let _ = account;
1900        Ok(AwsResponse::ok_json(json!({
1901            "task": task_to_json(&task_snapshot),
1902        })))
1903    }
1904
1905    fn describe_tasks(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1906        let body = request.json_body();
1907        let refs: Vec<String> = body
1908            .get("tasks")
1909            .and_then(|v| v.as_array())
1910            .map(|arr| {
1911                arr.iter()
1912                    .filter_map(|v| v.as_str().map(String::from))
1913                    .collect()
1914            })
1915            .unwrap_or_default();
1916        let include_tags = body
1917            .get("include")
1918            .and_then(|v| v.as_array())
1919            .map(|arr| arr.iter().any(|v| v.as_str() == Some("TAGS")))
1920            .unwrap_or(false);
1921
1922        let account = request.account_id.clone();
1923        let accounts = self.state.read();
1924        let Some(state) = accounts.get(&account) else {
1925            return Ok(AwsResponse::ok_json(json!({
1926                "tasks": [],
1927                "failures": refs.iter().map(|r| json!({"arn": r, "reason": "MISSING"})).collect::<Vec<_>>(),
1928            })));
1929        };
1930        let mut found = Vec::new();
1931        let mut failures = Vec::new();
1932        for input in &refs {
1933            let task_id = task_id_from_ref(input);
1934            match state.tasks.get(&task_id) {
1935                Some(t) => {
1936                    let mut v = task_to_json(t);
1937                    if include_tags {
1938                        v.as_object_mut()
1939                            .unwrap()
1940                            .insert("tags".into(), tags_json(&t.tags));
1941                    }
1942                    found.push(v);
1943                }
1944                None => {
1945                    failures.push(json!({
1946                        "arn": input,
1947                        "reason": "MISSING",
1948                    }));
1949                }
1950            }
1951        }
1952        Ok(AwsResponse::ok_json(json!({
1953            "tasks": found,
1954            "failures": failures,
1955        })))
1956    }
1957
1958    fn list_tasks(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1959        let body = request.json_body();
1960        let cluster_ref = opt_str(&body, "cluster");
1961        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
1962        let family = opt_str(&body, "family");
1963        let status_filter = opt_str(&body, "desiredStatus");
1964        let started_by = opt_str(&body, "startedBy");
1965        let max_results = body
1966            .get("maxResults")
1967            .and_then(|v| v.as_i64())
1968            .filter(|n| (1..=100).contains(n))
1969            .map(|n| n as usize)
1970            .unwrap_or(100);
1971        let next_token = opt_str(&body, "nextToken").unwrap_or("");
1972
1973        let account = request.account_id.clone();
1974        let accounts = self.state.read();
1975        let mut arns: Vec<String> = match accounts.get(&account) {
1976            Some(state) => state
1977                .tasks
1978                .values()
1979                .filter(|t| t.cluster_name == cluster_name)
1980                .filter(|t| family.is_none_or(|f| t.family == f))
1981                .filter(|t| status_filter.is_none_or(|s| t.desired_status == s))
1982                .filter(|t| started_by.is_none_or(|s| t.started_by.as_deref() == Some(s)))
1983                .map(|t| t.task_arn.clone())
1984                .collect(),
1985            None => Vec::new(),
1986        };
1987        arns.sort();
1988        let start = next_token.parse::<usize>().unwrap_or(0).min(arns.len());
1989        let end = (start + max_results).min(arns.len());
1990        let page = arns[start..end].to_vec();
1991        let mut out = json!({"taskArns": page});
1992        if end < arns.len() {
1993            out.as_object_mut()
1994                .unwrap()
1995                .insert("nextToken".into(), json!(end.to_string()));
1996        }
1997        Ok(AwsResponse::ok_json(out))
1998    }
1999}
2000
2001fn task_not_found(task_ref: &str) -> AwsServiceError {
2002    AwsServiceError::aws_error(
2003        StatusCode::BAD_REQUEST,
2004        "ClientException",
2005        format!("Task not found: {task_ref}"),
2006    )
2007}
2008
/// Strip cluster prefix + optional ARN prefix to recover the task UUID.
///
/// Accepts a bare task id, a `cluster/id` pair, or a full task ARN and
/// returns whatever follows the final `/`. Input without a `/` is returned
/// unchanged; input ending in `/` yields an empty string.
fn task_id_from_ref(input: &str) -> String {
    // `rsplit_once` is `None` only when there is no '/', in which case the
    // whole input is already the id, so no separate fallback path is needed.
    input
        .rsplit_once('/')
        .map_or(input, |(_, id)| id)
        .to_string()
}
2016
2017fn resolve_task_id(state: &EcsState, task_ref: &str) -> Result<String, AwsServiceError> {
2018    let id = task_id_from_ref(task_ref);
2019    if state.tasks.contains_key(&id) {
2020        Ok(id)
2021    } else {
2022        Err(task_not_found(task_ref))
2023    }
2024}
2025
2026fn task_to_json(task: &Task) -> Value {
2027    let mut map = serde_json::Map::new();
2028    map.insert("taskArn".into(), json!(task.task_arn));
2029    map.insert("clusterArn".into(), json!(task.cluster_arn));
2030    map.insert("taskDefinitionArn".into(), json!(task.task_definition_arn));
2031    map.insert("lastStatus".into(), json!(task.last_status));
2032    map.insert("desiredStatus".into(), json!(task.desired_status));
2033    map.insert("launchType".into(), json!(task.launch_type));
2034    if let Some(ref v) = task.platform_version {
2035        map.insert("platformVersion".into(), json!(v));
2036    }
2037    if let Some(ref v) = task.cpu {
2038        map.insert("cpu".into(), json!(v));
2039    }
2040    if let Some(ref v) = task.memory {
2041        map.insert("memory".into(), json!(v));
2042    }
2043    map.insert(
2044        "containers".into(),
2045        Value::Array(task.containers.iter().map(container_to_json).collect()),
2046    );
2047    map.insert("overrides".into(), task.overrides.clone());
2048    if let Some(ref v) = task.started_by {
2049        map.insert("startedBy".into(), json!(v));
2050    }
2051    if let Some(ref v) = task.group {
2052        map.insert("group".into(), json!(v));
2053    }
2054    map.insert("connectivity".into(), json!(task.connectivity));
2055    if let Some(ref v) = task.stop_code {
2056        map.insert("stopCode".into(), json!(v));
2057    }
2058    if let Some(ref v) = task.stopped_reason {
2059        map.insert("stoppedReason".into(), json!(v));
2060    }
2061    if let Some(ref v) = task.task_role_arn {
2062        map.insert("taskRoleArn".into(), json!(v));
2063    }
2064    if let Some(ref v) = task.execution_role_arn {
2065        map.insert("executionRoleArn".into(), json!(v));
2066    }
2067    map.insert("createdAt".into(), json!(task.created_at.timestamp()));
2068    if let Some(ts) = task.started_at {
2069        map.insert("startedAt".into(), json!(ts.timestamp()));
2070    }
2071    if let Some(ts) = task.stopping_at {
2072        map.insert("stoppingAt".into(), json!(ts.timestamp()));
2073    }
2074    if let Some(ts) = task.stopped_at {
2075        map.insert("stoppedAt".into(), json!(ts.timestamp()));
2076    }
2077    if let Some(ts) = task.pull_started_at {
2078        map.insert("pullStartedAt".into(), json!(ts.timestamp()));
2079    }
2080    if let Some(ts) = task.pull_stopped_at {
2081        map.insert("pullStoppedAt".into(), json!(ts.timestamp()));
2082    }
2083    if let Some(ts) = task.connectivity_at {
2084        map.insert("connectivityAt".into(), json!(ts.timestamp()));
2085    }
2086    Value::Object(map)
2087}
2088
2089fn container_to_json(container: &Container) -> Value {
2090    let mut map = serde_json::Map::new();
2091    map.insert("containerArn".into(), json!(container.container_arn));
2092    map.insert("taskArn".into(), json!(container.task_arn));
2093    map.insert("name".into(), json!(container.name));
2094    map.insert("image".into(), json!(container.image));
2095    map.insert("lastStatus".into(), json!(container.last_status));
2096    map.insert("essential".into(), json!(container.essential));
2097    if let Some(code) = container.exit_code {
2098        map.insert("exitCode".into(), json!(code));
2099    }
2100    if let Some(ref r) = container.reason {
2101        map.insert("reason".into(), json!(r));
2102    }
2103    if let Some(ref id) = container.runtime_id {
2104        map.insert("runtimeId".into(), json!(id));
2105    }
2106    if let Some(ref v) = container.cpu {
2107        map.insert("cpu".into(), json!(v));
2108    }
2109    if let Some(ref v) = container.memory {
2110        map.insert("memory".into(), json!(v));
2111    }
2112    if let Some(ref v) = container.memory_reservation {
2113        map.insert("memoryReservation".into(), json!(v));
2114    }
2115    map.insert("networkBindings".into(), json!(container.network_bindings));
2116    map.insert(
2117        "networkInterfaces".into(),
2118        json!(container.network_interfaces),
2119    );
2120    if let Some(ref v) = container.health_status {
2121        map.insert("healthStatus".into(), json!(v));
2122    }
2123    Value::Object(map)
2124}
2125
2126// -------- operations: services --------
2127
2128impl EcsService {
2129    fn create_service(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2130        let body = request.json_body();
2131        let service_name = req_str(&body, "serviceName")?.to_string();
2132        validate_service_name(&service_name)?;
2133        let td_ref = req_str(&body, "taskDefinition")?;
2134        let cluster_ref = opt_str(&body, "cluster");
2135        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
2136        let desired_count = body
2137            .get("desiredCount")
2138            .and_then(|v| v.as_i64())
2139            .filter(|n| *n >= 0)
2140            .unwrap_or(1) as i32;
2141        let launch_type = opt_str(&body, "launchType")
2142            .unwrap_or("FARGATE")
2143            .to_string();
2144        let scheduling = opt_str(&body, "schedulingStrategy")
2145            .unwrap_or("REPLICA")
2146            .to_string();
2147        let deployment_controller = body
2148            .get("deploymentController")
2149            .and_then(|v| v.get("type"))
2150            .and_then(|v| v.as_str())
2151            .unwrap_or("ECS")
2152            .to_string();
2153        let deployment_config = body.get("deploymentConfiguration");
2154        let min_healthy = deployment_config
2155            .and_then(|d| d.get("minimumHealthyPercent"))
2156            .and_then(|v| v.as_i64())
2157            .map(|n| n as i32);
2158        let max_percent = deployment_config
2159            .and_then(|d| d.get("maximumPercent"))
2160            .and_then(|v| v.as_i64())
2161            .map(|n| n as i32);
2162        let circuit = deployment_config.and_then(|d| d.get("deploymentCircuitBreaker"));
2163        let circuit_breaker = circuit.map(|c| CircuitBreakerConfig {
2164            enable: c.get("enable").and_then(|v| v.as_bool()).unwrap_or(false),
2165            rollback: c.get("rollback").and_then(|v| v.as_bool()).unwrap_or(false),
2166        });
2167        let tags = parse_tags(&body);
2168        let role_arn = opt_str(&body, "role").map(String::from);
2169        let load_balancers: Vec<Value> = body
2170            .get("loadBalancers")
2171            .and_then(|v| v.as_array())
2172            .cloned()
2173            .unwrap_or_default();
2174        let service_registries: Vec<Value> = body
2175            .get("serviceRegistries")
2176            .and_then(|v| v.as_array())
2177            .cloned()
2178            .unwrap_or_default();
2179        let placement_constraints: Vec<Value> = body
2180            .get("placementConstraints")
2181            .and_then(|v| v.as_array())
2182            .cloned()
2183            .unwrap_or_default();
2184        let placement_strategy: Vec<Value> = body
2185            .get("placementStrategy")
2186            .and_then(|v| v.as_array())
2187            .cloned()
2188            .unwrap_or_default();
2189        let network_configuration = body.get("networkConfiguration").cloned();
2190
2191        let runtime = self.runtime.clone();
2192        let account = request.account_id.clone();
2193        let principal_arn = request
2194            .principal
2195            .as_ref()
2196            .map(|p| p.arn.clone())
2197            .unwrap_or_else(|| Arn::global("iam", &request.account_id, "root").to_string());
2198
2199        let (service_json, spawn_task_ids) = {
2200            let mut accounts = self.state.write();
2201            let state = accounts.get_or_create(&account);
2202            // Resolve task definition to get family/revision.
2203            let (_, family, rev) = resolve_task_definition_ref(td_ref)?;
2204            let revisions = state
2205                .task_definitions
2206                .get(&family)
2207                .ok_or_else(|| task_definition_not_found(td_ref))?;
2208            let td = match rev {
2209                Some(n) => revisions
2210                    .get(&n)
2211                    .ok_or_else(|| task_definition_not_found(td_ref))?,
2212                None => latest_active_revision(revisions)
2213                    .ok_or_else(|| task_definition_not_found(td_ref))?,
2214            };
2215            let td_arn = td.task_definition_arn.clone();
2216            let td_family = td.family.clone();
2217            let td_revision = td.revision;
2218            let cluster_arn = state
2219                .clusters
2220                .get(&cluster_name)
2221                .map(|c| c.cluster_arn.clone())
2222                .unwrap_or_else(|| state.cluster_arn(&cluster_name));
2223            let service_arn = state.service_arn(&cluster_name, &service_name);
2224            let key = EcsState::service_key(&cluster_name, &service_name);
2225            if let Some(existing) = state.services.get(&key) {
2226                if existing.status != "INACTIVE" {
2227                    return Err(service_already_exists(&service_name));
2228                }
2229            }
2230            let deployment = Deployment {
2231                deployment_id: format!(
2232                    "ecs-svc/{}",
2233                    uuid::Uuid::new_v4().as_u128() & 0xffff_ffff_ffff_ffff
2234                ),
2235                status: "PRIMARY".into(),
2236                task_definition_arn: td_arn.clone(),
2237                desired_count,
2238                pending_count: 0,
2239                running_count: 0,
2240                failed_tasks: 0,
2241                created_at: Utc::now(),
2242                updated_at: Utc::now(),
2243                launch_type: launch_type.clone(),
2244                rollout_state: "IN_PROGRESS".into(),
2245                rollout_state_reason: Some("ECS deployment in progress.".into()),
2246            };
2247            let service = Service {
2248                service_name: service_name.clone(),
2249                service_arn: service_arn.clone(),
2250                cluster_name: cluster_name.clone(),
2251                cluster_arn: cluster_arn.clone(),
2252                task_definition_arn: td_arn,
2253                family: td_family,
2254                revision: td_revision,
2255                desired_count,
2256                running_count: 0,
2257                pending_count: 0,
2258                launch_type: launch_type.clone(),
2259                status: "ACTIVE".into(),
2260                scheduling_strategy: scheduling,
2261                deployment_controller,
2262                minimum_healthy_percent: min_healthy,
2263                maximum_percent: max_percent,
2264                circuit_breaker,
2265                deployments: vec![deployment],
2266                load_balancers,
2267                service_registries,
2268                placement_constraints,
2269                placement_strategy,
2270                network_configuration,
2271                tags: tags.clone(),
2272                created_at: Utc::now(),
2273                created_by: Some(principal_arn.clone()),
2274                role_arn,
2275            };
2276            state.services.insert(key.clone(), service.clone());
2277            if let Some(cluster) = state.clusters.get_mut(&cluster_name) {
2278                cluster.active_services_count += 1;
2279            }
2280            state.push_event(crate::state::LifecycleEvent {
2281                at: Utc::now(),
2282                event_type: "ServiceCreated".into(),
2283                task_arn: None,
2284                cluster_arn: Some(cluster_arn),
2285                last_status: Some("ACTIVE".into()),
2286                detail: json!({"serviceArn": service_arn, "desiredCount": desired_count}),
2287            });
2288            let ids =
2289                spawn_service_tasks(state, &service, desired_count, &principal_arn, &launch_type);
2290            (service_to_json(state.services.get(&key).unwrap()), ids)
2291        };
2292
2293        if let Some(rt) = runtime {
2294            for id in spawn_task_ids {
2295                rt.clone().run_task(self.state.clone(), id, account.clone());
2296            }
2297        }
2298
2299        Ok(AwsResponse::ok_json(json!({ "service": service_json })))
2300    }
2301
2302    fn update_service(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2303        let body = request.json_body();
2304        let service_ref = req_str(&body, "service")?;
2305        let service_name = service_name_from_ref(service_ref);
2306        let cluster_ref = opt_str(&body, "cluster");
2307        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
2308        let new_desired = body.get("desiredCount").and_then(|v| v.as_i64());
2309        let new_td_ref = opt_str(&body, "taskDefinition");
2310        let account = request.account_id.clone();
2311        let principal_arn = request
2312            .principal
2313            .as_ref()
2314            .map(|p| p.arn.clone())
2315            .unwrap_or_else(|| Arn::global("iam", &request.account_id, "root").to_string());
2316        let runtime = self.runtime.clone();
2317
2318        let (service_json, spawn_ids, stop_ids) = {
2319            let mut accounts = self.state.write();
2320            let state = accounts
2321                .get_mut(&account)
2322                .ok_or_else(|| service_not_found(&service_name))?;
2323            let key = EcsState::service_key(&cluster_name, &service_name);
2324            if !state.services.contains_key(&key) {
2325                return Err(service_not_found(&service_name));
2326            }
2327
2328            // Resolve new task definition (may stay on current one).
2329            let (new_td_arn, new_family, new_revision) = if let Some(td_ref) = new_td_ref {
2330                let (_, family, rev) = resolve_task_definition_ref(td_ref)?;
2331                let revisions = state
2332                    .task_definitions
2333                    .get(&family)
2334                    .ok_or_else(|| task_definition_not_found(td_ref))?;
2335                let td = match rev {
2336                    Some(n) => revisions
2337                        .get(&n)
2338                        .ok_or_else(|| task_definition_not_found(td_ref))?,
2339                    None => latest_active_revision(revisions)
2340                        .ok_or_else(|| task_definition_not_found(td_ref))?,
2341                };
2342                (
2343                    Some(td.task_definition_arn.clone()),
2344                    td.family.clone(),
2345                    td.revision,
2346                )
2347            } else {
2348                let svc = state.services.get(&key).unwrap();
2349                (None, svc.family.clone(), svc.revision)
2350            };
2351
2352            let service_cluster_arn;
2353            let launch_type_clone;
2354            let effective_desired;
2355            let old_desired;
2356            let mut old_deployments_drained: Vec<String> = Vec::new();
2357            let mut new_deployment_triggered = false;
2358
2359            {
2360                let svc = state.services.get_mut(&key).unwrap();
2361                old_desired = svc.desired_count;
2362                service_cluster_arn = svc.cluster_arn.clone();
2363                launch_type_clone = svc.launch_type.clone();
2364
2365                if let Some(n) = new_desired {
2366                    let n = n.max(0) as i32;
2367                    svc.desired_count = n;
2368                    if let Some(d) = svc.deployments.iter_mut().find(|d| d.status == "PRIMARY") {
2369                        d.desired_count = n;
2370                        d.updated_at = Utc::now();
2371                    }
2372                }
2373
2374                if let Some(arn) = new_td_arn.clone() {
2375                    // Roll a new PRIMARY deployment; mark the previous one ACTIVE
2376                    // so it's eligible for drain once the new deployment ramps.
2377                    for d in svc.deployments.iter_mut() {
2378                        if d.status == "PRIMARY" {
2379                            d.status = "ACTIVE".into();
2380                            old_deployments_drained.push(d.deployment_id.clone());
2381                        }
2382                    }
2383                    svc.deployments.insert(
2384                        0,
2385                        Deployment {
2386                            deployment_id: format!(
2387                                "ecs-svc/{}",
2388                                uuid::Uuid::new_v4().as_u128() & 0xffff_ffff_ffff_ffff
2389                            ),
2390                            status: "PRIMARY".into(),
2391                            task_definition_arn: arn.clone(),
2392                            desired_count: svc.desired_count,
2393                            pending_count: 0,
2394                            running_count: 0,
2395                            failed_tasks: 0,
2396                            created_at: Utc::now(),
2397                            updated_at: Utc::now(),
2398                            launch_type: svc.launch_type.clone(),
2399                            rollout_state: "IN_PROGRESS".into(),
2400                            rollout_state_reason: Some("ECS deployment in progress.".into()),
2401                        },
2402                    );
2403                    svc.task_definition_arn = arn;
2404                    svc.family = new_family;
2405                    svc.revision = new_revision;
2406                    new_deployment_triggered = true;
2407                }
2408
2409                effective_desired = svc.desired_count;
2410            }
2411
2412            // Compute spawn + stop plan.
2413            let mut spawn: Vec<String> = Vec::new();
2414            let mut stop: Vec<String> = Vec::new();
2415
2416            // Tasks belonging to this service (by startedBy convention):
2417            let service_tag = format!("ecs-svc/{}", service_name);
2418            let current_tasks: Vec<(String, String)> = state
2419                .tasks
2420                .iter()
2421                .filter(|(_, t)| {
2422                    t.started_by.as_deref() == Some(service_tag.as_str())
2423                        && t.cluster_name == cluster_name
2424                        && t.last_status != "STOPPED"
2425                })
2426                .map(|(id, t)| (id.clone(), t.task_definition_arn.clone()))
2427                .collect();
2428
2429            let current_count = current_tasks.len() as i32;
2430            if effective_desired > current_count {
2431                let add = (effective_desired - current_count) as usize;
2432                let svc_snapshot = state.services.get(&key).unwrap().clone();
2433                let mut new_ids = spawn_service_tasks(
2434                    state,
2435                    &svc_snapshot,
2436                    add as i32,
2437                    &principal_arn,
2438                    &launch_type_clone,
2439                );
2440                spawn.append(&mut new_ids);
2441            } else if effective_desired < current_count {
2442                let remove = (current_count - effective_desired) as usize;
2443                for (id, _) in current_tasks.iter().take(remove) {
2444                    stop.push(id.clone());
2445                }
2446            }
2447
2448            // If a new deployment was triggered, also stop tasks still on
2449            // the old task definition so the new deployment can ramp up,
2450            // then spawn replacements — but only enough to hit the
2451            // effective desired count (not `stop.len()`, which conflates
2452            // scale-down drain with TD-drain and would over-spawn).
2453            if new_deployment_triggered {
2454                let new_td_arn_match = state
2455                    .services
2456                    .get(&key)
2457                    .unwrap()
2458                    .task_definition_arn
2459                    .clone();
2460                // Tasks already on the new task definition that we're NOT
2461                // stopping (scale-down may have picked the first N; those
2462                // are skipped here via `stop.contains(id)`).
2463                let kept_on_new_td: i32 = current_tasks
2464                    .iter()
2465                    .filter(|(id, t_arn)| *t_arn == new_td_arn_match && !stop.contains(id))
2466                    .count() as i32;
2467                for (id, t_arn) in &current_tasks {
2468                    if *t_arn != new_td_arn_match && !stop.contains(id) {
2469                        stop.push(id.clone());
2470                    }
2471                }
2472                let already_spawned = spawn.len() as i32;
2473                let need = (effective_desired - kept_on_new_td - already_spawned).max(0);
2474                if need > 0 {
2475                    let svc_snapshot = state.services.get(&key).unwrap().clone();
2476                    let mut more = spawn_service_tasks(
2477                        state,
2478                        &svc_snapshot,
2479                        need,
2480                        &principal_arn,
2481                        &launch_type_clone,
2482                    );
2483                    spawn.append(&mut more);
2484                }
2485            }
2486
2487            state.push_event(crate::state::LifecycleEvent {
2488                at: Utc::now(),
2489                event_type: "ServiceUpdated".into(),
2490                task_arn: None,
2491                cluster_arn: Some(service_cluster_arn),
2492                last_status: Some("ACTIVE".into()),
2493                detail: json!({
2494                    "serviceArn": state.services.get(&key).unwrap().service_arn,
2495                    "desiredCount": effective_desired,
2496                    "previousDesiredCount": old_desired,
2497                    "newDeployment": new_deployment_triggered,
2498                    "drainedDeployments": old_deployments_drained,
2499                }),
2500            });
2501
2502            let svc = state.services.get(&key).unwrap();
2503            (service_to_json(svc), spawn, stop)
2504        };
2505
2506        if let Some(rt) = runtime {
2507            for id in spawn_ids {
2508                rt.clone().run_task(self.state.clone(), id, account.clone());
2509            }
2510            for id in stop_ids {
2511                let rt2 = rt.clone();
2512                let id_clone = id.clone();
2513                tokio::spawn(async move {
2514                    rt2.stop_task(&id_clone, "ECS service scale-down").await;
2515                });
2516                // Flip the task's desired status synchronously so DescribeTasks reflects intent.
2517                let mut accounts = self.state.write();
2518                if let Some(state) = accounts.get_mut(&account) {
2519                    if let Some(task) = state.tasks.get_mut(&id) {
2520                        task.desired_status = "STOPPED".into();
2521                        task.stopping_at = Some(Utc::now());
2522                    }
2523                }
2524            }
2525        }
2526
2527        Ok(AwsResponse::ok_json(json!({ "service": service_json })))
2528    }
2529
2530    async fn delete_service(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2531        let body = request.json_body();
2532        let service_ref = req_str(&body, "service")?;
2533        let service_name = service_name_from_ref(service_ref);
2534        let cluster_ref = opt_str(&body, "cluster");
2535        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
2536        let force = body.get("force").and_then(|v| v.as_bool()).unwrap_or(false);
2537
2538        let (snapshot, task_ids_to_stop) = {
2539            let mut accounts = self.state.write();
2540            let state = accounts
2541                .get_mut(&request.account_id)
2542                .ok_or_else(|| service_not_found(&service_name))?;
2543            let key = EcsState::service_key(&cluster_name, &service_name);
2544            let svc = state
2545                .services
2546                .get_mut(&key)
2547                .ok_or_else(|| service_not_found(&service_name))?;
2548            if !force && svc.desired_count > 0 {
2549                return Err(client_exception(
2550                    "The service cannot be stopped while it is scaled above 0. \
2551                     Either set desiredCount to 0 first, or pass force=true.",
2552                ));
2553            }
2554            svc.desired_count = 0;
2555            svc.status = "DRAINING".into();
2556            let service_tag = format!("ecs-svc/{}", service_name);
2557            let stop_ids: Vec<String> = state
2558                .tasks
2559                .iter()
2560                .filter(|(_, t)| {
2561                    t.started_by.as_deref() == Some(service_tag.as_str())
2562                        && t.cluster_name == cluster_name
2563                        && t.last_status != "STOPPED"
2564                })
2565                .map(|(id, _)| id.clone())
2566                .collect();
2567            if let Some(cluster) = state.clusters.get_mut(&cluster_name) {
2568                if cluster.active_services_count > 0 {
2569                    cluster.active_services_count -= 1;
2570                }
2571            }
2572            let svc_snapshot = state.services.get(&key).unwrap().clone();
2573            state.services.remove(&key);
2574            state.push_event(crate::state::LifecycleEvent {
2575                at: Utc::now(),
2576                event_type: "ServiceDeleted".into(),
2577                task_arn: None,
2578                cluster_arn: Some(svc_snapshot.cluster_arn.clone()),
2579                last_status: Some("DRAINING".into()),
2580                detail: json!({"serviceArn": svc_snapshot.service_arn}),
2581            });
2582            (svc_snapshot, stop_ids)
2583        };
2584
2585        if let Some(rt) = &self.runtime {
2586            for id in &task_ids_to_stop {
2587                rt.stop_task(id, "ECS service deletion").await;
2588                let mut accounts = self.state.write();
2589                if let Some(state) = accounts.get_mut(&request.account_id) {
2590                    if let Some(task) = state.tasks.get_mut(id) {
2591                        task.desired_status = "STOPPED".into();
2592                        task.stopping_at = Some(Utc::now());
2593                    }
2594                }
2595            }
2596        }
2597
2598        Ok(AwsResponse::ok_json(
2599            json!({ "service": service_to_json(&snapshot) }),
2600        ))
2601    }
2602
2603    fn describe_services(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2604        let body = request.json_body();
2605        let cluster_ref = opt_str(&body, "cluster");
2606        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
2607        let refs: Vec<String> = body
2608            .get("services")
2609            .and_then(|v| v.as_array())
2610            .map(|arr| {
2611                arr.iter()
2612                    .filter_map(|v| v.as_str().map(String::from))
2613                    .collect()
2614            })
2615            .unwrap_or_default();
2616
2617        let account = request.account_id.clone();
2618        let accounts = self.state.read();
2619        let mut found = Vec::new();
2620        let mut failures = Vec::new();
2621        let Some(state) = accounts.get(&account) else {
2622            for r in &refs {
2623                failures.push(json!({"arn": r, "reason": "MISSING"}));
2624            }
2625            return Ok(AwsResponse::ok_json(
2626                json!({"services": found, "failures": failures}),
2627            ));
2628        };
2629        for r in &refs {
2630            let name = service_name_from_ref(r);
2631            let key = EcsState::service_key(&cluster_name, &name);
2632            match state.services.get(&key) {
2633                Some(svc) => {
2634                    let mut v = service_to_json(svc);
2635                    // Update derived running/pending counts from current tasks.
2636                    recompute_service_counts(state, &name, &cluster_name, &mut v);
2637                    found.push(v);
2638                }
2639                None => failures.push(json!({"arn": r, "reason": "MISSING"})),
2640            }
2641        }
2642        Ok(AwsResponse::ok_json(json!({
2643            "services": found,
2644            "failures": failures,
2645        })))
2646    }
2647
2648    fn list_services(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2649        let body = request.json_body();
2650        let cluster_ref = opt_str(&body, "cluster");
2651        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
2652        let launch_type = opt_str(&body, "launchType");
2653        let scheduling = opt_str(&body, "schedulingStrategy");
2654        let max_results = body
2655            .get("maxResults")
2656            .and_then(|v| v.as_i64())
2657            .filter(|n| (1..=100).contains(n))
2658            .map(|n| n as usize)
2659            .unwrap_or(100);
2660        let next_token = opt_str(&body, "nextToken").unwrap_or("");
2661
2662        let account = request.account_id.clone();
2663        let accounts = self.state.read();
2664        let mut arns: Vec<String> = match accounts.get(&account) {
2665            Some(state) => state
2666                .services
2667                .values()
2668                .filter(|s| s.cluster_name == cluster_name)
2669                .filter(|s| launch_type.is_none_or(|lt| s.launch_type == lt))
2670                .filter(|s| scheduling.is_none_or(|sc| s.scheduling_strategy == sc))
2671                .map(|s| s.service_arn.clone())
2672                .collect(),
2673            None => Vec::new(),
2674        };
2675        arns.sort();
2676        let start = next_token.parse::<usize>().unwrap_or(0).min(arns.len());
2677        let end = (start + max_results).min(arns.len());
2678        let page = arns[start..end].to_vec();
2679        let mut out = json!({"serviceArns": page});
2680        if end < arns.len() {
2681            out.as_object_mut()
2682                .unwrap()
2683                .insert("nextToken".into(), json!(end.to_string()));
2684        }
2685        Ok(AwsResponse::ok_json(out))
2686    }
2687
2688    fn list_services_by_namespace(
2689        &self,
2690        request: &AwsRequest,
2691    ) -> Result<AwsResponse, AwsServiceError> {
2692        // fakecloud doesn't model Cloud Map namespaces yet — return all
2693        // services attached to services with registries pointing at the
2694        // given namespace ARN. Filter is loose: treat the namespace as a
2695        // hint and return every service when none match to mirror AWS's
2696        // "loose match when ambiguous" response shape.
2697        let body = request.json_body();
2698        let namespace = req_str(&body, "namespace")?.to_string();
2699        let account = request.account_id.clone();
2700        let accounts = self.state.read();
2701        let mut arns: Vec<String> = match accounts.get(&account) {
2702            Some(state) => state
2703                .services
2704                .values()
2705                .filter(|s| {
2706                    s.service_registries.iter().any(|r| {
2707                        r.get("registryArn")
2708                            .and_then(|v| v.as_str())
2709                            .is_some_and(|arn| arn.contains(&namespace))
2710                    })
2711                })
2712                .map(|s| s.service_arn.clone())
2713                .collect(),
2714            None => Vec::new(),
2715        };
2716        arns.sort();
2717        Ok(AwsResponse::ok_json(json!({"serviceArns": arns})))
2718    }
2719}
2720
2721fn validate_service_name(name: &str) -> Result<(), AwsServiceError> {
2722    if name.is_empty() || name.len() > 255 {
2723        return Err(invalid_parameter("Service name must be 1-255 characters"));
2724    }
2725    let ok = name
2726        .chars()
2727        .all(|c| c.is_ascii_alphanumeric() || c == '_' || c == '-');
2728    if !ok {
2729        return Err(invalid_parameter(
2730            "Service name may only contain letters, numbers, hyphens, and underscores",
2731        ));
2732    }
2733    Ok(())
2734}
2735
/// Extracts the bare service name from a service reference.
///
/// Returns everything after the last `/` in `input`; an input without any
/// `/` is returned unchanged. A reference that ends in `/` therefore yields
/// an empty name.
fn service_name_from_ref(input: &str) -> String {
    input
        .rsplit_once('/')
        .map_or(input, |(_, name)| name)
        .to_string()
}
2742
2743fn service_not_found(name: &str) -> AwsServiceError {
2744    AwsServiceError::aws_error(
2745        StatusCode::BAD_REQUEST,
2746        "ServiceNotFoundException",
2747        format!("The service could not be found: {name}"),
2748    )
2749}
2750
2751fn service_already_exists(name: &str) -> AwsServiceError {
2752    AwsServiceError::aws_error(
2753        StatusCode::BAD_REQUEST,
2754        "ServiceNotActiveException",
2755        format!("The service {name} already exists"),
2756    )
2757}
2758
2759/// Spawn N tasks for a service by cloning the task-definition containers
2760/// and inserting `Task` rows in the shared state. The task IDs are
2761/// returned so the caller can hand them to `EcsRuntime::run_task` after
2762/// releasing the state write lock.
2763fn spawn_service_tasks(
2764    state: &mut EcsState,
2765    service: &Service,
2766    count: i32,
2767    principal_arn: &str,
2768    launch_type: &str,
2769) -> Vec<String> {
2770    if count <= 0 {
2771        return Vec::new();
2772    }
2773    let Some(revisions) = state.task_definitions.get(&service.family) else {
2774        return Vec::new();
2775    };
2776    let Some(td) = revisions.get(&service.revision) else {
2777        return Vec::new();
2778    };
2779    let container_defs = td.container_definitions.clone();
2780    let cpu = td.cpu.clone();
2781    let memory = td.memory.clone();
2782    let task_role = td.task_role_arn.clone();
2783    let exec_role = td.execution_role_arn.clone();
2784    let cluster_name = service.cluster_name.clone();
2785    let cluster_arn = service.cluster_arn.clone();
2786    let td_arn = service.task_definition_arn.clone();
2787    let family = service.family.clone();
2788    let revision = service.revision;
2789    let service_tag = format!("ecs-svc/{}", service.service_name);
2790
2791    let mut ids = Vec::with_capacity(count as usize);
2792    for _ in 0..count {
2793        let task_id = uuid::Uuid::new_v4().to_string().replace('-', "");
2794        let task_arn = state.task_arn(&cluster_name, &task_id);
2795        let containers: Vec<Container> = container_defs
2796            .iter()
2797            .map(|def| Container {
2798                container_arn: format!(
2799                    "arn:aws:ecs:{}:{}:container/{}/{}/{}",
2800                    state.region,
2801                    state.account_id,
2802                    cluster_name,
2803                    task_id,
2804                    def.get("name").and_then(|v| v.as_str()).unwrap_or("app")
2805                ),
2806                name: def
2807                    .get("name")
2808                    .and_then(|v| v.as_str())
2809                    .unwrap_or("app")
2810                    .to_string(),
2811                image: def
2812                    .get("image")
2813                    .and_then(|v| v.as_str())
2814                    .unwrap_or("")
2815                    .to_string(),
2816                task_arn: task_arn.clone(),
2817                last_status: "PENDING".into(),
2818                exit_code: None,
2819                reason: None,
2820                runtime_id: None,
2821                essential: def
2822                    .get("essential")
2823                    .and_then(|v| v.as_bool())
2824                    .unwrap_or(true),
2825                cpu: def
2826                    .get("cpu")
2827                    .and_then(|v| v.as_i64())
2828                    .map(|n| n.to_string()),
2829                memory: def
2830                    .get("memory")
2831                    .and_then(|v| v.as_i64())
2832                    .map(|n| n.to_string()),
2833                memory_reservation: def
2834                    .get("memoryReservation")
2835                    .and_then(|v| v.as_i64())
2836                    .map(|n| n.to_string()),
2837                network_bindings: Vec::new(),
2838                network_interfaces: Vec::new(),
2839                health_status: Some("UNKNOWN".into()),
2840                managed_agents: None,
2841            })
2842            .collect();
2843        let awslogs = container_defs.iter().find_map(|def| {
2844            let name = def.get("name").and_then(|v| v.as_str())?.to_string();
2845            let log_cfg = def.get("logConfiguration")?;
2846            if log_cfg.get("logDriver").and_then(|v| v.as_str()) != Some("awslogs") {
2847                return None;
2848            }
2849            let opts = log_cfg.get("options").and_then(|v| v.as_object())?;
2850            Some(AwsLogsConfig {
2851                group: opts.get("awslogs-group").and_then(|v| v.as_str())?.into(),
2852                stream_prefix: opts
2853                    .get("awslogs-stream-prefix")
2854                    .and_then(|v| v.as_str())
2855                    .map(String::from),
2856                region: opts
2857                    .get("awslogs-region")
2858                    .and_then(|v| v.as_str())
2859                    .unwrap_or(&state.region)
2860                    .to_string(),
2861                container_name: name,
2862            })
2863        });
2864        let task = Task {
2865            task_arn: task_arn.clone(),
2866            task_id: task_id.clone(),
2867            cluster_arn: cluster_arn.clone(),
2868            cluster_name: cluster_name.clone(),
2869            task_definition_arn: td_arn.clone(),
2870            family: family.clone(),
2871            revision,
2872            last_status: "PENDING".into(),
2873            desired_status: "RUNNING".into(),
2874            launch_type: launch_type.into(),
2875            platform_version: Some("1.4.0".into()),
2876            cpu: cpu.clone(),
2877            memory: memory.clone(),
2878            containers,
2879            overrides: json!({}),
2880            started_by: Some(service_tag.clone()),
2881            group: Some(format!("service:{}", service.service_name)),
2882            connectivity: "CONNECTING".into(),
2883            stop_code: None,
2884            stopped_reason: None,
2885            created_at: Utc::now(),
2886            started_at: None,
2887            stopping_at: None,
2888            stopped_at: None,
2889            pull_started_at: None,
2890            pull_stopped_at: None,
2891            connectivity_at: None,
2892            started_by_ref_id: None,
2893            execution_role_arn: exec_role.clone(),
2894            task_role_arn: task_role.clone(),
2895            tags: Vec::new(),
2896            awslogs,
2897            captured_logs: String::new(),
2898            protection: None,
2899        };
2900        state.tasks.insert(task_id.clone(), task);
2901        if let Some(cluster) = state.clusters.get_mut(&cluster_name) {
2902            cluster.pending_tasks_count += 1;
2903        }
2904        ids.push(task_id);
2905    }
2906    let _ = principal_arn;
2907    ids
2908}
2909
2910fn recompute_service_counts(
2911    state: &EcsState,
2912    service_name: &str,
2913    cluster_name: &str,
2914    service_json: &mut Value,
2915) {
2916    let service_tag = format!("ecs-svc/{}", service_name);
2917    let mut running = 0i32;
2918    let mut pending = 0i32;
2919    for t in state.tasks.values() {
2920        if t.started_by.as_deref() == Some(service_tag.as_str()) && t.cluster_name == cluster_name {
2921            match t.last_status.as_str() {
2922                "RUNNING" => running += 1,
2923                "PENDING" | "PROVISIONING" => pending += 1,
2924                _ => {}
2925            }
2926        }
2927    }
2928    if let Some(map) = service_json.as_object_mut() {
2929        map.insert("runningCount".into(), json!(running));
2930        map.insert("pendingCount".into(), json!(pending));
2931    }
2932}
2933
2934fn service_to_json(svc: &Service) -> Value {
2935    let mut map = serde_json::Map::new();
2936    map.insert("serviceArn".into(), json!(svc.service_arn));
2937    map.insert("serviceName".into(), json!(svc.service_name));
2938    map.insert("clusterArn".into(), json!(svc.cluster_arn));
2939    map.insert("status".into(), json!(svc.status));
2940    map.insert("desiredCount".into(), json!(svc.desired_count));
2941    map.insert("runningCount".into(), json!(svc.running_count));
2942    map.insert("pendingCount".into(), json!(svc.pending_count));
2943    map.insert("launchType".into(), json!(svc.launch_type));
2944    map.insert("schedulingStrategy".into(), json!(svc.scheduling_strategy));
2945    map.insert("taskDefinition".into(), json!(svc.task_definition_arn));
2946    map.insert(
2947        "deploymentController".into(),
2948        json!({"type": svc.deployment_controller}),
2949    );
2950    let mut deployment_cfg = serde_json::Map::new();
2951    if let Some(n) = svc.minimum_healthy_percent {
2952        deployment_cfg.insert("minimumHealthyPercent".into(), json!(n));
2953    }
2954    if let Some(n) = svc.maximum_percent {
2955        deployment_cfg.insert("maximumPercent".into(), json!(n));
2956    }
2957    if let Some(ref cb) = svc.circuit_breaker {
2958        deployment_cfg.insert(
2959            "deploymentCircuitBreaker".into(),
2960            json!({"enable": cb.enable, "rollback": cb.rollback}),
2961        );
2962    }
2963    if !deployment_cfg.is_empty() {
2964        map.insert(
2965            "deploymentConfiguration".into(),
2966            Value::Object(deployment_cfg),
2967        );
2968    }
2969    map.insert(
2970        "deployments".into(),
2971        Value::Array(svc.deployments.iter().map(deployment_to_json).collect()),
2972    );
2973    map.insert(
2974        "loadBalancers".into(),
2975        Value::Array(svc.load_balancers.clone()),
2976    );
2977    map.insert(
2978        "serviceRegistries".into(),
2979        Value::Array(svc.service_registries.clone()),
2980    );
2981    map.insert(
2982        "placementConstraints".into(),
2983        Value::Array(svc.placement_constraints.clone()),
2984    );
2985    map.insert(
2986        "placementStrategy".into(),
2987        Value::Array(svc.placement_strategy.clone()),
2988    );
2989    if let Some(ref v) = svc.network_configuration {
2990        map.insert("networkConfiguration".into(), v.clone());
2991    }
2992    if let Some(ref v) = svc.role_arn {
2993        map.insert("roleArn".into(), json!(v));
2994    }
2995    if let Some(ref v) = svc.created_by {
2996        map.insert("createdBy".into(), json!(v));
2997    }
2998    map.insert("createdAt".into(), json!(svc.created_at.timestamp()));
2999    Value::Object(map)
3000}
3001
3002fn deployment_to_json(d: &Deployment) -> Value {
3003    json!({
3004        "id": d.deployment_id,
3005        "status": d.status,
3006        "taskDefinition": d.task_definition_arn,
3007        "desiredCount": d.desired_count,
3008        "pendingCount": d.pending_count,
3009        "runningCount": d.running_count,
3010        "failedTasks": d.failed_tasks,
3011        "createdAt": d.created_at.timestamp(),
3012        "updatedAt": d.updated_at.timestamp(),
3013        "launchType": d.launch_type,
3014        "rolloutState": d.rollout_state,
3015        "rolloutStateReason": d.rollout_state_reason,
3016    })
3017}
3018
3019// -------- operations: container instances, attributes, capacity providers, task sets, task protection, ExecuteCommand, agent surface --------
3020
3021impl EcsService {
3022    fn register_container_instance(
3023        &self,
3024        request: &AwsRequest,
3025    ) -> Result<AwsResponse, AwsServiceError> {
3026        let body = request.json_body();
3027        let cluster_ref = opt_str(&body, "cluster");
3028        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3029        let ec2_id = opt_str(&body, "instanceIdentityDocument")
3030            .and_then(|s| serde_json::from_str::<Value>(s).ok())
3031            .and_then(|v| {
3032                v.get("instanceId")
3033                    .and_then(|x| x.as_str())
3034                    .map(String::from)
3035            });
3036        let tags = parse_tags(&body);
3037
3038        let account = request.account_id.clone();
3039        let mut accounts = self.state.write();
3040        let state = accounts.get_or_create(&account);
3041        let cluster_arn = state
3042            .clusters
3043            .get(&cluster_name)
3044            .map(|c| c.cluster_arn.clone())
3045            .unwrap_or_else(|| state.cluster_arn(&cluster_name));
3046        let uuid = uuid::Uuid::new_v4().to_string();
3047        let ci_arn = state.container_instance_arn(&cluster_name, &uuid);
3048        let key = format!("{}/{}", cluster_name, uuid);
3049        let ci = ContainerInstance {
3050            container_instance_arn: ci_arn.clone(),
3051            ec2_instance_id: ec2_id,
3052            cluster_name: cluster_name.clone(),
3053            cluster_arn,
3054            status: "ACTIVE".into(),
3055            version: 1,
3056            version_info: body.get("versionInfo").cloned(),
3057            agent_connected: true,
3058            agent_update_status: None,
3059            remaining_resources: body
3060                .get("totalResources")
3061                .and_then(|v| v.as_array())
3062                .cloned()
3063                .unwrap_or_default(),
3064            registered_resources: body
3065                .get("totalResources")
3066                .and_then(|v| v.as_array())
3067                .cloned()
3068                .unwrap_or_default(),
3069            running_tasks_count: 0,
3070            pending_tasks_count: 0,
3071            registered_at: Utc::now(),
3072            attributes: body
3073                .get("attributes")
3074                .and_then(|v| v.as_array())
3075                .map(|arr| {
3076                    arr.iter()
3077                        .filter_map(|a| {
3078                            let name = a.get("name").and_then(|v| v.as_str())?;
3079                            Some(AttributeRef {
3080                                name: name.to_string(),
3081                                value: a.get("value").and_then(|v| v.as_str()).map(String::from),
3082                                target_type: a
3083                                    .get("targetType")
3084                                    .and_then(|v| v.as_str())
3085                                    .map(String::from),
3086                                target_id: a
3087                                    .get("targetId")
3088                                    .and_then(|v| v.as_str())
3089                                    .map(String::from),
3090                            })
3091                        })
3092                        .collect()
3093                })
3094                .unwrap_or_default(),
3095            tags,
3096            capacity_provider_name: None,
3097            health_status: None,
3098        };
3099        state.container_instances.insert(key, ci.clone());
3100        if let Some(cluster) = state.clusters.get_mut(&cluster_name) {
3101            cluster.registered_container_instances_count += 1;
3102        }
3103        Ok(AwsResponse::ok_json(json!({
3104            "containerInstance": container_instance_to_json(&ci),
3105        })))
3106    }
3107
3108    fn deregister_container_instance(
3109        &self,
3110        request: &AwsRequest,
3111    ) -> Result<AwsResponse, AwsServiceError> {
3112        let body = request.json_body();
3113        let ci_ref = req_str(&body, "containerInstance")?.to_string();
3114        let cluster_ref = opt_str(&body, "cluster");
3115        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3116        let id = container_instance_id_from_ref(&ci_ref);
3117        let key = format!("{}/{}", cluster_name, id);
3118
3119        let account = request.account_id.clone();
3120        let mut accounts = self.state.write();
3121        let state = accounts
3122            .get_mut(&account)
3123            .ok_or_else(|| container_instance_not_found(&ci_ref))?;
3124        let mut ci = state
3125            .container_instances
3126            .remove(&key)
3127            .ok_or_else(|| container_instance_not_found(&ci_ref))?;
3128        ci.status = "INACTIVE".into();
3129        if let Some(cluster) = state.clusters.get_mut(&cluster_name) {
3130            if cluster.registered_container_instances_count > 0 {
3131                cluster.registered_container_instances_count -= 1;
3132            }
3133        }
3134        Ok(AwsResponse::ok_json(json!({
3135            "containerInstance": container_instance_to_json(&ci),
3136        })))
3137    }
3138
3139    fn describe_container_instances(
3140        &self,
3141        request: &AwsRequest,
3142    ) -> Result<AwsResponse, AwsServiceError> {
3143        let body = request.json_body();
3144        let cluster_ref = opt_str(&body, "cluster");
3145        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3146        let refs: Vec<String> = body
3147            .get("containerInstances")
3148            .and_then(|v| v.as_array())
3149            .map(|arr| {
3150                arr.iter()
3151                    .filter_map(|v| v.as_str().map(String::from))
3152                    .collect()
3153            })
3154            .unwrap_or_default();
3155
3156        let accounts = self.state.read();
3157        let mut found = Vec::new();
3158        let mut failures = Vec::new();
3159        if let Some(state) = accounts.get(&request.account_id) {
3160            for r in &refs {
3161                let id = container_instance_id_from_ref(r);
3162                let key = format!("{}/{}", cluster_name, id);
3163                match state.container_instances.get(&key) {
3164                    Some(ci) => found.push(container_instance_to_json(ci)),
3165                    None => failures.push(json!({"arn": r, "reason": "MISSING"})),
3166                }
3167            }
3168        } else {
3169            for r in &refs {
3170                failures.push(json!({"arn": r, "reason": "MISSING"}));
3171            }
3172        }
3173        Ok(AwsResponse::ok_json(json!({
3174            "containerInstances": found,
3175            "failures": failures,
3176        })))
3177    }
3178
3179    fn list_container_instances(
3180        &self,
3181        request: &AwsRequest,
3182    ) -> Result<AwsResponse, AwsServiceError> {
3183        let body = request.json_body();
3184        let cluster_ref = opt_str(&body, "cluster");
3185        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3186        let status_filter = opt_str(&body, "status");
3187        let max_results = body
3188            .get("maxResults")
3189            .and_then(|v| v.as_i64())
3190            .filter(|n| (1..=100).contains(n))
3191            .map(|n| n as usize)
3192            .unwrap_or(100);
3193        let next_token = opt_str(&body, "nextToken").unwrap_or("");
3194
3195        let accounts = self.state.read();
3196        let mut arns: Vec<String> = match accounts.get(&request.account_id) {
3197            Some(state) => state
3198                .container_instances
3199                .values()
3200                .filter(|ci| ci.cluster_name == cluster_name)
3201                .filter(|ci| status_filter.is_none_or(|s| ci.status == s))
3202                .map(|ci| ci.container_instance_arn.clone())
3203                .collect(),
3204            None => Vec::new(),
3205        };
3206        arns.sort();
3207        let start = next_token.parse::<usize>().unwrap_or(0).min(arns.len());
3208        let end = (start + max_results).min(arns.len());
3209        let page = arns[start..end].to_vec();
3210        let mut out = json!({"containerInstanceArns": page});
3211        if end < arns.len() {
3212            out.as_object_mut()
3213                .unwrap()
3214                .insert("nextToken".into(), json!(end.to_string()));
3215        }
3216        Ok(AwsResponse::ok_json(out))
3217    }
3218
3219    fn update_container_agent(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3220        let body = request.json_body();
3221        let ci_ref = req_str(&body, "containerInstance")?.to_string();
3222        let cluster_ref = opt_str(&body, "cluster");
3223        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3224        let id = container_instance_id_from_ref(&ci_ref);
3225        let key = format!("{}/{}", cluster_name, id);
3226        let mut accounts = self.state.write();
3227        let state = accounts
3228            .get_mut(&request.account_id)
3229            .ok_or_else(|| container_instance_not_found(&ci_ref))?;
3230        let ci = state
3231            .container_instances
3232            .get_mut(&key)
3233            .ok_or_else(|| container_instance_not_found(&ci_ref))?;
3234        ci.agent_update_status = Some("UPDATED".into());
3235        Ok(AwsResponse::ok_json(json!({
3236            "containerInstance": container_instance_to_json(ci),
3237        })))
3238    }
3239
3240    fn update_container_instances_state(
3241        &self,
3242        request: &AwsRequest,
3243    ) -> Result<AwsResponse, AwsServiceError> {
3244        let body = request.json_body();
3245        let status = req_str(&body, "status")?.to_string();
3246        let cluster_ref = opt_str(&body, "cluster");
3247        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3248        let refs: Vec<String> = body
3249            .get("containerInstances")
3250            .and_then(|v| v.as_array())
3251            .map(|arr| {
3252                arr.iter()
3253                    .filter_map(|v| v.as_str().map(String::from))
3254                    .collect()
3255            })
3256            .unwrap_or_default();
3257        let mut accounts = self.state.write();
3258        let state = accounts
3259            .get_mut(&request.account_id)
3260            .ok_or_else(|| client_exception("account not found"))?;
3261        let mut found = Vec::new();
3262        let mut failures = Vec::new();
3263        for r in &refs {
3264            let id = container_instance_id_from_ref(r);
3265            let key = format!("{}/{}", cluster_name, id);
3266            match state.container_instances.get_mut(&key) {
3267                Some(ci) => {
3268                    ci.status = status.clone();
3269                    found.push(container_instance_to_json(ci));
3270                }
3271                None => failures.push(json!({"arn": r, "reason": "MISSING"})),
3272            }
3273        }
3274        Ok(AwsResponse::ok_json(json!({
3275            "containerInstances": found,
3276            "failures": failures,
3277        })))
3278    }
3279
3280    fn put_attributes(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3281        let body = request.json_body();
3282        let cluster_ref = opt_str(&body, "cluster");
3283        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3284        let attrs = body
3285            .get("attributes")
3286            .and_then(|v| v.as_array())
3287            .cloned()
3288            .unwrap_or_default();
3289
3290        let mut accounts = self.state.write();
3291        let state = accounts.get_or_create(&request.account_id);
3292        let mut stored = Vec::new();
3293        for a in &attrs {
3294            let Some(name) = a.get("name").and_then(|v| v.as_str()) else {
3295                continue;
3296            };
3297            let target_type = a
3298                .get("targetType")
3299                .and_then(|v| v.as_str())
3300                .unwrap_or("container-instance")
3301                .to_string();
3302            let target_id = a
3303                .get("targetId")
3304                .and_then(|v| v.as_str())
3305                .unwrap_or("")
3306                .to_string();
3307            let value = a.get("value").and_then(|v| v.as_str()).map(String::from);
3308            let key = format!("{}/{}/{}", cluster_name, target_id, name);
3309            let attr = Attribute {
3310                cluster_name: cluster_name.clone(),
3311                target_type: target_type.clone(),
3312                target_id: target_id.clone(),
3313                name: name.to_string(),
3314                value: value.clone(),
3315            };
3316            state.attributes.insert(key, attr);
3317            stored.push(json!({
3318                "name": name,
3319                "value": value,
3320                "targetType": target_type,
3321                "targetId": target_id,
3322            }));
3323        }
3324        Ok(AwsResponse::ok_json(json!({"attributes": stored})))
3325    }
3326
3327    fn delete_attributes(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3328        let body = request.json_body();
3329        let cluster_ref = opt_str(&body, "cluster");
3330        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3331        let attrs = body
3332            .get("attributes")
3333            .and_then(|v| v.as_array())
3334            .cloned()
3335            .unwrap_or_default();
3336        let mut accounts = self.state.write();
3337        let state = accounts.get_or_create(&request.account_id);
3338        let mut deleted = Vec::new();
3339        for a in &attrs {
3340            let Some(name) = a.get("name").and_then(|v| v.as_str()) else {
3341                continue;
3342            };
3343            let target_id = a.get("targetId").and_then(|v| v.as_str()).unwrap_or("");
3344            let key = format!("{}/{}/{}", cluster_name, target_id, name);
3345            if let Some(attr) = state.attributes.remove(&key) {
3346                deleted.push(json!({
3347                    "name": attr.name,
3348                    "value": attr.value,
3349                    "targetType": attr.target_type,
3350                    "targetId": attr.target_id,
3351                }));
3352            }
3353        }
3354        Ok(AwsResponse::ok_json(json!({"attributes": deleted})))
3355    }
3356
3357    fn list_attributes(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3358        let body = request.json_body();
3359        let cluster_ref = opt_str(&body, "cluster");
3360        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3361        let target_type = req_str(&body, "targetType")?.to_string();
3362        let attr_name = opt_str(&body, "attributeName");
3363        let attr_value = opt_str(&body, "attributeValue");
3364
3365        let accounts = self.state.read();
3366        let attrs: Vec<Value> = match accounts.get(&request.account_id) {
3367            Some(state) => state
3368                .attributes
3369                .values()
3370                .filter(|a| a.cluster_name == cluster_name)
3371                .filter(|a| a.target_type == target_type)
3372                .filter(|a| attr_name.is_none_or(|n| a.name == n))
3373                .filter(|a| attr_value.is_none_or(|v| a.value.as_deref() == Some(v)))
3374                .map(|a| {
3375                    json!({
3376                        "name": a.name,
3377                        "value": a.value,
3378                        "targetType": a.target_type,
3379                        "targetId": a.target_id,
3380                    })
3381                })
3382                .collect(),
3383            None => Vec::new(),
3384        };
3385        Ok(AwsResponse::ok_json(json!({"attributes": attrs})))
3386    }
3387
3388    fn create_capacity_provider(
3389        &self,
3390        request: &AwsRequest,
3391    ) -> Result<AwsResponse, AwsServiceError> {
3392        let body = request.json_body();
3393        let name = req_str(&body, "name")?.to_string();
3394        if name.starts_with("aws") || name.starts_with("ecs") {
3395            return Err(invalid_parameter(format!(
3396                "Capacity provider name cannot begin with 'aws' or 'ecs': {name}"
3397            )));
3398        }
3399        let auto_scaling_group_provider = body.get("autoScalingGroupProvider").cloned();
3400        let tags = parse_tags(&body);
3401
3402        let mut accounts = self.state.write();
3403        let state = accounts.get_or_create(&request.account_id);
3404        if state.capacity_providers.contains_key(&name) {
3405            return Err(client_exception(format!(
3406                "Capacity provider already exists: {name}"
3407            )));
3408        }
3409        let arn = format!(
3410            "arn:aws:ecs:{}:{}:capacity-provider/{}",
3411            state.region, state.account_id, name
3412        );
3413        let cp = CapacityProvider {
3414            name: name.clone(),
3415            arn,
3416            status: "ACTIVE".into(),
3417            auto_scaling_group_provider,
3418            update_status: None,
3419            update_status_reason: None,
3420            created_at: Utc::now(),
3421            tags,
3422        };
3423        state.capacity_providers.insert(name.clone(), cp.clone());
3424        Ok(AwsResponse::ok_json(json!({
3425            "capacityProvider": capacity_provider_to_json(&cp),
3426        })))
3427    }
3428
3429    fn delete_capacity_provider(
3430        &self,
3431        request: &AwsRequest,
3432    ) -> Result<AwsResponse, AwsServiceError> {
3433        let body = request.json_body();
3434        let input = req_str(&body, "capacityProvider")?.to_string();
3435        let name = capacity_provider_name_from_ref(&input);
3436        let mut accounts = self.state.write();
3437        let state = accounts
3438            .get_mut(&request.account_id)
3439            .ok_or_else(|| capacity_provider_not_found(&name))?;
3440        let mut cp = state
3441            .capacity_providers
3442            .remove(&name)
3443            .ok_or_else(|| capacity_provider_not_found(&name))?;
3444        cp.status = "INACTIVE".into();
3445        Ok(AwsResponse::ok_json(json!({
3446            "capacityProvider": capacity_provider_to_json(&cp),
3447        })))
3448    }
3449
3450    fn describe_capacity_providers(
3451        &self,
3452        request: &AwsRequest,
3453    ) -> Result<AwsResponse, AwsServiceError> {
3454        let body = request.json_body();
3455        let names: Vec<String> = body
3456            .get("capacityProviders")
3457            .and_then(|v| v.as_array())
3458            .map(|arr| {
3459                arr.iter()
3460                    .filter_map(|v| v.as_str().map(capacity_provider_name_from_ref))
3461                    .collect()
3462            })
3463            .unwrap_or_default();
3464        let accounts = self.state.read();
3465        let mut found = Vec::new();
3466        let mut failures = Vec::new();
3467        if let Some(state) = accounts.get(&request.account_id) {
3468            if names.is_empty() {
3469                for cp in state.capacity_providers.values() {
3470                    found.push(capacity_provider_to_json(cp));
3471                }
3472            } else {
3473                for n in &names {
3474                    match state.capacity_providers.get(n) {
3475                        Some(cp) => found.push(capacity_provider_to_json(cp)),
3476                        None => failures.push(json!({"arn": n, "reason": "MISSING"})),
3477                    }
3478                }
3479            }
3480        }
3481        Ok(AwsResponse::ok_json(json!({
3482            "capacityProviders": found,
3483            "failures": failures,
3484        })))
3485    }
3486
3487    fn update_capacity_provider(
3488        &self,
3489        request: &AwsRequest,
3490    ) -> Result<AwsResponse, AwsServiceError> {
3491        let body = request.json_body();
3492        let input = req_str(&body, "name")?.to_string();
3493        let name = capacity_provider_name_from_ref(&input);
3494        let asg = body.get("autoScalingGroupProvider").cloned();
3495        let mut accounts = self.state.write();
3496        let state = accounts
3497            .get_mut(&request.account_id)
3498            .ok_or_else(|| capacity_provider_not_found(&name))?;
3499        let cp = state
3500            .capacity_providers
3501            .get_mut(&name)
3502            .ok_or_else(|| capacity_provider_not_found(&name))?;
3503        if let Some(v) = asg {
3504            cp.auto_scaling_group_provider = Some(v);
3505        }
3506        cp.update_status = Some("UPDATE_COMPLETE".into());
3507        Ok(AwsResponse::ok_json(json!({
3508            "capacityProvider": capacity_provider_to_json(cp),
3509        })))
3510    }
3511
3512    fn get_task_protection(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3513        let body = request.json_body();
3514        let refs: Vec<String> = body
3515            .get("tasks")
3516            .and_then(|v| v.as_array())
3517            .map(|arr| {
3518                arr.iter()
3519                    .filter_map(|v| v.as_str().map(String::from))
3520                    .collect()
3521            })
3522            .unwrap_or_default();
3523        let accounts = self.state.read();
3524        let mut protections = Vec::new();
3525        let mut failures = Vec::new();
3526        if let Some(state) = accounts.get(&request.account_id) {
3527            for r in &refs {
3528                let id = task_id_from_ref(r);
3529                match state.tasks.get(&id) {
3530                    Some(t) => protections.push(task_protection_json(t)),
3531                    None => failures.push(json!({"arn": r, "reason": "MISSING"})),
3532                }
3533            }
3534        }
3535        Ok(AwsResponse::ok_json(json!({
3536            "protectedTasks": protections,
3537            "failures": failures,
3538        })))
3539    }
3540
3541    fn update_task_protection(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3542        let body = request.json_body();
3543        let refs: Vec<String> = body
3544            .get("tasks")
3545            .and_then(|v| v.as_array())
3546            .map(|arr| {
3547                arr.iter()
3548                    .filter_map(|v| v.as_str().map(String::from))
3549                    .collect()
3550            })
3551            .unwrap_or_default();
3552        let protect = body
3553            .get("protectionEnabled")
3554            .and_then(|v| v.as_bool())
3555            .unwrap_or(false);
3556        let expires_in_minutes = body
3557            .get("expiresInMinutes")
3558            .and_then(|v| v.as_i64())
3559            .unwrap_or(2880);
3560        let expiration = if protect {
3561            Some(Utc::now() + chrono::Duration::minutes(expires_in_minutes))
3562        } else {
3563            None
3564        };
3565
3566        let mut accounts = self.state.write();
3567        let state = accounts
3568            .get_mut(&request.account_id)
3569            .ok_or_else(|| client_exception("account not found"))?;
3570        let mut protections = Vec::new();
3571        let mut failures = Vec::new();
3572        for r in &refs {
3573            let id = task_id_from_ref(r);
3574            match state.tasks.get_mut(&id) {
3575                Some(t) => {
3576                    t.protection = Some(crate::state::TaskProtection {
3577                        enabled: protect,
3578                        expiration,
3579                    });
3580                    protections.push(task_protection_json(t));
3581                }
3582                None => failures.push(json!({"arn": r, "reason": "MISSING"})),
3583            }
3584        }
3585        Ok(AwsResponse::ok_json(json!({
3586            "protectedTasks": protections,
3587            "failures": failures,
3588        })))
3589    }
3590
3591    fn create_task_set(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3592        let body = request.json_body();
3593        let service_ref = req_str(&body, "service")?;
3594        let service_name = service_name_from_ref(service_ref);
3595        let cluster_ref = opt_str(&body, "cluster");
3596        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3597        let task_definition = req_str(&body, "taskDefinition")?.to_string();
3598        let external_id = opt_str(&body, "externalId").map(String::from);
3599        let launch_type = opt_str(&body, "launchType").map(String::from);
3600        let platform_version = opt_str(&body, "platformVersion").map(String::from);
3601        let scale = body.get("scale").cloned();
3602        let tags = parse_tags(&body);
3603        let load_balancers = body
3604            .get("loadBalancers")
3605            .and_then(|v| v.as_array())
3606            .cloned()
3607            .unwrap_or_default();
3608        let service_registries = body
3609            .get("serviceRegistries")
3610            .and_then(|v| v.as_array())
3611            .cloned()
3612            .unwrap_or_default();
3613        let capacity_provider_strategy = body
3614            .get("capacityProviderStrategy")
3615            .and_then(|v| v.as_array())
3616            .cloned()
3617            .unwrap_or_default();
3618
3619        let mut accounts = self.state.write();
3620        let state = accounts
3621            .get_mut(&request.account_id)
3622            .ok_or_else(|| service_not_found(&service_name))?;
3623        let service_key = EcsState::service_key(&cluster_name, &service_name);
3624        let svc = state
3625            .services
3626            .get(&service_key)
3627            .ok_or_else(|| service_not_found(&service_name))?;
3628        if svc.deployment_controller != "EXTERNAL" {
3629            return Err(client_exception(
3630                "CreateTaskSet requires the service to be created with \
3631                 deploymentController.type = EXTERNAL",
3632            ));
3633        }
3634        let ts_id = format!("ecs-svc-{}", uuid::Uuid::new_v4().simple());
3635        let task_set = TaskSet {
3636            task_set_id: ts_id.clone(),
3637            task_set_arn: format!(
3638                "arn:aws:ecs:{}:{}:task-set/{}/{}/{}",
3639                state.region, state.account_id, cluster_name, service_name, ts_id
3640            ),
3641            service_arn: svc.service_arn.clone(),
3642            cluster_arn: svc.cluster_arn.clone(),
3643            service_name: service_name.clone(),
3644            cluster_name: cluster_name.clone(),
3645            external_id,
3646            status: "ACTIVE".into(),
3647            task_definition,
3648            computed_desired_count: 0,
3649            pending_count: 0,
3650            running_count: 0,
3651            launch_type,
3652            platform_version,
3653            scale,
3654            stability_status: "STABILIZING".into(),
3655            created_at: Utc::now(),
3656            updated_at: Utc::now(),
3657            load_balancers,
3658            service_registries,
3659            capacity_provider_strategy,
3660            tags,
3661        };
3662        let key = format!("{}/{}/{}", cluster_name, service_name, ts_id);
3663        state.task_sets.insert(key, task_set.clone());
3664        Ok(AwsResponse::ok_json(json!({
3665            "taskSet": task_set_to_json(&task_set),
3666        })))
3667    }
3668
3669    fn update_task_set(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3670        let body = request.json_body();
3671        let ts_ref = req_str(&body, "taskSet")?.to_string();
3672        let service_ref = req_str(&body, "service")?;
3673        let service_name = service_name_from_ref(service_ref);
3674        let cluster_ref = opt_str(&body, "cluster");
3675        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3676        let scale = body.get("scale").cloned();
3677
3678        let mut accounts = self.state.write();
3679        let state = accounts
3680            .get_mut(&request.account_id)
3681            .ok_or_else(|| client_exception("task set not found"))?;
3682        let ts_id = task_set_id_from_ref(&ts_ref);
3683        let key = format!("{}/{}/{}", cluster_name, service_name, ts_id);
3684        let ts = state
3685            .task_sets
3686            .get_mut(&key)
3687            .ok_or_else(|| client_exception(format!("task set not found: {}", ts_ref)))?;
3688        if let Some(v) = scale {
3689            ts.scale = Some(v);
3690        }
3691        ts.updated_at = Utc::now();
3692        Ok(AwsResponse::ok_json(json!({
3693            "taskSet": task_set_to_json(ts),
3694        })))
3695    }
3696
3697    fn delete_task_set(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3698        let body = request.json_body();
3699        let ts_ref = req_str(&body, "taskSet")?.to_string();
3700        let service_ref = req_str(&body, "service")?;
3701        let service_name = service_name_from_ref(service_ref);
3702        let cluster_ref = opt_str(&body, "cluster");
3703        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3704        let ts_id = task_set_id_from_ref(&ts_ref);
3705        let key = format!("{}/{}/{}", cluster_name, service_name, ts_id);
3706
3707        let mut accounts = self.state.write();
3708        let state = accounts
3709            .get_mut(&request.account_id)
3710            .ok_or_else(|| client_exception("task set not found"))?;
3711        let mut ts = state
3712            .task_sets
3713            .remove(&key)
3714            .ok_or_else(|| client_exception(format!("task set not found: {}", ts_ref)))?;
3715        ts.status = "DRAINING".into();
3716        Ok(AwsResponse::ok_json(json!({
3717            "taskSet": task_set_to_json(&ts),
3718        })))
3719    }
3720
3721    fn describe_task_sets(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3722        let body = request.json_body();
3723        let service_ref = req_str(&body, "service")?;
3724        let service_name = service_name_from_ref(service_ref);
3725        let cluster_ref = opt_str(&body, "cluster");
3726        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3727        let filter_refs: Vec<String> = body
3728            .get("taskSets")
3729            .and_then(|v| v.as_array())
3730            .map(|arr| {
3731                arr.iter()
3732                    .filter_map(|v| v.as_str().map(String::from))
3733                    .collect()
3734            })
3735            .unwrap_or_default();
3736
3737        let accounts = self.state.read();
3738        let mut found = Vec::new();
3739        let mut failures = Vec::new();
3740        if let Some(state) = accounts.get(&request.account_id) {
3741            if filter_refs.is_empty() {
3742                for ts in state.task_sets.values() {
3743                    if ts.cluster_name == cluster_name && ts.service_name == service_name {
3744                        found.push(task_set_to_json(ts));
3745                    }
3746                }
3747            } else {
3748                for r in &filter_refs {
3749                    let id = task_set_id_from_ref(r);
3750                    let key = format!("{}/{}/{}", cluster_name, service_name, id);
3751                    match state.task_sets.get(&key) {
3752                        Some(ts) => found.push(task_set_to_json(ts)),
3753                        None => failures.push(json!({"arn": r, "reason": "MISSING"})),
3754                    }
3755                }
3756            }
3757        }
3758        Ok(AwsResponse::ok_json(json!({
3759            "taskSets": found,
3760            "failures": failures,
3761        })))
3762    }
3763
3764    fn update_service_primary_task_set(
3765        &self,
3766        request: &AwsRequest,
3767    ) -> Result<AwsResponse, AwsServiceError> {
3768        let body = request.json_body();
3769        let ts_ref = req_str(&body, "primaryTaskSet")?.to_string();
3770        let service_ref = req_str(&body, "service")?;
3771        let service_name = service_name_from_ref(service_ref);
3772        let cluster_ref = opt_str(&body, "cluster");
3773        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3774        let ts_id = task_set_id_from_ref(&ts_ref);
3775        let key = format!("{}/{}/{}", cluster_name, service_name, ts_id);
3776        let mut accounts = self.state.write();
3777        let state = accounts
3778            .get_mut(&request.account_id)
3779            .ok_or_else(|| client_exception("task set not found"))?;
3780        if !state.task_sets.contains_key(&key) {
3781            return Err(client_exception(format!("task set not found: {}", ts_ref)));
3782        }
3783        // Demote any existing PRIMARY on this service to ACTIVE before
3784        // promoting the new one. Otherwise the service would be left with
3785        // two PRIMARY task sets, which AWS forbids.
3786        for ts in state.task_sets.values_mut() {
3787            if ts.service_name == service_name
3788                && ts.cluster_name == cluster_name
3789                && ts.status == "PRIMARY"
3790                && ts.task_set_id != ts_id
3791            {
3792                ts.status = "ACTIVE".into();
3793                ts.updated_at = Utc::now();
3794            }
3795        }
3796        let ts = state.task_sets.get_mut(&key).unwrap();
3797        ts.status = "PRIMARY".into();
3798        ts.updated_at = Utc::now();
3799        Ok(AwsResponse::ok_json(json!({
3800            "taskSet": task_set_to_json(ts),
3801        })))
3802    }
3803
3804    async fn execute_command(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3805        let body = request.json_body();
3806        let task_ref = req_str(&body, "task")?.to_string();
3807        let command = req_str(&body, "command")?.to_string();
3808        let interactive = body
3809            .get("interactive")
3810            .and_then(|v| v.as_bool())
3811            .unwrap_or(false);
3812
3813        // Resolve runtime container id.
3814        let container_id = {
3815            let accounts = self.state.read();
3816            let state = accounts
3817                .get(&request.account_id)
3818                .ok_or_else(|| task_not_found(&task_ref))?;
3819            let id = task_id_from_ref(&task_ref);
3820            state
3821                .tasks
3822                .get(&id)
3823                .and_then(|t| t.containers.first())
3824                .and_then(|c| c.runtime_id.clone())
3825        };
3826
3827        let session_id = format!("ecs-execute-command-{}", uuid::Uuid::new_v4());
3828        if let (Some(id), Some(_rt)) = (container_id.clone(), self.runtime.as_ref()) {
3829            // Best-effort proxy: shell the command through docker exec. We
3830            // don't stream back stdout/stderr in this ExecuteCommand response
3831            // (real AWS returns a Session token for the SSM sidecar), so log
3832            // the result server-side for visibility.
3833            let out = tokio::process::Command::new("docker")
3834                .args(["exec", &id, "sh", "-c", &command])
3835                .output()
3836                .await
3837                .map_err(|e| client_exception(format!("docker exec failed: {e}")))?;
3838            tracing::info!(
3839                task = %task_ref,
3840                exit = out.status.code().unwrap_or(-1),
3841                "ExecuteCommand via docker exec"
3842            );
3843        }
3844
3845        Ok(AwsResponse::ok_json(json!({
3846            "clusterArn": opt_str(&body, "cluster").unwrap_or(""),
3847            "containerArn": container_id.unwrap_or_default(),
3848            "containerName": opt_str(&body, "container").unwrap_or(""),
3849            "interactive": interactive,
3850            "session": {
3851                "sessionId": session_id,
3852                "streamUrl": "",
3853                "tokenValue": "",
3854            },
3855            "taskArn": task_ref,
3856        })))
3857    }
3858
3859    fn submit_container_state_change(
3860        &self,
3861        request: &AwsRequest,
3862    ) -> Result<AwsResponse, AwsServiceError> {
3863        // Agent-side API: record whatever the agent tells us about the
3864        // container. We already drive state internally via the runtime,
3865        // so this is an idempotent ack that updates the in-memory copy
3866        // when the named task+container exist.
3867        let body = request.json_body();
3868        let task_ref = opt_str(&body, "task").unwrap_or("");
3869        let container_name = opt_str(&body, "containerName").unwrap_or("");
3870        let status = opt_str(&body, "status").map(String::from);
3871        let exit_code = body.get("exitCode").and_then(|v| v.as_i64());
3872        let reason = opt_str(&body, "reason").map(String::from);
3873
3874        if !task_ref.is_empty() {
3875            let mut accounts = self.state.write();
3876            if let Some(state) = accounts.get_mut(&request.account_id) {
3877                let id = task_id_from_ref(task_ref);
3878                if let Some(task) = state.tasks.get_mut(&id) {
3879                    if let Some(container) = task
3880                        .containers
3881                        .iter_mut()
3882                        .find(|c| c.name == container_name)
3883                    {
3884                        if let Some(s) = status {
3885                            container.last_status = s;
3886                        }
3887                        if let Some(code) = exit_code {
3888                            container.exit_code = Some(code);
3889                        }
3890                        if let Some(r) = reason {
3891                            container.reason = Some(r);
3892                        }
3893                    }
3894                }
3895            }
3896        }
3897        Ok(AwsResponse::ok_json(json!({"acknowledgment": "OK"})))
3898    }
3899
3900    fn submit_task_state_change(
3901        &self,
3902        request: &AwsRequest,
3903    ) -> Result<AwsResponse, AwsServiceError> {
3904        let body = request.json_body();
3905        let task_ref = opt_str(&body, "task").unwrap_or("");
3906        let status = opt_str(&body, "status").map(String::from);
3907        if !task_ref.is_empty() {
3908            let mut accounts = self.state.write();
3909            if let Some(state) = accounts.get_mut(&request.account_id) {
3910                let id = task_id_from_ref(task_ref);
3911                if let Some(task) = state.tasks.get_mut(&id) {
3912                    if let Some(s) = status {
3913                        task.last_status = s;
3914                    }
3915                }
3916            }
3917        }
3918        Ok(AwsResponse::ok_json(json!({"acknowledgment": "OK"})))
3919    }
3920
3921    fn submit_attachment_state_changes(
3922        &self,
3923        _request: &AwsRequest,
3924    ) -> Result<AwsResponse, AwsServiceError> {
3925        // Attachments (ENIs) are beyond what fakecloud models today.
3926        // Ack the report so agents don't retry in a tight loop.
3927        Ok(AwsResponse::ok_json(json!({"acknowledgment": "OK"})))
3928    }
3929
3930    fn discover_poll_endpoint(
3931        &self,
3932        _request: &AwsRequest,
3933    ) -> Result<AwsResponse, AwsServiceError> {
3934        // ECS agents use this to discover the long-poll + telemetry
3935        // endpoints. Point both at fakecloud's local listener; real
3936        // agent polling isn't wired, but the shape is correct.
3937        let accounts = self.state.read();
3938        let endpoint = format!("https://ecs.{}.amazonaws.com/", accounts.region());
3939        Ok(AwsResponse::ok_json(json!({
3940            "endpoint": endpoint,
3941            "telemetryEndpoint": endpoint,
3942            "serviceConnectEndpoint": endpoint,
3943        })))
3944    }
3945
3946    fn stop_service_deployment(
3947        &self,
3948        request: &AwsRequest,
3949    ) -> Result<AwsResponse, AwsServiceError> {
3950        let body = request.json_body();
3951        let deployment_ref = req_str(&body, "serviceDeploymentArn")?.to_string();
3952        let mut accounts = self.state.write();
3953        let state = accounts
3954            .get_mut(&request.account_id)
3955            .ok_or_else(|| client_exception("service deployment not found"))?;
3956        for svc in state.services.values_mut() {
3957            for d in svc.deployments.iter_mut() {
3958                if deployment_ref.contains(&d.deployment_id) {
3959                    d.status = "STOPPED".into();
3960                    d.rollout_state = "FAILED".into();
3961                    d.rollout_state_reason = Some("StopServiceDeployment requested".into());
3962                    d.updated_at = Utc::now();
3963                    return Ok(AwsResponse::ok_json(json!({
3964                        "serviceDeployment": deployment_to_json(d),
3965                    })));
3966                }
3967            }
3968        }
3969        Err(client_exception(format!(
3970            "service deployment not found: {deployment_ref}"
3971        )))
3972    }
3973
3974    fn list_service_deployments(
3975        &self,
3976        request: &AwsRequest,
3977    ) -> Result<AwsResponse, AwsServiceError> {
3978        let body = request.json_body();
3979        let service_ref = req_str(&body, "service")?;
3980        let service_name = service_name_from_ref(service_ref);
3981        let cluster_ref = opt_str(&body, "cluster");
3982        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3983        let accounts = self.state.read();
3984        let mut deployments: Vec<Value> = Vec::new();
3985        if let Some(state) = accounts.get(&request.account_id) {
3986            let key = EcsState::service_key(&cluster_name, &service_name);
3987            if let Some(svc) = state.services.get(&key) {
3988                for d in &svc.deployments {
3989                    deployments.push(json!({
3990                        "serviceDeploymentArn": format!("{}/{}", svc.service_arn, d.deployment_id),
3991                        "serviceArn": svc.service_arn,
3992                        "clusterArn": svc.cluster_arn,
3993                        "status": d.status,
3994                        "createdAt": d.created_at.timestamp(),
3995                        "startedAt": d.created_at.timestamp(),
3996                        "finishedAt": null,
3997                    }));
3998                }
3999            }
4000        }
4001        Ok(AwsResponse::ok_json(json!({
4002            "serviceDeployments": deployments,
4003        })))
4004    }
4005
4006    fn describe_service_deployments(
4007        &self,
4008        request: &AwsRequest,
4009    ) -> Result<AwsResponse, AwsServiceError> {
4010        let body = request.json_body();
4011        let refs: Vec<String> = body
4012            .get("serviceDeploymentArns")
4013            .and_then(|v| v.as_array())
4014            .map(|arr| {
4015                arr.iter()
4016                    .filter_map(|v| v.as_str().map(String::from))
4017                    .collect()
4018            })
4019            .unwrap_or_default();
4020        let accounts = self.state.read();
4021        let mut found = Vec::new();
4022        let mut failures = Vec::new();
4023        if let Some(state) = accounts.get(&request.account_id) {
4024            'next_ref: for r in &refs {
4025                for svc in state.services.values() {
4026                    for d in &svc.deployments {
4027                        if r.contains(&d.deployment_id) {
4028                            found.push(json!({
4029                                "serviceDeploymentArn": r,
4030                                "serviceArn": svc.service_arn,
4031                                "clusterArn": svc.cluster_arn,
4032                                "status": d.status,
4033                                "createdAt": d.created_at.timestamp(),
4034                                "startedAt": d.created_at.timestamp(),
4035                                "finishedAt": null,
4036                                "deploymentConfiguration": {
4037                                    "minimumHealthyPercent": svc.minimum_healthy_percent,
4038                                    "maximumPercent": svc.maximum_percent,
4039                                },
4040                                "sourceServiceRevisions": [],
4041                                "targetServiceRevision": {
4042                                    "arn": d.task_definition_arn,
4043                                    "requestedTaskCount": d.desired_count,
4044                                    "runningTaskCount": d.running_count,
4045                                    "pendingTaskCount": d.pending_count,
4046                                    "failedTasks": d.failed_tasks,
4047                                },
4048                            }));
4049                            continue 'next_ref;
4050                        }
4051                    }
4052                }
4053                failures.push(json!({"arn": r, "reason": "MISSING"}));
4054            }
4055        }
4056        Ok(AwsResponse::ok_json(json!({
4057            "serviceDeployments": found,
4058            "failures": failures,
4059        })))
4060    }
4061
4062    fn describe_service_revisions(
4063        &self,
4064        request: &AwsRequest,
4065    ) -> Result<AwsResponse, AwsServiceError> {
4066        let body = request.json_body();
4067        let refs: Vec<String> = body
4068            .get("serviceRevisionArns")
4069            .and_then(|v| v.as_array())
4070            .map(|arr| {
4071                arr.iter()
4072                    .filter_map(|v| v.as_str().map(String::from))
4073                    .collect()
4074            })
4075            .unwrap_or_default();
4076        Ok(AwsResponse::ok_json(json!({
4077            "serviceRevisions": [],
4078            "failures": refs.iter().map(|r| json!({"arn": r, "reason": "MISSING"})).collect::<Vec<_>>(),
4079        })))
4080    }
4081}
4082
/// Extracts the container instance id from an ARN or path-like reference.
///
/// Returns everything after the last `/`, or the whole input when it
/// contains no `/`. A trailing `/` therefore yields an empty string.
fn container_instance_id_from_ref(input: &str) -> String {
    match input.rfind('/') {
        // '/' is a single byte, so `slash + 1` is always a char boundary.
        Some(slash) => input[slash + 1..].to_string(),
        None => input.to_string(),
    }
}
4086
4087fn container_instance_not_found(input: &str) -> AwsServiceError {
4088    AwsServiceError::aws_error(
4089        StatusCode::BAD_REQUEST,
4090        "ClientException",
4091        format!("Container instance not found: {input}"),
4092    )
4093}
4094
/// Extracts the capacity provider name from an ARN or a bare name.
///
/// Returns the segment after the last `/`, or the whole input when it
/// contains no `/`. A trailing `/` therefore yields an empty string.
fn capacity_provider_name_from_ref(input: &str) -> String {
    let name = input.split('/').next_back().unwrap_or(input);
    name.to_owned()
}
4098
4099fn capacity_provider_not_found(name: &str) -> AwsServiceError {
4100    AwsServiceError::aws_error(
4101        StatusCode::BAD_REQUEST,
4102        "ClientException",
4103        format!("Capacity provider not found: {name}"),
4104    )
4105}
4106
/// Extracts the task set id from an ARN or a bare id.
///
/// Returns the text after the last `/`, or the whole input when it
/// contains no `/`. A trailing `/` therefore yields an empty string.
fn task_set_id_from_ref(input: &str) -> String {
    input
        .rsplit_once('/')
        .map_or(input, |(_, id)| id)
        .to_owned()
}
4110
4111fn container_instance_to_json(ci: &ContainerInstance) -> Value {
4112    json!({
4113        "containerInstanceArn": ci.container_instance_arn,
4114        "ec2InstanceId": ci.ec2_instance_id,
4115        "status": ci.status,
4116        "version": ci.version,
4117        "versionInfo": ci.version_info,
4118        "agentConnected": ci.agent_connected,
4119        "agentUpdateStatus": ci.agent_update_status,
4120        "remainingResources": ci.remaining_resources,
4121        "registeredResources": ci.registered_resources,
4122        "runningTasksCount": ci.running_tasks_count,
4123        "pendingTasksCount": ci.pending_tasks_count,
4124        "registeredAt": ci.registered_at.timestamp(),
4125        "attributes": ci.attributes.iter().map(|a| json!({
4126            "name": a.name,
4127            "value": a.value,
4128            "targetType": a.target_type,
4129            "targetId": a.target_id,
4130        })).collect::<Vec<_>>(),
4131        "tags": tags_json(&ci.tags),
4132        "capacityProviderName": ci.capacity_provider_name,
4133        "healthStatus": ci.health_status,
4134    })
4135}
4136
4137fn capacity_provider_to_json(cp: &CapacityProvider) -> Value {
4138    json!({
4139        "name": cp.name,
4140        "capacityProviderArn": cp.arn,
4141        "status": cp.status,
4142        "autoScalingGroupProvider": cp.auto_scaling_group_provider,
4143        "updateStatus": cp.update_status,
4144        "updateStatusReason": cp.update_status_reason,
4145        "tags": tags_json(&cp.tags),
4146    })
4147}
4148
4149fn task_set_to_json(ts: &TaskSet) -> Value {
4150    json!({
4151        "id": ts.task_set_id,
4152        "taskSetArn": ts.task_set_arn,
4153        "serviceArn": ts.service_arn,
4154        "clusterArn": ts.cluster_arn,
4155        "externalId": ts.external_id,
4156        "status": ts.status,
4157        "taskDefinition": ts.task_definition,
4158        "computedDesiredCount": ts.computed_desired_count,
4159        "pendingCount": ts.pending_count,
4160        "runningCount": ts.running_count,
4161        "launchType": ts.launch_type,
4162        "platformVersion": ts.platform_version,
4163        "scale": ts.scale,
4164        "stabilityStatus": ts.stability_status,
4165        "createdAt": ts.created_at.timestamp(),
4166        "updatedAt": ts.updated_at.timestamp(),
4167        "loadBalancers": ts.load_balancers,
4168        "serviceRegistries": ts.service_registries,
4169        "capacityProviderStrategy": ts.capacity_provider_strategy,
4170        "tags": tags_json(&ts.tags),
4171    })
4172}
4173
4174fn task_protection_json(task: &Task) -> Value {
4175    let p = task.protection.as_ref();
4176    json!({
4177        "taskArn": task.task_arn,
4178        "protectionEnabled": p.map(|p| p.enabled).unwrap_or(false),
4179        "expirationDate": p.and_then(|p| p.expiration).map(|e| e.timestamp()),
4180    })
4181}
4182
#[cfg(test)]
mod tests {
    use super::*;

    // ---- fixtures -------------------------------------------------------

    /// Builds a tag entry from a key/value pair.
    fn tag(key: &str, value: &str) -> TagEntry {
        TagEntry {
            key: key.into(),
            value: value.into(),
        }
    }

    /// Minimal service named `api` living in the `default` cluster.
    fn sample_service() -> Service {
        Service {
            service_name: "api".into(),
            service_arn: "arn".into(),
            cluster_name: "default".into(),
            cluster_arn: "arn".into(),
            task_definition_arn: "td-arn".into(),
            family: "td".into(),
            revision: 1,
            desired_count: 0,
            running_count: 0,
            pending_count: 0,
            launch_type: "FARGATE".into(),
            status: "ACTIVE".into(),
            scheduling_strategy: "REPLICA".into(),
            deployment_controller: "ECS".into(),
            minimum_healthy_percent: None,
            maximum_percent: None,
            circuit_breaker: None,
            deployments: vec![],
            load_balancers: vec![],
            service_registries: vec![],
            placement_constraints: vec![],
            placement_strategy: vec![],
            network_configuration: None,
            tags: vec![],
            created_at: chrono::Utc::now(),
            created_by: None,
            role_arn: None,
        }
    }

    /// Minimal active container instance in the `default` cluster.
    fn sample_container_instance() -> ContainerInstance {
        ContainerInstance {
            container_instance_arn: "arn".into(),
            ec2_instance_id: Some("i-x".into()),
            cluster_name: "default".into(),
            cluster_arn: "arn".into(),
            status: "ACTIVE".into(),
            version: 0,
            version_info: None,
            agent_connected: true,
            agent_update_status: None,
            remaining_resources: vec![],
            registered_resources: vec![],
            running_tasks_count: 0,
            pending_tasks_count: 0,
            registered_at: chrono::Utc::now(),
            attributes: vec![],
            tags: vec![],
            capacity_provider_name: None,
            health_status: None,
        }
    }

    // ---- parse_family_revision -----------------------------------------

    #[test]
    fn parse_family_revision_with_revision() {
        let (family, revision) = parse_family_revision("web:3");
        assert_eq!(family, "web");
        assert_eq!(revision, Some(3));
    }

    #[test]
    fn parse_family_revision_without_revision() {
        let (family, revision) = parse_family_revision("web");
        assert_eq!(family, "web");
        assert_eq!(revision, None);
    }

    #[test]
    fn parse_family_revision_non_numeric_treated_as_no_revision() {
        let (family, revision) = parse_family_revision("web:latest");
        assert_eq!(family, "web:latest");
        assert_eq!(revision, None);
    }

    // ---- decode_ecs_arn -------------------------------------------------

    #[test]
    fn decode_ecs_arn_cluster() {
        let decoded = decode_ecs_arn("arn:aws:ecs:us-east-1:111122223333:cluster/prod").unwrap();
        assert_eq!(decoded.0, "111122223333");
        assert_eq!(decoded.1, "cluster");
        assert_eq!(decoded.2, "prod");
    }

    #[test]
    fn decode_ecs_arn_task_definition() {
        let decoded =
            decode_ecs_arn("arn:aws:ecs:us-east-1:111122223333:task-definition/web:5").unwrap();
        assert_eq!(decoded.0, "111122223333");
        assert_eq!(decoded.1, "task-definition");
        assert_eq!(decoded.2, "web:5");
    }

    #[test]
    fn decode_ecs_arn_rejects_non_ecs() {
        let outcome = decode_ecs_arn("arn:aws:s3:::bucket");
        assert!(outcome.is_err());
    }

    // ---- key resolution -------------------------------------------------

    #[test]
    fn resolve_service_key_handles_short_and_long() {
        let mut state = EcsState::new("123456789012", "us-east-1");
        state
            .services
            .insert("default/api".to_string(), sample_service());

        // Long-form: cluster/service.
        assert_eq!(
            resolve_service_key(&state, "default/api").as_deref(),
            Some("default/api")
        );
        // Short-form: a bare service name still resolves to the full key.
        assert_eq!(
            resolve_service_key(&state, "api").as_deref(),
            Some("default/api")
        );
        // Unknown name returns None.
        assert!(resolve_service_key(&state, "nope").is_none());
    }

    #[test]
    fn resolve_container_instance_key_handles_short_and_long() {
        let mut state = EcsState::new("123456789012", "us-east-1");
        state
            .container_instances
            .insert("default/abc-123".to_string(), sample_container_instance());

        assert_eq!(
            resolve_container_instance_key(&state, "default/abc-123").as_deref(),
            Some("default/abc-123")
        );
        assert_eq!(
            resolve_container_instance_key(&state, "abc-123").as_deref(),
            Some("default/abc-123")
        );
        assert!(resolve_container_instance_key(&state, "nope").is_none());
    }

    // ---- validate_family_name ------------------------------------------

    #[test]
    fn validate_family_name_accepts_hyphen_underscore() {
        let outcome = validate_family_name("web_server-2");
        assert!(outcome.is_ok());
    }

    #[test]
    fn validate_family_name_rejects_empty() {
        let outcome = validate_family_name("");
        assert!(outcome.is_err());
    }

    #[test]
    fn validate_family_name_rejects_slash() {
        let outcome = validate_family_name("web/server");
        assert!(outcome.is_err());
    }

    // ---- resolve_task_definition_ref -----------------------------------

    #[test]
    fn resolve_task_definition_ref_bare_family() {
        let (account, family, rev) = resolve_task_definition_ref("web").unwrap();
        assert!(account.is_none());
        assert_eq!(family, "web");
        assert!(rev.is_none());
    }

    #[test]
    fn resolve_task_definition_ref_family_revision() {
        let (account, family, rev) = resolve_task_definition_ref("web:3").unwrap();
        assert!(account.is_none());
        assert_eq!(family, "web");
        assert_eq!(rev, Some(3));
    }

    #[test]
    fn resolve_task_definition_ref_full_arn() {
        let arn = "arn:aws:ecs:us-east-1:111122223333:task-definition/web:3";
        let (account, family, rev) = resolve_task_definition_ref(arn).unwrap();
        assert_eq!(account.as_deref(), Some("111122223333"));
        assert_eq!(family, "web");
        assert_eq!(rev, Some(3));
    }

    // ---- tags -----------------------------------------------------------

    #[test]
    fn merge_tags_replaces_existing_value() {
        let mut current = vec![tag("env", "dev")];
        merge_tags(&mut current, vec![tag("env", "prod")]);
        assert_eq!(current.len(), 1);
        assert_eq!(current[0].value, "prod");
    }

    #[test]
    fn merge_tags_adds_new() {
        let mut current = vec![tag("env", "dev")];
        merge_tags(&mut current, vec![tag("team", "platform")]);
        assert_eq!(current.len(), 2);
    }

    #[test]
    fn parse_tags_reads_lowercase_keys() {
        let body = json!({
            "tags": [
                {"key": "env", "value": "prod"},
                {"key": "team", "value": "platform"},
            ]
        });
        let tags = parse_tags(&body);
        assert_eq!(tags.len(), 2);
        assert_eq!(tags[0].key, "env");
        assert_eq!(tags[0].value, "prod");
    }

    // ---- matches_filter -------------------------------------------------

    #[test]
    fn matches_filter_respects_none() {
        // No filter accepts everything; a filter requires equality.
        assert!(matches_filter(None, "anything"));
        assert!(matches_filter(Some("x"), "x"));
        assert!(!matches_filter(Some("x"), "y"));
    }
}