Skip to main content

fakecloud_ecs/
service.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use chrono::Utc;
5use http::StatusCode;
6use serde_json::{json, Map, Value};
7use tokio::sync::Mutex as AsyncMutex;
8
9use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
10use fakecloud_persistence::SnapshotStore;
11
12use crate::state::{
13    Attribute, AttributeRef, AwsLogsConfig, CapacityProvider, CircuitBreakerConfig, Cluster,
14    Container, ContainerInstance, Deployment, EcsSnapshot, EcsState, Service, SharedEcsState,
15    TagEntry, Task, TaskDefinition, TaskSet, ECS_SNAPSHOT_SCHEMA_VERSION,
16};
17
/// Names of the ECS API actions this service advertises.
///
/// Returned as-is by `AwsService::supported_actions`. Each entry has a
/// matching arm in the `handle` dispatch; an action missing from that match
/// falls through to the "action not implemented" error.
18const SUPPORTED_ACTIONS: &[&str] = &[
19    "CreateCluster",
20    "DescribeClusters",
21    "DeleteCluster",
22    "ListClusters",
23    "UpdateCluster",
24    "UpdateClusterSettings",
25    "PutClusterCapacityProviders",
26    "RegisterTaskDefinition",
27    "DescribeTaskDefinition",
28    "DeregisterTaskDefinition",
29    "DeleteTaskDefinitions",
30    "ListTaskDefinitions",
31    "ListTaskDefinitionFamilies",
32    "TagResource",
33    "UntagResource",
34    "ListTagsForResource",
35    "PutAccountSetting",
36    "PutAccountSettingDefault",
37    "DeleteAccountSetting",
38    "ListAccountSettings",
39    "RunTask",
40    "StartTask",
41    "StopTask",
42    "DescribeTasks",
43    "ListTasks",
44    "CreateService",
45    "UpdateService",
46    "DeleteService",
47    "DescribeServices",
48    "ListServices",
49    "ListServicesByNamespace",
50    "RegisterContainerInstance",
51    "DeregisterContainerInstance",
52    "DescribeContainerInstances",
53    "ListContainerInstances",
54    "UpdateContainerAgent",
55    "UpdateContainerInstancesState",
56    "PutAttributes",
57    "DeleteAttributes",
58    "ListAttributes",
59    "CreateCapacityProvider",
60    "DeleteCapacityProvider",
61    "DescribeCapacityProviders",
62    "UpdateCapacityProvider",
63    "GetTaskProtection",
64    "UpdateTaskProtection",
65    "CreateTaskSet",
66    "UpdateTaskSet",
67    "DeleteTaskSet",
68    "DescribeTaskSets",
69    "UpdateServicePrimaryTaskSet",
70    "ExecuteCommand",
71    "SubmitContainerStateChange",
72    "SubmitTaskStateChange",
73    "SubmitAttachmentStateChanges",
74    "DiscoverPollEndpoint",
75    "StopServiceDeployment",
76    "ListServiceDeployments",
77    "DescribeServiceDeployments",
78    "DescribeServiceRevisions",
79];
80
/// Report whether `action` names one of the state-changing ECS operations.
///
/// `handle` consults this to decide whether a successful request should be
/// followed by a snapshot save. Matching is exact and case-sensitive; any
/// name not in the table (including unknown actions) is non-mutating.
fn is_mutating(action: &str) -> bool {
    const MUTATING_ACTIONS: &[&str] = &[
        "CreateCluster",
        "DeleteCluster",
        "UpdateCluster",
        "UpdateClusterSettings",
        "PutClusterCapacityProviders",
        "RegisterTaskDefinition",
        "DeregisterTaskDefinition",
        "DeleteTaskDefinitions",
        "TagResource",
        "UntagResource",
        "PutAccountSetting",
        "PutAccountSettingDefault",
        "DeleteAccountSetting",
        "RunTask",
        "StartTask",
        "StopTask",
        "CreateService",
        "UpdateService",
        "DeleteService",
        "RegisterContainerInstance",
        "DeregisterContainerInstance",
        "UpdateContainerAgent",
        "UpdateContainerInstancesState",
        "PutAttributes",
        "DeleteAttributes",
        "CreateCapacityProvider",
        "DeleteCapacityProvider",
        "UpdateCapacityProvider",
        "UpdateTaskProtection",
        "CreateTaskSet",
        "UpdateTaskSet",
        "DeleteTaskSet",
        "UpdateServicePrimaryTaskSet",
        "SubmitContainerStateChange",
        "SubmitTaskStateChange",
        "SubmitAttachmentStateChanges",
        "StopServiceDeployment",
    ];
    MUTATING_ACTIONS.contains(&action)
}
123
/// ECS service implementation: holds the shared ECS state plus the optional
/// collaborators attached through the `with_*` builder methods.
124pub struct EcsService {
    /// Shared ECS state that the request handlers read and write.
125    state: SharedEcsState,
    /// Destination for serialized snapshots; when `None`, `save_snapshot`
    /// returns without doing anything.
126    snapshot_store: Option<Arc<dyn SnapshotStore>>,
    /// Async mutex taken by `save_snapshot` before it copies the state and
    /// held until the write finishes, so snapshot writes do not overlap.
127    snapshot_lock: Arc<AsyncMutex<()>>,
    /// Optional runtime set by `with_runtime`. NOTE(review): not read by any
    /// code visible in this part of the file — confirm usage elsewhere.
128    runtime: Option<Arc<crate::runtime::EcsRuntime>>,
    /// Optional validator used by `check_pass_role`; when `None`, role
    /// checks always succeed.
129    role_trust_validator: Option<Arc<dyn fakecloud_core::auth::RoleTrustValidator>>,
130}
131
132impl EcsService {
133    pub fn new(state: SharedEcsState) -> Self {
134        Self {
135            state,
136            snapshot_store: None,
137            snapshot_lock: Arc::new(AsyncMutex::new(())),
138            runtime: None,
139            role_trust_validator: None,
140        }
141    }
142
143    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
144        self.snapshot_store = Some(store);
145        self
146    }
147
148    pub fn with_runtime(mut self, runtime: Arc<crate::runtime::EcsRuntime>) -> Self {
149        self.runtime = Some(runtime);
150        self
151    }
152
153    pub fn with_role_trust_validator(
154        mut self,
155        validator: Arc<dyn fakecloud_core::auth::RoleTrustValidator>,
156    ) -> Self {
157        self.role_trust_validator = Some(validator);
158        self
159    }
160
161    fn check_pass_role(&self, account_id: &str, role_arn: &str) -> Result<(), AwsServiceError> {
162        let Some(ref validator) = self.role_trust_validator else {
163            return Ok(());
164        };
165        if let Err(err) = validator.validate(account_id, role_arn, "ecs-tasks.amazonaws.com") {
166            return Err(AwsServiceError::aws_error(
167                StatusCode::BAD_REQUEST,
168                "InvalidParameterException",
169                err.to_string(),
170            ));
171        }
172        Ok(())
173    }
174
175    pub fn state_handle(&self) -> &SharedEcsState {
176        &self.state
177    }
178
179    async fn save_snapshot(&self) {
180        let Some(store) = self.snapshot_store.clone() else {
181            return;
182        };
183        let _guard = self.snapshot_lock.lock().await;
184        let snapshot = EcsSnapshot {
185            schema_version: ECS_SNAPSHOT_SCHEMA_VERSION,
186            accounts: Some(self.state.read().clone()),
187        };
188        let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
189            let bytes = serde_json::to_vec(&snapshot)
190                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
191            store.save(&bytes)
192        })
193        .await;
194        match join {
195            Ok(Ok(())) => {}
196            Ok(Err(err)) => tracing::error!(%err, "failed to write ecs snapshot"),
197            Err(err) => tracing::error!(%err, "ecs snapshot task panicked"),
198        }
199    }
200}
201
202#[async_trait]
203impl AwsService for EcsService {
204    fn service_name(&self) -> &str {
205        "ecs"
206    }
207
208    async fn handle(&self, request: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
209        let mutates = is_mutating(request.action.as_str());
210        let result = match request.action.as_str() {
211            "CreateCluster" => self.create_cluster(&request),
212            "DescribeClusters" => self.describe_clusters(&request),
213            "DeleteCluster" => self.delete_cluster(&request),
214            "ListClusters" => self.list_clusters(&request),
215            "UpdateCluster" => self.update_cluster(&request),
216            "UpdateClusterSettings" => self.update_cluster_settings(&request),
217            "PutClusterCapacityProviders" => self.put_cluster_capacity_providers(&request),
218            "RegisterTaskDefinition" => self.register_task_definition(&request),
219            "DescribeTaskDefinition" => self.describe_task_definition(&request),
220            "DeregisterTaskDefinition" => self.deregister_task_definition(&request),
221            "DeleteTaskDefinitions" => self.delete_task_definitions(&request),
222            "ListTaskDefinitions" => self.list_task_definitions(&request),
223            "ListTaskDefinitionFamilies" => self.list_task_definition_families(&request),
224            "TagResource" => self.tag_resource(&request),
225            "UntagResource" => self.untag_resource(&request),
226            "ListTagsForResource" => self.list_tags_for_resource(&request),
227            "PutAccountSetting" => self.put_account_setting(&request),
228            "PutAccountSettingDefault" => self.put_account_setting_default(&request),
229            "DeleteAccountSetting" => self.delete_account_setting(&request),
230            "ListAccountSettings" => self.list_account_settings(&request),
231            "RunTask" => self.run_task(&request),
232            "StartTask" => self.start_task(&request),
233            "StopTask" => self.stop_task(&request).await,
234            "DescribeTasks" => self.describe_tasks(&request),
235            "ListTasks" => self.list_tasks(&request),
236            "CreateService" => self.create_service(&request),
237            "UpdateService" => self.update_service(&request),
238            "DeleteService" => self.delete_service(&request).await,
239            "DescribeServices" => self.describe_services(&request),
240            "ListServices" => self.list_services(&request),
241            "ListServicesByNamespace" => self.list_services_by_namespace(&request),
242            "RegisterContainerInstance" => self.register_container_instance(&request),
243            "DeregisterContainerInstance" => self.deregister_container_instance(&request),
244            "DescribeContainerInstances" => self.describe_container_instances(&request),
245            "ListContainerInstances" => self.list_container_instances(&request),
246            "UpdateContainerAgent" => self.update_container_agent(&request),
247            "UpdateContainerInstancesState" => self.update_container_instances_state(&request),
248            "PutAttributes" => self.put_attributes(&request),
249            "DeleteAttributes" => self.delete_attributes(&request),
250            "ListAttributes" => self.list_attributes(&request),
251            "CreateCapacityProvider" => self.create_capacity_provider(&request),
252            "DeleteCapacityProvider" => self.delete_capacity_provider(&request),
253            "DescribeCapacityProviders" => self.describe_capacity_providers(&request),
254            "UpdateCapacityProvider" => self.update_capacity_provider(&request),
255            "GetTaskProtection" => self.get_task_protection(&request),
256            "UpdateTaskProtection" => self.update_task_protection(&request),
257            "CreateTaskSet" => self.create_task_set(&request),
258            "UpdateTaskSet" => self.update_task_set(&request),
259            "DeleteTaskSet" => self.delete_task_set(&request),
260            "DescribeTaskSets" => self.describe_task_sets(&request),
261            "UpdateServicePrimaryTaskSet" => self.update_service_primary_task_set(&request),
262            "ExecuteCommand" => self.execute_command(&request).await,
263            "SubmitContainerStateChange" => self.submit_container_state_change(&request),
264            "SubmitTaskStateChange" => self.submit_task_state_change(&request),
265            "SubmitAttachmentStateChanges" => self.submit_attachment_state_changes(&request),
266            "DiscoverPollEndpoint" => self.discover_poll_endpoint(&request),
267            "StopServiceDeployment" => self.stop_service_deployment(&request),
268            "ListServiceDeployments" => self.list_service_deployments(&request),
269            "DescribeServiceDeployments" => self.describe_service_deployments(&request),
270            "DescribeServiceRevisions" => self.describe_service_revisions(&request),
271            _ => Err(AwsServiceError::action_not_implemented(
272                "ecs",
273                &request.action,
274            )),
275        };
276        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
277            self.save_snapshot().await;
278        }
279        result
280    }
281
282    fn supported_actions(&self) -> &[&str] {
283        SUPPORTED_ACTIONS
284    }
285}
286
287// -------- helpers --------
288
289fn req_str<'a>(body: &'a Value, field: &str) -> Result<&'a str, AwsServiceError> {
290    body.get(field)
291        .and_then(|v| v.as_str())
292        .ok_or_else(|| client_exception(format!("Missing required field: {field}")))
293}
294
295fn opt_str<'a>(body: &'a Value, field: &str) -> Option<&'a str> {
296    body.get(field).and_then(|v| v.as_str())
297}
298
299fn client_exception(message: impl Into<String>) -> AwsServiceError {
300    AwsServiceError::aws_error(StatusCode::BAD_REQUEST, "ClientException", message)
301}
302
303fn invalid_parameter(message: impl Into<String>) -> AwsServiceError {
304    AwsServiceError::aws_error(
305        StatusCode::BAD_REQUEST,
306        "InvalidParameterException",
307        message,
308    )
309}
310
311fn cluster_not_found(name: &str) -> AwsServiceError {
312    AwsServiceError::aws_error(
313        StatusCode::BAD_REQUEST,
314        "ClusterNotFoundException",
315        format!("The referenced cluster was inactive: {name}"),
316    )
317}
318
319fn cluster_contains_services() -> AwsServiceError {
320    AwsServiceError::aws_error(
321        StatusCode::BAD_REQUEST,
322        "ClusterContainsServicesException",
323        "The specified cluster still contains active services",
324    )
325}
326
327fn cluster_contains_tasks() -> AwsServiceError {
328    AwsServiceError::aws_error(
329        StatusCode::BAD_REQUEST,
330        "ClusterContainsTasksException",
331        "The specified cluster still contains active tasks",
332    )
333}
334
335fn task_definition_not_found(family_rev: &str) -> AwsServiceError {
336    AwsServiceError::aws_error(
337        StatusCode::BAD_REQUEST,
338        "ClientException",
339        format!("Unable to describe task definition: {family_rev}"),
340    )
341}
342
343fn parse_tags(body: &Value) -> Vec<TagEntry> {
344    body.get("tags")
345        .and_then(|v| v.as_array())
346        .map(|arr| {
347            arr.iter()
348                .filter_map(|t| {
349                    let k = t.get("key").and_then(|v| v.as_str())?;
350                    let v = t.get("value").and_then(|v| v.as_str()).unwrap_or("");
351                    Some(TagEntry {
352                        key: k.to_string(),
353                        value: v.to_string(),
354                    })
355                })
356                .collect()
357        })
358        .unwrap_or_default()
359}
360
361fn tags_json(tags: &[TagEntry]) -> Value {
362    Value::Array(
363        tags.iter()
364            .map(|t| json!({"key": t.key, "value": t.value}))
365            .collect(),
366    )
367}
368
369fn merge_tags(current: &mut Vec<TagEntry>, incoming: Vec<TagEntry>) {
370    for new_tag in incoming {
371        if let Some(existing) = current.iter_mut().find(|t| t.key == new_tag.key) {
372            existing.value = new_tag.value;
373        } else {
374            current.push(new_tag);
375        }
376    }
377}
378
379fn cluster_to_json(cluster: &Cluster) -> Value {
380    json!({
381        "clusterArn": cluster.cluster_arn,
382        "clusterName": cluster.cluster_name,
383        "status": cluster.status,
384        "registeredContainerInstancesCount": cluster.registered_container_instances_count,
385        "runningTasksCount": cluster.running_tasks_count,
386        "pendingTasksCount": cluster.pending_tasks_count,
387        "activeServicesCount": cluster.active_services_count,
388        "statistics": cluster.statistics,
389        "tags": tags_json(&cluster.tags),
390        "settings": cluster.settings,
391        "configuration": cluster.configuration,
392        "capacityProviders": cluster.capacity_providers,
393        "defaultCapacityProviderStrategy": cluster.default_capacity_provider_strategy,
394        "attachments": cluster.attachments,
395        "attachmentsStatus": cluster.attachments_status,
396        "serviceConnectDefaults": cluster.service_connect_defaults,
397    })
398}
399
400fn task_definition_to_json(td: &TaskDefinition) -> Value {
401    let mut map = Map::new();
402    map.insert("taskDefinitionArn".into(), json!(td.task_definition_arn));
403    map.insert("family".into(), json!(td.family));
404    map.insert("revision".into(), json!(td.revision));
405    map.insert("status".into(), json!(td.status));
406    map.insert(
407        "containerDefinitions".into(),
408        Value::Array(td.container_definitions.clone()),
409    );
410    map.insert("compatibilities".into(), json!(td.compatibilities));
411    map.insert(
412        "requiresCompatibilities".into(),
413        json!(td.requires_compatibilities),
414    );
415    map.insert("volumes".into(), Value::Array(td.volumes.clone()));
416    map.insert(
417        "placementConstraints".into(),
418        Value::Array(td.placement_constraints.clone()),
419    );
420    map.insert(
421        "requiresAttributes".into(),
422        Value::Array(td.requires_attributes.clone()),
423    );
424    map.insert(
425        "inferenceAccelerators".into(),
426        Value::Array(td.inference_accelerators.clone()),
427    );
428    if let Some(ref x) = td.network_mode {
429        map.insert("networkMode".into(), json!(x));
430    }
431    if let Some(ref x) = td.cpu {
432        map.insert("cpu".into(), json!(x));
433    }
434    if let Some(ref x) = td.memory {
435        map.insert("memory".into(), json!(x));
436    }
437    if let Some(ref x) = td.task_role_arn {
438        map.insert("taskRoleArn".into(), json!(x));
439    }
440    if let Some(ref x) = td.execution_role_arn {
441        map.insert("executionRoleArn".into(), json!(x));
442    }
443    if let Some(ref x) = td.pid_mode {
444        map.insert("pidMode".into(), json!(x));
445    }
446    if let Some(ref x) = td.ipc_mode {
447        map.insert("ipcMode".into(), json!(x));
448    }
449    if let Some(ref x) = td.proxy_configuration {
450        map.insert("proxyConfiguration".into(), x.clone());
451    }
452    if let Some(ref x) = td.ephemeral_storage {
453        map.insert("ephemeralStorage".into(), x.clone());
454    }
455    if let Some(ref x) = td.runtime_platform {
456        map.insert("runtimePlatform".into(), x.clone());
457    }
458    if let Some(ref x) = td.registered_by {
459        map.insert("registeredBy".into(), json!(x));
460    }
461    map.insert("registeredAt".into(), json!(td.registered_at.timestamp()));
462    if let Some(ts) = td.deregistered_at {
463        map.insert("deregisteredAt".into(), json!(ts.timestamp()));
464    }
465    if let Some(enabled) = td.enable_fault_injection {
466        map.insert("enableFaultInjection".into(), json!(enabled));
467    }
468    Value::Object(map)
469}
470
471/// Resolve a service ARN tail to its storage key in `state.services`.
472/// Long-form ARNs carry `cluster/service` directly; short-form
473/// (pre-Nov-2018, still emitted when `serviceLongArnFormat=disabled`)
474/// only carry the service name, so we scan for any cluster whose stored
475/// key ends with `/<name>`.
476fn resolve_service_key(state: &EcsState, tail: &str) -> Option<String> {
477    if tail.contains('/') {
478        return state.services.contains_key(tail).then(|| tail.to_string());
479    }
480    let suffix = format!("/{tail}");
481    state
482        .services
483        .keys()
484        .find(|k| k.ends_with(&suffix))
485        .cloned()
486}
487
488/// Same idea for container instances. Storage keys are
489/// `cluster/instance-id`; short ARN tails are just `instance-id`.
490fn resolve_container_instance_key(state: &EcsState, tail: &str) -> Option<String> {
491    if tail.contains('/') {
492        return state
493            .container_instances
494            .contains_key(tail)
495            .then(|| tail.to_string());
496    }
497    let suffix = format!("/{tail}");
498    state
499        .container_instances
500        .keys()
501        .find(|k| k.ends_with(&suffix))
502        .cloned()
503}
504
505/// Decode an `arn:aws:ecs:<region>:<account>:<type>/<name>[:<rev>]` ARN
506/// into `(account, resource_type, tail)`. For task definitions `tail` is
507/// `family:revision`; for clusters it's `cluster_name`.
508fn decode_ecs_arn(arn: &str) -> Result<(String, String, String), AwsServiceError> {
509    let rest = arn
510        .strip_prefix("arn:aws:ecs:")
511        .ok_or_else(|| invalid_parameter(format!("Malformed ECS ARN: {arn}")))?;
512    // Resource portion may itself contain a trailing `:<revision>`, so we
513    // split at most three ways then treat the remainder as the resource.
514    let mut parts = rest.splitn(3, ':');
515    let _region = parts
516        .next()
517        .ok_or_else(|| invalid_parameter("Malformed ECS ARN"))?;
518    let account = parts
519        .next()
520        .ok_or_else(|| invalid_parameter("Malformed ECS ARN"))?;
521    let resource = parts
522        .next()
523        .ok_or_else(|| invalid_parameter("Malformed ECS ARN"))?;
524    let (resource_type, tail) = resource
525        .split_once('/')
526        .ok_or_else(|| invalid_parameter("Malformed ECS ARN"))?;
527    Ok((
528        account.to_string(),
529        resource_type.to_string(),
530        tail.to_string(),
531    ))
532}
533
/// Split a `family[:revision]` reference.
///
/// When the text after the last `:` parses as an `i32`, returns
/// `(family, Some(revision))`. Otherwise the whole input is taken as the
/// family name and the revision is `None` (the latest-active shorthand).
fn parse_family_revision(input: &str) -> (String, Option<i32>) {
    let explicit = input
        .rsplit_once(':')
        .and_then(|(family, rev)| rev.parse::<i32>().ok().map(|number| (family, number)));
    match explicit {
        Some((family, number)) => (family.to_string(), Some(number)),
        None => (input.to_string(), None),
    }
}
545
546/// Task-definition ARNs may appear as the full ARN or just `family:rev`.
547/// Returns `(account, family, Some(rev))` where `account` is `None` for
548/// the bare shorthand form.
549fn resolve_task_definition_ref(
550    input: &str,
551) -> Result<(Option<String>, String, Option<i32>), AwsServiceError> {
552    if input.starts_with("arn:aws:ecs:") {
553        let (account, resource_type, tail) = decode_ecs_arn(input)?;
554        if resource_type != "task-definition" {
555            return Err(invalid_parameter(format!(
556                "Expected task-definition ARN: {input}"
557            )));
558        }
559        let (family, rev) = parse_family_revision(&tail);
560        Ok((Some(account), family, rev))
561    } else {
562        let (family, rev) = parse_family_revision(input);
563        Ok((None, family, rev))
564    }
565}
566
567fn target_account_for_task_definition(request: &AwsRequest, td_ref: &str) -> String {
568    if let Ok((Some(account), _, _)) = resolve_task_definition_ref(td_ref) {
569        account
570    } else {
571        request.account_id.clone()
572    }
573}
574
575fn target_account_for_cluster(request: &AwsRequest, cluster_ref: Option<&str>) -> String {
576    if let Some(input) = cluster_ref {
577        if input.starts_with("arn:aws:ecs:") {
578            if let Ok((account, _, _)) = decode_ecs_arn(input) {
579                return account;
580            }
581        }
582    }
583    request.account_id.clone()
584}
585
586fn latest_active_revision(
587    revisions: &std::collections::BTreeMap<i32, TaskDefinition>,
588) -> Option<&TaskDefinition> {
589    revisions.values().rev().find(|td| td.status == "ACTIVE")
590}
591
592// -------- operations: clusters --------
593
594impl EcsService {
595    fn create_cluster(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
596        let body = request.json_body();
597        let cluster_name = opt_str(&body, "clusterName")
598            .unwrap_or("default")
599            .to_string();
600        let tags = parse_tags(&body);
601        let settings: Vec<Value> = body
602            .get("settings")
603            .and_then(|v| v.as_array())
604            .cloned()
605            .unwrap_or_default();
606        let configuration = body.get("configuration").cloned();
607        let capacity_providers: Vec<String> = body
608            .get("capacityProviders")
609            .and_then(|v| v.as_array())
610            .map(|arr| {
611                arr.iter()
612                    .filter_map(|v| v.as_str().map(String::from))
613                    .collect()
614            })
615            .unwrap_or_default();
616        let default_strategy: Vec<Value> = body
617            .get("defaultCapacityProviderStrategy")
618            .and_then(|v| v.as_array())
619            .cloned()
620            .unwrap_or_default();
621        let service_connect = body.get("serviceConnectDefaults").cloned();
622
623        let account = request.account_id.clone();
624        let mut accounts = self.state.write();
625        let state = accounts.get_or_create(&account);
626        let arn = state.cluster_arn(&cluster_name);
627        let mut cluster = Cluster::new(&cluster_name, arn);
628        cluster.tags = tags;
629        cluster.settings = settings;
630        cluster.configuration = configuration;
631        cluster.capacity_providers = capacity_providers;
632        cluster.default_capacity_provider_strategy = default_strategy;
633        cluster.service_connect_defaults = service_connect;
634        // CreateCluster on an existing cluster is idempotent-ish — AWS
635        // returns the existing cluster, potentially with merged settings.
636        // We keep it simple and overwrite on recreate.
637        state.clusters.insert(cluster_name.clone(), cluster.clone());
638
639        Ok(AwsResponse::ok_json(json!({
640            "cluster": cluster_to_json(&cluster),
641        })))
642    }
643
644    fn describe_clusters(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
645        let body = request.json_body();
646        let names: Vec<String> = body
647            .get("clusters")
648            .and_then(|v| v.as_array())
649            .map(|arr| {
650                arr.iter()
651                    .filter_map(|v| v.as_str().map(|s| EcsState::resolve_cluster_name(Some(s))))
652                    .collect()
653            })
654            .unwrap_or_else(|| vec!["default".to_string()]);
655
656        let account = request.account_id.clone();
657        let accounts = self.state.read();
658        let mut found = Vec::new();
659        let mut failures = Vec::new();
660        if let Some(state) = accounts.get(&account) {
661            for name in &names {
662                match state.clusters.get(name) {
663                    Some(c) => found.push(cluster_to_json(c)),
664                    None => failures.push(json!({
665                        "arn": state.cluster_arn(name),
666                        "reason": "MISSING",
667                    })),
668                }
669            }
670        } else {
671            for name in &names {
672                failures.push(json!({
673                    "arn": format!(
674                        "arn:aws:ecs:{}:{}:cluster/{}",
675                        accounts.region(),
676                        account,
677                        name
678                    ),
679                    "reason": "MISSING",
680                }));
681            }
682        }
683        Ok(AwsResponse::ok_json(json!({
684            "clusters": found,
685            "failures": failures,
686        })))
687    }
688
689    fn delete_cluster(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
690        let body = request.json_body();
691        let cluster_ref = opt_str(&body, "cluster");
692        let name = EcsState::resolve_cluster_name(cluster_ref);
693        let account = target_account_for_cluster(request, cluster_ref);
694
695        let mut accounts = self.state.write();
696        let state = accounts.get_or_create(&account);
697        let cluster = state
698            .clusters
699            .get_mut(&name)
700            .ok_or_else(|| cluster_not_found(&name))?;
701        if cluster.active_services_count > 0 {
702            return Err(cluster_contains_services());
703        }
704        if cluster.running_tasks_count > 0 || cluster.pending_tasks_count > 0 {
705            return Err(cluster_contains_tasks());
706        }
707        cluster.status = "INACTIVE".to_string();
708        let snapshot = cluster.clone();
709        // Real ECS keeps the cluster visible as INACTIVE for about an
710        // hour before garbage-collecting it. We drop it immediately to
711        // keep state bounded — callers that try to describe it by name
712        // will get a MISSING failure, matching the long-tail behaviour.
713        state.clusters.remove(&name);
714        Ok(AwsResponse::ok_json(json!({
715            "cluster": cluster_to_json(&snapshot),
716        })))
717    }
718
719    fn list_clusters(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
720        let body = request.json_body();
721        let max_results = body
722            .get("maxResults")
723            .and_then(|v| v.as_i64())
724            .filter(|n| (1..=100).contains(n))
725            .map(|n| n as usize)
726            .unwrap_or(100);
727        let next_token = opt_str(&body, "nextToken").unwrap_or("");
728
729        let account = request.account_id.clone();
730        let accounts = self.state.read();
731        let arns: Vec<String> = match accounts.get(&account) {
732            Some(state) => state
733                .clusters
734                .values()
735                .map(|c| c.cluster_arn.clone())
736                .collect(),
737            None => Vec::new(),
738        };
739        let start = next_token.parse::<usize>().unwrap_or(0).min(arns.len());
740        let end = (start + max_results).min(arns.len());
741        let page = arns[start..end].to_vec();
742        let next = if end < arns.len() {
743            Some(end.to_string())
744        } else {
745            None
746        };
747        let mut out = json!({ "clusterArns": page });
748        if let Some(n) = next {
749            out.as_object_mut()
750                .unwrap()
751                .insert("nextToken".into(), json!(n));
752        }
753        Ok(AwsResponse::ok_json(out))
754    }
755
756    fn update_cluster(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
757        let body = request.json_body();
758        let cluster_ref = req_str(&body, "cluster")?;
759        let name = EcsState::resolve_cluster_name(Some(cluster_ref));
760        let account = target_account_for_cluster(request, Some(cluster_ref));
761
762        let mut accounts = self.state.write();
763        let state = accounts.get_or_create(&account);
764        let cluster = state
765            .clusters
766            .get_mut(&name)
767            .ok_or_else(|| cluster_not_found(&name))?;
768        if let Some(settings) = body.get("settings").and_then(|v| v.as_array()) {
769            cluster.settings = settings.clone();
770        }
771        if let Some(cfg) = body.get("configuration") {
772            cluster.configuration = Some(cfg.clone());
773        }
774        if let Some(sc) = body.get("serviceConnectDefaults") {
775            cluster.service_connect_defaults = Some(sc.clone());
776        }
777        let snapshot = cluster.clone();
778        Ok(AwsResponse::ok_json(json!({
779            "cluster": cluster_to_json(&snapshot),
780        })))
781    }
782
783    fn update_cluster_settings(
784        &self,
785        request: &AwsRequest,
786    ) -> Result<AwsResponse, AwsServiceError> {
787        let body = request.json_body();
788        let cluster_ref = req_str(&body, "cluster")?;
789        let name = EcsState::resolve_cluster_name(Some(cluster_ref));
790        let account = target_account_for_cluster(request, Some(cluster_ref));
791        let settings: Vec<Value> = body
792            .get("settings")
793            .and_then(|v| v.as_array())
794            .cloned()
795            .unwrap_or_default();
796
797        let mut accounts = self.state.write();
798        let state = accounts.get_or_create(&account);
799        let cluster = state
800            .clusters
801            .get_mut(&name)
802            .ok_or_else(|| cluster_not_found(&name))?;
803        cluster.settings = settings;
804        let snapshot = cluster.clone();
805        Ok(AwsResponse::ok_json(json!({
806            "cluster": cluster_to_json(&snapshot),
807        })))
808    }
809
810    fn put_cluster_capacity_providers(
811        &self,
812        request: &AwsRequest,
813    ) -> Result<AwsResponse, AwsServiceError> {
814        let body = request.json_body();
815        let cluster_ref = req_str(&body, "cluster")?;
816        let name = EcsState::resolve_cluster_name(Some(cluster_ref));
817        let account = target_account_for_cluster(request, Some(cluster_ref));
818        let capacity_providers: Vec<String> = body
819            .get("capacityProviders")
820            .and_then(|v| v.as_array())
821            .map(|arr| {
822                arr.iter()
823                    .filter_map(|v| v.as_str().map(String::from))
824                    .collect()
825            })
826            .ok_or_else(|| client_exception("Missing required field: capacityProviders"))?;
827        let default_strategy: Vec<Value> = body
828            .get("defaultCapacityProviderStrategy")
829            .and_then(|v| v.as_array())
830            .cloned()
831            .ok_or_else(|| {
832                client_exception("Missing required field: defaultCapacityProviderStrategy")
833            })?;
834
835        let mut accounts = self.state.write();
836        let state = accounts.get_or_create(&account);
837        let cluster = state
838            .clusters
839            .get_mut(&name)
840            .ok_or_else(|| cluster_not_found(&name))?;
841        cluster.capacity_providers = capacity_providers;
842        cluster.default_capacity_provider_strategy = default_strategy;
843        let snapshot = cluster.clone();
844        Ok(AwsResponse::ok_json(json!({
845            "cluster": cluster_to_json(&snapshot),
846        })))
847    }
848}
849
850// -------- operations: task definitions --------
851
852impl EcsService {
853    fn register_task_definition(
854        &self,
855        request: &AwsRequest,
856    ) -> Result<AwsResponse, AwsServiceError> {
857        let body = request.json_body();
858        let family = req_str(&body, "family")?.to_string();
859        validate_family_name(&family)?;
860        let container_definitions = body
861            .get("containerDefinitions")
862            .and_then(|v| v.as_array())
863            .cloned()
864            .ok_or_else(|| client_exception("Missing required field: containerDefinitions"))?;
865        if container_definitions.is_empty() {
866            return Err(client_exception(
867                "Task definition must have at least one container",
868            ));
869        }
870        for cd in &container_definitions {
871            if cd
872                .get("name")
873                .and_then(|v| v.as_str())
874                .unwrap_or("")
875                .is_empty()
876            {
877                return Err(client_exception(
878                    "Container definition is missing required field: name",
879                ));
880            }
881            if cd
882                .get("image")
883                .and_then(|v| v.as_str())
884                .unwrap_or("")
885                .is_empty()
886            {
887                return Err(client_exception(
888                    "Container definition is missing required field: image",
889                ));
890            }
891        }
892        // PassRole trust check on the task + execution roles. Real AWS
893        // rejects RegisterTaskDefinition when the role's trust policy
894        // doesn't list `ecs-tasks.amazonaws.com`.
895        if let Some(role_arn) = opt_str(&body, "taskRoleArn") {
896            self.check_pass_role(&request.account_id, role_arn)?;
897        }
898        if let Some(role_arn) = opt_str(&body, "executionRoleArn") {
899            self.check_pass_role(&request.account_id, role_arn)?;
900        }
901
902        let tags = parse_tags(&body);
903        let requires_compatibilities: Vec<String> = body
904            .get("requiresCompatibilities")
905            .and_then(|v| v.as_array())
906            .map(|arr| {
907                arr.iter()
908                    .filter_map(|v| v.as_str().map(String::from))
909                    .collect()
910            })
911            .unwrap_or_default();
912        // Compatibilities reflect what the task definition is compatible with.
913        // We always claim EC2 and FARGATE since we execute via Docker either
914        // way — callers with stricter requirements set `requiresCompatibilities`.
915        let compatibilities = vec!["EC2".to_string(), "FARGATE".to_string()];
916
917        let account = request.account_id.clone();
918        let mut accounts = self.state.write();
919        let state = accounts.get_or_create(&account);
920        let revision = state.allocate_revision(&family);
921        let arn = state.task_definition_arn(&family, revision);
922        let td = TaskDefinition {
923            family: family.clone(),
924            revision,
925            task_definition_arn: arn,
926            container_definitions,
927            status: "ACTIVE".to_string(),
928            task_role_arn: opt_str(&body, "taskRoleArn").map(String::from),
929            execution_role_arn: opt_str(&body, "executionRoleArn").map(String::from),
930            network_mode: opt_str(&body, "networkMode").map(String::from),
931            requires_compatibilities,
932            compatibilities,
933            cpu: opt_str(&body, "cpu").map(String::from),
934            memory: opt_str(&body, "memory").map(String::from),
935            pid_mode: opt_str(&body, "pidMode").map(String::from),
936            ipc_mode: opt_str(&body, "ipcMode").map(String::from),
937            volumes: body
938                .get("volumes")
939                .and_then(|v| v.as_array())
940                .cloned()
941                .unwrap_or_default(),
942            placement_constraints: body
943                .get("placementConstraints")
944                .and_then(|v| v.as_array())
945                .cloned()
946                .unwrap_or_default(),
947            proxy_configuration: body.get("proxyConfiguration").cloned(),
948            inference_accelerators: body
949                .get("inferenceAccelerators")
950                .and_then(|v| v.as_array())
951                .cloned()
952                .unwrap_or_default(),
953            ephemeral_storage: body.get("ephemeralStorage").cloned(),
954            runtime_platform: body.get("runtimePlatform").cloned(),
955            requires_attributes: Vec::new(),
956            registered_at: Utc::now(),
957            registered_by: request
958                .principal
959                .as_ref()
960                .map(|p| p.arn.clone())
961                .or(Some(format!("arn:aws:iam::{}:root", state.account_id))),
962            deregistered_at: None,
963            tags: tags.clone(),
964            enable_fault_injection: body.get("enableFaultInjection").and_then(|v| v.as_bool()),
965        };
966        let td_json = task_definition_to_json(&td);
967        state
968            .task_definitions
969            .entry(family.clone())
970            .or_default()
971            .insert(revision, td);
972
973        Ok(AwsResponse::ok_json(json!({
974            "taskDefinition": td_json,
975            "tags": tags_json(&tags),
976        })))
977    }
978
979    fn describe_task_definition(
980        &self,
981        request: &AwsRequest,
982    ) -> Result<AwsResponse, AwsServiceError> {
983        let body = request.json_body();
984        let td_ref = req_str(&body, "taskDefinition")?;
985        let (_, family, rev) = resolve_task_definition_ref(td_ref)?;
986        let include_tags = body
987            .get("include")
988            .and_then(|v| v.as_array())
989            .map(|arr| arr.iter().any(|v| v.as_str() == Some("TAGS")))
990            .unwrap_or(false);
991
992        let account = target_account_for_task_definition(request, td_ref);
993        let accounts = self.state.read();
994        let state = accounts
995            .get(&account)
996            .ok_or_else(|| task_definition_not_found(td_ref))?;
997        let revisions = state
998            .task_definitions
999            .get(&family)
1000            .ok_or_else(|| task_definition_not_found(td_ref))?;
1001        let td = match rev {
1002            Some(n) => revisions
1003                .get(&n)
1004                .ok_or_else(|| task_definition_not_found(td_ref))?,
1005            None => latest_active_revision(revisions)
1006                .ok_or_else(|| task_definition_not_found(td_ref))?,
1007        };
1008        let mut out = json!({"taskDefinition": task_definition_to_json(td)});
1009        if include_tags {
1010            out.as_object_mut()
1011                .unwrap()
1012                .insert("tags".into(), tags_json(&td.tags));
1013        }
1014        Ok(AwsResponse::ok_json(out))
1015    }
1016
1017    fn deregister_task_definition(
1018        &self,
1019        request: &AwsRequest,
1020    ) -> Result<AwsResponse, AwsServiceError> {
1021        let body = request.json_body();
1022        let td_ref = req_str(&body, "taskDefinition")?;
1023        let (_, family, rev) = resolve_task_definition_ref(td_ref)?;
1024        let rev =
1025            rev.ok_or_else(|| client_exception("taskDefinition must reference a revision"))?;
1026
1027        let account = target_account_for_task_definition(request, td_ref);
1028        let mut accounts = self.state.write();
1029        let state = accounts.get_or_create(&account);
1030        let revisions = state
1031            .task_definitions
1032            .get_mut(&family)
1033            .ok_or_else(|| task_definition_not_found(td_ref))?;
1034        let td = revisions
1035            .get_mut(&rev)
1036            .ok_or_else(|| task_definition_not_found(td_ref))?;
1037        td.status = "INACTIVE".to_string();
1038        td.deregistered_at = Some(Utc::now());
1039        let snapshot = td.clone();
1040        Ok(AwsResponse::ok_json(json!({
1041            "taskDefinition": task_definition_to_json(&snapshot),
1042        })))
1043    }
1044
1045    fn delete_task_definitions(
1046        &self,
1047        request: &AwsRequest,
1048    ) -> Result<AwsResponse, AwsServiceError> {
1049        let body = request.json_body();
1050        let refs: Vec<String> = body
1051            .get("taskDefinitions")
1052            .and_then(|v| v.as_array())
1053            .map(|arr| {
1054                arr.iter()
1055                    .filter_map(|v| v.as_str().map(String::from))
1056                    .collect()
1057            })
1058            .ok_or_else(|| client_exception("Missing required field: taskDefinitions"))?;
1059        if refs.is_empty() {
1060            return Err(client_exception("taskDefinitions must not be empty"));
1061        }
1062
1063        let mut deleted = Vec::new();
1064        let mut failures = Vec::new();
1065        let account = request.account_id.clone();
1066        let mut accounts = self.state.write();
1067        let state = accounts.get_or_create(&account);
1068        for input in refs {
1069            let parsed = match resolve_task_definition_ref(&input) {
1070                Ok((_, family, Some(rev))) => Some((family, rev)),
1071                Ok(_) => None,
1072                Err(_) => None,
1073            };
1074            let Some((family, rev)) = parsed else {
1075                failures.push(json!({
1076                    "arn": input,
1077                    "reason": "INVALID_REFERENCE",
1078                    "detail": "Expected family:revision or full task-definition ARN",
1079                }));
1080                continue;
1081            };
1082            let Some(revisions) = state.task_definitions.get_mut(&family) else {
1083                failures.push(json!({"arn": input, "reason": "MISSING"}));
1084                continue;
1085            };
1086            let Some(td) = revisions.get_mut(&rev) else {
1087                failures.push(json!({"arn": input, "reason": "MISSING"}));
1088                continue;
1089            };
1090            if td.status == "ACTIVE" {
1091                failures.push(json!({
1092                    "arn": td.task_definition_arn.clone(),
1093                    "reason": "MUST_BE_INACTIVE",
1094                    "detail": "Task definitions must be deregistered before they can be deleted",
1095                }));
1096                continue;
1097            }
1098            td.status = "DELETE_IN_PROGRESS".to_string();
1099            deleted.push(task_definition_to_json(td));
1100        }
1101        Ok(AwsResponse::ok_json(json!({
1102            "taskDefinitions": deleted,
1103            "failures": failures,
1104        })))
1105    }
1106
1107    fn list_task_definitions(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1108        let body = request.json_body();
1109        let family_prefix = opt_str(&body, "familyPrefix");
1110        let status = opt_str(&body, "status").unwrap_or("ACTIVE");
1111        let sort = opt_str(&body, "sort").unwrap_or("ASC");
1112        let max_results = body
1113            .get("maxResults")
1114            .and_then(|v| v.as_i64())
1115            .filter(|n| (1..=100).contains(n))
1116            .map(|n| n as usize)
1117            .unwrap_or(100);
1118        let next_token = opt_str(&body, "nextToken").unwrap_or("");
1119
1120        let account = request.account_id.clone();
1121        let accounts = self.state.read();
1122        let mut arns: Vec<String> = Vec::new();
1123        if let Some(state) = accounts.get(&account) {
1124            for (family, revisions) in &state.task_definitions {
1125                if let Some(prefix) = family_prefix {
1126                    if !family.starts_with(prefix) {
1127                        continue;
1128                    }
1129                }
1130                for td in revisions.values() {
1131                    if td.status == status {
1132                        arns.push(td.task_definition_arn.clone());
1133                    }
1134                }
1135            }
1136        }
1137        if sort == "DESC" {
1138            arns.sort();
1139            arns.reverse();
1140        } else {
1141            arns.sort();
1142        }
1143        let start = next_token.parse::<usize>().unwrap_or(0).min(arns.len());
1144        let end = (start + max_results).min(arns.len());
1145        let page = arns[start..end].to_vec();
1146        let mut out = json!({"taskDefinitionArns": page});
1147        if end < arns.len() {
1148            out.as_object_mut()
1149                .unwrap()
1150                .insert("nextToken".into(), json!(end.to_string()));
1151        }
1152        Ok(AwsResponse::ok_json(out))
1153    }
1154
1155    fn list_task_definition_families(
1156        &self,
1157        request: &AwsRequest,
1158    ) -> Result<AwsResponse, AwsServiceError> {
1159        let body = request.json_body();
1160        let family_prefix = opt_str(&body, "familyPrefix");
1161        let status = opt_str(&body, "status").unwrap_or("ACTIVE");
1162        let max_results = body
1163            .get("maxResults")
1164            .and_then(|v| v.as_i64())
1165            .filter(|n| (1..=100).contains(n))
1166            .map(|n| n as usize)
1167            .unwrap_or(100);
1168        let next_token = opt_str(&body, "nextToken").unwrap_or("");
1169
1170        let account = request.account_id.clone();
1171        let accounts = self.state.read();
1172        let mut families: Vec<String> = Vec::new();
1173        if let Some(state) = accounts.get(&account) {
1174            for (family, revisions) in &state.task_definitions {
1175                if let Some(prefix) = family_prefix {
1176                    if !family.starts_with(prefix) {
1177                        continue;
1178                    }
1179                }
1180                let matches_status = match status {
1181                    "ACTIVE" => revisions.values().any(|td| td.status == "ACTIVE"),
1182                    "INACTIVE" => revisions
1183                        .values()
1184                        .all(|td| td.status == "INACTIVE" || td.status == "DELETE_IN_PROGRESS"),
1185                    "ALL" => true,
1186                    _ => revisions.values().any(|td| td.status == status),
1187                };
1188                if matches_status {
1189                    families.push(family.clone());
1190                }
1191            }
1192        }
1193        families.sort();
1194        let start = next_token.parse::<usize>().unwrap_or(0).min(families.len());
1195        let end = (start + max_results).min(families.len());
1196        let page = families[start..end].to_vec();
1197        let mut out = json!({"families": page});
1198        if end < families.len() {
1199            out.as_object_mut()
1200                .unwrap()
1201                .insert("nextToken".into(), json!(end.to_string()));
1202        }
1203        Ok(AwsResponse::ok_json(out))
1204    }
1205}
1206
1207fn validate_family_name(family: &str) -> Result<(), AwsServiceError> {
1208    if family.is_empty() || family.len() > 255 {
1209        return Err(invalid_parameter(
1210            "Task definition family must be 1-255 characters",
1211        ));
1212    }
1213    let ok = family
1214        .chars()
1215        .all(|c| c.is_ascii_alphanumeric() || c == '_' || c == '-');
1216    if !ok {
1217        return Err(invalid_parameter(
1218            "Task definition family may only contain letters, numbers, hyphens, and underscores",
1219        ));
1220    }
1221    Ok(())
1222}
1223
1224// -------- operations: tagging --------
1225
1226impl EcsService {
1227    fn tag_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1228        let body = request.json_body();
1229        let arn = req_str(&body, "resourceArn")?.to_string();
1230        let tags = parse_tags(&body);
1231        let (account, resource_type, tail) = decode_ecs_arn(&arn)?;
1232        let mut accounts = self.state.write();
1233        let state = accounts.get_or_create(&account);
1234        match resource_type.as_str() {
1235            "cluster" => {
1236                let cluster = state
1237                    .clusters
1238                    .get_mut(&tail)
1239                    .ok_or_else(|| resource_not_found(&arn))?;
1240                merge_tags(&mut cluster.tags, tags);
1241            }
1242            "task-definition" => {
1243                let (family, rev) = parse_family_revision(&tail);
1244                let rev = rev.ok_or_else(|| {
1245                    invalid_parameter("task-definition ARN must include revision")
1246                })?;
1247                let td = state
1248                    .task_definitions
1249                    .get_mut(&family)
1250                    .and_then(|m| m.get_mut(&rev))
1251                    .ok_or_else(|| resource_not_found(&arn))?;
1252                merge_tags(&mut td.tags, tags);
1253            }
1254            "service" => {
1255                let key =
1256                    resolve_service_key(state, &tail).ok_or_else(|| resource_not_found(&arn))?;
1257                let svc = state.services.get_mut(&key).expect("resolved key exists");
1258                merge_tags(&mut svc.tags, tags);
1259            }
1260            "task" => {
1261                let task_id = tail.rsplit('/').next().unwrap_or(&tail).to_string();
1262                let task = state
1263                    .tasks
1264                    .get_mut(&task_id)
1265                    .ok_or_else(|| resource_not_found(&arn))?;
1266                merge_tags(&mut task.tags, tags);
1267            }
1268            "task-set" => {
1269                let ts = state
1270                    .task_sets
1271                    .get_mut(&tail)
1272                    .ok_or_else(|| resource_not_found(&arn))?;
1273                merge_tags(&mut ts.tags, tags);
1274            }
1275            "container-instance" => {
1276                let key = resolve_container_instance_key(state, &tail)
1277                    .ok_or_else(|| resource_not_found(&arn))?;
1278                let ci = state
1279                    .container_instances
1280                    .get_mut(&key)
1281                    .expect("resolved key exists");
1282                merge_tags(&mut ci.tags, tags);
1283            }
1284            "capacity-provider" => {
1285                let cp = state
1286                    .capacity_providers
1287                    .get_mut(&tail)
1288                    .ok_or_else(|| resource_not_found(&arn))?;
1289                merge_tags(&mut cp.tags, tags);
1290            }
1291            other => {
1292                return Err(invalid_parameter(format!(
1293                    "Unknown ECS resource type: {other}"
1294                )));
1295            }
1296        }
1297        Ok(AwsResponse::ok_json(json!({})))
1298    }
1299
1300    fn untag_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1301        let body = request.json_body();
1302        let arn = req_str(&body, "resourceArn")?.to_string();
1303        let keys: Vec<String> = body
1304            .get("tagKeys")
1305            .and_then(|v| v.as_array())
1306            .map(|arr| {
1307                arr.iter()
1308                    .filter_map(|v| v.as_str().map(String::from))
1309                    .collect()
1310            })
1311            .unwrap_or_default();
1312        let (account, resource_type, tail) = decode_ecs_arn(&arn)?;
1313        let mut accounts = self.state.write();
1314        let state = accounts.get_or_create(&account);
1315        match resource_type.as_str() {
1316            "cluster" => {
1317                let cluster = state
1318                    .clusters
1319                    .get_mut(&tail)
1320                    .ok_or_else(|| resource_not_found(&arn))?;
1321                cluster.tags.retain(|t| !keys.contains(&t.key));
1322            }
1323            "task-definition" => {
1324                let (family, rev) = parse_family_revision(&tail);
1325                let rev = rev.ok_or_else(|| {
1326                    invalid_parameter("task-definition ARN must include revision")
1327                })?;
1328                let td = state
1329                    .task_definitions
1330                    .get_mut(&family)
1331                    .and_then(|m| m.get_mut(&rev))
1332                    .ok_or_else(|| resource_not_found(&arn))?;
1333                td.tags.retain(|t| !keys.contains(&t.key));
1334            }
1335            "service" => {
1336                let key =
1337                    resolve_service_key(state, &tail).ok_or_else(|| resource_not_found(&arn))?;
1338                let svc = state.services.get_mut(&key).expect("resolved key exists");
1339                svc.tags.retain(|t| !keys.contains(&t.key));
1340            }
1341            "task" => {
1342                let task_id = tail.rsplit('/').next().unwrap_or(&tail).to_string();
1343                let task = state
1344                    .tasks
1345                    .get_mut(&task_id)
1346                    .ok_or_else(|| resource_not_found(&arn))?;
1347                task.tags.retain(|t| !keys.contains(&t.key));
1348            }
1349            "task-set" => {
1350                let ts = state
1351                    .task_sets
1352                    .get_mut(&tail)
1353                    .ok_or_else(|| resource_not_found(&arn))?;
1354                ts.tags.retain(|t| !keys.contains(&t.key));
1355            }
1356            "container-instance" => {
1357                let key = resolve_container_instance_key(state, &tail)
1358                    .ok_or_else(|| resource_not_found(&arn))?;
1359                let ci = state
1360                    .container_instances
1361                    .get_mut(&key)
1362                    .expect("resolved key exists");
1363                ci.tags.retain(|t| !keys.contains(&t.key));
1364            }
1365            "capacity-provider" => {
1366                let cp = state
1367                    .capacity_providers
1368                    .get_mut(&tail)
1369                    .ok_or_else(|| resource_not_found(&arn))?;
1370                cp.tags.retain(|t| !keys.contains(&t.key));
1371            }
1372            other => {
1373                return Err(invalid_parameter(format!(
1374                    "Unknown ECS resource type: {other}"
1375                )));
1376            }
1377        }
1378        Ok(AwsResponse::ok_json(json!({})))
1379    }
1380
1381    fn list_tags_for_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1382        let body = request.json_body();
1383        let arn = req_str(&body, "resourceArn")?.to_string();
1384        let (account, resource_type, tail) = decode_ecs_arn(&arn)?;
1385        let accounts = self.state.read();
1386        let state = accounts
1387            .get(&account)
1388            .ok_or_else(|| resource_not_found(&arn))?;
1389        let tags = match resource_type.as_str() {
1390            "cluster" => state
1391                .clusters
1392                .get(&tail)
1393                .map(|c| c.tags.clone())
1394                .ok_or_else(|| resource_not_found(&arn))?,
1395            "task-definition" => {
1396                let (family, rev) = parse_family_revision(&tail);
1397                let rev = rev.ok_or_else(|| {
1398                    invalid_parameter("task-definition ARN must include revision")
1399                })?;
1400                state
1401                    .task_definitions
1402                    .get(&family)
1403                    .and_then(|m| m.get(&rev))
1404                    .map(|td| td.tags.clone())
1405                    .ok_or_else(|| resource_not_found(&arn))?
1406            }
1407            "service" => {
1408                let key =
1409                    resolve_service_key(state, &tail).ok_or_else(|| resource_not_found(&arn))?;
1410                state
1411                    .services
1412                    .get(&key)
1413                    .map(|s| s.tags.clone())
1414                    .expect("resolved key exists")
1415            }
1416            "task" => {
1417                let task_id = tail.rsplit('/').next().unwrap_or(&tail).to_string();
1418                state
1419                    .tasks
1420                    .get(&task_id)
1421                    .map(|t| t.tags.clone())
1422                    .ok_or_else(|| resource_not_found(&arn))?
1423            }
1424            "task-set" => state
1425                .task_sets
1426                .get(&tail)
1427                .map(|t| t.tags.clone())
1428                .ok_or_else(|| resource_not_found(&arn))?,
1429            "container-instance" => {
1430                let key = resolve_container_instance_key(state, &tail)
1431                    .ok_or_else(|| resource_not_found(&arn))?;
1432                state
1433                    .container_instances
1434                    .get(&key)
1435                    .map(|c| c.tags.clone())
1436                    .expect("resolved key exists")
1437            }
1438            "capacity-provider" => state
1439                .capacity_providers
1440                .get(&tail)
1441                .map(|c| c.tags.clone())
1442                .ok_or_else(|| resource_not_found(&arn))?,
1443            other => {
1444                return Err(invalid_parameter(format!(
1445                    "Unknown ECS resource type: {other}"
1446                )));
1447            }
1448        };
1449        Ok(AwsResponse::ok_json(json!({"tags": tags_json(&tags)})))
1450    }
1451}
1452
1453fn resource_not_found(arn: &str) -> AwsServiceError {
1454    AwsServiceError::aws_error(
1455        StatusCode::BAD_REQUEST,
1456        "ClientException",
1457        format!("The referenced resource was not found: {arn}"),
1458    )
1459}
1460
1461// -------- operations: account settings --------
1462
1463impl EcsService {
1464    fn put_account_setting(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1465        let body = request.json_body();
1466        let name = req_str(&body, "name")?.to_string();
1467        let value = req_str(&body, "value")?.to_string();
1468        let principal_arn = opt_str(&body, "principalArn")
1469            .map(String::from)
1470            .or_else(|| request.principal.as_ref().map(|p| p.arn.clone()))
1471            .unwrap_or_else(|| format!("arn:aws:iam::{}:root", request.account_id));
1472        let account = request.account_id.clone();
1473        let mut accounts = self.state.write();
1474        let state = accounts.get_or_create(&account);
1475        state
1476            .principal_account_settings
1477            .entry(principal_arn.clone())
1478            .or_default()
1479            .insert(name.clone(), value.clone());
1480        Ok(AwsResponse::ok_json(json!({
1481            "setting": {
1482                "name": name,
1483                "value": value,
1484                "principalArn": principal_arn,
1485            }
1486        })))
1487    }
1488
1489    fn put_account_setting_default(
1490        &self,
1491        request: &AwsRequest,
1492    ) -> Result<AwsResponse, AwsServiceError> {
1493        let body = request.json_body();
1494        let name = req_str(&body, "name")?.to_string();
1495        let value = req_str(&body, "value")?.to_string();
1496        let account = request.account_id.clone();
1497        let mut accounts = self.state.write();
1498        let state = accounts.get_or_create(&account);
1499        state
1500            .account_setting_defaults
1501            .insert(name.clone(), value.clone());
1502        Ok(AwsResponse::ok_json(json!({
1503            "setting": {
1504                "name": name,
1505                "value": value,
1506                "principalArn": format!("arn:aws:iam::{}:root", state.account_id),
1507            }
1508        })))
1509    }
1510
1511    fn delete_account_setting(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1512        let body = request.json_body();
1513        let name = req_str(&body, "name")?.to_string();
1514        let principal_arn = opt_str(&body, "principalArn")
1515            .map(String::from)
1516            .or_else(|| request.principal.as_ref().map(|p| p.arn.clone()))
1517            .unwrap_or_else(|| format!("arn:aws:iam::{}:root", request.account_id));
1518        let account = request.account_id.clone();
1519        let mut accounts = self.state.write();
1520        let state = accounts.get_or_create(&account);
1521        let removed_value = state
1522            .principal_account_settings
1523            .get_mut(&principal_arn)
1524            .and_then(|m| m.remove(&name));
1525        Ok(AwsResponse::ok_json(json!({
1526            "setting": {
1527                "name": name,
1528                "value": removed_value.unwrap_or_default(),
1529                "principalArn": principal_arn,
1530            }
1531        })))
1532    }
1533
1534    fn list_account_settings(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1535        let body = request.json_body();
1536        let name_filter = opt_str(&body, "name");
1537        let value_filter = opt_str(&body, "value");
1538        let principal_filter = opt_str(&body, "principalArn");
1539        let effective_only = body
1540            .get("effectiveSettings")
1541            .and_then(|v| v.as_bool())
1542            .unwrap_or(false);
1543
1544        let account = request.account_id.clone();
1545        let accounts = self.state.read();
1546        let Some(state) = accounts.get(&account) else {
1547            return Ok(AwsResponse::ok_json(json!({"settings": []})));
1548        };
1549        let root_arn = format!("arn:aws:iam::{}:root", state.account_id);
1550        let mut settings: Vec<Value> = Vec::new();
1551
1552        if effective_only {
1553            // Merge principal overrides onto defaults, scoped to principal_filter
1554            // when supplied; otherwise use the caller's own principal.
1555            let principal = principal_filter
1556                .map(String::from)
1557                .or_else(|| request.principal.as_ref().map(|p| p.arn.clone()))
1558                .unwrap_or_else(|| root_arn.clone());
1559            let mut merged = state.account_setting_defaults.clone();
1560            if let Some(overrides) = state.principal_account_settings.get(&principal) {
1561                for (k, v) in overrides {
1562                    merged.insert(k.clone(), v.clone());
1563                }
1564            }
1565            for (k, v) in merged {
1566                if matches_filter(name_filter, &k) && matches_filter(value_filter, &v) {
1567                    settings.push(json!({
1568                        "name": k,
1569                        "value": v,
1570                        "principalArn": principal,
1571                    }));
1572                }
1573            }
1574        } else {
1575            // Raw listing: include defaults (under the root ARN) plus any
1576            // principal-specific settings.
1577            for (k, v) in &state.account_setting_defaults {
1578                if matches_filter(name_filter, k)
1579                    && matches_filter(value_filter, v)
1580                    && (principal_filter.is_none() || principal_filter == Some(root_arn.as_str()))
1581                {
1582                    settings.push(json!({
1583                        "name": k,
1584                        "value": v,
1585                        "principalArn": root_arn,
1586                    }));
1587                }
1588            }
1589            for (principal, entries) in &state.principal_account_settings {
1590                if principal_filter.is_some_and(|pf| pf != principal) {
1591                    continue;
1592                }
1593                for (k, v) in entries {
1594                    if matches_filter(name_filter, k) && matches_filter(value_filter, v) {
1595                        settings.push(json!({
1596                            "name": k,
1597                            "value": v,
1598                            "principalArn": principal,
1599                        }));
1600                    }
1601                }
1602            }
1603        }
1604
1605        Ok(AwsResponse::ok_json(json!({"settings": settings})))
1606    }
1607}
1608
/// Returns `true` when `filter` is absent or equals `value` exactly.
fn matches_filter(filter: Option<&str>, value: &str) -> bool {
    match filter {
        Some(wanted) => wanted == value,
        None => true,
    }
}
1612
1613// -------- operations: tasks --------
1614
1615impl EcsService {
1616    fn run_task(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1617        let body = request.json_body();
1618        let td_ref = req_str(&body, "taskDefinition")?;
1619        let cluster_ref = opt_str(&body, "cluster");
1620        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
1621        let launch_type = opt_str(&body, "launchType")
1622            .unwrap_or("FARGATE")
1623            .to_string();
1624        let count = body
1625            .get("count")
1626            .and_then(|v| v.as_i64())
1627            .filter(|n| (1..=10).contains(n))
1628            .unwrap_or(1) as usize;
1629        let group = opt_str(&body, "group").map(String::from);
1630        let started_by = opt_str(&body, "startedBy").map(String::from);
1631        let tags = parse_tags(&body);
1632
1633        // PassRole trust check on any role overrides supplied via the
1634        // overrides.taskRoleArn / overrides.executionRoleArn fields.
1635        // The base task definition was already checked at Register time,
1636        // but RunTask can override either role and AWS re-validates the
1637        // trust policy on every call.
1638        if let Some(overrides) = body.get("overrides") {
1639            if let Some(role_arn) = opt_str(overrides, "taskRoleArn") {
1640                self.check_pass_role(&request.account_id, role_arn)?;
1641            }
1642            if let Some(role_arn) = opt_str(overrides, "executionRoleArn") {
1643                self.check_pass_role(&request.account_id, role_arn)?;
1644            }
1645        }
1646
1647        let account = request.account_id.clone();
1648        let runtime = self.runtime.clone();
1649        let mut accounts = self.state.write();
1650        let state = accounts.get_or_create(&account);
1651        let cluster_arn = state
1652            .clusters
1653            .get(&cluster_name)
1654            .map(|c| c.cluster_arn.clone())
1655            .unwrap_or_else(|| state.cluster_arn(&cluster_name));
1656        let (_, family, rev) = resolve_task_definition_ref(td_ref)?;
1657        let revisions = state
1658            .task_definitions
1659            .get(&family)
1660            .ok_or_else(|| task_definition_not_found(td_ref))?;
1661        let td = match rev {
1662            Some(n) => revisions
1663                .get(&n)
1664                .ok_or_else(|| task_definition_not_found(td_ref))?,
1665            None => latest_active_revision(revisions)
1666                .ok_or_else(|| task_definition_not_found(td_ref))?,
1667        };
1668        if td.status != "ACTIVE" {
1669            return Err(client_exception(format!(
1670                "Task definition {} is not ACTIVE",
1671                td.task_definition_arn
1672            )));
1673        }
1674        let td_arn = td.task_definition_arn.clone();
1675        let td_family = td.family.clone();
1676        let td_revision = td.revision;
1677        let td_cpu = td.cpu.clone();
1678        let td_memory = td.memory.clone();
1679        let td_task_role = td.task_role_arn.clone();
1680        let td_exec_role = td.execution_role_arn.clone();
1681        let td_containers = td.container_definitions.clone();
1682
1683        let mut spawned_tasks: Vec<String> = Vec::new();
1684        let mut task_jsons: Vec<Value> = Vec::new();
1685        for _ in 0..count {
1686            let task_id = uuid::Uuid::new_v4().to_string().replace('-', "");
1687            let task_arn = state.task_arn(&cluster_name, &task_id);
1688            let containers: Vec<Container> = td_containers
1689                .iter()
1690                .map(|def| Container {
1691                    container_arn: format!(
1692                        "arn:aws:ecs:{}:{}:container/{}/{}/{}",
1693                        state.region,
1694                        state.account_id,
1695                        cluster_name,
1696                        task_id,
1697                        def.get("name").and_then(|v| v.as_str()).unwrap_or("app")
1698                    ),
1699                    name: def
1700                        .get("name")
1701                        .and_then(|v| v.as_str())
1702                        .unwrap_or("app")
1703                        .to_string(),
1704                    image: def
1705                        .get("image")
1706                        .and_then(|v| v.as_str())
1707                        .unwrap_or("")
1708                        .to_string(),
1709                    task_arn: task_arn.clone(),
1710                    last_status: "PENDING".into(),
1711                    exit_code: None,
1712                    reason: None,
1713                    runtime_id: None,
1714                    essential: def
1715                        .get("essential")
1716                        .and_then(|v| v.as_bool())
1717                        .unwrap_or(true),
1718                    cpu: def
1719                        .get("cpu")
1720                        .and_then(|v| v.as_i64())
1721                        .map(|n| n.to_string()),
1722                    memory: def
1723                        .get("memory")
1724                        .and_then(|v| v.as_i64())
1725                        .map(|n| n.to_string()),
1726                    memory_reservation: def
1727                        .get("memoryReservation")
1728                        .and_then(|v| v.as_i64())
1729                        .map(|n| n.to_string()),
1730                    network_bindings: Vec::new(),
1731                    network_interfaces: Vec::new(),
1732                    health_status: Some("UNKNOWN".to_string()),
1733                    managed_agents: None,
1734                })
1735                .collect();
1736            let awslogs = td_containers.iter().find_map(|def| {
1737                let name = def.get("name").and_then(|v| v.as_str())?.to_string();
1738                let log_cfg = def.get("logConfiguration")?;
1739                if log_cfg.get("logDriver").and_then(|v| v.as_str()) != Some("awslogs") {
1740                    return None;
1741                }
1742                let opts = log_cfg.get("options").and_then(|v| v.as_object())?;
1743                Some(AwsLogsConfig {
1744                    group: opts.get("awslogs-group").and_then(|v| v.as_str())?.into(),
1745                    stream_prefix: opts
1746                        .get("awslogs-stream-prefix")
1747                        .and_then(|v| v.as_str())
1748                        .map(String::from),
1749                    region: opts
1750                        .get("awslogs-region")
1751                        .and_then(|v| v.as_str())
1752                        .unwrap_or(&state.region)
1753                        .to_string(),
1754                    container_name: name,
1755                })
1756            });
1757            let task = Task {
1758                task_arn: task_arn.clone(),
1759                task_id: task_id.clone(),
1760                cluster_arn: cluster_arn.clone(),
1761                cluster_name: cluster_name.clone(),
1762                task_definition_arn: td_arn.clone(),
1763                family: td_family.clone(),
1764                revision: td_revision,
1765                last_status: "PROVISIONING".into(),
1766                desired_status: "RUNNING".into(),
1767                launch_type: launch_type.clone(),
1768                platform_version: Some("1.4.0".into()),
1769                cpu: body
1770                    .get("overrides")
1771                    .and_then(|v| v.get("cpu"))
1772                    .and_then(|v| v.as_str())
1773                    .map(String::from)
1774                    .or_else(|| td_cpu.clone()),
1775                memory: body
1776                    .get("overrides")
1777                    .and_then(|v| v.get("memory"))
1778                    .and_then(|v| v.as_str())
1779                    .map(String::from)
1780                    .or_else(|| td_memory.clone()),
1781                containers,
1782                overrides: body.get("overrides").cloned().unwrap_or_else(|| json!({})),
1783                started_by: started_by.clone(),
1784                group: group.clone(),
1785                connectivity: "CONNECTING".into(),
1786                stop_code: None,
1787                stopped_reason: None,
1788                created_at: Utc::now(),
1789                started_at: None,
1790                stopping_at: None,
1791                stopped_at: None,
1792                pull_started_at: None,
1793                pull_stopped_at: None,
1794                connectivity_at: None,
1795                started_by_ref_id: None,
1796                execution_role_arn: td_exec_role.clone(),
1797                task_role_arn: td_task_role.clone(),
1798                tags: tags.clone(),
1799                awslogs,
1800                captured_logs: String::new(),
1801                protection: None,
1802            };
1803            state.tasks.insert(task_id.clone(), task.clone());
1804            if let Some(cluster) = state.clusters.get_mut(&cluster_name) {
1805                cluster.pending_tasks_count += 1;
1806            }
1807            // Snapshot-in-progress: transition to PENDING synchronously so
1808            // callers that immediately DescribeTasks see movement. RUNNING /
1809            // STOPPED come later from the background runtime task.
1810            if let Some(t) = state.tasks.get_mut(&task_id) {
1811                t.last_status = "PENDING".into();
1812            }
1813            task_jsons.push(task_to_json(&task));
1814            spawned_tasks.push(task_id.clone());
1815        }
1816        drop(accounts);
1817
1818        // Launch container execution outside the state lock.
1819        if let Some(rt) = runtime {
1820            for id in &spawned_tasks {
1821                rt.clone()
1822                    .run_task(self.state.clone(), id.clone(), account.clone());
1823            }
1824        } else {
1825            // No runtime available — fail fast so the task doesn't stay
1826            // PENDING forever. We incremented pending_tasks_count above;
1827            // decrement it here so the cluster counter doesn't drift and
1828            // block later DeleteCluster calls.
1829            let mut accounts = self.state.write();
1830            if let Some(state) = accounts.get_mut(&account) {
1831                let mut cluster_drains: Vec<String> = Vec::new();
1832                for id in &spawned_tasks {
1833                    if let Some(t) = state.tasks.get_mut(id) {
1834                        t.last_status = "STOPPED".into();
1835                        t.desired_status = "STOPPED".into();
1836                        t.stop_code = Some("TaskFailedToStart".into());
1837                        t.stopped_reason = Some(
1838                            "No container runtime available (docker/podman not installed)".into(),
1839                        );
1840                        t.stopped_at = Some(Utc::now());
1841                        for c in t.containers.iter_mut() {
1842                            c.last_status = "STOPPED".into();
1843                        }
1844                        cluster_drains.push(t.cluster_name.clone());
1845                    }
1846                }
1847                for name in cluster_drains {
1848                    if let Some(cluster) = state.clusters.get_mut(&name) {
1849                        if cluster.pending_tasks_count > 0 {
1850                            cluster.pending_tasks_count -= 1;
1851                        }
1852                    }
1853                }
1854            }
1855        }
1856
1857        Ok(AwsResponse::ok_json(json!({
1858            "tasks": task_jsons,
1859            "failures": [],
1860        })))
1861    }
1862
1863    fn start_task(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1864        // StartTask targets explicit container instances. Our ECS emulator
1865        // has no concept of registered container instances yet (Batch 4);
1866        // fall through to the same semantics as RunTask so the API is
1867        // usable while the container-instance surface is pending.
1868        self.run_task(request)
1869    }
1870
1871    async fn stop_task(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1872        let body = request.json_body();
1873        let task_ref = req_str(&body, "task")?;
1874        let reason = opt_str(&body, "reason")
1875            .unwrap_or("UserInitiated")
1876            .to_string();
1877        let cluster_ref = opt_str(&body, "cluster");
1878        let _cluster_name = EcsState::resolve_cluster_name(cluster_ref);
1879
1880        let (task_id, account, task_snapshot) = {
1881            let account = request.account_id.clone();
1882            let mut accounts = self.state.write();
1883            let state = accounts
1884                .get_mut(&account)
1885                .ok_or_else(|| task_not_found(task_ref))?;
1886            let task_id = resolve_task_id(state, task_ref)?;
1887            let task = state
1888                .tasks
1889                .get_mut(&task_id)
1890                .ok_or_else(|| task_not_found(task_ref))?;
1891            task.desired_status = "STOPPED".into();
1892            task.stopping_at = Some(Utc::now());
1893            task.stopped_reason = Some(reason.clone());
1894            task.stop_code = Some("UserInitiated".into());
1895            (task_id, account, task.clone())
1896        };
1897        if let Some(rt) = &self.runtime {
1898            rt.stop_task(&task_id, &reason).await;
1899        }
1900        let _ = account;
1901        Ok(AwsResponse::ok_json(json!({
1902            "task": task_to_json(&task_snapshot),
1903        })))
1904    }
1905
1906    fn describe_tasks(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1907        let body = request.json_body();
1908        let refs: Vec<String> = body
1909            .get("tasks")
1910            .and_then(|v| v.as_array())
1911            .map(|arr| {
1912                arr.iter()
1913                    .filter_map(|v| v.as_str().map(String::from))
1914                    .collect()
1915            })
1916            .unwrap_or_default();
1917        let include_tags = body
1918            .get("include")
1919            .and_then(|v| v.as_array())
1920            .map(|arr| arr.iter().any(|v| v.as_str() == Some("TAGS")))
1921            .unwrap_or(false);
1922
1923        let account = request.account_id.clone();
1924        let accounts = self.state.read();
1925        let Some(state) = accounts.get(&account) else {
1926            return Ok(AwsResponse::ok_json(json!({
1927                "tasks": [],
1928                "failures": refs.iter().map(|r| json!({"arn": r, "reason": "MISSING"})).collect::<Vec<_>>(),
1929            })));
1930        };
1931        let mut found = Vec::new();
1932        let mut failures = Vec::new();
1933        for input in &refs {
1934            let task_id = task_id_from_ref(input);
1935            match state.tasks.get(&task_id) {
1936                Some(t) => {
1937                    let mut v = task_to_json(t);
1938                    if include_tags {
1939                        v.as_object_mut()
1940                            .unwrap()
1941                            .insert("tags".into(), tags_json(&t.tags));
1942                    }
1943                    found.push(v);
1944                }
1945                None => {
1946                    failures.push(json!({
1947                        "arn": input,
1948                        "reason": "MISSING",
1949                    }));
1950                }
1951            }
1952        }
1953        Ok(AwsResponse::ok_json(json!({
1954            "tasks": found,
1955            "failures": failures,
1956        })))
1957    }
1958
1959    fn list_tasks(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1960        let body = request.json_body();
1961        let cluster_ref = opt_str(&body, "cluster");
1962        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
1963        let family = opt_str(&body, "family");
1964        let status_filter = opt_str(&body, "desiredStatus");
1965        let started_by = opt_str(&body, "startedBy");
1966        let max_results = body
1967            .get("maxResults")
1968            .and_then(|v| v.as_i64())
1969            .filter(|n| (1..=100).contains(n))
1970            .map(|n| n as usize)
1971            .unwrap_or(100);
1972        let next_token = opt_str(&body, "nextToken").unwrap_or("");
1973
1974        let account = request.account_id.clone();
1975        let accounts = self.state.read();
1976        let mut arns: Vec<String> = match accounts.get(&account) {
1977            Some(state) => state
1978                .tasks
1979                .values()
1980                .filter(|t| t.cluster_name == cluster_name)
1981                .filter(|t| family.is_none_or(|f| t.family == f))
1982                .filter(|t| status_filter.is_none_or(|s| t.desired_status == s))
1983                .filter(|t| started_by.is_none_or(|s| t.started_by.as_deref() == Some(s)))
1984                .map(|t| t.task_arn.clone())
1985                .collect(),
1986            None => Vec::new(),
1987        };
1988        arns.sort();
1989        let start = next_token.parse::<usize>().unwrap_or(0).min(arns.len());
1990        let end = (start + max_results).min(arns.len());
1991        let page = arns[start..end].to_vec();
1992        let mut out = json!({"taskArns": page});
1993        if end < arns.len() {
1994            out.as_object_mut()
1995                .unwrap()
1996                .insert("nextToken".into(), json!(end.to_string()));
1997        }
1998        Ok(AwsResponse::ok_json(out))
1999    }
2000}
2001
2002fn task_not_found(task_ref: &str) -> AwsServiceError {
2003    AwsServiceError::aws_error(
2004        StatusCode::BAD_REQUEST,
2005        "ClientException",
2006        format!("Task not found: {task_ref}"),
2007    )
2008}
2009
/// Recovers the task ID from a task reference.
///
/// Accepts a bare ID, a `cluster/id` short form, or a full task ARN, and
/// returns whatever follows the last `/` (the whole input when it contains
/// no `/`). The result is not validated, so a reference ending in `/`
/// yields an empty string.
fn task_id_from_ref(input: &str) -> String {
    // Splitting from the right always produces a final segment, so no
    // separate fallback for the "no slash" case is needed beyond `map_or`.
    input
        .rsplit_once('/')
        .map_or(input, |(_, id)| id)
        .to_string()
}
2017
2018fn resolve_task_id(state: &EcsState, task_ref: &str) -> Result<String, AwsServiceError> {
2019    let id = task_id_from_ref(task_ref);
2020    if state.tasks.contains_key(&id) {
2021        Ok(id)
2022    } else {
2023        Err(task_not_found(task_ref))
2024    }
2025}
2026
2027fn task_to_json(task: &Task) -> Value {
2028    let mut map = serde_json::Map::new();
2029    map.insert("taskArn".into(), json!(task.task_arn));
2030    map.insert("clusterArn".into(), json!(task.cluster_arn));
2031    map.insert("taskDefinitionArn".into(), json!(task.task_definition_arn));
2032    map.insert("lastStatus".into(), json!(task.last_status));
2033    map.insert("desiredStatus".into(), json!(task.desired_status));
2034    map.insert("launchType".into(), json!(task.launch_type));
2035    if let Some(ref v) = task.platform_version {
2036        map.insert("platformVersion".into(), json!(v));
2037    }
2038    if let Some(ref v) = task.cpu {
2039        map.insert("cpu".into(), json!(v));
2040    }
2041    if let Some(ref v) = task.memory {
2042        map.insert("memory".into(), json!(v));
2043    }
2044    map.insert(
2045        "containers".into(),
2046        Value::Array(task.containers.iter().map(container_to_json).collect()),
2047    );
2048    map.insert("overrides".into(), task.overrides.clone());
2049    if let Some(ref v) = task.started_by {
2050        map.insert("startedBy".into(), json!(v));
2051    }
2052    if let Some(ref v) = task.group {
2053        map.insert("group".into(), json!(v));
2054    }
2055    map.insert("connectivity".into(), json!(task.connectivity));
2056    if let Some(ref v) = task.stop_code {
2057        map.insert("stopCode".into(), json!(v));
2058    }
2059    if let Some(ref v) = task.stopped_reason {
2060        map.insert("stoppedReason".into(), json!(v));
2061    }
2062    if let Some(ref v) = task.task_role_arn {
2063        map.insert("taskRoleArn".into(), json!(v));
2064    }
2065    if let Some(ref v) = task.execution_role_arn {
2066        map.insert("executionRoleArn".into(), json!(v));
2067    }
2068    map.insert("createdAt".into(), json!(task.created_at.timestamp()));
2069    if let Some(ts) = task.started_at {
2070        map.insert("startedAt".into(), json!(ts.timestamp()));
2071    }
2072    if let Some(ts) = task.stopping_at {
2073        map.insert("stoppingAt".into(), json!(ts.timestamp()));
2074    }
2075    if let Some(ts) = task.stopped_at {
2076        map.insert("stoppedAt".into(), json!(ts.timestamp()));
2077    }
2078    if let Some(ts) = task.pull_started_at {
2079        map.insert("pullStartedAt".into(), json!(ts.timestamp()));
2080    }
2081    if let Some(ts) = task.pull_stopped_at {
2082        map.insert("pullStoppedAt".into(), json!(ts.timestamp()));
2083    }
2084    if let Some(ts) = task.connectivity_at {
2085        map.insert("connectivityAt".into(), json!(ts.timestamp()));
2086    }
2087    Value::Object(map)
2088}
2089
2090fn container_to_json(container: &Container) -> Value {
2091    let mut map = serde_json::Map::new();
2092    map.insert("containerArn".into(), json!(container.container_arn));
2093    map.insert("taskArn".into(), json!(container.task_arn));
2094    map.insert("name".into(), json!(container.name));
2095    map.insert("image".into(), json!(container.image));
2096    map.insert("lastStatus".into(), json!(container.last_status));
2097    map.insert("essential".into(), json!(container.essential));
2098    if let Some(code) = container.exit_code {
2099        map.insert("exitCode".into(), json!(code));
2100    }
2101    if let Some(ref r) = container.reason {
2102        map.insert("reason".into(), json!(r));
2103    }
2104    if let Some(ref id) = container.runtime_id {
2105        map.insert("runtimeId".into(), json!(id));
2106    }
2107    if let Some(ref v) = container.cpu {
2108        map.insert("cpu".into(), json!(v));
2109    }
2110    if let Some(ref v) = container.memory {
2111        map.insert("memory".into(), json!(v));
2112    }
2113    if let Some(ref v) = container.memory_reservation {
2114        map.insert("memoryReservation".into(), json!(v));
2115    }
2116    map.insert("networkBindings".into(), json!(container.network_bindings));
2117    map.insert(
2118        "networkInterfaces".into(),
2119        json!(container.network_interfaces),
2120    );
2121    if let Some(ref v) = container.health_status {
2122        map.insert("healthStatus".into(), json!(v));
2123    }
2124    Value::Object(map)
2125}
2126
2127// -------- operations: services --------
2128
2129impl EcsService {
2130    fn create_service(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2131        let body = request.json_body();
2132        let service_name = req_str(&body, "serviceName")?.to_string();
2133        validate_service_name(&service_name)?;
2134        let td_ref = req_str(&body, "taskDefinition")?;
2135        let cluster_ref = opt_str(&body, "cluster");
2136        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
2137        let desired_count = body
2138            .get("desiredCount")
2139            .and_then(|v| v.as_i64())
2140            .filter(|n| *n >= 0)
2141            .unwrap_or(1) as i32;
2142        let launch_type = opt_str(&body, "launchType")
2143            .unwrap_or("FARGATE")
2144            .to_string();
2145        let scheduling = opt_str(&body, "schedulingStrategy")
2146            .unwrap_or("REPLICA")
2147            .to_string();
2148        let deployment_controller = body
2149            .get("deploymentController")
2150            .and_then(|v| v.get("type"))
2151            .and_then(|v| v.as_str())
2152            .unwrap_or("ECS")
2153            .to_string();
2154        let deployment_config = body.get("deploymentConfiguration");
2155        let min_healthy = deployment_config
2156            .and_then(|d| d.get("minimumHealthyPercent"))
2157            .and_then(|v| v.as_i64())
2158            .map(|n| n as i32);
2159        let max_percent = deployment_config
2160            .and_then(|d| d.get("maximumPercent"))
2161            .and_then(|v| v.as_i64())
2162            .map(|n| n as i32);
2163        let circuit = deployment_config.and_then(|d| d.get("deploymentCircuitBreaker"));
2164        let circuit_breaker = circuit.map(|c| CircuitBreakerConfig {
2165            enable: c.get("enable").and_then(|v| v.as_bool()).unwrap_or(false),
2166            rollback: c.get("rollback").and_then(|v| v.as_bool()).unwrap_or(false),
2167        });
2168        let tags = parse_tags(&body);
2169        let role_arn = opt_str(&body, "role").map(String::from);
2170        let load_balancers: Vec<Value> = body
2171            .get("loadBalancers")
2172            .and_then(|v| v.as_array())
2173            .cloned()
2174            .unwrap_or_default();
2175        let service_registries: Vec<Value> = body
2176            .get("serviceRegistries")
2177            .and_then(|v| v.as_array())
2178            .cloned()
2179            .unwrap_or_default();
2180        let placement_constraints: Vec<Value> = body
2181            .get("placementConstraints")
2182            .and_then(|v| v.as_array())
2183            .cloned()
2184            .unwrap_or_default();
2185        let placement_strategy: Vec<Value> = body
2186            .get("placementStrategy")
2187            .and_then(|v| v.as_array())
2188            .cloned()
2189            .unwrap_or_default();
2190        let network_configuration = body.get("networkConfiguration").cloned();
2191
2192        let runtime = self.runtime.clone();
2193        let account = request.account_id.clone();
2194        let principal_arn = request
2195            .principal
2196            .as_ref()
2197            .map(|p| p.arn.clone())
2198            .unwrap_or_else(|| format!("arn:aws:iam::{}:root", request.account_id));
2199
2200        let (service_json, spawn_task_ids) = {
2201            let mut accounts = self.state.write();
2202            let state = accounts.get_or_create(&account);
2203            // Resolve task definition to get family/revision.
2204            let (_, family, rev) = resolve_task_definition_ref(td_ref)?;
2205            let revisions = state
2206                .task_definitions
2207                .get(&family)
2208                .ok_or_else(|| task_definition_not_found(td_ref))?;
2209            let td = match rev {
2210                Some(n) => revisions
2211                    .get(&n)
2212                    .ok_or_else(|| task_definition_not_found(td_ref))?,
2213                None => latest_active_revision(revisions)
2214                    .ok_or_else(|| task_definition_not_found(td_ref))?,
2215            };
2216            let td_arn = td.task_definition_arn.clone();
2217            let td_family = td.family.clone();
2218            let td_revision = td.revision;
2219            let cluster_arn = state
2220                .clusters
2221                .get(&cluster_name)
2222                .map(|c| c.cluster_arn.clone())
2223                .unwrap_or_else(|| state.cluster_arn(&cluster_name));
2224            let service_arn = state.service_arn(&cluster_name, &service_name);
2225            let key = EcsState::service_key(&cluster_name, &service_name);
2226            if let Some(existing) = state.services.get(&key) {
2227                if existing.status != "INACTIVE" {
2228                    return Err(service_already_exists(&service_name));
2229                }
2230            }
2231            let deployment = Deployment {
2232                deployment_id: format!(
2233                    "ecs-svc/{}",
2234                    uuid::Uuid::new_v4().as_u128() & 0xffff_ffff_ffff_ffff
2235                ),
2236                status: "PRIMARY".into(),
2237                task_definition_arn: td_arn.clone(),
2238                desired_count,
2239                pending_count: 0,
2240                running_count: 0,
2241                failed_tasks: 0,
2242                created_at: Utc::now(),
2243                updated_at: Utc::now(),
2244                launch_type: launch_type.clone(),
2245                rollout_state: "IN_PROGRESS".into(),
2246                rollout_state_reason: Some("ECS deployment in progress.".into()),
2247            };
2248            let service = Service {
2249                service_name: service_name.clone(),
2250                service_arn: service_arn.clone(),
2251                cluster_name: cluster_name.clone(),
2252                cluster_arn: cluster_arn.clone(),
2253                task_definition_arn: td_arn,
2254                family: td_family,
2255                revision: td_revision,
2256                desired_count,
2257                running_count: 0,
2258                pending_count: 0,
2259                launch_type: launch_type.clone(),
2260                status: "ACTIVE".into(),
2261                scheduling_strategy: scheduling,
2262                deployment_controller,
2263                minimum_healthy_percent: min_healthy,
2264                maximum_percent: max_percent,
2265                circuit_breaker,
2266                deployments: vec![deployment],
2267                load_balancers,
2268                service_registries,
2269                placement_constraints,
2270                placement_strategy,
2271                network_configuration,
2272                tags: tags.clone(),
2273                created_at: Utc::now(),
2274                created_by: Some(principal_arn.clone()),
2275                role_arn,
2276            };
2277            state.services.insert(key.clone(), service.clone());
2278            if let Some(cluster) = state.clusters.get_mut(&cluster_name) {
2279                cluster.active_services_count += 1;
2280            }
2281            state.push_event(crate::state::LifecycleEvent {
2282                at: Utc::now(),
2283                event_type: "ServiceCreated".into(),
2284                task_arn: None,
2285                cluster_arn: Some(cluster_arn),
2286                last_status: Some("ACTIVE".into()),
2287                detail: json!({"serviceArn": service_arn, "desiredCount": desired_count}),
2288            });
2289            let ids =
2290                spawn_service_tasks(state, &service, desired_count, &principal_arn, &launch_type);
2291            (service_to_json(state.services.get(&key).unwrap()), ids)
2292        };
2293
2294        if let Some(rt) = runtime {
2295            for id in spawn_task_ids {
2296                rt.clone().run_task(self.state.clone(), id, account.clone());
2297            }
2298        }
2299
2300        Ok(AwsResponse::ok_json(json!({ "service": service_json })))
2301    }
2302
2303    fn update_service(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2304        let body = request.json_body();
2305        let service_ref = req_str(&body, "service")?;
2306        let service_name = service_name_from_ref(service_ref);
2307        let cluster_ref = opt_str(&body, "cluster");
2308        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
2309        let new_desired = body.get("desiredCount").and_then(|v| v.as_i64());
2310        let new_td_ref = opt_str(&body, "taskDefinition");
2311        let account = request.account_id.clone();
2312        let principal_arn = request
2313            .principal
2314            .as_ref()
2315            .map(|p| p.arn.clone())
2316            .unwrap_or_else(|| format!("arn:aws:iam::{}:root", request.account_id));
2317        let runtime = self.runtime.clone();
2318
2319        let (service_json, spawn_ids, stop_ids) = {
2320            let mut accounts = self.state.write();
2321            let state = accounts
2322                .get_mut(&account)
2323                .ok_or_else(|| service_not_found(&service_name))?;
2324            let key = EcsState::service_key(&cluster_name, &service_name);
2325            if !state.services.contains_key(&key) {
2326                return Err(service_not_found(&service_name));
2327            }
2328
2329            // Resolve new task definition (may stay on current one).
2330            let (new_td_arn, new_family, new_revision) = if let Some(td_ref) = new_td_ref {
2331                let (_, family, rev) = resolve_task_definition_ref(td_ref)?;
2332                let revisions = state
2333                    .task_definitions
2334                    .get(&family)
2335                    .ok_or_else(|| task_definition_not_found(td_ref))?;
2336                let td = match rev {
2337                    Some(n) => revisions
2338                        .get(&n)
2339                        .ok_or_else(|| task_definition_not_found(td_ref))?,
2340                    None => latest_active_revision(revisions)
2341                        .ok_or_else(|| task_definition_not_found(td_ref))?,
2342                };
2343                (
2344                    Some(td.task_definition_arn.clone()),
2345                    td.family.clone(),
2346                    td.revision,
2347                )
2348            } else {
2349                let svc = state.services.get(&key).unwrap();
2350                (None, svc.family.clone(), svc.revision)
2351            };
2352
2353            let service_cluster_arn;
2354            let launch_type_clone;
2355            let effective_desired;
2356            let old_desired;
2357            let mut old_deployments_drained: Vec<String> = Vec::new();
2358            let mut new_deployment_triggered = false;
2359
2360            {
2361                let svc = state.services.get_mut(&key).unwrap();
2362                old_desired = svc.desired_count;
2363                service_cluster_arn = svc.cluster_arn.clone();
2364                launch_type_clone = svc.launch_type.clone();
2365
2366                if let Some(n) = new_desired {
2367                    let n = n.max(0) as i32;
2368                    svc.desired_count = n;
2369                    if let Some(d) = svc.deployments.iter_mut().find(|d| d.status == "PRIMARY") {
2370                        d.desired_count = n;
2371                        d.updated_at = Utc::now();
2372                    }
2373                }
2374
2375                if let Some(arn) = new_td_arn.clone() {
2376                    // Roll a new PRIMARY deployment; mark the previous one ACTIVE
2377                    // so it's eligible for drain once the new deployment ramps.
2378                    for d in svc.deployments.iter_mut() {
2379                        if d.status == "PRIMARY" {
2380                            d.status = "ACTIVE".into();
2381                            old_deployments_drained.push(d.deployment_id.clone());
2382                        }
2383                    }
2384                    svc.deployments.insert(
2385                        0,
2386                        Deployment {
2387                            deployment_id: format!(
2388                                "ecs-svc/{}",
2389                                uuid::Uuid::new_v4().as_u128() & 0xffff_ffff_ffff_ffff
2390                            ),
2391                            status: "PRIMARY".into(),
2392                            task_definition_arn: arn.clone(),
2393                            desired_count: svc.desired_count,
2394                            pending_count: 0,
2395                            running_count: 0,
2396                            failed_tasks: 0,
2397                            created_at: Utc::now(),
2398                            updated_at: Utc::now(),
2399                            launch_type: svc.launch_type.clone(),
2400                            rollout_state: "IN_PROGRESS".into(),
2401                            rollout_state_reason: Some("ECS deployment in progress.".into()),
2402                        },
2403                    );
2404                    svc.task_definition_arn = arn;
2405                    svc.family = new_family;
2406                    svc.revision = new_revision;
2407                    new_deployment_triggered = true;
2408                }
2409
2410                effective_desired = svc.desired_count;
2411            }
2412
2413            // Compute spawn + stop plan.
2414            let mut spawn: Vec<String> = Vec::new();
2415            let mut stop: Vec<String> = Vec::new();
2416
2417            // Tasks belonging to this service (by startedBy convention):
2418            let service_tag = format!("ecs-svc/{}", service_name);
2419            let current_tasks: Vec<(String, String)> = state
2420                .tasks
2421                .iter()
2422                .filter(|(_, t)| {
2423                    t.started_by.as_deref() == Some(service_tag.as_str())
2424                        && t.cluster_name == cluster_name
2425                        && t.last_status != "STOPPED"
2426                })
2427                .map(|(id, t)| (id.clone(), t.task_definition_arn.clone()))
2428                .collect();
2429
2430            let current_count = current_tasks.len() as i32;
2431            if effective_desired > current_count {
2432                let add = (effective_desired - current_count) as usize;
2433                let svc_snapshot = state.services.get(&key).unwrap().clone();
2434                let mut new_ids = spawn_service_tasks(
2435                    state,
2436                    &svc_snapshot,
2437                    add as i32,
2438                    &principal_arn,
2439                    &launch_type_clone,
2440                );
2441                spawn.append(&mut new_ids);
2442            } else if effective_desired < current_count {
2443                let remove = (current_count - effective_desired) as usize;
2444                for (id, _) in current_tasks.iter().take(remove) {
2445                    stop.push(id.clone());
2446                }
2447            }
2448
2449            // If a new deployment was triggered, also stop tasks still on
2450            // the old task definition so the new deployment can ramp up,
2451            // then spawn replacements — but only enough to hit the
2452            // effective desired count (not `stop.len()`, which conflates
2453            // scale-down drain with TD-drain and would over-spawn).
2454            if new_deployment_triggered {
2455                let new_td_arn_match = state
2456                    .services
2457                    .get(&key)
2458                    .unwrap()
2459                    .task_definition_arn
2460                    .clone();
2461                // Tasks already on the new task definition that we're NOT
2462                // stopping (scale-down may have picked the first N; those
2463                // are skipped here via `stop.contains(id)`).
2464                let kept_on_new_td: i32 = current_tasks
2465                    .iter()
2466                    .filter(|(id, t_arn)| *t_arn == new_td_arn_match && !stop.contains(id))
2467                    .count() as i32;
2468                for (id, t_arn) in &current_tasks {
2469                    if *t_arn != new_td_arn_match && !stop.contains(id) {
2470                        stop.push(id.clone());
2471                    }
2472                }
2473                let already_spawned = spawn.len() as i32;
2474                let need = (effective_desired - kept_on_new_td - already_spawned).max(0);
2475                if need > 0 {
2476                    let svc_snapshot = state.services.get(&key).unwrap().clone();
2477                    let mut more = spawn_service_tasks(
2478                        state,
2479                        &svc_snapshot,
2480                        need,
2481                        &principal_arn,
2482                        &launch_type_clone,
2483                    );
2484                    spawn.append(&mut more);
2485                }
2486            }
2487
2488            state.push_event(crate::state::LifecycleEvent {
2489                at: Utc::now(),
2490                event_type: "ServiceUpdated".into(),
2491                task_arn: None,
2492                cluster_arn: Some(service_cluster_arn),
2493                last_status: Some("ACTIVE".into()),
2494                detail: json!({
2495                    "serviceArn": state.services.get(&key).unwrap().service_arn,
2496                    "desiredCount": effective_desired,
2497                    "previousDesiredCount": old_desired,
2498                    "newDeployment": new_deployment_triggered,
2499                    "drainedDeployments": old_deployments_drained,
2500                }),
2501            });
2502
2503            let svc = state.services.get(&key).unwrap();
2504            (service_to_json(svc), spawn, stop)
2505        };
2506
2507        if let Some(rt) = runtime {
2508            for id in spawn_ids {
2509                rt.clone().run_task(self.state.clone(), id, account.clone());
2510            }
2511            for id in stop_ids {
2512                let rt2 = rt.clone();
2513                let id_clone = id.clone();
2514                tokio::spawn(async move {
2515                    rt2.stop_task(&id_clone, "ECS service scale-down").await;
2516                });
2517                // Flip the task's desired status synchronously so DescribeTasks reflects intent.
2518                let mut accounts = self.state.write();
2519                if let Some(state) = accounts.get_mut(&account) {
2520                    if let Some(task) = state.tasks.get_mut(&id) {
2521                        task.desired_status = "STOPPED".into();
2522                        task.stopping_at = Some(Utc::now());
2523                    }
2524                }
2525            }
2526        }
2527
2528        Ok(AwsResponse::ok_json(json!({ "service": service_json })))
2529    }
2530
2531    async fn delete_service(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2532        let body = request.json_body();
2533        let service_ref = req_str(&body, "service")?;
2534        let service_name = service_name_from_ref(service_ref);
2535        let cluster_ref = opt_str(&body, "cluster");
2536        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
2537        let force = body.get("force").and_then(|v| v.as_bool()).unwrap_or(false);
2538
2539        let (snapshot, task_ids_to_stop) = {
2540            let mut accounts = self.state.write();
2541            let state = accounts
2542                .get_mut(&request.account_id)
2543                .ok_or_else(|| service_not_found(&service_name))?;
2544            let key = EcsState::service_key(&cluster_name, &service_name);
2545            let svc = state
2546                .services
2547                .get_mut(&key)
2548                .ok_or_else(|| service_not_found(&service_name))?;
2549            if !force && svc.desired_count > 0 {
2550                return Err(client_exception(
2551                    "The service cannot be stopped while it is scaled above 0. \
2552                     Either set desiredCount to 0 first, or pass force=true.",
2553                ));
2554            }
2555            svc.desired_count = 0;
2556            svc.status = "DRAINING".into();
2557            let service_tag = format!("ecs-svc/{}", service_name);
2558            let stop_ids: Vec<String> = state
2559                .tasks
2560                .iter()
2561                .filter(|(_, t)| {
2562                    t.started_by.as_deref() == Some(service_tag.as_str())
2563                        && t.cluster_name == cluster_name
2564                        && t.last_status != "STOPPED"
2565                })
2566                .map(|(id, _)| id.clone())
2567                .collect();
2568            if let Some(cluster) = state.clusters.get_mut(&cluster_name) {
2569                if cluster.active_services_count > 0 {
2570                    cluster.active_services_count -= 1;
2571                }
2572            }
2573            let svc_snapshot = state.services.get(&key).unwrap().clone();
2574            state.services.remove(&key);
2575            state.push_event(crate::state::LifecycleEvent {
2576                at: Utc::now(),
2577                event_type: "ServiceDeleted".into(),
2578                task_arn: None,
2579                cluster_arn: Some(svc_snapshot.cluster_arn.clone()),
2580                last_status: Some("DRAINING".into()),
2581                detail: json!({"serviceArn": svc_snapshot.service_arn}),
2582            });
2583            (svc_snapshot, stop_ids)
2584        };
2585
2586        if let Some(rt) = &self.runtime {
2587            for id in &task_ids_to_stop {
2588                rt.stop_task(id, "ECS service deletion").await;
2589                let mut accounts = self.state.write();
2590                if let Some(state) = accounts.get_mut(&request.account_id) {
2591                    if let Some(task) = state.tasks.get_mut(id) {
2592                        task.desired_status = "STOPPED".into();
2593                        task.stopping_at = Some(Utc::now());
2594                    }
2595                }
2596            }
2597        }
2598
2599        Ok(AwsResponse::ok_json(
2600            json!({ "service": service_to_json(&snapshot) }),
2601        ))
2602    }
2603
2604    fn describe_services(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2605        let body = request.json_body();
2606        let cluster_ref = opt_str(&body, "cluster");
2607        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
2608        let refs: Vec<String> = body
2609            .get("services")
2610            .and_then(|v| v.as_array())
2611            .map(|arr| {
2612                arr.iter()
2613                    .filter_map(|v| v.as_str().map(String::from))
2614                    .collect()
2615            })
2616            .unwrap_or_default();
2617
2618        let account = request.account_id.clone();
2619        let accounts = self.state.read();
2620        let mut found = Vec::new();
2621        let mut failures = Vec::new();
2622        let Some(state) = accounts.get(&account) else {
2623            for r in &refs {
2624                failures.push(json!({"arn": r, "reason": "MISSING"}));
2625            }
2626            return Ok(AwsResponse::ok_json(
2627                json!({"services": found, "failures": failures}),
2628            ));
2629        };
2630        for r in &refs {
2631            let name = service_name_from_ref(r);
2632            let key = EcsState::service_key(&cluster_name, &name);
2633            match state.services.get(&key) {
2634                Some(svc) => {
2635                    let mut v = service_to_json(svc);
2636                    // Update derived running/pending counts from current tasks.
2637                    recompute_service_counts(state, &name, &cluster_name, &mut v);
2638                    found.push(v);
2639                }
2640                None => failures.push(json!({"arn": r, "reason": "MISSING"})),
2641            }
2642        }
2643        Ok(AwsResponse::ok_json(json!({
2644            "services": found,
2645            "failures": failures,
2646        })))
2647    }
2648
2649    fn list_services(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2650        let body = request.json_body();
2651        let cluster_ref = opt_str(&body, "cluster");
2652        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
2653        let launch_type = opt_str(&body, "launchType");
2654        let scheduling = opt_str(&body, "schedulingStrategy");
2655        let max_results = body
2656            .get("maxResults")
2657            .and_then(|v| v.as_i64())
2658            .filter(|n| (1..=100).contains(n))
2659            .map(|n| n as usize)
2660            .unwrap_or(100);
2661        let next_token = opt_str(&body, "nextToken").unwrap_or("");
2662
2663        let account = request.account_id.clone();
2664        let accounts = self.state.read();
2665        let mut arns: Vec<String> = match accounts.get(&account) {
2666            Some(state) => state
2667                .services
2668                .values()
2669                .filter(|s| s.cluster_name == cluster_name)
2670                .filter(|s| launch_type.is_none_or(|lt| s.launch_type == lt))
2671                .filter(|s| scheduling.is_none_or(|sc| s.scheduling_strategy == sc))
2672                .map(|s| s.service_arn.clone())
2673                .collect(),
2674            None => Vec::new(),
2675        };
2676        arns.sort();
2677        let start = next_token.parse::<usize>().unwrap_or(0).min(arns.len());
2678        let end = (start + max_results).min(arns.len());
2679        let page = arns[start..end].to_vec();
2680        let mut out = json!({"serviceArns": page});
2681        if end < arns.len() {
2682            out.as_object_mut()
2683                .unwrap()
2684                .insert("nextToken".into(), json!(end.to_string()));
2685        }
2686        Ok(AwsResponse::ok_json(out))
2687    }
2688
2689    fn list_services_by_namespace(
2690        &self,
2691        request: &AwsRequest,
2692    ) -> Result<AwsResponse, AwsServiceError> {
2693        // fakecloud doesn't model Cloud Map namespaces yet — return all
2694        // services attached to services with registries pointing at the
2695        // given namespace ARN. Filter is loose: treat the namespace as a
2696        // hint and return every service when none match to mirror AWS's
2697        // "loose match when ambiguous" response shape.
2698        let body = request.json_body();
2699        let namespace = req_str(&body, "namespace")?.to_string();
2700        let account = request.account_id.clone();
2701        let accounts = self.state.read();
2702        let mut arns: Vec<String> = match accounts.get(&account) {
2703            Some(state) => state
2704                .services
2705                .values()
2706                .filter(|s| {
2707                    s.service_registries.iter().any(|r| {
2708                        r.get("registryArn")
2709                            .and_then(|v| v.as_str())
2710                            .is_some_and(|arn| arn.contains(&namespace))
2711                    })
2712                })
2713                .map(|s| s.service_arn.clone())
2714                .collect(),
2715            None => Vec::new(),
2716        };
2717        arns.sort();
2718        Ok(AwsResponse::ok_json(json!({"serviceArns": arns})))
2719    }
2720}
2721
2722fn validate_service_name(name: &str) -> Result<(), AwsServiceError> {
2723    if name.is_empty() || name.len() > 255 {
2724        return Err(invalid_parameter("Service name must be 1-255 characters"));
2725    }
2726    let ok = name
2727        .chars()
2728        .all(|c| c.is_ascii_alphanumeric() || c == '_' || c == '-');
2729    if !ok {
2730        return Err(invalid_parameter(
2731            "Service name may only contain letters, numbers, hyphens, and underscores",
2732        ));
2733    }
2734    Ok(())
2735}
2736
/// Reduces a service reference (a bare name or a `/`-separated ARN/path) to
/// the service name, i.e. everything after the last `/`.
///
/// A reference without any `/` is returned unchanged; a reference that ends
/// with `/` yields an empty string.
fn service_name_from_ref(input: &str) -> String {
    // `rsplit_once` makes both cases explicit. The previous
    // `rsplit('/').next()` form always produced `Some`, which left its
    // fallback branch unreachable.
    match input.rsplit_once('/') {
        Some((_, name)) => name.to_string(),
        None => input.to_string(),
    }
}
2743
2744fn service_not_found(name: &str) -> AwsServiceError {
2745    AwsServiceError::aws_error(
2746        StatusCode::BAD_REQUEST,
2747        "ServiceNotFoundException",
2748        format!("The service could not be found: {name}"),
2749    )
2750}
2751
2752fn service_already_exists(name: &str) -> AwsServiceError {
2753    AwsServiceError::aws_error(
2754        StatusCode::BAD_REQUEST,
2755        "ServiceNotActiveException",
2756        format!("The service {name} already exists"),
2757    )
2758}
2759
2760/// Spawn N tasks for a service by cloning the task-definition containers
2761/// and inserting `Task` rows in the shared state. The task IDs are
2762/// returned so the caller can hand them to `EcsRuntime::run_task` after
2763/// releasing the state write lock.
2764fn spawn_service_tasks(
2765    state: &mut EcsState,
2766    service: &Service,
2767    count: i32,
2768    principal_arn: &str,
2769    launch_type: &str,
2770) -> Vec<String> {
2771    if count <= 0 {
2772        return Vec::new();
2773    }
2774    let Some(revisions) = state.task_definitions.get(&service.family) else {
2775        return Vec::new();
2776    };
2777    let Some(td) = revisions.get(&service.revision) else {
2778        return Vec::new();
2779    };
2780    let container_defs = td.container_definitions.clone();
2781    let cpu = td.cpu.clone();
2782    let memory = td.memory.clone();
2783    let task_role = td.task_role_arn.clone();
2784    let exec_role = td.execution_role_arn.clone();
2785    let cluster_name = service.cluster_name.clone();
2786    let cluster_arn = service.cluster_arn.clone();
2787    let td_arn = service.task_definition_arn.clone();
2788    let family = service.family.clone();
2789    let revision = service.revision;
2790    let service_tag = format!("ecs-svc/{}", service.service_name);
2791
2792    let mut ids = Vec::with_capacity(count as usize);
2793    for _ in 0..count {
2794        let task_id = uuid::Uuid::new_v4().to_string().replace('-', "");
2795        let task_arn = state.task_arn(&cluster_name, &task_id);
2796        let containers: Vec<Container> = container_defs
2797            .iter()
2798            .map(|def| Container {
2799                container_arn: format!(
2800                    "arn:aws:ecs:{}:{}:container/{}/{}/{}",
2801                    state.region,
2802                    state.account_id,
2803                    cluster_name,
2804                    task_id,
2805                    def.get("name").and_then(|v| v.as_str()).unwrap_or("app")
2806                ),
2807                name: def
2808                    .get("name")
2809                    .and_then(|v| v.as_str())
2810                    .unwrap_or("app")
2811                    .to_string(),
2812                image: def
2813                    .get("image")
2814                    .and_then(|v| v.as_str())
2815                    .unwrap_or("")
2816                    .to_string(),
2817                task_arn: task_arn.clone(),
2818                last_status: "PENDING".into(),
2819                exit_code: None,
2820                reason: None,
2821                runtime_id: None,
2822                essential: def
2823                    .get("essential")
2824                    .and_then(|v| v.as_bool())
2825                    .unwrap_or(true),
2826                cpu: def
2827                    .get("cpu")
2828                    .and_then(|v| v.as_i64())
2829                    .map(|n| n.to_string()),
2830                memory: def
2831                    .get("memory")
2832                    .and_then(|v| v.as_i64())
2833                    .map(|n| n.to_string()),
2834                memory_reservation: def
2835                    .get("memoryReservation")
2836                    .and_then(|v| v.as_i64())
2837                    .map(|n| n.to_string()),
2838                network_bindings: Vec::new(),
2839                network_interfaces: Vec::new(),
2840                health_status: Some("UNKNOWN".into()),
2841                managed_agents: None,
2842            })
2843            .collect();
2844        let awslogs = container_defs.iter().find_map(|def| {
2845            let name = def.get("name").and_then(|v| v.as_str())?.to_string();
2846            let log_cfg = def.get("logConfiguration")?;
2847            if log_cfg.get("logDriver").and_then(|v| v.as_str()) != Some("awslogs") {
2848                return None;
2849            }
2850            let opts = log_cfg.get("options").and_then(|v| v.as_object())?;
2851            Some(AwsLogsConfig {
2852                group: opts.get("awslogs-group").and_then(|v| v.as_str())?.into(),
2853                stream_prefix: opts
2854                    .get("awslogs-stream-prefix")
2855                    .and_then(|v| v.as_str())
2856                    .map(String::from),
2857                region: opts
2858                    .get("awslogs-region")
2859                    .and_then(|v| v.as_str())
2860                    .unwrap_or(&state.region)
2861                    .to_string(),
2862                container_name: name,
2863            })
2864        });
2865        let task = Task {
2866            task_arn: task_arn.clone(),
2867            task_id: task_id.clone(),
2868            cluster_arn: cluster_arn.clone(),
2869            cluster_name: cluster_name.clone(),
2870            task_definition_arn: td_arn.clone(),
2871            family: family.clone(),
2872            revision,
2873            last_status: "PENDING".into(),
2874            desired_status: "RUNNING".into(),
2875            launch_type: launch_type.into(),
2876            platform_version: Some("1.4.0".into()),
2877            cpu: cpu.clone(),
2878            memory: memory.clone(),
2879            containers,
2880            overrides: json!({}),
2881            started_by: Some(service_tag.clone()),
2882            group: Some(format!("service:{}", service.service_name)),
2883            connectivity: "CONNECTING".into(),
2884            stop_code: None,
2885            stopped_reason: None,
2886            created_at: Utc::now(),
2887            started_at: None,
2888            stopping_at: None,
2889            stopped_at: None,
2890            pull_started_at: None,
2891            pull_stopped_at: None,
2892            connectivity_at: None,
2893            started_by_ref_id: None,
2894            execution_role_arn: exec_role.clone(),
2895            task_role_arn: task_role.clone(),
2896            tags: Vec::new(),
2897            awslogs,
2898            captured_logs: String::new(),
2899            protection: None,
2900        };
2901        state.tasks.insert(task_id.clone(), task);
2902        if let Some(cluster) = state.clusters.get_mut(&cluster_name) {
2903            cluster.pending_tasks_count += 1;
2904        }
2905        ids.push(task_id);
2906    }
2907    let _ = principal_arn;
2908    ids
2909}
2910
2911fn recompute_service_counts(
2912    state: &EcsState,
2913    service_name: &str,
2914    cluster_name: &str,
2915    service_json: &mut Value,
2916) {
2917    let service_tag = format!("ecs-svc/{}", service_name);
2918    let mut running = 0i32;
2919    let mut pending = 0i32;
2920    for t in state.tasks.values() {
2921        if t.started_by.as_deref() == Some(service_tag.as_str()) && t.cluster_name == cluster_name {
2922            match t.last_status.as_str() {
2923                "RUNNING" => running += 1,
2924                "PENDING" | "PROVISIONING" => pending += 1,
2925                _ => {}
2926            }
2927        }
2928    }
2929    if let Some(map) = service_json.as_object_mut() {
2930        map.insert("runningCount".into(), json!(running));
2931        map.insert("pendingCount".into(), json!(pending));
2932    }
2933}
2934
2935fn service_to_json(svc: &Service) -> Value {
2936    let mut map = serde_json::Map::new();
2937    map.insert("serviceArn".into(), json!(svc.service_arn));
2938    map.insert("serviceName".into(), json!(svc.service_name));
2939    map.insert("clusterArn".into(), json!(svc.cluster_arn));
2940    map.insert("status".into(), json!(svc.status));
2941    map.insert("desiredCount".into(), json!(svc.desired_count));
2942    map.insert("runningCount".into(), json!(svc.running_count));
2943    map.insert("pendingCount".into(), json!(svc.pending_count));
2944    map.insert("launchType".into(), json!(svc.launch_type));
2945    map.insert("schedulingStrategy".into(), json!(svc.scheduling_strategy));
2946    map.insert("taskDefinition".into(), json!(svc.task_definition_arn));
2947    map.insert(
2948        "deploymentController".into(),
2949        json!({"type": svc.deployment_controller}),
2950    );
2951    let mut deployment_cfg = serde_json::Map::new();
2952    if let Some(n) = svc.minimum_healthy_percent {
2953        deployment_cfg.insert("minimumHealthyPercent".into(), json!(n));
2954    }
2955    if let Some(n) = svc.maximum_percent {
2956        deployment_cfg.insert("maximumPercent".into(), json!(n));
2957    }
2958    if let Some(ref cb) = svc.circuit_breaker {
2959        deployment_cfg.insert(
2960            "deploymentCircuitBreaker".into(),
2961            json!({"enable": cb.enable, "rollback": cb.rollback}),
2962        );
2963    }
2964    if !deployment_cfg.is_empty() {
2965        map.insert(
2966            "deploymentConfiguration".into(),
2967            Value::Object(deployment_cfg),
2968        );
2969    }
2970    map.insert(
2971        "deployments".into(),
2972        Value::Array(svc.deployments.iter().map(deployment_to_json).collect()),
2973    );
2974    map.insert(
2975        "loadBalancers".into(),
2976        Value::Array(svc.load_balancers.clone()),
2977    );
2978    map.insert(
2979        "serviceRegistries".into(),
2980        Value::Array(svc.service_registries.clone()),
2981    );
2982    map.insert(
2983        "placementConstraints".into(),
2984        Value::Array(svc.placement_constraints.clone()),
2985    );
2986    map.insert(
2987        "placementStrategy".into(),
2988        Value::Array(svc.placement_strategy.clone()),
2989    );
2990    if let Some(ref v) = svc.network_configuration {
2991        map.insert("networkConfiguration".into(), v.clone());
2992    }
2993    if let Some(ref v) = svc.role_arn {
2994        map.insert("roleArn".into(), json!(v));
2995    }
2996    if let Some(ref v) = svc.created_by {
2997        map.insert("createdBy".into(), json!(v));
2998    }
2999    map.insert("createdAt".into(), json!(svc.created_at.timestamp()));
3000    Value::Object(map)
3001}
3002
3003fn deployment_to_json(d: &Deployment) -> Value {
3004    json!({
3005        "id": d.deployment_id,
3006        "status": d.status,
3007        "taskDefinition": d.task_definition_arn,
3008        "desiredCount": d.desired_count,
3009        "pendingCount": d.pending_count,
3010        "runningCount": d.running_count,
3011        "failedTasks": d.failed_tasks,
3012        "createdAt": d.created_at.timestamp(),
3013        "updatedAt": d.updated_at.timestamp(),
3014        "launchType": d.launch_type,
3015        "rolloutState": d.rollout_state,
3016        "rolloutStateReason": d.rollout_state_reason,
3017    })
3018}
3019
3020// -------- operations: container instances, attributes, capacity providers, task sets, task protection, ExecuteCommand, agent surface --------
3021
3022impl EcsService {
3023    fn register_container_instance(
3024        &self,
3025        request: &AwsRequest,
3026    ) -> Result<AwsResponse, AwsServiceError> {
3027        let body = request.json_body();
3028        let cluster_ref = opt_str(&body, "cluster");
3029        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3030        let ec2_id = opt_str(&body, "instanceIdentityDocument")
3031            .and_then(|s| serde_json::from_str::<Value>(s).ok())
3032            .and_then(|v| {
3033                v.get("instanceId")
3034                    .and_then(|x| x.as_str())
3035                    .map(String::from)
3036            });
3037        let tags = parse_tags(&body);
3038
3039        let account = request.account_id.clone();
3040        let mut accounts = self.state.write();
3041        let state = accounts.get_or_create(&account);
3042        let cluster_arn = state
3043            .clusters
3044            .get(&cluster_name)
3045            .map(|c| c.cluster_arn.clone())
3046            .unwrap_or_else(|| state.cluster_arn(&cluster_name));
3047        let uuid = uuid::Uuid::new_v4().to_string();
3048        let ci_arn = state.container_instance_arn(&cluster_name, &uuid);
3049        let key = format!("{}/{}", cluster_name, uuid);
3050        let ci = ContainerInstance {
3051            container_instance_arn: ci_arn.clone(),
3052            ec2_instance_id: ec2_id,
3053            cluster_name: cluster_name.clone(),
3054            cluster_arn,
3055            status: "ACTIVE".into(),
3056            version: 1,
3057            version_info: body.get("versionInfo").cloned(),
3058            agent_connected: true,
3059            agent_update_status: None,
3060            remaining_resources: body
3061                .get("totalResources")
3062                .and_then(|v| v.as_array())
3063                .cloned()
3064                .unwrap_or_default(),
3065            registered_resources: body
3066                .get("totalResources")
3067                .and_then(|v| v.as_array())
3068                .cloned()
3069                .unwrap_or_default(),
3070            running_tasks_count: 0,
3071            pending_tasks_count: 0,
3072            registered_at: Utc::now(),
3073            attributes: body
3074                .get("attributes")
3075                .and_then(|v| v.as_array())
3076                .map(|arr| {
3077                    arr.iter()
3078                        .filter_map(|a| {
3079                            let name = a.get("name").and_then(|v| v.as_str())?;
3080                            Some(AttributeRef {
3081                                name: name.to_string(),
3082                                value: a.get("value").and_then(|v| v.as_str()).map(String::from),
3083                                target_type: a
3084                                    .get("targetType")
3085                                    .and_then(|v| v.as_str())
3086                                    .map(String::from),
3087                                target_id: a
3088                                    .get("targetId")
3089                                    .and_then(|v| v.as_str())
3090                                    .map(String::from),
3091                            })
3092                        })
3093                        .collect()
3094                })
3095                .unwrap_or_default(),
3096            tags,
3097            capacity_provider_name: None,
3098            health_status: None,
3099        };
3100        state.container_instances.insert(key, ci.clone());
3101        if let Some(cluster) = state.clusters.get_mut(&cluster_name) {
3102            cluster.registered_container_instances_count += 1;
3103        }
3104        Ok(AwsResponse::ok_json(json!({
3105            "containerInstance": container_instance_to_json(&ci),
3106        })))
3107    }
3108
3109    fn deregister_container_instance(
3110        &self,
3111        request: &AwsRequest,
3112    ) -> Result<AwsResponse, AwsServiceError> {
3113        let body = request.json_body();
3114        let ci_ref = req_str(&body, "containerInstance")?.to_string();
3115        let cluster_ref = opt_str(&body, "cluster");
3116        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3117        let id = container_instance_id_from_ref(&ci_ref);
3118        let key = format!("{}/{}", cluster_name, id);
3119
3120        let account = request.account_id.clone();
3121        let mut accounts = self.state.write();
3122        let state = accounts
3123            .get_mut(&account)
3124            .ok_or_else(|| container_instance_not_found(&ci_ref))?;
3125        let mut ci = state
3126            .container_instances
3127            .remove(&key)
3128            .ok_or_else(|| container_instance_not_found(&ci_ref))?;
3129        ci.status = "INACTIVE".into();
3130        if let Some(cluster) = state.clusters.get_mut(&cluster_name) {
3131            if cluster.registered_container_instances_count > 0 {
3132                cluster.registered_container_instances_count -= 1;
3133            }
3134        }
3135        Ok(AwsResponse::ok_json(json!({
3136            "containerInstance": container_instance_to_json(&ci),
3137        })))
3138    }
3139
3140    fn describe_container_instances(
3141        &self,
3142        request: &AwsRequest,
3143    ) -> Result<AwsResponse, AwsServiceError> {
3144        let body = request.json_body();
3145        let cluster_ref = opt_str(&body, "cluster");
3146        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3147        let refs: Vec<String> = body
3148            .get("containerInstances")
3149            .and_then(|v| v.as_array())
3150            .map(|arr| {
3151                arr.iter()
3152                    .filter_map(|v| v.as_str().map(String::from))
3153                    .collect()
3154            })
3155            .unwrap_or_default();
3156
3157        let accounts = self.state.read();
3158        let mut found = Vec::new();
3159        let mut failures = Vec::new();
3160        if let Some(state) = accounts.get(&request.account_id) {
3161            for r in &refs {
3162                let id = container_instance_id_from_ref(r);
3163                let key = format!("{}/{}", cluster_name, id);
3164                match state.container_instances.get(&key) {
3165                    Some(ci) => found.push(container_instance_to_json(ci)),
3166                    None => failures.push(json!({"arn": r, "reason": "MISSING"})),
3167                }
3168            }
3169        } else {
3170            for r in &refs {
3171                failures.push(json!({"arn": r, "reason": "MISSING"}));
3172            }
3173        }
3174        Ok(AwsResponse::ok_json(json!({
3175            "containerInstances": found,
3176            "failures": failures,
3177        })))
3178    }
3179
3180    fn list_container_instances(
3181        &self,
3182        request: &AwsRequest,
3183    ) -> Result<AwsResponse, AwsServiceError> {
3184        let body = request.json_body();
3185        let cluster_ref = opt_str(&body, "cluster");
3186        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3187        let status_filter = opt_str(&body, "status");
3188        let max_results = body
3189            .get("maxResults")
3190            .and_then(|v| v.as_i64())
3191            .filter(|n| (1..=100).contains(n))
3192            .map(|n| n as usize)
3193            .unwrap_or(100);
3194        let next_token = opt_str(&body, "nextToken").unwrap_or("");
3195
3196        let accounts = self.state.read();
3197        let mut arns: Vec<String> = match accounts.get(&request.account_id) {
3198            Some(state) => state
3199                .container_instances
3200                .values()
3201                .filter(|ci| ci.cluster_name == cluster_name)
3202                .filter(|ci| status_filter.is_none_or(|s| ci.status == s))
3203                .map(|ci| ci.container_instance_arn.clone())
3204                .collect(),
3205            None => Vec::new(),
3206        };
3207        arns.sort();
3208        let start = next_token.parse::<usize>().unwrap_or(0).min(arns.len());
3209        let end = (start + max_results).min(arns.len());
3210        let page = arns[start..end].to_vec();
3211        let mut out = json!({"containerInstanceArns": page});
3212        if end < arns.len() {
3213            out.as_object_mut()
3214                .unwrap()
3215                .insert("nextToken".into(), json!(end.to_string()));
3216        }
3217        Ok(AwsResponse::ok_json(out))
3218    }
3219
3220    fn update_container_agent(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3221        let body = request.json_body();
3222        let ci_ref = req_str(&body, "containerInstance")?.to_string();
3223        let cluster_ref = opt_str(&body, "cluster");
3224        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3225        let id = container_instance_id_from_ref(&ci_ref);
3226        let key = format!("{}/{}", cluster_name, id);
3227        let mut accounts = self.state.write();
3228        let state = accounts
3229            .get_mut(&request.account_id)
3230            .ok_or_else(|| container_instance_not_found(&ci_ref))?;
3231        let ci = state
3232            .container_instances
3233            .get_mut(&key)
3234            .ok_or_else(|| container_instance_not_found(&ci_ref))?;
3235        ci.agent_update_status = Some("UPDATED".into());
3236        Ok(AwsResponse::ok_json(json!({
3237            "containerInstance": container_instance_to_json(ci),
3238        })))
3239    }
3240
3241    fn update_container_instances_state(
3242        &self,
3243        request: &AwsRequest,
3244    ) -> Result<AwsResponse, AwsServiceError> {
3245        let body = request.json_body();
3246        let status = req_str(&body, "status")?.to_string();
3247        let cluster_ref = opt_str(&body, "cluster");
3248        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3249        let refs: Vec<String> = body
3250            .get("containerInstances")
3251            .and_then(|v| v.as_array())
3252            .map(|arr| {
3253                arr.iter()
3254                    .filter_map(|v| v.as_str().map(String::from))
3255                    .collect()
3256            })
3257            .unwrap_or_default();
3258        let mut accounts = self.state.write();
3259        let state = accounts
3260            .get_mut(&request.account_id)
3261            .ok_or_else(|| client_exception("account not found"))?;
3262        let mut found = Vec::new();
3263        let mut failures = Vec::new();
3264        for r in &refs {
3265            let id = container_instance_id_from_ref(r);
3266            let key = format!("{}/{}", cluster_name, id);
3267            match state.container_instances.get_mut(&key) {
3268                Some(ci) => {
3269                    ci.status = status.clone();
3270                    found.push(container_instance_to_json(ci));
3271                }
3272                None => failures.push(json!({"arn": r, "reason": "MISSING"})),
3273            }
3274        }
3275        Ok(AwsResponse::ok_json(json!({
3276            "containerInstances": found,
3277            "failures": failures,
3278        })))
3279    }
3280
3281    fn put_attributes(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3282        let body = request.json_body();
3283        let cluster_ref = opt_str(&body, "cluster");
3284        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3285        let attrs = body
3286            .get("attributes")
3287            .and_then(|v| v.as_array())
3288            .cloned()
3289            .unwrap_or_default();
3290
3291        let mut accounts = self.state.write();
3292        let state = accounts.get_or_create(&request.account_id);
3293        let mut stored = Vec::new();
3294        for a in &attrs {
3295            let Some(name) = a.get("name").and_then(|v| v.as_str()) else {
3296                continue;
3297            };
3298            let target_type = a
3299                .get("targetType")
3300                .and_then(|v| v.as_str())
3301                .unwrap_or("container-instance")
3302                .to_string();
3303            let target_id = a
3304                .get("targetId")
3305                .and_then(|v| v.as_str())
3306                .unwrap_or("")
3307                .to_string();
3308            let value = a.get("value").and_then(|v| v.as_str()).map(String::from);
3309            let key = format!("{}/{}/{}", cluster_name, target_id, name);
3310            let attr = Attribute {
3311                cluster_name: cluster_name.clone(),
3312                target_type: target_type.clone(),
3313                target_id: target_id.clone(),
3314                name: name.to_string(),
3315                value: value.clone(),
3316            };
3317            state.attributes.insert(key, attr);
3318            stored.push(json!({
3319                "name": name,
3320                "value": value,
3321                "targetType": target_type,
3322                "targetId": target_id,
3323            }));
3324        }
3325        Ok(AwsResponse::ok_json(json!({"attributes": stored})))
3326    }
3327
3328    fn delete_attributes(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3329        let body = request.json_body();
3330        let cluster_ref = opt_str(&body, "cluster");
3331        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3332        let attrs = body
3333            .get("attributes")
3334            .and_then(|v| v.as_array())
3335            .cloned()
3336            .unwrap_or_default();
3337        let mut accounts = self.state.write();
3338        let state = accounts.get_or_create(&request.account_id);
3339        let mut deleted = Vec::new();
3340        for a in &attrs {
3341            let Some(name) = a.get("name").and_then(|v| v.as_str()) else {
3342                continue;
3343            };
3344            let target_id = a.get("targetId").and_then(|v| v.as_str()).unwrap_or("");
3345            let key = format!("{}/{}/{}", cluster_name, target_id, name);
3346            if let Some(attr) = state.attributes.remove(&key) {
3347                deleted.push(json!({
3348                    "name": attr.name,
3349                    "value": attr.value,
3350                    "targetType": attr.target_type,
3351                    "targetId": attr.target_id,
3352                }));
3353            }
3354        }
3355        Ok(AwsResponse::ok_json(json!({"attributes": deleted})))
3356    }
3357
3358    fn list_attributes(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3359        let body = request.json_body();
3360        let cluster_ref = opt_str(&body, "cluster");
3361        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3362        let target_type = req_str(&body, "targetType")?.to_string();
3363        let attr_name = opt_str(&body, "attributeName");
3364        let attr_value = opt_str(&body, "attributeValue");
3365
3366        let accounts = self.state.read();
3367        let attrs: Vec<Value> = match accounts.get(&request.account_id) {
3368            Some(state) => state
3369                .attributes
3370                .values()
3371                .filter(|a| a.cluster_name == cluster_name)
3372                .filter(|a| a.target_type == target_type)
3373                .filter(|a| attr_name.is_none_or(|n| a.name == n))
3374                .filter(|a| attr_value.is_none_or(|v| a.value.as_deref() == Some(v)))
3375                .map(|a| {
3376                    json!({
3377                        "name": a.name,
3378                        "value": a.value,
3379                        "targetType": a.target_type,
3380                        "targetId": a.target_id,
3381                    })
3382                })
3383                .collect(),
3384            None => Vec::new(),
3385        };
3386        Ok(AwsResponse::ok_json(json!({"attributes": attrs})))
3387    }
3388
3389    fn create_capacity_provider(
3390        &self,
3391        request: &AwsRequest,
3392    ) -> Result<AwsResponse, AwsServiceError> {
3393        let body = request.json_body();
3394        let name = req_str(&body, "name")?.to_string();
3395        if name.starts_with("aws") || name.starts_with("ecs") {
3396            return Err(invalid_parameter(format!(
3397                "Capacity provider name cannot begin with 'aws' or 'ecs': {name}"
3398            )));
3399        }
3400        let auto_scaling_group_provider = body.get("autoScalingGroupProvider").cloned();
3401        let tags = parse_tags(&body);
3402
3403        let mut accounts = self.state.write();
3404        let state = accounts.get_or_create(&request.account_id);
3405        if state.capacity_providers.contains_key(&name) {
3406            return Err(client_exception(format!(
3407                "Capacity provider already exists: {name}"
3408            )));
3409        }
3410        let arn = format!(
3411            "arn:aws:ecs:{}:{}:capacity-provider/{}",
3412            state.region, state.account_id, name
3413        );
3414        let cp = CapacityProvider {
3415            name: name.clone(),
3416            arn,
3417            status: "ACTIVE".into(),
3418            auto_scaling_group_provider,
3419            update_status: None,
3420            update_status_reason: None,
3421            created_at: Utc::now(),
3422            tags,
3423        };
3424        state.capacity_providers.insert(name.clone(), cp.clone());
3425        Ok(AwsResponse::ok_json(json!({
3426            "capacityProvider": capacity_provider_to_json(&cp),
3427        })))
3428    }
3429
3430    fn delete_capacity_provider(
3431        &self,
3432        request: &AwsRequest,
3433    ) -> Result<AwsResponse, AwsServiceError> {
3434        let body = request.json_body();
3435        let input = req_str(&body, "capacityProvider")?.to_string();
3436        let name = capacity_provider_name_from_ref(&input);
3437        let mut accounts = self.state.write();
3438        let state = accounts
3439            .get_mut(&request.account_id)
3440            .ok_or_else(|| capacity_provider_not_found(&name))?;
3441        let mut cp = state
3442            .capacity_providers
3443            .remove(&name)
3444            .ok_or_else(|| capacity_provider_not_found(&name))?;
3445        cp.status = "INACTIVE".into();
3446        Ok(AwsResponse::ok_json(json!({
3447            "capacityProvider": capacity_provider_to_json(&cp),
3448        })))
3449    }
3450
3451    fn describe_capacity_providers(
3452        &self,
3453        request: &AwsRequest,
3454    ) -> Result<AwsResponse, AwsServiceError> {
3455        let body = request.json_body();
3456        let names: Vec<String> = body
3457            .get("capacityProviders")
3458            .and_then(|v| v.as_array())
3459            .map(|arr| {
3460                arr.iter()
3461                    .filter_map(|v| v.as_str().map(capacity_provider_name_from_ref))
3462                    .collect()
3463            })
3464            .unwrap_or_default();
3465        let accounts = self.state.read();
3466        let mut found = Vec::new();
3467        let mut failures = Vec::new();
3468        if let Some(state) = accounts.get(&request.account_id) {
3469            if names.is_empty() {
3470                for cp in state.capacity_providers.values() {
3471                    found.push(capacity_provider_to_json(cp));
3472                }
3473            } else {
3474                for n in &names {
3475                    match state.capacity_providers.get(n) {
3476                        Some(cp) => found.push(capacity_provider_to_json(cp)),
3477                        None => failures.push(json!({"arn": n, "reason": "MISSING"})),
3478                    }
3479                }
3480            }
3481        }
3482        Ok(AwsResponse::ok_json(json!({
3483            "capacityProviders": found,
3484            "failures": failures,
3485        })))
3486    }
3487
3488    fn update_capacity_provider(
3489        &self,
3490        request: &AwsRequest,
3491    ) -> Result<AwsResponse, AwsServiceError> {
3492        let body = request.json_body();
3493        let input = req_str(&body, "name")?.to_string();
3494        let name = capacity_provider_name_from_ref(&input);
3495        let asg = body.get("autoScalingGroupProvider").cloned();
3496        let mut accounts = self.state.write();
3497        let state = accounts
3498            .get_mut(&request.account_id)
3499            .ok_or_else(|| capacity_provider_not_found(&name))?;
3500        let cp = state
3501            .capacity_providers
3502            .get_mut(&name)
3503            .ok_or_else(|| capacity_provider_not_found(&name))?;
3504        if let Some(v) = asg {
3505            cp.auto_scaling_group_provider = Some(v);
3506        }
3507        cp.update_status = Some("UPDATE_COMPLETE".into());
3508        Ok(AwsResponse::ok_json(json!({
3509            "capacityProvider": capacity_provider_to_json(cp),
3510        })))
3511    }
3512
3513    fn get_task_protection(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3514        let body = request.json_body();
3515        let refs: Vec<String> = body
3516            .get("tasks")
3517            .and_then(|v| v.as_array())
3518            .map(|arr| {
3519                arr.iter()
3520                    .filter_map(|v| v.as_str().map(String::from))
3521                    .collect()
3522            })
3523            .unwrap_or_default();
3524        let accounts = self.state.read();
3525        let mut protections = Vec::new();
3526        let mut failures = Vec::new();
3527        if let Some(state) = accounts.get(&request.account_id) {
3528            for r in &refs {
3529                let id = task_id_from_ref(r);
3530                match state.tasks.get(&id) {
3531                    Some(t) => protections.push(task_protection_json(t)),
3532                    None => failures.push(json!({"arn": r, "reason": "MISSING"})),
3533                }
3534            }
3535        }
3536        Ok(AwsResponse::ok_json(json!({
3537            "protectedTasks": protections,
3538            "failures": failures,
3539        })))
3540    }
3541
3542    fn update_task_protection(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3543        let body = request.json_body();
3544        let refs: Vec<String> = body
3545            .get("tasks")
3546            .and_then(|v| v.as_array())
3547            .map(|arr| {
3548                arr.iter()
3549                    .filter_map(|v| v.as_str().map(String::from))
3550                    .collect()
3551            })
3552            .unwrap_or_default();
3553        let protect = body
3554            .get("protectionEnabled")
3555            .and_then(|v| v.as_bool())
3556            .unwrap_or(false);
3557        let expires_in_minutes = body
3558            .get("expiresInMinutes")
3559            .and_then(|v| v.as_i64())
3560            .unwrap_or(2880);
3561        let expiration = if protect {
3562            Some(Utc::now() + chrono::Duration::minutes(expires_in_minutes))
3563        } else {
3564            None
3565        };
3566
3567        let mut accounts = self.state.write();
3568        let state = accounts
3569            .get_mut(&request.account_id)
3570            .ok_or_else(|| client_exception("account not found"))?;
3571        let mut protections = Vec::new();
3572        let mut failures = Vec::new();
3573        for r in &refs {
3574            let id = task_id_from_ref(r);
3575            match state.tasks.get_mut(&id) {
3576                Some(t) => {
3577                    t.protection = Some(crate::state::TaskProtection {
3578                        enabled: protect,
3579                        expiration,
3580                    });
3581                    protections.push(task_protection_json(t));
3582                }
3583                None => failures.push(json!({"arn": r, "reason": "MISSING"})),
3584            }
3585        }
3586        Ok(AwsResponse::ok_json(json!({
3587            "protectedTasks": protections,
3588            "failures": failures,
3589        })))
3590    }
3591
3592    fn create_task_set(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3593        let body = request.json_body();
3594        let service_ref = req_str(&body, "service")?;
3595        let service_name = service_name_from_ref(service_ref);
3596        let cluster_ref = opt_str(&body, "cluster");
3597        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3598        let task_definition = req_str(&body, "taskDefinition")?.to_string();
3599        let external_id = opt_str(&body, "externalId").map(String::from);
3600        let launch_type = opt_str(&body, "launchType").map(String::from);
3601        let platform_version = opt_str(&body, "platformVersion").map(String::from);
3602        let scale = body.get("scale").cloned();
3603        let tags = parse_tags(&body);
3604        let load_balancers = body
3605            .get("loadBalancers")
3606            .and_then(|v| v.as_array())
3607            .cloned()
3608            .unwrap_or_default();
3609        let service_registries = body
3610            .get("serviceRegistries")
3611            .and_then(|v| v.as_array())
3612            .cloned()
3613            .unwrap_or_default();
3614        let capacity_provider_strategy = body
3615            .get("capacityProviderStrategy")
3616            .and_then(|v| v.as_array())
3617            .cloned()
3618            .unwrap_or_default();
3619
3620        let mut accounts = self.state.write();
3621        let state = accounts
3622            .get_mut(&request.account_id)
3623            .ok_or_else(|| service_not_found(&service_name))?;
3624        let service_key = EcsState::service_key(&cluster_name, &service_name);
3625        let svc = state
3626            .services
3627            .get(&service_key)
3628            .ok_or_else(|| service_not_found(&service_name))?;
3629        if svc.deployment_controller != "EXTERNAL" {
3630            return Err(client_exception(
3631                "CreateTaskSet requires the service to be created with \
3632                 deploymentController.type = EXTERNAL",
3633            ));
3634        }
3635        let ts_id = format!("ecs-svc-{}", uuid::Uuid::new_v4().simple());
3636        let task_set = TaskSet {
3637            task_set_id: ts_id.clone(),
3638            task_set_arn: format!(
3639                "arn:aws:ecs:{}:{}:task-set/{}/{}/{}",
3640                state.region, state.account_id, cluster_name, service_name, ts_id
3641            ),
3642            service_arn: svc.service_arn.clone(),
3643            cluster_arn: svc.cluster_arn.clone(),
3644            service_name: service_name.clone(),
3645            cluster_name: cluster_name.clone(),
3646            external_id,
3647            status: "ACTIVE".into(),
3648            task_definition,
3649            computed_desired_count: 0,
3650            pending_count: 0,
3651            running_count: 0,
3652            launch_type,
3653            platform_version,
3654            scale,
3655            stability_status: "STABILIZING".into(),
3656            created_at: Utc::now(),
3657            updated_at: Utc::now(),
3658            load_balancers,
3659            service_registries,
3660            capacity_provider_strategy,
3661            tags,
3662        };
3663        let key = format!("{}/{}/{}", cluster_name, service_name, ts_id);
3664        state.task_sets.insert(key, task_set.clone());
3665        Ok(AwsResponse::ok_json(json!({
3666            "taskSet": task_set_to_json(&task_set),
3667        })))
3668    }
3669
3670    fn update_task_set(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3671        let body = request.json_body();
3672        let ts_ref = req_str(&body, "taskSet")?.to_string();
3673        let service_ref = req_str(&body, "service")?;
3674        let service_name = service_name_from_ref(service_ref);
3675        let cluster_ref = opt_str(&body, "cluster");
3676        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3677        let scale = body.get("scale").cloned();
3678
3679        let mut accounts = self.state.write();
3680        let state = accounts
3681            .get_mut(&request.account_id)
3682            .ok_or_else(|| client_exception("task set not found"))?;
3683        let ts_id = task_set_id_from_ref(&ts_ref);
3684        let key = format!("{}/{}/{}", cluster_name, service_name, ts_id);
3685        let ts = state
3686            .task_sets
3687            .get_mut(&key)
3688            .ok_or_else(|| client_exception(format!("task set not found: {}", ts_ref)))?;
3689        if let Some(v) = scale {
3690            ts.scale = Some(v);
3691        }
3692        ts.updated_at = Utc::now();
3693        Ok(AwsResponse::ok_json(json!({
3694            "taskSet": task_set_to_json(ts),
3695        })))
3696    }
3697
3698    fn delete_task_set(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3699        let body = request.json_body();
3700        let ts_ref = req_str(&body, "taskSet")?.to_string();
3701        let service_ref = req_str(&body, "service")?;
3702        let service_name = service_name_from_ref(service_ref);
3703        let cluster_ref = opt_str(&body, "cluster");
3704        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3705        let ts_id = task_set_id_from_ref(&ts_ref);
3706        let key = format!("{}/{}/{}", cluster_name, service_name, ts_id);
3707
3708        let mut accounts = self.state.write();
3709        let state = accounts
3710            .get_mut(&request.account_id)
3711            .ok_or_else(|| client_exception("task set not found"))?;
3712        let mut ts = state
3713            .task_sets
3714            .remove(&key)
3715            .ok_or_else(|| client_exception(format!("task set not found: {}", ts_ref)))?;
3716        ts.status = "DRAINING".into();
3717        Ok(AwsResponse::ok_json(json!({
3718            "taskSet": task_set_to_json(&ts),
3719        })))
3720    }
3721
3722    fn describe_task_sets(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3723        let body = request.json_body();
3724        let service_ref = req_str(&body, "service")?;
3725        let service_name = service_name_from_ref(service_ref);
3726        let cluster_ref = opt_str(&body, "cluster");
3727        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3728        let filter_refs: Vec<String> = body
3729            .get("taskSets")
3730            .and_then(|v| v.as_array())
3731            .map(|arr| {
3732                arr.iter()
3733                    .filter_map(|v| v.as_str().map(String::from))
3734                    .collect()
3735            })
3736            .unwrap_or_default();
3737
3738        let accounts = self.state.read();
3739        let mut found = Vec::new();
3740        let mut failures = Vec::new();
3741        if let Some(state) = accounts.get(&request.account_id) {
3742            if filter_refs.is_empty() {
3743                for ts in state.task_sets.values() {
3744                    if ts.cluster_name == cluster_name && ts.service_name == service_name {
3745                        found.push(task_set_to_json(ts));
3746                    }
3747                }
3748            } else {
3749                for r in &filter_refs {
3750                    let id = task_set_id_from_ref(r);
3751                    let key = format!("{}/{}/{}", cluster_name, service_name, id);
3752                    match state.task_sets.get(&key) {
3753                        Some(ts) => found.push(task_set_to_json(ts)),
3754                        None => failures.push(json!({"arn": r, "reason": "MISSING"})),
3755                    }
3756                }
3757            }
3758        }
3759        Ok(AwsResponse::ok_json(json!({
3760            "taskSets": found,
3761            "failures": failures,
3762        })))
3763    }
3764
3765    fn update_service_primary_task_set(
3766        &self,
3767        request: &AwsRequest,
3768    ) -> Result<AwsResponse, AwsServiceError> {
3769        let body = request.json_body();
3770        let ts_ref = req_str(&body, "primaryTaskSet")?.to_string();
3771        let service_ref = req_str(&body, "service")?;
3772        let service_name = service_name_from_ref(service_ref);
3773        let cluster_ref = opt_str(&body, "cluster");
3774        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3775        let ts_id = task_set_id_from_ref(&ts_ref);
3776        let key = format!("{}/{}/{}", cluster_name, service_name, ts_id);
3777        let mut accounts = self.state.write();
3778        let state = accounts
3779            .get_mut(&request.account_id)
3780            .ok_or_else(|| client_exception("task set not found"))?;
3781        if !state.task_sets.contains_key(&key) {
3782            return Err(client_exception(format!("task set not found: {}", ts_ref)));
3783        }
3784        // Demote any existing PRIMARY on this service to ACTIVE before
3785        // promoting the new one. Otherwise the service would be left with
3786        // two PRIMARY task sets, which AWS forbids.
3787        for ts in state.task_sets.values_mut() {
3788            if ts.service_name == service_name
3789                && ts.cluster_name == cluster_name
3790                && ts.status == "PRIMARY"
3791                && ts.task_set_id != ts_id
3792            {
3793                ts.status = "ACTIVE".into();
3794                ts.updated_at = Utc::now();
3795            }
3796        }
3797        let ts = state.task_sets.get_mut(&key).unwrap();
3798        ts.status = "PRIMARY".into();
3799        ts.updated_at = Utc::now();
3800        Ok(AwsResponse::ok_json(json!({
3801            "taskSet": task_set_to_json(ts),
3802        })))
3803    }
3804
3805    async fn execute_command(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3806        let body = request.json_body();
3807        let task_ref = req_str(&body, "task")?.to_string();
3808        let command = req_str(&body, "command")?.to_string();
3809        let interactive = body
3810            .get("interactive")
3811            .and_then(|v| v.as_bool())
3812            .unwrap_or(false);
3813
3814        // Resolve runtime container id.
3815        let container_id = {
3816            let accounts = self.state.read();
3817            let state = accounts
3818                .get(&request.account_id)
3819                .ok_or_else(|| task_not_found(&task_ref))?;
3820            let id = task_id_from_ref(&task_ref);
3821            state
3822                .tasks
3823                .get(&id)
3824                .and_then(|t| t.containers.first())
3825                .and_then(|c| c.runtime_id.clone())
3826        };
3827
3828        let session_id = format!("ecs-execute-command-{}", uuid::Uuid::new_v4());
3829        if let (Some(id), Some(_rt)) = (container_id.clone(), self.runtime.as_ref()) {
3830            // Best-effort proxy: shell the command through docker exec. We
3831            // don't stream back stdout/stderr in this ExecuteCommand response
3832            // (real AWS returns a Session token for the SSM sidecar), so log
3833            // the result server-side for visibility.
3834            let out = tokio::process::Command::new("docker")
3835                .args(["exec", &id, "sh", "-c", &command])
3836                .output()
3837                .await
3838                .map_err(|e| client_exception(format!("docker exec failed: {e}")))?;
3839            tracing::info!(
3840                task = %task_ref,
3841                exit = out.status.code().unwrap_or(-1),
3842                "ExecuteCommand via docker exec"
3843            );
3844        }
3845
3846        Ok(AwsResponse::ok_json(json!({
3847            "clusterArn": opt_str(&body, "cluster").unwrap_or(""),
3848            "containerArn": container_id.unwrap_or_default(),
3849            "containerName": opt_str(&body, "container").unwrap_or(""),
3850            "interactive": interactive,
3851            "session": {
3852                "sessionId": session_id,
3853                "streamUrl": "",
3854                "tokenValue": "",
3855            },
3856            "taskArn": task_ref,
3857        })))
3858    }
3859
3860    fn submit_container_state_change(
3861        &self,
3862        request: &AwsRequest,
3863    ) -> Result<AwsResponse, AwsServiceError> {
3864        // Agent-side API: record whatever the agent tells us about the
3865        // container. We already drive state internally via the runtime,
3866        // so this is an idempotent ack that updates the in-memory copy
3867        // when the named task+container exist.
3868        let body = request.json_body();
3869        let task_ref = opt_str(&body, "task").unwrap_or("");
3870        let container_name = opt_str(&body, "containerName").unwrap_or("");
3871        let status = opt_str(&body, "status").map(String::from);
3872        let exit_code = body.get("exitCode").and_then(|v| v.as_i64());
3873        let reason = opt_str(&body, "reason").map(String::from);
3874
3875        if !task_ref.is_empty() {
3876            let mut accounts = self.state.write();
3877            if let Some(state) = accounts.get_mut(&request.account_id) {
3878                let id = task_id_from_ref(task_ref);
3879                if let Some(task) = state.tasks.get_mut(&id) {
3880                    if let Some(container) = task
3881                        .containers
3882                        .iter_mut()
3883                        .find(|c| c.name == container_name)
3884                    {
3885                        if let Some(s) = status {
3886                            container.last_status = s;
3887                        }
3888                        if let Some(code) = exit_code {
3889                            container.exit_code = Some(code);
3890                        }
3891                        if let Some(r) = reason {
3892                            container.reason = Some(r);
3893                        }
3894                    }
3895                }
3896            }
3897        }
3898        Ok(AwsResponse::ok_json(json!({"acknowledgment": "OK"})))
3899    }
3900
3901    fn submit_task_state_change(
3902        &self,
3903        request: &AwsRequest,
3904    ) -> Result<AwsResponse, AwsServiceError> {
3905        let body = request.json_body();
3906        let task_ref = opt_str(&body, "task").unwrap_or("");
3907        let status = opt_str(&body, "status").map(String::from);
3908        if !task_ref.is_empty() {
3909            let mut accounts = self.state.write();
3910            if let Some(state) = accounts.get_mut(&request.account_id) {
3911                let id = task_id_from_ref(task_ref);
3912                if let Some(task) = state.tasks.get_mut(&id) {
3913                    if let Some(s) = status {
3914                        task.last_status = s;
3915                    }
3916                }
3917            }
3918        }
3919        Ok(AwsResponse::ok_json(json!({"acknowledgment": "OK"})))
3920    }
3921
3922    fn submit_attachment_state_changes(
3923        &self,
3924        _request: &AwsRequest,
3925    ) -> Result<AwsResponse, AwsServiceError> {
3926        // Attachments (ENIs) are beyond what fakecloud models today.
3927        // Ack the report so agents don't retry in a tight loop.
3928        Ok(AwsResponse::ok_json(json!({"acknowledgment": "OK"})))
3929    }
3930
3931    fn discover_poll_endpoint(
3932        &self,
3933        _request: &AwsRequest,
3934    ) -> Result<AwsResponse, AwsServiceError> {
3935        // ECS agents use this to discover the long-poll + telemetry
3936        // endpoints. Point both at fakecloud's local listener; real
3937        // agent polling isn't wired, but the shape is correct.
3938        let accounts = self.state.read();
3939        let endpoint = format!("https://ecs.{}.amazonaws.com/", accounts.region());
3940        Ok(AwsResponse::ok_json(json!({
3941            "endpoint": endpoint,
3942            "telemetryEndpoint": endpoint,
3943            "serviceConnectEndpoint": endpoint,
3944        })))
3945    }
3946
3947    fn stop_service_deployment(
3948        &self,
3949        request: &AwsRequest,
3950    ) -> Result<AwsResponse, AwsServiceError> {
3951        let body = request.json_body();
3952        let deployment_ref = req_str(&body, "serviceDeploymentArn")?.to_string();
3953        let mut accounts = self.state.write();
3954        let state = accounts
3955            .get_mut(&request.account_id)
3956            .ok_or_else(|| client_exception("service deployment not found"))?;
3957        for svc in state.services.values_mut() {
3958            for d in svc.deployments.iter_mut() {
3959                if deployment_ref.contains(&d.deployment_id) {
3960                    d.status = "STOPPED".into();
3961                    d.rollout_state = "FAILED".into();
3962                    d.rollout_state_reason = Some("StopServiceDeployment requested".into());
3963                    d.updated_at = Utc::now();
3964                    return Ok(AwsResponse::ok_json(json!({
3965                        "serviceDeployment": deployment_to_json(d),
3966                    })));
3967                }
3968            }
3969        }
3970        Err(client_exception(format!(
3971            "service deployment not found: {deployment_ref}"
3972        )))
3973    }
3974
3975    fn list_service_deployments(
3976        &self,
3977        request: &AwsRequest,
3978    ) -> Result<AwsResponse, AwsServiceError> {
3979        let body = request.json_body();
3980        let service_ref = req_str(&body, "service")?;
3981        let service_name = service_name_from_ref(service_ref);
3982        let cluster_ref = opt_str(&body, "cluster");
3983        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3984        let accounts = self.state.read();
3985        let mut deployments: Vec<Value> = Vec::new();
3986        if let Some(state) = accounts.get(&request.account_id) {
3987            let key = EcsState::service_key(&cluster_name, &service_name);
3988            if let Some(svc) = state.services.get(&key) {
3989                for d in &svc.deployments {
3990                    deployments.push(json!({
3991                        "serviceDeploymentArn": format!("{}/{}", svc.service_arn, d.deployment_id),
3992                        "serviceArn": svc.service_arn,
3993                        "clusterArn": svc.cluster_arn,
3994                        "status": d.status,
3995                        "createdAt": d.created_at.timestamp(),
3996                        "startedAt": d.created_at.timestamp(),
3997                        "finishedAt": null,
3998                    }));
3999                }
4000            }
4001        }
4002        Ok(AwsResponse::ok_json(json!({
4003            "serviceDeployments": deployments,
4004        })))
4005    }
4006
4007    fn describe_service_deployments(
4008        &self,
4009        request: &AwsRequest,
4010    ) -> Result<AwsResponse, AwsServiceError> {
4011        let body = request.json_body();
4012        let refs: Vec<String> = body
4013            .get("serviceDeploymentArns")
4014            .and_then(|v| v.as_array())
4015            .map(|arr| {
4016                arr.iter()
4017                    .filter_map(|v| v.as_str().map(String::from))
4018                    .collect()
4019            })
4020            .unwrap_or_default();
4021        let accounts = self.state.read();
4022        let mut found = Vec::new();
4023        let mut failures = Vec::new();
4024        if let Some(state) = accounts.get(&request.account_id) {
4025            'next_ref: for r in &refs {
4026                for svc in state.services.values() {
4027                    for d in &svc.deployments {
4028                        if r.contains(&d.deployment_id) {
4029                            found.push(json!({
4030                                "serviceDeploymentArn": r,
4031                                "serviceArn": svc.service_arn,
4032                                "clusterArn": svc.cluster_arn,
4033                                "status": d.status,
4034                                "createdAt": d.created_at.timestamp(),
4035                                "startedAt": d.created_at.timestamp(),
4036                                "finishedAt": null,
4037                                "deploymentConfiguration": {
4038                                    "minimumHealthyPercent": svc.minimum_healthy_percent,
4039                                    "maximumPercent": svc.maximum_percent,
4040                                },
4041                                "sourceServiceRevisions": [],
4042                                "targetServiceRevision": {
4043                                    "arn": d.task_definition_arn,
4044                                    "requestedTaskCount": d.desired_count,
4045                                    "runningTaskCount": d.running_count,
4046                                    "pendingTaskCount": d.pending_count,
4047                                    "failedTasks": d.failed_tasks,
4048                                },
4049                            }));
4050                            continue 'next_ref;
4051                        }
4052                    }
4053                }
4054                failures.push(json!({"arn": r, "reason": "MISSING"}));
4055            }
4056        }
4057        Ok(AwsResponse::ok_json(json!({
4058            "serviceDeployments": found,
4059            "failures": failures,
4060        })))
4061    }
4062
4063    fn describe_service_revisions(
4064        &self,
4065        request: &AwsRequest,
4066    ) -> Result<AwsResponse, AwsServiceError> {
4067        let body = request.json_body();
4068        let refs: Vec<String> = body
4069            .get("serviceRevisionArns")
4070            .and_then(|v| v.as_array())
4071            .map(|arr| {
4072                arr.iter()
4073                    .filter_map(|v| v.as_str().map(String::from))
4074                    .collect()
4075            })
4076            .unwrap_or_default();
4077        Ok(AwsResponse::ok_json(json!({
4078            "serviceRevisions": [],
4079            "failures": refs.iter().map(|r| json!({"arn": r, "reason": "MISSING"})).collect::<Vec<_>>(),
4080        })))
4081    }
4082}
4083
/// Returns the text after the last `/` of `input`, or all of `input` when it
/// contains no `/`.
fn container_instance_id_from_ref(input: &str) -> String {
    let tail = match input.rsplit_once('/') {
        Some((_, tail)) => tail,
        None => input,
    };
    tail.to_owned()
}
4087
4088fn container_instance_not_found(input: &str) -> AwsServiceError {
4089    AwsServiceError::aws_error(
4090        StatusCode::BAD_REQUEST,
4091        "ClientException",
4092        format!("Container instance not found: {input}"),
4093    )
4094}
4095
/// Returns the text after the last `/` of `input`, or all of `input` when it
/// contains no `/`.
fn capacity_provider_name_from_ref(input: &str) -> String {
    let tail = match input.rsplit_once('/') {
        Some((_, tail)) => tail,
        None => input,
    };
    tail.to_owned()
}
4099
4100fn capacity_provider_not_found(name: &str) -> AwsServiceError {
4101    AwsServiceError::aws_error(
4102        StatusCode::BAD_REQUEST,
4103        "ClientException",
4104        format!("Capacity provider not found: {name}"),
4105    )
4106}
4107
/// Returns the text after the last `/` of `input`, or all of `input` when it
/// contains no `/`.
fn task_set_id_from_ref(input: &str) -> String {
    let tail = match input.rsplit_once('/') {
        Some((_, tail)) => tail,
        None => input,
    };
    tail.to_owned()
}
4111
4112fn container_instance_to_json(ci: &ContainerInstance) -> Value {
4113    json!({
4114        "containerInstanceArn": ci.container_instance_arn,
4115        "ec2InstanceId": ci.ec2_instance_id,
4116        "status": ci.status,
4117        "version": ci.version,
4118        "versionInfo": ci.version_info,
4119        "agentConnected": ci.agent_connected,
4120        "agentUpdateStatus": ci.agent_update_status,
4121        "remainingResources": ci.remaining_resources,
4122        "registeredResources": ci.registered_resources,
4123        "runningTasksCount": ci.running_tasks_count,
4124        "pendingTasksCount": ci.pending_tasks_count,
4125        "registeredAt": ci.registered_at.timestamp(),
4126        "attributes": ci.attributes.iter().map(|a| json!({
4127            "name": a.name,
4128            "value": a.value,
4129            "targetType": a.target_type,
4130            "targetId": a.target_id,
4131        })).collect::<Vec<_>>(),
4132        "tags": tags_json(&ci.tags),
4133        "capacityProviderName": ci.capacity_provider_name,
4134        "healthStatus": ci.health_status,
4135    })
4136}
4137
4138fn capacity_provider_to_json(cp: &CapacityProvider) -> Value {
4139    json!({
4140        "name": cp.name,
4141        "capacityProviderArn": cp.arn,
4142        "status": cp.status,
4143        "autoScalingGroupProvider": cp.auto_scaling_group_provider,
4144        "updateStatus": cp.update_status,
4145        "updateStatusReason": cp.update_status_reason,
4146        "tags": tags_json(&cp.tags),
4147    })
4148}
4149
4150fn task_set_to_json(ts: &TaskSet) -> Value {
4151    json!({
4152        "id": ts.task_set_id,
4153        "taskSetArn": ts.task_set_arn,
4154        "serviceArn": ts.service_arn,
4155        "clusterArn": ts.cluster_arn,
4156        "externalId": ts.external_id,
4157        "status": ts.status,
4158        "taskDefinition": ts.task_definition,
4159        "computedDesiredCount": ts.computed_desired_count,
4160        "pendingCount": ts.pending_count,
4161        "runningCount": ts.running_count,
4162        "launchType": ts.launch_type,
4163        "platformVersion": ts.platform_version,
4164        "scale": ts.scale,
4165        "stabilityStatus": ts.stability_status,
4166        "createdAt": ts.created_at.timestamp(),
4167        "updatedAt": ts.updated_at.timestamp(),
4168        "loadBalancers": ts.load_balancers,
4169        "serviceRegistries": ts.service_registries,
4170        "capacityProviderStrategy": ts.capacity_provider_strategy,
4171        "tags": tags_json(&ts.tags),
4172    })
4173}
4174
4175fn task_protection_json(task: &Task) -> Value {
4176    let p = task.protection.as_ref();
4177    json!({
4178        "taskArn": task.task_arn,
4179        "protectionEnabled": p.map(|p| p.enabled).unwrap_or(false),
4180        "expirationDate": p.and_then(|p| p.expiration).map(|e| e.timestamp()),
4181    })
4182}
4183
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a [`TagEntry`] from a key/value pair.
    fn tag(key: &str, value: &str) -> TagEntry {
        TagEntry {
            key: key.into(),
            value: value.into(),
        }
    }

    /// Minimal `api` service living in the `default` cluster.
    fn api_service() -> Service {
        Service {
            service_name: "api".into(),
            service_arn: "arn".into(),
            cluster_name: "default".into(),
            cluster_arn: "arn".into(),
            task_definition_arn: "td-arn".into(),
            family: "td".into(),
            revision: 1,
            desired_count: 0,
            running_count: 0,
            pending_count: 0,
            launch_type: "FARGATE".into(),
            status: "ACTIVE".into(),
            scheduling_strategy: "REPLICA".into(),
            deployment_controller: "ECS".into(),
            minimum_healthy_percent: None,
            maximum_percent: None,
            circuit_breaker: None,
            deployments: vec![],
            load_balancers: vec![],
            service_registries: vec![],
            placement_constraints: vec![],
            placement_strategy: vec![],
            network_configuration: None,
            tags: vec![],
            created_at: chrono::Utc::now(),
            created_by: None,
            role_arn: None,
        }
    }

    /// Minimal active container instance living in the `default` cluster.
    fn default_container_instance() -> ContainerInstance {
        ContainerInstance {
            container_instance_arn: "arn".into(),
            ec2_instance_id: Some("i-x".into()),
            cluster_name: "default".into(),
            cluster_arn: "arn".into(),
            status: "ACTIVE".into(),
            version: 0,
            version_info: None,
            agent_connected: true,
            agent_update_status: None,
            remaining_resources: vec![],
            registered_resources: vec![],
            running_tasks_count: 0,
            pending_tasks_count: 0,
            registered_at: chrono::Utc::now(),
            attributes: vec![],
            tags: vec![],
            capacity_provider_name: None,
            health_status: None,
        }
    }

    #[test]
    fn parse_family_revision_with_revision() {
        let (family, revision) = parse_family_revision("web:3");
        assert_eq!(family, "web");
        assert_eq!(revision, Some(3));
    }

    #[test]
    fn parse_family_revision_without_revision() {
        let (family, revision) = parse_family_revision("web");
        assert_eq!(family, "web");
        assert_eq!(revision, None);
    }

    #[test]
    fn parse_family_revision_non_numeric_treated_as_no_revision() {
        // A non-numeric suffix stays part of the family name.
        let (family, revision) = parse_family_revision("web:latest");
        assert_eq!(family, "web:latest");
        assert_eq!(revision, None);
    }

    #[test]
    fn decode_ecs_arn_cluster() {
        let decoded = decode_ecs_arn("arn:aws:ecs:us-east-1:111122223333:cluster/prod");
        let (account, rtype, tail) = decoded.unwrap();
        assert_eq!(account, "111122223333");
        assert_eq!(rtype, "cluster");
        assert_eq!(tail, "prod");
    }

    #[test]
    fn decode_ecs_arn_task_definition() {
        let decoded = decode_ecs_arn("arn:aws:ecs:us-east-1:111122223333:task-definition/web:5");
        let (account, rtype, tail) = decoded.unwrap();
        assert_eq!(account, "111122223333");
        assert_eq!(rtype, "task-definition");
        assert_eq!(tail, "web:5");
    }

    #[test]
    fn decode_ecs_arn_rejects_non_ecs() {
        let decoded = decode_ecs_arn("arn:aws:s3:::bucket");
        assert!(decoded.is_err());
    }

    #[test]
    fn resolve_service_key_handles_short_and_long() {
        let mut state = EcsState::new("123456789012", "us-east-1");
        state
            .services
            .insert("default/api".to_string(), api_service());

        // Long-form: cluster/service.
        let long_form = resolve_service_key(&state, "default/api");
        assert_eq!(long_form, Some("default/api".to_string()));

        // Short-form: bare service name resolves via ends_with scan.
        let short_form = resolve_service_key(&state, "api");
        assert_eq!(short_form, Some("default/api".to_string()));

        // Unknown name returns None.
        assert_eq!(resolve_service_key(&state, "nope"), None);
    }

    #[test]
    fn resolve_container_instance_key_handles_short_and_long() {
        let mut state = EcsState::new("123456789012", "us-east-1");
        state
            .container_instances
            .insert("default/abc-123".to_string(), default_container_instance());

        let long_form = resolve_container_instance_key(&state, "default/abc-123");
        assert_eq!(long_form, Some("default/abc-123".to_string()));

        let short_form = resolve_container_instance_key(&state, "abc-123");
        assert_eq!(short_form, Some("default/abc-123".to_string()));

        assert_eq!(resolve_container_instance_key(&state, "nope"), None);
    }

    #[test]
    fn validate_family_name_accepts_hyphen_underscore() {
        let outcome = validate_family_name("web_server-2");
        assert!(outcome.is_ok());
    }

    #[test]
    fn validate_family_name_rejects_empty() {
        let outcome = validate_family_name("");
        assert!(outcome.is_err());
    }

    #[test]
    fn validate_family_name_rejects_slash() {
        let outcome = validate_family_name("web/server");
        assert!(outcome.is_err());
    }

    #[test]
    fn resolve_task_definition_ref_bare_family() {
        let (account, family, rev) = resolve_task_definition_ref("web").unwrap();
        assert_eq!(account, None);
        assert_eq!(family, "web");
        assert_eq!(rev, None);
    }

    #[test]
    fn resolve_task_definition_ref_family_revision() {
        let (account, family, rev) = resolve_task_definition_ref("web:3").unwrap();
        assert_eq!(account, None);
        assert_eq!(family, "web");
        assert_eq!(rev, Some(3));
    }

    #[test]
    fn resolve_task_definition_ref_full_arn() {
        let full_arn = "arn:aws:ecs:us-east-1:111122223333:task-definition/web:3";
        let (account, family, rev) = resolve_task_definition_ref(full_arn).unwrap();
        assert_eq!(account, Some("111122223333".to_string()));
        assert_eq!(family, "web");
        assert_eq!(rev, Some(3));
    }

    #[test]
    fn merge_tags_replaces_existing_value() {
        let mut current = vec![tag("env", "dev")];
        merge_tags(&mut current, vec![tag("env", "prod")]);
        assert_eq!(current.len(), 1);
        assert_eq!(current[0].value, "prod");
    }

    #[test]
    fn merge_tags_adds_new() {
        let mut current = vec![tag("env", "dev")];
        merge_tags(&mut current, vec![tag("team", "platform")]);
        assert_eq!(current.len(), 2);
    }

    #[test]
    fn parse_tags_reads_lowercase_keys() {
        let body = json!({
            "tags": [
                {"key": "env", "value": "prod"},
                {"key": "team", "value": "platform"},
            ]
        });
        let tags = parse_tags(&body);
        assert_eq!(tags.len(), 2);
        let first = &tags[0];
        assert_eq!(first.key, "env");
        assert_eq!(first.value, "prod");
    }

    #[test]
    fn matches_filter_respects_none() {
        // No filter matches everything; a filter must match exactly.
        assert!(matches_filter(None, "anything"));
        assert!(matches_filter(Some("x"), "x"));
        assert!(!matches_filter(Some("x"), "y"));
    }
}