Skip to main content

fakecloud_ecs/
service.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use chrono::Utc;
5use http::StatusCode;
6use serde_json::{json, Map, Value};
7use tokio::sync::Mutex as AsyncMutex;
8
9use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
10use fakecloud_persistence::SnapshotStore;
11
12use crate::state::{
13    Attribute, AttributeRef, AwsLogsConfig, CapacityProvider, CircuitBreakerConfig, Cluster,
14    Container, ContainerInstance, Deployment, EcsSnapshot, EcsState, Service, SharedEcsState,
15    TagEntry, Task, TaskDefinition, TaskSet, ECS_SNAPSHOT_SCHEMA_VERSION,
16};
17
18const SUPPORTED_ACTIONS: &[&str] = &[
19    "CreateCluster",
20    "DescribeClusters",
21    "DeleteCluster",
22    "ListClusters",
23    "UpdateCluster",
24    "UpdateClusterSettings",
25    "PutClusterCapacityProviders",
26    "RegisterTaskDefinition",
27    "DescribeTaskDefinition",
28    "DeregisterTaskDefinition",
29    "DeleteTaskDefinitions",
30    "ListTaskDefinitions",
31    "ListTaskDefinitionFamilies",
32    "TagResource",
33    "UntagResource",
34    "ListTagsForResource",
35    "PutAccountSetting",
36    "PutAccountSettingDefault",
37    "DeleteAccountSetting",
38    "ListAccountSettings",
39    "RunTask",
40    "StartTask",
41    "StopTask",
42    "DescribeTasks",
43    "ListTasks",
44    "CreateService",
45    "UpdateService",
46    "DeleteService",
47    "DescribeServices",
48    "ListServices",
49    "ListServicesByNamespace",
50    "RegisterContainerInstance",
51    "DeregisterContainerInstance",
52    "DescribeContainerInstances",
53    "ListContainerInstances",
54    "UpdateContainerAgent",
55    "UpdateContainerInstancesState",
56    "PutAttributes",
57    "DeleteAttributes",
58    "ListAttributes",
59    "CreateCapacityProvider",
60    "DeleteCapacityProvider",
61    "DescribeCapacityProviders",
62    "UpdateCapacityProvider",
63    "GetTaskProtection",
64    "UpdateTaskProtection",
65    "CreateTaskSet",
66    "UpdateTaskSet",
67    "DeleteTaskSet",
68    "DescribeTaskSets",
69    "UpdateServicePrimaryTaskSet",
70    "ExecuteCommand",
71    "SubmitContainerStateChange",
72    "SubmitTaskStateChange",
73    "SubmitAttachmentStateChanges",
74    "DiscoverPollEndpoint",
75    "StopServiceDeployment",
76    "ListServiceDeployments",
77    "DescribeServiceDeployments",
78    "DescribeServiceRevisions",
79];
80
81fn is_mutating(action: &str) -> bool {
82    matches!(
83        action,
84        "CreateCluster"
85            | "DeleteCluster"
86            | "UpdateCluster"
87            | "UpdateClusterSettings"
88            | "PutClusterCapacityProviders"
89            | "RegisterTaskDefinition"
90            | "DeregisterTaskDefinition"
91            | "DeleteTaskDefinitions"
92            | "TagResource"
93            | "UntagResource"
94            | "PutAccountSetting"
95            | "PutAccountSettingDefault"
96            | "DeleteAccountSetting"
97            | "RunTask"
98            | "StartTask"
99            | "StopTask"
100            | "CreateService"
101            | "UpdateService"
102            | "DeleteService"
103            | "RegisterContainerInstance"
104            | "DeregisterContainerInstance"
105            | "UpdateContainerAgent"
106            | "UpdateContainerInstancesState"
107            | "PutAttributes"
108            | "DeleteAttributes"
109            | "CreateCapacityProvider"
110            | "DeleteCapacityProvider"
111            | "UpdateCapacityProvider"
112            | "UpdateTaskProtection"
113            | "CreateTaskSet"
114            | "UpdateTaskSet"
115            | "DeleteTaskSet"
116            | "UpdateServicePrimaryTaskSet"
117            | "SubmitContainerStateChange"
118            | "SubmitTaskStateChange"
119            | "SubmitAttachmentStateChanges"
120            | "StopServiceDeployment"
121    )
122}
123
124pub struct EcsService {
125    state: SharedEcsState,
126    snapshot_store: Option<Arc<dyn SnapshotStore>>,
127    snapshot_lock: Arc<AsyncMutex<()>>,
128    runtime: Option<Arc<crate::runtime::EcsRuntime>>,
129    role_trust_validator: Option<Arc<dyn fakecloud_core::auth::RoleTrustValidator>>,
130}
131
132impl EcsService {
133    pub fn new(state: SharedEcsState) -> Self {
134        Self {
135            state,
136            snapshot_store: None,
137            snapshot_lock: Arc::new(AsyncMutex::new(())),
138            runtime: None,
139            role_trust_validator: None,
140        }
141    }
142
143    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
144        self.snapshot_store = Some(store);
145        self
146    }
147
148    pub fn with_runtime(mut self, runtime: Arc<crate::runtime::EcsRuntime>) -> Self {
149        self.runtime = Some(runtime);
150        self
151    }
152
153    pub fn with_role_trust_validator(
154        mut self,
155        validator: Arc<dyn fakecloud_core::auth::RoleTrustValidator>,
156    ) -> Self {
157        self.role_trust_validator = Some(validator);
158        self
159    }
160
161    fn check_pass_role(&self, account_id: &str, role_arn: &str) -> Result<(), AwsServiceError> {
162        let Some(ref validator) = self.role_trust_validator else {
163            return Ok(());
164        };
165        if let Err(err) = validator.validate(account_id, role_arn, "ecs-tasks.amazonaws.com") {
166            return Err(AwsServiceError::aws_error(
167                StatusCode::BAD_REQUEST,
168                "InvalidParameterException",
169                err.to_string(),
170            ));
171        }
172        Ok(())
173    }
174
175    pub fn state_handle(&self) -> &SharedEcsState {
176        &self.state
177    }
178
179    async fn save_snapshot(&self) {
180        let Some(store) = self.snapshot_store.clone() else {
181            return;
182        };
183        let _guard = self.snapshot_lock.lock().await;
184        let snapshot = EcsSnapshot {
185            schema_version: ECS_SNAPSHOT_SCHEMA_VERSION,
186            accounts: Some(self.state.read().clone()),
187        };
188        let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
189            let bytes = serde_json::to_vec(&snapshot)
190                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
191            store.save(&bytes)
192        })
193        .await;
194        match join {
195            Ok(Ok(())) => {}
196            Ok(Err(err)) => tracing::error!(%err, "failed to write ecs snapshot"),
197            Err(err) => tracing::error!(%err, "ecs snapshot task panicked"),
198        }
199    }
200}
201
202#[async_trait]
203impl AwsService for EcsService {
204    fn service_name(&self) -> &str {
205        "ecs"
206    }
207
208    async fn handle(&self, request: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
209        let mutates = is_mutating(request.action.as_str());
210        let result = match request.action.as_str() {
211            "CreateCluster" => self.create_cluster(&request),
212            "DescribeClusters" => self.describe_clusters(&request),
213            "DeleteCluster" => self.delete_cluster(&request),
214            "ListClusters" => self.list_clusters(&request),
215            "UpdateCluster" => self.update_cluster(&request),
216            "UpdateClusterSettings" => self.update_cluster_settings(&request),
217            "PutClusterCapacityProviders" => self.put_cluster_capacity_providers(&request),
218            "RegisterTaskDefinition" => self.register_task_definition(&request),
219            "DescribeTaskDefinition" => self.describe_task_definition(&request),
220            "DeregisterTaskDefinition" => self.deregister_task_definition(&request),
221            "DeleteTaskDefinitions" => self.delete_task_definitions(&request),
222            "ListTaskDefinitions" => self.list_task_definitions(&request),
223            "ListTaskDefinitionFamilies" => self.list_task_definition_families(&request),
224            "TagResource" => self.tag_resource(&request),
225            "UntagResource" => self.untag_resource(&request),
226            "ListTagsForResource" => self.list_tags_for_resource(&request),
227            "PutAccountSetting" => self.put_account_setting(&request),
228            "PutAccountSettingDefault" => self.put_account_setting_default(&request),
229            "DeleteAccountSetting" => self.delete_account_setting(&request),
230            "ListAccountSettings" => self.list_account_settings(&request),
231            "RunTask" => self.run_task(&request),
232            "StartTask" => self.start_task(&request),
233            "StopTask" => self.stop_task(&request).await,
234            "DescribeTasks" => self.describe_tasks(&request),
235            "ListTasks" => self.list_tasks(&request),
236            "CreateService" => self.create_service(&request),
237            "UpdateService" => self.update_service(&request),
238            "DeleteService" => self.delete_service(&request).await,
239            "DescribeServices" => self.describe_services(&request),
240            "ListServices" => self.list_services(&request),
241            "ListServicesByNamespace" => self.list_services_by_namespace(&request),
242            "RegisterContainerInstance" => self.register_container_instance(&request),
243            "DeregisterContainerInstance" => self.deregister_container_instance(&request),
244            "DescribeContainerInstances" => self.describe_container_instances(&request),
245            "ListContainerInstances" => self.list_container_instances(&request),
246            "UpdateContainerAgent" => self.update_container_agent(&request),
247            "UpdateContainerInstancesState" => self.update_container_instances_state(&request),
248            "PutAttributes" => self.put_attributes(&request),
249            "DeleteAttributes" => self.delete_attributes(&request),
250            "ListAttributes" => self.list_attributes(&request),
251            "CreateCapacityProvider" => self.create_capacity_provider(&request),
252            "DeleteCapacityProvider" => self.delete_capacity_provider(&request),
253            "DescribeCapacityProviders" => self.describe_capacity_providers(&request),
254            "UpdateCapacityProvider" => self.update_capacity_provider(&request),
255            "GetTaskProtection" => self.get_task_protection(&request),
256            "UpdateTaskProtection" => self.update_task_protection(&request),
257            "CreateTaskSet" => self.create_task_set(&request),
258            "UpdateTaskSet" => self.update_task_set(&request),
259            "DeleteTaskSet" => self.delete_task_set(&request),
260            "DescribeTaskSets" => self.describe_task_sets(&request),
261            "UpdateServicePrimaryTaskSet" => self.update_service_primary_task_set(&request),
262            "ExecuteCommand" => self.execute_command(&request).await,
263            "SubmitContainerStateChange" => self.submit_container_state_change(&request),
264            "SubmitTaskStateChange" => self.submit_task_state_change(&request),
265            "SubmitAttachmentStateChanges" => self.submit_attachment_state_changes(&request),
266            "DiscoverPollEndpoint" => self.discover_poll_endpoint(&request),
267            "StopServiceDeployment" => self.stop_service_deployment(&request),
268            "ListServiceDeployments" => self.list_service_deployments(&request),
269            "DescribeServiceDeployments" => self.describe_service_deployments(&request),
270            "DescribeServiceRevisions" => self.describe_service_revisions(&request),
271            _ => Err(AwsServiceError::action_not_implemented(
272                "ecs",
273                &request.action,
274            )),
275        };
276        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
277            self.save_snapshot().await;
278        }
279        result
280    }
281
282    fn supported_actions(&self) -> &[&str] {
283        SUPPORTED_ACTIONS
284    }
285}
286
287// -------- helpers --------
288
289fn req_str<'a>(body: &'a Value, field: &str) -> Result<&'a str, AwsServiceError> {
290    body.get(field)
291        .and_then(|v| v.as_str())
292        .ok_or_else(|| client_exception(format!("Missing required field: {field}")))
293}
294
295fn opt_str<'a>(body: &'a Value, field: &str) -> Option<&'a str> {
296    body.get(field).and_then(|v| v.as_str())
297}
298
299fn client_exception(message: impl Into<String>) -> AwsServiceError {
300    AwsServiceError::aws_error(StatusCode::BAD_REQUEST, "ClientException", message)
301}
302
303fn invalid_parameter(message: impl Into<String>) -> AwsServiceError {
304    AwsServiceError::aws_error(
305        StatusCode::BAD_REQUEST,
306        "InvalidParameterException",
307        message,
308    )
309}
310
311fn cluster_not_found(name: &str) -> AwsServiceError {
312    AwsServiceError::aws_error(
313        StatusCode::BAD_REQUEST,
314        "ClusterNotFoundException",
315        format!("The referenced cluster was inactive: {name}"),
316    )
317}
318
319fn cluster_contains_services() -> AwsServiceError {
320    AwsServiceError::aws_error(
321        StatusCode::BAD_REQUEST,
322        "ClusterContainsServicesException",
323        "The specified cluster still contains active services",
324    )
325}
326
327fn cluster_contains_tasks() -> AwsServiceError {
328    AwsServiceError::aws_error(
329        StatusCode::BAD_REQUEST,
330        "ClusterContainsTasksException",
331        "The specified cluster still contains active tasks",
332    )
333}
334
335fn task_definition_not_found(family_rev: &str) -> AwsServiceError {
336    AwsServiceError::aws_error(
337        StatusCode::BAD_REQUEST,
338        "ClientException",
339        format!("Unable to describe task definition: {family_rev}"),
340    )
341}
342
343fn parse_tags(body: &Value) -> Vec<TagEntry> {
344    body.get("tags")
345        .and_then(|v| v.as_array())
346        .map(|arr| {
347            arr.iter()
348                .filter_map(|t| {
349                    let k = t.get("key").and_then(|v| v.as_str())?;
350                    let v = t.get("value").and_then(|v| v.as_str()).unwrap_or("");
351                    Some(TagEntry {
352                        key: k.to_string(),
353                        value: v.to_string(),
354                    })
355                })
356                .collect()
357        })
358        .unwrap_or_default()
359}
360
361fn tags_json(tags: &[TagEntry]) -> Value {
362    Value::Array(
363        tags.iter()
364            .map(|t| json!({"key": t.key, "value": t.value}))
365            .collect(),
366    )
367}
368
369fn merge_tags(current: &mut Vec<TagEntry>, incoming: Vec<TagEntry>) {
370    for new_tag in incoming {
371        if let Some(existing) = current.iter_mut().find(|t| t.key == new_tag.key) {
372            existing.value = new_tag.value;
373        } else {
374            current.push(new_tag);
375        }
376    }
377}
378
379fn cluster_to_json(cluster: &Cluster) -> Value {
380    json!({
381        "clusterArn": cluster.cluster_arn,
382        "clusterName": cluster.cluster_name,
383        "status": cluster.status,
384        "registeredContainerInstancesCount": cluster.registered_container_instances_count,
385        "runningTasksCount": cluster.running_tasks_count,
386        "pendingTasksCount": cluster.pending_tasks_count,
387        "activeServicesCount": cluster.active_services_count,
388        "statistics": cluster.statistics,
389        "tags": tags_json(&cluster.tags),
390        "settings": cluster.settings,
391        "configuration": cluster.configuration,
392        "capacityProviders": cluster.capacity_providers,
393        "defaultCapacityProviderStrategy": cluster.default_capacity_provider_strategy,
394        "attachments": cluster.attachments,
395        "attachmentsStatus": cluster.attachments_status,
396        "serviceConnectDefaults": cluster.service_connect_defaults,
397    })
398}
399
400fn task_definition_to_json(td: &TaskDefinition) -> Value {
401    let mut map = Map::new();
402    map.insert("taskDefinitionArn".into(), json!(td.task_definition_arn));
403    map.insert("family".into(), json!(td.family));
404    map.insert("revision".into(), json!(td.revision));
405    map.insert("status".into(), json!(td.status));
406    map.insert(
407        "containerDefinitions".into(),
408        Value::Array(td.container_definitions.clone()),
409    );
410    map.insert("compatibilities".into(), json!(td.compatibilities));
411    map.insert(
412        "requiresCompatibilities".into(),
413        json!(td.requires_compatibilities),
414    );
415    map.insert("volumes".into(), Value::Array(td.volumes.clone()));
416    map.insert(
417        "placementConstraints".into(),
418        Value::Array(td.placement_constraints.clone()),
419    );
420    map.insert(
421        "requiresAttributes".into(),
422        Value::Array(td.requires_attributes.clone()),
423    );
424    map.insert(
425        "inferenceAccelerators".into(),
426        Value::Array(td.inference_accelerators.clone()),
427    );
428    if let Some(ref x) = td.network_mode {
429        map.insert("networkMode".into(), json!(x));
430    }
431    if let Some(ref x) = td.cpu {
432        map.insert("cpu".into(), json!(x));
433    }
434    if let Some(ref x) = td.memory {
435        map.insert("memory".into(), json!(x));
436    }
437    if let Some(ref x) = td.task_role_arn {
438        map.insert("taskRoleArn".into(), json!(x));
439    }
440    if let Some(ref x) = td.execution_role_arn {
441        map.insert("executionRoleArn".into(), json!(x));
442    }
443    if let Some(ref x) = td.pid_mode {
444        map.insert("pidMode".into(), json!(x));
445    }
446    if let Some(ref x) = td.ipc_mode {
447        map.insert("ipcMode".into(), json!(x));
448    }
449    if let Some(ref x) = td.proxy_configuration {
450        map.insert("proxyConfiguration".into(), x.clone());
451    }
452    if let Some(ref x) = td.ephemeral_storage {
453        map.insert("ephemeralStorage".into(), x.clone());
454    }
455    if let Some(ref x) = td.runtime_platform {
456        map.insert("runtimePlatform".into(), x.clone());
457    }
458    if let Some(ref x) = td.registered_by {
459        map.insert("registeredBy".into(), json!(x));
460    }
461    map.insert("registeredAt".into(), json!(td.registered_at.timestamp()));
462    if let Some(ts) = td.deregistered_at {
463        map.insert("deregisteredAt".into(), json!(ts.timestamp()));
464    }
465    if let Some(enabled) = td.enable_fault_injection {
466        map.insert("enableFaultInjection".into(), json!(enabled));
467    }
468    Value::Object(map)
469}
470
471/// Decode an `arn:aws:ecs:<region>:<account>:<type>/<name>[:<rev>]` ARN
472/// into `(account, resource_type, tail)`. For task definitions `tail` is
473/// `family:revision`; for clusters it's `cluster_name`.
474fn decode_ecs_arn(arn: &str) -> Result<(String, String, String), AwsServiceError> {
475    let rest = arn
476        .strip_prefix("arn:aws:ecs:")
477        .ok_or_else(|| invalid_parameter(format!("Malformed ECS ARN: {arn}")))?;
478    // Resource portion may itself contain a trailing `:<revision>`, so we
479    // split at most three ways then treat the remainder as the resource.
480    let mut parts = rest.splitn(3, ':');
481    let _region = parts
482        .next()
483        .ok_or_else(|| invalid_parameter("Malformed ECS ARN"))?;
484    let account = parts
485        .next()
486        .ok_or_else(|| invalid_parameter("Malformed ECS ARN"))?;
487    let resource = parts
488        .next()
489        .ok_or_else(|| invalid_parameter("Malformed ECS ARN"))?;
490    let (resource_type, tail) = resource
491        .split_once('/')
492        .ok_or_else(|| invalid_parameter("Malformed ECS ARN"))?;
493    Ok((
494        account.to_string(),
495        resource_type.to_string(),
496        tail.to_string(),
497    ))
498}
499
500/// Parse a `family[:revision]` reference. Returns `(family, Some(rev))`
501/// when a specific revision is requested, or `(family, None)` for the
502/// latest-active shorthand.
503fn parse_family_revision(input: &str) -> (String, Option<i32>) {
504    if let Some((family, rev)) = input.rsplit_once(':') {
505        if let Ok(n) = rev.parse::<i32>() {
506            return (family.to_string(), Some(n));
507        }
508    }
509    (input.to_string(), None)
510}
511
512/// Task-definition ARNs may appear as the full ARN or just `family:rev`.
513/// Returns `(account, family, Some(rev))` where `account` is `None` for
514/// the bare shorthand form.
515fn resolve_task_definition_ref(
516    input: &str,
517) -> Result<(Option<String>, String, Option<i32>), AwsServiceError> {
518    if input.starts_with("arn:aws:ecs:") {
519        let (account, resource_type, tail) = decode_ecs_arn(input)?;
520        if resource_type != "task-definition" {
521            return Err(invalid_parameter(format!(
522                "Expected task-definition ARN: {input}"
523            )));
524        }
525        let (family, rev) = parse_family_revision(&tail);
526        Ok((Some(account), family, rev))
527    } else {
528        let (family, rev) = parse_family_revision(input);
529        Ok((None, family, rev))
530    }
531}
532
533fn target_account_for_task_definition(request: &AwsRequest, td_ref: &str) -> String {
534    if let Ok((Some(account), _, _)) = resolve_task_definition_ref(td_ref) {
535        account
536    } else {
537        request.account_id.clone()
538    }
539}
540
541fn target_account_for_cluster(request: &AwsRequest, cluster_ref: Option<&str>) -> String {
542    if let Some(input) = cluster_ref {
543        if input.starts_with("arn:aws:ecs:") {
544            if let Ok((account, _, _)) = decode_ecs_arn(input) {
545                return account;
546            }
547        }
548    }
549    request.account_id.clone()
550}
551
552fn latest_active_revision(
553    revisions: &std::collections::BTreeMap<i32, TaskDefinition>,
554) -> Option<&TaskDefinition> {
555    revisions.values().rev().find(|td| td.status == "ACTIVE")
556}
557
558// -------- operations: clusters --------
559
560impl EcsService {
561    fn create_cluster(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
562        let body = request.json_body();
563        let cluster_name = opt_str(&body, "clusterName")
564            .unwrap_or("default")
565            .to_string();
566        let tags = parse_tags(&body);
567        let settings: Vec<Value> = body
568            .get("settings")
569            .and_then(|v| v.as_array())
570            .cloned()
571            .unwrap_or_default();
572        let configuration = body.get("configuration").cloned();
573        let capacity_providers: Vec<String> = body
574            .get("capacityProviders")
575            .and_then(|v| v.as_array())
576            .map(|arr| {
577                arr.iter()
578                    .filter_map(|v| v.as_str().map(String::from))
579                    .collect()
580            })
581            .unwrap_or_default();
582        let default_strategy: Vec<Value> = body
583            .get("defaultCapacityProviderStrategy")
584            .and_then(|v| v.as_array())
585            .cloned()
586            .unwrap_or_default();
587        let service_connect = body.get("serviceConnectDefaults").cloned();
588
589        let account = request.account_id.clone();
590        let mut accounts = self.state.write();
591        let state = accounts.get_or_create(&account);
592        let arn = state.cluster_arn(&cluster_name);
593        let mut cluster = Cluster::new(&cluster_name, arn);
594        cluster.tags = tags;
595        cluster.settings = settings;
596        cluster.configuration = configuration;
597        cluster.capacity_providers = capacity_providers;
598        cluster.default_capacity_provider_strategy = default_strategy;
599        cluster.service_connect_defaults = service_connect;
600        // CreateCluster on an existing cluster is idempotent-ish — AWS
601        // returns the existing cluster, potentially with merged settings.
602        // We keep it simple and overwrite on recreate.
603        state.clusters.insert(cluster_name.clone(), cluster.clone());
604
605        Ok(AwsResponse::ok_json(json!({
606            "cluster": cluster_to_json(&cluster),
607        })))
608    }
609
610    fn describe_clusters(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
611        let body = request.json_body();
612        let names: Vec<String> = body
613            .get("clusters")
614            .and_then(|v| v.as_array())
615            .map(|arr| {
616                arr.iter()
617                    .filter_map(|v| v.as_str().map(|s| EcsState::resolve_cluster_name(Some(s))))
618                    .collect()
619            })
620            .unwrap_or_else(|| vec!["default".to_string()]);
621
622        let account = request.account_id.clone();
623        let accounts = self.state.read();
624        let mut found = Vec::new();
625        let mut failures = Vec::new();
626        if let Some(state) = accounts.get(&account) {
627            for name in &names {
628                match state.clusters.get(name) {
629                    Some(c) => found.push(cluster_to_json(c)),
630                    None => failures.push(json!({
631                        "arn": state.cluster_arn(name),
632                        "reason": "MISSING",
633                    })),
634                }
635            }
636        } else {
637            for name in &names {
638                failures.push(json!({
639                    "arn": format!(
640                        "arn:aws:ecs:{}:{}:cluster/{}",
641                        accounts.region(),
642                        account,
643                        name
644                    ),
645                    "reason": "MISSING",
646                }));
647            }
648        }
649        Ok(AwsResponse::ok_json(json!({
650            "clusters": found,
651            "failures": failures,
652        })))
653    }
654
655    fn delete_cluster(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
656        let body = request.json_body();
657        let cluster_ref = opt_str(&body, "cluster");
658        let name = EcsState::resolve_cluster_name(cluster_ref);
659        let account = target_account_for_cluster(request, cluster_ref);
660
661        let mut accounts = self.state.write();
662        let state = accounts.get_or_create(&account);
663        let cluster = state
664            .clusters
665            .get_mut(&name)
666            .ok_or_else(|| cluster_not_found(&name))?;
667        if cluster.active_services_count > 0 {
668            return Err(cluster_contains_services());
669        }
670        if cluster.running_tasks_count > 0 || cluster.pending_tasks_count > 0 {
671            return Err(cluster_contains_tasks());
672        }
673        cluster.status = "INACTIVE".to_string();
674        let snapshot = cluster.clone();
675        // Real ECS keeps the cluster visible as INACTIVE for about an
676        // hour before garbage-collecting it. We drop it immediately to
677        // keep state bounded — callers that try to describe it by name
678        // will get a MISSING failure, matching the long-tail behaviour.
679        state.clusters.remove(&name);
680        Ok(AwsResponse::ok_json(json!({
681            "cluster": cluster_to_json(&snapshot),
682        })))
683    }
684
685    fn list_clusters(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
686        let body = request.json_body();
687        let max_results = body
688            .get("maxResults")
689            .and_then(|v| v.as_i64())
690            .filter(|n| (1..=100).contains(n))
691            .map(|n| n as usize)
692            .unwrap_or(100);
693        let next_token = opt_str(&body, "nextToken").unwrap_or("");
694
695        let account = request.account_id.clone();
696        let accounts = self.state.read();
697        let arns: Vec<String> = match accounts.get(&account) {
698            Some(state) => state
699                .clusters
700                .values()
701                .map(|c| c.cluster_arn.clone())
702                .collect(),
703            None => Vec::new(),
704        };
705        let start = next_token.parse::<usize>().unwrap_or(0).min(arns.len());
706        let end = (start + max_results).min(arns.len());
707        let page = arns[start..end].to_vec();
708        let next = if end < arns.len() {
709            Some(end.to_string())
710        } else {
711            None
712        };
713        let mut out = json!({ "clusterArns": page });
714        if let Some(n) = next {
715            out.as_object_mut()
716                .unwrap()
717                .insert("nextToken".into(), json!(n));
718        }
719        Ok(AwsResponse::ok_json(out))
720    }
721
722    fn update_cluster(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
723        let body = request.json_body();
724        let cluster_ref = req_str(&body, "cluster")?;
725        let name = EcsState::resolve_cluster_name(Some(cluster_ref));
726        let account = target_account_for_cluster(request, Some(cluster_ref));
727
728        let mut accounts = self.state.write();
729        let state = accounts.get_or_create(&account);
730        let cluster = state
731            .clusters
732            .get_mut(&name)
733            .ok_or_else(|| cluster_not_found(&name))?;
734        if let Some(settings) = body.get("settings").and_then(|v| v.as_array()) {
735            cluster.settings = settings.clone();
736        }
737        if let Some(cfg) = body.get("configuration") {
738            cluster.configuration = Some(cfg.clone());
739        }
740        if let Some(sc) = body.get("serviceConnectDefaults") {
741            cluster.service_connect_defaults = Some(sc.clone());
742        }
743        let snapshot = cluster.clone();
744        Ok(AwsResponse::ok_json(json!({
745            "cluster": cluster_to_json(&snapshot),
746        })))
747    }
748
749    fn update_cluster_settings(
750        &self,
751        request: &AwsRequest,
752    ) -> Result<AwsResponse, AwsServiceError> {
753        let body = request.json_body();
754        let cluster_ref = req_str(&body, "cluster")?;
755        let name = EcsState::resolve_cluster_name(Some(cluster_ref));
756        let account = target_account_for_cluster(request, Some(cluster_ref));
757        let settings: Vec<Value> = body
758            .get("settings")
759            .and_then(|v| v.as_array())
760            .cloned()
761            .unwrap_or_default();
762
763        let mut accounts = self.state.write();
764        let state = accounts.get_or_create(&account);
765        let cluster = state
766            .clusters
767            .get_mut(&name)
768            .ok_or_else(|| cluster_not_found(&name))?;
769        cluster.settings = settings;
770        let snapshot = cluster.clone();
771        Ok(AwsResponse::ok_json(json!({
772            "cluster": cluster_to_json(&snapshot),
773        })))
774    }
775
776    fn put_cluster_capacity_providers(
777        &self,
778        request: &AwsRequest,
779    ) -> Result<AwsResponse, AwsServiceError> {
780        let body = request.json_body();
781        let cluster_ref = req_str(&body, "cluster")?;
782        let name = EcsState::resolve_cluster_name(Some(cluster_ref));
783        let account = target_account_for_cluster(request, Some(cluster_ref));
784        let capacity_providers: Vec<String> = body
785            .get("capacityProviders")
786            .and_then(|v| v.as_array())
787            .map(|arr| {
788                arr.iter()
789                    .filter_map(|v| v.as_str().map(String::from))
790                    .collect()
791            })
792            .ok_or_else(|| client_exception("Missing required field: capacityProviders"))?;
793        let default_strategy: Vec<Value> = body
794            .get("defaultCapacityProviderStrategy")
795            .and_then(|v| v.as_array())
796            .cloned()
797            .ok_or_else(|| {
798                client_exception("Missing required field: defaultCapacityProviderStrategy")
799            })?;
800
801        let mut accounts = self.state.write();
802        let state = accounts.get_or_create(&account);
803        let cluster = state
804            .clusters
805            .get_mut(&name)
806            .ok_or_else(|| cluster_not_found(&name))?;
807        cluster.capacity_providers = capacity_providers;
808        cluster.default_capacity_provider_strategy = default_strategy;
809        let snapshot = cluster.clone();
810        Ok(AwsResponse::ok_json(json!({
811            "cluster": cluster_to_json(&snapshot),
812        })))
813    }
814}
815
816// -------- operations: task definitions --------
817
818impl EcsService {
819    fn register_task_definition(
820        &self,
821        request: &AwsRequest,
822    ) -> Result<AwsResponse, AwsServiceError> {
823        let body = request.json_body();
824        let family = req_str(&body, "family")?.to_string();
825        validate_family_name(&family)?;
826        let container_definitions = body
827            .get("containerDefinitions")
828            .and_then(|v| v.as_array())
829            .cloned()
830            .ok_or_else(|| client_exception("Missing required field: containerDefinitions"))?;
831        if container_definitions.is_empty() {
832            return Err(client_exception(
833                "Task definition must have at least one container",
834            ));
835        }
836        for cd in &container_definitions {
837            if cd
838                .get("name")
839                .and_then(|v| v.as_str())
840                .unwrap_or("")
841                .is_empty()
842            {
843                return Err(client_exception(
844                    "Container definition is missing required field: name",
845                ));
846            }
847            if cd
848                .get("image")
849                .and_then(|v| v.as_str())
850                .unwrap_or("")
851                .is_empty()
852            {
853                return Err(client_exception(
854                    "Container definition is missing required field: image",
855                ));
856            }
857        }
858        // PassRole trust check on the task + execution roles. Real AWS
859        // rejects RegisterTaskDefinition when the role's trust policy
860        // doesn't list `ecs-tasks.amazonaws.com`.
861        if let Some(role_arn) = opt_str(&body, "taskRoleArn") {
862            self.check_pass_role(&request.account_id, role_arn)?;
863        }
864        if let Some(role_arn) = opt_str(&body, "executionRoleArn") {
865            self.check_pass_role(&request.account_id, role_arn)?;
866        }
867
868        let tags = parse_tags(&body);
869        let requires_compatibilities: Vec<String> = body
870            .get("requiresCompatibilities")
871            .and_then(|v| v.as_array())
872            .map(|arr| {
873                arr.iter()
874                    .filter_map(|v| v.as_str().map(String::from))
875                    .collect()
876            })
877            .unwrap_or_default();
878        // Compatibilities reflect what the task definition is compatible with.
879        // We always claim EC2 and FARGATE since we execute via Docker either
880        // way — callers with stricter requirements set `requiresCompatibilities`.
881        let compatibilities = vec!["EC2".to_string(), "FARGATE".to_string()];
882
883        let account = request.account_id.clone();
884        let mut accounts = self.state.write();
885        let state = accounts.get_or_create(&account);
886        let revision = state.allocate_revision(&family);
887        let arn = state.task_definition_arn(&family, revision);
888        let td = TaskDefinition {
889            family: family.clone(),
890            revision,
891            task_definition_arn: arn,
892            container_definitions,
893            status: "ACTIVE".to_string(),
894            task_role_arn: opt_str(&body, "taskRoleArn").map(String::from),
895            execution_role_arn: opt_str(&body, "executionRoleArn").map(String::from),
896            network_mode: opt_str(&body, "networkMode").map(String::from),
897            requires_compatibilities,
898            compatibilities,
899            cpu: opt_str(&body, "cpu").map(String::from),
900            memory: opt_str(&body, "memory").map(String::from),
901            pid_mode: opt_str(&body, "pidMode").map(String::from),
902            ipc_mode: opt_str(&body, "ipcMode").map(String::from),
903            volumes: body
904                .get("volumes")
905                .and_then(|v| v.as_array())
906                .cloned()
907                .unwrap_or_default(),
908            placement_constraints: body
909                .get("placementConstraints")
910                .and_then(|v| v.as_array())
911                .cloned()
912                .unwrap_or_default(),
913            proxy_configuration: body.get("proxyConfiguration").cloned(),
914            inference_accelerators: body
915                .get("inferenceAccelerators")
916                .and_then(|v| v.as_array())
917                .cloned()
918                .unwrap_or_default(),
919            ephemeral_storage: body.get("ephemeralStorage").cloned(),
920            runtime_platform: body.get("runtimePlatform").cloned(),
921            requires_attributes: Vec::new(),
922            registered_at: Utc::now(),
923            registered_by: request
924                .principal
925                .as_ref()
926                .map(|p| p.arn.clone())
927                .or(Some(format!("arn:aws:iam::{}:root", state.account_id))),
928            deregistered_at: None,
929            tags: tags.clone(),
930            enable_fault_injection: body.get("enableFaultInjection").and_then(|v| v.as_bool()),
931        };
932        let td_json = task_definition_to_json(&td);
933        state
934            .task_definitions
935            .entry(family.clone())
936            .or_default()
937            .insert(revision, td);
938
939        Ok(AwsResponse::ok_json(json!({
940            "taskDefinition": td_json,
941            "tags": tags_json(&tags),
942        })))
943    }
944
945    fn describe_task_definition(
946        &self,
947        request: &AwsRequest,
948    ) -> Result<AwsResponse, AwsServiceError> {
949        let body = request.json_body();
950        let td_ref = req_str(&body, "taskDefinition")?;
951        let (_, family, rev) = resolve_task_definition_ref(td_ref)?;
952        let include_tags = body
953            .get("include")
954            .and_then(|v| v.as_array())
955            .map(|arr| arr.iter().any(|v| v.as_str() == Some("TAGS")))
956            .unwrap_or(false);
957
958        let account = target_account_for_task_definition(request, td_ref);
959        let accounts = self.state.read();
960        let state = accounts
961            .get(&account)
962            .ok_or_else(|| task_definition_not_found(td_ref))?;
963        let revisions = state
964            .task_definitions
965            .get(&family)
966            .ok_or_else(|| task_definition_not_found(td_ref))?;
967        let td = match rev {
968            Some(n) => revisions
969                .get(&n)
970                .ok_or_else(|| task_definition_not_found(td_ref))?,
971            None => latest_active_revision(revisions)
972                .ok_or_else(|| task_definition_not_found(td_ref))?,
973        };
974        let mut out = json!({"taskDefinition": task_definition_to_json(td)});
975        if include_tags {
976            out.as_object_mut()
977                .unwrap()
978                .insert("tags".into(), tags_json(&td.tags));
979        }
980        Ok(AwsResponse::ok_json(out))
981    }
982
983    fn deregister_task_definition(
984        &self,
985        request: &AwsRequest,
986    ) -> Result<AwsResponse, AwsServiceError> {
987        let body = request.json_body();
988        let td_ref = req_str(&body, "taskDefinition")?;
989        let (_, family, rev) = resolve_task_definition_ref(td_ref)?;
990        let rev =
991            rev.ok_or_else(|| client_exception("taskDefinition must reference a revision"))?;
992
993        let account = target_account_for_task_definition(request, td_ref);
994        let mut accounts = self.state.write();
995        let state = accounts.get_or_create(&account);
996        let revisions = state
997            .task_definitions
998            .get_mut(&family)
999            .ok_or_else(|| task_definition_not_found(td_ref))?;
1000        let td = revisions
1001            .get_mut(&rev)
1002            .ok_or_else(|| task_definition_not_found(td_ref))?;
1003        td.status = "INACTIVE".to_string();
1004        td.deregistered_at = Some(Utc::now());
1005        let snapshot = td.clone();
1006        Ok(AwsResponse::ok_json(json!({
1007            "taskDefinition": task_definition_to_json(&snapshot),
1008        })))
1009    }
1010
1011    fn delete_task_definitions(
1012        &self,
1013        request: &AwsRequest,
1014    ) -> Result<AwsResponse, AwsServiceError> {
1015        let body = request.json_body();
1016        let refs: Vec<String> = body
1017            .get("taskDefinitions")
1018            .and_then(|v| v.as_array())
1019            .map(|arr| {
1020                arr.iter()
1021                    .filter_map(|v| v.as_str().map(String::from))
1022                    .collect()
1023            })
1024            .ok_or_else(|| client_exception("Missing required field: taskDefinitions"))?;
1025        if refs.is_empty() {
1026            return Err(client_exception("taskDefinitions must not be empty"));
1027        }
1028
1029        let mut deleted = Vec::new();
1030        let mut failures = Vec::new();
1031        let account = request.account_id.clone();
1032        let mut accounts = self.state.write();
1033        let state = accounts.get_or_create(&account);
1034        for input in refs {
1035            let parsed = match resolve_task_definition_ref(&input) {
1036                Ok((_, family, Some(rev))) => Some((family, rev)),
1037                Ok(_) => None,
1038                Err(_) => None,
1039            };
1040            let Some((family, rev)) = parsed else {
1041                failures.push(json!({
1042                    "arn": input,
1043                    "reason": "INVALID_REFERENCE",
1044                    "detail": "Expected family:revision or full task-definition ARN",
1045                }));
1046                continue;
1047            };
1048            let Some(revisions) = state.task_definitions.get_mut(&family) else {
1049                failures.push(json!({"arn": input, "reason": "MISSING"}));
1050                continue;
1051            };
1052            let Some(td) = revisions.get_mut(&rev) else {
1053                failures.push(json!({"arn": input, "reason": "MISSING"}));
1054                continue;
1055            };
1056            if td.status == "ACTIVE" {
1057                failures.push(json!({
1058                    "arn": td.task_definition_arn.clone(),
1059                    "reason": "MUST_BE_INACTIVE",
1060                    "detail": "Task definitions must be deregistered before they can be deleted",
1061                }));
1062                continue;
1063            }
1064            td.status = "DELETE_IN_PROGRESS".to_string();
1065            deleted.push(task_definition_to_json(td));
1066        }
1067        Ok(AwsResponse::ok_json(json!({
1068            "taskDefinitions": deleted,
1069            "failures": failures,
1070        })))
1071    }
1072
1073    fn list_task_definitions(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1074        let body = request.json_body();
1075        let family_prefix = opt_str(&body, "familyPrefix");
1076        let status = opt_str(&body, "status").unwrap_or("ACTIVE");
1077        let sort = opt_str(&body, "sort").unwrap_or("ASC");
1078        let max_results = body
1079            .get("maxResults")
1080            .and_then(|v| v.as_i64())
1081            .filter(|n| (1..=100).contains(n))
1082            .map(|n| n as usize)
1083            .unwrap_or(100);
1084        let next_token = opt_str(&body, "nextToken").unwrap_or("");
1085
1086        let account = request.account_id.clone();
1087        let accounts = self.state.read();
1088        let mut arns: Vec<String> = Vec::new();
1089        if let Some(state) = accounts.get(&account) {
1090            for (family, revisions) in &state.task_definitions {
1091                if let Some(prefix) = family_prefix {
1092                    if !family.starts_with(prefix) {
1093                        continue;
1094                    }
1095                }
1096                for td in revisions.values() {
1097                    if td.status == status {
1098                        arns.push(td.task_definition_arn.clone());
1099                    }
1100                }
1101            }
1102        }
1103        if sort == "DESC" {
1104            arns.sort();
1105            arns.reverse();
1106        } else {
1107            arns.sort();
1108        }
1109        let start = next_token.parse::<usize>().unwrap_or(0).min(arns.len());
1110        let end = (start + max_results).min(arns.len());
1111        let page = arns[start..end].to_vec();
1112        let mut out = json!({"taskDefinitionArns": page});
1113        if end < arns.len() {
1114            out.as_object_mut()
1115                .unwrap()
1116                .insert("nextToken".into(), json!(end.to_string()));
1117        }
1118        Ok(AwsResponse::ok_json(out))
1119    }
1120
1121    fn list_task_definition_families(
1122        &self,
1123        request: &AwsRequest,
1124    ) -> Result<AwsResponse, AwsServiceError> {
1125        let body = request.json_body();
1126        let family_prefix = opt_str(&body, "familyPrefix");
1127        let status = opt_str(&body, "status").unwrap_or("ACTIVE");
1128        let max_results = body
1129            .get("maxResults")
1130            .and_then(|v| v.as_i64())
1131            .filter(|n| (1..=100).contains(n))
1132            .map(|n| n as usize)
1133            .unwrap_or(100);
1134        let next_token = opt_str(&body, "nextToken").unwrap_or("");
1135
1136        let account = request.account_id.clone();
1137        let accounts = self.state.read();
1138        let mut families: Vec<String> = Vec::new();
1139        if let Some(state) = accounts.get(&account) {
1140            for (family, revisions) in &state.task_definitions {
1141                if let Some(prefix) = family_prefix {
1142                    if !family.starts_with(prefix) {
1143                        continue;
1144                    }
1145                }
1146                let matches_status = match status {
1147                    "ACTIVE" => revisions.values().any(|td| td.status == "ACTIVE"),
1148                    "INACTIVE" => revisions
1149                        .values()
1150                        .all(|td| td.status == "INACTIVE" || td.status == "DELETE_IN_PROGRESS"),
1151                    "ALL" => true,
1152                    _ => revisions.values().any(|td| td.status == status),
1153                };
1154                if matches_status {
1155                    families.push(family.clone());
1156                }
1157            }
1158        }
1159        families.sort();
1160        let start = next_token.parse::<usize>().unwrap_or(0).min(families.len());
1161        let end = (start + max_results).min(families.len());
1162        let page = families[start..end].to_vec();
1163        let mut out = json!({"families": page});
1164        if end < families.len() {
1165            out.as_object_mut()
1166                .unwrap()
1167                .insert("nextToken".into(), json!(end.to_string()));
1168        }
1169        Ok(AwsResponse::ok_json(out))
1170    }
1171}
1172
1173fn validate_family_name(family: &str) -> Result<(), AwsServiceError> {
1174    if family.is_empty() || family.len() > 255 {
1175        return Err(invalid_parameter(
1176            "Task definition family must be 1-255 characters",
1177        ));
1178    }
1179    let ok = family
1180        .chars()
1181        .all(|c| c.is_ascii_alphanumeric() || c == '_' || c == '-');
1182    if !ok {
1183        return Err(invalid_parameter(
1184            "Task definition family may only contain letters, numbers, hyphens, and underscores",
1185        ));
1186    }
1187    Ok(())
1188}
1189
1190// -------- operations: tagging --------
1191
1192impl EcsService {
1193    fn tag_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1194        let body = request.json_body();
1195        let arn = req_str(&body, "resourceArn")?.to_string();
1196        let tags = parse_tags(&body);
1197        let (account, resource_type, tail) = decode_ecs_arn(&arn)?;
1198        let mut accounts = self.state.write();
1199        let state = accounts.get_or_create(&account);
1200        match resource_type.as_str() {
1201            "cluster" => {
1202                let cluster = state
1203                    .clusters
1204                    .get_mut(&tail)
1205                    .ok_or_else(|| resource_not_found(&arn))?;
1206                merge_tags(&mut cluster.tags, tags);
1207            }
1208            "task-definition" => {
1209                let (family, rev) = parse_family_revision(&tail);
1210                let rev = rev.ok_or_else(|| {
1211                    invalid_parameter("task-definition ARN must include revision")
1212                })?;
1213                let td = state
1214                    .task_definitions
1215                    .get_mut(&family)
1216                    .and_then(|m| m.get_mut(&rev))
1217                    .ok_or_else(|| resource_not_found(&arn))?;
1218                merge_tags(&mut td.tags, tags);
1219            }
1220            other => {
1221                return Err(invalid_parameter(format!(
1222                    "Tagging not yet supported for resource type: {other}"
1223                )));
1224            }
1225        }
1226        Ok(AwsResponse::ok_json(json!({})))
1227    }
1228
1229    fn untag_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1230        let body = request.json_body();
1231        let arn = req_str(&body, "resourceArn")?.to_string();
1232        let keys: Vec<String> = body
1233            .get("tagKeys")
1234            .and_then(|v| v.as_array())
1235            .map(|arr| {
1236                arr.iter()
1237                    .filter_map(|v| v.as_str().map(String::from))
1238                    .collect()
1239            })
1240            .unwrap_or_default();
1241        let (account, resource_type, tail) = decode_ecs_arn(&arn)?;
1242        let mut accounts = self.state.write();
1243        let state = accounts.get_or_create(&account);
1244        match resource_type.as_str() {
1245            "cluster" => {
1246                let cluster = state
1247                    .clusters
1248                    .get_mut(&tail)
1249                    .ok_or_else(|| resource_not_found(&arn))?;
1250                cluster.tags.retain(|t| !keys.contains(&t.key));
1251            }
1252            "task-definition" => {
1253                let (family, rev) = parse_family_revision(&tail);
1254                let rev = rev.ok_or_else(|| {
1255                    invalid_parameter("task-definition ARN must include revision")
1256                })?;
1257                let td = state
1258                    .task_definitions
1259                    .get_mut(&family)
1260                    .and_then(|m| m.get_mut(&rev))
1261                    .ok_or_else(|| resource_not_found(&arn))?;
1262                td.tags.retain(|t| !keys.contains(&t.key));
1263            }
1264            other => {
1265                return Err(invalid_parameter(format!(
1266                    "Tagging not yet supported for resource type: {other}"
1267                )));
1268            }
1269        }
1270        Ok(AwsResponse::ok_json(json!({})))
1271    }
1272
1273    fn list_tags_for_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1274        let body = request.json_body();
1275        let arn = req_str(&body, "resourceArn")?.to_string();
1276        let (account, resource_type, tail) = decode_ecs_arn(&arn)?;
1277        let accounts = self.state.read();
1278        let state = accounts
1279            .get(&account)
1280            .ok_or_else(|| resource_not_found(&arn))?;
1281        let tags = match resource_type.as_str() {
1282            "cluster" => state
1283                .clusters
1284                .get(&tail)
1285                .map(|c| c.tags.clone())
1286                .ok_or_else(|| resource_not_found(&arn))?,
1287            "task-definition" => {
1288                let (family, rev) = parse_family_revision(&tail);
1289                let rev = rev.ok_or_else(|| {
1290                    invalid_parameter("task-definition ARN must include revision")
1291                })?;
1292                state
1293                    .task_definitions
1294                    .get(&family)
1295                    .and_then(|m| m.get(&rev))
1296                    .map(|td| td.tags.clone())
1297                    .ok_or_else(|| resource_not_found(&arn))?
1298            }
1299            other => {
1300                return Err(invalid_parameter(format!(
1301                    "ListTagsForResource not yet supported for resource type: {other}"
1302                )));
1303            }
1304        };
1305        Ok(AwsResponse::ok_json(json!({"tags": tags_json(&tags)})))
1306    }
1307}
1308
1309fn resource_not_found(arn: &str) -> AwsServiceError {
1310    AwsServiceError::aws_error(
1311        StatusCode::BAD_REQUEST,
1312        "ClientException",
1313        format!("The referenced resource was not found: {arn}"),
1314    )
1315}
1316
1317// -------- operations: account settings --------
1318
1319impl EcsService {
1320    fn put_account_setting(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1321        let body = request.json_body();
1322        let name = req_str(&body, "name")?.to_string();
1323        let value = req_str(&body, "value")?.to_string();
1324        let principal_arn = opt_str(&body, "principalArn")
1325            .map(String::from)
1326            .or_else(|| request.principal.as_ref().map(|p| p.arn.clone()))
1327            .unwrap_or_else(|| format!("arn:aws:iam::{}:root", request.account_id));
1328        let account = request.account_id.clone();
1329        let mut accounts = self.state.write();
1330        let state = accounts.get_or_create(&account);
1331        state
1332            .principal_account_settings
1333            .entry(principal_arn.clone())
1334            .or_default()
1335            .insert(name.clone(), value.clone());
1336        Ok(AwsResponse::ok_json(json!({
1337            "setting": {
1338                "name": name,
1339                "value": value,
1340                "principalArn": principal_arn,
1341            }
1342        })))
1343    }
1344
1345    fn put_account_setting_default(
1346        &self,
1347        request: &AwsRequest,
1348    ) -> Result<AwsResponse, AwsServiceError> {
1349        let body = request.json_body();
1350        let name = req_str(&body, "name")?.to_string();
1351        let value = req_str(&body, "value")?.to_string();
1352        let account = request.account_id.clone();
1353        let mut accounts = self.state.write();
1354        let state = accounts.get_or_create(&account);
1355        state
1356            .account_setting_defaults
1357            .insert(name.clone(), value.clone());
1358        Ok(AwsResponse::ok_json(json!({
1359            "setting": {
1360                "name": name,
1361                "value": value,
1362                "principalArn": format!("arn:aws:iam::{}:root", state.account_id),
1363            }
1364        })))
1365    }
1366
1367    fn delete_account_setting(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1368        let body = request.json_body();
1369        let name = req_str(&body, "name")?.to_string();
1370        let principal_arn = opt_str(&body, "principalArn")
1371            .map(String::from)
1372            .or_else(|| request.principal.as_ref().map(|p| p.arn.clone()))
1373            .unwrap_or_else(|| format!("arn:aws:iam::{}:root", request.account_id));
1374        let account = request.account_id.clone();
1375        let mut accounts = self.state.write();
1376        let state = accounts.get_or_create(&account);
1377        let removed_value = state
1378            .principal_account_settings
1379            .get_mut(&principal_arn)
1380            .and_then(|m| m.remove(&name));
1381        Ok(AwsResponse::ok_json(json!({
1382            "setting": {
1383                "name": name,
1384                "value": removed_value.unwrap_or_default(),
1385                "principalArn": principal_arn,
1386            }
1387        })))
1388    }
1389
1390    fn list_account_settings(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1391        let body = request.json_body();
1392        let name_filter = opt_str(&body, "name");
1393        let value_filter = opt_str(&body, "value");
1394        let principal_filter = opt_str(&body, "principalArn");
1395        let effective_only = body
1396            .get("effectiveSettings")
1397            .and_then(|v| v.as_bool())
1398            .unwrap_or(false);
1399
1400        let account = request.account_id.clone();
1401        let accounts = self.state.read();
1402        let Some(state) = accounts.get(&account) else {
1403            return Ok(AwsResponse::ok_json(json!({"settings": []})));
1404        };
1405        let root_arn = format!("arn:aws:iam::{}:root", state.account_id);
1406        let mut settings: Vec<Value> = Vec::new();
1407
1408        if effective_only {
1409            // Merge principal overrides onto defaults, scoped to principal_filter
1410            // when supplied; otherwise use the caller's own principal.
1411            let principal = principal_filter
1412                .map(String::from)
1413                .or_else(|| request.principal.as_ref().map(|p| p.arn.clone()))
1414                .unwrap_or_else(|| root_arn.clone());
1415            let mut merged = state.account_setting_defaults.clone();
1416            if let Some(overrides) = state.principal_account_settings.get(&principal) {
1417                for (k, v) in overrides {
1418                    merged.insert(k.clone(), v.clone());
1419                }
1420            }
1421            for (k, v) in merged {
1422                if matches_filter(name_filter, &k) && matches_filter(value_filter, &v) {
1423                    settings.push(json!({
1424                        "name": k,
1425                        "value": v,
1426                        "principalArn": principal,
1427                    }));
1428                }
1429            }
1430        } else {
1431            // Raw listing: include defaults (under the root ARN) plus any
1432            // principal-specific settings.
1433            for (k, v) in &state.account_setting_defaults {
1434                if matches_filter(name_filter, k)
1435                    && matches_filter(value_filter, v)
1436                    && (principal_filter.is_none() || principal_filter == Some(root_arn.as_str()))
1437                {
1438                    settings.push(json!({
1439                        "name": k,
1440                        "value": v,
1441                        "principalArn": root_arn,
1442                    }));
1443                }
1444            }
1445            for (principal, entries) in &state.principal_account_settings {
1446                if principal_filter.is_some_and(|pf| pf != principal) {
1447                    continue;
1448                }
1449                for (k, v) in entries {
1450                    if matches_filter(name_filter, k) && matches_filter(value_filter, v) {
1451                        settings.push(json!({
1452                            "name": k,
1453                            "value": v,
1454                            "principalArn": principal,
1455                        }));
1456                    }
1457                }
1458            }
1459        }
1460
1461        Ok(AwsResponse::ok_json(json!({"settings": settings})))
1462    }
1463}
1464
1465fn matches_filter(filter: Option<&str>, value: &str) -> bool {
1466    filter.is_none_or(|f| f == value)
1467}
1468
1469// -------- operations: tasks --------
1470
1471impl EcsService {
1472    fn run_task(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1473        let body = request.json_body();
1474        let td_ref = req_str(&body, "taskDefinition")?;
1475        let cluster_ref = opt_str(&body, "cluster");
1476        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
1477        let launch_type = opt_str(&body, "launchType")
1478            .unwrap_or("FARGATE")
1479            .to_string();
1480        let count = body
1481            .get("count")
1482            .and_then(|v| v.as_i64())
1483            .filter(|n| (1..=10).contains(n))
1484            .unwrap_or(1) as usize;
1485        let group = opt_str(&body, "group").map(String::from);
1486        let started_by = opt_str(&body, "startedBy").map(String::from);
1487        let tags = parse_tags(&body);
1488
1489        // PassRole trust check on any role overrides supplied via the
1490        // overrides.taskRoleArn / overrides.executionRoleArn fields.
1491        // The base task definition was already checked at Register time,
1492        // but RunTask can override either role and AWS re-validates the
1493        // trust policy on every call.
1494        if let Some(overrides) = body.get("overrides") {
1495            if let Some(role_arn) = opt_str(overrides, "taskRoleArn") {
1496                self.check_pass_role(&request.account_id, role_arn)?;
1497            }
1498            if let Some(role_arn) = opt_str(overrides, "executionRoleArn") {
1499                self.check_pass_role(&request.account_id, role_arn)?;
1500            }
1501        }
1502
1503        let account = request.account_id.clone();
1504        let runtime = self.runtime.clone();
1505        let mut accounts = self.state.write();
1506        let state = accounts.get_or_create(&account);
1507        let cluster_arn = state
1508            .clusters
1509            .get(&cluster_name)
1510            .map(|c| c.cluster_arn.clone())
1511            .unwrap_or_else(|| state.cluster_arn(&cluster_name));
1512        let (_, family, rev) = resolve_task_definition_ref(td_ref)?;
1513        let revisions = state
1514            .task_definitions
1515            .get(&family)
1516            .ok_or_else(|| task_definition_not_found(td_ref))?;
1517        let td = match rev {
1518            Some(n) => revisions
1519                .get(&n)
1520                .ok_or_else(|| task_definition_not_found(td_ref))?,
1521            None => latest_active_revision(revisions)
1522                .ok_or_else(|| task_definition_not_found(td_ref))?,
1523        };
1524        if td.status != "ACTIVE" {
1525            return Err(client_exception(format!(
1526                "Task definition {} is not ACTIVE",
1527                td.task_definition_arn
1528            )));
1529        }
1530        let td_arn = td.task_definition_arn.clone();
1531        let td_family = td.family.clone();
1532        let td_revision = td.revision;
1533        let td_cpu = td.cpu.clone();
1534        let td_memory = td.memory.clone();
1535        let td_task_role = td.task_role_arn.clone();
1536        let td_exec_role = td.execution_role_arn.clone();
1537        let td_containers = td.container_definitions.clone();
1538
1539        let mut spawned_tasks: Vec<String> = Vec::new();
1540        let mut task_jsons: Vec<Value> = Vec::new();
1541        for _ in 0..count {
1542            let task_id = uuid::Uuid::new_v4().to_string().replace('-', "");
1543            let task_arn = state.task_arn(&cluster_name, &task_id);
1544            let containers: Vec<Container> = td_containers
1545                .iter()
1546                .map(|def| Container {
1547                    container_arn: format!(
1548                        "arn:aws:ecs:{}:{}:container/{}/{}/{}",
1549                        state.region,
1550                        state.account_id,
1551                        cluster_name,
1552                        task_id,
1553                        def.get("name").and_then(|v| v.as_str()).unwrap_or("app")
1554                    ),
1555                    name: def
1556                        .get("name")
1557                        .and_then(|v| v.as_str())
1558                        .unwrap_or("app")
1559                        .to_string(),
1560                    image: def
1561                        .get("image")
1562                        .and_then(|v| v.as_str())
1563                        .unwrap_or("")
1564                        .to_string(),
1565                    task_arn: task_arn.clone(),
1566                    last_status: "PENDING".into(),
1567                    exit_code: None,
1568                    reason: None,
1569                    runtime_id: None,
1570                    essential: def
1571                        .get("essential")
1572                        .and_then(|v| v.as_bool())
1573                        .unwrap_or(true),
1574                    cpu: def
1575                        .get("cpu")
1576                        .and_then(|v| v.as_i64())
1577                        .map(|n| n.to_string()),
1578                    memory: def
1579                        .get("memory")
1580                        .and_then(|v| v.as_i64())
1581                        .map(|n| n.to_string()),
1582                    memory_reservation: def
1583                        .get("memoryReservation")
1584                        .and_then(|v| v.as_i64())
1585                        .map(|n| n.to_string()),
1586                    network_bindings: Vec::new(),
1587                    network_interfaces: Vec::new(),
1588                    health_status: Some("UNKNOWN".to_string()),
1589                    managed_agents: None,
1590                })
1591                .collect();
1592            let awslogs = td_containers.iter().find_map(|def| {
1593                let name = def.get("name").and_then(|v| v.as_str())?.to_string();
1594                let log_cfg = def.get("logConfiguration")?;
1595                if log_cfg.get("logDriver").and_then(|v| v.as_str()) != Some("awslogs") {
1596                    return None;
1597                }
1598                let opts = log_cfg.get("options").and_then(|v| v.as_object())?;
1599                Some(AwsLogsConfig {
1600                    group: opts.get("awslogs-group").and_then(|v| v.as_str())?.into(),
1601                    stream_prefix: opts
1602                        .get("awslogs-stream-prefix")
1603                        .and_then(|v| v.as_str())
1604                        .map(String::from),
1605                    region: opts
1606                        .get("awslogs-region")
1607                        .and_then(|v| v.as_str())
1608                        .unwrap_or(&state.region)
1609                        .to_string(),
1610                    container_name: name,
1611                })
1612            });
1613            let task = Task {
1614                task_arn: task_arn.clone(),
1615                task_id: task_id.clone(),
1616                cluster_arn: cluster_arn.clone(),
1617                cluster_name: cluster_name.clone(),
1618                task_definition_arn: td_arn.clone(),
1619                family: td_family.clone(),
1620                revision: td_revision,
1621                last_status: "PROVISIONING".into(),
1622                desired_status: "RUNNING".into(),
1623                launch_type: launch_type.clone(),
1624                platform_version: Some("1.4.0".into()),
1625                cpu: body
1626                    .get("overrides")
1627                    .and_then(|v| v.get("cpu"))
1628                    .and_then(|v| v.as_str())
1629                    .map(String::from)
1630                    .or_else(|| td_cpu.clone()),
1631                memory: body
1632                    .get("overrides")
1633                    .and_then(|v| v.get("memory"))
1634                    .and_then(|v| v.as_str())
1635                    .map(String::from)
1636                    .or_else(|| td_memory.clone()),
1637                containers,
1638                overrides: body.get("overrides").cloned().unwrap_or_else(|| json!({})),
1639                started_by: started_by.clone(),
1640                group: group.clone(),
1641                connectivity: "CONNECTING".into(),
1642                stop_code: None,
1643                stopped_reason: None,
1644                created_at: Utc::now(),
1645                started_at: None,
1646                stopping_at: None,
1647                stopped_at: None,
1648                pull_started_at: None,
1649                pull_stopped_at: None,
1650                connectivity_at: None,
1651                started_by_ref_id: None,
1652                execution_role_arn: td_exec_role.clone(),
1653                task_role_arn: td_task_role.clone(),
1654                tags: tags.clone(),
1655                awslogs,
1656                captured_logs: String::new(),
1657                protection: None,
1658            };
1659            state.tasks.insert(task_id.clone(), task.clone());
1660            if let Some(cluster) = state.clusters.get_mut(&cluster_name) {
1661                cluster.pending_tasks_count += 1;
1662            }
1663            // Snapshot-in-progress: transition to PENDING synchronously so
1664            // callers that immediately DescribeTasks see movement. RUNNING /
1665            // STOPPED come later from the background runtime task.
1666            if let Some(t) = state.tasks.get_mut(&task_id) {
1667                t.last_status = "PENDING".into();
1668            }
1669            task_jsons.push(task_to_json(&task));
1670            spawned_tasks.push(task_id.clone());
1671        }
1672        drop(accounts);
1673
1674        // Launch container execution outside the state lock.
1675        if let Some(rt) = runtime {
1676            for id in &spawned_tasks {
1677                rt.clone()
1678                    .run_task(self.state.clone(), id.clone(), account.clone());
1679            }
1680        } else {
1681            // No runtime available — fail fast so the task doesn't stay
1682            // PENDING forever. We incremented pending_tasks_count above;
1683            // decrement it here so the cluster counter doesn't drift and
1684            // block later DeleteCluster calls.
1685            let mut accounts = self.state.write();
1686            if let Some(state) = accounts.get_mut(&account) {
1687                let mut cluster_drains: Vec<String> = Vec::new();
1688                for id in &spawned_tasks {
1689                    if let Some(t) = state.tasks.get_mut(id) {
1690                        t.last_status = "STOPPED".into();
1691                        t.desired_status = "STOPPED".into();
1692                        t.stop_code = Some("TaskFailedToStart".into());
1693                        t.stopped_reason = Some(
1694                            "No container runtime available (docker/podman not installed)".into(),
1695                        );
1696                        t.stopped_at = Some(Utc::now());
1697                        for c in t.containers.iter_mut() {
1698                            c.last_status = "STOPPED".into();
1699                        }
1700                        cluster_drains.push(t.cluster_name.clone());
1701                    }
1702                }
1703                for name in cluster_drains {
1704                    if let Some(cluster) = state.clusters.get_mut(&name) {
1705                        if cluster.pending_tasks_count > 0 {
1706                            cluster.pending_tasks_count -= 1;
1707                        }
1708                    }
1709                }
1710            }
1711        }
1712
1713        Ok(AwsResponse::ok_json(json!({
1714            "tasks": task_jsons,
1715            "failures": [],
1716        })))
1717    }
1718
1719    fn start_task(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1720        // StartTask targets explicit container instances. Our ECS emulator
1721        // has no concept of registered container instances yet (Batch 4);
1722        // fall through to the same semantics as RunTask so the API is
1723        // usable while the container-instance surface is pending.
1724        self.run_task(request)
1725    }
1726
1727    async fn stop_task(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1728        let body = request.json_body();
1729        let task_ref = req_str(&body, "task")?;
1730        let reason = opt_str(&body, "reason")
1731            .unwrap_or("UserInitiated")
1732            .to_string();
1733        let cluster_ref = opt_str(&body, "cluster");
1734        let _cluster_name = EcsState::resolve_cluster_name(cluster_ref);
1735
1736        let (task_id, account, task_snapshot) = {
1737            let account = request.account_id.clone();
1738            let mut accounts = self.state.write();
1739            let state = accounts
1740                .get_mut(&account)
1741                .ok_or_else(|| task_not_found(task_ref))?;
1742            let task_id = resolve_task_id(state, task_ref)?;
1743            let task = state
1744                .tasks
1745                .get_mut(&task_id)
1746                .ok_or_else(|| task_not_found(task_ref))?;
1747            task.desired_status = "STOPPED".into();
1748            task.stopping_at = Some(Utc::now());
1749            task.stopped_reason = Some(reason.clone());
1750            task.stop_code = Some("UserInitiated".into());
1751            (task_id, account, task.clone())
1752        };
1753        if let Some(rt) = &self.runtime {
1754            rt.stop_task(&task_id, &reason).await;
1755        }
1756        let _ = account;
1757        Ok(AwsResponse::ok_json(json!({
1758            "task": task_to_json(&task_snapshot),
1759        })))
1760    }
1761
1762    fn describe_tasks(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1763        let body = request.json_body();
1764        let refs: Vec<String> = body
1765            .get("tasks")
1766            .and_then(|v| v.as_array())
1767            .map(|arr| {
1768                arr.iter()
1769                    .filter_map(|v| v.as_str().map(String::from))
1770                    .collect()
1771            })
1772            .unwrap_or_default();
1773        let include_tags = body
1774            .get("include")
1775            .and_then(|v| v.as_array())
1776            .map(|arr| arr.iter().any(|v| v.as_str() == Some("TAGS")))
1777            .unwrap_or(false);
1778
1779        let account = request.account_id.clone();
1780        let accounts = self.state.read();
1781        let Some(state) = accounts.get(&account) else {
1782            return Ok(AwsResponse::ok_json(json!({
1783                "tasks": [],
1784                "failures": refs.iter().map(|r| json!({"arn": r, "reason": "MISSING"})).collect::<Vec<_>>(),
1785            })));
1786        };
1787        let mut found = Vec::new();
1788        let mut failures = Vec::new();
1789        for input in &refs {
1790            let task_id = task_id_from_ref(input);
1791            match state.tasks.get(&task_id) {
1792                Some(t) => {
1793                    let mut v = task_to_json(t);
1794                    if include_tags {
1795                        v.as_object_mut()
1796                            .unwrap()
1797                            .insert("tags".into(), tags_json(&t.tags));
1798                    }
1799                    found.push(v);
1800                }
1801                None => {
1802                    failures.push(json!({
1803                        "arn": input,
1804                        "reason": "MISSING",
1805                    }));
1806                }
1807            }
1808        }
1809        Ok(AwsResponse::ok_json(json!({
1810            "tasks": found,
1811            "failures": failures,
1812        })))
1813    }
1814
1815    fn list_tasks(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1816        let body = request.json_body();
1817        let cluster_ref = opt_str(&body, "cluster");
1818        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
1819        let family = opt_str(&body, "family");
1820        let status_filter = opt_str(&body, "desiredStatus");
1821        let started_by = opt_str(&body, "startedBy");
1822        let max_results = body
1823            .get("maxResults")
1824            .and_then(|v| v.as_i64())
1825            .filter(|n| (1..=100).contains(n))
1826            .map(|n| n as usize)
1827            .unwrap_or(100);
1828        let next_token = opt_str(&body, "nextToken").unwrap_or("");
1829
1830        let account = request.account_id.clone();
1831        let accounts = self.state.read();
1832        let mut arns: Vec<String> = match accounts.get(&account) {
1833            Some(state) => state
1834                .tasks
1835                .values()
1836                .filter(|t| t.cluster_name == cluster_name)
1837                .filter(|t| family.is_none_or(|f| t.family == f))
1838                .filter(|t| status_filter.is_none_or(|s| t.desired_status == s))
1839                .filter(|t| started_by.is_none_or(|s| t.started_by.as_deref() == Some(s)))
1840                .map(|t| t.task_arn.clone())
1841                .collect(),
1842            None => Vec::new(),
1843        };
1844        arns.sort();
1845        let start = next_token.parse::<usize>().unwrap_or(0).min(arns.len());
1846        let end = (start + max_results).min(arns.len());
1847        let page = arns[start..end].to_vec();
1848        let mut out = json!({"taskArns": page});
1849        if end < arns.len() {
1850            out.as_object_mut()
1851                .unwrap()
1852                .insert("nextToken".into(), json!(end.to_string()));
1853        }
1854        Ok(AwsResponse::ok_json(out))
1855    }
1856}
1857
1858fn task_not_found(task_ref: &str) -> AwsServiceError {
1859    AwsServiceError::aws_error(
1860        StatusCode::BAD_REQUEST,
1861        "ClientException",
1862        format!("Task not found: {task_ref}"),
1863    )
1864}
1865
1866/// Strip cluster prefix + optional ARN prefix to recover the task UUID.
1867fn task_id_from_ref(input: &str) -> String {
1868    if let Some(rest) = input.rsplit('/').next() {
1869        return rest.to_string();
1870    }
1871    input.to_string()
1872}
1873
1874fn resolve_task_id(state: &EcsState, task_ref: &str) -> Result<String, AwsServiceError> {
1875    let id = task_id_from_ref(task_ref);
1876    if state.tasks.contains_key(&id) {
1877        Ok(id)
1878    } else {
1879        Err(task_not_found(task_ref))
1880    }
1881}
1882
1883fn task_to_json(task: &Task) -> Value {
1884    let mut map = serde_json::Map::new();
1885    map.insert("taskArn".into(), json!(task.task_arn));
1886    map.insert("clusterArn".into(), json!(task.cluster_arn));
1887    map.insert("taskDefinitionArn".into(), json!(task.task_definition_arn));
1888    map.insert("lastStatus".into(), json!(task.last_status));
1889    map.insert("desiredStatus".into(), json!(task.desired_status));
1890    map.insert("launchType".into(), json!(task.launch_type));
1891    if let Some(ref v) = task.platform_version {
1892        map.insert("platformVersion".into(), json!(v));
1893    }
1894    if let Some(ref v) = task.cpu {
1895        map.insert("cpu".into(), json!(v));
1896    }
1897    if let Some(ref v) = task.memory {
1898        map.insert("memory".into(), json!(v));
1899    }
1900    map.insert(
1901        "containers".into(),
1902        Value::Array(task.containers.iter().map(container_to_json).collect()),
1903    );
1904    map.insert("overrides".into(), task.overrides.clone());
1905    if let Some(ref v) = task.started_by {
1906        map.insert("startedBy".into(), json!(v));
1907    }
1908    if let Some(ref v) = task.group {
1909        map.insert("group".into(), json!(v));
1910    }
1911    map.insert("connectivity".into(), json!(task.connectivity));
1912    if let Some(ref v) = task.stop_code {
1913        map.insert("stopCode".into(), json!(v));
1914    }
1915    if let Some(ref v) = task.stopped_reason {
1916        map.insert("stoppedReason".into(), json!(v));
1917    }
1918    if let Some(ref v) = task.task_role_arn {
1919        map.insert("taskRoleArn".into(), json!(v));
1920    }
1921    if let Some(ref v) = task.execution_role_arn {
1922        map.insert("executionRoleArn".into(), json!(v));
1923    }
1924    map.insert("createdAt".into(), json!(task.created_at.timestamp()));
1925    if let Some(ts) = task.started_at {
1926        map.insert("startedAt".into(), json!(ts.timestamp()));
1927    }
1928    if let Some(ts) = task.stopping_at {
1929        map.insert("stoppingAt".into(), json!(ts.timestamp()));
1930    }
1931    if let Some(ts) = task.stopped_at {
1932        map.insert("stoppedAt".into(), json!(ts.timestamp()));
1933    }
1934    if let Some(ts) = task.pull_started_at {
1935        map.insert("pullStartedAt".into(), json!(ts.timestamp()));
1936    }
1937    if let Some(ts) = task.pull_stopped_at {
1938        map.insert("pullStoppedAt".into(), json!(ts.timestamp()));
1939    }
1940    if let Some(ts) = task.connectivity_at {
1941        map.insert("connectivityAt".into(), json!(ts.timestamp()));
1942    }
1943    Value::Object(map)
1944}
1945
1946fn container_to_json(container: &Container) -> Value {
1947    let mut map = serde_json::Map::new();
1948    map.insert("containerArn".into(), json!(container.container_arn));
1949    map.insert("taskArn".into(), json!(container.task_arn));
1950    map.insert("name".into(), json!(container.name));
1951    map.insert("image".into(), json!(container.image));
1952    map.insert("lastStatus".into(), json!(container.last_status));
1953    map.insert("essential".into(), json!(container.essential));
1954    if let Some(code) = container.exit_code {
1955        map.insert("exitCode".into(), json!(code));
1956    }
1957    if let Some(ref r) = container.reason {
1958        map.insert("reason".into(), json!(r));
1959    }
1960    if let Some(ref id) = container.runtime_id {
1961        map.insert("runtimeId".into(), json!(id));
1962    }
1963    if let Some(ref v) = container.cpu {
1964        map.insert("cpu".into(), json!(v));
1965    }
1966    if let Some(ref v) = container.memory {
1967        map.insert("memory".into(), json!(v));
1968    }
1969    if let Some(ref v) = container.memory_reservation {
1970        map.insert("memoryReservation".into(), json!(v));
1971    }
1972    map.insert("networkBindings".into(), json!(container.network_bindings));
1973    map.insert(
1974        "networkInterfaces".into(),
1975        json!(container.network_interfaces),
1976    );
1977    if let Some(ref v) = container.health_status {
1978        map.insert("healthStatus".into(), json!(v));
1979    }
1980    Value::Object(map)
1981}
1982
1983// -------- operations: services --------
1984
1985impl EcsService {
1986    fn create_service(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1987        let body = request.json_body();
1988        let service_name = req_str(&body, "serviceName")?.to_string();
1989        validate_service_name(&service_name)?;
1990        let td_ref = req_str(&body, "taskDefinition")?;
1991        let cluster_ref = opt_str(&body, "cluster");
1992        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
1993        let desired_count = body
1994            .get("desiredCount")
1995            .and_then(|v| v.as_i64())
1996            .filter(|n| *n >= 0)
1997            .unwrap_or(1) as i32;
1998        let launch_type = opt_str(&body, "launchType")
1999            .unwrap_or("FARGATE")
2000            .to_string();
2001        let scheduling = opt_str(&body, "schedulingStrategy")
2002            .unwrap_or("REPLICA")
2003            .to_string();
2004        let deployment_controller = body
2005            .get("deploymentController")
2006            .and_then(|v| v.get("type"))
2007            .and_then(|v| v.as_str())
2008            .unwrap_or("ECS")
2009            .to_string();
2010        let deployment_config = body.get("deploymentConfiguration");
2011        let min_healthy = deployment_config
2012            .and_then(|d| d.get("minimumHealthyPercent"))
2013            .and_then(|v| v.as_i64())
2014            .map(|n| n as i32);
2015        let max_percent = deployment_config
2016            .and_then(|d| d.get("maximumPercent"))
2017            .and_then(|v| v.as_i64())
2018            .map(|n| n as i32);
2019        let circuit = deployment_config.and_then(|d| d.get("deploymentCircuitBreaker"));
2020        let circuit_breaker = circuit.map(|c| CircuitBreakerConfig {
2021            enable: c.get("enable").and_then(|v| v.as_bool()).unwrap_or(false),
2022            rollback: c.get("rollback").and_then(|v| v.as_bool()).unwrap_or(false),
2023        });
2024        let tags = parse_tags(&body);
2025        let role_arn = opt_str(&body, "role").map(String::from);
2026        let load_balancers: Vec<Value> = body
2027            .get("loadBalancers")
2028            .and_then(|v| v.as_array())
2029            .cloned()
2030            .unwrap_or_default();
2031        let service_registries: Vec<Value> = body
2032            .get("serviceRegistries")
2033            .and_then(|v| v.as_array())
2034            .cloned()
2035            .unwrap_or_default();
2036        let placement_constraints: Vec<Value> = body
2037            .get("placementConstraints")
2038            .and_then(|v| v.as_array())
2039            .cloned()
2040            .unwrap_or_default();
2041        let placement_strategy: Vec<Value> = body
2042            .get("placementStrategy")
2043            .and_then(|v| v.as_array())
2044            .cloned()
2045            .unwrap_or_default();
2046        let network_configuration = body.get("networkConfiguration").cloned();
2047
2048        let runtime = self.runtime.clone();
2049        let account = request.account_id.clone();
2050        let principal_arn = request
2051            .principal
2052            .as_ref()
2053            .map(|p| p.arn.clone())
2054            .unwrap_or_else(|| format!("arn:aws:iam::{}:root", request.account_id));
2055
2056        let (service_json, spawn_task_ids) = {
2057            let mut accounts = self.state.write();
2058            let state = accounts.get_or_create(&account);
2059            // Resolve task definition to get family/revision.
2060            let (_, family, rev) = resolve_task_definition_ref(td_ref)?;
2061            let revisions = state
2062                .task_definitions
2063                .get(&family)
2064                .ok_or_else(|| task_definition_not_found(td_ref))?;
2065            let td = match rev {
2066                Some(n) => revisions
2067                    .get(&n)
2068                    .ok_or_else(|| task_definition_not_found(td_ref))?,
2069                None => latest_active_revision(revisions)
2070                    .ok_or_else(|| task_definition_not_found(td_ref))?,
2071            };
2072            let td_arn = td.task_definition_arn.clone();
2073            let td_family = td.family.clone();
2074            let td_revision = td.revision;
2075            let cluster_arn = state
2076                .clusters
2077                .get(&cluster_name)
2078                .map(|c| c.cluster_arn.clone())
2079                .unwrap_or_else(|| state.cluster_arn(&cluster_name));
2080            let service_arn = state.service_arn(&cluster_name, &service_name);
2081            let key = EcsState::service_key(&cluster_name, &service_name);
2082            if let Some(existing) = state.services.get(&key) {
2083                if existing.status != "INACTIVE" {
2084                    return Err(service_already_exists(&service_name));
2085                }
2086            }
2087            let deployment = Deployment {
2088                deployment_id: format!(
2089                    "ecs-svc/{}",
2090                    uuid::Uuid::new_v4().as_u128() & 0xffff_ffff_ffff_ffff
2091                ),
2092                status: "PRIMARY".into(),
2093                task_definition_arn: td_arn.clone(),
2094                desired_count,
2095                pending_count: 0,
2096                running_count: 0,
2097                failed_tasks: 0,
2098                created_at: Utc::now(),
2099                updated_at: Utc::now(),
2100                launch_type: launch_type.clone(),
2101                rollout_state: "IN_PROGRESS".into(),
2102                rollout_state_reason: Some("ECS deployment in progress.".into()),
2103            };
2104            let service = Service {
2105                service_name: service_name.clone(),
2106                service_arn: service_arn.clone(),
2107                cluster_name: cluster_name.clone(),
2108                cluster_arn: cluster_arn.clone(),
2109                task_definition_arn: td_arn,
2110                family: td_family,
2111                revision: td_revision,
2112                desired_count,
2113                running_count: 0,
2114                pending_count: 0,
2115                launch_type: launch_type.clone(),
2116                status: "ACTIVE".into(),
2117                scheduling_strategy: scheduling,
2118                deployment_controller,
2119                minimum_healthy_percent: min_healthy,
2120                maximum_percent: max_percent,
2121                circuit_breaker,
2122                deployments: vec![deployment],
2123                load_balancers,
2124                service_registries,
2125                placement_constraints,
2126                placement_strategy,
2127                network_configuration,
2128                tags: tags.clone(),
2129                created_at: Utc::now(),
2130                created_by: Some(principal_arn.clone()),
2131                role_arn,
2132            };
2133            state.services.insert(key.clone(), service.clone());
2134            if let Some(cluster) = state.clusters.get_mut(&cluster_name) {
2135                cluster.active_services_count += 1;
2136            }
2137            state.push_event(crate::state::LifecycleEvent {
2138                at: Utc::now(),
2139                event_type: "ServiceCreated".into(),
2140                task_arn: None,
2141                cluster_arn: Some(cluster_arn),
2142                last_status: Some("ACTIVE".into()),
2143                detail: json!({"serviceArn": service_arn, "desiredCount": desired_count}),
2144            });
2145            let ids =
2146                spawn_service_tasks(state, &service, desired_count, &principal_arn, &launch_type);
2147            (service_to_json(state.services.get(&key).unwrap()), ids)
2148        };
2149
2150        if let Some(rt) = runtime {
2151            for id in spawn_task_ids {
2152                rt.clone().run_task(self.state.clone(), id, account.clone());
2153            }
2154        }
2155
2156        Ok(AwsResponse::ok_json(json!({ "service": service_json })))
2157    }
2158
2159    fn update_service(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2160        let body = request.json_body();
2161        let service_ref = req_str(&body, "service")?;
2162        let service_name = service_name_from_ref(service_ref);
2163        let cluster_ref = opt_str(&body, "cluster");
2164        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
2165        let new_desired = body.get("desiredCount").and_then(|v| v.as_i64());
2166        let new_td_ref = opt_str(&body, "taskDefinition");
2167        let account = request.account_id.clone();
2168        let principal_arn = request
2169            .principal
2170            .as_ref()
2171            .map(|p| p.arn.clone())
2172            .unwrap_or_else(|| format!("arn:aws:iam::{}:root", request.account_id));
2173        let runtime = self.runtime.clone();
2174
2175        let (service_json, spawn_ids, stop_ids) = {
2176            let mut accounts = self.state.write();
2177            let state = accounts
2178                .get_mut(&account)
2179                .ok_or_else(|| service_not_found(&service_name))?;
2180            let key = EcsState::service_key(&cluster_name, &service_name);
2181            if !state.services.contains_key(&key) {
2182                return Err(service_not_found(&service_name));
2183            }
2184
2185            // Resolve new task definition (may stay on current one).
2186            let (new_td_arn, new_family, new_revision) = if let Some(td_ref) = new_td_ref {
2187                let (_, family, rev) = resolve_task_definition_ref(td_ref)?;
2188                let revisions = state
2189                    .task_definitions
2190                    .get(&family)
2191                    .ok_or_else(|| task_definition_not_found(td_ref))?;
2192                let td = match rev {
2193                    Some(n) => revisions
2194                        .get(&n)
2195                        .ok_or_else(|| task_definition_not_found(td_ref))?,
2196                    None => latest_active_revision(revisions)
2197                        .ok_or_else(|| task_definition_not_found(td_ref))?,
2198                };
2199                (
2200                    Some(td.task_definition_arn.clone()),
2201                    td.family.clone(),
2202                    td.revision,
2203                )
2204            } else {
2205                let svc = state.services.get(&key).unwrap();
2206                (None, svc.family.clone(), svc.revision)
2207            };
2208
2209            let service_cluster_arn;
2210            let launch_type_clone;
2211            let effective_desired;
2212            let old_desired;
2213            let mut old_deployments_drained: Vec<String> = Vec::new();
2214            let mut new_deployment_triggered = false;
2215
2216            {
2217                let svc = state.services.get_mut(&key).unwrap();
2218                old_desired = svc.desired_count;
2219                service_cluster_arn = svc.cluster_arn.clone();
2220                launch_type_clone = svc.launch_type.clone();
2221
2222                if let Some(n) = new_desired {
2223                    let n = n.max(0) as i32;
2224                    svc.desired_count = n;
2225                    if let Some(d) = svc.deployments.iter_mut().find(|d| d.status == "PRIMARY") {
2226                        d.desired_count = n;
2227                        d.updated_at = Utc::now();
2228                    }
2229                }
2230
2231                if let Some(arn) = new_td_arn.clone() {
2232                    // Roll a new PRIMARY deployment; mark the previous one ACTIVE
2233                    // so it's eligible for drain once the new deployment ramps.
2234                    for d in svc.deployments.iter_mut() {
2235                        if d.status == "PRIMARY" {
2236                            d.status = "ACTIVE".into();
2237                            old_deployments_drained.push(d.deployment_id.clone());
2238                        }
2239                    }
2240                    svc.deployments.insert(
2241                        0,
2242                        Deployment {
2243                            deployment_id: format!(
2244                                "ecs-svc/{}",
2245                                uuid::Uuid::new_v4().as_u128() & 0xffff_ffff_ffff_ffff
2246                            ),
2247                            status: "PRIMARY".into(),
2248                            task_definition_arn: arn.clone(),
2249                            desired_count: svc.desired_count,
2250                            pending_count: 0,
2251                            running_count: 0,
2252                            failed_tasks: 0,
2253                            created_at: Utc::now(),
2254                            updated_at: Utc::now(),
2255                            launch_type: svc.launch_type.clone(),
2256                            rollout_state: "IN_PROGRESS".into(),
2257                            rollout_state_reason: Some("ECS deployment in progress.".into()),
2258                        },
2259                    );
2260                    svc.task_definition_arn = arn;
2261                    svc.family = new_family;
2262                    svc.revision = new_revision;
2263                    new_deployment_triggered = true;
2264                }
2265
2266                effective_desired = svc.desired_count;
2267            }
2268
2269            // Compute spawn + stop plan.
2270            let mut spawn: Vec<String> = Vec::new();
2271            let mut stop: Vec<String> = Vec::new();
2272
2273            // Tasks belonging to this service (by startedBy convention):
2274            let service_tag = format!("ecs-svc/{}", service_name);
2275            let current_tasks: Vec<(String, String)> = state
2276                .tasks
2277                .iter()
2278                .filter(|(_, t)| {
2279                    t.started_by.as_deref() == Some(service_tag.as_str())
2280                        && t.cluster_name == cluster_name
2281                        && t.last_status != "STOPPED"
2282                })
2283                .map(|(id, t)| (id.clone(), t.task_definition_arn.clone()))
2284                .collect();
2285
2286            let current_count = current_tasks.len() as i32;
2287            if effective_desired > current_count {
2288                let add = (effective_desired - current_count) as usize;
2289                let svc_snapshot = state.services.get(&key).unwrap().clone();
2290                let mut new_ids = spawn_service_tasks(
2291                    state,
2292                    &svc_snapshot,
2293                    add as i32,
2294                    &principal_arn,
2295                    &launch_type_clone,
2296                );
2297                spawn.append(&mut new_ids);
2298            } else if effective_desired < current_count {
2299                let remove = (current_count - effective_desired) as usize;
2300                for (id, _) in current_tasks.iter().take(remove) {
2301                    stop.push(id.clone());
2302                }
2303            }
2304
2305            // If a new deployment was triggered, also stop tasks still on
2306            // the old task definition so the new deployment can ramp up,
2307            // then spawn replacements — but only enough to hit the
2308            // effective desired count (not `stop.len()`, which conflates
2309            // scale-down drain with TD-drain and would over-spawn).
2310            if new_deployment_triggered {
2311                let new_td_arn_match = state
2312                    .services
2313                    .get(&key)
2314                    .unwrap()
2315                    .task_definition_arn
2316                    .clone();
2317                // Tasks already on the new task definition that we're NOT
2318                // stopping (scale-down may have picked the first N; those
2319                // are skipped here via `stop.contains(id)`).
2320                let kept_on_new_td: i32 = current_tasks
2321                    .iter()
2322                    .filter(|(id, t_arn)| *t_arn == new_td_arn_match && !stop.contains(id))
2323                    .count() as i32;
2324                for (id, t_arn) in &current_tasks {
2325                    if *t_arn != new_td_arn_match && !stop.contains(id) {
2326                        stop.push(id.clone());
2327                    }
2328                }
2329                let already_spawned = spawn.len() as i32;
2330                let need = (effective_desired - kept_on_new_td - already_spawned).max(0);
2331                if need > 0 {
2332                    let svc_snapshot = state.services.get(&key).unwrap().clone();
2333                    let mut more = spawn_service_tasks(
2334                        state,
2335                        &svc_snapshot,
2336                        need,
2337                        &principal_arn,
2338                        &launch_type_clone,
2339                    );
2340                    spawn.append(&mut more);
2341                }
2342            }
2343
2344            state.push_event(crate::state::LifecycleEvent {
2345                at: Utc::now(),
2346                event_type: "ServiceUpdated".into(),
2347                task_arn: None,
2348                cluster_arn: Some(service_cluster_arn),
2349                last_status: Some("ACTIVE".into()),
2350                detail: json!({
2351                    "serviceArn": state.services.get(&key).unwrap().service_arn,
2352                    "desiredCount": effective_desired,
2353                    "previousDesiredCount": old_desired,
2354                    "newDeployment": new_deployment_triggered,
2355                    "drainedDeployments": old_deployments_drained,
2356                }),
2357            });
2358
2359            let svc = state.services.get(&key).unwrap();
2360            (service_to_json(svc), spawn, stop)
2361        };
2362
2363        if let Some(rt) = runtime {
2364            for id in spawn_ids {
2365                rt.clone().run_task(self.state.clone(), id, account.clone());
2366            }
2367            for id in stop_ids {
2368                let rt2 = rt.clone();
2369                let id_clone = id.clone();
2370                tokio::spawn(async move {
2371                    rt2.stop_task(&id_clone, "ECS service scale-down").await;
2372                });
2373                // Flip the task's desired status synchronously so DescribeTasks reflects intent.
2374                let mut accounts = self.state.write();
2375                if let Some(state) = accounts.get_mut(&account) {
2376                    if let Some(task) = state.tasks.get_mut(&id) {
2377                        task.desired_status = "STOPPED".into();
2378                        task.stopping_at = Some(Utc::now());
2379                    }
2380                }
2381            }
2382        }
2383
2384        Ok(AwsResponse::ok_json(json!({ "service": service_json })))
2385    }
2386
2387    async fn delete_service(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2388        let body = request.json_body();
2389        let service_ref = req_str(&body, "service")?;
2390        let service_name = service_name_from_ref(service_ref);
2391        let cluster_ref = opt_str(&body, "cluster");
2392        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
2393        let force = body.get("force").and_then(|v| v.as_bool()).unwrap_or(false);
2394
2395        let (snapshot, task_ids_to_stop) = {
2396            let mut accounts = self.state.write();
2397            let state = accounts
2398                .get_mut(&request.account_id)
2399                .ok_or_else(|| service_not_found(&service_name))?;
2400            let key = EcsState::service_key(&cluster_name, &service_name);
2401            let svc = state
2402                .services
2403                .get_mut(&key)
2404                .ok_or_else(|| service_not_found(&service_name))?;
2405            if !force && svc.desired_count > 0 {
2406                return Err(client_exception(
2407                    "The service cannot be stopped while it is scaled above 0. \
2408                     Either set desiredCount to 0 first, or pass force=true.",
2409                ));
2410            }
2411            svc.desired_count = 0;
2412            svc.status = "DRAINING".into();
2413            let service_tag = format!("ecs-svc/{}", service_name);
2414            let stop_ids: Vec<String> = state
2415                .tasks
2416                .iter()
2417                .filter(|(_, t)| {
2418                    t.started_by.as_deref() == Some(service_tag.as_str())
2419                        && t.cluster_name == cluster_name
2420                        && t.last_status != "STOPPED"
2421                })
2422                .map(|(id, _)| id.clone())
2423                .collect();
2424            if let Some(cluster) = state.clusters.get_mut(&cluster_name) {
2425                if cluster.active_services_count > 0 {
2426                    cluster.active_services_count -= 1;
2427                }
2428            }
2429            let svc_snapshot = state.services.get(&key).unwrap().clone();
2430            state.services.remove(&key);
2431            state.push_event(crate::state::LifecycleEvent {
2432                at: Utc::now(),
2433                event_type: "ServiceDeleted".into(),
2434                task_arn: None,
2435                cluster_arn: Some(svc_snapshot.cluster_arn.clone()),
2436                last_status: Some("DRAINING".into()),
2437                detail: json!({"serviceArn": svc_snapshot.service_arn}),
2438            });
2439            (svc_snapshot, stop_ids)
2440        };
2441
2442        if let Some(rt) = &self.runtime {
2443            for id in &task_ids_to_stop {
2444                rt.stop_task(id, "ECS service deletion").await;
2445                let mut accounts = self.state.write();
2446                if let Some(state) = accounts.get_mut(&request.account_id) {
2447                    if let Some(task) = state.tasks.get_mut(id) {
2448                        task.desired_status = "STOPPED".into();
2449                        task.stopping_at = Some(Utc::now());
2450                    }
2451                }
2452            }
2453        }
2454
2455        Ok(AwsResponse::ok_json(
2456            json!({ "service": service_to_json(&snapshot) }),
2457        ))
2458    }
2459
2460    fn describe_services(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2461        let body = request.json_body();
2462        let cluster_ref = opt_str(&body, "cluster");
2463        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
2464        let refs: Vec<String> = body
2465            .get("services")
2466            .and_then(|v| v.as_array())
2467            .map(|arr| {
2468                arr.iter()
2469                    .filter_map(|v| v.as_str().map(String::from))
2470                    .collect()
2471            })
2472            .unwrap_or_default();
2473
2474        let account = request.account_id.clone();
2475        let accounts = self.state.read();
2476        let mut found = Vec::new();
2477        let mut failures = Vec::new();
2478        let Some(state) = accounts.get(&account) else {
2479            for r in &refs {
2480                failures.push(json!({"arn": r, "reason": "MISSING"}));
2481            }
2482            return Ok(AwsResponse::ok_json(
2483                json!({"services": found, "failures": failures}),
2484            ));
2485        };
2486        for r in &refs {
2487            let name = service_name_from_ref(r);
2488            let key = EcsState::service_key(&cluster_name, &name);
2489            match state.services.get(&key) {
2490                Some(svc) => {
2491                    let mut v = service_to_json(svc);
2492                    // Update derived running/pending counts from current tasks.
2493                    recompute_service_counts(state, &name, &cluster_name, &mut v);
2494                    found.push(v);
2495                }
2496                None => failures.push(json!({"arn": r, "reason": "MISSING"})),
2497            }
2498        }
2499        Ok(AwsResponse::ok_json(json!({
2500            "services": found,
2501            "failures": failures,
2502        })))
2503    }
2504
2505    fn list_services(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2506        let body = request.json_body();
2507        let cluster_ref = opt_str(&body, "cluster");
2508        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
2509        let launch_type = opt_str(&body, "launchType");
2510        let scheduling = opt_str(&body, "schedulingStrategy");
2511        let max_results = body
2512            .get("maxResults")
2513            .and_then(|v| v.as_i64())
2514            .filter(|n| (1..=100).contains(n))
2515            .map(|n| n as usize)
2516            .unwrap_or(100);
2517        let next_token = opt_str(&body, "nextToken").unwrap_or("");
2518
2519        let account = request.account_id.clone();
2520        let accounts = self.state.read();
2521        let mut arns: Vec<String> = match accounts.get(&account) {
2522            Some(state) => state
2523                .services
2524                .values()
2525                .filter(|s| s.cluster_name == cluster_name)
2526                .filter(|s| launch_type.is_none_or(|lt| s.launch_type == lt))
2527                .filter(|s| scheduling.is_none_or(|sc| s.scheduling_strategy == sc))
2528                .map(|s| s.service_arn.clone())
2529                .collect(),
2530            None => Vec::new(),
2531        };
2532        arns.sort();
2533        let start = next_token.parse::<usize>().unwrap_or(0).min(arns.len());
2534        let end = (start + max_results).min(arns.len());
2535        let page = arns[start..end].to_vec();
2536        let mut out = json!({"serviceArns": page});
2537        if end < arns.len() {
2538            out.as_object_mut()
2539                .unwrap()
2540                .insert("nextToken".into(), json!(end.to_string()));
2541        }
2542        Ok(AwsResponse::ok_json(out))
2543    }
2544
2545    fn list_services_by_namespace(
2546        &self,
2547        request: &AwsRequest,
2548    ) -> Result<AwsResponse, AwsServiceError> {
2549        // fakecloud doesn't model Cloud Map namespaces yet — return all
2550        // services attached to services with registries pointing at the
2551        // given namespace ARN. Filter is loose: treat the namespace as a
2552        // hint and return every service when none match to mirror AWS's
2553        // "loose match when ambiguous" response shape.
2554        let body = request.json_body();
2555        let namespace = req_str(&body, "namespace")?.to_string();
2556        let account = request.account_id.clone();
2557        let accounts = self.state.read();
2558        let mut arns: Vec<String> = match accounts.get(&account) {
2559            Some(state) => state
2560                .services
2561                .values()
2562                .filter(|s| {
2563                    s.service_registries.iter().any(|r| {
2564                        r.get("registryArn")
2565                            .and_then(|v| v.as_str())
2566                            .is_some_and(|arn| arn.contains(&namespace))
2567                    })
2568                })
2569                .map(|s| s.service_arn.clone())
2570                .collect(),
2571            None => Vec::new(),
2572        };
2573        arns.sort();
2574        Ok(AwsResponse::ok_json(json!({"serviceArns": arns})))
2575    }
2576}
2577
2578fn validate_service_name(name: &str) -> Result<(), AwsServiceError> {
2579    if name.is_empty() || name.len() > 255 {
2580        return Err(invalid_parameter("Service name must be 1-255 characters"));
2581    }
2582    let ok = name
2583        .chars()
2584        .all(|c| c.is_ascii_alphanumeric() || c == '_' || c == '-');
2585    if !ok {
2586        return Err(invalid_parameter(
2587            "Service name may only contain letters, numbers, hyphens, and underscores",
2588        ));
2589    }
2590    Ok(())
2591}
2592
2593fn service_name_from_ref(input: &str) -> String {
2594    if let Some(rest) = input.rsplit('/').next() {
2595        return rest.to_string();
2596    }
2597    input.to_string()
2598}
2599
2600fn service_not_found(name: &str) -> AwsServiceError {
2601    AwsServiceError::aws_error(
2602        StatusCode::BAD_REQUEST,
2603        "ServiceNotFoundException",
2604        format!("The service could not be found: {name}"),
2605    )
2606}
2607
2608fn service_already_exists(name: &str) -> AwsServiceError {
2609    AwsServiceError::aws_error(
2610        StatusCode::BAD_REQUEST,
2611        "ServiceNotActiveException",
2612        format!("The service {name} already exists"),
2613    )
2614}
2615
2616/// Spawn N tasks for a service by cloning the task-definition containers
2617/// and inserting `Task` rows in the shared state. The task IDs are
2618/// returned so the caller can hand them to `EcsRuntime::run_task` after
2619/// releasing the state write lock.
2620fn spawn_service_tasks(
2621    state: &mut EcsState,
2622    service: &Service,
2623    count: i32,
2624    principal_arn: &str,
2625    launch_type: &str,
2626) -> Vec<String> {
2627    if count <= 0 {
2628        return Vec::new();
2629    }
2630    let Some(revisions) = state.task_definitions.get(&service.family) else {
2631        return Vec::new();
2632    };
2633    let Some(td) = revisions.get(&service.revision) else {
2634        return Vec::new();
2635    };
2636    let container_defs = td.container_definitions.clone();
2637    let cpu = td.cpu.clone();
2638    let memory = td.memory.clone();
2639    let task_role = td.task_role_arn.clone();
2640    let exec_role = td.execution_role_arn.clone();
2641    let cluster_name = service.cluster_name.clone();
2642    let cluster_arn = service.cluster_arn.clone();
2643    let td_arn = service.task_definition_arn.clone();
2644    let family = service.family.clone();
2645    let revision = service.revision;
2646    let service_tag = format!("ecs-svc/{}", service.service_name);
2647
2648    let mut ids = Vec::with_capacity(count as usize);
2649    for _ in 0..count {
2650        let task_id = uuid::Uuid::new_v4().to_string().replace('-', "");
2651        let task_arn = state.task_arn(&cluster_name, &task_id);
2652        let containers: Vec<Container> = container_defs
2653            .iter()
2654            .map(|def| Container {
2655                container_arn: format!(
2656                    "arn:aws:ecs:{}:{}:container/{}/{}/{}",
2657                    state.region,
2658                    state.account_id,
2659                    cluster_name,
2660                    task_id,
2661                    def.get("name").and_then(|v| v.as_str()).unwrap_or("app")
2662                ),
2663                name: def
2664                    .get("name")
2665                    .and_then(|v| v.as_str())
2666                    .unwrap_or("app")
2667                    .to_string(),
2668                image: def
2669                    .get("image")
2670                    .and_then(|v| v.as_str())
2671                    .unwrap_or("")
2672                    .to_string(),
2673                task_arn: task_arn.clone(),
2674                last_status: "PENDING".into(),
2675                exit_code: None,
2676                reason: None,
2677                runtime_id: None,
2678                essential: def
2679                    .get("essential")
2680                    .and_then(|v| v.as_bool())
2681                    .unwrap_or(true),
2682                cpu: def
2683                    .get("cpu")
2684                    .and_then(|v| v.as_i64())
2685                    .map(|n| n.to_string()),
2686                memory: def
2687                    .get("memory")
2688                    .and_then(|v| v.as_i64())
2689                    .map(|n| n.to_string()),
2690                memory_reservation: def
2691                    .get("memoryReservation")
2692                    .and_then(|v| v.as_i64())
2693                    .map(|n| n.to_string()),
2694                network_bindings: Vec::new(),
2695                network_interfaces: Vec::new(),
2696                health_status: Some("UNKNOWN".into()),
2697                managed_agents: None,
2698            })
2699            .collect();
2700        let awslogs = container_defs.iter().find_map(|def| {
2701            let name = def.get("name").and_then(|v| v.as_str())?.to_string();
2702            let log_cfg = def.get("logConfiguration")?;
2703            if log_cfg.get("logDriver").and_then(|v| v.as_str()) != Some("awslogs") {
2704                return None;
2705            }
2706            let opts = log_cfg.get("options").and_then(|v| v.as_object())?;
2707            Some(AwsLogsConfig {
2708                group: opts.get("awslogs-group").and_then(|v| v.as_str())?.into(),
2709                stream_prefix: opts
2710                    .get("awslogs-stream-prefix")
2711                    .and_then(|v| v.as_str())
2712                    .map(String::from),
2713                region: opts
2714                    .get("awslogs-region")
2715                    .and_then(|v| v.as_str())
2716                    .unwrap_or(&state.region)
2717                    .to_string(),
2718                container_name: name,
2719            })
2720        });
2721        let task = Task {
2722            task_arn: task_arn.clone(),
2723            task_id: task_id.clone(),
2724            cluster_arn: cluster_arn.clone(),
2725            cluster_name: cluster_name.clone(),
2726            task_definition_arn: td_arn.clone(),
2727            family: family.clone(),
2728            revision,
2729            last_status: "PENDING".into(),
2730            desired_status: "RUNNING".into(),
2731            launch_type: launch_type.into(),
2732            platform_version: Some("1.4.0".into()),
2733            cpu: cpu.clone(),
2734            memory: memory.clone(),
2735            containers,
2736            overrides: json!({}),
2737            started_by: Some(service_tag.clone()),
2738            group: Some(format!("service:{}", service.service_name)),
2739            connectivity: "CONNECTING".into(),
2740            stop_code: None,
2741            stopped_reason: None,
2742            created_at: Utc::now(),
2743            started_at: None,
2744            stopping_at: None,
2745            stopped_at: None,
2746            pull_started_at: None,
2747            pull_stopped_at: None,
2748            connectivity_at: None,
2749            started_by_ref_id: None,
2750            execution_role_arn: exec_role.clone(),
2751            task_role_arn: task_role.clone(),
2752            tags: Vec::new(),
2753            awslogs,
2754            captured_logs: String::new(),
2755            protection: None,
2756        };
2757        state.tasks.insert(task_id.clone(), task);
2758        if let Some(cluster) = state.clusters.get_mut(&cluster_name) {
2759            cluster.pending_tasks_count += 1;
2760        }
2761        ids.push(task_id);
2762    }
2763    let _ = principal_arn;
2764    ids
2765}
2766
2767fn recompute_service_counts(
2768    state: &EcsState,
2769    service_name: &str,
2770    cluster_name: &str,
2771    service_json: &mut Value,
2772) {
2773    let service_tag = format!("ecs-svc/{}", service_name);
2774    let mut running = 0i32;
2775    let mut pending = 0i32;
2776    for t in state.tasks.values() {
2777        if t.started_by.as_deref() == Some(service_tag.as_str()) && t.cluster_name == cluster_name {
2778            match t.last_status.as_str() {
2779                "RUNNING" => running += 1,
2780                "PENDING" | "PROVISIONING" => pending += 1,
2781                _ => {}
2782            }
2783        }
2784    }
2785    if let Some(map) = service_json.as_object_mut() {
2786        map.insert("runningCount".into(), json!(running));
2787        map.insert("pendingCount".into(), json!(pending));
2788    }
2789}
2790
2791fn service_to_json(svc: &Service) -> Value {
2792    let mut map = serde_json::Map::new();
2793    map.insert("serviceArn".into(), json!(svc.service_arn));
2794    map.insert("serviceName".into(), json!(svc.service_name));
2795    map.insert("clusterArn".into(), json!(svc.cluster_arn));
2796    map.insert("status".into(), json!(svc.status));
2797    map.insert("desiredCount".into(), json!(svc.desired_count));
2798    map.insert("runningCount".into(), json!(svc.running_count));
2799    map.insert("pendingCount".into(), json!(svc.pending_count));
2800    map.insert("launchType".into(), json!(svc.launch_type));
2801    map.insert("schedulingStrategy".into(), json!(svc.scheduling_strategy));
2802    map.insert("taskDefinition".into(), json!(svc.task_definition_arn));
2803    map.insert(
2804        "deploymentController".into(),
2805        json!({"type": svc.deployment_controller}),
2806    );
2807    let mut deployment_cfg = serde_json::Map::new();
2808    if let Some(n) = svc.minimum_healthy_percent {
2809        deployment_cfg.insert("minimumHealthyPercent".into(), json!(n));
2810    }
2811    if let Some(n) = svc.maximum_percent {
2812        deployment_cfg.insert("maximumPercent".into(), json!(n));
2813    }
2814    if let Some(ref cb) = svc.circuit_breaker {
2815        deployment_cfg.insert(
2816            "deploymentCircuitBreaker".into(),
2817            json!({"enable": cb.enable, "rollback": cb.rollback}),
2818        );
2819    }
2820    if !deployment_cfg.is_empty() {
2821        map.insert(
2822            "deploymentConfiguration".into(),
2823            Value::Object(deployment_cfg),
2824        );
2825    }
2826    map.insert(
2827        "deployments".into(),
2828        Value::Array(svc.deployments.iter().map(deployment_to_json).collect()),
2829    );
2830    map.insert(
2831        "loadBalancers".into(),
2832        Value::Array(svc.load_balancers.clone()),
2833    );
2834    map.insert(
2835        "serviceRegistries".into(),
2836        Value::Array(svc.service_registries.clone()),
2837    );
2838    map.insert(
2839        "placementConstraints".into(),
2840        Value::Array(svc.placement_constraints.clone()),
2841    );
2842    map.insert(
2843        "placementStrategy".into(),
2844        Value::Array(svc.placement_strategy.clone()),
2845    );
2846    if let Some(ref v) = svc.network_configuration {
2847        map.insert("networkConfiguration".into(), v.clone());
2848    }
2849    if let Some(ref v) = svc.role_arn {
2850        map.insert("roleArn".into(), json!(v));
2851    }
2852    if let Some(ref v) = svc.created_by {
2853        map.insert("createdBy".into(), json!(v));
2854    }
2855    map.insert("createdAt".into(), json!(svc.created_at.timestamp()));
2856    Value::Object(map)
2857}
2858
2859fn deployment_to_json(d: &Deployment) -> Value {
2860    json!({
2861        "id": d.deployment_id,
2862        "status": d.status,
2863        "taskDefinition": d.task_definition_arn,
2864        "desiredCount": d.desired_count,
2865        "pendingCount": d.pending_count,
2866        "runningCount": d.running_count,
2867        "failedTasks": d.failed_tasks,
2868        "createdAt": d.created_at.timestamp(),
2869        "updatedAt": d.updated_at.timestamp(),
2870        "launchType": d.launch_type,
2871        "rolloutState": d.rollout_state,
2872        "rolloutStateReason": d.rollout_state_reason,
2873    })
2874}
2875
2876// -------- operations: container instances, attributes, capacity providers, task sets, task protection, ExecuteCommand, agent surface --------
2877
2878impl EcsService {
2879    fn register_container_instance(
2880        &self,
2881        request: &AwsRequest,
2882    ) -> Result<AwsResponse, AwsServiceError> {
2883        let body = request.json_body();
2884        let cluster_ref = opt_str(&body, "cluster");
2885        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
2886        let ec2_id = opt_str(&body, "instanceIdentityDocument")
2887            .and_then(|s| serde_json::from_str::<Value>(s).ok())
2888            .and_then(|v| {
2889                v.get("instanceId")
2890                    .and_then(|x| x.as_str())
2891                    .map(String::from)
2892            });
2893        let tags = parse_tags(&body);
2894
2895        let account = request.account_id.clone();
2896        let mut accounts = self.state.write();
2897        let state = accounts.get_or_create(&account);
2898        let cluster_arn = state
2899            .clusters
2900            .get(&cluster_name)
2901            .map(|c| c.cluster_arn.clone())
2902            .unwrap_or_else(|| state.cluster_arn(&cluster_name));
2903        let uuid = uuid::Uuid::new_v4().to_string();
2904        let ci_arn = state.container_instance_arn(&cluster_name, &uuid);
2905        let key = format!("{}/{}", cluster_name, uuid);
2906        let ci = ContainerInstance {
2907            container_instance_arn: ci_arn.clone(),
2908            ec2_instance_id: ec2_id,
2909            cluster_name: cluster_name.clone(),
2910            cluster_arn,
2911            status: "ACTIVE".into(),
2912            version: 1,
2913            version_info: body.get("versionInfo").cloned(),
2914            agent_connected: true,
2915            agent_update_status: None,
2916            remaining_resources: body
2917                .get("totalResources")
2918                .and_then(|v| v.as_array())
2919                .cloned()
2920                .unwrap_or_default(),
2921            registered_resources: body
2922                .get("totalResources")
2923                .and_then(|v| v.as_array())
2924                .cloned()
2925                .unwrap_or_default(),
2926            running_tasks_count: 0,
2927            pending_tasks_count: 0,
2928            registered_at: Utc::now(),
2929            attributes: body
2930                .get("attributes")
2931                .and_then(|v| v.as_array())
2932                .map(|arr| {
2933                    arr.iter()
2934                        .filter_map(|a| {
2935                            let name = a.get("name").and_then(|v| v.as_str())?;
2936                            Some(AttributeRef {
2937                                name: name.to_string(),
2938                                value: a.get("value").and_then(|v| v.as_str()).map(String::from),
2939                                target_type: a
2940                                    .get("targetType")
2941                                    .and_then(|v| v.as_str())
2942                                    .map(String::from),
2943                                target_id: a
2944                                    .get("targetId")
2945                                    .and_then(|v| v.as_str())
2946                                    .map(String::from),
2947                            })
2948                        })
2949                        .collect()
2950                })
2951                .unwrap_or_default(),
2952            tags,
2953            capacity_provider_name: None,
2954            health_status: None,
2955        };
2956        state.container_instances.insert(key, ci.clone());
2957        if let Some(cluster) = state.clusters.get_mut(&cluster_name) {
2958            cluster.registered_container_instances_count += 1;
2959        }
2960        Ok(AwsResponse::ok_json(json!({
2961            "containerInstance": container_instance_to_json(&ci),
2962        })))
2963    }
2964
2965    fn deregister_container_instance(
2966        &self,
2967        request: &AwsRequest,
2968    ) -> Result<AwsResponse, AwsServiceError> {
2969        let body = request.json_body();
2970        let ci_ref = req_str(&body, "containerInstance")?.to_string();
2971        let cluster_ref = opt_str(&body, "cluster");
2972        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
2973        let id = container_instance_id_from_ref(&ci_ref);
2974        let key = format!("{}/{}", cluster_name, id);
2975
2976        let account = request.account_id.clone();
2977        let mut accounts = self.state.write();
2978        let state = accounts
2979            .get_mut(&account)
2980            .ok_or_else(|| container_instance_not_found(&ci_ref))?;
2981        let mut ci = state
2982            .container_instances
2983            .remove(&key)
2984            .ok_or_else(|| container_instance_not_found(&ci_ref))?;
2985        ci.status = "INACTIVE".into();
2986        if let Some(cluster) = state.clusters.get_mut(&cluster_name) {
2987            if cluster.registered_container_instances_count > 0 {
2988                cluster.registered_container_instances_count -= 1;
2989            }
2990        }
2991        Ok(AwsResponse::ok_json(json!({
2992            "containerInstance": container_instance_to_json(&ci),
2993        })))
2994    }
2995
2996    fn describe_container_instances(
2997        &self,
2998        request: &AwsRequest,
2999    ) -> Result<AwsResponse, AwsServiceError> {
3000        let body = request.json_body();
3001        let cluster_ref = opt_str(&body, "cluster");
3002        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3003        let refs: Vec<String> = body
3004            .get("containerInstances")
3005            .and_then(|v| v.as_array())
3006            .map(|arr| {
3007                arr.iter()
3008                    .filter_map(|v| v.as_str().map(String::from))
3009                    .collect()
3010            })
3011            .unwrap_or_default();
3012
3013        let accounts = self.state.read();
3014        let mut found = Vec::new();
3015        let mut failures = Vec::new();
3016        if let Some(state) = accounts.get(&request.account_id) {
3017            for r in &refs {
3018                let id = container_instance_id_from_ref(r);
3019                let key = format!("{}/{}", cluster_name, id);
3020                match state.container_instances.get(&key) {
3021                    Some(ci) => found.push(container_instance_to_json(ci)),
3022                    None => failures.push(json!({"arn": r, "reason": "MISSING"})),
3023                }
3024            }
3025        } else {
3026            for r in &refs {
3027                failures.push(json!({"arn": r, "reason": "MISSING"}));
3028            }
3029        }
3030        Ok(AwsResponse::ok_json(json!({
3031            "containerInstances": found,
3032            "failures": failures,
3033        })))
3034    }
3035
3036    fn list_container_instances(
3037        &self,
3038        request: &AwsRequest,
3039    ) -> Result<AwsResponse, AwsServiceError> {
3040        let body = request.json_body();
3041        let cluster_ref = opt_str(&body, "cluster");
3042        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3043        let status_filter = opt_str(&body, "status");
3044        let max_results = body
3045            .get("maxResults")
3046            .and_then(|v| v.as_i64())
3047            .filter(|n| (1..=100).contains(n))
3048            .map(|n| n as usize)
3049            .unwrap_or(100);
3050        let next_token = opt_str(&body, "nextToken").unwrap_or("");
3051
3052        let accounts = self.state.read();
3053        let mut arns: Vec<String> = match accounts.get(&request.account_id) {
3054            Some(state) => state
3055                .container_instances
3056                .values()
3057                .filter(|ci| ci.cluster_name == cluster_name)
3058                .filter(|ci| status_filter.is_none_or(|s| ci.status == s))
3059                .map(|ci| ci.container_instance_arn.clone())
3060                .collect(),
3061            None => Vec::new(),
3062        };
3063        arns.sort();
3064        let start = next_token.parse::<usize>().unwrap_or(0).min(arns.len());
3065        let end = (start + max_results).min(arns.len());
3066        let page = arns[start..end].to_vec();
3067        let mut out = json!({"containerInstanceArns": page});
3068        if end < arns.len() {
3069            out.as_object_mut()
3070                .unwrap()
3071                .insert("nextToken".into(), json!(end.to_string()));
3072        }
3073        Ok(AwsResponse::ok_json(out))
3074    }
3075
3076    fn update_container_agent(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3077        let body = request.json_body();
3078        let ci_ref = req_str(&body, "containerInstance")?.to_string();
3079        let cluster_ref = opt_str(&body, "cluster");
3080        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3081        let id = container_instance_id_from_ref(&ci_ref);
3082        let key = format!("{}/{}", cluster_name, id);
3083        let mut accounts = self.state.write();
3084        let state = accounts
3085            .get_mut(&request.account_id)
3086            .ok_or_else(|| container_instance_not_found(&ci_ref))?;
3087        let ci = state
3088            .container_instances
3089            .get_mut(&key)
3090            .ok_or_else(|| container_instance_not_found(&ci_ref))?;
3091        ci.agent_update_status = Some("UPDATED".into());
3092        Ok(AwsResponse::ok_json(json!({
3093            "containerInstance": container_instance_to_json(ci),
3094        })))
3095    }
3096
3097    fn update_container_instances_state(
3098        &self,
3099        request: &AwsRequest,
3100    ) -> Result<AwsResponse, AwsServiceError> {
3101        let body = request.json_body();
3102        let status = req_str(&body, "status")?.to_string();
3103        let cluster_ref = opt_str(&body, "cluster");
3104        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3105        let refs: Vec<String> = body
3106            .get("containerInstances")
3107            .and_then(|v| v.as_array())
3108            .map(|arr| {
3109                arr.iter()
3110                    .filter_map(|v| v.as_str().map(String::from))
3111                    .collect()
3112            })
3113            .unwrap_or_default();
3114        let mut accounts = self.state.write();
3115        let state = accounts
3116            .get_mut(&request.account_id)
3117            .ok_or_else(|| client_exception("account not found"))?;
3118        let mut found = Vec::new();
3119        let mut failures = Vec::new();
3120        for r in &refs {
3121            let id = container_instance_id_from_ref(r);
3122            let key = format!("{}/{}", cluster_name, id);
3123            match state.container_instances.get_mut(&key) {
3124                Some(ci) => {
3125                    ci.status = status.clone();
3126                    found.push(container_instance_to_json(ci));
3127                }
3128                None => failures.push(json!({"arn": r, "reason": "MISSING"})),
3129            }
3130        }
3131        Ok(AwsResponse::ok_json(json!({
3132            "containerInstances": found,
3133            "failures": failures,
3134        })))
3135    }
3136
3137    fn put_attributes(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3138        let body = request.json_body();
3139        let cluster_ref = opt_str(&body, "cluster");
3140        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3141        let attrs = body
3142            .get("attributes")
3143            .and_then(|v| v.as_array())
3144            .cloned()
3145            .unwrap_or_default();
3146
3147        let mut accounts = self.state.write();
3148        let state = accounts.get_or_create(&request.account_id);
3149        let mut stored = Vec::new();
3150        for a in &attrs {
3151            let Some(name) = a.get("name").and_then(|v| v.as_str()) else {
3152                continue;
3153            };
3154            let target_type = a
3155                .get("targetType")
3156                .and_then(|v| v.as_str())
3157                .unwrap_or("container-instance")
3158                .to_string();
3159            let target_id = a
3160                .get("targetId")
3161                .and_then(|v| v.as_str())
3162                .unwrap_or("")
3163                .to_string();
3164            let value = a.get("value").and_then(|v| v.as_str()).map(String::from);
3165            let key = format!("{}/{}/{}", cluster_name, target_id, name);
3166            let attr = Attribute {
3167                cluster_name: cluster_name.clone(),
3168                target_type: target_type.clone(),
3169                target_id: target_id.clone(),
3170                name: name.to_string(),
3171                value: value.clone(),
3172            };
3173            state.attributes.insert(key, attr);
3174            stored.push(json!({
3175                "name": name,
3176                "value": value,
3177                "targetType": target_type,
3178                "targetId": target_id,
3179            }));
3180        }
3181        Ok(AwsResponse::ok_json(json!({"attributes": stored})))
3182    }
3183
3184    fn delete_attributes(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3185        let body = request.json_body();
3186        let cluster_ref = opt_str(&body, "cluster");
3187        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3188        let attrs = body
3189            .get("attributes")
3190            .and_then(|v| v.as_array())
3191            .cloned()
3192            .unwrap_or_default();
3193        let mut accounts = self.state.write();
3194        let state = accounts.get_or_create(&request.account_id);
3195        let mut deleted = Vec::new();
3196        for a in &attrs {
3197            let Some(name) = a.get("name").and_then(|v| v.as_str()) else {
3198                continue;
3199            };
3200            let target_id = a.get("targetId").and_then(|v| v.as_str()).unwrap_or("");
3201            let key = format!("{}/{}/{}", cluster_name, target_id, name);
3202            if let Some(attr) = state.attributes.remove(&key) {
3203                deleted.push(json!({
3204                    "name": attr.name,
3205                    "value": attr.value,
3206                    "targetType": attr.target_type,
3207                    "targetId": attr.target_id,
3208                }));
3209            }
3210        }
3211        Ok(AwsResponse::ok_json(json!({"attributes": deleted})))
3212    }
3213
3214    fn list_attributes(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3215        let body = request.json_body();
3216        let cluster_ref = opt_str(&body, "cluster");
3217        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3218        let target_type = req_str(&body, "targetType")?.to_string();
3219        let attr_name = opt_str(&body, "attributeName");
3220        let attr_value = opt_str(&body, "attributeValue");
3221
3222        let accounts = self.state.read();
3223        let attrs: Vec<Value> = match accounts.get(&request.account_id) {
3224            Some(state) => state
3225                .attributes
3226                .values()
3227                .filter(|a| a.cluster_name == cluster_name)
3228                .filter(|a| a.target_type == target_type)
3229                .filter(|a| attr_name.is_none_or(|n| a.name == n))
3230                .filter(|a| attr_value.is_none_or(|v| a.value.as_deref() == Some(v)))
3231                .map(|a| {
3232                    json!({
3233                        "name": a.name,
3234                        "value": a.value,
3235                        "targetType": a.target_type,
3236                        "targetId": a.target_id,
3237                    })
3238                })
3239                .collect(),
3240            None => Vec::new(),
3241        };
3242        Ok(AwsResponse::ok_json(json!({"attributes": attrs})))
3243    }
3244
3245    fn create_capacity_provider(
3246        &self,
3247        request: &AwsRequest,
3248    ) -> Result<AwsResponse, AwsServiceError> {
3249        let body = request.json_body();
3250        let name = req_str(&body, "name")?.to_string();
3251        if name.starts_with("aws") || name.starts_with("ecs") {
3252            return Err(invalid_parameter(format!(
3253                "Capacity provider name cannot begin with 'aws' or 'ecs': {name}"
3254            )));
3255        }
3256        let auto_scaling_group_provider = body.get("autoScalingGroupProvider").cloned();
3257        let tags = parse_tags(&body);
3258
3259        let mut accounts = self.state.write();
3260        let state = accounts.get_or_create(&request.account_id);
3261        if state.capacity_providers.contains_key(&name) {
3262            return Err(client_exception(format!(
3263                "Capacity provider already exists: {name}"
3264            )));
3265        }
3266        let arn = format!(
3267            "arn:aws:ecs:{}:{}:capacity-provider/{}",
3268            state.region, state.account_id, name
3269        );
3270        let cp = CapacityProvider {
3271            name: name.clone(),
3272            arn,
3273            status: "ACTIVE".into(),
3274            auto_scaling_group_provider,
3275            update_status: None,
3276            update_status_reason: None,
3277            created_at: Utc::now(),
3278            tags,
3279        };
3280        state.capacity_providers.insert(name.clone(), cp.clone());
3281        Ok(AwsResponse::ok_json(json!({
3282            "capacityProvider": capacity_provider_to_json(&cp),
3283        })))
3284    }
3285
3286    fn delete_capacity_provider(
3287        &self,
3288        request: &AwsRequest,
3289    ) -> Result<AwsResponse, AwsServiceError> {
3290        let body = request.json_body();
3291        let input = req_str(&body, "capacityProvider")?.to_string();
3292        let name = capacity_provider_name_from_ref(&input);
3293        let mut accounts = self.state.write();
3294        let state = accounts
3295            .get_mut(&request.account_id)
3296            .ok_or_else(|| capacity_provider_not_found(&name))?;
3297        let mut cp = state
3298            .capacity_providers
3299            .remove(&name)
3300            .ok_or_else(|| capacity_provider_not_found(&name))?;
3301        cp.status = "INACTIVE".into();
3302        Ok(AwsResponse::ok_json(json!({
3303            "capacityProvider": capacity_provider_to_json(&cp),
3304        })))
3305    }
3306
3307    fn describe_capacity_providers(
3308        &self,
3309        request: &AwsRequest,
3310    ) -> Result<AwsResponse, AwsServiceError> {
3311        let body = request.json_body();
3312        let names: Vec<String> = body
3313            .get("capacityProviders")
3314            .and_then(|v| v.as_array())
3315            .map(|arr| {
3316                arr.iter()
3317                    .filter_map(|v| v.as_str().map(capacity_provider_name_from_ref))
3318                    .collect()
3319            })
3320            .unwrap_or_default();
3321        let accounts = self.state.read();
3322        let mut found = Vec::new();
3323        let mut failures = Vec::new();
3324        if let Some(state) = accounts.get(&request.account_id) {
3325            if names.is_empty() {
3326                for cp in state.capacity_providers.values() {
3327                    found.push(capacity_provider_to_json(cp));
3328                }
3329            } else {
3330                for n in &names {
3331                    match state.capacity_providers.get(n) {
3332                        Some(cp) => found.push(capacity_provider_to_json(cp)),
3333                        None => failures.push(json!({"arn": n, "reason": "MISSING"})),
3334                    }
3335                }
3336            }
3337        }
3338        Ok(AwsResponse::ok_json(json!({
3339            "capacityProviders": found,
3340            "failures": failures,
3341        })))
3342    }
3343
3344    fn update_capacity_provider(
3345        &self,
3346        request: &AwsRequest,
3347    ) -> Result<AwsResponse, AwsServiceError> {
3348        let body = request.json_body();
3349        let input = req_str(&body, "name")?.to_string();
3350        let name = capacity_provider_name_from_ref(&input);
3351        let asg = body.get("autoScalingGroupProvider").cloned();
3352        let mut accounts = self.state.write();
3353        let state = accounts
3354            .get_mut(&request.account_id)
3355            .ok_or_else(|| capacity_provider_not_found(&name))?;
3356        let cp = state
3357            .capacity_providers
3358            .get_mut(&name)
3359            .ok_or_else(|| capacity_provider_not_found(&name))?;
3360        if let Some(v) = asg {
3361            cp.auto_scaling_group_provider = Some(v);
3362        }
3363        cp.update_status = Some("UPDATE_COMPLETE".into());
3364        Ok(AwsResponse::ok_json(json!({
3365            "capacityProvider": capacity_provider_to_json(cp),
3366        })))
3367    }
3368
3369    fn get_task_protection(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3370        let body = request.json_body();
3371        let refs: Vec<String> = body
3372            .get("tasks")
3373            .and_then(|v| v.as_array())
3374            .map(|arr| {
3375                arr.iter()
3376                    .filter_map(|v| v.as_str().map(String::from))
3377                    .collect()
3378            })
3379            .unwrap_or_default();
3380        let accounts = self.state.read();
3381        let mut protections = Vec::new();
3382        let mut failures = Vec::new();
3383        if let Some(state) = accounts.get(&request.account_id) {
3384            for r in &refs {
3385                let id = task_id_from_ref(r);
3386                match state.tasks.get(&id) {
3387                    Some(t) => protections.push(task_protection_json(t)),
3388                    None => failures.push(json!({"arn": r, "reason": "MISSING"})),
3389                }
3390            }
3391        }
3392        Ok(AwsResponse::ok_json(json!({
3393            "protectedTasks": protections,
3394            "failures": failures,
3395        })))
3396    }
3397
3398    fn update_task_protection(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3399        let body = request.json_body();
3400        let refs: Vec<String> = body
3401            .get("tasks")
3402            .and_then(|v| v.as_array())
3403            .map(|arr| {
3404                arr.iter()
3405                    .filter_map(|v| v.as_str().map(String::from))
3406                    .collect()
3407            })
3408            .unwrap_or_default();
3409        let protect = body
3410            .get("protectionEnabled")
3411            .and_then(|v| v.as_bool())
3412            .unwrap_or(false);
3413        let expires_in_minutes = body
3414            .get("expiresInMinutes")
3415            .and_then(|v| v.as_i64())
3416            .unwrap_or(2880);
3417        let expiration = if protect {
3418            Some(Utc::now() + chrono::Duration::minutes(expires_in_minutes))
3419        } else {
3420            None
3421        };
3422
3423        let mut accounts = self.state.write();
3424        let state = accounts
3425            .get_mut(&request.account_id)
3426            .ok_or_else(|| client_exception("account not found"))?;
3427        let mut protections = Vec::new();
3428        let mut failures = Vec::new();
3429        for r in &refs {
3430            let id = task_id_from_ref(r);
3431            match state.tasks.get_mut(&id) {
3432                Some(t) => {
3433                    t.protection = Some(crate::state::TaskProtection {
3434                        enabled: protect,
3435                        expiration,
3436                    });
3437                    protections.push(task_protection_json(t));
3438                }
3439                None => failures.push(json!({"arn": r, "reason": "MISSING"})),
3440            }
3441        }
3442        Ok(AwsResponse::ok_json(json!({
3443            "protectedTasks": protections,
3444            "failures": failures,
3445        })))
3446    }
3447
3448    fn create_task_set(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3449        let body = request.json_body();
3450        let service_ref = req_str(&body, "service")?;
3451        let service_name = service_name_from_ref(service_ref);
3452        let cluster_ref = opt_str(&body, "cluster");
3453        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3454        let task_definition = req_str(&body, "taskDefinition")?.to_string();
3455        let external_id = opt_str(&body, "externalId").map(String::from);
3456        let launch_type = opt_str(&body, "launchType").map(String::from);
3457        let platform_version = opt_str(&body, "platformVersion").map(String::from);
3458        let scale = body.get("scale").cloned();
3459        let tags = parse_tags(&body);
3460        let load_balancers = body
3461            .get("loadBalancers")
3462            .and_then(|v| v.as_array())
3463            .cloned()
3464            .unwrap_or_default();
3465        let service_registries = body
3466            .get("serviceRegistries")
3467            .and_then(|v| v.as_array())
3468            .cloned()
3469            .unwrap_or_default();
3470        let capacity_provider_strategy = body
3471            .get("capacityProviderStrategy")
3472            .and_then(|v| v.as_array())
3473            .cloned()
3474            .unwrap_or_default();
3475
3476        let mut accounts = self.state.write();
3477        let state = accounts
3478            .get_mut(&request.account_id)
3479            .ok_or_else(|| service_not_found(&service_name))?;
3480        let service_key = EcsState::service_key(&cluster_name, &service_name);
3481        let svc = state
3482            .services
3483            .get(&service_key)
3484            .ok_or_else(|| service_not_found(&service_name))?;
3485        if svc.deployment_controller != "EXTERNAL" {
3486            return Err(client_exception(
3487                "CreateTaskSet requires the service to be created with \
3488                 deploymentController.type = EXTERNAL",
3489            ));
3490        }
3491        let ts_id = format!("ecs-svc-{}", uuid::Uuid::new_v4().simple());
3492        let task_set = TaskSet {
3493            task_set_id: ts_id.clone(),
3494            task_set_arn: format!(
3495                "arn:aws:ecs:{}:{}:task-set/{}/{}/{}",
3496                state.region, state.account_id, cluster_name, service_name, ts_id
3497            ),
3498            service_arn: svc.service_arn.clone(),
3499            cluster_arn: svc.cluster_arn.clone(),
3500            service_name: service_name.clone(),
3501            cluster_name: cluster_name.clone(),
3502            external_id,
3503            status: "ACTIVE".into(),
3504            task_definition,
3505            computed_desired_count: 0,
3506            pending_count: 0,
3507            running_count: 0,
3508            launch_type,
3509            platform_version,
3510            scale,
3511            stability_status: "STABILIZING".into(),
3512            created_at: Utc::now(),
3513            updated_at: Utc::now(),
3514            load_balancers,
3515            service_registries,
3516            capacity_provider_strategy,
3517            tags,
3518        };
3519        let key = format!("{}/{}/{}", cluster_name, service_name, ts_id);
3520        state.task_sets.insert(key, task_set.clone());
3521        Ok(AwsResponse::ok_json(json!({
3522            "taskSet": task_set_to_json(&task_set),
3523        })))
3524    }
3525
3526    fn update_task_set(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3527        let body = request.json_body();
3528        let ts_ref = req_str(&body, "taskSet")?.to_string();
3529        let service_ref = req_str(&body, "service")?;
3530        let service_name = service_name_from_ref(service_ref);
3531        let cluster_ref = opt_str(&body, "cluster");
3532        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3533        let scale = body.get("scale").cloned();
3534
3535        let mut accounts = self.state.write();
3536        let state = accounts
3537            .get_mut(&request.account_id)
3538            .ok_or_else(|| client_exception("task set not found"))?;
3539        let ts_id = task_set_id_from_ref(&ts_ref);
3540        let key = format!("{}/{}/{}", cluster_name, service_name, ts_id);
3541        let ts = state
3542            .task_sets
3543            .get_mut(&key)
3544            .ok_or_else(|| client_exception(format!("task set not found: {}", ts_ref)))?;
3545        if let Some(v) = scale {
3546            ts.scale = Some(v);
3547        }
3548        ts.updated_at = Utc::now();
3549        Ok(AwsResponse::ok_json(json!({
3550            "taskSet": task_set_to_json(ts),
3551        })))
3552    }
3553
3554    fn delete_task_set(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3555        let body = request.json_body();
3556        let ts_ref = req_str(&body, "taskSet")?.to_string();
3557        let service_ref = req_str(&body, "service")?;
3558        let service_name = service_name_from_ref(service_ref);
3559        let cluster_ref = opt_str(&body, "cluster");
3560        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3561        let ts_id = task_set_id_from_ref(&ts_ref);
3562        let key = format!("{}/{}/{}", cluster_name, service_name, ts_id);
3563
3564        let mut accounts = self.state.write();
3565        let state = accounts
3566            .get_mut(&request.account_id)
3567            .ok_or_else(|| client_exception("task set not found"))?;
3568        let mut ts = state
3569            .task_sets
3570            .remove(&key)
3571            .ok_or_else(|| client_exception(format!("task set not found: {}", ts_ref)))?;
3572        ts.status = "DRAINING".into();
3573        Ok(AwsResponse::ok_json(json!({
3574            "taskSet": task_set_to_json(&ts),
3575        })))
3576    }
3577
3578    fn describe_task_sets(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3579        let body = request.json_body();
3580        let service_ref = req_str(&body, "service")?;
3581        let service_name = service_name_from_ref(service_ref);
3582        let cluster_ref = opt_str(&body, "cluster");
3583        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3584        let filter_refs: Vec<String> = body
3585            .get("taskSets")
3586            .and_then(|v| v.as_array())
3587            .map(|arr| {
3588                arr.iter()
3589                    .filter_map(|v| v.as_str().map(String::from))
3590                    .collect()
3591            })
3592            .unwrap_or_default();
3593
3594        let accounts = self.state.read();
3595        let mut found = Vec::new();
3596        let mut failures = Vec::new();
3597        if let Some(state) = accounts.get(&request.account_id) {
3598            if filter_refs.is_empty() {
3599                for ts in state.task_sets.values() {
3600                    if ts.cluster_name == cluster_name && ts.service_name == service_name {
3601                        found.push(task_set_to_json(ts));
3602                    }
3603                }
3604            } else {
3605                for r in &filter_refs {
3606                    let id = task_set_id_from_ref(r);
3607                    let key = format!("{}/{}/{}", cluster_name, service_name, id);
3608                    match state.task_sets.get(&key) {
3609                        Some(ts) => found.push(task_set_to_json(ts)),
3610                        None => failures.push(json!({"arn": r, "reason": "MISSING"})),
3611                    }
3612                }
3613            }
3614        }
3615        Ok(AwsResponse::ok_json(json!({
3616            "taskSets": found,
3617            "failures": failures,
3618        })))
3619    }
3620
3621    fn update_service_primary_task_set(
3622        &self,
3623        request: &AwsRequest,
3624    ) -> Result<AwsResponse, AwsServiceError> {
3625        let body = request.json_body();
3626        let ts_ref = req_str(&body, "primaryTaskSet")?.to_string();
3627        let service_ref = req_str(&body, "service")?;
3628        let service_name = service_name_from_ref(service_ref);
3629        let cluster_ref = opt_str(&body, "cluster");
3630        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3631        let ts_id = task_set_id_from_ref(&ts_ref);
3632        let key = format!("{}/{}/{}", cluster_name, service_name, ts_id);
3633        let mut accounts = self.state.write();
3634        let state = accounts
3635            .get_mut(&request.account_id)
3636            .ok_or_else(|| client_exception("task set not found"))?;
3637        if !state.task_sets.contains_key(&key) {
3638            return Err(client_exception(format!("task set not found: {}", ts_ref)));
3639        }
3640        // Demote any existing PRIMARY on this service to ACTIVE before
3641        // promoting the new one. Otherwise the service would be left with
3642        // two PRIMARY task sets, which AWS forbids.
3643        for ts in state.task_sets.values_mut() {
3644            if ts.service_name == service_name
3645                && ts.cluster_name == cluster_name
3646                && ts.status == "PRIMARY"
3647                && ts.task_set_id != ts_id
3648            {
3649                ts.status = "ACTIVE".into();
3650                ts.updated_at = Utc::now();
3651            }
3652        }
3653        let ts = state.task_sets.get_mut(&key).unwrap();
3654        ts.status = "PRIMARY".into();
3655        ts.updated_at = Utc::now();
3656        Ok(AwsResponse::ok_json(json!({
3657            "taskSet": task_set_to_json(ts),
3658        })))
3659    }
3660
3661    async fn execute_command(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3662        let body = request.json_body();
3663        let task_ref = req_str(&body, "task")?.to_string();
3664        let command = req_str(&body, "command")?.to_string();
3665        let interactive = body
3666            .get("interactive")
3667            .and_then(|v| v.as_bool())
3668            .unwrap_or(false);
3669
3670        // Resolve runtime container id.
3671        let container_id = {
3672            let accounts = self.state.read();
3673            let state = accounts
3674                .get(&request.account_id)
3675                .ok_or_else(|| task_not_found(&task_ref))?;
3676            let id = task_id_from_ref(&task_ref);
3677            state
3678                .tasks
3679                .get(&id)
3680                .and_then(|t| t.containers.first())
3681                .and_then(|c| c.runtime_id.clone())
3682        };
3683
3684        let session_id = format!("ecs-execute-command-{}", uuid::Uuid::new_v4());
3685        if let (Some(id), Some(_rt)) = (container_id.clone(), self.runtime.as_ref()) {
3686            // Best-effort proxy: shell the command through docker exec. We
3687            // don't stream back stdout/stderr in this ExecuteCommand response
3688            // (real AWS returns a Session token for the SSM sidecar), so log
3689            // the result server-side for visibility.
3690            let out = tokio::process::Command::new("docker")
3691                .args(["exec", &id, "sh", "-c", &command])
3692                .output()
3693                .await
3694                .map_err(|e| client_exception(format!("docker exec failed: {e}")))?;
3695            tracing::info!(
3696                task = %task_ref,
3697                exit = out.status.code().unwrap_or(-1),
3698                "ExecuteCommand via docker exec"
3699            );
3700        }
3701
3702        Ok(AwsResponse::ok_json(json!({
3703            "clusterArn": opt_str(&body, "cluster").unwrap_or(""),
3704            "containerArn": container_id.unwrap_or_default(),
3705            "containerName": opt_str(&body, "container").unwrap_or(""),
3706            "interactive": interactive,
3707            "session": {
3708                "sessionId": session_id,
3709                "streamUrl": "",
3710                "tokenValue": "",
3711            },
3712            "taskArn": task_ref,
3713        })))
3714    }
3715
3716    fn submit_container_state_change(
3717        &self,
3718        request: &AwsRequest,
3719    ) -> Result<AwsResponse, AwsServiceError> {
3720        // Agent-side API: record whatever the agent tells us about the
3721        // container. We already drive state internally via the runtime,
3722        // so this is an idempotent ack that updates the in-memory copy
3723        // when the named task+container exist.
3724        let body = request.json_body();
3725        let task_ref = opt_str(&body, "task").unwrap_or("");
3726        let container_name = opt_str(&body, "containerName").unwrap_or("");
3727        let status = opt_str(&body, "status").map(String::from);
3728        let exit_code = body.get("exitCode").and_then(|v| v.as_i64());
3729        let reason = opt_str(&body, "reason").map(String::from);
3730
3731        if !task_ref.is_empty() {
3732            let mut accounts = self.state.write();
3733            if let Some(state) = accounts.get_mut(&request.account_id) {
3734                let id = task_id_from_ref(task_ref);
3735                if let Some(task) = state.tasks.get_mut(&id) {
3736                    if let Some(container) = task
3737                        .containers
3738                        .iter_mut()
3739                        .find(|c| c.name == container_name)
3740                    {
3741                        if let Some(s) = status {
3742                            container.last_status = s;
3743                        }
3744                        if let Some(code) = exit_code {
3745                            container.exit_code = Some(code);
3746                        }
3747                        if let Some(r) = reason {
3748                            container.reason = Some(r);
3749                        }
3750                    }
3751                }
3752            }
3753        }
3754        Ok(AwsResponse::ok_json(json!({"acknowledgment": "OK"})))
3755    }
3756
3757    fn submit_task_state_change(
3758        &self,
3759        request: &AwsRequest,
3760    ) -> Result<AwsResponse, AwsServiceError> {
3761        let body = request.json_body();
3762        let task_ref = opt_str(&body, "task").unwrap_or("");
3763        let status = opt_str(&body, "status").map(String::from);
3764        if !task_ref.is_empty() {
3765            let mut accounts = self.state.write();
3766            if let Some(state) = accounts.get_mut(&request.account_id) {
3767                let id = task_id_from_ref(task_ref);
3768                if let Some(task) = state.tasks.get_mut(&id) {
3769                    if let Some(s) = status {
3770                        task.last_status = s;
3771                    }
3772                }
3773            }
3774        }
3775        Ok(AwsResponse::ok_json(json!({"acknowledgment": "OK"})))
3776    }
3777
3778    fn submit_attachment_state_changes(
3779        &self,
3780        _request: &AwsRequest,
3781    ) -> Result<AwsResponse, AwsServiceError> {
3782        // Attachments (ENIs) are beyond what fakecloud models today.
3783        // Ack the report so agents don't retry in a tight loop.
3784        Ok(AwsResponse::ok_json(json!({"acknowledgment": "OK"})))
3785    }
3786
3787    fn discover_poll_endpoint(
3788        &self,
3789        _request: &AwsRequest,
3790    ) -> Result<AwsResponse, AwsServiceError> {
3791        // ECS agents use this to discover the long-poll + telemetry
3792        // endpoints. Point both at fakecloud's local listener; real
3793        // agent polling isn't wired, but the shape is correct.
3794        let accounts = self.state.read();
3795        let endpoint = format!("https://ecs.{}.amazonaws.com/", accounts.region());
3796        Ok(AwsResponse::ok_json(json!({
3797            "endpoint": endpoint,
3798            "telemetryEndpoint": endpoint,
3799            "serviceConnectEndpoint": endpoint,
3800        })))
3801    }
3802
3803    fn stop_service_deployment(
3804        &self,
3805        request: &AwsRequest,
3806    ) -> Result<AwsResponse, AwsServiceError> {
3807        let body = request.json_body();
3808        let deployment_ref = req_str(&body, "serviceDeploymentArn")?.to_string();
3809        let mut accounts = self.state.write();
3810        let state = accounts
3811            .get_mut(&request.account_id)
3812            .ok_or_else(|| client_exception("service deployment not found"))?;
3813        for svc in state.services.values_mut() {
3814            for d in svc.deployments.iter_mut() {
3815                if deployment_ref.contains(&d.deployment_id) {
3816                    d.status = "STOPPED".into();
3817                    d.rollout_state = "FAILED".into();
3818                    d.rollout_state_reason = Some("StopServiceDeployment requested".into());
3819                    d.updated_at = Utc::now();
3820                    return Ok(AwsResponse::ok_json(json!({
3821                        "serviceDeployment": deployment_to_json(d),
3822                    })));
3823                }
3824            }
3825        }
3826        Err(client_exception(format!(
3827            "service deployment not found: {deployment_ref}"
3828        )))
3829    }
3830
3831    fn list_service_deployments(
3832        &self,
3833        request: &AwsRequest,
3834    ) -> Result<AwsResponse, AwsServiceError> {
3835        let body = request.json_body();
3836        let service_ref = req_str(&body, "service")?;
3837        let service_name = service_name_from_ref(service_ref);
3838        let cluster_ref = opt_str(&body, "cluster");
3839        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3840        let accounts = self.state.read();
3841        let mut deployments: Vec<Value> = Vec::new();
3842        if let Some(state) = accounts.get(&request.account_id) {
3843            let key = EcsState::service_key(&cluster_name, &service_name);
3844            if let Some(svc) = state.services.get(&key) {
3845                for d in &svc.deployments {
3846                    deployments.push(json!({
3847                        "serviceDeploymentArn": format!("{}/{}", svc.service_arn, d.deployment_id),
3848                        "serviceArn": svc.service_arn,
3849                        "clusterArn": svc.cluster_arn,
3850                        "status": d.status,
3851                        "createdAt": d.created_at.timestamp(),
3852                        "startedAt": d.created_at.timestamp(),
3853                        "finishedAt": null,
3854                    }));
3855                }
3856            }
3857        }
3858        Ok(AwsResponse::ok_json(json!({
3859            "serviceDeployments": deployments,
3860        })))
3861    }
3862
3863    fn describe_service_deployments(
3864        &self,
3865        request: &AwsRequest,
3866    ) -> Result<AwsResponse, AwsServiceError> {
3867        let body = request.json_body();
3868        let refs: Vec<String> = body
3869            .get("serviceDeploymentArns")
3870            .and_then(|v| v.as_array())
3871            .map(|arr| {
3872                arr.iter()
3873                    .filter_map(|v| v.as_str().map(String::from))
3874                    .collect()
3875            })
3876            .unwrap_or_default();
3877        let accounts = self.state.read();
3878        let mut found = Vec::new();
3879        let mut failures = Vec::new();
3880        if let Some(state) = accounts.get(&request.account_id) {
3881            'next_ref: for r in &refs {
3882                for svc in state.services.values() {
3883                    for d in &svc.deployments {
3884                        if r.contains(&d.deployment_id) {
3885                            found.push(json!({
3886                                "serviceDeploymentArn": r,
3887                                "serviceArn": svc.service_arn,
3888                                "clusterArn": svc.cluster_arn,
3889                                "status": d.status,
3890                                "createdAt": d.created_at.timestamp(),
3891                                "startedAt": d.created_at.timestamp(),
3892                                "finishedAt": null,
3893                                "deploymentConfiguration": {
3894                                    "minimumHealthyPercent": svc.minimum_healthy_percent,
3895                                    "maximumPercent": svc.maximum_percent,
3896                                },
3897                                "sourceServiceRevisions": [],
3898                                "targetServiceRevision": {
3899                                    "arn": d.task_definition_arn,
3900                                    "requestedTaskCount": d.desired_count,
3901                                    "runningTaskCount": d.running_count,
3902                                    "pendingTaskCount": d.pending_count,
3903                                    "failedTasks": d.failed_tasks,
3904                                },
3905                            }));
3906                            continue 'next_ref;
3907                        }
3908                    }
3909                }
3910                failures.push(json!({"arn": r, "reason": "MISSING"}));
3911            }
3912        }
3913        Ok(AwsResponse::ok_json(json!({
3914            "serviceDeployments": found,
3915            "failures": failures,
3916        })))
3917    }
3918
3919    fn describe_service_revisions(
3920        &self,
3921        request: &AwsRequest,
3922    ) -> Result<AwsResponse, AwsServiceError> {
3923        let body = request.json_body();
3924        let refs: Vec<String> = body
3925            .get("serviceRevisionArns")
3926            .and_then(|v| v.as_array())
3927            .map(|arr| {
3928                arr.iter()
3929                    .filter_map(|v| v.as_str().map(String::from))
3930                    .collect()
3931            })
3932            .unwrap_or_default();
3933        Ok(AwsResponse::ok_json(json!({
3934            "serviceRevisions": [],
3935            "failures": refs.iter().map(|r| json!({"arn": r, "reason": "MISSING"})).collect::<Vec<_>>(),
3936        })))
3937    }
3938}
3939
3940fn container_instance_id_from_ref(input: &str) -> String {
3941    input.rsplit('/').next().unwrap_or(input).to_string()
3942}
3943
3944fn container_instance_not_found(input: &str) -> AwsServiceError {
3945    AwsServiceError::aws_error(
3946        StatusCode::BAD_REQUEST,
3947        "ClientException",
3948        format!("Container instance not found: {input}"),
3949    )
3950}
3951
3952fn capacity_provider_name_from_ref(input: &str) -> String {
3953    input.rsplit('/').next().unwrap_or(input).to_string()
3954}
3955
3956fn capacity_provider_not_found(name: &str) -> AwsServiceError {
3957    AwsServiceError::aws_error(
3958        StatusCode::BAD_REQUEST,
3959        "ClientException",
3960        format!("Capacity provider not found: {name}"),
3961    )
3962}
3963
3964fn task_set_id_from_ref(input: &str) -> String {
3965    input.rsplit('/').next().unwrap_or(input).to_string()
3966}
3967
3968fn container_instance_to_json(ci: &ContainerInstance) -> Value {
3969    json!({
3970        "containerInstanceArn": ci.container_instance_arn,
3971        "ec2InstanceId": ci.ec2_instance_id,
3972        "status": ci.status,
3973        "version": ci.version,
3974        "versionInfo": ci.version_info,
3975        "agentConnected": ci.agent_connected,
3976        "agentUpdateStatus": ci.agent_update_status,
3977        "remainingResources": ci.remaining_resources,
3978        "registeredResources": ci.registered_resources,
3979        "runningTasksCount": ci.running_tasks_count,
3980        "pendingTasksCount": ci.pending_tasks_count,
3981        "registeredAt": ci.registered_at.timestamp(),
3982        "attributes": ci.attributes.iter().map(|a| json!({
3983            "name": a.name,
3984            "value": a.value,
3985            "targetType": a.target_type,
3986            "targetId": a.target_id,
3987        })).collect::<Vec<_>>(),
3988        "tags": tags_json(&ci.tags),
3989        "capacityProviderName": ci.capacity_provider_name,
3990        "healthStatus": ci.health_status,
3991    })
3992}
3993
3994fn capacity_provider_to_json(cp: &CapacityProvider) -> Value {
3995    json!({
3996        "name": cp.name,
3997        "capacityProviderArn": cp.arn,
3998        "status": cp.status,
3999        "autoScalingGroupProvider": cp.auto_scaling_group_provider,
4000        "updateStatus": cp.update_status,
4001        "updateStatusReason": cp.update_status_reason,
4002        "tags": tags_json(&cp.tags),
4003    })
4004}
4005
4006fn task_set_to_json(ts: &TaskSet) -> Value {
4007    json!({
4008        "id": ts.task_set_id,
4009        "taskSetArn": ts.task_set_arn,
4010        "serviceArn": ts.service_arn,
4011        "clusterArn": ts.cluster_arn,
4012        "externalId": ts.external_id,
4013        "status": ts.status,
4014        "taskDefinition": ts.task_definition,
4015        "computedDesiredCount": ts.computed_desired_count,
4016        "pendingCount": ts.pending_count,
4017        "runningCount": ts.running_count,
4018        "launchType": ts.launch_type,
4019        "platformVersion": ts.platform_version,
4020        "scale": ts.scale,
4021        "stabilityStatus": ts.stability_status,
4022        "createdAt": ts.created_at.timestamp(),
4023        "updatedAt": ts.updated_at.timestamp(),
4024        "loadBalancers": ts.load_balancers,
4025        "serviceRegistries": ts.service_registries,
4026        "capacityProviderStrategy": ts.capacity_provider_strategy,
4027        "tags": tags_json(&ts.tags),
4028    })
4029}
4030
4031fn task_protection_json(task: &Task) -> Value {
4032    let p = task.protection.as_ref();
4033    json!({
4034        "taskArn": task.task_arn,
4035        "protectionEnabled": p.map(|p| p.enabled).unwrap_or(false),
4036        "expirationDate": p.and_then(|p| p.expiration).map(|e| e.timestamp()),
4037    })
4038}
4039
4040#[cfg(test)]
4041mod tests {
4042    use super::*;
4043
4044    #[test]
4045    fn parse_family_revision_with_revision() {
4046        assert_eq!(parse_family_revision("web:3"), ("web".to_string(), Some(3)));
4047    }
4048
4049    #[test]
4050    fn parse_family_revision_without_revision() {
4051        assert_eq!(parse_family_revision("web"), ("web".to_string(), None));
4052    }
4053
4054    #[test]
4055    fn parse_family_revision_non_numeric_treated_as_no_revision() {
4056        assert_eq!(
4057            parse_family_revision("web:latest"),
4058            ("web:latest".to_string(), None)
4059        );
4060    }
4061
4062    #[test]
4063    fn decode_ecs_arn_cluster() {
4064        let (account, rtype, tail) =
4065            decode_ecs_arn("arn:aws:ecs:us-east-1:111122223333:cluster/prod").unwrap();
4066        assert_eq!(account, "111122223333");
4067        assert_eq!(rtype, "cluster");
4068        assert_eq!(tail, "prod");
4069    }
4070
4071    #[test]
4072    fn decode_ecs_arn_task_definition() {
4073        let (account, rtype, tail) =
4074            decode_ecs_arn("arn:aws:ecs:us-east-1:111122223333:task-definition/web:5").unwrap();
4075        assert_eq!(account, "111122223333");
4076        assert_eq!(rtype, "task-definition");
4077        assert_eq!(tail, "web:5");
4078    }
4079
4080    #[test]
4081    fn decode_ecs_arn_rejects_non_ecs() {
4082        assert!(decode_ecs_arn("arn:aws:s3:::bucket").is_err());
4083    }
4084
4085    #[test]
4086    fn validate_family_name_accepts_hyphen_underscore() {
4087        assert!(validate_family_name("web_server-2").is_ok());
4088    }
4089
4090    #[test]
4091    fn validate_family_name_rejects_empty() {
4092        assert!(validate_family_name("").is_err());
4093    }
4094
4095    #[test]
4096    fn validate_family_name_rejects_slash() {
4097        assert!(validate_family_name("web/server").is_err());
4098    }
4099
4100    #[test]
4101    fn resolve_task_definition_ref_bare_family() {
4102        let (account, family, rev) = resolve_task_definition_ref("web").unwrap();
4103        assert_eq!(account, None);
4104        assert_eq!(family, "web");
4105        assert_eq!(rev, None);
4106    }
4107
4108    #[test]
4109    fn resolve_task_definition_ref_family_revision() {
4110        let (account, family, rev) = resolve_task_definition_ref("web:3").unwrap();
4111        assert_eq!(account, None);
4112        assert_eq!(family, "web");
4113        assert_eq!(rev, Some(3));
4114    }
4115
4116    #[test]
4117    fn resolve_task_definition_ref_full_arn() {
4118        let (account, family, rev) =
4119            resolve_task_definition_ref("arn:aws:ecs:us-east-1:111122223333:task-definition/web:3")
4120                .unwrap();
4121        assert_eq!(account, Some("111122223333".to_string()));
4122        assert_eq!(family, "web");
4123        assert_eq!(rev, Some(3));
4124    }
4125
4126    #[test]
4127    fn merge_tags_replaces_existing_value() {
4128        let mut current = vec![TagEntry {
4129            key: "env".into(),
4130            value: "dev".into(),
4131        }];
4132        merge_tags(
4133            &mut current,
4134            vec![TagEntry {
4135                key: "env".into(),
4136                value: "prod".into(),
4137            }],
4138        );
4139        assert_eq!(current.len(), 1);
4140        assert_eq!(current[0].value, "prod");
4141    }
4142
4143    #[test]
4144    fn merge_tags_adds_new() {
4145        let mut current = vec![TagEntry {
4146            key: "env".into(),
4147            value: "dev".into(),
4148        }];
4149        merge_tags(
4150            &mut current,
4151            vec![TagEntry {
4152                key: "team".into(),
4153                value: "platform".into(),
4154            }],
4155        );
4156        assert_eq!(current.len(), 2);
4157    }
4158
4159    #[test]
4160    fn parse_tags_reads_lowercase_keys() {
4161        let body = json!({
4162            "tags": [
4163                {"key": "env", "value": "prod"},
4164                {"key": "team", "value": "platform"},
4165            ]
4166        });
4167        let tags = parse_tags(&body);
4168        assert_eq!(tags.len(), 2);
4169        assert_eq!(tags[0].key, "env");
4170        assert_eq!(tags[0].value, "prod");
4171    }
4172
4173    #[test]
4174    fn matches_filter_respects_none() {
4175        assert!(matches_filter(None, "anything"));
4176        assert!(matches_filter(Some("x"), "x"));
4177        assert!(!matches_filter(Some("x"), "y"));
4178    }
4179}